2 * RRDTool - src/rrd_daemon.c
3 * Copyright (C) 2008 Florian octo Forster
4 * Copyright (C) 2008 Kevin Brintnall
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; only version 2 of the License is applicable.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
20 * Florian octo Forster <octo at verplant.org>
21 * kevin brintnall <kbrint@rufus.net>
26 * First tell the compiler to stick to the C99 and POSIX standards as close as
29 #ifndef __STRICT_ANSI__ /* {{{ */
30 # define __STRICT_ANSI__
33 #ifndef _ISOC99_SOURCE
34 # define _ISOC99_SOURCE
37 #ifdef _POSIX_C_SOURCE
38 # undef _POSIX_C_SOURCE
40 #define _POSIX_C_SOURCE 200112L
42 /* Single UNIX needed for strdup. */
46 #define _XOPEN_SOURCE 500
63 * Now for some includes..
65 #include "rrd.h" /* {{{ */
66 #include "rrd_client.h"
77 #include <sys/types.h>
81 #include <sys/socket.h>
92 #include <glib-2.0/glib.h>
95 #define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
98 # define __attribute__(x) /**/
110 typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code;
112 struct listen_socket_s
115 char addr[PATH_MAX + 1];
117 socket_privilege privilege;
119 /* state for BATCH processing */
131 typedef struct listen_socket_s listen_socket_t;
134 typedef struct cache_item_s cache_item_t;
140 time_t last_flush_time;
141 time_t last_update_stamp;
142 #define CI_FLAGS_IN_TREE (1<<0)
143 #define CI_FLAGS_IN_QUEUE (1<<1)
145 pthread_cond_t flushed;
150 struct callback_flush_data_s
157 typedef struct callback_flush_data_s callback_flush_data_t;
164 typedef enum queue_side_e queue_side_t;
166 /* max length of socket command or response */
168 #define RBUF_SIZE (CMD_MAX*2)
173 static int stay_foreground = 0;
174 static uid_t daemon_uid;
176 static listen_socket_t *listen_fds = NULL;
177 static size_t listen_fds_num = 0;
179 static int do_shutdown = 0;
181 static pthread_t queue_thread;
183 static pthread_t *connection_threads = NULL;
184 static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER;
185 static int connection_threads_num = 0;
188 static GTree *cache_tree = NULL;
189 static cache_item_t *cache_queue_head = NULL;
190 static cache_item_t *cache_queue_tail = NULL;
191 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
192 static pthread_cond_t cache_cond = PTHREAD_COND_INITIALIZER;
194 static int config_write_interval = 300;
195 static int config_write_jitter = 0;
196 static int config_flush_interval = 3600;
197 static int config_flush_at_shutdown = 0;
198 static char *config_pid_file = NULL;
199 static char *config_base_dir = NULL;
200 static size_t _config_base_dir_len = 0;
201 static int config_write_base_only = 0;
203 static listen_socket_t **config_listen_address_list = NULL;
204 static int config_listen_address_list_len = 0;
206 static uint64_t stats_queue_length = 0;
207 static uint64_t stats_updates_received = 0;
208 static uint64_t stats_flush_received = 0;
209 static uint64_t stats_updates_written = 0;
210 static uint64_t stats_data_sets_written = 0;
211 static uint64_t stats_journal_bytes = 0;
212 static uint64_t stats_journal_rotate = 0;
213 static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
215 /* Journaled updates */
216 static char *journal_cur = NULL;
217 static char *journal_old = NULL;
218 static FILE *journal_fh = NULL;
219 static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER;
220 static int journal_write(char *cmd, char *args);
221 static void journal_done(void);
222 static void journal_rotate(void);
227 static void sig_common (const char *sig) /* {{{ */
229 RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
231 pthread_cond_broadcast(&cache_cond);
232 } /* }}} void sig_common */
234 static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
237 } /* }}} void sig_int_handler */
239 static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
242 } /* }}} void sig_term_handler */
244 static void sig_usr1_handler (int s __attribute__((unused))) /* {{{ */
246 config_flush_at_shutdown = 1;
248 } /* }}} void sig_usr1_handler */
250 static void sig_usr2_handler (int s __attribute__((unused))) /* {{{ */
252 config_flush_at_shutdown = 0;
254 } /* }}} void sig_usr2_handler */
256 static void install_signal_handlers(void) /* {{{ */
258 /* These structures are static, because `sigaction' behaves weird if the are
260 static struct sigaction sa_int;
261 static struct sigaction sa_term;
262 static struct sigaction sa_pipe;
263 static struct sigaction sa_usr1;
264 static struct sigaction sa_usr2;
266 /* Install signal handlers */
267 memset (&sa_int, 0, sizeof (sa_int));
268 sa_int.sa_handler = sig_int_handler;
269 sigaction (SIGINT, &sa_int, NULL);
271 memset (&sa_term, 0, sizeof (sa_term));
272 sa_term.sa_handler = sig_term_handler;
273 sigaction (SIGTERM, &sa_term, NULL);
275 memset (&sa_pipe, 0, sizeof (sa_pipe));
276 sa_pipe.sa_handler = SIG_IGN;
277 sigaction (SIGPIPE, &sa_pipe, NULL);
279 memset (&sa_pipe, 0, sizeof (sa_usr1));
280 sa_usr1.sa_handler = sig_usr1_handler;
281 sigaction (SIGUSR1, &sa_usr1, NULL);
283 memset (&sa_usr2, 0, sizeof (sa_usr2));
284 sa_usr2.sa_handler = sig_usr2_handler;
285 sigaction (SIGUSR2, &sa_usr2, NULL);
287 } /* }}} void install_signal_handlers */
289 static int open_pidfile(void) /* {{{ */
294 file = (config_pid_file != NULL)
296 : LOCALSTATEDIR "/run/rrdcached.pid";
298 fd = open(file, O_CREAT|O_EXCL|O_WRONLY, S_IRUSR|S_IRGRP|S_IROTH);
300 fprintf(stderr, "FATAL: cannot create '%s' (%s)\n",
301 file, rrd_strerror(errno));
304 } /* }}} static int open_pidfile */
306 static int write_pidfile (int fd) /* {{{ */
313 fh = fdopen (fd, "w");
316 RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed.");
321 fprintf (fh, "%i\n", (int) pid);
325 } /* }}} int write_pidfile */
327 static int remove_pidfile (void) /* {{{ */
332 file = (config_pid_file != NULL)
334 : LOCALSTATEDIR "/run/rrdcached.pid";
336 status = unlink (file);
340 } /* }}} int remove_pidfile */
342 static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
346 eol = memchr(sock->rbuf + sock->next_cmd, '\n',
347 sock->next_read - sock->next_cmd);
351 /* no commands left, move remainder back to front of rbuf */
352 memmove(sock->rbuf, sock->rbuf + sock->next_cmd,
353 sock->next_read - sock->next_cmd);
354 sock->next_read -= sock->next_cmd;
361 char *cmd = sock->rbuf + sock->next_cmd;
364 sock->next_cmd = eol - sock->rbuf + 1;
366 if (eol > sock->rbuf && *(eol-1) == '\r')
367 *(--eol) = '\0'; /* handle "\r\n" EOL */
378 /* add the characters directly to the write buffer */
379 static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
383 assert(sock != NULL);
385 new_buf = realloc(sock->wbuf, sock->wbuf_len + len + 1);
388 RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
392 strncpy(new_buf + sock->wbuf_len, str, len + 1);
394 sock->wbuf = new_buf;
395 sock->wbuf_len += len;
398 } /* }}} static int add_to_wbuf */
400 /* add the text to the "extra" info that's sent after the status line */
401 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
404 char buffer[CMD_MAX];
407 if (sock == NULL) return 0; /* journal replay mode */
408 if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
411 #ifdef HAVE_VSNPRINTF
412 len = vsnprintf(buffer, sizeof(buffer)-1, fmt, argp);
414 len = vsprintf(buffer, fmt, argp);
419 RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed");
423 return add_to_wbuf(sock, buffer, len);
424 } /* }}} static int add_response_info */
426 static int count_lines(char *str) /* {{{ */
432 while ((str = strchr(str, '\n')) != NULL)
440 } /* }}} static int count_lines */
442 /* send the response back to the user.
443 * returns 0 on success, -1 on error
444 * write buffer is always zeroed after this call */
445 static int send_response (listen_socket_t *sock, response_code rc,
446 char *fmt, ...) /* {{{ */
449 char buffer[CMD_MAX];
454 if (sock == NULL) return rc; /* journal replay mode */
456 if (sock->batch_start)
459 return rc; /* no response on success during BATCH */
460 lines = sock->batch_cmd;
462 else if (rc == RESP_OK)
463 lines = count_lines(sock->wbuf);
467 rclen = sprintf(buffer, "%d ", lines);
469 #ifdef HAVE_VSNPRINTF
470 len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen-1, fmt, argp);
472 len = vsprintf(buffer+rclen, fmt, argp);
480 /* append the result to the wbuf, don't write to the user */
481 if (sock->batch_start)
482 return add_to_wbuf(sock, buffer, len);
484 /* first write must be complete */
485 if (len != write(sock->fd, buffer, len))
487 RRDD_LOG(LOG_INFO, "send_response: could not write status message");
491 if (sock->wbuf != NULL && rc == RESP_OK)
494 while (wrote < sock->wbuf_len)
496 ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
499 RRDD_LOG(LOG_INFO, "send_response: could not write results");
506 free(sock->wbuf); sock->wbuf = NULL;
512 static void wipe_ci_values(cache_item_t *ci, time_t when)
517 ci->last_flush_time = when;
518 if (config_write_jitter > 0)
519 ci->last_flush_time += (random() % config_write_jitter);
523 * remove a "cache_item_t" item from the queue.
524 * must hold 'cache_lock' when calling this
526 static void remove_from_queue(cache_item_t *ci) /* {{{ */
528 if (ci == NULL) return;
530 if (ci->prev == NULL)
531 cache_queue_head = ci->next; /* reset head */
533 ci->prev->next = ci->next;
535 if (ci->next == NULL)
536 cache_queue_tail = ci->prev; /* reset the tail */
538 ci->next->prev = ci->prev;
540 ci->next = ci->prev = NULL;
541 ci->flags &= ~CI_FLAGS_IN_QUEUE;
542 } /* }}} static void remove_from_queue */
544 /* remove an entry from the tree and free all its resources.
545 * must hold 'cache lock' while calling this.
546 * returns 0 on success, otherwise errno */
547 static int forget_file(const char *file)
551 ci = g_tree_lookup(cache_tree, file);
555 g_tree_remove (cache_tree, file);
556 remove_from_queue(ci);
558 for (int i=0; i < ci->values_num; i++)
564 /* in case anyone is waiting */
565 pthread_cond_broadcast(&ci->flushed);
570 } /* }}} static int forget_file */
573 * enqueue_cache_item:
574 * `cache_lock' must be acquired before calling this function!
576 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
582 if (ci->values_num == 0)
587 if (cache_queue_head == ci)
590 /* remove from the double linked list */
591 if (ci->flags & CI_FLAGS_IN_QUEUE)
592 remove_from_queue(ci);
595 ci->next = cache_queue_head;
596 if (ci->next != NULL)
598 cache_queue_head = ci;
600 if (cache_queue_tail == NULL)
601 cache_queue_tail = cache_queue_head;
603 else /* (side == TAIL) */
605 /* We don't move values back in the list.. */
606 if (ci->flags & CI_FLAGS_IN_QUEUE)
609 assert (ci->next == NULL);
610 assert (ci->prev == NULL);
612 ci->prev = cache_queue_tail;
614 if (cache_queue_tail == NULL)
615 cache_queue_head = ci;
617 cache_queue_tail->next = ci;
619 cache_queue_tail = ci;
622 ci->flags |= CI_FLAGS_IN_QUEUE;
624 pthread_cond_broadcast(&cache_cond);
625 pthread_mutex_lock (&stats_lock);
626 stats_queue_length++;
627 pthread_mutex_unlock (&stats_lock);
630 } /* }}} int enqueue_cache_item */
633 * tree_callback_flush:
634 * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
635 * while this is in progress.
637 static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
641 callback_flush_data_t *cfd;
643 ci = (cache_item_t *) value;
644 cfd = (callback_flush_data_t *) data;
646 if ((ci->last_flush_time <= cfd->abs_timeout)
647 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
648 && (ci->values_num > 0))
650 enqueue_cache_item (ci, TAIL);
652 else if ((do_shutdown != 0)
653 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
654 && (ci->values_num > 0))
656 enqueue_cache_item (ci, TAIL);
658 else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
659 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
660 && (ci->values_num <= 0))
664 temp = (char **) realloc (cfd->keys,
665 sizeof (char *) * (cfd->keys_num + 1));
668 RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed.");
672 /* Make really sure this points to the _same_ place */
673 assert ((char *) key == ci->file);
674 cfd->keys[cfd->keys_num] = (char *) key;
679 } /* }}} gboolean tree_callback_flush */
681 static int flush_old_values (int max_age)
683 callback_flush_data_t cfd;
686 memset (&cfd, 0, sizeof (cfd));
687 /* Pass the current time as user data so that we don't need to call
688 * `time' for each node. */
689 cfd.now = time (NULL);
694 cfd.abs_timeout = cfd.now - max_age;
696 cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1;
698 /* `tree_callback_flush' will return the keys of all values that haven't
699 * been touched in the last `config_flush_interval' seconds in `cfd'.
700 * The char*'s in this array point to the same memory as ci->file, so we
701 * don't need to free them separately. */
702 g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
704 for (k = 0; k < cfd.keys_num; k++)
706 /* should never fail, since we have held the cache_lock
708 assert( forget_file(cfd.keys[k]) == 0 );
711 if (cfd.keys != NULL)
718 } /* int flush_old_values */
720 static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
723 struct timespec next_flush;
724 int final_flush = 0; /* make sure we only flush once on shutdown */
726 gettimeofday (&now, NULL);
727 next_flush.tv_sec = now.tv_sec + config_flush_interval;
728 next_flush.tv_nsec = 1000 * now.tv_usec;
730 pthread_mutex_lock (&cache_lock);
731 while ((do_shutdown == 0) || (cache_queue_head != NULL))
740 /* First, check if it's time to do the cache flush. */
741 gettimeofday (&now, NULL);
742 if ((now.tv_sec > next_flush.tv_sec)
743 || ((now.tv_sec == next_flush.tv_sec)
744 && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
746 /* Flush all values that haven't been written in the last
747 * `config_write_interval' seconds. */
748 flush_old_values (config_write_interval);
750 /* Determine the time of the next cache flush. */
752 now.tv_sec + next_flush.tv_sec % config_flush_interval;
754 /* unlock the cache while we rotate so we don't block incoming
755 * updates if the fsync() blocks on disk I/O */
756 pthread_mutex_unlock(&cache_lock);
758 pthread_mutex_lock(&cache_lock);
761 /* Now, check if there's something to store away. If not, wait until
762 * something comes in or it's time to do the cache flush. if we are
763 * shutting down, do not wait around. */
764 if (cache_queue_head == NULL && !do_shutdown)
766 status = pthread_cond_timedwait (&cache_cond, &cache_lock, &next_flush);
767 if ((status != 0) && (status != ETIMEDOUT))
769 RRDD_LOG (LOG_ERR, "queue_thread_main: "
770 "pthread_cond_timedwait returned %i.", status);
774 /* We're about to shut down */
775 if (do_shutdown != 0 && !final_flush++)
777 if (config_flush_at_shutdown)
778 flush_old_values (-1); /* flush everything */
783 /* Check if a value has arrived. This may be NULL if we timed out or there
784 * was an interrupt such as a signal. */
785 if (cache_queue_head == NULL)
788 ci = cache_queue_head;
790 /* copy the relevant parts */
791 file = strdup (ci->file);
794 RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
798 assert(ci->values != NULL);
799 assert(ci->values_num > 0);
802 values_num = ci->values_num;
804 wipe_ci_values(ci, time(NULL));
805 remove_from_queue(ci);
807 pthread_mutex_lock (&stats_lock);
808 assert (stats_queue_length > 0);
809 stats_queue_length--;
810 pthread_mutex_unlock (&stats_lock);
812 pthread_mutex_unlock (&cache_lock);
815 status = rrd_update_r (file, NULL, values_num, (void *) values);
818 RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
819 "rrd_update_r (%s) failed with status %i. (%s)",
820 file, status, rrd_get_error());
823 journal_write("wrote", file);
824 pthread_cond_broadcast(&ci->flushed);
826 for (i = 0; i < values_num; i++)
834 pthread_mutex_lock (&stats_lock);
835 stats_updates_written++;
836 stats_data_sets_written += values_num;
837 pthread_mutex_unlock (&stats_lock);
840 pthread_mutex_lock (&cache_lock);
842 /* We're about to shut down */
843 if (do_shutdown != 0 && !final_flush++)
845 if (config_flush_at_shutdown)
846 flush_old_values (-1); /* flush everything */
850 } /* while ((do_shutdown == 0) || (cache_queue_head != NULL)) */
851 pthread_mutex_unlock (&cache_lock);
853 if (config_flush_at_shutdown)
855 assert(cache_queue_head == NULL);
856 RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed");
862 } /* }}} void *queue_thread_main */
864 static int buffer_get_field (char **buffer_ret, /* {{{ */
865 size_t *buffer_size_ret, char **field_ret)
874 buffer = *buffer_ret;
876 buffer_size = *buffer_size_ret;
880 if (buffer_size <= 0)
883 /* This is ensured by `handle_request'. */
884 assert (buffer[buffer_size - 1] == '\0');
887 while (buffer_pos < buffer_size)
889 /* Check for end-of-field or end-of-buffer */
890 if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0')
892 field[field_size] = 0;
898 /* Handle escaped characters. */
899 else if (buffer[buffer_pos] == '\\')
901 if (buffer_pos >= (buffer_size - 1))
904 field[field_size] = buffer[buffer_pos];
908 /* Normal operation */
911 field[field_size] = buffer[buffer_pos];
915 } /* while (buffer_pos < buffer_size) */
920 *buffer_ret = buffer + buffer_pos;
921 *buffer_size_ret = buffer_size - buffer_pos;
925 } /* }}} int buffer_get_field */
927 /* if we're restricting writes to the base directory,
928 * check whether the file falls within the dir
929 * returns 1 if OK, otherwise 0
931 static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */
933 assert(file != NULL);
935 if (!config_write_base_only
936 || sock == NULL /* journal replay */
937 || config_base_dir == NULL)
940 if (strstr(file, "../") != NULL) goto err;
942 /* relative paths without "../" are ok */
943 if (*file != '/') return 1;
945 /* file must be of the format base + "/" + <1+ char filename> */
946 if (strlen(file) < _config_base_dir_len + 2) goto err;
947 if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) goto err;
948 if (*(file + _config_base_dir_len) != '/') goto err;
953 if (sock != NULL && sock->fd >= 0)
954 send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
957 } /* }}} static int check_file_access */
959 /* when using a base dir, convert relative paths to absolute paths.
960 * if necessary, modifies the "filename" pointer to point
961 * to the new path created in "tmp". "tmp" is provided
962 * by the caller and sizeof(tmp) must be >= PATH_MAX.
964 * this allows us to optimize for the expected case (absolute path)
967 static void get_abs_path(char **filename, char *tmp)
970 assert(filename != NULL && *filename != NULL);
972 if (config_base_dir == NULL || **filename == '/')
975 snprintf(tmp, PATH_MAX, "%s/%s", config_base_dir, *filename);
977 } /* }}} static int get_abs_path */
979 /* returns 1 if we have the required privilege level,
980 * otherwise issue an error to the user on sock */
981 static int has_privilege (listen_socket_t *sock, /* {{{ */
982 socket_privilege priv)
984 if (sock == NULL) /* journal replay */
987 if (sock->privilege >= priv)
990 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES));
991 } /* }}} static int has_privilege */
993 static int flush_file (const char *filename) /* {{{ */
997 pthread_mutex_lock (&cache_lock);
999 ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
1002 pthread_mutex_unlock (&cache_lock);
1006 if (ci->values_num > 0)
1008 /* Enqueue at head */
1009 enqueue_cache_item (ci, HEAD);
1010 pthread_cond_wait(&ci->flushed, &cache_lock);
1013 /* DO NOT DO ANYTHING WITH ci HERE!! The entry
1014 * may have been purged during our cond_wait() */
1016 pthread_mutex_unlock(&cache_lock);
1019 } /* }}} int flush_file */
1021 static int handle_request_help (listen_socket_t *sock, /* {{{ */
1022 char *buffer, size_t buffer_size)
1028 char *help_help[2] =
1030 "Command overview\n"
1032 "HELP [<command>]\n"
1033 "FLUSH <filename>\n"
1035 "PENDING <filename>\n"
1036 "FORGET <filename>\n"
1037 "UPDATE <filename> <values> [<values> ...]\n"
1042 char *help_flush[2] =
1046 "Usage: FLUSH <filename>\n"
1048 "Adds the given filename to the head of the update queue and returns\n"
1049 "after is has been dequeued.\n"
1052 char *help_flushall[2] =
1054 "Help for FLUSHALL\n"
1058 "Triggers writing of all pending updates. Returns immediately.\n"
1061 char *help_pending[2] =
1063 "Help for PENDING\n"
1065 "Usage: PENDING <filename>\n"
1067 "Shows any 'pending' updates for a file, in order.\n"
1068 "The updates shown have not yet been written to the underlying RRD file.\n"
1071 char *help_forget[2] =
1075 "Usage: FORGET <filename>\n"
1077 "Removes the file completely from the cache.\n"
1078 "Any pending updates for the file will be lost.\n"
1081 char *help_update[2] =
1085 "Usage: UPDATE <filename> <values> [<values> ...]\n"
1087 "Adds the given file to the internal cache if it is not yet known and\n"
1088 "appends the given value(s) to the entry. See the rrdcached(1) manpage\n"
1091 "Each <values> has the following form:\n"
1092 " <values> = <time>:<value>[:<value>[...]]\n"
1093 "See the rrdupdate(1) manpage for details.\n"
1096 char *help_stats[2] =
1102 "Returns some performance counters, see the rrdcached(1) manpage for\n"
1103 "a description of the values.\n"
1106 char *help_batch[2] =
1110 "The 'BATCH' command permits the client to initiate a bulk load\n"
1111 " of commands to rrdcached.\n"
1116 " server: 0 Go ahead. End with dot '.' on its own line.\n"
1117 " client: command #1\n"
1118 " client: command #2\n"
1119 " client: ... and so on\n"
1121 " server: 2 errors\n"
1122 " server: 7 message for command #7\n"
1123 " server: 9 message for command #9\n"
1125 "For more information, consult the rrdcached(1) documentation.\n"
1128 status = buffer_get_field (&buffer, &buffer_size, &command);
1130 help_text = help_help;
1133 if (strcasecmp (command, "update") == 0)
1134 help_text = help_update;
1135 else if (strcasecmp (command, "flush") == 0)
1136 help_text = help_flush;
1137 else if (strcasecmp (command, "flushall") == 0)
1138 help_text = help_flushall;
1139 else if (strcasecmp (command, "pending") == 0)
1140 help_text = help_pending;
1141 else if (strcasecmp (command, "forget") == 0)
1142 help_text = help_forget;
1143 else if (strcasecmp (command, "stats") == 0)
1144 help_text = help_stats;
1145 else if (strcasecmp (command, "batch") == 0)
1146 help_text = help_batch;
1148 help_text = help_help;
1151 add_response_info(sock, help_text[1]);
1152 return send_response(sock, RESP_OK, help_text[0]);
1153 } /* }}} int handle_request_help */
1155 static int handle_request_stats (listen_socket_t *sock) /* {{{ */
1157 uint64_t copy_queue_length;
1158 uint64_t copy_updates_received;
1159 uint64_t copy_flush_received;
1160 uint64_t copy_updates_written;
1161 uint64_t copy_data_sets_written;
1162 uint64_t copy_journal_bytes;
1163 uint64_t copy_journal_rotate;
1165 uint64_t tree_nodes_number;
1166 uint64_t tree_depth;
1168 pthread_mutex_lock (&stats_lock);
1169 copy_queue_length = stats_queue_length;
1170 copy_updates_received = stats_updates_received;
1171 copy_flush_received = stats_flush_received;
1172 copy_updates_written = stats_updates_written;
1173 copy_data_sets_written = stats_data_sets_written;
1174 copy_journal_bytes = stats_journal_bytes;
1175 copy_journal_rotate = stats_journal_rotate;
1176 pthread_mutex_unlock (&stats_lock);
1178 pthread_mutex_lock (&cache_lock);
1179 tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
1180 tree_depth = (uint64_t) g_tree_height (cache_tree);
1181 pthread_mutex_unlock (&cache_lock);
1183 add_response_info(sock,
1184 "QueueLength: %"PRIu64"\n", copy_queue_length);
1185 add_response_info(sock,
1186 "UpdatesReceived: %"PRIu64"\n", copy_updates_received);
1187 add_response_info(sock,
1188 "FlushesReceived: %"PRIu64"\n", copy_flush_received);
1189 add_response_info(sock,
1190 "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
1191 add_response_info(sock,
1192 "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
1193 add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
1194 add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth);
1195 add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes);
1196 add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate);
1198 send_response(sock, RESP_OK, "Statistics follow\n");
1201 } /* }}} int handle_request_stats */
1203 static int handle_request_flush (listen_socket_t *sock, /* {{{ */
1204 char *buffer, size_t buffer_size)
1206 char *file, file_tmp[PATH_MAX];
1209 status = buffer_get_field (&buffer, &buffer_size, &file);
1212 return send_response(sock, RESP_ERR, "Usage: flush <filename>\n");
1216 pthread_mutex_lock(&stats_lock);
1217 stats_flush_received++;
1218 pthread_mutex_unlock(&stats_lock);
1220 get_abs_path(&file, file_tmp);
1221 if (!check_file_access(file, sock)) return 0;
1223 status = flush_file (file);
1225 return send_response(sock, RESP_OK, "Successfully flushed %s.\n", file);
1226 else if (status == ENOENT)
1228 /* no file in our tree; see whether it exists at all */
1229 struct stat statbuf;
1231 memset(&statbuf, 0, sizeof(statbuf));
1232 if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode))
1233 return send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file);
1235 return send_response(sock, RESP_ERR, "No such file: %s.\n", file);
1237 else if (status < 0)
1238 return send_response(sock, RESP_ERR, "Internal error.\n");
1240 return send_response(sock, RESP_ERR, "Failed with status %i.\n", status);
1245 } /* }}} int handle_request_flush */
1247 static int handle_request_flushall(listen_socket_t *sock) /* {{{ */
1251 status = has_privilege(sock, PRIV_HIGH);
1255 RRDD_LOG(LOG_DEBUG, "Received FLUSHALL");
1257 pthread_mutex_lock(&cache_lock);
1258 flush_old_values(-1);
1259 pthread_mutex_unlock(&cache_lock);
1261 return send_response(sock, RESP_OK, "Started flush.\n");
1262 } /* }}} static int handle_request_flushall */
1264 static int handle_request_pending(listen_socket_t *sock, /* {{{ */
1265 char *buffer, size_t buffer_size)
1268 char *file, file_tmp[PATH_MAX];
1271 status = buffer_get_field(&buffer, &buffer_size, &file);
1273 return send_response(sock, RESP_ERR,
1274 "Usage: PENDING <filename>\n");
1276 status = has_privilege(sock, PRIV_HIGH);
1280 get_abs_path(&file, file_tmp);
1282 pthread_mutex_lock(&cache_lock);
1283 ci = g_tree_lookup(cache_tree, file);
1286 pthread_mutex_unlock(&cache_lock);
1287 return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
1290 for (int i=0; i < ci->values_num; i++)
1291 add_response_info(sock, "%s\n", ci->values[i]);
1293 pthread_mutex_unlock(&cache_lock);
1294 return send_response(sock, RESP_OK, "updates pending\n");
1295 } /* }}} static int handle_request_pending */
1297 static int handle_request_forget(listen_socket_t *sock, /* {{{ */
1298 char *buffer, size_t buffer_size)
1301 char *file, file_tmp[PATH_MAX];
1303 status = buffer_get_field(&buffer, &buffer_size, &file);
1305 return send_response(sock, RESP_ERR,
1306 "Usage: FORGET <filename>\n");
1308 status = has_privilege(sock, PRIV_HIGH);
1312 get_abs_path(&file, file_tmp);
1313 if (!check_file_access(file, sock)) return 0;
1315 pthread_mutex_lock(&cache_lock);
1316 status = forget_file(file);
1317 pthread_mutex_unlock(&cache_lock);
1322 journal_write("forget", file);
1324 return send_response(sock, RESP_OK, "Gone!\n");
1327 return send_response(sock, RESP_ERR, "cannot forget: %s\n",
1328 status < 0 ? "Internal error" : rrd_strerror(status));
1332 } /* }}} static int handle_request_forget */
1334 static int handle_request_update (listen_socket_t *sock, /* {{{ */
1336 char *buffer, size_t buffer_size)
1338 char *file, file_tmp[PATH_MAX];
1340 int bad_timestamps = 0;
1342 char orig_buf[CMD_MAX];
1346 status = has_privilege(sock, PRIV_HIGH);
1350 /* save it for the journal later */
1351 strncpy(orig_buf, buffer, sizeof(orig_buf)-1);
1353 status = buffer_get_field (&buffer, &buffer_size, &file);
1355 return send_response(sock, RESP_ERR,
1356 "Usage: UPDATE <filename> <values> [<values> ...]\n");
1358 pthread_mutex_lock(&stats_lock);
1359 stats_updates_received++;
1360 pthread_mutex_unlock(&stats_lock);
1362 get_abs_path(&file, file_tmp);
1363 if (!check_file_access(file, sock)) return 0;
1365 pthread_mutex_lock (&cache_lock);
1366 ci = g_tree_lookup (cache_tree, file);
1368 if (ci == NULL) /* {{{ */
1370 struct stat statbuf;
1372 /* don't hold the lock while we setup; stat(2) might block */
1373 pthread_mutex_unlock(&cache_lock);
1375 memset (&statbuf, 0, sizeof (statbuf));
1376 status = stat (file, &statbuf);
1379 RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file);
1382 if (status == ENOENT)
1383 return send_response(sock, RESP_ERR, "No such file: %s\n", file);
1385 return send_response(sock, RESP_ERR,
1386 "stat failed with error %i.\n", status);
1388 if (!S_ISREG (statbuf.st_mode))
1389 return send_response(sock, RESP_ERR, "Not a regular file: %s\n", file);
1391 if (access(file, R_OK|W_OK) != 0)
1392 return send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n",
1393 file, rrd_strerror(errno));
1395 ci = (cache_item_t *) malloc (sizeof (cache_item_t));
1398 RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
1400 return send_response(sock, RESP_ERR, "malloc failed.\n");
1402 memset (ci, 0, sizeof (cache_item_t));
1404 ci->file = strdup (file);
1405 if (ci->file == NULL)
1408 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1410 return send_response(sock, RESP_ERR, "strdup failed.\n");
1413 wipe_ci_values(ci, now);
1414 ci->flags = CI_FLAGS_IN_TREE;
1416 pthread_mutex_lock(&cache_lock);
1417 g_tree_insert (cache_tree, (void *) ci->file, (void *) ci);
1419 assert (ci != NULL);
1421 /* don't re-write updates in replay mode */
1423 journal_write("update", orig_buf);
1425 while (buffer_size > 0)
1432 status = buffer_get_field (&buffer, &buffer_size, &value);
1435 RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
1439 /* make sure update time is always moving forward */
1440 stamp = strtol(value, &eostamp, 10);
1441 if (eostamp == value || eostamp == NULL || *eostamp != ':')
1444 add_response_info(sock, "Cannot find timestamp in '%s'!\n", value);
1447 else if (stamp <= ci->last_update_stamp)
1450 add_response_info(sock,
1451 "illegal attempt to update using time %ld when"
1452 " last update time is %ld (minimum one second step)\n",
1453 stamp, ci->last_update_stamp);
1457 ci->last_update_stamp = stamp;
1459 temp = (char **) realloc (ci->values,
1460 sizeof (char *) * (ci->values_num + 1));
1463 RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
1468 ci->values[ci->values_num] = strdup (value);
1469 if (ci->values[ci->values_num] == NULL)
1471 RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
1479 if (((now - ci->last_flush_time) >= config_write_interval)
1480 && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
1481 && (ci->values_num > 0))
1483 enqueue_cache_item (ci, TAIL);
1486 pthread_mutex_unlock (&cache_lock);
1490 /* if we had only one update attempt, then return the full
1491 error message... try to get the most information out
1492 of the limited error space allowed by the protocol
1494 if (bad_timestamps == 1)
1495 return send_response(sock, RESP_ERR, "%s", sock->wbuf);
1497 return send_response(sock, RESP_ERR,
1498 "No values updated (%d bad timestamps).\n",
1502 return send_response(sock, RESP_OK,
1503 "errors, enqueued %i value(s).\n", values_num);
1508 } /* }}} int handle_request_update */
1510 /* we came across a "WROTE" entry during journal replay.
1511 * throw away any values that we have accumulated for this file
1513 static int handle_request_wrote (const char *buffer, time_t now) /* {{{ */
1517 const char *file = buffer;
1519 pthread_mutex_lock(&cache_lock);
1521 ci = g_tree_lookup(cache_tree, file);
1524 pthread_mutex_unlock(&cache_lock);
1530 for (i=0; i < ci->values_num; i++)
1531 free(ci->values[i]);
1536 wipe_ci_values(ci, now);
1537 remove_from_queue(ci);
1539 pthread_mutex_unlock(&cache_lock);
1541 } /* }}} int handle_request_wrote */
1543 /* start "BATCH" processing */
1544 static int batch_start (listen_socket_t *sock) /* {{{ */
1547 if (sock->batch_start)
1548 return send_response(sock, RESP_ERR, "Already in BATCH\n");
1550 status = send_response(sock, RESP_OK,
1551 "Go ahead. End with dot '.' on its own line.\n");
1552 sock->batch_start = time(NULL);
1553 sock->batch_cmd = 0;
1556 } /* }}} static int batch_start */
1558 /* finish "BATCH" processing and return results to the client */
1559 static int batch_done (listen_socket_t *sock) /* {{{ */
1561 assert(sock->batch_start);
1562 sock->batch_start = 0;
1563 sock->batch_cmd = 0;
1564 return send_response(sock, RESP_OK, "errors\n");
1565 } /* }}} static int batch_done */
1567 /* if sock==NULL, we are in journal replay mode */
1568 static int handle_request (listen_socket_t *sock, /* {{{ */
1570 char *buffer, size_t buffer_size)
1576 assert (buffer[buffer_size - 1] == '\0');
1578 buffer_ptr = buffer;
1580 status = buffer_get_field (&buffer_ptr, &buffer_size, &command);
1583 RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
1587 if (sock != NULL && sock->batch_start)
1590 if (strcasecmp (command, "update") == 0)
1591 return (handle_request_update (sock, now, buffer_ptr, buffer_size));
1592 else if (strcasecmp (command, "wrote") == 0 && sock == NULL)
1594 /* this is only valid in replay mode */
1595 return (handle_request_wrote (buffer_ptr, now));
1597 else if (strcasecmp (command, "flush") == 0)
1598 return (handle_request_flush (sock, buffer_ptr, buffer_size));
1599 else if (strcasecmp (command, "flushall") == 0)
1600 return (handle_request_flushall(sock));
1601 else if (strcasecmp (command, "pending") == 0)
1602 return (handle_request_pending(sock, buffer_ptr, buffer_size));
1603 else if (strcasecmp (command, "forget") == 0)
1604 return (handle_request_forget(sock, buffer_ptr, buffer_size));
1605 else if (strcasecmp (command, "stats") == 0)
1606 return (handle_request_stats (sock));
1607 else if (strcasecmp (command, "help") == 0)
1608 return (handle_request_help (sock, buffer_ptr, buffer_size));
1609 else if (strcasecmp (command, "batch") == 0 && sock != NULL)
1610 return batch_start(sock);
1611 else if (strcasecmp (command, ".") == 0 && sock != NULL && sock->batch_start)
1612 return batch_done(sock);
1614 return send_response(sock, RESP_ERR, "Unknown command: %s\n", command);
1618 } /* }}} int handle_request */
1620 /* MUST NOT hold journal_lock before calling this */
1621 static void journal_rotate(void) /* {{{ */
1623 FILE *old_fh = NULL;
1626 if (journal_cur == NULL || journal_old == NULL)
1629 pthread_mutex_lock(&journal_lock);
1631 /* we rotate this way (rename before close) so that the we can release
1632 * the journal lock as fast as possible. Journal writes to the new
1633 * journal can proceed immediately after the new file is opened. The
1634 * fclose can then block without affecting new updates.
1636 if (journal_fh != NULL)
1638 old_fh = journal_fh;
1640 rename(journal_cur, journal_old);
1641 ++stats_journal_rotate;
1644 new_fd = open(journal_cur, O_WRONLY|O_CREAT|O_APPEND,
1645 S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
1648 journal_fh = fdopen(new_fd, "a");
1649 if (journal_fh == NULL)
1653 pthread_mutex_unlock(&journal_lock);
1658 if (journal_fh == NULL)
1661 "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)",
1662 journal_cur, rrd_strerror(errno));
1665 "JOURNALING DISABLED: All values will be flushed at shutdown");
1666 config_flush_at_shutdown = 1;
1669 } /* }}} static void journal_rotate */
1671 static void journal_done(void) /* {{{ */
1673 if (journal_cur == NULL)
1676 pthread_mutex_lock(&journal_lock);
1677 if (journal_fh != NULL)
1683 if (config_flush_at_shutdown)
1685 RRDD_LOG(LOG_INFO, "removing journals");
1686 unlink(journal_old);
1687 unlink(journal_cur);
1691 RRDD_LOG(LOG_INFO, "expedited shutdown; "
1692 "journals will be used at next startup");
1695 pthread_mutex_unlock(&journal_lock);
1697 } /* }}} static void journal_done */
1699 static int journal_write(char *cmd, char *args) /* {{{ */
1703 if (journal_fh == NULL)
1706 pthread_mutex_lock(&journal_lock);
1707 chars = fprintf(journal_fh, "%s %s\n", cmd, args);
1708 pthread_mutex_unlock(&journal_lock);
1712 pthread_mutex_lock(&stats_lock);
1713 stats_journal_bytes += chars;
1714 pthread_mutex_unlock(&stats_lock);
1718 } /* }}} static int journal_write */
1720 static int journal_replay (const char *file) /* {{{ */
1726 char entry[CMD_MAX];
1729 if (file == NULL) return 0;
1734 struct stat statbuf;
1736 memset(&statbuf, 0, sizeof(statbuf));
1737 if (stat(file, &statbuf) != 0)
1739 if (errno == ENOENT)
1742 reason = "stat error";
1745 else if (!S_ISREG(statbuf.st_mode))
1747 reason = "not a regular file";
1750 if (statbuf.st_uid != daemon_uid)
1752 reason = "not owned by daemon user";
1755 if (statbuf.st_mode & (S_IWGRP|S_IWOTH))
1757 reason = "must not be user/group writable";
1763 RRDD_LOG(LOG_ERR, "journal_replay: %s : %s (%s)",
1764 file, rrd_strerror(status), reason);
1769 fh = fopen(file, "r");
1772 if (errno != ENOENT)
1773 RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)",
1774 file, rrd_strerror(errno));
1778 RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
1787 if (fgets(entry, sizeof(entry), fh) == NULL)
1789 entry_len = strlen(entry);
1791 /* check \n termination in case journal writing crashed mid-line */
1794 else if (entry[entry_len - 1] != '\n')
1796 RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line);
1801 entry[entry_len - 1] = '\0';
1803 if (handle_request(NULL, now, entry, entry_len) == 0)
1811 RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)",
1812 entry_cnt, fail_cnt);
1814 return entry_cnt > 0 ? 1 : 0;
1815 } /* }}} static int journal_replay */
1817 static void journal_init(void) /* {{{ */
1819 int had_journal = 0;
1821 if (journal_cur == NULL) return;
1823 pthread_mutex_lock(&journal_lock);
1825 RRDD_LOG(LOG_INFO, "checking for journal files");
1827 had_journal += journal_replay(journal_old);
1828 had_journal += journal_replay(journal_cur);
1830 /* it must have been a crash. start a flush */
1831 if (had_journal && config_flush_at_shutdown)
1832 flush_old_values(-1);
1834 pthread_mutex_unlock(&journal_lock);
1837 RRDD_LOG(LOG_INFO, "journal processing complete");
1839 } /* }}} static void journal_init */
1841 static void close_connection(listen_socket_t *sock)
1843 close(sock->fd) ; sock->fd = -1;
1844 free(sock->rbuf); sock->rbuf = NULL;
1845 free(sock->wbuf); sock->wbuf = NULL;
1850 static void *connection_thread_main (void *args) /* {{{ */
1853 listen_socket_t *sock;
1857 sock = (listen_socket_t *) args;
1860 /* init read buffers */
1861 sock->next_read = sock->next_cmd = 0;
1862 sock->rbuf = malloc(RBUF_SIZE);
1863 if (sock->rbuf == NULL)
1865 RRDD_LOG(LOG_ERR, "connection_thread_main: cannot malloc read buffer");
1866 close_connection(sock);
1870 pthread_mutex_lock (&connection_threads_lock);
1874 temp = (pthread_t *) realloc (connection_threads,
1875 sizeof (pthread_t) * (connection_threads_num + 1));
1878 RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed.");
1882 connection_threads = temp;
1883 connection_threads[connection_threads_num] = pthread_self ();
1884 connection_threads_num++;
1887 pthread_mutex_unlock (&connection_threads_lock);
1889 while (do_shutdown == 0)
1896 struct pollfd pollfd;
1900 pollfd.events = POLLIN | POLLPRI;
1903 status = poll (&pollfd, 1, /* timeout = */ 500);
1906 else if (status == 0) /* timeout */
1908 else if (status < 0) /* error */
1911 if (status != EINTR)
1912 RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
1916 if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
1918 else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
1920 RRDD_LOG (LOG_WARNING, "connection_thread_main: "
1921 "poll(2) returned something unexpected: %#04hx",
1926 rbytes = read(fd, sock->rbuf + sock->next_read,
1927 RBUF_SIZE - sock->next_read);
1930 RRDD_LOG(LOG_ERR, "connection_thread_main: read() failed.");
1933 else if (rbytes == 0)
1936 sock->next_read += rbytes;
1938 if (sock->batch_start)
1939 now = sock->batch_start;
1943 while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
1945 status = handle_request (sock, now, cmd, cmd_len+1);
1952 close_connection(sock);
1954 self = pthread_self ();
1955 /* Remove this thread from the connection threads list */
1956 pthread_mutex_lock (&connection_threads_lock);
1957 /* Find out own index in the array */
1958 for (i = 0; i < connection_threads_num; i++)
1959 if (pthread_equal (connection_threads[i], self) != 0)
1961 assert (i < connection_threads_num);
1963 /* Move the trailing threads forward. */
1964 if (i < (connection_threads_num - 1))
1966 memmove (connection_threads + i,
1967 connection_threads + i + 1,
1968 sizeof (pthread_t) * (connection_threads_num - i - 1));
1971 connection_threads_num--;
1972 pthread_mutex_unlock (&connection_threads_lock);
1975 } /* }}} void *connection_thread_main */
/* Open a UNIX-domain listening socket and append it to listen_fds.
 *
 * sock: template entry; it is copied into the newly grown listen_fds slot,
 *       whose fd, family and addr fields are then overwritten below.
 *
 * An optional "unix:" prefix is stripped from the path before it is used.
 * The socket is created, bound and put into listening state (backlog 10);
 * each failing step is logged.
 *
 * NOTE(review): the error-path cleanup, the increment of listen_fds_num and
 * the return statements are not visible in this excerpt - confirm before
 * relying on them.
 */
