X-Git-Url: https://git.octo.it/?a=blobdiff_plain;f=src%2Fmemcached.c;h=36b3c9a97c15831abf11ee9dd520f81c995750b0;hb=1d1e565400b7c770db22e25ad97c06ceab0170ad;hp=02215e1cc3bae0bedc92ceab6d41a025f66d817e;hpb=2079ee1517e34de372f58e7e2267ad5c71a8a41f;p=collectd.git diff --git a/src/memcached.c b/src/memcached.c index 02215e1c..36b3c9a9 100644 --- a/src/memcached.c +++ b/src/memcached.c @@ -5,6 +5,7 @@ * Copyright (C) 2009 Doug MacEachern * Copyright (C) 2009 Franck Lombardi * Copyright (C) 2012 Nicolas Szalay + * Copyright (C) 2017 Pavel Rochnyak * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License as published by the @@ -26,6 +27,7 @@ * Doug MacEachern * Franck Lombardi * Nicolas Szalay + * Pavel Rochnyak **/ #include "collectd.h" @@ -38,14 +40,20 @@ #include #include +#include + #define MEMCACHED_DEF_HOST "127.0.0.1" #define MEMCACHED_DEF_PORT "11211" +#define MEMCACHED_CONNECT_TIMEOUT 10000 +#define MEMCACHED_IO_TIMEOUT 5000 struct memcached_s { char *name; - char *socket; char *host; - char *port; + char *socket; + char *connhost; + char *connport; + int fd; }; typedef struct memcached_s memcached_t; @@ -56,10 +64,16 @@ static void memcached_free(void *arg) { if (st == NULL) return; + if (st->fd >= 0) { + shutdown(st->fd, SHUT_RDWR); + close(st->fd); + } + sfree(st->name); - sfree(st->socket); sfree(st->host); - sfree(st->port); + sfree(st->socket); + sfree(st->connhost); + sfree(st->connport); sfree(st); } @@ -76,7 +90,7 @@ static int memcached_connect_unix(memcached_t *st) { char errbuf[1024]; ERROR("memcached plugin: memcached_connect_unix: socket(2) failed: %s", sstrerror(errno, errbuf, sizeof(errbuf))); - return (-1); + return -1; } /* connect to the memcached daemon */ @@ -84,36 +98,38 @@ static int memcached_connect_unix(memcached_t *st) { if (status != 0) { shutdown(fd, SHUT_RDWR); close(fd); - fd = -1; + return -1; } - return (fd); + /* switch to non-blocking mode */ + int flags = fcntl(fd, F_GETFL); + status = fcntl(fd, F_SETFL, flags | O_NONBLOCK); + if (status != 0) { + close(fd); + return -1; + } + + return fd; } /* int memcached_connect_unix */ static int memcached_connect_inet(memcached_t *st) { - const char *host; - const char *port; - struct addrinfo *ai_list; int status; int fd = -1; - host = (st->host != NULL) ? st->host : MEMCACHED_DEF_HOST; - port = (st->port != NULL) ? st->port : MEMCACHED_DEF_PORT; - struct addrinfo ai_hints = {.ai_family = AF_UNSPEC, .ai_flags = AI_ADDRCONFIG, .ai_socktype = SOCK_STREAM}; - status = getaddrinfo(host, port, &ai_hints, &ai_list); + status = getaddrinfo(st->connhost, st->connport, &ai_hints, &ai_list); if (status != 0) { char errbuf[1024]; ERROR("memcached plugin: memcached_connect_inet: " "getaddrinfo(%s,%s) failed: %s", - host, port, + st->connhost, st->connport, (status == EAI_SYSTEM) ? sstrerror(errno, errbuf, sizeof(errbuf)) : gai_strerror(status)); - return (-1); + return -1; } for (struct addrinfo *ai_ptr = ai_list; ai_ptr != NULL; @@ -128,77 +144,153 @@ static int memcached_connect_inet(memcached_t *st) { continue; } + /* switch socket to non-blocking mode */ + int flags = fcntl(fd, F_GETFL); + status = fcntl(fd, F_SETFL, flags | O_NONBLOCK); + if (status != 0) { + close(fd); + fd = -1; + continue; + } + /* connect to the memcached daemon */ status = (int)connect(fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen); - if (status != 0) { + if (status != 0 && errno != EINPROGRESS) { shutdown(fd, SHUT_RDWR); close(fd); fd = -1; continue; } - /* A socket could be opened and connecting succeeded. We're done. */ + /* Wait until connection establishes */ + struct pollfd pollfd; + pollfd.fd = fd; + pollfd.events = POLLOUT; + do + status = poll(&pollfd, 1, MEMCACHED_CONNECT_TIMEOUT); + while (status < 0 && errno == EINTR); + if (status <= 0) { + close(fd); + fd = -1; + continue; + } + + /* Check if all is good */ + int socket_error; + socklen_t socket_error_len = sizeof(socket_error); + status = getsockopt(fd, SOL_SOCKET, SO_ERROR, (void *)&socket_error, + &socket_error_len); + if (status != 0 || socket_error != 0) { + close(fd); + fd = -1; + continue; + } + /* A socket is opened and connection succeeded. We're done. */ break; } freeaddrinfo(ai_list); - return (fd); + return fd; } /* int memcached_connect_inet */ -static int memcached_connect(memcached_t *st) { +static void memcached_connect(memcached_t *st) { + if (st->fd >= 0) + return; + if (st->socket != NULL) - return (memcached_connect_unix(st)); + st->fd = memcached_connect_unix(st); else - return (memcached_connect_inet(st)); + st->fd = memcached_connect_inet(st); + + if (st->fd >= 0) + INFO("memcached plugin: Instance \"%s\": connection established.", + st->name); } static int memcached_query_daemon(char *buffer, size_t buffer_size, memcached_t *st) { - int fd, status; + int status; size_t buffer_fill; - fd = memcached_connect(st); - if (fd < 0) { + memcached_connect(st); + if (st->fd < 0) { ERROR("memcached plugin: Instance \"%s\" could not connect to daemon.", st->name); return -1; } - status = (int)swrite(fd, "stats\r\n", strlen("stats\r\n")); + struct pollfd pollfd; + pollfd.fd = st->fd; + pollfd.events = POLLOUT; + + do + status = poll(&pollfd, 1, MEMCACHED_IO_TIMEOUT); + while (status < 0 && errno == EINTR); + + if (status <= 0) { + ERROR("memcached plugin: poll() failed for write() call."); + close(st->fd); + st->fd = -1; + return -1; + } + + status = (int)swrite(st->fd, "stats\r\n", strlen("stats\r\n")); if (status != 0) { char errbuf[1024]; - ERROR("memcached plugin: write(2) failed: %s", + ERROR("memcached plugin: Instance \"%s\": write(2) failed: %s", st->name, sstrerror(errno, errbuf, sizeof(errbuf))); - shutdown(fd, SHUT_RDWR); - close(fd); - return (-1); + shutdown(st->fd, SHUT_RDWR); + close(st->fd); + st->fd = -1; + return -1; } /* receive data from the memcached daemon */ memset(buffer, 0, buffer_size); buffer_fill = 0; - while ((status = (int)recv(fd, buffer + buffer_fill, - buffer_size - buffer_fill, /* flags = */ 0)) != - 0) { + pollfd.events = POLLIN; + while (1) { + do + status = poll(&pollfd, 1, MEMCACHED_IO_TIMEOUT); + while (status < 0 && errno == EINTR); + + if (status <= 0) { + ERROR("memcached plugin: Instance \"%s\": Timeout reading from socket", + st->name); + close(st->fd); + st->fd = -1; + return -1; + } + + do + status = (int)recv(st->fd, buffer + buffer_fill, + buffer_size - buffer_fill, /* flags = */ 0); + while (status < 0 && errno == EINTR); + char const end_token[5] = {'E', 'N', 'D', '\r', '\n'}; if (status < 0) { char errbuf[1024]; - if ((errno == EAGAIN) || (errno == EINTR)) + if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) continue; - ERROR("memcached: Error reading from socket: %s", - sstrerror(errno, errbuf, sizeof(errbuf))); - shutdown(fd, SHUT_RDWR); - close(fd); - return (-1); + ERROR("memcached plugin: Instance \"%s\": Error reading from socket: %s", + st->name, sstrerror(errno, errbuf, sizeof(errbuf))); + shutdown(st->fd, SHUT_RDWR); + close(st->fd); + st->fd = -1; + return -1; } buffer_fill += (size_t)status; if (buffer_fill > buffer_size) { buffer_fill = buffer_size; - WARNING("memcached plugin: Message was truncated."); + WARNING("memcached plugin: Instance \"%s\": Message was truncated.", + st->name); + shutdown(st->fd, SHUT_RDWR); + close(st->fd); + st->fd = -1; break; } @@ -210,43 +302,28 @@ static int memcached_query_daemon(char *buffer, size_t buffer_size, status = 0; if (buffer_fill == 0) { - WARNING("memcached plugin: No data returned by memcached."); + WARNING("memcached plugin: Instance \"%s\": No data returned by memcached.", + st->name); status = -1; } - shutdown(fd, SHUT_RDWR); - close(fd); - return (status); + return status; } /* int memcached_query_daemon */ static void memcached_init_vl(value_list_t *vl, memcached_t const *st) { - char const *host = st->host; - - /* Set vl->host to hostname_g, if: - * - Legacy mode is used. - * - "Socket" option is given (doc: "Host option is ignored"). - * - "Host" option is not provided. - * - "Host" option is set to "localhost" or "127.0.0.1". */ - if ((strcmp(st->name, "__legacy__") == 0) || (st->socket != NULL) || - (st->host == NULL) || (strcmp("127.0.0.1", st->host) == 0) || - (strcmp("localhost", st->host) == 0)) - host = hostname_g; - sstrncpy(vl->plugin, "memcached", sizeof(vl->plugin)); - sstrncpy(vl->host, host, sizeof(vl->host)); - if (strcmp(st->name, "__legacy__") != 0) + if (st->host != NULL) + sstrncpy(vl->host, st->host, sizeof(vl->host)); + if (st->name != NULL) sstrncpy(vl->plugin_instance, st->name, sizeof(vl->plugin_instance)); } static void submit_derive(const char *type, const char *type_inst, derive_t value, memcached_t *st) { - value_t values[1]; value_list_t vl = VALUE_LIST_INIT; - memcached_init_vl(&vl, st); - - values[0].derive = value; - vl.values = values; + memcached_init_vl(&vl, st); + vl.values = &(value_t){.derive = value}; vl.values_len = 1; sstrncpy(vl.type, type, sizeof(vl.type)); if (type_inst != NULL) @@ -257,15 +334,14 @@ static void submit_derive(const char *type, const char *type_inst, static void submit_derive2(const char *type, const char *type_inst, derive_t value0, derive_t value1, memcached_t *st) { - value_t values[2]; value_list_t vl = VALUE_LIST_INIT; - memcached_init_vl(&vl, st); - - values[0].derive = value0; - values[1].derive = value1; + value_t values[] = { + {.derive = value0}, {.derive = value1}, + }; + memcached_init_vl(&vl, st); vl.values = values; - vl.values_len = 2; + vl.values_len = STATIC_ARRAY_SIZE(values); sstrncpy(vl.type, type, sizeof(vl.type)); if (type_inst != NULL) sstrncpy(vl.type_instance, type_inst, sizeof(vl.type_instance)); @@ -275,13 +351,10 @@ static void submit_derive2(const char *type, const char *type_inst, static void submit_gauge(const char *type, const char *type_inst, gauge_t value, memcached_t *st) { - value_t values[1]; value_list_t vl = VALUE_LIST_INIT; - memcached_init_vl(&vl, st); - values[0].gauge = value; - - vl.values = values; + memcached_init_vl(&vl, st); + vl.values = &(value_t){.gauge = value}; vl.values_len = 1; sstrncpy(vl.type, type, sizeof(vl.type)); if (type_inst != NULL) @@ -292,15 +365,14 @@ static void submit_gauge(const char *type, const char *type_inst, gauge_t value, static void submit_gauge2(const char *type, const char *type_inst, gauge_t value0, gauge_t value1, memcached_t *st) { - value_t values[2]; value_list_t vl = VALUE_LIST_INIT; - memcached_init_vl(&vl, st); - - values[0].gauge = value0; - values[1].gauge = value1; + value_t values[] = { + {.gauge = value0}, {.gauge = value1}, + }; + memcached_init_vl(&vl, st); vl.values = values; - vl.values_len = 2; + vl.values_len = STATIC_ARRAY_SIZE(values); sstrncpy(vl.type, type, sizeof(vl.type)); if (type_inst != NULL) sstrncpy(vl.type_instance, type_inst, sizeof(vl.type_instance)); @@ -435,8 +507,10 @@ static int memcached_read(user_data_t *user_data) { } /* - * Operations on the cache, i. e. cache hits, cache misses and evictions of - * items + * Operations on the cache: + * - get hits/misses + * - delete hits/misses + * - evictions */ else if (FIELD_IS("get_hits")) { submit_derive("memcached_ops", "hits", atoll(fields[2]), st); @@ -445,6 +519,10 @@ static int memcached_read(user_data_t *user_data) { submit_derive("memcached_ops", "misses", atoll(fields[2]), st); } else if (FIELD_IS("evictions")) { submit_derive("memcached_ops", "evictions", atoll(fields[2]), st); + } else if (FIELD_IS("delete_hits")) { + submit_derive("memcached_ops", "delete_hits", atoll(fields[2]), st); + } else if (FIELD_IS("delete_misses")) { + submit_derive("memcached_ops", "delete_misses", atoll(fields[2]), st); } /* @@ -490,27 +568,72 @@ static int memcached_read(user_data_t *user_data) { return 0; } /* int memcached_read */ +static int memcached_set_defaults(memcached_t *st) { + /* If no
used then: + * - Connect to the destination specified by , if present. + * If not, use the default address. + * - Use the default hostname (set st->host to NULL), if + * - Legacy mode is used (no configuration options at all), or + * - "Host" option is not provided, or + * - "Host" option is set to "localhost" or "127.0.0.1". + * + * If
used then host may be set to "localhost" or "127.0.0.1" + * explicitly. + */ + if (st->connhost == NULL) { + if (st->host) { + st->connhost = strdup(st->host); + if (st->connhost == NULL) + return ENOMEM; + + if ((strcmp("127.0.0.1", st->host) == 0) || + (strcmp("localhost", st->host) == 0)) + sfree(st->host); + } else { + st->connhost = strdup(MEMCACHED_DEF_HOST); + if (st->connhost == NULL) + return ENOMEM; + } + } + + if (st->connport == NULL) { + st->connport = strdup(MEMCACHED_DEF_PORT); + if (st->connport == NULL) + return ENOMEM; + } + + assert(st->connhost != NULL); + assert(st->connport != NULL); + + return 0; +} /* int memcached_set_defaults */ + static int memcached_add_read_callback(memcached_t *st) { char callback_name[3 * DATA_MAX_NAME_LEN]; - int status; - assert(st->name != NULL); - ssnprintf(callback_name, sizeof(callback_name), "memcached/%s", st->name); - - user_data_t ud = {.data = st, .free_func = memcached_free}; + if (memcached_set_defaults(st) != 0) { + memcached_free(st); + return -1; + } - status = plugin_register_complex_read(/* group = */ "memcached", - /* name = */ callback_name, - /* callback = */ memcached_read, - /* interval = */ 0, - /* user_data = */ &ud); - return (status); + snprintf(callback_name, sizeof(callback_name), "memcached/%s", + (st->name != NULL) ? st->name : "__legacy__"); + + return plugin_register_complex_read( + /* group = */ "memcached", + /* name = */ callback_name, + /* callback = */ memcached_read, + /* interval = */ 0, + &(user_data_t){ + .data = st, .free_func = memcached_free, + }); } /* int memcached_add_read_callback */ /* Configuration handling functiions * * * Host foo.zomg.com + * Address 1.2.3.4 * Port "1234" * * @@ -525,23 +648,24 @@ static int config_add_instance(oconfig_item_t *ci) { st = calloc(1, sizeof(*st)); if (st == NULL) { ERROR("memcached plugin: calloc failed."); - return (-1); + return ENOMEM; } st->name = NULL; - st->socket = NULL; st->host = NULL; - st->port = NULL; + st->socket = NULL; + st->connhost = NULL; + st->connport = NULL; - if (strcasecmp(ci->key, "Plugin") == 0) /* default instance */ - st->name = sstrdup("__legacy__"); - else /* block */ + st->fd = -1; + + if (strcasecmp(ci->key, "Instance") == 0) status = cf_util_get_string(ci, &st->name); + if (status != 0) { sfree(st); - return (status); + return status; } - assert(st->name != NULL); for (int i = 0; i < ci->children_num; i++) { oconfig_item_t *child = ci->children + i; @@ -550,8 +674,10 @@ static int config_add_instance(oconfig_item_t *ci) { status = cf_util_get_string(child, &st->socket); else if (strcasecmp("Host", child->key) == 0) status = cf_util_get_string(child, &st->host); + else if (strcasecmp("Address", child->key) == 0) + status = cf_util_get_string(child, &st->connhost); else if (strcasecmp("Port", child->key) == 0) - status = cf_util_get_service(child, &st->port); + status = cf_util_get_service(child, &st->connport); else { WARNING("memcached plugin: Option `%s' not allowed here.", child->key); status = -1; @@ -561,19 +687,15 @@ static int config_add_instance(oconfig_item_t *ci) { break; } - if (status == 0) - status = memcached_add_read_callback(st); - if (status != 0) { memcached_free(st); - return (-1); + return -1; } - return (0); -} + return memcached_add_read_callback(st); +} /* int config_add_instance */ static int memcached_config(oconfig_item_t *ci) { - int status = 0; _Bool have_instance_block = 0; for (int i = 0; i < ci->children_num; i++) { @@ -585,7 +707,7 @@ static int memcached_config(oconfig_item_t *ci) { } else if (!have_instance_block) { /* Non-instance option: Assume legacy configuration (without * blocks) and call config_add_instance() with the block. */ - return (config_add_instance(ci)); + return config_add_instance(ci); } else WARNING("memcached plugin: The configuration option " "\"%s\" is not allowed here. Did you " @@ -594,32 +716,33 @@ static int memcached_config(oconfig_item_t *ci) { child->key); } /* for (ci->children) */ - return (status); -} + return 0; +} /* int memcached_config */ static int memcached_init(void) { memcached_t *st; int status; if (memcached_have_instances) - return (0); + return 0; /* No instances were configured, lets start a default instance. */ st = calloc(1, sizeof(*st)); if (st == NULL) - return (ENOMEM); - st->name = sstrdup("__legacy__"); - st->socket = NULL; + return ENOMEM; + st->name = NULL; st->host = NULL; - st->port = NULL; + st->socket = NULL; + st->connhost = NULL; + st->connport = NULL; + + st->fd = -1; status = memcached_add_read_callback(st); if (status == 0) memcached_have_instances = 1; - else - memcached_free(st); - return (status); + return status; } /* int memcached_init */ void module_register(void) {