amqp1: clean it up a little
authorRuben Kerkhof <ruben@rubenkerkhof.com>
Thu, 24 May 2018 11:47:59 +0000 (13:47 +0200)
committerRuben Kerkhof <ruben@rubenkerkhof.com>
Thu, 24 May 2018 11:47:59 +0000 (13:47 +0200)
A few things changed during the time this plugin was pending review.
Update the code to adhere to these changes.

src/amqp1.c

index eecd300..7cbdb5f 100644 (file)
@@ -307,25 +307,21 @@ static void *event_thread(void __attribute__((unused)) * arg) /* {{{ */
 static int encqueue(cd_message_t *cdm,
                     amqp1_config_instance_t *instance) /* {{{ */
 {
-  size_t bufsize = BUFSIZE;
-  pn_data_t *body;
-  pn_message_t *message;
-  int status = 0;
-
   /* encode message */
-  message = pn_message();
+  pn_message_t *message = pn_message();
   pn_message_set_address(message, instance->send_to);
-  body = pn_message_body(message);
+  pn_data_t *body = pn_message_body(message);
   pn_data_clear(body);
   pn_data_put_binary(body, pn_bytes(cdm->mbuf.size, cdm->mbuf.start));
   pn_data_exit(body);
 
   /* put_binary copies and stores so ok to use mbuf */
-  cdm->mbuf.size = bufsize;
-  while ((status = pn_message_encode(message, (char *)cdm->mbuf.start,
+  cdm->mbuf.size = BUFSIZE;
+
+  int status;
+  while ((status = pn_message_encode(message, cdm->mbuf.start,
                                      &cdm->mbuf.size)) == PN_OVERFLOW) {
-    DEBUG("amqp1 plugin: increasing message buffer size %i",
-          (int)cdm->mbuf.size);
+    DEBUG("amqp1 plugin: increasing message buffer size %zu", cdm->mbuf.size);
     cdm->mbuf.size *= 2;
     cdm->mbuf.start = (char *)realloc(cdm->mbuf.start, cdm->mbuf.size);
   }
@@ -345,7 +341,7 @@ static int encqueue(cd_message_t *cdm,
   pn_message_free(message);
 
   /* activate the sender */
-  if (conn != NULL) {
+  if (conn) {
     pn_connection_wake(conn);
   }
 
@@ -355,31 +351,28 @@ static int encqueue(cd_message_t *cdm,
 static int amqp1_notify(notification_t const *n,
                         user_data_t *user_data) /* {{{ */
 {
-  amqp1_config_instance_t *instance;
-  int status = 0;
   size_t bfree = BUFSIZE;
   size_t bfill = 0;
-  cd_message_t *cdm;
   size_t bufsize = BUFSIZE;
 
-  if ((n == NULL) || (user_data == NULL))
+  if (n == NULL || user_data == NULL)
     return EINVAL;
 
-  instance = user_data->data;
+  amqp1_config_instance_t *instance = user_data->data;
 
   if (instance->notify != true) {
     ERROR("amqp1 plugin: write notification failed");
   }
 
-  cdm = (cd_message_t *)malloc(sizeof(cd_message_t));
+  cd_message_t *cdm = malloc(sizeof(*cdm));
   DEQ_ITEM_INIT(cdm);
-  cdm->mbuf = pn_rwbytes(bufsize, (char *)malloc(bufsize));
+  cdm->mbuf = pn_rwbytes(bufsize, malloc(bufsize));
   cdm->instance = instance;
 
   switch (instance->format) {
   case AMQP1_FORMAT_JSON:
-    format_json_initialize((char *)cdm->mbuf.start, &bfill, &bfree);
-    status = format_json_notification((char *)cdm->mbuf.start, bufsize, n);
+    format_json_initialize(cdm->mbuf.start, &bfill, &bfree);
+    int status = format_json_notification(cdm->mbuf.start, bufsize, n);
     if (status != 0) {
       ERROR("amqp1 plugin: formatting notification failed");
       return status;
@@ -392,33 +385,29 @@ static int amqp1_notify(notification_t const *n,
   }
 
   /* encode message and place on outbound queue */
-  status = encqueue(cdm, instance);
+  return encqueue(cdm, instance);
 
-  return status;
 } /* }}} int amqp1_notify */
 
 static int amqp1_write(const data_set_t *ds, const value_list_t *vl, /* {{{ */
                        user_data_t *user_data) {
-  amqp1_config_instance_t *instance;
   int status = 0;
   size_t bfree = BUFSIZE;
   size_t bfill = 0;
-  cd_message_t *cdm;
   size_t bufsize = BUFSIZE;
 
-  if ((ds == NULL) || (vl == NULL) || (transport == NULL) ||
-      (user_data == NULL))
+  if (ds == NULL || vl == NULL || transport == NULL || user_data == NULL)
     return EINVAL;
 
-  instance = user_data->data;
+  amqp1_config_instance_t *instance = user_data->data;
 
   if (instance->notify != false) {
     ERROR("amqp1 plugin: write failed");
   }
 
-  cdm = (cd_message_t *)malloc(sizeof(cd_message_t));
+  cd_message_t *cdm = malloc(sizeof(*cdm));
   DEQ_ITEM_INIT(cdm);
-  cdm->mbuf = pn_rwbytes(bufsize, (char *)malloc(bufsize));
+  cdm->mbuf = pn_rwbytes(bufsize, malloc(bufsize));
   cdm->instance = instance;
 
   switch (instance->format) {
@@ -452,10 +441,9 @@ static int amqp1_write(const data_set_t *ds, const value_list_t *vl, /* {{{ */
     return -1;
   }
 
-  /* encode message and place on outboud queue */
-  encqueue(cdm, instance);
+  /* encode message and place on outbound queue */
+  return encqueue(cdm, instance);
 
-  return 0;
 } /* }}} int amqp1_write */
 
 static void amqp1_config_transport_free(void *ptr) /* {{{ */
@@ -490,17 +478,13 @@ static void amqp1_config_instance_free(void *ptr) /* {{{ */
 
 static int amqp1_config_instance(oconfig_item_t *ci) /* {{{ */
 {
-  int status = 0;
-  char *key = NULL;
-  amqp1_config_instance_t *instance;
-
-  instance = calloc(1, sizeof(*instance));
+  amqp1_config_instance_t *instance = calloc(1, sizeof(*instance));
   if (instance == NULL) {
     ERROR("amqp1 plugin: calloc failed.");
     return ENOMEM;
   }
 
-  status = cf_util_get_string(ci, &instance->name);
+  int status = cf_util_get_string(ci, &instance->name);
   if (status != 0) {
     sfree(instance);
     return status;
@@ -514,11 +498,10 @@ static int amqp1_config_instance(oconfig_item_t *ci) /* {{{ */
     else if (strcasecmp("Notify", child->key) == 0)
       status = cf_util_get_boolean(child, &instance->notify);
     else if (strcasecmp("Format", child->key) == 0) {
+      char *key = NULL;
       status = cf_util_get_string(child, &key);
       if (status != 0)
         return status;
-      /* TODO: goto errout */
-      //          goto errout;
       assert(key != NULL);
       if (strcasecmp(key, "Command") == 0) {
         instance->format = AMQP1_FORMAT_COMMAND;
@@ -580,7 +563,7 @@ static int amqp1_config_instance(oconfig_item_t *ci) /* {{{ */
       ERROR("amqp1 plugin: send_to address would have been truncated.");
       return -1;
     }
-    if (instance->notify == true) {
+    if (instance->notify) {
       status = plugin_register_notification(
           tpname, amqp1_notify,
           &(user_data_t){
@@ -604,8 +587,6 @@ static int amqp1_config_instance(oconfig_item_t *ci) /* {{{ */
 
 static int amqp1_config_transport(oconfig_item_t *ci) /* {{{ */
 {
-  int status = 0;
-
   transport = calloc(1, sizeof(*transport));
   if (transport == NULL) {
     ERROR("amqp1 plugin: calloc failed.");
@@ -615,7 +596,7 @@ static int amqp1_config_transport(oconfig_item_t *ci) /* {{{ */
   /* Initialize transport configuration {{{ */
   transport->retry_delay = 1;
 
-  status = cf_util_get_string(ci, &transport->name);
+  int status = cf_util_get_string(ci, &transport->name);
   if (status != 0) {
     sfree(transport);
     return status;
@@ -672,9 +653,6 @@ static int amqp1_config(oconfig_item_t *ci) /* {{{ */
 
 static int amqp1_init(void) /* {{{ */
 {
-  int status;
-  char errbuf[1024];
-
   if (transport == NULL) {
     ERROR("amqp1: init failed, no transport configured");
     return -1;
@@ -683,12 +661,11 @@ static int amqp1_init(void) /* {{{ */
   if (proactor == NULL) {
     pthread_mutex_init(&send_lock, /* attr = */ NULL);
     /* start_thread */
-    status =
+    int status =
         plugin_thread_create(&event_thread_id, NULL /* no attributes */,
                              event_thread, NULL /* no argument */, "handle");
     if (status != 0) {
-      ERROR("amqp1 plugin: pthread_create failed: %s",
-            sstrerror(errno, errbuf, sizeof(errbuf)));
+      ERROR("amqp1 plugin: pthread_create failed: %s", STRERRNO);
     } else {
       event_thread_running = true;
     }
@@ -710,7 +687,7 @@ static int amqp1_shutdown(void) /* {{{ */
 
   DEBUG("amqp1 plugin: proactor thread exited.");
 
-  if (transport != NULL) {
+  if (transport) {
     amqp1_config_transport_free(transport);
   }