Changes to use base types, add comments, etc.
authordaryder <daryder@cisco.com>
Mon, 22 Dec 2014 16:26:52 +0000 (11:26 -0500)
committerdaryder <daryder@cisco.com>
Mon, 22 Dec 2014 16:26:52 +0000 (11:26 -0500)
src/ceph.c

index faebb31..e8bde9b 100644 (file)
@@ -50,8 +50,6 @@
 #include <math.h>
 #include <inttypes.h>
 
-#define MAX_RRD_DS_NAME_LEN 20
-
 #define RETRY_AVGCOUNT -1
 
 #if defined(YAJL_MAJOR) && (YAJL_MAJOR > 1)
@@ -84,6 +82,21 @@ typedef size_t yajl_len_t;
 typedef unsigned int yajl_len_t;
 #endif
 
+/** Number of types for ceph defined in types.db */
+#define CEPH_DSET_TYPES_NUM 3
+/** ceph types enum */
+enum ceph_dset_type_d
+{
+    DSET_LATENCY = 0,
+    DSET_BYTES = 1,
+    DSET_RATE = 2,
+    DSET_TYPE_UNFOUND = 1000
+};
+
+/** Valid types for ceph defined in types.db */
+const char * ceph_dset_types [CEPH_DSET_TYPES_NUM] =
+                                   {"ceph_latency", "ceph_bytes", "ceph_rate"};
+
 /******* ceph_daemon *******/
 struct ceph_daemon
 {
@@ -92,19 +105,15 @@ struct ceph_daemon
     /** daemon name **/
     char name[DATA_MAX_NAME_LEN];
 
-    int dset_num;
-
     /** Path to the socket that we use to talk to the ceph daemon */
     char asok_path[UNIX_DOMAIN_SOCK_PATH_MAX];
 
-    /** The set of  key/value pairs that this daemon reports
-     * dset.type        The daemon name
-     * dset.ds_num      Number of data sources (key/value pairs) 
-     * dset.ds      Dynamically allocated array of key/value pairs
-     */
-    /** Dynamically allocated array **/
-    struct data_set_s *dset;
-    int **pc_types;
+    /** Number of counters */
+    int ds_num;
+    /** Track ds types */
+    uint32_t *ds_types;
+    /** Track ds names to match with types */
+    char **ds_names;
 };
 
 /******* JSON parsing *******/
@@ -128,6 +137,7 @@ typedef struct yajl_struct yajl_struct;
  * since last poll.
  */
 struct last_data **last_poll_data = NULL;
+/** index of last poll data */
 int last_idx = 0;
 
 enum perfcounter_type_d
@@ -154,21 +164,29 @@ static struct ceph_daemon **g_daemons = NULL;
 /** Number of elements in g_daemons */
 static int g_num_daemons = 0;
 
-struct values_holder
-{
-    int values_len;
-    value_t *values;
-};
-
 /**
- * A set of values_t data that we build up in memory while parsing the JSON.
+ * A set of data that we build up in memory while parsing the JSON.
  */
 struct values_tmp
 {
+    /** ceph daemon we are processing data for*/
     struct ceph_daemon *d;
-    int holder_num;
-    struct values_holder vh[0];
+    /** track avgcount across counters for avgcount/sum latency pairs */
     uint64_t avgcount;
+    /** current index of counters - used to get type of counter */
+    int index;
+    /** do we already have an avgcount for latency pair */
+    int avgcount_exists;
+    /**
+     * similar to index, but current index of latency type counters -
+     * used to get last poll data of counter
+     */
+    int latency_index;
+    /**
+     * values list - maintain across counters since
+     * host/plugin/plugin instance are always the same
+     */
+    value_list_t vlist;
 };
 
 /**
@@ -177,8 +195,7 @@ struct values_tmp
  */
 struct last_data
 {
-    char dset_name[DATA_MAX_NAME_LEN];
-    char ds_name[MAX_RRD_DS_NAME_LEN];
+    char ds_name[DATA_MAX_NAME_LEN];
     double last_sum;
     uint64_t last_count;
 };
@@ -271,7 +288,7 @@ ceph_cb_number(void *ctx, const char *number_val, yajl_len_t number_len)
                         (strcmp(yajl->state[i-2].key,"filestore") == 0) &&
                         (strcmp(yajl->state[i].key,"avgcount") == 0))
                 {
-                    DEBUG("Skipping avgcount for filestore.JournalWrBytes");
+                    DEBUG("ceph plugin: Skipping avgcount for filestore.JournalWrBytes");
                     yajl->depth = (yajl->depth - 1);
                     return CEPH_CB_CONTINUE;
                 }
