Merge pull request #2797 from rubenk/amqp-cleanups
authorRuben Kerkhof <ruben@rubenkerkhof.com>
Thu, 24 May 2018 18:24:28 +0000 (20:24 +0200)
committerGitHub <noreply@github.com>
Thu, 24 May 2018 18:24:28 +0000 (20:24 +0200)
Amqp1 cleanups

src/amqp1.c

index 4ba7359..e60142b 100644 (file)
@@ -92,16 +92,16 @@ DEQ_DECLARE(cd_message_t, cd_message_list_t);
 /*
  * Globals
  */
-static pn_connection_t *conn = NULL;
-static pn_link_t *sender = NULL;
-static pn_proactor_t *proactor = NULL;
+static pn_connection_t *conn;
+static pn_link_t *sender;
+static pn_proactor_t *proactor;
 static pthread_mutex_t send_lock;
 static cd_message_list_t out_messages;
 static uint64_t cd_tag = 1;
-static uint64_t acknowledged = 0;
-static amqp1_config_transport_t *transport = NULL;
-static bool stopping = false;
-static int event_thread_running = 0;
+static uint64_t acknowledged;
+static amqp1_config_transport_t *transport;
+static bool stopping;
+static bool event_thread_running;
 static pthread_t event_thread_id;
 
 /*
@@ -299,7 +299,7 @@ static void *event_thread(void __attribute__((unused)) * arg) /* {{{ */
     cdm = DEQ_HEAD(out_messages);
   }
 
-  event_thread_running = 0;
+  event_thread_running = false;
 
   return NULL;
 } /* }}} void event_thread */
