X-Git-Url: https://git.octo.it/?a=blobdiff_plain;f=src%2Frrd_daemon.c;h=7a802203d140c22681ededf50de9ced3eec128d6;hb=0f56f88a65b4e0eba572afcc55be8ca4f9902812;hp=3163e1865629abe2879f0bcaeb5dd583f202c66f;hpb=28e399e75eb5a26a5ab1b65b7b18b4cfd69e263d;p=rrdtool.git diff --git a/src/rrd_daemon.c b/src/rrd_daemon.c index 3163e18..7a80220 100644 --- a/src/rrd_daemon.c +++ b/src/rrd_daemon.c @@ -66,6 +66,9 @@ #include #include #include +#include +#include +#include #include #include @@ -162,6 +165,11 @@ static char *config_base_dir = NULL; static char **config_listen_address_list = NULL; static int config_listen_address_list_len = 0; +static uint64_t stats_queue_length = 0; +static uint64_t stats_updates_written = 0; +static uint64_t stats_data_sets_written = 0; +static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER; + /* * Functions */ @@ -215,6 +223,84 @@ static int remove_pidfile (void) /* {{{ */ return (errno); } /* }}} int remove_pidfile */ +static ssize_t sread (int fd, void *buffer_void, size_t buffer_size) /* {{{ */ +{ + char *buffer; + size_t buffer_used; + size_t buffer_free; + ssize_t status; + + buffer = (char *) buffer_void; + buffer_used = 0; + buffer_free = buffer_size; + + while (buffer_free > 0) + { + status = read (fd, buffer + buffer_used, buffer_free); + if ((status < 0) && ((errno == EAGAIN) || (errno == EINTR))) + continue; + + if (status < 0) + return (-1); + + if (status == 0) + return (0); + + assert ((0 > status) || (buffer_free >= (size_t) status)); + + buffer_free = buffer_free - status; + buffer_used = buffer_used + status; + + if (buffer[buffer_used - 1] == '\n') + break; + } + + assert (buffer_used > 0); + + if (buffer[buffer_used - 1] != '\n') + { + errno = ENOBUFS; + return (-1); + } + + buffer[buffer_used - 1] = 0; + + /* Fix network line endings. */ + if ((buffer_used > 1) && (buffer[buffer_used - 2] == '\r')) + { + buffer_used--; + buffer[buffer_used - 1] = 0; + } + + return (buffer_used); +} /* }}} ssize_t sread */ + +static ssize_t swrite (int fd, const void *buf, size_t count) /* {{{ */ +{ + const char *ptr; + size_t nleft; + ssize_t status; + + ptr = (const char *) buf; + nleft = count; + + while (nleft > 0) + { + status = write (fd, (const void *) ptr, nleft); + + if ((status < 0) && ((errno == EAGAIN) || (errno == EINTR))) + continue; + + if (status < 0) + return (status); + + nleft = nleft - status; + ptr = ptr + status; + } + + return (0); +} /* }}} ssize_t swrite */ + /* * enqueue_cache_item: * `cache_lock' must be acquired before calling this function! @@ -222,6 +308,8 @@ static int remove_pidfile (void) /* {{{ */ static int enqueue_cache_item (cache_item_t *ci, /* {{{ */ queue_side_t side) { + int did_insert = 0; + RRDD_LOG (LOG_DEBUG, "enqueue_cache_item: Adding %s to the update queue.", ci->file); @@ -241,6 +329,8 @@ static int enqueue_cache_item (cache_item_t *ci, /* {{{ */ if (cache_queue_tail == NULL) cache_queue_tail = cache_queue_head; + + did_insert = 1; } else if (cache_queue_head == ci) { @@ -279,10 +369,19 @@ static int enqueue_cache_item (cache_item_t *ci, /* {{{ */ else cache_queue_tail->next = ci; cache_queue_tail = ci; + + did_insert = 1; } ci->flags |= CI_FLAGS_IN_QUEUE; + if (did_insert) + { + pthread_mutex_lock (&stats_lock); + stats_queue_length++; + pthread_mutex_unlock (&stats_lock); + } + return (0); } /* }}} int enqueue_cache_item */ @@ -443,6 +542,11 @@ static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */ cache_queue_tail = NULL; ci->next = NULL; + pthread_mutex_lock (&stats_lock); + assert (stats_queue_length > 0); + stats_queue_length--; + pthread_mutex_unlock (&stats_lock); + pthread_mutex_unlock (&cache_lock); RRDD_LOG (LOG_DEBUG, "queue_thread_main: rrd_update (%s, %i, %p)", @@ -460,6 +564,11 @@ static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */ for (i = 0; i < values_num; i++) free (values[i]); + pthread_mutex_lock (&stats_lock); + stats_updates_written++; + stats_data_sets_written += values_num; + pthread_mutex_unlock (&stats_lock); + pthread_mutex_lock (&cache_lock); pthread_cond_broadcast (&flush_cond); } /* while (do_shutdown == 0) */ @@ -486,6 +595,9 @@ static int buffer_get_field (char **buffer_ret, /* {{{ */ field = *buffer_ret; field_size = 0; + if (buffer_size <= 0) + return (-1); + /* This is ensured by `handle_request'. */ assert (buffer[buffer_size - 1] == ' '); @@ -567,6 +679,166 @@ static int flush_file (const char *filename) /* {{{ */ return (0); } /* }}} int flush_file */ +static int handle_request_help (int fd, /* {{{ */ + char *buffer, size_t buffer_size) +{ + int status; + char **help_text; + size_t help_text_len; + char *command; + size_t i; + + char *help_help[] = + { + "4 Command overview\n", + "FLUSH \n", + "HELP []\n", + "UPDATE [ ...]\n", + "STATS\n" + }; + size_t help_help_len = sizeof (help_help) / sizeof (help_help[0]); + + char *help_flush[] = + { + "4 Help for FLUSH\n", + "Usage: FLUSH \n", + "\n", + "Adds the given filename to the head of the update queue and returns\n", + "after is has been dequeued.\n" + }; + size_t help_flush_len = sizeof (help_flush) / sizeof (help_flush[0]); + + char *help_update[] = + { + "9 Help for UPDATE\n", + "Usage: UPDATE [ ...]\n" + "\n", + "Adds the given file to the internal cache if it is not yet known and\n", + "appends the given value(s) to the entry. See the rrdcached(1) manpage\n", + "for details.\n", + "\n", + "Each has the following form:\n", + " =