Merge remote-tracking branch 'origin/pr/598'
[collectd.git] / src / ceph.c
index 06d57af..1b1f40b 100644 (file)
@@ -50,8 +50,6 @@
 #include <math.h>
 #include <inttypes.h>
 
-#define MAX_RRD_DS_NAME_LEN 20
-
 #define RETRY_AVGCOUNT -1
 
 #if defined(YAJL_MAJOR) && (YAJL_MAJOR > 1)
@@ -84,6 +82,21 @@ typedef size_t yajl_len_t;
 typedef unsigned int yajl_len_t;
 #endif
 
+/** Number of types for ceph defined in types.db */
+#define CEPH_DSET_TYPES_NUM 3
+/** ceph types enum */
+enum ceph_dset_type_d
+{
+    DSET_LATENCY = 0,
+    DSET_BYTES = 1,
+    DSET_RATE = 2,
+    DSET_TYPE_UNFOUND = 1000
+};
+
+/** Valid types for ceph defined in types.db */
+const char * ceph_dset_types [CEPH_DSET_TYPES_NUM] =
+                                   {"ceph_latency", "ceph_bytes", "ceph_rate"};
+
 /******* ceph_daemon *******/
 struct ceph_daemon
 {
@@ -92,19 +105,23 @@ struct ceph_daemon
     /** daemon name **/
     char name[DATA_MAX_NAME_LEN];
 
-    int dset_num;
-
     /** Path to the socket that we use to talk to the ceph daemon */
     char asok_path[UNIX_DOMAIN_SOCK_PATH_MAX];
 
-    /** The set of  key/value pairs that this daemon reports
-     * dset.type        The daemon name
-     * dset.ds_num      Number of data sources (key/value pairs) 
-     * dset.ds      Dynamically allocated array of key/value pairs
+    /** Number of counters */
+    int ds_num;
+    /** Track ds types */
+    uint32_t *ds_types;
+    /** Track ds names to match with types */
+    char **ds_names;
+
+    /**
+     * Keep track of last data for latency values so we can calculate rate
+     * since last poll.
      */
-    /** Dynamically allocated array **/
-    struct data_set_s *dset;
-    int **pc_types;
+    struct last_data **last_poll_data;
+    /** index of last poll data */
+    int last_idx;
 };
 
 /******* JSON parsing *******/
@@ -123,13 +140,6 @@ struct yajl_struct
 };
 typedef struct yajl_struct yajl_struct;
 
-/**
- * Keep track of last data for latency values so we can calculate rate
- * since last poll.
- */
-struct last_data **last_poll_data = NULL;
-int last_idx = 0;
-
 enum perfcounter_type_d
 {
     PERFCOUNTER_LATENCY = 0x4, PERFCOUNTER_DERIVE = 0x8,
@@ -154,21 +164,29 @@ static struct ceph_daemon **g_daemons = NULL;
 /** Number of elements in g_daemons */
 static int g_num_daemons = 0;
 
-struct values_holder
-{
-    int values_len;
-    value_t *values;
-};
-
 /**
- * A set of values_t data that we build up in memory while parsing the JSON.
+ * A set of data that we build up in memory while parsing the JSON.
  */
 struct values_tmp
 {
+    /** ceph daemon we are processing data for*/
     struct ceph_daemon *d;
-    int holder_num;
-    struct values_holder vh[0];
+    /** track avgcount across counters for avgcount/sum latency pairs */
     uint64_t avgcount;
+    /** current index of counters - used to get type of counter */
+    int index;
+    /** do we already have an avgcount for latency pair */
+    int avgcount_exists;
+    /**
+     * similar to index, but current index of latency type counters -
+     * used to get last poll data of counter
+     */
+    int latency_index;
+    /**
+     * values list - maintain across counters since
+     * host/plugin/plugin instance are always the same
+     */
+    value_list_t vlist;
 };
 
 /**
@@ -177,13 +195,11 @@ struct values_tmp
  */
 struct last_data
 {
-    char dset_name[DATA_MAX_NAME_LEN];
-    char ds_name[MAX_RRD_DS_NAME_LEN];
+    char ds_name[DATA_MAX_NAME_LEN];
     double last_sum;
     uint64_t last_count;
 };
 
-
 /******* network I/O *******/
 enum cstate_t
 {
@@ -239,7 +255,7 @@ static int ceph_cb_boolean(void *ctx, int bool_val)
     return CEPH_CB_CONTINUE;
 }
 
-static int 
+static int
 ceph_cb_number(void *ctx, const char *number_val, yajl_len_t number_len)
 {
     yajl_struct *yajl = (yajl_struct*)ctx;
@@ -271,7 +287,7 @@ ceph_cb_number(void *ctx, const char *number_val, yajl_len_t number_len)
                         (strcmp(yajl->state[i-2].key,"filestore") == 0) &&
                         (strcmp(yajl->state[i].key,"avgcount") == 0))
                 {
-                    DEBUG("Skipping avgcount for filestore.JournalWrBytes");
+                    DEBUG("ceph plugin: Skipping avgcount for filestore.JournalWrBytes");
                     yajl->depth = (yajl->depth - 1);
                     return CEPH_CB_CONTINUE;
                 }
@@ -304,7 +320,7 @@ ceph_cb_number(void *ctx, const char *number_val, yajl_len_t number_len)
     return CEPH_CB_CONTINUE;
 }
 
