X-Git-Url: https://git.octo.it/?a=blobdiff_plain;f=src%2Famqp.c;h=1e23a563daccc430e3c6cda5331e5e3f9d2b9406;hb=f8379dd45f4a43595f4027992696ee8d02908bff;hp=bebaea7c8e7eb4bdbbcff93684321a851fc94cbd;hpb=1a477ecb462094ad9e13320a9234716ead038b9c;p=collectd.git diff --git a/src/amqp.c b/src/amqp.c index bebaea7c..1e23a563 100644 --- a/src/amqp.c +++ b/src/amqp.c @@ -44,11 +44,13 @@ #ifdef HAVE_AMQP_SOCKET_H # include #endif +#ifdef HAVE_AMQP_TCP_SOCKET #if defined HAVE_DECL_AMQP_SOCKET_CLOSE && !HAVE_DECL_AMQP_SOCKET_CLOSE /* rabbitmq-c does not currently ship amqp_socket.h * and, thus, does not define this function. */ int amqp_socket_close(amqp_socket_t *); #endif +#endif /* Defines for the delivery mode. I have no idea why they're not defined by the * library.. */ @@ -192,11 +194,11 @@ static char *camqp_strerror (camqp_config_t *conf, /* {{{ */ switch (r.reply_type) { case AMQP_RESPONSE_NORMAL: - sstrncpy (buffer, "Success", sizeof (buffer)); + sstrncpy (buffer, "Success", buffer_size); break; case AMQP_RESPONSE_NONE: - sstrncpy (buffer, "Missing RPC reply type", sizeof (buffer)); + sstrncpy (buffer, "Missing RPC reply type", buffer_size); break; case AMQP_RESPONSE_LIBRARY_EXCEPTION: @@ -208,7 +210,7 @@ static char *camqp_strerror (camqp_config_t *conf, /* {{{ */ return (sstrerror (r.library_error, buffer, buffer_size)); #endif else - sstrncpy (buffer, "End of stream", sizeof (buffer)); + sstrncpy (buffer, "End of stream", buffer_size); break; case AMQP_RESPONSE_SERVER_EXCEPTION: @@ -297,6 +299,10 @@ static int camqp_create_exchange (camqp_config_t *conf) /* {{{ */ /* type = */ amqp_cstring_bytes (conf->exchange_type), /* passive = */ 0, /* durable = */ 0, +#if defined(AMQP_VERSION) && AMQP_VERSION >= 0x00060000 + /* auto delete = */ 0, + /* internal = */ 0, +#endif /* arguments = */ argument_table); if ((ed_ret == NULL) && camqp_is_error (conf)) { @@ -420,7 +426,7 @@ static int camqp_connect (camqp_config_t *conf) /* {{{ */ } #ifdef HAVE_AMQP_TCP_SOCKET -# define CLOSE_SOCKET() amqp_socket_close (socket) +# define CLOSE_SOCKET() /* amqp_destroy_connection() closes the socket for us */ /* TODO: add support for SSL using amqp_ssl_socket_new * and related functions */ socket = amqp_tcp_socket_new (conf->connection); @@ -439,13 +445,12 @@ static int camqp_connect (camqp_config_t *conf) /* {{{ */ status *= -1; ERROR ("amqp plugin: amqp_socket_open failed: %s", sstrerror (status, errbuf, sizeof (errbuf))); - CLOSE_SOCKET (); amqp_destroy_connection (conf->connection); conf->connection = NULL; return (status); } #else /* HAVE_AMQP_TCP_SOCKET */ -# define CLOSE_SOCKET close(sockfd) +# define CLOSE_SOCKET() close(sockfd) /* this interface is deprecated as of rabbitmq-c 0.4 */ sockfd = amqp_open_socket (CONF(conf, host), conf->port); if (sockfd < 0) @@ -691,7 +696,7 @@ static void *camqp_subscribe_thread (void *user_data) /* {{{ */ continue; } - status = camqp_read_header (conf); + camqp_read_header (conf); amqp_maybe_release_buffers (conf->connection); } /* while (subscriber_threads_running) */ @@ -922,6 +927,7 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */ /* publish only */ conf->delivery_mode = CAMQP_DM_VOLATILE; conf->store_rates = 0; + conf->graphite_flags = 0; /* publish & graphite only */ conf->prefix = NULL; conf->postfix = NULL; @@ -987,6 +993,12 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */ } else if ((strcasecmp ("Format", child->key) == 0) && publish) status = camqp_config_set_format (child, conf); + else if ((strcasecmp ("GraphiteSeparateInstances", child->key) == 0) && publish) + status = cf_util_get_flag (child, &conf->graphite_flags, + GRAPHITE_SEPARATE_INSTANCES); + else if ((strcasecmp ("GraphiteAlwaysAppendDS", child->key) == 0) && publish) + status = cf_util_get_flag (child, &conf->graphite_flags, + GRAPHITE_ALWAYS_APPEND_DS); else if ((strcasecmp ("GraphitePrefix", child->key) == 0) && publish) status = cf_util_get_string (child, &conf->prefix); else if ((strcasecmp ("GraphitePostfix", child->key) == 0) && publish)