1977 static int open_listen_socket_unix (const listen_socket_t *sock) /* {{{ */
1980 struct sockaddr_un sa;
1981 listen_socket_t *temp;
/* skip the optional "unix:" scheme prefix */
1986 if (strncmp(path, "unix:", strlen("unix:")) == 0)
1987 path += strlen("unix:");
/* make room for one more entry in listen_fds */
1989 temp = (listen_socket_t *) realloc (listen_fds,
1990 sizeof (listen_fds[0]) * (listen_fds_num + 1));
1993 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: realloc failed.");
/* start from a copy of the caller's template */
1997 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
1999 fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
2002 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: socket(2) failed.");
/* sa is zeroed first, so copying at most size-1 bytes keeps sun_path
 * NUL terminated */
2006 memset (&sa, 0, sizeof (sa));
2007 sa.sun_family = AF_UNIX;
2008 strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
2010 status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
2013 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: bind(2) failed.");
2019 status = listen (fd, /* backlog = */ 10);
2022 RRDD_LOG (LOG_ERR, "open_listen_socket_unix: listen(2) failed.");
/* record the descriptor and the prefix-free path in the new entry */
2028 listen_fds[listen_fds_num].fd = fd;
2029 listen_fds[listen_fds_num].family = PF_UNIX;
2030 strncpy(listen_fds[listen_fds_num].addr, path,
2031 sizeof (listen_fds[listen_fds_num].addr) - 1);
2035 } /* }}} int open_listen_socket_unix */
/* Open one listening TCP socket per address that getaddrinfo() returns for
 * sock->addr and append each of them to listen_fds.
 *
 * sock: template entry; its addr is parsed here and the whole structure is
 *       copied into every new listen_fds slot.
 *
 * The address may be written as "[ipv6-address]:port"; for addresses that
 * contain a '.', the port is looked for after the last ':'.  When no port
 * was found, RRDCACHED_DEFAULT_PORT is used.
 *
 * NOTE(review): most of the address-splitting code, the per-address error
 * handling and the return statements are not visible in this excerpt -
 * confirm before relying on them.
 */
