memcached: Persistent connections with IO timeouts
authorPavel Rochnyack <pavel2000@ngs.ru>
Mon, 31 Jul 2017 04:57:30 +0000 (11:57 +0700)
committerPavel Rochnyack <pavel2000@ngs.ru>
Mon, 31 Jul 2017 07:46:49 +0000 (14:46 +0700)
src/memcached.c

index 3502e35..36b3c9a 100644 (file)
@@ -5,6 +5,7 @@
  * Copyright (C) 2009       Doug MacEachern
  * Copyright (C) 2009       Franck Lombardi
  * Copyright (C) 2012       Nicolas Szalay
+ * Copyright (C) 2017       Pavel Rochnyak
  *
  * This program is free software; you can redistribute it and/or modify it
  * under the terms of the GNU General Public License as published by the
@@ -26,6 +27,7 @@
  *   Doug MacEachern <dougm at hyperic.com>
  *   Franck Lombardi
  *   Nicolas Szalay
+ *   Pavel Rochnyak <pavel2000 ngs.ru>
  **/
 
 #include "collectd.h"
 #include <netinet/tcp.h>
 #include <sys/un.h>
 
+#include <poll.h>
+
 #define MEMCACHED_DEF_HOST "127.0.0.1"
 #define MEMCACHED_DEF_PORT "11211"
