} /* }}} _Bool camqp_is_error */
static char *camqp_strerror(camqp_config_t *conf, /* {{{ */
} /* }}} _Bool camqp_is_error */
static char *camqp_strerror(camqp_config_t *conf, /* {{{ */
if (r.reply.id == AMQP_CONNECTION_CLOSE_METHOD) {
amqp_connection_close_t *m = r.reply.decoded;
char *tmp = camqp_bytes_cstring(&m->reply_text);
if (r.reply.id == AMQP_CONNECTION_CLOSE_METHOD) {
amqp_connection_close_t *m = r.reply.decoded;
char *tmp = camqp_bytes_cstring(&m->reply_text);
- ssnprintf(buffer, buffer_size, "Server connection error %d: %s",
- m->reply_code, tmp);
+ snprintf(buffer, buffer_size, "Server connection error %d: %s",
+ m->reply_code, tmp);
sfree(tmp);
} else if (r.reply.id == AMQP_CHANNEL_CLOSE_METHOD) {
amqp_channel_close_t *m = r.reply.decoded;
char *tmp = camqp_bytes_cstring(&m->reply_text);
sfree(tmp);
} else if (r.reply.id == AMQP_CHANNEL_CLOSE_METHOD) {
amqp_channel_close_t *m = r.reply.decoded;
char *tmp = camqp_bytes_cstring(&m->reply_text);
- ssnprintf(buffer, buffer_size, "Server channel error %d: %s",
- m->reply_code, tmp);
+ snprintf(buffer, buffer_size, "Server channel error %d: %s",
+ m->reply_code, tmp);
- ssnprintf(buffer, buffer_size, "Server error method %#" PRIx32,
- r.reply.id);
+ snprintf(buffer, buffer_size, "Server error method %#" PRIx32,
+ r.reply.id);
- ssnprintf(buffer, buffer_size, "Unknown reply type %i", (int)r.reply_type);
+ snprintf(buffer, buffer_size, "Unknown reply type %i", (int)r.reply_type);
ERROR("amqp plugin: amqp_exchange_declare failed: %s",
camqp_strerror(conf, errbuf, sizeof(errbuf)));
camqp_close_connection(conf);
ERROR("amqp plugin: amqp_exchange_declare failed: %s",
camqp_strerror(conf, errbuf, sizeof(errbuf)));
camqp_close_connection(conf);
}
INFO("amqp plugin: Successfully created exchange \"%s\" "
"with type \"%s\".",
conf->exchange, conf->exchange_type);
}
INFO("amqp plugin: Successfully created exchange \"%s\" "
"with type \"%s\".",
conf->exchange, conf->exchange_type);
} /* }}} int camqp_create_exchange */
#else
static int camqp_create_exchange(camqp_config_t *conf) /* {{{ */
} /* }}} int camqp_create_exchange */
#else
static int camqp_create_exchange(camqp_config_t *conf) /* {{{ */
/* Valid arguments: "auto_delete", "internal" */
argument_table.num_entries = STATIC_ARRAY_SIZE(argument_table_entries);
/* Valid arguments: "auto_delete", "internal" */
argument_table.num_entries = STATIC_ARRAY_SIZE(argument_table_entries);
ERROR("amqp plugin: amqp_exchange_declare failed: %s",
camqp_strerror(conf, errbuf, sizeof(errbuf)));
camqp_close_connection(conf);
ERROR("amqp plugin: amqp_exchange_declare failed: %s",
camqp_strerror(conf, errbuf, sizeof(errbuf)));
camqp_close_connection(conf);
}
INFO("amqp plugin: Successfully created exchange \"%s\" "
"with type \"%s\".",
conf->exchange, conf->exchange_type);
}
INFO("amqp plugin: Successfully created exchange \"%s\" "
"with type \"%s\".",
conf->exchange, conf->exchange_type);
if (conf->queue == NULL) {
ERROR("amqp plugin: camqp_bytes_cstring failed.");
camqp_close_connection(conf);
if (conf->queue == NULL) {
ERROR("amqp plugin: camqp_bytes_cstring failed.");
camqp_close_connection(conf);
ERROR("amqp plugin: amqp_queue_bind failed: %s",
camqp_strerror(conf, errbuf, sizeof(errbuf)));
camqp_close_connection(conf);
ERROR("amqp plugin: amqp_queue_bind failed: %s",
camqp_strerror(conf, errbuf, sizeof(errbuf)));
camqp_close_connection(conf);
ERROR("amqp plugin: amqp_basic_consume failed: %s",
camqp_strerror(conf, errbuf, sizeof(errbuf)));
camqp_close_connection(conf);
ERROR("amqp plugin: amqp_basic_consume failed: %s",
camqp_strerror(conf, errbuf, sizeof(errbuf)));
camqp_close_connection(conf);
time_t now = time(NULL);
if (now < (last_connect_time + conf->connection_retry_delay)) {
DEBUG("amqp plugin: skipping connection retry, "
"ConnectionRetryDelay: %d",
conf->connection_retry_delay);
time_t now = time(NULL);
if (now < (last_connect_time + conf->connection_retry_delay)) {
DEBUG("amqp plugin: skipping connection retry, "
"ConnectionRetryDelay: %d",
conf->connection_retry_delay);
conf->connection = amqp_new_connection();
if (conf->connection == NULL) {
ERROR("amqp plugin: amqp_new_connection failed.");
conf->connection = amqp_new_connection();
if (conf->connection == NULL) {
ERROR("amqp plugin: amqp_new_connection failed.");
}
status = amqp_socket_open(socket, CONF(conf, host), conf->port);
if (status < 0) {
}
status = amqp_socket_open(socket, CONF(conf, host), conf->port);
if (status < 0) {
- ERROR("amqp plugin: amqp_socket_open failed: %s",
- sstrerror(status, errbuf, sizeof(errbuf)));
+ ERROR("amqp plugin: amqp_socket_open failed: %s", STRERROR(status));
}
#else /* HAVE_AMQP_TCP_SOCKET */
#define CLOSE_SOCKET() close(sockfd)
/* this interface is deprecated as of rabbitmq-c 0.4 */
sockfd = amqp_open_socket(CONF(conf, host), conf->port);
if (sockfd < 0) {
}
#else /* HAVE_AMQP_TCP_SOCKET */
#define CLOSE_SOCKET() close(sockfd)
/* this interface is deprecated as of rabbitmq-c 0.4 */
sockfd = amqp_open_socket(CONF(conf, host), conf->port);
if (sockfd < 0) {
- ERROR("amqp plugin: amqp_open_socket failed: %s",
- sstrerror(status, errbuf, sizeof(errbuf)));
+ ERROR("amqp plugin: amqp_open_socket failed: %s", STRERROR(status));
while (received < body_size) {
status = amqp_simple_wait_frame(conf->connection, &frame);
if (status < 0) {
while (received < body_size) {
status = amqp_simple_wait_frame(conf->connection, &frame);
if (status < 0) {
- ERROR("amqp plugin: amqp_simple_wait_frame failed: %s",
- sstrerror(status, errbuf, sizeof(errbuf)));
+ ERROR("amqp plugin: amqp_simple_wait_frame failed: %s", STRERROR(status));
}
if (frame.frame_type != AMQP_FRAME_BODY) {
NOTICE("amqp plugin: Unexpected frame type: %#" PRIx8, frame.frame_type);
}
if (frame.frame_type != AMQP_FRAME_BODY) {
NOTICE("amqp plugin: Unexpected frame type: %#" PRIx8, frame.frame_type);
}
if ((body_size - received) < frame.payload.body_fragment.len) {
WARNING("amqp plugin: Body is larger than indicated by header.");
}
if ((body_size - received) < frame.payload.body_fragment.len) {
WARNING("amqp plugin: Body is larger than indicated by header.");
status = cmd_handle_putval(stderr, body);
if (status != 0)
ERROR("amqp plugin: cmd_handle_putval failed with status %i.", status);
status = cmd_handle_putval(stderr, body);
if (status != 0)
ERROR("amqp plugin: cmd_handle_putval failed with status %i.", status);
} else if (strcasecmp("application/json", content_type) == 0) {
ERROR("amqp plugin: camqp_read_body: Parsing JSON data has not "
"been implemented yet. FIXME!");
} else if (strcasecmp("application/json", content_type) == 0) {
ERROR("amqp plugin: camqp_read_body: Parsing JSON data has not "
"been implemented yet. FIXME!");
- ERROR("amqp plugin: amqp_simple_wait_frame failed: %s",
- sstrerror(status, errbuf, sizeof(errbuf)));
+ ERROR("amqp plugin: amqp_simple_wait_frame failed: %s", STRERROR(status));
}
if (frame.frame_type != AMQP_FRAME_HEADER) {
NOTICE("amqp plugin: Unexpected frame type: %#" PRIx8, frame.frame_type);
}
if (frame.frame_type != AMQP_FRAME_HEADER) {
NOTICE("amqp plugin: Unexpected frame type: %#" PRIx8, frame.frame_type);
}
properties = frame.payload.properties.decoded;
content_type = camqp_bytes_cstring(&properties->content_type);
if (content_type == NULL) {
ERROR("amqp plugin: Unable to determine content type.");
}
properties = frame.payload.properties.decoded;
content_type = camqp_bytes_cstring(&properties->content_type);
if (content_type == NULL) {
ERROR("amqp plugin: Unable to determine content type.");
} /* }}} void *camqp_subscribe_thread */
static int camqp_subscribe_init(camqp_config_t *conf) /* {{{ */
} /* }}} void *camqp_subscribe_thread */
static int camqp_subscribe_init(camqp_config_t *conf) /* {{{ */
status = plugin_thread_create(tmp, /* attr = */ NULL, camqp_subscribe_thread,
conf, "amqp subscribe");
if (status != 0) {
status = plugin_thread_create(tmp, /* attr = */ NULL, camqp_subscribe_thread,
conf, "amqp subscribe");
if (status != 0) {
- char errbuf[1024];
- ERROR("amqp plugin: pthread_create failed: %s",
- sstrerror(status, errbuf, sizeof(errbuf)));
- return (status);
+ ERROR("amqp plugin: pthread_create failed: %s", STRERROR(status));
+ return status;
amqp_basic_properties_t props = {._flags = AMQP_BASIC_CONTENT_TYPE_FLAG |
AMQP_BASIC_DELIVERY_MODE_FLAG |
amqp_basic_properties_t props = {._flags = AMQP_BASIC_CONTENT_TYPE_FLAG |
AMQP_BASIC_DELIVERY_MODE_FLAG |
} /* }}} int camqp_write_locked */
static int camqp_write(const data_set_t *ds, const value_list_t *vl, /* {{{ */
} /* }}} int camqp_write_locked */
static int camqp_write(const data_set_t *ds, const value_list_t *vl, /* {{{ */
if (conf->routing_key != NULL) {
sstrncpy(routing_key, conf->routing_key, sizeof(routing_key));
} else {
if (conf->routing_key != NULL) {
sstrncpy(routing_key, conf->routing_key, sizeof(routing_key));
} else {
- ssnprintf(routing_key, sizeof(routing_key), "collectd/%s/%s/%s/%s/%s",
- vl->host, vl->plugin, vl->plugin_instance, vl->type,
- vl->type_instance);
+ snprintf(routing_key, sizeof(routing_key), "collectd/%s/%s/%s/%s/%s",
+ vl->host, vl->plugin, vl->plugin_instance, vl->type,
+ vl->type_instance);
/* Switch slashes (the only character forbidden by collectd) and dots
* (the separation character used by AMQP). */
/* Switch slashes (the only character forbidden by collectd) and dots
* (the separation character used by AMQP). */
status = cmd_create_putval(buffer, sizeof(buffer), ds, vl);
if (status != 0) {
ERROR("amqp plugin: cmd_create_putval failed with status %i.", status);
status = cmd_create_putval(buffer, sizeof(buffer), ds, vl);
if (status != 0) {
ERROR("amqp plugin: cmd_create_putval failed with status %i.", status);
conf->postfix, conf->escape_char, conf->graphite_flags);
if (status != 0) {
ERROR("amqp plugin: format_graphite failed with status %i.", status);
conf->postfix, conf->escape_char, conf->graphite_flags);
if (status != 0) {
ERROR("amqp plugin: format_graphite failed with status %i.", status);
}
pthread_mutex_lock(&conf->lock);
status = camqp_write_locked(conf, buffer, routing_key);
pthread_mutex_unlock(&conf->lock);
}
pthread_mutex_lock(&conf->lock);
status = camqp_write_locked(conf, buffer, routing_key);
pthread_mutex_unlock(&conf->lock);
} /* }}} int config_set_string */
static int camqp_config_connection(oconfig_item_t *ci, /* {{{ */
} /* }}} int config_set_string */
static int camqp_config_connection(oconfig_item_t *ci, /* {{{ */
conf = calloc(1, sizeof(*conf));
if (conf == NULL) {
ERROR("amqp plugin: calloc failed.");
conf = calloc(1, sizeof(*conf));
if (conf == NULL) {
ERROR("amqp plugin: calloc failed.");
status = cf_util_get_string(child, &conf->password);
else if (strcasecmp("Exchange", child->key) == 0)
status = cf_util_get_string(child, &conf->exchange);
status = cf_util_get_string(child, &conf->password);
else if (strcasecmp("Exchange", child->key) == 0)
status = cf_util_get_string(child, &conf->exchange);
status = cf_util_get_string(child, &conf->exchange_type);
else if ((strcasecmp("Queue", child->key) == 0) && !publish)
status = cf_util_get_string(child, &conf->queue);
status = cf_util_get_string(child, &conf->exchange_type);
else if ((strcasecmp("Queue", child->key) == 0) && !publish)
status = cf_util_get_string(child, &conf->queue);
- ssnprintf(cbname, sizeof(cbname), "amqp/%s", conf->name);
+ snprintf(cbname, sizeof(cbname), "amqp/%s", conf->name);
- status = plugin_register_write(
- cbname, camqp_write, &(user_data_t){
- .data = conf, .free_func = camqp_config_free,
- });
+ status =
+ plugin_register_write(cbname, camqp_write,
+ &(user_data_t){
+ .data = conf, .free_func = camqp_config_free,
+ });
} /* }}} int camqp_config */
void module_register(void) {
plugin_register_complex_config("amqp", camqp_config);
plugin_register_shutdown("amqp", camqp_shutdown);
} /* void module_register */
} /* }}} int camqp_config */
void module_register(void) {
plugin_register_complex_config("amqp", camqp_config);
plugin_register_shutdown("amqp", camqp_shutdown);
} /* void module_register */