Updates for rpm spec and review feedback
[collectd.git] / src / amqp1.c
index 3397f52..5a5d2b8 100644 (file)
@@ -62,7 +62,7 @@ typedef struct amqp1_config_transport_t {
   char *user;
   char *password;
   char *address;
-  int  retry_delay;
+  int retry_delay;
 } amqp1_config_transport_t;
 
 typedef struct amqp1_config_instance_t {
@@ -76,14 +76,14 @@ typedef struct amqp1_config_instance_t {
   char *postfix;
   char escape_char;
   _Bool pre_settle;
-  char send_to[128];
+  char send_to[1024];
 } amqp1_config_instance_t;
 
 DEQ_DECLARE(amqp1_config_instance_t, amqp1_config_instance_list_t);
 
 typedef struct cd_message_t {
   DEQ_LINKS(struct cd_message_t);
-  pn_bytes_t mbuf;
+  pn_rwbytes_t mbuf;
   amqp1_config_instance_t *instance;
 } cd_message_t;
 
@@ -124,7 +124,7 @@ static int amqp1_send_out_messages(pn_link_t *link) /* {{{ */
   int event_count = 0;
   pn_delivery_t *dlv;
 
-  if (stopping){
+  if (stopping) {
     return 0;
   }
 
@@ -166,7 +166,6 @@ static int amqp1_send_out_messages(pn_link_t *link) /* {{{ */
   return event_count;
 } /* }}} int amqp1_send_out_messages */
 
-
 static void check_condition(pn_event_t *e, pn_condition_t *cond) /* {{{ */
 {
   if (pn_condition_is_set(cond)) {
@@ -184,7 +183,7 @@ static bool handle(pn_event_t *event) /* {{{ */
 
   case PN_CONNECTION_INIT: {
     conn = pn_event_connection(event);
-    pn_connection_set_container(conn, transport->address);
+    pn_connection_set_container(conn, transport->name);
     pn_connection_open(conn);
     pn_session_t *ssn = pn_session(conn);
     pn_session_open(ssn);
@@ -275,7 +274,7 @@ static void *event_thread(void __attribute__((unused)) * arg) /* {{{ */
     while (engine_running && !stopping) {
       pn_event_batch_t *events = pn_proactor_wait(proactor);
       pn_event_t *e;
-      while (( e = pn_event_batch_next(events))){
+      while ((e = pn_event_batch_next(events))) {
         engine_running = handle(e);
         if (!engine_running) {
           break;
@@ -308,24 +307,39 @@ static void *event_thread(void __attribute__((unused)) * arg) /* {{{ */
   return NULL;
 } /* }}} void event_thread */
 
-static void encqueue(cd_message_t *cdm,
-                     amqp1_config_instance_t *instance) /* {{{ */
+static int encqueue(cd_message_t *cdm,
+                    amqp1_config_instance_t *instance) /* {{{ */
 {
   size_t bufsize = BUFSIZE;
   pn_data_t *body;
   pn_message_t *message;
+  int status = 0;
 
   /* encode message */
   message = pn_message();
   pn_message_set_address(message, instance->send_to);
   body = pn_message_body(message);
   pn_data_clear(body);
-  pn_data_put_binary(body, cdm->mbuf);
+  pn_data_put_binary(body, pn_bytes(cdm->mbuf.size, cdm->mbuf.start));
   pn_data_exit(body);
 
   /* put_binary copies and stores so ok to use mbuf */
   cdm->mbuf.size = bufsize;
-  pn_message_encode(message, (char *)cdm->mbuf.start, &cdm->mbuf.size);
+  while ((status = pn_message_encode(message, (char *)cdm->mbuf.start,
+                                     &cdm->mbuf.size)) == PN_OVERFLOW) {
+    DEBUG("amqp1 plugin: increasing message buffer size %i",
+          (int)cdm->mbuf.size);
+    cdm->mbuf.size *= 2;
+    cdm->mbuf.start = (char *)realloc(cdm->mbuf.start, cdm->mbuf.size);
+  }
+
+  if (status != 0) {
+    ERROR("amqp1 plugin: error encoding message: %s",
+          pn_error_text(pn_message_error(message)));
+    pn_message_free(message);
+    cd_message_free(cdm);
+    return -1;
+  }
 
   pthread_mutex_lock(&send_lock);
   DEQ_INSERT_TAIL(out_messages, cdm);
@@ -338,7 +352,8 @@ static void encqueue(cd_message_t *cdm,
     pn_connection_wake(conn);
   }
 
-} /* }}} void encqueue */
+  return 0;
+} /* }}} int encqueue */
 
 static int amqp1_notify(notification_t const *n,
                         user_data_t *user_data) /* {{{ */
@@ -361,7 +376,7 @@ static int amqp1_notify(notification_t const *n,
 
   cdm = NEW(cd_message_t);
   DEQ_ITEM_INIT(cdm);
-  cdm->mbuf = pn_bytes(bufsize, (char *)malloc(bufsize));
+  cdm->mbuf = pn_rwbytes(bufsize, (char *)malloc(bufsize));
   cdm->instance = instance;
 
   switch (instance->format) {
@@ -380,9 +395,9 @@ static int amqp1_notify(notification_t const *n,
   }
 
   /* encode message and place on outbound queue */
-  encqueue(cdm, instance);
+  status = encqueue(cdm, instance);
 
-  return 0;
+  return status;
 } /* }}} int amqp1_notify */
 
 static int amqp1_write(const data_set_t *ds, const value_list_t *vl, /* {{{ */
@@ -406,7 +421,7 @@ static int amqp1_write(const data_set_t *ds, const value_list_t *vl, /* {{{ */
 
   cdm = NEW(cd_message_t);
   DEQ_ITEM_INIT(cdm);
-  cdm->mbuf = pn_bytes(bufsize, (char *)malloc(bufsize));
+  cdm->mbuf = pn_rwbytes(bufsize, (char *)malloc(bufsize));
   cdm->instance = instance;
 
   switch (instance->format) {
@@ -557,10 +572,19 @@ static int amqp1_config_instance(oconfig_item_t *ci) /* {{{ */
     amqp1_config_instance_free(instance);
     return status;
   } else {
-    char tpname[128];
-    snprintf(tpname, sizeof(tpname), "amqp1/%s", instance->name);
-    snprintf(instance->send_to, sizeof(instance->send_to), "/%s/%s",
-             transport->address, instance->name);
+    char tpname[1024];
+    int status;
+    status = snprintf(tpname, sizeof(tpname), "amqp1/%s", instance->name);
+    if ((status < 0) || (size_t)status >= sizeof(tpname)) {
+      ERROR("amqp1 plugin: Instance name would have been truncated.");
+      return -1;
+    }
+    status = snprintf(instance->send_to, sizeof(instance->send_to), "/%s/%s",
+                      transport->address, instance->name);
+    if ((status < 0) || (size_t)status >= sizeof(instance->send_to)) {
+      ERROR("amqp1 plugin: send_to address would have been truncated.");
+      return -1;
+    }
     if (instance->notify == true) {
       status = plugin_register_notification(
           tpname, amqp1_notify,