char *user;
char *password;
char *address;
- int retry_delay;
+ int retry_delay;
} amqp1_config_transport_t;
typedef struct amqp1_config_instance_t {
char *postfix;
char escape_char;
_Bool pre_settle;
- char send_to[128];
+ char send_to[1024];
} amqp1_config_instance_t;
DEQ_DECLARE(amqp1_config_instance_t, amqp1_config_instance_list_t);
typedef struct cd_message_t {
DEQ_LINKS(struct cd_message_t);
- pn_bytes_t mbuf;
+ pn_rwbytes_t mbuf;
amqp1_config_instance_t *instance;
} cd_message_t;
int event_count = 0;
pn_delivery_t *dlv;
- if (stopping){
+ if (stopping) {
return 0;
}
return event_count;
} /* }}} int amqp1_send_out_messages */
-
static void check_condition(pn_event_t *e, pn_condition_t *cond) /* {{{ */
{
if (pn_condition_is_set(cond)) {
case PN_CONNECTION_INIT: {
conn = pn_event_connection(event);
- pn_connection_set_container(conn, transport->address);
+ pn_connection_set_container(conn, transport->name);
pn_connection_open(conn);
pn_session_t *ssn = pn_session(conn);
pn_session_open(ssn);
while (engine_running && !stopping) {
pn_event_batch_t *events = pn_proactor_wait(proactor);
pn_event_t *e;
- while (( e = pn_event_batch_next(events))){
+ while ((e = pn_event_batch_next(events))) {
engine_running = handle(e);
if (!engine_running) {
break;
return NULL;
} /* }}} void event_thread */
-static void encqueue(cd_message_t *cdm,
- amqp1_config_instance_t *instance) /* {{{ */
+static int encqueue(cd_message_t *cdm,
+ amqp1_config_instance_t *instance) /* {{{ */
{
size_t bufsize = BUFSIZE;
pn_data_t *body;
pn_message_t *message;
+ int status = 0;
/* encode message */
message = pn_message();
pn_message_set_address(message, instance->send_to);
body = pn_message_body(message);
pn_data_clear(body);
- pn_data_put_binary(body, cdm->mbuf);
+ pn_data_put_binary(body, pn_bytes(cdm->mbuf.size, cdm->mbuf.start));
pn_data_exit(body);
/* put_binary copies and stores so ok to use mbuf */
cdm->mbuf.size = bufsize;
- pn_message_encode(message, (char *)cdm->mbuf.start, &cdm->mbuf.size);
+ while ((status = pn_message_encode(message, (char *)cdm->mbuf.start,
+ &cdm->mbuf.size)) == PN_OVERFLOW) {
+ DEBUG("amqp1 plugin: increasing message buffer size %i",
+ (int)cdm->mbuf.size);
+ cdm->mbuf.size *= 2;
+ cdm->mbuf.start = (char *)realloc(cdm->mbuf.start, cdm->mbuf.size);
+ }
+
+ if (status != 0) {
+ ERROR("amqp1 plugin: error encoding message: %s",
+ pn_error_text(pn_message_error(message)));
+ pn_message_free(message);
+ cd_message_free(cdm);
+ return -1;
+ }
pthread_mutex_lock(&send_lock);
DEQ_INSERT_TAIL(out_messages, cdm);
pn_connection_wake(conn);
}
-} /* }}} void encqueue */
+ return 0;
+} /* }}} int encqueue */
static int amqp1_notify(notification_t const *n,
user_data_t *user_data) /* {{{ */
cdm = NEW(cd_message_t);
DEQ_ITEM_INIT(cdm);
- cdm->mbuf = pn_bytes(bufsize, (char *)malloc(bufsize));
+ cdm->mbuf = pn_rwbytes(bufsize, (char *)malloc(bufsize));
cdm->instance = instance;
switch (instance->format) {
}
/* encode message and place on outbound queue */
- encqueue(cdm, instance);
+ status = encqueue(cdm, instance);
- return 0;
+ return status;
} /* }}} int amqp1_notify */
static int amqp1_write(const data_set_t *ds, const value_list_t *vl, /* {{{ */
cdm = NEW(cd_message_t);
DEQ_ITEM_INIT(cdm);
- cdm->mbuf = pn_bytes(bufsize, (char *)malloc(bufsize));
+ cdm->mbuf = pn_rwbytes(bufsize, (char *)malloc(bufsize));
cdm->instance = instance;
switch (instance->format) {
amqp1_config_instance_free(instance);
return status;
} else {
- char tpname[128];
- snprintf(tpname, sizeof(tpname), "amqp1/%s", instance->name);
- snprintf(instance->send_to, sizeof(instance->send_to), "/%s/%s",
- transport->address, instance->name);
+ char tpname[1024];
+ int status;
+ status = snprintf(tpname, sizeof(tpname), "amqp1/%s", instance->name);
+ if ((status < 0) || (size_t)status >= sizeof(tpname)) {
+ ERROR("amqp1 plugin: Instance name would have been truncated.");
+ return -1;
+ }
+ status = snprintf(instance->send_to, sizeof(instance->send_to), "/%s/%s",
+ transport->address, instance->name);
+ if ((status < 0) || (size_t)status >= sizeof(instance->send_to)) {
+ ERROR("amqp1 plugin: send_to address would have been truncated.");
+ return -1;
+ }
if (instance->notify == true) {
status = plugin_register_notification(
tpname, amqp1_notify,