2 * RRDTool - src/rrd_daemon.c
3 * Copyright (C) 2008-2010 Florian octo Forster
4 * Copyright (C) 2008,2009 Kevin Brintnall
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; only version 2 of the License is applicable.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
20 * Florian octo Forster <octo at verplant.org>
21 * kevin brintnall <kbrint@rufus.net>
26 * First tell the compiler to stick to the C99 and POSIX standards as close as
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
46 #define _XOPEN_SOURCE 500
63 * Now for some includes..
68 #include "rrd_client.h"
80 #include <sys/socket.h>
88 #include <sys/types.h>
100 #include <sys/time.h>
107 #endif /* HAVE_LIBWRAP */
109 #include <glib-2.0/glib.h>
112 #define RRDD_LOG(severity, ...) \
114 if (stay_foreground) { \
115 fprintf(stderr, __VA_ARGS__); \
116 fprintf(stderr, "\n"); } \
117 syslog ((severity), __VA_ARGS__); \
123 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
125 struct listen_socket_s
128 char addr[PATH_MAX + 1];
131 /* state for BATCH processing */
143 uint32_t permissions;
146 mode_t socket_permissions;
148 typedef struct listen_socket_s listen_socket_t;
151 typedef struct command_s command_t;
152 /* note: guard against "unused" warnings in the handlers */
153 #define DISPATCH_PROTO listen_socket_t UNUSED(*sock),\
155 char UNUSED(*buffer),\
156 size_t UNUSED(buffer_size)
158 #define HANDLER_PROTO command_t UNUSED(*cmd),\
163 int (*handler)(HANDLER_PROTO);
165 char context; /* where we expect to see it */
166 #define CMD_CONTEXT_CLIENT (1<<0)
167 #define CMD_CONTEXT_BATCH (1<<1)
168 #define CMD_CONTEXT_JOURNAL (1<<2)
169 #define CMD_CONTEXT_ANY (0x7f)
176 typedef struct cache_item_s cache_item_t;
181 size_t values_num; /* number of valid pointers */
182 size_t values_alloc; /* number of allocated pointers */
183 time_t last_flush_time;
184 double last_update_stamp;
185 #define CI_FLAGS_IN_TREE (1<<0)
186 #define CI_FLAGS_IN_QUEUE (1<<1)
188 pthread_cond_t flushed;
193 struct callback_flush_data_s
200 typedef struct callback_flush_data_s callback_flush_data_t;
207 typedef enum queue_side_e queue_side_t;
209 /* describe a set of journal files */
215 #define RBUF_SIZE (RRD_CMD_MAX*2)
/* File-scope daemon state. */
/* Non-zero makes RRDD_LOG echo each message to stderr in addition to syslog. */
220 static int stay_foreground = 0;
221 static uid_t daemon_uid;
/* Listening sockets and their count. */
223 static listen_socket_t *listen_fds = NULL;
224 static size_t listen_fds_num = 0;
226 static listen_socket_t default_socket;
/* Daemon life-cycle states; the worker loops below test `state' against them. */
229 RUNNING, /* normal operation */
230 FLUSHING, /* flushing remaining values */
231 SHUTDOWN /* shutting down */
/* Queue (writer) threads and the condition signalled when work is queued. */
234 static pthread_t *queue_threads;
235 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
236 static int config_queue_threads = 4;
/* Flush thread and the condition it sleeps on between flush rounds. */
238 static pthread_t flush_thread;
239 static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;
/* Bookkeeping for connection threads. */
241 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
242 static pthread_cond_t connection_threads_done = PTHREAD_COND_INITIALIZER;
243 static int connection_threads_num = 0;
/* Update cache: a GLib tree keyed by file name plus a doubly linked write
 * queue (head/tail). The functions in this file touch both under cache_lock. */
246 static GTree *cache_tree = NULL;
247 static cache_item_t *cache_queue_head = NULL;
248 static cache_item_t *cache_queue_tail = NULL;
249 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
/* Tunables with their defaults (presumably overridden by options - confirm). */
251 static int config_write_interval = 300;
252 static int config_write_jitter = 0;
253 static int config_flush_interval = 3600;
254 static int config_flush_at_shutdown = 0;
255 static char *config_pid_file = NULL;
256 static char *config_base_dir = NULL;
257 static size_t _config_base_dir_len = 0;
258 static int config_write_base_only = 0;
259 static size_t config_alloc_chunk = 1;
261 static listen_socket_t **config_listen_address_list = NULL;
262 static size_t config_listen_address_list_len = 0;
/* Statistics counters; every access visible in this file holds stats_lock. */
264 static uint64_t stats_queue_length = 0;
265 static uint64_t stats_updates_received = 0;
266 static uint64_t stats_flush_received = 0;
267 static uint64_t stats_updates_written = 0;
268 static uint64_t stats_data_sets_written = 0;
269 static uint64_t stats_journal_bytes = 0;
270 static uint64_t stats_journal_rotate = 0;
271 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
273 static int opt_no_overwrite = 0; /* default for the daemon */
275 /* Journaled updates */
/* JOURNAL_REPLAY(s) is true when the socket pointer is NULL; handlers use it
 * to skip responses and to avoid re-journaling commands during replay. */
276 #define JOURNAL_REPLAY(s) ((s) == NULL)
277 #define JOURNAL_BASE "rrd.journal"
278 static journal_set *journal_cur = NULL;
279 static journal_set *journal_old = NULL;
280 static char *journal_dir = NULL;
281 static FILE *journal_fh = NULL; /* current journal file handle */
282 static long journal_size = 0; /* current journal size */
/* 1 GiB; presumably the size at which the journal is rotated - confirm. */
283 #define JOURNAL_MAX (1 * 1024 * 1024 * 1024)
284 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
285 static int journal_write(char *cmd, char *args);
286 static void journal_done(void);
287 static void journal_rotate(void);
289 /* prototypes for forward references */
290 static int handle_request_help (HANDLER_PROTO);
/* Common signal processing: logs "caught SIG<sig>" and wakes every thread
 * blocked on flush_cond or queue_cond.
 * NOTE(review): RRDD_LOG expands to fprintf()/syslog(); if this runs in
 * signal-handler context those calls are not async-signal-safe - confirm. */
295 static void sig_common (const char *sig) /* {{{ */
297 RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
299 pthread_cond_broadcast(&flush_cond);
300 pthread_cond_broadcast(&queue_cond);
301 } /* }}} void sig_common */
/* Handler registered for SIGINT by install_signal_handlers; the signal number
 * argument is unused. */
303 static void sig_int_handler (int UNUSED(s)) /* {{{ */
306 } /* }}} void sig_int_handler */
/* Handler registered for SIGTERM by install_signal_handlers; the signal number
 * argument is unused. */
308 static void sig_term_handler (int UNUSED(s)) /* {{{ */
311 } /* }}} void sig_term_handler */
/* Handler registered for SIGUSR1: turns flush-at-shutdown on. */
313 static void sig_usr1_handler (int UNUSED(s)) /* {{{ */
315 config_flush_at_shutdown = 1;
317 } /* }}} void sig_usr1_handler */
/* Handler registered for SIGUSR2: turns flush-at-shutdown off. */
319 static void sig_usr2_handler (int UNUSED(s)) /* {{{ */
321 config_flush_at_shutdown = 0;
323 } /* }}} void sig_usr2_handler */
325 static void install_signal_handlers(void) /* {{{ */
327 /* These structures are static, because `sigaction' behaves weird if the are
329 static struct sigaction sa_int;
330 static struct sigaction sa_term;
331 static struct sigaction sa_pipe;
332 static struct sigaction sa_usr1;
333 static struct sigaction sa_usr2;
335 /* Install signal handlers */
336 memset (&sa_int, 0, sizeof (sa_int));
337 sa_int.sa_handler = sig_int_handler;
338 sigaction (SIGINT, &sa_int, NULL);
340 memset (&sa_term, 0, sizeof (sa_term));
341 sa_term.sa_handler = sig_term_handler;
342 sigaction (SIGTERM, &sa_term, NULL);
344 memset (&sa_pipe, 0, sizeof (sa_pipe));
345 sa_pipe.sa_handler = SIG_IGN;
346 sigaction (SIGPIPE, &sa_pipe, NULL);
348 memset (&sa_pipe, 0, sizeof (sa_usr1));
349 sa_usr1.sa_handler = sig_usr1_handler;
350 sigaction (SIGUSR1, &sa_usr1, NULL);
352 memset (&sa_usr2, 0, sizeof (sa_usr2));
353 sa_usr2.sa_handler = sig_usr2_handler;
354 sigaction (SIGUSR2, &sa_usr2, NULL);
356 } /* }}} void install_signal_handlers */
/* Opens the PID file with the given open(2) flags.
 * The path is config_pid_file when set, otherwise
 * LOCALSTATEDIR "/run/rrdcached.pid". The containing directory is created
 * with rrd_mkdir_p() first. `action' is only used as the verb in the error
 * message printed when open() fails.
 * Presumably returns the descriptor from open(), negative on failure - the
 * return statements are not shown here, confirm. */
358 static int open_pidfile(char *action, int oflag) /* {{{ */
362 char *file_copy, *dir;
364 file = (config_pid_file != NULL)
366 : LOCALSTATEDIR "/run/rrdcached.pid";
368 /* dirname may modify its argument */
369 file_copy = strdup(file);
370 if (file_copy == NULL)
372 fprintf(stderr, "rrdcached: strdup(): %s\n",
373 rrd_strerror(errno));
377 dir = dirname(file_copy);
378 if (rrd_mkdir_p(dir, 0777) != 0)
380 fprintf(stderr, "Failed to create pidfile directory '%s': %s\n",
381 dir, rrd_strerror(errno));
/* Mode 0644: owner read/write, group and others read-only. */
387 fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
389 fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
390 action, file, rrd_strerror(errno));
393 } /* }}} static int open_pidfile */
395 /* check existing pid file to see whether a daemon is running */
/* Opens the PID file read/write and reads the recorded PID. If a different
 * process with that PID can be signalled, it is reported as a possibly
 * competing daemon; otherwise the file is rewound and truncated as stale.
 * NOTE(review): read() fills pid_str without an obvious terminator in the
 * lines shown - confirm the buffer is NUL-terminated before it is parsed. */
396 static int check_pidfile(void)
402 pid_fd = open_pidfile("open", O_RDWR);
406 if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0)
413 /* another running process that we can signal COULD be
414 * a competing rrdcached */
/* kill(pid, 0) sends no signal; it only tests whether we may signal pid. */
415 if (pid != getpid() && kill(pid, 0) == 0)
418 "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
423 lseek(pid_fd, 0, SEEK_SET);
424 if (ftruncate(pid_fd, 0) == -1)
427 "FATAL: Faild to truncate stale PID file. (pid %d)\n", pid);
433 "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
434 "rrdcached: starting normally.\n", pid);
437 } /* }}} static int check_pidfile */
/* Wraps the already-open descriptor `fd' in a stdio stream and writes the
 * value of `pid' as a decimal line. A failing fdopen() is logged.
 * (Where `pid' is obtained and what is returned is not shown here.) */
439 static int write_pidfile (int fd) /* {{{ */
446 fh = fdopen (fd, "w");
449 RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
454 fprintf (fh, "%i\n", (int) pid);
458 } /* }}} int write_pidfile */
/* Unlinks the PID file: config_pid_file when set, otherwise the default
 * LOCALSTATEDIR "/run/rrdcached.pid". */
460 static int remove_pidfile (void) /* {{{ */
465 file = (config_pid_file != NULL)
467 : LOCALSTATEDIR "/run/rrdcached.pid";
469 status = unlink (file);
473 } /* }}} int remove_pidfile */
/* Extracts the next newline-terminated command from the socket's read buffer.
 * The unread region is rbuf[next_cmd .. next_read). If it contains no '\n',
 * the leftover bytes are moved to the start of rbuf. Otherwise next_cmd is
 * advanced past the newline and a preceding '\r' is overwritten with NUL.
 * (How `len' is filled in and what is returned is not shown here.) */
475 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
479 eol = memchr(sock->rbuf + sock->next_cmd, '\n',
480 sock->next_read - sock->next_cmd);
484 /* no commands left, move remainder back to front of rbuf */
485 memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
486 sock->next_read - sock->next_cmd);
487 sock->next_read -= sock->next_cmd;
494 char *cmd = sock->rbuf + sock->next_cmd;
497 sock->next_cmd = eol - sock->rbuf + 1;
499 if (eol > sock->rbuf && *(eol-1) == '\r')
500 *(--eol) = '\0'; /* handle "\r\n" EOL */
509 } /* }}} char *next_cmd */
511 /* add the characters directly to the write buffer */
/* Grows sock->wbuf to wbuf_len + len + 1 bytes with rrd_realloc(), copies
 * `str' to the end and advances wbuf_len by `len'. sock->wbuf is only
 * replaced after the reallocation result has been checked.
 * `sock' must not be NULL (asserted). */
