2 * RRDTool - src/rrd_daemon.c
3 * Copyright (C) 2008-2010 Florian octo Forster
4 * Copyright (C) 2008,2009 Kevin Brintnall
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; only version 2 of the License is applicable.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
20 * Florian octo Forster <octo at verplant.org>
21 * kevin brintnall <kbrint@rufus.net>
26 * First tell the compiler to stick to the C99 and POSIX standards as close as
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
46 #define _XOPEN_SOURCE 500
63 * Now for some includes..
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
70 #include "../rrd_config.h"
75 #include "rrd_client.h"
87 #include <sys/socket.h>
95 #include <sys/types.h>
107 #include <sys/time.h>
112 #include <glib-2.0/glib.h>
/* Log a printf-style message. When stay_foreground is set the message is also
 * printed to stderr followed by a newline; it is passed to syslog() with the
 * given severity in either case. The arguments are expanded twice, so they
 * should be free of side effects. */
115 #define RRDD_LOG(severity, ...) \
117 if (stay_foreground) { \
118 fprintf(stderr, __VA_ARGS__); \
119 fprintf(stderr, "\n"); } \
120 syslog ((severity), __VA_ARGS__); \
/* Status codes handed to send_response(). */
126 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
/* Socket state: address, BATCH bookkeeping and permission settings are the
 * members visible here. */
128 struct listen_socket_s
131 char addr[PATH_MAX + 1];
134 /* state for BATCH processing */
146 uint32_t permissions;
149 mode_t socket_permissions;
151 typedef struct listen_socket_s listen_socket_t;
154 typedef struct command_s command_t;
155 /* note: guard against "unused" warnings in the handlers */
156 #define DISPATCH_PROTO listen_socket_t UNUSED(*sock),\
158 char UNUSED(*buffer),\
159 size_t UNUSED(buffer_size)
161 #define HANDLER_PROTO command_t UNUSED(*cmd),\
166 int (*handler)(HANDLER_PROTO);
168 char context; /* where we expect to see it */
/* bit values for the context member above */
169 #define CMD_CONTEXT_CLIENT (1<<0)
170 #define CMD_CONTEXT_BATCH (1<<1)
171 #define CMD_CONTEXT_JOURNAL (1<<2)
172 #define CMD_CONTEXT_ANY (0x7f)
179 typedef struct cache_item_s cache_item_t;
184 size_t values_num; /* number of valid pointers */
185 size_t values_alloc; /* number of allocated pointers */
186 time_t last_flush_time;
187 time_t last_update_stamp;
/* bit values tested against the flags member of a cache item */
188 #define CI_FLAGS_IN_TREE (1<<0)
189 #define CI_FLAGS_IN_QUEUE (1<<1)
191 pthread_cond_t flushed;
196 struct callback_flush_data_s
203 typedef struct callback_flush_data_s callback_flush_data_t;
210 typedef enum queue_side_e queue_side_t;
212 /* describe a set of journal files */
218 /* max length of socket command or response */
220 #define RBUF_SIZE (CMD_MAX*2)
/* File-scope daemon state and configuration. */
225 static int stay_foreground = 0;
226 static uid_t daemon_uid;
228 static listen_socket_t *listen_fds = NULL;
229 static size_t listen_fds_num = 0;
231 static listen_socket_t default_socket;
234 RUNNING, /* normal operation */
235 FLUSHING, /* flushing remaining values */
236 SHUTDOWN /* shutting down */
239 static pthread_t *queue_threads;
240 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
241 static int config_queue_threads = 4;
243 static pthread_t flush_thread;
244 static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;
246 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
247 static pthread_cond_t connection_threads_done = PTHREAD_COND_INITIALIZER;
248 static int connection_threads_num = 0;
/* Cache of pending updates: a tree looked up by file name plus a doubly
 * linked flush queue; the queue helpers require cache_lock to be held. */
251 static GTree *cache_tree = NULL;
252 static cache_item_t *cache_queue_head = NULL;
253 static cache_item_t *cache_queue_tail = NULL;
254 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
256 static int config_write_interval = 300;
257 static int config_write_jitter = 0;
258 static int config_flush_interval = 3600;
259 static int config_flush_at_shutdown = 0;
260 static char *config_pid_file = NULL;
261 static char *config_base_dir = NULL;
262 static size_t _config_base_dir_len = 0;
263 static int config_write_base_only = 0;
264 static size_t config_alloc_chunk = 1;
266 static listen_socket_t **config_listen_address_list = NULL;
267 static size_t config_listen_address_list_len = 0;
/* Counters reported by the STATS handler; read and updated under stats_lock. */
269 static uint64_t stats_queue_length = 0;
270 static uint64_t stats_updates_received = 0;
271 static uint64_t stats_flush_received = 0;
272 static uint64_t stats_updates_written = 0;
273 static uint64_t stats_data_sets_written = 0;
274 static uint64_t stats_journal_bytes = 0;
275 static uint64_t stats_journal_rotate = 0;
276 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
278 static int opt_no_overwrite = 0; /* default for the daemon */
280 /* Journaled updates */
/* A NULL socket marks a command that is being replayed from the journal. */
281 #define JOURNAL_REPLAY(s) ((s) == NULL)
282 #define JOURNAL_BASE "rrd.journal"
283 static journal_set *journal_cur = NULL;
284 static journal_set *journal_old = NULL;
285 static char *journal_dir = NULL;
286 static FILE *journal_fh = NULL; /* current journal file handle */
287 static long journal_size = 0; /* current journal size */
/* 1 GiB */
288 #define JOURNAL_MAX (1 * 1024 * 1024 * 1024)
289 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
290 static int journal_write(char *cmd, char *args);
291 static void journal_done(void);
292 static void journal_rotate(void);
294 /* prototypes for forward references */
295 static int handle_request_help (HANDLER_PROTO);
/* Shared signal handling: logs the name of the caught signal and wakes every
 * thread blocked on flush_cond or queue_cond.
 * NOTE(review): if this runs in signal-handler context, RRDD_LOG (stdio and
 * syslog) and pthread_cond_broadcast are not async-signal-safe -- confirm. */
300 static void sig_common (const char *sig) /* {{{ */
302 RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
304 pthread_cond_broadcast(&flush_cond);
305 pthread_cond_broadcast(&queue_cond);
306 } /* }}} void sig_common */
/* Handler registered for SIGINT by install_signal_handlers; the signal number
 * argument is unused. */
308 static void sig_int_handler (int UNUSED(s)) /* {{{ */
311 } /* }}} void sig_int_handler */
/* Handler registered for SIGTERM by install_signal_handlers; the signal number
 * argument is unused. */
313 static void sig_term_handler (int UNUSED(s)) /* {{{ */
316 } /* }}} void sig_term_handler */
/* Handler registered for SIGUSR1: switches config_flush_at_shutdown on, so
 * queued values are written out before the daemon stops. */
318 static void sig_usr1_handler (int UNUSED(s)) /* {{{ */
320 config_flush_at_shutdown = 1;
322 } /* }}} void sig_usr1_handler */
/* Handler registered for SIGUSR2: switches config_flush_at_shutdown off. */
324 static void sig_usr2_handler (int UNUSED(s)) /* {{{ */
326 config_flush_at_shutdown = 0;
328 } /* }}} void sig_usr2_handler */
330 static void install_signal_handlers(void) /* {{{ */
332 /* These structures are static, because `sigaction' behaves weird if the are
334 static struct sigaction sa_int;
335 static struct sigaction sa_term;
336 static struct sigaction sa_pipe;
337 static struct sigaction sa_usr1;
338 static struct sigaction sa_usr2;
340 /* Install signal handlers */
341 memset (&sa_int, 0, sizeof (sa_int));
342 sa_int.sa_handler = sig_int_handler;
343 sigaction (SIGINT, &sa_int, NULL);
345 memset (&sa_term, 0, sizeof (sa_term));
346 sa_term.sa_handler = sig_term_handler;
347 sigaction (SIGTERM, &sa_term, NULL);
349 memset (&sa_pipe, 0, sizeof (sa_pipe));
350 sa_pipe.sa_handler = SIG_IGN;
351 sigaction (SIGPIPE, &sa_pipe, NULL);
353 memset (&sa_pipe, 0, sizeof (sa_usr1));
354 sa_usr1.sa_handler = sig_usr1_handler;
355 sigaction (SIGUSR1, &sa_usr1, NULL);
357 memset (&sa_usr2, 0, sizeof (sa_usr2));
358 sa_usr2.sa_handler = sig_usr2_handler;
359 sigaction (SIGUSR2, &sa_usr2, NULL);
361 } /* }}} void install_signal_handlers */
/* Opens the PID file with the given open(2) flags. The path is
 * config_pid_file when set, otherwise the rrdcached.pid default under
 * LOCALSTATEDIR. The parent directory is created first with rrd_mkdir_p
 * (mode 0777), working on a strdup copy because dirname may modify its
 * argument. A newly created file gets mode 0644. The action string appears in
 * the message printed when open fails. */
363 static int open_pidfile(char *action, int oflag) /* {{{ */
367 char *file_copy, *dir;
369 file = (config_pid_file != NULL)
371 : LOCALSTATEDIR "/run/rrdcached.pid";
373 /* dirname may modify its argument */
374 file_copy = strdup(file);
375 if (file_copy == NULL)
377 fprintf(stderr, "rrdcached: strdup(): %s\n",
378 rrd_strerror(errno));
382 dir = dirname(file_copy);
383 if (rrd_mkdir_p(dir, 0777) != 0)
385 fprintf(stderr, "Failed to create pidfile directory '%s': %s\n",
386 dir, rrd_strerror(errno));
392 fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
394 fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
395 action, file, rrd_strerror(errno));
398 } /* }}} static int open_pidfile */
/* Opens the existing PID file read/write, reads the recorded PID and probes
 * that process with kill(pid, 0). A live process other than ourselves is
 * reported as a possibly competing daemon. Otherwise the file is rewound and
 * truncated so that it can be reused, and a notice about the stale file is
 * printed. */
400 /* check existing pid file to see whether a daemon is running */
401 static int check_pidfile(void)
407 pid_fd = open_pidfile("open", O_RDWR);
411 if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0)
418 /* another running process that we can signal COULD be
419 * a competing rrdcached */
420 if (pid != getpid() && kill(pid, 0) == 0)
423 "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid);
428 lseek(pid_fd, 0, SEEK_SET);
429 if (ftruncate(pid_fd, 0) == -1)
432 "FATAL: Faild to truncate stale PID file. (pid %d)\n", pid);
438 "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
439 "rrdcached: starting normally.\n", pid);
442 } /* }}} static int check_pidfile */
/* Wraps the descriptor fd in a stdio stream opened for writing and writes the
 * PID as a decimal number followed by a newline. A failing fdopen is logged. */
444 static int write_pidfile (int fd) /* {{{ */
451 fh = fdopen (fd, "w");
454 RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
459 fprintf (fh, "%i\n", (int) pid);
463 } /* }}} int write_pidfile */
/* Unlinks the PID file, using the same path selection as open_pidfile
 * (config_pid_file when set, otherwise the default under LOCALSTATEDIR). */
465 static int remove_pidfile (void) /* {{{ */
470 file = (config_pid_file != NULL)
472 : LOCALSTATEDIR "/run/rrdcached.pid";
474 status = unlink (file);
478 } /* }}} int remove_pidfile */
/* Extracts the next newline-terminated command from the socket read buffer.
 * The search covers the bytes between sock->next_cmd and sock->next_read.
 * When no newline is buffered, the unread remainder is moved to the front of
 * rbuf and next_read is adjusted. Otherwise the command starts at the old
 * next_cmd offset, next_cmd is advanced past the newline, and a carriage
 * return directly before the newline is replaced by a terminator. */
480 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
484 eol = memchr(sock->rbuf + sock->next_cmd, '\n',
485 sock->next_read - sock->next_cmd);
489 /* no commands left, move remainder back to front of rbuf */
490 memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
491 sock->next_read - sock->next_cmd);
492 sock->next_read -= sock->next_cmd;
499 char *cmd = sock->rbuf + sock->next_cmd;
502 sock->next_cmd = eol - sock->rbuf + 1;
504 if (eol > sock->rbuf && *(eol-1) == '\r')
505 *(--eol) = '\0'; /* handle "\r\n" EOL */
514 } /* }}} char *next_cmd */
/* Grows sock->wbuf to wbuf_len + len + 1 bytes with rrd_realloc and copies up
 * to len + 1 bytes of str behind the current contents; wbuf_len then grows by
 * len. A failed reallocation is logged.
 * NOTE(review): the copy uses strncpy, so the buffer stays terminated only if
 * str has its terminator within the first len + 1 bytes -- confirm callers. */
516 /* add the characters directly to the write buffer */
517 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
521 assert(sock != NULL);
523 new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
526 RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
530 strncpy(new_buf + sock->wbuf_len, str, len + 1);
532 sock->wbuf = new_buf;
533 sock->wbuf_len += len;
536 } /* }}} static int add_to_wbuf */
/* Formats a printf-style message into a CMD_MAX sized stack buffer and appends
 * it to the socket write buffer with add_to_wbuf. Returns 0 without doing
 * anything during journal replay (NULL socket) and while a BATCH is active.
 * NOTE(review): vsnprintf returns the length the full output would have had;
 * if the message is truncated, the len handed to add_to_wbuf can exceed what
 * was stored in buffer -- confirm whether it is clamped. Without
 * HAVE_VSNPRINTF the vsprintf fallback is not bounded at all. */
538 /* add the text to the "extra" info that's sent after the status line */
539 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
542 char buffer[CMD_MAX];
545 if (JOURNAL_REPLAY(sock)) return 0;
546 if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
549 #ifdef HAVE_VSNPRINTF
550 len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
552 len = vsprintf(buffer, fmt, argp);
557 RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
561 return add_to_wbuf(sock, buffer, len);
562 } /* }}} static int add_response_info */
/* Counts lines in str by repeatedly searching for newline characters with
 * strchr. */
