#include "collectd.h"
-#include "common.h"
#include "plugin.h"
-#include "utils_cmd_putval.h"
-#include "utils_deq.h"
-#include "utils_format_graphite.h"
-#include "utils_format_json.h"
+#include "utils/cmds/putval.h"
+#include "utils/common/common.h"
+#include "utils/deq/deq.h"
+#include "utils/format_graphite/format_graphite.h"
+#include "utils/format_json/format_json.h"
#include "utils_random.h"
#include <proton/condition.h>
static uint64_t acknowledged;
static amqp1_config_transport_t *transport;
static bool stopping;
-static int event_thread_running;
+static bool event_thread_running;
static pthread_t event_thread_id;
/*
while (cdm) {
DEQ_REMOVE_HEAD(out_messages);
DEQ_INSERT_TAIL(to_send, cdm);
- if (DEQ_SIZE(to_send) == link_credit)
+ if (DEQ_SIZE(to_send) == (size_t)link_credit)
break;
cdm = DEQ_HEAD(out_messages);
}
cdm = DEQ_HEAD(out_messages);
}
- event_thread_running = 0;
+ event_thread_running = false;
return NULL;
} /* }}} void event_thread */
static int encqueue(cd_message_t *cdm,
amqp1_config_instance_t *instance) /* {{{ */
{
- size_t bufsize = BUFSIZE;
- pn_data_t *body;
- pn_message_t *message;
- int status = 0;
-
/* encode message */
- message = pn_message();
+ pn_message_t *message = pn_message();
pn_message_set_address(message, instance->send_to);
- body = pn_message_body(message);
+ pn_data_t *body = pn_message_body(message);
pn_data_clear(body);
pn_data_put_binary(body, pn_bytes(cdm->mbuf.size, cdm->mbuf.start));
pn_data_exit(body);
/* put_binary copies and stores so ok to use mbuf */
- cdm->mbuf.size = bufsize;
- while ((status = pn_message_encode(message, (char *)cdm->mbuf.start,
+ cdm->mbuf.size = BUFSIZE;
+
+ int status;
+ char *start;
+ while ((status = pn_message_encode(message, cdm->mbuf.start,
&cdm->mbuf.size)) == PN_OVERFLOW) {
- DEBUG("amqp1 plugin: increasing message buffer size %i",
- (int)cdm->mbuf.size);
+ DEBUG("amqp1 plugin: increasing message buffer size %zu", cdm->mbuf.size);
cdm->mbuf.size *= 2;
- cdm->mbuf.start = (char *)realloc(cdm->mbuf.start, cdm->mbuf.size);
+ start = realloc(cdm->mbuf.start, cdm->mbuf.size);
+ if (start == NULL) {
+ status = -1;
+ break;
+ } else {
+ cdm->mbuf.start = start;
+ }
}
if (status != 0) {
ERROR("amqp1 plugin: error encoding message: %s",
pn_error_text(pn_message_error(message)));
pn_message_free(message);
- cd_message_free(cdm);
return -1;
}
pn_message_free(message);
/* activate the sender */
- if (conn != NULL) {
+ if (conn) {
pn_connection_wake(conn);
}
static int amqp1_notify(notification_t const *n,
user_data_t *user_data) /* {{{ */
{
- amqp1_config_instance_t *instance;
int status = 0;
size_t bfree = BUFSIZE;
size_t bfill = 0;
- cd_message_t *cdm;
size_t bufsize = BUFSIZE;
- if ((n == NULL) || (user_data == NULL))
+ if (n == NULL || user_data == NULL)
return EINVAL;
- instance = user_data->data;
+ amqp1_config_instance_t *instance = user_data->data;
if (instance->notify != true) {
ERROR("amqp1 plugin: write notification failed");
}
- cdm = (cd_message_t *)malloc(sizeof(cd_message_t));
+ cd_message_t *cdm = malloc(sizeof(*cdm));
+ if (cdm == NULL) {
+ ERROR("amqp1 plugin: notify failed");
+ return -1;
+ }
+
DEQ_ITEM_INIT(cdm);
- cdm->mbuf = pn_rwbytes(bufsize, (char *)malloc(bufsize));
+ char *start = malloc(bufsize);
+ if (start == NULL) {
+ ERROR("amqp1 plugin: malloc failed");
+ free(cdm);
+ return -1;
+ }
+ cdm->mbuf.size = bufsize;
+ cdm->mbuf.start = start;
cdm->instance = instance;
switch (instance->format) {
case AMQP1_FORMAT_JSON:
- format_json_initialize((char *)cdm->mbuf.start, &bfill, &bfree);
- status = format_json_notification((char *)cdm->mbuf.start, bufsize, n);
+ format_json_initialize(cdm->mbuf.start, &bfill, &bfree);
+ status = format_json_notification(cdm->mbuf.start, bufsize, n);
if (status != 0) {
ERROR("amqp1 plugin: formatting notification failed");
+ cd_message_free(cdm);
return status;
}
cdm->mbuf.size = strlen(cdm->mbuf.start);
+ if (cdm->mbuf.size >= BUFSIZE) {
+ ERROR("amqp1 plugin: notify format json failed");
+ cd_message_free(cdm);
+ return -1;
+ }
break;
default:
ERROR("amqp1 plugin: Invalid notify format (%i).", instance->format);
+ cd_message_free(cdm);
return -1;
}
/* encode message and place on outbound queue */
status = encqueue(cdm, instance);
-
+ if (status != 0) {
+ ERROR("amqp1 plugin: notify enqueue failed");
+ cd_message_free(cdm);
+ }
return status;
+
} /* }}} int amqp1_notify */
static int amqp1_write(const data_set_t *ds, const value_list_t *vl, /* {{{ */
user_data_t *user_data) {
- amqp1_config_instance_t *instance;
int status = 0;
size_t bfree = BUFSIZE;
size_t bfill = 0;
- cd_message_t *cdm;
size_t bufsize = BUFSIZE;
- if ((ds == NULL) || (vl == NULL) || (transport == NULL) ||
- (user_data == NULL))
+ if (ds == NULL || vl == NULL || transport == NULL || user_data == NULL)
return EINVAL;
- instance = user_data->data;
+ amqp1_config_instance_t *instance = user_data->data;
if (instance->notify != false) {
ERROR("amqp1 plugin: write failed");
}
- cdm = (cd_message_t *)malloc(sizeof(cd_message_t));
+ cd_message_t *cdm = malloc(sizeof(*cdm));
+ if (cdm == NULL) {
+ ERROR("amqp1 plugin: malloc failed.");
+ return -1;
+ }
DEQ_ITEM_INIT(cdm);
- cdm->mbuf = pn_rwbytes(bufsize, (char *)malloc(bufsize));
+ char *start = malloc(bufsize);
+ if (start == NULL) {
+ ERROR("amqp1 plugin: malloc failed.");
+ free(cdm);
+ return -1;
+ }
+ cdm->mbuf.size = bufsize;
+ cdm->mbuf.start = start;
cdm->instance = instance;
switch (instance->format) {
status = cmd_create_putval((char *)cdm->mbuf.start, bufsize, ds, vl);
if (status != 0) {
ERROR("amqp1 plugin: cmd_create_putval failed with status %i.", status);
+ cd_message_free(cdm);
return status;
}
cdm->mbuf.size = strlen(cdm->mbuf.start);
+ if (cdm->mbuf.size >= BUFSIZE) {
+ ERROR("amqp1 plugin: format cmd failed");
+ cd_message_free(cdm);
+ return -1;
+ }
break;
case AMQP1_FORMAT_JSON:
format_json_initialize((char *)cdm->mbuf.start, &bfill, &bfree);
format_json_value_list((char *)cdm->mbuf.start, &bfill, &bfree, ds, vl,
instance->store_rates);
- format_json_finalize((char *)cdm->mbuf.start, &bfill, &bfree);
+ status = format_json_finalize((char *)cdm->mbuf.start, &bfill, &bfree);
+ if (status != 0) {
+ ERROR("amqp1 plugin: format_json_finalize failed with status %i.",
+ status);
+ cd_message_free(cdm);
+ return status;
+ }
cdm->mbuf.size = strlen(cdm->mbuf.start);
+ if (cdm->mbuf.size >= BUFSIZE) {
+ ERROR("amqp1 plugin: format json failed");
+ cd_message_free(cdm);
+ return -1;
+ }
break;
case AMQP1_FORMAT_GRAPHITE:
status = format_graphite((char *)cdm->mbuf.start, bufsize, ds, vl,
instance->escape_char, instance->graphite_flags);
if (status != 0) {
ERROR("amqp1 plugin: format_graphite failed with status %i.", status);
+ cd_message_free(cdm);
return status;
}
cdm->mbuf.size = strlen(cdm->mbuf.start);
+ if (cdm->mbuf.size >= BUFSIZE) {
+ ERROR("amqp1 plugin: format graphite failed");
+ cd_message_free(cdm);
+ return -1;
+ }
break;
default:
ERROR("amqp1 plugin: Invalid write format (%i).", instance->format);
+ cd_message_free(cdm);
return -1;
}
- /* encode message and place on outboud queue */
- encqueue(cdm, instance);
+ /* encode message and place on outbound queue */
+ status = encqueue(cdm, instance);
+ if (status != 0) {
+ ERROR("amqp1 plugin: write enqueue failed");
+ cd_message_free(cdm);
+ }
+ return status;
- return 0;
} /* }}} int amqp1_write */
static void amqp1_config_transport_free(void *ptr) /* {{{ */
sfree(transport->name);
sfree(transport->host);
+ sfree(transport->port);
sfree(transport->user);
sfree(transport->password);
sfree(transport->address);
static int amqp1_config_instance(oconfig_item_t *ci) /* {{{ */
{
- int status = 0;
- char *key = NULL;
- amqp1_config_instance_t *instance;
-
- instance = calloc(1, sizeof(*instance));
+ amqp1_config_instance_t *instance = calloc(1, sizeof(*instance));
if (instance == NULL) {
ERROR("amqp1 plugin: calloc failed.");
return ENOMEM;
}
- status = cf_util_get_string(ci, &instance->name);
+ int status = cf_util_get_string(ci, &instance->name);
if (status != 0) {
sfree(instance);
return status;
else if (strcasecmp("Notify", child->key) == 0)
status = cf_util_get_boolean(child, &instance->notify);
else if (strcasecmp("Format", child->key) == 0) {
+ char *key = NULL;
status = cf_util_get_string(child, &key);
if (status != 0)
return status;
- /* TODO: goto errout */
- // goto errout;
assert(key != NULL);
if (strcasecmp(key, "Command") == 0) {
instance->format = AMQP1_FORMAT_COMMAND;
} else
WARNING("amqp1 plugin: Ignoring unknown "
"instance configuration option "
- "\%s\".",
+ "\"%s\".",
child->key);
if (status != 0)
break;
ERROR("amqp1 plugin: send_to address would have been truncated.");
return -1;
}
- if (instance->notify == true) {
+ if (instance->notify) {
status = plugin_register_notification(
tpname, amqp1_notify,
&(user_data_t){
static int amqp1_config_transport(oconfig_item_t *ci) /* {{{ */
{
- int status = 0;
-
transport = calloc(1, sizeof(*transport));
if (transport == NULL) {
ERROR("amqp1 plugin: calloc failed.");
/* Initialize transport configuration {{{ */
transport->retry_delay = 1;
- status = cf_util_get_string(ci, &transport->name);
+ int status = cf_util_get_string(ci, &transport->name);
if (status != 0) {
sfree(transport);
return status;
else
WARNING("amqp1 plugin: Ignoring unknown "
"transport configuration option "
- "\%s\".",
+ "\"%s\".",
child->key);
if (status != 0)
if (strcasecmp("Transport", child->key) == 0)
amqp1_config_transport(child);
else
- WARNING("amqp1 plugin: Ignoring unknown config option \%s\".",
+ WARNING("amqp1 plugin: Ignoring unknown config option \"%s\".",
child->key);
}
static int amqp1_init(void) /* {{{ */
{
- int status;
- char errbuf[1024];
-
if (transport == NULL) {
ERROR("amqp1: init failed, no transport configured");
return -1;
if (proactor == NULL) {
pthread_mutex_init(&send_lock, /* attr = */ NULL);
/* start_thread */
- status =
+ int status =
plugin_thread_create(&event_thread_id, NULL /* no attributes */,
event_thread, NULL /* no argument */, "handle");
if (status != 0) {
- ERROR("amqp1 plugin: pthread_create failed: %s",
- sstrerror(errno, errbuf, sizeof(errbuf)));
+ ERROR("amqp1 plugin: pthread_create failed: %s", STRERRNO);
} else {
- event_thread_running = 1;
+ event_thread_running = true;
}
}
return 0;
stopping = true;
/* Stop the proactor thread */
- if (event_thread_running == 1) {
+ if (event_thread_running) {
DEBUG("amqp1 plugin: Shutting down proactor thread.");
pn_connection_wake(conn);
}
DEBUG("amqp1 plugin: proactor thread exited.");
- if (transport != NULL) {
+ if (transport) {
amqp1_config_transport_free(transport);
}