512 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
516 assert(sock != NULL);
518 new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
521 RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
/* len + 1 so that the terminating NUL of `str' is copied as well. */
525 strncpy(new_buf + sock->wbuf_len, str, len + 1);
527 sock->wbuf = new_buf;
528 sock->wbuf_len += len;
531 } /* }}} static int add_to_wbuf */
533 /* add the text to the "extra" info that's sent after the status line */
/* printf-style: formats into a stack buffer of RRD_CMD_MAX bytes and appends
 * the result to the socket's write buffer via add_to_wbuf().
 * Returns 0 without doing anything during journal replay (sock == NULL) or
 * while the socket is in BATCH mode. */
534 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
537 char buffer[RRD_CMD_MAX];
540 if (JOURNAL_REPLAY(sock)) return 0;
541 if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
544 #ifdef HAVE_VSNPRINTF
545 len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
/* NOTE(review): the fallback below is unbounded - confirm it cannot overflow. */
547 len = vsprintf(buffer, fmt, argp);
552 RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
556 return add_to_wbuf(sock, buffer, len);
557 } /* }}} static int add_response_info */
/* Walks `str' from one '\n' to the next using strchr(); presumably returns
 * the number of newlines found (loop body not shown here - confirm).
 * send_response() uses the result as the line count of the reply. */
559 static int count_lines(char *str) /* {{{ */
565 while ((str = strchr(str, '\n')) != NULL)
573 } /* }}} static int count_lines */
575 /* send the response back to the user.
576 * returns 0 on success, -1 on error
577 * write buffer is always zeroed after this call */
/* The status line is "<lines> <formatted message>", built in a stack buffer
 * of RRD_CMD_MAX bytes. During journal replay (sock == NULL) nothing is sent
 * and `rc' is returned unchanged. */
578 static int send_response (listen_socket_t *sock, response_code rc,
579 char *fmt, ...) /* {{{ */
582 char buffer[RRD_CMD_MAX];
587 if (JOURNAL_REPLAY(sock)) return rc;
/* BATCH mode: the numeric prefix is the batch command number. */
589 if (sock->batch_start)
592 return rc; /* no response on success during BATCH */
593 lines = sock->batch_cmd;
/* Normal mode, success: the prefix is the number of info lines queued. */
595 else if (rc == RESP_OK)
596 lines = count_lines(sock->wbuf);
600 rclen = sprintf(buffer, "%d ", lines);
602 #ifdef HAVE_VSNPRINTF
603 len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
605 len = vsprintf(buffer+rclen, fmt, argp);
613 /* append the result to the wbuf, don't write to the user */
614 if (sock->batch_start)
615 return add_to_wbuf(sock, buffer, len);
617 /* first write must be complete */
618 if (len != write(sock->fd, buffer, len))
620 RRDD_LOG(LOG_INFO, "send_response: could not write status message");
/* On success, send the queued info lines, looping over partial writes. */
624 if (sock->wbuf != NULL && rc == RESP_OK)
627 while (wrote < sock->wbuf_len)
629 ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
632 RRDD_LOG(LOG_INFO, "send_response: could not write results");
639 free(sock->wbuf); sock->wbuf = NULL;
/* Resets the value bookkeeping of a cache item and stamps its last flush
 * time with `when'. If config_write_jitter is positive, a random offset in
 * [0, config_write_jitter) seconds is added to that timestamp. */
645 static void wipe_ci_values(cache_item_t *ci, time_t when)
649 ci->values_alloc = 0;
651 ci->last_flush_time = when;
652 if (config_write_jitter > 0)
653 ci->last_flush_time += (rrd_random() % config_write_jitter);
657 * remove a "cache_item_t" item from the queue.
658 * must hold 'cache_lock' when calling this
/* Unlinks `ci' from the doubly linked write queue, fixing up
 * cache_queue_head / cache_queue_tail when it was the first or last element,
 * clears CI_FLAGS_IN_QUEUE and decrements stats_queue_length under
 * stats_lock. NULL and items that are not queued are ignored. */
660 static void remove_from_queue(cache_item_t *ci) /* {{{ */
662 if (ci == NULL) return;
663 if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
665 if (ci->prev == NULL)
666 cache_queue_head = ci->next; /* reset head */
668 ci->prev->next = ci->next;
670 if (ci->next == NULL)
671 cache_queue_tail = ci->prev; /* reset the tail */
673 ci->next->prev = ci->prev;
675 ci->next = ci->prev = NULL;
676 ci->flags &= ~CI_FLAGS_IN_QUEUE;
678 pthread_mutex_lock (&stats_lock);
679 assert (stats_queue_length > 0);
680 stats_queue_length--;
681 pthread_mutex_unlock (&stats_lock);
683 } /* }}} static void remove_from_queue */
685 /* free the resources associated with the cache_item_t
686 * must hold cache_lock when calling this function
 *
 * A NULL item is accepted and yields NULL. Otherwise the item is taken off
 * the write queue, its values_num stored values are walked (presumably
 * freed - loop body not shown here), and the `flushed' condition is
 * broadcast and then destroyed so that no waiter stays blocked on it.
688 static void *free_cache_item(cache_item_t *ci) /* {{{ */
690 if (ci == NULL) return NULL;
692 remove_from_queue(ci);
694 for (size_t i=0; i < ci->values_num; i++)
700 /* in case anyone is waiting */
701 pthread_cond_broadcast(&ci->flushed);
702 pthread_cond_destroy(&ci->flushed);
707 } /* }}} static void *free_cache_item */
710 * enqueue_cache_item:
711 * `cache_lock' must be acquired before calling this function!
/* Puts `ci' on the write queue, either at the head (it is first removed from
 * wherever it currently sits) or at the tail (items already queued are not
 * moved back). Afterwards the item is flagged CI_FLAGS_IN_QUEUE, one waiter
 * on queue_cond is woken and stats_queue_length is incremented under
 * stats_lock. Items without values are special-cased up front. */
713 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
719 if (ci->values_num == 0)
724 if (cache_queue_head == ci)
727 /* remove if further down in queue */
728 remove_from_queue(ci);
731 ci->next = cache_queue_head;
732 if (ci->next != NULL)
734 cache_queue_head = ci;
736 if (cache_queue_tail == NULL)
737 cache_queue_tail = cache_queue_head;
739 else /* (side == TAIL) */
741 /* We don't move values back in the list.. */
742 if (ci->flags & CI_FLAGS_IN_QUEUE)
745 assert (ci->next == NULL);
746 assert (ci->prev == NULL);
748 ci->prev = cache_queue_tail;
750 if (cache_queue_tail == NULL)
751 cache_queue_head = ci;
753 cache_queue_tail->next = ci;
755 cache_queue_tail = ci;
758 ci->flags |= CI_FLAGS_IN_QUEUE;
760 pthread_cond_signal(&queue_cond);
761 pthread_mutex_lock (&stats_lock);
762 stats_queue_length++;
763 pthread_mutex_unlock (&stats_lock);
766 } /* }}} int enqueue_cache_item */
769 * tree_callback_flush:
770 * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
771 * while this is in progress.
/* Per-node visitor. `value' is the cache item, `data' the
 * callback_flush_data_t prepared by flush_old_values().
 *  - Items with values whose last flush is at or before cfd->abs_timeout
 *    (or any item with values once the daemon is no longer RUNNING) are
 *    queued at the tail.
 *  - Items without values that have been idle for config_flush_interval
 *    seconds get their key appended to cfd->keys for later removal. */
773 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
777 callback_flush_data_t *cfd;
779 ci = (cache_item_t *) value;
780 cfd = (callback_flush_data_t *) data;
782 if (ci->flags & CI_FLAGS_IN_QUEUE)
785 if (ci->values_num > 0
786 && (ci->last_flush_time <= cfd->abs_timeout || state != RUNNING))
788 enqueue_cache_item (ci, TAIL);
790 else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
791 && (ci->values_num <= 0))
/* The tree key and ci->file are expected to be the very same pointer. */
793 assert ((char *) key == ci->file);
794 if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key))
796 RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed.");
802 } /* }}} gboolean tree_callback_flush */
/* Walks the whole cache tree with tree_callback_flush(), then removes every
 * key the callback collected in cfd.keys from the tree.
 * cfd.abs_timeout is either now - max_age or now + 2*config_write_jitter + 1;
 * the condition choosing between the two is not shown here. Callers pass -1
 * to mean "flush everything".
 * The callers in this file hold cache_lock around the call. */
804 static int flush_old_values (int max_age)
806 callback_flush_data_t cfd;
809 memset (&cfd, 0, sizeof (cfd));
810 /* Pass the current time as user data so that we don't need to call
811 * `time' for each node. */
812 cfd.now = time (NULL);
817 cfd.abs_timeout = cfd.now - max_age;
819 cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
821 /* `tree_callback_flush' will return the keys of all values that haven't
822 * been touched in the last `config_flush_interval' seconds in `cfd'.
823 * The char*'s in this array point to the same memory as ci->file, so we
824 * don't need to free them separately. */
825 g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
827 for (k = 0; k < cfd.keys_num; k++)
829 gboolean status = g_tree_remove(cache_tree, cfd.keys[k]);
830 /* should never fail, since we have held the cache_lock
832 assert(status == TRUE);
835 if (cfd.keys != NULL)
842 } /* int flush_old_values */
/* Flush thread body. While the daemon is RUNNING it sleeps on flush_cond
 * until the next flush deadline. Once the deadline has passed it calls
 * flush_old_values(config_write_interval) and schedules the next deadline
 * config_flush_interval seconds ahead. cache_lock is held throughout, except
 * for a short window after each flush round.
 * After the loop, everything is flushed if config_flush_at_shutdown is set. */
844 static void *flush_thread_main (void UNUSED(*args)) /* {{{ */
847 struct timespec next_flush;
850 gettimeofday (&now, NULL);
851 next_flush.tv_sec = now.tv_sec + config_flush_interval;
/* microseconds -> nanoseconds */
852 next_flush.tv_nsec = 1000 * now.tv_usec;
854 pthread_mutex_lock(&cache_lock);
856 while (state == RUNNING)
858 gettimeofday (&now, NULL);
859 if ((now.tv_sec > next_flush.tv_sec)
860 || ((now.tv_sec == next_flush.tv_sec)
861 && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
863 RRDD_LOG(LOG_DEBUG, "flushing old values");
865 /* Determine the time of the next cache flush. */
866 next_flush.tv_sec = now.tv_sec + config_flush_interval;
868 /* Flush all values that haven't been written in the last
869 * `config_write_interval' seconds. */
870 flush_old_values (config_write_interval);
872 /* unlock the cache while we rotate so we don't block incoming
873 * updates if the fsync() blocks on disk I/O */
874 pthread_mutex_unlock(&cache_lock);
876 pthread_mutex_lock(&cache_lock);
/* Returns on timeout, on a broadcast from sig_common(), or spuriously. */
879 status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
880 if (status != 0 && status != ETIMEDOUT)
882 RRDD_LOG (LOG_ERR, "flush_thread_main: "
883 "pthread_cond_timedwait returned %i.", status);
887 if (config_flush_at_shutdown)
888 flush_old_values (-1); /* flush everything */
892 pthread_mutex_unlock(&cache_lock);
895 } /* void *flush_thread_main */
/* Writer thread body. Runs until the daemon is in SHUTDOWN state and, when
 * config_flush_at_shutdown is set, the queue is drained. Each round:
 *   1. wait on queue_cond while the queue is empty,
 *   2. take the head item, copy its file name and detach its values,
 *   3. release cache_lock and write the values with rrd_update_r(),
 *   4. journal a "wrote" record, wake waiters on the item's `flushed'
 *      condition (after looking the item up again), and update statistics. */
897 static void *queue_thread_main (void UNUSED(*args)) /* {{{ */
899 pthread_mutex_lock (&cache_lock);
901 while (state != SHUTDOWN
902 || (cache_queue_head != NULL && config_flush_at_shutdown))
910 /* Now, check if there's something to store away. If not, wait until
911 * something comes in. */
912 if (cache_queue_head == NULL)
914 status = pthread_cond_wait (&queue_cond, &cache_lock);
915 if ((status != 0) && (status != ETIMEDOUT))
917 RRDD_LOG (LOG_ERR, "queue_thread_main: "
918 "pthread_cond_wait returned %i.", status);
922 /* Check if a value has arrived. This may be NULL if we timed out or there
923 * was an interrupt such as a signal. */
924 if (cache_queue_head == NULL)
927 ci = cache_queue_head;
929 /* copy the relevant parts */
930 file = strdup (ci->file);
933 RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
937 assert(ci->values != NULL);
938 assert(ci->values_num > 0);
941 values_num = ci->values_num;
943 wipe_ci_values(ci, time(NULL));
944 remove_from_queue(ci);
/* Do the (potentially slow) disk write without holding cache_lock. */
946 pthread_mutex_unlock (&cache_lock);
949 status = rrd_update_r (file, NULL, (int) values_num, (void *) values);
952 RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
953 "rrd_update_r (%s) failed with status %i. (%s)",
954 file, status, rrd_get_error());
957 journal_write("wrote", file);
959 /* Search again in the tree. It's possible someone issued a "FORGET"
960 * while we were writing the update values. */
961 pthread_mutex_lock(&cache_lock);
962 ci = (cache_item_t *) g_tree_lookup(cache_tree, file);
964 pthread_cond_broadcast(&ci->flushed);
965 pthread_mutex_unlock(&cache_lock);
969 pthread_mutex_lock (&stats_lock);
970 stats_updates_written++;
971 stats_data_sets_written += values_num;
972 pthread_mutex_unlock (&stats_lock);
975 rrd_free_ptrs((void ***) &values, &values_num);
978 pthread_mutex_lock (&cache_lock);
980 pthread_mutex_unlock (&cache_lock);
983 } /* }}} void *queue_thread_main */
/* Splits the next field off the front of the command buffer.
 * A field ends at an unescaped space or at the terminating NUL; a backslash
 * makes the following character part of the field. On exit *buffer_ret and
 * *buffer_size_ret are advanced past what was consumed.
 * The buffer must be NUL-terminated at buffer_size - 1 (asserted).
 * (Setup of `field' and the return values are not shown here; callers treat
 * a non-zero result as a parse error - confirm.) */
