Fix compile time issues
[collectd.git] / src / amqp1.c
index 3397f52..67c96b7 100644 (file)
 
 #include "collectd.h"
 
-#include "common.h"
 #include "plugin.h"
-#include "utils_cmd_putval.h"
-#include "utils_deq.h"
-#include "utils_format_graphite.h"
-#include "utils_format_json.h"
+#include "utils/cmds/putval.h"
+#include "utils/common/common.h"
+#include "utils/deq/deq.h"
+#include "utils/format_graphite/format_graphite.h"
+#include "utils/format_json/format_json.h"
 #include "utils_random.h"
 
 #include <proton/condition.h>
 #define AMQP1_FORMAT_COMMAND 1
 #define AMQP1_FORMAT_GRAPHITE 2
 
-typedef struct amqp1_config_transport_t {
-  DEQ_LINKS(struct amqp1_config_transport_t);
+typedef struct amqp1_config_transport_s {
+  DEQ_LINKS(struct amqp1_config_transport_s);
   char *name;
   char *host;
   char *port;
   char *user;
   char *password;
   char *address;
-  int  retry_delay;
+  int retry_delay;
 } amqp1_config_transport_t;
 
-typedef struct amqp1_config_instance_t {
-  DEQ_LINKS(struct amqp1_config_instance_t);
+typedef struct amqp1_config_instance_s {
+  DEQ_LINKS(struct amqp1_config_instance_s);
   char *name;
-  _Bool notify;
+  bool notify;
   uint8_t format;
   unsigned int graphite_flags;
-  _Bool store_rates;
+  bool store_rates;
   char *prefix;
   char *postfix;
   char escape_char;
-  _Bool pre_settle;
-  char send_to[128];
+  bool pre_settle;
+  char send_to[1024];
 } amqp1_config_instance_t;
 
 DEQ_DECLARE(amqp1_config_instance_t, amqp1_config_instance_list_t);
 
-typedef struct cd_message_t {
-  DEQ_LINKS(struct cd_message_t);
-  pn_bytes_t mbuf;
+typedef struct cd_message_s {
+  DEQ_LINKS(struct cd_message_s);
+  pn_rwbytes_t mbuf;
   amqp1_config_instance_t *instance;
 } cd_message_t;
 
@@ -92,26 +92,23 @@ DEQ_DECLARE(cd_message_t, cd_message_list_t);
 /*
  * Globals
  */
-pn_connection_t *conn = NULL;
-pn_link_t *sender = NULL;
-pn_proactor_t *proactor = NULL;
-pthread_mutex_t send_lock;
-cd_message_list_t out_messages;
-uint64_t cd_tag = 1;
-uint64_t acknowledged = 0;
-amqp1_config_transport_t *transport = NULL;
-
-static bool stopping = false;
-static int event_thread_running = 0;
+static pn_connection_t *conn;
+static pn_link_t *sender;
+static pn_proactor_t *proactor;
+static pthread_mutex_t send_lock;
+static cd_message_list_t out_messages;
+static uint64_t cd_tag = 1;
+static uint64_t acknowledged;
+static amqp1_config_transport_t *transport;
+static bool stopping;
+static bool event_thread_running;
 static pthread_t event_thread_id;
 
 /*
  * Functions
  */
 static void cd_message_free(cd_message_t *cdm) {
-  if (cdm->mbuf.start) {
-    free((void *)cdm->mbuf.start);
-  }
+  free(cdm->mbuf.start);
   free(cdm);
 } /* }}} void cd_message_free */
 
@@ -124,7 +121,7 @@ static int amqp1_send_out_messages(pn_link_t *link) /* {{{ */
   int event_count = 0;
   pn_delivery_t *dlv;
 
-  if (stopping){
+  if (stopping) {
     return 0;
   }
 
@@ -138,7 +135,7 @@ static int amqp1_send_out_messages(pn_link_t *link) /* {{{ */
     while (cdm) {
       DEQ_REMOVE_HEAD(out_messages);
       DEQ_INSERT_TAIL(to_send, cdm);
-      if (DEQ_SIZE(to_send) == link_credit)
+      if (DEQ_SIZE(to_send) == (size_t)link_credit)
         break;
       cdm = DEQ_HEAD(out_messages);
     }
@@ -166,7 +163,6 @@ static int amqp1_send_out_messages(pn_link_t *link) /* {{{ */
   return event_count;
 } /* }}} int amqp1_send_out_messages */
 
-
 static void check_condition(pn_event_t *e, pn_condition_t *cond) /* {{{ */
 {
   if (pn_condition_is_set(cond)) {
@@ -184,7 +180,7 @@ static bool handle(pn_event_t *event) /* {{{ */
 
   case PN_CONNECTION_INIT: {
     conn = pn_event_connection(event);
-    pn_connection_set_container(conn, transport->address);
+    pn_connection_set_container(conn, transport->name);
     pn_connection_open(conn);
     pn_session_t *ssn = pn_session(conn);
     pn_session_open(ssn);
@@ -275,7 +271,7 @@ static void *event_thread(void __attribute__((unused)) * arg) /* {{{ */
     while (engine_running && !stopping) {
       pn_event_batch_t *events = pn_proactor_wait(proactor);
       pn_event_t *e;
-      while (( e = pn_event_batch_next(events))){
+      while ((e = pn_event_batch_next(events))) {
         engine_running = handle(e);
         if (!engine_running) {
           break;
@@ -303,29 +299,46 @@ static void *event_thread(void __attribute__((unused)) * arg) /* {{{ */
     cdm = DEQ_HEAD(out_messages);
   }
 
-  event_thread_running = 0;
+  event_thread_running = false;
 
   return NULL;
 } /* }}} void event_thread */
 
-static void encqueue(cd_message_t *cdm,
-                     amqp1_config_instance_t *instance) /* {{{ */
+static int encqueue(cd_message_t *cdm,
+                    amqp1_config_instance_t *instance) /* {{{ */
 {
-  size_t bufsize = BUFSIZE;
-  pn_data_t *body;
-  pn_message_t *message;
-
   /* encode message */
-  message = pn_message();
+  pn_message_t *message = pn_message();
   pn_message_set_address(message, instance->send_to);
-  body = pn_message_body(message);
+  pn_data_t *body = pn_message_body(message);
   pn_data_clear(body);
-  pn_data_put_binary(body, cdm->mbuf);
+  pn_data_put_binary(body, pn_bytes(cdm->mbuf.size, cdm->mbuf.start));
   pn_data_exit(body);
 
   /* put_binary copies and stores so ok to use mbuf */
-  cdm->mbuf.size = bufsize;
-  pn_message_encode(message, (char *)cdm->mbuf.start, &cdm->mbuf.size);
+  cdm->mbuf.size = BUFSIZE;
+
+  int status;
+  char *start;
+  while ((status = pn_message_encode(message, cdm->mbuf.start,
+                                     &cdm->mbuf.size)) == PN_OVERFLOW) {
+    DEBUG("amqp1 plugin: increasing message buffer size %zu", cdm->mbuf.size);
+    cdm->mbuf.size *= 2;
+    start = realloc(cdm->mbuf.start, cdm->mbuf.size);
+    if (start == NULL) {
+      status = -1;
+      break;
+    } else {
+      cdm->mbuf.start = start;
+    }
+  }
+
+  if (status != 0) {
+    ERROR("amqp1 plugin: error encoding message: %s",
+          pn_error_text(pn_message_error(message)));
+    pn_message_free(message);
+    return -1;
+  }
 
   pthread_mutex_lock(&send_lock);
   DEQ_INSERT_TAIL(out_messages, cdm);
@@ -334,79 +347,109 @@ static void encqueue(cd_message_t *cdm,
   pn_message_free(message);
 
   /* activate the sender */
-  if (conn != NULL) {
+  if (conn) {
     pn_connection_wake(conn);
   }
 
-} /* }}} void encqueue */
+  return 0;
+} /* }}} int encqueue */
 
 static int amqp1_notify(notification_t const *n,
                         user_data_t *user_data) /* {{{ */
 {
-  amqp1_config_instance_t *instance;
   int status = 0;
   size_t bfree = BUFSIZE;
   size_t bfill = 0;
-  cd_message_t *cdm;
   size_t bufsize = BUFSIZE;
 
-  if ((n == NULL) || (user_data == NULL))
+  if (n == NULL || user_data == NULL)
     return EINVAL;
 
-  instance = user_data->data;
+  amqp1_config_instance_t *instance = user_data->data;
 
   if (instance->notify != true) {
     ERROR("amqp1 plugin: write notification failed");
   }
 
-  cdm = NEW(cd_message_t);
+  cd_message_t *cdm = malloc(sizeof(*cdm));
+  if (cdm == NULL) {
+    ERROR("amqp1 plugin: notify failed");
+    return -1;
+  }
+
   DEQ_ITEM_INIT(cdm);
-  cdm->mbuf = pn_bytes(bufsize, (char *)malloc(bufsize));
+  char *start = malloc(bufsize);
+  if (start == NULL) {
+    ERROR("amqp1 plugin: malloc failed");
+    free(cdm);
+    return -1;
+  }
+  cdm->mbuf.size = bufsize;
+  cdm->mbuf.start = start;
   cdm->instance = instance;
 
   switch (instance->format) {
   case AMQP1_FORMAT_JSON:
-    format_json_initialize((char *)cdm->mbuf.start, &bfill, &bfree);
-    status = format_json_notification((char *)cdm->mbuf.start, bufsize, n);
+    format_json_initialize(cdm->mbuf.start, &bfill, &bfree);
+    status = format_json_notification(cdm->mbuf.start, bufsize, n);
     if (status != 0) {
       ERROR("amqp1 plugin: formatting notification failed");
+      cd_message_free(cdm);
       return status;
     }
     cdm->mbuf.size = strlen(cdm->mbuf.start);
+    if (cdm->mbuf.size >= BUFSIZE) {
+      ERROR("amqp1 plugin: notify format json failed");
+      cd_message_free(cdm);
+      return -1;
+    }
     break;
   default:
     ERROR("amqp1 plugin: Invalid notify format (%i).", instance->format);
+    cd_message_free(cdm);
     return -1;
   }
 
   /* encode message and place on outbound queue */
-  encqueue(cdm, instance);
+  status = encqueue(cdm, instance);
+  if (status != 0) {
+    ERROR("amqp1 plugin: notify enqueue failed");
+    cd_message_free(cdm);
+  }
+  return status;
 
-  return 0;
 } /* }}} int amqp1_notify */
 
 static int amqp1_write(const data_set_t *ds, const value_list_t *vl, /* {{{ */
                        user_data_t *user_data) {
-  amqp1_config_instance_t *instance;
   int status = 0;
   size_t bfree = BUFSIZE;
   size_t bfill = 0;
-  cd_message_t *cdm;
   size_t bufsize = BUFSIZE;
 
-  if ((ds == NULL) || (vl == NULL) || (transport == NULL) ||
-      (user_data == NULL))
+  if (ds == NULL || vl == NULL || transport == NULL || user_data == NULL)
     return EINVAL;
 
-  instance = user_data->data;
+  amqp1_config_instance_t *instance = user_data->data;
 
   if (instance->notify != false) {
     ERROR("amqp1 plugin: write failed");
   }
 
-  cdm = NEW(cd_message_t);
+  cd_message_t *cdm = malloc(sizeof(*cdm));
+  if (cdm == NULL) {
+    ERROR("amqp1 plugin: malloc failed.");
+    return -1;
+  }
   DEQ_ITEM_INIT(cdm);
-  cdm->mbuf = pn_bytes(bufsize, (char *)malloc(bufsize));
+  char *start = malloc(bufsize);
+  if (start == NULL) {
+    ERROR("amqp1 plugin: malloc failed.");
+    free(cdm);
+    return -1;
+  }
+  cdm->mbuf.size = bufsize;
+  cdm->mbuf.start = start;
   cdm->instance = instance;
 
   switch (instance->format) {
@@ -414,16 +457,33 @@ static int amqp1_write(const data_set_t *ds, const value_list_t *vl, /* {{{ */
     status = cmd_create_putval((char *)cdm->mbuf.start, bufsize, ds, vl);
     if (status != 0) {
       ERROR("amqp1 plugin: cmd_create_putval failed with status %i.", status);
+      cd_message_free(cdm);
       return status;
     }
     cdm->mbuf.size = strlen(cdm->mbuf.start);
+    if (cdm->mbuf.size >= BUFSIZE) {
+      ERROR("amqp1 plugin: format cmd failed");
+      cd_message_free(cdm);
+      return -1;
+    }
     break;
   case AMQP1_FORMAT_JSON:
     format_json_initialize((char *)cdm->mbuf.start, &bfill, &bfree);
     format_json_value_list((char *)cdm->mbuf.start, &bfill, &bfree, ds, vl,
                            instance->store_rates);
-    format_json_finalize((char *)cdm->mbuf.start, &bfill, &bfree);
+    status = format_json_finalize((char *)cdm->mbuf.start, &bfill, &bfree);
+    if (status != 0) {
+      ERROR("amqp1 plugin: format_json_finalize failed with status %i.",
+            status);
+      cd_message_free(cdm);
+      return status;
+    }
     cdm->mbuf.size = strlen(cdm->mbuf.start);
+    if (cdm->mbuf.size >= BUFSIZE) {
+      ERROR("amqp1 plugin: format json failed");
+      cd_message_free(cdm);
+      return -1;
+    }
     break;
   case AMQP1_FORMAT_GRAPHITE:
     status = format_graphite((char *)cdm->mbuf.start, bufsize, ds, vl,
@@ -431,19 +491,30 @@ static int amqp1_write(const data_set_t *ds, const value_list_t *vl, /* {{{ */
                              instance->escape_char, instance->graphite_flags);
     if (status != 0) {
       ERROR("amqp1 plugin: format_graphite failed with status %i.", status);
+      cd_message_free(cdm);
       return status;
     }
     cdm->mbuf.size = strlen(cdm->mbuf.start);
+    if (cdm->mbuf.size >= BUFSIZE) {
+      ERROR("amqp1 plugin: format graphite failed");
+      cd_message_free(cdm);
+      return -1;
+    }
     break;
   default:
     ERROR("amqp1 plugin: Invalid write format (%i).", instance->format);
+    cd_message_free(cdm);
     return -1;
   }
 
-  /* encode message and place on outboud queue */
-  encqueue(cdm, instance);
+  /* encode message and place on outbound queue */
+  status = encqueue(cdm, instance);
+  if (status != 0) {
+    ERROR("amqp1 plugin: write enqueue failed");
+    cd_message_free(cdm);
+  }
+  return status;
 
-  return 0;
 } /* }}} int amqp1_write */
 
 static void amqp1_config_transport_free(void *ptr) /* {{{ */
@@ -455,6 +526,7 @@ static void amqp1_config_transport_free(void *ptr) /* {{{ */
 
   sfree(transport->name);
   sfree(transport->host);
+  sfree(transport->port);
   sfree(transport->user);
   sfree(transport->password);
   sfree(transport->address);
@@ -478,20 +550,13 @@ static void amqp1_config_instance_free(void *ptr) /* {{{ */
 
 static int amqp1_config_instance(oconfig_item_t *ci) /* {{{ */
 {
-  int status = 0;
-  char *key = NULL;
-  amqp1_config_instance_t *instance;
-
-  instance = calloc(1, sizeof(*instance));
+  amqp1_config_instance_t *instance = calloc(1, sizeof(*instance));
   if (instance == NULL) {
     ERROR("amqp1 plugin: calloc failed.");
     return ENOMEM;
   }
 
-  /* Initialize instance configuration {{{ */
-  instance->name = NULL;
-
-  status = cf_util_get_string(ci, &instance->name);
+  int status = cf_util_get_string(ci, &instance->name);
   if (status != 0) {
     sfree(instance);
     return status;
@@ -505,11 +570,10 @@ static int amqp1_config_instance(oconfig_item_t *ci) /* {{{ */
     else if (strcasecmp("Notify", child->key) == 0)
       status = cf_util_get_boolean(child, &instance->notify);
     else if (strcasecmp("Format", child->key) == 0) {
+      char *key = NULL;
       status = cf_util_get_string(child, &key);
       if (status != 0)
         return status;
-      /* TODO: goto errout */
-      //          goto errout;
       assert(key != NULL);
       if (strcasecmp(key, "Command") == 0) {
         instance->format = AMQP1_FORMAT_COMMAND;
@@ -539,15 +603,17 @@ static int amqp1_config_instance(oconfig_item_t *ci) /* {{{ */
     else if (strcasecmp("GraphiteEscapeChar", child->key) == 0) {
       char *tmp_buff = NULL;
       status = cf_util_get_string(child, &tmp_buff);
-      if (strlen(tmp_buff) > 1)
-        WARNING("amqp1 plugin: The option \"GraphiteEscapeChar\" handles "
-                "only one character. Others will be ignored.");
-      instance->escape_char = tmp_buff[0];
+      if (status == 0) {
+        if (strlen(tmp_buff) > 1)
+          WARNING("amqp1 plugin: The option \"GraphiteEscapeChar\" handles "
+                  "only one character. Others will be ignored.");
+        instance->escape_char = tmp_buff[0];
+      }
       sfree(tmp_buff);
     } else
       WARNING("amqp1 plugin: Ignoring unknown "
               "instance configuration option "
-              "\%s\".",
+              "\"%s\".",
               child->key);
     if (status != 0)
       break;
@@ -557,22 +623,32 @@ static int amqp1_config_instance(oconfig_item_t *ci) /* {{{ */
     amqp1_config_instance_free(instance);
     return status;
   } else {
-    char tpname[128];
-    snprintf(tpname, sizeof(tpname), "amqp1/%s", instance->name);
-    snprintf(instance->send_to, sizeof(instance->send_to), "/%s/%s",
-             transport->address, instance->name);
-    if (instance->notify == true) {
+    char tpname[DATA_MAX_NAME_LEN];
+    status = ssnprintf(tpname, sizeof(tpname), "amqp1/%s", instance->name);
+    if ((status < 0) || (size_t)status >= sizeof(tpname)) {
+      ERROR("amqp1 plugin: Instance name would have been truncated.");
+      return -1;
+    }
+    status = ssnprintf(instance->send_to, sizeof(instance->send_to), "/%s/%s",
+                       transport->address, instance->name);
+    if ((status < 0) || (size_t)status >= sizeof(instance->send_to)) {
+      ERROR("amqp1 plugin: send_to address would have been truncated.");
+      return -1;
+    }
+    if (instance->notify) {
       status = plugin_register_notification(
           tpname, amqp1_notify,
           &(user_data_t){
-              .data = instance, .free_func = amqp1_config_instance_free,
+              .data = instance,
+              .free_func = amqp1_config_instance_free,
           });
     } else {
-      status = plugin_register_write(
-          tpname, amqp1_write,
-          &(user_data_t){
-              .data = instance, .free_func = amqp1_config_instance_free,
-          });
+      status =
+          plugin_register_write(tpname, amqp1_write,
+                                &(user_data_t){
+                                    .data = instance,
+                                    .free_func = amqp1_config_instance_free,
+                                });
     }
 
     if (status != 0) {
@@ -585,8 +661,6 @@ static int amqp1_config_instance(oconfig_item_t *ci) /* {{{ */
 
 static int amqp1_config_transport(oconfig_item_t *ci) /* {{{ */
 {
-  int status = 0;
-
   transport = calloc(1, sizeof(*transport));
   if (transport == NULL) {
     ERROR("amqp1 plugin: calloc failed.");
@@ -594,10 +668,9 @@ static int amqp1_config_transport(oconfig_item_t *ci) /* {{{ */
   }
 
   /* Initialize transport configuration {{{ */
-  transport->name = NULL;
   transport->retry_delay = 1;
 
-  status = cf_util_get_string(ci, &transport->name);
+  int status = cf_util_get_string(ci, &transport->name);
   if (status != 0) {
     sfree(transport);
     return status;
@@ -623,7 +696,7 @@ static int amqp1_config_transport(oconfig_item_t *ci) /* {{{ */
     else
       WARNING("amqp1 plugin: Ignoring unknown "
               "transport configuration option "
-              "\%s\".",
+              "\"%s\".",
               child->key);
 
     if (status != 0)
@@ -645,7 +718,7 @@ static int amqp1_config(oconfig_item_t *ci) /* {{{ */
     if (strcasecmp("Transport", child->key) == 0)
       amqp1_config_transport(child);
     else
-      WARNING("amqp1 plugin: Ignoring unknown config iption \%s\".",
+      WARNING("amqp1 plugin: Ignoring unknown config option \"%s\".",
               child->key);
   }
 
@@ -654,9 +727,6 @@ static int amqp1_config(oconfig_item_t *ci) /* {{{ */
 
 static int amqp1_init(void) /* {{{ */
 {
-  int status;
-  char errbuf[1024];
-
   if (transport == NULL) {
     ERROR("amqp1: init failed, no transport configured");
     return -1;
@@ -665,14 +735,13 @@ static int amqp1_init(void) /* {{{ */
   if (proactor == NULL) {
     pthread_mutex_init(&send_lock, /* attr = */ NULL);
     /* start_thread */
-    status =
+    int status =
         plugin_thread_create(&event_thread_id, NULL /* no attributes */,
                              event_thread, NULL /* no argument */, "handle");
     if (status != 0) {
-      ERROR("amqp1 plugin: pthread_create failed: %s",
-            sstrerror(errno, errbuf, sizeof(errbuf)));
+      ERROR("amqp1 plugin: pthread_create failed: %s", STRERRNO);
     } else {
-      event_thread_running = 1;
+      event_thread_running = true;
     }
   }
   return 0;
@@ -683,7 +752,7 @@ static int amqp1_shutdown(void) /* {{{ */
   stopping = true;
 
   /* Stop the proactor thread */
-  if (event_thread_running == 1) {
+  if (event_thread_running) {
     DEBUG("amqp1 plugin: Shutting down proactor thread.");
     pn_connection_wake(conn);
   }
@@ -692,7 +761,7 @@ static int amqp1_shutdown(void) /* {{{ */
 
   DEBUG("amqp1 plugin: proactor thread exited.");
 
-  if (transport != NULL) {
+  if (transport) {
     amqp1_config_transport_free(transport);
   }