X-Git-Url: https://git.octo.it/?a=blobdiff_plain;f=src%2Famqp.c;h=edd4f749396bbb14c501792b62db70d188ade0de;hb=19e1e95c51fa96475bd8cd9dccfc53609454a909;hp=c9929dc0b12e00a9a925ebc6c98c559162a27167;hpb=531f59523e969b498c570f9f7083bf41b6811773;p=collectd.git diff --git a/src/amqp.c b/src/amqp.c index c9929dc0..edd4f749 100644 --- a/src/amqp.c +++ b/src/amqp.c @@ -74,6 +74,7 @@ struct camqp_config_s char *prefix; char *postfix; char escape_char; + unsigned int graphite_flags; /* subscribe only */ char *exchange_type; @@ -600,6 +601,8 @@ static void *camqp_subscribe_thread (void *user_data) /* {{{ */ camqp_config_t *conf = user_data; int status; + cdtime_t interval = plugin_get_interval (); + while (subscriber_threads_running) { amqp_frame_t frame; @@ -610,8 +613,8 @@ static void *camqp_subscribe_thread (void *user_data) /* {{{ */ struct timespec ts_interval; ERROR ("amqp plugin: camqp_connect failed. " "Will sleep for %.3f seconds.", - CDTIME_T_TO_DOUBLE (interval_g)); - CDTIME_T_TO_TIMESPEC (interval_g, &ts_interval); + CDTIME_T_TO_DOUBLE (interval)); + CDTIME_T_TO_TIMESPEC (interval, &ts_interval); nanosleep (&ts_interval, /* remaining = */ NULL); continue; } @@ -622,9 +625,9 @@ static void *camqp_subscribe_thread (void *user_data) /* {{{ */ struct timespec ts_interval; ERROR ("amqp plugin: amqp_simple_wait_frame failed. " "Will sleep for %.3f seconds.", - CDTIME_T_TO_DOUBLE (interval_g)); + CDTIME_T_TO_DOUBLE (interval)); camqp_close_connection (conf); - CDTIME_T_TO_TIMESPEC (interval_g, &ts_interval); + CDTIME_T_TO_TIMESPEC (interval, &ts_interval); nanosleep (&ts_interval, /* remaining = */ NULL); continue; } @@ -650,6 +653,7 @@ static void *camqp_subscribe_thread (void *user_data) /* {{{ */ camqp_config_free (conf); pthread_exit (NULL); + return (NULL); } /* }}} void *camqp_subscribe_thread */ static int camqp_subscribe_init (camqp_config_t *conf) /* {{{ */ @@ -669,7 +673,7 @@ static int camqp_subscribe_init (camqp_config_t *conf) /* {{{ */ tmp = subscriber_threads + subscriber_threads_num; memset (tmp, 0, sizeof (*tmp)); - status = pthread_create (tmp, /* attr = */ NULL, + status = plugin_thread_create (tmp, /* attr = */ NULL, camqp_subscribe_thread, conf); if (status != 0) { @@ -790,7 +794,8 @@ static int camqp_write (const data_set_t *ds, const value_list_t *vl, /* {{{ */ else if (conf->format == CAMQP_FORMAT_GRAPHITE) { status = format_graphite (buffer, sizeof (buffer), ds, vl, - conf->prefix, conf->postfix, conf->escape_char); + conf->prefix, conf->postfix, conf->escape_char, + conf->graphite_flags); if (status != 0) { ERROR ("amqp plugin: format_graphite failed with status %i.", @@ -872,6 +877,7 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */ /* publish only */ conf->delivery_mode = CAMQP_DM_VOLATILE; conf->store_rates = 0; + conf->graphite_flags = 0; /* publish & graphite only */ conf->prefix = NULL; conf->postfix = NULL; @@ -930,9 +936,19 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */ conf->delivery_mode = CAMQP_DM_VOLATILE; } else if ((strcasecmp ("StoreRates", child->key) == 0) && publish) + { status = cf_util_get_boolean (child, &conf->store_rates); + (void) cf_util_get_flag (child, &conf->graphite_flags, + GRAPHITE_STORE_RATES); + } else if ((strcasecmp ("Format", child->key) == 0) && publish) status = camqp_config_set_format (child, conf); + else if ((strcasecmp ("GraphiteSeparateInstances", child->key) == 0) && publish) + status = cf_util_get_flag (child, &conf->graphite_flags, + GRAPHITE_SEPARATE_INSTANCES); + else if ((strcasecmp ("GraphiteAlwaysAppendDS", child->key) == 0) && publish) + status = cf_util_get_flag (child, &conf->graphite_flags, + GRAPHITE_ALWAYS_APPEND_DS); else if ((strcasecmp ("GraphitePrefix", child->key) == 0) && publish) status = cf_util_get_string (child, &conf->prefix); else if ((strcasecmp ("GraphitePostfix", child->key) == 0) && publish)