ceph plugin: Rewrite handling of JSON state.
authorFlorian Forster <octo@collectd.org>
Thu, 3 Dec 2015 12:05:03 +0000 (13:05 +0100)
committerFlorian Forster <octo@collectd.org>
Thu, 3 Dec 2015 12:05:03 +0000 (13:05 +0100)
The previous code didn't handle non-numeric map values correctly,
leaking state and resulting in ridiculously long key strings. This
rewrite fixes this and adds a unit test to ensure that this is actually
working as intended.

Fixes: #1350

src/ceph.c
src/ceph_test.c

index 5b83adc..251def9 100644 (file)
@@ -135,11 +135,10 @@ struct yajl_struct
 {
     node_handler_t handler;
     void * handler_arg;
-    struct {
-      char key[DATA_MAX_NAME_LEN];
-      int key_len;
-    } state[YAJL_MAX_DEPTH];
-    int depth;
+
+    char *key;
+    char *stack[YAJL_MAX_DEPTH];
+    size_t depth;
 };
 typedef struct yajl_struct yajl_struct;
 
@@ -267,69 +266,72 @@ static int ceph_cb_boolean(void *ctx, int bool_val)
 static int
 ceph_cb_number(void *ctx, const char *number_val, yajl_len_t number_len)
 {
-    yajl_struct *yajl = (yajl_struct*)ctx;
+    yajl_struct *state = (yajl_struct*) ctx;
     char buffer[number_len+1];
-    int i, status;
     char key[2 * DATA_MAX_NAME_LEN];
     _Bool latency_type = 0;
+    size_t i;
+    int status;
 
     memcpy(buffer, number_val, number_len);
     buffer[sizeof(buffer) - 1] = 0;
 
-    sstrncpy (key, yajl->state[0].key, sizeof (key));
-    for (i = 1; i < yajl->depth; i++)
+    memset (key, 0, sizeof (key));
+    for (i = 0; i < state->depth; i++)
     {
-        /* Special case for latency metrics. */
-        if ((i == yajl->depth-1)
-                && ((strcmp(yajl->state[i].key,"avgcount") == 0)
-                    || (strcmp(yajl->state[i].key,"sum") == 0)))
-        {
-            /* Super-special case for filestore:JournalWrBytes. For some
-             * reason, Ceph schema encodes this as a count/sum pair while all
-             * other "Bytes" data (excluding used/capacity bytes for OSD space)
-             * uses a single "Derive" type. To spare further confusion, keep
-             * this KPI as the same type of other "Bytes". Instead of keeping
-             * an "average" or "rate", use the "sum" in the pair and assign
-             * that to the derive value. */
-            if (convert_special_metrics && (i >= 2)
-                    && (strcmp("filestore", yajl->state[i-2].key) == 0)
-                    && (strcmp("journal_wr_bytes", yajl->state[i-1].key) == 0)
-                    && (strcmp("avgcount", yajl->state[i].key) == 0))
-            {
-                DEBUG("ceph plugin: Skipping avgcount for filestore.JournalWrBytes");
-                yajl->depth -= 1;
-                return CEPH_CB_CONTINUE;
-            }
+        if (state->stack[i] == NULL)
+            continue;
 
-            /* Don't add "avgcount" or "sum" to the key just yet. If the
-             * handler function signals RETRY_AVGCOUNT, we'll add it and try
-             * again. */
-            latency_type = 1;
-            break;
-        }
+        if (strlen (key) != 0)
+            BUFFER_ADD (key, ".");
+        BUFFER_ADD (key, state->stack[i]);
+    }
+
+    /* Special case for latency metrics. */
+    if ((strcmp ("avgcount", state->key) == 0)
+        || (strcmp ("sum", state->key) == 0))
+    {
+        latency_type = 1;
 
+        /* Super-special case for filestore.journal_wr_bytes.avgcount: For
+         * some reason, Ceph schema encodes this as a count/sum pair while all
+         * other "Bytes" data (excluding used/capacity bytes for OSD space) uses
+         * a single "Derive" type. To spare further confusion, keep this KPI as
+         * the same type of other "Bytes". Instead of keeping an "average" or
+         * "rate", use the "sum" in the pair and assign that to the derive
+         * value. */
+        if (convert_special_metrics && (state->depth >= 2)
+            && (strcmp("filestore", state->stack[state->depth - 2]) == 0)
+            && (strcmp("journal_wr_bytes", state->stack[state->depth - 1]) == 0)
+            && (strcmp("avgcount", state->key) == 0))
+        {
+            DEBUG("ceph plugin: Skipping avgcount for filestore.JournalWrBytes");
+            return CEPH_CB_CONTINUE;
+        }
+    }
+    else /* not a latency type */
+    {
         BUFFER_ADD (key, ".");
-        BUFFER_ADD (key, yajl->state[i].key);
+        BUFFER_ADD (key, state->key);
     }
 
-    status = yajl->handler(yajl->handler_arg, buffer, key);
+    status = state->handler(state->handler_arg, buffer, key);
     if((status == RETRY_AVGCOUNT) && latency_type)
     {
         /* Add previously skipped part of the key, either "avgcount" or "sum",
          * and try again. */
         BUFFER_ADD (key, ".");
-        BUFFER_ADD (key, yajl->state[yajl->depth-1].key);
+        BUFFER_ADD (key, state->key);
 
-        status = yajl->handler(yajl->handler_arg, buffer, key);
+        status = state->handler(state->handler_arg, buffer, key);
     }
 
-    if(status == -ENOMEM)
+    if (status != 0)
     {
-        ERROR("ceph plugin: memory allocation failed");
+        ERROR("ceph plugin: JSON handler failed with status %d.", status);
         return CEPH_CB_ABORT;
     }
 
-    yajl->depth -= 1;
     return CEPH_CB_CONTINUE;
 }
 
