X-Git-Url: https://git.octo.it/?a=blobdiff_plain;f=src%2Fmemcached.c;h=76619789d82e15da745abff7f1682cf89686d74d;hb=2394ebb5fd42c82593c38e04bd194c24800bb263;hp=79c38a798ec99e99203e0fa422534fd4fbd94023;hpb=f374b72032a227a75b6bc9ae574cd28abbc16f24;p=collectd.git diff --git a/src/memcached.c b/src/memcached.c index 79c38a79..76619789 100644 --- a/src/memcached.c +++ b/src/memcached.c @@ -5,6 +5,7 @@ * Copyright (C) 2009 Doug MacEachern * Copyright (C) 2009 Franck Lombardi * Copyright (C) 2012 Nicolas Szalay + * Copyright (C) 2017 Pavel Rochnyak * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License as published by the @@ -26,6 +27,7 @@ * Doug MacEachern * Franck Lombardi * Nicolas Szalay + * Pavel Rochnyak **/ #include "collectd.h" @@ -38,8 +40,23 @@ #include #include +#include + #define MEMCACHED_DEF_HOST "127.0.0.1" #define MEMCACHED_DEF_PORT "11211" +#define MEMCACHED_CONNECT_TIMEOUT 10000 +#define MEMCACHED_IO_TIMEOUT 5000 + +struct prev_s { + gauge_t hits; + gauge_t gets; + gauge_t incr_hits; + gauge_t incr_misses; + gauge_t decr_hits; + gauge_t decr_misses; +}; + +typedef struct prev_s prev_t; struct memcached_s { char *name; @@ -47,6 +64,8 @@ struct memcached_s { char *socket; char *connhost; char *connport; + int fd; + prev_t prev; }; typedef struct memcached_s memcached_t; @@ -57,6 +76,12 @@ static void memcached_free(void *arg) { if (st == NULL) return; + if (st->fd >= 0) { + shutdown(st->fd, SHUT_RDWR); + close(st->fd); + st->fd = -1; + } + sfree(st->name); sfree(st->host); sfree(st->socket); @@ -78,7 +103,7 @@ static int memcached_connect_unix(memcached_t *st) { char errbuf[1024]; ERROR("memcached plugin: memcached_connect_unix: socket(2) failed: %s", sstrerror(errno, errbuf, sizeof(errbuf))); - return (-1); + return -1; } /* connect to the memcached daemon */ @@ -86,10 +111,18 @@ static int memcached_connect_unix(memcached_t *st) { if (status != 0) { shutdown(fd, SHUT_RDWR); close(fd); - fd = -1; + return -1; + } + + /* switch to non-blocking mode */ + int flags = fcntl(fd, F_GETFL); + status = fcntl(fd, F_SETFL, flags | O_NONBLOCK); + if (status != 0) { + close(fd); + return -1; } - return (fd); + return fd; } /* int memcached_connect_unix */ static int memcached_connect_inet(memcached_t *st) { @@ -109,7 +142,7 @@ static int memcached_connect_inet(memcached_t *st) { st->connhost, st->connport, (status == EAI_SYSTEM) ? sstrerror(errno, errbuf, sizeof(errbuf)) : gai_strerror(status)); - return (-1); + return -1; } for (struct addrinfo *ai_ptr = ai_list; ai_ptr != NULL; @@ -124,77 +157,152 @@ static int memcached_connect_inet(memcached_t *st) { continue; } + /* switch socket to non-blocking mode */ + int flags = fcntl(fd, F_GETFL); + status = fcntl(fd, F_SETFL, flags | O_NONBLOCK); + if (status != 0) { + close(fd); + fd = -1; + continue; + } + /* connect to the memcached daemon */ status = (int)connect(fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen); - if (status != 0) { + if (status != 0 && errno != EINPROGRESS) { shutdown(fd, SHUT_RDWR); close(fd); fd = -1; continue; } - /* A socket could be opened and connecting succeeded. We're done. */ + /* Wait until connection establishes */ + struct pollfd pollfd = { + .fd = fd, .events = POLLOUT, + }; + do + status = poll(&pollfd, 1, MEMCACHED_CONNECT_TIMEOUT); + while (status < 0 && errno == EINTR); + if (status <= 0) { + close(fd); + fd = -1; + continue; + } + + /* Check if all is good */ + int socket_error; + status = getsockopt(fd, SOL_SOCKET, SO_ERROR, (void *)&socket_error, + &(socklen_t){sizeof(socket_error)}); + if (status != 0 || socket_error != 0) { + close(fd); + fd = -1; + continue; + } + /* A socket is opened and connection succeeded. We're done. */ break; } freeaddrinfo(ai_list); - return (fd); + return fd; } /* int memcached_connect_inet */ -static int memcached_connect(memcached_t *st) { +static void memcached_connect(memcached_t *st) { + if (st->fd >= 0) + return; + if (st->socket != NULL) - return (memcached_connect_unix(st)); + st->fd = memcached_connect_unix(st); else - return (memcached_connect_inet(st)); + st->fd = memcached_connect_inet(st); + + if (st->fd >= 0) + INFO("memcached plugin: Instance \"%s\": connection established.", + st->name); } static int memcached_query_daemon(char *buffer, size_t buffer_size, memcached_t *st) { - int fd, status; + int status; size_t buffer_fill; - fd = memcached_connect(st); - if (fd < 0) { + memcached_connect(st); + if (st->fd < 0) { ERROR("memcached plugin: Instance \"%s\" could not connect to daemon.", st->name); return -1; } - status = (int)swrite(fd, "stats\r\n", strlen("stats\r\n")); + struct pollfd pollfd = { + .fd = st->fd, .events = POLLOUT, + }; + + do + status = poll(&pollfd, 1, MEMCACHED_IO_TIMEOUT); + while (status < 0 && errno == EINTR); + + if (status <= 0) { + ERROR("memcached plugin: poll() failed for write() call."); + close(st->fd); + st->fd = -1; + return -1; + } + + status = (int)swrite(st->fd, "stats\r\n", strlen("stats\r\n")); if (status != 0) { char errbuf[1024]; - ERROR("memcached plugin: write(2) failed: %s", + ERROR("memcached plugin: Instance \"%s\": write(2) failed: %s", st->name, sstrerror(errno, errbuf, sizeof(errbuf))); - shutdown(fd, SHUT_RDWR); - close(fd); - return (-1); + shutdown(st->fd, SHUT_RDWR); + close(st->fd); + st->fd = -1; + return -1; } /* receive data from the memcached daemon */ memset(buffer, 0, buffer_size); buffer_fill = 0; - while ((status = (int)recv(fd, buffer + buffer_fill, - buffer_size - buffer_fill, /* flags = */ 0)) != - 0) { + pollfd.events = POLLIN; + while (1) { + do + status = poll(&pollfd, 1, MEMCACHED_IO_TIMEOUT); + while (status < 0 && errno == EINTR); + + if (status <= 0) { + ERROR("memcached plugin: Instance \"%s\": Timeout reading from socket", + st->name); + close(st->fd); + st->fd = -1; + return -1; + } + + do + status = (int)recv(st->fd, buffer + buffer_fill, + buffer_size - buffer_fill, /* flags = */ 0); + while (status < 0 && errno == EINTR); + char const end_token[5] = {'E', 'N', 'D', '\r', '\n'}; if (status < 0) { char errbuf[1024]; - if ((errno == EAGAIN) || (errno == EINTR)) + if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) continue; - ERROR("memcached: Error reading from socket: %s", - sstrerror(errno, errbuf, sizeof(errbuf))); - shutdown(fd, SHUT_RDWR); - close(fd); - return (-1); + ERROR("memcached plugin: Instance \"%s\": Error reading from socket: %s", + st->name, sstrerror(errno, errbuf, sizeof(errbuf))); + shutdown(st->fd, SHUT_RDWR); + close(st->fd); + st->fd = -1; + return -1; } buffer_fill += (size_t)status; if (buffer_fill > buffer_size) { buffer_fill = buffer_size; - WARNING("memcached plugin: Message was truncated."); + WARNING("memcached plugin: Instance \"%s\": Message was truncated.", + st->name); + shutdown(st->fd, SHUT_RDWR); + close(st->fd); + st->fd = -1; break; } @@ -206,13 +314,12 @@ static int memcached_query_daemon(char *buffer, size_t buffer_size, status = 0; if (buffer_fill == 0) { - WARNING("memcached plugin: No data returned by memcached."); + WARNING("memcached plugin: Instance \"%s\": No data returned by memcached.", + st->name); status = -1; } - shutdown(fd, SHUT_RDWR); - close(fd); - return (status); + return status; } /* int memcached_query_daemon */ static void memcached_init_vl(value_list_t *vl, memcached_t const *st) { @@ -285,6 +392,47 @@ static void submit_gauge2(const char *type, const char *type_inst, plugin_dispatch_values(&vl); } +static gauge_t calculate_rate(gauge_t part, gauge_t total, gauge_t *prev_part, + gauge_t *prev_total) { + if (isnan(*prev_part) || isnan(*prev_total) || (part < *prev_part) || + (total < *prev_total)) { + *prev_part = part; + *prev_total = total; + return NAN; + } + + gauge_t num = part - *prev_part; + gauge_t denom = total - *prev_total; + + *prev_part = part; + *prev_total = total; + + if (num == 0 || denom == 0) + return 0; + + return 100.0 * num / denom; +} + +static gauge_t calculate_rate2(gauge_t part1, gauge_t part2, gauge_t *prev1, + gauge_t *prev2) { + if (isnan(*prev1) || isnan(*prev2) || (part1 < *prev1) || (part2 < *prev2)) { + *prev1 = part1; + *prev2 = part2; + return NAN; + } + + gauge_t num = part1 - *prev1; + gauge_t denom = part2 - *prev2 + num; + + *prev1 = part1; + *prev2 = part2; + + if (num == 0 || denom == 0) + return 0; + + return 100.0 * num / denom; +} + static int memcached_read(user_data_t *user_data) { char buf[4096]; char *fields[3]; @@ -298,9 +446,9 @@ static int memcached_read(user_data_t *user_data) { gauge_t hits = NAN; gauge_t gets = NAN; gauge_t incr_hits = NAN; - derive_t incr = 0; + gauge_t incr_misses = NAN; gauge_t decr_hits = NAN; - derive_t decr = 0; + gauge_t decr_misses = NAN; derive_t rusage_user = 0; derive_t rusage_syst = 0; derive_t octets_rx = 0; @@ -308,6 +456,7 @@ static int memcached_read(user_data_t *user_data) { memcached_t *st; st = user_data->data; + prev_t *prev = &st->prev; /* get data from daemon */ if (memcached_query_daemon(buf, sizeof(buf), st) < 0) { @@ -341,9 +490,10 @@ static int memcached_read(user_data_t *user_data) { * CPU time consumed by the memcached process */ if (FIELD_IS("rusage_user")) { - rusage_user = atoll(fields[2]); + /* Convert to useconds */ + rusage_user = atof(fields[2]) * 1000000; } else if (FIELD_IS("rusage_system")) { - rusage_syst = atoll(fields[2]); + rusage_syst = atof(fields[2]) * 1000000; } /* @@ -377,6 +527,13 @@ static int memcached_read(user_data_t *user_data) { } else if (FIELD_IS("listen_disabled_num")) { submit_derive("connections", "listen_disabled", atof(fields[2]), st); } + /* + * Total number of connections opened since the server started running + * Report this as connection rate. + */ + else if (FIELD_IS("total_connections")) { + submit_derive("connections", "opened", atof(fields[2]), st); + } /* * Commands @@ -394,26 +551,26 @@ static int memcached_read(user_data_t *user_data) { else if (FIELD_IS("incr_misses")) { derive_t incr_count = atoll(fields[2]); submit_derive("memcached_ops", "incr_misses", incr_count, st); - incr += incr_count; + incr_misses = atof(fields[2]); } else if (FIELD_IS("incr_hits")) { derive_t incr_count = atoll(fields[2]); submit_derive("memcached_ops", "incr_hits", incr_count, st); incr_hits = atof(fields[2]); - incr += incr_count; } else if (FIELD_IS("decr_misses")) { derive_t decr_count = atoll(fields[2]); submit_derive("memcached_ops", "decr_misses", decr_count, st); - decr += decr_count; + decr_misses = atof(fields[2]); } else if (FIELD_IS("decr_hits")) { derive_t decr_count = atoll(fields[2]); submit_derive("memcached_ops", "decr_hits", decr_count, st); decr_hits = atof(fields[2]); - decr += decr_count; } /* - * Operations on the cache, i. e. cache hits, cache misses and evictions of - * items + * Operations on the cache: + * - get hits/misses + * - delete hits/misses + * - evictions */ else if (FIELD_IS("get_hits")) { submit_derive("memcached_ops", "hits", atoll(fields[2]), st); @@ -422,6 +579,10 @@ static int memcached_read(user_data_t *user_data) { submit_derive("memcached_ops", "misses", atoll(fields[2]), st); } else if (FIELD_IS("evictions")) { submit_derive("memcached_ops", "evictions", atoll(fields[2]), st); + } else if (FIELD_IS("delete_hits")) { + submit_derive("memcached_ops", "delete_hits", atoll(fields[2]), st); + } else if (FIELD_IS("delete_misses")) { + submit_derive("memcached_ops", "delete_misses", atoll(fields[2]), st); } /* @@ -444,36 +605,30 @@ static int memcached_read(user_data_t *user_data) { submit_derive2("memcached_octets", NULL, octets_rx, octets_tx, st); if (!isnan(gets) && !isnan(hits)) { - gauge_t rate = NAN; - - if (gets != 0.0) - rate = 100.0 * hits / gets; - + gauge_t rate = calculate_rate(hits, gets, &prev->hits, &prev->gets); submit_gauge("percent", "hitratio", rate, st); } - if (!isnan(incr_hits) && incr != 0) { - gauge_t incr_rate = 100.0 * incr_hits / incr; - submit_gauge("percent", "incr_hitratio", incr_rate, st); - submit_derive("memcached_ops", "incr", incr, st); + if (!isnan(incr_hits) && !isnan(incr_misses) && + (incr_hits + incr_misses > 0)) { + gauge_t rate = calculate_rate2(incr_hits, incr_misses, &prev->incr_hits, + &prev->incr_misses); + submit_gauge("percent", "incr_hitratio", rate, st); + submit_derive("memcached_ops", "incr", incr_hits + incr_misses, st); } - if (!isnan(decr_hits) && decr != 0) { - gauge_t decr_rate = 100.0 * decr_hits / decr; - submit_gauge("percent", "decr_hitratio", decr_rate, st); - submit_derive("memcached_ops", "decr", decr, st); + if (!isnan(decr_hits) && !isnan(decr_misses) && + (decr_hits + decr_misses > 0)) { + gauge_t rate = calculate_rate2(decr_hits, decr_misses, &prev->decr_hits, + &prev->decr_misses); + submit_gauge("percent", "decr_hitratio", rate, st); + submit_derive("memcached_ops", "decr", decr_hits + decr_misses, st); } return 0; } /* int memcached_read */ -static int memcached_add_read_callback(memcached_t *st) { - char callback_name[3 * DATA_MAX_NAME_LEN]; - int status; - - ssnprintf(callback_name, sizeof(callback_name), "memcached/%s", - (st->name != NULL) ? st->name : "__legacy__"); - +static int memcached_set_defaults(memcached_t *st) { /* If no
used then: * - Connect to the destination specified by , if present. * If not, use the default address. @@ -489,7 +644,7 @@ static int memcached_add_read_callback(memcached_t *st) { if (st->host) { st->connhost = strdup(st->host); if (st->connhost == NULL) - return (ENOMEM); + return ENOMEM; if ((strcmp("127.0.0.1", st->host) == 0) || (strcmp("localhost", st->host) == 0)) @@ -497,28 +652,48 @@ static int memcached_add_read_callback(memcached_t *st) { } else { st->connhost = strdup(MEMCACHED_DEF_HOST); if (st->connhost == NULL) - return (ENOMEM); + return ENOMEM; } } if (st->connport == NULL) { st->connport = strdup(MEMCACHED_DEF_PORT); if (st->connport == NULL) - return (ENOMEM); + return ENOMEM; } assert(st->connhost != NULL); assert(st->connport != NULL); + + st->prev.hits = NAN; + st->prev.gets = NAN; + st->prev.incr_hits = NAN; + st->prev.incr_misses = NAN; + st->prev.decr_hits = NAN; + st->prev.decr_misses = NAN; + + return 0; +} /* int memcached_set_defaults */ + +static int memcached_add_read_callback(memcached_t *st) { + char callback_name[3 * DATA_MAX_NAME_LEN]; - status = plugin_register_complex_read( + if (memcached_set_defaults(st) != 0) { + memcached_free(st); + return -1; + } + + snprintf(callback_name, sizeof(callback_name), "memcached/%s", + (st->name != NULL) ? st->name : "__legacy__"); + + return plugin_register_complex_read( /* group = */ "memcached", /* name = */ callback_name, /* callback = */ memcached_read, - /* interval = */ 0, &(user_data_t){ - .data = st, .free_func = memcached_free, - }); - - return (status); + /* interval = */ 0, + &(user_data_t){ + .data = st, .free_func = memcached_free, + }); } /* int memcached_add_read_callback */ /* Configuration handling functiions @@ -540,7 +715,7 @@ static int config_add_instance(oconfig_item_t *ci) { st = calloc(1, sizeof(*st)); if (st == NULL) { ERROR("memcached plugin: calloc failed."); - return (ENOMEM); + return ENOMEM; } st->name = NULL; @@ -549,12 +724,14 @@ static int config_add_instance(oconfig_item_t *ci) { st->connhost = NULL; st->connport = NULL; + st->fd = -1; + if (strcasecmp(ci->key, "Instance") == 0) status = cf_util_get_string(ci, &st->name); if (status != 0) { sfree(st); - return (status); + return status; } for (int i = 0; i < ci->children_num; i++) { @@ -577,19 +754,15 @@ static int config_add_instance(oconfig_item_t *ci) { break; } - if (status == 0) - status = memcached_add_read_callback(st); - if (status != 0) { memcached_free(st); - return (-1); + return -1; } - return (0); -} + return memcached_add_read_callback(st); +} /* int config_add_instance */ static int memcached_config(oconfig_item_t *ci) { - int status = 0; _Bool have_instance_block = 0; for (int i = 0; i < ci->children_num; i++) { @@ -601,7 +774,7 @@ static int memcached_config(oconfig_item_t *ci) { } else if (!have_instance_block) { /* Non-instance option: Assume legacy configuration (without * blocks) and call config_add_instance() with the block. */ - return (config_add_instance(ci)); + return config_add_instance(ci); } else WARNING("memcached plugin: The configuration option " "\"%s\" is not allowed here. Did you " @@ -610,33 +783,33 @@ static int memcached_config(oconfig_item_t *ci) { child->key); } /* for (ci->children) */ - return (status); -} + return 0; +} /* int memcached_config */ static int memcached_init(void) { memcached_t *st; int status; if (memcached_have_instances) - return (0); + return 0; /* No instances were configured, lets start a default instance. */ st = calloc(1, sizeof(*st)); if (st == NULL) - return (ENOMEM); + return ENOMEM; st->name = NULL; st->host = NULL; st->socket = NULL; st->connhost = NULL; st->connport = NULL; + st->fd = -1; + status = memcached_add_read_callback(st); if (status == 0) memcached_have_instances = 1; - else - memcached_free(st); - return (status); + return status; } /* int memcached_init */ void module_register(void) {