@@ -372,7 +389,7 @@ static yajl_callbacks callbacks = {
 
 static void ceph_daemon_print(const struct ceph_daemon *d)
 {
-    DEBUG("name=%s, asok_path=%s", d->name, d->asok_path);
+    DEBUG("ceph plugin: name=%s, asok_path=%s", d->name, d->asok_path);
 }
 
 static void ceph_daemons_print(void)
@@ -387,17 +404,19 @@ static void ceph_daemons_print(void)
 static void ceph_daemon_free(struct ceph_daemon *d)
 {
     int i = 0;
-    for(; i < d->dset_num; i++)
+    for(; i < d->ds_num; i++)
     {
-        plugin_unregister_data_set((d->dset + i)->type);
-        sfree(d->dset->ds);
-        sfree(d->pc_types[i]);
+        sfree(d->ds_names[i]);
     }
-    sfree(d->dset);
-    sfree(d->pc_types);
+    sfree(d->ds_types);
+    sfree(d->ds_names);
     sfree(d);
 }
 
+/**
+ * Compact ds name by removing special characters and trimming length to
+ * DATA_MAX_NAME_LEN if necessary
+ */
 static void compact_ds_name(char *source, char *dest)
 {
     int keys_num = 0, i;
@@ -435,12 +454,13 @@ static void compact_ds_name(char *source, char *dest)
         strncat(tmp, keys[i], key_chars_remaining);
         key_chars_remaining -= strlen(keys[i]);
     }
-    /** to coordinate limitation of length of ds name from RRD
+    tmp[DATA_MAX_NAME_LEN - 1] = '\0';
+    /** to coordinate limitation of length of type_instance
      *  we will truncate ds_name
      *  when the its length is more than
-     *  MAX_RRD_DS_NAME_LEN
+     *  DATA_MAX_NAME_LEN
      */
-    if(strlen(tmp) > MAX_RRD_DS_NAME_LEN - 1)
+    if(strlen(tmp) > DATA_MAX_NAME_LEN - 1)
     {
         append_status |= 0x4;
         /** we should reserve space for
@@ -462,7 +482,7 @@ static void compact_ds_name(char *source, char *dest)
          */
         reserved += 4;
     }
-    snprintf(dest, MAX_RRD_DS_NAME_LEN - reserved, "%s", tmp);
+    snprintf(dest, DATA_MAX_NAME_LEN - reserved, "%s", tmp);
     offset = strlen(dest);
     switch (append_status)
     {
@@ -487,147 +507,58 @@ static void compact_ds_name(char *source, char *dest)
             break;
     }
 }