985 static int buffer_get_field (char **buffer_ret, /* {{{ */
986 size_t *buffer_size_ret, char **field_ret)
995 buffer = *buffer_ret;
997 buffer_size = *buffer_size_ret;
1001 if (buffer_size <= 0)
1004 /* This is ensured by `handle_request'. */
1005 assert (buffer[buffer_size - 1] == '\0');
1008 while (buffer_pos < buffer_size)
1010 /* Check for end-of-field or end-of-buffer */
1011 if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
1013 field[field_size] = 0;
1019 /* Handle escaped characters. */
1020 else if (buffer[buffer_pos] == '\\')
/* A backslash as the very last character has nothing left to escape. */
1022 if (buffer_pos >= (buffer_size - 1))
1025 field[field_size] = buffer[buffer_pos];
1029 /* Normal operation */
1032 field[field_size] = buffer[buffer_pos];
1036 } /* while (buffer_pos < buffer_size) */
1041 *buffer_ret = buffer + buffer_pos;
1042 *buffer_size_ret = buffer_size - buffer_pos;
1046 } /* }}} int buffer_get_field */
1048 /* if we're restricting writes to the base directory,
1049 * check whether the file falls within the dir
1050 * returns 1 if OK, otherwise 0
 *
 * The check is skipped when config_write_base_only is off, during
 * journal replay (sock == NULL) or when no base directory is set.
 * Any path containing "../" is rejected; other relative paths pass.
 * Absolute paths must be base dir + "/" + at least one character.
 * On rejection an EACCES error response is sent to a connected socket.
1052 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
1054 assert(file != NULL);
1056 if (!config_write_base_only
1057 || JOURNAL_REPLAY(sock)
1058 || config_base_dir == NULL)
1061 if (strstr(file, "../") != NULL) goto err;
1063 /* relative paths without "../" are ok */
1064 if (*file != '/') return 1;
1066 /* file must be of the format base + "/" + <1+ char filename> */
1067 if (strlen(file) < _config_base_dir_len + 2) goto err;
1068 if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
1069 if (*(file + _config_base_dir_len) != '/') goto err;
1074 if (sock != NULL && sock->fd >= 0)
1075 send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1078 } /* }}} static int check_file_access */
1080 /* when using a base dir, convert relative paths to absolute paths.
1081 * if necessary, modifies the "filename" pointer to point
1082 * to the new path created in "tmp". "tmp" is provided
1083 * by the caller and sizeof(tmp) must be >= PATH_MAX.
1085 * this allows us to optimize for the expected case (absolute path)
 *
 * Nothing is rewritten when no base dir is configured or when the
 * name already starts with '/'. Otherwise "<base dir>/<filename>" is
 * formatted into tmp, limited to PATH_MAX bytes.
 * NOTE(review): the snprintf result is not checked in the lines
 * shown, so an over-long path would be truncated silently - confirm.
1088 static void get_abs_path(char **filename, char *tmp)
1090 assert(tmp != NULL);
1091 assert(filename != NULL && *filename != NULL);
1093 if (config_base_dir == NULL || **filename == '/')
1096 snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1098 } /* }}} static int get_abs_path */
/* Forces pending values of one file to disk and waits for completion.
 * Looks the file up in the cache under cache_lock; if the item has pending
 * values it is moved to the head of the write queue and the caller blocks on
 * the item's `flushed' condition until a writer thread signals it.
 * Callers in this file compare the result against 0 and ENOENT. */
1100 static int flush_file (const char *filename) /* {{{ */
1104 pthread_mutex_lock (&cache_lock);
1106 ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1109 pthread_mutex_unlock (&cache_lock);
1113 if (ci->values_num > 0)
1115 /* Enqueue at head */
1116 enqueue_cache_item (ci, HEAD);
/* NOTE(review): a single wait without a predicate loop; a spurious wakeup
 * would return before the flush has completed - confirm this is acceptable. */
1117 pthread_cond_wait(&ci->flushed, &cache_lock);
1120 /* DO NOT DO ANYTHING WITH ci HERE!! The entry
1121 * may have been purged during our cond_wait() */
1123 pthread_mutex_unlock(&cache_lock);
1126 } /* }}} int flush_file */
/* Sends a "Usage: ..." error response. The text defaults to
 * "Syntax error.\n"; when the command carries its own syntax string that one
 * is presumably used instead (assignment not shown here - confirm). */
1128 static int syntax_error(listen_socket_t *sock, command_t *cmd) /* {{{ */
1130 char *err = "Syntax error.\n";
1132 if (cmd && cmd->syntax)
1135 return send_response(sock, RESP_ERR, "Usage: %s", err);
1136 } /* }}} static int syntax_error() */
/* STATS handler: reports the daemon's counters and cache tree dimensions.
 * The counters are copied under stats_lock and the tree size/height are read
 * under cache_lock, so no lock is held while the response is assembled. */
1138 static int handle_request_stats (HANDLER_PROTO) /* {{{ */
1140 uint64_t copy_queue_length;
1141 uint64_t copy_updates_received;
1142 uint64_t copy_flush_received;
1143 uint64_t copy_updates_written;
1144 uint64_t copy_data_sets_written;
1145 uint64_t copy_journal_bytes;
1146 uint64_t copy_journal_rotate;
1148 uint64_t tree_nodes_number;
1149 uint64_t tree_depth;
/* Snapshot all counters in one critical section. */
1151 pthread_mutex_lock (&stats_lock);
1152 copy_queue_length = stats_queue_length;
1153 copy_updates_received = stats_updates_received;
1154 copy_flush_received = stats_flush_received;
1155 copy_updates_written = stats_updates_written;
1156 copy_data_sets_written = stats_data_sets_written;
1157 copy_journal_bytes = stats_journal_bytes;
1158 copy_journal_rotate = stats_journal_rotate;
1159 pthread_mutex_unlock (&stats_lock);
1161 pthread_mutex_lock (&cache_lock);
1162 tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1163 tree_depth = (uint64_t) g_tree_height (cache_tree);
1164 pthread_mutex_unlock (&cache_lock);
/* One "Name: value" line per statistic. */
1166 add_response_info(sock,
1167 "QueueLength: %"PRIu64"\n", copy_queue_length);
1168 add_response_info(sock,
1169 "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1170 add_response_info(sock,
1171 "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1172 add_response_info(sock,
1173 "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1174 add_response_info(sock,
1175 "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1176 add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1177 add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1178 add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1179 add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1181 send_response(sock, RESP_OK, "Statistics follow\n");
1184 } /* }}} int handle_request_stats */
/* FLUSH handler: parses the file name, counts the request in
 * stats_flush_received, resolves the path against the base directory,
 * checks access and then calls flush_file().
 * When the file is not in the cache (ENOENT) but exists on disk as a regular
 * file, the reply is still OK ("Nothing to flush"). */
1186 static int handle_request_flush (HANDLER_PROTO) /* {{{ */
1188 char *file, file_tmp[PATH_MAX];
1191 status = buffer_get_field (&buffer, &buffer_size, &file);
1194 return syntax_error(sock,cmd);
1198 pthread_mutex_lock(&stats_lock);
1199 stats_flush_received++;
1200 pthread_mutex_unlock(&stats_lock);
1202 get_abs_path(&file, file_tmp);
/* check_file_access() has already sent the error response on failure. */
1203 if (!check_file_access(file, sock)) return 0;
1205 status = flush_file (file);
1207 return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1208 else if (status == ENOENT)
1210 /* no file in our tree; see whether it exists at all */
1211 struct stat statbuf;
1213 memset(&statbuf, 0, sizeof(statbuf));
1214 if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1215 return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1217 return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1219 else if (status < 0)
1220 return send_response(sock, RESP_ERR, "Internal error.\n");
1222 return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1227 } /* }}} int handle_request_flush */
/* FLUSHALL handler: calls flush_old_values(-1) under cache_lock and replies
 * immediately; it does not wait for the queued writes to complete. */
1229 static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
1231 RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1233 pthread_mutex_lock(&cache_lock);
1234 flush_old_values(-1);
1235 pthread_mutex_unlock(&cache_lock);
1237 return send_response(sock, RESP_OK, "Started flush.\n");
1238 } /* }}} static int handle_request_flushall */
/* PENDING handler: lists the not-yet-written update strings cached for one
 * file, one per response line. Replies with an ENOENT error message when the
 * lookup in the cache fails. The values are read while holding cache_lock.
 * NOTE(review): unlike FLUSH/FORGET/UPDATE, no check_file_access() call is
 * visible here - confirm this is intentional. */
1240 static int handle_request_pending(HANDLER_PROTO) /* {{{ */
1243 char *file, file_tmp[PATH_MAX];
1246 status = buffer_get_field(&buffer, &buffer_size, &file);
1248 return syntax_error(sock,cmd);
1250 get_abs_path(&file, file_tmp);
1252 pthread_mutex_lock(&cache_lock);
1253 ci = g_tree_lookup(cache_tree, file);
1256 pthread_mutex_unlock(&cache_lock);
1257 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1260 for (size_t i=0; i < ci->values_num; i++)
1261 add_response_info(sock, "%s\n", ci->values[i]);
1263 pthread_mutex_unlock(&cache_lock);
1264 return send_response(sock, RESP_OK, "updates pending\n");
1265 } /* }}} static int handle_request_pending */
/* FORGET handler: removes a file's entry from the cache tree (under
 * cache_lock). A "forget" journal record is written unless the command is
 * itself being replayed from the journal. Replies "Gone!" or an ENOENT error
 * message, presumably depending on whether the entry was found. */
1267 static int handle_request_forget(HANDLER_PROTO) /* {{{ */
1271 char *file, file_tmp[PATH_MAX];
1273 status = buffer_get_field(&buffer, &buffer_size, &file);
1275 return syntax_error(sock,cmd);
1277 get_abs_path(&file, file_tmp);
/* check_file_access() has already sent the error response on failure. */
1278 if (!check_file_access(file, sock)) return 0;
1280 pthread_mutex_lock(&cache_lock);
1281 found = g_tree_remove(cache_tree, file);
1282 pthread_mutex_unlock(&cache_lock);
1286 if (!JOURNAL_REPLAY(sock))
1287 journal_write("forget", file);
1289 return send_response(sock, RESP_OK, "Gone!\n");
1292 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1296 } /* }}} static int handle_request_forget */
1298 static int handle_request_queue (HANDLER_PROTO) /* {{{ */
1302 pthread_mutex_lock(&cache_lock);
1304 ci = cache_queue_head;
1307 add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1311 pthread_mutex_unlock(&cache_lock);
1313 return send_response(sock, RESP_OK, "in queue.\n");
1314 } /* }}} int handle_request_queue */
/* UPDATE handler: caches one or more "<timestamp>:<values>" strings for a
 * file. Outline:
 *   1. keep a copy of the raw command for the journal,
 *   2. parse the file name, count the request, resolve and check the path,
 *   3. look the file up in the cache; if absent, validate it on disk
 *      (regular file, readable and writable) with cache_lock released,
 *      build a new cache item and insert it unless another thread did so
 *      in the meantime,
 *   4. journal the command (not during replay),
 *   5. append every value field, requiring strictly increasing timestamps,
 *   6. queue the item for writing once config_write_interval seconds have
 *      passed since its last flush. */
1316 static int handle_request_update (HANDLER_PROTO) /* {{{ */
1318 char *file, file_tmp[PATH_MAX];
1321 char orig_buf[RRD_CMD_MAX];
1325 /* save it for the journal later */
/* NOTE(review): strncpy leaves orig_buf unterminated if buffer_size exceeds
 * RRD_CMD_MAX - confirm callers never pass a larger buffer. */