+#define MEMCACHED_CONNECT_TIMEOUT 10000
+#define MEMCACHED_IO_TIMEOUT 5000
 
 struct memcached_s {
   char *name;
@@ -47,6 +53,7 @@ struct memcached_s {
   char *socket;
   char *connhost;
   char *connport;
+  int fd;
 };
 typedef struct memcached_s memcached_t;
 
@@ -57,6 +64,11 @@ static void memcached_free(void *arg) {
   if (st == NULL)
     return;
 
+  if (st->fd >= 0) {
+    shutdown(st->fd, SHUT_RDWR);
+    close(st->fd);
+  }
+
   sfree(st->name);
   sfree(st->host);
   sfree(st->socket);
@@ -86,7 +98,15 @@ static int memcached_connect_unix(memcached_t *st) {
   if (status != 0) {
     shutdown(fd, SHUT_RDWR);
     close(fd);
-    fd = -1;
+    return -1;
+  }
+
+  /* switch to non-blocking mode */
+  int flags = fcntl(fd, F_GETFL);
+  status = fcntl(fd, F_SETFL, flags | O_NONBLOCK);
+  if (status != 0) {
+    close(fd);
+    return -1;
   }
 
   return fd;
@@ -124,16 +144,48 @@ static int memcached_connect_inet(memcached_t *st) {
       continue;
     }
 
+    /* switch socket to non-blocking mode */
+    int flags = fcntl(fd, F_GETFL);
+    status = fcntl(fd, F_SETFL, flags | O_NONBLOCK);
+    if (status != 0) {
+      close(fd);
+      fd = -1;
+      continue;
+    }
+
     /* connect to the memcached daemon */
     status = (int)connect(fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
-    if (status != 0) {
+    if (status != 0 && errno != EINPROGRESS) {
       shutdown(fd, SHUT_RDWR);
       close(fd);
       fd = -1;
       continue;
     }
 
-    /* A socket could be opened and connecting succeeded. We're done. */
+    /* Wait until connection establishes */
+    struct pollfd pollfd;
+    pollfd.fd = fd;
+    pollfd.events = POLLOUT;
+    do
+      status = poll(&pollfd, 1, MEMCACHED_CONNECT_TIMEOUT);
+    while (status < 0 && errno == EINTR);
+    if (status <= 0) {
+      close(fd);
+      fd = -1;
+      continue;
+    }
+
+    /* Check if all is good */
+    int socket_error;
+    socklen_t socket_error_len = sizeof(socket_error);
+    status = getsockopt(fd, SOL_SOCKET, SO_ERROR, (void *)&socket_error,
+                        &socket_error_len);
+    if (status != 0 || socket_error != 0) {
+      close(fd);
+      fd = -1;
+      continue;
+    }
+    /* A socket is opened and connection succeeded. We're done. */
     break;
   }
 
@@ -141,32 +193,55 @@ static int memcached_connect_inet(memcached_t *st) {
   return fd;
 } /* int memcached_connect_inet */
 
-static int memcached_connect(memcached_t *st) {
+static void memcached_connect(memcached_t *st) {
+  if (st->fd >= 0)
+    return;
+
   if (st->socket != NULL)
-    return memcached_connect_unix(st);
+    st->fd = memcached_connect_unix(st);
   else
-    return memcached_connect_inet(st);
+    st->fd = memcached_connect_inet(st);
+
+  if (st->fd >= 0)
+    INFO("memcached plugin: Instance \"%s\": connection established.",
+         st->name);
 }
 
 static int memcached_query_daemon(char *buffer, size_t buffer_size,
                                   memcached_t *st) {
-  int fd, status;
+  int status;
   size_t buffer_fill;
 
-  fd = memcached_connect(st);
-  if (fd < 0) {
+  memcached_connect(st);
+  if (st->fd < 0) {
     ERROR("memcached plugin: Instance \"%s\" could not connect to daemon.",
           st->name);
     return -1;
   }
 
-  status = (int)swrite(fd, "stats\r\n", strlen("stats\r\n"));
+  struct pollfd pollfd;
+  pollfd.fd = st->fd;
+  pollfd.events = POLLOUT;
+
+  do
+    status = poll(&pollfd, 1, MEMCACHED_IO_TIMEOUT);
+  while (status < 0 && errno == EINTR);
+
+  if (status <= 0) {
+    ERROR("memcached plugin: poll() failed for write() call.");
+    close(st->fd);
+    st->fd = -1;
+    return -1;
+  }
+
+  status = (int)swrite(st->fd, "stats\r\n", strlen("stats\r\n"));
   if (status != 0) {
     char errbuf[1024];
-    ERROR("memcached plugin: write(2) failed: %s",
+    ERROR("memcached plugin: Instance \"%s\": write(2) failed: %s", st->name,
           sstrerror(errno, errbuf, sizeof(errbuf)));
-    shutdown(fd, SHUT_RDWR);
-    close(fd);
+    shutdown(st->fd, SHUT_RDWR);
+    close(st->fd);
+    st->fd = -1;
     return -1;
   }
 
@@ -174,27 +249,48 @@ static int memcached_query_daemon(char *buffer, size_t buffer_size,
   memset(buffer, 0, buffer_size);
 
   buffer_fill = 0;
-  while ((status = (int)recv(fd, buffer + buffer_fill,
-                             buffer_size - buffer_fill, /* flags = */ 0)) !=
-         0) {
+  pollfd.events = POLLIN;
+  while (1) {
+    do
+      status = poll(&pollfd, 1, MEMCACHED_IO_TIMEOUT);
+    while (status < 0 && errno == EINTR);
+
+    if (status <= 0) {
+      ERROR("memcached plugin: Instance \"%s\": Timeout reading from socket",
+            st->name);
+      close(st->fd);
+      st->fd = -1;
+      return -1;
+    }
+
+    do
+      status = (int)recv(st->fd, buffer + buffer_fill,
+                         buffer_size - buffer_fill, /* flags = */ 0);
+    while (status < 0 && errno == EINTR);
+
     char const end_token[5] = {'E', 'N', 'D', '\r', '\n'};
     if (status < 0) {
       char errbuf[1024];
 
-      if ((errno == EAGAIN) || (errno == EINTR))
+      if ((errno == EAGAIN) || (errno == EWOULDBLOCK))
         continue;
 
-      ERROR("memcached: Error reading from socket: %s",
-            sstrerror(errno, errbuf, sizeof(errbuf)));
-      shutdown(fd, SHUT_RDWR);
-      close(fd);
+      ERROR("memcached plugin: Instance \"%s\": Error reading from socket: %s",
+            st->name, sstrerror(errno, errbuf, sizeof(errbuf)));
+      shutdown(st->fd, SHUT_RDWR);
+      close(st->fd);
+      st->fd = -1;
       return -1;
     }
 
     buffer_fill += (size_t)status;
     if (buffer_fill > buffer_size) {
       buffer_fill = buffer_size;
-      WARNING("memcached plugin: Message was truncated.");
+      WARNING("memcached plugin: Instance \"%s\": Message was truncated.",
+              st->name);
+      shutdown(st->fd, SHUT_RDWR);
+      close(st->fd);
+      st->fd = -1;
       break;
     }
 
@@ -206,12 +302,11 @@ static int memcached_query_daemon(char *buffer, size_t buffer_size,
 
   status = 0;
   if (buffer_fill == 0) {
-    WARNING("memcached plugin: No data returned by memcached.");
+    WARNING("memcached plugin: Instance \"%s\": No data returned by memcached.",
+            st->name);
     status = -1;
   }
 
-  shutdown(fd, SHUT_RDWR);
-  close(fd);
   return status;
 } /* int memcached_query_daemon */
 
@@ -562,6 +657,8 @@ static int config_add_instance(oconfig_item_t *ci) {
   st->connhost = NULL;
   st->connport = NULL;
 
+  st->fd = -1;
+
   if (strcasecmp(ci->key, "Instance") == 0)
     status = cf_util_get_string(ci, &st->name);
 
@@ -639,6 +736,8 @@ static int memcached_init(void) {
   st->connhost = NULL;
   st->connport = NULL;
 
+  st->fd = -1;
+
   status = memcached_add_read_callback(st);
   if (status == 0)
     memcached_have_instances = 1;