Add connection retry
authorAndrew Smith <ansmith@redhat.com>
Mon, 16 Apr 2018 13:50:21 +0000 (09:50 -0400)
committerAndrew Smith <ansmith@redhat.com>
Mon, 16 Apr 2018 13:50:21 +0000 (09:50 -0400)
src/amqp1.c
src/collectd.conf.in
src/collectd.conf.pod

index d7be877..3397f52 100644 (file)
@@ -62,6 +62,7 @@ typedef struct amqp1_config_transport_t {
   char *user;
   char *password;
   char *address;
+  int  retry_delay;
 } amqp1_config_transport_t;
 
 typedef struct amqp1_config_instance_t {
@@ -92,7 +93,6 @@ DEQ_DECLARE(cd_message_t, cd_message_list_t);
  * Globals
  */
 pn_connection_t *conn = NULL;
-pn_session_t *ssn = NULL;
 pn_link_t *sender = NULL;
 pn_proactor_t *proactor = NULL;
 pthread_mutex_t send_lock;
@@ -100,8 +100,8 @@ cd_message_list_t out_messages;
 uint64_t cd_tag = 1;
 uint64_t acknowledged = 0;
 amqp1_config_transport_t *transport = NULL;
-bool finished = false;
 
+static bool stopping = false;
 static int event_thread_running = 0;
 static pthread_t event_thread_id;
 
@@ -124,6 +124,10 @@ static int amqp1_send_out_messages(pn_link_t *link) /* {{{ */
   int event_count = 0;
   pn_delivery_t *dlv;
 
+  if (stopping){
+    return 0;
+  }
+
   DEQ_INIT(to_send);
 
   pthread_mutex_lock(&send_lock);
@@ -162,6 +166,7 @@ static int amqp1_send_out_messages(pn_link_t *link) /* {{{ */
   return event_count;
 } /* }}} int amqp1_send_out_messages */
 
