The bookkeeping of all threads is not necessary, since we cannot pthread_join() them...
[rrdtool.git] / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008 Florian octo Forster
4  * Copyright (C) 2008 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
23
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
32
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
36
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
41
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
47
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
51
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
55
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
61
62 /*
63  * Now for some includes..
64  */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
73
74 #include "rrd.h"
75 #include "rrd_client.h"
76
77 #include <stdlib.h>
78
79 #ifndef WIN32
80 #include <stdint.h>
81 #include <unistd.h>
82 #include <strings.h>
83 #include <inttypes.h>
84 #       include <sys/socket.h>
85
86 #else
87
88 #endif
89 #include <stdio.h>
90 #include <string.h>
91
92 #include <sys/types.h>
93 #include <sys/stat.h>
94 #include <fcntl.h>
95 #include <signal.h>
96 #include <sys/un.h>
97 #include <netdb.h>
98 #include <poll.h>
99 #include <syslog.h>
100 #include <pthread.h>
101 #include <errno.h>
102 #include <assert.h>
103 #include <sys/time.h>
104 #include <time.h>
105
106 #include <glib-2.0/glib.h>
107 /* }}} */
108
109 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
110
111 #ifndef __GNUC__
112 # define __attribute__(x) /**/
113 #endif
114
115 /*
116  * Types
117  */
118 typedef enum
119 {
120   PRIV_LOW,
121   PRIV_HIGH
122 } socket_privilege;
123
124 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
125
126 struct listen_socket_s
127 {
128   int fd;
129   char addr[PATH_MAX + 1];
130   int family;
131   socket_privilege privilege;
132
133   /* state for BATCH processing */
134   time_t batch_start;
135   int batch_cmd;
136
137   /* buffered IO */
138   char *rbuf;
139   off_t next_cmd;
140   off_t next_read;
141
142   char *wbuf;
143   ssize_t wbuf_len;
144 };
145 typedef struct listen_socket_s listen_socket_t;
146
147 struct cache_item_s;
148 typedef struct cache_item_s cache_item_t;
149 struct cache_item_s
150 {
151   char *file;
152   char **values;
153   int values_num;
154   time_t last_flush_time;
155   time_t last_update_stamp;
156 #define CI_FLAGS_IN_TREE  (1<<0)
157 #define CI_FLAGS_IN_QUEUE (1<<1)
158   int flags;
159   pthread_cond_t  flushed;
160   cache_item_t *prev;
161   cache_item_t *next;
162 };
163
164 struct callback_flush_data_s
165 {
166   time_t now;
167   time_t abs_timeout;
168   char **keys;
169   size_t keys_num;
170 };
171 typedef struct callback_flush_data_s callback_flush_data_t;
172
173 enum queue_side_e
174 {
175   HEAD,
176   TAIL
177 };
178 typedef enum queue_side_e queue_side_t;
179
180 /* max length of socket command or response */
181 #define CMD_MAX 4096
182 #define RBUF_SIZE (CMD_MAX*2)
183
184 /*
185  * Variables
186  */
187 static int stay_foreground = 0;
188 static uid_t daemon_uid;
189
190 static listen_socket_t *listen_fds = NULL;
191 static size_t listen_fds_num = 0;
192
193 static int do_shutdown = 0;
194
195 static pthread_t *queue_threads;
196 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
197 static int config_queue_threads = 4;
198
199 static pthread_t flush_thread;
200 static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;
201
202 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
203 static pthread_cond_t  connection_threads_done = PTHREAD_COND_INITIALIZER;
204 static int connection_threads_num = 0;
205
206 /* Cache stuff */
207 static GTree          *cache_tree = NULL;
208 static cache_item_t   *cache_queue_head = NULL;
209 static cache_item_t   *cache_queue_tail = NULL;
210 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
211
212 static int config_write_interval = 300;
213 static int config_write_jitter   = 0;
214 static int config_flush_interval = 3600;
215 static int config_flush_at_shutdown = 0;
216 static char *config_pid_file = NULL;
217 static char *config_base_dir = NULL;
218 static size_t _config_base_dir_len = 0;
219 static int config_write_base_only = 0;
220
221 static listen_socket_t **config_listen_address_list = NULL;
222 static int config_listen_address_list_len = 0;
223
224 static uint64_t stats_queue_length = 0;
225 static uint64_t stats_updates_received = 0;
226 static uint64_t stats_flush_received = 0;
227 static uint64_t stats_updates_written = 0;
228 static uint64_t stats_data_sets_written = 0;
229 static uint64_t stats_journal_bytes = 0;
230 static uint64_t stats_journal_rotate = 0;
231 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
232
233 /* Journaled updates */
234 static char *journal_cur = NULL;
235 static char *journal_old = NULL;
236 static FILE *journal_fh = NULL;
237 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
238 static int journal_write(char *cmd, char *args);
239 static void journal_done(void);
240 static void journal_rotate(void);
241
242 /* 
243  * Functions
244  */
245 static void sig_common (const char *sig) /* {{{ */
246 {
247   RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
248   do_shutdown++;
249   pthread_cond_broadcast(&flush_cond);
250   pthread_cond_broadcast(&queue_cond);
251 } /* }}} void sig_common */
252
253 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
254 {
255   sig_common("INT");
256 } /* }}} void sig_int_handler */
257
258 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
259 {
260   sig_common("TERM");
261 } /* }}} void sig_term_handler */
262
263 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
264 {
265   config_flush_at_shutdown = 1;
266   sig_common("USR1");
267 } /* }}} void sig_usr1_handler */
268
269 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
270 {
271   config_flush_at_shutdown = 0;
272   sig_common("USR2");
273 } /* }}} void sig_usr2_handler */
274
275 static void install_signal_handlers(void) /* {{{ */
276 {
277   /* These structures are static, because `sigaction' behaves weird if the are
278    * overwritten.. */
279   static struct sigaction sa_int;
280   static struct sigaction sa_term;
281   static struct sigaction sa_pipe;
282   static struct sigaction sa_usr1;
283   static struct sigaction sa_usr2;
284
285   /* Install signal handlers */
286   memset (&sa_int, 0, sizeof (sa_int));
287   sa_int.sa_handler = sig_int_handler;
288   sigaction (SIGINT, &sa_int, NULL);
289
290   memset (&sa_term, 0, sizeof (sa_term));
291   sa_term.sa_handler = sig_term_handler;
292   sigaction (SIGTERM, &sa_term, NULL);
293
294   memset (&sa_pipe, 0, sizeof (sa_pipe));
295   sa_pipe.sa_handler = SIG_IGN;
296   sigaction (SIGPIPE, &sa_pipe, NULL);
297
298   memset (&sa_pipe, 0, sizeof (sa_usr1));
299   sa_usr1.sa_handler = sig_usr1_handler;
300   sigaction (SIGUSR1, &sa_usr1, NULL);
301
302   memset (&sa_usr2, 0, sizeof (sa_usr2));
303   sa_usr2.sa_handler = sig_usr2_handler;
304   sigaction (SIGUSR2, &sa_usr2, NULL);
305
306 } /* }}} void install_signal_handlers */
307
308 static int open_pidfile(char *action, int oflag) /* {{{ */
309 {
310   int fd;
311   char *file;
312
313   file = (config_pid_file != NULL)
314     ? config_pid_file
315     : LOCALSTATEDIR "/run/rrdcached.pid";
316
317   fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
318   if (fd < 0)
319     fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
320             action, file, rrd_strerror(errno));
321
322   return(fd);
323 } /* }}} static int open_pidfile */
324
325 /* check existing pid file to see whether a daemon is running */
326 static int check_pidfile(void)
327 {
328   int pid_fd;
329   pid_t pid;
330   char pid_str[16];
331
332   pid_fd = open_pidfile("open", O_RDWR);
333   if (pid_fd < 0)
334     return pid_fd;
335
336   if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0)
337     return -1;
338
339   pid = atoi(pid_str);
340   if (pid <= 0)
341     return -1;
342
343   /* another running process that we can signal COULD be
344    * a competing rrdcached */
345   if (pid != getpid() && kill(pid, 0) == 0)
346   {
347     fprintf(stderr,
348             "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
349     close(pid_fd);
350     return -1;
351   }
352
353   lseek(pid_fd, 0, SEEK_SET);
354   ftruncate(pid_fd, 0);
355
356   fprintf(stderr,
357           "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
358           "rrdcached: starting normally.\n", pid);
359
360   return pid_fd;
361 } /* }}} static int check_pidfile */
362
363 static int write_pidfile (int fd) /* {{{ */
364 {
365   pid_t pid;
366   FILE *fh;
367
368   pid = getpid ();
369
370   fh = fdopen (fd, "w");
371   if (fh == NULL)
372   {
373     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
374     close(fd);
375     return (-1);
376   }
377
378   fprintf (fh, "%i\n", (int) pid);
379   fclose (fh);
380
381   return (0);
382 } /* }}} int write_pidfile */
383
384 static int remove_pidfile (void) /* {{{ */
385 {
386   char *file;
387   int status;
388
389   file = (config_pid_file != NULL)
390     ? config_pid_file
391     : LOCALSTATEDIR "/run/rrdcached.pid";
392
393   status = unlink (file);
394   if (status == 0)
395     return (0);
396   return (errno);
397 } /* }}} int remove_pidfile */
398
399 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
400 {
401   char *eol;
402
403   eol = memchr(sock->rbuf + sock->next_cmd, '\n',
404                sock->next_read - sock->next_cmd);
405
406   if (eol == NULL)
407   {
408     /* no commands left, move remainder back to front of rbuf */
409     memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
410             sock->next_read - sock->next_cmd);
411     sock->next_read -= sock->next_cmd;
412     sock->next_cmd = 0;
413     *len = 0;
414     return NULL;
415   }
416   else
417   {
418     char *cmd = sock->rbuf + sock->next_cmd;
419     *eol = '\0';
420
421     sock->next_cmd = eol - sock->rbuf + 1;
422
423     if (eol > sock->rbuf && *(eol-1) == '\r')
424       *(--eol) = '\0'; /* handle "\r\n" EOL */
425
426     *len = eol - cmd;
427
428     return cmd;
429   }
430
431   /* NOTREACHED */
432   assert(1==0);
433 }
434
435 /* add the characters directly to the write buffer */
436 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
437 {
438   char *new_buf;
439
440   assert(sock != NULL);
441
442   new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
443   if (new_buf == NULL)
444   {
445     RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
446     return -1;
447   }
448
449   strncpy(new_buf + sock->wbuf_len, str, len + 1);
450
451   sock->wbuf = new_buf;
452   sock->wbuf_len += len;
453
454   return 0;
455 } /* }}} static int add_to_wbuf */
456
457 /* add the text to the "extra" info that's sent after the status line */
458 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
459 {
460   va_list argp;
461   char buffer[CMD_MAX];
462   int len;
463
464   if (sock == NULL) return 0; /* journal replay mode */
465   if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
466
467   va_start(argp, fmt);
468 #ifdef HAVE_VSNPRINTF
469   len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
470 #else
471   len = vsprintf(buffer, fmt, argp);
472 #endif
473   va_end(argp);
474   if (len < 0)
475   {
476     RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
477     return -1;
478   }
479
480   return add_to_wbuf(sock, buffer, len);
481 } /* }}} static int add_response_info */
482
483 static int count_lines(char *str) /* {{{ */
484 {
485   int lines = 0;
486
487   if (str != NULL)
488   {
489     while ((str = strchr(str, '\n')) != NULL)
490     {
491       ++lines;
492       ++str;
493     }
494   }
495
496   return lines;
497 } /* }}} static int count_lines */
498
499 /* send the response back to the user.
500  * returns 0 on success, -1 on error
501  * write buffer is always zeroed after this call */
502 static int send_response (listen_socket_t *sock, response_code rc,
503                           char *fmt, ...) /* {{{ */
504 {
505   va_list argp;
506   char buffer[CMD_MAX];
507   int lines;
508   ssize_t wrote;
509   int rclen, len;
510
511   if (sock == NULL) return rc;  /* journal replay mode */
512
513   if (sock->batch_start)
514   {
515     if (rc == RESP_OK)
516       return rc; /* no response on success during BATCH */
517     lines = sock->batch_cmd;
518   }
519   else if (rc == RESP_OK)
520     lines = count_lines(sock->wbuf);
521   else
522     lines = -1;
523
524   rclen = sprintf(buffer, "%d ", lines);
525   va_start(argp, fmt);
526 #ifdef HAVE_VSNPRINTF
527   len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
528 #else
529   len = vsprintf(buffer+rclen, fmt, argp);
530 #endif
531   va_end(argp);
532   if (len < 0)
533     return -1;
534
535   len += rclen;
536
537   /* append the result to the wbuf, don't write to the user */
538   if (sock->batch_start)
539     return add_to_wbuf(sock, buffer, len);
540
541   /* first write must be complete */
542   if (len != write(sock->fd, buffer, len))
543   {
544     RRDD_LOG(LOG_INFO, "send_response: could not write status message");
545     return -1;
546   }
547
548   if (sock->wbuf != NULL && rc == RESP_OK)
549   {
550     wrote = 0;
551     while (wrote < sock->wbuf_len)
552     {
553       ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
554       if (wb <= 0)
555       {
556         RRDD_LOG(LOG_INFO, "send_response: could not write results");
557         return -1;
558       }
559       wrote += wb;
560     }
561   }
562
563   free(sock->wbuf); sock->wbuf = NULL;
564   sock->wbuf_len = 0;
565
566   return 0;
567 } /* }}} */
568
569 static void wipe_ci_values(cache_item_t *ci, time_t when)
570 {
571   ci->values = NULL;
572   ci->values_num = 0;
573
574   ci->last_flush_time = when;
575   if (config_write_jitter > 0)
576     ci->last_flush_time += (random() % config_write_jitter);
577 }
578
579 /* remove_from_queue
580  * remove a "cache_item_t" item from the queue.
581  * must hold 'cache_lock' when calling this
582  */
583 static void remove_from_queue(cache_item_t *ci) /* {{{ */
584 {
585   if (ci == NULL) return;
586   if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
587
588   if (ci->prev == NULL)
589     cache_queue_head = ci->next; /* reset head */
590   else
591     ci->prev->next = ci->next;
592
593   if (ci->next == NULL)
594     cache_queue_tail = ci->prev; /* reset the tail */
595   else
596     ci->next->prev = ci->prev;
597
598   ci->next = ci->prev = NULL;
599   ci->flags &= ~CI_FLAGS_IN_QUEUE;
600
601   pthread_mutex_lock (&stats_lock);
602   assert (stats_queue_length > 0);
603   stats_queue_length--;
604   pthread_mutex_unlock (&stats_lock);
605
606 } /* }}} static void remove_from_queue */
607
608 /* free the resources associated with the cache_item_t
609  * must hold cache_lock when calling this function
610  */
611 static void *free_cache_item(cache_item_t *ci) /* {{{ */
612 {
613   if (ci == NULL) return NULL;
614
615   remove_from_queue(ci);
616
617   for (int i=0; i < ci->values_num; i++)
618     free(ci->values[i]);
619
620   free (ci->values);
621   free (ci->file);
622
623   /* in case anyone is waiting */
624   pthread_cond_broadcast(&ci->flushed);
625
626   free (ci);
627
628   return NULL;
629 } /* }}} static void *free_cache_item */
630
631 /*
632  * enqueue_cache_item:
633  * `cache_lock' must be acquired before calling this function!
634  */
635 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
636     queue_side_t side)
637 {
638   if (ci == NULL)
639     return (-1);
640
641   if (ci->values_num == 0)
642     return (0);
643
644   if (side == HEAD)
645   {
646     if (cache_queue_head == ci)
647       return 0;
648
649     /* remove if further down in queue */
650     remove_from_queue(ci);
651
652     ci->prev = NULL;
653     ci->next = cache_queue_head;
654     if (ci->next != NULL)
655       ci->next->prev = ci;
656     cache_queue_head = ci;
657
658     if (cache_queue_tail == NULL)
659       cache_queue_tail = cache_queue_head;
660   }
661   else /* (side == TAIL) */
662   {
663     /* We don't move values back in the list.. */
664     if (ci->flags & CI_FLAGS_IN_QUEUE)
665       return (0);
666
667     assert (ci->next == NULL);
668     assert (ci->prev == NULL);
669
670     ci->prev = cache_queue_tail;
671
672     if (cache_queue_tail == NULL)
673       cache_queue_head = ci;
674     else
675       cache_queue_tail->next = ci;
676
677     cache_queue_tail = ci;
678   }
679
680   ci->flags |= CI_FLAGS_IN_QUEUE;
681
682   pthread_cond_signal(&queue_cond);
683   pthread_mutex_lock (&stats_lock);
684   stats_queue_length++;
685   pthread_mutex_unlock (&stats_lock);
686
687   return (0);
688 } /* }}} int enqueue_cache_item */
689
690 /*
691  * tree_callback_flush:
692  * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
693  * while this is in progress.
694  */
695 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
696     gpointer data)
697 {
698   cache_item_t *ci;
699   callback_flush_data_t *cfd;
700
701   ci = (cache_item_t *) value;
702   cfd = (callback_flush_data_t *) data;
703
704   if (ci->flags & CI_FLAGS_IN_QUEUE)
705     return FALSE;
706
707   if ((ci->last_flush_time <= cfd->abs_timeout)
708       && (ci->values_num > 0))
709   {
710     enqueue_cache_item (ci, TAIL);
711   }
712   else if ((do_shutdown != 0)
713       && (ci->values_num > 0))
714   {
715     enqueue_cache_item (ci, TAIL);
716   }
717   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
718       && (ci->values_num <= 0))
719   {
720     char **temp;
721
722     temp = (char **) rrd_realloc (cfd->keys,
723         sizeof (char *) * (cfd->keys_num + 1));
724     if (temp == NULL)
725     {
726       RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed.");
727       return (FALSE);
728     }
729     cfd->keys = temp;
730     /* Make really sure this points to the _same_ place */
731     assert ((char *) key == ci->file);
732     cfd->keys[cfd->keys_num] = (char *) key;
733     cfd->keys_num++;
734   }
735
736   return (FALSE);
737 } /* }}} gboolean tree_callback_flush */
738
739 static int flush_old_values (int max_age)
740 {
741   callback_flush_data_t cfd;
742   size_t k;
743
744   memset (&cfd, 0, sizeof (cfd));
745   /* Pass the current time as user data so that we don't need to call
746    * `time' for each node. */
747   cfd.now = time (NULL);
748   cfd.keys = NULL;
749   cfd.keys_num = 0;
750
751   if (max_age > 0)
752     cfd.abs_timeout = cfd.now - max_age;
753   else
754     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
755
756   /* `tree_callback_flush' will return the keys of all values that haven't
757    * been touched in the last `config_flush_interval' seconds in `cfd'.
758    * The char*'s in this array point to the same memory as ci->file, so we
759    * don't need to free them separately. */
760   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
761
762   for (k = 0; k < cfd.keys_num; k++)
763   {
764     /* should never fail, since we have held the cache_lock
765      * the entire time */
766     assert( g_tree_remove(cache_tree, cfd.keys[k]) == TRUE );
767   }
768
769   if (cfd.keys != NULL)
770   {
771     free (cfd.keys);
772     cfd.keys = NULL;
773   }
774
775   return (0);
776 } /* int flush_old_values */
777
778 static void *flush_thread_main (void *args __attribute__((unused))) /* {{{ */
779 {
780   struct timeval now;
781   struct timespec next_flush;
782   int status;
783
784   gettimeofday (&now, NULL);
785   next_flush.tv_sec = now.tv_sec + config_flush_interval;
786   next_flush.tv_nsec = 1000 * now.tv_usec;
787
788   pthread_mutex_lock(&cache_lock);
789
790   while (!do_shutdown)
791   {
792     gettimeofday (&now, NULL);
793     if ((now.tv_sec > next_flush.tv_sec)
794         || ((now.tv_sec == next_flush.tv_sec)
795           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
796     {
797       /* Flush all values that haven't been written in the last
798        * `config_write_interval' seconds. */
799       flush_old_values (config_write_interval);
800
801       /* Determine the time of the next cache flush. */
802       next_flush.tv_sec =
803         now.tv_sec + next_flush.tv_sec % config_flush_interval;
804
805       /* unlock the cache while we rotate so we don't block incoming
806        * updates if the fsync() blocks on disk I/O */
807       pthread_mutex_unlock(&cache_lock);
808       journal_rotate();
809       pthread_mutex_lock(&cache_lock);
810     }
811
812     status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
813     if (status != 0 && status != ETIMEDOUT)
814     {
815       RRDD_LOG (LOG_ERR, "flush_thread_main: "
816                 "pthread_cond_timedwait returned %i.", status);
817     }
818   }
819
820   if (config_flush_at_shutdown)
821     flush_old_values (-1); /* flush everything */
822
823   pthread_mutex_unlock(&cache_lock);
824
825   return NULL;
826 } /* void *flush_thread_main */
827
828 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
829 {
830   pthread_mutex_lock (&cache_lock);
831
832   while (!do_shutdown
833          || (cache_queue_head != NULL && config_flush_at_shutdown))
834   {
835     cache_item_t *ci;
836     char *file;
837     char **values;
838     int values_num;
839     int status;
840     int i;
841
842     /* Now, check if there's something to store away. If not, wait until
843      * something comes in.  if we are shutting down, do not wait around.  */
844     if (cache_queue_head == NULL && !do_shutdown)
845     {
846       status = pthread_cond_wait (&queue_cond, &cache_lock);
847       if ((status != 0) && (status != ETIMEDOUT))
848       {
849         RRDD_LOG (LOG_ERR, "queue_thread_main: "
850             "pthread_cond_wait returned %i.", status);
851       }
852     }
853
854     /* Check if a value has arrived. This may be NULL if we timed out or there
855      * was an interrupt such as a signal. */
856     if (cache_queue_head == NULL)
857       continue;
858
859     ci = cache_queue_head;
860
861     /* copy the relevant parts */
862     file = strdup (ci->file);
863     if (file == NULL)
864     {
865       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
866       continue;
867     }
868
869     assert(ci->values != NULL);
870     assert(ci->values_num > 0);
871
872     values = ci->values;
873     values_num = ci->values_num;
874
875     wipe_ci_values(ci, time(NULL));
876     remove_from_queue(ci);
877
878     pthread_mutex_unlock (&cache_lock);
879
880     rrd_clear_error ();
881     status = rrd_update_r (file, NULL, values_num, (void *) values);
882     if (status != 0)
883     {
884       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
885           "rrd_update_r (%s) failed with status %i. (%s)",
886           file, status, rrd_get_error());
887     }
888
889     journal_write("wrote", file);
890     pthread_cond_broadcast(&ci->flushed);
891
892     for (i = 0; i < values_num; i++)
893       free (values[i]);
894
895     free(values);
896     free(file);
897
898     if (status == 0)
899     {
900       pthread_mutex_lock (&stats_lock);
901       stats_updates_written++;
902       stats_data_sets_written += values_num;
903       pthread_mutex_unlock (&stats_lock);
904     }
905
906     pthread_mutex_lock (&cache_lock);
907   }
908   pthread_mutex_unlock (&cache_lock);
909
910   return (NULL);
911 } /* }}} void *queue_thread_main */
912
913 static int buffer_get_field (char **buffer_ret, /* {{{ */
914     size_t *buffer_size_ret, char **field_ret)
915 {
916   char *buffer;
917   size_t buffer_pos;
918   size_t buffer_size;
919   char *field;
920   size_t field_size;
921   int status;
922
923   buffer = *buffer_ret;
924   buffer_pos = 0;
925   buffer_size = *buffer_size_ret;
926   field = *buffer_ret;
927   field_size = 0;
928
929   if (buffer_size <= 0)
930     return (-1);
931
932   /* This is ensured by `handle_request'. */
933   assert (buffer[buffer_size - 1] == '\0');
934
935   status = -1;
936   while (buffer_pos < buffer_size)
937   {
938     /* Check for end-of-field or end-of-buffer */
939     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
940     {
941       field[field_size] = 0;
942       field_size++;
943       buffer_pos++;
944       status = 0;
945       break;
946     }
947     /* Handle escaped characters. */
948     else if (buffer[buffer_pos] == '\\')
949     {
950       if (buffer_pos >= (buffer_size - 1))
951         break;
952       buffer_pos++;
953       field[field_size] = buffer[buffer_pos];
954       field_size++;
955       buffer_pos++;
956     }
957     /* Normal operation */ 
958     else
959     {
960       field[field_size] = buffer[buffer_pos];
961       field_size++;
962       buffer_pos++;
963     }
964   } /* while (buffer_pos < buffer_size) */
965
966   if (status != 0)
967     return (status);
968
969   *buffer_ret = buffer + buffer_pos;
970   *buffer_size_ret = buffer_size - buffer_pos;
971   *field_ret = field;
972
973   return (0);
974 } /* }}} int buffer_get_field */
975
976 /* if we're restricting writes to the base directory,
977  * check whether the file falls within the dir
978  * returns 1 if OK, otherwise 0
979  */
980 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
981 {
982   assert(file != NULL);
983
984   if (!config_write_base_only
985       || sock == NULL /* journal replay */
986       || config_base_dir == NULL)
987     return 1;
988
989   if (strstr(file, "../") != NULL) goto err;
990
991   /* relative paths without "../" are ok */
992   if (*file != '/') return 1;
993
994   /* file must be of the format base + "/" + <1+ char filename> */
995   if (strlen(file) < _config_base_dir_len + 2) goto err;
996   if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
997   if (*(file + _config_base_dir_len) != '/') goto err;
998
999   return 1;
1000
1001 err:
1002   if (sock != NULL && sock->fd >= 0)
1003     send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1004
1005   return 0;
1006 } /* }}} static int check_file_access */
1007
1008 /* when using a base dir, convert relative paths to absolute paths.
1009  * if necessary, modifies the "filename" pointer to point
1010  * to the new path created in "tmp".  "tmp" is provided
1011  * by the caller and sizeof(tmp) must be >= PATH_MAX.
1012  *
1013  * this allows us to optimize for the expected case (absolute path)
1014  * with a no-op.
1015  */
1016 static void get_abs_path(char **filename, char *tmp)
1017 {
1018   assert(tmp != NULL);
1019   assert(filename != NULL && *filename != NULL);
1020
1021   if (config_base_dir == NULL || **filename == '/')
1022     return;
1023
1024   snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1025   *filename = tmp;
1026 } /* }}} static int get_abs_path */
1027
1028 /* returns 1 if we have the required privilege level,
1029  * otherwise issue an error to the user on sock */
1030 static int has_privilege (listen_socket_t *sock, /* {{{ */
1031                           socket_privilege priv)
1032 {
1033   if (sock == NULL) /* journal replay */
1034     return 1;
1035
1036   if (sock->privilege >= priv)
1037     return 1;
1038
1039   return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1040 } /* }}} static int has_privilege */
1041
1042 static int flush_file (const char *filename) /* {{{ */
1043 {
1044   cache_item_t *ci;
1045
1046   pthread_mutex_lock (&cache_lock);
1047
1048   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1049   if (ci == NULL)
1050   {
1051     pthread_mutex_unlock (&cache_lock);
1052     return (ENOENT);
1053   }
1054
1055   if (ci->values_num > 0)
1056   {
1057     /* Enqueue at head */
1058     enqueue_cache_item (ci, HEAD);
1059     pthread_cond_wait(&ci->flushed, &cache_lock);
1060   }
1061
1062   /* DO NOT DO ANYTHING WITH ci HERE!!  The entry
1063    * may have been purged during our cond_wait() */
1064
1065   pthread_mutex_unlock(&cache_lock);
1066
1067   return (0);
1068 } /* }}} int flush_file */
1069
1070 static int handle_request_help (listen_socket_t *sock, /* {{{ */
1071     char *buffer, size_t buffer_size)
1072 {
1073   int status;
1074   char **help_text;
1075   char *command;
1076
1077   char *help_help[2] =
1078   {
1079     "Command overview\n"
1080     ,
1081     "HELP [<command>]\n"
1082     "FLUSH <filename>\n"
1083     "FLUSHALL\n"
1084     "PENDING <filename>\n"
1085     "FORGET <filename>\n"
1086     "QUEUE\n"
1087     "UPDATE <filename> <values> [<values> ...]\n"
1088     "BATCH\n"
1089     "STATS\n"
1090     "QUIT\n"
1091   };
1092
1093   char *help_flush[2] =
1094   {
1095     "Help for FLUSH\n"
1096     ,
1097     "Usage: FLUSH <filename>\n"
1098     "\n"
1099     "Adds the given filename to the head of the update queue and returns\n"
1100     "after it has been dequeued.\n"
1101   };
1102
1103   char *help_flushall[2] =
1104   {
1105     "Help for FLUSHALL\n"
1106     ,
1107     "Usage: FLUSHALL\n"
1108     "\n"
1109     "Triggers writing of all pending updates.  Returns immediately.\n"
1110   };
1111
1112   char *help_pending[2] =
1113   {
1114     "Help for PENDING\n"
1115     ,
1116     "Usage: PENDING <filename>\n"
1117     "\n"
1118     "Shows any 'pending' updates for a file, in order.\n"
1119     "The updates shown have not yet been written to the underlying RRD file.\n"
1120   };
1121
1122   char *help_forget[2] =
1123   {
1124     "Help for FORGET\n"
1125     ,
1126     "Usage: FORGET <filename>\n"
1127     "\n"
1128     "Removes the file completely from the cache.\n"
1129     "Any pending updates for the file will be lost.\n"
1130   };
1131
1132   char *help_queue[2] =
1133   {
1134     "Help for QUEUE\n"
1135     ,
1136     "Shows all files in the output queue.\n"
1137     "The output is zero or more lines in the following format:\n"
1138     "(where <num_vals> is the number of values to be written)\n"
1139     "\n"
1140     "<num_vals> <filename>\n"
1141     "\n"
1142   };
1143
1144   char *help_update[2] =
1145   {
1146     "Help for UPDATE\n"
1147     ,
1148     "Usage: UPDATE <filename> <values> [<values> ...]\n"
1149     "\n"
1150     "Adds the given file to the internal cache if it is not yet known and\n"
1151     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1152     "for details.\n"
1153     "\n"
1154     "Each <values> has the following form:\n"
1155     "  <values> = <time>:<value>[:<value>[...]]\n"
1156     "See the rrdupdate(1) manpage for details.\n"
1157   };
1158
1159   char *help_stats[2] =
1160   {
1161     "Help for STATS\n"
1162     ,
1163     "Usage: STATS\n"
1164     "\n"
1165     "Returns some performance counters, see the rrdcached(1) manpage for\n"
1166     "a description of the values.\n"
1167   };
1168
1169   char *help_batch[2] =
1170   {
1171     "Help for BATCH\n"
1172     ,
1173     "The 'BATCH' command permits the client to initiate a bulk load\n"
1174     "   of commands to rrdcached.\n"
1175     "\n"
1176     "Usage:\n"
1177     "\n"
1178     "    client: BATCH\n"
1179     "    server: 0 Go ahead.  End with dot '.' on its own line.\n"
1180     "    client: command #1\n"
1181     "    client: command #2\n"
1182     "    client: ... and so on\n"
1183     "    client: .\n"
1184     "    server: 2 errors\n"
1185     "    server: 7 message for command #7\n"
1186     "    server: 9 message for command #9\n"
1187     "\n"
1188     "For more information, consult the rrdcached(1) documentation.\n"
1189   };
1190
1191   char *help_quit[2] =
1192   {
1193     "Help for QUIT\n"
1194     ,
1195     "Disconnect from rrdcached.\n"
1196   };
1197
1198   status = buffer_get_field (&buffer, &buffer_size, &command);
1199   if (status != 0)
1200     help_text = help_help;
1201   else
1202   {
1203     if (strcasecmp (command, "update") == 0)
1204       help_text = help_update;
1205     else if (strcasecmp (command, "flush") == 0)
1206       help_text = help_flush;
1207     else if (strcasecmp (command, "flushall") == 0)
1208       help_text = help_flushall;
1209     else if (strcasecmp (command, "pending") == 0)
1210       help_text = help_pending;
1211     else if (strcasecmp (command, "forget") == 0)
1212       help_text = help_forget;
1213     else if (strcasecmp (command, "queue") == 0)
1214       help_text = help_queue;
1215     else if (strcasecmp (command, "stats") == 0)
1216       help_text = help_stats;
1217     else if (strcasecmp (command, "batch") == 0)
1218       help_text = help_batch;
1219     else if (strcasecmp (command, "quit") == 0)
1220       help_text = help_quit;
1221     else
1222       help_text = help_help;
1223   }
1224
1225   add_response_info(sock, help_text[1]);
1226   return send_response(sock, RESP_OK, help_text[0]);
1227 } /* }}} int handle_request_help */
1228
1229 static int handle_request_stats (listen_socket_t *sock) /* {{{ */
1230 {
1231   uint64_t copy_queue_length;
1232   uint64_t copy_updates_received;
1233   uint64_t copy_flush_received;
1234   uint64_t copy_updates_written;
1235   uint64_t copy_data_sets_written;
1236   uint64_t copy_journal_bytes;
1237   uint64_t copy_journal_rotate;
1238
1239   uint64_t tree_nodes_number;
1240   uint64_t tree_depth;
1241
1242   pthread_mutex_lock (&stats_lock);
1243   copy_queue_length       = stats_queue_length;
1244   copy_updates_received   = stats_updates_received;
1245   copy_flush_received     = stats_flush_received;
1246   copy_updates_written    = stats_updates_written;
1247   copy_data_sets_written  = stats_data_sets_written;
1248   copy_journal_bytes      = stats_journal_bytes;
1249   copy_journal_rotate     = stats_journal_rotate;
1250   pthread_mutex_unlock (&stats_lock);
1251
1252   pthread_mutex_lock (&cache_lock);
1253   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1254   tree_depth        = (uint64_t) g_tree_height (cache_tree);
1255   pthread_mutex_unlock (&cache_lock);
1256
1257   add_response_info(sock,
1258                     "QueueLength: %"PRIu64"\n", copy_queue_length);
1259   add_response_info(sock,
1260                     "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1261   add_response_info(sock,
1262                     "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1263   add_response_info(sock,
1264                     "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1265   add_response_info(sock,
1266                     "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1267   add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1268   add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1269   add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1270   add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1271
1272   send_response(sock, RESP_OK, "Statistics follow\n");
1273
1274   return (0);
1275 } /* }}} int handle_request_stats */
1276
1277 static int handle_request_flush (listen_socket_t *sock, /* {{{ */
1278     char *buffer, size_t buffer_size)
1279 {
1280   char *file, file_tmp[PATH_MAX];
1281   int status;
1282
1283   status = buffer_get_field (&buffer, &buffer_size, &file);
1284   if (status != 0)
1285   {
1286     return send_response(sock, RESP_ERR, "Usage: flush <filename>\n");
1287   }
1288   else
1289   {
1290     pthread_mutex_lock(&stats_lock);
1291     stats_flush_received++;
1292     pthread_mutex_unlock(&stats_lock);
1293
1294     get_abs_path(&file, file_tmp);
1295     if (!check_file_access(file, sock)) return 0;
1296
1297     status = flush_file (file);
1298     if (status == 0)
1299       return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1300     else if (status == ENOENT)
1301     {
1302       /* no file in our tree; see whether it exists at all */
1303       struct stat statbuf;
1304
1305       memset(&statbuf, 0, sizeof(statbuf));
1306       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1307         return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1308       else
1309         return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1310     }
1311     else if (status < 0)
1312       return send_response(sock, RESP_ERR, "Internal error.\n");
1313     else
1314       return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1315   }
1316
1317   /* NOTREACHED */
1318   assert(1==0);
1319 } /* }}} int handle_request_flush */
1320
1321 static int handle_request_flushall(listen_socket_t *sock) /* {{{ */
1322 {
1323   int status;
1324
1325   status = has_privilege(sock, PRIV_HIGH);
1326   if (status <= 0)
1327     return status;
1328
1329   RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1330
1331   pthread_mutex_lock(&cache_lock);
1332   flush_old_values(-1);
1333   pthread_mutex_unlock(&cache_lock);
1334
1335   return send_response(sock, RESP_OK, "Started flush.\n");
1336 } /* }}} static int handle_request_flushall */
1337
1338 static int handle_request_pending(listen_socket_t *sock, /* {{{ */
1339                                   char *buffer, size_t buffer_size)
1340 {
1341   int status;
1342   char *file, file_tmp[PATH_MAX];
1343   cache_item_t *ci;
1344
1345   status = buffer_get_field(&buffer, &buffer_size, &file);
1346   if (status != 0)
1347     return send_response(sock, RESP_ERR,
1348                          "Usage: PENDING <filename>\n");
1349
1350   status = has_privilege(sock, PRIV_HIGH);
1351   if (status <= 0)
1352     return status;
1353
1354   get_abs_path(&file, file_tmp);
1355
1356   pthread_mutex_lock(&cache_lock);
1357   ci = g_tree_lookup(cache_tree, file);
1358   if (ci == NULL)
1359   {
1360     pthread_mutex_unlock(&cache_lock);
1361     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1362   }
1363
1364   for (int i=0; i < ci->values_num; i++)
1365     add_response_info(sock, "%s\n", ci->values[i]);
1366
1367   pthread_mutex_unlock(&cache_lock);
1368   return send_response(sock, RESP_OK, "updates pending\n");
1369 } /* }}} static int handle_request_pending */
1370
1371 static int handle_request_forget(listen_socket_t *sock, /* {{{ */
1372                                  char *buffer, size_t buffer_size)
1373 {
1374   int status;
1375   gboolean found;
1376   char *file, file_tmp[PATH_MAX];
1377
1378   status = buffer_get_field(&buffer, &buffer_size, &file);
1379   if (status != 0)
1380     return send_response(sock, RESP_ERR,
1381                          "Usage: FORGET <filename>\n");
1382
1383   status = has_privilege(sock, PRIV_HIGH);
1384   if (status <= 0)
1385     return status;
1386
1387   get_abs_path(&file, file_tmp);
1388   if (!check_file_access(file, sock)) return 0;
1389
1390   pthread_mutex_lock(&cache_lock);
1391   found = g_tree_remove(cache_tree, file);
1392   pthread_mutex_unlock(&cache_lock);
1393
1394   if (found == TRUE)
1395   {
1396     if (sock != NULL)
1397       journal_write("forget", file);
1398
1399     return send_response(sock, RESP_OK, "Gone!\n");
1400   }
1401   else
1402     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1403
1404   /* NOTREACHED */
1405   assert(1==0);
1406 } /* }}} static int handle_request_forget */
1407
1408 static int handle_request_queue (listen_socket_t *sock) /* {{{ */
1409 {
1410   cache_item_t *ci;
1411
1412   pthread_mutex_lock(&cache_lock);
1413
1414   ci = cache_queue_head;
1415   while (ci != NULL)
1416   {
1417     add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1418     ci = ci->next;
1419   }
1420
1421   pthread_mutex_unlock(&cache_lock);
1422
1423   return send_response(sock, RESP_OK, "in queue.\n");
1424 } /* }}} int handle_request_queue */
1425
1426 static int handle_request_update (listen_socket_t *sock, /* {{{ */
1427                                   time_t now,
1428                                   char *buffer, size_t buffer_size)
1429 {
1430   char *file, file_tmp[PATH_MAX];
1431   int values_num = 0;
1432   int status;
1433   char orig_buf[CMD_MAX];
1434
1435   cache_item_t *ci;
1436
1437   status = has_privilege(sock, PRIV_HIGH);
1438   if (status <= 0)
1439     return status;
1440
1441   /* save it for the journal later */
1442   strncpy(orig_buf, buffer, sizeof(orig_buf)-1);
1443
1444   status = buffer_get_field (&buffer, &buffer_size, &file);
1445   if (status != 0)
1446     return send_response(sock, RESP_ERR,
1447                          "Usage: UPDATE <filename> <values> [<values> ...]\n");
1448
1449   pthread_mutex_lock(&stats_lock);
1450   stats_updates_received++;
1451   pthread_mutex_unlock(&stats_lock);
1452
1453   get_abs_path(&file, file_tmp);
1454   if (!check_file_access(file, sock)) return 0;
1455
1456   pthread_mutex_lock (&cache_lock);
1457   ci = g_tree_lookup (cache_tree, file);
1458
1459   if (ci == NULL) /* {{{ */
1460   {
1461     struct stat statbuf;
1462
1463     /* don't hold the lock while we setup; stat(2) might block */
1464     pthread_mutex_unlock(&cache_lock);
1465
1466     memset (&statbuf, 0, sizeof (statbuf));
1467     status = stat (file, &statbuf);
1468     if (status != 0)
1469     {
1470       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1471
1472       status = errno;
1473       if (status == ENOENT)
1474         return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1475       else
1476         return send_response(sock, RESP_ERR,
1477                              "stat failed with error %i.\n", status);
1478     }
1479     if (!S_ISREG (statbuf.st_mode))
1480       return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1481
1482     if (access(file, R_OK|W_OK) != 0)
1483       return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1484                            file, rrd_strerror(errno));
1485
1486     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1487     if (ci == NULL)
1488     {
1489       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1490
1491       return send_response(sock, RESP_ERR, "malloc failed.\n");
1492     }
1493     memset (ci, 0, sizeof (cache_item_t));
1494
1495     ci->file = strdup (file);
1496     if (ci->file == NULL)
1497     {
1498       free (ci);
1499       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1500
1501       return send_response(sock, RESP_ERR, "strdup failed.\n");
1502     }
1503
1504     wipe_ci_values(ci, now);
1505     ci->flags = CI_FLAGS_IN_TREE;
1506     pthread_cond_init(&ci->flushed, NULL);
1507
1508     pthread_mutex_lock(&cache_lock);
1509     g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1510   } /* }}} */
1511   assert (ci != NULL);
1512
1513   /* don't re-write updates in replay mode */
1514   if (sock != NULL)
1515     journal_write("update", orig_buf);
1516
1517   while (buffer_size > 0)
1518   {
1519     char **temp;
1520     char *value;
1521     time_t stamp;
1522     char *eostamp;
1523
1524     status = buffer_get_field (&buffer, &buffer_size, &value);
1525     if (status != 0)
1526     {
1527       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1528       break;
1529     }
1530
1531     /* make sure update time is always moving forward */
1532     stamp = strtol(value, &eostamp, 10);
1533     if (eostamp == value || eostamp == NULL || *eostamp != ':')
1534     {
1535       pthread_mutex_unlock(&cache_lock);
1536       return send_response(sock, RESP_ERR,
1537                            "Cannot find timestamp in '%s'!\n", value);
1538     }
1539     else if (stamp <= ci->last_update_stamp)
1540     {
1541       pthread_mutex_unlock(&cache_lock);
1542       return send_response(sock, RESP_ERR,
1543                            "illegal attempt to update using time %ld when last"
1544                            " update time is %ld (minimum one second step)\n",
1545                            stamp, ci->last_update_stamp);
1546     }
1547     else
1548       ci->last_update_stamp = stamp;
1549
1550     temp = (char **) rrd_realloc (ci->values,
1551         sizeof (char *) * (ci->values_num + 1));
1552     if (temp == NULL)
1553     {
1554       RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
1555       continue;
1556     }
1557     ci->values = temp;
1558
1559     ci->values[ci->values_num] = strdup (value);
1560     if (ci->values[ci->values_num] == NULL)
1561     {
1562       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1563       continue;
1564     }
1565     ci->values_num++;
1566
1567     values_num++;
1568   }
1569
1570   if (((now - ci->last_flush_time) >= config_write_interval)
1571       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1572       && (ci->values_num > 0))
1573   {
1574     enqueue_cache_item (ci, TAIL);
1575   }
1576
1577   pthread_mutex_unlock (&cache_lock);
1578
1579   if (values_num < 1)
1580     return send_response(sock, RESP_ERR, "No values updated.\n");
1581   else
1582     return send_response(sock, RESP_OK,
1583                          "errors, enqueued %i value(s).\n", values_num);
1584
1585   /* NOTREACHED */
1586   assert(1==0);
1587
1588 } /* }}} int handle_request_update */
1589
1590 /* we came across a "WROTE" entry during journal replay.
1591  * throw away any values that we have accumulated for this file
1592  */
1593 static int handle_request_wrote (const char *buffer, time_t now) /* {{{ */
1594 {
1595   int i;
1596   cache_item_t *ci;
1597   const char *file = buffer;
1598
1599   pthread_mutex_lock(&cache_lock);
1600
1601   ci = g_tree_lookup(cache_tree, file);
1602   if (ci == NULL)
1603   {
1604     pthread_mutex_unlock(&cache_lock);
1605     return (0);
1606   }
1607
1608   if (ci->values)
1609   {
1610     for (i=0; i < ci->values_num; i++)
1611       free(ci->values[i]);
1612
1613     free(ci->values);
1614   }
1615
1616   wipe_ci_values(ci, now);
1617   remove_from_queue(ci);
1618
1619   pthread_mutex_unlock(&cache_lock);
1620   return (0);
1621 } /* }}} int handle_request_wrote */
1622
1623 /* start "BATCH" processing */
1624 static int batch_start (listen_socket_t *sock) /* {{{ */
1625 {
1626   int status;
1627   if (sock->batch_start)
1628     return send_response(sock, RESP_ERR, "Already in BATCH\n");
1629
1630   status = send_response(sock, RESP_OK,
1631                          "Go ahead.  End with dot '.' on its own line.\n");
1632   sock->batch_start = time(NULL);
1633   sock->batch_cmd = 0;
1634
1635   return status;
1636 } /* }}} static int batch_start */
1637
1638 /* finish "BATCH" processing and return results to the client */
1639 static int batch_done (listen_socket_t *sock) /* {{{ */
1640 {
1641   assert(sock->batch_start);
1642   sock->batch_start = 0;
1643   sock->batch_cmd  = 0;
1644   return send_response(sock, RESP_OK, "errors\n");
1645 } /* }}} static int batch_done */
1646
1647 /* if sock==NULL, we are in journal replay mode */
1648 static int handle_request (listen_socket_t *sock, /* {{{ */
1649                            time_t now,
1650                            char *buffer, size_t buffer_size)
1651 {
1652   char *buffer_ptr;
1653   char *command;
1654   int status;
1655
1656   assert (buffer[buffer_size - 1] == '\0');
1657
1658   buffer_ptr = buffer;
1659   command = NULL;
1660   status = buffer_get_field (&buffer_ptr, &buffer_size, &command);
1661   if (status != 0)
1662   {
1663     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1664     return (-1);
1665   }
1666
1667   if (sock != NULL && sock->batch_start)
1668     sock->batch_cmd++;
1669
1670   if (strcasecmp (command, "update") == 0)
1671     return (handle_request_update (sock, now, buffer_ptr, buffer_size));
1672   else if (strcasecmp (command, "wrote") == 0 && sock == NULL)
1673   {
1674     /* this is only valid in replay mode */
1675     return (handle_request_wrote (buffer_ptr, now));
1676   }
1677   else if (strcasecmp (command, "flush") == 0)
1678     return (handle_request_flush (sock, buffer_ptr, buffer_size));
1679   else if (strcasecmp (command, "flushall") == 0)
1680     return (handle_request_flushall(sock));
1681   else if (strcasecmp (command, "pending") == 0)
1682     return (handle_request_pending(sock, buffer_ptr, buffer_size));
1683   else if (strcasecmp (command, "forget") == 0)
1684     return (handle_request_forget(sock, buffer_ptr, buffer_size));
1685   else if (strcasecmp (command, "queue") == 0)
1686     return (handle_request_queue(sock));
1687   else if (strcasecmp (command, "stats") == 0)
1688     return (handle_request_stats (sock));
1689   else if (strcasecmp (command, "help") == 0)
1690     return (handle_request_help (sock, buffer_ptr, buffer_size));
1691   else if (strcasecmp (command, "batch") == 0 && sock != NULL)
1692     return batch_start(sock);
1693   else if (strcasecmp (command, ".") == 0 && sock != NULL && sock->batch_start)
1694     return batch_done(sock);
1695   else if (strcasecmp (command, "quit") == 0)
1696     return -1;
1697   else
1698     return send_response(sock, RESP_ERR, "Unknown command: %s\n", command);
1699
1700   /* NOTREACHED */
1701   assert(1==0);
1702 } /* }}} int handle_request */
1703
1704 /* MUST NOT hold journal_lock before calling this */
1705 static void journal_rotate(void) /* {{{ */
1706 {
1707   FILE *old_fh = NULL;
1708   int new_fd;
1709
1710   if (journal_cur == NULL || journal_old == NULL)
1711     return;
1712
1713   pthread_mutex_lock(&journal_lock);
1714
1715   /* we rotate this way (rename before close) so that the we can release
1716    * the journal lock as fast as possible.  Journal writes to the new
1717    * journal can proceed immediately after the new file is opened.  The
1718    * fclose can then block without affecting new updates.
1719    */
1720   if (journal_fh != NULL)
1721   {
1722     old_fh = journal_fh;
1723     journal_fh = NULL;
1724     rename(journal_cur, journal_old);
1725     ++stats_journal_rotate;
1726   }
1727
1728   new_fd = open(journal_cur, O_WRONLY|O_CREAT|O_APPEND,
1729                 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1730   if (new_fd >= 0)
1731   {
1732     journal_fh = fdopen(new_fd, "a");
1733     if (journal_fh == NULL)
1734       close(new_fd);
1735   }
1736
1737   pthread_mutex_unlock(&journal_lock);
1738
1739   if (old_fh != NULL)
1740     fclose(old_fh);
1741
1742   if (journal_fh == NULL)
1743   {
1744     RRDD_LOG(LOG_CRIT,
1745              "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1746              journal_cur, rrd_strerror(errno));
1747
1748     RRDD_LOG(LOG_ERR,
1749              "JOURNALING DISABLED: All values will be flushed at shutdown");
1750     config_flush_at_shutdown = 1;
1751   }
1752
1753 } /* }}} static void journal_rotate */
1754
1755 static void journal_done(void) /* {{{ */
1756 {
1757   if (journal_cur == NULL)
1758     return;
1759
1760   pthread_mutex_lock(&journal_lock);
1761   if (journal_fh != NULL)
1762   {
1763     fclose(journal_fh);
1764     journal_fh = NULL;
1765   }
1766
1767   if (config_flush_at_shutdown)
1768   {
1769     RRDD_LOG(LOG_INFO, "removing journals");
1770     unlink(journal_old);
1771     unlink(journal_cur);
1772   }
1773   else
1774   {
1775     RRDD_LOG(LOG_INFO, "expedited shutdown; "
1776              "journals will be used at next startup");
1777   }
1778
1779   pthread_mutex_unlock(&journal_lock);
1780
1781 } /* }}} static void journal_done */
1782
1783 static int journal_write(char *cmd, char *args) /* {{{ */
1784 {
1785   int chars;
1786
1787   if (journal_fh == NULL)
1788     return 0;
1789
1790   pthread_mutex_lock(&journal_lock);
1791   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1792   pthread_mutex_unlock(&journal_lock);
1793
1794   if (chars > 0)
1795   {
1796     pthread_mutex_lock(&stats_lock);
1797     stats_journal_bytes += chars;
1798     pthread_mutex_unlock(&stats_lock);
1799   }
1800
1801   return chars;
1802 } /* }}} static int journal_write */
1803
1804 static int journal_replay (const char *file) /* {{{ */
1805 {
1806   FILE *fh;
1807   int entry_cnt = 0;
1808   int fail_cnt = 0;
1809   uint64_t line = 0;
1810   char entry[CMD_MAX];
1811   time_t now;
1812
1813   if (file == NULL) return 0;
1814
1815   {
1816     char *reason = "unknown error";
1817     int status = 0;
1818     struct stat statbuf;
1819
1820     memset(&statbuf, 0, sizeof(statbuf));
1821     if (stat(file, &statbuf) != 0)
1822     {
1823       if (errno == ENOENT)
1824         return 0;
1825
1826       reason = "stat error";
1827       status = errno;
1828     }
1829     else if (!S_ISREG(statbuf.st_mode))
1830     {
1831       reason = "not a regular file";
1832       status = EPERM;
1833     }
1834     if (statbuf.st_uid != daemon_uid)
1835     {
1836       reason = "not owned by daemon user";
1837       status = EACCES;
1838     }
1839     if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
1840     {
1841       reason = "must not be user/group writable";
1842       status = EACCES;
1843     }
1844
1845     if (status != 0)
1846     {
1847       RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
1848                file, rrd_strerror(status), reason);
1849       return 0;
1850     }
1851   }
1852
1853   fh = fopen(file, "r");
1854   if (fh == NULL)
1855   {
1856     if (errno != ENOENT)
1857       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1858                file, rrd_strerror(errno));
1859     return 0;
1860   }
1861   else
1862     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1863
1864   now = time(NULL);
1865
1866   while(!feof(fh))
1867   {
1868     size_t entry_len;
1869
1870     ++line;
1871     if (fgets(entry, sizeof(entry), fh) == NULL)
1872       break;
1873     entry_len = strlen(entry);
1874
1875     /* check \n termination in case journal writing crashed mid-line */
1876     if (entry_len == 0)
1877       continue;
1878     else if (entry[entry_len - 1] != '\n')
1879     {
1880       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1881       ++fail_cnt;
1882       continue;
1883     }
1884
1885     entry[entry_len - 1] = '\0';
1886
1887     if (handle_request(NULL, now, entry, entry_len) == 0)
1888       ++entry_cnt;
1889     else
1890       ++fail_cnt;
1891   }
1892
1893   fclose(fh);
1894
1895   RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1896            entry_cnt, fail_cnt);
1897
1898   return entry_cnt > 0 ? 1 : 0;
1899 } /* }}} static int journal_replay */
1900
1901 static void journal_init(void) /* {{{ */
1902 {
1903   int had_journal = 0;
1904
1905   if (journal_cur == NULL) return;
1906
1907   pthread_mutex_lock(&journal_lock);
1908
1909   RRDD_LOG(LOG_INFO, "checking for journal files");
1910
1911   had_journal += journal_replay(journal_old);
1912   had_journal += journal_replay(journal_cur);
1913
1914   /* it must have been a crash.  start a flush */
1915   if (had_journal && config_flush_at_shutdown)
1916     flush_old_values(-1);
1917
1918   pthread_mutex_unlock(&journal_lock);
1919   journal_rotate();
1920
1921   RRDD_LOG(LOG_INFO, "journal processing complete");
1922
1923 } /* }}} static void journal_init */
1924
1925 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
1926 {
1927   assert(sock != NULL);
1928
1929   free(sock->rbuf);  sock->rbuf = NULL;
1930   free(sock->wbuf);  sock->wbuf = NULL;
1931   free(sock);
1932 } /* }}} void free_listen_socket */
1933
1934 static void close_connection(listen_socket_t *sock) /* {{{ */
1935 {
1936   if (sock->fd >= 0)
1937   {
1938     close(sock->fd);
1939     sock->fd = -1;
1940   }
1941
1942   free_listen_socket(sock);
1943
1944 } /* }}} void close_connection */
1945
1946 static void *connection_thread_main (void *args) /* {{{ */
1947 {
1948   listen_socket_t *sock;
1949   int fd;
1950
1951   sock = (listen_socket_t *) args;
1952   fd = sock->fd;
1953
1954   /* init read buffers */
1955   sock->next_read = sock->next_cmd = 0;
1956   sock->rbuf = malloc(RBUF_SIZE);
1957   if (sock->rbuf == NULL)
1958   {
1959     RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
1960     close_connection(sock);
1961     return NULL;
1962   }
1963
1964   pthread_mutex_lock (&connection_threads_lock);
1965   connection_threads_num++;
1966   pthread_mutex_unlock (&connection_threads_lock);
1967
1968   while (do_shutdown == 0)
1969   {
1970     char *cmd;
1971     ssize_t cmd_len;
1972     ssize_t rbytes;
1973     time_t now;
1974
1975     struct pollfd pollfd;
1976     int status;
1977
1978     pollfd.fd = fd;
1979     pollfd.events = POLLIN | POLLPRI;
1980     pollfd.revents = 0;
1981
1982     status = poll (&pollfd, 1, /* timeout = */ 500);
1983     if (do_shutdown)
1984       break;
1985     else if (status == 0) /* timeout */
1986       continue;
1987     else if (status < 0) /* error */
1988     {
1989       status = errno;
1990       if (status != EINTR)
1991         RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
1992       continue;
1993     }
1994
1995     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
1996       break;
1997     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
1998     {
1999       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2000           "poll(2) returned something unexpected: %#04hx",
2001           pollfd.revents);
2002       break;
2003     }
2004
2005     rbytes = read(fd, sock->rbuf + sock->next_read,
2006                   RBUF_SIZE - sock->next_read);
2007     if (rbytes < 0)
2008     {
2009       RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2010       break;
2011     }
2012     else if (rbytes == 0)
2013       break; /* eof */
2014
2015     sock->next_read += rbytes;
2016
2017     if (sock->batch_start)
2018       now = sock->batch_start;
2019     else
2020       now = time(NULL);
2021
2022     while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2023     {
2024       status = handle_request (sock, now, cmd, cmd_len+1);
2025       if (status != 0)
2026         goto out_close;
2027     }
2028   }
2029
2030 out_close:
2031   close_connection(sock);
2032
2033   /* Remove this thread from the connection threads list */
2034   pthread_mutex_lock (&connection_threads_lock);
2035   connection_threads_num--;
2036   if (connection_threads_num <= 0)
2037     pthread_cond_broadcast(&connection_threads_done);
2038   pthread_mutex_unlock (&connection_threads_lock);
2039
2040   return (NULL);
2041 } /* }}} void *connection_thread_main */
2042
2043 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2044 {
2045   int fd;
2046   struct sockaddr_un sa;
2047   listen_socket_t *temp;
2048   int status;
2049   const char *path;
2050
2051   path = sock->addr;
2052   if (strncmp(path, "unix:", strlen("unix:")) == 0)
2053     path += strlen("unix:");
2054
2055   temp = (listen_socket_t *) rrd_realloc (listen_fds,
2056       sizeof (listen_fds[0]) * (listen_fds_num + 1));
2057   if (temp == NULL)
2058   {
2059     fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2060     return (-1);
2061   }
2062   listen_fds = temp;
2063   memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2064
2065   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2066   if (fd < 0)
2067   {
2068     fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2069              rrd_strerror(errno));
2070     return (-1);
2071   }
2072
2073   memset (&sa, 0, sizeof (sa));
2074   sa.sun_family = AF_UNIX;
2075   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2076
2077   /* if we've gotten this far, we own the pid file.  any daemon started
2078    * with the same args must not be alive.  therefore, ensure that we can
2079    * create the socket...
2080    */
2081   unlink(path);
2082
2083   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2084   if (status != 0)
2085   {
2086     fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2087              path, rrd_strerror(errno));
2088     close (fd);
2089     return (-1);
2090   }
2091
2092   status = listen (fd, /* backlog = */ 10);
2093   if (status != 0)
2094   {
2095     fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2096              path, rrd_strerror(errno));
2097     close (fd);
2098     unlink (path);
2099     return (-1);
2100   }
2101
2102   listen_fds[listen_fds_num].fd = fd;
2103   listen_fds[listen_fds_num].family = PF_UNIX;
2104   strncpy(listen_fds[listen_fds_num].addr, path,
2105           sizeof (listen_fds[listen_fds_num].addr) - 1);
2106   listen_fds_num++;
2107
2108   return (0);
2109 } /* }}} int open_listen_socket_unix */
2110
2111 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2112 {
2113   struct addrinfo ai_hints;
2114   struct addrinfo *ai_res;
2115   struct addrinfo *ai_ptr;
2116   char addr_copy[NI_MAXHOST];
2117   char *addr;
2118   char *port;
2119   int status;
2120
2121   strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
2122   addr_copy[sizeof (addr_copy) - 1] = 0;
2123   addr = addr_copy;
2124
2125   memset (&ai_hints, 0, sizeof (ai_hints));
2126   ai_hints.ai_flags = 0;
2127 #ifdef AI_ADDRCONFIG
2128   ai_hints.ai_flags |= AI_ADDRCONFIG;
2129 #endif
2130   ai_hints.ai_family = AF_UNSPEC;
2131   ai_hints.ai_socktype = SOCK_STREAM;
2132
2133   port = NULL;
2134   if (*addr == '[') /* IPv6+port format */
2135   {
2136     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2137     addr++;
2138
2139     port = strchr (addr, ']');
2140     if (port == NULL)
2141     {
2142       fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
2143       return (-1);
2144     }
2145     *port = 0;
2146     port++;
2147
2148     if (*port == ':')
2149       port++;
2150     else if (*port == 0)
2151       port = NULL;
2152     else
2153     {
2154       fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
2155       return (-1);
2156     }
2157   } /* if (*addr = ']') */
2158   else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */
2159   {
2160     port = rindex(addr, ':');
2161     if (port != NULL)
2162     {
2163       *port = 0;
2164       port++;
2165     }
2166   }
2167   ai_res = NULL;
2168   status = getaddrinfo (addr,
2169                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2170                         &ai_hints, &ai_res);
2171   if (status != 0)
2172   {
2173     fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
2174              addr, gai_strerror (status));
2175     return (-1);
2176   }
2177
2178   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2179   {
2180     int fd;
2181     listen_socket_t *temp;
2182     int one = 1;
2183
2184     temp = (listen_socket_t *) rrd_realloc (listen_fds,
2185         sizeof (listen_fds[0]) * (listen_fds_num + 1));
2186     if (temp == NULL)
2187     {
2188       fprintf (stderr,
2189                "rrdcached: open_listen_socket_network: realloc failed.\n");
2190       continue;
2191     }
2192     listen_fds = temp;
2193     memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2194
2195     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2196     if (fd < 0)
2197     {
2198       fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
2199                rrd_strerror(errno));
2200       continue;
2201     }
2202
2203     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2204
2205     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2206     if (status != 0)
2207     {
2208       fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2209                sock->addr, rrd_strerror(errno));
2210       close (fd);
2211       continue;
2212     }
2213
2214     status = listen (fd, /* backlog = */ 10);
2215     if (status != 0)
2216     {
2217       fprintf (stderr, "rrdcached: listen(%s) failed: %s\n.",
2218                sock->addr, rrd_strerror(errno));
2219       close (fd);
2220       freeaddrinfo(ai_res);
2221       return (-1);
2222     }
2223
2224     listen_fds[listen_fds_num].fd = fd;
2225     listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2226     listen_fds_num++;
2227   } /* for (ai_ptr) */
2228
2229   freeaddrinfo(ai_res);
2230   return (0);
2231 } /* }}} static int open_listen_socket_network */
2232
2233 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2234 {
2235   assert(sock != NULL);
2236   assert(sock->addr != NULL);
2237
2238   if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2239       || sock->addr[0] == '/')
2240     return (open_listen_socket_unix(sock));
2241   else
2242     return (open_listen_socket_network(sock));
2243 } /* }}} int open_listen_socket */
2244
2245 static int close_listen_sockets (void) /* {{{ */
2246 {
2247   size_t i;
2248
2249   for (i = 0; i < listen_fds_num; i++)
2250   {
2251     close (listen_fds[i].fd);
2252
2253     if (listen_fds[i].family == PF_UNIX)
2254       unlink(listen_fds[i].addr);
2255   }
2256
2257   free (listen_fds);
2258   listen_fds = NULL;
2259   listen_fds_num = 0;
2260
2261   return (0);
2262 } /* }}} int close_listen_sockets */
2263
2264 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
2265 {
2266   struct pollfd *pollfds;
2267   int pollfds_num;
2268   int status;
2269   int i;
2270
2271   if (listen_fds_num < 1)
2272   {
2273     RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2274     return (NULL);
2275   }
2276
2277   pollfds_num = listen_fds_num;
2278   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2279   if (pollfds == NULL)
2280   {
2281     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2282     return (NULL);
2283   }
2284   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2285
2286   RRDD_LOG(LOG_INFO, "listening for connections");
2287
2288   while (do_shutdown == 0)
2289   {
2290     for (i = 0; i < pollfds_num; i++)
2291     {
2292       pollfds[i].fd = listen_fds[i].fd;
2293       pollfds[i].events = POLLIN | POLLPRI;
2294       pollfds[i].revents = 0;
2295     }
2296
2297     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2298     if (do_shutdown)
2299       break;
2300     else if (status == 0) /* timeout */
2301       continue;
2302     else if (status < 0) /* error */
2303     {
2304       status = errno;
2305       if (status != EINTR)
2306       {
2307         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2308       }
2309       continue;
2310     }
2311
2312     for (i = 0; i < pollfds_num; i++)
2313     {
2314       listen_socket_t *client_sock;
2315       struct sockaddr_storage client_sa;
2316       socklen_t client_sa_size;
2317       pthread_t tid;
2318       pthread_attr_t attr;
2319
2320       if (pollfds[i].revents == 0)
2321         continue;
2322
2323       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2324       {
2325         RRDD_LOG (LOG_ERR, "listen_thread_main: "
2326             "poll(2) returned something unexpected for listen FD #%i.",
2327             pollfds[i].fd);
2328         continue;
2329       }
2330
2331       client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2332       if (client_sock == NULL)
2333       {
2334         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2335         continue;
2336       }
2337       memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2338
2339       client_sa_size = sizeof (client_sa);
2340       client_sock->fd = accept (pollfds[i].fd,
2341           (struct sockaddr *) &client_sa, &client_sa_size);
2342       if (client_sock->fd < 0)
2343       {
2344         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2345         free(client_sock);
2346         continue;
2347       }
2348
2349       pthread_attr_init (&attr);
2350       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2351
2352       status = pthread_create (&tid, &attr, connection_thread_main,
2353                                client_sock);
2354       if (status != 0)
2355       {
2356         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2357         close_connection(client_sock);
2358         continue;
2359       }
2360     } /* for (pollfds_num) */
2361   } /* while (do_shutdown == 0) */
2362
2363   RRDD_LOG(LOG_INFO, "starting shutdown");
2364
2365   close_listen_sockets ();
2366
2367   pthread_mutex_lock (&connection_threads_lock);
2368   while (connection_threads_num > 0)
2369     pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
2370   pthread_mutex_unlock (&connection_threads_lock);
2371
2372   free(pollfds);
2373
2374   return (NULL);
2375 } /* }}} void *listen_thread_main */
2376
2377 static int daemonize (void) /* {{{ */
2378 {
2379   int pid_fd;
2380   char *base_dir;
2381
2382   daemon_uid = geteuid();
2383
2384   pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
2385   if (pid_fd < 0)
2386     pid_fd = check_pidfile();
2387   if (pid_fd < 0)
2388     return pid_fd;
2389
2390   /* open all the listen sockets */
2391   if (config_listen_address_list_len > 0)
2392   {
2393     for (int i = 0; i < config_listen_address_list_len; i++)
2394     {
2395       open_listen_socket (config_listen_address_list[i]);
2396       free_listen_socket (config_listen_address_list[i]);
2397     }
2398
2399     free(config_listen_address_list);
2400   }
2401   else
2402   {
2403     listen_socket_t sock;
2404     memset(&sock, 0, sizeof(sock));
2405     strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr)-1);
2406     open_listen_socket (&sock);
2407   }
2408
2409   if (listen_fds_num < 1)
2410   {
2411     fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
2412     goto error;
2413   }
2414
2415   if (!stay_foreground)
2416   {
2417     pid_t child;
2418
2419     child = fork ();
2420     if (child < 0)
2421     {
2422       fprintf (stderr, "daemonize: fork(2) failed.\n");
2423       goto error;
2424     }
2425     else if (child > 0)
2426       exit(0);
2427
2428     /* Become session leader */
2429     setsid ();
2430
2431     /* Open the first three file descriptors to /dev/null */
2432     close (2);
2433     close (1);
2434     close (0);
2435
2436     open ("/dev/null", O_RDWR);
2437     dup (0);
2438     dup (0);
2439   } /* if (!stay_foreground) */
2440
2441   /* Change into the /tmp directory. */
2442   base_dir = (config_base_dir != NULL)
2443     ? config_base_dir
2444     : "/tmp";
2445
2446   if (chdir (base_dir) != 0)
2447   {
2448     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2449     goto error;
2450   }
2451
2452   install_signal_handlers();
2453
2454   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2455   RRDD_LOG(LOG_INFO, "starting up");
2456
2457   cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
2458                                 (GDestroyNotify) free_cache_item);
2459   if (cache_tree == NULL)
2460   {
2461     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2462     goto error;
2463   }
2464
2465   return write_pidfile (pid_fd);
2466
2467 error:
2468   remove_pidfile();
2469   return -1;
2470 } /* }}} int daemonize */
2471
2472 static int cleanup (void) /* {{{ */
2473 {
2474   do_shutdown++;
2475
2476   pthread_cond_broadcast (&flush_cond);
2477   pthread_join (flush_thread, NULL);
2478
2479   pthread_cond_broadcast (&queue_cond);
2480   for (int i = 0; i < config_queue_threads; i++)
2481     pthread_join (queue_threads[i], NULL);
2482
2483   if (config_flush_at_shutdown)
2484   {
2485     assert(cache_queue_head == NULL);
2486     RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
2487   }
2488
2489   journal_done();
2490   remove_pidfile ();
2491
2492   free(queue_threads);
2493   free(config_base_dir);
2494   free(config_pid_file);
2495   free(journal_cur);
2496   free(journal_old);
2497
2498   pthread_mutex_lock(&cache_lock);
2499   g_tree_destroy(cache_tree);
2500
2501   RRDD_LOG(LOG_INFO, "goodbye");
2502   closelog ();
2503
2504   return (0);
2505 } /* }}} int cleanup */
2506
2507 static int read_options (int argc, char **argv) /* {{{ */
2508 {
2509   int option;
2510   int status = 0;
2511
2512   while ((option = getopt(argc, argv, "gl:L:f:w:z:t:Bb:p:Fj:h?")) != -1)
2513   {
2514     switch (option)
2515     {
2516       case 'g':
2517         stay_foreground=1;
2518         break;
2519
2520       case 'L':
2521       case 'l':
2522       {
2523         listen_socket_t **temp;
2524         listen_socket_t *new;
2525
2526         new = malloc(sizeof(listen_socket_t));
2527         if (new == NULL)
2528         {
2529           fprintf(stderr, "read_options: malloc failed.\n");
2530           return(2);
2531         }
2532         memset(new, 0, sizeof(listen_socket_t));
2533
2534         temp = (listen_socket_t **) rrd_realloc (config_listen_address_list,
2535             sizeof (listen_socket_t *) * (config_listen_address_list_len + 1));
2536         if (temp == NULL)
2537         {
2538           fprintf (stderr, "read_options: realloc failed.\n");
2539           return (2);
2540         }
2541         config_listen_address_list = temp;
2542
2543         strncpy(new->addr, optarg, sizeof(new->addr)-1);
2544         new->privilege = (option == 'l') ? PRIV_HIGH : PRIV_LOW;
2545
2546         temp[config_listen_address_list_len] = new;
2547         config_listen_address_list_len++;
2548       }
2549       break;
2550
2551       case 'f':
2552       {
2553         int temp;
2554
2555         temp = atoi (optarg);
2556         if (temp > 0)
2557           config_flush_interval = temp;
2558         else
2559         {
2560           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2561           status = 3;
2562         }
2563       }
2564       break;
2565
2566       case 'w':
2567       {
2568         int temp;
2569
2570         temp = atoi (optarg);
2571         if (temp > 0)
2572           config_write_interval = temp;
2573         else
2574         {
2575           fprintf (stderr, "Invalid write interval: %s\n", optarg);
2576           status = 2;
2577         }
2578       }
2579       break;
2580
2581       case 'z':
2582       {
2583         int temp;
2584
2585         temp = atoi(optarg);
2586         if (temp > 0)
2587           config_write_jitter = temp;
2588         else
2589         {
2590           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2591           status = 2;
2592         }
2593
2594         break;
2595       }
2596
2597       case 't':
2598       {
2599         int threads;
2600         threads = atoi(optarg);
2601         if (threads >= 1)
2602           config_queue_threads = threads;
2603         else
2604         {
2605           fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
2606           return 1;
2607         }
2608       }
2609       break;
2610
2611       case 'B':
2612         config_write_base_only = 1;
2613         break;
2614
2615       case 'b':
2616       {
2617         size_t len;
2618         char base_realpath[PATH_MAX];
2619
2620         if (config_base_dir != NULL)
2621           free (config_base_dir);
2622         config_base_dir = strdup (optarg);
2623         if (config_base_dir == NULL)
2624         {
2625           fprintf (stderr, "read_options: strdup failed.\n");
2626           return (3);
2627         }
2628
2629         /* make sure that the base directory is not resolved via
2630          * symbolic links.  this makes some performance-enhancing
2631          * assumptions possible (we don't have to resolve paths
2632          * that start with a "/")
2633          */
2634         if (realpath(config_base_dir, base_realpath) == NULL)
2635         {
2636           fprintf (stderr, "Invalid base directory '%s'.\n", config_base_dir);
2637           return 5;
2638         }
2639         else if (strncmp(config_base_dir,
2640                          base_realpath, sizeof(base_realpath)) != 0)
2641         {
2642           fprintf(stderr,
2643                   "Base directory (-b) resolved via file system links!\n"
2644                   "Please consult rrdcached '-b' documentation!\n"
2645                   "Consider specifying the real directory (%s)\n",
2646                   base_realpath);
2647           return 5;
2648         }
2649
2650         len = strlen (config_base_dir);
2651         while ((len > 0) && (config_base_dir[len - 1] == '/'))
2652         {
2653           config_base_dir[len - 1] = 0;
2654           len--;
2655         }
2656
2657         if (len < 1)
2658         {
2659           fprintf (stderr, "Invalid base directory: %s\n", optarg);
2660           return (4);
2661         }
2662
2663         _config_base_dir_len = len;
2664       }
2665       break;
2666
2667       case 'p':
2668       {
2669         if (config_pid_file != NULL)
2670           free (config_pid_file);
2671         config_pid_file = strdup (optarg);
2672         if (config_pid_file == NULL)
2673         {
2674           fprintf (stderr, "read_options: strdup failed.\n");
2675           return (3);
2676         }
2677       }
2678       break;
2679
2680       case 'F':
2681         config_flush_at_shutdown = 1;
2682         break;
2683
2684       case 'j':
2685       {
2686         struct stat statbuf;
2687         const char *dir = optarg;
2688
2689         status = stat(dir, &statbuf);
2690         if (status != 0)
2691         {
2692           fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2693           return 6;
2694         }
2695
2696         if (!S_ISDIR(statbuf.st_mode)
2697             || access(dir, R_OK|W_OK|X_OK) != 0)
2698         {
2699           fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2700                   errno ? rrd_strerror(errno) : "");
2701           return 6;
2702         }
2703
2704         journal_cur = malloc(PATH_MAX + 1);
2705         journal_old = malloc(PATH_MAX + 1);
2706         if (journal_cur == NULL || journal_old == NULL)
2707         {
2708           fprintf(stderr, "malloc failure for journal files\n");
2709           return 6;
2710         }
2711         else 
2712         {
2713           snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2714           snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2715         }
2716       }
2717       break;
2718
2719       case 'h':
2720       case '?':
2721         printf ("RRDCacheD %s  Copyright (C) 2008 Florian octo Forster\n"
2722             "\n"
2723             "Usage: rrdcached [options]\n"
2724             "\n"
2725             "Valid options are:\n"
2726             "  -l <address>  Socket address to listen to.\n"
2727             "  -L <address>  Socket address to listen to ('FLUSH' only).\n"
2728             "  -w <seconds>  Interval in which to write data.\n"
2729             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
2730             "  -t <threads>  Number of write threads.\n"
2731             "  -f <seconds>  Interval in which to flush dead data.\n"
2732             "  -p <file>     Location of the PID-file.\n"
2733             "  -b <dir>      Base directory to change to.\n"
2734             "  -B            Restrict file access to paths within -b <dir>\n"
2735             "  -g            Do not fork and run in the foreground.\n"
2736             "  -j <dir>      Directory in which to create the journal files.\n"
2737             "  -F            Always flush all updates at shutdown\n"
2738             "\n"
2739             "For more information and a detailed description of all options "
2740             "please refer\n"
2741             "to the rrdcached(1) manual page.\n",
2742             VERSION);
2743         status = -1;
2744         break;
2745     } /* switch (option) */
2746   } /* while (getopt) */
2747
2748   /* advise the user when values are not sane */
2749   if (config_flush_interval < 2 * config_write_interval)
2750     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2751             " 2x write interval (-w) !\n");
2752   if (config_write_jitter > config_write_interval)
2753     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2754             " write interval (-w) !\n");
2755
2756   if (config_write_base_only && config_base_dir == NULL)
2757     fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
2758             "  Consult the rrdcached documentation\n");
2759
2760   if (journal_cur == NULL)
2761     config_flush_at_shutdown = 1;
2762
2763   return (status);
2764 } /* }}} int read_options */
2765
2766 int main (int argc, char **argv)
2767 {
2768   int status;
2769
2770   status = read_options (argc, argv);
2771   if (status != 0)
2772   {
2773     if (status < 0)
2774       status = 0;
2775     return (status);
2776   }
2777
2778   status = daemonize ();
2779   if (status != 0)
2780   {
2781     fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
2782     return (1);
2783   }
2784
2785   journal_init();
2786
2787   /* start the queue threads */
2788   queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
2789   if (queue_threads == NULL)
2790   {
2791     RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
2792     cleanup();
2793     return (1);
2794   }
2795   for (int i = 0; i < config_queue_threads; i++)
2796   {
2797     memset (&queue_threads[i], 0, sizeof (*queue_threads));
2798     status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
2799     if (status != 0)
2800     {
2801       RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2802       cleanup();
2803       return (1);
2804     }
2805   }
2806
2807   /* start the flush thread */
2808   memset(&flush_thread, 0, sizeof(flush_thread));
2809   status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
2810   if (status != 0)
2811   {
2812     RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
2813     cleanup();
2814     return (1);
2815   }
2816
2817   listen_thread_main (NULL);
2818   cleanup ();
2819
2820   return (0);
2821 } /* int main */
2822
2823 /*
2824  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2825  */