X-Git-Url: https://git.octo.it/?a=blobdiff_plain;f=src%2Famqp1.c;h=67c96b750256550c90fee2776a338b170695c058;hb=21c84cec32921e6de8feaa5496f337496379ea23;hp=a6a2d35f8b20784541825335f11c4f680367e233;hpb=979b210145a48a04db82fe3d3daa3c67c51df28f;p=collectd.git diff --git a/src/amqp1.c b/src/amqp1.c index a6a2d35f..67c96b75 100644 --- a/src/amqp1.c +++ b/src/amqp1.c @@ -26,12 +26,12 @@ #include "collectd.h" -#include "common.h" #include "plugin.h" -#include "utils_cmd_putval.h" -#include "utils_deq.h" -#include "utils_format_graphite.h" -#include "utils_format_json.h" +#include "utils/cmds/putval.h" +#include "utils/common/common.h" +#include "utils/deq/deq.h" +#include "utils/format_graphite/format_graphite.h" +#include "utils/format_json/format_json.h" #include "utils_random.h" #include @@ -319,18 +319,24 @@ static int encqueue(cd_message_t *cdm, cdm->mbuf.size = BUFSIZE; int status; + char *start; while ((status = pn_message_encode(message, cdm->mbuf.start, &cdm->mbuf.size)) == PN_OVERFLOW) { DEBUG("amqp1 plugin: increasing message buffer size %zu", cdm->mbuf.size); cdm->mbuf.size *= 2; - cdm->mbuf.start = realloc(cdm->mbuf.start, cdm->mbuf.size); + start = realloc(cdm->mbuf.start, cdm->mbuf.size); + if (start == NULL) { + status = -1; + break; + } else { + cdm->mbuf.start = start; + } } if (status != 0) { ERROR("amqp1 plugin: error encoding message: %s", pn_error_text(pn_message_error(message))); pn_message_free(message); - cd_message_free(cdm); return -1; } @@ -351,6 +357,7 @@ static int encqueue(cd_message_t *cdm, static int amqp1_notify(notification_t const *n, user_data_t *user_data) /* {{{ */ { + int status = 0; size_t bfree = BUFSIZE; size_t bfill = 0; size_t bufsize = BUFSIZE; @@ -365,27 +372,51 @@ static int amqp1_notify(notification_t const *n, } cd_message_t *cdm = malloc(sizeof(*cdm)); + if (cdm == NULL) { + ERROR("amqp1 plugin: notify failed"); + return -1; + } + DEQ_ITEM_INIT(cdm); - cdm->mbuf = pn_rwbytes(bufsize, malloc(bufsize)); + char *start = malloc(bufsize); + if (start == NULL) { + ERROR("amqp1 plugin: malloc failed"); + free(cdm); + return -1; + } + cdm->mbuf.size = bufsize; + cdm->mbuf.start = start; cdm->instance = instance; switch (instance->format) { case AMQP1_FORMAT_JSON: format_json_initialize(cdm->mbuf.start, &bfill, &bfree); - int status = format_json_notification(cdm->mbuf.start, bufsize, n); + status = format_json_notification(cdm->mbuf.start, bufsize, n); if (status != 0) { ERROR("amqp1 plugin: formatting notification failed"); + cd_message_free(cdm); return status; } cdm->mbuf.size = strlen(cdm->mbuf.start); + if (cdm->mbuf.size >= BUFSIZE) { + ERROR("amqp1 plugin: notify format json failed"); + cd_message_free(cdm); + return -1; + } break; default: ERROR("amqp1 plugin: Invalid notify format (%i).", instance->format); + cd_message_free(cdm); return -1; } /* encode message and place on outbound queue */ - return encqueue(cdm, instance); + status = encqueue(cdm, instance); + if (status != 0) { + ERROR("amqp1 plugin: notify enqueue failed"); + cd_message_free(cdm); + } + return status; } /* }}} int amqp1_notify */ @@ -406,8 +437,19 @@ static int amqp1_write(const data_set_t *ds, const value_list_t *vl, /* {{{ */ } cd_message_t *cdm = malloc(sizeof(*cdm)); + if (cdm == NULL) { + ERROR("amqp1 plugin: malloc failed."); + return -1; + } DEQ_ITEM_INIT(cdm); - cdm->mbuf = pn_rwbytes(bufsize, malloc(bufsize)); + char *start = malloc(bufsize); + if (start == NULL) { + ERROR("amqp1 plugin: malloc failed."); + free(cdm); + return -1; + } + cdm->mbuf.size = bufsize; + cdm->mbuf.start = start; cdm->instance = instance; switch (instance->format) { @@ -415,16 +457,33 @@ static int amqp1_write(const data_set_t *ds, const value_list_t *vl, /* {{{ */ status = cmd_create_putval((char *)cdm->mbuf.start, bufsize, ds, vl); if (status != 0) { ERROR("amqp1 plugin: cmd_create_putval failed with status %i.", status); + cd_message_free(cdm); return status; } cdm->mbuf.size = strlen(cdm->mbuf.start); + if (cdm->mbuf.size >= BUFSIZE) { + ERROR("amqp1 plugin: format cmd failed"); + cd_message_free(cdm); + return -1; + } break; case AMQP1_FORMAT_JSON: format_json_initialize((char *)cdm->mbuf.start, &bfill, &bfree); format_json_value_list((char *)cdm->mbuf.start, &bfill, &bfree, ds, vl, instance->store_rates); - format_json_finalize((char *)cdm->mbuf.start, &bfill, &bfree); + status = format_json_finalize((char *)cdm->mbuf.start, &bfill, &bfree); + if (status != 0) { + ERROR("amqp1 plugin: format_json_finalize failed with status %i.", + status); + cd_message_free(cdm); + return status; + } cdm->mbuf.size = strlen(cdm->mbuf.start); + if (cdm->mbuf.size >= BUFSIZE) { + ERROR("amqp1 plugin: format json failed"); + cd_message_free(cdm); + return -1; + } break; case AMQP1_FORMAT_GRAPHITE: status = format_graphite((char *)cdm->mbuf.start, bufsize, ds, vl, @@ -432,17 +491,29 @@ static int amqp1_write(const data_set_t *ds, const value_list_t *vl, /* {{{ */ instance->escape_char, instance->graphite_flags); if (status != 0) { ERROR("amqp1 plugin: format_graphite failed with status %i.", status); + cd_message_free(cdm); return status; } cdm->mbuf.size = strlen(cdm->mbuf.start); + if (cdm->mbuf.size >= BUFSIZE) { + ERROR("amqp1 plugin: format graphite failed"); + cd_message_free(cdm); + return -1; + } break; default: ERROR("amqp1 plugin: Invalid write format (%i).", instance->format); + cd_message_free(cdm); return -1; } /* encode message and place on outbound queue */ - return encqueue(cdm, instance); + status = encqueue(cdm, instance); + if (status != 0) { + ERROR("amqp1 plugin: write enqueue failed"); + cd_message_free(cdm); + } + return status; } /* }}} int amqp1_write */ @@ -455,6 +526,7 @@ static void amqp1_config_transport_free(void *ptr) /* {{{ */ sfree(transport->name); sfree(transport->host); + sfree(transport->port); sfree(transport->user); sfree(transport->password); sfree(transport->address); @@ -541,7 +613,7 @@ static int amqp1_config_instance(oconfig_item_t *ci) /* {{{ */ } else WARNING("amqp1 plugin: Ignoring unknown " "instance configuration option " - "\%s\".", + "\"%s\".", child->key); if (status != 0) break; @@ -552,13 +624,13 @@ static int amqp1_config_instance(oconfig_item_t *ci) /* {{{ */ return status; } else { char tpname[DATA_MAX_NAME_LEN]; - status = snprintf(tpname, sizeof(tpname), "amqp1/%s", instance->name); + status = ssnprintf(tpname, sizeof(tpname), "amqp1/%s", instance->name); if ((status < 0) || (size_t)status >= sizeof(tpname)) { ERROR("amqp1 plugin: Instance name would have been truncated."); return -1; } - status = snprintf(instance->send_to, sizeof(instance->send_to), "/%s/%s", - transport->address, instance->name); + status = ssnprintf(instance->send_to, sizeof(instance->send_to), "/%s/%s", + transport->address, instance->name); if ((status < 0) || (size_t)status >= sizeof(instance->send_to)) { ERROR("amqp1 plugin: send_to address would have been truncated."); return -1; @@ -567,14 +639,16 @@ static int amqp1_config_instance(oconfig_item_t *ci) /* {{{ */ status = plugin_register_notification( tpname, amqp1_notify, &(user_data_t){ - .data = instance, .free_func = amqp1_config_instance_free, + .data = instance, + .free_func = amqp1_config_instance_free, }); } else { - status = plugin_register_write( - tpname, amqp1_write, - &(user_data_t){ - .data = instance, .free_func = amqp1_config_instance_free, - }); + status = + plugin_register_write(tpname, amqp1_write, + &(user_data_t){ + .data = instance, + .free_func = amqp1_config_instance_free, + }); } if (status != 0) { @@ -622,7 +696,7 @@ static int amqp1_config_transport(oconfig_item_t *ci) /* {{{ */ else WARNING("amqp1 plugin: Ignoring unknown " "transport configuration option " - "\%s\".", + "\"%s\".", child->key); if (status != 0) @@ -644,7 +718,7 @@ static int amqp1_config(oconfig_item_t *ci) /* {{{ */ if (strcasecmp("Transport", child->key) == 0) amqp1_config_transport(child); else - WARNING("amqp1 plugin: Ignoring unknown config option \%s\".", + WARNING("amqp1 plugin: Ignoring unknown config option \"%s\".", child->key); }