7201c52c4e12c86429f90240a3b01272d906f501
[rrdtool.git] / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008 Florian octo Forster
4  * Copyright (C) 2008 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
23
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
32
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
36
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
41
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
47
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
51
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
55
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
61
62 /*
63  * Now for some includes..
64  */
65 #include "rrd.h" /* {{{ */
66 #include "rrd_client.h"
67
68 #include <stdlib.h>
69 #include <stdint.h>
70 #include <stdio.h>
71 #include <unistd.h>
72 #include <string.h>
73 #include <strings.h>
74 #include <stdint.h>
75 #include <inttypes.h>
76
77 #include <sys/types.h>
78 #include <sys/stat.h>
79 #include <fcntl.h>
80 #include <signal.h>
81 #include <sys/socket.h>
82 #include <sys/un.h>
83 #include <netdb.h>
84 #include <poll.h>
85 #include <syslog.h>
86 #include <pthread.h>
87 #include <errno.h>
88 #include <assert.h>
89 #include <sys/time.h>
90 #include <time.h>
91
92 #include <glib-2.0/glib.h>
93 /* }}} */
94
95 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
96
97 #ifndef __GNUC__
98 # define __attribute__(x) /**/
99 #endif
100
101 /*
102  * Types
103  */
104 typedef enum
105 {
106   PRIV_LOW,
107   PRIV_HIGH
108 } socket_privilege;
109
110 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
111
112 struct listen_socket_s
113 {
114   int fd;
115   char addr[PATH_MAX + 1];
116   int family;
117   socket_privilege privilege;
118
119   /* state for BATCH processing */
120   time_t batch_start;
121   int batch_cmd;
122
123   /* buffered IO */
124   char *rbuf;
125   off_t next_cmd;
126   off_t next_read;
127
128   char *wbuf;
129   ssize_t wbuf_len;
130 };
131 typedef struct listen_socket_s listen_socket_t;
132
133 struct cache_item_s;
134 typedef struct cache_item_s cache_item_t;
135 struct cache_item_s
136 {
137   char *file;
138   char **values;
139   int values_num;
140   time_t last_flush_time;
141   time_t last_update_stamp;
142 #define CI_FLAGS_IN_TREE  (1<<0)
143 #define CI_FLAGS_IN_QUEUE (1<<1)
144   int flags;
145   pthread_cond_t  flushed;
146   cache_item_t *prev;
147   cache_item_t *next;
148 };
149
150 struct callback_flush_data_s
151 {
152   time_t now;
153   time_t abs_timeout;
154   char **keys;
155   size_t keys_num;
156 };
157 typedef struct callback_flush_data_s callback_flush_data_t;
158
159 enum queue_side_e
160 {
161   HEAD,
162   TAIL
163 };
164 typedef enum queue_side_e queue_side_t;
165
166 /* max length of socket command or response */
167 #define CMD_MAX 4096
168 #define RBUF_SIZE (CMD_MAX*2)
169
170 /*
171  * Variables
172  */
173 static int stay_foreground = 0;
174 static uid_t daemon_uid;
175
176 static listen_socket_t *listen_fds = NULL;
177 static size_t listen_fds_num = 0;
178
179 static int do_shutdown = 0;
180
181 static pthread_t queue_thread;
182
183 static pthread_t *connection_threads = NULL;
184 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
185 static int connection_threads_num = 0;
186
187 /* Cache stuff */
188 static GTree          *cache_tree = NULL;
189 static cache_item_t   *cache_queue_head = NULL;
190 static cache_item_t   *cache_queue_tail = NULL;
191 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
192 static pthread_cond_t  cache_cond = PTHREAD_COND_INITIALIZER;
193
194 static int config_write_interval = 300;
195 static int config_write_jitter   = 0;
196 static int config_flush_interval = 3600;
197 static int config_flush_at_shutdown = 0;
198 static char *config_pid_file = NULL;
199 static char *config_base_dir = NULL;
200 static size_t _config_base_dir_len = 0;
201 static int config_write_base_only = 0;
202
203 static listen_socket_t **config_listen_address_list = NULL;
204 static int config_listen_address_list_len = 0;
205
206 static uint64_t stats_queue_length = 0;
207 static uint64_t stats_updates_received = 0;
208 static uint64_t stats_flush_received = 0;
209 static uint64_t stats_updates_written = 0;
210 static uint64_t stats_data_sets_written = 0;
211 static uint64_t stats_journal_bytes = 0;
212 static uint64_t stats_journal_rotate = 0;
213 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
214
215 /* Journaled updates */
216 static char *journal_cur = NULL;
217 static char *journal_old = NULL;
218 static FILE *journal_fh = NULL;
219 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
220 static int journal_write(char *cmd, char *args);
221 static void journal_done(void);
222 static void journal_rotate(void);
223
224 /* 
225  * Functions
226  */
227 static void sig_common (const char *sig) /* {{{ */
228 {
229   RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
230   do_shutdown++;
231   pthread_cond_broadcast(&cache_cond);
232 } /* }}} void sig_common */
233
234 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
235 {
236   sig_common("INT");
237 } /* }}} void sig_int_handler */
238
239 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
240 {
241   sig_common("TERM");
242 } /* }}} void sig_term_handler */
243
244 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
245 {
246   config_flush_at_shutdown = 1;
247   sig_common("USR1");
248 } /* }}} void sig_usr1_handler */
249
250 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
251 {
252   config_flush_at_shutdown = 0;
253   sig_common("USR2");
254 } /* }}} void sig_usr2_handler */
255
256 static void install_signal_handlers(void) /* {{{ */
257 {
258   /* These structures are static, because `sigaction' behaves weird if the are
259    * overwritten.. */
260   static struct sigaction sa_int;
261   static struct sigaction sa_term;
262   static struct sigaction sa_pipe;
263   static struct sigaction sa_usr1;
264   static struct sigaction sa_usr2;
265
266   /* Install signal handlers */
267   memset (&sa_int, 0, sizeof (sa_int));
268   sa_int.sa_handler = sig_int_handler;
269   sigaction (SIGINT, &sa_int, NULL);
270
271   memset (&sa_term, 0, sizeof (sa_term));
272   sa_term.sa_handler = sig_term_handler;
273   sigaction (SIGTERM, &sa_term, NULL);
274
275   memset (&sa_pipe, 0, sizeof (sa_pipe));
276   sa_pipe.sa_handler = SIG_IGN;
277   sigaction (SIGPIPE, &sa_pipe, NULL);
278
279   memset (&sa_pipe, 0, sizeof (sa_usr1));
280   sa_usr1.sa_handler = sig_usr1_handler;
281   sigaction (SIGUSR1, &sa_usr1, NULL);
282
283   memset (&sa_usr2, 0, sizeof (sa_usr2));
284   sa_usr2.sa_handler = sig_usr2_handler;
285   sigaction (SIGUSR2, &sa_usr2, NULL);
286
287 } /* }}} void install_signal_handlers */
288
289 static int open_pidfile(char *action, int oflag) /* {{{ */
290 {
291   int fd;
292   char *file;
293
294   file = (config_pid_file != NULL)
295     ? config_pid_file
296     : LOCALSTATEDIR "/run/rrdcached.pid";
297
298   fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
299   if (fd < 0)
300     fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
301             action, file, rrd_strerror(errno));
302
303   return(fd);
304 } /* }}} static int open_pidfile */
305
306 /* check existing pid file to see whether a daemon is running */
307 static int check_pidfile(void)
308 {
309   int pid_fd;
310   pid_t pid;
311   char pid_str[16];
312
313   pid_fd = open_pidfile("open", O_RDWR);
314   if (pid_fd < 0)
315     return pid_fd;
316
317   if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0)
318     return -1;
319
320   pid = atoi(pid_str);
321   if (pid <= 0)
322     return -1;
323
324   /* another running process that we can signal COULD be
325    * a competing rrdcached */
326   if (pid != getpid() && kill(pid, 0) == 0)
327   {
328     fprintf(stderr,
329             "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
330     close(pid_fd);
331     return -1;
332   }
333
334   lseek(pid_fd, 0, SEEK_SET);
335   ftruncate(pid_fd, 0);
336
337   fprintf(stderr,
338           "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
339           "rrdcached: starting normally.\n", pid);
340
341   return pid_fd;
342 } /* }}} static int check_pidfile */
343
344 static int write_pidfile (int fd) /* {{{ */
345 {
346   pid_t pid;
347   FILE *fh;
348
349   pid = getpid ();
350
351   fh = fdopen (fd, "w");
352   if (fh == NULL)
353   {
354     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
355     close(fd);
356     return (-1);
357   }
358
359   fprintf (fh, "%i\n", (int) pid);
360   fclose (fh);
361
362   return (0);
363 } /* }}} int write_pidfile */
364
365 static int remove_pidfile (void) /* {{{ */
366 {
367   char *file;
368   int status;
369
370   file = (config_pid_file != NULL)
371     ? config_pid_file
372     : LOCALSTATEDIR "/run/rrdcached.pid";
373
374   status = unlink (file);
375   if (status == 0)
376     return (0);
377   return (errno);
378 } /* }}} int remove_pidfile */
379
380 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
381 {
382   char *eol;
383
384   eol = memchr(sock->rbuf + sock->next_cmd, '\n',
385                sock->next_read - sock->next_cmd);
386
387   if (eol == NULL)
388   {
389     /* no commands left, move remainder back to front of rbuf */
390     memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
391             sock->next_read - sock->next_cmd);
392     sock->next_read -= sock->next_cmd;
393     sock->next_cmd = 0;
394     *len = 0;
395     return NULL;
396   }
397   else
398   {
399     char *cmd = sock->rbuf + sock->next_cmd;
400     *eol = '\0';
401
402     sock->next_cmd = eol - sock->rbuf + 1;
403
404     if (eol > sock->rbuf && *(eol-1) == '\r')
405       *(--eol) = '\0'; /* handle "\r\n" EOL */
406
407     *len = eol - cmd;
408
409     return cmd;
410   }
411
412   /* NOTREACHED */
413   assert(1==0);
414 }
415
416 /* add the characters directly to the write buffer */
417 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
418 {
419   char *new_buf;
420
421   assert(sock != NULL);
422
423   new_buf = realloc(sock->wbuf, sock->wbuf_len + len + 1);
424   if (new_buf == NULL)
425   {
426     RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
427     return -1;
428   }
429
430   strncpy(new_buf + sock->wbuf_len, str, len + 1);
431
432   sock->wbuf = new_buf;
433   sock->wbuf_len += len;
434
435   return 0;
436 } /* }}} static int add_to_wbuf */
437
438 /* add the text to the "extra" info that's sent after the status line */
439 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
440 {
441   va_list argp;
442   char buffer[CMD_MAX];
443   int len;
444
445   if (sock == NULL) return 0; /* journal replay mode */
446   if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
447
448   va_start(argp, fmt);
449 #ifdef HAVE_VSNPRINTF
450   len = vsnprintf(buffer, sizeof(buffer)-1, fmt, argp);
451 #else
452   len = vsprintf(buffer, fmt, argp);
453 #endif
454   va_end(argp);
455   if (len < 0)
456   {
457     RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
458     return -1;
459   }
460
461   return add_to_wbuf(sock, buffer, len);
462 } /* }}} static int add_response_info */
463
464 static int count_lines(char *str) /* {{{ */
465 {
466   int lines = 0;
467
468   if (str != NULL)
469   {
470     while ((str = strchr(str, '\n')) != NULL)
471     {
472       ++lines;
473       ++str;
474     }
475   }
476
477   return lines;
478 } /* }}} static int count_lines */
479
480 /* send the response back to the user.
481  * returns 0 on success, -1 on error
482  * write buffer is always zeroed after this call */
483 static int send_response (listen_socket_t *sock, response_code rc,
484                           char *fmt, ...) /* {{{ */
485 {
486   va_list argp;
487   char buffer[CMD_MAX];
488   int lines;
489   ssize_t wrote;
490   int rclen, len;
491
492   if (sock == NULL) return rc;  /* journal replay mode */
493
494   if (sock->batch_start)
495   {
496     if (rc == RESP_OK)
497       return rc; /* no response on success during BATCH */
498     lines = sock->batch_cmd;
499   }
500   else if (rc == RESP_OK)
501     lines = count_lines(sock->wbuf);
502   else
503     lines = -1;
504
505   rclen = sprintf(buffer, "%d ", lines);
506   va_start(argp, fmt);
507 #ifdef HAVE_VSNPRINTF
508   len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen-1, fmt, argp);
509 #else
510   len = vsprintf(buffer+rclen, fmt, argp);
511 #endif
512   va_end(argp);
513   if (len < 0)
514     return -1;
515
516   len += rclen;
517
518   /* append the result to the wbuf, don't write to the user */
519   if (sock->batch_start)
520     return add_to_wbuf(sock, buffer, len);
521
522   /* first write must be complete */
523   if (len != write(sock->fd, buffer, len))
524   {
525     RRDD_LOG(LOG_INFO, "send_response: could not write status message");
526     return -1;
527   }
528
529   if (sock->wbuf != NULL && rc == RESP_OK)
530   {
531     wrote = 0;
532     while (wrote < sock->wbuf_len)
533     {
534       ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
535       if (wb <= 0)
536       {
537         RRDD_LOG(LOG_INFO, "send_response: could not write results");
538         return -1;
539       }
540       wrote += wb;
541     }
542   }
543
544   free(sock->wbuf); sock->wbuf = NULL;
545   sock->wbuf_len = 0;
546
547   return 0;
548 } /* }}} */
549
550 static void wipe_ci_values(cache_item_t *ci, time_t when)
551 {
552   ci->values = NULL;
553   ci->values_num = 0;
554
555   ci->last_flush_time = when;
556   if (config_write_jitter > 0)
557     ci->last_flush_time += (random() % config_write_jitter);
558 }
559
560 /* remove_from_queue
561  * remove a "cache_item_t" item from the queue.
562  * must hold 'cache_lock' when calling this
563  */
564 static void remove_from_queue(cache_item_t *ci) /* {{{ */
565 {
566   if (ci == NULL) return;
567   if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
568
569   if (ci->prev == NULL)
570     cache_queue_head = ci->next; /* reset head */
571   else
572     ci->prev->next = ci->next;
573
574   if (ci->next == NULL)
575     cache_queue_tail = ci->prev; /* reset the tail */
576   else
577     ci->next->prev = ci->prev;
578
579   ci->next = ci->prev = NULL;
580   ci->flags &= ~CI_FLAGS_IN_QUEUE;
581 } /* }}} static void remove_from_queue */
582
583 /* remove an entry from the tree and free all its resources.
584  * must hold 'cache lock' while calling this.
585  * returns 0 on success, otherwise errno */
586 static int forget_file(const char *file)
587 {
588   cache_item_t *ci;
589
590   ci = g_tree_lookup(cache_tree, file);
591   if (ci == NULL)
592     return ENOENT;
593
594   g_tree_remove (cache_tree, file);
595   remove_from_queue(ci);
596
597   for (int i=0; i < ci->values_num; i++)
598     free(ci->values[i]);
599
600   free (ci->values);
601   free (ci->file);
602
603   /* in case anyone is waiting */
604   pthread_cond_broadcast(&ci->flushed);
605
606   free (ci);
607
608   return 0;
609 } /* }}} static int forget_file */
610
611 /*
612  * enqueue_cache_item:
613  * `cache_lock' must be acquired before calling this function!
614  */
615 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
616     queue_side_t side)
617 {
618   if (ci == NULL)
619     return (-1);
620
621   if (ci->values_num == 0)
622     return (0);
623
624   if (side == HEAD)
625   {
626     if (cache_queue_head == ci)
627       return 0;
628
629     /* remove if further down in queue */
630     remove_from_queue(ci);
631
632     ci->prev = NULL;
633     ci->next = cache_queue_head;
634     if (ci->next != NULL)
635       ci->next->prev = ci;
636     cache_queue_head = ci;
637
638     if (cache_queue_tail == NULL)
639       cache_queue_tail = cache_queue_head;
640   }
641   else /* (side == TAIL) */
642   {
643     /* We don't move values back in the list.. */
644     if (ci->flags & CI_FLAGS_IN_QUEUE)
645       return (0);
646
647     assert (ci->next == NULL);
648     assert (ci->prev == NULL);
649
650     ci->prev = cache_queue_tail;
651
652     if (cache_queue_tail == NULL)
653       cache_queue_head = ci;
654     else
655       cache_queue_tail->next = ci;
656
657     cache_queue_tail = ci;
658   }
659
660   ci->flags |= CI_FLAGS_IN_QUEUE;
661
662   pthread_cond_broadcast(&cache_cond);
663   pthread_mutex_lock (&stats_lock);
664   stats_queue_length++;
665   pthread_mutex_unlock (&stats_lock);
666
667   return (0);
668 } /* }}} int enqueue_cache_item */
669
670 /*
671  * tree_callback_flush:
672  * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
673  * while this is in progress.
674  */
675 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
676     gpointer data)
677 {
678   cache_item_t *ci;
679   callback_flush_data_t *cfd;
680
681   ci = (cache_item_t *) value;
682   cfd = (callback_flush_data_t *) data;
683
684   if (ci->flags & CI_FLAGS_IN_QUEUE)
685     return FALSE;
686
687   if ((ci->last_flush_time <= cfd->abs_timeout)
688       && (ci->values_num > 0))
689   {
690     enqueue_cache_item (ci, TAIL);
691   }
692   else if ((do_shutdown != 0)
693       && (ci->values_num > 0))
694   {
695     enqueue_cache_item (ci, TAIL);
696   }
697   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
698       && (ci->values_num <= 0))
699   {
700     char **temp;
701
702     temp = (char **) realloc (cfd->keys,
703         sizeof (char *) * (cfd->keys_num + 1));
704     if (temp == NULL)
705     {
706       RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed.");
707       return (FALSE);
708     }
709     cfd->keys = temp;
710     /* Make really sure this points to the _same_ place */
711     assert ((char *) key == ci->file);
712     cfd->keys[cfd->keys_num] = (char *) key;
713     cfd->keys_num++;
714   }
715
716   return (FALSE);
717 } /* }}} gboolean tree_callback_flush */
718
719 static int flush_old_values (int max_age)
720 {
721   callback_flush_data_t cfd;
722   size_t k;
723
724   memset (&cfd, 0, sizeof (cfd));
725   /* Pass the current time as user data so that we don't need to call
726    * `time' for each node. */
727   cfd.now = time (NULL);
728   cfd.keys = NULL;
729   cfd.keys_num = 0;
730
731   if (max_age > 0)
732     cfd.abs_timeout = cfd.now - max_age;
733   else
734     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
735
736   /* `tree_callback_flush' will return the keys of all values that haven't
737    * been touched in the last `config_flush_interval' seconds in `cfd'.
738    * The char*'s in this array point to the same memory as ci->file, so we
739    * don't need to free them separately. */
740   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
741
742   for (k = 0; k < cfd.keys_num; k++)
743   {
744     /* should never fail, since we have held the cache_lock
745      * the entire time */
746     assert( forget_file(cfd.keys[k]) == 0 );
747   }
748
749   if (cfd.keys != NULL)
750   {
751     free (cfd.keys);
752     cfd.keys = NULL;
753   }
754
755   return (0);
756 } /* int flush_old_values */
757
758 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
759 {
760   struct timeval now;
761   struct timespec next_flush;
762   int final_flush = 0; /* make sure we only flush once on shutdown */
763
764   gettimeofday (&now, NULL);
765   next_flush.tv_sec = now.tv_sec + config_flush_interval;
766   next_flush.tv_nsec = 1000 * now.tv_usec;
767
768   pthread_mutex_lock (&cache_lock);
769   while ((do_shutdown == 0) || (cache_queue_head != NULL))
770   {
771     cache_item_t *ci;
772     char *file;
773     char **values;
774     int values_num;
775     int status;
776     int i;
777
778     /* First, check if it's time to do the cache flush. */
779     gettimeofday (&now, NULL);
780     if ((now.tv_sec > next_flush.tv_sec)
781         || ((now.tv_sec == next_flush.tv_sec)
782           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
783     {
784       /* Flush all values that haven't been written in the last
785        * `config_write_interval' seconds. */
786       flush_old_values (config_write_interval);
787
788       /* Determine the time of the next cache flush. */
789       next_flush.tv_sec =
790         now.tv_sec + next_flush.tv_sec % config_flush_interval;
791
792       /* unlock the cache while we rotate so we don't block incoming
793        * updates if the fsync() blocks on disk I/O */
794       pthread_mutex_unlock(&cache_lock);
795       journal_rotate();
796       pthread_mutex_lock(&cache_lock);
797     }
798
799     /* Now, check if there's something to store away. If not, wait until
800      * something comes in or it's time to do the cache flush.  if we are
801      * shutting down, do not wait around.  */
802     if (cache_queue_head == NULL && !do_shutdown)
803     {
804       status = pthread_cond_timedwait (&cache_cond, &cache_lock, &next_flush);
805       if ((status != 0) && (status != ETIMEDOUT))
806       {
807         RRDD_LOG (LOG_ERR, "queue_thread_main: "
808             "pthread_cond_timedwait returned %i.", status);
809       }
810     }
811
812     /* We're about to shut down */
813     if (do_shutdown != 0 && !final_flush++)
814     {
815       if (config_flush_at_shutdown)
816         flush_old_values (-1); /* flush everything */
817       else
818         break;
819     }
820
821     /* Check if a value has arrived. This may be NULL if we timed out or there
822      * was an interrupt such as a signal. */
823     if (cache_queue_head == NULL)
824       continue;
825
826     ci = cache_queue_head;
827
828     /* copy the relevant parts */
829     file = strdup (ci->file);
830     if (file == NULL)
831     {
832       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
833       continue;
834     }
835
836     assert(ci->values != NULL);
837     assert(ci->values_num > 0);
838
839     values = ci->values;
840     values_num = ci->values_num;
841
842     wipe_ci_values(ci, time(NULL));
843     remove_from_queue(ci);
844
845     pthread_mutex_lock (&stats_lock);
846     assert (stats_queue_length > 0);
847     stats_queue_length--;
848     pthread_mutex_unlock (&stats_lock);
849
850     pthread_mutex_unlock (&cache_lock);
851
852     rrd_clear_error ();
853     status = rrd_update_r (file, NULL, values_num, (void *) values);
854     if (status != 0)
855     {
856       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
857           "rrd_update_r (%s) failed with status %i. (%s)",
858           file, status, rrd_get_error());
859     }
860
861     journal_write("wrote", file);
862     pthread_cond_broadcast(&ci->flushed);
863
864     for (i = 0; i < values_num; i++)
865       free (values[i]);
866
867     free(values);
868     free(file);
869
870     if (status == 0)
871     {
872       pthread_mutex_lock (&stats_lock);
873       stats_updates_written++;
874       stats_data_sets_written += values_num;
875       pthread_mutex_unlock (&stats_lock);
876     }
877
878     pthread_mutex_lock (&cache_lock);
879
880     /* We're about to shut down */
881     if (do_shutdown != 0 && !final_flush++)
882     {
883       if (config_flush_at_shutdown)
884           flush_old_values (-1); /* flush everything */
885       else
886         break;
887     }
888   } /* while ((do_shutdown == 0) || (cache_queue_head != NULL)) */
889   pthread_mutex_unlock (&cache_lock);
890
891   if (config_flush_at_shutdown)
892   {
893     assert(cache_queue_head == NULL);
894     RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
895   }
896
897   journal_done();
898
899   return (NULL);
900 } /* }}} void *queue_thread_main */
901
902 static int buffer_get_field (char **buffer_ret, /* {{{ */
903     size_t *buffer_size_ret, char **field_ret)
904 {
905   char *buffer;
906   size_t buffer_pos;
907   size_t buffer_size;
908   char *field;
909   size_t field_size;
910   int status;
911
912   buffer = *buffer_ret;
913   buffer_pos = 0;
914   buffer_size = *buffer_size_ret;
915   field = *buffer_ret;
916   field_size = 0;
917
918   if (buffer_size <= 0)
919     return (-1);
920
921   /* This is ensured by `handle_request'. */
922   assert (buffer[buffer_size - 1] == '\0');
923
924   status = -1;
925   while (buffer_pos < buffer_size)
926   {
927     /* Check for end-of-field or end-of-buffer */
928     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
929     {
930       field[field_size] = 0;
931       field_size++;
932       buffer_pos++;
933       status = 0;
934       break;
935     }
936     /* Handle escaped characters. */
937     else if (buffer[buffer_pos] == '\\')
938     {
939       if (buffer_pos >= (buffer_size - 1))
940         break;
941       buffer_pos++;
942       field[field_size] = buffer[buffer_pos];
943       field_size++;
944       buffer_pos++;
945     }
946     /* Normal operation */ 
947     else
948     {
949       field[field_size] = buffer[buffer_pos];
950       field_size++;
951       buffer_pos++;
952     }
953   } /* while (buffer_pos < buffer_size) */
954
955   if (status != 0)
956     return (status);
957
958   *buffer_ret = buffer + buffer_pos;
959   *buffer_size_ret = buffer_size - buffer_pos;
960   *field_ret = field;
961
962   return (0);
963 } /* }}} int buffer_get_field */
964
965 /* if we're restricting writes to the base directory,
966  * check whether the file falls within the dir
967  * returns 1 if OK, otherwise 0
968  */
969 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
970 {
971   assert(file != NULL);
972
973   if (!config_write_base_only
974       || sock == NULL /* journal replay */
975       || config_base_dir == NULL)
976     return 1;
977
978   if (strstr(file, "../") != NULL) goto err;
979
980   /* relative paths without "../" are ok */
981   if (*file != '/') return 1;
982
983   /* file must be of the format base + "/" + <1+ char filename> */
984   if (strlen(file) < _config_base_dir_len + 2) goto err;
985   if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
986   if (*(file + _config_base_dir_len) != '/') goto err;
987
988   return 1;
989
990 err:
991   if (sock != NULL && sock->fd >= 0)
992     send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
993
994   return 0;
995 } /* }}} static int check_file_access */
996
997 /* when using a base dir, convert relative paths to absolute paths.
998  * if necessary, modifies the "filename" pointer to point
999  * to the new path created in "tmp".  "tmp" is provided
1000  * by the caller and sizeof(tmp) must be >= PATH_MAX.
1001  *
1002  * this allows us to optimize for the expected case (absolute path)
1003  * with a no-op.
1004  */
1005 static void get_abs_path(char **filename, char *tmp)
1006 {
1007   assert(tmp != NULL);
1008   assert(filename != NULL && *filename != NULL);
1009
1010   if (config_base_dir == NULL || **filename == '/')
1011     return;
1012
1013   snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1014   *filename = tmp;
1015 } /* }}} static int get_abs_path */
1016
1017 /* returns 1 if we have the required privilege level,
1018  * otherwise issue an error to the user on sock */
1019 static int has_privilege (listen_socket_t *sock, /* {{{ */
1020                           socket_privilege priv)
1021 {
1022   if (sock == NULL) /* journal replay */
1023     return 1;
1024
1025   if (sock->privilege >= priv)
1026     return 1;
1027
1028   return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1029 } /* }}} static int has_privilege */
1030
1031 static int flush_file (const char *filename) /* {{{ */
1032 {
1033   cache_item_t *ci;
1034
1035   pthread_mutex_lock (&cache_lock);
1036
1037   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1038   if (ci == NULL)
1039   {
1040     pthread_mutex_unlock (&cache_lock);
1041     return (ENOENT);
1042   }
1043
1044   if (ci->values_num > 0)
1045   {
1046     /* Enqueue at head */
1047     enqueue_cache_item (ci, HEAD);
1048     pthread_cond_wait(&ci->flushed, &cache_lock);
1049   }
1050
1051   /* DO NOT DO ANYTHING WITH ci HERE!!  The entry
1052    * may have been purged during our cond_wait() */
1053
1054   pthread_mutex_unlock(&cache_lock);
1055
1056   return (0);
1057 } /* }}} int flush_file */
1058
1059 static int handle_request_help (listen_socket_t *sock, /* {{{ */
1060     char *buffer, size_t buffer_size)
1061 {
1062   int status;
1063   char **help_text;
1064   char *command;
1065
1066   char *help_help[2] =
1067   {
1068     "Command overview\n"
1069     ,
1070     "HELP [<command>]\n"
1071     "FLUSH <filename>\n"
1072     "FLUSHALL\n"
1073     "PENDING <filename>\n"
1074     "FORGET <filename>\n"
1075     "UPDATE <filename> <values> [<values> ...]\n"
1076     "BATCH\n"
1077     "STATS\n"
1078   };
1079
1080   char *help_flush[2] =
1081   {
1082     "Help for FLUSH\n"
1083     ,
1084     "Usage: FLUSH <filename>\n"
1085     "\n"
1086     "Adds the given filename to the head of the update queue and returns\n"
1087     "after is has been dequeued.\n"
1088   };
1089
1090   char *help_flushall[2] =
1091   {
1092     "Help for FLUSHALL\n"
1093     ,
1094     "Usage: FLUSHALL\n"
1095     "\n"
1096     "Triggers writing of all pending updates.  Returns immediately.\n"
1097   };
1098
1099   char *help_pending[2] =
1100   {
1101     "Help for PENDING\n"
1102     ,
1103     "Usage: PENDING <filename>\n"
1104     "\n"
1105     "Shows any 'pending' updates for a file, in order.\n"
1106     "The updates shown have not yet been written to the underlying RRD file.\n"
1107   };
1108
1109   char *help_forget[2] =
1110   {
1111     "Help for FORGET\n"
1112     ,
1113     "Usage: FORGET <filename>\n"
1114     "\n"
1115     "Removes the file completely from the cache.\n"
1116     "Any pending updates for the file will be lost.\n"
1117   };
1118
1119   char *help_update[2] =
1120   {
1121     "Help for UPDATE\n"
1122     ,
1123     "Usage: UPDATE <filename> <values> [<values> ...]\n"
1124     "\n"
1125     "Adds the given file to the internal cache if it is not yet known and\n"
1126     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1127     "for details.\n"
1128     "\n"
1129     "Each <values> has the following form:\n"
1130     "  <values> = <time>:<value>[:<value>[...]]\n"
1131     "See the rrdupdate(1) manpage for details.\n"
1132   };
1133
1134   char *help_stats[2] =
1135   {
1136     "Help for STATS\n"
1137     ,
1138     "Usage: STATS\n"
1139     "\n"
1140     "Returns some performance counters, see the rrdcached(1) manpage for\n"
1141     "a description of the values.\n"
1142   };
1143
1144   char *help_batch[2] =
1145   {
1146     "Help for BATCH\n"
1147     ,
1148     "The 'BATCH' command permits the client to initiate a bulk load\n"
1149     "   of commands to rrdcached.\n"
1150     "\n"
1151     "Usage:\n"
1152     "\n"
1153     "    client: BATCH\n"
1154     "    server: 0 Go ahead.  End with dot '.' on its own line.\n"
1155     "    client: command #1\n"
1156     "    client: command #2\n"
1157     "    client: ... and so on\n"
1158     "    client: .\n"
1159     "    server: 2 errors\n"
1160     "    server: 7 message for command #7\n"
1161     "    server: 9 message for command #9\n"
1162     "\n"
1163     "For more information, consult the rrdcached(1) documentation.\n"
1164   };
1165
1166   status = buffer_get_field (&buffer, &buffer_size, &command);
1167   if (status != 0)
1168     help_text = help_help;
1169   else
1170   {
1171     if (strcasecmp (command, "update") == 0)
1172       help_text = help_update;
1173     else if (strcasecmp (command, "flush") == 0)
1174       help_text = help_flush;
1175     else if (strcasecmp (command, "flushall") == 0)
1176       help_text = help_flushall;
1177     else if (strcasecmp (command, "pending") == 0)
1178       help_text = help_pending;
1179     else if (strcasecmp (command, "forget") == 0)
1180       help_text = help_forget;
1181     else if (strcasecmp (command, "stats") == 0)
1182       help_text = help_stats;
1183     else if (strcasecmp (command, "batch") == 0)
1184       help_text = help_batch;
1185     else
1186       help_text = help_help;
1187   }
1188
1189   add_response_info(sock, help_text[1]);
1190   return send_response(sock, RESP_OK, help_text[0]);
1191 } /* }}} int handle_request_help */
1192
1193 static int handle_request_stats (listen_socket_t *sock) /* {{{ */
1194 {
1195   uint64_t copy_queue_length;
1196   uint64_t copy_updates_received;
1197   uint64_t copy_flush_received;
1198   uint64_t copy_updates_written;
1199   uint64_t copy_data_sets_written;
1200   uint64_t copy_journal_bytes;
1201   uint64_t copy_journal_rotate;
1202
1203   uint64_t tree_nodes_number;
1204   uint64_t tree_depth;
1205
1206   pthread_mutex_lock (&stats_lock);
1207   copy_queue_length       = stats_queue_length;
1208   copy_updates_received   = stats_updates_received;
1209   copy_flush_received     = stats_flush_received;
1210   copy_updates_written    = stats_updates_written;
1211   copy_data_sets_written  = stats_data_sets_written;
1212   copy_journal_bytes      = stats_journal_bytes;
1213   copy_journal_rotate     = stats_journal_rotate;
1214   pthread_mutex_unlock (&stats_lock);
1215
1216   pthread_mutex_lock (&cache_lock);
1217   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1218   tree_depth        = (uint64_t) g_tree_height (cache_tree);
1219   pthread_mutex_unlock (&cache_lock);
1220
1221   add_response_info(sock,
1222                     "QueueLength: %"PRIu64"\n", copy_queue_length);
1223   add_response_info(sock,
1224                     "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1225   add_response_info(sock,
1226                     "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1227   add_response_info(sock,
1228                     "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1229   add_response_info(sock,
1230                     "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1231   add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1232   add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1233   add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1234   add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1235
1236   send_response(sock, RESP_OK, "Statistics follow\n");
1237
1238   return (0);
1239 } /* }}} int handle_request_stats */
1240
1241 static int handle_request_flush (listen_socket_t *sock, /* {{{ */
1242     char *buffer, size_t buffer_size)
1243 {
1244   char *file, file_tmp[PATH_MAX];
1245   int status;
1246
1247   status = buffer_get_field (&buffer, &buffer_size, &file);
1248   if (status != 0)
1249   {
1250     return send_response(sock, RESP_ERR, "Usage: flush <filename>\n");
1251   }
1252   else
1253   {
1254     pthread_mutex_lock(&stats_lock);
1255     stats_flush_received++;
1256     pthread_mutex_unlock(&stats_lock);
1257
1258     get_abs_path(&file, file_tmp);
1259     if (!check_file_access(file, sock)) return 0;
1260
1261     status = flush_file (file);
1262     if (status == 0)
1263       return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1264     else if (status == ENOENT)
1265     {
1266       /* no file in our tree; see whether it exists at all */
1267       struct stat statbuf;
1268
1269       memset(&statbuf, 0, sizeof(statbuf));
1270       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1271         return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1272       else
1273         return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1274     }
1275     else if (status < 0)
1276       return send_response(sock, RESP_ERR, "Internal error.\n");
1277     else
1278       return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1279   }
1280
1281   /* NOTREACHED */
1282   assert(1==0);
1283 } /* }}} int handle_request_flush */
1284
1285 static int handle_request_flushall(listen_socket_t *sock) /* {{{ */
1286 {
1287   int status;
1288
1289   status = has_privilege(sock, PRIV_HIGH);
1290   if (status <= 0)
1291     return status;
1292
1293   RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1294
1295   pthread_mutex_lock(&cache_lock);
1296   flush_old_values(-1);
1297   pthread_mutex_unlock(&cache_lock);
1298
1299   return send_response(sock, RESP_OK, "Started flush.\n");
1300 } /* }}} static int handle_request_flushall */
1301
1302 static int handle_request_pending(listen_socket_t *sock, /* {{{ */
1303                                   char *buffer, size_t buffer_size)
1304 {
1305   int status;
1306   char *file, file_tmp[PATH_MAX];
1307   cache_item_t *ci;
1308
1309   status = buffer_get_field(&buffer, &buffer_size, &file);
1310   if (status != 0)
1311     return send_response(sock, RESP_ERR,
1312                          "Usage: PENDING <filename>\n");
1313
1314   status = has_privilege(sock, PRIV_HIGH);
1315   if (status <= 0)
1316     return status;
1317
1318   get_abs_path(&file, file_tmp);
1319
1320   pthread_mutex_lock(&cache_lock);
1321   ci = g_tree_lookup(cache_tree, file);
1322   if (ci == NULL)
1323   {
1324     pthread_mutex_unlock(&cache_lock);
1325     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1326   }
1327
1328   for (int i=0; i < ci->values_num; i++)
1329     add_response_info(sock, "%s\n", ci->values[i]);
1330
1331   pthread_mutex_unlock(&cache_lock);
1332   return send_response(sock, RESP_OK, "updates pending\n");
1333 } /* }}} static int handle_request_pending */
1334
1335 static int handle_request_forget(listen_socket_t *sock, /* {{{ */
1336                                  char *buffer, size_t buffer_size)
1337 {
1338   int status;
1339   char *file, file_tmp[PATH_MAX];
1340
1341   status = buffer_get_field(&buffer, &buffer_size, &file);
1342   if (status != 0)
1343     return send_response(sock, RESP_ERR,
1344                          "Usage: FORGET <filename>\n");
1345
1346   status = has_privilege(sock, PRIV_HIGH);
1347   if (status <= 0)
1348     return status;
1349
1350   get_abs_path(&file, file_tmp);
1351   if (!check_file_access(file, sock)) return 0;
1352
1353   pthread_mutex_lock(&cache_lock);
1354   status = forget_file(file);
1355   pthread_mutex_unlock(&cache_lock);
1356
1357   if (status == 0)
1358   {
1359     if (sock != NULL)
1360       journal_write("forget", file);
1361
1362     return send_response(sock, RESP_OK, "Gone!\n");
1363   }
1364   else
1365     return send_response(sock, RESP_ERR, "cannot forget: %s\n",
1366                          status < 0 ? "Internal error" : rrd_strerror(status));
1367
1368   /* NOTREACHED */
1369   assert(1==0);
1370 } /* }}} static int handle_request_forget */
1371
1372 static int handle_request_update (listen_socket_t *sock, /* {{{ */
1373                                   time_t now,
1374                                   char *buffer, size_t buffer_size)
1375 {
1376   char *file, file_tmp[PATH_MAX];
1377   int values_num = 0;
1378   int bad_timestamps = 0;
1379   int status;
1380   char orig_buf[CMD_MAX];
1381
1382   cache_item_t *ci;
1383
1384   status = has_privilege(sock, PRIV_HIGH);
1385   if (status <= 0)
1386     return status;
1387
1388   /* save it for the journal later */
1389   strncpy(orig_buf, buffer, sizeof(orig_buf)-1);
1390
1391   status = buffer_get_field (&buffer, &buffer_size, &file);
1392   if (status != 0)
1393     return send_response(sock, RESP_ERR,
1394                          "Usage: UPDATE <filename> <values> [<values> ...]\n");
1395
1396   pthread_mutex_lock(&stats_lock);
1397   stats_updates_received++;
1398   pthread_mutex_unlock(&stats_lock);
1399
1400   get_abs_path(&file, file_tmp);
1401   if (!check_file_access(file, sock)) return 0;
1402
1403   pthread_mutex_lock (&cache_lock);
1404   ci = g_tree_lookup (cache_tree, file);
1405
1406   if (ci == NULL) /* {{{ */
1407   {
1408     struct stat statbuf;
1409
1410     /* don't hold the lock while we setup; stat(2) might block */
1411     pthread_mutex_unlock(&cache_lock);
1412
1413     memset (&statbuf, 0, sizeof (statbuf));
1414     status = stat (file, &statbuf);
1415     if (status != 0)
1416     {
1417       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1418
1419       status = errno;
1420       if (status == ENOENT)
1421         return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1422       else
1423         return send_response(sock, RESP_ERR,
1424                              "stat failed with error %i.\n", status);
1425     }
1426     if (!S_ISREG (statbuf.st_mode))
1427       return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1428
1429     if (access(file, R_OK|W_OK) != 0)
1430       return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1431                            file, rrd_strerror(errno));
1432
1433     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1434     if (ci == NULL)
1435     {
1436       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1437
1438       return send_response(sock, RESP_ERR, "malloc failed.\n");
1439     }
1440     memset (ci, 0, sizeof (cache_item_t));
1441
1442     ci->file = strdup (file);
1443     if (ci->file == NULL)
1444     {
1445       free (ci);
1446       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1447
1448       return send_response(sock, RESP_ERR, "strdup failed.\n");
1449     }
1450
1451     wipe_ci_values(ci, now);
1452     ci->flags = CI_FLAGS_IN_TREE;
1453     pthread_cond_init(&ci->flushed, NULL);
1454
1455     pthread_mutex_lock(&cache_lock);
1456     g_tree_insert (cache_tree, (void *) ci->file, (void *) ci);
1457   } /* }}} */
1458   assert (ci != NULL);
1459
1460   /* don't re-write updates in replay mode */
1461   if (sock != NULL)
1462     journal_write("update", orig_buf);
1463
1464   while (buffer_size > 0)
1465   {
1466     char **temp;
1467     char *value;
1468     time_t stamp;
1469     char *eostamp;
1470
1471     status = buffer_get_field (&buffer, &buffer_size, &value);
1472     if (status != 0)
1473     {
1474       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1475       break;
1476     }
1477
1478     /* make sure update time is always moving forward */
1479     stamp = strtol(value, &eostamp, 10);
1480     if (eostamp == value || eostamp == NULL || *eostamp != ':')
1481     {
1482       ++bad_timestamps;
1483       add_response_info(sock, "Cannot find timestamp in '%s'!\n", value);
1484       continue;
1485     }
1486     else if (stamp <= ci->last_update_stamp)
1487     {
1488       ++bad_timestamps;
1489       add_response_info(sock,
1490                         "illegal attempt to update using time %ld when"
1491                         " last update time is %ld (minimum one second step)\n",
1492                         stamp, ci->last_update_stamp);
1493       continue;
1494     }
1495     else
1496       ci->last_update_stamp = stamp;
1497
1498     temp = (char **) realloc (ci->values,
1499         sizeof (char *) * (ci->values_num + 1));
1500     if (temp == NULL)
1501     {
1502       RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
1503       continue;
1504     }
1505     ci->values = temp;
1506
1507     ci->values[ci->values_num] = strdup (value);
1508     if (ci->values[ci->values_num] == NULL)
1509     {
1510       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1511       continue;
1512     }
1513     ci->values_num++;
1514
1515     values_num++;
1516   }
1517
1518   if (((now - ci->last_flush_time) >= config_write_interval)
1519       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1520       && (ci->values_num > 0))
1521   {
1522     enqueue_cache_item (ci, TAIL);
1523   }
1524
1525   pthread_mutex_unlock (&cache_lock);
1526
1527   if (values_num < 1)
1528   {
1529     /* journal replay mode */
1530     if (sock == NULL) return RESP_ERR;
1531
1532     /* if we had only one update attempt, then return the full
1533        error message... try to get the most information out
1534        of the limited error space allowed by the protocol
1535     */
1536     if (bad_timestamps == 1)
1537       return send_response(sock, RESP_ERR, "%s", sock->wbuf);
1538     else
1539       return send_response(sock, RESP_ERR,
1540                            "No values updated (%d bad timestamps).\n",
1541                            bad_timestamps);
1542   }
1543   else
1544     return send_response(sock, RESP_OK,
1545                          "errors, enqueued %i value(s).\n", values_num);
1546
1547   /* NOTREACHED */
1548   assert(1==0);
1549
1550 } /* }}} int handle_request_update */
1551
1552 /* we came across a "WROTE" entry during journal replay.
1553  * throw away any values that we have accumulated for this file
1554  */
1555 static int handle_request_wrote (const char *buffer, time_t now) /* {{{ */
1556 {
1557   int i;
1558   cache_item_t *ci;
1559   const char *file = buffer;
1560
1561   pthread_mutex_lock(&cache_lock);
1562
1563   ci = g_tree_lookup(cache_tree, file);
1564   if (ci == NULL)
1565   {
1566     pthread_mutex_unlock(&cache_lock);
1567     return (0);
1568   }
1569
1570   if (ci->values)
1571   {
1572     for (i=0; i < ci->values_num; i++)
1573       free(ci->values[i]);
1574
1575     free(ci->values);
1576   }
1577
1578   wipe_ci_values(ci, now);
1579   remove_from_queue(ci);
1580
1581   pthread_mutex_unlock(&cache_lock);
1582   return (0);
1583 } /* }}} int handle_request_wrote */
1584
1585 /* start "BATCH" processing */
1586 static int batch_start (listen_socket_t *sock) /* {{{ */
1587 {
1588   int status;
1589   if (sock->batch_start)
1590     return send_response(sock, RESP_ERR, "Already in BATCH\n");
1591
1592   status = send_response(sock, RESP_OK,
1593                          "Go ahead.  End with dot '.' on its own line.\n");
1594   sock->batch_start = time(NULL);
1595   sock->batch_cmd = 0;
1596
1597   return status;
1598 } /* }}} static int batch_start */
1599
1600 /* finish "BATCH" processing and return results to the client */
1601 static int batch_done (listen_socket_t *sock) /* {{{ */
1602 {
1603   assert(sock->batch_start);
1604   sock->batch_start = 0;
1605   sock->batch_cmd  = 0;
1606   return send_response(sock, RESP_OK, "errors\n");
1607 } /* }}} static int batch_done */
1608
1609 /* if sock==NULL, we are in journal replay mode */
1610 static int handle_request (listen_socket_t *sock, /* {{{ */
1611                            time_t now,
1612                            char *buffer, size_t buffer_size)
1613 {
1614   char *buffer_ptr;
1615   char *command;
1616   int status;
1617
1618   assert (buffer[buffer_size - 1] == '\0');
1619
1620   buffer_ptr = buffer;
1621   command = NULL;
1622   status = buffer_get_field (&buffer_ptr, &buffer_size, &command);
1623   if (status != 0)
1624   {
1625     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1626     return (-1);
1627   }
1628
1629   if (sock != NULL && sock->batch_start)
1630     sock->batch_cmd++;
1631
1632   if (strcasecmp (command, "update") == 0)
1633     return (handle_request_update (sock, now, buffer_ptr, buffer_size));
1634   else if (strcasecmp (command, "wrote") == 0 && sock == NULL)
1635   {
1636     /* this is only valid in replay mode */
1637     return (handle_request_wrote (buffer_ptr, now));
1638   }
1639   else if (strcasecmp (command, "flush") == 0)
1640     return (handle_request_flush (sock, buffer_ptr, buffer_size));
1641   else if (strcasecmp (command, "flushall") == 0)
1642     return (handle_request_flushall(sock));
1643   else if (strcasecmp (command, "pending") == 0)
1644     return (handle_request_pending(sock, buffer_ptr, buffer_size));
1645   else if (strcasecmp (command, "forget") == 0)
1646     return (handle_request_forget(sock, buffer_ptr, buffer_size));
1647   else if (strcasecmp (command, "stats") == 0)
1648     return (handle_request_stats (sock));
1649   else if (strcasecmp (command, "help") == 0)
1650     return (handle_request_help (sock, buffer_ptr, buffer_size));
1651   else if (strcasecmp (command, "batch") == 0 && sock != NULL)
1652     return batch_start(sock);
1653   else if (strcasecmp (command, ".") == 0 && sock != NULL && sock->batch_start)
1654     return batch_done(sock);
1655   else
1656     return send_response(sock, RESP_ERR, "Unknown command: %s\n", command);
1657
1658   /* NOTREACHED */
1659   assert(1==0);
1660 } /* }}} int handle_request */
1661
1662 /* MUST NOT hold journal_lock before calling this */
1663 static void journal_rotate(void) /* {{{ */
1664 {
1665   FILE *old_fh = NULL;
1666   int new_fd;
1667
1668   if (journal_cur == NULL || journal_old == NULL)
1669     return;
1670
1671   pthread_mutex_lock(&journal_lock);
1672
1673   /* we rotate this way (rename before close) so that the we can release
1674    * the journal lock as fast as possible.  Journal writes to the new
1675    * journal can proceed immediately after the new file is opened.  The
1676    * fclose can then block without affecting new updates.
1677    */
1678   if (journal_fh != NULL)
1679   {
1680     old_fh = journal_fh;
1681     journal_fh = NULL;
1682     rename(journal_cur, journal_old);
1683     ++stats_journal_rotate;
1684   }
1685
1686   new_fd = open(journal_cur, O_WRONLY|O_CREAT|O_APPEND,
1687                 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1688   if (new_fd >= 0)
1689   {
1690     journal_fh = fdopen(new_fd, "a");
1691     if (journal_fh == NULL)
1692       close(new_fd);
1693   }
1694
1695   pthread_mutex_unlock(&journal_lock);
1696
1697   if (old_fh != NULL)
1698     fclose(old_fh);
1699
1700   if (journal_fh == NULL)
1701   {
1702     RRDD_LOG(LOG_CRIT,
1703              "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1704              journal_cur, rrd_strerror(errno));
1705
1706     RRDD_LOG(LOG_ERR,
1707              "JOURNALING DISABLED: All values will be flushed at shutdown");
1708     config_flush_at_shutdown = 1;
1709   }
1710
1711 } /* }}} static void journal_rotate */
1712
1713 static void journal_done(void) /* {{{ */
1714 {
1715   if (journal_cur == NULL)
1716     return;
1717
1718   pthread_mutex_lock(&journal_lock);
1719   if (journal_fh != NULL)
1720   {
1721     fclose(journal_fh);
1722     journal_fh = NULL;
1723   }
1724
1725   if (config_flush_at_shutdown)
1726   {
1727     RRDD_LOG(LOG_INFO, "removing journals");
1728     unlink(journal_old);
1729     unlink(journal_cur);
1730   }
1731   else
1732   {
1733     RRDD_LOG(LOG_INFO, "expedited shutdown; "
1734              "journals will be used at next startup");
1735   }
1736
1737   pthread_mutex_unlock(&journal_lock);
1738
1739 } /* }}} static void journal_done */
1740
1741 static int journal_write(char *cmd, char *args) /* {{{ */
1742 {
1743   int chars;
1744
1745   if (journal_fh == NULL)
1746     return 0;
1747
1748   pthread_mutex_lock(&journal_lock);
1749   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1750   pthread_mutex_unlock(&journal_lock);
1751
1752   if (chars > 0)
1753   {
1754     pthread_mutex_lock(&stats_lock);
1755     stats_journal_bytes += chars;
1756     pthread_mutex_unlock(&stats_lock);
1757   }
1758
1759   return chars;
1760 } /* }}} static int journal_write */
1761
1762 static int journal_replay (const char *file) /* {{{ */
1763 {
1764   FILE *fh;
1765   int entry_cnt = 0;
1766   int fail_cnt = 0;
1767   uint64_t line = 0;
1768   char entry[CMD_MAX];
1769   time_t now;
1770
1771   if (file == NULL) return 0;
1772
1773   {
1774     char *reason;
1775     int status = 0;
1776     struct stat statbuf;
1777
1778     memset(&statbuf, 0, sizeof(statbuf));
1779     if (stat(file, &statbuf) != 0)
1780     {
1781       if (errno == ENOENT)
1782         return 0;
1783
1784       reason = "stat error";
1785       status = errno;
1786     }
1787     else if (!S_ISREG(statbuf.st_mode))
1788     {
1789       reason = "not a regular file";
1790       status = EPERM;
1791     }
1792     if (statbuf.st_uid != daemon_uid)
1793     {
1794       reason = "not owned by daemon user";
1795       status = EACCES;
1796     }
1797     if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
1798     {
1799       reason = "must not be user/group writable";
1800       status = EACCES;
1801     }
1802
1803     if (status != 0)
1804     {
1805       RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
1806                file, rrd_strerror(status), reason);
1807       return 0;
1808     }
1809   }
1810
1811   fh = fopen(file, "r");
1812   if (fh == NULL)
1813   {
1814     if (errno != ENOENT)
1815       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1816                file, rrd_strerror(errno));
1817     return 0;
1818   }
1819   else
1820     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1821
1822   now = time(NULL);
1823
1824   while(!feof(fh))
1825   {
1826     size_t entry_len;
1827
1828     ++line;
1829     if (fgets(entry, sizeof(entry), fh) == NULL)
1830       break;
1831     entry_len = strlen(entry);
1832
1833     /* check \n termination in case journal writing crashed mid-line */
1834     if (entry_len == 0)
1835       continue;
1836     else if (entry[entry_len - 1] != '\n')
1837     {
1838       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1839       ++fail_cnt;
1840       continue;
1841     }
1842
1843     entry[entry_len - 1] = '\0';
1844
1845     if (handle_request(NULL, now, entry, entry_len) == 0)
1846       ++entry_cnt;
1847     else
1848       ++fail_cnt;
1849   }
1850
1851   fclose(fh);
1852
1853   RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1854            entry_cnt, fail_cnt);
1855
1856   return entry_cnt > 0 ? 1 : 0;
1857 } /* }}} static int journal_replay */
1858
1859 static void journal_init(void) /* {{{ */
1860 {
1861   int had_journal = 0;
1862
1863   if (journal_cur == NULL) return;
1864
1865   pthread_mutex_lock(&journal_lock);
1866
1867   RRDD_LOG(LOG_INFO, "checking for journal files");
1868
1869   had_journal += journal_replay(journal_old);
1870   had_journal += journal_replay(journal_cur);
1871
1872   /* it must have been a crash.  start a flush */
1873   if (had_journal && config_flush_at_shutdown)
1874     flush_old_values(-1);
1875
1876   pthread_mutex_unlock(&journal_lock);
1877   journal_rotate();
1878
1879   RRDD_LOG(LOG_INFO, "journal processing complete");
1880
1881 } /* }}} static void journal_init */
1882
1883 static void close_connection(listen_socket_t *sock)
1884 {
1885   close(sock->fd) ;  sock->fd   = -1;
1886   free(sock->rbuf);  sock->rbuf = NULL;
1887   free(sock->wbuf);  sock->wbuf = NULL;
1888
1889   free(sock);
1890 }
1891
1892 static void *connection_thread_main (void *args) /* {{{ */
1893 {
1894   pthread_t self;
1895   listen_socket_t *sock;
1896   int i;
1897   int fd;
1898
1899   sock = (listen_socket_t *) args;
1900   fd = sock->fd;
1901
1902   /* init read buffers */
1903   sock->next_read = sock->next_cmd = 0;
1904   sock->rbuf = malloc(RBUF_SIZE);
1905   if (sock->rbuf == NULL)
1906   {
1907     RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
1908     close_connection(sock);
1909     return NULL;
1910   }
1911
1912   pthread_mutex_lock (&connection_threads_lock);
1913   {
1914     pthread_t *temp;
1915
1916     temp = (pthread_t *) realloc (connection_threads,
1917         sizeof (pthread_t) * (connection_threads_num + 1));
1918     if (temp == NULL)
1919     {
1920       RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed.");
1921     }
1922     else
1923     {
1924       connection_threads = temp;
1925       connection_threads[connection_threads_num] = pthread_self ();
1926       connection_threads_num++;
1927     }
1928   }
1929   pthread_mutex_unlock (&connection_threads_lock);
1930
1931   while (do_shutdown == 0)
1932   {
1933     char *cmd;
1934     ssize_t cmd_len;
1935     ssize_t rbytes;
1936     time_t now;
1937
1938     struct pollfd pollfd;
1939     int status;
1940
1941     pollfd.fd = fd;
1942     pollfd.events = POLLIN | POLLPRI;
1943     pollfd.revents = 0;
1944
1945     status = poll (&pollfd, 1, /* timeout = */ 500);
1946     if (do_shutdown)
1947       break;
1948     else if (status == 0) /* timeout */
1949       continue;
1950     else if (status < 0) /* error */
1951     {
1952       status = errno;
1953       if (status != EINTR)
1954         RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
1955       continue;
1956     }
1957
1958     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
1959       break;
1960     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
1961     {
1962       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
1963           "poll(2) returned something unexpected: %#04hx",
1964           pollfd.revents);
1965       break;
1966     }
1967
1968     rbytes = read(fd, sock->rbuf + sock->next_read,
1969                   RBUF_SIZE - sock->next_read);
1970     if (rbytes < 0)
1971     {
1972       RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
1973       break;
1974     }
1975     else if (rbytes == 0)
1976       break; /* eof */
1977
1978     sock->next_read += rbytes;
1979
1980     if (sock->batch_start)
1981       now = sock->batch_start;
1982     else
1983       now = time(NULL);
1984
1985     while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
1986     {
1987       status = handle_request (sock, now, cmd, cmd_len+1);
1988       if (status != 0)
1989         goto out_close;
1990     }
1991   }
1992
1993 out_close:
1994   close_connection(sock);
1995
1996   self = pthread_self ();
1997   /* Remove this thread from the connection threads list */
1998   pthread_mutex_lock (&connection_threads_lock);
1999   /* Find out own index in the array */
2000   for (i = 0; i < connection_threads_num; i++)
2001     if (pthread_equal (connection_threads[i], self) != 0)
2002       break;
2003   assert (i < connection_threads_num);
2004
2005   /* Move the trailing threads forward. */
2006   if (i < (connection_threads_num - 1))
2007   {
2008     memmove (connection_threads + i,
2009         connection_threads + i + 1,
2010         sizeof (pthread_t) * (connection_threads_num - i - 1));
2011   }
2012
2013   connection_threads_num--;
2014   pthread_mutex_unlock (&connection_threads_lock);
2015
2016   return (NULL);
2017 } /* }}} void *connection_thread_main */
2018
2019 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2020 {
2021   int fd;
2022   struct sockaddr_un sa;
2023   listen_socket_t *temp;
2024   int status;
2025   const char *path;
2026
2027   path = sock->addr;
2028   if (strncmp(path, "unix:", strlen("unix:")) == 0)
2029     path += strlen("unix:");
2030
2031   temp = (listen_socket_t *) realloc (listen_fds,
2032       sizeof (listen_fds[0]) * (listen_fds_num + 1));
2033   if (temp == NULL)
2034   {
2035     fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2036     return (-1);
2037   }
2038   listen_fds = temp;
2039   memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2040
2041   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2042   if (fd < 0)
2043   {
2044     fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2045              rrd_strerror(errno));
2046     return (-1);
2047   }
2048
2049   memset (&sa, 0, sizeof (sa));
2050   sa.sun_family = AF_UNIX;
2051   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2052
2053   /* if we've gotten this far, we own the pid file.  any daemon started
2054    * with the same args must not be alive.  therefore, ensure that we can
2055    * create the socket...
2056    */
2057   unlink(path);
2058
2059   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2060   if (status != 0)
2061   {
2062     fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2063              path, rrd_strerror(errno));
2064     close (fd);
2065     return (-1);
2066   }
2067
2068   status = listen (fd, /* backlog = */ 10);
2069   if (status != 0)
2070   {
2071     fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2072              path, rrd_strerror(errno));
2073     close (fd);
2074     unlink (path);
2075     return (-1);
2076   }
2077
2078   listen_fds[listen_fds_num].fd = fd;
2079   listen_fds[listen_fds_num].family = PF_UNIX;
2080   strncpy(listen_fds[listen_fds_num].addr, path,
2081           sizeof (listen_fds[listen_fds_num].addr) - 1);
2082   listen_fds_num++;
2083
2084   return (0);
2085 } /* }}} int open_listen_socket_unix */
2086
2087 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2088 {
2089   struct addrinfo ai_hints;
2090   struct addrinfo *ai_res;
2091   struct addrinfo *ai_ptr;
2092   char addr_copy[NI_MAXHOST];
2093   char *addr;
2094   char *port;
2095   int status;
2096
2097   strncpy (addr_copy, sock->addr, sizeof (addr_copy));
2098   addr_copy[sizeof (addr_copy) - 1] = 0;
2099   addr = addr_copy;
2100
2101   memset (&ai_hints, 0, sizeof (ai_hints));
2102   ai_hints.ai_flags = 0;
2103 #ifdef AI_ADDRCONFIG
2104   ai_hints.ai_flags |= AI_ADDRCONFIG;
2105 #endif
2106   ai_hints.ai_family = AF_UNSPEC;
2107   ai_hints.ai_socktype = SOCK_STREAM;
2108
2109   port = NULL;
2110   if (*addr == '[') /* IPv6+port format */
2111   {
2112     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2113     addr++;
2114
2115     port = strchr (addr, ']');
2116     if (port == NULL)
2117     {
2118       fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
2119       return (-1);
2120     }
2121     *port = 0;
2122     port++;
2123
2124     if (*port == ':')
2125       port++;
2126     else if (*port == 0)
2127       port = NULL;
2128     else
2129     {
2130       fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
2131       return (-1);
2132     }
2133   } /* if (*addr = ']') */
2134   else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */
2135   {
2136     port = rindex(addr, ':');
2137     if (port != NULL)
2138     {
2139       *port = 0;
2140       port++;
2141     }
2142   }
2143   ai_res = NULL;
2144   status = getaddrinfo (addr,
2145                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2146                         &ai_hints, &ai_res);
2147   if (status != 0)
2148   {
2149     fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
2150              addr, gai_strerror (status));
2151     return (-1);
2152   }
2153
2154   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2155   {
2156     int fd;
2157     listen_socket_t *temp;
2158     int one = 1;
2159
2160     temp = (listen_socket_t *) realloc (listen_fds,
2161         sizeof (listen_fds[0]) * (listen_fds_num + 1));
2162     if (temp == NULL)
2163     {
2164       fprintf (stderr,
2165                "rrdcached: open_listen_socket_network: realloc failed.\n");
2166       continue;
2167     }
2168     listen_fds = temp;
2169     memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2170
2171     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2172     if (fd < 0)
2173     {
2174       fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
2175                rrd_strerror(errno));
2176       continue;
2177     }
2178
2179     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2180
2181     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2182     if (status != 0)
2183     {
2184       fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2185                sock->addr, rrd_strerror(errno));
2186       close (fd);
2187       continue;
2188     }
2189
2190     status = listen (fd, /* backlog = */ 10);
2191     if (status != 0)
2192     {
2193       fprintf (stderr, "rrdcached: listen(%s) failed: %s\n.",
2194                sock->addr, rrd_strerror(errno));
2195       close (fd);
2196       return (-1);
2197     }
2198
2199     listen_fds[listen_fds_num].fd = fd;
2200     listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2201     listen_fds_num++;
2202   } /* for (ai_ptr) */
2203
2204   return (0);
2205 } /* }}} static int open_listen_socket_network */
2206
2207 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2208 {
2209   assert(sock != NULL);
2210   assert(sock->addr != NULL);
2211
2212   if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2213       || sock->addr[0] == '/')
2214     return (open_listen_socket_unix(sock));
2215   else
2216     return (open_listen_socket_network(sock));
2217 } /* }}} int open_listen_socket */
2218
2219 static int close_listen_sockets (void) /* {{{ */
2220 {
2221   size_t i;
2222
2223   for (i = 0; i < listen_fds_num; i++)
2224   {
2225     close (listen_fds[i].fd);
2226
2227     if (listen_fds[i].family == PF_UNIX)
2228       unlink(listen_fds[i].addr);
2229   }
2230
2231   free (listen_fds);
2232   listen_fds = NULL;
2233   listen_fds_num = 0;
2234
2235   return (0);
2236 } /* }}} int close_listen_sockets */
2237
2238 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
2239 {
2240   struct pollfd *pollfds;
2241   int pollfds_num;
2242   int status;
2243   int i;
2244
2245   if (listen_fds_num < 1)
2246   {
2247     RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2248     return (NULL);
2249   }
2250
2251   pollfds_num = listen_fds_num;
2252   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2253   if (pollfds == NULL)
2254   {
2255     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2256     return (NULL);
2257   }
2258   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2259
2260   RRDD_LOG(LOG_INFO, "listening for connections");
2261
2262   while (do_shutdown == 0)
2263   {
2264     assert (pollfds_num == ((int) listen_fds_num));
2265     for (i = 0; i < pollfds_num; i++)
2266     {
2267       pollfds[i].fd = listen_fds[i].fd;
2268       pollfds[i].events = POLLIN | POLLPRI;
2269       pollfds[i].revents = 0;
2270     }
2271
2272     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2273     if (do_shutdown)
2274       break;
2275     else if (status == 0) /* timeout */
2276       continue;
2277     else if (status < 0) /* error */
2278     {
2279       status = errno;
2280       if (status != EINTR)
2281       {
2282         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2283       }
2284       continue;
2285     }
2286
2287     for (i = 0; i < pollfds_num; i++)
2288     {
2289       listen_socket_t *client_sock;
2290       struct sockaddr_storage client_sa;
2291       socklen_t client_sa_size;
2292       pthread_t tid;
2293       pthread_attr_t attr;
2294
2295       if (pollfds[i].revents == 0)
2296         continue;
2297
2298       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2299       {
2300         RRDD_LOG (LOG_ERR, "listen_thread_main: "
2301             "poll(2) returned something unexpected for listen FD #%i.",
2302             pollfds[i].fd);
2303         continue;
2304       }
2305
2306       client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2307       if (client_sock == NULL)
2308       {
2309         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2310         continue;
2311       }
2312       memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2313
2314       client_sa_size = sizeof (client_sa);
2315       client_sock->fd = accept (pollfds[i].fd,
2316           (struct sockaddr *) &client_sa, &client_sa_size);
2317       if (client_sock->fd < 0)
2318       {
2319         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2320         free(client_sock);
2321         continue;
2322       }
2323
2324       pthread_attr_init (&attr);
2325       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2326
2327       status = pthread_create (&tid, &attr, connection_thread_main,
2328                                client_sock);
2329       if (status != 0)
2330       {
2331         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2332         close_connection(client_sock);
2333         continue;
2334       }
2335     } /* for (pollfds_num) */
2336   } /* while (do_shutdown == 0) */
2337
2338   RRDD_LOG(LOG_INFO, "starting shutdown");
2339
2340   close_listen_sockets ();
2341
2342   pthread_mutex_lock (&connection_threads_lock);
2343   while (connection_threads_num > 0)
2344   {
2345     pthread_t wait_for;
2346
2347     wait_for = connection_threads[0];
2348
2349     pthread_mutex_unlock (&connection_threads_lock);
2350     pthread_join (wait_for, /* retval = */ NULL);
2351     pthread_mutex_lock (&connection_threads_lock);
2352   }
2353   pthread_mutex_unlock (&connection_threads_lock);
2354
2355   return (NULL);
2356 } /* }}} void *listen_thread_main */
2357
2358 static int daemonize (void) /* {{{ */
2359 {
2360   int pid_fd;
2361   char *base_dir;
2362
2363   daemon_uid = geteuid();
2364
2365   pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
2366   if (pid_fd < 0)
2367     pid_fd = check_pidfile();
2368   if (pid_fd < 0)
2369     return pid_fd;
2370
2371   /* open all the listen sockets */
2372   if (config_listen_address_list_len > 0)
2373   {
2374     for (int i = 0; i < config_listen_address_list_len; i++)
2375       open_listen_socket (config_listen_address_list[i]);
2376   }
2377   else
2378   {
2379     listen_socket_t sock;
2380     memset(&sock, 0, sizeof(sock));
2381     strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr));
2382     open_listen_socket (&sock);
2383   }
2384
2385   if (listen_fds_num < 1)
2386   {
2387     fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
2388     goto error;
2389   }
2390
2391   if (!stay_foreground)
2392   {
2393     pid_t child;
2394
2395     child = fork ();
2396     if (child < 0)
2397     {
2398       fprintf (stderr, "daemonize: fork(2) failed.\n");
2399       goto error;
2400     }
2401     else if (child > 0)
2402       exit(0);
2403
2404     /* Become session leader */
2405     setsid ();
2406
2407     /* Open the first three file descriptors to /dev/null */
2408     close (2);
2409     close (1);
2410     close (0);
2411
2412     open ("/dev/null", O_RDWR);
2413     dup (0);
2414     dup (0);
2415   } /* if (!stay_foreground) */
2416
2417   /* Change into the /tmp directory. */
2418   base_dir = (config_base_dir != NULL)
2419     ? config_base_dir
2420     : "/tmp";
2421
2422   if (chdir (base_dir) != 0)
2423   {
2424     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2425     goto error;
2426   }
2427
2428   install_signal_handlers();
2429
2430   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2431   RRDD_LOG(LOG_INFO, "starting up");
2432
2433   cache_tree = g_tree_new ((GCompareFunc) strcmp);
2434   if (cache_tree == NULL)
2435   {
2436     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2437     goto error;
2438   }
2439
2440   return write_pidfile (pid_fd);
2441
2442 error:
2443   remove_pidfile();
2444   return -1;
2445 } /* }}} int daemonize */
2446
2447 static int cleanup (void) /* {{{ */
2448 {
2449   do_shutdown++;
2450
2451   pthread_cond_signal (&cache_cond);
2452   pthread_join (queue_thread, /* return = */ NULL);
2453
2454   remove_pidfile ();
2455
2456   RRDD_LOG(LOG_INFO, "goodbye");
2457   closelog ();
2458
2459   return (0);
2460 } /* }}} int cleanup */
2461
2462 static int read_options (int argc, char **argv) /* {{{ */
2463 {
2464   int option;
2465   int status = 0;
2466
2467   while ((option = getopt(argc, argv, "gl:L:f:w:b:Bz:p:j:h?F")) != -1)
2468   {
2469     switch (option)
2470     {
2471       case 'g':
2472         stay_foreground=1;
2473         break;
2474
2475       case 'L':
2476       case 'l':
2477       {
2478         listen_socket_t **temp;
2479         listen_socket_t *new;
2480
2481         new = malloc(sizeof(listen_socket_t));
2482         if (new == NULL)
2483         {
2484           fprintf(stderr, "read_options: malloc failed.\n");
2485           return(2);
2486         }
2487         memset(new, 0, sizeof(listen_socket_t));
2488
2489         temp = (listen_socket_t **) realloc (config_listen_address_list,
2490             sizeof (listen_socket_t *) * (config_listen_address_list_len + 1));
2491         if (temp == NULL)
2492         {
2493           fprintf (stderr, "read_options: realloc failed.\n");
2494           return (2);
2495         }
2496         config_listen_address_list = temp;
2497
2498         strncpy(new->addr, optarg, sizeof(new->addr)-1);
2499         new->privilege = (option == 'l') ? PRIV_HIGH : PRIV_LOW;
2500
2501         temp[config_listen_address_list_len] = new;
2502         config_listen_address_list_len++;
2503       }
2504       break;
2505
2506       case 'f':
2507       {
2508         int temp;
2509
2510         temp = atoi (optarg);
2511         if (temp > 0)
2512           config_flush_interval = temp;
2513         else
2514         {
2515           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2516           status = 3;
2517         }
2518       }
2519       break;
2520
2521       case 'w':
2522       {
2523         int temp;
2524
2525         temp = atoi (optarg);
2526         if (temp > 0)
2527           config_write_interval = temp;
2528         else
2529         {
2530           fprintf (stderr, "Invalid write interval: %s\n", optarg);
2531           status = 2;
2532         }
2533       }
2534       break;
2535
2536       case 'z':
2537       {
2538         int temp;
2539
2540         temp = atoi(optarg);
2541         if (temp > 0)
2542           config_write_jitter = temp;
2543         else
2544         {
2545           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2546           status = 2;
2547         }
2548
2549         break;
2550       }
2551
2552       case 'B':
2553         config_write_base_only = 1;
2554         break;
2555
2556       case 'b':
2557       {
2558         size_t len;
2559         char base_realpath[PATH_MAX];
2560
2561         if (config_base_dir != NULL)
2562           free (config_base_dir);
2563         config_base_dir = strdup (optarg);
2564         if (config_base_dir == NULL)
2565         {
2566           fprintf (stderr, "read_options: strdup failed.\n");
2567           return (3);
2568         }
2569
2570         /* make sure that the base directory is not resolved via
2571          * symbolic links.  this makes some performance-enhancing
2572          * assumptions possible (we don't have to resolve paths
2573          * that start with a "/")
2574          */
2575         if (realpath(config_base_dir, base_realpath) == NULL)
2576         {
2577           fprintf (stderr, "Invalid base directory '%s'.\n", config_base_dir);
2578           return 5;
2579         }
2580         else if (strncmp(config_base_dir,
2581                          base_realpath, sizeof(base_realpath)) != 0)
2582         {
2583           fprintf(stderr,
2584                   "Base directory (-b) resolved via file system links!\n"
2585                   "Please consult rrdcached '-b' documentation!\n"
2586                   "Consider specifying the real directory (%s)\n",
2587                   base_realpath);
2588           return 5;
2589         }
2590
2591         len = strlen (config_base_dir);
2592         while ((len > 0) && (config_base_dir[len - 1] == '/'))
2593         {
2594           config_base_dir[len - 1] = 0;
2595           len--;
2596         }
2597
2598         if (len < 1)
2599         {
2600           fprintf (stderr, "Invalid base directory: %s\n", optarg);
2601           return (4);
2602         }
2603
2604         _config_base_dir_len = len;
2605       }
2606       break;
2607
2608       case 'p':
2609       {
2610         if (config_pid_file != NULL)
2611           free (config_pid_file);
2612         config_pid_file = strdup (optarg);
2613         if (config_pid_file == NULL)
2614         {
2615           fprintf (stderr, "read_options: strdup failed.\n");
2616           return (3);
2617         }
2618       }
2619       break;
2620
2621       case 'F':
2622         config_flush_at_shutdown = 1;
2623         break;
2624
2625       case 'j':
2626       {
2627         struct stat statbuf;
2628         const char *dir = optarg;
2629
2630         status = stat(dir, &statbuf);
2631         if (status != 0)
2632         {
2633           fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2634           return 6;
2635         }
2636
2637         if (!S_ISDIR(statbuf.st_mode)
2638             || access(dir, R_OK|W_OK|X_OK) != 0)
2639         {
2640           fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2641                   errno ? rrd_strerror(errno) : "");
2642           return 6;
2643         }
2644
2645         journal_cur = malloc(PATH_MAX + 1);
2646         journal_old = malloc(PATH_MAX + 1);
2647         if (journal_cur == NULL || journal_old == NULL)
2648         {
2649           fprintf(stderr, "malloc failure for journal files\n");
2650           return 6;
2651         }
2652         else 
2653         {
2654           snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2655           snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2656         }
2657       }
2658       break;
2659
2660       case 'h':
2661       case '?':
2662         printf ("RRDCacheD %s  Copyright (C) 2008 Florian octo Forster\n"
2663             "\n"
2664             "Usage: rrdcached [options]\n"
2665             "\n"
2666             "Valid options are:\n"
2667             "  -l <address>  Socket address to listen to.\n"
2668             "  -L <address>  Socket address to listen to ('FLUSH' only).\n"
2669             "  -w <seconds>  Interval in which to write data.\n"
2670             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
2671             "  -f <seconds>  Interval in which to flush dead data.\n"
2672             "  -p <file>     Location of the PID-file.\n"
2673             "  -b <dir>      Base directory to change to.\n"
2674             "  -B            Restrict file access to paths within -b <dir>\n"
2675             "  -g            Do not fork and run in the foreground.\n"
2676             "  -j <dir>      Directory in which to create the journal files.\n"
2677             "  -F            Always flush all updates at shutdown\n"
2678             "\n"
2679             "For more information and a detailed description of all options "
2680             "please refer\n"
2681             "to the rrdcached(1) manual page.\n",
2682             VERSION);
2683         status = -1;
2684         break;
2685     } /* switch (option) */
2686   } /* while (getopt) */
2687
2688   /* advise the user when values are not sane */
2689   if (config_flush_interval < 2 * config_write_interval)
2690     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2691             " 2x write interval (-w) !\n");
2692   if (config_write_jitter > config_write_interval)
2693     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2694             " write interval (-w) !\n");
2695
2696   if (config_write_base_only && config_base_dir == NULL)
2697     fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
2698             "  Consult the rrdcached documentation\n");
2699
2700   if (journal_cur == NULL)
2701     config_flush_at_shutdown = 1;
2702
2703   return (status);
2704 } /* }}} int read_options */
2705
2706 int main (int argc, char **argv)
2707 {
2708   int status;
2709
2710   status = read_options (argc, argv);
2711   if (status != 0)
2712   {
2713     if (status < 0)
2714       status = 0;
2715     return (status);
2716   }
2717
2718   status = daemonize ();
2719   if (status != 0)
2720   {
2721     fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
2722     return (1);
2723   }
2724
2725   journal_init();
2726
2727   /* start the queue thread */
2728   memset (&queue_thread, 0, sizeof (queue_thread));
2729   status = pthread_create (&queue_thread,
2730                            NULL, /* attr */
2731                            queue_thread_main,
2732                            NULL); /* args */
2733   if (status != 0)
2734   {
2735     RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2736     cleanup();
2737     return (1);
2738   }
2739
2740   listen_thread_main (NULL);
2741   cleanup ();
2742
2743   return (0);
2744 } /* int main */
2745
2746 /*
2747  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2748  */