@@ -307,27 +307,23 @@ static void *event_thread(void __attribute__((unused)) * arg) /* {{{ */
 static int encqueue(cd_message_t *cdm,
                     amqp1_config_instance_t *instance) /* {{{ */
 {
-  size_t bufsize = BUFSIZE;
-  pn_data_t *body;
-  pn_message_t *message;
-  int status = 0;
-
   /* encode message */
-  message = pn_message();
+  pn_message_t *message = pn_message();
   pn_message_set_address(message, instance->send_to);
-  body = pn_message_body(message);
+  pn_data_t *body = pn_message_body(message);
   pn_data_clear(body);
   pn_data_put_binary(body, pn_bytes(cdm->mbuf.size, cdm->mbuf.start));
   pn_data_exit(body);
 
   /* put_binary copies and stores so ok to use mbuf */
-  cdm->mbuf.size = bufsize;
-  while ((status = pn_message_encode(message, (char *)cdm->mbuf.start,
+  cdm->mbuf.size = BUFSIZE;
+
+  int status;
+  while ((status = pn_message_encode(message, cdm->mbuf.start,
                                      &cdm->mbuf.size)) == PN_OVERFLOW) {
-    DEBUG("amqp1 plugin: increasing message buffer size %i",
-          (int)cdm->mbuf.size);
+    DEBUG("amqp1 plugin: increasing message buffer size %zu", cdm->mbuf.size);
     cdm->mbuf.size *= 2;
-    cdm->mbuf.start = (char *)realloc(cdm->mbuf.start, cdm->mbuf.size);
+    cdm->mbuf.start = realloc(cdm->mbuf.start, cdm->mbuf.size);
   }
 
   if (status != 0) {
@@ -345,7 +341,7 @@ static int encqueue(cd_message_t *cdm,
   pn_message_free(message);
 
   /* activate the sender */
-  if (conn != NULL) {
+  if (conn) {
     pn_connection_wake(conn);
   }
 
@@ -355,31 +351,28 @@ static int encqueue(cd_message_t *cdm,
 static int amqp1_notify(notification_t const *n,
                         user_data_t *user_data) /* {{{ */
 {
-  amqp1_config_instance_t *instance;
-  int status = 0;
   size_t bfree = BUFSIZE;
   size_t bfill = 0;
-  cd_message_t *cdm;
   size_t bufsize = BUFSIZE;
 
-  if ((n == NULL) || (user_data == NULL))
+  if (n == NULL || user_data == NULL)
     return EINVAL;
 
-  instance = user_data->data;
+  amqp1_config_instance_t *instance = user_data->data;
 
   if (instance->notify != true) {
     ERROR("amqp1 plugin: write notification failed");
   }
 
-  cdm = (cd_message_t *)malloc(sizeof(cd_message_t));
+  cd_message_t *cdm = malloc(sizeof(*cdm));
   DEQ_ITEM_INIT(cdm);
-  cdm->mbuf = pn_rwbytes(bufsize, (char *)malloc(bufsize));
+  cdm->mbuf = pn_rwbytes(bufsize, malloc(bufsize));
   cdm->instance = instance;
 
   switch (instance->format) {
   case AMQP1_FORMAT_JSON:
-    format_json_initialize((char *)cdm->mbuf.start, &bfill, &bfree);
-    status = format_json_notification((char *)cdm->mbuf.start, bufsize, n);
+    format_json_initialize(cdm->mbuf.start, &bfill, &bfree);
+    int status = format_json_notification(cdm->mbuf.start, bufsize, n);
     if (status != 0) {
       ERROR("amqp1 plugin: formatting notification failed");
       return status;
@@ -392,33 +385,29 @@ static int amqp1_notify(notification_t const *n,
   }
 
   /* encode message and place on outbound queue */
-  status = encqueue(cdm, instance);
+  return encqueue(cdm, instance);
 
-  return status;
 } /* }}} int amqp1_notify */
 
 static int amqp1_write(const data_set_t *ds, const value_list_t *vl, /* {{{ */
                        user_data_t *user_data) {
-  amqp1_config_instance_t *instance;
   int status = 0;
   size_t bfree = BUFSIZE;
   size_t bfill = 0;
-  cd_message_t *cdm;
   size_t bufsize = BUFSIZE;
 
-  if ((ds == NULL) || (vl == NULL) || (transport == NULL) ||
-      (user_data == NULL))
+  if (ds == NULL || vl == NULL || transport == NULL || user_data == NULL)
     return EINVAL;
 
-  instance = user_data->data;
+  amqp1_config_instance_t *instance = user_data->data;
 
   if (instance->notify != false) {
     ERROR("amqp1 plugin: write failed");
   }
 
-  cdm = (cd_message_t *)malloc(sizeof(cd_message_t));
+  cd_message_t *cdm = malloc(sizeof(*cdm));
   DEQ_ITEM_INIT(cdm);
-  cdm->mbuf = pn_rwbytes(bufsize, (char *)malloc(bufsize));
+  cdm->mbuf = pn_rwbytes(bufsize, malloc(bufsize));
   cdm->instance = instance;
 
   switch (instance->format) {
@@ -452,10 +441,9 @@ static int amqp1_write(const data_set_t *ds, const value_list_t *vl, /* {{{ */
     return -1;
   }
 
-  /* encode message and place on outboud queue */
-  encqueue(cdm, instance);
+  /* encode message and place on outbound queue */
+  return encqueue(cdm, instance);
 
-  return 0;
 } /* }}} int amqp1_write */
 
 static void amqp1_config_transport_free(void *ptr) /* {{{ */
@@ -490,17 +478,13 @@ static void amqp1_config_instance_free(void *ptr) /* {{{ */
 
 static int amqp1_config_instance(oconfig_item_t *ci) /* {{{ */
 {
-  int status = 0;
-  char *key = NULL;
-  amqp1_config_instance_t *instance;
-
-  instance = calloc(1, sizeof(*instance));
+  amqp1_config_instance_t *instance = calloc(1, sizeof(*instance));
   if (instance == NULL) {
     ERROR("amqp1 plugin: calloc failed.");
     return ENOMEM;
   }
 
-  status = cf_util_get_string(ci, &instance->name);
+  int status = cf_util_get_string(ci, &instance->name);
   if (status != 0) {
     sfree(instance);
     return status;
@@ -514,11 +498,10 @@ static int amqp1_config_instance(oconfig_item_t *ci) /* {{{ */
     else if (strcasecmp("Notify", child->key) == 0)
       status = cf_util_get_boolean(child, &instance->notify);
     else if (strcasecmp("Format", child->key) == 0) {
+      char *key = NULL;
       status = cf_util_get_string(child, &key);
       if (status != 0)
         return status;
-      /* TODO: goto errout */
-      //          goto errout;
       assert(key != NULL);
       if (strcasecmp(key, "Command") == 0) {
         instance->format = AMQP1_FORMAT_COMMAND;
@@ -580,7 +563,7 @@ static int amqp1_config_instance(oconfig_item_t *ci) /* {{{ */
       ERROR("amqp1 plugin: send_to address would have been truncated.");
       return -1;
     }
-    if (instance->notify == true) {
+    if (instance->notify) {
       status = plugin_register_notification(
           tpname, amqp1_notify,
           &(user_data_t){
@@ -604,8 +587,6 @@ static int amqp1_config_instance(oconfig_item_t *ci) /* {{{ */
 
 static int amqp1_config_transport(oconfig_item_t *ci) /* {{{ */
 {
-  int status = 0;
-
   transport = calloc(1, sizeof(*transport));
   if (transport == NULL) {
     ERROR("amqp1 plugin: calloc failed.");
@@ -615,7 +596,7 @@ static int amqp1_config_transport(oconfig_item_t *ci) /* {{{ */
   /* Initialize transport configuration {{{ */
   transport->retry_delay = 1;
 
-  status = cf_util_get_string(ci, &transport->name);
+  int status = cf_util_get_string(ci, &transport->name);
   if (status != 0) {
     sfree(transport);
     return status;
@@ -672,9 +653,6 @@ static int amqp1_config(oconfig_item_t *ci) /* {{{ */
 
 static int amqp1_init(void) /* {{{ */
 {
-  int status;
-  char errbuf[1024];
-
   if (transport == NULL) {
     ERROR("amqp1: init failed, no transport configured");
     return -1;
@@ -683,14 +661,13 @@ static int amqp1_init(void) /* {{{ */
   if (proactor == NULL) {
     pthread_mutex_init(&send_lock, /* attr = */ NULL);
     /* start_thread */
-    status =
+    int status =
         plugin_thread_create(&event_thread_id, NULL /* no attributes */,
                              event_thread, NULL /* no argument */, "handle");
     if (status != 0) {
-      ERROR("amqp1 plugin: pthread_create failed: %s",
-            sstrerror(errno, errbuf, sizeof(errbuf)));
+      ERROR("amqp1 plugin: pthread_create failed: %s", STRERRNO);
     } else {
-      event_thread_running = 1;
+      event_thread_running = true;
     }
   }
   return 0;
@@ -701,7 +678,7 @@ static int amqp1_shutdown(void) /* {{{ */
   stopping = true;
 
   /* Stop the proactor thread */
-  if (event_thread_running == 1) {
+  if (event_thread_running) {
     DEBUG("amqp1 plugin: Shutting down proactor thread.");
     pn_connection_wake(conn);
   }
@@ -710,7 +687,7 @@ static int amqp1_shutdown(void) /* {{{ */
 
   DEBUG("amqp1 plugin: proactor thread exited.");
 
-  if (transport != NULL) {
+  if (transport) {
     amqp1_config_transport_free(transport);
   }