Merge remote-tracking branch 'origin/pr/598'
[collectd.git] / src / ceph.c
index e8bde9b..1b1f40b 100644 (file)
@@ -114,6 +114,14 @@ struct ceph_daemon
     uint32_t *ds_types;
     /** Track ds names to match with types */
     char **ds_names;
+
+    /**
+     * Keep track of last data for latency values so we can calculate rate
+     * since last poll.
+     */
+    struct last_data **last_poll_data;
+    /** index of last poll data */
+    int last_idx;
 };
 
 /******* JSON parsing *******/
@@ -132,14 +140,6 @@ struct yajl_struct
 };
 typedef struct yajl_struct yajl_struct;
 
-/**
- * Keep track of last data for latency values so we can calculate rate
- * since last poll.
- */
-struct last_data **last_poll_data = NULL;
-/** index of last poll data */
-int last_idx = 0;
-
 enum perfcounter_type_d
 {
     PERFCOUNTER_LATENCY = 0x4, PERFCOUNTER_DERIVE = 0x8,
@@ -200,7 +200,6 @@ struct last_data
     uint64_t last_count;
 };
 
-
 /******* network I/O *******/
 enum cstate_t
 {
@@ -256,7 +255,7 @@ static int ceph_cb_boolean(void *ctx, int bool_val)
     return CEPH_CB_CONTINUE;
 }
 
-static int 
+static int
 ceph_cb_number(void *ctx, const char *number_val, yajl_len_t number_len)
 {
     yajl_struct *yajl = (yajl_struct*)ctx;
@@ -321,7 +320,7 @@ ceph_cb_number(void *ctx, const char *number_val, yajl_len_t number_len)
     return CEPH_CB_CONTINUE;
 }
 
