redis plugin: Implemented persistent connections
[collectd.git] / src / redis.c
1 /**
2  * collectd - src/redis.c, based on src/memcached.c
3  * Copyright (C) 2010       Andrés J. Díaz <ajdiaz@connectical.com>
4  *
5  * This program is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License as published by the
7  * Free Software Foundation; either version 2 of the License, or (at your
8  * option) any later version.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Andrés J. Díaz <ajdiaz@connectical.com>
21  **/
22
23 #include "collectd.h"
24
25 #include "common.h"
26 #include "plugin.h"
27
28 #include <hiredis/hiredis.h>
29 #include <sys/time.h>
30
31 #ifndef HOST_NAME_MAX
32 #define HOST_NAME_MAX _POSIX_HOST_NAME_MAX
33 #endif
34
35 #define REDIS_DEF_HOST "localhost"
36 #define REDIS_DEF_PASSWD ""
37 #define REDIS_DEF_PORT 6379
38 #define REDIS_DEF_TIMEOUT_SEC 2
39 #define REDIS_DEF_DB_COUNT 256
40 #define MAX_REDIS_NODE_NAME 64
41 #define MAX_REDIS_PASSWD_LENGTH 512
42 #define MAX_REDIS_VAL_SIZE 256
43 #define MAX_REDIS_QUERY 2048
44
45 /* Redis plugin configuration example:
46  *
47  * <Plugin redis>
48  *   <Node "mynode">
49  *     Host "localhost"
50  *     Port "6379"
51  *     Timeout 2
52  *     Password "foobar"
53  *   </Node>
54  * </Plugin>
55  */
56
57 struct redis_query_s;
58 typedef struct redis_query_s redis_query_t;
59 struct redis_query_s {
60   char query[MAX_REDIS_QUERY];
61   char type[DATA_MAX_NAME_LEN];
62   char instance[DATA_MAX_NAME_LEN];
63   int database;
64
65   redis_query_t *next;
66 };
67
68 struct redis_node_s;
69 typedef struct redis_node_s redis_node_t;
70 struct redis_node_s {
71   char name[MAX_REDIS_NODE_NAME];
72   char host[HOST_NAME_MAX];
73   char passwd[MAX_REDIS_PASSWD_LENGTH];
74   int port;
75   struct timeval timeout;
76   redisContext *redisContext;
77   redis_query_t *queries;
78
79   redis_node_t *next;
80 };
81
82 static redis_node_t *nodes_head;
83
84 static int redis_node_add(const redis_node_t *rn) /* {{{ */
85 {
86   redis_node_t *rn_copy;
87   redis_node_t *rn_ptr;
88
89   /* Check for duplicates first */
90   for (rn_ptr = nodes_head; rn_ptr != NULL; rn_ptr = rn_ptr->next)
91     if (strcmp(rn->name, rn_ptr->name) == 0)
92       break;
93
94   if (rn_ptr != NULL) {
95     ERROR("redis plugin: A node with the name `%s' already exists.", rn->name);
96     return -1;
97   }
98
99   rn_copy = malloc(sizeof(*rn_copy));
100   if (rn_copy == NULL) {
101     ERROR("redis plugin: malloc failed adding redis_node to the tree.");
102     return -1;
103   }
104
105   memcpy(rn_copy, rn, sizeof(*rn_copy));
106   rn_copy->next = NULL;
107
108   DEBUG("redis plugin: Adding node \"%s\".", rn->name);
109
110   if (nodes_head == NULL)
111     nodes_head = rn_copy;
112   else {
113     rn_ptr = nodes_head;
114     while (rn_ptr->next != NULL)
115       rn_ptr = rn_ptr->next;
116     rn_ptr->next = rn_copy;
117   }
118
119   return 0;
120 } /* }}} */
121
122 static redis_query_t *redis_config_query(oconfig_item_t *ci) /* {{{ */
123 {
124   redis_query_t *rq;
125   int status;
126
127   rq = calloc(1, sizeof(*rq));
128   if (rq == NULL) {
129     ERROR("redis plugin: calloc failed adding redis_query.");
130     return NULL;
131   }
132   status = cf_util_get_string_buffer(ci, rq->query, sizeof(rq->query));
133   if (status != 0)
134     goto err;
135
136   /*
137    * Default to a gauge type.
138    */
139   (void)strncpy(rq->type, "gauge", sizeof(rq->type));
140   (void)sstrncpy(rq->instance, rq->query, sizeof(rq->instance));
141   replace_special(rq->instance, sizeof(rq->instance));
142
143   rq->database = 0;
144
145   for (int i = 0; i < ci->children_num; i++) {
146     oconfig_item_t *option = ci->children + i;
147
148     if (strcasecmp("Type", option->key) == 0) {
149       status = cf_util_get_string_buffer(option, rq->type, sizeof(rq->type));
150     } else if (strcasecmp("Instance", option->key) == 0) {
151       status =
152           cf_util_get_string_buffer(option, rq->instance, sizeof(rq->instance));
153     } else if (strcasecmp("Database", option->key) == 0) {
154       status = cf_util_get_int(option, &rq->database);
155       if (rq->database < 0) {
156         WARNING("redis plugin: The \"Database\" option must be positive "
157                 "integer or zero");
158         status = -1;
159       }
160     } else {
161       WARNING("redis plugin: unknown configuration option: %s", option->key);
162       status = -1;
163     }
164     if (status != 0)
165       goto err;
166   }
167   return rq;
168 err:
169   free(rq);
170   return NULL;
171 } /* }}} */
172
173 static int redis_config_node(oconfig_item_t *ci) /* {{{ */
174 {
175   redis_query_t *rq;
176   int status;
177   int timeout;
178
179   redis_node_t rn = {.port = REDIS_DEF_PORT,
180                      .timeout.tv_sec = REDIS_DEF_TIMEOUT_SEC};
181
182   sstrncpy(rn.host, REDIS_DEF_HOST, sizeof(rn.host));
183
184   status = cf_util_get_string_buffer(ci, rn.name, sizeof(rn.name));
185   if (status != 0)
186     return status;
187
188   for (int i = 0; i < ci->children_num; i++) {
189     oconfig_item_t *option = ci->children + i;
190
191     if (strcasecmp("Host", option->key) == 0)
192       status = cf_util_get_string_buffer(option, rn.host, sizeof(rn.host));
193     else if (strcasecmp("Port", option->key) == 0) {
194       status = cf_util_get_port_number(option);
195       if (status > 0) {
196         rn.port = status;
197         status = 0;
198       }
199     } else if (strcasecmp("Query", option->key) == 0) {
200       rq = redis_config_query(option);
201       if (rq == NULL) {
202         status = 1;
203       } else {
204         rq->next = rn.queries;
205         rn.queries = rq;
206       }
207     } else if (strcasecmp("Timeout", option->key) == 0) {
208       status = cf_util_get_int(option, &timeout);
209       if (status == 0) {
210         rn.timeout.tv_usec = timeout * 1000;
211         rn.timeout.tv_sec = rn.timeout.tv_usec / 1000000L;
212         rn.timeout.tv_usec %= 1000000L;
213       }
214     } else if (strcasecmp("Password", option->key) == 0)
215       status = cf_util_get_string_buffer(option, rn.passwd, sizeof(rn.passwd));
216     else
217       WARNING("redis plugin: Option `%s' not allowed inside a `Node' "
218               "block. I'll ignore this option.",
219               option->key);
220
221     if (status != 0)
222       break;
223   }
224
225   if (status != 0)
226     return status;
227
228   return redis_node_add(&rn);
229 } /* }}} int redis_config_node */
230
231 static int redis_config(oconfig_item_t *ci) /* {{{ */
232 {
233   for (int i = 0; i < ci->children_num; i++) {
234     oconfig_item_t *option = ci->children + i;
235
236     if (strcasecmp("Node", option->key) == 0)
237       redis_config_node(option);
238     else
239       WARNING("redis plugin: Option `%s' not allowed in redis"
240               " configuration. It will be ignored.",
241               option->key);
242   }
243
244   if (nodes_head == NULL) {
245     ERROR("redis plugin: No valid node configuration could be found.");
246     return ENOENT;
247   }
248
249   return 0;
250 } /* }}} */
251
252 __attribute__((nonnull(2))) static void
253 redis_submit(char *plugin_instance, const char *type, const char *type_instance,
254              value_t value) /* {{{ */
255 {
256   value_list_t vl = VALUE_LIST_INIT;
257
258   vl.values = &value;
259   vl.values_len = 1;
260   sstrncpy(vl.plugin, "redis", sizeof(vl.plugin));
261   if (plugin_instance != NULL)
262     sstrncpy(vl.plugin_instance, plugin_instance, sizeof(vl.plugin_instance));
263   sstrncpy(vl.type, type, sizeof(vl.type));
264   if (type_instance != NULL)
265     sstrncpy(vl.type_instance, type_instance, sizeof(vl.type_instance));
266
267   plugin_dispatch_values(&vl);
268 } /* }}} */
269
270 static int redis_init(void) /* {{{ */
271 {
272   redis_node_t rn = {.name = "default",
273                      .host = REDIS_DEF_HOST,
274                      .port = REDIS_DEF_PORT,
275                      .timeout.tv_sec = REDIS_DEF_TIMEOUT_SEC,
276                      .next = NULL};
277
278   if (nodes_head == NULL)
279     redis_node_add(&rn);
280
281   return 0;
282 } /* }}} int redis_init */
283
284 static void *c_redisCommand(redis_node_t *rn, const char *format, ...) {
285   redisContext *c = rn->redisContext;
286
287   if (c == NULL)
288     return NULL;
289
290   va_list ap;
291   va_start(ap, format);
292   void *reply = redisvCommand(c, format, ap);
293   va_end(ap);
294
295   if (reply == NULL) {
296     ERROR("redis plugin: Connection error: %s", c->errstr);
297     redisFree(rn->redisContext);
298     rn->redisContext = NULL;
299   }
300
301   return reply;
302 } /* void c_redisCommand */
303
304 static int redis_handle_info(char *node, char const *info_line,
305                              char const *type, char const *type_instance,
306                              char const *field_name, int ds_type) /* {{{ */
307 {
308   char *str = strstr(info_line, field_name);
309   static char buf[MAX_REDIS_VAL_SIZE];
310   value_t val;
311   if (str) {
312     int i;
313
314     str += strlen(field_name) + 1; /* also skip the ':' */
315     for (i = 0; (*str && (isdigit((unsigned char)*str) || *str == '.'));
316          i++, str++)
317       buf[i] = *str;
318     buf[i] = '\0';
319
320     if (parse_value(buf, &val, ds_type) == -1) {
321       WARNING("redis plugin: Unable to parse field `%s'.", field_name);
322       return -1;
323     }
324
325     redis_submit(node, type, type_instance, val);
326     return 0;
327   }
328   return -1;
329
330 } /* }}} int redis_handle_info */
331
332 static int redis_handle_query(redis_node_t *rn, redis_query_t *rq) /* {{{ */
333 {
334   redisReply *rr;
335   const data_set_t *ds;
336   value_t val;
337
338   ds = plugin_get_ds(rq->type);
339   if (!ds) {
340     ERROR("redis plugin: DS type `%s' not defined.", rq->type);
341     return -1;
342   }
343
344   if (ds->ds_num != 1) {
345     ERROR("redis plugin: DS type `%s' has too many datasources. This is not "
346           "supported currently.",
347           rq->type);
348     return -1;
349   }
350
351   if ((rr = c_redisCommand(rn, "SELECT %d", rq->database)) == NULL) {
352     WARNING("redis plugin: unable to switch to database `%d' on node `%s'.",
353             rq->database, rn->name);
354     return -1;
355   }
356
357   if ((rr = c_redisCommand(rn, rq->query)) == NULL) {
358     WARNING("redis plugin: unable to carry out query `%s'.", rq->query);
359     return -1;
360   }
361
362   switch (rr->type) {
363   case REDIS_REPLY_INTEGER:
364     switch (ds->ds[0].type) {
365     case DS_TYPE_COUNTER:
366       val.counter = (counter_t)rr->integer;
367       break;
368     case DS_TYPE_GAUGE:
369       val.gauge = (gauge_t)rr->integer;
370       break;
371     case DS_TYPE_DERIVE:
372       val.gauge = (derive_t)rr->integer;
373       break;
374     case DS_TYPE_ABSOLUTE:
375       val.gauge = (absolute_t)rr->integer;
376       break;
377     }
378     break;
379   case REDIS_REPLY_STRING:
380     if (parse_value(rr->str, &val, ds->ds[0].type) == -1) {
381       WARNING("redis plugin: Query `%s': Unable to parse value.", rq->query);
382       freeReplyObject(rr);
383       return -1;
384     }
385     break;
386   case REDIS_REPLY_ERROR:
387     WARNING("redis plugin: Query `%s' failed: %s.", rq->query, rr->str);
388     freeReplyObject(rr);
389     return -1;
390   case REDIS_REPLY_ARRAY:
391     WARNING("redis plugin: Query `%s' should return string or integer. Arrays "
392             "are not supported.",
393             rq->query);
394     freeReplyObject(rr);
395     return -1;
396   default:
397     WARNING("redis plugin: Query `%s': Cannot coerce redis type (%i).",
398             rq->query, rr->type);
399     freeReplyObject(rr);
400     return -1;
401   }
402
403   redis_submit(rn->name, rq->type,
404                (strlen(rq->instance) > 0) ? rq->instance : NULL, val);
405   freeReplyObject(rr);
406   return 0;
407 } /* }}} int redis_handle_query */
408
409 static int redis_db_stats(char *node, char const *info_line) /* {{{ */
410 {
411   /* redis_db_stats parses and dispatches Redis database statistics,
412    * currently the number of keys for each database.
413    * info_line needs to have the following format:
414    *   db0:keys=4,expires=0,avg_ttl=0
415    */
416
417   for (int db = 0; db < REDIS_DEF_DB_COUNT; db++) {
418     static char buf[MAX_REDIS_VAL_SIZE];
419     static char field_name[12];
420     static char db_id[4];
421     value_t val;
422     char *str;
423     int i;
424
425     snprintf(field_name, sizeof(field_name), "db%d:keys=", db);
426
427     str = strstr(info_line, field_name);
428     if (!str)
429       continue;
430
431     str += strlen(field_name);
432     for (i = 0; (*str && isdigit((int)*str)); i++, str++)
433       buf[i] = *str;
434     buf[i] = '\0';
435
436     if (parse_value(buf, &val, DS_TYPE_GAUGE) != 0) {
437       WARNING("redis plugin: Unable to parse field `%s'.", field_name);
438       return -1;
439     }
440
441     snprintf(db_id, sizeof(db_id), "%d", db);
442     redis_submit(node, "records", db_id, val);
443   }
444   return 0;
445
446 } /* }}} int redis_db_stats */
447
448 static void redis_check_connection(redis_node_t *rn) {
449   if (rn->redisContext)
450     return;
451
452   redisContext *rh =
453       redisConnectWithTimeout((char *)rn->host, rn->port, rn->timeout);
454
455   if (rh == NULL) {
456     ERROR("redis plugin: can't allocate redis context");
457     return;
458   }
459   if (rh->err) {
460     ERROR("redis plugin: unable to connect to node `%s' (%s:%d): %s.", rn->name,
461           rn->host, rn->port, rh->errstr);
462     redisFree(rh);
463     return;
464   }
465
466   rn->redisContext = rh;
467
468   if (strlen(rn->passwd) > 0) {
469     redisReply *rr;
470
471     DEBUG("redis plugin: authenticating node `%s' passwd(%s).", rn->name,
472           rn->passwd);
473
474     if ((rr = c_redisCommand(rn, "AUTH %s", rn->passwd)) == NULL) {
475       WARNING("redis plugin: unable to authenticate on node `%s'.", rn->name);
476       return;
477     }
478
479     if (rr->type != REDIS_REPLY_STATUS) {
480       WARNING("redis plugin: invalid authentication on node `%s'.", rn->name);
481       freeReplyObject(rr);
482       redisFree(rn->redisContext);
483       rn->redisContext = NULL;
484       return;
485     }
486
487     freeReplyObject(rr);
488   }
489   return;
490 } /* void redis_check_connection */
491
492 static void redis_read_server_info(redis_node_t *rn) {
493   redisReply *rr;
494
495   if ((rr = c_redisCommand(rn, "INFO")) == NULL) {
496     WARNING("redis plugin: unable to get INFO from node `%s'.", rn->name);
497     return;
498   }
499
500   redis_handle_info(rn->name, rr->str, "uptime", NULL, "uptime_in_seconds",
501                     DS_TYPE_GAUGE);
502   redis_handle_info(rn->name, rr->str, "current_connections", "clients",
503                     "connected_clients", DS_TYPE_GAUGE);
504   redis_handle_info(rn->name, rr->str, "blocked_clients", NULL,
505                     "blocked_clients", DS_TYPE_GAUGE);
506   redis_handle_info(rn->name, rr->str, "memory", NULL, "used_memory",
507                     DS_TYPE_GAUGE);
508   redis_handle_info(rn->name, rr->str, "memory_lua", NULL, "used_memory_lua",
509                     DS_TYPE_GAUGE);
510   /* changes_since_last_save: Deprecated in redis version 2.6 and above */
511   redis_handle_info(rn->name, rr->str, "volatile_changes", NULL,
512                     "changes_since_last_save", DS_TYPE_GAUGE);
513   redis_handle_info(rn->name, rr->str, "total_connections", NULL,
514                     "total_connections_received", DS_TYPE_DERIVE);
515   redis_handle_info(rn->name, rr->str, "total_operations", NULL,
516                     "total_commands_processed", DS_TYPE_DERIVE);
517   redis_handle_info(rn->name, rr->str, "operations_per_second", NULL,
518                     "instantaneous_ops_per_sec", DS_TYPE_GAUGE);
519   redis_handle_info(rn->name, rr->str, "expired_keys", NULL, "expired_keys",
520                     DS_TYPE_DERIVE);
521   redis_handle_info(rn->name, rr->str, "evicted_keys", NULL, "evicted_keys",
522                     DS_TYPE_DERIVE);
523   redis_handle_info(rn->name, rr->str, "pubsub", "channels", "pubsub_channels",
524                     DS_TYPE_GAUGE);
525   redis_handle_info(rn->name, rr->str, "pubsub", "patterns", "pubsub_patterns",
526                     DS_TYPE_GAUGE);
527   redis_handle_info(rn->name, rr->str, "current_connections", "slaves",
528                     "connected_slaves", DS_TYPE_GAUGE);
529   redis_handle_info(rn->name, rr->str, "cache_result", "hits", "keyspace_hits",
530                     DS_TYPE_DERIVE);
531   redis_handle_info(rn->name, rr->str, "cache_result", "misses",
532                     "keyspace_misses", DS_TYPE_DERIVE);
533   redis_handle_info(rn->name, rr->str, "total_bytes", "input",
534                     "total_net_input_bytes", DS_TYPE_DERIVE);
535   redis_handle_info(rn->name, rr->str, "total_bytes", "output",
536                     "total_net_output_bytes", DS_TYPE_DERIVE);
537
538   redis_db_stats(rn->name, rr->str);
539
540   freeReplyObject(rr);
541 } /* void redis_read_server_info */
542
543 static int redis_read(void) /* {{{ */
544 {
545   for (redis_node_t *rn = nodes_head; rn != NULL; rn = rn->next) {
546     DEBUG("redis plugin: querying info from node `%s' (%s:%d).", rn->name,
547           rn->host, rn->port);
548
549     redis_check_connection(rn);
550
551     if (!rn->redisContext) /* no connection */
552       continue;
553
554     redis_read_server_info(rn);
555
556     if (!rn->redisContext) /* connection lost */
557       continue;
558
559     for (redis_query_t *rq = rn->queries; rq != NULL; rq = rq->next) {
560       redis_handle_query(rn, rq);
561       if (!rn->redisContext) /* connection lost */
562         break;
563     }
564   }
565
566   return 0;
567 }
568 /* }}} */
569
570 void module_register(void) /* {{{ */
571 {
572   plugin_register_complex_config("redis", redis_config);
573   plugin_register_init("redis", redis_init);
574   plugin_register_read("redis", redis_read);
575   /* TODO: plugin_register_write: one redis list per value id with
576    * X elements */
577 }
578 /* }}} */