564 static int count_lines(char *str) /* {{{ */
570 while ((str = strchr(str, '\n')) != NULL)
578 } /* }}} static int count_lines */
/* Builds the status line as a decimal line count, a space and the formatted
 * message. During journal replay (NULL socket) nothing is sent and rc is
 * returned unchanged. Inside a BATCH, successes produce no output and the
 * status line is appended to the write buffer instead of being sent. Otherwise
 * the status line must go out in a single write(2); on RESP_OK the accumulated
 * write buffer follows, written in a loop until complete.
 * NOTE(review): the vsnprintf result is used as a length; on truncation it can
 * exceed the bytes stored in buffer -- confirm whether it is clamped. */
580 /* send the response back to the user.
581 * returns 0 on success, -1 on error
582 * write buffer is always zeroed after this call */
583 static int send_response (listen_socket_t *sock, response_code rc,
584 char *fmt, ...) /* {{{ */
587 char buffer[CMD_MAX];
592 if (JOURNAL_REPLAY(sock)) return rc;
594 if (sock->batch_start)
597 return rc; /* no response on success during BATCH */
598 lines = sock->batch_cmd;
600 else if (rc == RESP_OK)
601 lines = count_lines(sock->wbuf);
605 rclen = sprintf(buffer, "%d ", lines);
607 #ifdef HAVE_VSNPRINTF
608 len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
610 len = vsprintf(buffer+rclen, fmt, argp);
618 /* append the result to the wbuf, don't write to the user */
619 if (sock->batch_start)
620 return add_to_wbuf(sock, buffer, len);
622 /* first write must be complete */
623 if (len != write(sock->fd, buffer, len))
625 RRDD_LOG(LOG_INFO, "send_response: could not write status message");
629 if (sock->wbuf != NULL && rc == RESP_OK)
632 while (wrote < sock->wbuf_len)
634 ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
637 RRDD_LOG(LOG_INFO, "send_response: could not write results");
644 free(sock->wbuf); sock->wbuf = NULL;
/* Resets the value bookkeeping of a cache item (values_alloc is zeroed here)
 * and sets last_flush_time to the given time. When config_write_jitter is
 * positive, a random offset below that jitter is added to last_flush_time. */
650 static void wipe_ci_values(cache_item_t *ci, time_t when)
654 ci->values_alloc = 0;
656 ci->last_flush_time = when;
657 if (config_write_jitter > 0)
658 ci->last_flush_time += (rrd_random() % config_write_jitter);
/* Unlinks ci from the doubly linked flush queue, updating cache_queue_head and
 * cache_queue_tail when ci sits at either end, then clears its link pointers
 * and CI_FLAGS_IN_QUEUE and decrements stats_queue_length under stats_lock.
 * Does nothing for a NULL item or one that is not queued. */
662 * remove a "cache_item_t" item from the queue.
663 * must hold 'cache_lock' when calling this
665 static void remove_from_queue(cache_item_t *ci) /* {{{ */
667 if (ci == NULL) return;
668 if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
670 if (ci->prev == NULL)
671 cache_queue_head = ci->next; /* reset head */
673 ci->prev->next = ci->next;
675 if (ci->next == NULL)
676 cache_queue_tail = ci->prev; /* reset the tail */
678 ci->next->prev = ci->prev;
680 ci->next = ci->prev = NULL;
681 ci->flags &= ~CI_FLAGS_IN_QUEUE;
683 pthread_mutex_lock (&stats_lock);
684 assert (stats_queue_length > 0);
685 stats_queue_length--;
686 pthread_mutex_unlock (&stats_lock);
688 } /* }}} static void remove_from_queue */
/* Tears down a cache item: takes it off the flush queue, iterates over its
 * values_num stored values (presumably to free them -- confirm), wakes any
 * thread still waiting on ci->flushed and destroys that condition variable.
 * Returns NULL when ci is NULL. */
690 /* free the resources associated with the cache_item_t
691 * must hold cache_lock when calling this function
693 static void *free_cache_item(cache_item_t *ci) /* {{{ */
695 if (ci == NULL) return NULL;
697 remove_from_queue(ci);
699 for (size_t i=0; i < ci->values_num; i++)
705 /* in case anyone is waiting */
706 pthread_cond_broadcast(&ci->flushed);
707 pthread_cond_destroy(&ci->flushed);
712 } /* }}} static void *free_cache_item */
/* Links ci into the flush queue at the head or the tail, sets
 * CI_FLAGS_IN_QUEUE, signals queue_cond and increments stats_queue_length
 * under stats_lock. For the head side an item that is queued further down is
 * unlinked first; for the tail side an item that is already queued is not
 * moved. Items with values_num == 0 and an item that already is the head are
 * special-cased by the checks below (presumably early returns -- confirm). */
715 * enqueue_cache_item:
716 * `cache_lock' must be acquired before calling this function!
718 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
724 if (ci->values_num == 0)
729 if (cache_queue_head == ci)
732 /* remove if further down in queue */
733 remove_from_queue(ci);
736 ci->next = cache_queue_head;
737 if (ci->next != NULL)
739 cache_queue_head = ci;
741 if (cache_queue_tail == NULL)
742 cache_queue_tail = cache_queue_head;
744 else /* (side == TAIL) */
746 /* We don't move values back in the list.. */
747 if (ci->flags & CI_FLAGS_IN_QUEUE)
750 assert (ci->next == NULL);
751 assert (ci->prev == NULL);
753 ci->prev = cache_queue_tail;
755 if (cache_queue_tail == NULL)
756 cache_queue_head = ci;
758 cache_queue_tail->next = ci;
760 cache_queue_tail = ci;
763 ci->flags |= CI_FLAGS_IN_QUEUE;
765 pthread_cond_signal(&queue_cond);
766 pthread_mutex_lock (&stats_lock);
767 stats_queue_length++;
768 pthread_mutex_unlock (&stats_lock);
771 } /* }}} int enqueue_cache_item */
/* Per-node callback for the cache tree walk. An item with pending values is
 * enqueued at the tail when its last_flush_time is at or before
 * cfd->abs_timeout, or unconditionally once state is no longer RUNNING. An
 * item without values that has not been flushed for config_flush_interval
 * seconds has its key appended to cfd->keys so the caller can remove it from
 * the tree afterwards. Items already in the queue are checked for first. */
774 * tree_callback_flush:
775 * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
776 * while this is in progress.
778 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
782 callback_flush_data_t *cfd;
784 ci = (cache_item_t *) value;
785 cfd = (callback_flush_data_t *) data;
787 if (ci->flags & CI_FLAGS_IN_QUEUE)
790 if (ci->values_num > 0
791 && (ci->last_flush_time <= cfd->abs_timeout || state != RUNNING))
793 enqueue_cache_item (ci, TAIL);
795 else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
796 && (ci->values_num <= 0))
798 assert ((char *) key == ci->file);
799 if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key))
801 RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed.");
807 } /* }}} gboolean tree_callback_flush */
/* Walks cache_tree with tree_callback_flush and afterwards removes from the
 * tree every key the callback collected. Two cutoffs are computed below: now
 * minus max_age, and now plus twice the write jitter plus one, which lies
 * after any jittered last_flush_time so every pending item qualifies
 * (presumably chosen for a non-positive max_age -- confirm). The caller is
 * expected to hold cache_lock. */
809 static int flush_old_values (int max_age)
811 callback_flush_data_t cfd;
814 memset (&cfd, 0, sizeof (cfd));
815 /* Pass the current time as user data so that we don't need to call
816 * `time' for each node. */
817 cfd.now = time (NULL);
822 cfd.abs_timeout = cfd.now - max_age;
824 cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
826 /* `tree_callback_flush' will return the keys of all values that haven't
827 * been touched in the last `config_flush_interval' seconds in `cfd'.
828 * The char*'s in this array point to the same memory as ci->file, so we
829 * don't need to free them separately. */
830 g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
832 for (k = 0; k < cfd.keys_num; k++)
834 gboolean status = g_tree_remove(cache_tree, cfd.keys[k]);
835 /* should never fail, since we have held the cache_lock
837 assert(status == TRUE);
840 if (cfd.keys != NULL)
847 } /* int flush_old_values */
/* Body of the flush thread. While state is RUNNING it waits on flush_cond
 * with a deadline of config_flush_interval seconds; once the deadline has
 * passed it enqueues values older than config_write_interval through
 * flush_old_values and temporarily releases cache_lock (for journal rotation,
 * per the comment below). Wait errors other than ETIMEDOUT are logged. After
 * the loop everything is flushed when config_flush_at_shutdown is set. */
849 static void *flush_thread_main (void UNUSED(*args)) /* {{{ */
852 struct timespec next_flush;
855 gettimeofday (&now, NULL);
856 next_flush.tv_sec = now.tv_sec + config_flush_interval;
857 next_flush.tv_nsec = 1000 * now.tv_usec;
859 pthread_mutex_lock(&cache_lock);
861 while (state == RUNNING)
863 gettimeofday (&now, NULL);
864 if ((now.tv_sec > next_flush.tv_sec)
865 || ((now.tv_sec == next_flush.tv_sec)
866 && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
868 RRDD_LOG(LOG_DEBUG, "flushing old values");
870 /* Determine the time of the next cache flush. */
871 next_flush.tv_sec = now.tv_sec + config_flush_interval;
873 /* Flush all values that haven't been written in the last
874 * `config_write_interval' seconds. */
875 flush_old_values (config_write_interval);
877 /* unlock the cache while we rotate so we don't block incoming
878 * updates if the fsync() blocks on disk I/O */
879 pthread_mutex_unlock(&cache_lock);
881 pthread_mutex_lock(&cache_lock);
884 status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
885 if (status != 0 && status != ETIMEDOUT)
887 RRDD_LOG (LOG_ERR, "flush_thread_main: "
888 "pthread_cond_timedwait returned %i.", status);
892 if (config_flush_at_shutdown)
893 flush_old_values (-1); /* flush everything */
897 pthread_mutex_unlock(&cache_lock);
900 } /* void *flush_thread_main */
/* Body of a writer thread. It loops until state is SHUTDOWN, and keeps going
 * past that point while the queue is non-empty and config_flush_at_shutdown is
 * set. Each round waits on queue_cond while the queue is empty, takes the item
 * at cache_queue_head, copies its file name, detaches its values and unlinks
 * it from the queue with cache_lock held, then calls rrd_update_r with the
 * lock released. Afterwards the item is looked up again by file name (it may
 * have been forgotten meanwhile), waiters on ci->flushed are woken, and the
 * written counters are updated under stats_lock. */
902 static void *queue_thread_main (void UNUSED(*args)) /* {{{ */
904 pthread_mutex_lock (&cache_lock);
906 while (state != SHUTDOWN
907 || (cache_queue_head != NULL && config_flush_at_shutdown))
915 /* Now, check if there's something to store away. If not, wait until
916 * something comes in. */
917 if (cache_queue_head == NULL)
919 status = pthread_cond_wait (&queue_cond, &cache_lock);
920 if ((status != 0) && (status != ETIMEDOUT))
922 RRDD_LOG (LOG_ERR, "queue_thread_main: "
923 "pthread_cond_wait returned %i.", status);
927 /* Check if a value has arrived. This may be NULL if we timed out or there
928 * was an interrupt such as a signal. */
929 if (cache_queue_head == NULL)
932 ci = cache_queue_head;
934 /* copy the relevant parts */
935 file = strdup (ci->file);
938 RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
942 assert(ci->values != NULL);
943 assert(ci->values_num > 0);
946 values_num = ci->values_num;
948 wipe_ci_values(ci, time(NULL));
949 remove_from_queue(ci);
951 pthread_mutex_unlock (&cache_lock);
954 status = rrd_update_r (file, NULL, (int) values_num, (void *) values);
957 RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
958 "rrd_update_r (%s) failed with status %i. (%s)",
959 file, status, rrd_get_error());
962 journal_write("wrote", file);
964 /* Search again in the tree. It's possible someone issued a "FORGET"
965 * while we were writing the update values. */
966 pthread_mutex_lock(&cache_lock);
967 ci = (cache_item_t *) g_tree_lookup(cache_tree, file);
969 pthread_cond_broadcast(&ci->flushed);
970 pthread_mutex_unlock(&cache_lock);
974 pthread_mutex_lock (&stats_lock);
975 stats_updates_written++;
976 stats_data_sets_written += values_num;
977 pthread_mutex_unlock (&stats_lock);
980 rrd_free_ptrs((void ***) &values, &values_num);
983 pthread_mutex_lock (&cache_lock);
985 pthread_mutex_unlock (&cache_lock);
988 } /* }}} void *queue_thread_main */
/* Splits the next field off a terminated command buffer, in place. The field
 * starts at *buffer_ret and ends at the first unescaped space or at the
 * terminator; a backslash introduces an escaped character, and a backslash as
 * the last byte is checked for separately. The field bytes are written back
 * into the same buffer. On return *buffer_ret and *buffer_size_ret are
 * advanced past the consumed bytes. The buffer must end in a terminator byte,
 * which is asserted. */