1326 if (!JOURNAL_REPLAY(sock))
1327 strncpy(orig_buf, buffer, min(RRD_CMD_MAX,buffer_size));
1329 status = buffer_get_field (&buffer, &buffer_size, &file);
1331 return syntax_error(sock,cmd);
1333 pthread_mutex_lock(&stats_lock);
1334 stats_updates_received++;
1335 pthread_mutex_unlock(&stats_lock);
1337 get_abs_path(&file, file_tmp);
1338 if (!check_file_access(file, sock)) return 0;
1340 pthread_mutex_lock (&cache_lock);
1341 ci = g_tree_lookup (cache_tree, file);
1343 if (ci == NULL) /* {{{ */
1345 struct stat statbuf;
1348 /* don't hold the lock while we setup; stat(2) might block */
1349 pthread_mutex_unlock(&cache_lock);
1351 memset (&statbuf, 0, sizeof (statbuf));
1352 status = stat (file, &statbuf);
1355 RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1358 if (status == ENOENT)
1359 return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1361 return send_response(sock, RESP_ERR,
1362 "stat failed with error %i.\n", status);
1364 if (!S_ISREG (statbuf.st_mode))
1365 return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1367 if (access(file, R_OK|W_OK) != 0)
1368 return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1369 file, rrd_strerror(errno));
1371 ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1374 RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1376 return send_response(sock, RESP_ERR, "malloc failed.\n");
1378 memset (ci, 0, sizeof (cache_item_t));
1380 ci->file = strdup (file);
1381 if (ci->file == NULL)
1384 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1386 return send_response(sock, RESP_ERR, "strdup failed.\n");
1389 wipe_ci_values(ci, now);
1390 ci->flags = CI_FLAGS_IN_TREE;
1391 pthread_cond_init(&ci->flushed, NULL);
1393 pthread_mutex_lock(&cache_lock);
1395 /* another UPDATE might have added this entry in the meantime */
1396 tmp = g_tree_lookup (cache_tree, file);
/* The tree key is ci->file itself, so key and item share one allocation. */
1398 g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1401 free_cache_item (ci);
1405 /* state may have changed while we were unlocked */
1406 if (state == SHUTDOWN)
1409 assert (ci != NULL);
1411 /* don't re-write updates in replay mode */
1412 if (!JOURNAL_REPLAY(sock))
1413 journal_write("update", orig_buf);
1415 while (buffer_size > 0)
1421 status = buffer_get_field (&buffer, &buffer_size, &value);
1424 RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1428 /* make sure update time is always moving forward. We use double here since
1429 update does support subsecond precision for timestamps ... */
1430 stamp = strtod(value, &eostamp);
/* The timestamp must be a number immediately followed by ':'. */
1431 if (eostamp == value || eostamp == NULL || *eostamp != ':')
1433 pthread_mutex_unlock(&cache_lock);
1434 return send_response(sock, RESP_ERR,
1435 "Cannot find timestamp in '%s'!\n", value);
1437 else if (stamp <= ci->last_update_stamp)
1439 pthread_mutex_unlock(&cache_lock);
1440 return send_response(sock, RESP_ERR,
1441 "illegal attempt to update using time %lf when last"
1442 " update time is %lf (minimum one second step)\n",
1443 stamp, ci->last_update_stamp);
1446 ci->last_update_stamp = stamp;
1448 if (!rrd_add_strdup_chunk(&ci->values, &ci->values_num, value,
1449 &ci->values_alloc, config_alloc_chunk))
1451 RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed.");
/* Queue for writing once the write interval has elapsed since last flush. */
1458 if (((now - ci->last_flush_time) >= config_write_interval)
1459 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1460 && (ci->values_num > 0))
1462 enqueue_cache_item (ci, TAIL);
1465 pthread_mutex_unlock (&cache_lock);
1468 return send_response(sock, RESP_ERR, "No values updated.\n");
1470 return send_response(sock, RESP_OK,
1471 "errors, enqueued %i value(s).\n", values_num);
1476 } /* }}} int handle_request_update */
/*
 * FETCH command handler.
 *
 * Reads the <file> and <CF> fields plus the start and end fields from the
 * request buffer, resolves the file to an absolute path, checks access,
 * calls flush_file() for it and then runs rrd_fetch_r().  The result is
 * returned as response info lines (FlushVersion, Start, End, Step, DSCount,
 * DSName, followed by one line per time step) and a final "Success" status.
 *
 * Start and end times are parsed with strtol(); a parsed value is used
 * either as an absolute time or as an offset added to the current time `t`.
 *
 * NOTE(review): "FlushVersion: %lu" is given the int constant 1, but %lu
 * expects an unsigned long -- should be 1UL; confirm and fix.
 */
1478 static int handle_request_fetch (HANDLER_PROTO) /* {{{ */
1480 char *file, file_tmp[PATH_MAX];
1489 unsigned long ds_cnt;
1496 rrd_value_t *data_ptr;
1503 /* Read the arguments */
1506 status = buffer_get_field (&buffer, &buffer_size, &file);
1510 status = buffer_get_field (&buffer, &buffer_size, &cf);
1514 status = buffer_get_field (&buffer, &buffer_size, &start_str);
1522 status = buffer_get_field (&buffer, &buffer_size, &end_str);
1532 return (syntax_error(sock,cmd));
1534 get_abs_path(&file, file_tmp);
1535 if (!check_file_access(file, sock)) return 0;
/* A flush_file() result of ENOENT is tolerated; any other non-zero status
 * aborts the request with an error response. */
