From 203c25da6fed60b39c31e382e2865c4fcfe3dc05 Mon Sep 17 00:00:00 2001 From: Pavel Rochnyack Date: Sun, 17 Jun 2018 20:49:35 +0700 Subject: [PATCH] redis plugin: Reworked to use plugin_register_complex_read() --- src/collectd.conf.pod | 10 +-- src/redis.c | 191 ++++++++++++++++++++++++++------------------------ 2 files changed, 104 insertions(+), 97 deletions(-) diff --git a/src/collectd.conf.pod b/src/collectd.conf.pod index 31a094d0..093ff765 100644 --- a/src/collectd.conf.pod +++ b/src/collectd.conf.pod @@ -7307,7 +7307,7 @@ Server state is requested by sending I query. The B block identifies a new Redis node, that is a new Redis instance running in an specified host and port. The name for node is a canonical identifier which is used as I. It is limited to -64Echaracters in length. +128Echaracters in length. When no B is configured explicitly, plugin connects to "localhost:6379". @@ -7329,9 +7329,9 @@ Use I to authenticate when connecting to I. =item B I The B option set the socket timeout for node response. Since the Redis -read function is blocking, you should keep this value as low as possible. Keep -in mind that the sum of all B values for all B should be lower -than B defined globally. +read function is blocking, you should keep this value as low as possible. +It is expected what B values should be lower than B defined +globally. Defaults to 2000 (2 seconds). @@ -7353,7 +7353,7 @@ See L for more details on types and their configuration. Within a query definition, an optional type instance to use when submitting the result of the query. When not supplied will default to the escaped -command, up to 64 chars. +command, up to 128 chars. =item B I diff --git a/src/redis.c b/src/redis.c index 9c921060..83cf6f59 100644 --- a/src/redis.c +++ b/src/redis.c @@ -28,17 +28,11 @@ #include #include -#ifndef HOST_NAME_MAX -#define HOST_NAME_MAX _POSIX_HOST_NAME_MAX -#endif - #define REDIS_DEF_HOST "localhost" #define REDIS_DEF_PASSWD "" #define REDIS_DEF_PORT 6379 #define REDIS_DEF_TIMEOUT_SEC 2 #define REDIS_DEF_DB_COUNT 256 -#define MAX_REDIS_NODE_NAME 64 -#define MAX_REDIS_PASSWD_LENGTH 512 #define MAX_REDIS_VAL_SIZE 256 #define MAX_REDIS_QUERY 2048 @@ -68,9 +62,9 @@ struct redis_query_s { struct redis_node_s; typedef struct redis_node_s redis_node_t; struct redis_node_s { - char name[MAX_REDIS_NODE_NAME]; - char host[HOST_NAME_MAX]; - char passwd[MAX_REDIS_PASSWD_LENGTH]; + char *name; + char *host; + char *passwd; int port; struct timeval timeout; redisContext *redisContext; @@ -79,44 +73,46 @@ struct redis_node_s { redis_node_t *next; }; -static redis_node_t *nodes_head; - -static int redis_node_add(const redis_node_t *rn) /* {{{ */ -{ - redis_node_t *rn_copy; - redis_node_t *rn_ptr; - - /* Check for duplicates first */ - for (rn_ptr = nodes_head; rn_ptr != NULL; rn_ptr = rn_ptr->next) - if (strcmp(rn->name, rn_ptr->name) == 0) - break; +static bool redis_have_instances; +static int redis_read(user_data_t *user_data); - if (rn_ptr != NULL) { - ERROR("redis plugin: A node with the name `%s' already exists.", rn->name); - return -1; - } +static void redis_node_free(void *arg) { + redis_node_t *rn = arg; + if (rn == NULL) + return; - rn_copy = malloc(sizeof(*rn_copy)); - if (rn_copy == NULL) { - ERROR("redis plugin: malloc failed adding redis_node to the tree."); - return -1; + redis_query_t *rq = rn->queries; + while (rq != NULL) { + redis_query_t *next = rq->next; + sfree(rq); + rq = next; } - memcpy(rn_copy, rn, sizeof(*rn_copy)); - rn_copy->next = NULL; + redisFree(rn->redisContext); + sfree(rn->name); + sfree(rn->host); + sfree(rn->passwd); + sfree(rn); +} /* void redis_node_free */ +static int redis_node_add(redis_node_t *rn) /* {{{ */ +{ DEBUG("redis plugin: Adding node \"%s\".", rn->name); - if (nodes_head == NULL) - nodes_head = rn_copy; - else { - rn_ptr = nodes_head; - while (rn_ptr->next != NULL) - rn_ptr = rn_ptr->next; - rn_ptr->next = rn_copy; - } + /* Disable automatic generation of default instance in the init callback. */ + redis_have_instances = true; - return 0; + char cb_name[sizeof("redis/") + DATA_MAX_NAME_LEN]; + snprintf(cb_name, sizeof(cb_name), "redis/%s", rn->name); + + return plugin_register_complex_read( + /* group = */ "redis", + /* name = */ cb_name, + /* callback = */ redis_read, + /* interval = */ 0, + &(user_data_t){ + .data = rn, .free_func = redis_node_free, + }); } /* }}} */ static redis_query_t *redis_config_query(oconfig_item_t *ci) /* {{{ */ @@ -172,47 +168,58 @@ err: static int redis_config_node(oconfig_item_t *ci) /* {{{ */ { - redis_query_t *rq; - int status; - int timeout; + redis_node_t *rn = calloc(1, sizeof(*rn)); + if (rn == NULL) { + ERROR("redis plugin: calloc failed adding node."); + return ENOMEM; + } - redis_node_t rn = {.port = REDIS_DEF_PORT, - .timeout.tv_sec = REDIS_DEF_TIMEOUT_SEC}; + rn->port = REDIS_DEF_PORT; + rn->timeout.tv_sec = REDIS_DEF_TIMEOUT_SEC; - sstrncpy(rn.host, REDIS_DEF_HOST, sizeof(rn.host)); + rn->host = strdup(REDIS_DEF_HOST); + if (rn->host == NULL) { + ERROR("redis plugin: strdup failed adding node."); + sfree(rn); + return ENOMEM; + } - status = cf_util_get_string_buffer(ci, rn.name, sizeof(rn.name)); - if (status != 0) + int status = cf_util_get_string(ci, &rn->name); + if (status != 0) { + sfree(rn->host); + sfree(rn); return status; + } for (int i = 0; i < ci->children_num; i++) { oconfig_item_t *option = ci->children + i; if (strcasecmp("Host", option->key) == 0) - status = cf_util_get_string_buffer(option, rn.host, sizeof(rn.host)); + status = cf_util_get_string(option, &rn->host); else if (strcasecmp("Port", option->key) == 0) { status = cf_util_get_port_number(option); if (status > 0) { - rn.port = status; + rn->port = status; status = 0; } } else if (strcasecmp("Query", option->key) == 0) { - rq = redis_config_query(option); + redis_query_t *rq = redis_config_query(option); if (rq == NULL) { status = 1; } else { - rq->next = rn.queries; - rn.queries = rq; + rq->next = rn->queries; + rn->queries = rq; } } else if (strcasecmp("Timeout", option->key) == 0) { + int timeout; status = cf_util_get_int(option, &timeout); if (status == 0) { - rn.timeout.tv_usec = timeout * 1000; - rn.timeout.tv_sec = rn.timeout.tv_usec / 1000000L; - rn.timeout.tv_usec %= 1000000L; + rn->timeout.tv_usec = timeout * 1000; + rn->timeout.tv_sec = rn->timeout.tv_usec / 1000000L; + rn->timeout.tv_usec %= 1000000L; } } else if (strcasecmp("Password", option->key) == 0) - status = cf_util_get_string_buffer(option, rn.passwd, sizeof(rn.passwd)); + status = cf_util_get_string(option, &rn->passwd); else WARNING("redis plugin: Option `%s' not allowed inside a `Node' " "block. I'll ignore this option.", @@ -222,10 +229,12 @@ static int redis_config_node(oconfig_item_t *ci) /* {{{ */ break; } - if (status != 0) + if (status != 0) { + redis_node_free(rn); return status; + } - return redis_node_add(&rn); + return redis_node_add(rn); } /* }}} int redis_config_node */ static int redis_config(oconfig_item_t *ci) /* {{{ */ @@ -241,11 +250,6 @@ static int redis_config(oconfig_item_t *ci) /* {{{ */ option->key); } - if (nodes_head == NULL) { - ERROR("redis plugin: No valid node configuration could be found."); - return ENOENT; - } - return 0; } /* }}} */ @@ -269,16 +273,23 @@ redis_submit(char *plugin_instance, const char *type, const char *type_instance, static int redis_init(void) /* {{{ */ { - redis_node_t rn = {.name = "default", - .host = REDIS_DEF_HOST, - .port = REDIS_DEF_PORT, - .timeout.tv_sec = REDIS_DEF_TIMEOUT_SEC, - .next = NULL}; + if (redis_have_instances) + return 0; - if (nodes_head == NULL) - redis_node_add(&rn); + redis_node_t *rn = calloc(1, sizeof(*rn)); + if (rn == NULL) + return ENOMEM; - return 0; + rn->port = REDIS_DEF_PORT; + rn->timeout.tv_sec = REDIS_DEF_TIMEOUT_SEC; + + rn->name = strdup("default"); + rn->host = strdup(REDIS_DEF_HOST); + + if (rn->name == NULL || rn->host == NULL) + return ENOMEM; + + return redis_node_add(rn); } /* }}} int redis_init */ static void *c_redisCommand(redis_node_t *rn, const char *format, ...) { @@ -449,8 +460,7 @@ static void redis_check_connection(redis_node_t *rn) { if (rn->redisContext) return; - redisContext *rh = - redisConnectWithTimeout((char *)rn->host, rn->port, rn->timeout); + redisContext *rh = redisConnectWithTimeout(rn->host, rn->port, rn->timeout); if (rh == NULL) { ERROR("redis plugin: can't allocate redis context"); @@ -465,7 +475,7 @@ static void redis_check_connection(redis_node_t *rn) { rn->redisContext = rh; - if (strlen(rn->passwd) > 0) { + if (rn->passwd) { redisReply *rr; DEBUG("redis plugin: authenticating node `%s' passwd(%s).", rn->name, @@ -540,27 +550,27 @@ static void redis_read_server_info(redis_node_t *rn) { freeReplyObject(rr); } /* void redis_read_server_info */ -static int redis_read(void) /* {{{ */ +static int redis_read(user_data_t *user_data) /* {{{ */ { - for (redis_node_t *rn = nodes_head; rn != NULL; rn = rn->next) { - DEBUG("redis plugin: querying info from node `%s' (%s:%d).", rn->name, - rn->host, rn->port); + redis_node_t *rn = user_data->data; - redis_check_connection(rn); + DEBUG("redis plugin: querying info from node `%s' (%s:%d).", rn->name, + rn->host, rn->port); - if (!rn->redisContext) /* no connection */ - continue; + redis_check_connection(rn); - redis_read_server_info(rn); + if (!rn->redisContext) /* no connection */ + return -1; - if (!rn->redisContext) /* connection lost */ - continue; + redis_read_server_info(rn); - for (redis_query_t *rq = rn->queries; rq != NULL; rq = rq->next) { - redis_handle_query(rn, rq); - if (!rn->redisContext) /* connection lost */ - break; - } + if (!rn->redisContext) /* connection lost */ + return -1; + + for (redis_query_t *rq = rn->queries; rq != NULL; rq = rq->next) { + redis_handle_query(rn, rq); + if (!rn->redisContext) /* connection lost */ + return -1; } return 0; @@ -571,8 +581,5 @@ void module_register(void) /* {{{ */ { plugin_register_complex_config("redis", redis_config); plugin_register_init("redis", redis_init); - plugin_register_read("redis", redis_read); - /* TODO: plugin_register_write: one redis list per value id with - * X elements */ } /* }}} */ -- 2.11.0