From db9842391d7f238faf7b5ad1b01059115a4f2264 Mon Sep 17 00:00:00 2001 From: oetiker Date: Tue, 7 Oct 2008 21:10:54 +0000 Subject: [PATCH] This patch introduces two new commands for cache management: PENDING: shows any un-written updates for a file FORGET : remove a file completely from cache -- kevin brintnall git-svn-id: svn://svn.oetiker.ch/rrdtool/trunk/program@1588 a5681a0c-68f1-0310-ab6d-d61299d08faa --- doc/rrdcached.pod | 9 ++++ src/rrd_daemon.c | 154 ++++++++++++++++++++++++++++++++++++++++++++++-------- 2 files changed, 142 insertions(+), 21 deletions(-) diff --git a/doc/rrdcached.pod b/doc/rrdcached.pod index 4bd1bb8..b01165e 100644 --- a/doc/rrdcached.pod +++ b/doc/rrdcached.pod @@ -363,6 +363,15 @@ sent B the node has been dequeued. Causes the daemon to start flushing ALL pending values to disk. This returns immediately, even though the writes may take a long time. +=item B I + +Shows any "pending" updates for a file, in order. The updates shown have +not yet been written to the underlying RRD file. + +=item B I + +Removes I from the cache. Any pending updates B. + =item B [I] Returns a short usage message. If no command is given, or I is diff --git a/src/rrd_daemon.c b/src/rrd_daemon.c index 30cf748..9c8847d 100644 --- a/src/rrd_daemon.c +++ b/src/rrd_daemon.c @@ -540,6 +540,34 @@ static void remove_from_queue(cache_item_t *ci) /* {{{ */ ci->flags &= ~CI_FLAGS_IN_QUEUE; } /* }}} static void remove_from_queue */ +/* remove an entry from the tree and free all its resources. + * must hold 'cache lock' while calling this. + * returns 0 on success, otherwise errno */ +static int forget_file(const char *file) +{ + cache_item_t *ci; + + ci = g_tree_lookup(cache_tree, file); + if (ci == NULL) + return ENOENT; + + g_tree_remove (cache_tree, file); + remove_from_queue(ci); + + for (int i=0; i < ci->values_num; i++) + free(ci->values[i]); + + free (ci->values); + free (ci->file); + + /* in case anyone is waiting */ + pthread_cond_broadcast(&ci->flushed); + + free (ci); + + return 0; +} /* }}} static int forget_file */ + /* * enqueue_cache_item: * `cache_lock' must be acquired before calling this function! @@ -674,26 +702,10 @@ static int flush_old_values (int max_age) for (k = 0; k < cfd.keys_num; k++) { - cache_item_t *ci; - - /* This must not fail. */ - ci = (cache_item_t *) g_tree_lookup (cache_tree, cfd.keys[k]); - assert (ci != NULL); - - /* If we end up here with values available, something's seriously - * messed up. */ - assert (ci->values_num == 0); - - /* Remove the node from the tree */ - g_tree_remove (cache_tree, cfd.keys[k]); - cfd.keys[k] = NULL; - - /* Now free and clean up `ci'. */ - free (ci->file); - ci->file = NULL; - free (ci); - ci = NULL; - } /* for (k = 0; k < cfd.keys_num; k++) */ + /* should never fail, since we have held the cache_lock + * the entire time */ + assert( forget_file(cfd.keys[k]) == 0 ); + } if (cfd.keys != NULL) { @@ -977,6 +989,9 @@ static int flush_file (const char *filename) /* {{{ */ pthread_cond_wait(&ci->flushed, &cache_lock); } + /* DO NOT DO ANYTHING WITH ci HERE!! The entry + * may have been purged during our cond_wait() */ + pthread_mutex_unlock(&cache_lock); return (0); @@ -993,9 +1008,11 @@ static int handle_request_help (listen_socket_t *sock, /* {{{ */ { "Command overview\n" , + "HELP []\n" "FLUSH \n" "FLUSHALL\n" - "HELP []\n" + "PENDING \n" + "FORGET \n" "UPDATE [ ...]\n" "BATCH\n" "STATS\n" @@ -1020,6 +1037,26 @@ static int handle_request_help (listen_socket_t *sock, /* {{{ */ "Triggers writing of all pending updates. Returns immediately.\n" }; + char *help_pending[2] = + { + "Help for PENDING\n" + , + "Usage: PENDING \n" + "\n" + "Shows any 'pending' updates for a file, in order.\n" + "The updates shown have not yet been written to the underlying RRD file.\n" + }; + + char *help_forget[2] = + { + "Help for FORGET\n" + , + "Usage: FORGET \n" + "\n" + "Removes the file completely from the cache.\n" + "Any pending updates for the file will be lost.\n" + }; + char *help_update[2] = { "Help for UPDATE\n" @@ -1078,6 +1115,10 @@ static int handle_request_help (listen_socket_t *sock, /* {{{ */ help_text = help_flush; else if (strcasecmp (command, "flushall") == 0) help_text = help_flushall; + else if (strcasecmp (command, "pending") == 0) + help_text = help_pending; + else if (strcasecmp (command, "forget") == 0) + help_text = help_forget; else if (strcasecmp (command, "stats") == 0) help_text = help_stats; else if (strcasecmp (command, "batch") == 0) @@ -1198,6 +1239,73 @@ static int handle_request_flushall(listen_socket_t *sock) /* {{{ */ return send_response(sock, RESP_OK, "Started flush.\n"); } /* }}} static int handle_request_flushall */ +static int handle_request_pending(listen_socket_t *sock, /* {{{ */ + char *buffer, size_t buffer_size) +{ + int status; + char *file; + cache_item_t *ci; + + status = buffer_get_field(&buffer, &buffer_size, &file); + if (status != 0) + return send_response(sock, RESP_ERR, + "Usage: PENDING \n"); + + status = has_privilege(sock, PRIV_HIGH); + if (status <= 0) + return status; + + pthread_mutex_lock(&cache_lock); + ci = g_tree_lookup(cache_tree, file); + if (ci == NULL) + { + pthread_mutex_unlock(&cache_lock); + return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT)); + } + + for (int i=0; i < ci->values_num; i++) + add_response_info(sock, "%s\n", ci->values[i]); + + pthread_mutex_unlock(&cache_lock); + return send_response(sock, RESP_OK, "updates pending\n"); +} /* }}} static int handle_request_pending */ + +static int handle_request_forget(listen_socket_t *sock, /* {{{ */ + char *buffer, size_t buffer_size) +{ + int status; + char *file; + + status = buffer_get_field(&buffer, &buffer_size, &file); + if (status != 0) + return send_response(sock, RESP_ERR, + "Usage: FORGET \n"); + + status = has_privilege(sock, PRIV_HIGH); + if (status <= 0) + return status; + + if (!check_file_access(file, sock)) return 0; + + pthread_mutex_lock(&cache_lock); + status = forget_file(file); + pthread_mutex_unlock(&cache_lock); + + if (status == 0) + { + if (sock != NULL) + journal_write("forget", file); + + return send_response(sock, RESP_OK, "Gone!\n"); + } + else + return send_response(sock, RESP_ERR, "cannot forget: %s\n", + status < 0 ? "Internal error" : rrd_strerror(status)); + + /* NOTREACHED */ + assert(1==0); +} /* }}} static int handle_request_forget */ + static int handle_request_update (listen_socket_t *sock, /* {{{ */ char *buffer, size_t buffer_size) { @@ -1430,6 +1538,10 @@ static int handle_request (listen_socket_t *sock, /* {{{ */ return (handle_request_flush (sock, buffer_ptr, buffer_size)); else if (strcasecmp (command, "flushall") == 0) return (handle_request_flushall(sock)); + else if (strcasecmp (command, "pending") == 0) + return (handle_request_pending(sock, buffer_ptr, buffer_size)); + else if (strcasecmp (command, "forget") == 0) + return (handle_request_forget(sock, buffer_ptr, buffer_size)); else if (strcasecmp (command, "stats") == 0) return (handle_request_stats (sock)); else if (strcasecmp (command, "help") == 0) -- 2.11.0