-static int ceph_cb_string(void *ctx, const unsigned char *string_val, 
+static int ceph_cb_string(void *ctx, const unsigned char *string_val,
         yajl_len_t string_len)
 {
     return CEPH_CB_CONTINUE;
@@ -372,7 +388,7 @@ static yajl_callbacks callbacks = {
 
 static void ceph_daemon_print(const struct ceph_daemon *d)
 {
-    DEBUG("name=%s, asok_path=%s", d->name, d->asok_path);
+    DEBUG("ceph plugin: name=%s, asok_path=%s", d->name, d->asok_path);
 }
 
 static void ceph_daemons_print(void)
@@ -387,17 +403,26 @@ static void ceph_daemons_print(void)
 static void ceph_daemon_free(struct ceph_daemon *d)
 {
     int i = 0;
-    for(; i < d->dset_num; i++)
+    for(; i < d->last_idx; i++)
+    {
+        sfree(d->last_poll_data[i]);
+    }
+    sfree(d->last_poll_data);
+    d->last_poll_data = NULL;
+    d->last_idx = 0;
+    for(i = 0; i < d->ds_num; i++)
     {
-        plugin_unregister_data_set((d->dset + i)->type);
-        sfree(d->dset->ds);
-        sfree(d->pc_types[i]);
+        sfree(d->ds_names[i]);
     }
-    sfree(d->dset);
-    sfree(d->pc_types);
+    sfree(d->ds_types);
+    sfree(d->ds_names);
     sfree(d);
 }
 
+/**
+ * Compact ds name by removing special characters and trimming length to
+ * DATA_MAX_NAME_LEN if necessary
+ */
 static void compact_ds_name(char *source, char *dest)
 {
     int keys_num = 0, i;
@@ -405,6 +430,7 @@ static void compact_ds_name(char *source, char *dest)
     char *keys[16];
     char len_str[3];
     char tmp[DATA_MAX_NAME_LEN];
+    size_t key_chars_remaining = (DATA_MAX_NAME_LEN-1);
     int reserved = 0;
     int offset = 0;
     memset(tmp, 0, sizeof(tmp));
@@ -431,15 +457,16 @@ static void compact_ds_name(char *source, char *dest)
     /** concatenate each part of source string **/
     for(i = 0; i < keys_num; i++)
     {
-        strcat(tmp, keys[i]);
+        strncat(tmp, keys[i], key_chars_remaining);
+        key_chars_remaining -= strlen(keys[i]);
     }
     tmp[DATA_MAX_NAME_LEN - 1] = '\0';
-    /** to coordinate limitation of length of ds name from RRD
+    /** to coordinate limitation of length of type_instance
      *  we will truncate ds_name
      *  when the its length is more than
-     *  MAX_RRD_DS_NAME_LEN
+     *  DATA_MAX_NAME_LEN
      */
-    if(strlen(tmp) > MAX_RRD_DS_NAME_LEN - 1)
+    if(strlen(tmp) > DATA_MAX_NAME_LEN - 1)
     {
         append_status |= 0x4;
         /** we should reserve space for
@@ -461,7 +488,7 @@ static void compact_ds_name(char *source, char *dest)
          */
         reserved += 4;
     }
-    snprintf(dest, MAX_RRD_DS_NAME_LEN - reserved, "%s", tmp);
+    snprintf(dest, DATA_MAX_NAME_LEN - reserved, "%s", tmp);
     offset = strlen(dest);
     switch (append_status)
     {
@@ -486,146 +513,58 @@ static void compact_ds_name(char *source, char *dest)
             break;
     }
 }
-static int parse_keys(const char *key_str, char *dset_name, char *ds_name)
+
+/**
+ * Parse key to remove "type" if this is for schema and initiate compaction
+ */
+static int parse_keys(const char *key_str, char *ds_name)
 {
     char *ptr, *rptr;
-    size_t dset_name_len = 0;
     size_t ds_name_len = 0;
-    char tmp_ds_name[DATA_MAX_NAME_LEN];
+    /**
+     * allow up to 100 characters before compaction - compact_ds_name will not
+     * allow more than DATA_MAX_NAME_LEN chars
+     */
+    int max_str_len = 100;
+    char tmp_ds_name[max_str_len];
     memset(tmp_ds_name, 0, sizeof(tmp_ds_name));
-    if(dset_name == NULL || ds_name == NULL || key_str == NULL ||
-            key_str[0] == '\0' || dset_name[0] != '\0' || ds_name[0] != '\0')
+    if(ds_name == NULL || key_str == NULL ||  key_str[0] == '\0' ||
+                                                            ds_name[0] != '\0')
     {
         return -1;
     }
     if((ptr = strchr(key_str, '.')) == NULL
             || (rptr = strrchr(key_str, '.')) == NULL)
     {
-        strncpy(dset_name, key_str, DATA_MAX_NAME_LEN - 1);
-        strncpy(tmp_ds_name, key_str, DATA_MAX_NAME_LEN - 1);
+        memcpy(tmp_ds_name, key_str, max_str_len - 1);
         goto compact;
     }
-    dset_name_len =
-            (ptr - key_str) > (DATA_MAX_NAME_LEN - 1) ?
-                    (DATA_MAX_NAME_LEN - 1) : (ptr - key_str);
-    memcpy(dset_name, key_str, dset_name_len);
-    ds_name_len =
-           (rptr - ptr) > DATA_MAX_NAME_LEN ? DATA_MAX_NAME_LEN : (rptr - ptr);
-    if(ds_name_len == 0)
-    { /** only have two keys **/
-        if(!strncmp(rptr + 1, "type", 4))
-        {/** if last key is "type",ignore **/
-            strncpy(tmp_ds_name, dset_name, DATA_MAX_NAME_LEN - 1);
-        }
-        else
-        {/** if last key isn't "type", copy last key **/
-            strncpy(tmp_ds_name, rptr + 1, DATA_MAX_NAME_LEN - 1);
-        }
-    }
-    else if(!strncmp(rptr + 1, "type", 4))
-    {/** more than two keys **/
-        memcpy(tmp_ds_name, ptr + 1, ds_name_len - 1);
+
+    ds_name_len = (rptr - ptr) > max_str_len ? max_str_len : (rptr - ptr);
+    if((ds_name_len == 0) || strncmp(rptr + 1, "type", 4))
+    { /** copy whole key **/
+        memcpy(tmp_ds_name, key_str, max_str_len - 1);
     }
     else
-    {/** copy whole keys **/
-        strncpy(tmp_ds_name, ptr + 1, DATA_MAX_NAME_LEN - 1);
+    {/** more than two keys **/
+        memcpy(tmp_ds_name, key_str, ((rptr - key_str) > (max_str_len - 1) ?
+                (max_str_len - 1) : (rptr - key_str)));
     }
+
     compact: compact_ds_name(tmp_ds_name, ds_name);
     return 0;
 }
 
-static int get_matching_dset(const struct ceph_daemon *d, const char *name)
-{
-    int idx;
-    for(idx = 0; idx < d->dset_num; ++idx)
-    {
-        if(strcmp(d->dset[idx].type, name) == 0)
-        {
-            return idx;
-        }
-    }
-    return -1;
-}
-
-static int get_matching_value(const struct data_set_s *dset, const char *name,
-        int num_values)
-{
-    int idx;
-    for(idx = 0; idx < num_values; ++idx)
-    {
-        if(strcmp(dset->ds[idx].name, name) == 0)
-        {
-            return idx;
-        }
-    }
-    return -1;
-}
-
+/**
+ * while parsing ceph admin socket schema, save counter name and type for later
+ * data processing
+ */
 static int ceph_daemon_add_ds_entry(struct ceph_daemon *d, const char *name,
         int pc_type)
 {
-    struct data_source_s *ds;
-    struct data_set_s *dset;
-    struct data_set_s *dset_array;
-    int **pc_types_array = NULL;
-    int *pc_types;
-    int *pc_types_new;
-    int idx = 0;
-    if(strlen(name) + 1 > DATA_MAX_NAME_LEN)
-    {
-        return -ENAMETOOLONG;
-    }
-    char dset_name[DATA_MAX_NAME_LEN];
-    char ds_name[MAX_RRD_DS_NAME_LEN];
-    memset(dset_name, 0, sizeof(dset_name));
+    uint32_t type;
+    char ds_name[DATA_MAX_NAME_LEN];
     memset(ds_name, 0, sizeof(ds_name));
-    if(parse_keys(name, dset_name, ds_name))
-    {
-        return 1;
-    }
-    idx = get_matching_dset(d, dset_name);
-    if(idx == -1)
-    {/* need to add a dset **/
-        dset_array = realloc(d->dset,
-                sizeof(struct data_set_s) * (d->dset_num + 1));
-        if(!dset_array)
-        {
-            return -ENOMEM;
-        }
-        pc_types_array = realloc(d->pc_types,
-                sizeof(int *) * (d->dset_num + 1));
-        if(!pc_types_array)
-        {
-            return -ENOMEM;
-        }
-        dset = &dset_array[d->dset_num];
-        /** this step is very important, otherwise,
-         *  realloc for dset->ds will tricky because of
-         *  a random addr in dset->ds
-         */
-        memset(dset, 0, sizeof(struct data_set_s));
-        dset->ds_num = 0;
-        snprintf(dset->type, DATA_MAX_NAME_LEN, "%s", dset_name);
-        pc_types = pc_types_array[d->dset_num] = NULL;
-        d->dset = dset_array;
-    }
-    else
-    {
-        dset = &d->dset[idx];
-        pc_types = d->pc_types[idx];
-    }
-    struct data_source_s *ds_array = realloc(dset->ds,
-            sizeof(struct data_source_s) * (dset->ds_num + 1));
-    if(!ds_array)
-    {
-        return -ENOMEM;
-    }
-    pc_types_new = realloc(pc_types, sizeof(int) * (dset->ds_num + 1));
-    if(!pc_types_new)
-    {
-        return -ENOMEM;
-    }
-    dset->ds = ds_array;
 
     if(convert_special_metrics)
     {
@@ -637,35 +576,42 @@ static int ceph_daemon_add_ds_entry(struct ceph_daemon *d, const char *name,
          * other "Bytes". Instead of keeping an "average" or "rate", use the
          * "sum" in the pair and assign that to the derive value.
          */
-        if((strcmp(dset_name,"filestore") == 0) &&
-                                        strcmp(ds_name, "JournalWrBytes") == 0)
+        if((strcmp(name,"filestore.journal_wr_bytes.type") == 0))
         {
             pc_type = 10;
         }
     }
 
-    if(idx == -1)
+    d->ds_names = realloc(d->ds_names, sizeof(char *) * (d->ds_num + 1));
+    if(!d->ds_names)
     {
-        pc_types_array[d->dset_num] = pc_types_new;
-        d->pc_types = pc_types_array;
-        d->pc_types[d->dset_num][dset->ds_num] = pc_type;
-        d->dset_num++;
+        return -ENOMEM;
     }
-    else
+
+    d->ds_types = realloc(d->ds_types, sizeof(uint32_t) * (d->ds_num + 1));
+    if(!d->ds_types)
     {
-        d->pc_types[idx] = pc_types_new;
-        d->pc_types[idx][dset->ds_num] = pc_type;
+        return -ENOMEM;
     }
-    ds = &ds_array[dset->ds_num++];
-    snprintf(ds->name, MAX_RRD_DS_NAME_LEN, "%s", ds_name);
-    ds->type = (pc_type & PERFCOUNTER_DERIVE) ? DS_TYPE_DERIVE : DS_TYPE_GAUGE;
-            
-    /**
-     * Use min of 0 for DERIVE types so we don't get negative values on Ceph
-     * service restart
-     */
-    ds->min = (ds->type == DS_TYPE_DERIVE) ? 0 : NAN;
-    ds->max = NAN;
+
+    d->ds_names[d->ds_num] = malloc(sizeof(char) * DATA_MAX_NAME_LEN);
+    if(!d->ds_names[d->ds_num])
+    {
+        return -ENOMEM;
+    }
+
+    type = (pc_type & PERFCOUNTER_DERIVE) ? DSET_RATE :
+            ((pc_type & PERFCOUNTER_LATENCY) ? DSET_LATENCY : DSET_BYTES);
+    d->ds_types[d->ds_num] = type;
+
+    if(parse_keys(name, ds_name))
+    {
+        return 1;
+    }
+
+    sstrncpy(d->ds_names[d->ds_num], ds_name, DATA_MAX_NAME_LEN -1);
+    d->ds_num = (d->ds_num + 1);
+
     return 0;
 }
 
@@ -761,6 +707,7 @@ static int cc_add_daemon_config(oconfig_item_t *ci)
                 "with '/' or './' Can't parse: '%s'\n", cd.name, cd.asok_path);
         return -EINVAL;
     }
+
     array = realloc(g_daemons,
                     sizeof(struct ceph_daemon *) * (g_num_daemons + 1));
     if(array == NULL)
@@ -790,10 +737,16 @@ static int ceph_config(oconfig_item_t *ci)
         if(strcasecmp("Daemon", child->key) == 0)
         {
             ret = cc_add_daemon_config(child);
-            if(ret)
+            if(ret == ENOMEM)
             {
+                ERROR("ceph plugin: Couldn't allocate memory");
                 return ret;
             }
+            else if(ret)
+            {
+                //process other daemons and ignore this one
+                continue;
+            }
         }
         else if(strcasecmp("LongRunAvgLatency", child->key) == 0)
         {
@@ -819,6 +772,9 @@ static int ceph_config(oconfig_item_t *ci)
     return 0;
 }
 
+/**
+ * Parse JSON and get error message if present
+ */
 static int
 traverse_json(const unsigned char *json, uint32_t json_len, yajl_handle hand)
 {
@@ -841,90 +797,119 @@ traverse_json(const unsigned char *json, uint32_t json_len, yajl_handle hand)
     }
 }
 
+/**
+ * Add entry for each counter while parsing schema
+ */
 static int
 node_handler_define_schema(void *arg, const char *val, const char *key)
 {
     struct ceph_daemon *d = (struct ceph_daemon *) arg;
     int pc_type;
     pc_type = atoi(val);
-    DEBUG("\nceph_daemon_add_ds_entry(d=%s,key=%s,pc_type=%04x)",
-            d->name, key, pc_type);
     return ceph_daemon_add_ds_entry(d, key, pc_type);
 }
 
-static int add_last(const char *dset_n, const char *ds_n, double cur_sum,
+/**
+ * Latency counter does not yet have an entry in last poll data - add it.
+ */
+static int add_last(struct ceph_daemon *d, const char *ds_n, double cur_sum,
         uint64_t cur_count)
 {
-    last_poll_data[last_idx] = malloc(1 * sizeof(struct last_data));
-    if(!last_poll_data[last_idx])
+    d->last_poll_data[d->last_idx] = malloc(1 * sizeof(struct last_data));
+    if(!d->last_poll_data[d->last_idx])
     {
         return -ENOMEM;
     }
-    sstrncpy(last_poll_data[last_idx]->dset_name,dset_n,
-            sizeof(last_poll_data[last_idx]->dset_name));
-    sstrncpy(last_poll_data[last_idx]->ds_name,ds_n,
-            sizeof(last_poll_data[last_idx]->ds_name));
-    last_poll_data[last_idx]->last_sum = cur_sum;
-    last_poll_data[last_idx]->last_count = cur_count;
-    last_idx++;
+    sstrncpy(d->last_poll_data[d->last_idx]->ds_name,ds_n,
+            sizeof(d->last_poll_data[d->last_idx]->ds_name));
+    d->last_poll_data[d->last_idx]->last_sum = cur_sum;
+    d->last_poll_data[d->last_idx]->last_count = cur_count;
+    d->last_idx = (d->last_idx + 1);
     return 0;
 }
 
-static int update_last(const char *dset_n, const char *ds_n, double cur_sum,
-        uint64_t cur_count)
+/**
+ * Update latency counter or add new entry if it doesn't exist
+ */
+static int update_last(struct ceph_daemon *d, const char *ds_n, int index,
+        double cur_sum, uint64_t cur_count)
 {
-    int i;
-    for(i = 0; i < last_idx; i++)
+    if((d->last_idx > index) && (strcmp(d->last_poll_data[index]->ds_name, ds_n) == 0))
     {
-        if(strcmp(last_poll_data[i]->dset_name,dset_n) == 0 &&
-                (strcmp(last_poll_data[i]->ds_name,ds_n) == 0))
-        {
-            last_poll_data[i]->last_sum = cur_sum;
-            last_poll_data[i]->last_count = cur_count;
-            return 0;
-        }
+        d->last_poll_data[index]->last_sum = cur_sum;
+        d->last_poll_data[index]->last_count = cur_count;
+        return 0;
     }
 
-    if(!last_poll_data)
+    if(!d->last_poll_data)
     {
-        last_poll_data = malloc(1 * sizeof(struct last_data *));
-        if(!last_poll_data)
+        d->last_poll_data = malloc(1 * sizeof(struct last_data *));
+        if(!d->last_poll_data)
         {
             return -ENOMEM;
         }
     }
     else
     {
-        struct last_data **tmp_last = realloc(last_poll_data,
-                ((last_idx+1) * sizeof(struct last_data *)));
+        struct last_data **tmp_last = realloc(d->last_poll_data,
+                ((d->last_idx+1) * sizeof(struct last_data *)));
         if(!tmp_last)
         {
             return -ENOMEM;
         }
-        last_poll_data = tmp_last;
+        d->last_poll_data = tmp_last;
     }
-    return add_last(dset_n,ds_n,cur_sum,cur_count);
+    return add_last(d, ds_n, cur_sum, cur_count);
 }
 
-static double get_last_avg(const char *dset_n, const char *ds_n,
+/**
+ * If using index guess failed (shouldn't happen, but possible if counters
+ * get rearranged), resort to searching for counter name
+ */
+static int backup_search_for_last_avg(struct ceph_daemon *d, const char *ds_n)
+{
+    int i = 0;
+    for(; i < d->last_idx; i++)
+    {
+        if(strcmp(d->last_poll_data[i]->ds_name, ds_n) == 0)
+        {
+            return i;
+        }
+    }
+    return -1;
+}
+
+/**
+ * Calculate average b/t current data and last poll data
+ * if last poll data exists
+ */
+static double get_last_avg(struct ceph_daemon *d, const char *ds_n, int index,
         double cur_sum, uint64_t cur_count)
 {
-    int i;
     double result = -1.1, sum_delt = 0.0;
     uint64_t count_delt = 0;
-    for(i = 0; i < last_idx; i++)
+    int tmp_index = 0;
+    if(d->last_idx > index)
     {
-        if((strcmp(last_poll_data[i]->dset_name,dset_n) == 0) &&
-                (strcmp(last_poll_data[i]->ds_name,ds_n) == 0))
+        if(strcmp(d->last_poll_data[index]->ds_name, ds_n) == 0)
         {
-            if(cur_count < last_poll_data[i]->last_count)
-            {
-                break;
-            }
-            sum_delt = (cur_sum - last_poll_data[i]->last_sum);
-            count_delt = (cur_count - last_poll_data[i]->last_count);
+            tmp_index = index;
+        }
+        //test previous index
+        else if((index > 0) && (strcmp(d->last_poll_data[index-1]->ds_name, ds_n) == 0))
+        {
+            tmp_index = (index - 1);
+        }
+        else
+        {
+            tmp_index = backup_search_for_last_avg(d, ds_n);
+        }
+
+        if((tmp_index > -1) && (cur_count > d->last_poll_data[tmp_index]->last_count))
+        {
+            sum_delt = (cur_sum - d->last_poll_data[tmp_index]->last_sum);
+            count_delt = (cur_count - d->last_poll_data[tmp_index]->last_count);
             result = (sum_delt / count_delt);
-            break;
         }
     }
 
@@ -932,95 +917,140 @@ static double get_last_avg(const char *dset_n, const char *ds_n,
     {
         result = NAN;
     }
-    if(update_last(dset_n,ds_n,cur_sum,cur_count) == -ENOMEM)
+    if(update_last(d, ds_n, tmp_index, cur_sum, cur_count) == -ENOMEM)
     {
         return -ENOMEM;
     }
     return result;
 }
 
+/**
+ * If using index guess failed, resort to searching for counter name
+ */
+static uint32_t backup_search_for_type(struct ceph_daemon *d, char *ds_name)
+{
+    int idx = 0;
+    for(; idx < d->ds_num; idx++)
+    {
+        if(strcmp(d->ds_names[idx], ds_name) == 0)
+        {
+            return d->ds_types[idx];
+        }
+    }
+    return DSET_TYPE_UNFOUND;
+}
+
+/**
+ * Process counter data and dispatch values
+ */
 static int node_handler_fetch_data(void *arg, const char *val, const char *key)
 {
-    int dset_idx, ds_idx;
-    value_t *uv;
-    char dset_name[DATA_MAX_NAME_LEN];
-    char ds_name[MAX_RRD_DS_NAME_LEN];
+    value_t uv;
+    double tmp_d;
+    uint64_t tmp_u;
     struct values_tmp *vtmp = (struct values_tmp*) arg;
-    memset(dset_name, 0, sizeof(dset_name));
+    uint32_t type = DSET_TYPE_UNFOUND;
+    int index = vtmp->index;
+
+    char ds_name[DATA_MAX_NAME_LEN];
     memset(ds_name, 0, sizeof(ds_name));
-    if(parse_keys(key, dset_name, ds_name))
+
+    if(parse_keys(key, ds_name))
     {
-        DEBUG("enter node_handler_fetch_data");
         return 1;
     }
-    dset_idx = get_matching_dset(vtmp->d, dset_name);
-    if(dset_idx == -1)
+
+    if(index >= vtmp->d->ds_num)
     {
-        return 1;
+        //don't overflow bounds of array
+        index = (vtmp->d->ds_num - 1);
     }
-    ds_idx = get_matching_value(&vtmp->d->dset[dset_idx], ds_name,
-            vtmp->d->dset[dset_idx].ds_num);
-    if(ds_idx == -1)
+
+    /**
+     * counters should remain in same order we parsed schema... we maintain the
+     * index variable to keep track of current point in list of counters. first
+     * use index to guess point in array for retrieving type. if that doesn't
+     * work, use the old way to get the counter type
+     */
+    if(strcmp(ds_name, vtmp->d->ds_names[index]) == 0)
     {
-        DEBUG("DSet:%s, DS:%s, DSet idx:%d, DS idx:%d",
-            dset_name,ds_name,dset_idx,ds_idx);
-        return RETRY_AVGCOUNT;
+        //found match
+        type = vtmp->d->ds_types[index];
     }
-    uv = &(vtmp->vh[dset_idx].values[ds_idx]);
-
-    if(vtmp->d->pc_types[dset_idx][ds_idx] & PERFCOUNTER_LATENCY)
+    else if((index > 0) && (strcmp(ds_name, vtmp->d->ds_names[index-1]) == 0))
     {
-        if(vtmp->avgcount == -1)
-        {
-            sscanf(val, "%" PRIu64, &vtmp->avgcount);
-        }
-        else
-        {
-            double sum, result;
-            sscanf(val, "%lf", &sum);
-            DEBUG("avgcount:%ld",vtmp->avgcount);
-            DEBUG("sum:%lf",sum);
+        //try previous key
+        type = vtmp->d->ds_types[index-1];
+    }
 
-            if(vtmp->avgcount == 0)
-            {
-                vtmp->avgcount = 1;
-            }
+    if(type == DSET_TYPE_UNFOUND)
+    {
+        //couldn't find right type by guessing, check the old way
+        type = backup_search_for_type(vtmp->d, ds_name);
+    }
 
-            /** User wants latency values as long run avg */
-            if(long_run_latency_avg)
+    switch(type)
+    {
+        case DSET_LATENCY:
+            if(vtmp->avgcount_exists == -1)
             {
-                result = (sum / vtmp->avgcount);
-                DEBUG("uv->gauge = sumd / avgcounti = :%lf", result);
+                sscanf(val, "%" PRIu64, &vtmp->avgcount);
+                vtmp->avgcount_exists = 0;
+                //return after saving avgcount - don't dispatch value
+                //until latency calculation
+                return 0;
             }
             else
             {
-                result = get_last_avg(dset_name, ds_name, sum, vtmp->avgcount);
-                if(result == -ENOMEM)
+                double sum, result;
+                sscanf(val, "%lf", &sum);
+
+                if(vtmp->avgcount == 0)
                 {
-                    return -ENOMEM;
+                    vtmp->avgcount = 1;
                 }
-                DEBUG("uv->gauge = (sumd_now - sumd_last) / "
-                        "(avgcounti_now - avgcounti_last) = :%lf", result);
-            }
 
-            uv->gauge = result;
-            vtmp->avgcount = -1;
-        }
-    }
-    else if(vtmp->d->pc_types[dset_idx][ds_idx] & PERFCOUNTER_DERIVE)
-    {
-        uint64_t derive_val;
-        sscanf(val, "%" PRIu64, &derive_val);
-        uv->derive = derive_val;
-        DEBUG("uv->derive %" PRIu64 "",(uint64_t)uv->derive);
-    }
-    else
-    {
-        double other_val;
-        sscanf(val, "%lf", &other_val);
-        uv->gauge = other_val;
-        DEBUG("uv->gauge %lf",uv->gauge);
+                /** User wants latency values as long run avg */
+                if(long_run_latency_avg)
+                {
+                    result = (sum / vtmp->avgcount);
+                }
+                else
+                {
+                    result = get_last_avg(vtmp->d, ds_name, vtmp->latency_index, sum, vtmp->avgcount);
+                    if(result == -ENOMEM)
+                    {
+                        return -ENOMEM;
+                    }
+                }
+
+                uv.gauge = result;
+                vtmp->avgcount_exists = -1;
+                vtmp->latency_index = (vtmp->latency_index + 1);
+            }
+            break;
+        case DSET_BYTES:
+            sscanf(val, "%lf", &tmp_d);
+            uv.gauge = tmp_d;
+            break;
+        case DSET_RATE:
+            sscanf(val, "%" PRIu64, &tmp_u);
+            uv.derive = tmp_u;
+            break;
+        case DSET_TYPE_UNFOUND:
+        default:
+            ERROR("ceph plugin: ds %s was not properly initialized.", ds_name);
+            return -1;
     }
+
+    sstrncpy(vtmp->vlist.type, ceph_dset_types[type], sizeof(vtmp->vlist.type));
+    sstrncpy(vtmp->vlist.type_instance, ds_name, sizeof(vtmp->vlist.type_instance));
+    vtmp->vlist.values = &uv;
+    vtmp->vlist.values_len = 1;
+
+    vtmp->index = (vtmp->index + 1);
+    plugin_dispatch_values(&vtmp->vlist);
+
     return 0;
 }
 
@@ -1030,15 +1060,15 @@ static int cconn_connect(struct cconn *io)
     int flags, fd, err;
     if(io->state != CSTATE_UNCONNECTED)
     {
-        ERROR("cconn_connect: io->state != CSTATE_UNCONNECTED");
+        ERROR("ceph plugin: cconn_connect: io->state != CSTATE_UNCONNECTED");
         return -EDOM;
     }
     fd = socket(PF_UNIX, SOCK_STREAM, 0);
     if(fd < 0)
     {
         int err = -errno;
-        ERROR("cconn_connect: socket(PF_UNIX, SOCK_STREAM, 0) failed: "
-        "error %d", err);
+        ERROR("ceph plugin: cconn_connect: socket(PF_UNIX, SOCK_STREAM, 0) "
+            "failed: error %d", err);
         return err;
     }
     memset(&address, 0, sizeof(struct sockaddr_un));
@@ -1049,7 +1079,8 @@ static int cconn_connect(struct cconn *io)
         connect(fd, (struct sockaddr *) &address, sizeof(struct sockaddr_un)));
     if(err < 0)
     {
-        ERROR("cconn_connect: connect(%d) failed: error %d", fd, err);
+        ERROR("ceph plugin: cconn_connect: connect(%d) failed: error %d",
+            fd, err);
         return err;
     }
 
@@ -1057,7 +1088,8 @@ static int cconn_connect(struct cconn *io)
     if(fcntl(fd, F_SETFL, flags | O_NONBLOCK) != 0)
     {
         err = -errno;
-        ERROR("cconn_connect: fcntl(%d, O_NONBLOCK) error %d", fd, err);
+        ERROR("ceph plugin: cconn_connect: fcntl(%d, O_NONBLOCK) error %d",
+            fd, err);
         return err;
     }
     io->asok = fd;
@@ -1087,55 +1119,31 @@ static void cconn_close(struct cconn *io)
 static int
 cconn_process_data(struct cconn *io, yajl_struct *yajl, yajl_handle hand)
 {
-    int i, ret = 0;
-    struct values_tmp *vtmp = calloc(1, sizeof(struct values_tmp)
-                    + (sizeof(struct values_holder)) * io->d->dset_num);
+    int ret;
+    struct values_tmp *vtmp = calloc(1, sizeof(struct values_tmp) * 1);
     if(!vtmp)
     {
         return -ENOMEM;
     }
 
-    for(i = 0; i < io->d->dset_num; i++)
-    {
-        value_t *val = calloc(1, (sizeof(value_t) * io->d->dset[i].ds_num));
-        vtmp->vh[i].values = val;
-        vtmp->vh[i].values_len = io->d->dset[i].ds_num;
-    }
+    vtmp->vlist = (value_list_t)VALUE_LIST_INIT;
+    sstrncpy(vtmp->vlist.host, hostname_g, sizeof(vtmp->vlist.host));
+    sstrncpy(vtmp->vlist.plugin, "ceph", sizeof(vtmp->vlist.plugin));
+    sstrncpy(vtmp->vlist.plugin_instance, io->d->name, sizeof(vtmp->vlist.plugin_instance));
+
     vtmp->d = io->d;
-    vtmp->holder_num = io->d->dset_num;
-    vtmp->avgcount = -1;
+    vtmp->avgcount_exists = -1;
+    vtmp->latency_index = 0;
+    vtmp->index = 0;
     yajl->handler_arg = vtmp;
     ret = traverse_json(io->json, io->json_len, hand);
-    if(ret)
-    {
-        goto done;
-    }
-    for(i = 0; i < vtmp->holder_num; i++)
-    {
-        value_list_t vl = VALUE_LIST_INIT;
-        sstrncpy(vl.host, hostname_g, sizeof(vl.host));
-        sstrncpy(vl.plugin, "ceph", sizeof(vl.plugin));
-        strncpy(vl.plugin_instance, io->d->name, sizeof(vl.plugin_instance));
-        sstrncpy(vl.type, io->d->dset[i].type, sizeof(vl.type));
-        vl.values = vtmp->vh[i].values;
-        vl.values_len = io->d->dset[i].ds_num;
-        DEBUG("cconn_process_data(io=%s): vl.values_len=%d, json=\"%s\"",
-                io->d->name, vl.values_len, io->json);
-        ret = plugin_dispatch_values(&vl);
-        if(ret)
-        {
-            goto done;
-        }
-    }
-
-    done: for(i = 0; i < vtmp->holder_num; i++)
-    {
-        sfree(vtmp->vh[i].values);
-    }
     sfree(vtmp);
     return ret;
 }
 
+/**
+ * Initiate JSON parsing and print error if one occurs
+ */
 static int cconn_process_json(struct cconn *io)
 {
     if((io->request_type != ASOK_REQ_DATA) &&
@@ -1171,6 +1179,10 @@ static int cconn_process_json(struct cconn *io)
             result = cconn_process_data(io, &io->yajl, hand);
             break;
         case ASOK_REQ_SCHEMA:
+            //init daemon specific variables
+            io->d->ds_num = 0;
+            io->d->last_idx = 0;
+            io->d->last_poll_data = NULL;
             io->yajl.handler = node_handler_define_schema;
             io->yajl.handler_arg = io->d;
             result = traverse_json(io->json, io->json_len, hand);
@@ -1208,7 +1220,8 @@ static int cconn_validate_revents(struct cconn *io, int revents)
 {
     if(revents & POLLERR)
     {
-        ERROR("cconn_validate_revents(name=%s): got POLLERR", io->d->name);
+        ERROR("ceph plugin: cconn_validate_revents(name=%s): got POLLERR",
+            io->d->name);
         return -EIO;
     }
     switch (io->state)
@@ -1219,10 +1232,9 @@ static int cconn_validate_revents(struct cconn *io, int revents)
         case CSTATE_READ_AMT:
         case CSTATE_READ_JSON:
             return (revents & POLLIN) ? 0 : -EINVAL;
-            return (revents & POLLIN) ? 0 : -EINVAL;
         default:
-            ERROR("cconn_validate_revents(name=%s) got to illegal state on "
-                    "line %d", io->d->name, __LINE__);
+            ERROR("ceph plugin: cconn_validate_revents(name=%s) got to "
+                "illegal state on line %d", io->d->name, __LINE__);
             return -EDOM;
     }
 }
@@ -1234,8 +1246,8 @@ static int cconn_handle_event(struct cconn *io)
     switch (io->state)
     {
         case CSTATE_UNCONNECTED:
-            ERROR("cconn_handle_event(name=%s) got to illegal state on line "
-                    "%d", io->d->name, __LINE__);
+            ERROR("ceph plugin: cconn_handle_event(name=%s) got to illegal "
+                "state on line %d", io->d->name, __LINE__);
 
             return -EDOM;
         case CSTATE_WRITE_REQUEST:
@@ -1246,7 +1258,7 @@ static int cconn_handle_event(struct cconn *io)
             size_t cmd_len = strlen(cmd);
             RETRY_ON_EINTR(ret,
                   write(io->asok, ((char*)&cmd) + io->amt, cmd_len - io->amt));
-            DEBUG("cconn_handle_event(name=%s,state=%d,amt=%d,ret=%d)",
+            DEBUG("ceph plugin: cconn_handle_event(name=%s,state=%d,amt=%d,ret=%d)",
                     io->d->name, io->state, io->amt, ret);
             if(ret < 0)
             {
@@ -1273,7 +1285,7 @@ static int cconn_handle_event(struct cconn *io)
             RETRY_ON_EINTR(ret,
                     read(io->asok, ((char*)(&io->d->version)) + io->amt,
                             sizeof(io->d->version) - io->amt));
-            DEBUG("cconn_handle_event(name=%s,state=%d,ret=%d)",
+            DEBUG("ceph plugin: cconn_handle_event(name=%s,state=%d,ret=%d)",
                     io->d->name, io->state, ret);
             if(ret < 0)
             {
@@ -1285,10 +1297,11 @@ static int cconn_handle_event(struct cconn *io)
                 io->d->version = ntohl(io->d->version);
                 if(io->d->version != 1)
                 {
-                    ERROR("cconn_handle_event(name=%s) not "
-                    "expecting version %d!", io->d->name, io->d->version);
+                    ERROR("ceph plugin: cconn_handle_event(name=%s) not "
+                        "expecting version %d!", io->d->name, io->d->version);
                     return -ENOTSUP;
-                }DEBUG("cconn_handle_event(name=%s): identified as "
+                }
+                DEBUG("ceph plugin: cconn_handle_event(name=%s): identified as "
                         "version %d", io->d->name, io->d->version);
                 io->amt = 0;
                 cconn_close(io);
@@ -1301,7 +1314,7 @@ static int cconn_handle_event(struct cconn *io)
             RETRY_ON_EINTR(ret,
                     read(io->asok, ((char*)(&io->json_len)) + io->amt,
                             sizeof(io->json_len) - io->amt));
-            DEBUG("cconn_handle_event(name=%s,state=%d,ret=%d)",
+            DEBUG("ceph plugin: cconn_handle_event(name=%s,state=%d,ret=%d)",
                     io->d->name, io->state, ret);
             if(ret < 0)
             {
@@ -1316,7 +1329,7 @@ static int cconn_handle_event(struct cconn *io)
                 io->json = calloc(1, io->json_len + 1);
                 if(!io->json)
                 {
-                    ERROR("ERR CALLOCING IO->JSON");
+                    ERROR("ceph plugin: error callocing io->json");
                     return -ENOMEM;
                 }
             }
@@ -1326,7 +1339,7 @@ static int cconn_handle_event(struct cconn *io)
         {
             RETRY_ON_EINTR(ret,
                    read(io->asok, io->json + io->amt, io->json_len - io->amt));
-            DEBUG("cconn_handle_event(name=%s,state=%d,ret=%d)",
+            DEBUG("ceph plugin: cconn_handle_event(name=%s,state=%d,ret=%d)",
                     io->d->name, io->state, ret);
             if(ret < 0)
             {
@@ -1346,8 +1359,8 @@ static int cconn_handle_event(struct cconn *io)
             return 0;
         }
         default:
-            ERROR("cconn_handle_event(name=%s) got to illegal state on "
-            "line %d", io->d->name, __LINE__);
+            ERROR("ceph plugin: cconn_handle_event(name=%s) got to illegal "
+                "state on line %d", io->d->name, __LINE__);
             return -EDOM;
     }
 }
@@ -1360,7 +1373,7 @@ static int cconn_prepare(struct cconn *io, struct pollfd* fds)
         /* The request has already been serviced. */
         return 0;
     }
-    else if((io->request_type == ASOK_REQ_DATA) && (io->d->dset_num == 0))
+    else if((io->request_type == ASOK_REQ_DATA) && (io->d->ds_num == 0))
     {
         /* If there are no counters to report on, don't bother
          * connecting */
@@ -1393,8 +1406,8 @@ static int cconn_prepare(struct cconn *io, struct pollfd* fds)
             fds->events = POLLIN;
             return 1;
         default:
-            ERROR("cconn_prepare(name=%s) got to illegal state on line %d",
-                    io->d->name, __LINE__);
+            ERROR("ceph plugin: cconn_prepare(name=%s) got to illegal state "
+                "on line %d", io->d->name, __LINE__);
             return -EDOM;
     }
 }
@@ -1420,7 +1433,7 @@ static int cconn_main_loop(uint32_t request_type)
     struct timeval end_tv;
     struct cconn io_array[g_num_daemons];
 
-    DEBUG("entering cconn_main_loop(request_type = %d)", request_type);
+    DEBUG("ceph plugin: entering cconn_main_loop(request_type = %d)", request_type);
 
     /* create cconn array */
     memset(io_array, 0, sizeof(io_array));
@@ -1449,7 +1462,7 @@ static int cconn_main_loop(uint32_t request_type)
             ret = cconn_prepare(io, fds + nfds);
             if(ret < 0)
             {
-                WARNING("ERROR: cconn_prepare(name=%s,i=%d,st=%d)=%d",
+                WARNING("ceph plugin: cconn_prepare(name=%s,i=%d,st=%d)=%d",
                         io->d->name, i, io->state, ret);
                 cconn_close(io);
                 io->request_type = ASOK_REQ_NONE;
@@ -1457,8 +1470,6 @@ static int cconn_main_loop(uint32_t request_type)
             }
             else if(ret == 1)
             {
-                DEBUG("did cconn_prepare(name=%s,i=%d,st=%d)",
-                        io->d->name, i, io->state);
                 polled_io_array[nfds++] = io_array + i;
             }
         }
@@ -1466,7 +1477,6 @@ static int cconn_main_loop(uint32_t request_type)
         {
             /* finished */
             ret = 0;
-            DEBUG("cconn_main_loop: no more cconn to manage.");
             goto done;
         }
         gettimeofday(&tv, NULL);
@@ -1475,13 +1485,13 @@ static int cconn_main_loop(uint32_t request_type)
         {
             /* Timed out */
             ret = -ETIMEDOUT;
-            WARNING("ERROR: cconn_main_loop: timed out.\n");
+            WARNING("ceph plugin: cconn_main_loop: timed out.");
             goto done;
         }
         RETRY_ON_EINTR(ret, poll(fds, nfds, diff));
         if(ret < 0)
         {
-            ERROR("poll(2) error: %d", ret);
+            ERROR("ceph plugin: poll(2) error: %d", ret);
             goto done;
         }
         for(i = 0; i < nfds; ++i)
@@ -1494,7 +1504,7 @@ static int cconn_main_loop(uint32_t request_type)
             }
             else if(cconn_validate_revents(io, revents))
             {
-                WARNING("ERROR: cconn(name=%s,i=%d,st=%d): "
+                WARNING("ceph plugin: cconn(name=%s,i=%d,st=%d): "
                 "revents validation error: "
                 "revents=0x%08x", io->d->name, i, io->state, revents);
                 cconn_close(io);
@@ -1506,7 +1516,7 @@ static int cconn_main_loop(uint32_t request_type)
                 int ret = cconn_handle_event(io);
                 if(ret)
                 {
-                    WARNING("ERROR: cconn_handle_event(name=%s,"
+                    WARNING("ceph plugin: cconn_handle_event(name=%s,"
                     "i=%d,st=%d): error %d", io->d->name, i, io->state, ret);
                     cconn_close(io);
                     io->request_type = ASOK_REQ_NONE;
@@ -1521,11 +1531,11 @@ static int cconn_main_loop(uint32_t request_type)
     }
     if(some_unreachable)
     {
-        DEBUG("cconn_main_loop: some Ceph daemons were unreachable.");
+        DEBUG("ceph plugin: cconn_main_loop: some Ceph daemons were unreachable.");
     }
     else
     {
-        DEBUG("cconn_main_loop: reached all Ceph daemons :)");
+        DEBUG("ceph plugin: cconn_main_loop: reached all Ceph daemons :)");
     }
     return ret;
 }
@@ -1538,34 +1548,12 @@ static int ceph_read(void)
 /******* lifecycle *******/
 static int ceph_init(void)
 {
-    int i, ret, j;
-    DEBUG("ceph_init");
+    int ret;
     ceph_daemons_print();
 
     ret = cconn_main_loop(ASOK_REQ_VERSION);
-    if(ret)
-    {
-        return ret;
-    }
-    for(i = 0; i < g_num_daemons; ++i)
-    {
-        struct ceph_daemon *d = g_daemons[i];
-        for(j = 0; j < d->dset_num; j++)
-        {
-            ret = plugin_register_data_set(d->dset + j);
-            if(ret)
-            {
-                ERROR("plugin_register_data_set(%s) failed!", d->name);
-            }
-            else
-            {
-                DEBUG("plugin_register_data_set(%s): "
-                        "(d->dset)[%d]->ds_num=%d",
-                        d->name, j, d->dset[j].ds_num);
-            }
-        }
-    }
-    return 0;
+
+    return (ret) ? ret : 0;
 }
 
 static int ceph_shutdown(void)
@@ -1578,14 +1566,7 @@ static int ceph_shutdown(void)
     sfree(g_daemons);
     g_daemons = NULL;
     g_num_daemons = 0;
-    for(i = 0; i < last_idx; i++)
-    {
-        sfree(last_poll_data[i]);
-    }
-    sfree(last_poll_data);
-    last_poll_data = NULL;
-    last_idx = 0;
-    DEBUG("finished ceph_shutdown");
+    DEBUG("ceph plugin: finished ceph_shutdown");
     return 0;
 }