octo code review changes
[collectd.git] / src / amqp1.c
index e60142b..67c96b7 100644 (file)
 
 #include "collectd.h"
 
-#include "common.h"
 #include "plugin.h"
-#include "utils_cmd_putval.h"
-#include "utils_deq.h"
-#include "utils_format_graphite.h"
-#include "utils_format_json.h"
+#include "utils/cmds/putval.h"
+#include "utils/common/common.h"
+#include "utils/deq/deq.h"
+#include "utils/format_graphite/format_graphite.h"
+#include "utils/format_json/format_json.h"
 #include "utils_random.h"
 
 #include <proton/condition.h>
@@ -135,7 +135,7 @@ static int amqp1_send_out_messages(pn_link_t *link) /* {{{ */
     while (cdm) {
       DEQ_REMOVE_HEAD(out_messages);
       DEQ_INSERT_TAIL(to_send, cdm);
-      if (DEQ_SIZE(to_send) == link_credit)
+      if (DEQ_SIZE(to_send) == (size_t)link_credit)
         break;
       cdm = DEQ_HEAD(out_messages);
     }
@@ -319,18 +319,24 @@ static int encqueue(cd_message_t *cdm,
   cdm->mbuf.size = BUFSIZE;
 
   int status;
+  char *start;
   while ((status = pn_message_encode(message, cdm->mbuf.start,
                                      &cdm->mbuf.size)) == PN_OVERFLOW) {
     DEBUG("amqp1 plugin: increasing message buffer size %zu", cdm->mbuf.size);
     cdm->mbuf.size *= 2;
-    cdm->mbuf.start = realloc(cdm->mbuf.start, cdm->mbuf.size);
+    start = realloc(cdm->mbuf.start, cdm->mbuf.size);
+    if (start == NULL) {
+      status = -1;
+      break;
+    } else {
+      cdm->mbuf.start = start;
+    }
   }
 
   if (status != 0) {
     ERROR("amqp1 plugin: error encoding message: %s",
           pn_error_text(pn_message_error(message)));
     pn_message_free(message);
-    cd_message_free(cdm);
     return -1;
   }
 
@@ -351,6 +357,7 @@ static int encqueue(cd_message_t *cdm,
 static int amqp1_notify(notification_t const *n,
                         user_data_t *user_data) /* {{{ */
 {
+  int status = 0;
   size_t bfree = BUFSIZE;
   size_t bfill = 0;
   size_t bufsize = BUFSIZE;
@@ -365,27 +372,51 @@ static int amqp1_notify(notification_t const *n,
   }
 
   cd_message_t *cdm = malloc(sizeof(*cdm));
+  if (cdm == NULL) {
+    ERROR("amqp1 plugin: notify failed");
+    return -1;
+  }
+
   DEQ_ITEM_INIT(cdm);
-  cdm->mbuf = pn_rwbytes(bufsize, malloc(bufsize));
+  char *start = malloc(bufsize);
+  if (start == NULL) {
+    ERROR("amqp1 plugin: malloc failed");
+    free(cdm);
+    return -1;
+  }
+  cdm->mbuf.size = bufsize;
+  cdm->mbuf.start = start;
   cdm->instance = instance;
 
   switch (instance->format) {
   case AMQP1_FORMAT_JSON:
     format_json_initialize(cdm->mbuf.start, &bfill, &bfree);
-    int status = format_json_notification(cdm->mbuf.start, bufsize, n);
+    status = format_json_notification(cdm->mbuf.start, bufsize, n);
     if (status != 0) {
       ERROR("amqp1 plugin: formatting notification failed");
+      cd_message_free(cdm);
       return status;
     }
     cdm->mbuf.size = strlen(cdm->mbuf.start);
+    if (cdm->mbuf.size >= BUFSIZE) {
+      ERROR("amqp1 plugin: notify format json failed");
+      cd_message_free(cdm);
+      return -1;
+    }
     break;
   default:
     ERROR("amqp1 plugin: Invalid notify format (%i).", instance->format);
+    cd_message_free(cdm);
     return -1;
   }
 
   /* encode message and place on outbound queue */
-  return encqueue(cdm, instance);
+  status = encqueue(cdm, instance);
+  if (status != 0) {
+    ERROR("amqp1 plugin: notify enqueue failed");
+    cd_message_free(cdm);
+  }
+  return status;
 
 } /* }}} int amqp1_notify */
 
@@ -406,8 +437,19 @@ static int amqp1_write(const data_set_t *ds, const value_list_t *vl, /* {{{ */
   }
 
   cd_message_t *cdm = malloc(sizeof(*cdm));
+  if (cdm == NULL) {
+    ERROR("amqp1 plugin: malloc failed.");
+    return -1;
+  }
   DEQ_ITEM_INIT(cdm);
-  cdm->mbuf = pn_rwbytes(bufsize, malloc(bufsize));
+  char *start = malloc(bufsize);
+  if (start == NULL) {
+    ERROR("amqp1 plugin: malloc failed.");
+    free(cdm);
+    return -1;
+  }
+  cdm->mbuf.size = bufsize;
+  cdm->mbuf.start = start;
   cdm->instance = instance;
 
   switch (instance->format) {
@@ -415,16 +457,33 @@ static int amqp1_write(const data_set_t *ds, const value_list_t *vl, /* {{{ */
     status = cmd_create_putval((char *)cdm->mbuf.start, bufsize, ds, vl);
     if (status != 0) {
       ERROR("amqp1 plugin: cmd_create_putval failed with status %i.", status);
+      cd_message_free(cdm);
       return status;
     }
     cdm->mbuf.size = strlen(cdm->mbuf.start);
+    if (cdm->mbuf.size >= BUFSIZE) {
+      ERROR("amqp1 plugin: format cmd failed");
+      cd_message_free(cdm);
+      return -1;
+    }
     break;
   case AMQP1_FORMAT_JSON:
     format_json_initialize((char *)cdm->mbuf.start, &bfill, &bfree);
     format_json_value_list((char *)cdm->mbuf.start, &bfill, &bfree, ds, vl,
                            instance->store_rates);
-    format_json_finalize((char *)cdm->mbuf.start, &bfill, &bfree);
+    status = format_json_finalize((char *)cdm->mbuf.start, &bfill, &bfree);
+    if (status != 0) {
+      ERROR("amqp1 plugin: format_json_finalize failed with status %i.",
+            status);
+      cd_message_free(cdm);
+      return status;
+    }
     cdm->mbuf.size = strlen(cdm->mbuf.start);
+    if (cdm->mbuf.size >= BUFSIZE) {
+      ERROR("amqp1 plugin: format json failed");
+      cd_message_free(cdm);
+      return -1;
+    }
     break;
   case AMQP1_FORMAT_GRAPHITE:
     status = format_graphite((char *)cdm->mbuf.start, bufsize, ds, vl,
@@ -432,17 +491,29 @@ static int amqp1_write(const data_set_t *ds, const value_list_t *vl, /* {{{ */
                              instance->escape_char, instance->graphite_flags);
     if (status != 0) {
       ERROR("amqp1 plugin: format_graphite failed with status %i.", status);
+      cd_message_free(cdm);
       return status;
     }
     cdm->mbuf.size = strlen(cdm->mbuf.start);
+    if (cdm->mbuf.size >= BUFSIZE) {
+      ERROR("amqp1 plugin: format graphite failed");
+      cd_message_free(cdm);
+      return -1;
+    }
     break;
   default:
     ERROR("amqp1 plugin: Invalid write format (%i).", instance->format);
+    cd_message_free(cdm);
     return -1;
   }
 
   /* encode message and place on outbound queue */
-  return encqueue(cdm, instance);
+  status = encqueue(cdm, instance);
+  if (status != 0) {
+    ERROR("amqp1 plugin: write enqueue failed");
+    cd_message_free(cdm);
+  }
+  return status;
 
 } /* }}} int amqp1_write */
 
@@ -455,6 +526,7 @@ static void amqp1_config_transport_free(void *ptr) /* {{{ */
 
   sfree(transport->name);
   sfree(transport->host);
+  sfree(transport->port);
   sfree(transport->user);
   sfree(transport->password);
   sfree(transport->address);
@@ -541,7 +613,7 @@ static int amqp1_config_instance(oconfig_item_t *ci) /* {{{ */
     } else
       WARNING("amqp1 plugin: Ignoring unknown "
               "instance configuration option "
-              "\%s\".",
+              "\"%s\".",
               child->key);
     if (status != 0)
       break;
@@ -552,13 +624,13 @@ static int amqp1_config_instance(oconfig_item_t *ci) /* {{{ */
     return status;
   } else {
     char tpname[DATA_MAX_NAME_LEN];
-    status = snprintf(tpname, sizeof(tpname), "amqp1/%s", instance->name);
+    status = ssnprintf(tpname, sizeof(tpname), "amqp1/%s", instance->name);
     if ((status < 0) || (size_t)status >= sizeof(tpname)) {
       ERROR("amqp1 plugin: Instance name would have been truncated.");
       return -1;
     }
-    status = snprintf(instance->send_to, sizeof(instance->send_to), "/%s/%s",
-                      transport->address, instance->name);
+    status = ssnprintf(instance->send_to, sizeof(instance->send_to), "/%s/%s",
+                       transport->address, instance->name);
     if ((status < 0) || (size_t)status >= sizeof(instance->send_to)) {
       ERROR("amqp1 plugin: send_to address would have been truncated.");
       return -1;
@@ -567,14 +639,16 @@ static int amqp1_config_instance(oconfig_item_t *ci) /* {{{ */
       status = plugin_register_notification(
           tpname, amqp1_notify,
           &(user_data_t){
-              .data = instance, .free_func = amqp1_config_instance_free,
+              .data = instance,
+              .free_func = amqp1_config_instance_free,
           });
     } else {
-      status = plugin_register_write(
-          tpname, amqp1_write,
-          &(user_data_t){
-              .data = instance, .free_func = amqp1_config_instance_free,
-          });
+      status =
+          plugin_register_write(tpname, amqp1_write,
+                                &(user_data_t){
+                                    .data = instance,
+                                    .free_func = amqp1_config_instance_free,
+                                });
     }
 
     if (status != 0) {
@@ -622,7 +696,7 @@ static int amqp1_config_transport(oconfig_item_t *ci) /* {{{ */
     else
       WARNING("amqp1 plugin: Ignoring unknown "
               "transport configuration option "
-              "\%s\".",
+              "\"%s\".",
               child->key);
 
     if (status != 0)
@@ -644,7 +718,7 @@ static int amqp1_config(oconfig_item_t *ci) /* {{{ */
     if (strcasecmp("Transport", child->key) == 0)
       amqp1_config_transport(child);
     else
-      WARNING("amqp1 plugin: Ignoring unknown config option \%s\".",
+      WARNING("amqp1 plugin: Ignoring unknown config option \"%s\".",
               child->key);
   }