X-Git-Url: https://git.octo.it/?a=blobdiff_plain;f=src%2Famqp1.c;fp=src%2Famqp1.c;h=5a5d2b8edea144fca50f4942f900a5ba0bd88613;hb=fd958eb99c09d819a51dd147db7c189366e9382a;hp=3397f5258b14798318cb44f71d8aa7b9346fc227;hpb=46fdf1fbedc20bff90fc85555e0afba193ab0a31;p=collectd.git diff --git a/src/amqp1.c b/src/amqp1.c index 3397f525..5a5d2b8e 100644 --- a/src/amqp1.c +++ b/src/amqp1.c @@ -62,7 +62,7 @@ typedef struct amqp1_config_transport_t { char *user; char *password; char *address; - int retry_delay; + int retry_delay; } amqp1_config_transport_t; typedef struct amqp1_config_instance_t { @@ -76,14 +76,14 @@ typedef struct amqp1_config_instance_t { char *postfix; char escape_char; _Bool pre_settle; - char send_to[128]; + char send_to[1024]; } amqp1_config_instance_t; DEQ_DECLARE(amqp1_config_instance_t, amqp1_config_instance_list_t); typedef struct cd_message_t { DEQ_LINKS(struct cd_message_t); - pn_bytes_t mbuf; + pn_rwbytes_t mbuf; amqp1_config_instance_t *instance; } cd_message_t; @@ -124,7 +124,7 @@ static int amqp1_send_out_messages(pn_link_t *link) /* {{{ */ int event_count = 0; pn_delivery_t *dlv; - if (stopping){ + if (stopping) { return 0; } @@ -166,7 +166,6 @@ static int amqp1_send_out_messages(pn_link_t *link) /* {{{ */ return event_count; } /* }}} int amqp1_send_out_messages */ - static void check_condition(pn_event_t *e, pn_condition_t *cond) /* {{{ */ { if (pn_condition_is_set(cond)) { @@ -184,7 +183,7 @@ static bool handle(pn_event_t *event) /* {{{ */ case PN_CONNECTION_INIT: { conn = pn_event_connection(event); - pn_connection_set_container(conn, transport->address); + pn_connection_set_container(conn, transport->name); pn_connection_open(conn); pn_session_t *ssn = pn_session(conn); pn_session_open(ssn); @@ -275,7 +274,7 @@ static void *event_thread(void __attribute__((unused)) * arg) /* {{{ */ while (engine_running && !stopping) { pn_event_batch_t *events = pn_proactor_wait(proactor); pn_event_t *e; - while (( e = pn_event_batch_next(events))){ + while ((e = pn_event_batch_next(events))) { engine_running = handle(e); if (!engine_running) { break; @@ -308,24 +307,39 @@ static void *event_thread(void __attribute__((unused)) * arg) /* {{{ */ return NULL; } /* }}} void event_thread */ -static void encqueue(cd_message_t *cdm, - amqp1_config_instance_t *instance) /* {{{ */ +static int encqueue(cd_message_t *cdm, + amqp1_config_instance_t *instance) /* {{{ */ { size_t bufsize = BUFSIZE; pn_data_t *body; pn_message_t *message; + int status = 0; /* encode message */ message = pn_message(); pn_message_set_address(message, instance->send_to); body = pn_message_body(message); pn_data_clear(body); - pn_data_put_binary(body, cdm->mbuf); + pn_data_put_binary(body, pn_bytes(cdm->mbuf.size, cdm->mbuf.start)); pn_data_exit(body); /* put_binary copies and stores so ok to use mbuf */ cdm->mbuf.size = bufsize; - pn_message_encode(message, (char *)cdm->mbuf.start, &cdm->mbuf.size); + while ((status = pn_message_encode(message, (char *)cdm->mbuf.start, + &cdm->mbuf.size)) == PN_OVERFLOW) { + DEBUG("amqp1 plugin: increasing message buffer size %i", + (int)cdm->mbuf.size); + cdm->mbuf.size *= 2; + cdm->mbuf.start = (char *)realloc(cdm->mbuf.start, cdm->mbuf.size); + } + + if (status != 0) { + ERROR("amqp1 plugin: error encoding message: %s", + pn_error_text(pn_message_error(message))); + pn_message_free(message); + cd_message_free(cdm); + return -1; + } pthread_mutex_lock(&send_lock); DEQ_INSERT_TAIL(out_messages, cdm); @@ -338,7 +352,8 @@ static void encqueue(cd_message_t *cdm, pn_connection_wake(conn); } -} /* }}} void encqueue */ + return 0; +} /* }}} int encqueue */ static int amqp1_notify(notification_t const *n, user_data_t *user_data) /* {{{ */ @@ -361,7 +376,7 @@ static int amqp1_notify(notification_t const *n, cdm = NEW(cd_message_t); DEQ_ITEM_INIT(cdm); - cdm->mbuf = pn_bytes(bufsize, (char *)malloc(bufsize)); + cdm->mbuf = pn_rwbytes(bufsize, (char *)malloc(bufsize)); cdm->instance = instance; switch (instance->format) { @@ -380,9 +395,9 @@ static int amqp1_notify(notification_t const *n, } /* encode message and place on outbound queue */ - encqueue(cdm, instance); + status = encqueue(cdm, instance); - return 0; + return status; } /* }}} int amqp1_notify */ static int amqp1_write(const data_set_t *ds, const value_list_t *vl, /* {{{ */ @@ -406,7 +421,7 @@ static int amqp1_write(const data_set_t *ds, const value_list_t *vl, /* {{{ */ cdm = NEW(cd_message_t); DEQ_ITEM_INIT(cdm); - cdm->mbuf = pn_bytes(bufsize, (char *)malloc(bufsize)); + cdm->mbuf = pn_rwbytes(bufsize, (char *)malloc(bufsize)); cdm->instance = instance; switch (instance->format) { @@ -557,10 +572,19 @@ static int amqp1_config_instance(oconfig_item_t *ci) /* {{{ */ amqp1_config_instance_free(instance); return status; } else { - char tpname[128]; - snprintf(tpname, sizeof(tpname), "amqp1/%s", instance->name); - snprintf(instance->send_to, sizeof(instance->send_to), "/%s/%s", - transport->address, instance->name); + char tpname[1024]; + int status; + status = snprintf(tpname, sizeof(tpname), "amqp1/%s", instance->name); + if ((status < 0) || (size_t)status >= sizeof(tpname)) { + ERROR("amqp1 plugin: Instance name would have been truncated."); + return -1; + } + status = snprintf(instance->send_to, sizeof(instance->send_to), "/%s/%s", + transport->address, instance->name); + if ((status < 0) || (size_t)status >= sizeof(instance->send_to)) { + ERROR("amqp1 plugin: send_to address would have been truncated."); + return -1; + } if (instance->notify == true) { status = plugin_register_notification( tpname, amqp1_notify,