X-Git-Url: https://git.octo.it/?a=blobdiff_plain;f=src%2Fceph.c;h=1b1f40b458c44f9eb573f54c74730bb0056d13e3;hb=0febfbd11d87acd8a0f10bbcf5d4a58bb68b8dc0;hp=5726e0103581647b3cd0a96908e5492437903ede;hpb=905fa2fb9768f848916c3bd6b9543d0d63845f70;p=collectd.git diff --git a/src/ceph.c b/src/ceph.c index 5726e010..1b1f40b4 100644 --- a/src/ceph.c +++ b/src/ceph.c @@ -50,8 +50,6 @@ #include #include -#define MAX_RRD_DS_NAME_LEN 20 - #define RETRY_AVGCOUNT -1 #if defined(YAJL_MAJOR) && (YAJL_MAJOR > 1) @@ -84,6 +82,21 @@ typedef size_t yajl_len_t; typedef unsigned int yajl_len_t; #endif +/** Number of types for ceph defined in types.db */ +#define CEPH_DSET_TYPES_NUM 3 +/** ceph types enum */ +enum ceph_dset_type_d +{ + DSET_LATENCY = 0, + DSET_BYTES = 1, + DSET_RATE = 2, + DSET_TYPE_UNFOUND = 1000 +}; + +/** Valid types for ceph defined in types.db */ +const char * ceph_dset_types [CEPH_DSET_TYPES_NUM] = + {"ceph_latency", "ceph_bytes", "ceph_rate"}; + /******* ceph_daemon *******/ struct ceph_daemon { @@ -92,19 +105,23 @@ struct ceph_daemon /** daemon name **/ char name[DATA_MAX_NAME_LEN]; - int dset_num; - /** Path to the socket that we use to talk to the ceph daemon */ char asok_path[UNIX_DOMAIN_SOCK_PATH_MAX]; - /** The set of key/value pairs that this daemon reports - * dset.type The daemon name - * dset.ds_num Number of data sources (key/value pairs) - * dset.ds Dynamically allocated array of key/value pairs + /** Number of counters */ + int ds_num; + /** Track ds types */ + uint32_t *ds_types; + /** Track ds names to match with types */ + char **ds_names; + + /** + * Keep track of last data for latency values so we can calculate rate + * since last poll. */ - /** Dynamically allocated array **/ - struct data_set_s *dset; - int **pc_types; + struct last_data **last_poll_data; + /** index of last poll data */ + int last_idx; }; /******* JSON parsing *******/ @@ -123,13 +140,6 @@ struct yajl_struct }; typedef struct yajl_struct yajl_struct; -/** - * Keep track of last data for latency values so we can calculate rate - * since last poll. - */ -struct last_data **last_poll_data = NULL; -int last_idx = 0; - enum perfcounter_type_d { PERFCOUNTER_LATENCY = 0x4, PERFCOUNTER_DERIVE = 0x8, @@ -154,21 +164,29 @@ static struct ceph_daemon **g_daemons = NULL; /** Number of elements in g_daemons */ static int g_num_daemons = 0; -struct values_holder -{ - int values_len; - value_t *values; -}; - /** - * A set of values_t data that we build up in memory while parsing the JSON. + * A set of data that we build up in memory while parsing the JSON. */ struct values_tmp { + /** ceph daemon we are processing data for*/ struct ceph_daemon *d; - int holder_num; - struct values_holder vh[0]; + /** track avgcount across counters for avgcount/sum latency pairs */ uint64_t avgcount; + /** current index of counters - used to get type of counter */ + int index; + /** do we already have an avgcount for latency pair */ + int avgcount_exists; + /** + * similar to index, but current index of latency type counters - + * used to get last poll data of counter + */ + int latency_index; + /** + * values list - maintain across counters since + * host/plugin/plugin instance are always the same + */ + value_list_t vlist; }; /** @@ -177,13 +195,11 @@ struct values_tmp */ struct last_data { - char dset_name[DATA_MAX_NAME_LEN]; - char ds_name[MAX_RRD_DS_NAME_LEN]; + char ds_name[DATA_MAX_NAME_LEN]; double last_sum; uint64_t last_count; }; - /******* network I/O *******/ enum cstate_t { @@ -239,7 +255,7 @@ static int ceph_cb_boolean(void *ctx, int bool_val) return CEPH_CB_CONTINUE; } -static int +static int ceph_cb_number(void *ctx, const char *number_val, yajl_len_t number_len) { yajl_struct *yajl = (yajl_struct*)ctx; @@ -271,7 +287,7 @@ ceph_cb_number(void *ctx, const char *number_val, yajl_len_t number_len) (strcmp(yajl->state[i-2].key,"filestore") == 0) && (strcmp(yajl->state[i].key,"avgcount") == 0)) { - DEBUG("Skipping avgcount for filestore.JournalWrBytes"); + DEBUG("ceph plugin: Skipping avgcount for filestore.JournalWrBytes"); yajl->depth = (yajl->depth - 1); return CEPH_CB_CONTINUE; } @@ -304,7 +320,7 @@ ceph_cb_number(void *ctx, const char *number_val, yajl_len_t number_len) return CEPH_CB_CONTINUE; } -static int ceph_cb_string(void *ctx, const unsigned char *string_val, +static int ceph_cb_string(void *ctx, const unsigned char *string_val, yajl_len_t string_len) { return CEPH_CB_CONTINUE; @@ -372,7 +388,7 @@ static yajl_callbacks callbacks = { static void ceph_daemon_print(const struct ceph_daemon *d) { - DEBUG("name=%s, asok_path=%s", d->name, d->asok_path); + DEBUG("ceph plugin: name=%s, asok_path=%s", d->name, d->asok_path); } static void ceph_daemons_print(void) @@ -387,17 +403,26 @@ static void ceph_daemons_print(void) static void ceph_daemon_free(struct ceph_daemon *d) { int i = 0; - for(; i < d->dset_num; i++) + for(; i < d->last_idx; i++) + { + sfree(d->last_poll_data[i]); + } + sfree(d->last_poll_data); + d->last_poll_data = NULL; + d->last_idx = 0; + for(i = 0; i < d->ds_num; i++) { - plugin_unregister_data_set((d->dset + i)->type); - sfree(d->dset->ds); - sfree(d->pc_types[i]); + sfree(d->ds_names[i]); } - sfree(d->dset); - sfree(d->pc_types); + sfree(d->ds_types); + sfree(d->ds_names); sfree(d); } +/** + * Compact ds name by removing special characters and trimming length to + * DATA_MAX_NAME_LEN if necessary + */ static void compact_ds_name(char *source, char *dest) { int keys_num = 0, i; @@ -435,12 +460,13 @@ static void compact_ds_name(char *source, char *dest) strncat(tmp, keys[i], key_chars_remaining); key_chars_remaining -= strlen(keys[i]); } - /** to coordinate limitation of length of ds name from RRD + tmp[DATA_MAX_NAME_LEN - 1] = '\0'; + /** to coordinate limitation of length of type_instance * we will truncate ds_name * when the its length is more than - * MAX_RRD_DS_NAME_LEN + * DATA_MAX_NAME_LEN */ - if(strlen(tmp) > MAX_RRD_DS_NAME_LEN - 1) + if(strlen(tmp) > DATA_MAX_NAME_LEN - 1) { append_status |= 0x4; /** we should reserve space for @@ -462,7 +488,7 @@ static void compact_ds_name(char *source, char *dest) */ reserved += 4; } - snprintf(dest, MAX_RRD_DS_NAME_LEN - reserved, "%s", tmp); + snprintf(dest, DATA_MAX_NAME_LEN - reserved, "%s", tmp); offset = strlen(dest); switch (append_status) { @@ -487,146 +513,58 @@ static void compact_ds_name(char *source, char *dest) break; } } -static int parse_keys(const char *key_str, char *dset_name, char *ds_name) + +/** + * Parse key to remove "type" if this is for schema and initiate compaction + */ +static int parse_keys(const char *key_str, char *ds_name) { char *ptr, *rptr; - size_t dset_name_len = 0; size_t ds_name_len = 0; - char tmp_ds_name[DATA_MAX_NAME_LEN]; + /** + * allow up to 100 characters before compaction - compact_ds_name will not + * allow more than DATA_MAX_NAME_LEN chars + */ + int max_str_len = 100; + char tmp_ds_name[max_str_len]; memset(tmp_ds_name, 0, sizeof(tmp_ds_name)); - if(dset_name == NULL || ds_name == NULL || key_str == NULL || - key_str[0] == '\0' || dset_name[0] != '\0' || ds_name[0] != '\0') + if(ds_name == NULL || key_str == NULL || key_str[0] == '\0' || + ds_name[0] != '\0') { return -1; } if((ptr = strchr(key_str, '.')) == NULL || (rptr = strrchr(key_str, '.')) == NULL) { - strncpy(dset_name, key_str, DATA_MAX_NAME_LEN - 1); - strncpy(tmp_ds_name, key_str, DATA_MAX_NAME_LEN - 1); + memcpy(tmp_ds_name, key_str, max_str_len - 1); goto compact; } - dset_name_len = - (ptr - key_str) > (DATA_MAX_NAME_LEN - 1) ? - (DATA_MAX_NAME_LEN - 1) : (ptr - key_str); - memcpy(dset_name, key_str, dset_name_len); - ds_name_len = - (rptr - ptr) > DATA_MAX_NAME_LEN ? DATA_MAX_NAME_LEN : (rptr - ptr); - if(ds_name_len == 0) - { /** only have two keys **/ - if(!strncmp(rptr + 1, "type", 4)) - {/** if last key is "type",ignore **/ - strncpy(tmp_ds_name, dset_name, DATA_MAX_NAME_LEN - 1); - } - else - {/** if last key isn't "type", copy last key **/ - strncpy(tmp_ds_name, rptr + 1, DATA_MAX_NAME_LEN - 1); - } - } - else if(!strncmp(rptr + 1, "type", 4)) - {/** more than two keys **/ - memcpy(tmp_ds_name, ptr + 1, ds_name_len - 1); + + ds_name_len = (rptr - ptr) > max_str_len ? max_str_len : (rptr - ptr); + if((ds_name_len == 0) || strncmp(rptr + 1, "type", 4)) + { /** copy whole key **/ + memcpy(tmp_ds_name, key_str, max_str_len - 1); } else - {/** copy whole keys **/ - strncpy(tmp_ds_name, ptr + 1, DATA_MAX_NAME_LEN - 1); + {/** more than two keys **/ + memcpy(tmp_ds_name, key_str, ((rptr - key_str) > (max_str_len - 1) ? + (max_str_len - 1) : (rptr - key_str))); } + compact: compact_ds_name(tmp_ds_name, ds_name); return 0; } -static int get_matching_dset(const struct ceph_daemon *d, const char *name) -{ - int idx; - for(idx = 0; idx < d->dset_num; ++idx) - { - if(strcmp(d->dset[idx].type, name) == 0) - { - return idx; - } - } - return -1; -} - -static int get_matching_value(const struct data_set_s *dset, const char *name, - int num_values) -{ - int idx; - for(idx = 0; idx < num_values; ++idx) - { - if(strcmp(dset->ds[idx].name, name) == 0) - { - return idx; - } - } - return -1; -} - +/** + * while parsing ceph admin socket schema, save counter name and type for later + * data processing + */ static int ceph_daemon_add_ds_entry(struct ceph_daemon *d, const char *name, int pc_type) { - struct data_source_s *ds; - struct data_set_s *dset; - struct data_set_s *dset_array; - int **pc_types_array = NULL; - int *pc_types; - int *pc_types_new; - int idx = 0; - if(strlen(name) + 1 > DATA_MAX_NAME_LEN) - { - return -ENAMETOOLONG; - } - char dset_name[DATA_MAX_NAME_LEN]; - char ds_name[MAX_RRD_DS_NAME_LEN]; - memset(dset_name, 0, sizeof(dset_name)); + uint32_t type; + char ds_name[DATA_MAX_NAME_LEN]; memset(ds_name, 0, sizeof(ds_name)); - if(parse_keys(name, dset_name, ds_name)) - { - return 1; - } - idx = get_matching_dset(d, dset_name); - if(idx == -1) - {/* need to add a dset **/ - dset_array = realloc(d->dset, - sizeof(struct data_set_s) * (d->dset_num + 1)); - if(!dset_array) - { - return -ENOMEM; - } - pc_types_array = realloc(d->pc_types, - sizeof(int *) * (d->dset_num + 1)); - if(!pc_types_array) - { - return -ENOMEM; - } - dset = &dset_array[d->dset_num]; - /** this step is very important, otherwise, - * realloc for dset->ds will tricky because of - * a random addr in dset->ds - */ - memset(dset, 0, sizeof(struct data_set_s)); - dset->ds_num = 0; - snprintf(dset->type, DATA_MAX_NAME_LEN, "%s", dset_name); - pc_types = pc_types_array[d->dset_num] = NULL; - d->dset = dset_array; - } - else - { - dset = &d->dset[idx]; - pc_types = d->pc_types[idx]; - } - struct data_source_s *ds_array = realloc(dset->ds, - sizeof(struct data_source_s) * (dset->ds_num + 1)); - if(!ds_array) - { - return -ENOMEM; - } - pc_types_new = realloc(pc_types, sizeof(int) * (dset->ds_num + 1)); - if(!pc_types_new) - { - return -ENOMEM; - } - dset->ds = ds_array; if(convert_special_metrics) { @@ -638,35 +576,42 @@ static int ceph_daemon_add_ds_entry(struct ceph_daemon *d, const char *name, * other "Bytes". Instead of keeping an "average" or "rate", use the * "sum" in the pair and assign that to the derive value. */ - if((strcmp(dset_name,"filestore") == 0) && - strcmp(ds_name, "JournalWrBytes") == 0) + if((strcmp(name,"filestore.journal_wr_bytes.type") == 0)) { pc_type = 10; } } - if(idx == -1) + d->ds_names = realloc(d->ds_names, sizeof(char *) * (d->ds_num + 1)); + if(!d->ds_names) { - pc_types_array[d->dset_num] = pc_types_new; - d->pc_types = pc_types_array; - d->pc_types[d->dset_num][dset->ds_num] = pc_type; - d->dset_num++; + return -ENOMEM; } - else + + d->ds_types = realloc(d->ds_types, sizeof(uint32_t) * (d->ds_num + 1)); + if(!d->ds_types) { - d->pc_types[idx] = pc_types_new; - d->pc_types[idx][dset->ds_num] = pc_type; + return -ENOMEM; } - ds = &ds_array[dset->ds_num++]; - snprintf(ds->name, MAX_RRD_DS_NAME_LEN, "%s", ds_name); - ds->type = (pc_type & PERFCOUNTER_DERIVE) ? DS_TYPE_DERIVE : DS_TYPE_GAUGE; - - /** - * Use min of 0 for DERIVE types so we don't get negative values on Ceph - * service restart - */ - ds->min = (ds->type == DS_TYPE_DERIVE) ? 0 : NAN; - ds->max = NAN; + + d->ds_names[d->ds_num] = malloc(sizeof(char) * DATA_MAX_NAME_LEN); + if(!d->ds_names[d->ds_num]) + { + return -ENOMEM; + } + + type = (pc_type & PERFCOUNTER_DERIVE) ? DSET_RATE : + ((pc_type & PERFCOUNTER_LATENCY) ? DSET_LATENCY : DSET_BYTES); + d->ds_types[d->ds_num] = type; + + if(parse_keys(name, ds_name)) + { + return 1; + } + + sstrncpy(d->ds_names[d->ds_num], ds_name, DATA_MAX_NAME_LEN -1); + d->ds_num = (d->ds_num + 1); + return 0; } @@ -762,6 +707,7 @@ static int cc_add_daemon_config(oconfig_item_t *ci) "with '/' or './' Can't parse: '%s'\n", cd.name, cd.asok_path); return -EINVAL; } + array = realloc(g_daemons, sizeof(struct ceph_daemon *) * (g_num_daemons + 1)); if(array == NULL) @@ -826,6 +772,9 @@ static int ceph_config(oconfig_item_t *ci) return 0; } +/** + * Parse JSON and get error message if present + */ static int traverse_json(const unsigned char *json, uint32_t json_len, yajl_handle hand) { @@ -848,90 +797,119 @@ traverse_json(const unsigned char *json, uint32_t json_len, yajl_handle hand) } } +/** + * Add entry for each counter while parsing schema + */ static int node_handler_define_schema(void *arg, const char *val, const char *key) { struct ceph_daemon *d = (struct ceph_daemon *) arg; int pc_type; pc_type = atoi(val); - DEBUG("\nceph_daemon_add_ds_entry(d=%s,key=%s,pc_type=%04x)", - d->name, key, pc_type); return ceph_daemon_add_ds_entry(d, key, pc_type); } -static int add_last(const char *dset_n, const char *ds_n, double cur_sum, +/** + * Latency counter does not yet have an entry in last poll data - add it. + */ +static int add_last(struct ceph_daemon *d, const char *ds_n, double cur_sum, uint64_t cur_count) { - last_poll_data[last_idx] = malloc(1 * sizeof(struct last_data)); - if(!last_poll_data[last_idx]) + d->last_poll_data[d->last_idx] = malloc(1 * sizeof(struct last_data)); + if(!d->last_poll_data[d->last_idx]) { return -ENOMEM; } - sstrncpy(last_poll_data[last_idx]->dset_name,dset_n, - sizeof(last_poll_data[last_idx]->dset_name)); - sstrncpy(last_poll_data[last_idx]->ds_name,ds_n, - sizeof(last_poll_data[last_idx]->ds_name)); - last_poll_data[last_idx]->last_sum = cur_sum; - last_poll_data[last_idx]->last_count = cur_count; - last_idx++; + sstrncpy(d->last_poll_data[d->last_idx]->ds_name,ds_n, + sizeof(d->last_poll_data[d->last_idx]->ds_name)); + d->last_poll_data[d->last_idx]->last_sum = cur_sum; + d->last_poll_data[d->last_idx]->last_count = cur_count; + d->last_idx = (d->last_idx + 1); return 0; } -static int update_last(const char *dset_n, const char *ds_n, double cur_sum, - uint64_t cur_count) +/** + * Update latency counter or add new entry if it doesn't exist + */ +static int update_last(struct ceph_daemon *d, const char *ds_n, int index, + double cur_sum, uint64_t cur_count) { - int i; - for(i = 0; i < last_idx; i++) + if((d->last_idx > index) && (strcmp(d->last_poll_data[index]->ds_name, ds_n) == 0)) { - if(strcmp(last_poll_data[i]->dset_name,dset_n) == 0 && - (strcmp(last_poll_data[i]->ds_name,ds_n) == 0)) - { - last_poll_data[i]->last_sum = cur_sum; - last_poll_data[i]->last_count = cur_count; - return 0; - } + d->last_poll_data[index]->last_sum = cur_sum; + d->last_poll_data[index]->last_count = cur_count; + return 0; } - if(!last_poll_data) + if(!d->last_poll_data) { - last_poll_data = malloc(1 * sizeof(struct last_data *)); - if(!last_poll_data) + d->last_poll_data = malloc(1 * sizeof(struct last_data *)); + if(!d->last_poll_data) { return -ENOMEM; } } else { - struct last_data **tmp_last = realloc(last_poll_data, - ((last_idx+1) * sizeof(struct last_data *))); + struct last_data **tmp_last = realloc(d->last_poll_data, + ((d->last_idx+1) * sizeof(struct last_data *))); if(!tmp_last) { return -ENOMEM; } - last_poll_data = tmp_last; + d->last_poll_data = tmp_last; + } + return add_last(d, ds_n, cur_sum, cur_count); +} + +/** + * If using index guess failed (shouldn't happen, but possible if counters + * get rearranged), resort to searching for counter name + */ +static int backup_search_for_last_avg(struct ceph_daemon *d, const char *ds_n) +{ + int i = 0; + for(; i < d->last_idx; i++) + { + if(strcmp(d->last_poll_data[i]->ds_name, ds_n) == 0) + { + return i; + } } - return add_last(dset_n,ds_n,cur_sum,cur_count); + return -1; } -static double get_last_avg(const char *dset_n, const char *ds_n, +/** + * Calculate average b/t current data and last poll data + * if last poll data exists + */ +static double get_last_avg(struct ceph_daemon *d, const char *ds_n, int index, double cur_sum, uint64_t cur_count) { - int i; double result = -1.1, sum_delt = 0.0; uint64_t count_delt = 0; - for(i = 0; i < last_idx; i++) + int tmp_index = 0; + if(d->last_idx > index) { - if((strcmp(last_poll_data[i]->dset_name,dset_n) == 0) && - (strcmp(last_poll_data[i]->ds_name,ds_n) == 0)) + if(strcmp(d->last_poll_data[index]->ds_name, ds_n) == 0) { - if(cur_count < last_poll_data[i]->last_count) - { - break; - } - sum_delt = (cur_sum - last_poll_data[i]->last_sum); - count_delt = (cur_count - last_poll_data[i]->last_count); + tmp_index = index; + } + //test previous index + else if((index > 0) && (strcmp(d->last_poll_data[index-1]->ds_name, ds_n) == 0)) + { + tmp_index = (index - 1); + } + else + { + tmp_index = backup_search_for_last_avg(d, ds_n); + } + + if((tmp_index > -1) && (cur_count > d->last_poll_data[tmp_index]->last_count)) + { + sum_delt = (cur_sum - d->last_poll_data[tmp_index]->last_sum); + count_delt = (cur_count - d->last_poll_data[tmp_index]->last_count); result = (sum_delt / count_delt); - break; } } @@ -939,95 +917,140 @@ static double get_last_avg(const char *dset_n, const char *ds_n, { result = NAN; } - if(update_last(dset_n,ds_n,cur_sum,cur_count) == -ENOMEM) + if(update_last(d, ds_n, tmp_index, cur_sum, cur_count) == -ENOMEM) { return -ENOMEM; } return result; } +/** + * If using index guess failed, resort to searching for counter name + */ +static uint32_t backup_search_for_type(struct ceph_daemon *d, char *ds_name) +{ + int idx = 0; + for(; idx < d->ds_num; idx++) + { + if(strcmp(d->ds_names[idx], ds_name) == 0) + { + return d->ds_types[idx]; + } + } + return DSET_TYPE_UNFOUND; +} + +/** + * Process counter data and dispatch values + */ static int node_handler_fetch_data(void *arg, const char *val, const char *key) { - int dset_idx, ds_idx; - value_t *uv; - char dset_name[DATA_MAX_NAME_LEN]; - char ds_name[MAX_RRD_DS_NAME_LEN]; + value_t uv; + double tmp_d; + uint64_t tmp_u; struct values_tmp *vtmp = (struct values_tmp*) arg; - memset(dset_name, 0, sizeof(dset_name)); + uint32_t type = DSET_TYPE_UNFOUND; + int index = vtmp->index; + + char ds_name[DATA_MAX_NAME_LEN]; memset(ds_name, 0, sizeof(ds_name)); - if(parse_keys(key, dset_name, ds_name)) + + if(parse_keys(key, ds_name)) { - DEBUG("enter node_handler_fetch_data"); return 1; } - dset_idx = get_matching_dset(vtmp->d, dset_name); - if(dset_idx == -1) + + if(index >= vtmp->d->ds_num) { - return 1; + //don't overflow bounds of array + index = (vtmp->d->ds_num - 1); } - ds_idx = get_matching_value(&vtmp->d->dset[dset_idx], ds_name, - vtmp->d->dset[dset_idx].ds_num); - if(ds_idx == -1) + + /** + * counters should remain in same order we parsed schema... we maintain the + * index variable to keep track of current point in list of counters. first + * use index to guess point in array for retrieving type. if that doesn't + * work, use the old way to get the counter type + */ + if(strcmp(ds_name, vtmp->d->ds_names[index]) == 0) { - DEBUG("DSet:%s, DS:%s, DSet idx:%d, DS idx:%d", - dset_name,ds_name,dset_idx,ds_idx); - return RETRY_AVGCOUNT; + //found match + type = vtmp->d->ds_types[index]; } - uv = &(vtmp->vh[dset_idx].values[ds_idx]); - - if(vtmp->d->pc_types[dset_idx][ds_idx] & PERFCOUNTER_LATENCY) + else if((index > 0) && (strcmp(ds_name, vtmp->d->ds_names[index-1]) == 0)) { - if(vtmp->avgcount == -1) - { - sscanf(val, "%" PRIu64, &vtmp->avgcount); - } - else - { - double sum, result; - sscanf(val, "%lf", &sum); - DEBUG("avgcount:%ld",vtmp->avgcount); - DEBUG("sum:%lf",sum); + //try previous key + type = vtmp->d->ds_types[index-1]; + } - if(vtmp->avgcount == 0) - { - vtmp->avgcount = 1; - } + if(type == DSET_TYPE_UNFOUND) + { + //couldn't find right type by guessing, check the old way + type = backup_search_for_type(vtmp->d, ds_name); + } - /** User wants latency values as long run avg */ - if(long_run_latency_avg) + switch(type) + { + case DSET_LATENCY: + if(vtmp->avgcount_exists == -1) { - result = (sum / vtmp->avgcount); - DEBUG("uv->gauge = sumd / avgcounti = :%lf", result); + sscanf(val, "%" PRIu64, &vtmp->avgcount); + vtmp->avgcount_exists = 0; + //return after saving avgcount - don't dispatch value + //until latency calculation + return 0; } else { - result = get_last_avg(dset_name, ds_name, sum, vtmp->avgcount); - if(result == -ENOMEM) + double sum, result; + sscanf(val, "%lf", &sum); + + if(vtmp->avgcount == 0) { - return -ENOMEM; + vtmp->avgcount = 1; } - DEBUG("uv->gauge = (sumd_now - sumd_last) / " - "(avgcounti_now - avgcounti_last) = :%lf", result); - } - uv->gauge = result; - vtmp->avgcount = -1; - } - } - else if(vtmp->d->pc_types[dset_idx][ds_idx] & PERFCOUNTER_DERIVE) - { - uint64_t derive_val; - sscanf(val, "%" PRIu64, &derive_val); - uv->derive = derive_val; - DEBUG("uv->derive %" PRIu64 "",(uint64_t)uv->derive); - } - else - { - double other_val; - sscanf(val, "%lf", &other_val); - uv->gauge = other_val; - DEBUG("uv->gauge %lf",uv->gauge); + /** User wants latency values as long run avg */ + if(long_run_latency_avg) + { + result = (sum / vtmp->avgcount); + } + else + { + result = get_last_avg(vtmp->d, ds_name, vtmp->latency_index, sum, vtmp->avgcount); + if(result == -ENOMEM) + { + return -ENOMEM; + } + } + + uv.gauge = result; + vtmp->avgcount_exists = -1; + vtmp->latency_index = (vtmp->latency_index + 1); + } + break; + case DSET_BYTES: + sscanf(val, "%lf", &tmp_d); + uv.gauge = tmp_d; + break; + case DSET_RATE: + sscanf(val, "%" PRIu64, &tmp_u); + uv.derive = tmp_u; + break; + case DSET_TYPE_UNFOUND: + default: + ERROR("ceph plugin: ds %s was not properly initialized.", ds_name); + return -1; } + + sstrncpy(vtmp->vlist.type, ceph_dset_types[type], sizeof(vtmp->vlist.type)); + sstrncpy(vtmp->vlist.type_instance, ds_name, sizeof(vtmp->vlist.type_instance)); + vtmp->vlist.values = &uv; + vtmp->vlist.values_len = 1; + + vtmp->index = (vtmp->index + 1); + plugin_dispatch_values(&vtmp->vlist); + return 0; } @@ -1037,15 +1060,15 @@ static int cconn_connect(struct cconn *io) int flags, fd, err; if(io->state != CSTATE_UNCONNECTED) { - ERROR("cconn_connect: io->state != CSTATE_UNCONNECTED"); + ERROR("ceph plugin: cconn_connect: io->state != CSTATE_UNCONNECTED"); return -EDOM; } fd = socket(PF_UNIX, SOCK_STREAM, 0); if(fd < 0) { int err = -errno; - ERROR("cconn_connect: socket(PF_UNIX, SOCK_STREAM, 0) failed: " - "error %d", err); + ERROR("ceph plugin: cconn_connect: socket(PF_UNIX, SOCK_STREAM, 0) " + "failed: error %d", err); return err; } memset(&address, 0, sizeof(struct sockaddr_un)); @@ -1056,7 +1079,8 @@ static int cconn_connect(struct cconn *io) connect(fd, (struct sockaddr *) &address, sizeof(struct sockaddr_un))); if(err < 0) { - ERROR("cconn_connect: connect(%d) failed: error %d", fd, err); + ERROR("ceph plugin: cconn_connect: connect(%d) failed: error %d", + fd, err); return err; } @@ -1064,7 +1088,8 @@ static int cconn_connect(struct cconn *io) if(fcntl(fd, F_SETFL, flags | O_NONBLOCK) != 0) { err = -errno; - ERROR("cconn_connect: fcntl(%d, O_NONBLOCK) error %d", fd, err); + ERROR("ceph plugin: cconn_connect: fcntl(%d, O_NONBLOCK) error %d", + fd, err); return err; } io->asok = fd; @@ -1094,55 +1119,31 @@ static void cconn_close(struct cconn *io) static int cconn_process_data(struct cconn *io, yajl_struct *yajl, yajl_handle hand) { - int i, ret = 0; - struct values_tmp *vtmp = calloc(1, sizeof(struct values_tmp) - + (sizeof(struct values_holder)) * io->d->dset_num); + int ret; + struct values_tmp *vtmp = calloc(1, sizeof(struct values_tmp) * 1); if(!vtmp) { return -ENOMEM; } - for(i = 0; i < io->d->dset_num; i++) - { - value_t *val = calloc(1, (sizeof(value_t) * io->d->dset[i].ds_num)); - vtmp->vh[i].values = val; - vtmp->vh[i].values_len = io->d->dset[i].ds_num; - } + vtmp->vlist = (value_list_t)VALUE_LIST_INIT; + sstrncpy(vtmp->vlist.host, hostname_g, sizeof(vtmp->vlist.host)); + sstrncpy(vtmp->vlist.plugin, "ceph", sizeof(vtmp->vlist.plugin)); + sstrncpy(vtmp->vlist.plugin_instance, io->d->name, sizeof(vtmp->vlist.plugin_instance)); + vtmp->d = io->d; - vtmp->holder_num = io->d->dset_num; - vtmp->avgcount = -1; + vtmp->avgcount_exists = -1; + vtmp->latency_index = 0; + vtmp->index = 0; yajl->handler_arg = vtmp; ret = traverse_json(io->json, io->json_len, hand); - if(ret) - { - goto done; - } - for(i = 0; i < vtmp->holder_num; i++) - { - value_list_t vl = VALUE_LIST_INIT; - sstrncpy(vl.host, hostname_g, sizeof(vl.host)); - sstrncpy(vl.plugin, "ceph", sizeof(vl.plugin)); - strncpy(vl.plugin_instance, io->d->name, sizeof(vl.plugin_instance)); - sstrncpy(vl.type, io->d->dset[i].type, sizeof(vl.type)); - vl.values = vtmp->vh[i].values; - vl.values_len = io->d->dset[i].ds_num; - DEBUG("cconn_process_data(io=%s): vl.values_len=%d, json=\"%s\"", - io->d->name, vl.values_len, io->json); - ret = plugin_dispatch_values(&vl); - if(ret) - { - goto done; - } - } - - done: for(i = 0; i < vtmp->holder_num; i++) - { - sfree(vtmp->vh[i].values); - } sfree(vtmp); return ret; } +/** + * Initiate JSON parsing and print error if one occurs + */ static int cconn_process_json(struct cconn *io) { if((io->request_type != ASOK_REQ_DATA) && @@ -1178,6 +1179,10 @@ static int cconn_process_json(struct cconn *io) result = cconn_process_data(io, &io->yajl, hand); break; case ASOK_REQ_SCHEMA: + //init daemon specific variables + io->d->ds_num = 0; + io->d->last_idx = 0; + io->d->last_poll_data = NULL; io->yajl.handler = node_handler_define_schema; io->yajl.handler_arg = io->d; result = traverse_json(io->json, io->json_len, hand); @@ -1215,7 +1220,8 @@ static int cconn_validate_revents(struct cconn *io, int revents) { if(revents & POLLERR) { - ERROR("cconn_validate_revents(name=%s): got POLLERR", io->d->name); + ERROR("ceph plugin: cconn_validate_revents(name=%s): got POLLERR", + io->d->name); return -EIO; } switch (io->state) @@ -1226,10 +1232,9 @@ static int cconn_validate_revents(struct cconn *io, int revents) case CSTATE_READ_AMT: case CSTATE_READ_JSON: return (revents & POLLIN) ? 0 : -EINVAL; - return (revents & POLLIN) ? 0 : -EINVAL; default: - ERROR("cconn_validate_revents(name=%s) got to illegal state on " - "line %d", io->d->name, __LINE__); + ERROR("ceph plugin: cconn_validate_revents(name=%s) got to " + "illegal state on line %d", io->d->name, __LINE__); return -EDOM; } } @@ -1241,8 +1246,8 @@ static int cconn_handle_event(struct cconn *io) switch (io->state) { case CSTATE_UNCONNECTED: - ERROR("cconn_handle_event(name=%s) got to illegal state on line " - "%d", io->d->name, __LINE__); + ERROR("ceph plugin: cconn_handle_event(name=%s) got to illegal " + "state on line %d", io->d->name, __LINE__); return -EDOM; case CSTATE_WRITE_REQUEST: @@ -1253,7 +1258,7 @@ static int cconn_handle_event(struct cconn *io) size_t cmd_len = strlen(cmd); RETRY_ON_EINTR(ret, write(io->asok, ((char*)&cmd) + io->amt, cmd_len - io->amt)); - DEBUG("cconn_handle_event(name=%s,state=%d,amt=%d,ret=%d)", + DEBUG("ceph plugin: cconn_handle_event(name=%s,state=%d,amt=%d,ret=%d)", io->d->name, io->state, io->amt, ret); if(ret < 0) { @@ -1280,7 +1285,7 @@ static int cconn_handle_event(struct cconn *io) RETRY_ON_EINTR(ret, read(io->asok, ((char*)(&io->d->version)) + io->amt, sizeof(io->d->version) - io->amt)); - DEBUG("cconn_handle_event(name=%s,state=%d,ret=%d)", + DEBUG("ceph plugin: cconn_handle_event(name=%s,state=%d,ret=%d)", io->d->name, io->state, ret); if(ret < 0) { @@ -1292,10 +1297,11 @@ static int cconn_handle_event(struct cconn *io) io->d->version = ntohl(io->d->version); if(io->d->version != 1) { - ERROR("cconn_handle_event(name=%s) not " - "expecting version %d!", io->d->name, io->d->version); + ERROR("ceph plugin: cconn_handle_event(name=%s) not " + "expecting version %d!", io->d->name, io->d->version); return -ENOTSUP; - }DEBUG("cconn_handle_event(name=%s): identified as " + } + DEBUG("ceph plugin: cconn_handle_event(name=%s): identified as " "version %d", io->d->name, io->d->version); io->amt = 0; cconn_close(io); @@ -1308,7 +1314,7 @@ static int cconn_handle_event(struct cconn *io) RETRY_ON_EINTR(ret, read(io->asok, ((char*)(&io->json_len)) + io->amt, sizeof(io->json_len) - io->amt)); - DEBUG("cconn_handle_event(name=%s,state=%d,ret=%d)", + DEBUG("ceph plugin: cconn_handle_event(name=%s,state=%d,ret=%d)", io->d->name, io->state, ret); if(ret < 0) { @@ -1323,7 +1329,7 @@ static int cconn_handle_event(struct cconn *io) io->json = calloc(1, io->json_len + 1); if(!io->json) { - ERROR("ERR CALLOCING IO->JSON"); + ERROR("ceph plugin: error callocing io->json"); return -ENOMEM; } } @@ -1333,7 +1339,7 @@ static int cconn_handle_event(struct cconn *io) { RETRY_ON_EINTR(ret, read(io->asok, io->json + io->amt, io->json_len - io->amt)); - DEBUG("cconn_handle_event(name=%s,state=%d,ret=%d)", + DEBUG("ceph plugin: cconn_handle_event(name=%s,state=%d,ret=%d)", io->d->name, io->state, ret); if(ret < 0) { @@ -1353,8 +1359,8 @@ static int cconn_handle_event(struct cconn *io) return 0; } default: - ERROR("cconn_handle_event(name=%s) got to illegal state on " - "line %d", io->d->name, __LINE__); + ERROR("ceph plugin: cconn_handle_event(name=%s) got to illegal " + "state on line %d", io->d->name, __LINE__); return -EDOM; } } @@ -1367,7 +1373,7 @@ static int cconn_prepare(struct cconn *io, struct pollfd* fds) /* The request has already been serviced. */ return 0; } - else if((io->request_type == ASOK_REQ_DATA) && (io->d->dset_num == 0)) + else if((io->request_type == ASOK_REQ_DATA) && (io->d->ds_num == 0)) { /* If there are no counters to report on, don't bother * connecting */ @@ -1400,8 +1406,8 @@ static int cconn_prepare(struct cconn *io, struct pollfd* fds) fds->events = POLLIN; return 1; default: - ERROR("cconn_prepare(name=%s) got to illegal state on line %d", - io->d->name, __LINE__); + ERROR("ceph plugin: cconn_prepare(name=%s) got to illegal state " + "on line %d", io->d->name, __LINE__); return -EDOM; } } @@ -1427,7 +1433,7 @@ static int cconn_main_loop(uint32_t request_type) struct timeval end_tv; struct cconn io_array[g_num_daemons]; - DEBUG("entering cconn_main_loop(request_type = %d)", request_type); + DEBUG("ceph plugin: entering cconn_main_loop(request_type = %d)", request_type); /* create cconn array */ memset(io_array, 0, sizeof(io_array)); @@ -1456,7 +1462,7 @@ static int cconn_main_loop(uint32_t request_type) ret = cconn_prepare(io, fds + nfds); if(ret < 0) { - WARNING("ERROR: cconn_prepare(name=%s,i=%d,st=%d)=%d", + WARNING("ceph plugin: cconn_prepare(name=%s,i=%d,st=%d)=%d", io->d->name, i, io->state, ret); cconn_close(io); io->request_type = ASOK_REQ_NONE; @@ -1464,8 +1470,6 @@ static int cconn_main_loop(uint32_t request_type) } else if(ret == 1) { - DEBUG("did cconn_prepare(name=%s,i=%d,st=%d)", - io->d->name, i, io->state); polled_io_array[nfds++] = io_array + i; } } @@ -1473,7 +1477,6 @@ static int cconn_main_loop(uint32_t request_type) { /* finished */ ret = 0; - DEBUG("cconn_main_loop: no more cconn to manage."); goto done; } gettimeofday(&tv, NULL); @@ -1482,13 +1485,13 @@ static int cconn_main_loop(uint32_t request_type) { /* Timed out */ ret = -ETIMEDOUT; - WARNING("ERROR: cconn_main_loop: timed out.\n"); + WARNING("ceph plugin: cconn_main_loop: timed out."); goto done; } RETRY_ON_EINTR(ret, poll(fds, nfds, diff)); if(ret < 0) { - ERROR("poll(2) error: %d", ret); + ERROR("ceph plugin: poll(2) error: %d", ret); goto done; } for(i = 0; i < nfds; ++i) @@ -1501,7 +1504,7 @@ static int cconn_main_loop(uint32_t request_type) } else if(cconn_validate_revents(io, revents)) { - WARNING("ERROR: cconn(name=%s,i=%d,st=%d): " + WARNING("ceph plugin: cconn(name=%s,i=%d,st=%d): " "revents validation error: " "revents=0x%08x", io->d->name, i, io->state, revents); cconn_close(io); @@ -1513,7 +1516,7 @@ static int cconn_main_loop(uint32_t request_type) int ret = cconn_handle_event(io); if(ret) { - WARNING("ERROR: cconn_handle_event(name=%s," + WARNING("ceph plugin: cconn_handle_event(name=%s," "i=%d,st=%d): error %d", io->d->name, i, io->state, ret); cconn_close(io); io->request_type = ASOK_REQ_NONE; @@ -1528,11 +1531,11 @@ static int cconn_main_loop(uint32_t request_type) } if(some_unreachable) { - DEBUG("cconn_main_loop: some Ceph daemons were unreachable."); + DEBUG("ceph plugin: cconn_main_loop: some Ceph daemons were unreachable."); } else { - DEBUG("cconn_main_loop: reached all Ceph daemons :)"); + DEBUG("ceph plugin: cconn_main_loop: reached all Ceph daemons :)"); } return ret; } @@ -1545,34 +1548,12 @@ static int ceph_read(void) /******* lifecycle *******/ static int ceph_init(void) { - int i, ret, j; - DEBUG("ceph_init"); + int ret; ceph_daemons_print(); ret = cconn_main_loop(ASOK_REQ_VERSION); - if(ret) - { - return ret; - } - for(i = 0; i < g_num_daemons; ++i) - { - struct ceph_daemon *d = g_daemons[i]; - for(j = 0; j < d->dset_num; j++) - { - ret = plugin_register_data_set(d->dset + j); - if(ret) - { - ERROR("plugin_register_data_set(%s) failed!", d->name); - } - else - { - DEBUG("plugin_register_data_set(%s): " - "(d->dset)[%d]->ds_num=%d", - d->name, j, d->dset[j].ds_num); - } - } - } - return 0; + + return (ret) ? ret : 0; } static int ceph_shutdown(void) @@ -1585,14 +1566,7 @@ static int ceph_shutdown(void) sfree(g_daemons); g_daemons = NULL; g_num_daemons = 0; - for(i = 0; i < last_idx; i++) - { - sfree(last_poll_data[i]); - } - sfree(last_poll_data); - last_poll_data = NULL; - last_idx = 0; - DEBUG("finished ceph_shutdown"); + DEBUG("ceph plugin: finished ceph_shutdown"); return 0; }