990 static int buffer_get_field (char **buffer_ret, /* {{{ */
991 size_t *buffer_size_ret, char **field_ret)
1000 buffer = *buffer_ret;
1002 buffer_size = *buffer_size_ret;
1003 field = *buffer_ret;
1006 if (buffer_size <= 0)
1009 /* This is ensured by `handle_request'. */
1010 assert (buffer[buffer_size - 1] == '\0');
1013 while (buffer_pos < buffer_size)
1015 /* Check for end-of-field or end-of-buffer */
1016 if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
1018 field[field_size] = 0;
1024 /* Handle escaped characters. */
1025 else if (buffer[buffer_pos] == '\\')
1027 if (buffer_pos >= (buffer_size - 1))
1030 field[field_size] = buffer[buffer_pos];
1034 /* Normal operation */
1037 field[field_size] = buffer[buffer_pos];
1041 } /* while (buffer_pos < buffer_size) */
1046 *buffer_ret = buffer + buffer_pos;
1047 *buffer_size_ret = buffer_size - buffer_pos;
1051 } /* }}} int buffer_get_field */
/* The restriction is skipped entirely when config_write_base_only is off,
 * during journal replay, or when no base directory is configured. Otherwise a
 * path containing a dot-dot-slash sequence is rejected, any other relative
 * path is accepted, and an absolute path must consist of config_base_dir, a
 * slash and at least one further character. On rejection an EACCES error
 * response is sent if the socket has a valid descriptor. */
1053 /* if we're restricting writes to the base directory,
1054 * check whether the file falls within the dir
1055 * returns 1 if OK, otherwise 0
1057 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
1059 assert(file != NULL);
1061 if (!config_write_base_only
1062 || JOURNAL_REPLAY(sock)
1063 || config_base_dir == NULL)
1066 if (strstr(file, "../") != NULL) goto err;
1068 /* relative paths without "../" are ok */
1069 if (*file != '/') return 1;
1071 /* file must be of the format base + "/" + <1+ char filename> */
1072 if (strlen(file) < _config_base_dir_len + 2) goto err;
1073 if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
1074 if (*(file + _config_base_dir_len) != '/') goto err;
1079 if (sock != NULL && sock->fd >= 0)
1080 send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1083 } /* }}} static int check_file_access */
/* Nothing is rewritten when no base directory is configured or the name
 * already starts with a slash. Otherwise the base directory, a slash and the
 * name are formatted into tmp with snprintf, limited to PATH_MAX bytes, so an
 * over-long result is truncated. */
1085 /* when using a base dir, convert relative paths to absolute paths.
1086 * if necessary, modifies the "filename" pointer to point
1087 * to the new path created in "tmp". "tmp" is provided
1088 * by the caller and sizeof(tmp) must be >= PATH_MAX.
1090 * this allows us to optimize for the expected case (absolute path)
1093 static void get_abs_path(char **filename, char *tmp)
1095 assert(tmp != NULL);
1096 assert(filename != NULL && *filename != NULL);
1098 if (config_base_dir == NULL || **filename == '/')
1101 snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1103 } /* }}} static int get_abs_path */
/* Forces the cached values of one file to be written: looks the file up in
 * cache_tree under cache_lock and, when it has pending values, moves its item
 * to the head of the queue and blocks on ci->flushed until woken. The item
 * must not be touched after the wait because it may have been purged.
 * NOTE(review): pthread_cond_wait is called once, without a predicate loop,
 * so a spurious wakeup would end the wait early -- confirm this is accepted. */
1105 static int flush_file (const char *filename) /* {{{ */
1109 pthread_mutex_lock (&cache_lock);
1111 ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1114 pthread_mutex_unlock (&cache_lock);
1118 if (ci->values_num > 0)
1120 /* Enqueue at head */
1121 enqueue_cache_item (ci, HEAD);
1122 pthread_cond_wait(&ci->flushed, &cache_lock);
1125 /* DO NOT DO ANYTHING WITH ci HERE!! The entry
1126 * may have been purged during our cond_wait() */
1128 pthread_mutex_unlock(&cache_lock);
1131 } /* }}} int flush_file */
/* Sends a RESP_ERR usage response. The text defaults to a generic syntax
 * error message; when cmd carries a syntax string it is checked for below
 * (presumably substituted for the default -- confirm). */
1133 static int syntax_error(listen_socket_t *sock, command_t *cmd) /* {{{ */
1135 char *err = "Syntax error.\n";
1137 if (cmd && cmd->syntax)
1140 return send_response(sock, RESP_ERR, "Usage: %s", err);
1141 } /* }}} static int syntax_error() */
/* STATS handler. Takes a snapshot of the counters under stats_lock and of the
 * tree node count and height under cache_lock, so that no lock is held while
 * the response is built. One name and value line per statistic is added to
 * the response, followed by an RESP_OK status line. */
1143 static int handle_request_stats (HANDLER_PROTO) /* {{{ */
1145 uint64_t copy_queue_length;
1146 uint64_t copy_updates_received;
1147 uint64_t copy_flush_received;
1148 uint64_t copy_updates_written;
1149 uint64_t copy_data_sets_written;
1150 uint64_t copy_journal_bytes;
1151 uint64_t copy_journal_rotate;
1153 uint64_t tree_nodes_number;
1154 uint64_t tree_depth;
1156 pthread_mutex_lock (&stats_lock);
1157 copy_queue_length = stats_queue_length;
1158 copy_updates_received = stats_updates_received;
1159 copy_flush_received = stats_flush_received;
1160 copy_updates_written = stats_updates_written;
1161 copy_data_sets_written = stats_data_sets_written;
1162 copy_journal_bytes = stats_journal_bytes;
1163 copy_journal_rotate = stats_journal_rotate;
1164 pthread_mutex_unlock (&stats_lock);
1166 pthread_mutex_lock (&cache_lock);
1167 tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1168 tree_depth = (uint64_t) g_tree_height (cache_tree);
1169 pthread_mutex_unlock (&cache_lock);
1171 add_response_info(sock,
1172 "QueueLength: %"PRIu64"\n", copy_queue_length);
1173 add_response_info(sock,
1174 "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1175 add_response_info(sock,
1176 "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1177 add_response_info(sock,
1178 "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1179 add_response_info(sock,
1180 "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1181 add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1182 add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1183 add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1184 add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1186 send_response(sock, RESP_OK, "Statistics follow\n");
1189 } /* }}} int handle_request_stats */
/* FLUSH handler. Reads the file name field (a syntax error is reported when
 * that fails), counts the request in stats_flush_received, resolves the path
 * against the base directory and checks access, then calls flush_file and
 * maps its status to a response. A status of ENOENT is still answered with
 * RESP_OK when the path names an existing regular file, since there is simply
 * nothing cached for it. */
1191 static int handle_request_flush (HANDLER_PROTO) /* {{{ */
1193 char *file, file_tmp[PATH_MAX];
1196 status = buffer_get_field (&buffer, &buffer_size, &file);
1199 return syntax_error(sock,cmd);
1203 pthread_mutex_lock(&stats_lock);
1204 stats_flush_received++;
1205 pthread_mutex_unlock(&stats_lock);
1207 get_abs_path(&file, file_tmp);
1208 if (!check_file_access(file, sock)) return 0;
1210 status = flush_file (file);
1212 return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1213 else if (status == ENOENT)
1215 /* no file in our tree; see whether it exists at all */
1216 struct stat statbuf;
1218 memset(&statbuf, 0, sizeof(statbuf));
1219 if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1220 return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1222 return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1224 else if (status < 0)
1225 return send_response(sock, RESP_ERR, "Internal error.\n");
1227 return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1232 } /* }}} int handle_request_flush */
1234 static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
1236 RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1238 pthread_mutex_lock(&cache_lock);
1239 flush_old_values(-1);
1240 pthread_mutex_unlock(&cache_lock);
1242 return send_response(sock, RESP_OK, "Started flush.\n");
1243 } /* }}} static int handle_request_flushall */
/* PENDING handler. Reads the file name, resolves it against the base
 * directory and looks up its cache item under cache_lock. Every cached value
 * of the item is added to the response on its own line; an ENOENT error is
 * sent on the path that follows the lookup (presumably when no item exists
 * -- confirm).
 * NOTE(review): no check_file_access call is visible here, unlike in the
 * FLUSH, FORGET and UPDATE handlers -- confirm this is intended. */
1245 static int handle_request_pending(HANDLER_PROTO) /* {{{ */
1248 char *file, file_tmp[PATH_MAX];
1251 status = buffer_get_field(&buffer, &buffer_size, &file);
1253 return syntax_error(sock,cmd);
1255 get_abs_path(&file, file_tmp);
1257 pthread_mutex_lock(&cache_lock);
1258 ci = g_tree_lookup(cache_tree, file);
1261 pthread_mutex_unlock(&cache_lock);
1262 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1265 for (size_t i=0; i < ci->values_num; i++)
1266 add_response_info(sock, "%s\n", ci->values[i]);
1268 pthread_mutex_unlock(&cache_lock);
1269 return send_response(sock, RESP_OK, "updates pending\n");
1270 } /* }}} static int handle_request_pending */
/* FORGET handler. Reads the file name, resolves it against the base directory
 * and checks access, then removes the entry for the file from cache_tree
 * under cache_lock. One outcome writes a forget record to the journal (unless
 * the command itself is being replayed from the journal) and answers RESP_OK;
 * the other answers with an ENOENT error. */
1272 static int handle_request_forget(HANDLER_PROTO) /* {{{ */
1276 char *file, file_tmp[PATH_MAX];
1278 status = buffer_get_field(&buffer, &buffer_size, &file);
1280 return syntax_error(sock,cmd);
1282 get_abs_path(&file, file_tmp);
1283 if (!check_file_access(file, sock)) return 0;
1285 pthread_mutex_lock(&cache_lock);
1286 found = g_tree_remove(cache_tree, file);
1287 pthread_mutex_unlock(&cache_lock);
1291 if (!JOURNAL_REPLAY(sock))
1292 journal_write("forget", file);
1294 return send_response(sock, RESP_OK, "Gone!\n");
1297 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1301 } /* }}} static int handle_request_forget */
/* QUEUE handler. With cache_lock held, starts at cache_queue_head and adds a
 * line with the value count and file name of a queued item to the response,
 * then sends an RESP_OK status line.
 * NOTE(review): values_num is a size_t but is formatted with %d. */
1303 static int handle_request_queue (HANDLER_PROTO) /* {{{ */
1307 pthread_mutex_lock(&cache_lock);
1309 ci = cache_queue_head;
1312 add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1316 pthread_mutex_unlock(&cache_lock);
1318 return send_response(sock, RESP_OK, "in queue.\n");
1319 } /* }}} int handle_request_queue */
/* UPDATE handler. Reads the file name, counts the request in
 * stats_updates_received, resolves the path and checks access. If the file
 * has no cache item yet, cache_lock is released while the file is checked
 * with stat and access and a new item is allocated; after the lock is taken
 * again the tree is re-checked because another UPDATE may have added the
 * entry meanwhile. Unless the command is being replayed from the journal, the
 * original command text is journaled. Every following field must start with a
 * decimal timestamp followed by a colon, and the timestamp must be greater
 * than the last one stored for the file; accepted values are appended to the
 * item. An item that is not queued yet is enqueued at the tail once
 * config_write_interval seconds have passed since its last flush.
 * NOTE(review): orig_buf holds CMD_MAX bytes and is filled with strncpy using
 * buffer_size as the limit; this relies on buffer_size not exceeding CMD_MAX
 * -- confirm.
 * NOTE(review): last_update_stamp is a time_t but is formatted with %ld. */
1321 static int handle_request_update (HANDLER_PROTO) /* {{{ */
1323 char *file, file_tmp[PATH_MAX];
1326 char orig_buf[CMD_MAX];
1330 /* save it for the journal later */
1331 if (!JOURNAL_REPLAY(sock))
1332 strncpy(orig_buf, buffer, buffer_size);
1334 status = buffer_get_field (&buffer, &buffer_size, &file);
1336 return syntax_error(sock,cmd);
1338 pthread_mutex_lock(&stats_lock);
1339 stats_updates_received++;
1340 pthread_mutex_unlock(&stats_lock);
1342 get_abs_path(&file, file_tmp);
1343 if (!check_file_access(file, sock)) return 0;
1345 pthread_mutex_lock (&cache_lock);
1346 ci = g_tree_lookup (cache_tree, file);
1348 if (ci == NULL) /* {{{ */
1350 struct stat statbuf;
1353 /* don't hold the lock while we setup; stat(2) might block */
1354 pthread_mutex_unlock(&cache_lock);
1356 memset (&statbuf, 0, sizeof (statbuf));
1357 status = stat (file, &statbuf);
1360 RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1363 if (status == ENOENT)
1364 return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1366 return send_response(sock, RESP_ERR,
1367 "stat failed with error %i.\n", status);
1369 if (!S_ISREG (statbuf.st_mode))
1370 return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1372 if (access(file, R_OK|W_OK) != 0)
1373 return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1374 file, rrd_strerror(errno));
1376 ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1379 RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1381 return send_response(sock, RESP_ERR, "malloc failed.\n");
1383 memset (ci, 0, sizeof (cache_item_t));
1385 ci->file = strdup (file);
1386 if (ci->file == NULL)
1389 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1391 return send_response(sock, RESP_ERR, "strdup failed.\n");
1394 wipe_ci_values(ci, now);
1395 ci->flags = CI_FLAGS_IN_TREE;
1396 pthread_cond_init(&ci->flushed, NULL);
1398 pthread_mutex_lock(&cache_lock);
1400 /* another UPDATE might have added this entry in the meantime */
1401 tmp = g_tree_lookup (cache_tree, file);
1403 g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1406 free_cache_item (ci);
1410 /* state may have changed while we were unlocked */
1411 if (state == SHUTDOWN)
1414 assert (ci != NULL);
1416 /* don't re-write updates in replay mode */
1417 if (!JOURNAL_REPLAY(sock))
1418 journal_write("update", orig_buf);
1420 while (buffer_size > 0)
1426 status = buffer_get_field (&buffer, &buffer_size, &value);
1429 RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1433 /* make sure update time is always moving forward */
1434 stamp = strtol(value, &eostamp, 10);
1435 if (eostamp == value || eostamp == NULL || *eostamp != ':')
1437 pthread_mutex_unlock(&cache_lock);
1438 return send_response(sock, RESP_ERR,
1439 "Cannot find timestamp in '%s'!\n", value);
1441 else if (stamp <= ci->last_update_stamp)
1443 pthread_mutex_unlock(&cache_lock);
1444 return send_response(sock, RESP_ERR,
1445 "illegal attempt to update using time %ld when last"
1446 " update time is %ld (minimum one second step)\n",
1447 stamp, ci->last_update_stamp);
1450 ci->last_update_stamp = stamp;
1452 if (!rrd_add_strdup_chunk(&ci->values, &ci->values_num, value,
1453 &ci->values_alloc, config_alloc_chunk))
1455 RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed.");
1462 if (((now - ci->last_flush_time) >= config_write_interval)
1463 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1464 && (ci->values_num > 0))
1466 enqueue_cache_item (ci, TAIL);
1469 pthread_mutex_unlock (&cache_lock);
1472 return send_response(sock, RESP_ERR, "No values updated.\n");
1474 return send_response(sock, RESP_OK,
1475 "errors, enqueued %i value(s).\n", values_num);
1480 } /* }}} int handle_request_update */
/* FETCH command handler ("FETCH <file> <CF> [<start> [<end>]]").
 *
 * Reads the file name, the consolidation function and the optional start and
 * end times from the request buffer, flushes the file through flush_file(),
 * calls rrd_fetch_r() and queues the result as response info lines:
 * FlushVersion, Start, End, Step, DSCount, DSName and one "<time>:<values>"
 * line per step.  The status line itself is sent with send_response().
 *
 * Returns 0 when check_file_access() rejects the file, otherwise the result
 * of send_response() / syntax_error(). */
