From: Pierre-Yves Ritschard Date: Thu, 20 Nov 2014 10:30:33 +0000 (+0100) Subject: add support for custom commands in redis plugin X-Git-Tag: collectd-5.5.0~129^2~4 X-Git-Url: https://git.octo.it/?p=collectd.git;a=commitdiff_plain;h=bb54fd1902984a9130025d277f5cc3411d4c2f8b add support for custom commands in redis plugin Now that the redis plugin has moved to hiredis, it could be worthwhile to add support for custom commands. This diff implements a mechanism for executing commands which allows for setting the type and type-instance. It doesn not support hash or array returns, but if this is deemed necessary could be added later on. The canonical use case for this is for people using redis has a queue (for instance, using solutions such as rq, sidekiq and similar solutions) who want a simple way to ensure the work queue size is not growing. To address this you would use: ``` Exec "LLEN myqueue" Instance "myqueue" ``` This would then produce a redis-local/queue_length-myqueue value. If the idea has traction I'll add the doc bits. --- diff --git a/src/redis.c b/src/redis.c index f5bb304f..02f3eddd 100644 --- a/src/redis.c +++ b/src/redis.c @@ -40,6 +40,7 @@ #define MAX_REDIS_NODE_NAME 64 #define MAX_REDIS_PASSWD_LENGTH 512 #define MAX_REDIS_VAL_SIZE 256 +#define MAX_REDIS_COMMAND 2048 /* Redis plugin configuration example: * @@ -53,6 +54,16 @@ * */ +struct redis_command_s; +typedef struct redis_command_s redis_command_t; +struct redis_command_s +{ + char command[MAX_REDIS_COMMAND]; + char type[DATA_MAX_NAME_LEN]; + char instance[DATA_MAX_NAME_LEN]; + redis_command_t *next; +}; + struct redis_node_s; typedef struct redis_node_s redis_node_t; struct redis_node_s @@ -62,6 +73,7 @@ struct redis_node_s char passwd[MAX_REDIS_PASSWD_LENGTH]; int port; struct timeval timeout; + redis_command_t *commands; redis_node_t *next; }; @@ -110,10 +122,46 @@ static int redis_node_add (const redis_node_t *rn) /* {{{ */ return (0); } /* }}} */ +static redis_command_t *redis_config_command (oconfig_item_t *ci) /* {{{ */ +{ + redis_command_t *rc; + int status; + int i; + + rc = calloc(1, sizeof(*rc)); + if (rc == NULL) { + ERROR("redis plugin: calloca failed adding redis_command."); + return NULL; + } + status = cf_util_get_string_buffer(ci, rc->type, sizeof(rc->type)); + if (status != 0) + goto err; + + for (i = 0; i < ci->children_num; i++) { + oconfig_item_t *option = ci->children + i; + + if (strcasecmp("Exec", option->key) == 0) { + status = cf_util_get_string_buffer(option, rc->command, sizeof(rc->command)); + } else if (strcasecmp("Instance", option->key) == 0) { + status = cf_util_get_string_buffer(option, rc->instance, sizeof(rc->instance)); + } + if (status != 0) + goto err; + } + if (strlen(rc->command) == 0) { + WARNING("redis plugin: invalid command definition for: %s", rc->type); + goto err; + } + return rc; + err: + free(rc); + return NULL; +} /* }}} */ static int redis_config_node (oconfig_item_t *ci) /* {{{ */ { redis_node_t rn; + redis_command_t *rc; int i; int status; int timeout; @@ -122,6 +170,7 @@ static int redis_config_node (oconfig_item_t *ci) /* {{{ */ sstrncpy (rn.host, REDIS_DEF_HOST, sizeof (rn.host)); rn.port = REDIS_DEF_PORT; rn.timeout.tv_usec = REDIS_DEF_TIMEOUT; + rn.commands = NULL; status = cf_util_get_string_buffer (ci, rn.name, sizeof (rn.name)); if (status != 0) @@ -142,6 +191,16 @@ static int redis_config_node (oconfig_item_t *ci) /* {{{ */ status = 0; } } + else if (strcasecmp ("Command", option->key) == 0) + { + rc = redis_config_command(option); + if (rc == NULL) { + status =1; + } else { + rc->next = rn.commands; + rn.commands = rc; + } + } else if (strcasecmp ("Timeout", option->key) == 0) { status = cf_util_get_int (option, &timeout); @@ -256,9 +315,67 @@ int redis_handle_info (char *node, char const *info_line, char const *type, char } /* }}} int redis_handle_info */ +int redis_handle_command (redisContext *rh, redis_node_t *rn, redis_command_t *rc) /* {{{ */ +{ + redisReply *rr; + const data_set_t *ds; + value_t val; + + ds = plugin_get_ds (rc->type); + if (!ds) { + ERROR ("redis plugin: DataSet `%s' not defined.", rc->type); + return (-1); + } + + if (ds->ds_num != 1) { + ERROR ("redis plugin: DS `%s' has too many types.", rc->type); + return (-1); + } + + if ((rr = redisCommand(rh, rc->command)) == NULL) { + WARNING("redis plugin: unable to carry out command `%s'.", rc->command); + return (-1); + } + + switch (rr->type) { + case REDIS_REPLY_INTEGER: + switch (ds->ds[0].type) { + case DS_TYPE_COUNTER: + val.counter = (counter_t)rr->integer; + break; + case DS_TYPE_GAUGE: + val.gauge = (gauge_t)rr->integer; + break; + case DS_TYPE_DERIVE: + val.gauge = (derive_t)rr->integer; + break; + case DS_TYPE_ABSOLUTE: + val.gauge = (absolute_t)rr->integer; + break; + } + break; + case REDIS_REPLY_STRING: + if (parse_value (rr->str, &val, ds->ds[0].type) == -1) { + WARNING("redis plugin: Unable to parse field `%s'.", rc->type); + freeReplyObject (rr); + return (-1); + } + break; + default: + WARNING("redis plugin: Cannot coerce redis type."); + freeReplyObject(rr); + return (-1); + } + + redis_submit(rn->name, rc->type, (strlen(rc->instance) >0)?rc->instance:NULL, val); + freeReplyObject (rr); + return 0; +} /* }}} int redis_handle_info */ + static int redis_read (void) /* {{{ */ { redis_node_t *rn; + redis_command_t *rc; for (rn = nodes_head; rn != NULL; rn = rn->next) { @@ -312,6 +429,10 @@ static int redis_read (void) /* {{{ */ redis_handle_info (rn->name, rr->str, "current_connections", "slaves", "connected_slaves", DS_TYPE_GAUGE); freeReplyObject (rr); + + for (rc = rn->commands; rc != NULL; rc = rc->next) + redis_handle_command(rh, rn, rc); + redisFree (rh); }