X-Git-Url: https://git.octo.it/?a=blobdiff_plain;f=src%2Frrd_daemon.c;h=bc44b810ee01334f97d21c68e4f1d727be2d6d53;hb=a42f6740636ce163011504932c5aab7cdfc95c86;hp=4b725c2189f9fb325319ee28e8fe7f06d9690a21;hpb=31f353175df83d273bd33340faa415269cd9341d;p=rrdtool.git diff --git a/src/rrd_daemon.c b/src/rrd_daemon.c index 4b725c2..bc44b81 100644 --- a/src/rrd_daemon.c +++ b/src/rrd_daemon.c @@ -66,6 +66,9 @@ #include #include #include +#include +#include +#include #include #include @@ -163,8 +166,8 @@ static char **config_listen_address_list = NULL; static int config_listen_address_list_len = 0; static uint64_t stats_queue_length = 0; -static uint64_t stats_updates_total = 0; -static uint64_t stats_values_total = 0; +static uint64_t stats_updates_written = 0; +static uint64_t stats_data_sets_written = 0; static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER; /* @@ -220,6 +223,84 @@ static int remove_pidfile (void) /* {{{ */ return (errno); } /* }}} int remove_pidfile */ +static ssize_t sread (int fd, void *buffer_void, size_t buffer_size) /* {{{ */ +{ + char *buffer; + size_t buffer_used; + size_t buffer_free; + ssize_t status; + + buffer = (char *) buffer_void; + buffer_used = 0; + buffer_free = buffer_size; + + while (buffer_free > 0) + { + status = read (fd, buffer + buffer_used, buffer_free); + if ((status < 0) && ((errno == EAGAIN) || (errno == EINTR))) + continue; + + if (status < 0) + return (-1); + + if (status == 0) + return (0); + + assert ((0 > status) || (buffer_free >= (size_t) status)); + + buffer_free = buffer_free - status; + buffer_used = buffer_used + status; + + if (buffer[buffer_used - 1] == '\n') + break; + } + + assert (buffer_used > 0); + + if (buffer[buffer_used - 1] != '\n') + { + errno = ENOBUFS; + return (-1); + } + + buffer[buffer_used - 1] = 0; + + /* Fix network line endings. */ + if ((buffer_used > 1) && (buffer[buffer_used - 2] == '\r')) + { + buffer_used--; + buffer[buffer_used - 1] = 0; + } + + return (buffer_used); +} /* }}} ssize_t sread */ + +static ssize_t swrite (int fd, const void *buf, size_t count) /* {{{ */ +{ + const char *ptr; + size_t nleft; + ssize_t status; + + ptr = (const char *) buf; + nleft = count; + + while (nleft > 0) + { + status = write (fd, (const void *) ptr, nleft); + + if ((status < 0) && ((errno == EAGAIN) || (errno == EINTR))) + continue; + + if (status < 0) + return (status); + + nleft = nleft - status; + ptr = ptr + status; + } + + return (0); +} /* }}} ssize_t swrite */ + /* * enqueue_cache_item: * `cache_lock' must be acquired before calling this function! @@ -229,9 +310,6 @@ static int enqueue_cache_item (cache_item_t *ci, /* {{{ */ { int did_insert = 0; - RRDD_LOG (LOG_DEBUG, "enqueue_cache_item: Adding %s to the update queue.", - ci->file); - if (ci == NULL) return (-1); @@ -468,9 +546,6 @@ static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */ pthread_mutex_unlock (&cache_lock); - RRDD_LOG (LOG_DEBUG, "queue_thread_main: rrd_update (%s, %i, %p)", - file, values_num, (void *) values); - status = rrd_update_r (file, NULL, values_num, (void *) values); if (status != 0) { @@ -484,8 +559,8 @@ static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */ free (values[i]); pthread_mutex_lock (&stats_lock); - stats_updates_total++; - stats_values_total += values_num; + stats_updates_written++; + stats_data_sets_written += values_num; pthread_mutex_unlock (&stats_lock); pthread_mutex_lock (&cache_lock); @@ -493,8 +568,6 @@ static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */ } /* while (do_shutdown == 0) */ pthread_mutex_unlock (&cache_lock); - RRDD_LOG (LOG_DEBUG, "queue_thread_main: Exiting."); - return (NULL); } /* }}} void *queue_thread_main */ @@ -514,6 +587,9 @@ static int buffer_get_field (char **buffer_ret, /* {{{ */ field = *buffer_ret; field_size = 0; + if (buffer_size <= 0) + return (-1); + /* This is ensured by `handle_request'. */ assert (buffer[buffer_size - 1] == ' '); @@ -595,6 +671,166 @@ static int flush_file (const char *filename) /* {{{ */ return (0); } /* }}} int flush_file */ +static int handle_request_help (int fd, /* {{{ */ + char *buffer, size_t buffer_size) +{ + int status; + char **help_text; + size_t help_text_len; + char *command; + size_t i; + + char *help_help[] = + { + "4 Command overview\n", + "FLUSH \n", + "HELP []\n", + "UPDATE [ ...]\n", + "STATS\n" + }; + size_t help_help_len = sizeof (help_help) / sizeof (help_help[0]); + + char *help_flush[] = + { + "4 Help for FLUSH\n", + "Usage: FLUSH \n", + "\n", + "Adds the given filename to the head of the update queue and returns\n", + "after is has been dequeued.\n" + }; + size_t help_flush_len = sizeof (help_flush) / sizeof (help_flush[0]); + + char *help_update[] = + { + "9 Help for UPDATE\n", + "Usage: UPDATE [ ...]\n" + "\n", + "Adds the given file to the internal cache if it is not yet known and\n", + "appends the given value(s) to the entry. See the rrdcached(1) manpage\n", + "for details.\n", + "\n", + "Each has the following form:\n", + " =