1482 static int handle_request_fetch (HANDLER_PROTO) /* {{{ */
1484 char *file, file_tmp[PATH_MAX];
1493 unsigned long ds_cnt;
1500 rrd_value_t *data_ptr;
1507 /* Read the arguments */
1510 status = buffer_get_field (&buffer, &buffer_size, &file);
1514 status = buffer_get_field (&buffer, &buffer_size, &cf);
1518 status = buffer_get_field (&buffer, &buffer_size, &start_str);
1526 status = buffer_get_field (&buffer, &buffer_size, &end_str);
1536 return (syntax_error(sock,cmd));
1538 get_abs_path(&file, file_tmp);
1539 if (!check_file_access(file, sock)) return 0;
/* A status of ENOENT from flush_file() is tolerated; any other non-zero
 * status aborts the request with an error response. */
1541 status = flush_file (file);
1542 if ((status != 0) && (status != ENOENT))
1543 return (send_response (sock, RESP_ERR,
1544 "flush_file (%s) failed with status %i.\n", file, status));
1546 t = time (NULL); /* "now" */
1548 /* Parse start time */
1549 if (start_str != NULL)
1556 value = strtol (start_str, &endptr, /* base = */ 0);
1557 if ((endptr == start_str) || (errno != 0))
1558 return (send_response(sock, RESP_ERR,
1559 "Cannot parse start time `%s': Only simple integers are allowed.\n",
/* The parsed value is used either as the time itself or as an offset that is
 * added to "now". */
1563 start_tm = (time_t) value;
1565 start_tm = (time_t) (t + value);
/* Default start: 86400 seconds (24 hours) before "now". */
1569 start_tm = t - 86400;
1572 /* Parse end time */
1573 if (end_str != NULL)
1580 value = strtol (end_str, &endptr, /* base = */ 0);
1581 if ((endptr == end_str) || (errno != 0))
1582 return (send_response(sock, RESP_ERR,
1583 "Cannot parse end time `%s': Only simple integers are allowed.\n",
1587 end_tm = (time_t) value;
1589 end_tm = (time_t) (t + value);
1601 status = rrd_fetch_r (file, cf, &start_tm, &end_tm, &step,
1602 &ds_cnt, &ds_namv, &data);
1604 return (send_response(sock, RESP_ERR,
1605 "rrd_fetch_r failed: %s\n", rrd_get_error ()));
/* NOTE(review): "%lu" is paired with the int literal 1 on the next line; the
 * argument type does not match the conversion -- confirm and pass 1UL. */
1607 add_response_info (sock, "FlushVersion: %lu\n", 1);
1608 add_response_info (sock, "Start: %lu\n", (unsigned long) start_tm);
1609 add_response_info (sock, "End: %lu\n", (unsigned long) end_tm);
1610 add_response_info (sock, "Step: %lu\n", step);
1611 add_response_info (sock, "DSCount: %lu\n", ds_cnt);
/* SSTRCAT appends `str' to the fixed-size array `buffer', using `buffer_fill'
 * as the current fill level.  The copy length is clamped to the space left in
 * the array and the array is kept NUL-terminated.  `buffer' must be a real
 * array because the macro relies on sizeof (buffer). */
1613 #define SSTRCAT(buffer,str,buffer_fill) do { \
1614 size_t str_len = strlen (str); \
1615 if ((buffer_fill + str_len) > sizeof (buffer)) \
1616 str_len = sizeof (buffer) - buffer_fill; \
1617 if (str_len > 0) { \
1618 strncpy (buffer + buffer_fill, str, str_len); \
1619 buffer_fill += str_len; \
1620 assert (buffer_fill <= sizeof (buffer)); \
1621 if (buffer_fill == sizeof (buffer)) \
1622 buffer[buffer_fill - 1] = 0; \
1624 buffer[buffer_fill] = 0; \
1628 { /* Add list of DS names */
1630 size_t linebuf_fill;
1632 memset (linebuf, 0, sizeof (linebuf));
1634 for (i = 0; i < ds_cnt; i++)
1637 SSTRCAT (linebuf, " ", linebuf_fill);
1638 SSTRCAT (linebuf, ds_namv[i], linebuf_fill);
/* Each DS name is released right after it has been copied into linebuf; the
 * name vector itself is released after the loop. */
1639 rrd_freemem(ds_namv[i]);
1641 rrd_freemem(ds_namv);
1642 add_response_info (sock, "DSName: %s\n", linebuf);
1645 /* Add the actual data */
/* One response line per time step, walking from start_tm + step to end_tm. */
1648 for (t = start_tm + step; t <= end_tm; t += step)
1651 size_t linebuf_fill;
1654 memset (linebuf, 0, sizeof (linebuf));
1656 for (i = 0; i < ds_cnt; i++)
1658 snprintf (tmp, sizeof (tmp), " %0.10e", *data_ptr);
1659 tmp[sizeof (tmp) - 1] = 0;
1660 SSTRCAT (linebuf, tmp, linebuf_fill);
1665 add_response_info (sock, "%10lu:%s\n", (unsigned long) t, linebuf);
1669 return (send_response (sock, RESP_OK, "Success\n"));
1671 } /* }}} int handle_request_fetch */
/* Handler for the journal-only "WROTE" entry (registered with
 * CMD_CONTEXT_JOURNAL in list_of_commands).  The request buffer is taken
 * verbatim as the file name and looked up in cache_tree under cache_lock.
 * For the cache item the handler then frees the stored value strings, calls
 * wipe_ci_values() and remove_from_queue(), and releases the lock. */
1673 /* we came across a "WROTE" entry during journal replay.
1674 * throw away any values that we have accumulated for this file
1676 static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
1679 const char *file = buffer;
1681 pthread_mutex_lock(&cache_lock);
1683 ci = g_tree_lookup(cache_tree, file);
1686 pthread_mutex_unlock(&cache_lock);
1691 rrd_free_ptrs((void ***) &ci->values, &ci->values_num);
1693 wipe_ci_values(ci, now);
1694 remove_from_queue(ci);
1696 pthread_mutex_unlock(&cache_lock);
1698 } /* }}} int handle_request_wrote */
/* INFO command handler ("INFO <filename>").
 *
 * Reads the file name, resolves it with get_abs_path(), checks access with
 * check_file_access() and obtains the info list from rrd_info_r().  Every
 * list entry becomes one response info line of the form
 * "<key> <type> <value>", where the value formatting depends on the entry
 * type.  The list is released with rrd_info_free() before the final
 * "Info for <file> follows" status line is sent. */
1700 static int handle_request_info (HANDLER_PROTO) /* {{{ */
1702 char *file, file_tmp[PATH_MAX];
1706 /* obtain filename */
1707 status = buffer_get_field(&buffer, &buffer_size, &file);
1709 return syntax_error(sock,cmd);
1710 /* get full pathname */
1711 get_abs_path(&file, file_tmp);
1712 if (!check_file_access(file, sock)) {
1713 return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1717 info = rrd_info_r(file);
1719 return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
/* Walk the linked list of info entries and format each one by its type. */
1721 for (rrd_info_t *data = info; data != NULL; data = data->next) {
1722 switch (data->type) {
/* Floating point values: NaN is written as the literal text "NaN". */
1724 if (isnan(data->value.u_val))
1725 add_response_info(sock,"%s %d NaN\n",data->key, data->type);
1727 add_response_info(sock,"%s %d %0.10e\n", data->key, data->type, data->value.u_val);
1730 add_response_info(sock,"%s %d %lu\n", data->key, data->type, data->value.u_cnt);
1733 add_response_info(sock,"%s %d %d\n", data->key, data->type, data->value.u_int);
1736 add_response_info(sock,"%s %d %s\n", data->key, data->type, data->value.u_str);
/* For blob entries only the size is reported, not the content. */
1739 add_response_info(sock,"%s %d %lu\n", data->key, data->type, data->value.u_blo.size);
1744 rrd_info_free(info);
1746 return send_response(sock, RESP_OK, "Info for %s follows\n",file);
1747 } /* }}} static int handle_request_info */
1749 static int handle_request_first (HANDLER_PROTO) /* {{{ */
1751 char *i, *file, file_tmp[PATH_MAX];
1756 /* obtain filename */
1757 status = buffer_get_field(&buffer, &buffer_size, &file);
1759 return syntax_error(sock,cmd);
1760 /* get full pathname */
1761 get_abs_path(&file, file_tmp);
1762 if (!check_file_access(file, sock)) {
1763 return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1766 status = buffer_get_field(&buffer, &buffer_size, &i);
1768 return syntax_error(sock,cmd);
1771 return send_response(sock, RESP_ERR, "Invalid index specified (%d)\n", idx);
1776 t = rrd_first_r(file,idx);
1778 return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1780 return send_response(sock, RESP_OK, "%lu\n",(unsigned)t);
1781 } /* }}} static int handle_request_first */
/* LAST command handler ("LAST <filename>").
 *
 * Reads the file name, resolves it with get_abs_path() and checks access with
 * check_file_access().  The RRD file is opened read-only to obtain its
 * last-update time (live_head->last_up) and step (stat_head->pdp_step) and
 * is closed again.  Under cache_lock the cache entry for the file is looked
 * up, and its last_update_stamp can supply the reported time.  The resulting
 * time is sent back in the status line. */
1784 static int handle_request_last (HANDLER_PROTO) /* {{{ */
1786 char *file, file_tmp[PATH_MAX];
1788 time_t t, from_file, step;
1789 rrd_file_t * rrd_file;
1793 /* obtain filename */
1794 status = buffer_get_field(&buffer, &buffer_size, &file);
1796 return syntax_error(sock,cmd);
1797 /* get full pathname */
1798 get_abs_path(&file, file_tmp);
1799 if (!check_file_access(file, sock)) {
1800 return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1804 rrd_file = rrd_open(file,&rrd,RRD_READONLY);
1806 return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
/* Values read from the file header while it is open. */
1808 from_file = rrd.live_head->last_up;
1809 step = rrd.stat_head->pdp_step;
1810 rrd_close(rrd_file);
/* The cache is consulted under cache_lock for a more recent timestamp. */
1811 pthread_mutex_lock(&cache_lock);
1812 ci = g_tree_lookup(cache_tree, file);
1814 t = ci->last_update_stamp;
1817 pthread_mutex_unlock(&cache_lock);
1821 return send_response(sock, RESP_ERR, "Error: rrdcached: Invalid timestamp returned\n");
/* NOTE(review): "%lu" expects an unsigned long but the argument is cast to
 * unsigned (int); the cast should be (unsigned long) -- confirm and fix. */
1823 return send_response(sock, RESP_OK, "%lu\n",(unsigned)t);
1824 } /* }}} static int handle_request_last */
/* CREATE command handler
 * ("CREATE <filename> [-b start] [-s step] [-O] <DS ...> <RRA ...>").
 *
 * Defaults: step = 300, last_up = now - 10 seconds, no_overwrite taken from
 * the daemon-wide opt_no_overwrite.  After the file name, the remaining
 * tokens are scanned: "-b" and "-s" each consume the following token, "-O"
 * is recognised as a flag, tokens starting with "DS:" or "RRA:" are collected
 * in av[], and anything else is a syntax error.  The file is then created
 * with rrd_create_r2(). */
1826 static int handle_request_create (HANDLER_PROTO) /* {{{ */
1828 char *file, file_tmp[PATH_MAX];
1833 unsigned long step = 300;
1834 time_t last_up = time(NULL)-10;
1835 int no_overwrite = opt_no_overwrite;
1838 /* obtain filename */
1839 status = buffer_get_field(&buffer, &buffer_size, &file);
1841 return syntax_error(sock,cmd);
1842 /* get full pathname */
1843 get_abs_path(&file, file_tmp);
1844 if (!check_file_access(file, sock)) {
1845 return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1847 RRDD_LOG(LOG_INFO, "rrdcreate request for %s",file);
/* Options are matched on their first two characters only, so any token that
 * merely starts with "-b", "-s" or "-O" selects that option. */
1849 while ((status = buffer_get_field(&buffer, &buffer_size, &tok)) == 0 && tok) {
1850 if( ! strncmp(tok,"-b",2) ) {
1851 status = buffer_get_field(&buffer, &buffer_size, &tok );
1852 if (status != 0) return syntax_error(sock,cmd);
/* atol() performs no error reporting; a non-numeric value yields 0 here. */
1853 last_up = (time_t) atol(tok);
1856 if( ! strncmp(tok,"-s",2) ) {
1857 status = buffer_get_field(&buffer, &buffer_size, &tok );
1858 if (status != 0) return syntax_error(sock,cmd);
1862 if( ! strncmp(tok,"-O",2) ) {
/* NOTE(review): no bound check on `ac' is visible before av[ac++]; verify
 * against the declared size of av[] that a long argument list cannot
 * overflow it. */
1866 if( ! strncmp(tok,"DS:",3) ) { av[ac++]=tok; continue; }
1867 if( ! strncmp(tok,"RRA:",4) ) { av[ac++]=tok; continue; }
1868 return syntax_error(sock,cmd);
1871 return send_response(sock, RESP_ERR, "The step size cannot be less than 1 second.\n");
/* 3600 * 24 * 365 * 10 seconds = ten 365-day years after the epoch. */
1873 if (last_up < 3600 * 24 * 365 * 10) {
1874 return send_response(sock, RESP_ERR, "The first entry must be after 1980.\n");
1878 status = rrd_create_r2(file,step,last_up,no_overwrite,ac,(const char **)av);
1881 return send_response(sock, RESP_OK, "RRD created OK\n");
1883 return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1884 } /* }}} static int handle_request_create */
/* Puts the connection into BATCH mode.  A connection that is already in a
 * batch (sock->batch_start non-zero) gets an error response.  Otherwise the
 * go-ahead message is sent, the batch start time is recorded and the
 * per-batch command counter is reset to 0. */
1886 /* start "BATCH" processing */
1887 static int batch_start (HANDLER_PROTO) /* {{{ */
1890 if (sock->batch_start)
1891 return send_response(sock, RESP_ERR, "Already in BATCH\n");
1893 status = send_response(sock, RESP_OK,
1894 "Go ahead. End with dot '.' on its own line.\n");
1895 sock->batch_start = time(NULL);
1896 sock->batch_cmd = 0;
1899 } /* }}} static int batch_start */
/* Leaves BATCH mode.  The caller must be inside a batch (asserted).  The
 * batch start time and command counter are cleared before the summary
 * response ("errors") is sent. */
1901 /* finish "BATCH" processing and return results to the client */
1902 static int batch_done (HANDLER_PROTO) /* {{{ */
1904 assert(sock->batch_start);
1905 sock->batch_start = 0;
1906 sock->batch_cmd = 0;
1907 return send_response(sock, RESP_OK, "errors\n");
1908 } /* }}} static int batch_done */
/* Handler that list_of_commands registers for the command whose help text
 * reads "Disconnect from rrdcached."; it is valid in both client and batch
 * context. */
1910 static int handle_request_quit (HANDLER_PROTO) /* {{{ */
1913 } /* }}} static int handle_request_quit */
/* Table of all protocol commands.  The visible entries carry a handler
 * function, a set of CMD_CONTEXT_* flags stating where the command may be
 * used, a one-line syntax string and a longer help text.  find_command() and
 * find_command_index() search this table by the `cmd' member, ignoring case,
 * and handle_request_help() prints the syntax and help strings. */
1915 static command_t list_of_commands[] = { /* {{{ */
1918 handle_request_update,
1920 "UPDATE <filename> <values> [<values> ...]\n"
1922 "Adds the given file to the internal cache if it is not yet known and\n"
1923 "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1926 "Each <values> has the following form:\n"
1927 " <values> = <time>:<value>[:<value>[...]]\n"
1928 "See the rrdupdate(1) manpage for details.\n"
1932 handle_request_wrote,
1933 CMD_CONTEXT_JOURNAL,
1939 handle_request_flush,
1940 CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1941 "FLUSH <filename>\n"
1943 "Adds the given filename to the head of the update queue and returns\n"
1944 "after it has been dequeued.\n"
1948 handle_request_flushall,
1952 "Triggers writing of all pending updates. Returns immediately.\n"
1956 handle_request_pending,
1958 "PENDING <filename>\n"
1960 "Shows any 'pending' updates for a file, in order.\n"
1961 "The updates shown have not yet been written to the underlying RRD file.\n"
1965 handle_request_forget,
1967 "FORGET <filename>\n"
1969 "Removes the file completely from the cache.\n"
1970 "Any pending updates for the file will be lost.\n"
1974 handle_request_queue,
1978 "Shows all files in the output queue.\n"
1979 "The output is zero or more lines in the following format:\n"
1980 "(where <num_vals> is the number of values to be written)\n"
1982 "<num_vals> <filename>\n"
1986 handle_request_stats,
1990 "Returns some performance counters, see the rrdcached(1) manpage for\n"
1991 "a description of the values.\n"
1995 handle_request_help,
1997 "HELP [<command>]\n",
1998 NULL, /* special! */
2006 "The 'BATCH' command permits the client to initiate a bulk load\n"
2007 " of commands to rrdcached.\n"
2012 " server: 0 Go ahead. End with dot '.' on its own line.\n"
2013 " client: command #1\n"
2014 " client: command #2\n"
2015 " client: ... and so on\n"
2017 " server: 2 errors\n"
2018 " server: 7 message for command #7\n"
2019 " server: 9 message for command #9\n"
2021 "For more information, consult the rrdcached(1) documentation.\n"
2024 ".", /* BATCH terminator */
2032 handle_request_fetch,
2034 "FETCH <file> <CF> [<start> [<end>]]\n"
2036 "The 'FETCH' can be used by the client to retrieve values from an RRD file.\n"
2040 handle_request_info,
2042 "INFO <filename>\n",
2043 "The INFO command retrieves information about a specified RRD file.\n"
2044 "This is returned in standard rrdinfo format, a sequence of lines\n"
2045 "with the format <keyname> = <value>\n"
2046 "Note that this is the data as of the last update of the RRD file itself,\n"
2047 "not the last time data was received via rrdcached, so there may be pending\n"
2048 "updates in the queue. If this bothers you, then first run a FLUSH.\n"
2052 handle_request_first,
2054 "FIRST <filename> <rra index>\n",
2055 "The FIRST command retrieves the first data time for a specified RRA in\n"
2060 handle_request_last,
2062 "LAST <filename>\n",
2063 "The LAST command retrieves the last update time for a specified RRD file.\n"
2064 "Note that this is the time of the last update of the RRD file itself, not\n"
2065 "the last time data was received via rrdcached, so there may be pending\n"
2066 "updates in the queue. If this bothers you, then first run a FLUSH.\n"
2070 handle_request_create,
2071 CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
2072 "CREATE <filename> [-b start] [-s step] [-O] <DS definitions> <RRA definitions>\n",
2073 "The CREATE command will create an RRD file, overwriting any existing file\n"
2074 "unless the -O option is given or rrdcached was started with the -O option.\n"
2075 "The start parameter needs to be in seconds since 1/1/70 (AT-style syntax is\n"
2076 "not acceptable) and the step is in seconds (default is 300).\n"
2077 "The DS and RRA definitions are as for the 'rrdtool create' command.\n"
2081 handle_request_quit,
2082 CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
2085 "Disconnect from rrdcached.\n"
2087 }; /* }}} command_t list_of_commands[] */
/* Number of entries in list_of_commands, computed at compile time. */
2088 static size_t list_of_commands_len = sizeof (list_of_commands)
2089 / sizeof (list_of_commands[0]);
/* Searches list_of_commands for an entry whose `cmd' name equals the given
 * string, ignoring case (strcasecmp), and returns a pointer to the first
 * match. */
2091 static command_t *find_command(char *cmd)
2095 for (i = 0; i < list_of_commands_len; i++)
2096 if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
2097 return (&list_of_commands[i]);
2101 /* We currently use the index in the `list_of_commands' array as a bit position
2102 * in `listen_socket_t.permissions'. This member should NEVER be accessed from
2103 * outside these functions so that switching to a more elegant storage method
2104 * is easily possible. */
/* Returns the position of the command named `cmd' in list_of_commands (as an
 * ssize_t); the name comparison ignores case. */
2105 static ssize_t find_command_index (const char *cmd) /* {{{ */
2109 for (i = 0; i < list_of_commands_len; i++)
2110 if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
2111 return ((ssize_t) i);
2113 } /* }}} ssize_t find_command_index */
/* Decides whether the command named `cmd' may be used on `sock'.
 * Journal replay, the QUIT and HELP commands, and the "." batch terminator
 * are each handled as special cases before the generic test.  The generic
 * test looks up the command's table index and checks the corresponding bit
 * of sock->permissions. */
2115 static int socket_permission_check (listen_socket_t *sock, /* {{{ */
2120 if (JOURNAL_REPLAY(sock))
2126 if ((strcasecmp ("QUIT", cmd) == 0)
2127 || (strcasecmp ("HELP", cmd) == 0))
2129 else if (strcmp (".", cmd) == 0)
2132 i = find_command_index (cmd);
/* NOTE(review): `1 << i' is evaluated as a signed int while permissions is a
 * uint32_t; an unsigned constant (UINT32_C(1) << i) would avoid undefined
 * behaviour should bit 31 ever be used. */
2137 if ((sock->permissions & (1 << i)) != 0)
2140 } /* }}} int socket_permission_check */
/* Grants `sock' permission to use the command named `cmd' by setting the bit
 * at the command's table index in sock->permissions. */
2142 static int socket_permission_add (listen_socket_t *sock, /* {{{ */
2147 i = find_command_index (cmd);
/* NOTE(review): same signed-shift remark as in socket_permission_check(). */
2152 sock->permissions |= (1 << i);
2154 } /* }}} int socket_permission_add */
/* Revokes every command permission of `sock' by zeroing its permission
 * bitmask. */
2156 static void socket_permission_clear (listen_socket_t *sock) /* {{{ */
2158 sock->permissions = 0;
2159 } /* }}} socket_permission_clear */
/* Copies the permission bitmask of `src' to `dest', replacing whatever
 * permissions `dest' had before. */
2161 static void socket_permission_copy (listen_socket_t *dest, /* {{{ */
2162 listen_socket_t *src)
2164 dest->permissions = src->permissions;
2165 } /* }}} socket_permission_copy */
2167 /* check whether commands are received in the expected context */
/* Returns non-zero when `cmd' carries the context flag matching the current
 * situation: CMD_CONTEXT_JOURNAL during journal replay, CMD_CONTEXT_BATCH
 * while the socket is inside a batch, CMD_CONTEXT_CLIENT otherwise.  The
 * checks are made in that order. */
2168 static int command_check_context(listen_socket_t *sock, command_t *cmd)
2170 if (JOURNAL_REPLAY(sock))
2171 return (cmd->context & CMD_CONTEXT_JOURNAL);
2172 else if (sock->batch_start)
2173 return (cmd->context & CMD_CONTEXT_BATCH);
2175 return (cmd->context & CMD_CONTEXT_CLIENT);
/* HELP command handler ("HELP [<command>]").
 *
 * An optional command name is read from the request and looked up with
 * find_command().  When the command is known and has a syntax or help text,
 * those are queued as response lines under a "Help for <cmd>" header.
 * Otherwise a "Command overview" is produced from the syntax line of every
 * table entry that has one. */
2181 static int handle_request_help (HANDLER_PROTO) /* {{{ */
2186 command_t *help = NULL;
2188 status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
2190 help = find_command(cmd_str);
2192 if (help && (help->syntax || help->help))
2196 snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
2200 add_response_info(sock, "Usage: %s\n", help->syntax);
2203 add_response_info(sock, "%s\n", help->help);
2209 resp_txt = "Command overview\n";
2211 for (i = 0; i < list_of_commands_len; i++)
2213 if (list_of_commands[i].syntax == NULL)
2215 add_response_info (sock, "%s", list_of_commands[i].syntax);
/* NOTE(review): resp_txt is handed to send_response() as the format string
 * itself.  The text is built from table entries only, but passing it through
 * "%s" would be the safer form. */
2219 return send_response(sock, RESP_OK, resp_txt);
2220 } /* }}} int handle_request_help */
/* Central command dispatcher.
 *
 * The buffer must be NUL-terminated at buffer[buffer_size - 1] (asserted).
 * The first field is taken as the command name and looked up with
 * find_command().  The command then has to pass socket_permission_check()
 * and command_check_context() before its handler is invoked with the
 * remainder of the buffer.  Each rejection is reported to the client through
 * send_response(). */
2222 static int handle_request (DISPATCH_PROTO) /* {{{ */
2224 char *buffer_ptr = buffer;
2225 char *cmd_str = NULL;
2226 command_t *cmd = NULL;
2229 assert (buffer[buffer_size - 1] == '\0');
2231 status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
2234 RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
/* sock may be NULL here; journal_replay() calls this function that way. */
2238 if (sock != NULL && sock->batch_start)
2241 cmd = find_command(cmd_str);
2243 return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
2245 if (!socket_permission_check (sock, cmd->cmd))
2246 return send_response(sock, RESP_ERR, "Permission denied.\n");
2248 if (!command_check_context(sock, cmd))
2249 return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
2251 return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
2252 } /* }}} int handle_request */
/* Releases a journal set; the visible part frees the list of file names held
 * in js->files through rrd_free_ptrs().  The journal files on disk are not
 * touched here. */
2254 static void journal_set_free (journal_set *js) /* {{{ */
2259 rrd_free_ptrs((void ***) &js->files, &js->files_num);
2262 } /* }}} journal_set_free */
/* Deletes every journal file listed in the set from disk with unlink(),
 * logging each name at debug level.  The return value of unlink() is not
 * checked. */
2264 static void journal_set_remove (journal_set *js) /* {{{ */
2269 for (uint i=0; i < js->files_num; i++)
2271 RRDD_LOG(LOG_DEBUG, "removing old journal %s", js->files[i]);
2272 unlink(js->files[i]);
2274 } /* }}} journal_set_remove */
2276 /* close current journal file handle.
2277 * MUST hold journal_lock before calling */
/* Does nothing when no journal is open (journal_fh == NULL).  A failing
 * fclose() is logged but not propagated; the function returns void. */
2278 static void journal_close(void) /* {{{ */
2280 if (journal_fh != NULL)
2282 if (fclose(journal_fh) != 0)
2283 RRDD_LOG(LOG_ERR, "cannot close journal: %s", rrd_strerror(errno));
2288 } /* }}} journal_close */
2290 /* MUST hold journal_lock before calling */
/* Starts a new journal file in journal_dir.  The file name is JOURNAL_BASE
 * followed by the current time in seconds and microseconds.  The file is
 * opened for appending, wrapped in a stdio stream (journal_fh) and recorded
 * in the current journal set.  The error messages below show that a failure
 * disables journaling, and config_flush_at_shutdown is set to 1. */
2291 static void journal_new_file(void) /* {{{ */
2295 char new_file[PATH_MAX + 1];
2297 assert(journal_dir != NULL);
2298 assert(journal_cur != NULL);
2302 gettimeofday(&now, NULL);
2303 /* this format assures that the files sort in strcmp() order */
2304 snprintf(new_file, PATH_MAX, "%s/%s.%010d.%06d",
2305 journal_dir, JOURNAL_BASE, (int)now.tv_sec, (int)now.tv_usec);
/* Created with mode rw-r--r-- (before the umask is applied). */
2307 new_fd = open(new_file, O_WRONLY|O_CREAT|O_APPEND,
2308 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
2312 journal_fh = fdopen(new_fd, "a");
2313 if (journal_fh == NULL)
/* journal_size is initialised from the stream's current position. */
2316 journal_size = ftell(journal_fh);
2317 RRDD_LOG(LOG_DEBUG, "started new journal %s", new_file);
2319 /* record the file in the journal set */
2320 rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, new_file);
2326 "JOURNALING DISABLED: Error while trying to create %s : %s",
2327 new_file, rrd_strerror(errno));
2329 "JOURNALING DISABLED: All values will be flushed at shutdown");
2332 config_flush_at_shutdown = 1;
2334 } /* }}} journal_new_file */
2336 /* MUST NOT hold journal_lock before calling this */
/* Rotates the journal sets.  Nothing is done when journaling is disabled
 * (journal_dir == NULL).  The rotation counter is incremented under
 * stats_lock.  Under journal_lock the current set becomes the old set and a
 * fresh, zeroed set is allocated as the current one.  After the lock has been
 * released, the files of the previous old set are unlinked and that set is
 * freed. */
