X-Git-Url: https://git.octo.it/?a=blobdiff_plain;f=src%2Fredis.c;h=9c921060fbeb9e3d6a5bcb7284e1ce8673b49d83;hb=a61961adfa3baec47f29cda368669b009b857fa5;hp=7c704ab5586ff1598fb422ba50c77b5064a68ee8;hpb=358bf39b09f69220fc8e1b6c2fe98e5e185e3364;p=collectd.git diff --git a/src/redis.c b/src/redis.c index 7c704ab5..9c921060 100644 --- a/src/redis.c +++ b/src/redis.c @@ -35,8 +35,8 @@ #define REDIS_DEF_HOST "localhost" #define REDIS_DEF_PASSWD "" #define REDIS_DEF_PORT 6379 -#define REDIS_DEF_TIMEOUT 2000 -#define REDIS_DEF_DB_COUNT 16 +#define REDIS_DEF_TIMEOUT_SEC 2 +#define REDIS_DEF_DB_COUNT 256 #define MAX_REDIS_NODE_NAME 64 #define MAX_REDIS_PASSWD_LENGTH 512 #define MAX_REDIS_VAL_SIZE 256 @@ -60,6 +60,8 @@ struct redis_query_s { char query[MAX_REDIS_QUERY]; char type[DATA_MAX_NAME_LEN]; char instance[DATA_MAX_NAME_LEN]; + int database; + redis_query_t *next; }; @@ -71,12 +73,13 @@ struct redis_node_s { char passwd[MAX_REDIS_PASSWD_LENGTH]; int port; struct timeval timeout; + redisContext *redisContext; redis_query_t *queries; redis_node_t *next; }; -static redis_node_t *nodes_head = NULL; +static redis_node_t *nodes_head; static int redis_node_add(const redis_node_t *rn) /* {{{ */ { @@ -137,6 +140,8 @@ static redis_query_t *redis_config_query(oconfig_item_t *ci) /* {{{ */ (void)sstrncpy(rq->instance, rq->query, sizeof(rq->instance)); replace_special(rq->instance, sizeof(rq->instance)); + rq->database = 0; + for (int i = 0; i < ci->children_num; i++) { oconfig_item_t *option = ci->children + i; @@ -145,6 +150,13 @@ static redis_query_t *redis_config_query(oconfig_item_t *ci) /* {{{ */ } else if (strcasecmp("Instance", option->key) == 0) { status = cf_util_get_string_buffer(option, rq->instance, sizeof(rq->instance)); + } else if (strcasecmp("Database", option->key) == 0) { + status = cf_util_get_int(option, &rq->database); + if (rq->database < 0) { + WARNING("redis plugin: The \"Database\" option must be positive " + "integer or zero"); + status = -1; + } } else { WARNING("redis plugin: unknown configuration option: %s", option->key); status = -1; @@ -165,7 +177,7 @@ static int redis_config_node(oconfig_item_t *ci) /* {{{ */ int timeout; redis_node_t rn = {.port = REDIS_DEF_PORT, - .timeout.tv_usec = REDIS_DEF_TIMEOUT}; + .timeout.tv_sec = REDIS_DEF_TIMEOUT_SEC}; sstrncpy(rn.host, REDIS_DEF_HOST, sizeof(rn.host)); @@ -194,8 +206,11 @@ static int redis_config_node(oconfig_item_t *ci) /* {{{ */ } } else if (strcasecmp("Timeout", option->key) == 0) { status = cf_util_get_int(option, &timeout); - if (status == 0) - rn.timeout.tv_usec = timeout; + if (status == 0) { + rn.timeout.tv_usec = timeout * 1000; + rn.timeout.tv_sec = rn.timeout.tv_usec / 1000000L; + rn.timeout.tv_usec %= 1000000L; + } } else if (strcasecmp("Password", option->key) == 0) status = cf_util_get_string_buffer(option, rn.passwd, sizeof(rn.passwd)); else @@ -257,8 +272,7 @@ static int redis_init(void) /* {{{ */ redis_node_t rn = {.name = "default", .host = REDIS_DEF_HOST, .port = REDIS_DEF_PORT, - .timeout.tv_sec = 0, - .timeout.tv_usec = REDIS_DEF_TIMEOUT, + .timeout.tv_sec = REDIS_DEF_TIMEOUT_SEC, .next = NULL}; if (nodes_head == NULL) @@ -267,6 +281,26 @@ static int redis_init(void) /* {{{ */ return 0; } /* }}} int redis_init */ +static void *c_redisCommand(redis_node_t *rn, const char *format, ...) { + redisContext *c = rn->redisContext; + + if (c == NULL) + return NULL; + + va_list ap; + va_start(ap, format); + void *reply = redisvCommand(c, format, ap); + va_end(ap); + + if (reply == NULL) { + ERROR("redis plugin: Connection error: %s", c->errstr); + redisFree(rn->redisContext); + rn->redisContext = NULL; + } + + return reply; +} /* void c_redisCommand */ + static int redis_handle_info(char *node, char const *info_line, char const *type, char const *type_instance, char const *field_name, int ds_type) /* {{{ */ @@ -295,8 +329,7 @@ static int redis_handle_info(char *node, char const *info_line, } /* }}} int redis_handle_info */ -static int redis_handle_query(redisContext *rh, redis_node_t *rn, - redis_query_t *rq) /* {{{ */ +static int redis_handle_query(redis_node_t *rn, redis_query_t *rq) /* {{{ */ { redisReply *rr; const data_set_t *ds; @@ -304,16 +337,24 @@ static int redis_handle_query(redisContext *rh, redis_node_t *rn, ds = plugin_get_ds(rq->type); if (!ds) { - ERROR("redis plugin: DataSet `%s' not defined.", rq->type); + ERROR("redis plugin: DS type `%s' not defined.", rq->type); return -1; } if (ds->ds_num != 1) { - ERROR("redis plugin: DS `%s' has too many types.", rq->type); + ERROR("redis plugin: DS type `%s' has too many datasources. This is not " + "supported currently.", + rq->type); return -1; } - if ((rr = redisCommand(rh, rq->query)) == NULL) { + if ((rr = c_redisCommand(rn, "SELECT %d", rq->database)) == NULL) { + WARNING("redis plugin: unable to switch to database `%d' on node `%s'.", + rq->database, rn->name); + return -1; + } + + if ((rr = c_redisCommand(rn, rq->query)) == NULL) { WARNING("redis plugin: unable to carry out query `%s'.", rq->query); return -1; } @@ -337,13 +378,24 @@ static int redis_handle_query(redisContext *rh, redis_node_t *rn, break; case REDIS_REPLY_STRING: if (parse_value(rr->str, &val, ds->ds[0].type) == -1) { - WARNING("redis plugin: Unable to parse field `%s'.", rq->type); + WARNING("redis plugin: Query `%s': Unable to parse value.", rq->query); freeReplyObject(rr); return -1; } break; + case REDIS_REPLY_ERROR: + WARNING("redis plugin: Query `%s' failed: %s.", rq->query, rr->str); + freeReplyObject(rr); + return -1; + case REDIS_REPLY_ARRAY: + WARNING("redis plugin: Query `%s' should return string or integer. Arrays " + "are not supported.", + rq->query); + freeReplyObject(rr); + return -1; default: - WARNING("redis plugin: Cannot coerce redis type."); + WARNING("redis plugin: Query `%s': Cannot coerce redis type (%i).", + rq->query, rr->type); freeReplyObject(rr); return -1; } @@ -364,8 +416,8 @@ static int redis_db_stats(char *node, char const *info_line) /* {{{ */ for (int db = 0; db < REDIS_DEF_DB_COUNT; db++) { static char buf[MAX_REDIS_VAL_SIZE]; - static char field_name[11]; - static char db_id[3]; + static char field_name[12]; + static char db_id[4]; value_t val; char *str; int i; @@ -393,91 +445,122 @@ static int redis_db_stats(char *node, char const *info_line) /* {{{ */ } /* }}} int redis_db_stats */ +static void redis_check_connection(redis_node_t *rn) { + if (rn->redisContext) + return; + + redisContext *rh = + redisConnectWithTimeout((char *)rn->host, rn->port, rn->timeout); + + if (rh == NULL) { + ERROR("redis plugin: can't allocate redis context"); + return; + } + if (rh->err) { + ERROR("redis plugin: unable to connect to node `%s' (%s:%d): %s.", rn->name, + rn->host, rn->port, rh->errstr); + redisFree(rh); + return; + } + + rn->redisContext = rh; + + if (strlen(rn->passwd) > 0) { + redisReply *rr; + + DEBUG("redis plugin: authenticating node `%s' passwd(%s).", rn->name, + rn->passwd); + + if ((rr = c_redisCommand(rn, "AUTH %s", rn->passwd)) == NULL) { + WARNING("redis plugin: unable to authenticate on node `%s'.", rn->name); + return; + } + + if (rr->type != REDIS_REPLY_STATUS) { + WARNING("redis plugin: invalid authentication on node `%s'.", rn->name); + freeReplyObject(rr); + redisFree(rn->redisContext); + rn->redisContext = NULL; + return; + } + + freeReplyObject(rr); + } + return; +} /* void redis_check_connection */ + +static void redis_read_server_info(redis_node_t *rn) { + redisReply *rr; + + if ((rr = c_redisCommand(rn, "INFO")) == NULL) { + WARNING("redis plugin: unable to get INFO from node `%s'.", rn->name); + return; + } + + redis_handle_info(rn->name, rr->str, "uptime", NULL, "uptime_in_seconds", + DS_TYPE_GAUGE); + redis_handle_info(rn->name, rr->str, "current_connections", "clients", + "connected_clients", DS_TYPE_GAUGE); + redis_handle_info(rn->name, rr->str, "blocked_clients", NULL, + "blocked_clients", DS_TYPE_GAUGE); + redis_handle_info(rn->name, rr->str, "memory", NULL, "used_memory", + DS_TYPE_GAUGE); + redis_handle_info(rn->name, rr->str, "memory_lua", NULL, "used_memory_lua", + DS_TYPE_GAUGE); + /* changes_since_last_save: Deprecated in redis version 2.6 and above */ + redis_handle_info(rn->name, rr->str, "volatile_changes", NULL, + "changes_since_last_save", DS_TYPE_GAUGE); + redis_handle_info(rn->name, rr->str, "total_connections", NULL, + "total_connections_received", DS_TYPE_DERIVE); + redis_handle_info(rn->name, rr->str, "total_operations", NULL, + "total_commands_processed", DS_TYPE_DERIVE); + redis_handle_info(rn->name, rr->str, "operations_per_second", NULL, + "instantaneous_ops_per_sec", DS_TYPE_GAUGE); + redis_handle_info(rn->name, rr->str, "expired_keys", NULL, "expired_keys", + DS_TYPE_DERIVE); + redis_handle_info(rn->name, rr->str, "evicted_keys", NULL, "evicted_keys", + DS_TYPE_DERIVE); + redis_handle_info(rn->name, rr->str, "pubsub", "channels", "pubsub_channels", + DS_TYPE_GAUGE); + redis_handle_info(rn->name, rr->str, "pubsub", "patterns", "pubsub_patterns", + DS_TYPE_GAUGE); + redis_handle_info(rn->name, rr->str, "current_connections", "slaves", + "connected_slaves", DS_TYPE_GAUGE); + redis_handle_info(rn->name, rr->str, "cache_result", "hits", "keyspace_hits", + DS_TYPE_DERIVE); + redis_handle_info(rn->name, rr->str, "cache_result", "misses", + "keyspace_misses", DS_TYPE_DERIVE); + redis_handle_info(rn->name, rr->str, "total_bytes", "input", + "total_net_input_bytes", DS_TYPE_DERIVE); + redis_handle_info(rn->name, rr->str, "total_bytes", "output", + "total_net_output_bytes", DS_TYPE_DERIVE); + + redis_db_stats(rn->name, rr->str); + + freeReplyObject(rr); +} /* void redis_read_server_info */ + static int redis_read(void) /* {{{ */ { for (redis_node_t *rn = nodes_head; rn != NULL; rn = rn->next) { - redisContext *rh; - redisReply *rr; - DEBUG("redis plugin: querying info from node `%s' (%s:%d).", rn->name, rn->host, rn->port); - rh = redisConnectWithTimeout((char *)rn->host, rn->port, rn->timeout); - if (rh == NULL) { - ERROR("redis plugin: unable to connect to node `%s' (%s:%d).", rn->name, - rn->host, rn->port); - continue; - } + redis_check_connection(rn); - if (strlen(rn->passwd) > 0) { - DEBUG("redis plugin: authenticating node `%s' passwd(%s).", rn->name, - rn->passwd); - - if ((rr = redisCommand(rh, "AUTH %s", rn->passwd)) == NULL) { - WARNING("redis plugin: unable to authenticate on node `%s'.", rn->name); - goto redis_fail; - } + if (!rn->redisContext) /* no connection */ + continue; - if (rr->type != REDIS_REPLY_STATUS) { - WARNING("redis plugin: invalid authentication on node `%s'.", rn->name); - goto redis_fail; - } + redis_read_server_info(rn); - freeReplyObject(rr); - } + if (!rn->redisContext) /* connection lost */ + continue; - if ((rr = redisCommand(rh, "INFO")) == NULL) { - WARNING("redis plugin: unable to get info from node `%s'.", rn->name); - goto redis_fail; + for (redis_query_t *rq = rn->queries; rq != NULL; rq = rq->next) { + redis_handle_query(rn, rq); + if (!rn->redisContext) /* connection lost */ + break; } - - redis_handle_info(rn->name, rr->str, "uptime", NULL, "uptime_in_seconds", - DS_TYPE_GAUGE); - redis_handle_info(rn->name, rr->str, "current_connections", "clients", - "connected_clients", DS_TYPE_GAUGE); - redis_handle_info(rn->name, rr->str, "blocked_clients", NULL, - "blocked_clients", DS_TYPE_GAUGE); - redis_handle_info(rn->name, rr->str, "memory", NULL, "used_memory", - DS_TYPE_GAUGE); - redis_handle_info(rn->name, rr->str, "memory_lua", NULL, "used_memory_lua", - DS_TYPE_GAUGE); - /* changes_since_last_save: Deprecated in redis version 2.6 and above */ - redis_handle_info(rn->name, rr->str, "volatile_changes", NULL, - "changes_since_last_save", DS_TYPE_GAUGE); - redis_handle_info(rn->name, rr->str, "total_connections", NULL, - "total_connections_received", DS_TYPE_DERIVE); - redis_handle_info(rn->name, rr->str, "total_operations", NULL, - "total_commands_processed", DS_TYPE_DERIVE); - redis_handle_info(rn->name, rr->str, "operations_per_second", NULL, - "instantaneous_ops_per_sec", DS_TYPE_GAUGE); - redis_handle_info(rn->name, rr->str, "expired_keys", NULL, "expired_keys", - DS_TYPE_DERIVE); - redis_handle_info(rn->name, rr->str, "evicted_keys", NULL, "evicted_keys", - DS_TYPE_DERIVE); - redis_handle_info(rn->name, rr->str, "pubsub", "channels", - "pubsub_channels", DS_TYPE_GAUGE); - redis_handle_info(rn->name, rr->str, "pubsub", "patterns", - "pubsub_patterns", DS_TYPE_GAUGE); - redis_handle_info(rn->name, rr->str, "current_connections", "slaves", - "connected_slaves", DS_TYPE_GAUGE); - redis_handle_info(rn->name, rr->str, "cache_result", "hits", - "keyspace_hits", DS_TYPE_DERIVE); - redis_handle_info(rn->name, rr->str, "cache_result", "misses", - "keyspace_misses", DS_TYPE_DERIVE); - redis_handle_info(rn->name, rr->str, "total_bytes", "input", - "total_net_input_bytes", DS_TYPE_DERIVE); - redis_handle_info(rn->name, rr->str, "total_bytes", "output", - "total_net_output_bytes", DS_TYPE_DERIVE); - - redis_db_stats(rn->name, rr->str); - - for (redis_query_t *rq = rn->queries; rq != NULL; rq = rq->next) - redis_handle_query(rh, rn, rq); - - redis_fail: - if (rr != NULL) - freeReplyObject(rr); - redisFree(rh); } return 0;