#include "collectd.h"
-#include "common.h"
#include "plugin.h"
+#include "utils/common/common.h"
#include <arpa/inet.h>
#include <errno.h>
};
/** Give user option to use default (long run = since daemon started) avg */
-static int long_run_latency_avg = 0;
+static int long_run_latency_avg;
/**
* Give user option to use default type for special cases -
static int convert_special_metrics = 1;
/** Array of daemons to monitor */
-static struct ceph_daemon **g_daemons = NULL;
+static struct ceph_daemon **g_daemons;
/** Number of elements in g_daemons */
-static size_t g_num_daemons = 0;
+static size_t g_num_daemons;
/**
* A set of data that we build up in memory while parsing the JSON.
uint64_t avgcount;
/** current index of counters - used to get type of counter */
int index;
- /** do we already have an avgcount for latency pair */
- int avgcount_exists;
/**
* similar to index, but current index of latency type counters -
* used to get last poll data of counter
if (dest_size > dest_len) { \
sstrncpy((dest) + dest_len, (src), dest_size - dest_len); \
} \
- (dest)[dest_size - 1] = 0; \
+ (dest)[dest_size - 1] = '\0'; \
} while (0)
static int ceph_cb_number(void *ctx, const char *number_val,
yajl_struct *state = (yajl_struct *)ctx;
char buffer[number_len + 1];
char key[2 * DATA_MAX_NAME_LEN] = {0};
- _Bool latency_type = 0;
int status;
memcpy(buffer, number_val, number_len);
BUFFER_ADD(key, state->stack[i]);
}
- /* Special case for latency metrics. */
- if ((strcmp("avgcount", state->key) == 0) ||
- (strcmp("sum", state->key) == 0)) {
- latency_type = 1;
-
- /* depth >= 2 => (stack[-1] != NULL && stack[-2] != NULL) */
- assert((state->depth < 2) || ((state->stack[state->depth - 1] != NULL) &&
- (state->stack[state->depth - 2] != NULL)));
-
- /* Super-special case for filestore.journal_wr_bytes.avgcount: For
- * some reason, Ceph schema encodes this as a count/sum pair while all
- * other "Bytes" data (excluding used/capacity bytes for OSD space) uses
- * a single "Derive" type. To spare further confusion, keep this KPI as
- * the same type of other "Bytes". Instead of keeping an "average" or
- * "rate", use the "sum" in the pair and assign that to the derive
- * value. */
- if (convert_special_metrics && (state->depth >= 2) &&
- (strcmp("filestore", state->stack[state->depth - 2]) == 0) &&
- (strcmp("journal_wr_bytes", state->stack[state->depth - 1]) == 0) &&
- (strcmp("avgcount", state->key) == 0)) {
- DEBUG("ceph plugin: Skipping avgcount for filestore.JournalWrBytes");
- return CEPH_CB_CONTINUE;
- }
- } else /* not a latency type */
- {
- BUFFER_ADD(key, ".");
- BUFFER_ADD(key, state->key);
+ /* Super-special case for filestore.journal_wr_bytes.avgcount: For
+ * some reason, Ceph schema encodes this as a count/sum pair while all
+ * other "Bytes" data (excluding used/capacity bytes for OSD space) uses
+ * a single "Derive" type. To spare further confusion, keep this KPI as
+ * the same type of other "Bytes". Instead of keeping an "average" or
+ * "rate", use the "sum" in the pair and assign that to the derive
+ * value. */
+ if (convert_special_metrics && (state->depth > 2) &&
+ state->stack[state->depth - 2] &&
+ (strcmp("filestore", state->stack[state->depth - 2]) == 0) &&
+ state->stack[state->depth - 1] &&
+ (strcmp("journal_wr_bytes", state->stack[state->depth - 1]) == 0) &&
+ (strcmp("avgcount", state->key) == 0)) {
+ DEBUG("ceph plugin: Skipping avgcount for filestore.JournalWrBytes");
+ return CEPH_CB_CONTINUE;
}
- status = state->handler(state->handler_arg, buffer, key);
- if ((status == RETRY_AVGCOUNT) && latency_type) {
- /* Add previously skipped part of the key, either "avgcount" or "sum",
- * and try again. */
- BUFFER_ADD(key, ".");
- BUFFER_ADD(key, state->key);
+ BUFFER_ADD(key, ".");
+ BUFFER_ADD(key, state->key);
- status = state->handler(state->handler_arg, buffer, key);
- }
+ status = state->handler(state->handler_arg, buffer, key);
if (status != 0) {
ERROR("ceph plugin: JSON handler failed with status %d.", status);
}
memmove(state->key, key, sz - 1);
- state->key[sz - 1] = 0;
+ state->key[sz - 1] = '\0';
return CEPH_CB_CONTINUE;
}
}
/* compact_ds_name removed the special characters ":", "_", "-" and "+" from the
- * intput string. Characters following these special characters are capitalized.
+ * input string. Characters following these special characters are capitalized.
* Trailing "+" and "-" characters are replaces with the strings "Plus" and
* "Minus". */
static int compact_ds_name(char *buffer, size_t buffer_size, char const *src) {
size_t src_len;
char *ptr = buffer;
size_t ptr_size = buffer_size;
- _Bool append_plus = 0;
- _Bool append_minus = 0;
+ bool append_plus = false;
+ bool append_minus = false;
if ((buffer == NULL) || (buffer_size <= strlen("Minus")) || (src == NULL))
return EINVAL;
/* Remove trailing "+" and "-". */
if (src_copy[src_len - 1] == '+') {
- append_plus = 1;
+ append_plus = true;
src_len--;
src_copy[src_len] = 0;
} else if (src_copy[src_len - 1] == '-') {
- append_minus = 1;
+ append_minus = true;
src_len--;
src_copy[src_len] = 0;
}
return 0;
}
-static _Bool has_suffix(char const *str, char const *suffix) {
+static bool has_suffix(char const *str, char const *suffix) {
size_t str_len = strlen(str);
size_t suffix_len = strlen(suffix);
size_t offset;
if (suffix_len > str_len)
- return 0;
+ return false;
offset = str_len - suffix_len;
if (strcmp(str + offset, suffix) == 0)
- return 1;
+ return true;
- return 0;
+ return false;
+}
+
+static void cut_suffix(char *buffer, size_t buffer_size, char const *str,
+ char const *suffix) {
+
+ size_t str_len = strlen(str);
+ size_t suffix_len = strlen(suffix);
+
+ size_t offset = str_len - suffix_len + 1;
+
+ if (offset > buffer_size) {
+ offset = buffer_size;
+ }
+
+ sstrncpy(buffer, str, offset);
}
/* count_parts returns the number of elements a "foo.bar.baz" style key has. */
*/
static int parse_keys(char *buffer, size_t buffer_size, const char *key_str) {
char tmp[2 * buffer_size];
+ size_t tmp_size = sizeof(tmp);
+ const char *cut_suffixes[] = {".type", ".avgcount", ".sum", ".avgtime"};
if (buffer == NULL || buffer_size == 0 || key_str == NULL ||
strlen(key_str) == 0)
return EINVAL;
- if ((count_parts(key_str) > 2) && has_suffix(key_str, ".type")) {
- /* strip ".type" suffix iff the key has more than two parts. */
- size_t sz = strlen(key_str) - strlen(".type") + 1;
+ sstrncpy(tmp, key_str, tmp_size);
- if (sz > sizeof(tmp))
- sz = sizeof(tmp);
- sstrncpy(tmp, key_str, sz);
- } else {
- sstrncpy(tmp, key_str, sizeof(tmp));
+ /* Strip suffix if it is ".type" or one of latency metric suffix. */
+ if (count_parts(key_str) > 2) {
+ for (size_t i = 0; i < STATIC_ARRAY_SIZE(cut_suffixes); i++) {
+ if (has_suffix(key_str, cut_suffixes[i])) {
+ cut_suffix(tmp, tmp_size, key_str, cut_suffixes[i]);
+ break;
+ }
+ }
}
return compact_ds_name(buffer, buffer_size, tmp);
if ((ci->values_num != 1) || (ci->values[0].type != OCONFIG_TYPE_STRING)) {
WARNING("ceph plugin: `Daemon' blocks need exactly one string "
"argument.");
- return (-1);
+ return -1;
}
ret = cc_handle_str(ci, cd.name, DATA_MAX_NAME_LEN);
switch (type) {
case DSET_LATENCY:
- if (vtmp->avgcount_exists == -1) {
+ if (has_suffix(key, ".avgcount")) {
sscanf(val, "%" PRIu64, &vtmp->avgcount);
- vtmp->avgcount_exists = 0;
// return after saving avgcount - don't dispatch value
// until latency calculation
return 0;
- } else {
- double sum, result;
- sscanf(val, "%lf", &sum);
-
+ } else if (has_suffix(key, ".sum")) {
if (vtmp->avgcount == 0) {
vtmp->avgcount = 1;
}
-
- /** User wants latency values as long run avg */
+ // user wants latency values as long run avg
+ // skip this step
if (long_run_latency_avg) {
- result = (sum / vtmp->avgcount);
- } else {
- result = get_last_avg(vtmp->d, ds_name, vtmp->latency_index, sum,
- vtmp->avgcount);
- if (result == -ENOMEM) {
- return -ENOMEM;
- }
+ return 0;
}
+ double sum, result;
+ sscanf(val, "%lf", &sum);
+ result = get_last_avg(vtmp->d, ds_name, vtmp->latency_index, sum,
+ vtmp->avgcount);
+ if (result == -ENOMEM) {
+ return -ENOMEM;
+ }
+ uv.gauge = result;
+ vtmp->latency_index = (vtmp->latency_index + 1);
+ } else if (has_suffix(key, ".avgtime")) {
+
+ /* The "avgtime" metric reports ("sum" / "avgcount"), i.e. the average
+ * time per request since the start of the Ceph daemon. Report this only
+ * when the user has configured "long running average". Otherwise, use the
+ * rate of "sum" and "avgcount" to calculate the current latency.
+ */
+ if (!long_run_latency_avg) {
+ return 0;
+ }
+ double result;
+ sscanf(val, "%lf", &result);
uv.gauge = result;
- vtmp->avgcount_exists = -1;
vtmp->latency_index = (vtmp->latency_index + 1);
+ } else {
+ WARNING("ceph plugin: ignoring unknown latency metric: %s", key);
+ return 0;
}
break;
case DSET_BYTES:
static int cconn_process_data(struct cconn *io, yajl_struct *yajl,
yajl_handle hand) {
int ret;
- struct values_tmp *vtmp = calloc(1, sizeof(struct values_tmp) * 1);
+ struct values_tmp *vtmp = calloc(1, sizeof(*vtmp));
if (!vtmp) {
return -ENOMEM;
}
sizeof(vtmp->vlist.plugin_instance));
vtmp->d = io->d;
- vtmp->avgcount_exists = -1;
vtmp->latency_index = 0;
vtmp->index = 0;
yajl->handler_arg = vtmp;
}
/** Handle a network event for a connection */
-static int cconn_handle_event(struct cconn *io) {
- int ret;
+static ssize_t cconn_handle_event(struct cconn *io) {
+ ssize_t ret;
switch (io->state) {
case CSTATE_UNCONNECTED:
ERROR("ceph plugin: cconn_handle_event(name=%s) got to illegal "
size_t cmd_len = strlen(cmd);
RETRY_ON_EINTR(
ret, write(io->asok, ((char *)&cmd) + io->amt, cmd_len - io->amt));
- DEBUG("ceph plugin: cconn_handle_event(name=%s,state=%d,amt=%d,ret=%d)",
+ DEBUG("ceph plugin: cconn_handle_event(name=%s,state=%d,amt=%d,ret=%zd)",
io->d->name, io->state, io->amt, ret);
if (ret < 0) {
return ret;
case CSTATE_READ_VERSION: {
RETRY_ON_EINTR(ret, read(io->asok, ((char *)(&io->d->version)) + io->amt,
sizeof(io->d->version) - io->amt));
- DEBUG("ceph plugin: cconn_handle_event(name=%s,state=%d,ret=%d)",
+ DEBUG("ceph plugin: cconn_handle_event(name=%s,state=%d,ret=%zd)",
io->d->name, io->state, ret);
if (ret < 0) {
return ret;
case CSTATE_READ_AMT: {
RETRY_ON_EINTR(ret, read(io->asok, ((char *)(&io->json_len)) + io->amt,
sizeof(io->json_len) - io->amt));
- DEBUG("ceph plugin: cconn_handle_event(name=%s,state=%d,ret=%d)",
+ DEBUG("ceph plugin: cconn_handle_event(name=%s,state=%d,ret=%zd)",
io->d->name, io->state, ret);
if (ret < 0) {
return ret;
case CSTATE_READ_JSON: {
RETRY_ON_EINTR(ret,
read(io->asok, io->json + io->amt, io->json_len - io->amt));
- DEBUG("ceph plugin: cconn_handle_event(name=%s,state=%d,ret=%d)",
+ DEBUG("ceph plugin: cconn_handle_event(name=%s,state=%d,ret=%zd)",
io->d->name, io->state, ret);
if (ret < 0) {
return ret;
*/
static int milli_diff(const struct timeval *t1, const struct timeval *t2) {
int64_t ret;
- int sec_diff = t1->tv_sec - t2->tv_sec;
- int usec_diff = t1->tv_usec - t2->tv_usec;
+ long sec_diff = t1->tv_sec - t2->tv_sec;
+ long usec_diff = t1->tv_usec - t2->tv_usec;
ret = usec_diff / 1000;
ret += (sec_diff * 1000);
return (ret > INT_MAX) ? INT_MAX : ((ret < INT_MIN) ? INT_MIN : (int)ret);
/** This handles the actual network I/O to talk to the Ceph daemons.
*/
-static int cconn_main_loop(uint32_t request_type) {
- int ret, some_unreachable = 0;
+static ssize_t cconn_main_loop(uint32_t request_type) {
+ int some_unreachable = 0;
+ ssize_t ret;
struct timeval end_tv;
struct cconn io_array[g_num_daemons];
struct cconn *io = io_array + i;
ret = cconn_prepare(io, fds + nfds);
if (ret < 0) {
- WARNING("ceph plugin: cconn_prepare(name=%s,i=%zu,st=%d)=%d",
+ WARNING("ceph plugin: cconn_prepare(name=%s,i=%" PRIsz ",st=%d)=%zd",
io->d->name, i, io->state, ret);
cconn_close(io);
io->request_type = ASOK_REQ_NONE;
}
RETRY_ON_EINTR(ret, poll(fds, nfds, diff));
if (ret < 0) {
- ERROR("ceph plugin: poll(2) error: %d", ret);
+ ERROR("ceph plugin: poll(2) error: %zd", ret);
goto done;
}
for (int i = 0; i < nfds; ++i) {
ret = cconn_handle_event(io);
if (ret) {
WARNING("ceph plugin: cconn_handle_event(name=%s,"
- "i=%d,st=%d): error %d",
+ "i=%d,st=%d): error %zd",
io->d->name, i, io->state, ret);
cconn_close(io);
io->request_type = ASOK_REQ_NONE;
return ret;
}
-static int ceph_read(void) { return cconn_main_loop(ASOK_REQ_DATA); }
+static int ceph_read(void) { return (int)cconn_main_loop(ASOK_REQ_DATA); }
/******* lifecycle *******/
static int ceph_init(void) {
return ENOENT;
}
- return cconn_main_loop(ASOK_REQ_VERSION);
+ return (int)cconn_main_loop(ASOK_REQ_VERSION);
}
static int ceph_shutdown(void) {