2 * RRDTool - src/rrd_daemon.c
3 * Copyright (C) 2008-2010 Florian octo Forster
4 * Copyright (C) 2008,2009 Kevin Brintnall
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; only version 2 of the License is applicable.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
20 * Florian octo Forster <octo at verplant.org>
21 * kevin brintnall <kbrint@rufus.net>
26 * First tell the compiler to stick to the C99 and POSIX standards as close as
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
46 #define _XOPEN_SOURCE 500
63 * Now for some includes..
66 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__) && !defined(HAVE_CONFIG_H)
67 #include "../win32/config.h"
70 #include "../rrd_config.h"
75 #include "rrd_client.h"
87 #include <sys/socket.h>
95 #include <sys/types.h>
107 #include <sys/time.h>
112 #include <glib-2.0/glib.h>
115 #define RRDD_LOG(severity, ...) \
117 if (stay_foreground) { \
118 fprintf(stderr, __VA_ARGS__); \
119 fprintf(stderr, "\n"); } \
120 syslog ((severity), __VA_ARGS__); \
126 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
128 struct listen_socket_s
131 char addr[PATH_MAX + 1];
134 /* state for BATCH processing */
146 uint32_t permissions;
149 mode_t socket_permissions;
151 typedef struct listen_socket_s listen_socket_t;
154 typedef struct command_s command_t;
155 /* note: guard against "unused" warnings in the handlers */
156 #define DISPATCH_PROTO listen_socket_t UNUSED(*sock),\
158 char UNUSED(*buffer),\
159 size_t UNUSED(buffer_size)
161 #define HANDLER_PROTO command_t UNUSED(*cmd),\
166 int (*handler)(HANDLER_PROTO);
168 char context; /* where we expect to see it */
169 #define CMD_CONTEXT_CLIENT (1<<0)
170 #define CMD_CONTEXT_BATCH (1<<1)
171 #define CMD_CONTEXT_JOURNAL (1<<2)
172 #define CMD_CONTEXT_ANY (0x7f)
179 typedef struct cache_item_s cache_item_t;
184 size_t values_num; /* number of valid pointers */
185 size_t values_alloc; /* number of allocated pointers */
186 time_t last_flush_time;
187 time_t last_update_stamp;
188 #define CI_FLAGS_IN_TREE (1<<0)
189 #define CI_FLAGS_IN_QUEUE (1<<1)
191 pthread_cond_t flushed;
196 struct callback_flush_data_s
203 typedef struct callback_flush_data_s callback_flush_data_t;
210 typedef enum queue_side_e queue_side_t;
212 /* describe a set of journal files */
218 /* max length of socket command or response */
220 #define RBUF_SIZE (CMD_MAX*2)
225 static int stay_foreground = 0;
226 static uid_t daemon_uid;
228 static listen_socket_t *listen_fds = NULL;
229 static size_t listen_fds_num = 0;
231 static listen_socket_t default_socket;
234 RUNNING, /* normal operation */
235 FLUSHING, /* flushing remaining values */
236 SHUTDOWN /* shutting down */
239 static pthread_t *queue_threads;
240 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
241 static int config_queue_threads = 4;
243 static pthread_t flush_thread;
244 static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER;
246 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
247 static pthread_cond_t connection_threads_done = PTHREAD_COND_INITIALIZER;
248 static int connection_threads_num = 0;
251 static GTree *cache_tree = NULL;
252 static cache_item_t *cache_queue_head = NULL;
253 static cache_item_t *cache_queue_tail = NULL;
254 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
256 static int config_write_interval = 300;
257 static int config_write_jitter = 0;
258 static int config_flush_interval = 3600;
259 static int config_flush_at_shutdown = 0;
260 static char *config_pid_file = NULL;
261 static char *config_base_dir = NULL;
262 static size_t _config_base_dir_len = 0;
263 static int config_write_base_only = 0;
264 static size_t config_alloc_chunk = 1;
266 static listen_socket_t **config_listen_address_list = NULL;
267 static size_t config_listen_address_list_len = 0;
269 static uint64_t stats_queue_length = 0;
270 static uint64_t stats_updates_received = 0;
271 static uint64_t stats_flush_received = 0;
272 static uint64_t stats_updates_written = 0;
273 static uint64_t stats_data_sets_written = 0;
274 static uint64_t stats_journal_bytes = 0;
275 static uint64_t stats_journal_rotate = 0;
276 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
278 static int opt_no_overwrite = 0;
280 /* Journaled updates */
281 #define JOURNAL_REPLAY(s) ((s) == NULL)
282 #define JOURNAL_BASE "rrd.journal"
283 static journal_set *journal_cur = NULL;
284 static journal_set *journal_old = NULL;
285 static char *journal_dir = NULL;
286 static FILE *journal_fh = NULL; /* current journal file handle */
287 static long journal_size = 0; /* current journal size */
288 #define JOURNAL_MAX (1 * 1024 * 1024 * 1024)
289 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
290 static int journal_write(char *cmd, char *args);
291 static void journal_done(void);
292 static void journal_rotate(void);
294 /* prototypes for forward references */
295 static int handle_request_help (HANDLER_PROTO);
300 static void sig_common (const char *sig) /* {{{ */
302 RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
304 pthread_cond_broadcast(&flush_cond);
305 pthread_cond_broadcast(&queue_cond);
306 } /* }}} void sig_common */
308 static void sig_int_handler (int UNUSED(s)) /* {{{ */
311 } /* }}} void sig_int_handler */
313 static void sig_term_handler (int UNUSED(s)) /* {{{ */
316 } /* }}} void sig_term_handler */
318 static void sig_usr1_handler (int UNUSED(s)) /* {{{ */
320 config_flush_at_shutdown = 1;
322 } /* }}} void sig_usr1_handler */
324 static void sig_usr2_handler (int UNUSED(s)) /* {{{ */
326 config_flush_at_shutdown = 0;
328 } /* }}} void sig_usr2_handler */
330 static void install_signal_handlers(void) /* {{{ */
332 /* These structures are static, because `sigaction' behaves weird if the are
334 static struct sigaction sa_int;
335 static struct sigaction sa_term;
336 static struct sigaction sa_pipe;
337 static struct sigaction sa_usr1;
338 static struct sigaction sa_usr2;
340 /* Install signal handlers */
341 memset (&sa_int, 0, sizeof (sa_int));
342 sa_int.sa_handler = sig_int_handler;
343 sigaction (SIGINT, &sa_int, NULL);
345 memset (&sa_term, 0, sizeof (sa_term));
346 sa_term.sa_handler = sig_term_handler;
347 sigaction (SIGTERM, &sa_term, NULL);
349 memset (&sa_pipe, 0, sizeof (sa_pipe));
350 sa_pipe.sa_handler = SIG_IGN;
351 sigaction (SIGPIPE, &sa_pipe, NULL);
353 memset (&sa_pipe, 0, sizeof (sa_usr1));
354 sa_usr1.sa_handler = sig_usr1_handler;
355 sigaction (SIGUSR1, &sa_usr1, NULL);
357 memset (&sa_usr2, 0, sizeof (sa_usr2));
358 sa_usr2.sa_handler = sig_usr2_handler;
359 sigaction (SIGUSR2, &sa_usr2, NULL);
361 } /* }}} void install_signal_handlers */
363 static int open_pidfile(char *action, int oflag) /* {{{ */
367 char *file_copy, *dir;
369 file = (config_pid_file != NULL)
371 : LOCALSTATEDIR "/run/rrdcached.pid";
373 /* dirname may modify its argument */
374 file_copy = strdup(file);
375 if (file_copy == NULL)
377 fprintf(stderr, "rrdcached: strdup(): %s\n",
378 rrd_strerror(errno));
382 dir = dirname(file_copy);
383 if (rrd_mkdir_p(dir, 0777) != 0)
385 fprintf(stderr, "Failed to create pidfile directory '%s': %s\n",
386 dir, rrd_strerror(errno));
392 fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH);
394 fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n",
395 action, file, rrd_strerror(errno));
398 } /* }}} static int open_pidfile */
/*
 * Check an existing PID file to see whether a daemon is already running.
 *
 * Opens the PID file read/write, parses the PID stored in it and probes that
 * process with kill(pid, 0).  If a different, signalable process exists the
 * function reports a fatal error.  Otherwise the file is considered stale
 * and is truncated so the caller can write its own PID into it.
 *
 * Returns the open PID file descriptor on success.  On failure the
 * descriptor is closed again and a negative value is returned.
 */
