Merge branch 'collectd-5.4'
[collectd.git] / src / amqp.c
index 56718f0..1764129 100644 (file)
 #include <amqp.h>
 #include <amqp_framing.h>
 
+#ifdef HAVE_AMQP_TCP_SOCKET_H
+# include <amqp_tcp_socket.h>
+#endif
+#ifdef HAVE_AMQP_SOCKET_H
+# include <amqp_socket.h>
+#endif
+#ifdef HAVE_AMQP_TCP_SOCKET
+#if defined HAVE_DECL_AMQP_SOCKET_CLOSE && !HAVE_DECL_AMQP_SOCKET_CLOSE
+/* rabbitmq-c does not currently ship amqp_socket.h
+ * and, thus, does not define this function. */
+int amqp_socket_close(amqp_socket_t *);
+#endif
+#endif
+
 /* Defines for the delivery mode. I have no idea why they're not defined by the
  * library.. */
 #define CAMQP_DM_VOLATILE   1
@@ -392,8 +406,12 @@ static int camqp_setup_queue (camqp_config_t *conf) /* {{{ */
 static int camqp_connect (camqp_config_t *conf) /* {{{ */
 {
     amqp_rpc_reply_t reply;
-    int sockfd;
     int status;
+#ifdef HAVE_AMQP_TCP_SOCKET
+    amqp_socket_t *socket;
+#else
+    int sockfd;
+#endif
 
     if (conf->connection != NULL)
         return (0);
@@ -405,6 +423,33 @@ static int camqp_connect (camqp_config_t *conf) /* {{{ */
         return (ENOMEM);
     }
 
+#ifdef HAVE_AMQP_TCP_SOCKET
+# define CLOSE_SOCKET() /* amqp_destroy_connection() closes the socket for us */
+    /* TODO: add support for SSL using amqp_ssl_socket_new
+     *       and related functions */
+    socket = amqp_tcp_socket_new (conf->connection);
+    if (! socket)
+    {
+        ERROR ("amqp plugin: amqp_tcp_socket_new failed.");
+        amqp_destroy_connection (conf->connection);
+        conf->connection = NULL;
+        return (ENOMEM);
+    }
+
+    status = amqp_socket_open (socket, CONF(conf, host), conf->port);
+    if (status < 0)
+    {
+        char errbuf[1024];
+        status *= -1;
+        ERROR ("amqp plugin: amqp_socket_open failed: %s",
+                sstrerror (status, errbuf, sizeof (errbuf)));
+        amqp_destroy_connection (conf->connection);
+        conf->connection = NULL;
+        return (status);
+    }
+#else /* HAVE_AMQP_TCP_SOCKET */
+# define CLOSE_SOCKET() close(sockfd)
+    /* this interface is deprecated as of rabbitmq-c 0.4 */
     sockfd = amqp_open_socket (CONF(conf, host), conf->port);
     if (sockfd < 0)
     {
@@ -417,6 +462,7 @@ static int camqp_connect (camqp_config_t *conf) /* {{{ */
         return (status);
     }
     amqp_set_sockfd (conf->connection, sockfd);
+#endif
 
     reply = amqp_login (conf->connection, CONF(conf, vhost),
             /* channel max = */      0,
@@ -429,7 +475,7 @@ static int camqp_connect (camqp_config_t *conf) /* {{{ */
         ERROR ("amqp plugin: amqp_login (vhost = %s, user = %s) failed.",
                 CONF(conf, vhost), CONF(conf, user));
         amqp_destroy_connection (conf->connection);
-        close (sockfd);
+        CLOSE_SOCKET ();
         conf->connection = NULL;
         return (1);
     }
@@ -442,7 +488,7 @@ static int camqp_connect (camqp_config_t *conf) /* {{{ */
         ERROR ("amqp plugin: amqp_channel_open failed.");
         amqp_connection_close (conf->connection, AMQP_REPLY_SUCCESS);
         amqp_destroy_connection (conf->connection);
-        close(sockfd);
+        CLOSE_SOCKET ();
         conf->connection = NULL;
         return (1);
     }