* Open all listen sockets in daemonize(), while we still have stderr.
[rrdtool.git] / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008 Florian octo Forster
4  * Copyright (C) 2008 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
23
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
32
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
36
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
41
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
47
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
51
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
55
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
61
62 /*
63  * Now for some includes..
64  */
65 #include "rrd.h" /* {{{ */
66 #include "rrd_client.h"
67
68 #include <stdlib.h>
69 #include <stdint.h>
70 #include <stdio.h>
71 #include <unistd.h>
72 #include <string.h>
73 #include <strings.h>
74 #include <stdint.h>
75 #include <inttypes.h>
76
77 #include <sys/types.h>
78 #include <sys/stat.h>
79 #include <fcntl.h>
80 #include <signal.h>
81 #include <sys/socket.h>
82 #include <sys/un.h>
83 #include <netdb.h>
84 #include <poll.h>
85 #include <syslog.h>
86 #include <pthread.h>
87 #include <errno.h>
88 #include <assert.h>
89 #include <sys/time.h>
90 #include <time.h>
91
92 #include <glib-2.0/glib.h>
93 /* }}} */
94
95 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
96
97 #ifndef __GNUC__
98 # define __attribute__(x) /**/
99 #endif
100
101 /*
102  * Types
103  */
104 typedef enum
105 {
106   PRIV_LOW,
107   PRIV_HIGH
108 } socket_privilege;
109
110 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
111
112 struct listen_socket_s
113 {
114   int fd;
115   char addr[PATH_MAX + 1];
116   int family;
117   socket_privilege privilege;
118
119   /* state for BATCH processing */
120   time_t batch_start;
121   int batch_cmd;
122
123   /* buffered IO */
124   char *rbuf;
125   off_t next_cmd;
126   off_t next_read;
127
128   char *wbuf;
129   ssize_t wbuf_len;
130 };
131 typedef struct listen_socket_s listen_socket_t;
132
133 struct cache_item_s;
134 typedef struct cache_item_s cache_item_t;
135 struct cache_item_s
136 {
137   char *file;
138   char **values;
139   int values_num;
140   time_t last_flush_time;
141   time_t last_update_stamp;
142 #define CI_FLAGS_IN_TREE  (1<<0)
143 #define CI_FLAGS_IN_QUEUE (1<<1)
144   int flags;
145   pthread_cond_t  flushed;
146   cache_item_t *prev;
147   cache_item_t *next;
148 };
149
150 struct callback_flush_data_s
151 {
152   time_t now;
153   time_t abs_timeout;
154   char **keys;
155   size_t keys_num;
156 };
157 typedef struct callback_flush_data_s callback_flush_data_t;
158
159 enum queue_side_e
160 {
161   HEAD,
162   TAIL
163 };
164 typedef enum queue_side_e queue_side_t;
165
166 /* max length of socket command or response */
167 #define CMD_MAX 4096
168 #define RBUF_SIZE (CMD_MAX*2)
169
170 /*
171  * Variables
172  */
173 static int stay_foreground = 0;
174 static uid_t daemon_uid;
175
176 static listen_socket_t *listen_fds = NULL;
177 static size_t listen_fds_num = 0;
178
179 static int do_shutdown = 0;
180
181 static pthread_t queue_thread;
182
183 static pthread_t *connection_threads = NULL;
184 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
185 static int connection_threads_num = 0;
186
187 /* Cache stuff */
188 static GTree          *cache_tree = NULL;
189 static cache_item_t   *cache_queue_head = NULL;
190 static cache_item_t   *cache_queue_tail = NULL;
191 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
192 static pthread_cond_t  cache_cond = PTHREAD_COND_INITIALIZER;
193
194 static int config_write_interval = 300;
195 static int config_write_jitter   = 0;
196 static int config_flush_interval = 3600;
197 static int config_flush_at_shutdown = 0;
198 static char *config_pid_file = NULL;
199 static char *config_base_dir = NULL;
200 static size_t _config_base_dir_len = 0;
201 static int config_write_base_only = 0;
202
203 static listen_socket_t **config_listen_address_list = NULL;
204 static int config_listen_address_list_len = 0;
205
206 static uint64_t stats_queue_length = 0;
207 static uint64_t stats_updates_received = 0;
208 static uint64_t stats_flush_received = 0;
209 static uint64_t stats_updates_written = 0;
210 static uint64_t stats_data_sets_written = 0;
211 static uint64_t stats_journal_bytes = 0;
212 static uint64_t stats_journal_rotate = 0;
213 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
214
215 /* Journaled updates */
216 static char *journal_cur = NULL;
217 static char *journal_old = NULL;
218 static FILE *journal_fh = NULL;
219 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
220 static int journal_write(char *cmd, char *args);
221 static void journal_done(void);
222 static void journal_rotate(void);
223
224 /* 
225  * Functions
226  */
227 static void sig_common (const char *sig) /* {{{ */
228 {
229   RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
230   do_shutdown++;
231   pthread_cond_broadcast(&cache_cond);
232 } /* }}} void sig_common */
233
234 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
235 {
236   sig_common("INT");
237 } /* }}} void sig_int_handler */
238
239 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
240 {
241   sig_common("TERM");
242 } /* }}} void sig_term_handler */
243
244 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
245 {
246   config_flush_at_shutdown = 1;
247   sig_common("USR1");
248 } /* }}} void sig_usr1_handler */
249
250 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
251 {
252   config_flush_at_shutdown = 0;
253   sig_common("USR2");
254 } /* }}} void sig_usr2_handler */
255
256 static void install_signal_handlers(void) /* {{{ */
257 {
258   /* These structures are static, because `sigaction' behaves weird if the are
259    * overwritten.. */
260   static struct sigaction sa_int;
261   static struct sigaction sa_term;
262   static struct sigaction sa_pipe;
263   static struct sigaction sa_usr1;
264   static struct sigaction sa_usr2;
265
266   /* Install signal handlers */
267   memset (&sa_int, 0, sizeof (sa_int));
268   sa_int.sa_handler = sig_int_handler;
269   sigaction (SIGINT, &sa_int, NULL);
270
271   memset (&sa_term, 0, sizeof (sa_term));
272   sa_term.sa_handler = sig_term_handler;
273   sigaction (SIGTERM, &sa_term, NULL);
274
275   memset (&sa_pipe, 0, sizeof (sa_pipe));
276   sa_pipe.sa_handler = SIG_IGN;
277   sigaction (SIGPIPE, &sa_pipe, NULL);
278
279   memset (&sa_pipe, 0, sizeof (sa_usr1));
280   sa_usr1.sa_handler = sig_usr1_handler;
281   sigaction (SIGUSR1, &sa_usr1, NULL);
282
283   memset (&sa_usr2, 0, sizeof (sa_usr2));
284   sa_usr2.sa_handler = sig_usr2_handler;
285   sigaction (SIGUSR2, &sa_usr2, NULL);
286
287 } /* }}} void install_signal_handlers */
288
289 static int open_pidfile(char *action, int oflag) /* {{{ */
290 {
291   int fd;
292   char *file;
293
294   file = (config_pid_file != NULL)
295     ? config_pid_file
296     : LOCALSTATEDIR "/run/rrdcached.pid";
297
298   fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
299   if (fd < 0)
300     fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
301             action, file, rrd_strerror(errno));
302
303   return(fd);
304 } /* }}} static int open_pidfile */
305
306 /* check existing pid file to see whether a daemon is running */
307 static int check_pidfile(void)
308 {
309   int pid_fd;
310   pid_t pid;
311   char pid_str[16];
312
313   pid_fd = open_pidfile("open", O_RDWR);
314   if (pid_fd < 0)
315     return pid_fd;
316
317   if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0)
318     return -1;
319
320   pid = atoi(pid_str);
321   if (pid <= 0)
322     return -1;
323
324   /* another running process that we can signal COULD be
325    * a competing rrdcached */
326   if (pid != getpid() && kill(pid, 0) == 0)
327   {
328     fprintf(stderr,
329             "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
330     close(pid_fd);
331     return -1;
332   }
333
334   lseek(pid_fd, 0, SEEK_SET);
335   ftruncate(pid_fd, 0);
336
337   fprintf(stderr,
338           "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
339           "rrdcached: starting normally.\n", pid);
340
341   return pid_fd;
342 } /* }}} static int check_pidfile */
343
344 static int write_pidfile (int fd) /* {{{ */
345 {
346   pid_t pid;
347   FILE *fh;
348
349   pid = getpid ();
350
351   fh = fdopen (fd, "w");
352   if (fh == NULL)
353   {
354     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
355     close(fd);
356     return (-1);
357   }
358
359   fprintf (fh, "%i\n", (int) pid);
360   fclose (fh);
361
362   return (0);
363 } /* }}} int write_pidfile */
364
365 static int remove_pidfile (void) /* {{{ */
366 {
367   char *file;
368   int status;
369
370   file = (config_pid_file != NULL)
371     ? config_pid_file
372     : LOCALSTATEDIR "/run/rrdcached.pid";
373
374   status = unlink (file);
375   if (status == 0)
376     return (0);
377   return (errno);
378 } /* }}} int remove_pidfile */
379
380 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
381 {
382   char *eol;
383
384   eol = memchr(sock->rbuf + sock->next_cmd, '\n',
385                sock->next_read - sock->next_cmd);
386
387   if (eol == NULL)
388   {
389     /* no commands left, move remainder back to front of rbuf */
390     memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
391             sock->next_read - sock->next_cmd);
392     sock->next_read -= sock->next_cmd;
393     sock->next_cmd = 0;
394     *len = 0;
395     return NULL;
396   }
397   else
398   {
399     char *cmd = sock->rbuf + sock->next_cmd;
400     *eol = '\0';
401
402     sock->next_cmd = eol - sock->rbuf + 1;
403
404     if (eol > sock->rbuf && *(eol-1) == '\r')
405       *(--eol) = '\0'; /* handle "\r\n" EOL */
406
407     *len = eol - cmd;
408
409     return cmd;
410   }
411
412   /* NOTREACHED */
413   assert(1==0);
414 }
415
416 /* add the characters directly to the write buffer */
417 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
418 {
419   char *new_buf;
420
421   assert(sock != NULL);
422
423   new_buf = realloc(sock->wbuf, sock->wbuf_len + len + 1);
424   if (new_buf == NULL)
425   {
426     RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
427     return -1;
428   }
429
430   strncpy(new_buf + sock->wbuf_len, str, len + 1);
431
432   sock->wbuf = new_buf;
433   sock->wbuf_len += len;
434
435   return 0;
436 } /* }}} static int add_to_wbuf */
437
438 /* add the text to the "extra" info that's sent after the status line */
439 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
440 {
441   va_list argp;
442   char buffer[CMD_MAX];
443   int len;
444
445   if (sock == NULL) return 0; /* journal replay mode */
446   if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
447
448   va_start(argp, fmt);
449 #ifdef HAVE_VSNPRINTF
450   len = vsnprintf(buffer, sizeof(buffer)-1, fmt, argp);
451 #else
452   len = vsprintf(buffer, fmt, argp);
453 #endif
454   va_end(argp);
455   if (len < 0)
456   {
457     RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
458     return -1;
459   }
460
461   return add_to_wbuf(sock, buffer, len);
462 } /* }}} static int add_response_info */
463
464 static int count_lines(char *str) /* {{{ */
465 {
466   int lines = 0;
467
468   if (str != NULL)
469   {
470     while ((str = strchr(str, '\n')) != NULL)
471     {
472       ++lines;
473       ++str;
474     }
475   }
476
477   return lines;
478 } /* }}} static int count_lines */
479
480 /* send the response back to the user.
481  * returns 0 on success, -1 on error
482  * write buffer is always zeroed after this call */
483 static int send_response (listen_socket_t *sock, response_code rc,
484                           char *fmt, ...) /* {{{ */
485 {
486   va_list argp;
487   char buffer[CMD_MAX];
488   int lines;
489   ssize_t wrote;
490   int rclen, len;
491
492   if (sock == NULL) return rc;  /* journal replay mode */
493
494   if (sock->batch_start)
495   {
496     if (rc == RESP_OK)
497       return rc; /* no response on success during BATCH */
498     lines = sock->batch_cmd;
499   }
500   else if (rc == RESP_OK)
501     lines = count_lines(sock->wbuf);
502   else
503     lines = -1;
504
505   rclen = sprintf(buffer, "%d ", lines);
506   va_start(argp, fmt);
507 #ifdef HAVE_VSNPRINTF
508   len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen-1, fmt, argp);
509 #else
510   len = vsprintf(buffer+rclen, fmt, argp);
511 #endif
512   va_end(argp);
513   if (len < 0)
514     return -1;
515
516   len += rclen;
517
518   /* append the result to the wbuf, don't write to the user */
519   if (sock->batch_start)
520     return add_to_wbuf(sock, buffer, len);
521
522   /* first write must be complete */
523   if (len != write(sock->fd, buffer, len))
524   {
525     RRDD_LOG(LOG_INFO, "send_response: could not write status message");
526     return -1;
527   }
528
529   if (sock->wbuf != NULL && rc == RESP_OK)
530   {
531     wrote = 0;
532     while (wrote < sock->wbuf_len)
533     {
534       ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
535       if (wb <= 0)
536       {
537         RRDD_LOG(LOG_INFO, "send_response: could not write results");
538         return -1;
539       }
540       wrote += wb;
541     }
542   }
543
544   free(sock->wbuf); sock->wbuf = NULL;
545   sock->wbuf_len = 0;
546
547   return 0;
548 } /* }}} */
549
550 static void wipe_ci_values(cache_item_t *ci, time_t when)
551 {
552   ci->values = NULL;
553   ci->values_num = 0;
554
555   ci->last_flush_time = when;
556   if (config_write_jitter > 0)
557     ci->last_flush_time += (random() % config_write_jitter);
558 }
559
560 /* remove_from_queue
561  * remove a "cache_item_t" item from the queue.
562  * must hold 'cache_lock' when calling this
563  */
564 static void remove_from_queue(cache_item_t *ci) /* {{{ */
565 {
566   if (ci == NULL) return;
567
568   if (ci->prev == NULL)
569     cache_queue_head = ci->next; /* reset head */
570   else
571     ci->prev->next = ci->next;
572
573   if (ci->next == NULL)
574     cache_queue_tail = ci->prev; /* reset the tail */
575   else
576     ci->next->prev = ci->prev;
577
578   ci->next = ci->prev = NULL;
579   ci->flags &= ~CI_FLAGS_IN_QUEUE;
580 } /* }}} static void remove_from_queue */
581
582 /* remove an entry from the tree and free all its resources.
583  * must hold 'cache lock' while calling this.
584  * returns 0 on success, otherwise errno */
585 static int forget_file(const char *file)
586 {
587   cache_item_t *ci;
588
589   ci = g_tree_lookup(cache_tree, file);
590   if (ci == NULL)
591     return ENOENT;
592
593   g_tree_remove (cache_tree, file);
594   remove_from_queue(ci);
595
596   for (int i=0; i < ci->values_num; i++)
597     free(ci->values[i]);
598
599   free (ci->values);
600   free (ci->file);
601
602   /* in case anyone is waiting */
603   pthread_cond_broadcast(&ci->flushed);
604
605   free (ci);
606
607   return 0;
608 } /* }}} static int forget_file */
609
610 /*
611  * enqueue_cache_item:
612  * `cache_lock' must be acquired before calling this function!
613  */
614 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
615     queue_side_t side)
616 {
617   if (ci == NULL)
618     return (-1);
619
620   if (ci->values_num == 0)
621     return (0);
622
623   if (side == HEAD)
624   {
625     if (cache_queue_head == ci)
626       return 0;
627
628     /* remove from the double linked list */
629     if (ci->flags & CI_FLAGS_IN_QUEUE)
630       remove_from_queue(ci);
631
632     ci->prev = NULL;
633     ci->next = cache_queue_head;
634     if (ci->next != NULL)
635       ci->next->prev = ci;
636     cache_queue_head = ci;
637
638     if (cache_queue_tail == NULL)
639       cache_queue_tail = cache_queue_head;
640   }
641   else /* (side == TAIL) */
642   {
643     /* We don't move values back in the list.. */
644     if (ci->flags & CI_FLAGS_IN_QUEUE)
645       return (0);
646
647     assert (ci->next == NULL);
648     assert (ci->prev == NULL);
649
650     ci->prev = cache_queue_tail;
651
652     if (cache_queue_tail == NULL)
653       cache_queue_head = ci;
654     else
655       cache_queue_tail->next = ci;
656
657     cache_queue_tail = ci;
658   }
659
660   ci->flags |= CI_FLAGS_IN_QUEUE;
661
662   pthread_cond_broadcast(&cache_cond);
663   pthread_mutex_lock (&stats_lock);
664   stats_queue_length++;
665   pthread_mutex_unlock (&stats_lock);
666
667   return (0);
668 } /* }}} int enqueue_cache_item */
669
670 /*
671  * tree_callback_flush:
672  * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
673  * while this is in progress.
674  */
675 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
676     gpointer data)
677 {
678   cache_item_t *ci;
679   callback_flush_data_t *cfd;
680
681   ci = (cache_item_t *) value;
682   cfd = (callback_flush_data_t *) data;
683
684   if ((ci->last_flush_time <= cfd->abs_timeout)
685       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
686       && (ci->values_num > 0))
687   {
688     enqueue_cache_item (ci, TAIL);
689   }
690   else if ((do_shutdown != 0)
691       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
692       && (ci->values_num > 0))
693   {
694     enqueue_cache_item (ci, TAIL);
695   }
696   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
697       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
698       && (ci->values_num <= 0))
699   {
700     char **temp;
701
702     temp = (char **) realloc (cfd->keys,
703         sizeof (char *) * (cfd->keys_num + 1));
704     if (temp == NULL)
705     {
706       RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed.");
707       return (FALSE);
708     }
709     cfd->keys = temp;
710     /* Make really sure this points to the _same_ place */
711     assert ((char *) key == ci->file);
712     cfd->keys[cfd->keys_num] = (char *) key;
713     cfd->keys_num++;
714   }
715
716   return (FALSE);
717 } /* }}} gboolean tree_callback_flush */
718
719 static int flush_old_values (int max_age)
720 {
721   callback_flush_data_t cfd;
722   size_t k;
723
724   memset (&cfd, 0, sizeof (cfd));
725   /* Pass the current time as user data so that we don't need to call
726    * `time' for each node. */
727   cfd.now = time (NULL);
728   cfd.keys = NULL;
729   cfd.keys_num = 0;
730
731   if (max_age > 0)
732     cfd.abs_timeout = cfd.now - max_age;
733   else
734     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
735
736   /* `tree_callback_flush' will return the keys of all values that haven't
737    * been touched in the last `config_flush_interval' seconds in `cfd'.
738    * The char*'s in this array point to the same memory as ci->file, so we
739    * don't need to free them separately. */
740   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
741
742   for (k = 0; k < cfd.keys_num; k++)
743   {
744     /* should never fail, since we have held the cache_lock
745      * the entire time */
746     assert( forget_file(cfd.keys[k]) == 0 );
747   }
748
749   if (cfd.keys != NULL)
750   {
751     free (cfd.keys);
752     cfd.keys = NULL;
753   }
754
755   return (0);
756 } /* int flush_old_values */
757
758 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
759 {
760   struct timeval now;
761   struct timespec next_flush;
762   int final_flush = 0; /* make sure we only flush once on shutdown */
763
764   gettimeofday (&now, NULL);
765   next_flush.tv_sec = now.tv_sec + config_flush_interval;
766   next_flush.tv_nsec = 1000 * now.tv_usec;
767
768   pthread_mutex_lock (&cache_lock);
769   while ((do_shutdown == 0) || (cache_queue_head != NULL))
770   {
771     cache_item_t *ci;
772     char *file;
773     char **values;
774     int values_num;
775     int status;
776     int i;
777
778     /* First, check if it's time to do the cache flush. */
779     gettimeofday (&now, NULL);
780     if ((now.tv_sec > next_flush.tv_sec)
781         || ((now.tv_sec == next_flush.tv_sec)
782           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
783     {
784       /* Flush all values that haven't been written in the last
785        * `config_write_interval' seconds. */
786       flush_old_values (config_write_interval);
787
788       /* Determine the time of the next cache flush. */
789       next_flush.tv_sec =
790         now.tv_sec + next_flush.tv_sec % config_flush_interval;
791
792       /* unlock the cache while we rotate so we don't block incoming
793        * updates if the fsync() blocks on disk I/O */
794       pthread_mutex_unlock(&cache_lock);
795       journal_rotate();
796       pthread_mutex_lock(&cache_lock);
797     }
798
799     /* Now, check if there's something to store away. If not, wait until
800      * something comes in or it's time to do the cache flush.  if we are
801      * shutting down, do not wait around.  */
802     if (cache_queue_head == NULL && !do_shutdown)
803     {
804       status = pthread_cond_timedwait (&cache_cond, &cache_lock, &next_flush);
805       if ((status != 0) && (status != ETIMEDOUT))
806       {
807         RRDD_LOG (LOG_ERR, "queue_thread_main: "
808             "pthread_cond_timedwait returned %i.", status);
809       }
810     }
811
812     /* We're about to shut down */
813     if (do_shutdown != 0 && !final_flush++)
814     {
815       if (config_flush_at_shutdown)
816         flush_old_values (-1); /* flush everything */
817       else
818         break;
819     }
820
821     /* Check if a value has arrived. This may be NULL if we timed out or there
822      * was an interrupt such as a signal. */
823     if (cache_queue_head == NULL)
824       continue;
825
826     ci = cache_queue_head;
827
828     /* copy the relevant parts */
829     file = strdup (ci->file);
830     if (file == NULL)
831     {
832       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
833       continue;
834     }
835
836     assert(ci->values != NULL);
837     assert(ci->values_num > 0);
838
839     values = ci->values;
840     values_num = ci->values_num;
841
842     wipe_ci_values(ci, time(NULL));
843     remove_from_queue(ci);
844
845     pthread_mutex_lock (&stats_lock);
846     assert (stats_queue_length > 0);
847     stats_queue_length--;
848     pthread_mutex_unlock (&stats_lock);
849
850     pthread_mutex_unlock (&cache_lock);
851
852     rrd_clear_error ();
853     status = rrd_update_r (file, NULL, values_num, (void *) values);
854     if (status != 0)
855     {
856       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
857           "rrd_update_r (%s) failed with status %i. (%s)",
858           file, status, rrd_get_error());
859     }
860
861     journal_write("wrote", file);
862     pthread_cond_broadcast(&ci->flushed);
863
864     for (i = 0; i < values_num; i++)
865       free (values[i]);
866
867     free(values);
868     free(file);
869
870     if (status == 0)
871     {
872       pthread_mutex_lock (&stats_lock);
873       stats_updates_written++;
874       stats_data_sets_written += values_num;
875       pthread_mutex_unlock (&stats_lock);
876     }
877
878     pthread_mutex_lock (&cache_lock);
879
880     /* We're about to shut down */
881     if (do_shutdown != 0 && !final_flush++)
882     {
883       if (config_flush_at_shutdown)
884           flush_old_values (-1); /* flush everything */
885       else
886         break;
887     }
888   } /* while ((do_shutdown == 0) || (cache_queue_head != NULL)) */
889   pthread_mutex_unlock (&cache_lock);
890
891   if (config_flush_at_shutdown)
892   {
893     assert(cache_queue_head == NULL);
894     RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
895   }
896
897   journal_done();
898
899   return (NULL);
900 } /* }}} void *queue_thread_main */
901
902 static int buffer_get_field (char **buffer_ret, /* {{{ */
903     size_t *buffer_size_ret, char **field_ret)
904 {
905   char *buffer;
906   size_t buffer_pos;
907   size_t buffer_size;
908   char *field;
909   size_t field_size;
910   int status;
911
912   buffer = *buffer_ret;
913   buffer_pos = 0;
914   buffer_size = *buffer_size_ret;
915   field = *buffer_ret;
916   field_size = 0;
917
918   if (buffer_size <= 0)
919     return (-1);
920
921   /* This is ensured by `handle_request'. */
922   assert (buffer[buffer_size - 1] == '\0');
923
924   status = -1;
925   while (buffer_pos < buffer_size)
926   {
927     /* Check for end-of-field or end-of-buffer */
928     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
929     {
930       field[field_size] = 0;
931       field_size++;
932       buffer_pos++;
933       status = 0;
934       break;
935     }
936     /* Handle escaped characters. */
937     else if (buffer[buffer_pos] == '\\')
938     {
939       if (buffer_pos >= (buffer_size - 1))
940         break;
941       buffer_pos++;
942       field[field_size] = buffer[buffer_pos];
943       field_size++;
944       buffer_pos++;
945     }
946     /* Normal operation */ 
947     else
948     {
949       field[field_size] = buffer[buffer_pos];
950       field_size++;
951       buffer_pos++;
952     }
953   } /* while (buffer_pos < buffer_size) */
954
955   if (status != 0)
956     return (status);
957
958   *buffer_ret = buffer + buffer_pos;
959   *buffer_size_ret = buffer_size - buffer_pos;
960   *field_ret = field;
961
962   return (0);
963 } /* }}} int buffer_get_field */
964
965 /* if we're restricting writes to the base directory,
966  * check whether the file falls within the dir
967  * returns 1 if OK, otherwise 0
968  */
969 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
970 {
971   assert(file != NULL);
972
973   if (!config_write_base_only
974       || sock == NULL /* journal replay */
975       || config_base_dir == NULL)
976     return 1;
977
978   if (strstr(file, "../") != NULL) goto err;
979
980   /* relative paths without "../" are ok */
981   if (*file != '/') return 1;
982
983   /* file must be of the format base + "/" + <1+ char filename> */
984   if (strlen(file) < _config_base_dir_len + 2) goto err;
985   if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
986   if (*(file + _config_base_dir_len) != '/') goto err;
987
988   return 1;
989
990 err:
991   if (sock != NULL && sock->fd >= 0)
992     send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
993
994   return 0;
995 } /* }}} static int check_file_access */
996
997 /* when using a base dir, convert relative paths to absolute paths.
998  * if necessary, modifies the "filename" pointer to point
999  * to the new path created in "tmp".  "tmp" is provided
1000  * by the caller and sizeof(tmp) must be >= PATH_MAX.
1001  *
1002  * this allows us to optimize for the expected case (absolute path)
1003  * with a no-op.
1004  */
1005 static void get_abs_path(char **filename, char *tmp)
1006 {
1007   assert(tmp != NULL);
1008   assert(filename != NULL && *filename != NULL);
1009
1010   if (config_base_dir == NULL || **filename == '/')
1011     return;
1012
1013   snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1014   *filename = tmp;
1015 } /* }}} static int get_abs_path */
1016
1017 /* returns 1 if we have the required privilege level,
1018  * otherwise issue an error to the user on sock */
1019 static int has_privilege (listen_socket_t *sock, /* {{{ */
1020                           socket_privilege priv)
1021 {
1022   if (sock == NULL) /* journal replay */
1023     return 1;
1024
1025   if (sock->privilege >= priv)
1026     return 1;
1027
1028   return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1029 } /* }}} static int has_privilege */
1030
1031 static int flush_file (const char *filename) /* {{{ */
1032 {
1033   cache_item_t *ci;
1034
1035   pthread_mutex_lock (&cache_lock);
1036
1037   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1038   if (ci == NULL)
1039   {
1040     pthread_mutex_unlock (&cache_lock);
1041     return (ENOENT);
1042   }
1043
1044   if (ci->values_num > 0)
1045   {
1046     /* Enqueue at head */
1047     enqueue_cache_item (ci, HEAD);
1048     pthread_cond_wait(&ci->flushed, &cache_lock);
1049   }
1050
1051   /* DO NOT DO ANYTHING WITH ci HERE!!  The entry
1052    * may have been purged during our cond_wait() */
1053
1054   pthread_mutex_unlock(&cache_lock);
1055
1056   return (0);
1057 } /* }}} int flush_file */
1058
1059 static int handle_request_help (listen_socket_t *sock, /* {{{ */
1060     char *buffer, size_t buffer_size)
1061 {
1062   int status;
1063   char **help_text;
1064   char *command;
1065
1066   char *help_help[2] =
1067   {
1068     "Command overview\n"
1069     ,
1070     "HELP [<command>]\n"
1071     "FLUSH <filename>\n"
1072     "FLUSHALL\n"
1073     "PENDING <filename>\n"
1074     "FORGET <filename>\n"
1075     "UPDATE <filename> <values> [<values> ...]\n"
1076     "BATCH\n"
1077     "STATS\n"
1078   };
1079
1080   char *help_flush[2] =
1081   {
1082     "Help for FLUSH\n"
1083     ,
1084     "Usage: FLUSH <filename>\n"
1085     "\n"
1086     "Adds the given filename to the head of the update queue and returns\n"
1087     "after is has been dequeued.\n"
1088   };
1089
1090   char *help_flushall[2] =
1091   {
1092     "Help for FLUSHALL\n"
1093     ,
1094     "Usage: FLUSHALL\n"
1095     "\n"
1096     "Triggers writing of all pending updates.  Returns immediately.\n"
1097   };
1098
1099   char *help_pending[2] =
1100   {
1101     "Help for PENDING\n"
1102     ,
1103     "Usage: PENDING <filename>\n"
1104     "\n"
1105     "Shows any 'pending' updates for a file, in order.\n"
1106     "The updates shown have not yet been written to the underlying RRD file.\n"
1107   };
1108
1109   char *help_forget[2] =
1110   {
1111     "Help for FORGET\n"
1112     ,
1113     "Usage: FORGET <filename>\n"
1114     "\n"
1115     "Removes the file completely from the cache.\n"
1116     "Any pending updates for the file will be lost.\n"
1117   };
1118
1119   char *help_update[2] =
1120   {
1121     "Help for UPDATE\n"
1122     ,
1123     "Usage: UPDATE <filename> <values> [<values> ...]\n"
1124     "\n"
1125     "Adds the given file to the internal cache if it is not yet known and\n"
1126     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1127     "for details.\n"
1128     "\n"
1129     "Each <values> has the following form:\n"
1130     "  <values> = <time>:<value>[:<value>[...]]\n"
1131     "See the rrdupdate(1) manpage for details.\n"
1132   };
1133
1134   char *help_stats[2] =
1135   {
1136     "Help for STATS\n"
1137     ,
1138     "Usage: STATS\n"
1139     "\n"
1140     "Returns some performance counters, see the rrdcached(1) manpage for\n"
1141     "a description of the values.\n"
1142   };
1143
1144   char *help_batch[2] =
1145   {
1146     "Help for BATCH\n"
1147     ,
1148     "The 'BATCH' command permits the client to initiate a bulk load\n"
1149     "   of commands to rrdcached.\n"
1150     "\n"
1151     "Usage:\n"
1152     "\n"
1153     "    client: BATCH\n"
1154     "    server: 0 Go ahead.  End with dot '.' on its own line.\n"
1155     "    client: command #1\n"
1156     "    client: command #2\n"
1157     "    client: ... and so on\n"
1158     "    client: .\n"
1159     "    server: 2 errors\n"
1160     "    server: 7 message for command #7\n"
1161     "    server: 9 message for command #9\n"
1162     "\n"
1163     "For more information, consult the rrdcached(1) documentation.\n"
1164   };
1165
1166   status = buffer_get_field (&buffer, &buffer_size, &command);
1167   if (status != 0)
1168     help_text = help_help;
1169   else
1170   {
1171     if (strcasecmp (command, "update") == 0)
1172       help_text = help_update;
1173     else if (strcasecmp (command, "flush") == 0)
1174       help_text = help_flush;
1175     else if (strcasecmp (command, "flushall") == 0)
1176       help_text = help_flushall;
1177     else if (strcasecmp (command, "pending") == 0)
1178       help_text = help_pending;
1179     else if (strcasecmp (command, "forget") == 0)
1180       help_text = help_forget;
1181     else if (strcasecmp (command, "stats") == 0)
1182       help_text = help_stats;
1183     else if (strcasecmp (command, "batch") == 0)
1184       help_text = help_batch;
1185     else
1186       help_text = help_help;
1187   }
1188
1189   add_response_info(sock, help_text[1]);
1190   return send_response(sock, RESP_OK, help_text[0]);
1191 } /* }}} int handle_request_help */
1192
1193 static int handle_request_stats (listen_socket_t *sock) /* {{{ */
1194 {
1195   uint64_t copy_queue_length;
1196   uint64_t copy_updates_received;
1197   uint64_t copy_flush_received;
1198   uint64_t copy_updates_written;
1199   uint64_t copy_data_sets_written;
1200   uint64_t copy_journal_bytes;
1201   uint64_t copy_journal_rotate;
1202
1203   uint64_t tree_nodes_number;
1204   uint64_t tree_depth;
1205
1206   pthread_mutex_lock (&stats_lock);
1207   copy_queue_length       = stats_queue_length;
1208   copy_updates_received   = stats_updates_received;
1209   copy_flush_received     = stats_flush_received;
1210   copy_updates_written    = stats_updates_written;
1211   copy_data_sets_written  = stats_data_sets_written;
1212   copy_journal_bytes      = stats_journal_bytes;
1213   copy_journal_rotate     = stats_journal_rotate;
1214   pthread_mutex_unlock (&stats_lock);
1215
1216   pthread_mutex_lock (&cache_lock);
1217   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1218   tree_depth        = (uint64_t) g_tree_height (cache_tree);
1219   pthread_mutex_unlock (&cache_lock);
1220
1221   add_response_info(sock,
1222                     "QueueLength: %"PRIu64"\n", copy_queue_length);
1223   add_response_info(sock,
1224                     "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1225   add_response_info(sock,
1226                     "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1227   add_response_info(sock,
1228                     "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1229   add_response_info(sock,
1230                     "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1231   add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1232   add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1233   add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1234   add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1235
1236   send_response(sock, RESP_OK, "Statistics follow\n");
1237
1238   return (0);
1239 } /* }}} int handle_request_stats */
1240
1241 static int handle_request_flush (listen_socket_t *sock, /* {{{ */
1242     char *buffer, size_t buffer_size)
1243 {
1244   char *file, file_tmp[PATH_MAX];
1245   int status;
1246
1247   status = buffer_get_field (&buffer, &buffer_size, &file);
1248   if (status != 0)
1249   {
1250     return send_response(sock, RESP_ERR, "Usage: flush <filename>\n");
1251   }
1252   else
1253   {
1254     pthread_mutex_lock(&stats_lock);
1255     stats_flush_received++;
1256     pthread_mutex_unlock(&stats_lock);
1257
1258     get_abs_path(&file, file_tmp);
1259     if (!check_file_access(file, sock)) return 0;
1260
1261     status = flush_file (file);
1262     if (status == 0)
1263       return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1264     else if (status == ENOENT)
1265     {
1266       /* no file in our tree; see whether it exists at all */
1267       struct stat statbuf;
1268
1269       memset(&statbuf, 0, sizeof(statbuf));
1270       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1271         return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1272       else
1273         return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1274     }
1275     else if (status < 0)
1276       return send_response(sock, RESP_ERR, "Internal error.\n");
1277     else
1278       return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1279   }
1280
1281   /* NOTREACHED */
1282   assert(1==0);
1283 } /* }}} int handle_request_flush */
1284
1285 static int handle_request_flushall(listen_socket_t *sock) /* {{{ */
1286 {
1287   int status;
1288
1289   status = has_privilege(sock, PRIV_HIGH);
1290   if (status <= 0)
1291     return status;
1292
1293   RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1294
1295   pthread_mutex_lock(&cache_lock);
1296   flush_old_values(-1);
1297   pthread_mutex_unlock(&cache_lock);
1298
1299   return send_response(sock, RESP_OK, "Started flush.\n");
1300 } /* }}} static int handle_request_flushall */
1301
1302 static int handle_request_pending(listen_socket_t *sock, /* {{{ */
1303                                   char *buffer, size_t buffer_size)
1304 {
1305   int status;
1306   char *file, file_tmp[PATH_MAX];
1307   cache_item_t *ci;
1308
1309   status = buffer_get_field(&buffer, &buffer_size, &file);
1310   if (status != 0)
1311     return send_response(sock, RESP_ERR,
1312                          "Usage: PENDING <filename>\n");
1313
1314   status = has_privilege(sock, PRIV_HIGH);
1315   if (status <= 0)
1316     return status;
1317
1318   get_abs_path(&file, file_tmp);
1319
1320   pthread_mutex_lock(&cache_lock);
1321   ci = g_tree_lookup(cache_tree, file);
1322   if (ci == NULL)
1323   {
1324     pthread_mutex_unlock(&cache_lock);
1325     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1326   }
1327
1328   for (int i=0; i < ci->values_num; i++)
1329     add_response_info(sock, "%s\n", ci->values[i]);
1330
1331   pthread_mutex_unlock(&cache_lock);
1332   return send_response(sock, RESP_OK, "updates pending\n");
1333 } /* }}} static int handle_request_pending */
1334
1335 static int handle_request_forget(listen_socket_t *sock, /* {{{ */
1336                                  char *buffer, size_t buffer_size)
1337 {
1338   int status;
1339   char *file, file_tmp[PATH_MAX];
1340
1341   status = buffer_get_field(&buffer, &buffer_size, &file);
1342   if (status != 0)
1343     return send_response(sock, RESP_ERR,
1344                          "Usage: FORGET <filename>\n");
1345
1346   status = has_privilege(sock, PRIV_HIGH);
1347   if (status <= 0)
1348     return status;
1349
1350   get_abs_path(&file, file_tmp);
1351   if (!check_file_access(file, sock)) return 0;
1352
1353   pthread_mutex_lock(&cache_lock);
1354   status = forget_file(file);
1355   pthread_mutex_unlock(&cache_lock);
1356
1357   if (status == 0)
1358   {
1359     if (sock != NULL)
1360       journal_write("forget", file);
1361
1362     return send_response(sock, RESP_OK, "Gone!\n");
1363   }
1364   else
1365     return send_response(sock, RESP_ERR, "cannot forget: %s\n",
1366                          status < 0 ? "Internal error" : rrd_strerror(status));
1367
1368   /* NOTREACHED */
1369   assert(1==0);
1370 } /* }}} static int handle_request_forget */
1371
1372 static int handle_request_update (listen_socket_t *sock, /* {{{ */
1373                                   time_t now,
1374                                   char *buffer, size_t buffer_size)
1375 {
1376   char *file, file_tmp[PATH_MAX];
1377   int values_num = 0;
1378   int bad_timestamps = 0;
1379   int status;
1380   char orig_buf[CMD_MAX];
1381
1382   cache_item_t *ci;
1383
1384   status = has_privilege(sock, PRIV_HIGH);
1385   if (status <= 0)
1386     return status;
1387
1388   /* save it for the journal later */
1389   strncpy(orig_buf, buffer, sizeof(orig_buf)-1);
1390
1391   status = buffer_get_field (&buffer, &buffer_size, &file);
1392   if (status != 0)
1393     return send_response(sock, RESP_ERR,
1394                          "Usage: UPDATE <filename> <values> [<values> ...]\n");
1395
1396   pthread_mutex_lock(&stats_lock);
1397   stats_updates_received++;
1398   pthread_mutex_unlock(&stats_lock);
1399
1400   get_abs_path(&file, file_tmp);
1401   if (!check_file_access(file, sock)) return 0;
1402
1403   pthread_mutex_lock (&cache_lock);
1404   ci = g_tree_lookup (cache_tree, file);
1405
1406   if (ci == NULL) /* {{{ */
1407   {
1408     struct stat statbuf;
1409
1410     /* don't hold the lock while we setup; stat(2) might block */
1411     pthread_mutex_unlock(&cache_lock);
1412
1413     memset (&statbuf, 0, sizeof (statbuf));
1414     status = stat (file, &statbuf);
1415     if (status != 0)
1416     {
1417       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1418
1419       status = errno;
1420       if (status == ENOENT)
1421         return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1422       else
1423         return send_response(sock, RESP_ERR,
1424                              "stat failed with error %i.\n", status);
1425     }
1426     if (!S_ISREG (statbuf.st_mode))
1427       return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1428
1429     if (access(file, R_OK|W_OK) != 0)
1430       return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1431                            file, rrd_strerror(errno));
1432
1433     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1434     if (ci == NULL)
1435     {
1436       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1437
1438       return send_response(sock, RESP_ERR, "malloc failed.\n");
1439     }
1440     memset (ci, 0, sizeof (cache_item_t));
1441
1442     ci->file = strdup (file);
1443     if (ci->file == NULL)
1444     {
1445       free (ci);
1446       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1447
1448       return send_response(sock, RESP_ERR, "strdup failed.\n");
1449     }
1450
1451     wipe_ci_values(ci, now);
1452     ci->flags = CI_FLAGS_IN_TREE;
1453
1454     pthread_mutex_lock(&cache_lock);
1455     g_tree_insert (cache_tree, (void *) ci->file, (void *) ci);
1456   } /* }}} */
1457   assert (ci != NULL);
1458
1459   /* don't re-write updates in replay mode */
1460   if (sock != NULL)
1461     journal_write("update", orig_buf);
1462
1463   while (buffer_size > 0)
1464   {
1465     char **temp;
1466     char *value;
1467     time_t stamp;
1468     char *eostamp;
1469
1470     status = buffer_get_field (&buffer, &buffer_size, &value);
1471     if (status != 0)
1472     {
1473       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1474       break;
1475     }
1476
1477     /* make sure update time is always moving forward */
1478     stamp = strtol(value, &eostamp, 10);
1479     if (eostamp == value || eostamp == NULL || *eostamp != ':')
1480     {
1481       ++bad_timestamps;
1482       add_response_info(sock, "Cannot find timestamp in '%s'!\n", value);
1483       continue;
1484     }
1485     else if (stamp <= ci->last_update_stamp)
1486     {
1487       ++bad_timestamps;
1488       add_response_info(sock,
1489                         "illegal attempt to update using time %ld when"
1490                         " last update time is %ld (minimum one second step)\n",
1491                         stamp, ci->last_update_stamp);
1492       continue;
1493     }
1494     else
1495       ci->last_update_stamp = stamp;
1496
1497     temp = (char **) realloc (ci->values,
1498         sizeof (char *) * (ci->values_num + 1));
1499     if (temp == NULL)
1500     {
1501       RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
1502       continue;
1503     }
1504     ci->values = temp;
1505
1506     ci->values[ci->values_num] = strdup (value);
1507     if (ci->values[ci->values_num] == NULL)
1508     {
1509       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1510       continue;
1511     }
1512     ci->values_num++;
1513
1514     values_num++;
1515   }
1516
1517   if (((now - ci->last_flush_time) >= config_write_interval)
1518       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1519       && (ci->values_num > 0))
1520   {
1521     enqueue_cache_item (ci, TAIL);
1522   }
1523
1524   pthread_mutex_unlock (&cache_lock);
1525
1526   if (values_num < 1)
1527   {
1528     /* journal replay mode */
1529     if (sock == NULL) return RESP_ERR;
1530
1531     /* if we had only one update attempt, then return the full
1532        error message... try to get the most information out
1533        of the limited error space allowed by the protocol
1534     */
1535     if (bad_timestamps == 1)
1536       return send_response(sock, RESP_ERR, "%s", sock->wbuf);
1537     else
1538       return send_response(sock, RESP_ERR,
1539                            "No values updated (%d bad timestamps).\n",
1540                            bad_timestamps);
1541   }
1542   else
1543     return send_response(sock, RESP_OK,
1544                          "errors, enqueued %i value(s).\n", values_num);
1545
1546   /* NOTREACHED */
1547   assert(1==0);
1548
1549 } /* }}} int handle_request_update */
1550
1551 /* we came across a "WROTE" entry during journal replay.
1552  * throw away any values that we have accumulated for this file
1553  */
1554 static int handle_request_wrote (const char *buffer, time_t now) /* {{{ */
1555 {
1556   int i;
1557   cache_item_t *ci;
1558   const char *file = buffer;
1559
1560   pthread_mutex_lock(&cache_lock);
1561
1562   ci = g_tree_lookup(cache_tree, file);
1563   if (ci == NULL)
1564   {
1565     pthread_mutex_unlock(&cache_lock);
1566     return (0);
1567   }
1568
1569   if (ci->values)
1570   {
1571     for (i=0; i < ci->values_num; i++)
1572       free(ci->values[i]);
1573
1574     free(ci->values);
1575   }
1576
1577   wipe_ci_values(ci, now);
1578   remove_from_queue(ci);
1579
1580   pthread_mutex_unlock(&cache_lock);
1581   return (0);
1582 } /* }}} int handle_request_wrote */
1583
1584 /* start "BATCH" processing */
1585 static int batch_start (listen_socket_t *sock) /* {{{ */
1586 {
1587   int status;
1588   if (sock->batch_start)
1589     return send_response(sock, RESP_ERR, "Already in BATCH\n");
1590
1591   status = send_response(sock, RESP_OK,
1592                          "Go ahead.  End with dot '.' on its own line.\n");
1593   sock->batch_start = time(NULL);
1594   sock->batch_cmd = 0;
1595
1596   return status;
1597 } /* }}} static int batch_start */
1598
1599 /* finish "BATCH" processing and return results to the client */
1600 static int batch_done (listen_socket_t *sock) /* {{{ */
1601 {
1602   assert(sock->batch_start);
1603   sock->batch_start = 0;
1604   sock->batch_cmd  = 0;
1605   return send_response(sock, RESP_OK, "errors\n");
1606 } /* }}} static int batch_done */
1607
1608 /* if sock==NULL, we are in journal replay mode */
1609 static int handle_request (listen_socket_t *sock, /* {{{ */
1610                            time_t now,
1611                            char *buffer, size_t buffer_size)
1612 {
1613   char *buffer_ptr;
1614   char *command;
1615   int status;
1616
1617   assert (buffer[buffer_size - 1] == '\0');
1618
1619   buffer_ptr = buffer;
1620   command = NULL;
1621   status = buffer_get_field (&buffer_ptr, &buffer_size, &command);
1622   if (status != 0)
1623   {
1624     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1625     return (-1);
1626   }
1627
1628   if (sock != NULL && sock->batch_start)
1629     sock->batch_cmd++;
1630
1631   if (strcasecmp (command, "update") == 0)
1632     return (handle_request_update (sock, now, buffer_ptr, buffer_size));
1633   else if (strcasecmp (command, "wrote") == 0 && sock == NULL)
1634   {
1635     /* this is only valid in replay mode */
1636     return (handle_request_wrote (buffer_ptr, now));
1637   }
1638   else if (strcasecmp (command, "flush") == 0)
1639     return (handle_request_flush (sock, buffer_ptr, buffer_size));
1640   else if (strcasecmp (command, "flushall") == 0)
1641     return (handle_request_flushall(sock));
1642   else if (strcasecmp (command, "pending") == 0)
1643     return (handle_request_pending(sock, buffer_ptr, buffer_size));
1644   else if (strcasecmp (command, "forget") == 0)
1645     return (handle_request_forget(sock, buffer_ptr, buffer_size));
1646   else if (strcasecmp (command, "stats") == 0)
1647     return (handle_request_stats (sock));
1648   else if (strcasecmp (command, "help") == 0)
1649     return (handle_request_help (sock, buffer_ptr, buffer_size));
1650   else if (strcasecmp (command, "batch") == 0 && sock != NULL)
1651     return batch_start(sock);
1652   else if (strcasecmp (command, ".") == 0 && sock != NULL && sock->batch_start)
1653     return batch_done(sock);
1654   else
1655     return send_response(sock, RESP_ERR, "Unknown command: %s\n", command);
1656
1657   /* NOTREACHED */
1658   assert(1==0);
1659 } /* }}} int handle_request */
1660
1661 /* MUST NOT hold journal_lock before calling this */
1662 static void journal_rotate(void) /* {{{ */
1663 {
1664   FILE *old_fh = NULL;
1665   int new_fd;
1666
1667   if (journal_cur == NULL || journal_old == NULL)
1668     return;
1669
1670   pthread_mutex_lock(&journal_lock);
1671
1672   /* we rotate this way (rename before close) so that the we can release
1673    * the journal lock as fast as possible.  Journal writes to the new
1674    * journal can proceed immediately after the new file is opened.  The
1675    * fclose can then block without affecting new updates.
1676    */
1677   if (journal_fh != NULL)
1678   {
1679     old_fh = journal_fh;
1680     journal_fh = NULL;
1681     rename(journal_cur, journal_old);
1682     ++stats_journal_rotate;
1683   }
1684
1685   new_fd = open(journal_cur, O_WRONLY|O_CREAT|O_APPEND,
1686                 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1687   if (new_fd >= 0)
1688   {
1689     journal_fh = fdopen(new_fd, "a");
1690     if (journal_fh == NULL)
1691       close(new_fd);
1692   }
1693
1694   pthread_mutex_unlock(&journal_lock);
1695
1696   if (old_fh != NULL)
1697     fclose(old_fh);
1698
1699   if (journal_fh == NULL)
1700   {
1701     RRDD_LOG(LOG_CRIT,
1702              "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1703              journal_cur, rrd_strerror(errno));
1704
1705     RRDD_LOG(LOG_ERR,
1706              "JOURNALING DISABLED: All values will be flushed at shutdown");
1707     config_flush_at_shutdown = 1;
1708   }
1709
1710 } /* }}} static void journal_rotate */
1711
1712 static void journal_done(void) /* {{{ */
1713 {
1714   if (journal_cur == NULL)
1715     return;
1716
1717   pthread_mutex_lock(&journal_lock);
1718   if (journal_fh != NULL)
1719   {
1720     fclose(journal_fh);
1721     journal_fh = NULL;
1722   }
1723
1724   if (config_flush_at_shutdown)
1725   {
1726     RRDD_LOG(LOG_INFO, "removing journals");
1727     unlink(journal_old);
1728     unlink(journal_cur);
1729   }
1730   else
1731   {
1732     RRDD_LOG(LOG_INFO, "expedited shutdown; "
1733              "journals will be used at next startup");
1734   }
1735
1736   pthread_mutex_unlock(&journal_lock);
1737
1738 } /* }}} static void journal_done */
1739
1740 static int journal_write(char *cmd, char *args) /* {{{ */
1741 {
1742   int chars;
1743
1744   if (journal_fh == NULL)
1745     return 0;
1746
1747   pthread_mutex_lock(&journal_lock);
1748   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1749   pthread_mutex_unlock(&journal_lock);
1750
1751   if (chars > 0)
1752   {
1753     pthread_mutex_lock(&stats_lock);
1754     stats_journal_bytes += chars;
1755     pthread_mutex_unlock(&stats_lock);
1756   }
1757
1758   return chars;
1759 } /* }}} static int journal_write */
1760
1761 static int journal_replay (const char *file) /* {{{ */
1762 {
1763   FILE *fh;
1764   int entry_cnt = 0;
1765   int fail_cnt = 0;
1766   uint64_t line = 0;
1767   char entry[CMD_MAX];
1768   time_t now;
1769
1770   if (file == NULL) return 0;
1771
1772   {
1773     char *reason;
1774     int status = 0;
1775     struct stat statbuf;
1776
1777     memset(&statbuf, 0, sizeof(statbuf));
1778     if (stat(file, &statbuf) != 0)
1779     {
1780       if (errno == ENOENT)
1781         return 0;
1782
1783       reason = "stat error";
1784       status = errno;
1785     }
1786     else if (!S_ISREG(statbuf.st_mode))
1787     {
1788       reason = "not a regular file";
1789       status = EPERM;
1790     }
1791     if (statbuf.st_uid != daemon_uid)
1792     {
1793       reason = "not owned by daemon user";
1794       status = EACCES;
1795     }
1796     if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
1797     {
1798       reason = "must not be user/group writable";
1799       status = EACCES;
1800     }
1801
1802     if (status != 0)
1803     {
1804       RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
1805                file, rrd_strerror(status), reason);
1806       return 0;
1807     }
1808   }
1809
1810   fh = fopen(file, "r");
1811   if (fh == NULL)
1812   {
1813     if (errno != ENOENT)
1814       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1815                file, rrd_strerror(errno));
1816     return 0;
1817   }
1818   else
1819     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1820
1821   now = time(NULL);
1822
1823   while(!feof(fh))
1824   {
1825     size_t entry_len;
1826
1827     ++line;
1828     if (fgets(entry, sizeof(entry), fh) == NULL)
1829       break;
1830     entry_len = strlen(entry);
1831
1832     /* check \n termination in case journal writing crashed mid-line */
1833     if (entry_len == 0)
1834       continue;
1835     else if (entry[entry_len - 1] != '\n')
1836     {
1837       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1838       ++fail_cnt;
1839       continue;
1840     }
1841
1842     entry[entry_len - 1] = '\0';
1843
1844     if (handle_request(NULL, now, entry, entry_len) == 0)
1845       ++entry_cnt;
1846     else
1847       ++fail_cnt;
1848   }
1849
1850   fclose(fh);
1851
1852   RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1853            entry_cnt, fail_cnt);
1854
1855   return entry_cnt > 0 ? 1 : 0;
1856 } /* }}} static int journal_replay */
1857
1858 static void journal_init(void) /* {{{ */
1859 {
1860   int had_journal = 0;
1861
1862   if (journal_cur == NULL) return;
1863
1864   pthread_mutex_lock(&journal_lock);
1865
1866   RRDD_LOG(LOG_INFO, "checking for journal files");
1867
1868   had_journal += journal_replay(journal_old);
1869   had_journal += journal_replay(journal_cur);
1870
1871   /* it must have been a crash.  start a flush */
1872   if (had_journal && config_flush_at_shutdown)
1873     flush_old_values(-1);
1874
1875   pthread_mutex_unlock(&journal_lock);
1876   journal_rotate();
1877
1878   RRDD_LOG(LOG_INFO, "journal processing complete");
1879
1880 } /* }}} static void journal_init */
1881
1882 static void close_connection(listen_socket_t *sock)
1883 {
1884   close(sock->fd) ;  sock->fd   = -1;
1885   free(sock->rbuf);  sock->rbuf = NULL;
1886   free(sock->wbuf);  sock->wbuf = NULL;
1887
1888   free(sock);
1889 }
1890
1891 static void *connection_thread_main (void *args) /* {{{ */
1892 {
1893   pthread_t self;
1894   listen_socket_t *sock;
1895   int i;
1896   int fd;
1897
1898   sock = (listen_socket_t *) args;
1899   fd = sock->fd;
1900
1901   /* init read buffers */
1902   sock->next_read = sock->next_cmd = 0;
1903   sock->rbuf = malloc(RBUF_SIZE);
1904   if (sock->rbuf == NULL)
1905   {
1906     RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
1907     close_connection(sock);
1908     return NULL;
1909   }
1910
1911   pthread_mutex_lock (&connection_threads_lock);
1912   {
1913     pthread_t *temp;
1914
1915     temp = (pthread_t *) realloc (connection_threads,
1916         sizeof (pthread_t) * (connection_threads_num + 1));
1917     if (temp == NULL)
1918     {
1919       RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed.");
1920     }
1921     else
1922     {
1923       connection_threads = temp;
1924       connection_threads[connection_threads_num] = pthread_self ();
1925       connection_threads_num++;
1926     }
1927   }
1928   pthread_mutex_unlock (&connection_threads_lock);
1929
1930   while (do_shutdown == 0)
1931   {
1932     char *cmd;
1933     ssize_t cmd_len;
1934     ssize_t rbytes;
1935     time_t now;
1936
1937     struct pollfd pollfd;
1938     int status;
1939
1940     pollfd.fd = fd;
1941     pollfd.events = POLLIN | POLLPRI;
1942     pollfd.revents = 0;
1943
1944     status = poll (&pollfd, 1, /* timeout = */ 500);
1945     if (do_shutdown)
1946       break;
1947     else if (status == 0) /* timeout */
1948       continue;
1949     else if (status < 0) /* error */
1950     {
1951       status = errno;
1952       if (status != EINTR)
1953         RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
1954       continue;
1955     }
1956
1957     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
1958       break;
1959     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
1960     {
1961       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
1962           "poll(2) returned something unexpected: %#04hx",
1963           pollfd.revents);
1964       break;
1965     }
1966
1967     rbytes = read(fd, sock->rbuf + sock->next_read,
1968                   RBUF_SIZE - sock->next_read);
1969     if (rbytes < 0)
1970     {
1971       RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
1972       break;
1973     }
1974     else if (rbytes == 0)
1975       break; /* eof */
1976
1977     sock->next_read += rbytes;
1978
1979     if (sock->batch_start)
1980       now = sock->batch_start;
1981     else
1982       now = time(NULL);
1983
1984     while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
1985     {
1986       status = handle_request (sock, now, cmd, cmd_len+1);
1987       if (status != 0)
1988         goto out_close;
1989     }
1990   }
1991
1992 out_close:
1993   close_connection(sock);
1994
1995   self = pthread_self ();
1996   /* Remove this thread from the connection threads list */
1997   pthread_mutex_lock (&connection_threads_lock);
1998   /* Find out own index in the array */
1999   for (i = 0; i < connection_threads_num; i++)
2000     if (pthread_equal (connection_threads[i], self) != 0)
2001       break;
2002   assert (i < connection_threads_num);
2003
2004   /* Move the trailing threads forward. */
2005   if (i < (connection_threads_num - 1))
2006   {
2007     memmove (connection_threads + i,
2008         connection_threads + i + 1,
2009         sizeof (pthread_t) * (connection_threads_num - i - 1));
2010   }
2011
2012   connection_threads_num--;
2013   pthread_mutex_unlock (&connection_threads_lock);
2014
2015   return (NULL);
2016 } /* }}} void *connection_thread_main */
2017
2018 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2019 {
2020   int fd;
2021   struct sockaddr_un sa;
2022   listen_socket_t *temp;
2023   int status;
2024   const char *path;
2025
2026   path = sock->addr;
2027   if (strncmp(path, "unix:", strlen("unix:")) == 0)
2028     path += strlen("unix:");
2029
2030   temp = (listen_socket_t *) realloc (listen_fds,
2031       sizeof (listen_fds[0]) * (listen_fds_num + 1));
2032   if (temp == NULL)
2033   {
2034     fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2035     return (-1);
2036   }
2037   listen_fds = temp;
2038   memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2039
2040   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2041   if (fd < 0)
2042   {
2043     fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2044              rrd_strerror(errno));
2045     return (-1);
2046   }
2047
2048   memset (&sa, 0, sizeof (sa));
2049   sa.sun_family = AF_UNIX;
2050   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2051
2052   /* if we've gotten this far, we own the pid file.  any daemon started
2053    * with the same args must not be alive.  therefore, ensure that we can
2054    * create the socket...
2055    */
2056   unlink(path);
2057
2058   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2059   if (status != 0)
2060   {
2061     fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2062              path, rrd_strerror(errno));
2063     close (fd);
2064     return (-1);
2065   }
2066
2067   status = listen (fd, /* backlog = */ 10);
2068   if (status != 0)
2069   {
2070     fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2071              path, rrd_strerror(errno));
2072     close (fd);
2073     unlink (path);
2074     return (-1);
2075   }
2076
2077   listen_fds[listen_fds_num].fd = fd;
2078   listen_fds[listen_fds_num].family = PF_UNIX;
2079   strncpy(listen_fds[listen_fds_num].addr, path,
2080           sizeof (listen_fds[listen_fds_num].addr) - 1);
2081   listen_fds_num++;
2082
2083   return (0);
2084 } /* }}} int open_listen_socket_unix */
2085
2086 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2087 {
2088   struct addrinfo ai_hints;
2089   struct addrinfo *ai_res;
2090   struct addrinfo *ai_ptr;
2091   char addr_copy[NI_MAXHOST];
2092   char *addr;
2093   char *port;
2094   int status;
2095
2096   strncpy (addr_copy, sock->addr, sizeof (addr_copy));
2097   addr_copy[sizeof (addr_copy) - 1] = 0;
2098   addr = addr_copy;
2099
2100   memset (&ai_hints, 0, sizeof (ai_hints));
2101   ai_hints.ai_flags = 0;
2102 #ifdef AI_ADDRCONFIG
2103   ai_hints.ai_flags |= AI_ADDRCONFIG;
2104 #endif
2105   ai_hints.ai_family = AF_UNSPEC;
2106   ai_hints.ai_socktype = SOCK_STREAM;
2107
2108   port = NULL;
2109   if (*addr == '[') /* IPv6+port format */
2110   {
2111     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2112     addr++;
2113
2114     port = strchr (addr, ']');
2115     if (port == NULL)
2116     {
2117       fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
2118       return (-1);
2119     }
2120     *port = 0;
2121     port++;
2122
2123     if (*port == ':')
2124       port++;
2125     else if (*port == 0)
2126       port = NULL;
2127     else
2128     {
2129       fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
2130       return (-1);
2131     }
2132   } /* if (*addr = ']') */
2133   else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */
2134   {
2135     port = rindex(addr, ':');
2136     if (port != NULL)
2137     {
2138       *port = 0;
2139       port++;
2140     }
2141   }
2142   ai_res = NULL;
2143   status = getaddrinfo (addr,
2144                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2145                         &ai_hints, &ai_res);
2146   if (status != 0)
2147   {
2148     fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
2149              addr, gai_strerror (status));
2150     return (-1);
2151   }
2152
2153   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2154   {
2155     int fd;
2156     listen_socket_t *temp;
2157     int one = 1;
2158
2159     temp = (listen_socket_t *) realloc (listen_fds,
2160         sizeof (listen_fds[0]) * (listen_fds_num + 1));
2161     if (temp == NULL)
2162     {
2163       fprintf (stderr,
2164                "rrdcached: open_listen_socket_network: realloc failed.\n");
2165       continue;
2166     }
2167     listen_fds = temp;
2168     memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2169
2170     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2171     if (fd < 0)
2172     {
2173       fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
2174                rrd_strerror(errno));
2175       continue;
2176     }
2177
2178     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2179
2180     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2181     if (status != 0)
2182     {
2183       fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2184                sock->addr, rrd_strerror(errno));
2185       close (fd);
2186       continue;
2187     }
2188
2189     status = listen (fd, /* backlog = */ 10);
2190     if (status != 0)
2191     {
2192       fprintf (stderr, "rrdcached: listen(%s) failed: %s\n.",
2193                sock->addr, rrd_strerror(errno));
2194       close (fd);
2195       return (-1);
2196     }
2197
2198     listen_fds[listen_fds_num].fd = fd;
2199     listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2200     listen_fds_num++;
2201   } /* for (ai_ptr) */
2202
2203   return (0);
2204 } /* }}} static int open_listen_socket_network */
2205
2206 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2207 {
2208   assert(sock != NULL);
2209   assert(sock->addr != NULL);
2210
2211   if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2212       || sock->addr[0] == '/')
2213     return (open_listen_socket_unix(sock));
2214   else
2215     return (open_listen_socket_network(sock));
2216 } /* }}} int open_listen_socket */
2217
2218 static int close_listen_sockets (void) /* {{{ */
2219 {
2220   size_t i;
2221
2222   for (i = 0; i < listen_fds_num; i++)
2223   {
2224     close (listen_fds[i].fd);
2225
2226     if (listen_fds[i].family == PF_UNIX)
2227       unlink(listen_fds[i].addr);
2228   }
2229
2230   free (listen_fds);
2231   listen_fds = NULL;
2232   listen_fds_num = 0;
2233
2234   return (0);
2235 } /* }}} int close_listen_sockets */
2236
2237 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
2238 {
2239   struct pollfd *pollfds;
2240   int pollfds_num;
2241   int status;
2242   int i;
2243
2244   if (listen_fds_num < 1)
2245   {
2246     RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2247     return (NULL);
2248   }
2249
2250   pollfds_num = listen_fds_num;
2251   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2252   if (pollfds == NULL)
2253   {
2254     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2255     return (NULL);
2256   }
2257   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2258
2259   RRDD_LOG(LOG_INFO, "listening for connections");
2260
2261   while (do_shutdown == 0)
2262   {
2263     assert (pollfds_num == ((int) listen_fds_num));
2264     for (i = 0; i < pollfds_num; i++)
2265     {
2266       pollfds[i].fd = listen_fds[i].fd;
2267       pollfds[i].events = POLLIN | POLLPRI;
2268       pollfds[i].revents = 0;
2269     }
2270
2271     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2272     if (do_shutdown)
2273       break;
2274     else if (status == 0) /* timeout */
2275       continue;
2276     else if (status < 0) /* error */
2277     {
2278       status = errno;
2279       if (status != EINTR)
2280       {
2281         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2282       }
2283       continue;
2284     }
2285
2286     for (i = 0; i < pollfds_num; i++)
2287     {
2288       listen_socket_t *client_sock;
2289       struct sockaddr_storage client_sa;
2290       socklen_t client_sa_size;
2291       pthread_t tid;
2292       pthread_attr_t attr;
2293
2294       if (pollfds[i].revents == 0)
2295         continue;
2296
2297       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2298       {
2299         RRDD_LOG (LOG_ERR, "listen_thread_main: "
2300             "poll(2) returned something unexpected for listen FD #%i.",
2301             pollfds[i].fd);
2302         continue;
2303       }
2304
2305       client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2306       if (client_sock == NULL)
2307       {
2308         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2309         continue;
2310       }
2311       memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2312
2313       client_sa_size = sizeof (client_sa);
2314       client_sock->fd = accept (pollfds[i].fd,
2315           (struct sockaddr *) &client_sa, &client_sa_size);
2316       if (client_sock->fd < 0)
2317       {
2318         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2319         free(client_sock);
2320         continue;
2321       }
2322
2323       pthread_attr_init (&attr);
2324       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2325
2326       status = pthread_create (&tid, &attr, connection_thread_main,
2327                                client_sock);
2328       if (status != 0)
2329       {
2330         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2331         close_connection(client_sock);
2332         continue;
2333       }
2334     } /* for (pollfds_num) */
2335   } /* while (do_shutdown == 0) */
2336
2337   RRDD_LOG(LOG_INFO, "starting shutdown");
2338
2339   close_listen_sockets ();
2340
2341   pthread_mutex_lock (&connection_threads_lock);
2342   while (connection_threads_num > 0)
2343   {
2344     pthread_t wait_for;
2345
2346     wait_for = connection_threads[0];
2347
2348     pthread_mutex_unlock (&connection_threads_lock);
2349     pthread_join (wait_for, /* retval = */ NULL);
2350     pthread_mutex_lock (&connection_threads_lock);
2351   }
2352   pthread_mutex_unlock (&connection_threads_lock);
2353
2354   return (NULL);
2355 } /* }}} void *listen_thread_main */
2356
2357 static int daemonize (void) /* {{{ */
2358 {
2359   int pid_fd;
2360   char *base_dir;
2361
2362   daemon_uid = geteuid();
2363
2364   pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
2365   if (pid_fd < 0)
2366     pid_fd = check_pidfile();
2367   if (pid_fd < 0)
2368     return pid_fd;
2369
2370   /* open all the listen sockets */
2371   if (config_listen_address_list_len > 0)
2372   {
2373     for (int i = 0; i < config_listen_address_list_len; i++)
2374       open_listen_socket (config_listen_address_list[i]);
2375   }
2376   else
2377   {
2378     listen_socket_t sock;
2379     memset(&sock, 0, sizeof(sock));
2380     strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr));
2381     open_listen_socket (&sock);
2382   }
2383
2384   if (listen_fds_num < 1)
2385   {
2386     fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
2387     goto error;
2388   }
2389
2390   if (!stay_foreground)
2391   {
2392     pid_t child;
2393
2394     child = fork ();
2395     if (child < 0)
2396     {
2397       fprintf (stderr, "daemonize: fork(2) failed.\n");
2398       goto error;
2399     }
2400     else if (child > 0)
2401       exit(0);
2402
2403     /* Become session leader */
2404     setsid ();
2405
2406     /* Open the first three file descriptors to /dev/null */
2407     close (2);
2408     close (1);
2409     close (0);
2410
2411     open ("/dev/null", O_RDWR);
2412     dup (0);
2413     dup (0);
2414   } /* if (!stay_foreground) */
2415
2416   /* Change into the /tmp directory. */
2417   base_dir = (config_base_dir != NULL)
2418     ? config_base_dir
2419     : "/tmp";
2420
2421   if (chdir (base_dir) != 0)
2422   {
2423     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2424     goto error;
2425   }
2426
2427   install_signal_handlers();
2428
2429   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2430   RRDD_LOG(LOG_INFO, "starting up");
2431
2432   cache_tree = g_tree_new ((GCompareFunc) strcmp);
2433   if (cache_tree == NULL)
2434   {
2435     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2436     goto error;
2437   }
2438
2439   return write_pidfile (pid_fd);
2440
2441 error:
2442   remove_pidfile();
2443   return -1;
2444 } /* }}} int daemonize */
2445
2446 static int cleanup (void) /* {{{ */
2447 {
2448   do_shutdown++;
2449
2450   pthread_cond_signal (&cache_cond);
2451   pthread_join (queue_thread, /* return = */ NULL);
2452
2453   remove_pidfile ();
2454
2455   RRDD_LOG(LOG_INFO, "goodbye");
2456   closelog ();
2457
2458   return (0);
2459 } /* }}} int cleanup */
2460
2461 static int read_options (int argc, char **argv) /* {{{ */
2462 {
2463   int option;
2464   int status = 0;
2465
2466   while ((option = getopt(argc, argv, "gl:L:f:w:b:Bz:p:j:h?F")) != -1)
2467   {
2468     switch (option)
2469     {
2470       case 'g':
2471         stay_foreground=1;
2472         break;
2473
2474       case 'L':
2475       case 'l':
2476       {
2477         listen_socket_t **temp;
2478         listen_socket_t *new;
2479
2480         new = malloc(sizeof(listen_socket_t));
2481         if (new == NULL)
2482         {
2483           fprintf(stderr, "read_options: malloc failed.\n");
2484           return(2);
2485         }
2486         memset(new, 0, sizeof(listen_socket_t));
2487
2488         temp = (listen_socket_t **) realloc (config_listen_address_list,
2489             sizeof (listen_socket_t *) * (config_listen_address_list_len + 1));
2490         if (temp == NULL)
2491         {
2492           fprintf (stderr, "read_options: realloc failed.\n");
2493           return (2);
2494         }
2495         config_listen_address_list = temp;
2496
2497         strncpy(new->addr, optarg, sizeof(new->addr)-1);
2498         new->privilege = (option == 'l') ? PRIV_HIGH : PRIV_LOW;
2499
2500         temp[config_listen_address_list_len] = new;
2501         config_listen_address_list_len++;
2502       }
2503       break;
2504
2505       case 'f':
2506       {
2507         int temp;
2508
2509         temp = atoi (optarg);
2510         if (temp > 0)
2511           config_flush_interval = temp;
2512         else
2513         {
2514           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2515           status = 3;
2516         }
2517       }
2518       break;
2519
2520       case 'w':
2521       {
2522         int temp;
2523
2524         temp = atoi (optarg);
2525         if (temp > 0)
2526           config_write_interval = temp;
2527         else
2528         {
2529           fprintf (stderr, "Invalid write interval: %s\n", optarg);
2530           status = 2;
2531         }
2532       }
2533       break;
2534
2535       case 'z':
2536       {
2537         int temp;
2538
2539         temp = atoi(optarg);
2540         if (temp > 0)
2541           config_write_jitter = temp;
2542         else
2543         {
2544           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2545           status = 2;
2546         }
2547
2548         break;
2549       }
2550
2551       case 'B':
2552         config_write_base_only = 1;
2553         break;
2554
2555       case 'b':
2556       {
2557         size_t len;
2558         char base_realpath[PATH_MAX];
2559
2560         if (config_base_dir != NULL)
2561           free (config_base_dir);
2562         config_base_dir = strdup (optarg);
2563         if (config_base_dir == NULL)
2564         {
2565           fprintf (stderr, "read_options: strdup failed.\n");
2566           return (3);
2567         }
2568
2569         /* make sure that the base directory is not resolved via
2570          * symbolic links.  this makes some performance-enhancing
2571          * assumptions possible (we don't have to resolve paths
2572          * that start with a "/")
2573          */
2574         if (realpath(config_base_dir, base_realpath) == NULL)
2575         {
2576           fprintf (stderr, "Invalid base directory '%s'.\n", config_base_dir);
2577           return 5;
2578         }
2579         else if (strncmp(config_base_dir,
2580                          base_realpath, sizeof(base_realpath)) != 0)
2581         {
2582           fprintf(stderr,
2583                   "Base directory (-b) resolved via file system links!\n"
2584                   "Please consult rrdcached '-b' documentation!\n"
2585                   "Consider specifying the real directory (%s)\n",
2586                   base_realpath);
2587           return 5;
2588         }
2589
2590         len = strlen (config_base_dir);
2591         while ((len > 0) && (config_base_dir[len - 1] == '/'))
2592         {
2593           config_base_dir[len - 1] = 0;
2594           len--;
2595         }
2596
2597         if (len < 1)
2598         {
2599           fprintf (stderr, "Invalid base directory: %s\n", optarg);
2600           return (4);
2601         }
2602
2603         _config_base_dir_len = len;
2604       }
2605       break;
2606
2607       case 'p':
2608       {
2609         if (config_pid_file != NULL)
2610           free (config_pid_file);
2611         config_pid_file = strdup (optarg);
2612         if (config_pid_file == NULL)
2613         {
2614           fprintf (stderr, "read_options: strdup failed.\n");
2615           return (3);
2616         }
2617       }
2618       break;
2619
2620       case 'F':
2621         config_flush_at_shutdown = 1;
2622         break;
2623
2624       case 'j':
2625       {
2626         struct stat statbuf;
2627         const char *dir = optarg;
2628
2629         status = stat(dir, &statbuf);
2630         if (status != 0)
2631         {
2632           fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2633           return 6;
2634         }
2635
2636         if (!S_ISDIR(statbuf.st_mode)
2637             || access(dir, R_OK|W_OK|X_OK) != 0)
2638         {
2639           fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2640                   errno ? rrd_strerror(errno) : "");
2641           return 6;
2642         }
2643
2644         journal_cur = malloc(PATH_MAX + 1);
2645         journal_old = malloc(PATH_MAX + 1);
2646         if (journal_cur == NULL || journal_old == NULL)
2647         {
2648           fprintf(stderr, "malloc failure for journal files\n");
2649           return 6;
2650         }
2651         else 
2652         {
2653           snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2654           snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2655         }
2656       }
2657       break;
2658
2659       case 'h':
2660       case '?':
2661         printf ("RRDCacheD %s  Copyright (C) 2008 Florian octo Forster\n"
2662             "\n"
2663             "Usage: rrdcached [options]\n"
2664             "\n"
2665             "Valid options are:\n"
2666             "  -l <address>  Socket address to listen to.\n"
2667             "  -L <address>  Socket address to listen to ('FLUSH' only).\n"
2668             "  -w <seconds>  Interval in which to write data.\n"
2669             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
2670             "  -f <seconds>  Interval in which to flush dead data.\n"
2671             "  -p <file>     Location of the PID-file.\n"
2672             "  -b <dir>      Base directory to change to.\n"
2673             "  -B            Restrict file access to paths within -b <dir>\n"
2674             "  -g            Do not fork and run in the foreground.\n"
2675             "  -j <dir>      Directory in which to create the journal files.\n"
2676             "  -F            Always flush all updates at shutdown\n"
2677             "\n"
2678             "For more information and a detailed description of all options "
2679             "please refer\n"
2680             "to the rrdcached(1) manual page.\n",
2681             VERSION);
2682         status = -1;
2683         break;
2684     } /* switch (option) */
2685   } /* while (getopt) */
2686
2687   /* advise the user when values are not sane */
2688   if (config_flush_interval < 2 * config_write_interval)
2689     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2690             " 2x write interval (-w) !\n");
2691   if (config_write_jitter > config_write_interval)
2692     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2693             " write interval (-w) !\n");
2694
2695   if (config_write_base_only && config_base_dir == NULL)
2696     fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
2697             "  Consult the rrdcached documentation\n");
2698
2699   if (journal_cur == NULL)
2700     config_flush_at_shutdown = 1;
2701
2702   return (status);
2703 } /* }}} int read_options */
2704
2705 int main (int argc, char **argv)
2706 {
2707   int status;
2708
2709   status = read_options (argc, argv);
2710   if (status != 0)
2711   {
2712     if (status < 0)
2713       status = 0;
2714     return (status);
2715   }
2716
2717   status = daemonize ();
2718   if (status != 0)
2719   {
2720     fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
2721     return (1);
2722   }
2723
2724   journal_init();
2725
2726   /* start the queue thread */
2727   memset (&queue_thread, 0, sizeof (queue_thread));
2728   status = pthread_create (&queue_thread,
2729                            NULL, /* attr */
2730                            queue_thread_main,
2731                            NULL); /* args */
2732   if (status != 0)
2733   {
2734     RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2735     cleanup();
2736     return (1);
2737   }
2738
2739   listen_thread_main (NULL);
2740   cleanup ();
2741
2742   return (0);
2743 } /* int main */
2744
2745 /*
2746  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2747  */