amqp plugin: Added support for rabbitmq-c 0.4.x.
[collectd.git] / src / amqp.c
index 767a877..bebaea7 100644 (file)
 #include <amqp.h>
 #include <amqp_framing.h>
 
+#ifdef HAVE_AMQP_TCP_SOCKET_H
+# include <amqp_tcp_socket.h>
+#endif
+#ifdef HAVE_AMQP_SOCKET_H
+# include <amqp_socket.h>
+#endif
+#if defined HAVE_DECL_AMQP_SOCKET_CLOSE && !HAVE_DECL_AMQP_SOCKET_CLOSE
+/* rabbitmq-c does not currently ship amqp_socket.h
+ * and, thus, does not define this function. */
+int amqp_socket_close(amqp_socket_t *);
+#endif
+
 /* Defines for the delivery mode. I have no idea why they're not defined by the
  * library.. */
 #define CAMQP_DM_VOLATILE   1
@@ -390,8 +402,12 @@ static int camqp_setup_queue (camqp_config_t *conf) /* {{{ */
 static int camqp_connect (camqp_config_t *conf) /* {{{ */
 {
     amqp_rpc_reply_t reply;
-    int sockfd;
     int status;
+#ifdef HAVE_AMQP_TCP_SOCKET
+    amqp_socket_t *socket;
+#else
+    int sockfd;
+#endif
 
     if (conf->connection != NULL)
         return (0);
@@ -403,6 +419,34 @@ static int camqp_connect (camqp_config_t *conf) /* {{{ */
         return (ENOMEM);
     }
 
+#ifdef HAVE_AMQP_TCP_SOCKET
+# define CLOSE_SOCKET() amqp_socket_close (socket)
+    /* TODO: add support for SSL using amqp_ssl_socket_new
+     *       and related functions */
+    socket = amqp_tcp_socket_new (conf->connection);
+    if (! socket)
+    {
+        ERROR ("amqp plugin: amqp_tcp_socket_new failed.");
+        amqp_destroy_connection (conf->connection);
+        conf->connection = NULL;
+        return (ENOMEM);
+    }
+
+    status = amqp_socket_open (socket, CONF(conf, host), conf->port);
+    if (status < 0)
+    {
+        char errbuf[1024];
+        status *= -1;
+        ERROR ("amqp plugin: amqp_socket_open failed: %s",
+                sstrerror (status, errbuf, sizeof (errbuf)));
+        CLOSE_SOCKET ();
+        amqp_destroy_connection (conf->connection);
+        conf->connection = NULL;
+        return (status);
+    }
+#else /* HAVE_AMQP_TCP_SOCKET */
+# define CLOSE_SOCKET close(sockfd)
+    /* this interface is deprecated as of rabbitmq-c 0.4 */
     sockfd = amqp_open_socket (CONF(conf, host), conf->port);
     if (sockfd < 0)
     {
@@ -415,6 +459,7 @@ static int camqp_connect (camqp_config_t *conf) /* {{{ */
         return (status);
     }
     amqp_set_sockfd (conf->connection, sockfd);
+#endif
 
     reply = amqp_login (conf->connection, CONF(conf, vhost),
             /* channel max = */      0,
@@ -427,7 +472,7 @@ static int camqp_connect (camqp_config_t *conf) /* {{{ */
         ERROR ("amqp plugin: amqp_login (vhost = %s, user = %s) failed.",
                 CONF(conf, vhost), CONF(conf, user));
         amqp_destroy_connection (conf->connection);
-        close (sockfd);
+        CLOSE_SOCKET ();
         conf->connection = NULL;
         return (1);
     }
@@ -440,7 +485,7 @@ static int camqp_connect (camqp_config_t *conf) /* {{{ */
         ERROR ("amqp plugin: amqp_channel_open failed.");
         amqp_connection_close (conf->connection, AMQP_REPLY_SUCCESS);
         amqp_destroy_connection (conf->connection);
-        close(sockfd);
+        CLOSE_SOCKET ();
         conf->connection = NULL;
         return (1);
     }