2337 static void journal_rotate(void) /* {{{ */
2339 journal_set *old_js = NULL;
2341 if (journal_dir == NULL)
2344 RRDD_LOG(LOG_DEBUG, "rotating journals");
2346 pthread_mutex_lock(&stats_lock);
2347 ++stats_journal_rotate;
2348 pthread_mutex_unlock(&stats_lock);
2350 pthread_mutex_lock(&journal_lock);
2354 /* rotate the journal sets */
2355 old_js = journal_old;
2356 journal_old = journal_cur;
2357 journal_cur = calloc(1, sizeof(journal_set));
2359 if (journal_cur != NULL)
2362 RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2364 pthread_mutex_unlock(&journal_lock);
2366 journal_set_remove(old_js);
2367 journal_set_free (old_js);
2369 } /* }}} static void journal_rotate */
2371 /* MUST hold journal_lock when calling */
/* Shuts journaling down.  When config_flush_at_shutdown is set, the files of
 * both journal sets are removed from disk.  The other log message shows that
 * after an expedited shutdown the journals are kept for the next startup.
 * Both in-memory sets are freed at the end. */
2372 static void journal_done(void) /* {{{ */
2374 if (journal_cur == NULL)
2379 if (config_flush_at_shutdown)
2381 RRDD_LOG(LOG_INFO, "removing journals");
2382 journal_set_remove(journal_old);
2383 journal_set_remove(journal_cur);
2387 RRDD_LOG(LOG_INFO, "expedited shutdown; "
2388 "journals will be used at next startup");
2391 journal_set_free(journal_cur);
2392 journal_set_free(journal_old);
2395 } /* }}} static void journal_done */
/* Appends the line "<cmd> <args>\n" to the open journal.  The write and the
 * update of journal_size happen under journal_lock; journal_size is compared
 * against JOURNAL_MAX afterwards.  The number of characters written is added
 * to stats_journal_bytes under stats_lock. */
