X-Git-Url: https://git.octo.it/?p=rrdtool.git;a=blobdiff_plain;f=src%2Frrd_daemon.c;h=28912c5358f8d17214b34d7c681fe8bcf714b69b;hp=9c8847dd7e4a17ce157853a02ff2440d5354dce0;hb=3c3effad5f54f8c73260068e3ec32c56684243bb;hpb=db9842391d7f238faf7b5ad1b01059115a4f2264 diff --git a/src/rrd_daemon.c b/src/rrd_daemon.c index 9c8847d..28912c5 100644 --- a/src/rrd_daemon.c +++ b/src/rrd_daemon.c @@ -117,7 +117,7 @@ struct listen_socket_s socket_privilege privilege; /* state for BATCH processing */ - int batch_mode; + time_t batch_start; int batch_cmd; /* buffered IO */ @@ -138,6 +138,7 @@ struct cache_item_s char **values; int values_num; time_t last_flush_time; + time_t last_update_stamp; #define CI_FLAGS_IN_TREE (1<<0) #define CI_FLAGS_IN_QUEUE (1<<1) int flags; @@ -404,7 +405,7 @@ static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */ int len; if (sock == NULL) return 0; /* journal replay mode */ - if (sock->batch_mode) return 0; /* no extra info returned when in BATCH */ + if (sock->batch_start) return 0; /* no extra info returned when in BATCH */ va_start(argp, fmt); #ifdef HAVE_VSNPRINTF @@ -452,7 +453,7 @@ static int send_response (listen_socket_t *sock, response_code rc, if (sock == NULL) return rc; /* journal replay mode */ - if (sock->batch_mode) + if (sock->batch_start) { if (rc == RESP_OK) return rc; /* no response on success during BATCH */ @@ -477,7 +478,7 @@ static int send_response (listen_socket_t *sock, response_code rc, len += rclen; /* append the result to the wbuf, don't write to the user */ - if (sock->batch_mode) + if (sock->batch_start) return add_to_wbuf(sock, buffer, len); /* first write must be complete */ @@ -487,7 +488,7 @@ static int send_response (listen_socket_t *sock, response_code rc, return -1; } - if (sock->wbuf != NULL) + if (sock->wbuf != NULL && rc == RESP_OK) { wrote = 0; while (wrote < sock->wbuf_len) @@ -1220,7 +1221,7 @@ static int handle_request_flush (listen_socket_t *sock, /* {{{ */ /* NOTREACHED */ assert(1==0); -} /* }}} int handle_request_slurp */ +} /* }}} int handle_request_flush */ static int handle_request_flushall(listen_socket_t *sock) /* {{{ */ { @@ -1307,18 +1308,17 @@ static int handle_request_forget(listen_socket_t *sock, /* {{{ */ } /* }}} static int handle_request_forget */ static int handle_request_update (listen_socket_t *sock, /* {{{ */ - char *buffer, size_t buffer_size) + time_t now, + char *buffer, size_t buffer_size) { char *file; int values_num = 0; + int bad_timestamps = 0; int status; char orig_buf[CMD_MAX]; - time_t now; cache_item_t *ci; - now = time (NULL); - status = has_privilege(sock, PRIV_HIGH); if (status <= 0) return status; @@ -1401,6 +1401,8 @@ static int handle_request_update (listen_socket_t *sock, /* {{{ */ { char **temp; char *value; + time_t stamp; + char *eostamp; status = buffer_get_field (&buffer, &buffer_size, &value); if (status != 0) @@ -1409,6 +1411,26 @@ static int handle_request_update (listen_socket_t *sock, /* {{{ */ break; } + /* make sure update time is always moving forward */ + stamp = strtol(value, &eostamp, 10); + if (eostamp == value || eostamp == NULL || *eostamp != ':') + { + ++bad_timestamps; + add_response_info(sock, "Cannot find timestamp in '%s'!\n", value); + continue; + } + else if (stamp <= ci->last_update_stamp) + { + ++bad_timestamps; + add_response_info(sock, + "illegal attempt to update using time %ld when" + " last update time is %ld (minimum one second step)\n", + stamp, ci->last_update_stamp); + continue; + } + else + ci->last_update_stamp = stamp; + temp = (char **) realloc (ci->values, sizeof (char *) * (ci->values_num + 1)); if (temp == NULL) @@ -1439,9 +1461,21 @@ static int handle_request_update (listen_socket_t *sock, /* {{{ */ pthread_mutex_unlock (&cache_lock); if (values_num < 1) - return send_response(sock, RESP_ERR, "No values updated.\n"); + { + /* if we had only one update attempt, then return the full + error message... try to get the most information out + of the limited error space allowed by the protocol + */ + if (bad_timestamps == 1) + return send_response(sock, RESP_ERR, "%s", sock->wbuf); + else + return send_response(sock, RESP_ERR, + "No values updated (%d bad timestamps).\n", + bad_timestamps); + } else - return send_response(sock, RESP_OK, "Enqueued %i value(s).\n", values_num); + return send_response(sock, RESP_OK, + "errors, enqueued %i value(s).\n", values_num); /* NOTREACHED */ assert(1==0); @@ -1451,7 +1485,7 @@ static int handle_request_update (listen_socket_t *sock, /* {{{ */ /* we came across a "WROTE" entry during journal replay. * throw away any values that we have accumulated for this file */ -static int handle_request_wrote (const char *buffer) /* {{{ */ +static int handle_request_wrote (const char *buffer, time_t now) /* {{{ */ { int i; cache_item_t *ci; @@ -1474,7 +1508,7 @@ static int handle_request_wrote (const char *buffer) /* {{{ */ free(ci->values); } - wipe_ci_values(ci, time(NULL)); + wipe_ci_values(ci, now); remove_from_queue(ci); pthread_mutex_unlock(&cache_lock); @@ -1485,12 +1519,12 @@ static int handle_request_wrote (const char *buffer) /* {{{ */ static int batch_start (listen_socket_t *sock) /* {{{ */ { int status; - if (sock->batch_mode) + if (sock->batch_start) return send_response(sock, RESP_ERR, "Already in BATCH\n"); status = send_response(sock, RESP_OK, "Go ahead. End with dot '.' on its own line.\n"); - sock->batch_mode = 1; + sock->batch_start = time(NULL); sock->batch_cmd = 0; return status; @@ -1499,14 +1533,15 @@ static int batch_start (listen_socket_t *sock) /* {{{ */ /* finish "BATCH" processing and return results to the client */ static int batch_done (listen_socket_t *sock) /* {{{ */ { - assert(sock->batch_mode); - sock->batch_mode = 0; + assert(sock->batch_start); + sock->batch_start = 0; sock->batch_cmd = 0; return send_response(sock, RESP_OK, "errors\n"); } /* }}} static int batch_done */ /* if sock==NULL, we are in journal replay mode */ static int handle_request (listen_socket_t *sock, /* {{{ */ + time_t now, char *buffer, size_t buffer_size) { char *buffer_ptr; @@ -1524,15 +1559,15 @@ static int handle_request (listen_socket_t *sock, /* {{{ */ return (-1); } - if (sock != NULL && sock->batch_mode) + if (sock != NULL && sock->batch_start) sock->batch_cmd++; if (strcasecmp (command, "update") == 0) - return (handle_request_update (sock, buffer_ptr, buffer_size)); + return (handle_request_update (sock, now, buffer_ptr, buffer_size)); else if (strcasecmp (command, "wrote") == 0 && sock == NULL) { /* this is only valid in replay mode */ - return (handle_request_wrote (buffer_ptr)); + return (handle_request_wrote (buffer_ptr, now)); } else if (strcasecmp (command, "flush") == 0) return (handle_request_flush (sock, buffer_ptr, buffer_size)); @@ -1548,7 +1583,7 @@ static int handle_request (listen_socket_t *sock, /* {{{ */ return (handle_request_help (sock, buffer_ptr, buffer_size)); else if (strcasecmp (command, "batch") == 0 && sock != NULL) return batch_start(sock); - else if (strcasecmp (command, ".") == 0 && sock != NULL && sock->batch_mode) + else if (strcasecmp (command, ".") == 0 && sock != NULL && sock->batch_start) return batch_done(sock); else return send_response(sock, RESP_ERR, "Unknown command: %s\n", command); @@ -1664,6 +1699,7 @@ static int journal_replay (const char *file) /* {{{ */ int fail_cnt = 0; uint64_t line = 0; char entry[CMD_MAX]; + time_t now; if (file == NULL) return 0; @@ -1716,6 +1752,8 @@ static int journal_replay (const char *file) /* {{{ */ else RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file); + now = time(NULL); + while(!feof(fh)) { size_t entry_len; @@ -1737,7 +1775,7 @@ static int journal_replay (const char *file) /* {{{ */ entry[entry_len - 1] = '\0'; - if (handle_request(NULL, entry, entry_len) == 0) + if (handle_request(NULL, now, entry, entry_len) == 0) ++entry_cnt; else ++fail_cnt; @@ -1828,6 +1866,7 @@ static void *connection_thread_main (void *args) /* {{{ */ char *cmd; ssize_t cmd_len; ssize_t rbytes; + time_t now; struct pollfd pollfd; int status; @@ -1844,23 +1883,18 @@ static void *connection_thread_main (void *args) /* {{{ */ else if (status < 0) /* error */ { status = errno; - if (status == EINTR) - continue; - RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed."); + if (status != EINTR) + RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed."); continue; } if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */ - { - close_connection(sock); break; - } else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0) { RRDD_LOG (LOG_WARNING, "connection_thread_main: " "poll(2) returned something unexpected: %#04hx", pollfd.revents); - close_connection(sock); break; } @@ -1876,9 +1910,14 @@ static void *connection_thread_main (void *args) /* {{{ */ sock->next_read += rbytes; + if (sock->batch_start) + now = sock->batch_start; + else + now = time(NULL); + while ((cmd = next_cmd(sock, &cmd_len)) != NULL) { - status = handle_request (sock, cmd, cmd_len+1); + status = handle_request (sock, now, cmd, cmd_len+1); if (status != 0) goto out_close; }