Bind plugin: plug a few leaks
[collectd.git] / src / memcached.c
1 /**
2  * collectd - src/memcached.c, based on src/hddtemp.c
3  * Copyright (C) 2007       Antony Dovgal
4  * Copyright (C) 2007-2012  Florian Forster
5  * Copyright (C) 2009       Doug MacEachern
6  * Copyright (C) 2009       Franck Lombardi
7  * Copyright (C) 2012       Nicolas Szalay
8  *
9  * This program is free software; you can redistribute it and/or modify it
10  * under the terms of the GNU General Public License as published by the
11  * Free Software Foundation; either version 2 of the License, or (at your
12  * option) any later version.
13  *
14  * This program is distributed in the hope that it will be useful, but
15  * WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17  * General Public License for more details.
18  *
19  * You should have received a copy of the GNU General Public License along
20  * with this program; if not, write to the Free Software Foundation, Inc.,
21  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
22  *
23  * Authors:
24  *   Antony Dovgal <tony at daylessday dot org>
25  *   Florian octo Forster <octo at collectd.org>
26  *   Doug MacEachern <dougm at hyperic.com>
27  *   Franck Lombardi
28  *   Nicolas Szalay
29  **/
30
31 #include "collectd.h"
32
33 #include "common.h"
34 #include "plugin.h"
35
36 #include <netdb.h>
37 #include <netinet/in.h>
38 #include <netinet/tcp.h>
39 #include <sys/un.h>
40
41 #define MEMCACHED_DEF_HOST "127.0.0.1"
42 #define MEMCACHED_DEF_PORT "11211"
43
44 struct memcached_s {
45   char *name;
46   char *socket;
47   char *host;
48   char *port;
49 };
50 typedef struct memcached_s memcached_t;
51
52 static _Bool memcached_have_instances = 0;
53
54 static void memcached_free(void *arg) {
55   memcached_t *st = arg;
56   if (st == NULL)
57     return;
58
59   sfree(st->name);
60   sfree(st->socket);
61   sfree(st->host);
62   sfree(st->port);
63   sfree(st);
64 }
65
66 static int memcached_connect_unix(memcached_t *st) {
67   struct sockaddr_un serv_addr = {0};
68   int fd;
69
70   serv_addr.sun_family = AF_UNIX;
71   sstrncpy(serv_addr.sun_path, st->socket, sizeof(serv_addr.sun_path));
72
73   /* create our socket descriptor */
74   fd = socket(AF_UNIX, SOCK_STREAM, 0);
75   if (fd < 0) {
76     char errbuf[1024];
77     ERROR("memcached plugin: memcached_connect_unix: socket(2) failed: %s",
78           sstrerror(errno, errbuf, sizeof(errbuf)));
79     return (-1);
80   }
81
82   /* connect to the memcached daemon */
83   int status = connect(fd, (struct sockaddr *)&serv_addr, sizeof(serv_addr));
84   if (status != 0) {
85     shutdown(fd, SHUT_RDWR);
86     close(fd);
87     fd = -1;
88   }
89
90   return (fd);
91 } /* int memcached_connect_unix */
92
93 static int memcached_connect_inet(memcached_t *st) {
94   const char *host;
95   const char *port;
96
97   struct addrinfo *ai_list;
98   int status;
99   int fd = -1;
100
101   host = (st->host != NULL) ? st->host : MEMCACHED_DEF_HOST;
102   port = (st->port != NULL) ? st->port : MEMCACHED_DEF_PORT;
103
104   struct addrinfo ai_hints = {.ai_family = AF_UNSPEC,
105                               .ai_flags = AI_ADDRCONFIG,
106                               .ai_socktype = SOCK_STREAM};
107
108   status = getaddrinfo(host, port, &ai_hints, &ai_list);
109   if (status != 0) {
110     char errbuf[1024];
111     ERROR("memcached plugin: memcached_connect_inet: "
112           "getaddrinfo(%s,%s) failed: %s",
113           host, port,
114           (status == EAI_SYSTEM) ? sstrerror(errno, errbuf, sizeof(errbuf))
115                                  : gai_strerror(status));
116     return (-1);
117   }
118
119   for (struct addrinfo *ai_ptr = ai_list; ai_ptr != NULL;
120        ai_ptr = ai_ptr->ai_next) {
121     /* create our socket descriptor */
122     fd = socket(ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
123     if (fd < 0) {
124       char errbuf[1024];
125       WARNING("memcached plugin: memcached_connect_inet: "
126               "socket(2) failed: %s",
127               sstrerror(errno, errbuf, sizeof(errbuf)));
128       continue;
129     }
130
131     /* connect to the memcached daemon */
132     status = (int)connect(fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
133     if (status != 0) {
134       shutdown(fd, SHUT_RDWR);
135       close(fd);
136       fd = -1;
137       continue;
138     }
139
140     /* A socket could be opened and connecting succeeded. We're done. */
141     break;
142   }
143
144   freeaddrinfo(ai_list);
145   return (fd);
146 } /* int memcached_connect_inet */
147
148 static int memcached_connect(memcached_t *st) {
149   if (st->socket != NULL)
150     return (memcached_connect_unix(st));
151   else
152     return (memcached_connect_inet(st));
153 }
154
155 static int memcached_query_daemon(char *buffer, size_t buffer_size,
156                                   memcached_t *st) {
157   int fd, status;
158   size_t buffer_fill;
159
160   fd = memcached_connect(st);
161   if (fd < 0) {
162     ERROR("memcached plugin: Instance \"%s\" could not connect to daemon.",
163           st->name);
164     return -1;
165   }
166
167   status = (int)swrite(fd, "stats\r\n", strlen("stats\r\n"));
168   if (status != 0) {
169     char errbuf[1024];
170     ERROR("memcached plugin: write(2) failed: %s",
171           sstrerror(errno, errbuf, sizeof(errbuf)));
172     shutdown(fd, SHUT_RDWR);
173     close(fd);
174     return (-1);
175   }
176
177   /* receive data from the memcached daemon */
178   memset(buffer, 0, buffer_size);
179
180   buffer_fill = 0;
181   while ((status = (int)recv(fd, buffer + buffer_fill,
182                              buffer_size - buffer_fill, /* flags = */ 0)) !=
183          0) {
184     char const end_token[5] = {'E', 'N', 'D', '\r', '\n'};
185     if (status < 0) {
186       char errbuf[1024];
187
188       if ((errno == EAGAIN) || (errno == EINTR))
189         continue;
190
191       ERROR("memcached: Error reading from socket: %s",
192             sstrerror(errno, errbuf, sizeof(errbuf)));
193       shutdown(fd, SHUT_RDWR);
194       close(fd);
195       return (-1);
196     }
197
198     buffer_fill += (size_t)status;
199     if (buffer_fill > buffer_size) {
200       buffer_fill = buffer_size;
201       WARNING("memcached plugin: Message was truncated.");
202       break;
203     }
204
205     /* If buffer ends in end_token, we have all the data. */
206     if (memcmp(buffer + buffer_fill - sizeof(end_token), end_token,
207                sizeof(end_token)) == 0)
208       break;
209   } /* while (recv) */
210
211   status = 0;
212   if (buffer_fill == 0) {
213     WARNING("memcached plugin: No data returned by memcached.");
214     status = -1;
215   }
216
217   shutdown(fd, SHUT_RDWR);
218   close(fd);
219   return (status);
220 } /* int memcached_query_daemon */
221
222 static void memcached_init_vl(value_list_t *vl, memcached_t const *st) {
223   char const *host = st->host;
224
225   /* Set vl->host to hostname_g, if:
226    * - Legacy mode is used.
227    * - "Socket" option is given (doc: "Host option is ignored").
228    * - "Host" option is not provided.
229    * - "Host" option is set to "localhost" or "127.0.0.1". */
230   if ((strcmp(st->name, "__legacy__") == 0) || (st->socket != NULL) ||
231       (st->host == NULL) || (strcmp("127.0.0.1", st->host) == 0) ||
232       (strcmp("localhost", st->host) == 0))
233     host = hostname_g;
234
235   sstrncpy(vl->plugin, "memcached", sizeof(vl->plugin));
236   sstrncpy(vl->host, host, sizeof(vl->host));
237   if (strcmp(st->name, "__legacy__") != 0)
238     sstrncpy(vl->plugin_instance, st->name, sizeof(vl->plugin_instance));
239 }
240
241 static void submit_derive(const char *type, const char *type_inst,
242                           derive_t value, memcached_t *st) {
243   value_t values[1];
244   value_list_t vl = VALUE_LIST_INIT;
245   memcached_init_vl(&vl, st);
246
247   values[0].derive = value;
248
249   vl.values = values;
250   vl.values_len = 1;
251   sstrncpy(vl.type, type, sizeof(vl.type));
252   if (type_inst != NULL)
253     sstrncpy(vl.type_instance, type_inst, sizeof(vl.type_instance));
254
255   plugin_dispatch_values(&vl);
256 }
257
258 static void submit_derive2(const char *type, const char *type_inst,
259                            derive_t value0, derive_t value1, memcached_t *st) {
260   value_t values[2];
261   value_list_t vl = VALUE_LIST_INIT;
262   memcached_init_vl(&vl, st);
263
264   values[0].derive = value0;
265   values[1].derive = value1;
266
267   vl.values = values;
268   vl.values_len = 2;
269   sstrncpy(vl.type, type, sizeof(vl.type));
270   if (type_inst != NULL)
271     sstrncpy(vl.type_instance, type_inst, sizeof(vl.type_instance));
272
273   plugin_dispatch_values(&vl);
274 }
275
276 static void submit_gauge(const char *type, const char *type_inst, gauge_t value,
277                          memcached_t *st) {
278   value_t values[1];
279   value_list_t vl = VALUE_LIST_INIT;
280   memcached_init_vl(&vl, st);
281
282   values[0].gauge = value;
283
284   vl.values = values;
285   vl.values_len = 1;
286   sstrncpy(vl.type, type, sizeof(vl.type));
287   if (type_inst != NULL)
288     sstrncpy(vl.type_instance, type_inst, sizeof(vl.type_instance));
289
290   plugin_dispatch_values(&vl);
291 }
292
293 static void submit_gauge2(const char *type, const char *type_inst,
294                           gauge_t value0, gauge_t value1, memcached_t *st) {
295   value_t values[2];
296   value_list_t vl = VALUE_LIST_INIT;
297   memcached_init_vl(&vl, st);
298
299   values[0].gauge = value0;
300   values[1].gauge = value1;
301
302   vl.values = values;
303   vl.values_len = 2;
304   sstrncpy(vl.type, type, sizeof(vl.type));
305   if (type_inst != NULL)
306     sstrncpy(vl.type_instance, type_inst, sizeof(vl.type_instance));
307
308   plugin_dispatch_values(&vl);
309 }
310
311 static int memcached_read(user_data_t *user_data) {
312   char buf[4096];
313   char *fields[3];
314   char *ptr;
315   char *line;
316   char *saveptr;
317   int fields_num;
318
319   gauge_t bytes_used = NAN;
320   gauge_t bytes_total = NAN;
321   gauge_t hits = NAN;
322   gauge_t gets = NAN;
323   gauge_t incr_hits = NAN;
324   derive_t incr = 0;
325   gauge_t decr_hits = NAN;
326   derive_t decr = 0;
327   derive_t rusage_user = 0;
328   derive_t rusage_syst = 0;
329   derive_t octets_rx = 0;
330   derive_t octets_tx = 0;
331
332   memcached_t *st;
333   st = user_data->data;
334
335   /* get data from daemon */
336   if (memcached_query_daemon(buf, sizeof(buf), st) < 0) {
337     return -1;
338   }
339
340 #define FIELD_IS(cnst)                                                         \
341   (((sizeof(cnst) - 1) == name_len) && (strcmp(cnst, fields[1]) == 0))
342
343   ptr = buf;
344   saveptr = NULL;
345   while ((line = strtok_r(ptr, "\n\r", &saveptr)) != NULL) {
346     int name_len;
347
348     ptr = NULL;
349
350     fields_num = strsplit(line, fields, 3);
351     if (fields_num != 3)
352       continue;
353
354     name_len = strlen(fields[1]);
355     if (name_len == 0)
356       continue;
357
358     /*
359      * For an explanation on these fields please refer to
360      * <https://github.com/memcached/memcached/blob/master/doc/protocol.txt>
361      */
362
363     /*
364      * CPU time consumed by the memcached process
365      */
366     if (FIELD_IS("rusage_user")) {
367       rusage_user = atoll(fields[2]);
368     } else if (FIELD_IS("rusage_system")) {
369       rusage_syst = atoll(fields[2]);
370     }
371
372     /*
373      * Number of threads of this instance
374      */
375     else if (FIELD_IS("threads")) {
376       submit_gauge2("ps_count", NULL, NAN, atof(fields[2]), st);
377     }
378
379     /*
380      * Number of items stored
381      */
382     else if (FIELD_IS("curr_items")) {
383       submit_gauge("memcached_items", "current", atof(fields[2]), st);
384     }
385
386     /*
387      * Number of bytes used and available (total - used)
388      */
389     else if (FIELD_IS("bytes")) {
390       bytes_used = atof(fields[2]);
391     } else if (FIELD_IS("limit_maxbytes")) {
392       bytes_total = atof(fields[2]);
393     }
394
395     /*
396      * Connections
397      */
398     else if (FIELD_IS("curr_connections")) {
399       submit_gauge("memcached_connections", "current", atof(fields[2]), st);
400     } else if (FIELD_IS("listen_disabled_num")) {
401       submit_derive("connections", "listen_disabled", atof(fields[2]), st);
402     }
403
404     /*
405      * Commands
406      */
407     else if ((name_len > 4) && (strncmp(fields[1], "cmd_", 4) == 0)) {
408       const char *name = fields[1] + 4;
409       submit_derive("memcached_command", name, atoll(fields[2]), st);
410       if (strcmp(name, "get") == 0)
411         gets = atof(fields[2]);
412     }
413
414     /*
415      * Increment/Decrement
416      */
417     else if (FIELD_IS("incr_misses")) {
418       derive_t incr_count = atoll(fields[2]);
419       submit_derive("memcached_ops", "incr_misses", incr_count, st);
420       incr += incr_count;
421     } else if (FIELD_IS("incr_hits")) {
422       derive_t incr_count = atoll(fields[2]);
423       submit_derive("memcached_ops", "incr_hits", incr_count, st);
424       incr_hits = atof(fields[2]);
425       incr += incr_count;
426     } else if (FIELD_IS("decr_misses")) {
427       derive_t decr_count = atoll(fields[2]);
428       submit_derive("memcached_ops", "decr_misses", decr_count, st);
429       decr += decr_count;
430     } else if (FIELD_IS("decr_hits")) {
431       derive_t decr_count = atoll(fields[2]);
432       submit_derive("memcached_ops", "decr_hits", decr_count, st);
433       decr_hits = atof(fields[2]);
434       decr += decr_count;
435     }
436
437     /*
438      * Operations on the cache, i. e. cache hits, cache misses and evictions of
439      * items
440      */
441     else if (FIELD_IS("get_hits")) {
442       submit_derive("memcached_ops", "hits", atoll(fields[2]), st);
443       hits = atof(fields[2]);
444     } else if (FIELD_IS("get_misses")) {
445       submit_derive("memcached_ops", "misses", atoll(fields[2]), st);
446     } else if (FIELD_IS("evictions")) {
447       submit_derive("memcached_ops", "evictions", atoll(fields[2]), st);
448     }
449
450     /*
451      * Network traffic
452      */
453     else if (FIELD_IS("bytes_read")) {
454       octets_rx = atoll(fields[2]);
455     } else if (FIELD_IS("bytes_written")) {
456       octets_tx = atoll(fields[2]);
457     }
458   } /* while ((line = strtok_r (ptr, "\n\r", &saveptr)) != NULL) */
459
460   if (!isnan(bytes_used) && !isnan(bytes_total) && (bytes_used <= bytes_total))
461     submit_gauge2("df", "cache", bytes_used, bytes_total - bytes_used, st);
462
463   if ((rusage_user != 0) || (rusage_syst != 0))
464     submit_derive2("ps_cputime", NULL, rusage_user, rusage_syst, st);
465
466   if ((octets_rx != 0) || (octets_tx != 0))
467     submit_derive2("memcached_octets", NULL, octets_rx, octets_tx, st);
468
469   if (!isnan(gets) && !isnan(hits)) {
470     gauge_t rate = NAN;
471
472     if (gets != 0.0)
473       rate = 100.0 * hits / gets;
474
475     submit_gauge("percent", "hitratio", rate, st);
476   }
477
478   if (!isnan(incr_hits) && incr != 0) {
479     gauge_t incr_rate = 100.0 * incr_hits / incr;
480     submit_gauge("percent", "incr_hitratio", incr_rate, st);
481     submit_derive("memcached_ops", "incr", incr, st);
482   }
483
484   if (!isnan(decr_hits) && decr != 0) {
485     gauge_t decr_rate = 100.0 * decr_hits / decr;
486     submit_gauge("percent", "decr_hitratio", decr_rate, st);
487     submit_derive("memcached_ops", "decr", decr, st);
488   }
489
490   return 0;
491 } /* int memcached_read */
492
493 static int memcached_add_read_callback(memcached_t *st) {
494   char callback_name[3 * DATA_MAX_NAME_LEN];
495   int status;
496
497   assert(st->name != NULL);
498   ssnprintf(callback_name, sizeof(callback_name), "memcached/%s", st->name);
499
500   user_data_t ud = {.data = st, .free_func = memcached_free};
501
502   status = plugin_register_complex_read(/* group = */ "memcached",
503                                         /* name      = */ callback_name,
504                                         /* callback  = */ memcached_read,
505                                         /* interval  = */ 0,
506                                         /* user_data = */ &ud);
507   return (status);
508 } /* int memcached_add_read_callback */
509
510 /* Configuration handling functiions
511  * <Plugin memcached>
512  *   <Instance "instance_name">
513  *     Host foo.zomg.com
514  *     Port "1234"
515  *   </Instance>
516  * </Plugin>
517  */
518 static int config_add_instance(oconfig_item_t *ci) {
519   memcached_t *st;
520   int status = 0;
521
522   /* Disable automatic generation of default instance in the init callback. */
523   memcached_have_instances = 1;
524
525   st = calloc(1, sizeof(*st));
526   if (st == NULL) {
527     ERROR("memcached plugin: calloc failed.");
528     return (-1);
529   }
530
531   st->name = NULL;
532   st->socket = NULL;
533   st->host = NULL;
534   st->port = NULL;
535
536   if (strcasecmp(ci->key, "Plugin") == 0) /* default instance */
537     st->name = sstrdup("__legacy__");
538   else /* <Instance /> block */
539     status = cf_util_get_string(ci, &st->name);
540   if (status != 0) {
541     sfree(st);
542     return (status);
543   }
544   assert(st->name != NULL);
545
546   for (int i = 0; i < ci->children_num; i++) {
547     oconfig_item_t *child = ci->children + i;
548
549     if (strcasecmp("Socket", child->key) == 0)
550       status = cf_util_get_string(child, &st->socket);
551     else if (strcasecmp("Host", child->key) == 0)
552       status = cf_util_get_string(child, &st->host);
553     else if (strcasecmp("Port", child->key) == 0)
554       status = cf_util_get_service(child, &st->port);
555     else {
556       WARNING("memcached plugin: Option `%s' not allowed here.", child->key);
557       status = -1;
558     }
559
560     if (status != 0)
561       break;
562   }
563
564   if (status == 0)
565     status = memcached_add_read_callback(st);
566
567   if (status != 0) {
568     memcached_free(st);
569     return (-1);
570   }
571
572   return (0);
573 }
574
575 static int memcached_config(oconfig_item_t *ci) {
576   int status = 0;
577   _Bool have_instance_block = 0;
578
579   for (int i = 0; i < ci->children_num; i++) {
580     oconfig_item_t *child = ci->children + i;
581
582     if (strcasecmp("Instance", child->key) == 0) {
583       config_add_instance(child);
584       have_instance_block = 1;
585     } else if (!have_instance_block) {
586       /* Non-instance option: Assume legacy configuration (without <Instance />
587        * blocks) and call config_add_instance() with the <Plugin /> block. */
588       return (config_add_instance(ci));
589     } else
590       WARNING("memcached plugin: The configuration option "
591               "\"%s\" is not allowed here. Did you "
592               "forget to add an <Instance /> block "
593               "around the configuration?",
594               child->key);
595   } /* for (ci->children) */
596
597   return (status);
598 }
599
600 static int memcached_init(void) {
601   memcached_t *st;
602   int status;
603
604   if (memcached_have_instances)
605     return (0);
606
607   /* No instances were configured, lets start a default instance. */
608   st = calloc(1, sizeof(*st));
609   if (st == NULL)
610     return (ENOMEM);
611   st->name = sstrdup("__legacy__");
612   st->socket = NULL;
613   st->host = NULL;
614   st->port = NULL;
615
616   status = memcached_add_read_callback(st);
617   if (status == 0)
618     memcached_have_instances = 1;
619   else
620     memcached_free(st);
621
622   return (status);
623 } /* int memcached_init */
624
625 void module_register(void) {
626   plugin_register_complex_config("memcached", memcached_config);
627   plugin_register_init("memcached", memcached_init);
628 }