X-Git-Url: https://git.octo.it/?a=blobdiff_plain;f=src%2Fredis.c;h=7864ead07c01c1edf079cecfb9c79474470cb277;hb=a7eecf6018a684dcf8323d4a41a7e704a5d57f02;hp=f5bb304f86b497b4f3b9965c216e22779af60ffe;hpb=e5980bd18554dc9472c49a12196a9d8f45a4bcf6;p=collectd.git diff --git a/src/redis.c b/src/redis.c index f5bb304f..7864ead0 100644 --- a/src/redis.c +++ b/src/redis.c @@ -40,6 +40,7 @@ #define MAX_REDIS_NODE_NAME 64 #define MAX_REDIS_PASSWD_LENGTH 512 #define MAX_REDIS_VAL_SIZE 256 +#define MAX_REDIS_QUERY 2048 /* Redis plugin configuration example: * @@ -53,6 +54,16 @@ * */ +struct redis_query_s; +typedef struct redis_query_s redis_query_t; +struct redis_query_s +{ + char query[MAX_REDIS_QUERY]; + char type[DATA_MAX_NAME_LEN]; + char instance[DATA_MAX_NAME_LEN]; + redis_query_t *next; +}; + struct redis_node_s; typedef struct redis_node_s redis_node_t; struct redis_node_s @@ -62,6 +73,7 @@ struct redis_node_s char passwd[MAX_REDIS_PASSWD_LENGTH]; int port; struct timeval timeout; + redis_query_t *queries; redis_node_t *next; }; @@ -110,10 +122,52 @@ static int redis_node_add (const redis_node_t *rn) /* {{{ */ return (0); } /* }}} */ +static redis_query_t *redis_config_query (oconfig_item_t *ci) /* {{{ */ +{ + redis_query_t *rq; + int status; + int i; + + rq = calloc(1, sizeof(*rq)); + if (rq == NULL) { + ERROR("redis plugin: calloca failed adding redis_query."); + return NULL; + } + status = cf_util_get_string_buffer(ci, rq->query, sizeof(rq->query)); + if (status != 0) + goto err; + + /* + * Default to a gauge type. + */ + (void)strncpy(rq->type, "gauge", sizeof(rq->type)); + (void)strncpy(rq->instance, rq->query, sizeof(rq->instance)); + replace_special(rq->instance, sizeof(rq->instance)); + + for (i = 0; i < ci->children_num; i++) { + oconfig_item_t *option = ci->children + i; + + if (strcasecmp("Type", option->key) == 0) { + status = cf_util_get_string_buffer(option, rq->type, sizeof(rq->type)); + } else if (strcasecmp("Instance", option->key) == 0) { + status = cf_util_get_string_buffer(option, rq->instance, sizeof(rq->instance)); + } else { + WARNING("redis plugin: unknown configuration option: %s", option->key); + status = -1; + } + if (status != 0) + goto err; + } + return rq; + err: + free(rq); + return NULL; +} /* }}} */ static int redis_config_node (oconfig_item_t *ci) /* {{{ */ { redis_node_t rn; + redis_query_t *rq; int i; int status; int timeout; @@ -122,6 +176,7 @@ static int redis_config_node (oconfig_item_t *ci) /* {{{ */ sstrncpy (rn.host, REDIS_DEF_HOST, sizeof (rn.host)); rn.port = REDIS_DEF_PORT; rn.timeout.tv_usec = REDIS_DEF_TIMEOUT; + rn.queries = NULL; status = cf_util_get_string_buffer (ci, rn.name, sizeof (rn.name)); if (status != 0) @@ -142,6 +197,16 @@ static int redis_config_node (oconfig_item_t *ci) /* {{{ */ status = 0; } } + else if (strcasecmp ("Query", option->key) == 0) + { + rq = redis_config_query(option); + if (rq == NULL) { + status =1; + } else { + rq->next = rn.queries; + rn.queries = rq; + } + } else if (strcasecmp ("Timeout", option->key) == 0) { status = cf_util_get_int (option, &timeout); @@ -256,9 +321,67 @@ int redis_handle_info (char *node, char const *info_line, char const *type, char } /* }}} int redis_handle_info */ +int redis_handle_query (redisContext *rh, redis_node_t *rn, redis_query_t *rq) /* {{{ */ +{ + redisReply *rr; + const data_set_t *ds; + value_t val; + + ds = plugin_get_ds (rq->type); + if (!ds) { + ERROR ("redis plugin: DataSet `%s' not defined.", rq->type); + return (-1); + } + + if (ds->ds_num != 1) { + ERROR ("redis plugin: DS `%s' has too many types.", rq->type); + return (-1); + } + + if ((rr = redisCommand(rh, rq->query)) == NULL) { + WARNING("redis plugin: unable to carry out query `%s'.", rq->query); + return (-1); + } + + switch (rr->type) { + case REDIS_REPLY_INTEGER: + switch (ds->ds[0].type) { + case DS_TYPE_COUNTER: + val.counter = (counter_t)rr->integer; + break; + case DS_TYPE_GAUGE: + val.gauge = (gauge_t)rr->integer; + break; + case DS_TYPE_DERIVE: + val.gauge = (derive_t)rr->integer; + break; + case DS_TYPE_ABSOLUTE: + val.gauge = (absolute_t)rr->integer; + break; + } + break; + case REDIS_REPLY_STRING: + if (parse_value (rr->str, &val, ds->ds[0].type) == -1) { + WARNING("redis plugin: Unable to parse field `%s'.", rq->type); + freeReplyObject (rr); + return (-1); + } + break; + default: + WARNING("redis plugin: Cannot coerce redis type."); + freeReplyObject(rr); + return (-1); + } + + redis_submit(rn->name, rq->type, (strlen(rq->instance) >0)?rq->instance:NULL, val); + freeReplyObject (rr); + return 0; +} /* }}} int redis_handle_info */ + static int redis_read (void) /* {{{ */ { redis_node_t *rn; + redis_query_t *rq; for (rn = nodes_head; rn != NULL; rn = rn->next) { @@ -312,6 +435,10 @@ static int redis_read (void) /* {{{ */ redis_handle_info (rn->name, rr->str, "current_connections", "slaves", "connected_slaves", DS_TYPE_GAUGE); freeReplyObject (rr); + + for (rq = rn->queries; rq != NULL; rq = rq->next) + redis_handle_query(rh, rn, rq); + redisFree (rh); }