#include "utils_format_graphite.h"
#include "utils_format_json.h"
#include "utils_random.h"
#include "utils_format_graphite.h"
#include "utils_format_json.h"
#include "utils_random.h"
} amqp1_config_transport_t;
typedef struct amqp1_config_instance_t {
DEQ_LINKS(struct amqp1_config_instance_t);
} amqp1_config_transport_t;
typedef struct amqp1_config_instance_t {
DEQ_LINKS(struct amqp1_config_instance_t);
} amqp1_config_instance_t;
DEQ_DECLARE(amqp1_config_instance_t, amqp1_config_instance_list_t);
} amqp1_config_instance_t;
DEQ_DECLARE(amqp1_config_instance_t, amqp1_config_instance_list_t);
-pn_connection_t *conn = NULL;
-pn_session_t *ssn = NULL;
-pn_link_t *sender = NULL;
-pn_proactor_t *proactor = NULL;
-pthread_mutex_t send_lock;
-cd_message_list_t out_messages;
-uint64_t cd_tag = 1;
-uint64_t acknowledged = 0;
+pn_connection_t *conn = NULL;
+pn_session_t *ssn = NULL;
+pn_link_t *sender = NULL;
+pn_proactor_t *proactor = NULL;
+pthread_mutex_t send_lock;
+cd_message_list_t out_messages;
+uint64_t cd_tag = 1;
+uint64_t acknowledged = 0;
pn_link_send(link, cdm->mbuf.start, cdm->mbuf.size);
pn_link_advance(link);
if (cdm->instance->pre_settle == true) {
pn_link_send(link, cdm->mbuf.start, cdm->mbuf.size);
pn_link_advance(link);
if (cdm->instance->pre_settle == true) {
static void check_condition(pn_event_t *e, pn_condition_t *cond) /* {{{ */
{
if (pn_condition_is_set(cond)) {
static void check_condition(pn_event_t *e, pn_condition_t *cond) /* {{{ */
{
if (pn_condition_is_set(cond)) {
- ERROR("amqp1 plugin: %s: %s: %s",
- pn_event_type_name(pn_event_type(e)),
- pn_condition_get_name(cond),
- pn_condition_get_description(cond));
+ ERROR("amqp1 plugin: %s: %s: %s", pn_event_type_name(pn_event_type(e)),
+ pn_condition_get_name(cond), pn_condition_get_description(cond));
conn = pn_event_connection(event);
pn_connection_set_container(conn, transport->address);
pn_connection_open(conn);
conn = pn_event_connection(event);
pn_connection_set_container(conn, transport->address);
pn_connection_open(conn);
} /* }}} int amqp1_notify */
static int amqp1_write(const data_set_t *ds, const value_list_t *vl, /* {{{ */
} /* }}} int amqp1_notify */
static int amqp1_write(const data_set_t *ds, const value_list_t *vl, /* {{{ */
case AMQP1_FORMAT_JSON:
format_json_initialize((char *)cdm->mbuf.start, &bfill, &bfree);
format_json_value_list((char *)cdm->mbuf.start, &bfill, &bfree, ds, vl,
case AMQP1_FORMAT_JSON:
format_json_initialize((char *)cdm->mbuf.start, &bfill, &bfree);
format_json_value_list((char *)cdm->mbuf.start, &bfill, &bfree, ds, vl,
format_json_finalize((char *)cdm->mbuf.start, &bfill, &bfree);
cdm->mbuf.size = strlen(cdm->mbuf.start);
break;
case AMQP1_FORMAT_GRAPHITE:
format_json_finalize((char *)cdm->mbuf.start, &bfill, &bfree);
cdm->mbuf.size = strlen(cdm->mbuf.start);
break;
case AMQP1_FORMAT_GRAPHITE:
- status =
- format_graphite((char *)cdm->mbuf.start, bufsize, ds, vl, instance->prefix,
- instance->postfix, instance->escape_char, instance->graphite_flags);
+ status = format_graphite((char *)cdm->mbuf.start, bufsize, ds, vl,
+ instance->prefix, instance->postfix,
+ instance->escape_char, instance->graphite_flags);
else if (strcasecmp("Format", child->key) == 0) {
status = cf_util_get_string(child, &key);
if (status != 0)
else if (strcasecmp("Format", child->key) == 0) {
status = cf_util_get_string(child, &key);
if (status != 0)
status = cf_util_get_boolean(child, &instance->store_rates);
else if (strcasecmp("GraphiteSeparateInstances", child->key) == 0)
status = cf_util_get_flag(child, &instance->graphite_flags,
status = cf_util_get_boolean(child, &instance->store_rates);
else if (strcasecmp("GraphiteSeparateInstances", child->key) == 0)
status = cf_util_get_flag(child, &instance->graphite_flags,
char tpname[128];
snprintf(tpname, sizeof(tpname), "amqp1/%s", instance->name);
snprintf(instance->send_to, sizeof(instance->send_to), "/%s/%s",
char tpname[128];
snprintf(tpname, sizeof(tpname), "amqp1/%s", instance->name);
snprintf(instance->send_to, sizeof(instance->send_to), "/%s/%s",
- status = plugin_register_notification(tpname, amqp1_notify, &(user_data_t) {
- .data = instance, .free_func = amqp1_config_instance_free, });
+ status = plugin_register_notification(
+ tpname, amqp1_notify,
+ &(user_data_t){
+ .data = instance, .free_func = amqp1_config_instance_free,
+ });
- status = plugin_register_write(tpname, amqp1_write, &(user_data_t) {
- .data = instance, .free_func = amqp1_config_instance_free, });
+ status = plugin_register_write(
+ tpname, amqp1_write,
+ &(user_data_t){
+ .data = instance, .free_func = amqp1_config_instance_free,
+ });
status = cf_util_get_string(child, &transport->password);
else if (strcasecmp("Address", child->key) == 0)
status = cf_util_get_string(child, &transport->address);
status = cf_util_get_string(child, &transport->password);
else if (strcasecmp("Address", child->key) == 0)
status = cf_util_get_string(child, &transport->address);
amqp1_config_instance(child);
else
WARNING("amqp1 plugin: Ignoring unknown "
"transport configuration option "
amqp1_config_instance(child);
else
WARNING("amqp1 plugin: Ignoring unknown "
"transport configuration option "
if (proactor == NULL) {
pthread_mutex_init(&send_lock, /* attr = */ NULL);
proactor = pn_proactor();
if (proactor == NULL) {
pthread_mutex_init(&send_lock, /* attr = */ NULL);
proactor = pn_proactor();
- pn_proactor_addr(addr, sizeof(addr),transport->host,transport->port);
+ pn_proactor_addr(addr, sizeof(addr), transport->host, transport->port);
- pn_connection_set_user(conn, transport->user);
- pn_connection_set_password(conn, transport->password);
+ pn_connection_set_user(conn, transport->user);
+ pn_connection_set_password(conn, transport->password);
- status = plugin_thread_create(&event_thread_id, NULL /* no attributes */,
- event_thread, NULL /* no argument */,
- "handle");
+ status =
+ plugin_thread_create(&event_thread_id, NULL /* no attributes */,
+ event_thread, NULL /* no argument */, "handle");
if (status != 0) {
ERROR("amqp1: pthread_create failed: %s",
sstrerror(errno, errbuf, sizeof(errbuf)));
if (status != 0) {
ERROR("amqp1: pthread_create failed: %s",
sstrerror(errno, errbuf, sizeof(errbuf)));
/* activate the event thread */
pn_connection_wake(conn);
pthread_join(event_thread_id, NULL /* no return value */);
/* activate the event thread */
pn_connection_wake(conn);
pthread_join(event_thread_id, NULL /* no return value */);
plugin_register_complex_config("amqp1", amqp1_config);
plugin_register_init("amqp1", amqp1_init);
plugin_register_complex_config("amqp1", amqp1_config);
plugin_register_init("amqp1", amqp1_init);