From bb54fd1902984a9130025d277f5cc3411d4c2f8b Mon Sep 17 00:00:00 2001 From: Pierre-Yves Ritschard Date: Thu, 20 Nov 2014 11:30:33 +0100 Subject: [PATCH] add support for custom commands in redis plugin Now that the redis plugin has moved to hiredis, it could be worthwhile to add support for custom commands. This diff implements a mechanism for executing commands which allows for setting the type and type-instance. It doesn not support hash or array returns, but if this is deemed necessary could be added later on. The canonical use case for this is for people using redis has a queue (for instance, using solutions such as rq, sidekiq and similar solutions) who want a simple way to ensure the work queue size is not growing. To address this you would use: ``` Exec "LLEN myqueue" Instance "myqueue" ``` This would then produce a redis-local/queue_length-myqueue value. If the idea has traction I'll add the doc bits. --- src/redis.c | 121 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 121 insertions(+) diff --git a/src/redis.c b/src/redis.c index f5bb304f..02f3eddd 100644 --- a/src/redis.c +++ b/src/redis.c @@ -40,6 +40,7 @@ #define MAX_REDIS_NODE_NAME 64 #define MAX_REDIS_PASSWD_LENGTH 512 #define MAX_REDIS_VAL_SIZE 256 +#define MAX_REDIS_COMMAND 2048 /* Redis plugin configuration example: * @@ -53,6 +54,16 @@ * */ +struct redis_command_s; +typedef struct redis_command_s redis_command_t; +struct redis_command_s +{ + char command[MAX_REDIS_COMMAND]; + char type[DATA_MAX_NAME_LEN]; + char instance[DATA_MAX_NAME_LEN]; + redis_command_t *next; +}; + struct redis_node_s; typedef struct redis_node_s redis_node_t; struct redis_node_s @@ -62,6 +73,7 @@ struct redis_node_s char passwd[MAX_REDIS_PASSWD_LENGTH]; int port; struct timeval timeout; + redis_command_t *commands; redis_node_t *next; }; @@ -110,10 +122,46 @@ static int redis_node_add (const redis_node_t *rn) /* {{{ */ return (0); } /* }}} */ +static redis_command_t *redis_config_command (oconfig_item_t *ci) /* {{{ */ +{ + redis_command_t *rc; + int status; + int i; + + rc = calloc(1, sizeof(*rc)); + if (rc == NULL) { + ERROR("redis plugin: calloca failed adding redis_command."); + return NULL; + } + status = cf_util_get_string_buffer(ci, rc->type, sizeof(rc->type)); + if (status != 0) + goto err; + + for (i = 0; i < ci->children_num; i++) { + oconfig_item_t *option = ci->children + i; + + if (strcasecmp("Exec", option->key) == 0) { + status = cf_util_get_string_buffer(option, rc->command, sizeof(rc->command)); + } else if (strcasecmp("Instance", option->key) == 0) { + status = cf_util_get_string_buffer(option, rc->instance, sizeof(rc->instance)); + } + if (status != 0) + goto err; + } + if (strlen(rc->command) == 0) { + WARNING("redis plugin: invalid command definition for: %s", rc->type); + goto err; + } + return rc; + err: + free(rc); + return NULL; +} /* }}} */ static int redis_config_node (oconfig_item_t *ci) /* {{{ */ { redis_node_t rn; + redis_command_t *rc; int i; int status; int timeout; @@ -122,6 +170,7 @@ static int redis_config_node (oconfig_item_t *ci) /* {{{ */ sstrncpy (rn.host, REDIS_DEF_HOST, sizeof (rn.host)); rn.port = REDIS_DEF_PORT; rn.timeout.tv_usec = REDIS_DEF_TIMEOUT; + rn.commands = NULL; status = cf_util_get_string_buffer (ci, rn.name, sizeof (rn.name)); if (status != 0) @@ -142,6 +191,16 @@ static int redis_config_node (oconfig_item_t *ci) /* {{{ */ status = 0; } } + else if (strcasecmp ("Command", option->key) == 0) + { + rc = redis_config_command(option); + if (rc == NULL) { + status =1; + } else { + rc->next = rn.commands; + rn.commands = rc; + } + } else if (strcasecmp ("Timeout", option->key) == 0) { status = cf_util_get_int (option, &timeout); @@ -256,9 +315,67 @@ int redis_handle_info (char *node, char const *info_line, char const *type, char } /* }}} int redis_handle_info */ +int redis_handle_command (redisContext *rh, redis_node_t *rn, redis_command_t *rc) /* {{{ */ +{ + redisReply *rr; + const data_set_t *ds; + value_t val; + + ds = plugin_get_ds (rc->type); + if (!ds) { + ERROR ("redis plugin: DataSet `%s' not defined.", rc->type); + return (-1); + } + + if (ds->ds_num != 1) { + ERROR ("redis plugin: DS `%s' has too many types.", rc->type); + return (-1); + } + + if ((rr = redisCommand(rh, rc->command)) == NULL) { + WARNING("redis plugin: unable to carry out command `%s'.", rc->command); + return (-1); + } + + switch (rr->type) { + case REDIS_REPLY_INTEGER: + switch (ds->ds[0].type) { + case DS_TYPE_COUNTER: + val.counter = (counter_t)rr->integer; + break; + case DS_TYPE_GAUGE: + val.gauge = (gauge_t)rr->integer; + break; + case DS_TYPE_DERIVE: + val.gauge = (derive_t)rr->integer; + break; + case DS_TYPE_ABSOLUTE: + val.gauge = (absolute_t)rr->integer; + break; + } + break; + case REDIS_REPLY_STRING: + if (parse_value (rr->str, &val, ds->ds[0].type) == -1) { + WARNING("redis plugin: Unable to parse field `%s'.", rc->type); + freeReplyObject (rr); + return (-1); + } + break; + default: + WARNING("redis plugin: Cannot coerce redis type."); + freeReplyObject(rr); + return (-1); + } + + redis_submit(rn->name, rc->type, (strlen(rc->instance) >0)?rc->instance:NULL, val); + freeReplyObject (rr); + return 0; +} /* }}} int redis_handle_info */ + static int redis_read (void) /* {{{ */ { redis_node_t *rn; + redis_command_t *rc; for (rn = nodes_head; rn != NULL; rn = rn->next) { @@ -312,6 +429,10 @@ static int redis_read (void) /* {{{ */ redis_handle_info (rn->name, rr->str, "current_connections", "slaves", "connected_slaves", DS_TYPE_GAUGE); freeReplyObject (rr); + + for (rc = rn->commands; rc != NULL; rc = rc->next) + redis_handle_command(rh, rn, rc); + redisFree (rh); } -- 2.11.0