2 * collectd - src/memcached.c, based on src/hddtemp.c
3 * Copyright (C) 2007 Antony Dovgal
4 * Copyright (C) 2007-2012 Florian Forster
5 * Copyright (C) 2009 Doug MacEachern
6 * Copyright (C) 2009 Franck Lombardi
7 * Copyright (C) 2012 Nicolas Szalay
9 * This program is free software; you can redistribute it and/or modify it
10 * under the terms of the GNU General Public License as published by the
11 * Free Software Foundation; either version 2 of the License, or (at your
12 * option) any later version.
14 * This program is distributed in the hope that it will be useful, but
15 * WITHOUT ANY WARRANTY; without even the implied warranty of
16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17 * General Public License for more details.
19 * You should have received a copy of the GNU General Public License along
20 * with this program; if not, write to the Free Software Foundation, Inc.,
21 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
24 * Antony Dovgal <tony at daylessday dot org>
25 * Florian octo Forster <octo at collectd.org>
26 * Doug MacEachern <dougm at hyperic.com>
37 #include <netinet/in.h>
38 #include <netinet/tcp.h>
41 #define MEMCACHED_DEF_HOST "127.0.0.1"
42 #define MEMCACHED_DEF_PORT "11211"
50 typedef struct memcached_s memcached_t;
52 static _Bool memcached_have_instances = 0;
54 static void memcached_free(void *arg) {
55 memcached_t *st = arg;
66 static int memcached_connect_unix(memcached_t *st) {
67 struct sockaddr_un serv_addr = {0};
70 serv_addr.sun_family = AF_UNIX;
71 sstrncpy(serv_addr.sun_path, st->socket, sizeof(serv_addr.sun_path));
73 /* create our socket descriptor */
74 fd = socket(AF_UNIX, SOCK_STREAM, 0);
77 ERROR("memcached plugin: memcached_connect_unix: socket(2) failed: %s",
78 sstrerror(errno, errbuf, sizeof(errbuf)));
82 /* connect to the memcached daemon */
83 int status = connect(fd, (struct sockaddr *)&serv_addr, sizeof(serv_addr));
85 shutdown(fd, SHUT_RDWR);
91 } /* int memcached_connect_unix */
93 static int memcached_connect_inet(memcached_t *st) {
97 struct addrinfo *ai_list;
101 host = (st->host != NULL) ? st->host : MEMCACHED_DEF_HOST;
102 port = (st->port != NULL) ? st->port : MEMCACHED_DEF_PORT;
104 struct addrinfo ai_hints = {.ai_family = AF_UNSPEC,
105 .ai_flags = AI_ADDRCONFIG,
106 .ai_socktype = SOCK_STREAM};
108 status = getaddrinfo(host, port, &ai_hints, &ai_list);
111 ERROR("memcached plugin: memcached_connect_inet: "
112 "getaddrinfo(%s,%s) failed: %s",
114 (status == EAI_SYSTEM) ? sstrerror(errno, errbuf, sizeof(errbuf))
115 : gai_strerror(status));
119 for (struct addrinfo *ai_ptr = ai_list; ai_ptr != NULL;
120 ai_ptr = ai_ptr->ai_next) {
121 /* create our socket descriptor */
122 fd = socket(ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
125 WARNING("memcached plugin: memcached_connect_inet: "
126 "socket(2) failed: %s",
127 sstrerror(errno, errbuf, sizeof(errbuf)));
131 /* connect to the memcached daemon */
132 status = (int)connect(fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
134 shutdown(fd, SHUT_RDWR);
140 /* A socket could be opened and connecting succeeded. We're done. */
144 freeaddrinfo(ai_list);
146 } /* int memcached_connect_inet */
148 static int memcached_connect(memcached_t *st) {
149 if (st->socket != NULL)
150 return (memcached_connect_unix(st));
152 return (memcached_connect_inet(st));
155 static int memcached_query_daemon(char *buffer, size_t buffer_size,
160 fd = memcached_connect(st);
162 ERROR("memcached plugin: Instance \"%s\" could not connect to daemon.",
167 status = (int)swrite(fd, "stats\r\n", strlen("stats\r\n"));
170 ERROR("memcached plugin: write(2) failed: %s",
171 sstrerror(errno, errbuf, sizeof(errbuf)));
172 shutdown(fd, SHUT_RDWR);
177 /* receive data from the memcached daemon */
178 memset(buffer, 0, buffer_size);
181 while ((status = (int)recv(fd, buffer + buffer_fill,
182 buffer_size - buffer_fill, /* flags = */ 0)) !=
184 char const end_token[5] = {'E', 'N', 'D', '\r', '\n'};
188 if ((errno == EAGAIN) || (errno == EINTR))
191 ERROR("memcached: Error reading from socket: %s",
192 sstrerror(errno, errbuf, sizeof(errbuf)));
193 shutdown(fd, SHUT_RDWR);
198 buffer_fill += (size_t)status;
199 if (buffer_fill > buffer_size) {
200 buffer_fill = buffer_size;
201 WARNING("memcached plugin: Message was truncated.");
205 /* If buffer ends in end_token, we have all the data. */
206 if (memcmp(buffer + buffer_fill - sizeof(end_token), end_token,
207 sizeof(end_token)) == 0)
212 if (buffer_fill == 0) {
213 WARNING("memcached plugin: No data returned by memcached.");
217 shutdown(fd, SHUT_RDWR);
220 } /* int memcached_query_daemon */
222 static void memcached_init_vl(value_list_t *vl, memcached_t const *st) {
223 char const *host = st->host;
225 /* Set vl->host to hostname_g, if:
226 * - Legacy mode is used.
227 * - "Socket" option is given (doc: "Host option is ignored").
228 * - "Host" option is not provided.
229 * - "Host" option is set to "localhost" or "127.0.0.1". */
230 if ((strcmp(st->name, "__legacy__") == 0) || (st->socket != NULL) ||
231 (st->host == NULL) || (strcmp("127.0.0.1", st->host) == 0) ||
232 (strcmp("localhost", st->host) == 0))
235 sstrncpy(vl->plugin, "memcached", sizeof(vl->plugin));
236 sstrncpy(vl->host, host, sizeof(vl->host));
237 if (strcmp(st->name, "__legacy__") != 0)
238 sstrncpy(vl->plugin_instance, st->name, sizeof(vl->plugin_instance));
241 static void submit_derive(const char *type, const char *type_inst,
242 derive_t value, memcached_t *st) {
244 value_list_t vl = VALUE_LIST_INIT;
245 memcached_init_vl(&vl, st);
247 values[0].derive = value;
251 sstrncpy(vl.type, type, sizeof(vl.type));
252 if (type_inst != NULL)
253 sstrncpy(vl.type_instance, type_inst, sizeof(vl.type_instance));
255 plugin_dispatch_values(&vl);
258 static void submit_derive2(const char *type, const char *type_inst,
259 derive_t value0, derive_t value1, memcached_t *st) {
261 value_list_t vl = VALUE_LIST_INIT;
262 memcached_init_vl(&vl, st);
264 values[0].derive = value0;
265 values[1].derive = value1;
269 sstrncpy(vl.type, type, sizeof(vl.type));
270 if (type_inst != NULL)
271 sstrncpy(vl.type_instance, type_inst, sizeof(vl.type_instance));
273 plugin_dispatch_values(&vl);
276 static void submit_gauge(const char *type, const char *type_inst, gauge_t value,
279 value_list_t vl = VALUE_LIST_INIT;
280 memcached_init_vl(&vl, st);
282 values[0].gauge = value;
286 sstrncpy(vl.type, type, sizeof(vl.type));
287 if (type_inst != NULL)
288 sstrncpy(vl.type_instance, type_inst, sizeof(vl.type_instance));
290 plugin_dispatch_values(&vl);
293 static void submit_gauge2(const char *type, const char *type_inst,
294 gauge_t value0, gauge_t value1, memcached_t *st) {
296 value_list_t vl = VALUE_LIST_INIT;
297 memcached_init_vl(&vl, st);
299 values[0].gauge = value0;
300 values[1].gauge = value1;
304 sstrncpy(vl.type, type, sizeof(vl.type));
305 if (type_inst != NULL)
306 sstrncpy(vl.type_instance, type_inst, sizeof(vl.type_instance));
308 plugin_dispatch_values(&vl);
311 static int memcached_read(user_data_t *user_data) {
319 gauge_t bytes_used = NAN;
320 gauge_t bytes_total = NAN;
323 gauge_t incr_hits = NAN;
325 gauge_t decr_hits = NAN;
327 derive_t rusage_user = 0;
328 derive_t rusage_syst = 0;
329 derive_t octets_rx = 0;
330 derive_t octets_tx = 0;
333 st = user_data->data;
335 /* get data from daemon */
336 if (memcached_query_daemon(buf, sizeof(buf), st) < 0) {
340 #define FIELD_IS(cnst) \
341 (((sizeof(cnst) - 1) == name_len) && (strcmp(cnst, fields[1]) == 0))
345 while ((line = strtok_r(ptr, "\n\r", &saveptr)) != NULL) {
350 fields_num = strsplit(line, fields, 3);
354 name_len = strlen(fields[1]);
359 * For an explanation on these fields please refer to
360 * <https://github.com/memcached/memcached/blob/master/doc/protocol.txt>
364 * CPU time consumed by the memcached process
366 if (FIELD_IS("rusage_user")) {
367 rusage_user = atoll(fields[2]);
368 } else if (FIELD_IS("rusage_system")) {
369 rusage_syst = atoll(fields[2]);
373 * Number of threads of this instance
375 else if (FIELD_IS("threads")) {
376 submit_gauge2("ps_count", NULL, NAN, atof(fields[2]), st);
380 * Number of items stored
382 else if (FIELD_IS("curr_items")) {
383 submit_gauge("memcached_items", "current", atof(fields[2]), st);
387 * Number of bytes used and available (total - used)
389 else if (FIELD_IS("bytes")) {
390 bytes_used = atof(fields[2]);
391 } else if (FIELD_IS("limit_maxbytes")) {
392 bytes_total = atof(fields[2]);
398 else if (FIELD_IS("curr_connections")) {
399 submit_gauge("memcached_connections", "current", atof(fields[2]), st);
400 } else if (FIELD_IS("listen_disabled_num")) {
401 submit_derive("connections", "listen_disabled", atof(fields[2]), st);
407 else if ((name_len > 4) && (strncmp(fields[1], "cmd_", 4) == 0)) {
408 const char *name = fields[1] + 4;
409 submit_derive("memcached_command", name, atoll(fields[2]), st);
410 if (strcmp(name, "get") == 0)
411 gets = atof(fields[2]);
415 * Increment/Decrement
417 else if (FIELD_IS("incr_misses")) {
418 derive_t incr_count = atoll(fields[2]);
419 submit_derive("memcached_ops", "incr_misses", incr_count, st);
421 } else if (FIELD_IS("incr_hits")) {
422 derive_t incr_count = atoll(fields[2]);
423 submit_derive("memcached_ops", "incr_hits", incr_count, st);
424 incr_hits = atof(fields[2]);
426 } else if (FIELD_IS("decr_misses")) {
427 derive_t decr_count = atoll(fields[2]);
428 submit_derive("memcached_ops", "decr_misses", decr_count, st);
430 } else if (FIELD_IS("decr_hits")) {
431 derive_t decr_count = atoll(fields[2]);
432 submit_derive("memcached_ops", "decr_hits", decr_count, st);
433 decr_hits = atof(fields[2]);
438 * Operations on the cache, i. e. cache hits, cache misses and evictions of
441 else if (FIELD_IS("get_hits")) {
442 submit_derive("memcached_ops", "hits", atoll(fields[2]), st);
443 hits = atof(fields[2]);
444 } else if (FIELD_IS("get_misses")) {
445 submit_derive("memcached_ops", "misses", atoll(fields[2]), st);
446 } else if (FIELD_IS("evictions")) {
447 submit_derive("memcached_ops", "evictions", atoll(fields[2]), st);
453 else if (FIELD_IS("bytes_read")) {
454 octets_rx = atoll(fields[2]);
455 } else if (FIELD_IS("bytes_written")) {
456 octets_tx = atoll(fields[2]);
458 } /* while ((line = strtok_r (ptr, "\n\r", &saveptr)) != NULL) */
460 if (!isnan(bytes_used) && !isnan(bytes_total) && (bytes_used <= bytes_total))
461 submit_gauge2("df", "cache", bytes_used, bytes_total - bytes_used, st);
463 if ((rusage_user != 0) || (rusage_syst != 0))
464 submit_derive2("ps_cputime", NULL, rusage_user, rusage_syst, st);
466 if ((octets_rx != 0) || (octets_tx != 0))
467 submit_derive2("memcached_octets", NULL, octets_rx, octets_tx, st);
469 if (!isnan(gets) && !isnan(hits)) {
473 rate = 100.0 * hits / gets;
475 submit_gauge("percent", "hitratio", rate, st);
478 if (!isnan(incr_hits) && incr != 0) {
479 gauge_t incr_rate = 100.0 * incr_hits / incr;
480 submit_gauge("percent", "incr_hitratio", incr_rate, st);
481 submit_derive("memcached_ops", "incr", incr, st);
484 if (!isnan(decr_hits) && decr != 0) {
485 gauge_t decr_rate = 100.0 * decr_hits / decr;
486 submit_gauge("percent", "decr_hitratio", decr_rate, st);
487 submit_derive("memcached_ops", "decr", decr, st);
491 } /* int memcached_read */
493 static int memcached_add_read_callback(memcached_t *st) {
494 char callback_name[3 * DATA_MAX_NAME_LEN];
497 assert(st->name != NULL);
498 ssnprintf(callback_name, sizeof(callback_name), "memcached/%s", st->name);
500 user_data_t ud = {.data = st, .free_func = memcached_free};
502 status = plugin_register_complex_read(/* group = */ "memcached",
503 /* name = */ callback_name,
504 /* callback = */ memcached_read,
506 /* user_data = */ &ud);
508 } /* int memcached_add_read_callback */
510 /* Configuration handling functions
512 * <Instance "instance_name">
518 static int config_add_instance(oconfig_item_t *ci) {
522 /* Disable automatic generation of default instance in the init callback. */
523 memcached_have_instances = 1;
525 st = calloc(1, sizeof(*st));
527 ERROR("memcached plugin: calloc failed.");
536 if (strcasecmp(ci->key, "Plugin") == 0) /* default instance */
537 st->name = sstrdup("__legacy__");
538 else /* <Instance /> block */
539 status = cf_util_get_string(ci, &st->name);
544 assert(st->name != NULL);
546 for (int i = 0; i < ci->children_num; i++) {
547 oconfig_item_t *child = ci->children + i;
549 if (strcasecmp("Socket", child->key) == 0)
550 status = cf_util_get_string(child, &st->socket);
551 else if (strcasecmp("Host", child->key) == 0)
552 status = cf_util_get_string(child, &st->host);
553 else if (strcasecmp("Port", child->key) == 0)
554 status = cf_util_get_service(child, &st->port);
556 WARNING("memcached plugin: Option `%s' not allowed here.", child->key);
565 status = memcached_add_read_callback(st);
575 static int memcached_config(oconfig_item_t *ci) {
577 _Bool have_instance_block = 0;
579 for (int i = 0; i < ci->children_num; i++) {
580 oconfig_item_t *child = ci->children + i;
582 if (strcasecmp("Instance", child->key) == 0) {
583 config_add_instance(child);
584 have_instance_block = 1;
585 } else if (!have_instance_block) {
586 /* Non-instance option: Assume legacy configuration (without <Instance />
587 * blocks) and call config_add_instance() with the <Plugin /> block. */
588 return (config_add_instance(ci));
590 WARNING("memcached plugin: The configuration option "
591 "\"%s\" is not allowed here. Did you "
592 "forget to add an <Instance /> block "
593 "around the configuration?",
595 } /* for (ci->children) */
600 static int memcached_init(void) {
604 if (memcached_have_instances)
607 /* No instances were configured, let's start a default instance. */
608 st = calloc(1, sizeof(*st));
611 st->name = sstrdup("__legacy__");
616 status = memcached_add_read_callback(st);
618 memcached_have_instances = 1;
623 } /* int memcached_init */
625 void module_register(void) {
626 plugin_register_complex_config("memcached", memcached_config);
627 plugin_register_init("memcached", memcached_init);