X-Git-Url: https://git.octo.it/?a=blobdiff_plain;f=src%2Fceph.c;h=9c53a3e76590e1ba490c83e24c26551815c935cf;hb=55ef2778ade050e2924eeb027c98feea103f1585;hp=3ce9c88f08e2317138e1ca7271b762b819aa528f;hpb=4c30f47794046ec7c8dfb5e1d41d03633032bac7;p=collectd.git diff --git a/src/ceph.c b/src/ceph.c index 3ce9c88f..9c53a3e7 100644 --- a/src/ceph.c +++ b/src/ceph.c @@ -135,11 +135,10 @@ struct yajl_struct { node_handler_t handler; void * handler_arg; - struct { - char key[DATA_MAX_NAME_LEN]; - int key_len; - } state[YAJL_MAX_DEPTH]; - int depth; + + char *key; + char *stack[YAJL_MAX_DEPTH]; + size_t depth; }; typedef struct yajl_struct yajl_struct; @@ -258,68 +257,81 @@ static int ceph_cb_boolean(void *ctx, int bool_val) return CEPH_CB_CONTINUE; } +#define BUFFER_ADD(dest, src) do { \ + size_t dest_size = sizeof (dest); \ + strncat ((dest), (src), dest_size - strlen (dest)); \ + (dest)[dest_size - 1] = 0; \ +} while (0) + static int ceph_cb_number(void *ctx, const char *number_val, yajl_len_t number_len) { - yajl_struct *yajl = (yajl_struct*)ctx; + yajl_struct *state = (yajl_struct*) ctx; char buffer[number_len+1]; - int i, latency_type = 0, result; - char key[128]; + char key[2 * DATA_MAX_NAME_LEN]; + _Bool latency_type = 0; + size_t i; + int status; memcpy(buffer, number_val, number_len); buffer[sizeof(buffer) - 1] = 0; - ssnprintf(key, yajl->state[0].key_len, "%s", yajl->state[0].key); - for(i = 1; i < yajl->depth; i++) + memset (key, 0, sizeof (key)); + for (i = 0; i < state->depth; i++) + { + if (state->stack[i] == NULL) + continue; + + if (strlen (key) != 0) + BUFFER_ADD (key, "."); + BUFFER_ADD (key, state->stack[i]); + } + + /* Special case for latency metrics. */ + if ((strcmp ("avgcount", state->key) == 0) + || (strcmp ("sum", state->key) == 0)) { - if((i == yajl->depth-1) && ((strcmp(yajl->state[i].key,"avgcount") == 0) - || (strcmp(yajl->state[i].key,"sum") == 0))) + latency_type = 1; + + /* Super-special case for filestore.journal_wr_bytes.avgcount: For + * some reason, Ceph schema encodes this as a count/sum pair while all + * other "Bytes" data (excluding used/capacity bytes for OSD space) uses + * a single "Derive" type. To spare further confusion, keep this KPI as + * the same type of other "Bytes". Instead of keeping an "average" or + * "rate", use the "sum" in the pair and assign that to the derive + * value. */ + if (convert_special_metrics && (state->depth >= 2) + && (strcmp("filestore", state->stack[state->depth - 2]) == 0) + && (strcmp("journal_wr_bytes", state->stack[state->depth - 1]) == 0) + && (strcmp("avgcount", state->key) == 0)) { - if(convert_special_metrics) - { - /** - * Special case for filestore:JournalWrBytes. For some reason, - * Ceph schema encodes this as a count/sum pair while all - * other "Bytes" data (excluding used/capacity bytes for OSD - * space) uses a single "Derive" type. To spare further - * confusion, keep this KPI as the same type of other "Bytes". - * Instead of keeping an "average" or "rate", use the "sum" in - * the pair and assign that to the derive value. - */ - if((strcmp(yajl->state[i-1].key, "journal_wr_bytes") == 0) && - (strcmp(yajl->state[i-2].key,"filestore") == 0) && - (strcmp(yajl->state[i].key,"avgcount") == 0)) - { - DEBUG("ceph plugin: Skipping avgcount for filestore.JournalWrBytes"); - yajl->depth = (yajl->depth - 1); - return CEPH_CB_CONTINUE; - } - } - //probably a avgcount/sum pair. if not - we'll try full key later - latency_type = 1; - break; + DEBUG("ceph plugin: Skipping avgcount for filestore.JournalWrBytes"); + return CEPH_CB_CONTINUE; } - strncat(key, ".", 1); - strncat(key, yajl->state[i].key, yajl->state[i].key_len+1); + } + else /* not a latency type */ + { + BUFFER_ADD (key, "."); + BUFFER_ADD (key, state->key); } - result = yajl->handler(yajl->handler_arg, buffer, key); - - if((result == RETRY_AVGCOUNT) && latency_type) + status = state->handler(state->handler_arg, buffer, key); + if((status == RETRY_AVGCOUNT) && latency_type) { - strncat(key, ".", 1); - strncat(key, yajl->state[yajl->depth-1].key, - yajl->state[yajl->depth-1].key_len+1); - result = yajl->handler(yajl->handler_arg, buffer, key); + /* Add previously skipped part of the key, either "avgcount" or "sum", + * and try again. */ + BUFFER_ADD (key, "."); + BUFFER_ADD (key, state->key); + + status = state->handler(state->handler_arg, buffer, key); } - if(result == -ENOMEM) + if (status != 0) { - ERROR("ceph plugin: memory allocation failed"); + ERROR("ceph plugin: JSON handler failed with status %d.", status); return CEPH_CB_ABORT; } - yajl->depth = (yajl->depth - 1); return CEPH_CB_CONTINUE; } @@ -331,37 +343,52 @@ static int ceph_cb_string(void *ctx, const unsigned char *string_val, static int ceph_cb_start_map(void *ctx) { + yajl_struct *state = (yajl_struct*) ctx; + + /* Push key to the stack */ + if (state->depth == YAJL_MAX_DEPTH) + return CEPH_CB_ABORT; + + state->stack[state->depth] = state->key; + state->depth++; + state->key = NULL; + return CEPH_CB_CONTINUE; } -static int -ceph_cb_map_key(void *ctx, const unsigned char *key, yajl_len_t string_len) +static int ceph_cb_end_map(void *ctx) { - yajl_struct *yajl = (yajl_struct*)ctx; + yajl_struct *state = (yajl_struct*) ctx; - if((yajl->depth+1) >= YAJL_MAX_DEPTH) - { - ERROR("ceph plugin: depth exceeds max, aborting."); + /* Pop key from the stack */ + if (state->depth == 0) return CEPH_CB_ABORT; - } - - char buffer[string_len+1]; - - memcpy(buffer, key, string_len); - buffer[sizeof(buffer) - 1] = 0; - snprintf(yajl->state[yajl->depth].key, sizeof(buffer), "%s", buffer); - yajl->state[yajl->depth].key_len = sizeof(buffer); - yajl->depth = (yajl->depth + 1); + sfree (state->key); + state->depth--; + state->key = state->stack[state->depth]; + state->stack[state->depth] = NULL; return CEPH_CB_CONTINUE; } -static int ceph_cb_end_map(void *ctx) +static int +ceph_cb_map_key(void *ctx, const unsigned char *key, yajl_len_t string_len) { - yajl_struct *yajl = (yajl_struct*)ctx; + yajl_struct *state = (yajl_struct*) ctx; + size_t sz = ((size_t) string_len) + 1; + + sfree (state->key); + state->key = malloc (sz); + if (state->key == NULL) + { + ERROR ("ceph plugin: malloc failed."); + return CEPH_CB_ABORT; + } + + memmove (state->key, key, sz - 1); + state->key[sz - 1] = 0; - yajl->depth = (yajl->depth - 1); return CEPH_CB_CONTINUE; } @@ -1070,7 +1097,7 @@ static int cconn_connect(struct cconn *io) fd = socket(PF_UNIX, SOCK_STREAM, 0); if(fd < 0) { - int err = -errno; + err = -errno; ERROR("ceph plugin: cconn_connect: socket(PF_UNIX, SOCK_STREAM, 0) " "failed: error %d", err); return err; @@ -1519,7 +1546,7 @@ static int cconn_main_loop(uint32_t request_type) } else { - int ret = cconn_handle_event(io); + ret = cconn_handle_event(io); if(ret) { WARNING("ceph plugin: cconn_handle_event(name=%s," @@ -1583,3 +1610,4 @@ void module_register(void) plugin_register_read("ceph", ceph_read); plugin_register_shutdown("ceph", ceph_shutdown); } +/* vim: set sw=4 sts=4 et : */