-static int ceph_cb_string(void *ctx, const unsigned char *string_val, 
+static int ceph_cb_string(void *ctx, const unsigned char *string_val,
         yajl_len_t string_len)
 {
     return CEPH_CB_CONTINUE;
@@ -404,7 +403,14 @@ static void ceph_daemons_print(void)
 static void ceph_daemon_free(struct ceph_daemon *d)
 {
     int i = 0;
-    for(; i < d->ds_num; i++)
+    for(; i < d->last_idx; i++)
+    {
+        sfree(d->last_poll_data[i]);
+    }
+    sfree(d->last_poll_data);
+    d->last_poll_data = NULL;
+    d->last_idx = 0;
+    for(i = 0; i < d->ds_num; i++)
     {
         sfree(d->ds_names[i]);
     }
@@ -522,7 +528,7 @@ static int parse_keys(const char *key_str, char *ds_name)
     int max_str_len = 100;
     char tmp_ds_name[max_str_len];
     memset(tmp_ds_name, 0, sizeof(tmp_ds_name));
-    if(ds_name == NULL || key_str == NULL ||  key_str[0] == '\0' || 
+    if(ds_name == NULL || key_str == NULL ||  key_str[0] == '\0' ||
                                                             ds_name[0] != '\0')
     {
         return -1;
@@ -533,6 +539,7 @@ static int parse_keys(const char *key_str, char *ds_name)
         memcpy(tmp_ds_name, key_str, max_str_len - 1);
         goto compact;
     }
+
     ds_name_len = (rptr - ptr) > max_str_len ? max_str_len : (rptr - ptr);
     if((ds_name_len == 0) || strncmp(rptr + 1, "type", 4))
     { /** copy whole key **/
@@ -543,7 +550,7 @@ static int parse_keys(const char *key_str, char *ds_name)
         memcpy(tmp_ds_name, key_str, ((rptr - key_str) > (max_str_len - 1) ?
                 (max_str_len - 1) : (rptr - key_str)));
     }
-    
+
     compact: compact_ds_name(tmp_ds_name, ds_name);
     return 0;
 }
@@ -558,7 +565,7 @@ static int ceph_daemon_add_ds_entry(struct ceph_daemon *d, const char *name,
     uint32_t type;
     char ds_name[DATA_MAX_NAME_LEN];
     memset(ds_name, 0, sizeof(ds_name));
-    
+
     if(convert_special_metrics)
     {
         /**
@@ -604,7 +611,7 @@ static int ceph_daemon_add_ds_entry(struct ceph_daemon *d, const char *name,
 
     sstrncpy(d->ds_names[d->ds_num], ds_name, DATA_MAX_NAME_LEN -1);
     d->ds_num = (d->ds_num + 1);
-    
+
     return 0;
 }
 
@@ -700,6 +707,7 @@ static int cc_add_daemon_config(oconfig_item_t *ci)
                 "with '/' or './' Can't parse: '%s'\n", cd.name, cd.asok_path);
         return -EINVAL;
     }
+
     array = realloc(g_daemons,
                     sizeof(struct ceph_daemon *) * (g_num_daemons + 1));
     if(array == NULL)
@@ -798,86 +806,118 @@ node_handler_define_schema(void *arg, const char *val, const char *key)
     struct ceph_daemon *d = (struct ceph_daemon *) arg;
     int pc_type;
     pc_type = atoi(val);
-    DEBUG("ceph plugin: ceph_daemon_add_ds_entry(d=%s,key=%s,pc_type=%04x)",
-            d->name, key, pc_type);
     return ceph_daemon_add_ds_entry(d, key, pc_type);
 }
 
 /**
  * Latency counter does not yet have an entry in last poll data - add it.
  */
-static int add_last(const char *ds_n, double cur_sum, uint64_t cur_count)
+static int add_last(struct ceph_daemon *d, const char *ds_n, double cur_sum,
+        uint64_t cur_count)
 {
-    last_poll_data[last_idx] = malloc(1 * sizeof(struct last_data));
-    if(!last_poll_data[last_idx])
+    d->last_poll_data[d->last_idx] = malloc(1 * sizeof(struct last_data));
+    if(!d->last_poll_data[d->last_idx])
     {
         return -ENOMEM;
     }
-    sstrncpy(last_poll_data[last_idx]->ds_name,ds_n,
-            sizeof(last_poll_data[last_idx]->ds_name));
-    last_poll_data[last_idx]->last_sum = cur_sum;
-    last_poll_data[last_idx]->last_count = cur_count;
-    last_idx++;
+    sstrncpy(d->last_poll_data[d->last_idx]->ds_name,ds_n,
+            sizeof(d->last_poll_data[d->last_idx]->ds_name));
+    d->last_poll_data[d->last_idx]->last_sum = cur_sum;
+    d->last_poll_data[d->last_idx]->last_count = cur_count;
+    d->last_idx = (d->last_idx + 1);
     return 0;
 }
 
 /**
  * Update latency counter or add new entry if it doesn't exist
  */
-static int update_last(const char *ds_n, int index, double cur_sum,
-        uint64_t cur_count)
+static int update_last(struct ceph_daemon *d, const char *ds_n, int index,
+        double cur_sum, uint64_t cur_count)
 {
-    if((last_idx > index) && (strcmp(last_poll_data[index]->ds_name, ds_n) == 0))
+    if((d->last_idx > index) && (strcmp(d->last_poll_data[index]->ds_name, ds_n) == 0))
     {
-        last_poll_data[index]->last_sum = cur_sum;
-        last_poll_data[index]->last_count = cur_count;
+        d->last_poll_data[index]->last_sum = cur_sum;
+        d->last_poll_data[index]->last_count = cur_count;
         return 0;
     }
 
-    if(!last_poll_data)
+    if(!d->last_poll_data)
     {
-        last_poll_data = malloc(1 * sizeof(struct last_data *));
-        if(!last_poll_data)
+        d->last_poll_data = malloc(1 * sizeof(struct last_data *));
+        if(!d->last_poll_data)
         {
             return -ENOMEM;
         }
     }
     else
     {
-        struct last_data **tmp_last = realloc(last_poll_data,
-                ((last_idx+1) * sizeof(struct last_data *)));
+        struct last_data **tmp_last = realloc(d->last_poll_data,
+                ((d->last_idx+1) * sizeof(struct last_data *)));
         if(!tmp_last)
         {
             return -ENOMEM;
         }
-        last_poll_data = tmp_last;
+        d->last_poll_data = tmp_last;
     }
-    return add_last(ds_n, cur_sum, cur_count);
+    return add_last(d, ds_n, cur_sum, cur_count);
+}
+
+/**
+ * If using index guess failed (shouldn't happen, but possible if counters
+ * get rearranged), resort to searching for counter name
+ */
+static int backup_search_for_last_avg(struct ceph_daemon *d, const char *ds_n)
+{
+    int i = 0;
+    for(; i < d->last_idx; i++)
+    {
+        if(strcmp(d->last_poll_data[i]->ds_name, ds_n) == 0)
+        {
+            return i;
+        }
+    }
+    return -1;
 }
 
 /**
  * Calculate average b/t current data and last poll data
  * if last poll data exists
  */
-static double get_last_avg(const char *ds_n, int index,
+static double get_last_avg(struct ceph_daemon *d, const char *ds_n, int index,
         double cur_sum, uint64_t cur_count)
 {
     double result = -1.1, sum_delt = 0.0;
     uint64_t count_delt = 0;
-    if((last_idx > index) &&
-            (strcmp(last_poll_data[index]->ds_name, ds_n) == 0) &&
-            (cur_count > last_poll_data[index]->last_count))
+    int tmp_index = 0;
+    if(d->last_idx > index)
     {
-        sum_delt = (cur_sum - last_poll_data[index]->last_sum);
-        count_delt = (cur_count - last_poll_data[index]->last_count);
-        result = (sum_delt / count_delt);
+        if(strcmp(d->last_poll_data[index]->ds_name, ds_n) == 0)
+        {
+            tmp_index = index;
+        }
+        //test previous index
+        else if((index > 0) && (strcmp(d->last_poll_data[index-1]->ds_name, ds_n) == 0))
+        {
+            tmp_index = (index - 1);
+        }
+        else
+        {
+            tmp_index = backup_search_for_last_avg(d, ds_n);
+        }
+
+        if((tmp_index > -1) && (cur_count > d->last_poll_data[tmp_index]->last_count))
+        {
+            sum_delt = (cur_sum - d->last_poll_data[tmp_index]->last_sum);
+            count_delt = (cur_count - d->last_poll_data[tmp_index]->last_count);
+            result = (sum_delt / count_delt);
+        }
     }
 
     if(result == -1.1)
     {
         result = NAN;
     }
-    if(update_last(ds_n, index, cur_sum, cur_count) == -ENOMEM)
+    if(update_last(d, ds_n, tmp_index, cur_sum, cur_count) == -ENOMEM)
     {
         return -ENOMEM;
     }
@@ -925,7 +965,7 @@ static int node_handler_fetch_data(void *arg, const char *val, const char *key)
         //don't overflow bounds of array
         index = (vtmp->d->ds_num - 1);
     }
-    
+
     /**
      * counters should remain in same order we parsed schema... we maintain the
      * index variable to keep track of current point in list of counters. first
@@ -964,29 +1004,24 @@ static int node_handler_fetch_data(void *arg, const char *val, const char *key)
             {
                 double sum, result;
                 sscanf(val, "%lf", &sum);
-                DEBUG("ceph plugin: avgcount:%" PRIu64,vtmp->avgcount);
-                DEBUG("ceph plugin: sum:%lf",sum);
 
                 if(vtmp->avgcount == 0)
                 {
                     vtmp->avgcount = 1;
                 }
-                
+
                 /** User wants latency values as long run avg */
                 if(long_run_latency_avg)
                 {
                     result = (sum / vtmp->avgcount);
-                    DEBUG("ceph plugin: uv->gauge = sumd / avgcounti = :%lf", result);
                 }
                 else
                 {
-                    result = get_last_avg(ds_name, vtmp->latency_index, sum, vtmp->avgcount);
+                    result = get_last_avg(vtmp->d, ds_name, vtmp->latency_index, sum, vtmp->avgcount);
                     if(result == -ENOMEM)
                     {
                         return -ENOMEM;
                     }
-                    DEBUG("ceph plugin: uv->gauge = (sumd_now - sumd_last) / "
-                            "(avgcounti_now - avgcounti_last) = :%lf", result);
                 }
 
                 uv.gauge = result;
@@ -997,12 +1032,10 @@ static int node_handler_fetch_data(void *arg, const char *val, const char *key)
         case DSET_BYTES:
             sscanf(val, "%lf", &tmp_d);
             uv.gauge = tmp_d;
-            DEBUG("ceph plugin: uv->gauge = %lf",uv.gauge);
             break;
         case DSET_RATE:
             sscanf(val, "%" PRIu64, &tmp_u);
             uv.derive = tmp_u;
-            DEBUG("ceph plugin: uv->derive = %" PRIu64 "",(uint64_t)uv.derive);
             break;
         case DSET_TYPE_UNFOUND:
         default:
@@ -1015,7 +1048,6 @@ static int node_handler_fetch_data(void *arg, const char *val, const char *key)
     vtmp->vlist.values = &uv;
     vtmp->vlist.values_len = 1;
 
-    DEBUG("ceph plugin: dispatching %s\n", ds_name);
     vtmp->index = (vtmp->index + 1);
     plugin_dispatch_values(&vtmp->vlist);
 
@@ -1147,6 +1179,10 @@ static int cconn_process_json(struct cconn *io)
             result = cconn_process_data(io, &io->yajl, hand);
             break;
         case ASOK_REQ_SCHEMA:
+            //init daemon specific variables
+            io->d->ds_num = 0;
+            io->d->last_idx = 0;
+            io->d->last_poll_data = NULL;
             io->yajl.handler = node_handler_define_schema;
             io->yajl.handler_arg = io->d;
             result = traverse_json(io->json, io->json_len, hand);
@@ -1196,7 +1232,6 @@ static int cconn_validate_revents(struct cconn *io, int revents)
         case CSTATE_READ_AMT:
         case CSTATE_READ_JSON:
             return (revents & POLLIN) ? 0 : -EINVAL;
-            return (revents & POLLIN) ? 0 : -EINVAL;
         default:
             ERROR("ceph plugin: cconn_validate_revents(name=%s) got to "
                 "illegal state on line %d", io->d->name, __LINE__);
@@ -1435,8 +1470,6 @@ static int cconn_main_loop(uint32_t request_type)
             }
             else if(ret == 1)
             {
-                DEBUG("ceph plugin: did cconn_prepare(name=%s,i=%d,st=%d)",
-                        io->d->name, i, io->state);
                 polled_io_array[nfds++] = io_array + i;
             }
         }
@@ -1444,7 +1477,6 @@ static int cconn_main_loop(uint32_t request_type)
         {
             /* finished */
             ret = 0;
-            DEBUG("ceph plugin: cconn_main_loop: no more cconn to manage.");
             goto done;
         }
         gettimeofday(&tv, NULL);
@@ -1517,7 +1549,6 @@ static int ceph_read(void)
 static int ceph_init(void)
 {
     int ret;
-    DEBUG("ceph plugin: ceph_init");
     ceph_daemons_print();
 
     ret = cconn_main_loop(ASOK_REQ_VERSION);
@@ -1535,13 +1566,6 @@ static int ceph_shutdown(void)
     sfree(g_daemons);
     g_daemons = NULL;
     g_num_daemons = 0;
-    for(i = 0; i < last_idx; i++)
-    {
-        sfree(last_poll_data[i]);
-    }
-    sfree(last_poll_data);
-    last_poll_data = NULL;
-    last_idx = 0;
     DEBUG("ceph plugin: finished ceph_shutdown");
     return 0;
 }