redis plugin: Reworked to use plugin_register_complex_read()
authorPavel Rochnyack <pavel2000@ngs.ru>
Sun, 17 Jun 2018 13:49:35 +0000 (20:49 +0700)
committerPavel Rochnyack <pavel2000@ngs.ru>
Sun, 17 Jun 2018 13:49:35 +0000 (20:49 +0700)
src/collectd.conf.pod
src/redis.c

index 31a094d..093ff76 100644 (file)
@@ -7307,7 +7307,7 @@ Server state is requested by sending I<INFO> query.
 The B<Node> block identifies a new Redis node, that is a new Redis instance
 running in an specified host and port. The name for node is a canonical
 identifier which is used as I<plugin instance>. It is limited to
-64E<nbsp>characters in length.
+128E<nbsp>characters in length.
 
 When no B<Node> is configured explicitly, plugin connects to "localhost:6379".
 
@@ -7329,9 +7329,9 @@ Use I<Password> to authenticate when connecting to I<Redis>.
 =item B<Timeout> I<Milliseconds>
 
 The B<Timeout> option set the socket timeout for node response. Since the Redis
-read function is blocking, you should keep this value as low as possible. Keep
-in mind that the sum of all B<Timeout> values for all B<Nodes> should be lower
-than B<Interval> defined globally.
+read function is blocking, you should keep this value as low as possible.
+It is expected what B<Timeout> values should be lower than B<Interval> defined
+globally.
 
 Defaults to 2000 (2 seconds).
 
@@ -7353,7 +7353,7 @@ See L<types.db(5)> for more details on types and their configuration.
 
 Within a query definition, an optional type instance to use when submitting
 the result of the query. When not supplied will default to the escaped
-command, up to 64 chars.
+command, up to 128 chars.
 
 =item B<Database> I<Index>
 
index 9c92106..83cf6f5 100644 (file)
 #include <hiredis/hiredis.h>
 #include <sys/time.h>
 
-#ifndef HOST_NAME_MAX
-#define HOST_NAME_MAX _POSIX_HOST_NAME_MAX
-#endif
-
 #define REDIS_DEF_HOST "localhost"
 #define REDIS_DEF_PASSWD ""
 #define REDIS_DEF_PORT 6379
 #define REDIS_DEF_TIMEOUT_SEC 2
 #define REDIS_DEF_DB_COUNT 256
-#define MAX_REDIS_NODE_NAME 64
-#define MAX_REDIS_PASSWD_LENGTH 512
 #define MAX_REDIS_VAL_SIZE 256
 #define MAX_REDIS_QUERY 2048
 