-static int parse_keys(const char *key_str, char *dset_name, char *ds_name)
+
+/**
+ * Parse key to remove "type" if this is for schema and initiate compaction
+ */
+static int parse_keys(const char *key_str, char *ds_name)
 {
     char *ptr, *rptr;
-    size_t dset_name_len = 0;
     size_t ds_name_len = 0;
-    char tmp_ds_name[DATA_MAX_NAME_LEN];
+    /**
+     * allow up to 100 characters before compaction - compact_ds_name will not
+     * allow more than DATA_MAX_NAME_LEN chars
+     */
+    int max_str_len = 100;
+    char tmp_ds_name[max_str_len];
     memset(tmp_ds_name, 0, sizeof(tmp_ds_name));
-    if(dset_name == NULL || ds_name == NULL || key_str == NULL ||
-            key_str[0] == '\0' || dset_name[0] != '\0' || ds_name[0] != '\0')
+    if(ds_name == NULL || key_str == NULL ||  key_str[0] == '\0' || 
+                                                            ds_name[0] != '\0')
     {
         return -1;
     }
     if((ptr = strchr(key_str, '.')) == NULL
             || (rptr = strrchr(key_str, '.')) == NULL)
     {
-        strncpy(dset_name, key_str, DATA_MAX_NAME_LEN - 1);
-        strncpy(tmp_ds_name, key_str, DATA_MAX_NAME_LEN - 1);
+        memcpy(tmp_ds_name, key_str, max_str_len - 1);
         goto compact;
     }
-    dset_name_len =
-            (ptr - key_str) > (DATA_MAX_NAME_LEN - 1) ?
-                    (DATA_MAX_NAME_LEN - 1) : (ptr - key_str);
-    memcpy(dset_name, key_str, dset_name_len);
-    ds_name_len =
-           (rptr - ptr) > DATA_MAX_NAME_LEN ? DATA_MAX_NAME_LEN : (rptr - ptr);
-    if(ds_name_len == 0)
-    { /** only have two keys **/
-        if(!strncmp(rptr + 1, "type", 4))
-        {/** if last key is "type",ignore **/
-            strncpy(tmp_ds_name, dset_name, DATA_MAX_NAME_LEN - 1);
-        }
-        else
-        {/** if last key isn't "type", copy last key **/
-            strncpy(tmp_ds_name, rptr + 1, DATA_MAX_NAME_LEN - 1);
-        }
-    }
-    else if(!strncmp(rptr + 1, "type", 4))
-    {/** more than two keys **/
-        memcpy(tmp_ds_name, ptr + 1, ds_name_len - 1);
+    ds_name_len = (rptr - ptr) > max_str_len ? max_str_len : (rptr - ptr);
+    if((ds_name_len == 0) || strncmp(rptr + 1, "type", 4))
+    { /** copy whole key **/
+        memcpy(tmp_ds_name, key_str, max_str_len - 1);
     }
     else
-    {/** copy whole keys **/
-        strncpy(tmp_ds_name, ptr + 1, DATA_MAX_NAME_LEN - 1);
+    {/** more than two keys **/
+        memcpy(tmp_ds_name, key_str, ((rptr - key_str) > (max_str_len - 1) ?
+                (max_str_len - 1) : (rptr - key_str)));
     }
+    
     compact: compact_ds_name(tmp_ds_name, ds_name);
     return 0;
 }
 
-static int get_matching_dset(const struct ceph_daemon *d, const char *name)
-{
-    int idx;
-    for(idx = 0; idx < d->dset_num; ++idx)
-    {
-        if(strcmp(d->dset[idx].type, name) == 0)
-        {
-            return idx;
-        }
-    }
-    return -1;
-}
-
-static int get_matching_value(const struct data_set_s *dset, const char *name,
-        int num_values)
-{
-    int idx;
-    for(idx = 0; idx < num_values; ++idx)
-    {
-        if(strcmp(dset->ds[idx].name, name) == 0)
-        {
-            return idx;
-        }
-    }
-    return -1;
-}
-
+/**
+ * while parsing ceph admin socket schema, save counter name and type for later
+ * data processing
+ */
 static int ceph_daemon_add_ds_entry(struct ceph_daemon *d, const char *name,
         int pc_type)
 {
-    struct data_source_s *ds;
-    struct data_set_s *dset;
-    struct data_set_s *dset_array;
-    int **pc_types_array = NULL;
-    int *pc_types;
-    int *pc_types_new;
-    int idx = 0;
-    if(strlen(name) + 1 > DATA_MAX_NAME_LEN)
-    {
-        return -ENAMETOOLONG;
-    }
-    char dset_name[DATA_MAX_NAME_LEN];
-    char ds_name[MAX_RRD_DS_NAME_LEN];
-    memset(dset_name, 0, sizeof(dset_name));
+    uint32_t type;
+    char ds_name[DATA_MAX_NAME_LEN];
     memset(ds_name, 0, sizeof(ds_name));
-    if(parse_keys(name, dset_name, ds_name))
-    {
-        return 1;
-    }
-    idx = get_matching_dset(d, dset_name);
-    if(idx == -1)
-    {/* need to add a dset **/
-        dset_array = realloc(d->dset,
-                sizeof(struct data_set_s) * (d->dset_num + 1));
-        if(!dset_array)
-        {
-            return -ENOMEM;
-        }
-        pc_types_array = realloc(d->pc_types,
-                sizeof(int *) * (d->dset_num + 1));
-        if(!pc_types_array)
-        {
-            return -ENOMEM;
-        }
-        dset = &dset_array[d->dset_num];
-        /** this step is very important, otherwise,
-         *  realloc for dset->ds will tricky because of
-         *  a random addr in dset->ds
-         */
-        memset(dset, 0, sizeof(struct data_set_s));
-        dset->ds_num = 0;
-        snprintf(dset->type, DATA_MAX_NAME_LEN, "%s", dset_name);
-        pc_types = pc_types_array[d->dset_num] = NULL;
-        d->dset = dset_array;
-    }
-    else
-    {
-        dset = &d->dset[idx];
-        pc_types = d->pc_types[idx];
-    }
-    struct data_source_s *ds_array = realloc(dset->ds,
-            sizeof(struct data_source_s) * (dset->ds_num + 1));
-    if(!ds_array)
-    {
-        return -ENOMEM;
-    }
-    pc_types_new = realloc(pc_types, sizeof(int) * (dset->ds_num + 1));
-    if(!pc_types_new)
-    {
-        return -ENOMEM;
-    }
-    dset->ds = ds_array;
-
+    
     if(convert_special_metrics)
     {
         /**
@@ -638,35 +569,42 @@ static int ceph_daemon_add_ds_entry(struct ceph_daemon *d, const char *name,
          * other "Bytes". Instead of keeping an "average" or "rate", use the
          * "sum" in the pair and assign that to the derive value.
          */
-        if((strcmp(dset_name,"filestore") == 0) &&
-                                        strcmp(ds_name, "JournalWrBytes") == 0)
+        if((strcmp(name,"filestore.journal_wr_bytes.type") == 0))
         {
             pc_type = 10;
         }
     }
 
-    if(idx == -1)
+    d->ds_names = realloc(d->ds_names, sizeof(char *) * (d->ds_num + 1));
+    if(!d->ds_names)
     {
-        pc_types_array[d->dset_num] = pc_types_new;
-        d->pc_types = pc_types_array;
-        d->pc_types[d->dset_num][dset->ds_num] = pc_type;
-        d->dset_num++;
+        return -ENOMEM;
     }
-    else
+
+    d->ds_types = realloc(d->ds_types, sizeof(uint32_t) * (d->ds_num + 1));
+    if(!d->ds_types)
+    {
+        return -ENOMEM;
+    }
+
+    d->ds_names[d->ds_num] = malloc(sizeof(char) * DATA_MAX_NAME_LEN);
+    if(!d->ds_names[d->ds_num])
     {
-        d->pc_types[idx] = pc_types_new;
-        d->pc_types[idx][dset->ds_num] = pc_type;
+        return -ENOMEM;
     }
-    ds = &ds_array[dset->ds_num++];
-    snprintf(ds->name, MAX_RRD_DS_NAME_LEN, "%s", ds_name);
-    ds->type = (pc_type & PERFCOUNTER_DERIVE) ? DS_TYPE_DERIVE : DS_TYPE_GAUGE;
 
-    /**
-     * Use min of 0 for DERIVE types so we don't get negative values on Ceph
-     * service restart
-     */
-    ds->min = (ds->type == DS_TYPE_DERIVE) ? 0 : NAN;
-    ds->max = NAN;
+    type = (pc_type & PERFCOUNTER_DERIVE) ? DSET_RATE :
+            ((pc_type & PERFCOUNTER_LATENCY) ? DSET_LATENCY : DSET_BYTES);
+    d->ds_types[d->ds_num] = type;
+
+    if(parse_keys(name, ds_name))
+    {
+        return 1;
+    }
+
+    sstrncpy(d->ds_names[d->ds_num], ds_name, DATA_MAX_NAME_LEN -1);
+    d->ds_num = (d->ds_num + 1);
+    
     return 0;
 }
 
@@ -826,6 +764,9 @@ static int ceph_config(oconfig_item_t *ci)
     return 0;
 }
 
+/**
+ * Parse JSON and get error message if present
+ */
 static int
 traverse_json(const unsigned char *json, uint32_t json_len, yajl_handle hand)
 {
@@ -848,27 +789,30 @@ traverse_json(const unsigned char *json, uint32_t json_len, yajl_handle hand)
     }
 }
 
