2 * RRDTool - src/rrd_daemon.c
3 * Copyright (C) 2008-2010 Florian octo Forster
4 * Copyright (C) 2008,2009 Kevin Brintnall
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; only version 2 of the License is applicable.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
20 * Florian octo Forster <octo at verplant.org>
21 * kevin brintnall <kbrint@rufus.net>
26 * First tell the compiler to stick to the C99 and POSIX standards as close as
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
46 #define _XOPEN_SOURCE 500
63 * Now for some includes..
68 #include "rrd_client.h"
80 #include <sys/socket.h>
88 #include <sys/types.h>
100 #include <sys/time.h>
107 #endif /* HAVE_LIBWRAP */
109 #include <glib-2.0/glib.h>
112 #define RRDD_LOG(severity, ...) \
114 if (stay_foreground) { \
115 fprintf(stderr, __VA_ARGS__); \
116 fprintf(stderr, "\n"); } \
117 syslog ((severity), __VA_ARGS__); \
123 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
125 struct listen_socket_s
128 char addr[PATH_MAX + 1];
131 /* state for BATCH processing */
143 uint32_t permissions;
146 mode_t socket_permissions;
148 typedef struct listen_socket_s listen_socket_t;
151 typedef struct command_s command_t;
152 /* note: guard against "unused" warnings in the handlers */
153 #define DISPATCH_PROTO listen_socket_t UNUSED(*sock),\
155 char UNUSED(*buffer),\
156 size_t UNUSED(buffer_size)
158 #define HANDLER_PROTO command_t UNUSED(*cmd),\
163 int (*handler)(HANDLER_PROTO);
165 char context; /* where we expect to see it */
166 #define CMD_CONTEXT_CLIENT (1<<0)
167 #define CMD_CONTEXT_BATCH (1<<1)
168 #define CMD_CONTEXT_JOURNAL (1<<2)
169 #define CMD_CONTEXT_ANY (0x7f)
176 typedef struct cache_item_s cache_item_t;
181 size_t values_num; /* number of valid pointers */
182 size_t values_alloc; /* number of allocated pointers */
183 time_t last_flush_time;
184 double last_update_stamp;
185 #define CI_FLAGS_IN_TREE (1<<0)
186 #define CI_FLAGS_IN_QUEUE (1<<1)
188 pthread_cond_t flushed;
193 struct callback_flush_data_s
200 typedef struct callback_flush_data_s callback_flush_data_t;
207 typedef enum queue_side_e queue_side_t;
209 /* describe a set of journal files */
215 #define RBUF_SIZE (RRD_CMD_MAX*2)
220 static int stay_foreground = 0;
221 static uid_t daemon_uid;
223 static listen_socket_t *listen_fds = NULL;
224 static size_t listen_fds_num = 0;
226 static listen_socket_t default_socket;
229 RUNNING, /* normal operation */
230 FLUSHING, /* flushing remaining values */
231 SHUTDOWN /* shutting down */
234 static pthread_t *queue_threads;
235 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
236 static int config_queue_threads = 4;
238 static pthread_t flush_thread;
239 static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;
241 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
242 static pthread_cond_t connection_threads_done = PTHREAD_COND_INITIALIZER;
243 static int connection_threads_num = 0;
246 static GTree *cache_tree = NULL;
247 static cache_item_t *cache_queue_head = NULL;
248 static cache_item_t *cache_queue_tail = NULL;
249 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
251 static int config_write_interval = 300;
252 static int config_write_jitter = 0;
253 static int config_flush_interval = 3600;
254 static int config_flush_at_shutdown = 0;
255 static char *config_pid_file = NULL;
256 static char *config_base_dir = NULL;
257 static size_t _config_base_dir_len = 0;
258 static int config_write_base_only = 0;
259 static size_t config_alloc_chunk = 1;
261 static listen_socket_t **config_listen_address_list = NULL;
262 static size_t config_listen_address_list_len = 0;
264 static uint64_t stats_queue_length = 0;
265 static uint64_t stats_updates_received = 0;
266 static uint64_t stats_flush_received = 0;
267 static uint64_t stats_updates_written = 0;
268 static uint64_t stats_data_sets_written = 0;
269 static uint64_t stats_journal_bytes = 0;
270 static uint64_t stats_journal_rotate = 0;
271 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
273 static int opt_no_overwrite = 0; /* default for the daemon */
275 /* Journaled updates */
276 #define JOURNAL_REPLAY(s) ((s) == NULL)
277 #define JOURNAL_BASE "rrd.journal"
278 static journal_set *journal_cur = NULL;
279 static journal_set *journal_old = NULL;
280 static char *journal_dir = NULL;
281 static FILE *journal_fh = NULL; /* current journal file handle */
282 static long journal_size = 0; /* current journal size */
283 #define JOURNAL_MAX (1 * 1024 * 1024 * 1024)
284 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
285 static int journal_write(char *cmd, char *args);
286 static void journal_done(void);
287 static void journal_rotate(void);
289 /* prototypes for forward refernces */
290 static int handle_request_help (HANDLER_PROTO);
295 static void sig_common (const char *sig) /* {{{ */
297 RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
298 if (state == RUNNING) {
301 pthread_cond_broadcast(&flush_cond);
302 pthread_cond_broadcast(&queue_cond);
303 } /* }}} void sig_common */
305 static void sig_int_handler (int UNUSED(s)) /* {{{ */
308 } /* }}} void sig_int_handler */
310 static void sig_term_handler (int UNUSED(s)) /* {{{ */
313 } /* }}} void sig_term_handler */
315 static void sig_usr1_handler (int UNUSED(s)) /* {{{ */
317 config_flush_at_shutdown = 1;
319 } /* }}} void sig_usr1_handler */
321 static void sig_usr2_handler (int UNUSED(s)) /* {{{ */
323 config_flush_at_shutdown = 0;
325 } /* }}} void sig_usr2_handler */
327 static void install_signal_handlers(void) /* {{{ */
329 /* These structures are static, because `sigaction' behaves weird if the are
331 static struct sigaction sa_int;
332 static struct sigaction sa_term;
333 static struct sigaction sa_pipe;
334 static struct sigaction sa_usr1;
335 static struct sigaction sa_usr2;
337 /* Install signal handlers */
338 memset (&sa_int, 0, sizeof (sa_int));
339 sa_int.sa_handler = sig_int_handler;
340 sigaction (SIGINT, &sa_int, NULL);
342 memset (&sa_term, 0, sizeof (sa_term));
343 sa_term.sa_handler = sig_term_handler;
344 sigaction (SIGTERM, &sa_term, NULL);
346 memset (&sa_pipe, 0, sizeof (sa_pipe));
347 sa_pipe.sa_handler = SIG_IGN;
348 sigaction (SIGPIPE, &sa_pipe, NULL);
350 memset (&sa_pipe, 0, sizeof (sa_usr1));
351 sa_usr1.sa_handler = sig_usr1_handler;
352 sigaction (SIGUSR1, &sa_usr1, NULL);
354 memset (&sa_usr2, 0, sizeof (sa_usr2));
355 sa_usr2.sa_handler = sig_usr2_handler;
356 sigaction (SIGUSR2, &sa_usr2, NULL);
358 } /* }}} void install_signal_handlers */
360 static int open_pidfile(char *action, int oflag) /* {{{ */
364 char *file_copy, *dir;
366 file = (config_pid_file != NULL)
368 : LOCALSTATEDIR "/run/rrdcached.pid";
370 /* dirname may modify its argument */
371 file_copy = strdup(file);
372 if (file_copy == NULL)
374 fprintf(stderr, "rrdcached: strdup(): %s\n",
375 rrd_strerror(errno));
379 dir = dirname(file_copy);
380 if (rrd_mkdir_p(dir, 0777) != 0)
382 fprintf(stderr, "Failed to create pidfile directory '%s': %s\n",
383 dir, rrd_strerror(errno));
389 fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
391 fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
392 action, file, rrd_strerror(errno));
395 } /* }}} static int open_pidfile */
397 /* check existing pid file to see whether a daemon is running */
398 static int check_pidfile(void)
404 pid_fd = open_pidfile("open", O_RDWR);
408 if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0)
415 /* another running process that we can signal COULD be
416 * a competing rrdcached */
417 if (pid != getpid() && kill(pid, 0) == 0)
420 "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
425 lseek(pid_fd, 0, SEEK_SET);
426 if (ftruncate(pid_fd, 0) == -1)
429 "FATAL: Faild to truncate stale PID file. (pid %d)\n", pid);
435 "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
436 "rrdcached: starting normally.\n", pid);
439 } /* }}} static int check_pidfile */
441 static int write_pidfile (int fd) /* {{{ */
448 fh = fdopen (fd, "w");
451 RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
456 fprintf (fh, "%i\n", (int) pid);
460 } /* }}} int write_pidfile */
462 static int remove_pidfile (void) /* {{{ */
467 file = (config_pid_file != NULL)
469 : LOCALSTATEDIR "/run/rrdcached.pid";
471 status = unlink (file);
475 } /* }}} int remove_pidfile */
477 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
481 eol = memchr(sock->rbuf + sock->next_cmd, '\n',
482 sock->next_read - sock->next_cmd);
486 /* no commands left, move remainder back to front of rbuf */
487 memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
488 sock->next_read - sock->next_cmd);
489 sock->next_read -= sock->next_cmd;
496 char *cmd = sock->rbuf + sock->next_cmd;
499 sock->next_cmd = eol - sock->rbuf + 1;
501 if (eol > sock->rbuf && *(eol-1) == '\r')
502 *(--eol) = '\0'; /* handle "\r\n" EOL */
511 } /* }}} char *next_cmd */
513 /* add the characters directly to the write buffer */
514 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
518 assert(sock != NULL);
520 new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
523 RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
527 strncpy(new_buf + sock->wbuf_len, str, len + 1);
529 sock->wbuf = new_buf;
530 sock->wbuf_len += len;
533 } /* }}} static int add_to_wbuf */
535 /* add the text to the "extra" info that's sent after the status line */
536 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
539 char buffer[RRD_CMD_MAX];
542 if (JOURNAL_REPLAY(sock)) return 0;
543 if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
546 #ifdef HAVE_VSNPRINTF
547 len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
549 len = vsprintf(buffer, fmt, argp);
554 RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
558 return add_to_wbuf(sock, buffer, len);
559 } /* }}} static int add_response_info */
561 static int count_lines(char *str) /* {{{ */
567 while ((str = strchr(str, '\n')) != NULL)
575 } /* }}} static int count_lines */
577 /* send the response back to the user.
578 * returns 0 on success, -1 on error
579 * write buffer is always zeroed after this call */
580 static int send_response (listen_socket_t *sock, response_code rc,
581 char *fmt, ...) /* {{{ */
584 char buffer[RRD_CMD_MAX];
589 if (JOURNAL_REPLAY(sock)) return rc;
591 if (sock->batch_start)
594 return rc; /* no response on success during BATCH */
595 lines = sock->batch_cmd;
597 else if (rc == RESP_OK)
598 lines = count_lines(sock->wbuf);
602 rclen = snprintf(buffer, sizeof buffer, "%d ", lines);
604 #ifdef HAVE_VSNPRINTF
605 len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
607 len = vsprintf(buffer+rclen, fmt, argp);
615 /* append the result to the wbuf, don't write to the user */
616 if (sock->batch_start)
617 return add_to_wbuf(sock, buffer, len);
619 /* first write must be complete */
620 if (len != write(sock->fd, buffer, len))
622 RRDD_LOG(LOG_INFO, "send_response: could not write status message");
626 if (sock->wbuf != NULL && rc == RESP_OK)
629 while (wrote < sock->wbuf_len)
631 ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
634 RRDD_LOG(LOG_INFO, "send_response: could not write results");
641 free(sock->wbuf); sock->wbuf = NULL;
647 static void wipe_ci_values(cache_item_t *ci, time_t when)
651 ci->values_alloc = 0;
653 ci->last_flush_time = when;
654 if (config_write_jitter > 0)
655 ci->last_flush_time += (rrd_random() % config_write_jitter);
659 * remove a "cache_item_t" item from the queue.
660 * must hold 'cache_lock' when calling this
662 static void remove_from_queue(cache_item_t *ci) /* {{{ */
664 if (ci == NULL) return;
665 if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
667 if (ci->prev == NULL)
668 cache_queue_head = ci->next; /* reset head */
670 ci->prev->next = ci->next;
672 if (ci->next == NULL)
673 cache_queue_tail = ci->prev; /* reset the tail */
675 ci->next->prev = ci->prev;
677 ci->next = ci->prev = NULL;
678 ci->flags &= ~CI_FLAGS_IN_QUEUE;
680 pthread_mutex_lock (&stats_lock);
681 assert (stats_queue_length > 0);
682 stats_queue_length--;
683 pthread_mutex_unlock (&stats_lock);
685 } /* }}} static void remove_from_queue */
687 /* free the resources associated with the cache_item_t
688 * must hold cache_lock when calling this function
690 static void *free_cache_item(cache_item_t *ci) /* {{{ */
692 if (ci == NULL) return NULL;
694 remove_from_queue(ci);
696 for (size_t i=0; i < ci->values_num; i++)
702 /* in case anyone is waiting */
703 pthread_cond_broadcast(&ci->flushed);
704 pthread_cond_destroy(&ci->flushed);
709 } /* }}} static void *free_cache_item */
712 * enqueue_cache_item:
713 * `cache_lock' must be acquired before calling this function!
715 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
721 if (ci->values_num == 0)
726 if (cache_queue_head == ci)
729 /* remove if further down in queue */
730 remove_from_queue(ci);
733 ci->next = cache_queue_head;
734 if (ci->next != NULL)
736 cache_queue_head = ci;
738 if (cache_queue_tail == NULL)
739 cache_queue_tail = cache_queue_head;
741 else /* (side == TAIL) */
743 /* We don't move values back in the list.. */
744 if (ci->flags & CI_FLAGS_IN_QUEUE)
747 assert (ci->next == NULL);
748 assert (ci->prev == NULL);
750 ci->prev = cache_queue_tail;
752 if (cache_queue_tail == NULL)
753 cache_queue_head = ci;
755 cache_queue_tail->next = ci;
757 cache_queue_tail = ci;
760 ci->flags |= CI_FLAGS_IN_QUEUE;
762 pthread_cond_signal(&queue_cond);
763 pthread_mutex_lock (&stats_lock);
764 stats_queue_length++;
765 pthread_mutex_unlock (&stats_lock);
768 } /* }}} int enqueue_cache_item */
771 * tree_callback_flush:
772 * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
773 * while this is in progress.
775 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
779 callback_flush_data_t *cfd;
781 ci = (cache_item_t *) value;
782 cfd = (callback_flush_data_t *) data;
784 if (ci->flags & CI_FLAGS_IN_QUEUE)
787 if (ci->values_num > 0
788 && (ci->last_flush_time <= cfd->abs_timeout || state != RUNNING))
790 enqueue_cache_item (ci, TAIL);
792 else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
793 && (ci->values_num <= 0))
795 assert ((char *) key == ci->file);
796 if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key))
798 RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed.");
804 } /* }}} gboolean tree_callback_flush */
806 static int flush_old_values (int max_age)
808 callback_flush_data_t cfd;
811 memset (&cfd, 0, sizeof (cfd));
812 /* Pass the current time as user data so that we don't need to call
813 * `time' for each node. */
814 cfd.now = time (NULL);
819 cfd.abs_timeout = cfd.now - max_age;
821 cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
823 /* `tree_callback_flush' will return the keys of all values that haven't
824 * been touched in the last `config_flush_interval' seconds in `cfd'.
825 * The char*'s in this array point to the same memory as ci->file, so we
826 * don't need to free them separately. */
827 g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
829 for (k = 0; k < cfd.keys_num; k++)
831 gboolean status = g_tree_remove(cache_tree, cfd.keys[k]);
832 /* should never fail, since we have held the cache_lock
834 assert(status == TRUE);
837 if (cfd.keys != NULL)
844 } /* int flush_old_values */
846 static void *flush_thread_main (void UNUSED(*args)) /* {{{ */
849 struct timespec next_flush;
852 gettimeofday (&now, NULL);
853 next_flush.tv_sec = now.tv_sec + config_flush_interval;
854 next_flush.tv_nsec = 1000 * now.tv_usec;
856 pthread_mutex_lock(&cache_lock);
858 while (state == RUNNING)
860 gettimeofday (&now, NULL);
861 if ((now.tv_sec > next_flush.tv_sec)
862 || ((now.tv_sec == next_flush.tv_sec)
863 && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
865 RRDD_LOG(LOG_DEBUG, "flushing old values");
867 /* Determine the time of the next cache flush. */
868 next_flush.tv_sec = now.tv_sec + config_flush_interval;
870 /* Flush all values that haven't been written in the last
871 * `config_write_interval' seconds. */
872 flush_old_values (config_write_interval);
874 /* unlock the cache while we rotate so we don't block incoming
875 * updates if the fsync() blocks on disk I/O */
876 pthread_mutex_unlock(&cache_lock);
878 pthread_mutex_lock(&cache_lock);
881 status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
882 if (status != 0 && status != ETIMEDOUT)
884 RRDD_LOG (LOG_ERR, "flush_thread_main: "
885 "pthread_cond_timedwait returned %i.", status);
889 if (config_flush_at_shutdown)
890 flush_old_values (-1); /* flush everything */
894 pthread_mutex_unlock(&cache_lock);
897 } /* void *flush_thread_main */
899 static void *queue_thread_main (void UNUSED(*args)) /* {{{ */
901 pthread_mutex_lock (&cache_lock);
903 while (state != SHUTDOWN
904 || (cache_queue_head != NULL && config_flush_at_shutdown))
912 /* Now, check if there's something to store away. If not, wait until
913 * something comes in. */
914 if (cache_queue_head == NULL)
916 status = pthread_cond_wait (&queue_cond, &cache_lock);
917 if ((status != 0) && (status != ETIMEDOUT))
919 RRDD_LOG (LOG_ERR, "queue_thread_main: "
920 "pthread_cond_wait returned %i.", status);
924 /* Check if a value has arrived. This may be NULL if we timed out or there
925 * was an interrupt such as a signal. */
926 if (cache_queue_head == NULL)
929 ci = cache_queue_head;
931 /* copy the relevant parts */
932 file = strdup (ci->file);
935 RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
939 assert(ci->values != NULL);
940 assert(ci->values_num > 0);
943 values_num = ci->values_num;
945 wipe_ci_values(ci, time(NULL));
946 remove_from_queue(ci);
948 pthread_mutex_unlock (&cache_lock);
951 status = rrd_update_r (file, NULL, (int) values_num, (void *) values);
954 RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
955 "rrd_update_r (%s) failed with status %i. (%s)",
956 file, status, rrd_get_error());
959 journal_write("wrote", file);
961 /* Search again in the tree. It's possible someone issued a "FORGET"
962 * while we were writing the update values. */
963 pthread_mutex_lock(&cache_lock);
964 ci = (cache_item_t *) g_tree_lookup(cache_tree, file);
966 pthread_cond_broadcast(&ci->flushed);
967 pthread_mutex_unlock(&cache_lock);
971 pthread_mutex_lock (&stats_lock);
972 stats_updates_written++;
973 stats_data_sets_written += values_num;
974 pthread_mutex_unlock (&stats_lock);
977 rrd_free_ptrs((void ***) &values, &values_num);
980 pthread_mutex_lock (&cache_lock);
982 pthread_mutex_unlock (&cache_lock);
985 } /* }}} void *queue_thread_main */
987 static int buffer_get_field (char **buffer_ret, /* {{{ */
988 size_t *buffer_size_ret, char **field_ret)
997 buffer = *buffer_ret;
999 buffer_size = *buffer_size_ret;
1000 field = *buffer_ret;
1003 if (buffer_size <= 0)
1006 /* This is ensured by `handle_request'. */
1007 assert (buffer[buffer_size - 1] == '\0');
1010 while (buffer_pos < buffer_size)
1012 /* Check for end-of-field or end-of-buffer */
1013 if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
1015 field[field_size] = 0;
1021 /* Handle escaped characters. */
1022 else if (buffer[buffer_pos] == '\\')
1024 if (buffer_pos >= (buffer_size - 1))
1027 field[field_size] = buffer[buffer_pos];
1031 /* Normal operation */
1034 field[field_size] = buffer[buffer_pos];
1038 } /* while (buffer_pos < buffer_size) */
1043 *buffer_ret = buffer + buffer_pos;
1044 *buffer_size_ret = buffer_size - buffer_pos;
1048 } /* }}} int buffer_get_field */
1050 /* if we're restricting writes to the base directory,
1051 * check whether the file falls within the dir
1052 * returns 1 if OK, otherwise 0
1054 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
1056 assert(file != NULL);
1058 if (!config_write_base_only
1059 || JOURNAL_REPLAY(sock)
1060 || config_base_dir == NULL)
1063 if (strstr(file, "../") != NULL) goto err;
1065 /* relative paths without "../" are ok */
1066 if (*file != '/') return 1;
1068 /* file must be of the format base + "/" + <1+ char filename> */
1069 if (strlen(file) < _config_base_dir_len + 2) goto err;
1070 if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
1071 if (*(file + _config_base_dir_len) != '/') goto err;
1076 if (sock != NULL && sock->fd >= 0)
1077 send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1080 } /* }}} static int check_file_access */
1082 /* when using a base dir, convert relative paths to absolute paths.
1083 * if necessary, modifies the "filename" pointer to point
1084 * to the new path created in "tmp". "tmp" is provided
1085 * by the caller and sizeof(tmp) must be >= PATH_MAX.
1087 * this allows us to optimize for the expected case (absolute path)
1090 static void get_abs_path(char **filename, char *tmp)
1092 assert(tmp != NULL);
1093 assert(filename != NULL && *filename != NULL);
1095 if (config_base_dir == NULL || **filename == '/')
1098 snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1100 } /* }}} static int get_abs_path */
1102 static int flush_file (const char *filename) /* {{{ */
1106 pthread_mutex_lock (&cache_lock);
1108 ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1111 pthread_mutex_unlock (&cache_lock);
1115 if (ci->values_num > 0)
1117 /* Enqueue at head */
1118 enqueue_cache_item (ci, HEAD);
1119 pthread_cond_wait(&ci->flushed, &cache_lock);
1122 /* DO NOT DO ANYTHING WITH ci HERE!! The entry
1123 * may have been purged during our cond_wait() */
1125 pthread_mutex_unlock(&cache_lock);
1128 } /* }}} int flush_file */
1130 static int syntax_error(listen_socket_t *sock, command_t *cmd) /* {{{ */
1132 char *err = "Syntax error.\n";
1134 if (cmd && cmd->syntax)
1137 return send_response(sock, RESP_ERR, "Usage: %s", err);
1138 } /* }}} static int syntax_error() */
1140 static int handle_request_stats (HANDLER_PROTO) /* {{{ */
1142 uint64_t copy_queue_length;
1143 uint64_t copy_updates_received;
1144 uint64_t copy_flush_received;
1145 uint64_t copy_updates_written;
1146 uint64_t copy_data_sets_written;
1147 uint64_t copy_journal_bytes;
1148 uint64_t copy_journal_rotate;
1150 uint64_t tree_nodes_number;
1151 uint64_t tree_depth;
1153 pthread_mutex_lock (&stats_lock);
1154 copy_queue_length = stats_queue_length;
1155 copy_updates_received = stats_updates_received;
1156 copy_flush_received = stats_flush_received;
1157 copy_updates_written = stats_updates_written;
1158 copy_data_sets_written = stats_data_sets_written;
1159 copy_journal_bytes = stats_journal_bytes;
1160 copy_journal_rotate = stats_journal_rotate;
1161 pthread_mutex_unlock (&stats_lock);
1163 pthread_mutex_lock (&cache_lock);
1164 tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1165 tree_depth = (uint64_t) g_tree_height (cache_tree);
1166 pthread_mutex_unlock (&cache_lock);
1168 add_response_info(sock,
1169 "QueueLength: %"PRIu64"\n", copy_queue_length);
1170 add_response_info(sock,
1171 "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1172 add_response_info(sock,
1173 "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1174 add_response_info(sock,
1175 "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1176 add_response_info(sock,
1177 "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1178 add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1179 add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1180 add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1181 add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1183 send_response(sock, RESP_OK, "Statistics follow\n");
1186 } /* }}} int handle_request_stats */
1188 static int handle_request_flush (HANDLER_PROTO) /* {{{ */
1190 char *file, file_tmp[PATH_MAX];
1193 status = buffer_get_field (&buffer, &buffer_size, &file);
1196 return syntax_error(sock,cmd);
1200 pthread_mutex_lock(&stats_lock);
1201 stats_flush_received++;
1202 pthread_mutex_unlock(&stats_lock);
1204 get_abs_path(&file, file_tmp);
1205 if (!check_file_access(file, sock)) return 0;
1207 status = flush_file (file);
1209 return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1210 else if (status == ENOENT)
1212 /* no file in our tree; see whether it exists at all */
1213 struct stat statbuf;
1215 memset(&statbuf, 0, sizeof(statbuf));
1216 if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1217 return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1219 return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1221 else if (status < 0)
1222 return send_response(sock, RESP_ERR, "Internal error.\n");
1224 return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1229 } /* }}} int handle_request_flush */
1231 static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
1233 RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1235 pthread_mutex_lock(&cache_lock);
1236 flush_old_values(-1);
1237 pthread_mutex_unlock(&cache_lock);
1239 return send_response(sock, RESP_OK, "Started flush.\n");
1240 } /* }}} static int handle_request_flushall */
1242 static int handle_request_pending(HANDLER_PROTO) /* {{{ */
1245 char *file, file_tmp[PATH_MAX];
1248 status = buffer_get_field(&buffer, &buffer_size, &file);
1250 return syntax_error(sock,cmd);
1252 get_abs_path(&file, file_tmp);
1254 pthread_mutex_lock(&cache_lock);
1255 ci = g_tree_lookup(cache_tree, file);
1258 pthread_mutex_unlock(&cache_lock);
1259 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1262 for (size_t i=0; i < ci->values_num; i++)
1263 add_response_info(sock, "%s\n", ci->values[i]);
1265 pthread_mutex_unlock(&cache_lock);
1266 return send_response(sock, RESP_OK, "updates pending\n");
1267 } /* }}} static int handle_request_pending */
1269 static int handle_request_forget(HANDLER_PROTO) /* {{{ */
1273 char *file, file_tmp[PATH_MAX];
1275 status = buffer_get_field(&buffer, &buffer_size, &file);
1277 return syntax_error(sock,cmd);
1279 get_abs_path(&file, file_tmp);
1280 if (!check_file_access(file, sock)) return 0;
1282 pthread_mutex_lock(&cache_lock);
1283 found = g_tree_remove(cache_tree, file);
1284 pthread_mutex_unlock(&cache_lock);
1288 if (!JOURNAL_REPLAY(sock))
1289 journal_write("forget", file);
1291 return send_response(sock, RESP_OK, "Gone!\n");
1294 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1298 } /* }}} static int handle_request_forget */
1300 static int handle_request_queue (HANDLER_PROTO) /* {{{ */
1304 pthread_mutex_lock(&cache_lock);
1306 ci = cache_queue_head;
1309 add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1313 pthread_mutex_unlock(&cache_lock);
1315 return send_response(sock, RESP_OK, "in queue.\n");
1316 } /* }}} int handle_request_queue */
1318 static int handle_request_update (HANDLER_PROTO) /* {{{ */
1320 char *file, file_tmp[PATH_MAX];
1323 char orig_buf[RRD_CMD_MAX];
1327 /* save it for the journal later */
1328 if (!JOURNAL_REPLAY(sock))
1329 strncpy(orig_buf, buffer, min(RRD_CMD_MAX,buffer_size));
1331 status = buffer_get_field (&buffer, &buffer_size, &file);
1333 return syntax_error(sock,cmd);
1335 pthread_mutex_lock(&stats_lock);
1336 stats_updates_received++;
1337 pthread_mutex_unlock(&stats_lock);
1339 get_abs_path(&file, file_tmp);
1340 if (!check_file_access(file, sock)) return 0;
1342 pthread_mutex_lock (&cache_lock);
1343 ci = g_tree_lookup (cache_tree, file);
1345 if (ci == NULL) /* {{{ */
1347 struct stat statbuf;
1350 /* don't hold the lock while we setup; stat(2) might block */
1351 pthread_mutex_unlock(&cache_lock);
1353 memset (&statbuf, 0, sizeof (statbuf));
1354 status = stat (file, &statbuf);
1357 RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1360 if (status == ENOENT)
1361 return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1363 return send_response(sock, RESP_ERR,
1364 "stat failed with error %i.\n", status);
1366 if (!S_ISREG (statbuf.st_mode))
1367 return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1369 if (access(file, R_OK|W_OK) != 0)
1370 return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1371 file, rrd_strerror(errno));
1373 ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1376 RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1378 return send_response(sock, RESP_ERR, "malloc failed.\n");
1380 memset (ci, 0, sizeof (cache_item_t));
1382 ci->file = strdup (file);
1383 if (ci->file == NULL)
1386 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1388 return send_response(sock, RESP_ERR, "strdup failed.\n");
1391 wipe_ci_values(ci, now);
1392 ci->flags = CI_FLAGS_IN_TREE;
1393 pthread_cond_init(&ci->flushed, NULL);
1395 pthread_mutex_lock(&cache_lock);
1397 /* another UPDATE might have added this entry in the meantime */
1398 tmp = g_tree_lookup (cache_tree, file);
1400 g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1403 free_cache_item (ci);
1407 /* state may have changed while we were unlocked */
1408 if (state == SHUTDOWN)
1411 assert (ci != NULL);
1413 /* don't re-write updates in replay mode */
1414 if (!JOURNAL_REPLAY(sock))
1415 journal_write("update", orig_buf);
1417 while (buffer_size > 0)
1423 status = buffer_get_field (&buffer, &buffer_size, &value);
1426 RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1430 /* make sure update time is always moving forward. We use double here since
1431 update does support subsecond precision for timestamps ... */
1432 stamp = strtod(value, &eostamp);
1433 if (eostamp == value || eostamp == NULL || *eostamp != ':')
1435 pthread_mutex_unlock(&cache_lock);
1436 return send_response(sock, RESP_ERR,
1437 "Cannot find timestamp in '%s'!\n", value);
1439 else if (stamp <= ci->last_update_stamp)
1441 pthread_mutex_unlock(&cache_lock);
1442 return send_response(sock, RESP_ERR,
1443 "illegal attempt to update using time %lf when last"
1444 " update time is %lf (minimum one second step)\n",
1445 stamp, ci->last_update_stamp);
1448 ci->last_update_stamp = stamp;
1450 if (!rrd_add_strdup_chunk(&ci->values, &ci->values_num, value,
1451 &ci->values_alloc, config_alloc_chunk))
1453 RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed.");
1460 if (((now - ci->last_flush_time) >= config_write_interval)
1461 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1462 && (ci->values_num > 0))
1464 enqueue_cache_item (ci, TAIL);
1467 pthread_mutex_unlock (&cache_lock);
1470 return send_response(sock, RESP_ERR, "No values updated.\n");
1472 return send_response(sock, RESP_OK,
1473 "errors, enqueued %i value(s).\n", values_num);
1478 } /* }}} int handle_request_update */
1480 static int handle_request_fetch (HANDLER_PROTO) /* {{{ */
1482 char *file, file_tmp[PATH_MAX];
1491 unsigned long ds_cnt;
1498 rrd_value_t *data_ptr;
1505 /* Read the arguments */
1508 status = buffer_get_field (&buffer, &buffer_size, &file);
1512 status = buffer_get_field (&buffer, &buffer_size, &cf);
1516 status = buffer_get_field (&buffer, &buffer_size, &start_str);
1524 status = buffer_get_field (&buffer, &buffer_size, &end_str);
1534 return (syntax_error(sock,cmd));
1536 get_abs_path(&file, file_tmp);
1537 if (!check_file_access(file, sock)) return 0;
1539 status = flush_file (file);
1540 if ((status != 0) && (status != ENOENT))
1541 return (send_response (sock, RESP_ERR,
1542 "flush_file (%s) failed with status %i.\n", file, status));
1544 t = time (NULL); /* "now" */
1546 /* Parse start time */
1547 if (start_str != NULL)
1554 value = strtol (start_str, &endptr, /* base = */ 0);
1555 if ((endptr == start_str) || (errno != 0))
1556 return (send_response(sock, RESP_ERR,
1557 "Cannot parse start time `%s': Only simple integers are allowed.\n",
1561 start_tm = (time_t) value;
1563 start_tm = (time_t) (t + value);
1567 start_tm = t - 86400;
1570 /* Parse end time */
1571 if (end_str != NULL)
1578 value = strtol (end_str, &endptr, /* base = */ 0);
1579 if ((endptr == end_str) || (errno != 0))
1580 return (send_response(sock, RESP_ERR,
1581 "Cannot parse end time `%s': Only simple integers are allowed.\n",
1585 end_tm = (time_t) value;
1587 end_tm = (time_t) (t + value);
1599 status = rrd_fetch_r (file, cf, &start_tm, &end_tm, &step,
1600 &ds_cnt, &ds_namv, &data);
1602 return (send_response(sock, RESP_ERR,
1603 "rrd_fetch_r failed: %s\n", rrd_get_error ()));
1605 add_response_info (sock, "FlushVersion: %lu\n", 1);
1606 add_response_info (sock, "Start: %lu\n", (unsigned long) start_tm);
1607 add_response_info (sock, "End: %lu\n", (unsigned long) end_tm);
1608 add_response_info (sock, "Step: %lu\n", step);
1609 add_response_info (sock, "DSCount: %lu\n", ds_cnt);
1611 #define SSTRCAT(buffer,str,buffer_fill) do { \
1612 size_t str_len = strlen (str); \
1613 if ((buffer_fill + str_len) > sizeof (buffer)) \
1614 str_len = sizeof (buffer) - buffer_fill; \
1615 if (str_len > 0) { \
1616 strncpy (buffer + buffer_fill, str, str_len); \
1617 buffer_fill += str_len; \
1618 assert (buffer_fill <= sizeof (buffer)); \
1619 if (buffer_fill == sizeof (buffer)) \
1620 buffer[buffer_fill - 1] = 0; \
1622 buffer[buffer_fill] = 0; \
1626 { /* Add list of DS names */
1628 size_t linebuf_fill;
1630 memset (linebuf, 0, sizeof (linebuf));
1632 for (i = 0; i < ds_cnt; i++)
1635 SSTRCAT (linebuf, " ", linebuf_fill);
1636 SSTRCAT (linebuf, ds_namv[i], linebuf_fill);
1637 rrd_freemem(ds_namv[i]);
1639 rrd_freemem(ds_namv);
1640 add_response_info (sock, "DSName: %s\n", linebuf);
1643 /* Add the actual data */
1646 for (t = start_tm + step; t <= end_tm; t += step)
1649 size_t linebuf_fill;
1652 memset (linebuf, 0, sizeof (linebuf));
1654 for (i = 0; i < ds_cnt; i++)
1656 snprintf (tmp, sizeof (tmp), " %0.10e", *data_ptr);
1657 tmp[sizeof (tmp) - 1] = 0;
1658 SSTRCAT (linebuf, tmp, linebuf_fill);
1663 add_response_info (sock, "%10lu:%s\n", (unsigned long) t, linebuf);
1667 return (send_response (sock, RESP_OK, "Success\n"));
1669 } /* }}} int handle_request_fetch */
1671 /* we came across a "WROTE" entry during journal replay.
1672 * throw away any values that we have accumulated for this file
1674 static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
1677 const char *file = buffer;
1679 pthread_mutex_lock(&cache_lock);
1681 ci = g_tree_lookup(cache_tree, file);
1684 pthread_mutex_unlock(&cache_lock);
1689 rrd_free_ptrs((void ***) &ci->values, &ci->values_num);
1691 wipe_ci_values(ci, now);
1692 remove_from_queue(ci);
1694 pthread_mutex_unlock(&cache_lock);
1696 } /* }}} int handle_request_wrote */
1698 static int handle_request_info (HANDLER_PROTO) /* {{{ */
1700 char *file, file_tmp[PATH_MAX];
1704 /* obtain filename */
1705 status = buffer_get_field(&buffer, &buffer_size, &file);
1707 return syntax_error(sock,cmd);
1708 /* get full pathname */
1709 get_abs_path(&file, file_tmp);
1710 if (!check_file_access(file, sock)) {
1711 return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1715 info = rrd_info_r(file);
1717 return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1719 for (rrd_info_t *data = info; data != NULL; data = data->next) {
1720 switch (data->type) {
1722 if (isnan(data->value.u_val))
1723 add_response_info(sock,"%s %d NaN\n",data->key, data->type);
1725 add_response_info(sock,"%s %d %0.10e\n", data->key, data->type, data->value.u_val);
1728 add_response_info(sock,"%s %d %lu\n", data->key, data->type, data->value.u_cnt);
1731 add_response_info(sock,"%s %d %d\n", data->key, data->type, data->value.u_int);
1734 add_response_info(sock,"%s %d %s\n", data->key, data->type, data->value.u_str);
1737 add_response_info(sock,"%s %d %lu\n", data->key, data->type, data->value.u_blo.size);
1742 rrd_info_free(info);
1744 return send_response(sock, RESP_OK, "Info for %s follows\n",file);
1745 } /* }}} static int handle_request_info */
1747 static int handle_request_first (HANDLER_PROTO) /* {{{ */
1749 char *i, *file, file_tmp[PATH_MAX];
1754 /* obtain filename */
1755 status = buffer_get_field(&buffer, &buffer_size, &file);
1757 return syntax_error(sock,cmd);
1758 /* get full pathname */
1759 get_abs_path(&file, file_tmp);
1760 if (!check_file_access(file, sock)) {
1761 return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1764 status = buffer_get_field(&buffer, &buffer_size, &i);
1766 return syntax_error(sock,cmd);
1769 return send_response(sock, RESP_ERR, "Invalid index specified (%d)\n", idx);
1774 t = rrd_first_r(file,idx);
1776 return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1778 return send_response(sock, RESP_OK, "%lu\n",(unsigned)t);
1779 } /* }}} static int handle_request_first */
1782 static int handle_request_last (HANDLER_PROTO) /* {{{ */
1784 char *file, file_tmp[PATH_MAX];
1786 time_t t, from_file, step;
1787 rrd_file_t * rrd_file;
1791 /* obtain filename */
1792 status = buffer_get_field(&buffer, &buffer_size, &file);
1794 return syntax_error(sock,cmd);
1795 /* get full pathname */
1796 get_abs_path(&file, file_tmp);
1797 if (!check_file_access(file, sock)) {
1798 return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1802 rrd_file = rrd_open(file,&rrd,RRD_READONLY);
1804 return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1806 from_file = rrd.live_head->last_up;
1807 step = rrd.stat_head->pdp_step;
1808 rrd_close(rrd_file);
1809 pthread_mutex_lock(&cache_lock);
1810 ci = g_tree_lookup(cache_tree, file);
1812 t = ci->last_update_stamp;
1815 pthread_mutex_unlock(&cache_lock);
1819 return send_response(sock, RESP_ERR, "Error: rrdcached: Invalid timestamp returned\n");
1821 return send_response(sock, RESP_OK, "%lu\n",(unsigned)t);
1822 } /* }}} static int handle_request_last */
1824 static int handle_request_create (HANDLER_PROTO) /* {{{ */
1826 char *file, file_tmp[PATH_MAX];
1831 unsigned long step = 300;
1832 time_t last_up = time(NULL)-10;
1833 int no_overwrite = opt_no_overwrite;
1836 /* obtain filename */
1837 status = buffer_get_field(&buffer, &buffer_size, &file);
1839 return syntax_error(sock,cmd);
1840 /* get full pathname */
1841 get_abs_path(&file, file_tmp);
1842 if (!check_file_access(file, sock)) {
1843 return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1845 RRDD_LOG(LOG_INFO, "rrdcreate request for %s",file);
1847 while ((status = buffer_get_field(&buffer, &buffer_size, &tok)) == 0 && tok) {
1848 if( ! strncmp(tok,"-b",2) ) {
1849 status = buffer_get_field(&buffer, &buffer_size, &tok );
1850 if (status != 0) return syntax_error(sock,cmd);
1851 last_up = (time_t) atol(tok);
1854 if( ! strncmp(tok,"-s",2) ) {
1855 status = buffer_get_field(&buffer, &buffer_size, &tok );
1856 if (status != 0) return syntax_error(sock,cmd);
1860 if( ! strncmp(tok,"-O",2) ) {
1864 if( ! strncmp(tok,"DS:",3) ) { av[ac++]=tok; continue; }
1865 if( ! strncmp(tok,"RRA:",4) ) { av[ac++]=tok; continue; }
1866 return syntax_error(sock,cmd);
1869 return send_response(sock, RESP_ERR, "The step size cannot be less than 1 second.\n");
1871 if (last_up < 3600 * 24 * 365 * 10) {
1872 return send_response(sock, RESP_ERR, "The first entry must be after 1980.\n");
1876 status = rrd_create_r2(file,step,last_up,no_overwrite,ac,(const char **)av);
1879 return send_response(sock, RESP_OK, "RRD created OK\n");
1881 return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1882 } /* }}} static int handle_request_create */
1884 /* start "BATCH" processing */
1885 static int batch_start (HANDLER_PROTO) /* {{{ */
1888 if (sock->batch_start)
1889 return send_response(sock, RESP_ERR, "Already in BATCH\n");
1891 status = send_response(sock, RESP_OK,
1892 "Go ahead. End with dot '.' on its own line.\n");
1893 sock->batch_start = time(NULL);
1894 sock->batch_cmd = 0;
1897 } /* }}} static int batch_start */
1899 /* finish "BATCH" processing and return results to the client */
1900 static int batch_done (HANDLER_PROTO) /* {{{ */
1902 assert(sock->batch_start);
1903 sock->batch_start = 0;
1904 sock->batch_cmd = 0;
1905 return send_response(sock, RESP_OK, "errors\n");
1906 } /* }}} static int batch_done */
1908 static int handle_request_quit (HANDLER_PROTO) /* {{{ */
1911 } /* }}} static int handle_request_quit */
1913 static command_t list_of_commands[] = { /* {{{ */
1916 handle_request_update,
1918 "UPDATE <filename> <values> [<values> ...]\n"
1920 "Adds the given file to the internal cache if it is not yet known and\n"
1921 "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1924 "Each <values> has the following form:\n"
1925 " <values> = <time>:<value>[:<value>[...]]\n"
1926 "See the rrdupdate(1) manpage for details.\n"
1930 handle_request_wrote,
1931 CMD_CONTEXT_JOURNAL,
1937 handle_request_flush,
1938 CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1939 "FLUSH <filename>\n"
1941 "Adds the given filename to the head of the update queue and returns\n"
1942 "after it has been dequeued.\n"
1946 handle_request_flushall,
1950 "Triggers writing of all pending updates. Returns immediately.\n"
1954 handle_request_pending,
1956 "PENDING <filename>\n"
1958 "Shows any 'pending' updates for a file, in order.\n"
1959 "The updates shown have not yet been written to the underlying RRD file.\n"
1963 handle_request_forget,
1965 "FORGET <filename>\n"
1967 "Removes the file completely from the cache.\n"
1968 "Any pending updates for the file will be lost.\n"
1972 handle_request_queue,
1976 "Shows all files in the output queue.\n"
1977 "The output is zero or more lines in the following format:\n"
1978 "(where <num_vals> is the number of values to be written)\n"
1980 "<num_vals> <filename>\n"
1984 handle_request_stats,
1988 "Returns some performance counters, see the rrdcached(1) manpage for\n"
1989 "a description of the values.\n"
1993 handle_request_help,
1995 "HELP [<command>]\n",
1996 NULL, /* special! */
2004 "The 'BATCH' command permits the client to initiate a bulk load\n"
2005 " of commands to rrdcached.\n"
2010 " server: 0 Go ahead. End with dot '.' on its own line.\n"
2011 " client: command #1\n"
2012 " client: command #2\n"
2013 " client: ... and so on\n"
2015 " server: 2 errors\n"
2016 " server: 7 message for command #7\n"
2017 " server: 9 message for command #9\n"
2019 "For more information, consult the rrdcached(1) documentation.\n"
2022 ".", /* BATCH terminator */
2030 handle_request_fetch,
2032 "FETCH <file> <CF> [<start> [<end>]]\n"
2034 "The 'FETCH' can be used by the client to retrieve values from an RRD file.\n"
2038 handle_request_info,
2040 "INFO <filename>\n",
2041 "The INFO command retrieves information about a specified RRD file.\n"
2042 "This is returned in standard rrdinfo format, a sequence of lines\n"
2043 "with the format <keyname> = <value>\n"
2044 "Note that this is the data as of the last update of the RRD file itself,\n"
2045 "not the last time data was received via rrdcached, so there may be pending\n"
2046 "updates in the queue. If this bothers you, then first run a FLUSH.\n"
2050 handle_request_first,
2052 "FIRST <filename> <rra index>\n",
2053 "The FIRST command retrieves the first data time for a specified RRA in\n"
2058 handle_request_last,
2060 "LAST <filename>\n",
2061 "The LAST command retrieves the last update time for a specified RRD file.\n"
2062 "Note that this is the time of the last update of the RRD file itself, not\n"
2063 "the last time data was received via rrdcached, so there may be pending\n"
2064 "updates in the queue. If this bothers you, then first run a FLUSH.\n"
2068 handle_request_create,
2069 CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
2070 "CREATE <filename> [-b start] [-s step] [-O] <DS definitions> <RRA definitions>\n",
2071 "The CREATE command will create an RRD file, overwriting any existing file\n"
2072 "unless the -O option is given or rrdcached was started with the -O option.\n"
2073 "The start parameter needs to be in seconds since 1/1/70 (AT-style syntax is\n"
2074 "not acceptable) and the step is in seconds (default is 300).\n"
2075 "The DS and RRA definitions are as for the 'rrdtool create' command.\n"
2079 handle_request_quit,
2080 CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
2083 "Disconnect from rrdcached.\n"
2085 }; /* }}} command_t list_of_commands[] */
2086 static size_t list_of_commands_len = sizeof (list_of_commands)
2087 / sizeof (list_of_commands[0]);
2089 static command_t *find_command(char *cmd)
2093 for (i = 0; i < list_of_commands_len; i++)
2094 if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
2095 return (&list_of_commands[i]);
2099 /* We currently use the index in the `list_of_commands' array as a bit position
2100 * in `listen_socket_t.permissions'. This member schould NEVER be accessed from
2101 * outside these functions so that switching to a more elegant storage method
2102 * is easily possible. */
2103 static ssize_t find_command_index (const char *cmd) /* {{{ */
2107 for (i = 0; i < list_of_commands_len; i++)
2108 if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
2109 return ((ssize_t) i);
2111 } /* }}} ssize_t find_command_index */
2113 static int socket_permission_check (listen_socket_t *sock, /* {{{ */
2118 if (JOURNAL_REPLAY(sock))
2124 if ((strcasecmp ("QUIT", cmd) == 0)
2125 || (strcasecmp ("HELP", cmd) == 0))
2127 else if (strcmp (".", cmd) == 0)
2130 i = find_command_index (cmd);
2135 if ((sock->permissions & (1 << i)) != 0)
2138 } /* }}} int socket_permission_check */
2140 static int socket_permission_add (listen_socket_t *sock, /* {{{ */
2145 i = find_command_index (cmd);
2150 sock->permissions |= (1 << i);
2152 } /* }}} int socket_permission_add */
2154 static void socket_permission_clear (listen_socket_t *sock) /* {{{ */
2156 sock->permissions = 0;
2157 } /* }}} socket_permission_clear */
2159 static void socket_permission_copy (listen_socket_t *dest, /* {{{ */
2160 listen_socket_t *src)
2162 dest->permissions = src->permissions;
2163 } /* }}} socket_permission_copy */
2165 static void socket_permission_set_all (listen_socket_t *sock) /* {{{ */
2169 sock->permissions = 0;
2170 for (i = 0; i < list_of_commands_len; i++)
2171 sock->permissions |= (1 << i);
2172 } /* }}} void socket_permission_set_all */
2174 /* check whether commands are received in the expected context */
2175 static int command_check_context(listen_socket_t *sock, command_t *cmd)
2177 if (JOURNAL_REPLAY(sock))
2178 return (cmd->context & CMD_CONTEXT_JOURNAL);
2179 else if (sock->batch_start)
2180 return (cmd->context & CMD_CONTEXT_BATCH);
2182 return (cmd->context & CMD_CONTEXT_CLIENT);
2188 static int handle_request_help (HANDLER_PROTO) /* {{{ */
2193 command_t *help = NULL;
2195 status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
2197 help = find_command(cmd_str);
2199 if (help && (help->syntax || help->help))
2201 char tmp[RRD_CMD_MAX];
2203 snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
2207 add_response_info(sock, "Usage: %s\n", help->syntax);
2210 add_response_info(sock, "%s\n", help->help);
2216 resp_txt = "Command overview\n";
2218 for (i = 0; i < list_of_commands_len; i++)
2220 if (list_of_commands[i].syntax == NULL)
2222 add_response_info (sock, "%s", list_of_commands[i].syntax);
2226 return send_response(sock, RESP_OK, resp_txt);
2227 } /* }}} int handle_request_help */
2229 static int handle_request (DISPATCH_PROTO) /* {{{ */
2231 char *buffer_ptr = buffer;
2232 char *cmd_str = NULL;
2233 command_t *cmd = NULL;
2236 assert (buffer[buffer_size - 1] == '\0');
2238 status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
2241 RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
2245 if (sock != NULL && sock->batch_start)
2248 cmd = find_command(cmd_str);
2250 return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
2252 if (!socket_permission_check (sock, cmd->cmd))
2253 return send_response(sock, RESP_ERR, "Permission denied.\n");
2255 if (!command_check_context(sock, cmd))
2256 return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
2258 return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
2259 } /* }}} int handle_request */
2261 static void journal_set_free (journal_set *js) /* {{{ */
2266 rrd_free_ptrs((void ***) &js->files, &js->files_num);
2269 } /* }}} journal_set_free */
2271 static void journal_set_remove (journal_set *js) /* {{{ */
2276 for (uint i=0; i < js->files_num; i++)
2278 RRDD_LOG(LOG_DEBUG, "removing old journal %s", js->files[i]);
2279 unlink(js->files[i]);
2281 } /* }}} journal_set_remove */
2283 /* close current journal file handle.
2284 * MUST hold journal_lock before calling */
2285 static void journal_close(void) /* {{{ */
2287 if (journal_fh != NULL)
2289 if (fclose(journal_fh) != 0)
2290 RRDD_LOG(LOG_ERR, "cannot close journal: %s", rrd_strerror(errno));
2295 } /* }}} journal_close */
2297 /* MUST hold journal_lock before calling */
2298 static void journal_new_file(void) /* {{{ */
2302 char new_file[PATH_MAX + 1];
2304 assert(journal_dir != NULL);
2305 assert(journal_cur != NULL);
2309 gettimeofday(&now, NULL);
2310 /* this format assures that the files sort in strcmp() order */
2311 snprintf(new_file, PATH_MAX, "%s/%s.%010d.%06d",
2312 journal_dir, JOURNAL_BASE, (int)now.tv_sec, (int)now.tv_usec);
2314 new_fd = open(new_file, O_WRONLY|O_CREAT|O_APPEND,
2315 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
2319 journal_fh = fdopen(new_fd, "a");
2320 if (journal_fh == NULL)
2323 journal_size = ftell(journal_fh);
2324 RRDD_LOG(LOG_DEBUG, "started new journal %s", new_file);
2326 /* record the file in the journal set */
2327 rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, new_file);
2333 "JOURNALING DISABLED: Error while trying to create %s : %s",
2334 new_file, rrd_strerror(errno));
2336 "JOURNALING DISABLED: All values will be flushed at shutdown");
2339 config_flush_at_shutdown = 1;
2341 } /* }}} journal_new_file */
2343 /* MUST NOT hold journal_lock before calling this */
2344 static void journal_rotate(void) /* {{{ */
2346 journal_set *old_js = NULL;
2348 if (journal_dir == NULL)
2351 RRDD_LOG(LOG_DEBUG, "rotating journals");
2353 pthread_mutex_lock(&stats_lock);
2354 ++stats_journal_rotate;
2355 pthread_mutex_unlock(&stats_lock);
2357 pthread_mutex_lock(&journal_lock);
2361 /* rotate the journal sets */
2362 old_js = journal_old;
2363 journal_old = journal_cur;
2364 journal_cur = calloc(1, sizeof(journal_set));
2366 if (journal_cur != NULL)
2369 RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2371 pthread_mutex_unlock(&journal_lock);
2373 journal_set_remove(old_js);
2374 journal_set_free (old_js);
2376 } /* }}} static void journal_rotate */
2378 /* MUST hold journal_lock when calling */
2379 static void journal_done(void) /* {{{ */
2381 if (journal_cur == NULL)
2386 if (config_flush_at_shutdown)
2388 RRDD_LOG(LOG_INFO, "removing journals");
2389 journal_set_remove(journal_old);
2390 journal_set_remove(journal_cur);
2394 RRDD_LOG(LOG_INFO, "expedited shutdown; "
2395 "journals will be used at next startup");
2398 journal_set_free(journal_cur);
2399 journal_set_free(journal_old);
2402 } /* }}} static void journal_done */
2404 static int journal_write(char *cmd, char *args) /* {{{ */
2408 if (journal_fh == NULL)
2411 pthread_mutex_lock(&journal_lock);
2412 chars = fprintf(journal_fh, "%s %s\n", cmd, args);
2413 journal_size += chars;
2415 if (journal_size > JOURNAL_MAX)
2418 pthread_mutex_unlock(&journal_lock);
2422 pthread_mutex_lock(&stats_lock);
2423 stats_journal_bytes += chars;
2424 pthread_mutex_unlock(&stats_lock);
2428 } /* }}} static int journal_write */
2430 static int journal_replay (const char *file) /* {{{ */
2436 char entry[RRD_CMD_MAX];
2439 if (file == NULL) return 0;
2442 char *reason = "unknown error";
2444 struct stat statbuf;
2446 memset(&statbuf, 0, sizeof(statbuf));
2447 if (stat(file, &statbuf) != 0)
2449 reason = "stat error";
2452 else if (!S_ISREG(statbuf.st_mode))
2454 reason = "not a regular file";
2457 if (statbuf.st_uid != daemon_uid)
2459 reason = "not owned by daemon user";
2462 if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
2464 reason = "must not be user/group writable";
2470 RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
2471 file, rrd_strerror(status), reason);
2476 fh = fopen(file, "r");
2479 if (errno != ENOENT)
2480 RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
2481 file, rrd_strerror(errno));
2485 RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
2494 if (fgets(entry, sizeof(entry), fh) == NULL)
2496 entry_len = strlen(entry);
2498 /* check \n termination in case journal writing crashed mid-line */
2501 else if (entry[entry_len - 1] != '\n')
2503 RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
2508 entry[entry_len - 1] = '\0';
2510 if (handle_request(NULL, now, entry, entry_len) == 0)
2518 RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
2519 entry_cnt, fail_cnt);
2521 return entry_cnt > 0 ? 1 : 0;
2522 } /* }}} static int journal_replay */
2524 static int journal_sort(const void *v1, const void *v2)
2526 char **jn1 = (char **) v1;
2527 char **jn2 = (char **) v2;
2529 return strcmp(*jn1,*jn2);
2532 static void journal_init(void) /* {{{ */
2534 int had_journal = 0;
2536 struct dirent *dent;
2537 char path[PATH_MAX+1];
2539 if (journal_dir == NULL) return;
2541 pthread_mutex_lock(&journal_lock);
2543 journal_cur = calloc(1, sizeof(journal_set));
2544 if (journal_cur == NULL)
2546 RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2550 RRDD_LOG(LOG_INFO, "checking for journal files");
2552 /* Handle old journal files during transition. This gives them the
2553 * correct sort order. TODO: remove after first release
2556 char old_path[PATH_MAX+1];
2557 snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".old" );
2558 snprintf(path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0000");
2559 rename(old_path, path);
2561 snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE );
2562 snprintf(path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0001");
2563 rename(old_path, path);
2566 dir = opendir(journal_dir);
2568 RRDD_LOG(LOG_CRIT, "journal_init: opendir(%s) failed\n", journal_dir);
2571 while ((dent = readdir(dir)) != NULL)
2573 /* looks like a journal file? */
2574 if (strncmp(dent->d_name, JOURNAL_BASE, strlen(JOURNAL_BASE)))
2577 snprintf(path, PATH_MAX, "%s/%s", journal_dir, dent->d_name);
2579 if (!rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, path))
2581 RRDD_LOG(LOG_CRIT, "journal_init: cannot add journal file %s!",
2588 qsort(journal_cur->files, journal_cur->files_num,
2589 sizeof(journal_cur->files[0]), journal_sort);
2591 for (uint i=0; i < journal_cur->files_num; i++)
2592 had_journal += journal_replay(journal_cur->files[i]);
2596 /* it must have been a crash. start a flush */
2597 if (had_journal && config_flush_at_shutdown)
2598 flush_old_values(-1);
2600 pthread_mutex_unlock(&journal_lock);
2602 RRDD_LOG(LOG_INFO, "journal processing complete");
2604 } /* }}} static void journal_init */
2606 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
2608 assert(sock != NULL);
2610 free(sock->rbuf); sock->rbuf = NULL;
2611 free(sock->wbuf); sock->wbuf = NULL;
2613 } /* }}} void free_listen_socket */
2615 static void close_connection(listen_socket_t *sock) /* {{{ */
2623 free_listen_socket(sock);
2625 } /* }}} void close_connection */
2627 static void *connection_thread_main (void *args) /* {{{ */
2629 listen_socket_t *sock;
2632 sock = (listen_socket_t *) args;
2635 /* init read buffers */
2636 sock->next_read = sock->next_cmd = 0;
2637 sock->rbuf = malloc(RBUF_SIZE);
2638 if (sock->rbuf == NULL)
2640 RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
2641 close_connection(sock);
2645 pthread_mutex_lock (&connection_threads_lock);
2647 /* LIBWRAP does not support multiple threads! By putting this code
2648 inside pthread_mutex_lock we do not have to worry about request_info
2649 getting overwritten by another thread.
2651 struct request_info req;
2652 request_init(&req, RQ_DAEMON, "rrdcached\0", RQ_FILE, fd, NULL );
2654 if(!hosts_access(&req)) {
2655 RRDD_LOG(LOG_INFO, "refused connection from %s", eval_client(&req));
2656 pthread_mutex_unlock (&connection_threads_lock);
2657 close_connection(sock);
2660 #endif /* HAVE_LIBWRAP */
2661 connection_threads_num++;
2662 pthread_mutex_unlock (&connection_threads_lock);
2664 while (state == RUNNING)
2671 struct pollfd pollfd;
2675 pollfd.events = POLLIN | POLLPRI;
2678 status = poll (&pollfd, 1, /* timeout = */ 500);
2679 if (state != RUNNING)
2681 else if (status == 0) /* timeout */
2683 else if (status < 0) /* error */
2686 if (status != EINTR)
2687 RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2691 if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2693 else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2695 RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2696 "poll(2) returned something unexpected: %#04hx",
2701 rbytes = read(fd, sock->rbuf + sock->next_read,
2702 RBUF_SIZE - sock->next_read);
2705 RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2708 else if (rbytes == 0)
2711 sock->next_read += rbytes;
2713 if (sock->batch_start)
2714 now = sock->batch_start;
2718 while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2720 status = handle_request (sock, now, cmd, cmd_len+1);
2727 close_connection(sock);
2729 /* Remove this thread from the connection threads list */
2730 pthread_mutex_lock (&connection_threads_lock);
2731 connection_threads_num--;
2732 if (connection_threads_num <= 0)
2733 pthread_cond_broadcast(&connection_threads_done);
2734 pthread_mutex_unlock (&connection_threads_lock);
2737 } /* }}} void *connection_thread_main */
2739 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2742 struct sockaddr_un sa;
2743 listen_socket_t *temp;
2746 char *path_copy, *dir;
2749 if (strncmp(path, "unix:", strlen("unix:")) == 0)
2750 path += strlen("unix:");
2752 /* dirname may modify its argument */
2753 path_copy = strdup(path);
2754 if (path_copy == NULL)
2756 fprintf(stderr, "rrdcached: strdup(): %s\n",
2757 rrd_strerror(errno));
2761 dir = dirname(path_copy);
2762 if (rrd_mkdir_p(dir, 0777) != 0)
2764 fprintf(stderr, "Failed to create socket directory '%s': %s\n",
2765 dir, rrd_strerror(errno));
2771 temp = (listen_socket_t *) rrd_realloc (listen_fds,
2772 sizeof (listen_fds[0]) * (listen_fds_num + 1));
2775 fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2779 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2781 fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2784 fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2785 rrd_strerror(errno));
2789 memset (&sa, 0, sizeof (sa));
2790 sa.sun_family = AF_UNIX;
2791 strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2793 /* if we've gotten this far, we own the pid file. any daemon started
2794 * with the same args must not be alive. therefore, ensure that we can
2795 * create the socket...
2799 status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2802 fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2803 path, rrd_strerror(errno));
2808 /* tweak the sockets group ownership */
2809 if (sock->socket_group != (gid_t)-1)
2811 if ( (chown(path, getuid(), sock->socket_group) != 0) ||
2812 (chmod(path, (S_IRUSR|S_IWUSR|S_IXUSR | S_IRGRP|S_IWGRP)) != 0) )
2814 fprintf(stderr, "rrdcached: failed to set socket group permissions (%s)\n", strerror(errno));
2818 if (sock->socket_permissions != (mode_t)-1)
2820 if (chmod(path, sock->socket_permissions) != 0)
2821 fprintf(stderr, "rrdcached: failed to set socket file permissions (%o): %s\n",
2822 (unsigned int)sock->socket_permissions, strerror(errno));
2825 status = listen (fd, /* backlog = */ 10);
2828 fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2829 path, rrd_strerror(errno));
2835 listen_fds[listen_fds_num].fd = fd;
2836 listen_fds[listen_fds_num].family = PF_UNIX;
2837 strncpy(listen_fds[listen_fds_num].addr, path,
2838 sizeof (listen_fds[listen_fds_num].addr) - 1);
2842 } /* }}} int open_listen_socket_unix */
2844 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2846 struct addrinfo ai_hints;
2847 struct addrinfo *ai_res;
2848 struct addrinfo *ai_ptr;
2849 char addr_copy[NI_MAXHOST];
2854 strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
2855 addr_copy[sizeof (addr_copy) - 1] = 0;
2858 memset (&ai_hints, 0, sizeof (ai_hints));
2859 ai_hints.ai_flags = 0;
2860 #ifdef AI_ADDRCONFIG
2861 ai_hints.ai_flags |= AI_ADDRCONFIG;
2863 ai_hints.ai_family = AF_UNSPEC;
2864 ai_hints.ai_socktype = SOCK_STREAM;
2867 if (*addr == '[') /* IPv6+port format */
2869 /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2872 port = strchr (addr, ']');
2875 fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
2883 else if (*port == 0)
2887 fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
2890 } /* if (*addr == '[') */
2893 port = rindex(addr, ':');
2901 status = getaddrinfo (addr,
2902 port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2903 &ai_hints, &ai_res);
2906 fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
2907 addr, gai_strerror (status));
2911 for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2914 listen_socket_t *temp;
2917 temp = (listen_socket_t *) rrd_realloc (listen_fds,
2918 sizeof (listen_fds[0]) * (listen_fds_num + 1));
2922 "rrdcached: open_listen_socket_network: realloc failed.\n");
2926 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2928 fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2931 fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
2932 rrd_strerror(errno));
2936 setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2938 status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2941 fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2942 sock->addr, rrd_strerror(errno));
2947 status = listen (fd, /* backlog = */ 10);
2950 fprintf (stderr, "rrdcached: listen(%s) failed: %s\n.",
2951 sock->addr, rrd_strerror(errno));
2953 freeaddrinfo(ai_res);
2957 listen_fds[listen_fds_num].fd = fd;
2958 listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2960 } /* for (ai_ptr) */
2962 freeaddrinfo(ai_res);
2964 } /* }}} static int open_listen_socket_network */
2966 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2968 assert(sock != NULL);
2969 assert(sock->addr != NULL);
2971 if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2972 || sock->addr[0] == '/')
2973 return (open_listen_socket_unix(sock));
2975 return (open_listen_socket_network(sock));
2976 } /* }}} int open_listen_socket */
2978 #ifndef SD_LISTEN_FDS_START
2979 # define SD_LISTEN_FDS_START 3
2982 * returns number of descriptors passed from systemd
2984 static int open_listen_sockets_systemd(void) /* {{{ */
2986 listen_socket_t *temp;
2987 struct sockaddr_un sa;
2993 /* check if it for us */
2994 env = getenv("LISTEN_PID");
2998 n = strtoul(env, NULL, 10);
2999 if (!n || n == ULONG_MAX || (pid_t)n != getpid())
3002 /* get the number of passed descriptors */
3003 env = getenv("LISTEN_FDS");
3007 n = strtoul(env, NULL, 10);
3008 if (!n || n == ULONG_MAX)
3011 temp = (listen_socket_t *) rrd_realloc (listen_fds,
3012 sizeof (listen_fds[0]) * (listen_fds_num + n));
3015 fprintf (stderr, "rrdcached: open_listen_socket_systemd: realloc failed.\n");
3020 for (unsigned int i = 0; i < n; i++)
3022 sd_fd = SD_LISTEN_FDS_START + i;
3026 if (getsockname(sd_fd, &sa, &l) < 0)
3028 fprintf(stderr, "open_listen_sockets_systemd: problem getting fd %d: %s\n", sd_fd, rrd_strerror (errno));
3032 listen_fds[listen_fds_num].fd = sd_fd;
3033 listen_fds[listen_fds_num].family = sa.sun_family;
3038 } /* }}} open_listen_sockets_systemd */
3040 static void open_listen_sockets_traditional(void) /* {{{ */
3042 if (config_listen_address_list_len > 0)
3044 for (size_t i = 0; i < config_listen_address_list_len; i++)
3045 open_listen_socket (config_listen_address_list[i]);
3047 rrd_free_ptrs((void ***) &config_listen_address_list,
3048 &config_listen_address_list_len);
3052 strncpy(default_socket.addr, RRDCACHED_DEFAULT_ADDRESS,
3053 sizeof(default_socket.addr) - 1);
3054 default_socket.addr[sizeof(default_socket.addr) - 1] = '\0';
3056 if (default_socket.permissions == 0)
3057 socket_permission_set_all (&default_socket);
3059 open_listen_socket (&default_socket);
3061 } /* }}} open_list_sockets_traditional */
3063 static int close_listen_sockets (void) /* {{{ */
3067 for (i = 0; i < listen_fds_num; i++)
3069 close (listen_fds[i].fd);
3071 if (listen_fds[i].family == PF_UNIX)
3072 unlink(listen_fds[i].addr);
3080 } /* }}} int close_listen_sockets */
3082 static void *listen_thread_main (void UNUSED(*args)) /* {{{ */
3084 struct pollfd *pollfds;
3089 if (listen_fds_num < 1)
3091 RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
3095 pollfds_num = listen_fds_num;
3096 pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
3097 if (pollfds == NULL)
3099 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
3102 memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
3104 RRDD_LOG(LOG_INFO, "listening for connections");
3106 while (state == RUNNING)
3108 for (i = 0; i < pollfds_num; i++)
3110 pollfds[i].fd = listen_fds[i].fd;
3111 pollfds[i].events = POLLIN | POLLPRI;
3112 pollfds[i].revents = 0;
3115 status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
3116 if (state != RUNNING)
3118 else if (status == 0) /* timeout */
3120 else if (status < 0) /* error */
3123 if (status != EINTR)
3125 RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
3130 for (i = 0; i < pollfds_num; i++)
3132 listen_socket_t *client_sock;
3133 struct sockaddr_storage client_sa;
3134 socklen_t client_sa_size;
3136 pthread_attr_t attr;
3138 if (pollfds[i].revents == 0)
3141 if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
3143 RRDD_LOG (LOG_ERR, "listen_thread_main: "
3144 "poll(2) returned something unexpected for listen FD #%i.",
3149 client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
3150 if (client_sock == NULL)
3152 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
3155 memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
3157 client_sa_size = sizeof (client_sa);
3158 client_sock->fd = accept (pollfds[i].fd,
3159 (struct sockaddr *) &client_sa, &client_sa_size);
3160 if (client_sock->fd < 0)
3162 RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
3167 pthread_attr_init (&attr);
3168 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
3170 status = pthread_create (&tid, &attr, connection_thread_main,
3174 RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
3175 close_connection(client_sock);
3178 } /* for (pollfds_num) */
3179 } /* while (state == RUNNING) */
3181 RRDD_LOG(LOG_INFO, "starting shutdown");
3183 close_listen_sockets ();
3185 pthread_mutex_lock (&connection_threads_lock);
3186 while (connection_threads_num > 0)
3187 pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
3188 pthread_mutex_unlock (&connection_threads_lock);
3193 } /* }}} void *listen_thread_main */
3195 static int daemonize (void) /* {{{ */
3200 daemon_uid = geteuid();
3202 pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
3204 pid_fd = check_pidfile();
3208 /* gather sockets passed from systemd;
3209 * if none, open all the listen sockets from config or default */
3211 if (!(open_listen_sockets_systemd() > 0))
3212 open_listen_sockets_traditional();
3214 if (listen_fds_num < 1)
3216 fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
3220 if (!stay_foreground)
3227 fprintf (stderr, "daemonize: fork(2) failed.\n");
3233 /* Become session leader */
3236 /* Open the first three file descriptors to /dev/null */
3241 open ("/dev/null", O_RDWR);
3242 if (dup(0) == -1 || dup(0) == -1){
3243 RRDD_LOG (LOG_ERR, "faild to run dup.\n");
3245 } /* if (!stay_foreground) */
3247 /* Change into the /tmp directory. */
3248 base_dir = (config_base_dir != NULL)
3252 if (chdir (base_dir) != 0)
3254 fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
3258 install_signal_handlers();
3260 openlog ("rrdcached", LOG_PID, LOG_DAEMON);
3261 RRDD_LOG(LOG_INFO, "starting up");
3263 cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
3264 (GDestroyNotify) free_cache_item);
3265 if (cache_tree == NULL)
3267 RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
3271 return write_pidfile (pid_fd);
3276 } /* }}} int daemonize */
3278 static int cleanup (void) /* {{{ */
3280 pthread_cond_broadcast (&flush_cond);
3281 pthread_join (flush_thread, NULL);
3283 pthread_cond_broadcast (&queue_cond);
3284 for (int i = 0; i < config_queue_threads; i++)
3285 pthread_join (queue_threads[i], NULL);
3287 if (config_flush_at_shutdown)
3289 assert(cache_queue_head == NULL);
3290 RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
3293 free(queue_threads);
3294 free(config_base_dir);
3296 pthread_mutex_lock(&cache_lock);
3297 g_tree_destroy(cache_tree);
3299 pthread_mutex_lock(&journal_lock);
3302 RRDD_LOG(LOG_INFO, "goodbye");
3306 free(config_pid_file);
3309 } /* }}} int cleanup */
3311 static int read_options (int argc, char **argv) /* {{{ */
3316 socket_permission_clear (&default_socket);
3318 default_socket.socket_group = (gid_t)-1;
3319 default_socket.socket_permissions = (mode_t)-1;
3321 while ((option = getopt(argc, argv, "Ogl:s:m:P:f:w:z:t:Bb:p:Fj:a:h?")) != -1)
3326 opt_no_overwrite = 1;
3335 listen_socket_t *new;
3337 new = malloc(sizeof(listen_socket_t));
3340 fprintf(stderr, "read_options: malloc failed.\n");
3343 memset(new, 0, sizeof(listen_socket_t));
3345 strncpy(new->addr, optarg, sizeof(new->addr)-1);
3347 /* Add permissions to the socket {{{ */
3348 if (default_socket.permissions != 0)
3350 socket_permission_copy (new, &default_socket);
3352 else /* if (default_socket.permissions == 0) */
3354 /* Add permission for ALL commands to the socket. */
3355 socket_permission_set_all (new);
3357 /* }}} Done adding permissions. */
3359 new->socket_group = default_socket.socket_group;
3360 new->socket_permissions = default_socket.socket_permissions;
3362 if (!rrd_add_ptr((void ***)&config_listen_address_list,
3363 &config_listen_address_list_len, new))
3365 fprintf(stderr, "read_options: rrd_add_ptr failed.\n");
3371 /* set socket group permissions */
3377 group_gid = strtoul(optarg, NULL, 10);
3378 if (errno != EINVAL && group_gid>0)
3380 /* we were passed a number */
3381 grp = getgrgid(group_gid);
3385 grp = getgrnam(optarg);
3390 default_socket.socket_group = grp->gr_gid;
3394 /* no idea what the user wanted... */
3395 fprintf (stderr, "read_options: couldn't map \"%s\" to a group, Sorry\n", optarg);
3401 /* set socket file permissions */
3405 char *endptr = NULL;
3407 tmp = strtol (optarg, &endptr, 8);
3408 if ((endptr == optarg) || (! endptr) || (*endptr != '\0')
3409 || (tmp > 07777) || (tmp < 0)) {
3410 fprintf (stderr, "read_options: Invalid file mode \"%s\".\n",
3415 default_socket.socket_permissions = (mode_t)tmp;
3426 socket_permission_clear (&default_socket);
3428 optcopy = strdup (optarg);
3431 while ((ptr = strtok_r (dummy, ", ", &saveptr)) != NULL)
3434 status = socket_permission_add (&default_socket, ptr);
3437 fprintf (stderr, "read_options: Adding permission \"%s\" to "
3438 "socket failed. Most likely, this permission doesn't "
3439 "exist. Check your command line.\n", ptr);
3452 temp = atoi (optarg);
3454 config_flush_interval = temp;
3457 fprintf (stderr, "Invalid flush interval: %s\n", optarg);
3467 temp = atoi (optarg);
3469 config_write_interval = temp;
3472 fprintf (stderr, "Invalid write interval: %s\n", optarg);
3482 temp = atoi(optarg);
3484 config_write_jitter = temp;
3487 fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
3497 threads = atoi(optarg);
3499 config_queue_threads = threads;
3502 fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
3509 config_write_base_only = 1;
3515 char base_realpath[PATH_MAX];
3517 if (config_base_dir != NULL)
3518 free (config_base_dir);
3519 config_base_dir = strdup (optarg);
3520 if (config_base_dir == NULL)
3522 fprintf (stderr, "read_options: strdup failed.\n");
3526 if (rrd_mkdir_p (config_base_dir, 0777) != 0)
3528 fprintf (stderr, "Failed to create base directory '%s': %s\n",
3529 config_base_dir, rrd_strerror (errno));
3533 /* make sure that the base directory is not resolved via
3534 * symbolic links. this makes some performance-enhancing
3535 * assumptions possible (we don't have to resolve paths
3536 * that start with a "/")
3538 if (realpath(config_base_dir, base_realpath) == NULL)
3540 fprintf (stderr, "Failed to canonicalize the base directory '%s': "
3541 "%s\n", config_base_dir, rrd_strerror(errno));
3545 len = strlen (config_base_dir);
3546 while ((len > 0) && (config_base_dir[len - 1] == '/'))
3548 config_base_dir[len - 1] = 0;
3554 fprintf (stderr, "Invalid base directory: %s\n", optarg);
3558 _config_base_dir_len = len;
3560 len = strlen (base_realpath);
3561 while ((len > 0) && (base_realpath[len - 1] == '/'))
3563 base_realpath[len - 1] = '\0';
3567 if (strncmp(config_base_dir,
3568 base_realpath, sizeof(base_realpath)) != 0)
3571 "Base directory (-b) resolved via file system links!\n"
3572 "Please consult rrdcached '-b' documentation!\n"
3573 "Consider specifying the real directory (%s)\n",
3582 if (config_pid_file != NULL)
3583 free (config_pid_file);
3584 config_pid_file = strdup (optarg);
3585 if (config_pid_file == NULL)
3587 fprintf (stderr, "read_options: strdup failed.\n");
3594 config_flush_at_shutdown = 1;
3599 char journal_dir_actual[PATH_MAX];
3600 journal_dir = realpath((const char *)optarg, journal_dir_actual);
3603 // if we were able to properly resolve the path, lets have a copy
3604 // for use outside this block.
3605 journal_dir = strdup(journal_dir);
3606 status = rrd_mkdir_p(journal_dir, 0777);
3609 fprintf(stderr, "Failed to create journal directory '%s': %s\n",
3610 journal_dir, rrd_strerror(errno));
3613 if (access(journal_dir, R_OK|W_OK|X_OK) != 0)
3615 fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
3616 errno ? rrd_strerror(errno) : "");
3620 fprintf(stderr, "Unable to resolve journal path (%s,%s)\n", optarg,
3621 errno ? rrd_strerror(errno) : "");
3629 int temp = atoi(optarg);
3631 config_alloc_chunk = temp;
3634 fprintf(stderr, "Invalid allocation size: %s\n", optarg);
3642 printf ("RRDCacheD %s\n"
3643 "Copyright (C) 2008,2009 Florian octo Forster and Kevin Brintnall\n"
3645 "Usage: rrdcached [options]\n"
3647 "Valid options are:\n"
3648 " -l <address> Socket address to listen to.\n"
3649 " Default: "RRDCACHED_DEFAULT_ADDRESS"\n"
3650 " -P <perms> Sets the permissions to assign to all following "
3652 " -w <seconds> Interval in which to write data.\n"
3653 " -z <delay> Delay writes up to <delay> seconds to spread load\n"
3654 " -t <threads> Number of write threads.\n"
3655 " -f <seconds> Interval in which to flush dead data.\n"
3656 " -p <file> Location of the PID-file.\n"
3657 " -b <dir> Base directory to change to.\n"
3658 " -B Restrict file access to paths within -b <dir>\n"
3659 " -g Do not fork and run in the foreground.\n"
3660 " -j <dir> Directory in which to create the journal files.\n"
3661 " -F Always flush all updates at shutdown\n"
3662 " -s <id|name> Group owner of all following UNIX sockets\n"
3663 " (the socket will also have read/write permissions "
3665 " -m <mode> File permissions (octal) of all following UNIX "
3667 " -a <size> Memory allocation chunk size. Default is 1.\n"
3668 " -O Do not allow CREATE commands to overwrite existing\n"
3669 " files, even if asked to.\n"
3671 "For more information and a detailed description of all options "
3673 "to the rrdcached(1) manual page.\n",
3680 } /* switch (option) */
3681 } /* while (getopt) */
3683 /* advise the user when values are not sane */
3684 if (config_flush_interval < 2 * config_write_interval)
3685 fprintf(stderr, "WARNING: flush interval (-f) should be at least"
3686 " 2x write interval (-w) !\n");
3687 if (config_write_jitter > config_write_interval)
3688 fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
3689 " write interval (-w) !\n");
3691 if (config_write_base_only && config_base_dir == NULL)
3692 fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
3693 " Consult the rrdcached documentation\n");
3695 if (journal_dir == NULL)
3696 config_flush_at_shutdown = 1;
3699 } /* }}} int read_options */
3701 int main (int argc, char **argv)
3705 status = read_options (argc, argv);
3713 status = daemonize ();
3716 fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
3722 /* start the queue threads */
3723 queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
3724 if (queue_threads == NULL)
3726 RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
3730 for (int i = 0; i < config_queue_threads; i++)
3732 memset (&queue_threads[i], 0, sizeof (*queue_threads));
3733 status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
3736 RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
3742 /* start the flush thread */
3743 memset(&flush_thread, 0, sizeof(flush_thread));
3744 status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
3747 RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
3752 listen_thread_main (NULL);
3759 * vim: set sw=2 sts=2 ts=8 et fdm=marker :