redis: removing sprintf() calls
[collectd.git] / src / redis.c
1 /**
2  * collectd - src/redis.c, based on src/memcached.c
3  * Copyright (C) 2010       Andrés J. Díaz <ajdiaz@connectical.com>
4  *
5  * This program is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License as published by the
7  * Free Software Foundation; either version 2 of the License, or (at your
8  * option) any later version.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
18  *
19  * Authors:
20  *   Andrés J. Díaz <ajdiaz@connectical.com>
21  **/
22
23 #include "collectd.h"
24
25 #include "common.h"
26 #include "plugin.h"
27
28 #include <hiredis/hiredis.h>
29 #include <sys/time.h>
30
31 #ifndef HOST_NAME_MAX
32 #define HOST_NAME_MAX _POSIX_HOST_NAME_MAX
33 #endif
34
35 #define REDIS_DEF_HOST "localhost"
36 #define REDIS_DEF_PASSWD ""
37 #define REDIS_DEF_PORT 6379
38 #define REDIS_DEF_TIMEOUT 2000
39 #define MAX_REDIS_NODE_NAME 64
40 #define MAX_REDIS_PASSWD_LENGTH 512
41 #define MAX_REDIS_VAL_SIZE 256
42 #define MAX_REDIS_QUERY 2048
43
44 /* Redis plugin configuration example:
45  *
46  * <Plugin redis>
47  *   <Node "mynode">
48  *     Host "localhost"
49  *     Port "6379"
50  *     Timeout 2
51  *     Password "foobar"
52  *   </Node>
53  * </Plugin>
54  */
55
56 struct redis_query_s;
57 typedef struct redis_query_s redis_query_t;
58 struct redis_query_s {
59   char query[MAX_REDIS_QUERY];
60   char type[DATA_MAX_NAME_LEN];
61   char instance[DATA_MAX_NAME_LEN];
62   redis_query_t *next;
63 };
64
65 struct redis_node_s;
66 typedef struct redis_node_s redis_node_t;
67 struct redis_node_s {
68   char name[MAX_REDIS_NODE_NAME];
69   char host[HOST_NAME_MAX];
70   char passwd[MAX_REDIS_PASSWD_LENGTH];
71   int port;
72   struct timeval timeout;
73   redis_query_t *queries;
74
75   redis_node_t *next;
76 };
77
78 static redis_node_t *nodes_head = NULL;
79
80 static int redis_node_add(const redis_node_t *rn) /* {{{ */
81 {
82   redis_node_t *rn_copy;
83   redis_node_t *rn_ptr;
84
85   /* Check for duplicates first */
86   for (rn_ptr = nodes_head; rn_ptr != NULL; rn_ptr = rn_ptr->next)
87     if (strcmp(rn->name, rn_ptr->name) == 0)
88       break;
89
90   if (rn_ptr != NULL) {
91     ERROR("redis plugin: A node with the name `%s' already exists.", rn->name);
92     return (-1);
93   }
94
95   rn_copy = malloc(sizeof(*rn_copy));
96   if (rn_copy == NULL) {
97     ERROR("redis plugin: malloc failed adding redis_node to the tree.");
98     return (-1);
99   }
100
101   memcpy(rn_copy, rn, sizeof(*rn_copy));
102   rn_copy->next = NULL;
103
104   DEBUG("redis plugin: Adding node \"%s\".", rn->name);
105
106   if (nodes_head == NULL)
107     nodes_head = rn_copy;
108   else {
109     rn_ptr = nodes_head;
110     while (rn_ptr->next != NULL)
111       rn_ptr = rn_ptr->next;
112     rn_ptr->next = rn_copy;
113   }
114
115   return (0);
116 } /* }}} */
117
118 static redis_query_t *redis_config_query(oconfig_item_t *ci) /* {{{ */
119 {
120   redis_query_t *rq;
121   int status;
122
123   rq = calloc(1, sizeof(*rq));
124   if (rq == NULL) {
125     ERROR("redis plugin: calloc failed adding redis_query.");
126     return NULL;
127   }
128   status = cf_util_get_string_buffer(ci, rq->query, sizeof(rq->query));
129   if (status != 0)
130     goto err;
131
132   /*
133    * Default to a gauge type.
134    */
135   (void)strncpy(rq->type, "gauge", sizeof(rq->type));
136   (void)sstrncpy(rq->instance, rq->query, sizeof(rq->instance));
137   replace_special(rq->instance, sizeof(rq->instance));
138
139   for (int i = 0; i < ci->children_num; i++) {
140     oconfig_item_t *option = ci->children + i;
141
142     if (strcasecmp("Type", option->key) == 0) {
143       status = cf_util_get_string_buffer(option, rq->type, sizeof(rq->type));
144     } else if (strcasecmp("Instance", option->key) == 0) {
145       status =
146           cf_util_get_string_buffer(option, rq->instance, sizeof(rq->instance));
147     } else {
148       WARNING("redis plugin: unknown configuration option: %s", option->key);
149       status = -1;
150     }
151     if (status != 0)
152       goto err;
153   }
154   return rq;
155 err:
156   free(rq);
157   return NULL;
158 } /* }}} */
159
160 static int redis_config_node(oconfig_item_t *ci) /* {{{ */
161 {
162   redis_query_t *rq;
163   int status;
164   int timeout;
165
166   redis_node_t rn = {.port = REDIS_DEF_PORT,
167                      .timeout.tv_usec = REDIS_DEF_TIMEOUT};
168
169   sstrncpy(rn.host, REDIS_DEF_HOST, sizeof(rn.host));
170
171   status = cf_util_get_string_buffer(ci, rn.name, sizeof(rn.name));
172   if (status != 0)
173     return (status);
174
175   for (int i = 0; i < ci->children_num; i++) {
176     oconfig_item_t *option = ci->children + i;
177
178     if (strcasecmp("Host", option->key) == 0)
179       status = cf_util_get_string_buffer(option, rn.host, sizeof(rn.host));
180     else if (strcasecmp("Port", option->key) == 0) {
181       status = cf_util_get_port_number(option);
182       if (status > 0) {
183         rn.port = status;
184         status = 0;
185       }
186     } else if (strcasecmp("Query", option->key) == 0) {
187       rq = redis_config_query(option);
188       if (rq == NULL) {
189         status = 1;
190       } else {
191         rq->next = rn.queries;
192         rn.queries = rq;
193       }
194     } else if (strcasecmp("Timeout", option->key) == 0) {
195       status = cf_util_get_int(option, &timeout);
196       if (status == 0)
197         rn.timeout.tv_usec = timeout;
198     } else if (strcasecmp("Password", option->key) == 0)
199       status = cf_util_get_string_buffer(option, rn.passwd, sizeof(rn.passwd));
200     else
201       WARNING("redis plugin: Option `%s' not allowed inside a `Node' "
202               "block. I'll ignore this option.",
203               option->key);
204
205     if (status != 0)
206       break;
207   }
208
209   if (status != 0)
210     return (status);
211
212   return (redis_node_add(&rn));
213 } /* }}} int redis_config_node */
214
215 static int redis_config(oconfig_item_t *ci) /* {{{ */
216 {
217   for (int i = 0; i < ci->children_num; i++) {
218     oconfig_item_t *option = ci->children + i;
219
220     if (strcasecmp("Node", option->key) == 0)
221       redis_config_node(option);
222     else
223       WARNING("redis plugin: Option `%s' not allowed in redis"
224               " configuration. It will be ignored.",
225               option->key);
226   }
227
228   if (nodes_head == NULL) {
229     ERROR("redis plugin: No valid node configuration could be found.");
230     return (ENOENT);
231   }
232
233   return (0);
234 } /* }}} */
235
236 __attribute__((nonnull(2))) static void
237 redis_submit(char *plugin_instance, const char *type, const char *type_instance,
238              value_t value) /* {{{ */
239 {
240   value_list_t vl = VALUE_LIST_INIT;
241
242   vl.values = &value;
243   vl.values_len = 1;
244   sstrncpy(vl.plugin, "redis", sizeof(vl.plugin));
245   if (plugin_instance != NULL)
246     sstrncpy(vl.plugin_instance, plugin_instance, sizeof(vl.plugin_instance));
247   sstrncpy(vl.type, type, sizeof(vl.type));
248   if (type_instance != NULL)
249     sstrncpy(vl.type_instance, type_instance, sizeof(vl.type_instance));
250
251   plugin_dispatch_values(&vl);
252 } /* }}} */
253
254 static int redis_init(void) /* {{{ */
255 {
256   redis_node_t rn = {.name = "default",
257                      .host = REDIS_DEF_HOST,
258                      .port = REDIS_DEF_PORT,
259                      .timeout.tv_sec = 0,
260                      .timeout.tv_usec = REDIS_DEF_TIMEOUT,
261                      .next = NULL};
262
263   if (nodes_head == NULL)
264     redis_node_add(&rn);
265
266   return (0);
267 } /* }}} int redis_init */
268
269 static int redis_handle_info(char *node, char const *info_line,
270                              char const *type, char const *type_instance,
271                              char const *field_name, int ds_type) /* {{{ */
272 {
273   char *str = strstr(info_line, field_name);
274   static char buf[MAX_REDIS_VAL_SIZE];
275   value_t val;
276   if (str) {
277     int i;
278
279     str += strlen(field_name) + 1; /* also skip the ':' */
280     for (i = 0; (*str && (isdigit((unsigned char)*str) || *str == '.'));
281          i++, str++)
282       buf[i] = *str;
283     buf[i] = '\0';
284
285     if (parse_value(buf, &val, ds_type) == -1) {
286       WARNING("redis plugin: Unable to parse field `%s'.", field_name);
287       return (-1);
288     }
289
290     redis_submit(node, type, type_instance, val);
291     return (0);
292   }
293   return (-1);
294
295 } /* }}} int redis_handle_info */
296
297 static int redis_handle_query(redisContext *rh, redis_node_t *rn,
298                               redis_query_t *rq) /* {{{ */
299 {
300   redisReply *rr;
301   const data_set_t *ds;
302   value_t val;
303
304   ds = plugin_get_ds(rq->type);
305   if (!ds) {
306     ERROR("redis plugin: DataSet `%s' not defined.", rq->type);
307     return (-1);
308   }
309
310   if (ds->ds_num != 1) {
311     ERROR("redis plugin: DS `%s' has too many types.", rq->type);
312     return (-1);
313   }
314
315   if ((rr = redisCommand(rh, rq->query)) == NULL) {
316     WARNING("redis plugin: unable to carry out query `%s'.", rq->query);
317     return (-1);
318   }
319
320   switch (rr->type) {
321   case REDIS_REPLY_INTEGER:
322     switch (ds->ds[0].type) {
323     case DS_TYPE_COUNTER:
324       val.counter = (counter_t)rr->integer;
325       break;
326     case DS_TYPE_GAUGE:
327       val.gauge = (gauge_t)rr->integer;
328       break;
329     case DS_TYPE_DERIVE:
330       val.gauge = (derive_t)rr->integer;
331       break;
332     case DS_TYPE_ABSOLUTE:
333       val.gauge = (absolute_t)rr->integer;
334       break;
335     }
336     break;
337   case REDIS_REPLY_STRING:
338     if (parse_value(rr->str, &val, ds->ds[0].type) == -1) {
339       WARNING("redis plugin: Unable to parse field `%s'.", rq->type);
340       freeReplyObject(rr);
341       return (-1);
342     }
343     break;
344   default:
345     WARNING("redis plugin: Cannot coerce redis type.");
346     freeReplyObject(rr);
347     return (-1);
348   }
349
350   redis_submit(rn->name, rq->type,
351                (strlen(rq->instance) > 0) ? rq->instance : NULL, val);
352   freeReplyObject(rr);
353   return 0;
354 } /* }}} int redis_handle_query */
355
356 static int redis_db_stats(char *node, char const *info_line) /* {{{ */
357 {
358   unsigned char db;
359   unsigned char max_db = 16;
360   static char field_name[10];
361   static char db_id[3];
362   char *str;
363   static char buf[MAX_REDIS_VAL_SIZE];
364   value_t val;
365   int i;
366
367   for (db = 0; db < max_db; db++) {
368         ssnprintf(field_name, sizeof(field_name), "db%d:keys", db);
369
370         str = strstr(info_line, field_name);
371         if (str) {
372           str += strlen(field_name) + 1; /* also skip the '=' */
373           for (i = 0; (*str && (isdigit((unsigned char)*str) || *str == '.')); i++, str++)
374                 buf[i] = *str;
375           buf[i] = '\0';
376
377           if (parse_value(buf, &val, DS_TYPE_GAUGE) == -1) {
378                 WARNING("redis plugin: Unable to parse field `%s'.", field_name);
379                 return (-1);
380           }
381
382           ssnprintf(db_id, sizeof(db_id), "%d", db);
383           redis_submit (node, "records", db_id, val);
384         }
385   }
386   return (0);
387
388 } /* }}} int redis_db_stats */
389
390
391 static int redis_read(void) /* {{{ */
392 {
393   for (redis_node_t *rn = nodes_head; rn != NULL; rn = rn->next) {
394     redisContext *rh;
395     redisReply *rr;
396
397     DEBUG("redis plugin: querying info from node `%s' (%s:%d).", rn->name,
398           rn->host, rn->port);
399
400     rh = redisConnectWithTimeout((char *)rn->host, rn->port, rn->timeout);
401     if (rh == NULL) {
402       ERROR("redis plugin: unable to connect to node `%s' (%s:%d).", rn->name,
403             rn->host, rn->port);
404       continue;
405     }
406
407     if (strlen(rn->passwd) > 0) {
408       DEBUG("redis plugin: authenticating node `%s' passwd(%s).", rn->name,
409             rn->passwd);
410
411       if ((rr = redisCommand(rh, "AUTH %s", rn->passwd)) == NULL) {
412         WARNING("redis plugin: unable to authenticate on node `%s'.", rn->name);
413         goto redis_fail;
414       }
415
416       if (rr->type != REDIS_REPLY_STATUS) {
417         WARNING("redis plugin: invalid authentication on node `%s'.", rn->name);
418         goto redis_fail;
419       }
420
421       freeReplyObject(rr);
422     }
423
424     if ((rr = redisCommand(rh, "INFO")) == NULL) {
425       WARNING("redis plugin: unable to get info from node `%s'.", rn->name);
426       goto redis_fail;
427     }
428
429     redis_handle_info(rn->name, rr->str, "uptime", NULL, "uptime_in_seconds",
430                       DS_TYPE_GAUGE);
431     redis_handle_info(rn->name, rr->str, "current_connections", "clients",
432                       "connected_clients", DS_TYPE_GAUGE);
433     redis_handle_info(rn->name, rr->str, "blocked_clients", NULL,
434                       "blocked_clients", DS_TYPE_GAUGE);
435     redis_handle_info(rn->name, rr->str, "memory", NULL, "used_memory",
436                       DS_TYPE_GAUGE);
437     redis_handle_info(rn->name, rr->str, "memory_lua", NULL, "used_memory_lua",
438                       DS_TYPE_GAUGE);
439     /* changes_since_last_save: Deprecated in redis version 2.6 and above */
440     redis_handle_info(rn->name, rr->str, "volatile_changes", NULL,
441                       "changes_since_last_save", DS_TYPE_GAUGE);
442     redis_handle_info(rn->name, rr->str, "total_connections", NULL,
443                       "total_connections_received", DS_TYPE_DERIVE);
444     redis_handle_info(rn->name, rr->str, "total_operations", NULL,
445                       "total_commands_processed", DS_TYPE_DERIVE);
446     redis_handle_info(rn->name, rr->str, "operations_per_second", NULL,
447                       "instantaneous_ops_per_sec", DS_TYPE_GAUGE);
448     redis_handle_info(rn->name, rr->str, "expired_keys", NULL, "expired_keys",
449                       DS_TYPE_DERIVE);
450     redis_handle_info(rn->name, rr->str, "evicted_keys", NULL, "evicted_keys",
451                       DS_TYPE_DERIVE);
452     redis_handle_info(rn->name, rr->str, "pubsub", "channels",
453                       "pubsub_channels", DS_TYPE_GAUGE);
454     redis_handle_info(rn->name, rr->str, "pubsub", "patterns",
455                       "pubsub_patterns", DS_TYPE_GAUGE);
456     redis_handle_info(rn->name, rr->str, "current_connections", "slaves",
457                       "connected_slaves", DS_TYPE_GAUGE);
458     redis_handle_info(rn->name, rr->str, "cache_result", "hits",
459                       "keyspace_hits", DS_TYPE_DERIVE);
460     redis_handle_info(rn->name, rr->str, "cache_result", "misses",
461                       "keyspace_misses", DS_TYPE_DERIVE);
462     redis_handle_info(rn->name, rr->str, "total_bytes", "input",
463                       "total_net_input_bytes", DS_TYPE_DERIVE);
464     redis_handle_info(rn->name, rr->str, "total_bytes", "output",
465                       "total_net_output_bytes", DS_TYPE_DERIVE);
466
467         redis_db_stats (rn->name, rr->str);
468
469     for (redis_query_t *rq = rn->queries; rq != NULL; rq = rq->next)
470       redis_handle_query(rh, rn, rq);
471
472   redis_fail:
473     if (rr != NULL)
474       freeReplyObject(rr);
475     redisFree(rh);
476   }
477
478   return 0;
479 }
480 /* }}} */
481
482 void module_register(void) /* {{{ */
483 {
484   plugin_register_complex_config("redis", redis_config);
485   plugin_register_init("redis", redis_init);
486   plugin_register_read("redis", redis_read);
487   /* TODO: plugin_register_write: one redis list per value id with
488    * X elements */
489 }
490 /* }}} */