1537 status = flush_file (file);
1538 if ((status != 0) && (status != ENOENT))
1539 return (send_response (sock, RESP_ERR,
1540 "flush_file (%s) failed with status %i.\n", file, status));
1542 t = time (NULL); /* "now" */
1544 /* Parse start time */
1545 if (start_str != NULL)
1552 value = strtol (start_str, &endptr, /* base = */ 0);
1553 if ((endptr == start_str) || (errno != 0))
1554 return (send_response(sock, RESP_ERR,
1555 "Cannot parse start time `%s': Only simple integers are allowed.\n",
1559 start_tm = (time_t) value;
1561 start_tm = (time_t) (t + value);
1565 start_tm = t - 86400;
1568 /* Parse end time */
1569 if (end_str != NULL)
1576 value = strtol (end_str, &endptr, /* base = */ 0);
1577 if ((endptr == end_str) || (errno != 0))
1578 return (send_response(sock, RESP_ERR,
1579 "Cannot parse end time `%s': Only simple integers are allowed.\n",
1583 end_tm = (time_t) value;
1585 end_tm = (time_t) (t + value);
1597 status = rrd_fetch_r (file, cf, &start_tm, &end_tm, &step,
1598 &ds_cnt, &ds_namv, &data);
1600 return (send_response(sock, RESP_ERR,
1601 "rrd_fetch_r failed: %s\n", rrd_get_error ()));
1603 add_response_info (sock, "FlushVersion: %lu\n", 1);
1604 add_response_info (sock, "Start: %lu\n", (unsigned long) start_tm);
1605 add_response_info (sock, "End: %lu\n", (unsigned long) end_tm);
1606 add_response_info (sock, "Step: %lu\n", step);
1607 add_response_info (sock, "DSCount: %lu\n", ds_cnt);
/* SSTRCAT: bounded append of `str` to the fixed-size array `buffer`, with
 * `buffer_fill` tracking the number of bytes used.  Input that does not fit
 * is truncated and the array is kept NUL-terminated.  `buffer` must be a
 * real array because the macro relies on sizeof (buffer). */
1609 #define SSTRCAT(buffer,str,buffer_fill) do { \
1610 size_t str_len = strlen (str); \
1611 if ((buffer_fill + str_len) > sizeof (buffer)) \
1612 str_len = sizeof (buffer) - buffer_fill; \
1613 if (str_len > 0) { \
1614 strncpy (buffer + buffer_fill, str, str_len); \
1615 buffer_fill += str_len; \
1616 assert (buffer_fill <= sizeof (buffer)); \
1617 if (buffer_fill == sizeof (buffer)) \
1618 buffer[buffer_fill - 1] = 0; \
1620 buffer[buffer_fill] = 0; \
1624 { /* Add list of DS names */
1626 size_t linebuf_fill;
1628 memset (linebuf, 0, sizeof (linebuf));
1630 for (i = 0; i < ds_cnt; i++)
1633 SSTRCAT (linebuf, " ", linebuf_fill);
1634 SSTRCAT (linebuf, ds_namv[i], linebuf_fill);
1635 rrd_freemem(ds_namv[i]);
1637 rrd_freemem(ds_namv);
1638 add_response_info (sock, "DSName: %s\n", linebuf);
1641 /* Add the actual data */
/* One response line per step, from start_tm + step through end_tm; each line
 * carries the timestamp followed by ds_cnt values formatted with %0.10e. */
1644 for (t = start_tm + step; t <= end_tm; t += step)
1647 size_t linebuf_fill;
1650 memset (linebuf, 0, sizeof (linebuf));
1652 for (i = 0; i < ds_cnt; i++)
1654 snprintf (tmp, sizeof (tmp), " %0.10e", *data_ptr);
1655 tmp[sizeof (tmp) - 1] = 0;
1656 SSTRCAT (linebuf, tmp, linebuf_fill);
1661 add_response_info (sock, "%10lu:%s\n", (unsigned long) t, linebuf);
1665 return (send_response (sock, RESP_OK, "Success\n"));
1667 } /* }}} int handle_request_fetch */
/*
 * Handler for the WROTE entry (registered with CMD_CONTEXT_JOURNAL in
 * list_of_commands).  The remaining buffer is taken as the file name; the
 * file's cache item is looked up in cache_tree under cache_lock, its
 * accumulated values are released (rrd_free_ptrs / wipe_ci_values) and the
 * item is removed from the queue.
 */
1669 /* we came across a "WROTE" entry during journal replay.
1670 * throw away any values that we have accumulated for this file
1672 static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
1675 const char *file = buffer;
1677 pthread_mutex_lock(&cache_lock);
1679 ci = g_tree_lookup(cache_tree, file);
1682 pthread_mutex_unlock(&cache_lock);
1687 rrd_free_ptrs((void ***) &ci->values, &ci->values_num);
1689 wipe_ci_values(ci, now);
1690 remove_from_queue(ci);
1692 pthread_mutex_unlock(&cache_lock);
1694 } /* }}} int handle_request_wrote */
/*
 * INFO command handler.
 *
 * Reads the file name from the request, resolves it to an absolute path and
 * checks access, then calls rrd_info_r().  Every element of the returned
 * rrd_info_t list becomes one response info line of the form
 * "<key> <type> <value>", where <type> is the numeric data->type and the
 * value is formatted according to that type (NaN values are printed as the
 * literal "NaN"; for blobs only the size is reported).  The list is freed
 * before the final "Info for <file> follows" status is sent.
 */
1696 static int handle_request_info (HANDLER_PROTO) /* {{{ */
1698 char *file, file_tmp[PATH_MAX];
1702 /* obtain filename */
1703 status = buffer_get_field(&buffer, &buffer_size, &file);
1705 return syntax_error(sock,cmd);
1706 /* get full pathname */
1707 get_abs_path(&file, file_tmp);
1708 if (!check_file_access(file, sock)) {
1709 return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1713 info = rrd_info_r(file);
1715 return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1717 for (rrd_info_t *data = info; data != NULL; data = data->next) {
1718 switch (data->type) {
1720 if (isnan(data->value.u_val))
1721 add_response_info(sock,"%s %d NaN\n",data->key, data->type);
1723 add_response_info(sock,"%s %d %0.10e\n", data->key, data->type, data->value.u_val);
1726 add_response_info(sock,"%s %d %lu\n", data->key, data->type, data->value.u_cnt);
1729 add_response_info(sock,"%s %d %d\n", data->key, data->type, data->value.u_int);
1732 add_response_info(sock,"%s %d %s\n", data->key, data->type, data->value.u_str);
1735 add_response_info(sock,"%s %d %lu\n", data->key, data->type, data->value.u_blo.size);
1740 rrd_info_free(info);
1742 return send_response(sock, RESP_OK, "Info for %s follows\n",file);
1743 } /* }}} static int handle_request_info */
1745 static int handle_request_first (HANDLER_PROTO) /* {{{ */
1747 char *i, *file, file_tmp[PATH_MAX];
1752 /* obtain filename */
1753 status = buffer_get_field(&buffer, &buffer_size, &file);
1755 return syntax_error(sock,cmd);
1756 /* get full pathname */
1757 get_abs_path(&file, file_tmp);
1758 if (!check_file_access(file, sock)) {
1759 return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1762 status = buffer_get_field(&buffer, &buffer_size, &i);
1764 return syntax_error(sock,cmd);
1767 return send_response(sock, RESP_ERR, "Invalid index specified (%d)\n", idx);
1772 t = rrd_first_r(file,idx);
1774 return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1776 return send_response(sock, RESP_OK, "%lu\n",(unsigned)t);
1777 } /* }}} static int handle_request_first */
1780 static int handle_request_last (HANDLER_PROTO) /* {{{ */
1782 char *file, file_tmp[PATH_MAX];
1784 time_t t, from_file, step;
1785 rrd_file_t * rrd_file;
1789 /* obtain filename */
1790 status = buffer_get_field(&buffer, &buffer_size, &file);
1792 return syntax_error(sock,cmd);
1793 /* get full pathname */
1794 get_abs_path(&file, file_tmp);
1795 if (!check_file_access(file, sock)) {
1796 return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1800 rrd_file = rrd_open(file,&rrd,RRD_READONLY);
1802 return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1804 from_file = rrd.live_head->last_up;
1805 step = rrd.stat_head->pdp_step;
1806 rrd_close(rrd_file);
1807 pthread_mutex_lock(&cache_lock);
1808 ci = g_tree_lookup(cache_tree, file);
1810 t = ci->last_update_stamp;
1813 pthread_mutex_unlock(&cache_lock);
1817 return send_response(sock, RESP_ERR, "Error: rrdcached: Invalid timestamp returned\n");
1819 return send_response(sock, RESP_OK, "%lu\n",(unsigned)t);
1820 } /* }}} static int handle_request_last */
1822 static int handle_request_create (HANDLER_PROTO) /* {{{ */
1824 char *file, file_tmp[PATH_MAX];
1829 unsigned long step = 300;
1830 time_t last_up = time(NULL)-10;
1831 int no_overwrite = opt_no_overwrite;
1834 /* obtain filename */
1835 status = buffer_get_field(&buffer, &buffer_size, &file);
1837 return syntax_error(sock,cmd);
1838 /* get full pathname */
1839 get_abs_path(&file, file_tmp);
1840 if (!check_file_access(file, sock)) {
1841 return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1843 RRDD_LOG(LOG_INFO, "rrdcreate request for %s",file);
1845 while ((status = buffer_get_field(&buffer, &buffer_size, &tok)) == 0 && tok) {
1846 if( ! strncmp(tok,"-b",2) ) {
1847 status = buffer_get_field(&buffer, &buffer_size, &tok );
1848 if (status != 0) return syntax_error(sock,cmd);
1849 last_up = (time_t) atol(tok);
1852 if( ! strncmp(tok,"-s",2) ) {
1853 status = buffer_get_field(&buffer, &buffer_size, &tok );
1854 if (status != 0) return syntax_error(sock,cmd);
1858 if( ! strncmp(tok,"-O",2) ) {
1862 if( ! strncmp(tok,"DS:",3) ) { av[ac++]=tok; continue; }
1863 if( ! strncmp(tok,"RRA:",4) ) { av[ac++]=tok; continue; }
1864 return syntax_error(sock,cmd);
1867 return send_response(sock, RESP_ERR, "The step size cannot be less than 1 second.\n");
1869 if (last_up < 3600 * 24 * 365 * 10) {
1870 return send_response(sock, RESP_ERR, "The first entry must be after 1980.\n");
1874 status = rrd_create_r2(file,step,last_up,no_overwrite,ac,(const char **)av);
1877 return send_response(sock, RESP_OK, "RRD created OK\n");
1879 return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1880 } /* }}} static int handle_request_create */
/*
 * BATCH command handler.  A socket that is already in batch mode
 * (sock->batch_start non-zero) gets an "Already in BATCH" error.  Otherwise
 * the go-ahead response is sent, sock->batch_start is set to the current
 * time and sock->batch_cmd is reset to 0.
 */
1882 /* start "BATCH" processing */
1883 static int batch_start (HANDLER_PROTO) /* {{{ */
1886 if (sock->batch_start)
1887 return send_response(sock, RESP_ERR, "Already in BATCH\n");
1889 status = send_response(sock, RESP_OK,
1890 "Go ahead. End with dot '.' on its own line.\n");
1891 sock->batch_start = time(NULL);
1892 sock->batch_cmd = 0;
1895 } /* }}} static int batch_start */
/*
 * Handler for the batch terminator.  Asserts that a batch is in progress,
 * clears the batch state on the socket (batch_start and batch_cmd) and
 * sends the closing RESP_OK response with the text "errors".
 */
1897 /* finish "BATCH" processing and return results to the client */
1898 static int batch_done (HANDLER_PROTO) /* {{{ */
1900 assert(sock->batch_start);
1901 sock->batch_start = 0;
1902 sock->batch_cmd = 0;
1903 return send_response(sock, RESP_OK, "errors\n");
1904 } /* }}} static int batch_done */
/*
 * QUIT command handler; the command table describes it as
 * "Disconnect from rrdcached.".
 */
1906 static int handle_request_quit (HANDLER_PROTO) /* {{{ */
1909 } /* }}} static int handle_request_quit */
/*
 * Table of all protocol commands.  Each entry names a handler function, the
 * contexts in which the command is accepted (CMD_CONTEXT_* flags) and the
 * usage / help texts shown by HELP.  The position of an entry in this table
 * is also used as a bit position in listen_socket_t.permissions (see
 * find_command_index), so reordering entries changes permission bits.
 * list_of_commands_len holds the number of entries.
 */
1911 static command_t list_of_commands[] = { /* {{{ */
1914 handle_request_update,
1916 "UPDATE <filename> <values> [<values> ...]\n"
1918 "Adds the given file to the internal cache if it is not yet known and\n"
1919 "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1922 "Each <values> has the following form:\n"
1923 " <values> = <time>:<value>[:<value>[...]]\n"
1924 "See the rrdupdate(1) manpage for details.\n"
1928 handle_request_wrote,
1929 CMD_CONTEXT_JOURNAL,
1935 handle_request_flush,
1936 CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1937 "FLUSH <filename>\n"
1939 "Adds the given filename to the head of the update queue and returns\n"
1940 "after it has been dequeued.\n"
1944 handle_request_flushall,
1948 "Triggers writing of all pending updates. Returns immediately.\n"
1952 handle_request_pending,
1954 "PENDING <filename>\n"
1956 "Shows any 'pending' updates for a file, in order.\n"
1957 "The updates shown have not yet been written to the underlying RRD file.\n"
1961 handle_request_forget,
1963 "FORGET <filename>\n"
1965 "Removes the file completely from the cache.\n"
1966 "Any pending updates for the file will be lost.\n"
1970 handle_request_queue,
1974 "Shows all files in the output queue.\n"
1975 "The output is zero or more lines in the following format:\n"
1976 "(where <num_vals> is the number of values to be written)\n"
1978 "<num_vals> <filename>\n"
1982 handle_request_stats,
1986 "Returns some performance counters, see the rrdcached(1) manpage for\n"
1987 "a description of the values.\n"
1991 handle_request_help,
1993 "HELP [<command>]\n",
1994 NULL, /* special! */
2002 "The 'BATCH' command permits the client to initiate a bulk load\n"
2003 " of commands to rrdcached.\n"
2008 " server: 0 Go ahead. End with dot '.' on its own line.\n"
2009 " client: command #1\n"
2010 " client: command #2\n"
2011 " client: ... and so on\n"
2013 " server: 2 errors\n"
2014 " server: 7 message for command #7\n"
2015 " server: 9 message for command #9\n"
2017 "For more information, consult the rrdcached(1) documentation.\n"
2020 ".", /* BATCH terminator */
2028 handle_request_fetch,
2030 "FETCH <file> <CF> [<start> [<end>]]\n"
2032 "The 'FETCH' can be used by the client to retrieve values from an RRD file.\n"
2036 handle_request_info,
2038 "INFO <filename>\n",
2039 "The INFO command retrieves information about a specified RRD file.\n"
2040 "This is returned in standard rrdinfo format, a sequence of lines\n"
2041 "with the format <keyname> = <value>\n"
2042 "Note that this is the data as of the last update of the RRD file itself,\n"
2043 "not the last time data was received via rrdcached, so there may be pending\n"
2044 "updates in the queue. If this bothers you, then first run a FLUSH.\n"
2048 handle_request_first,
2050 "FIRST <filename> <rra index>\n",
2051 "The FIRST command retrieves the first data time for a specified RRA in\n"
2056 handle_request_last,
2058 "LAST <filename>\n",
2059 "The LAST command retrieves the last update time for a specified RRD file.\n"
2060 "Note that this is the time of the last update of the RRD file itself, not\n"
2061 "the last time data was received via rrdcached, so there may be pending\n"
2062 "updates in the queue. If this bothers you, then first run a FLUSH.\n"
2066 handle_request_create,
2067 CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
2068 "CREATE <filename> [-b start] [-s step] [-O] <DS definitions> <RRA definitions>\n",
2069 "The CREATE command will create an RRD file, overwriting any existing file\n"
2070 "unless the -O option is given or rrdcached was started with the -O option.\n"
2071 "The start parameter needs to be in seconds since 1/1/70 (AT-style syntax is\n"
2072 "not acceptable) and the step is in seconds (default is 300).\n"
2073 "The DS and RRA definitions are as for the 'rrdtool create' command.\n"
2077 handle_request_quit,
2078 CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
2081 "Disconnect from rrdcached.\n"
2083 }; /* }}} command_t list_of_commands[] */
2084 static size_t list_of_commands_len = sizeof (list_of_commands)
2085 / sizeof (list_of_commands[0]);
/*
 * Looks up a command by name (case-insensitively, via strcasecmp) in
 * list_of_commands and returns a pointer to the matching table entry.
 * NOTE(review): presumably returns NULL when nothing matches -- the caller
 * in handle_request reports "Unknown command" after this lookup; confirm.
 */
2087 static command_t *find_command(char *cmd)
2091 for (i = 0; i < list_of_commands_len; i++)
2092 if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
2093 return (&list_of_commands[i]);
2097 /* We currently use the index in the `list_of_commands' array as a bit position
2098 * in `listen_socket_t.permissions'. This member should NEVER be accessed from
2099 * outside these functions so that switching to a more elegant storage method
2100 * is easily possible. */
/*
 * Returns the position of `cmd` in list_of_commands (case-insensitive
 * match) as an ssize_t.
 * NOTE(review): callers test the result with `i < 0`, so a negative value
 * presumably signals "not found"; confirm.
 */
2101 static ssize_t find_command_index (const char *cmd) /* {{{ */
2105 for (i = 0; i < list_of_commands_len; i++)
2106 if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
2107 return ((ssize_t) i);
2109 } /* }}} ssize_t find_command_index */
/*
 * Checks whether command `cmd` may be used on `sock`.
 *
 * Journal replay (JOURNAL_REPLAY(sock)) is tested first; "QUIT" and "HELP"
 * share a dedicated branch, and so does the batch terminator ".".  Any
 * other command is resolved with find_command_index() and tested against
 * the corresponding bit in sock->permissions.
 *
 * NOTE(review): `1 << i` shifts a signed int while permissions is a
 * uint32_t; a command index of 31 would overflow the int -- consider an
 * unsigned constant.
 */
2111 static int socket_permission_check (listen_socket_t *sock, /* {{{ */
2116 if (JOURNAL_REPLAY(sock))
2122 if ((strcasecmp ("QUIT", cmd) == 0)
2123 || (strcasecmp ("HELP", cmd) == 0))
2125 else if (strcmp (".", cmd) == 0)
2128 i = find_command_index (cmd);
2133 if ((sock->permissions & (1 << i)) != 0)
2136 } /* }}} int socket_permission_check */
/*
 * Grants `sock` permission for one command: the command's index in
 * list_of_commands (from find_command_index) selects the bit that is set in
 * sock->permissions.
 */
2138 static int socket_permission_add (listen_socket_t *sock, /* {{{ */
2143 i = find_command_index (cmd);
2148 sock->permissions |= (1 << i);
2150 } /* }}} int socket_permission_add */
/* Revokes every command permission on `sock` by zeroing its bit mask. */
2152 static void socket_permission_clear (listen_socket_t *sock) /* {{{ */
2154 sock->permissions = 0;
2155 } /* }}} socket_permission_clear */
/* Copies the complete permission bit mask from `src` to `dest`. */
2157 static void socket_permission_copy (listen_socket_t *dest, /* {{{ */
2158 listen_socket_t *src)
2160 dest->permissions = src->permissions;
2161 } /* }}} socket_permission_copy */
/*
 * Grants `sock` permission for every command: starts from an empty mask and
 * sets one bit per entry of list_of_commands.
 */
2163 static void socket_permission_set_all (listen_socket_t *sock) /* {{{ */
2167 sock->permissions = 0;
2168 for (i = 0; i < list_of_commands_len; i++)
2169 sock->permissions |= (1 << i);
2170 } /* }}} void socket_permission_set_all */
/*
 * Returns non-zero when `cmd` is allowed in the current mode of `sock`:
 * during journal replay the command needs CMD_CONTEXT_JOURNAL, inside a
 * BATCH (sock->batch_start set) it needs CMD_CONTEXT_BATCH, and otherwise
 * CMD_CONTEXT_CLIENT.
 */
2172 /* check whether commands are received in the expected context */
2173 static int command_check_context(listen_socket_t *sock, command_t *cmd)
2175 if (JOURNAL_REPLAY(sock))
2176 return (cmd->context & CMD_CONTEXT_JOURNAL);
2177 else if (sock->batch_start)
2178 return (cmd->context & CMD_CONTEXT_BATCH);
2180 return (cmd->context & CMD_CONTEXT_CLIENT);
2186 static int handle_request_help (HANDLER_PROTO) /* {{{ */
2191 command_t *help = NULL;
2193 status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
2195 help = find_command(cmd_str);
2197 if (help && (help->syntax || help->help))
2199 char tmp[RRD_CMD_MAX];
2201 snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
2205 add_response_info(sock, "Usage: %s\n", help->syntax);
2208 add_response_info(sock, "%s\n", help->help);
2214 resp_txt = "Command overview\n";
2216 for (i = 0; i < list_of_commands_len; i++)
2218 if (list_of_commands[i].syntax == NULL)
2220 add_response_info (sock, "%s", list_of_commands[i].syntax);
2224 return send_response(sock, RESP_OK, resp_txt);
2225 } /* }}} int handle_request_help */
/*
 * Central request dispatcher.
 *
 * Expects a NUL-terminated buffer (asserted).  The first field is taken as
 * the command name and resolved with find_command(); the request is then
 * rejected if the socket lacks permission for the command or if the command
 * is not valid in the current context (journal replay / batch / client).
 * On success the command's handler is invoked with the remainder of the
 * buffer.  `sock` may be NULL: journal_replay() calls this with a NULL
 * socket.
 */
2227 static int handle_request (DISPATCH_PROTO) /* {{{ */
2229 char *buffer_ptr = buffer;
2230 char *cmd_str = NULL;
2231 command_t *cmd = NULL;
2234 assert (buffer[buffer_size - 1] == '\0');
2236 status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
2239 RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
2243 if (sock != NULL && sock->batch_start)
2246 cmd = find_command(cmd_str);
2248 return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
2250 if (!socket_permission_check (sock, cmd->cmd))
2251 return send_response(sock, RESP_ERR, "Permission denied.\n");
2253 if (!command_check_context(sock, cmd))
2254 return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
2256 return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
2257 } /* }}} int handle_request */
/*
 * Releases a journal set; the list of file names it holds is freed with
 * rrd_free_ptrs().
 */
2259 static void journal_set_free (journal_set *js) /* {{{ */
2264 rrd_free_ptrs((void ***) &js->files, &js->files_num);
2267 } /* }}} journal_set_free */
/*
 * Deletes every journal file listed in the set from disk (unlink), logging
 * each removal at debug level.  The set itself is not freed here.
 */
2269 static void journal_set_remove (journal_set *js) /* {{{ */
2274 for (uint i=0; i < js->files_num; i++)
2276 RRDD_LOG(LOG_DEBUG, "removing old journal %s", js->files[i]);
2277 unlink(js->files[i]);
2279 } /* }}} journal_set_remove */
2281 /* close current journal file handle.
2282 * MUST hold journal_lock before calling */
/* Does nothing when no journal is open; a failing fclose() is logged. */
2283 static void journal_close(void) /* {{{ */
2285 if (journal_fh != NULL)
2287 if (fclose(journal_fh) != 0)
2288 RRDD_LOG(LOG_ERR, "cannot close journal: %s", rrd_strerror(errno));
2293 } /* }}} journal_close */
/*
 * Starts a new journal file.
 *
 * The file is named "<journal_dir>/<JOURNAL_BASE>.<seconds>.<microseconds>"
 * using the current time, zero-padded so that names sort in strcmp() order.
 * It is opened for appending (created with mode 0644), wrapped in a stdio
 * stream stored in journal_fh, and its name is recorded in the current
 * journal set.  If it cannot be created, "JOURNALING DISABLED" messages
 * are logged and config_flush_at_shutdown is forced to 1 so that values are
 * flushed at shutdown.
 */
