/**
* collectd - src/ceph.c
* Copyright (C) 2011 New Dream Network
+ * Copyright (C) 2015 Florian octo Forster
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by the
* 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*
* Authors:
- * Colin McCabe <cmccabe@alumni.cmu.edu>
- * Dennis Zou <yunzou@cisco.com>
- * Dan Ryder <daryder@cisco.com>
+ * Colin McCabe <cmccabe at alumni.cmu.edu>
+ * Dennis Zou <yunzou at cisco.com>
+ * Dan Ryder <daryder at cisco.com>
+ * Florian octo Forster <octo at collectd.org>
**/
+#define _DEFAULT_SOURCE
#define _BSD_SOURCE
#include "collectd.h"
uint32_t *ds_types;
/** Track ds names to match with types */
char **ds_names;
+
+ /**
+ * Keep track of last data for latency values so we can calculate rate
+ * since last poll.
+ */
+ struct last_data **last_poll_data;
+ /** index of last poll data */
+ int last_idx;
};
/******* JSON parsing *******/
{
node_handler_t handler;
void * handler_arg;
- struct {
- char key[DATA_MAX_NAME_LEN];
- int key_len;
- } state[YAJL_MAX_DEPTH];
- int depth;
+
+ char *key;
+ char *stack[YAJL_MAX_DEPTH];
+ size_t depth;
};
typedef struct yajl_struct yajl_struct;
-/**
- * Keep track of last data for latency values so we can calculate rate
- * since last poll.
- */
-struct last_data **last_poll_data = NULL;
-/** index of last poll data */
-int last_idx = 0;
-
enum perfcounter_type_d
{
PERFCOUNTER_LATENCY = 0x4, PERFCOUNTER_DERIVE = 0x8,
uint64_t last_count;
};
-
/******* network I/O *******/
enum cstate_t
{
return CEPH_CB_CONTINUE;
}
-static int
+#define BUFFER_ADD(dest, src) do { \
+ size_t dest_size = sizeof (dest); \
+ strncat ((dest), (src), dest_size - strlen (dest)); \
+ (dest)[dest_size - 1] = 0; \
+} while (0)
+
+static int
ceph_cb_number(void *ctx, const char *number_val, yajl_len_t number_len)
{
- yajl_struct *yajl = (yajl_struct*)ctx;
+ yajl_struct *state = (yajl_struct*) ctx;
char buffer[number_len+1];
- int i, latency_type = 0, result;
- char key[128];
+ char key[2 * DATA_MAX_NAME_LEN];
+ _Bool latency_type = 0;
+ size_t i;
+ int status;
memcpy(buffer, number_val, number_len);
buffer[sizeof(buffer) - 1] = 0;
- ssnprintf(key, yajl->state[0].key_len, "%s", yajl->state[0].key);
- for(i = 1; i < yajl->depth; i++)
+ memset (key, 0, sizeof (key));
+ for (i = 0; i < state->depth; i++)
+ {
+ if (state->stack[i] == NULL)
+ continue;
+
+ if (strlen (key) != 0)
+ BUFFER_ADD (key, ".");
+ BUFFER_ADD (key, state->stack[i]);
+ }
+
+ /* Special case for latency metrics. */
+ if ((strcmp ("avgcount", state->key) == 0)
+ || (strcmp ("sum", state->key) == 0))
{
- if((i == yajl->depth-1) && ((strcmp(yajl->state[i].key,"avgcount") == 0)
- || (strcmp(yajl->state[i].key,"sum") == 0)))
+ latency_type = 1;
+
+ /* Super-special case for filestore.journal_wr_bytes.avgcount: For
+ * some reason, Ceph schema encodes this as a count/sum pair while all
+ * other "Bytes" data (excluding used/capacity bytes for OSD space) uses
+ * a single "Derive" type. To spare further confusion, keep this KPI as
+ * the same type of other "Bytes". Instead of keeping an "average" or
+ * "rate", use the "sum" in the pair and assign that to the derive
+ * value. */
+ if (convert_special_metrics && (state->depth >= 2)
+ && (strcmp("filestore", state->stack[state->depth - 2]) == 0)
+ && (strcmp("journal_wr_bytes", state->stack[state->depth - 1]) == 0)
+ && (strcmp("avgcount", state->key) == 0))
{
- if(convert_special_metrics)
- {
- /**
- * Special case for filestore:JournalWrBytes. For some reason,
- * Ceph schema encodes this as a count/sum pair while all
- * other "Bytes" data (excluding used/capacity bytes for OSD
- * space) uses a single "Derive" type. To spare further
- * confusion, keep this KPI as the same type of other "Bytes".
- * Instead of keeping an "average" or "rate", use the "sum" in
- * the pair and assign that to the derive value.
- */
- if((strcmp(yajl->state[i-1].key, "journal_wr_bytes") == 0) &&
- (strcmp(yajl->state[i-2].key,"filestore") == 0) &&
- (strcmp(yajl->state[i].key,"avgcount") == 0))
- {
- DEBUG("ceph plugin: Skipping avgcount for filestore.JournalWrBytes");
- yajl->depth = (yajl->depth - 1);
- return CEPH_CB_CONTINUE;
- }
- }
- //probably a avgcount/sum pair. if not - we'll try full key later
- latency_type = 1;
- break;
+ DEBUG("ceph plugin: Skipping avgcount for filestore.JournalWrBytes");
+ return CEPH_CB_CONTINUE;
}
- strncat(key, ".", 1);
- strncat(key, yajl->state[i].key, yajl->state[i].key_len+1);
+ }
+ else /* not a latency type */
+ {
+ BUFFER_ADD (key, ".");
+ BUFFER_ADD (key, state->key);
}
- result = yajl->handler(yajl->handler_arg, buffer, key);
-
- if((result == RETRY_AVGCOUNT) && latency_type)
+ status = state->handler(state->handler_arg, buffer, key);
+ if((status == RETRY_AVGCOUNT) && latency_type)
{
- strncat(key, ".", 1);
- strncat(key, yajl->state[yajl->depth-1].key,
- yajl->state[yajl->depth-1].key_len+1);
- result = yajl->handler(yajl->handler_arg, buffer, key);
+ /* Add previously skipped part of the key, either "avgcount" or "sum",
+ * and try again. */
+ BUFFER_ADD (key, ".");
+ BUFFER_ADD (key, state->key);
+
+ status = state->handler(state->handler_arg, buffer, key);
}
- if(result == -ENOMEM)
+ if (status != 0)
{
- ERROR("ceph plugin: memory allocation failed");
+ ERROR("ceph plugin: JSON handler failed with status %d.", status);
return CEPH_CB_ABORT;
}
- yajl->depth = (yajl->depth - 1);
return CEPH_CB_CONTINUE;
}
-static int ceph_cb_string(void *ctx, const unsigned char *string_val,
+static int ceph_cb_string(void *ctx, const unsigned char *string_val,
yajl_len_t string_len)
{
return CEPH_CB_CONTINUE;
static int ceph_cb_start_map(void *ctx)
{
+ yajl_struct *state = (yajl_struct*) ctx;
+
+ /* Push key to the stack */
+ if (state->depth == YAJL_MAX_DEPTH)
+ return CEPH_CB_ABORT;
+
+ state->stack[state->depth] = state->key;
+ state->depth++;
+ state->key = NULL;
+
return CEPH_CB_CONTINUE;
}
-static int
-ceph_cb_map_key(void *ctx, const unsigned char *key, yajl_len_t string_len)
+static int ceph_cb_end_map(void *ctx)
{
- yajl_struct *yajl = (yajl_struct*)ctx;
+ yajl_struct *state = (yajl_struct*) ctx;
- if((yajl->depth+1) >= YAJL_MAX_DEPTH)
- {
- ERROR("ceph plugin: depth exceeds max, aborting.");
+ /* Pop key from the stack */
+ if (state->depth == 0)
return CEPH_CB_ABORT;
- }
-
- char buffer[string_len+1];
-
- memcpy(buffer, key, string_len);
- buffer[sizeof(buffer) - 1] = 0;
- snprintf(yajl->state[yajl->depth].key, sizeof(buffer), "%s", buffer);
- yajl->state[yajl->depth].key_len = sizeof(buffer);
- yajl->depth = (yajl->depth + 1);
+ sfree (state->key);
+ state->depth--;
+ state->key = state->stack[state->depth];
+ state->stack[state->depth] = NULL;
return CEPH_CB_CONTINUE;
}
-static int ceph_cb_end_map(void *ctx)
+static int
+ceph_cb_map_key(void *ctx, const unsigned char *key, yajl_len_t string_len)
{
- yajl_struct *yajl = (yajl_struct*)ctx;
+ yajl_struct *state = (yajl_struct*) ctx;
+ size_t sz = ((size_t) string_len) + 1;
+
+ sfree (state->key);
+ state->key = malloc (sz);
+ if (state->key == NULL)
+ {
+ ERROR ("ceph plugin: malloc failed.");
+ return CEPH_CB_ABORT;
+ }
+
+ memmove (state->key, key, sz - 1);
+ state->key[sz - 1] = 0;
- yajl->depth = (yajl->depth - 1);
return CEPH_CB_CONTINUE;
}
static void ceph_daemon_free(struct ceph_daemon *d)
{
int i = 0;
- for(; i < d->ds_num; i++)
+ for(; i < d->last_idx; i++)
+ {
+ sfree(d->last_poll_data[i]);
+ }
+ sfree(d->last_poll_data);
+ d->last_poll_data = NULL;
+ d->last_idx = 0;
+ for(i = 0; i < d->ds_num; i++)
{
sfree(d->ds_names[i]);
}
sfree(d);
}
-/**
- * Compact ds name by removing special characters and trimming length to
- * DATA_MAX_NAME_LEN if necessary
- */
-static void compact_ds_name(char *source, char *dest)
+/* compact_ds_name removed the special characters ":", "_", "-" and "+" from the
+ * intput string. Characters following these special characters are capitalized.
+ * Trailing "+" and "-" characters are replaces with the strings "Plus" and
+ * "Minus". */
+static int compact_ds_name (char *buffer, size_t buffer_size, char const *src)
{
- int keys_num = 0, i;
- char *save_ptr = NULL, *tmp_ptr = source;
- char *keys[16];
- char len_str[3];
- char tmp[DATA_MAX_NAME_LEN];
- size_t key_chars_remaining = (DATA_MAX_NAME_LEN-1);
- int reserved = 0;
- int offset = 0;
- memset(tmp, 0, sizeof(tmp));
- if(source == NULL || dest == NULL || source[0] == '\0' || dest[0] != '\0')
- {
- return;
- }
- size_t src_len = strlen(source);
- snprintf(len_str, sizeof(len_str), "%zu", src_len);
- unsigned char append_status = 0x0;
- append_status |= (source[src_len - 1] == '-') ? 0x1 : 0x0;
- append_status |= (source[src_len - 1] == '+') ? 0x2 : 0x0;
- while ((keys[keys_num] = strtok_r(tmp_ptr, ":_-+", &save_ptr)) != NULL)
- {
- tmp_ptr = NULL;
- /** capitalize 1st char **/
- keys[keys_num][0] = toupper(keys[keys_num][0]);
- keys_num++;
- if(keys_num >= 16)
- {
- break;
- }
- }
- /** concatenate each part of source string **/
- for(i = 0; i < keys_num; i++)
- {
- strncat(tmp, keys[i], key_chars_remaining);
- key_chars_remaining -= strlen(keys[i]);
- }
- tmp[DATA_MAX_NAME_LEN - 1] = '\0';
- /** to coordinate limitation of length of type_instance
- * we will truncate ds_name
- * when the its length is more than
- * DATA_MAX_NAME_LEN
- */
- if(strlen(tmp) > DATA_MAX_NAME_LEN - 1)
+ char *src_copy;
+ size_t src_len;
+ char *ptr = buffer;
+ size_t ptr_size = buffer_size;
+ _Bool append_plus = 0;
+ _Bool append_minus = 0;
+
+ if ((buffer == NULL) || (buffer_size <= strlen ("Minus")) || (src == NULL))
+ return EINVAL;
+
+ src_copy = strdup (src);
+ src_len = strlen(src);
+
+ /* Remove trailing "+" and "-". */
+ if (src_copy[src_len - 1] == '+')
{
- append_status |= 0x4;
- /** we should reserve space for
- * len_str
- */
- reserved += 2;
+ append_plus = 1;
+ src_len--;
+ src_copy[src_len] = 0;
}
- if(append_status & 0x1)
+ else if (src_copy[src_len - 1] == '-')
{
- /** we should reserve space for
- * "Minus"
- */
- reserved += 5;
+ append_minus = 1;
+ src_len--;
+ src_copy[src_len] = 0;
}
- if(append_status & 0x2)
+
+ /* Split at special chars, capitalize first character, append to buffer. */
+ char *dummy = src_copy;
+ char *token;
+ char *save_ptr = NULL;
+ while ((token = strtok_r (dummy, ":_-+", &save_ptr)) != NULL)
{
- /** we should reserve space for
- * "Plus"
- */
- reserved += 4;
+ size_t len;
+
+ dummy = NULL;
+
+ token[0] = toupper ((int) token[0]);
+
+ assert (ptr_size > 1);
+
+ len = strlen (token);
+ if (len >= ptr_size)
+ len = ptr_size - 1;
+
+ assert (len > 0);
+ assert (len < ptr_size);
+
+ sstrncpy (ptr, token, len + 1);
+ ptr += len;
+ ptr_size -= len;
+
+ assert (*ptr == 0);
+ if (ptr_size <= 1)
+ break;
}
- snprintf(dest, DATA_MAX_NAME_LEN - reserved, "%s", tmp);
- offset = strlen(dest);
- switch (append_status)
+
+ /* Append "Plus" or "Minus" if "+" or "-" has been stripped above. */
+ if (append_plus || append_minus)
{
- case 0x1:
- memcpy(dest + offset, "Minus", 5);
- break;
- case 0x2:
- memcpy(dest + offset, "Plus", 5);
- break;
- case 0x4:
- memcpy(dest + offset, len_str, 2);
- break;
- case 0x5:
- memcpy(dest + offset, "Minus", 5);
- memcpy(dest + offset + 5, len_str, 2);
- break;
- case 0x6:
- memcpy(dest + offset, "Plus", 4);
- memcpy(dest + offset + 4, len_str, 2);
- break;
- default:
- break;
+ char const *append = "Plus";
+ if (append_minus)
+ append = "Minus";
+
+ size_t offset = buffer_size - (strlen (append) + 1);
+ if (offset > strlen (buffer))
+ offset = strlen (buffer);
+
+ sstrncpy (buffer + offset, append, buffer_size - offset);
}
+
+ sfree (src_copy);
+ return 0;
+}
+
+static _Bool has_suffix (char const *str, char const *suffix)
+{
+ size_t str_len = strlen (str);
+ size_t suffix_len = strlen (suffix);
+ size_t offset;
+
+ if (suffix_len > str_len)
+ return 0;
+ offset = str_len - suffix_len;
+
+ if (strcmp (str + offset, suffix) == 0)
+ return 1;
+
+ return 0;
+}
+
+/* count_parts returns the number of elements a "foo.bar.baz" style key has. */
+static size_t count_parts (char const *key)
+{
+ char const *ptr;
+ size_t parts_num = 0;
+
+ for (ptr = key; ptr != NULL; ptr = strchr (ptr + 1, '.'))
+ parts_num++;
+
+ return parts_num;
}
/**
* Parse key to remove "type" if this is for schema and initiate compaction
*/
-static int parse_keys(const char *key_str, char *ds_name)
+static int parse_keys (char *buffer, size_t buffer_size, const char *key_str)
{
- char *ptr, *rptr;
- size_t ds_name_len = 0;
- /**
- * allow up to 100 characters before compaction - compact_ds_name will not
- * allow more than DATA_MAX_NAME_LEN chars
- */
- int max_str_len = 100;
- char tmp_ds_name[max_str_len];
- memset(tmp_ds_name, 0, sizeof(tmp_ds_name));
- if(ds_name == NULL || key_str == NULL || key_str[0] == '\0' ||
- ds_name[0] != '\0')
- {
- return -1;
- }
- if((ptr = strchr(key_str, '.')) == NULL
- || (rptr = strrchr(key_str, '.')) == NULL)
+ char tmp[2 * buffer_size];
+
+ if (buffer == NULL || buffer_size == 0 || key_str == NULL || strlen (key_str) == 0)
+ return EINVAL;
+
+ if ((count_parts (key_str) > 2) && has_suffix (key_str, ".type"))
{
- memcpy(tmp_ds_name, key_str, max_str_len - 1);
- goto compact;
- }
- ds_name_len = (rptr - ptr) > max_str_len ? max_str_len : (rptr - ptr);
- if((ds_name_len == 0) || strncmp(rptr + 1, "type", 4))
- { /** copy whole key **/
- memcpy(tmp_ds_name, key_str, max_str_len - 1);
+ /* strip ".type" suffix iff the key has more than two parts. */
+ size_t sz = strlen (key_str) - strlen (".type") + 1;
+
+ if (sz > sizeof (tmp))
+ sz = sizeof (tmp);
+ sstrncpy (tmp, key_str, sz);
}
else
- {/** more than two keys **/
- memcpy(tmp_ds_name, key_str, ((rptr - key_str) > (max_str_len - 1) ?
- (max_str_len - 1) : (rptr - key_str)));
+ {
+ sstrncpy (tmp, key_str, sizeof (tmp));
}
-
- compact: compact_ds_name(tmp_ds_name, ds_name);
- return 0;
+
+ return compact_ds_name (buffer, buffer_size, tmp);
}
/**
uint32_t type;
char ds_name[DATA_MAX_NAME_LEN];
memset(ds_name, 0, sizeof(ds_name));
-
+
if(convert_special_metrics)
{
/**
((pc_type & PERFCOUNTER_LATENCY) ? DSET_LATENCY : DSET_BYTES);
d->ds_types[d->ds_num] = type;
- if(parse_keys(name, ds_name))
+ if (parse_keys(ds_name, sizeof (ds_name), name))
{
return 1;
}
sstrncpy(d->ds_names[d->ds_num], ds_name, DATA_MAX_NAME_LEN -1);
d->ds_num = (d->ds_num + 1);
-
+
return 0;
}
static int cc_add_daemon_config(oconfig_item_t *ci)
{
int ret, i;
- struct ceph_daemon *array, *nd, cd;
+ struct ceph_daemon *nd, cd;
+ struct ceph_daemon **tmp;
memset(&cd, 0, sizeof(struct ceph_daemon));
if((ci->values_num != 1) || (ci->values[0].type != OCONFIG_TYPE_STRING))
"with '/' or './' Can't parse: '%s'\n", cd.name, cd.asok_path);
return -EINVAL;
}
- array = realloc(g_daemons,
- sizeof(struct ceph_daemon *) * (g_num_daemons + 1));
- if(array == NULL)
+
+ tmp = realloc(g_daemons, (g_num_daemons+1) * sizeof(*g_daemons));
+ if(tmp == NULL)
{
/* The positive return value here indicates that this is a
* runtime error, not a configuration error. */
return ENOMEM;
}
- g_daemons = (struct ceph_daemon**) array;
- nd = malloc(sizeof(struct ceph_daemon));
+ g_daemons = tmp;
+
+ nd = malloc(sizeof(*nd));
if(!nd)
{
return ENOMEM;
}
- memcpy(nd, &cd, sizeof(struct ceph_daemon));
+ memcpy(nd, &cd, sizeof(*nd));
g_daemons[g_num_daemons++] = nd;
return 0;
}
struct ceph_daemon *d = (struct ceph_daemon *) arg;
int pc_type;
pc_type = atoi(val);
- DEBUG("ceph plugin: ceph_daemon_add_ds_entry(d=%s,key=%s,pc_type=%04x)",
- d->name, key, pc_type);
return ceph_daemon_add_ds_entry(d, key, pc_type);
}
/**
* Latency counter does not yet have an entry in last poll data - add it.
*/
-static int add_last(const char *ds_n, double cur_sum, uint64_t cur_count)
+static int add_last(struct ceph_daemon *d, const char *ds_n, double cur_sum,
+ uint64_t cur_count)
{
- last_poll_data[last_idx] = malloc(1 * sizeof(struct last_data));
- if(!last_poll_data[last_idx])
+ d->last_poll_data[d->last_idx] = malloc(1 * sizeof(struct last_data));
+ if(!d->last_poll_data[d->last_idx])
{
return -ENOMEM;
}
- sstrncpy(last_poll_data[last_idx]->ds_name,ds_n,
- sizeof(last_poll_data[last_idx]->ds_name));
- last_poll_data[last_idx]->last_sum = cur_sum;
- last_poll_data[last_idx]->last_count = cur_count;
- last_idx++;
+ sstrncpy(d->last_poll_data[d->last_idx]->ds_name,ds_n,
+ sizeof(d->last_poll_data[d->last_idx]->ds_name));
+ d->last_poll_data[d->last_idx]->last_sum = cur_sum;
+ d->last_poll_data[d->last_idx]->last_count = cur_count;
+ d->last_idx = (d->last_idx + 1);
return 0;
}
/**
* Update latency counter or add new entry if it doesn't exist
*/
-static int update_last(const char *ds_n, int index, double cur_sum,
- uint64_t cur_count)
+static int update_last(struct ceph_daemon *d, const char *ds_n, int index,
+ double cur_sum, uint64_t cur_count)
{
- if((last_idx > index) && (strcmp(last_poll_data[index]->ds_name, ds_n) == 0))
+ if((d->last_idx > index) && (strcmp(d->last_poll_data[index]->ds_name, ds_n) == 0))
{
- last_poll_data[index]->last_sum = cur_sum;
- last_poll_data[index]->last_count = cur_count;
+ d->last_poll_data[index]->last_sum = cur_sum;
+ d->last_poll_data[index]->last_count = cur_count;
return 0;
}
- if(!last_poll_data)
+ if(!d->last_poll_data)
{
- last_poll_data = malloc(1 * sizeof(struct last_data *));
- if(!last_poll_data)
+ d->last_poll_data = malloc(1 * sizeof(struct last_data *));
+ if(!d->last_poll_data)
{
return -ENOMEM;
}
}
else
{
- struct last_data **tmp_last = realloc(last_poll_data,
- ((last_idx+1) * sizeof(struct last_data *)));
+ struct last_data **tmp_last = realloc(d->last_poll_data,
+ ((d->last_idx+1) * sizeof(struct last_data *)));
if(!tmp_last)
{
return -ENOMEM;
}
- last_poll_data = tmp_last;
+ d->last_poll_data = tmp_last;
+ }
+ return add_last(d, ds_n, cur_sum, cur_count);
+}
+
+/**
+ * If using index guess failed (shouldn't happen, but possible if counters
+ * get rearranged), resort to searching for counter name
+ */
+static int backup_search_for_last_avg(struct ceph_daemon *d, const char *ds_n)
+{
+ int i = 0;
+ for(; i < d->last_idx; i++)
+ {
+ if(strcmp(d->last_poll_data[i]->ds_name, ds_n) == 0)
+ {
+ return i;
+ }
}
- return add_last(ds_n, cur_sum, cur_count);
+ return -1;
}
/**
* Calculate average b/t current data and last poll data
* if last poll data exists
*/
-static double get_last_avg(const char *ds_n, int index,
+static double get_last_avg(struct ceph_daemon *d, const char *ds_n, int index,
double cur_sum, uint64_t cur_count)
{
double result = -1.1, sum_delt = 0.0;
uint64_t count_delt = 0;
- if((last_idx > index) &&
- (strcmp(last_poll_data[index]->ds_name, ds_n) == 0) &&
- (cur_count > last_poll_data[index]->last_count))
+ int tmp_index = 0;
+ if(d->last_idx > index)
{
- sum_delt = (cur_sum - last_poll_data[index]->last_sum);
- count_delt = (cur_count - last_poll_data[index]->last_count);
- result = (sum_delt / count_delt);
+ if(strcmp(d->last_poll_data[index]->ds_name, ds_n) == 0)
+ {
+ tmp_index = index;
+ }
+ //test previous index
+ else if((index > 0) && (strcmp(d->last_poll_data[index-1]->ds_name, ds_n) == 0))
+ {
+ tmp_index = (index - 1);
+ }
+ else
+ {
+ tmp_index = backup_search_for_last_avg(d, ds_n);
+ }
+
+ if((tmp_index > -1) && (cur_count > d->last_poll_data[tmp_index]->last_count))
+ {
+ sum_delt = (cur_sum - d->last_poll_data[tmp_index]->last_sum);
+ count_delt = (cur_count - d->last_poll_data[tmp_index]->last_count);
+ result = (sum_delt / count_delt);
+ }
}
if(result == -1.1)
{
result = NAN;
}
- if(update_last(ds_n, index, cur_sum, cur_count) == -ENOMEM)
+ if(update_last(d, ds_n, tmp_index, cur_sum, cur_count) == -ENOMEM)
{
return -ENOMEM;
}
char ds_name[DATA_MAX_NAME_LEN];
memset(ds_name, 0, sizeof(ds_name));
- if(parse_keys(key, ds_name))
+ if (parse_keys (ds_name, sizeof (ds_name), key))
{
return 1;
}
//don't overflow bounds of array
index = (vtmp->d->ds_num - 1);
}
-
+
/**
* counters should remain in same order we parsed schema... we maintain the
* index variable to keep track of current point in list of counters. first
{
double sum, result;
sscanf(val, "%lf", &sum);
- DEBUG("ceph plugin: avgcount:%" PRIu64,vtmp->avgcount);
- DEBUG("ceph plugin: sum:%lf",sum);
if(vtmp->avgcount == 0)
{
vtmp->avgcount = 1;
}
-
+
/** User wants latency values as long run avg */
if(long_run_latency_avg)
{
result = (sum / vtmp->avgcount);
- DEBUG("ceph plugin: uv->gauge = sumd / avgcounti = :%lf", result);
}
else
{
- result = get_last_avg(ds_name, vtmp->latency_index, sum, vtmp->avgcount);
+ result = get_last_avg(vtmp->d, ds_name, vtmp->latency_index, sum, vtmp->avgcount);
if(result == -ENOMEM)
{
return -ENOMEM;
}
- DEBUG("ceph plugin: uv->gauge = (sumd_now - sumd_last) / "
- "(avgcounti_now - avgcounti_last) = :%lf", result);
}
uv.gauge = result;
case DSET_BYTES:
sscanf(val, "%lf", &tmp_d);
uv.gauge = tmp_d;
- DEBUG("ceph plugin: uv->gauge = %lf",uv.gauge);
break;
case DSET_RATE:
sscanf(val, "%" PRIu64, &tmp_u);
uv.derive = tmp_u;
- DEBUG("ceph plugin: uv->derive = %" PRIu64 "",(uint64_t)uv.derive);
break;
case DSET_TYPE_UNFOUND:
default:
vtmp->vlist.values = &uv;
vtmp->vlist.values_len = 1;
- DEBUG("ceph plugin: dispatching %s\n", ds_name);
vtmp->index = (vtmp->index + 1);
plugin_dispatch_values(&vtmp->vlist);
{
ERROR("ceph plugin: cconn_connect: connect(%d) failed: error %d",
fd, err);
+ close(fd);
return err;
}
err = -errno;
ERROR("ceph plugin: cconn_connect: fcntl(%d, O_NONBLOCK) error %d",
fd, err);
+ close(fd);
return err;
}
io->asok = fd;
result = cconn_process_data(io, &io->yajl, hand);
break;
case ASOK_REQ_SCHEMA:
+ //init daemon specific variables
+ io->d->ds_num = 0;
+ io->d->last_idx = 0;
+ io->d->last_poll_data = NULL;
io->yajl.handler = node_handler_define_schema;
io->yajl.handler_arg = io->d;
result = traverse_json(io->json, io->json_len, hand);
case CSTATE_READ_AMT:
case CSTATE_READ_JSON:
return (revents & POLLIN) ? 0 : -EINVAL;
- return (revents & POLLIN) ? 0 : -EINVAL;
default:
ERROR("ceph plugin: cconn_validate_revents(name=%s) got to "
"illegal state on line %d", io->d->name, __LINE__);
}
else if(ret == 1)
{
- DEBUG("ceph plugin: did cconn_prepare(name=%s,i=%d,st=%d)",
- io->d->name, i, io->state);
polled_io_array[nfds++] = io_array + i;
}
}
{
/* finished */
ret = 0;
- DEBUG("ceph plugin: cconn_main_loop: no more cconn to manage.");
goto done;
}
gettimeofday(&tv, NULL);
static int ceph_init(void)
{
int ret;
- DEBUG("ceph plugin: ceph_init");
ceph_daemons_print();
ret = cconn_main_loop(ASOK_REQ_VERSION);
sfree(g_daemons);
g_daemons = NULL;
g_num_daemons = 0;
- for(i = 0; i < last_idx; i++)
- {
- sfree(last_poll_data[i]);
- }
- sfree(last_poll_data);
- last_poll_data = NULL;
- last_idx = 0;
DEBUG("ceph plugin: finished ceph_shutdown");
return 0;
}
plugin_register_read("ceph", ceph_read);
plugin_register_shutdown("ceph", ceph_shutdown);
}
+/* vim: set sw=4 sts=4 et : */