**/
#include "collectd.h"
+
#include "common.h"
#include "plugin.h"
#include "utils_cmd_putval.h"
#include "utils_format_json.h"
#include "utils_format_graphite.h"
-#include <pthread.h>
-
#include <amqp.h>
#include <amqp_framing.h>
static int camqp_shutdown (void) /* {{{ */
{
- size_t i;
-
DEBUG ("amqp plugin: Shutting down %zu subscriber threads.",
subscriber_threads_num);
subscriber_threads_running = 0;
- for (i = 0; i < subscriber_threads_num; i++)
+ for (size_t i = 0; i < subscriber_threads_num; i++)
{
/* FIXME: Sending a signal is not very elegant here. Maybe find out how
* to use a timeout in the thread and check for the variable in regular
if (strcasecmp ("text/collectd", content_type) == 0)
{
- status = handle_putval (stderr, body);
+ status = cmd_handle_putval (stderr, body);
if (status != 0)
- ERROR ("amqp plugin: handle_putval failed with status %i.",
+ ERROR ("amqp plugin: cmd_handle_putval failed with status %i.",
status);
return (status);
}
static int camqp_write_locked (camqp_config_t *conf, /* {{{ */
const char *buffer, const char *routing_key)
{
- amqp_basic_properties_t props;
int status;
status = camqp_connect (conf);
if (status != 0)
return (status);
- memset (&props, 0, sizeof (props));
- props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG
- | AMQP_BASIC_DELIVERY_MODE_FLAG
- | AMQP_BASIC_APP_ID_FLAG;
+ amqp_basic_properties_t props = {
+ ._flags = AMQP_BASIC_CONTENT_TYPE_FLAG
+ | AMQP_BASIC_DELIVERY_MODE_FLAG
+ | AMQP_BASIC_APP_ID_FLAG,
+ .delivery_mode = conf->delivery_mode,
+ .app_id = amqp_cstring_bytes("collectd")
+ };
+
if (conf->format == CAMQP_FORMAT_COMMAND)
props.content_type = amqp_cstring_bytes("text/collectd");
else if (conf->format == CAMQP_FORMAT_JSON)
props.content_type = amqp_cstring_bytes("text/graphite");
else
assert (23 == 42);
- props.delivery_mode = conf->delivery_mode;
- props.app_id = amqp_cstring_bytes("collectd");
status = amqp_basic_publish(conf->connection,
/* channel = */ 1,
if ((ds == NULL) || (vl == NULL) || (conf == NULL))
return (EINVAL);
- memset (buffer, 0, sizeof (buffer));
-
if (conf->routing_key != NULL)
{
sstrncpy (routing_key, conf->routing_key, sizeof (routing_key));
}
else
{
- size_t i;
ssnprintf (routing_key, sizeof (routing_key), "collectd/%s/%s/%s/%s/%s",
vl->host,
vl->plugin, vl->plugin_instance,
/* Switch slashes (the only character forbidden by collectd) and dots
* (the separation character used by AMQP). */
- for (i = 0; routing_key[i] != 0; i++)
+ for (size_t i = 0; routing_key[i] != 0; i++)
{
if (routing_key[i] == '.')
routing_key[i] = '/';
if (conf->format == CAMQP_FORMAT_COMMAND)
{
- status = create_putval (buffer, sizeof (buffer), ds, vl);
+ status = cmd_create_putval (buffer, sizeof (buffer), ds, vl);
if (status != 0)
{
- ERROR ("amqp plugin: create_putval failed with status %i.",
+ ERROR ("amqp plugin: cmd_create_putval failed with status %i.",
status);
return (status);
}
{
camqp_config_t *conf;
int status;
- int i;
conf = calloc (1, sizeof (*conf));
if (conf == NULL)
return (status);
}
- for (i = 0; i < ci->children_num; i++)
+ for (int i = 0; i < ci->children_num; i++)
{
oconfig_item_t *child = ci->children + i;
if (publish)
{
char cbname[128];
- user_data_t ud = { conf, camqp_config_free };
-
ssnprintf (cbname, sizeof (cbname), "amqp/%s", conf->name);
- status = plugin_register_write (cbname, camqp_write, &ud);
+ status = plugin_register_write (cbname, camqp_write,
+ &(user_data_t) {
+ .data = conf,
+ .free_func = camqp_config_free,
+ });
if (status != 0)
{
camqp_config_free (conf);
static int camqp_config (oconfig_item_t *ci) /* {{{ */
{
- int i;
-
- for (i = 0; i < ci->children_num; i++)
+ for (int i = 0; i < ci->children_num; i++)
{
oconfig_item_t *child = ci->children + i;