2037 static int open_listen_socket_network(const listen_socket_t *sock) /* {{{ */
2039 struct addrinfo ai_hints;
2040 struct addrinfo *ai_res;
2041 struct addrinfo *ai_ptr;
2042 char addr_copy[NI_MAXHOST];
/* work on a private, always NUL-terminated copy of the address */
2047 strncpy (addr_copy, sock->addr, sizeof (addr_copy));
2048 addr_copy[sizeof (addr_copy) - 1] = 0;
/* any address family, stream sockets only */
2051 memset (&ai_hints, 0, sizeof (ai_hints));
2052 ai_hints.ai_flags = 0;
2053 #ifdef AI_ADDRCONFIG
2054 ai_hints.ai_flags |= AI_ADDRCONFIG;
2056 ai_hints.ai_family = AF_UNSPEC;
2057 ai_hints.ai_socktype = SOCK_STREAM;
2060 if (*addr == '[') /* IPv6+port format */
2062 /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
2065 port = strchr (addr, ']');
2068 RRDD_LOG (LOG_ERR, "open_listen_socket_network: Malformed address: %s",
2077 else if (*port == 0)
2081 RRDD_LOG (LOG_ERR, "open_listen_socket_network: Garbage after address: %s",
2085 } /* if (*addr = ']') */
2086 else if (strchr (addr, '.') != NULL) /* Hostname or IPv4 */
2088 port = rindex(addr, ':');
/* resolve; fall back to the default port if none was given */
2096 status = getaddrinfo (addr,
2097 port == NULL ? RRDCACHED_DEFAULT_PORT : port,
2098 &ai_hints, &ai_res);
2101 RRDD_LOG (LOG_ERR, "open_listen_socket_network: getaddrinfo(%s) failed: "
2102 "%s", addr, gai_strerror (status));
/* try to open a listening socket on every address returned */
2106 for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
2109 listen_socket_t *temp;
2112 temp = (listen_socket_t *) realloc (listen_fds,
2113 sizeof (listen_fds[0]) * (listen_fds_num + 1));
2116 RRDD_LOG (LOG_ERR, "open_listen_socket_network: realloc failed.");
2120 memcpy (listen_fds + listen_fds_num, sock, sizeof (listen_fds[0]));
2122 fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
2125 RRDD_LOG (LOG_ERR, "open_listen_socket_network: socket(2) failed.");
/* NOTE(review): the result of setsockopt() is not checked here */
2129 setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
2131 status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
2134 RRDD_LOG (LOG_ERR, "open_listen_socket_network: bind(2) failed.");
2139 status = listen (fd, /* backlog = */ 10);
2142 RRDD_LOG (LOG_ERR, "open_listen_socket_network: listen(2) failed.");
2147 listen_fds[listen_fds_num].fd = fd;
2148 listen_fds[listen_fds_num].family = ai_ptr->ai_family;
2150 } /* for (ai_ptr) */
2153 } /* }}} static int open_listen_socket_network */
2155 static int open_listen_socket (const listen_socket_t *sock) /* {{{ */
2157 assert(sock != NULL);
2158 assert(sock->addr != NULL);
2160 if (strncmp ("unix:", sock->addr, strlen ("unix:")) == 0
2161 || sock->addr[0] == '/')
2162 return (open_listen_socket_unix(sock));
2164 return (open_listen_socket_network(sock));
2165 } /* }}} int open_listen_socket */
2167 static int close_listen_sockets (void) /* {{{ */
2171 for (i = 0; i < listen_fds_num; i++)
2173 close (listen_fds[i].fd);
2175 if (listen_fds[i].family == PF_UNIX)
2176 unlink(listen_fds[i].addr);
2184 } /* }}} int close_listen_sockets */
/* Accept loop of the daemon.
 *
 * Opens a listening socket for every configured address (or for
 * RRDCACHED_DEFAULT_ADDRESS when none is configured), then polls all of them
 * with a one second timeout until do_shutdown is set.  For every incoming
 * connection a listen_socket_t is allocated as a copy of the listening
 * entry, the client is accepted into it and a detached thread running
 * connection_thread_main() is started for it.
 *
 * On shutdown the listening sockets are closed and the function waits for
 * the registered connection threads.
 *
 * NOTE(review): the connection threads are created with
 * PTHREAD_CREATE_DETACHED but are later passed to pthread_join(); joining a
 * detached thread is undefined according to POSIX - confirm the intent.
 * NOTE(review): several short lines (error-path cleanup, `continue'/`break',
 * return statements) are not visible in this excerpt.
 */