static int check_pidfile(void)
{
  int pid_fd;
  pid_t pid;
  char pid_str[16];
  ssize_t nread;

  pid_fd = open_pidfile("open", O_RDWR);
  if (pid_fd < 0)
    return pid_fd;

  /* Leave room for the terminator: read() does not NUL-terminate, and the
   * buffer is parsed as a C string below. */
  nread = read(pid_fd, pid_str, sizeof(pid_str) - 1);
  if (nread <= 0)
  {
    close(pid_fd);
    return -1;
  }
  pid_str[nread] = '\0';

  pid = atoi(pid_str);
  if (pid <= 0)
  {
    close(pid_fd);
    return -1;
  }

  /* another running process that we can signal COULD be
   * a competing rrdcached */
  if (pid != getpid() && kill(pid, 0) == 0)
  {
    fprintf(stderr,
            "FATAL: Another rrdcached daemon is running?? (pid %d)\n",
            (int) pid);
    close(pid_fd);
    return -1;
  }

  lseek(pid_fd, 0, SEEK_SET);
  if (ftruncate(pid_fd, 0) == -1)
  {
    fprintf(stderr,
            "FATAL: Failed to truncate stale PID file. (pid %d)\n",
            (int) pid);
    close(pid_fd);
    return -1;
  }

  fprintf(stderr,
          "rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
          "rrdcached: starting normally.\n", (int) pid);

  return pid_fd;
} /* }}} static int check_pidfile */
444 static int write_pidfile (int fd) /* {{{ */
451 fh = fdopen (fd, "w");
454 RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
459 fprintf (fh, "%i\n", (int) pid);
463 } /* }}} int write_pidfile */
465 static int remove_pidfile (void) /* {{{ */
470 file = (config_pid_file != NULL)
472 : LOCALSTATEDIR "/run/rrdcached.pid";
474 status = unlink (file);
478 } /* }}} int remove_pidfile */
480 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
484 eol = memchr(sock->rbuf + sock->next_cmd, '\n',
485 sock->next_read - sock->next_cmd);
489 /* no commands left, move remainder back to front of rbuf */
490 memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
491 sock->next_read - sock->next_cmd);
492 sock->next_read -= sock->next_cmd;
499 char *cmd = sock->rbuf + sock->next_cmd;
502 sock->next_cmd = eol - sock->rbuf + 1;
504 if (eol > sock->rbuf && *(eol-1) == '\r')
505 *(--eol) = '\0'; /* handle "\r\n" EOL */
514 } /* }}} char *next_cmd */
516 /* add the characters directly to the write buffer */
517 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
521 assert(sock != NULL);
523 new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
526 RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
530 strncpy(new_buf + sock->wbuf_len, str, len + 1);
532 sock->wbuf = new_buf;
533 sock->wbuf_len += len;
536 } /* }}} static int add_to_wbuf */
538 /* add the text to the "extra" info that's sent after the status line */
539 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
542 char buffer[CMD_MAX];
545 if (JOURNAL_REPLAY(sock)) return 0;
546 if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
549 #ifdef HAVE_VSNPRINTF
550 len = vsnprintf(buffer, sizeof(buffer), fmt, argp);
552 len = vsprintf(buffer, fmt, argp);
557 RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
561 return add_to_wbuf(sock, buffer, len);
562 } /* }}} static int add_response_info */
564 static int count_lines(char *str) /* {{{ */
570 while ((str = strchr(str, '\n')) != NULL)
578 } /* }}} static int count_lines */
580 /* send the response back to the user.
581 * returns 0 on success, -1 on error
582 * write buffer is always zeroed after this call */
583 static int send_response (listen_socket_t *sock, response_code rc,
584 char *fmt, ...) /* {{{ */
587 char buffer[CMD_MAX];
592 if (JOURNAL_REPLAY(sock)) return rc;
594 if (sock->batch_start)
597 return rc; /* no response on success during BATCH */
598 lines = sock->batch_cmd;
600 else if (rc == RESP_OK)
601 lines = count_lines(sock->wbuf);
605 rclen = sprintf(buffer, "%d ", lines);
607 #ifdef HAVE_VSNPRINTF
608 len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp);
610 len = vsprintf(buffer+rclen, fmt, argp);
618 /* append the result to the wbuf, don't write to the user */
619 if (sock->batch_start)
620 return add_to_wbuf(sock, buffer, len);
622 /* first write must be complete */
623 if (len != write(sock->fd, buffer, len))
625 RRDD_LOG(LOG_INFO, "send_response: could not write status message");
629 if (sock->wbuf != NULL && rc == RESP_OK)
632 while (wrote < sock->wbuf_len)
634 ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
637 RRDD_LOG(LOG_INFO, "send_response: could not write results");
644 free(sock->wbuf); sock->wbuf = NULL;
650 static void wipe_ci_values(cache_item_t *ci, time_t when)
654 ci->values_alloc = 0;
656 ci->last_flush_time = when;
657 if (config_write_jitter > 0)
658 ci->last_flush_time += (rrd_random() % config_write_jitter);
662 * remove a "cache_item_t" item from the queue.
663 * must hold 'cache_lock' when calling this
665 static void remove_from_queue(cache_item_t *ci) /* {{{ */
667 if (ci == NULL) return;
668 if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */
670 if (ci->prev == NULL)
671 cache_queue_head = ci->next; /* reset head */
673 ci->prev->next = ci->next;
675 if (ci->next == NULL)
676 cache_queue_tail = ci->prev; /* reset the tail */
678 ci->next->prev = ci->prev;
680 ci->next = ci->prev = NULL;
681 ci->flags &= ~CI_FLAGS_IN_QUEUE;
683 pthread_mutex_lock (&stats_lock);
684 assert (stats_queue_length > 0);
685 stats_queue_length--;
686 pthread_mutex_unlock (&stats_lock);
688 } /* }}} static void remove_from_queue */
690 /* free the resources associated with the cache_item_t
691 * must hold cache_lock when calling this function
693 static void *free_cache_item(cache_item_t *ci) /* {{{ */
695 if (ci == NULL) return NULL;
697 remove_from_queue(ci);
699 for (size_t i=0; i < ci->values_num; i++)
705 /* in case anyone is waiting */
706 pthread_cond_broadcast(&ci->flushed);
707 pthread_cond_destroy(&ci->flushed);
712 } /* }}} static void *free_cache_item */
715 * enqueue_cache_item:
716 * `cache_lock' must be acquired before calling this function!
718 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
724 if (ci->values_num == 0)
729 if (cache_queue_head == ci)
732 /* remove if further down in queue */
733 remove_from_queue(ci);
736 ci->next = cache_queue_head;
737 if (ci->next != NULL)
739 cache_queue_head = ci;
741 if (cache_queue_tail == NULL)
742 cache_queue_tail = cache_queue_head;
744 else /* (side == TAIL) */
746 /* We don't move values back in the list.. */
747 if (ci->flags & CI_FLAGS_IN_QUEUE)
750 assert (ci->next == NULL);
751 assert (ci->prev == NULL);
753 ci->prev = cache_queue_tail;
755 if (cache_queue_tail == NULL)
756 cache_queue_head = ci;
758 cache_queue_tail->next = ci;
760 cache_queue_tail = ci;
763 ci->flags |= CI_FLAGS_IN_QUEUE;
765 pthread_cond_signal(&queue_cond);
766 pthread_mutex_lock (&stats_lock);
767 stats_queue_length++;
768 pthread_mutex_unlock (&stats_lock);
771 } /* }}} int enqueue_cache_item */
774 * tree_callback_flush:
775 * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held
776 * while this is in progress.
778 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
782 callback_flush_data_t *cfd;
784 ci = (cache_item_t *) value;
785 cfd = (callback_flush_data_t *) data;
787 if (ci->flags & CI_FLAGS_IN_QUEUE)
790 if (ci->values_num > 0
791 && (ci->last_flush_time <= cfd->abs_timeout || state != RUNNING))
793 enqueue_cache_item (ci, TAIL);
795 else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
796 && (ci->values_num <= 0))
798 assert ((char *) key == ci->file);
799 if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key))
801 RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed.");
807 } /* }}} gboolean tree_callback_flush */
809 static int flush_old_values (int max_age)
811 callback_flush_data_t cfd;
814 memset (&cfd, 0, sizeof (cfd));
815 /* Pass the current time as user data so that we don't need to call
816 * `time' for each node. */
817 cfd.now = time (NULL);
822 cfd.abs_timeout = cfd.now - max_age;
824 cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
826 /* `tree_callback_flush' will return the keys of all values that haven't
827 * been touched in the last `config_flush_interval' seconds in `cfd'.
828 * The char*'s in this array point to the same memory as ci->file, so we
829 * don't need to free them separately. */
830 g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
832 for (k = 0; k < cfd.keys_num; k++)
834 gboolean status = g_tree_remove(cache_tree, cfd.keys[k]);
835 /* should never fail, since we have held the cache_lock
837 assert(status == TRUE);
840 if (cfd.keys != NULL)
847 } /* int flush_old_values */
849 static void *flush_thread_main (void UNUSED(*args)) /* {{{ */
852 struct timespec next_flush;
855 gettimeofday (&now, NULL);
856 next_flush.tv_sec = now.tv_sec + config_flush_interval;
857 next_flush.tv_nsec = 1000 * now.tv_usec;
859 pthread_mutex_lock(&cache_lock);
861 while (state == RUNNING)
863 gettimeofday (&now, NULL);
864 if ((now.tv_sec > next_flush.tv_sec)
865 || ((now.tv_sec == next_flush.tv_sec)
866 && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
868 RRDD_LOG(LOG_DEBUG, "flushing old values");
870 /* Determine the time of the next cache flush. */
871 next_flush.tv_sec = now.tv_sec + config_flush_interval;
873 /* Flush all values that haven't been written in the last
874 * `config_write_interval' seconds. */
875 flush_old_values (config_write_interval);
877 /* unlock the cache while we rotate so we don't block incoming
878 * updates if the fsync() blocks on disk I/O */
879 pthread_mutex_unlock(&cache_lock);
881 pthread_mutex_lock(&cache_lock);
884 status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush);
885 if (status != 0 && status != ETIMEDOUT)
887 RRDD_LOG (LOG_ERR, "flush_thread_main: "
888 "pthread_cond_timedwait returned %i.", status);
892 if (config_flush_at_shutdown)
893 flush_old_values (-1); /* flush everything */
897 pthread_mutex_unlock(&cache_lock);
900 } /* void *flush_thread_main */
902 static void *queue_thread_main (void UNUSED(*args)) /* {{{ */
904 pthread_mutex_lock (&cache_lock);
906 while (state != SHUTDOWN
907 || (cache_queue_head != NULL && config_flush_at_shutdown))
915 /* Now, check if there's something to store away. If not, wait until
916 * something comes in. */
917 if (cache_queue_head == NULL)
919 status = pthread_cond_wait (&queue_cond, &cache_lock);
920 if ((status != 0) && (status != ETIMEDOUT))
922 RRDD_LOG (LOG_ERR, "queue_thread_main: "
923 "pthread_cond_wait returned %i.", status);
927 /* Check if a value has arrived. This may be NULL if we timed out or there
928 * was an interrupt such as a signal. */
929 if (cache_queue_head == NULL)
932 ci = cache_queue_head;
934 /* copy the relevant parts */
935 file = strdup (ci->file);
938 RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
942 assert(ci->values != NULL);
943 assert(ci->values_num > 0);
946 values_num = ci->values_num;
948 wipe_ci_values(ci, time(NULL));
949 remove_from_queue(ci);
951 pthread_mutex_unlock (&cache_lock);
954 status = rrd_update_r (file, NULL, (int) values_num, (void *) values);
957 RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
958 "rrd_update_r (%s) failed with status %i. (%s)",
959 file, status, rrd_get_error());
962 journal_write("wrote", file);
964 /* Search again in the tree. It's possible someone issued a "FORGET"
965 * while we were writing the update values. */
966 pthread_mutex_lock(&cache_lock);
967 ci = (cache_item_t *) g_tree_lookup(cache_tree, file);
969 pthread_cond_broadcast(&ci->flushed);
970 pthread_mutex_unlock(&cache_lock);
974 pthread_mutex_lock (&stats_lock);
975 stats_updates_written++;
976 stats_data_sets_written += values_num;
977 pthread_mutex_unlock (&stats_lock);
980 rrd_free_ptrs((void ***) &values, &values_num);
983 pthread_mutex_lock (&cache_lock);
985 pthread_mutex_unlock (&cache_lock);
988 } /* }}} void *queue_thread_main */
990 static int buffer_get_field (char **buffer_ret, /* {{{ */
991 size_t *buffer_size_ret, char **field_ret)
1000 buffer = *buffer_ret;
1002 buffer_size = *buffer_size_ret;
1003 field = *buffer_ret;
1006 if (buffer_size <= 0)
1009 /* This is ensured by `handle_request'. */
1010 assert (buffer[buffer_size - 1] == '\0');
1013 while (buffer_pos < buffer_size)
1015 /* Check for end-of-field or end-of-buffer */
1016 if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
1018 field[field_size] = 0;
1024 /* Handle escaped characters. */
1025 else if (buffer[buffer_pos] == '\\')
1027 if (buffer_pos >= (buffer_size - 1))
1030 field[field_size] = buffer[buffer_pos];
1034 /* Normal operation */
1037 field[field_size] = buffer[buffer_pos];
1041 } /* while (buffer_pos < buffer_size) */
1046 *buffer_ret = buffer + buffer_pos;
1047 *buffer_size_ret = buffer_size - buffer_pos;
1051 } /* }}} int buffer_get_field */
1053 /* if we're restricting writes to the base directory,
1054 * check whether the file falls within the dir
1055 * returns 1 if OK, otherwise 0
1057 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
1059 assert(file != NULL);
1061 if (!config_write_base_only
1062 || JOURNAL_REPLAY(sock)
1063 || config_base_dir == NULL)
1066 if (strstr(file, "../") != NULL) goto err;
1068 /* relative paths without "../" are ok */
1069 if (*file != '/') return 1;
1071 /* file must be of the format base + "/" + <1+ char filename> */
1072 if (strlen(file) < _config_base_dir_len + 2) goto err;
1073 if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
1074 if (*(file + _config_base_dir_len) != '/') goto err;
1079 if (sock != NULL && sock->fd >= 0)
1080 send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
1083 } /* }}} static int check_file_access */
1085 /* when using a base dir, convert relative paths to absolute paths.
1086 * if necessary, modifies the "filename" pointer to point
1087 * to the new path created in "tmp". "tmp" is provided
1088 * by the caller and sizeof(tmp) must be >= PATH_MAX.
1090 * this allows us to optimize for the expected case (absolute path)
1093 static void get_abs_path(char **filename, char *tmp)
1095 assert(tmp != NULL);
1096 assert(filename != NULL && *filename != NULL);
1098 if (config_base_dir == NULL || **filename == '/')
1101 snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
1103 } /* }}} static int get_abs_path */
1105 static int flush_file (const char *filename) /* {{{ */
1109 pthread_mutex_lock (&cache_lock);
1111 ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1114 pthread_mutex_unlock (&cache_lock);
1118 if (ci->values_num > 0)
1120 /* Enqueue at head */
1121 enqueue_cache_item (ci, HEAD);
1122 pthread_cond_wait(&ci->flushed, &cache_lock);
1125 /* DO NOT DO ANYTHING WITH ci HERE!! The entry
1126 * may have been purged during our cond_wait() */
1128 pthread_mutex_unlock(&cache_lock);
1131 } /* }}} int flush_file */
1133 static int syntax_error(listen_socket_t *sock, command_t *cmd) /* {{{ */
1135 char *err = "Syntax error.\n";
1137 if (cmd && cmd->syntax)
1140 return send_response(sock, RESP_ERR, "Usage: %s", err);
1141 } /* }}} static int syntax_error() */
1143 static int handle_request_stats (HANDLER_PROTO) /* {{{ */
1145 uint64_t copy_queue_length;
1146 uint64_t copy_updates_received;
1147 uint64_t copy_flush_received;
1148 uint64_t copy_updates_written;
1149 uint64_t copy_data_sets_written;
1150 uint64_t copy_journal_bytes;
1151 uint64_t copy_journal_rotate;
1153 uint64_t tree_nodes_number;
1154 uint64_t tree_depth;
1156 pthread_mutex_lock (&stats_lock);
1157 copy_queue_length = stats_queue_length;
1158 copy_updates_received = stats_updates_received;
1159 copy_flush_received = stats_flush_received;
1160 copy_updates_written = stats_updates_written;
1161 copy_data_sets_written = stats_data_sets_written;
1162 copy_journal_bytes = stats_journal_bytes;
1163 copy_journal_rotate = stats_journal_rotate;
1164 pthread_mutex_unlock (&stats_lock);
1166 pthread_mutex_lock (&cache_lock);
1167 tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1168 tree_depth = (uint64_t) g_tree_height (cache_tree);
1169 pthread_mutex_unlock (&cache_lock);
1171 add_response_info(sock,
1172 "QueueLength: %"PRIu64"\n", copy_queue_length);
1173 add_response_info(sock,
1174 "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1175 add_response_info(sock,
1176 "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1177 add_response_info(sock,
1178 "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1179 add_response_info(sock,
1180 "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1181 add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1182 add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1183 add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1184 add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1186 send_response(sock, RESP_OK, "Statistics follow\n");
1189 } /* }}} int handle_request_stats */
1191 static int handle_request_flush (HANDLER_PROTO) /* {{{ */
1193 char *file, file_tmp[PATH_MAX];
1196 status = buffer_get_field (&buffer, &buffer_size, &file);
1199 return syntax_error(sock,cmd);
1203 pthread_mutex_lock(&stats_lock);
1204 stats_flush_received++;
1205 pthread_mutex_unlock(&stats_lock);
1207 get_abs_path(&file, file_tmp);
1208 if (!check_file_access(file, sock)) return 0;
1210 status = flush_file (file);
1212 return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1213 else if (status == ENOENT)
1215 /* no file in our tree; see whether it exists at all */
1216 struct stat statbuf;
1218 memset(&statbuf, 0, sizeof(statbuf));
1219 if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1220 return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1222 return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1224 else if (status < 0)
1225 return send_response(sock, RESP_ERR, "Internal error.\n");
1227 return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1232 } /* }}} int handle_request_flush */
1234 static int handle_request_flushall(HANDLER_PROTO) /* {{{ */
1236 RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1238 pthread_mutex_lock(&cache_lock);
1239 flush_old_values(-1);
1240 pthread_mutex_unlock(&cache_lock);
1242 return send_response(sock, RESP_OK, "Started flush.\n");
1243 } /* }}} static int handle_request_flushall */
1245 static int handle_request_pending(HANDLER_PROTO) /* {{{ */
1248 char *file, file_tmp[PATH_MAX];
1251 status = buffer_get_field(&buffer, &buffer_size, &file);
1253 return syntax_error(sock,cmd);
1255 get_abs_path(&file, file_tmp);
1257 pthread_mutex_lock(&cache_lock);
1258 ci = g_tree_lookup(cache_tree, file);
1261 pthread_mutex_unlock(&cache_lock);
1262 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1265 for (size_t i=0; i < ci->values_num; i++)
1266 add_response_info(sock, "%s\n", ci->values[i]);
1268 pthread_mutex_unlock(&cache_lock);
1269 return send_response(sock, RESP_OK, "updates pending\n");
1270 } /* }}} static int handle_request_pending */
1272 static int handle_request_forget(HANDLER_PROTO) /* {{{ */
1276 char *file, file_tmp[PATH_MAX];
1278 status = buffer_get_field(&buffer, &buffer_size, &file);
1280 return syntax_error(sock,cmd);
1282 get_abs_path(&file, file_tmp);
1283 if (!check_file_access(file, sock)) return 0;
1285 pthread_mutex_lock(&cache_lock);
1286 found = g_tree_remove(cache_tree, file);
1287 pthread_mutex_unlock(&cache_lock);
1291 if (!JOURNAL_REPLAY(sock))
1292 journal_write("forget", file);
1294 return send_response(sock, RESP_OK, "Gone!\n");
1297 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1301 } /* }}} static int handle_request_forget */
1303 static int handle_request_queue (HANDLER_PROTO) /* {{{ */
1307 pthread_mutex_lock(&cache_lock);
1309 ci = cache_queue_head;
1312 add_response_info(sock, "%d %s\n", ci->values_num, ci->file);
1316 pthread_mutex_unlock(&cache_lock);
1318 return send_response(sock, RESP_OK, "in queue.\n");
1319 } /* }}} int handle_request_queue */
1321 static int handle_request_update (HANDLER_PROTO) /* {{{ */
1323 char *file, file_tmp[PATH_MAX];
1326 char orig_buf[CMD_MAX];
1330 /* save it for the journal later */
1331 if (!JOURNAL_REPLAY(sock))
1332 strncpy(orig_buf, buffer, buffer_size);
1334 status = buffer_get_field (&buffer, &buffer_size, &file);
1336 return syntax_error(sock,cmd);
1338 pthread_mutex_lock(&stats_lock);
1339 stats_updates_received++;
1340 pthread_mutex_unlock(&stats_lock);
1342 get_abs_path(&file, file_tmp);
1343 if (!check_file_access(file, sock)) return 0;
1345 pthread_mutex_lock (&cache_lock);
1346 ci = g_tree_lookup (cache_tree, file);
1348 if (ci == NULL) /* {{{ */
1350 struct stat statbuf;
1353 /* don't hold the lock while we setup; stat(2) might block */
1354 pthread_mutex_unlock(&cache_lock);
1356 memset (&statbuf, 0, sizeof (statbuf));
1357 status = stat (file, &statbuf);
1360 RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1363 if (status == ENOENT)
1364 return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1366 return send_response(sock, RESP_ERR,
1367 "stat failed with error %i.\n", status);
1369 if (!S_ISREG (statbuf.st_mode))
1370 return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1372 if (access(file, R_OK|W_OK) != 0)
1373 return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1374 file, rrd_strerror(errno));
1376 ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1379 RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1381 return send_response(sock, RESP_ERR, "malloc failed.\n");
1383 memset (ci, 0, sizeof (cache_item_t));
1385 ci->file = strdup (file);
1386 if (ci->file == NULL)
1389 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1391 return send_response(sock, RESP_ERR, "strdup failed.\n");
1394 wipe_ci_values(ci, now);
1395 ci->flags = CI_FLAGS_IN_TREE;
1396 pthread_cond_init(&ci->flushed, NULL);
1398 pthread_mutex_lock(&cache_lock);
1400 /* another UPDATE might have added this entry in the meantime */
1401 tmp = g_tree_lookup (cache_tree, file);
1403 g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
1406 free_cache_item (ci);
1410 /* state may have changed while we were unlocked */
1411 if (state == SHUTDOWN)
1414 assert (ci != NULL);
1416 /* don't re-write updates in replay mode */
1417 if (!JOURNAL_REPLAY(sock))
1418 journal_write("update", orig_buf);
1420 while (buffer_size > 0)
1426 status = buffer_get_field (&buffer, &buffer_size, &value);
1429 RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1433 /* make sure update time is always moving forward */
1434 stamp = strtol(value, &eostamp, 10);
1435 if (eostamp == value || eostamp == NULL || *eostamp != ':')
1437 pthread_mutex_unlock(&cache_lock);
1438 return send_response(sock, RESP_ERR,
1439 "Cannot find timestamp in '%s'!\n", value);
1441 else if (stamp <= ci->last_update_stamp)
1443 pthread_mutex_unlock(&cache_lock);
1444 return send_response(sock, RESP_ERR,
1445 "illegal attempt to update using time %ld when last"
1446 " update time is %ld (minimum one second step)\n",
1447 stamp, ci->last_update_stamp);
1450 ci->last_update_stamp = stamp;
1452 if (!rrd_add_strdup_chunk(&ci->values, &ci->values_num, value,
1453 &ci->values_alloc, config_alloc_chunk))
1455 RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed.");
1462 if (((now - ci->last_flush_time) >= config_write_interval)
1463 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1464 && (ci->values_num > 0))
1466 enqueue_cache_item (ci, TAIL);
1469 pthread_mutex_unlock (&cache_lock);
1472 return send_response(sock, RESP_ERR, "No values updated.\n");
1474 return send_response(sock, RESP_OK,
1475 "errors, enqueued %i value(s).\n", values_num);
1480 } /* }}} int handle_request_update */
1482 static int handle_request_fetch (HANDLER_PROTO) /* {{{ */
1484 char *file, file_tmp[PATH_MAX];
1493 unsigned long ds_cnt;
1500 rrd_value_t *data_ptr;
1507 /* Read the arguments */
1510 status = buffer_get_field (&buffer, &buffer_size, &file);
1514 status = buffer_get_field (&buffer, &buffer_size, &cf);
1518 status = buffer_get_field (&buffer, &buffer_size, &start_str);
1526 status = buffer_get_field (&buffer, &buffer_size, &end_str);
1536 return (syntax_error(sock,cmd));
1538 get_abs_path(&file, file_tmp);
1539 if (!check_file_access(file, sock)) return 0;
1541 status = flush_file (file);
1542 if ((status != 0) && (status != ENOENT))
1543 return (send_response (sock, RESP_ERR,
1544 "flush_file (%s) failed with status %i.\n", file, status));
1546 t = time (NULL); /* "now" */
1548 /* Parse start time */
1549 if (start_str != NULL)
1556 value = strtol (start_str, &endptr, /* base = */ 0);
1557 if ((endptr == start_str) || (errno != 0))
1558 return (send_response(sock, RESP_ERR,
1559 "Cannot parse start time `%s': Only simple integers are allowed.\n",
1563 start_tm = (time_t) value;
1565 start_tm = (time_t) (t + value);
1569 start_tm = t - 86400;
1572 /* Parse end time */
1573 if (end_str != NULL)
1580 value = strtol (end_str, &endptr, /* base = */ 0);
1581 if ((endptr == end_str) || (errno != 0))
1582 return (send_response(sock, RESP_ERR,
1583 "Cannot parse end time `%s': Only simple integers are allowed.\n",
1587 end_tm = (time_t) value;
1589 end_tm = (time_t) (t + value);
1601 status = rrd_fetch_r (file, cf, &start_tm, &end_tm, &step,
1602 &ds_cnt, &ds_namv, &data);
1604 return (send_response(sock, RESP_ERR,
1605 "rrd_fetch_r failed: %s\n", rrd_get_error ()));
1607 add_response_info (sock, "FlushVersion: %lu\n", 1);
1608 add_response_info (sock, "Start: %lu\n", (unsigned long) start_tm);
1609 add_response_info (sock, "End: %lu\n", (unsigned long) end_tm);
1610 add_response_info (sock, "Step: %lu\n", step);
1611 add_response_info (sock, "DSCount: %lu\n", ds_cnt);
1613 #define SSTRCAT(buffer,str,buffer_fill) do { \
1614 size_t str_len = strlen (str); \
1615 if ((buffer_fill + str_len) > sizeof (buffer)) \
1616 str_len = sizeof (buffer) - buffer_fill; \
1617 if (str_len > 0) { \
1618 strncpy (buffer + buffer_fill, str, str_len); \
1619 buffer_fill += str_len; \
1620 assert (buffer_fill <= sizeof (buffer)); \
1621 if (buffer_fill == sizeof (buffer)) \
1622 buffer[buffer_fill - 1] = 0; \
1624 buffer[buffer_fill] = 0; \
1628 { /* Add list of DS names */
1630 size_t linebuf_fill;
1632 memset (linebuf, 0, sizeof (linebuf));
1634 for (i = 0; i < ds_cnt; i++)
1637 SSTRCAT (linebuf, " ", linebuf_fill);
1638 SSTRCAT (linebuf, ds_namv[i], linebuf_fill);
1639 rrd_freemem(ds_namv[i]);
1641 rrd_freemem(ds_namv);
1642 add_response_info (sock, "DSName: %s\n", linebuf);
1645 /* Add the actual data */
1648 for (t = start_tm + step; t <= end_tm; t += step)
1651 size_t linebuf_fill;
1654 memset (linebuf, 0, sizeof (linebuf));
1656 for (i = 0; i < ds_cnt; i++)
1658 snprintf (tmp, sizeof (tmp), " %0.10e", *data_ptr);
1659 tmp[sizeof (tmp) - 1] = 0;
1660 SSTRCAT (linebuf, tmp, linebuf_fill);
1665 add_response_info (sock, "%10lu:%s\n", (unsigned long) t, linebuf);
1669 return (send_response (sock, RESP_OK, "Success\n"));
1671 } /* }}} int handle_request_fetch */
1673 /* we came across a "WROTE" entry during journal replay.
1674 * throw away any values that we have accumulated for this file
1676 static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
1679 const char *file = buffer;
1681 pthread_mutex_lock(&cache_lock);
1683 ci = g_tree_lookup(cache_tree, file);
1686 pthread_mutex_unlock(&cache_lock);
1691 rrd_free_ptrs((void ***) &ci->values, &ci->values_num);
1693 wipe_ci_values(ci, now);
1694 remove_from_queue(ci);
1696 pthread_mutex_unlock(&cache_lock);
1698 } /* }}} int handle_request_wrote */
1700 static int handle_request_info (HANDLER_PROTO) /* {{{ */
1702 char *file, file_tmp[PATH_MAX];
1706 /* obtain filename */
1707 status = buffer_get_field(&buffer, &buffer_size, &file);
1709 return syntax_error(sock,cmd);
1710 /* get full pathname */
1711 get_abs_path(&file, file_tmp);
1712 if (!check_file_access(file, sock)) {
1713 return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1717 data = rrd_info_r(file);
1719 return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1722 switch (data->type) {
1724 if (isnan(data->value.u_val))
1725 add_response_info(sock,"%s %d NaN\n",data->key, data->type);
1727 add_response_info(sock,"%s %d %0.10e\n", data->key, data->type, data->value.u_val);
1730 add_response_info(sock,"%s %d %lu\n", data->key, data->type, data->value.u_cnt);
1733 add_response_info(sock,"%s %d %d\n", data->key, data->type, data->value.u_int);
1736 add_response_info(sock,"%s %d %s\n", data->key, data->type, data->value.u_str);
1739 add_response_info(sock,"%s %d %lu\n", data->key, data->type, data->value.u_blo.size);
1744 return send_response(sock, RESP_OK, "Info for %s follows\n",file);
1745 } /* }}} static int handle_request_info */
1747 static int handle_request_first (HANDLER_PROTO) /* {{{ */
1749 char *i, *file, file_tmp[PATH_MAX];
1754 /* obtain filename */
1755 status = buffer_get_field(&buffer, &buffer_size, &file);
1757 return syntax_error(sock,cmd);
1758 /* get full pathname */
1759 get_abs_path(&file, file_tmp);
1760 if (!check_file_access(file, sock)) {
1761 return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1764 status = buffer_get_field(&buffer, &buffer_size, &i);
1766 return syntax_error(sock,cmd);
1769 return send_response(sock, RESP_ERR, "Invalid index specified (%d)\n", idx);
1774 t = rrd_first_r(file,idx);
1776 return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1778 return send_response(sock, RESP_OK, "%lu\n",(unsigned)t);
1779 } /* }}} static int handle_request_last */
1781 static int handle_request_last (HANDLER_PROTO) /* {{{ */
1783 char *file, file_tmp[PATH_MAX];
1787 /* obtain filename */
1788 status = buffer_get_field(&buffer, &buffer_size, &file);
1790 return syntax_error(sock,cmd);
1791 /* get full pathname */
1792 get_abs_path(&file, file_tmp);
1793 if (!check_file_access(file, sock)) {
1794 return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1798 t = rrd_last_r(file);
1800 return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1802 return send_response(sock, RESP_OK, "%lu\n",(unsigned)t);
1803 } /* }}} static int handle_request_last */
1805 static int handle_request_create (HANDLER_PROTO) /* {{{ */
1807 char *file, file_tmp[PATH_MAX];
1813 time_t last_up = time(NULL)-10;
1814 rrd_time_value_t last_up_tv;
1815 char *parsetime_error = NULL;
1816 int no_overwrite = opt_no_overwrite;
1819 /* obtain filename */
1820 status = buffer_get_field(&buffer, &buffer_size, &file);
1822 return syntax_error(sock,cmd);
1823 /* get full pathname */
1824 get_abs_path(&file, file_tmp);
1825 if (!check_file_access(file, sock)) {
1826 return send_response(sock, RESP_ERR, "Cannot read: %s\n", file);
1828 RRDD_LOG(LOG_INFO, "rrdcreate request for %s",file);
1830 status = buffer_get_field(&buffer, &buffer_size, &tok );
1831 for(;(tok && !status);status = buffer_get_field(&buffer, &buffer_size, &tok )) {
1832 if( ! strncmp(tok,"-b",2) ) {
1833 status = buffer_get_field(&buffer, &buffer_size, &tok );
1834 if (status != 0) return syntax_error(sock,cmd);
1835 if ((parsetime_error = rrd_parsetime(tok, &last_up_tv)))
1836 return send_response(sock, RESP_ERR, "start time: %s\n", parsetime_error);
1837 if (last_up_tv.type == RELATIVE_TO_END_TIME ||
1838 last_up_tv.type == RELATIVE_TO_START_TIME) {
1839 return send_response(sock, RESP_ERR, "Cannot specify time relative to start or end here.\n");
1841 last_up = mktime(&last_up_tv.tm) +last_up_tv.offset;
1845 if( ! strncmp(tok,"-s",2) ) {
1846 status = buffer_get_field(&buffer, &buffer_size, &tok );
1847 if (status != 0) return syntax_error(sock,cmd);
1851 if( ! strncmp(tok,"-O",2) ) {
1855 if( ! strncmp(tok,"DS:",3) ) { av[ac++]=tok; continue; }
1856 if( ! strncmp(tok,"RRA:",4) ) { av[ac++]=tok; continue; }
1857 return syntax_error(sock,cmd);
1860 return send_response(sock, RESP_ERR, "The step size cannot be less than 1 second.\n");
1862 if (last_up < 3600 * 24 * 365 * 10) {
1863 return send_response(sock, RESP_ERR, "The first entry must be after 1980.\n");
1866 rrd_create_set_no_overwrite(no_overwrite);
1868 status = rrd_create_r(file,step,last_up,ac,(const char **)av);
1871 return send_response(sock, RESP_OK, "RRD created OK\n");
1873 return send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error());
1874 } /* }}} static int handle_request_create */
1876 /* start "BATCH" processing */
1877 static int batch_start (HANDLER_PROTO) /* {{{ */
1880 if (sock->batch_start)
1881 return send_response(sock, RESP_ERR, "Already in BATCH\n");
1883 status = send_response(sock, RESP_OK,
1884 "Go ahead. End with dot '.' on its own line.\n");
1885 sock->batch_start = time(NULL);
1886 sock->batch_cmd = 0;
1889 } /* }}} static int batch_start */
1891 /* finish "BATCH" processing and return results to the client */
1892 static int batch_done (HANDLER_PROTO) /* {{{ */
1894 assert(sock->batch_start);
1895 sock->batch_start = 0;
1896 sock->batch_cmd = 0;
1897 return send_response(sock, RESP_OK, "errors\n");
1898 } /* }}} static int batch_done */
/* QUIT: takes no arguments and sends no response.
 * NOTE(review): the negative status presumably makes the connection loop
 * drop the client -- confirm against the caller. */
static int handle_request_quit (HANDLER_PROTO) /* {{{ */
{
  const int quit_status = -1;

  return quit_status;
} /* }}} static int handle_request_quit */
1905 static command_t list_of_commands[] = { /* {{{ */
1908 handle_request_update,
1910 "UPDATE <filename> <values> [<values> ...]\n"
1912 "Adds the given file to the internal cache if it is not yet known and\n"
1913 "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1916 "Each <values> has the following form:\n"
1917 " <values> = <time>:<value>[:<value>[...]]\n"
1918 "See the rrdupdate(1) manpage for details.\n"
1922 handle_request_wrote,
1923 CMD_CONTEXT_JOURNAL,
1929 handle_request_flush,
1930 CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
1931 "FLUSH <filename>\n"
1933 "Adds the given filename to the head of the update queue and returns\n"
1934 "after it has been dequeued.\n"
1938 handle_request_flushall,
1942 "Triggers writing of all pending updates. Returns immediately.\n"
1946 handle_request_pending,
1948 "PENDING <filename>\n"
1950 "Shows any 'pending' updates for a file, in order.\n"
1951 "The updates shown have not yet been written to the underlying RRD file.\n"
1955 handle_request_forget,
1957 "FORGET <filename>\n"
1959 "Removes the file completely from the cache.\n"
1960 "Any pending updates for the file will be lost.\n"
1964 handle_request_queue,
1968 "Shows all files in the output queue.\n"
1969 "The output is zero or more lines in the following format:\n"
1970 "(where <num_vals> is the number of values to be written)\n"
1972 "<num_vals> <filename>\n"
1976 handle_request_stats,
1980 "Returns some performance counters, see the rrdcached(1) manpage for\n"
1981 "a description of the values.\n"
1985 handle_request_help,
1987 "HELP [<command>]\n",
1988 NULL, /* special! */
1996 "The 'BATCH' command permits the client to initiate a bulk load\n"
1997 " of commands to rrdcached.\n"
2002 " server: 0 Go ahead. End with dot '.' on its own line.\n"
2003 " client: command #1\n"
2004 " client: command #2\n"
2005 " client: ... and so on\n"
2007 " server: 2 errors\n"
2008 " server: 7 message for command #7\n"
2009 " server: 9 message for command #9\n"
2011 "For more information, consult the rrdcached(1) documentation.\n"
2014 ".", /* BATCH terminator */
2022 handle_request_fetch,
2024 "FETCH <file> <CF> [<start> [<end>]]\n"
2026 "The 'FETCH' can be used by the client to retrieve values from an RRD file.\n"
2030 handle_request_info,
2032 "INFO <filename>\n",
2033 "The INFO command retrieves information about a specified RRD file.\n"
2034 "This is returned in standard rrdinfo format, a sequence of lines\n"
2035 "with the format <keyname> = <value>\n"
2036 "Note that this is the data as of the last update of the RRD file itself,\n"
2037 "not the last time data was received via rrdcached, so there may be pending\n"
2038 "updates in the queue. If this bothers you, then first run a FLUSH.\n"
2042 handle_request_first,
2044 "FIRST <filename> <rra index>\n",
2045 "The FIRST command retrieves the first data time for a specified RRA in\n"
2050 handle_request_last,
2052 "LAST <filename>\n",
2053 "The LAST command retrieves the last update time for a specified RRD file.\n"
2054 "Note that this is the time of the last update of the RRD file itself, not\n"
2055 "the last time data was received via rrdcached, so there may be pending\n"
2056 "updates in the queue. If this bothers you, then first run a FLUSH.\n"
2060 handle_request_create,
2061 CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
2062 "CREATE <filename> [-b start] [-s step] [-O] <DS definitions> <RRA definitions>\n",
2063 "The CREATE command will create an RRD file, overwriting any existing file\n"
2064 "unless the -O option is given or rrdcached was started with the -O option.\n"
2065 "The DS and RRA definitions are as for the 'rrdtool create' command.\n"
2069 handle_request_quit,
2070 CMD_CONTEXT_CLIENT | CMD_CONTEXT_BATCH,
2073 "Disconnect from rrdcached.\n"
2075 }; /* }}} command_t list_of_commands[] */
2076 static size_t list_of_commands_len = sizeof (list_of_commands)
2077 / sizeof (list_of_commands[0]);
2079 static command_t *find_command(char *cmd)
2083 for (i = 0; i < list_of_commands_len; i++)
2084 if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
2085 return (&list_of_commands[i]);
/* We currently use the index in the `list_of_commands' array as a bit position
 * in `listen_socket_t.permissions'. This member should NEVER be accessed from
 * outside these functions so that switching to a more elegant storage method
 * is easily possible. */
2093 static ssize_t find_command_index (const char *cmd) /* {{{ */
2097 for (i = 0; i < list_of_commands_len; i++)
2098 if (strcasecmp(cmd, list_of_commands[i].cmd) == 0)
2099 return ((ssize_t) i);
2101 } /* }}} ssize_t find_command_index */
2103 static int socket_permission_check (listen_socket_t *sock, /* {{{ */
2108 if (JOURNAL_REPLAY(sock))
2114 if ((strcasecmp ("QUIT", cmd) == 0)
2115 || (strcasecmp ("HELP", cmd) == 0))
2117 else if (strcmp (".", cmd) == 0)
2120 i = find_command_index (cmd);
2125 if ((sock->permissions & (1 << i)) != 0)
2128 } /* }}} int socket_permission_check */
2130 static int socket_permission_add (listen_socket_t *sock, /* {{{ */
2135 i = find_command_index (cmd);
2140 sock->permissions |= (1 << i);
2142 } /* }}} int socket_permission_add */
2144 static void socket_permission_clear (listen_socket_t *sock) /* {{{ */
2146 sock->permissions = 0;
2147 } /* }}} socket_permission_clear */
2149 static void socket_permission_copy (listen_socket_t *dest, /* {{{ */
2150 listen_socket_t *src)
2152 dest->permissions = src->permissions;
2153 } /* }}} socket_permission_copy */
2155 /* check whether commands are received in the expected context */
2156 static int command_check_context(listen_socket_t *sock, command_t *cmd)
2158 if (JOURNAL_REPLAY(sock))
2159 return (cmd->context & CMD_CONTEXT_JOURNAL);
2160 else if (sock->batch_start)
2161 return (cmd->context & CMD_CONTEXT_BATCH);
2163 return (cmd->context & CMD_CONTEXT_CLIENT);
2169 static int handle_request_help (HANDLER_PROTO) /* {{{ */
2174 command_t *help = NULL;
2176 status = buffer_get_field (&buffer, &buffer_size, &cmd_str);
2178 help = find_command(cmd_str);
2180 if (help && (help->syntax || help->help))
2184 snprintf(tmp, sizeof(tmp)-1, "Help for %s\n", help->cmd);
2188 add_response_info(sock, "Usage: %s\n", help->syntax);
2191 add_response_info(sock, "%s\n", help->help);
2197 resp_txt = "Command overview\n";
2199 for (i = 0; i < list_of_commands_len; i++)
2201 if (list_of_commands[i].syntax == NULL)
2203 add_response_info (sock, "%s", list_of_commands[i].syntax);
2207 return send_response(sock, RESP_OK, resp_txt);
2208 } /* }}} int handle_request_help */
2210 static int handle_request (DISPATCH_PROTO) /* {{{ */
2212 char *buffer_ptr = buffer;
2213 char *cmd_str = NULL;
2214 command_t *cmd = NULL;
2217 assert (buffer[buffer_size - 1] == '\0');
2219 status = buffer_get_field (&buffer_ptr, &buffer_size, &cmd_str);
2222 RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
2226 if (sock != NULL && sock->batch_start)
2229 cmd = find_command(cmd_str);
2231 return send_response(sock, RESP_ERR, "Unknown command: %s\n", cmd_str);
2233 if (!socket_permission_check (sock, cmd->cmd))
2234 return send_response(sock, RESP_ERR, "Permission denied.\n");
2236 if (!command_check_context(sock, cmd))
2237 return send_response(sock, RESP_ERR, "Can't use '%s' here.\n", cmd_str);
2239 return cmd->handler(cmd, sock, now, buffer_ptr, buffer_size);
2240 } /* }}} int handle_request */
2242 static void journal_set_free (journal_set *js) /* {{{ */
2247 rrd_free_ptrs((void ***) &js->files, &js->files_num);
2250 } /* }}} journal_set_free */
2252 static void journal_set_remove (journal_set *js) /* {{{ */
2257 for (uint i=0; i < js->files_num; i++)
2259 RRDD_LOG(LOG_DEBUG, "removing old journal %s", js->files[i]);
2260 unlink(js->files[i]);
2262 } /* }}} journal_set_remove */
2264 /* close current journal file handle.
2265 * MUST hold journal_lock before calling */
2266 static void journal_close(void) /* {{{ */
2268 if (journal_fh != NULL)
2270 if (fclose(journal_fh) != 0)
2271 RRDD_LOG(LOG_ERR, "cannot close journal: %s", rrd_strerror(errno));
2276 } /* }}} journal_close */
2278 /* MUST hold journal_lock before calling */
2279 static void journal_new_file(void) /* {{{ */
2283 char new_file[PATH_MAX + 1];
2285 assert(journal_dir != NULL);
2286 assert(journal_cur != NULL);
2290 gettimeofday(&now, NULL);
2291 /* this format assures that the files sort in strcmp() order */
2292 snprintf(new_file, PATH_MAX, "%s/%s.%010d.%06d",
2293 journal_dir, JOURNAL_BASE, (int)now.tv_sec, (int)now.tv_usec);
2295 new_fd = open(new_file, O_WRONLY|O_CREAT|O_APPEND,
2296 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
2300 journal_fh = fdopen(new_fd, "a");
2301 if (journal_fh == NULL)
2304 journal_size = ftell(journal_fh);
2305 RRDD_LOG(LOG_DEBUG, "started new journal %s", new_file);
2307 /* record the file in the journal set */
2308 rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, new_file);
2314 "JOURNALING DISABLED: Error while trying to create %s : %s",
2315 new_file, rrd_strerror(errno));
2317 "JOURNALING DISABLED: All values will be flushed at shutdown");
2320 config_flush_at_shutdown = 1;
2322 } /* }}} journal_new_file */
2324 /* MUST NOT hold journal_lock before calling this */
2325 static void journal_rotate(void) /* {{{ */
2327 journal_set *old_js = NULL;
2329 if (journal_dir == NULL)
2332 RRDD_LOG(LOG_DEBUG, "rotating journals");
2334 pthread_mutex_lock(&stats_lock);
2335 ++stats_journal_rotate;
2336 pthread_mutex_unlock(&stats_lock);
2338 pthread_mutex_lock(&journal_lock);
2342 /* rotate the journal sets */
2343 old_js = journal_old;
2344 journal_old = journal_cur;
2345 journal_cur = calloc(1, sizeof(journal_set));
2347 if (journal_cur != NULL)
2350 RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2352 pthread_mutex_unlock(&journal_lock);
2354 journal_set_remove(old_js);
2355 journal_set_free (old_js);
2357 } /* }}} static void journal_rotate */
2359 /* MUST hold journal_lock when calling */
2360 static void journal_done(void) /* {{{ */
2362 if (journal_cur == NULL)
2367 if (config_flush_at_shutdown)
2369 RRDD_LOG(LOG_INFO, "removing journals");
2370 journal_set_remove(journal_old);
2371 journal_set_remove(journal_cur);
2375 RRDD_LOG(LOG_INFO, "expedited shutdown; "
2376 "journals will be used at next startup");
2379 journal_set_free(journal_cur);
2380 journal_set_free(journal_old);
2383 } /* }}} static void journal_done */
2385 static int journal_write(char *cmd, char *args) /* {{{ */
2389 if (journal_fh == NULL)
2392 pthread_mutex_lock(&journal_lock);
2393 chars = fprintf(journal_fh, "%s %s\n", cmd, args);
2394 journal_size += chars;
2396 if (journal_size > JOURNAL_MAX)
2399 pthread_mutex_unlock(&journal_lock);
2403 pthread_mutex_lock(&stats_lock);
2404 stats_journal_bytes += chars;
2405 pthread_mutex_unlock(&stats_lock);
2409 } /* }}} static int journal_write */
2411 static int journal_replay (const char *file) /* {{{ */
2417 char entry[CMD_MAX];
2420 if (file == NULL) return 0;
2423 char *reason = "unknown error";
2425 struct stat statbuf;
2427 memset(&statbuf, 0, sizeof(statbuf));
2428 if (stat(file, &statbuf) != 0)
2430 reason = "stat error";
2433 else if (!S_ISREG(statbuf.st_mode))
2435 reason = "not a regular file";
2438 if (statbuf.st_uid != daemon_uid)
2440 reason = "not owned by daemon user";
2443 if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
2445 reason = "must not be user/group writable";
2451 RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
2452 file, rrd_strerror(status), reason);
2457 fh = fopen(file, "r");
2460 if (errno != ENOENT)
2461 RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
2462 file, rrd_strerror(errno));
2466 RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
2475 if (fgets(entry, sizeof(entry), fh) == NULL)
2477 entry_len = strlen(entry);
2479 /* check \n termination in case journal writing crashed mid-line */
2482 else if (entry[entry_len - 1] != '\n')
2484 RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
2489 entry[entry_len - 1] = '\0';
2491 if (handle_request(NULL, now, entry, entry_len) == 0)
2499 RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
2500 entry_cnt, fail_cnt);
2502 return entry_cnt > 0 ? 1 : 0;
2503 } /* }}} static int journal_replay */
/* qsort() comparator for an array of C strings: each argument points to a
 * `char *' element and the strings are ordered with strcmp(). */
