X-Git-Url: https://git.octo.it/?a=blobdiff_plain;f=src%2Famqp.c;h=84bcc066efcf41a93a14e11d82640e62c0dc6e14;hb=2193369303b99a35fe09d14632d66c00e20755c6;hp=eb54795069f1bcc17f39d2571981d50137f49e99;hpb=91ca6eb3c01b7f9bbcfd624dfbfcb8910dc65098;p=collectd.git diff --git a/src/amqp.c b/src/amqp.c index eb547950..84bcc066 100644 --- a/src/amqp.c +++ b/src/amqp.c @@ -35,6 +35,7 @@ #include "collectd.h" #include "common.h" #include "plugin.h" +#include "utils_cmd_putval.h" #include "utils_format_json.h" #include @@ -45,6 +46,9 @@ #define CAMQP_DM_VOLATILE 1 #define CAMQP_DM_PERSISTENT 2 +#define CAMQP_FORMAT_COMMAND 1 +#define CAMQP_FORMAT_JSON 2 + #define CAMQP_CHANNEL 1 /* @@ -54,6 +58,7 @@ struct camqp_config_s { _Bool publish; char *name; + int format; char *host; int port; @@ -392,6 +397,34 @@ static int camqp_connect (camqp_config_t *conf) /* {{{ */ return (0); } /* }}} int camqp_connect */ +static int shutdown (void) /* {{{ */ +{ + size_t i; + + DEBUG ("amqp plugin: Shutting down %zu subscriber threads.", + subscriber_threads_num); + + subscriber_threads_running = 0; + for (i = 0; i < subscriber_threads_num; i++) + { + /* FIXME: Sending a signal is not very elegant here. Maybe find out how + * to use a timeout in the thread and check for the variable in regular + * intervals. */ + pthread_kill (subscriber_threads[i], SIGTERM); + pthread_join (subscriber_threads[i], /* retval = */ NULL); + } + + subscriber_threads_num = 0; + sfree (subscriber_threads); + + DEBUG ("amqp plugin: All subscriber threads exited."); + + return (0); +} /* }}} int shutdown */ + +/* + * Subscribing code + */ static int camqp_read_body (camqp_config_t *conf, /* {{{ */ size_t body_size) { @@ -552,6 +585,9 @@ static int camqp_subscribe_init (camqp_config_t *conf) /* {{{ */ return (0); } /* }}} int camqp_subscribe_init */ +/* + * Publishing code + */ static int camqp_write_locked (camqp_config_t *conf, /* {{{ */ const char *buffer) { @@ -604,9 +640,27 @@ static int camqp_write (const data_set_t *ds, const value_list_t *vl, /* {{{ */ bfree = sizeof (buffer); bfill = 0; - format_json_initialize (buffer, &bfill, &bfree); - format_json_value_list (buffer, &bfill, &bfree, ds, vl, conf->store_rates); - format_json_finalize (buffer, &bfill, &bfree); + if (conf->format == CAMQP_FORMAT_COMMAND) + { + status = create_putval (buffer, sizeof (buffer), ds, vl); + if (status != 0) + { + ERROR ("amqp plugin: create_putval failed with status %i.", + status); + return (status); + } + } + else if (conf->format == CAMQP_FORMAT_JSON) + { + format_json_initialize (buffer, &bfill, &bfree); + format_json_value_list (buffer, &bfill, &bfree, ds, vl, conf->store_rates); + format_json_finalize (buffer, &bfill, &bfree); + } + else + { + ERROR ("amqp plugin: Invalid format (%i).", conf->format); + return (-1); + } pthread_mutex_lock (&conf->lock); status = camqp_write_locked (conf, buffer); @@ -615,6 +669,36 @@ static int camqp_write (const data_set_t *ds, const value_list_t *vl, /* {{{ */ return (status); } /* }}} int camqp_write */ +/* + * Config handling + */ +static int camqp_config_set_format (oconfig_item_t *ci, /* {{{ */ + camqp_config_t *conf) +{ + char *string; + int status; + + string = NULL; + status = cf_util_get_string (ci, &string); + if (status != 0) + return (status); + + assert (string != NULL); + if (strcasecmp ("Command", string) == 0) + conf->format = CAMQP_FORMAT_COMMAND; + else if (strcasecmp ("JSON", string) == 0) + conf->format = CAMQP_FORMAT_JSON; + else + { + WARNING ("amqp plugin: Invalid format string: %s", + string); + } + + free (string); + + return (0); +} /* }}} int config_set_string */ + static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */ _Bool publish) { @@ -633,6 +717,7 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */ memset (conf, 0, sizeof (*conf)); conf->publish = publish; conf->name = NULL; + conf->format = CAMQP_FORMAT_COMMAND; conf->host = NULL; conf->port = 5672; conf->vhost = NULL; @@ -662,7 +747,9 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */ { oconfig_item_t *child = ci->children + i; - if (strcasecmp ("Host", child->key) == 0) + if (strcasecmp ("Format", child->key) == 0) + status = camqp_config_set_format (child, conf); + else if (strcasecmp ("Host", child->key) == 0) status = cf_util_get_string (child, &conf->host); else if (strcasecmp ("Port", child->key) == 0) { @@ -776,31 +863,6 @@ static int camqp_config (oconfig_item_t *ci) /* {{{ */ return (0); } /* }}} int camqp_config */ -static int shutdown (void) /* {{{ */ -{ - size_t i; - - DEBUG ("amqp plugin: Shutting down %zu subscriber threads.", - subscriber_threads_num); - - subscriber_threads_running = 0; - for (i = 0; i < subscriber_threads_num; i++) - { - /* FIXME: Sending a signal is not very elegant here. Maybe find out how - * to use a timeout in the thread and check for the variable in regular - * intervals. */ - pthread_kill (subscriber_threads[i], SIGTERM); - pthread_join (subscriber_threads[i], /* retval = */ NULL); - } - - subscriber_threads_num = 0; - sfree (subscriber_threads); - - DEBUG ("amqp plugin: All subscriber threads exited."); - - return (0); -} /* }}} int shutdown */ - void module_register (void) { plugin_register_complex_config ("amqp", camqp_config);