2186 static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
2188 struct pollfd *pollfds;
/* open everything that was configured on the command line */
2193 for (i = 0; i < config_listen_address_list_len; i++)
2194 open_listen_socket (config_listen_address_list[i]);
/* nothing configured: fall back to the default address */
2196 if (config_listen_address_list_len < 1)
2198 listen_socket_t sock;
2199 memset(&sock, 0, sizeof(sock));
2200 strncpy(sock.addr, RRDCACHED_DEFAULT_ADDRESS, sizeof(sock.addr));
2201 open_listen_socket (&sock);
2204 if (listen_fds_num < 1)
2206 RRDD_LOG (LOG_ERR, "listen_thread_main: No listen sockets "
2207 "could be opened. Sorry.");
/* one pollfd per listening socket */
2211 pollfds_num = listen_fds_num;
2212 pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
2213 if (pollfds == NULL)
2215 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2218 memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
2220 RRDD_LOG(LOG_INFO, "listening for connections");
2222 while (do_shutdown == 0)
2224 assert (pollfds_num == ((int) listen_fds_num));
/* the poll set is rebuilt on every iteration */
2225 for (i = 0; i < pollfds_num; i++)
2227 pollfds[i].fd = listen_fds[i].fd;
2228 pollfds[i].events = POLLIN | POLLPRI;
2229 pollfds[i].revents = 0;
2232 status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
2235 else if (status == 0) /* timeout */
2237 else if (status < 0) /* error */
2240 if (status != EINTR)
2242 RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
/* look at every listening socket that reported an event */
2247 for (i = 0; i < pollfds_num; i++)
2249 listen_socket_t *client_sock;
2250 struct sockaddr_storage client_sa;
2251 socklen_t client_sa_size;
2253 pthread_attr_t attr;
2255 if (pollfds[i].revents == 0)
2258 if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
2260 RRDD_LOG (LOG_ERR, "listen_thread_main: "
2261 "poll(2) returned something unexpected for listen FD #%i.",
/* the client entry starts out as a copy of the listening entry */
2266 client_sock = (listen_socket_t *) malloc (sizeof (listen_socket_t));
2267 if (client_sock == NULL)
2269 RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
2272 memcpy(client_sock, &listen_fds[i], sizeof(listen_fds[0]));
2274 client_sa_size = sizeof (client_sa);
2275 client_sock->fd = accept (pollfds[i].fd,
2276 (struct sockaddr *) &client_sa, &client_sa_size);
2277 if (client_sock->fd < 0)
2279 RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
/* hand the connection to its own detached thread */
2284 pthread_attr_init (&attr);
2285 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
2287 status = pthread_create (&tid, &attr, connection_thread_main,
2291 RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
2292 close_connection(client_sock);
2295 } /* for (pollfds_num) */
2296 } /* while (do_shutdown == 0) */
2298 RRDD_LOG(LOG_INFO, "starting shutdown");
2300 close_listen_sockets ();
/* wait for the connection threads; the lock is released around each join */
2302 pthread_mutex_lock (&connection_threads_lock);
2303 while (connection_threads_num > 0)
2307 wait_for = connection_threads[0];
2309 pthread_mutex_unlock (&connection_threads_lock);
2310 pthread_join (wait_for, /* retval = */ NULL);
2311 pthread_mutex_lock (&connection_threads_lock);
2313 pthread_mutex_unlock (&connection_threads_lock);
2316 } /* }}} void *listen_thread_main */
/* Process-level start-up.
 *
 * Records the effective uid in daemon_uid, opens the PID file, detaches
 * from the terminal unless stay_foreground is set, changes into the base
 * directory, installs the signal handlers, opens syslog, creates the cache
 * tree and finally writes the PID file.
 *
 * A negative descriptor from open_pidfile() is returned to the caller as is.
 *
 * NOTE(review): most of the fork/setsid sequence and the remaining return
 * statements are not visible in this excerpt - confirm before relying on
 * them.
 */