static int journal_sort(const void *v1, const void *v2)
{
  const char *lhs = *(char * const *) v1;
  const char *rhs = *(char * const *) v2;

  return strcmp(lhs, rhs);
}
2513 static void journal_init(void) /* {{{ */
2515 int had_journal = 0;
2517 struct dirent *dent;
2518 char path[PATH_MAX+1];
2520 if (journal_dir == NULL) return;
2522 pthread_mutex_lock(&journal_lock);
2524 journal_cur = calloc(1, sizeof(journal_set));
2525 if (journal_cur == NULL)
2527 RRDD_LOG(LOG_CRIT, "journal_rotate: malloc(journal_set) failed\n");
2531 RRDD_LOG(LOG_INFO, "checking for journal files");
2533 /* Handle old journal files during transition. This gives them the
2534 * correct sort order. TODO: remove after first release
2537 char old_path[PATH_MAX+1];
2538 snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".old" );
2539 snprintf(path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0000");
2540 rename(old_path, path);
2542 snprintf(old_path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE );
2543 snprintf(path, PATH_MAX, "%s/%s", journal_dir, JOURNAL_BASE ".0001");
2544 rename(old_path, path);
2547 dir = opendir(journal_dir);
2549 RRDD_LOG(LOG_CRIT, "journal_init: opendir(%s) failed\n", journal_dir);
2552 while ((dent = readdir(dir)) != NULL)
2554 /* looks like a journal file? */
2555 if (strncmp(dent->d_name, JOURNAL_BASE, strlen(JOURNAL_BASE)))
2558 snprintf(path, PATH_MAX, "%s/%s", journal_dir, dent->d_name);
2560 if (!rrd_add_strdup(&journal_cur->files, &journal_cur->files_num, path))
2562 RRDD_LOG(LOG_CRIT, "journal_init: cannot add journal file %s!",
2569 qsort(journal_cur->files, journal_cur->files_num,
2570 sizeof(journal_cur->files[0]), journal_sort);
2572 for (uint i=0; i < journal_cur->files_num; i++)
2573 had_journal += journal_replay(journal_cur->files[i]);
2577 /* it must have been a crash. start a flush */
2578 if (had_journal && config_flush_at_shutdown)
2579 flush_old_values(-1);
2581 pthread_mutex_unlock(&journal_lock);
2583 RRDD_LOG(LOG_INFO, "journal processing complete");
2585 } /* }}} static void journal_init */
2587 static void free_listen_socket(listen_socket_t *sock) /* {{{ */
2589 assert(sock != NULL);
2591 free(sock->rbuf); sock->rbuf = NULL;
2592 free(sock->wbuf); sock->wbuf = NULL;
2594 } /* }}} void free_listen_socket */
2596 static void close_connection(listen_socket_t *sock) /* {{{ */
2604 free_listen_socket(sock);
2606 } /* }}} void close_connection */
2608 static void *connection_thread_main (void *args) /* {{{ */
2610 listen_socket_t *sock;
2613 sock = (listen_socket_t *) args;
2616 /* init read buffers */
2617 sock->next_read = sock->next_cmd = 0;
2618 sock->rbuf = malloc(RBUF_SIZE);
2619 if (sock->rbuf == NULL)
2621 RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
2622 close_connection(sock);
2626 pthread_mutex_lock (&connection_threads_lock);
2627 connection_threads_num++;
2628 pthread_mutex_unlock (&connection_threads_lock);
2630 while (state == RUNNING)
2637 struct pollfd pollfd;
2641 pollfd.events = POLLIN | POLLPRI;
2644 status = poll (&pollfd, 1, /* timeout = */ 500);
2645 if (state != RUNNING)
2647 else if (status == 0) /* timeout */
2649 else if (status < 0) /* error */
2652 if (status != EINTR)
2653 RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
2657 if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
2659 else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
2661 RRDD_LOG (LOG_WARNING, "connection_thread_main: "
2662 "poll(2) returned something unexpected: %#04hx",
2667 rbytes = read(fd, sock->rbuf + sock->next_read,
2668 RBUF_SIZE - sock->next_read);
2671 RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
2674 else if (rbytes == 0)
2677 sock->next_read += rbytes;
2679 if (sock->batch_start)
2680 now = sock->batch_start;
2684 while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
2686 status = handle_request (sock, now, cmd, cmd_len+1);
2693 close_connection(sock);
2695 /* Remove this thread from the connection threads list */
2696 pthread_mutex_lock (&connection_threads_lock);
2697 connection_threads_num--;
2698 if (connection_threads_num <= 0)
2699 pthread_cond_broadcast(&connection_threads_done);
2700 pthread_mutex_unlock (&connection_threads_lock);
2703 } /* }}} void *connection_thread_main */
2705 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
2708 struct sockaddr_un sa;
2709 listen_socket_t *temp;
2712 char *path_copy, *dir;
2715 if (strncmp(path, "unix:", strlen("unix:")) == 0)
2716 path += strlen("unix:");
2718 /* dirname may modify its argument */
2719 path_copy = strdup(path);
2720 if (path_copy == NULL)
2722 fprintf(stderr, "rrdcached: strdup(): %s\n",
2723 rrd_strerror(errno));
2727 dir = dirname(path_copy);
2728 if (rrd_mkdir_p(dir, 0777) != 0)
2730 fprintf(stderr, "Failed to create socket directory '%s': %s\n",
2731 dir, rrd_strerror(errno));
2737 temp = (listen_socket_t *) rrd_realloc (listen_fds,
2738 sizeof (listen_fds[0]) * (listen_fds_num + 1));
2741 fprintf (stderr, "rrdcached: open_listen_socket_unix: realloc failed.\n");
2745 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2747 fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2750 fprintf (stderr, "rrdcached: unix socket(2) failed: %s\n",
2751 rrd_strerror(errno));
2755 memset (&sa, 0, sizeof (sa));
2756 sa.sun_family = AF_UNIX;
2757 strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2759 /* if we've gotten this far, we own the pid file. any daemon started
2760 * with the same args must not be alive. therefore, ensure that we can
2761 * create the socket...
2765 status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2768 fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2769 path, rrd_strerror(errno));
2774 /* tweak the sockets group ownership */
2775 if (sock->socket_group != (gid_t)-1)
2777 if ( (chown(path, getuid(), sock->socket_group) != 0) ||
2778 (chmod(path, (S_IRUSR|S_IWUSR|S_IXUSR | S_IRGRP|S_IWGRP)) != 0) )
2780 fprintf(stderr, "rrdcached: failed to set socket group permissions (%s)\n", strerror(errno));
2784 if (sock->socket_permissions != (mode_t)-1)
2786 if (chmod(path, sock->socket_permissions) != 0)
2787 fprintf(stderr, "rrdcached: failed to set socket file permissions (%o): %s\n",
2788 (unsigned int)sock->socket_permissions, strerror(errno));
2791 status = listen (fd, /* backlog = */ 10);
2794 fprintf (stderr, "rrdcached: listen(%s) failed: %s.\n",
2795 path, rrd_strerror(errno));
2801 listen_fds[listen_fds_num].fd = fd;
2802 listen_fds[listen_fds_num].family = PF_UNIX;
2803 strncpy(listen_fds[listen_fds_num].addr, path,
2804 sizeof (listen_fds[listen_fds_num].addr) - 1);
2808 } /* }}} int open_listen_socket_unix */
2810 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2812 struct addrinfo ai_hints;
2813 struct addrinfo *ai_res;
2814 struct addrinfo *ai_ptr;
2815 char addr_copy[NI_MAXHOST];
2820 strncpy (addr_copy, sock->addr, sizeof(addr_copy)-1);
2821 addr_copy[sizeof (addr_copy) - 1] = 0;
2824 memset (&ai_hints, 0, sizeof (ai_hints));
2825 ai_hints.ai_flags = 0;
2826 #ifdef AI_ADDRCONFIG
2827 ai_hints.ai_flags |= AI_ADDRCONFIG;
2829 ai_hints.ai_family = AF_UNSPEC;
2830 ai_hints.ai_socktype = SOCK_STREAM;
2833 if (*addr == '[') /* IPv6+port format */
2835 /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2838 port = strchr (addr, ']');
2841 fprintf (stderr, "rrdcached: Malformed address: %s\n", sock->addr);
2849 else if (*port == 0)
2853 fprintf (stderr, "rrdcached: Garbage after address: %s\n", port);
2856 } /* if (*addr == '[') */
2859 port = rindex(addr, ':');
2867 status = getaddrinfo (addr,
2868 port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2869 &ai_hints, &ai_res);
2872 fprintf (stderr, "rrdcached: getaddrinfo(%s) failed: %s\n",
2873 addr, gai_strerror (status));
2877 for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2880 listen_socket_t *temp;
2883 temp = (listen_socket_t *) rrd_realloc (listen_fds,
2884 sizeof (listen_fds[0]) * (listen_fds_num + 1));
2888 "rrdcached: open_listen_socket_network: realloc failed.\n");
2892 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2894 fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2897 fprintf (stderr, "rrdcached: network socket(2) failed: %s.\n",
2898 rrd_strerror(errno));
2902 setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2904 status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2907 fprintf (stderr, "rrdcached: bind(%s) failed: %s.\n",
2908 sock->addr, rrd_strerror(errno));
2913 status = listen (fd, /* backlog = */ 10);
2916 fprintf (stderr, "rrdcached: listen(%s) failed: %s\n.",
2917 sock->addr, rrd_strerror(errno));
2919 freeaddrinfo(ai_res);
2923 listen_fds[listen_fds_num].fd = fd;
2924 listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2926 } /* for (ai_ptr) */
2928 freeaddrinfo(ai_res);
2930 } /* }}} static int open_listen_socket_network */
2932 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2934 assert(sock != NULL);
2935 assert(sock->addr != NULL);
2937 if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2938 || sock->addr[0] == '/')
2939 return (open_listen_socket_unix(sock));
2941 return (open_listen_socket_network(sock));
2942 } /* }}} int open_listen_socket */
2944 static int close_listen_sockets (void) /* {{{ */
2948 for (i = 0; i < listen_fds_num; i++)
2950 close (listen_fds[i].fd);
2952 if (listen_fds[i].family == PF_UNIX)
2953 unlink(listen_fds[i].addr);
2961 } /* }}} int close_listen_sockets */
2963 static void *listen_thread_main (void UNUSED(*args)) /* {{{ */
2965 struct pollfd *pollfds;
2970 if (listen_fds_num < 1)
2972 RRDD_LOG(LOG_ERR, "listen_thread_main: no listen_fds !");
2976 pollfds_num = listen_fds_num;
2977 pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2978 if (pollfds == NULL)
2980 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2983 memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2985 RRDD_LOG(LOG_INFO, "listening for connections");
2987 while (state == RUNNING)
2989 for (i = 0; i < pollfds_num; i++)
2991 pollfds[i].fd = listen_fds[i].fd;
2992 pollfds[i].events = POLLIN | POLLPRI;
2993 pollfds[i].revents = 0;
2996 status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2997 if (state != RUNNING)
2999 else if (status == 0) /* timeout */
3001 else if (status < 0) /* error */
3004 if (status != EINTR)
3006 RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
3011 for (i = 0; i < pollfds_num; i++)
3013 listen_socket_t *client_sock;
3014 struct sockaddr_storage client_sa;
3015 socklen_t client_sa_size;
3017 pthread_attr_t attr;
3019 if (pollfds[i].revents == 0)
3022 if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
3024 RRDD_LOG (LOG_ERR, "listen_thread_main: "
3025 "poll(2) returned something unexpected for listen FD #%i.",
3030 client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
3031 if (client_sock == NULL)
3033 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
3036 memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
3038 client_sa_size = sizeof (client_sa);
3039 client_sock->fd = accept (pollfds[i].fd,
3040 (struct sockaddr *) &client_sa, &client_sa_size);
3041 if (client_sock->fd < 0)
3043 RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
3048 pthread_attr_init (&attr);
3049 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
3051 status = pthread_create (&tid, &attr, connection_thread_main,
3055 RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
3056 close_connection(client_sock);
3059 } /* for (pollfds_num) */
3060 } /* while (state == RUNNING) */
3062 RRDD_LOG(LOG_INFO, "starting shutdown");
3064 close_listen_sockets ();
3066 pthread_mutex_lock (&connection_threads_lock);
3067 while (connection_threads_num > 0)
3068 pthread_cond_wait(&connection_threads_done, &connection_threads_lock);
3069 pthread_mutex_unlock (&connection_threads_lock);
3074 } /* }}} void *listen_thread_main */
3076 static int daemonize (void) /* {{{ */
3081 daemon_uid = geteuid();
3083 pid_fd = open_pidfile("create", O_CREAT|O_EXCL|O_WRONLY);
3085 pid_fd = check_pidfile();
3089 /* open all the listen sockets */
3090 if (config_listen_address_list_len > 0)
3092 for (size_t i = 0; i < config_listen_address_list_len; i++)
3093 open_listen_socket (config_listen_address_list[i]);
3095 rrd_free_ptrs((void ***) &config_listen_address_list,
3096 &config_listen_address_list_len);
3100 strncpy(default_socket.addr, RRDCACHED_DEFAULT_ADDRESS,
3101 sizeof(default_socket.addr) - 1);
3102 default_socket.addr[sizeof(default_socket.addr) - 1] = '\0';
3103 open_listen_socket (&default_socket);
3106 if (listen_fds_num < 1)
3108 fprintf (stderr, "rrdcached: FATAL: cannot open any listen sockets\n");
3112 if (!stay_foreground)
3119 fprintf (stderr, "daemonize: fork(2) failed.\n");
3125 /* Become session leader */
3128 /* Open the first three file descriptors to /dev/null */
3133 open ("/dev/null", O_RDWR);
3134 if (dup(0) == -1 || dup(0) == -1){
3135 RRDD_LOG (LOG_ERR, "faild to run dup.\n");
3137 } /* if (!stay_foreground) */
3139 /* Change into the /tmp directory. */
3140 base_dir = (config_base_dir != NULL)
3144 if (chdir (base_dir) != 0)
3146 fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
3150 install_signal_handlers();
3152 openlog ("rrdcached", LOG_PID, LOG_DAEMON);
3153 RRDD_LOG(LOG_INFO, "starting up");
3155 cache_tree = g_tree_new_full ((GCompareDataFunc) strcmp, NULL, NULL,
3156 (GDestroyNotify) free_cache_item);
3157 if (cache_tree == NULL)
3159 RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
3163 return write_pidfile (pid_fd);
3168 } /* }}} int daemonize */
3170 static int cleanup (void) /* {{{ */
3172 pthread_cond_broadcast (&flush_cond);
3173 pthread_join (flush_thread, NULL);
3175 pthread_cond_broadcast (&queue_cond);
3176 for (int i = 0; i < config_queue_threads; i++)
3177 pthread_join (queue_threads[i], NULL);
3179 if (config_flush_at_shutdown)
3181 assert(cache_queue_head == NULL);
3182 RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
3185 free(queue_threads);
3186 free(config_base_dir);
3188 pthread_mutex_lock(&cache_lock);
3189 g_tree_destroy(cache_tree);
3191 pthread_mutex_lock(&journal_lock);
3194 RRDD_LOG(LOG_INFO, "goodbye");
3198 free(config_pid_file);
3201 } /* }}} int cleanup */
3203 static int read_options (int argc, char **argv) /* {{{ */
3208 socket_permission_clear (&default_socket);
3210 default_socket.socket_group = (gid_t)-1;
3211 default_socket.socket_permissions = (mode_t)-1;
3213 while ((option = getopt(argc, argv, "Ogl:s:m:P:f:w:z:t:Bb:p:Fj:a:h?")) != -1)
3218 opt_no_overwrite = 1;
3227 listen_socket_t *new;
3229 new = malloc(sizeof(listen_socket_t));
3232 fprintf(stderr, "read_options: malloc failed.\n");
3235 memset(new, 0, sizeof(listen_socket_t));
3237 strncpy(new->addr, optarg, sizeof(new->addr)-1);
3239 /* Add permissions to the socket {{{ */
3240 if (default_socket.permissions != 0)
3242 socket_permission_copy (new, &default_socket);
3244 else /* if (default_socket.permissions == 0) */
3246 /* Add permission for ALL commands to the socket. */
3248 for (i = 0; i < list_of_commands_len; i++)
3250 status = socket_permission_add (new, list_of_commands[i].cmd);
3253 fprintf (stderr, "read_options: Adding permission \"%s\" to "
3254 "socket failed. This should never happen, ever! Sorry.\n",
3255 list_of_commands[i].cmd);
3260 /* }}} Done adding permissions. */
3262 new->socket_group = default_socket.socket_group;
3263 new->socket_permissions = default_socket.socket_permissions;
3265 if (!rrd_add_ptr((void ***)&config_listen_address_list,
3266 &config_listen_address_list_len, new))
3268 fprintf(stderr, "read_options: rrd_add_ptr failed.\n");
3274 /* set socket group permissions */
3280 group_gid = strtoul(optarg, NULL, 10);
3281 if (errno != EINVAL && group_gid>0)
3283 /* we were passed a number */
3284 grp = getgrgid(group_gid);
3288 grp = getgrnam(optarg);
3293 default_socket.socket_group = grp->gr_gid;
3297 /* no idea what the user wanted... */
3298 fprintf (stderr, "read_options: couldn't map \"%s\" to a group, Sorry\n", optarg);
3304 /* set socket file permissions */
3308 char *endptr = NULL;
3310 tmp = strtol (optarg, &endptr, 8);
3311 if ((endptr == optarg) || (! endptr) || (*endptr != '\0')
3312 || (tmp > 07777) || (tmp < 0)) {
3313 fprintf (stderr, "read_options: Invalid file mode \"%s\".\n",
3318 default_socket.socket_permissions = (mode_t)tmp;
3329 socket_permission_clear (&default_socket);
3331 optcopy = strdup (optarg);
3334 while ((ptr = strtok_r (dummy, ", ", &saveptr)) != NULL)
3337 status = socket_permission_add (&default_socket, ptr);
3340 fprintf (stderr, "read_options: Adding permission \"%s\" to "
3341 "socket failed. Most likely, this permission doesn't "
3342 "exist. Check your command line.\n", ptr);
3355 temp = atoi (optarg);
3357 config_flush_interval = temp;
3360 fprintf (stderr, "Invalid flush interval: %s\n", optarg);
3370 temp = atoi (optarg);
3372 config_write_interval = temp;
3375 fprintf (stderr, "Invalid write interval: %s\n", optarg);
3385 temp = atoi(optarg);
3387 config_write_jitter = temp;
3390 fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
3400 threads = atoi(optarg);
3402 config_queue_threads = threads;
3405 fprintf (stderr, "Invalid thread count: -t %s\n", optarg);
3412 config_write_base_only = 1;
3418 char base_realpath[PATH_MAX];
3420 if (config_base_dir != NULL)
3421 free (config_base_dir);
3422 config_base_dir = strdup (optarg);
3423 if (config_base_dir == NULL)
3425 fprintf (stderr, "read_options: strdup failed.\n");
3429 if (rrd_mkdir_p (config_base_dir, 0777) != 0)
3431 fprintf (stderr, "Failed to create base directory '%s': %s\n",
3432 config_base_dir, rrd_strerror (errno));
3436 /* make sure that the base directory is not resolved via
3437 * symbolic links. this makes some performance-enhancing
3438 * assumptions possible (we don't have to resolve paths
3439 * that start with a "/")
3441 if (realpath(config_base_dir, base_realpath) == NULL)
3443 fprintf (stderr, "Failed to canonicalize the base directory '%s': "
3444 "%s\n", config_base_dir, rrd_strerror(errno));
3448 len = strlen (config_base_dir);
3449 while ((len > 0) && (config_base_dir[len - 1] == '/'))
3451 config_base_dir[len - 1] = 0;
3457 fprintf (stderr, "Invalid base directory: %s\n", optarg);
3461 _config_base_dir_len = len;
3463 len = strlen (base_realpath);
3464 while ((len > 0) && (base_realpath[len - 1] == '/'))
3466 base_realpath[len - 1] = '\0';
3470 if (strncmp(config_base_dir,
3471 base_realpath, sizeof(base_realpath)) != 0)
3474 "Base directory (-b) resolved via file system links!\n"
3475 "Please consult rrdcached '-b' documentation!\n"
3476 "Consider specifying the real directory (%s)\n",
3485 if (config_pid_file != NULL)
3486 free (config_pid_file);
3487 config_pid_file = strdup (optarg);
3488 if (config_pid_file == NULL)
3490 fprintf (stderr, "read_options: strdup failed.\n");
3497 config_flush_at_shutdown = 1;
3502 char journal_dir_actual[PATH_MAX];
3504 dir = journal_dir = strdup(realpath((const char *)optarg, journal_dir_actual));
3506 status = rrd_mkdir_p(dir, 0777);
3509 fprintf(stderr, "Failed to create journal directory '%s': %s\n",
3510 dir, rrd_strerror(errno));
3514 if (access(dir, R_OK|W_OK|X_OK) != 0)
3516 fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
3517 errno ? rrd_strerror(errno) : "");
3525 int temp = atoi(optarg);
3527 config_alloc_chunk = temp;
3530 fprintf(stderr, "Invalid allocation size: %s\n", optarg);
3538 printf ("RRDCacheD %s\n"
3539 "Copyright (C) 2008,2009 Florian octo Forster and Kevin Brintnall\n"
3541 "Usage: rrdcached [options]\n"
3543 "Valid options are:\n"
3544 " -l <address> Socket address to listen to.\n"
3545 " -P <perms> Sets the permissions to assign to all following "
3547 " -w <seconds> Interval in which to write data.\n"
3548 " -z <delay> Delay writes up to <delay> seconds to spread load\n"
3549 " -t <threads> Number of write threads.\n"
3550 " -f <seconds> Interval in which to flush dead data.\n"
3551 " -p <file> Location of the PID-file.\n"
3552 " -b <dir> Base directory to change to.\n"
3553 " -B Restrict file access to paths within -b <dir>\n"
3554 " -g Do not fork and run in the foreground.\n"
3555 " -j <dir> Directory in which to create the journal files.\n"
3556 " -F Always flush all updates at shutdown\n"
3557 " -s <id|name> Group owner of all following UNIX sockets\n"
3558 " (the socket will also have read/write permissions "
3560 " -m <mode> File permissions (octal) of all following UNIX "
3562 " -a <size> Memory allocation chunk size. Default is 1.\n"
3563 " -O Do not allow CREATE commands to overwrite existing\n"
3564 " files, even if asked to.\n"
3566 "For more information and a detailed description of all options "
3568 "to the rrdcached(1) manual page.\n",
3575 } /* switch (option) */
3576 } /* while (getopt) */
3578 /* advise the user when values are not sane */
3579 if (config_flush_interval < 2 * config_write_interval)
3580 fprintf(stderr, "WARNING: flush interval (-f) should be at least"
3581 " 2x write interval (-w) !\n");
3582 if (config_write_jitter > config_write_interval)
3583 fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
3584 " write interval (-w) !\n");
3586 if (config_write_base_only && config_base_dir == NULL)
3587 fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
3588 " Consult the rrdcached documentation\n");
3590 if (journal_dir == NULL)
3591 config_flush_at_shutdown = 1;
3594 } /* }}} int read_options */
3596 int main (int argc, char **argv)
3600 status = read_options (argc, argv);
3608 status = daemonize ();
3611 fprintf (stderr, "rrdcached: daemonize failed, exiting.\n");
3617 /* start the queue threads */
3618 queue_threads = calloc(config_queue_threads, sizeof(*queue_threads));
3619 if (queue_threads == NULL)
3621 RRDD_LOG (LOG_ERR, "FATAL: cannot calloc queue threads");
3625 for (int i = 0; i < config_queue_threads; i++)
3627 memset (&queue_threads[i], 0, sizeof (*queue_threads));
3628 status = pthread_create (&queue_threads[i], NULL, queue_thread_main, NULL);
3631 RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
3637 /* start the flush thread */
3638 memset(&flush_thread, 0, sizeof(flush_thread));
3639 status = pthread_create (&flush_thread, NULL, flush_thread_main, NULL);
3642 RRDD_LOG (LOG_ERR, "FATAL: cannot create flush thread");
3647 listen_thread_main (NULL);
3654 * vim: set sw=2 sts=2 ts=8 et fdm=marker :