X-Git-Url: https://git.octo.it/?a=blobdiff_plain;f=src%2Famqp.c;h=9c8c6e537c6cb1d2f3907402f7c602e2f8b326b6;hb=6d85a198dd90fb5af963ad847d9dbff7b8025c46;hp=89284c81962ea127d4c901ee0c64e7aceb13275a;hpb=d19bcbf5c310f3656503e64ba26829256112ded4;p=collectd.git diff --git a/src/amqp.c b/src/amqp.c index 89284c81..9c8c6e53 100644 --- a/src/amqp.c +++ b/src/amqp.c @@ -31,6 +31,7 @@ #include "plugin.h" #include "utils_cmd_putval.h" #include "utils_format_json.h" +#include "utils_format_graphite.h" #include @@ -42,8 +43,9 @@ #define CAMQP_DM_VOLATILE 1 #define CAMQP_DM_PERSISTENT 2 -#define CAMQP_FORMAT_COMMAND 1 -#define CAMQP_FORMAT_JSON 2 +#define CAMQP_FORMAT_COMMAND 1 +#define CAMQP_FORMAT_JSON 2 +#define CAMQP_FORMAT_GRAPHITE 3 #define CAMQP_CHANNEL 1 @@ -68,6 +70,10 @@ struct camqp_config_s uint8_t delivery_mode; _Bool store_rates; int format; + /* publish & graphite format only */ + char *prefix; + char *postfix; + char escape_char; /* subscribe only */ char *exchange_type; @@ -129,6 +135,9 @@ static void camqp_config_free (void *ptr) /* {{{ */ sfree (conf->exchange_type); sfree (conf->queue); sfree (conf->routing_key); + sfree (conf->prefix); + sfree (conf->postfix); + sfree (conf); } /* }}} void camqp_config_free */ @@ -591,6 +600,8 @@ static void *camqp_subscribe_thread (void *user_data) /* {{{ */ camqp_config_t *conf = user_data; int status; + cdtime_t interval = plugin_get_interval (); + while (subscriber_threads_running) { amqp_frame_t frame; @@ -601,8 +612,8 @@ static void *camqp_subscribe_thread (void *user_data) /* {{{ */ struct timespec ts_interval; ERROR ("amqp plugin: camqp_connect failed. " "Will sleep for %.3f seconds.", - CDTIME_T_TO_DOUBLE (interval_g)); - CDTIME_T_TO_TIMESPEC (interval_g, &ts_interval); + CDTIME_T_TO_DOUBLE (interval)); + CDTIME_T_TO_TIMESPEC (interval, &ts_interval); nanosleep (&ts_interval, /* remaining = */ NULL); continue; } @@ -613,9 +624,9 @@ static void *camqp_subscribe_thread (void *user_data) /* {{{ */ struct timespec ts_interval; ERROR ("amqp plugin: amqp_simple_wait_frame failed. " "Will sleep for %.3f seconds.", - CDTIME_T_TO_DOUBLE (interval_g)); + CDTIME_T_TO_DOUBLE (interval)); camqp_close_connection (conf); - CDTIME_T_TO_TIMESPEC (interval_g, &ts_interval); + CDTIME_T_TO_TIMESPEC (interval, &ts_interval); nanosleep (&ts_interval, /* remaining = */ NULL); continue; } @@ -661,7 +672,7 @@ static int camqp_subscribe_init (camqp_config_t *conf) /* {{{ */ tmp = subscriber_threads + subscriber_threads_num; memset (tmp, 0, sizeof (*tmp)); - status = pthread_create (tmp, /* attr = */ NULL, + status = plugin_thread_create (tmp, /* attr = */ NULL, camqp_subscribe_thread, conf); if (status != 0) { @@ -699,6 +710,8 @@ static int camqp_write_locked (camqp_config_t *conf, /* {{{ */ props.content_type = amqp_cstring_bytes("text/collectd"); else if (conf->format == CAMQP_FORMAT_JSON) props.content_type = amqp_cstring_bytes("application/json"); + else if (conf->format == CAMQP_FORMAT_GRAPHITE) + props.content_type = amqp_cstring_bytes("text/graphite"); else assert (23 == 42); props.delivery_mode = conf->delivery_mode; @@ -777,6 +790,18 @@ static int camqp_write (const data_set_t *ds, const value_list_t *vl, /* {{{ */ format_json_value_list (buffer, &bfill, &bfree, ds, vl, conf->store_rates); format_json_finalize (buffer, &bfill, &bfree); } + else if (conf->format == CAMQP_FORMAT_GRAPHITE) + { + status = format_graphite (buffer, sizeof (buffer), ds, vl, + conf->prefix, conf->postfix, conf->escape_char, + conf->store_rates); + if (status != 0) + { + ERROR ("amqp plugin: format_graphite failed with status %i.", + status); + return (status); + } + } else { ERROR ("amqp plugin: Invalid format (%i).", conf->format); @@ -809,6 +834,8 @@ static int camqp_config_set_format (oconfig_item_t *ci, /* {{{ */ conf->format = CAMQP_FORMAT_COMMAND; else if (strcasecmp ("JSON", string) == 0) conf->format = CAMQP_FORMAT_JSON; + else if (strcasecmp ("Graphite", string) == 0) + conf->format = CAMQP_FORMAT_GRAPHITE; else { WARNING ("amqp plugin: Invalid format string: %s", @@ -849,6 +876,10 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */ /* publish only */ conf->delivery_mode = CAMQP_DM_VOLATILE; conf->store_rates = 0; + /* publish & graphite only */ + conf->prefix = NULL; + conf->postfix = NULL; + conf->escape_char = '_'; /* subscribe only */ conf->exchange_type = NULL; conf->queue = NULL; @@ -906,6 +937,20 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */ status = cf_util_get_boolean (child, &conf->store_rates); else if ((strcasecmp ("Format", child->key) == 0) && publish) status = camqp_config_set_format (child, conf); + else if ((strcasecmp ("GraphitePrefix", child->key) == 0) && publish) + status = cf_util_get_string (child, &conf->prefix); + else if ((strcasecmp ("GraphitePostfix", child->key) == 0) && publish) + status = cf_util_get_string (child, &conf->postfix); + else if ((strcasecmp ("GraphiteEscapeChar", child->key) == 0) && publish) + { + char *tmp_buff = NULL; + status = cf_util_get_string (child, &tmp_buff); + if (strlen (tmp_buff) > 1) + WARNING ("amqp plugin: The option \"GraphiteEscapeChar\" handles " + "only one character. Others will be ignored."); + conf->escape_char = tmp_buff[0]; + sfree (tmp_buff); + } else WARNING ("amqp plugin: Ignoring unknown " "configuration option \"%s\".", child->key);