X-Git-Url: https://git.octo.it/?a=blobdiff_plain;f=src%2Famqp.c;h=2077d57b3d5e870caa6e089456e646810c1e5ce7;hb=39049b56158161b4f9eeacdad8918bcf0f1f7e90;hp=6d65bf838d274fc434c4196420aedb582eb5200c;hpb=4e89060ceb1a14ec7f9abfe9caa6b0da7e76bd5c;p=collectd.git diff --git a/src/amqp.c b/src/amqp.c index 6d65bf83..2077d57b 100644 --- a/src/amqp.c +++ b/src/amqp.c @@ -28,11 +28,11 @@ #include "collectd.h" -#include "common.h" #include "plugin.h" -#include "utils_cmd_putval.h" -#include "utils_format_graphite.h" -#include "utils_format_json.h" +#include "utils/cmds/putval.h" +#include "utils/common/common.h" +#include "utils/format_graphite/format_graphite.h" +#include "utils/format_json/format_json.h" #include #include @@ -111,8 +111,8 @@ static const char *def_user = "guest"; static const char *def_password = "guest"; static const char *def_exchange = "amq.fanout"; -static pthread_t *subscriber_threads = NULL; -static size_t subscriber_threads_num = 0; +static pthread_t *subscriber_threads; +static size_t subscriber_threads_num; static bool subscriber_threads_running = true; #define CONF(c, f) (((c)->f != NULL) ? (c)->f : def_##f) @@ -182,9 +182,9 @@ static bool camqp_is_error(camqp_config_t *conf) /* {{{ */ r = amqp_get_rpc_reply(conf->connection); if (r.reply_type == AMQP_RESPONSE_NORMAL) - return 0; + return false; - return 1; + return true; } /* }}} bool camqp_is_error */ static char *camqp_strerror(camqp_config_t *conf, /* {{{ */ @@ -217,23 +217,23 @@ static char *camqp_strerror(camqp_config_t *conf, /* {{{ */ if (r.reply.id == AMQP_CONNECTION_CLOSE_METHOD) { amqp_connection_close_t *m = r.reply.decoded; char *tmp = camqp_bytes_cstring(&m->reply_text); - snprintf(buffer, buffer_size, "Server connection error %d: %s", - m->reply_code, tmp); + ssnprintf(buffer, buffer_size, "Server connection error %d: %s", + m->reply_code, tmp); sfree(tmp); } else if (r.reply.id == AMQP_CHANNEL_CLOSE_METHOD) { amqp_channel_close_t *m = r.reply.decoded; char *tmp = camqp_bytes_cstring(&m->reply_text); - snprintf(buffer, buffer_size, "Server channel error %d: %s", - m->reply_code, tmp); + ssnprintf(buffer, buffer_size, "Server channel error %d: %s", + m->reply_code, tmp); sfree(tmp); } else { - snprintf(buffer, buffer_size, "Server error method %#" PRIx32, - r.reply.id); + ssnprintf(buffer, buffer_size, "Server error method %#" PRIx32, + r.reply.id); } break; default: - snprintf(buffer, buffer_size, "Unknown reply type %i", (int)r.reply_type); + ssnprintf(buffer, buffer_size, "Unknown reply type %i", (int)r.reply_type); } return buffer; @@ -320,16 +320,17 @@ static int camqp_setup_queue(camqp_config_t *conf) /* {{{ */ amqp_queue_declare_ok_t *qd_ret; amqp_basic_consume_ok_t *cm_ret; - qd_ret = amqp_queue_declare(conf->connection, - /* channel = */ CAMQP_CHANNEL, - /* queue = */ (conf->queue != NULL) - ? amqp_cstring_bytes(conf->queue) - : AMQP_EMPTY_BYTES, - /* passive = */ 0, - /* durable = */ conf->queue_durable, - /* exclusive = */ 0, - /* auto_delete = */ conf->queue_auto_delete, - /* arguments = */ AMQP_EMPTY_TABLE); + qd_ret = + amqp_queue_declare(conf->connection, + /* channel = */ CAMQP_CHANNEL, + /* queue = */ + (conf->queue != NULL) ? amqp_cstring_bytes(conf->queue) + : AMQP_EMPTY_BYTES, + /* passive = */ 0, + /* durable = */ conf->queue_durable, + /* exclusive = */ 0, + /* auto_delete = */ conf->queue_auto_delete, + /* arguments = */ AMQP_EMPTY_TABLE); if (qd_ret == NULL) { ERROR("amqp plugin: amqp_queue_declare failed."); camqp_close_connection(conf); @@ -353,15 +354,15 @@ static int camqp_setup_queue(camqp_config_t *conf) /* {{{ */ amqp_queue_bind_ok_t *qb_ret; assert(conf->queue != NULL); - qb_ret = - amqp_queue_bind(conf->connection, - /* channel = */ CAMQP_CHANNEL, - /* queue = */ amqp_cstring_bytes(conf->queue), - /* exchange = */ amqp_cstring_bytes(conf->exchange), - /* routing_key = */ (conf->routing_key != NULL) - ? amqp_cstring_bytes(conf->routing_key) - : AMQP_EMPTY_BYTES, - /* arguments = */ AMQP_EMPTY_TABLE); + qb_ret = amqp_queue_bind( + conf->connection, + /* channel = */ CAMQP_CHANNEL, + /* queue = */ amqp_cstring_bytes(conf->queue), + /* exchange = */ amqp_cstring_bytes(conf->exchange), + /* routing_key = */ + (conf->routing_key != NULL) ? amqp_cstring_bytes(conf->routing_key) + : AMQP_EMPTY_BYTES, + /* arguments = */ AMQP_EMPTY_TABLE); if ((qb_ret == NULL) && camqp_is_error(conf)) { char errbuf[1024]; ERROR("amqp plugin: amqp_queue_bind failed: %s", @@ -396,7 +397,7 @@ static int camqp_setup_queue(camqp_config_t *conf) /* {{{ */ static int camqp_connect(camqp_config_t *conf) /* {{{ */ { - static time_t last_connect_time = 0; + static time_t last_connect_time; amqp_rpc_reply_t reply; int status; @@ -428,7 +429,7 @@ static int camqp_connect(camqp_config_t *conf) /* {{{ */ #ifdef HAVE_AMQP_TCP_SOCKET #define CLOSE_SOCKET() /* amqp_destroy_connection() closes the socket for us \ - */ + */ /* TODO: add support for SSL using amqp_ssl_socket_new * and related functions */ socket = amqp_tcp_socket_new(conf->connection); @@ -748,9 +749,9 @@ static int camqp_write(const data_set_t *ds, const value_list_t *vl, /* {{{ */ if (conf->routing_key != NULL) { sstrncpy(routing_key, conf->routing_key, sizeof(routing_key)); } else { - snprintf(routing_key, sizeof(routing_key), "collectd/%s/%s/%s/%s/%s", - vl->host, vl->plugin, vl->plugin_instance, vl->type, - vl->type_instance); + ssnprintf(routing_key, sizeof(routing_key), "collectd/%s/%s/%s/%s/%s", + vl->host, vl->plugin, vl->plugin_instance, vl->type, + vl->type_instance); /* Switch slashes (the only character forbidden by collectd) and dots * (the separation character used by AMQP). */ @@ -850,7 +851,7 @@ static int camqp_config_connection(oconfig_item_t *ci, /* {{{ */ /* publish only */ conf->delivery_mode = CAMQP_DM_VOLATILE; - conf->store_rates = 0; + conf->store_rates = false; conf->graphite_flags = 0; /* publish & graphite only */ conf->prefix = NULL; @@ -859,8 +860,8 @@ static int camqp_config_connection(oconfig_item_t *ci, /* {{{ */ /* subscribe only */ conf->exchange_type = NULL; conf->queue = NULL; - conf->queue_durable = 0; - conf->queue_auto_delete = 1; + conf->queue_durable = false; + conf->queue_auto_delete = true; /* general */ conf->connection = NULL; pthread_mutex_init(&conf->lock, /* attr = */ NULL); @@ -970,13 +971,13 @@ static int camqp_config_connection(oconfig_item_t *ci, /* {{{ */ if (publish) { char cbname[128]; - snprintf(cbname, sizeof(cbname), "amqp/%s", conf->name); + ssnprintf(cbname, sizeof(cbname), "amqp/%s", conf->name); - status = - plugin_register_write(cbname, camqp_write, - &(user_data_t){ - .data = conf, .free_func = camqp_config_free, - }); + status = plugin_register_write(cbname, camqp_write, + &(user_data_t){ + .data = conf, + .free_func = camqp_config_free, + }); if (status != 0) { camqp_config_free(conf); return status; @@ -998,9 +999,9 @@ static int camqp_config(oconfig_item_t *ci) /* {{{ */ oconfig_item_t *child = ci->children + i; if (strcasecmp("Publish", child->key) == 0) - camqp_config_connection(child, /* publish = */ 1); + camqp_config_connection(child, /* publish = */ true); else if (strcasecmp("Subscribe", child->key) == 0) - camqp_config_connection(child, /* publish = */ 0); + camqp_config_connection(child, /* publish = */ false); else WARNING("amqp plugin: Ignoring unknown config option \"%s\".", child->key);