+
 static void check_condition(pn_event_t *e, pn_condition_t *cond) /* {{{ */
 {
   if (pn_condition_is_set(cond)) {
@@ -181,7 +186,7 @@ static bool handle(pn_event_t *event) /* {{{ */
     conn = pn_event_connection(event);
     pn_connection_set_container(conn, transport->address);
     pn_connection_open(conn);
-    ssn = pn_session(conn);
+    pn_session_t *ssn = pn_session(conn);
     pn_session_open(ssn);
     sender = pn_sender(ssn, "cd-sender");
     pn_link_set_snd_settle_mode(sender, PN_SND_MIXED);
@@ -206,7 +211,7 @@ static bool handle(pn_event_t *event) /* {{{ */
   }
 
   case PN_CONNECTION_WAKE: {
-    if (!finished) {
+    if (!stopping) {
       amqp1_send_out_messages(sender);
     }
     break;
@@ -250,17 +255,53 @@ static bool handle(pn_event_t *event) /* {{{ */
 
 static void *event_thread(void __attribute__((unused)) * arg) /* {{{ */
 {
+  char addr[PN_MAX_ADDR];
+  cd_message_t *cdm;
+
+  /* setup proactor */
+  proactor = pn_proactor();
+  pn_proactor_addr(addr, sizeof(addr), transport->host, transport->port);
+
+  while (!stopping) {
+    /* make connection */
+    conn = pn_connection();
+    if (transport->user != NULL) {
+      pn_connection_set_user(conn, transport->user);
+      pn_connection_set_password(conn, transport->password);
+    }
+    pn_proactor_connect(proactor, conn, addr);
 
-  do {
-    pn_event_batch_t *events = pn_proactor_wait(proactor);
-    pn_event_t *e;
-    for (e = pn_event_batch_next(events); e; e = pn_event_batch_next(events)) {
-      if (!handle(e)) {
-        finished = true;
+    bool engine_running = true;
+    while (engine_running && !stopping) {
+      pn_event_batch_t *events = pn_proactor_wait(proactor);
+      pn_event_t *e;
+      while (( e = pn_event_batch_next(events))){
+        engine_running = handle(e);
+        if (!engine_running) {
+          break;
+        }
       }
+      pn_proactor_done(proactor, events);
+    }
+
+    pn_proactor_release_connection(conn);
+
+    DEBUG("amqp1 plugin: retrying connection");
+    int delay = transport->retry_delay;
+    while (delay-- > 0 && !stopping) {
+      sleep(1.0);
     }
-    pn_proactor_done(proactor, events);
-  } while (!finished);
+  }
+
+  pn_proactor_disconnect(proactor, NULL);
+
+  /* Free the remaining out_messages */
+  cdm = DEQ_HEAD(out_messages);
+  while (cdm) {
+    DEQ_REMOVE_HEAD(out_messages);
+    cd_message_free(cdm);
+    cdm = DEQ_HEAD(out_messages);
+  }
 
   event_thread_running = 0;
 
@@ -554,6 +595,7 @@ static int amqp1_config_transport(oconfig_item_t *ci) /* {{{ */
 
   /* Initialize transport configuration {{{ */
   transport->name = NULL;
+  transport->retry_delay = 1;
 
   status = cf_util_get_string(ci, &transport->name);
   if (status != 0) {
@@ -574,6 +616,8 @@ static int amqp1_config_transport(oconfig_item_t *ci) /* {{{ */
       status = cf_util_get_string(child, &transport->password);
     else if (strcasecmp("Address", child->key) == 0)
       status = cf_util_get_string(child, &transport->address);
+    else if (strcasecmp("RetryDelay", child->key) == 0)
+      status = cf_util_get_int(child, &transport->retry_delay);
     else if (strcasecmp("Instance", child->key) == 0)
       amqp1_config_instance(child);
     else
@@ -610,7 +654,6 @@ static int amqp1_config(oconfig_item_t *ci) /* {{{ */
 
 static int amqp1_init(void) /* {{{ */
 {
-  char addr[PN_MAX_ADDR];
   int status;
   char errbuf[1024];
 
@@ -621,20 +664,12 @@ static int amqp1_init(void) /* {{{ */
 
   if (proactor == NULL) {
     pthread_mutex_init(&send_lock, /* attr = */ NULL);
-    proactor = pn_proactor();
-    pn_proactor_addr(addr, sizeof(addr), transport->host, transport->port);
-    conn = pn_connection();
-    if (transport->user != NULL) {
-      pn_connection_set_user(conn, transport->user);
-      pn_connection_set_password(conn, transport->password);
-    }
-    pn_proactor_connect(proactor, conn, addr);
     /* start_thread */
     status =
         plugin_thread_create(&event_thread_id, NULL /* no attributes */,
                              event_thread, NULL /* no argument */, "handle");
     if (status != 0) {
-      ERROR("amqp1: pthread_create failed: %s",
+      ERROR("amqp1 plugin: pthread_create failed: %s",
             sstrerror(errno, errbuf, sizeof(errbuf)));
     } else {
       event_thread_running = 1;
@@ -645,28 +680,17 @@ static int amqp1_init(void) /* {{{ */
 
 static int amqp1_shutdown(void) /* {{{ */
 {
-  cd_message_t *cdm;
+  stopping = true;
 
   /* Stop the proactor thread */
-  if (event_thread_running != 0) {
-    finished = true;
-    /* activate the event thread */
+  if (event_thread_running == 1) {
+    DEBUG("amqp1 plugin: Shutting down proactor thread.");
     pn_connection_wake(conn);
-    pthread_join(event_thread_id, NULL /* no return value */);
-    memset(&event_thread_id, 0, sizeof(event_thread_id));
-  }
-
-  /* Free the remaining out_messages */
-  cdm = DEQ_HEAD(out_messages);
-  while (cdm) {
-    DEQ_REMOVE_HEAD(out_messages);
-    cd_message_free(cdm);
-    cdm = DEQ_HEAD(out_messages);
   }
+  pthread_join(event_thread_id, NULL /* no return value */);
+  memset(&event_thread_id, 0, sizeof(event_thread_id));
 
-  if (proactor != NULL) {
-    pn_proactor_free(proactor);
-  }
+  DEBUG("amqp1 plugin: proactor thread exited.");
 
   if (transport != NULL) {
     amqp1_config_transport_free(transport);
index ce1c00b..4d8403a 100644 (file)
 #    User "guest"
 #    Password "guest"
 #    Address "collectd"
+#    RetryDelay 1
 #    <Instance "log">
 #        Format JSON
 #        PreSettle false
index 4d6c38e..f0f51da 100644 (file)
@@ -756,6 +756,7 @@ B<Synopsis:>
     User "guest"
     Password "guest"
     Address "collectd"
+#    RetryDelay 1
     <Instance "some_name">
         Format "command"
         PreSettle false
@@ -806,6 +807,12 @@ default "guest"/"guest" is used.
 This option specifies the prefix for the send-to value in the message.
 By default, "collectd" will be used.
 
+=item B<RetryDelay> I<RetryDelay>
+
+When the AMQP1 connection is lost, defines the time in seconds to wait
+before attempting to reconnect. Defaults to 1, which implies attempt
+to reconnect at 1 second intervals.
+
 =back
 
 The following options are accepted within each I<Instance> block: