doing high volume processing. This enables about 25% speed increase
during journal replay and "BATCH" processing. (this is a function of
syscall overhead).
* note when "BATCH" processing or journal replay starts, use that
timestamp for all commands
* use the batch start time to detect when we're in batch mode. no longer
need a separate boolean.
* pass the time_t into handle_request
* pass the time_t through to the commands that need it
-- kevin brintnall
git-svn-id: svn://svn.oetiker.ch/rrdtool/trunk/program@1599
a5681a0c-68f1-0310-ab6d-
d61299d08faa
socket_privilege privilege;
/* state for BATCH processing */
socket_privilege privilege;
/* state for BATCH processing */
int batch_cmd;
/* buffered IO */
int batch_cmd;
/* buffered IO */
int len;
if (sock == NULL) return 0; /* journal replay mode */
int len;
if (sock == NULL) return 0; /* journal replay mode */
- if (sock->batch_mode) return 0; /* no extra info returned when in BATCH */
+ if (sock->batch_start) return 0; /* no extra info returned when in BATCH */
va_start(argp, fmt);
#ifdef HAVE_VSNPRINTF
va_start(argp, fmt);
#ifdef HAVE_VSNPRINTF
if (sock == NULL) return rc; /* journal replay mode */
if (sock == NULL) return rc; /* journal replay mode */
{
if (rc == RESP_OK)
return rc; /* no response on success during BATCH */
{
if (rc == RESP_OK)
return rc; /* no response on success during BATCH */
len += rclen;
/* append the result to the wbuf, don't write to the user */
len += rclen;
/* append the result to the wbuf, don't write to the user */
return add_to_wbuf(sock, buffer, len);
/* first write must be complete */
return add_to_wbuf(sock, buffer, len);
/* first write must be complete */
} /* }}} static int handle_request_forget */
static int handle_request_update (listen_socket_t *sock, /* {{{ */
} /* }}} static int handle_request_forget */
static int handle_request_update (listen_socket_t *sock, /* {{{ */
- char *buffer, size_t buffer_size)
+ time_t now,
+ char *buffer, size_t buffer_size)
{
char *file;
int values_num = 0;
{
char *file;
int values_num = 0;
int status;
char orig_buf[CMD_MAX];
int status;
char orig_buf[CMD_MAX];
status = has_privilege(sock, PRIV_HIGH);
if (status <= 0)
return status;
status = has_privilege(sock, PRIV_HIGH);
if (status <= 0)
return status;
/* we came across a "WROTE" entry during journal replay.
* throw away any values that we have accumulated for this file
*/
/* we came across a "WROTE" entry during journal replay.
* throw away any values that we have accumulated for this file
*/
-static int handle_request_wrote (const char *buffer) /* {{{ */
+static int handle_request_wrote (const char *buffer, time_t now) /* {{{ */
{
int i;
cache_item_t *ci;
{
int i;
cache_item_t *ci;
- wipe_ci_values(ci, time(NULL));
+ wipe_ci_values(ci, now);
remove_from_queue(ci);
pthread_mutex_unlock(&cache_lock);
remove_from_queue(ci);
pthread_mutex_unlock(&cache_lock);
static int batch_start (listen_socket_t *sock) /* {{{ */
{
int status;
static int batch_start (listen_socket_t *sock) /* {{{ */
{
int status;
return send_response(sock, RESP_ERR, "Already in BATCH\n");
status = send_response(sock, RESP_OK,
"Go ahead. End with dot '.' on its own line.\n");
return send_response(sock, RESP_ERR, "Already in BATCH\n");
status = send_response(sock, RESP_OK,
"Go ahead. End with dot '.' on its own line.\n");
+ sock->batch_start = time(NULL);
sock->batch_cmd = 0;
return status;
sock->batch_cmd = 0;
return status;
/* finish "BATCH" processing and return results to the client */
static int batch_done (listen_socket_t *sock) /* {{{ */
{
/* finish "BATCH" processing and return results to the client */
static int batch_done (listen_socket_t *sock) /* {{{ */
{
- assert(sock->batch_mode);
- sock->batch_mode = 0;
+ assert(sock->batch_start);
+ sock->batch_start = 0;
sock->batch_cmd = 0;
return send_response(sock, RESP_OK, "errors\n");
} /* }}} static int batch_done */
/* if sock==NULL, we are in journal replay mode */
static int handle_request (listen_socket_t *sock, /* {{{ */
sock->batch_cmd = 0;
return send_response(sock, RESP_OK, "errors\n");
} /* }}} static int batch_done */
/* if sock==NULL, we are in journal replay mode */
static int handle_request (listen_socket_t *sock, /* {{{ */
char *buffer, size_t buffer_size)
{
char *buffer_ptr;
char *buffer, size_t buffer_size)
{
char *buffer_ptr;
- if (sock != NULL && sock->batch_mode)
+ if (sock != NULL && sock->batch_start)
sock->batch_cmd++;
if (strcasecmp (command, "update") == 0)
sock->batch_cmd++;
if (strcasecmp (command, "update") == 0)
- return (handle_request_update (sock, buffer_ptr, buffer_size));
+ return (handle_request_update (sock, now, buffer_ptr, buffer_size));
else if (strcasecmp (command, "wrote") == 0 && sock == NULL)
{
/* this is only valid in replay mode */
else if (strcasecmp (command, "wrote") == 0 && sock == NULL)
{
/* this is only valid in replay mode */
- return (handle_request_wrote (buffer_ptr));
+ return (handle_request_wrote (buffer_ptr, now));
}
else if (strcasecmp (command, "flush") == 0)
return (handle_request_flush (sock, buffer_ptr, buffer_size));
}
else if (strcasecmp (command, "flush") == 0)
return (handle_request_flush (sock, buffer_ptr, buffer_size));
return (handle_request_help (sock, buffer_ptr, buffer_size));
else if (strcasecmp (command, "batch") == 0 && sock != NULL)
return batch_start(sock);
return (handle_request_help (sock, buffer_ptr, buffer_size));
else if (strcasecmp (command, "batch") == 0 && sock != NULL)
return batch_start(sock);
- else if (strcasecmp (command, ".") == 0 && sock != NULL && sock->batch_mode)
+ else if (strcasecmp (command, ".") == 0 && sock != NULL && sock->batch_start)
return batch_done(sock);
else
return send_response(sock, RESP_ERR, "Unknown command: %s\n", command);
return batch_done(sock);
else
return send_response(sock, RESP_ERR, "Unknown command: %s\n", command);
int fail_cnt = 0;
uint64_t line = 0;
char entry[CMD_MAX];
int fail_cnt = 0;
uint64_t line = 0;
char entry[CMD_MAX];
if (file == NULL) return 0;
if (file == NULL) return 0;
else
RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
else
RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file);
while(!feof(fh))
{
size_t entry_len;
while(!feof(fh))
{
size_t entry_len;
entry[entry_len - 1] = '\0';
entry[entry_len - 1] = '\0';
- if (handle_request(NULL, entry, entry_len) == 0)
+ if (handle_request(NULL, now, entry, entry_len) == 0)
++entry_cnt;
else
++fail_cnt;
++entry_cnt;
else
++fail_cnt;
char *cmd;
ssize_t cmd_len;
ssize_t rbytes;
char *cmd;
ssize_t cmd_len;
ssize_t rbytes;
struct pollfd pollfd;
int status;
struct pollfd pollfd;
int status;
sock->next_read += rbytes;
sock->next_read += rbytes;
+ if (sock->batch_start)
+ now = sock->batch_start;
+ else
+ now = time(NULL);
+
while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
{
while ((cmd = next_cmd(sock, &cmd_len)) != NULL)
{
- status = handle_request (sock, cmd, cmd_len+1);
+ status = handle_request (sock, now, cmd, cmd_len+1);
if (status != 0)
goto out_close;
}
if (status != 0)
goto out_close;
}