This patch introduces "BATCH" mode.
[rrdtool.git] / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008 Florian octo Forster
4  * Copyright (C) 2008 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
23
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
32
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
36
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
41
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
47
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
51
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
55
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
61
62 /*
63  * Now for some includes..
64  */
65 #include "rrd.h" /* {{{ */
66 #include "rrd_client.h"
67
68 #include <stdlib.h>
69 #include <stdint.h>
70 #include <stdio.h>
71 #include <unistd.h>
72 #include <string.h>
73 #include <strings.h>
74 #include <stdint.h>
75 #include <inttypes.h>
76
77 #include <sys/types.h>
78 #include <sys/stat.h>
79 #include <fcntl.h>
80 #include <signal.h>
81 #include <sys/socket.h>
82 #include <sys/un.h>
83 #include <netdb.h>
84 #include <poll.h>
85 #include <syslog.h>
86 #include <pthread.h>
87 #include <errno.h>
88 #include <assert.h>
89 #include <sys/time.h>
90 #include <time.h>
91
92 #include <glib-2.0/glib.h>
93 /* }}} */
94
95 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
96
97 #ifndef __GNUC__
98 # define __attribute__(x) /**/
99 #endif
100
101 /*
102  * Types
103  */
104 typedef enum
105 {
106   PRIV_LOW,
107   PRIV_HIGH
108 } socket_privilege;
109
110 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
111
112 struct listen_socket_s
113 {
114   int fd;
115   char addr[PATH_MAX + 1];
116   int family;
117   socket_privilege privilege;
118
119   /* state for BATCH processing */
120   int batch_mode;
121   int batch_cmd;
122
123   /* buffered IO */
124   char *rbuf;
125   off_t next_cmd;
126   off_t next_read;
127
128   char *wbuf;
129   ssize_t wbuf_len;
130 };
131 typedef struct listen_socket_s listen_socket_t;
132
133 struct cache_item_s;
134 typedef struct cache_item_s cache_item_t;
135 struct cache_item_s
136 {
137   char *file;
138   char **values;
139   int values_num;
140   time_t last_flush_time;
141 #define CI_FLAGS_IN_TREE  (1<<0)
142 #define CI_FLAGS_IN_QUEUE (1<<1)
143   int flags;
144   pthread_cond_t  flushed;
145   cache_item_t *prev;
146   cache_item_t *next;
147 };
148
149 struct callback_flush_data_s
150 {
151   time_t now;
152   time_t abs_timeout;
153   char **keys;
154   size_t keys_num;
155 };
156 typedef struct callback_flush_data_s callback_flush_data_t;
157
158 enum queue_side_e
159 {
160   HEAD,
161   TAIL
162 };
163 typedef enum queue_side_e queue_side_t;
164
165 /* max length of socket command or response */
166 #define CMD_MAX 4096
167 #define RBUF_SIZE (CMD_MAX*2)
168
169 /*
170  * Variables
171  */
172 static int stay_foreground = 0;
173
174 static listen_socket_t *listen_fds = NULL;
175 static size_t listen_fds_num = 0;
176
177 static int do_shutdown = 0;
178
179 static pthread_t queue_thread;
180
181 static pthread_t *connection_threads = NULL;
182 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
183 static int connection_threads_num = 0;
184
185 /* Cache stuff */
186 static GTree          *cache_tree = NULL;
187 static cache_item_t   *cache_queue_head = NULL;
188 static cache_item_t   *cache_queue_tail = NULL;
189 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
190 static pthread_cond_t  cache_cond = PTHREAD_COND_INITIALIZER;
191
192 static int config_write_interval = 300;
193 static int config_write_jitter   = 0;
194 static int config_flush_interval = 3600;
195 static int config_flush_at_shutdown = 0;
196 static char *config_pid_file = NULL;
197 static char *config_base_dir = NULL;
198 static size_t _config_base_dir_len = 0;
199 static int config_write_base_only = 0;
200
201 static listen_socket_t **config_listen_address_list = NULL;
202 static int config_listen_address_list_len = 0;
203
204 static uint64_t stats_queue_length = 0;
205 static uint64_t stats_updates_received = 0;
206 static uint64_t stats_flush_received = 0;
207 static uint64_t stats_updates_written = 0;
208 static uint64_t stats_data_sets_written = 0;
209 static uint64_t stats_journal_bytes = 0;
210 static uint64_t stats_journal_rotate = 0;
211 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
212
213 /* Journaled updates */
214 static char *journal_cur = NULL;
215 static char *journal_old = NULL;
216 static FILE *journal_fh = NULL;
217 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
218 static int journal_write(char *cmd, char *args);
219 static void journal_done(void);
220 static void journal_rotate(void);
221
222 /* 
223  * Functions
224  */
225 static void sig_common (const char *sig) /* {{{ */
226 {
227   RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
228   do_shutdown++;
229   pthread_cond_broadcast(&cache_cond);
230 } /* }}} void sig_common */
231
232 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
233 {
234   sig_common("INT");
235 } /* }}} void sig_int_handler */
236
237 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
238 {
239   sig_common("TERM");
240 } /* }}} void sig_term_handler */
241
242 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
243 {
244   config_flush_at_shutdown = 1;
245   sig_common("USR1");
246 } /* }}} void sig_usr1_handler */
247
248 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
249 {
250   config_flush_at_shutdown = 0;
251   sig_common("USR2");
252 } /* }}} void sig_usr2_handler */
253
254 static void install_signal_handlers(void) /* {{{ */
255 {
256   /* These structures are static, because `sigaction' behaves weird if the are
257    * overwritten.. */
258   static struct sigaction sa_int;
259   static struct sigaction sa_term;
260   static struct sigaction sa_pipe;
261   static struct sigaction sa_usr1;
262   static struct sigaction sa_usr2;
263
264   /* Install signal handlers */
265   memset (&sa_int, 0, sizeof (sa_int));
266   sa_int.sa_handler = sig_int_handler;
267   sigaction (SIGINT, &sa_int, NULL);
268
269   memset (&sa_term, 0, sizeof (sa_term));
270   sa_term.sa_handler = sig_term_handler;
271   sigaction (SIGTERM, &sa_term, NULL);
272
273   memset (&sa_pipe, 0, sizeof (sa_pipe));
274   sa_pipe.sa_handler = SIG_IGN;
275   sigaction (SIGPIPE, &sa_pipe, NULL);
276
277   memset (&sa_pipe, 0, sizeof (sa_usr1));
278   sa_usr1.sa_handler = sig_usr1_handler;
279   sigaction (SIGUSR1, &sa_usr1, NULL);
280
281   memset (&sa_usr2, 0, sizeof (sa_usr2));
282   sa_usr2.sa_handler = sig_usr2_handler;
283   sigaction (SIGUSR2, &sa_usr2, NULL);
284
285 } /* }}} void install_signal_handlers */
286
287 static int open_pidfile(void) /* {{{ */
288 {
289   int fd;
290   char *file;
291
292   file = (config_pid_file != NULL)
293     ? config_pid_file
294     : LOCALSTATEDIR "/run/rrdcached.pid";
295
296   fd = open(file, O_CREAT|O_EXCL|O_WRONLY, S_IRUSR|S_IRGRP|S_IROTH);
297   if (fd < 0)
298     fprintf(stderr, "FATAL: cannot create '%s' (%s)\n",
299             file, rrd_strerror(errno));
300
301   return(fd);
302 } /* }}} static int open_pidfile */
303
304 static int write_pidfile (int fd) /* {{{ */
305 {
306   pid_t pid;
307   FILE *fh;
308
309   pid = getpid ();
310
311   fh = fdopen (fd, "w");
312   if (fh == NULL)
313   {
314     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
315     close(fd);
316     return (-1);
317   }
318
319   fprintf (fh, "%i\n", (int) pid);
320   fclose (fh);
321
322   return (0);
323 } /* }}} int write_pidfile */
324
325 static int remove_pidfile (void) /* {{{ */
326 {
327   char *file;
328   int status;
329
330   file = (config_pid_file != NULL)
331     ? config_pid_file
332     : LOCALSTATEDIR "/run/rrdcached.pid";
333
334   status = unlink (file);
335   if (status == 0)
336     return (0);
337   return (errno);
338 } /* }}} int remove_pidfile */
339
340 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
341 {
342   char *eol;
343
344   eol = memchr(sock->rbuf + sock->next_cmd, '\n',
345                sock->next_read - sock->next_cmd);
346
347   if (eol == NULL)
348   {
349     /* no commands left, move remainder back to front of rbuf */
350     memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
351             sock->next_read - sock->next_cmd);
352     sock->next_read -= sock->next_cmd;
353     sock->next_cmd = 0;
354     *len = 0;
355     return NULL;
356   }
357   else
358   {
359     char *cmd = sock->rbuf + sock->next_cmd;
360     *eol = '\0';
361
362     sock->next_cmd = eol - sock->rbuf + 1;
363
364     if (eol > sock->rbuf && *(eol-1) == '\r')
365       *(--eol) = '\0'; /* handle "\r\n" EOL */
366
367     *len = eol - cmd;
368
369     return cmd;
370   }
371
372   /* NOTREACHED */
373   assert(1==0);
374 }
375
376 /* add the characters directly to the write buffer */
377 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
378 {
379   char *new_buf;
380
381   assert(sock != NULL);
382
383   new_buf = realloc(sock->wbuf, sock->wbuf_len + len + 1);
384   if (new_buf == NULL)
385   {
386     RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
387     return -1;
388   }
389
390   strncpy(new_buf + sock->wbuf_len, str, len + 1);
391
392   sock->wbuf = new_buf;
393   sock->wbuf_len += len;
394
395   return 0;
396 } /* }}} static int add_to_wbuf */
397
398 /* add the text to the "extra" info that's sent after the status line */
399 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
400 {
401   va_list argp;
402   char buffer[CMD_MAX];
403   int len;
404
405   if (sock == NULL) return 0; /* journal replay mode */
406   if (sock->batch_mode) return 0; /* no extra info returned when in BATCH */
407
408   va_start(argp, fmt);
409 #ifdef HAVE_VSNPRINTF
410   len = vsnprintf(buffer, sizeof(buffer)-1, fmt, argp);
411 #else
412   len = vsprintf(buffer, fmt, argp);
413 #endif
414   va_end(argp);
415   if (len < 0)
416   {
417     RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
418     return -1;
419   }
420
421   return add_to_wbuf(sock, buffer, len);
422 } /* }}} static int add_response_info */
423
424 static int count_lines(char *str) /* {{{ */
425 {
426   int lines = 0;
427
428   if (str != NULL)
429   {
430     while ((str = strchr(str, '\n')) != NULL)
431     {
432       ++lines;
433       ++str;
434     }
435   }
436
437   return lines;
438 } /* }}} static int count_lines */
439
440 /* send the response back to the user.
441  * returns 0 on success, -1 on error
442  * write buffer is always zeroed after this call */
443 static int send_response (listen_socket_t *sock, response_code rc,
444                           char *fmt, ...) /* {{{ */
445 {
446   va_list argp;
447   char buffer[CMD_MAX];
448   int lines;
449   ssize_t wrote;
450   int rclen, len;
451
452   if (sock == NULL) return rc;  /* journal replay mode */
453
454   if (sock->batch_mode)
455   {
456     if (rc == RESP_OK)
457       return rc; /* no response on success during BATCH */
458     lines = sock->batch_cmd;
459   }
460   else if (rc == RESP_OK)
461     lines = count_lines(sock->wbuf);
462   else
463     lines = -1;
464
465   rclen = sprintf(buffer, "%d ", lines);
466   va_start(argp, fmt);
467 #ifdef HAVE_VSNPRINTF
468   len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen-1, fmt, argp);
469 #else
470   len = vsprintf(buffer+rclen, fmt, argp);
471 #endif
472   va_end(argp);
473   if (len < 0)
474     return -1;
475
476   len += rclen;
477
478   /* append the result to the wbuf, don't write to the user */
479   if (sock->batch_mode)
480     return add_to_wbuf(sock, buffer, len);
481
482   /* first write must be complete */
483   if (len != write(sock->fd, buffer, len))
484   {
485     RRDD_LOG(LOG_INFO, "send_response: could not write status message");
486     return -1;
487   }
488
489   if (sock->wbuf != NULL)
490   {
491     wrote = 0;
492     while (wrote < sock->wbuf_len)
493     {
494       ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
495       if (wb <= 0)
496       {
497         RRDD_LOG(LOG_INFO, "send_response: could not write results");
498         return -1;
499       }
500       wrote += wb;
501     }
502   }
503
504   free(sock->wbuf); sock->wbuf = NULL;
505   sock->wbuf_len = 0;
506
507   return 0;
508 } /* }}} */
509
510 static void wipe_ci_values(cache_item_t *ci, time_t when)
511 {
512   ci->values = NULL;
513   ci->values_num = 0;
514
515   ci->last_flush_time = when;
516   if (config_write_jitter > 0)
517     ci->last_flush_time += (random() % config_write_jitter);
518 }
519
520 /* remove_from_queue
521  * remove a "cache_item_t" item from the queue.
522  * must hold 'cache_lock' when calling this
523  */
524 static void remove_from_queue(cache_item_t *ci) /* {{{ */
525 {
526   if (ci == NULL) return;
527
528   if (ci->prev == NULL)
529     cache_queue_head = ci->next; /* reset head */
530   else
531     ci->prev->next = ci->next;
532
533   if (ci->next == NULL)
534     cache_queue_tail = ci->prev; /* reset the tail */
535   else
536     ci->next->prev = ci->prev;
537
538   ci->next = ci->prev = NULL;
539   ci->flags &= ~CI_FLAGS_IN_QUEUE;
540 } /* }}} static void remove_from_queue */
541
542 /*
543  * enqueue_cache_item:
544  * `cache_lock' must be acquired before calling this function!
545  */
546 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
547     queue_side_t side)
548 {
549   if (ci == NULL)
550     return (-1);
551
552   if (ci->values_num == 0)
553     return (0);
554
555   if (side == HEAD)
556   {
557     if (cache_queue_head == ci)
558       return 0;
559
560     /* remove from the double linked list */
561     if (ci->flags & CI_FLAGS_IN_QUEUE)
562       remove_from_queue(ci);
563
564     ci->prev = NULL;
565     ci->next = cache_queue_head;
566     if (ci->next != NULL)
567       ci->next->prev = ci;
568     cache_queue_head = ci;
569
570     if (cache_queue_tail == NULL)
571       cache_queue_tail = cache_queue_head;
572   }
573   else /* (side == TAIL) */
574   {
575     /* We don't move values back in the list.. */
576     if (ci->flags & CI_FLAGS_IN_QUEUE)
577       return (0);
578
579     assert (ci->next == NULL);
580     assert (ci->prev == NULL);
581
582     ci->prev = cache_queue_tail;
583
584     if (cache_queue_tail == NULL)
585       cache_queue_head = ci;
586     else
587       cache_queue_tail->next = ci;
588
589     cache_queue_tail = ci;
590   }
591
592   ci->flags |= CI_FLAGS_IN_QUEUE;
593
594   pthread_cond_broadcast(&cache_cond);
595   pthread_mutex_lock (&stats_lock);
596   stats_queue_length++;
597   pthread_mutex_unlock (&stats_lock);
598
599   return (0);
600 } /* }}} int enqueue_cache_item */
601
602 /*
603  * tree_callback_flush:
604  * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
605  * while this is in progress.
606  */
607 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
608     gpointer data)
609 {
610   cache_item_t *ci;
611   callback_flush_data_t *cfd;
612
613   ci = (cache_item_t *) value;
614   cfd = (callback_flush_data_t *) data;
615
616   if ((ci->last_flush_time <= cfd->abs_timeout)
617       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
618       && (ci->values_num > 0))
619   {
620     enqueue_cache_item (ci, TAIL);
621   }
622   else if ((do_shutdown != 0)
623       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
624       && (ci->values_num > 0))
625   {
626     enqueue_cache_item (ci, TAIL);
627   }
628   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
629       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
630       && (ci->values_num <= 0))
631   {
632     char **temp;
633
634     temp = (char **) realloc (cfd->keys,
635         sizeof (char *) * (cfd->keys_num + 1));
636     if (temp == NULL)
637     {
638       RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed.");
639       return (FALSE);
640     }
641     cfd->keys = temp;
642     /* Make really sure this points to the _same_ place */
643     assert ((char *) key == ci->file);
644     cfd->keys[cfd->keys_num] = (char *) key;
645     cfd->keys_num++;
646   }
647
648   return (FALSE);
649 } /* }}} gboolean tree_callback_flush */
650
651 static int flush_old_values (int max_age)
652 {
653   callback_flush_data_t cfd;
654   size_t k;
655
656   memset (&cfd, 0, sizeof (cfd));
657   /* Pass the current time as user data so that we don't need to call
658    * `time' for each node. */
659   cfd.now = time (NULL);
660   cfd.keys = NULL;
661   cfd.keys_num = 0;
662
663   if (max_age > 0)
664     cfd.abs_timeout = cfd.now - max_age;
665   else
666     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
667
668   /* `tree_callback_flush' will return the keys of all values that haven't
669    * been touched in the last `config_flush_interval' seconds in `cfd'.
670    * The char*'s in this array point to the same memory as ci->file, so we
671    * don't need to free them separately. */
672   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
673
674   for (k = 0; k < cfd.keys_num; k++)
675   {
676     cache_item_t *ci;
677
678     /* This must not fail. */
679     ci = (cache_item_t *) g_tree_lookup (cache_tree, cfd.keys[k]);
680     assert (ci != NULL);
681
682     /* If we end up here with values available, something's seriously
683      * messed up. */
684     assert (ci->values_num == 0);
685
686     /* Remove the node from the tree */
687     g_tree_remove (cache_tree, cfd.keys[k]);
688     cfd.keys[k] = NULL;
689
690     /* Now free and clean up `ci'. */
691     free (ci->file);
692     ci->file = NULL;
693     free (ci);
694     ci = NULL;
695   } /* for (k = 0; k < cfd.keys_num; k++) */
696
697   if (cfd.keys != NULL)
698   {
699     free (cfd.keys);
700     cfd.keys = NULL;
701   }
702
703   return (0);
704 } /* int flush_old_values */
705
706 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
707 {
708   struct timeval now;
709   struct timespec next_flush;
710   int final_flush = 0; /* make sure we only flush once on shutdown */
711
712   gettimeofday (&now, NULL);
713   next_flush.tv_sec = now.tv_sec + config_flush_interval;
714   next_flush.tv_nsec = 1000 * now.tv_usec;
715
716   pthread_mutex_lock (&cache_lock);
717   while ((do_shutdown == 0) || (cache_queue_head != NULL))
718   {
719     cache_item_t *ci;
720     char *file;
721     char **values;
722     int values_num;
723     int status;
724     int i;
725
726     /* First, check if it's time to do the cache flush. */
727     gettimeofday (&now, NULL);
728     if ((now.tv_sec > next_flush.tv_sec)
729         || ((now.tv_sec == next_flush.tv_sec)
730           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
731     {
732       /* Flush all values that haven't been written in the last
733        * `config_write_interval' seconds. */
734       flush_old_values (config_write_interval);
735
736       /* Determine the time of the next cache flush. */
737       while (next_flush.tv_sec <= now.tv_sec)
738         next_flush.tv_sec += config_flush_interval;
739
740       /* unlock the cache while we rotate so we don't block incoming
741        * updates if the fsync() blocks on disk I/O */
742       pthread_mutex_unlock(&cache_lock);
743       journal_rotate();
744       pthread_mutex_lock(&cache_lock);
745     }
746
747     /* Now, check if there's something to store away. If not, wait until
748      * something comes in or it's time to do the cache flush.  if we are
749      * shutting down, do not wait around.  */
750     if (cache_queue_head == NULL && !do_shutdown)
751     {
752       status = pthread_cond_timedwait (&cache_cond, &cache_lock, &next_flush);
753       if ((status != 0) && (status != ETIMEDOUT))
754       {
755         RRDD_LOG (LOG_ERR, "queue_thread_main: "
756             "pthread_cond_timedwait returned %i.", status);
757       }
758     }
759
760     /* We're about to shut down */
761     if (do_shutdown != 0 && !final_flush++)
762     {
763       if (config_flush_at_shutdown)
764         flush_old_values (-1); /* flush everything */
765       else
766         break;
767     }
768
769     /* Check if a value has arrived. This may be NULL if we timed out or there
770      * was an interrupt such as a signal. */
771     if (cache_queue_head == NULL)
772       continue;
773
774     ci = cache_queue_head;
775
776     /* copy the relevant parts */
777     file = strdup (ci->file);
778     if (file == NULL)
779     {
780       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
781       continue;
782     }
783
784     assert(ci->values != NULL);
785     assert(ci->values_num > 0);
786
787     values = ci->values;
788     values_num = ci->values_num;
789
790     wipe_ci_values(ci, time(NULL));
791     remove_from_queue(ci);
792
793     pthread_mutex_lock (&stats_lock);
794     assert (stats_queue_length > 0);
795     stats_queue_length--;
796     pthread_mutex_unlock (&stats_lock);
797
798     pthread_mutex_unlock (&cache_lock);
799
800     rrd_clear_error ();
801     status = rrd_update_r (file, NULL, values_num, (void *) values);
802     if (status != 0)
803     {
804       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
805           "rrd_update_r (%s) failed with status %i. (%s)",
806           file, status, rrd_get_error());
807     }
808
809     journal_write("wrote", file);
810     pthread_cond_broadcast(&ci->flushed);
811
812     for (i = 0; i < values_num; i++)
813       free (values[i]);
814
815     free(values);
816     free(file);
817
818     if (status == 0)
819     {
820       pthread_mutex_lock (&stats_lock);
821       stats_updates_written++;
822       stats_data_sets_written += values_num;
823       pthread_mutex_unlock (&stats_lock);
824     }
825
826     pthread_mutex_lock (&cache_lock);
827
828     /* We're about to shut down */
829     if (do_shutdown != 0 && !final_flush++)
830     {
831       if (config_flush_at_shutdown)
832           flush_old_values (-1); /* flush everything */
833       else
834         break;
835     }
836   } /* while ((do_shutdown == 0) || (cache_queue_head != NULL)) */
837   pthread_mutex_unlock (&cache_lock);
838
839   if (config_flush_at_shutdown)
840   {
841     assert(cache_queue_head == NULL);
842     RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
843   }
844
845   journal_done();
846
847   return (NULL);
848 } /* }}} void *queue_thread_main */
849
850 static int buffer_get_field (char **buffer_ret, /* {{{ */
851     size_t *buffer_size_ret, char **field_ret)
852 {
853   char *buffer;
854   size_t buffer_pos;
855   size_t buffer_size;
856   char *field;
857   size_t field_size;
858   int status;
859
860   buffer = *buffer_ret;
861   buffer_pos = 0;
862   buffer_size = *buffer_size_ret;
863   field = *buffer_ret;
864   field_size = 0;
865
866   if (buffer_size <= 0)
867     return (-1);
868
869   /* This is ensured by `handle_request'. */
870   assert (buffer[buffer_size - 1] == '\0');
871
872   status = -1;
873   while (buffer_pos < buffer_size)
874   {
875     /* Check for end-of-field or end-of-buffer */
876     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
877     {
878       field[field_size] = 0;
879       field_size++;
880       buffer_pos++;
881       status = 0;
882       break;
883     }
884     /* Handle escaped characters. */
885     else if (buffer[buffer_pos] == '\\')
886     {
887       if (buffer_pos >= (buffer_size - 1))
888         break;
889       buffer_pos++;
890       field[field_size] = buffer[buffer_pos];
891       field_size++;
892       buffer_pos++;
893     }
894     /* Normal operation */ 
895     else
896     {
897       field[field_size] = buffer[buffer_pos];
898       field_size++;
899       buffer_pos++;
900     }
901   } /* while (buffer_pos < buffer_size) */
902
903   if (status != 0)
904     return (status);
905
906   *buffer_ret = buffer + buffer_pos;
907   *buffer_size_ret = buffer_size - buffer_pos;
908   *field_ret = field;
909
910   return (0);
911 } /* }}} int buffer_get_field */
912
913 /* if we're restricting writes to the base directory,
914  * check whether the file falls within the dir
915  * returns 1 if OK, otherwise 0
916  */
917 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
918 {
919   assert(file != NULL);
920
921   if (!config_write_base_only
922       || sock == NULL /* journal replay */
923       || config_base_dir == NULL)
924     return 1;
925
926   if (strstr(file, "../") != NULL) goto err;
927
928   /* relative paths without "../" are ok */
929   if (*file != '/') return 1;
930
931   /* file must be of the format base + "/" + <1+ char filename> */
932   if (strlen(file) < _config_base_dir_len + 2) goto err;
933   if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
934   if (*(file + _config_base_dir_len) != '/') goto err;
935
936   return 1;
937
938 err:
939   if (sock != NULL && sock->fd >= 0)
940     send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
941
942   return 0;
943 } /* }}} static int check_file_access */
944
945 static int flush_file (const char *filename) /* {{{ */
946 {
947   cache_item_t *ci;
948
949   pthread_mutex_lock (&cache_lock);
950
951   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
952   if (ci == NULL)
953   {
954     pthread_mutex_unlock (&cache_lock);
955     return (ENOENT);
956   }
957
958   if (ci->values_num > 0)
959   {
960     /* Enqueue at head */
961     enqueue_cache_item (ci, HEAD);
962     pthread_cond_wait(&ci->flushed, &cache_lock);
963   }
964
965   pthread_mutex_unlock(&cache_lock);
966
967   return (0);
968 } /* }}} int flush_file */
969
970 static int handle_request_help (listen_socket_t *sock, /* {{{ */
971     char *buffer, size_t buffer_size)
972 {
973   int status;
974   char **help_text;
975   char *command;
976
977   char *help_help[2] =
978   {
979     "Command overview\n"
980     ,
981     "FLUSH <filename>\n"
982     "FLUSHALL\n"
983     "HELP [<command>]\n"
984     "UPDATE <filename> <values> [<values> ...]\n"
985     "BATCH\n"
986     "STATS\n"
987   };
988
989   char *help_flush[2] =
990   {
991     "Help for FLUSH\n"
992     ,
993     "Usage: FLUSH <filename>\n"
994     "\n"
995     "Adds the given filename to the head of the update queue and returns\n"
996     "after is has been dequeued.\n"
997   };
998
999   char *help_flushall[2] =
1000   {
1001     "Help for FLUSHALL\n"
1002     ,
1003     "Usage: FLUSHALL\n"
1004     "\n"
1005     "Triggers writing of all pending updates.  Returns immediately.\n"
1006   };
1007
1008   char *help_update[2] =
1009   {
1010     "Help for UPDATE\n"
1011     ,
1012     "Usage: UPDATE <filename> <values> [<values> ...]\n"
1013     "\n"
1014     "Adds the given file to the internal cache if it is not yet known and\n"
1015     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1016     "for details.\n"
1017     "\n"
1018     "Each <values> has the following form:\n"
1019     "  <values> = <time>:<value>[:<value>[...]]\n"
1020     "See the rrdupdate(1) manpage for details.\n"
1021   };
1022
1023   char *help_stats[2] =
1024   {
1025     "Help for STATS\n"
1026     ,
1027     "Usage: STATS\n"
1028     "\n"
1029     "Returns some performance counters, see the rrdcached(1) manpage for\n"
1030     "a description of the values.\n"
1031   };
1032
1033   char *help_batch[2] =
1034   {
1035     "Help for BATCH\n"
1036     ,
1037     "The 'BATCH' command permits the client to initiate a bulk load\n"
1038     "   of commands to rrdcached.\n"
1039     "\n"
1040     "Usage:\n"
1041     "\n"
1042     "    client: BATCH\n"
1043     "    server: 0 Go ahead.  End with dot '.' on its own line.\n"
1044     "    client: command #1\n"
1045     "    client: command #2\n"
1046     "    client: ... and so on\n"
1047     "    client: .\n"
1048     "    server: 2 errors\n"
1049     "    server: 7 message for command #7\n"
1050     "    server: 9 message for command #9\n"
1051     "\n"
1052     "For more information, consult the rrdcached(1) documentation.\n"
1053   };
1054
1055   status = buffer_get_field (&buffer, &buffer_size, &command);
1056   if (status != 0)
1057     help_text = help_help;
1058   else
1059   {
1060     if (strcasecmp (command, "update") == 0)
1061       help_text = help_update;
1062     else if (strcasecmp (command, "flush") == 0)
1063       help_text = help_flush;
1064     else if (strcasecmp (command, "flushall") == 0)
1065       help_text = help_flushall;
1066     else if (strcasecmp (command, "stats") == 0)
1067       help_text = help_stats;
1068     else if (strcasecmp (command, "batch") == 0)
1069       help_text = help_batch;
1070     else
1071       help_text = help_help;
1072   }
1073
1074   add_response_info(sock, help_text[1]);
1075   return send_response(sock, RESP_OK, help_text[0]);
1076 } /* }}} int handle_request_help */
1077
1078 static int handle_request_stats (listen_socket_t *sock) /* {{{ */
1079 {
1080   uint64_t copy_queue_length;
1081   uint64_t copy_updates_received;
1082   uint64_t copy_flush_received;
1083   uint64_t copy_updates_written;
1084   uint64_t copy_data_sets_written;
1085   uint64_t copy_journal_bytes;
1086   uint64_t copy_journal_rotate;
1087
1088   uint64_t tree_nodes_number;
1089   uint64_t tree_depth;
1090
1091   pthread_mutex_lock (&stats_lock);
1092   copy_queue_length       = stats_queue_length;
1093   copy_updates_received   = stats_updates_received;
1094   copy_flush_received     = stats_flush_received;
1095   copy_updates_written    = stats_updates_written;
1096   copy_data_sets_written  = stats_data_sets_written;
1097   copy_journal_bytes      = stats_journal_bytes;
1098   copy_journal_rotate     = stats_journal_rotate;
1099   pthread_mutex_unlock (&stats_lock);
1100
1101   pthread_mutex_lock (&cache_lock);
1102   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1103   tree_depth        = (uint64_t) g_tree_height (cache_tree);
1104   pthread_mutex_unlock (&cache_lock);
1105
1106   add_response_info(sock,
1107                     "QueueLength: %"PRIu64"\n", copy_queue_length);
1108   add_response_info(sock,
1109                     "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1110   add_response_info(sock,
1111                     "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1112   add_response_info(sock,
1113                     "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1114   add_response_info(sock,
1115                     "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1116   add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1117   add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1118   add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1119   add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1120
1121   send_response(sock, RESP_OK, "Statistics follow\n");
1122
1123   return (0);
1124 } /* }}} int handle_request_stats */
1125
1126 static int handle_request_flush (listen_socket_t *sock, /* {{{ */
1127     char *buffer, size_t buffer_size)
1128 {
1129   char *file;
1130   int status;
1131
1132   status = buffer_get_field (&buffer, &buffer_size, &file);
1133   if (status != 0)
1134   {
1135     return send_response(sock, RESP_ERR, "Usage: flush <filename>\n");
1136   }
1137   else
1138   {
1139     pthread_mutex_lock(&stats_lock);
1140     stats_flush_received++;
1141     pthread_mutex_unlock(&stats_lock);
1142
1143     if (!check_file_access(file, sock)) return 0;
1144
1145     status = flush_file (file);
1146     if (status == 0)
1147       return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1148     else if (status == ENOENT)
1149     {
1150       /* no file in our tree; see whether it exists at all */
1151       struct stat statbuf;
1152
1153       memset(&statbuf, 0, sizeof(statbuf));
1154       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1155         return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1156       else
1157         return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1158     }
1159     else if (status < 0)
1160       return send_response(sock, RESP_ERR, "Internal error.\n");
1161     else
1162       return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1163   }
1164
1165   /* NOTREACHED */
1166   assert(1==0);
1167 } /* }}} int handle_request_slurp */
1168
1169 static int handle_request_flushall(listen_socket_t *sock) /* {{{ */
1170 {
1171
1172   RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1173
1174   pthread_mutex_lock(&cache_lock);
1175   flush_old_values(-1);
1176   pthread_mutex_unlock(&cache_lock);
1177
1178   return send_response(sock, RESP_OK, "Started flush.\n");
1179 } /* }}} static int handle_request_flushall */
1180
1181 static int handle_request_update (listen_socket_t *sock, /* {{{ */
1182     char *buffer, size_t buffer_size)
1183 {
1184   char *file;
1185   int values_num = 0;
1186   int status;
1187
1188   time_t now;
1189   cache_item_t *ci;
1190
1191   now = time (NULL);
1192
1193   status = buffer_get_field (&buffer, &buffer_size, &file);
1194   if (status != 0)
1195     return send_response(sock, RESP_ERR,
1196                          "Usage: UPDATE <filename> <values> [<values> ...]\n");
1197
1198   pthread_mutex_lock(&stats_lock);
1199   stats_updates_received++;
1200   pthread_mutex_unlock(&stats_lock);
1201
1202   if (!check_file_access(file, sock)) return 0;
1203
1204   pthread_mutex_lock (&cache_lock);
1205   ci = g_tree_lookup (cache_tree, file);
1206
1207   if (ci == NULL) /* {{{ */
1208   {
1209     struct stat statbuf;
1210
1211     /* don't hold the lock while we setup; stat(2) might block */
1212     pthread_mutex_unlock(&cache_lock);
1213
1214     memset (&statbuf, 0, sizeof (statbuf));
1215     status = stat (file, &statbuf);
1216     if (status != 0)
1217     {
1218       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1219
1220       status = errno;
1221       if (status == ENOENT)
1222         return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1223       else
1224         return send_response(sock, RESP_ERR,
1225                              "stat failed with error %i.\n", status);
1226     }
1227     if (!S_ISREG (statbuf.st_mode))
1228       return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1229
1230     if (access(file, R_OK|W_OK) != 0)
1231       return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1232                            file, rrd_strerror(errno));
1233
1234     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1235     if (ci == NULL)
1236     {
1237       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1238
1239       return send_response(sock, RESP_ERR, "malloc failed.\n");
1240     }
1241     memset (ci, 0, sizeof (cache_item_t));
1242
1243     ci->file = strdup (file);
1244     if (ci->file == NULL)
1245     {
1246       free (ci);
1247       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1248
1249       return send_response(sock, RESP_ERR, "strdup failed.\n");
1250     }
1251
1252     wipe_ci_values(ci, now);
1253     ci->flags = CI_FLAGS_IN_TREE;
1254
1255     pthread_mutex_lock(&cache_lock);
1256     g_tree_insert (cache_tree, (void *) ci->file, (void *) ci);
1257   } /* }}} */
1258   assert (ci != NULL);
1259
1260   while (buffer_size > 0)
1261   {
1262     char **temp;
1263     char *value;
1264
1265     status = buffer_get_field (&buffer, &buffer_size, &value);
1266     if (status != 0)
1267     {
1268       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1269       break;
1270     }
1271
1272     temp = (char **) realloc (ci->values,
1273         sizeof (char *) * (ci->values_num + 1));
1274     if (temp == NULL)
1275     {
1276       RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
1277       continue;
1278     }
1279     ci->values = temp;
1280
1281     ci->values[ci->values_num] = strdup (value);
1282     if (ci->values[ci->values_num] == NULL)
1283     {
1284       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1285       continue;
1286     }
1287     ci->values_num++;
1288
1289     values_num++;
1290   }
1291
1292   if (((now - ci->last_flush_time) >= config_write_interval)
1293       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1294       && (ci->values_num > 0))
1295   {
1296     enqueue_cache_item (ci, TAIL);
1297   }
1298
1299   pthread_mutex_unlock (&cache_lock);
1300
1301   if (values_num < 1)
1302     return send_response(sock, RESP_ERR, "No values updated.\n");
1303   else
1304     return send_response(sock, RESP_OK, "Enqueued %i value(s).\n", values_num);
1305
1306   /* NOTREACHED */
1307   assert(1==0);
1308
1309 } /* }}} int handle_request_update */
1310
1311 /* we came across a "WROTE" entry during journal replay.
1312  * throw away any values that we have accumulated for this file
1313  */
1314 static int handle_request_wrote (const char *buffer) /* {{{ */
1315 {
1316   int i;
1317   cache_item_t *ci;
1318   const char *file = buffer;
1319
1320   pthread_mutex_lock(&cache_lock);
1321
1322   ci = g_tree_lookup(cache_tree, file);
1323   if (ci == NULL)
1324   {
1325     pthread_mutex_unlock(&cache_lock);
1326     return (0);
1327   }
1328
1329   if (ci->values)
1330   {
1331     for (i=0; i < ci->values_num; i++)
1332       free(ci->values[i]);
1333
1334     free(ci->values);
1335   }
1336
1337   wipe_ci_values(ci, time(NULL));
1338   remove_from_queue(ci);
1339
1340   pthread_mutex_unlock(&cache_lock);
1341   return (0);
1342 } /* }}} int handle_request_wrote */
1343
1344 /* start "BATCH" processing */
1345 static int batch_start (listen_socket_t *sock) /* {{{ */
1346 {
1347   int status;
1348   if (sock->batch_mode)
1349     return send_response(sock, RESP_ERR, "Already in BATCH\n");
1350
1351   status = send_response(sock, RESP_OK,
1352                          "Go ahead.  End with dot '.' on its own line.\n");
1353   sock->batch_mode = 1;
1354   sock->batch_cmd = 0;
1355
1356   return status;
1357 } /* }}} static int batch_start */
1358
1359 /* finish "BATCH" processing and return results to the client */
1360 static int batch_done (listen_socket_t *sock) /* {{{ */
1361 {
1362   assert(sock->batch_mode);
1363   sock->batch_mode = 0;
1364   sock->batch_cmd  = 0;
1365   return send_response(sock, RESP_OK, "errors\n");
1366 } /* }}} static int batch_done */
1367
1368 /* returns 1 if we have the required privilege level */
1369 static int has_privilege (listen_socket_t *sock, /* {{{ */
1370                           socket_privilege priv)
1371 {
1372   if (sock == NULL) /* journal replay */
1373     return 1;
1374
1375   if (sock->privilege >= priv)
1376     return 1;
1377
1378   return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1379 } /* }}} static int has_privilege */
1380
1381 /* if sock==NULL, we are in journal replay mode */
1382 static int handle_request (listen_socket_t *sock, /* {{{ */
1383                            char *buffer, size_t buffer_size)
1384 {
1385   char *buffer_ptr;
1386   char *command;
1387   int status;
1388
1389   assert (buffer[buffer_size - 1] == '\0');
1390
1391   buffer_ptr = buffer;
1392   command = NULL;
1393   status = buffer_get_field (&buffer_ptr, &buffer_size, &command);
1394   if (status != 0)
1395   {
1396     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1397     return (-1);
1398   }
1399
1400   if (sock != NULL && sock->batch_mode)
1401     sock->batch_cmd++;
1402
1403   if (strcasecmp (command, "update") == 0)
1404   {
1405     status = has_privilege(sock, PRIV_HIGH);
1406     if (status <= 0)
1407       return status;
1408
1409     /* don't re-write updates in replay mode */
1410     if (sock != NULL)
1411       journal_write(command, buffer_ptr);
1412
1413     return (handle_request_update (sock, buffer_ptr, buffer_size));
1414   }
1415   else if (strcasecmp (command, "wrote") == 0 && sock == NULL)
1416   {
1417     /* this is only valid in replay mode */
1418     return (handle_request_wrote (buffer_ptr));
1419   }
1420   else if (strcasecmp (command, "flush") == 0)
1421     return (handle_request_flush (sock, buffer_ptr, buffer_size));
1422   else if (strcasecmp (command, "flushall") == 0)
1423   {
1424     status = has_privilege(sock, PRIV_HIGH);
1425     if (status <= 0)
1426       return status;
1427
1428     return (handle_request_flushall(sock));
1429   }
1430   else if (strcasecmp (command, "stats") == 0)
1431     return (handle_request_stats (sock));
1432   else if (strcasecmp (command, "help") == 0)
1433     return (handle_request_help (sock, buffer_ptr, buffer_size));
1434   else if (strcasecmp (command, "batch") == 0 && sock != NULL)
1435     return batch_start(sock);
1436   else if (strcasecmp (command, ".") == 0 && sock != NULL && sock->batch_mode)
1437     return batch_done(sock);
1438   else
1439     return send_response(sock, RESP_ERR, "Unknown command: %s\n", command);
1440
1441   /* NOTREACHED */
1442   assert(1==0);
1443 } /* }}} int handle_request */
1444
1445 /* MUST NOT hold journal_lock before calling this */
1446 static void journal_rotate(void) /* {{{ */
1447 {
1448   FILE *old_fh = NULL;
1449
1450   if (journal_cur == NULL || journal_old == NULL)
1451     return;
1452
1453   pthread_mutex_lock(&journal_lock);
1454
1455   /* we rotate this way (rename before close) so that the we can release
1456    * the journal lock as fast as possible.  Journal writes to the new
1457    * journal can proceed immediately after the new file is opened.  The
1458    * fclose can then block without affecting new updates.
1459    */
1460   if (journal_fh != NULL)
1461   {
1462     old_fh = journal_fh;
1463     rename(journal_cur, journal_old);
1464     ++stats_journal_rotate;
1465   }
1466
1467   journal_fh = fopen(journal_cur, "a");
1468   pthread_mutex_unlock(&journal_lock);
1469
1470   if (old_fh != NULL)
1471     fclose(old_fh);
1472
1473   if (journal_fh == NULL)
1474   {
1475     RRDD_LOG(LOG_CRIT,
1476              "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1477              journal_cur, rrd_strerror(errno));
1478
1479     RRDD_LOG(LOG_ERR,
1480              "JOURNALING DISABLED: All values will be flushed at shutdown");
1481     config_flush_at_shutdown = 1;
1482   }
1483
1484 } /* }}} static void journal_rotate */
1485
1486 static void journal_done(void) /* {{{ */
1487 {
1488   if (journal_cur == NULL)
1489     return;
1490
1491   pthread_mutex_lock(&journal_lock);
1492   if (journal_fh != NULL)
1493   {
1494     fclose(journal_fh);
1495     journal_fh = NULL;
1496   }
1497
1498   if (config_flush_at_shutdown)
1499   {
1500     RRDD_LOG(LOG_INFO, "removing journals");
1501     unlink(journal_old);
1502     unlink(journal_cur);
1503   }
1504   else
1505   {
1506     RRDD_LOG(LOG_INFO, "expedited shutdown; "
1507              "journals will be used at next startup");
1508   }
1509
1510   pthread_mutex_unlock(&journal_lock);
1511
1512 } /* }}} static void journal_done */
1513
1514 static int journal_write(char *cmd, char *args) /* {{{ */
1515 {
1516   int chars;
1517
1518   if (journal_fh == NULL)
1519     return 0;
1520
1521   pthread_mutex_lock(&journal_lock);
1522   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1523   pthread_mutex_unlock(&journal_lock);
1524
1525   if (chars > 0)
1526   {
1527     pthread_mutex_lock(&stats_lock);
1528     stats_journal_bytes += chars;
1529     pthread_mutex_unlock(&stats_lock);
1530   }
1531
1532   return chars;
1533 } /* }}} static int journal_write */
1534
1535 static int journal_replay (const char *file) /* {{{ */
1536 {
1537   FILE *fh;
1538   int entry_cnt = 0;
1539   int fail_cnt = 0;
1540   uint64_t line = 0;
1541   char entry[CMD_MAX];
1542
1543   if (file == NULL) return 0;
1544
1545   fh = fopen(file, "r");
1546   if (fh == NULL)
1547   {
1548     if (errno != ENOENT)
1549       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1550                file, rrd_strerror(errno));
1551     return 0;
1552   }
1553   else
1554     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1555
1556   while(!feof(fh))
1557   {
1558     size_t entry_len;
1559
1560     ++line;
1561     if (fgets(entry, sizeof(entry), fh) == NULL)
1562       break;
1563     entry_len = strlen(entry);
1564
1565     /* check \n termination in case journal writing crashed mid-line */
1566     if (entry_len == 0)
1567       continue;
1568     else if (entry[entry_len - 1] != '\n')
1569     {
1570       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1571       ++fail_cnt;
1572       continue;
1573     }
1574
1575     entry[entry_len - 1] = '\0';
1576
1577     if (handle_request(NULL, entry, entry_len) == 0)
1578       ++entry_cnt;
1579     else
1580       ++fail_cnt;
1581   }
1582
1583   fclose(fh);
1584
1585   if (entry_cnt > 0)
1586   {
1587     RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1588              entry_cnt, fail_cnt);
1589     return 1;
1590   }
1591   else
1592     return 0;
1593
1594 } /* }}} static int journal_replay */
1595
1596 static void close_connection(listen_socket_t *sock)
1597 {
1598   close(sock->fd) ;  sock->fd   = -1;
1599   free(sock->rbuf);  sock->rbuf = NULL;
1600   free(sock->wbuf);  sock->wbuf = NULL;
1601
1602   free(sock);
1603 }
1604
1605 static void *connection_thread_main (void *args) /* {{{ */
1606 {
1607   pthread_t self;
1608   listen_socket_t *sock;
1609   int i;
1610   int fd;
1611
1612   sock = (listen_socket_t *) args;
1613   fd = sock->fd;
1614
1615   /* init read buffers */
1616   sock->next_read = sock->next_cmd = 0;
1617   sock->rbuf = malloc(RBUF_SIZE);
1618   if (sock->rbuf == NULL)
1619   {
1620     RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
1621     close_connection(sock);
1622     return NULL;
1623   }
1624
1625   pthread_mutex_lock (&connection_threads_lock);
1626   {
1627     pthread_t *temp;
1628
1629     temp = (pthread_t *) realloc (connection_threads,
1630         sizeof (pthread_t) * (connection_threads_num + 1));
1631     if (temp == NULL)
1632     {
1633       RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed.");
1634     }
1635     else
1636     {
1637       connection_threads = temp;
1638       connection_threads[connection_threads_num] = pthread_self ();
1639       connection_threads_num++;
1640     }
1641   }
1642   pthread_mutex_unlock (&connection_threads_lock);
1643
1644   while (do_shutdown == 0)
1645   {
1646     char *cmd;
1647     ssize_t cmd_len;
1648     ssize_t rbytes;
1649
1650     struct pollfd pollfd;
1651     int status;
1652
1653     pollfd.fd = fd;
1654     pollfd.events = POLLIN | POLLPRI;
1655     pollfd.revents = 0;
1656
1657     status = poll (&pollfd, 1, /* timeout = */ 500);
1658     if (do_shutdown)
1659       break;
1660     else if (status == 0) /* timeout */
1661       continue;
1662     else if (status < 0) /* error */
1663     {
1664       status = errno;
1665       if (status == EINTR)
1666         continue;
1667       RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
1668       continue;
1669     }
1670
1671     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
1672     {
1673       close_connection(sock);
1674       break;
1675     }
1676     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
1677     {
1678       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
1679           "poll(2) returned something unexpected: %#04hx",
1680           pollfd.revents);
1681       close_connection(sock);
1682       break;
1683     }
1684
1685     rbytes = read(fd, sock->rbuf + sock->next_read,
1686                   RBUF_SIZE - sock->next_read);
1687     if (rbytes < 0)
1688     {
1689       RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
1690       break;
1691     }
1692     else if (rbytes == 0)
1693       break; /* eof */
1694
1695     sock->next_read += rbytes;
1696
1697     while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
1698     {
1699       status = handle_request (sock, cmd, cmd_len+1);
1700       if (status != 0)
1701         goto out_close;
1702     }
1703   }
1704
1705 out_close:
1706   close_connection(sock);
1707
1708   self = pthread_self ();
1709   /* Remove this thread from the connection threads list */
1710   pthread_mutex_lock (&connection_threads_lock);
1711   /* Find out own index in the array */
1712   for (i = 0; i < connection_threads_num; i++)
1713     if (pthread_equal (connection_threads[i], self) != 0)
1714       break;
1715   assert (i < connection_threads_num);
1716
1717   /* Move the trailing threads forward. */
1718   if (i < (connection_threads_num - 1))
1719   {
1720     memmove (connection_threads + i,
1721         connection_threads + i + 1,
1722         sizeof (pthread_t) * (connection_threads_num - i - 1));
1723   }
1724
1725   connection_threads_num--;
1726   pthread_mutex_unlock (&connection_threads_lock);
1727
1728   return (NULL);
1729 } /* }}} void *connection_thread_main */
1730
1731 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
1732 {
1733   int fd;
1734   struct sockaddr_un sa;
1735   listen_socket_t *temp;
1736   int status;
1737   const char *path;
1738
1739   path = sock->addr;
1740   if (strncmp(path, "unix:", strlen("unix:")) == 0)
1741     path += strlen("unix:");
1742
1743   temp = (listen_socket_t *) realloc (listen_fds,
1744       sizeof (listen_fds[0]) * (listen_fds_num + 1));
1745   if (temp == NULL)
1746   {
1747     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: realloc failed.");
1748     return (-1);
1749   }
1750   listen_fds = temp;
1751   memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
1752
1753   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
1754   if (fd < 0)
1755   {
1756     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: socket(2) failed.");
1757     return (-1);
1758   }
1759
1760   memset (&sa, 0, sizeof (sa));
1761   sa.sun_family = AF_UNIX;
1762   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
1763
1764   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
1765   if (status != 0)
1766   {
1767     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: bind(2) failed.");
1768     close (fd);
1769     unlink (path);
1770     return (-1);
1771   }
1772
1773   status = listen (fd, /* backlog = */ 10);
1774   if (status != 0)
1775   {
1776     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: listen(2) failed.");
1777     close (fd);
1778     unlink (path);
1779     return (-1);
1780   }
1781
1782   listen_fds[listen_fds_num].fd = fd;
1783   listen_fds[listen_fds_num].family = PF_UNIX;
1784   strncpy(listen_fds[listen_fds_num].addr, path,
1785           sizeof (listen_fds[listen_fds_num].addr) - 1);
1786   listen_fds_num++;
1787
1788   return (0);
1789 } /* }}} int open_listen_socket_unix */
1790
1791 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
1792 {
1793   struct addrinfo ai_hints;
1794   struct addrinfo *ai_res;
1795   struct addrinfo *ai_ptr;
1796   char addr_copy[NI_MAXHOST];
1797   char *addr;
1798   char *port;
1799   int status;
1800
1801   strncpy (addr_copy, sock->addr, sizeof (addr_copy));
1802   addr_copy[sizeof (addr_copy) - 1] = 0;
1803   addr = addr_copy;
1804
1805   memset (&ai_hints, 0, sizeof (ai_hints));
1806   ai_hints.ai_flags = 0;
1807 #ifdef AI_ADDRCONFIG
1808   ai_hints.ai_flags |= AI_ADDRCONFIG;
1809 #endif
1810   ai_hints.ai_family = AF_UNSPEC;
1811   ai_hints.ai_socktype = SOCK_STREAM;
1812
1813   port = NULL;
1814   if (*addr == '[') /* IPv6+port format */
1815   {
1816     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
1817     addr++;
1818
1819     port = strchr (addr, ']');
1820     if (port == NULL)
1821     {
1822       RRDD_LOG (LOG_ERR, "open_listen_socket_network: Malformed address: %s",
1823           sock->addr);
1824       return (-1);
1825     }
1826     *port = 0;
1827     port++;
1828
1829     if (*port == ':')
1830       port++;
1831     else if (*port == 0)
1832       port = NULL;
1833     else
1834     {
1835       RRDD_LOG (LOG_ERR, "open_listen_socket_network: Garbage after address: %s",
1836           port);
1837       return (-1);
1838     }
1839   } /* if (*addr = ']') */
1840   else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */
1841   {
1842     port = rindex(addr, ':');
1843     if (port != NULL)
1844     {
1845       *port = 0;
1846       port++;
1847     }
1848   }
1849   ai_res = NULL;
1850   status = getaddrinfo (addr,
1851                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
1852                         &ai_hints, &ai_res);
1853   if (status != 0)
1854   {
1855     RRDD_LOG (LOG_ERR, "open_listen_socket_network: getaddrinfo(%s) failed: "
1856         "%s", addr, gai_strerror (status));
1857     return (-1);
1858   }
1859
1860   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
1861   {
1862     int fd;
1863     listen_socket_t *temp;
1864     int one = 1;
1865
1866     temp = (listen_socket_t *) realloc (listen_fds,
1867         sizeof (listen_fds[0]) * (listen_fds_num + 1));
1868     if (temp == NULL)
1869     {
1870       RRDD_LOG (LOG_ERR, "open_listen_socket_network: realloc failed.");
1871       continue;
1872     }
1873     listen_fds = temp;
1874     memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
1875
1876     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
1877     if (fd < 0)
1878     {
1879       RRDD_LOG (LOG_ERR, "open_listen_socket_network: socket(2) failed.");
1880       continue;
1881     }
1882
1883     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
1884
1885     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
1886     if (status != 0)
1887     {
1888       RRDD_LOG (LOG_ERR, "open_listen_socket_network: bind(2) failed.");
1889       close (fd);
1890       continue;
1891     }
1892
1893     status = listen (fd, /* backlog = */ 10);
1894     if (status != 0)
1895     {
1896       RRDD_LOG (LOG_ERR, "open_listen_socket_network: listen(2) failed.");
1897       close (fd);
1898       return (-1);
1899     }
1900
1901     listen_fds[listen_fds_num].fd = fd;
1902     listen_fds[listen_fds_num].family = ai_ptr->ai_family;
1903     listen_fds_num++;
1904   } /* for (ai_ptr) */
1905
1906   return (0);
1907 } /* }}} static int open_listen_socket_network */
1908
1909 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
1910 {
1911   assert(sock != NULL);
1912   assert(sock->addr != NULL);
1913
1914   if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
1915       || sock->addr[0] == '/')
1916     return (open_listen_socket_unix(sock));
1917   else
1918     return (open_listen_socket_network(sock));
1919 } /* }}} int open_listen_socket */
1920
1921 static int close_listen_sockets (void) /* {{{ */
1922 {
1923   size_t i;
1924
1925   for (i = 0; i < listen_fds_num; i++)
1926   {
1927     close (listen_fds[i].fd);
1928
1929     if (listen_fds[i].family == PF_UNIX)
1930       unlink(listen_fds[i].addr);
1931   }
1932
1933   free (listen_fds);
1934   listen_fds = NULL;
1935   listen_fds_num = 0;
1936
1937   return (0);
1938 } /* }}} int close_listen_sockets */
1939
1940 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
1941 {
1942   struct pollfd *pollfds;
1943   int pollfds_num;
1944   int status;
1945   int i;
1946
1947   for (i = 0; i < config_listen_address_list_len; i++)
1948     open_listen_socket (config_listen_address_list[i]);
1949
1950   if (config_listen_address_list_len < 1)
1951   {
1952     listen_socket_t sock;
1953     memset(&sock, 0, sizeof(sock));
1954     strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr));
1955     open_listen_socket (&sock);
1956   }
1957
1958   if (listen_fds_num < 1)
1959   {
1960     RRDD_LOG (LOG_ERR, "listen_thread_main: No listen sockets "
1961         "could be opened. Sorry.");
1962     return (NULL);
1963   }
1964
1965   pollfds_num = listen_fds_num;
1966   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
1967   if (pollfds == NULL)
1968   {
1969     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
1970     return (NULL);
1971   }
1972   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
1973
1974   RRDD_LOG(LOG_INFO, "listening for connections");
1975
1976   while (do_shutdown == 0)
1977   {
1978     assert (pollfds_num == ((int) listen_fds_num));
1979     for (i = 0; i < pollfds_num; i++)
1980     {
1981       pollfds[i].fd = listen_fds[i].fd;
1982       pollfds[i].events = POLLIN | POLLPRI;
1983       pollfds[i].revents = 0;
1984     }
1985
1986     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
1987     if (do_shutdown)
1988       break;
1989     else if (status == 0) /* timeout */
1990       continue;
1991     else if (status < 0) /* error */
1992     {
1993       status = errno;
1994       if (status != EINTR)
1995       {
1996         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
1997       }
1998       continue;
1999     }
2000
2001     for (i = 0; i < pollfds_num; i++)
2002     {
2003       listen_socket_t *client_sock;
2004       struct sockaddr_storage client_sa;
2005       socklen_t client_sa_size;
2006       pthread_t tid;
2007       pthread_attr_t attr;
2008
2009       if (pollfds[i].revents == 0)
2010         continue;
2011
2012       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2013       {
2014         RRDD_LOG (LOG_ERR, "listen_thread_main: "
2015             "poll(2) returned something unexpected for listen FD #%i.",
2016             pollfds[i].fd);
2017         continue;
2018       }
2019
2020       client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2021       if (client_sock == NULL)
2022       {
2023         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2024         continue;
2025       }
2026       memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2027
2028       client_sa_size = sizeof (client_sa);
2029       client_sock->fd = accept (pollfds[i].fd,
2030           (struct sockaddr *) &client_sa, &client_sa_size);
2031       if (client_sock->fd < 0)
2032       {
2033         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2034         free(client_sock);
2035         continue;
2036       }
2037
2038       pthread_attr_init (&attr);
2039       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2040
2041       status = pthread_create (&tid, &attr, connection_thread_main,
2042                                client_sock);
2043       if (status != 0)
2044       {
2045         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2046         close_connection(client_sock);
2047         continue;
2048       }
2049     } /* for (pollfds_num) */
2050   } /* while (do_shutdown == 0) */
2051
2052   RRDD_LOG(LOG_INFO, "starting shutdown");
2053
2054   close_listen_sockets ();
2055
2056   pthread_mutex_lock (&connection_threads_lock);
2057   while (connection_threads_num > 0)
2058   {
2059     pthread_t wait_for;
2060
2061     wait_for = connection_threads[0];
2062
2063     pthread_mutex_unlock (&connection_threads_lock);
2064     pthread_join (wait_for, /* retval = */ NULL);
2065     pthread_mutex_lock (&connection_threads_lock);
2066   }
2067   pthread_mutex_unlock (&connection_threads_lock);
2068
2069   return (NULL);
2070 } /* }}} void *listen_thread_main */
2071
2072 static int daemonize (void) /* {{{ */
2073 {
2074   int status;
2075   int fd;
2076   char *base_dir;
2077
2078   fd = open_pidfile();
2079   if (fd < 0) return fd;
2080
2081   if (!stay_foreground)
2082   {
2083     pid_t child;
2084
2085     child = fork ();
2086     if (child < 0)
2087     {
2088       fprintf (stderr, "daemonize: fork(2) failed.\n");
2089       return (-1);
2090     }
2091     else if (child > 0)
2092     {
2093       return (1);
2094     }
2095
2096     /* Become session leader */
2097     setsid ();
2098
2099     /* Open the first three file descriptors to /dev/null */
2100     close (2);
2101     close (1);
2102     close (0);
2103
2104     open ("/dev/null", O_RDWR);
2105     dup (0);
2106     dup (0);
2107   } /* if (!stay_foreground) */
2108
2109   /* Change into the /tmp directory. */
2110   base_dir = (config_base_dir != NULL)
2111     ? config_base_dir
2112     : "/tmp";
2113   status = chdir (base_dir);
2114   if (status != 0)
2115   {
2116     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2117     return (-1);
2118   }
2119
2120   install_signal_handlers();
2121
2122   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2123   RRDD_LOG(LOG_INFO, "starting up");
2124
2125   cache_tree = g_tree_new ((GCompareFunc) strcmp);
2126   if (cache_tree == NULL)
2127   {
2128     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2129     return (-1);
2130   }
2131
2132   status = write_pidfile (fd);
2133   return status;
2134 } /* }}} int daemonize */
2135
2136 static int cleanup (void) /* {{{ */
2137 {
2138   do_shutdown++;
2139
2140   pthread_cond_signal (&cache_cond);
2141   pthread_join (queue_thread, /* return = */ NULL);
2142
2143   remove_pidfile ();
2144
2145   RRDD_LOG(LOG_INFO, "goodbye");
2146   closelog ();
2147
2148   return (0);
2149 } /* }}} int cleanup */
2150
2151 static int read_options (int argc, char **argv) /* {{{ */
2152 {
2153   int option;
2154   int status = 0;
2155
2156   while ((option = getopt(argc, argv, "gl:L:f:w:b:Bz:p:j:h?F")) != -1)
2157   {
2158     switch (option)
2159     {
2160       case 'g':
2161         stay_foreground=1;
2162         break;
2163
2164       case 'L':
2165       case 'l':
2166       {
2167         listen_socket_t **temp;
2168         listen_socket_t *new;
2169
2170         new = malloc(sizeof(listen_socket_t));
2171         if (new == NULL)
2172         {
2173           fprintf(stderr, "read_options: malloc failed.\n");
2174           return(2);
2175         }
2176         memset(new, 0, sizeof(listen_socket_t));
2177
2178         temp = (listen_socket_t **) realloc (config_listen_address_list,
2179             sizeof (listen_socket_t *) * (config_listen_address_list_len + 1));
2180         if (temp == NULL)
2181         {
2182           fprintf (stderr, "read_options: realloc failed.\n");
2183           return (2);
2184         }
2185         config_listen_address_list = temp;
2186
2187         strncpy(new->addr, optarg, sizeof(new->addr)-1);
2188         new->privilege = (option == 'l') ? PRIV_HIGH : PRIV_LOW;
2189
2190         temp[config_listen_address_list_len] = new;
2191         config_listen_address_list_len++;
2192       }
2193       break;
2194
2195       case 'f':
2196       {
2197         int temp;
2198
2199         temp = atoi (optarg);
2200         if (temp > 0)
2201           config_flush_interval = temp;
2202         else
2203         {
2204           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2205           status = 3;
2206         }
2207       }
2208       break;
2209
2210       case 'w':
2211       {
2212         int temp;
2213
2214         temp = atoi (optarg);
2215         if (temp > 0)
2216           config_write_interval = temp;
2217         else
2218         {
2219           fprintf (stderr, "Invalid write interval: %s\n", optarg);
2220           status = 2;
2221         }
2222       }
2223       break;
2224
2225       case 'z':
2226       {
2227         int temp;
2228
2229         temp = atoi(optarg);
2230         if (temp > 0)
2231           config_write_jitter = temp;
2232         else
2233         {
2234           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2235           status = 2;
2236         }
2237
2238         break;
2239       }
2240
2241       case 'B':
2242         config_write_base_only = 1;
2243         break;
2244
2245       case 'b':
2246       {
2247         size_t len;
2248
2249         if (config_base_dir != NULL)
2250           free (config_base_dir);
2251         config_base_dir = strdup (optarg);
2252         if (config_base_dir == NULL)
2253         {
2254           fprintf (stderr, "read_options: strdup failed.\n");
2255           return (3);
2256         }
2257
2258         len = strlen (config_base_dir);
2259         while ((len > 0) && (config_base_dir[len - 1] == '/'))
2260         {
2261           config_base_dir[len - 1] = 0;
2262           len--;
2263         }
2264
2265         if (len < 1)
2266         {
2267           fprintf (stderr, "Invalid base directory: %s\n", optarg);
2268           return (4);
2269         }
2270
2271         _config_base_dir_len = len;
2272       }
2273       break;
2274
2275       case 'p':
2276       {
2277         if (config_pid_file != NULL)
2278           free (config_pid_file);
2279         config_pid_file = strdup (optarg);
2280         if (config_pid_file == NULL)
2281         {
2282           fprintf (stderr, "read_options: strdup failed.\n");
2283           return (3);
2284         }
2285       }
2286       break;
2287
2288       case 'F':
2289         config_flush_at_shutdown = 1;
2290         break;
2291
2292       case 'j':
2293       {
2294         struct stat statbuf;
2295         const char *dir = optarg;
2296
2297         status = stat(dir, &statbuf);
2298         if (status != 0)
2299         {
2300           fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2301           return 6;
2302         }
2303
2304         if (!S_ISDIR(statbuf.st_mode)
2305             || access(dir, R_OK|W_OK|X_OK) != 0)
2306         {
2307           fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2308                   errno ? rrd_strerror(errno) : "");
2309           return 6;
2310         }
2311
2312         journal_cur = malloc(PATH_MAX + 1);
2313         journal_old = malloc(PATH_MAX + 1);
2314         if (journal_cur == NULL || journal_old == NULL)
2315         {
2316           fprintf(stderr, "malloc failure for journal files\n");
2317           return 6;
2318         }
2319         else 
2320         {
2321           snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2322           snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2323         }
2324       }
2325       break;
2326
2327       case 'h':
2328       case '?':
2329         printf ("RRDCacheD %s  Copyright (C) 2008 Florian octo Forster\n"
2330             "\n"
2331             "Usage: rrdcached [options]\n"
2332             "\n"
2333             "Valid options are:\n"
2334             "  -l <address>  Socket address to listen to.\n"
2335             "  -L <address>  Socket address to listen to ('FLUSH' only).\n"
2336             "  -w <seconds>  Interval in which to write data.\n"
2337             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
2338             "  -f <seconds>  Interval in which to flush dead data.\n"
2339             "  -p <file>     Location of the PID-file.\n"
2340             "  -b <dir>      Base directory to change to.\n"
2341             "  -B            Restrict file access to paths within -b <dir>\n"
2342             "  -g            Do not fork and run in the foreground.\n"
2343             "  -j <dir>      Directory in which to create the journal files.\n"
2344             "  -F            Always flush all updates at shutdown\n"
2345             "\n"
2346             "For more information and a detailed description of all options "
2347             "please refer\n"
2348             "to the rrdcached(1) manual page.\n",
2349             VERSION);
2350         status = -1;
2351         break;
2352     } /* switch (option) */
2353   } /* while (getopt) */
2354
2355   /* advise the user when values are not sane */
2356   if (config_flush_interval < 2 * config_write_interval)
2357     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2358             " 2x write interval (-w) !\n");
2359   if (config_write_jitter > config_write_interval)
2360     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2361             " write interval (-w) !\n");
2362
2363   if (config_write_base_only && config_base_dir == NULL)
2364     fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
2365             "  Consult the rrdcached documentation\n");
2366
2367   if (journal_cur == NULL)
2368     config_flush_at_shutdown = 1;
2369
2370   return (status);
2371 } /* }}} int read_options */
2372
2373 int main (int argc, char **argv)
2374 {
2375   int status;
2376
2377   status = read_options (argc, argv);
2378   if (status != 0)
2379   {
2380     if (status < 0)
2381       status = 0;
2382     return (status);
2383   }
2384
2385   status = daemonize ();
2386   if (status == 1)
2387   {
2388     struct sigaction sigchld;
2389
2390     memset (&sigchld, 0, sizeof (sigchld));
2391     sigchld.sa_handler = SIG_IGN;
2392     sigaction (SIGCHLD, &sigchld, NULL);
2393
2394     return (0);
2395   }
2396   else if (status != 0)
2397   {
2398     fprintf (stderr, "daemonize failed, exiting.\n");
2399     return (1);
2400   }
2401
2402   if (journal_cur != NULL)
2403   {
2404     int had_journal = 0;
2405
2406     pthread_mutex_lock(&journal_lock);
2407
2408     RRDD_LOG(LOG_INFO, "checking for journal files");
2409
2410     had_journal += journal_replay(journal_old);
2411     had_journal += journal_replay(journal_cur);
2412
2413     if (had_journal)
2414       flush_old_values(-1);
2415
2416     pthread_mutex_unlock(&journal_lock);
2417     journal_rotate();
2418
2419     RRDD_LOG(LOG_INFO, "journal processing complete");
2420   }
2421
2422   /* start the queue thread */
2423   memset (&queue_thread, 0, sizeof (queue_thread));
2424   status = pthread_create (&queue_thread,
2425                            NULL, /* attr */
2426                            queue_thread_main,
2427                            NULL); /* args */
2428   if (status != 0)
2429   {
2430     RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2431     cleanup();
2432     return (1);
2433   }
2434
2435   listen_thread_main (NULL);
2436   cleanup ();
2437
2438   return (0);
2439 } /* int main */
2440
2441 /*
2442  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2443  */