From 1a477ecb462094ad9e13320a9234716ead038b9c Mon Sep 17 00:00:00 2001 From: Sebastian Harl Date: Sat, 23 Nov 2013 15:54:26 +0100 Subject: [PATCH] amqp plugin: Added support for rabbitmq-c 0.4.x. Upstream introduced a new socket interface and deprecated the old one. This leads to compiler errors when using GCC and -Werror. --- configure.in | 32 ++++++++++++++++++++++++++++++++ src/amqp.c | 51 ++++++++++++++++++++++++++++++++++++++++++++++++--- 2 files changed, 80 insertions(+), 3 deletions(-) diff --git a/configure.in b/configure.in index ff791d6c..81b26116 100644 --- a/configure.in +++ b/configure.in @@ -3528,6 +3528,38 @@ fi CPPFLAGS="$SAVE_CPPFLAGS" LDFLAGS="$SAVE_LDFLAGS" AM_CONDITIONAL(BUILD_WITH_LIBRABBITMQ, test "x$with_librabbitmq" = "xyes") + +with_amqp_tcp_socket="no" +if test "x$with_librabbitmq" = "xyes" +then + SAVE_CPPFLAGS="$CPPFLAGS" + SAVE_LDFLAGS="$LDFLAGS" + CPPFLAGS="$CPPFLAGS $with_librabbitmq_cppflags" + LDFLAGS="$LDFLAGS $with_librabbitmq_ldflags -lrabbitmq" + + AC_CHECK_HEADERS(amqp_tcp_socket.h amqp_socket.h) + AC_CHECK_FUNC(amqp_tcp_socket_new, [with_amqp_tcp_socket="yes"], [with_amqp_tcp_socket="no"]) + if test "x$with_amqp_tcp_socket" = "xyes" + then + AC_DEFINE(HAVE_AMQP_TCP_SOCKET, 1, + [Define if librabbitmq provides the new TCP socket interface.]) + fi + + AC_CHECK_DECLS(amqp_socket_close, + [amqp_socket_close_decl="yes"], [amqp_socket_close_decl="no"], + [[ +#include +#ifdef HAVE_AMQP_TCP_SOCKET_H +# include +#endif +#ifdef HAVE_AMQP_SOCKET_H +# include +#endif + ]]) + + CPPFLAGS="$SAVE_CPPFLAGS" + LDFLAGS="$SAVE_LDFLAGS" +fi # }}} # --with-librouteros {{{ diff --git a/src/amqp.c b/src/amqp.c index 767a8776..bebaea7c 100644 --- a/src/amqp.c +++ b/src/amqp.c @@ -38,6 +38,18 @@ #include #include +#ifdef HAVE_AMQP_TCP_SOCKET_H +# include +#endif +#ifdef HAVE_AMQP_SOCKET_H +# include +#endif +#if defined HAVE_DECL_AMQP_SOCKET_CLOSE && !HAVE_DECL_AMQP_SOCKET_CLOSE +/* rabbitmq-c does not currently ship amqp_socket.h + * and, thus, does not define this function. */ +int amqp_socket_close(amqp_socket_t *); +#endif + /* Defines for the delivery mode. I have no idea why they're not defined by the * library.. */ #define CAMQP_DM_VOLATILE 1 @@ -390,8 +402,12 @@ static int camqp_setup_queue (camqp_config_t *conf) /* {{{ */ static int camqp_connect (camqp_config_t *conf) /* {{{ */ { amqp_rpc_reply_t reply; - int sockfd; int status; +#ifdef HAVE_AMQP_TCP_SOCKET + amqp_socket_t *socket; +#else + int sockfd; +#endif if (conf->connection != NULL) return (0); @@ -403,6 +419,34 @@ static int camqp_connect (camqp_config_t *conf) /* {{{ */ return (ENOMEM); } +#ifdef HAVE_AMQP_TCP_SOCKET +# define CLOSE_SOCKET() amqp_socket_close (socket) + /* TODO: add support for SSL using amqp_ssl_socket_new + * and related functions */ + socket = amqp_tcp_socket_new (conf->connection); + if (! socket) + { + ERROR ("amqp plugin: amqp_tcp_socket_new failed."); + amqp_destroy_connection (conf->connection); + conf->connection = NULL; + return (ENOMEM); + } + + status = amqp_socket_open (socket, CONF(conf, host), conf->port); + if (status < 0) + { + char errbuf[1024]; + status *= -1; + ERROR ("amqp plugin: amqp_socket_open failed: %s", + sstrerror (status, errbuf, sizeof (errbuf))); + CLOSE_SOCKET (); + amqp_destroy_connection (conf->connection); + conf->connection = NULL; + return (status); + } +#else /* HAVE_AMQP_TCP_SOCKET */ +# define CLOSE_SOCKET close(sockfd) + /* this interface is deprecated as of rabbitmq-c 0.4 */ sockfd = amqp_open_socket (CONF(conf, host), conf->port); if (sockfd < 0) { @@ -415,6 +459,7 @@ static int camqp_connect (camqp_config_t *conf) /* {{{ */ return (status); } amqp_set_sockfd (conf->connection, sockfd); +#endif reply = amqp_login (conf->connection, CONF(conf, vhost), /* channel max = */ 0, @@ -427,7 +472,7 @@ static int camqp_connect (camqp_config_t *conf) /* {{{ */ ERROR ("amqp plugin: amqp_login (vhost = %s, user = %s) failed.", CONF(conf, vhost), CONF(conf, user)); amqp_destroy_connection (conf->connection); - close (sockfd); + CLOSE_SOCKET (); conf->connection = NULL; return (1); } @@ -440,7 +485,7 @@ static int camqp_connect (camqp_config_t *conf) /* {{{ */ ERROR ("amqp plugin: amqp_channel_open failed."); amqp_connection_close (conf->connection, AMQP_REPLY_SUCCESS); amqp_destroy_connection (conf->connection); - close(sockfd); + CLOSE_SOCKET (); conf->connection = NULL; return (1); } -- 2.11.0