This patch moves the permission handling code around a bit.
[rrdtool.git] / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008 Florian octo Forster
4  * Copyright (C) 2008 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
23
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
32
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
36
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
41
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
47
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
51
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
55
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
61
62 /*
63  * Now for some includes..
64  */
65 #include "rrd.h" /* {{{ */
66 #include "rrd_client.h"
67
68 #include <stdlib.h>
69 #include <stdint.h>
70 #include <stdio.h>
71 #include <unistd.h>
72 #include <string.h>
73 #include <strings.h>
74 #include <stdint.h>
75 #include <inttypes.h>
76
77 #include <sys/types.h>
78 #include <sys/stat.h>
79 #include <fcntl.h>
80 #include <signal.h>
81 #include <sys/socket.h>
82 #include <sys/un.h>
83 #include <netdb.h>
84 #include <poll.h>
85 #include <syslog.h>
86 #include <pthread.h>
87 #include <errno.h>
88 #include <assert.h>
89 #include <sys/time.h>
90 #include <time.h>
91
92 #include <glib-2.0/glib.h>
93 /* }}} */
94
95 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
96
97 #ifndef __GNUC__
98 # define __attribute__(x) /**/
99 #endif
100
101 /*
102  * Types
103  */
104 typedef enum
105 {
106   PRIV_LOW,
107   PRIV_HIGH
108 } socket_privilege;
109
110 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
111
112 struct listen_socket_s
113 {
114   int fd;
115   char addr[PATH_MAX + 1];
116   int family;
117   socket_privilege privilege;
118
119   /* state for BATCH processing */
120   int batch_mode;
121   int batch_cmd;
122
123   /* buffered IO */
124   char *rbuf;
125   off_t next_cmd;
126   off_t next_read;
127
128   char *wbuf;
129   ssize_t wbuf_len;
130 };
131 typedef struct listen_socket_s listen_socket_t;
132
133 struct cache_item_s;
134 typedef struct cache_item_s cache_item_t;
135 struct cache_item_s
136 {
137   char *file;
138   char **values;
139   int values_num;
140   time_t last_flush_time;
141 #define CI_FLAGS_IN_TREE  (1<<0)
142 #define CI_FLAGS_IN_QUEUE (1<<1)
143   int flags;
144   pthread_cond_t  flushed;
145   cache_item_t *prev;
146   cache_item_t *next;
147 };
148
149 struct callback_flush_data_s
150 {
151   time_t now;
152   time_t abs_timeout;
153   char **keys;
154   size_t keys_num;
155 };
156 typedef struct callback_flush_data_s callback_flush_data_t;
157
158 enum queue_side_e
159 {
160   HEAD,
161   TAIL
162 };
163 typedef enum queue_side_e queue_side_t;
164
165 /* max length of socket command or response */
166 #define CMD_MAX 4096
167 #define RBUF_SIZE (CMD_MAX*2)
168
169 /*
170  * Variables
171  */
172 static int stay_foreground = 0;
173 static uid_t daemon_uid;
174
175 static listen_socket_t *listen_fds = NULL;
176 static size_t listen_fds_num = 0;
177
178 static int do_shutdown = 0;
179
180 static pthread_t queue_thread;
181
182 static pthread_t *connection_threads = NULL;
183 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
184 static int connection_threads_num = 0;
185
186 /* Cache stuff */
187 static GTree          *cache_tree = NULL;
188 static cache_item_t   *cache_queue_head = NULL;
189 static cache_item_t   *cache_queue_tail = NULL;
190 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
191 static pthread_cond_t  cache_cond = PTHREAD_COND_INITIALIZER;
192
193 static int config_write_interval = 300;
194 static int config_write_jitter   = 0;
195 static int config_flush_interval = 3600;
196 static int config_flush_at_shutdown = 0;
197 static char *config_pid_file = NULL;
198 static char *config_base_dir = NULL;
199 static size_t _config_base_dir_len = 0;
200 static int config_write_base_only = 0;
201
202 static listen_socket_t **config_listen_address_list = NULL;
203 static int config_listen_address_list_len = 0;
204
205 static uint64_t stats_queue_length = 0;
206 static uint64_t stats_updates_received = 0;
207 static uint64_t stats_flush_received = 0;
208 static uint64_t stats_updates_written = 0;
209 static uint64_t stats_data_sets_written = 0;
210 static uint64_t stats_journal_bytes = 0;
211 static uint64_t stats_journal_rotate = 0;
212 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
213
214 /* Journaled updates */
215 static char *journal_cur = NULL;
216 static char *journal_old = NULL;
217 static FILE *journal_fh = NULL;
218 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
219 static int journal_write(char *cmd, char *args);
220 static void journal_done(void);
221 static void journal_rotate(void);
222
223 /* 
224  * Functions
225  */
226 static void sig_common (const char *sig) /* {{{ */
227 {
228   RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
229   do_shutdown++;
230   pthread_cond_broadcast(&cache_cond);
231 } /* }}} void sig_common */
232
233 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
234 {
235   sig_common("INT");
236 } /* }}} void sig_int_handler */
237
238 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
239 {
240   sig_common("TERM");
241 } /* }}} void sig_term_handler */
242
243 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
244 {
245   config_flush_at_shutdown = 1;
246   sig_common("USR1");
247 } /* }}} void sig_usr1_handler */
248
249 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
250 {
251   config_flush_at_shutdown = 0;
252   sig_common("USR2");
253 } /* }}} void sig_usr2_handler */
254
255 static void install_signal_handlers(void) /* {{{ */
256 {
257   /* These structures are static, because `sigaction' behaves weird if the are
258    * overwritten.. */
259   static struct sigaction sa_int;
260   static struct sigaction sa_term;
261   static struct sigaction sa_pipe;
262   static struct sigaction sa_usr1;
263   static struct sigaction sa_usr2;
264
265   /* Install signal handlers */
266   memset (&sa_int, 0, sizeof (sa_int));
267   sa_int.sa_handler = sig_int_handler;
268   sigaction (SIGINT, &sa_int, NULL);
269
270   memset (&sa_term, 0, sizeof (sa_term));
271   sa_term.sa_handler = sig_term_handler;
272   sigaction (SIGTERM, &sa_term, NULL);
273
274   memset (&sa_pipe, 0, sizeof (sa_pipe));
275   sa_pipe.sa_handler = SIG_IGN;
276   sigaction (SIGPIPE, &sa_pipe, NULL);
277
278   memset (&sa_pipe, 0, sizeof (sa_usr1));
279   sa_usr1.sa_handler = sig_usr1_handler;
280   sigaction (SIGUSR1, &sa_usr1, NULL);
281
282   memset (&sa_usr2, 0, sizeof (sa_usr2));
283   sa_usr2.sa_handler = sig_usr2_handler;
284   sigaction (SIGUSR2, &sa_usr2, NULL);
285
286 } /* }}} void install_signal_handlers */
287
288 static int open_pidfile(void) /* {{{ */
289 {
290   int fd;
291   char *file;
292
293   file = (config_pid_file != NULL)
294     ? config_pid_file
295     : LOCALSTATEDIR "/run/rrdcached.pid";
296
297   fd = open(file, O_CREAT|O_EXCL|O_WRONLY, S_IRUSR|S_IRGRP|S_IROTH);
298   if (fd < 0)
299     fprintf(stderr, "FATAL: cannot create '%s' (%s)\n",
300             file, rrd_strerror(errno));
301
302   return(fd);
303 } /* }}} static int open_pidfile */
304
305 static int write_pidfile (int fd) /* {{{ */
306 {
307   pid_t pid;
308   FILE *fh;
309
310   pid = getpid ();
311
312   fh = fdopen (fd, "w");
313   if (fh == NULL)
314   {
315     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
316     close(fd);
317     return (-1);
318   }
319
320   fprintf (fh, "%i\n", (int) pid);
321   fclose (fh);
322
323   return (0);
324 } /* }}} int write_pidfile */
325
326 static int remove_pidfile (void) /* {{{ */
327 {
328   char *file;
329   int status;
330
331   file = (config_pid_file != NULL)
332     ? config_pid_file
333     : LOCALSTATEDIR "/run/rrdcached.pid";
334
335   status = unlink (file);
336   if (status == 0)
337     return (0);
338   return (errno);
339 } /* }}} int remove_pidfile */
340
341 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
342 {
343   char *eol;
344
345   eol = memchr(sock->rbuf + sock->next_cmd, '\n',
346                sock->next_read - sock->next_cmd);
347
348   if (eol == NULL)
349   {
350     /* no commands left, move remainder back to front of rbuf */
351     memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
352             sock->next_read - sock->next_cmd);
353     sock->next_read -= sock->next_cmd;
354     sock->next_cmd = 0;
355     *len = 0;
356     return NULL;
357   }
358   else
359   {
360     char *cmd = sock->rbuf + sock->next_cmd;
361     *eol = '\0';
362
363     sock->next_cmd = eol - sock->rbuf + 1;
364
365     if (eol > sock->rbuf && *(eol-1) == '\r')
366       *(--eol) = '\0'; /* handle "\r\n" EOL */
367
368     *len = eol - cmd;
369
370     return cmd;
371   }
372
373   /* NOTREACHED */
374   assert(1==0);
375 }
376
377 /* add the characters directly to the write buffer */
378 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
379 {
380   char *new_buf;
381
382   assert(sock != NULL);
383
384   new_buf = realloc(sock->wbuf, sock->wbuf_len + len + 1);
385   if (new_buf == NULL)
386   {
387     RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
388     return -1;
389   }
390
391   strncpy(new_buf + sock->wbuf_len, str, len + 1);
392
393   sock->wbuf = new_buf;
394   sock->wbuf_len += len;
395
396   return 0;
397 } /* }}} static int add_to_wbuf */
398
399 /* add the text to the "extra" info that's sent after the status line */
400 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
401 {
402   va_list argp;
403   char buffer[CMD_MAX];
404   int len;
405
406   if (sock == NULL) return 0; /* journal replay mode */
407   if (sock->batch_mode) return 0; /* no extra info returned when in BATCH */
408
409   va_start(argp, fmt);
410 #ifdef HAVE_VSNPRINTF
411   len = vsnprintf(buffer, sizeof(buffer)-1, fmt, argp);
412 #else
413   len = vsprintf(buffer, fmt, argp);
414 #endif
415   va_end(argp);
416   if (len < 0)
417   {
418     RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
419     return -1;
420   }
421
422   return add_to_wbuf(sock, buffer, len);
423 } /* }}} static int add_response_info */
424
425 static int count_lines(char *str) /* {{{ */
426 {
427   int lines = 0;
428
429   if (str != NULL)
430   {
431     while ((str = strchr(str, '\n')) != NULL)
432     {
433       ++lines;
434       ++str;
435     }
436   }
437
438   return lines;
439 } /* }}} static int count_lines */
440
441 /* send the response back to the user.
442  * returns 0 on success, -1 on error
443  * write buffer is always zeroed after this call */
444 static int send_response (listen_socket_t *sock, response_code rc,
445                           char *fmt, ...) /* {{{ */
446 {
447   va_list argp;
448   char buffer[CMD_MAX];
449   int lines;
450   ssize_t wrote;
451   int rclen, len;
452
453   if (sock == NULL) return rc;  /* journal replay mode */
454
455   if (sock->batch_mode)
456   {
457     if (rc == RESP_OK)
458       return rc; /* no response on success during BATCH */
459     lines = sock->batch_cmd;
460   }
461   else if (rc == RESP_OK)
462     lines = count_lines(sock->wbuf);
463   else
464     lines = -1;
465
466   rclen = sprintf(buffer, "%d ", lines);
467   va_start(argp, fmt);
468 #ifdef HAVE_VSNPRINTF
469   len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen-1, fmt, argp);
470 #else
471   len = vsprintf(buffer+rclen, fmt, argp);
472 #endif
473   va_end(argp);
474   if (len < 0)
475     return -1;
476
477   len += rclen;
478
479   /* append the result to the wbuf, don't write to the user */
480   if (sock->batch_mode)
481     return add_to_wbuf(sock, buffer, len);
482
483   /* first write must be complete */
484   if (len != write(sock->fd, buffer, len))
485   {
486     RRDD_LOG(LOG_INFO, "send_response: could not write status message");
487     return -1;
488   }
489
490   if (sock->wbuf != NULL)
491   {
492     wrote = 0;
493     while (wrote < sock->wbuf_len)
494     {
495       ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
496       if (wb <= 0)
497       {
498         RRDD_LOG(LOG_INFO, "send_response: could not write results");
499         return -1;
500       }
501       wrote += wb;
502     }
503   }
504
505   free(sock->wbuf); sock->wbuf = NULL;
506   sock->wbuf_len = 0;
507
508   return 0;
509 } /* }}} */
510
511 static void wipe_ci_values(cache_item_t *ci, time_t when)
512 {
513   ci->values = NULL;
514   ci->values_num = 0;
515
516   ci->last_flush_time = when;
517   if (config_write_jitter > 0)
518     ci->last_flush_time += (random() % config_write_jitter);
519 }
520
521 /* remove_from_queue
522  * remove a "cache_item_t" item from the queue.
523  * must hold 'cache_lock' when calling this
524  */
525 static void remove_from_queue(cache_item_t *ci) /* {{{ */
526 {
527   if (ci == NULL) return;
528
529   if (ci->prev == NULL)
530     cache_queue_head = ci->next; /* reset head */
531   else
532     ci->prev->next = ci->next;
533
534   if (ci->next == NULL)
535     cache_queue_tail = ci->prev; /* reset the tail */
536   else
537     ci->next->prev = ci->prev;
538
539   ci->next = ci->prev = NULL;
540   ci->flags &= ~CI_FLAGS_IN_QUEUE;
541 } /* }}} static void remove_from_queue */
542
543 /*
544  * enqueue_cache_item:
545  * `cache_lock' must be acquired before calling this function!
546  */
547 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
548     queue_side_t side)
549 {
550   if (ci == NULL)
551     return (-1);
552
553   if (ci->values_num == 0)
554     return (0);
555
556   if (side == HEAD)
557   {
558     if (cache_queue_head == ci)
559       return 0;
560
561     /* remove from the double linked list */
562     if (ci->flags & CI_FLAGS_IN_QUEUE)
563       remove_from_queue(ci);
564
565     ci->prev = NULL;
566     ci->next = cache_queue_head;
567     if (ci->next != NULL)
568       ci->next->prev = ci;
569     cache_queue_head = ci;
570
571     if (cache_queue_tail == NULL)
572       cache_queue_tail = cache_queue_head;
573   }
574   else /* (side == TAIL) */
575   {
576     /* We don't move values back in the list.. */
577     if (ci->flags & CI_FLAGS_IN_QUEUE)
578       return (0);
579
580     assert (ci->next == NULL);
581     assert (ci->prev == NULL);
582
583     ci->prev = cache_queue_tail;
584
585     if (cache_queue_tail == NULL)
586       cache_queue_head = ci;
587     else
588       cache_queue_tail->next = ci;
589
590     cache_queue_tail = ci;
591   }
592
593   ci->flags |= CI_FLAGS_IN_QUEUE;
594
595   pthread_cond_broadcast(&cache_cond);
596   pthread_mutex_lock (&stats_lock);
597   stats_queue_length++;
598   pthread_mutex_unlock (&stats_lock);
599
600   return (0);
601 } /* }}} int enqueue_cache_item */
602
603 /*
604  * tree_callback_flush:
605  * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
606  * while this is in progress.
607  */
608 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
609     gpointer data)
610 {
611   cache_item_t *ci;
612   callback_flush_data_t *cfd;
613
614   ci = (cache_item_t *) value;
615   cfd = (callback_flush_data_t *) data;
616
617   if ((ci->last_flush_time <= cfd->abs_timeout)
618       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
619       && (ci->values_num > 0))
620   {
621     enqueue_cache_item (ci, TAIL);
622   }
623   else if ((do_shutdown != 0)
624       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
625       && (ci->values_num > 0))
626   {
627     enqueue_cache_item (ci, TAIL);
628   }
629   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
630       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
631       && (ci->values_num <= 0))
632   {
633     char **temp;
634
635     temp = (char **) realloc (cfd->keys,
636         sizeof (char *) * (cfd->keys_num + 1));
637     if (temp == NULL)
638     {
639       RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed.");
640       return (FALSE);
641     }
642     cfd->keys = temp;
643     /* Make really sure this points to the _same_ place */
644     assert ((char *) key == ci->file);
645     cfd->keys[cfd->keys_num] = (char *) key;
646     cfd->keys_num++;
647   }
648
649   return (FALSE);
650 } /* }}} gboolean tree_callback_flush */
651
652 static int flush_old_values (int max_age)
653 {
654   callback_flush_data_t cfd;
655   size_t k;
656
657   memset (&cfd, 0, sizeof (cfd));
658   /* Pass the current time as user data so that we don't need to call
659    * `time' for each node. */
660   cfd.now = time (NULL);
661   cfd.keys = NULL;
662   cfd.keys_num = 0;
663
664   if (max_age > 0)
665     cfd.abs_timeout = cfd.now - max_age;
666   else
667     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
668
669   /* `tree_callback_flush' will return the keys of all values that haven't
670    * been touched in the last `config_flush_interval' seconds in `cfd'.
671    * The char*'s in this array point to the same memory as ci->file, so we
672    * don't need to free them separately. */
673   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
674
675   for (k = 0; k < cfd.keys_num; k++)
676   {
677     cache_item_t *ci;
678
679     /* This must not fail. */
680     ci = (cache_item_t *) g_tree_lookup (cache_tree, cfd.keys[k]);
681     assert (ci != NULL);
682
683     /* If we end up here with values available, something's seriously
684      * messed up. */
685     assert (ci->values_num == 0);
686
687     /* Remove the node from the tree */
688     g_tree_remove (cache_tree, cfd.keys[k]);
689     cfd.keys[k] = NULL;
690
691     /* Now free and clean up `ci'. */
692     free (ci->file);
693     ci->file = NULL;
694     free (ci);
695     ci = NULL;
696   } /* for (k = 0; k < cfd.keys_num; k++) */
697
698   if (cfd.keys != NULL)
699   {
700     free (cfd.keys);
701     cfd.keys = NULL;
702   }
703
704   return (0);
705 } /* int flush_old_values */
706
707 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
708 {
709   struct timeval now;
710   struct timespec next_flush;
711   int final_flush = 0; /* make sure we only flush once on shutdown */
712
713   gettimeofday (&now, NULL);
714   next_flush.tv_sec = now.tv_sec + config_flush_interval;
715   next_flush.tv_nsec = 1000 * now.tv_usec;
716
717   pthread_mutex_lock (&cache_lock);
718   while ((do_shutdown == 0) || (cache_queue_head != NULL))
719   {
720     cache_item_t *ci;
721     char *file;
722     char **values;
723     int values_num;
724     int status;
725     int i;
726
727     /* First, check if it's time to do the cache flush. */
728     gettimeofday (&now, NULL);
729     if ((now.tv_sec > next_flush.tv_sec)
730         || ((now.tv_sec == next_flush.tv_sec)
731           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
732     {
733       /* Flush all values that haven't been written in the last
734        * `config_write_interval' seconds. */
735       flush_old_values (config_write_interval);
736
737       /* Determine the time of the next cache flush. */
738       while (next_flush.tv_sec <= now.tv_sec)
739         next_flush.tv_sec += config_flush_interval;
740
741       /* unlock the cache while we rotate so we don't block incoming
742        * updates if the fsync() blocks on disk I/O */
743       pthread_mutex_unlock(&cache_lock);
744       journal_rotate();
745       pthread_mutex_lock(&cache_lock);
746     }
747
748     /* Now, check if there's something to store away. If not, wait until
749      * something comes in or it's time to do the cache flush.  if we are
750      * shutting down, do not wait around.  */
751     if (cache_queue_head == NULL && !do_shutdown)
752     {
753       status = pthread_cond_timedwait (&cache_cond, &cache_lock, &next_flush);
754       if ((status != 0) && (status != ETIMEDOUT))
755       {
756         RRDD_LOG (LOG_ERR, "queue_thread_main: "
757             "pthread_cond_timedwait returned %i.", status);
758       }
759     }
760
761     /* We're about to shut down */
762     if (do_shutdown != 0 && !final_flush++)
763     {
764       if (config_flush_at_shutdown)
765         flush_old_values (-1); /* flush everything */
766       else
767         break;
768     }
769
770     /* Check if a value has arrived. This may be NULL if we timed out or there
771      * was an interrupt such as a signal. */
772     if (cache_queue_head == NULL)
773       continue;
774
775     ci = cache_queue_head;
776
777     /* copy the relevant parts */
778     file = strdup (ci->file);
779     if (file == NULL)
780     {
781       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
782       continue;
783     }
784
785     assert(ci->values != NULL);
786     assert(ci->values_num > 0);
787
788     values = ci->values;
789     values_num = ci->values_num;
790
791     wipe_ci_values(ci, time(NULL));
792     remove_from_queue(ci);
793
794     pthread_mutex_lock (&stats_lock);
795     assert (stats_queue_length > 0);
796     stats_queue_length--;
797     pthread_mutex_unlock (&stats_lock);
798
799     pthread_mutex_unlock (&cache_lock);
800
801     rrd_clear_error ();
802     status = rrd_update_r (file, NULL, values_num, (void *) values);
803     if (status != 0)
804     {
805       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
806           "rrd_update_r (%s) failed with status %i. (%s)",
807           file, status, rrd_get_error());
808     }
809
810     journal_write("wrote", file);
811     pthread_cond_broadcast(&ci->flushed);
812
813     for (i = 0; i < values_num; i++)
814       free (values[i]);
815
816     free(values);
817     free(file);
818
819     if (status == 0)
820     {
821       pthread_mutex_lock (&stats_lock);
822       stats_updates_written++;
823       stats_data_sets_written += values_num;
824       pthread_mutex_unlock (&stats_lock);
825     }
826
827     pthread_mutex_lock (&cache_lock);
828
829     /* We're about to shut down */
830     if (do_shutdown != 0 && !final_flush++)
831     {
832       if (config_flush_at_shutdown)
833           flush_old_values (-1); /* flush everything */
834       else
835         break;
836     }
837   } /* while ((do_shutdown == 0) || (cache_queue_head != NULL)) */
838   pthread_mutex_unlock (&cache_lock);
839
840   if (config_flush_at_shutdown)
841   {
842     assert(cache_queue_head == NULL);
843     RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
844   }
845
846   journal_done();
847
848   return (NULL);
849 } /* }}} void *queue_thread_main */
850
851 static int buffer_get_field (char **buffer_ret, /* {{{ */
852     size_t *buffer_size_ret, char **field_ret)
853 {
854   char *buffer;
855   size_t buffer_pos;
856   size_t buffer_size;
857   char *field;
858   size_t field_size;
859   int status;
860
861   buffer = *buffer_ret;
862   buffer_pos = 0;
863   buffer_size = *buffer_size_ret;
864   field = *buffer_ret;
865   field_size = 0;
866
867   if (buffer_size <= 0)
868     return (-1);
869
870   /* This is ensured by `handle_request'. */
871   assert (buffer[buffer_size - 1] == '\0');
872
873   status = -1;
874   while (buffer_pos < buffer_size)
875   {
876     /* Check for end-of-field or end-of-buffer */
877     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
878     {
879       field[field_size] = 0;
880       field_size++;
881       buffer_pos++;
882       status = 0;
883       break;
884     }
885     /* Handle escaped characters. */
886     else if (buffer[buffer_pos] == '\\')
887     {
888       if (buffer_pos >= (buffer_size - 1))
889         break;
890       buffer_pos++;
891       field[field_size] = buffer[buffer_pos];
892       field_size++;
893       buffer_pos++;
894     }
895     /* Normal operation */ 
896     else
897     {
898       field[field_size] = buffer[buffer_pos];
899       field_size++;
900       buffer_pos++;
901     }
902   } /* while (buffer_pos < buffer_size) */
903
904   if (status != 0)
905     return (status);
906
907   *buffer_ret = buffer + buffer_pos;
908   *buffer_size_ret = buffer_size - buffer_pos;
909   *field_ret = field;
910
911   return (0);
912 } /* }}} int buffer_get_field */
913
914 /* if we're restricting writes to the base directory,
915  * check whether the file falls within the dir
916  * returns 1 if OK, otherwise 0
917  */
918 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
919 {
920   assert(file != NULL);
921
922   if (!config_write_base_only
923       || sock == NULL /* journal replay */
924       || config_base_dir == NULL)
925     return 1;
926
927   if (strstr(file, "../") != NULL) goto err;
928
929   /* relative paths without "../" are ok */
930   if (*file != '/') return 1;
931
932   /* file must be of the format base + "/" + <1+ char filename> */
933   if (strlen(file) < _config_base_dir_len + 2) goto err;
934   if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
935   if (*(file + _config_base_dir_len) != '/') goto err;
936
937   return 1;
938
939 err:
940   if (sock != NULL && sock->fd >= 0)
941     send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
942
943   return 0;
944 } /* }}} static int check_file_access */
945
946 /* returns 1 if we have the required privilege level,
947  * otherwise issue an error to the user on sock */
948 static int has_privilege (listen_socket_t *sock, /* {{{ */
949                           socket_privilege priv)
950 {
951   if (sock == NULL) /* journal replay */
952     return 1;
953
954   if (sock->privilege >= priv)
955     return 1;
956
957   return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
958 } /* }}} static int has_privilege */
959
960 static int flush_file (const char *filename) /* {{{ */
961 {
962   cache_item_t *ci;
963
964   pthread_mutex_lock (&cache_lock);
965
966   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
967   if (ci == NULL)
968   {
969     pthread_mutex_unlock (&cache_lock);
970     return (ENOENT);
971   }
972
973   if (ci->values_num > 0)
974   {
975     /* Enqueue at head */
976     enqueue_cache_item (ci, HEAD);
977     pthread_cond_wait(&ci->flushed, &cache_lock);
978   }
979
980   pthread_mutex_unlock(&cache_lock);
981
982   return (0);
983 } /* }}} int flush_file */
984
985 static int handle_request_help (listen_socket_t *sock, /* {{{ */
986     char *buffer, size_t buffer_size)
987 {
988   int status;
989   char **help_text;
990   char *command;
991
992   char *help_help[2] =
993   {
994     "Command overview\n"
995     ,
996     "FLUSH <filename>\n"
997     "FLUSHALL\n"
998     "HELP [<command>]\n"
999     "UPDATE <filename> <values> [<values> ...]\n"
1000     "BATCH\n"
1001     "STATS\n"
1002   };
1003
1004   char *help_flush[2] =
1005   {
1006     "Help for FLUSH\n"
1007     ,
1008     "Usage: FLUSH <filename>\n"
1009     "\n"
1010     "Adds the given filename to the head of the update queue and returns\n"
1011     "after is has been dequeued.\n"
1012   };
1013
1014   char *help_flushall[2] =
1015   {
1016     "Help for FLUSHALL\n"
1017     ,
1018     "Usage: FLUSHALL\n"
1019     "\n"
1020     "Triggers writing of all pending updates.  Returns immediately.\n"
1021   };
1022
1023   char *help_update[2] =
1024   {
1025     "Help for UPDATE\n"
1026     ,
1027     "Usage: UPDATE <filename> <values> [<values> ...]\n"
1028     "\n"
1029     "Adds the given file to the internal cache if it is not yet known and\n"
1030     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1031     "for details.\n"
1032     "\n"
1033     "Each <values> has the following form:\n"
1034     "  <values> = <time>:<value>[:<value>[...]]\n"
1035     "See the rrdupdate(1) manpage for details.\n"
1036   };
1037
1038   char *help_stats[2] =
1039   {
1040     "Help for STATS\n"
1041     ,
1042     "Usage: STATS\n"
1043     "\n"
1044     "Returns some performance counters, see the rrdcached(1) manpage for\n"
1045     "a description of the values.\n"
1046   };
1047
1048   char *help_batch[2] =
1049   {
1050     "Help for BATCH\n"
1051     ,
1052     "The 'BATCH' command permits the client to initiate a bulk load\n"
1053     "   of commands to rrdcached.\n"
1054     "\n"
1055     "Usage:\n"
1056     "\n"
1057     "    client: BATCH\n"
1058     "    server: 0 Go ahead.  End with dot '.' on its own line.\n"
1059     "    client: command #1\n"
1060     "    client: command #2\n"
1061     "    client: ... and so on\n"
1062     "    client: .\n"
1063     "    server: 2 errors\n"
1064     "    server: 7 message for command #7\n"
1065     "    server: 9 message for command #9\n"
1066     "\n"
1067     "For more information, consult the rrdcached(1) documentation.\n"
1068   };
1069
1070   status = buffer_get_field (&buffer, &buffer_size, &command);
1071   if (status != 0)
1072     help_text = help_help;
1073   else
1074   {
1075     if (strcasecmp (command, "update") == 0)
1076       help_text = help_update;
1077     else if (strcasecmp (command, "flush") == 0)
1078       help_text = help_flush;
1079     else if (strcasecmp (command, "flushall") == 0)
1080       help_text = help_flushall;
1081     else if (strcasecmp (command, "stats") == 0)
1082       help_text = help_stats;
1083     else if (strcasecmp (command, "batch") == 0)
1084       help_text = help_batch;
1085     else
1086       help_text = help_help;
1087   }
1088
1089   add_response_info(sock, help_text[1]);
1090   return send_response(sock, RESP_OK, help_text[0]);
1091 } /* }}} int handle_request_help */
1092
1093 static int handle_request_stats (listen_socket_t *sock) /* {{{ */
1094 {
1095   uint64_t copy_queue_length;
1096   uint64_t copy_updates_received;
1097   uint64_t copy_flush_received;
1098   uint64_t copy_updates_written;
1099   uint64_t copy_data_sets_written;
1100   uint64_t copy_journal_bytes;
1101   uint64_t copy_journal_rotate;
1102
1103   uint64_t tree_nodes_number;
1104   uint64_t tree_depth;
1105
1106   pthread_mutex_lock (&stats_lock);
1107   copy_queue_length       = stats_queue_length;
1108   copy_updates_received   = stats_updates_received;
1109   copy_flush_received     = stats_flush_received;
1110   copy_updates_written    = stats_updates_written;
1111   copy_data_sets_written  = stats_data_sets_written;
1112   copy_journal_bytes      = stats_journal_bytes;
1113   copy_journal_rotate     = stats_journal_rotate;
1114   pthread_mutex_unlock (&stats_lock);
1115
1116   pthread_mutex_lock (&cache_lock);
1117   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1118   tree_depth        = (uint64_t) g_tree_height (cache_tree);
1119   pthread_mutex_unlock (&cache_lock);
1120
1121   add_response_info(sock,
1122                     "QueueLength: %"PRIu64"\n", copy_queue_length);
1123   add_response_info(sock,
1124                     "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1125   add_response_info(sock,
1126                     "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1127   add_response_info(sock,
1128                     "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1129   add_response_info(sock,
1130                     "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1131   add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1132   add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1133   add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1134   add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1135
1136   send_response(sock, RESP_OK, "Statistics follow\n");
1137
1138   return (0);
1139 } /* }}} int handle_request_stats */
1140
1141 static int handle_request_flush (listen_socket_t *sock, /* {{{ */
1142     char *buffer, size_t buffer_size)
1143 {
1144   char *file;
1145   int status;
1146
1147   status = buffer_get_field (&buffer, &buffer_size, &file);
1148   if (status != 0)
1149   {
1150     return send_response(sock, RESP_ERR, "Usage: flush <filename>\n");
1151   }
1152   else
1153   {
1154     pthread_mutex_lock(&stats_lock);
1155     stats_flush_received++;
1156     pthread_mutex_unlock(&stats_lock);
1157
1158     if (!check_file_access(file, sock)) return 0;
1159
1160     status = flush_file (file);
1161     if (status == 0)
1162       return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1163     else if (status == ENOENT)
1164     {
1165       /* no file in our tree; see whether it exists at all */
1166       struct stat statbuf;
1167
1168       memset(&statbuf, 0, sizeof(statbuf));
1169       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1170         return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1171       else
1172         return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1173     }
1174     else if (status < 0)
1175       return send_response(sock, RESP_ERR, "Internal error.\n");
1176     else
1177       return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1178   }
1179
1180   /* NOTREACHED */
1181   assert(1==0);
1182 } /* }}} int handle_request_slurp */
1183
1184 static int handle_request_flushall(listen_socket_t *sock) /* {{{ */
1185 {
1186   int status;
1187
1188   status = has_privilege(sock, PRIV_HIGH);
1189   if (status <= 0)
1190     return status;
1191
1192   RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1193
1194   pthread_mutex_lock(&cache_lock);
1195   flush_old_values(-1);
1196   pthread_mutex_unlock(&cache_lock);
1197
1198   return send_response(sock, RESP_OK, "Started flush.\n");
1199 } /* }}} static int handle_request_flushall */
1200
1201 static int handle_request_update (listen_socket_t *sock, /* {{{ */
1202     char *buffer, size_t buffer_size)
1203 {
1204   char *file;
1205   int values_num = 0;
1206   int status;
1207   char orig_buf[CMD_MAX];
1208
1209   time_t now;
1210   cache_item_t *ci;
1211
1212   now = time (NULL);
1213
1214   status = has_privilege(sock, PRIV_HIGH);
1215   if (status <= 0)
1216     return status;
1217
1218   /* save it for the journal later */
1219   strncpy(orig_buf, buffer, sizeof(orig_buf)-1);
1220
1221   status = buffer_get_field (&buffer, &buffer_size, &file);
1222   if (status != 0)
1223     return send_response(sock, RESP_ERR,
1224                          "Usage: UPDATE <filename> <values> [<values> ...]\n");
1225
1226   pthread_mutex_lock(&stats_lock);
1227   stats_updates_received++;
1228   pthread_mutex_unlock(&stats_lock);
1229
1230   if (!check_file_access(file, sock)) return 0;
1231
1232   pthread_mutex_lock (&cache_lock);
1233   ci = g_tree_lookup (cache_tree, file);
1234
1235   if (ci == NULL) /* {{{ */
1236   {
1237     struct stat statbuf;
1238
1239     /* don't hold the lock while we setup; stat(2) might block */
1240     pthread_mutex_unlock(&cache_lock);
1241
1242     memset (&statbuf, 0, sizeof (statbuf));
1243     status = stat (file, &statbuf);
1244     if (status != 0)
1245     {
1246       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1247
1248       status = errno;
1249       if (status == ENOENT)
1250         return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1251       else
1252         return send_response(sock, RESP_ERR,
1253                              "stat failed with error %i.\n", status);
1254     }
1255     if (!S_ISREG (statbuf.st_mode))
1256       return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1257
1258     if (access(file, R_OK|W_OK) != 0)
1259       return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1260                            file, rrd_strerror(errno));
1261
1262     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1263     if (ci == NULL)
1264     {
1265       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1266
1267       return send_response(sock, RESP_ERR, "malloc failed.\n");
1268     }
1269     memset (ci, 0, sizeof (cache_item_t));
1270
1271     ci->file = strdup (file);
1272     if (ci->file == NULL)
1273     {
1274       free (ci);
1275       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1276
1277       return send_response(sock, RESP_ERR, "strdup failed.\n");
1278     }
1279
1280     wipe_ci_values(ci, now);
1281     ci->flags = CI_FLAGS_IN_TREE;
1282
1283     pthread_mutex_lock(&cache_lock);
1284     g_tree_insert (cache_tree, (void *) ci->file, (void *) ci);
1285   } /* }}} */
1286   assert (ci != NULL);
1287
1288   /* don't re-write updates in replay mode */
1289   if (sock != NULL)
1290     journal_write("update", orig_buf);
1291
1292   while (buffer_size > 0)
1293   {
1294     char **temp;
1295     char *value;
1296
1297     status = buffer_get_field (&buffer, &buffer_size, &value);
1298     if (status != 0)
1299     {
1300       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1301       break;
1302     }
1303
1304     temp = (char **) realloc (ci->values,
1305         sizeof (char *) * (ci->values_num + 1));
1306     if (temp == NULL)
1307     {
1308       RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
1309       continue;
1310     }
1311     ci->values = temp;
1312
1313     ci->values[ci->values_num] = strdup (value);
1314     if (ci->values[ci->values_num] == NULL)
1315     {
1316       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1317       continue;
1318     }
1319     ci->values_num++;
1320
1321     values_num++;
1322   }
1323
1324   if (((now - ci->last_flush_time) >= config_write_interval)
1325       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1326       && (ci->values_num > 0))
1327   {
1328     enqueue_cache_item (ci, TAIL);
1329   }
1330
1331   pthread_mutex_unlock (&cache_lock);
1332
1333   if (values_num < 1)
1334     return send_response(sock, RESP_ERR, "No values updated.\n");
1335   else
1336     return send_response(sock, RESP_OK, "Enqueued %i value(s).\n", values_num);
1337
1338   /* NOTREACHED */
1339   assert(1==0);
1340
1341 } /* }}} int handle_request_update */
1342
1343 /* we came across a "WROTE" entry during journal replay.
1344  * throw away any values that we have accumulated for this file
1345  */
1346 static int handle_request_wrote (const char *buffer) /* {{{ */
1347 {
1348   int i;
1349   cache_item_t *ci;
1350   const char *file = buffer;
1351
1352   pthread_mutex_lock(&cache_lock);
1353
1354   ci = g_tree_lookup(cache_tree, file);
1355   if (ci == NULL)
1356   {
1357     pthread_mutex_unlock(&cache_lock);
1358     return (0);
1359   }
1360
1361   if (ci->values)
1362   {
1363     for (i=0; i < ci->values_num; i++)
1364       free(ci->values[i]);
1365
1366     free(ci->values);
1367   }
1368
1369   wipe_ci_values(ci, time(NULL));
1370   remove_from_queue(ci);
1371
1372   pthread_mutex_unlock(&cache_lock);
1373   return (0);
1374 } /* }}} int handle_request_wrote */
1375
1376 /* start "BATCH" processing */
1377 static int batch_start (listen_socket_t *sock) /* {{{ */
1378 {
1379   int status;
1380   if (sock->batch_mode)
1381     return send_response(sock, RESP_ERR, "Already in BATCH\n");
1382
1383   status = send_response(sock, RESP_OK,
1384                          "Go ahead.  End with dot '.' on its own line.\n");
1385   sock->batch_mode = 1;
1386   sock->batch_cmd = 0;
1387
1388   return status;
1389 } /* }}} static int batch_start */
1390
1391 /* finish "BATCH" processing and return results to the client */
1392 static int batch_done (listen_socket_t *sock) /* {{{ */
1393 {
1394   assert(sock->batch_mode);
1395   sock->batch_mode = 0;
1396   sock->batch_cmd  = 0;
1397   return send_response(sock, RESP_OK, "errors\n");
1398 } /* }}} static int batch_done */
1399
1400 /* if sock==NULL, we are in journal replay mode */
1401 static int handle_request (listen_socket_t *sock, /* {{{ */
1402                            char *buffer, size_t buffer_size)
1403 {
1404   char *buffer_ptr;
1405   char *command;
1406   int status;
1407
1408   assert (buffer[buffer_size - 1] == '\0');
1409
1410   buffer_ptr = buffer;
1411   command = NULL;
1412   status = buffer_get_field (&buffer_ptr, &buffer_size, &command);
1413   if (status != 0)
1414   {
1415     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1416     return (-1);
1417   }
1418
1419   if (sock != NULL && sock->batch_mode)
1420     sock->batch_cmd++;
1421
1422   if (strcasecmp (command, "update") == 0)
1423     return (handle_request_update (sock, buffer_ptr, buffer_size));
1424   else if (strcasecmp (command, "wrote") == 0 && sock == NULL)
1425   {
1426     /* this is only valid in replay mode */
1427     return (handle_request_wrote (buffer_ptr));
1428   }
1429   else if (strcasecmp (command, "flush") == 0)
1430     return (handle_request_flush (sock, buffer_ptr, buffer_size));
1431   else if (strcasecmp (command, "flushall") == 0)
1432     return (handle_request_flushall(sock));
1433   else if (strcasecmp (command, "stats") == 0)
1434     return (handle_request_stats (sock));
1435   else if (strcasecmp (command, "help") == 0)
1436     return (handle_request_help (sock, buffer_ptr, buffer_size));
1437   else if (strcasecmp (command, "batch") == 0 && sock != NULL)
1438     return batch_start(sock);
1439   else if (strcasecmp (command, ".") == 0 && sock != NULL && sock->batch_mode)
1440     return batch_done(sock);
1441   else
1442     return send_response(sock, RESP_ERR, "Unknown command: %s\n", command);
1443
1444   /* NOTREACHED */
1445   assert(1==0);
1446 } /* }}} int handle_request */
1447
1448 /* MUST NOT hold journal_lock before calling this */
1449 static void journal_rotate(void) /* {{{ */
1450 {
1451   FILE *old_fh = NULL;
1452   int new_fd;
1453
1454   if (journal_cur == NULL || journal_old == NULL)
1455     return;
1456
1457   pthread_mutex_lock(&journal_lock);
1458
1459   /* we rotate this way (rename before close) so that the we can release
1460    * the journal lock as fast as possible.  Journal writes to the new
1461    * journal can proceed immediately after the new file is opened.  The
1462    * fclose can then block without affecting new updates.
1463    */
1464   if (journal_fh != NULL)
1465   {
1466     old_fh = journal_fh;
1467     journal_fh = NULL;
1468     rename(journal_cur, journal_old);
1469     ++stats_journal_rotate;
1470   }
1471
1472   new_fd = open(journal_cur, O_WRONLY|O_CREAT|O_APPEND,
1473                 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1474   if (new_fd >= 0)
1475   {
1476     journal_fh = fdopen(new_fd, "a");
1477     if (journal_fh == NULL)
1478       close(new_fd);
1479   }
1480
1481   pthread_mutex_unlock(&journal_lock);
1482
1483   if (old_fh != NULL)
1484     fclose(old_fh);
1485
1486   if (journal_fh == NULL)
1487   {
1488     RRDD_LOG(LOG_CRIT,
1489              "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1490              journal_cur, rrd_strerror(errno));
1491
1492     RRDD_LOG(LOG_ERR,
1493              "JOURNALING DISABLED: All values will be flushed at shutdown");
1494     config_flush_at_shutdown = 1;
1495   }
1496
1497 } /* }}} static void journal_rotate */
1498
1499 static void journal_done(void) /* {{{ */
1500 {
1501   if (journal_cur == NULL)
1502     return;
1503
1504   pthread_mutex_lock(&journal_lock);
1505   if (journal_fh != NULL)
1506   {
1507     fclose(journal_fh);
1508     journal_fh = NULL;
1509   }
1510
1511   if (config_flush_at_shutdown)
1512   {
1513     RRDD_LOG(LOG_INFO, "removing journals");
1514     unlink(journal_old);
1515     unlink(journal_cur);
1516   }
1517   else
1518   {
1519     RRDD_LOG(LOG_INFO, "expedited shutdown; "
1520              "journals will be used at next startup");
1521   }
1522
1523   pthread_mutex_unlock(&journal_lock);
1524
1525 } /* }}} static void journal_done */
1526
1527 static int journal_write(char *cmd, char *args) /* {{{ */
1528 {
1529   int chars;
1530
1531   if (journal_fh == NULL)
1532     return 0;
1533
1534   pthread_mutex_lock(&journal_lock);
1535   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1536   pthread_mutex_unlock(&journal_lock);
1537
1538   if (chars > 0)
1539   {
1540     pthread_mutex_lock(&stats_lock);
1541     stats_journal_bytes += chars;
1542     pthread_mutex_unlock(&stats_lock);
1543   }
1544
1545   return chars;
1546 } /* }}} static int journal_write */
1547
1548 static int journal_replay (const char *file) /* {{{ */
1549 {
1550   FILE *fh;
1551   int entry_cnt = 0;
1552   int fail_cnt = 0;
1553   uint64_t line = 0;
1554   char entry[CMD_MAX];
1555
1556   if (file == NULL) return 0;
1557
1558   {
1559     char *reason;
1560     int status = 0;
1561     struct stat statbuf;
1562
1563     memset(&statbuf, 0, sizeof(statbuf));
1564     if (stat(file, &statbuf) != 0)
1565     {
1566       if (errno == ENOENT)
1567         return 0;
1568
1569       reason = "stat error";
1570       status = errno;
1571     }
1572     else if (!S_ISREG(statbuf.st_mode))
1573     {
1574       reason = "not a regular file";
1575       status = EPERM;
1576     }
1577     if (statbuf.st_uid != daemon_uid)
1578     {
1579       reason = "not owned by daemon user";
1580       status = EACCES;
1581     }
1582     if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
1583     {
1584       reason = "must not be user/group writable";
1585       status = EACCES;
1586     }
1587
1588     if (status != 0)
1589     {
1590       RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
1591                file, rrd_strerror(status), reason);
1592       return 0;
1593     }
1594   }
1595
1596   fh = fopen(file, "r");
1597   if (fh == NULL)
1598   {
1599     if (errno != ENOENT)
1600       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1601                file, rrd_strerror(errno));
1602     return 0;
1603   }
1604   else
1605     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1606
1607   while(!feof(fh))
1608   {
1609     size_t entry_len;
1610
1611     ++line;
1612     if (fgets(entry, sizeof(entry), fh) == NULL)
1613       break;
1614     entry_len = strlen(entry);
1615
1616     /* check \n termination in case journal writing crashed mid-line */
1617     if (entry_len == 0)
1618       continue;
1619     else if (entry[entry_len - 1] != '\n')
1620     {
1621       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1622       ++fail_cnt;
1623       continue;
1624     }
1625
1626     entry[entry_len - 1] = '\0';
1627
1628     if (handle_request(NULL, entry, entry_len) == 0)
1629       ++entry_cnt;
1630     else
1631       ++fail_cnt;
1632   }
1633
1634   fclose(fh);
1635
1636   RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1637            entry_cnt, fail_cnt);
1638
1639   return entry_cnt > 0 ? 1 : 0;
1640 } /* }}} static int journal_replay */
1641
1642 static void journal_init(void) /* {{{ */
1643 {
1644   int had_journal = 0;
1645
1646   if (journal_cur == NULL) return;
1647
1648   pthread_mutex_lock(&journal_lock);
1649
1650   RRDD_LOG(LOG_INFO, "checking for journal files");
1651
1652   had_journal += journal_replay(journal_old);
1653   had_journal += journal_replay(journal_cur);
1654
1655   /* it must have been a crash.  start a flush */
1656   if (had_journal && config_flush_at_shutdown)
1657     flush_old_values(-1);
1658
1659   pthread_mutex_unlock(&journal_lock);
1660   journal_rotate();
1661
1662   RRDD_LOG(LOG_INFO, "journal processing complete");
1663
1664 } /* }}} static void journal_init */
1665
1666 static void close_connection(listen_socket_t *sock)
1667 {
1668   close(sock->fd) ;  sock->fd   = -1;
1669   free(sock->rbuf);  sock->rbuf = NULL;
1670   free(sock->wbuf);  sock->wbuf = NULL;
1671
1672   free(sock);
1673 }
1674
1675 static void *connection_thread_main (void *args) /* {{{ */
1676 {
1677   pthread_t self;
1678   listen_socket_t *sock;
1679   int i;
1680   int fd;
1681
1682   sock = (listen_socket_t *) args;
1683   fd = sock->fd;
1684
1685   /* init read buffers */
1686   sock->next_read = sock->next_cmd = 0;
1687   sock->rbuf = malloc(RBUF_SIZE);
1688   if (sock->rbuf == NULL)
1689   {
1690     RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
1691     close_connection(sock);
1692     return NULL;
1693   }
1694
1695   pthread_mutex_lock (&connection_threads_lock);
1696   {
1697     pthread_t *temp;
1698
1699     temp = (pthread_t *) realloc (connection_threads,
1700         sizeof (pthread_t) * (connection_threads_num + 1));
1701     if (temp == NULL)
1702     {
1703       RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed.");
1704     }
1705     else
1706     {
1707       connection_threads = temp;
1708       connection_threads[connection_threads_num] = pthread_self ();
1709       connection_threads_num++;
1710     }
1711   }
1712   pthread_mutex_unlock (&connection_threads_lock);
1713
1714   while (do_shutdown == 0)
1715   {
1716     char *cmd;
1717     ssize_t cmd_len;
1718     ssize_t rbytes;
1719
1720     struct pollfd pollfd;
1721     int status;
1722
1723     pollfd.fd = fd;
1724     pollfd.events = POLLIN | POLLPRI;
1725     pollfd.revents = 0;
1726
1727     status = poll (&pollfd, 1, /* timeout = */ 500);
1728     if (do_shutdown)
1729       break;
1730     else if (status == 0) /* timeout */
1731       continue;
1732     else if (status < 0) /* error */
1733     {
1734       status = errno;
1735       if (status == EINTR)
1736         continue;
1737       RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
1738       continue;
1739     }
1740
1741     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
1742     {
1743       close_connection(sock);
1744       break;
1745     }
1746     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
1747     {
1748       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
1749           "poll(2) returned something unexpected: %#04hx",
1750           pollfd.revents);
1751       close_connection(sock);
1752       break;
1753     }
1754
1755     rbytes = read(fd, sock->rbuf + sock->next_read,
1756                   RBUF_SIZE - sock->next_read);
1757     if (rbytes < 0)
1758     {
1759       RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
1760       break;
1761     }
1762     else if (rbytes == 0)
1763       break; /* eof */
1764
1765     sock->next_read += rbytes;
1766
1767     while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
1768     {
1769       status = handle_request (sock, cmd, cmd_len+1);
1770       if (status != 0)
1771         goto out_close;
1772     }
1773   }
1774
1775 out_close:
1776   close_connection(sock);
1777
1778   self = pthread_self ();
1779   /* Remove this thread from the connection threads list */
1780   pthread_mutex_lock (&connection_threads_lock);
1781   /* Find out own index in the array */
1782   for (i = 0; i < connection_threads_num; i++)
1783     if (pthread_equal (connection_threads[i], self) != 0)
1784       break;
1785   assert (i < connection_threads_num);
1786
1787   /* Move the trailing threads forward. */
1788   if (i < (connection_threads_num - 1))
1789   {
1790     memmove (connection_threads + i,
1791         connection_threads + i + 1,
1792         sizeof (pthread_t) * (connection_threads_num - i - 1));
1793   }
1794
1795   connection_threads_num--;
1796   pthread_mutex_unlock (&connection_threads_lock);
1797
1798   return (NULL);
1799 } /* }}} void *connection_thread_main */
1800
1801 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
1802 {
1803   int fd;
1804   struct sockaddr_un sa;
1805   listen_socket_t *temp;
1806   int status;
1807   const char *path;
1808
1809   path = sock->addr;
1810   if (strncmp(path, "unix:", strlen("unix:")) == 0)
1811     path += strlen("unix:");
1812
1813   temp = (listen_socket_t *) realloc (listen_fds,
1814       sizeof (listen_fds[0]) * (listen_fds_num + 1));
1815   if (temp == NULL)
1816   {
1817     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: realloc failed.");
1818     return (-1);
1819   }
1820   listen_fds = temp;
1821   memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
1822
1823   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
1824   if (fd < 0)
1825   {
1826     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: socket(2) failed.");
1827     return (-1);
1828   }
1829
1830   memset (&sa, 0, sizeof (sa));
1831   sa.sun_family = AF_UNIX;
1832   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
1833
1834   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
1835   if (status != 0)
1836   {
1837     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: bind(2) failed.");
1838     close (fd);
1839     unlink (path);
1840     return (-1);
1841   }
1842
1843   status = listen (fd, /* backlog = */ 10);
1844   if (status != 0)
1845   {
1846     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: listen(2) failed.");
1847     close (fd);
1848     unlink (path);
1849     return (-1);
1850   }
1851
1852   listen_fds[listen_fds_num].fd = fd;
1853   listen_fds[listen_fds_num].family = PF_UNIX;
1854   strncpy(listen_fds[listen_fds_num].addr, path,
1855           sizeof (listen_fds[listen_fds_num].addr) - 1);
1856   listen_fds_num++;
1857
1858   return (0);
1859 } /* }}} int open_listen_socket_unix */
1860
1861 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
1862 {
1863   struct addrinfo ai_hints;
1864   struct addrinfo *ai_res;
1865   struct addrinfo *ai_ptr;
1866   char addr_copy[NI_MAXHOST];
1867   char *addr;
1868   char *port;
1869   int status;
1870
1871   strncpy (addr_copy, sock->addr, sizeof (addr_copy));
1872   addr_copy[sizeof (addr_copy) - 1] = 0;
1873   addr = addr_copy;
1874
1875   memset (&ai_hints, 0, sizeof (ai_hints));
1876   ai_hints.ai_flags = 0;
1877 #ifdef AI_ADDRCONFIG
1878   ai_hints.ai_flags |= AI_ADDRCONFIG;
1879 #endif
1880   ai_hints.ai_family = AF_UNSPEC;
1881   ai_hints.ai_socktype = SOCK_STREAM;
1882
1883   port = NULL;
1884   if (*addr == '[') /* IPv6+port format */
1885   {
1886     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
1887     addr++;
1888
1889     port = strchr (addr, ']');
1890     if (port == NULL)
1891     {
1892       RRDD_LOG (LOG_ERR, "open_listen_socket_network: Malformed address: %s",
1893           sock->addr);
1894       return (-1);
1895     }
1896     *port = 0;
1897     port++;
1898
1899     if (*port == ':')
1900       port++;
1901     else if (*port == 0)
1902       port = NULL;
1903     else
1904     {
1905       RRDD_LOG (LOG_ERR, "open_listen_socket_network: Garbage after address: %s",
1906           port);
1907       return (-1);
1908     }
1909   } /* if (*addr = ']') */
1910   else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */
1911   {
1912     port = rindex(addr, ':');
1913     if (port != NULL)
1914     {
1915       *port = 0;
1916       port++;
1917     }
1918   }
1919   ai_res = NULL;
1920   status = getaddrinfo (addr,
1921                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
1922                         &ai_hints, &ai_res);
1923   if (status != 0)
1924   {
1925     RRDD_LOG (LOG_ERR, "open_listen_socket_network: getaddrinfo(%s) failed: "
1926         "%s", addr, gai_strerror (status));
1927     return (-1);
1928   }
1929
1930   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
1931   {
1932     int fd;
1933     listen_socket_t *temp;
1934     int one = 1;
1935
1936     temp = (listen_socket_t *) realloc (listen_fds,
1937         sizeof (listen_fds[0]) * (listen_fds_num + 1));
1938     if (temp == NULL)
1939     {
1940       RRDD_LOG (LOG_ERR, "open_listen_socket_network: realloc failed.");
1941       continue;
1942     }
1943     listen_fds = temp;
1944     memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
1945
1946     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
1947     if (fd < 0)
1948     {
1949       RRDD_LOG (LOG_ERR, "open_listen_socket_network: socket(2) failed.");
1950       continue;
1951     }
1952
1953     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
1954
1955     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
1956     if (status != 0)
1957     {
1958       RRDD_LOG (LOG_ERR, "open_listen_socket_network: bind(2) failed.");
1959       close (fd);
1960       continue;
1961     }
1962
1963     status = listen (fd, /* backlog = */ 10);
1964     if (status != 0)
1965     {
1966       RRDD_LOG (LOG_ERR, "open_listen_socket_network: listen(2) failed.");
1967       close (fd);
1968       return (-1);
1969     }
1970
1971     listen_fds[listen_fds_num].fd = fd;
1972     listen_fds[listen_fds_num].family = ai_ptr->ai_family;
1973     listen_fds_num++;
1974   } /* for (ai_ptr) */
1975
1976   return (0);
1977 } /* }}} static int open_listen_socket_network */
1978
1979 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
1980 {
1981   assert(sock != NULL);
1982   assert(sock->addr != NULL);
1983
1984   if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
1985       || sock->addr[0] == '/')
1986     return (open_listen_socket_unix(sock));
1987   else
1988     return (open_listen_socket_network(sock));
1989 } /* }}} int open_listen_socket */
1990
1991 static int close_listen_sockets (void) /* {{{ */
1992 {
1993   size_t i;
1994
1995   for (i = 0; i < listen_fds_num; i++)
1996   {
1997     close (listen_fds[i].fd);
1998
1999     if (listen_fds[i].family == PF_UNIX)
2000       unlink(listen_fds[i].addr);
2001   }
2002
2003   free (listen_fds);
2004   listen_fds = NULL;
2005   listen_fds_num = 0;
2006
2007   return (0);
2008 } /* }}} int close_listen_sockets */
2009
2010 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
2011 {
2012   struct pollfd *pollfds;
2013   int pollfds_num;
2014   int status;
2015   int i;
2016
2017   for (i = 0; i < config_listen_address_list_len; i++)
2018     open_listen_socket (config_listen_address_list[i]);
2019
2020   if (config_listen_address_list_len < 1)
2021   {
2022     listen_socket_t sock;
2023     memset(&sock, 0, sizeof(sock));
2024     strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr));
2025     open_listen_socket (&sock);
2026   }
2027
2028   if (listen_fds_num < 1)
2029   {
2030     RRDD_LOG (LOG_ERR, "listen_thread_main: No listen sockets "
2031         "could be opened. Sorry.");
2032     return (NULL);
2033   }
2034
2035   pollfds_num = listen_fds_num;
2036   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2037   if (pollfds == NULL)
2038   {
2039     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2040     return (NULL);
2041   }
2042   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2043
2044   RRDD_LOG(LOG_INFO, "listening for connections");
2045
2046   while (do_shutdown == 0)
2047   {
2048     assert (pollfds_num == ((int) listen_fds_num));
2049     for (i = 0; i < pollfds_num; i++)
2050     {
2051       pollfds[i].fd = listen_fds[i].fd;
2052       pollfds[i].events = POLLIN | POLLPRI;
2053       pollfds[i].revents = 0;
2054     }
2055
2056     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2057     if (do_shutdown)
2058       break;
2059     else if (status == 0) /* timeout */
2060       continue;
2061     else if (status < 0) /* error */
2062     {
2063       status = errno;
2064       if (status != EINTR)
2065       {
2066         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2067       }
2068       continue;
2069     }
2070
2071     for (i = 0; i < pollfds_num; i++)
2072     {
2073       listen_socket_t *client_sock;
2074       struct sockaddr_storage client_sa;
2075       socklen_t client_sa_size;
2076       pthread_t tid;
2077       pthread_attr_t attr;
2078
2079       if (pollfds[i].revents == 0)
2080         continue;
2081
2082       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2083       {
2084         RRDD_LOG (LOG_ERR, "listen_thread_main: "
2085             "poll(2) returned something unexpected for listen FD #%i.",
2086             pollfds[i].fd);
2087         continue;
2088       }
2089
2090       client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2091       if (client_sock == NULL)
2092       {
2093         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2094         continue;
2095       }
2096       memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2097
2098       client_sa_size = sizeof (client_sa);
2099       client_sock->fd = accept (pollfds[i].fd,
2100           (struct sockaddr *) &client_sa, &client_sa_size);
2101       if (client_sock->fd < 0)
2102       {
2103         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2104         free(client_sock);
2105         continue;
2106       }
2107
2108       pthread_attr_init (&attr);
2109       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2110
2111       status = pthread_create (&tid, &attr, connection_thread_main,
2112                                client_sock);
2113       if (status != 0)
2114       {
2115         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2116         close_connection(client_sock);
2117         continue;
2118       }
2119     } /* for (pollfds_num) */
2120   } /* while (do_shutdown == 0) */
2121
2122   RRDD_LOG(LOG_INFO, "starting shutdown");
2123
2124   close_listen_sockets ();
2125
2126   pthread_mutex_lock (&connection_threads_lock);
2127   while (connection_threads_num > 0)
2128   {
2129     pthread_t wait_for;
2130
2131     wait_for = connection_threads[0];
2132
2133     pthread_mutex_unlock (&connection_threads_lock);
2134     pthread_join (wait_for, /* retval = */ NULL);
2135     pthread_mutex_lock (&connection_threads_lock);
2136   }
2137   pthread_mutex_unlock (&connection_threads_lock);
2138
2139   return (NULL);
2140 } /* }}} void *listen_thread_main */
2141
2142 static int daemonize (void) /* {{{ */
2143 {
2144   int status;
2145   int fd;
2146   char *base_dir;
2147
2148   daemon_uid = geteuid();
2149
2150   fd = open_pidfile();
2151   if (fd < 0) return fd;
2152
2153   if (!stay_foreground)
2154   {
2155     pid_t child;
2156
2157     child = fork ();
2158     if (child < 0)
2159     {
2160       fprintf (stderr, "daemonize: fork(2) failed.\n");
2161       return (-1);
2162     }
2163     else if (child > 0)
2164     {
2165       return (1);
2166     }
2167
2168     /* Become session leader */
2169     setsid ();
2170
2171     /* Open the first three file descriptors to /dev/null */
2172     close (2);
2173     close (1);
2174     close (0);
2175
2176     open ("/dev/null", O_RDWR);
2177     dup (0);
2178     dup (0);
2179   } /* if (!stay_foreground) */
2180
2181   /* Change into the /tmp directory. */
2182   base_dir = (config_base_dir != NULL)
2183     ? config_base_dir
2184     : "/tmp";
2185   status = chdir (base_dir);
2186   if (status != 0)
2187   {
2188     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2189     return (-1);
2190   }
2191
2192   install_signal_handlers();
2193
2194   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2195   RRDD_LOG(LOG_INFO, "starting up");
2196
2197   cache_tree = g_tree_new ((GCompareFunc) strcmp);
2198   if (cache_tree == NULL)
2199   {
2200     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2201     return (-1);
2202   }
2203
2204   status = write_pidfile (fd);
2205   return status;
2206 } /* }}} int daemonize */
2207
2208 static int cleanup (void) /* {{{ */
2209 {
2210   do_shutdown++;
2211
2212   pthread_cond_signal (&cache_cond);
2213   pthread_join (queue_thread, /* return = */ NULL);
2214
2215   remove_pidfile ();
2216
2217   RRDD_LOG(LOG_INFO, "goodbye");
2218   closelog ();
2219
2220   return (0);
2221 } /* }}} int cleanup */
2222
2223 static int read_options (int argc, char **argv) /* {{{ */
2224 {
2225   int option;
2226   int status = 0;
2227
2228   while ((option = getopt(argc, argv, "gl:L:f:w:b:Bz:p:j:h?F")) != -1)
2229   {
2230     switch (option)
2231     {
2232       case 'g':
2233         stay_foreground=1;
2234         break;
2235
2236       case 'L':
2237       case 'l':
2238       {
2239         listen_socket_t **temp;
2240         listen_socket_t *new;
2241
2242         new = malloc(sizeof(listen_socket_t));
2243         if (new == NULL)
2244         {
2245           fprintf(stderr, "read_options: malloc failed.\n");
2246           return(2);
2247         }
2248         memset(new, 0, sizeof(listen_socket_t));
2249
2250         temp = (listen_socket_t **) realloc (config_listen_address_list,
2251             sizeof (listen_socket_t *) * (config_listen_address_list_len + 1));
2252         if (temp == NULL)
2253         {
2254           fprintf (stderr, "read_options: realloc failed.\n");
2255           return (2);
2256         }
2257         config_listen_address_list = temp;
2258
2259         strncpy(new->addr, optarg, sizeof(new->addr)-1);
2260         new->privilege = (option == 'l') ? PRIV_HIGH : PRIV_LOW;
2261
2262         temp[config_listen_address_list_len] = new;
2263         config_listen_address_list_len++;
2264       }
2265       break;
2266
2267       case 'f':
2268       {
2269         int temp;
2270
2271         temp = atoi (optarg);
2272         if (temp > 0)
2273           config_flush_interval = temp;
2274         else
2275         {
2276           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2277           status = 3;
2278         }
2279       }
2280       break;
2281
2282       case 'w':
2283       {
2284         int temp;
2285
2286         temp = atoi (optarg);
2287         if (temp > 0)
2288           config_write_interval = temp;
2289         else
2290         {
2291           fprintf (stderr, "Invalid write interval: %s\n", optarg);
2292           status = 2;
2293         }
2294       }
2295       break;
2296
2297       case 'z':
2298       {
2299         int temp;
2300
2301         temp = atoi(optarg);
2302         if (temp > 0)
2303           config_write_jitter = temp;
2304         else
2305         {
2306           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2307           status = 2;
2308         }
2309
2310         break;
2311       }
2312
2313       case 'B':
2314         config_write_base_only = 1;
2315         break;
2316
2317       case 'b':
2318       {
2319         size_t len;
2320
2321         if (config_base_dir != NULL)
2322           free (config_base_dir);
2323         config_base_dir = strdup (optarg);
2324         if (config_base_dir == NULL)
2325         {
2326           fprintf (stderr, "read_options: strdup failed.\n");
2327           return (3);
2328         }
2329
2330         len = strlen (config_base_dir);
2331         while ((len > 0) && (config_base_dir[len - 1] == '/'))
2332         {
2333           config_base_dir[len - 1] = 0;
2334           len--;
2335         }
2336
2337         if (len < 1)
2338         {
2339           fprintf (stderr, "Invalid base directory: %s\n", optarg);
2340           return (4);
2341         }
2342
2343         _config_base_dir_len = len;
2344       }
2345       break;
2346
2347       case 'p':
2348       {
2349         if (config_pid_file != NULL)
2350           free (config_pid_file);
2351         config_pid_file = strdup (optarg);
2352         if (config_pid_file == NULL)
2353         {
2354           fprintf (stderr, "read_options: strdup failed.\n");
2355           return (3);
2356         }
2357       }
2358       break;
2359
2360       case 'F':
2361         config_flush_at_shutdown = 1;
2362         break;
2363
2364       case 'j':
2365       {
2366         struct stat statbuf;
2367         const char *dir = optarg;
2368
2369         status = stat(dir, &statbuf);
2370         if (status != 0)
2371         {
2372           fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2373           return 6;
2374         }
2375
2376         if (!S_ISDIR(statbuf.st_mode)
2377             || access(dir, R_OK|W_OK|X_OK) != 0)
2378         {
2379           fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2380                   errno ? rrd_strerror(errno) : "");
2381           return 6;
2382         }
2383
2384         journal_cur = malloc(PATH_MAX + 1);
2385         journal_old = malloc(PATH_MAX + 1);
2386         if (journal_cur == NULL || journal_old == NULL)
2387         {
2388           fprintf(stderr, "malloc failure for journal files\n");
2389           return 6;
2390         }
2391         else 
2392         {
2393           snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2394           snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2395         }
2396       }
2397       break;
2398
2399       case 'h':
2400       case '?':
2401         printf ("RRDCacheD %s  Copyright (C) 2008 Florian octo Forster\n"
2402             "\n"
2403             "Usage: rrdcached [options]\n"
2404             "\n"
2405             "Valid options are:\n"
2406             "  -l <address>  Socket address to listen to.\n"
2407             "  -L <address>  Socket address to listen to ('FLUSH' only).\n"
2408             "  -w <seconds>  Interval in which to write data.\n"
2409             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
2410             "  -f <seconds>  Interval in which to flush dead data.\n"
2411             "  -p <file>     Location of the PID-file.\n"
2412             "  -b <dir>      Base directory to change to.\n"
2413             "  -B            Restrict file access to paths within -b <dir>\n"
2414             "  -g            Do not fork and run in the foreground.\n"
2415             "  -j <dir>      Directory in which to create the journal files.\n"
2416             "  -F            Always flush all updates at shutdown\n"
2417             "\n"
2418             "For more information and a detailed description of all options "
2419             "please refer\n"
2420             "to the rrdcached(1) manual page.\n",
2421             VERSION);
2422         status = -1;
2423         break;
2424     } /* switch (option) */
2425   } /* while (getopt) */
2426
2427   /* advise the user when values are not sane */
2428   if (config_flush_interval < 2 * config_write_interval)
2429     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2430             " 2x write interval (-w) !\n");
2431   if (config_write_jitter > config_write_interval)
2432     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2433             " write interval (-w) !\n");
2434
2435   if (config_write_base_only && config_base_dir == NULL)
2436     fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
2437             "  Consult the rrdcached documentation\n");
2438
2439   if (journal_cur == NULL)
2440     config_flush_at_shutdown = 1;
2441
2442   return (status);
2443 } /* }}} int read_options */
2444
2445 int main (int argc, char **argv)
2446 {
2447   int status;
2448
2449   status = read_options (argc, argv);
2450   if (status != 0)
2451   {
2452     if (status < 0)
2453       status = 0;
2454     return (status);
2455   }
2456
2457   status = daemonize ();
2458   if (status == 1)
2459   {
2460     struct sigaction sigchld;
2461
2462     memset (&sigchld, 0, sizeof (sigchld));
2463     sigchld.sa_handler = SIG_IGN;
2464     sigaction (SIGCHLD, &sigchld, NULL);
2465
2466     return (0);
2467   }
2468   else if (status != 0)
2469   {
2470     fprintf (stderr, "daemonize failed, exiting.\n");
2471     return (1);
2472   }
2473
2474   journal_init();
2475
2476   /* start the queue thread */
2477   memset (&queue_thread, 0, sizeof (queue_thread));
2478   status = pthread_create (&queue_thread,
2479                            NULL, /* attr */
2480                            queue_thread_main,
2481                            NULL); /* args */
2482   if (status != 0)
2483   {
2484     RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2485     cleanup();
2486     return (1);
2487   }
2488
2489   listen_thread_main (NULL);
2490   cleanup ();
2491
2492   return (0);
2493 } /* int main */
2494
2495 /*
2496  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2497  */