* Data types
*/
struct camqp_config_s {
- _Bool publish;
+ bool publish;
char *name;
char *host;
/* publish only */
uint8_t delivery_mode;
- _Bool store_rates;
+ bool store_rates;
int format;
/* publish & graphite format only */
char *prefix;
/* subscribe only */
char *exchange_type;
char *queue;
- _Bool queue_durable;
- _Bool queue_auto_delete;
+ bool queue_durable;
+ bool queue_auto_delete;
amqp_connection_state_t connection;
pthread_mutex_t lock;
static pthread_t *subscriber_threads = NULL;
static size_t subscriber_threads_num = 0;
-static _Bool subscriber_threads_running = 1;
+static bool subscriber_threads_running = true;
#define CONF(c, f) (((c)->f != NULL) ? (c)->f : def_##f)
return ret;
} /* }}} char *camqp_bytes_cstring */
-static _Bool camqp_is_error(camqp_config_t *conf) /* {{{ */
+static bool camqp_is_error(camqp_config_t *conf) /* {{{ */
{
amqp_rpc_reply_t r;
return 0;
return 1;
-} /* }}} _Bool camqp_is_error */
+} /* }}} bool camqp_is_error */
static char *camqp_strerror(camqp_config_t *conf, /* {{{ */
char *buffer, size_t buffer_size) {
status = amqp_socket_open(socket, CONF(conf, host), conf->port);
if (status < 0) {
- char errbuf[1024];
status *= -1;
- ERROR("amqp plugin: amqp_socket_open failed: %s",
- sstrerror(status, errbuf, sizeof(errbuf)));
+ ERROR("amqp plugin: amqp_socket_open failed: %s", STRERROR(status));
amqp_destroy_connection(conf->connection);
conf->connection = NULL;
return status;
/* this interface is deprecated as of rabbitmq-c 0.4 */
sockfd = amqp_open_socket(CONF(conf, host), conf->port);
if (sockfd < 0) {
- char errbuf[1024];
status = (-1) * sockfd;
- ERROR("amqp plugin: amqp_open_socket failed: %s",
- sstrerror(status, errbuf, sizeof(errbuf)));
+ ERROR("amqp plugin: amqp_open_socket failed: %s", STRERROR(status));
amqp_destroy_connection(conf->connection);
conf->connection = NULL;
return status;
static int camqp_shutdown(void) /* {{{ */
{
- DEBUG("amqp plugin: Shutting down %zu subscriber threads.",
+ DEBUG("amqp plugin: Shutting down %" PRIsz " subscriber threads.",
subscriber_threads_num);
subscriber_threads_running = 0;
while (received < body_size) {
status = amqp_simple_wait_frame(conf->connection, &frame);
if (status < 0) {
- char errbuf[1024];
status = (-1) * status;
- ERROR("amqp plugin: amqp_simple_wait_frame failed: %s",
- sstrerror(status, errbuf, sizeof(errbuf)));
+ ERROR("amqp plugin: amqp_simple_wait_frame failed: %s", STRERROR(status));
camqp_close_connection(conf);
return status;
}
status = amqp_simple_wait_frame(conf->connection, &frame);
if (status < 0) {
- char errbuf[1024];
status = (-1) * status;
- ERROR("amqp plugin: amqp_simple_wait_frame failed: %s",
- sstrerror(status, errbuf, sizeof(errbuf)));
+ ERROR("amqp plugin: amqp_simple_wait_frame failed: %s", STRERROR(status));
camqp_close_connection(conf);
return status;
}
status = plugin_thread_create(tmp, /* attr = */ NULL, camqp_subscribe_thread,
conf, "amqp subscribe");
if (status != 0) {
- char errbuf[1024];
- ERROR("amqp plugin: pthread_create failed: %s",
- sstrerror(status, errbuf, sizeof(errbuf)));
+ ERROR("amqp plugin: pthread_create failed: %s", STRERROR(status));
return status;
}
} /* }}} int config_set_string */
static int camqp_config_connection(oconfig_item_t *ci, /* {{{ */
- _Bool publish) {
+ bool publish) {
camqp_config_t *conf;
int status;
else if (strcasecmp("RoutingKey", child->key) == 0)
status = cf_util_get_string(child, &conf->routing_key);
else if ((strcasecmp("Persistent", child->key) == 0) && publish) {
- _Bool tmp = 0;
+ bool tmp = 0;
status = cf_util_get_boolean(child, &tmp);
if (tmp)
conf->delivery_mode = CAMQP_DM_PERSISTENT;
char cbname[128];
snprintf(cbname, sizeof(cbname), "amqp/%s", conf->name);
- status = plugin_register_write(
- cbname, camqp_write, &(user_data_t){
- .data = conf, .free_func = camqp_config_free,
- });
+ status =
+ plugin_register_write(cbname, camqp_write,
+ &(user_data_t){
+ .data = conf, .free_func = camqp_config_free,
+ });
if (status != 0) {
camqp_config_free(conf);
return status;