2295 /* MUST hold journal_lock before calling */
2296 static void journal_new_file(void) /* {{{ */
2300 char new_file[PATH_MAX + 1];
2302 assert(journal_dir != NULL);
2303 assert(journal_cur != NULL);
2307 gettimeofday(&now, NULL);
2308 /* this format assures that the files sort in strcmp() order */
2309 snprintf(new_file, PATH_MAX, "%s/%s.%010d.%06d",
2310 journal_dir, JOURNAL_BASE, (int)now.tv_sec, (int)now.tv_usec);
2312 new_fd = open(new_file, O_WRONLY|O_CREAT|O_APPEND,
2313 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
2317 journal_fh = fdopen(new_fd, "a");
2318 if (journal_fh == NULL)
2321 journal_size = ftell(journal_fh);
2322 RRDD_LOG(LOG_DEBUG, "started new journal %s", new_file);
2324 /* record the file in the journal set */
2325 rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, new_file);
2331 "JOURNALING DISABLED: Error while trying to create %s : %s",
2332 new_file, rrd_strerror(errno));
2334 "JOURNALING DISABLED: All values will be flushed at shutdown");
2337 config_flush_at_shutdown = 1;
2339 } /* }}} journal_new_file */
/*
 * Rotates the journal sets.
 *
 * Increments stats_journal_rotate (under stats_lock); then, under
 * journal_lock, the current set becomes the old set and a fresh, zeroed set
 * is allocated as the new current one.  The set that was "old" before the
 * rotation has its files unlinked and is freed after journal_lock has been
 * released.  The journal_dir == NULL test comes first, before any of this.
 */
2341 /* MUST NOT hold journal_lock before calling this */
2342 static void journal_rotate(void) /* {{{ */
2344 journal_set *old_js = NULL;
2346 if (journal_dir == NULL)
2349 RRDD_LOG(LOG_DEBUG, "rotating journals");
2351 pthread_mutex_lock(&stats_lock);
2352 ++stats_journal_rotate;
2353 pthread_mutex_unlock(&stats_lock);
2355 pthread_mutex_lock(&journal_lock);
2359 /* rotate the journal sets */
2360 old_js = journal_old;
2361 journal_old = journal_cur;
2362 journal_cur = calloc(1, sizeof(journal_set));
2364 if (journal_cur != NULL)
2367 RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2369 pthread_mutex_unlock(&journal_lock);
2371 journal_set_remove(old_js);
2372 journal_set_free (old_js);
2374 } /* }}} static void journal_rotate */
/*
 * Shuts journaling down.  When config_flush_at_shutdown is set, the files
 * of both the old and the current journal set are removed; otherwise a
 * message is logged that the journals will be used at the next startup.
 * Both sets are freed in either case.
 */
2376 /* MUST hold journal_lock when calling */
2377 static void journal_done(void) /* {{{ */
2379 if (journal_cur == NULL)
2384 if (config_flush_at_shutdown)
2386 RRDD_LOG(LOG_INFO, "removing journals");
2387 journal_set_remove(journal_old);
2388 journal_set_remove(journal_cur);
2392 RRDD_LOG(LOG_INFO, "expedited shutdown; "
2393 "journals will be used at next startup");
2396 journal_set_free(journal_cur);
2397 journal_set_free(journal_old);
2400 } /* }}} static void journal_done */
2402 static int journal_write(char *cmd, char *args) /* {{{ */
2406 if (journal_fh == NULL)
2409 pthread_mutex_lock(&journal_lock);
2410 chars = fprintf(journal_fh, "%s %s\n", cmd, args);
2411 journal_size += chars;
2413 if (journal_size > JOURNAL_MAX)
2416 pthread_mutex_unlock(&journal_lock);
2420 pthread_mutex_lock(&stats_lock);
2421 stats_journal_bytes += chars;
2422 pthread_mutex_unlock(&stats_lock);
2426 } /* }}} static int journal_write */
2428 static int journal_replay (const char *file) /* {{{ */
2434 char entry[RRD_CMD_MAX];
2437 if (file == NULL) return 0;
2440 char *reason = "unknown error";
2442 struct stat statbuf;
2444 memset(&statbuf, 0, sizeof(statbuf));
2445 if (stat(file, &statbuf) != 0)
2447 reason = "stat error";
2450 else if (!S_ISREG(statbuf.st_mode))
2452 reason = "not a regular file";
2455 if (statbuf.st_uid != daemon_uid)
2457 reason = "not owned by daemon user";
2460 if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
2462 reason = "must not be user/group writable";
2468 RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
2469 file, rrd_strerror(status), reason);
2474 fh = fopen(file, "r");
2477 if (errno != ENOENT)
2478 RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
2479 file, rrd_strerror(errno));
2483 RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
2492 if (fgets(entry, sizeof(entry), fh) == NULL)
2494 entry_len = strlen(entry);
2496 /* check \n termination in case journal writing crashed mid-line */
2499 else if (entry[entry_len - 1] != '\n')
2501 RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
2506 entry[entry_len - 1] = '\0';
2508 if (handle_request(NULL, now, entry, entry_len) == 0)
2516 RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
2517 entry_cnt, fail_cnt);
2519 return entry_cnt > 0 ? 1 : 0;
2520 } /* }}} static int journal_replay */
/*
 * qsort() comparison callback for an array of journal file names
 * (array of char *).  Each argument points at one array element; the names
 * are ordered with strcmp().
 */
static int journal_sort(const void *v1, const void *v2)
{
  /* keep the const qualifier of the callback arguments instead of
   * casting it away */
  const char *const *jn1 = v1;
  const char *const *jn2 = v2;

  return strcmp(*jn1, *jn2);
}
2530 static void journal_init(void) /* {{{ */
2532 int had_journal = 0;
2534 struct dirent *dent;
2535 char path[PATH_MAX+1];
2537 if (journal_dir == NULL) return;
2539 pthread_mutex_lock(&journal_lock);
2541 journal_cur = calloc(1, sizeof(journal_set));
2542 if (journal_cur == NULL)
2544 RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2548 RRDD_LOG(LOG_INFO, "checking for journal files");
2550 /* Handle old journal files during transition. This gives them the
2551 * correct sort order. TODO: remove after first release
2554 char old_path[PATH_MAX+1];
2555 snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".old" );
2556 snprintf(path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0000");
2557 rename(old_path, path);
2559 snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE );
2560 snprintf(path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0001");
2561 rename(old_path, path);
2564 dir = opendir(journal_dir);
2566 RRDD_LOG(LOG_CRIT, "journal_init: opendir(%s) failed\n", journal_dir);
2569 while ((dent = readdir(dir)) != NULL)
2571 /* looks like a journal file? */
2572 if (strncmp(dent->d_name, JOURNAL_BASE, strlen(JOURNAL_BASE)))
2575 snprintf(path, PATH_MAX, "%s/%s", journal_dir, dent->d_name);
2577 if (!rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, path))
2579 RRDD_LOG(LOG_CRIT, "journal_init: cannot add journal file %s!",
2586 qsort(journal_cur->files, journal_cur->files_num,
2587 sizeof(journal_cur->files[0]), journal_sort);
2589 for (uint i=0; i < journal_cur->files_num; i++)
2590 had_journal += journal_replay(journal_cur->files[i]);
2594 /* it must have been a crash. start a flush */
2595 if (had_journal && config_flush_at_shutdown)
2596 flush_old_values(-1);
2598 pthread_mutex_unlock(&journal_lock);
2600 RRDD_LOG(LOG_INFO, "journal processing complete");
2602 } /* }}} static void journal_init */
/*
 * Releases the buffers owned by a socket structure: the read buffer (rbuf)
 * and the write buffer (wbuf) are freed and their pointers reset to NULL.
 * `sock` must not be NULL (asserted).
 */
2604 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
2606 assert(sock != NULL);
2608 free(sock->rbuf); sock->rbuf = NULL;
2609 free(sock->wbuf); sock->wbuf = NULL;
2611 } /* }}} void free_listen_socket */
/*
 * Tears down a client connection; the socket structure's buffers are
 * released through free_listen_socket().
 */
2613 static void close_connection(listen_socket_t *sock) /* {{{ */
2621 free_listen_socket(sock);
2623 } /* }}} void close_connection */
/*
 * Thread entry point serving one client connection.
 *
 * `args` is the listen_socket_t of the accepted connection.  The thread
 * allocates the read buffer, registers itself in connection_threads_num
 * (with HAVE_LIBWRAP the hosts_access() check runs under the same
 * connection_threads_lock and may refuse the client), and then loops while
 * the daemon state is RUNNING: poll() with a 500 ms timeout, read() into
 * the read buffer, and pass every complete command returned by next_cmd()
 * to handle_request().  While a BATCH is active the batch start time is
 * used as the request time.
 *
 * On exit the connection is closed, the thread count is decremented and
 * connection_threads_done is broadcast once the count reaches zero.
 */
2625 static void *connection_thread_main (void *args) /* {{{ */
2627 listen_socket_t *sock;
2630 sock = (listen_socket_t *) args;
2633 /* init read buffers */
2634 sock->next_read = sock->next_cmd = 0;
2635 sock->rbuf = malloc(RBUF_SIZE);
2636 if (sock->rbuf == NULL)
2638 RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
2639 close_connection(sock);
2643 pthread_mutex_lock (&connection_threads_lock);
2645 /* LIBWRAP does not support multiple threads! By putting this code
2646 inside pthread_mutex_lock we do not have to worry about request_info
2647 getting overwritten by another thread.
2649 struct request_info req;
2650 request_init(&req, RQ_DAEMON, "rrdcached\0", RQ_FILE, fd, NULL );
2652 if(!hosts_access(&req)) {
2653 RRDD_LOG(LOG_INFO, "refused connection from %s", eval_client(&req));
2654 pthread_mutex_unlock (&connection_threads_lock);
2655 close_connection(sock);
2658 #endif /* HAVE_LIBWRAP */
2659 connection_threads_num++;
2660 pthread_mutex_unlock (&connection_threads_lock);
2662 while (state == RUNNING)
2669 struct pollfd pollfd;
2673 pollfd.events = POLLIN | POLLPRI;
2676 status = poll (&pollfd, 1, /* timeout = */ 500);
2677 if (state != RUNNING)
2679 else if (status == 0) /* timeout */
2681 else if (status < 0) /* error */
2684 if (status != EINTR)
2685 RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2689 if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2691 else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2693 RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2694 "poll(2) returned something unexpected: %#04hx",
2699 rbytes = read(fd, sock->rbuf + sock->next_read,
2700 RBUF_SIZE - sock->next_read);
2703 RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2706 else if (rbytes == 0)
2709 sock->next_read += rbytes;
2711 if (sock->batch_start)
2712 now = sock->batch_start;
2716 while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2718 status = handle_request (sock, now, cmd, cmd_len+1);
2725 close_connection(sock);
2727 /* Remove this thread from the connection threads list */
2728 pthread_mutex_lock (&connection_threads_lock);
2729 connection_threads_num--;
2730 if (connection_threads_num <= 0)
2731 pthread_cond_broadcast(&connection_threads_done);
2732 pthread_mutex_unlock (&connection_threads_lock);
2735 } /* }}} void *connection_thread_main */
/*
 * Creates a listening UNIX domain socket described by `sock` and appends it
 * to the global listen_fds array.
 *
 * An optional "unix:" prefix is stripped from the path.  The directory that
 * will contain the socket is created with rrd_mkdir_p() (mode 0777), the
 * listen_fds array is grown by one element and initialized as a copy of
 * `sock`, and the socket is created, bound and put into listening state
 * (backlog 10).  If a socket group is configured the socket file is
 * chown'ed to it and made rwx for the owner and rw for the group; if
 * explicit socket permissions are configured they are applied with chmod().
 * Errors are reported on stderr.
 */
