X-Git-Url: https://git.octo.it/?p=collectd.git;a=blobdiff_plain;f=src%2Famqp.c;h=281130b142bd66f6f8664be5c02f5f3c92f0013f;hp=467b7ff46041f76d618c0308eee50ce77cf6353e;hb=06a86a60a7dabc685bdbd81ce3d36ea5f7e2c2d4;hpb=85d892df2794d992c8e3554d8990d1306f114c11 diff --git a/src/amqp.c b/src/amqp.c index 467b7ff4..281130b1 100644 --- a/src/amqp.c +++ b/src/amqp.c @@ -66,7 +66,7 @@ int amqp_socket_close(amqp_socket_t *); * Data types */ struct camqp_config_s { - _Bool publish; + bool publish; char *name; char *host; @@ -83,7 +83,7 @@ struct camqp_config_s { /* publish only */ uint8_t delivery_mode; - _Bool store_rates; + bool store_rates; int format; /* publish & graphite format only */ char *prefix; @@ -94,8 +94,8 @@ struct camqp_config_s { /* subscribe only */ char *exchange_type; char *queue; - _Bool queue_durable; - _Bool queue_auto_delete; + bool queue_durable; + bool queue_auto_delete; amqp_connection_state_t connection; pthread_mutex_t lock; @@ -111,9 +111,9 @@ static const char *def_user = "guest"; static const char *def_password = "guest"; static const char *def_exchange = "amq.fanout"; -static pthread_t *subscriber_threads = NULL; -static size_t subscriber_threads_num = 0; -static _Bool subscriber_threads_running = 1; +static pthread_t *subscriber_threads; +static size_t subscriber_threads_num; +static bool subscriber_threads_running = true; #define CONF(c, f) (((c)->f != NULL) ? (c)->f : def_##f) @@ -176,16 +176,16 @@ static char *camqp_bytes_cstring(amqp_bytes_t *in) /* {{{ */ return ret; } /* }}} char *camqp_bytes_cstring */ -static _Bool camqp_is_error(camqp_config_t *conf) /* {{{ */ +static bool camqp_is_error(camqp_config_t *conf) /* {{{ */ { amqp_rpc_reply_t r; r = amqp_get_rpc_reply(conf->connection); if (r.reply_type == AMQP_RESPONSE_NORMAL) - return 0; + return false; - return 1; -} /* }}} _Bool camqp_is_error */ + return true; +} /* }}} bool camqp_is_error */ static char *camqp_strerror(camqp_config_t *conf, /* {{{ */ char *buffer, size_t buffer_size) { @@ -396,7 +396,7 @@ static int camqp_setup_queue(camqp_config_t *conf) /* {{{ */ static int camqp_connect(camqp_config_t *conf) /* {{{ */ { - static time_t last_connect_time = 0; + static time_t last_connect_time; amqp_rpc_reply_t reply; int status; @@ -441,10 +441,8 @@ static int camqp_connect(camqp_config_t *conf) /* {{{ */ status = amqp_socket_open(socket, CONF(conf, host), conf->port); if (status < 0) { - char errbuf[1024]; status *= -1; - ERROR("amqp plugin: amqp_socket_open failed: %s", - sstrerror(status, errbuf, sizeof(errbuf))); + ERROR("amqp plugin: amqp_socket_open failed: %s", STRERROR(status)); amqp_destroy_connection(conf->connection); conf->connection = NULL; return status; @@ -454,10 +452,8 @@ static int camqp_connect(camqp_config_t *conf) /* {{{ */ /* this interface is deprecated as of rabbitmq-c 0.4 */ sockfd = amqp_open_socket(CONF(conf, host), conf->port); if (sockfd < 0) { - char errbuf[1024]; status = (-1) * sockfd; - ERROR("amqp plugin: amqp_open_socket failed: %s", - sstrerror(status, errbuf, sizeof(errbuf))); + ERROR("amqp plugin: amqp_open_socket failed: %s", STRERROR(status)); amqp_destroy_connection(conf->connection); conf->connection = NULL; return status; @@ -507,7 +503,7 @@ static int camqp_connect(camqp_config_t *conf) /* {{{ */ static int camqp_shutdown(void) /* {{{ */ { - DEBUG("amqp plugin: Shutting down %zu subscriber threads.", + DEBUG("amqp plugin: Shutting down %" PRIsz " subscriber threads.", subscriber_threads_num); subscriber_threads_running = 0; @@ -545,10 +541,8 @@ static int camqp_read_body(camqp_config_t *conf, /* {{{ */ while (received < body_size) { status = amqp_simple_wait_frame(conf->connection, &frame); if (status < 0) { - char errbuf[1024]; status = (-1) * status; - ERROR("amqp plugin: amqp_simple_wait_frame failed: %s", - sstrerror(status, errbuf, sizeof(errbuf))); + ERROR("amqp plugin: amqp_simple_wait_frame failed: %s", STRERROR(status)); camqp_close_connection(conf); return status; } @@ -597,10 +591,8 @@ static int camqp_read_header(camqp_config_t *conf) /* {{{ */ status = amqp_simple_wait_frame(conf->connection, &frame); if (status < 0) { - char errbuf[1024]; status = (-1) * status; - ERROR("amqp plugin: amqp_simple_wait_frame failed: %s", - sstrerror(status, errbuf, sizeof(errbuf))); + ERROR("amqp plugin: amqp_simple_wait_frame failed: %s", STRERROR(status)); camqp_close_connection(conf); return status; } @@ -693,9 +685,7 @@ static int camqp_subscribe_init(camqp_config_t *conf) /* {{{ */ status = plugin_thread_create(tmp, /* attr = */ NULL, camqp_subscribe_thread, conf, "amqp subscribe"); if (status != 0) { - char errbuf[1024]; - ERROR("amqp plugin: pthread_create failed: %s", - sstrerror(status, errbuf, sizeof(errbuf))); + ERROR("amqp plugin: pthread_create failed: %s", STRERROR(status)); return status; } @@ -835,7 +825,7 @@ static int camqp_config_set_format(oconfig_item_t *ci, /* {{{ */ } /* }}} int config_set_string */ static int camqp_config_connection(oconfig_item_t *ci, /* {{{ */ - _Bool publish) { + bool publish) { camqp_config_t *conf; int status; @@ -860,7 +850,7 @@ static int camqp_config_connection(oconfig_item_t *ci, /* {{{ */ /* publish only */ conf->delivery_mode = CAMQP_DM_VOLATILE; - conf->store_rates = 0; + conf->store_rates = false; conf->graphite_flags = 0; /* publish & graphite only */ conf->prefix = NULL; @@ -869,8 +859,8 @@ static int camqp_config_connection(oconfig_item_t *ci, /* {{{ */ /* subscribe only */ conf->exchange_type = NULL; conf->queue = NULL; - conf->queue_durable = 0; - conf->queue_auto_delete = 1; + conf->queue_durable = false; + conf->queue_auto_delete = true; /* general */ conf->connection = NULL; pthread_mutex_init(&conf->lock, /* attr = */ NULL); @@ -912,7 +902,7 @@ static int camqp_config_connection(oconfig_item_t *ci, /* {{{ */ else if (strcasecmp("RoutingKey", child->key) == 0) status = cf_util_get_string(child, &conf->routing_key); else if ((strcasecmp("Persistent", child->key) == 0) && publish) { - _Bool tmp = 0; + bool tmp = 0; status = cf_util_get_boolean(child, &tmp); if (tmp) conf->delivery_mode = CAMQP_DM_PERSISTENT; @@ -982,10 +972,11 @@ static int camqp_config_connection(oconfig_item_t *ci, /* {{{ */ char cbname[128]; snprintf(cbname, sizeof(cbname), "amqp/%s", conf->name); - status = plugin_register_write( - cbname, camqp_write, &(user_data_t){ - .data = conf, .free_func = camqp_config_free, - }); + status = + plugin_register_write(cbname, camqp_write, + &(user_data_t){ + .data = conf, .free_func = camqp_config_free, + }); if (status != 0) { camqp_config_free(conf); return status; @@ -1007,9 +998,9 @@ static int camqp_config(oconfig_item_t *ci) /* {{{ */ oconfig_item_t *child = ci->children + i; if (strcasecmp("Publish", child->key) == 0) - camqp_config_connection(child, /* publish = */ 1); + camqp_config_connection(child, /* publish = */ true); else if (strcasecmp("Subscribe", child->key) == 0) - camqp_config_connection(child, /* publish = */ 0); + camqp_config_connection(child, /* publish = */ false); else WARNING("amqp plugin: Ignoring unknown config option \"%s\".", child->key);