36d418b1c0b63cedf787a596e6081d3913e367d0
[rrdtool.git] / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008 Florian octo Forster
4  * Copyright (C) 2008 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
23
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
32
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
36
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
41
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
47
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
51
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
55
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
61
62 /*
63  * Now for some includes..
64  */
65 #include "rrd.h" /* {{{ */
66 #include "rrd_client.h"
67
68 #include <stdlib.h>
69 #include <stdint.h>
70 #include <stdio.h>
71 #include <unistd.h>
72 #include <string.h>
73 #include <strings.h>
74 #include <stdint.h>
75 #include <inttypes.h>
76
77 #include <sys/types.h>
78 #include <sys/stat.h>
79 #include <fcntl.h>
80 #include <signal.h>
81 #include <sys/socket.h>
82 #include <sys/un.h>
83 #include <netdb.h>
84 #include <poll.h>
85 #include <syslog.h>
86 #include <pthread.h>
87 #include <errno.h>
88 #include <assert.h>
89 #include <sys/time.h>
90 #include <time.h>
91
92 #include <glib-2.0/glib.h>
93 /* }}} */
94
95 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
96
97 #ifndef __GNUC__
98 # define __attribute__(x) /**/
99 #endif
100
101 /*
102  * Types
103  */
104 typedef enum
105 {
106   PRIV_LOW,
107   PRIV_HIGH
108 } socket_privilege;
109
110 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
111
112 struct listen_socket_s
113 {
114   int fd;
115   char addr[PATH_MAX + 1];
116   int family;
117   socket_privilege privilege;
118
119   /* state for BATCH processing */
120   int batch_mode;
121   int batch_cmd;
122
123   /* buffered IO */
124   char *rbuf;
125   off_t next_cmd;
126   off_t next_read;
127
128   char *wbuf;
129   ssize_t wbuf_len;
130 };
131 typedef struct listen_socket_s listen_socket_t;
132
133 struct cache_item_s;
134 typedef struct cache_item_s cache_item_t;
135 struct cache_item_s
136 {
137   char *file;
138   char **values;
139   int values_num;
140   time_t last_flush_time;
141 #define CI_FLAGS_IN_TREE  (1<<0)
142 #define CI_FLAGS_IN_QUEUE (1<<1)
143   int flags;
144   pthread_cond_t  flushed;
145   cache_item_t *prev;
146   cache_item_t *next;
147 };
148
149 struct callback_flush_data_s
150 {
151   time_t now;
152   time_t abs_timeout;
153   char **keys;
154   size_t keys_num;
155 };
156 typedef struct callback_flush_data_s callback_flush_data_t;
157
158 enum queue_side_e
159 {
160   HEAD,
161   TAIL
162 };
163 typedef enum queue_side_e queue_side_t;
164
165 /* max length of socket command or response */
166 #define CMD_MAX 4096
167 #define RBUF_SIZE (CMD_MAX*2)
168
169 /*
170  * Variables
171  */
172 static int stay_foreground = 0;
173 static uid_t daemon_uid;
174
175 static listen_socket_t *listen_fds = NULL;
176 static size_t listen_fds_num = 0;
177
178 static int do_shutdown = 0;
179
180 static pthread_t queue_thread;
181
182 static pthread_t *connection_threads = NULL;
183 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
184 static int connection_threads_num = 0;
185
186 /* Cache stuff */
187 static GTree          *cache_tree = NULL;
188 static cache_item_t   *cache_queue_head = NULL;
189 static cache_item_t   *cache_queue_tail = NULL;
190 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
191 static pthread_cond_t  cache_cond = PTHREAD_COND_INITIALIZER;
192
193 static int config_write_interval = 300;
194 static int config_write_jitter   = 0;
195 static int config_flush_interval = 3600;
196 static int config_flush_at_shutdown = 0;
197 static char *config_pid_file = NULL;
198 static char *config_base_dir = NULL;
199 static size_t _config_base_dir_len = 0;
200 static int config_write_base_only = 0;
201
202 static listen_socket_t **config_listen_address_list = NULL;
203 static int config_listen_address_list_len = 0;
204
205 static uint64_t stats_queue_length = 0;
206 static uint64_t stats_updates_received = 0;
207 static uint64_t stats_flush_received = 0;
208 static uint64_t stats_updates_written = 0;
209 static uint64_t stats_data_sets_written = 0;
210 static uint64_t stats_journal_bytes = 0;
211 static uint64_t stats_journal_rotate = 0;
212 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
213
214 /* Journaled updates */
215 static char *journal_cur = NULL;
216 static char *journal_old = NULL;
217 static FILE *journal_fh = NULL;
218 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
219 static int journal_write(char *cmd, char *args);
220 static void journal_done(void);
221 static void journal_rotate(void);
222
223 /* 
224  * Functions
225  */
226 static void sig_common (const char *sig) /* {{{ */
227 {
228   RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
229   do_shutdown++;
230   pthread_cond_broadcast(&cache_cond);
231 } /* }}} void sig_common */
232
233 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
234 {
235   sig_common("INT");
236 } /* }}} void sig_int_handler */
237
238 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
239 {
240   sig_common("TERM");
241 } /* }}} void sig_term_handler */
242
243 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
244 {
245   config_flush_at_shutdown = 1;
246   sig_common("USR1");
247 } /* }}} void sig_usr1_handler */
248
249 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
250 {
251   config_flush_at_shutdown = 0;
252   sig_common("USR2");
253 } /* }}} void sig_usr2_handler */
254
255 static void install_signal_handlers(void) /* {{{ */
256 {
257   /* These structures are static, because `sigaction' behaves weird if the are
258    * overwritten.. */
259   static struct sigaction sa_int;
260   static struct sigaction sa_term;
261   static struct sigaction sa_pipe;
262   static struct sigaction sa_usr1;
263   static struct sigaction sa_usr2;
264
265   /* Install signal handlers */
266   memset (&sa_int, 0, sizeof (sa_int));
267   sa_int.sa_handler = sig_int_handler;
268   sigaction (SIGINT, &sa_int, NULL);
269
270   memset (&sa_term, 0, sizeof (sa_term));
271   sa_term.sa_handler = sig_term_handler;
272   sigaction (SIGTERM, &sa_term, NULL);
273
274   memset (&sa_pipe, 0, sizeof (sa_pipe));
275   sa_pipe.sa_handler = SIG_IGN;
276   sigaction (SIGPIPE, &sa_pipe, NULL);
277
278   memset (&sa_pipe, 0, sizeof (sa_usr1));
279   sa_usr1.sa_handler = sig_usr1_handler;
280   sigaction (SIGUSR1, &sa_usr1, NULL);
281
282   memset (&sa_usr2, 0, sizeof (sa_usr2));
283   sa_usr2.sa_handler = sig_usr2_handler;
284   sigaction (SIGUSR2, &sa_usr2, NULL);
285
286 } /* }}} void install_signal_handlers */
287
288 static int open_pidfile(void) /* {{{ */
289 {
290   int fd;
291   char *file;
292
293   file = (config_pid_file != NULL)
294     ? config_pid_file
295     : LOCALSTATEDIR "/run/rrdcached.pid";
296
297   fd = open(file, O_CREAT|O_EXCL|O_WRONLY, S_IRUSR|S_IRGRP|S_IROTH);
298   if (fd < 0)
299     fprintf(stderr, "FATAL: cannot create '%s' (%s)\n",
300             file, rrd_strerror(errno));
301
302   return(fd);
303 } /* }}} static int open_pidfile */
304
305 static int write_pidfile (int fd) /* {{{ */
306 {
307   pid_t pid;
308   FILE *fh;
309
310   pid = getpid ();
311
312   fh = fdopen (fd, "w");
313   if (fh == NULL)
314   {
315     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
316     close(fd);
317     return (-1);
318   }
319
320   fprintf (fh, "%i\n", (int) pid);
321   fclose (fh);
322
323   return (0);
324 } /* }}} int write_pidfile */
325
326 static int remove_pidfile (void) /* {{{ */
327 {
328   char *file;
329   int status;
330
331   file = (config_pid_file != NULL)
332     ? config_pid_file
333     : LOCALSTATEDIR "/run/rrdcached.pid";
334
335   status = unlink (file);
336   if (status == 0)
337     return (0);
338   return (errno);
339 } /* }}} int remove_pidfile */
340
341 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
342 {
343   char *eol;
344
345   eol = memchr(sock->rbuf + sock->next_cmd, '\n',
346                sock->next_read - sock->next_cmd);
347
348   if (eol == NULL)
349   {
350     /* no commands left, move remainder back to front of rbuf */
351     memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
352             sock->next_read - sock->next_cmd);
353     sock->next_read -= sock->next_cmd;
354     sock->next_cmd = 0;
355     *len = 0;
356     return NULL;
357   }
358   else
359   {
360     char *cmd = sock->rbuf + sock->next_cmd;
361     *eol = '\0';
362
363     sock->next_cmd = eol - sock->rbuf + 1;
364
365     if (eol > sock->rbuf && *(eol-1) == '\r')
366       *(--eol) = '\0'; /* handle "\r\n" EOL */
367
368     *len = eol - cmd;
369
370     return cmd;
371   }
372
373   /* NOTREACHED */
374   assert(1==0);
375 }
376
377 /* add the characters directly to the write buffer */
378 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
379 {
380   char *new_buf;
381
382   assert(sock != NULL);
383
384   new_buf = realloc(sock->wbuf, sock->wbuf_len + len + 1);
385   if (new_buf == NULL)
386   {
387     RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
388     return -1;
389   }
390
391   strncpy(new_buf + sock->wbuf_len, str, len + 1);
392
393   sock->wbuf = new_buf;
394   sock->wbuf_len += len;
395
396   return 0;
397 } /* }}} static int add_to_wbuf */
398
399 /* add the text to the "extra" info that's sent after the status line */
400 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
401 {
402   va_list argp;
403   char buffer[CMD_MAX];
404   int len;
405
406   if (sock == NULL) return 0; /* journal replay mode */
407   if (sock->batch_mode) return 0; /* no extra info returned when in BATCH */
408
409   va_start(argp, fmt);
410 #ifdef HAVE_VSNPRINTF
411   len = vsnprintf(buffer, sizeof(buffer)-1, fmt, argp);
412 #else
413   len = vsprintf(buffer, fmt, argp);
414 #endif
415   va_end(argp);
416   if (len < 0)
417   {
418     RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
419     return -1;
420   }
421
422   return add_to_wbuf(sock, buffer, len);
423 } /* }}} static int add_response_info */
424
425 static int count_lines(char *str) /* {{{ */
426 {
427   int lines = 0;
428
429   if (str != NULL)
430   {
431     while ((str = strchr(str, '\n')) != NULL)
432     {
433       ++lines;
434       ++str;
435     }
436   }
437
438   return lines;
439 } /* }}} static int count_lines */
440
441 /* send the response back to the user.
442  * returns 0 on success, -1 on error
443  * write buffer is always zeroed after this call */
444 static int send_response (listen_socket_t *sock, response_code rc,
445                           char *fmt, ...) /* {{{ */
446 {
447   va_list argp;
448   char buffer[CMD_MAX];
449   int lines;
450   ssize_t wrote;
451   int rclen, len;
452
453   if (sock == NULL) return rc;  /* journal replay mode */
454
455   if (sock->batch_mode)
456   {
457     if (rc == RESP_OK)
458       return rc; /* no response on success during BATCH */
459     lines = sock->batch_cmd;
460   }
461   else if (rc == RESP_OK)
462     lines = count_lines(sock->wbuf);
463   else
464     lines = -1;
465
466   rclen = sprintf(buffer, "%d ", lines);
467   va_start(argp, fmt);
468 #ifdef HAVE_VSNPRINTF
469   len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen-1, fmt, argp);
470 #else
471   len = vsprintf(buffer+rclen, fmt, argp);
472 #endif
473   va_end(argp);
474   if (len < 0)
475     return -1;
476
477   len += rclen;
478
479   /* append the result to the wbuf, don't write to the user */
480   if (sock->batch_mode)
481     return add_to_wbuf(sock, buffer, len);
482
483   /* first write must be complete */
484   if (len != write(sock->fd, buffer, len))
485   {
486     RRDD_LOG(LOG_INFO, "send_response: could not write status message");
487     return -1;
488   }
489
490   if (sock->wbuf != NULL)
491   {
492     wrote = 0;
493     while (wrote < sock->wbuf_len)
494     {
495       ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
496       if (wb <= 0)
497       {
498         RRDD_LOG(LOG_INFO, "send_response: could not write results");
499         return -1;
500       }
501       wrote += wb;
502     }
503   }
504
505   free(sock->wbuf); sock->wbuf = NULL;
506   sock->wbuf_len = 0;
507
508   return 0;
509 } /* }}} */
510
511 static void wipe_ci_values(cache_item_t *ci, time_t when)
512 {
513   ci->values = NULL;
514   ci->values_num = 0;
515
516   ci->last_flush_time = when;
517   if (config_write_jitter > 0)
518     ci->last_flush_time += (random() % config_write_jitter);
519 }
520
521 /* remove_from_queue
522  * remove a "cache_item_t" item from the queue.
523  * must hold 'cache_lock' when calling this
524  */
525 static void remove_from_queue(cache_item_t *ci) /* {{{ */
526 {
527   if (ci == NULL) return;
528
529   if (ci->prev == NULL)
530     cache_queue_head = ci->next; /* reset head */
531   else
532     ci->prev->next = ci->next;
533
534   if (ci->next == NULL)
535     cache_queue_tail = ci->prev; /* reset the tail */
536   else
537     ci->next->prev = ci->prev;
538
539   ci->next = ci->prev = NULL;
540   ci->flags &= ~CI_FLAGS_IN_QUEUE;
541 } /* }}} static void remove_from_queue */
542
543 /* remove an entry from the tree and free all its resources.
544  * must hold 'cache lock' while calling this.
545  * returns 0 on success, otherwise errno */
546 static int forget_file(const char *file)
547 {
548   cache_item_t *ci;
549
550   ci = g_tree_lookup(cache_tree, file);
551   if (ci == NULL)
552     return ENOENT;
553
554   g_tree_remove (cache_tree, file);
555   remove_from_queue(ci);
556
557   for (int i=0; i < ci->values_num; i++)
558     free(ci->values[i]);
559
560   free (ci->values);
561   free (ci->file);
562
563   /* in case anyone is waiting */
564   pthread_cond_broadcast(&ci->flushed);
565
566   free (ci);
567
568   return 0;
569 } /* }}} static int forget_file */
570
571 /*
572  * enqueue_cache_item:
573  * `cache_lock' must be acquired before calling this function!
574  */
575 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
576     queue_side_t side)
577 {
578   if (ci == NULL)
579     return (-1);
580
581   if (ci->values_num == 0)
582     return (0);
583
584   if (side == HEAD)
585   {
586     if (cache_queue_head == ci)
587       return 0;
588
589     /* remove from the double linked list */
590     if (ci->flags & CI_FLAGS_IN_QUEUE)
591       remove_from_queue(ci);
592
593     ci->prev = NULL;
594     ci->next = cache_queue_head;
595     if (ci->next != NULL)
596       ci->next->prev = ci;
597     cache_queue_head = ci;
598
599     if (cache_queue_tail == NULL)
600       cache_queue_tail = cache_queue_head;
601   }
602   else /* (side == TAIL) */
603   {
604     /* We don't move values back in the list.. */
605     if (ci->flags & CI_FLAGS_IN_QUEUE)
606       return (0);
607
608     assert (ci->next == NULL);
609     assert (ci->prev == NULL);
610
611     ci->prev = cache_queue_tail;
612
613     if (cache_queue_tail == NULL)
614       cache_queue_head = ci;
615     else
616       cache_queue_tail->next = ci;
617
618     cache_queue_tail = ci;
619   }
620
621   ci->flags |= CI_FLAGS_IN_QUEUE;
622
623   pthread_cond_broadcast(&cache_cond);
624   pthread_mutex_lock (&stats_lock);
625   stats_queue_length++;
626   pthread_mutex_unlock (&stats_lock);
627
628   return (0);
629 } /* }}} int enqueue_cache_item */
630
631 /*
632  * tree_callback_flush:
633  * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
634  * while this is in progress.
635  */
636 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
637     gpointer data)
638 {
639   cache_item_t *ci;
640   callback_flush_data_t *cfd;
641
642   ci = (cache_item_t *) value;
643   cfd = (callback_flush_data_t *) data;
644
645   if ((ci->last_flush_time <= cfd->abs_timeout)
646       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
647       && (ci->values_num > 0))
648   {
649     enqueue_cache_item (ci, TAIL);
650   }
651   else if ((do_shutdown != 0)
652       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
653       && (ci->values_num > 0))
654   {
655     enqueue_cache_item (ci, TAIL);
656   }
657   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
658       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
659       && (ci->values_num <= 0))
660   {
661     char **temp;
662
663     temp = (char **) realloc (cfd->keys,
664         sizeof (char *) * (cfd->keys_num + 1));
665     if (temp == NULL)
666     {
667       RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed.");
668       return (FALSE);
669     }
670     cfd->keys = temp;
671     /* Make really sure this points to the _same_ place */
672     assert ((char *) key == ci->file);
673     cfd->keys[cfd->keys_num] = (char *) key;
674     cfd->keys_num++;
675   }
676
677   return (FALSE);
678 } /* }}} gboolean tree_callback_flush */
679
680 static int flush_old_values (int max_age)
681 {
682   callback_flush_data_t cfd;
683   size_t k;
684
685   memset (&cfd, 0, sizeof (cfd));
686   /* Pass the current time as user data so that we don't need to call
687    * `time' for each node. */
688   cfd.now = time (NULL);
689   cfd.keys = NULL;
690   cfd.keys_num = 0;
691
692   if (max_age > 0)
693     cfd.abs_timeout = cfd.now - max_age;
694   else
695     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
696
697   /* `tree_callback_flush' will return the keys of all values that haven't
698    * been touched in the last `config_flush_interval' seconds in `cfd'.
699    * The char*'s in this array point to the same memory as ci->file, so we
700    * don't need to free them separately. */
701   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
702
703   for (k = 0; k < cfd.keys_num; k++)
704   {
705     /* should never fail, since we have held the cache_lock
706      * the entire time */
707     assert( forget_file(cfd.keys[k]) == 0 );
708   }
709
710   if (cfd.keys != NULL)
711   {
712     free (cfd.keys);
713     cfd.keys = NULL;
714   }
715
716   return (0);
717 } /* int flush_old_values */
718
719 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
720 {
721   struct timeval now;
722   struct timespec next_flush;
723   int final_flush = 0; /* make sure we only flush once on shutdown */
724
725   gettimeofday (&now, NULL);
726   next_flush.tv_sec = now.tv_sec + config_flush_interval;
727   next_flush.tv_nsec = 1000 * now.tv_usec;
728
729   pthread_mutex_lock (&cache_lock);
730   while ((do_shutdown == 0) || (cache_queue_head != NULL))
731   {
732     cache_item_t *ci;
733     char *file;
734     char **values;
735     int values_num;
736     int status;
737     int i;
738
739     /* First, check if it's time to do the cache flush. */
740     gettimeofday (&now, NULL);
741     if ((now.tv_sec > next_flush.tv_sec)
742         || ((now.tv_sec == next_flush.tv_sec)
743           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
744     {
745       /* Flush all values that haven't been written in the last
746        * `config_write_interval' seconds. */
747       flush_old_values (config_write_interval);
748
749       /* Determine the time of the next cache flush. */
750       while (next_flush.tv_sec <= now.tv_sec)
751         next_flush.tv_sec += config_flush_interval;
752
753       /* unlock the cache while we rotate so we don't block incoming
754        * updates if the fsync() blocks on disk I/O */
755       pthread_mutex_unlock(&cache_lock);
756       journal_rotate();
757       pthread_mutex_lock(&cache_lock);
758     }
759
760     /* Now, check if there's something to store away. If not, wait until
761      * something comes in or it's time to do the cache flush.  if we are
762      * shutting down, do not wait around.  */
763     if (cache_queue_head == NULL && !do_shutdown)
764     {
765       status = pthread_cond_timedwait (&cache_cond, &cache_lock, &next_flush);
766       if ((status != 0) && (status != ETIMEDOUT))
767       {
768         RRDD_LOG (LOG_ERR, "queue_thread_main: "
769             "pthread_cond_timedwait returned %i.", status);
770       }
771     }
772
773     /* We're about to shut down */
774     if (do_shutdown != 0 && !final_flush++)
775     {
776       if (config_flush_at_shutdown)
777         flush_old_values (-1); /* flush everything */
778       else
779         break;
780     }
781
782     /* Check if a value has arrived. This may be NULL if we timed out or there
783      * was an interrupt such as a signal. */
784     if (cache_queue_head == NULL)
785       continue;
786
787     ci = cache_queue_head;
788
789     /* copy the relevant parts */
790     file = strdup (ci->file);
791     if (file == NULL)
792     {
793       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
794       continue;
795     }
796
797     assert(ci->values != NULL);
798     assert(ci->values_num > 0);
799
800     values = ci->values;
801     values_num = ci->values_num;
802
803     wipe_ci_values(ci, time(NULL));
804     remove_from_queue(ci);
805
806     pthread_mutex_lock (&stats_lock);
807     assert (stats_queue_length > 0);
808     stats_queue_length--;
809     pthread_mutex_unlock (&stats_lock);
810
811     pthread_mutex_unlock (&cache_lock);
812
813     rrd_clear_error ();
814     status = rrd_update_r (file, NULL, values_num, (void *) values);
815     if (status != 0)
816     {
817       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
818           "rrd_update_r (%s) failed with status %i. (%s)",
819           file, status, rrd_get_error());
820     }
821
822     journal_write("wrote", file);
823     pthread_cond_broadcast(&ci->flushed);
824
825     for (i = 0; i < values_num; i++)
826       free (values[i]);
827
828     free(values);
829     free(file);
830
831     if (status == 0)
832     {
833       pthread_mutex_lock (&stats_lock);
834       stats_updates_written++;
835       stats_data_sets_written += values_num;
836       pthread_mutex_unlock (&stats_lock);
837     }
838
839     pthread_mutex_lock (&cache_lock);
840
841     /* We're about to shut down */
842     if (do_shutdown != 0 && !final_flush++)
843     {
844       if (config_flush_at_shutdown)
845           flush_old_values (-1); /* flush everything */
846       else
847         break;
848     }
849   } /* while ((do_shutdown == 0) || (cache_queue_head != NULL)) */
850   pthread_mutex_unlock (&cache_lock);
851
852   if (config_flush_at_shutdown)
853   {
854     assert(cache_queue_head == NULL);
855     RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
856   }
857
858   journal_done();
859
860   return (NULL);
861 } /* }}} void *queue_thread_main */
862
863 static int buffer_get_field (char **buffer_ret, /* {{{ */
864     size_t *buffer_size_ret, char **field_ret)
865 {
866   char *buffer;
867   size_t buffer_pos;
868   size_t buffer_size;
869   char *field;
870   size_t field_size;
871   int status;
872
873   buffer = *buffer_ret;
874   buffer_pos = 0;
875   buffer_size = *buffer_size_ret;
876   field = *buffer_ret;
877   field_size = 0;
878
879   if (buffer_size <= 0)
880     return (-1);
881
882   /* This is ensured by `handle_request'. */
883   assert (buffer[buffer_size - 1] == '\0');
884
885   status = -1;
886   while (buffer_pos < buffer_size)
887   {
888     /* Check for end-of-field or end-of-buffer */
889     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
890     {
891       field[field_size] = 0;
892       field_size++;
893       buffer_pos++;
894       status = 0;
895       break;
896     }
897     /* Handle escaped characters. */
898     else if (buffer[buffer_pos] == '\\')
899     {
900       if (buffer_pos >= (buffer_size - 1))
901         break;
902       buffer_pos++;
903       field[field_size] = buffer[buffer_pos];
904       field_size++;
905       buffer_pos++;
906     }
907     /* Normal operation */ 
908     else
909     {
910       field[field_size] = buffer[buffer_pos];
911       field_size++;
912       buffer_pos++;
913     }
914   } /* while (buffer_pos < buffer_size) */
915
916   if (status != 0)
917     return (status);
918
919   *buffer_ret = buffer + buffer_pos;
920   *buffer_size_ret = buffer_size - buffer_pos;
921   *field_ret = field;
922
923   return (0);
924 } /* }}} int buffer_get_field */
925
926 /* if we're restricting writes to the base directory,
927  * check whether the file falls within the dir
928  * returns 1 if OK, otherwise 0
929  */
930 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
931 {
932   assert(file != NULL);
933
934   if (!config_write_base_only
935       || sock == NULL /* journal replay */
936       || config_base_dir == NULL)
937     return 1;
938
939   if (strstr(file, "../") != NULL) goto err;
940
941   /* relative paths without "../" are ok */
942   if (*file != '/') return 1;
943
944   /* file must be of the format base + "/" + <1+ char filename> */
945   if (strlen(file) < _config_base_dir_len + 2) goto err;
946   if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
947   if (*(file + _config_base_dir_len) != '/') goto err;
948
949   return 1;
950
951 err:
952   if (sock != NULL && sock->fd >= 0)
953     send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
954
955   return 0;
956 } /* }}} static int check_file_access */
957
958 /* returns 1 if we have the required privilege level,
959  * otherwise issue an error to the user on sock */
960 static int has_privilege (listen_socket_t *sock, /* {{{ */
961                           socket_privilege priv)
962 {
963   if (sock == NULL) /* journal replay */
964     return 1;
965
966   if (sock->privilege >= priv)
967     return 1;
968
969   return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
970 } /* }}} static int has_privilege */
971
972 static int flush_file (const char *filename) /* {{{ */
973 {
974   cache_item_t *ci;
975
976   pthread_mutex_lock (&cache_lock);
977
978   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
979   if (ci == NULL)
980   {
981     pthread_mutex_unlock (&cache_lock);
982     return (ENOENT);
983   }
984
985   if (ci->values_num > 0)
986   {
987     /* Enqueue at head */
988     enqueue_cache_item (ci, HEAD);
989     pthread_cond_wait(&ci->flushed, &cache_lock);
990   }
991
992   /* DO NOT DO ANYTHING WITH ci HERE!!  The entry
993    * may have been purged during our cond_wait() */
994
995   pthread_mutex_unlock(&cache_lock);
996
997   return (0);
998 } /* }}} int flush_file */
999
1000 static int handle_request_help (listen_socket_t *sock, /* {{{ */
1001     char *buffer, size_t buffer_size)
1002 {
1003   int status;
1004   char **help_text;
1005   char *command;
1006
1007   char *help_help[2] =
1008   {
1009     "Command overview\n"
1010     ,
1011     "HELP [<command>]\n"
1012     "FLUSH <filename>\n"
1013     "FLUSHALL\n"
1014     "PENDING <filename>\n"
1015     "FORGET <filename>\n"
1016     "UPDATE <filename> <values> [<values> ...]\n"
1017     "BATCH\n"
1018     "STATS\n"
1019   };
1020
1021   char *help_flush[2] =
1022   {
1023     "Help for FLUSH\n"
1024     ,
1025     "Usage: FLUSH <filename>\n"
1026     "\n"
1027     "Adds the given filename to the head of the update queue and returns\n"
1028     "after is has been dequeued.\n"
1029   };
1030
1031   char *help_flushall[2] =
1032   {
1033     "Help for FLUSHALL\n"
1034     ,
1035     "Usage: FLUSHALL\n"
1036     "\n"
1037     "Triggers writing of all pending updates.  Returns immediately.\n"
1038   };
1039
1040   char *help_pending[2] =
1041   {
1042     "Help for PENDING\n"
1043     ,
1044     "Usage: PENDING <filename>\n"
1045     "\n"
1046     "Shows any 'pending' updates for a file, in order.\n"
1047     "The updates shown have not yet been written to the underlying RRD file.\n"
1048   };
1049
1050   char *help_forget[2] =
1051   {
1052     "Help for FORGET\n"
1053     ,
1054     "Usage: FORGET <filename>\n"
1055     "\n"
1056     "Removes the file completely from the cache.\n"
1057     "Any pending updates for the file will be lost.\n"
1058   };
1059
1060   char *help_update[2] =
1061   {
1062     "Help for UPDATE\n"
1063     ,
1064     "Usage: UPDATE <filename> <values> [<values> ...]\n"
1065     "\n"
1066     "Adds the given file to the internal cache if it is not yet known and\n"
1067     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1068     "for details.\n"
1069     "\n"
1070     "Each <values> has the following form:\n"
1071     "  <values> = <time>:<value>[:<value>[...]]\n"
1072     "See the rrdupdate(1) manpage for details.\n"
1073   };
1074
1075   char *help_stats[2] =
1076   {
1077     "Help for STATS\n"
1078     ,
1079     "Usage: STATS\n"
1080     "\n"
1081     "Returns some performance counters, see the rrdcached(1) manpage for\n"
1082     "a description of the values.\n"
1083   };
1084
1085   char *help_batch[2] =
1086   {
1087     "Help for BATCH\n"
1088     ,
1089     "The 'BATCH' command permits the client to initiate a bulk load\n"
1090     "   of commands to rrdcached.\n"
1091     "\n"
1092     "Usage:\n"
1093     "\n"
1094     "    client: BATCH\n"
1095     "    server: 0 Go ahead.  End with dot '.' on its own line.\n"
1096     "    client: command #1\n"
1097     "    client: command #2\n"
1098     "    client: ... and so on\n"
1099     "    client: .\n"
1100     "    server: 2 errors\n"
1101     "    server: 7 message for command #7\n"
1102     "    server: 9 message for command #9\n"
1103     "\n"
1104     "For more information, consult the rrdcached(1) documentation.\n"
1105   };
1106
1107   status = buffer_get_field (&buffer, &buffer_size, &command);
1108   if (status != 0)
1109     help_text = help_help;
1110   else
1111   {
1112     if (strcasecmp (command, "update") == 0)
1113       help_text = help_update;
1114     else if (strcasecmp (command, "flush") == 0)
1115       help_text = help_flush;
1116     else if (strcasecmp (command, "flushall") == 0)
1117       help_text = help_flushall;
1118     else if (strcasecmp (command, "pending") == 0)
1119       help_text = help_pending;
1120     else if (strcasecmp (command, "forget") == 0)
1121       help_text = help_forget;
1122     else if (strcasecmp (command, "stats") == 0)
1123       help_text = help_stats;
1124     else if (strcasecmp (command, "batch") == 0)
1125       help_text = help_batch;
1126     else
1127       help_text = help_help;
1128   }
1129
1130   add_response_info(sock, help_text[1]);
1131   return send_response(sock, RESP_OK, help_text[0]);
1132 } /* }}} int handle_request_help */
1133
1134 static int handle_request_stats (listen_socket_t *sock) /* {{{ */
1135 {
1136   uint64_t copy_queue_length;
1137   uint64_t copy_updates_received;
1138   uint64_t copy_flush_received;
1139   uint64_t copy_updates_written;
1140   uint64_t copy_data_sets_written;
1141   uint64_t copy_journal_bytes;
1142   uint64_t copy_journal_rotate;
1143
1144   uint64_t tree_nodes_number;
1145   uint64_t tree_depth;
1146
1147   pthread_mutex_lock (&stats_lock);
1148   copy_queue_length       = stats_queue_length;
1149   copy_updates_received   = stats_updates_received;
1150   copy_flush_received     = stats_flush_received;
1151   copy_updates_written    = stats_updates_written;
1152   copy_data_sets_written  = stats_data_sets_written;
1153   copy_journal_bytes      = stats_journal_bytes;
1154   copy_journal_rotate     = stats_journal_rotate;
1155   pthread_mutex_unlock (&stats_lock);
1156
1157   pthread_mutex_lock (&cache_lock);
1158   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1159   tree_depth        = (uint64_t) g_tree_height (cache_tree);
1160   pthread_mutex_unlock (&cache_lock);
1161
1162   add_response_info(sock,
1163                     "QueueLength: %"PRIu64"\n", copy_queue_length);
1164   add_response_info(sock,
1165                     "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1166   add_response_info(sock,
1167                     "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1168   add_response_info(sock,
1169                     "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1170   add_response_info(sock,
1171                     "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1172   add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1173   add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1174   add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1175   add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1176
1177   send_response(sock, RESP_OK, "Statistics follow\n");
1178
1179   return (0);
1180 } /* }}} int handle_request_stats */
1181
1182 static int handle_request_flush (listen_socket_t *sock, /* {{{ */
1183     char *buffer, size_t buffer_size)
1184 {
1185   char *file;
1186   int status;
1187
1188   status = buffer_get_field (&buffer, &buffer_size, &file);
1189   if (status != 0)
1190   {
1191     return send_response(sock, RESP_ERR, "Usage: flush <filename>\n");
1192   }
1193   else
1194   {
1195     pthread_mutex_lock(&stats_lock);
1196     stats_flush_received++;
1197     pthread_mutex_unlock(&stats_lock);
1198
1199     if (!check_file_access(file, sock)) return 0;
1200
1201     status = flush_file (file);
1202     if (status == 0)
1203       return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1204     else if (status == ENOENT)
1205     {
1206       /* no file in our tree; see whether it exists at all */
1207       struct stat statbuf;
1208
1209       memset(&statbuf, 0, sizeof(statbuf));
1210       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1211         return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1212       else
1213         return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1214     }
1215     else if (status < 0)
1216       return send_response(sock, RESP_ERR, "Internal error.\n");
1217     else
1218       return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1219   }
1220
1221   /* NOTREACHED */
1222   assert(1==0);
1223 } /* }}} int handle_request_slurp */
1224
1225 static int handle_request_flushall(listen_socket_t *sock) /* {{{ */
1226 {
1227   int status;
1228
1229   status = has_privilege(sock, PRIV_HIGH);
1230   if (status <= 0)
1231     return status;
1232
1233   RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1234
1235   pthread_mutex_lock(&cache_lock);
1236   flush_old_values(-1);
1237   pthread_mutex_unlock(&cache_lock);
1238
1239   return send_response(sock, RESP_OK, "Started flush.\n");
1240 } /* }}} static int handle_request_flushall */
1241
1242 static int handle_request_pending(listen_socket_t *sock, /* {{{ */
1243                                   char *buffer, size_t buffer_size)
1244 {
1245   int status;
1246   char *file;
1247   cache_item_t *ci;
1248
1249   status = buffer_get_field(&buffer, &buffer_size, &file);
1250   if (status != 0)
1251     return send_response(sock, RESP_ERR,
1252                          "Usage: PENDING <filename>\n");
1253
1254   status = has_privilege(sock, PRIV_HIGH);
1255   if (status <= 0)
1256     return status;
1257
1258   pthread_mutex_lock(&cache_lock);
1259   ci = g_tree_lookup(cache_tree, file);
1260   if (ci == NULL)
1261   {
1262     pthread_mutex_unlock(&cache_lock);
1263     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1264   }
1265
1266   for (int i=0; i < ci->values_num; i++)
1267     add_response_info(sock, "%s\n", ci->values[i]);
1268
1269   pthread_mutex_unlock(&cache_lock);
1270   return send_response(sock, RESP_OK, "updates pending\n");
1271 } /* }}} static int handle_request_pending */
1272
1273 static int handle_request_forget(listen_socket_t *sock, /* {{{ */
1274                                  char *buffer, size_t buffer_size)
1275 {
1276   int status;
1277   char *file;
1278
1279   status = buffer_get_field(&buffer, &buffer_size, &file);
1280   if (status != 0)
1281     return send_response(sock, RESP_ERR,
1282                          "Usage: FORGET <filename>\n");
1283
1284   status = has_privilege(sock, PRIV_HIGH);
1285   if (status <= 0)
1286     return status;
1287
1288   if (!check_file_access(file, sock)) return 0;
1289
1290   pthread_mutex_lock(&cache_lock);
1291   status = forget_file(file);
1292   pthread_mutex_unlock(&cache_lock);
1293
1294   if (status == 0)
1295   {
1296     if (sock != NULL)
1297       journal_write("forget", file);
1298
1299     return send_response(sock, RESP_OK, "Gone!\n");
1300   }
1301   else
1302     return send_response(sock, RESP_ERR, "cannot forget: %s\n",
1303                          status < 0 ? "Internal error" : rrd_strerror(status));
1304
1305   /* NOTREACHED */
1306   assert(1==0);
1307 } /* }}} static int handle_request_forget */
1308
1309 static int handle_request_update (listen_socket_t *sock, /* {{{ */
1310     char *buffer, size_t buffer_size)
1311 {
1312   char *file;
1313   int values_num = 0;
1314   int status;
1315   char orig_buf[CMD_MAX];
1316
1317   time_t now;
1318   cache_item_t *ci;
1319
1320   now = time (NULL);
1321
1322   status = has_privilege(sock, PRIV_HIGH);
1323   if (status <= 0)
1324     return status;
1325
1326   /* save it for the journal later */
1327   strncpy(orig_buf, buffer, sizeof(orig_buf)-1);
1328
1329   status = buffer_get_field (&buffer, &buffer_size, &file);
1330   if (status != 0)
1331     return send_response(sock, RESP_ERR,
1332                          "Usage: UPDATE <filename> <values> [<values> ...]\n");
1333
1334   pthread_mutex_lock(&stats_lock);
1335   stats_updates_received++;
1336   pthread_mutex_unlock(&stats_lock);
1337
1338   if (!check_file_access(file, sock)) return 0;
1339
1340   pthread_mutex_lock (&cache_lock);
1341   ci = g_tree_lookup (cache_tree, file);
1342
1343   if (ci == NULL) /* {{{ */
1344   {
1345     struct stat statbuf;
1346
1347     /* don't hold the lock while we setup; stat(2) might block */
1348     pthread_mutex_unlock(&cache_lock);
1349
1350     memset (&statbuf, 0, sizeof (statbuf));
1351     status = stat (file, &statbuf);
1352     if (status != 0)
1353     {
1354       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1355
1356       status = errno;
1357       if (status == ENOENT)
1358         return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1359       else
1360         return send_response(sock, RESP_ERR,
1361                              "stat failed with error %i.\n", status);
1362     }
1363     if (!S_ISREG (statbuf.st_mode))
1364       return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1365
1366     if (access(file, R_OK|W_OK) != 0)
1367       return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1368                            file, rrd_strerror(errno));
1369
1370     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1371     if (ci == NULL)
1372     {
1373       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1374
1375       return send_response(sock, RESP_ERR, "malloc failed.\n");
1376     }
1377     memset (ci, 0, sizeof (cache_item_t));
1378
1379     ci->file = strdup (file);
1380     if (ci->file == NULL)
1381     {
1382       free (ci);
1383       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1384
1385       return send_response(sock, RESP_ERR, "strdup failed.\n");
1386     }
1387
1388     wipe_ci_values(ci, now);
1389     ci->flags = CI_FLAGS_IN_TREE;
1390
1391     pthread_mutex_lock(&cache_lock);
1392     g_tree_insert (cache_tree, (void *) ci->file, (void *) ci);
1393   } /* }}} */
1394   assert (ci != NULL);
1395
1396   /* don't re-write updates in replay mode */
1397   if (sock != NULL)
1398     journal_write("update", orig_buf);
1399
1400   while (buffer_size > 0)
1401   {
1402     char **temp;
1403     char *value;
1404
1405     status = buffer_get_field (&buffer, &buffer_size, &value);
1406     if (status != 0)
1407     {
1408       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1409       break;
1410     }
1411
1412     temp = (char **) realloc (ci->values,
1413         sizeof (char *) * (ci->values_num + 1));
1414     if (temp == NULL)
1415     {
1416       RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
1417       continue;
1418     }
1419     ci->values = temp;
1420
1421     ci->values[ci->values_num] = strdup (value);
1422     if (ci->values[ci->values_num] == NULL)
1423     {
1424       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1425       continue;
1426     }
1427     ci->values_num++;
1428
1429     values_num++;
1430   }
1431
1432   if (((now - ci->last_flush_time) >= config_write_interval)
1433       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1434       && (ci->values_num > 0))
1435   {
1436     enqueue_cache_item (ci, TAIL);
1437   }
1438
1439   pthread_mutex_unlock (&cache_lock);
1440
1441   if (values_num < 1)
1442     return send_response(sock, RESP_ERR, "No values updated.\n");
1443   else
1444     return send_response(sock, RESP_OK, "Enqueued %i value(s).\n", values_num);
1445
1446   /* NOTREACHED */
1447   assert(1==0);
1448
1449 } /* }}} int handle_request_update */
1450
1451 /* we came across a "WROTE" entry during journal replay.
1452  * throw away any values that we have accumulated for this file
1453  */
1454 static int handle_request_wrote (const char *buffer) /* {{{ */
1455 {
1456   int i;
1457   cache_item_t *ci;
1458   const char *file = buffer;
1459
1460   pthread_mutex_lock(&cache_lock);
1461
1462   ci = g_tree_lookup(cache_tree, file);
1463   if (ci == NULL)
1464   {
1465     pthread_mutex_unlock(&cache_lock);
1466     return (0);
1467   }
1468
1469   if (ci->values)
1470   {
1471     for (i=0; i < ci->values_num; i++)
1472       free(ci->values[i]);
1473
1474     free(ci->values);
1475   }
1476
1477   wipe_ci_values(ci, time(NULL));
1478   remove_from_queue(ci);
1479
1480   pthread_mutex_unlock(&cache_lock);
1481   return (0);
1482 } /* }}} int handle_request_wrote */
1483
1484 /* start "BATCH" processing */
1485 static int batch_start (listen_socket_t *sock) /* {{{ */
1486 {
1487   int status;
1488   if (sock->batch_mode)
1489     return send_response(sock, RESP_ERR, "Already in BATCH\n");
1490
1491   status = send_response(sock, RESP_OK,
1492                          "Go ahead.  End with dot '.' on its own line.\n");
1493   sock->batch_mode = 1;
1494   sock->batch_cmd = 0;
1495
1496   return status;
1497 } /* }}} static int batch_start */
1498
1499 /* finish "BATCH" processing and return results to the client */
1500 static int batch_done (listen_socket_t *sock) /* {{{ */
1501 {
1502   assert(sock->batch_mode);
1503   sock->batch_mode = 0;
1504   sock->batch_cmd  = 0;
1505   return send_response(sock, RESP_OK, "errors\n");
1506 } /* }}} static int batch_done */
1507
1508 /* if sock==NULL, we are in journal replay mode */
1509 static int handle_request (listen_socket_t *sock, /* {{{ */
1510                            char *buffer, size_t buffer_size)
1511 {
1512   char *buffer_ptr;
1513   char *command;
1514   int status;
1515
1516   assert (buffer[buffer_size - 1] == '\0');
1517
1518   buffer_ptr = buffer;
1519   command = NULL;
1520   status = buffer_get_field (&buffer_ptr, &buffer_size, &command);
1521   if (status != 0)
1522   {
1523     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1524     return (-1);
1525   }
1526
1527   if (sock != NULL && sock->batch_mode)
1528     sock->batch_cmd++;
1529
1530   if (strcasecmp (command, "update") == 0)
1531     return (handle_request_update (sock, buffer_ptr, buffer_size));
1532   else if (strcasecmp (command, "wrote") == 0 && sock == NULL)
1533   {
1534     /* this is only valid in replay mode */
1535     return (handle_request_wrote (buffer_ptr));
1536   }
1537   else if (strcasecmp (command, "flush") == 0)
1538     return (handle_request_flush (sock, buffer_ptr, buffer_size));
1539   else if (strcasecmp (command, "flushall") == 0)
1540     return (handle_request_flushall(sock));
1541   else if (strcasecmp (command, "pending") == 0)
1542     return (handle_request_pending(sock, buffer_ptr, buffer_size));
1543   else if (strcasecmp (command, "forget") == 0)
1544     return (handle_request_forget(sock, buffer_ptr, buffer_size));
1545   else if (strcasecmp (command, "stats") == 0)
1546     return (handle_request_stats (sock));
1547   else if (strcasecmp (command, "help") == 0)
1548     return (handle_request_help (sock, buffer_ptr, buffer_size));
1549   else if (strcasecmp (command, "batch") == 0 && sock != NULL)
1550     return batch_start(sock);
1551   else if (strcasecmp (command, ".") == 0 && sock != NULL && sock->batch_mode)
1552     return batch_done(sock);
1553   else
1554     return send_response(sock, RESP_ERR, "Unknown command: %s\n", command);
1555
1556   /* NOTREACHED */
1557   assert(1==0);
1558 } /* }}} int handle_request */
1559
1560 /* MUST NOT hold journal_lock before calling this */
1561 static void journal_rotate(void) /* {{{ */
1562 {
1563   FILE *old_fh = NULL;
1564   int new_fd;
1565
1566   if (journal_cur == NULL || journal_old == NULL)
1567     return;
1568
1569   pthread_mutex_lock(&journal_lock);
1570
1571   /* we rotate this way (rename before close) so that the we can release
1572    * the journal lock as fast as possible.  Journal writes to the new
1573    * journal can proceed immediately after the new file is opened.  The
1574    * fclose can then block without affecting new updates.
1575    */
1576   if (journal_fh != NULL)
1577   {
1578     old_fh = journal_fh;
1579     journal_fh = NULL;
1580     rename(journal_cur, journal_old);
1581     ++stats_journal_rotate;
1582   }
1583
1584   new_fd = open(journal_cur, O_WRONLY|O_CREAT|O_APPEND,
1585                 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1586   if (new_fd >= 0)
1587   {
1588     journal_fh = fdopen(new_fd, "a");
1589     if (journal_fh == NULL)
1590       close(new_fd);
1591   }
1592
1593   pthread_mutex_unlock(&journal_lock);
1594
1595   if (old_fh != NULL)
1596     fclose(old_fh);
1597
1598   if (journal_fh == NULL)
1599   {
1600     RRDD_LOG(LOG_CRIT,
1601              "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1602              journal_cur, rrd_strerror(errno));
1603
1604     RRDD_LOG(LOG_ERR,
1605              "JOURNALING DISABLED: All values will be flushed at shutdown");
1606     config_flush_at_shutdown = 1;
1607   }
1608
1609 } /* }}} static void journal_rotate */
1610
1611 static void journal_done(void) /* {{{ */
1612 {
1613   if (journal_cur == NULL)
1614     return;
1615
1616   pthread_mutex_lock(&journal_lock);
1617   if (journal_fh != NULL)
1618   {
1619     fclose(journal_fh);
1620     journal_fh = NULL;
1621   }
1622
1623   if (config_flush_at_shutdown)
1624   {
1625     RRDD_LOG(LOG_INFO, "removing journals");
1626     unlink(journal_old);
1627     unlink(journal_cur);
1628   }
1629   else
1630   {
1631     RRDD_LOG(LOG_INFO, "expedited shutdown; "
1632              "journals will be used at next startup");
1633   }
1634
1635   pthread_mutex_unlock(&journal_lock);
1636
1637 } /* }}} static void journal_done */
1638
1639 static int journal_write(char *cmd, char *args) /* {{{ */
1640 {
1641   int chars;
1642
1643   if (journal_fh == NULL)
1644     return 0;
1645
1646   pthread_mutex_lock(&journal_lock);
1647   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1648   pthread_mutex_unlock(&journal_lock);
1649
1650   if (chars > 0)
1651   {
1652     pthread_mutex_lock(&stats_lock);
1653     stats_journal_bytes += chars;
1654     pthread_mutex_unlock(&stats_lock);
1655   }
1656
1657   return chars;
1658 } /* }}} static int journal_write */
1659
1660 static int journal_replay (const char *file) /* {{{ */
1661 {
1662   FILE *fh;
1663   int entry_cnt = 0;
1664   int fail_cnt = 0;
1665   uint64_t line = 0;
1666   char entry[CMD_MAX];
1667
1668   if (file == NULL) return 0;
1669
1670   {
1671     char *reason;
1672     int status = 0;
1673     struct stat statbuf;
1674
1675     memset(&statbuf, 0, sizeof(statbuf));
1676     if (stat(file, &statbuf) != 0)
1677     {
1678       if (errno == ENOENT)
1679         return 0;
1680
1681       reason = "stat error";
1682       status = errno;
1683     }
1684     else if (!S_ISREG(statbuf.st_mode))
1685     {
1686       reason = "not a regular file";
1687       status = EPERM;
1688     }
1689     if (statbuf.st_uid != daemon_uid)
1690     {
1691       reason = "not owned by daemon user";
1692       status = EACCES;
1693     }
1694     if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
1695     {
1696       reason = "must not be user/group writable";
1697       status = EACCES;
1698     }
1699
1700     if (status != 0)
1701     {
1702       RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
1703                file, rrd_strerror(status), reason);
1704       return 0;
1705     }
1706   }
1707
1708   fh = fopen(file, "r");
1709   if (fh == NULL)
1710   {
1711     if (errno != ENOENT)
1712       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1713                file, rrd_strerror(errno));
1714     return 0;
1715   }
1716   else
1717     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1718
1719   while(!feof(fh))
1720   {
1721     size_t entry_len;
1722
1723     ++line;
1724     if (fgets(entry, sizeof(entry), fh) == NULL)
1725       break;
1726     entry_len = strlen(entry);
1727
1728     /* check \n termination in case journal writing crashed mid-line */
1729     if (entry_len == 0)
1730       continue;
1731     else if (entry[entry_len - 1] != '\n')
1732     {
1733       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1734       ++fail_cnt;
1735       continue;
1736     }
1737
1738     entry[entry_len - 1] = '\0';
1739
1740     if (handle_request(NULL, entry, entry_len) == 0)
1741       ++entry_cnt;
1742     else
1743       ++fail_cnt;
1744   }
1745
1746   fclose(fh);
1747
1748   RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1749            entry_cnt, fail_cnt);
1750
1751   return entry_cnt > 0 ? 1 : 0;
1752 } /* }}} static int journal_replay */
1753
1754 static void journal_init(void) /* {{{ */
1755 {
1756   int had_journal = 0;
1757
1758   if (journal_cur == NULL) return;
1759
1760   pthread_mutex_lock(&journal_lock);
1761
1762   RRDD_LOG(LOG_INFO, "checking for journal files");
1763
1764   had_journal += journal_replay(journal_old);
1765   had_journal += journal_replay(journal_cur);
1766
1767   /* it must have been a crash.  start a flush */
1768   if (had_journal && config_flush_at_shutdown)
1769     flush_old_values(-1);
1770
1771   pthread_mutex_unlock(&journal_lock);
1772   journal_rotate();
1773
1774   RRDD_LOG(LOG_INFO, "journal processing complete");
1775
1776 } /* }}} static void journal_init */
1777
1778 static void close_connection(listen_socket_t *sock)
1779 {
1780   close(sock->fd) ;  sock->fd   = -1;
1781   free(sock->rbuf);  sock->rbuf = NULL;
1782   free(sock->wbuf);  sock->wbuf = NULL;
1783
1784   free(sock);
1785 }
1786
1787 static void *connection_thread_main (void *args) /* {{{ */
1788 {
1789   pthread_t self;
1790   listen_socket_t *sock;
1791   int i;
1792   int fd;
1793
1794   sock = (listen_socket_t *) args;
1795   fd = sock->fd;
1796
1797   /* init read buffers */
1798   sock->next_read = sock->next_cmd = 0;
1799   sock->rbuf = malloc(RBUF_SIZE);
1800   if (sock->rbuf == NULL)
1801   {
1802     RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
1803     close_connection(sock);
1804     return NULL;
1805   }
1806
1807   pthread_mutex_lock (&connection_threads_lock);
1808   {
1809     pthread_t *temp;
1810
1811     temp = (pthread_t *) realloc (connection_threads,
1812         sizeof (pthread_t) * (connection_threads_num + 1));
1813     if (temp == NULL)
1814     {
1815       RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed.");
1816     }
1817     else
1818     {
1819       connection_threads = temp;
1820       connection_threads[connection_threads_num] = pthread_self ();
1821       connection_threads_num++;
1822     }
1823   }
1824   pthread_mutex_unlock (&connection_threads_lock);
1825
1826   while (do_shutdown == 0)
1827   {
1828     char *cmd;
1829     ssize_t cmd_len;
1830     ssize_t rbytes;
1831
1832     struct pollfd pollfd;
1833     int status;
1834
1835     pollfd.fd = fd;
1836     pollfd.events = POLLIN | POLLPRI;
1837     pollfd.revents = 0;
1838
1839     status = poll (&pollfd, 1, /* timeout = */ 500);
1840     if (do_shutdown)
1841       break;
1842     else if (status == 0) /* timeout */
1843       continue;
1844     else if (status < 0) /* error */
1845     {
1846       status = errno;
1847       if (status != EINTR)
1848         RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
1849       continue;
1850     }
1851
1852     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
1853       break;
1854     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
1855     {
1856       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
1857           "poll(2) returned something unexpected: %#04hx",
1858           pollfd.revents);
1859       break;
1860     }
1861
1862     rbytes = read(fd, sock->rbuf + sock->next_read,
1863                   RBUF_SIZE - sock->next_read);
1864     if (rbytes < 0)
1865     {
1866       RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
1867       break;
1868     }
1869     else if (rbytes == 0)
1870       break; /* eof */
1871
1872     sock->next_read += rbytes;
1873
1874     while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
1875     {
1876       status = handle_request (sock, cmd, cmd_len+1);
1877       if (status != 0)
1878         goto out_close;
1879     }
1880   }
1881
1882 out_close:
1883   close_connection(sock);
1884
1885   self = pthread_self ();
1886   /* Remove this thread from the connection threads list */
1887   pthread_mutex_lock (&connection_threads_lock);
1888   /* Find out own index in the array */
1889   for (i = 0; i < connection_threads_num; i++)
1890     if (pthread_equal (connection_threads[i], self) != 0)
1891       break;
1892   assert (i < connection_threads_num);
1893
1894   /* Move the trailing threads forward. */
1895   if (i < (connection_threads_num - 1))
1896   {
1897     memmove (connection_threads + i,
1898         connection_threads + i + 1,
1899         sizeof (pthread_t) * (connection_threads_num - i - 1));
1900   }
1901
1902   connection_threads_num--;
1903   pthread_mutex_unlock (&connection_threads_lock);
1904
1905   return (NULL);
1906 } /* }}} void *connection_thread_main */
1907
1908 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
1909 {
1910   int fd;
1911   struct sockaddr_un sa;
1912   listen_socket_t *temp;
1913   int status;
1914   const char *path;
1915
1916   path = sock->addr;
1917   if (strncmp(path, "unix:", strlen("unix:")) == 0)
1918     path += strlen("unix:");
1919
1920   temp = (listen_socket_t *) realloc (listen_fds,
1921       sizeof (listen_fds[0]) * (listen_fds_num + 1));
1922   if (temp == NULL)
1923   {
1924     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: realloc failed.");
1925     return (-1);
1926   }
1927   listen_fds = temp;
1928   memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
1929
1930   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
1931   if (fd < 0)
1932   {
1933     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: socket(2) failed.");
1934     return (-1);
1935   }
1936
1937   memset (&sa, 0, sizeof (sa));
1938   sa.sun_family = AF_UNIX;
1939   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
1940
1941   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
1942   if (status != 0)
1943   {
1944     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: bind(2) failed.");
1945     close (fd);
1946     unlink (path);
1947     return (-1);
1948   }
1949
1950   status = listen (fd, /* backlog = */ 10);
1951   if (status != 0)
1952   {
1953     RRDD_LOG (LOG_ERR, "open_listen_socket_unix: listen(2) failed.");
1954     close (fd);
1955     unlink (path);
1956     return (-1);
1957   }
1958
1959   listen_fds[listen_fds_num].fd = fd;
1960   listen_fds[listen_fds_num].family = PF_UNIX;
1961   strncpy(listen_fds[listen_fds_num].addr, path,
1962           sizeof (listen_fds[listen_fds_num].addr) - 1);
1963   listen_fds_num++;
1964
1965   return (0);
1966 } /* }}} int open_listen_socket_unix */
1967
1968 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
1969 {
1970   struct addrinfo ai_hints;
1971   struct addrinfo *ai_res;
1972   struct addrinfo *ai_ptr;
1973   char addr_copy[NI_MAXHOST];
1974   char *addr;
1975   char *port;
1976   int status;
1977
1978   strncpy (addr_copy, sock->addr, sizeof (addr_copy));
1979   addr_copy[sizeof (addr_copy) - 1] = 0;
1980   addr = addr_copy;
1981
1982   memset (&ai_hints, 0, sizeof (ai_hints));
1983   ai_hints.ai_flags = 0;
1984 #ifdef AI_ADDRCONFIG
1985   ai_hints.ai_flags |= AI_ADDRCONFIG;
1986 #endif
1987   ai_hints.ai_family = AF_UNSPEC;
1988   ai_hints.ai_socktype = SOCK_STREAM;
1989
1990   port = NULL;
1991   if (*addr == '[') /* IPv6+port format */
1992   {
1993     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
1994     addr++;
1995
1996     port = strchr (addr, ']');
1997     if (port == NULL)
1998     {
1999       RRDD_LOG (LOG_ERR, "open_listen_socket_network: Malformed address: %s",
2000           sock->addr);
2001       return (-1);
2002     }
2003     *port = 0;
2004     port++;
2005
2006     if (*port == ':')
2007       port++;
2008     else if (*port == 0)
2009       port = NULL;
2010     else
2011     {
2012       RRDD_LOG (LOG_ERR, "open_listen_socket_network: Garbage after address: %s",
2013           port);
2014       return (-1);
2015     }
2016   } /* if (*addr = ']') */
2017   else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */
2018   {
2019     port = rindex(addr, ':');
2020     if (port != NULL)
2021     {
2022       *port = 0;
2023       port++;
2024     }
2025   }
2026   ai_res = NULL;
2027   status = getaddrinfo (addr,
2028                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2029                         &ai_hints, &ai_res);
2030   if (status != 0)
2031   {
2032     RRDD_LOG (LOG_ERR, "open_listen_socket_network: getaddrinfo(%s) failed: "
2033         "%s", addr, gai_strerror (status));
2034     return (-1);
2035   }
2036
2037   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2038   {
2039     int fd;
2040     listen_socket_t *temp;
2041     int one = 1;
2042
2043     temp = (listen_socket_t *) realloc (listen_fds,
2044         sizeof (listen_fds[0]) * (listen_fds_num + 1));
2045     if (temp == NULL)
2046     {
2047       RRDD_LOG (LOG_ERR, "open_listen_socket_network: realloc failed.");
2048       continue;
2049     }
2050     listen_fds = temp;
2051     memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2052
2053     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2054     if (fd < 0)
2055     {
2056       RRDD_LOG (LOG_ERR, "open_listen_socket_network: socket(2) failed.");
2057       continue;
2058     }
2059
2060     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2061
2062     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2063     if (status != 0)
2064     {
2065       RRDD_LOG (LOG_ERR, "open_listen_socket_network: bind(2) failed.");
2066       close (fd);
2067       continue;
2068     }
2069
2070     status = listen (fd, /* backlog = */ 10);
2071     if (status != 0)
2072     {
2073       RRDD_LOG (LOG_ERR, "open_listen_socket_network: listen(2) failed.");
2074       close (fd);
2075       return (-1);
2076     }
2077
2078     listen_fds[listen_fds_num].fd = fd;
2079     listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2080     listen_fds_num++;
2081   } /* for (ai_ptr) */
2082
2083   return (0);
2084 } /* }}} static int open_listen_socket_network */
2085
2086 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2087 {
2088   assert(sock != NULL);
2089   assert(sock->addr != NULL);
2090
2091   if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2092       || sock->addr[0] == '/')
2093     return (open_listen_socket_unix(sock));
2094   else
2095     return (open_listen_socket_network(sock));
2096 } /* }}} int open_listen_socket */
2097
2098 static int close_listen_sockets (void) /* {{{ */
2099 {
2100   size_t i;
2101
2102   for (i = 0; i < listen_fds_num; i++)
2103   {
2104     close (listen_fds[i].fd);
2105
2106     if (listen_fds[i].family == PF_UNIX)
2107       unlink(listen_fds[i].addr);
2108   }
2109
2110   free (listen_fds);
2111   listen_fds = NULL;
2112   listen_fds_num = 0;
2113
2114   return (0);
2115 } /* }}} int close_listen_sockets */
2116
2117 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
2118 {
2119   struct pollfd *pollfds;
2120   int pollfds_num;
2121   int status;
2122   int i;
2123
2124   for (i = 0; i < config_listen_address_list_len; i++)
2125     open_listen_socket (config_listen_address_list[i]);
2126
2127   if (config_listen_address_list_len < 1)
2128   {
2129     listen_socket_t sock;
2130     memset(&sock, 0, sizeof(sock));
2131     strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr));
2132     open_listen_socket (&sock);
2133   }
2134
2135   if (listen_fds_num < 1)
2136   {
2137     RRDD_LOG (LOG_ERR, "listen_thread_main: No listen sockets "
2138         "could be opened. Sorry.");
2139     return (NULL);
2140   }
2141
2142   pollfds_num = listen_fds_num;
2143   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2144   if (pollfds == NULL)
2145   {
2146     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2147     return (NULL);
2148   }
2149   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2150
2151   RRDD_LOG(LOG_INFO, "listening for connections");
2152
2153   while (do_shutdown == 0)
2154   {
2155     assert (pollfds_num == ((int) listen_fds_num));
2156     for (i = 0; i < pollfds_num; i++)
2157     {
2158       pollfds[i].fd = listen_fds[i].fd;
2159       pollfds[i].events = POLLIN | POLLPRI;
2160       pollfds[i].revents = 0;
2161     }
2162
2163     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2164     if (do_shutdown)
2165       break;
2166     else if (status == 0) /* timeout */
2167       continue;
2168     else if (status < 0) /* error */
2169     {
2170       status = errno;
2171       if (status != EINTR)
2172       {
2173         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2174       }
2175       continue;
2176     }
2177
2178     for (i = 0; i < pollfds_num; i++)
2179     {
2180       listen_socket_t *client_sock;
2181       struct sockaddr_storage client_sa;
2182       socklen_t client_sa_size;
2183       pthread_t tid;
2184       pthread_attr_t attr;
2185
2186       if (pollfds[i].revents == 0)
2187         continue;
2188
2189       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2190       {
2191         RRDD_LOG (LOG_ERR, "listen_thread_main: "
2192             "poll(2) returned something unexpected for listen FD #%i.",
2193             pollfds[i].fd);
2194         continue;
2195       }
2196
2197       client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2198       if (client_sock == NULL)
2199       {
2200         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2201         continue;
2202       }
2203       memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2204
2205       client_sa_size = sizeof (client_sa);
2206       client_sock->fd = accept (pollfds[i].fd,
2207           (struct sockaddr *) &client_sa, &client_sa_size);
2208       if (client_sock->fd < 0)
2209       {
2210         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2211         free(client_sock);
2212         continue;
2213       }
2214
2215       pthread_attr_init (&attr);
2216       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2217
2218       status = pthread_create (&tid, &attr, connection_thread_main,
2219                                client_sock);
2220       if (status != 0)
2221       {
2222         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2223         close_connection(client_sock);
2224         continue;
2225       }
2226     } /* for (pollfds_num) */
2227   } /* while (do_shutdown == 0) */
2228
2229   RRDD_LOG(LOG_INFO, "starting shutdown");
2230
2231   close_listen_sockets ();
2232
2233   pthread_mutex_lock (&connection_threads_lock);
2234   while (connection_threads_num > 0)
2235   {
2236     pthread_t wait_for;
2237
2238     wait_for = connection_threads[0];
2239
2240     pthread_mutex_unlock (&connection_threads_lock);
2241     pthread_join (wait_for, /* retval = */ NULL);
2242     pthread_mutex_lock (&connection_threads_lock);
2243   }
2244   pthread_mutex_unlock (&connection_threads_lock);
2245
2246   return (NULL);
2247 } /* }}} void *listen_thread_main */
2248
2249 static int daemonize (void) /* {{{ */
2250 {
2251   int status;
2252   int fd;
2253   char *base_dir;
2254
2255   daemon_uid = geteuid();
2256
2257   fd = open_pidfile();
2258   if (fd < 0) return fd;
2259
2260   if (!stay_foreground)
2261   {
2262     pid_t child;
2263
2264     child = fork ();
2265     if (child < 0)
2266     {
2267       fprintf (stderr, "daemonize: fork(2) failed.\n");
2268       return (-1);
2269     }
2270     else if (child > 0)
2271     {
2272       return (1);
2273     }
2274
2275     /* Become session leader */
2276     setsid ();
2277
2278     /* Open the first three file descriptors to /dev/null */
2279     close (2);
2280     close (1);
2281     close (0);
2282
2283     open ("/dev/null", O_RDWR);
2284     dup (0);
2285     dup (0);
2286   } /* if (!stay_foreground) */
2287
2288   /* Change into the /tmp directory. */
2289   base_dir = (config_base_dir != NULL)
2290     ? config_base_dir
2291     : "/tmp";
2292   status = chdir (base_dir);
2293   if (status != 0)
2294   {
2295     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2296     return (-1);
2297   }
2298
2299   install_signal_handlers();
2300
2301   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2302   RRDD_LOG(LOG_INFO, "starting up");
2303
2304   cache_tree = g_tree_new ((GCompareFunc) strcmp);
2305   if (cache_tree == NULL)
2306   {
2307     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2308     return (-1);
2309   }
2310
2311   status = write_pidfile (fd);
2312   return status;
2313 } /* }}} int daemonize */
2314
2315 static int cleanup (void) /* {{{ */
2316 {
2317   do_shutdown++;
2318
2319   pthread_cond_signal (&cache_cond);
2320   pthread_join (queue_thread, /* return = */ NULL);
2321
2322   remove_pidfile ();
2323
2324   RRDD_LOG(LOG_INFO, "goodbye");
2325   closelog ();
2326
2327   return (0);
2328 } /* }}} int cleanup */
2329
2330 static int read_options (int argc, char **argv) /* {{{ */
2331 {
2332   int option;
2333   int status = 0;
2334
2335   while ((option = getopt(argc, argv, "gl:L:f:w:b:Bz:p:j:h?F")) != -1)
2336   {
2337     switch (option)
2338     {
2339       case 'g':
2340         stay_foreground=1;
2341         break;
2342
2343       case 'L':
2344       case 'l':
2345       {
2346         listen_socket_t **temp;
2347         listen_socket_t *new;
2348
2349         new = malloc(sizeof(listen_socket_t));
2350         if (new == NULL)
2351         {
2352           fprintf(stderr, "read_options: malloc failed.\n");
2353           return(2);
2354         }
2355         memset(new, 0, sizeof(listen_socket_t));
2356
2357         temp = (listen_socket_t **) realloc (config_listen_address_list,
2358             sizeof (listen_socket_t *) * (config_listen_address_list_len + 1));
2359         if (temp == NULL)
2360         {
2361           fprintf (stderr, "read_options: realloc failed.\n");
2362           return (2);
2363         }
2364         config_listen_address_list = temp;
2365
2366         strncpy(new->addr, optarg, sizeof(new->addr)-1);
2367         new->privilege = (option == 'l') ? PRIV_HIGH : PRIV_LOW;
2368
2369         temp[config_listen_address_list_len] = new;
2370         config_listen_address_list_len++;
2371       }
2372       break;
2373
2374       case 'f':
2375       {
2376         int temp;
2377
2378         temp = atoi (optarg);
2379         if (temp > 0)
2380           config_flush_interval = temp;
2381         else
2382         {
2383           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2384           status = 3;
2385         }
2386       }
2387       break;
2388
2389       case 'w':
2390       {
2391         int temp;
2392
2393         temp = atoi (optarg);
2394         if (temp > 0)
2395           config_write_interval = temp;
2396         else
2397         {
2398           fprintf (stderr, "Invalid write interval: %s\n", optarg);
2399           status = 2;
2400         }
2401       }
2402       break;
2403
2404       case 'z':
2405       {
2406         int temp;
2407
2408         temp = atoi(optarg);
2409         if (temp > 0)
2410           config_write_jitter = temp;
2411         else
2412         {
2413           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2414           status = 2;
2415         }
2416
2417         break;
2418       }
2419
2420       case 'B':
2421         config_write_base_only = 1;
2422         break;
2423
2424       case 'b':
2425       {
2426         size_t len;
2427
2428         if (config_base_dir != NULL)
2429           free (config_base_dir);
2430         config_base_dir = strdup (optarg);
2431         if (config_base_dir == NULL)
2432         {
2433           fprintf (stderr, "read_options: strdup failed.\n");
2434           return (3);
2435         }
2436
2437         len = strlen (config_base_dir);
2438         while ((len > 0) && (config_base_dir[len - 1] == '/'))
2439         {
2440           config_base_dir[len - 1] = 0;
2441           len--;
2442         }
2443
2444         if (len < 1)
2445         {
2446           fprintf (stderr, "Invalid base directory: %s\n", optarg);
2447           return (4);
2448         }
2449
2450         _config_base_dir_len = len;
2451       }
2452       break;
2453
2454       case 'p':
2455       {
2456         if (config_pid_file != NULL)
2457           free (config_pid_file);
2458         config_pid_file = strdup (optarg);
2459         if (config_pid_file == NULL)
2460         {
2461           fprintf (stderr, "read_options: strdup failed.\n");
2462           return (3);
2463         }
2464       }
2465       break;
2466
2467       case 'F':
2468         config_flush_at_shutdown = 1;
2469         break;
2470
2471       case 'j':
2472       {
2473         struct stat statbuf;
2474         const char *dir = optarg;
2475
2476         status = stat(dir, &statbuf);
2477         if (status != 0)
2478         {
2479           fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2480           return 6;
2481         }
2482
2483         if (!S_ISDIR(statbuf.st_mode)
2484             || access(dir, R_OK|W_OK|X_OK) != 0)
2485         {
2486           fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2487                   errno ? rrd_strerror(errno) : "");
2488           return 6;
2489         }
2490
2491         journal_cur = malloc(PATH_MAX + 1);
2492         journal_old = malloc(PATH_MAX + 1);
2493         if (journal_cur == NULL || journal_old == NULL)
2494         {
2495           fprintf(stderr, "malloc failure for journal files\n");
2496           return 6;
2497         }
2498         else 
2499         {
2500           snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2501           snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2502         }
2503       }
2504       break;
2505
2506       case 'h':
2507       case '?':
2508         printf ("RRDCacheD %s  Copyright (C) 2008 Florian octo Forster\n"
2509             "\n"
2510             "Usage: rrdcached [options]\n"
2511             "\n"
2512             "Valid options are:\n"
2513             "  -l <address>  Socket address to listen to.\n"
2514             "  -L <address>  Socket address to listen to ('FLUSH' only).\n"
2515             "  -w <seconds>  Interval in which to write data.\n"
2516             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
2517             "  -f <seconds>  Interval in which to flush dead data.\n"
2518             "  -p <file>     Location of the PID-file.\n"
2519             "  -b <dir>      Base directory to change to.\n"
2520             "  -B            Restrict file access to paths within -b <dir>\n"
2521             "  -g            Do not fork and run in the foreground.\n"
2522             "  -j <dir>      Directory in which to create the journal files.\n"
2523             "  -F            Always flush all updates at shutdown\n"
2524             "\n"
2525             "For more information and a detailed description of all options "
2526             "please refer\n"
2527             "to the rrdcached(1) manual page.\n",
2528             VERSION);
2529         status = -1;
2530         break;
2531     } /* switch (option) */
2532   } /* while (getopt) */
2533
2534   /* advise the user when values are not sane */
2535   if (config_flush_interval < 2 * config_write_interval)
2536     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2537             " 2x write interval (-w) !\n");
2538   if (config_write_jitter > config_write_interval)
2539     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2540             " write interval (-w) !\n");
2541
2542   if (config_write_base_only && config_base_dir == NULL)
2543     fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
2544             "  Consult the rrdcached documentation\n");
2545
2546   if (journal_cur == NULL)
2547     config_flush_at_shutdown = 1;
2548
2549   return (status);
2550 } /* }}} int read_options */
2551
2552 int main (int argc, char **argv)
2553 {
2554   int status;
2555
2556   status = read_options (argc, argv);
2557   if (status != 0)
2558   {
2559     if (status < 0)
2560       status = 0;
2561     return (status);
2562   }
2563
2564   status = daemonize ();
2565   if (status == 1)
2566   {
2567     struct sigaction sigchld;
2568
2569     memset (&sigchld, 0, sizeof (sigchld));
2570     sigchld.sa_handler = SIG_IGN;
2571     sigaction (SIGCHLD, &sigchld, NULL);
2572
2573     return (0);
2574   }
2575   else if (status != 0)
2576   {
2577     fprintf (stderr, "daemonize failed, exiting.\n");
2578     return (1);
2579   }
2580
2581   journal_init();
2582
2583   /* start the queue thread */
2584   memset (&queue_thread, 0, sizeof (queue_thread));
2585   status = pthread_create (&queue_thread,
2586                            NULL, /* attr */
2587                            queue_thread_main,
2588                            NULL); /* args */
2589   if (status != 0)
2590   {
2591     RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2592     cleanup();
2593     return (1);
2594   }
2595
2596   listen_thread_main (NULL);
2597   cleanup ();
2598
2599   return (0);
2600 } /* int main */
2601
2602 /*
2603  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2604  */