X-Git-Url: https://git.octo.it/?a=blobdiff_plain;f=src%2Fredis.c;h=86062d9c5d5816063462ed6b5133d6c8856e34d5;hb=441e067a2d5a294517bd87ca45c87fd67377d2cf;hp=0ccc8ba80209657ceeec1fdc082fd0150a5303e0;hpb=d6ef3dad2f3dc9d7c35a9bbb563e52c3f0c1ec7c;p=collectd.git diff --git a/src/redis.c b/src/redis.c index 0ccc8ba8..919a8312 100644 --- a/src/redis.c +++ b/src/redis.c @@ -24,188 +24,243 @@ #include "common.h" #include "plugin.h" #include "configfile.h" -#include "utils_avltree.h" #include -#include +#include +#include -#define REDIS_DEF_HOST "127.0.0.1" -#define REDIS_DEF_PORT 6379 +#ifndef HOST_NAME_MAX +# define HOST_NAME_MAX _POSIX_HOST_NAME_MAX +#endif + +#define REDIS_DEF_HOST "localhost" +#define REDIS_DEF_PASSWD "" +#define REDIS_DEF_PORT 6379 +#define REDIS_DEF_TIMEOUT 2000 #define MAX_REDIS_NODE_NAME 64 +#define MAX_REDIS_PASSWD_LENGTH 512 +#define MAX_REDIS_VAL_SIZE 256 +#define MAX_REDIS_QUERY 2048 /* Redis plugin configuration example: * * - * - * Host localhost - * Port 6379 - * Timeout 2000 + * + * Host "localhost" + * Port "6379" + * Timeout 2 + * Password "foobar" * * */ -static c_avl_tree_t *redis_tree = NULL; -static pthread_mutex_t redis_lock = PTHREAD_MUTEX_INITIALIZER; - -typedef struct redis_node_s { +struct redis_query_s; +typedef struct redis_query_s redis_query_t; +struct redis_query_s +{ + char query[MAX_REDIS_QUERY]; + char type[DATA_MAX_NAME_LEN]; + char instance[DATA_MAX_NAME_LEN]; + redis_query_t *next; +}; + +struct redis_node_s; +typedef struct redis_node_s redis_node_t; +struct redis_node_s +{ char name[MAX_REDIS_NODE_NAME]; char host[HOST_NAME_MAX]; + char passwd[MAX_REDIS_PASSWD_LENGTH]; int port; - int timeout; -} redis_node_t; + struct timeval timeout; + redis_query_t *queries; -static int redis_config_node (redis_node_t *rn, oconfig_item_t *ci) /* {{{ */ + redis_node_t *next; +}; + +static redis_node_t *nodes_head = NULL; + +static int redis_node_add (const redis_node_t *rn) /* {{{ */ { - int i; - int status = 0; + redis_node_t *rn_copy; + redis_node_t *rn_ptr; + + /* Check for duplicates first */ + for (rn_ptr = nodes_head; rn_ptr != NULL; rn_ptr = rn_ptr->next) + if (strcmp (rn->name, rn_ptr->name) == 0) + break; - if ((ci->values_num != 1) - || (ci->values[0].type != OCONFIG_TYPE_STRING)) + if (rn_ptr != NULL) { - WARNING ("redis plugin: The `Node' block needs exactly one string " - "argument."); + ERROR ("redis plugin: A node with the name `%s' already exists.", + rn->name); return (-1); } - if (ci->children_num < 1) + rn_copy = malloc (sizeof (*rn_copy)); + if (rn_copy == NULL) { - WARNING ("redis plugin: The `Node' block needs at least one option."); + ERROR ("redis plugin: malloc failed adding redis_node to the tree."); return (-1); } - sstrncpy (rn->name, ci->values[0].value.string, sizeof (rn->name)); - - for (i = 0; i < ci->children_num; i++) - { - oconfig_item_t *option = ci->children + i; - status = 0; + memcpy (rn_copy, rn, sizeof (*rn_copy)); + rn_copy->next = NULL; - if (strcasecmp ("Host", option->key) == 0) - status = cf_util_get_string_buffer (option, rn->host, sizeof (rn->host)); - else if (strcasecmp ("Port", option->key) == 0) - rn->port = cf_util_get_port_number (option); - else if (strcasecmp ("Timeout", option->key) == 0) - status = cf_util_get_int (option, &rn->timeout); - else - { - WARNING ("redis plugin: Option `%s' not allowed inside a `Node' " - "block.", option->key); - status = -1; - } + DEBUG ("redis plugin: Adding node \"%s\".", rn->name); - if (status != 0) - break; + if (nodes_head == NULL) + nodes_head = rn_copy; + else + { + rn_ptr = nodes_head; + while (rn_ptr->next != NULL) + rn_ptr = rn_ptr->next; + rn_ptr->next = rn_copy; } - return (status); + return (0); } /* }}} */ -static redis_node_t *redis_node_get (const char *name, redis_node_t *rn) /* {{{ */ +static redis_query_t *redis_config_query (oconfig_item_t *ci) /* {{{ */ { - if (c_avl_get (redis_tree, name, (void *) rn) == 0) - return (rn); - else - return (NULL); + redis_query_t *rq; + int status; + int i; + + rq = calloc(1, sizeof(*rq)); + if (rq == NULL) { + ERROR("redis plugin: calloc failed adding redis_query."); + return NULL; + } + status = cf_util_get_string_buffer(ci, rq->query, sizeof(rq->query)); + if (status != 0) + goto err; + + /* + * Default to a gauge type. + */ + (void)strncpy(rq->type, "gauge", sizeof(rq->type)); + (void)sstrncpy(rq->instance, rq->query, sizeof(rq->instance)); + replace_special(rq->instance, sizeof(rq->instance)); + + for (i = 0; i < ci->children_num; i++) { + oconfig_item_t *option = ci->children + i; + + if (strcasecmp("Type", option->key) == 0) { + status = cf_util_get_string_buffer(option, rq->type, sizeof(rq->type)); + } else if (strcasecmp("Instance", option->key) == 0) { + status = cf_util_get_string_buffer(option, rq->instance, sizeof(rq->instance)); + } else { + WARNING("redis plugin: unknown configuration option: %s", option->key); + status = -1; + } + if (status != 0) + goto err; + } + return rq; + err: + free(rq); + return NULL; } /* }}} */ -static int redis_node_add (const redis_node_t *rn) /* {{{ */ +static int redis_config_node (oconfig_item_t *ci) /* {{{ */ { + redis_node_t rn; + redis_query_t *rq; + int i; int status; - redis_node_t *rn_copy = NULL; - redis_node_t *rn_ptr; - redis_node_t rn_get; - - rn_copy = (redis_node_t *) malloc (sizeof (redis_node_t)); - if (rn_copy == NULL) - { - sfree (rn_copy); - ERROR ("redis plugin: malloc failed adding redis_node to the tree."); - return (-1); - } - memcpy (rn_copy, rn, sizeof (redis_node_t)); - if (*rn_copy->name == '\0') - { - (void) strncpy(rn_copy->name, "default", sizeof (rn_copy->name)); /* in theory never fails */ - } + int timeout; - DEBUG ("redis plugin: adding entry `%s' to the tree.", rn_copy->name); + memset (&rn, 0, sizeof (rn)); + sstrncpy (rn.host, REDIS_DEF_HOST, sizeof (rn.host)); + rn.port = REDIS_DEF_PORT; + rn.timeout.tv_usec = REDIS_DEF_TIMEOUT; + rn.queries = NULL; - pthread_mutex_lock (&redis_lock); + status = cf_util_get_string_buffer (ci, rn.name, sizeof (rn.name)); + if (status != 0) + return (status); - if ( (rn_ptr = redis_node_get (rn_copy->name, &rn_get)) != NULL ) + for (i = 0; i < ci->children_num; i++) { - WARNING ("redis plugin: the node `%s' override a previous node with same node.", rn_copy->name); - } + oconfig_item_t *option = ci->children + i; + + if (strcasecmp ("Host", option->key) == 0) + status = cf_util_get_string_buffer (option, rn.host, sizeof (rn.host)); + else if (strcasecmp ("Port", option->key) == 0) + { + status = cf_util_get_port_number (option); + if (status > 0) + { + rn.port = status; + status = 0; + } + } + else if (strcasecmp ("Query", option->key) == 0) + { + rq = redis_config_query(option); + if (rq == NULL) { + status =1; + } else { + rq->next = rn.queries; + rn.queries = rq; + } + } + else if (strcasecmp ("Timeout", option->key) == 0) + { + status = cf_util_get_int (option, &timeout); + if (status == 0) rn.timeout.tv_usec = timeout; + } + else if (strcasecmp ("Password", option->key) == 0) + status = cf_util_get_string_buffer (option, rn.passwd, sizeof (rn.passwd)); + else + WARNING ("redis plugin: Option `%s' not allowed inside a `Node' " + "block. I'll ignore this option.", option->key); - status = c_avl_insert (redis_tree, rn_copy->name, rn_copy); - pthread_mutex_unlock (&redis_lock); + if (status != 0) + break; + } if (status != 0) - { - ERROR ("redis plugin: c_avl_insert (%s) failed adding noew node.", rn_copy->name); - sfree (rn_copy); - return (-1); - } + return (status); - return (status); -} /* }}} */ + return (redis_node_add (&rn)); +} /* }}} int redis_config_node */ static int redis_config (oconfig_item_t *ci) /* {{{ */ { - int status; int i; - redis_node_t rn = { - .name = "", - .host = "", - .port = REDIS_DEF_PORT, - .timeout = 2000 - }; - - if (redis_tree == NULL) - { - redis_tree = c_avl_create ((void *) strcmp); - if (redis_tree == NULL) - { - ERROR ("redis plugin: c_avl_create failed reading config."); - return (-1); - } - } - - status = 0; for (i = 0; i < ci->children_num; i++) { oconfig_item_t *option = ci->children + i; if (strcasecmp ("Node", option->key) == 0) - { - if ( (status = redis_config_node (&rn, option)) == 0 ) - status = redis_node_add (&rn); - } + redis_config_node (option); else - { WARNING ("redis plugin: Option `%s' not allowed in redis" - " configuration.", option->key); - status = -1; - } - + " configuration. It will be ignored.", option->key); + } - if (status != 0) - break; + if (nodes_head == NULL) + { + ERROR ("redis plugin: No valid node configuration could be found."); + return (ENOENT); } - return (status); + return (0); } /* }}} */ __attribute__ ((nonnull(2))) -static void redis_submit_g (char *plugin_instance, +static void redis_submit (char *plugin_instance, const char *type, const char *type_instance, - gauge_t value) /* {{{ */ + value_t value) /* {{{ */ { value_t values[1]; value_list_t vl = VALUE_LIST_INIT; - values[0].gauge = value; + values[0] = value; vl.values = values; vl.values_len = 1; @@ -222,107 +277,177 @@ static void redis_submit_g (char *plugin_instance, plugin_dispatch_values (&vl); } /* }}} */ - __attribute__ ((nonnull(2))) -static void redis_submit_c (char *plugin_instance, - const char *type, const char *type_instance, - counter_t value) /* {{{ */ +static int redis_init (void) /* {{{ */ { - value_t values[1]; - value_list_t vl = VALUE_LIST_INIT; + redis_node_t rn = { + .name = "default", + .host = REDIS_DEF_HOST, + .port = REDIS_DEF_PORT, + .timeout.tv_sec = 0, + .timeout.tv_usec = REDIS_DEF_TIMEOUT, + .next = NULL +}; - values[0].counter = value; + if (nodes_head == NULL) + redis_node_add (&rn); - vl.values = values; - vl.values_len = 1; - sstrncpy (vl.host, hostname_g, sizeof (vl.host)); - sstrncpy (vl.plugin, "redis", sizeof (vl.plugin)); - if (plugin_instance != NULL) - sstrncpy (vl.plugin_instance, plugin_instance, - sizeof (vl.plugin_instance)); - sstrncpy (vl.type, type, sizeof (vl.type)); - if (type_instance != NULL) - sstrncpy (vl.type_instance, type_instance, - sizeof (vl.type_instance)); + return (0); +} /* }}} int redis_init */ - plugin_dispatch_values (&vl); -} /* }}} */ +static int redis_handle_info (char *node, char const *info_line, char const *type, char const *type_instance, char const *field_name, int ds_type) /* {{{ */ +{ + char *str = strstr (info_line, field_name); + static char buf[MAX_REDIS_VAL_SIZE]; + value_t val; + if (str) + { + int i; -static int redis_read (void) /* {{{ */ + str += strlen (field_name) + 1; /* also skip the ':' */ + for(i=0;(*str && (isdigit((unsigned char)*str) || *str == '.'));i++,str++) + buf[i] = *str; + buf[i] ='\0'; + + if(parse_value (buf, &val, ds_type) == -1) + { + WARNING ("redis plugin: Unable to parse field `%s'.", field_name); + return (-1); + } + + redis_submit (node, type, type_instance, val); + return (0); + } + return (-1); + +} /* }}} int redis_handle_info */ + +static int redis_handle_query (redisContext *rh, redis_node_t *rn, redis_query_t *rq) /* {{{ */ { - REDIS rh; - REDIS_INFO info; + redisReply *rr; + const data_set_t *ds; + value_t val; + + ds = plugin_get_ds (rq->type); + if (!ds) { + ERROR ("redis plugin: DataSet `%s' not defined.", rq->type); + return (-1); + } - char key[64]; - int status; - c_avl_iterator_t *iter; + if (ds->ds_num != 1) { + ERROR ("redis plugin: DS `%s' has too many types.", rq->type); + return (-1); + } + + if ((rr = redisCommand(rh, rq->query)) == NULL) { + WARNING("redis plugin: unable to carry out query `%s'.", rq->query); + return (-1); + } + + switch (rr->type) { + case REDIS_REPLY_INTEGER: + switch (ds->ds[0].type) { + case DS_TYPE_COUNTER: + val.counter = (counter_t)rr->integer; + break; + case DS_TYPE_GAUGE: + val.gauge = (gauge_t)rr->integer; + break; + case DS_TYPE_DERIVE: + val.gauge = (derive_t)rr->integer; + break; + case DS_TYPE_ABSOLUTE: + val.gauge = (absolute_t)rr->integer; + break; + } + break; + case REDIS_REPLY_STRING: + if (parse_value (rr->str, &val, ds->ds[0].type) == -1) { + WARNING("redis plugin: Unable to parse field `%s'.", rq->type); + freeReplyObject (rr); + return (-1); + } + break; + default: + WARNING("redis plugin: Cannot coerce redis type."); + freeReplyObject(rr); + return (-1); + } + + redis_submit(rn->name, rq->type, (strlen(rq->instance) >0)?rq->instance:NULL, val); + freeReplyObject (rr); + return 0; +} /* }}} int redis_handle_query */ + +static int redis_read (void) /* {{{ */ +{ redis_node_t *rn; + redis_query_t *rq; - status = -1; - if ( (iter = c_avl_get_iterator (redis_tree)) == NULL ) + for (rn = nodes_head; rn != NULL; rn = rn->next) { - ERROR ("redis plugin: unable to iterate redis tree."); - return (-1); - } + redisContext *rh; + redisReply *rr; - while (c_avl_iterator_next (iter, (void *) &key, (void *) &rn) == 0) - { DEBUG ("redis plugin: querying info from node `%s' (%s:%d).", rn->name, rn->host, rn->port); - if ( (rh = credis_connect (rn->host, rn->port, rn->timeout)) == NULL ) + rh = redisConnectWithTimeout ((char *)rn->host, rn->port, rn->timeout); + if (rh == NULL) { ERROR ("redis plugin: unable to connect to node `%s' (%s:%d).", rn->name, rn->host, rn->port); - status = -1; - break; + continue; } - if ( (status = credis_info (rh, &info)) == -1 ) + if (strlen (rn->passwd) > 0) { - WARNING ("redis plugin: unable to get info from node `%s'.", rn->name); - credis_close (rh); - break; + DEBUG ("redis plugin: authenticating node `%s' passwd(%s).", rn->name, rn->passwd); + + if ((rr = redisCommand (rh, "AUTH %s", rn->passwd)) == NULL) + { + WARNING ("redis plugin: unable to authenticate on node `%s'.", rn->name); + goto redis_fail; + } + + if (rr->type != REDIS_REPLY_STATUS) + { + WARNING ("redis plugin: invalid authentication on node `%s'.", rn->name); + goto redis_fail; + } + + freeReplyObject (rr); } - /* typedef struct _cr_info { - * char redis_version[CREDIS_VERSION_STRING_SIZE]; - * int bgsave_in_progress; - * int connected_clients; - * int connected_slaves; - * unsigned int used_memory; - * long long changes_since_last_save; - * int last_save_time; - * long long total_connections_received; - * long long total_commands_processed; - * int uptime_in_seconds; - * int uptime_in_days; - * int role; - * } REDIS_INFO; */ - - DEBUG ("redis plugin: received info from node `%s': connected_clients = %d; " - "connected_slaves = %d; used_memory = %lu; changes_since_last_save = %lld; " - "bgsave_in_progress = %d; total_connections_received = %lld; " - "total_commands_processed = %lld; uptime_in_seconds = %ld", rn->name, - info.connected_clients, info.connected_slaves, info.used_memory, - info.changes_since_last_save, info.bgsave_in_progress, - info.total_connections_received, info.total_commands_processed, - info.uptime_in_seconds); - - redis_submit_g (rn->name, "connected_clients", NULL, info.connected_clients); - redis_submit_g (rn->name, "connected_slaves", NULL, info.connected_slaves); - redis_submit_g (rn->name, "used_memory", NULL, info.used_memory); - redis_submit_g (rn->name, "changes_since_last_save", NULL, info.changes_since_last_save); - redis_submit_g (rn->name, "bgsave_in_progress", NULL, info.bgsave_in_progress); - redis_submit_c (rn->name, "total_connections_received", NULL, info.total_connections_received); - redis_submit_c (rn->name, "total_commands_processed", NULL, info.total_commands_processed); - redis_submit_c (rn->name, "uptime_in_seconds", NULL, info.uptime_in_seconds); - - credis_close (rh); - status = 0; - } + if ((rr = redisCommand(rh, "INFO")) == NULL) + { + WARNING ("redis plugin: unable to get info from node `%s'.", rn->name); + goto redis_fail; + } - c_avl_iterator_destroy(iter); - if ( status != 0 ) - { - return (-1); + redis_handle_info (rn->name, rr->str, "uptime", NULL, "uptime_in_seconds", DS_TYPE_GAUGE); + redis_handle_info (rn->name, rr->str, "current_connections", "clients", "connected_clients", DS_TYPE_GAUGE); + redis_handle_info (rn->name, rr->str, "blocked_clients", NULL, "blocked_clients", DS_TYPE_GAUGE); + redis_handle_info (rn->name, rr->str, "memory", NULL, "used_memory", DS_TYPE_GAUGE); + redis_handle_info (rn->name, rr->str, "memory_lua", NULL, "used_memory_lua", DS_TYPE_GAUGE); + /* changes_since_last_save: Deprecated in redis version 2.6 and above */ + redis_handle_info (rn->name, rr->str, "volatile_changes", NULL, "changes_since_last_save", DS_TYPE_GAUGE); + redis_handle_info (rn->name, rr->str, "total_connections", NULL, "total_connections_received", DS_TYPE_DERIVE); + redis_handle_info (rn->name, rr->str, "total_operations", NULL, "total_commands_processed", DS_TYPE_DERIVE); + redis_handle_info (rn->name, rr->str, "expired_keys", NULL, "expired_keys", DS_TYPE_DERIVE); + redis_handle_info (rn->name, rr->str, "evicted_keys", NULL, "evicted_keys", DS_TYPE_DERIVE); + redis_handle_info (rn->name, rr->str, "pubsub", "channels", "pubsub_channels", DS_TYPE_GAUGE); + redis_handle_info (rn->name, rr->str, "pubsub", "patterns", "pubsub_patterns", DS_TYPE_GAUGE); + redis_handle_info (rn->name, rr->str, "current_connections", "slaves", "connected_slaves", DS_TYPE_GAUGE); + redis_handle_info (rn->name, rr->str, "cache_result", "hits", "keyspace_hits", DS_TYPE_DERIVE); + redis_handle_info (rn->name, rr->str, "cache_result", "misses", "keyspace_misses", DS_TYPE_DERIVE); + redis_handle_info (rn->name, rr->str, "total_bytes", "input", "total_net_input_bytes", DS_TYPE_DERIVE); + redis_handle_info (rn->name, rr->str, "total_bytes", "output", "total_net_output_bytes", DS_TYPE_DERIVE); + + for (rq = rn->queries; rq != NULL; rq = rq->next) + redis_handle_query(rh, rn, rq); + +redis_fail: + if (rr != NULL) + freeReplyObject (rr); + redisFree (rh); } return 0; @@ -332,6 +457,7 @@ static int redis_read (void) /* {{{ */ void module_register (void) /* {{{ */ { plugin_register_complex_config ("redis", redis_config); + plugin_register_init ("redis", redis_init); plugin_register_read ("redis", redis_read); /* TODO: plugin_register_write: one redis list per value id with * X elements */