2397 static int journal_write(char *cmd, char *args) /* {{{ */
/* NOTE(review): journal_fh is tested before journal_lock is taken; confirm
 * that no other thread can close the journal between the test and the
 * fprintf() below. */
2401 if (journal_fh == NULL)
2404 pthread_mutex_lock(&journal_lock);
2405 chars = fprintf(journal_fh, "%s %s\n", cmd, args);
2406 journal_size += chars;
2408 if (journal_size > JOURNAL_MAX)
2411 pthread_mutex_unlock(&journal_lock);
2415 pthread_mutex_lock(&stats_lock);
2416 stats_journal_bytes += chars;
2417 pthread_mutex_unlock(&stats_lock);
2421 } /* }}} static int journal_write */
/* Replays one journal file.
 *
 * Before the file is opened it is checked with stat(): it has to be a regular
 * file, owned by daemon_uid, with none of the S_IWGRP / S_IWOTH bits set.
 * Each line is read with fgets(), stripped of its trailing newline and passed
 * to handle_request() with a NULL socket.  A line without a trailing newline
 * is reported as malformed.
 *
 * Returns 0 for a NULL file name, otherwise 1 when at least one entry was
 * counted in entry_cnt and 0 when none was. */
2423 static int journal_replay (const char *file) /* {{{ */
2429 char entry[CMD_MAX];
2432 if (file == NULL) return 0;
2435 char *reason = "unknown error";
2437 struct stat statbuf;
2439 memset(&statbuf, 0, sizeof(statbuf));
2440 if (stat(file, &statbuf) != 0)
2442 reason = "stat error";
2445 else if (!S_ISREG(statbuf.st_mode))
2447 reason = "not a regular file";
2450 if (statbuf.st_uid != daemon_uid)
2452 reason = "not owned by daemon user";
/* NOTE(review): the mask tests the group and other write bits, while the
 * reason text below says "user/group"; the message looks inaccurate. */
2455 if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
2457 reason = "must not be user/group writable";
2463 RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
2464 file, rrd_strerror(status), reason);
2469 fh = fopen(file, "r");
/* A missing file (ENOENT) is not logged as an error. */
2472 if (errno != ENOENT)
2473 RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
2474 file, rrd_strerror(errno));
2478 RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
2487 if (fgets(entry, sizeof(entry), fh) == NULL)
2489 entry_len = strlen(entry);
2491 /* check \n termination in case journal writing crashed mid-line */
2494 else if (entry[entry_len - 1] != '\n')
2496 RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
/* The newline is overwritten with the terminator, so entry_len now counts
 * the terminating NUL, which is what handle_request() asserts on. */
2501 entry[entry_len - 1] = '\0';
2503 if (handle_request(NULL, now, entry, entry_len) == 0)
2511 RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
2512 entry_cnt, fail_cnt);
2514 return entry_cnt > 0 ? 1 : 0;
2515 } /* }}} static int journal_replay */
/* qsort(3) comparator for an array of C strings: orders journal file names
 * with strcmp().  v1 and v2 point at array elements, i.e. at the string
 * pointers themselves, not at the characters.
 *
 * Returns a value less than, equal to or greater than zero, as strcmp()
 * does. */
static int journal_sort(const void *v1, const void *v2)
{
  /* Keep the const qualification that qsort() hands in instead of casting it
   * away; the comparator only ever reads the strings. */
  const char *const *jn1 = v1;
  const char *const *jn2 = v2;

  return strcmp(*jn1, *jn2);
}
/* Initialises journaling at startup; does nothing when journal_dir is NULL.
 *
 * Under journal_lock a fresh journal set is allocated, two legacy journal
 * names are renamed so that they sort correctly, and journal_dir is scanned
 * for entries whose name starts with JOURNAL_BASE.  The collected paths are
 * sorted with journal_sort() and replayed in that order.  When at least one
 * journal produced entries and config_flush_at_shutdown is set,
 * flush_old_values(-1) is called. */
2525 static void journal_init(void) /* {{{ */
2527 int had_journal = 0;
2529 struct dirent *dent;
2530 char path[PATH_MAX+1];
2532 if (journal_dir == NULL) return;
2534 pthread_mutex_lock(&journal_lock);
2536 journal_cur = calloc(1, sizeof(journal_set));
2537 if (journal_cur == NULL)
/* NOTE(review): the message below names journal_rotate although it is
 * emitted from journal_init. */
2539 RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2543 RRDD_LOG(LOG_INFO, "checking for journal files");
2545 /* Handle old journal files during transition. This gives them the
2546 * correct sort order. TODO: remove after first release
2549 char old_path[PATH_MAX+1];
2550 snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".old" );
2551 snprintf(path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0000");
2552 rename(old_path, path);
2554 snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE );
2555 snprintf(path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0001");
2556 rename(old_path, path);
2559 dir = opendir(journal_dir);
2561 RRDD_LOG(LOG_CRIT, "journal_init: opendir(%s) failed\n", journal_dir);
2564 while ((dent = readdir(dir)) != NULL)
2566 /* looks like a journal file? */
2567 if (strncmp(dent->d_name, JOURNAL_BASE, strlen(JOURNAL_BASE)))
2570 snprintf(path, PATH_MAX, "%s/%s", journal_dir, dent->d_name);
2572 if (!rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, path))
2574 RRDD_LOG(LOG_CRIT, "journal_init: cannot add journal file %s!",
/* Sort the paths with strcmp() so that they are replayed in name order. */
2581 qsort(journal_cur->files, journal_cur->files_num,
2582 sizeof(journal_cur->files[0]), journal_sort);
2584 for (uint i=0; i < journal_cur->files_num; i++)
2585 had_journal += journal_replay(journal_cur->files[i]);
2589 /* it must have been a crash. start a flush */
2590 if (had_journal && config_flush_at_shutdown)
2591 flush_old_values(-1);
2593 pthread_mutex_unlock(&journal_lock);
2595 RRDD_LOG(LOG_INFO, "journal processing complete");
2597 } /* }}} static void journal_init */
/* Releases the buffers owned by a socket structure.  `sock' must not be NULL
 * (asserted).  The read and write buffers are freed and their pointers reset
 * to NULL. */
2599 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
2601 assert(sock != NULL);
2603 free(sock->rbuf); sock->rbuf = NULL;
2604 free(sock->wbuf); sock->wbuf = NULL;
2606 } /* }}} void free_listen_socket */
/* Tears down a client connection; the visible part hands the socket
 * structure to free_listen_socket(). */
2608 static void close_connection(listen_socket_t *sock) /* {{{ */
2616 free_listen_socket(sock);
2618 } /* }}} void close_connection */
/* Thread entry point serving one client connection; `args' is the
 * connection's listen_socket_t.
 *
 * A read buffer of RBUF_SIZE bytes is allocated and the thread is counted in
 * connection_threads_num.  While the daemon is RUNNING the socket is polled
 * with a 500 ms timeout, incoming bytes are appended to the read buffer, and
 * every complete command returned by next_cmd() is passed to
 * handle_request().  On exit the connection is closed, the thread count is
 * decremented, and connection_threads_done is broadcast once the count
 * reaches zero. */
2620 static void *connection_thread_main (void *args) /* {{{ */
2622 listen_socket_t *sock;
2625 sock = (listen_socket_t *) args;
2628 /* init read buffers */
2629 sock->next_read = sock->next_cmd = 0;
2630 sock->rbuf = malloc(RBUF_SIZE);
2631 if (sock->rbuf == NULL)
2633 RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
2634 close_connection(sock);
2638 pthread_mutex_lock (&connection_threads_lock);
2639 connection_threads_num++;
2640 pthread_mutex_unlock (&connection_threads_lock);
2642 while (state == RUNNING)
2649 struct pollfd pollfd;
2653 pollfd.events = POLLIN | POLLPRI;
2656 status = poll (&pollfd, 1, /* timeout = */ 500);
2657 if (state != RUNNING)
2659 else if (status == 0) /* timeout */
2661 else if (status < 0) /* error */
/* NOTE(review): in this branch status is negative, so comparing it with
 * EINTR is always true; errno is presumably what was meant. */
2664 if (status != EINTR)
2665 RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2669 if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2671 else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2673 RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2674 "poll(2) returned something unexpected: %#04hx",
/* New data is appended after the bytes still waiting in the read buffer. */
2679 rbytes = read(fd, sock->rbuf + sock->next_read,
2680 RBUF_SIZE - sock->next_read);
2683 RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2686 else if (rbytes == 0)
2689 sock->next_read += rbytes;
/* Inside a batch, the batch start time is used as "now". */
2691 if (sock->batch_start)
2692 now = sock->batch_start;
/* cmd_len + 1 includes the terminating NUL that handle_request() asserts. */
2696 while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2698 status = handle_request (sock, now, cmd, cmd_len+1);
2705 close_connection(sock);
2707 /* Remove this thread from the connection threads list */
2708 pthread_mutex_lock (&connection_threads_lock);
2709 connection_threads_num--;
2710 if (connection_threads_num <= 0)
2711 pthread_cond_broadcast(&connection_threads_done);
2712 pthread_mutex_unlock (&connection_threads_lock);
2715 } /* }}} void *connection_thread_main */
/* Opens a UNIX domain listening socket described by `sock' and appends it to
 * the global listen_fds array.
 *
 * An optional "unix:" prefix is stripped from the path.  The directory part
 * of the path is created with rrd_mkdir_p(), listen_fds is grown by one
 * entry initialised as a copy of `sock', and the socket is created, bound
 * and put into listening state with a backlog of 10.  Group ownership and
 * file permissions of the socket file are adjusted when `sock' requests it.
 * Errors are reported on stderr. */
