From: Florian Forster Date: Mon, 25 Jun 2012 11:29:07 +0000 (+0200) Subject: aggregation plugin: Actually dispatch values. X-Git-Tag: collectd-5.2.0~24^2~7 X-Git-Url: https://git.octo.it/?a=commitdiff_plain;h=61f8576741f9d11b688ae1a772c5f056915e72e1;p=collectd.git aggregation plugin: Actually dispatch values. --- diff --git a/src/aggregation.c b/src/aggregation.c index 2e13766e..c8f57793 100644 --- a/src/aggregation.c +++ b/src/aggregation.c @@ -51,6 +51,7 @@ struct agg_instance_s; typedef struct agg_instance_s agg_instance_t; struct agg_instance_s /* {{{ */ { + pthread_mutex_t lock; identifier_t ident; int ds_type; @@ -116,8 +117,8 @@ static void agg_instance_destroy (agg_instance_t *inst) /* {{{ */ } /* }}} void agg_instance_destroy */ /* Create a new aggregation instance. */ -static agg_instance_t *agg_instance_create (value_list_t const *vl, /* {{{ */ - aggregation_t *agg) +static agg_instance_t *agg_instance_create (data_set_t const *ds, /* {{{ */ + value_list_t const *vl, aggregation_t *agg) { agg_instance_t *inst; @@ -130,6 +131,9 @@ static agg_instance_t *agg_instance_create (value_list_t const *vl, /* {{{ */ return (NULL); } memset (inst, 0, sizeof (*inst)); + pthread_mutex_init (&inst->lock, /* attr = */ NULL); + + inst->ds_type = ds->ds[0].type; #define COPY_FIELD(fld) do { \ sstrncpy (inst->ident.fld, \ @@ -145,7 +149,6 @@ static agg_instance_t *agg_instance_create (value_list_t const *vl, /* {{{ */ #undef COPY_FIELD - inst->ds_type = -1; inst->min = NAN; inst->max = NAN; @@ -204,6 +207,8 @@ static int agg_instance_update (agg_instance_t *inst, /* {{{ */ return (0); } + pthread_mutex_lock (&inst->lock); + inst->num++; inst->sum += rate[0]; inst->squares_sum += (rate[0] * rate[0]); @@ -213,16 +218,138 @@ static int agg_instance_update (agg_instance_t *inst, /* {{{ */ if (isnan (inst->max) || (inst->max < rate[0])) inst->max = rate[0]; + pthread_mutex_unlock (&inst->lock); + sfree (rate); return (0); } /* }}} int agg_instance_update */ +static int agg_instance_read_func (agg_instance_t *inst, /* {{{ */ + char const *func, gauge_t rate, rate_to_value_state_t *state, + value_list_t *vl, char const *pi_prefix, cdtime_t t) +{ + value_t v; + int status; + + if (pi_prefix[0] != 0) + ssnprintf (vl->plugin_instance, sizeof (vl->plugin_instance), "%s-%s", + pi_prefix, func); + else + sstrncpy (vl->plugin_instance, func, sizeof (vl->plugin_instance)); + + memset (&v, 0, sizeof (v)); + status = rate_to_value (&v, rate, state, inst->ds_type, t); + if (status != 0) + { + WARNING ("aggregation plugin: rate_to_value failed with status %i.", + status); + return (-1); + } + + vl->values = &v; + vl->values_len = 1; + + plugin_dispatch_values_secure (vl); + + vl->values = NULL; + vl->values_len = 0; + + return (0); +} /* }}} int agg_instance_read_func */ + +static int agg_instance_read (agg_instance_t *inst, cdtime_t t) /* {{{ */ +{ + value_list_t vl = VALUE_LIST_INIT; + char pi_prefix[DATA_MAX_NAME_LEN]; + + /* Pre-set all the fields in the value list that will not change per + * aggregation type (sum, average, ...). The struct will be re-used and must + * therefore be dispatched using the "secure" function. */ + + vl.time = t; + vl.interval = 0; + + vl.meta = meta_data_create (); + if (vl.meta == NULL) + { + ERROR ("aggregation plugin: meta_data_create failed."); + return (-1); + } + meta_data_add_boolean (vl.meta, "aggregation:created", 1); + + if (LU_IS_ALL (inst->ident.host)) + sstrncpy (vl.host, "global", sizeof (vl.host)); + else + sstrncpy (vl.host, inst->ident.host, sizeof (vl.host)); + + sstrncpy (vl.plugin, "aggregate", sizeof (vl.plugin)); + + if (LU_IS_ALL (inst->ident.plugin)) + { + if (LU_IS_ALL (inst->ident.plugin_instance)) + sstrncpy (pi_prefix, "", sizeof (pi_prefix)); + else + sstrncpy (pi_prefix, inst->ident.plugin_instance, sizeof (pi_prefix)); + } + else + { + if (LU_IS_ALL (inst->ident.plugin_instance)) + sstrncpy (pi_prefix, inst->ident.plugin, sizeof (pi_prefix)); + else + ssnprintf (pi_prefix, sizeof (pi_prefix), + "%s-%s", inst->ident.plugin, inst->ident.plugin_instance); + } + + sstrncpy (vl.type, inst->ident.type, sizeof (vl.type)); + + if (!LU_IS_ALL (inst->ident.type_instance)) + sstrncpy (vl.type_instance, inst->ident.type_instance, + sizeof (vl.type_instance)); + +#define READ_FUNC(func, rate) do { \ + if (inst->state_ ## func != NULL) { \ + agg_instance_read_func (inst, #func, rate, \ + inst->state_ ## func, &vl, pi_prefix, t); \ + } \ +} while (0) + + pthread_mutex_lock (&inst->lock); + + READ_FUNC (num, (gauge_t) inst->num); + + /* All other aggregations are only defined when there have been any values + * at all. */ + if (inst->num > 0) + { + READ_FUNC (sum, inst->sum); + READ_FUNC (average, (inst->sum / ((gauge_t) inst->num))); + READ_FUNC (min, inst->min); + READ_FUNC (max, inst->max); + READ_FUNC (stddev, sqrt((((gauge_t) inst->num) * inst->squares_sum) + - (inst->sum * inst->sum)) / ((gauge_t) inst->num)); + } + + /* Reset internal state. */ + inst->num = 0; + inst->sum = 0.0; + inst->squares_sum = 0.0; + inst->min = NAN; + inst->max = NAN; + + pthread_mutex_unlock (&inst->lock); + + meta_data_destroy (vl.meta); + vl.meta = NULL; + + return (0); +} /* }}} int agg_instance_read */ + /* lookup_class_callback_t for utils_vl_lookup */ static void *agg_lookup_class_callback ( /* {{{ */ __attribute__((unused)) data_set_t const *ds, value_list_t const *vl, void *user_class) { - return (agg_instance_create (vl, (aggregation_t *) user_class)); + return (agg_instance_create (ds, vl, (aggregation_t *) user_class)); } /* }}} void *agg_class_callback */ /* lookup_obj_callback_t for utils_vl_lookup */ @@ -364,24 +491,28 @@ static int agg_config (oconfig_item_t *ci) /* {{{ */ static int agg_read (void) /* {{{ */ { agg_instance_t *this; - size_t i = 0; + cdtime_t t; + int success; + + t = cdtime (); + success = 0; pthread_mutex_lock (&agg_instance_list_lock); + for (this = agg_instance_list_head; this != NULL; this = this->next) { - DEBUG ("aggregation plugin: Handling instance: host = \"%s\", " - "plugin = \"%s\", plugin_instance = \"%s\", " - "type = \"%s\", type_instance = \"%s\"", - this->ident.host, - this->ident.plugin, this->ident.plugin_instance, - this->ident.type, this->ident.type_instance); - i++; + int status; + + status = agg_instance_read (this, t); + if (status != 0) + WARNING ("aggregation plugin: Reading an aggregation instance " + "failed with status %i.", status); + else + success++; } pthread_mutex_unlock (&agg_instance_list_lock); - DEBUG ("aggregation plugin: There are currently %zu instances.", i); - - return (0); + return ((success > 0) ? 0 : -1); } /* }}} int agg_read */ static int agg_write (data_set_t const *ds, value_list_t const *vl, /* {{{ */