X-Git-Url: https://git.octo.it/?a=blobdiff_plain;f=src%2Fredis.c;h=7864ead07c01c1edf079cecfb9c79474470cb277;hb=0003c4d3c184f0f437499d6073cd023dc7b659c2;hp=85a8354629240553f7e60a65ae0c5cb72446ae29;hpb=c7b0f0b0267f2ab7c84eea8306f0a516f5b8f769;p=collectd.git diff --git a/src/redis.c b/src/redis.c index 85a83546..7864ead0 100644 --- a/src/redis.c +++ b/src/redis.c @@ -26,7 +26,8 @@ #include "configfile.h" #include -#include +#include +#include #ifndef HOST_NAME_MAX # define HOST_NAME_MAX _POSIX_HOST_NAME_MAX @@ -37,6 +38,9 @@ #define REDIS_DEF_PORT 6379 #define REDIS_DEF_TIMEOUT 2000 #define MAX_REDIS_NODE_NAME 64 +#define MAX_REDIS_PASSWD_LENGTH 512 +#define MAX_REDIS_VAL_SIZE 256 +#define MAX_REDIS_QUERY 2048 /* Redis plugin configuration example: * @@ -44,20 +48,32 @@ * * Host "localhost" * Port "6379" - * Timeout 2000 + * Timeout 2 + * Password "foobar" * * */ +struct redis_query_s; +typedef struct redis_query_s redis_query_t; +struct redis_query_s +{ + char query[MAX_REDIS_QUERY]; + char type[DATA_MAX_NAME_LEN]; + char instance[DATA_MAX_NAME_LEN]; + redis_query_t *next; +}; + struct redis_node_s; typedef struct redis_node_s redis_node_t; struct redis_node_s { char name[MAX_REDIS_NODE_NAME]; char host[HOST_NAME_MAX]; - char passwd[HOST_NAME_MAX]; + char passwd[MAX_REDIS_PASSWD_LENGTH]; int port; - int timeout; + struct timeval timeout; + redis_query_t *queries; redis_node_t *next; }; @@ -106,16 +122,61 @@ static int redis_node_add (const redis_node_t *rn) /* {{{ */ return (0); } /* }}} */ +static redis_query_t *redis_config_query (oconfig_item_t *ci) /* {{{ */ +{ + redis_query_t *rq; + int status; + int i; + + rq = calloc(1, sizeof(*rq)); + if (rq == NULL) { + ERROR("redis plugin: calloca failed adding redis_query."); + return NULL; + } + status = cf_util_get_string_buffer(ci, rq->query, sizeof(rq->query)); + if (status != 0) + goto err; + + /* + * Default to a gauge type. + */ + (void)strncpy(rq->type, "gauge", sizeof(rq->type)); + (void)strncpy(rq->instance, rq->query, sizeof(rq->instance)); + replace_special(rq->instance, sizeof(rq->instance)); + + for (i = 0; i < ci->children_num; i++) { + oconfig_item_t *option = ci->children + i; + + if (strcasecmp("Type", option->key) == 0) { + status = cf_util_get_string_buffer(option, rq->type, sizeof(rq->type)); + } else if (strcasecmp("Instance", option->key) == 0) { + status = cf_util_get_string_buffer(option, rq->instance, sizeof(rq->instance)); + } else { + WARNING("redis plugin: unknown configuration option: %s", option->key); + status = -1; + } + if (status != 0) + goto err; + } + return rq; + err: + free(rq); + return NULL; +} /* }}} */ + static int redis_config_node (oconfig_item_t *ci) /* {{{ */ { redis_node_t rn; + redis_query_t *rq; int i; int status; + int timeout; memset (&rn, 0, sizeof (rn)); sstrncpy (rn.host, REDIS_DEF_HOST, sizeof (rn.host)); rn.port = REDIS_DEF_PORT; - rn.timeout = REDIS_DEF_TIMEOUT; + rn.timeout.tv_usec = REDIS_DEF_TIMEOUT; + rn.queries = NULL; status = cf_util_get_string_buffer (ci, rn.name, sizeof (rn.name)); if (status != 0) @@ -136,8 +197,21 @@ static int redis_config_node (oconfig_item_t *ci) /* {{{ */ status = 0; } } + else if (strcasecmp ("Query", option->key) == 0) + { + rq = redis_config_query(option); + if (rq == NULL) { + status =1; + } else { + rq->next = rn.queries; + rn.queries = rq; + } + } else if (strcasecmp ("Timeout", option->key) == 0) - status = cf_util_get_int (option, &rn.timeout); + { + status = cf_util_get_int (option, &timeout); + if (status == 0) rn.timeout.tv_usec = timeout; + } else if (strcasecmp ("Password", option->key) == 0) status = cf_util_get_string_buffer (option, rn.passwd, sizeof (rn.passwd)); else @@ -179,39 +253,14 @@ static int redis_config (oconfig_item_t *ci) /* {{{ */ } /* }}} */ __attribute__ ((nonnull(2))) -static void redis_submit_g (char *plugin_instance, - const char *type, const char *type_instance, - gauge_t value) /* {{{ */ -{ - value_t values[1]; - value_list_t vl = VALUE_LIST_INIT; - - values[0].gauge = value; - - vl.values = values; - vl.values_len = 1; - sstrncpy (vl.host, hostname_g, sizeof (vl.host)); - sstrncpy (vl.plugin, "redis", sizeof (vl.plugin)); - if (plugin_instance != NULL) - sstrncpy (vl.plugin_instance, plugin_instance, - sizeof (vl.plugin_instance)); - sstrncpy (vl.type, type, sizeof (vl.type)); - if (type_instance != NULL) - sstrncpy (vl.type_instance, type_instance, - sizeof (vl.type_instance)); - - plugin_dispatch_values (&vl); -} /* }}} */ - - __attribute__ ((nonnull(2))) -static void redis_submit_d (char *plugin_instance, +static void redis_submit (char *plugin_instance, const char *type, const char *type_instance, - derive_t value) /* {{{ */ + value_t value) /* {{{ */ { value_t values[1]; value_list_t vl = VALUE_LIST_INIT; - values[0].derive = value; + values[0] = value; vl.values = values; vl.values_len = 1; @@ -230,8 +279,14 @@ static void redis_submit_d (char *plugin_instance, static int redis_init (void) /* {{{ */ { - redis_node_t rn = { "default", REDIS_DEF_HOST, REDIS_DEF_PASSWD, - REDIS_DEF_PORT, REDIS_DEF_TIMEOUT, /* next = */ NULL }; + redis_node_t rn = { + .name = "default", + .host = REDIS_DEF_HOST, + .port = REDIS_DEF_PORT, + .timeout.tv_sec = 0, + .timeout.tv_usec = REDIS_DEF_TIMEOUT, + .next = NULL +}; if (nodes_head == NULL) redis_node_add (&rn); @@ -239,20 +294,103 @@ static int redis_init (void) /* {{{ */ return (0); } /* }}} int redis_init */ +int redis_handle_info (char *node, char const *info_line, char const *type, char const *type_instance, char const *field_name, int ds_type) /* {{{ */ +{ + char *str = strstr (info_line, field_name); + static char buf[MAX_REDIS_VAL_SIZE]; + value_t val; + if (str) + { + int i; + + str += strlen (field_name) + 1; /* also skip the ':' */ + for(i=0;(*str && (isdigit(*str) || *str == '.'));i++,str++) + buf[i] = *str; + buf[i] ='\0'; + + if(parse_value (buf, &val, ds_type) == -1) + { + WARNING ("redis plugin: Unable to parse field `%s'.", field_name); + return (-1); + } + + redis_submit (node, type, type_instance, val); + return (0); + } + return (-1); + +} /* }}} int redis_handle_info */ + +int redis_handle_query (redisContext *rh, redis_node_t *rn, redis_query_t *rq) /* {{{ */ +{ + redisReply *rr; + const data_set_t *ds; + value_t val; + + ds = plugin_get_ds (rq->type); + if (!ds) { + ERROR ("redis plugin: DataSet `%s' not defined.", rq->type); + return (-1); + } + + if (ds->ds_num != 1) { + ERROR ("redis plugin: DS `%s' has too many types.", rq->type); + return (-1); + } + + if ((rr = redisCommand(rh, rq->query)) == NULL) { + WARNING("redis plugin: unable to carry out query `%s'.", rq->query); + return (-1); + } + + switch (rr->type) { + case REDIS_REPLY_INTEGER: + switch (ds->ds[0].type) { + case DS_TYPE_COUNTER: + val.counter = (counter_t)rr->integer; + break; + case DS_TYPE_GAUGE: + val.gauge = (gauge_t)rr->integer; + break; + case DS_TYPE_DERIVE: + val.gauge = (derive_t)rr->integer; + break; + case DS_TYPE_ABSOLUTE: + val.gauge = (absolute_t)rr->integer; + break; + } + break; + case REDIS_REPLY_STRING: + if (parse_value (rr->str, &val, ds->ds[0].type) == -1) { + WARNING("redis plugin: Unable to parse field `%s'.", rq->type); + freeReplyObject (rr); + return (-1); + } + break; + default: + WARNING("redis plugin: Cannot coerce redis type."); + freeReplyObject(rr); + return (-1); + } + + redis_submit(rn->name, rq->type, (strlen(rq->instance) >0)?rq->instance:NULL, val); + freeReplyObject (rr); + return 0; +} /* }}} int redis_handle_info */ + static int redis_read (void) /* {{{ */ { redis_node_t *rn; + redis_query_t *rq; for (rn = nodes_head; rn != NULL; rn = rn->next) { - REDIS rh; - REDIS_INFO info; - - int status; + redisContext *rh; + redisReply *rr; DEBUG ("redis plugin: querying info from node `%s' (%s:%d).", rn->name, rn->host, rn->port); - rh = credis_connect (rn->host, rn->port, rn->timeout); + rh = redisConnectWithTimeout ((char *)rn->host, rn->port, rn->timeout); if (rh == NULL) { ERROR ("redis plugin: unable to connect to node `%s' (%s:%d).", rn->name, rn->host, rn->port); @@ -262,56 +400,46 @@ static int redis_read (void) /* {{{ */ if (strlen (rn->passwd) > 0) { DEBUG ("redis plugin: authenticanting node `%s' passwd(%s).", rn->name, rn->passwd); - status = credis_auth(rh, rn->passwd); - if (status != 0) + rr = redisCommand (rh, "AUTH %s", rn->passwd); + + if (rr == NULL || rr->type != REDIS_REPLY_STATUS) { WARNING ("redis plugin: unable to authenticate on node `%s'.", rn->name); - credis_close (rh); + if (rr != NULL) + freeReplyObject (rr); + + redisFree (rh); continue; } } - memset (&info, 0, sizeof (info)); - status = credis_info (rh, &info); - if (status != 0) + if ((rr = redisCommand(rh, "INFO")) == NULL) { - WARNING ("redis plugin: unable to get info from node `%s'.", rn->name); - credis_close (rh); + WARNING ("redis plugin: unable to connect to node `%s'.", rn->name); + redisFree (rh); continue; } - /* typedef struct _cr_info { - * char redis_version[CREDIS_VERSION_STRING_SIZE]; - * int bgsave_in_progress; - * int connected_clients; - * int connected_slaves; - * unsigned int used_memory; - * long long changes_since_last_save; - * int last_save_time; - * long long total_connections_received; - * long long total_commands_processed; - * int uptime_in_seconds; - * int uptime_in_days; - * int role; - * } REDIS_INFO; */ - - DEBUG ("redis plugin: received info from node `%s': connected_clients = %d; " - "connected_slaves = %d; used_memory = %lu; changes_since_last_save = %lld; " - "bgsave_in_progress = %d; total_connections_received = %lld; " - "total_commands_processed = %lld; uptime_in_seconds = %ld", rn->name, - info.connected_clients, info.connected_slaves, info.used_memory, - info.changes_since_last_save, info.bgsave_in_progress, - info.total_connections_received, info.total_commands_processed, - info.uptime_in_seconds); - - redis_submit_g (rn->name, "current_connections", "clients", info.connected_clients); - redis_submit_g (rn->name, "current_connections", "slaves", info.connected_slaves); - redis_submit_g (rn->name, "memory", "used", info.used_memory); - redis_submit_g (rn->name, "volatile_changes", NULL, info.changes_since_last_save); - redis_submit_d (rn->name, "total_connections", NULL, info.total_connections_received); - redis_submit_d (rn->name, "total_operations", NULL, info.total_commands_processed); - - credis_close (rh); + redis_handle_info (rn->name, rr->str, "uptime", NULL, "uptime_in_seconds", DS_TYPE_GAUGE); + redis_handle_info (rn->name, rr->str, "current_connections", "clients", "connected_clients", DS_TYPE_GAUGE); + redis_handle_info (rn->name, rr->str, "blocked_clients", NULL, "blocked_clients", DS_TYPE_GAUGE); + redis_handle_info (rn->name, rr->str, "memory", NULL, "used_memory", DS_TYPE_GAUGE); + redis_handle_info (rn->name, rr->str, "memory_lua", NULL, "used_memory_lua", DS_TYPE_GAUGE); + /* changes_since_last_save: Deprecated in redis version 2.6 and above */ + redis_handle_info (rn->name, rr->str, "volatile_changes", NULL, "changes_since_last_save", DS_TYPE_GAUGE); + redis_handle_info (rn->name, rr->str, "total_connections", NULL, "total_connections_received", DS_TYPE_DERIVE); + redis_handle_info (rn->name, rr->str, "total_operations", NULL, "total_commands_processed", DS_TYPE_DERIVE); + redis_handle_info (rn->name, rr->str, "expired_keys", NULL, "expired_keys", DS_TYPE_GAUGE); + redis_handle_info (rn->name, rr->str, "pubsub", "channels", "pubsub_channels", DS_TYPE_GAUGE); + redis_handle_info (rn->name, rr->str, "pubsub", "patterns", "pubsub_patterns", DS_TYPE_GAUGE); + redis_handle_info (rn->name, rr->str, "current_connections", "slaves", "connected_slaves", DS_TYPE_GAUGE); + + freeReplyObject (rr); + + for (rq = rn->queries; rq != NULL; rq = rq->next) + redis_handle_query(rh, rn, rq); + + redisFree (rh); } return 0;