2737 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2740 struct sockaddr_un sa;
2741 listen_socket_t *temp;
2744 char *path_copy, *dir;
2747 if (strncmp(path, "unix:", strlen("unix:")) == 0)
2748 path += strlen("unix:");
2750 /* dirname may modify its argument */
2751 path_copy = strdup(path);
2752 if (path_copy == NULL)
2754 fprintf(stderr, "rrdcached: strdup(): %s\n",
2755 rrd_strerror(errno));
2759 dir = dirname(path_copy);
2760 if (rrd_mkdir_p(dir, 0777) != 0)
2762 fprintf(stderr, "Failed to create socket directory '%s': %s\n",
2763 dir, rrd_strerror(errno));
2769 temp = (listen_socket_t *) rrd_realloc (listen_fds,
2770 sizeof (listen_fds[0]) * (listen_fds_num + 1));
2773 fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2777 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2779 fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2782 fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2783 rrd_strerror(errno));
2787 memset (&sa, 0, sizeof (sa));
2788 sa.sun_family = AF_UNIX;
2789 strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2791 /* if we've gotten this far, we own the pid file. any daemon started
2792 * with the same args must not be alive. therefore, ensure that we can
2793 * create the socket...
2797 status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2800 fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2801 path, rrd_strerror(errno));
2806 /* tweak the sockets group ownership */
2807 if (sock->socket_group != (gid_t)-1)
2809 if ( (chown(path, getuid(), sock->socket_group) != 0) ||
2810 (chmod(path, (S_IRUSR|S_IWUSR|S_IXUSR | S_IRGRP|S_IWGRP)) != 0) )
2812 fprintf(stderr, "rrdcached: failed to set socket group permissions (%s)\n", strerror(errno));
2816 if (sock->socket_permissions != (mode_t)-1)
2818 if (chmod(path, sock->socket_permissions) != 0)
2819 fprintf(stderr, "rrdcached: failed to set socket file permissions (%o): %s\n",
2820 (unsigned int)sock->socket_permissions, strerror(errno));
2823 status = listen (fd, /* backlog = */ 10);
2826 fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2827 path, rrd_strerror(errno));
2833 listen_fds[listen_fds_num].fd = fd;
2834 listen_fds[listen_fds_num].family = PF_UNIX;
2835 strncpy(listen_fds[listen_fds_num].addr, path,
2836 sizeof (listen_fds[listen_fds_num].addr) - 1);
2840 } /* }}} int open_listen_socket_unix */
/*
 * Creates listening TCP sockets for the network address in sock->addr and
 * appends them to the global listen_fds array.
 *
 * The address is copied into a local buffer and split into host and port:
 * the bracketed form "[addr]:port" is handled separately, otherwise the
 * port is searched for after the last ':'.  RRDCACHED_DEFAULT_PORT is used
 * when no port was given.  The host is resolved with getaddrinfo()
 * (AF_UNSPEC, SOCK_STREAM, AI_ADDRCONFIG where available), and for each
 * returned address a socket is created with SO_REUSEADDR set, bound and put
 * into listening state (backlog 10); each such socket is recorded in a new
 * listen_fds element initialized as a copy of `sock`.  Errors are reported
 * on stderr.
 */
2842 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2844 struct addrinfo ai_hints;
2845 struct addrinfo *ai_res;
2846 struct addrinfo *ai_ptr;
2847 char addr_copy[NI_MAXHOST];
2852 strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
2853 addr_copy[sizeof (addr_copy) - 1] = 0;
2856 memset (&ai_hints, 0, sizeof (ai_hints));
2857 ai_hints.ai_flags = 0;
2858 #ifdef AI_ADDRCONFIG
2859 ai_hints.ai_flags |= AI_ADDRCONFIG;
2861 ai_hints.ai_family = AF_UNSPEC;
2862 ai_hints.ai_socktype = SOCK_STREAM;
2865 if (*addr == '[') /* IPv6+port format */
2867 /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2870 port = strchr (addr, ']');
2873 fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
2881 else if (*port == 0)
2885 fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
2888 } /* if (*addr == '[') */
2891 port = rindex(addr, ':');
2899 status = getaddrinfo (addr,
2900 port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2901 &ai_hints, &ai_res);
2904 fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
2905 addr, gai_strerror (status));
2909 for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2912 listen_socket_t *temp;
2915 temp = (listen_socket_t *) rrd_realloc (listen_fds,
2916 sizeof (listen_fds[0]) * (listen_fds_num + 1));
2920 "rrdcached: open_listen_socket_network: realloc failed.\n");
2924 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2926 fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2929 fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
2930 rrd_strerror(errno));
2934 setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2936 status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2939 fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2940 sock->addr, rrd_strerror(errno));
2945 status = listen (fd, /* backlog = */ 10);
2948 fprintf (stderr, "rrdcached: listen(%s) failed: %s\n.",
2949 sock->addr, rrd_strerror(errno));
2951 freeaddrinfo(ai_res);
2955 listen_fds[listen_fds_num].fd = fd;
2956 listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2958 } /* for (ai_ptr) */
2960 freeaddrinfo(ai_res);
2962 } /* }}} static int open_listen_socket_network */
/*
 * Opens the listening socket described by `sock`.  Addresses that start
 * with "unix:" or with '/' are treated as UNIX domain socket paths;
 * everything else is handled as a network address.  Returns the result of
 * the selected open function.
 */
2964 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2966 assert(sock != NULL);
2967 assert(sock->addr != NULL);
2969 if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2970 || sock->addr[0] == '/')
2971 return (open_listen_socket_unix(sock));
2973 return (open_listen_socket_network(sock));
2974 } /* }}} int open_listen_socket */
/*
 * Closes every descriptor in listen_fds; for UNIX domain sockets the socket
 * file is additionally removed from the file system.
 */
2976 static int close_listen_sockets (void) /* {{{ */
2980 for (i = 0; i < listen_fds_num; i++)
2982 close (listen_fds[i].fd);
2984 if (listen_fds[i].family == PF_UNIX)
2985 unlink(listen_fds[i].addr);
2993 } /* }}} int close_listen_sockets */
/*
 * Accept loop for incoming client connections.
 *
 * While state == RUNNING, every descriptor in listen_fds is polled for
 * POLLIN / POLLPRI. For each ready descriptor a listen_socket_t is
 * allocated, initialised as a copy of the matching listen_fds entry, given
 * the descriptor returned by accept(), and a detached thread running
 * connection_thread_main is created (presumably with that structure as its
 * argument -- TODO confirm).
 *
 * Once the loop ends, the listen sockets are closed and the function waits
 * on connection_threads_done until connection_threads_num is no longer
 * positive.
 *
 * The args parameter is marked UNUSED.
 */
2995 static void *listen_thread_main (void UNUSED(*args)) /* {{{ */
2997 struct pollfd *pollfds;
/* Nothing to poll without at least one listen socket. */
3002 if (listen_fds_num < 1)
3004 RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
/* One pollfd slot per listen socket. */
3008 pollfds_num = listen_fds_num;
3009 pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
3010 if (pollfds == NULL)
3012 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
3015 memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
3017 RRDD_LOG(LOG_INFO, "listening for connections");
3019 while (state == RUNNING)
/* The poll set is rebuilt on every iteration; revents is reset explicitly. */
3021 for (i = 0; i < pollfds_num; i++)
3023 pollfds[i].fd = listen_fds[i].fd;
3024 pollfds[i].events = POLLIN | POLLPRI;
3025 pollfds[i].revents = 0;
/* poll(2) takes its timeout in milliseconds, so state is re-checked at
 * least about once per second even when no client connects. */
3028 status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
3029 if (state != RUNNING)
3031 else if (status == 0) /* timeout */
3033 else if (status < 0) /* error */
/* NOTE(review): poll(2) reports EINTR through errno, not through its return
 * value -- confirm that status holds errno at this point. */
3036 if (status != EINTR)
3038 RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
3043 for (i = 0; i < pollfds_num; i++)
3045 listen_socket_t *client_sock;
3046 struct sockaddr_storage client_sa;
3047 socklen_t client_sa_size;
3049 pthread_attr_t attr;
/* No event was reported for this descriptor. */
3051 if (pollfds[i].revents == 0)
/* Something was reported, but none of the requested events. */
3054 if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
3056 RRDD_LOG (LOG_ERR, "listen_thread_main: "
3057 "poll(2) returned something unexpected for listen FD #%i.",
3062 client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
3063 if (client_sock == NULL)
3065 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
/* Start from a copy of the listen socket entry; the fd member is replaced
 * by the accepted descriptor right below. */
3068 memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
3070 client_sa_size = sizeof (client_sa);
3071 client_sock->fd = accept (pollfds[i].fd,
3072 (struct sockaddr *) &client_sa, &client_sa_size);
3073 if (client_sock->fd < 0)
3075 RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
/* Connection threads are created detached. When pthread_create fails,
 * close_connection() is called on client_sock. */
3080 pthread_attr_init (&attr);
3081 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
3083 status = pthread_create (&tid, &attr, connection_thread_main,
3087 RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
3088 close_connection(client_sock);
3091 } /* for (pollfds_num) */
3092 } /* while (state == RUNNING) */
3094 RRDD_LOG(LOG_INFO, "starting shutdown");
3096 close_listen_sockets ();
/* Block until connection_threads_num is no longer positive; the condition
 * variable is waited on with connection_threads_lock held. */
3098 pthread_mutex_lock (&connection_threads_lock);
3099 while (connection_threads_num > 0)
3100 pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
3101 pthread_mutex_unlock (&connection_threads_lock);
3106 } /* }}} void *listen_thread_main */
/*
 * Prepare the process for service. In the order visible here:
 *   - record the effective uid in daemon_uid and obtain pid_fd,
 *   - open the listen sockets and fail when none could be opened,
 *   - unless stay_foreground is set, go through the fork / session leader /
 *     dev-null steps,
 *   - chdir into base_dir, install the signal handlers, open syslog and
 *     create cache_tree.
 * On the visible success path the result of write_pidfile(pid_fd) is
 * returned.
 */
3108 static int daemonize (void) /* {{{ */
3113 daemon_uid = geteuid();
/* O_CREAT|O_EXCL: the open is requested to fail when the file already
 * exists. */
3115 pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
3117 pid_fd = check_pidfile();
3121 /* open all the listen sockets */
3122 if (config_listen_address_list_len > 0)
3124 for (size_t i = 0; i < config_listen_address_list_len; i++)
3125 open_listen_socket (config_listen_address_list[i]);
/* The configured address list is released once the sockets were opened. */
3127 rrd_free_ptrs((void ***) &config_listen_address_list,
3128 &config_listen_address_list_len);
/* Default socket at RRDCACHED_DEFAULT_ADDRESS (presumably only when no
 * listen address was configured -- TODO confirm). strncpy does not
 * terminate the destination on truncation, hence the explicit NUL. */
3132 strncpy(default_socket.addr, RRDCACHED_DEFAULT_ADDRESS,
3133 sizeof(default_socket.addr) - 1);
3134 default_socket.addr[sizeof(default_socket.addr) - 1] = '\0';
/* A permissions value of 0 is treated as: allow everything. */
3136 if (default_socket.permissions == 0)
3137 socket_permission_set_all (&default_socket);
3139 open_listen_socket (&default_socket);
/* The return values of open_listen_socket() are not checked above; the
 * number of opened sockets is checked here instead. */
3142 if (listen_fds_num < 1)
3144 fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
3148 if (!stay_foreground)
3155 fprintf (stderr, "daemonize: fork(2) failed.\n");
3161 /* Become session leader */
3164 /* Open the first three file descriptors to /dev/null */
3169 open ("/dev/null", O_RDWR);
/* Descriptor 0 is duplicated twice; a failure of either dup() is logged. */
3170 if (dup(0) == -1 || dup(0) == -1){
3171 RRDD_LOG (LOG_ERR, "faild to run dup.\n");
3173 } /* if (!stay_foreground) */
3175 /* Change into the /tmp directory. */
/* NOTE(review): base_dir depends on config_base_dir, so the comment above
 * presumably describes only the fallback -- confirm. */
