ea607d83df80810ddecd660e4bd5552cb05f5a22
[rrdtool.git] / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008 Florian octo Forster
4  * Copyright (C) 2008 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
23
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
32
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
36
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
41
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
47
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
51
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
55
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
61
62 /*
63  * Now for some includes..
64  */
65 #include "rrd.h" /* {{{ */
66 #include "rrd_client.h"
67
68 #include <stdlib.h>
69 #include <stdint.h>
70 #include <stdio.h>
71 #include <unistd.h>
72 #include <string.h>
73 #include <strings.h>
74 #include <stdint.h>
75 #include <inttypes.h>
76
77 #include <sys/types.h>
78 #include <sys/stat.h>
79 #include <fcntl.h>
80 #include <signal.h>
81 #include <sys/socket.h>
82 #include <sys/un.h>
83 #include <netdb.h>
84 #include <poll.h>
85 #include <syslog.h>
86 #include <pthread.h>
87 #include <errno.h>
88 #include <assert.h>
89 #include <sys/time.h>
90 #include <time.h>
91
92 #include <glib-2.0/glib.h>
93 /* }}} */
94
95 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
96
97 #ifndef __GNUC__
98 # define __attribute__(x) /**/
99 #endif
100
101 /*
102  * Types
103  */
104 typedef enum
105 {
106   PRIV_LOW,
107   PRIV_HIGH
108 } socket_privilege;
109
110 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
111
112 struct listen_socket_s
113 {
114   int fd;
115   char addr[PATH_MAX + 1];
116   int family;
117   socket_privilege privilege;
118
119   /* state for BATCH processing */
120   int batch_mode;
121   int batch_cmd;
122
123   /* buffered IO */
124   char *rbuf;
125   off_t next_cmd;
126   off_t next_read;
127
128   char *wbuf;
129   ssize_t wbuf_len;
130 };
131 typedef struct listen_socket_s listen_socket_t;
132
133 struct cache_item_s;
134 typedef struct cache_item_s cache_item_t;
135 struct cache_item_s
136 {
137   char *file;
138   char **values;
139   int values_num;
140   time_t last_flush_time;
141 #define CI_FLAGS_IN_TREE  (1<<0)
142 #define CI_FLAGS_IN_QUEUE (1<<1)
143   int flags;
144   pthread_cond_t  flushed;
145   cache_item_t *prev;
146   cache_item_t *next;
147 };
148
149 struct callback_flush_data_s
150 {
151   time_t now;
152   time_t abs_timeout;
153   char **keys;
154   size_t keys_num;
155 };
156 typedef struct callback_flush_data_s callback_flush_data_t;
157
158 enum queue_side_e
159 {
160   HEAD,
161   TAIL
162 };
163 typedef enum queue_side_e queue_side_t;
164
165 /* max length of socket command or response */
166 #define CMD_MAX 4096
167 #define RBUF_SIZE (CMD_MAX*2)
168
169 /*
170  * Variables
171  */
172 static int stay_foreground = 0;
173 static uid_t daemon_uid;
174
175 static listen_socket_t *listen_fds = NULL;
176 static size_t listen_fds_num = 0;
177
178 static int do_shutdown = 0;
179
180 static pthread_t queue_thread;
181
182 static pthread_t *connection_threads = NULL;
183 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
184 static int connection_threads_num = 0;
185
186 /* Cache stuff */
187 static GTree          *cache_tree = NULL;
188 static cache_item_t   *cache_queue_head = NULL;
189 static cache_item_t   *cache_queue_tail = NULL;
190 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
191 static pthread_cond_t  cache_cond = PTHREAD_COND_INITIALIZER;
192
193 static int config_write_interval = 300;
194 static int config_write_jitter   = 0;
195 static int config_flush_interval = 3600;
196 static int config_flush_at_shutdown = 0;
197 static char *config_pid_file = NULL;
198 static char *config_base_dir = NULL;
199 static size_t _config_base_dir_len = 0;
200 static int config_write_base_only = 0;
201
202 static listen_socket_t **config_listen_address_list = NULL;
203 static int config_listen_address_list_len = 0;
204
205 static uint64_t stats_queue_length = 0;
206 static uint64_t stats_updates_received = 0;
207 static uint64_t stats_flush_received = 0;
208 static uint64_t stats_updates_written = 0;
209 static uint64_t stats_data_sets_written = 0;
210 static uint64_t stats_journal_bytes = 0;
211 static uint64_t stats_journal_rotate = 0;
212 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
213
214 /* Journaled updates */
215 static char *journal_cur = NULL;
216 static char *journal_old = NULL;
217 static FILE *journal_fh = NULL;
218 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
219 static int journal_write(char *cmd, char *args);
220 static void journal_done(void);
221 static void journal_rotate(void);
222
223 /* 
224  * Functions
225  */
226 static void sig_common (const char *sig) /* {{{ */
227 {
228   RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
229   do_shutdown++;
230   pthread_cond_broadcast(&cache_cond);
231 } /* }}} void sig_common */
232
233 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
234 {
235   sig_common("INT");
236 } /* }}} void sig_int_handler */
237
238 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
239 {
240   sig_common("TERM");
241 } /* }}} void sig_term_handler */
242
243 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
244 {
245   config_flush_at_shutdown = 1;
246   sig_common("USR1");
247 } /* }}} void sig_usr1_handler */
248
249 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
250 {
251   config_flush_at_shutdown = 0;
252   sig_common("USR2");
253 } /* }}} void sig_usr2_handler */
254
255 static void install_signal_handlers(void) /* {{{ */
256 {
257   /* These structures are static, because `sigaction' behaves weird if the are
258    * overwritten.. */
259   static struct sigaction sa_int;
260   static struct sigaction sa_term;
261   static struct sigaction sa_pipe;
262   static struct sigaction sa_usr1;
263   static struct sigaction sa_usr2;
264
265   /* Install signal handlers */
266   memset (&sa_int, 0, sizeof (sa_int));
267   sa_int.sa_handler = sig_int_handler;
268   sigaction (SIGINT, &sa_int, NULL);
269
270   memset (&sa_term, 0, sizeof (sa_term));
271   sa_term.sa_handler = sig_term_handler;
272   sigaction (SIGTERM, &sa_term, NULL);
273
274   memset (&sa_pipe, 0, sizeof (sa_pipe));
275   sa_pipe.sa_handler = SIG_IGN;
276   sigaction (SIGPIPE, &sa_pipe, NULL);
277
278   memset (&sa_pipe, 0, sizeof (sa_usr1));
279   sa_usr1.sa_handler = sig_usr1_handler;
280   sigaction (SIGUSR1, &sa_usr1, NULL);
281
282   memset (&sa_usr2, 0, sizeof (sa_usr2));
283   sa_usr2.sa_handler = sig_usr2_handler;
284   sigaction (SIGUSR2, &sa_usr2, NULL);
285
286 } /* }}} void install_signal_handlers */
287
288 static int open_pidfile(void) /* {{{ */
289 {
290   int fd;
291   char *file;
292
293   file = (config_pid_file != NULL)
294     ? config_pid_file
295     : LOCALSTATEDIR "/run/rrdcached.pid";
296
297   fd = open(file, O_CREAT|O_EXCL|O_WRONLY, S_IRUSR|S_IRGRP|S_IROTH);
298   if (fd < 0)
299     fprintf(stderr, "FATAL: cannot create '%s' (%s)\n",
300             file, rrd_strerror(errno));
301
302   return(fd);
303 } /* }}} static int open_pidfile */
304
305 static int write_pidfile (int fd) /* {{{ */
306 {
307   pid_t pid;
308   FILE *fh;
309
310   pid = getpid ();
311
312   fh = fdopen (fd, "w");
313   if (fh == NULL)
314   {
315     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
316     close(fd);
317     return (-1);
318   }
319
320   fprintf (fh, "%i\n", (int) pid);
321   fclose (fh);
322
323   return (0);
324 } /* }}} int write_pidfile */
325
326 static int remove_pidfile (void) /* {{{ */
327 {
328   char *file;
329   int status;
330
331   file = (config_pid_file != NULL)
332     ? config_pid_file
333     : LOCALSTATEDIR "/run/rrdcached.pid";
334
335   status = unlink (file);
336   if (status == 0)
337     return (0);
338   return (errno);
339 } /* }}} int remove_pidfile */
340
341 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
342 {
343   char *eol;
344
345   eol = memchr(sock->rbuf + sock->next_cmd, '\n',
346                sock->next_read - sock->next_cmd);
347
348   if (eol == NULL)
349   {
350     /* no commands left, move remainder back to front of rbuf */
351     memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
352             sock->next_read - sock->next_cmd);
353     sock->next_read -= sock->next_cmd;
354     sock->next_cmd = 0;
355     *len = 0;
356     return NULL;
357   }
358   else
359   {
360     char *cmd = sock->rbuf + sock->next_cmd;
361     *eol = '\0';
362
363     sock->next_cmd = eol - sock->rbuf + 1;
364
365     if (eol > sock->rbuf && *(eol-1) == '\r')
366       *(--eol) = '\0'; /* handle "\r\n" EOL */
367
368     *len = eol - cmd;
369
370     return cmd;
371   }
372
373   /* NOTREACHED */
374   assert(1==0);
375 }
376
377 /* add the characters directly to the write buffer */
378 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
379 {
380   char *new_buf;
381
382   assert(sock != NULL);
383
384   new_buf = realloc(sock->wbuf, sock->wbuf_len + len + 1);
385   if (new_buf == NULL)
386   {
387     RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
388     return -1;
389   }
390
391   strncpy(new_buf + sock->wbuf_len, str, len + 1);
392
393   sock->wbuf = new_buf;
394   sock->wbuf_len += len;
395
396   return 0;
397 } /* }}} static int add_to_wbuf */
398
399 /* add the text to the "extra" info that's sent after the status line */
400 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
401 {
402   va_list argp;
403   char buffer[CMD_MAX];
404   int len;
405
406   if (sock == NULL) return 0; /* journal replay mode */
407   if (sock->batch_mode) return 0; /* no extra info returned when in BATCH */
408
409   va_start(argp, fmt);
410 #ifdef HAVE_VSNPRINTF
411   len = vsnprintf(buffer, sizeof(buffer)-1, fmt, argp);
412 #else
413   len = vsprintf(buffer, fmt, argp);
414 #endif
415   va_end(argp);
416   if (len < 0)
417   {
418     RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
419     return -1;
420   }
421
422   return add_to_wbuf(sock, buffer, len);
423 } /* }}} static int add_response_info */
424
425 static int count_lines(char *str) /* {{{ */
426 {
427   int lines = 0;
428
429   if (str != NULL)
430   {
431     while ((str = strchr(str, '\n')) != NULL)
432     {
433       ++lines;
434       ++str;
435     }
436   }
437
438   return lines;
439 } /* }}} static int count_lines */
440
441 /* send the response back to the user.
442  * returns 0 on success, -1 on error
443  * write buffer is always zeroed after this call */
444 static int send_response (listen_socket_t *sock, response_code rc,
445                           char *fmt, ...) /* {{{ */
446 {
447   va_list argp;
448   char buffer[CMD_MAX];
449   int lines;
450   ssize_t wrote;
451   int rclen, len;
452
453   if (sock == NULL) return rc;  /* journal replay mode */
454
455   if (sock->batch_mode)
456   {
457     if (rc == RESP_OK)
458       return rc; /* no response on success during BATCH */
459     lines = sock->batch_cmd;
460   }
461   else if (rc == RESP_OK)
462     lines = count_lines(sock->wbuf);
463   else
464     lines = -1;
465
466   rclen = sprintf(buffer, "%d ", lines);
467   va_start(argp, fmt);
468 #ifdef HAVE_VSNPRINTF
469   len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen-1, fmt, argp);
470 #else
471   len = vsprintf(buffer+rclen, fmt, argp);
472 #endif
473   va_end(argp);
474   if (len < 0)
475     return -1;
476
477   len += rclen;
478
479   /* append the result to the wbuf, don't write to the user */
480   if (sock->batch_mode)
481     return add_to_wbuf(sock, buffer, len);
482
483   /* first write must be complete */
484   if (len != write(sock->fd, buffer, len))
485   {
486     RRDD_LOG(LOG_INFO, "send_response: could not write status message");
487     return -1;
488   }
489
490   if (sock->wbuf != NULL)
491   {
492     wrote = 0;
493     while (wrote < sock->wbuf_len)
494     {
495       ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
496       if (wb <= 0)
497       {
498         RRDD_LOG(LOG_INFO, "send_response: could not write results");
499         return -1;
500       }
501       wrote += wb;
502     }
503   }
504
505   free(sock->wbuf); sock->wbuf = NULL;
506   sock->wbuf_len = 0;
507
508   return 0;
509 } /* }}} */
510
511 static void wipe_ci_values(cache_item_t *ci, time_t when)
512 {
513   ci->values = NULL;
514   ci->values_num = 0;
515
516   ci->last_flush_time = when;
517   if (config_write_jitter > 0)
518     ci->last_flush_time += (random() % config_write_jitter);
519 }
520
521 /* remove_from_queue
522  * remove a "cache_item_t" item from the queue.
523  * must hold 'cache_lock' when calling this
524  */
525 static void remove_from_queue(cache_item_t *ci) /* {{{ */
526 {
527   if (ci == NULL) return;
528
529   if (ci->prev == NULL)
530     cache_queue_head = ci->next; /* reset head */
531   else
532     ci->prev->next = ci->next;
533
534   if (ci->next == NULL)
535     cache_queue_tail = ci->prev; /* reset the tail */
536   else
537     ci->next->prev = ci->prev;
538
539   ci->next = ci->prev = NULL;
540   ci->flags &= ~CI_FLAGS_IN_QUEUE;
541 } /* }}} static void remove_from_queue */
542
543 /*
544  * enqueue_cache_item:
545  * `cache_lock' must be acquired before calling this function!
546  */
547 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
548     queue_side_t side)
549 {
550   if (ci == NULL)
551     return (-1);
552
553   if (ci->values_num == 0)
554     return (0);
555
556   if (side == HEAD)
557   {
558     if (cache_queue_head == ci)
559       return 0;
560
561     /* remove from the double linked list */
562     if (ci->flags & CI_FLAGS_IN_QUEUE)
563       remove_from_queue(ci);
564
565     ci->prev = NULL;
566     ci->next = cache_queue_head;
567     if (ci->next != NULL)
568       ci->next->prev = ci;
569     cache_queue_head = ci;
570
571     if (cache_queue_tail == NULL)
572       cache_queue_tail = cache_queue_head;
573   }
574   else /* (side == TAIL) */
575   {
576     /* We don't move values back in the list.. */
577     if (ci->flags & CI_FLAGS_IN_QUEUE)
578       return (0);
579
580     assert (ci->next == NULL);
581     assert (ci->prev == NULL);
582
583     ci->prev = cache_queue_tail;
584
585     if (cache_queue_tail == NULL)
586       cache_queue_head = ci;
587     else
588       cache_queue_tail->next = ci;
589
590     cache_queue_tail = ci;
591   }
592
593   ci->flags |= CI_FLAGS_IN_QUEUE;
594
595   pthread_cond_broadcast(&cache_cond);
596   pthread_mutex_lock (&stats_lock);
597   stats_queue_length++;
598   pthread_mutex_unlock (&stats_lock);
599
600   return (0);
601 } /* }}} int enqueue_cache_item */
602
603 /*
604  * tree_callback_flush:
605  * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
606  * while this is in progress.
607  */
608 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
609     gpointer data)
610 {
611   cache_item_t *ci;
612   callback_flush_data_t *cfd;
613
614   ci = (cache_item_t *) value;
615   cfd = (callback_flush_data_t *) data;
616
617   if ((ci->last_flush_time <= cfd->abs_timeout)
618       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
619       && (ci->values_num > 0))
620   {
621     enqueue_cache_item (ci, TAIL);
622   }
623   else if ((do_shutdown != 0)
624       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
625       && (ci->values_num > 0))
626   {
627     enqueue_cache_item (ci, TAIL);
628   }
629   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
630       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
631       && (ci->values_num <= 0))
632   {
633     char **temp;
634
635     temp = (char **) realloc (cfd->keys,
636         sizeof (char *) * (cfd->keys_num + 1));
637     if (temp == NULL)
638     {
639       RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed.");
640       return (FALSE);
641     }
642     cfd->keys = temp;
643     /* Make really sure this points to the _same_ place */
644     assert ((char *) key == ci->file);
645     cfd->keys[cfd->keys_num] = (char *) key;
646     cfd->keys_num++;
647   }
648
649   return (FALSE);
650 } /* }}} gboolean tree_callback_flush */
651
652 static int flush_old_values (int max_age)
653 {
654   callback_flush_data_t cfd;
655   size_t k;
656
657   memset (&cfd, 0, sizeof (cfd));
658   /* Pass the current time as user data so that we don't need to call
659    * `time' for each node. */
660   cfd.now = time (NULL);
661   cfd.keys = NULL;
662   cfd.keys_num = 0;
663
664   if (max_age > 0)
665     cfd.abs_timeout = cfd.now - max_age;
666   else
667     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
668
669   /* `tree_callback_flush' will return the keys of all values that haven't
670    * been touched in the last `config_flush_interval' seconds in `cfd'.
671    * The char*'s in this array point to the same memory as ci->file, so we
672    * don't need to free them separately. */
673   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
674
675   for (k = 0; k < cfd.keys_num; k++)
676   {
677     cache_item_t *ci;
678
679     /* This must not fail. */
680     ci = (cache_item_t *) g_tree_lookup (cache_tree, cfd.keys[k]);
681     assert (ci != NULL);
682
683     /* If we end up here with values available, something's seriously
684      * messed up. */
685     assert (ci->values_num == 0);
686
687     /* Remove the node from the tree */
688     g_tree_remove (cache_tree, cfd.keys[k]);
689     cfd.keys[k] = NULL;
690
691     /* Now free and clean up `ci'. */
692     free (ci->file);
693     ci->file = NULL;
694     free (ci);
695     ci = NULL;
696   } /* for (k = 0; k < cfd.keys_num; k++) */
697
698   if (cfd.keys != NULL)
699   {
700     free (cfd.keys);
701     cfd.keys = NULL;
702   }
703
704   return (0);
705 } /* int flush_old_values */
706
707 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
708 {
709   struct timeval now;
710   struct timespec next_flush;
711   int final_flush = 0; /* make sure we only flush once on shutdown */
712
713   gettimeofday (&now, NULL);
714   next_flush.tv_sec = now.tv_sec + config_flush_interval;
715   next_flush.tv_nsec = 1000 * now.tv_usec;
716
717   pthread_mutex_lock (&cache_lock);
718   while ((do_shutdown == 0) || (cache_queue_head != NULL))
719   {
720     cache_item_t *ci;
721     char *file;
722     char **values;
723     int values_num;
724     int status;
725     int i;
726
727     /* First, check if it's time to do the cache flush. */
728     gettimeofday (&now, NULL);
729     if ((now.tv_sec > next_flush.tv_sec)
730         || ((now.tv_sec == next_flush.tv_sec)
731           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
732     {
733       /* Flush all values that haven't been written in the last
734        * `config_write_interval' seconds. */
735       flush_old_values (config_write_interval);
736
737       /* Determine the time of the next cache flush. */
738       while (next_flush.tv_sec <= now.tv_sec)
739         next_flush.tv_sec += config_flush_interval;
740
741       /* unlock the cache while we rotate so we don't block incoming
742        * updates if the fsync() blocks on disk I/O */
743       pthread_mutex_unlock(&cache_lock);
744       journal_rotate();
745       pthread_mutex_lock(&cache_lock);
746     }
747
748     /* Now, check if there's something to store away. If not, wait until
749      * something comes in or it's time to do the cache flush.  if we are
750      * shutting down, do not wait around.  */
751     if (cache_queue_head == NULL && !do_shutdown)
752     {
753       status = pthread_cond_timedwait (&cache_cond, &cache_lock, &next_flush);
754       if ((status != 0) && (status != ETIMEDOUT))
755       {
756         RRDD_LOG (LOG_ERR, "queue_thread_main: "
757             "pthread_cond_timedwait returned %i.", status);
758       }
759     }
760
761     /* We're about to shut down */
762     if (do_shutdown != 0 && !final_flush++)
763     {
764       if (config_flush_at_shutdown)
765         flush_old_values (-1); /* flush everything */
766       else
767         break;
768     }
769
770     /* Check if a value has arrived. This may be NULL if we timed out or there
771      * was an interrupt such as a signal. */
772     if (cache_queue_head == NULL)
773       continue;
774
775     ci = cache_queue_head;
776
777     /* copy the relevant parts */
778     file = strdup (ci->file);
779     if (file == NULL)
780     {
781       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
782       continue;
783     }
784
785     assert(ci->values != NULL);
786     assert(ci->values_num > 0);
787
788     values = ci->values;
789     values_num = ci->values_num;
790
791     wipe_ci_values(ci, time(NULL));
792     remove_from_queue(ci);
793
794     pthread_mutex_lock (&stats_lock);
795     assert (stats_queue_length > 0);
796     stats_queue_length--;
797     pthread_mutex_unlock (&stats_lock);
798
799     pthread_mutex_unlock (&cache_lock);
800
801     rrd_clear_error ();
802     status = rrd_update_r (file, NULL, values_num, (void *) values);
803     if (status != 0)
804     {
805       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
806           "rrd_update_r (%s) failed with status %i. (%s)",
807           file, status, rrd_get_error());
808     }
809
810     journal_write("wrote", file);
811     pthread_cond_broadcast(&ci->flushed);
812
813     for (i = 0; i < values_num; i++)
814       free (values[i]);
815
816     free(values);
817     free(file);
818
819     if (status == 0)
820     {
821       pthread_mutex_lock (&stats_lock);
822       stats_updates_written++;
823       stats_data_sets_written += values_num;
824       pthread_mutex_unlock (&stats_lock);
825     }
826
827     pthread_mutex_lock (&cache_lock);
828
829     /* We're about to shut down */
830     if (do_shutdown != 0 && !final_flush++)
831     {
832       if (config_flush_at_shutdown)
833           flush_old_values (-1); /* flush everything */
834       else
835         break;
836     }
837   } /* while ((do_shutdown == 0) || (cache_queue_head != NULL)) */
838   pthread_mutex_unlock (&cache_lock);
839
840   if (config_flush_at_shutdown)
841   {
842     assert(cache_queue_head == NULL);
843     RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
844   }
845
846   journal_done();
847
848   return (NULL);
849 } /* }}} void *queue_thread_main */
850
851 static int buffer_get_field (char **buffer_ret, /* {{{ */
852     size_t *buffer_size_ret, char **field_ret)
853 {
854   char *buffer;
855   size_t buffer_pos;
856   size_t buffer_size;
857   char *field;
858   size_t field_size;
859   int status;
860
861   buffer = *buffer_ret;
862   buffer_pos = 0;
863   buffer_size = *buffer_size_ret;
864   field = *buffer_ret;
865   field_size = 0;
866
867   if (buffer_size <= 0)
868     return (-1);
869
870   /* This is ensured by `handle_request'. */
871   assert (buffer[buffer_size - 1] == '\0');
872
873   status = -1;
874   while (buffer_pos < buffer_size)
875   {
876     /* Check for end-of-field or end-of-buffer */
877     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
878     {
879       field[field_size] = 0;
880       field_size++;
881       buffer_pos++;
882       status = 0;
883       break;
884     }
885     /* Handle escaped characters. */
886     else if (buffer[buffer_pos] == '\\')
887     {
888       if (buffer_pos >= (buffer_size - 1))
889         break;
890       buffer_pos++;
891       field[field_size] = buffer[buffer_pos];
892       field_size++;
893       buffer_pos++;
894     }
895     /* Normal operation */ 
896     else
897     {
898       field[field_size] = buffer[buffer_pos];
899       field_size++;
900       buffer_pos++;
901     }
902   } /* while (buffer_pos < buffer_size) */
903
904   if (status != 0)
905     return (status);
906
907   *buffer_ret = buffer + buffer_pos;
908   *buffer_size_ret = buffer_size - buffer_pos;
909   *field_ret = field;
910
911   return (0);
912 } /* }}} int buffer_get_field */
913
914 /* if we're restricting writes to the base directory,
915  * check whether the file falls within the dir
916  * returns 1 if OK, otherwise 0
917  */
918 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
919 {
920   assert(file != NULL);
921
922   if (!config_write_base_only
923       || sock == NULL /* journal replay */
924       || config_base_dir == NULL)
925     return 1;
926
927   if (strstr(file, "../") != NULL) goto err;
928
929   /* relative paths without "../" are ok */
930   if (*file != '/') return 1;
931
932   /* file must be of the format base + "/" + <1+ char filename> */
933   if (strlen(file) < _config_base_dir_len + 2) goto err;
934   if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
935   if (*(file + _config_base_dir_len) != '/') goto err;
936
937   return 1;
938
939 err:
940   if (sock != NULL && sock->fd >= 0)
941     send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
942
943   return 0;
944 } /* }}} static int check_file_access */
945
946 static int flush_file (const char *filename) /* {{{ */
947 {
948   cache_item_t *ci;
949
950   pthread_mutex_lock (&cache_lock);
951
952   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
953   if (ci == NULL)
954   {
955     pthread_mutex_unlock (&cache_lock);
956     return (ENOENT);
957   }
958
959   if (ci->values_num > 0)
960   {
961     /* Enqueue at head */
962     enqueue_cache_item (ci, HEAD);
963     pthread_cond_wait(&ci->flushed, &cache_lock);
964   }
965
966   pthread_mutex_unlock(&cache_lock);
967
968   return (0);
969 } /* }}} int flush_file */
970
971 static int handle_request_help (listen_socket_t *sock, /* {{{ */
972     char *buffer, size_t buffer_size)
973 {
974   int status;
975   char **help_text;
976   char *command;
977
978   char *help_help[2] =
979   {
980     "Command overview\n"
981     ,
982     "FLUSH <filename>\n"
983     "FLUSHALL\n"
984     "HELP [<command>]\n"
985     "UPDATE <filename> <values> [<values> ...]\n"
986     "BATCH\n"
987     "STATS\n"
988   };
989
990   char *help_flush[2] =
991   {
992     "Help for FLUSH\n"
993     ,
994     "Usage: FLUSH <filename>\n"
995     "\n"
996     "Adds the given filename to the head of the update queue and returns\n"
997     "after is has been dequeued.\n"
998   };
999
1000   char *help_flushall[2] =
1001   {
1002     "Help for FLUSHALL\n"
1003     ,
1004     "Usage: FLUSHALL\n"
1005     "\n"
1006     "Triggers writing of all pending updates.  Returns immediately.\n"
1007   };
1008
1009   char *help_update[2] =
1010   {
1011     "Help for UPDATE\n"
1012     ,
1013     "Usage: UPDATE <filename> <values> [<values> ...]\n"
1014     "\n"
1015     "Adds the given file to the internal cache if it is not yet known and\n"
1016     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1017     "for details.\n"
1018     "\n"
1019     "Each <values> has the following form:\n"
1020     "  <values> = <time>:<value>[:<value>[...]]\n"
1021     "See the rrdupdate(1) manpage for details.\n"
1022   };
1023
1024   char *help_stats[2] =
1025   {
1026     "Help for STATS\n"
1027     ,
1028     "Usage: STATS\n"
1029     "\n"
1030     "Returns some performance counters, see the rrdcached(1) manpage for\n"
1031     "a description of the values.\n"
1032   };
1033
1034   char *help_batch[2] =
1035   {
1036     "Help for BATCH\n"
1037     ,
1038     "The 'BATCH' command permits the client to initiate a bulk load\n"
1039     "   of commands to rrdcached.\n"
1040     "\n"
1041     "Usage:\n"
1042     "\n"
1043     "    client: BATCH\n"
1044     "    server: 0 Go ahead.  End with dot '.' on its own line.\n"
1045     "    client: command #1\n"
1046     "    client: command #2\n"
1047     "    client: ... and so on\n"
1048     "    client: .\n"
1049     "    server: 2 errors\n"
1050     "    server: 7 message for command #7\n"
1051     "    server: 9 message for command #9\n"
1052     "\n"
1053     "For more information, consult the rrdcached(1) documentation.\n"
1054   };
1055
1056   status = buffer_get_field (&buffer, &buffer_size, &command);
1057   if (status != 0)
1058     help_text = help_help;
1059   else
1060   {
1061     if (strcasecmp (command, "update") == 0)
1062       help_text = help_update;
1063     else if (strcasecmp (command, "flush") == 0)
1064       help_text = help_flush;
1065     else if (strcasecmp (command, "flushall") == 0)
1066       help_text = help_flushall;
1067     else if (strcasecmp (command, "stats") == 0)
1068       help_text = help_stats;
1069     else if (strcasecmp (command, "batch") == 0)
1070       help_text = help_batch;
1071     else
1072       help_text = help_help;
1073   }
1074
1075   add_response_info(sock, help_text[1]);
1076   return send_response(sock, RESP_OK, help_text[0]);
1077 } /* }}} int handle_request_help */
1078
1079 static int handle_request_stats (listen_socket_t *sock) /* {{{ */
1080 {
1081   uint64_t copy_queue_length;
1082   uint64_t copy_updates_received;
1083   uint64_t copy_flush_received;
1084   uint64_t copy_updates_written;
1085   uint64_t copy_data_sets_written;
1086   uint64_t copy_journal_bytes;
1087   uint64_t copy_journal_rotate;
1088
1089   uint64_t tree_nodes_number;
1090   uint64_t tree_depth;
1091
1092   pthread_mutex_lock (&stats_lock);
1093   copy_queue_length       = stats_queue_length;
1094   copy_updates_received   = stats_updates_received;
1095   copy_flush_received     = stats_flush_received;
1096   copy_updates_written    = stats_updates_written;
1097   copy_data_sets_written  = stats_data_sets_written;
1098   copy_journal_bytes      = stats_journal_bytes;
1099   copy_journal_rotate     = stats_journal_rotate;
1100   pthread_mutex_unlock (&stats_lock);
1101
1102   pthread_mutex_lock (&cache_lock);
1103   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1104   tree_depth        = (uint64_t) g_tree_height (cache_tree);
1105   pthread_mutex_unlock (&cache_lock);
1106
1107   add_response_info(sock,
1108                     "QueueLength: %"PRIu64"\n", copy_queue_length);
1109   add_response_info(sock,
1110                     "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1111   add_response_info(sock,
1112                     "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1113   add_response_info(sock,
1114                     "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1115   add_response_info(sock,
1116                     "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1117   add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1118   add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1119   add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1120   add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1121
1122   send_response(sock, RESP_OK, "Statistics follow\n");
1123
1124   return (0);
1125 } /* }}} int handle_request_stats */
1126
1127 static int handle_request_flush (listen_socket_t *sock, /* {{{ */
1128     char *buffer, size_t buffer_size)
1129 {
1130   char *file;
1131   int status;
1132
1133   status = buffer_get_field (&buffer, &buffer_size, &file);
1134   if (status != 0)
1135   {
1136     return send_response(sock, RESP_ERR, "Usage: flush <filename>\n");
1137   }
1138   else
1139   {
1140     pthread_mutex_lock(&stats_lock);
1141     stats_flush_received++;
1142     pthread_mutex_unlock(&stats_lock);
1143
1144     if (!check_file_access(file, sock)) return 0;
1145
1146     status = flush_file (file);
1147     if (status == 0)
1148       return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1149     else if (status == ENOENT)
1150     {
1151       /* no file in our tree; see whether it exists at all */
1152       struct stat statbuf;
1153
1154       memset(&statbuf, 0, sizeof(statbuf));
1155       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1156         return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1157       else
1158         return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1159     }
1160     else if (status < 0)
1161       return send_response(sock, RESP_ERR, "Internal error.\n");
1162     else
1163       return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1164   }
1165
1166   /* NOTREACHED */
1167   assert(1==0);
1168 } /* }}} int handle_request_slurp */
1169
1170 static int handle_request_flushall(listen_socket_t *sock) /* {{{ */
1171 {
1172
1173   RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1174
1175   pthread_mutex_lock(&cache_lock);
1176   flush_old_values(-1);
1177   pthread_mutex_unlock(&cache_lock);
1178
1179   return send_response(sock, RESP_OK, "Started flush.\n");
1180 } /* }}} static int handle_request_flushall */
1181
1182 static int handle_request_update (listen_socket_t *sock, /* {{{ */
1183     char *buffer, size_t buffer_size)
1184 {
1185   char *file;
1186   int values_num = 0;
1187   int status;
1188
1189   time_t now;
1190   cache_item_t *ci;
1191
1192   now = time (NULL);
1193
1194   status = buffer_get_field (&buffer, &buffer_size, &file);
1195   if (status != 0)
1196     return send_response(sock, RESP_ERR,
1197                          "Usage: UPDATE <filename> <values> [<values> ...]\n");
1198
1199   pthread_mutex_lock(&stats_lock);
1200   stats_updates_received++;
1201   pthread_mutex_unlock(&stats_lock);
1202
1203   if (!check_file_access(file, sock)) return 0;
1204
1205   pthread_mutex_lock (&cache_lock);
1206   ci = g_tree_lookup (cache_tree, file);
1207
1208   if (ci == NULL) /* {{{ */
1209   {
1210     struct stat statbuf;
1211
1212     /* don't hold the lock while we setup; stat(2) might block */
1213     pthread_mutex_unlock(&cache_lock);
1214
1215     memset (&statbuf, 0, sizeof (statbuf));
1216     status = stat (file, &statbuf);
1217     if (status != 0)
1218     {
1219       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1220
1221       status = errno;
1222       if (status == ENOENT)
1223         return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1224       else
1225         return send_response(sock, RESP_ERR,
1226                              "stat failed with error %i.\n", status);
1227     }
1228     if (!S_ISREG (statbuf.st_mode))
1229       return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1230
1231     if (access(file, R_OK|W_OK) != 0)
1232       return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1233                            file, rrd_strerror(errno));
1234
1235     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1236     if (ci == NULL)
1237     {
1238       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1239
1240       return send_response(sock, RESP_ERR, "malloc failed.\n");
1241     }
1242     memset (ci, 0, sizeof (cache_item_t));
1243
1244     ci->file = strdup (file);
1245     if (ci->file == NULL)
1246     {
1247       free (ci);
1248       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1249
1250       return send_response(sock, RESP_ERR, "strdup failed.\n");
1251     }
1252
1253     wipe_ci_values(ci, now);
1254     ci->flags = CI_FLAGS_IN_TREE;
1255
1256     pthread_mutex_lock(&cache_lock);
1257     g_tree_insert (cache_tree, (void *) ci->file, (void *) ci);
1258   } /* }}} */
1259   assert (ci != NULL);
1260
1261   while (buffer_size > 0)
1262   {
1263     char **temp;
1264     char *value;
1265
1266     status = buffer_get_field (&buffer, &buffer_size, &value);
1267     if (status != 0)
1268     {
1269       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1270       break;
1271     }
1272
1273     temp = (char **) realloc (ci->values,
1274         sizeof (char *) * (ci->values_num + 1));
1275     if (temp == NULL)
1276     {
1277       RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
1278       continue;
1279     }
1280     ci->values = temp;
1281
1282     ci->values[ci->values_num] = strdup (value);
1283     if (ci->values[ci->values_num] == NULL)
1284     {
1285       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1286       continue;
1287     }
1288     ci->values_num++;
1289
1290     values_num++;
1291   }
1292
1293   if (((now - ci->last_flush_time) >= config_write_interval)
1294       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1295       && (ci->values_num > 0))
1296   {
1297     enqueue_cache_item (ci, TAIL);
1298   }
1299
1300   pthread_mutex_unlock (&cache_lock);
1301
1302   if (values_num < 1)
1303     return send_response(sock, RESP_ERR, "No values updated.\n");
1304   else
1305     return send_response(sock, RESP_OK, "Enqueued %i value(s).\n", values_num);
1306
1307   /* NOTREACHED */
1308   assert(1==0);
1309
1310 } /* }}} int handle_request_update */
1311
1312 /* we came across a "WROTE" entry during journal replay.
1313  * throw away any values that we have accumulated for this file
1314  */
1315 static int handle_request_wrote (const char *buffer) /* {{{ */
1316 {
1317   int i;
1318   cache_item_t *ci;
1319   const char *file = buffer;
1320
1321   pthread_mutex_lock(&cache_lock);
1322
1323   ci = g_tree_lookup(cache_tree, file);
1324   if (ci == NULL)
1325   {
1326     pthread_mutex_unlock(&cache_lock);
1327     return (0);
1328   }
1329
1330   if (ci->values)
1331   {
1332     for (i=0; i < ci->values_num; i++)
1333       free(ci->values[i]);
1334
1335     free(ci->values);
1336   }
1337
1338   wipe_ci_values(ci, time(NULL));
1339   remove_from_queue(ci);
1340
1341   pthread_mutex_unlock(&cache_lock);
1342   return (0);
1343 } /* }}} int handle_request_wrote */
1344
1345 /* start "BATCH" processing */
1346 static int batch_start (listen_socket_t *sock) /* {{{ */
1347 {
1348   int status;
1349   if (sock->batch_mode)
1350     return send_response(sock, RESP_ERR, "Already in BATCH\n");
1351
1352   status = send_response(sock, RESP_OK,
1353                          "Go ahead.  End with dot '.' on its own line.\n");
1354   sock->batch_mode = 1;
1355   sock->batch_cmd = 0;
1356
1357   return status;
1358 } /* }}} static int batch_start */
1359
1360 /* finish "BATCH" processing and return results to the client */
1361 static int batch_done (listen_socket_t *sock) /* {{{ */
1362 {
1363   assert(sock->batch_mode);
1364   sock->batch_mode = 0;
1365   sock->batch_cmd  = 0;
1366   return send_response(sock, RESP_OK, "errors\n");
1367 } /* }}} static int batch_done */
1368
1369 /* returns 1 if we have the required privilege level */
1370 static int has_privilege (listen_socket_t *sock, /* {{{ */
1371                           socket_privilege priv)
1372 {
1373   if (sock == NULL) /* journal replay */
1374     return 1;
1375
1376   if (sock->privilege >= priv)
1377     return 1;
1378
1379   return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1380 } /* }}} static int has_privilege */
1381
1382 /* if sock==NULL, we are in journal replay mode */
1383 static int handle_request (listen_socket_t *sock, /* {{{ */
1384                            char *buffer, size_t buffer_size)
1385 {
1386   char *buffer_ptr;
1387   char *command;
1388   int status;
1389
1390   assert (buffer[buffer_size - 1] == '\0');
1391
1392   buffer_ptr = buffer;
1393   command = NULL;
1394   status = buffer_get_field (&buffer_ptr, &buffer_size, &command);
1395   if (status != 0)
1396   {
1397     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1398     return (-1);
1399   }
1400
1401   if (sock != NULL && sock->batch_mode)
1402     sock->batch_cmd++;
1403
1404   if (strcasecmp (command, "update") == 0)
1405   {
1406     status = has_privilege(sock, PRIV_HIGH);
1407     if (status <= 0)
1408       return status;
1409
1410     /* don't re-write updates in replay mode */
1411     if (sock != NULL)
1412       journal_write(command, buffer_ptr);
1413
1414     return (handle_request_update (sock, buffer_ptr, buffer_size));
1415   }
1416   else if (strcasecmp (command, "wrote") == 0 && sock == NULL)
1417   {
1418     /* this is only valid in replay mode */
1419     return (handle_request_wrote (buffer_ptr));
1420   }
1421   else if (strcasecmp (command, "flush") == 0)
1422     return (handle_request_flush (sock, buffer_ptr, buffer_size));
1423   else if (strcasecmp (command, "flushall") == 0)
1424   {
1425     status = has_privilege(sock, PRIV_HIGH);
1426     if (status <= 0)
1427       return status;
1428
1429     return (handle_request_flushall(sock));
1430   }
1431   else if (strcasecmp (command, "stats") == 0)
1432     return (handle_request_stats (sock));
1433   else if (strcasecmp (command, "help") == 0)
1434     return (handle_request_help (sock, buffer_ptr, buffer_size));
1435   else if (strcasecmp (command, "batch") == 0 && sock != NULL)
1436     return batch_start(sock);
1437   else if (strcasecmp (command, ".") == 0 && sock != NULL && sock->batch_mode)
1438     return batch_done(sock);
1439   else
1440     return send_response(sock, RESP_ERR, "Unknown command: %s\n", command);
1441
1442   /* NOTREACHED */
1443   assert(1==0);
1444 } /* }}} int handle_request */
1445
1446 /* MUST NOT hold journal_lock before calling this */
1447 static void journal_rotate(void) /* {{{ */
1448 {
1449   FILE *old_fh = NULL;
1450   int new_fd;
1451
1452   if (journal_cur == NULL || journal_old == NULL)
1453     return;
1454
1455   pthread_mutex_lock(&journal_lock);
1456
1457   /* we rotate this way (rename before close) so that the we can release
1458    * the journal lock as fast as possible.  Journal writes to the new
1459    * journal can proceed immediately after the new file is opened.  The
1460    * fclose can then block without affecting new updates.
1461    */
1462   if (journal_fh != NULL)
1463   {
1464     old_fh = journal_fh;
1465     journal_fh = NULL;
1466     rename(journal_cur, journal_old);
1467     ++stats_journal_rotate;
1468   }
1469
1470   new_fd = open(journal_cur, O_WRONLY|O_CREAT|O_APPEND,
1471                 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1472   if (new_fd >= 0)
1473   {
1474     journal_fh = fdopen(new_fd, "a");
1475     if (journal_fh == NULL)
1476       close(new_fd);
1477   }
1478
1479   pthread_mutex_unlock(&journal_lock);
1480
1481   if (old_fh != NULL)
1482     fclose(old_fh);
1483
1484   if (journal_fh == NULL)
1485   {
1486     RRDD_LOG(LOG_CRIT,
1487              "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1488              journal_cur, rrd_strerror(errno));
1489
1490     RRDD_LOG(LOG_ERR,
1491              "JOURNALING DISABLED: All values will be flushed at shutdown");
1492     config_flush_at_shutdown = 1;
1493   }
1494
1495 } /* }}} static void journal_rotate */
1496
1497 static void journal_done(void) /* {{{ */
1498 {
1499   if (journal_cur == NULL)
1500     return;
1501
1502   pthread_mutex_lock(&journal_lock);
1503   if (journal_fh != NULL)
1504   {
1505     fclose(journal_fh);
1506     journal_fh = NULL;
1507   }
1508
1509   if (config_flush_at_shutdown)
1510   {
1511     RRDD_LOG(LOG_INFO, "removing journals");
1512     unlink(journal_old);
1513     unlink(journal_cur);
1514   }
1515   else
1516   {
1517     RRDD_LOG(LOG_INFO, "expedited shutdown; "
1518              "journals will be used at next startup");
1519   }
1520
1521   pthread_mutex_unlock(&journal_lock);
1522
1523 } /* }}} static void journal_done */
1524
1525 static int journal_write(char *cmd, char *args) /* {{{ */
1526 {
1527   int chars;
1528
1529   if (journal_fh == NULL)
1530     return 0;
1531
1532   pthread_mutex_lock(&journal_lock);
1533   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1534   pthread_mutex_unlock(&journal_lock);
1535
1536   if (chars > 0)
1537   {
1538     pthread_mutex_lock(&stats_lock);
1539     stats_journal_bytes += chars;
1540     pthread_mutex_unlock(&stats_lock);
1541   }
1542
1543   return chars;
1544 } /* }}} static int journal_write */
1545
1546 static int journal_replay (const char *file) /* {{{ */
1547 {
1548   FILE *fh;
1549   int entry_cnt = 0;
1550   int fail_cnt = 0;
1551   uint64_t line = 0;
1552   char entry[CMD_MAX];
1553
1554   if (file == NULL) return 0;
1555
1556   {
1557     char *reason;
1558     int status = 0;
1559     struct stat statbuf;
1560
1561     memset(&statbuf, 0, sizeof(statbuf));
1562     if (stat(file, &statbuf) != 0)
1563     {
1564       if (errno == ENOENT)
1565         return 0;
1566
1567       reason = "stat error";
1568       status = errno;
1569     }
1570     else if (!S_ISREG(statbuf.st_mode))
1571     {
1572       reason = "not a regular file";
1573       status = EPERM;
1574     }
1575     if (statbuf.st_uid != daemon_uid)
1576     {
1577       reason = "not owned by daemon user";
1578       status = EACCES;
1579     }
1580     if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
1581     {
1582       reason = "must not be user/group writable";
1583       status = EACCES;
1584     }
1585
1586     if (status != 0)
1587     {
1588       RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
1589                file, rrd_strerror(status), reason);
1590       return 0;
1591     }
1592   }
1593
1594   fh = fopen(file, "r");
1595   if (fh == NULL)
1596   {
1597     if (errno != ENOENT)
1598       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1599                file, rrd_strerror(errno));
1600     return 0;
1601   }
1602   else
1603     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1604
1605   while(!feof(fh))
1606   {
1607     size_t entry_len;
1608
1609     ++line;
1610     if (fgets(entry, sizeof(entry), fh) == NULL)
1611       break;
1612     entry_len = strlen(entry);
1613
1614     /* check \n termination in case journal writing crashed mid-line */
1615     if (entry_len == 0)
1616       continue;
1617     else if (entry[entry_len - 1] != '\n')
1618     {
1619       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1620       ++fail_cnt;
1621       continue;
1622     }
1623
1624     entry[entry_len - 1] = '\0';
1625
1626     if (handle_request(NULL, entry, entry_len) == 0)
1627       ++entry_cnt;
1628     else
1629       ++fail_cnt;
1630   }
1631
1632   fclose(fh);
1633
1634   RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1635            entry_cnt, fail_cnt);
1636
1637   return entry_cnt > 0 ? 1 : 0;
1638 } /* }}} static int journal_replay */
1639
1640 static void journal_init(void) /* {{{ */
1641 {
1642   int had_journal = 0;
1643
1644   if (journal_cur == NULL) return;
1645
1646   pthread_mutex_lock(&journal_lock);
1647
1648   RRDD_LOG(LOG_INFO, "checking for journal files");
1649
1650   had_journal += journal_replay(journal_old);
1651   had_journal += journal_replay(journal_cur);
1652
1653   /* it must have been a crash.  start a flush */
1654   if (had_journal && config_flush_at_shutdown)
1655     flush_old_values(-1);
1656
1657   pthread_mutex_unlock(&journal_lock);
1658   journal_rotate();
1659
1660   RRDD_LOG(LOG_INFO, "journal processing complete");
1661
1662 } /* }}} static void journal_init */
1663
1664 static void close_connection(listen_socket_t *sock)
1665 {
1666   close(sock->fd) ;  sock->fd   = -1;
1667   free(sock->rbuf);  sock->rbuf = NULL;
1668   free(sock->wbuf);  sock->wbuf = NULL;
1669
1670   free(sock);
1671 }
1672
1673 static void *connection_thread_main (void *args) /* {{{ */
1674 {
1675   pthread_t self;
1676   listen_socket_t *sock;
1677   int i;
1678   int fd;
1679
1680   sock = (listen_socket_t *) args;
1681   fd = sock->fd;
1682
1683   /* init read buffers */
1684   sock->next_read = sock->next_cmd = 0;
1685   sock->rbuf = malloc(RBUF_SIZE);
1686   if (sock->rbuf == NULL)
1687   {
1688     RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
1689     close_connection(sock);
1690     return NULL;
1691   }
1692
1693   pthread_mutex_lock (&connection_threads_lock);
1694   {
1695     pthread_t *temp;
1696
1697     temp = (pthread_t *) realloc (connection_threads,
1698         sizeof (pthread_t) * (connection_threads_num + 1));
1699     if (temp == NULL)
1700     {
1701       RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed.");
1702     }
1703     else
1704     {
1705       connection_threads = temp;
1706       connection_threads[connection_threads_num] = pthread_self ();
1707       connection_threads_num++;
1708     }
1709   }
1710   pthread_mutex_unlock (&connection_threads_lock);
1711
1712   while (do_shutdown == 0)
1713   {
1714     char *cmd;
1715     ssize_t cmd_len;
1716     ssize_t rbytes;
1717
1718     struct pollfd pollfd;
1719     int status;
1720
1721     pollfd.fd = fd;
1722     pollfd.events = POLLIN | POLLPRI;
1723     pollfd.revents = 0;
1724
1725     status = poll (&pollfd, 1, /* timeout = */ 500);
1726     if (do_shutdown)
1727       break;
1728     else if (status == 0) /* timeout */
1729       continue;
1730     else if (status < 0) /* error */
1731     {
1732       status = errno;
1733       if (status == EINTR)
1734         continue;
1735       RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
1736       continue;
1737     }
1738
1739     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
1740     {
1741       close_connection(sock);
1742       break;
1743     }
1744     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
1745     {
1746       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
1747           "poll(2) returned something unexpected: %#04hx",
1748           pollfd.revents);
1749       close_connection(sock);
1750       break;
1751     }
1752
1753     rbytes = read(fd, sock->rbuf + sock->next_read,
1754                   RBUF_SIZE - sock->next_read);
1755     if (rbytes < 0)
1756     {
1757       RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
1758       break;
1759     }
1760     else if (rbytes == 0)
1761       break; /* eof */
1762
1763     sock->next_read += rbytes;
1764
1765     while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
1766     {
1767       status = handle_request (sock, cmd, cmd_len+1);
1768       if (status != 0)
1769         goto out_close;
1770     }
1771   }
1772
1773 out_close:
1774   close_connection(sock);
1775
1776   self = pthread_self ();
1777   /* Remove this thread from the connection threads list */
1778   pthread_mutex_lock (&connection_threads_lock);
1779   /* Find out own index in the array */
1780   for (i = 0; i < connection_threads_num; i++)
1781     if (pthread_equal (connection_threads[i], self) != 0)
1782       break;
1783   assert (i < connection_threads_num);
1784
1785   /* Move the trailing threads forward. */
1786   if (i < (connection_threads_num - 1))
1787   {
1788     memmove (connection_threads + i,
1789         connection_threads + i + 1,
1790         sizeof (pthread_t) * (connection_threads_num - i - 1));
1791   }
1792
1793   connection_threads_num--;
1794   pthread_mutex_unlock (&connection_threads_lock);
1795
1796   return (NULL);
1797 } /* }}} void *connection_thread_main */
1798
1799 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
1800 {
1801   int fd;
1802   struct sockaddr_un sa;
1803   listen_socket_t *temp;
1804   int status;
1805   const char *path;
1806
1807   path = sock->addr;
1808   if (strncmp(path, "unix:", strlen("unix:")) == 0)
1809     path += strlen("unix:");
1810
1811   temp = (listen_socket_t *) realloc (listen_fds,
1812       sizeof (listen_fds[0]) * (listen_fds_num + 1));
1813   if (temp == NULL)
1814   {
1815     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: realloc failed.");
1816     return (-1);
1817   }
1818   listen_fds = temp;
1819   memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
1820
1821   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
1822   if (fd < 0)
1823   {
1824     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: socket(2) failed.");
1825     return (-1);
1826   }
1827
1828   memset (&sa, 0, sizeof (sa));
1829   sa.sun_family = AF_UNIX;
1830   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
1831
1832   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
1833   if (status != 0)
1834   {
1835     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: bind(2) failed.");
1836     close (fd);
1837     unlink (path);
1838     return (-1);
1839   }
1840
1841   status = listen (fd, /* backlog = */ 10);
1842   if (status != 0)
1843   {
1844     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: listen(2) failed.");
1845     close (fd);
1846     unlink (path);
1847     return (-1);
1848   }
1849
1850   listen_fds[listen_fds_num].fd = fd;
1851   listen_fds[listen_fds_num].family = PF_UNIX;
1852   strncpy(listen_fds[listen_fds_num].addr, path,
1853           sizeof (listen_fds[listen_fds_num].addr) - 1);
1854   listen_fds_num++;
1855
1856   return (0);
1857 } /* }}} int open_listen_socket_unix */
1858
1859 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
1860 {
1861   struct addrinfo ai_hints;
1862   struct addrinfo *ai_res;
1863   struct addrinfo *ai_ptr;
1864   char addr_copy[NI_MAXHOST];
1865   char *addr;
1866   char *port;
1867   int status;
1868
1869   strncpy (addr_copy, sock->addr, sizeof (addr_copy));
1870   addr_copy[sizeof (addr_copy) - 1] = 0;
1871   addr = addr_copy;
1872
1873   memset (&ai_hints, 0, sizeof (ai_hints));
1874   ai_hints.ai_flags = 0;
1875 #ifdef AI_ADDRCONFIG
1876   ai_hints.ai_flags |= AI_ADDRCONFIG;
1877 #endif
1878   ai_hints.ai_family = AF_UNSPEC;
1879   ai_hints.ai_socktype = SOCK_STREAM;
1880
1881   port = NULL;
1882   if (*addr == '[') /* IPv6+port format */
1883   {
1884     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
1885     addr++;
1886
1887     port = strchr (addr, ']');
1888     if (port == NULL)
1889     {
1890       RRDD_LOG (LOG_ERR, "open_listen_socket_network: Malformed address: %s",
1891           sock->addr);
1892       return (-1);
1893     }
1894     *port = 0;
1895     port++;
1896
1897     if (*port == ':')
1898       port++;
1899     else if (*port == 0)
1900       port = NULL;
1901     else
1902     {
1903       RRDD_LOG (LOG_ERR, "open_listen_socket_network: Garbage after address: %s",
1904           port);
1905       return (-1);
1906     }
1907   } /* if (*addr = ']') */
1908   else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */
1909   {
1910     port = rindex(addr, ':');
1911     if (port != NULL)
1912     {
1913       *port = 0;
1914       port++;
1915     }
1916   }
1917   ai_res = NULL;
1918   status = getaddrinfo (addr,
1919                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
1920                         &ai_hints, &ai_res);
1921   if (status != 0)
1922   {
1923     RRDD_LOG (LOG_ERR, "open_listen_socket_network: getaddrinfo(%s) failed: "
1924         "%s", addr, gai_strerror (status));
1925     return (-1);
1926   }
1927
1928   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
1929   {
1930     int fd;
1931     listen_socket_t *temp;
1932     int one = 1;
1933
1934     temp = (listen_socket_t *) realloc (listen_fds,
1935         sizeof (listen_fds[0]) * (listen_fds_num + 1));
1936     if (temp == NULL)
1937     {
1938       RRDD_LOG (LOG_ERR, "open_listen_socket_network: realloc failed.");
1939       continue;
1940     }
1941     listen_fds = temp;
1942     memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
1943
1944     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
1945     if (fd < 0)
1946     {
1947       RRDD_LOG (LOG_ERR, "open_listen_socket_network: socket(2) failed.");
1948       continue;
1949     }
1950
1951     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
1952
1953     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
1954     if (status != 0)
1955     {
1956       RRDD_LOG (LOG_ERR, "open_listen_socket_network: bind(2) failed.");
1957       close (fd);
1958       continue;
1959     }
1960
1961     status = listen (fd, /* backlog = */ 10);
1962     if (status != 0)
1963     {
1964       RRDD_LOG (LOG_ERR, "open_listen_socket_network: listen(2) failed.");
1965       close (fd);
1966       return (-1);
1967     }
1968
1969     listen_fds[listen_fds_num].fd = fd;
1970     listen_fds[listen_fds_num].family = ai_ptr->ai_family;
1971     listen_fds_num++;
1972   } /* for (ai_ptr) */
1973
1974   return (0);
1975 } /* }}} static int open_listen_socket_network */
1976
1977 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
1978 {
1979   assert(sock != NULL);
1980   assert(sock->addr != NULL);
1981
1982   if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
1983       || sock->addr[0] == '/')
1984     return (open_listen_socket_unix(sock));
1985   else
1986     return (open_listen_socket_network(sock));
1987 } /* }}} int open_listen_socket */
1988
1989 static int close_listen_sockets (void) /* {{{ */
1990 {
1991   size_t i;
1992
1993   for (i = 0; i < listen_fds_num; i++)
1994   {
1995     close (listen_fds[i].fd);
1996
1997     if (listen_fds[i].family == PF_UNIX)
1998       unlink(listen_fds[i].addr);
1999   }
2000
2001   free (listen_fds);
2002   listen_fds = NULL;
2003   listen_fds_num = 0;
2004
2005   return (0);
2006 } /* }}} int close_listen_sockets */
2007
2008 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
2009 {
2010   struct pollfd *pollfds;
2011   int pollfds_num;
2012   int status;
2013   int i;
2014
2015   for (i = 0; i < config_listen_address_list_len; i++)
2016     open_listen_socket (config_listen_address_list[i]);
2017
2018   if (config_listen_address_list_len < 1)
2019   {
2020     listen_socket_t sock;
2021     memset(&sock, 0, sizeof(sock));
2022     strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr));
2023     open_listen_socket (&sock);
2024   }
2025
2026   if (listen_fds_num < 1)
2027   {
2028     RRDD_LOG (LOG_ERR, "listen_thread_main: No listen sockets "
2029         "could be opened. Sorry.");
2030     return (NULL);
2031   }
2032
2033   pollfds_num = listen_fds_num;
2034   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2035   if (pollfds == NULL)
2036   {
2037     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2038     return (NULL);
2039   }
2040   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2041
2042   RRDD_LOG(LOG_INFO, "listening for connections");
2043
2044   while (do_shutdown == 0)
2045   {
2046     assert (pollfds_num == ((int) listen_fds_num));
2047     for (i = 0; i < pollfds_num; i++)
2048     {
2049       pollfds[i].fd = listen_fds[i].fd;
2050       pollfds[i].events = POLLIN | POLLPRI;
2051       pollfds[i].revents = 0;
2052     }
2053
2054     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2055     if (do_shutdown)
2056       break;
2057     else if (status == 0) /* timeout */
2058       continue;
2059     else if (status < 0) /* error */
2060     {
2061       status = errno;
2062       if (status != EINTR)
2063       {
2064         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2065       }
2066       continue;
2067     }
2068
2069     for (i = 0; i < pollfds_num; i++)
2070     {
2071       listen_socket_t *client_sock;
2072       struct sockaddr_storage client_sa;
2073       socklen_t client_sa_size;
2074       pthread_t tid;
2075       pthread_attr_t attr;
2076
2077       if (pollfds[i].revents == 0)
2078         continue;
2079
2080       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2081       {
2082         RRDD_LOG (LOG_ERR, "listen_thread_main: "
2083             "poll(2) returned something unexpected for listen FD #%i.",
2084             pollfds[i].fd);
2085         continue;
2086       }
2087
2088       client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2089       if (client_sock == NULL)
2090       {
2091         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2092         continue;
2093       }
2094       memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2095
2096       client_sa_size = sizeof (client_sa);
2097       client_sock->fd = accept (pollfds[i].fd,
2098           (struct sockaddr *) &client_sa, &client_sa_size);
2099       if (client_sock->fd < 0)
2100       {
2101         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2102         free(client_sock);
2103         continue;
2104       }
2105
2106       pthread_attr_init (&attr);
2107       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2108
2109       status = pthread_create (&tid, &attr, connection_thread_main,
2110                                client_sock);
2111       if (status != 0)
2112       {
2113         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2114         close_connection(client_sock);
2115         continue;
2116       }
2117     } /* for (pollfds_num) */
2118   } /* while (do_shutdown == 0) */
2119
2120   RRDD_LOG(LOG_INFO, "starting shutdown");
2121
2122   close_listen_sockets ();
2123
2124   pthread_mutex_lock (&connection_threads_lock);
2125   while (connection_threads_num > 0)
2126   {
2127     pthread_t wait_for;
2128
2129     wait_for = connection_threads[0];
2130
2131     pthread_mutex_unlock (&connection_threads_lock);
2132     pthread_join (wait_for, /* retval = */ NULL);
2133     pthread_mutex_lock (&connection_threads_lock);
2134   }
2135   pthread_mutex_unlock (&connection_threads_lock);
2136
2137   return (NULL);
2138 } /* }}} void *listen_thread_main */
2139
2140 static int daemonize (void) /* {{{ */
2141 {
2142   int status;
2143   int fd;
2144   char *base_dir;
2145
2146   daemon_uid = geteuid();
2147
2148   fd = open_pidfile();
2149   if (fd < 0) return fd;
2150
2151   if (!stay_foreground)
2152   {
2153     pid_t child;
2154
2155     child = fork ();
2156     if (child < 0)
2157     {
2158       fprintf (stderr, "daemonize: fork(2) failed.\n");
2159       return (-1);
2160     }
2161     else if (child > 0)
2162     {
2163       return (1);
2164     }
2165
2166     /* Become session leader */
2167     setsid ();
2168
2169     /* Open the first three file descriptors to /dev/null */
2170     close (2);
2171     close (1);
2172     close (0);
2173
2174     open ("/dev/null", O_RDWR);
2175     dup (0);
2176     dup (0);
2177   } /* if (!stay_foreground) */
2178
2179   /* Change into the /tmp directory. */
2180   base_dir = (config_base_dir != NULL)
2181     ? config_base_dir
2182     : "/tmp";
2183   status = chdir (base_dir);
2184   if (status != 0)
2185   {
2186     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2187     return (-1);
2188   }
2189
2190   install_signal_handlers();
2191
2192   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2193   RRDD_LOG(LOG_INFO, "starting up");
2194
2195   cache_tree = g_tree_new ((GCompareFunc) strcmp);
2196   if (cache_tree == NULL)
2197   {
2198     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2199     return (-1);
2200   }
2201
2202   status = write_pidfile (fd);
2203   return status;
2204 } /* }}} int daemonize */
2205
2206 static int cleanup (void) /* {{{ */
2207 {
2208   do_shutdown++;
2209
2210   pthread_cond_signal (&cache_cond);
2211   pthread_join (queue_thread, /* return = */ NULL);
2212
2213   remove_pidfile ();
2214
2215   RRDD_LOG(LOG_INFO, "goodbye");
2216   closelog ();
2217
2218   return (0);
2219 } /* }}} int cleanup */
2220
2221 static int read_options (int argc, char **argv) /* {{{ */
2222 {
2223   int option;
2224   int status = 0;
2225
2226   while ((option = getopt(argc, argv, "gl:L:f:w:b:Bz:p:j:h?F")) != -1)
2227   {
2228     switch (option)
2229     {
2230       case 'g':
2231         stay_foreground=1;
2232         break;
2233
2234       case 'L':
2235       case 'l':
2236       {
2237         listen_socket_t **temp;
2238         listen_socket_t *new;
2239
2240         new = malloc(sizeof(listen_socket_t));
2241         if (new == NULL)
2242         {
2243           fprintf(stderr, "read_options: malloc failed.\n");
2244           return(2);
2245         }
2246         memset(new, 0, sizeof(listen_socket_t));
2247
2248         temp = (listen_socket_t **) realloc (config_listen_address_list,
2249             sizeof (listen_socket_t *) * (config_listen_address_list_len + 1));
2250         if (temp == NULL)
2251         {
2252           fprintf (stderr, "read_options: realloc failed.\n");
2253           return (2);
2254         }
2255         config_listen_address_list = temp;
2256
2257         strncpy(new->addr, optarg, sizeof(new->addr)-1);
2258         new->privilege = (option == 'l') ? PRIV_HIGH : PRIV_LOW;
2259
2260         temp[config_listen_address_list_len] = new;
2261         config_listen_address_list_len++;
2262       }
2263       break;
2264
2265       case 'f':
2266       {
2267         int temp;
2268
2269         temp = atoi (optarg);
2270         if (temp > 0)
2271           config_flush_interval = temp;
2272         else
2273         {
2274           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2275           status = 3;
2276         }
2277       }
2278       break;
2279
2280       case 'w':
2281       {
2282         int temp;
2283
2284         temp = atoi (optarg);
2285         if (temp > 0)
2286           config_write_interval = temp;
2287         else
2288         {
2289           fprintf (stderr, "Invalid write interval: %s\n", optarg);
2290           status = 2;
2291         }
2292       }
2293       break;
2294
2295       case 'z':
2296       {
2297         int temp;
2298
2299         temp = atoi(optarg);
2300         if (temp > 0)
2301           config_write_jitter = temp;
2302         else
2303         {
2304           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2305           status = 2;
2306         }
2307
2308         break;
2309       }
2310
2311       case 'B':
2312         config_write_base_only = 1;
2313         break;
2314
2315       case 'b':
2316       {
2317         size_t len;
2318
2319         if (config_base_dir != NULL)
2320           free (config_base_dir);
2321         config_base_dir = strdup (optarg);
2322         if (config_base_dir == NULL)
2323         {
2324           fprintf (stderr, "read_options: strdup failed.\n");
2325           return (3);
2326         }
2327
2328         len = strlen (config_base_dir);
2329         while ((len > 0) && (config_base_dir[len - 1] == '/'))
2330         {
2331           config_base_dir[len - 1] = 0;
2332           len--;
2333         }
2334
2335         if (len < 1)
2336         {
2337           fprintf (stderr, "Invalid base directory: %s\n", optarg);
2338           return (4);
2339         }
2340
2341         _config_base_dir_len = len;
2342       }
2343       break;
2344
2345       case 'p':
2346       {
2347         if (config_pid_file != NULL)
2348           free (config_pid_file);
2349         config_pid_file = strdup (optarg);
2350         if (config_pid_file == NULL)
2351         {
2352           fprintf (stderr, "read_options: strdup failed.\n");
2353           return (3);
2354         }
2355       }
2356       break;
2357
2358       case 'F':
2359         config_flush_at_shutdown = 1;
2360         break;
2361
2362       case 'j':
2363       {
2364         struct stat statbuf;
2365         const char *dir = optarg;
2366
2367         status = stat(dir, &statbuf);
2368         if (status != 0)
2369         {
2370           fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2371           return 6;
2372         }
2373
2374         if (!S_ISDIR(statbuf.st_mode)
2375             || access(dir, R_OK|W_OK|X_OK) != 0)
2376         {
2377           fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2378                   errno ? rrd_strerror(errno) : "");
2379           return 6;
2380         }
2381
2382         journal_cur = malloc(PATH_MAX + 1);
2383         journal_old = malloc(PATH_MAX + 1);
2384         if (journal_cur == NULL || journal_old == NULL)
2385         {
2386           fprintf(stderr, "malloc failure for journal files\n");
2387           return 6;
2388         }
2389         else 
2390         {
2391           snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2392           snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2393         }
2394       }
2395       break;
2396
2397       case 'h':
2398       case '?':
2399         printf ("RRDCacheD %s  Copyright (C) 2008 Florian octo Forster\n"
2400             "\n"
2401             "Usage: rrdcached [options]\n"
2402             "\n"
2403             "Valid options are:\n"
2404             "  -l <address>  Socket address to listen to.\n"
2405             "  -L <address>  Socket address to listen to ('FLUSH' only).\n"
2406             "  -w <seconds>  Interval in which to write data.\n"
2407             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
2408             "  -f <seconds>  Interval in which to flush dead data.\n"
2409             "  -p <file>     Location of the PID-file.\n"
2410             "  -b <dir>      Base directory to change to.\n"
2411             "  -B            Restrict file access to paths within -b <dir>\n"
2412             "  -g            Do not fork and run in the foreground.\n"
2413             "  -j <dir>      Directory in which to create the journal files.\n"
2414             "  -F            Always flush all updates at shutdown\n"
2415             "\n"
2416             "For more information and a detailed description of all options "
2417             "please refer\n"
2418             "to the rrdcached(1) manual page.\n",
2419             VERSION);
2420         status = -1;
2421         break;
2422     } /* switch (option) */
2423   } /* while (getopt) */
2424
2425   /* advise the user when values are not sane */
2426   if (config_flush_interval < 2 * config_write_interval)
2427     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2428             " 2x write interval (-w) !\n");
2429   if (config_write_jitter > config_write_interval)
2430     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2431             " write interval (-w) !\n");
2432
2433   if (config_write_base_only && config_base_dir == NULL)
2434     fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
2435             "  Consult the rrdcached documentation\n");
2436
2437   if (journal_cur == NULL)
2438     config_flush_at_shutdown = 1;
2439
2440   return (status);
2441 } /* }}} int read_options */
2442
2443 int main (int argc, char **argv)
2444 {
2445   int status;
2446
2447   status = read_options (argc, argv);
2448   if (status != 0)
2449   {
2450     if (status < 0)
2451       status = 0;
2452     return (status);
2453   }
2454
2455   status = daemonize ();
2456   if (status == 1)
2457   {
2458     struct sigaction sigchld;
2459
2460     memset (&sigchld, 0, sizeof (sigchld));
2461     sigchld.sa_handler = SIG_IGN;
2462     sigaction (SIGCHLD, &sigchld, NULL);
2463
2464     return (0);
2465   }
2466   else if (status != 0)
2467   {
2468     fprintf (stderr, "daemonize failed, exiting.\n");
2469     return (1);
2470   }
2471
2472   journal_init();
2473
2474   /* start the queue thread */
2475   memset (&queue_thread, 0, sizeof (queue_thread));
2476   status = pthread_create (&queue_thread,
2477                            NULL, /* attr */
2478                            queue_thread_main,
2479                            NULL); /* args */
2480   if (status != 0)
2481   {
2482     RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2483     cleanup();
2484     return (1);
2485   }
2486
2487   listen_thread_main (NULL);
2488   cleanup ();
2489
2490   return (0);
2491 } /* int main */
2492
2493 /*
2494  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2495  */