c491d14870905af2e1ea7cc85822533f939ab24b
[rrdtool.git] / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008 Florian octo Forster
4  * Copyright (C) 2008 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
23
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
32
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
36
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
41
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
47
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
51
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
55
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
61
62 /*
63  * Now for some includes..
64  */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
73
74 #include "rrd.h"
75 #include "rrd_client.h"
76
77 #include <stdlib.h>
78 #include <stdint.h>
79 #include <stdio.h>
80 #include <unistd.h>
81 #include <string.h>
82 #include <strings.h>
83 #include <stdint.h>
84 #include <inttypes.h>
85
86 #include <sys/types.h>
87 #include <sys/stat.h>
88 #include <fcntl.h>
89 #include <signal.h>
90 #include <sys/socket.h>
91 #include <sys/un.h>
92 #include <netdb.h>
93 #include <poll.h>
94 #include <syslog.h>
95 #include <pthread.h>
96 #include <errno.h>
97 #include <assert.h>
98 #include <sys/time.h>
99 #include <time.h>
100
101 #include <glib-2.0/glib.h>
102 /* }}} */
103
104 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
105
106 #ifndef __GNUC__
107 # define __attribute__(x) /**/
108 #endif
109
110 /*
111  * Types
112  */
113 typedef enum
114 {
115   PRIV_LOW,
116   PRIV_HIGH
117 } socket_privilege;
118
119 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
120
121 struct listen_socket_s
122 {
123   int fd;
124   char addr[PATH_MAX + 1];
125   int family;
126   socket_privilege privilege;
127
128   /* state for BATCH processing */
129   time_t batch_start;
130   int batch_cmd;
131
132   /* buffered IO */
133   char *rbuf;
134   off_t next_cmd;
135   off_t next_read;
136
137   char *wbuf;
138   ssize_t wbuf_len;
139 };
140 typedef struct listen_socket_s listen_socket_t;
141
142 struct cache_item_s;
143 typedef struct cache_item_s cache_item_t;
144 struct cache_item_s
145 {
146   char *file;
147   char **values;
148   int values_num;
149   time_t last_flush_time;
150   time_t last_update_stamp;
151 #define CI_FLAGS_IN_TREE  (1<<0)
152 #define CI_FLAGS_IN_QUEUE (1<<1)
153   int flags;
154   pthread_cond_t  flushed;
155   cache_item_t *prev;
156   cache_item_t *next;
157 };
158
159 struct callback_flush_data_s
160 {
161   time_t now;
162   time_t abs_timeout;
163   char **keys;
164   size_t keys_num;
165 };
166 typedef struct callback_flush_data_s callback_flush_data_t;
167
168 enum queue_side_e
169 {
170   HEAD,
171   TAIL
172 };
173 typedef enum queue_side_e queue_side_t;
174
175 /* max length of socket command or response */
176 #define CMD_MAX 4096
177 #define RBUF_SIZE (CMD_MAX*2)
178
179 /*
180  * Variables
181  */
182 static int stay_foreground = 0;
183 static uid_t daemon_uid;
184
185 static listen_socket_t *listen_fds = NULL;
186 static size_t listen_fds_num = 0;
187
188 static int do_shutdown = 0;
189
190 static pthread_t *queue_threads;
191 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
192 static int config_queue_threads = 4;
193
194 static pthread_t flush_thread;
195 static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;
196
197 static pthread_t *connection_threads = NULL;
198 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
199 static int connection_threads_num = 0;
200
201 /* Cache stuff */
202 static GTree          *cache_tree = NULL;
203 static cache_item_t   *cache_queue_head = NULL;
204 static cache_item_t   *cache_queue_tail = NULL;
205 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
206
207 static int config_write_interval = 300;
208 static int config_write_jitter   = 0;
209 static int config_flush_interval = 3600;
210 static int config_flush_at_shutdown = 0;
211 static char *config_pid_file = NULL;
212 static char *config_base_dir = NULL;
213 static size_t _config_base_dir_len = 0;
214 static int config_write_base_only = 0;
215
216 static listen_socket_t **config_listen_address_list = NULL;
217 static int config_listen_address_list_len = 0;
218
219 static uint64_t stats_queue_length = 0;
220 static uint64_t stats_updates_received = 0;
221 static uint64_t stats_flush_received = 0;
222 static uint64_t stats_updates_written = 0;
223 static uint64_t stats_data_sets_written = 0;
224 static uint64_t stats_journal_bytes = 0;
225 static uint64_t stats_journal_rotate = 0;
226 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
227
228 /* Journaled updates */
229 static char *journal_cur = NULL;
230 static char *journal_old = NULL;
231 static FILE *journal_fh = NULL;
232 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
233 static int journal_write(char *cmd, char *args);
234 static void journal_done(void);
235 static void journal_rotate(void);
236
237 /* 
238  * Functions
239  */
240 static void sig_common (const char *sig) /* {{{ */
241 {
242   RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
243   do_shutdown++;
244   pthread_cond_broadcast(&flush_cond);
245   pthread_cond_broadcast(&queue_cond);
246 } /* }}} void sig_common */
247
248 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
249 {
250   sig_common("INT");
251 } /* }}} void sig_int_handler */
252
253 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
254 {
255   sig_common("TERM");
256 } /* }}} void sig_term_handler */
257
258 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
259 {
260   config_flush_at_shutdown = 1;
261   sig_common("USR1");
262 } /* }}} void sig_usr1_handler */
263
264 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
265 {
266   config_flush_at_shutdown = 0;
267   sig_common("USR2");
268 } /* }}} void sig_usr2_handler */
269
270 static void install_signal_handlers(void) /* {{{ */
271 {
272   /* These structures are static, because `sigaction' behaves weird if the are
273    * overwritten.. */
274   static struct sigaction sa_int;
275   static struct sigaction sa_term;
276   static struct sigaction sa_pipe;
277   static struct sigaction sa_usr1;
278   static struct sigaction sa_usr2;
279
280   /* Install signal handlers */
281   memset (&sa_int, 0, sizeof (sa_int));
282   sa_int.sa_handler = sig_int_handler;
283   sigaction (SIGINT, &sa_int, NULL);
284
285   memset (&sa_term, 0, sizeof (sa_term));
286   sa_term.sa_handler = sig_term_handler;
287   sigaction (SIGTERM, &sa_term, NULL);
288
289   memset (&sa_pipe, 0, sizeof (sa_pipe));
290   sa_pipe.sa_handler = SIG_IGN;
291   sigaction (SIGPIPE, &sa_pipe, NULL);
292
293   memset (&sa_pipe, 0, sizeof (sa_usr1));
294   sa_usr1.sa_handler = sig_usr1_handler;
295   sigaction (SIGUSR1, &sa_usr1, NULL);
296
297   memset (&sa_usr2, 0, sizeof (sa_usr2));
298   sa_usr2.sa_handler = sig_usr2_handler;
299   sigaction (SIGUSR2, &sa_usr2, NULL);
300
301 } /* }}} void install_signal_handlers */
302
303 static int open_pidfile(char *action, int oflag) /* {{{ */
304 {
305   int fd;
306   char *file;
307
308   file = (config_pid_file != NULL)
309     ? config_pid_file
310     : LOCALSTATEDIR "/run/rrdcached.pid";
311
312   fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
313   if (fd < 0)
314     fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
315             action, file, rrd_strerror(errno));
316
317   return(fd);
318 } /* }}} static int open_pidfile */
319
320 /* check existing pid file to see whether a daemon is running */
321 static int check_pidfile(void)
322 {
323   int pid_fd;
324   pid_t pid;
325   char pid_str[16];
326
327   pid_fd = open_pidfile("open", O_RDWR);
328   if (pid_fd < 0)
329     return pid_fd;
330
331   if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0)
332     return -1;
333
334   pid = atoi(pid_str);
335   if (pid <= 0)
336     return -1;
337
338   /* another running process that we can signal COULD be
339    * a competing rrdcached */
340   if (pid != getpid() && kill(pid, 0) == 0)
341   {
342     fprintf(stderr,
343             "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
344     close(pid_fd);
345     return -1;
346   }
347
348   lseek(pid_fd, 0, SEEK_SET);
349   ftruncate(pid_fd, 0);
350
351   fprintf(stderr,
352           "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
353           "rrdcached: starting normally.\n", pid);
354
355   return pid_fd;
356 } /* }}} static int check_pidfile */
357
358 static int write_pidfile (int fd) /* {{{ */
359 {
360   pid_t pid;
361   FILE *fh;
362
363   pid = getpid ();
364
365   fh = fdopen (fd, "w");
366   if (fh == NULL)
367   {
368     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
369     close(fd);
370     return (-1);
371   }
372
373   fprintf (fh, "%i\n", (int) pid);
374   fclose (fh);
375
376   return (0);
377 } /* }}} int write_pidfile */
378
379 static int remove_pidfile (void) /* {{{ */
380 {
381   char *file;
382   int status;
383
384   file = (config_pid_file != NULL)
385     ? config_pid_file
386     : LOCALSTATEDIR "/run/rrdcached.pid";
387
388   status = unlink (file);
389   if (status == 0)
390     return (0);
391   return (errno);
392 } /* }}} int remove_pidfile */
393
394 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
395 {
396   char *eol;
397
398   eol = memchr(sock->rbuf + sock->next_cmd, '\n',
399                sock->next_read - sock->next_cmd);
400
401   if (eol == NULL)
402   {
403     /* no commands left, move remainder back to front of rbuf */
404     memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
405             sock->next_read - sock->next_cmd);
406     sock->next_read -= sock->next_cmd;
407     sock->next_cmd = 0;
408     *len = 0;
409     return NULL;
410   }
411   else
412   {
413     char *cmd = sock->rbuf + sock->next_cmd;
414     *eol = '\0';
415
416     sock->next_cmd = eol - sock->rbuf + 1;
417
418     if (eol > sock->rbuf && *(eol-1) == '\r')
419       *(--eol) = '\0'; /* handle "\r\n" EOL */
420
421     *len = eol - cmd;
422
423     return cmd;
424   }
425
426   /* NOTREACHED */
427   assert(1==0);
428 }
429
430 /* add the characters directly to the write buffer */
431 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
432 {
433   char *new_buf;
434
435   assert(sock != NULL);
436
437   new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
438   if (new_buf == NULL)
439   {
440     RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
441     return -1;
442   }
443
444   strncpy(new_buf + sock->wbuf_len, str, len + 1);
445
446   sock->wbuf = new_buf;
447   sock->wbuf_len += len;
448
449   return 0;
450 } /* }}} static int add_to_wbuf */
451
452 /* add the text to the "extra" info that's sent after the status line */
453 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
454 {
455   va_list argp;
456   char buffer[CMD_MAX];
457   int len;
458
459   if (sock == NULL) return 0; /* journal replay mode */
460   if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
461
462   va_start(argp, fmt);
463 #ifdef HAVE_VSNPRINTF
464   len = vsnprintf(buffer, sizeof(buffer)-1, fmt, argp);
465 #else
466   len = vsprintf(buffer, fmt, argp);
467 #endif
468   va_end(argp);
469   if (len < 0)
470   {
471     RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
472     return -1;
473   }
474
475   return add_to_wbuf(sock, buffer, len);
476 } /* }}} static int add_response_info */
477
478 static int count_lines(char *str) /* {{{ */
479 {
480   int lines = 0;
481
482   if (str != NULL)
483   {
484     while ((str = strchr(str, '\n')) != NULL)
485     {
486       ++lines;
487       ++str;
488     }
489   }
490
491   return lines;
492 } /* }}} static int count_lines */
493
494 /* send the response back to the user.
495  * returns 0 on success, -1 on error
496  * write buffer is always zeroed after this call */
497 static int send_response (listen_socket_t *sock, response_code rc,
498                           char *fmt, ...) /* {{{ */
499 {
500   va_list argp;
501   char buffer[CMD_MAX];
502   int lines;
503   ssize_t wrote;
504   int rclen, len;
505
506   if (sock == NULL) return rc;  /* journal replay mode */
507
508   if (sock->batch_start)
509   {
510     if (rc == RESP_OK)
511       return rc; /* no response on success during BATCH */
512     lines = sock->batch_cmd;
513   }
514   else if (rc == RESP_OK)
515     lines = count_lines(sock->wbuf);
516   else
517     lines = -1;
518
519   rclen = sprintf(buffer, "%d ", lines);
520   va_start(argp, fmt);
521 #ifdef HAVE_VSNPRINTF
522   len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen-1, fmt, argp);
523 #else
524   len = vsprintf(buffer+rclen, fmt, argp);
525 #endif
526   va_end(argp);
527   if (len < 0)
528     return -1;
529
530   len += rclen;
531
532   /* append the result to the wbuf, don't write to the user */
533   if (sock->batch_start)
534     return add_to_wbuf(sock, buffer, len);
535
536   /* first write must be complete */
537   if (len != write(sock->fd, buffer, len))
538   {
539     RRDD_LOG(LOG_INFO, "send_response: could not write status message");
540     return -1;
541   }
542
543   if (sock->wbuf != NULL && rc == RESP_OK)
544   {
545     wrote = 0;
546     while (wrote < sock->wbuf_len)
547     {
548       ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
549       if (wb <= 0)
550       {
551         RRDD_LOG(LOG_INFO, "send_response: could not write results");
552         return -1;
553       }
554       wrote += wb;
555     }
556   }
557
558   free(sock->wbuf); sock->wbuf = NULL;
559   sock->wbuf_len = 0;
560
561   return 0;
562 } /* }}} */
563
564 static void wipe_ci_values(cache_item_t *ci, time_t when)
565 {
566   ci->values = NULL;
567   ci->values_num = 0;
568
569   ci->last_flush_time = when;
570   if (config_write_jitter > 0)
571     ci->last_flush_time += (random() % config_write_jitter);
572 }
573
574 /* remove_from_queue
575  * remove a "cache_item_t" item from the queue.
576  * must hold 'cache_lock' when calling this
577  */
578 static void remove_from_queue(cache_item_t *ci) /* {{{ */
579 {
580   if (ci == NULL) return;
581   if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
582
583   if (ci->prev == NULL)
584     cache_queue_head = ci->next; /* reset head */
585   else
586     ci->prev->next = ci->next;
587
588   if (ci->next == NULL)
589     cache_queue_tail = ci->prev; /* reset the tail */
590   else
591     ci->next->prev = ci->prev;
592
593   ci->next = ci->prev = NULL;
594   ci->flags &= ~CI_FLAGS_IN_QUEUE;
595
596   pthread_mutex_lock (&stats_lock);
597   assert (stats_queue_length > 0);
598   stats_queue_length--;
599   pthread_mutex_unlock (&stats_lock);
600
601 } /* }}} static void remove_from_queue */
602
603 /* free the resources associated with the cache_item_t
604  * must hold cache_lock when calling this function
605  */
606 static void *free_cache_item(cache_item_t *ci) /* {{{ */
607 {
608   if (ci == NULL) return NULL;
609
610   remove_from_queue(ci);
611
612   for (int i=0; i < ci->values_num; i++)
613     free(ci->values[i]);
614
615   free (ci->values);
616   free (ci->file);
617
618   /* in case anyone is waiting */
619   pthread_cond_broadcast(&ci->flushed);
620
621   free (ci);
622
623   return NULL;
624 } /* }}} static void *free_cache_item */
625
626 /*
627  * enqueue_cache_item:
628  * `cache_lock' must be acquired before calling this function!
629  */
630 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
631     queue_side_t side)
632 {
633   if (ci == NULL)
634     return (-1);
635
636   if (ci->values_num == 0)
637     return (0);
638
639   if (side == HEAD)
640   {
641     if (cache_queue_head == ci)
642       return 0;
643
644     /* remove if further down in queue */
645     remove_from_queue(ci);
646
647     ci->prev = NULL;
648     ci->next = cache_queue_head;
649     if (ci->next != NULL)
650       ci->next->prev = ci;
651     cache_queue_head = ci;
652
653     if (cache_queue_tail == NULL)
654       cache_queue_tail = cache_queue_head;
655   }
656   else /* (side == TAIL) */
657   {
658     /* We don't move values back in the list.. */
659     if (ci->flags & CI_FLAGS_IN_QUEUE)
660       return (0);
661
662     assert (ci->next == NULL);
663     assert (ci->prev == NULL);
664
665     ci->prev = cache_queue_tail;
666
667     if (cache_queue_tail == NULL)
668       cache_queue_head = ci;
669     else
670       cache_queue_tail->next = ci;
671
672     cache_queue_tail = ci;
673   }
674
675   ci->flags |= CI_FLAGS_IN_QUEUE;
676
677   pthread_cond_signal(&queue_cond);
678   pthread_mutex_lock (&stats_lock);
679   stats_queue_length++;
680   pthread_mutex_unlock (&stats_lock);
681
682   return (0);
683 } /* }}} int enqueue_cache_item */
684
685 /*
686  * tree_callback_flush:
687  * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
688  * while this is in progress.
689  */
690 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
691     gpointer data)
692 {
693   cache_item_t *ci;
694   callback_flush_data_t *cfd;
695
696   ci = (cache_item_t *) value;
697   cfd = (callback_flush_data_t *) data;
698
699   if (ci->flags & CI_FLAGS_IN_QUEUE)
700     return FALSE;
701
702   if ((ci->last_flush_time <= cfd->abs_timeout)
703       && (ci->values_num > 0))
704   {
705     enqueue_cache_item (ci, TAIL);
706   }
707   else if ((do_shutdown != 0)
708       && (ci->values_num > 0))
709   {
710     enqueue_cache_item (ci, TAIL);
711   }
712   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
713       && (ci->values_num <= 0))
714   {
715     char **temp;
716
717     temp = (char **) rrd_realloc (cfd->keys,
718         sizeof (char *) * (cfd->keys_num + 1));
719     if (temp == NULL)
720     {
721       RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed.");
722       return (FALSE);
723     }
724     cfd->keys = temp;
725     /* Make really sure this points to the _same_ place */
726     assert ((char *) key == ci->file);
727     cfd->keys[cfd->keys_num] = (char *) key;
728     cfd->keys_num++;
729   }
730
731   return (FALSE);
732 } /* }}} gboolean tree_callback_flush */
733
734 static int flush_old_values (int max_age)
735 {
736   callback_flush_data_t cfd;
737   size_t k;
738
739   memset (&cfd, 0, sizeof (cfd));
740   /* Pass the current time as user data so that we don't need to call
741    * `time' for each node. */
742   cfd.now = time (NULL);
743   cfd.keys = NULL;
744   cfd.keys_num = 0;
745
746   if (max_age > 0)
747     cfd.abs_timeout = cfd.now - max_age;
748   else
749     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
750
751   /* `tree_callback_flush' will return the keys of all values that haven't
752    * been touched in the last `config_flush_interval' seconds in `cfd'.
753    * The char*'s in this array point to the same memory as ci->file, so we
754    * don't need to free them separately. */
755   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
756
757   for (k = 0; k < cfd.keys_num; k++)
758   {
759     /* should never fail, since we have held the cache_lock
760      * the entire time */
761     assert( g_tree_remove(cache_tree, cfd.keys[k]) == TRUE );
762   }
763
764   if (cfd.keys != NULL)
765   {
766     free (cfd.keys);
767     cfd.keys = NULL;
768   }
769
770   return (0);
771 } /* int flush_old_values */
772
773 static void *flush_thread_main (void *args __attribute__((unused))) /* {{{ */
774 {
775   struct timeval now;
776   struct timespec next_flush;
777   int status;
778
779   gettimeofday (&now, NULL);
780   next_flush.tv_sec = now.tv_sec + config_flush_interval;
781   next_flush.tv_nsec = 1000 * now.tv_usec;
782
783   pthread_mutex_lock(&cache_lock);
784
785   while (!do_shutdown)
786   {
787     gettimeofday (&now, NULL);
788     if ((now.tv_sec > next_flush.tv_sec)
789         || ((now.tv_sec == next_flush.tv_sec)
790           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
791     {
792       /* Flush all values that haven't been written in the last
793        * `config_write_interval' seconds. */
794       flush_old_values (config_write_interval);
795
796       /* Determine the time of the next cache flush. */
797       next_flush.tv_sec =
798         now.tv_sec + next_flush.tv_sec % config_flush_interval;
799
800       /* unlock the cache while we rotate so we don't block incoming
801        * updates if the fsync() blocks on disk I/O */
802       pthread_mutex_unlock(&cache_lock);
803       journal_rotate();
804       pthread_mutex_lock(&cache_lock);
805     }
806
807     status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
808     if (status != 0 && status != ETIMEDOUT)
809     {
810       RRDD_LOG (LOG_ERR, "flush_thread_main: "
811                 "pthread_cond_timedwait returned %i.", status);
812     }
813   }
814
815   if (config_flush_at_shutdown)
816     flush_old_values (-1); /* flush everything */
817
818   pthread_mutex_unlock(&cache_lock);
819
820   return NULL;
821 } /* void *flush_thread_main */
822
823 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
824 {
825   pthread_mutex_lock (&cache_lock);
826
827   while (!do_shutdown
828          || (cache_queue_head != NULL && config_flush_at_shutdown))
829   {
830     cache_item_t *ci;
831     char *file;
832     char **values;
833     int values_num;
834     int status;
835     int i;
836
837     /* Now, check if there's something to store away. If not, wait until
838      * something comes in.  if we are shutting down, do not wait around.  */
839     if (cache_queue_head == NULL && !do_shutdown)
840     {
841       status = pthread_cond_wait (&queue_cond, &cache_lock);
842       if ((status != 0) && (status != ETIMEDOUT))
843       {
844         RRDD_LOG (LOG_ERR, "queue_thread_main: "
845             "pthread_cond_wait returned %i.", status);
846       }
847     }
848
849     /* Check if a value has arrived. This may be NULL if we timed out or there
850      * was an interrupt such as a signal. */
851     if (cache_queue_head == NULL)
852       continue;
853
854     ci = cache_queue_head;
855
856     /* copy the relevant parts */
857     file = strdup (ci->file);
858     if (file == NULL)
859     {
860       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
861       continue;
862     }
863
864     assert(ci->values != NULL);
865     assert(ci->values_num > 0);
866
867     values = ci->values;
868     values_num = ci->values_num;
869
870     wipe_ci_values(ci, time(NULL));
871     remove_from_queue(ci);
872
873     pthread_mutex_unlock (&cache_lock);
874
875     rrd_clear_error ();
876     status = rrd_update_r (file, NULL, values_num, (void *) values);
877     if (status != 0)
878     {
879       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
880           "rrd_update_r (%s) failed with status %i. (%s)",
881           file, status, rrd_get_error());
882     }
883
884     journal_write("wrote", file);
885     pthread_cond_broadcast(&ci->flushed);
886
887     for (i = 0; i < values_num; i++)
888       free (values[i]);
889
890     free(values);
891     free(file);
892
893     if (status == 0)
894     {
895       pthread_mutex_lock (&stats_lock);
896       stats_updates_written++;
897       stats_data_sets_written += values_num;
898       pthread_mutex_unlock (&stats_lock);
899     }
900
901     pthread_mutex_lock (&cache_lock);
902   }
903   pthread_mutex_unlock (&cache_lock);
904
905   return (NULL);
906 } /* }}} void *queue_thread_main */
907
908 static int buffer_get_field (char **buffer_ret, /* {{{ */
909     size_t *buffer_size_ret, char **field_ret)
910 {
911   char *buffer;
912   size_t buffer_pos;
913   size_t buffer_size;
914   char *field;
915   size_t field_size;
916   int status;
917
918   buffer = *buffer_ret;
919   buffer_pos = 0;
920   buffer_size = *buffer_size_ret;
921   field = *buffer_ret;
922   field_size = 0;
923
924   if (buffer_size <= 0)
925     return (-1);
926
927   /* This is ensured by `handle_request'. */
928   assert (buffer[buffer_size - 1] == '\0');
929
930   status = -1;
931   while (buffer_pos < buffer_size)
932   {
933     /* Check for end-of-field or end-of-buffer */
934     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
935     {
936       field[field_size] = 0;
937       field_size++;
938       buffer_pos++;
939       status = 0;
940       break;
941     }
942     /* Handle escaped characters. */
943     else if (buffer[buffer_pos] == '\\')
944     {
945       if (buffer_pos >= (buffer_size - 1))
946         break;
947       buffer_pos++;
948       field[field_size] = buffer[buffer_pos];
949       field_size++;
950       buffer_pos++;
951     }
952     /* Normal operation */ 
953     else
954     {
955       field[field_size] = buffer[buffer_pos];
956       field_size++;
957       buffer_pos++;
958     }
959   } /* while (buffer_pos < buffer_size) */
960
961   if (status != 0)
962     return (status);
963
964   *buffer_ret = buffer + buffer_pos;
965   *buffer_size_ret = buffer_size - buffer_pos;
966   *field_ret = field;
967
968   return (0);
969 } /* }}} int buffer_get_field */
970
971 /* if we're restricting writes to the base directory,
972  * check whether the file falls within the dir
973  * returns 1 if OK, otherwise 0
974  */
975 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
976 {
977   assert(file != NULL);
978
979   if (!config_write_base_only
980       || sock == NULL /* journal replay */
981       || config_base_dir == NULL)
982     return 1;
983
984   if (strstr(file, "../") != NULL) goto err;
985
986   /* relative paths without "../" are ok */
987   if (*file != '/') return 1;
988
989   /* file must be of the format base + "/" + <1+ char filename> */
990   if (strlen(file) < _config_base_dir_len + 2) goto err;
991   if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
992   if (*(file + _config_base_dir_len) != '/') goto err;
993
994   return 1;
995
996 err:
997   if (sock != NULL && sock->fd >= 0)
998     send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
999
1000   return 0;
1001 } /* }}} static int check_file_access */
1002
1003 /* when using a base dir, convert relative paths to absolute paths.
1004  * if necessary, modifies the "filename" pointer to point
1005  * to the new path created in "tmp".  "tmp" is provided
1006  * by the caller and sizeof(tmp) must be >= PATH_MAX.
1007  *
1008  * this allows us to optimize for the expected case (absolute path)
1009  * with a no-op.
1010  */
1011 static void get_abs_path(char **filename, char *tmp)
1012 {
1013   assert(tmp != NULL);
1014   assert(filename != NULL && *filename != NULL);
1015
1016   if (config_base_dir == NULL || **filename == '/')
1017     return;
1018
1019   snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1020   *filename = tmp;
1021 } /* }}} static int get_abs_path */
1022
1023 /* returns 1 if we have the required privilege level,
1024  * otherwise issue an error to the user on sock */
1025 static int has_privilege (listen_socket_t *sock, /* {{{ */
1026                           socket_privilege priv)
1027 {
1028   if (sock == NULL) /* journal replay */
1029     return 1;
1030
1031   if (sock->privilege >= priv)
1032     return 1;
1033
1034   return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1035 } /* }}} static int has_privilege */
1036
1037 static int flush_file (const char *filename) /* {{{ */
1038 {
1039   cache_item_t *ci;
1040
1041   pthread_mutex_lock (&cache_lock);
1042
1043   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1044   if (ci == NULL)
1045   {
1046     pthread_mutex_unlock (&cache_lock);
1047     return (ENOENT);
1048   }
1049
1050   if (ci->values_num > 0)
1051   {
1052     /* Enqueue at head */
1053     enqueue_cache_item (ci, HEAD);
1054     pthread_cond_wait(&ci->flushed, &cache_lock);
1055   }
1056
1057   /* DO NOT DO ANYTHING WITH ci HERE!!  The entry
1058    * may have been purged during our cond_wait() */
1059
1060   pthread_mutex_unlock(&cache_lock);
1061
1062   return (0);
1063 } /* }}} int flush_file */
1064
1065 static int handle_request_help (listen_socket_t *sock, /* {{{ */
1066     char *buffer, size_t buffer_size)
1067 {
1068   int status;
1069   char **help_text;
1070   char *command;
1071
1072   char *help_help[2] =
1073   {
1074     "Command overview\n"
1075     ,
1076     "HELP [<command>]\n"
1077     "FLUSH <filename>\n"
1078     "FLUSHALL\n"
1079     "PENDING <filename>\n"
1080     "FORGET <filename>\n"
1081     "QUEUE\n"
1082     "UPDATE <filename> <values> [<values> ...]\n"
1083     "BATCH\n"
1084     "STATS\n"
1085     "QUIT\n"
1086   };
1087
1088   char *help_flush[2] =
1089   {
1090     "Help for FLUSH\n"
1091     ,
1092     "Usage: FLUSH <filename>\n"
1093     "\n"
1094     "Adds the given filename to the head of the update queue and returns\n"
1095     "after is has been dequeued.\n"
1096   };
1097
1098   char *help_flushall[2] =
1099   {
1100     "Help for FLUSHALL\n"
1101     ,
1102     "Usage: FLUSHALL\n"
1103     "\n"
1104     "Triggers writing of all pending updates.  Returns immediately.\n"
1105   };
1106
1107   char *help_pending[2] =
1108   {
1109     "Help for PENDING\n"
1110     ,
1111     "Usage: PENDING <filename>\n"
1112     "\n"
1113     "Shows any 'pending' updates for a file, in order.\n"
1114     "The updates shown have not yet been written to the underlying RRD file.\n"
1115   };
1116
1117   char *help_forget[2] =
1118   {
1119     "Help for FORGET\n"
1120     ,
1121     "Usage: FORGET <filename>\n"
1122     "\n"
1123     "Removes the file completely from the cache.\n"
1124     "Any pending updates for the file will be lost.\n"
1125   };
1126
1127   char *help_queue[2] =
1128   {
1129     "Help for QUEUE\n"
1130     ,
1131     "Shows all files in the output queue.\n"
1132     "The output is zero or more lines in the following format:\n"
1133     "(where <num_vals> is the number of values to be written)\n"
1134     "\n"
1135     "<num_vals> <filename>\n"
1136     "\n"
1137   };
1138
1139   char *help_update[2] =
1140   {
1141     "Help for UPDATE\n"
1142     ,
1143     "Usage: UPDATE <filename> <values> [<values> ...]\n"
1144     "\n"
1145     "Adds the given file to the internal cache if it is not yet known and\n"
1146     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1147     "for details.\n"
1148     "\n"
1149     "Each <values> has the following form:\n"
1150     "  <values> = <time>:<value>[:<value>[...]]\n"
1151     "See the rrdupdate(1) manpage for details.\n"
1152   };
1153
1154   char *help_stats[2] =
1155   {
1156     "Help for STATS\n"
1157     ,
1158     "Usage: STATS\n"
1159     "\n"
1160     "Returns some performance counters, see the rrdcached(1) manpage for\n"
1161     "a description of the values.\n"
1162   };
1163
1164   char *help_batch[2] =
1165   {
1166     "Help for BATCH\n"
1167     ,
1168     "The 'BATCH' command permits the client to initiate a bulk load\n"
1169     "   of commands to rrdcached.\n"
1170     "\n"
1171     "Usage:\n"
1172     "\n"
1173     "    client: BATCH\n"
1174     "    server: 0 Go ahead.  End with dot '.' on its own line.\n"
1175     "    client: command #1\n"
1176     "    client: command #2\n"
1177     "    client: ... and so on\n"
1178     "    client: .\n"
1179     "    server: 2 errors\n"
1180     "    server: 7 message for command #7\n"
1181     "    server: 9 message for command #9\n"
1182     "\n"
1183     "For more information, consult the rrdcached(1) documentation.\n"
1184   };
1185
1186   char *help_quit[2] =
1187   {
1188     "Help for QUIT\n"
1189     ,
1190     "Disconnect from rrdcached.\n"
1191   };
1192
1193   status = buffer_get_field (&buffer, &buffer_size, &command);
1194   if (status != 0)
1195     help_text = help_help;
1196   else
1197   {
1198     if (strcasecmp (command, "update") == 0)
1199       help_text = help_update;
1200     else if (strcasecmp (command, "flush") == 0)
1201       help_text = help_flush;
1202     else if (strcasecmp (command, "flushall") == 0)
1203       help_text = help_flushall;
1204     else if (strcasecmp (command, "pending") == 0)
1205       help_text = help_pending;
1206     else if (strcasecmp (command, "forget") == 0)
1207       help_text = help_forget;
1208     else if (strcasecmp (command, "queue") == 0)
1209       help_text = help_queue;
1210     else if (strcasecmp (command, "stats") == 0)
1211       help_text = help_stats;
1212     else if (strcasecmp (command, "batch") == 0)
1213       help_text = help_batch;
1214     else if (strcasecmp (command, "quit") == 0)
1215       help_text = help_quit;
1216     else
1217       help_text = help_help;
1218   }
1219
1220   add_response_info(sock, help_text[1]);
1221   return send_response(sock, RESP_OK, help_text[0]);
1222 } /* }}} int handle_request_help */
1223
1224 static int handle_request_stats (listen_socket_t *sock) /* {{{ */
1225 {
1226   uint64_t copy_queue_length;
1227   uint64_t copy_updates_received;
1228   uint64_t copy_flush_received;
1229   uint64_t copy_updates_written;
1230   uint64_t copy_data_sets_written;
1231   uint64_t copy_journal_bytes;
1232   uint64_t copy_journal_rotate;
1233
1234   uint64_t tree_nodes_number;
1235   uint64_t tree_depth;
1236
1237   pthread_mutex_lock (&stats_lock);
1238   copy_queue_length       = stats_queue_length;
1239   copy_updates_received   = stats_updates_received;
1240   copy_flush_received     = stats_flush_received;
1241   copy_updates_written    = stats_updates_written;
1242   copy_data_sets_written  = stats_data_sets_written;
1243   copy_journal_bytes      = stats_journal_bytes;
1244   copy_journal_rotate     = stats_journal_rotate;
1245   pthread_mutex_unlock (&stats_lock);
1246
1247   pthread_mutex_lock (&cache_lock);
1248   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1249   tree_depth        = (uint64_t) g_tree_height (cache_tree);
1250   pthread_mutex_unlock (&cache_lock);
1251
1252   add_response_info(sock,
1253                     "QueueLength: %"PRIu64"\n", copy_queue_length);
1254   add_response_info(sock,
1255                     "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1256   add_response_info(sock,
1257                     "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1258   add_response_info(sock,
1259                     "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1260   add_response_info(sock,
1261                     "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1262   add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1263   add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1264   add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1265   add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1266
1267   send_response(sock, RESP_OK, "Statistics follow\n");
1268
1269   return (0);
1270 } /* }}} int handle_request_stats */
1271
1272 static int handle_request_flush (listen_socket_t *sock, /* {{{ */
1273     char *buffer, size_t buffer_size)
1274 {
1275   char *file, file_tmp[PATH_MAX];
1276   int status;
1277
1278   status = buffer_get_field (&buffer, &buffer_size, &file);
1279   if (status != 0)
1280   {
1281     return send_response(sock, RESP_ERR, "Usage: flush <filename>\n");
1282   }
1283   else
1284   {
1285     pthread_mutex_lock(&stats_lock);
1286     stats_flush_received++;
1287     pthread_mutex_unlock(&stats_lock);
1288
1289     get_abs_path(&file, file_tmp);
1290     if (!check_file_access(file, sock)) return 0;
1291
1292     status = flush_file (file);
1293     if (status == 0)
1294       return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1295     else if (status == ENOENT)
1296     {
1297       /* no file in our tree; see whether it exists at all */
1298       struct stat statbuf;
1299
1300       memset(&statbuf, 0, sizeof(statbuf));
1301       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1302         return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1303       else
1304         return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1305     }
1306     else if (status < 0)
1307       return send_response(sock, RESP_ERR, "Internal error.\n");
1308     else
1309       return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1310   }
1311
1312   /* NOTREACHED */
1313   assert(1==0);
1314 } /* }}} int handle_request_flush */
1315
1316 static int handle_request_flushall(listen_socket_t *sock) /* {{{ */
1317 {
1318   int status;
1319
1320   status = has_privilege(sock, PRIV_HIGH);
1321   if (status <= 0)
1322     return status;
1323
1324   RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1325
1326   pthread_mutex_lock(&cache_lock);
1327   flush_old_values(-1);
1328   pthread_mutex_unlock(&cache_lock);
1329
1330   return send_response(sock, RESP_OK, "Started flush.\n");
1331 } /* }}} static int handle_request_flushall */
1332
1333 static int handle_request_pending(listen_socket_t *sock, /* {{{ */
1334                                   char *buffer, size_t buffer_size)
1335 {
1336   int status;
1337   char *file, file_tmp[PATH_MAX];
1338   cache_item_t *ci;
1339
1340   status = buffer_get_field(&buffer, &buffer_size, &file);
1341   if (status != 0)
1342     return send_response(sock, RESP_ERR,
1343                          "Usage: PENDING <filename>\n");
1344
1345   status = has_privilege(sock, PRIV_HIGH);
1346   if (status <= 0)
1347     return status;
1348
1349   get_abs_path(&file, file_tmp);
1350
1351   pthread_mutex_lock(&cache_lock);
1352   ci = g_tree_lookup(cache_tree, file);
1353   if (ci == NULL)
1354   {
1355     pthread_mutex_unlock(&cache_lock);
1356     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1357   }
1358
1359   for (int i=0; i < ci->values_num; i++)
1360     add_response_info(sock, "%s\n", ci->values[i]);
1361
1362   pthread_mutex_unlock(&cache_lock);
1363   return send_response(sock, RESP_OK, "updates pending\n");
1364 } /* }}} static int handle_request_pending */
1365
1366 static int handle_request_forget(listen_socket_t *sock, /* {{{ */
1367                                  char *buffer, size_t buffer_size)
1368 {
1369   int status;
1370   gboolean found;
1371   char *file, file_tmp[PATH_MAX];
1372
1373   status = buffer_get_field(&buffer, &buffer_size, &file);
1374   if (status != 0)
1375     return send_response(sock, RESP_ERR,
1376                          "Usage: FORGET <filename>\n");
1377
1378   status = has_privilege(sock, PRIV_HIGH);
1379   if (status <= 0)
1380     return status;
1381
1382   get_abs_path(&file, file_tmp);
1383   if (!check_file_access(file, sock)) return 0;
1384
1385   pthread_mutex_lock(&cache_lock);
1386   found = g_tree_remove(cache_tree, file);
1387   pthread_mutex_unlock(&cache_lock);
1388
1389   if (found == TRUE)
1390   {
1391     if (sock != NULL)
1392       journal_write("forget", file);
1393
1394     return send_response(sock, RESP_OK, "Gone!\n");
1395   }
1396   else
1397     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1398
1399   /* NOTREACHED */
1400   assert(1==0);
1401 } /* }}} static int handle_request_forget */
1402
1403 static int handle_request_queue (listen_socket_t *sock) /* {{{ */
1404 {
1405   cache_item_t *ci;
1406
1407   pthread_mutex_lock(&cache_lock);
1408
1409   ci = cache_queue_head;
1410   while (ci != NULL)
1411   {
1412     add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1413     ci = ci->next;
1414   }
1415
1416   pthread_mutex_unlock(&cache_lock);
1417
1418   return send_response(sock, RESP_OK, "in queue.\n");
1419 } /* }}} int handle_request_queue */
1420
1421 static int handle_request_update (listen_socket_t *sock, /* {{{ */
1422                                   time_t now,
1423                                   char *buffer, size_t buffer_size)
1424 {
1425   char *file, file_tmp[PATH_MAX];
1426   int values_num = 0;
1427   int status;
1428   char orig_buf[CMD_MAX];
1429
1430   cache_item_t *ci;
1431
1432   status = has_privilege(sock, PRIV_HIGH);
1433   if (status <= 0)
1434     return status;
1435
1436   /* save it for the journal later */
1437   strncpy(orig_buf, buffer, sizeof(orig_buf)-1);
1438
1439   status = buffer_get_field (&buffer, &buffer_size, &file);
1440   if (status != 0)
1441     return send_response(sock, RESP_ERR,
1442                          "Usage: UPDATE <filename> <values> [<values> ...]\n");
1443
1444   pthread_mutex_lock(&stats_lock);
1445   stats_updates_received++;
1446   pthread_mutex_unlock(&stats_lock);
1447
1448   get_abs_path(&file, file_tmp);
1449   if (!check_file_access(file, sock)) return 0;
1450
1451   pthread_mutex_lock (&cache_lock);
1452   ci = g_tree_lookup (cache_tree, file);
1453
1454   if (ci == NULL) /* {{{ */
1455   {
1456     struct stat statbuf;
1457
1458     /* don't hold the lock while we setup; stat(2) might block */
1459     pthread_mutex_unlock(&cache_lock);
1460
1461     memset (&statbuf, 0, sizeof (statbuf));
1462     status = stat (file, &statbuf);
1463     if (status != 0)
1464     {
1465       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1466
1467       status = errno;
1468       if (status == ENOENT)
1469         return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1470       else
1471         return send_response(sock, RESP_ERR,
1472                              "stat failed with error %i.\n", status);
1473     }
1474     if (!S_ISREG (statbuf.st_mode))
1475       return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1476
1477     if (access(file, R_OK|W_OK) != 0)
1478       return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1479                            file, rrd_strerror(errno));
1480
1481     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1482     if (ci == NULL)
1483     {
1484       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1485
1486       return send_response(sock, RESP_ERR, "malloc failed.\n");
1487     }
1488     memset (ci, 0, sizeof (cache_item_t));
1489
1490     ci->file = strdup (file);
1491     if (ci->file == NULL)
1492     {
1493       free (ci);
1494       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1495
1496       return send_response(sock, RESP_ERR, "strdup failed.\n");
1497     }
1498
1499     wipe_ci_values(ci, now);
1500     ci->flags = CI_FLAGS_IN_TREE;
1501     pthread_cond_init(&ci->flushed, NULL);
1502
1503     pthread_mutex_lock(&cache_lock);
1504     g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1505   } /* }}} */
1506   assert (ci != NULL);
1507
1508   /* don't re-write updates in replay mode */
1509   if (sock != NULL)
1510     journal_write("update", orig_buf);
1511
1512   while (buffer_size > 0)
1513   {
1514     char **temp;
1515     char *value;
1516     time_t stamp;
1517     char *eostamp;
1518
1519     status = buffer_get_field (&buffer, &buffer_size, &value);
1520     if (status != 0)
1521     {
1522       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1523       break;
1524     }
1525
1526     /* make sure update time is always moving forward */
1527     stamp = strtol(value, &eostamp, 10);
1528     if (eostamp == value || eostamp == NULL || *eostamp != ':')
1529     {
1530       pthread_mutex_unlock(&cache_lock);
1531       return send_response(sock, RESP_ERR,
1532                            "Cannot find timestamp in '%s'!\n", value);
1533     }
1534     else if (stamp <= ci->last_update_stamp)
1535     {
1536       pthread_mutex_unlock(&cache_lock);
1537       return send_response(sock, RESP_ERR,
1538                            "illegal attempt to update using time %ld when last"
1539                            " update time is %ld (minimum one second step)\n",
1540                            stamp, ci->last_update_stamp);
1541     }
1542     else
1543       ci->last_update_stamp = stamp;
1544
1545     temp = (char **) rrd_realloc (ci->values,
1546         sizeof (char *) * (ci->values_num + 1));
1547     if (temp == NULL)
1548     {
1549       RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
1550       continue;
1551     }
1552     ci->values = temp;
1553
1554     ci->values[ci->values_num] = strdup (value);
1555     if (ci->values[ci->values_num] == NULL)
1556     {
1557       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1558       continue;
1559     }
1560     ci->values_num++;
1561
1562     values_num++;
1563   }
1564
1565   if (((now - ci->last_flush_time) >= config_write_interval)
1566       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1567       && (ci->values_num > 0))
1568   {
1569     enqueue_cache_item (ci, TAIL);
1570   }
1571
1572   pthread_mutex_unlock (&cache_lock);
1573
1574   if (values_num < 1)
1575     return send_response(sock, RESP_ERR, "No values updated.\n");
1576   else
1577     return send_response(sock, RESP_OK,
1578                          "errors, enqueued %i value(s).\n", values_num);
1579
1580   /* NOTREACHED */
1581   assert(1==0);
1582
1583 } /* }}} int handle_request_update */
1584
1585 /* we came across a "WROTE" entry during journal replay.
1586  * throw away any values that we have accumulated for this file
1587  */
1588 static int handle_request_wrote (const char *buffer, time_t now) /* {{{ */
1589 {
1590   int i;
1591   cache_item_t *ci;
1592   const char *file = buffer;
1593
1594   pthread_mutex_lock(&cache_lock);
1595
1596   ci = g_tree_lookup(cache_tree, file);
1597   if (ci == NULL)
1598   {
1599     pthread_mutex_unlock(&cache_lock);
1600     return (0);
1601   }
1602
1603   if (ci->values)
1604   {
1605     for (i=0; i < ci->values_num; i++)
1606       free(ci->values[i]);
1607
1608     free(ci->values);
1609   }
1610
1611   wipe_ci_values(ci, now);
1612   remove_from_queue(ci);
1613
1614   pthread_mutex_unlock(&cache_lock);
1615   return (0);
1616 } /* }}} int handle_request_wrote */
1617
1618 /* start "BATCH" processing */
1619 static int batch_start (listen_socket_t *sock) /* {{{ */
1620 {
1621   int status;
1622   if (sock->batch_start)
1623     return send_response(sock, RESP_ERR, "Already in BATCH\n");
1624
1625   status = send_response(sock, RESP_OK,
1626                          "Go ahead.  End with dot '.' on its own line.\n");
1627   sock->batch_start = time(NULL);
1628   sock->batch_cmd = 0;
1629
1630   return status;
1631 } /* }}} static int batch_start */
1632
1633 /* finish "BATCH" processing and return results to the client */
1634 static int batch_done (listen_socket_t *sock) /* {{{ */
1635 {
1636   assert(sock->batch_start);
1637   sock->batch_start = 0;
1638   sock->batch_cmd  = 0;
1639   return send_response(sock, RESP_OK, "errors\n");
1640 } /* }}} static int batch_done */
1641
1642 /* if sock==NULL, we are in journal replay mode */
1643 static int handle_request (listen_socket_t *sock, /* {{{ */
1644                            time_t now,
1645                            char *buffer, size_t buffer_size)
1646 {
1647   char *buffer_ptr;
1648   char *command;
1649   int status;
1650
1651   assert (buffer[buffer_size - 1] == '\0');
1652
1653   buffer_ptr = buffer;
1654   command = NULL;
1655   status = buffer_get_field (&buffer_ptr, &buffer_size, &command);
1656   if (status != 0)
1657   {
1658     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1659     return (-1);
1660   }
1661
1662   if (sock != NULL && sock->batch_start)
1663     sock->batch_cmd++;
1664
1665   if (strcasecmp (command, "update") == 0)
1666     return (handle_request_update (sock, now, buffer_ptr, buffer_size));
1667   else if (strcasecmp (command, "wrote") == 0 && sock == NULL)
1668   {
1669     /* this is only valid in replay mode */
1670     return (handle_request_wrote (buffer_ptr, now));
1671   }
1672   else if (strcasecmp (command, "flush") == 0)
1673     return (handle_request_flush (sock, buffer_ptr, buffer_size));
1674   else if (strcasecmp (command, "flushall") == 0)
1675     return (handle_request_flushall(sock));
1676   else if (strcasecmp (command, "pending") == 0)
1677     return (handle_request_pending(sock, buffer_ptr, buffer_size));
1678   else if (strcasecmp (command, "forget") == 0)
1679     return (handle_request_forget(sock, buffer_ptr, buffer_size));
1680   else if (strcasecmp (command, "queue") == 0)
1681     return (handle_request_queue(sock));
1682   else if (strcasecmp (command, "stats") == 0)
1683     return (handle_request_stats (sock));
1684   else if (strcasecmp (command, "help") == 0)
1685     return (handle_request_help (sock, buffer_ptr, buffer_size));
1686   else if (strcasecmp (command, "batch") == 0 && sock != NULL)
1687     return batch_start(sock);
1688   else if (strcasecmp (command, ".") == 0 && sock != NULL && sock->batch_start)
1689     return batch_done(sock);
1690   else if (strcasecmp (command, "quit") == 0)
1691     return -1;
1692   else
1693     return send_response(sock, RESP_ERR, "Unknown command: %s\n", command);
1694
1695   /* NOTREACHED */
1696   assert(1==0);
1697 } /* }}} int handle_request */
1698
1699 /* MUST NOT hold journal_lock before calling this */
1700 static void journal_rotate(void) /* {{{ */
1701 {
1702   FILE *old_fh = NULL;
1703   int new_fd;
1704
1705   if (journal_cur == NULL || journal_old == NULL)
1706     return;
1707
1708   pthread_mutex_lock(&journal_lock);
1709
1710   /* we rotate this way (rename before close) so that the we can release
1711    * the journal lock as fast as possible.  Journal writes to the new
1712    * journal can proceed immediately after the new file is opened.  The
1713    * fclose can then block without affecting new updates.
1714    */
1715   if (journal_fh != NULL)
1716   {
1717     old_fh = journal_fh;
1718     journal_fh = NULL;
1719     rename(journal_cur, journal_old);
1720     ++stats_journal_rotate;
1721   }
1722
1723   new_fd = open(journal_cur, O_WRONLY|O_CREAT|O_APPEND,
1724                 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1725   if (new_fd >= 0)
1726   {
1727     journal_fh = fdopen(new_fd, "a");
1728     if (journal_fh == NULL)
1729       close(new_fd);
1730   }
1731
1732   pthread_mutex_unlock(&journal_lock);
1733
1734   if (old_fh != NULL)
1735     fclose(old_fh);
1736
1737   if (journal_fh == NULL)
1738   {
1739     RRDD_LOG(LOG_CRIT,
1740              "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1741              journal_cur, rrd_strerror(errno));
1742
1743     RRDD_LOG(LOG_ERR,
1744              "JOURNALING DISABLED: All values will be flushed at shutdown");
1745     config_flush_at_shutdown = 1;
1746   }
1747
1748 } /* }}} static void journal_rotate */
1749
1750 static void journal_done(void) /* {{{ */
1751 {
1752   if (journal_cur == NULL)
1753     return;
1754
1755   pthread_mutex_lock(&journal_lock);
1756   if (journal_fh != NULL)
1757   {
1758     fclose(journal_fh);
1759     journal_fh = NULL;
1760   }
1761
1762   if (config_flush_at_shutdown)
1763   {
1764     RRDD_LOG(LOG_INFO, "removing journals");
1765     unlink(journal_old);
1766     unlink(journal_cur);
1767   }
1768   else
1769   {
1770     RRDD_LOG(LOG_INFO, "expedited shutdown; "
1771              "journals will be used at next startup");
1772   }
1773
1774   pthread_mutex_unlock(&journal_lock);
1775
1776 } /* }}} static void journal_done */
1777
1778 static int journal_write(char *cmd, char *args) /* {{{ */
1779 {
1780   int chars;
1781
1782   if (journal_fh == NULL)
1783     return 0;
1784
1785   pthread_mutex_lock(&journal_lock);
1786   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1787   pthread_mutex_unlock(&journal_lock);
1788
1789   if (chars > 0)
1790   {
1791     pthread_mutex_lock(&stats_lock);
1792     stats_journal_bytes += chars;
1793     pthread_mutex_unlock(&stats_lock);
1794   }
1795
1796   return chars;
1797 } /* }}} static int journal_write */
1798
1799 static int journal_replay (const char *file) /* {{{ */
1800 {
1801   FILE *fh;
1802   int entry_cnt = 0;
1803   int fail_cnt = 0;
1804   uint64_t line = 0;
1805   char entry[CMD_MAX];
1806   time_t now;
1807
1808   if (file == NULL) return 0;
1809
1810   {
1811     char *reason = "unknown error";
1812     int status = 0;
1813     struct stat statbuf;
1814
1815     memset(&statbuf, 0, sizeof(statbuf));
1816     if (stat(file, &statbuf) != 0)
1817     {
1818       if (errno == ENOENT)
1819         return 0;
1820
1821       reason = "stat error";
1822       status = errno;
1823     }
1824     else if (!S_ISREG(statbuf.st_mode))
1825     {
1826       reason = "not a regular file";
1827       status = EPERM;
1828     }
1829     if (statbuf.st_uid != daemon_uid)
1830     {
1831       reason = "not owned by daemon user";
1832       status = EACCES;
1833     }
1834     if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
1835     {
1836       reason = "must not be user/group writable";
1837       status = EACCES;
1838     }
1839
1840     if (status != 0)
1841     {
1842       RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
1843                file, rrd_strerror(status), reason);
1844       return 0;
1845     }
1846   }
1847
1848   fh = fopen(file, "r");
1849   if (fh == NULL)
1850   {
1851     if (errno != ENOENT)
1852       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1853                file, rrd_strerror(errno));
1854     return 0;
1855   }
1856   else
1857     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1858
1859   now = time(NULL);
1860
1861   while(!feof(fh))
1862   {
1863     size_t entry_len;
1864
1865     ++line;
1866     if (fgets(entry, sizeof(entry), fh) == NULL)
1867       break;
1868     entry_len = strlen(entry);
1869
1870     /* check \n termination in case journal writing crashed mid-line */
1871     if (entry_len == 0)
1872       continue;
1873     else if (entry[entry_len - 1] != '\n')
1874     {
1875       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1876       ++fail_cnt;
1877       continue;
1878     }
1879
1880     entry[entry_len - 1] = '\0';
1881
1882     if (handle_request(NULL, now, entry, entry_len) == 0)
1883       ++entry_cnt;
1884     else
1885       ++fail_cnt;
1886   }
1887
1888   fclose(fh);
1889
1890   RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1891            entry_cnt, fail_cnt);
1892
1893   return entry_cnt > 0 ? 1 : 0;
1894 } /* }}} static int journal_replay */
1895
1896 static void journal_init(void) /* {{{ */
1897 {
1898   int had_journal = 0;
1899
1900   if (journal_cur == NULL) return;
1901
1902   pthread_mutex_lock(&journal_lock);
1903
1904   RRDD_LOG(LOG_INFO, "checking for journal files");
1905
1906   had_journal += journal_replay(journal_old);
1907   had_journal += journal_replay(journal_cur);
1908
1909   /* it must have been a crash.  start a flush */
1910   if (had_journal && config_flush_at_shutdown)
1911     flush_old_values(-1);
1912
1913   pthread_mutex_unlock(&journal_lock);
1914   journal_rotate();
1915
1916   RRDD_LOG(LOG_INFO, "journal processing complete");
1917
1918 } /* }}} static void journal_init */
1919
1920 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
1921 {
1922   assert(sock != NULL);
1923
1924   free(sock->rbuf);  sock->rbuf = NULL;
1925   free(sock->wbuf);  sock->wbuf = NULL;
1926   free(sock);
1927 } /* }}} void free_listen_socket */
1928
1929 static void close_connection(listen_socket_t *sock) /* {{{ */
1930 {
1931   if (sock->fd >= 0)
1932   {
1933     close(sock->fd);
1934     sock->fd = -1;
1935   }
1936
1937   free_listen_socket(sock);
1938
1939 } /* }}} void close_connection */
1940
1941 static void *connection_thread_main (void *args) /* {{{ */
1942 {
1943   listen_socket_t *sock;
1944   int i;
1945   int fd;
1946
1947   sock = (listen_socket_t *) args;
1948   fd = sock->fd;
1949
1950   /* init read buffers */
1951   sock->next_read = sock->next_cmd = 0;
1952   sock->rbuf = malloc(RBUF_SIZE);
1953   if (sock->rbuf == NULL)
1954   {
1955     RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
1956     close_connection(sock);
1957     return NULL;
1958   }
1959
1960   pthread_mutex_lock (&connection_threads_lock);
1961   {
1962     pthread_t *temp;
1963
1964     temp = (pthread_t *) rrd_realloc (connection_threads,
1965         sizeof (pthread_t) * (connection_threads_num + 1));
1966     if (temp == NULL)
1967     {
1968       RRDD_LOG (LOG_ERR, "connection_thread_main: realloc(++) failed.");
1969     }
1970     else
1971     {
1972       connection_threads = temp;
1973       connection_threads[connection_threads_num] = pthread_self ();
1974       connection_threads_num++;
1975     }
1976   }
1977   pthread_mutex_unlock (&connection_threads_lock);
1978
1979   while (do_shutdown == 0)
1980   {
1981     char *cmd;
1982     ssize_t cmd_len;
1983     ssize_t rbytes;
1984     time_t now;
1985
1986     struct pollfd pollfd;
1987     int status;
1988
1989     pollfd.fd = fd;
1990     pollfd.events = POLLIN | POLLPRI;
1991     pollfd.revents = 0;
1992
1993     status = poll (&pollfd, 1, /* timeout = */ 500);
1994     if (do_shutdown)
1995       break;
1996     else if (status == 0) /* timeout */
1997       continue;
1998     else if (status < 0) /* error */
1999     {
2000       status = errno;
2001       if (status != EINTR)
2002         RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2003       continue;
2004     }
2005
2006     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2007       break;
2008     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2009     {
2010       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2011           "poll(2) returned something unexpected: %#04hx",
2012           pollfd.revents);
2013       break;
2014     }
2015
2016     rbytes = read(fd, sock->rbuf + sock->next_read,
2017                   RBUF_SIZE - sock->next_read);
2018     if (rbytes < 0)
2019     {
2020       RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2021       break;
2022     }
2023     else if (rbytes == 0)
2024       break; /* eof */
2025
2026     sock->next_read += rbytes;
2027
2028     if (sock->batch_start)
2029       now = sock->batch_start;
2030     else
2031       now = time(NULL);
2032
2033     while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2034     {
2035       status = handle_request (sock, now, cmd, cmd_len+1);
2036       if (status != 0)
2037         goto out_close;
2038     }
2039   }
2040
2041 out_close:
2042   close_connection(sock);
2043
2044   /* Remove this thread from the connection threads list */
2045   pthread_mutex_lock (&connection_threads_lock);
2046   {
2047     pthread_t self;
2048     pthread_t *temp;
2049
2050     /* Find out own index in the array */
2051     self = pthread_self ();
2052     for (i = 0; i < connection_threads_num; i++)
2053       if (pthread_equal (connection_threads[i], self) != 0)
2054         break;
2055     assert (i < connection_threads_num);
2056
2057     /* Move the trailing threads forward. */
2058     if (i < (connection_threads_num - 1))
2059     {
2060       memmove (connection_threads + i,
2061                connection_threads + i + 1,
2062                sizeof (pthread_t) * (connection_threads_num - i - 1));
2063     }
2064
2065     connection_threads_num--;
2066
2067     temp = rrd_realloc(connection_threads,
2068                    sizeof(*connection_threads) * connection_threads_num);
2069     if (connection_threads_num > 0 && temp == NULL)
2070       RRDD_LOG(LOG_ERR, "connection_thread_main: realloc(--) failed.");
2071     else
2072       connection_threads = temp;
2073   }
2074   pthread_mutex_unlock (&connection_threads_lock);
2075
2076   return (NULL);
2077 } /* }}} void *connection_thread_main */
2078
2079 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2080 {
2081   int fd;
2082   struct sockaddr_un sa;
2083   listen_socket_t *temp;
2084   int status;
2085   const char *path;
2086
2087   path = sock->addr;
2088   if (strncmp(path, "unix:", strlen("unix:")) == 0)
2089     path += strlen("unix:");
2090
2091   temp = (listen_socket_t *) rrd_realloc (listen_fds,
2092       sizeof (listen_fds[0]) * (listen_fds_num + 1));
2093   if (temp == NULL)
2094   {
2095     fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2096     return (-1);
2097   }
2098   listen_fds = temp;
2099   memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2100
2101   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2102   if (fd < 0)
2103   {
2104     fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2105              rrd_strerror(errno));
2106     return (-1);
2107   }
2108
2109   memset (&sa, 0, sizeof (sa));
2110   sa.sun_family = AF_UNIX;
2111   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2112
2113   /* if we've gotten this far, we own the pid file.  any daemon started
2114    * with the same args must not be alive.  therefore, ensure that we can
2115    * create the socket...
2116    */
2117   unlink(path);
2118
2119   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2120   if (status != 0)
2121   {
2122     fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2123              path, rrd_strerror(errno));
2124     close (fd);
2125     return (-1);
2126   }
2127
2128   status = listen (fd, /* backlog = */ 10);
2129   if (status != 0)
2130   {
2131     fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2132              path, rrd_strerror(errno));
2133     close (fd);
2134     unlink (path);
2135     return (-1);
2136   }
2137
2138   listen_fds[listen_fds_num].fd = fd;
2139   listen_fds[listen_fds_num].family = PF_UNIX;
2140   strncpy(listen_fds[listen_fds_num].addr, path,
2141           sizeof (listen_fds[listen_fds_num].addr) - 1);
2142   listen_fds_num++;
2143
2144   return (0);
2145 } /* }}} int open_listen_socket_unix */
2146
2147 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2148 {
2149   struct addrinfo ai_hints;
2150   struct addrinfo *ai_res;
2151   struct addrinfo *ai_ptr;
2152   char addr_copy[NI_MAXHOST];
2153   char *addr;
2154   char *port;
2155   int status;
2156
2157   strncpy (addr_copy, sock->addr, sizeof (addr_copy));
2158   addr_copy[sizeof (addr_copy) - 1] = 0;
2159   addr = addr_copy;
2160
2161   memset (&ai_hints, 0, sizeof (ai_hints));
2162   ai_hints.ai_flags = 0;
2163 #ifdef AI_ADDRCONFIG
2164   ai_hints.ai_flags |= AI_ADDRCONFIG;
2165 #endif
2166   ai_hints.ai_family = AF_UNSPEC;
2167   ai_hints.ai_socktype = SOCK_STREAM;
2168
2169   port = NULL;
2170   if (*addr == '[') /* IPv6+port format */
2171   {
2172     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2173     addr++;
2174
2175     port = strchr (addr, ']');
2176     if (port == NULL)
2177     {
2178       fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
2179       return (-1);
2180     }
2181     *port = 0;
2182     port++;
2183
2184     if (*port == ':')
2185       port++;
2186     else if (*port == 0)
2187       port = NULL;
2188     else
2189     {
2190       fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
2191       return (-1);
2192     }
2193   } /* if (*addr = ']') */
2194   else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */
2195   {
2196     port = rindex(addr, ':');
2197     if (port != NULL)
2198     {
2199       *port = 0;
2200       port++;
2201     }
2202   }
2203   ai_res = NULL;
2204   status = getaddrinfo (addr,
2205                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2206                         &ai_hints, &ai_res);
2207   if (status != 0)
2208   {
2209     fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
2210              addr, gai_strerror (status));
2211     return (-1);
2212   }
2213
2214   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2215   {
2216     int fd;
2217     listen_socket_t *temp;
2218     int one = 1;
2219
2220     temp = (listen_socket_t *) rrd_realloc (listen_fds,
2221         sizeof (listen_fds[0]) * (listen_fds_num + 1));
2222     if (temp == NULL)
2223     {
2224       fprintf (stderr,
2225                "rrdcached: open_listen_socket_network: realloc failed.\n");
2226       continue;
2227     }
2228     listen_fds = temp;
2229     memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2230
2231     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2232     if (fd < 0)
2233     {
2234       fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
2235                rrd_strerror(errno));
2236       continue;
2237     }
2238
2239     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2240
2241     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2242     if (status != 0)
2243     {
2244       fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2245                sock->addr, rrd_strerror(errno));
2246       close (fd);
2247       continue;
2248     }
2249
2250     status = listen (fd, /* backlog = */ 10);
2251     if (status != 0)
2252     {
2253       fprintf (stderr, "rrdcached: listen(%s) failed: %s\n.",
2254                sock->addr, rrd_strerror(errno));
2255       close (fd);
2256       freeaddrinfo(ai_res);
2257       return (-1);
2258     }
2259
2260     listen_fds[listen_fds_num].fd = fd;
2261     listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2262     listen_fds_num++;
2263   } /* for (ai_ptr) */
2264
2265   freeaddrinfo(ai_res);
2266   return (0);
2267 } /* }}} static int open_listen_socket_network */
2268
2269 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2270 {
2271   assert(sock != NULL);
2272   assert(sock->addr != NULL);
2273
2274   if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2275       || sock->addr[0] == '/')
2276     return (open_listen_socket_unix(sock));
2277   else
2278     return (open_listen_socket_network(sock));
2279 } /* }}} int open_listen_socket */
2280
2281 static int close_listen_sockets (void) /* {{{ */
2282 {
2283   size_t i;
2284
2285   for (i = 0; i < listen_fds_num; i++)
2286   {
2287     close (listen_fds[i].fd);
2288
2289     if (listen_fds[i].family == PF_UNIX)
2290       unlink(listen_fds[i].addr);
2291   }
2292
2293   free (listen_fds);
2294   listen_fds = NULL;
2295   listen_fds_num = 0;
2296
2297   return (0);
2298 } /* }}} int close_listen_sockets */
2299
2300 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
2301 {
2302   struct pollfd *pollfds;
2303   int pollfds_num;
2304   int status;
2305   int i;
2306
2307   if (listen_fds_num < 1)
2308   {
2309     RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2310     return (NULL);
2311   }
2312
2313   pollfds_num = listen_fds_num;
2314   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2315   if (pollfds == NULL)
2316   {
2317     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2318     return (NULL);
2319   }
2320   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2321
2322   RRDD_LOG(LOG_INFO, "listening for connections");
2323
2324   while (do_shutdown == 0)
2325   {
2326     for (i = 0; i < pollfds_num; i++)
2327     {
2328       pollfds[i].fd = listen_fds[i].fd;
2329       pollfds[i].events = POLLIN | POLLPRI;
2330       pollfds[i].revents = 0;
2331     }
2332
2333     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2334     if (do_shutdown)
2335       break;
2336     else if (status == 0) /* timeout */
2337       continue;
2338     else if (status < 0) /* error */
2339     {
2340       status = errno;
2341       if (status != EINTR)
2342       {
2343         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2344       }
2345       continue;
2346     }
2347
2348     for (i = 0; i < pollfds_num; i++)
2349     {
2350       listen_socket_t *client_sock;
2351       struct sockaddr_storage client_sa;
2352       socklen_t client_sa_size;
2353       pthread_t tid;
2354       pthread_attr_t attr;
2355
2356       if (pollfds[i].revents == 0)
2357         continue;
2358
2359       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2360       {
2361         RRDD_LOG (LOG_ERR, "listen_thread_main: "
2362             "poll(2) returned something unexpected for listen FD #%i.",
2363             pollfds[i].fd);
2364         continue;
2365       }
2366
2367       client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2368       if (client_sock == NULL)
2369       {
2370         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2371         continue;
2372       }
2373       memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2374
2375       client_sa_size = sizeof (client_sa);
2376       client_sock->fd = accept (pollfds[i].fd,
2377           (struct sockaddr *) &client_sa, &client_sa_size);
2378       if (client_sock->fd < 0)
2379       {
2380         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2381         free(client_sock);
2382         continue;
2383       }
2384
2385       pthread_attr_init (&attr);
2386       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2387
2388       status = pthread_create (&tid, &attr, connection_thread_main,
2389                                client_sock);
2390       if (status != 0)
2391       {
2392         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2393         close_connection(client_sock);
2394         continue;
2395       }
2396     } /* for (pollfds_num) */
2397   } /* while (do_shutdown == 0) */
2398
2399   RRDD_LOG(LOG_INFO, "starting shutdown");
2400
2401   close_listen_sockets ();
2402
2403   pthread_mutex_lock (&connection_threads_lock);
2404   while (connection_threads_num > 0)
2405   {
2406     pthread_t wait_for;
2407
2408     wait_for = connection_threads[0];
2409
2410     pthread_mutex_unlock (&connection_threads_lock);
2411     pthread_join (wait_for, /* retval = */ NULL);
2412     pthread_mutex_lock (&connection_threads_lock);
2413   }
2414   pthread_mutex_unlock (&connection_threads_lock);
2415
2416   free(pollfds);
2417
2418   return (NULL);
2419 } /* }}} void *listen_thread_main */
2420
2421 static int daemonize (void) /* {{{ */
2422 {
2423   int pid_fd;
2424   char *base_dir;
2425
2426   daemon_uid = geteuid();
2427
2428   pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
2429   if (pid_fd < 0)
2430     pid_fd = check_pidfile();
2431   if (pid_fd < 0)
2432     return pid_fd;
2433
2434   /* open all the listen sockets */
2435   if (config_listen_address_list_len > 0)
2436   {
2437     for (int i = 0; i < config_listen_address_list_len; i++)
2438     {
2439       open_listen_socket (config_listen_address_list[i]);
2440       free_listen_socket (config_listen_address_list[i]);
2441     }
2442
2443     free(config_listen_address_list);
2444   }
2445   else
2446   {
2447     listen_socket_t sock;
2448     memset(&sock, 0, sizeof(sock));
2449     strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr));
2450     open_listen_socket (&sock);
2451   }
2452
2453   if (listen_fds_num < 1)
2454   {
2455     fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
2456     goto error;
2457   }
2458
2459   if (!stay_foreground)
2460   {
2461     pid_t child;
2462
2463     child = fork ();
2464     if (child < 0)
2465     {
2466       fprintf (stderr, "daemonize: fork(2) failed.\n");
2467       goto error;
2468     }
2469     else if (child > 0)
2470       exit(0);
2471
2472     /* Become session leader */
2473     setsid ();
2474
2475     /* Open the first three file descriptors to /dev/null */
2476     close (2);
2477     close (1);
2478     close (0);
2479
2480     open ("/dev/null", O_RDWR);
2481     dup (0);
2482     dup (0);
2483   } /* if (!stay_foreground) */
2484
2485   /* Change into the /tmp directory. */
2486   base_dir = (config_base_dir != NULL)
2487     ? config_base_dir
2488     : "/tmp";
2489
2490   if (chdir (base_dir) != 0)
2491   {
2492     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2493     goto error;
2494   }
2495
2496   install_signal_handlers();
2497
2498   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2499   RRDD_LOG(LOG_INFO, "starting up");
2500
2501   cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
2502                                 (GDestroyNotify) free_cache_item);
2503   if (cache_tree == NULL)
2504   {
2505     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2506     goto error;
2507   }
2508
2509   return write_pidfile (pid_fd);
2510
2511 error:
2512   remove_pidfile();
2513   return -1;
2514 } /* }}} int daemonize */
2515
2516 static int cleanup (void) /* {{{ */
2517 {
2518   do_shutdown++;
2519
2520   pthread_cond_broadcast (&flush_cond);
2521   pthread_join (flush_thread, NULL);
2522
2523   pthread_cond_broadcast (&queue_cond);
2524   for (int i = 0; i < config_queue_threads; i++)
2525     pthread_join (queue_threads[i], NULL);
2526
2527   if (config_flush_at_shutdown)
2528   {
2529     assert(cache_queue_head == NULL);
2530     RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
2531   }
2532
2533   journal_done();
2534   remove_pidfile ();
2535
2536   free(queue_threads);
2537   free(config_base_dir);
2538   free(config_pid_file);
2539   free(journal_cur);
2540   free(journal_old);
2541
2542   pthread_mutex_lock(&cache_lock);
2543   g_tree_destroy(cache_tree);
2544
2545   RRDD_LOG(LOG_INFO, "goodbye");
2546   closelog ();
2547
2548   return (0);
2549 } /* }}} int cleanup */
2550
2551 static int read_options (int argc, char **argv) /* {{{ */
2552 {
2553   int option;
2554   int status = 0;
2555
2556   while ((option = getopt(argc, argv, "gl:L:f:w:z:t:Bb:p:Fj:h?")) != -1)
2557   {
2558     switch (option)
2559     {
2560       case 'g':
2561         stay_foreground=1;
2562         break;
2563
2564       case 'L':
2565       case 'l':
2566       {
2567         listen_socket_t **temp;
2568         listen_socket_t *new;
2569
2570         new = malloc(sizeof(listen_socket_t));
2571         if (new == NULL)
2572         {
2573           fprintf(stderr, "read_options: malloc failed.\n");
2574           return(2);
2575         }
2576         memset(new, 0, sizeof(listen_socket_t));
2577
2578         temp = (listen_socket_t **) rrd_realloc (config_listen_address_list,
2579             sizeof (listen_socket_t *) * (config_listen_address_list_len + 1));
2580         if (temp == NULL)
2581         {
2582           fprintf (stderr, "read_options: realloc failed.\n");
2583           return (2);
2584         }
2585         config_listen_address_list = temp;
2586
2587         strncpy(new->addr, optarg, sizeof(new->addr)-1);
2588         new->privilege = (option == 'l') ? PRIV_HIGH : PRIV_LOW;
2589
2590         temp[config_listen_address_list_len] = new;
2591         config_listen_address_list_len++;
2592       }
2593       break;
2594
2595       case 'f':
2596       {
2597         int temp;
2598
2599         temp = atoi (optarg);
2600         if (temp > 0)
2601           config_flush_interval = temp;
2602         else
2603         {
2604           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2605           status = 3;
2606         }
2607       }
2608       break;
2609
2610       case 'w':
2611       {
2612         int temp;
2613
2614         temp = atoi (optarg);
2615         if (temp > 0)
2616           config_write_interval = temp;
2617         else
2618         {
2619           fprintf (stderr, "Invalid write interval: %s\n", optarg);
2620           status = 2;
2621         }
2622       }
2623       break;
2624
2625       case 'z':
2626       {
2627         int temp;
2628
2629         temp = atoi(optarg);
2630         if (temp > 0)
2631           config_write_jitter = temp;
2632         else
2633         {
2634           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2635           status = 2;
2636         }
2637
2638         break;
2639       }
2640
2641       case 't':
2642       {
2643         int threads;
2644         threads = atoi(optarg);
2645         if (threads >= 1)
2646           config_queue_threads = threads;
2647         else
2648         {
2649           fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
2650           return 1;
2651         }
2652       }
2653       break;
2654
2655       case 'B':
2656         config_write_base_only = 1;
2657         break;
2658
2659       case 'b':
2660       {
2661         size_t len;
2662         char base_realpath[PATH_MAX];
2663
2664         if (config_base_dir != NULL)
2665           free (config_base_dir);
2666         config_base_dir = strdup (optarg);
2667         if (config_base_dir == NULL)
2668         {
2669           fprintf (stderr, "read_options: strdup failed.\n");
2670           return (3);
2671         }
2672
2673         /* make sure that the base directory is not resolved via
2674          * symbolic links.  this makes some performance-enhancing
2675          * assumptions possible (we don't have to resolve paths
2676          * that start with a "/")
2677          */
2678         if (realpath(config_base_dir, base_realpath) == NULL)
2679         {
2680           fprintf (stderr, "Invalid base directory '%s'.\n", config_base_dir);
2681           return 5;
2682         }
2683         else if (strncmp(config_base_dir,
2684                          base_realpath, sizeof(base_realpath)) != 0)
2685         {
2686           fprintf(stderr,
2687                   "Base directory (-b) resolved via file system links!\n"
2688                   "Please consult rrdcached '-b' documentation!\n"
2689                   "Consider specifying the real directory (%s)\n",
2690                   base_realpath);
2691           return 5;
2692         }
2693
2694         len = strlen (config_base_dir);
2695         while ((len > 0) && (config_base_dir[len - 1] == '/'))
2696         {
2697           config_base_dir[len - 1] = 0;
2698           len--;
2699         }
2700
2701         if (len < 1)
2702         {
2703           fprintf (stderr, "Invalid base directory: %s\n", optarg);
2704           return (4);
2705         }
2706
2707         _config_base_dir_len = len;
2708       }
2709       break;
2710
2711       case 'p':
2712       {
2713         if (config_pid_file != NULL)
2714           free (config_pid_file);
2715         config_pid_file = strdup (optarg);
2716         if (config_pid_file == NULL)
2717         {
2718           fprintf (stderr, "read_options: strdup failed.\n");
2719           return (3);
2720         }
2721       }
2722       break;
2723
2724       case 'F':
2725         config_flush_at_shutdown = 1;
2726         break;
2727
2728       case 'j':
2729       {
2730         struct stat statbuf;
2731         const char *dir = optarg;
2732
2733         status = stat(dir, &statbuf);
2734         if (status != 0)
2735         {
2736           fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2737           return 6;
2738         }
2739
2740         if (!S_ISDIR(statbuf.st_mode)
2741             || access(dir, R_OK|W_OK|X_OK) != 0)
2742         {
2743           fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2744                   errno ? rrd_strerror(errno) : "");
2745           return 6;
2746         }
2747
2748         journal_cur = malloc(PATH_MAX + 1);
2749         journal_old = malloc(PATH_MAX + 1);
2750         if (journal_cur == NULL || journal_old == NULL)
2751         {
2752           fprintf(stderr, "malloc failure for journal files\n");
2753           return 6;
2754         }
2755         else 
2756         {
2757           snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2758           snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2759         }
2760       }
2761       break;
2762
2763       case 'h':
2764       case '?':
2765         printf ("RRDCacheD %s  Copyright (C) 2008 Florian octo Forster\n"
2766             "\n"
2767             "Usage: rrdcached [options]\n"
2768             "\n"
2769             "Valid options are:\n"
2770             "  -l <address>  Socket address to listen to.\n"
2771             "  -L <address>  Socket address to listen to ('FLUSH' only).\n"
2772             "  -w <seconds>  Interval in which to write data.\n"
2773             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
2774             "  -t <threads>  Number of write threads.\n"
2775             "  -f <seconds>  Interval in which to flush dead data.\n"
2776             "  -p <file>     Location of the PID-file.\n"
2777             "  -b <dir>      Base directory to change to.\n"
2778             "  -B            Restrict file access to paths within -b <dir>\n"
2779             "  -g            Do not fork and run in the foreground.\n"
2780             "  -j <dir>      Directory in which to create the journal files.\n"
2781             "  -F            Always flush all updates at shutdown\n"
2782             "\n"
2783             "For more information and a detailed description of all options "
2784             "please refer\n"
2785             "to the rrdcached(1) manual page.\n",
2786             VERSION);
2787         status = -1;
2788         break;
2789     } /* switch (option) */
2790   } /* while (getopt) */
2791
2792   /* advise the user when values are not sane */
2793   if (config_flush_interval < 2 * config_write_interval)
2794     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2795             " 2x write interval (-w) !\n");
2796   if (config_write_jitter > config_write_interval)
2797     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2798             " write interval (-w) !\n");
2799
2800   if (config_write_base_only && config_base_dir == NULL)
2801     fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
2802             "  Consult the rrdcached documentation\n");
2803
2804   if (journal_cur == NULL)
2805     config_flush_at_shutdown = 1;
2806
2807   return (status);
2808 } /* }}} int read_options */
2809
2810 int main (int argc, char **argv)
2811 {
2812   int status;
2813
2814   status = read_options (argc, argv);
2815   if (status != 0)
2816   {
2817     if (status < 0)
2818       status = 0;
2819     return (status);
2820   }
2821
2822   status = daemonize ();
2823   if (status != 0)
2824   {
2825     fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
2826     return (1);
2827   }
2828
2829   journal_init();
2830
2831   /* start the queue threads */
2832   queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
2833   if (queue_threads == NULL)
2834   {
2835     RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
2836     cleanup();
2837     return (1);
2838   }
2839   for (int i = 0; i < config_queue_threads; i++)
2840   {
2841     memset (&queue_threads[i], 0, sizeof (*queue_threads));
2842     status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
2843     if (status != 0)
2844     {
2845       RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2846       cleanup();
2847       return (1);
2848     }
2849   }
2850
2851   /* start the flush thread */
2852   memset(&flush_thread, 0, sizeof(flush_thread));
2853   status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
2854   if (status != 0)
2855   {
2856     RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
2857     cleanup();
2858     return (1);
2859   }
2860
2861   listen_thread_main (NULL);
2862   cleanup ();
2863
2864   return (0);
2865 } /* int main */
2866
2867 /*
2868  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2869  */