3176 base_dir = (config_base_dir != NULL)
3180 if (chdir (base_dir) != 0)
3182 fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
3186 install_signal_handlers();
3188 openlog ("rrdcached", LOG_PID, LOG_DAEMON);
3189 RRDD_LOG(LOG_INFO, "starting up");
/* Keys are compared with strcmp; no key destroy function is registered
 * (NULL), values are released through free_cache_item. */
3191 cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
3192 (GDestroyNotify) free_cache_item);
3193 if (cache_tree == NULL)
3195 RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
3199 return write_pidfile (pid_fd);
3204 } /* }}} int daemonize */
/*
 * Shut the daemon down: wake and join the flush thread and all queue
 * threads, then release the queue thread array, the base directory string,
 * the cache tree and the pid file string.
 */
3206 static int cleanup (void) /* {{{ */
/* Wake any waiter on flush_cond, then wait for the flush thread to end. */
3208 pthread_cond_broadcast (&flush_cond);
3209 pthread_join (flush_thread, NULL);
/* Same for the queue threads: wake waiters on queue_cond, then join each
 * of the config_queue_threads threads. */
3211 pthread_cond_broadcast (&queue_cond);
3212 for (int i = 0; i < config_queue_threads; i++)
3213 pthread_join (queue_threads[i], NULL);
/* With flush-at-shutdown enabled the queue is expected to be empty once
 * the threads above have been joined. */
3215 if (config_flush_at_shutdown)
3217 assert(cache_queue_head == NULL);
3218 RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
3221 free(queue_threads);
3222 free(config_base_dir);
/* cache_lock is taken before the tree is destroyed. */
3224 pthread_mutex_lock(&cache_lock);
3225 g_tree_destroy(cache_tree);
3227 pthread_mutex_lock(&journal_lock);
3230 RRDD_LOG(LOG_INFO, "goodbye");
3234 free(config_pid_file);
3237 } /* }}} int cleanup */
/*
 * Parse the command line with getopt() and store the results in the
 * config_* variables, in default_socket and in config_listen_address_list.
 *
 * Before parsing, default_socket gets its permissions cleared and its
 * socket_group / socket_permissions set to -1 (cast to gid_t / mode_t).
 * After parsing, warnings are printed for questionable combinations of
 * values, and flushing at shutdown is forced when journal_dir is NULL.
 *
 * NOTE(review): the option letters named in the comments below are taken
 * from the usage text and the error messages of this function; confirm them
 * against the case labels.
 */
3239 static int read_options (int argc, char **argv) /* {{{ */
3244 socket_permission_clear (&default_socket);
3246 default_socket.socket_group = (gid_t)-1;
3247 default_socket.socket_permissions = (mode_t)-1;
3249 while ((option = getopt(argc, argv, "Ogl:s:m:P:f:w:z:t:Bb:p:Fj:a:h?")) != -1)
/* Presumably -O: CREATE may not overwrite existing files (see usage text). */
3254 opt_no_overwrite = 1;
/* Presumably -l: register one more listen address. The new entry is
 * heap-allocated and appended to config_listen_address_list. */
3263 listen_socket_t *new;
3265 new = malloc(sizeof(listen_socket_t));
3268 fprintf(stderr, "read_options: malloc failed.\n");
3271 memset(new, 0, sizeof(listen_socket_t));
/* The structure was zeroed above and at most sizeof(addr)-1 bytes are
 * copied, so addr stays NUL-terminated even when optarg is truncated. */
3273 strncpy(new->addr, optarg, sizeof(new->addr)-1);
3275 /* Add permissions to the socket {{{ */
3276 if (default_socket.permissions != 0)
3278 socket_permission_copy (new, &default_socket);
3280 else /* if (default_socket.permissions == 0) */
3282 /* Add permission for ALL commands to the socket. */
3283 socket_permission_set_all (new);
3285 /* }}} Done adding permissions. */
/* Group and file mode are taken from the values currently held in
 * default_socket. */
3287 new->socket_group = default_socket.socket_group;
3288 new->socket_permissions = default_socket.socket_permissions;
3290 if (!rrd_add_ptr((void ***)&config_listen_address_list,
3291 &config_listen_address_list_len, new))
3293 fprintf(stderr, "read_options: rrd_add_ptr failed.\n");
3299 /* set socket group permissions */
/* The argument is first tried as a numeric gid (getgrgid); getgrnam is the
 * lookup by name.
 * NOTE(review): errno is inspected after strtoul -- confirm it is reset
 * before the call. */
3305 group_gid = strtoul(optarg, NULL, 10);
3306 if (errno != EINVAL && group_gid>0)
3308 /* we were passed a number */
3309 grp = getgrgid(group_gid);
3313 grp = getgrnam(optarg);
3318 default_socket.socket_group = grp->gr_gid;
3322 /* no idea what the user wanted... */
3323 fprintf (stderr, "read_options: couldn't map \"%s\" to a group, Sorry\n", optarg);
3329 /* set socket file permissions */
3333 char *endptr = NULL;
/* The mode is parsed as octal. It is rejected unless the whole argument was
 * consumed and the value lies within 0..07777. */
3335 tmp = strtol (optarg, &endptr, 8);
3336 if ((endptr == optarg) || (! endptr) || (*endptr != '\0')
3337 || (tmp > 07777) || (tmp < 0)) {
3338 fprintf (stderr, "read_options: Invalid file mode \"%s\".\n",
3343 default_socket.socket_permissions = (mode_t)tmp;
/* Presumably -P: the permission set of default_socket is cleared and then
 * rebuilt from the tokens of the argument, split on commas and spaces. */
3354 socket_permission_clear (&default_socket);
3356 optcopy = strdup (optarg);
3359 while ((ptr = strtok_r (dummy, ", ", &saveptr)) != NULL)
3362 status = socket_permission_add (&default_socket, ptr);
3365 fprintf (stderr, "read_options: Adding permission \"%s\" to "
3366 "socket failed. Most likely, this permission doesn't "
3367 "exist. Check your command line.\n", ptr);
/* Flush interval (usage text: -f). atoi() cannot report conversion errors;
 * this holds for the other atoi() based options below as well. */
3380 temp = atoi (optarg);
3382 config_flush_interval = temp;
3385 fprintf (stderr, "Invalid flush interval: %s\n", optarg);
/* Write interval (usage text: -w). */
3395 temp = atoi (optarg);
3397 config_write_interval = temp;
3400 fprintf (stderr, "Invalid write interval: %s\n", optarg);
/* Write jitter (-z, per the error message). */
3410 temp = atoi(optarg);
3412 config_write_jitter = temp;
3415 fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
/* Number of queue threads (-t, per the error message). */
3425 threads = atoi(optarg);
3427 config_queue_threads = threads;
3430 fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
/* Presumably -B: restrict file access to the base directory (usage text). */
3437 config_write_base_only = 1;
/* Base directory (-b, per the message further down): any previous value is
 * replaced by a copy of optarg, the directory is created when missing, and
 * the path is then compared against its canonical form. */
3443 char base_realpath[PATH_MAX];
3445 if (config_base_dir != NULL)
3446 free (config_base_dir);
3447 config_base_dir = strdup (optarg);
3448 if (config_base_dir == NULL)
3450 fprintf (stderr, "read_options: strdup failed.\n");
3454 if (rrd_mkdir_p (config_base_dir, 0777) != 0)
3456 fprintf (stderr, "Failed to create base directory '%s': %s\n",
3457 config_base_dir, rrd_strerror (errno));
3461 /* make sure that the base directory is not resolved via
3462 * symbolic links. this makes some performance-enhancing
3463 * assumptions possible (we don't have to resolve paths
3464 * that start with a "/")
3466 if (realpath(config_base_dir, base_realpath) == NULL)
3468 fprintf (stderr, "Failed to canonicalize the base directory '%s': "
3469 "%s\n", config_base_dir, rrd_strerror(errno));
// Strip trailing slashes from the configured path.
3473 len = strlen (config_base_dir);
3474 while ((len > 0) && (config_base_dir[len - 1] == '/'))
3476 config_base_dir[len - 1] = 0;
3482 fprintf (stderr, "Invalid base directory: %s\n", optarg);
3486 _config_base_dir_len = len;
// Strip trailing slashes from the canonical path as well.
3488 len = strlen (base_realpath);
3489 while ((len > 0) && (base_realpath[len - 1] == '/'))
3491 base_realpath[len - 1] = '\0';
// A difference between the two paths is reported as resolution via links.
3495 if (strncmp(config_base_dir,
3496 base_realpath, sizeof(base_realpath)) != 0)
3499 "Base directory (-b) resolved via file system links!\n"
3500 "Please consult rrdcached '-b' documentation!\n"
3501 "Consider specifying the real directory (%s)\n",
// PID file location (usage text: -p); any previous value is replaced.
3510 if (config_pid_file != NULL)
3511 free (config_pid_file);
3512 config_pid_file = strdup (optarg);
3513 if (config_pid_file == NULL)
3515 fprintf (stderr, "read_options: strdup failed.\n");
// Presumably -F: always flush all updates at shutdown (usage text).
3522 config_flush_at_shutdown = 1;
// Journal directory (-j, per the error message below). The path is
// canonicalized into a block-local buffer first.
3527 char journal_dir_actual[PATH_MAX];
3528 journal_dir = realpath((const char *)optarg, journal_dir_actual);
3531 // if we were able to properly resolve the path, lets have a copy
3532 // for use outside this block.
// NOTE(review): the strdup result is used without a visible NULL check.
3533 journal_dir = strdup(journal_dir);
3534 status = rrd_mkdir_p(journal_dir, 0777);
3537 fprintf(stderr, "Failed to create journal directory '%s': %s\n",
3538 journal_dir, rrd_strerror(errno));
// The directory must be readable, writable and searchable.
3541 if (access(journal_dir, R_OK|W_OK|X_OK) != 0)
3543 fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
3544 errno ? rrd_strerror(errno) : "");
3548 fprintf(stderr, "Unable to resolve journal path (%s,%s)\n", optarg,
3549 errno ? rrd_strerror(errno) : "");
// Allocation chunk size (usage text: -a).
3557 int temp = atoi(optarg);
3559 config_alloc_chunk = temp;
3562 fprintf(stderr, "Invalid allocation size: %s\n", optarg);
// Usage text.
3570 printf ("RRDCacheD %s\n"
3571 "Copyright (C) 2008,2009 Florian octo Forster and Kevin Brintnall\n"
3573 "Usage: rrdcached [options]\n"
3575 "Valid options are:\n"
3576 " -l <address> Socket address to listen to.\n"
3577 " Default: "RRDCACHED_DEFAULT_ADDRESS"\n"
3578 " -P <perms> Sets the permissions to assign to all following "
3580 " -w <seconds> Interval in which to write data.\n"
3581 " -z <delay> Delay writes up to <delay> seconds to spread load\n"
3582 " -t <threads> Number of write threads.\n"
3583 " -f <seconds> Interval in which to flush dead data.\n"
3584 " -p <file> Location of the PID-file.\n"
3585 " -b <dir> Base directory to change to.\n"
3586 " -B Restrict file access to paths within -b <dir>\n"
3587 " -g Do not fork and run in the foreground.\n"
3588 " -j <dir> Directory in which to create the journal files.\n"
3589 " -F Always flush all updates at shutdown\n"
3590 " -s <id|name> Group owner of all following UNIX sockets\n"
3591 " (the socket will also have read/write permissions "
3593 " -m <mode> File permissions (octal) of all following UNIX "
3595 " -a <size> Memory allocation chunk size. Default is 1.\n"
3596 " -O Do not allow CREATE commands to overwrite existing\n"
3597 " files, even if asked to.\n"
3599 "For more information and a detailed description of all options "
3601 "to the rrdcached(1) manual page.\n",
3608 } /* switch (option) */
3609 } /* while (getopt) */
3611 /* advise the user when values are not sane */
3612 if (config_flush_interval < 2 * config_write_interval)
3613 fprintf(stderr, "WARNING: flush interval (-f) should be at least"
3614 " 2x write interval (-w) !\n");
3615 if (config_write_jitter > config_write_interval)
3616 fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
3617 " write interval (-w) !\n");
3619 if (config_write_base_only && config_base_dir == NULL)
3620 fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
3621 " Consult the rrdcached documentation\n");
/* Without a journal directory, flushing at shutdown is always enabled. */
3623 if (journal_dir == NULL)
3624 config_flush_at_shutdown = 1;
3627 } /* }}} int read_options */
/*
 * Program entry point: parse the options, run daemonize(), start
 * config_queue_threads queue threads and one flush thread, then run the
 * listen loop by calling listen_thread_main() directly on the calling
 * thread.
 */
3629 int main (int argc, char **argv)
3633 status = read_options (argc, argv);
3641 status = daemonize ();
3644 fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
3650 /* start the queue threads */
3651 queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
3652 if (queue_threads == NULL)
3654 RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
3658 for (int i = 0; i < config_queue_threads; i++)
/* calloc() already zero-filled the array, so this memset is redundant but
 * harmless. */
3660 memset (&queue_threads[i], 0, sizeof (*queue_threads));
3661 status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
3664 RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
3670 /* start the flush thread */
3671 memset(&flush_thread, 0, sizeof(flush_thread));
3672 status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
3675 RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
/* The listen loop is a plain function call here, not a separate thread. */
3680 listen_thread_main (NULL);
3687 * vim: set sw=2 sts=2 ts=8 et fdm=marker :