This patch introduces buffered I/O to rrdcached. Now, rrdcached can
[rrdtool.git] / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008 Florian octo Forster
4  * Copyright (C) 2008 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
23
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
32
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
36
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
41
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
47
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
51
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
55
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
61
62 /*
63  * Now for some includes..
64  */
65 #include "rrd.h" /* {{{ */
66 #include "rrd_client.h"
67
68 #include <stdlib.h>
69 #include <stdint.h>
70 #include <stdio.h>
71 #include <unistd.h>
72 #include <string.h>
73 #include <strings.h>
74 #include <stdint.h>
75 #include <inttypes.h>
76
77 #include <sys/types.h>
78 #include <sys/stat.h>
79 #include <fcntl.h>
80 #include <signal.h>
81 #include <sys/socket.h>
82 #include <sys/un.h>
83 #include <netdb.h>
84 #include <poll.h>
85 #include <syslog.h>
86 #include <pthread.h>
87 #include <errno.h>
88 #include <assert.h>
89 #include <sys/time.h>
90 #include <time.h>
91
92 #include <glib-2.0/glib.h>
93 /* }}} */
94
95 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
96
97 #ifndef __GNUC__
98 # define __attribute__(x) /**/
99 #endif
100
101 /*
102  * Types
103  */
104 typedef enum
105 {
106   PRIV_LOW,
107   PRIV_HIGH
108 } socket_privilege;
109
110 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
111
112 struct listen_socket_s
113 {
114   int fd;
115   char addr[PATH_MAX + 1];
116   int family;
117   socket_privilege privilege;
118
119   /* buffered IO */
120   char *rbuf;
121   off_t next_cmd;
122   off_t next_read;
123
124   char *wbuf;
125   ssize_t wbuf_len;
126 };
127 typedef struct listen_socket_s listen_socket_t;
128
129 struct cache_item_s;
130 typedef struct cache_item_s cache_item_t;
131 struct cache_item_s
132 {
133   char *file;
134   char **values;
135   int values_num;
136   time_t last_flush_time;
137 #define CI_FLAGS_IN_TREE  (1<<0)
138 #define CI_FLAGS_IN_QUEUE (1<<1)
139   int flags;
140   pthread_cond_t  flushed;
141   cache_item_t *prev;
142   cache_item_t *next;
143 };
144
145 struct callback_flush_data_s
146 {
147   time_t now;
148   time_t abs_timeout;
149   char **keys;
150   size_t keys_num;
151 };
152 typedef struct callback_flush_data_s callback_flush_data_t;
153
154 enum queue_side_e
155 {
156   HEAD,
157   TAIL
158 };
159 typedef enum queue_side_e queue_side_t;
160
161 /* max length of socket command or response */
162 #define CMD_MAX 4096
163 #define RBUF_SIZE (CMD_MAX*2)
164
165 /*
166  * Variables
167  */
168 static int stay_foreground = 0;
169
170 static listen_socket_t *listen_fds = NULL;
171 static size_t listen_fds_num = 0;
172
173 static int do_shutdown = 0;
174
175 static pthread_t queue_thread;
176
177 static pthread_t *connection_threads = NULL;
178 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
179 static int connection_threads_num = 0;
180
181 /* Cache stuff */
182 static GTree          *cache_tree = NULL;
183 static cache_item_t   *cache_queue_head = NULL;
184 static cache_item_t   *cache_queue_tail = NULL;
185 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
186 static pthread_cond_t  cache_cond = PTHREAD_COND_INITIALIZER;
187
188 static int config_write_interval = 300;
189 static int config_write_jitter   = 0;
190 static int config_flush_interval = 3600;
191 static int config_flush_at_shutdown = 0;
192 static char *config_pid_file = NULL;
193 static char *config_base_dir = NULL;
194 static size_t _config_base_dir_len = 0;
195 static int config_write_base_only = 0;
196
197 static listen_socket_t **config_listen_address_list = NULL;
198 static int config_listen_address_list_len = 0;
199
200 static uint64_t stats_queue_length = 0;
201 static uint64_t stats_updates_received = 0;
202 static uint64_t stats_flush_received = 0;
203 static uint64_t stats_updates_written = 0;
204 static uint64_t stats_data_sets_written = 0;
205 static uint64_t stats_journal_bytes = 0;
206 static uint64_t stats_journal_rotate = 0;
207 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
208
209 /* Journaled updates */
210 static char *journal_cur = NULL;
211 static char *journal_old = NULL;
212 static FILE *journal_fh = NULL;
213 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
214 static int journal_write(char *cmd, char *args);
215 static void journal_done(void);
216 static void journal_rotate(void);
217
218 /* 
219  * Functions
220  */
221 static void sig_common (const char *sig) /* {{{ */
222 {
223   RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
224   do_shutdown++;
225   pthread_cond_broadcast(&cache_cond);
226 } /* }}} void sig_common */
227
228 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
229 {
230   sig_common("INT");
231 } /* }}} void sig_int_handler */
232
233 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
234 {
235   sig_common("TERM");
236 } /* }}} void sig_term_handler */
237
238 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
239 {
240   config_flush_at_shutdown = 1;
241   sig_common("USR1");
242 } /* }}} void sig_usr1_handler */
243
244 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
245 {
246   config_flush_at_shutdown = 0;
247   sig_common("USR2");
248 } /* }}} void sig_usr2_handler */
249
250 static void install_signal_handlers(void) /* {{{ */
251 {
252   /* These structures are static, because `sigaction' behaves weird if the are
253    * overwritten.. */
254   static struct sigaction sa_int;
255   static struct sigaction sa_term;
256   static struct sigaction sa_pipe;
257   static struct sigaction sa_usr1;
258   static struct sigaction sa_usr2;
259
260   /* Install signal handlers */
261   memset (&sa_int, 0, sizeof (sa_int));
262   sa_int.sa_handler = sig_int_handler;
263   sigaction (SIGINT, &sa_int, NULL);
264
265   memset (&sa_term, 0, sizeof (sa_term));
266   sa_term.sa_handler = sig_term_handler;
267   sigaction (SIGTERM, &sa_term, NULL);
268
269   memset (&sa_pipe, 0, sizeof (sa_pipe));
270   sa_pipe.sa_handler = SIG_IGN;
271   sigaction (SIGPIPE, &sa_pipe, NULL);
272
273   memset (&sa_pipe, 0, sizeof (sa_usr1));
274   sa_usr1.sa_handler = sig_usr1_handler;
275   sigaction (SIGUSR1, &sa_usr1, NULL);
276
277   memset (&sa_usr2, 0, sizeof (sa_usr2));
278   sa_usr2.sa_handler = sig_usr2_handler;
279   sigaction (SIGUSR2, &sa_usr2, NULL);
280
281 } /* }}} void install_signal_handlers */
282
283 static int open_pidfile(void) /* {{{ */
284 {
285   int fd;
286   char *file;
287
288   file = (config_pid_file != NULL)
289     ? config_pid_file
290     : LOCALSTATEDIR "/run/rrdcached.pid";
291
292   fd = open(file, O_CREAT|O_EXCL|O_WRONLY, S_IRUSR|S_IRGRP|S_IROTH);
293   if (fd < 0)
294     fprintf(stderr, "FATAL: cannot create '%s' (%s)\n",
295             file, rrd_strerror(errno));
296
297   return(fd);
298 } /* }}} static int open_pidfile */
299
300 static int write_pidfile (int fd) /* {{{ */
301 {
302   pid_t pid;
303   FILE *fh;
304
305   pid = getpid ();
306
307   fh = fdopen (fd, "w");
308   if (fh == NULL)
309   {
310     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
311     close(fd);
312     return (-1);
313   }
314
315   fprintf (fh, "%i\n", (int) pid);
316   fclose (fh);
317
318   return (0);
319 } /* }}} int write_pidfile */
320
321 static int remove_pidfile (void) /* {{{ */
322 {
323   char *file;
324   int status;
325
326   file = (config_pid_file != NULL)
327     ? config_pid_file
328     : LOCALSTATEDIR "/run/rrdcached.pid";
329
330   status = unlink (file);
331   if (status == 0)
332     return (0);
333   return (errno);
334 } /* }}} int remove_pidfile */
335
336 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
337 {
338   char *eol;
339
340   eol = memchr(sock->rbuf + sock->next_cmd, '\n',
341                sock->next_read - sock->next_cmd);
342
343   if (eol == NULL)
344   {
345     /* no commands left, move remainder back to front of rbuf */
346     memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
347             sock->next_read - sock->next_cmd);
348     sock->next_read -= sock->next_cmd;
349     sock->next_cmd = 0;
350     *len = 0;
351     return NULL;
352   }
353   else
354   {
355     char *cmd = sock->rbuf + sock->next_cmd;
356     *eol = '\0';
357
358     sock->next_cmd = eol - sock->rbuf + 1;
359
360     if (eol > sock->rbuf && *(eol-1) == '\r')
361       *(--eol) = '\0'; /* handle "\r\n" EOL */
362
363     *len = eol - cmd;
364
365     return cmd;
366   }
367
368   /* NOTREACHED */
369   assert(1==0);
370 }
371
372 /* add the characters directly to the write buffer */
373 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
374 {
375   char *new_buf;
376
377   assert(sock != NULL);
378
379   new_buf = realloc(sock->wbuf, sock->wbuf_len + len + 1);
380   if (new_buf == NULL)
381   {
382     RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
383     return -1;
384   }
385
386   strncpy(new_buf + sock->wbuf_len, str, len + 1);
387
388   sock->wbuf = new_buf;
389   sock->wbuf_len += len;
390
391   return 0;
392 } /* }}} static int add_to_wbuf */
393
394 /* add the text to the "extra" info that's sent after the status line */
395 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
396 {
397   va_list argp;
398   char buffer[CMD_MAX];
399   int len;
400
401   if (sock == NULL) return 0; /* journal replay mode */
402
403   va_start(argp, fmt);
404 #ifdef HAVE_VSNPRINTF
405   len = vsnprintf(buffer, sizeof(buffer)-1, fmt, argp);
406 #else
407   len = vsprintf(buffer, fmt, argp);
408 #endif
409   va_end(argp);
410   if (len < 0)
411   {
412     RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
413     return -1;
414   }
415
416   return add_to_wbuf(sock, buffer, len);
417 } /* }}} static int add_response_info */
418
419 static int count_lines(char *str) /* {{{ */
420 {
421   int lines = 0;
422
423   if (str != NULL)
424   {
425     while ((str = strchr(str, '\n')) != NULL)
426     {
427       ++lines;
428       ++str;
429     }
430   }
431
432   return lines;
433 } /* }}} static int count_lines */
434
435 /* send the response back to the user.
436  * returns 0 on success, -1 on error
437  * write buffer is always zeroed after this call */
438 static int send_response (listen_socket_t *sock, response_code rc,
439                           char *fmt, ...) /* {{{ */
440 {
441   va_list argp;
442   char buffer[CMD_MAX];
443   int lines;
444   ssize_t wrote;
445   int rclen, len;
446
447   if (sock == NULL) return rc;  /* journal replay mode */
448
449   if (rc == RESP_OK)
450   {
451     lines = count_lines(sock->wbuf);
452   }
453   else
454     lines = -1;
455
456   rclen = sprintf(buffer, "%d ", lines);
457   va_start(argp, fmt);
458 #ifdef HAVE_VSNPRINTF
459   len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen-1, fmt, argp);
460 #else
461   len = vsprintf(buffer+rclen, fmt, argp);
462 #endif
463   va_end(argp);
464   if (len < 0)
465     return -1;
466
467   len += rclen;
468
469   /* first write must be complete */
470   if (len != write(sock->fd, buffer, len))
471   {
472     RRDD_LOG(LOG_INFO, "send_response: could not write status message");
473     return -1;
474   }
475
476   if (sock->wbuf != NULL)
477   {
478     wrote = 0;
479     while (wrote < sock->wbuf_len)
480     {
481       ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
482       if (wb <= 0)
483       {
484         RRDD_LOG(LOG_INFO, "send_response: could not write results");
485         return -1;
486       }
487       wrote += wb;
488     }
489   }
490
491   free(sock->wbuf); sock->wbuf = NULL;
492   sock->wbuf_len = 0;
493
494   return 0;
495 } /* }}} */
496
497 static void wipe_ci_values(cache_item_t *ci, time_t when)
498 {
499   ci->values = NULL;
500   ci->values_num = 0;
501
502   ci->last_flush_time = when;
503   if (config_write_jitter > 0)
504     ci->last_flush_time += (random() % config_write_jitter);
505 }
506
507 /* remove_from_queue
508  * remove a "cache_item_t" item from the queue.
509  * must hold 'cache_lock' when calling this
510  */
511 static void remove_from_queue(cache_item_t *ci) /* {{{ */
512 {
513   if (ci == NULL) return;
514
515   if (ci->prev == NULL)
516     cache_queue_head = ci->next; /* reset head */
517   else
518     ci->prev->next = ci->next;
519
520   if (ci->next == NULL)
521     cache_queue_tail = ci->prev; /* reset the tail */
522   else
523     ci->next->prev = ci->prev;
524
525   ci->next = ci->prev = NULL;
526   ci->flags &= ~CI_FLAGS_IN_QUEUE;
527 } /* }}} static void remove_from_queue */
528
529 /*
530  * enqueue_cache_item:
531  * `cache_lock' must be acquired before calling this function!
532  */
533 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
534     queue_side_t side)
535 {
536   if (ci == NULL)
537     return (-1);
538
539   if (ci->values_num == 0)
540     return (0);
541
542   if (side == HEAD)
543   {
544     if (cache_queue_head == ci)
545       return 0;
546
547     /* remove from the double linked list */
548     if (ci->flags & CI_FLAGS_IN_QUEUE)
549       remove_from_queue(ci);
550
551     ci->prev = NULL;
552     ci->next = cache_queue_head;
553     if (ci->next != NULL)
554       ci->next->prev = ci;
555     cache_queue_head = ci;
556
557     if (cache_queue_tail == NULL)
558       cache_queue_tail = cache_queue_head;
559   }
560   else /* (side == TAIL) */
561   {
562     /* We don't move values back in the list.. */
563     if (ci->flags & CI_FLAGS_IN_QUEUE)
564       return (0);
565
566     assert (ci->next == NULL);
567     assert (ci->prev == NULL);
568
569     ci->prev = cache_queue_tail;
570
571     if (cache_queue_tail == NULL)
572       cache_queue_head = ci;
573     else
574       cache_queue_tail->next = ci;
575
576     cache_queue_tail = ci;
577   }
578
579   ci->flags |= CI_FLAGS_IN_QUEUE;
580
581   pthread_cond_broadcast(&cache_cond);
582   pthread_mutex_lock (&stats_lock);
583   stats_queue_length++;
584   pthread_mutex_unlock (&stats_lock);
585
586   return (0);
587 } /* }}} int enqueue_cache_item */
588
589 /*
590  * tree_callback_flush:
591  * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
592  * while this is in progress.
593  */
594 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
595     gpointer data)
596 {
597   cache_item_t *ci;
598   callback_flush_data_t *cfd;
599
600   ci = (cache_item_t *) value;
601   cfd = (callback_flush_data_t *) data;
602
603   if ((ci->last_flush_time <= cfd->abs_timeout)
604       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
605       && (ci->values_num > 0))
606   {
607     enqueue_cache_item (ci, TAIL);
608   }
609   else if ((do_shutdown != 0)
610       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
611       && (ci->values_num > 0))
612   {
613     enqueue_cache_item (ci, TAIL);
614   }
615   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
616       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
617       && (ci->values_num <= 0))
618   {
619     char **temp;
620
621     temp = (char **) realloc (cfd->keys,
622         sizeof (char *) * (cfd->keys_num + 1));
623     if (temp == NULL)
624     {
625       RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed.");
626       return (FALSE);
627     }
628     cfd->keys = temp;
629     /* Make really sure this points to the _same_ place */
630     assert ((char *) key == ci->file);
631     cfd->keys[cfd->keys_num] = (char *) key;
632     cfd->keys_num++;
633   }
634
635   return (FALSE);
636 } /* }}} gboolean tree_callback_flush */
637
638 static int flush_old_values (int max_age)
639 {
640   callback_flush_data_t cfd;
641   size_t k;
642
643   memset (&cfd, 0, sizeof (cfd));
644   /* Pass the current time as user data so that we don't need to call
645    * `time' for each node. */
646   cfd.now = time (NULL);
647   cfd.keys = NULL;
648   cfd.keys_num = 0;
649
650   if (max_age > 0)
651     cfd.abs_timeout = cfd.now - max_age;
652   else
653     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
654
655   /* `tree_callback_flush' will return the keys of all values that haven't
656    * been touched in the last `config_flush_interval' seconds in `cfd'.
657    * The char*'s in this array point to the same memory as ci->file, so we
658    * don't need to free them separately. */
659   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
660
661   for (k = 0; k < cfd.keys_num; k++)
662   {
663     cache_item_t *ci;
664
665     /* This must not fail. */
666     ci = (cache_item_t *) g_tree_lookup (cache_tree, cfd.keys[k]);
667     assert (ci != NULL);
668
669     /* If we end up here with values available, something's seriously
670      * messed up. */
671     assert (ci->values_num == 0);
672
673     /* Remove the node from the tree */
674     g_tree_remove (cache_tree, cfd.keys[k]);
675     cfd.keys[k] = NULL;
676
677     /* Now free and clean up `ci'. */
678     free (ci->file);
679     ci->file = NULL;
680     free (ci);
681     ci = NULL;
682   } /* for (k = 0; k < cfd.keys_num; k++) */
683
684   if (cfd.keys != NULL)
685   {
686     free (cfd.keys);
687     cfd.keys = NULL;
688   }
689
690   return (0);
691 } /* int flush_old_values */
692
693 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
694 {
695   struct timeval now;
696   struct timespec next_flush;
697   int final_flush = 0; /* make sure we only flush once on shutdown */
698
699   gettimeofday (&now, NULL);
700   next_flush.tv_sec = now.tv_sec + config_flush_interval;
701   next_flush.tv_nsec = 1000 * now.tv_usec;
702
703   pthread_mutex_lock (&cache_lock);
704   while ((do_shutdown == 0) || (cache_queue_head != NULL))
705   {
706     cache_item_t *ci;
707     char *file;
708     char **values;
709     int values_num;
710     int status;
711     int i;
712
713     /* First, check if it's time to do the cache flush. */
714     gettimeofday (&now, NULL);
715     if ((now.tv_sec > next_flush.tv_sec)
716         || ((now.tv_sec == next_flush.tv_sec)
717           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
718     {
719       /* Flush all values that haven't been written in the last
720        * `config_write_interval' seconds. */
721       flush_old_values (config_write_interval);
722
723       /* Determine the time of the next cache flush. */
724       while (next_flush.tv_sec <= now.tv_sec)
725         next_flush.tv_sec += config_flush_interval;
726
727       /* unlock the cache while we rotate so we don't block incoming
728        * updates if the fsync() blocks on disk I/O */
729       pthread_mutex_unlock(&cache_lock);
730       journal_rotate();
731       pthread_mutex_lock(&cache_lock);
732     }
733
734     /* Now, check if there's something to store away. If not, wait until
735      * something comes in or it's time to do the cache flush.  if we are
736      * shutting down, do not wait around.  */
737     if (cache_queue_head == NULL && !do_shutdown)
738     {
739       status = pthread_cond_timedwait (&cache_cond, &cache_lock, &next_flush);
740       if ((status != 0) && (status != ETIMEDOUT))
741       {
742         RRDD_LOG (LOG_ERR, "queue_thread_main: "
743             "pthread_cond_timedwait returned %i.", status);
744       }
745     }
746
747     /* We're about to shut down */
748     if (do_shutdown != 0 && !final_flush++)
749     {
750       if (config_flush_at_shutdown)
751         flush_old_values (-1); /* flush everything */
752       else
753         break;
754     }
755
756     /* Check if a value has arrived. This may be NULL if we timed out or there
757      * was an interrupt such as a signal. */
758     if (cache_queue_head == NULL)
759       continue;
760
761     ci = cache_queue_head;
762
763     /* copy the relevant parts */
764     file = strdup (ci->file);
765     if (file == NULL)
766     {
767       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
768       continue;
769     }
770
771     assert(ci->values != NULL);
772     assert(ci->values_num > 0);
773
774     values = ci->values;
775     values_num = ci->values_num;
776
777     wipe_ci_values(ci, time(NULL));
778     remove_from_queue(ci);
779
780     pthread_mutex_lock (&stats_lock);
781     assert (stats_queue_length > 0);
782     stats_queue_length--;
783     pthread_mutex_unlock (&stats_lock);
784
785     pthread_mutex_unlock (&cache_lock);
786
787     rrd_clear_error ();
788     status = rrd_update_r (file, NULL, values_num, (void *) values);
789     if (status != 0)
790     {
791       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
792           "rrd_update_r (%s) failed with status %i. (%s)",
793           file, status, rrd_get_error());
794     }
795
796     journal_write("wrote", file);
797     pthread_cond_broadcast(&ci->flushed);
798
799     for (i = 0; i < values_num; i++)
800       free (values[i]);
801
802     free(values);
803     free(file);
804
805     if (status == 0)
806     {
807       pthread_mutex_lock (&stats_lock);
808       stats_updates_written++;
809       stats_data_sets_written += values_num;
810       pthread_mutex_unlock (&stats_lock);
811     }
812
813     pthread_mutex_lock (&cache_lock);
814
815     /* We're about to shut down */
816     if (do_shutdown != 0 && !final_flush++)
817     {
818       if (config_flush_at_shutdown)
819           flush_old_values (-1); /* flush everything */
820       else
821         break;
822     }
823   } /* while ((do_shutdown == 0) || (cache_queue_head != NULL)) */
824   pthread_mutex_unlock (&cache_lock);
825
826   if (config_flush_at_shutdown)
827   {
828     assert(cache_queue_head == NULL);
829     RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
830   }
831
832   journal_done();
833
834   return (NULL);
835 } /* }}} void *queue_thread_main */
836
837 static int buffer_get_field (char **buffer_ret, /* {{{ */
838     size_t *buffer_size_ret, char **field_ret)
839 {
840   char *buffer;
841   size_t buffer_pos;
842   size_t buffer_size;
843   char *field;
844   size_t field_size;
845   int status;
846
847   buffer = *buffer_ret;
848   buffer_pos = 0;
849   buffer_size = *buffer_size_ret;
850   field = *buffer_ret;
851   field_size = 0;
852
853   if (buffer_size <= 0)
854     return (-1);
855
856   /* This is ensured by `handle_request'. */
857   assert (buffer[buffer_size - 1] == '\0');
858
859   status = -1;
860   while (buffer_pos < buffer_size)
861   {
862     /* Check for end-of-field or end-of-buffer */
863     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
864     {
865       field[field_size] = 0;
866       field_size++;
867       buffer_pos++;
868       status = 0;
869       break;
870     }
871     /* Handle escaped characters. */
872     else if (buffer[buffer_pos] == '\\')
873     {
874       if (buffer_pos >= (buffer_size - 1))
875         break;
876       buffer_pos++;
877       field[field_size] = buffer[buffer_pos];
878       field_size++;
879       buffer_pos++;
880     }
881     /* Normal operation */ 
882     else
883     {
884       field[field_size] = buffer[buffer_pos];
885       field_size++;
886       buffer_pos++;
887     }
888   } /* while (buffer_pos < buffer_size) */
889
890   if (status != 0)
891     return (status);
892
893   *buffer_ret = buffer + buffer_pos;
894   *buffer_size_ret = buffer_size - buffer_pos;
895   *field_ret = field;
896
897   return (0);
898 } /* }}} int buffer_get_field */
899
900 /* if we're restricting writes to the base directory,
901  * check whether the file falls within the dir
902  * returns 1 if OK, otherwise 0
903  */
904 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
905 {
906   assert(file != NULL);
907
908   if (!config_write_base_only
909       || sock == NULL /* journal replay */
910       || config_base_dir == NULL)
911     return 1;
912
913   if (strstr(file, "../") != NULL) goto err;
914
915   /* relative paths without "../" are ok */
916   if (*file != '/') return 1;
917
918   /* file must be of the format base + "/" + <1+ char filename> */
919   if (strlen(file) < _config_base_dir_len + 2) goto err;
920   if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
921   if (*(file + _config_base_dir_len) != '/') goto err;
922
923   return 1;
924
925 err:
926   if (sock != NULL && sock->fd >= 0)
927     send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
928
929   return 0;
930 } /* }}} static int check_file_access */
931
932 static int flush_file (const char *filename) /* {{{ */
933 {
934   cache_item_t *ci;
935
936   pthread_mutex_lock (&cache_lock);
937
938   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
939   if (ci == NULL)
940   {
941     pthread_mutex_unlock (&cache_lock);
942     return (ENOENT);
943   }
944
945   if (ci->values_num > 0)
946   {
947     /* Enqueue at head */
948     enqueue_cache_item (ci, HEAD);
949     pthread_cond_wait(&ci->flushed, &cache_lock);
950   }
951
952   pthread_mutex_unlock(&cache_lock);
953
954   return (0);
955 } /* }}} int flush_file */
956
957 static int handle_request_help (listen_socket_t *sock, /* {{{ */
958     char *buffer, size_t buffer_size)
959 {
960   int status;
961   char **help_text;
962   char *command;
963
964   char *help_help[2] =
965   {
966     "Command overview\n"
967     ,
968     "FLUSH <filename>\n"
969     "FLUSHALL\n"
970     "HELP [<command>]\n"
971     "UPDATE <filename> <values> [<values> ...]\n"
972     "STATS\n"
973   };
974
975   char *help_flush[2] =
976   {
977     "Help for FLUSH\n"
978     ,
979     "Usage: FLUSH <filename>\n"
980     "\n"
981     "Adds the given filename to the head of the update queue and returns\n"
982     "after is has been dequeued.\n"
983   };
984
985   char *help_flushall[2] =
986   {
987     "Help for FLUSHALL\n"
988     ,
989     "Usage: FLUSHALL\n"
990     "\n"
991     "Triggers writing of all pending updates.  Returns immediately.\n"
992   };
993
994   char *help_update[2] =
995   {
996     "Help for UPDATE\n"
997     ,
998     "Usage: UPDATE <filename> <values> [<values> ...]\n"
999     "\n"
1000     "Adds the given file to the internal cache if it is not yet known and\n"
1001     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1002     "for details.\n"
1003     "\n"
1004     "Each <values> has the following form:\n"
1005     "  <values> = <time>:<value>[:<value>[...]]\n"
1006     "See the rrdupdate(1) manpage for details.\n"
1007   };
1008
1009   char *help_stats[2] =
1010   {
1011     "Help for STATS\n"
1012     ,
1013     "Usage: STATS\n"
1014     "\n"
1015     "Returns some performance counters, see the rrdcached(1) manpage for\n"
1016     "a description of the values.\n"
1017   };
1018
1019   status = buffer_get_field (&buffer, &buffer_size, &command);
1020   if (status != 0)
1021     help_text = help_help;
1022   else
1023   {
1024     if (strcasecmp (command, "update") == 0)
1025       help_text = help_update;
1026     else if (strcasecmp (command, "flush") == 0)
1027       help_text = help_flush;
1028     else if (strcasecmp (command, "flushall") == 0)
1029       help_text = help_flushall;
1030     else if (strcasecmp (command, "stats") == 0)
1031       help_text = help_stats;
1032     else
1033       help_text = help_help;
1034   }
1035
1036   add_response_info(sock, help_text[1]);
1037   return send_response(sock, RESP_OK, help_text[0]);
1038 } /* }}} int handle_request_help */
1039
1040 static int handle_request_stats (listen_socket_t *sock) /* {{{ */
1041 {
1042   uint64_t copy_queue_length;
1043   uint64_t copy_updates_received;
1044   uint64_t copy_flush_received;
1045   uint64_t copy_updates_written;
1046   uint64_t copy_data_sets_written;
1047   uint64_t copy_journal_bytes;
1048   uint64_t copy_journal_rotate;
1049
1050   uint64_t tree_nodes_number;
1051   uint64_t tree_depth;
1052
1053   pthread_mutex_lock (&stats_lock);
1054   copy_queue_length       = stats_queue_length;
1055   copy_updates_received   = stats_updates_received;
1056   copy_flush_received     = stats_flush_received;
1057   copy_updates_written    = stats_updates_written;
1058   copy_data_sets_written  = stats_data_sets_written;
1059   copy_journal_bytes      = stats_journal_bytes;
1060   copy_journal_rotate     = stats_journal_rotate;
1061   pthread_mutex_unlock (&stats_lock);
1062
1063   pthread_mutex_lock (&cache_lock);
1064   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1065   tree_depth        = (uint64_t) g_tree_height (cache_tree);
1066   pthread_mutex_unlock (&cache_lock);
1067
1068   add_response_info(sock,
1069                     "QueueLength: %"PRIu64"\n", copy_queue_length);
1070   add_response_info(sock,
1071                     "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1072   add_response_info(sock,
1073                     "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1074   add_response_info(sock,
1075                     "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1076   add_response_info(sock,
1077                     "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1078   add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1079   add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1080   add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1081   add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1082
1083   send_response(sock, RESP_OK, "Statistics follow\n");
1084
1085   return (0);
1086 } /* }}} int handle_request_stats */
1087
1088 static int handle_request_flush (listen_socket_t *sock, /* {{{ */
1089     char *buffer, size_t buffer_size)
1090 {
1091   char *file;
1092   int status;
1093
1094   status = buffer_get_field (&buffer, &buffer_size, &file);
1095   if (status != 0)
1096   {
1097     return send_response(sock, RESP_ERR, "Usage: flush <filename>\n");
1098   }
1099   else
1100   {
1101     pthread_mutex_lock(&stats_lock);
1102     stats_flush_received++;
1103     pthread_mutex_unlock(&stats_lock);
1104
1105     if (!check_file_access(file, sock)) return 0;
1106
1107     status = flush_file (file);
1108     if (status == 0)
1109       return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1110     else if (status == ENOENT)
1111     {
1112       /* no file in our tree; see whether it exists at all */
1113       struct stat statbuf;
1114
1115       memset(&statbuf, 0, sizeof(statbuf));
1116       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1117         return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1118       else
1119         return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1120     }
1121     else if (status < 0)
1122       return send_response(sock, RESP_ERR, "Internal error.\n");
1123     else
1124       return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1125   }
1126
1127   /* NOTREACHED */
1128   assert(1==0);
1129 } /* }}} int handle_request_slurp */
1130
1131 static int handle_request_flushall(listen_socket_t *sock) /* {{{ */
1132 {
1133
1134   RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1135
1136   pthread_mutex_lock(&cache_lock);
1137   flush_old_values(-1);
1138   pthread_mutex_unlock(&cache_lock);
1139
1140   return send_response(sock, RESP_OK, "Started flush.\n");
1141 } /* }}} static int handle_request_flushall */
1142
1143 static int handle_request_update (listen_socket_t *sock, /* {{{ */
1144     char *buffer, size_t buffer_size)
1145 {
1146   char *file;
1147   int values_num = 0;
1148   int status;
1149
1150   time_t now;
1151   cache_item_t *ci;
1152
1153   now = time (NULL);
1154
1155   status = buffer_get_field (&buffer, &buffer_size, &file);
1156   if (status != 0)
1157     return send_response(sock, RESP_ERR,
1158                          "Usage: UPDATE <filename> <values> [<values> ...]\n");
1159
1160   pthread_mutex_lock(&stats_lock);
1161   stats_updates_received++;
1162   pthread_mutex_unlock(&stats_lock);
1163
1164   if (!check_file_access(file, sock)) return 0;
1165
1166   pthread_mutex_lock (&cache_lock);
1167   ci = g_tree_lookup (cache_tree, file);
1168
1169   if (ci == NULL) /* {{{ */
1170   {
1171     struct stat statbuf;
1172
1173     /* don't hold the lock while we setup; stat(2) might block */
1174     pthread_mutex_unlock(&cache_lock);
1175
1176     memset (&statbuf, 0, sizeof (statbuf));
1177     status = stat (file, &statbuf);
1178     if (status != 0)
1179     {
1180       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1181
1182       status = errno;
1183       if (status == ENOENT)
1184         return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1185       else
1186         return send_response(sock, RESP_ERR,
1187                              "stat failed with error %i.\n", status);
1188     }
1189     if (!S_ISREG (statbuf.st_mode))
1190       return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1191
1192     if (access(file, R_OK|W_OK) != 0)
1193       return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1194                            file, rrd_strerror(errno));
1195
1196     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1197     if (ci == NULL)
1198     {
1199       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1200
1201       return send_response(sock, RESP_ERR, "malloc failed.\n");
1202     }
1203     memset (ci, 0, sizeof (cache_item_t));
1204
1205     ci->file = strdup (file);
1206     if (ci->file == NULL)
1207     {
1208       free (ci);
1209       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1210
1211       return send_response(sock, RESP_ERR, "strdup failed.\n");
1212     }
1213
1214     wipe_ci_values(ci, now);
1215     ci->flags = CI_FLAGS_IN_TREE;
1216
1217     pthread_mutex_lock(&cache_lock);
1218     g_tree_insert (cache_tree, (void *) ci->file, (void *) ci);
1219   } /* }}} */
1220   assert (ci != NULL);
1221
1222   while (buffer_size > 0)
1223   {
1224     char **temp;
1225     char *value;
1226
1227     status = buffer_get_field (&buffer, &buffer_size, &value);
1228     if (status != 0)
1229     {
1230       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1231       break;
1232     }
1233
1234     temp = (char **) realloc (ci->values,
1235         sizeof (char *) * (ci->values_num + 1));
1236     if (temp == NULL)
1237     {
1238       RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
1239       continue;
1240     }
1241     ci->values = temp;
1242
1243     ci->values[ci->values_num] = strdup (value);
1244     if (ci->values[ci->values_num] == NULL)
1245     {
1246       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1247       continue;
1248     }
1249     ci->values_num++;
1250
1251     values_num++;
1252   }
1253
1254   if (((now - ci->last_flush_time) >= config_write_interval)
1255       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1256       && (ci->values_num > 0))
1257   {
1258     enqueue_cache_item (ci, TAIL);
1259   }
1260
1261   pthread_mutex_unlock (&cache_lock);
1262
1263   if (values_num < 1)
1264     return send_response(sock, RESP_ERR, "No values updated.\n");
1265   else
1266     return send_response(sock, RESP_OK, "Enqueued %i value(s).\n", values_num);
1267
1268   /* NOTREACHED */
1269   assert(1==0);
1270
1271 } /* }}} int handle_request_update */
1272
1273 /* we came across a "WROTE" entry during journal replay.
1274  * throw away any values that we have accumulated for this file
1275  */
1276 static int handle_request_wrote (const char *buffer) /* {{{ */
1277 {
1278   int i;
1279   cache_item_t *ci;
1280   const char *file = buffer;
1281
1282   pthread_mutex_lock(&cache_lock);
1283
1284   ci = g_tree_lookup(cache_tree, file);
1285   if (ci == NULL)
1286   {
1287     pthread_mutex_unlock(&cache_lock);
1288     return (0);
1289   }
1290
1291   if (ci->values)
1292   {
1293     for (i=0; i < ci->values_num; i++)
1294       free(ci->values[i]);
1295
1296     free(ci->values);
1297   }
1298
1299   wipe_ci_values(ci, time(NULL));
1300   remove_from_queue(ci);
1301
1302   pthread_mutex_unlock(&cache_lock);
1303   return (0);
1304 } /* }}} int handle_request_wrote */
1305
1306 /* returns 1 if we have the required privilege level */
1307 static int has_privilege (listen_socket_t *sock, /* {{{ */
1308                           socket_privilege priv)
1309 {
1310   if (sock == NULL) /* journal replay */
1311     return 1;
1312
1313   if (sock->privilege >= priv)
1314     return 1;
1315
1316   return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1317 } /* }}} static int has_privilege */
1318
1319 /* if sock==NULL, we are in journal replay mode */
1320 static int handle_request (listen_socket_t *sock, /* {{{ */
1321                            char *buffer, size_t buffer_size)
1322 {
1323   char *buffer_ptr;
1324   char *command;
1325   int status;
1326
1327   assert (buffer[buffer_size - 1] == '\0');
1328
1329   buffer_ptr = buffer;
1330   command = NULL;
1331   status = buffer_get_field (&buffer_ptr, &buffer_size, &command);
1332   if (status != 0)
1333   {
1334     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1335     return (-1);
1336   }
1337
1338   if (strcasecmp (command, "update") == 0)
1339   {
1340     status = has_privilege(sock, PRIV_HIGH);
1341     if (status <= 0)
1342       return status;
1343
1344     /* don't re-write updates in replay mode */
1345     if (sock != NULL)
1346       journal_write(command, buffer_ptr);
1347
1348     return (handle_request_update (sock, buffer_ptr, buffer_size));
1349   }
1350   else if (strcasecmp (command, "wrote") == 0 && sock == NULL)
1351   {
1352     /* this is only valid in replay mode */
1353     return (handle_request_wrote (buffer_ptr));
1354   }
1355   else if (strcasecmp (command, "flush") == 0)
1356     return (handle_request_flush (sock, buffer_ptr, buffer_size));
1357   else if (strcasecmp (command, "flushall") == 0)
1358   {
1359     status = has_privilege(sock, PRIV_HIGH);
1360     if (status <= 0)
1361       return status;
1362
1363     return (handle_request_flushall(sock));
1364   }
1365   else if (strcasecmp (command, "stats") == 0)
1366     return (handle_request_stats (sock));
1367   else if (strcasecmp (command, "help") == 0)
1368     return (handle_request_help (sock, buffer_ptr, buffer_size));
1369   else
1370     return send_response(sock, RESP_ERR, "Unknown command: %s\n", command);
1371
1372   /* NOTREACHED */
1373   assert(1==0);
1374 } /* }}} int handle_request */
1375
1376 /* MUST NOT hold journal_lock before calling this */
1377 static void journal_rotate(void) /* {{{ */
1378 {
1379   FILE *old_fh = NULL;
1380
1381   if (journal_cur == NULL || journal_old == NULL)
1382     return;
1383
1384   pthread_mutex_lock(&journal_lock);
1385
1386   /* we rotate this way (rename before close) so that the we can release
1387    * the journal lock as fast as possible.  Journal writes to the new
1388    * journal can proceed immediately after the new file is opened.  The
1389    * fclose can then block without affecting new updates.
1390    */
1391   if (journal_fh != NULL)
1392   {
1393     old_fh = journal_fh;
1394     rename(journal_cur, journal_old);
1395     ++stats_journal_rotate;
1396   }
1397
1398   journal_fh = fopen(journal_cur, "a");
1399   pthread_mutex_unlock(&journal_lock);
1400
1401   if (old_fh != NULL)
1402     fclose(old_fh);
1403
1404   if (journal_fh == NULL)
1405   {
1406     RRDD_LOG(LOG_CRIT,
1407              "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1408              journal_cur, rrd_strerror(errno));
1409
1410     RRDD_LOG(LOG_ERR,
1411              "JOURNALING DISABLED: All values will be flushed at shutdown");
1412     config_flush_at_shutdown = 1;
1413   }
1414
1415 } /* }}} static void journal_rotate */
1416
1417 static void journal_done(void) /* {{{ */
1418 {
1419   if (journal_cur == NULL)
1420     return;
1421
1422   pthread_mutex_lock(&journal_lock);
1423   if (journal_fh != NULL)
1424   {
1425     fclose(journal_fh);
1426     journal_fh = NULL;
1427   }
1428
1429   if (config_flush_at_shutdown)
1430   {
1431     RRDD_LOG(LOG_INFO, "removing journals");
1432     unlink(journal_old);
1433     unlink(journal_cur);
1434   }
1435   else
1436   {
1437     RRDD_LOG(LOG_INFO, "expedited shutdown; "
1438              "journals will be used at next startup");
1439   }
1440
1441   pthread_mutex_unlock(&journal_lock);
1442
1443 } /* }}} static void journal_done */
1444
1445 static int journal_write(char *cmd, char *args) /* {{{ */
1446 {
1447   int chars;
1448
1449   if (journal_fh == NULL)
1450     return 0;
1451
1452   pthread_mutex_lock(&journal_lock);
1453   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1454   pthread_mutex_unlock(&journal_lock);
1455
1456   if (chars > 0)
1457   {
1458     pthread_mutex_lock(&stats_lock);
1459     stats_journal_bytes += chars;
1460     pthread_mutex_unlock(&stats_lock);
1461   }
1462
1463   return chars;
1464 } /* }}} static int journal_write */
1465
1466 static int journal_replay (const char *file) /* {{{ */
1467 {
1468   FILE *fh;
1469   int entry_cnt = 0;
1470   int fail_cnt = 0;
1471   uint64_t line = 0;
1472   char entry[CMD_MAX];
1473
1474   if (file == NULL) return 0;
1475
1476   fh = fopen(file, "r");
1477   if (fh == NULL)
1478   {
1479     if (errno != ENOENT)
1480       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1481                file, rrd_strerror(errno));
1482     return 0;
1483   }
1484   else
1485     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1486
1487   while(!feof(fh))
1488   {
1489     size_t entry_len;
1490
1491     ++line;
1492     if (fgets(entry, sizeof(entry), fh) == NULL)
1493       break;
1494     entry_len = strlen(entry);
1495
1496     /* check \n termination in case journal writing crashed mid-line */
1497     if (entry_len == 0)
1498       continue;
1499     else if (entry[entry_len - 1] != '\n')
1500     {
1501       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1502       ++fail_cnt;
1503       continue;
1504     }
1505
1506     entry[entry_len - 1] = '\0';
1507
1508     if (handle_request(NULL, entry, entry_len) == 0)
1509       ++entry_cnt;
1510     else
1511       ++fail_cnt;
1512   }
1513
1514   fclose(fh);
1515
1516   if (entry_cnt > 0)
1517   {
1518     RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1519              entry_cnt, fail_cnt);
1520     return 1;
1521   }
1522   else
1523     return 0;
1524
1525 } /* }}} static int journal_replay */
1526
1527 static void close_connection(listen_socket_t *sock)
1528 {
1529   close(sock->fd) ;  sock->fd   = -1;
1530   free(sock->rbuf);  sock->rbuf = NULL;
1531   free(sock->wbuf);  sock->wbuf = NULL;
1532
1533   free(sock);
1534 }
1535
1536 static void *connection_thread_main (void *args) /* {{{ */
1537 {
1538   pthread_t self;
1539   listen_socket_t *sock;
1540   int i;
1541   int fd;
1542
1543   sock = (listen_socket_t *) args;
1544   fd = sock->fd;
1545
1546   /* init read buffers */
1547   sock->next_read = sock->next_cmd = 0;
1548   sock->rbuf = malloc(RBUF_SIZE);
1549   if (sock->rbuf == NULL)
1550   {
1551     RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
1552     close_connection(sock);
1553     return NULL;
1554   }
1555
1556   pthread_mutex_lock (&connection_threads_lock);
1557   {
1558     pthread_t *temp;
1559
1560     temp = (pthread_t *) realloc (connection_threads,
1561         sizeof (pthread_t) * (connection_threads_num + 1));
1562     if (temp == NULL)
1563     {
1564       RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed.");
1565     }
1566     else
1567     {
1568       connection_threads = temp;
1569       connection_threads[connection_threads_num] = pthread_self ();
1570       connection_threads_num++;
1571     }
1572   }
1573   pthread_mutex_unlock (&connection_threads_lock);
1574
1575   while (do_shutdown == 0)
1576   {
1577     char *cmd;
1578     ssize_t cmd_len;
1579     ssize_t rbytes;
1580
1581     struct pollfd pollfd;
1582     int status;
1583
1584     pollfd.fd = fd;
1585     pollfd.events = POLLIN | POLLPRI;
1586     pollfd.revents = 0;
1587
1588     status = poll (&pollfd, 1, /* timeout = */ 500);
1589     if (do_shutdown)
1590       break;
1591     else if (status == 0) /* timeout */
1592       continue;
1593     else if (status < 0) /* error */
1594     {
1595       status = errno;
1596       if (status == EINTR)
1597         continue;
1598       RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
1599       continue;
1600     }
1601
1602     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
1603     {
1604       close_connection(sock);
1605       break;
1606     }
1607     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
1608     {
1609       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
1610           "poll(2) returned something unexpected: %#04hx",
1611           pollfd.revents);
1612       close_connection(sock);
1613       break;
1614     }
1615
1616     rbytes = read(fd, sock->rbuf + sock->next_read,
1617                   RBUF_SIZE - sock->next_read);
1618     if (rbytes < 0)
1619     {
1620       RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
1621       break;
1622     }
1623     else if (rbytes == 0)
1624       break; /* eof */
1625
1626     sock->next_read += rbytes;
1627
1628     while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
1629     {
1630       status = handle_request (sock, cmd, cmd_len+1);
1631       if (status != 0)
1632         goto out_close;
1633     }
1634   }
1635
1636 out_close:
1637   close_connection(sock);
1638
1639   self = pthread_self ();
1640   /* Remove this thread from the connection threads list */
1641   pthread_mutex_lock (&connection_threads_lock);
1642   /* Find out own index in the array */
1643   for (i = 0; i < connection_threads_num; i++)
1644     if (pthread_equal (connection_threads[i], self) != 0)
1645       break;
1646   assert (i < connection_threads_num);
1647
1648   /* Move the trailing threads forward. */
1649   if (i < (connection_threads_num - 1))
1650   {
1651     memmove (connection_threads + i,
1652         connection_threads + i + 1,
1653         sizeof (pthread_t) * (connection_threads_num - i - 1));
1654   }
1655
1656   connection_threads_num--;
1657   pthread_mutex_unlock (&connection_threads_lock);
1658
1659   return (NULL);
1660 } /* }}} void *connection_thread_main */
1661
1662 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
1663 {
1664   int fd;
1665   struct sockaddr_un sa;
1666   listen_socket_t *temp;
1667   int status;
1668   const char *path;
1669
1670   path = sock->addr;
1671   if (strncmp(path, "unix:", strlen("unix:")) == 0)
1672     path += strlen("unix:");
1673
1674   temp = (listen_socket_t *) realloc (listen_fds,
1675       sizeof (listen_fds[0]) * (listen_fds_num + 1));
1676   if (temp == NULL)
1677   {
1678     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: realloc failed.");
1679     return (-1);
1680   }
1681   listen_fds = temp;
1682   memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
1683
1684   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
1685   if (fd < 0)
1686   {
1687     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: socket(2) failed.");
1688     return (-1);
1689   }
1690
1691   memset (&sa, 0, sizeof (sa));
1692   sa.sun_family = AF_UNIX;
1693   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
1694
1695   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
1696   if (status != 0)
1697   {
1698     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: bind(2) failed.");
1699     close (fd);
1700     unlink (path);
1701     return (-1);
1702   }
1703
1704   status = listen (fd, /* backlog = */ 10);
1705   if (status != 0)
1706   {
1707     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: listen(2) failed.");
1708     close (fd);
1709     unlink (path);
1710     return (-1);
1711   }
1712
1713   listen_fds[listen_fds_num].fd = fd;
1714   listen_fds[listen_fds_num].family = PF_UNIX;
1715   strncpy(listen_fds[listen_fds_num].addr, path,
1716           sizeof (listen_fds[listen_fds_num].addr) - 1);
1717   listen_fds_num++;
1718
1719   return (0);
1720 } /* }}} int open_listen_socket_unix */
1721
1722 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
1723 {
1724   struct addrinfo ai_hints;
1725   struct addrinfo *ai_res;
1726   struct addrinfo *ai_ptr;
1727   char addr_copy[NI_MAXHOST];
1728   char *addr;
1729   char *port;
1730   int status;
1731
1732   strncpy (addr_copy, sock->addr, sizeof (addr_copy));
1733   addr_copy[sizeof (addr_copy) - 1] = 0;
1734   addr = addr_copy;
1735
1736   memset (&ai_hints, 0, sizeof (ai_hints));
1737   ai_hints.ai_flags = 0;
1738 #ifdef AI_ADDRCONFIG
1739   ai_hints.ai_flags |= AI_ADDRCONFIG;
1740 #endif
1741   ai_hints.ai_family = AF_UNSPEC;
1742   ai_hints.ai_socktype = SOCK_STREAM;
1743
1744   port = NULL;
1745   if (*addr == '[') /* IPv6+port format */
1746   {
1747     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
1748     addr++;
1749
1750     port = strchr (addr, ']');
1751     if (port == NULL)
1752     {
1753       RRDD_LOG (LOG_ERR, "open_listen_socket_network: Malformed address: %s",
1754           sock->addr);
1755       return (-1);
1756     }
1757     *port = 0;
1758     port++;
1759
1760     if (*port == ':')
1761       port++;
1762     else if (*port == 0)
1763       port = NULL;
1764     else
1765     {
1766       RRDD_LOG (LOG_ERR, "open_listen_socket_network: Garbage after address: %s",
1767           port);
1768       return (-1);
1769     }
1770   } /* if (*addr = ']') */
1771   else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */
1772   {
1773     port = rindex(addr, ':');
1774     if (port != NULL)
1775     {
1776       *port = 0;
1777       port++;
1778     }
1779   }
1780   ai_res = NULL;
1781   status = getaddrinfo (addr,
1782                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
1783                         &ai_hints, &ai_res);
1784   if (status != 0)
1785   {
1786     RRDD_LOG (LOG_ERR, "open_listen_socket_network: getaddrinfo(%s) failed: "
1787         "%s", addr, gai_strerror (status));
1788     return (-1);
1789   }
1790
1791   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
1792   {
1793     int fd;
1794     listen_socket_t *temp;
1795     int one = 1;
1796
1797     temp = (listen_socket_t *) realloc (listen_fds,
1798         sizeof (listen_fds[0]) * (listen_fds_num + 1));
1799     if (temp == NULL)
1800     {
1801       RRDD_LOG (LOG_ERR, "open_listen_socket_network: realloc failed.");
1802       continue;
1803     }
1804     listen_fds = temp;
1805     memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
1806
1807     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
1808     if (fd < 0)
1809     {
1810       RRDD_LOG (LOG_ERR, "open_listen_socket_network: socket(2) failed.");
1811       continue;
1812     }
1813
1814     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
1815
1816     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
1817     if (status != 0)
1818     {
1819       RRDD_LOG (LOG_ERR, "open_listen_socket_network: bind(2) failed.");
1820       close (fd);
1821       continue;
1822     }
1823
1824     status = listen (fd, /* backlog = */ 10);
1825     if (status != 0)
1826     {
1827       RRDD_LOG (LOG_ERR, "open_listen_socket_network: listen(2) failed.");
1828       close (fd);
1829       return (-1);
1830     }
1831
1832     listen_fds[listen_fds_num].fd = fd;
1833     listen_fds[listen_fds_num].family = ai_ptr->ai_family;
1834     listen_fds_num++;
1835   } /* for (ai_ptr) */
1836
1837   return (0);
1838 } /* }}} static int open_listen_socket_network */
1839
1840 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
1841 {
1842   assert(sock != NULL);
1843   assert(sock->addr != NULL);
1844
1845   if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
1846       || sock->addr[0] == '/')
1847     return (open_listen_socket_unix(sock));
1848   else
1849     return (open_listen_socket_network(sock));
1850 } /* }}} int open_listen_socket */
1851
1852 static int close_listen_sockets (void) /* {{{ */
1853 {
1854   size_t i;
1855
1856   for (i = 0; i < listen_fds_num; i++)
1857   {
1858     close (listen_fds[i].fd);
1859
1860     if (listen_fds[i].family == PF_UNIX)
1861       unlink(listen_fds[i].addr);
1862   }
1863
1864   free (listen_fds);
1865   listen_fds = NULL;
1866   listen_fds_num = 0;
1867
1868   return (0);
1869 } /* }}} int close_listen_sockets */
1870
1871 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
1872 {
1873   struct pollfd *pollfds;
1874   int pollfds_num;
1875   int status;
1876   int i;
1877
1878   for (i = 0; i < config_listen_address_list_len; i++)
1879     open_listen_socket (config_listen_address_list[i]);
1880
1881   if (config_listen_address_list_len < 1)
1882   {
1883     listen_socket_t sock;
1884     memset(&sock, 0, sizeof(sock));
1885     strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr));
1886     open_listen_socket (&sock);
1887   }
1888
1889   if (listen_fds_num < 1)
1890   {
1891     RRDD_LOG (LOG_ERR, "listen_thread_main: No listen sockets "
1892         "could be opened. Sorry.");
1893     return (NULL);
1894   }
1895
1896   pollfds_num = listen_fds_num;
1897   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
1898   if (pollfds == NULL)
1899   {
1900     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
1901     return (NULL);
1902   }
1903   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
1904
1905   RRDD_LOG(LOG_INFO, "listening for connections");
1906
1907   while (do_shutdown == 0)
1908   {
1909     assert (pollfds_num == ((int) listen_fds_num));
1910     for (i = 0; i < pollfds_num; i++)
1911     {
1912       pollfds[i].fd = listen_fds[i].fd;
1913       pollfds[i].events = POLLIN | POLLPRI;
1914       pollfds[i].revents = 0;
1915     }
1916
1917     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
1918     if (do_shutdown)
1919       break;
1920     else if (status == 0) /* timeout */
1921       continue;
1922     else if (status < 0) /* error */
1923     {
1924       status = errno;
1925       if (status != EINTR)
1926       {
1927         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
1928       }
1929       continue;
1930     }
1931
1932     for (i = 0; i < pollfds_num; i++)
1933     {
1934       listen_socket_t *client_sock;
1935       struct sockaddr_storage client_sa;
1936       socklen_t client_sa_size;
1937       pthread_t tid;
1938       pthread_attr_t attr;
1939
1940       if (pollfds[i].revents == 0)
1941         continue;
1942
1943       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
1944       {
1945         RRDD_LOG (LOG_ERR, "listen_thread_main: "
1946             "poll(2) returned something unexpected for listen FD #%i.",
1947             pollfds[i].fd);
1948         continue;
1949       }
1950
1951       client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
1952       if (client_sock == NULL)
1953       {
1954         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
1955         continue;
1956       }
1957       memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
1958
1959       client_sa_size = sizeof (client_sa);
1960       client_sock->fd = accept (pollfds[i].fd,
1961           (struct sockaddr *) &client_sa, &client_sa_size);
1962       if (client_sock->fd < 0)
1963       {
1964         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
1965         free(client_sock);
1966         continue;
1967       }
1968
1969       pthread_attr_init (&attr);
1970       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
1971
1972       status = pthread_create (&tid, &attr, connection_thread_main,
1973                                client_sock);
1974       if (status != 0)
1975       {
1976         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
1977         close_connection(client_sock);
1978         continue;
1979       }
1980     } /* for (pollfds_num) */
1981   } /* while (do_shutdown == 0) */
1982
1983   RRDD_LOG(LOG_INFO, "starting shutdown");
1984
1985   close_listen_sockets ();
1986
1987   pthread_mutex_lock (&connection_threads_lock);
1988   while (connection_threads_num > 0)
1989   {
1990     pthread_t wait_for;
1991
1992     wait_for = connection_threads[0];
1993
1994     pthread_mutex_unlock (&connection_threads_lock);
1995     pthread_join (wait_for, /* retval = */ NULL);
1996     pthread_mutex_lock (&connection_threads_lock);
1997   }
1998   pthread_mutex_unlock (&connection_threads_lock);
1999
2000   return (NULL);
2001 } /* }}} void *listen_thread_main */
2002
2003 static int daemonize (void) /* {{{ */
2004 {
2005   int status;
2006   int fd;
2007   char *base_dir;
2008
2009   fd = open_pidfile();
2010   if (fd < 0) return fd;
2011
2012   if (!stay_foreground)
2013   {
2014     pid_t child;
2015
2016     child = fork ();
2017     if (child < 0)
2018     {
2019       fprintf (stderr, "daemonize: fork(2) failed.\n");
2020       return (-1);
2021     }
2022     else if (child > 0)
2023     {
2024       return (1);
2025     }
2026
2027     /* Become session leader */
2028     setsid ();
2029
2030     /* Open the first three file descriptors to /dev/null */
2031     close (2);
2032     close (1);
2033     close (0);
2034
2035     open ("/dev/null", O_RDWR);
2036     dup (0);
2037     dup (0);
2038   } /* if (!stay_foreground) */
2039
2040   /* Change into the /tmp directory. */
2041   base_dir = (config_base_dir != NULL)
2042     ? config_base_dir
2043     : "/tmp";
2044   status = chdir (base_dir);
2045   if (status != 0)
2046   {
2047     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2048     return (-1);
2049   }
2050
2051   install_signal_handlers();
2052
2053   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2054   RRDD_LOG(LOG_INFO, "starting up");
2055
2056   cache_tree = g_tree_new ((GCompareFunc) strcmp);
2057   if (cache_tree == NULL)
2058   {
2059     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2060     return (-1);
2061   }
2062
2063   status = write_pidfile (fd);
2064   return status;
2065 } /* }}} int daemonize */
2066
2067 static int cleanup (void) /* {{{ */
2068 {
2069   do_shutdown++;
2070
2071   pthread_cond_signal (&cache_cond);
2072   pthread_join (queue_thread, /* return = */ NULL);
2073
2074   remove_pidfile ();
2075
2076   RRDD_LOG(LOG_INFO, "goodbye");
2077   closelog ();
2078
2079   return (0);
2080 } /* }}} int cleanup */
2081
2082 static int read_options (int argc, char **argv) /* {{{ */
2083 {
2084   int option;
2085   int status = 0;
2086
2087   while ((option = getopt(argc, argv, "gl:L:f:w:b:Bz:p:j:h?F")) != -1)
2088   {
2089     switch (option)
2090     {
2091       case 'g':
2092         stay_foreground=1;
2093         break;
2094
2095       case 'L':
2096       case 'l':
2097       {
2098         listen_socket_t **temp;
2099         listen_socket_t *new;
2100
2101         new = malloc(sizeof(listen_socket_t));
2102         if (new == NULL)
2103         {
2104           fprintf(stderr, "read_options: malloc failed.\n");
2105           return(2);
2106         }
2107         memset(new, 0, sizeof(listen_socket_t));
2108
2109         temp = (listen_socket_t **) realloc (config_listen_address_list,
2110             sizeof (listen_socket_t *) * (config_listen_address_list_len + 1));
2111         if (temp == NULL)
2112         {
2113           fprintf (stderr, "read_options: realloc failed.\n");
2114           return (2);
2115         }
2116         config_listen_address_list = temp;
2117
2118         strncpy(new->addr, optarg, sizeof(new->addr)-1);
2119         new->privilege = (option == 'l') ? PRIV_HIGH : PRIV_LOW;
2120
2121         temp[config_listen_address_list_len] = new;
2122         config_listen_address_list_len++;
2123       }
2124       break;
2125
2126       case 'f':
2127       {
2128         int temp;
2129
2130         temp = atoi (optarg);
2131         if (temp > 0)
2132           config_flush_interval = temp;
2133         else
2134         {
2135           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2136           status = 3;
2137         }
2138       }
2139       break;
2140
2141       case 'w':
2142       {
2143         int temp;
2144
2145         temp = atoi (optarg);
2146         if (temp > 0)
2147           config_write_interval = temp;
2148         else
2149         {
2150           fprintf (stderr, "Invalid write interval: %s\n", optarg);
2151           status = 2;
2152         }
2153       }
2154       break;
2155
2156       case 'z':
2157       {
2158         int temp;
2159
2160         temp = atoi(optarg);
2161         if (temp > 0)
2162           config_write_jitter = temp;
2163         else
2164         {
2165           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2166           status = 2;
2167         }
2168
2169         break;
2170       }
2171
2172       case 'B':
2173         config_write_base_only = 1;
2174         break;
2175
2176       case 'b':
2177       {
2178         size_t len;
2179
2180         if (config_base_dir != NULL)
2181           free (config_base_dir);
2182         config_base_dir = strdup (optarg);
2183         if (config_base_dir == NULL)
2184         {
2185           fprintf (stderr, "read_options: strdup failed.\n");
2186           return (3);
2187         }
2188
2189         len = strlen (config_base_dir);
2190         while ((len > 0) && (config_base_dir[len - 1] == '/'))
2191         {
2192           config_base_dir[len - 1] = 0;
2193           len--;
2194         }
2195
2196         if (len < 1)
2197         {
2198           fprintf (stderr, "Invalid base directory: %s\n", optarg);
2199           return (4);
2200         }
2201
2202         _config_base_dir_len = len;
2203       }
2204       break;
2205
2206       case 'p':
2207       {
2208         if (config_pid_file != NULL)
2209           free (config_pid_file);
2210         config_pid_file = strdup (optarg);
2211         if (config_pid_file == NULL)
2212         {
2213           fprintf (stderr, "read_options: strdup failed.\n");
2214           return (3);
2215         }
2216       }
2217       break;
2218
2219       case 'F':
2220         config_flush_at_shutdown = 1;
2221         break;
2222
2223       case 'j':
2224       {
2225         struct stat statbuf;
2226         const char *dir = optarg;
2227
2228         status = stat(dir, &statbuf);
2229         if (status != 0)
2230         {
2231           fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2232           return 6;
2233         }
2234
2235         if (!S_ISDIR(statbuf.st_mode)
2236             || access(dir, R_OK|W_OK|X_OK) != 0)
2237         {
2238           fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2239                   errno ? rrd_strerror(errno) : "");
2240           return 6;
2241         }
2242
2243         journal_cur = malloc(PATH_MAX + 1);
2244         journal_old = malloc(PATH_MAX + 1);
2245         if (journal_cur == NULL || journal_old == NULL)
2246         {
2247           fprintf(stderr, "malloc failure for journal files\n");
2248           return 6;
2249         }
2250         else 
2251         {
2252           snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2253           snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2254         }
2255       }
2256       break;
2257
2258       case 'h':
2259       case '?':
2260         printf ("RRDCacheD %s  Copyright (C) 2008 Florian octo Forster\n"
2261             "\n"
2262             "Usage: rrdcached [options]\n"
2263             "\n"
2264             "Valid options are:\n"
2265             "  -l <address>  Socket address to listen to.\n"
2266             "  -L <address>  Socket address to listen to ('FLUSH' only).\n"
2267             "  -w <seconds>  Interval in which to write data.\n"
2268             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
2269             "  -f <seconds>  Interval in which to flush dead data.\n"
2270             "  -p <file>     Location of the PID-file.\n"
2271             "  -b <dir>      Base directory to change to.\n"
2272             "  -B            Restrict file access to paths within -b <dir>\n"
2273             "  -g            Do not fork and run in the foreground.\n"
2274             "  -j <dir>      Directory in which to create the journal files.\n"
2275             "  -F            Always flush all updates at shutdown\n"
2276             "\n"
2277             "For more information and a detailed description of all options "
2278             "please refer\n"
2279             "to the rrdcached(1) manual page.\n",
2280             VERSION);
2281         status = -1;
2282         break;
2283     } /* switch (option) */
2284   } /* while (getopt) */
2285
2286   /* advise the user when values are not sane */
2287   if (config_flush_interval < 2 * config_write_interval)
2288     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2289             " 2x write interval (-w) !\n");
2290   if (config_write_jitter > config_write_interval)
2291     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2292             " write interval (-w) !\n");
2293
2294   if (config_write_base_only && config_base_dir == NULL)
2295     fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
2296             "  Consult the rrdcached documentation\n");
2297
2298   if (journal_cur == NULL)
2299     config_flush_at_shutdown = 1;
2300
2301   return (status);
2302 } /* }}} int read_options */
2303
2304 int main (int argc, char **argv)
2305 {
2306   int status;
2307
2308   status = read_options (argc, argv);
2309   if (status != 0)
2310   {
2311     if (status < 0)
2312       status = 0;
2313     return (status);
2314   }
2315
2316   status = daemonize ();
2317   if (status == 1)
2318   {
2319     struct sigaction sigchld;
2320
2321     memset (&sigchld, 0, sizeof (sigchld));
2322     sigchld.sa_handler = SIG_IGN;
2323     sigaction (SIGCHLD, &sigchld, NULL);
2324
2325     return (0);
2326   }
2327   else if (status != 0)
2328   {
2329     fprintf (stderr, "daemonize failed, exiting.\n");
2330     return (1);
2331   }
2332
2333   if (journal_cur != NULL)
2334   {
2335     int had_journal = 0;
2336
2337     pthread_mutex_lock(&journal_lock);
2338
2339     RRDD_LOG(LOG_INFO, "checking for journal files");
2340
2341     had_journal += journal_replay(journal_old);
2342     had_journal += journal_replay(journal_cur);
2343
2344     if (had_journal)
2345       flush_old_values(-1);
2346
2347     pthread_mutex_unlock(&journal_lock);
2348     journal_rotate();
2349
2350     RRDD_LOG(LOG_INFO, "journal processing complete");
2351   }
2352
2353   /* start the queue thread */
2354   memset (&queue_thread, 0, sizeof (queue_thread));
2355   status = pthread_create (&queue_thread,
2356                            NULL, /* attr */
2357                            queue_thread_main,
2358                            NULL); /* args */
2359   if (status != 0)
2360   {
2361     RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2362     cleanup();
2363     return (1);
2364   }
2365
2366   listen_thread_main (NULL);
2367   cleanup ();
2368
2369   return (0);
2370 } /* int main */
2371
2372 /*
2373  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2374  */