X-Git-Url: https://git.octo.it/?a=blobdiff_plain;f=src%2Faggregation.c;h=a924223481c42d5a1b611eedd0370aa558f8cf22;hb=e117ee5d033765dca02541a406a565f007efe0c0;hp=2e13766e6d0f4ba400eae84abd8d52f6d2a446b6;hpb=03ed4b711ea23607b255e8a187070133e63184cb;p=collectd.git diff --git a/src/aggregation.c b/src/aggregation.c index 2e13766e..a9242234 100644 --- a/src/aggregation.c +++ b/src/aggregation.c @@ -51,6 +51,7 @@ struct agg_instance_s; typedef struct agg_instance_s agg_instance_t; struct agg_instance_s /* {{{ */ { + pthread_mutex_t lock; identifier_t ident; int ds_type; @@ -116,8 +117,8 @@ static void agg_instance_destroy (agg_instance_t *inst) /* {{{ */ } /* }}} void agg_instance_destroy */ /* Create a new aggregation instance. */ -static agg_instance_t *agg_instance_create (value_list_t const *vl, /* {{{ */ - aggregation_t *agg) +static agg_instance_t *agg_instance_create (data_set_t const *ds, /* {{{ */ + value_list_t const *vl, aggregation_t *agg) { agg_instance_t *inst; @@ -130,6 +131,9 @@ static agg_instance_t *agg_instance_create (value_list_t const *vl, /* {{{ */ return (NULL); } memset (inst, 0, sizeof (*inst)); + pthread_mutex_init (&inst->lock, /* attr = */ NULL); + + inst->ds_type = ds->ds[0].type; #define COPY_FIELD(fld) do { \ sstrncpy (inst->ident.fld, \ @@ -145,7 +149,6 @@ static agg_instance_t *agg_instance_create (value_list_t const *vl, /* {{{ */ #undef COPY_FIELD - inst->ds_type = -1; inst->min = NAN; inst->max = NAN; @@ -204,6 +207,8 @@ static int agg_instance_update (agg_instance_t *inst, /* {{{ */ return (0); } + pthread_mutex_lock (&inst->lock); + inst->num++; inst->sum += rate[0]; inst->squares_sum += (rate[0] * rate[0]); @@ -213,16 +218,138 @@ static int agg_instance_update (agg_instance_t *inst, /* {{{ */ if (isnan (inst->max) || (inst->max < rate[0])) inst->max = rate[0]; + pthread_mutex_unlock (&inst->lock); + sfree (rate); return (0); } /* }}} int agg_instance_update */ +static int agg_instance_read_func (agg_instance_t *inst, /* {{{ */ + char const *func, gauge_t rate, rate_to_value_state_t *state, + value_list_t *vl, char const *pi_prefix, cdtime_t t) +{ + value_t v; + int status; + + if (pi_prefix[0] != 0) + ssnprintf (vl->plugin_instance, sizeof (vl->plugin_instance), "%s-%s", + pi_prefix, func); + else + sstrncpy (vl->plugin_instance, func, sizeof (vl->plugin_instance)); + + memset (&v, 0, sizeof (v)); + status = rate_to_value (&v, rate, state, inst->ds_type, t); + if (status != 0) + { + WARNING ("aggregation plugin: rate_to_value failed with status %i.", + status); + return (-1); + } + + vl->values = &v; + vl->values_len = 1; + + plugin_dispatch_values_secure (vl); + + vl->values = NULL; + vl->values_len = 0; + + return (0); +} /* }}} int agg_instance_read_func */ + +static int agg_instance_read (agg_instance_t *inst, cdtime_t t) /* {{{ */ +{ + value_list_t vl = VALUE_LIST_INIT; + char pi_prefix[DATA_MAX_NAME_LEN]; + + /* Pre-set all the fields in the value list that will not change per + * aggregation type (sum, average, ...). The struct will be re-used and must + * therefore be dispatched using the "secure" function. */ + + vl.time = t; + vl.interval = 0; + + vl.meta = meta_data_create (); + if (vl.meta == NULL) + { + ERROR ("aggregation plugin: meta_data_create failed."); + return (-1); + } + meta_data_add_boolean (vl.meta, "aggregation:created", 1); + + if (LU_IS_ALL (inst->ident.host)) + sstrncpy (vl.host, "global", sizeof (vl.host)); + else + sstrncpy (vl.host, inst->ident.host, sizeof (vl.host)); + + sstrncpy (vl.plugin, "aggregation", sizeof (vl.plugin)); + + if (LU_IS_ALL (inst->ident.plugin)) + { + if (LU_IS_ALL (inst->ident.plugin_instance)) + sstrncpy (pi_prefix, "", sizeof (pi_prefix)); + else + sstrncpy (pi_prefix, inst->ident.plugin_instance, sizeof (pi_prefix)); + } + else + { + if (LU_IS_ALL (inst->ident.plugin_instance)) + sstrncpy (pi_prefix, inst->ident.plugin, sizeof (pi_prefix)); + else + ssnprintf (pi_prefix, sizeof (pi_prefix), + "%s-%s", inst->ident.plugin, inst->ident.plugin_instance); + } + + sstrncpy (vl.type, inst->ident.type, sizeof (vl.type)); + + if (!LU_IS_ALL (inst->ident.type_instance)) + sstrncpy (vl.type_instance, inst->ident.type_instance, + sizeof (vl.type_instance)); + +#define READ_FUNC(func, rate) do { \ + if (inst->state_ ## func != NULL) { \ + agg_instance_read_func (inst, #func, rate, \ + inst->state_ ## func, &vl, pi_prefix, t); \ + } \ +} while (0) + + pthread_mutex_lock (&inst->lock); + + READ_FUNC (num, (gauge_t) inst->num); + + /* All other aggregations are only defined when there have been any values + * at all. */ + if (inst->num > 0) + { + READ_FUNC (sum, inst->sum); + READ_FUNC (average, (inst->sum / ((gauge_t) inst->num))); + READ_FUNC (min, inst->min); + READ_FUNC (max, inst->max); + READ_FUNC (stddev, sqrt((((gauge_t) inst->num) * inst->squares_sum) + - (inst->sum * inst->sum)) / ((gauge_t) inst->num)); + } + + /* Reset internal state. */ + inst->num = 0; + inst->sum = 0.0; + inst->squares_sum = 0.0; + inst->min = NAN; + inst->max = NAN; + + pthread_mutex_unlock (&inst->lock); + + meta_data_destroy (vl.meta); + vl.meta = NULL; + + return (0); +} /* }}} int agg_instance_read */ + /* lookup_class_callback_t for utils_vl_lookup */ static void *agg_lookup_class_callback ( /* {{{ */ __attribute__((unused)) data_set_t const *ds, value_list_t const *vl, void *user_class) { - return (agg_instance_create (vl, (aggregation_t *) user_class)); + return (agg_instance_create (ds, vl, (aggregation_t *) user_class)); } /* }}} void *agg_class_callback */ /* lookup_obj_callback_t for utils_vl_lookup */ @@ -249,11 +376,11 @@ static void agg_lookup_free_obj_callback (void *user_obj) /* {{{ */ /* * * - * Host "/any/" * Plugin "cpu" - * PluginInstance "/all/" * Type "cpu" - * TypeInstance "/any/" + * + * GroupBy Host + * GroupBy TypeInstance * * CalculateNum true * CalculateSum true @@ -264,9 +391,47 @@ static void agg_lookup_free_obj_callback (void *user_obj) /* {{{ */ * * */ +static int agg_config_handle_group_by (oconfig_item_t const *ci, /* {{{ */ + aggregation_t *agg) +{ + int i; + + for (i = 0; i < ci->values_num; i++) + { + char const *value; + + if (ci->values[i].type != OCONFIG_TYPE_STRING) + { + ERROR ("aggregation plugin: Argument %i of the \"GroupBy\" option " + "is not a string.", i + 1); + continue; + } + + value = ci->values[i].value.string; + + if (strcasecmp ("Host", value) == 0) + sstrncpy (agg->ident.host, LU_ANY, sizeof (agg->ident.host)); + else if (strcasecmp ("Plugin", value) == 0) + sstrncpy (agg->ident.plugin, LU_ANY, sizeof (agg->ident.plugin)); + else if (strcasecmp ("PluginInstance", value) == 0) + sstrncpy (agg->ident.plugin_instance, LU_ANY, + sizeof (agg->ident.plugin_instance)); + else if (strcasecmp ("TypeInstance", value) == 0) + sstrncpy (agg->ident.type_instance, LU_ANY, sizeof (agg->ident.type_instance)); + else if (strcasecmp ("Type", value) == 0) + ERROR ("aggregation plugin: Grouping by type is not supported."); + else + WARNING ("aggregation plugin: The \"%s\" argument to the \"GroupBy\" " + "option is invalid and will be ignored.", value); + } /* for (ci->values) */ + + return (0); +} /* }}} int agg_config_handle_group_by */ + static int agg_config_aggregation (oconfig_item_t *ci) /* {{{ */ { aggregation_t *agg; + _Bool is_valid; int status; int i; @@ -278,6 +443,14 @@ static int agg_config_aggregation (oconfig_item_t *ci) /* {{{ */ } memset (agg, 0, sizeof (*agg)); + sstrncpy (agg->ident.host, LU_ALL, sizeof (agg->ident.host)); + sstrncpy (agg->ident.plugin, LU_ALL, sizeof (agg->ident.plugin)); + sstrncpy (agg->ident.plugin_instance, LU_ALL, + sizeof (agg->ident.plugin_instance)); + sstrncpy (agg->ident.type, LU_ALL, sizeof (agg->ident.type)); + sstrncpy (agg->ident.type_instance, LU_ALL, + sizeof (agg->ident.type_instance)); + for (i = 0; i < ci->children_num; i++) { oconfig_item_t *child = ci->children + i; @@ -297,6 +470,8 @@ static int agg_config_aggregation (oconfig_item_t *ci) /* {{{ */ else if (strcasecmp ("TypeInstance", child->key) == 0) cf_util_get_string_buffer (child, agg->ident.type_instance, sizeof (agg->ident.type_instance)); + else if (strcasecmp ("GroupBy", child->key) == 0) + agg_config_handle_group_by (child, agg); else if (strcasecmp ("CalculateNum", child->key) == 0) cf_util_get_boolean (child, &agg->calc_num); else if (strcasecmp ("CalculateSum", child->key) == 0) @@ -314,10 +489,59 @@ static int agg_config_aggregation (oconfig_item_t *ci) /* {{{ */ " blocks and will be ignored.", child->key); } - /* TODO(octo): Check identifier: - * - At least one wildcard. - * - Type is set. - */ + /* Sanity checking */ + is_valid = 1; + if (LU_IS_ALL (agg->ident.type)) /* {{{ */ + { + ERROR ("aggregation plugin: It appears you did not specify the required " + "\"Type\" option in this aggregation. " + "(Host \"%s\", Plugin \"%s\", PluginInstance \"%s\", " + "Type \"%s\", TypeInstance \"%s\")", + agg->ident.host, agg->ident.plugin, agg->ident.plugin_instance, + agg->ident.type, agg->ident.type_instance); + is_valid = 0; + } + else if (strchr (agg->ident.type, '/') != NULL) + { + ERROR ("aggregation plugin: The \"Type\" may not contain the '/' " + "character. Especially, it may not be a wildcard. The current " + "value is \"%s\".", agg->ident.type); + is_valid = 0; + } /* }}} */ + + if (!LU_IS_ALL (agg->ident.host) /* {{{ */ + && !LU_IS_ALL (agg->ident.plugin) + && !LU_IS_ALL (agg->ident.plugin_instance) + && !LU_IS_ALL (agg->ident.type_instance)) + { + ERROR ("aggregation plugin: An aggregation must contain at least one " + "wildcard. This is achieved by leaving at least one of the \"Host\", " + "\"Plugin\", \"PluginInstance\" and \"TypeInstance\" options blank " + "and not grouping by that field. " + "(Host \"%s\", Plugin \"%s\", PluginInstance \"%s\", " + "Type \"%s\", TypeInstance \"%s\")", + agg->ident.host, agg->ident.plugin, agg->ident.plugin_instance, + agg->ident.type, agg->ident.type_instance); + is_valid = 0; + } /* }}} */ + + if (!agg->calc_num && !agg->calc_sum && !agg->calc_average /* {{{ */ + && !agg->calc_min && !agg->calc_max && !agg->calc_stddev) + { + ERROR ("aggregation plugin: No aggregation function has been specified. " + "Without this, I don't know what I should be calculating. " + "(Host \"%s\", Plugin \"%s\", PluginInstance \"%s\", " + "Type \"%s\", TypeInstance \"%s\")", + agg->ident.host, agg->ident.plugin, agg->ident.plugin_instance, + agg->ident.type, agg->ident.type_instance); + is_valid = 0; + } /* }}} */ + + if (!is_valid) /* {{{ */ + { + sfree (agg); + return (-1); + } /* }}} */ status = lookup_add (lookup, &agg->ident, agg); if (status != 0) @@ -327,6 +551,11 @@ static int agg_config_aggregation (oconfig_item_t *ci) /* {{{ */ return (-1); } + DEBUG ("aggregation plugin: Successfully added aggregation: " + "(Host \"%s\", Plugin \"%s\", PluginInstance \"%s\", " + "Type \"%s\", TypeInstance \"%s\")", + agg->ident.host, agg->ident.plugin, agg->ident.plugin_instance, + agg->ident.type, agg->ident.type_instance); return (0); } /* }}} int agg_config_aggregation */ @@ -334,6 +563,8 @@ static int agg_config (oconfig_item_t *ci) /* {{{ */ { int i; + pthread_mutex_lock (&agg_instance_list_lock); + if (lookup == NULL) { lookup = lookup_create (agg_lookup_class_callback, @@ -342,6 +573,7 @@ static int agg_config (oconfig_item_t *ci) /* {{{ */ agg_lookup_free_obj_callback); if (lookup == NULL) { + pthread_mutex_unlock (&agg_instance_list_lock); ERROR ("aggregation plugin: lookup_create failed."); return (-1); } @@ -358,30 +590,49 @@ static int agg_config (oconfig_item_t *ci) /* {{{ */ " blocks and will be ignored.", child->key); } + pthread_mutex_unlock (&agg_instance_list_lock); + return (0); } /* }}} int agg_config */ static int agg_read (void) /* {{{ */ { agg_instance_t *this; - size_t i = 0; + cdtime_t t; + int success; + + t = cdtime (); + success = 0; pthread_mutex_lock (&agg_instance_list_lock); + + /* agg_instance_list_head only holds data, after the "write" callback has + * been called with a matching value list at least once. So on startup, + * there's a race between the aggregations read() and write() callback. If + * the read() callback is called first, agg_instance_list_head is NULL and + * "success" may be zero. This is expected and should not result in an error. + * Therefore we need to handle this case separately. */ + if (agg_instance_list_head == NULL) + { + pthread_mutex_unlock (&agg_instance_list_lock); + return (0); + } + for (this = agg_instance_list_head; this != NULL; this = this->next) { - DEBUG ("aggregation plugin: Handling instance: host = \"%s\", " - "plugin = \"%s\", plugin_instance = \"%s\", " - "type = \"%s\", type_instance = \"%s\"", - this->ident.host, - this->ident.plugin, this->ident.plugin_instance, - this->ident.type, this->ident.type_instance); - i++; + int status; + + status = agg_instance_read (this, t); + if (status != 0) + WARNING ("aggregation plugin: Reading an aggregation instance " + "failed with status %i.", status); + else + success++; } - pthread_mutex_unlock (&agg_instance_list_lock); - DEBUG ("aggregation plugin: There are currently %zu instances.", i); + pthread_mutex_unlock (&agg_instance_list_lock); - return (0); + return ((success > 0) ? 0 : -1); } /* }}} int agg_read */ static int agg_write (data_set_t const *ds, value_list_t const *vl, /* {{{ */