Added library link check and addressed review comments
[collectd.git] / src / amqp1.c
index 2474664..4ba7359 100644 (file)
 #include "common.h"
 #include "plugin.h"
 #include "utils_cmd_putval.h"
+#include "utils_deq.h"
 #include "utils_format_graphite.h"
 #include "utils_format_json.h"
 #include "utils_random.h"
-#include "utils_deq.h"
 
-#include <proton/connection.h>
 #include <proton/condition.h>
+#include <proton/connection.h>
 #include <proton/delivery.h>
 #include <proton/link.h>
 #include <proton/message.h>
 #include <proton/session.h>
 #include <proton/transport.h>
 
-#include <stdio.h>
-#include <stdlib.h>
 #include <errno.h>
 #include <stdint.h>
+#include <stdio.h>
+#include <stdlib.h>
 
 #define BUFSIZE 8192
 #define AMQP1_FORMAT_JSON 0
 #define AMQP1_FORMAT_COMMAND 1
 #define AMQP1_FORMAT_GRAPHITE 2
 
-typedef struct amqp1_config_transport_t {
-  DEQ_LINKS(struct amqp1_config_transport_t);
-  char              *name;
-  char              *host;
-  char              *port;
-  char              *user;
-  char              *password;
-  char              *address;
+typedef struct amqp1_config_transport_s {
+  DEQ_LINKS(struct amqp1_config_transport_s);
+  char *name;
+  char *host;
+  char *port;
+  char *user;
+  char *password;
+  char *address;
+  int retry_delay;
 } amqp1_config_transport_t;
 
-typedef struct amqp1_config_instance_t {
-  DEQ_LINKS(struct amqp1_config_instance_t);
-  char              *name;
-  _Bool             notify;
-  uint8_t           format;
-  unsigned int      graphite_flags;
-  _Bool             store_rates;
-  char              *prefix;
-  char              *postfix;
-  char              escape_char;
-  _Bool             pre_settle;
-  char              send_to[128];
+typedef struct amqp1_config_instance_s {
+  DEQ_LINKS(struct amqp1_config_instance_s);
+  char *name;
+  bool notify;
+  uint8_t format;
+  unsigned int graphite_flags;
+  bool store_rates;
+  char *prefix;
+  char *postfix;
+  char escape_char;
+  bool pre_settle;
+  char send_to[1024];
 } amqp1_config_instance_t;
 
 DEQ_DECLARE(amqp1_config_instance_t, amqp1_config_instance_list_t);
 
-typedef struct cd_message_t {
-  DEQ_LINKS(struct cd_message_t);
-  pn_bytes_t mbuf;
+typedef struct cd_message_s {
+  DEQ_LINKS(struct cd_message_s);
+  pn_rwbytes_t mbuf;
   amqp1_config_instance_t *instance;
 } cd_message_t;
 
@@ -91,39 +92,38 @@ DEQ_DECLARE(cd_message_t, cd_message_list_t);
 /*
  * Globals
  */
-pn_connection_t          *conn = NULL;
-pn_session_t             *ssn = NULL;
-pn_link_t                *sender = NULL;
-pn_proactor_t            *proactor = NULL;
-pthread_mutex_t          send_lock;
-cd_message_list_t        out_messages;
-uint64_t                 cd_tag = 1;
-uint64_t                 acknowledged = 0;
-amqp1_config_transport_t *transport = NULL;
-bool                     finished = false;
-
-static int       event_thread_running = 0;
+static pn_connection_t *conn = NULL;
+static pn_link_t *sender = NULL;
+static pn_proactor_t *proactor = NULL;
+static pthread_mutex_t send_lock;
+static cd_message_list_t out_messages;
+static uint64_t cd_tag = 1;
+static uint64_t acknowledged = 0;
+static amqp1_config_transport_t *transport = NULL;
+static bool stopping = false;
+static int event_thread_running = 0;
 static pthread_t event_thread_id;
 
 /*
  * Functions
  */
-static void cd_message_free(cd_message_t *cdm)
-{
-  if (cdm->mbuf.start) {
-    free((void *)cdm->mbuf.start);
-  }
+static void cd_message_free(cd_message_t *cdm) {
+  free(cdm->mbuf.start);
   free(cdm);
 } /* }}} void cd_message_free */
 
 static int amqp1_send_out_messages(pn_link_t *link) /* {{{ */
 {
-  uint64_t          dtag;
+  uint64_t dtag;
   cd_message_list_t to_send;
-  cd_message_t      *cdm;
-  int               link_credit = pn_link_credit(link);
-  int               event_count = 0;
-  pn_delivery_t     *dlv;
+  cd_message_t *cdm;
+  int link_credit = pn_link_credit(link);
+  int event_count = 0;
+  pn_delivery_t *dlv;
+
+  if (stopping) {
+    return 0;
+  }
 
   DEQ_INIT(to_send);
 
@@ -149,7 +149,7 @@ static int amqp1_send_out_messages(pn_link_t *link) /* {{{ */
   while (cdm) {
     DEQ_REMOVE_HEAD(to_send);
     dtag++;
-    dlv = pn_delivery(link, pn_dtag((const char*)&dtag, sizeof(dtag)));
+    dlv = pn_delivery(link, pn_dtag((const char *)&dtag, sizeof(dtag)));
     pn_link_send(link, cdm->mbuf.start, cdm->mbuf.size);
     pn_link_advance(link);
     if (cdm->instance->pre_settle == true) {
@@ -166,10 +166,8 @@ static int amqp1_send_out_messages(pn_link_t *link) /* {{{ */
 static void check_condition(pn_event_t *e, pn_condition_t *cond) /* {{{ */
 {
   if (pn_condition_is_set(cond)) {
-    ERROR("amqp1 plugin: %s: %s: %s",
-          pn_event_type_name(pn_event_type(e)),
-          pn_condition_get_name(cond),
-          pn_condition_get_description(cond));
+    ERROR("amqp1 plugin: %s: %s: %s", pn_event_type_name(pn_event_type(e)),
+          pn_condition_get_name(cond), pn_condition_get_description(cond));
     pn_connection_close(pn_event_connection(e));
     conn = NULL;
   }
@@ -180,11 +178,11 @@ static bool handle(pn_event_t *event) /* {{{ */
 
   switch (pn_event_type(event)) {
 
-  case PN_CONNECTION_INIT:{
+  case PN_CONNECTION_INIT: {
     conn = pn_event_connection(event);
-    pn_connection_set_container(conn, transport->address);
+    pn_connection_set_container(conn, transport->name);
     pn_connection_open(conn);
-    ssn = pn_session(conn);
+    pn_session_t *ssn = pn_session(conn);
     pn_session_open(ssn);
     sender = pn_sender(ssn, "cd-sender");
     pn_link_set_snd_settle_mode(sender, PN_SND_MIXED);
@@ -200,15 +198,16 @@ static bool handle(pn_event_t *event) /* {{{ */
 
   case PN_DELIVERY: {
     /* acknowledgement from peer that a message was delivered */
-    pn_delivery_t * dlv = pn_event_delivery(event);
+    pn_delivery_t *dlv = pn_event_delivery(event);
     if (pn_delivery_remote_state(dlv) == PN_ACCEPTED) {
+      pn_delivery_settle(dlv);
       acknowledged++;
     }
     break;
   }
 
   case PN_CONNECTION_WAKE: {
-    if (!finished) {
+    if (!stopping) {
       amqp1_send_out_messages(sender);
     }
     break;
@@ -220,13 +219,15 @@ static bool handle(pn_event_t *event) /* {{{ */
   }
 
   case PN_CONNECTION_REMOTE_CLOSE: {
-    check_condition(event, pn_session_remote_condition(pn_event_session(event)));
+    check_condition(event,
+                    pn_session_remote_condition(pn_event_session(event)));
     pn_connection_close(pn_event_connection(event));
     break;
   }
 
   case PN_SESSION_REMOTE_CLOSE: {
-    check_condition(event, pn_session_remote_condition(pn_event_session(event)));
+    check_condition(event,
+                    pn_session_remote_condition(pn_event_session(event)));
     pn_connection_close(pn_event_connection(event));
     break;
   }
@@ -242,47 +243,100 @@ static bool handle(pn_event_t *event) /* {{{ */
     return false;
   }
 
-  default: break;
+  default:
+    break;
   }
   return true;
 } /* }}} bool handle */
 
 static void *event_thread(void __attribute__((unused)) * arg) /* {{{ */
 {
+  char addr[PN_MAX_ADDR];
+  cd_message_t *cdm;
+
+  /* setup proactor */
+  proactor = pn_proactor();
+  pn_proactor_addr(addr, sizeof(addr), transport->host, transport->port);
 
-  do {
-    pn_event_batch_t *events = pn_proactor_wait(proactor);
-    pn_event_t *e;
-    for (e = pn_event_batch_next(events); e; e = pn_event_batch_next(events)) {
-      if (!handle(e)) {
-        finished = true;
+  while (!stopping) {
+    /* make connection */
+    conn = pn_connection();
+    if (transport->user != NULL) {
+      pn_connection_set_user(conn, transport->user);
+      pn_connection_set_password(conn, transport->password);
+    }
+    pn_proactor_connect(proactor, conn, addr);
+
+    bool engine_running = true;
+    while (engine_running && !stopping) {
+      pn_event_batch_t *events = pn_proactor_wait(proactor);
+      pn_event_t *e;
+      while ((e = pn_event_batch_next(events))) {
+        engine_running = handle(e);
+        if (!engine_running) {
+          break;
+        }
       }
+      pn_proactor_done(proactor, events);
     }
-    pn_proactor_done(proactor, events);
-  } while (!finished);
+
+    pn_proactor_release_connection(conn);
+
+    DEBUG("amqp1 plugin: retrying connection");
+    int delay = transport->retry_delay;
+    while (delay-- > 0 && !stopping) {
+      sleep(1.0);
+    }
+  }
+
+  pn_proactor_disconnect(proactor, NULL);
+
+  /* Free the remaining out_messages */
+  cdm = DEQ_HEAD(out_messages);
+  while (cdm) {
+    DEQ_REMOVE_HEAD(out_messages);
+    cd_message_free(cdm);
+    cdm = DEQ_HEAD(out_messages);
+  }
 
   event_thread_running = 0;
 
   return NULL;
 } /* }}} void event_thread */
 
-static void encqueue(cd_message_t *cdm, amqp1_config_instance_t *instance ) /* {{{ */
+static int encqueue(cd_message_t *cdm,
+                    amqp1_config_instance_t *instance) /* {{{ */
 {
-  size_t       bufsize = BUFSIZE;
-  pn_data_t    *body;
+  size_t bufsize = BUFSIZE;
+  pn_data_t *body;
   pn_message_t *message;
+  int status = 0;
 
   /* encode message */
   message = pn_message();
   pn_message_set_address(message, instance->send_to);
   body = pn_message_body(message);
   pn_data_clear(body);
-  pn_data_put_binary(body, cdm->mbuf);
+  pn_data_put_binary(body, pn_bytes(cdm->mbuf.size, cdm->mbuf.start));
   pn_data_exit(body);
 
   /* put_binary copies and stores so ok to use mbuf */
   cdm->mbuf.size = bufsize;
-  pn_message_encode(message, (char *)cdm->mbuf.start, &cdm->mbuf.size);
+  while ((status = pn_message_encode(message, (char *)cdm->mbuf.start,
+                                     &cdm->mbuf.size)) == PN_OVERFLOW) {
+    DEBUG("amqp1 plugin: increasing message buffer size %i",
+          (int)cdm->mbuf.size);
+    cdm->mbuf.size *= 2;
+    cdm->mbuf.start = (char *)realloc(cdm->mbuf.start, cdm->mbuf.size);
+  }
+
+  if (status != 0) {
+    ERROR("amqp1 plugin: error encoding message: %s",
+          pn_error_text(pn_message_error(message)));
+    pn_message_free(message);
+    cd_message_free(cdm);
+    return -1;
+  }
 
   pthread_mutex_lock(&send_lock);
   DEQ_INSERT_TAIL(out_messages, cdm);
@@ -295,16 +349,18 @@ static void encqueue(cd_message_t *cdm, amqp1_config_instance_t *instance ) /* {
     pn_connection_wake(conn);
   }
 
-} /* }}} void encqueue */
+  return 0;
+} /* }}} int encqueue */
 
-static int amqp1_notify(notification_t const *n, user_data_t *user_data) /* {{{ */
+static int amqp1_notify(notification_t const *n,
+                        user_data_t *user_data) /* {{{ */
 {
   amqp1_config_instance_t *instance;
-  int          status = 0;
-  size_t       bfree = BUFSIZE;
-  size_t       bfill = 0;
+  int status = 0;
+  size_t bfree = BUFSIZE;
+  size_t bfill = 0;
   cd_message_t *cdm;
-  size_t       bufsize = BUFSIZE;
+  size_t bufsize = BUFSIZE;
 
   if ((n == NULL) || (user_data == NULL))
     return EINVAL;
@@ -315,9 +371,9 @@ static int amqp1_notify(notification_t const *n, user_data_t *user_data) /* {{{
     ERROR("amqp1 plugin: write notification failed");
   }
 
-  cdm = NEW(cd_message_t);
+  cdm = (cd_message_t *)malloc(sizeof(cd_message_t));
   DEQ_ITEM_INIT(cdm);
-  cdm->mbuf = pn_bytes(bufsize, (char *) malloc(bufsize));
+  cdm->mbuf = pn_rwbytes(bufsize, (char *)malloc(bufsize));
   cdm->instance = instance;
 
   switch (instance->format) {
@@ -336,22 +392,22 @@ static int amqp1_notify(notification_t const *n, user_data_t *user_data) /* {{{
   }
 
   /* encode message and place on outbound queue */
-  encqueue(cdm, instance);
+  status = encqueue(cdm, instance);
 
-  return 0;
+  return status;
 } /* }}} int amqp1_notify */
 
 static int amqp1_write(const data_set_t *ds, const value_list_t *vl, /* {{{ */
-                       user_data_t *user_data)
-{
+                       user_data_t *user_data) {
   amqp1_config_instance_t *instance;
-  int          status = 0;
-  size_t       bfree = BUFSIZE;
-  size_t       bfill = 0;
+  int status = 0;
+  size_t bfree = BUFSIZE;
+  size_t bfill = 0;
   cd_message_t *cdm;
-  size_t       bufsize = BUFSIZE;
+  size_t bufsize = BUFSIZE;
 
-  if ((ds == NULL) || (vl == NULL) || (transport == NULL) || (user_data == NULL))
+  if ((ds == NULL) || (vl == NULL) || (transport == NULL) ||
+      (user_data == NULL))
     return EINVAL;
 
   instance = user_data->data;
@@ -360,9 +416,9 @@ static int amqp1_write(const data_set_t *ds, const value_list_t *vl, /* {{{ */
     ERROR("amqp1 plugin: write failed");
   }
 
-  cdm = NEW(cd_message_t);
+  cdm = (cd_message_t *)malloc(sizeof(cd_message_t));
   DEQ_ITEM_INIT(cdm);
-  cdm->mbuf = pn_bytes(bufsize, (char *) malloc(bufsize));
+  cdm->mbuf = pn_rwbytes(bufsize, (char *)malloc(bufsize));
   cdm->instance = instance;
 
   switch (instance->format) {
@@ -377,14 +433,14 @@ static int amqp1_write(const data_set_t *ds, const value_list_t *vl, /* {{{ */
   case AMQP1_FORMAT_JSON:
     format_json_initialize((char *)cdm->mbuf.start, &bfill, &bfree);
     format_json_value_list((char *)cdm->mbuf.start, &bfill, &bfree, ds, vl,
-                             instance->store_rates);
+                           instance->store_rates);
     format_json_finalize((char *)cdm->mbuf.start, &bfill, &bfree);
     cdm->mbuf.size = strlen(cdm->mbuf.start);
     break;
   case AMQP1_FORMAT_GRAPHITE:
-    status =
-        format_graphite((char *)cdm->mbuf.start, bufsize, ds, vl, instance->prefix,
-                        instance->postfix, instance->escape_char, instance->graphite_flags);
+    status = format_graphite((char *)cdm->mbuf.start, bufsize, ds, vl,
+                             instance->prefix, instance->postfix,
+                             instance->escape_char, instance->graphite_flags);
     if (status != 0) {
       ERROR("amqp1 plugin: format_graphite failed with status %i.", status);
       return status;
@@ -434,7 +490,7 @@ static void amqp1_config_instance_free(void *ptr) /* {{{ */
 
 static int amqp1_config_instance(oconfig_item_t *ci) /* {{{ */
 {
-  int  status=0;
+  int status = 0;
   char *key = NULL;
   amqp1_config_instance_t *instance;
 
@@ -444,9 +500,6 @@ static int amqp1_config_instance(oconfig_item_t *ci) /* {{{ */
     return ENOMEM;
   }
 
-  /* Initialize instance configuration {{{ */
-  instance->name = NULL;
-
   status = cf_util_get_string(ci, &instance->name);
   if (status != 0) {
     sfree(instance);
@@ -463,8 +516,8 @@ static int amqp1_config_instance(oconfig_item_t *ci) /* {{{ */
     else if (strcasecmp("Format", child->key) == 0) {
       status = cf_util_get_string(child, &key);
       if (status != 0)
-          return status;
-          /* TODO: goto errout */
+        return status;
+      /* TODO: goto errout */
       //          goto errout;
       assert(key != NULL);
       if (strcasecmp(key, "Command") == 0) {
@@ -477,8 +530,7 @@ static int amqp1_config_instance(oconfig_item_t *ci) /* {{{ */
         WARNING("amqp1 plugin: Invalid format string: %s", key);
       }
       sfree(key);
-    }
-    else if (strcasecmp("StoreRates", child->key) == 0)
+    } else if (strcasecmp("StoreRates", child->key) == 0)
       status = cf_util_get_boolean(child, &instance->store_rates);
     else if (strcasecmp("GraphiteSeparateInstances", child->key) == 0)
       status = cf_util_get_flag(child, &instance->graphite_flags,
@@ -496,16 +548,18 @@ static int amqp1_config_instance(oconfig_item_t *ci) /* {{{ */
     else if (strcasecmp("GraphiteEscapeChar", child->key) == 0) {
       char *tmp_buff = NULL;
       status = cf_util_get_string(child, &tmp_buff);
-      if (strlen(tmp_buff) > 1)
-        WARNING("amqp1 plugin: The option \"GraphiteEscapeChar\" handles "
-                "only one character. Others will be ignored.");
-      instance->escape_char = tmp_buff[0];
+      if (status == 0) {
+        if (strlen(tmp_buff) > 1)
+          WARNING("amqp1 plugin: The option \"GraphiteEscapeChar\" handles "
+                  "only one character. Others will be ignored.");
+        instance->escape_char = tmp_buff[0];
+      }
       sfree(tmp_buff);
-    }
-    else
+    } else
       WARNING("amqp1 plugin: Ignoring unknown "
               "instance configuration option "
-              "\%s\".", child->key);
+              "\%s\".",
+              child->key);
     if (status != 0)
       break;
   }
@@ -514,16 +568,30 @@ static int amqp1_config_instance(oconfig_item_t *ci) /* {{{ */
     amqp1_config_instance_free(instance);
     return status;
   } else {
-    char tpname[128];
-    snprintf(tpname, sizeof(tpname), "amqp1/%s", instance->name);
-    snprintf(instance->send_to, sizeof(instance->send_to), "/%s/%s",
-             transport->address,instance->name);
+    char tpname[DATA_MAX_NAME_LEN];
+    status = snprintf(tpname, sizeof(tpname), "amqp1/%s", instance->name);
+    if ((status < 0) || (size_t)status >= sizeof(tpname)) {
+      ERROR("amqp1 plugin: Instance name would have been truncated.");
+      return -1;
+    }
+    status = snprintf(instance->send_to, sizeof(instance->send_to), "/%s/%s",
+                      transport->address, instance->name);
+    if ((status < 0) || (size_t)status >= sizeof(instance->send_to)) {
+      ERROR("amqp1 plugin: send_to address would have been truncated.");
+      return -1;
+    }
     if (instance->notify == true) {
-      status = plugin_register_notification(tpname, amqp1_notify, &(user_data_t) {
-              .data = instance, .free_func = amqp1_config_instance_free, });
+      status = plugin_register_notification(
+          tpname, amqp1_notify,
+          &(user_data_t){
+              .data = instance, .free_func = amqp1_config_instance_free,
+          });
     } else {
-      status = plugin_register_write(tpname, amqp1_write, &(user_data_t) {
-              .data = instance, .free_func = amqp1_config_instance_free, });
+      status = plugin_register_write(
+          tpname, amqp1_write,
+          &(user_data_t){
+              .data = instance, .free_func = amqp1_config_instance_free,
+          });
     }
 
     if (status != 0) {
@@ -536,7 +604,7 @@ static int amqp1_config_instance(oconfig_item_t *ci) /* {{{ */
 
 static int amqp1_config_transport(oconfig_item_t *ci) /* {{{ */
 {
-  int status=0;
+  int status = 0;
 
   transport = calloc(1, sizeof(*transport));
   if (transport == NULL) {
@@ -545,7 +613,7 @@ static int amqp1_config_transport(oconfig_item_t *ci) /* {{{ */
   }
 
   /* Initialize transport configuration {{{ */
-  transport->name = NULL;
+  transport->retry_delay = 1;
 
   status = cf_util_get_string(ci, &transport->name);
   if (status != 0) {
@@ -566,12 +634,15 @@ static int amqp1_config_transport(oconfig_item_t *ci) /* {{{ */
       status = cf_util_get_string(child, &transport->password);
     else if (strcasecmp("Address", child->key) == 0)
       status = cf_util_get_string(child, &transport->address);
-    else if (strcasecmp("Instance",child->key) == 0)
+    else if (strcasecmp("RetryDelay", child->key) == 0)
+      status = cf_util_get_int(child, &transport->retry_delay);
+    else if (strcasecmp("Instance", child->key) == 0)
       amqp1_config_instance(child);
     else
       WARNING("amqp1 plugin: Ignoring unknown "
               "transport configuration option "
-              "\%s\".", child->key);
+              "\%s\".",
+              child->key);
 
     if (status != 0)
       break;
@@ -581,7 +652,7 @@ static int amqp1_config_transport(oconfig_item_t *ci) /* {{{ */
     amqp1_config_transport_free(transport);
   }
   return status;
-}  /* }}} int amqp1_config_transport */
+} /* }}} int amqp1_config_transport */
 
 static int amqp1_config(oconfig_item_t *ci) /* {{{ */
 {
@@ -592,7 +663,7 @@ static int amqp1_config(oconfig_item_t *ci) /* {{{ */
     if (strcasecmp("Transport", child->key) == 0)
       amqp1_config_transport(child);
     else
-      WARNING("amqp1 plugin: Ignoring unknown config iption \%s\".",
+      WARNING("amqp1 plugin: Ignoring unknown config option \%s\".",
               child->key);
   }
 
@@ -601,8 +672,7 @@ static int amqp1_config(oconfig_item_t *ci) /* {{{ */
 
 static int amqp1_init(void) /* {{{ */
 {
-  char addr[PN_MAX_ADDR];
-  int  status;
+  int status;
   char errbuf[1024];
 
   if (transport == NULL) {
@@ -612,20 +682,12 @@ static int amqp1_init(void) /* {{{ */
 
   if (proactor == NULL) {
     pthread_mutex_init(&send_lock, /* attr = */ NULL);
-    proactor = pn_proactor();
-    pn_proactor_addr(addr, sizeof(addr),transport->host,transport->port);
-    conn = pn_connection();
-    if (transport->user != NULL) {
-        pn_connection_set_user(conn, transport->user);
-        pn_connection_set_password(conn, transport->password);
-    }
-    pn_proactor_connect(proactor, conn, addr);
     /* start_thread */
-    status = plugin_thread_create(&event_thread_id, NULL /* no attributes */,
-                                  event_thread, NULL /* no argument */,
-                                  "handle");
+    status =
+        plugin_thread_create(&event_thread_id, NULL /* no attributes */,
+                             event_thread, NULL /* no argument */, "handle");
     if (status != 0) {
-      ERROR("amqp1: pthread_create failed: %s",
+      ERROR("amqp1 plugin: pthread_create failed: %s",
             sstrerror(errno, errbuf, sizeof(errbuf)));
     } else {
       event_thread_running = 1;
@@ -636,28 +698,17 @@ static int amqp1_init(void) /* {{{ */
 
 static int amqp1_shutdown(void) /* {{{ */
 {
-  cd_message_t *cdm;
+  stopping = true;
 
   /* Stop the proactor thread */
-  if (event_thread_running != 0) {
-    finished=true;
-    /* activate the event thread */
+  if (event_thread_running == 1) {
+    DEBUG("amqp1 plugin: Shutting down proactor thread.");
     pn_connection_wake(conn);
-    pthread_join(event_thread_id, NULL /* no return value */);
-    memset(&event_thread_id, 0, sizeof(event_thread_id));
-  }
-
-  /* Free the remaining out_messages */
-  cdm = DEQ_HEAD(out_messages);
-  while (cdm) {
-    DEQ_REMOVE_HEAD(out_messages);
-    cd_message_free(cdm);
-    cdm = DEQ_HEAD(out_messages);
   }
+  pthread_join(event_thread_id, NULL /* no return value */);
+  memset(&event_thread_id, 0, sizeof(event_thread_id));
 
-  if (proactor != NULL) {
-    pn_proactor_free(proactor);
-  }
+  DEBUG("amqp1 plugin: proactor thread exited.");
 
   if (transport != NULL) {
     amqp1_config_transport_free(transport);
@@ -666,9 +717,8 @@ static int amqp1_shutdown(void) /* {{{ */
   return 0;
 } /* }}} int amqp1_shutdown */
 
-void module_register(void)
-{
+void module_register(void) {
   plugin_register_complex_config("amqp1", amqp1_config);
   plugin_register_init("amqp1", amqp1_init);
-  plugin_register_shutdown("amqp1",amqp1_shutdown);
+  plugin_register_shutdown("amqp1", amqp1_shutdown);
 } /* void module_register */