@@ -341,37 +343,52 @@ static int ceph_cb_string(void *ctx, const unsigned char *string_val,
 
 static int ceph_cb_start_map(void *ctx)
 {
+    yajl_struct *state = (yajl_struct*) ctx;
+
+    /* Push key to the stack */
+    if (state->depth == YAJL_MAX_DEPTH)
+        return CEPH_CB_ABORT;
+
+    state->stack[state->depth] = state->key;
+    state->depth++;
+    state->key = NULL;
+
     return CEPH_CB_CONTINUE;
 }
 
-static int
-ceph_cb_map_key(void *ctx, const unsigned char *key, yajl_len_t string_len)
+static int ceph_cb_end_map(void *ctx)
 {
-    yajl_struct *yajl = (yajl_struct*)ctx;
+    yajl_struct *state = (yajl_struct*) ctx;
 
-    if((yajl->depth+1)  >= YAJL_MAX_DEPTH)
-    {
-        ERROR("ceph plugin: depth exceeds max, aborting.");
+    /* Pop key from the stack */
+    if (state->depth == 0)
         return CEPH_CB_ABORT;
-    }
-
-    char buffer[string_len+1];
-
-    memcpy(buffer, key, string_len);
-    buffer[sizeof(buffer) - 1] = 0;
 
-    snprintf(yajl->state[yajl->depth].key, sizeof(buffer), "%s", buffer);
-    yajl->state[yajl->depth].key_len = sizeof(buffer);
-    yajl->depth = (yajl->depth + 1);
+    sfree (state->key);
+    state->depth--;
+    state->key = state->stack[state->depth];
+    state->stack[state->depth] = NULL;
 
     return CEPH_CB_CONTINUE;
 }
 
-static int ceph_cb_end_map(void *ctx)
+static int
+ceph_cb_map_key(void *ctx, const unsigned char *key, yajl_len_t string_len)
 {
-    yajl_struct *yajl = (yajl_struct*)ctx;
+    yajl_struct *state = (yajl_struct*) ctx;
+    size_t sz = ((size_t) string_len) + 1;
+
+    sfree (state->key);
+    state->key = malloc (sz);
+    if (state->key == NULL)
+    {
+        ERROR ("ceph plugin: malloc failed.");
+        return CEPH_CB_ABORT;
+    }
+
+    memmove (state->key, key, sz - 1);
+    state->key[sz - 1] = 0;
 
-    yajl->depth = (yajl->depth - 1);
     return CEPH_CB_CONTINUE;
 }
 
index ffe2c7f..ae67125 100644 (file)
 #include "ceph.c" /* sic */
 #include "testing.h"
 
+struct case_s
+{
+  char *key;
+  char *value;
+};
+typedef struct case_s case_t;
+
+struct test_s
+{
+  case_t *cases;
+  size_t  cases_num;
+};
+typedef struct test_s test_t;
+
+static int test_handler(void *user, char const *val, char const *key)
+{
+  test_t *t = user;
+  size_t i;
+
+  char status[1024];
+  _Bool ok;
+
+  /* special case for latency metrics. */
+  if (strcmp ("filestore.example_latency", key) == 0)
+    return RETRY_AVGCOUNT;
+
+  snprintf (status, sizeof (status), "unexpected call: test_handler(\"%s\") = \"%s\"", key, val);
+  ok = 0;
+
+  for (i = 0; i < t->cases_num; i++)
+  {
+    if (strcmp (key, t->cases[i].key) != 0)
+      continue;
+
+    if (strcmp (val, t->cases[i].value) != 0)
+    {
+      snprintf (status, sizeof (status), "test_handler(\"%s\") = \"%s\", want \"%s\"", key, val, t->cases[i].value);
+      ok = 0;
+      break;
+    }
+
+    snprintf (status, sizeof (status), "test_handler(\"%s\") = \"%s\"", key, val);
+    ok = 1;
+    break;
+  }
+
+  OK1(ok, status);
+  return ok ? 0 : -1;
+}
+
+DEF_TEST(traverse_json)
+{
+  char const *json = "{\n"
+      "    \"WBThrottle\": {\n"
+      "        \"bytes_dirtied\": {\n"
+      "            \"type\": 2,\n"
+      "            \"description\": \"Dirty data\",\n"
+      "            \"nick\": \"\"\n"
+      "        },\n"
+      "        \"bytes_wb\": {\n"
+      "            \"type\": 2,\n"
+      "            \"description\": \"Written data\",\n"
+      "            \"nick\": \"\"\n"
+      "        },\n"
+      "        \"ios_dirtied\": {\n"
+      "            \"type\": 2,\n"
+      "            \"description\": \"Dirty operations\",\n"
+      "            \"nick\": \"\"\n"
+      "        },\n"
+      "        \"ios_wb\": {\n"
+      "            \"type\": 2,\n"
+      "            \"description\": \"Written operations\",\n"
+      "            \"nick\": \"\"\n"
+      "        },\n"
+      "        \"inodes_dirtied\": {\n"
+      "            \"type\": 2,\n"
+      "            \"description\": \"Entries waiting for write\",\n"
+      "            \"nick\": \"\"\n"
+      "        },\n"
+      "        \"inodes_wb\": {\n"
+      "            \"type\": 10,\n"
+      "            \"description\": \"Written entries\",\n"
+      "            \"nick\": \"\"\n"
+      "        }\n"
+      "    },\n"
+      "    \"filestore\": {\n"
+      "        \"journal_wr_bytes\": {\n"
+      "            \"avgcount\": 23,\n"
+      "            \"sum\": 3117\n"
+      "        },\n"
+      "        \"example_latency\": {\n"
+      "            \"avgcount\": 42,\n"
+      "            \"sum\": 4711\n"
+      "        }\n"
+      "    }\n"
+      "}\n";
+  case_t cases[] = {
+    {"WBThrottle.bytes_dirtied.type", "2"},
+    {"WBThrottle.bytes_wb.type", "2"},
+    {"WBThrottle.ios_dirtied.type", "2"},
+    {"WBThrottle.ios_wb.type", "2"},
+    {"WBThrottle.inodes_dirtied.type", "2"},
+    {"WBThrottle.inodes_wb.type", "10"},
+    {"filestore.journal_wr_bytes", "3117"},
+    {"filestore.example_latency.avgcount", "42"},
+    {"filestore.example_latency.sum", "4711"},
+  };
+  test_t t = {cases, STATIC_ARRAY_SIZE (cases)};
+
+  yajl_struct ctx = {test_handler, &t};
+
+  yajl_handle hndl;
+#if HAVE_YAJL_V2
+  hndl = yajl_alloc (&callbacks, NULL, &ctx);
+  CHECK_ZERO (traverse_json ((unsigned char *) json, (uint32_t) strlen (json), hndl));
+  CHECK_ZERO (yajl_complete_parse (hndl));
+#else
+  hndl = yajl_alloc (&callbacks, NULL, NULL, &ctx);
+  CHECK_ZERO (traverse_json ((unsigned char *) json, (uint32_t) strlen (json), hndl));
+  CHECK_ZERO (yajl_parse_complete (hndl));
+#endif
+
+  return 0;
+}
+
 DEF_TEST(parse_keys)
 {
   struct {
@@ -37,6 +162,7 @@ DEF_TEST(parse_keys)
     {"aa.bb.cc.dd.ee.ff", "Aa.bb.cc.dd.ee.ff"},
     {"aa.bb.cc.dd.ee.ff.type", "Aa.bb.cc.dd.ee.ff"},
     {"aa.type", "Aa.type"},
+    {"WBThrottle.bytes_dirtied.type", "WBThrottle.bytesDirtied"},
   };
   size_t i;
 
@@ -56,6 +182,7 @@ DEF_TEST(parse_keys)
 
 int main (void)
 {
+  RUN_TEST(traverse_json);
   RUN_TEST(parse_keys);
 
   END_TEST;