2318 static int daemonize (void) /* {{{ */
/* remembered so that journal files can be checked for ownership */
2324 daemon_uid = geteuid();
2326 fd = open_pidfile();
2327 if (fd < 0) return fd;
2329 if (!stay_foreground)
2336 fprintf (stderr, "daemonize: fork(2) failed.\n");
2344 /* Become session leader */
2347 /* Open the first three file descriptors to /dev/null */
2352 open ("/dev/null", O_RDWR);
2355 } /* if (!stay_foreground) */
2357 /* Change into the /tmp directory. */
/* NOTE(review): config_base_dir is used when it is set; the fallback
 * directory is not visible in this excerpt */
2358 base_dir = (config_base_dir != NULL)
2361 status = chdir (base_dir);
2364 fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
2368 install_signal_handlers();
2370 openlog ("rrdcached", LOG_PID, LOG_DAEMON);
2371 RRDD_LOG(LOG_INFO, "starting up");
/* cache entries are keyed by file name and compared with strcmp */
2373 cache_tree = g_tree_new ((GCompareFunc) strcmp);
2374 if (cache_tree == NULL)
2376 RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
2380 status = write_pidfile (fd);
2382 } /* }}} int daemonize */
/* Final shutdown: wake the queue thread through cache_cond, wait for it to
 * finish and log a goodbye message.
 *
 * NOTE(review): further short statements of this function (and its return
 * statement) are not visible in this excerpt - confirm before relying on
 * the exact sequence.
 */