2717 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2720 struct sockaddr_un sa;
2721 listen_socket_t *temp;
2724 char *path_copy, *dir;
2727 if (strncmp(path, "unix:", strlen("unix:")) == 0)
2728 path += strlen("unix:");
2730 /* dirname may modify its argument */
2731 path_copy = strdup(path);
2732 if (path_copy == NULL)
2734 fprintf(stderr, "rrdcached: strdup(): %s\n",
2735 rrd_strerror(errno));
2739 dir = dirname(path_copy);
2740 if (rrd_mkdir_p(dir, 0777) != 0)
2742 fprintf(stderr, "Failed to create socket directory '%s': %s\n",
2743 dir, rrd_strerror(errno));
/* Grow the global array by one slot; the new slot starts as a copy of the
 * caller's template. */
2749 temp = (listen_socket_t *) rrd_realloc (listen_fds,
2750 sizeof (listen_fds[0]) * (listen_fds_num + 1));
2753 fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2757 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2759 fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2762 fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2763 rrd_strerror(errno));
/* sa is zeroed first and at most sizeof(sun_path) - 1 bytes are copied, so
 * sun_path stays NUL-terminated (an over-long path is truncated silently). */
2767 memset (&sa, 0, sizeof (sa));
2768 sa.sun_family = AF_UNIX;
2769 strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2771 /* if we've gotten this far, we own the pid file. any daemon started
2772 * with the same args must not be alive. therefore, ensure that we can
2773 * create the socket...
2777 status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2780 fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2781 path, rrd_strerror(errno));
2786 /* tweak the sockets group ownership */
2787 if (sock->socket_group != (gid_t)-1)
2789 if ( (chown(path, getuid(), sock->socket_group) != 0) ||
2790 (chmod(path, (S_IRUSR|S_IWUSR|S_IXUSR | S_IRGRP|S_IWGRP)) != 0) )
2792 fprintf(stderr, "rrdcached: failed to set socket group permissions (%s)\n", strerror(errno));
/* An explicitly configured mode is applied after the group adjustment
 * above. */
2796 if (sock->socket_permissions != (mode_t)-1)
2798 if (chmod(path, sock->socket_permissions) != 0)
2799 fprintf(stderr, "rrdcached: failed to set socket file permissions (%o): %s\n",
2800 (unsigned int)sock->socket_permissions, strerror(errno));
2803 status = listen (fd, /* backlog = */ 10);
2806 fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2807 path, rrd_strerror(errno));
/* Record the descriptor, the family and the prefix-stripped path in the new
 * slot. */
2813 listen_fds[listen_fds_num].fd = fd;
2814 listen_fds[listen_fds_num].family = PF_UNIX;
2815 strncpy(listen_fds[listen_fds_num].addr, path,
2816 sizeof (listen_fds[listen_fds_num].addr) - 1);
2820 } /* }}} int open_listen_socket_unix */
/* Opens TCP listening sockets for the address stored in sock->addr.
 *
 * The address is copied into a local buffer and split into host and port:
 * either the bracketed "[addr]:port" form or "host:port" split at the last
 * colon.  getaddrinfo() resolves it, with RRDCACHED_DEFAULT_PORT used when no
 * port was given.  For every result the global listen_fds array is grown by
 * one slot initialised as a copy of `sock', and a socket is created with
 * SO_REUSEADDR, bound and put into listening state with a backlog of 10.
 * Errors are reported on stderr. */
2822 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2824 struct addrinfo ai_hints;
2825 struct addrinfo *ai_res;
2826 struct addrinfo *ai_ptr;
2827 char addr_copy[NI_MAXHOST];
/* Work on a bounded, explicitly terminated copy of the address. */
2832 strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
2833 addr_copy[sizeof (addr_copy) - 1] = 0;
2836 memset (&ai_hints, 0, sizeof (ai_hints));
2837 ai_hints.ai_flags = 0;
2838 #ifdef AI_ADDRCONFIG
2839 ai_hints.ai_flags |= AI_ADDRCONFIG;
2841 ai_hints.ai_family = AF_UNSPEC;
2842 ai_hints.ai_socktype = SOCK_STREAM;
2845 if (*addr == '[') /* IPv6+port format */
2847 /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2850 port = strchr (addr, ']');
2853 fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
2861 else if (*port == 0)
2865 fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
2868 } /* if (*addr == '[') */
/* Without brackets, the last ':' in the address is located with rindex(). */
2871 port = rindex(addr, ':');
2879 status = getaddrinfo (addr,
2880 port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2881 &ai_hints, &ai_res);
2884 fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
2885 addr, gai_strerror (status));
2889 for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2892 listen_socket_t *temp;
2895 temp = (listen_socket_t *) rrd_realloc (listen_fds,
2896 sizeof (listen_fds[0]) * (listen_fds_num + 1));
2900 "rrdcached: open_listen_socket_network: realloc failed.\n");
2904 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2906 fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2909 fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
2910 rrd_strerror(errno));
/* The result of setsockopt() is not checked. */
2914 setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2916 status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2919 fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2920 sock->addr, rrd_strerror(errno));
2925 status = listen (fd, /* backlog = */ 10);
/* NOTE(review): the message below ends in "\n." while the bind() message
 * above ends in ".\n"; the period and newline look transposed. */
2928 fprintf (stderr, "rrdcached: listen(%s) failed: %s\n.",
2929 sock->addr, rrd_strerror(errno));
2931 freeaddrinfo(ai_res);
2935 listen_fds[listen_fds_num].fd = fd;
2936 listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2938 } /* for (ai_ptr) */
2940 freeaddrinfo(ai_res);
2942 } /* }}} static int open_listen_socket_network */
/* Opens the listening socket described by `sock'.  An address that starts
 * with "unix:" or with '/' is handled by open_listen_socket_unix(); every
 * other address goes to open_listen_socket_network().  Returns the chosen
 * helper's result. */
2944 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2946 assert(sock != NULL);
2947 assert(sock->addr != NULL);
2949 if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2950 || sock->addr[0] == '/')
2951 return (open_listen_socket_unix(sock));
2953 return (open_listen_socket_network(sock));
2954 } /* }}} int open_listen_socket */
/* Closes every descriptor in listen_fds.  For UNIX domain sockets the socket
 * file is also removed from the file system with unlink(). */
2956 static int close_listen_sockets (void) /* {{{ */
2960 for (i = 0; i < listen_fds_num; i++)
2962 close (listen_fds[i].fd);
2964 if (listen_fds[i].family == PF_UNIX)
2965 unlink(listen_fds[i].addr);
2973 } /* }}} int close_listen_sockets */
/* Thread entry point that accepts client connections.
 *
 * At least one listening socket must exist.  While the daemon is RUNNING all
 * listening descriptors are polled with a 1000 ms timeout.  For each readable
 * descriptor a listen_socket_t is allocated as a copy of the listening
 * entry, the connection is accepted into it and a detached thread running
 * connection_thread_main() is started for it.  On shutdown the listening
 * sockets are closed and the function waits on connection_threads_done until
 * connection_threads_num has dropped to zero. */
2975 static void *listen_thread_main (void UNUSED(*args)) /* {{{ */
2977 struct pollfd *pollfds;
2982 if (listen_fds_num < 1)
2984 RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2988 pollfds_num = listen_fds_num;
2989 pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2990 if (pollfds == NULL)
2992 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2995 memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2997 RRDD_LOG(LOG_INFO, "listening for connections");
2999 while (state == RUNNING)
/* The poll set is rebuilt from listen_fds on every iteration. */
3001 for (i = 0; i < pollfds_num; i++)
3003 pollfds[i].fd = listen_fds[i].fd;
3004 pollfds[i].events = POLLIN | POLLPRI;
3005 pollfds[i].revents = 0;
3008 status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
3009 if (state != RUNNING)
3011 else if (status == 0) /* timeout */
3013 else if (status < 0) /* error */
/* NOTE(review): in this branch status is negative, so comparing it with
 * EINTR is always true; errno is presumably what was meant. */
3016 if (status != EINTR)
3018 RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
3023 for (i = 0; i < pollfds_num; i++)
3025 listen_socket_t *client_sock;
3026 struct sockaddr_storage client_sa;
3027 socklen_t client_sa_size;
3029 pthread_attr_t attr;
3031 if (pollfds[i].revents == 0)
3034 if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
3036 RRDD_LOG (LOG_ERR, "listen_thread_main: "
3037 "poll(2) returned something unexpected for listen FD #%i.",
/* The client structure starts as a copy of the listening socket's entry;
 * only the descriptor is replaced by the accepted one below. */
3042 client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
3043 if (client_sock == NULL)
3045 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
3048 memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
3050 client_sa_size = sizeof (client_sa);
3051 client_sock->fd = accept (pollfds[i].fd,
3052 (struct sockaddr *) &client_sa, &client_sa_size);
3053 if (client_sock->fd < 0)
3055 RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
/* Connection threads are created detached, so they are never joined. */
3060 pthread_attr_init (&attr);
3061 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
3063 status = pthread_create (&tid, &attr, connection_thread_main,
3067 RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
3068 close_connection(client_sock);
3071 } /* for (pollfds_num) */
3072 } /* while (state == RUNNING) */
3074 RRDD_LOG(LOG_INFO, "starting shutdown");
3076 close_listen_sockets ();
/* Wait until every connection thread has signalled completion. */
3078 pthread_mutex_lock (&connection_threads_lock);
3079 while (connection_threads_num > 0)
3080 pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
3081 pthread_mutex_unlock (&connection_threads_lock);
3086 } /* }}} void *listen_thread_main */
/*
 * Process start-up. On the visible lines this function:
 *   - stores geteuid() in daemon_uid and obtains pid_fd from open_pidfile()
 *     or check_pidfile();
 *   - opens each configured listen address and then frees the list, or
 *     opens default_socket on RRDCACHED_DEFAULT_ADDRESS;
 *   - reports a fatal error when listen_fds_num is still below 1;
 *   - when stay_foreground is not set, forks and re-opens the first three
 *     descriptors on /dev/null;
 *   - changes into base_dir, installs the signal handlers, opens syslog and
 *     creates cache_tree.
 * On the visible success path the result of write_pidfile(pid_fd) is
 * returned.
 *
 * NOTE(review): this listing omits short source lines (braces, `else`,
 * `return`, declarations), so the conditions between some of the steps and
 * the failure return values are not shown.
 */
3088 static int daemonize (void) /* {{{ */
3093 daemon_uid = geteuid();
/* Try to create the pid file exclusively; check_pidfile() is the visible
 * alternative (the condition between the two calls is not shown). */
3095 pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
3097 pid_fd = check_pidfile();
3101 /* open all the listen sockets */
3102 if (config_listen_address_list_len > 0)
/* NOTE(review): the return value of open_listen_socket() is not checked on
 * the visible lines; only the listen_fds_num test below catches the case
 * where no socket could be opened at all. */
3104 for (size_t i = 0; i < config_listen_address_list_len; i++)
3105 open_listen_socket (config_listen_address_list[i]);
/* The configured list is released once the sockets have been opened. */
3107 rrd_free_ptrs((void ***) &config_listen_address_list,
3108 &config_listen_address_list_len);
/* Copy the default address, leaving room for the terminating NUL that is
 * written explicitly on the following line. */
3112 strncpy(default_socket.addr, RRDCACHED_DEFAULT_ADDRESS,
3113 sizeof(default_socket.addr) - 1);
3114 default_socket.addr[sizeof(default_socket.addr) - 1] = '\0';
3115 open_listen_socket (&default_socket);
3118 if (listen_fds_num < 1)
3120 fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
3124 if (!stay_foreground)
3131 fprintf (stderr, "daemonize: fork(2) failed.\n");
3137 /* Become session leader */
3140 /* Open the first three file descriptors to /dev/null */
/* open() followed by two dup(0) calls; presumably descriptors 0-2 were
 * closed on the lines not shown -- TODO confirm. */
3145 open ("/dev/null", O_RDWR);
3146 if (dup(0) == -1 || dup(0) == -1){
/* NOTE(review): the message below is misspelled and carries a trailing
 * newline, unlike the other RRDD_LOG calls in this function. */
3147 RRDD_LOG (LOG_ERR, "faild to run dup.\n");
3149 } /* if (!stay_foreground) */
/* NOTE(review): the comment below says /tmp, but the code changes into
 * base_dir, which is taken from config_base_dir when that is not NULL; the
 * fallback value is on a line not shown here. */
3151 /* Change into the /tmp directory. */
3152 base_dir = (config_base_dir != NULL)
3156 if (chdir (base_dir) != 0)
3158 fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
3162 install_signal_handlers();
3164 openlog ("rrdcached", LOG_PID, LOG_DAEMON);
3165 RRDD_LOG(LOG_INFO, "starting up");
/* Keys are ordered with strcmp; no key destroy function is registered,
 * and free_cache_item is registered as the value destroy function. */
3167 cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
3168 (GDestroyNotify) free_cache_item);
3169 if (cache_tree == NULL)
3171 RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
3175 return write_pidfile (pid_fd);
3180 } /* }}} int daemonize */
/*
 * Shutdown path. On the visible lines it broadcasts flush_cond and joins
 * the flush thread, broadcasts queue_cond and joins all
 * config_queue_threads queue threads, frees queue_threads and
 * config_base_dir, destroys cache_tree while holding cache_lock, takes
 * journal_lock, logs "goodbye" and frees config_pid_file.
 *
 * NOTE(review): this listing omits short source lines, so the return value
 * and any calls between the visible statements are not shown.
 */
