X-Git-Url: https://git.octo.it/?a=blobdiff_plain;f=src%2Famqp.c;h=97359cfd394facc2fa81669385c3bedecd019d98;hb=09147a2c9dde2e04e660409e7a5ac0eb09604b07;hp=bdc62b3bf59f5e5080fee7068a32a47da48349ec;hpb=082be7a56f25e0561be6ce87e5662e01b1e91975;p=collectd.git diff --git a/src/amqp.c b/src/amqp.c index bdc62b3b..97359cfd 100644 --- a/src/amqp.c +++ b/src/amqp.c @@ -23,7 +23,7 @@ * * Authors: * Sebastien Pahl - * Florian Forster + * Florian Forster **/ #include "collectd.h" @@ -38,6 +38,20 @@ #include #include +#ifdef HAVE_AMQP_TCP_SOCKET_H +# include +#endif +#ifdef HAVE_AMQP_SOCKET_H +# include +#endif +#ifdef HAVE_AMQP_TCP_SOCKET +#if defined HAVE_DECL_AMQP_SOCKET_CLOSE && !HAVE_DECL_AMQP_SOCKET_CLOSE +/* rabbitmq-c does not currently ship amqp_socket.h + * and, thus, does not define this function. */ +int amqp_socket_close(amqp_socket_t *); +#endif +#endif + /* Defines for the delivery mode. I have no idea why they're not defined by the * library.. */ #define CAMQP_DM_VOLATILE 1 @@ -66,6 +80,9 @@ struct camqp_config_s char *exchange; char *routing_key; + /* Number of seconds to wait before connection is retried */ + int connection_retry_delay; + /* publish only */ uint8_t delivery_mode; _Bool store_rates; @@ -287,6 +304,10 @@ static int camqp_create_exchange (camqp_config_t *conf) /* {{{ */ /* type = */ amqp_cstring_bytes (conf->exchange_type), /* passive = */ 0, /* durable = */ 0, +#if defined(AMQP_VERSION) && AMQP_VERSION >= 0x00060000 + /* auto delete = */ 0, + /* internal = */ 0, +#endif /* arguments = */ argument_table); if ((ed_ret == NULL) && camqp_is_error (conf)) { @@ -391,13 +412,32 @@ static int camqp_setup_queue (camqp_config_t *conf) /* {{{ */ static int camqp_connect (camqp_config_t *conf) /* {{{ */ { + static time_t last_connect_time = 0; + amqp_rpc_reply_t reply; - int sockfd; int status; +#ifdef HAVE_AMQP_TCP_SOCKET + amqp_socket_t *socket; +#else + int sockfd; +#endif if (conf->connection != NULL) return (0); + time_t now = time(NULL); + if (now < (last_connect_time + conf->connection_retry_delay)) + { + DEBUG("amqp plugin: skipping connection retry, " + "ConnectionRetryDelay: %d", conf->connection_retry_delay); + return(1); + } + else + { + DEBUG ("amqp plugin: retrying connection"); + last_connect_time = now; + } + conf->connection = amqp_new_connection (); if (conf->connection == NULL) { @@ -405,6 +445,33 @@ static int camqp_connect (camqp_config_t *conf) /* {{{ */ return (ENOMEM); } +#ifdef HAVE_AMQP_TCP_SOCKET +# define CLOSE_SOCKET() /* amqp_destroy_connection() closes the socket for us */ + /* TODO: add support for SSL using amqp_ssl_socket_new + * and related functions */ + socket = amqp_tcp_socket_new (conf->connection); + if (! socket) + { + ERROR ("amqp plugin: amqp_tcp_socket_new failed."); + amqp_destroy_connection (conf->connection); + conf->connection = NULL; + return (ENOMEM); + } + + status = amqp_socket_open (socket, CONF(conf, host), conf->port); + if (status < 0) + { + char errbuf[1024]; + status *= -1; + ERROR ("amqp plugin: amqp_socket_open failed: %s", + sstrerror (status, errbuf, sizeof (errbuf))); + amqp_destroy_connection (conf->connection); + conf->connection = NULL; + return (status); + } +#else /* HAVE_AMQP_TCP_SOCKET */ +# define CLOSE_SOCKET() close(sockfd) + /* this interface is deprecated as of rabbitmq-c 0.4 */ sockfd = amqp_open_socket (CONF(conf, host), conf->port); if (sockfd < 0) { @@ -417,6 +484,7 @@ static int camqp_connect (camqp_config_t *conf) /* {{{ */ return (status); } amqp_set_sockfd (conf->connection, sockfd); +#endif reply = amqp_login (conf->connection, CONF(conf, vhost), /* channel max = */ 0, @@ -429,7 +497,7 @@ static int camqp_connect (camqp_config_t *conf) /* {{{ */ ERROR ("amqp plugin: amqp_login (vhost = %s, user = %s) failed.", CONF(conf, vhost), CONF(conf, user)); amqp_destroy_connection (conf->connection); - close (sockfd); + CLOSE_SOCKET (); conf->connection = NULL; return (1); } @@ -442,7 +510,7 @@ static int camqp_connect (camqp_config_t *conf) /* {{{ */ ERROR ("amqp plugin: amqp_channel_open failed."); amqp_connection_close (conf->connection, AMQP_REPLY_SUCCESS); amqp_destroy_connection (conf->connection); - close(sockfd); + CLOSE_SOCKET (); conf->connection = NULL; return (1); } @@ -876,6 +944,8 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */ conf->password = NULL; conf->exchange = NULL; conf->routing_key = NULL; + conf->connection_retry_delay = 0; + /* publish only */ conf->delivery_mode = CAMQP_DM_VOLATILE; conf->store_rates = 0; @@ -928,9 +998,9 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */ status = cf_util_get_string (child, &conf->exchange_type); else if ((strcasecmp ("Queue", child->key) == 0) && !publish) status = cf_util_get_string (child, &conf->queue); - else if (strcasecmp ("QueueDurable", child->key) == 0) + else if ((strcasecmp ("QueueDurable", child->key) == 0) && !publish) status = cf_util_get_boolean (child, &conf->queue_durable); - else if (strcasecmp ("QueueAutoDelete", child->key) == 0) + else if ((strcasecmp ("QueueAutoDelete", child->key) == 0) && !publish) status = cf_util_get_boolean (child, &conf->queue_auto_delete); else if (strcasecmp ("RoutingKey", child->key) == 0) status = cf_util_get_string (child, &conf->routing_key); @@ -971,6 +1041,8 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */ conf->escape_char = tmp_buff[0]; sfree (tmp_buff); } + else if (strcasecmp ("ConnectionRetryDelay", child->key) == 0) + status = cf_util_get_int (child, &conf->connection_retry_delay); else WARNING ("amqp plugin: Ignoring unknown " "configuration option \"%s\".", child->key);