2384 static int cleanup (void) /* {{{ */
/* wake the queue thread in case it is waiting, then wait for it to exit */
2388 pthread_cond_signal (&cache_cond);
2389 pthread_join (queue_thread, /* return = */ NULL);
2393 RRDD_LOG(LOG_INFO, "goodbye");
2397 } /* }}} int cleanup */
/* Parse the command line into the config_* globals.
 *
 * argc, argv: as passed to main().
 *
 * Recognised options (see the usage text printed below): -l/-L listen
 * addresses, -f flush interval, -w write interval, -z write jitter, -b base
 * directory, -B restrict to base directory, -p PID file, -j journal
 * directory, -F flush at shutdown, -g stay in foreground, -h/-? help.
 *
 * After parsing, warnings are printed for questionable combinations, and
 * flushing at shutdown is forced on when no journal directory was given.
 *
 * NOTE(review): the `switch'/`case' skeleton, the range checks on the
 * numeric options and the return statements are not visible in this
 * excerpt - confirm before relying on them.
 */
2399 static int read_options (int argc, char **argv) /* {{{ */
2404 while ((option = getopt(argc, argv, "gl:L:f:w:b:Bz:p:j:h?F")) != -1)
/* listen address: allocate a zeroed entry and append it to
 * config_listen_address_list */
