#include "collectd.h"
-#include "common.h"
#include "plugin.h"
-#include "utils_cmd_putval.h"
-#include "utils_format_graphite.h"
-#include "utils_format_json.h"
+#include "utils/cmds/putval.h"
+#include "utils/common/common.h"
+#include "utils/format_graphite/format_graphite.h"
+#include "utils/format_json/format_json.h"
#include <amqp.h>
#include <amqp_framing.h>
* Data types
*/
struct camqp_config_s {
- _Bool publish;
+ bool publish;
char *name;
char *host;
/* publish only */
uint8_t delivery_mode;
- _Bool store_rates;
+ bool store_rates;
int format;
/* publish & graphite format only */
char *prefix;
/* subscribe only */
char *exchange_type;
char *queue;
- _Bool queue_durable;
- _Bool queue_auto_delete;
+ bool queue_durable;
+ bool queue_auto_delete;
amqp_connection_state_t connection;
pthread_mutex_t lock;
static const char *def_password = "guest";
static const char *def_exchange = "amq.fanout";
-static pthread_t *subscriber_threads = NULL;
-static size_t subscriber_threads_num = 0;
-static _Bool subscriber_threads_running = 1;
+static pthread_t *subscriber_threads;
+static size_t subscriber_threads_num;
+static bool subscriber_threads_running = true;
#define CONF(c, f) (((c)->f != NULL) ? (c)->f : def_##f)
return ret;
} /* }}} char *camqp_bytes_cstring */
-static _Bool camqp_is_error(camqp_config_t *conf) /* {{{ */
+static bool camqp_is_error(camqp_config_t *conf) /* {{{ */
{
amqp_rpc_reply_t r;
r = amqp_get_rpc_reply(conf->connection);
if (r.reply_type == AMQP_RESPONSE_NORMAL)
- return 0;
+ return false;
- return 1;
-} /* }}} _Bool camqp_is_error */
+ return true;
+} /* }}} bool camqp_is_error */
static char *camqp_strerror(camqp_config_t *conf, /* {{{ */
char *buffer, size_t buffer_size) {
static int camqp_connect(camqp_config_t *conf) /* {{{ */
{
- static time_t last_connect_time = 0;
+ static time_t last_connect_time;
amqp_rpc_reply_t reply;
int status;
status = amqp_socket_open(socket, CONF(conf, host), conf->port);
if (status < 0) {
- char errbuf[1024];
status *= -1;
- ERROR("amqp plugin: amqp_socket_open failed: %s",
- sstrerror(status, errbuf, sizeof(errbuf)));
+ ERROR("amqp plugin: amqp_socket_open failed: %s", STRERROR(status));
amqp_destroy_connection(conf->connection);
conf->connection = NULL;
return status;
/* this interface is deprecated as of rabbitmq-c 0.4 */
sockfd = amqp_open_socket(CONF(conf, host), conf->port);
if (sockfd < 0) {
- char errbuf[1024];
status = (-1) * sockfd;
- ERROR("amqp plugin: amqp_open_socket failed: %s",
- sstrerror(status, errbuf, sizeof(errbuf)));
+ ERROR("amqp plugin: amqp_open_socket failed: %s", STRERROR(status));
amqp_destroy_connection(conf->connection);
conf->connection = NULL;
return status;
static int camqp_shutdown(void) /* {{{ */
{
- DEBUG("amqp plugin: Shutting down %zu subscriber threads.",
+ DEBUG("amqp plugin: Shutting down %" PRIsz " subscriber threads.",
subscriber_threads_num);
subscriber_threads_running = 0;
while (received < body_size) {
status = amqp_simple_wait_frame(conf->connection, &frame);
if (status < 0) {
- char errbuf[1024];
status = (-1) * status;
- ERROR("amqp plugin: amqp_simple_wait_frame failed: %s",
- sstrerror(status, errbuf, sizeof(errbuf)));
+ ERROR("amqp plugin: amqp_simple_wait_frame failed: %s", STRERROR(status));
camqp_close_connection(conf);
return status;
}
status = amqp_simple_wait_frame(conf->connection, &frame);
if (status < 0) {
- char errbuf[1024];
status = (-1) * status;
- ERROR("amqp plugin: amqp_simple_wait_frame failed: %s",
- sstrerror(status, errbuf, sizeof(errbuf)));
+ ERROR("amqp plugin: amqp_simple_wait_frame failed: %s", STRERROR(status));
camqp_close_connection(conf);
return status;
}
status = plugin_thread_create(tmp, /* attr = */ NULL, camqp_subscribe_thread,
conf, "amqp subscribe");
if (status != 0) {
- char errbuf[1024];
- ERROR("amqp plugin: pthread_create failed: %s",
- sstrerror(status, errbuf, sizeof(errbuf)));
+ ERROR("amqp plugin: pthread_create failed: %s", STRERROR(status));
return status;
}
} /* }}} int config_set_string */
static int camqp_config_connection(oconfig_item_t *ci, /* {{{ */
- _Bool publish) {
+ bool publish) {
camqp_config_t *conf;
int status;
/* publish only */
conf->delivery_mode = CAMQP_DM_VOLATILE;
- conf->store_rates = 0;
+ conf->store_rates = false;
conf->graphite_flags = 0;
/* publish & graphite only */
conf->prefix = NULL;
/* subscribe only */
conf->exchange_type = NULL;
conf->queue = NULL;
- conf->queue_durable = 0;
- conf->queue_auto_delete = 1;
+ conf->queue_durable = false;
+ conf->queue_auto_delete = true;
/* general */
conf->connection = NULL;
pthread_mutex_init(&conf->lock, /* attr = */ NULL);
else if (strcasecmp("RoutingKey", child->key) == 0)
status = cf_util_get_string(child, &conf->routing_key);
else if ((strcasecmp("Persistent", child->key) == 0) && publish) {
- _Bool tmp = 0;
+ bool tmp = 0;
status = cf_util_get_boolean(child, &tmp);
if (tmp)
conf->delivery_mode = CAMQP_DM_PERSISTENT;
char cbname[128];
snprintf(cbname, sizeof(cbname), "amqp/%s", conf->name);
- status = plugin_register_write(
- cbname, camqp_write, &(user_data_t){
- .data = conf, .free_func = camqp_config_free,
- });
+ status =
+ plugin_register_write(cbname, camqp_write,
+ &(user_data_t){
+ .data = conf, .free_func = camqp_config_free,
+ });
if (status != 0) {
camqp_config_free(conf);
return status;
oconfig_item_t *child = ci->children + i;
if (strcasecmp("Publish", child->key) == 0)
- camqp_config_connection(child, /* publish = */ 1);
+ camqp_config_connection(child, /* publish = */ true);
else if (strcasecmp("Subscribe", child->key) == 0)
- camqp_config_connection(child, /* publish = */ 0);
+ camqp_config_connection(child, /* publish = */ false);
else
WARNING("amqp plugin: Ignoring unknown config option \"%s\".",
child->key);