Merge branch 'collectd-5.5' into collectd-5.6
[collectd.git] / src / redis.c
1 /**
2  * collectd - src/redis.c, based on src/memcached.c
3  * Copyright (C) 2010       Andrés J. Díaz <ajdiaz@connectical.com>
4  *
5  * This program is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License as published by the
7  * Free Software Foundation; either version 2 of the License, or (at your
8  * option) any later version.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Andrés J. Díaz <ajdiaz@connectical.com>
21  **/
22
23 #include "collectd.h"
24
25 #include "common.h"
26 #include "plugin.h"
27
28 #include <hiredis/hiredis.h>
29 #include <sys/time.h>
30
31 #ifndef HOST_NAME_MAX
32 #define HOST_NAME_MAX _POSIX_HOST_NAME_MAX
33 #endif
34
35 #define REDIS_DEF_HOST "localhost"
36 #define REDIS_DEF_PASSWD ""
37 #define REDIS_DEF_PORT 6379
38 #define REDIS_DEF_TIMEOUT 2000
39 #define MAX_REDIS_NODE_NAME 64
40 #define MAX_REDIS_PASSWD_LENGTH 512
41 #define MAX_REDIS_VAL_SIZE 256
42 #define MAX_REDIS_QUERY 2048
43
44 /* Redis plugin configuration example:
45  *
46  * <Plugin redis>
47  *   <Node "mynode">
48  *     Host "localhost"
49  *     Port "6379"
50  *     Timeout 2
51  *     Password "foobar"
52  *   </Node>
53  * </Plugin>
54  */
55
56 struct redis_query_s;
57 typedef struct redis_query_s redis_query_t;
58 struct redis_query_s {
59   char query[MAX_REDIS_QUERY];
60   char type[DATA_MAX_NAME_LEN];
61   char instance[DATA_MAX_NAME_LEN];
62   redis_query_t *next;
63 };
64
65 struct redis_node_s;
66 typedef struct redis_node_s redis_node_t;
67 struct redis_node_s {
68   char name[MAX_REDIS_NODE_NAME];
69   char host[HOST_NAME_MAX];
70   char passwd[MAX_REDIS_PASSWD_LENGTH];
71   int port;
72   struct timeval timeout;
73   redis_query_t *queries;
74
75   redis_node_t *next;
76 };
77
78 static redis_node_t *nodes_head = NULL;
79
80 static int redis_node_add(const redis_node_t *rn) /* {{{ */
81 {
82   redis_node_t *rn_copy;
83   redis_node_t *rn_ptr;
84
85   /* Check for duplicates first */
86   for (rn_ptr = nodes_head; rn_ptr != NULL; rn_ptr = rn_ptr->next)
87     if (strcmp(rn->name, rn_ptr->name) == 0)
88       break;
89
90   if (rn_ptr != NULL) {
91     ERROR("redis plugin: A node with the name `%s' already exists.", rn->name);
92     return (-1);
93   }
94
95   rn_copy = malloc(sizeof(*rn_copy));
96   if (rn_copy == NULL) {
97     ERROR("redis plugin: malloc failed adding redis_node to the tree.");
98     return (-1);
99   }
100
101   memcpy(rn_copy, rn, sizeof(*rn_copy));
102   rn_copy->next = NULL;
103
104   DEBUG("redis plugin: Adding node \"%s\".", rn->name);
105
106   if (nodes_head == NULL)
107     nodes_head = rn_copy;
108   else {
109     rn_ptr = nodes_head;
110     while (rn_ptr->next != NULL)
111       rn_ptr = rn_ptr->next;
112     rn_ptr->next = rn_copy;
113   }
114
115   return (0);
116 } /* }}} */
117
118 static redis_query_t *redis_config_query(oconfig_item_t *ci) /* {{{ */
119 {
120   redis_query_t *rq;
121   int status;
122
123   rq = calloc(1, sizeof(*rq));
124   if (rq == NULL) {
125     ERROR("redis plugin: calloc failed adding redis_query.");
126     return NULL;
127   }
128   status = cf_util_get_string_buffer(ci, rq->query, sizeof(rq->query));
129   if (status != 0)
130     goto err;
131
132   /*
133    * Default to a gauge type.
134    */
135   (void)strncpy(rq->type, "gauge", sizeof(rq->type));
136   (void)sstrncpy(rq->instance, rq->query, sizeof(rq->instance));
137   replace_special(rq->instance, sizeof(rq->instance));
138
139   for (int i = 0; i < ci->children_num; i++) {
140     oconfig_item_t *option = ci->children + i;
141
142     if (strcasecmp("Type", option->key) == 0) {
143       status = cf_util_get_string_buffer(option, rq->type, sizeof(rq->type));
144     } else if (strcasecmp("Instance", option->key) == 0) {
145       status =
146           cf_util_get_string_buffer(option, rq->instance, sizeof(rq->instance));
147     } else {
148       WARNING("redis plugin: unknown configuration option: %s", option->key);
149       status = -1;
150     }
151     if (status != 0)
152       goto err;
153   }
154   return rq;
155 err:
156   free(rq);
157   return NULL;
158 } /* }}} */
159
160 static int redis_config_node(oconfig_item_t *ci) /* {{{ */
161 {
162   redis_query_t *rq;
163   int status;
164   int timeout;
165
166   redis_node_t rn = {.port = REDIS_DEF_PORT,
167                      .timeout.tv_usec = REDIS_DEF_TIMEOUT};
168
169   sstrncpy(rn.host, REDIS_DEF_HOST, sizeof(rn.host));
170
171   status = cf_util_get_string_buffer(ci, rn.name, sizeof(rn.name));
172   if (status != 0)
173     return (status);
174
175   for (int i = 0; i < ci->children_num; i++) {
176     oconfig_item_t *option = ci->children + i;
177
178     if (strcasecmp("Host", option->key) == 0)
179       status = cf_util_get_string_buffer(option, rn.host, sizeof(rn.host));
180     else if (strcasecmp("Port", option->key) == 0) {
181       status = cf_util_get_port_number(option);
182       if (status > 0) {
183         rn.port = status;
184         status = 0;
185       }
186     } else if (strcasecmp("Query", option->key) == 0) {
187       rq = redis_config_query(option);
188       if (rq == NULL) {
189         status = 1;
190       } else {
191         rq->next = rn.queries;
192         rn.queries = rq;
193       }
194     } else if (strcasecmp("Timeout", option->key) == 0) {
195       status = cf_util_get_int(option, &timeout);
196       if (status == 0)
197         rn.timeout.tv_usec = timeout;
198     } else if (strcasecmp("Password", option->key) == 0)
199       status = cf_util_get_string_buffer(option, rn.passwd, sizeof(rn.passwd));
200     else
201       WARNING("redis plugin: Option `%s' not allowed inside a `Node' "
202               "block. I'll ignore this option.",
203               option->key);
204
205     if (status != 0)
206       break;
207   }
208
209   if (status != 0)
210     return (status);
211
212   return (redis_node_add(&rn));
213 } /* }}} int redis_config_node */
214
215 static int redis_config(oconfig_item_t *ci) /* {{{ */
216 {
217   for (int i = 0; i < ci->children_num; i++) {
218     oconfig_item_t *option = ci->children + i;
219
220     if (strcasecmp("Node", option->key) == 0)
221       redis_config_node(option);
222     else
223       WARNING("redis plugin: Option `%s' not allowed in redis"
224               " configuration. It will be ignored.",
225               option->key);
226   }
227
228   if (nodes_head == NULL) {
229     ERROR("redis plugin: No valid node configuration could be found.");
230     return (ENOENT);
231   }
232
233   return (0);
234 } /* }}} */
235
236 __attribute__((nonnull(2))) static void
237 redis_submit(char *plugin_instance, const char *type, const char *type_instance,
238              value_t value) /* {{{ */
239 {
240   value_t values[1];
241   value_list_t vl = VALUE_LIST_INIT;
242
243   values[0] = value;
244
245   vl.values = values;
246   vl.values_len = 1;
247   sstrncpy(vl.host, hostname_g, sizeof(vl.host));
248   sstrncpy(vl.plugin, "redis", sizeof(vl.plugin));
249   if (plugin_instance != NULL)
250     sstrncpy(vl.plugin_instance, plugin_instance, sizeof(vl.plugin_instance));
251   sstrncpy(vl.type, type, sizeof(vl.type));
252   if (type_instance != NULL)
253     sstrncpy(vl.type_instance, type_instance, sizeof(vl.type_instance));
254
255   plugin_dispatch_values(&vl);
256 } /* }}} */
257
258 static int redis_init(void) /* {{{ */
259 {
260   redis_node_t rn = {.name = "default",
261                      .host = REDIS_DEF_HOST,
262                      .port = REDIS_DEF_PORT,
263                      .timeout.tv_sec = 0,
264                      .timeout.tv_usec = REDIS_DEF_TIMEOUT,
265                      .next = NULL};
266
267   if (nodes_head == NULL)
268     redis_node_add(&rn);
269
270   return (0);
271 } /* }}} int redis_init */
272
273 static int redis_handle_info(char *node, char const *info_line,
274                              char const *type, char const *type_instance,
275                              char const *field_name, int ds_type) /* {{{ */
276 {
277   char *str = strstr(info_line, field_name);
278   static char buf[MAX_REDIS_VAL_SIZE];
279   value_t val;
280   if (str) {
281     int i;
282
283     str += strlen(field_name) + 1; /* also skip the ':' */
284     for (i = 0; (*str && (isdigit((unsigned char)*str) || *str == '.'));
285          i++, str++)
286       buf[i] = *str;
287     buf[i] = '\0';
288
289     if (parse_value(buf, &val, ds_type) == -1) {
290       WARNING("redis plugin: Unable to parse field `%s'.", field_name);
291       return (-1);
292     }
293
294     redis_submit(node, type, type_instance, val);
295     return (0);
296   }
297   return (-1);
298
299 } /* }}} int redis_handle_info */
300
301 static int redis_handle_query(redisContext *rh, redis_node_t *rn,
302                               redis_query_t *rq) /* {{{ */
303 {
304   redisReply *rr;
305   const data_set_t *ds;
306   value_t val;
307
308   ds = plugin_get_ds(rq->type);
309   if (!ds) {
310     ERROR("redis plugin: DataSet `%s' not defined.", rq->type);
311     return (-1);
312   }
313
314   if (ds->ds_num != 1) {
315     ERROR("redis plugin: DS `%s' has too many types.", rq->type);
316     return (-1);
317   }
318
319   if ((rr = redisCommand(rh, rq->query)) == NULL) {
320     WARNING("redis plugin: unable to carry out query `%s'.", rq->query);
321     return (-1);
322   }
323
324   switch (rr->type) {
325   case REDIS_REPLY_INTEGER:
326     switch (ds->ds[0].type) {
327     case DS_TYPE_COUNTER:
328       val.counter = (counter_t)rr->integer;
329       break;
330     case DS_TYPE_GAUGE:
331       val.gauge = (gauge_t)rr->integer;
332       break;
333     case DS_TYPE_DERIVE:
334       val.gauge = (derive_t)rr->integer;
335       break;
336     case DS_TYPE_ABSOLUTE:
337       val.gauge = (absolute_t)rr->integer;
338       break;
339     }
340     break;
341   case REDIS_REPLY_STRING:
342     if (parse_value(rr->str, &val, ds->ds[0].type) == -1) {
343       WARNING("redis plugin: Unable to parse field `%s'.", rq->type);
344       freeReplyObject(rr);
345       return (-1);
346     }
347     break;
348   default:
349     WARNING("redis plugin: Cannot coerce redis type.");
350     freeReplyObject(rr);
351     return (-1);
352   }
353
354   redis_submit(rn->name, rq->type,
355                (strlen(rq->instance) > 0) ? rq->instance : NULL, val);
356   freeReplyObject(rr);
357   return 0;
358 } /* }}} int redis_handle_query */
359
360 static int redis_read(void) /* {{{ */
361 {
362   for (redis_node_t *rn = nodes_head; rn != NULL; rn = rn->next) {
363     redisContext *rh;
364     redisReply *rr;
365
366     DEBUG("redis plugin: querying info from node `%s' (%s:%d).", rn->name,
367           rn->host, rn->port);
368
369     rh = redisConnectWithTimeout((char *)rn->host, rn->port, rn->timeout);
370     if (rh == NULL) {
371       ERROR("redis plugin: unable to connect to node `%s' (%s:%d).", rn->name,
372             rn->host, rn->port);
373       continue;
374     }
375
376     if (strlen(rn->passwd) > 0) {
377       DEBUG("redis plugin: authenticating node `%s' passwd(%s).", rn->name,
378             rn->passwd);
379
380       if ((rr = redisCommand(rh, "AUTH %s", rn->passwd)) == NULL) {
381         WARNING("redis plugin: unable to authenticate on node `%s'.", rn->name);
382         goto redis_fail;
383       }
384
385       if (rr->type != REDIS_REPLY_STATUS) {
386         WARNING("redis plugin: invalid authentication on node `%s'.", rn->name);
387         goto redis_fail;
388       }
389
390       freeReplyObject(rr);
391     }
392
393     if ((rr = redisCommand(rh, "INFO")) == NULL) {
394       WARNING("redis plugin: unable to get info from node `%s'.", rn->name);
395       goto redis_fail;
396     }
397
398     redis_handle_info(rn->name, rr->str, "uptime", NULL, "uptime_in_seconds",
399                       DS_TYPE_GAUGE);
400     redis_handle_info(rn->name, rr->str, "current_connections", "clients",
401                       "connected_clients", DS_TYPE_GAUGE);
402     redis_handle_info(rn->name, rr->str, "blocked_clients", NULL,
403                       "blocked_clients", DS_TYPE_GAUGE);
404     redis_handle_info(rn->name, rr->str, "memory", NULL, "used_memory",
405                       DS_TYPE_GAUGE);
406     redis_handle_info(rn->name, rr->str, "memory_lua", NULL, "used_memory_lua",
407                       DS_TYPE_GAUGE);
408     /* changes_since_last_save: Deprecated in redis version 2.6 and above */
409     redis_handle_info(rn->name, rr->str, "volatile_changes", NULL,
410                       "changes_since_last_save", DS_TYPE_GAUGE);
411     redis_handle_info(rn->name, rr->str, "total_connections", NULL,
412                       "total_connections_received", DS_TYPE_DERIVE);
413     redis_handle_info(rn->name, rr->str, "total_operations", NULL,
414                       "total_commands_processed", DS_TYPE_DERIVE);
415     redis_handle_info(rn->name, rr->str, "operations_per_second", NULL,
416                       "instantaneous_ops_per_sec", DS_TYPE_GAUGE);
417     redis_handle_info(rn->name, rr->str, "expired_keys", NULL, "expired_keys",
418                       DS_TYPE_DERIVE);
419     redis_handle_info(rn->name, rr->str, "evicted_keys", NULL, "evicted_keys",
420                       DS_TYPE_DERIVE);
421     redis_handle_info(rn->name, rr->str, "pubsub", "channels",
422                       "pubsub_channels", DS_TYPE_GAUGE);
423     redis_handle_info(rn->name, rr->str, "pubsub", "patterns",
424                       "pubsub_patterns", DS_TYPE_GAUGE);
425     redis_handle_info(rn->name, rr->str, "current_connections", "slaves",
426                       "connected_slaves", DS_TYPE_GAUGE);
427     redis_handle_info(rn->name, rr->str, "cache_result", "hits",
428                       "keyspace_hits", DS_TYPE_DERIVE);
429     redis_handle_info(rn->name, rr->str, "cache_result", "misses",
430                       "keyspace_misses", DS_TYPE_DERIVE);
431     redis_handle_info(rn->name, rr->str, "total_bytes", "input",
432                       "total_net_input_bytes", DS_TYPE_DERIVE);
433     redis_handle_info(rn->name, rr->str, "total_bytes", "output",
434                       "total_net_output_bytes", DS_TYPE_DERIVE);
435
436     for (redis_query_t *rq = rn->queries; rq != NULL; rq = rq->next)
437       redis_handle_query(rh, rn, rq);
438
439   redis_fail:
440     if (rr != NULL)
441       freeReplyObject(rr);
442     redisFree(rh);
443   }
444
445   return 0;
446 }
447 /* }}} */
448
449 void module_register(void) /* {{{ */
450 {
451   plugin_register_complex_config("redis", redis_config);
452   plugin_register_init("redis", redis_init);
453   plugin_register_read("redis", redis_read);
454   /* TODO: plugin_register_write: one redis list per value id with
455    * X elements */
456 }
457 /* }}} */
458
459 /* vim: set sw=2 sts=2 et fdm=marker : */