2415 listen_socket_t **temp;
2416 listen_socket_t *new;
2418 new = malloc(sizeof(listen_socket_t));
2421 fprintf(stderr, "read_options: malloc failed.\n");
2424 memset(new, 0, sizeof(listen_socket_t));
2426 temp = (listen_socket_t **) realloc (config_listen_address_list,
2427 sizeof (listen_socket_t *) * (config_listen_address_list_len + 1));
2430 fprintf (stderr, "read_options: realloc failed.\n");
2433 config_listen_address_list = temp;
/* the entry was zeroed above, so copying at most size-1 bytes keeps addr
 * NUL terminated; option 'l' grants PRIV_HIGH, otherwise PRIV_LOW */
2435 strncpy(new->addr, optarg, sizeof(new->addr)-1);
2436 new->privilege = (option == 'l') ? PRIV_HIGH : PRIV_LOW;
2438 temp[config_listen_address_list_len] = new;
2439 config_listen_address_list_len++;
/* flush interval */
2447 temp = atoi (optarg);
2449 config_flush_interval = temp;
2452 fprintf (stderr, "Invalid flush interval: %s\n", optarg);
/* write interval */
2462 temp = atoi (optarg);
2464 config_write_interval = temp;
2467 fprintf (stderr, "Invalid write interval: %s\n", optarg);
/* write jitter (-z) */
2477 temp = atoi(optarg);
2479 config_write_jitter = temp;
2482 fprintf (stderr, "Invalid write jitter: -z %s\n", optarg);
2490 config_write_base_only = 1;
/* base directory: keep a private copy and strip trailing slashes */
2497 if (config_base_dir != NULL)
2498 free (config_base_dir);
2499 config_base_dir = strdup (optarg);
2500 if (config_base_dir == NULL)
2502 fprintf (stderr, "read_options: strdup failed.\n");
2506 len = strlen (config_base_dir);
2507 while ((len > 0) && (config_base_dir[len - 1] == '/'))
2509 config_base_dir[len - 1] = 0;
2515 fprintf (stderr, "Invalid base directory: %s\n", optarg);
2519 _config_base_dir_len = len;
/* PID file: a later occurrence of the option replaces an earlier one */
2525 if (config_pid_file != NULL)
2526 free (config_pid_file);
2527 config_pid_file = strdup (optarg);
2528 if (config_pid_file == NULL)
2530 fprintf (stderr, "read_options: strdup failed.\n");
2537 config_flush_at_shutdown = 1;
/* journal directory (-j): must be an existing directory that is readable,
 * writable and searchable */
