17ec97edd37eaae65c5bf322d4878f66663e5d51
[rrdtool.git] / src / rrd_daemon.c
1 /**
2  * RRDTool - src/rrd_daemon.c
3  * Copyright (C) 2008 Florian octo Forster
4  * Copyright (C) 2008 Kevin Brintnall
5  *
6  * This program is free software; you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the
8  * Free Software Foundation; only version 2 of the License is applicable.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Florian octo Forster <octo at verplant.org>
21  *   kevin brintnall <kbrint@rufus.net>
22  **/
23
24 #if 0
25 /*
26  * First tell the compiler to stick to the C99 and POSIX standards as close as
27  * possible.
28  */
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
31 #endif
32
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
35 #endif
36
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
39 #endif
40 #define _POSIX_C_SOURCE 200112L
41
42 /* Single UNIX needed for strdup. */
43 #ifdef _XOPEN_SOURCE
44 # undef _XOPEN_SOURCE
45 #endif
46 #define _XOPEN_SOURCE 500
47
48 #ifndef _REENTRANT
49 # define _REENTRANT
50 #endif
51
52 #ifndef _THREAD_SAFE
53 # define _THREAD_SAFE
54 #endif
55
56 #ifdef _GNU_SOURCE
57 # undef _GNU_SOURCE
58 #endif
59 /* }}} */
60 #endif /* 0 */
61
62 /*
63  * Now for some includes..
64  */
65 /* {{{ */
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
68 #else
69 #ifdef HAVE_CONFIG_H
70 #include "../rrd_config.h"
71 #endif
72 #endif
73
74 #include "rrd.h"
75 #include "rrd_client.h"
76
77 #include <stdlib.h>
78
79 #ifndef WIN32
80 #include <stdint.h>
81 #include <unistd.h>
82 #include <strings.h>
83 #include <inttypes.h>
84 #       include <sys/socket.h>
85
86 #else
87
88 #endif
89 #include <stdio.h>
90 #include <string.h>
91
92 #include <sys/types.h>
93 #include <sys/stat.h>
94 #include <fcntl.h>
95 #include <signal.h>
96 #include <sys/un.h>
97 #include <netdb.h>
98 #include <poll.h>
99 #include <syslog.h>
100 #include <pthread.h>
101 #include <errno.h>
102 #include <assert.h>
103 #include <sys/time.h>
104 #include <time.h>
105
106 #include <glib-2.0/glib.h>
107 /* }}} */
108
109 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
110
111 #ifndef __GNUC__
112 # define __attribute__(x) /**/
113 #endif
114
115 /*
116  * Types
117  */
118 typedef enum
119 {
120   PRIV_LOW,
121   PRIV_HIGH
122 } socket_privilege;
123
124 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
125
126 struct listen_socket_s
127 {
128   int fd;
129   char addr[PATH_MAX + 1];
130   int family;
131   socket_privilege privilege;
132
133   /* state for BATCH processing */
134   time_t batch_start;
135   int batch_cmd;
136
137   /* buffered IO */
138   char *rbuf;
139   off_t next_cmd;
140   off_t next_read;
141
142   char *wbuf;
143   ssize_t wbuf_len;
144 };
145 typedef struct listen_socket_s listen_socket_t;
146
147 struct command;
148 /* note: guard against "unused" warnings in the handlers */
149 #define DISPATCH_PROTO  listen_socket_t *sock   __attribute__((unused)),\
150                         time_t now              __attribute__((unused)),\
151                         char  *buffer           __attribute__((unused)),\
152                         size_t buffer_size      __attribute__((unused))
153
154 #define HANDLER_PROTO   struct command *cmd     __attribute__((unused)),\
155                         DISPATCH_PROTO
156
157 struct command {
158   char   *cmd;
159   int (*handler)(HANDLER_PROTO);
160   socket_privilege min_priv;
161
162   char  context;                /* where we expect to see it */
163 #define CMD_CONTEXT_CLIENT      (1<<0)
164 #define CMD_CONTEXT_BATCH       (1<<1)
165 #define CMD_CONTEXT_JOURNAL     (1<<2)
166 #define CMD_CONTEXT_ANY         (0x7f)
167
168   char *syntax;
169   char *help;
170 };
171
172 struct cache_item_s;
173 typedef struct cache_item_s cache_item_t;
174 struct cache_item_s
175 {
176   char *file;
177   char **values;
178   size_t values_num;
179   time_t last_flush_time;
180   time_t last_update_stamp;
181 #define CI_FLAGS_IN_TREE  (1<<0)
182 #define CI_FLAGS_IN_QUEUE (1<<1)
183   int flags;
184   pthread_cond_t  flushed;
185   cache_item_t *prev;
186   cache_item_t *next;
187 };
188
189 struct callback_flush_data_s
190 {
191   time_t now;
192   time_t abs_timeout;
193   char **keys;
194   size_t keys_num;
195 };
196 typedef struct callback_flush_data_s callback_flush_data_t;
197
198 enum queue_side_e
199 {
200   HEAD,
201   TAIL
202 };
203 typedef enum queue_side_e queue_side_t;
204
205 /* max length of socket command or response */
206 #define CMD_MAX 4096
207 #define RBUF_SIZE (CMD_MAX*2)
208
209 /*
210  * Variables
211  */
212 static int stay_foreground = 0;
213 static uid_t daemon_uid;
214
215 static listen_socket_t *listen_fds = NULL;
216 static size_t listen_fds_num = 0;
217
218 static int do_shutdown = 0;
219
220 static pthread_t *queue_threads;
221 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
222 static int config_queue_threads = 4;
223
224 static pthread_t flush_thread;
225 static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;
226
227 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
228 static pthread_cond_t  connection_threads_done = PTHREAD_COND_INITIALIZER;
229 static int connection_threads_num = 0;
230
231 /* Cache stuff */
232 static GTree          *cache_tree = NULL;
233 static cache_item_t   *cache_queue_head = NULL;
234 static cache_item_t   *cache_queue_tail = NULL;
235 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
236
237 static int config_write_interval = 300;
238 static int config_write_jitter   = 0;
239 static int config_flush_interval = 3600;
240 static int config_flush_at_shutdown = 0;
241 static char *config_pid_file = NULL;
242 static char *config_base_dir = NULL;
243 static size_t _config_base_dir_len = 0;
244 static int config_write_base_only = 0;
245
246 static listen_socket_t **config_listen_address_list = NULL;
247 static size_t config_listen_address_list_len = 0;
248
249 static uint64_t stats_queue_length = 0;
250 static uint64_t stats_updates_received = 0;
251 static uint64_t stats_flush_received = 0;
252 static uint64_t stats_updates_written = 0;
253 static uint64_t stats_data_sets_written = 0;
254 static uint64_t stats_journal_bytes = 0;
255 static uint64_t stats_journal_rotate = 0;
256 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
257
258 /* Journaled updates */
259 static char *journal_cur = NULL;
260 static char *journal_old = NULL;
261 static FILE *journal_fh = NULL;
262 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
263 static int journal_write(char *cmd, char *args);
264 static void journal_done(void);
265 static void journal_rotate(void);
266
267 /* prototypes for forward refernces */
268 static int handle_request_help (HANDLER_PROTO);
269
270 /* 
271  * Functions
272  */
273 static void sig_common (const char *sig) /* {{{ */
274 {
275   RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
276   do_shutdown++;
277   pthread_cond_broadcast(&flush_cond);
278   pthread_cond_broadcast(&queue_cond);
279 } /* }}} void sig_common */
280
281 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
282 {
283   sig_common("INT");
284 } /* }}} void sig_int_handler */
285
286 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
287 {
288   sig_common("TERM");
289 } /* }}} void sig_term_handler */
290
291 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
292 {
293   config_flush_at_shutdown = 1;
294   sig_common("USR1");
295 } /* }}} void sig_usr1_handler */
296
297 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
298 {
299   config_flush_at_shutdown = 0;
300   sig_common("USR2");
301 } /* }}} void sig_usr2_handler */
302
303 static void install_signal_handlers(void) /* {{{ */
304 {
305   /* These structures are static, because `sigaction' behaves weird if the are
306    * overwritten.. */
307   static struct sigaction sa_int;
308   static struct sigaction sa_term;
309   static struct sigaction sa_pipe;
310   static struct sigaction sa_usr1;
311   static struct sigaction sa_usr2;
312
313   /* Install signal handlers */
314   memset (&sa_int, 0, sizeof (sa_int));
315   sa_int.sa_handler = sig_int_handler;
316   sigaction (SIGINT, &sa_int, NULL);
317
318   memset (&sa_term, 0, sizeof (sa_term));
319   sa_term.sa_handler = sig_term_handler;
320   sigaction (SIGTERM, &sa_term, NULL);
321
322   memset (&sa_pipe, 0, sizeof (sa_pipe));
323   sa_pipe.sa_handler = SIG_IGN;
324   sigaction (SIGPIPE, &sa_pipe, NULL);
325
326   memset (&sa_pipe, 0, sizeof (sa_usr1));
327   sa_usr1.sa_handler = sig_usr1_handler;
328   sigaction (SIGUSR1, &sa_usr1, NULL);
329
330   memset (&sa_usr2, 0, sizeof (sa_usr2));
331   sa_usr2.sa_handler = sig_usr2_handler;
332   sigaction (SIGUSR2, &sa_usr2, NULL);
333
334 } /* }}} void install_signal_handlers */
335
336 static int open_pidfile(char *action, int oflag) /* {{{ */
337 {
338   int fd;
339   char *file;
340
341   file = (config_pid_file != NULL)
342     ? config_pid_file
343     : LOCALSTATEDIR "/run/rrdcached.pid";
344
345   fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
346   if (fd < 0)
347     fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
348             action, file, rrd_strerror(errno));
349
350   return(fd);
351 } /* }}} static int open_pidfile */
352
353 /* check existing pid file to see whether a daemon is running */
354 static int check_pidfile(void)
355 {
356   int pid_fd;
357   pid_t pid;
358   char pid_str[16];
359
360   pid_fd = open_pidfile("open", O_RDWR);
361   if (pid_fd < 0)
362     return pid_fd;
363
364   if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0)
365     return -1;
366
367   pid = atoi(pid_str);
368   if (pid <= 0)
369     return -1;
370
371   /* another running process that we can signal COULD be
372    * a competing rrdcached */
373   if (pid != getpid() && kill(pid, 0) == 0)
374   {
375     fprintf(stderr,
376             "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
377     close(pid_fd);
378     return -1;
379   }
380
381   lseek(pid_fd, 0, SEEK_SET);
382   if (ftruncate(pid_fd, 0) == -1)
383   {
384     fprintf(stderr,
385             "FATAL: Faild to truncate stale PID file. (pid %d)\n", pid);
386     close(pid_fd);
387     return -1;
388   }
389
390   fprintf(stderr,
391           "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
392           "rrdcached: starting normally.\n", pid);
393
394   return pid_fd;
395 } /* }}} static int check_pidfile */
396
397 static int write_pidfile (int fd) /* {{{ */
398 {
399   pid_t pid;
400   FILE *fh;
401
402   pid = getpid ();
403
404   fh = fdopen (fd, "w");
405   if (fh == NULL)
406   {
407     RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
408     close(fd);
409     return (-1);
410   }
411
412   fprintf (fh, "%i\n", (int) pid);
413   fclose (fh);
414
415   return (0);
416 } /* }}} int write_pidfile */
417
418 static int remove_pidfile (void) /* {{{ */
419 {
420   char *file;
421   int status;
422
423   file = (config_pid_file != NULL)
424     ? config_pid_file
425     : LOCALSTATEDIR "/run/rrdcached.pid";
426
427   status = unlink (file);
428   if (status == 0)
429     return (0);
430   return (errno);
431 } /* }}} int remove_pidfile */
432
433 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
434 {
435   char *eol;
436
437   eol = memchr(sock->rbuf + sock->next_cmd, '\n',
438                sock->next_read - sock->next_cmd);
439
440   if (eol == NULL)
441   {
442     /* no commands left, move remainder back to front of rbuf */
443     memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
444             sock->next_read - sock->next_cmd);
445     sock->next_read -= sock->next_cmd;
446     sock->next_cmd = 0;
447     *len = 0;
448     return NULL;
449   }
450   else
451   {
452     char *cmd = sock->rbuf + sock->next_cmd;
453     *eol = '\0';
454
455     sock->next_cmd = eol - sock->rbuf + 1;
456
457     if (eol > sock->rbuf && *(eol-1) == '\r')
458       *(--eol) = '\0'; /* handle "\r\n" EOL */
459
460     *len = eol - cmd;
461
462     return cmd;
463   }
464
465   /* NOTREACHED */
466   assert(1==0);
467 }
468
469 /* add the characters directly to the write buffer */
470 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
471 {
472   char *new_buf;
473
474   assert(sock != NULL);
475
476   new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
477   if (new_buf == NULL)
478   {
479     RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
480     return -1;
481   }
482
483   strncpy(new_buf + sock->wbuf_len, str, len + 1);
484
485   sock->wbuf = new_buf;
486   sock->wbuf_len += len;
487
488   return 0;
489 } /* }}} static int add_to_wbuf */
490
491 /* add the text to the "extra" info that's sent after the status line */
492 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
493 {
494   va_list argp;
495   char buffer[CMD_MAX];
496   int len;
497
498   if (sock == NULL) return 0; /* journal replay mode */
499   if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
500
501   va_start(argp, fmt);
502 #ifdef HAVE_VSNPRINTF
503   len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
504 #else
505   len = vsprintf(buffer, fmt, argp);
506 #endif
507   va_end(argp);
508   if (len < 0)
509   {
510     RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
511     return -1;
512   }
513
514   return add_to_wbuf(sock, buffer, len);
515 } /* }}} static int add_response_info */
516
517 static int count_lines(char *str) /* {{{ */
518 {
519   int lines = 0;
520
521   if (str != NULL)
522   {
523     while ((str = strchr(str, '\n')) != NULL)
524     {
525       ++lines;
526       ++str;
527     }
528   }
529
530   return lines;
531 } /* }}} static int count_lines */
532
533 /* send the response back to the user.
534  * returns 0 on success, -1 on error
535  * write buffer is always zeroed after this call */
536 static int send_response (listen_socket_t *sock, response_code rc,
537                           char *fmt, ...) /* {{{ */
538 {
539   va_list argp;
540   char buffer[CMD_MAX];
541   int lines;
542   ssize_t wrote;
543   int rclen, len;
544
545   if (sock == NULL) return rc;  /* journal replay mode */
546
547   if (sock->batch_start)
548   {
549     if (rc == RESP_OK)
550       return rc; /* no response on success during BATCH */
551     lines = sock->batch_cmd;
552   }
553   else if (rc == RESP_OK)
554     lines = count_lines(sock->wbuf);
555   else
556     lines = -1;
557
558   rclen = sprintf(buffer, "%d ", lines);
559   va_start(argp, fmt);
560 #ifdef HAVE_VSNPRINTF
561   len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
562 #else
563   len = vsprintf(buffer+rclen, fmt, argp);
564 #endif
565   va_end(argp);
566   if (len < 0)
567     return -1;
568
569   len += rclen;
570
571   /* append the result to the wbuf, don't write to the user */
572   if (sock->batch_start)
573     return add_to_wbuf(sock, buffer, len);
574
575   /* first write must be complete */
576   if (len != write(sock->fd, buffer, len))
577   {
578     RRDD_LOG(LOG_INFO, "send_response: could not write status message");
579     return -1;
580   }
581
582   if (sock->wbuf != NULL && rc == RESP_OK)
583   {
584     wrote = 0;
585     while (wrote < sock->wbuf_len)
586     {
587       ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
588       if (wb <= 0)
589       {
590         RRDD_LOG(LOG_INFO, "send_response: could not write results");
591         return -1;
592       }
593       wrote += wb;
594     }
595   }
596
597   free(sock->wbuf); sock->wbuf = NULL;
598   sock->wbuf_len = 0;
599
600   return 0;
601 } /* }}} */
602
603 static void wipe_ci_values(cache_item_t *ci, time_t when)
604 {
605   ci->values = NULL;
606   ci->values_num = 0;
607
608   ci->last_flush_time = when;
609   if (config_write_jitter > 0)
610     ci->last_flush_time += (rrd_random() % config_write_jitter);
611 }
612
613 /* remove_from_queue
614  * remove a "cache_item_t" item from the queue.
615  * must hold 'cache_lock' when calling this
616  */
617 static void remove_from_queue(cache_item_t *ci) /* {{{ */
618 {
619   if (ci == NULL) return;
620   if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
621
622   if (ci->prev == NULL)
623     cache_queue_head = ci->next; /* reset head */
624   else
625     ci->prev->next = ci->next;
626
627   if (ci->next == NULL)
628     cache_queue_tail = ci->prev; /* reset the tail */
629   else
630     ci->next->prev = ci->prev;
631
632   ci->next = ci->prev = NULL;
633   ci->flags &= ~CI_FLAGS_IN_QUEUE;
634
635   pthread_mutex_lock (&stats_lock);
636   assert (stats_queue_length > 0);
637   stats_queue_length--;
638   pthread_mutex_unlock (&stats_lock);
639
640 } /* }}} static void remove_from_queue */
641
642 /* free the resources associated with the cache_item_t
643  * must hold cache_lock when calling this function
644  */
645 static void *free_cache_item(cache_item_t *ci) /* {{{ */
646 {
647   if (ci == NULL) return NULL;
648
649   remove_from_queue(ci);
650
651   for (size_t i=0; i < ci->values_num; i++)
652     free(ci->values[i]);
653
654   free (ci->values);
655   free (ci->file);
656
657   /* in case anyone is waiting */
658   pthread_cond_broadcast(&ci->flushed);
659   pthread_cond_destroy(&ci->flushed);
660
661   free (ci);
662
663   return NULL;
664 } /* }}} static void *free_cache_item */
665
666 /*
667  * enqueue_cache_item:
668  * `cache_lock' must be acquired before calling this function!
669  */
670 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
671     queue_side_t side)
672 {
673   if (ci == NULL)
674     return (-1);
675
676   if (ci->values_num == 0)
677     return (0);
678
679   if (side == HEAD)
680   {
681     if (cache_queue_head == ci)
682       return 0;
683
684     /* remove if further down in queue */
685     remove_from_queue(ci);
686
687     ci->prev = NULL;
688     ci->next = cache_queue_head;
689     if (ci->next != NULL)
690       ci->next->prev = ci;
691     cache_queue_head = ci;
692
693     if (cache_queue_tail == NULL)
694       cache_queue_tail = cache_queue_head;
695   }
696   else /* (side == TAIL) */
697   {
698     /* We don't move values back in the list.. */
699     if (ci->flags & CI_FLAGS_IN_QUEUE)
700       return (0);
701
702     assert (ci->next == NULL);
703     assert (ci->prev == NULL);
704
705     ci->prev = cache_queue_tail;
706
707     if (cache_queue_tail == NULL)
708       cache_queue_head = ci;
709     else
710       cache_queue_tail->next = ci;
711
712     cache_queue_tail = ci;
713   }
714
715   ci->flags |= CI_FLAGS_IN_QUEUE;
716
717   pthread_cond_signal(&queue_cond);
718   pthread_mutex_lock (&stats_lock);
719   stats_queue_length++;
720   pthread_mutex_unlock (&stats_lock);
721
722   return (0);
723 } /* }}} int enqueue_cache_item */
724
725 /*
726  * tree_callback_flush:
727  * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
728  * while this is in progress.
729  */
730 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
731     gpointer data)
732 {
733   cache_item_t *ci;
734   callback_flush_data_t *cfd;
735
736   ci = (cache_item_t *) value;
737   cfd = (callback_flush_data_t *) data;
738
739   if (ci->flags & CI_FLAGS_IN_QUEUE)
740     return FALSE;
741
742   if ((ci->last_flush_time <= cfd->abs_timeout)
743       && (ci->values_num > 0))
744   {
745     enqueue_cache_item (ci, TAIL);
746   }
747   else if ((do_shutdown != 0)
748       && (ci->values_num > 0))
749   {
750     enqueue_cache_item (ci, TAIL);
751   }
752   else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
753       && (ci->values_num <= 0))
754   {
755     assert ((char *) key == ci->file);
756     if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key))
757     {
758       RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed.");
759       return (FALSE);
760     }
761   }
762
763   return (FALSE);
764 } /* }}} gboolean tree_callback_flush */
765
766 static int flush_old_values (int max_age)
767 {
768   callback_flush_data_t cfd;
769   size_t k;
770
771   memset (&cfd, 0, sizeof (cfd));
772   /* Pass the current time as user data so that we don't need to call
773    * `time' for each node. */
774   cfd.now = time (NULL);
775   cfd.keys = NULL;
776   cfd.keys_num = 0;
777
778   if (max_age > 0)
779     cfd.abs_timeout = cfd.now - max_age;
780   else
781     cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
782
783   /* `tree_callback_flush' will return the keys of all values that haven't
784    * been touched in the last `config_flush_interval' seconds in `cfd'.
785    * The char*'s in this array point to the same memory as ci->file, so we
786    * don't need to free them separately. */
787   g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
788
789   for (k = 0; k < cfd.keys_num; k++)
790   {
791     /* should never fail, since we have held the cache_lock
792      * the entire time */
793     assert( g_tree_remove(cache_tree, cfd.keys[k]) == TRUE );
794   }
795
796   if (cfd.keys != NULL)
797   {
798     free (cfd.keys);
799     cfd.keys = NULL;
800   }
801
802   return (0);
803 } /* int flush_old_values */
804
805 static void *flush_thread_main (void *args __attribute__((unused))) /* {{{ */
806 {
807   struct timeval now;
808   struct timespec next_flush;
809   int status;
810
811   gettimeofday (&now, NULL);
812   next_flush.tv_sec = now.tv_sec + config_flush_interval;
813   next_flush.tv_nsec = 1000 * now.tv_usec;
814
815   pthread_mutex_lock(&cache_lock);
816
817   while (!do_shutdown)
818   {
819     gettimeofday (&now, NULL);
820     if ((now.tv_sec > next_flush.tv_sec)
821         || ((now.tv_sec == next_flush.tv_sec)
822           && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
823     {
824       /* Flush all values that haven't been written in the last
825        * `config_write_interval' seconds. */
826       flush_old_values (config_write_interval);
827
828       /* Determine the time of the next cache flush. */
829       next_flush.tv_sec =
830         now.tv_sec + next_flush.tv_sec % config_flush_interval;
831
832       /* unlock the cache while we rotate so we don't block incoming
833        * updates if the fsync() blocks on disk I/O */
834       pthread_mutex_unlock(&cache_lock);
835       journal_rotate();
836       pthread_mutex_lock(&cache_lock);
837     }
838
839     status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
840     if (status != 0 && status != ETIMEDOUT)
841     {
842       RRDD_LOG (LOG_ERR, "flush_thread_main: "
843                 "pthread_cond_timedwait returned %i.", status);
844     }
845   }
846
847   if (config_flush_at_shutdown)
848     flush_old_values (-1); /* flush everything */
849
850   pthread_mutex_unlock(&cache_lock);
851
852   return NULL;
853 } /* void *flush_thread_main */
854
855 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
856 {
857   pthread_mutex_lock (&cache_lock);
858
859   while (!do_shutdown
860          || (cache_queue_head != NULL && config_flush_at_shutdown))
861   {
862     cache_item_t *ci;
863     char *file;
864     char **values;
865     size_t values_num;
866     int status;
867
868     /* Now, check if there's something to store away. If not, wait until
869      * something comes in.  if we are shutting down, do not wait around.  */
870     if (cache_queue_head == NULL && !do_shutdown)
871     {
872       status = pthread_cond_wait (&queue_cond, &cache_lock);
873       if ((status != 0) && (status != ETIMEDOUT))
874       {
875         RRDD_LOG (LOG_ERR, "queue_thread_main: "
876             "pthread_cond_wait returned %i.", status);
877       }
878     }
879
880     /* Check if a value has arrived. This may be NULL if we timed out or there
881      * was an interrupt such as a signal. */
882     if (cache_queue_head == NULL)
883       continue;
884
885     ci = cache_queue_head;
886
887     /* copy the relevant parts */
888     file = strdup (ci->file);
889     if (file == NULL)
890     {
891       RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
892       continue;
893     }
894
895     assert(ci->values != NULL);
896     assert(ci->values_num > 0);
897
898     values = ci->values;
899     values_num = ci->values_num;
900
901     wipe_ci_values(ci, time(NULL));
902     remove_from_queue(ci);
903
904     pthread_mutex_unlock (&cache_lock);
905
906     rrd_clear_error ();
907     status = rrd_update_r (file, NULL, (int) values_num, (void *) values);
908     if (status != 0)
909     {
910       RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
911           "rrd_update_r (%s) failed with status %i. (%s)",
912           file, status, rrd_get_error());
913     }
914
915     journal_write("wrote", file);
916
917     /* Search again in the tree.  It's possible someone issued a "FORGET"
918      * while we were writing the update values. */
919     pthread_mutex_lock(&cache_lock);
920     ci = (cache_item_t *) g_tree_lookup(cache_tree, file);
921     if (ci)
922       pthread_cond_broadcast(&ci->flushed);
923     pthread_mutex_unlock(&cache_lock);
924
925     rrd_free_ptrs((void ***) &values, &values_num);
926     free(file);
927
928     if (status == 0)
929     {
930       pthread_mutex_lock (&stats_lock);
931       stats_updates_written++;
932       stats_data_sets_written += values_num;
933       pthread_mutex_unlock (&stats_lock);
934     }
935
936     pthread_mutex_lock (&cache_lock);
937   }
938   pthread_mutex_unlock (&cache_lock);
939
940   return (NULL);
941 } /* }}} void *queue_thread_main */
942
943 static int buffer_get_field (char **buffer_ret, /* {{{ */
944     size_t *buffer_size_ret, char **field_ret)
945 {
946   char *buffer;
947   size_t buffer_pos;
948   size_t buffer_size;
949   char *field;
950   size_t field_size;
951   int status;
952
953   buffer = *buffer_ret;
954   buffer_pos = 0;
955   buffer_size = *buffer_size_ret;
956   field = *buffer_ret;
957   field_size = 0;
958
959   if (buffer_size <= 0)
960     return (-1);
961
962   /* This is ensured by `handle_request'. */
963   assert (buffer[buffer_size - 1] == '\0');
964
965   status = -1;
966   while (buffer_pos < buffer_size)
967   {
968     /* Check for end-of-field or end-of-buffer */
969     if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
970     {
971       field[field_size] = 0;
972       field_size++;
973       buffer_pos++;
974       status = 0;
975       break;
976     }
977     /* Handle escaped characters. */
978     else if (buffer[buffer_pos] == '\\')
979     {
980       if (buffer_pos >= (buffer_size - 1))
981         break;
982       buffer_pos++;
983       field[field_size] = buffer[buffer_pos];
984       field_size++;
985       buffer_pos++;
986     }
987     /* Normal operation */ 
988     else
989     {
990       field[field_size] = buffer[buffer_pos];
991       field_size++;
992       buffer_pos++;
993     }
994   } /* while (buffer_pos < buffer_size) */
995
996   if (status != 0)
997     return (status);
998
999   *buffer_ret = buffer + buffer_pos;
1000   *buffer_size_ret = buffer_size - buffer_pos;
1001   *field_ret = field;
1002
1003   return (0);
1004 } /* }}} int buffer_get_field */
1005
1006 /* if we're restricting writes to the base directory,
1007  * check whether the file falls within the dir
1008  * returns 1 if OK, otherwise 0
1009  */
1010 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
1011 {
1012   assert(file != NULL);
1013
1014   if (!config_write_base_only
1015       || sock == NULL /* journal replay */
1016       || config_base_dir == NULL)
1017     return 1;
1018
1019   if (strstr(file, "../") != NULL) goto err;
1020
1021   /* relative paths without "../" are ok */
1022   if (*file != '/') return 1;
1023
1024   /* file must be of the format base + "/" + <1+ char filename> */
1025   if (strlen(file) < _config_base_dir_len + 2) goto err;
1026   if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
1027   if (*(file + _config_base_dir_len) != '/') goto err;
1028
1029   return 1;
1030
1031 err:
1032   if (sock != NULL && sock->fd >= 0)
1033     send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1034
1035   return 0;
1036 } /* }}} static int check_file_access */
1037
1038 /* when using a base dir, convert relative paths to absolute paths.
1039  * if necessary, modifies the "filename" pointer to point
1040  * to the new path created in "tmp".  "tmp" is provided
1041  * by the caller and sizeof(tmp) must be >= PATH_MAX.
1042  *
1043  * this allows us to optimize for the expected case (absolute path)
1044  * with a no-op.
1045  */
1046 static void get_abs_path(char **filename, char *tmp)
1047 {
1048   assert(tmp != NULL);
1049   assert(filename != NULL && *filename != NULL);
1050
1051   if (config_base_dir == NULL || **filename == '/')
1052     return;
1053
1054   snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1055   *filename = tmp;
1056 } /* }}} static int get_abs_path */
1057
1058 /* returns 1 if we have the required privilege level,
1059  * otherwise issue an error to the user on sock */
1060 static int has_privilege (listen_socket_t *sock, /* {{{ */
1061                           socket_privilege priv)
1062 {
1063   if (sock == NULL) /* journal replay */
1064     return 1;
1065
1066   if (sock->privilege >= priv)
1067     return 1;
1068
1069   return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1070 } /* }}} static int has_privilege */
1071
1072 static int flush_file (const char *filename) /* {{{ */
1073 {
1074   cache_item_t *ci;
1075
1076   pthread_mutex_lock (&cache_lock);
1077
1078   ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1079   if (ci == NULL)
1080   {
1081     pthread_mutex_unlock (&cache_lock);
1082     return (ENOENT);
1083   }
1084
1085   if (ci->values_num > 0)
1086   {
1087     /* Enqueue at head */
1088     enqueue_cache_item (ci, HEAD);
1089     pthread_cond_wait(&ci->flushed, &cache_lock);
1090   }
1091
1092   /* DO NOT DO ANYTHING WITH ci HERE!!  The entry
1093    * may have been purged during our cond_wait() */
1094
1095   pthread_mutex_unlock(&cache_lock);
1096
1097   return (0);
1098 } /* }}} int flush_file */
1099
1100 static int syntax_error(listen_socket_t *sock, struct command *cmd) /* {{{ */
1101 {
1102   char *err = "Syntax error.\n";
1103
1104   if (cmd && cmd->syntax)
1105     err = cmd->syntax;
1106
1107   return send_response(sock, RESP_ERR, "Usage: %s", err);
1108 } /* }}} static int syntax_error() */
1109
1110 static int handle_request_stats (HANDLER_PROTO) /* {{{ */
1111 {
1112   uint64_t copy_queue_length;
1113   uint64_t copy_updates_received;
1114   uint64_t copy_flush_received;
1115   uint64_t copy_updates_written;
1116   uint64_t copy_data_sets_written;
1117   uint64_t copy_journal_bytes;
1118   uint64_t copy_journal_rotate;
1119
1120   uint64_t tree_nodes_number;
1121   uint64_t tree_depth;
1122
1123   pthread_mutex_lock (&stats_lock);
1124   copy_queue_length       = stats_queue_length;
1125   copy_updates_received   = stats_updates_received;
1126   copy_flush_received     = stats_flush_received;
1127   copy_updates_written    = stats_updates_written;
1128   copy_data_sets_written  = stats_data_sets_written;
1129   copy_journal_bytes      = stats_journal_bytes;
1130   copy_journal_rotate     = stats_journal_rotate;
1131   pthread_mutex_unlock (&stats_lock);
1132
1133   pthread_mutex_lock (&cache_lock);
1134   tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1135   tree_depth        = (uint64_t) g_tree_height (cache_tree);
1136   pthread_mutex_unlock (&cache_lock);
1137
1138   add_response_info(sock,
1139                     "QueueLength: %"PRIu64"\n", copy_queue_length);
1140   add_response_info(sock,
1141                     "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1142   add_response_info(sock,
1143                     "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1144   add_response_info(sock,
1145                     "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1146   add_response_info(sock,
1147                     "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1148   add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1149   add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1150   add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1151   add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1152
1153   send_response(sock, RESP_OK, "Statistics follow\n");
1154
1155   return (0);
1156 } /* }}} int handle_request_stats */
1157
1158 static int handle_request_flush (HANDLER_PROTO) /* {{{ */
1159 {
1160   char *file, file_tmp[PATH_MAX];
1161   int status;
1162
1163   status = buffer_get_field (&buffer, &buffer_size, &file);
1164   if (status != 0)
1165   {
1166     return syntax_error(sock,cmd);
1167   }
1168   else
1169   {
1170     pthread_mutex_lock(&stats_lock);
1171     stats_flush_received++;
1172     pthread_mutex_unlock(&stats_lock);
1173
1174     get_abs_path(&file, file_tmp);
1175     if (!check_file_access(file, sock)) return 0;
1176
1177     status = flush_file (file);
1178     if (status == 0)
1179       return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1180     else if (status == ENOENT)
1181     {
1182       /* no file in our tree; see whether it exists at all */
1183       struct stat statbuf;
1184
1185       memset(&statbuf, 0, sizeof(statbuf));
1186       if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1187         return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1188       else
1189         return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1190     }
1191     else if (status < 0)
1192       return send_response(sock, RESP_ERR, "Internal error.\n");
1193     else
1194       return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1195   }
1196
1197   /* NOTREACHED */
1198   assert(1==0);
1199 } /* }}} int handle_request_flush */
1200
1201 static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
1202 {
1203   RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1204
1205   pthread_mutex_lock(&cache_lock);
1206   flush_old_values(-1);
1207   pthread_mutex_unlock(&cache_lock);
1208
1209   return send_response(sock, RESP_OK, "Started flush.\n");
1210 } /* }}} static int handle_request_flushall */
1211
1212 static int handle_request_pending(HANDLER_PROTO) /* {{{ */
1213 {
1214   int status;
1215   char *file, file_tmp[PATH_MAX];
1216   cache_item_t *ci;
1217
1218   status = buffer_get_field(&buffer, &buffer_size, &file);
1219   if (status != 0)
1220     return syntax_error(sock,cmd);
1221
1222   get_abs_path(&file, file_tmp);
1223
1224   pthread_mutex_lock(&cache_lock);
1225   ci = g_tree_lookup(cache_tree, file);
1226   if (ci == NULL)
1227   {
1228     pthread_mutex_unlock(&cache_lock);
1229     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1230   }
1231
1232   for (size_t i=0; i < ci->values_num; i++)
1233     add_response_info(sock, "%s\n", ci->values[i]);
1234
1235   pthread_mutex_unlock(&cache_lock);
1236   return send_response(sock, RESP_OK, "updates pending\n");
1237 } /* }}} static int handle_request_pending */
1238
1239 static int handle_request_forget(HANDLER_PROTO) /* {{{ */
1240 {
1241   int status;
1242   gboolean found;
1243   char *file, file_tmp[PATH_MAX];
1244
1245   status = buffer_get_field(&buffer, &buffer_size, &file);
1246   if (status != 0)
1247     return syntax_error(sock,cmd);
1248
1249   get_abs_path(&file, file_tmp);
1250   if (!check_file_access(file, sock)) return 0;
1251
1252   pthread_mutex_lock(&cache_lock);
1253   found = g_tree_remove(cache_tree, file);
1254   pthread_mutex_unlock(&cache_lock);
1255
1256   if (found == TRUE)
1257   {
1258     if (sock != NULL)
1259       journal_write("forget", file);
1260
1261     return send_response(sock, RESP_OK, "Gone!\n");
1262   }
1263   else
1264     return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1265
1266   /* NOTREACHED */
1267   assert(1==0);
1268 } /* }}} static int handle_request_forget */
1269
1270 static int handle_request_queue (HANDLER_PROTO) /* {{{ */
1271 {
1272   cache_item_t *ci;
1273
1274   pthread_mutex_lock(&cache_lock);
1275
1276   ci = cache_queue_head;
1277   while (ci != NULL)
1278   {
1279     add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1280     ci = ci->next;
1281   }
1282
1283   pthread_mutex_unlock(&cache_lock);
1284
1285   return send_response(sock, RESP_OK, "in queue.\n");
1286 } /* }}} int handle_request_queue */
1287
1288 static int handle_request_update (HANDLER_PROTO) /* {{{ */
1289 {
1290   char *file, file_tmp[PATH_MAX];
1291   int values_num = 0;
1292   int status;
1293   char orig_buf[CMD_MAX];
1294
1295   cache_item_t *ci;
1296
1297   /* save it for the journal later */
1298   strncpy(orig_buf, buffer, sizeof(orig_buf)-1);
1299
1300   status = buffer_get_field (&buffer, &buffer_size, &file);
1301   if (status != 0)
1302     return syntax_error(sock,cmd);
1303
1304   pthread_mutex_lock(&stats_lock);
1305   stats_updates_received++;
1306   pthread_mutex_unlock(&stats_lock);
1307
1308   get_abs_path(&file, file_tmp);
1309   if (!check_file_access(file, sock)) return 0;
1310
1311   pthread_mutex_lock (&cache_lock);
1312   ci = g_tree_lookup (cache_tree, file);
1313
1314   if (ci == NULL) /* {{{ */
1315   {
1316     struct stat statbuf;
1317
1318     /* don't hold the lock while we setup; stat(2) might block */
1319     pthread_mutex_unlock(&cache_lock);
1320
1321     memset (&statbuf, 0, sizeof (statbuf));
1322     status = stat (file, &statbuf);
1323     if (status != 0)
1324     {
1325       RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1326
1327       status = errno;
1328       if (status == ENOENT)
1329         return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1330       else
1331         return send_response(sock, RESP_ERR,
1332                              "stat failed with error %i.\n", status);
1333     }
1334     if (!S_ISREG (statbuf.st_mode))
1335       return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1336
1337     if (access(file, R_OK|W_OK) != 0)
1338       return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1339                            file, rrd_strerror(errno));
1340
1341     ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1342     if (ci == NULL)
1343     {
1344       RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1345
1346       return send_response(sock, RESP_ERR, "malloc failed.\n");
1347     }
1348     memset (ci, 0, sizeof (cache_item_t));
1349
1350     ci->file = strdup (file);
1351     if (ci->file == NULL)
1352     {
1353       free (ci);
1354       RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1355
1356       return send_response(sock, RESP_ERR, "strdup failed.\n");
1357     }
1358
1359     wipe_ci_values(ci, now);
1360     ci->flags = CI_FLAGS_IN_TREE;
1361     pthread_cond_init(&ci->flushed, NULL);
1362
1363     pthread_mutex_lock(&cache_lock);
1364     g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1365   } /* }}} */
1366   assert (ci != NULL);
1367
1368   /* don't re-write updates in replay mode */
1369   if (sock != NULL)
1370     journal_write("update", orig_buf);
1371
1372   while (buffer_size > 0)
1373   {
1374     char *value;
1375     time_t stamp;
1376     char *eostamp;
1377
1378     status = buffer_get_field (&buffer, &buffer_size, &value);
1379     if (status != 0)
1380     {
1381       RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1382       break;
1383     }
1384
1385     /* make sure update time is always moving forward */
1386     stamp = strtol(value, &eostamp, 10);
1387     if (eostamp == value || eostamp == NULL || *eostamp != ':')
1388     {
1389       pthread_mutex_unlock(&cache_lock);
1390       return send_response(sock, RESP_ERR,
1391                            "Cannot find timestamp in '%s'!\n", value);
1392     }
1393     else if (stamp <= ci->last_update_stamp)
1394     {
1395       pthread_mutex_unlock(&cache_lock);
1396       return send_response(sock, RESP_ERR,
1397                            "illegal attempt to update using time %ld when last"
1398                            " update time is %ld (minimum one second step)\n",
1399                            stamp, ci->last_update_stamp);
1400     }
1401     else
1402       ci->last_update_stamp = stamp;
1403
1404     if (!rrd_add_strdup(&ci->values, &ci->values_num, value))
1405     {
1406       RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed.");
1407       continue;
1408     }
1409
1410     values_num++;
1411   }
1412
1413   if (((now - ci->last_flush_time) >= config_write_interval)
1414       && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1415       && (ci->values_num > 0))
1416   {
1417     enqueue_cache_item (ci, TAIL);
1418   }
1419
1420   pthread_mutex_unlock (&cache_lock);
1421
1422   if (values_num < 1)
1423     return send_response(sock, RESP_ERR, "No values updated.\n");
1424   else
1425     return send_response(sock, RESP_OK,
1426                          "errors, enqueued %i value(s).\n", values_num);
1427
1428   /* NOTREACHED */
1429   assert(1==0);
1430
1431 } /* }}} int handle_request_update */
1432
1433 /* we came across a "WROTE" entry during journal replay.
1434  * throw away any values that we have accumulated for this file
1435  */
1436 static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
1437 {
1438   cache_item_t *ci;
1439   const char *file = buffer;
1440
1441   pthread_mutex_lock(&cache_lock);
1442
1443   ci = g_tree_lookup(cache_tree, file);
1444   if (ci == NULL)
1445   {
1446     pthread_mutex_unlock(&cache_lock);
1447     return (0);
1448   }
1449
1450   if (ci->values)
1451     rrd_free_ptrs((void ***) &ci->values, &ci->values_num);
1452
1453   wipe_ci_values(ci, now);
1454   remove_from_queue(ci);
1455
1456   pthread_mutex_unlock(&cache_lock);
1457   return (0);
1458 } /* }}} int handle_request_wrote */
1459
1460 /* start "BATCH" processing */
1461 static int batch_start (HANDLER_PROTO) /* {{{ */
1462 {
1463   int status;
1464   if (sock->batch_start)
1465     return send_response(sock, RESP_ERR, "Already in BATCH\n");
1466
1467   status = send_response(sock, RESP_OK,
1468                          "Go ahead.  End with dot '.' on its own line.\n");
1469   sock->batch_start = time(NULL);
1470   sock->batch_cmd = 0;
1471
1472   return status;
1473 } /* }}} static int batch_start */
1474
1475 /* finish "BATCH" processing and return results to the client */
1476 static int batch_done (HANDLER_PROTO) /* {{{ */
1477 {
1478   assert(sock->batch_start);
1479   sock->batch_start = 0;
1480   sock->batch_cmd  = 0;
1481   return send_response(sock, RESP_OK, "errors\n");
1482 } /* }}} static int batch_done */
1483
1484 static int handle_request_quit (HANDLER_PROTO) /* {{{ */
1485 {
1486   return -1;
1487 } /* }}} static int handle_request_quit */
1488
1489 struct command COMMANDS[] = {
1490   {
1491     "UPDATE",
1492     handle_request_update,
1493     PRIV_HIGH,
1494     CMD_CONTEXT_ANY,
1495     "UPDATE <filename> <values> [<values> ...]\n"
1496     ,
1497     "Adds the given file to the internal cache if it is not yet known and\n"
1498     "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1499     "for details.\n"
1500     "\n"
1501     "Each <values> has the following form:\n"
1502     "  <values> = <time>:<value>[:<value>[...]]\n"
1503     "See the rrdupdate(1) manpage for details.\n"
1504   },
1505   {
1506     "WROTE",
1507     handle_request_wrote,
1508     PRIV_HIGH,
1509     CMD_CONTEXT_JOURNAL,
1510     NULL,
1511     NULL
1512   },
1513   {
1514     "FLUSH",
1515     handle_request_flush,
1516     PRIV_LOW,
1517     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1518     "FLUSH <filename>\n"
1519     ,
1520     "Adds the given filename to the head of the update queue and returns\n"
1521     "after it has been dequeued.\n"
1522   },
1523   {
1524     "FLUSHALL",
1525     handle_request_flushall,
1526     PRIV_HIGH,
1527     CMD_CONTEXT_CLIENT,
1528     "FLUSHALL\n"
1529     ,
1530     "Triggers writing of all pending updates.  Returns immediately.\n"
1531   },
1532   {
1533     "PENDING",
1534     handle_request_pending,
1535     PRIV_HIGH,
1536     CMD_CONTEXT_CLIENT,
1537     "PENDING <filename>\n"
1538     ,
1539     "Shows any 'pending' updates for a file, in order.\n"
1540     "The updates shown have not yet been written to the underlying RRD file.\n"
1541   },
1542   {
1543     "FORGET",
1544     handle_request_forget,
1545     PRIV_HIGH,
1546     CMD_CONTEXT_ANY,
1547     "FORGET <filename>\n"
1548     ,
1549     "Removes the file completely from the cache.\n"
1550     "Any pending updates for the file will be lost.\n"
1551   },
1552   {
1553     "QUEUE",
1554     handle_request_queue,
1555     PRIV_LOW,
1556     CMD_CONTEXT_CLIENT,
1557     "QUEUE\n"
1558     ,
1559         "Shows all files in the output queue.\n"
1560     "The output is zero or more lines in the following format:\n"
1561     "(where <num_vals> is the number of values to be written)\n"
1562     "\n"
1563     "<num_vals> <filename>\n"
1564   },
1565   {
1566     "STATS",
1567     handle_request_stats,
1568     PRIV_LOW,
1569     CMD_CONTEXT_CLIENT,
1570     "STATS\n"
1571     ,
1572     "Returns some performance counters, see the rrdcached(1) manpage for\n"
1573     "a description of the values.\n"
1574   },
1575   {
1576     "HELP",
1577     handle_request_help,
1578     PRIV_LOW,
1579     CMD_CONTEXT_CLIENT,
1580     "HELP [<command>]\n",
1581     NULL, /* special! */
1582   },
1583   {
1584     "BATCH",
1585     batch_start,
1586     PRIV_LOW,
1587     CMD_CONTEXT_CLIENT,
1588     "BATCH\n"
1589     ,
1590     "The 'BATCH' command permits the client to initiate a bulk load\n"
1591     "   of commands to rrdcached.\n"
1592     "\n"
1593     "Usage:\n"
1594     "\n"
1595     "    client: BATCH\n"
1596     "    server: 0 Go ahead.  End with dot '.' on its own line.\n"
1597     "    client: command #1\n"
1598     "    client: command #2\n"
1599     "    client: ... and so on\n"
1600     "    client: .\n"
1601     "    server: 2 errors\n"
1602     "    server: 7 message for command #7\n"
1603     "    server: 9 message for command #9\n"
1604     "\n"
1605     "For more information, consult the rrdcached(1) documentation.\n"
1606   },
1607   {
1608     ".",   /* BATCH terminator */
1609     batch_done,
1610     PRIV_LOW,
1611     CMD_CONTEXT_BATCH,
1612     NULL,
1613     NULL
1614   },
1615   {
1616     "QUIT",
1617     handle_request_quit,
1618     PRIV_LOW,
1619     CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1620     "QUIT\n"
1621     ,
1622     "Disconnect from rrdcached.\n"
1623   },
1624   {NULL,NULL,0,0,NULL,NULL}  /* LAST ENTRY */
1625 };
1626
1627 static struct command *find_command(char *cmd)
1628 {
1629   struct command *c = COMMANDS;
1630
1631   while (c->cmd != NULL)
1632   {
1633     if (strcasecmp(cmd, c->cmd) == 0)
1634       break;
1635     c++;
1636   }
1637
1638   if (c->cmd == NULL)
1639     return NULL;
1640   else
1641     return c;
1642 }
1643
1644 /* check whether commands are received in the expected context */
1645 static int command_check_context(listen_socket_t *sock, struct command *cmd)
1646 {
1647   if (sock == NULL)
1648     return (cmd->context & CMD_CONTEXT_JOURNAL);
1649   else if (sock->batch_start)
1650     return (cmd->context & CMD_CONTEXT_BATCH);
1651   else
1652     return (cmd->context & CMD_CONTEXT_CLIENT);
1653
1654   /* NOTREACHED */
1655   assert(1==0);
1656 }
1657
1658 static int handle_request_help (HANDLER_PROTO) /* {{{ */
1659 {
1660   int status;
1661   char *cmd_str;
1662   char *resp_txt;
1663   struct command *help = NULL;
1664
1665   status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
1666   if (status == 0)
1667     help = find_command(cmd_str);
1668
1669   if (help && (help->syntax || help->help))
1670   {
1671     char tmp[CMD_MAX];
1672
1673     snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
1674     resp_txt = tmp;
1675
1676     if (help->syntax)
1677       add_response_info(sock, "Usage: %s\n", help->syntax);
1678
1679     if (help->help)
1680       add_response_info(sock, "%s\n", help->help);
1681   }
1682   else
1683   {
1684     help = COMMANDS;
1685     resp_txt = "Command overview\n";
1686
1687     while (help->cmd)
1688     {
1689       if (help->syntax)
1690         add_response_info(sock, "%s", help->syntax);
1691       help++;
1692     }
1693   }
1694
1695   return send_response(sock, RESP_OK, resp_txt);
1696 } /* }}} int handle_request_help */
1697
1698 /* if sock==NULL, we are in journal replay mode */
1699 static int handle_request (DISPATCH_PROTO) /* {{{ */
1700 {
1701   char *buffer_ptr = buffer;
1702   char *cmd_str = NULL;
1703   struct command *cmd = NULL;
1704   int status;
1705
1706   assert (buffer[buffer_size - 1] == '\0');
1707
1708   status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
1709   if (status != 0)
1710   {
1711     RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1712     return (-1);
1713   }
1714
1715   if (sock != NULL && sock->batch_start)
1716     sock->batch_cmd++;
1717
1718   cmd = find_command(cmd_str);
1719   if (!cmd)
1720     return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
1721
1722   status = has_privilege(sock, cmd->min_priv);
1723   if (status <= 0)
1724     return status;
1725
1726   if (!command_check_context(sock, cmd))
1727     return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
1728
1729   return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
1730 } /* }}} int handle_request */
1731
1732 /* MUST NOT hold journal_lock before calling this */
1733 static void journal_rotate(void) /* {{{ */
1734 {
1735   FILE *old_fh = NULL;
1736   int new_fd;
1737
1738   if (journal_cur == NULL || journal_old == NULL)
1739     return;
1740
1741   pthread_mutex_lock(&journal_lock);
1742
1743   /* we rotate this way (rename before close) so that the we can release
1744    * the journal lock as fast as possible.  Journal writes to the new
1745    * journal can proceed immediately after the new file is opened.  The
1746    * fclose can then block without affecting new updates.
1747    */
1748   if (journal_fh != NULL)
1749   {
1750     old_fh = journal_fh;
1751     journal_fh = NULL;
1752     rename(journal_cur, journal_old);
1753     ++stats_journal_rotate;
1754   }
1755
1756   new_fd = open(journal_cur, O_WRONLY|O_CREAT|O_APPEND,
1757                 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1758   if (new_fd >= 0)
1759   {
1760     journal_fh = fdopen(new_fd, "a");
1761     if (journal_fh == NULL)
1762       close(new_fd);
1763   }
1764
1765   pthread_mutex_unlock(&journal_lock);
1766
1767   if (old_fh != NULL)
1768     fclose(old_fh);
1769
1770   if (journal_fh == NULL)
1771   {
1772     RRDD_LOG(LOG_CRIT,
1773              "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1774              journal_cur, rrd_strerror(errno));
1775
1776     RRDD_LOG(LOG_ERR,
1777              "JOURNALING DISABLED: All values will be flushed at shutdown");
1778     config_flush_at_shutdown = 1;
1779   }
1780
1781 } /* }}} static void journal_rotate */
1782
1783 static void journal_done(void) /* {{{ */
1784 {
1785   if (journal_cur == NULL)
1786     return;
1787
1788   pthread_mutex_lock(&journal_lock);
1789   if (journal_fh != NULL)
1790   {
1791     fclose(journal_fh);
1792     journal_fh = NULL;
1793   }
1794
1795   if (config_flush_at_shutdown)
1796   {
1797     RRDD_LOG(LOG_INFO, "removing journals");
1798     unlink(journal_old);
1799     unlink(journal_cur);
1800   }
1801   else
1802   {
1803     RRDD_LOG(LOG_INFO, "expedited shutdown; "
1804              "journals will be used at next startup");
1805   }
1806
1807   pthread_mutex_unlock(&journal_lock);
1808
1809 } /* }}} static void journal_done */
1810
1811 static int journal_write(char *cmd, char *args) /* {{{ */
1812 {
1813   int chars;
1814
1815   if (journal_fh == NULL)
1816     return 0;
1817
1818   pthread_mutex_lock(&journal_lock);
1819   chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1820   pthread_mutex_unlock(&journal_lock);
1821
1822   if (chars > 0)
1823   {
1824     pthread_mutex_lock(&stats_lock);
1825     stats_journal_bytes += chars;
1826     pthread_mutex_unlock(&stats_lock);
1827   }
1828
1829   return chars;
1830 } /* }}} static int journal_write */
1831
1832 static int journal_replay (const char *file) /* {{{ */
1833 {
1834   FILE *fh;
1835   int entry_cnt = 0;
1836   int fail_cnt = 0;
1837   uint64_t line = 0;
1838   char entry[CMD_MAX];
1839   time_t now;
1840
1841   if (file == NULL) return 0;
1842
1843   {
1844     char *reason = "unknown error";
1845     int status = 0;
1846     struct stat statbuf;
1847
1848     memset(&statbuf, 0, sizeof(statbuf));
1849     if (stat(file, &statbuf) != 0)
1850     {
1851       if (errno == ENOENT)
1852         return 0;
1853
1854       reason = "stat error";
1855       status = errno;
1856     }
1857     else if (!S_ISREG(statbuf.st_mode))
1858     {
1859       reason = "not a regular file";
1860       status = EPERM;
1861     }
1862     if (statbuf.st_uid != daemon_uid)
1863     {
1864       reason = "not owned by daemon user";
1865       status = EACCES;
1866     }
1867     if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
1868     {
1869       reason = "must not be user/group writable";
1870       status = EACCES;
1871     }
1872
1873     if (status != 0)
1874     {
1875       RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
1876                file, rrd_strerror(status), reason);
1877       return 0;
1878     }
1879   }
1880
1881   fh = fopen(file, "r");
1882   if (fh == NULL)
1883   {
1884     if (errno != ENOENT)
1885       RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1886                file, rrd_strerror(errno));
1887     return 0;
1888   }
1889   else
1890     RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1891
1892   now = time(NULL);
1893
1894   while(!feof(fh))
1895   {
1896     size_t entry_len;
1897
1898     ++line;
1899     if (fgets(entry, sizeof(entry), fh) == NULL)
1900       break;
1901     entry_len = strlen(entry);
1902
1903     /* check \n termination in case journal writing crashed mid-line */
1904     if (entry_len == 0)
1905       continue;
1906     else if (entry[entry_len - 1] != '\n')
1907     {
1908       RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1909       ++fail_cnt;
1910       continue;
1911     }
1912
1913     entry[entry_len - 1] = '\0';
1914
1915     if (handle_request(NULL, now, entry, entry_len) == 0)
1916       ++entry_cnt;
1917     else
1918       ++fail_cnt;
1919   }
1920
1921   fclose(fh);
1922
1923   RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1924            entry_cnt, fail_cnt);
1925
1926   return entry_cnt > 0 ? 1 : 0;
1927 } /* }}} static int journal_replay */
1928
1929 static void journal_init(void) /* {{{ */
1930 {
1931   int had_journal = 0;
1932
1933   if (journal_cur == NULL) return;
1934
1935   pthread_mutex_lock(&journal_lock);
1936
1937   RRDD_LOG(LOG_INFO, "checking for journal files");
1938
1939   had_journal += journal_replay(journal_old);
1940   had_journal += journal_replay(journal_cur);
1941
1942   /* it must have been a crash.  start a flush */
1943   if (had_journal && config_flush_at_shutdown)
1944     flush_old_values(-1);
1945
1946   pthread_mutex_unlock(&journal_lock);
1947   journal_rotate();
1948
1949   RRDD_LOG(LOG_INFO, "journal processing complete");
1950
1951 } /* }}} static void journal_init */
1952
1953 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
1954 {
1955   assert(sock != NULL);
1956
1957   free(sock->rbuf);  sock->rbuf = NULL;
1958   free(sock->wbuf);  sock->wbuf = NULL;
1959   free(sock);
1960 } /* }}} void free_listen_socket */
1961
1962 static void close_connection(listen_socket_t *sock) /* {{{ */
1963 {
1964   if (sock->fd >= 0)
1965   {
1966     close(sock->fd);
1967     sock->fd = -1;
1968   }
1969
1970   free_listen_socket(sock);
1971
1972 } /* }}} void close_connection */
1973
1974 static void *connection_thread_main (void *args) /* {{{ */
1975 {
1976   listen_socket_t *sock;
1977   int fd;
1978
1979   sock = (listen_socket_t *) args;
1980   fd = sock->fd;
1981
1982   /* init read buffers */
1983   sock->next_read = sock->next_cmd = 0;
1984   sock->rbuf = malloc(RBUF_SIZE);
1985   if (sock->rbuf == NULL)
1986   {
1987     RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
1988     close_connection(sock);
1989     return NULL;
1990   }
1991
1992   pthread_mutex_lock (&connection_threads_lock);
1993   connection_threads_num++;
1994   pthread_mutex_unlock (&connection_threads_lock);
1995
1996   while (do_shutdown == 0)
1997   {
1998     char *cmd;
1999     ssize_t cmd_len;
2000     ssize_t rbytes;
2001     time_t now;
2002
2003     struct pollfd pollfd;
2004     int status;
2005
2006     pollfd.fd = fd;
2007     pollfd.events = POLLIN | POLLPRI;
2008     pollfd.revents = 0;
2009
2010     status = poll (&pollfd, 1, /* timeout = */ 500);
2011     if (do_shutdown)
2012       break;
2013     else if (status == 0) /* timeout */
2014       continue;
2015     else if (status < 0) /* error */
2016     {
2017       status = errno;
2018       if (status != EINTR)
2019         RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2020       continue;
2021     }
2022
2023     if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2024       break;
2025     else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2026     {
2027       RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2028           "poll(2) returned something unexpected: %#04hx",
2029           pollfd.revents);
2030       break;
2031     }
2032
2033     rbytes = read(fd, sock->rbuf + sock->next_read,
2034                   RBUF_SIZE - sock->next_read);
2035     if (rbytes < 0)
2036     {
2037       RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2038       break;
2039     }
2040     else if (rbytes == 0)
2041       break; /* eof */
2042
2043     sock->next_read += rbytes;
2044
2045     if (sock->batch_start)
2046       now = sock->batch_start;
2047     else
2048       now = time(NULL);
2049
2050     while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2051     {
2052       status = handle_request (sock, now, cmd, cmd_len+1);
2053       if (status != 0)
2054         goto out_close;
2055     }
2056   }
2057
2058 out_close:
2059   close_connection(sock);
2060
2061   /* Remove this thread from the connection threads list */
2062   pthread_mutex_lock (&connection_threads_lock);
2063   connection_threads_num--;
2064   if (connection_threads_num <= 0)
2065     pthread_cond_broadcast(&connection_threads_done);
2066   pthread_mutex_unlock (&connection_threads_lock);
2067
2068   return (NULL);
2069 } /* }}} void *connection_thread_main */
2070
2071 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2072 {
2073   int fd;
2074   struct sockaddr_un sa;
2075   listen_socket_t *temp;
2076   int status;
2077   const char *path;
2078
2079   path = sock->addr;
2080   if (strncmp(path, "unix:", strlen("unix:")) == 0)
2081     path += strlen("unix:");
2082
2083   temp = (listen_socket_t *) rrd_realloc (listen_fds,
2084       sizeof (listen_fds[0]) * (listen_fds_num + 1));
2085   if (temp == NULL)
2086   {
2087     fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2088     return (-1);
2089   }
2090   listen_fds = temp;
2091   memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2092
2093   fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2094   if (fd < 0)
2095   {
2096     fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2097              rrd_strerror(errno));
2098     return (-1);
2099   }
2100
2101   memset (&sa, 0, sizeof (sa));
2102   sa.sun_family = AF_UNIX;
2103   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2104
2105   /* if we've gotten this far, we own the pid file.  any daemon started
2106    * with the same args must not be alive.  therefore, ensure that we can
2107    * create the socket...
2108    */
2109   unlink(path);
2110
2111   status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2112   if (status != 0)
2113   {
2114     fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2115              path, rrd_strerror(errno));
2116     close (fd);
2117     return (-1);
2118   }
2119
2120   status = listen (fd, /* backlog = */ 10);
2121   if (status != 0)
2122   {
2123     fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2124              path, rrd_strerror(errno));
2125     close (fd);
2126     unlink (path);
2127     return (-1);
2128   }
2129
2130   listen_fds[listen_fds_num].fd = fd;
2131   listen_fds[listen_fds_num].family = PF_UNIX;
2132   strncpy(listen_fds[listen_fds_num].addr, path,
2133           sizeof (listen_fds[listen_fds_num].addr) - 1);
2134   listen_fds_num++;
2135
2136   return (0);
2137 } /* }}} int open_listen_socket_unix */
2138
2139 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2140 {
2141   struct addrinfo ai_hints;
2142   struct addrinfo *ai_res;
2143   struct addrinfo *ai_ptr;
2144   char addr_copy[NI_MAXHOST];
2145   char *addr;
2146   char *port;
2147   int status;
2148
2149   strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
2150   addr_copy[sizeof (addr_copy) - 1] = 0;
2151   addr = addr_copy;
2152
2153   memset (&ai_hints, 0, sizeof (ai_hints));
2154   ai_hints.ai_flags = 0;
2155 #ifdef AI_ADDRCONFIG
2156   ai_hints.ai_flags |= AI_ADDRCONFIG;
2157 #endif
2158   ai_hints.ai_family = AF_UNSPEC;
2159   ai_hints.ai_socktype = SOCK_STREAM;
2160
2161   port = NULL;
2162   if (*addr == '[') /* IPv6+port format */
2163   {
2164     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2165     addr++;
2166
2167     port = strchr (addr, ']');
2168     if (port == NULL)
2169     {
2170       fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
2171       return (-1);
2172     }
2173     *port = 0;
2174     port++;
2175
2176     if (*port == ':')
2177       port++;
2178     else if (*port == 0)
2179       port = NULL;
2180     else
2181     {
2182       fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
2183       return (-1);
2184     }
2185   } /* if (*addr = ']') */
2186   else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */
2187   {
2188     port = rindex(addr, ':');
2189     if (port != NULL)
2190     {
2191       *port = 0;
2192       port++;
2193     }
2194   }
2195   ai_res = NULL;
2196   status = getaddrinfo (addr,
2197                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2198                         &ai_hints, &ai_res);
2199   if (status != 0)
2200   {
2201     fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
2202              addr, gai_strerror (status));
2203     return (-1);
2204   }
2205
2206   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2207   {
2208     int fd;
2209     listen_socket_t *temp;
2210     int one = 1;
2211
2212     temp = (listen_socket_t *) rrd_realloc (listen_fds,
2213         sizeof (listen_fds[0]) * (listen_fds_num + 1));
2214     if (temp == NULL)
2215     {
2216       fprintf (stderr,
2217                "rrdcached: open_listen_socket_network: realloc failed.\n");
2218       continue;
2219     }
2220     listen_fds = temp;
2221     memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2222
2223     fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2224     if (fd < 0)
2225     {
2226       fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
2227                rrd_strerror(errno));
2228       continue;
2229     }
2230
2231     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2232
2233     status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2234     if (status != 0)
2235     {
2236       fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2237                sock->addr, rrd_strerror(errno));
2238       close (fd);
2239       continue;
2240     }
2241
2242     status = listen (fd, /* backlog = */ 10);
2243     if (status != 0)
2244     {
2245       fprintf (stderr, "rrdcached: listen(%s) failed: %s\n.",
2246                sock->addr, rrd_strerror(errno));
2247       close (fd);
2248       freeaddrinfo(ai_res);
2249       return (-1);
2250     }
2251
2252     listen_fds[listen_fds_num].fd = fd;
2253     listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2254     listen_fds_num++;
2255   } /* for (ai_ptr) */
2256
2257   freeaddrinfo(ai_res);
2258   return (0);
2259 } /* }}} static int open_listen_socket_network */
2260
2261 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2262 {
2263   assert(sock != NULL);
2264   assert(sock->addr != NULL);
2265
2266   if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2267       || sock->addr[0] == '/')
2268     return (open_listen_socket_unix(sock));
2269   else
2270     return (open_listen_socket_network(sock));
2271 } /* }}} int open_listen_socket */
2272
2273 static int close_listen_sockets (void) /* {{{ */
2274 {
2275   size_t i;
2276
2277   for (i = 0; i < listen_fds_num; i++)
2278   {
2279     close (listen_fds[i].fd);
2280
2281     if (listen_fds[i].family == PF_UNIX)
2282       unlink(listen_fds[i].addr);
2283   }
2284
2285   free (listen_fds);
2286   listen_fds = NULL;
2287   listen_fds_num = 0;
2288
2289   return (0);
2290 } /* }}} int close_listen_sockets */
2291
2292 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
2293 {
2294   struct pollfd *pollfds;
2295   int pollfds_num;
2296   int status;
2297   int i;
2298
2299   if (listen_fds_num < 1)
2300   {
2301     RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2302     return (NULL);
2303   }
2304
2305   pollfds_num = listen_fds_num;
2306   pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2307   if (pollfds == NULL)
2308   {
2309     RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2310     return (NULL);
2311   }
2312   memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2313
2314   RRDD_LOG(LOG_INFO, "listening for connections");
2315
2316   while (do_shutdown == 0)
2317   {
2318     for (i = 0; i < pollfds_num; i++)
2319     {
2320       pollfds[i].fd = listen_fds[i].fd;
2321       pollfds[i].events = POLLIN | POLLPRI;
2322       pollfds[i].revents = 0;
2323     }
2324
2325     status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2326     if (do_shutdown)
2327       break;
2328     else if (status == 0) /* timeout */
2329       continue;
2330     else if (status < 0) /* error */
2331     {
2332       status = errno;
2333       if (status != EINTR)
2334       {
2335         RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
2336       }
2337       continue;
2338     }
2339
2340     for (i = 0; i < pollfds_num; i++)
2341     {
2342       listen_socket_t *client_sock;
2343       struct sockaddr_storage client_sa;
2344       socklen_t client_sa_size;
2345       pthread_t tid;
2346       pthread_attr_t attr;
2347
2348       if (pollfds[i].revents == 0)
2349         continue;
2350
2351       if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2352       {
2353         RRDD_LOG (LOG_ERR, "listen_thread_main: "
2354             "poll(2) returned something unexpected for listen FD #%i.",
2355             pollfds[i].fd);
2356         continue;
2357       }
2358
2359       client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2360       if (client_sock == NULL)
2361       {
2362         RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2363         continue;
2364       }
2365       memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2366
2367       client_sa_size = sizeof (client_sa);
2368       client_sock->fd = accept (pollfds[i].fd,
2369           (struct sockaddr *) &client_sa, &client_sa_size);
2370       if (client_sock->fd < 0)
2371       {
2372         RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
2373         free(client_sock);
2374         continue;
2375       }
2376
2377       pthread_attr_init (&attr);
2378       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2379
2380       status = pthread_create (&tid, &attr, connection_thread_main,
2381                                client_sock);
2382       if (status != 0)
2383       {
2384         RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2385         close_connection(client_sock);
2386         continue;
2387       }
2388     } /* for (pollfds_num) */
2389   } /* while (do_shutdown == 0) */
2390
2391   RRDD_LOG(LOG_INFO, "starting shutdown");
2392
2393   close_listen_sockets ();
2394
2395   pthread_mutex_lock (&connection_threads_lock);
2396   while (connection_threads_num > 0)
2397     pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
2398   pthread_mutex_unlock (&connection_threads_lock);
2399
2400   free(pollfds);
2401
2402   return (NULL);
2403 } /* }}} void *listen_thread_main */
2404
2405 static int daemonize (void) /* {{{ */
2406 {
2407   int pid_fd;
2408   char *base_dir;
2409
2410   daemon_uid = geteuid();
2411
2412   pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
2413   if (pid_fd < 0)
2414     pid_fd = check_pidfile();
2415   if (pid_fd < 0)
2416     return pid_fd;
2417
2418   /* open all the listen sockets */
2419   if (config_listen_address_list_len > 0)
2420   {
2421     for (size_t i = 0; i < config_listen_address_list_len; i++)
2422       open_listen_socket (config_listen_address_list[i]);
2423
2424     rrd_free_ptrs((void ***) &config_listen_address_list,
2425                   &config_listen_address_list_len);
2426   }
2427   else
2428   {
2429     listen_socket_t sock;
2430     memset(&sock, 0, sizeof(sock));
2431     strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr)-1);
2432     open_listen_socket (&sock);
2433   }
2434
2435   if (listen_fds_num < 1)
2436   {
2437     fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
2438     goto error;
2439   }
2440
2441   if (!stay_foreground)
2442   {
2443     pid_t child;
2444
2445     child = fork ();
2446     if (child < 0)
2447     {
2448       fprintf (stderr, "daemonize: fork(2) failed.\n");
2449       goto error;
2450     }
2451     else if (child > 0)
2452       exit(0);
2453
2454     /* Become session leader */
2455     setsid ();
2456
2457     /* Open the first three file descriptors to /dev/null */
2458     close (2);
2459     close (1);
2460     close (0);
2461
2462     open ("/dev/null", O_RDWR);
2463     if (dup(0) == -1 || dup(0) == -1){
2464         RRDD_LOG (LOG_ERR, "faild to run dup.\n");
2465     }
2466   } /* if (!stay_foreground) */
2467
2468   /* Change into the /tmp directory. */
2469   base_dir = (config_base_dir != NULL)
2470     ? config_base_dir
2471     : "/tmp";
2472
2473   if (chdir (base_dir) != 0)
2474   {
2475     fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2476     goto error;
2477   }
2478
2479   install_signal_handlers();
2480
2481   openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2482   RRDD_LOG(LOG_INFO, "starting up");
2483
2484   cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
2485                                 (GDestroyNotify) free_cache_item);
2486   if (cache_tree == NULL)
2487   {
2488     RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2489     goto error;
2490   }
2491
2492   return write_pidfile (pid_fd);
2493
2494 error:
2495   remove_pidfile();
2496   return -1;
2497 } /* }}} int daemonize */
2498
2499 static int cleanup (void) /* {{{ */
2500 {
2501   do_shutdown++;
2502
2503   pthread_cond_broadcast (&flush_cond);
2504   pthread_join (flush_thread, NULL);
2505
2506   pthread_cond_broadcast (&queue_cond);
2507   for (int i = 0; i < config_queue_threads; i++)
2508     pthread_join (queue_threads[i], NULL);
2509
2510   if (config_flush_at_shutdown)
2511   {
2512     assert(cache_queue_head == NULL);
2513     RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
2514   }
2515
2516   journal_done();
2517   remove_pidfile ();
2518
2519   free(queue_threads);
2520   free(config_base_dir);
2521   free(config_pid_file);
2522   free(journal_cur);
2523   free(journal_old);
2524
2525   pthread_mutex_lock(&cache_lock);
2526   g_tree_destroy(cache_tree);
2527
2528   RRDD_LOG(LOG_INFO, "goodbye");
2529   closelog ();
2530
2531   return (0);
2532 } /* }}} int cleanup */
2533
2534 static int read_options (int argc, char **argv) /* {{{ */
2535 {
2536   int option;
2537   int status = 0;
2538
2539   while ((option = getopt(argc, argv, "gl:L:f:w:z:t:Bb:p:Fj:h?")) != -1)
2540   {
2541     switch (option)
2542     {
2543       case 'g':
2544         stay_foreground=1;
2545         break;
2546
2547       case 'L':
2548       case 'l':
2549       {
2550         listen_socket_t *new;
2551
2552         new = malloc(sizeof(listen_socket_t));
2553         if (new == NULL)
2554         {
2555           fprintf(stderr, "read_options: malloc failed.\n");
2556           return(2);
2557         }
2558         memset(new, 0, sizeof(listen_socket_t));
2559
2560         strncpy(new->addr, optarg, sizeof(new->addr)-1);
2561         new->privilege = (option == 'l') ? PRIV_HIGH : PRIV_LOW;
2562
2563         if (!rrd_add_ptr((void ***)&config_listen_address_list,
2564                          &config_listen_address_list_len, new))
2565         {
2566           fprintf(stderr, "read_options: rrd_add_ptr failed.\n");
2567           return (2);
2568         }
2569       }
2570       break;
2571
2572       case 'f':
2573       {
2574         int temp;
2575
2576         temp = atoi (optarg);
2577         if (temp > 0)
2578           config_flush_interval = temp;
2579         else
2580         {
2581           fprintf (stderr, "Invalid flush interval: %s\n", optarg);
2582           status = 3;
2583         }
2584       }
2585       break;
2586
2587       case 'w':
2588       {
2589         int temp;
2590
2591         temp = atoi (optarg);
2592         if (temp > 0)
2593           config_write_interval = temp;
2594         else
2595         {
2596           fprintf (stderr, "Invalid write interval: %s\n", optarg);
2597           status = 2;
2598         }
2599       }
2600       break;
2601
2602       case 'z':
2603       {
2604         int temp;
2605
2606         temp = atoi(optarg);
2607         if (temp > 0)
2608           config_write_jitter = temp;
2609         else
2610         {
2611           fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2612           status = 2;
2613         }
2614
2615         break;
2616       }
2617
2618       case 't':
2619       {
2620         int threads;
2621         threads = atoi(optarg);
2622         if (threads >= 1)
2623           config_queue_threads = threads;
2624         else
2625         {
2626           fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
2627           return 1;
2628         }
2629       }
2630       break;
2631
2632       case 'B':
2633         config_write_base_only = 1;
2634         break;
2635
2636       case 'b':
2637       {
2638         size_t len;
2639         char base_realpath[PATH_MAX];
2640
2641         if (config_base_dir != NULL)
2642           free (config_base_dir);
2643         config_base_dir = strdup (optarg);
2644         if (config_base_dir == NULL)
2645         {
2646           fprintf (stderr, "read_options: strdup failed.\n");
2647           return (3);
2648         }
2649
2650         /* make sure that the base directory is not resolved via
2651          * symbolic links.  this makes some performance-enhancing
2652          * assumptions possible (we don't have to resolve paths
2653          * that start with a "/")
2654          */
2655         if (realpath(config_base_dir, base_realpath) == NULL)
2656         {
2657           fprintf (stderr, "Invalid base directory '%s'.\n", config_base_dir);
2658           return 5;
2659         }
2660         else if (strncmp(config_base_dir,
2661                          base_realpath, sizeof(base_realpath)) != 0)
2662         {
2663           fprintf(stderr,
2664                   "Base directory (-b) resolved via file system links!\n"
2665                   "Please consult rrdcached '-b' documentation!\n"
2666                   "Consider specifying the real directory (%s)\n",
2667                   base_realpath);
2668           return 5;
2669         }
2670
2671         len = strlen (config_base_dir);
2672         while ((len > 0) && (config_base_dir[len - 1] == '/'))
2673         {
2674           config_base_dir[len - 1] = 0;
2675           len--;
2676         }
2677
2678         if (len < 1)
2679         {
2680           fprintf (stderr, "Invalid base directory: %s\n", optarg);
2681           return (4);
2682         }
2683
2684         _config_base_dir_len = len;
2685       }
2686       break;
2687
2688       case 'p':
2689       {
2690         if (config_pid_file != NULL)
2691           free (config_pid_file);
2692         config_pid_file = strdup (optarg);
2693         if (config_pid_file == NULL)
2694         {
2695           fprintf (stderr, "read_options: strdup failed.\n");
2696           return (3);
2697         }
2698       }
2699       break;
2700
2701       case 'F':
2702         config_flush_at_shutdown = 1;
2703         break;
2704
2705       case 'j':
2706       {
2707         struct stat statbuf;
2708         const char *dir = optarg;
2709
2710         status = stat(dir, &statbuf);
2711         if (status != 0)
2712         {
2713           fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2714           return 6;
2715         }
2716
2717         if (!S_ISDIR(statbuf.st_mode)
2718             || access(dir, R_OK|W_OK|X_OK) != 0)
2719         {
2720           fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2721                   errno ? rrd_strerror(errno) : "");
2722           return 6;
2723         }
2724
2725         journal_cur = malloc(PATH_MAX + 1);
2726         journal_old = malloc(PATH_MAX + 1);
2727         if (journal_cur == NULL || journal_old == NULL)
2728         {
2729           fprintf(stderr, "malloc failure for journal files\n");
2730           return 6;
2731         }
2732         else 
2733         {
2734           snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2735           snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
2736         }
2737       }
2738       break;
2739
2740       case 'h':
2741       case '?':
2742         printf ("RRDCacheD %s  Copyright (C) 2008 Florian octo Forster\n"
2743             "\n"
2744             "Usage: rrdcached [options]\n"
2745             "\n"
2746             "Valid options are:\n"
2747             "  -l <address>  Socket address to listen to.\n"
2748             "  -L <address>  Socket address to listen to ('FLUSH' only).\n"
2749             "  -w <seconds>  Interval in which to write data.\n"
2750             "  -z <delay>    Delay writes up to <delay> seconds to spread load\n"
2751             "  -t <threads>  Number of write threads.\n"
2752             "  -f <seconds>  Interval in which to flush dead data.\n"
2753             "  -p <file>     Location of the PID-file.\n"
2754             "  -b <dir>      Base directory to change to.\n"
2755             "  -B            Restrict file access to paths within -b <dir>\n"
2756             "  -g            Do not fork and run in the foreground.\n"
2757             "  -j <dir>      Directory in which to create the journal files.\n"
2758             "  -F            Always flush all updates at shutdown\n"
2759             "\n"
2760             "For more information and a detailed description of all options "
2761             "please refer\n"
2762             "to the rrdcached(1) manual page.\n",
2763             VERSION);
2764         status = -1;
2765         break;
2766     } /* switch (option) */
2767   } /* while (getopt) */
2768
2769   /* advise the user when values are not sane */
2770   if (config_flush_interval < 2 * config_write_interval)
2771     fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2772             " 2x write interval (-w) !\n");
2773   if (config_write_jitter > config_write_interval)
2774     fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2775             " write interval (-w) !\n");
2776
2777   if (config_write_base_only && config_base_dir == NULL)
2778     fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
2779             "  Consult the rrdcached documentation\n");
2780
2781   if (journal_cur == NULL)
2782     config_flush_at_shutdown = 1;
2783
2784   return (status);
2785 } /* }}} int read_options */
2786
2787 int main (int argc, char **argv)
2788 {
2789   int status;
2790
2791   status = read_options (argc, argv);
2792   if (status != 0)
2793   {
2794     if (status < 0)
2795       status = 0;
2796     return (status);
2797   }
2798
2799   status = daemonize ();
2800   if (status != 0)
2801   {
2802     fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
2803     return (1);
2804   }
2805
2806   journal_init();
2807
2808   /* start the queue threads */
2809   queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
2810   if (queue_threads == NULL)
2811   {
2812     RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
2813     cleanup();
2814     return (1);
2815   }
2816   for (int i = 0; i < config_queue_threads; i++)
2817   {
2818     memset (&queue_threads[i], 0, sizeof (*queue_threads));
2819     status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
2820     if (status != 0)
2821     {
2822       RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
2823       cleanup();
2824       return (1);
2825     }
2826   }
2827
2828   /* start the flush thread */
2829   memset(&flush_thread, 0, sizeof(flush_thread));
2830   status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
2831   if (status != 0)
2832   {
2833     RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
2834     cleanup();
2835     return (1);
2836   }
2837
2838   listen_thread_main (NULL);
2839   cleanup ();
2840
2841   return (0);
2842 } /* int main */
2843
2844 /*
2845  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
2846  */