+/**
+ * Add entry for each counter while parsing schema
+ */
 static int
 node_handler_define_schema(void *arg, const char *val, const char *key)
 {
     struct ceph_daemon *d = (struct ceph_daemon *) arg;
     int pc_type;
     pc_type = atoi(val);
-    DEBUG("\nceph_daemon_add_ds_entry(d=%s,key=%s,pc_type=%04x)",
+    DEBUG("ceph plugin: ceph_daemon_add_ds_entry(d=%s,key=%s,pc_type=%04x)",
             d->name, key, pc_type);
     return ceph_daemon_add_ds_entry(d, key, pc_type);
 }
 
-static int add_last(const char *dset_n, const char *ds_n, double cur_sum,
-        uint64_t cur_count)
+/**
+ * Latency counter does not yet have an entry in last poll data - add it.
+ */
+static int add_last(const char *ds_n, double cur_sum, uint64_t cur_count)
 {
     last_poll_data[last_idx] = malloc(1 * sizeof(struct last_data));
     if(!last_poll_data[last_idx])
     {
         return -ENOMEM;
     }
-    sstrncpy(last_poll_data[last_idx]->dset_name,dset_n,
-            sizeof(last_poll_data[last_idx]->dset_name));
     sstrncpy(last_poll_data[last_idx]->ds_name,ds_n,
             sizeof(last_poll_data[last_idx]->ds_name));
     last_poll_data[last_idx]->last_sum = cur_sum;
@@ -877,19 +821,17 @@ static int add_last(const char *dset_n, const char *ds_n, double cur_sum,
     return 0;
 }
 
-static int update_last(const char *dset_n, const char *ds_n, double cur_sum,
+/**
+ * Update latency counter or add new entry if it doesn't exist
+ */
+static int update_last(const char *ds_n, int index, double cur_sum,
         uint64_t cur_count)
 {
-    int i;
-    for(i = 0; i < last_idx; i++)
+    if((last_idx > index) && (strcmp(last_poll_data[index]->ds_name, ds_n) == 0))
     {
-        if(strcmp(last_poll_data[i]->dset_name,dset_n) == 0 &&
-                (strcmp(last_poll_data[i]->ds_name,ds_n) == 0))
-        {
-            last_poll_data[i]->last_sum = cur_sum;
-            last_poll_data[i]->last_count = cur_count;
-            return 0;
-        }
+        last_poll_data[index]->last_sum = cur_sum;
+        last_poll_data[index]->last_count = cur_count;
+        return 0;
     }
 
     if(!last_poll_data)
@@ -910,124 +852,173 @@ static int update_last(const char *dset_n, const char *ds_n, double cur_sum,
         }
         last_poll_data = tmp_last;
     }
-    return add_last(dset_n,ds_n,cur_sum,cur_count);
+    return add_last(ds_n, cur_sum, cur_count);
 }
 
-static double get_last_avg(const char *dset_n, const char *ds_n,
+/**
+ * Calculate average b/t current data and last poll data
+ * if last poll data exists
+ */
+static double get_last_avg(const char *ds_n, int index,
         double cur_sum, uint64_t cur_count)
 {
-    int i;
     double result = -1.1, sum_delt = 0.0;
     uint64_t count_delt = 0;
-    for(i = 0; i < last_idx; i++)
+    if((last_idx > index) &&
+            (strcmp(last_poll_data[index]->ds_name, ds_n) == 0) &&
+            (cur_count > last_poll_data[index]->last_count))
     {
-        if((strcmp(last_poll_data[i]->dset_name,dset_n) == 0) &&
-                (strcmp(last_poll_data[i]->ds_name,ds_n) == 0))
-        {
-            if(cur_count < last_poll_data[i]->last_count)
-            {
-                break;
-            }
-            sum_delt = (cur_sum - last_poll_data[i]->last_sum);
-            count_delt = (cur_count - last_poll_data[i]->last_count);
-            result = (sum_delt / count_delt);
-            break;
-        }
+        sum_delt = (cur_sum - last_poll_data[index]->last_sum);
+        count_delt = (cur_count - last_poll_data[index]->last_count);
+        result = (sum_delt / count_delt);
     }
 
     if(result == -1.1)
     {
         result = NAN;
     }
-    if(update_last(dset_n,ds_n,cur_sum,cur_count) == -ENOMEM)
+    if(update_last(ds_n, index, cur_sum, cur_count) == -ENOMEM)
     {
         return -ENOMEM;
     }
     return result;
 }
 
+/**
+ * If using index guess failed, resort to searching for counter name
+ */
+static uint32_t backup_search_for_type(struct ceph_daemon *d, char *ds_name)
+{
+    int idx = 0;
+    for(; idx < d->ds_num; idx++)
+    {
+        if(strcmp(d->ds_names[idx], ds_name) == 0)
+        {
+            return d->ds_types[idx];
+        }
+    }
+    return DSET_TYPE_UNFOUND;
+}
+
+/**
+ * Process counter data and dispatch values
+ */
 static int node_handler_fetch_data(void *arg, const char *val, const char *key)
 {
-    int dset_idx, ds_idx;
-    value_t *uv;
-    char dset_name[DATA_MAX_NAME_LEN];
-    char ds_name[MAX_RRD_DS_NAME_LEN];
+    value_t uv;
+    double tmp_d;
+    uint64_t tmp_u;
     struct values_tmp *vtmp = (struct values_tmp*) arg;
-    memset(dset_name, 0, sizeof(dset_name));
+    uint32_t type = DSET_TYPE_UNFOUND;
+    int index = vtmp->index;
+
+    char ds_name[DATA_MAX_NAME_LEN];
     memset(ds_name, 0, sizeof(ds_name));
-    if(parse_keys(key, dset_name, ds_name))
+
+    if(parse_keys(key, ds_name))
     {
-        DEBUG("enter node_handler_fetch_data");
         return 1;
     }
-    dset_idx = get_matching_dset(vtmp->d, dset_name);
-    if(dset_idx == -1)
+
+    if(index >= vtmp->d->ds_num)
     {
-        return 1;
+        //don't overflow bounds of array
+        index = (vtmp->d->ds_num - 1);
     }
-    ds_idx = get_matching_value(&vtmp->d->dset[dset_idx], ds_name,
-            vtmp->d->dset[dset_idx].ds_num);
-    if(ds_idx == -1)
+    
+    /**
+     * counters should remain in same order we parsed schema... we maintain the
+     * index variable to keep track of current point in list of counters. first
+     * use index to guess point in array for retrieving type. if that doesn't
+     * work, use the old way to get the counter type
+     */
+    if(strcmp(ds_name, vtmp->d->ds_names[index]) == 0)
     {
-        DEBUG("DSet:%s, DS:%s, DSet idx:%d, DS idx:%d",
-            dset_name,ds_name,dset_idx,ds_idx);
-        return RETRY_AVGCOUNT;
+        //found match
+        type = vtmp->d->ds_types[index];
     }
-    uv = &(vtmp->vh[dset_idx].values[ds_idx]);
-
-    if(vtmp->d->pc_types[dset_idx][ds_idx] & PERFCOUNTER_LATENCY)
+    else if((index > 0) && (strcmp(ds_name, vtmp->d->ds_names[index-1]) == 0))
     {
-        if(vtmp->avgcount == -1)
-        {
-            sscanf(val, "%" PRIu64, &vtmp->avgcount);
-        }
-        else
-        {
-            double sum, result;
-            sscanf(val, "%lf", &sum);
-            DEBUG("avgcount:%" PRIu64 "",vtmp->avgcount);
-            DEBUG("sum:%lf",sum);
+        //try previous key
+        type = vtmp->d->ds_types[index-1];
+    }
 
-            if(vtmp->avgcount == 0)
-            {
-                vtmp->avgcount = 1;
-            }
+    if(type == DSET_TYPE_UNFOUND)
+    {
+        //couldn't find right type by guessing, check the old way
+        type = backup_search_for_type(vtmp->d, ds_name);
+    }
 
-            /** User wants latency values as long run avg */
-            if(long_run_latency_avg)
+    switch(type)
+    {
+        case DSET_LATENCY:
+            if(vtmp->avgcount_exists == -1)
             {
-                result = (sum / vtmp->avgcount);
-                DEBUG("uv->gauge = sumd / avgcounti = :%lf", result);
+                sscanf(val, "%" PRIu64, &vtmp->avgcount);
+                vtmp->avgcount_exists = 0;
+                //return after saving avgcount - don't dispatch value
+                //until latency calculation
+                return 0;
             }
             else
             {
-                result = get_last_avg(dset_name, ds_name, sum, vtmp->avgcount);
-                if(result == -ENOMEM)
+                double sum, result;
+                sscanf(val, "%lf", &sum);
+                DEBUG("ceph plugin: avgcount:%" PRIu64,vtmp->avgcount);
+                DEBUG("ceph plugin: sum:%lf",sum);
+
+                if(vtmp->avgcount == 0)
                 {
-                    return -ENOMEM;
+                    vtmp->avgcount = 1;
+                }
+                
+                /** User wants latency values as long run avg */
+                if(long_run_latency_avg)
+                {
+                    result = (sum / vtmp->avgcount);
+                    DEBUG("ceph plugin: uv->gauge = sumd / avgcounti = :%lf", result);
+                }
+                else
+                {
+                    result = get_last_avg(ds_name, vtmp->latency_index, sum, vtmp->avgcount);
+                    if(result == -ENOMEM)
+                    {
+                        return -ENOMEM;
+                    }
+                    DEBUG("ceph plugin: uv->gauge = (sumd_now - sumd_last) / "
+                            "(avgcounti_now - avgcounti_last) = :%lf", result);
                 }
-                DEBUG("uv->gauge = (sumd_now - sumd_last) / "
-                        "(avgcounti_now - avgcounti_last) = :%lf", result);
-            }
 
-            uv->gauge = result;
-            vtmp->avgcount = -1;
-        }
-    }
-    else if(vtmp->d->pc_types[dset_idx][ds_idx] & PERFCOUNTER_DERIVE)
-    {
-        uint64_t derive_val;
-        sscanf(val, "%" PRIu64, &derive_val);
-        uv->derive = derive_val;
-        DEBUG("uv->derive %" PRIu64 "",(uint64_t)uv->derive);
-    }
-    else
-    {
-        double other_val;
-        sscanf(val, "%lf", &other_val);
-        uv->gauge = other_val;
-        DEBUG("uv->gauge %lf",uv->gauge);
+                uv.gauge = result;
+                vtmp->avgcount_exists = -1;
+                vtmp->latency_index = (vtmp->latency_index + 1);
+            }
+            break;
+        case DSET_BYTES:
+            sscanf(val, "%lf", &tmp_d);
+            uv.gauge = tmp_d;
+            DEBUG("ceph plugin: uv->gauge = %lf",uv.gauge);
+            break;
+        case DSET_RATE:
+            sscanf(val, "%" PRIu64, &tmp_u);
+            uv.derive = tmp_u;
+            DEBUG("ceph plugin: uv->derive = %" PRIu64 "",(uint64_t)uv.derive);
+            break;
+        case DSET_TYPE_UNFOUND:
+        default:
+            ERROR("ceph plugin: ds %s was not properly initialized.", ds_name);
+            return -1;
     }
+
+    sstrncpy(vtmp->vlist.type, ceph_dset_types[type], sizeof(vtmp->vlist.type));
+    sstrncpy(vtmp->vlist.type_instance, ds_name, sizeof(vtmp->vlist.type_instance));
+    vtmp->vlist.values = &uv;
+    vtmp->vlist.values_len = 1;
+
+    DEBUG("ceph plugin: dispatching %s\n", ds_name);
+    vtmp->index = (vtmp->index + 1);
+    plugin_dispatch_values(&vtmp->vlist);
+
     return 0;
 }
 
@@ -1096,55 +1087,31 @@ static void cconn_close(struct cconn *io)
 static int
 cconn_process_data(struct cconn *io, yajl_struct *yajl, yajl_handle hand)
 {
-    int i, ret = 0;
-    struct values_tmp *vtmp = calloc(1, sizeof(struct values_tmp)
-                    + (sizeof(struct values_holder)) * io->d->dset_num);
+    int ret;
+    struct values_tmp *vtmp = calloc(1, sizeof(struct values_tmp) * 1);
     if(!vtmp)
     {
         return -ENOMEM;
     }
 
-    for(i = 0; i < io->d->dset_num; i++)
-    {
-        value_t *val = calloc(1, (sizeof(value_t) * io->d->dset[i].ds_num));
-        vtmp->vh[i].values = val;
-        vtmp->vh[i].values_len = io->d->dset[i].ds_num;
-    }
+    vtmp->vlist = (value_list_t)VALUE_LIST_INIT;
+    sstrncpy(vtmp->vlist.host, hostname_g, sizeof(vtmp->vlist.host));
+    sstrncpy(vtmp->vlist.plugin, "ceph", sizeof(vtmp->vlist.plugin));
+    sstrncpy(vtmp->vlist.plugin_instance, io->d->name, sizeof(vtmp->vlist.plugin_instance));
+
     vtmp->d = io->d;
-    vtmp->holder_num = io->d->dset_num;
-    vtmp->avgcount = -1;
+    vtmp->avgcount_exists = -1;
+    vtmp->latency_index = 0;
+    vtmp->index = 0;
     yajl->handler_arg = vtmp;
     ret = traverse_json(io->json, io->json_len, hand);
-    if(ret)
-    {
-        goto done;
-    }
-    for(i = 0; i < vtmp->holder_num; i++)
-    {
-        value_list_t vl = VALUE_LIST_INIT;
-        sstrncpy(vl.host, hostname_g, sizeof(vl.host));
-        sstrncpy(vl.plugin, "ceph", sizeof(vl.plugin));
-        strncpy(vl.plugin_instance, io->d->name, sizeof(vl.plugin_instance));
-        sstrncpy(vl.type, io->d->dset[i].type, sizeof(vl.type));
-        vl.values = vtmp->vh[i].values;
-        vl.values_len = io->d->dset[i].ds_num;
-        DEBUG("cconn_process_data(io=%s): vl.values_len=%d, json=\"%s\"",
-                io->d->name, vl.values_len, io->json);
-        ret = plugin_dispatch_values(&vl);
-        if(ret)
-        {
-            goto done;
-        }
-    }
-
-    done: for(i = 0; i < vtmp->holder_num; i++)
-    {
-        sfree(vtmp->vh[i].values);
-    }
     sfree(vtmp);
     return ret;
 }
 
+/**
+ * Initiate JSON parsing and print error if one occurs
+ */
 static int cconn_process_json(struct cconn *io)
 {
     if((io->request_type != ASOK_REQ_DATA) &&
@@ -1256,7 +1223,7 @@ static int cconn_handle_event(struct cconn *io)
             size_t cmd_len = strlen(cmd);
             RETRY_ON_EINTR(ret,
                   write(io->asok, ((char*)&cmd) + io->amt, cmd_len - io->amt));
-            DEBUG("cconn_handle_event(name=%s,state=%d,amt=%d,ret=%d)",
+            DEBUG("ceph plugin: cconn_handle_event(name=%s,state=%d,amt=%d,ret=%d)",
                     io->d->name, io->state, io->amt, ret);
             if(ret < 0)
             {
@@ -1283,7 +1250,7 @@ static int cconn_handle_event(struct cconn *io)
             RETRY_ON_EINTR(ret,
                     read(io->asok, ((char*)(&io->d->version)) + io->amt,
                             sizeof(io->d->version) - io->amt));
-            DEBUG("cconn_handle_event(name=%s,state=%d,ret=%d)",
+            DEBUG("ceph plugin: cconn_handle_event(name=%s,state=%d,ret=%d)",
                     io->d->name, io->state, ret);
             if(ret < 0)
             {
@@ -1299,7 +1266,7 @@ static int cconn_handle_event(struct cconn *io)
                         "expecting version %d!", io->d->name, io->d->version);
                     return -ENOTSUP;
                 }
-                DEBUG("cconn_handle_event(name=%s): identified as "
+                DEBUG("ceph plugin: cconn_handle_event(name=%s): identified as "
                         "version %d", io->d->name, io->d->version);
                 io->amt = 0;
                 cconn_close(io);
@@ -1312,7 +1279,7 @@ static int cconn_handle_event(struct cconn *io)
             RETRY_ON_EINTR(ret,
                     read(io->asok, ((char*)(&io->json_len)) + io->amt,
                             sizeof(io->json_len) - io->amt));
-            DEBUG("cconn_handle_event(name=%s,state=%d,ret=%d)",
+            DEBUG("ceph plugin: cconn_handle_event(name=%s,state=%d,ret=%d)",
                     io->d->name, io->state, ret);
             if(ret < 0)
             {
@@ -1337,7 +1304,7 @@ static int cconn_handle_event(struct cconn *io)
         {
             RETRY_ON_EINTR(ret,
                    read(io->asok, io->json + io->amt, io->json_len - io->amt));
-            DEBUG("cconn_handle_event(name=%s,state=%d,ret=%d)",
+            DEBUG("ceph plugin: cconn_handle_event(name=%s,state=%d,ret=%d)",
                     io->d->name, io->state, ret);
             if(ret < 0)
             {
@@ -1371,7 +1338,7 @@ static int cconn_prepare(struct cconn *io, struct pollfd* fds)
         /* The request has already been serviced. */
         return 0;
     }
-    else if((io->request_type == ASOK_REQ_DATA) && (io->d->dset_num == 0))
+    else if((io->request_type == ASOK_REQ_DATA) && (io->d->ds_num == 0))
     {
         /* If there are no counters to report on, don't bother
          * connecting */
@@ -1431,7 +1398,7 @@ static int cconn_main_loop(uint32_t request_type)
     struct timeval end_tv;
     struct cconn io_array[g_num_daemons];
 
-    DEBUG("entering cconn_main_loop(request_type = %d)", request_type);
+    DEBUG("ceph plugin: entering cconn_main_loop(request_type = %d)", request_type);
 
     /* create cconn array */
     memset(io_array, 0, sizeof(io_array));
@@ -1468,7 +1435,7 @@ static int cconn_main_loop(uint32_t request_type)
             }
             else if(ret == 1)
             {
-                DEBUG("did cconn_prepare(name=%s,i=%d,st=%d)",
+                DEBUG("ceph plugin: did cconn_prepare(name=%s,i=%d,st=%d)",
                         io->d->name, i, io->state);
                 polled_io_array[nfds++] = io_array + i;
             }
@@ -1477,7 +1444,7 @@ static int cconn_main_loop(uint32_t request_type)
         {
             /* finished */
             ret = 0;
-            DEBUG("cconn_main_loop: no more cconn to manage.");
+            DEBUG("ceph plugin: cconn_main_loop: no more cconn to manage.");
             goto done;
         }
         gettimeofday(&tv, NULL);
@@ -1532,11 +1499,11 @@ static int cconn_main_loop(uint32_t request_type)
     }
     if(some_unreachable)
     {
-        DEBUG("cconn_main_loop: some Ceph daemons were unreachable.");
+        DEBUG("ceph plugin: cconn_main_loop: some Ceph daemons were unreachable.");
     }
     else
     {
-        DEBUG("cconn_main_loop: reached all Ceph daemons :)");
+        DEBUG("ceph plugin: cconn_main_loop: reached all Ceph daemons :)");
     }
     return ret;
 }
@@ -1549,35 +1516,13 @@ static int ceph_read(void)
 /******* lifecycle *******/
 static int ceph_init(void)
 {
-    int i, ret, j;
-    DEBUG("ceph_init");
+    int ret;
+    DEBUG("ceph plugin: ceph_init");
     ceph_daemons_print();
 
     ret = cconn_main_loop(ASOK_REQ_VERSION);
-    if(ret)
-    {
-        return ret;
-    }
-    for(i = 0; i < g_num_daemons; ++i)
-    {
-        struct ceph_daemon *d = g_daemons[i];
-        for(j = 0; j < d->dset_num; j++)
-        {
-            ret = plugin_register_data_set(d->dset + j);
-            if(ret)
-            {
-                ERROR("ceph plugin: plugin_register_data_set(%s) failed!",
-                    d->name);
-            }
-            else
-            {
-                DEBUG("plugin_register_data_set(%s): "
-                        "(d->dset)[%d]->ds_num=%d",
-                        d->name, j, d->dset[j].ds_num);
-            }
-        }
-    }
-    return 0;
+
+    return (ret) ? ret : 0;
 }
 
 static int ceph_shutdown(void)
@@ -1597,7 +1542,7 @@ static int ceph_shutdown(void)
     sfree(last_poll_data);
     last_poll_data = NULL;
     last_idx = 0;
-    DEBUG("finished ceph_shutdown");
+    DEBUG("ceph plugin: finished ceph_shutdown");
     return 0;
 }