X-Git-Url: https://git.octo.it/?a=blobdiff_plain;f=src%2Fceph.c;h=19a09d861f308c1097b2e3dc9d4e4905f490b622;hb=c4439c9cb3e2348ad7013644731de27a55eca478;hp=c2284cb3baa70d26ad8f7d4a9e09a031158a1929;hpb=1ecde57a53fc7c50e6b0139ad745b868413d420c;p=collectd.git diff --git a/src/ceph.c b/src/ceph.c index c2284cb3..19a09d86 100644 --- a/src/ceph.c +++ b/src/ceph.c @@ -28,8 +28,8 @@ #include "collectd.h" -#include "common.h" #include "plugin.h" +#include "utils/common/common.h" #include #include @@ -148,7 +148,7 @@ enum perfcounter_type_d { }; /** Give user option to use default (long run = since daemon started) avg */ -static int long_run_latency_avg = 0; +static int long_run_latency_avg; /** * Give user option to use default type for special cases - @@ -161,10 +161,10 @@ static int long_run_latency_avg = 0; static int convert_special_metrics = 1; /** Array of daemons to monitor */ -static struct ceph_daemon **g_daemons = NULL; +static struct ceph_daemon **g_daemons; /** Number of elements in g_daemons */ -static size_t g_num_daemons = 0; +static size_t g_num_daemons; /** * A set of data that we build up in memory while parsing the JSON. @@ -176,8 +176,6 @@ struct values_tmp { uint64_t avgcount; /** current index of counters - used to get type of counter */ int index; - /** do we already have an avgcount for latency pair */ - int avgcount_exists; /** * similar to index, but current index of latency type counters - * used to get last poll data of counter @@ -253,7 +251,7 @@ static int ceph_cb_boolean(void *ctx, int bool_val) { return CEPH_CB_CONTINUE; } if (dest_size > dest_len) { \ sstrncpy((dest) + dest_len, (src), dest_size - dest_len); \ } \ - (dest)[dest_size - 1] = 0; \ + (dest)[dest_size - 1] = '\0'; \ } while (0) static int ceph_cb_number(void *ctx, const char *number_val, @@ -261,7 +259,6 @@ static int ceph_cb_number(void *ctx, const char *number_val, yajl_struct *state = (yajl_struct *)ctx; char buffer[number_len + 1]; char key[2 * DATA_MAX_NAME_LEN] = {0}; - _Bool latency_type = 0; int status; memcpy(buffer, number_val, number_len); @@ -276,44 +273,27 @@ static int ceph_cb_number(void *ctx, const char *number_val, BUFFER_ADD(key, state->stack[i]); } - /* Special case for latency metrics. */ - if ((strcmp("avgcount", state->key) == 0) || - (strcmp("sum", state->key) == 0)) { - latency_type = 1; - - /* depth >= 2 => (stack[-1] != NULL && stack[-2] != NULL) */ - assert((state->depth < 2) || ((state->stack[state->depth - 1] != NULL) && - (state->stack[state->depth - 2] != NULL))); - - /* Super-special case for filestore.journal_wr_bytes.avgcount: For - * some reason, Ceph schema encodes this as a count/sum pair while all - * other "Bytes" data (excluding used/capacity bytes for OSD space) uses - * a single "Derive" type. To spare further confusion, keep this KPI as - * the same type of other "Bytes". Instead of keeping an "average" or - * "rate", use the "sum" in the pair and assign that to the derive - * value. */ - if (convert_special_metrics && (state->depth >= 2) && - (strcmp("filestore", state->stack[state->depth - 2]) == 0) && - (strcmp("journal_wr_bytes", state->stack[state->depth - 1]) == 0) && - (strcmp("avgcount", state->key) == 0)) { - DEBUG("ceph plugin: Skipping avgcount for filestore.JournalWrBytes"); - return CEPH_CB_CONTINUE; - } - } else /* not a latency type */ - { - BUFFER_ADD(key, "."); - BUFFER_ADD(key, state->key); + /* Super-special case for filestore.journal_wr_bytes.avgcount: For + * some reason, Ceph schema encodes this as a count/sum pair while all + * other "Bytes" data (excluding used/capacity bytes for OSD space) uses + * a single "Derive" type. To spare further confusion, keep this KPI as + * the same type of other "Bytes". Instead of keeping an "average" or + * "rate", use the "sum" in the pair and assign that to the derive + * value. */ + if (convert_special_metrics && (state->depth > 2) && + state->stack[state->depth - 2] && + (strcmp("filestore", state->stack[state->depth - 2]) == 0) && + state->stack[state->depth - 1] && + (strcmp("journal_wr_bytes", state->stack[state->depth - 1]) == 0) && + (strcmp("avgcount", state->key) == 0)) { + DEBUG("ceph plugin: Skipping avgcount for filestore.JournalWrBytes"); + return CEPH_CB_CONTINUE; } - status = state->handler(state->handler_arg, buffer, key); - if ((status == RETRY_AVGCOUNT) && latency_type) { - /* Add previously skipped part of the key, either "avgcount" or "sum", - * and try again. */ - BUFFER_ADD(key, "."); - BUFFER_ADD(key, state->key); + BUFFER_ADD(key, "."); + BUFFER_ADD(key, state->key); - status = state->handler(state->handler_arg, buffer, key); - } + status = state->handler(state->handler_arg, buffer, key); if (status != 0) { ERROR("ceph plugin: JSON handler failed with status %d.", status); @@ -370,7 +350,7 @@ static int ceph_cb_map_key(void *ctx, const unsigned char *key, } memmove(state->key, key, sz - 1); - state->key[sz - 1] = 0; + state->key[sz - 1] = '\0'; return CEPH_CB_CONTINUE; } @@ -418,7 +398,7 @@ static void ceph_daemon_free(struct ceph_daemon *d) { } /* compact_ds_name removed the special characters ":", "_", "-" and "+" from the - * intput string. Characters following these special characters are capitalized. + * input string. Characters following these special characters are capitalized. * Trailing "+" and "-" characters are replaces with the strings "Plus" and * "Minus". */ static int compact_ds_name(char *buffer, size_t buffer_size, char const *src) { @@ -426,8 +406,8 @@ static int compact_ds_name(char *buffer, size_t buffer_size, char const *src) { size_t src_len; char *ptr = buffer; size_t ptr_size = buffer_size; - _Bool append_plus = 0; - _Bool append_minus = 0; + bool append_plus = false; + bool append_minus = false; if ((buffer == NULL) || (buffer_size <= strlen("Minus")) || (src == NULL)) return EINVAL; @@ -437,11 +417,11 @@ static int compact_ds_name(char *buffer, size_t buffer_size, char const *src) { /* Remove trailing "+" and "-". */ if (src_copy[src_len - 1] == '+') { - append_plus = 1; + append_plus = true; src_len--; src_copy[src_len] = 0; } else if (src_copy[src_len - 1] == '-') { - append_minus = 1; + append_minus = true; src_len--; src_copy[src_len] = 0; } @@ -492,19 +472,34 @@ static int compact_ds_name(char *buffer, size_t buffer_size, char const *src) { return 0; } -static _Bool has_suffix(char const *str, char const *suffix) { +static bool has_suffix(char const *str, char const *suffix) { size_t str_len = strlen(str); size_t suffix_len = strlen(suffix); size_t offset; if (suffix_len > str_len) - return 0; + return false; offset = str_len - suffix_len; if (strcmp(str + offset, suffix) == 0) - return 1; + return true; - return 0; + return false; +} + +static void cut_suffix(char *buffer, size_t buffer_size, char const *str, + char const *suffix) { + + size_t str_len = strlen(str); + size_t suffix_len = strlen(suffix); + + size_t offset = str_len - suffix_len + 1; + + if (offset > buffer_size) { + offset = buffer_size; + } + + sstrncpy(buffer, str, offset); } /* count_parts returns the number of elements a "foo.bar.baz" style key has. */ @@ -522,20 +517,23 @@ static size_t count_parts(char const *key) { */ static int parse_keys(char *buffer, size_t buffer_size, const char *key_str) { char tmp[2 * buffer_size]; + size_t tmp_size = sizeof(tmp); + const char *cut_suffixes[] = {".type", ".avgcount", ".sum", ".avgtime"}; if (buffer == NULL || buffer_size == 0 || key_str == NULL || strlen(key_str) == 0) return EINVAL; - if ((count_parts(key_str) > 2) && has_suffix(key_str, ".type")) { - /* strip ".type" suffix iff the key has more than two parts. */ - size_t sz = strlen(key_str) - strlen(".type") + 1; + sstrncpy(tmp, key_str, tmp_size); - if (sz > sizeof(tmp)) - sz = sizeof(tmp); - sstrncpy(tmp, key_str, sz); - } else { - sstrncpy(tmp, key_str, sizeof(tmp)); + /* Strip suffix if it is ".type" or one of latency metric suffix. */ + if (count_parts(key_str) > 2) { + for (size_t i = 0; i < STATIC_ARRAY_SIZE(cut_suffixes); i++) { + if (has_suffix(key_str, cut_suffixes[i])) { + cut_suffix(tmp, tmp_size, key_str, cut_suffixes[i]); + break; + } + } } return compact_ds_name(buffer, buffer_size, tmp); @@ -907,34 +905,47 @@ static int node_handler_fetch_data(void *arg, const char *val, switch (type) { case DSET_LATENCY: - if (vtmp->avgcount_exists == -1) { + if (has_suffix(key, ".avgcount")) { sscanf(val, "%" PRIu64, &vtmp->avgcount); - vtmp->avgcount_exists = 0; // return after saving avgcount - don't dispatch value // until latency calculation return 0; - } else { - double sum, result; - sscanf(val, "%lf", &sum); - + } else if (has_suffix(key, ".sum")) { if (vtmp->avgcount == 0) { vtmp->avgcount = 1; } - - /** User wants latency values as long run avg */ + // user wants latency values as long run avg + // skip this step if (long_run_latency_avg) { - result = (sum / vtmp->avgcount); - } else { - result = get_last_avg(vtmp->d, ds_name, vtmp->latency_index, sum, - vtmp->avgcount); - if (result == -ENOMEM) { - return -ENOMEM; - } + return 0; } + double sum, result; + sscanf(val, "%lf", &sum); + result = get_last_avg(vtmp->d, ds_name, vtmp->latency_index, sum, + vtmp->avgcount); + if (result == -ENOMEM) { + return -ENOMEM; + } + uv.gauge = result; + vtmp->latency_index = (vtmp->latency_index + 1); + } else if (has_suffix(key, ".avgtime")) { + + /* The "avgtime" metric reports ("sum" / "avgcount"), i.e. the average + * time per request since the start of the Ceph daemon. Report this only + * when the user has configured "long running average". Otherwise, use the + * rate of "sum" and "avgcount" to calculate the current latency. + */ + if (!long_run_latency_avg) { + return 0; + } + double result; + sscanf(val, "%lf", &result); uv.gauge = result; - vtmp->avgcount_exists = -1; vtmp->latency_index = (vtmp->latency_index + 1); + } else { + WARNING("ceph plugin: ignoring unknown latency metric: %s", key); + return 0; } break; case DSET_BYTES: @@ -1021,7 +1032,7 @@ static void cconn_close(struct cconn *io) { static int cconn_process_data(struct cconn *io, yajl_struct *yajl, yajl_handle hand) { int ret; - struct values_tmp *vtmp = calloc(1, sizeof(struct values_tmp) * 1); + struct values_tmp *vtmp = calloc(1, sizeof(*vtmp)); if (!vtmp) { return -ENOMEM; } @@ -1032,7 +1043,6 @@ static int cconn_process_data(struct cconn *io, yajl_struct *yajl, sizeof(vtmp->vlist.plugin_instance)); vtmp->d = io->d; - vtmp->avgcount_exists = -1; vtmp->latency_index = 0; vtmp->index = 0; yajl->handler_arg = vtmp; @@ -1132,8 +1142,8 @@ static int cconn_validate_revents(struct cconn *io, int revents) { } /** Handle a network event for a connection */ -static int cconn_handle_event(struct cconn *io) { - int ret; +static ssize_t cconn_handle_event(struct cconn *io) { + ssize_t ret; switch (io->state) { case CSTATE_UNCONNECTED: ERROR("ceph plugin: cconn_handle_event(name=%s) got to illegal " @@ -1148,7 +1158,7 @@ static int cconn_handle_event(struct cconn *io) { size_t cmd_len = strlen(cmd); RETRY_ON_EINTR( ret, write(io->asok, ((char *)&cmd) + io->amt, cmd_len - io->amt)); - DEBUG("ceph plugin: cconn_handle_event(name=%s,state=%d,amt=%d,ret=%d)", + DEBUG("ceph plugin: cconn_handle_event(name=%s,state=%d,amt=%d,ret=%zd)", io->d->name, io->state, io->amt, ret); if (ret < 0) { return ret; @@ -1170,7 +1180,7 @@ static int cconn_handle_event(struct cconn *io) { case CSTATE_READ_VERSION: { RETRY_ON_EINTR(ret, read(io->asok, ((char *)(&io->d->version)) + io->amt, sizeof(io->d->version) - io->amt)); - DEBUG("ceph plugin: cconn_handle_event(name=%s,state=%d,ret=%d)", + DEBUG("ceph plugin: cconn_handle_event(name=%s,state=%d,ret=%zd)", io->d->name, io->state, ret); if (ret < 0) { return ret; @@ -1196,7 +1206,7 @@ static int cconn_handle_event(struct cconn *io) { case CSTATE_READ_AMT: { RETRY_ON_EINTR(ret, read(io->asok, ((char *)(&io->json_len)) + io->amt, sizeof(io->json_len) - io->amt)); - DEBUG("ceph plugin: cconn_handle_event(name=%s,state=%d,ret=%d)", + DEBUG("ceph plugin: cconn_handle_event(name=%s,state=%d,ret=%zd)", io->d->name, io->state, ret); if (ret < 0) { return ret; @@ -1217,7 +1227,7 @@ static int cconn_handle_event(struct cconn *io) { case CSTATE_READ_JSON: { RETRY_ON_EINTR(ret, read(io->asok, io->json + io->amt, io->json_len - io->amt)); - DEBUG("ceph plugin: cconn_handle_event(name=%s,state=%d,ret=%d)", + DEBUG("ceph plugin: cconn_handle_event(name=%s,state=%d,ret=%zd)", io->d->name, io->state, ret); if (ret < 0) { return ret; @@ -1286,8 +1296,8 @@ static int cconn_prepare(struct cconn *io, struct pollfd *fds) { */ static int milli_diff(const struct timeval *t1, const struct timeval *t2) { int64_t ret; - int sec_diff = t1->tv_sec - t2->tv_sec; - int usec_diff = t1->tv_usec - t2->tv_usec; + long sec_diff = t1->tv_sec - t2->tv_sec; + long usec_diff = t1->tv_usec - t2->tv_usec; ret = usec_diff / 1000; ret += (sec_diff * 1000); return (ret > INT_MAX) ? INT_MAX : ((ret < INT_MIN) ? INT_MIN : (int)ret); @@ -1295,8 +1305,9 @@ static int milli_diff(const struct timeval *t1, const struct timeval *t2) { /** This handles the actual network I/O to talk to the Ceph daemons. */ -static int cconn_main_loop(uint32_t request_type) { - int ret, some_unreachable = 0; +static ssize_t cconn_main_loop(uint32_t request_type) { + int some_unreachable = 0; + ssize_t ret; struct timeval end_tv; struct cconn io_array[g_num_daemons]; @@ -1333,7 +1344,7 @@ static int cconn_main_loop(uint32_t request_type) { struct cconn *io = io_array + i; ret = cconn_prepare(io, fds + nfds); if (ret < 0) { - WARNING("ceph plugin: cconn_prepare(name=%s,i=%zu,st=%d)=%d", + WARNING("ceph plugin: cconn_prepare(name=%s,i=%" PRIsz ",st=%d)=%zd", io->d->name, i, io->state, ret); cconn_close(io); io->request_type = ASOK_REQ_NONE; @@ -1357,7 +1368,7 @@ static int cconn_main_loop(uint32_t request_type) { } RETRY_ON_EINTR(ret, poll(fds, nfds, diff)); if (ret < 0) { - ERROR("ceph plugin: poll(2) error: %d", ret); + ERROR("ceph plugin: poll(2) error: %zd", ret); goto done; } for (int i = 0; i < nfds; ++i) { @@ -1378,7 +1389,7 @@ static int cconn_main_loop(uint32_t request_type) { ret = cconn_handle_event(io); if (ret) { WARNING("ceph plugin: cconn_handle_event(name=%s," - "i=%d,st=%d): error %d", + "i=%d,st=%d): error %zd", io->d->name, i, io->state, ret); cconn_close(io); io->request_type = ASOK_REQ_NONE; @@ -1399,7 +1410,7 @@ done: return ret; } -static int ceph_read(void) { return cconn_main_loop(ASOK_REQ_DATA); } +static int ceph_read(void) { return (int)cconn_main_loop(ASOK_REQ_DATA); } /******* lifecycle *******/ static int ceph_init(void) { @@ -1426,7 +1437,7 @@ static int ceph_init(void) { return ENOENT; } - return cconn_main_loop(ASOK_REQ_VERSION); + return (int)cconn_main_loop(ASOK_REQ_VERSION); } static int ceph_shutdown(void) {