This helps with portability on platforms where realloc doesn't handle NULL.
[rrdtool.git] / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008 Florian octo Forster
4  * Copyright (C) 2008 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
23
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
32
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
36
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
41
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
47
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
51
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
55
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
61
62 /*
63  * Now for some includes..
64  */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
73
74 #include "rrd.h"
75 #include "rrd_client.h"
76
77 #include <stdlib.h>
78 #include <stdint.h>
79 #include <stdio.h>
80 #include <unistd.h>
81 #include <string.h>
82 #include <strings.h>
83 #include <stdint.h>
84 #include <inttypes.h>
85
86 #include <sys/types.h>
87 #include <sys/stat.h>
88 #include <fcntl.h>
89 #include <signal.h>
90 #include <sys/socket.h>
91 #include <sys/un.h>
92 #include <netdb.h>
93 #include <poll.h>
94 #include <syslog.h>
95 #include <pthread.h>
96 #include <errno.h>
97 #include <assert.h>
98 #include <sys/time.h>
99 #include <time.h>
100
101 #include <glib-2.0/glib.h>
102 /* }}} */
103
104 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
105
106 #ifndef __GNUC__
107 # define __attribute__(x) /**/
108 #endif
109
110 /*
111  * Types
112  */
113 typedef enum
114 {
115   PRIV_LOW,
116   PRIV_HIGH
117 } socket_privilege;
118
119 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
120
121 struct listen_socket_s
122 {
123   int fd;
124   char addr[PATH_MAX + 1];
125   int family;
126   socket_privilege privilege;
127
128   /* state for BATCH processing */
129   time_t batch_start;
130   int batch_cmd;
131
132   /* buffered IO */
133   char *rbuf;
134   off_t next_cmd;
135   off_t next_read;
136
137   char *wbuf;
138   ssize_t wbuf_len;
139 };
140 typedef struct listen_socket_s listen_socket_t;
141
142 struct cache_item_s;
143 typedef struct cache_item_s cache_item_t;
144 struct cache_item_s
145 {
146   char *file;
147   char **values;
148   int values_num;
149   time_t last_flush_time;
150   time_t last_update_stamp;
151 #define CI_FLAGS_IN_TREE  (1<<0)
152 #define CI_FLAGS_IN_QUEUE (1<<1)
153   int flags;
154   pthread_cond_t  flushed;
155   cache_item_t *prev;
156   cache_item_t *next;
157 };
158
159 struct callback_flush_data_s
160 {
161   time_t now;
162   time_t abs_timeout;
163   char **keys;
164   size_t keys_num;
165 };
166 typedef struct callback_flush_data_s callback_flush_data_t;
167
168 enum queue_side_e
169 {
170   HEAD,
171   TAIL
172 };
173 typedef enum queue_side_e queue_side_t;
174
175 /* max length of socket command or response */
176 #define CMD_MAX 4096
177 #define RBUF_SIZE (CMD_MAX*2)
178
179 /*
180  * Variables
181  */
182 static int stay_foreground = 0;
183 static uid_t daemon_uid;
184
185 static listen_socket_t *listen_fds = NULL;
186 static size_t listen_fds_num = 0;
187
188 static int do_shutdown = 0;
189
190 static pthread_t queue_thread;
191
192 static pthread_t *connection_threads = NULL;
193 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
194 static int connection_threads_num = 0;
195
196 /* Cache stuff */
197 static GTree          *cache_tree = NULL;
198 static cache_item_t   *cache_queue_head = NULL;
199 static cache_item_t   *cache_queue_tail = NULL;
200 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
201 static pthread_cond_t  cache_cond = PTHREAD_COND_INITIALIZER;
202
203 static int config_write_interval = 300;
204 static int config_write_jitter   = 0;
205 static int config_flush_interval = 3600;
206 static int config_flush_at_shutdown = 0;
207 static char *config_pid_file = NULL;
208 static char *config_base_dir = NULL;
209 static size_t _config_base_dir_len = 0;
210 static int config_write_base_only = 0;
211
212 static listen_socket_t **config_listen_address_list = NULL;
213 static int config_listen_address_list_len = 0;
214
215 static uint64_t stats_queue_length = 0;
216 static uint64_t stats_updates_received = 0;
217 static uint64_t stats_flush_received = 0;
218 static uint64_t stats_updates_written = 0;
219 static uint64_t stats_data_sets_written = 0;
220 static uint64_t stats_journal_bytes = 0;
221 static uint64_t stats_journal_rotate = 0;
222 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
223
224 /* Journaled updates */
225 static char *journal_cur = NULL;
226 static char *journal_old = NULL;
227 static FILE *journal_fh = NULL;
228 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
229 static int journal_write(char *cmd, char *args);
230 static void journal_done(void);
231 static void journal_rotate(void);
232
233 /* 
234  * Functions
235  */
236 static void sig_common (const char *sig) /* {{{ */
237 {
238   RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
239   do_shutdown++;
240   pthread_cond_broadcast(&cache_cond);
241 } /* }}} void sig_common */
242
243 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
244 {
245   sig_common("INT");
246 } /* }}} void sig_int_handler */
247
248 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
249 {
250   sig_common("TERM");
251 } /* }}} void sig_term_handler */
252
253 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
254 {
255   config_flush_at_shutdown = 1;
256   sig_common("USR1");
257 } /* }}} void sig_usr1_handler */
258
259 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
260 {
261   config_flush_at_shutdown = 0;
262   sig_common("USR2");
263 } /* }}} void sig_usr2_handler */
264
265 static void install_signal_handlers(void) /* {{{ */
266 {
267   /* These structures are static, because `sigaction' behaves weird if the are
268    * overwritten.. */
269   static struct sigaction sa_int;
270   static struct sigaction sa_term;
271   static struct sigaction sa_pipe;
272   static struct sigaction sa_usr1;
273   static struct sigaction sa_usr2;
274
275   /* Install signal handlers */
276   memset (&sa_int, 0, sizeof (sa_int));
277   sa_int.sa_handler = sig_int_handler;
278   sigaction (SIGINT, &sa_int, NULL);
279
280   memset (&sa_term, 0, sizeof (sa_term));
281   sa_term.sa_handler = sig_term_handler;
282   sigaction (SIGTERM, &sa_term, NULL);
283
284   memset (&sa_pipe, 0, sizeof (sa_pipe));
285   sa_pipe.sa_handler = SIG_IGN;
286   sigaction (SIGPIPE, &sa_pipe, NULL);
287
288   memset (&sa_pipe, 0, sizeof (sa_usr1));
289   sa_usr1.sa_handler = sig_usr1_handler;
290   sigaction (SIGUSR1, &sa_usr1, NULL);
291
292   memset (&sa_usr2, 0, sizeof (sa_usr2));
293   sa_usr2.sa_handler = sig_usr2_handler;
294   sigaction (SIGUSR2, &sa_usr2, NULL);
295
296 } /* }}} void install_signal_handlers */
297
298 static int open_pidfile(char *action, int oflag) /* {{{ */
299 {
300   int fd;
301   char *file;
302
303   file = (config_pid_file != NULL)
304     ? config_pid_file
305     : LOCALSTATEDIR "/run/rrdcached.pid";
306
307   fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
308   if (fd < 0)
309     fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
310             action, file, rrd_strerror(errno));
311
312   return(fd);
313 } /* }}} static int open_pidfile */
314
315 /* check existing pid file to see whether a daemon is running */
316 static int check_pidfile(void)
317 {
318   int pid_fd;
319   pid_t pid;
320   char pid_str[16];
321
322   pid_fd = open_pidfile("open", O_RDWR);
323   if (pid_fd < 0)
324     return pid_fd;
325
326   if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0)
327     return -1;
328
329   pid = atoi(pid_str);
330   if (pid <= 0)
331     return -1;
332
333   /* another running process that we can signal COULD be
334    * a competing rrdcached */
335   if (pid != getpid() && kill(pid, 0) == 0)
336   {
337     fprintf(stderr,
338             "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
339     close(pid_fd);
340     return -1;
341   }
342
343   lseek(pid_fd, 0, SEEK_SET);
344   ftruncate(pid_fd, 0);
345
346   fprintf(stderr,
347           "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
348           "rrdcached: starting normally.\n", pid);
349
350   return pid_fd;
351 } /* }}} static int check_pidfile */
352
353 static int write_pidfile (int fd) /* {{{ */
354 {
355   pid_t pid;
356   FILE *fh;
357
358   pid = getpid ();
359
360   fh = fdopen (fd, "w");
361   if (fh == NULL)
362   {
363     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
364     close(fd);
365     return (-1);
366   }
367
368   fprintf (fh, "%i\n", (int) pid);
369   fclose (fh);
370
371   return (0);
372 } /* }}} int write_pidfile */
373
374 static int remove_pidfile (void) /* {{{ */
375 {
376   char *file;
377   int status;
378
379   file = (config_pid_file != NULL)
380     ? config_pid_file
381     : LOCALSTATEDIR "/run/rrdcached.pid";
382
383   status = unlink (file);
384   if (status == 0)
385     return (0);
386   return (errno);
387 } /* }}} int remove_pidfile */
388
389 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
390 {
391   char *eol;
392
393   eol = memchr(sock->rbuf + sock->next_cmd, '\n',
394                sock->next_read - sock->next_cmd);
395
396   if (eol == NULL)
397   {
398     /* no commands left, move remainder back to front of rbuf */
399     memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
400             sock->next_read - sock->next_cmd);
401     sock->next_read -= sock->next_cmd;
402     sock->next_cmd = 0;
403     *len = 0;
404     return NULL;
405   }
406   else
407   {
408     char *cmd = sock->rbuf + sock->next_cmd;
409     *eol = '\0';
410
411     sock->next_cmd = eol - sock->rbuf + 1;
412
413     if (eol > sock->rbuf && *(eol-1) == '\r')
414       *(--eol) = '\0'; /* handle "\r\n" EOL */
415
416     *len = eol - cmd;
417
418     return cmd;
419   }
420
421   /* NOTREACHED */
422   assert(1==0);
423 }
424
425 /* add the characters directly to the write buffer */
426 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
427 {
428   char *new_buf;
429
430   assert(sock != NULL);
431
432   new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
433   if (new_buf == NULL)
434   {
435     RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
436     return -1;
437   }
438
439   strncpy(new_buf + sock->wbuf_len, str, len + 1);
440
441   sock->wbuf = new_buf;
442   sock->wbuf_len += len;
443
444   return 0;
445 } /* }}} static int add_to_wbuf */
446
447 /* add the text to the "extra" info that's sent after the status line */
448 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
449 {
450   va_list argp;
451   char buffer[CMD_MAX];
452   int len;
453
454   if (sock == NULL) return 0; /* journal replay mode */
455   if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
456
457   va_start(argp, fmt);
458 #ifdef HAVE_VSNPRINTF
459   len = vsnprintf(buffer, sizeof(buffer)-1, fmt, argp);
460 #else
461   len = vsprintf(buffer, fmt, argp);
462 #endif
463   va_end(argp);
464   if (len < 0)
465   {
466     RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
467     return -1;
468   }
469
470   return add_to_wbuf(sock, buffer, len);
471 } /* }}} static int add_response_info */
472
473 static int count_lines(char *str) /* {{{ */
474 {
475   int lines = 0;
476
477   if (str != NULL)
478   {
479     while ((str = strchr(str, '\n')) != NULL)
480     {
481       ++lines;
482       ++str;
483     }
484   }
485
486   return lines;
487 } /* }}} static int count_lines */
488
489 /* send the response back to the user.
490  * returns 0 on success, -1 on error
491  * write buffer is always zeroed after this call */
492 static int send_response (listen_socket_t *sock, response_code rc,
493                           char *fmt, ...) /* {{{ */
494 {
495   va_list argp;
496   char buffer[CMD_MAX];
497   int lines;
498   ssize_t wrote;
499   int rclen, len;
500
501   if (sock == NULL) return rc;  /* journal replay mode */
502
503   if (sock->batch_start)
504   {
505     if (rc == RESP_OK)
506       return rc; /* no response on success during BATCH */
507     lines = sock->batch_cmd;
508   }
509   else if (rc == RESP_OK)
510     lines = count_lines(sock->wbuf);
511   else
512     lines = -1;
513
514   rclen = sprintf(buffer, "%d ", lines);
515   va_start(argp, fmt);
516 #ifdef HAVE_VSNPRINTF
517   len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen-1, fmt, argp);
518 #else
519   len = vsprintf(buffer+rclen, fmt, argp);
520 #endif
521   va_end(argp);
522   if (len < 0)
523     return -1;
524
525   len += rclen;
526
527   /* append the result to the wbuf, don't write to the user */
528   if (sock->batch_start)
529     return add_to_wbuf(sock, buffer, len);
530
531   /* first write must be complete */
532   if (len != write(sock->fd, buffer, len))
533   {
534     RRDD_LOG(LOG_INFO, "send_response: could not write status message");
535     return -1;
536   }
537
538   if (sock->wbuf != NULL && rc == RESP_OK)
539   {
540     wrote = 0;
541     while (wrote < sock->wbuf_len)
542     {
543       ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
544       if (wb <= 0)
545       {
546         RRDD_LOG(LOG_INFO, "send_response: could not write results");
547         return -1;
548       }
549       wrote += wb;
550     }
551   }
552
553   free(sock->wbuf); sock->wbuf = NULL;
554   sock->wbuf_len = 0;
555
556   return 0;
557 } /* }}} */
558
559 static void wipe_ci_values(cache_item_t *ci, time_t when)
560 {
561   ci->values = NULL;
562   ci->values_num = 0;
563
564   ci->last_flush_time = when;
565   if (config_write_jitter > 0)
566     ci->last_flush_time += (random() % config_write_jitter);
567 }
568
569 /* remove_from_queue
570  * remove a "cache_item_t" item from the queue.
571  * must hold 'cache_lock' when calling this
572  */
573 static void remove_from_queue(cache_item_t *ci) /* {{{ */
574 {
575   if (ci == NULL) return;
576   if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
577
578   if (ci->prev == NULL)
579     cache_queue_head = ci->next; /* reset head */
580   else
581     ci->prev->next = ci->next;
582
583   if (ci->next == NULL)
584     cache_queue_tail = ci->prev; /* reset the tail */
585   else
586     ci->next->prev = ci->prev;
587
588   ci->next = ci->prev = NULL;
589   ci->flags &= ~CI_FLAGS_IN_QUEUE;
590 } /* }}} static void remove_from_queue */
591
592 /* free the resources associated with the cache_item_t
593  * must hold cache_lock when calling this function
594  */
595 static void *free_cache_item(cache_item_t *ci) /* {{{ */
596 {
597   if (ci == NULL) return NULL;
598
599   remove_from_queue(ci);
600
601   for (int i=0; i < ci->values_num; i++)
602     free(ci->values[i]);
603
604   free (ci->values);
605   free (ci->file);
606
607   /* in case anyone is waiting */
608   pthread_cond_broadcast(&ci->flushed);
609
610   free (ci);
611
612   return NULL;
613 } /* }}} static void *free_cache_item */
614
615 /*
616  * enqueue_cache_item:
617  * `cache_lock' must be acquired before calling this function!
618  */
619 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
620     queue_side_t side)
621 {
622   if (ci == NULL)
623     return (-1);
624
625   if (ci->values_num == 0)
626     return (0);
627
628   if (side == HEAD)
629   {
630     if (cache_queue_head == ci)
631       return 0;
632
633     /* remove if further down in queue */
634     remove_from_queue(ci);
635
636     ci->prev = NULL;
637     ci->next = cache_queue_head;
638     if (ci->next != NULL)
639       ci->next->prev = ci;
640     cache_queue_head = ci;
641
642     if (cache_queue_tail == NULL)
643       cache_queue_tail = cache_queue_head;
644   }
645   else /* (side == TAIL) */
646   {
647     /* We don't move values back in the list.. */
648     if (ci->flags & CI_FLAGS_IN_QUEUE)
649       return (0);
650
651     assert (ci->next == NULL);
652     assert (ci->prev == NULL);
653
654     ci->prev = cache_queue_tail;
655
656     if (cache_queue_tail == NULL)
657       cache_queue_head = ci;
658     else
659       cache_queue_tail->next = ci;
660
661     cache_queue_tail = ci;
662   }
663
664   ci->flags |= CI_FLAGS_IN_QUEUE;
665
666   pthread_cond_broadcast(&cache_cond);
667   pthread_mutex_lock (&stats_lock);
668   stats_queue_length++;
669   pthread_mutex_unlock (&stats_lock);
670
671   return (0);
672 } /* }}} int enqueue_cache_item */
673
674 /*
675  * tree_callback_flush:
676  * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
677  * while this is in progress.
678  */
679 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
680     gpointer data)
681 {
682   cache_item_t *ci;
683   callback_flush_data_t *cfd;
684
685   ci = (cache_item_t *) value;
686   cfd = (callback_flush_data_t *) data;
687
688   if (ci->flags & CI_FLAGS_IN_QUEUE)
689     return FALSE;
690
691   if ((ci->last_flush_time <= cfd->abs_timeout)
692       && (ci->values_num > 0))
693   {
694     enqueue_cache_item (ci, TAIL);
695   }
696   else if ((do_shutdown != 0)
697       && (ci->values_num > 0))
698   {
699     enqueue_cache_item (ci, TAIL);
700   }
701   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
702       && (ci->values_num <= 0))
703   {
704     char **temp;
705
706     temp = (char **) rrd_realloc (cfd->keys,
707         sizeof (char *) * (cfd->keys_num + 1));
708     if (temp == NULL)
709     {
710       RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed.");
711       return (FALSE);
712     }
713     cfd->keys = temp;
714     /* Make really sure this points to the _same_ place */
715     assert ((char *) key == ci->file);
716     cfd->keys[cfd->keys_num] = (char *) key;
717     cfd->keys_num++;
718   }
719
720   return (FALSE);
721 } /* }}} gboolean tree_callback_flush */
722
723 static int flush_old_values (int max_age)
724 {
725   callback_flush_data_t cfd;
726   size_t k;
727
728   memset (&cfd, 0, sizeof (cfd));
729   /* Pass the current time as user data so that we don't need to call
730    * `time' for each node. */
731   cfd.now = time (NULL);
732   cfd.keys = NULL;
733   cfd.keys_num = 0;
734
735   if (max_age > 0)
736     cfd.abs_timeout = cfd.now - max_age;
737   else
738     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
739
740   /* `tree_callback_flush' will return the keys of all values that haven't
741    * been touched in the last `config_flush_interval' seconds in `cfd'.
742    * The char*'s in this array point to the same memory as ci->file, so we
743    * don't need to free them separately. */
744   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
745
746   for (k = 0; k < cfd.keys_num; k++)
747   {
748     /* should never fail, since we have held the cache_lock
749      * the entire time */
750     assert( g_tree_remove(cache_tree, cfd.keys[k]) == TRUE );
751   }
752
753   if (cfd.keys != NULL)
754   {
755     free (cfd.keys);
756     cfd.keys = NULL;
757   }
758
759   return (0);
760 } /* int flush_old_values */
761
762 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
763 {
764   struct timeval now;
765   struct timespec next_flush;
766   int final_flush = 0; /* make sure we only flush once on shutdown */
767
768   gettimeofday (&now, NULL);
769   next_flush.tv_sec = now.tv_sec + config_flush_interval;
770   next_flush.tv_nsec = 1000 * now.tv_usec;
771
772   pthread_mutex_lock (&cache_lock);
773   while ((do_shutdown == 0) || (cache_queue_head != NULL))
774   {
775     cache_item_t *ci;
776     char *file;
777     char **values;
778     int values_num;
779     int status;
780     int i;
781
782     /* First, check if it's time to do the cache flush. */
783     gettimeofday (&now, NULL);
784     if ((now.tv_sec > next_flush.tv_sec)
785         || ((now.tv_sec == next_flush.tv_sec)
786           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
787     {
788       /* Flush all values that haven't been written in the last
789        * `config_write_interval' seconds. */
790       flush_old_values (config_write_interval);
791
792       /* Determine the time of the next cache flush. */
793       next_flush.tv_sec =
794         now.tv_sec + next_flush.tv_sec % config_flush_interval;
795
796       /* unlock the cache while we rotate so we don't block incoming
797        * updates if the fsync() blocks on disk I/O */
798       pthread_mutex_unlock(&cache_lock);
799       journal_rotate();
800       pthread_mutex_lock(&cache_lock);
801     }
802
803     /* Now, check if there's something to store away. If not, wait until
804      * something comes in or it's time to do the cache flush.  if we are
805      * shutting down, do not wait around.  */
806     if (cache_queue_head == NULL && !do_shutdown)
807     {
808       status = pthread_cond_timedwait (&cache_cond, &cache_lock, &next_flush);
809       if ((status != 0) && (status != ETIMEDOUT))
810       {
811         RRDD_LOG (LOG_ERR, "queue_thread_main: "
812             "pthread_cond_timedwait returned %i.", status);
813       }
814     }
815
816     /* We're about to shut down */
817     if (do_shutdown != 0 && !final_flush++)
818     {
819       if (config_flush_at_shutdown)
820         flush_old_values (-1); /* flush everything */
821       else
822         break;
823     }
824
825     /* Check if a value has arrived. This may be NULL if we timed out or there
826      * was an interrupt such as a signal. */
827     if (cache_queue_head == NULL)
828       continue;
829
830     ci = cache_queue_head;
831
832     /* copy the relevant parts */
833     file = strdup (ci->file);
834     if (file == NULL)
835     {
836       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
837       continue;
838     }
839
840     assert(ci->values != NULL);
841     assert(ci->values_num > 0);
842
843     values = ci->values;
844     values_num = ci->values_num;
845
846     wipe_ci_values(ci, time(NULL));
847     remove_from_queue(ci);
848
849     pthread_mutex_lock (&stats_lock);
850     assert (stats_queue_length > 0);
851     stats_queue_length--;
852     pthread_mutex_unlock (&stats_lock);
853
854     pthread_mutex_unlock (&cache_lock);
855
856     rrd_clear_error ();
857     status = rrd_update_r (file, NULL, values_num, (void *) values);
858     if (status != 0)
859     {
860       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
861           "rrd_update_r (%s) failed with status %i. (%s)",
862           file, status, rrd_get_error());
863     }
864
865     journal_write("wrote", file);
866     pthread_cond_broadcast(&ci->flushed);
867
868     for (i = 0; i < values_num; i++)
869       free (values[i]);
870
871     free(values);
872     free(file);
873
874     if (status == 0)
875     {
876       pthread_mutex_lock (&stats_lock);
877       stats_updates_written++;
878       stats_data_sets_written += values_num;
879       pthread_mutex_unlock (&stats_lock);
880     }
881
882     pthread_mutex_lock (&cache_lock);
883
884     /* We're about to shut down */
885     if (do_shutdown != 0 && !final_flush++)
886     {
887       if (config_flush_at_shutdown)
888           flush_old_values (-1); /* flush everything */
889       else
890         break;
891     }
892   } /* while ((do_shutdown == 0) || (cache_queue_head != NULL)) */
893   pthread_mutex_unlock (&cache_lock);
894
895   if (config_flush_at_shutdown)
896   {
897     assert(cache_queue_head == NULL);
898     RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
899   }
900
901   journal_done();
902
903   return (NULL);
904 } /* }}} void *queue_thread_main */
905
906 static int buffer_get_field (char **buffer_ret, /* {{{ */
907     size_t *buffer_size_ret, char **field_ret)
908 {
909   char *buffer;
910   size_t buffer_pos;
911   size_t buffer_size;
912   char *field;
913   size_t field_size;
914   int status;
915
916   buffer = *buffer_ret;
917   buffer_pos = 0;
918   buffer_size = *buffer_size_ret;
919   field = *buffer_ret;
920   field_size = 0;
921
922   if (buffer_size <= 0)
923     return (-1);
924
925   /* This is ensured by `handle_request'. */
926   assert (buffer[buffer_size - 1] == '\0');
927
928   status = -1;
929   while (buffer_pos < buffer_size)
930   {
931     /* Check for end-of-field or end-of-buffer */
932     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
933     {
934       field[field_size] = 0;
935       field_size++;
936       buffer_pos++;
937       status = 0;
938       break;
939     }
940     /* Handle escaped characters. */
941     else if (buffer[buffer_pos] == '\\')
942     {
943       if (buffer_pos >= (buffer_size - 1))
944         break;
945       buffer_pos++;
946       field[field_size] = buffer[buffer_pos];
947       field_size++;
948       buffer_pos++;
949     }
950     /* Normal operation */ 
951     else
952     {
953       field[field_size] = buffer[buffer_pos];
954       field_size++;
955       buffer_pos++;
956     }
957   } /* while (buffer_pos < buffer_size) */
958
959   if (status != 0)
960     return (status);
961
962   *buffer_ret = buffer + buffer_pos;
963   *buffer_size_ret = buffer_size - buffer_pos;
964   *field_ret = field;
965
966   return (0);
967 } /* }}} int buffer_get_field */
968
969 /* if we're restricting writes to the base directory,
970  * check whether the file falls within the dir
971  * returns 1 if OK, otherwise 0
972  */
973 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
974 {
975   assert(file != NULL);
976
977   if (!config_write_base_only
978       || sock == NULL /* journal replay */
979       || config_base_dir == NULL)
980     return 1;
981
982   if (strstr(file, "../") != NULL) goto err;
983
984   /* relative paths without "../" are ok */
985   if (*file != '/') return 1;
986
987   /* file must be of the format base + "/" + <1+ char filename> */
988   if (strlen(file) < _config_base_dir_len + 2) goto err;
989   if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
990   if (*(file + _config_base_dir_len) != '/') goto err;
991
992   return 1;
993
994 err:
995   if (sock != NULL && sock->fd >= 0)
996     send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
997
998   return 0;
999 } /* }}} static int check_file_access */
1000
1001 /* when using a base dir, convert relative paths to absolute paths.
1002  * if necessary, modifies the "filename" pointer to point
1003  * to the new path created in "tmp".  "tmp" is provided
1004  * by the caller and sizeof(tmp) must be >= PATH_MAX.
1005  *
1006  * this allows us to optimize for the expected case (absolute path)
1007  * with a no-op.
1008  */
1009 static void get_abs_path(char **filename, char *tmp)
1010 {
1011   assert(tmp != NULL);
1012   assert(filename != NULL && *filename != NULL);
1013
1014   if (config_base_dir == NULL || **filename == '/')
1015     return;
1016
1017   snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1018   *filename = tmp;
1019 } /* }}} static int get_abs_path */
1020
1021 /* returns 1 if we have the required privilege level,
1022  * otherwise issue an error to the user on sock */
1023 static int has_privilege (listen_socket_t *sock, /* {{{ */
1024                           socket_privilege priv)
1025 {
1026   if (sock == NULL) /* journal replay */
1027     return 1;
1028
1029   if (sock->privilege >= priv)
1030     return 1;
1031
1032   return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1033 } /* }}} static int has_privilege */
1034
1035 static int flush_file (const char *filename) /* {{{ */
1036 {
1037   cache_item_t *ci;
1038
1039   pthread_mutex_lock (&cache_lock);
1040
1041   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1042   if (ci == NULL)
1043   {
1044     pthread_mutex_unlock (&cache_lock);
1045     return (ENOENT);
1046   }
1047
1048   if (ci->values_num > 0)
1049   {
1050     /* Enqueue at head */
1051     enqueue_cache_item (ci, HEAD);
1052     pthread_cond_wait(&ci->flushed, &cache_lock);
1053   }
1054
1055   /* DO NOT DO ANYTHING WITH ci HERE!!  The entry
1056    * may have been purged during our cond_wait() */
1057
1058   pthread_mutex_unlock(&cache_lock);
1059
1060   return (0);
1061 } /* }}} int flush_file */
1062
1063 static int handle_request_help (listen_socket_t *sock, /* {{{ */
1064     char *buffer, size_t buffer_size)
1065 {
1066   int status;
1067   char **help_text;
1068   char *command;
1069
1070   char *help_help[2] =
1071   {
1072     "Command overview\n"
1073     ,
1074     "HELP [<command>]\n"
1075     "FLUSH <filename>\n"
1076     "FLUSHALL\n"
1077     "PENDING <filename>\n"
1078     "FORGET <filename>\n"
1079     "UPDATE <filename> <values> [<values> ...]\n"
1080     "BATCH\n"
1081     "STATS\n"
1082     "QUIT\n"
1083   };
1084
1085   char *help_flush[2] =
1086   {
1087     "Help for FLUSH\n"
1088     ,
1089     "Usage: FLUSH <filename>\n"
1090     "\n"
1091     "Adds the given filename to the head of the update queue and returns\n"
1092     "after is has been dequeued.\n"
1093   };
1094
1095   char *help_flushall[2] =
1096   {
1097     "Help for FLUSHALL\n"
1098     ,
1099     "Usage: FLUSHALL\n"
1100     "\n"
1101     "Triggers writing of all pending updates.  Returns immediately.\n"
1102   };
1103
1104   char *help_pending[2] =
1105   {
1106     "Help for PENDING\n"
1107     ,
1108     "Usage: PENDING <filename>\n"
1109     "\n"
1110     "Shows any 'pending' updates for a file, in order.\n"
1111     "The updates shown have not yet been written to the underlying RRD file.\n"
1112   };
1113
1114   char *help_forget[2] =
1115   {
1116     "Help for FORGET\n"
1117     ,
1118     "Usage: FORGET <filename>\n"
1119     "\n"
1120     "Removes the file completely from the cache.\n"
1121     "Any pending updates for the file will be lost.\n"
1122   };
1123
1124   char *help_update[2] =
1125   {
1126     "Help for UPDATE\n"
1127     ,
1128     "Usage: UPDATE <filename> <values> [<values> ...]\n"
1129     "\n"
1130     "Adds the given file to the internal cache if it is not yet known and\n"
1131     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1132     "for details.\n"
1133     "\n"
1134     "Each <values> has the following form:\n"
1135     "  <values> = <time>:<value>[:<value>[...]]\n"
1136     "See the rrdupdate(1) manpage for details.\n"
1137   };
1138
1139   char *help_stats[2] =
1140   {
1141     "Help for STATS\n"
1142     ,
1143     "Usage: STATS\n"
1144     "\n"
1145     "Returns some performance counters, see the rrdcached(1) manpage for\n"
1146     "a description of the values.\n"
1147   };
1148
1149   char *help_batch[2] =
1150   {
1151     "Help for BATCH\n"
1152     ,
1153     "The 'BATCH' command permits the client to initiate a bulk load\n"
1154     "   of commands to rrdcached.\n"
1155     "\n"
1156     "Usage:\n"
1157     "\n"
1158     "    client: BATCH\n"
1159     "    server: 0 Go ahead.  End with dot '.' on its own line.\n"
1160     "    client: command #1\n"
1161     "    client: command #2\n"
1162     "    client: ... and so on\n"
1163     "    client: .\n"
1164     "    server: 2 errors\n"
1165     "    server: 7 message for command #7\n"
1166     "    server: 9 message for command #9\n"
1167     "\n"
1168     "For more information, consult the rrdcached(1) documentation.\n"
1169   };
1170
1171   char *help_quit[2] =
1172   {
1173     "Help for QUIT\n"
1174     ,
1175     "Disconnect from rrdcached.\n"
1176   };
1177
1178   status = buffer_get_field (&buffer, &buffer_size, &command);
1179   if (status != 0)
1180     help_text = help_help;
1181   else
1182   {
1183     if (strcasecmp (command, "update") == 0)
1184       help_text = help_update;
1185     else if (strcasecmp (command, "flush") == 0)
1186       help_text = help_flush;
1187     else if (strcasecmp (command, "flushall") == 0)
1188       help_text = help_flushall;
1189     else if (strcasecmp (command, "pending") == 0)
1190       help_text = help_pending;
1191     else if (strcasecmp (command, "forget") == 0)
1192       help_text = help_forget;
1193     else if (strcasecmp (command, "stats") == 0)
1194       help_text = help_stats;
1195     else if (strcasecmp (command, "batch") == 0)
1196       help_text = help_batch;
1197     else if (strcasecmp (command, "quit") == 0)
1198       help_text = help_quit;
1199     else
1200       help_text = help_help;
1201   }
1202
1203   add_response_info(sock, help_text[1]);
1204   return send_response(sock, RESP_OK, help_text[0]);
1205 } /* }}} int handle_request_help */
1206
1207 static int handle_request_stats (listen_socket_t *sock) /* {{{ */
1208 {
1209   uint64_t copy_queue_length;
1210   uint64_t copy_updates_received;
1211   uint64_t copy_flush_received;
1212   uint64_t copy_updates_written;
1213   uint64_t copy_data_sets_written;
1214   uint64_t copy_journal_bytes;
1215   uint64_t copy_journal_rotate;
1216
1217   uint64_t tree_nodes_number;
1218   uint64_t tree_depth;
1219
1220   pthread_mutex_lock (&stats_lock);
1221   copy_queue_length       = stats_queue_length;
1222   copy_updates_received   = stats_updates_received;
1223   copy_flush_received     = stats_flush_received;
1224   copy_updates_written    = stats_updates_written;
1225   copy_data_sets_written  = stats_data_sets_written;
1226   copy_journal_bytes      = stats_journal_bytes;
1227   copy_journal_rotate     = stats_journal_rotate;
1228   pthread_mutex_unlock (&stats_lock);
1229
1230   pthread_mutex_lock (&cache_lock);
1231   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1232   tree_depth        = (uint64_t) g_tree_height (cache_tree);
1233   pthread_mutex_unlock (&cache_lock);
1234
1235   add_response_info(sock,
1236                     "QueueLength: %"PRIu64"\n", copy_queue_length);
1237   add_response_info(sock,
1238                     "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1239   add_response_info(sock,
1240                     "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1241   add_response_info(sock,
1242                     "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1243   add_response_info(sock,
1244                     "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1245   add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1246   add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1247   add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1248   add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1249
1250   send_response(sock, RESP_OK, "Statistics follow\n");
1251
1252   return (0);
1253 } /* }}} int handle_request_stats */
1254
1255 static int handle_request_flush (listen_socket_t *sock, /* {{{ */
1256     char *buffer, size_t buffer_size)
1257 {
1258   char *file, file_tmp[PATH_MAX];
1259   int status;
1260
1261   status = buffer_get_field (&buffer, &buffer_size, &file);
1262   if (status != 0)
1263   {
1264     return send_response(sock, RESP_ERR, "Usage: flush <filename>\n");
1265   }
1266   else
1267   {
1268     pthread_mutex_lock(&stats_lock);
1269     stats_flush_received++;
1270     pthread_mutex_unlock(&stats_lock);
1271
1272     get_abs_path(&file, file_tmp);
1273     if (!check_file_access(file, sock)) return 0;
1274
1275     status = flush_file (file);
1276     if (status == 0)
1277       return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1278     else if (status == ENOENT)
1279     {
1280       /* no file in our tree; see whether it exists at all */
1281       struct stat statbuf;
1282
1283       memset(&statbuf, 0, sizeof(statbuf));
1284       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1285         return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1286       else
1287         return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1288     }
1289     else if (status < 0)
1290       return send_response(sock, RESP_ERR, "Internal error.\n");
1291     else
1292       return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1293   }
1294
1295   /* NOTREACHED */
1296   assert(1==0);
1297 } /* }}} int handle_request_flush */
1298
1299 static int handle_request_flushall(listen_socket_t *sock) /* {{{ */
1300 {
1301   int status;
1302
1303   status = has_privilege(sock, PRIV_HIGH);
1304   if (status <= 0)
1305     return status;
1306
1307   RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1308
1309   pthread_mutex_lock(&cache_lock);
1310   flush_old_values(-1);
1311   pthread_mutex_unlock(&cache_lock);
1312
1313   return send_response(sock, RESP_OK, "Started flush.\n");
1314 } /* }}} static int handle_request_flushall */
1315
1316 static int handle_request_pending(listen_socket_t *sock, /* {{{ */
1317                                   char *buffer, size_t buffer_size)
1318 {
1319   int status;
1320   char *file, file_tmp[PATH_MAX];
1321   cache_item_t *ci;
1322
1323   status = buffer_get_field(&buffer, &buffer_size, &file);
1324   if (status != 0)
1325     return send_response(sock, RESP_ERR,
1326                          "Usage: PENDING <filename>\n");
1327
1328   status = has_privilege(sock, PRIV_HIGH);
1329   if (status <= 0)
1330     return status;
1331
1332   get_abs_path(&file, file_tmp);
1333
1334   pthread_mutex_lock(&cache_lock);
1335   ci = g_tree_lookup(cache_tree, file);
1336   if (ci == NULL)
1337   {
1338     pthread_mutex_unlock(&cache_lock);
1339     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1340   }
1341
1342   for (int i=0; i < ci->values_num; i++)
1343     add_response_info(sock, "%s\n", ci->values[i]);
1344
1345   pthread_mutex_unlock(&cache_lock);
1346   return send_response(sock, RESP_OK, "updates pending\n");
1347 } /* }}} static int handle_request_pending */
1348
1349 static int handle_request_forget(listen_socket_t *sock, /* {{{ */
1350                                  char *buffer, size_t buffer_size)
1351 {
1352   int status;
1353   gboolean found;
1354   char *file, file_tmp[PATH_MAX];
1355
1356   status = buffer_get_field(&buffer, &buffer_size, &file);
1357   if (status != 0)
1358     return send_response(sock, RESP_ERR,
1359                          "Usage: FORGET <filename>\n");
1360
1361   status = has_privilege(sock, PRIV_HIGH);
1362   if (status <= 0)
1363     return status;
1364
1365   get_abs_path(&file, file_tmp);
1366   if (!check_file_access(file, sock)) return 0;
1367
1368   pthread_mutex_lock(&cache_lock);
1369   found = g_tree_remove(cache_tree, file);
1370   pthread_mutex_unlock(&cache_lock);
1371
1372   if (found == TRUE)
1373   {
1374     if (sock != NULL)
1375       journal_write("forget", file);
1376
1377     return send_response(sock, RESP_OK, "Gone!\n");
1378   }
1379   else
1380     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1381
1382   /* NOTREACHED */
1383   assert(1==0);
1384 } /* }}} static int handle_request_forget */
1385
1386 static int handle_request_update (listen_socket_t *sock, /* {{{ */
1387                                   time_t now,
1388                                   char *buffer, size_t buffer_size)
1389 {
1390   char *file, file_tmp[PATH_MAX];
1391   int values_num = 0;
1392   int status;
1393   char orig_buf[CMD_MAX];
1394
1395   cache_item_t *ci;
1396
1397   status = has_privilege(sock, PRIV_HIGH);
1398   if (status <= 0)
1399     return status;
1400
1401   /* save it for the journal later */
1402   strncpy(orig_buf, buffer, sizeof(orig_buf)-1);
1403
1404   status = buffer_get_field (&buffer, &buffer_size, &file);
1405   if (status != 0)
1406     return send_response(sock, RESP_ERR,
1407                          "Usage: UPDATE <filename> <values> [<values> ...]\n");
1408
1409   pthread_mutex_lock(&stats_lock);
1410   stats_updates_received++;
1411   pthread_mutex_unlock(&stats_lock);
1412
1413   get_abs_path(&file, file_tmp);
1414   if (!check_file_access(file, sock)) return 0;
1415
1416   pthread_mutex_lock (&cache_lock);
1417   ci = g_tree_lookup (cache_tree, file);
1418
1419   if (ci == NULL) /* {{{ */
1420   {
1421     struct stat statbuf;
1422
1423     /* don't hold the lock while we setup; stat(2) might block */
1424     pthread_mutex_unlock(&cache_lock);
1425
1426     memset (&statbuf, 0, sizeof (statbuf));
1427     status = stat (file, &statbuf);
1428     if (status != 0)
1429     {
1430       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1431
1432       status = errno;
1433       if (status == ENOENT)
1434         return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1435       else
1436         return send_response(sock, RESP_ERR,
1437                              "stat failed with error %i.\n", status);
1438     }
1439     if (!S_ISREG (statbuf.st_mode))
1440       return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1441
1442     if (access(file, R_OK|W_OK) != 0)
1443       return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1444                            file, rrd_strerror(errno));
1445
1446     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1447     if (ci == NULL)
1448     {
1449       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1450
1451       return send_response(sock, RESP_ERR, "malloc failed.\n");
1452     }
1453     memset (ci, 0, sizeof (cache_item_t));
1454
1455     ci->file = strdup (file);
1456     if (ci->file == NULL)
1457     {
1458       free (ci);
1459       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1460
1461       return send_response(sock, RESP_ERR, "strdup failed.\n");
1462     }
1463
1464     wipe_ci_values(ci, now);
1465     ci->flags = CI_FLAGS_IN_TREE;
1466     pthread_cond_init(&ci->flushed, NULL);
1467
1468     pthread_mutex_lock(&cache_lock);
1469     g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1470   } /* }}} */
1471   assert (ci != NULL);
1472
1473   /* don't re-write updates in replay mode */
1474   if (sock != NULL)
1475     journal_write("update", orig_buf);
1476
1477   while (buffer_size > 0)
1478   {
1479     char **temp;
1480     char *value;
1481     time_t stamp;
1482     char *eostamp;
1483
1484     status = buffer_get_field (&buffer, &buffer_size, &value);
1485     if (status != 0)
1486     {
1487       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1488       break;
1489     }
1490
1491     /* make sure update time is always moving forward */
1492     stamp = strtol(value, &eostamp, 10);
1493     if (eostamp == value || eostamp == NULL || *eostamp != ':')
1494     {
1495       pthread_mutex_unlock(&cache_lock);
1496       return send_response(sock, RESP_ERR,
1497                            "Cannot find timestamp in '%s'!\n", value);
1498     }
1499     else if (stamp <= ci->last_update_stamp)
1500     {
1501       pthread_mutex_unlock(&cache_lock);
1502       return send_response(sock, RESP_ERR,
1503                            "illegal attempt to update using time %ld when last"
1504                            " update time is %ld (minimum one second step)\n",
1505                            stamp, ci->last_update_stamp);
1506     }
1507     else
1508       ci->last_update_stamp = stamp;
1509
1510     temp = (char **) rrd_realloc (ci->values,
1511         sizeof (char *) * (ci->values_num + 1));
1512     if (temp == NULL)
1513     {
1514       RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
1515       continue;
1516     }
1517     ci->values = temp;
1518
1519     ci->values[ci->values_num] = strdup (value);
1520     if (ci->values[ci->values_num] == NULL)
1521     {
1522       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1523       continue;
1524     }
1525     ci->values_num++;
1526
1527     values_num++;
1528   }
1529
1530   if (((now - ci->last_flush_time) >= config_write_interval)
1531       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1532       && (ci->values_num > 0))
1533   {
1534     enqueue_cache_item (ci, TAIL);
1535   }
1536
1537   pthread_mutex_unlock (&cache_lock);
1538
1539   if (values_num < 1)
1540     return send_response(sock, RESP_ERR, "No values updated.\n");
1541   else
1542     return send_response(sock, RESP_OK,
1543                          "errors, enqueued %i value(s).\n", values_num);
1544
1545   /* NOTREACHED */
1546   assert(1==0);
1547
1548 } /* }}} int handle_request_update */
1549
1550 /* we came across a "WROTE" entry during journal replay.
1551  * throw away any values that we have accumulated for this file
1552  */
1553 static int handle_request_wrote (const char *buffer, time_t now) /* {{{ */
1554 {
1555   int i;
1556   cache_item_t *ci;
1557   const char *file = buffer;
1558
1559   pthread_mutex_lock(&cache_lock);
1560
1561   ci = g_tree_lookup(cache_tree, file);
1562   if (ci == NULL)
1563   {
1564     pthread_mutex_unlock(&cache_lock);
1565     return (0);
1566   }
1567
1568   if (ci->values)
1569   {
1570     for (i=0; i < ci->values_num; i++)
1571       free(ci->values[i]);
1572
1573     free(ci->values);
1574   }
1575
1576   wipe_ci_values(ci, now);
1577   remove_from_queue(ci);
1578
1579   pthread_mutex_unlock(&cache_lock);
1580   return (0);
1581 } /* }}} int handle_request_wrote */
1582
1583 /* start "BATCH" processing */
1584 static int batch_start (listen_socket_t *sock) /* {{{ */
1585 {
1586   int status;
1587   if (sock->batch_start)
1588     return send_response(sock, RESP_ERR, "Already in BATCH\n");
1589
1590   status = send_response(sock, RESP_OK,
1591                          "Go ahead.  End with dot '.' on its own line.\n");
1592   sock->batch_start = time(NULL);
1593   sock->batch_cmd = 0;
1594
1595   return status;
1596 } /* }}} static int batch_start */
1597
1598 /* finish "BATCH" processing and return results to the client */
1599 static int batch_done (listen_socket_t *sock) /* {{{ */
1600 {
1601   assert(sock->batch_start);
1602   sock->batch_start = 0;
1603   sock->batch_cmd  = 0;
1604   return send_response(sock, RESP_OK, "errors\n");
1605 } /* }}} static int batch_done */
1606
1607 /* if sock==NULL, we are in journal replay mode */
1608 static int handle_request (listen_socket_t *sock, /* {{{ */
1609                            time_t now,
1610                            char *buffer, size_t buffer_size)
1611 {
1612   char *buffer_ptr;
1613   char *command;
1614   int status;
1615
1616   assert (buffer[buffer_size - 1] == '\0');
1617
1618   buffer_ptr = buffer;
1619   command = NULL;
1620   status = buffer_get_field (&buffer_ptr, &buffer_size, &command);
1621   if (status != 0)
1622   {
1623     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1624     return (-1);
1625   }
1626
1627   if (sock != NULL && sock->batch_start)
1628     sock->batch_cmd++;
1629
1630   if (strcasecmp (command, "update") == 0)
1631     return (handle_request_update (sock, now, buffer_ptr, buffer_size));
1632   else if (strcasecmp (command, "wrote") == 0 && sock == NULL)
1633   {
1634     /* this is only valid in replay mode */
1635     return (handle_request_wrote (buffer_ptr, now));
1636   }
1637   else if (strcasecmp (command, "flush") == 0)
1638     return (handle_request_flush (sock, buffer_ptr, buffer_size));
1639   else if (strcasecmp (command, "flushall") == 0)
1640     return (handle_request_flushall(sock));
1641   else if (strcasecmp (command, "pending") == 0)
1642     return (handle_request_pending(sock, buffer_ptr, buffer_size));
1643   else if (strcasecmp (command, "forget") == 0)
1644     return (handle_request_forget(sock, buffer_ptr, buffer_size));
1645   else if (strcasecmp (command, "stats") == 0)
1646     return (handle_request_stats (sock));
1647   else if (strcasecmp (command, "help") == 0)
1648     return (handle_request_help (sock, buffer_ptr, buffer_size));
1649   else if (strcasecmp (command, "batch") == 0 && sock != NULL)
1650     return batch_start(sock);
1651   else if (strcasecmp (command, ".") == 0 && sock != NULL && sock->batch_start)
1652     return batch_done(sock);
1653   else if (strcasecmp (command, "quit") == 0)
1654     return -1;
1655   else
1656     return send_response(sock, RESP_ERR, "Unknown command: %s\n", command);
1657
1658   /* NOTREACHED */
1659   assert(1==0);
1660 } /* }}} int handle_request */
1661
1662 /* MUST NOT hold journal_lock before calling this */
1663 static void journal_rotate(void) /* {{{ */
1664 {
1665   FILE *old_fh = NULL;
1666   int new_fd;
1667
1668   if (journal_cur == NULL || journal_old == NULL)
1669     return;
1670
1671   pthread_mutex_lock(&journal_lock);
1672
1673   /* we rotate this way (rename before close) so that the we can release
1674    * the journal lock as fast as possible.  Journal writes to the new
1675    * journal can proceed immediately after the new file is opened.  The
1676    * fclose can then block without affecting new updates.
1677    */
1678   if (journal_fh != NULL)
1679   {
1680     old_fh = journal_fh;
1681     journal_fh = NULL;
1682     rename(journal_cur, journal_old);
1683     ++stats_journal_rotate;
1684   }
1685
1686   new_fd = open(journal_cur, O_WRONLY|O_CREAT|O_APPEND,
1687                 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1688   if (new_fd >= 0)
1689   {
1690     journal_fh = fdopen(new_fd, "a");
1691     if (journal_fh == NULL)
1692       close(new_fd);
1693   }
1694
1695   pthread_mutex_unlock(&journal_lock);
1696
1697   if (old_fh != NULL)
1698     fclose(old_fh);
1699
1700   if (journal_fh == NULL)
1701   {
1702     RRDD_LOG(LOG_CRIT,
1703              "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1704              journal_cur, rrd_strerror(errno));
1705
1706     RRDD_LOG(LOG_ERR,
1707              "JOURNALING DISABLED: All values will be flushed at shutdown");
1708     config_flush_at_shutdown = 1;
1709   }
1710
1711 } /* }}} static void journal_rotate */
1712
1713 static void journal_done(void) /* {{{ */
1714 {
1715   if (journal_cur == NULL)
1716     return;
1717
1718   pthread_mutex_lock(&journal_lock);
1719   if (journal_fh != NULL)
1720   {
1721     fclose(journal_fh);
1722     journal_fh = NULL;
1723   }
1724
1725   if (config_flush_at_shutdown)
1726   {
1727     RRDD_LOG(LOG_INFO, "removing journals");
1728     unlink(journal_old);
1729     unlink(journal_cur);
1730   }
1731   else
1732   {
1733     RRDD_LOG(LOG_INFO, "expedited shutdown; "
1734              "journals will be used at next startup");
1735   }
1736
1737   pthread_mutex_unlock(&journal_lock);
1738
1739 } /* }}} static void journal_done */
1740
1741 static int journal_write(char *cmd, char *args) /* {{{ */
1742 {
1743   int chars;
1744
1745   if (journal_fh == NULL)
1746     return 0;
1747
1748   pthread_mutex_lock(&journal_lock);
1749   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1750   pthread_mutex_unlock(&journal_lock);
1751
1752   if (chars > 0)
1753   {
1754     pthread_mutex_lock(&stats_lock);
1755     stats_journal_bytes += chars;
1756     pthread_mutex_unlock(&stats_lock);
1757   }
1758
1759   return chars;
1760 } /* }}} static int journal_write */
1761
1762 static int journal_replay (const char *file) /* {{{ */
1763 {
1764   FILE *fh;
1765   int entry_cnt = 0;
1766   int fail_cnt = 0;
1767   uint64_t line = 0;
1768   char entry[CMD_MAX];
1769   time_t now;
1770
1771   if (file == NULL) return 0;
1772
1773   {
1774     char *reason = "unknown error";
1775     int status = 0;
1776     struct stat statbuf;
1777
1778     memset(&statbuf, 0, sizeof(statbuf));
1779     if (stat(file, &statbuf) != 0)
1780     {
1781       if (errno == ENOENT)
1782         return 0;
1783
1784       reason = "stat error";
1785       status = errno;
1786     }
1787     else if (!S_ISREG(statbuf.st_mode))
1788     {
1789       reason = "not a regular file";
1790       status = EPERM;
1791     }
1792     if (statbuf.st_uid != daemon_uid)
1793     {
1794       reason = "not owned by daemon user";
1795       status = EACCES;
1796     }
1797     if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
1798     {
1799       reason = "must not be user/group writable";
1800       status = EACCES;
1801     }
1802
1803     if (status != 0)
1804     {
1805       RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
1806                file, rrd_strerror(status), reason);
1807       return 0;
1808     }
1809   }
1810
1811   fh = fopen(file, "r");
1812   if (fh == NULL)
1813   {
1814     if (errno != ENOENT)
1815       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1816                file, rrd_strerror(errno));
1817     return 0;
1818   }
1819   else
1820     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1821
1822   now = time(NULL);
1823
1824   while(!feof(fh))
1825   {
1826     size_t entry_len;
1827
1828     ++line;
1829     if (fgets(entry, sizeof(entry), fh) == NULL)
1830       break;
1831     entry_len = strlen(entry);
1832
1833     /* check \n termination in case journal writing crashed mid-line */
1834     if (entry_len == 0)
1835       continue;
1836     else if (entry[entry_len - 1] != '\n')
1837     {
1838       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1839       ++fail_cnt;
1840       continue;
1841     }
1842
1843     entry[entry_len - 1] = '\0';
1844
1845     if (handle_request(NULL, now, entry, entry_len) == 0)
1846       ++entry_cnt;
1847     else
1848       ++fail_cnt;
1849   }
1850
1851   fclose(fh);
1852
1853   RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1854            entry_cnt, fail_cnt);
1855
1856   return entry_cnt > 0 ? 1 : 0;
1857 } /* }}} static int journal_replay */
1858
1859 static void journal_init(void) /* {{{ */
1860 {
1861   int had_journal = 0;
1862
1863   if (journal_cur == NULL) return;
1864
1865   pthread_mutex_lock(&journal_lock);
1866
1867   RRDD_LOG(LOG_INFO, "checking for journal files");
1868
1869   had_journal += journal_replay(journal_old);
1870   had_journal += journal_replay(journal_cur);
1871
1872   /* it must have been a crash.  start a flush */
1873   if (had_journal && config_flush_at_shutdown)
1874     flush_old_values(-1);
1875
1876   pthread_mutex_unlock(&journal_lock);
1877   journal_rotate();
1878
1879   RRDD_LOG(LOG_INFO, "journal processing complete");
1880
1881 } /* }}} static void journal_init */
1882
1883 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
1884 {
1885   assert(sock != NULL);
1886
1887   free(sock->rbuf);  sock->rbuf = NULL;
1888   free(sock->wbuf);  sock->wbuf = NULL;
1889   free(sock);
1890 } /* }}} void free_listen_socket */
1891
1892 static void close_connection(listen_socket_t *sock) /* {{{ */
1893 {
1894   if (sock->fd >= 0)
1895   {
1896     close(sock->fd);
1897     sock->fd = -1;
1898   }
1899
1900   free_listen_socket(sock);
1901
1902 } /* }}} void close_connection */
1903
1904 static void *connection_thread_main (void *args) /* {{{ */
1905 {
1906   listen_socket_t *sock;
1907   int i;
1908   int fd;
1909
1910   sock = (listen_socket_t *) args;
1911   fd = sock->fd;
1912
1913   /* init read buffers */
1914   sock->next_read = sock->next_cmd = 0;
1915   sock->rbuf = malloc(RBUF_SIZE);
1916   if (sock->rbuf == NULL)
1917   {
1918     RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
1919     close_connection(sock);
1920     return NULL;
1921   }
1922
1923   pthread_mutex_lock (&connection_threads_lock);
1924   {
1925     pthread_t *temp;
1926
1927     temp = (pthread_t *) rrd_realloc (connection_threads,
1928         sizeof (pthread_t) * (connection_threads_num + 1));
1929     if (temp == NULL)
1930     {
1931       RRDD_LOG (LOG_ERR, "connection_thread_main: realloc(++) failed.");
1932     }
1933     else
1934     {
1935       connection_threads = temp;
1936       connection_threads[connection_threads_num] = pthread_self ();
1937       connection_threads_num++;
1938     }
1939   }
1940   pthread_mutex_unlock (&connection_threads_lock);
1941
1942   while (do_shutdown == 0)
1943   {
1944     char *cmd;
1945     ssize_t cmd_len;
1946     ssize_t rbytes;
1947     time_t now;
1948
1949     struct pollfd pollfd;
1950     int status;
1951
1952     pollfd.fd = fd;
1953     pollfd.events = POLLIN | POLLPRI;
1954     pollfd.revents = 0;
1955
1956     status = poll (&pollfd, 1, /* timeout = */ 500);
1957     if (do_shutdown)
1958       break;
1959     else if (status == 0) /* timeout */
1960       continue;
1961     else if (status < 0) /* error */
1962     {
1963       status = errno;
1964       if (status != EINTR)
1965         RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
1966       continue;
1967     }
1968
1969     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
1970       break;
1971     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
1972     {
1973       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
1974           "poll(2) returned something unexpected: %#04hx",
1975           pollfd.revents);
1976       break;
1977     }
1978
1979     rbytes = read(fd, sock->rbuf + sock->next_read,
1980                   RBUF_SIZE - sock->next_read);
1981     if (rbytes < 0)
1982     {
1983       RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
1984       break;
1985     }
1986     else if (rbytes == 0)
1987       break; /* eof */
1988
1989     sock->next_read += rbytes;
1990
1991     if (sock->batch_start)
1992       now = sock->batch_start;
1993     else
1994       now = time(NULL);
1995
1996     while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
1997     {
1998       status = handle_request (sock, now, cmd, cmd_len+1);
1999       if (status != 0)
2000         goto out_close;
2001     }
2002   }
2003
2004 out_close:
2005   close_connection(sock);
2006
2007   /* Remove this thread from the connection threads list */
2008   pthread_mutex_lock (&connection_threads_lock);
2009   {
2010     pthread_t self;
2011     pthread_t *temp;
2012
2013     /* Find out own index in the array */
2014     self = pthread_self ();
2015     for (i = 0; i < connection_threads_num; i++)
2016       if (pthread_equal (connection_threads[i], self) != 0)
2017         break;
2018     assert (i < connection_threads_num);
2019
2020     /* Move the trailing threads forward. */
2021     if (i < (connection_threads_num - 1))
2022     {
2023       memmove (connection_threads + i,
2024                connection_threads + i + 1,
2025                sizeof (pthread_t) * (connection_threads_num - i - 1));
2026     }
2027
2028     connection_threads_num--;
2029
2030     temp = rrd_realloc(connection_threads,
2031                    sizeof(*connection_threads) * connection_threads_num);
2032     if (connection_threads_num > 0 && temp == NULL)
2033       RRDD_LOG(LOG_ERR, "connection_thread_main: realloc(--) failed.");
2034     else
2035       connection_threads = temp;
2036   }
2037   pthread_mutex_unlock (&connection_threads_lock);
2038
2039   return (NULL);
2040 } /* }}} void *connection_thread_main */
2041
2042 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2043 {
2044   int fd;
2045   struct sockaddr_un sa;
2046   listen_socket_t *temp;
2047   int status;
2048   const char *path;
2049
2050   path = sock->addr;
2051   if (strncmp(path, "unix:", strlen("unix:")) == 0)
2052     path += strlen("unix:");
2053
2054   temp = (listen_socket_t *) rrd_realloc (listen_fds,
2055       sizeof (listen_fds[0]) * (listen_fds_num + 1));
2056   if (temp == NULL)
2057   {
2058     fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2059     return (-1);
2060   }
2061   listen_fds = temp;
2062   memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2063
2064   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2065   if (fd < 0)
2066   {
2067     fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2068              rrd_strerror(errno));
2069     return (-1);
2070   }
2071
2072   memset (&sa, 0, sizeof (sa));
2073   sa.sun_family = AF_UNIX;
2074   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2075
2076   /* if we've gotten this far, we own the pid file.  any daemon started
2077    * with the same args must not be alive.  therefore, ensure that we can
2078    * create the socket...
2079    */
2080   unlink(path);
2081
2082   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2083   if (status != 0)
2084   {
2085     fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2086              path, rrd_strerror(errno));
2087     close (fd);
2088     return (-1);
2089   }
2090
2091   status = listen (fd, /* backlog = */ 10);
2092   if (status != 0)
2093   {
2094     fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2095              path, rrd_strerror(errno));
2096     close (fd);
2097     unlink (path);
2098     return (-1);
2099   }
2100
2101   listen_fds[listen_fds_num].fd = fd;
2102   listen_fds[listen_fds_num].family = PF_UNIX;
2103   strncpy(listen_fds[listen_fds_num].addr, path,
2104           sizeof (listen_fds[listen_fds_num].addr) - 1);
2105   listen_fds_num++;
2106
2107   return (0);
2108 } /* }}} int open_listen_socket_unix */
2109
2110 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2111 {
2112   struct addrinfo ai_hints;
2113   struct addrinfo *ai_res;
2114   struct addrinfo *ai_ptr;
2115   char addr_copy[NI_MAXHOST];
2116   char *addr;
2117   char *port;
2118   int status;
2119
2120   strncpy (addr_copy, sock->addr, sizeof (addr_copy));
2121   addr_copy[sizeof (addr_copy) - 1] = 0;
2122   addr = addr_copy;
2123
2124   memset (&ai_hints, 0, sizeof (ai_hints));
2125   ai_hints.ai_flags = 0;
2126 #ifdef AI_ADDRCONFIG
2127   ai_hints.ai_flags |= AI_ADDRCONFIG;
2128 #endif
2129   ai_hints.ai_family = AF_UNSPEC;
2130   ai_hints.ai_socktype = SOCK_STREAM;
2131
2132   port = NULL;
2133   if (*addr == '[') /* IPv6+port format */
2134   {
2135     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2136     addr++;
2137
2138     port = strchr (addr, ']');
2139     if (port == NULL)
2140     {
2141       fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
2142       return (-1);
2143     }
2144     *port = 0;
2145     port++;
2146
2147     if (*port == ':')
2148       port++;
2149     else if (*port == 0)
2150       port = NULL;
2151     else
2152     {
2153       fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
2154       return (-1);
2155     }
2156   } /* if (*addr = ']') */
2157   else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */
2158   {
2159     port = rindex(addr, ':');
2160     if (port != NULL)
2161     {
2162       *port = 0;
2163       port++;
2164     }
2165   }
2166   ai_res = NULL;
2167   status = getaddrinfo (addr,
2168                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2169                         &ai_hints, &ai_res);
2170   if (status != 0)
2171   {
2172     fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
2173              addr, gai_strerror (status));
2174     return (-1);
2175   }
2176
2177   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2178   {
2179     int fd;
2180     listen_socket_t *temp;
2181     int one = 1;
2182
2183     temp = (listen_socket_t *) rrd_realloc (listen_fds,
2184         sizeof (listen_fds[0]) * (listen_fds_num + 1));
2185     if (temp == NULL)
2186     {
2187       fprintf (stderr,
2188                "rrdcached: open_listen_socket_network: realloc failed.\n");
2189       continue;
2190     }
2191     listen_fds = temp;
2192     memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2193
2194     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2195     if (fd < 0)
2196     {
2197       fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
2198                rrd_strerror(errno));
2199       continue;
2200     }
2201
2202     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2203
2204     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2205     if (status != 0)
2206     {
2207       fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2208                sock->addr, rrd_strerror(errno));
2209       close (fd);
2210       continue;
2211     }
2212
2213     status = listen (fd, /* backlog = */ 10);
2214     if (status != 0)
2215     {
2216       fprintf (stderr, "rrdcached: listen(%s) failed: %s\n.",
2217                sock->addr, rrd_strerror(errno));
2218       close (fd);
2219       freeaddrinfo(ai_res);
2220       return (-1);
2221     }
2222
2223     listen_fds[listen_fds_num].fd = fd;
2224     listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2225     listen_fds_num++;
2226   } /* for (ai_ptr) */
2227
2228   freeaddrinfo(ai_res);
2229   return (0);
2230 } /* }}} static int open_listen_socket_network */
2231
2232 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2233 {
2234   assert(sock != NULL);
2235   assert(sock->addr != NULL);
2236
2237   if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2238       || sock->addr[0] == '/')
2239     return (open_listen_socket_unix(sock));
2240   else
2241     return (open_listen_socket_network(sock));
2242 } /* }}} int open_listen_socket */
2243
2244 static int close_listen_sockets (void) /* {{{ */
2245 {
2246   size_t i;
2247
2248   for (i = 0; i < listen_fds_num; i++)
2249   {
2250     close (listen_fds[i].fd);
2251
2252     if (listen_fds[i].family == PF_UNIX)
2253       unlink(listen_fds[i].addr);
2254   }
2255
2256   free (listen_fds);
2257   listen_fds = NULL;
2258   listen_fds_num = 0;
2259
2260   return (0);
2261 } /* }}} int close_listen_sockets */
2262
2263 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
2264 {
2265   struct pollfd *pollfds;
2266   int pollfds_num;
2267   int status;
2268   int i;
2269
2270   if (listen_fds_num < 1)
2271   {
2272     RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2273     return (NULL);
2274   }
2275
2276   pollfds_num = listen_fds_num;
2277   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2278   if (pollfds == NULL)
2279   {
2280     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2281     return (NULL);
2282   }
2283   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2284
2285   RRDD_LOG(LOG_INFO, "listening for connections");
2286
2287   while (do_shutdown == 0)
2288   {
2289     for (i = 0; i < pollfds_num; i++)
2290     {
2291       pollfds[i].fd = listen_fds[i].fd;
2292       pollfds[i].events = POLLIN | POLLPRI;
2293       pollfds[i].revents = 0;
2294     }
2295
2296     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2297     if (do_shutdown)
2298       break;
2299     else if (status == 0) /* timeout */
2300       continue;
2301     else if (status < 0) /* error */
2302     {
2303       status = errno;
2304       if (status != EINTR)
2305       {
2306         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2307       }
2308       continue;
2309     }
2310
2311     for (i = 0; i < pollfds_num; i++)
2312     {
2313       listen_socket_t *client_sock;
2314       struct sockaddr_storage client_sa;
2315       socklen_t client_sa_size;
2316       pthread_t tid;
2317       pthread_attr_t attr;
2318
2319       if (pollfds[i].revents == 0)
2320         continue;
2321
2322       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2323       {
2324         RRDD_LOG (LOG_ERR, "listen_thread_main: "
2325             "poll(2) returned something unexpected for listen FD #%i.",
2326             pollfds[i].fd);
2327         continue;
2328       }
2329
2330       client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2331       if (client_sock == NULL)
2332       {
2333         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2334         continue;
2335       }
2336       memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2337
2338       client_sa_size = sizeof (client_sa);
2339       client_sock->fd = accept (pollfds[i].fd,
2340           (struct sockaddr *) &client_sa, &client_sa_size);
2341       if (client_sock->fd < 0)
2342       {
2343         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2344         free(client_sock);
2345         continue;
2346       }
2347
2348       pthread_attr_init (&attr);
2349       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2350
2351       status = pthread_create (&tid, &attr, connection_thread_main,
2352                                client_sock);
2353       if (status != 0)
2354       {
2355         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2356         close_connection(client_sock);
2357         continue;
2358       }
2359     } /* for (pollfds_num) */
2360   } /* while (do_shutdown == 0) */
2361
2362   RRDD_LOG(LOG_INFO, "starting shutdown");
2363
2364   close_listen_sockets ();
2365
2366   pthread_mutex_lock (&connection_threads_lock);
2367   while (connection_threads_num > 0)
2368   {
2369     pthread_t wait_for;
2370
2371     wait_for = connection_threads[0];
2372
2373     pthread_mutex_unlock (&connection_threads_lock);
2374     pthread_join (wait_for, /* retval = */ NULL);
2375     pthread_mutex_lock (&connection_threads_lock);
2376   }
2377   pthread_mutex_unlock (&connection_threads_lock);
2378
2379   free(pollfds);
2380
2381   return (NULL);
2382 } /* }}} void *listen_thread_main */
2383
2384 static int daemonize (void) /* {{{ */
2385 {
2386   int pid_fd;
2387   char *base_dir;
2388
2389   daemon_uid = geteuid();
2390
2391   pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
2392   if (pid_fd < 0)
2393     pid_fd = check_pidfile();
2394   if (pid_fd < 0)
2395     return pid_fd;
2396
2397   /* open all the listen sockets */
2398   if (config_listen_address_list_len > 0)
2399   {
2400     for (int i = 0; i < config_listen_address_list_len; i++)
2401     {
2402       open_listen_socket (config_listen_address_list[i]);
2403       free_listen_socket (config_listen_address_list[i]);
2404     }
2405
2406     free(config_listen_address_list);
2407   }
2408   else
2409   {
2410     listen_socket_t sock;
2411     memset(&sock, 0, sizeof(sock));
2412     strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr));
2413     open_listen_socket (&sock);
2414   }
2415
2416   if (listen_fds_num < 1)
2417   {
2418     fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
2419     goto error;
2420   }
2421
2422   if (!stay_foreground)
2423   {
2424     pid_t child;
2425
2426     child = fork ();
2427     if (child < 0)
2428     {
2429       fprintf (stderr, "daemonize: fork(2) failed.\n");
2430       goto error;
2431     }
2432     else if (child > 0)
2433       exit(0);
2434
2435     /* Become session leader */
2436     setsid ();
2437
2438     /* Open the first three file descriptors to /dev/null */
2439     close (2);
2440     close (1);
2441     close (0);
2442
2443     open ("/dev/null", O_RDWR);
2444     dup (0);
2445     dup (0);
2446   } /* if (!stay_foreground) */
2447
2448   /* Change into the /tmp directory. */
2449   base_dir = (config_base_dir != NULL)
2450     ? config_base_dir
2451     : "/tmp";
2452
2453   if (chdir (base_dir) != 0)
2454   {
2455     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2456     goto error;
2457   }
2458
2459   install_signal_handlers();
2460
2461   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2462   RRDD_LOG(LOG_INFO, "starting up");
2463
2464   cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
2465                                 (GDestroyNotify) free_cache_item);
2466   if (cache_tree == NULL)
2467   {
2468     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2469     goto error;
2470   }
2471
2472   return write_pidfile (pid_fd);
2473
2474 error:
2475   remove_pidfile();
2476   return -1;
2477 } /* }}} int daemonize */
2478
2479 static int cleanup (void) /* {{{ */
2480 {
2481   do_shutdown++;
2482
2483   pthread_cond_signal (&cache_cond);
2484   pthread_join (queue_thread, /* return = */ NULL);
2485
2486   remove_pidfile ();
2487
2488   free(config_base_dir);
2489   free(config_pid_file);
2490   free(journal_cur);
2491   free(journal_old);
2492
2493   pthread_mutex_lock(&cache_lock);
2494   g_tree_destroy(cache_tree);
2495
2496   RRDD_LOG(LOG_INFO, "goodbye");
2497   closelog ();
2498
2499   return (0);
2500 } /* }}} int cleanup */
2501
2502 static int read_options (int argc, char **argv) /* {{{ */
2503 {
2504   int option;
2505   int status = 0;
2506
2507   while ((option = getopt(argc, argv, "gl:L:f:w:b:Bz:p:j:h?F")) != -1)
2508   {
2509     switch (option)
2510     {
2511       case 'g':
2512         stay_foreground=1;
2513         break;
2514
2515       case 'L':
2516       case 'l':
2517       {
2518         listen_socket_t **temp;
2519         listen_socket_t *new;
2520
2521         new = malloc(sizeof(listen_socket_t));
2522         if (new == NULL)
2523         {
2524           fprintf(stderr, "read_options: malloc failed.\n");
2525           return(2);
2526         }
2527         memset(new, 0, sizeof(listen_socket_t));
2528
2529         temp = (listen_socket_t **) rrd_realloc (config_listen_address_list,
2530             sizeof (listen_socket_t *) * (config_listen_address_list_len + 1));
2531         if (temp == NULL)
2532         {
2533           fprintf (stderr, "read_options: realloc failed.\n");
2534           return (2);
2535         }
2536         config_listen_address_list = temp;
2537
2538         strncpy(new->addr, optarg, sizeof(new->addr)-1);
2539         new->privilege = (option == 'l') ? PRIV_HIGH : PRIV_LOW;
2540
2541         temp[config_listen_address_list_len] = new;
2542         config_listen_address_list_len++;
2543       }
2544       break;
2545
2546       case 'f':
2547       {
2548         int temp;
2549
2550         temp = atoi (optarg);
2551         if (temp > 0)
2552           config_flush_interval = temp;
2553         else
2554         {
2555           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2556           status = 3;
2557         }
2558       }
2559       break;
2560
2561       case 'w':
2562       {
2563         int temp;
2564
2565         temp = atoi (optarg);
2566         if (temp > 0)
2567           config_write_interval = temp;
2568         else
2569         {
2570           fprintf (stderr, "Invalid write interval: %s\n", optarg);
2571           status = 2;
2572         }
2573       }
2574       break;
2575
2576       case 'z':
2577       {
2578         int temp;
2579
2580         temp = atoi(optarg);
2581         if (temp > 0)
2582           config_write_jitter = temp;
2583         else
2584         {
2585           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2586           status = 2;
2587         }
2588
2589         break;
2590       }
2591
2592       case 'B':
2593         config_write_base_only = 1;
2594         break;
2595
2596       case 'b':
2597       {
2598         size_t len;
2599         char base_realpath[PATH_MAX];
2600
2601         if (config_base_dir != NULL)
2602           free (config_base_dir);
2603         config_base_dir = strdup (optarg);
2604         if (config_base_dir == NULL)
2605         {
2606           fprintf (stderr, "read_options: strdup failed.\n");
2607           return (3);
2608         }
2609
2610         /* make sure that the base directory is not resolved via
2611          * symbolic links.  this makes some performance-enhancing
2612          * assumptions possible (we don't have to resolve paths
2613          * that start with a "/")
2614          */
2615         if (realpath(config_base_dir, base_realpath) == NULL)
2616         {
2617           fprintf (stderr, "Invalid base directory '%s'.\n", config_base_dir);
2618           return 5;
2619         }
2620         else if (strncmp(config_base_dir,
2621                          base_realpath, sizeof(base_realpath)) != 0)
2622         {
2623           fprintf(stderr,
2624                   "Base directory (-b) resolved via file system links!\n"
2625                   "Please consult rrdcached '-b' documentation!\n"
2626                   "Consider specifying the real directory (%s)\n",
2627                   base_realpath);
2628           return 5;
2629         }
2630
2631         len = strlen (config_base_dir);
2632         while ((len > 0) && (config_base_dir[len - 1] == '/'))
2633         {
2634           config_base_dir[len - 1] = 0;
2635           len--;
2636         }
2637
2638         if (len < 1)
2639         {
2640           fprintf (stderr, "Invalid base directory: %s\n", optarg);
2641           return (4);
2642         }
2643
2644         _config_base_dir_len = len;
2645       }
2646       break;
2647
2648       case 'p':
2649       {
2650         if (config_pid_file != NULL)
2651           free (config_pid_file);
2652         config_pid_file = strdup (optarg);
2653         if (config_pid_file == NULL)
2654         {
2655           fprintf (stderr, "read_options: strdup failed.\n");
2656           return (3);
2657         }
2658       }
2659       break;
2660
2661       case 'F':
2662         config_flush_at_shutdown = 1;
2663         break;
2664
2665       case 'j':
2666       {
2667         struct stat statbuf;
2668         const char *dir = optarg;
2669
2670         status = stat(dir, &statbuf);
2671         if (status != 0)
2672         {
2673           fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2674           return 6;
2675         }
2676
2677         if (!S_ISDIR(statbuf.st_mode)
2678             || access(dir, R_OK|W_OK|X_OK) != 0)
2679         {
2680           fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2681                   errno ? rrd_strerror(errno) : "");
2682           return 6;
2683         }
2684
2685         journal_cur = malloc(PATH_MAX + 1);
2686         journal_old = malloc(PATH_MAX + 1);
2687         if (journal_cur == NULL || journal_old == NULL)
2688         {
2689           fprintf(stderr, "malloc failure for journal files\n");
2690           return 6;
2691         }
2692         else 
2693         {
2694           snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2695           snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2696         }
2697       }
2698       break;
2699
2700       case 'h':
2701       case '?':
2702         printf ("RRDCacheD %s  Copyright (C) 2008 Florian octo Forster\n"
2703             "\n"
2704             "Usage: rrdcached [options]\n"
2705             "\n"
2706             "Valid options are:\n"
2707             "  -l <address>  Socket address to listen to.\n"
2708             "  -L <address>  Socket address to listen to ('FLUSH' only).\n"
2709             "  -w <seconds>  Interval in which to write data.\n"
2710             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
2711             "  -f <seconds>  Interval in which to flush dead data.\n"
2712             "  -p <file>     Location of the PID-file.\n"
2713             "  -b <dir>      Base directory to change to.\n"
2714             "  -B            Restrict file access to paths within -b <dir>\n"
2715             "  -g            Do not fork and run in the foreground.\n"
2716             "  -j <dir>      Directory in which to create the journal files.\n"
2717             "  -F            Always flush all updates at shutdown\n"
2718             "\n"
2719             "For more information and a detailed description of all options "
2720             "please refer\n"
2721             "to the rrdcached(1) manual page.\n",
2722             VERSION);
2723         status = -1;
2724         break;
2725     } /* switch (option) */
2726   } /* while (getopt) */
2727
2728   /* advise the user when values are not sane */
2729   if (config_flush_interval < 2 * config_write_interval)
2730     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2731             " 2x write interval (-w) !\n");
2732   if (config_write_jitter > config_write_interval)
2733     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2734             " write interval (-w) !\n");
2735
2736   if (config_write_base_only && config_base_dir == NULL)
2737     fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
2738             "  Consult the rrdcached documentation\n");
2739
2740   if (journal_cur == NULL)
2741     config_flush_at_shutdown = 1;
2742
2743   return (status);
2744 } /* }}} int read_options */
2745
2746 int main (int argc, char **argv)
2747 {
2748   int status;
2749
2750   status = read_options (argc, argv);
2751   if (status != 0)
2752   {
2753     if (status < 0)
2754       status = 0;
2755     return (status);
2756   }
2757
2758   status = daemonize ();
2759   if (status != 0)
2760   {
2761     fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
2762     return (1);
2763   }
2764
2765   journal_init();
2766
2767   /* start the queue thread */
2768   memset (&queue_thread, 0, sizeof (queue_thread));
2769   status = pthread_create (&queue_thread,
2770                            NULL, /* attr */
2771                            queue_thread_main,
2772                            NULL); /* args */
2773   if (status != 0)
2774   {
2775     RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2776     cleanup();
2777     return (1);
2778   }
2779
2780   listen_thread_main (NULL);
2781   cleanup ();
2782
2783   return (0);
2784 } /* int main */
2785
2786 /*
2787  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2788  */