2542 struct stat statbuf;
2543 const char *dir = optarg;
2545 status = stat(dir, &statbuf);
2548 fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno));
2552 if (!S_ISDIR(statbuf.st_mode)
2553 || access(dir, R_OK|W_OK|X_OK) != 0)
2555 fprintf(stderr, "Must specify a writable directory with -j! (%s)\n",
2556 errno ? rrd_strerror(errno) : "");
/* both journal paths live in PATH_MAX + 1 byte buffers */
2560 journal_cur = malloc(PATH_MAX + 1);
2561 journal_old = malloc(PATH_MAX + 1);
2562 if (journal_cur == NULL || journal_old == NULL)
2564 fprintf(stderr, "malloc failure for journal files\n");
2569 snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir);
2570 snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir);
/* usage text */
2577 printf ("RRDCacheD %s Copyright (C) 2008 Florian octo Forster\n"
2579 "Usage: rrdcached [options]\n"
2581 "Valid options are:\n"
2582 " -l <address> Socket address to listen to.\n"
2583 " -L <address> Socket address to listen to ('FLUSH' only).\n"
2584 " -w <seconds> Interval in which to write data.\n"
2585 " -z <delay> Delay writes up to <delay> seconds to spread load\n"
2586 " -f <seconds> Interval in which to flush dead data.\n"
2587 " -p <file> Location of the PID-file.\n"
2588 " -b <dir> Base directory to change to.\n"
2589 " -B Restrict file access to paths within -b <dir>\n"
2590 " -g Do not fork and run in the foreground.\n"
2591 " -j <dir> Directory in which to create the journal files.\n"
2592 " -F Always flush all updates at shutdown\n"
2594 "For more information and a detailed description of all options "
2596 "to the rrdcached(1) manual page.\n",
2600 } /* switch (option) */
2601 } /* while (getopt) */
2603 /* advise the user when values are not sane */
2604 if (config_flush_interval < 2 * config_write_interval)
2605 fprintf(stderr, "WARNING: flush interval (-f) should be at least"
2606 " 2x write interval (-w) !\n");
2607 if (config_write_jitter > config_write_interval)
2608 fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than"
2609 " write interval (-w) !\n");
2611 if (config_write_base_only && config_base_dir == NULL)
2612 fprintf(stderr, "WARNING: -B does not make sense without -b!\n"
2613 " Consult the rrdcached documentation\n");
/* without a journal nothing would survive a restart, so always flush */
2615 if (journal_cur == NULL)
2616 config_flush_at_shutdown = 1;
2619 } /* }}} int read_options */
/* Program entry point: parse the options, daemonize, start the queue thread
 * and then run the listen loop in the calling thread.
 *
 * NOTE(review): the conditions guarding the branches below, the arguments of
 * pthread_create() and the tail of the function are not visible in this
 * excerpt - confirm before relying on them.
 */
2621 int main (int argc, char **argv)
2625 status = read_options (argc, argv);
2633 status = daemonize ();
/* SIGCHLD is set to be ignored in this branch */
2636 struct sigaction sigchld;
2638 memset (&sigchld, 0, sizeof (sigchld));
2639 sigchld.sa_handler = SIG_IGN;
2640 sigaction (SIGCHLD, &sigchld, NULL);
2644 else if (status != 0)
2646 fprintf (stderr, "daemonize failed, exiting.\n");
2652 /* start the queue thread */
2653 memset (&queue_thread, 0, sizeof (queue_thread));
2654 status = pthread_create (&queue_thread,
2660 RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread");
/* the listen loop runs in the calling thread */
2665 listen_thread_main (NULL);
2672 * vim: set sw=2 sts=2 ts=8 et fdm=marker :