Merge remote-tracking branch 'github/pr/2059'
[collectd.git] / src / redis.c
1 /**
2  * collectd - src/redis.c, based on src/memcached.c
3  * Copyright (C) 2010       Andrés J. Díaz <ajdiaz@connectical.com>
4  *
5  * This program is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License as published by the
7  * Free Software Foundation; either version 2 of the License, or (at your
8  * option) any later version.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Andrés J. Díaz <ajdiaz@connectical.com>
21  **/
22
23 #include "collectd.h"
24
25 #include "common.h"
26 #include "plugin.h"
27
28 #include <hiredis/hiredis.h>
29 #include <sys/time.h>
30
31 #ifndef HOST_NAME_MAX
32 #define HOST_NAME_MAX _POSIX_HOST_NAME_MAX
33 #endif
34
35 #define REDIS_DEF_HOST "localhost"
36 #define REDIS_DEF_PASSWD ""
37 #define REDIS_DEF_PORT 6379
38 #define REDIS_DEF_TIMEOUT 2000
39 #define MAX_REDIS_NODE_NAME 64
40 #define MAX_REDIS_PASSWD_LENGTH 512
41 #define MAX_REDIS_VAL_SIZE 256
42 #define MAX_REDIS_QUERY 2048
43
44 /* Redis plugin configuration example:
45  *
46  * <Plugin redis>
47  *   <Node "mynode">
48  *     Host "localhost"
49  *     Port "6379"
50  *     Timeout 2
51  *     Password "foobar"
52  *   </Node>
53  * </Plugin>
54  */
55
56 struct redis_query_s;
57 typedef struct redis_query_s redis_query_t;
58 struct redis_query_s {
59   char query[MAX_REDIS_QUERY];
60   char type[DATA_MAX_NAME_LEN];
61   char instance[DATA_MAX_NAME_LEN];
62   redis_query_t *next;
63 };
64
65 struct redis_node_s;
66 typedef struct redis_node_s redis_node_t;
67 struct redis_node_s {
68   char name[MAX_REDIS_NODE_NAME];
69   char host[HOST_NAME_MAX];
70   char passwd[MAX_REDIS_PASSWD_LENGTH];
71   int port;
72   struct timeval timeout;
73   redis_query_t *queries;
74
75   redis_node_t *next;
76 };
77
78 static redis_node_t *nodes_head = NULL;
79
80 static int redis_node_add(const redis_node_t *rn) /* {{{ */
81 {
82   redis_node_t *rn_copy;
83   redis_node_t *rn_ptr;
84
85   /* Check for duplicates first */
86   for (rn_ptr = nodes_head; rn_ptr != NULL; rn_ptr = rn_ptr->next)
87     if (strcmp(rn->name, rn_ptr->name) == 0)
88       break;
89
90   if (rn_ptr != NULL) {
91     ERROR("redis plugin: A node with the name `%s' already exists.", rn->name);
92     return (-1);
93   }
94
95   rn_copy = malloc(sizeof(*rn_copy));
96   if (rn_copy == NULL) {
97     ERROR("redis plugin: malloc failed adding redis_node to the tree.");
98     return (-1);
99   }
100
101   memcpy(rn_copy, rn, sizeof(*rn_copy));
102   rn_copy->next = NULL;
103
104   DEBUG("redis plugin: Adding node \"%s\".", rn->name);
105
106   if (nodes_head == NULL)
107     nodes_head = rn_copy;
108   else {
109     rn_ptr = nodes_head;
110     while (rn_ptr->next != NULL)
111       rn_ptr = rn_ptr->next;
112     rn_ptr->next = rn_copy;
113   }
114
115   return (0);
116 } /* }}} */
117
118 static redis_query_t *redis_config_query(oconfig_item_t *ci) /* {{{ */
119 {
120   redis_query_t *rq;
121   int status;
122
123   rq = calloc(1, sizeof(*rq));
124   if (rq == NULL) {
125     ERROR("redis plugin: calloc failed adding redis_query.");
126     return NULL;
127   }
128   status = cf_util_get_string_buffer(ci, rq->query, sizeof(rq->query));
129   if (status != 0)
130     goto err;
131
132   /*
133    * Default to a gauge type.
134    */
135   (void)strncpy(rq->type, "gauge", sizeof(rq->type));
136   (void)sstrncpy(rq->instance, rq->query, sizeof(rq->instance));
137   replace_special(rq->instance, sizeof(rq->instance));
138
139   for (int i = 0; i < ci->children_num; i++) {
140     oconfig_item_t *option = ci->children + i;
141
142     if (strcasecmp("Type", option->key) == 0) {
143       status = cf_util_get_string_buffer(option, rq->type, sizeof(rq->type));
144     } else if (strcasecmp("Instance", option->key) == 0) {
145       status =
146           cf_util_get_string_buffer(option, rq->instance, sizeof(rq->instance));
147     } else {
148       WARNING("redis plugin: unknown configuration option: %s", option->key);
149       status = -1;
150     }
151     if (status != 0)
152       goto err;
153   }
154   return rq;
155 err:
156   free(rq);
157   return NULL;
158 } /* }}} */
159
160 static int redis_config_node(oconfig_item_t *ci) /* {{{ */
161 {
162   redis_query_t *rq;
163   int status;
164   int timeout;
165
166   redis_node_t rn = {.port = REDIS_DEF_PORT,
167                      .timeout.tv_usec = REDIS_DEF_TIMEOUT};
168
169   sstrncpy(rn.host, REDIS_DEF_HOST, sizeof(rn.host));
170
171   status = cf_util_get_string_buffer(ci, rn.name, sizeof(rn.name));
172   if (status != 0)
173     return (status);
174
175   for (int i = 0; i < ci->children_num; i++) {
176     oconfig_item_t *option = ci->children + i;
177
178     if (strcasecmp("Host", option->key) == 0)
179       status = cf_util_get_string_buffer(option, rn.host, sizeof(rn.host));
180     else if (strcasecmp("Port", option->key) == 0) {
181       status = cf_util_get_port_number(option);
182       if (status > 0) {
183         rn.port = status;
184         status = 0;
185       }
186     } else if (strcasecmp("Query", option->key) == 0) {
187       rq = redis_config_query(option);
188       if (rq == NULL) {
189         status = 1;
190       } else {
191         rq->next = rn.queries;
192         rn.queries = rq;
193       }
194     } else if (strcasecmp("Timeout", option->key) == 0) {
195       status = cf_util_get_int(option, &timeout);
196       if (status == 0)
197         rn.timeout.tv_usec = timeout;
198     } else if (strcasecmp("Password", option->key) == 0)
199       status = cf_util_get_string_buffer(option, rn.passwd, sizeof(rn.passwd));
200     else
201       WARNING("redis plugin: Option `%s' not allowed inside a `Node' "
202               "block. I'll ignore this option.",
203               option->key);
204
205     if (status != 0)
206       break;
207   }
208
209   if (status != 0)
210     return (status);
211
212   return (redis_node_add(&rn));
213 } /* }}} int redis_config_node */
214
215 static int redis_config(oconfig_item_t *ci) /* {{{ */
216 {
217   for (int i = 0; i < ci->children_num; i++) {
218     oconfig_item_t *option = ci->children + i;
219
220     if (strcasecmp("Node", option->key) == 0)
221       redis_config_node(option);
222     else
223       WARNING("redis plugin: Option `%s' not allowed in redis"
224               " configuration. It will be ignored.",
225               option->key);
226   }
227
228   if (nodes_head == NULL) {
229     ERROR("redis plugin: No valid node configuration could be found.");
230     return (ENOENT);
231   }
232
233   return (0);
234 } /* }}} */
235
236 __attribute__((nonnull(2))) static void
237 redis_submit(char *plugin_instance, const char *type, const char *type_instance,
238              value_t value) /* {{{ */
239 {
240   value_list_t vl = VALUE_LIST_INIT;
241
242   vl.values = &value;
243   vl.values_len = 1;
244   sstrncpy(vl.plugin, "redis", sizeof(vl.plugin));
245   if (plugin_instance != NULL)
246     sstrncpy(vl.plugin_instance, plugin_instance, sizeof(vl.plugin_instance));
247   sstrncpy(vl.type, type, sizeof(vl.type));
248   if (type_instance != NULL)
249     sstrncpy(vl.type_instance, type_instance, sizeof(vl.type_instance));
250
251   plugin_dispatch_values(&vl);
252 } /* }}} */
253
254 static int redis_init(void) /* {{{ */
255 {
256   redis_node_t rn = {.name = "default",
257                      .host = REDIS_DEF_HOST,
258                      .port = REDIS_DEF_PORT,
259                      .timeout.tv_sec = 0,
260                      .timeout.tv_usec = REDIS_DEF_TIMEOUT,
261                      .next = NULL};
262
263   if (nodes_head == NULL)
264     redis_node_add(&rn);
265
266   return (0);
267 } /* }}} int redis_init */
268
269 static int redis_handle_info(char *node, char const *info_line,
270                              char const *type, char const *type_instance,
271                              char const *field_name, int ds_type) /* {{{ */
272 {
273   char *str = strstr(info_line, field_name);
274   static char buf[MAX_REDIS_VAL_SIZE];
275   value_t val;
276   if (str) {
277     int i;
278
279     str += strlen(field_name) + 1; /* also skip the ':' */
280     for (i = 0; (*str && (isdigit((unsigned char)*str) || *str == '.'));
281          i++, str++)
282       buf[i] = *str;
283     buf[i] = '\0';
284
285     if (parse_value(buf, &val, ds_type) == -1) {
286       WARNING("redis plugin: Unable to parse field `%s'.", field_name);
287       return (-1);
288     }
289
290     redis_submit(node, type, type_instance, val);
291     return (0);
292   }
293   return (-1);
294
295 } /* }}} int redis_handle_info */
296
297 static int redis_handle_query(redisContext *rh, redis_node_t *rn,
298                               redis_query_t *rq) /* {{{ */
299 {
300   redisReply *rr;
301   const data_set_t *ds;
302   value_t val;
303
304   ds = plugin_get_ds(rq->type);
305   if (!ds) {
306     ERROR("redis plugin: DataSet `%s' not defined.", rq->type);
307     return (-1);
308   }
309
310   if (ds->ds_num != 1) {
311     ERROR("redis plugin: DS `%s' has too many types.", rq->type);
312     return (-1);
313   }
314
315   if ((rr = redisCommand(rh, rq->query)) == NULL) {
316     WARNING("redis plugin: unable to carry out query `%s'.", rq->query);
317     return (-1);
318   }
319
320   switch (rr->type) {
321   case REDIS_REPLY_INTEGER:
322     switch (ds->ds[0].type) {
323     case DS_TYPE_COUNTER:
324       val.counter = (counter_t)rr->integer;
325       break;
326     case DS_TYPE_GAUGE:
327       val.gauge = (gauge_t)rr->integer;
328       break;
329     case DS_TYPE_DERIVE:
330       val.gauge = (derive_t)rr->integer;
331       break;
332     case DS_TYPE_ABSOLUTE:
333       val.gauge = (absolute_t)rr->integer;
334       break;
335     }
336     break;
337   case REDIS_REPLY_STRING:
338     if (parse_value(rr->str, &val, ds->ds[0].type) == -1) {
339       WARNING("redis plugin: Unable to parse field `%s'.", rq->type);
340       freeReplyObject(rr);
341       return (-1);
342     }
343     break;
344   default:
345     WARNING("redis plugin: Cannot coerce redis type.");
346     freeReplyObject(rr);
347     return (-1);
348   }
349
350   redis_submit(rn->name, rq->type,
351                (strlen(rq->instance) > 0) ? rq->instance : NULL, val);
352   freeReplyObject(rr);
353   return 0;
354 } /* }}} int redis_handle_query */
355
356 static int redis_read(void) /* {{{ */
357 {
358   for (redis_node_t *rn = nodes_head; rn != NULL; rn = rn->next) {
359     redisContext *rh;
360     redisReply *rr;
361
362     DEBUG("redis plugin: querying info from node `%s' (%s:%d).", rn->name,
363           rn->host, rn->port);
364
365     rh = redisConnectWithTimeout((char *)rn->host, rn->port, rn->timeout);
366     if (rh == NULL) {
367       ERROR("redis plugin: unable to connect to node `%s' (%s:%d).", rn->name,
368             rn->host, rn->port);
369       continue;
370     }
371
372     if (strlen(rn->passwd) > 0) {
373       DEBUG("redis plugin: authenticating node `%s' passwd(%s).", rn->name,
374             rn->passwd);
375
376       if ((rr = redisCommand(rh, "AUTH %s", rn->passwd)) == NULL) {
377         WARNING("redis plugin: unable to authenticate on node `%s'.", rn->name);
378         goto redis_fail;
379       }
380
381       if (rr->type != REDIS_REPLY_STATUS) {
382         WARNING("redis plugin: invalid authentication on node `%s'.", rn->name);
383         goto redis_fail;
384       }
385
386       freeReplyObject(rr);
387     }
388
389     if ((rr = redisCommand(rh, "INFO")) == NULL) {
390       WARNING("redis plugin: unable to get info from node `%s'.", rn->name);
391       goto redis_fail;
392     }
393
394     redis_handle_info(rn->name, rr->str, "uptime", NULL, "uptime_in_seconds",
395                       DS_TYPE_GAUGE);
396     redis_handle_info(rn->name, rr->str, "current_connections", "clients",
397                       "connected_clients", DS_TYPE_GAUGE);
398     redis_handle_info(rn->name, rr->str, "blocked_clients", NULL,
399                       "blocked_clients", DS_TYPE_GAUGE);
400     redis_handle_info(rn->name, rr->str, "memory", NULL, "used_memory",
401                       DS_TYPE_GAUGE);
402     redis_handle_info(rn->name, rr->str, "memory_lua", NULL, "used_memory_lua",
403                       DS_TYPE_GAUGE);
404     /* changes_since_last_save: Deprecated in redis version 2.6 and above */
405     redis_handle_info(rn->name, rr->str, "volatile_changes", NULL,
406                       "changes_since_last_save", DS_TYPE_GAUGE);
407     redis_handle_info(rn->name, rr->str, "total_connections", NULL,
408                       "total_connections_received", DS_TYPE_DERIVE);
409     redis_handle_info(rn->name, rr->str, "total_operations", NULL,
410                       "total_commands_processed", DS_TYPE_DERIVE);
411     redis_handle_info(rn->name, rr->str, "operations_per_second", NULL,
412                       "instantaneous_ops_per_sec", DS_TYPE_GAUGE);
413     redis_handle_info(rn->name, rr->str, "expired_keys", NULL, "expired_keys",
414                       DS_TYPE_DERIVE);
415     redis_handle_info(rn->name, rr->str, "evicted_keys", NULL, "evicted_keys",
416                       DS_TYPE_DERIVE);
417     redis_handle_info(rn->name, rr->str, "pubsub", "channels",
418                       "pubsub_channels", DS_TYPE_GAUGE);
419     redis_handle_info(rn->name, rr->str, "pubsub", "patterns",
420                       "pubsub_patterns", DS_TYPE_GAUGE);
421     redis_handle_info(rn->name, rr->str, "current_connections", "slaves",
422                       "connected_slaves", DS_TYPE_GAUGE);
423     redis_handle_info(rn->name, rr->str, "cache_result", "hits",
424                       "keyspace_hits", DS_TYPE_DERIVE);
425     redis_handle_info(rn->name, rr->str, "cache_result", "misses",
426                       "keyspace_misses", DS_TYPE_DERIVE);
427     redis_handle_info(rn->name, rr->str, "total_bytes", "input",
428                       "total_net_input_bytes", DS_TYPE_DERIVE);
429     redis_handle_info(rn->name, rr->str, "total_bytes", "output",
430                       "total_net_output_bytes", DS_TYPE_DERIVE);
431
432     for (redis_query_t *rq = rn->queries; rq != NULL; rq = rq->next)
433       redis_handle_query(rh, rn, rq);
434
435   redis_fail:
436     if (rr != NULL)
437       freeReplyObject(rr);
438     redisFree(rh);
439   }
440
441   return 0;
442 }
443 /* }}} */
444
445 void module_register(void) /* {{{ */
446 {
447   plugin_register_complex_config("redis", redis_config);
448   plugin_register_init("redis", redis_init);
449   plugin_register_read("redis", redis_read);
450   /* TODO: plugin_register_write: one redis list per value id with
451    * X elements */
452 }
453 /* }}} */