2 * RRDTool - src/rrd_daemon.c
3 * Copyright (C) 2008-2010 Florian octo Forster
4 * Copyright (C) 2008,2009 Kevin Brintnall
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; only version 2 of the License is applicable.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
20 * Florian octo Forster <octo at verplant.org>
21 * kevin brintnall <kbrint@rufus.net>
26 * First tell the compiler to stick to the C99 and POSIX standards as close as
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
46 #define _XOPEN_SOURCE 500
63 * Now for some includes..
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
70 #include "../rrd_config.h"
75 #include "rrd_client.h"
87 #include <sys/socket.h>
95 #include <sys/types.h>
107 #include <sys/time.h>
114 #endif /* HAVE_LIBWRAP */
116 #include <glib-2.0/glib.h>
119 #define RRDD_LOG(severity, ...) \
121 if (stay_foreground) { \
122 fprintf(stderr, __VA_ARGS__); \
123 fprintf(stderr, "\n"); } \
124 syslog ((severity), __VA_ARGS__); \
130 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
132 struct listen_socket_s
135 char addr[PATH_MAX + 1];
138 /* state for BATCH processing */
150 uint32_t permissions;
153 mode_t socket_permissions;
155 typedef struct listen_socket_s listen_socket_t;
158 typedef struct command_s command_t;
159 /* note: guard against "unused" warnings in the handlers */
160 #define DISPATCH_PROTO listen_socket_t UNUSED(*sock),\
162 char UNUSED(*buffer),\
163 size_t UNUSED(buffer_size)
165 #define HANDLER_PROTO command_t UNUSED(*cmd),\
170 int (*handler)(HANDLER_PROTO);
172 char context; /* where we expect to see it */
173 #define CMD_CONTEXT_CLIENT (1<<0)
174 #define CMD_CONTEXT_BATCH (1<<1)
175 #define CMD_CONTEXT_JOURNAL (1<<2)
176 #define CMD_CONTEXT_ANY (0x7f)
183 typedef struct cache_item_s cache_item_t;
188 size_t values_num; /* number of valid pointers */
189 size_t values_alloc; /* number of allocated pointers */
190 time_t last_flush_time;
191 time_t last_update_stamp;
192 #define CI_FLAGS_IN_TREE (1<<0)
193 #define CI_FLAGS_IN_QUEUE (1<<1)
195 pthread_cond_t flushed;
200 struct callback_flush_data_s
207 typedef struct callback_flush_data_s callback_flush_data_t;
214 typedef enum queue_side_e queue_side_t;
216 /* describe a set of journal files */
222 /* max length of socket command or response */
224 #define RBUF_SIZE (CMD_MAX*2)
229 static int stay_foreground = 0;
230 static uid_t daemon_uid;
232 static listen_socket_t *listen_fds = NULL;
233 static size_t listen_fds_num = 0;
235 static listen_socket_t default_socket;
238 RUNNING, /* normal operation */
239 FLUSHING, /* flushing remaining values */
240 SHUTDOWN /* shutting down */
243 static pthread_t *queue_threads;
244 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
245 static int config_queue_threads = 4;
247 static pthread_t flush_thread;
248 static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;
250 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
251 static pthread_cond_t connection_threads_done = PTHREAD_COND_INITIALIZER;
252 static int connection_threads_num = 0;
255 static GTree *cache_tree = NULL;
256 static cache_item_t *cache_queue_head = NULL;
257 static cache_item_t *cache_queue_tail = NULL;
258 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
260 static int config_write_interval = 300;
261 static int config_write_jitter = 0;
262 static int config_flush_interval = 3600;
263 static int config_flush_at_shutdown = 0;
264 static char *config_pid_file = NULL;
265 static char *config_base_dir = NULL;
266 static size_t _config_base_dir_len = 0;
267 static int config_write_base_only = 0;
268 static size_t config_alloc_chunk = 1;
270 static listen_socket_t **config_listen_address_list = NULL;
271 static size_t config_listen_address_list_len = 0;
273 static uint64_t stats_queue_length = 0;
274 static uint64_t stats_updates_received = 0;
275 static uint64_t stats_flush_received = 0;
276 static uint64_t stats_updates_written = 0;
277 static uint64_t stats_data_sets_written = 0;
278 static uint64_t stats_journal_bytes = 0;
279 static uint64_t stats_journal_rotate = 0;
280 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
282 static int opt_no_overwrite = 0; /* default for the daemon */
284 /* Journaled updates */
285 #define JOURNAL_REPLAY(s) ((s) == NULL)
286 #define JOURNAL_BASE "rrd.journal"
287 static journal_set *journal_cur = NULL;
288 static journal_set *journal_old = NULL;
289 static char *journal_dir = NULL;
290 static FILE *journal_fh = NULL; /* current journal file handle */
291 static long journal_size = 0; /* current journal size */
292 #define JOURNAL_MAX (1 * 1024 * 1024 * 1024)
293 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
294 static int journal_write(char *cmd, char *args);
295 static void journal_done(void);
296 static void journal_rotate(void);
298 /* prototypes for forward refernces */
299 static int handle_request_help (HANDLER_PROTO);
304 static void sig_common (const char *sig) /* {{{ */
306 RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
308 pthread_cond_broadcast(&flush_cond);
309 pthread_cond_broadcast(&queue_cond);
310 } /* }}} void sig_common */
312 static void sig_int_handler (int UNUSED(s)) /* {{{ */
315 } /* }}} void sig_int_handler */
317 static void sig_term_handler (int UNUSED(s)) /* {{{ */
320 } /* }}} void sig_term_handler */
322 static void sig_usr1_handler (int UNUSED(s)) /* {{{ */
324 config_flush_at_shutdown = 1;
326 } /* }}} void sig_usr1_handler */
328 static void sig_usr2_handler (int UNUSED(s)) /* {{{ */
330 config_flush_at_shutdown = 0;
332 } /* }}} void sig_usr2_handler */
334 static void install_signal_handlers(void) /* {{{ */
336 /* These structures are static, because `sigaction' behaves weird if the are
338 static struct sigaction sa_int;
339 static struct sigaction sa_term;
340 static struct sigaction sa_pipe;
341 static struct sigaction sa_usr1;
342 static struct sigaction sa_usr2;
344 /* Install signal handlers */
345 memset (&sa_int, 0, sizeof (sa_int));
346 sa_int.sa_handler = sig_int_handler;
347 sigaction (SIGINT, &sa_int, NULL);
349 memset (&sa_term, 0, sizeof (sa_term));
350 sa_term.sa_handler = sig_term_handler;
351 sigaction (SIGTERM, &sa_term, NULL);
353 memset (&sa_pipe, 0, sizeof (sa_pipe));
354 sa_pipe.sa_handler = SIG_IGN;
355 sigaction (SIGPIPE, &sa_pipe, NULL);
357 memset (&sa_pipe, 0, sizeof (sa_usr1));
358 sa_usr1.sa_handler = sig_usr1_handler;
359 sigaction (SIGUSR1, &sa_usr1, NULL);
361 memset (&sa_usr2, 0, sizeof (sa_usr2));
362 sa_usr2.sa_handler = sig_usr2_handler;
363 sigaction (SIGUSR2, &sa_usr2, NULL);
365 } /* }}} void install_signal_handlers */
367 static int open_pidfile(char *action, int oflag) /* {{{ */
371 char *file_copy, *dir;
373 file = (config_pid_file != NULL)
375 : LOCALSTATEDIR "/run/rrdcached.pid";
377 /* dirname may modify its argument */
378 file_copy = strdup(file);
379 if (file_copy == NULL)
381 fprintf(stderr, "rrdcached: strdup(): %s\n",
382 rrd_strerror(errno));
386 dir = dirname(file_copy);
387 if (rrd_mkdir_p(dir, 0777) != 0)
389 fprintf(stderr, "Failed to create pidfile directory '%s': %s\n",
390 dir, rrd_strerror(errno));
396 fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
398 fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
399 action, file, rrd_strerror(errno));
402 } /* }}} static int open_pidfile */
404 /* check existing pid file to see whether a daemon is running */
405 static int check_pidfile(void)
411 pid_fd = open_pidfile("open", O_RDWR);
415 if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0)
422 /* another running process that we can signal COULD be
423 * a competing rrdcached */
424 if (pid != getpid() && kill(pid, 0) == 0)
427 "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
432 lseek(pid_fd, 0, SEEK_SET);
433 if (ftruncate(pid_fd, 0) == -1)
436 "FATAL: Faild to truncate stale PID file. (pid %d)\n", pid);
442 "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
443 "rrdcached: starting normally.\n", pid);
446 } /* }}} static int check_pidfile */
448 static int write_pidfile (int fd) /* {{{ */
455 fh = fdopen (fd, "w");
458 RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
463 fprintf (fh, "%i\n", (int) pid);
467 } /* }}} int write_pidfile */
469 static int remove_pidfile (void) /* {{{ */
474 file = (config_pid_file != NULL)
476 : LOCALSTATEDIR "/run/rrdcached.pid";
478 status = unlink (file);
482 } /* }}} int remove_pidfile */
484 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
488 eol = memchr(sock->rbuf + sock->next_cmd, '\n',
489 sock->next_read - sock->next_cmd);
493 /* no commands left, move remainder back to front of rbuf */
494 memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
495 sock->next_read - sock->next_cmd);
496 sock->next_read -= sock->next_cmd;
503 char *cmd = sock->rbuf + sock->next_cmd;
506 sock->next_cmd = eol - sock->rbuf + 1;
508 if (eol > sock->rbuf && *(eol-1) == '\r')
509 *(--eol) = '\0'; /* handle "\r\n" EOL */
518 } /* }}} char *next_cmd */
520 /* add the characters directly to the write buffer */
521 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
525 assert(sock != NULL);
527 new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
530 RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
534 strncpy(new_buf + sock->wbuf_len, str, len + 1);
536 sock->wbuf = new_buf;
537 sock->wbuf_len += len;
540 } /* }}} static int add_to_wbuf */
542 /* add the text to the "extra" info that's sent after the status line */
543 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
546 char buffer[CMD_MAX];
549 if (JOURNAL_REPLAY(sock)) return 0;
550 if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
553 #ifdef HAVE_VSNPRINTF
554 len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
556 len = vsprintf(buffer, fmt, argp);
561 RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
565 return add_to_wbuf(sock, buffer, len);
566 } /* }}} static int add_response_info */
568 static int count_lines(char *str) /* {{{ */
574 while ((str = strchr(str, '\n')) != NULL)
582 } /* }}} static int count_lines */
584 /* send the response back to the user.
585 * returns 0 on success, -1 on error
586 * write buffer is always zeroed after this call */
587 static int send_response (listen_socket_t *sock, response_code rc,
588 char *fmt, ...) /* {{{ */
591 char buffer[CMD_MAX];
596 if (JOURNAL_REPLAY(sock)) return rc;
598 if (sock->batch_start)
601 return rc; /* no response on success during BATCH */
602 lines = sock->batch_cmd;
604 else if (rc == RESP_OK)
605 lines = count_lines(sock->wbuf);
609 rclen = sprintf(buffer, "%d ", lines);
611 #ifdef HAVE_VSNPRINTF
612 len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
614 len = vsprintf(buffer+rclen, fmt, argp);
622 /* append the result to the wbuf, don't write to the user */
623 if (sock->batch_start)
624 return add_to_wbuf(sock, buffer, len);
626 /* first write must be complete */
627 if (len != write(sock->fd, buffer, len))
629 RRDD_LOG(LOG_INFO, "send_response: could not write status message");
633 if (sock->wbuf != NULL && rc == RESP_OK)
636 while (wrote < sock->wbuf_len)
638 ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
641 RRDD_LOG(LOG_INFO, "send_response: could not write results");
648 free(sock->wbuf); sock->wbuf = NULL;
654 static void wipe_ci_values(cache_item_t *ci, time_t when)
658 ci->values_alloc = 0;
660 ci->last_flush_time = when;
661 if (config_write_jitter > 0)
662 ci->last_flush_time += (rrd_random() % config_write_jitter);
666 * remove a "cache_item_t" item from the queue.
667 * must hold 'cache_lock' when calling this
669 static void remove_from_queue(cache_item_t *ci) /* {{{ */
671 if (ci == NULL) return;
672 if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
674 if (ci->prev == NULL)
675 cache_queue_head = ci->next; /* reset head */
677 ci->prev->next = ci->next;
679 if (ci->next == NULL)
680 cache_queue_tail = ci->prev; /* reset the tail */
682 ci->next->prev = ci->prev;
684 ci->next = ci->prev = NULL;
685 ci->flags &= ~CI_FLAGS_IN_QUEUE;
687 pthread_mutex_lock (&stats_lock);
688 assert (stats_queue_length > 0);
689 stats_queue_length--;
690 pthread_mutex_unlock (&stats_lock);
692 } /* }}} static void remove_from_queue */
694 /* free the resources associated with the cache_item_t
695 * must hold cache_lock when calling this function
697 static void *free_cache_item(cache_item_t *ci) /* {{{ */
699 if (ci == NULL) return NULL;
701 remove_from_queue(ci);
703 for (size_t i=0; i < ci->values_num; i++)
709 /* in case anyone is waiting */
710 pthread_cond_broadcast(&ci->flushed);
711 pthread_cond_destroy(&ci->flushed);
716 } /* }}} static void *free_cache_item */
719 * enqueue_cache_item:
720 * `cache_lock' must be acquired before calling this function!
722 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
728 if (ci->values_num == 0)
733 if (cache_queue_head == ci)
736 /* remove if further down in queue */
737 remove_from_queue(ci);
740 ci->next = cache_queue_head;
741 if (ci->next != NULL)
743 cache_queue_head = ci;
745 if (cache_queue_tail == NULL)
746 cache_queue_tail = cache_queue_head;
748 else /* (side == TAIL) */
750 /* We don't move values back in the list.. */
751 if (ci->flags & CI_FLAGS_IN_QUEUE)
754 assert (ci->next == NULL);
755 assert (ci->prev == NULL);
757 ci->prev = cache_queue_tail;
759 if (cache_queue_tail == NULL)
760 cache_queue_head = ci;
762 cache_queue_tail->next = ci;
764 cache_queue_tail = ci;
767 ci->flags |= CI_FLAGS_IN_QUEUE;
769 pthread_cond_signal(&queue_cond);
770 pthread_mutex_lock (&stats_lock);
771 stats_queue_length++;
772 pthread_mutex_unlock (&stats_lock);
775 } /* }}} int enqueue_cache_item */
778 * tree_callback_flush:
779 * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
780 * while this is in progress.
782 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
786 callback_flush_data_t *cfd;
788 ci = (cache_item_t *) value;
789 cfd = (callback_flush_data_t *) data;
791 if (ci->flags & CI_FLAGS_IN_QUEUE)
794 if (ci->values_num > 0
795 && (ci->last_flush_time <= cfd->abs_timeout || state != RUNNING))
797 enqueue_cache_item (ci, TAIL);
799 else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
800 && (ci->values_num <= 0))
802 assert ((char *) key == ci->file);
803 if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key))
805 RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed.");
811 } /* }}} gboolean tree_callback_flush */
813 static int flush_old_values (int max_age)
815 callback_flush_data_t cfd;
818 memset (&cfd, 0, sizeof (cfd));
819 /* Pass the current time as user data so that we don't need to call
820 * `time' for each node. */
821 cfd.now = time (NULL);
826 cfd.abs_timeout = cfd.now - max_age;
828 cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
830 /* `tree_callback_flush' will return the keys of all values that haven't
831 * been touched in the last `config_flush_interval' seconds in `cfd'.
832 * The char*'s in this array point to the same memory as ci->file, so we
833 * don't need to free them separately. */
834 g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
836 for (k = 0; k < cfd.keys_num; k++)
838 gboolean status = g_tree_remove(cache_tree, cfd.keys[k]);
839 /* should never fail, since we have held the cache_lock
841 assert(status == TRUE);
844 if (cfd.keys != NULL)
851 } /* int flush_old_values */
853 static void *flush_thread_main (void UNUSED(*args)) /* {{{ */
856 struct timespec next_flush;
859 gettimeofday (&now, NULL);
860 next_flush.tv_sec = now.tv_sec + config_flush_interval;
861 next_flush.tv_nsec = 1000 * now.tv_usec;
863 pthread_mutex_lock(&cache_lock);
865 while (state == RUNNING)
867 gettimeofday (&now, NULL);
868 if ((now.tv_sec > next_flush.tv_sec)
869 || ((now.tv_sec == next_flush.tv_sec)
870 && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
872 RRDD_LOG(LOG_DEBUG, "flushing old values");
874 /* Determine the time of the next cache flush. */
875 next_flush.tv_sec = now.tv_sec + config_flush_interval;
877 /* Flush all values that haven't been written in the last
878 * `config_write_interval' seconds. */
879 flush_old_values (config_write_interval);
881 /* unlock the cache while we rotate so we don't block incoming
882 * updates if the fsync() blocks on disk I/O */
883 pthread_mutex_unlock(&cache_lock);
885 pthread_mutex_lock(&cache_lock);
888 status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
889 if (status != 0 && status != ETIMEDOUT)
891 RRDD_LOG (LOG_ERR, "flush_thread_main: "
892 "pthread_cond_timedwait returned %i.", status);
896 if (config_flush_at_shutdown)
897 flush_old_values (-1); /* flush everything */
901 pthread_mutex_unlock(&cache_lock);
904 } /* void *flush_thread_main */
906 static void *queue_thread_main (void UNUSED(*args)) /* {{{ */
908 pthread_mutex_lock (&cache_lock);
910 while (state != SHUTDOWN
911 || (cache_queue_head != NULL && config_flush_at_shutdown))
919 /* Now, check if there's something to store away. If not, wait until
920 * something comes in. */
921 if (cache_queue_head == NULL)
923 status = pthread_cond_wait (&queue_cond, &cache_lock);
924 if ((status != 0) && (status != ETIMEDOUT))
926 RRDD_LOG (LOG_ERR, "queue_thread_main: "
927 "pthread_cond_wait returned %i.", status);
931 /* Check if a value has arrived. This may be NULL if we timed out or there
932 * was an interrupt such as a signal. */
933 if (cache_queue_head == NULL)
936 ci = cache_queue_head;
938 /* copy the relevant parts */
939 file = strdup (ci->file);
942 RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
946 assert(ci->values != NULL);
947 assert(ci->values_num > 0);
950 values_num = ci->values_num;
952 wipe_ci_values(ci, time(NULL));
953 remove_from_queue(ci);
955 pthread_mutex_unlock (&cache_lock);
958 status = rrd_update_r (file, NULL, (int) values_num, (void *) values);
961 RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
962 "rrd_update_r (%s) failed with status %i. (%s)",
963 file, status, rrd_get_error());
966 journal_write("wrote", file);
968 /* Search again in the tree. It's possible someone issued a "FORGET"
969 * while we were writing the update values. */
970 pthread_mutex_lock(&cache_lock);
971 ci = (cache_item_t *) g_tree_lookup(cache_tree, file);
973 pthread_cond_broadcast(&ci->flushed);
974 pthread_mutex_unlock(&cache_lock);
978 pthread_mutex_lock (&stats_lock);
979 stats_updates_written++;
980 stats_data_sets_written += values_num;
981 pthread_mutex_unlock (&stats_lock);
984 rrd_free_ptrs((void ***) &values, &values_num);
987 pthread_mutex_lock (&cache_lock);
989 pthread_mutex_unlock (&cache_lock);
992 } /* }}} void *queue_thread_main */
994 static int buffer_get_field (char **buffer_ret, /* {{{ */
995 size_t *buffer_size_ret, char **field_ret)
1004 buffer = *buffer_ret;
1006 buffer_size = *buffer_size_ret;
1007 field = *buffer_ret;
1010 if (buffer_size <= 0)
1013 /* This is ensured by `handle_request'. */
1014 assert (buffer[buffer_size - 1] == '\0');
1017 while (buffer_pos < buffer_size)
1019 /* Check for end-of-field or end-of-buffer */
1020 if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
1022 field[field_size] = 0;
1028 /* Handle escaped characters. */
1029 else if (buffer[buffer_pos] == '\\')
1031 if (buffer_pos >= (buffer_size - 1))
1034 field[field_size] = buffer[buffer_pos];
1038 /* Normal operation */
1041 field[field_size] = buffer[buffer_pos];
1045 } /* while (buffer_pos < buffer_size) */
1050 *buffer_ret = buffer + buffer_pos;
1051 *buffer_size_ret = buffer_size - buffer_pos;
1055 } /* }}} int buffer_get_field */
1057 /* if we're restricting writes to the base directory,
1058 * check whether the file falls within the dir
1059 * returns 1 if OK, otherwise 0
1061 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
1063 assert(file != NULL);
1065 if (!config_write_base_only
1066 || JOURNAL_REPLAY(sock)
1067 || config_base_dir == NULL)
1070 if (strstr(file, "../") != NULL) goto err;
1072 /* relative paths without "../" are ok */
1073 if (*file != '/') return 1;
1075 /* file must be of the format base + "/" + <1+ char filename> */
1076 if (strlen(file) < _config_base_dir_len + 2) goto err;
1077 if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
1078 if (*(file + _config_base_dir_len) != '/') goto err;
1083 if (sock != NULL && sock->fd >= 0)
1084 send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1087 } /* }}} static int check_file_access */
1089 /* when using a base dir, convert relative paths to absolute paths.
1090 * if necessary, modifies the "filename" pointer to point
1091 * to the new path created in "tmp". "tmp" is provided
1092 * by the caller and sizeof(tmp) must be >= PATH_MAX.
1094 * this allows us to optimize for the expected case (absolute path)
1097 static void get_abs_path(char **filename, char *tmp)
1099 assert(tmp != NULL);
1100 assert(filename != NULL && *filename != NULL);
1102 if (config_base_dir == NULL || **filename == '/')
1105 snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1107 } /* }}} static int get_abs_path */
1109 static int flush_file (const char *filename) /* {{{ */
1113 pthread_mutex_lock (&cache_lock);
1115 ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1118 pthread_mutex_unlock (&cache_lock);
1122 if (ci->values_num > 0)
1124 /* Enqueue at head */
1125 enqueue_cache_item (ci, HEAD);
1126 pthread_cond_wait(&ci->flushed, &cache_lock);
1129 /* DO NOT DO ANYTHING WITH ci HERE!! The entry
1130 * may have been purged during our cond_wait() */
1132 pthread_mutex_unlock(&cache_lock);
1135 } /* }}} int flush_file */
1137 static int syntax_error(listen_socket_t *sock, command_t *cmd) /* {{{ */
1139 char *err = "Syntax error.\n";
1141 if (cmd && cmd->syntax)
1144 return send_response(sock, RESP_ERR, "Usage: %s", err);
1145 } /* }}} static int syntax_error() */
1147 static int handle_request_stats (HANDLER_PROTO) /* {{{ */
1149 uint64_t copy_queue_length;
1150 uint64_t copy_updates_received;
1151 uint64_t copy_flush_received;
1152 uint64_t copy_updates_written;
1153 uint64_t copy_data_sets_written;
1154 uint64_t copy_journal_bytes;
1155 uint64_t copy_journal_rotate;
1157 uint64_t tree_nodes_number;
1158 uint64_t tree_depth;
1160 pthread_mutex_lock (&stats_lock);
1161 copy_queue_length = stats_queue_length;
1162 copy_updates_received = stats_updates_received;
1163 copy_flush_received = stats_flush_received;
1164 copy_updates_written = stats_updates_written;
1165 copy_data_sets_written = stats_data_sets_written;
1166 copy_journal_bytes = stats_journal_bytes;
1167 copy_journal_rotate = stats_journal_rotate;
1168 pthread_mutex_unlock (&stats_lock);
1170 pthread_mutex_lock (&cache_lock);
1171 tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1172 tree_depth = (uint64_t) g_tree_height (cache_tree);
1173 pthread_mutex_unlock (&cache_lock);
1175 add_response_info(sock,
1176 "QueueLength: %"PRIu64"\n", copy_queue_length);
1177 add_response_info(sock,
1178 "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1179 add_response_info(sock,
1180 "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1181 add_response_info(sock,
1182 "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1183 add_response_info(sock,
1184 "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1185 add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1186 add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1187 add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1188 add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1190 send_response(sock, RESP_OK, "Statistics follow\n");
1193 } /* }}} int handle_request_stats */
1195 static int handle_request_flush (HANDLER_PROTO) /* {{{ */
1197 char *file, file_tmp[PATH_MAX];
1200 status = buffer_get_field (&buffer, &buffer_size, &file);
1203 return syntax_error(sock,cmd);
1207 pthread_mutex_lock(&stats_lock);
1208 stats_flush_received++;
1209 pthread_mutex_unlock(&stats_lock);
1211 get_abs_path(&file, file_tmp);
1212 if (!check_file_access(file, sock)) return 0;
1214 status = flush_file (file);
1216 return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1217 else if (status == ENOENT)
1219 /* no file in our tree; see whether it exists at all */
1220 struct stat statbuf;
1222 memset(&statbuf, 0, sizeof(statbuf));
1223 if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1224 return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1226 return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1228 else if (status < 0)
1229 return send_response(sock, RESP_ERR, "Internal error.\n");
1231 return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1236 } /* }}} int handle_request_flush */
1238 static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
1240 RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1242 pthread_mutex_lock(&cache_lock);
1243 flush_old_values(-1);
1244 pthread_mutex_unlock(&cache_lock);
1246 return send_response(sock, RESP_OK, "Started flush.\n");
1247 } /* }}} static int handle_request_flushall */
1249 static int handle_request_pending(HANDLER_PROTO) /* {{{ */
1252 char *file, file_tmp[PATH_MAX];
1255 status = buffer_get_field(&buffer, &buffer_size, &file);
1257 return syntax_error(sock,cmd);
1259 get_abs_path(&file, file_tmp);
1261 pthread_mutex_lock(&cache_lock);
1262 ci = g_tree_lookup(cache_tree, file);
1265 pthread_mutex_unlock(&cache_lock);
1266 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1269 for (size_t i=0; i < ci->values_num; i++)
1270 add_response_info(sock, "%s\n", ci->values[i]);
1272 pthread_mutex_unlock(&cache_lock);
1273 return send_response(sock, RESP_OK, "updates pending\n");
1274 } /* }}} static int handle_request_pending */
1276 static int handle_request_forget(HANDLER_PROTO) /* {{{ */
1280 char *file, file_tmp[PATH_MAX];
1282 status = buffer_get_field(&buffer, &buffer_size, &file);
1284 return syntax_error(sock,cmd);
1286 get_abs_path(&file, file_tmp);
1287 if (!check_file_access(file, sock)) return 0;
1289 pthread_mutex_lock(&cache_lock);
1290 found = g_tree_remove(cache_tree, file);
1291 pthread_mutex_unlock(&cache_lock);
1295 if (!JOURNAL_REPLAY(sock))
1296 journal_write("forget", file);
1298 return send_response(sock, RESP_OK, "Gone!\n");
1301 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1305 } /* }}} static int handle_request_forget */
1307 static int handle_request_queue (HANDLER_PROTO) /* {{{ */
1311 pthread_mutex_lock(&cache_lock);
1313 ci = cache_queue_head;
1316 add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1320 pthread_mutex_unlock(&cache_lock);
1322 return send_response(sock, RESP_OK, "in queue.\n");
1323 } /* }}} int handle_request_queue */
1325 static int handle_request_update (HANDLER_PROTO) /* {{{ */
1327 char *file, file_tmp[PATH_MAX];
1330 char orig_buf[CMD_MAX];
1334 /* save it for the journal later */
1335 if (!JOURNAL_REPLAY(sock))
1336 strncpy(orig_buf, buffer, buffer_size);
1338 status = buffer_get_field (&buffer, &buffer_size, &file);
1340 return syntax_error(sock,cmd);
1342 pthread_mutex_lock(&stats_lock);
1343 stats_updates_received++;
1344 pthread_mutex_unlock(&stats_lock);
1346 get_abs_path(&file, file_tmp);
1347 if (!check_file_access(file, sock)) return 0;
1349 pthread_mutex_lock (&cache_lock);
1350 ci = g_tree_lookup (cache_tree, file);
1352 if (ci == NULL) /* {{{ */
1354 struct stat statbuf;
1357 /* don't hold the lock while we setup; stat(2) might block */
1358 pthread_mutex_unlock(&cache_lock);
1360 memset (&statbuf, 0, sizeof (statbuf));
1361 status = stat (file, &statbuf);
1364 RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1367 if (status == ENOENT)
1368 return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1370 return send_response(sock, RESP_ERR,
1371 "stat failed with error %i.\n", status);
1373 if (!S_ISREG (statbuf.st_mode))
1374 return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1376 if (access(file, R_OK|W_OK) != 0)
1377 return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1378 file, rrd_strerror(errno));
1380 ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1383 RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1385 return send_response(sock, RESP_ERR, "malloc failed.\n");
1387 memset (ci, 0, sizeof (cache_item_t));
1389 ci->file = strdup (file);
1390 if (ci->file == NULL)
1393 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1395 return send_response(sock, RESP_ERR, "strdup failed.\n");
1398 wipe_ci_values(ci, now);
1399 ci->flags = CI_FLAGS_IN_TREE;
1400 pthread_cond_init(&ci->flushed, NULL);
1402 pthread_mutex_lock(&cache_lock);
1404 /* another UPDATE might have added this entry in the meantime */
1405 tmp = g_tree_lookup (cache_tree, file);
1407 g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1410 free_cache_item (ci);
1414 /* state may have changed while we were unlocked */
1415 if (state == SHUTDOWN)
1418 assert (ci != NULL);
1420 /* don't re-write updates in replay mode */
1421 if (!JOURNAL_REPLAY(sock))
1422 journal_write("update", orig_buf);
1424 while (buffer_size > 0)
1430 status = buffer_get_field (&buffer, &buffer_size, &value);
1433 RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1437 /* make sure update time is always moving forward */
1438 stamp = strtol(value, &eostamp, 10);
1439 if (eostamp == value || eostamp == NULL || *eostamp != ':')
1441 pthread_mutex_unlock(&cache_lock);
1442 return send_response(sock, RESP_ERR,
1443 "Cannot find timestamp in '%s'!\n", value);
1445 else if (stamp <= ci->last_update_stamp)
1447 pthread_mutex_unlock(&cache_lock);
1448 return send_response(sock, RESP_ERR,
1449 "illegal attempt to update using time %ld when last"
1450 " update time is %ld (minimum one second step)\n",
1451 stamp, ci->last_update_stamp);
1454 ci->last_update_stamp = stamp;
1456 if (!rrd_add_strdup_chunk(&ci->values, &ci->values_num, value,
1457 &ci->values_alloc, config_alloc_chunk))
1459 RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed.");
1466 if (((now - ci->last_flush_time) >= config_write_interval)
1467 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1468 && (ci->values_num > 0))
1470 enqueue_cache_item (ci, TAIL);
1473 pthread_mutex_unlock (&cache_lock);
1476 return send_response(sock, RESP_ERR, "No values updated.\n");
1478 return send_response(sock, RESP_OK,
1479 "errors, enqueued %i value(s).\n", values_num);
1484 } /* }}} int handle_request_update */
1486 static int handle_request_fetch (HANDLER_PROTO) /* {{{ */
1488 char *file, file_tmp[PATH_MAX];
1497 unsigned long ds_cnt;
1504 rrd_value_t *data_ptr;
1511 /* Read the arguments */
1514 status = buffer_get_field (&buffer, &buffer_size, &file);
1518 status = buffer_get_field (&buffer, &buffer_size, &cf);
1522 status = buffer_get_field (&buffer, &buffer_size, &start_str);
1530 status = buffer_get_field (&buffer, &buffer_size, &end_str);
1540 return (syntax_error(sock,cmd));
1542 get_abs_path(&file, file_tmp);
1543 if (!check_file_access(file, sock)) return 0;
1545 status = flush_file (file);
1546 if ((status != 0) && (status != ENOENT))
1547 return (send_response (sock, RESP_ERR,
1548 "flush_file (%s) failed with status %i.\n", file, status));
1550 t = time (NULL); /* "now" */
1552 /* Parse start time */
1553 if (start_str != NULL)
1560 value = strtol (start_str, &endptr, /* base = */ 0);
1561 if ((endptr == start_str) || (errno != 0))
1562 return (send_response(sock, RESP_ERR,
1563 "Cannot parse start time `%s': Only simple integers are allowed.\n",
1567 start_tm = (time_t) value;
1569 start_tm = (time_t) (t + value);
1573 start_tm = t - 86400;
1576 /* Parse end time */
1577 if (end_str != NULL)
1584 value = strtol (end_str, &endptr, /* base = */ 0);
1585 if ((endptr == end_str) || (errno != 0))
1586 return (send_response(sock, RESP_ERR,
1587 "Cannot parse end time `%s': Only simple integers are allowed.\n",
1591 end_tm = (time_t) value;
1593 end_tm = (time_t) (t + value);
1605 status = rrd_fetch_r (file, cf, &start_tm, &end_tm, &step,
1606 &ds_cnt, &ds_namv, &data);
1608 return (send_response(sock, RESP_ERR,
1609 "rrd_fetch_r failed: %s\n", rrd_get_error ()));
1611 add_response_info (sock, "FlushVersion: %lu\n", 1);
1612 add_response_info (sock, "Start: %lu\n", (unsigned long) start_tm);
1613 add_response_info (sock, "End: %lu\n", (unsigned long) end_tm);
1614 add_response_info (sock, "Step: %lu\n", step);
1615 add_response_info (sock, "DSCount: %lu\n", ds_cnt);
1617 #define SSTRCAT(buffer,str,buffer_fill) do { \
1618 size_t str_len = strlen (str); \
1619 if ((buffer_fill + str_len) > sizeof (buffer)) \
1620 str_len = sizeof (buffer) - buffer_fill; \
1621 if (str_len > 0) { \
1622 strncpy (buffer + buffer_fill, str, str_len); \
1623 buffer_fill += str_len; \
1624 assert (buffer_fill <= sizeof (buffer)); \
1625 if (buffer_fill == sizeof (buffer)) \
1626 buffer[buffer_fill - 1] = 0; \
1628 buffer[buffer_fill] = 0; \
1632 { /* Add list of DS names */
1634 size_t linebuf_fill;
1636 memset (linebuf, 0, sizeof (linebuf));
1638 for (i = 0; i < ds_cnt; i++)
1641 SSTRCAT (linebuf, " ", linebuf_fill);
1642 SSTRCAT (linebuf, ds_namv[i], linebuf_fill);
1643 rrd_freemem(ds_namv[i]);
1645 rrd_freemem(ds_namv);
1646 add_response_info (sock, "DSName: %s\n", linebuf);
1649 /* Add the actual data */
1652 for (t = start_tm + step; t <= end_tm; t += step)
1655 size_t linebuf_fill;
1658 memset (linebuf, 0, sizeof (linebuf));
1660 for (i = 0; i < ds_cnt; i++)
1662 snprintf (tmp, sizeof (tmp), " %0.10e", *data_ptr);
1663 tmp[sizeof (tmp) - 1] = 0;
1664 SSTRCAT (linebuf, tmp, linebuf_fill);
1669 add_response_info (sock, "%10lu:%s\n", (unsigned long) t, linebuf);
1673 return (send_response (sock, RESP_OK, "Success\n"));
1675 } /* }}} int handle_request_fetch */
1677 /* we came across a "WROTE" entry during journal replay.
1678 * throw away any values that we have accumulated for this file
1680 static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
1683 const char *file = buffer;
1685 pthread_mutex_lock(&cache_lock);
1687 ci = g_tree_lookup(cache_tree, file);
1690 pthread_mutex_unlock(&cache_lock);
1695 rrd_free_ptrs((void ***) &ci->values, &ci->values_num);
1697 wipe_ci_values(ci, now);
1698 remove_from_queue(ci);
1700 pthread_mutex_unlock(&cache_lock);
1702 } /* }}} int handle_request_wrote */
1704 static int handle_request_info (HANDLER_PROTO) /* {{{ */
1706 char *file, file_tmp[PATH_MAX];
1710 /* obtain filename */
1711 status = buffer_get_field(&buffer, &buffer_size, &file);
1713 return syntax_error(sock,cmd);
1714 /* get full pathname */
1715 get_abs_path(&file, file_tmp);
1716 if (!check_file_access(file, sock)) {
1717 return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1721 info = rrd_info_r(file);
1723 return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1725 for (rrd_info_t *data = info; data != NULL; data = data->next) {
1726 switch (data->type) {
1728 if (isnan(data->value.u_val))
1729 add_response_info(sock,"%s %d NaN\n",data->key, data->type);
1731 add_response_info(sock,"%s %d %0.10e\n", data->key, data->type, data->value.u_val);
1734 add_response_info(sock,"%s %d %lu\n", data->key, data->type, data->value.u_cnt);
1737 add_response_info(sock,"%s %d %d\n", data->key, data->type, data->value.u_int);
1740 add_response_info(sock,"%s %d %s\n", data->key, data->type, data->value.u_str);
1743 add_response_info(sock,"%s %d %lu\n", data->key, data->type, data->value.u_blo.size);
1748 rrd_info_free(info);
1750 return send_response(sock, RESP_OK, "Info for %s follows\n",file);
1751 } /* }}} static int handle_request_info */
1753 static int handle_request_first (HANDLER_PROTO) /* {{{ */
1755 char *i, *file, file_tmp[PATH_MAX];
1760 /* obtain filename */
1761 status = buffer_get_field(&buffer, &buffer_size, &file);
1763 return syntax_error(sock,cmd);
1764 /* get full pathname */
1765 get_abs_path(&file, file_tmp);
1766 if (!check_file_access(file, sock)) {
1767 return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1770 status = buffer_get_field(&buffer, &buffer_size, &i);
1772 return syntax_error(sock,cmd);
1775 return send_response(sock, RESP_ERR, "Invalid index specified (%d)\n", idx);
1780 t = rrd_first_r(file,idx);
1782 return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1784 return send_response(sock, RESP_OK, "%lu\n",(unsigned)t);
1785 } /* }}} static int handle_request_first */
1788 static int handle_request_last (HANDLER_PROTO) /* {{{ */
1790 char *file, file_tmp[PATH_MAX];
1792 time_t t, from_file, step;
1793 rrd_file_t * rrd_file;
1797 /* obtain filename */
1798 status = buffer_get_field(&buffer, &buffer_size, &file);
1800 return syntax_error(sock,cmd);
1801 /* get full pathname */
1802 get_abs_path(&file, file_tmp);
1803 if (!check_file_access(file, sock)) {
1804 return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1808 rrd_file = rrd_open(file,&rrd,RRD_READONLY);
1810 return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1812 from_file = rrd.live_head->last_up;
1813 step = rrd.stat_head->pdp_step;
1814 rrd_close(rrd_file);
1815 pthread_mutex_lock(&cache_lock);
1816 ci = g_tree_lookup(cache_tree, file);
1818 t = ci->last_update_stamp;
1821 pthread_mutex_unlock(&cache_lock);
1825 return send_response(sock, RESP_ERR, "Error: rrdcached: Invalid timestamp returned\n");
1827 return send_response(sock, RESP_OK, "%lu\n",(unsigned)t);
1828 } /* }}} static int handle_request_last */
1830 static int handle_request_create (HANDLER_PROTO) /* {{{ */
1832 char *file, file_tmp[PATH_MAX];
1837 unsigned long step = 300;
1838 time_t last_up = time(NULL)-10;
1839 int no_overwrite = opt_no_overwrite;
1842 /* obtain filename */
1843 status = buffer_get_field(&buffer, &buffer_size, &file);
1845 return syntax_error(sock,cmd);
1846 /* get full pathname */
1847 get_abs_path(&file, file_tmp);
1848 if (!check_file_access(file, sock)) {
1849 return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1851 RRDD_LOG(LOG_INFO, "rrdcreate request for %s",file);
1853 while ((status = buffer_get_field(&buffer, &buffer_size, &tok)) == 0 && tok) {
1854 if( ! strncmp(tok,"-b",2) ) {
1855 status = buffer_get_field(&buffer, &buffer_size, &tok );
1856 if (status != 0) return syntax_error(sock,cmd);
1857 last_up = (time_t) atol(tok);
1860 if( ! strncmp(tok,"-s",2) ) {
1861 status = buffer_get_field(&buffer, &buffer_size, &tok );
1862 if (status != 0) return syntax_error(sock,cmd);
1866 if( ! strncmp(tok,"-O",2) ) {
1870 if( ! strncmp(tok,"DS:",3) ) { av[ac++]=tok; continue; }
1871 if( ! strncmp(tok,"RRA:",4) ) { av[ac++]=tok; continue; }
1872 return syntax_error(sock,cmd);
1875 return send_response(sock, RESP_ERR, "The step size cannot be less than 1 second.\n");
1877 if (last_up < 3600 * 24 * 365 * 10) {
1878 return send_response(sock, RESP_ERR, "The first entry must be after 1980.\n");
1882 status = rrd_create_r2(file,step,last_up,no_overwrite,ac,(const char **)av);
1885 return send_response(sock, RESP_OK, "RRD created OK\n");
1887 return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1888 } /* }}} static int handle_request_create */
1890 /* start "BATCH" processing */
1891 static int batch_start (HANDLER_PROTO) /* {{{ */
1894 if (sock->batch_start)
1895 return send_response(sock, RESP_ERR, "Already in BATCH\n");
1897 status = send_response(sock, RESP_OK,
1898 "Go ahead. End with dot '.' on its own line.\n");
1899 sock->batch_start = time(NULL);
1900 sock->batch_cmd = 0;
1903 } /* }}} static int batch_start */
1905 /* finish "BATCH" processing and return results to the client */
1906 static int batch_done (HANDLER_PROTO) /* {{{ */
1908 assert(sock->batch_start);
1909 sock->batch_start = 0;
1910 sock->batch_cmd = 0;
1911 return send_response(sock, RESP_OK, "errors\n");
1912 } /* }}} static int batch_done */
1914 static int handle_request_quit (HANDLER_PROTO) /* {{{ */
1917 } /* }}} static int handle_request_quit */
1919 static command_t list_of_commands[] = { /* {{{ */
1922 handle_request_update,
1924 "UPDATE <filename> <values> [<values> ...]\n"
1926 "Adds the given file to the internal cache if it is not yet known and\n"
1927 "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1930 "Each <values> has the following form:\n"
1931 " <values> = <time>:<value>[:<value>[...]]\n"
1932 "See the rrdupdate(1) manpage for details.\n"
1936 handle_request_wrote,
1937 CMD_CONTEXT_JOURNAL,
1943 handle_request_flush,
1944 CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1945 "FLUSH <filename>\n"
1947 "Adds the given filename to the head of the update queue and returns\n"
1948 "after it has been dequeued.\n"
1952 handle_request_flushall,
1956 "Triggers writing of all pending updates. Returns immediately.\n"
1960 handle_request_pending,
1962 "PENDING <filename>\n"
1964 "Shows any 'pending' updates for a file, in order.\n"
1965 "The updates shown have not yet been written to the underlying RRD file.\n"
1969 handle_request_forget,
1971 "FORGET <filename>\n"
1973 "Removes the file completely from the cache.\n"
1974 "Any pending updates for the file will be lost.\n"
1978 handle_request_queue,
1982 "Shows all files in the output queue.\n"
1983 "The output is zero or more lines in the following format:\n"
1984 "(where <num_vals> is the number of values to be written)\n"
1986 "<num_vals> <filename>\n"
1990 handle_request_stats,
1994 "Returns some performance counters, see the rrdcached(1) manpage for\n"
1995 "a description of the values.\n"
1999 handle_request_help,
2001 "HELP [<command>]\n",
2002 NULL, /* special! */
2010 "The 'BATCH' command permits the client to initiate a bulk load\n"
2011 " of commands to rrdcached.\n"
2016 " server: 0 Go ahead. End with dot '.' on its own line.\n"
2017 " client: command #1\n"
2018 " client: command #2\n"
2019 " client: ... and so on\n"
2021 " server: 2 errors\n"
2022 " server: 7 message for command #7\n"
2023 " server: 9 message for command #9\n"
2025 "For more information, consult the rrdcached(1) documentation.\n"
2028 ".", /* BATCH terminator */
2036 handle_request_fetch,
2038 "FETCH <file> <CF> [<start> [<end>]]\n"
2040 "The 'FETCH' can be used by the client to retrieve values from an RRD file.\n"
2044 handle_request_info,
2046 "INFO <filename>\n",
2047 "The INFO command retrieves information about a specified RRD file.\n"
2048 "This is returned in standard rrdinfo format, a sequence of lines\n"
2049 "with the format <keyname> = <value>\n"
2050 "Note that this is the data as of the last update of the RRD file itself,\n"
2051 "not the last time data was received via rrdcached, so there may be pending\n"
2052 "updates in the queue. If this bothers you, then first run a FLUSH.\n"
2056 handle_request_first,
2058 "FIRST <filename> <rra index>\n",
2059 "The FIRST command retrieves the first data time for a specified RRA in\n"
2064 handle_request_last,
2066 "LAST <filename>\n",
2067 "The LAST command retrieves the last update time for a specified RRD file.\n"
2068 "Note that this is the time of the last update of the RRD file itself, not\n"
2069 "the last time data was received via rrdcached, so there may be pending\n"
2070 "updates in the queue. If this bothers you, then first run a FLUSH.\n"
2074 handle_request_create,
2075 CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
2076 "CREATE <filename> [-b start] [-s step] [-O] <DS definitions> <RRA definitions>\n",
2077 "The CREATE command will create an RRD file, overwriting any existing file\n"
2078 "unless the -O option is given or rrdcached was started with the -O option.\n"
2079 "The start parameter needs to be in seconds since 1/1/70 (AT-style syntax is\n"
2080 "not acceptable) and the step is in seconds (default is 300).\n"
2081 "The DS and RRA definitions are as for the 'rrdtool create' command.\n"
2085 handle_request_quit,
2086 CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
2089 "Disconnect from rrdcached.\n"
2091 }; /* }}} command_t list_of_commands[] */
2092 static size_t list_of_commands_len = sizeof (list_of_commands)
2093 / sizeof (list_of_commands[0]);
2095 static command_t *find_command(char *cmd)
2099 for (i = 0; i < list_of_commands_len; i++)
2100 if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
2101 return (&list_of_commands[i]);
2105 /* We currently use the index in the `list_of_commands' array as a bit position
2106 * in `listen_socket_t.permissions'. This member schould NEVER be accessed from
2107 * outside these functions so that switching to a more elegant storage method
2108 * is easily possible. */
2109 static ssize_t find_command_index (const char *cmd) /* {{{ */
2113 for (i = 0; i < list_of_commands_len; i++)
2114 if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
2115 return ((ssize_t) i);
2117 } /* }}} ssize_t find_command_index */
2119 static int socket_permission_check (listen_socket_t *sock, /* {{{ */
2124 if (JOURNAL_REPLAY(sock))
2130 if ((strcasecmp ("QUIT", cmd) == 0)
2131 || (strcasecmp ("HELP", cmd) == 0))
2133 else if (strcmp (".", cmd) == 0)
2136 i = find_command_index (cmd);
2141 if ((sock->permissions & (1 << i)) != 0)
2144 } /* }}} int socket_permission_check */
2146 static int socket_permission_add (listen_socket_t *sock, /* {{{ */
2151 i = find_command_index (cmd);
2156 sock->permissions |= (1 << i);
2158 } /* }}} int socket_permission_add */
2160 static void socket_permission_clear (listen_socket_t *sock) /* {{{ */
2162 sock->permissions = 0;
2163 } /* }}} socket_permission_clear */
2165 static void socket_permission_copy (listen_socket_t *dest, /* {{{ */
2166 listen_socket_t *src)
2168 dest->permissions = src->permissions;
2169 } /* }}} socket_permission_copy */
2171 /* check whether commands are received in the expected context */
2172 static int command_check_context(listen_socket_t *sock, command_t *cmd)
2174 if (JOURNAL_REPLAY(sock))
2175 return (cmd->context & CMD_CONTEXT_JOURNAL);
2176 else if (sock->batch_start)
2177 return (cmd->context & CMD_CONTEXT_BATCH);
2179 return (cmd->context & CMD_CONTEXT_CLIENT);
2185 static int handle_request_help (HANDLER_PROTO) /* {{{ */
2190 command_t *help = NULL;
2192 status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
2194 help = find_command(cmd_str);
2196 if (help && (help->syntax || help->help))
2200 snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
2204 add_response_info(sock, "Usage: %s\n", help->syntax);
2207 add_response_info(sock, "%s\n", help->help);
2213 resp_txt = "Command overview\n";
2215 for (i = 0; i < list_of_commands_len; i++)
2217 if (list_of_commands[i].syntax == NULL)
2219 add_response_info (sock, "%s", list_of_commands[i].syntax);
2223 return send_response(sock, RESP_OK, resp_txt);
2224 } /* }}} int handle_request_help */
2226 static int handle_request (DISPATCH_PROTO) /* {{{ */
2228 char *buffer_ptr = buffer;
2229 char *cmd_str = NULL;
2230 command_t *cmd = NULL;
2233 assert (buffer[buffer_size - 1] == '\0');
2235 status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
2238 RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
2242 if (sock != NULL && sock->batch_start)
2245 cmd = find_command(cmd_str);
2247 return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
2249 if (!socket_permission_check (sock, cmd->cmd))
2250 return send_response(sock, RESP_ERR, "Permission denied.\n");
2252 if (!command_check_context(sock, cmd))
2253 return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
2255 return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
2256 } /* }}} int handle_request */
2258 static void journal_set_free (journal_set *js) /* {{{ */
2263 rrd_free_ptrs((void ***) &js->files, &js->files_num);
2266 } /* }}} journal_set_free */
2268 static void journal_set_remove (journal_set *js) /* {{{ */
2273 for (uint i=0; i < js->files_num; i++)
2275 RRDD_LOG(LOG_DEBUG, "removing old journal %s", js->files[i]);
2276 unlink(js->files[i]);
2278 } /* }}} journal_set_remove */
2280 /* close current journal file handle.
2281 * MUST hold journal_lock before calling */
2282 static void journal_close(void) /* {{{ */
2284 if (journal_fh != NULL)
2286 if (fclose(journal_fh) != 0)
2287 RRDD_LOG(LOG_ERR, "cannot close journal: %s", rrd_strerror(errno));
2292 } /* }}} journal_close */
2294 /* MUST hold journal_lock before calling */
2295 static void journal_new_file(void) /* {{{ */
2299 char new_file[PATH_MAX + 1];
2301 assert(journal_dir != NULL);
2302 assert(journal_cur != NULL);
2306 gettimeofday(&now, NULL);
2307 /* this format assures that the files sort in strcmp() order */
2308 snprintf(new_file, PATH_MAX, "%s/%s.%010d.%06d",
2309 journal_dir, JOURNAL_BASE, (int)now.tv_sec, (int)now.tv_usec);
2311 new_fd = open(new_file, O_WRONLY|O_CREAT|O_APPEND,
2312 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
2316 journal_fh = fdopen(new_fd, "a");
2317 if (journal_fh == NULL)
2320 journal_size = ftell(journal_fh);
2321 RRDD_LOG(LOG_DEBUG, "started new journal %s", new_file);
2323 /* record the file in the journal set */
2324 rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, new_file);
2330 "JOURNALING DISABLED: Error while trying to create %s : %s",
2331 new_file, rrd_strerror(errno));
2333 "JOURNALING DISABLED: All values will be flushed at shutdown");
2336 config_flush_at_shutdown = 1;
2338 } /* }}} journal_new_file */
2340 /* MUST NOT hold journal_lock before calling this */
2341 static void journal_rotate(void) /* {{{ */
2343 journal_set *old_js = NULL;
2345 if (journal_dir == NULL)
2348 RRDD_LOG(LOG_DEBUG, "rotating journals");
2350 pthread_mutex_lock(&stats_lock);
2351 ++stats_journal_rotate;
2352 pthread_mutex_unlock(&stats_lock);
2354 pthread_mutex_lock(&journal_lock);
2358 /* rotate the journal sets */
2359 old_js = journal_old;
2360 journal_old = journal_cur;
2361 journal_cur = calloc(1, sizeof(journal_set));
2363 if (journal_cur != NULL)
2366 RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2368 pthread_mutex_unlock(&journal_lock);
2370 journal_set_remove(old_js);
2371 journal_set_free (old_js);
2373 } /* }}} static void journal_rotate */
2375 /* MUST hold journal_lock when calling */
2376 static void journal_done(void) /* {{{ */
2378 if (journal_cur == NULL)
2383 if (config_flush_at_shutdown)
2385 RRDD_LOG(LOG_INFO, "removing journals");
2386 journal_set_remove(journal_old);
2387 journal_set_remove(journal_cur);
2391 RRDD_LOG(LOG_INFO, "expedited shutdown; "
2392 "journals will be used at next startup");
2395 journal_set_free(journal_cur);
2396 journal_set_free(journal_old);
2399 } /* }}} static void journal_done */
2401 static int journal_write(char *cmd, char *args) /* {{{ */
2405 if (journal_fh == NULL)
2408 pthread_mutex_lock(&journal_lock);
2409 chars = fprintf(journal_fh, "%s %s\n", cmd, args);
2410 journal_size += chars;
2412 if (journal_size > JOURNAL_MAX)
2415 pthread_mutex_unlock(&journal_lock);
2419 pthread_mutex_lock(&stats_lock);
2420 stats_journal_bytes += chars;
2421 pthread_mutex_unlock(&stats_lock);
2425 } /* }}} static int journal_write */
2427 static int journal_replay (const char *file) /* {{{ */
2433 char entry[CMD_MAX];
2436 if (file == NULL) return 0;
2439 char *reason = "unknown error";
2441 struct stat statbuf;
2443 memset(&statbuf, 0, sizeof(statbuf));
2444 if (stat(file, &statbuf) != 0)
2446 reason = "stat error";
2449 else if (!S_ISREG(statbuf.st_mode))
2451 reason = "not a regular file";
2454 if (statbuf.st_uid != daemon_uid)
2456 reason = "not owned by daemon user";
2459 if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
2461 reason = "must not be user/group writable";
2467 RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
2468 file, rrd_strerror(status), reason);
2473 fh = fopen(file, "r");
2476 if (errno != ENOENT)
2477 RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
2478 file, rrd_strerror(errno));
2482 RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
2491 if (fgets(entry, sizeof(entry), fh) == NULL)
2493 entry_len = strlen(entry);
2495 /* check \n termination in case journal writing crashed mid-line */
2498 else if (entry[entry_len - 1] != '\n')
2500 RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
2505 entry[entry_len - 1] = '\0';
2507 if (handle_request(NULL, now, entry, entry_len) == 0)
2515 RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
2516 entry_cnt, fail_cnt);
2518 return entry_cnt > 0 ? 1 : 0;
2519 } /* }}} static int journal_replay */
2521 static int journal_sort(const void *v1, const void *v2)
2523 char **jn1 = (char **) v1;
2524 char **jn2 = (char **) v2;
2526 return strcmp(*jn1,*jn2);
2529 static void journal_init(void) /* {{{ */
2531 int had_journal = 0;
2533 struct dirent *dent;
2534 char path[PATH_MAX+1];
2536 if (journal_dir == NULL) return;
2538 pthread_mutex_lock(&journal_lock);
2540 journal_cur = calloc(1, sizeof(journal_set));
2541 if (journal_cur == NULL)
2543 RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2547 RRDD_LOG(LOG_INFO, "checking for journal files");
2549 /* Handle old journal files during transition. This gives them the
2550 * correct sort order. TODO: remove after first release
2553 char old_path[PATH_MAX+1];
2554 snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".old" );
2555 snprintf(path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0000");
2556 rename(old_path, path);
2558 snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE );
2559 snprintf(path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0001");
2560 rename(old_path, path);
2563 dir = opendir(journal_dir);
2565 RRDD_LOG(LOG_CRIT, "journal_init: opendir(%s) failed\n", journal_dir);
2568 while ((dent = readdir(dir)) != NULL)
2570 /* looks like a journal file? */
2571 if (strncmp(dent->d_name, JOURNAL_BASE, strlen(JOURNAL_BASE)))
2574 snprintf(path, PATH_MAX, "%s/%s", journal_dir, dent->d_name);
2576 if (!rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, path))
2578 RRDD_LOG(LOG_CRIT, "journal_init: cannot add journal file %s!",
2585 qsort(journal_cur->files, journal_cur->files_num,
2586 sizeof(journal_cur->files[0]), journal_sort);
2588 for (uint i=0; i < journal_cur->files_num; i++)
2589 had_journal += journal_replay(journal_cur->files[i]);
2593 /* it must have been a crash. start a flush */
2594 if (had_journal && config_flush_at_shutdown)
2595 flush_old_values(-1);
2597 pthread_mutex_unlock(&journal_lock);
2599 RRDD_LOG(LOG_INFO, "journal processing complete");
2601 } /* }}} static void journal_init */
2603 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
2605 assert(sock != NULL);
2607 free(sock->rbuf); sock->rbuf = NULL;
2608 free(sock->wbuf); sock->wbuf = NULL;
2610 } /* }}} void free_listen_socket */
2612 static void close_connection(listen_socket_t *sock) /* {{{ */
2620 free_listen_socket(sock);
2622 } /* }}} void close_connection */
2624 static void *connection_thread_main (void *args) /* {{{ */
2626 listen_socket_t *sock;
2629 sock = (listen_socket_t *) args;
2632 /* init read buffers */
2633 sock->next_read = sock->next_cmd = 0;
2634 sock->rbuf = malloc(RBUF_SIZE);
2635 if (sock->rbuf == NULL)
2637 RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
2638 close_connection(sock);
2642 pthread_mutex_lock (&connection_threads_lock);
2644 /* LIBWRAP does not support multiple threads! By putting this code
2645 inside pthread_mutex_lock we do not have to worry about request_info
2646 getting overwritten by another thread.
2648 struct request_info req;
2649 request_init(&req, RQ_DAEMON, "rrdcache\0", RQ_FILE, fd, NULL );
2651 if(!hosts_access(&req)) {
2652 RRDD_LOG(LOG_INFO, "refused connection from %s", eval_client(&req));
2653 pthread_mutex_unlock (&connection_threads_lock);
2654 close_connection(sock);
2657 #endif /* HAVE_LIBWRAP */
2658 connection_threads_num++;
2659 pthread_mutex_unlock (&connection_threads_lock);
2661 while (state == RUNNING)
2668 struct pollfd pollfd;
2672 pollfd.events = POLLIN | POLLPRI;
2675 status = poll (&pollfd, 1, /* timeout = */ 500);
2676 if (state != RUNNING)
2678 else if (status == 0) /* timeout */
2680 else if (status < 0) /* error */
2683 if (status != EINTR)
2684 RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2688 if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2690 else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2692 RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2693 "poll(2) returned something unexpected: %#04hx",
2698 rbytes = read(fd, sock->rbuf + sock->next_read,
2699 RBUF_SIZE - sock->next_read);
2702 RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2705 else if (rbytes == 0)
2708 sock->next_read += rbytes;
2710 if (sock->batch_start)
2711 now = sock->batch_start;
2715 while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2717 status = handle_request (sock, now, cmd, cmd_len+1);
2724 close_connection(sock);
2726 /* Remove this thread from the connection threads list */
2727 pthread_mutex_lock (&connection_threads_lock);
2728 connection_threads_num--;
2729 if (connection_threads_num <= 0)
2730 pthread_cond_broadcast(&connection_threads_done);
2731 pthread_mutex_unlock (&connection_threads_lock);
2734 } /* }}} void *connection_thread_main */
2736 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2739 struct sockaddr_un sa;
2740 listen_socket_t *temp;
2743 char *path_copy, *dir;
2746 if (strncmp(path, "unix:", strlen("unix:")) == 0)
2747 path += strlen("unix:");
2749 /* dirname may modify its argument */
2750 path_copy = strdup(path);
2751 if (path_copy == NULL)
2753 fprintf(stderr, "rrdcached: strdup(): %s\n",
2754 rrd_strerror(errno));
2758 dir = dirname(path_copy);
2759 if (rrd_mkdir_p(dir, 0777) != 0)
2761 fprintf(stderr, "Failed to create socket directory '%s': %s\n",
2762 dir, rrd_strerror(errno));
2768 temp = (listen_socket_t *) rrd_realloc (listen_fds,
2769 sizeof (listen_fds[0]) * (listen_fds_num + 1));
2772 fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2776 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2778 fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2781 fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2782 rrd_strerror(errno));
2786 memset (&sa, 0, sizeof (sa));
2787 sa.sun_family = AF_UNIX;
2788 strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2790 /* if we've gotten this far, we own the pid file. any daemon started
2791 * with the same args must not be alive. therefore, ensure that we can
2792 * create the socket...
2796 status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2799 fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2800 path, rrd_strerror(errno));
2805 /* tweak the sockets group ownership */
2806 if (sock->socket_group != (gid_t)-1)
2808 if ( (chown(path, getuid(), sock->socket_group) != 0) ||
2809 (chmod(path, (S_IRUSR|S_IWUSR|S_IXUSR | S_IRGRP|S_IWGRP)) != 0) )
2811 fprintf(stderr, "rrdcached: failed to set socket group permissions (%s)\n", strerror(errno));
2815 if (sock->socket_permissions != (mode_t)-1)
2817 if (chmod(path, sock->socket_permissions) != 0)
2818 fprintf(stderr, "rrdcached: failed to set socket file permissions (%o): %s\n",
2819 (unsigned int)sock->socket_permissions, strerror(errno));
2822 status = listen (fd, /* backlog = */ 10);
2825 fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2826 path, rrd_strerror(errno));
2832 listen_fds[listen_fds_num].fd = fd;
2833 listen_fds[listen_fds_num].family = PF_UNIX;
2834 strncpy(listen_fds[listen_fds_num].addr, path,
2835 sizeof (listen_fds[listen_fds_num].addr) - 1);
2839 } /* }}} int open_listen_socket_unix */
2841 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2843 struct addrinfo ai_hints;
2844 struct addrinfo *ai_res;
2845 struct addrinfo *ai_ptr;
2846 char addr_copy[NI_MAXHOST];
2851 strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
2852 addr_copy[sizeof (addr_copy) - 1] = 0;
2855 memset (&ai_hints, 0, sizeof (ai_hints));
2856 ai_hints.ai_flags = 0;
2857 #ifdef AI_ADDRCONFIG
2858 ai_hints.ai_flags |= AI_ADDRCONFIG;
2860 ai_hints.ai_family = AF_UNSPEC;
2861 ai_hints.ai_socktype = SOCK_STREAM;
2864 if (*addr == '[') /* IPv6+port format */
2866 /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2869 port = strchr (addr, ']');
2872 fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
2880 else if (*port == 0)
2884 fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
2887 } /* if (*addr == '[') */
2890 port = rindex(addr, ':');
2898 status = getaddrinfo (addr,
2899 port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2900 &ai_hints, &ai_res);
2903 fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
2904 addr, gai_strerror (status));
2908 for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2911 listen_socket_t *temp;
2914 temp = (listen_socket_t *) rrd_realloc (listen_fds,
2915 sizeof (listen_fds[0]) * (listen_fds_num + 1));
2919 "rrdcached: open_listen_socket_network: realloc failed.\n");
2923 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2925 fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2928 fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
2929 rrd_strerror(errno));
2933 setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2935 status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2938 fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2939 sock->addr, rrd_strerror(errno));
2944 status = listen (fd, /* backlog = */ 10);
2947 fprintf (stderr, "rrdcached: listen(%s) failed: %s\n.",
2948 sock->addr, rrd_strerror(errno));
2950 freeaddrinfo(ai_res);
2954 listen_fds[listen_fds_num].fd = fd;
2955 listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2957 } /* for (ai_ptr) */
2959 freeaddrinfo(ai_res);
2961 } /* }}} static int open_listen_socket_network */
2963 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2965 assert(sock != NULL);
2966 assert(sock->addr != NULL);
2968 if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2969 || sock->addr[0] == '/')
2970 return (open_listen_socket_unix(sock));
2972 return (open_listen_socket_network(sock));
2973 } /* }}} int open_listen_socket */
2975 static int close_listen_sockets (void) /* {{{ */
2979 for (i = 0; i < listen_fds_num; i++)
2981 close (listen_fds[i].fd);
2983 if (listen_fds[i].family == PF_UNIX)
2984 unlink(listen_fds[i].addr);
2992 } /* }}} int close_listen_sockets */
2994 static void *listen_thread_main (void UNUSED(*args)) /* {{{ */
2996 struct pollfd *pollfds;
3001 if (listen_fds_num < 1)
3003 RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
3007 pollfds_num = listen_fds_num;
3008 pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
3009 if (pollfds == NULL)
3011 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
3014 memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
3016 RRDD_LOG(LOG_INFO, "listening for connections");
3018 while (state == RUNNING)
3020 for (i = 0; i < pollfds_num; i++)
3022 pollfds[i].fd = listen_fds[i].fd;
3023 pollfds[i].events = POLLIN | POLLPRI;
3024 pollfds[i].revents = 0;
3027 status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
3028 if (state != RUNNING)
3030 else if (status == 0) /* timeout */
3032 else if (status < 0) /* error */
3035 if (status != EINTR)
3037 RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
3042 for (i = 0; i < pollfds_num; i++)
3044 listen_socket_t *client_sock;
3045 struct sockaddr_storage client_sa;
3046 socklen_t client_sa_size;
3048 pthread_attr_t attr;
3050 if (pollfds[i].revents == 0)
3053 if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
3055 RRDD_LOG (LOG_ERR, "listen_thread_main: "
3056 "poll(2) returned something unexpected for listen FD #%i.",
3061 client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
3062 if (client_sock == NULL)
3064 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
3067 memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
3069 client_sa_size = sizeof (client_sa);
3070 client_sock->fd = accept (pollfds[i].fd,
3071 (struct sockaddr *) &client_sa, &client_sa_size);
3072 if (client_sock->fd < 0)
3074 RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
3079 pthread_attr_init (&attr);
3080 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
3082 status = pthread_create (&tid, &attr, connection_thread_main,
3086 RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
3087 close_connection(client_sock);
3090 } /* for (pollfds_num) */
3091 } /* while (state == RUNNING) */
3093 RRDD_LOG(LOG_INFO, "starting shutdown");
3095 close_listen_sockets ();
3097 pthread_mutex_lock (&connection_threads_lock);
3098 while (connection_threads_num > 0)
3099 pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
3100 pthread_mutex_unlock (&connection_threads_lock);
3105 } /* }}} void *listen_thread_main */
3107 static int daemonize (void) /* {{{ */
3112 daemon_uid = geteuid();
3114 pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
3116 pid_fd = check_pidfile();
3120 /* open all the listen sockets */
3121 if (config_listen_address_list_len > 0)
3123 for (size_t i = 0; i < config_listen_address_list_len; i++)
3124 open_listen_socket (config_listen_address_list[i]);
3126 rrd_free_ptrs((void ***) &config_listen_address_list,
3127 &config_listen_address_list_len);
3131 strncpy(default_socket.addr, RRDCACHED_DEFAULT_ADDRESS,
3132 sizeof(default_socket.addr) - 1);
3133 default_socket.addr[sizeof(default_socket.addr) - 1] = '\0';
3134 open_listen_socket (&default_socket);
3137 if (listen_fds_num < 1)
3139 fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
3143 if (!stay_foreground)
3150 fprintf (stderr, "daemonize: fork(2) failed.\n");
3156 /* Become session leader */
3159 /* Open the first three file descriptors to /dev/null */
3164 open ("/dev/null", O_RDWR);
3165 if (dup(0) == -1 || dup(0) == -1){
3166 RRDD_LOG (LOG_ERR, "faild to run dup.\n");
3168 } /* if (!stay_foreground) */
3170 /* Change into the /tmp directory. */
3171 base_dir = (config_base_dir != NULL)
3175 if (chdir (base_dir) != 0)
3177 fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
3181 install_signal_handlers();
3183 openlog ("rrdcached", LOG_PID, LOG_DAEMON);
3184 RRDD_LOG(LOG_INFO, "starting up");
3186 cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
3187 (GDestroyNotify) free_cache_item);
3188 if (cache_tree == NULL)
3190 RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
3194 return write_pidfile (pid_fd);
3199 } /* }}} int daemonize */
3201 static int cleanup (void) /* {{{ */
3203 pthread_cond_broadcast (&flush_cond);
3204 pthread_join (flush_thread, NULL);
3206 pthread_cond_broadcast (&queue_cond);
3207 for (int i = 0; i < config_queue_threads; i++)
3208 pthread_join (queue_threads[i], NULL);
3210 if (config_flush_at_shutdown)
3212 assert(cache_queue_head == NULL);
3213 RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
3216 free(queue_threads);
3217 free(config_base_dir);
3219 pthread_mutex_lock(&cache_lock);
3220 g_tree_destroy(cache_tree);
3222 pthread_mutex_lock(&journal_lock);
3225 RRDD_LOG(LOG_INFO, "goodbye");
3229 free(config_pid_file);
3232 } /* }}} int cleanup */
3234 static int read_options (int argc, char **argv) /* {{{ */
3239 socket_permission_clear (&default_socket);
3241 default_socket.socket_group = (gid_t)-1;
3242 default_socket.socket_permissions = (mode_t)-1;
3244 while ((option = getopt(argc, argv, "Ogl:s:m:P:f:w:z:t:Bb:p:Fj:a:h?")) != -1)
3249 opt_no_overwrite = 1;
3258 listen_socket_t *new;
3260 new = malloc(sizeof(listen_socket_t));
3263 fprintf(stderr, "read_options: malloc failed.\n");
3266 memset(new, 0, sizeof(listen_socket_t));
3268 strncpy(new->addr, optarg, sizeof(new->addr)-1);
3270 /* Add permissions to the socket {{{ */
3271 if (default_socket.permissions != 0)
3273 socket_permission_copy (new, &default_socket);
3275 else /* if (default_socket.permissions == 0) */
3277 /* Add permission for ALL commands to the socket. */
3279 for (i = 0; i < list_of_commands_len; i++)
3281 status = socket_permission_add (new, list_of_commands[i].cmd);
3284 fprintf (stderr, "read_options: Adding permission \"%s\" to "
3285 "socket failed. This should never happen, ever! Sorry.\n",
3286 list_of_commands[i].cmd);
3291 /* }}} Done adding permissions. */
3293 new->socket_group = default_socket.socket_group;
3294 new->socket_permissions = default_socket.socket_permissions;
3296 if (!rrd_add_ptr((void ***)&config_listen_address_list,
3297 &config_listen_address_list_len, new))
3299 fprintf(stderr, "read_options: rrd_add_ptr failed.\n");
3305 /* set socket group permissions */
3311 group_gid = strtoul(optarg, NULL, 10);
3312 if (errno != EINVAL && group_gid>0)
3314 /* we were passed a number */
3315 grp = getgrgid(group_gid);
3319 grp = getgrnam(optarg);
3324 default_socket.socket_group = grp->gr_gid;
3328 /* no idea what the user wanted... */
3329 fprintf (stderr, "read_options: couldn't map \"%s\" to a group, Sorry\n", optarg);
3335 /* set socket file permissions */
3339 char *endptr = NULL;
3341 tmp = strtol (optarg, &endptr, 8);
3342 if ((endptr == optarg) || (! endptr) || (*endptr != '\0')
3343 || (tmp > 07777) || (tmp < 0)) {
3344 fprintf (stderr, "read_options: Invalid file mode \"%s\".\n",
3349 default_socket.socket_permissions = (mode_t)tmp;
3360 socket_permission_clear (&default_socket);
3362 optcopy = strdup (optarg);
3365 while ((ptr = strtok_r (dummy, ", ", &saveptr)) != NULL)
3368 status = socket_permission_add (&default_socket, ptr);
3371 fprintf (stderr, "read_options: Adding permission \"%s\" to "
3372 "socket failed. Most likely, this permission doesn't "
3373 "exist. Check your command line.\n", ptr);
3386 temp = atoi (optarg);
3388 config_flush_interval = temp;
3391 fprintf (stderr, "Invalid flush interval: %s\n", optarg);
3401 temp = atoi (optarg);
3403 config_write_interval = temp;
3406 fprintf (stderr, "Invalid write interval: %s\n", optarg);
3416 temp = atoi(optarg);
3418 config_write_jitter = temp;
3421 fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
3431 threads = atoi(optarg);
3433 config_queue_threads = threads;
3436 fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
3443 config_write_base_only = 1;
3449 char base_realpath[PATH_MAX];
3451 if (config_base_dir != NULL)
3452 free (config_base_dir);
3453 config_base_dir = strdup (optarg);
3454 if (config_base_dir == NULL)
3456 fprintf (stderr, "read_options: strdup failed.\n");
3460 if (rrd_mkdir_p (config_base_dir, 0777) != 0)
3462 fprintf (stderr, "Failed to create base directory '%s': %s\n",
3463 config_base_dir, rrd_strerror (errno));
3467 /* make sure that the base directory is not resolved via
3468 * symbolic links. this makes some performance-enhancing
3469 * assumptions possible (we don't have to resolve paths
3470 * that start with a "/")
3472 if (realpath(config_base_dir, base_realpath) == NULL)
3474 fprintf (stderr, "Failed to canonicalize the base directory '%s': "
3475 "%s\n", config_base_dir, rrd_strerror(errno));
3479 len = strlen (config_base_dir);
3480 while ((len > 0) && (config_base_dir[len - 1] == '/'))
3482 config_base_dir[len - 1] = 0;
3488 fprintf (stderr, "Invalid base directory: %s\n", optarg);
3492 _config_base_dir_len = len;
3494 len = strlen (base_realpath);
3495 while ((len > 0) && (base_realpath[len - 1] == '/'))
3497 base_realpath[len - 1] = '\0';
3501 if (strncmp(config_base_dir,
3502 base_realpath, sizeof(base_realpath)) != 0)
3505 "Base directory (-b) resolved via file system links!\n"
3506 "Please consult rrdcached '-b' documentation!\n"
3507 "Consider specifying the real directory (%s)\n",
3516 if (config_pid_file != NULL)
3517 free (config_pid_file);
3518 config_pid_file = strdup (optarg);
3519 if (config_pid_file == NULL)
3521 fprintf (stderr, "read_options: strdup failed.\n");
3528 config_flush_at_shutdown = 1;
3533 char journal_dir_actual[PATH_MAX];
3535 dir = journal_dir = strdup(realpath((const char *)optarg, journal_dir_actual));
3537 status = rrd_mkdir_p(dir, 0777);
3540 fprintf(stderr, "Failed to create journal directory '%s': %s\n",
3541 dir, rrd_strerror(errno));
3545 if (access(dir, R_OK|W_OK|X_OK) != 0)
3547 fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
3548 errno ? rrd_strerror(errno) : "");
3556 int temp = atoi(optarg);
3558 config_alloc_chunk = temp;
3561 fprintf(stderr, "Invalid allocation size: %s\n", optarg);
3569 printf ("RRDCacheD %s\n"
3570 "Copyright (C) 2008,2009 Florian octo Forster and Kevin Brintnall\n"
3572 "Usage: rrdcached [options]\n"
3574 "Valid options are:\n"
3575 " -l <address> Socket address to listen to.\n"
3576 " -P <perms> Sets the permissions to assign to all following "
3578 " -w <seconds> Interval in which to write data.\n"
3579 " -z <delay> Delay writes up to <delay> seconds to spread load\n"
3580 " -t <threads> Number of write threads.\n"
3581 " -f <seconds> Interval in which to flush dead data.\n"
3582 " -p <file> Location of the PID-file.\n"
3583 " -b <dir> Base directory to change to.\n"
3584 " -B Restrict file access to paths within -b <dir>\n"
3585 " -g Do not fork and run in the foreground.\n"
3586 " -j <dir> Directory in which to create the journal files.\n"
3587 " -F Always flush all updates at shutdown\n"
3588 " -s <id|name> Group owner of all following UNIX sockets\n"
3589 " (the socket will also have read/write permissions "
3591 " -m <mode> File permissions (octal) of all following UNIX "
3593 " -a <size> Memory allocation chunk size. Default is 1.\n"
3594 " -O Do not allow CREATE commands to overwrite existing\n"
3595 " files, even if asked to.\n"
3597 "For more information and a detailed description of all options "
3599 "to the rrdcached(1) manual page.\n",
3606 } /* switch (option) */
3607 } /* while (getopt) */
3609 /* advise the user when values are not sane */
3610 if (config_flush_interval < 2 * config_write_interval)
3611 fprintf(stderr, "WARNING: flush interval (-f) should be at least"
3612 " 2x write interval (-w) !\n");
3613 if (config_write_jitter > config_write_interval)
3614 fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
3615 " write interval (-w) !\n");
3617 if (config_write_base_only && config_base_dir == NULL)
3618 fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
3619 " Consult the rrdcached documentation\n");
3621 if (journal_dir == NULL)
3622 config_flush_at_shutdown = 1;
3625 } /* }}} int read_options */
3627 int main (int argc, char **argv)
3631 status = read_options (argc, argv);
3639 status = daemonize ();
3642 fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
3648 /* start the queue threads */
3649 queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
3650 if (queue_threads == NULL)
3652 RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
3656 for (int i = 0; i < config_queue_threads; i++)
3658 memset (&queue_threads[i], 0, sizeof (*queue_threads));
3659 status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
3662 RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
3668 /* start the flush thread */
3669 memset(&flush_thread, 0, sizeof(flush_thread));
3670 status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
3673 RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
3678 listen_thread_main (NULL);
3685 * vim: set sw=2 sts=2 ts=8 et fdm=marker :