redis plugin: Fix memleak
[collectd.git] / src / redis.c
1 /**
2  * collectd - src/redis.c, based on src/memcached.c
3  * Copyright (C) 2010       Andrés J. Díaz <ajdiaz@connectical.com>
4  *
5  * This program is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License as published by the
7  * Free Software Foundation; either version 2 of the License, or (at your
8  * option) any later version.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Andrés J. Díaz <ajdiaz@connectical.com>
21  **/
22
23 #include "collectd.h"
24
25 #include "common.h"
26 #include "plugin.h"
27
28 #include <hiredis/hiredis.h>
29 #include <sys/time.h>
30
31 #define REDIS_DEF_HOST "localhost"
32 #define REDIS_DEF_PASSWD ""
33 #define REDIS_DEF_PORT 6379
34 #define REDIS_DEF_TIMEOUT_SEC 2
35 #define REDIS_DEF_DB_COUNT 256
36 #define MAX_REDIS_VAL_SIZE 256
37 #define MAX_REDIS_QUERY 2048
38
39 /* Redis plugin configuration example:
40  *
41  * <Plugin redis>
42  *   <Node "mynode">
43  *     Host "localhost"
44  *     Port "6379"
45  *     Timeout 2
46  *     Password "foobar"
47  *   </Node>
48  * </Plugin>
49  */
50
51 struct redis_query_s;
52 typedef struct redis_query_s redis_query_t;
53 struct redis_query_s {
54   char query[MAX_REDIS_QUERY];
55   char type[DATA_MAX_NAME_LEN];
56   char instance[DATA_MAX_NAME_LEN];
57   int database;
58
59   redis_query_t *next;
60 };
61
62 struct redis_node_s;
63 typedef struct redis_node_s redis_node_t;
64 struct redis_node_s {
65   char *name;
66   char *host;
67   char *passwd;
68   int port;
69   struct timeval timeout;
70   bool report_command_stats;
71   bool report_cpu_usage;
72   redisContext *redisContext;
73   redis_query_t *queries;
74
75   redis_node_t *next;
76 };
77
78 static bool redis_have_instances;
79 static int redis_read(user_data_t *user_data);
80
81 static void redis_node_free(void *arg) {
82   redis_node_t *rn = arg;
83   if (rn == NULL)
84     return;
85
86   redis_query_t *rq = rn->queries;
87   while (rq != NULL) {
88     redis_query_t *next = rq->next;
89     sfree(rq);
90     rq = next;
91   }
92
93   redisFree(rn->redisContext);
94   sfree(rn->name);
95   sfree(rn->host);
96   sfree(rn->passwd);
97   sfree(rn);
98 } /* void redis_node_free */
99
100 static int redis_node_add(redis_node_t *rn) /* {{{ */
101 {
102   DEBUG("redis plugin: Adding node \"%s\".", rn->name);
103
104   /* Disable automatic generation of default instance in the init callback. */
105   redis_have_instances = true;
106
107   char cb_name[sizeof("redis/") + DATA_MAX_NAME_LEN];
108   snprintf(cb_name, sizeof(cb_name), "redis/%s", rn->name);
109
110   return plugin_register_complex_read(
111       /* group = */ "redis",
112       /* name      = */ cb_name,
113       /* callback  = */ redis_read,
114       /* interval  = */ 0,
115       &(user_data_t){
116           .data = rn, .free_func = redis_node_free,
117       });
118 } /* }}} */
119
120 static redis_query_t *redis_config_query(oconfig_item_t *ci) /* {{{ */
121 {
122   redis_query_t *rq;
123   int status;
124
125   rq = calloc(1, sizeof(*rq));
126   if (rq == NULL) {
127     ERROR("redis plugin: calloc failed adding redis_query.");
128     return NULL;
129   }
130   status = cf_util_get_string_buffer(ci, rq->query, sizeof(rq->query));
131   if (status != 0)
132     goto err;
133
134   /*
135    * Default to a gauge type.
136    */
137   (void)strncpy(rq->type, "gauge", sizeof(rq->type));
138   (void)sstrncpy(rq->instance, rq->query, sizeof(rq->instance));
139   replace_special(rq->instance, sizeof(rq->instance));
140
141   rq->database = 0;
142
143   for (int i = 0; i < ci->children_num; i++) {
144     oconfig_item_t *option = ci->children + i;
145
146     if (strcasecmp("Type", option->key) == 0) {
147       status = cf_util_get_string_buffer(option, rq->type, sizeof(rq->type));
148     } else if (strcasecmp("Instance", option->key) == 0) {
149       status =
150           cf_util_get_string_buffer(option, rq->instance, sizeof(rq->instance));
151     } else if (strcasecmp("Database", option->key) == 0) {
152       status = cf_util_get_int(option, &rq->database);
153       if (rq->database < 0) {
154         WARNING("redis plugin: The \"Database\" option must be positive "
155                 "integer or zero");
156         status = -1;
157       }
158     } else {
159       WARNING("redis plugin: unknown configuration option: %s", option->key);
160       status = -1;
161     }
162     if (status != 0)
163       goto err;
164   }
165   return rq;
166 err:
167   free(rq);
168   return NULL;
169 } /* }}} */
170
171 static int redis_config_node(oconfig_item_t *ci) /* {{{ */
172 {
173   redis_node_t *rn = calloc(1, sizeof(*rn));
174   if (rn == NULL) {
175     ERROR("redis plugin: calloc failed adding node.");
176     return ENOMEM;
177   }
178
179   rn->port = REDIS_DEF_PORT;
180   rn->timeout.tv_sec = REDIS_DEF_TIMEOUT_SEC;
181   rn->report_cpu_usage = true;
182
183   rn->host = strdup(REDIS_DEF_HOST);
184   if (rn->host == NULL) {
185     ERROR("redis plugin: strdup failed adding node.");
186     sfree(rn);
187     return ENOMEM;
188   }
189
190   int status = cf_util_get_string(ci, &rn->name);
191   if (status != 0) {
192     sfree(rn->host);
193     sfree(rn);
194     return status;
195   }
196
197   for (int i = 0; i < ci->children_num; i++) {
198     oconfig_item_t *option = ci->children + i;
199
200     if (strcasecmp("Host", option->key) == 0)
201       status = cf_util_get_string(option, &rn->host);
202     else if (strcasecmp("Port", option->key) == 0) {
203       status = cf_util_get_port_number(option);
204       if (status > 0) {
205         rn->port = status;
206         status = 0;
207       }
208     } else if (strcasecmp("Query", option->key) == 0) {
209       redis_query_t *rq = redis_config_query(option);
210       if (rq == NULL) {
211         status = 1;
212       } else {
213         rq->next = rn->queries;
214         rn->queries = rq;
215       }
216     } else if (strcasecmp("Timeout", option->key) == 0) {
217       int timeout;
218       status = cf_util_get_int(option, &timeout);
219       if (status == 0) {
220         rn->timeout.tv_usec = timeout * 1000;
221         rn->timeout.tv_sec = rn->timeout.tv_usec / 1000000L;
222         rn->timeout.tv_usec %= 1000000L;
223       }
224     } else if (strcasecmp("Password", option->key) == 0)
225       status = cf_util_get_string(option, &rn->passwd);
226     else if (strcasecmp("ReportCommandStats", option->key) == 0)
227       status = cf_util_get_boolean(option, &rn->report_command_stats);
228     else if (strcasecmp("ReportCpuUsage", option->key) == 0)
229       status = cf_util_get_boolean(option, &rn->report_cpu_usage);
230     else
231       WARNING("redis plugin: Option `%s' not allowed inside a `Node' "
232               "block. I'll ignore this option.",
233               option->key);
234
235     if (status != 0)
236       break;
237   }
238
239   if (status != 0) {
240     redis_node_free(rn);
241     return status;
242   }
243
244   return redis_node_add(rn);
245 } /* }}} int redis_config_node */
246
247 static int redis_config(oconfig_item_t *ci) /* {{{ */
248 {
249   for (int i = 0; i < ci->children_num; i++) {
250     oconfig_item_t *option = ci->children + i;
251
252     if (strcasecmp("Node", option->key) == 0)
253       redis_config_node(option);
254     else
255       WARNING("redis plugin: Option `%s' not allowed in redis"
256               " configuration. It will be ignored.",
257               option->key);
258   }
259
260   return 0;
261 } /* }}} */
262
263 __attribute__((nonnull(2))) static void
264 redis_submit(const char *plugin_instance, const char *type,
265              const char *type_instance, value_t value) /* {{{ */
266 {
267   value_list_t vl = VALUE_LIST_INIT;
268
269   vl.values = &value;
270   vl.values_len = 1;
271   sstrncpy(vl.plugin, "redis", sizeof(vl.plugin));
272   if (plugin_instance != NULL)
273     sstrncpy(vl.plugin_instance, plugin_instance, sizeof(vl.plugin_instance));
274   sstrncpy(vl.type, type, sizeof(vl.type));
275   if (type_instance != NULL)
276     sstrncpy(vl.type_instance, type_instance, sizeof(vl.type_instance));
277
278   plugin_dispatch_values(&vl);
279 } /* }}} */
280
281 __attribute__((nonnull(2))) static void
282 redis_submit2(const char *plugin_instance, const char *type,
283               const char *type_instance, value_t value0,
284               value_t value1) /* {{{ */
285 {
286   value_list_t vl = VALUE_LIST_INIT;
287   value_t values[] = {value0, value1};
288
289   vl.values = values;
290   vl.values_len = STATIC_ARRAY_SIZE(values);
291
292   sstrncpy(vl.plugin, "redis", sizeof(vl.plugin));
293   sstrncpy(vl.type, type, sizeof(vl.type));
294
295   if (plugin_instance != NULL)
296     sstrncpy(vl.plugin_instance, plugin_instance, sizeof(vl.plugin_instance));
297
298   if (type_instance != NULL)
299     sstrncpy(vl.type_instance, type_instance, sizeof(vl.type_instance));
300
301   plugin_dispatch_values(&vl);
302 } /* }}} */
303
304 static int redis_init(void) /* {{{ */
305 {
306   if (redis_have_instances)
307     return 0;
308
309   redis_node_t *rn = calloc(1, sizeof(*rn));
310   if (rn == NULL)
311     return ENOMEM;
312
313   rn->port = REDIS_DEF_PORT;
314   rn->timeout.tv_sec = REDIS_DEF_TIMEOUT_SEC;
315
316   rn->name = strdup("default");
317   rn->host = strdup(REDIS_DEF_HOST);
318
319   if (rn->name == NULL || rn->host == NULL) {
320     sfree(rn->name);
321     sfree(rn->host);
322     sfree(rn);
323     return ENOMEM;
324   }
325
326   return redis_node_add(rn);
327 } /* }}} int redis_init */
328
329 static void *c_redisCommand(redis_node_t *rn, const char *format, ...) {
330   redisContext *c = rn->redisContext;
331
332   if (c == NULL)
333     return NULL;
334
335   va_list ap;
336   va_start(ap, format);
337   void *reply = redisvCommand(c, format, ap);
338   va_end(ap);
339
340   if (reply == NULL) {
341     ERROR("redis plugin: Connection error: %s", c->errstr);
342     redisFree(rn->redisContext);
343     rn->redisContext = NULL;
344   }
345
346   return reply;
347 } /* void c_redisCommand */
348
349 static int redis_get_info_value(char const *info_line, char const *field_name,
350                                 int ds_type, value_t *val) {
351   char *str = strstr(info_line, field_name);
352   static char buf[MAX_REDIS_VAL_SIZE];
353   if (str) {
354     int i;
355
356     str += strlen(field_name) + 1; /* also skip the ':' */
357     for (i = 0; (*str && (isdigit((unsigned char)*str) || *str == '.'));
358          i++, str++)
359       buf[i] = *str;
360     buf[i] = '\0';
361
362     if (parse_value(buf, val, ds_type) == -1) {
363       WARNING("redis plugin: Unable to parse field `%s'.", field_name);
364       return -1;
365     }
366
367     return 0;
368   }
369   return -1;
370 } /* int redis_get_info_value */
371
372 static int redis_handle_info(char *node, char const *info_line,
373                              char const *type, char const *type_instance,
374                              char const *field_name, int ds_type) /* {{{ */
375 {
376   value_t val;
377   if (redis_get_info_value(info_line, field_name, ds_type, &val) != 0)
378     return -1;
379
380   redis_submit(node, type, type_instance, val);
381   return 0;
382 } /* }}} int redis_handle_info */
383
384 static int redis_handle_query(redis_node_t *rn, redis_query_t *rq) /* {{{ */
385 {
386   redisReply *rr;
387   const data_set_t *ds;
388   value_t val;
389
390   ds = plugin_get_ds(rq->type);
391   if (!ds) {
392     ERROR("redis plugin: DS type `%s' not defined.", rq->type);
393     return -1;
394   }
395
396   if (ds->ds_num != 1) {
397     ERROR("redis plugin: DS type `%s' has too many datasources. This is not "
398           "supported currently.",
399           rq->type);
400     return -1;
401   }
402
403   if ((rr = c_redisCommand(rn, "SELECT %d", rq->database)) == NULL) {
404     WARNING("redis plugin: unable to switch to database `%d' on node `%s'.",
405             rq->database, rn->name);
406     return -1;
407   }
408
409   if ((rr = c_redisCommand(rn, rq->query)) == NULL) {
410     WARNING("redis plugin: unable to carry out query `%s'.", rq->query);
411     return -1;
412   }
413
414   switch (rr->type) {
415   case REDIS_REPLY_INTEGER:
416     switch (ds->ds[0].type) {
417     case DS_TYPE_COUNTER:
418       val.counter = (counter_t)rr->integer;
419       break;
420     case DS_TYPE_GAUGE:
421       val.gauge = (gauge_t)rr->integer;
422       break;
423     case DS_TYPE_DERIVE:
424       val.gauge = (derive_t)rr->integer;
425       break;
426     case DS_TYPE_ABSOLUTE:
427       val.gauge = (absolute_t)rr->integer;
428       break;
429     }
430     break;
431   case REDIS_REPLY_STRING:
432     if (parse_value(rr->str, &val, ds->ds[0].type) == -1) {
433       WARNING("redis plugin: Query `%s': Unable to parse value.", rq->query);
434       freeReplyObject(rr);
435       return -1;
436     }
437     break;
438   case REDIS_REPLY_ERROR:
439     WARNING("redis plugin: Query `%s' failed: %s.", rq->query, rr->str);
440     freeReplyObject(rr);
441     return -1;
442   case REDIS_REPLY_ARRAY:
443     WARNING("redis plugin: Query `%s' should return string or integer. Arrays "
444             "are not supported.",
445             rq->query);
446     freeReplyObject(rr);
447     return -1;
448   default:
449     WARNING("redis plugin: Query `%s': Cannot coerce redis type (%i).",
450             rq->query, rr->type);
451     freeReplyObject(rr);
452     return -1;
453   }
454
455   redis_submit(rn->name, rq->type,
456                (strlen(rq->instance) > 0) ? rq->instance : NULL, val);
457   freeReplyObject(rr);
458   return 0;
459 } /* }}} int redis_handle_query */
460
461 static int redis_db_stats(const char *node, char const *info_line) /* {{{ */
462 {
463   /* redis_db_stats parses and dispatches Redis database statistics,
464    * currently the number of keys for each database.
465    * info_line needs to have the following format:
466    *   db0:keys=4,expires=0,avg_ttl=0
467    */
468
469   for (int db = 0; db < REDIS_DEF_DB_COUNT; db++) {
470     static char buf[MAX_REDIS_VAL_SIZE];
471     static char field_name[12];
472     static char db_id[4];
473     value_t val;
474     char *str;
475     int i;
476
477     snprintf(field_name, sizeof(field_name), "db%d:keys=", db);
478
479     str = strstr(info_line, field_name);
480     if (!str)
481       continue;
482
483     str += strlen(field_name);
484     for (i = 0; (*str && isdigit((int)*str)); i++, str++)
485       buf[i] = *str;
486     buf[i] = '\0';
487
488     if (parse_value(buf, &val, DS_TYPE_GAUGE) != 0) {
489       WARNING("redis plugin: Unable to parse field `%s'.", field_name);
490       return -1;
491     }
492
493     snprintf(db_id, sizeof(db_id), "%d", db);
494     redis_submit(node, "records", db_id, val);
495   }
496   return 0;
497
498 } /* }}} int redis_db_stats */
499
500 static void redis_cpu_usage(const char *node, char const *info_line) {
501   while (42) {
502     value_t rusage_user;
503     value_t rusage_syst;
504
505     if (redis_get_info_value(info_line, "used_cpu_user", DS_TYPE_GAUGE,
506                              &rusage_user) != 0)
507       break;
508
509     if (redis_get_info_value(info_line, "used_cpu_sys", DS_TYPE_GAUGE,
510                              &rusage_syst) != 0)
511       break;
512
513     redis_submit2(node, "ps_cputime", "daemon",
514                   (value_t){.derive = rusage_user.gauge * 1000000},
515                   (value_t){.derive = rusage_syst.gauge * 1000000});
516     break;
517   }
518
519   while (42) {
520     value_t rusage_user;
521     value_t rusage_syst;
522
523     if (redis_get_info_value(info_line, "used_cpu_user_children", DS_TYPE_GAUGE,
524                              &rusage_user) != 0)
525       break;
526
527     if (redis_get_info_value(info_line, "used_cpu_sys_children", DS_TYPE_GAUGE,
528                              &rusage_syst) != 0)
529       break;
530
531     redis_submit2(node, "ps_cputime", "children",
532                   (value_t){.derive = rusage_user.gauge * 1000000},
533                   (value_t){.derive = rusage_syst.gauge * 1000000});
534     break;
535   }
536 } /* void redis_cpu_usage */
537
538 static void redis_check_connection(redis_node_t *rn) {
539   if (rn->redisContext)
540     return;
541
542   redisContext *rh = redisConnectWithTimeout(rn->host, rn->port, rn->timeout);
543
544   if (rh == NULL) {
545     ERROR("redis plugin: can't allocate redis context");
546     return;
547   }
548   if (rh->err) {
549     ERROR("redis plugin: unable to connect to node `%s' (%s:%d): %s.", rn->name,
550           rn->host, rn->port, rh->errstr);
551     redisFree(rh);
552     return;
553   }
554
555   rn->redisContext = rh;
556
557   if (rn->passwd) {
558     redisReply *rr;
559
560     DEBUG("redis plugin: authenticating node `%s' passwd(%s).", rn->name,
561           rn->passwd);
562
563     if ((rr = c_redisCommand(rn, "AUTH %s", rn->passwd)) == NULL) {
564       WARNING("redis plugin: unable to authenticate on node `%s'.", rn->name);
565       return;
566     }
567
568     if (rr->type != REDIS_REPLY_STATUS) {
569       WARNING("redis plugin: invalid authentication on node `%s'.", rn->name);
570       freeReplyObject(rr);
571       redisFree(rn->redisContext);
572       rn->redisContext = NULL;
573       return;
574     }
575
576     freeReplyObject(rr);
577   }
578   return;
579 } /* void redis_check_connection */
580
581 static void redis_read_server_info(redis_node_t *rn) {
582   redisReply *rr;
583
584   if ((rr = c_redisCommand(rn, "INFO")) == NULL) {
585     WARNING("redis plugin: unable to get INFO from node `%s'.", rn->name);
586     return;
587   }
588
589   redis_handle_info(rn->name, rr->str, "uptime", NULL, "uptime_in_seconds",
590                     DS_TYPE_GAUGE);
591   redis_handle_info(rn->name, rr->str, "current_connections", "clients",
592                     "connected_clients", DS_TYPE_GAUGE);
593   redis_handle_info(rn->name, rr->str, "blocked_clients", NULL,
594                     "blocked_clients", DS_TYPE_GAUGE);
595   redis_handle_info(rn->name, rr->str, "memory", NULL, "used_memory",
596                     DS_TYPE_GAUGE);
597   redis_handle_info(rn->name, rr->str, "memory_lua", NULL, "used_memory_lua",
598                     DS_TYPE_GAUGE);
599   /* changes_since_last_save: Deprecated in redis version 2.6 and above */
600   redis_handle_info(rn->name, rr->str, "volatile_changes", NULL,
601                     "changes_since_last_save", DS_TYPE_GAUGE);
602   redis_handle_info(rn->name, rr->str, "total_connections", NULL,
603                     "total_connections_received", DS_TYPE_DERIVE);
604   redis_handle_info(rn->name, rr->str, "total_operations", NULL,
605                     "total_commands_processed", DS_TYPE_DERIVE);
606   redis_handle_info(rn->name, rr->str, "operations_per_second", NULL,
607                     "instantaneous_ops_per_sec", DS_TYPE_GAUGE);
608   redis_handle_info(rn->name, rr->str, "expired_keys", NULL, "expired_keys",
609                     DS_TYPE_DERIVE);
610   redis_handle_info(rn->name, rr->str, "evicted_keys", NULL, "evicted_keys",
611                     DS_TYPE_DERIVE);
612   redis_handle_info(rn->name, rr->str, "pubsub", "channels", "pubsub_channels",
613                     DS_TYPE_GAUGE);
614   redis_handle_info(rn->name, rr->str, "pubsub", "patterns", "pubsub_patterns",
615                     DS_TYPE_GAUGE);
616   redis_handle_info(rn->name, rr->str, "current_connections", "slaves",
617                     "connected_slaves", DS_TYPE_GAUGE);
618   redis_handle_info(rn->name, rr->str, "cache_result", "hits", "keyspace_hits",
619                     DS_TYPE_DERIVE);
620   redis_handle_info(rn->name, rr->str, "cache_result", "misses",
621                     "keyspace_misses", DS_TYPE_DERIVE);
622   redis_handle_info(rn->name, rr->str, "total_bytes", "input",
623                     "total_net_input_bytes", DS_TYPE_DERIVE);
624   redis_handle_info(rn->name, rr->str, "total_bytes", "output",
625                     "total_net_output_bytes", DS_TYPE_DERIVE);
626
627   redis_db_stats(rn->name, rr->str);
628
629   if (rn->report_cpu_usage)
630     redis_cpu_usage(rn->name, rr->str);
631
632   freeReplyObject(rr);
633 } /* void redis_read_server_info */
634
635 static void redis_read_command_stats(redis_node_t *rn) {
636   redisReply *rr;
637
638   if ((rr = c_redisCommand(rn, "INFO commandstats")) == NULL) {
639     WARNING("redis plugin: node `%s': unable to get `INFO commandstats'.",
640             rn->name);
641     return;
642   }
643
644   if (rr->type != REDIS_REPLY_STRING) {
645     WARNING("redis plugin: node `%s' `INFO commandstats' returned unsupported "
646             "redis type %i.",
647             rn->name, rr->type);
648     freeReplyObject(rr);
649     return;
650   }
651
652   char *command;
653   char *line;
654   char *ptr = rr->str;
655   char *saveptr = NULL;
656   while ((line = strtok_r(ptr, "\n\r", &saveptr)) != NULL) {
657     ptr = NULL;
658
659     if (line[0] == '#')
660       continue;
661
662     /* command name */
663     if (strstr(line, "cmdstat_") != line) {
664       ERROR("redis plugin: not found 'cmdstat_' prefix in line '%s'", line);
665       continue;
666     }
667
668     char *values = strstr(line, ":");
669     if (values == NULL) {
670       ERROR("redis plugin: not found ':' separator in line '%s'", line);
671       continue;
672     }
673
674     /* Null-terminate command token */
675     values[0] = '\0';
676     command = line + strlen("cmdstat_");
677     values++;
678
679     /* parse values */
680     /* cmdstat_publish:calls=20795774,usec=111039258,usec_per_call=5.34 */
681     char *field;
682     char *saveptr_field = NULL;
683     while ((field = strtok_r(values, "=", &saveptr_field)) != NULL) {
684       values = NULL;
685
686       const char *type;
687       /* only these are supported */
688       if (strcmp(field, "calls") == 0)
689         type = "commands";
690       else if (strcmp(field, "usec") == 0)
691         type = "redis_command_cputime";
692       else
693         continue;
694
695       if ((field = strtok_r(NULL, ",", &saveptr_field)) == NULL)
696         continue;
697
698       char *endptr = NULL;
699       errno = 0;
700       derive_t value = strtoll(field, &endptr, 0);
701
702       if ((endptr == field) || (errno != 0))
703         continue;
704
705       redis_submit(rn->name, type, command, (value_t){.derive = value});
706     }
707   }
708   freeReplyObject(rr);
709 } /* void redis_read_command_stats */
710
711 static int redis_read(user_data_t *user_data) /* {{{ */
712 {
713   redis_node_t *rn = user_data->data;
714
715   DEBUG("redis plugin: querying info from node `%s' (%s:%d).", rn->name,
716         rn->host, rn->port);
717
718   redis_check_connection(rn);
719
720   if (!rn->redisContext) /* no connection */
721     return -1;
722
723   redis_read_server_info(rn);
724
725   if (!rn->redisContext) /* connection lost */
726     return -1;
727
728   if (rn->report_command_stats) {
729     redis_read_command_stats(rn);
730
731     if (!rn->redisContext) /* connection lost */
732       return -1;
733   }
734
735   for (redis_query_t *rq = rn->queries; rq != NULL; rq = rq->next) {
736     redis_handle_query(rn, rq);
737     if (!rn->redisContext) /* connection lost */
738       return -1;
739   }
740
741   return 0;
742 }
743 /* }}} */
744
745 void module_register(void) /* {{{ */
746 {
747   plugin_register_complex_config("redis", redis_config);
748   plugin_register_init("redis", redis_init);
749 }
750 /* }}} */