From: oetiker Date: Mon, 6 Oct 2008 19:04:48 +0000 (+0000) Subject: This patch introduces buffered I/O to rrdcached. Now, rrdcached can X-Git-Url: https://git.octo.it/?p=rrdtool.git;a=commitdiff_plain;h=1164122dace49986850172ef2cb198dc301fe750 This patch introduces buffered I/O to rrdcached. Now, rrdcached can interpret as many commands as arrive in a single read(), and it will use fewer write()s when there are multiple output lines. All routines now pass around listen_socket_t objects instead of file descriptors. All I/O is now contained in two routines. It's no longer necessary to specify the line count in multi-line outputs, since that is calculated automatically. This is the foundation for accepting batched commands. -- kevin brintnall git-svn-id: svn://svn.oetiker.ch/rrdtool/trunk/program@1580 a5681a0c-68f1-0310-ab6d-d61299d08faa --- diff --git a/src/rrd_daemon.c b/src/rrd_daemon.c index 8c4f042..03dd181 100644 --- a/src/rrd_daemon.c +++ b/src/rrd_daemon.c @@ -107,12 +107,22 @@ typedef enum PRIV_HIGH } socket_privilege; +typedef enum { RESP_ERR = -1, RESP_OK = 0 } response_code; + struct listen_socket_s { int fd; char addr[PATH_MAX + 1]; int family; socket_privilege privilege; + + /* buffered IO */ + char *rbuf; + off_t next_cmd; + off_t next_read; + + char *wbuf; + ssize_t wbuf_len; }; typedef struct listen_socket_s listen_socket_t; @@ -150,6 +160,7 @@ typedef enum queue_side_e queue_side_t; /* max length of socket command or response */ #define CMD_MAX 4096 +#define RBUF_SIZE (CMD_MAX*2) /* * Variables @@ -322,86 +333,166 @@ static int remove_pidfile (void) /* {{{ */ return (errno); } /* }}} int remove_pidfile */ -static ssize_t sread (int fd, void *buffer_void, size_t buffer_size) /* {{{ */ +static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */ { - char *buffer; - size_t buffer_used; - size_t buffer_free; - ssize_t status; + char *eol; - buffer = (char *) buffer_void; - buffer_used = 0; - buffer_free = buffer_size; + eol = memchr(sock->rbuf + sock->next_cmd, '\n', + sock->next_read - sock->next_cmd); - while (buffer_free > 0) + if (eol == NULL) { - status = read (fd, buffer + buffer_used, buffer_free); - if ((status < 0) && ((errno == EAGAIN) || (errno == EINTR))) - continue; - - if (status < 0) - return (-1); + /* no commands left, move remainder back to front of rbuf */ + memmove(sock->rbuf, sock->rbuf + sock->next_cmd, + sock->next_read - sock->next_cmd); + sock->next_read -= sock->next_cmd; + sock->next_cmd = 0; + *len = 0; + return NULL; + } + else + { + char *cmd = sock->rbuf + sock->next_cmd; + *eol = '\0'; - if (status == 0) - return (0); + sock->next_cmd = eol - sock->rbuf + 1; - assert ((0 > status) || (buffer_free >= (size_t) status)); + if (eol > sock->rbuf && *(eol-1) == '\r') + *(--eol) = '\0'; /* handle "\r\n" EOL */ - buffer_free = buffer_free - status; - buffer_used = buffer_used + status; + *len = eol - cmd; - if (buffer[buffer_used - 1] == '\n') - break; + return cmd; } - assert (buffer_used > 0); + /* NOTREACHED */ + assert(1==0); +} + +/* add the characters directly to the write buffer */ +static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */ +{ + char *new_buf; + + assert(sock != NULL); - if (buffer[buffer_used - 1] != '\n') + new_buf = realloc(sock->wbuf, sock->wbuf_len + len + 1); + if (new_buf == NULL) { - errno = ENOBUFS; - return (-1); + RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed"); + return -1; } - buffer[buffer_used - 1] = 0; + strncpy(new_buf + sock->wbuf_len, str, len + 1); + + sock->wbuf = new_buf; + sock->wbuf_len += len; - /* Fix network line endings. */ - if ((buffer_used > 1) && (buffer[buffer_used - 2] == '\r')) + return 0; +} /* }}} static int add_to_wbuf */ + +/* add the text to the "extra" info that's sent after the status line */ +static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */ +{ + va_list argp; + char buffer[CMD_MAX]; + int len; + + if (sock == NULL) return 0; /* journal replay mode */ + + va_start(argp, fmt); +#ifdef HAVE_VSNPRINTF + len = vsnprintf(buffer, sizeof(buffer)-1, fmt, argp); +#else + len = vsprintf(buffer, fmt, argp); +#endif + va_end(argp); + if (len < 0) { - buffer_used--; - buffer[buffer_used - 1] = 0; + RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed"); + return -1; } - return (buffer_used); -} /* }}} ssize_t sread */ + return add_to_wbuf(sock, buffer, len); +} /* }}} static int add_response_info */ -static ssize_t swrite (int fd, const void *buf, size_t count) /* {{{ */ +static int count_lines(char *str) /* {{{ */ { - const char *ptr; - size_t nleft; - ssize_t status; + int lines = 0; - /* special case for journal replay */ - if (fd < 0) return 0; + if (str != NULL) + { + while ((str = strchr(str, '\n')) != NULL) + { + ++lines; + ++str; + } + } - ptr = (const char *) buf; - nleft = count; + return lines; +} /* }}} static int count_lines */ - while (nleft > 0) +/* send the response back to the user. + * returns 0 on success, -1 on error + * write buffer is always zeroed after this call */ +static int send_response (listen_socket_t *sock, response_code rc, + char *fmt, ...) /* {{{ */ +{ + va_list argp; + char buffer[CMD_MAX]; + int lines; + ssize_t wrote; + int rclen, len; + + if (sock == NULL) return rc; /* journal replay mode */ + + if (rc == RESP_OK) { - status = write (fd, (const void *) ptr, nleft); + lines = count_lines(sock->wbuf); + } + else + lines = -1; + + rclen = sprintf(buffer, "%d ", lines); + va_start(argp, fmt); +#ifdef HAVE_VSNPRINTF + len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen-1, fmt, argp); +#else + len = vsprintf(buffer+rclen, fmt, argp); +#endif + va_end(argp); + if (len < 0) + return -1; - if ((status < 0) && ((errno == EAGAIN) || (errno == EINTR))) - continue; + len += rclen; - if (status < 0) - return (status); + /* first write must be complete */ + if (len != write(sock->fd, buffer, len)) + { + RRDD_LOG(LOG_INFO, "send_response: could not write status message"); + return -1; + } - nleft -= status; - ptr += status; + if (sock->wbuf != NULL) + { + wrote = 0; + while (wrote < sock->wbuf_len) + { + ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote); + if (wb <= 0) + { + RRDD_LOG(LOG_INFO, "send_response: could not write results"); + return -1; + } + wrote += wb; + } } - return (0); -} /* }}} ssize_t swrite */ + free(sock->wbuf); sock->wbuf = NULL; + sock->wbuf_len = 0; + + return 0; +} /* }}} */ static void wipe_ci_values(cache_item_t *ci, time_t when) { @@ -810,13 +901,12 @@ static int buffer_get_field (char **buffer_ret, /* {{{ */ * check whether the file falls within the dir * returns 1 if OK, otherwise 0 */ -static int check_file_access (const char *file, int fd) /* {{{ */ +static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */ { - char error[CMD_MAX]; assert(file != NULL); if (!config_write_base_only - || fd < 0 /* journal replay */ + || sock == NULL /* journal replay */ || config_base_dir == NULL) return 1; @@ -833,8 +923,9 @@ static int check_file_access (const char *file, int fd) /* {{{ */ return 1; err: - snprintf(error, sizeof(error)-1, "-1 %s\n", rrd_strerror(EACCES)); - swrite(fd, error, strlen(error)); + if (sock != NULL && sock->fd >= 0) + send_response(sock, RESP_ERR, "%s\n", rrd_strerror(EACCES)); + return 0; } /* }}} static int check_file_access */ @@ -863,126 +954,91 @@ static int flush_file (const char *filename) /* {{{ */ return (0); } /* }}} int flush_file */ -static int handle_request_help (int fd, /* {{{ */ +static int handle_request_help (listen_socket_t *sock, /* {{{ */ char *buffer, size_t buffer_size) { int status; char **help_text; - size_t help_text_len; char *command; - size_t i; - char *help_help[] = + char *help_help[2] = { - "5 Command overview\n", - "FLUSH \n", - "FLUSHALL\n", - "HELP []\n", - "UPDATE [ ...]\n", + "Command overview\n" + , + "FLUSH \n" + "FLUSHALL\n" + "HELP []\n" + "UPDATE [ ...]\n" "STATS\n" }; - size_t help_help_len = sizeof (help_help) / sizeof (help_help[0]); - char *help_flush[] = + char *help_flush[2] = { - "4 Help for FLUSH\n", - "Usage: FLUSH \n", - "\n", - "Adds the given filename to the head of the update queue and returns\n", + "Help for FLUSH\n" + , + "Usage: FLUSH \n" + "\n" + "Adds the given filename to the head of the update queue and returns\n" "after is has been dequeued.\n" }; - size_t help_flush_len = sizeof (help_flush) / sizeof (help_flush[0]); - char *help_flushall[] = + char *help_flushall[2] = { - "3 Help for FLUSHALL\n", - "Usage: FLUSHALL\n", - "\n", + "Help for FLUSHALL\n" + , + "Usage: FLUSHALL\n" + "\n" "Triggers writing of all pending updates. Returns immediately.\n" }; - size_t help_flushall_len = sizeof(help_flushall) / sizeof(help_flushall[0]); - char *help_update[] = + char *help_update[2] = { - "9 Help for UPDATE\n", + "Help for UPDATE\n" + , "Usage: UPDATE [ ...]\n" - "\n", - "Adds the given file to the internal cache if it is not yet known and\n", - "appends the given value(s) to the entry. See the rrdcached(1) manpage\n", - "for details.\n", - "\n", - "Each has the following form:\n", - " =