ef6d83cb31fc86d7dcc28cad062d4cdba6249981
[rrdtool.git] / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008,2009 Florian octo Forster
4  * Copyright (C) 2008,2009 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
23
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
32
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
36
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
41
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
47
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
51
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
55
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
61
62 /*
63  * Now for some includes..
64  */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
73
74 #include "rrd.h"
75 #include "rrd_client.h"
76
77 #include <stdlib.h>
78
79 #ifndef WIN32
80 #ifdef HAVE_STDINT_H
81 #  include <stdint.h>
82 #endif
83 #include <unistd.h>
84 #include <strings.h>
85 #include <inttypes.h>
86 #include <sys/socket.h>
87
88 #else
89
90 #endif
91 #include <stdio.h>
92 #include <string.h>
93
94 #include <sys/types.h>
95 #include <sys/stat.h>
96 #include <dirent.h>
97 #include <fcntl.h>
98 #include <signal.h>
99 #include <sys/un.h>
100 #include <netdb.h>
101 #include <poll.h>
102 #include <syslog.h>
103 #include <pthread.h>
104 #include <errno.h>
105 #include <assert.h>
106 #include <sys/time.h>
107 #include <time.h>
108 #include <libgen.h>
109 #include <grp.h>
110
111 #include <glib-2.0/glib.h>
112 /* }}} */
113
114 #define RRDD_LOG(severity, ...) \
115   do { \
116     if (stay_foreground) \
117       fprintf(stderr, __VA_ARGS__); \
118     syslog ((severity), __VA_ARGS__); \
119   } while (0)
120
121 #ifndef __GNUC__
122 # define __attribute__(x) /**/
123 #endif
124
125 /*
126  * Types
127  */
128 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
129
130 struct listen_socket_s
131 {
132   int fd;
133   char addr[PATH_MAX + 1];
134   int family;
135
136   /* state for BATCH processing */
137   time_t batch_start;
138   int batch_cmd;
139
140   /* buffered IO */
141   char *rbuf;
142   off_t next_cmd;
143   off_t next_read;
144
145   char *wbuf;
146   ssize_t wbuf_len;
147
148   uint32_t permissions;
149
150   gid_t  socket_group;
151   mode_t socket_permissions;
152 };
153 typedef struct listen_socket_s listen_socket_t;
154
155 struct command_s;
156 typedef struct command_s command_t;
157 /* note: guard against "unused" warnings in the handlers */
158 #define DISPATCH_PROTO  listen_socket_t *sock   __attribute__((unused)),\
159                         time_t now              __attribute__((unused)),\
160                         char  *buffer           __attribute__((unused)),\
161                         size_t buffer_size      __attribute__((unused))
162
163 #define HANDLER_PROTO   command_t *cmd          __attribute__((unused)),\
164                         DISPATCH_PROTO
165
166 struct command_s {
167   char   *cmd;
168   int (*handler)(HANDLER_PROTO);
169
170   char  context;                /* where we expect to see it */
171 #define CMD_CONTEXT_CLIENT      (1<<0)
172 #define CMD_CONTEXT_BATCH       (1<<1)
173 #define CMD_CONTEXT_JOURNAL     (1<<2)
174 #define CMD_CONTEXT_ANY         (0x7f)
175
176   char *syntax;
177   char *help;
178 };
179
180 struct cache_item_s;
181 typedef struct cache_item_s cache_item_t;
182 struct cache_item_s
183 {
184   char *file;
185   char **values;
186   size_t values_num;            /* number of valid pointers */
187   size_t values_alloc;          /* number of allocated pointers */
188   time_t last_flush_time;
189   time_t last_update_stamp;
190 #define CI_FLAGS_IN_TREE  (1<<0)
191 #define CI_FLAGS_IN_QUEUE (1<<1)
192   int flags;
193   pthread_cond_t  flushed;
194   cache_item_t *prev;
195   cache_item_t *next;
196 };
197
198 struct callback_flush_data_s
199 {
200   time_t now;
201   time_t abs_timeout;
202   char **keys;
203   size_t keys_num;
204 };
205 typedef struct callback_flush_data_s callback_flush_data_t;
206
207 enum queue_side_e
208 {
209   HEAD,
210   TAIL
211 };
212 typedef enum queue_side_e queue_side_t;
213
214 /* describe a set of journal files */
215 typedef struct {
216   char **files;
217   size_t files_num;
218 } journal_set;
219
220 /* max length of socket command or response */
221 #define CMD_MAX 4096
222 #define RBUF_SIZE (CMD_MAX*2)
223
224 /*
225  * Variables
226  */
227 static int stay_foreground = 0;
228 static uid_t daemon_uid;
229
230 static listen_socket_t *listen_fds = NULL;
231 static size_t listen_fds_num = 0;
232
233 enum {
234   RUNNING,              /* normal operation */
235   FLUSHING,             /* flushing remaining values */
236   SHUTDOWN              /* shutting down */
237 } state = RUNNING;
238
239 static pthread_t *queue_threads;
240 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
241 static int config_queue_threads = 4;
242
243 static pthread_t flush_thread;
244 static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;
245
246 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
247 static pthread_cond_t  connection_threads_done = PTHREAD_COND_INITIALIZER;
248 static int connection_threads_num = 0;
249
250 /* Cache stuff */
251 static GTree          *cache_tree = NULL;
252 static cache_item_t   *cache_queue_head = NULL;
253 static cache_item_t   *cache_queue_tail = NULL;
254 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
255
256 static int config_write_interval = 300;
257 static int config_write_jitter   = 0;
258 static int config_flush_interval = 3600;
259 static int config_flush_at_shutdown = 0;
260 static char *config_pid_file = NULL;
261 static char *config_base_dir = NULL;
262 static size_t _config_base_dir_len = 0;
263 static int config_write_base_only = 0;
264 static size_t config_alloc_chunk = 1;
265
266 static listen_socket_t **config_listen_address_list = NULL;
267 static size_t config_listen_address_list_len = 0;
268
269 static uint64_t stats_queue_length = 0;
270 static uint64_t stats_updates_received = 0;
271 static uint64_t stats_flush_received = 0;
272 static uint64_t stats_updates_written = 0;
273 static uint64_t stats_data_sets_written = 0;
274 static uint64_t stats_journal_bytes = 0;
275 static uint64_t stats_journal_rotate = 0;
276 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
277
278 /* Journaled updates */
279 #define JOURNAL_REPLAY(s) ((s) == NULL)
280 #define JOURNAL_BASE "rrd.journal"
281 static journal_set *journal_cur = NULL;
282 static journal_set *journal_old = NULL;
283 static char *journal_dir = NULL;
284 static FILE *journal_fh = NULL;         /* current journal file handle */
285 static long  journal_size = 0;          /* current journal size */
286 #define JOURNAL_MAX (1 * 1024 * 1024 * 1024)
287 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
288 static int journal_write(char *cmd, char *args);
289 static void journal_done(void);
290 static void journal_rotate(void);
291
292 /* prototypes for forward refernces */
293 static int handle_request_help (HANDLER_PROTO);
294
295 /* 
296  * Functions
297  */
298 static void sig_common (const char *sig) /* {{{ */
299 {
300   RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
301   state = FLUSHING;
302   pthread_cond_broadcast(&flush_cond);
303   pthread_cond_broadcast(&queue_cond);
304 } /* }}} void sig_common */
305
306 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
307 {
308   sig_common("INT");
309 } /* }}} void sig_int_handler */
310
311 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
312 {
313   sig_common("TERM");
314 } /* }}} void sig_term_handler */
315
316 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
317 {
318   config_flush_at_shutdown = 1;
319   sig_common("USR1");
320 } /* }}} void sig_usr1_handler */
321
322 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
323 {
324   config_flush_at_shutdown = 0;
325   sig_common("USR2");
326 } /* }}} void sig_usr2_handler */
327
328 static void install_signal_handlers(void) /* {{{ */
329 {
330   /* These structures are static, because `sigaction' behaves weird if the are
331    * overwritten.. */
332   static struct sigaction sa_int;
333   static struct sigaction sa_term;
334   static struct sigaction sa_pipe;
335   static struct sigaction sa_usr1;
336   static struct sigaction sa_usr2;
337
338   /* Install signal handlers */
339   memset (&sa_int, 0, sizeof (sa_int));
340   sa_int.sa_handler = sig_int_handler;
341   sigaction (SIGINT, &sa_int, NULL);
342
343   memset (&sa_term, 0, sizeof (sa_term));
344   sa_term.sa_handler = sig_term_handler;
345   sigaction (SIGTERM, &sa_term, NULL);
346
347   memset (&sa_pipe, 0, sizeof (sa_pipe));
348   sa_pipe.sa_handler = SIG_IGN;
349   sigaction (SIGPIPE, &sa_pipe, NULL);
350
351   memset (&sa_pipe, 0, sizeof (sa_usr1));
352   sa_usr1.sa_handler = sig_usr1_handler;
353   sigaction (SIGUSR1, &sa_usr1, NULL);
354
355   memset (&sa_usr2, 0, sizeof (sa_usr2));
356   sa_usr2.sa_handler = sig_usr2_handler;
357   sigaction (SIGUSR2, &sa_usr2, NULL);
358
359 } /* }}} void install_signal_handlers */
360
361 static int open_pidfile(char *action, int oflag) /* {{{ */
362 {
363   int fd;
364   const char *file;
365   char *file_copy, *dir;
366
367   file = (config_pid_file != NULL)
368     ? config_pid_file
369     : LOCALSTATEDIR "/run/rrdcached.pid";
370
371   /* dirname may modify its argument */
372   file_copy = strdup(file);
373   if (file_copy == NULL)
374   {
375     fprintf(stderr, "rrdcached: strdup(): %s\n",
376         rrd_strerror(errno));
377     return -1;
378   }
379
380   dir = dirname(file_copy);
381   if (rrd_mkdir_p(dir, 0777) != 0)
382   {
383     fprintf(stderr, "Failed to create pidfile directory '%s': %s\n",
384         dir, rrd_strerror(errno));
385     return -1;
386   }
387
388   free(file_copy);
389
390   fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
391   if (fd < 0)
392     fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
393             action, file, rrd_strerror(errno));
394
395   return(fd);
396 } /* }}} static int open_pidfile */
397
398 /* check existing pid file to see whether a daemon is running */
399 static int check_pidfile(void)
400 {
401   int pid_fd;
402   pid_t pid;
403   char pid_str[16];
404
405   pid_fd = open_pidfile("open", O_RDWR);
406   if (pid_fd < 0)
407     return pid_fd;
408
409   if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0)
410     return -1;
411
412   pid = atoi(pid_str);
413   if (pid <= 0)
414     return -1;
415
416   /* another running process that we can signal COULD be
417    * a competing rrdcached */
418   if (pid != getpid() && kill(pid, 0) == 0)
419   {
420     fprintf(stderr,
421             "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
422     close(pid_fd);
423     return -1;
424   }
425
426   lseek(pid_fd, 0, SEEK_SET);
427   if (ftruncate(pid_fd, 0) == -1)
428   {
429     fprintf(stderr,
430             "FATAL: Faild to truncate stale PID file. (pid %d)\n", pid);
431     close(pid_fd);
432     return -1;
433   }
434
435   fprintf(stderr,
436           "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
437           "rrdcached: starting normally.\n", pid);
438
439   return pid_fd;
440 } /* }}} static int check_pidfile */
441
442 static int write_pidfile (int fd) /* {{{ */
443 {
444   pid_t pid;
445   FILE *fh;
446
447   pid = getpid ();
448
449   fh = fdopen (fd, "w");
450   if (fh == NULL)
451   {
452     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
453     close(fd);
454     return (-1);
455   }
456
457   fprintf (fh, "%i\n", (int) pid);
458   fclose (fh);
459
460   return (0);
461 } /* }}} int write_pidfile */
462
463 static int remove_pidfile (void) /* {{{ */
464 {
465   char *file;
466   int status;
467
468   file = (config_pid_file != NULL)
469     ? config_pid_file
470     : LOCALSTATEDIR "/run/rrdcached.pid";
471
472   status = unlink (file);
473   if (status == 0)
474     return (0);
475   return (errno);
476 } /* }}} int remove_pidfile */
477
478 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
479 {
480   char *eol;
481
482   eol = memchr(sock->rbuf + sock->next_cmd, '\n',
483                sock->next_read - sock->next_cmd);
484
485   if (eol == NULL)
486   {
487     /* no commands left, move remainder back to front of rbuf */
488     memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
489             sock->next_read - sock->next_cmd);
490     sock->next_read -= sock->next_cmd;
491     sock->next_cmd = 0;
492     *len = 0;
493     return NULL;
494   }
495   else
496   {
497     char *cmd = sock->rbuf + sock->next_cmd;
498     *eol = '\0';
499
500     sock->next_cmd = eol - sock->rbuf + 1;
501
502     if (eol > sock->rbuf && *(eol-1) == '\r')
503       *(--eol) = '\0'; /* handle "\r\n" EOL */
504
505     *len = eol - cmd;
506
507     return cmd;
508   }
509
510   /* NOTREACHED */
511   assert(1==0);
512 } /* }}} char *next_cmd */
513
514 /* add the characters directly to the write buffer */
515 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
516 {
517   char *new_buf;
518
519   assert(sock != NULL);
520
521   new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
522   if (new_buf == NULL)
523   {
524     RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
525     return -1;
526   }
527
528   strncpy(new_buf + sock->wbuf_len, str, len + 1);
529
530   sock->wbuf = new_buf;
531   sock->wbuf_len += len;
532
533   return 0;
534 } /* }}} static int add_to_wbuf */
535
536 /* add the text to the "extra" info that's sent after the status line */
537 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
538 {
539   va_list argp;
540   char buffer[CMD_MAX];
541   int len;
542
543   if (JOURNAL_REPLAY(sock)) return 0;
544   if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
545
546   va_start(argp, fmt);
547 #ifdef HAVE_VSNPRINTF
548   len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
549 #else
550   len = vsprintf(buffer, fmt, argp);
551 #endif
552   va_end(argp);
553   if (len < 0)
554   {
555     RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
556     return -1;
557   }
558
559   return add_to_wbuf(sock, buffer, len);
560 } /* }}} static int add_response_info */
561
562 static int count_lines(char *str) /* {{{ */
563 {
564   int lines = 0;
565
566   if (str != NULL)
567   {
568     while ((str = strchr(str, '\n')) != NULL)
569     {
570       ++lines;
571       ++str;
572     }
573   }
574
575   return lines;
576 } /* }}} static int count_lines */
577
578 /* send the response back to the user.
579  * returns 0 on success, -1 on error
580  * write buffer is always zeroed after this call */
581 static int send_response (listen_socket_t *sock, response_code rc,
582                           char *fmt, ...) /* {{{ */
583 {
584   va_list argp;
585   char buffer[CMD_MAX];
586   int lines;
587   ssize_t wrote;
588   int rclen, len;
589
590   if (JOURNAL_REPLAY(sock)) return rc;
591
592   if (sock->batch_start)
593   {
594     if (rc == RESP_OK)
595       return rc; /* no response on success during BATCH */
596     lines = sock->batch_cmd;
597   }
598   else if (rc == RESP_OK)
599     lines = count_lines(sock->wbuf);
600   else
601     lines = -1;
602
603   rclen = sprintf(buffer, "%d ", lines);
604   va_start(argp, fmt);
605 #ifdef HAVE_VSNPRINTF
606   len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
607 #else
608   len = vsprintf(buffer+rclen, fmt, argp);
609 #endif
610   va_end(argp);
611   if (len < 0)
612     return -1;
613
614   len += rclen;
615
616   /* append the result to the wbuf, don't write to the user */
617   if (sock->batch_start)
618     return add_to_wbuf(sock, buffer, len);
619
620   /* first write must be complete */
621   if (len != write(sock->fd, buffer, len))
622   {
623     RRDD_LOG(LOG_INFO, "send_response: could not write status message");
624     return -1;
625   }
626
627   if (sock->wbuf != NULL && rc == RESP_OK)
628   {
629     wrote = 0;
630     while (wrote < sock->wbuf_len)
631     {
632       ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
633       if (wb <= 0)
634       {
635         RRDD_LOG(LOG_INFO, "send_response: could not write results");
636         return -1;
637       }
638       wrote += wb;
639     }
640   }
641
642   free(sock->wbuf); sock->wbuf = NULL;
643   sock->wbuf_len = 0;
644
645   return 0;
646 } /* }}} */
647
648 static void wipe_ci_values(cache_item_t *ci, time_t when)
649 {
650   ci->values = NULL;
651   ci->values_num = 0;
652   ci->values_alloc = 0;
653
654   ci->last_flush_time = when;
655   if (config_write_jitter > 0)
656     ci->last_flush_time += (rrd_random() % config_write_jitter);
657 }
658
659 /* remove_from_queue
660  * remove a "cache_item_t" item from the queue.
661  * must hold 'cache_lock' when calling this
662  */
663 static void remove_from_queue(cache_item_t *ci) /* {{{ */
664 {
665   if (ci == NULL) return;
666   if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
667
668   if (ci->prev == NULL)
669     cache_queue_head = ci->next; /* reset head */
670   else
671     ci->prev->next = ci->next;
672
673   if (ci->next == NULL)
674     cache_queue_tail = ci->prev; /* reset the tail */
675   else
676     ci->next->prev = ci->prev;
677
678   ci->next = ci->prev = NULL;
679   ci->flags &= ~CI_FLAGS_IN_QUEUE;
680
681   pthread_mutex_lock (&stats_lock);
682   assert (stats_queue_length > 0);
683   stats_queue_length--;
684   pthread_mutex_unlock (&stats_lock);
685
686 } /* }}} static void remove_from_queue */
687
688 /* free the resources associated with the cache_item_t
689  * must hold cache_lock when calling this function
690  */
691 static void *free_cache_item(cache_item_t *ci) /* {{{ */
692 {
693   if (ci == NULL) return NULL;
694
695   remove_from_queue(ci);
696
697   for (size_t i=0; i < ci->values_num; i++)
698     free(ci->values[i]);
699
700   free (ci->values);
701   free (ci->file);
702
703   /* in case anyone is waiting */
704   pthread_cond_broadcast(&ci->flushed);
705   pthread_cond_destroy(&ci->flushed);
706
707   free (ci);
708
709   return NULL;
710 } /* }}} static void *free_cache_item */
711
712 /*
713  * enqueue_cache_item:
714  * `cache_lock' must be acquired before calling this function!
715  */
716 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
717     queue_side_t side)
718 {
719   if (ci == NULL)
720     return (-1);
721
722   if (ci->values_num == 0)
723     return (0);
724
725   if (side == HEAD)
726   {
727     if (cache_queue_head == ci)
728       return 0;
729
730     /* remove if further down in queue */
731     remove_from_queue(ci);
732
733     ci->prev = NULL;
734     ci->next = cache_queue_head;
735     if (ci->next != NULL)
736       ci->next->prev = ci;
737     cache_queue_head = ci;
738
739     if (cache_queue_tail == NULL)
740       cache_queue_tail = cache_queue_head;
741   }
742   else /* (side == TAIL) */
743   {
744     /* We don't move values back in the list.. */
745     if (ci->flags & CI_FLAGS_IN_QUEUE)
746       return (0);
747
748     assert (ci->next == NULL);
749     assert (ci->prev == NULL);
750
751     ci->prev = cache_queue_tail;
752
753     if (cache_queue_tail == NULL)
754       cache_queue_head = ci;
755     else
756       cache_queue_tail->next = ci;
757
758     cache_queue_tail = ci;
759   }
760
761   ci->flags |= CI_FLAGS_IN_QUEUE;
762
763   pthread_cond_signal(&queue_cond);
764   pthread_mutex_lock (&stats_lock);
765   stats_queue_length++;
766   pthread_mutex_unlock (&stats_lock);
767
768   return (0);
769 } /* }}} int enqueue_cache_item */
770
771 /*
772  * tree_callback_flush:
773  * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
774  * while this is in progress.
775  */
776 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
777     gpointer data)
778 {
779   cache_item_t *ci;
780   callback_flush_data_t *cfd;
781
782   ci = (cache_item_t *) value;
783   cfd = (callback_flush_data_t *) data;
784
785   if (ci->flags & CI_FLAGS_IN_QUEUE)
786     return FALSE;
787
788   if (ci->values_num > 0
789       && (ci->last_flush_time <= cfd->abs_timeout || state != RUNNING))
790   {
791     enqueue_cache_item (ci, TAIL);
792   }
793   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
794       && (ci->values_num <= 0))
795   {
796     assert ((char *) key == ci->file);
797     if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key))
798     {
799       RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed.");
800       return (FALSE);
801     }
802   }
803
804   return (FALSE);
805 } /* }}} gboolean tree_callback_flush */
806
807 static int flush_old_values (int max_age)
808 {
809   callback_flush_data_t cfd;
810   size_t k;
811
812   memset (&cfd, 0, sizeof (cfd));
813   /* Pass the current time as user data so that we don't need to call
814    * `time' for each node. */
815   cfd.now = time (NULL);
816   cfd.keys = NULL;
817   cfd.keys_num = 0;
818
819   if (max_age > 0)
820     cfd.abs_timeout = cfd.now - max_age;
821   else
822     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
823
824   /* `tree_callback_flush' will return the keys of all values that haven't
825    * been touched in the last `config_flush_interval' seconds in `cfd'.
826    * The char*'s in this array point to the same memory as ci->file, so we
827    * don't need to free them separately. */
828   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
829
830   for (k = 0; k < cfd.keys_num; k++)
831   {
832     gboolean status = g_tree_remove(cache_tree, cfd.keys[k]);
833     /* should never fail, since we have held the cache_lock
834      * the entire time */
835     assert(status == TRUE);
836   }
837
838   if (cfd.keys != NULL)
839   {
840     free (cfd.keys);
841     cfd.keys = NULL;
842   }
843
844   return (0);
845 } /* int flush_old_values */
846
847 static void *flush_thread_main (void *args __attribute__((unused))) /* {{{ */
848 {
849   struct timeval now;
850   struct timespec next_flush;
851   int status;
852
853   gettimeofday (&now, NULL);
854   next_flush.tv_sec = now.tv_sec + config_flush_interval;
855   next_flush.tv_nsec = 1000 * now.tv_usec;
856
857   pthread_mutex_lock(&cache_lock);
858
859   while (state == RUNNING)
860   {
861     gettimeofday (&now, NULL);
862     if ((now.tv_sec > next_flush.tv_sec)
863         || ((now.tv_sec == next_flush.tv_sec)
864           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
865     {
866       RRDD_LOG(LOG_DEBUG, "flushing old values");
867
868       /* Determine the time of the next cache flush. */
869       next_flush.tv_sec = now.tv_sec + config_flush_interval;
870
871       /* Flush all values that haven't been written in the last
872        * `config_write_interval' seconds. */
873       flush_old_values (config_write_interval);
874
875       /* unlock the cache while we rotate so we don't block incoming
876        * updates if the fsync() blocks on disk I/O */
877       pthread_mutex_unlock(&cache_lock);
878       journal_rotate();
879       pthread_mutex_lock(&cache_lock);
880     }
881
882     status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
883     if (status != 0 && status != ETIMEDOUT)
884     {
885       RRDD_LOG (LOG_ERR, "flush_thread_main: "
886                 "pthread_cond_timedwait returned %i.", status);
887     }
888   }
889
890   if (config_flush_at_shutdown)
891     flush_old_values (-1); /* flush everything */
892
893   state = SHUTDOWN;
894
895   pthread_mutex_unlock(&cache_lock);
896
897   return NULL;
898 } /* void *flush_thread_main */
899
900 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
901 {
902   pthread_mutex_lock (&cache_lock);
903
904   while (state != SHUTDOWN
905          || (cache_queue_head != NULL && config_flush_at_shutdown))
906   {
907     cache_item_t *ci;
908     char *file;
909     char **values;
910     size_t values_num;
911     int status;
912
913     /* Now, check if there's something to store away. If not, wait until
914      * something comes in. */
915     if (cache_queue_head == NULL)
916     {
917       status = pthread_cond_wait (&queue_cond, &cache_lock);
918       if ((status != 0) && (status != ETIMEDOUT))
919       {
920         RRDD_LOG (LOG_ERR, "queue_thread_main: "
921             "pthread_cond_wait returned %i.", status);
922       }
923     }
924
925     /* Check if a value has arrived. This may be NULL if we timed out or there
926      * was an interrupt such as a signal. */
927     if (cache_queue_head == NULL)
928       continue;
929
930     ci = cache_queue_head;
931
932     /* copy the relevant parts */
933     file = strdup (ci->file);
934     if (file == NULL)
935     {
936       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
937       continue;
938     }
939
940     assert(ci->values != NULL);
941     assert(ci->values_num > 0);
942
943     values = ci->values;
944     values_num = ci->values_num;
945
946     wipe_ci_values(ci, time(NULL));
947     remove_from_queue(ci);
948
949     pthread_mutex_unlock (&cache_lock);
950
951     rrd_clear_error ();
952     status = rrd_update_r (file, NULL, (int) values_num, (void *) values);
953     if (status != 0)
954     {
955       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
956           "rrd_update_r (%s) failed with status %i. (%s)",
957           file, status, rrd_get_error());
958     }
959
960     journal_write("wrote", file);
961
962     /* Search again in the tree.  It's possible someone issued a "FORGET"
963      * while we were writing the update values. */
964     pthread_mutex_lock(&cache_lock);
965     ci = (cache_item_t *) g_tree_lookup(cache_tree, file);
966     if (ci)
967       pthread_cond_broadcast(&ci->flushed);
968     pthread_mutex_unlock(&cache_lock);
969
970     if (status == 0)
971     {
972       pthread_mutex_lock (&stats_lock);
973       stats_updates_written++;
974       stats_data_sets_written += values_num;
975       pthread_mutex_unlock (&stats_lock);
976     }
977
978     rrd_free_ptrs((void ***) &values, &values_num);
979     free(file);
980
981     pthread_mutex_lock (&cache_lock);
982   }
983   pthread_mutex_unlock (&cache_lock);
984
985   return (NULL);
986 } /* }}} void *queue_thread_main */
987
988 static int buffer_get_field (char **buffer_ret, /* {{{ */
989     size_t *buffer_size_ret, char **field_ret)
990 {
991   char *buffer;
992   size_t buffer_pos;
993   size_t buffer_size;
994   char *field;
995   size_t field_size;
996   int status;
997
998   buffer = *buffer_ret;
999   buffer_pos = 0;
1000   buffer_size = *buffer_size_ret;
1001   field = *buffer_ret;
1002   field_size = 0;
1003
1004   if (buffer_size <= 0)
1005     return (-1);
1006
1007   /* This is ensured by `handle_request'. */
1008   assert (buffer[buffer_size - 1] == '\0');
1009
1010   status = -1;
1011   while (buffer_pos < buffer_size)
1012   {
1013     /* Check for end-of-field or end-of-buffer */
1014     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
1015     {
1016       field[field_size] = 0;
1017       field_size++;
1018       buffer_pos++;
1019       status = 0;
1020       break;
1021     }
1022     /* Handle escaped characters. */
1023     else if (buffer[buffer_pos] == '\\')
1024     {
1025       if (buffer_pos >= (buffer_size - 1))
1026         break;
1027       buffer_pos++;
1028       field[field_size] = buffer[buffer_pos];
1029       field_size++;
1030       buffer_pos++;
1031     }
1032     /* Normal operation */ 
1033     else
1034     {
1035       field[field_size] = buffer[buffer_pos];
1036       field_size++;
1037       buffer_pos++;
1038     }
1039   } /* while (buffer_pos < buffer_size) */
1040
1041   if (status != 0)
1042     return (status);
1043
1044   *buffer_ret = buffer + buffer_pos;
1045   *buffer_size_ret = buffer_size - buffer_pos;
1046   *field_ret = field;
1047
1048   return (0);
1049 } /* }}} int buffer_get_field */
1050
1051 /* if we're restricting writes to the base directory,
1052  * check whether the file falls within the dir
1053  * returns 1 if OK, otherwise 0
1054  */
1055 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
1056 {
1057   assert(file != NULL);
1058
1059   if (!config_write_base_only
1060       || JOURNAL_REPLAY(sock)
1061       || config_base_dir == NULL)
1062     return 1;
1063
1064   if (strstr(file, "../") != NULL) goto err;
1065
1066   /* relative paths without "../" are ok */
1067   if (*file != '/') return 1;
1068
1069   /* file must be of the format base + "/" + <1+ char filename> */
1070   if (strlen(file) < _config_base_dir_len + 2) goto err;
1071   if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
1072   if (*(file + _config_base_dir_len) != '/') goto err;
1073
1074   return 1;
1075
1076 err:
1077   if (sock != NULL && sock->fd >= 0)
1078     send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1079
1080   return 0;
1081 } /* }}} static int check_file_access */
1082
1083 /* when using a base dir, convert relative paths to absolute paths.
1084  * if necessary, modifies the "filename" pointer to point
1085  * to the new path created in "tmp".  "tmp" is provided
1086  * by the caller and sizeof(tmp) must be >= PATH_MAX.
1087  *
1088  * this allows us to optimize for the expected case (absolute path)
1089  * with a no-op.
1090  */
1091 static void get_abs_path(char **filename, char *tmp)
1092 {
1093   assert(tmp != NULL);
1094   assert(filename != NULL && *filename != NULL);
1095
1096   if (config_base_dir == NULL || **filename == '/')
1097     return;
1098
1099   snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1100   *filename = tmp;
1101 } /* }}} static int get_abs_path */
1102
1103 static int flush_file (const char *filename) /* {{{ */
1104 {
1105   cache_item_t *ci;
1106
1107   pthread_mutex_lock (&cache_lock);
1108
1109   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1110   if (ci == NULL)
1111   {
1112     pthread_mutex_unlock (&cache_lock);
1113     return (ENOENT);
1114   }
1115
1116   if (ci->values_num > 0)
1117   {
1118     /* Enqueue at head */
1119     enqueue_cache_item (ci, HEAD);
1120     pthread_cond_wait(&ci->flushed, &cache_lock);
1121   }
1122
1123   /* DO NOT DO ANYTHING WITH ci HERE!!  The entry
1124    * may have been purged during our cond_wait() */
1125
1126   pthread_mutex_unlock(&cache_lock);
1127
1128   return (0);
1129 } /* }}} int flush_file */
1130
1131 static int syntax_error(listen_socket_t *sock, command_t *cmd) /* {{{ */
1132 {
1133   char *err = "Syntax error.\n";
1134
1135   if (cmd && cmd->syntax)
1136     err = cmd->syntax;
1137
1138   return send_response(sock, RESP_ERR, "Usage: %s", err);
1139 } /* }}} static int syntax_error() */
1140
1141 static int handle_request_stats (HANDLER_PROTO) /* {{{ */
1142 {
1143   uint64_t copy_queue_length;
1144   uint64_t copy_updates_received;
1145   uint64_t copy_flush_received;
1146   uint64_t copy_updates_written;
1147   uint64_t copy_data_sets_written;
1148   uint64_t copy_journal_bytes;
1149   uint64_t copy_journal_rotate;
1150
1151   uint64_t tree_nodes_number;
1152   uint64_t tree_depth;
1153
1154   pthread_mutex_lock (&stats_lock);
1155   copy_queue_length       = stats_queue_length;
1156   copy_updates_received   = stats_updates_received;
1157   copy_flush_received     = stats_flush_received;
1158   copy_updates_written    = stats_updates_written;
1159   copy_data_sets_written  = stats_data_sets_written;
1160   copy_journal_bytes      = stats_journal_bytes;
1161   copy_journal_rotate     = stats_journal_rotate;
1162   pthread_mutex_unlock (&stats_lock);
1163
1164   pthread_mutex_lock (&cache_lock);
1165   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1166   tree_depth        = (uint64_t) g_tree_height (cache_tree);
1167   pthread_mutex_unlock (&cache_lock);
1168
1169   add_response_info(sock,
1170                     "QueueLength: %"PRIu64"\n", copy_queue_length);
1171   add_response_info(sock,
1172                     "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1173   add_response_info(sock,
1174                     "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1175   add_response_info(sock,
1176                     "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1177   add_response_info(sock,
1178                     "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1179   add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1180   add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1181   add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1182   add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1183
1184   send_response(sock, RESP_OK, "Statistics follow\n");
1185
1186   return (0);
1187 } /* }}} int handle_request_stats */
1188
1189 static int handle_request_flush (HANDLER_PROTO) /* {{{ */
1190 {
1191   char *file, file_tmp[PATH_MAX];
1192   int status;
1193
1194   status = buffer_get_field (&buffer, &buffer_size, &file);
1195   if (status != 0)
1196   {
1197     return syntax_error(sock,cmd);
1198   }
1199   else
1200   {
1201     pthread_mutex_lock(&stats_lock);
1202     stats_flush_received++;
1203     pthread_mutex_unlock(&stats_lock);
1204
1205     get_abs_path(&file, file_tmp);
1206     if (!check_file_access(file, sock)) return 0;
1207
1208     status = flush_file (file);
1209     if (status == 0)
1210       return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1211     else if (status == ENOENT)
1212     {
1213       /* no file in our tree; see whether it exists at all */
1214       struct stat statbuf;
1215
1216       memset(&statbuf, 0, sizeof(statbuf));
1217       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1218         return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1219       else
1220         return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1221     }
1222     else if (status < 0)
1223       return send_response(sock, RESP_ERR, "Internal error.\n");
1224     else
1225       return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1226   }
1227
1228   /* NOTREACHED */
1229   assert(1==0);
1230 } /* }}} int handle_request_flush */
1231
1232 static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
1233 {
1234   RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1235
1236   pthread_mutex_lock(&cache_lock);
1237   flush_old_values(-1);
1238   pthread_mutex_unlock(&cache_lock);
1239
1240   return send_response(sock, RESP_OK, "Started flush.\n");
1241 } /* }}} static int handle_request_flushall */
1242
1243 static int handle_request_pending(HANDLER_PROTO) /* {{{ */
1244 {
1245   int status;
1246   char *file, file_tmp[PATH_MAX];
1247   cache_item_t *ci;
1248
1249   status = buffer_get_field(&buffer, &buffer_size, &file);
1250   if (status != 0)
1251     return syntax_error(sock,cmd);
1252
1253   get_abs_path(&file, file_tmp);
1254
1255   pthread_mutex_lock(&cache_lock);
1256   ci = g_tree_lookup(cache_tree, file);
1257   if (ci == NULL)
1258   {
1259     pthread_mutex_unlock(&cache_lock);
1260     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1261   }
1262
1263   for (size_t i=0; i < ci->values_num; i++)
1264     add_response_info(sock, "%s\n", ci->values[i]);
1265
1266   pthread_mutex_unlock(&cache_lock);
1267   return send_response(sock, RESP_OK, "updates pending\n");
1268 } /* }}} static int handle_request_pending */
1269
1270 static int handle_request_forget(HANDLER_PROTO) /* {{{ */
1271 {
1272   int status;
1273   gboolean found;
1274   char *file, file_tmp[PATH_MAX];
1275
1276   status = buffer_get_field(&buffer, &buffer_size, &file);
1277   if (status != 0)
1278     return syntax_error(sock,cmd);
1279
1280   get_abs_path(&file, file_tmp);
1281   if (!check_file_access(file, sock)) return 0;
1282
1283   pthread_mutex_lock(&cache_lock);
1284   found = g_tree_remove(cache_tree, file);
1285   pthread_mutex_unlock(&cache_lock);
1286
1287   if (found == TRUE)
1288   {
1289     if (!JOURNAL_REPLAY(sock))
1290       journal_write("forget", file);
1291
1292     return send_response(sock, RESP_OK, "Gone!\n");
1293   }
1294   else
1295     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1296
1297   /* NOTREACHED */
1298   assert(1==0);
1299 } /* }}} static int handle_request_forget */
1300
1301 static int handle_request_queue (HANDLER_PROTO) /* {{{ */
1302 {
1303   cache_item_t *ci;
1304
1305   pthread_mutex_lock(&cache_lock);
1306
1307   ci = cache_queue_head;
1308   while (ci != NULL)
1309   {
1310     add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1311     ci = ci->next;
1312   }
1313
1314   pthread_mutex_unlock(&cache_lock);
1315
1316   return send_response(sock, RESP_OK, "in queue.\n");
1317 } /* }}} int handle_request_queue */
1318
1319 static int handle_request_update (HANDLER_PROTO) /* {{{ */
1320 {
1321   char *file, file_tmp[PATH_MAX];
1322   int values_num = 0;
1323   int status;
1324   char orig_buf[CMD_MAX];
1325
1326   cache_item_t *ci;
1327
1328   /* save it for the journal later */
1329   if (!JOURNAL_REPLAY(sock))
1330     strncpy(orig_buf, buffer, buffer_size);
1331
1332   status = buffer_get_field (&buffer, &buffer_size, &file);
1333   if (status != 0)
1334     return syntax_error(sock,cmd);
1335
1336   pthread_mutex_lock(&stats_lock);
1337   stats_updates_received++;
1338   pthread_mutex_unlock(&stats_lock);
1339
1340   get_abs_path(&file, file_tmp);
1341   if (!check_file_access(file, sock)) return 0;
1342
1343   pthread_mutex_lock (&cache_lock);
1344   ci = g_tree_lookup (cache_tree, file);
1345
1346   if (ci == NULL) /* {{{ */
1347   {
1348     struct stat statbuf;
1349     cache_item_t *tmp;
1350
1351     /* don't hold the lock while we setup; stat(2) might block */
1352     pthread_mutex_unlock(&cache_lock);
1353
1354     memset (&statbuf, 0, sizeof (statbuf));
1355     status = stat (file, &statbuf);
1356     if (status != 0)
1357     {
1358       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1359
1360       status = errno;
1361       if (status == ENOENT)
1362         return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1363       else
1364         return send_response(sock, RESP_ERR,
1365                              "stat failed with error %i.\n", status);
1366     }
1367     if (!S_ISREG (statbuf.st_mode))
1368       return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1369
1370     if (access(file, R_OK|W_OK) != 0)
1371       return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1372                            file, rrd_strerror(errno));
1373
1374     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1375     if (ci == NULL)
1376     {
1377       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1378
1379       return send_response(sock, RESP_ERR, "malloc failed.\n");
1380     }
1381     memset (ci, 0, sizeof (cache_item_t));
1382
1383     ci->file = strdup (file);
1384     if (ci->file == NULL)
1385     {
1386       free (ci);
1387       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1388
1389       return send_response(sock, RESP_ERR, "strdup failed.\n");
1390     }
1391
1392     wipe_ci_values(ci, now);
1393     ci->flags = CI_FLAGS_IN_TREE;
1394     pthread_cond_init(&ci->flushed, NULL);
1395
1396     pthread_mutex_lock(&cache_lock);
1397
1398     /* another UPDATE might have added this entry in the meantime */
1399     tmp = g_tree_lookup (cache_tree, file);
1400     if (tmp == NULL)
1401       g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1402     else
1403     {
1404       free_cache_item (ci);
1405       ci = tmp;
1406     }
1407
1408     /* state may have changed while we were unlocked */
1409     if (state == SHUTDOWN)
1410       return -1;
1411   } /* }}} */
1412   assert (ci != NULL);
1413
1414   /* don't re-write updates in replay mode */
1415   if (!JOURNAL_REPLAY(sock))
1416     journal_write("update", orig_buf);
1417
1418   while (buffer_size > 0)
1419   {
1420     char *value;
1421     time_t stamp;
1422     char *eostamp;
1423
1424     status = buffer_get_field (&buffer, &buffer_size, &value);
1425     if (status != 0)
1426     {
1427       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1428       break;
1429     }
1430
1431     /* make sure update time is always moving forward */
1432     stamp = strtol(value, &eostamp, 10);
1433     if (eostamp == value || eostamp == NULL || *eostamp != ':')
1434     {
1435       pthread_mutex_unlock(&cache_lock);
1436       return send_response(sock, RESP_ERR,
1437                            "Cannot find timestamp in '%s'!\n", value);
1438     }
1439     else if (stamp <= ci->last_update_stamp)
1440     {
1441       pthread_mutex_unlock(&cache_lock);
1442       return send_response(sock, RESP_ERR,
1443                            "illegal attempt to update using time %ld when last"
1444                            " update time is %ld (minimum one second step)\n",
1445                            stamp, ci->last_update_stamp);
1446     }
1447     else
1448       ci->last_update_stamp = stamp;
1449
1450     if (!rrd_add_strdup_chunk(&ci->values, &ci->values_num, value,
1451                               &ci->values_alloc, config_alloc_chunk))
1452     {
1453       RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed.");
1454       continue;
1455     }
1456
1457     values_num++;
1458   }
1459
1460   if (((now - ci->last_flush_time) >= config_write_interval)
1461       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1462       && (ci->values_num > 0))
1463   {
1464     enqueue_cache_item (ci, TAIL);
1465   }
1466
1467   pthread_mutex_unlock (&cache_lock);
1468
1469   if (values_num < 1)
1470     return send_response(sock, RESP_ERR, "No values updated.\n");
1471   else
1472     return send_response(sock, RESP_OK,
1473                          "errors, enqueued %i value(s).\n", values_num);
1474
1475   /* NOTREACHED */
1476   assert(1==0);
1477
1478 } /* }}} int handle_request_update */
1479
1480 /* we came across a "WROTE" entry during journal replay.
1481  * throw away any values that we have accumulated for this file
1482  */
1483 static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
1484 {
1485   cache_item_t *ci;
1486   const char *file = buffer;
1487
1488   pthread_mutex_lock(&cache_lock);
1489
1490   ci = g_tree_lookup(cache_tree, file);
1491   if (ci == NULL)
1492   {
1493     pthread_mutex_unlock(&cache_lock);
1494     return (0);
1495   }
1496
1497   if (ci->values)
1498     rrd_free_ptrs((void ***) &ci->values, &ci->values_num);
1499
1500   wipe_ci_values(ci, now);
1501   remove_from_queue(ci);
1502
1503   pthread_mutex_unlock(&cache_lock);
1504   return (0);
1505 } /* }}} int handle_request_wrote */
1506
1507 /* start "BATCH" processing */
1508 static int batch_start (HANDLER_PROTO) /* {{{ */
1509 {
1510   int status;
1511   if (sock->batch_start)
1512     return send_response(sock, RESP_ERR, "Already in BATCH\n");
1513
1514   status = send_response(sock, RESP_OK,
1515                          "Go ahead.  End with dot '.' on its own line.\n");
1516   sock->batch_start = time(NULL);
1517   sock->batch_cmd = 0;
1518
1519   return status;
1520 } /* }}} static int batch_start */
1521
1522 /* finish "BATCH" processing and return results to the client */
1523 static int batch_done (HANDLER_PROTO) /* {{{ */
1524 {
1525   assert(sock->batch_start);
1526   sock->batch_start = 0;
1527   sock->batch_cmd  = 0;
1528   return send_response(sock, RESP_OK, "errors\n");
1529 } /* }}} static int batch_done */
1530
1531 static int handle_request_quit (HANDLER_PROTO) /* {{{ */
1532 {
1533   return -1;
1534 } /* }}} static int handle_request_quit */
1535
1536 static command_t list_of_commands[] = { /* {{{ */
1537   {
1538     "UPDATE",
1539     handle_request_update,
1540     CMD_CONTEXT_ANY,
1541     "UPDATE <filename> <values> [<values> ...]\n"
1542     ,
1543     "Adds the given file to the internal cache if it is not yet known and\n"
1544     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1545     "for details.\n"
1546     "\n"
1547     "Each <values> has the following form:\n"
1548     "  <values> = <time>:<value>[:<value>[...]]\n"
1549     "See the rrdupdate(1) manpage for details.\n"
1550   },
1551   {
1552     "WROTE",
1553     handle_request_wrote,
1554     CMD_CONTEXT_JOURNAL,
1555     NULL,
1556     NULL
1557   },
1558   {
1559     "FLUSH",
1560     handle_request_flush,
1561     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1562     "FLUSH <filename>\n"
1563     ,
1564     "Adds the given filename to the head of the update queue and returns\n"
1565     "after it has been dequeued.\n"
1566   },
1567   {
1568     "FLUSHALL",
1569     handle_request_flushall,
1570     CMD_CONTEXT_CLIENT,
1571     "FLUSHALL\n"
1572     ,
1573     "Triggers writing of all pending updates.  Returns immediately.\n"
1574   },
1575   {
1576     "PENDING",
1577     handle_request_pending,
1578     CMD_CONTEXT_CLIENT,
1579     "PENDING <filename>\n"
1580     ,
1581     "Shows any 'pending' updates for a file, in order.\n"
1582     "The updates shown have not yet been written to the underlying RRD file.\n"
1583   },
1584   {
1585     "FORGET",
1586     handle_request_forget,
1587     CMD_CONTEXT_ANY,
1588     "FORGET <filename>\n"
1589     ,
1590     "Removes the file completely from the cache.\n"
1591     "Any pending updates for the file will be lost.\n"
1592   },
1593   {
1594     "QUEUE",
1595     handle_request_queue,
1596     CMD_CONTEXT_CLIENT,
1597     "QUEUE\n"
1598     ,
1599         "Shows all files in the output queue.\n"
1600     "The output is zero or more lines in the following format:\n"
1601     "(where <num_vals> is the number of values to be written)\n"
1602     "\n"
1603     "<num_vals> <filename>\n"
1604   },
1605   {
1606     "STATS",
1607     handle_request_stats,
1608     CMD_CONTEXT_CLIENT,
1609     "STATS\n"
1610     ,
1611     "Returns some performance counters, see the rrdcached(1) manpage for\n"
1612     "a description of the values.\n"
1613   },
1614   {
1615     "HELP",
1616     handle_request_help,
1617     CMD_CONTEXT_CLIENT,
1618     "HELP [<command>]\n",
1619     NULL, /* special! */
1620   },
1621   {
1622     "BATCH",
1623     batch_start,
1624     CMD_CONTEXT_CLIENT,
1625     "BATCH\n"
1626     ,
1627     "The 'BATCH' command permits the client to initiate a bulk load\n"
1628     "   of commands to rrdcached.\n"
1629     "\n"
1630     "Usage:\n"
1631     "\n"
1632     "    client: BATCH\n"
1633     "    server: 0 Go ahead.  End with dot '.' on its own line.\n"
1634     "    client: command #1\n"
1635     "    client: command #2\n"
1636     "    client: ... and so on\n"
1637     "    client: .\n"
1638     "    server: 2 errors\n"
1639     "    server: 7 message for command #7\n"
1640     "    server: 9 message for command #9\n"
1641     "\n"
1642     "For more information, consult the rrdcached(1) documentation.\n"
1643   },
1644   {
1645     ".",   /* BATCH terminator */
1646     batch_done,
1647     CMD_CONTEXT_BATCH,
1648     NULL,
1649     NULL
1650   },
1651   {
1652     "QUIT",
1653     handle_request_quit,
1654     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1655     "QUIT\n"
1656     ,
1657     "Disconnect from rrdcached.\n"
1658   }
1659 }; /* }}} command_t list_of_commands[] */
1660 static size_t list_of_commands_len = sizeof (list_of_commands)
1661   / sizeof (list_of_commands[0]);
1662
1663 static command_t *find_command(char *cmd)
1664 {
1665   size_t i;
1666
1667   for (i = 0; i < list_of_commands_len; i++)
1668     if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1669       return (&list_of_commands[i]);
1670   return NULL;
1671 }
1672
1673 /* We currently use the index in the `list_of_commands' array as a bit position
1674  * in `listen_socket_t.permissions'. This member schould NEVER be accessed from
1675  * outside these functions so that switching to a more elegant storage method
1676  * is easily possible. */
1677 static ssize_t find_command_index (const char *cmd) /* {{{ */
1678 {
1679   size_t i;
1680
1681   for (i = 0; i < list_of_commands_len; i++)
1682     if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
1683       return ((ssize_t) i);
1684   return (-1);
1685 } /* }}} ssize_t find_command_index */
1686
1687 static int socket_permission_check (listen_socket_t *sock, /* {{{ */
1688     const char *cmd)
1689 {
1690   ssize_t i;
1691
1692   if (JOURNAL_REPLAY(sock))
1693     return (1);
1694
1695   if (cmd == NULL)
1696     return (-1);
1697
1698   if ((strcasecmp ("QUIT", cmd) == 0)
1699       || (strcasecmp ("HELP", cmd) == 0))
1700     return (1);
1701   else if (strcmp (".", cmd) == 0)
1702     cmd = "BATCH";
1703
1704   i = find_command_index (cmd);
1705   if (i < 0)
1706     return (-1);
1707   assert (i < 32);
1708
1709   if ((sock->permissions & (1 << i)) != 0)
1710     return (1);
1711   return (0);
1712 } /* }}} int socket_permission_check */
1713
1714 static int socket_permission_add (listen_socket_t *sock, /* {{{ */
1715     const char *cmd)
1716 {
1717   ssize_t i;
1718
1719   i = find_command_index (cmd);
1720   if (i < 0)
1721     return (-1);
1722   assert (i < 32);
1723
1724   sock->permissions |= (1 << i);
1725   return (0);
1726 } /* }}} int socket_permission_add */
1727
1728 /* check whether commands are received in the expected context */
1729 static int command_check_context(listen_socket_t *sock, command_t *cmd)
1730 {
1731   if (JOURNAL_REPLAY(sock))
1732     return (cmd->context & CMD_CONTEXT_JOURNAL);
1733   else if (sock->batch_start)
1734     return (cmd->context & CMD_CONTEXT_BATCH);
1735   else
1736     return (cmd->context & CMD_CONTEXT_CLIENT);
1737
1738   /* NOTREACHED */
1739   assert(1==0);
1740 }
1741
1742 static int handle_request_help (HANDLER_PROTO) /* {{{ */
1743 {
1744   int status;
1745   char *cmd_str;
1746   char *resp_txt;
1747   command_t *help = NULL;
1748
1749   status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
1750   if (status == 0)
1751     help = find_command(cmd_str);
1752
1753   if (help && (help->syntax || help->help))
1754   {
1755     char tmp[CMD_MAX];
1756
1757     snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
1758     resp_txt = tmp;
1759
1760     if (help->syntax)
1761       add_response_info(sock, "Usage: %s\n", help->syntax);
1762
1763     if (help->help)
1764       add_response_info(sock, "%s\n", help->help);
1765   }
1766   else
1767   {
1768     size_t i;
1769
1770     resp_txt = "Command overview\n";
1771
1772     for (i = 0; i < list_of_commands_len; i++)
1773     {
1774       if (list_of_commands[i].syntax == NULL)
1775         continue;
1776       add_response_info (sock, "%s", list_of_commands[i].syntax);
1777     }
1778   }
1779
1780   return send_response(sock, RESP_OK, resp_txt);
1781 } /* }}} int handle_request_help */
1782
1783 static int handle_request (DISPATCH_PROTO) /* {{{ */
1784 {
1785   char *buffer_ptr = buffer;
1786   char *cmd_str = NULL;
1787   command_t *cmd = NULL;
1788   int status;
1789
1790   assert (buffer[buffer_size - 1] == '\0');
1791
1792   status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
1793   if (status != 0)
1794   {
1795     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1796     return (-1);
1797   }
1798
1799   if (sock != NULL && sock->batch_start)
1800     sock->batch_cmd++;
1801
1802   cmd = find_command(cmd_str);
1803   if (!cmd)
1804     return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
1805
1806   if (!socket_permission_check (sock, cmd->cmd))
1807     return send_response(sock, RESP_ERR, "Permission denied.\n");
1808
1809   if (!command_check_context(sock, cmd))
1810     return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
1811
1812   return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
1813 } /* }}} int handle_request */
1814
1815 static void journal_set_free (journal_set *js) /* {{{ */
1816 {
1817   if (js == NULL)
1818     return;
1819
1820   rrd_free_ptrs((void ***) &js->files, &js->files_num);
1821
1822   free(js);
1823 } /* }}} journal_set_free */
1824
1825 static void journal_set_remove (journal_set *js) /* {{{ */
1826 {
1827   if (js == NULL)
1828     return;
1829
1830   for (uint i=0; i < js->files_num; i++)
1831   {
1832     RRDD_LOG(LOG_DEBUG, "removing old journal %s", js->files[i]);
1833     unlink(js->files[i]);
1834   }
1835 } /* }}} journal_set_remove */
1836
1837 /* close current journal file handle.
1838  * MUST hold journal_lock before calling */
1839 static void journal_close(void) /* {{{ */
1840 {
1841   if (journal_fh != NULL)
1842   {
1843     if (fclose(journal_fh) != 0)
1844       RRDD_LOG(LOG_ERR, "cannot close journal: %s", rrd_strerror(errno));
1845   }
1846
1847   journal_fh = NULL;
1848   journal_size = 0;
1849 } /* }}} journal_close */
1850
1851 /* MUST hold journal_lock before calling */
1852 static void journal_new_file(void) /* {{{ */
1853 {
1854   struct timeval now;
1855   int  new_fd;
1856   char new_file[PATH_MAX + 1];
1857
1858   assert(journal_dir != NULL);
1859   assert(journal_cur != NULL);
1860
1861   journal_close();
1862
1863   gettimeofday(&now, NULL);
1864   /* this format assures that the files sort in strcmp() order */
1865   snprintf(new_file, PATH_MAX, "%s/%s.%010d.%06d",
1866            journal_dir, JOURNAL_BASE, (int)now.tv_sec, (int)now.tv_usec);
1867
1868   new_fd = open(new_file, O_WRONLY|O_CREAT|O_APPEND,
1869                 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1870   if (new_fd < 0)
1871     goto error;
1872
1873   journal_fh = fdopen(new_fd, "a");
1874   if (journal_fh == NULL)
1875     goto error;
1876
1877   journal_size = ftell(journal_fh);
1878   RRDD_LOG(LOG_DEBUG, "started new journal %s", new_file);
1879
1880   /* record the file in the journal set */
1881   rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, new_file);
1882
1883   return;
1884
1885 error:
1886   RRDD_LOG(LOG_CRIT,
1887            "JOURNALING DISABLED: Error while trying to create %s : %s",
1888            new_file, rrd_strerror(errno));
1889   RRDD_LOG(LOG_CRIT,
1890            "JOURNALING DISABLED: All values will be flushed at shutdown");
1891
1892   close(new_fd);
1893   config_flush_at_shutdown = 1;
1894
1895 } /* }}} journal_new_file */
1896
1897 /* MUST NOT hold journal_lock before calling this */
1898 static void journal_rotate(void) /* {{{ */
1899 {
1900   journal_set *old_js = NULL;
1901
1902   if (journal_dir == NULL)
1903     return;
1904
1905   RRDD_LOG(LOG_DEBUG, "rotating journals");
1906
1907   pthread_mutex_lock(&stats_lock);
1908   ++stats_journal_rotate;
1909   pthread_mutex_unlock(&stats_lock);
1910
1911   pthread_mutex_lock(&journal_lock);
1912
1913   journal_close();
1914
1915   /* rotate the journal sets */
1916   old_js = journal_old;
1917   journal_old = journal_cur;
1918   journal_cur = calloc(1, sizeof(journal_set));
1919
1920   if (journal_cur != NULL)
1921     journal_new_file();
1922   else
1923     RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
1924
1925   pthread_mutex_unlock(&journal_lock);
1926
1927   journal_set_remove(old_js);
1928   journal_set_free  (old_js);
1929
1930 } /* }}} static void journal_rotate */
1931
1932 /* MUST hold journal_lock when calling */
1933 static void journal_done(void) /* {{{ */
1934 {
1935   if (journal_cur == NULL)
1936     return;
1937
1938   journal_close();
1939
1940   if (config_flush_at_shutdown)
1941   {
1942     RRDD_LOG(LOG_INFO, "removing journals");
1943     journal_set_remove(journal_old);
1944     journal_set_remove(journal_cur);
1945   }
1946   else
1947   {
1948     RRDD_LOG(LOG_INFO, "expedited shutdown; "
1949              "journals will be used at next startup");
1950   }
1951
1952   journal_set_free(journal_cur);
1953   journal_set_free(journal_old);
1954   free(journal_dir);
1955
1956 } /* }}} static void journal_done */
1957
1958 static int journal_write(char *cmd, char *args) /* {{{ */
1959 {
1960   int chars;
1961
1962   if (journal_fh == NULL)
1963     return 0;
1964
1965   pthread_mutex_lock(&journal_lock);
1966   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1967   journal_size += chars;
1968
1969   if (journal_size > JOURNAL_MAX)
1970     journal_new_file();
1971
1972   pthread_mutex_unlock(&journal_lock);
1973
1974   if (chars > 0)
1975   {
1976     pthread_mutex_lock(&stats_lock);
1977     stats_journal_bytes += chars;
1978     pthread_mutex_unlock(&stats_lock);
1979   }
1980
1981   return chars;
1982 } /* }}} static int journal_write */
1983
1984 static int journal_replay (const char *file) /* {{{ */
1985 {
1986   FILE *fh;
1987   int entry_cnt = 0;
1988   int fail_cnt = 0;
1989   uint64_t line = 0;
1990   char entry[CMD_MAX];
1991   time_t now;
1992
1993   if (file == NULL) return 0;
1994
1995   {
1996     char *reason = "unknown error";
1997     int status = 0;
1998     struct stat statbuf;
1999
2000     memset(&statbuf, 0, sizeof(statbuf));
2001     if (stat(file, &statbuf) != 0)
2002     {
2003       reason = "stat error";
2004       status = errno;
2005     }
2006     else if (!S_ISREG(statbuf.st_mode))
2007     {
2008       reason = "not a regular file";
2009       status = EPERM;
2010     }
2011     if (statbuf.st_uid != daemon_uid)
2012     {
2013       reason = "not owned by daemon user";
2014       status = EACCES;
2015     }
2016     if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
2017     {
2018       reason = "must not be user/group writable";
2019       status = EACCES;
2020     }
2021
2022     if (status != 0)
2023     {
2024       RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
2025                file, rrd_strerror(status), reason);
2026       return 0;
2027     }
2028   }
2029
2030   fh = fopen(file, "r");
2031   if (fh == NULL)
2032   {
2033     if (errno != ENOENT)
2034       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
2035                file, rrd_strerror(errno));
2036     return 0;
2037   }
2038   else
2039     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
2040
2041   now = time(NULL);
2042
2043   while(!feof(fh))
2044   {
2045     size_t entry_len;
2046
2047     ++line;
2048     if (fgets(entry, sizeof(entry), fh) == NULL)
2049       break;
2050     entry_len = strlen(entry);
2051
2052     /* check \n termination in case journal writing crashed mid-line */
2053     if (entry_len == 0)
2054       continue;
2055     else if (entry[entry_len - 1] != '\n')
2056     {
2057       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
2058       ++fail_cnt;
2059       continue;
2060     }
2061
2062     entry[entry_len - 1] = '\0';
2063
2064     if (handle_request(NULL, now, entry, entry_len) == 0)
2065       ++entry_cnt;
2066     else
2067       ++fail_cnt;
2068   }
2069
2070   fclose(fh);
2071
2072   RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
2073            entry_cnt, fail_cnt);
2074
2075   return entry_cnt > 0 ? 1 : 0;
2076 } /* }}} static int journal_replay */
2077
2078 static int journal_sort(const void *v1, const void *v2)
2079 {
2080   char **jn1 = (char **) v1;
2081   char **jn2 = (char **) v2;
2082
2083   return strcmp(*jn1,*jn2);
2084 }
2085
2086 static void journal_init(void) /* {{{ */
2087 {
2088   int had_journal = 0;
2089   DIR *dir;
2090   struct dirent *dent;
2091   char path[PATH_MAX+1];
2092
2093   if (journal_dir == NULL) return;
2094
2095   pthread_mutex_lock(&journal_lock);
2096
2097   journal_cur = calloc(1, sizeof(journal_set));
2098   if (journal_cur == NULL)
2099   {
2100     RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2101     return;
2102   }
2103
2104   RRDD_LOG(LOG_INFO, "checking for journal files");
2105
2106   /* Handle old journal files during transition.  This gives them the
2107    * correct sort order.  TODO: remove after first release
2108    */
2109   {
2110     char old_path[PATH_MAX+1];
2111     snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".old" );
2112     snprintf(path,     PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0000");
2113     rename(old_path, path);
2114
2115     snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE        );
2116     snprintf(path,     PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0001");
2117     rename(old_path, path);
2118   }
2119
2120   dir = opendir(journal_dir);
2121   while ((dent = readdir(dir)) != NULL)
2122   {
2123     /* looks like a journal file? */
2124     if (strncmp(dent->d_name, JOURNAL_BASE, strlen(JOURNAL_BASE)))
2125       continue;
2126
2127     snprintf(path, PATH_MAX, "%s/%s", journal_dir, dent->d_name);
2128
2129     if (!rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, path))
2130     {
2131       RRDD_LOG(LOG_CRIT, "journal_init: cannot add journal file %s!",
2132                dent->d_name);
2133       break;
2134     }
2135   }
2136   closedir(dir);
2137
2138   qsort(journal_cur->files, journal_cur->files_num,
2139         sizeof(journal_cur->files[0]), journal_sort);
2140
2141   for (uint i=0; i < journal_cur->files_num; i++)
2142     had_journal += journal_replay(journal_cur->files[i]);
2143
2144   journal_new_file();
2145
2146   /* it must have been a crash.  start a flush */
2147   if (had_journal && config_flush_at_shutdown)
2148     flush_old_values(-1);
2149
2150   pthread_mutex_unlock(&journal_lock);
2151
2152   RRDD_LOG(LOG_INFO, "journal processing complete");
2153
2154 } /* }}} static void journal_init */
2155
2156 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
2157 {
2158   assert(sock != NULL);
2159
2160   free(sock->rbuf);  sock->rbuf = NULL;
2161   free(sock->wbuf);  sock->wbuf = NULL;
2162   free(sock);
2163 } /* }}} void free_listen_socket */
2164
2165 static void close_connection(listen_socket_t *sock) /* {{{ */
2166 {
2167   if (sock->fd >= 0)
2168   {
2169     close(sock->fd);
2170     sock->fd = -1;
2171   }
2172
2173   free_listen_socket(sock);
2174
2175 } /* }}} void close_connection */
2176
2177 static void *connection_thread_main (void *args) /* {{{ */
2178 {
2179   listen_socket_t *sock;
2180   int fd;
2181
2182   sock = (listen_socket_t *) args;
2183   fd = sock->fd;
2184
2185   /* init read buffers */
2186   sock->next_read = sock->next_cmd = 0;
2187   sock->rbuf = malloc(RBUF_SIZE);
2188   if (sock->rbuf == NULL)
2189   {
2190     RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
2191     close_connection(sock);
2192     return NULL;
2193   }
2194
2195   pthread_mutex_lock (&connection_threads_lock);
2196   connection_threads_num++;
2197   pthread_mutex_unlock (&connection_threads_lock);
2198
2199   while (state == RUNNING)
2200   {
2201     char *cmd;
2202     ssize_t cmd_len;
2203     ssize_t rbytes;
2204     time_t now;
2205
2206     struct pollfd pollfd;
2207     int status;
2208
2209     pollfd.fd = fd;
2210     pollfd.events = POLLIN | POLLPRI;
2211     pollfd.revents = 0;
2212
2213     status = poll (&pollfd, 1, /* timeout = */ 500);
2214     if (state != RUNNING)
2215       break;
2216     else if (status == 0) /* timeout */
2217       continue;
2218     else if (status < 0) /* error */
2219     {
2220       status = errno;
2221       if (status != EINTR)
2222         RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2223       continue;
2224     }
2225
2226     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2227       break;
2228     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2229     {
2230       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2231           "poll(2) returned something unexpected: %#04hx",
2232           pollfd.revents);
2233       break;
2234     }
2235
2236     rbytes = read(fd, sock->rbuf + sock->next_read,
2237                   RBUF_SIZE - sock->next_read);
2238     if (rbytes < 0)
2239     {
2240       RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2241       break;
2242     }
2243     else if (rbytes == 0)
2244       break; /* eof */
2245
2246     sock->next_read += rbytes;
2247
2248     if (sock->batch_start)
2249       now = sock->batch_start;
2250     else
2251       now = time(NULL);
2252
2253     while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2254     {
2255       status = handle_request (sock, now, cmd, cmd_len+1);
2256       if (status != 0)
2257         goto out_close;
2258     }
2259   }
2260
2261 out_close:
2262   close_connection(sock);
2263
2264   /* Remove this thread from the connection threads list */
2265   pthread_mutex_lock (&connection_threads_lock);
2266   connection_threads_num--;
2267   if (connection_threads_num <= 0)
2268     pthread_cond_broadcast(&connection_threads_done);
2269   pthread_mutex_unlock (&connection_threads_lock);
2270
2271   return (NULL);
2272 } /* }}} void *connection_thread_main */
2273
2274 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2275 {
2276   int fd;
2277   struct sockaddr_un sa;
2278   listen_socket_t *temp;
2279   int status;
2280   const char *path;
2281   char *path_copy, *dir;
2282
2283   path = sock->addr;
2284   if (strncmp(path, "unix:", strlen("unix:")) == 0)
2285     path += strlen("unix:");
2286
2287   /* dirname may modify its argument */
2288   path_copy = strdup(path);
2289   if (path_copy == NULL)
2290   {
2291     fprintf(stderr, "rrdcached: strdup(): %s\n",
2292         rrd_strerror(errno));
2293     return (-1);
2294   }
2295
2296   dir = dirname(path_copy);
2297   if (rrd_mkdir_p(dir, 0777) != 0)
2298   {
2299     fprintf(stderr, "Failed to create socket directory '%s': %s\n",
2300         dir, rrd_strerror(errno));
2301     return (-1);
2302   }
2303
2304   free(path_copy);
2305
2306   temp = (listen_socket_t *) rrd_realloc (listen_fds,
2307       sizeof (listen_fds[0]) * (listen_fds_num + 1));
2308   if (temp == NULL)
2309   {
2310     fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2311     return (-1);
2312   }
2313   listen_fds = temp;
2314   memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2315
2316   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2317   if (fd < 0)
2318   {
2319     fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2320              rrd_strerror(errno));
2321     return (-1);
2322   }
2323
2324   memset (&sa, 0, sizeof (sa));
2325   sa.sun_family = AF_UNIX;
2326   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2327
2328   /* if we've gotten this far, we own the pid file.  any daemon started
2329    * with the same args must not be alive.  therefore, ensure that we can
2330    * create the socket...
2331    */
2332   unlink(path);
2333
2334   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2335   if (status != 0)
2336   {
2337     fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2338              path, rrd_strerror(errno));
2339     close (fd);
2340     return (-1);
2341   }
2342
2343   /* tweak the sockets group ownership */
2344   if (sock->socket_group != (gid_t)-1)
2345   {
2346     if ( (chown(path, getuid(), sock->socket_group) != 0) ||
2347          (chmod(path, (S_IRUSR|S_IWUSR|S_IXUSR | S_IRGRP|S_IWGRP)) != 0) )
2348     {
2349       fprintf(stderr, "rrdcached: failed to set socket group permissions (%s)\n", strerror(errno));
2350     }
2351   }
2352
2353   if (sock->socket_permissions != (mode_t)-1)
2354   {
2355     if (chmod(path, sock->socket_permissions) != 0)
2356       fprintf(stderr, "rrdcached: failed to set socket file permissions (%o): %s\n",
2357           (unsigned int)sock->socket_permissions, strerror(errno));
2358   }
2359
2360   status = listen (fd, /* backlog = */ 10);
2361   if (status != 0)
2362   {
2363     fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2364              path, rrd_strerror(errno));
2365     close (fd);
2366     unlink (path);
2367     return (-1);
2368   }
2369
2370   listen_fds[listen_fds_num].fd = fd;
2371   listen_fds[listen_fds_num].family = PF_UNIX;
2372   strncpy(listen_fds[listen_fds_num].addr, path,
2373           sizeof (listen_fds[listen_fds_num].addr) - 1);
2374   listen_fds_num++;
2375
2376   return (0);
2377 } /* }}} int open_listen_socket_unix */
2378
2379 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2380 {
2381   struct addrinfo ai_hints;
2382   struct addrinfo *ai_res;
2383   struct addrinfo *ai_ptr;
2384   char addr_copy[NI_MAXHOST];
2385   char *addr;
2386   char *port;
2387   int status;
2388
2389   strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
2390   addr_copy[sizeof (addr_copy) - 1] = 0;
2391   addr = addr_copy;
2392
2393   memset (&ai_hints, 0, sizeof (ai_hints));
2394   ai_hints.ai_flags = 0;
2395 #ifdef AI_ADDRCONFIG
2396   ai_hints.ai_flags |= AI_ADDRCONFIG;
2397 #endif
2398   ai_hints.ai_family = AF_UNSPEC;
2399   ai_hints.ai_socktype = SOCK_STREAM;
2400
2401   port = NULL;
2402   if (*addr == '[') /* IPv6+port format */
2403   {
2404     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2405     addr++;
2406
2407     port = strchr (addr, ']');
2408     if (port == NULL)
2409     {
2410       fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
2411       return (-1);
2412     }
2413     *port = 0;
2414     port++;
2415
2416     if (*port == ':')
2417       port++;
2418     else if (*port == 0)
2419       port = NULL;
2420     else
2421     {
2422       fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
2423       return (-1);
2424     }
2425   } /* if (*addr == '[') */
2426   else
2427   {
2428     port = rindex(addr, ':');
2429     if (port != NULL)
2430     {
2431       *port = 0;
2432       port++;
2433     }
2434   }
2435   ai_res = NULL;
2436   status = getaddrinfo (addr,
2437                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2438                         &ai_hints, &ai_res);
2439   if (status != 0)
2440   {
2441     fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
2442              addr, gai_strerror (status));
2443     return (-1);
2444   }
2445
2446   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2447   {
2448     int fd;
2449     listen_socket_t *temp;
2450     int one = 1;
2451
2452     temp = (listen_socket_t *) rrd_realloc (listen_fds,
2453         sizeof (listen_fds[0]) * (listen_fds_num + 1));
2454     if (temp == NULL)
2455     {
2456       fprintf (stderr,
2457                "rrdcached: open_listen_socket_network: realloc failed.\n");
2458       continue;
2459     }
2460     listen_fds = temp;
2461     memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2462
2463     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2464     if (fd < 0)
2465     {
2466       fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
2467                rrd_strerror(errno));
2468       continue;
2469     }
2470
2471     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2472
2473     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2474     if (status != 0)
2475     {
2476       fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2477                sock->addr, rrd_strerror(errno));
2478       close (fd);
2479       continue;
2480     }
2481
2482     status = listen (fd, /* backlog = */ 10);
2483     if (status != 0)
2484     {
2485       fprintf (stderr, "rrdcached: listen(%s) failed: %s\n.",
2486                sock->addr, rrd_strerror(errno));
2487       close (fd);
2488       freeaddrinfo(ai_res);
2489       return (-1);
2490     }
2491
2492     listen_fds[listen_fds_num].fd = fd;
2493     listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2494     listen_fds_num++;
2495   } /* for (ai_ptr) */
2496
2497   freeaddrinfo(ai_res);
2498   return (0);
2499 } /* }}} static int open_listen_socket_network */
2500
2501 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2502 {
2503   assert(sock != NULL);
2504   assert(sock->addr != NULL);
2505
2506   if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2507       || sock->addr[0] == '/')
2508     return (open_listen_socket_unix(sock));
2509   else
2510     return (open_listen_socket_network(sock));
2511 } /* }}} int open_listen_socket */
2512
2513 static int close_listen_sockets (void) /* {{{ */
2514 {
2515   size_t i;
2516
2517   for (i = 0; i < listen_fds_num; i++)
2518   {
2519     close (listen_fds[i].fd);
2520
2521     if (listen_fds[i].family == PF_UNIX)
2522       unlink(listen_fds[i].addr);
2523   }
2524
2525   free (listen_fds);
2526   listen_fds = NULL;
2527   listen_fds_num = 0;
2528
2529   return (0);
2530 } /* }}} int close_listen_sockets */
2531
2532 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
2533 {
2534   struct pollfd *pollfds;
2535   int pollfds_num;
2536   int status;
2537   int i;
2538
2539   if (listen_fds_num < 1)
2540   {
2541     RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2542     return (NULL);
2543   }
2544
2545   pollfds_num = listen_fds_num;
2546   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2547   if (pollfds == NULL)
2548   {
2549     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2550     return (NULL);
2551   }
2552   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2553
2554   RRDD_LOG(LOG_INFO, "listening for connections");
2555
2556   while (state == RUNNING)
2557   {
2558     for (i = 0; i < pollfds_num; i++)
2559     {
2560       pollfds[i].fd = listen_fds[i].fd;
2561       pollfds[i].events = POLLIN | POLLPRI;
2562       pollfds[i].revents = 0;
2563     }
2564
2565     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2566     if (state != RUNNING)
2567       break;
2568     else if (status == 0) /* timeout */
2569       continue;
2570     else if (status < 0) /* error */
2571     {
2572       status = errno;
2573       if (status != EINTR)
2574       {
2575         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2576       }
2577       continue;
2578     }
2579
2580     for (i = 0; i < pollfds_num; i++)
2581     {
2582       listen_socket_t *client_sock;
2583       struct sockaddr_storage client_sa;
2584       socklen_t client_sa_size;
2585       pthread_t tid;
2586       pthread_attr_t attr;
2587
2588       if (pollfds[i].revents == 0)
2589         continue;
2590
2591       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2592       {
2593         RRDD_LOG (LOG_ERR, "listen_thread_main: "
2594             "poll(2) returned something unexpected for listen FD #%i.",
2595             pollfds[i].fd);
2596         continue;
2597       }
2598
2599       client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2600       if (client_sock == NULL)
2601       {
2602         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2603         continue;
2604       }
2605       memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2606
2607       client_sa_size = sizeof (client_sa);
2608       client_sock->fd = accept (pollfds[i].fd,
2609           (struct sockaddr *) &client_sa, &client_sa_size);
2610       if (client_sock->fd < 0)
2611       {
2612         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2613         free(client_sock);
2614         continue;
2615       }
2616
2617       pthread_attr_init (&attr);
2618       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2619
2620       status = pthread_create (&tid, &attr, connection_thread_main,
2621                                client_sock);
2622       if (status != 0)
2623       {
2624         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2625         close_connection(client_sock);
2626         continue;
2627       }
2628     } /* for (pollfds_num) */
2629   } /* while (state == RUNNING) */
2630
2631   RRDD_LOG(LOG_INFO, "starting shutdown");
2632
2633   close_listen_sockets ();
2634
2635   pthread_mutex_lock (&connection_threads_lock);
2636   while (connection_threads_num > 0)
2637     pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
2638   pthread_mutex_unlock (&connection_threads_lock);
2639
2640   free(pollfds);
2641
2642   return (NULL);
2643 } /* }}} void *listen_thread_main */
2644
2645 static int daemonize (void) /* {{{ */
2646 {
2647   int pid_fd;
2648   char *base_dir;
2649
2650   daemon_uid = geteuid();
2651
2652   pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
2653   if (pid_fd < 0)
2654     pid_fd = check_pidfile();
2655   if (pid_fd < 0)
2656     return pid_fd;
2657
2658   /* open all the listen sockets */
2659   if (config_listen_address_list_len > 0)
2660   {
2661     for (size_t i = 0; i < config_listen_address_list_len; i++)
2662       open_listen_socket (config_listen_address_list[i]);
2663
2664     rrd_free_ptrs((void ***) &config_listen_address_list,
2665                   &config_listen_address_list_len);
2666   }
2667   else
2668   {
2669     listen_socket_t sock;
2670     memset(&sock, 0, sizeof(sock));
2671     strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr)-1);
2672     open_listen_socket (&sock);
2673   }
2674
2675   if (listen_fds_num < 1)
2676   {
2677     fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
2678     goto error;
2679   }
2680
2681   if (!stay_foreground)
2682   {
2683     pid_t child;
2684
2685     child = fork ();
2686     if (child < 0)
2687     {
2688       fprintf (stderr, "daemonize: fork(2) failed.\n");
2689       goto error;
2690     }
2691     else if (child > 0)
2692       exit(0);
2693
2694     /* Become session leader */
2695     setsid ();
2696
2697     /* Open the first three file descriptors to /dev/null */
2698     close (2);
2699     close (1);
2700     close (0);
2701
2702     open ("/dev/null", O_RDWR);
2703     if (dup(0) == -1 || dup(0) == -1){
2704         RRDD_LOG (LOG_ERR, "faild to run dup.\n");
2705     }
2706   } /* if (!stay_foreground) */
2707
2708   /* Change into the /tmp directory. */
2709   base_dir = (config_base_dir != NULL)
2710     ? config_base_dir
2711     : "/tmp";
2712
2713   if (chdir (base_dir) != 0)
2714   {
2715     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2716     goto error;
2717   }
2718
2719   install_signal_handlers();
2720
2721   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2722   RRDD_LOG(LOG_INFO, "starting up");
2723
2724   cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
2725                                 (GDestroyNotify) free_cache_item);
2726   if (cache_tree == NULL)
2727   {
2728     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2729     goto error;
2730   }
2731
2732   return write_pidfile (pid_fd);
2733
2734 error:
2735   remove_pidfile();
2736   return -1;
2737 } /* }}} int daemonize */
2738
2739 static int cleanup (void) /* {{{ */
2740 {
2741   pthread_cond_broadcast (&flush_cond);
2742   pthread_join (flush_thread, NULL);
2743
2744   pthread_cond_broadcast (&queue_cond);
2745   for (int i = 0; i < config_queue_threads; i++)
2746     pthread_join (queue_threads[i], NULL);
2747
2748   if (config_flush_at_shutdown)
2749   {
2750     assert(cache_queue_head == NULL);
2751     RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
2752   }
2753
2754   free(queue_threads);
2755   free(config_base_dir);
2756
2757   pthread_mutex_lock(&cache_lock);
2758   g_tree_destroy(cache_tree);
2759
2760   pthread_mutex_lock(&journal_lock);
2761   journal_done();
2762
2763   RRDD_LOG(LOG_INFO, "goodbye");
2764   closelog ();
2765
2766   remove_pidfile ();
2767   free(config_pid_file);
2768
2769   return (0);
2770 } /* }}} int cleanup */
2771
2772 static int read_options (int argc, char **argv) /* {{{ */
2773 {
2774   int option;
2775   int status = 0;
2776
2777   char **permissions = NULL;
2778   size_t permissions_len = 0;
2779
2780   gid_t  socket_group = (gid_t)-1;
2781   mode_t socket_permissions = (mode_t)-1;
2782
2783   while ((option = getopt(argc, argv, "gl:s:m:P:f:w:z:t:Bb:p:Fj:a:h?")) != -1)
2784   {
2785     switch (option)
2786     {
2787       case 'g':
2788         stay_foreground=1;
2789         break;
2790
2791       case 'l':
2792       {
2793         listen_socket_t *new;
2794
2795         new = malloc(sizeof(listen_socket_t));
2796         if (new == NULL)
2797         {
2798           fprintf(stderr, "read_options: malloc failed.\n");
2799           return(2);
2800         }
2801         memset(new, 0, sizeof(listen_socket_t));
2802
2803         strncpy(new->addr, optarg, sizeof(new->addr)-1);
2804
2805         /* Add permissions to the socket {{{ */
2806         if (permissions_len != 0)
2807         {
2808           size_t i;
2809           for (i = 0; i < permissions_len; i++)
2810           {
2811             status = socket_permission_add (new, permissions[i]);
2812             if (status != 0)
2813             {
2814               fprintf (stderr, "read_options: Adding permission \"%s\" to "
2815                   "socket failed. Most likely, this permission doesn't "
2816                   "exist. Check your command line.\n", permissions[i]);
2817               status = 4;
2818             }
2819           }
2820         }
2821         else /* if (permissions_len == 0) */
2822         {
2823           /* Add permission for ALL commands to the socket. */
2824           size_t i;
2825           for (i = 0; i < list_of_commands_len; i++)
2826           {
2827             status = socket_permission_add (new, list_of_commands[i].cmd);
2828             if (status != 0)
2829             {
2830               fprintf (stderr, "read_options: Adding permission \"%s\" to "
2831                   "socket failed. This should never happen, ever! Sorry.\n",
2832                   permissions[i]);
2833               status = 4;
2834             }
2835           }
2836         }
2837         /* }}} Done adding permissions. */
2838
2839         new->socket_group = socket_group;
2840         new->socket_permissions = socket_permissions;
2841
2842         if (!rrd_add_ptr((void ***)&config_listen_address_list,
2843                          &config_listen_address_list_len, new))
2844         {
2845           fprintf(stderr, "read_options: rrd_add_ptr failed.\n");
2846           return (2);
2847         }
2848       }
2849       break;
2850
2851       /* set socket group permissions */
2852       case 's':
2853       {
2854         gid_t group_gid;
2855         struct group *grp;
2856
2857         group_gid = strtoul(optarg, NULL, 10);
2858         if (errno != EINVAL && group_gid>0)
2859         {
2860           /* we were passed a number */
2861           grp = getgrgid(group_gid);
2862         }
2863         else
2864         {
2865           grp = getgrnam(optarg);
2866         }
2867
2868         if (grp)
2869         {
2870           socket_group = grp->gr_gid;
2871         }
2872         else
2873         {
2874           /* no idea what the user wanted... */
2875           fprintf (stderr, "read_options: couldn't map \"%s\" to a group, Sorry\n", optarg);
2876           return (5);
2877         }
2878       }
2879       break;
2880
2881       /* set socket file permissions */
2882       case 'm':
2883       {
2884         long  tmp;
2885         char *endptr = NULL;
2886
2887         tmp = strtol (optarg, &endptr, 8);
2888         if ((endptr == optarg) || (! endptr) || (*endptr != '\0')
2889             || (tmp > 07777) || (tmp < 0)) {
2890           fprintf (stderr, "read_options: Invalid file mode \"%s\".\n",
2891               optarg);
2892           return (5);
2893         }
2894
2895         socket_permissions = (mode_t)tmp;
2896       }
2897       break;
2898
2899       case 'P':
2900       {
2901         char *optcopy;
2902         char *saveptr;
2903         char *dummy;
2904         char *ptr;
2905
2906         rrd_free_ptrs ((void *) &permissions, &permissions_len);
2907
2908         optcopy = strdup (optarg);
2909         dummy = optcopy;
2910         saveptr = NULL;
2911         while ((ptr = strtok_r (dummy, ", ", &saveptr)) != NULL)
2912         {
2913           dummy = NULL;
2914           rrd_add_strdup ((void *) &permissions, &permissions_len, ptr);
2915         }
2916
2917         free (optcopy);
2918       }
2919       break;
2920
2921       case 'f':
2922       {
2923         int temp;
2924
2925         temp = atoi (optarg);
2926         if (temp > 0)
2927           config_flush_interval = temp;
2928         else
2929         {
2930           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2931           status = 3;
2932         }
2933       }
2934       break;
2935
2936       case 'w':
2937       {
2938         int temp;
2939
2940         temp = atoi (optarg);
2941         if (temp > 0)
2942           config_write_interval = temp;
2943         else
2944         {
2945           fprintf (stderr, "Invalid write interval: %s\n", optarg);
2946           status = 2;
2947         }
2948       }
2949       break;
2950
2951       case 'z':
2952       {
2953         int temp;
2954
2955         temp = atoi(optarg);
2956         if (temp > 0)
2957           config_write_jitter = temp;
2958         else
2959         {
2960           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2961           status = 2;
2962         }
2963
2964         break;
2965       }
2966
2967       case 't':
2968       {
2969         int threads;
2970         threads = atoi(optarg);
2971         if (threads >= 1)
2972           config_queue_threads = threads;
2973         else
2974         {
2975           fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
2976           return 1;
2977         }
2978       }
2979       break;
2980
2981       case 'B':
2982         config_write_base_only = 1;
2983         break;
2984
2985       case 'b':
2986       {
2987         size_t len;
2988         char base_realpath[PATH_MAX];
2989
2990         if (config_base_dir != NULL)
2991           free (config_base_dir);
2992         config_base_dir = strdup (optarg);
2993         if (config_base_dir == NULL)
2994         {
2995           fprintf (stderr, "read_options: strdup failed.\n");
2996           return (3);
2997         }
2998
2999         if (rrd_mkdir_p (config_base_dir, 0777) != 0)
3000         {
3001           fprintf (stderr, "Failed to create base directory '%s': %s\n",
3002               config_base_dir, rrd_strerror (errno));
3003           return (3);
3004         }
3005
3006         /* make sure that the base directory is not resolved via
3007          * symbolic links.  this makes some performance-enhancing
3008          * assumptions possible (we don't have to resolve paths
3009          * that start with a "/")
3010          */
3011         if (realpath(config_base_dir, base_realpath) == NULL)
3012         {
3013           fprintf (stderr, "Failed to canonicalize the base directory '%s': "
3014               "%s\n", config_base_dir, rrd_strerror(errno));
3015           return 5;
3016         }
3017
3018         len = strlen (config_base_dir);
3019         while ((len > 0) && (config_base_dir[len - 1] == '/'))
3020         {
3021           config_base_dir[len - 1] = 0;
3022           len--;
3023         }
3024
3025         if (len < 1)
3026         {
3027           fprintf (stderr, "Invalid base directory: %s\n", optarg);
3028           return (4);
3029         }
3030
3031         _config_base_dir_len = len;
3032
3033         len = strlen (base_realpath);
3034         while ((len > 0) && (base_realpath[len - 1] == '/'))
3035         {
3036           base_realpath[len - 1] = '\0';
3037           len--;
3038         }
3039
3040         if (strncmp(config_base_dir,
3041                          base_realpath, sizeof(base_realpath)) != 0)
3042         {
3043           fprintf(stderr,
3044                   "Base directory (-b) resolved via file system links!\n"
3045                   "Please consult rrdcached '-b' documentation!\n"
3046                   "Consider specifying the real directory (%s)\n",
3047                   base_realpath);
3048           return 5;
3049         }
3050       }
3051       break;
3052
3053       case 'p':
3054       {
3055         if (config_pid_file != NULL)
3056           free (config_pid_file);
3057         config_pid_file = strdup (optarg);
3058         if (config_pid_file == NULL)
3059         {
3060           fprintf (stderr, "read_options: strdup failed.\n");
3061           return (3);
3062         }
3063       }
3064       break;
3065
3066       case 'F':
3067         config_flush_at_shutdown = 1;
3068         break;
3069
3070       case 'j':
3071       {
3072         const char *dir = journal_dir = strdup(optarg);
3073
3074         status = rrd_mkdir_p(dir, 0777);
3075         if (status != 0)
3076         {
3077           fprintf(stderr, "Failed to create journal directory '%s': %s\n",
3078               dir, rrd_strerror(errno));
3079           return 6;
3080         }
3081
3082         if (access(dir, R_OK|W_OK|X_OK) != 0)
3083         {
3084           fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
3085                   errno ? rrd_strerror(errno) : "");
3086           return 6;
3087         }
3088       }
3089       break;
3090
3091       case 'a':
3092       {
3093         int temp = atoi(optarg);
3094         if (temp > 0)
3095           config_alloc_chunk = temp;
3096         else
3097         {
3098           fprintf(stderr, "Invalid allocation size: %s\n", optarg);
3099           return 10;
3100         }
3101       }
3102       break;
3103
3104       case 'h':
3105       case '?':
3106         printf ("RRDCacheD %s\n"
3107             "Copyright (C) 2008,2009 Florian octo Forster and Kevin Brintnall\n"
3108             "\n"
3109             "Usage: rrdcached [options]\n"
3110             "\n"
3111             "Valid options are:\n"
3112             "  -l <address>  Socket address to listen to.\n"
3113             "  -P <perms>    Sets the permissions to assign to all following "
3114                             "sockets\n"
3115             "  -w <seconds>  Interval in which to write data.\n"
3116             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
3117             "  -t <threads>  Number of write threads.\n"
3118             "  -f <seconds>  Interval in which to flush dead data.\n"
3119             "  -p <file>     Location of the PID-file.\n"
3120             "  -b <dir>      Base directory to change to.\n"
3121             "  -B            Restrict file access to paths within -b <dir>\n"
3122             "  -g            Do not fork and run in the foreground.\n"
3123             "  -j <dir>      Directory in which to create the journal files.\n"
3124             "  -F            Always flush all updates at shutdown\n"
3125             "  -s <id|name>  Group owner of all following UNIX sockets\n"
3126             "                (the socket will also have read/write permissions "
3127                             "for that group)\n"
3128             "  -m <mode>     File permissions (octal) of all following UNIX "
3129                             "sockets\n"
3130             "  -a <size>     Memory allocation chunk size. Default is 1."
3131             "\n"
3132             "For more information and a detailed description of all options "
3133             "please refer\n"
3134             "to the rrdcached(1) manual page.\n",
3135             VERSION);
3136         if (option == 'h')
3137           status = -1;
3138         else
3139           status = 1;
3140         break;
3141     } /* switch (option) */
3142   } /* while (getopt) */
3143
3144   /* advise the user when values are not sane */
3145   if (config_flush_interval < 2 * config_write_interval)
3146     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
3147             " 2x write interval (-w) !\n");
3148   if (config_write_jitter > config_write_interval)
3149     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
3150             " write interval (-w) !\n");
3151
3152   if (config_write_base_only && config_base_dir == NULL)
3153     fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
3154             "  Consult the rrdcached documentation\n");
3155
3156   if (journal_dir == NULL)
3157     config_flush_at_shutdown = 1;
3158
3159   rrd_free_ptrs ((void *) &permissions, &permissions_len);
3160
3161   return (status);
3162 } /* }}} int read_options */
3163
3164 int main (int argc, char **argv)
3165 {
3166   int status;
3167
3168   status = read_options (argc, argv);
3169   if (status != 0)
3170   {
3171     if (status < 0)
3172       status = 0;
3173     return (status);
3174   }
3175
3176   status = daemonize ();
3177   if (status != 0)
3178   {
3179     fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
3180     return (1);
3181   }
3182
3183   journal_init();
3184
3185   /* start the queue threads */
3186   queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
3187   if (queue_threads == NULL)
3188   {
3189     RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
3190     cleanup();
3191     return (1);
3192   }
3193   for (int i = 0; i < config_queue_threads; i++)
3194   {
3195     memset (&queue_threads[i], 0, sizeof (*queue_threads));
3196     status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
3197     if (status != 0)
3198     {
3199       RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
3200       cleanup();
3201       return (1);
3202     }
3203   }
3204
3205   /* start the flush thread */
3206   memset(&flush_thread, 0, sizeof(flush_thread));
3207   status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
3208   if (status != 0)
3209   {
3210     RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
3211     cleanup();
3212     return (1);
3213   }
3214
3215   listen_thread_main (NULL);
3216   cleanup ();
3217
3218   return (0);
3219 } /* int main */
3220
3221 /*
3222  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
3223  */