X-Git-Url: https://git.octo.it/?a=blobdiff_plain;f=src%2Famqp.c;h=1764129faf4a57fdc622062156d28ffb322a4bef;hb=bc014af07ee3ea6231c629cc72099f668788a343;hp=bebaea7c8e7eb4bdbbcff93684321a851fc94cbd;hpb=1a477ecb462094ad9e13320a9234716ead038b9c;p=collectd.git diff --git a/src/amqp.c b/src/amqp.c index bebaea7c..1764129f 100644 --- a/src/amqp.c +++ b/src/amqp.c @@ -23,7 +23,7 @@ * * Authors: * Sebastien Pahl - * Florian Forster + * Florian Forster **/ #include "collectd.h" @@ -44,11 +44,13 @@ #ifdef HAVE_AMQP_SOCKET_H # include #endif +#ifdef HAVE_AMQP_TCP_SOCKET #if defined HAVE_DECL_AMQP_SOCKET_CLOSE && !HAVE_DECL_AMQP_SOCKET_CLOSE /* rabbitmq-c does not currently ship amqp_socket.h * and, thus, does not define this function. */ int amqp_socket_close(amqp_socket_t *); #endif +#endif /* Defines for the delivery mode. I have no idea why they're not defined by the * library.. */ @@ -91,6 +93,8 @@ struct camqp_config_s /* subscribe only */ char *exchange_type; char *queue; + _Bool queue_durable; + _Bool queue_auto_delete; amqp_connection_state_t connection; pthread_mutex_t lock; @@ -326,9 +330,9 @@ static int camqp_setup_queue (camqp_config_t *conf) /* {{{ */ ? amqp_cstring_bytes (conf->queue) : AMQP_EMPTY_BYTES, /* passive = */ 0, - /* durable = */ 0, + /* durable = */ conf->queue_durable, /* exclusive = */ 0, - /* auto_delete = */ 1, + /* auto_delete = */ conf->queue_auto_delete, /* arguments = */ AMQP_EMPTY_TABLE); if (qd_ret == NULL) { @@ -420,7 +424,7 @@ static int camqp_connect (camqp_config_t *conf) /* {{{ */ } #ifdef HAVE_AMQP_TCP_SOCKET -# define CLOSE_SOCKET() amqp_socket_close (socket) +# define CLOSE_SOCKET() /* amqp_destroy_connection() closes the socket for us */ /* TODO: add support for SSL using amqp_ssl_socket_new * and related functions */ socket = amqp_tcp_socket_new (conf->connection); @@ -439,13 +443,12 @@ static int camqp_connect (camqp_config_t *conf) /* {{{ */ status *= -1; ERROR ("amqp plugin: amqp_socket_open failed: %s", sstrerror (status, errbuf, sizeof (errbuf))); - CLOSE_SOCKET (); amqp_destroy_connection (conf->connection); conf->connection = NULL; return (status); } #else /* HAVE_AMQP_TCP_SOCKET */ -# define CLOSE_SOCKET close(sockfd) +# define CLOSE_SOCKET() close(sockfd) /* this interface is deprecated as of rabbitmq-c 0.4 */ sockfd = amqp_open_socket (CONF(conf, host), conf->port); if (sockfd < 0) @@ -786,7 +789,7 @@ static int camqp_write (const data_set_t *ds, const value_list_t *vl, /* {{{ */ { camqp_config_t *conf = user_data->data; char routing_key[6 * DATA_MAX_NAME_LEN]; - char buffer[4096]; + char buffer[8192]; int status; if ((ds == NULL) || (vl == NULL) || (conf == NULL)) @@ -922,6 +925,7 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */ /* publish only */ conf->delivery_mode = CAMQP_DM_VOLATILE; conf->store_rates = 0; + conf->graphite_flags = 0; /* publish & graphite only */ conf->prefix = NULL; conf->postfix = NULL; @@ -929,6 +933,8 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */ /* subscribe only */ conf->exchange_type = NULL; conf->queue = NULL; + conf->queue_durable = 0; + conf->queue_auto_delete = 1; /* general */ conf->connection = NULL; pthread_mutex_init (&conf->lock, /* attr = */ NULL); @@ -968,6 +974,10 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */ status = cf_util_get_string (child, &conf->exchange_type); else if ((strcasecmp ("Queue", child->key) == 0) && !publish) status = cf_util_get_string (child, &conf->queue); + else if ((strcasecmp ("QueueDurable", child->key) == 0) && !publish) + status = cf_util_get_boolean (child, &conf->queue_durable); + else if ((strcasecmp ("QueueAutoDelete", child->key) == 0) && !publish) + status = cf_util_get_boolean (child, &conf->queue_auto_delete); else if (strcasecmp ("RoutingKey", child->key) == 0) status = cf_util_get_string (child, &conf->routing_key); else if ((strcasecmp ("Persistent", child->key) == 0) && publish) @@ -987,6 +997,12 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */ } else if ((strcasecmp ("Format", child->key) == 0) && publish) status = camqp_config_set_format (child, conf); + else if ((strcasecmp ("GraphiteSeparateInstances", child->key) == 0) && publish) + status = cf_util_get_flag (child, &conf->graphite_flags, + GRAPHITE_SEPARATE_INSTANCES); + else if ((strcasecmp ("GraphiteAlwaysAppendDS", child->key) == 0) && publish) + status = cf_util_get_flag (child, &conf->graphite_flags, + GRAPHITE_ALWAYS_APPEND_DS); else if ((strcasecmp ("GraphitePrefix", child->key) == 0) && publish) status = cf_util_get_string (child, &conf->prefix); else if ((strcasecmp ("GraphitePostfix", child->key) == 0) && publish)