X-Git-Url: https://git.octo.it/?p=rrdtool.git;a=blobdiff_plain;f=src%2Frrd_daemon.c;h=128f3c444afe0bdde1d75331532f73ea388df0c2;hp=6445e55d6bad10ad207ea4d64ff73e4a413d4cee;hb=b4d4059df709e7faded933b9e619eabf9cbe1622;hpb=aff0a2728543eee1ac21f3fa02f171caae8d9362 diff --git a/src/rrd_daemon.c b/src/rrd_daemon.c index 6445e55..128f3c4 100644 --- a/src/rrd_daemon.c +++ b/src/rrd_daemon.c @@ -1,7 +1,7 @@ /** * RRDTool - src/rrd_daemon.c - * Copyright (C) 2008 Florian octo Forster - * Copyright (C) 2008 Kevin Brintnall + * Copyright (C) 2008,2009 Florian octo Forster + * Copyright (C) 2008,2009 Kevin Brintnall * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License as published by the @@ -77,11 +77,13 @@ #include #ifndef WIN32 -#include +#ifdef HAVE_STDINT_H +# include +#endif #include #include #include -# include +#include #else @@ -91,6 +93,7 @@ #include #include +#include #include #include #include @@ -102,11 +105,18 @@ #include #include #include +#include +#include #include /* }}} */ -#define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__) +#define RRDD_LOG(severity, ...) \ + do { \ + if (stay_foreground) \ + fprintf(stderr, __VA_ARGS__); \ + syslog ((severity), __VA_ARGS__); \ + } while (0) #ifndef __GNUC__ # define __attribute__(x) /**/ @@ -115,12 +125,6 @@ /* * Types */ -typedef enum -{ - PRIV_LOW, - PRIV_HIGH -} socket_privilege; - typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code; struct listen_socket_s @@ -128,7 +132,6 @@ struct listen_socket_s int fd; char addr[PATH_MAX + 1]; int family; - socket_privilege privilege; /* state for BATCH processing */ time_t batch_start; @@ -141,16 +144,46 @@ struct listen_socket_s char *wbuf; ssize_t wbuf_len; + + uint32_t permissions; + + gid_t socket_group; + mode_t socket_permissions; }; typedef struct listen_socket_s listen_socket_t; +struct command_s; +typedef struct command_s command_t; +/* note: guard against "unused" warnings in the handlers */ +#define DISPATCH_PROTO listen_socket_t *sock __attribute__((unused)),\ + time_t now __attribute__((unused)),\ + char *buffer __attribute__((unused)),\ + size_t buffer_size __attribute__((unused)) + +#define HANDLER_PROTO command_t *cmd __attribute__((unused)),\ + DISPATCH_PROTO + +struct command_s { + char *cmd; + int (*handler)(HANDLER_PROTO); + + char context; /* where we expect to see it */ +#define CMD_CONTEXT_CLIENT (1<<0) +#define CMD_CONTEXT_BATCH (1<<1) +#define CMD_CONTEXT_JOURNAL (1<<2) +#define CMD_CONTEXT_ANY (0x7f) + + char *syntax; + char *help; +}; + struct cache_item_s; typedef struct cache_item_s cache_item_t; struct cache_item_s { char *file; char **values; - int values_num; + size_t values_num; time_t last_flush_time; time_t last_update_stamp; #define CI_FLAGS_IN_TREE (1<<0) @@ -177,6 +210,12 @@ enum queue_side_e }; typedef enum queue_side_e queue_side_t; +/* describe a set of journal files */ +typedef struct { + char **files; + size_t files_num; +} journal_set; + /* max length of socket command or response */ #define CMD_MAX 4096 #define RBUF_SIZE (CMD_MAX*2) @@ -190,7 +229,11 @@ static uid_t daemon_uid; static listen_socket_t *listen_fds = NULL; static size_t listen_fds_num = 0; -static int do_shutdown = 0; +enum { + RUNNING, /* normal operation */ + FLUSHING, /* flushing remaining values */ + SHUTDOWN /* shutting down */ +} state = RUNNING; static pthread_t *queue_threads; static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER; @@ -199,8 +242,8 @@ static int config_queue_threads = 4; static pthread_t flush_thread; static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER; -static pthread_t *connection_threads = NULL; static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER; +static pthread_cond_t connection_threads_done = PTHREAD_COND_INITIALIZER; static int connection_threads_num = 0; /* Cache stuff */ @@ -219,7 +262,7 @@ static size_t _config_base_dir_len = 0; static int config_write_base_only = 0; static listen_socket_t **config_listen_address_list = NULL; -static int config_listen_address_list_len = 0; +static size_t config_listen_address_list_len = 0; static uint64_t stats_queue_length = 0; static uint64_t stats_updates_received = 0; @@ -231,21 +274,29 @@ static uint64_t stats_journal_rotate = 0; static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER; /* Journaled updates */ -static char *journal_cur = NULL; -static char *journal_old = NULL; -static FILE *journal_fh = NULL; +#define JOURNAL_REPLAY(s) ((s) == NULL) +#define JOURNAL_BASE "rrd.journal" +static journal_set *journal_cur = NULL; +static journal_set *journal_old = NULL; +static char *journal_dir = NULL; +static FILE *journal_fh = NULL; /* current journal file handle */ +static long journal_size = 0; /* current journal size */ +#define JOURNAL_MAX (1 * 1024 * 1024 * 1024) static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER; static int journal_write(char *cmd, char *args); static void journal_done(void); static void journal_rotate(void); +/* prototypes for forward refernces */ +static int handle_request_help (HANDLER_PROTO); + /* * Functions */ static void sig_common (const char *sig) /* {{{ */ { RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig); - do_shutdown++; + state = FLUSHING; pthread_cond_broadcast(&flush_cond); pthread_cond_broadcast(&queue_cond); } /* }}} void sig_common */ @@ -308,12 +359,32 @@ static void install_signal_handlers(void) /* {{{ */ static int open_pidfile(char *action, int oflag) /* {{{ */ { int fd; - char *file; + const char *file; + char *file_copy, *dir; file = (config_pid_file != NULL) ? config_pid_file : LOCALSTATEDIR "/run/rrdcached.pid"; + /* dirname may modify its argument */ + file_copy = strdup(file); + if (file_copy == NULL) + { + fprintf(stderr, "rrdcached: strdup(): %s\n", + rrd_strerror(errno)); + return -1; + } + + dir = dirname(file_copy); + if (rrd_mkdir_p(dir, 0777) != 0) + { + fprintf(stderr, "Failed to create pidfile directory '%s': %s\n", + dir, rrd_strerror(errno)); + return -1; + } + + free(file_copy); + fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH); if (fd < 0) fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n", @@ -351,7 +422,13 @@ static int check_pidfile(void) } lseek(pid_fd, 0, SEEK_SET); - ftruncate(pid_fd, 0); + if (ftruncate(pid_fd, 0) == -1) + { + fprintf(stderr, + "FATAL: Faild to truncate stale PID file. (pid %d)\n", pid); + close(pid_fd); + return -1; + } fprintf(stderr, "rrdcached: removed stale PID file (no rrdcached on pid %d)\n" @@ -430,7 +507,7 @@ static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */ /* NOTREACHED */ assert(1==0); -} +} /* }}} char *next_cmd */ /* add the characters directly to the write buffer */ static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */ @@ -461,12 +538,12 @@ static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */ char buffer[CMD_MAX]; int len; - if (sock == NULL) return 0; /* journal replay mode */ + if (JOURNAL_REPLAY(sock)) return 0; if (sock->batch_start) return 0; /* no extra info returned when in BATCH */ va_start(argp, fmt); #ifdef HAVE_VSNPRINTF - len = vsnprintf(buffer, sizeof(buffer)-1, fmt, argp); + len = vsnprintf(buffer, sizeof(buffer), fmt, argp); #else len = vsprintf(buffer, fmt, argp); #endif @@ -508,7 +585,7 @@ static int send_response (listen_socket_t *sock, response_code rc, ssize_t wrote; int rclen, len; - if (sock == NULL) return rc; /* journal replay mode */ + if (JOURNAL_REPLAY(sock)) return rc; if (sock->batch_start) { @@ -524,7 +601,7 @@ static int send_response (listen_socket_t *sock, response_code rc, rclen = sprintf(buffer, "%d ", lines); va_start(argp, fmt); #ifdef HAVE_VSNPRINTF - len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen-1, fmt, argp); + len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp); #else len = vsprintf(buffer+rclen, fmt, argp); #endif @@ -573,7 +650,7 @@ static void wipe_ci_values(cache_item_t *ci, time_t when) ci->last_flush_time = when; if (config_write_jitter > 0) - ci->last_flush_time += (random() % config_write_jitter); + ci->last_flush_time += (rrd_random() % config_write_jitter); } /* remove_from_queue @@ -614,7 +691,7 @@ static void *free_cache_item(cache_item_t *ci) /* {{{ */ remove_from_queue(ci); - for (int i=0; i < ci->values_num; i++) + for (size_t i=0; i < ci->values_num; i++) free(ci->values[i]); free (ci->values); @@ -622,6 +699,7 @@ static void *free_cache_item(cache_item_t *ci) /* {{{ */ /* in case anyone is waiting */ pthread_cond_broadcast(&ci->flushed); + pthread_cond_destroy(&ci->flushed); free (ci); @@ -704,33 +782,20 @@ static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */ if (ci->flags & CI_FLAGS_IN_QUEUE) return FALSE; - if ((ci->last_flush_time <= cfd->abs_timeout) - && (ci->values_num > 0)) - { - enqueue_cache_item (ci, TAIL); - } - else if ((do_shutdown != 0) - && (ci->values_num > 0)) + if (ci->values_num > 0 + && (ci->last_flush_time <= cfd->abs_timeout || state != RUNNING)) { enqueue_cache_item (ci, TAIL); } else if (((cfd->now - ci->last_flush_time) >= config_flush_interval) && (ci->values_num <= 0)) { - char **temp; - - temp = (char **) rrd_realloc (cfd->keys, - sizeof (char *) * (cfd->keys_num + 1)); - if (temp == NULL) + assert ((char *) key == ci->file); + if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key)) { - RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed."); + RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed."); return (FALSE); } - cfd->keys = temp; - /* Make really sure this points to the _same_ place */ - assert ((char *) key == ci->file); - cfd->keys[cfd->keys_num] = (char *) key; - cfd->keys_num++; } return (FALSE); @@ -761,9 +826,10 @@ static int flush_old_values (int max_age) for (k = 0; k < cfd.keys_num; k++) { + gboolean status = g_tree_remove(cache_tree, cfd.keys[k]); /* should never fail, since we have held the cache_lock * the entire time */ - assert( g_tree_remove(cache_tree, cfd.keys[k]) == TRUE ); + assert(status == TRUE); } if (cfd.keys != NULL) @@ -787,21 +853,22 @@ static void *flush_thread_main (void *args __attribute__((unused))) /* {{{ */ pthread_mutex_lock(&cache_lock); - while (!do_shutdown) + while (state == RUNNING) { gettimeofday (&now, NULL); if ((now.tv_sec > next_flush.tv_sec) || ((now.tv_sec == next_flush.tv_sec) && ((1000 * now.tv_usec) > next_flush.tv_nsec))) { + RRDD_LOG(LOG_DEBUG, "flushing old values"); + + /* Determine the time of the next cache flush. */ + next_flush.tv_sec = now.tv_sec + config_flush_interval; + /* Flush all values that haven't been written in the last * `config_write_interval' seconds. */ flush_old_values (config_write_interval); - /* Determine the time of the next cache flush. */ - next_flush.tv_sec = - now.tv_sec + next_flush.tv_sec % config_flush_interval; - /* unlock the cache while we rotate so we don't block incoming * updates if the fsync() blocks on disk I/O */ pthread_mutex_unlock(&cache_lock); @@ -820,6 +887,8 @@ static void *flush_thread_main (void *args __attribute__((unused))) /* {{{ */ if (config_flush_at_shutdown) flush_old_values (-1); /* flush everything */ + state = SHUTDOWN; + pthread_mutex_unlock(&cache_lock); return NULL; @@ -829,19 +898,18 @@ static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */ { pthread_mutex_lock (&cache_lock); - while (!do_shutdown + while (state != SHUTDOWN || (cache_queue_head != NULL && config_flush_at_shutdown)) { cache_item_t *ci; char *file; char **values; - int values_num; + size_t values_num; int status; - int i; /* Now, check if there's something to store away. If not, wait until - * something comes in. if we are shutting down, do not wait around. */ - if (cache_queue_head == NULL && !do_shutdown) + * something comes in. */ + if (cache_queue_head == NULL) { status = pthread_cond_wait (&queue_cond, &cache_lock); if ((status != 0) && (status != ETIMEDOUT)) @@ -878,7 +946,7 @@ static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */ pthread_mutex_unlock (&cache_lock); rrd_clear_error (); - status = rrd_update_r (file, NULL, values_num, (void *) values); + status = rrd_update_r (file, NULL, (int) values_num, (void *) values); if (status != 0) { RRDD_LOG (LOG_NOTICE, "queue_thread_main: " @@ -887,13 +955,14 @@ static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */ } journal_write("wrote", file); - pthread_cond_broadcast(&ci->flushed); - - for (i = 0; i < values_num; i++) - free (values[i]); - free(values); - free(file); + /* Search again in the tree. It's possible someone issued a "FORGET" + * while we were writing the update values. */ + pthread_mutex_lock(&cache_lock); + ci = (cache_item_t *) g_tree_lookup(cache_tree, file); + if (ci) + pthread_cond_broadcast(&ci->flushed); + pthread_mutex_unlock(&cache_lock); if (status == 0) { @@ -903,6 +972,9 @@ static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */ pthread_mutex_unlock (&stats_lock); } + rrd_free_ptrs((void ***) &values, &values_num); + free(file); + pthread_mutex_lock (&cache_lock); } pthread_mutex_unlock (&cache_lock); @@ -982,7 +1054,7 @@ static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */ assert(file != NULL); if (!config_write_base_only - || sock == NULL /* journal replay */ + || JOURNAL_REPLAY(sock) || config_base_dir == NULL) return 1; @@ -1025,20 +1097,6 @@ static void get_abs_path(char **filename, char *tmp) *filename = tmp; } /* }}} static int get_abs_path */ -/* returns 1 if we have the required privilege level, - * otherwise issue an error to the user on sock */ -static int has_privilege (listen_socket_t *sock, /* {{{ */ - socket_privilege priv) -{ - if (sock == NULL) /* journal replay */ - return 1; - - if (sock->privilege >= priv) - return 1; - - return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES)); -} /* }}} static int has_privilege */ - static int flush_file (const char *filename) /* {{{ */ { cache_item_t *ci; @@ -1067,166 +1125,17 @@ static int flush_file (const char *filename) /* {{{ */ return (0); } /* }}} int flush_file */ -static int handle_request_help (listen_socket_t *sock, /* {{{ */ - char *buffer, size_t buffer_size) +static int syntax_error(listen_socket_t *sock, command_t *cmd) /* {{{ */ { - int status; - char **help_text; - char *command; + char *err = "Syntax error.\n"; - char *help_help[2] = - { - "Command overview\n" - , - "HELP []\n" - "FLUSH \n" - "FLUSHALL\n" - "PENDING \n" - "FORGET \n" - "QUEUE\n" - "UPDATE [ ...]\n" - "BATCH\n" - "STATS\n" - "QUIT\n" - }; - - char *help_flush[2] = - { - "Help for FLUSH\n" - , - "Usage: FLUSH \n" - "\n" - "Adds the given filename to the head of the update queue and returns\n" - "after it has been dequeued.\n" - }; - - char *help_flushall[2] = - { - "Help for FLUSHALL\n" - , - "Usage: FLUSHALL\n" - "\n" - "Triggers writing of all pending updates. Returns immediately.\n" - }; - - char *help_pending[2] = - { - "Help for PENDING\n" - , - "Usage: PENDING \n" - "\n" - "Shows any 'pending' updates for a file, in order.\n" - "The updates shown have not yet been written to the underlying RRD file.\n" - }; - - char *help_forget[2] = - { - "Help for FORGET\n" - , - "Usage: FORGET \n" - "\n" - "Removes the file completely from the cache.\n" - "Any pending updates for the file will be lost.\n" - }; - - char *help_queue[2] = - { - "Help for QUEUE\n" - , - "Shows all files in the output queue.\n" - "The output is zero or more lines in the following format:\n" - "(where is the number of values to be written)\n" - "\n" - " \n" - "\n" - }; + if (cmd && cmd->syntax) + err = cmd->syntax; - char *help_update[2] = - { - "Help for UPDATE\n" - , - "Usage: UPDATE [ ...]\n" - "\n" - "Adds the given file to the internal cache if it is not yet known and\n" - "appends the given value(s) to the entry. See the rrdcached(1) manpage\n" - "for details.\n" - "\n" - "Each has the following form:\n" - " =