3182 static int cleanup (void) /* {{{ */
/* Signal the flush thread's condition variable, then wait for it to end. */
3184 pthread_cond_broadcast (&flush_cond);
3185 pthread_join (flush_thread, NULL);
/* Same for every queue thread. */
3187 pthread_cond_broadcast (&queue_cond);
3188 for (int i = 0; i < config_queue_threads; i++)
3189 pthread_join (queue_threads[i], NULL);
/* With flush-at-shutdown enabled the queue is expected to be empty here. */
3191 if (config_flush_at_shutdown)
3193 assert(cache_queue_head == NULL);
3194 RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
3197 free(queue_threads);
3198 free(config_base_dir);
/* NOTE(review): no matching pthread_mutex_unlock() for cache_lock or
 * journal_lock appears on the visible lines -- confirm whether the locks
 * are intentionally kept until the process exits. */
3200 pthread_mutex_lock(&cache_lock);
3201 g_tree_destroy(cache_tree);
3203 pthread_mutex_lock(&journal_lock);
3206 RRDD_LOG(LOG_INFO, "goodbye");
3210 free(config_pid_file);
3213 } /* }}} int cleanup */
/*
 * Parse the command line with getopt() and fill in the daemon's
 * configuration: the list of listen sockets (config_listen_address_list),
 * the group / mode / permission defaults held in default_socket that are
 * applied to sockets added afterwards, the flush and write intervals, write
 * jitter, queue thread count, base directory, pid file, journal directory
 * and allocation chunk size. After the loop it prints warnings for
 * questionable combinations and forces config_flush_at_shutdown when no
 * journal directory was given.
 *
 * NOTE(review): this listing omits short source lines, including every
 * `case` label, `break`, most `else` keywords and the return statements.
 * The option letters named in the comments below are taken from the usage
 * text and the error messages and should be confirmed against the full
 * source.
 */
3215 static int read_options (int argc, char **argv) /* {{{ */
/* default_socket starts with no permissions and with group and mode set to
 * the all-ones sentinel value (-1 cast to the field type). */
3220 socket_permission_clear (&default_socket);
3222 default_socket.socket_group = (gid_t)-1;
3223 default_socket.socket_permissions = (mode_t)-1;
3225 while ((option = getopt(argc, argv, "Ogl:s:m:P:f:w:z:t:Bb:p:Fj:a:h?")) != -1)
/* -O (presumed): do not let CREATE overwrite existing files. */
3230 opt_no_overwrite = 1;
/* -l <address> (presumed): append a new listen socket description. */
3239 listen_socket_t *new;
3241 new = malloc(sizeof(listen_socket_t));
3244 fprintf(stderr, "read_options: malloc failed.\n");
3247 memset(new, 0, sizeof(listen_socket_t));
/* At most sizeof(addr) - 1 bytes are copied into the zeroed structure, so
 * addr stays NUL-terminated; a longer argument is silently truncated. */
3249 strncpy(new->addr, optarg, sizeof(new->addr)-1);
3251 /* Add permissions to the socket {{{ */
/* A non-zero permission mask in default_socket is inherited by the new
 * socket; otherwise every entry of list_of_commands is granted. */
3252 if (default_socket.permissions != 0)
3254 socket_permission_copy (new, &default_socket);
3256 else /* if (default_socket.permissions == 0) */
3258 /* Add permission for ALL commands to the socket. */
3260 for (i = 0; i < list_of_commands_len; i++)
3262 status = socket_permission_add (new, list_of_commands[i].cmd);
3265 fprintf (stderr, "read_options: Adding permission \"%s\" to "
3266 "socket failed. This should never happen, ever! Sorry.\n",
3267 list_of_commands[i].cmd);
3272 /* }}} Done adding permissions. */
/* The socket also inherits the current default group and file mode. */
3274 new->socket_group = default_socket.socket_group;
3275 new->socket_permissions = default_socket.socket_permissions;
3277 if (!rrd_add_ptr((void ***)&config_listen_address_list,
3278 &config_listen_address_list_len, new))
3280 fprintf(stderr, "read_options: rrd_add_ptr failed.\n");
3286 /* set socket group permissions */
/* -s <id|name> (presumed): the argument is tried as a numeric gid first
 * and getgrnam() is the visible alternative lookup.
 * NOTE(review): the errno test below is only meaningful if errno was reset
 * before strtoul(); listing line 3291 is not shown -- confirm. */
3292 group_gid = strtoul(optarg, NULL, 10);
3293 if (errno != EINVAL && group_gid>0)
3295 /* we were passed a number */
3296 grp = getgrgid(group_gid);
3300 grp = getgrnam(optarg);
3305 default_socket.socket_group = grp->gr_gid;
3309 /* no idea what the user wanted... */
3310 fprintf (stderr, "read_options: couldn't map \"%s\" to a group, Sorry\n", optarg);
3316 /* set socket file permissions */
/* -m <mode> (presumed): parsed as octal; an empty string, trailing
 * characters, or a value outside 0..07777 is rejected. */
3320 char *endptr = NULL;
3322 tmp = strtol (optarg, &endptr, 8);
3323 if ((endptr == optarg) || (! endptr) || (*endptr != '\0')
3324 || (tmp > 07777) || (tmp < 0)) {
3325 fprintf (stderr, "read_options: Invalid file mode \"%s\".\n",
3330 default_socket.socket_permissions = (mode_t)tmp;
/* -P <perms> (presumed): reset the default permissions, then add every
 * token of the argument; tokens are separated by commas and/or spaces. */
3341 socket_permission_clear (&default_socket);
3343 optcopy = strdup (optarg);
3346 while ((ptr = strtok_r (dummy, ", ", &saveptr)) != NULL)
3349 status = socket_permission_add (&default_socket, ptr);
3352 fprintf (stderr, "read_options: Adding permission \"%s\" to "
3353 "socket failed. Most likely, this permission doesn't "
3354 "exist. Check your command line.\n", ptr);
/* -f <seconds> (presumed): flush interval.
 * NOTE(review): atoi() cannot distinguish "0" from unparsable input; the
 * range check sits on a line not shown here. The same applies to the other
 * atoi() calls below. */
3367 temp = atoi (optarg);
3369 config_flush_interval = temp;
3372 fprintf (stderr, "Invalid flush interval: %s\n", optarg);
/* -w <seconds> (presumed): write interval. */
3382 temp = atoi (optarg);
3384 config_write_interval = temp;
3387 fprintf (stderr, "Invalid write interval: %s\n", optarg);
/* -z <delay>: write jitter. */
3397 temp = atoi(optarg);
3399 config_write_jitter = temp;
3402 fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
/* -t <threads>: number of queue threads. */
3412 threads = atoi(optarg);
3414 config_queue_threads = threads;
3417 fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
/* -B (presumed): restrict file access to the base directory. */
3424 config_write_base_only = 1;
/* -b <dir>: base directory. A previous value is replaced, the directory
 * is created if needed, and its canonical path is computed. */
3430 char base_realpath[PATH_MAX];
3432 if (config_base_dir != NULL)
3433 free (config_base_dir);
3434 config_base_dir = strdup (optarg);
3435 if (config_base_dir == NULL)
3437 fprintf (stderr, "read_options: strdup failed.\n");
3441 if (rrd_mkdir_p (config_base_dir, 0777) != 0)
3443 fprintf (stderr, "Failed to create base directory '%s': %s\n",
3444 config_base_dir, rrd_strerror (errno));
3448 /* make sure that the base directory is not resolved via
3449 * symbolic links. this makes some performance-enhancing
3450 * assumptions possible (we don't have to resolve paths
3451 * that start with a "/")
3453 if (realpath(config_base_dir, base_realpath) == NULL)
3455 fprintf (stderr, "Failed to canonicalize the base directory '%s': "
3456 "%s\n", config_base_dir, rrd_strerror(errno));
/* Strip trailing '/' characters from the configured directory. */
3460 len = strlen (config_base_dir);
3461 while ((len > 0) && (config_base_dir[len - 1] == '/'))
3463 config_base_dir[len - 1] = 0;
3469 fprintf (stderr, "Invalid base directory: %s\n", optarg);
3473 _config_base_dir_len = len;
/* Same trimming for the canonical path, so the two can be compared. */
3475 len = strlen (base_realpath);
3476 while ((len > 0) && (base_realpath[len - 1] == '/'))
3478 base_realpath[len - 1] = '\0';
/* A difference between the given and the canonical path triggers the
 * message about file system links below. */
3482 if (strncmp(config_base_dir,
3483 base_realpath, sizeof(base_realpath)) != 0)
3486 "Base directory (-b) resolved via file system links!\n"
3487 "Please consult rrdcached '-b' documentation!\n"
3488 "Consider specifying the real directory (%s)\n",
/* -p <file> (presumed): pid file location; a previous value is replaced. */
3497 if (config_pid_file != NULL)
3498 free (config_pid_file);
3499 config_pid_file = strdup (optarg);
3500 if (config_pid_file == NULL)
3502 fprintf (stderr, "read_options: strdup failed.\n");
/* -F (presumed): always flush at shutdown. */
3509 config_flush_at_shutdown = 1;
/* -j <dir>: journal directory.
 * NOTE(review): realpath() returns NULL on failure (for example when the
 * directory does not exist yet) and that result is passed straight to
 * strdup() on the line below; the directory is only created afterwards by
 * rrd_mkdir_p(). The strdup() result is not checked on the visible lines
 * either. */
3514 char journal_dir_actual[PATH_MAX];
3516 dir = journal_dir = strdup(realpath((const char *)optarg, journal_dir_actual));
3518 status = rrd_mkdir_p(dir, 0777);
3521 fprintf(stderr, "Failed to create journal directory '%s': %s\n",
3522 dir, rrd_strerror(errno));
/* The journal directory must be readable, writable and searchable. */
3526 if (access(dir, R_OK|W_OK|X_OK) != 0)
3528 fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
3529 errno ? rrd_strerror(errno) : "");
/* -a <size> (presumed): memory allocation chunk size. */
3537 int temp = atoi(optarg);
3539 config_alloc_chunk = temp;
3542 fprintf(stderr, "Invalid allocation size: %s\n", optarg);
/* Usage text, printed to stdout. */
3550 printf ("RRDCacheD %s\n"
3551 "Copyright (C) 2008,2009 Florian octo Forster and Kevin Brintnall\n"
3553 "Usage: rrdcached [options]\n"
3555 "Valid options are:\n"
3556 " -l <address> Socket address to listen to.\n"
3557 " -P <perms> Sets the permissions to assign to all following "
3559 " -w <seconds> Interval in which to write data.\n"
3560 " -z <delay> Delay writes up to <delay> seconds to spread load\n"
3561 " -t <threads> Number of write threads.\n"
3562 " -f <seconds> Interval in which to flush dead data.\n"
3563 " -p <file> Location of the PID-file.\n"
3564 " -b <dir> Base directory to change to.\n"
3565 " -B Restrict file access to paths within -b <dir>\n"
3566 " -g Do not fork and run in the foreground.\n"
3567 " -j <dir> Directory in which to create the journal files.\n"
3568 " -F Always flush all updates at shutdown\n"
3569 " -s <id|name> Group owner of all following UNIX sockets\n"
3570 " (the socket will also have read/write permissions "
3572 " -m <mode> File permissions (octal) of all following UNIX "
3574 " -a <size> Memory allocation chunk size. Default is 1.\n"
3575 " -O Do not allow CREATE commands to overwrite existing\n"
3576 " files, even if asked to.\n"
3578 "For more information and a detailed description of all options "
3580 "to the rrdcached(1) manual page.\n",
3587 } /* switch (option) */
3588 } /* while (getopt) */
3590 /* advise the user when values are not sane */
3591 if (config_flush_interval < 2 * config_write_interval)
3592 fprintf(stderr, "WARNING: flush interval (-f) should be at least"
3593 " 2x write interval (-w) !\n");
3594 if (config_write_jitter > config_write_interval)
3595 fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
3596 " write interval (-w) !\n");
3598 if (config_write_base_only && config_base_dir == NULL)
3599 fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
3600 " Consult the rrdcached documentation\n");
/* Without a journal directory, flushing at shutdown is always enabled. */
3602 if (journal_dir == NULL)
3603 config_flush_at_shutdown = 1;
3606 } /* }}} int read_options */
/*
 * Program entry point. On the visible lines it parses the options with
 * read_options(), runs daemonize(), allocates and starts
 * config_queue_threads threads running queue_thread_main, starts one thread
 * running flush_thread_main, and then calls listen_thread_main() directly
 * in the calling thread.
 *
 * NOTE(review): this listing omits short source lines, so the status
 * checks, the exit codes and everything after listen_thread_main() returns
 * are not shown.
 */
3608 int main (int argc, char **argv)
3612 status = read_options (argc, argv);
3620 status = daemonize ();
3623 fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
3629 /* start the queue threads */
3630 queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
3631 if (queue_threads == NULL)
3633 RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
3637 for (int i = 0; i < config_queue_threads; i++)
/* NOTE(review): calloc() above already zeroed the array, so this memset is
 * redundant. */
3639 memset (&queue_threads[i], 0, sizeof (*queue_threads));
3640 status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
3643 RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
3649 /* start the flush thread */
3650 memset(&flush_thread, 0, sizeof(flush_thread));
3651 status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
3654 RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
/* The listener loop runs in this thread rather than in a new one. */
3659 listen_thread_main (NULL);
3666 * vim: set sw=2 sts=2 ts=8 et fdm=marker :