#include "collectd.h"
-#include "common.h"
#include "plugin.h"
-#include "utils_cmd_putval.h"
-#include "utils_format_graphite.h"
-#include "utils_format_json.h"
+#include "utils/cmds/putval.h"
+#include "utils/common/common.h"
+#include "utils/format_graphite/format_graphite.h"
+#include "utils/format_json/format_json.h"
#include <amqp.h>
#include <amqp_framing.h>
if (r.reply.id == AMQP_CONNECTION_CLOSE_METHOD) {
amqp_connection_close_t *m = r.reply.decoded;
char *tmp = camqp_bytes_cstring(&m->reply_text);
- snprintf(buffer, buffer_size, "Server connection error %d: %s",
- m->reply_code, tmp);
+ ssnprintf(buffer, buffer_size, "Server connection error %d: %s",
+ m->reply_code, tmp);
sfree(tmp);
} else if (r.reply.id == AMQP_CHANNEL_CLOSE_METHOD) {
amqp_channel_close_t *m = r.reply.decoded;
char *tmp = camqp_bytes_cstring(&m->reply_text);
- snprintf(buffer, buffer_size, "Server channel error %d: %s",
- m->reply_code, tmp);
+ ssnprintf(buffer, buffer_size, "Server channel error %d: %s",
+ m->reply_code, tmp);
sfree(tmp);
} else {
- snprintf(buffer, buffer_size, "Server error method %#" PRIx32,
- r.reply.id);
+ ssnprintf(buffer, buffer_size, "Server error method %#" PRIx32,
+ r.reply.id);
}
break;
default:
- snprintf(buffer, buffer_size, "Unknown reply type %i", (int)r.reply_type);
+ ssnprintf(buffer, buffer_size, "Unknown reply type %i", (int)r.reply_type);
}
return buffer;
amqp_queue_declare_ok_t *qd_ret;
amqp_basic_consume_ok_t *cm_ret;
- qd_ret = amqp_queue_declare(conf->connection,
- /* channel = */ CAMQP_CHANNEL,
- /* queue = */ (conf->queue != NULL)
- ? amqp_cstring_bytes(conf->queue)
- : AMQP_EMPTY_BYTES,
- /* passive = */ 0,
- /* durable = */ conf->queue_durable,
- /* exclusive = */ 0,
- /* auto_delete = */ conf->queue_auto_delete,
- /* arguments = */ AMQP_EMPTY_TABLE);
+ qd_ret =
+ amqp_queue_declare(conf->connection,
+ /* channel = */ CAMQP_CHANNEL,
+ /* queue = */
+ (conf->queue != NULL) ? amqp_cstring_bytes(conf->queue)
+ : AMQP_EMPTY_BYTES,
+ /* passive = */ 0,
+ /* durable = */ conf->queue_durable,
+ /* exclusive = */ 0,
+ /* auto_delete = */ conf->queue_auto_delete,
+ /* arguments = */ AMQP_EMPTY_TABLE);
if (qd_ret == NULL) {
ERROR("amqp plugin: amqp_queue_declare failed.");
camqp_close_connection(conf);
amqp_queue_bind_ok_t *qb_ret;
assert(conf->queue != NULL);
- qb_ret =
- amqp_queue_bind(conf->connection,
- /* channel = */ CAMQP_CHANNEL,
- /* queue = */ amqp_cstring_bytes(conf->queue),
- /* exchange = */ amqp_cstring_bytes(conf->exchange),
- /* routing_key = */ (conf->routing_key != NULL)
- ? amqp_cstring_bytes(conf->routing_key)
- : AMQP_EMPTY_BYTES,
- /* arguments = */ AMQP_EMPTY_TABLE);
+ qb_ret = amqp_queue_bind(
+ conf->connection,
+ /* channel = */ CAMQP_CHANNEL,
+ /* queue = */ amqp_cstring_bytes(conf->queue),
+ /* exchange = */ amqp_cstring_bytes(conf->exchange),
+ /* routing_key = */
+ (conf->routing_key != NULL) ? amqp_cstring_bytes(conf->routing_key)
+ : AMQP_EMPTY_BYTES,
+ /* arguments = */ AMQP_EMPTY_TABLE);
if ((qb_ret == NULL) && camqp_is_error(conf)) {
char errbuf[1024];
ERROR("amqp plugin: amqp_queue_bind failed: %s",
#ifdef HAVE_AMQP_TCP_SOCKET
#define CLOSE_SOCKET() /* amqp_destroy_connection() closes the socket for us \
- */
+ */
/* TODO: add support for SSL using amqp_ssl_socket_new
* and related functions */
socket = amqp_tcp_socket_new(conf->connection);
if (conf->routing_key != NULL) {
sstrncpy(routing_key, conf->routing_key, sizeof(routing_key));
} else {
- snprintf(routing_key, sizeof(routing_key), "collectd/%s/%s/%s/%s/%s",
- vl->host, vl->plugin, vl->plugin_instance, vl->type,
- vl->type_instance);
+ ssnprintf(routing_key, sizeof(routing_key), "collectd/%s/%s/%s/%s/%s",
+ vl->host, vl->plugin, vl->plugin_instance, vl->type,
+ vl->type_instance);
/* Switch slashes (the only character forbidden by collectd) and dots
* (the separation character used by AMQP). */
if (publish) {
char cbname[128];
- snprintf(cbname, sizeof(cbname), "amqp/%s", conf->name);
+ ssnprintf(cbname, sizeof(cbname), "amqp/%s", conf->name);
- status =
- plugin_register_write(cbname, camqp_write,
- &(user_data_t){
- .data = conf, .free_func = camqp_config_free,
- });
+ status = plugin_register_write(cbname, camqp_write,
+ &(user_data_t){
+ .data = conf,
+ .free_func = camqp_config_free,
+ });
if (status != 0) {
camqp_config_free(conf);
return status;