12f68946c123d9e1d6b86a34039302af744fe61b
[collectd.git] / src / redis.c
1 /**
2  * collectd - src/redis.c, based on src/memcached.c
3  * Copyright (C) 2010       Andrés J. Díaz <ajdiaz@connectical.com>
4  *
5  * This program is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License as published by the
7  * Free Software Foundation; either version 2 of the License, or (at your
8  * option) any later version.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Andrés J. Díaz <ajdiaz@connectical.com>
21  **/
22
23 #include "collectd.h"
24
25 #include "plugin.h"
26 #include "utils/common/common.h"
27
28 #include <hiredis/hiredis.h>
29 #include <sys/time.h>
30
31 #define REDIS_DEF_HOST "localhost"
32 #define REDIS_DEF_PASSWD ""
33 #define REDIS_DEF_PORT 6379
34 #define REDIS_DEF_TIMEOUT_SEC 2
35 #define REDIS_DEF_DB_COUNT 256
36 #define MAX_REDIS_VAL_SIZE 256
37 #define MAX_REDIS_QUERY 2048
38
39 /* Redis plugin configuration example:
40  *
41  * <Plugin redis>
42  *   <Node "mynode">
43  *     Host "localhost"
44  *     Port "6379"
45  *     Timeout 2
46  *     Password "foobar"
47  *   </Node>
48  * </Plugin>
49  */
50
51 struct redis_query_s;
52 typedef struct redis_query_s redis_query_t;
53 struct redis_query_s {
54   char query[MAX_REDIS_QUERY];
55   char type[DATA_MAX_NAME_LEN];
56   char instance[DATA_MAX_NAME_LEN];
57   int database;
58
59   redis_query_t *next;
60 };
61
62 struct prev_s {
63   derive_t keyspace_hits;
64   derive_t keyspace_misses;
65 };
66 typedef struct prev_s prev_t;
67
68 struct redis_node_s;
69 typedef struct redis_node_s redis_node_t;
70 struct redis_node_s {
71   char *name;
72   char *host;
73   char *socket;
74   char *passwd;
75   int port;
76   struct timeval timeout;
77   bool report_command_stats;
78   bool report_cpu_usage;
79   redisContext *redisContext;
80   redis_query_t *queries;
81   prev_t prev;
82
83   redis_node_t *next;
84 };
85
86 static bool redis_have_instances;
87 static int redis_read(user_data_t *user_data);
88
89 static void redis_node_free(void *arg) {
90   redis_node_t *rn = arg;
91   if (rn == NULL)
92     return;
93
94   redis_query_t *rq = rn->queries;
95   while (rq != NULL) {
96     redis_query_t *next = rq->next;
97     sfree(rq);
98     rq = next;
99   }
100
101   if (rn->redisContext)
102     redisFree(rn->redisContext);
103   sfree(rn->name);
104   sfree(rn->host);
105   sfree(rn->socket);
106   sfree(rn->passwd);
107   sfree(rn);
108 } /* void redis_node_free */
109
110 static int redis_node_add(redis_node_t *rn) /* {{{ */
111 {
112   DEBUG("redis plugin: Adding node \"%s\".", rn->name);
113
114   /* Disable automatic generation of default instance in the init callback. */
115   redis_have_instances = true;
116
117   char cb_name[sizeof("redis/") + DATA_MAX_NAME_LEN];
118   ssnprintf(cb_name, sizeof(cb_name), "redis/%s", rn->name);
119
120   return plugin_register_complex_read(
121       /* group = */ "redis",
122       /* name      = */ cb_name,
123       /* callback  = */ redis_read,
124       /* interval  = */ 0,
125       &(user_data_t){
126           .data = rn, .free_func = redis_node_free,
127       });
128 } /* }}} */
129
130 static redis_query_t *redis_config_query(oconfig_item_t *ci) /* {{{ */
131 {
132   redis_query_t *rq;
133   int status;
134
135   rq = calloc(1, sizeof(*rq));
136   if (rq == NULL) {
137     ERROR("redis plugin: calloc failed adding redis_query.");
138     return NULL;
139   }
140   status = cf_util_get_string_buffer(ci, rq->query, sizeof(rq->query));
141   if (status != 0)
142     goto err;
143
144   /*
145    * Default to a gauge type.
146    */
147   (void)strncpy(rq->type, "gauge", sizeof(rq->type));
148   (void)sstrncpy(rq->instance, rq->query, sizeof(rq->instance));
149   replace_special(rq->instance, sizeof(rq->instance));
150
151   rq->database = 0;
152
153   for (int i = 0; i < ci->children_num; i++) {
154     oconfig_item_t *option = ci->children + i;
155
156     if (strcasecmp("Type", option->key) == 0) {
157       status = cf_util_get_string_buffer(option, rq->type, sizeof(rq->type));
158     } else if (strcasecmp("Instance", option->key) == 0) {
159       status =
160           cf_util_get_string_buffer(option, rq->instance, sizeof(rq->instance));
161     } else if (strcasecmp("Database", option->key) == 0) {
162       status = cf_util_get_int(option, &rq->database);
163       if (rq->database < 0) {
164         WARNING("redis plugin: The \"Database\" option must be positive "
165                 "integer or zero");
166         status = -1;
167       }
168     } else {
169       WARNING("redis plugin: unknown configuration option: %s", option->key);
170       status = -1;
171     }
172     if (status != 0)
173       goto err;
174   }
175   return rq;
176 err:
177   free(rq);
178   return NULL;
179 } /* }}} */
180
181 static int redis_config_node(oconfig_item_t *ci) /* {{{ */
182 {
183   redis_node_t *rn = calloc(1, sizeof(*rn));
184   if (rn == NULL) {
185     ERROR("redis plugin: calloc failed adding node.");
186     return ENOMEM;
187   }
188
189   rn->port = REDIS_DEF_PORT;
190   rn->timeout.tv_sec = REDIS_DEF_TIMEOUT_SEC;
191   rn->report_cpu_usage = true;
192
193   rn->host = strdup(REDIS_DEF_HOST);
194   if (rn->host == NULL) {
195     ERROR("redis plugin: strdup failed adding node.");
196     sfree(rn);
197     return ENOMEM;
198   }
199
200   int status = cf_util_get_string(ci, &rn->name);
201   if (status != 0) {
202     sfree(rn->host);
203     sfree(rn);
204     return status;
205   }
206
207   for (int i = 0; i < ci->children_num; i++) {
208     oconfig_item_t *option = ci->children + i;
209
210     if (strcasecmp("Host", option->key) == 0)
211       status = cf_util_get_string(option, &rn->host);
212     else if (strcasecmp("Port", option->key) == 0) {
213       status = cf_util_get_port_number(option);
214       if (status > 0) {
215         rn->port = status;
216         status = 0;
217       }
218     } else if (strcasecmp("Socket", option->key) == 0) {
219       status = cf_util_get_string(option, &rn->socket);
220     } else if (strcasecmp("Query", option->key) == 0) {
221       redis_query_t *rq = redis_config_query(option);
222       if (rq == NULL) {
223         status = 1;
224       } else {
225         rq->next = rn->queries;
226         rn->queries = rq;
227       }
228     } else if (strcasecmp("Timeout", option->key) == 0) {
229       int timeout;
230       status = cf_util_get_int(option, &timeout);
231       if (status == 0) {
232         rn->timeout.tv_usec = timeout * 1000;
233         rn->timeout.tv_sec = rn->timeout.tv_usec / 1000000L;
234         rn->timeout.tv_usec %= 1000000L;
235       }
236     } else if (strcasecmp("Password", option->key) == 0)
237       status = cf_util_get_string(option, &rn->passwd);
238     else if (strcasecmp("ReportCommandStats", option->key) == 0)
239       status = cf_util_get_boolean(option, &rn->report_command_stats);
240     else if (strcasecmp("ReportCpuUsage", option->key) == 0)
241       status = cf_util_get_boolean(option, &rn->report_cpu_usage);
242     else
243       WARNING("redis plugin: Option `%s' not allowed inside a `Node' "
244               "block. I'll ignore this option.",
245               option->key);
246
247     if (status != 0)
248       break;
249   }
250
251   if (status != 0) {
252     redis_node_free(rn);
253     return status;
254   }
255
256   return redis_node_add(rn);
257 } /* }}} int redis_config_node */
258
259 static int redis_config(oconfig_item_t *ci) /* {{{ */
260 {
261   for (int i = 0; i < ci->children_num; i++) {
262     oconfig_item_t *option = ci->children + i;
263
264     if (strcasecmp("Node", option->key) == 0)
265       redis_config_node(option);
266     else
267       WARNING("redis plugin: Option `%s' not allowed in redis"
268               " configuration. It will be ignored.",
269               option->key);
270   }
271
272   return 0;
273 } /* }}} */
274
275 __attribute__((nonnull(2))) static void
276 redis_submit(const char *plugin_instance, const char *type,
277              const char *type_instance, value_t value) /* {{{ */
278 {
279   value_list_t vl = VALUE_LIST_INIT;
280
281   vl.values = &value;
282   vl.values_len = 1;
283   sstrncpy(vl.plugin, "redis", sizeof(vl.plugin));
284   if (plugin_instance != NULL)
285     sstrncpy(vl.plugin_instance, plugin_instance, sizeof(vl.plugin_instance));
286   sstrncpy(vl.type, type, sizeof(vl.type));
287   if (type_instance != NULL)
288     sstrncpy(vl.type_instance, type_instance, sizeof(vl.type_instance));
289
290   plugin_dispatch_values(&vl);
291 } /* }}} */
292
293 __attribute__((nonnull(2))) static void
294 redis_submit2(const char *plugin_instance, const char *type,
295               const char *type_instance, value_t value0,
296               value_t value1) /* {{{ */
297 {
298   value_list_t vl = VALUE_LIST_INIT;
299   value_t values[] = {value0, value1};
300
301   vl.values = values;
302   vl.values_len = STATIC_ARRAY_SIZE(values);
303
304   sstrncpy(vl.plugin, "redis", sizeof(vl.plugin));
305   sstrncpy(vl.type, type, sizeof(vl.type));
306
307   if (plugin_instance != NULL)
308     sstrncpy(vl.plugin_instance, plugin_instance, sizeof(vl.plugin_instance));
309
310   if (type_instance != NULL)
311     sstrncpy(vl.type_instance, type_instance, sizeof(vl.type_instance));
312
313   plugin_dispatch_values(&vl);
314 } /* }}} */
315
316 static int redis_init(void) /* {{{ */
317 {
318   if (redis_have_instances)
319     return 0;
320
321   redis_node_t *rn = calloc(1, sizeof(*rn));
322   if (rn == NULL)
323     return ENOMEM;
324
325   rn->port = REDIS_DEF_PORT;
326   rn->timeout.tv_sec = REDIS_DEF_TIMEOUT_SEC;
327
328   rn->name = strdup("default");
329   rn->host = strdup(REDIS_DEF_HOST);
330
331   if (rn->name == NULL || rn->host == NULL) {
332     sfree(rn->name);
333     sfree(rn->host);
334     sfree(rn);
335     return ENOMEM;
336   }
337
338   return redis_node_add(rn);
339 } /* }}} int redis_init */
340
341 static void *c_redisCommand(redis_node_t *rn, const char *format, ...) {
342   redisContext *c = rn->redisContext;
343
344   if (c == NULL)
345     return NULL;
346
347   va_list ap;
348   va_start(ap, format);
349   void *reply = redisvCommand(c, format, ap);
350   va_end(ap);
351
352   if (reply == NULL) {
353     ERROR("redis plugin: Connection error: %s", c->errstr);
354     redisFree(rn->redisContext);
355     rn->redisContext = NULL;
356   }
357
358   return reply;
359 } /* void c_redisCommand */
360
361 static int redis_get_info_value(char const *info_line, char const *field_name,
362                                 int ds_type, value_t *val) {
363   char *str = strstr(info_line, field_name);
364   static char buf[MAX_REDIS_VAL_SIZE];
365   if (str) {
366     int i;
367
368     str += strlen(field_name) + 1; /* also skip the ':' */
369     for (i = 0; (*str && (isdigit((unsigned char)*str) || *str == '.'));
370          i++, str++)
371       buf[i] = *str;
372     buf[i] = '\0';
373
374     if (parse_value(buf, val, ds_type) == -1) {
375       WARNING("redis plugin: Unable to parse field `%s'.", field_name);
376       return -1;
377     }
378
379     return 0;
380   }
381   return -1;
382 } /* int redis_get_info_value */
383
384 static int redis_handle_info(char *node, char const *info_line,
385                              char const *type, char const *type_instance,
386                              char const *field_name, int ds_type) /* {{{ */
387 {
388   value_t val;
389   if (redis_get_info_value(info_line, field_name, ds_type, &val) != 0)
390     return -1;
391
392   redis_submit(node, type, type_instance, val);
393   return 0;
394 } /* }}} int redis_handle_info */
395
396 static int redis_handle_query(redis_node_t *rn, redis_query_t *rq) /* {{{ */
397 {
398   redisReply *rr;
399   const data_set_t *ds;
400   value_t val;
401
402   ds = plugin_get_ds(rq->type);
403   if (!ds) {
404     ERROR("redis plugin: DS type `%s' not defined.", rq->type);
405     return -1;
406   }
407
408   if (ds->ds_num != 1) {
409     ERROR("redis plugin: DS type `%s' has too many datasources. This is not "
410           "supported currently.",
411           rq->type);
412     return -1;
413   }
414
415   if ((rr = c_redisCommand(rn, "SELECT %d", rq->database)) == NULL) {
416     WARNING("redis plugin: unable to switch to database `%d' on node `%s'.",
417             rq->database, rn->name);
418     return -1;
419   }
420
421   if ((rr = c_redisCommand(rn, rq->query)) == NULL) {
422     WARNING("redis plugin: unable to carry out query `%s'.", rq->query);
423     return -1;
424   }
425
426   switch (rr->type) {
427   case REDIS_REPLY_INTEGER:
428     switch (ds->ds[0].type) {
429     case DS_TYPE_COUNTER:
430       val.counter = (counter_t)rr->integer;
431       break;
432     case DS_TYPE_GAUGE:
433       val.gauge = (gauge_t)rr->integer;
434       break;
435     case DS_TYPE_DERIVE:
436       val.gauge = (derive_t)rr->integer;
437       break;
438     case DS_TYPE_ABSOLUTE:
439       val.gauge = (absolute_t)rr->integer;
440       break;
441     }
442     break;
443   case REDIS_REPLY_STRING:
444     if (parse_value(rr->str, &val, ds->ds[0].type) == -1) {
445       WARNING("redis plugin: Query `%s': Unable to parse value.", rq->query);
446       freeReplyObject(rr);
447       return -1;
448     }
449     break;
450   case REDIS_REPLY_ERROR:
451     WARNING("redis plugin: Query `%s' failed: %s.", rq->query, rr->str);
452     freeReplyObject(rr);
453     return -1;
454   case REDIS_REPLY_ARRAY:
455     WARNING("redis plugin: Query `%s' should return string or integer. Arrays "
456             "are not supported.",
457             rq->query);
458     freeReplyObject(rr);
459     return -1;
460   default:
461     WARNING("redis plugin: Query `%s': Cannot coerce redis type (%i).",
462             rq->query, rr->type);
463     freeReplyObject(rr);
464     return -1;
465   }
466
467   redis_submit(rn->name, rq->type,
468                (strlen(rq->instance) > 0) ? rq->instance : NULL, val);
469   freeReplyObject(rr);
470   return 0;
471 } /* }}} int redis_handle_query */
472
473 static int redis_db_stats(const char *node, char const *info_line) /* {{{ */
474 {
475   /* redis_db_stats parses and dispatches Redis database statistics,
476    * currently the number of keys for each database.
477    * info_line needs to have the following format:
478    *   db0:keys=4,expires=0,avg_ttl=0
479    */
480
481   for (int db = 0; db < REDIS_DEF_DB_COUNT; db++) {
482     static char buf[MAX_REDIS_VAL_SIZE];
483     static char field_name[12];
484     static char db_id[4];
485     value_t val;
486     char *str;
487     int i;
488
489     ssnprintf(field_name, sizeof(field_name), "db%d:keys=", db);
490
491     str = strstr(info_line, field_name);
492     if (!str)
493       continue;
494
495     str += strlen(field_name);
496     for (i = 0; (*str && isdigit((int)*str)); i++, str++)
497       buf[i] = *str;
498     buf[i] = '\0';
499
500     if (parse_value(buf, &val, DS_TYPE_GAUGE) != 0) {
501       WARNING("redis plugin: Unable to parse field `%s'.", field_name);
502       return -1;
503     }
504
505     ssnprintf(db_id, sizeof(db_id), "%d", db);
506     redis_submit(node, "records", db_id, val);
507   }
508   return 0;
509
510 } /* }}} int redis_db_stats */
511
512 static void redis_cpu_usage(const char *node, char const *info_line) {
513   while (42) {
514     value_t rusage_user;
515     value_t rusage_syst;
516
517     if (redis_get_info_value(info_line, "used_cpu_user", DS_TYPE_GAUGE,
518                              &rusage_user) != 0)
519       break;
520
521     if (redis_get_info_value(info_line, "used_cpu_sys", DS_TYPE_GAUGE,
522                              &rusage_syst) != 0)
523       break;
524
525     redis_submit2(node, "ps_cputime", "daemon",
526                   (value_t){.derive = rusage_user.gauge * 1000000},
527                   (value_t){.derive = rusage_syst.gauge * 1000000});
528     break;
529   }
530
531   while (42) {
532     value_t rusage_user;
533     value_t rusage_syst;
534
535     if (redis_get_info_value(info_line, "used_cpu_user_children", DS_TYPE_GAUGE,
536                              &rusage_user) != 0)
537       break;
538
539     if (redis_get_info_value(info_line, "used_cpu_sys_children", DS_TYPE_GAUGE,
540                              &rusage_syst) != 0)
541       break;
542
543     redis_submit2(node, "ps_cputime", "children",
544                   (value_t){.derive = rusage_user.gauge * 1000000},
545                   (value_t){.derive = rusage_syst.gauge * 1000000});
546     break;
547   }
548 } /* void redis_cpu_usage */
549
550 static gauge_t calculate_ratio_percent(derive_t part1, derive_t part2,
551                                        derive_t *prev1, derive_t *prev2) {
552   if ((*prev1 == 0) || (*prev2 == 0) || (part1 < *prev1) || (part2 < *prev2)) {
553     *prev1 = part1;
554     *prev2 = part2;
555     return NAN;
556   }
557
558   derive_t num = part1 - *prev1;
559   derive_t denom = part2 - *prev2 + num;
560
561   *prev1 = part1;
562   *prev2 = part2;
563
564   if (denom == 0)
565     return NAN;
566
567   if (num == 0)
568     return 0;
569
570   return 100.0 * (gauge_t)num / (gauge_t)denom;
571 } /* gauge_t calculate_ratio_percent */
572
573 static void redis_keyspace_usage(redis_node_t *rn, char const *info_line) {
574   value_t hits, misses;
575
576   if (redis_get_info_value(info_line, "keyspace_hits", DS_TYPE_DERIVE, &hits) !=
577       0)
578     return;
579
580   if (redis_get_info_value(info_line, "keyspace_misses", DS_TYPE_DERIVE,
581                            &misses) != 0)
582     return;
583
584   redis_submit(rn->name, "cache_result", "hits", hits);
585   redis_submit(rn->name, "cache_result", "misses", misses);
586
587   prev_t *prev = &rn->prev;
588   gauge_t ratio = calculate_ratio_percent(
589       hits.derive, misses.derive, &prev->keyspace_hits, &prev->keyspace_misses);
590   redis_submit(rn->name, "percent", "hitratio", (value_t){.gauge = ratio});
591
592 } /* void redis_keyspace_usage */
593
594 static void redis_check_connection(redis_node_t *rn) {
595   if (rn->redisContext)
596     return;
597
598   redisContext *rh;
599   if (rn->socket != NULL)
600     rh = redisConnectUnixWithTimeout(rn->socket, rn->timeout);
601   else
602     rh = redisConnectWithTimeout(rn->host, rn->port, rn->timeout);
603
604   if (rh == NULL) {
605     ERROR("redis plugin: can't allocate redis context");
606     return;
607   }
608   if (rh->err) {
609     if (rn->socket)
610       ERROR("redis plugin: unable to connect to node `%s' (%s): %s.", rn->name,
611             rn->socket, rh->errstr);
612     else
613       ERROR("redis plugin: unable to connect to node `%s' (%s:%d): %s.",
614             rn->name, rn->host, rn->port, rh->errstr);
615     redisFree(rh);
616     return;
617   }
618
619   rn->redisContext = rh;
620
621   if (rn->passwd) {
622     redisReply *rr;
623
624     DEBUG("redis plugin: authenticating node `%s' passwd(%s).", rn->name,
625           rn->passwd);
626
627     if ((rr = c_redisCommand(rn, "AUTH %s", rn->passwd)) == NULL) {
628       WARNING("redis plugin: unable to authenticate on node `%s'.", rn->name);
629       return;
630     }
631
632     if (rr->type != REDIS_REPLY_STATUS) {
633       WARNING("redis plugin: invalid authentication on node `%s'.", rn->name);
634       freeReplyObject(rr);
635       redisFree(rn->redisContext);
636       rn->redisContext = NULL;
637       return;
638     }
639
640     freeReplyObject(rr);
641   }
642   return;
643 } /* void redis_check_connection */
644
645 static void redis_read_server_info(redis_node_t *rn) {
646   redisReply *rr;
647
648   if ((rr = c_redisCommand(rn, "INFO")) == NULL) {
649     WARNING("redis plugin: unable to get INFO from node `%s'.", rn->name);
650     return;
651   }
652
653   redis_handle_info(rn->name, rr->str, "uptime", NULL, "uptime_in_seconds",
654                     DS_TYPE_GAUGE);
655   redis_handle_info(rn->name, rr->str, "current_connections", "clients",
656                     "connected_clients", DS_TYPE_GAUGE);
657   redis_handle_info(rn->name, rr->str, "blocked_clients", NULL,
658                     "blocked_clients", DS_TYPE_GAUGE);
659   redis_handle_info(rn->name, rr->str, "memory", NULL, "used_memory",
660                     DS_TYPE_GAUGE);
661   redis_handle_info(rn->name, rr->str, "memory_lua", NULL, "used_memory_lua",
662                     DS_TYPE_GAUGE);
663   /* changes_since_last_save: Deprecated in redis version 2.6 and above */
664   redis_handle_info(rn->name, rr->str, "volatile_changes", NULL,
665                     "changes_since_last_save", DS_TYPE_GAUGE);
666   redis_handle_info(rn->name, rr->str, "total_connections", NULL,
667                     "total_connections_received", DS_TYPE_DERIVE);
668   redis_handle_info(rn->name, rr->str, "total_operations", NULL,
669                     "total_commands_processed", DS_TYPE_DERIVE);
670   redis_handle_info(rn->name, rr->str, "expired_keys", NULL, "expired_keys",
671                     DS_TYPE_DERIVE);
672   redis_handle_info(rn->name, rr->str, "evicted_keys", NULL, "evicted_keys",
673                     DS_TYPE_DERIVE);
674   redis_handle_info(rn->name, rr->str, "pubsub", "channels", "pubsub_channels",
675                     DS_TYPE_GAUGE);
676   redis_handle_info(rn->name, rr->str, "pubsub", "patterns", "pubsub_patterns",
677                     DS_TYPE_GAUGE);
678   redis_handle_info(rn->name, rr->str, "current_connections", "slaves",
679                     "connected_slaves", DS_TYPE_GAUGE);
680   redis_handle_info(rn->name, rr->str, "total_bytes", "input",
681                     "total_net_input_bytes", DS_TYPE_DERIVE);
682   redis_handle_info(rn->name, rr->str, "total_bytes", "output",
683                     "total_net_output_bytes", DS_TYPE_DERIVE);
684
685   redis_keyspace_usage(rn, rr->str);
686
687   redis_db_stats(rn->name, rr->str);
688
689   if (rn->report_cpu_usage)
690     redis_cpu_usage(rn->name, rr->str);
691
692   freeReplyObject(rr);
693 } /* void redis_read_server_info */
694
695 static void redis_read_command_stats(redis_node_t *rn) {
696   redisReply *rr;
697
698   if ((rr = c_redisCommand(rn, "INFO commandstats")) == NULL) {
699     WARNING("redis plugin: node `%s': unable to get `INFO commandstats'.",
700             rn->name);
701     return;
702   }
703
704   if (rr->type != REDIS_REPLY_STRING) {
705     WARNING("redis plugin: node `%s' `INFO commandstats' returned unsupported "
706             "redis type %i.",
707             rn->name, rr->type);
708     freeReplyObject(rr);
709     return;
710   }
711
712   char *command;
713   char *line;
714   char *ptr = rr->str;
715   char *saveptr = NULL;
716   while ((line = strtok_r(ptr, "\n\r", &saveptr)) != NULL) {
717     ptr = NULL;
718
719     if (line[0] == '#')
720       continue;
721
722     /* command name */
723     if (strstr(line, "cmdstat_") != line) {
724       ERROR("redis plugin: not found 'cmdstat_' prefix in line '%s'", line);
725       continue;
726     }
727
728     char *values = strstr(line, ":");
729     if (values == NULL) {
730       ERROR("redis plugin: not found ':' separator in line '%s'", line);
731       continue;
732     }
733
734     /* Null-terminate command token */
735     values[0] = '\0';
736     command = line + strlen("cmdstat_");
737     values++;
738
739     /* parse values */
740     /* cmdstat_publish:calls=20795774,usec=111039258,usec_per_call=5.34 */
741     char *field;
742     char *saveptr_field = NULL;
743     while ((field = strtok_r(values, "=", &saveptr_field)) != NULL) {
744       values = NULL;
745
746       const char *type;
747       /* only these are supported */
748       if (strcmp(field, "calls") == 0)
749         type = "commands";
750       else if (strcmp(field, "usec") == 0)
751         type = "redis_command_cputime";
752       else
753         continue;
754
755       if ((field = strtok_r(NULL, ",", &saveptr_field)) == NULL)
756         continue;
757
758       char *endptr = NULL;
759       errno = 0;
760       derive_t value = strtoll(field, &endptr, 0);
761
762       if ((endptr == field) || (errno != 0))
763         continue;
764
765       redis_submit(rn->name, type, command, (value_t){.derive = value});
766     }
767   }
768   freeReplyObject(rr);
769 } /* void redis_read_command_stats */
770
771 static int redis_read(user_data_t *user_data) /* {{{ */
772 {
773   redis_node_t *rn = user_data->data;
774
775 #if COLLECT_DEBUG
776   if (rn->socket)
777     DEBUG("redis plugin: querying info from node `%s' (%s).", rn->name,
778           rn->socket);
779   else
780     DEBUG("redis plugin: querying info from node `%s' (%s:%d).", rn->name,
781           rn->host, rn->port);
782 #endif
783
784   redis_check_connection(rn);
785
786   if (!rn->redisContext) /* no connection */
787     return -1;
788
789   redis_read_server_info(rn);
790
791   if (!rn->redisContext) /* connection lost */
792     return -1;
793
794   if (rn->report_command_stats) {
795     redis_read_command_stats(rn);
796
797     if (!rn->redisContext) /* connection lost */
798       return -1;
799   }
800
801   for (redis_query_t *rq = rn->queries; rq != NULL; rq = rq->next) {
802     redis_handle_query(rn, rq);
803     if (!rn->redisContext) /* connection lost */
804       return -1;
805   }
806
807   return 0;
808 }
809 /* }}} */
810
811 void module_register(void) /* {{{ */
812 {
813   plugin_register_complex_config("redis", redis_config);
814   plugin_register_init("redis", redis_init);
815 }
816 /* }}} */