@@ -68,9 +62,9 @@ struct redis_query_s {
 struct redis_node_s;
 typedef struct redis_node_s redis_node_t;
 struct redis_node_s {
-  char name[MAX_REDIS_NODE_NAME];
-  char host[HOST_NAME_MAX];
-  char passwd[MAX_REDIS_PASSWD_LENGTH];
+  char *name;
+  char *host;
+  char *passwd;
   int port;
   struct timeval timeout;
   redisContext *redisContext;
@@ -79,44 +73,46 @@ struct redis_node_s {
   redis_node_t *next;
 };
 
-static redis_node_t *nodes_head;
-
-static int redis_node_add(const redis_node_t *rn) /* {{{ */
-{
-  redis_node_t *rn_copy;
-  redis_node_t *rn_ptr;
-
-  /* Check for duplicates first */
-  for (rn_ptr = nodes_head; rn_ptr != NULL; rn_ptr = rn_ptr->next)
-    if (strcmp(rn->name, rn_ptr->name) == 0)
-      break;
+static bool redis_have_instances;
+static int redis_read(user_data_t *user_data);
 
-  if (rn_ptr != NULL) {
-    ERROR("redis plugin: A node with the name `%s' already exists.", rn->name);
-    return -1;
-  }
+static void redis_node_free(void *arg) {
+  redis_node_t *rn = arg;
+  if (rn == NULL)
+    return;
 
-  rn_copy = malloc(sizeof(*rn_copy));
-  if (rn_copy == NULL) {
-    ERROR("redis plugin: malloc failed adding redis_node to the tree.");
-    return -1;
+  redis_query_t *rq = rn->queries;
+  while (rq != NULL) {
+    redis_query_t *next = rq->next;
+    sfree(rq);
+    rq = next;
   }
 
-  memcpy(rn_copy, rn, sizeof(*rn_copy));
-  rn_copy->next = NULL;
+  redisFree(rn->redisContext);
+  sfree(rn->name);
+  sfree(rn->host);
+  sfree(rn->passwd);
+  sfree(rn);
+} /* void redis_node_free */
 
+static int redis_node_add(redis_node_t *rn) /* {{{ */
+{
   DEBUG("redis plugin: Adding node \"%s\".", rn->name);
 
-  if (nodes_head == NULL)
-    nodes_head = rn_copy;
-  else {
-    rn_ptr = nodes_head;
-    while (rn_ptr->next != NULL)
-      rn_ptr = rn_ptr->next;
-    rn_ptr->next = rn_copy;
-  }
+  /* Disable automatic generation of default instance in the init callback. */
+  redis_have_instances = true;
 
-  return 0;
+  char cb_name[sizeof("redis/") + DATA_MAX_NAME_LEN];
+  snprintf(cb_name, sizeof(cb_name), "redis/%s", rn->name);
+
+  return plugin_register_complex_read(
+      /* group = */ "redis",
+      /* name      = */ cb_name,
+      /* callback  = */ redis_read,
+      /* interval  = */ 0,
+      &(user_data_t){
+          .data = rn, .free_func = redis_node_free,
+      });
 } /* }}} */
 
 static redis_query_t *redis_config_query(oconfig_item_t *ci) /* {{{ */
@@ -172,47 +168,58 @@ err:
 
 static int redis_config_node(oconfig_item_t *ci) /* {{{ */
 {
-  redis_query_t *rq;
-  int status;
-  int timeout;
+  redis_node_t *rn = calloc(1, sizeof(*rn));
+  if (rn == NULL) {
+    ERROR("redis plugin: calloc failed adding node.");
+    return ENOMEM;
+  }
 
-  redis_node_t rn = {.port = REDIS_DEF_PORT,
-                     .timeout.tv_sec = REDIS_DEF_TIMEOUT_SEC};
+  rn->port = REDIS_DEF_PORT;
+  rn->timeout.tv_sec = REDIS_DEF_TIMEOUT_SEC;
 
-  sstrncpy(rn.host, REDIS_DEF_HOST, sizeof(rn.host));
+  rn->host = strdup(REDIS_DEF_HOST);
+  if (rn->host == NULL) {
+    ERROR("redis plugin: strdup failed adding node.");
+    sfree(rn);
+    return ENOMEM;
+  }
 
-  status = cf_util_get_string_buffer(ci, rn.name, sizeof(rn.name));
-  if (status != 0)
+  int status = cf_util_get_string(ci, &rn->name);
+  if (status != 0) {
+    sfree(rn->host);
+    sfree(rn);
     return status;
+  }
 
   for (int i = 0; i < ci->children_num; i++) {
     oconfig_item_t *option = ci->children + i;
 
     if (strcasecmp("Host", option->key) == 0)
-      status = cf_util_get_string_buffer(option, rn.host, sizeof(rn.host));
+      status = cf_util_get_string(option, &rn->host);
     else if (strcasecmp("Port", option->key) == 0) {
       status = cf_util_get_port_number(option);
       if (status > 0) {
-        rn.port = status;
+        rn->port = status;
         status = 0;
       }
     } else if (strcasecmp("Query", option->key) == 0) {
-      rq = redis_config_query(option);
+      redis_query_t *rq = redis_config_query(option);
       if (rq == NULL) {
         status = 1;
       } else {
-        rq->next = rn.queries;
-        rn.queries = rq;
+        rq->next = rn->queries;
+        rn->queries = rq;
       }
     } else if (strcasecmp("Timeout", option->key) == 0) {
+      int timeout;
       status = cf_util_get_int(option, &timeout);
       if (status == 0) {
-        rn.timeout.tv_usec = timeout * 1000;
-        rn.timeout.tv_sec = rn.timeout.tv_usec / 1000000L;
-        rn.timeout.tv_usec %= 1000000L;
+        rn->timeout.tv_usec = timeout * 1000;
+        rn->timeout.tv_sec = rn->timeout.tv_usec / 1000000L;
+        rn->timeout.tv_usec %= 1000000L;
       }
     } else if (strcasecmp("Password", option->key) == 0)
-      status = cf_util_get_string_buffer(option, rn.passwd, sizeof(rn.passwd));
+      status = cf_util_get_string(option, &rn->passwd);
     else
       WARNING("redis plugin: Option `%s' not allowed inside a `Node' "
               "block. I'll ignore this option.",
@@ -222,10 +229,12 @@ static int redis_config_node(oconfig_item_t *ci) /* {{{ */
       break;
   }
 
-  if (status != 0)
+  if (status != 0) {
+    redis_node_free(rn);
     return status;
+  }
 
-  return redis_node_add(&rn);
+  return redis_node_add(rn);
 } /* }}} int redis_config_node */
 
 static int redis_config(oconfig_item_t *ci) /* {{{ */
@@ -241,11 +250,6 @@ static int redis_config(oconfig_item_t *ci) /* {{{ */
               option->key);
   }
 
-  if (nodes_head == NULL) {
-    ERROR("redis plugin: No valid node configuration could be found.");
-    return ENOENT;
-  }
-
   return 0;
 } /* }}} */
 
@@ -269,16 +273,23 @@ redis_submit(char *plugin_instance, const char *type, const char *type_instance,
 
 static int redis_init(void) /* {{{ */
 {
-  redis_node_t rn = {.name = "default",
-                     .host = REDIS_DEF_HOST,
-                     .port = REDIS_DEF_PORT,
-                     .timeout.tv_sec = REDIS_DEF_TIMEOUT_SEC,
-                     .next = NULL};
+  if (redis_have_instances)
+    return 0;
 
-  if (nodes_head == NULL)
-    redis_node_add(&rn);
+  redis_node_t *rn = calloc(1, sizeof(*rn));
+  if (rn == NULL)
+    return ENOMEM;
 
-  return 0;
+  rn->port = REDIS_DEF_PORT;
+  rn->timeout.tv_sec = REDIS_DEF_TIMEOUT_SEC;
+
+  rn->name = strdup("default");
+  rn->host = strdup(REDIS_DEF_HOST);
+
+  if (rn->name == NULL || rn->host == NULL)
+    return ENOMEM;
+
+  return redis_node_add(rn);
 } /* }}} int redis_init */
 
 static void *c_redisCommand(redis_node_t *rn, const char *format, ...) {
@@ -449,8 +460,7 @@ static void redis_check_connection(redis_node_t *rn) {
   if (rn->redisContext)
     return;
 
-  redisContext *rh =
-      redisConnectWithTimeout((char *)rn->host, rn->port, rn->timeout);
+  redisContext *rh = redisConnectWithTimeout(rn->host, rn->port, rn->timeout);
 
   if (rh == NULL) {
     ERROR("redis plugin: can't allocate redis context");
@@ -465,7 +475,7 @@ static void redis_check_connection(redis_node_t *rn) {
 
   rn->redisContext = rh;
 
-  if (strlen(rn->passwd) > 0) {
+  if (rn->passwd) {
     redisReply *rr;
 
     DEBUG("redis plugin: authenticating node `%s' passwd(%s).", rn->name,
@@ -540,27 +550,27 @@ static void redis_read_server_info(redis_node_t *rn) {
   freeReplyObject(rr);
 } /* void redis_read_server_info */
 
-static int redis_read(void) /* {{{ */
+static int redis_read(user_data_t *user_data) /* {{{ */
 {
-  for (redis_node_t *rn = nodes_head; rn != NULL; rn = rn->next) {
-    DEBUG("redis plugin: querying info from node `%s' (%s:%d).", rn->name,
-          rn->host, rn->port);
+  redis_node_t *rn = user_data->data;
 
-    redis_check_connection(rn);
+  DEBUG("redis plugin: querying info from node `%s' (%s:%d).", rn->name,
+        rn->host, rn->port);
 
-    if (!rn->redisContext) /* no connection */
-      continue;
+  redis_check_connection(rn);
 
-    redis_read_server_info(rn);
+  if (!rn->redisContext) /* no connection */
+    return -1;
 
-    if (!rn->redisContext) /* connection lost */
-      continue;
+  redis_read_server_info(rn);
 
-    for (redis_query_t *rq = rn->queries; rq != NULL; rq = rq->next) {
-      redis_handle_query(rn, rq);
-      if (!rn->redisContext) /* connection lost */
-        break;
-    }
+  if (!rn->redisContext) /* connection lost */
+    return -1;
+
+  for (redis_query_t *rq = rn->queries; rq != NULL; rq = rq->next) {
+    redis_handle_query(rn, rq);
+    if (!rn->redisContext) /* connection lost */
+      return -1;
   }
 
   return 0;
@@ -571,8 +581,5 @@ void module_register(void) /* {{{ */
 {
   plugin_register_complex_config("redis", redis_config);
   plugin_register_init("redis", redis_init);
-  plugin_register_read("redis", redis_read);
-  /* TODO: plugin_register_write: one redis list per value id with
-   * X elements */
 }
 /* }}} */