Renaming ntmake.pl to ntmake.PL (r1742) had unforseen side effects. At least
[rrdtool.git] / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008 Florian octo Forster
4  * Copyright (C) 2008 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
23
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
32
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
36
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
41
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
47
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
51
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
55
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
61
62 /*
63  * Now for some includes..
64  */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
73
74 #include "rrd.h"
75 #include "rrd_client.h"
76
77 #include <stdlib.h>
78
79 #ifndef WIN32
80 #include <stdint.h>
81 #include <unistd.h>
82 #include <strings.h>
83 #include <inttypes.h>
84 #       include <sys/socket.h>
85
86 #else
87
88 #endif
89 #include <stdio.h>
90 #include <string.h>
91
92 #include <sys/types.h>
93 #include <sys/stat.h>
94 #include <fcntl.h>
95 #include <signal.h>
96 #include <sys/un.h>
97 #include <netdb.h>
98 #include <poll.h>
99 #include <syslog.h>
100 #include <pthread.h>
101 #include <errno.h>
102 #include <assert.h>
103 #include <sys/time.h>
104 #include <time.h>
105
106 #include <glib-2.0/glib.h>
107 /* }}} */
108
109 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
110
111 #ifndef __GNUC__
112 # define __attribute__(x) /**/
113 #endif
114
115 /*
116  * Types
117  */
118 typedef enum
119 {
120   PRIV_LOW,
121   PRIV_HIGH
122 } socket_privilege;
123
124 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
125
126 struct listen_socket_s
127 {
128   int fd;
129   char addr[PATH_MAX + 1];
130   int family;
131   socket_privilege privilege;
132
133   /* state for BATCH processing */
134   time_t batch_start;
135   int batch_cmd;
136
137   /* buffered IO */
138   char *rbuf;
139   off_t next_cmd;
140   off_t next_read;
141
142   char *wbuf;
143   ssize_t wbuf_len;
144 };
145 typedef struct listen_socket_s listen_socket_t;
146
147 struct cache_item_s;
148 typedef struct cache_item_s cache_item_t;
149 struct cache_item_s
150 {
151   char *file;
152   char **values;
153   int values_num;
154   time_t last_flush_time;
155   time_t last_update_stamp;
156 #define CI_FLAGS_IN_TREE  (1<<0)
157 #define CI_FLAGS_IN_QUEUE (1<<1)
158   int flags;
159   pthread_cond_t  flushed;
160   cache_item_t *prev;
161   cache_item_t *next;
162 };
163
164 struct callback_flush_data_s
165 {
166   time_t now;
167   time_t abs_timeout;
168   char **keys;
169   size_t keys_num;
170 };
171 typedef struct callback_flush_data_s callback_flush_data_t;
172
173 enum queue_side_e
174 {
175   HEAD,
176   TAIL
177 };
178 typedef enum queue_side_e queue_side_t;
179
180 /* max length of socket command or response */
181 #define CMD_MAX 4096
182 #define RBUF_SIZE (CMD_MAX*2)
183
184 /*
185  * Variables
186  */
187 static int stay_foreground = 0;
188 static uid_t daemon_uid;
189
190 static listen_socket_t *listen_fds = NULL;
191 static size_t listen_fds_num = 0;
192
193 static int do_shutdown = 0;
194
195 static pthread_t *queue_threads;
196 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
197 static int config_queue_threads = 4;
198
199 static pthread_t flush_thread;
200 static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;
201
202 static pthread_t *connection_threads = NULL;
203 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
204 static int connection_threads_num = 0;
205
206 /* Cache stuff */
207 static GTree          *cache_tree = NULL;
208 static cache_item_t   *cache_queue_head = NULL;
209 static cache_item_t   *cache_queue_tail = NULL;
210 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
211
212 static int config_write_interval = 300;
213 static int config_write_jitter   = 0;
214 static int config_flush_interval = 3600;
215 static int config_flush_at_shutdown = 0;
216 static char *config_pid_file = NULL;
217 static char *config_base_dir = NULL;
218 static size_t _config_base_dir_len = 0;
219 static int config_write_base_only = 0;
220
221 static listen_socket_t **config_listen_address_list = NULL;
222 static int config_listen_address_list_len = 0;
223
224 static uint64_t stats_queue_length = 0;
225 static uint64_t stats_updates_received = 0;
226 static uint64_t stats_flush_received = 0;
227 static uint64_t stats_updates_written = 0;
228 static uint64_t stats_data_sets_written = 0;
229 static uint64_t stats_journal_bytes = 0;
230 static uint64_t stats_journal_rotate = 0;
231 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
232
233 /* Journaled updates */
234 static char *journal_cur = NULL;
235 static char *journal_old = NULL;
236 static FILE *journal_fh = NULL;
237 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
238 static int journal_write(char *cmd, char *args);
239 static void journal_done(void);
240 static void journal_rotate(void);
241
242 /* 
243  * Functions
244  */
245 static void sig_common (const char *sig) /* {{{ */
246 {
247   RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
248   do_shutdown++;
249   pthread_cond_broadcast(&flush_cond);
250   pthread_cond_broadcast(&queue_cond);
251 } /* }}} void sig_common */
252
253 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
254 {
255   sig_common("INT");
256 } /* }}} void sig_int_handler */
257
258 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
259 {
260   sig_common("TERM");
261 } /* }}} void sig_term_handler */
262
263 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
264 {
265   config_flush_at_shutdown = 1;
266   sig_common("USR1");
267 } /* }}} void sig_usr1_handler */
268
269 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
270 {
271   config_flush_at_shutdown = 0;
272   sig_common("USR2");
273 } /* }}} void sig_usr2_handler */
274
275 static void install_signal_handlers(void) /* {{{ */
276 {
277   /* These structures are static, because `sigaction' behaves weird if the are
278    * overwritten.. */
279   static struct sigaction sa_int;
280   static struct sigaction sa_term;
281   static struct sigaction sa_pipe;
282   static struct sigaction sa_usr1;
283   static struct sigaction sa_usr2;
284
285   /* Install signal handlers */
286   memset (&sa_int, 0, sizeof (sa_int));
287   sa_int.sa_handler = sig_int_handler;
288   sigaction (SIGINT, &sa_int, NULL);
289
290   memset (&sa_term, 0, sizeof (sa_term));
291   sa_term.sa_handler = sig_term_handler;
292   sigaction (SIGTERM, &sa_term, NULL);
293
294   memset (&sa_pipe, 0, sizeof (sa_pipe));
295   sa_pipe.sa_handler = SIG_IGN;
296   sigaction (SIGPIPE, &sa_pipe, NULL);
297
298   memset (&sa_pipe, 0, sizeof (sa_usr1));
299   sa_usr1.sa_handler = sig_usr1_handler;
300   sigaction (SIGUSR1, &sa_usr1, NULL);
301
302   memset (&sa_usr2, 0, sizeof (sa_usr2));
303   sa_usr2.sa_handler = sig_usr2_handler;
304   sigaction (SIGUSR2, &sa_usr2, NULL);
305
306 } /* }}} void install_signal_handlers */
307
308 static int open_pidfile(char *action, int oflag) /* {{{ */
309 {
310   int fd;
311   char *file;
312
313   file = (config_pid_file != NULL)
314     ? config_pid_file
315     : LOCALSTATEDIR "/run/rrdcached.pid";
316
317   fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
318   if (fd < 0)
319     fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
320             action, file, rrd_strerror(errno));
321
322   return(fd);
323 } /* }}} static int open_pidfile */
324
325 /* check existing pid file to see whether a daemon is running */
326 static int check_pidfile(void)
327 {
328   int pid_fd;
329   pid_t pid;
330   char pid_str[16];
331
332   pid_fd = open_pidfile("open", O_RDWR);
333   if (pid_fd < 0)
334     return pid_fd;
335
336   if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0)
337     return -1;
338
339   pid = atoi(pid_str);
340   if (pid <= 0)
341     return -1;
342
343   /* another running process that we can signal COULD be
344    * a competing rrdcached */
345   if (pid != getpid() && kill(pid, 0) == 0)
346   {
347     fprintf(stderr,
348             "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
349     close(pid_fd);
350     return -1;
351   }
352
353   lseek(pid_fd, 0, SEEK_SET);
354   ftruncate(pid_fd, 0);
355
356   fprintf(stderr,
357           "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
358           "rrdcached: starting normally.\n", pid);
359
360   return pid_fd;
361 } /* }}} static int check_pidfile */
362
363 static int write_pidfile (int fd) /* {{{ */
364 {
365   pid_t pid;
366   FILE *fh;
367
368   pid = getpid ();
369
370   fh = fdopen (fd, "w");
371   if (fh == NULL)
372   {
373     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
374     close(fd);
375     return (-1);
376   }
377
378   fprintf (fh, "%i\n", (int) pid);
379   fclose (fh);
380
381   return (0);
382 } /* }}} int write_pidfile */
383
384 static int remove_pidfile (void) /* {{{ */
385 {
386   char *file;
387   int status;
388
389   file = (config_pid_file != NULL)
390     ? config_pid_file
391     : LOCALSTATEDIR "/run/rrdcached.pid";
392
393   status = unlink (file);
394   if (status == 0)
395     return (0);
396   return (errno);
397 } /* }}} int remove_pidfile */
398
399 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
400 {
401   char *eol;
402
403   eol = memchr(sock->rbuf + sock->next_cmd, '\n',
404                sock->next_read - sock->next_cmd);
405
406   if (eol == NULL)
407   {
408     /* no commands left, move remainder back to front of rbuf */
409     memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
410             sock->next_read - sock->next_cmd);
411     sock->next_read -= sock->next_cmd;
412     sock->next_cmd = 0;
413     *len = 0;
414     return NULL;
415   }
416   else
417   {
418     char *cmd = sock->rbuf + sock->next_cmd;
419     *eol = '\0';
420
421     sock->next_cmd = eol - sock->rbuf + 1;
422
423     if (eol > sock->rbuf && *(eol-1) == '\r')
424       *(--eol) = '\0'; /* handle "\r\n" EOL */
425
426     *len = eol - cmd;
427
428     return cmd;
429   }
430
431   /* NOTREACHED */
432   assert(1==0);
433 }
434
435 /* add the characters directly to the write buffer */
436 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
437 {
438   char *new_buf;
439
440   assert(sock != NULL);
441
442   new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
443   if (new_buf == NULL)
444   {
445     RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
446     return -1;
447   }
448
449   strncpy(new_buf + sock->wbuf_len, str, len + 1);
450
451   sock->wbuf = new_buf;
452   sock->wbuf_len += len;
453
454   return 0;
455 } /* }}} static int add_to_wbuf */
456
457 /* add the text to the "extra" info that's sent after the status line */
458 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
459 {
460   va_list argp;
461   char buffer[CMD_MAX];
462   int len;
463
464   if (sock == NULL) return 0; /* journal replay mode */
465   if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
466
467   va_start(argp, fmt);
468 #ifdef HAVE_VSNPRINTF
469   len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
470 #else
471   len = vsprintf(buffer, fmt, argp);
472 #endif
473   va_end(argp);
474   if (len < 0)
475   {
476     RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
477     return -1;
478   }
479
480   return add_to_wbuf(sock, buffer, len);
481 } /* }}} static int add_response_info */
482
483 static int count_lines(char *str) /* {{{ */
484 {
485   int lines = 0;
486
487   if (str != NULL)
488   {
489     while ((str = strchr(str, '\n')) != NULL)
490     {
491       ++lines;
492       ++str;
493     }
494   }
495
496   return lines;
497 } /* }}} static int count_lines */
498
499 /* send the response back to the user.
500  * returns 0 on success, -1 on error
501  * write buffer is always zeroed after this call */
502 static int send_response (listen_socket_t *sock, response_code rc,
503                           char *fmt, ...) /* {{{ */
504 {
505   va_list argp;
506   char buffer[CMD_MAX];
507   int lines;
508   ssize_t wrote;
509   int rclen, len;
510
511   if (sock == NULL) return rc;  /* journal replay mode */
512
513   if (sock->batch_start)
514   {
515     if (rc == RESP_OK)
516       return rc; /* no response on success during BATCH */
517     lines = sock->batch_cmd;
518   }
519   else if (rc == RESP_OK)
520     lines = count_lines(sock->wbuf);
521   else
522     lines = -1;
523
524   rclen = sprintf(buffer, "%d ", lines);
525   va_start(argp, fmt);
526 #ifdef HAVE_VSNPRINTF
527   len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
528 #else
529   len = vsprintf(buffer+rclen, fmt, argp);
530 #endif
531   va_end(argp);
532   if (len < 0)
533     return -1;
534
535   len += rclen;
536
537   /* append the result to the wbuf, don't write to the user */
538   if (sock->batch_start)
539     return add_to_wbuf(sock, buffer, len);
540
541   /* first write must be complete */
542   if (len != write(sock->fd, buffer, len))
543   {
544     RRDD_LOG(LOG_INFO, "send_response: could not write status message");
545     return -1;
546   }
547
548   if (sock->wbuf != NULL && rc == RESP_OK)
549   {
550     wrote = 0;
551     while (wrote < sock->wbuf_len)
552     {
553       ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
554       if (wb <= 0)
555       {
556         RRDD_LOG(LOG_INFO, "send_response: could not write results");
557         return -1;
558       }
559       wrote += wb;
560     }
561   }
562
563   free(sock->wbuf); sock->wbuf = NULL;
564   sock->wbuf_len = 0;
565
566   return 0;
567 } /* }}} */
568
569 static void wipe_ci_values(cache_item_t *ci, time_t when)
570 {
571   ci->values = NULL;
572   ci->values_num = 0;
573
574   ci->last_flush_time = when;
575   if (config_write_jitter > 0)
576     ci->last_flush_time += (random() % config_write_jitter);
577 }
578
579 /* remove_from_queue
580  * remove a "cache_item_t" item from the queue.
581  * must hold 'cache_lock' when calling this
582  */
583 static void remove_from_queue(cache_item_t *ci) /* {{{ */
584 {
585   if (ci == NULL) return;
586   if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
587
588   if (ci->prev == NULL)
589     cache_queue_head = ci->next; /* reset head */
590   else
591     ci->prev->next = ci->next;
592
593   if (ci->next == NULL)
594     cache_queue_tail = ci->prev; /* reset the tail */
595   else
596     ci->next->prev = ci->prev;
597
598   ci->next = ci->prev = NULL;
599   ci->flags &= ~CI_FLAGS_IN_QUEUE;
600
601   pthread_mutex_lock (&stats_lock);
602   assert (stats_queue_length > 0);
603   stats_queue_length--;
604   pthread_mutex_unlock (&stats_lock);
605
606 } /* }}} static void remove_from_queue */
607
608 /* free the resources associated with the cache_item_t
609  * must hold cache_lock when calling this function
610  */
611 static void *free_cache_item(cache_item_t *ci) /* {{{ */
612 {
613   if (ci == NULL) return NULL;
614
615   remove_from_queue(ci);
616
617   for (int i=0; i < ci->values_num; i++)
618     free(ci->values[i]);
619
620   free (ci->values);
621   free (ci->file);
622
623   /* in case anyone is waiting */
624   pthread_cond_broadcast(&ci->flushed);
625
626   free (ci);
627
628   return NULL;
629 } /* }}} static void *free_cache_item */
630
631 /*
632  * enqueue_cache_item:
633  * `cache_lock' must be acquired before calling this function!
634  */
635 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
636     queue_side_t side)
637 {
638   if (ci == NULL)
639     return (-1);
640
641   if (ci->values_num == 0)
642     return (0);
643
644   if (side == HEAD)
645   {
646     if (cache_queue_head == ci)
647       return 0;
648
649     /* remove if further down in queue */
650     remove_from_queue(ci);
651
652     ci->prev = NULL;
653     ci->next = cache_queue_head;
654     if (ci->next != NULL)
655       ci->next->prev = ci;
656     cache_queue_head = ci;
657
658     if (cache_queue_tail == NULL)
659       cache_queue_tail = cache_queue_head;
660   }
661   else /* (side == TAIL) */
662   {
663     /* We don't move values back in the list.. */
664     if (ci->flags & CI_FLAGS_IN_QUEUE)
665       return (0);
666
667     assert (ci->next == NULL);
668     assert (ci->prev == NULL);
669
670     ci->prev = cache_queue_tail;
671
672     if (cache_queue_tail == NULL)
673       cache_queue_head = ci;
674     else
675       cache_queue_tail->next = ci;
676
677     cache_queue_tail = ci;
678   }
679
680   ci->flags |= CI_FLAGS_IN_QUEUE;
681
682   pthread_cond_signal(&queue_cond);
683   pthread_mutex_lock (&stats_lock);
684   stats_queue_length++;
685   pthread_mutex_unlock (&stats_lock);
686
687   return (0);
688 } /* }}} int enqueue_cache_item */
689
690 /*
691  * tree_callback_flush:
692  * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
693  * while this is in progress.
694  */
695 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
696     gpointer data)
697 {
698   cache_item_t *ci;
699   callback_flush_data_t *cfd;
700
701   ci = (cache_item_t *) value;
702   cfd = (callback_flush_data_t *) data;
703
704   if (ci->flags & CI_FLAGS_IN_QUEUE)
705     return FALSE;
706
707   if ((ci->last_flush_time <= cfd->abs_timeout)
708       && (ci->values_num > 0))
709   {
710     enqueue_cache_item (ci, TAIL);
711   }
712   else if ((do_shutdown != 0)
713       && (ci->values_num > 0))
714   {
715     enqueue_cache_item (ci, TAIL);
716   }
717   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
718       && (ci->values_num <= 0))
719   {
720     char **temp;
721
722     temp = (char **) rrd_realloc (cfd->keys,
723         sizeof (char *) * (cfd->keys_num + 1));
724     if (temp == NULL)
725     {
726       RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed.");
727       return (FALSE);
728     }
729     cfd->keys = temp;
730     /* Make really sure this points to the _same_ place */
731     assert ((char *) key == ci->file);
732     cfd->keys[cfd->keys_num] = (char *) key;
733     cfd->keys_num++;
734   }
735
736   return (FALSE);
737 } /* }}} gboolean tree_callback_flush */
738
739 static int flush_old_values (int max_age)
740 {
741   callback_flush_data_t cfd;
742   size_t k;
743
744   memset (&cfd, 0, sizeof (cfd));
745   /* Pass the current time as user data so that we don't need to call
746    * `time' for each node. */
747   cfd.now = time (NULL);
748   cfd.keys = NULL;
749   cfd.keys_num = 0;
750
751   if (max_age > 0)
752     cfd.abs_timeout = cfd.now - max_age;
753   else
754     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
755
756   /* `tree_callback_flush' will return the keys of all values that haven't
757    * been touched in the last `config_flush_interval' seconds in `cfd'.
758    * The char*'s in this array point to the same memory as ci->file, so we
759    * don't need to free them separately. */
760   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
761
762   for (k = 0; k < cfd.keys_num; k++)
763   {
764     /* should never fail, since we have held the cache_lock
765      * the entire time */
766     assert( g_tree_remove(cache_tree, cfd.keys[k]) == TRUE );
767   }
768
769   if (cfd.keys != NULL)
770   {
771     free (cfd.keys);
772     cfd.keys = NULL;
773   }
774
775   return (0);
776 } /* int flush_old_values */
777
778 static void *flush_thread_main (void *args __attribute__((unused))) /* {{{ */
779 {
780   struct timeval now;
781   struct timespec next_flush;
782   int status;
783
784   gettimeofday (&now, NULL);
785   next_flush.tv_sec = now.tv_sec + config_flush_interval;
786   next_flush.tv_nsec = 1000 * now.tv_usec;
787
788   pthread_mutex_lock(&cache_lock);
789
790   while (!do_shutdown)
791   {
792     gettimeofday (&now, NULL);
793     if ((now.tv_sec > next_flush.tv_sec)
794         || ((now.tv_sec == next_flush.tv_sec)
795           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
796     {
797       /* Flush all values that haven't been written in the last
798        * `config_write_interval' seconds. */
799       flush_old_values (config_write_interval);
800
801       /* Determine the time of the next cache flush. */
802       next_flush.tv_sec =
803         now.tv_sec + next_flush.tv_sec % config_flush_interval;
804
805       /* unlock the cache while we rotate so we don't block incoming
806        * updates if the fsync() blocks on disk I/O */
807       pthread_mutex_unlock(&cache_lock);
808       journal_rotate();
809       pthread_mutex_lock(&cache_lock);
810     }
811
812     status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
813     if (status != 0 && status != ETIMEDOUT)
814     {
815       RRDD_LOG (LOG_ERR, "flush_thread_main: "
816                 "pthread_cond_timedwait returned %i.", status);
817     }
818   }
819
820   if (config_flush_at_shutdown)
821     flush_old_values (-1); /* flush everything */
822
823   pthread_mutex_unlock(&cache_lock);
824
825   return NULL;
826 } /* void *flush_thread_main */
827
828 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
829 {
830   pthread_mutex_lock (&cache_lock);
831
832   while (!do_shutdown
833          || (cache_queue_head != NULL && config_flush_at_shutdown))
834   {
835     cache_item_t *ci;
836     char *file;
837     char **values;
838     int values_num;
839     int status;
840     int i;
841
842     /* Now, check if there's something to store away. If not, wait until
843      * something comes in.  if we are shutting down, do not wait around.  */
844     if (cache_queue_head == NULL && !do_shutdown)
845     {
846       status = pthread_cond_wait (&queue_cond, &cache_lock);
847       if ((status != 0) && (status != ETIMEDOUT))
848       {
849         RRDD_LOG (LOG_ERR, "queue_thread_main: "
850             "pthread_cond_wait returned %i.", status);
851       }
852     }
853
854     /* Check if a value has arrived. This may be NULL if we timed out or there
855      * was an interrupt such as a signal. */
856     if (cache_queue_head == NULL)
857       continue;
858
859     ci = cache_queue_head;
860
861     /* copy the relevant parts */
862     file = strdup (ci->file);
863     if (file == NULL)
864     {
865       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
866       continue;
867     }
868
869     assert(ci->values != NULL);
870     assert(ci->values_num > 0);
871
872     values = ci->values;
873     values_num = ci->values_num;
874
875     wipe_ci_values(ci, time(NULL));
876     remove_from_queue(ci);
877
878     pthread_mutex_unlock (&cache_lock);
879
880     rrd_clear_error ();
881     status = rrd_update_r (file, NULL, values_num, (void *) values);
882     if (status != 0)
883     {
884       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
885           "rrd_update_r (%s) failed with status %i. (%s)",
886           file, status, rrd_get_error());
887     }
888
889     journal_write("wrote", file);
890     pthread_cond_broadcast(&ci->flushed);
891
892     for (i = 0; i < values_num; i++)
893       free (values[i]);
894
895     free(values);
896     free(file);
897
898     if (status == 0)
899     {
900       pthread_mutex_lock (&stats_lock);
901       stats_updates_written++;
902       stats_data_sets_written += values_num;
903       pthread_mutex_unlock (&stats_lock);
904     }
905
906     pthread_mutex_lock (&cache_lock);
907   }
908   pthread_mutex_unlock (&cache_lock);
909
910   return (NULL);
911 } /* }}} void *queue_thread_main */
912
913 static int buffer_get_field (char **buffer_ret, /* {{{ */
914     size_t *buffer_size_ret, char **field_ret)
915 {
916   char *buffer;
917   size_t buffer_pos;
918   size_t buffer_size;
919   char *field;
920   size_t field_size;
921   int status;
922
923   buffer = *buffer_ret;
924   buffer_pos = 0;
925   buffer_size = *buffer_size_ret;
926   field = *buffer_ret;
927   field_size = 0;
928
929   if (buffer_size <= 0)
930     return (-1);
931
932   /* This is ensured by `handle_request'. */
933   assert (buffer[buffer_size - 1] == '\0');
934
935   status = -1;
936   while (buffer_pos < buffer_size)
937   {
938     /* Check for end-of-field or end-of-buffer */
939     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
940     {
941       field[field_size] = 0;
942       field_size++;
943       buffer_pos++;
944       status = 0;
945       break;
946     }
947     /* Handle escaped characters. */
948     else if (buffer[buffer_pos] == '\\')
949     {
950       if (buffer_pos >= (buffer_size - 1))
951         break;
952       buffer_pos++;
953       field[field_size] = buffer[buffer_pos];
954       field_size++;
955       buffer_pos++;
956     }
957     /* Normal operation */ 
958     else
959     {
960       field[field_size] = buffer[buffer_pos];
961       field_size++;
962       buffer_pos++;
963     }
964   } /* while (buffer_pos < buffer_size) */
965
966   if (status != 0)
967     return (status);
968
969   *buffer_ret = buffer + buffer_pos;
970   *buffer_size_ret = buffer_size - buffer_pos;
971   *field_ret = field;
972
973   return (0);
974 } /* }}} int buffer_get_field */
975
976 /* if we're restricting writes to the base directory,
977  * check whether the file falls within the dir
978  * returns 1 if OK, otherwise 0
979  */
980 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
981 {
982   assert(file != NULL);
983
984   if (!config_write_base_only
985       || sock == NULL /* journal replay */
986       || config_base_dir == NULL)
987     return 1;
988
989   if (strstr(file, "../") != NULL) goto err;
990
991   /* relative paths without "../" are ok */
992   if (*file != '/') return 1;
993
994   /* file must be of the format base + "/" + <1+ char filename> */
995   if (strlen(file) < _config_base_dir_len + 2) goto err;
996   if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
997   if (*(file + _config_base_dir_len) != '/') goto err;
998
999   return 1;
1000
1001 err:
1002   if (sock != NULL && sock->fd >= 0)
1003     send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1004
1005   return 0;
1006 } /* }}} static int check_file_access */
1007
1008 /* when using a base dir, convert relative paths to absolute paths.
1009  * if necessary, modifies the "filename" pointer to point
1010  * to the new path created in "tmp".  "tmp" is provided
1011  * by the caller and sizeof(tmp) must be >= PATH_MAX.
1012  *
1013  * this allows us to optimize for the expected case (absolute path)
1014  * with a no-op.
1015  */
1016 static void get_abs_path(char **filename, char *tmp)
1017 {
1018   assert(tmp != NULL);
1019   assert(filename != NULL && *filename != NULL);
1020
1021   if (config_base_dir == NULL || **filename == '/')
1022     return;
1023
1024   snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1025   *filename = tmp;
1026 } /* }}} static int get_abs_path */
1027
1028 /* returns 1 if we have the required privilege level,
1029  * otherwise issue an error to the user on sock */
1030 static int has_privilege (listen_socket_t *sock, /* {{{ */
1031                           socket_privilege priv)
1032 {
1033   if (sock == NULL) /* journal replay */
1034     return 1;
1035
1036   if (sock->privilege >= priv)
1037     return 1;
1038
1039   return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1040 } /* }}} static int has_privilege */
1041
1042 static int flush_file (const char *filename) /* {{{ */
1043 {
1044   cache_item_t *ci;
1045
1046   pthread_mutex_lock (&cache_lock);
1047
1048   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1049   if (ci == NULL)
1050   {
1051     pthread_mutex_unlock (&cache_lock);
1052     return (ENOENT);
1053   }
1054
1055   if (ci->values_num > 0)
1056   {
1057     /* Enqueue at head */
1058     enqueue_cache_item (ci, HEAD);
1059     pthread_cond_wait(&ci->flushed, &cache_lock);
1060   }
1061
1062   /* DO NOT DO ANYTHING WITH ci HERE!!  The entry
1063    * may have been purged during our cond_wait() */
1064
1065   pthread_mutex_unlock(&cache_lock);
1066
1067   return (0);
1068 } /* }}} int flush_file */
1069
1070 static int handle_request_help (listen_socket_t *sock, /* {{{ */
1071     char *buffer, size_t buffer_size)
1072 {
1073   int status;
1074   char **help_text;
1075   char *command;
1076
1077   char *help_help[2] =
1078   {
1079     "Command overview\n"
1080     ,
1081     "HELP [<command>]\n"
1082     "FLUSH <filename>\n"
1083     "FLUSHALL\n"
1084     "PENDING <filename>\n"
1085     "FORGET <filename>\n"
1086     "QUEUE\n"
1087     "UPDATE <filename> <values> [<values> ...]\n"
1088     "BATCH\n"
1089     "STATS\n"
1090     "QUIT\n"
1091   };
1092
1093   char *help_flush[2] =
1094   {
1095     "Help for FLUSH\n"
1096     ,
1097     "Usage: FLUSH <filename>\n"
1098     "\n"
1099     "Adds the given filename to the head of the update queue and returns\n"
1100     "after it has been dequeued.\n"
1101   };
1102
1103   char *help_flushall[2] =
1104   {
1105     "Help for FLUSHALL\n"
1106     ,
1107     "Usage: FLUSHALL\n"
1108     "\n"
1109     "Triggers writing of all pending updates.  Returns immediately.\n"
1110   };
1111
1112   char *help_pending[2] =
1113   {
1114     "Help for PENDING\n"
1115     ,
1116     "Usage: PENDING <filename>\n"
1117     "\n"
1118     "Shows any 'pending' updates for a file, in order.\n"
1119     "The updates shown have not yet been written to the underlying RRD file.\n"
1120   };
1121
1122   char *help_forget[2] =
1123   {
1124     "Help for FORGET\n"
1125     ,
1126     "Usage: FORGET <filename>\n"
1127     "\n"
1128     "Removes the file completely from the cache.\n"
1129     "Any pending updates for the file will be lost.\n"
1130   };
1131
1132   char *help_queue[2] =
1133   {
1134     "Help for QUEUE\n"
1135     ,
1136     "Shows all files in the output queue.\n"
1137     "The output is zero or more lines in the following format:\n"
1138     "(where <num_vals> is the number of values to be written)\n"
1139     "\n"
1140     "<num_vals> <filename>\n"
1141     "\n"
1142   };
1143
1144   char *help_update[2] =
1145   {
1146     "Help for UPDATE\n"
1147     ,
1148     "Usage: UPDATE <filename> <values> [<values> ...]\n"
1149     "\n"
1150     "Adds the given file to the internal cache if it is not yet known and\n"
1151     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1152     "for details.\n"
1153     "\n"
1154     "Each <values> has the following form:\n"
1155     "  <values> = <time>:<value>[:<value>[...]]\n"
1156     "See the rrdupdate(1) manpage for details.\n"
1157   };
1158
1159   char *help_stats[2] =
1160   {
1161     "Help for STATS\n"
1162     ,
1163     "Usage: STATS\n"
1164     "\n"
1165     "Returns some performance counters, see the rrdcached(1) manpage for\n"
1166     "a description of the values.\n"
1167   };
1168
1169   char *help_batch[2] =
1170   {
1171     "Help for BATCH\n"
1172     ,
1173     "The 'BATCH' command permits the client to initiate a bulk load\n"
1174     "   of commands to rrdcached.\n"
1175     "\n"
1176     "Usage:\n"
1177     "\n"
1178     "    client: BATCH\n"
1179     "    server: 0 Go ahead.  End with dot '.' on its own line.\n"
1180     "    client: command #1\n"
1181     "    client: command #2\n"
1182     "    client: ... and so on\n"
1183     "    client: .\n"
1184     "    server: 2 errors\n"
1185     "    server: 7 message for command #7\n"
1186     "    server: 9 message for command #9\n"
1187     "\n"
1188     "For more information, consult the rrdcached(1) documentation.\n"
1189   };
1190
1191   char *help_quit[2] =
1192   {
1193     "Help for QUIT\n"
1194     ,
1195     "Disconnect from rrdcached.\n"
1196   };
1197
1198   status = buffer_get_field (&buffer, &buffer_size, &command);
1199   if (status != 0)
1200     help_text = help_help;
1201   else
1202   {
1203     if (strcasecmp (command, "update") == 0)
1204       help_text = help_update;
1205     else if (strcasecmp (command, "flush") == 0)
1206       help_text = help_flush;
1207     else if (strcasecmp (command, "flushall") == 0)
1208       help_text = help_flushall;
1209     else if (strcasecmp (command, "pending") == 0)
1210       help_text = help_pending;
1211     else if (strcasecmp (command, "forget") == 0)
1212       help_text = help_forget;
1213     else if (strcasecmp (command, "queue") == 0)
1214       help_text = help_queue;
1215     else if (strcasecmp (command, "stats") == 0)
1216       help_text = help_stats;
1217     else if (strcasecmp (command, "batch") == 0)
1218       help_text = help_batch;
1219     else if (strcasecmp (command, "quit") == 0)
1220       help_text = help_quit;
1221     else
1222       help_text = help_help;
1223   }
1224
1225   add_response_info(sock, help_text[1]);
1226   return send_response(sock, RESP_OK, help_text[0]);
1227 } /* }}} int handle_request_help */
1228
1229 static int handle_request_stats (listen_socket_t *sock) /* {{{ */
1230 {
1231   uint64_t copy_queue_length;
1232   uint64_t copy_updates_received;
1233   uint64_t copy_flush_received;
1234   uint64_t copy_updates_written;
1235   uint64_t copy_data_sets_written;
1236   uint64_t copy_journal_bytes;
1237   uint64_t copy_journal_rotate;
1238
1239   uint64_t tree_nodes_number;
1240   uint64_t tree_depth;
1241
1242   pthread_mutex_lock (&stats_lock);
1243   copy_queue_length       = stats_queue_length;
1244   copy_updates_received   = stats_updates_received;
1245   copy_flush_received     = stats_flush_received;
1246   copy_updates_written    = stats_updates_written;
1247   copy_data_sets_written  = stats_data_sets_written;
1248   copy_journal_bytes      = stats_journal_bytes;
1249   copy_journal_rotate     = stats_journal_rotate;
1250   pthread_mutex_unlock (&stats_lock);
1251
1252   pthread_mutex_lock (&cache_lock);
1253   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1254   tree_depth        = (uint64_t) g_tree_height (cache_tree);
1255   pthread_mutex_unlock (&cache_lock);
1256
1257   add_response_info(sock,
1258                     "QueueLength: %"PRIu64"\n", copy_queue_length);
1259   add_response_info(sock,
1260                     "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1261   add_response_info(sock,
1262                     "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1263   add_response_info(sock,
1264                     "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1265   add_response_info(sock,
1266                     "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1267   add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1268   add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1269   add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1270   add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1271
1272   send_response(sock, RESP_OK, "Statistics follow\n");
1273
1274   return (0);
1275 } /* }}} int handle_request_stats */
1276
1277 static int handle_request_flush (listen_socket_t *sock, /* {{{ */
1278     char *buffer, size_t buffer_size)
1279 {
1280   char *file, file_tmp[PATH_MAX];
1281   int status;
1282
1283   status = buffer_get_field (&buffer, &buffer_size, &file);
1284   if (status != 0)
1285   {
1286     return send_response(sock, RESP_ERR, "Usage: flush <filename>\n");
1287   }
1288   else
1289   {
1290     pthread_mutex_lock(&stats_lock);
1291     stats_flush_received++;
1292     pthread_mutex_unlock(&stats_lock);
1293
1294     get_abs_path(&file, file_tmp);
1295     if (!check_file_access(file, sock)) return 0;
1296
1297     status = flush_file (file);
1298     if (status == 0)
1299       return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1300     else if (status == ENOENT)
1301     {
1302       /* no file in our tree; see whether it exists at all */
1303       struct stat statbuf;
1304
1305       memset(&statbuf, 0, sizeof(statbuf));
1306       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1307         return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1308       else
1309         return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1310     }
1311     else if (status < 0)
1312       return send_response(sock, RESP_ERR, "Internal error.\n");
1313     else
1314       return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1315   }
1316
1317   /* NOTREACHED */
1318   assert(1==0);
1319 } /* }}} int handle_request_flush */
1320
1321 static int handle_request_flushall(listen_socket_t *sock) /* {{{ */
1322 {
1323   int status;
1324
1325   status = has_privilege(sock, PRIV_HIGH);
1326   if (status <= 0)
1327     return status;
1328
1329   RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1330
1331   pthread_mutex_lock(&cache_lock);
1332   flush_old_values(-1);
1333   pthread_mutex_unlock(&cache_lock);
1334
1335   return send_response(sock, RESP_OK, "Started flush.\n");
1336 } /* }}} static int handle_request_flushall */
1337
1338 static int handle_request_pending(listen_socket_t *sock, /* {{{ */
1339                                   char *buffer, size_t buffer_size)
1340 {
1341   int status;
1342   char *file, file_tmp[PATH_MAX];
1343   cache_item_t *ci;
1344
1345   status = buffer_get_field(&buffer, &buffer_size, &file);
1346   if (status != 0)
1347     return send_response(sock, RESP_ERR,
1348                          "Usage: PENDING <filename>\n");
1349
1350   status = has_privilege(sock, PRIV_HIGH);
1351   if (status <= 0)
1352     return status;
1353
1354   get_abs_path(&file, file_tmp);
1355
1356   pthread_mutex_lock(&cache_lock);
1357   ci = g_tree_lookup(cache_tree, file);
1358   if (ci == NULL)
1359   {
1360     pthread_mutex_unlock(&cache_lock);
1361     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1362   }
1363
1364   for (int i=0; i < ci->values_num; i++)
1365     add_response_info(sock, "%s\n", ci->values[i]);
1366
1367   pthread_mutex_unlock(&cache_lock);
1368   return send_response(sock, RESP_OK, "updates pending\n");
1369 } /* }}} static int handle_request_pending */
1370
1371 static int handle_request_forget(listen_socket_t *sock, /* {{{ */
1372                                  char *buffer, size_t buffer_size)
1373 {
1374   int status;
1375   gboolean found;
1376   char *file, file_tmp[PATH_MAX];
1377
1378   status = buffer_get_field(&buffer, &buffer_size, &file);
1379   if (status != 0)
1380     return send_response(sock, RESP_ERR,
1381                          "Usage: FORGET <filename>\n");
1382
1383   status = has_privilege(sock, PRIV_HIGH);
1384   if (status <= 0)
1385     return status;
1386
1387   get_abs_path(&file, file_tmp);
1388   if (!check_file_access(file, sock)) return 0;
1389
1390   pthread_mutex_lock(&cache_lock);
1391   found = g_tree_remove(cache_tree, file);
1392   pthread_mutex_unlock(&cache_lock);
1393
1394   if (found == TRUE)
1395   {
1396     if (sock != NULL)
1397       journal_write("forget", file);
1398
1399     return send_response(sock, RESP_OK, "Gone!\n");
1400   }
1401   else
1402     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1403
1404   /* NOTREACHED */
1405   assert(1==0);
1406 } /* }}} static int handle_request_forget */
1407
1408 static int handle_request_queue (listen_socket_t *sock) /* {{{ */
1409 {
1410   cache_item_t *ci;
1411
1412   pthread_mutex_lock(&cache_lock);
1413
1414   ci = cache_queue_head;
1415   while (ci != NULL)
1416   {
1417     add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1418     ci = ci->next;
1419   }
1420
1421   pthread_mutex_unlock(&cache_lock);
1422
1423   return send_response(sock, RESP_OK, "in queue.\n");
1424 } /* }}} int handle_request_queue */
1425
1426 static int handle_request_update (listen_socket_t *sock, /* {{{ */
1427                                   time_t now,
1428                                   char *buffer, size_t buffer_size)
1429 {
1430   char *file, file_tmp[PATH_MAX];
1431   int values_num = 0;
1432   int status;
1433   char orig_buf[CMD_MAX];
1434
1435   cache_item_t *ci;
1436
1437   status = has_privilege(sock, PRIV_HIGH);
1438   if (status <= 0)
1439     return status;
1440
1441   /* save it for the journal later */
1442   strncpy(orig_buf, buffer, sizeof(orig_buf)-1);
1443
1444   status = buffer_get_field (&buffer, &buffer_size, &file);
1445   if (status != 0)
1446     return send_response(sock, RESP_ERR,
1447                          "Usage: UPDATE <filename> <values> [<values> ...]\n");
1448
1449   pthread_mutex_lock(&stats_lock);
1450   stats_updates_received++;
1451   pthread_mutex_unlock(&stats_lock);
1452
1453   get_abs_path(&file, file_tmp);
1454   if (!check_file_access(file, sock)) return 0;
1455
1456   pthread_mutex_lock (&cache_lock);
1457   ci = g_tree_lookup (cache_tree, file);
1458
1459   if (ci == NULL) /* {{{ */
1460   {
1461     struct stat statbuf;
1462
1463     /* don't hold the lock while we setup; stat(2) might block */
1464     pthread_mutex_unlock(&cache_lock);
1465
1466     memset (&statbuf, 0, sizeof (statbuf));
1467     status = stat (file, &statbuf);
1468     if (status != 0)
1469     {
1470       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1471
1472       status = errno;
1473       if (status == ENOENT)
1474         return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1475       else
1476         return send_response(sock, RESP_ERR,
1477                              "stat failed with error %i.\n", status);
1478     }
1479     if (!S_ISREG (statbuf.st_mode))
1480       return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1481
1482     if (access(file, R_OK|W_OK) != 0)
1483       return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1484                            file, rrd_strerror(errno));
1485
1486     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1487     if (ci == NULL)
1488     {
1489       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1490
1491       return send_response(sock, RESP_ERR, "malloc failed.\n");
1492     }
1493     memset (ci, 0, sizeof (cache_item_t));
1494
1495     ci->file = strdup (file);
1496     if (ci->file == NULL)
1497     {
1498       free (ci);
1499       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1500
1501       return send_response(sock, RESP_ERR, "strdup failed.\n");
1502     }
1503
1504     wipe_ci_values(ci, now);
1505     ci->flags = CI_FLAGS_IN_TREE;
1506     pthread_cond_init(&ci->flushed, NULL);
1507
1508     pthread_mutex_lock(&cache_lock);
1509     g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1510   } /* }}} */
1511   assert (ci != NULL);
1512
1513   /* don't re-write updates in replay mode */
1514   if (sock != NULL)
1515     journal_write("update", orig_buf);
1516
1517   while (buffer_size > 0)
1518   {
1519     char **temp;
1520     char *value;
1521     time_t stamp;
1522     char *eostamp;
1523
1524     status = buffer_get_field (&buffer, &buffer_size, &value);
1525     if (status != 0)
1526     {
1527       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1528       break;
1529     }
1530
1531     /* make sure update time is always moving forward */
1532     stamp = strtol(value, &eostamp, 10);
1533     if (eostamp == value || eostamp == NULL || *eostamp != ':')
1534     {
1535       pthread_mutex_unlock(&cache_lock);
1536       return send_response(sock, RESP_ERR,
1537                            "Cannot find timestamp in '%s'!\n", value);
1538     }
1539     else if (stamp <= ci->last_update_stamp)
1540     {
1541       pthread_mutex_unlock(&cache_lock);
1542       return send_response(sock, RESP_ERR,
1543                            "illegal attempt to update using time %ld when last"
1544                            " update time is %ld (minimum one second step)\n",
1545                            stamp, ci->last_update_stamp);
1546     }
1547     else
1548       ci->last_update_stamp = stamp;
1549
1550     temp = (char **) rrd_realloc (ci->values,
1551         sizeof (char *) * (ci->values_num + 1));
1552     if (temp == NULL)
1553     {
1554       RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
1555       continue;
1556     }
1557     ci->values = temp;
1558
1559     ci->values[ci->values_num] = strdup (value);
1560     if (ci->values[ci->values_num] == NULL)
1561     {
1562       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1563       continue;
1564     }
1565     ci->values_num++;
1566
1567     values_num++;
1568   }
1569
1570   if (((now - ci->last_flush_time) >= config_write_interval)
1571       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1572       && (ci->values_num > 0))
1573   {
1574     enqueue_cache_item (ci, TAIL);
1575   }
1576
1577   pthread_mutex_unlock (&cache_lock);
1578
1579   if (values_num < 1)
1580     return send_response(sock, RESP_ERR, "No values updated.\n");
1581   else
1582     return send_response(sock, RESP_OK,
1583                          "errors, enqueued %i value(s).\n", values_num);
1584
1585   /* NOTREACHED */
1586   assert(1==0);
1587
1588 } /* }}} int handle_request_update */
1589
1590 /* we came across a "WROTE" entry during journal replay.
1591  * throw away any values that we have accumulated for this file
1592  */
1593 static int handle_request_wrote (const char *buffer, time_t now) /* {{{ */
1594 {
1595   int i;
1596   cache_item_t *ci;
1597   const char *file = buffer;
1598
1599   pthread_mutex_lock(&cache_lock);
1600
1601   ci = g_tree_lookup(cache_tree, file);
1602   if (ci == NULL)
1603   {
1604     pthread_mutex_unlock(&cache_lock);
1605     return (0);
1606   }
1607
1608   if (ci->values)
1609   {
1610     for (i=0; i < ci->values_num; i++)
1611       free(ci->values[i]);
1612
1613     free(ci->values);
1614   }
1615
1616   wipe_ci_values(ci, now);
1617   remove_from_queue(ci);
1618
1619   pthread_mutex_unlock(&cache_lock);
1620   return (0);
1621 } /* }}} int handle_request_wrote */
1622
1623 /* start "BATCH" processing */
1624 static int batch_start (listen_socket_t *sock) /* {{{ */
1625 {
1626   int status;
1627   if (sock->batch_start)
1628     return send_response(sock, RESP_ERR, "Already in BATCH\n");
1629
1630   status = send_response(sock, RESP_OK,
1631                          "Go ahead.  End with dot '.' on its own line.\n");
1632   sock->batch_start = time(NULL);
1633   sock->batch_cmd = 0;
1634
1635   return status;
1636 } /* }}} static int batch_start */
1637
1638 /* finish "BATCH" processing and return results to the client */
1639 static int batch_done (listen_socket_t *sock) /* {{{ */
1640 {
1641   assert(sock->batch_start);
1642   sock->batch_start = 0;
1643   sock->batch_cmd  = 0;
1644   return send_response(sock, RESP_OK, "errors\n");
1645 } /* }}} static int batch_done */
1646
1647 /* if sock==NULL, we are in journal replay mode */
1648 static int handle_request (listen_socket_t *sock, /* {{{ */
1649                            time_t now,
1650                            char *buffer, size_t buffer_size)
1651 {
1652   char *buffer_ptr;
1653   char *command;
1654   int status;
1655
1656   assert (buffer[buffer_size - 1] == '\0');
1657
1658   buffer_ptr = buffer;
1659   command = NULL;
1660   status = buffer_get_field (&buffer_ptr, &buffer_size, &command);
1661   if (status != 0)
1662   {
1663     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1664     return (-1);
1665   }
1666
1667   if (sock != NULL && sock->batch_start)
1668     sock->batch_cmd++;
1669
1670   if (strcasecmp (command, "update") == 0)
1671     return (handle_request_update (sock, now, buffer_ptr, buffer_size));
1672   else if (strcasecmp (command, "wrote") == 0 && sock == NULL)
1673   {
1674     /* this is only valid in replay mode */
1675     return (handle_request_wrote (buffer_ptr, now));
1676   }
1677   else if (strcasecmp (command, "flush") == 0)
1678     return (handle_request_flush (sock, buffer_ptr, buffer_size));
1679   else if (strcasecmp (command, "flushall") == 0)
1680     return (handle_request_flushall(sock));
1681   else if (strcasecmp (command, "pending") == 0)
1682     return (handle_request_pending(sock, buffer_ptr, buffer_size));
1683   else if (strcasecmp (command, "forget") == 0)
1684     return (handle_request_forget(sock, buffer_ptr, buffer_size));
1685   else if (strcasecmp (command, "queue") == 0)
1686     return (handle_request_queue(sock));
1687   else if (strcasecmp (command, "stats") == 0)
1688     return (handle_request_stats (sock));
1689   else if (strcasecmp (command, "help") == 0)
1690     return (handle_request_help (sock, buffer_ptr, buffer_size));
1691   else if (strcasecmp (command, "batch") == 0 && sock != NULL)
1692     return batch_start(sock);
1693   else if (strcasecmp (command, ".") == 0 && sock != NULL && sock->batch_start)
1694     return batch_done(sock);
1695   else if (strcasecmp (command, "quit") == 0)
1696     return -1;
1697   else
1698     return send_response(sock, RESP_ERR, "Unknown command: %s\n", command);
1699
1700   /* NOTREACHED */
1701   assert(1==0);
1702 } /* }}} int handle_request */
1703
1704 /* MUST NOT hold journal_lock before calling this */
1705 static void journal_rotate(void) /* {{{ */
1706 {
1707   FILE *old_fh = NULL;
1708   int new_fd;
1709
1710   if (journal_cur == NULL || journal_old == NULL)
1711     return;
1712
1713   pthread_mutex_lock(&journal_lock);
1714
1715   /* we rotate this way (rename before close) so that the we can release
1716    * the journal lock as fast as possible.  Journal writes to the new
1717    * journal can proceed immediately after the new file is opened.  The
1718    * fclose can then block without affecting new updates.
1719    */
1720   if (journal_fh != NULL)
1721   {
1722     old_fh = journal_fh;
1723     journal_fh = NULL;
1724     rename(journal_cur, journal_old);
1725     ++stats_journal_rotate;
1726   }
1727
1728   new_fd = open(journal_cur, O_WRONLY|O_CREAT|O_APPEND,
1729                 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1730   if (new_fd >= 0)
1731   {
1732     journal_fh = fdopen(new_fd, "a");
1733     if (journal_fh == NULL)
1734       close(new_fd);
1735   }
1736
1737   pthread_mutex_unlock(&journal_lock);
1738
1739   if (old_fh != NULL)
1740     fclose(old_fh);
1741
1742   if (journal_fh == NULL)
1743   {
1744     RRDD_LOG(LOG_CRIT,
1745              "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1746              journal_cur, rrd_strerror(errno));
1747
1748     RRDD_LOG(LOG_ERR,
1749              "JOURNALING DISABLED: All values will be flushed at shutdown");
1750     config_flush_at_shutdown = 1;
1751   }
1752
1753 } /* }}} static void journal_rotate */
1754
1755 static void journal_done(void) /* {{{ */
1756 {
1757   if (journal_cur == NULL)
1758     return;
1759
1760   pthread_mutex_lock(&journal_lock);
1761   if (journal_fh != NULL)
1762   {
1763     fclose(journal_fh);
1764     journal_fh = NULL;
1765   }
1766
1767   if (config_flush_at_shutdown)
1768   {
1769     RRDD_LOG(LOG_INFO, "removing journals");
1770     unlink(journal_old);
1771     unlink(journal_cur);
1772   }
1773   else
1774   {
1775     RRDD_LOG(LOG_INFO, "expedited shutdown; "
1776              "journals will be used at next startup");
1777   }
1778
1779   pthread_mutex_unlock(&journal_lock);
1780
1781 } /* }}} static void journal_done */
1782
1783 static int journal_write(char *cmd, char *args) /* {{{ */
1784 {
1785   int chars;
1786
1787   if (journal_fh == NULL)
1788     return 0;
1789
1790   pthread_mutex_lock(&journal_lock);
1791   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1792   pthread_mutex_unlock(&journal_lock);
1793
1794   if (chars > 0)
1795   {
1796     pthread_mutex_lock(&stats_lock);
1797     stats_journal_bytes += chars;
1798     pthread_mutex_unlock(&stats_lock);
1799   }
1800
1801   return chars;
1802 } /* }}} static int journal_write */
1803
1804 static int journal_replay (const char *file) /* {{{ */
1805 {
1806   FILE *fh;
1807   int entry_cnt = 0;
1808   int fail_cnt = 0;
1809   uint64_t line = 0;
1810   char entry[CMD_MAX];
1811   time_t now;
1812
1813   if (file == NULL) return 0;
1814
1815   {
1816     char *reason = "unknown error";
1817     int status = 0;
1818     struct stat statbuf;
1819
1820     memset(&statbuf, 0, sizeof(statbuf));
1821     if (stat(file, &statbuf) != 0)
1822     {
1823       if (errno == ENOENT)
1824         return 0;
1825
1826       reason = "stat error";
1827       status = errno;
1828     }
1829     else if (!S_ISREG(statbuf.st_mode))
1830     {
1831       reason = "not a regular file";
1832       status = EPERM;
1833     }
1834     if (statbuf.st_uid != daemon_uid)
1835     {
1836       reason = "not owned by daemon user";
1837       status = EACCES;
1838     }
1839     if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
1840     {
1841       reason = "must not be user/group writable";
1842       status = EACCES;
1843     }
1844
1845     if (status != 0)
1846     {
1847       RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
1848                file, rrd_strerror(status), reason);
1849       return 0;
1850     }
1851   }
1852
1853   fh = fopen(file, "r");
1854   if (fh == NULL)
1855   {
1856     if (errno != ENOENT)
1857       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1858                file, rrd_strerror(errno));
1859     return 0;
1860   }
1861   else
1862     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1863
1864   now = time(NULL);
1865
1866   while(!feof(fh))
1867   {
1868     size_t entry_len;
1869
1870     ++line;
1871     if (fgets(entry, sizeof(entry), fh) == NULL)
1872       break;
1873     entry_len = strlen(entry);
1874
1875     /* check \n termination in case journal writing crashed mid-line */
1876     if (entry_len == 0)
1877       continue;
1878     else if (entry[entry_len - 1] != '\n')
1879     {
1880       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1881       ++fail_cnt;
1882       continue;
1883     }
1884
1885     entry[entry_len - 1] = '\0';
1886
1887     if (handle_request(NULL, now, entry, entry_len) == 0)
1888       ++entry_cnt;
1889     else
1890       ++fail_cnt;
1891   }
1892
1893   fclose(fh);
1894
1895   RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1896            entry_cnt, fail_cnt);
1897
1898   return entry_cnt > 0 ? 1 : 0;
1899 } /* }}} static int journal_replay */
1900
1901 static void journal_init(void) /* {{{ */
1902 {
1903   int had_journal = 0;
1904
1905   if (journal_cur == NULL) return;
1906
1907   pthread_mutex_lock(&journal_lock);
1908
1909   RRDD_LOG(LOG_INFO, "checking for journal files");
1910
1911   had_journal += journal_replay(journal_old);
1912   had_journal += journal_replay(journal_cur);
1913
1914   /* it must have been a crash.  start a flush */
1915   if (had_journal && config_flush_at_shutdown)
1916     flush_old_values(-1);
1917
1918   pthread_mutex_unlock(&journal_lock);
1919   journal_rotate();
1920
1921   RRDD_LOG(LOG_INFO, "journal processing complete");
1922
1923 } /* }}} static void journal_init */
1924
1925 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
1926 {
1927   assert(sock != NULL);
1928
1929   free(sock->rbuf);  sock->rbuf = NULL;
1930   free(sock->wbuf);  sock->wbuf = NULL;
1931   free(sock);
1932 } /* }}} void free_listen_socket */
1933
1934 static void close_connection(listen_socket_t *sock) /* {{{ */
1935 {
1936   if (sock->fd >= 0)
1937   {
1938     close(sock->fd);
1939     sock->fd = -1;
1940   }
1941
1942   free_listen_socket(sock);
1943
1944 } /* }}} void close_connection */
1945
1946 static void *connection_thread_main (void *args) /* {{{ */
1947 {
1948   listen_socket_t *sock;
1949   int i;
1950   int fd;
1951
1952   sock = (listen_socket_t *) args;
1953   fd = sock->fd;
1954
1955   /* init read buffers */
1956   sock->next_read = sock->next_cmd = 0;
1957   sock->rbuf = malloc(RBUF_SIZE);
1958   if (sock->rbuf == NULL)
1959   {
1960     RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
1961     close_connection(sock);
1962     return NULL;
1963   }
1964
1965   pthread_mutex_lock (&connection_threads_lock);
1966   {
1967     pthread_t *temp;
1968
1969     temp = (pthread_t *) rrd_realloc (connection_threads,
1970         sizeof (pthread_t) * (connection_threads_num + 1));
1971     if (temp == NULL)
1972     {
1973       RRDD_LOG (LOG_ERR, "connection_thread_main: realloc(++) failed.");
1974     }
1975     else
1976     {
1977       connection_threads = temp;
1978       connection_threads[connection_threads_num] = pthread_self ();
1979       connection_threads_num++;
1980     }
1981   }
1982   pthread_mutex_unlock (&connection_threads_lock);
1983
1984   while (do_shutdown == 0)
1985   {
1986     char *cmd;
1987     ssize_t cmd_len;
1988     ssize_t rbytes;
1989     time_t now;
1990
1991     struct pollfd pollfd;
1992     int status;
1993
1994     pollfd.fd = fd;
1995     pollfd.events = POLLIN | POLLPRI;
1996     pollfd.revents = 0;
1997
1998     status = poll (&pollfd, 1, /* timeout = */ 500);
1999     if (do_shutdown)
2000       break;
2001     else if (status == 0) /* timeout */
2002       continue;
2003     else if (status < 0) /* error */
2004     {
2005       status = errno;
2006       if (status != EINTR)
2007         RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2008       continue;
2009     }
2010
2011     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2012       break;
2013     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2014     {
2015       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2016           "poll(2) returned something unexpected: %#04hx",
2017           pollfd.revents);
2018       break;
2019     }
2020
2021     rbytes = read(fd, sock->rbuf + sock->next_read,
2022                   RBUF_SIZE - sock->next_read);
2023     if (rbytes < 0)
2024     {
2025       RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2026       break;
2027     }
2028     else if (rbytes == 0)
2029       break; /* eof */
2030
2031     sock->next_read += rbytes;
2032
2033     if (sock->batch_start)
2034       now = sock->batch_start;
2035     else
2036       now = time(NULL);
2037
2038     while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2039     {
2040       status = handle_request (sock, now, cmd, cmd_len+1);
2041       if (status != 0)
2042         goto out_close;
2043     }
2044   }
2045
2046 out_close:
2047   close_connection(sock);
2048
2049   /* Remove this thread from the connection threads list */
2050   pthread_mutex_lock (&connection_threads_lock);
2051   {
2052     pthread_t self;
2053     pthread_t *temp;
2054
2055     /* Find out own index in the array */
2056     self = pthread_self ();
2057     for (i = 0; i < connection_threads_num; i++)
2058       if (pthread_equal (connection_threads[i], self) != 0)
2059         break;
2060     assert (i < connection_threads_num);
2061
2062     /* Move the trailing threads forward. */
2063     if (i < (connection_threads_num - 1))
2064     {
2065       memmove (connection_threads + i,
2066                connection_threads + i + 1,
2067                sizeof (pthread_t) * (connection_threads_num - i - 1));
2068     }
2069
2070     connection_threads_num--;
2071
2072     temp = rrd_realloc(connection_threads,
2073                    sizeof(*connection_threads) * connection_threads_num);
2074     if (connection_threads_num > 0 && temp == NULL)
2075       RRDD_LOG(LOG_ERR, "connection_thread_main: realloc(--) failed.");
2076     else
2077       connection_threads = temp;
2078   }
2079   pthread_mutex_unlock (&connection_threads_lock);
2080
2081   return (NULL);
2082 } /* }}} void *connection_thread_main */
2083
2084 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2085 {
2086   int fd;
2087   struct sockaddr_un sa;
2088   listen_socket_t *temp;
2089   int status;
2090   const char *path;
2091
2092   path = sock->addr;
2093   if (strncmp(path, "unix:", strlen("unix:")) == 0)
2094     path += strlen("unix:");
2095
2096   temp = (listen_socket_t *) rrd_realloc (listen_fds,
2097       sizeof (listen_fds[0]) * (listen_fds_num + 1));
2098   if (temp == NULL)
2099   {
2100     fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2101     return (-1);
2102   }
2103   listen_fds = temp;
2104   memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2105
2106   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2107   if (fd < 0)
2108   {
2109     fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2110              rrd_strerror(errno));
2111     return (-1);
2112   }
2113
2114   memset (&sa, 0, sizeof (sa));
2115   sa.sun_family = AF_UNIX;
2116   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2117
2118   /* if we've gotten this far, we own the pid file.  any daemon started
2119    * with the same args must not be alive.  therefore, ensure that we can
2120    * create the socket...
2121    */
2122   unlink(path);
2123
2124   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2125   if (status != 0)
2126   {
2127     fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2128              path, rrd_strerror(errno));
2129     close (fd);
2130     return (-1);
2131   }
2132
2133   status = listen (fd, /* backlog = */ 10);
2134   if (status != 0)
2135   {
2136     fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2137              path, rrd_strerror(errno));
2138     close (fd);
2139     unlink (path);
2140     return (-1);
2141   }
2142
2143   listen_fds[listen_fds_num].fd = fd;
2144   listen_fds[listen_fds_num].family = PF_UNIX;
2145   strncpy(listen_fds[listen_fds_num].addr, path,
2146           sizeof (listen_fds[listen_fds_num].addr) - 1);
2147   listen_fds_num++;
2148
2149   return (0);
2150 } /* }}} int open_listen_socket_unix */
2151
2152 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2153 {
2154   struct addrinfo ai_hints;
2155   struct addrinfo *ai_res;
2156   struct addrinfo *ai_ptr;
2157   char addr_copy[NI_MAXHOST];
2158   char *addr;
2159   char *port;
2160   int status;
2161
2162   strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
2163   addr_copy[sizeof (addr_copy) - 1] = 0;
2164   addr = addr_copy;
2165
2166   memset (&ai_hints, 0, sizeof (ai_hints));
2167   ai_hints.ai_flags = 0;
2168 #ifdef AI_ADDRCONFIG
2169   ai_hints.ai_flags |= AI_ADDRCONFIG;
2170 #endif
2171   ai_hints.ai_family = AF_UNSPEC;
2172   ai_hints.ai_socktype = SOCK_STREAM;
2173
2174   port = NULL;
2175   if (*addr == '[') /* IPv6+port format */
2176   {
2177     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2178     addr++;
2179
2180     port = strchr (addr, ']');
2181     if (port == NULL)
2182     {
2183       fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
2184       return (-1);
2185     }
2186     *port = 0;
2187     port++;
2188
2189     if (*port == ':')
2190       port++;
2191     else if (*port == 0)
2192       port = NULL;
2193     else
2194     {
2195       fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
2196       return (-1);
2197     }
2198   } /* if (*addr = ']') */
2199   else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */
2200   {
2201     port = rindex(addr, ':');
2202     if (port != NULL)
2203     {
2204       *port = 0;
2205       port++;
2206     }
2207   }
2208   ai_res = NULL;
2209   status = getaddrinfo (addr,
2210                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2211                         &ai_hints, &ai_res);
2212   if (status != 0)
2213   {
2214     fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
2215              addr, gai_strerror (status));
2216     return (-1);
2217   }
2218
2219   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2220   {
2221     int fd;
2222     listen_socket_t *temp;
2223     int one = 1;
2224
2225     temp = (listen_socket_t *) rrd_realloc (listen_fds,
2226         sizeof (listen_fds[0]) * (listen_fds_num + 1));
2227     if (temp == NULL)
2228     {
2229       fprintf (stderr,
2230                "rrdcached: open_listen_socket_network: realloc failed.\n");
2231       continue;
2232     }
2233     listen_fds = temp;
2234     memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2235
2236     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2237     if (fd < 0)
2238     {
2239       fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
2240                rrd_strerror(errno));
2241       continue;
2242     }
2243
2244     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2245
2246     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2247     if (status != 0)
2248     {
2249       fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2250                sock->addr, rrd_strerror(errno));
2251       close (fd);
2252       continue;
2253     }
2254
2255     status = listen (fd, /* backlog = */ 10);
2256     if (status != 0)
2257     {
2258       fprintf (stderr, "rrdcached: listen(%s) failed: %s\n.",
2259                sock->addr, rrd_strerror(errno));
2260       close (fd);
2261       freeaddrinfo(ai_res);
2262       return (-1);
2263     }
2264
2265     listen_fds[listen_fds_num].fd = fd;
2266     listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2267     listen_fds_num++;
2268   } /* for (ai_ptr) */
2269
2270   freeaddrinfo(ai_res);
2271   return (0);
2272 } /* }}} static int open_listen_socket_network */
2273
2274 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2275 {
2276   assert(sock != NULL);
2277   assert(sock->addr != NULL);
2278
2279   if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2280       || sock->addr[0] == '/')
2281     return (open_listen_socket_unix(sock));
2282   else
2283     return (open_listen_socket_network(sock));
2284 } /* }}} int open_listen_socket */
2285
2286 static int close_listen_sockets (void) /* {{{ */
2287 {
2288   size_t i;
2289
2290   for (i = 0; i < listen_fds_num; i++)
2291   {
2292     close (listen_fds[i].fd);
2293
2294     if (listen_fds[i].family == PF_UNIX)
2295       unlink(listen_fds[i].addr);
2296   }
2297
2298   free (listen_fds);
2299   listen_fds = NULL;
2300   listen_fds_num = 0;
2301
2302   return (0);
2303 } /* }}} int close_listen_sockets */
2304
2305 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
2306 {
2307   struct pollfd *pollfds;
2308   int pollfds_num;
2309   int status;
2310   int i;
2311
2312   if (listen_fds_num < 1)
2313   {
2314     RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2315     return (NULL);
2316   }
2317
2318   pollfds_num = listen_fds_num;
2319   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2320   if (pollfds == NULL)
2321   {
2322     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2323     return (NULL);
2324   }
2325   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2326
2327   RRDD_LOG(LOG_INFO, "listening for connections");
2328
2329   while (do_shutdown == 0)
2330   {
2331     for (i = 0; i < pollfds_num; i++)
2332     {
2333       pollfds[i].fd = listen_fds[i].fd;
2334       pollfds[i].events = POLLIN | POLLPRI;
2335       pollfds[i].revents = 0;
2336     }
2337
2338     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2339     if (do_shutdown)
2340       break;
2341     else if (status == 0) /* timeout */
2342       continue;
2343     else if (status < 0) /* error */
2344     {
2345       status = errno;
2346       if (status != EINTR)
2347       {
2348         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2349       }
2350       continue;
2351     }
2352
2353     for (i = 0; i < pollfds_num; i++)
2354     {
2355       listen_socket_t *client_sock;
2356       struct sockaddr_storage client_sa;
2357       socklen_t client_sa_size;
2358       pthread_t tid;
2359       pthread_attr_t attr;
2360
2361       if (pollfds[i].revents == 0)
2362         continue;
2363
2364       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2365       {
2366         RRDD_LOG (LOG_ERR, "listen_thread_main: "
2367             "poll(2) returned something unexpected for listen FD #%i.",
2368             pollfds[i].fd);
2369         continue;
2370       }
2371
2372       client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2373       if (client_sock == NULL)
2374       {
2375         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2376         continue;
2377       }
2378       memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2379
2380       client_sa_size = sizeof (client_sa);
2381       client_sock->fd = accept (pollfds[i].fd,
2382           (struct sockaddr *) &client_sa, &client_sa_size);
2383       if (client_sock->fd < 0)
2384       {
2385         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2386         free(client_sock);
2387         continue;
2388       }
2389
2390       pthread_attr_init (&attr);
2391       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2392
2393       status = pthread_create (&tid, &attr, connection_thread_main,
2394                                client_sock);
2395       if (status != 0)
2396       {
2397         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2398         close_connection(client_sock);
2399         continue;
2400       }
2401     } /* for (pollfds_num) */
2402   } /* while (do_shutdown == 0) */
2403
2404   RRDD_LOG(LOG_INFO, "starting shutdown");
2405
2406   close_listen_sockets ();
2407
2408   pthread_mutex_lock (&connection_threads_lock);
2409   while (connection_threads_num > 0)
2410   {
2411     pthread_t wait_for;
2412
2413     wait_for = connection_threads[0];
2414
2415     pthread_mutex_unlock (&connection_threads_lock);
2416     pthread_join (wait_for, /* retval = */ NULL);
2417     pthread_mutex_lock (&connection_threads_lock);
2418   }
2419   pthread_mutex_unlock (&connection_threads_lock);
2420
2421   free(pollfds);
2422
2423   return (NULL);
2424 } /* }}} void *listen_thread_main */
2425
2426 static int daemonize (void) /* {{{ */
2427 {
2428   int pid_fd;
2429   char *base_dir;
2430
2431   daemon_uid = geteuid();
2432
2433   pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
2434   if (pid_fd < 0)
2435     pid_fd = check_pidfile();
2436   if (pid_fd < 0)
2437     return pid_fd;
2438
2439   /* open all the listen sockets */
2440   if (config_listen_address_list_len > 0)
2441   {
2442     for (int i = 0; i < config_listen_address_list_len; i++)
2443     {
2444       open_listen_socket (config_listen_address_list[i]);
2445       free_listen_socket (config_listen_address_list[i]);
2446     }
2447
2448     free(config_listen_address_list);
2449   }
2450   else
2451   {
2452     listen_socket_t sock;
2453     memset(&sock, 0, sizeof(sock));
2454     strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr)-1);
2455     open_listen_socket (&sock);
2456   }
2457
2458   if (listen_fds_num < 1)
2459   {
2460     fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
2461     goto error;
2462   }
2463
2464   if (!stay_foreground)
2465   {
2466     pid_t child;
2467
2468     child = fork ();
2469     if (child < 0)
2470     {
2471       fprintf (stderr, "daemonize: fork(2) failed.\n");
2472       goto error;
2473     }
2474     else if (child > 0)
2475       exit(0);
2476
2477     /* Become session leader */
2478     setsid ();
2479
2480     /* Open the first three file descriptors to /dev/null */
2481     close (2);
2482     close (1);
2483     close (0);
2484
2485     open ("/dev/null", O_RDWR);
2486     dup (0);
2487     dup (0);
2488   } /* if (!stay_foreground) */
2489
2490   /* Change into the /tmp directory. */
2491   base_dir = (config_base_dir != NULL)
2492     ? config_base_dir
2493     : "/tmp";
2494
2495   if (chdir (base_dir) != 0)
2496   {
2497     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2498     goto error;
2499   }
2500
2501   install_signal_handlers();
2502
2503   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2504   RRDD_LOG(LOG_INFO, "starting up");
2505
2506   cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
2507                                 (GDestroyNotify) free_cache_item);
2508   if (cache_tree == NULL)
2509   {
2510     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2511     goto error;
2512   }
2513
2514   return write_pidfile (pid_fd);
2515
2516 error:
2517   remove_pidfile();
2518   return -1;
2519 } /* }}} int daemonize */
2520
2521 static int cleanup (void) /* {{{ */
2522 {
2523   do_shutdown++;
2524
2525   pthread_cond_broadcast (&flush_cond);
2526   pthread_join (flush_thread, NULL);
2527
2528   pthread_cond_broadcast (&queue_cond);
2529   for (int i = 0; i < config_queue_threads; i++)
2530     pthread_join (queue_threads[i], NULL);
2531
2532   if (config_flush_at_shutdown)
2533   {
2534     assert(cache_queue_head == NULL);
2535     RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
2536   }
2537
2538   journal_done();
2539   remove_pidfile ();
2540
2541   free(queue_threads);
2542   free(config_base_dir);
2543   free(config_pid_file);
2544   free(journal_cur);
2545   free(journal_old);
2546
2547   pthread_mutex_lock(&cache_lock);
2548   g_tree_destroy(cache_tree);
2549
2550   RRDD_LOG(LOG_INFO, "goodbye");
2551   closelog ();
2552
2553   return (0);
2554 } /* }}} int cleanup */
2555
2556 static int read_options (int argc, char **argv) /* {{{ */
2557 {
2558   int option;
2559   int status = 0;
2560
2561   while ((option = getopt(argc, argv, "gl:L:f:w:z:t:Bb:p:Fj:h?")) != -1)
2562   {
2563     switch (option)
2564     {
2565       case 'g':
2566         stay_foreground=1;
2567         break;
2568
2569       case 'L':
2570       case 'l':
2571       {
2572         listen_socket_t **temp;
2573         listen_socket_t *new;
2574
2575         new = malloc(sizeof(listen_socket_t));
2576         if (new == NULL)
2577         {
2578           fprintf(stderr, "read_options: malloc failed.\n");
2579           return(2);
2580         }
2581         memset(new, 0, sizeof(listen_socket_t));
2582
2583         temp = (listen_socket_t **) rrd_realloc (config_listen_address_list,
2584             sizeof (listen_socket_t *) * (config_listen_address_list_len + 1));
2585         if (temp == NULL)
2586         {
2587           fprintf (stderr, "read_options: realloc failed.\n");
2588           return (2);
2589         }
2590         config_listen_address_list = temp;
2591
2592         strncpy(new->addr, optarg, sizeof(new->addr)-1);
2593         new->privilege = (option == 'l') ? PRIV_HIGH : PRIV_LOW;
2594
2595         temp[config_listen_address_list_len] = new;
2596         config_listen_address_list_len++;
2597       }
2598       break;
2599
2600       case 'f':
2601       {
2602         int temp;
2603
2604         temp = atoi (optarg);
2605         if (temp > 0)
2606           config_flush_interval = temp;
2607         else
2608         {
2609           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2610           status = 3;
2611         }
2612       }
2613       break;
2614
2615       case 'w':
2616       {
2617         int temp;
2618
2619         temp = atoi (optarg);
2620         if (temp > 0)
2621           config_write_interval = temp;
2622         else
2623         {
2624           fprintf (stderr, "Invalid write interval: %s\n", optarg);
2625           status = 2;
2626         }
2627       }
2628       break;
2629
2630       case 'z':
2631       {
2632         int temp;
2633
2634         temp = atoi(optarg);
2635         if (temp > 0)
2636           config_write_jitter = temp;
2637         else
2638         {
2639           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2640           status = 2;
2641         }
2642
2643         break;
2644       }
2645
2646       case 't':
2647       {
2648         int threads;
2649         threads = atoi(optarg);
2650         if (threads >= 1)
2651           config_queue_threads = threads;
2652         else
2653         {
2654           fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
2655           return 1;
2656         }
2657       }
2658       break;
2659
2660       case 'B':
2661         config_write_base_only = 1;
2662         break;
2663
2664       case 'b':
2665       {
2666         size_t len;
2667         char base_realpath[PATH_MAX];
2668
2669         if (config_base_dir != NULL)
2670           free (config_base_dir);
2671         config_base_dir = strdup (optarg);
2672         if (config_base_dir == NULL)
2673         {
2674           fprintf (stderr, "read_options: strdup failed.\n");
2675           return (3);
2676         }
2677
2678         /* make sure that the base directory is not resolved via
2679          * symbolic links.  this makes some performance-enhancing
2680          * assumptions possible (we don't have to resolve paths
2681          * that start with a "/")
2682          */
2683         if (realpath(config_base_dir, base_realpath) == NULL)
2684         {
2685           fprintf (stderr, "Invalid base directory '%s'.\n", config_base_dir);
2686           return 5;
2687         }
2688         else if (strncmp(config_base_dir,
2689                          base_realpath, sizeof(base_realpath)) != 0)
2690         {
2691           fprintf(stderr,
2692                   "Base directory (-b) resolved via file system links!\n"
2693                   "Please consult rrdcached '-b' documentation!\n"
2694                   "Consider specifying the real directory (%s)\n",
2695                   base_realpath);
2696           return 5;
2697         }
2698
2699         len = strlen (config_base_dir);
2700         while ((len > 0) && (config_base_dir[len - 1] == '/'))
2701         {
2702           config_base_dir[len - 1] = 0;
2703           len--;
2704         }
2705
2706         if (len < 1)
2707         {
2708           fprintf (stderr, "Invalid base directory: %s\n", optarg);
2709           return (4);
2710         }
2711
2712         _config_base_dir_len = len;
2713       }
2714       break;
2715
2716       case 'p':
2717       {
2718         if (config_pid_file != NULL)
2719           free (config_pid_file);
2720         config_pid_file = strdup (optarg);
2721         if (config_pid_file == NULL)
2722         {
2723           fprintf (stderr, "read_options: strdup failed.\n");
2724           return (3);
2725         }
2726       }
2727       break;
2728
2729       case 'F':
2730         config_flush_at_shutdown = 1;
2731         break;
2732
2733       case 'j':
2734       {
2735         struct stat statbuf;
2736         const char *dir = optarg;
2737
2738         status = stat(dir, &statbuf);
2739         if (status != 0)
2740         {
2741           fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2742           return 6;
2743         }
2744
2745         if (!S_ISDIR(statbuf.st_mode)
2746             || access(dir, R_OK|W_OK|X_OK) != 0)
2747         {
2748           fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2749                   errno ? rrd_strerror(errno) : "");
2750           return 6;
2751         }
2752
2753         journal_cur = malloc(PATH_MAX + 1);
2754         journal_old = malloc(PATH_MAX + 1);
2755         if (journal_cur == NULL || journal_old == NULL)
2756         {
2757           fprintf(stderr, "malloc failure for journal files\n");
2758           return 6;
2759         }
2760         else 
2761         {
2762           snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2763           snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2764         }
2765       }
2766       break;
2767
2768       case 'h':
2769       case '?':
2770         printf ("RRDCacheD %s  Copyright (C) 2008 Florian octo Forster\n"
2771             "\n"
2772             "Usage: rrdcached [options]\n"
2773             "\n"
2774             "Valid options are:\n"
2775             "  -l <address>  Socket address to listen to.\n"
2776             "  -L <address>  Socket address to listen to ('FLUSH' only).\n"
2777             "  -w <seconds>  Interval in which to write data.\n"
2778             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
2779             "  -t <threads>  Number of write threads.\n"
2780             "  -f <seconds>  Interval in which to flush dead data.\n"
2781             "  -p <file>     Location of the PID-file.\n"
2782             "  -b <dir>      Base directory to change to.\n"
2783             "  -B            Restrict file access to paths within -b <dir>\n"
2784             "  -g            Do not fork and run in the foreground.\n"
2785             "  -j <dir>      Directory in which to create the journal files.\n"
2786             "  -F            Always flush all updates at shutdown\n"
2787             "\n"
2788             "For more information and a detailed description of all options "
2789             "please refer\n"
2790             "to the rrdcached(1) manual page.\n",
2791             VERSION);
2792         status = -1;
2793         break;
2794     } /* switch (option) */
2795   } /* while (getopt) */
2796
2797   /* advise the user when values are not sane */
2798   if (config_flush_interval < 2 * config_write_interval)
2799     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2800             " 2x write interval (-w) !\n");
2801   if (config_write_jitter > config_write_interval)
2802     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2803             " write interval (-w) !\n");
2804
2805   if (config_write_base_only && config_base_dir == NULL)
2806     fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
2807             "  Consult the rrdcached documentation\n");
2808
2809   if (journal_cur == NULL)
2810     config_flush_at_shutdown = 1;
2811
2812   return (status);
2813 } /* }}} int read_options */
2814
2815 int main (int argc, char **argv)
2816 {
2817   int status;
2818
2819   status = read_options (argc, argv);
2820   if (status != 0)
2821   {
2822     if (status < 0)
2823       status = 0;
2824     return (status);
2825   }
2826
2827   status = daemonize ();
2828   if (status != 0)
2829   {
2830     fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
2831     return (1);
2832   }
2833
2834   journal_init();
2835
2836   /* start the queue threads */
2837   queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
2838   if (queue_threads == NULL)
2839   {
2840     RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
2841     cleanup();
2842     return (1);
2843   }
2844   for (int i = 0; i < config_queue_threads; i++)
2845   {
2846     memset (&queue_threads[i], 0, sizeof (*queue_threads));
2847     status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
2848     if (status != 0)
2849     {
2850       RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2851       cleanup();
2852       return (1);
2853     }
2854   }
2855
2856   /* start the flush thread */
2857   memset(&flush_thread, 0, sizeof(flush_thread));
2858   status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
2859   if (status != 0)
2860   {
2861     RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
2862     cleanup();
2863     return (1);
2864   }
2865
2866   listen_thread_main (NULL);
2867   cleanup ();
2868
2869   return (0);
2870 } /* int main */
2871
2872 /*
2873  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2874  */