X-Git-Url: https://git.octo.it/?p=collectd.git;a=blobdiff_plain;f=src%2Famqp.c;h=39575b93e68d52e2ba98e25716eac78b57d35e0f;hp=4c9ea0fd99860b94294969b3ac7cb53f62330330;hb=956a2f4996d9fb0526b6f60c29be9b17c0ad27ba;hpb=8f6aa6970bf787e6a11e095322af3338ec781d78 diff --git a/src/amqp.c b/src/amqp.c index 4c9ea0fd..39575b93 100644 --- a/src/amqp.c +++ b/src/amqp.c @@ -218,17 +218,17 @@ static char *camqp_strerror(camqp_config_t *conf, /* {{{ */ amqp_connection_close_t *m = r.reply.decoded; char *tmp = camqp_bytes_cstring(&m->reply_text); ssnprintf(buffer, buffer_size, "Server connection error %d: %s", - m->reply_code, tmp); + m->reply_code, tmp); sfree(tmp); } else if (r.reply.id == AMQP_CHANNEL_CLOSE_METHOD) { amqp_channel_close_t *m = r.reply.decoded; char *tmp = camqp_bytes_cstring(&m->reply_text); ssnprintf(buffer, buffer_size, "Server channel error %d: %s", - m->reply_code, tmp); + m->reply_code, tmp); sfree(tmp); } else { ssnprintf(buffer, buffer_size, "Server error method %#" PRIx32, - r.reply.id); + r.reply.id); } break; @@ -322,7 +322,8 @@ static int camqp_setup_queue(camqp_config_t *conf) /* {{{ */ qd_ret = amqp_queue_declare(conf->connection, /* channel = */ CAMQP_CHANNEL, - /* queue = */ (conf->queue != NULL) + /* queue = */ + (conf->queue != NULL) ? amqp_cstring_bytes(conf->queue) : AMQP_EMPTY_BYTES, /* passive = */ 0, @@ -353,15 +354,15 @@ static int camqp_setup_queue(camqp_config_t *conf) /* {{{ */ amqp_queue_bind_ok_t *qb_ret; assert(conf->queue != NULL); - qb_ret = - amqp_queue_bind(conf->connection, - /* channel = */ CAMQP_CHANNEL, - /* queue = */ amqp_cstring_bytes(conf->queue), - /* exchange = */ amqp_cstring_bytes(conf->exchange), - /* routing_key = */ (conf->routing_key != NULL) - ? amqp_cstring_bytes(conf->routing_key) - : AMQP_EMPTY_BYTES, - /* arguments = */ AMQP_EMPTY_TABLE); + qb_ret = amqp_queue_bind( + conf->connection, + /* channel = */ CAMQP_CHANNEL, + /* queue = */ amqp_cstring_bytes(conf->queue), + /* exchange = */ amqp_cstring_bytes(conf->exchange), + /* routing_key = */ + (conf->routing_key != NULL) ? amqp_cstring_bytes(conf->routing_key) + : AMQP_EMPTY_BYTES, + /* arguments = */ AMQP_EMPTY_TABLE); if ((qb_ret == NULL) && camqp_is_error(conf)) { char errbuf[1024]; ERROR("amqp plugin: amqp_queue_bind failed: %s", @@ -428,7 +429,7 @@ static int camqp_connect(camqp_config_t *conf) /* {{{ */ #ifdef HAVE_AMQP_TCP_SOCKET #define CLOSE_SOCKET() /* amqp_destroy_connection() closes the socket for us \ - */ + */ /* TODO: add support for SSL using amqp_ssl_socket_new * and related functions */ socket = amqp_tcp_socket_new(conf->connection); @@ -749,8 +750,8 @@ static int camqp_write(const data_set_t *ds, const value_list_t *vl, /* {{{ */ sstrncpy(routing_key, conf->routing_key, sizeof(routing_key)); } else { ssnprintf(routing_key, sizeof(routing_key), "collectd/%s/%s/%s/%s/%s", - vl->host, vl->plugin, vl->plugin_instance, vl->type, - vl->type_instance); + vl->host, vl->plugin, vl->plugin_instance, vl->type, + vl->type_instance); /* Switch slashes (the only character forbidden by collectd) and dots * (the separation character used by AMQP). */ @@ -972,11 +973,11 @@ static int camqp_config_connection(oconfig_item_t *ci, /* {{{ */ char cbname[128]; ssnprintf(cbname, sizeof(cbname), "amqp/%s", conf->name); - status = - plugin_register_write(cbname, camqp_write, - &(user_data_t){ - .data = conf, .free_func = camqp_config_free, - }); + status = plugin_register_write(cbname, camqp_write, + &(user_data_t){ + .data = conf, + .free_func = camqp_config_free, + }); if (status != 0) { camqp_config_free(conf); return status;