X-Git-Url: https://git.octo.it/?p=collectd.git;a=blobdiff_plain;f=src%2Famqp.c;h=bebaea7c8e7eb4bdbbcff93684321a851fc94cbd;hp=767a8776bbfb07ce2a26f932cf7fb5da4d22d047;hb=1a477ecb462094ad9e13320a9234716ead038b9c;hpb=30aaad03b66ff22d8cc50de2116b2d93a9d445f2 diff --git a/src/amqp.c b/src/amqp.c index 767a8776..bebaea7c 100644 --- a/src/amqp.c +++ b/src/amqp.c @@ -38,6 +38,18 @@ #include #include +#ifdef HAVE_AMQP_TCP_SOCKET_H +# include +#endif +#ifdef HAVE_AMQP_SOCKET_H +# include +#endif +#if defined HAVE_DECL_AMQP_SOCKET_CLOSE && !HAVE_DECL_AMQP_SOCKET_CLOSE +/* rabbitmq-c does not currently ship amqp_socket.h + * and, thus, does not define this function. */ +int amqp_socket_close(amqp_socket_t *); +#endif + /* Defines for the delivery mode. I have no idea why they're not defined by the * library.. */ #define CAMQP_DM_VOLATILE 1 @@ -390,8 +402,12 @@ static int camqp_setup_queue (camqp_config_t *conf) /* {{{ */ static int camqp_connect (camqp_config_t *conf) /* {{{ */ { amqp_rpc_reply_t reply; - int sockfd; int status; +#ifdef HAVE_AMQP_TCP_SOCKET + amqp_socket_t *socket; +#else + int sockfd; +#endif if (conf->connection != NULL) return (0); @@ -403,6 +419,34 @@ static int camqp_connect (camqp_config_t *conf) /* {{{ */ return (ENOMEM); } +#ifdef HAVE_AMQP_TCP_SOCKET +# define CLOSE_SOCKET() amqp_socket_close (socket) + /* TODO: add support for SSL using amqp_ssl_socket_new + * and related functions */ + socket = amqp_tcp_socket_new (conf->connection); + if (! socket) + { + ERROR ("amqp plugin: amqp_tcp_socket_new failed."); + amqp_destroy_connection (conf->connection); + conf->connection = NULL; + return (ENOMEM); + } + + status = amqp_socket_open (socket, CONF(conf, host), conf->port); + if (status < 0) + { + char errbuf[1024]; + status *= -1; + ERROR ("amqp plugin: amqp_socket_open failed: %s", + sstrerror (status, errbuf, sizeof (errbuf))); + CLOSE_SOCKET (); + amqp_destroy_connection (conf->connection); + conf->connection = NULL; + return (status); + } +#else /* HAVE_AMQP_TCP_SOCKET */ +# define CLOSE_SOCKET close(sockfd) + /* this interface is deprecated as of rabbitmq-c 0.4 */ sockfd = amqp_open_socket (CONF(conf, host), conf->port); if (sockfd < 0) { @@ -415,6 +459,7 @@ static int camqp_connect (camqp_config_t *conf) /* {{{ */ return (status); } amqp_set_sockfd (conf->connection, sockfd); +#endif reply = amqp_login (conf->connection, CONF(conf, vhost), /* channel max = */ 0, @@ -427,7 +472,7 @@ static int camqp_connect (camqp_config_t *conf) /* {{{ */ ERROR ("amqp plugin: amqp_login (vhost = %s, user = %s) failed.", CONF(conf, vhost), CONF(conf, user)); amqp_destroy_connection (conf->connection); - close (sockfd); + CLOSE_SOCKET (); conf->connection = NULL; return (1); } @@ -440,7 +485,7 @@ static int camqp_connect (camqp_config_t *conf) /* {{{ */ ERROR ("amqp plugin: amqp_channel_open failed."); amqp_connection_close (conf->connection, AMQP_REPLY_SUCCESS); amqp_destroy_connection (conf->connection); - close(sockfd); + CLOSE_SOCKET (); conf->connection = NULL; return (1); }