#include "collectd.h"
-#include "common.h"
#include "plugin.h"
-#include "utils_cmd_putval.h"
-#include "utils_deq.h"
-#include "utils_format_graphite.h"
-#include "utils_format_json.h"
+#include "utils/cmds/putval.h"
+#include "utils/common/common.h"
+#include "utils/deq/deq.h"
+#include "utils/format_graphite/format_graphite.h"
+#include "utils/format_json/format_json.h"
#include "utils_random.h"
#include <proton/condition.h>
while (cdm) {
DEQ_REMOVE_HEAD(out_messages);
DEQ_INSERT_TAIL(to_send, cdm);
- if (DEQ_SIZE(to_send) == link_credit)
+ if (DEQ_SIZE(to_send) == (size_t)link_credit)
break;
cdm = DEQ_HEAD(out_messages);
}
cdm->mbuf.size = BUFSIZE;
int status;
+ char *start;
while ((status = pn_message_encode(message, cdm->mbuf.start,
&cdm->mbuf.size)) == PN_OVERFLOW) {
DEBUG("amqp1 plugin: increasing message buffer size %zu", cdm->mbuf.size);
cdm->mbuf.size *= 2;
- cdm->mbuf.start = realloc(cdm->mbuf.start, cdm->mbuf.size);
+ start = realloc(cdm->mbuf.start, cdm->mbuf.size);
+ if (start == NULL) {
+ status = -1;
+ break;
+ } else {
+ cdm->mbuf.start = start;
+ }
}
if (status != 0) {
ERROR("amqp1 plugin: error encoding message: %s",
pn_error_text(pn_message_error(message)));
pn_message_free(message);
- cd_message_free(cdm);
return -1;
}
static int amqp1_notify(notification_t const *n,
user_data_t *user_data) /* {{{ */
{
+ int status = 0;
size_t bfree = BUFSIZE;
size_t bfill = 0;
size_t bufsize = BUFSIZE;
}
cd_message_t *cdm = malloc(sizeof(*cdm));
+ if (cdm == NULL) {
+ ERROR("amqp1 plugin: notify failed");
+ return -1;
+ }
+
DEQ_ITEM_INIT(cdm);
- cdm->mbuf = pn_rwbytes(bufsize, malloc(bufsize));
+ char *start = malloc(bufsize);
+ if (start == NULL) {
+ ERROR("amqp1 plugin: malloc failed");
+ free(cdm);
+ return -1;
+ }
+ cdm->mbuf.size = bufsize;
+ cdm->mbuf.start = start;
cdm->instance = instance;
switch (instance->format) {
case AMQP1_FORMAT_JSON:
format_json_initialize(cdm->mbuf.start, &bfill, &bfree);
- int status = format_json_notification(cdm->mbuf.start, bufsize, n);
+ status = format_json_notification(cdm->mbuf.start, bufsize, n);
if (status != 0) {
ERROR("amqp1 plugin: formatting notification failed");
+ cd_message_free(cdm);
return status;
}
cdm->mbuf.size = strlen(cdm->mbuf.start);
+ if (cdm->mbuf.size >= BUFSIZE) {
+ ERROR("amqp1 plugin: notify format json failed");
+ cd_message_free(cdm);
+ return -1;
+ }
break;
default:
ERROR("amqp1 plugin: Invalid notify format (%i).", instance->format);
+ cd_message_free(cdm);
return -1;
}
/* encode message and place on outbound queue */
- return encqueue(cdm, instance);
+ status = encqueue(cdm, instance);
+ if (status != 0) {
+ ERROR("amqp1 plugin: notify enqueue failed");
+ cd_message_free(cdm);
+ }
+ return status;
} /* }}} int amqp1_notify */
}
cd_message_t *cdm = malloc(sizeof(*cdm));
+ if (cdm == NULL) {
+ ERROR("amqp1 plugin: malloc failed.");
+ return -1;
+ }
DEQ_ITEM_INIT(cdm);
- cdm->mbuf = pn_rwbytes(bufsize, malloc(bufsize));
+ char *start = malloc(bufsize);
+ if (start == NULL) {
+ ERROR("amqp1 plugin: malloc failed.");
+ free(cdm);
+ return -1;
+ }
+ cdm->mbuf.size = bufsize;
+ cdm->mbuf.start = start;
cdm->instance = instance;
switch (instance->format) {
status = cmd_create_putval((char *)cdm->mbuf.start, bufsize, ds, vl);
if (status != 0) {
ERROR("amqp1 plugin: cmd_create_putval failed with status %i.", status);
+ cd_message_free(cdm);
return status;
}
cdm->mbuf.size = strlen(cdm->mbuf.start);
+ if (cdm->mbuf.size >= BUFSIZE) {
+ ERROR("amqp1 plugin: format cmd failed");
+ cd_message_free(cdm);
+ return -1;
+ }
break;
case AMQP1_FORMAT_JSON:
format_json_initialize((char *)cdm->mbuf.start, &bfill, &bfree);
format_json_value_list((char *)cdm->mbuf.start, &bfill, &bfree, ds, vl,
instance->store_rates);
- format_json_finalize((char *)cdm->mbuf.start, &bfill, &bfree);
+ status = format_json_finalize((char *)cdm->mbuf.start, &bfill, &bfree);
+ if (status != 0) {
+ ERROR("amqp1 plugin: format_json_finalize failed with status %i.",
+ status);
+ cd_message_free(cdm);
+ return status;
+ }
cdm->mbuf.size = strlen(cdm->mbuf.start);
+ if (cdm->mbuf.size >= BUFSIZE) {
+ ERROR("amqp1 plugin: format json failed");
+ cd_message_free(cdm);
+ return -1;
+ }
break;
case AMQP1_FORMAT_GRAPHITE:
status = format_graphite((char *)cdm->mbuf.start, bufsize, ds, vl,
instance->escape_char, instance->graphite_flags);
if (status != 0) {
ERROR("amqp1 plugin: format_graphite failed with status %i.", status);
+ cd_message_free(cdm);
return status;
}
cdm->mbuf.size = strlen(cdm->mbuf.start);
+ if (cdm->mbuf.size >= BUFSIZE) {
+ ERROR("amqp1 plugin: format graphite failed");
+ cd_message_free(cdm);
+ return -1;
+ }
break;
default:
ERROR("amqp1 plugin: Invalid write format (%i).", instance->format);
+ cd_message_free(cdm);
return -1;
}
/* encode message and place on outbound queue */
- return encqueue(cdm, instance);
+ status = encqueue(cdm, instance);
+ if (status != 0) {
+ ERROR("amqp1 plugin: write enqueue failed");
+ cd_message_free(cdm);
+ }
+ return status;
} /* }}} int amqp1_write */
sfree(transport->name);
sfree(transport->host);
+ sfree(transport->port);
sfree(transport->user);
sfree(transport->password);
sfree(transport->address);
else if (strcasecmp("Format", child->key) == 0) {
char *key = NULL;
status = cf_util_get_string(child, &key);
- if (status != 0)
+ if (status != 0) {
+ amqp1_config_instance_free(instance);
return status;
+ }
assert(key != NULL);
if (strcasecmp(key, "Command") == 0) {
instance->format = AMQP1_FORMAT_COMMAND;
} else
WARNING("amqp1 plugin: Ignoring unknown "
"instance configuration option "
- "\%s\".",
+ "\"%s\".",
child->key);
if (status != 0)
break;
return status;
} else {
char tpname[DATA_MAX_NAME_LEN];
- status = snprintf(tpname, sizeof(tpname), "amqp1/%s", instance->name);
+ status = ssnprintf(tpname, sizeof(tpname), "amqp1/%s", instance->name);
if ((status < 0) || (size_t)status >= sizeof(tpname)) {
ERROR("amqp1 plugin: Instance name would have been truncated.");
+ amqp1_config_instance_free(instance);
return -1;
}
- status = snprintf(instance->send_to, sizeof(instance->send_to), "/%s/%s",
- transport->address, instance->name);
+ status = ssnprintf(instance->send_to, sizeof(instance->send_to), "/%s/%s",
+ transport->address, instance->name);
if ((status < 0) || (size_t)status >= sizeof(instance->send_to)) {
ERROR("amqp1 plugin: send_to address would have been truncated.");
+ amqp1_config_instance_free(instance);
return -1;
}
if (instance->notify) {
status = plugin_register_notification(
tpname, amqp1_notify,
&(user_data_t){
- .data = instance, .free_func = amqp1_config_instance_free,
+ .data = instance,
+ .free_func = amqp1_config_instance_free,
});
} else {
- status = plugin_register_write(
- tpname, amqp1_write,
- &(user_data_t){
- .data = instance, .free_func = amqp1_config_instance_free,
- });
+ status =
+ plugin_register_write(tpname, amqp1_write,
+ &(user_data_t){
+ .data = instance,
+ .free_func = amqp1_config_instance_free,
+ });
}
if (status != 0) {
else
WARNING("amqp1 plugin: Ignoring unknown "
"transport configuration option "
- "\%s\".",
+ "\"%s\".",
child->key);
if (status != 0)
if (strcasecmp("Transport", child->key) == 0)
amqp1_config_transport(child);
else
- WARNING("amqp1 plugin: Ignoring unknown config option \%s\".",
+ WARNING("amqp1 plugin: Ignoring unknown config option \"%s\".",
child->key);
}