X-Git-Url: https://git.octo.it/?a=blobdiff_plain;f=src%2Faggregation.c;h=3d5f396aa53ad453db5e320b12a0b9f9c9e56d63;hb=c6557cb8950f50a41a1decfb34439b060e2fd371;hp=42722021adca5f9fd0ce29c60f0d68040e41d38a;hpb=aefb1ea8faaf373428e1862b6de13b26a630da06;p=collectd.git diff --git a/src/aggregation.c b/src/aggregation.c index 42722021..3d5f396a 100644 --- a/src/aggregation.c +++ b/src/aggregation.c @@ -25,18 +25,31 @@ **/ #include "collectd.h" + +#include + #include "plugin.h" #include "common.h" #include "configfile.h" #include "meta_data.h" #include "utils_cache.h" /* for uc_get_rate() */ +#include "utils_subst.h" #include "utils_vl_lookup.h" -#include +#define AGG_MATCHES_ALL(str) (strcmp ("/.*/", str) == 0) +#define AGG_FUNC_PLACEHOLDER "%{aggregation}" struct aggregation_s /* {{{ */ { identifier_t ident; + unsigned int group_by; + + unsigned int regex_fields; + + char *set_host; + char *set_plugin; + char *set_plugin_instance; + char *set_type_instance; _Bool calc_num; _Bool calc_sum; @@ -78,6 +91,23 @@ static lookup_t *lookup = NULL; static pthread_mutex_t agg_instance_list_lock = PTHREAD_MUTEX_INITIALIZER; static agg_instance_t *agg_instance_list_head = NULL; +static _Bool agg_is_regex (char const *str) /* {{{ */ +{ + size_t len; + + if (str == NULL) + return (0); + + len = strlen (str); + if (len < 3) + return (0); + + if ((str[0] == '/') && (str[len - 1] == '/')) + return (1); + else + return (0); +} /* }}} _Bool agg_is_regex */ + static void agg_destroy (aggregation_t *agg) /* {{{ */ { sfree (agg); @@ -116,6 +146,92 @@ static void agg_instance_destroy (agg_instance_t *inst) /* {{{ */ inst->max = NAN; } /* }}} void agg_instance_destroy */ +static int agg_instance_create_name (agg_instance_t *inst, /* {{{ */ + value_list_t const *vl, aggregation_t const *agg) +{ +#define COPY_FIELD(buffer, buffer_size, field, group_mask, all_value) do { \ + if (agg->set_ ## field != NULL) \ + sstrncpy (buffer, agg->set_ ## field, buffer_size); \ + else if ((agg->regex_fields & group_mask) \ + && (agg->group_by & group_mask)) \ + sstrncpy (buffer, vl->field, buffer_size); \ + else if ((agg->regex_fields & group_mask) \ + && (AGG_MATCHES_ALL (agg->ident.field))) \ + sstrncpy (buffer, all_value, buffer_size); \ + else \ + sstrncpy (buffer, agg->ident.field, buffer_size); \ +} while (0) + + /* Host */ + COPY_FIELD (inst->ident.host, sizeof (inst->ident.host), + host, LU_GROUP_BY_HOST, "global"); + + /* Plugin */ + if (agg->set_plugin != NULL) + sstrncpy (inst->ident.plugin, agg->set_plugin, + sizeof (inst->ident.plugin)); + else + sstrncpy (inst->ident.plugin, "aggregation", sizeof (inst->ident.plugin)); + + /* Plugin instance */ + if (agg->set_plugin_instance != NULL) + sstrncpy (inst->ident.plugin_instance, agg->set_plugin_instance, + sizeof (inst->ident.plugin_instance)); + else + { + char tmp_plugin[DATA_MAX_NAME_LEN]; + char tmp_plugin_instance[DATA_MAX_NAME_LEN] = ""; + + if ((agg->regex_fields & LU_GROUP_BY_PLUGIN) + && (agg->group_by & LU_GROUP_BY_PLUGIN)) + sstrncpy (tmp_plugin, vl->plugin, sizeof (tmp_plugin)); + else if ((agg->regex_fields & LU_GROUP_BY_PLUGIN) + && (AGG_MATCHES_ALL (agg->ident.plugin))) + sstrncpy (tmp_plugin, "", sizeof (tmp_plugin)); + else + sstrncpy (tmp_plugin, agg->ident.plugin, sizeof (tmp_plugin)); + + if ((agg->regex_fields & LU_GROUP_BY_PLUGIN_INSTANCE) + && (agg->group_by & LU_GROUP_BY_PLUGIN_INSTANCE)) + sstrncpy (tmp_plugin_instance, vl->plugin_instance, + sizeof (tmp_plugin_instance)); + else if ((agg->regex_fields & LU_GROUP_BY_PLUGIN_INSTANCE) + && (AGG_MATCHES_ALL (agg->ident.plugin_instance))) + sstrncpy (tmp_plugin_instance, "", sizeof (tmp_plugin_instance)); + else + sstrncpy (tmp_plugin_instance, agg->ident.plugin_instance, + sizeof (tmp_plugin_instance)); + + if ((strcmp ("", tmp_plugin) == 0) + && (strcmp ("", tmp_plugin_instance) == 0)) + sstrncpy (inst->ident.plugin_instance, AGG_FUNC_PLACEHOLDER, + sizeof (inst->ident.plugin_instance)); + else if (strcmp ("", tmp_plugin) != 0) + ssnprintf (inst->ident.plugin_instance, + sizeof (inst->ident.plugin_instance), + "%s-%s", tmp_plugin, AGG_FUNC_PLACEHOLDER); + else if (strcmp ("", tmp_plugin_instance) != 0) + ssnprintf (inst->ident.plugin_instance, + sizeof (inst->ident.plugin_instance), + "%s-%s", tmp_plugin_instance, AGG_FUNC_PLACEHOLDER); + else + ssnprintf (inst->ident.plugin_instance, + sizeof (inst->ident.plugin_instance), + "%s-%s-%s", tmp_plugin, tmp_plugin_instance, AGG_FUNC_PLACEHOLDER); + } + + /* Type */ + sstrncpy (inst->ident.type, agg->ident.type, sizeof (inst->ident.type)); + + /* Type instance */ + COPY_FIELD (inst->ident.type_instance, sizeof (inst->ident.type_instance), + type_instance, LU_GROUP_BY_TYPE_INSTANCE, ""); + +#undef COPY_FIELD + + return (0); +} /* }}} int agg_instance_create_name */ + /* Create a new aggregation instance. */ static agg_instance_t *agg_instance_create (data_set_t const *ds, /* {{{ */ value_list_t const *vl, aggregation_t *agg) @@ -124,30 +240,17 @@ static agg_instance_t *agg_instance_create (data_set_t const *ds, /* {{{ */ DEBUG ("aggregation plugin: Creating new instance."); - inst = malloc (sizeof (*inst)); + inst = calloc (1, sizeof (*inst)); if (inst == NULL) { - ERROR ("aggregation plugin: malloc() failed."); + ERROR ("aggregation plugin: calloc() failed."); return (NULL); } - memset (inst, 0, sizeof (*inst)); pthread_mutex_init (&inst->lock, /* attr = */ NULL); inst->ds_type = ds->ds[0].type; -#define COPY_FIELD(fld) do { \ - sstrncpy (inst->ident.fld, \ - LU_IS_ANY (agg->ident.fld) ? vl->fld : agg->ident.fld, \ - sizeof (inst->ident.fld)); \ -} while (0) - - COPY_FIELD (host); - COPY_FIELD (plugin); - COPY_FIELD (plugin_instance); - COPY_FIELD (type); - COPY_FIELD (type_instance); - -#undef COPY_FIELD + agg_instance_create_name (inst, vl, agg); inst->min = NAN; inst->max = NAN; @@ -155,13 +258,13 @@ static agg_instance_t *agg_instance_create (data_set_t const *ds, /* {{{ */ #define INIT_STATE(field) do { \ inst->state_ ## field = NULL; \ if (agg->calc_ ## field) { \ - inst->state_ ## field = malloc (sizeof (*inst->state_ ## field)); \ + inst->state_ ## field = calloc (1, sizeof (*inst->state_ ## field)); \ if (inst->state_ ## field == NULL) { \ agg_instance_destroy (inst); \ - ERROR ("aggregation plugin: malloc() failed."); \ + free (inst); \ + ERROR ("aggregation plugin: calloc() failed."); \ return (NULL); \ } \ - memset (inst->state_ ## field, 0, sizeof (*inst->state_ ## field)); \ } \ } while (0) @@ -192,13 +295,21 @@ static int agg_instance_update (agg_instance_t *inst, /* {{{ */ gauge_t *rate; if (ds->ds_num != 1) - return (-1); + { + ERROR ("aggregation plugin: The \"%s\" type (data set) has more than one " + "data source. This is currently not supported by this plugin. " + "Sorry.", ds->type); + return (EINVAL); + } rate = uc_get_rate (ds, vl); if (rate == NULL) { - ERROR ("aggregation plugin: uc_get_rate() failed."); - return (-1); + char ident[6 * DATA_MAX_NAME_LEN]; + FORMAT_VL (ident, sizeof (ident), vl); + ERROR ("aggregation plugin: Unable to read the current rate of \"%s\".", + ident); + return (ENOENT); } if (isnan (rate[0])) @@ -232,8 +343,8 @@ static int agg_instance_read_func (agg_instance_t *inst, /* {{{ */ int status; if (pi_prefix[0] != 0) - ssnprintf (vl->plugin_instance, sizeof (vl->plugin_instance), "%s-%s", - pi_prefix, func); + subst_string (vl->plugin_instance, sizeof (vl->plugin_instance), + pi_prefix, AGG_FUNC_PLACEHOLDER, func); else sstrncpy (vl->plugin_instance, func, sizeof (vl->plugin_instance)); @@ -241,6 +352,12 @@ static int agg_instance_read_func (agg_instance_t *inst, /* {{{ */ status = rate_to_value (&v, rate, state, inst->ds_type, t); if (status != 0) { + /* If this is the first iteration and rate_to_value() was asked to return a + * COUNTER or a DERIVE, it will return EAGAIN. Catch this and handle + * gracefully. */ + if (status == EAGAIN) + return (0); + WARNING ("aggregation plugin: rate_to_value failed with status %i.", status); return (-1); @@ -249,7 +366,7 @@ static int agg_instance_read_func (agg_instance_t *inst, /* {{{ */ vl->values = &v; vl->values_len = 1; - plugin_dispatch_values_secure (vl); + plugin_dispatch_values (vl); vl->values = NULL; vl->values_len = 0; @@ -260,7 +377,6 @@ static int agg_instance_read_func (agg_instance_t *inst, /* {{{ */ static int agg_instance_read (agg_instance_t *inst, cdtime_t t) /* {{{ */ { value_list_t vl = VALUE_LIST_INIT; - char pi_prefix[DATA_MAX_NAME_LEN]; /* Pre-set all the fields in the value list that will not change per * aggregation type (sum, average, ...). The struct will be re-used and must @@ -277,39 +393,16 @@ static int agg_instance_read (agg_instance_t *inst, cdtime_t t) /* {{{ */ } meta_data_add_boolean (vl.meta, "aggregation:created", 1); - if (LU_IS_ALL (inst->ident.host)) - sstrncpy (vl.host, "global", sizeof (vl.host)); - else - sstrncpy (vl.host, inst->ident.host, sizeof (vl.host)); - - sstrncpy (vl.plugin, "aggregation", sizeof (vl.plugin)); - - if (LU_IS_ALL (inst->ident.plugin)) - { - if (LU_IS_ALL (inst->ident.plugin_instance)) - sstrncpy (pi_prefix, "", sizeof (pi_prefix)); - else - sstrncpy (pi_prefix, inst->ident.plugin_instance, sizeof (pi_prefix)); - } - else - { - if (LU_IS_ALL (inst->ident.plugin_instance)) - sstrncpy (pi_prefix, inst->ident.plugin, sizeof (pi_prefix)); - else - ssnprintf (pi_prefix, sizeof (pi_prefix), - "%s-%s", inst->ident.plugin, inst->ident.plugin_instance); - } - + sstrncpy (vl.host, inst->ident.host, sizeof (vl.host)); + sstrncpy (vl.plugin, inst->ident.plugin, sizeof (vl.plugin)); sstrncpy (vl.type, inst->ident.type, sizeof (vl.type)); - - if (!LU_IS_ALL (inst->ident.type_instance)) - sstrncpy (vl.type_instance, inst->ident.type_instance, - sizeof (vl.type_instance)); + sstrncpy (vl.type_instance, inst->ident.type_instance, + sizeof (vl.type_instance)); #define READ_FUNC(func, rate) do { \ if (inst->state_ ## func != NULL) { \ agg_instance_read_func (inst, #func, rate, \ - inst->state_ ## func, &vl, pi_prefix, t); \ + inst->state_ ## func, &vl, inst->ident.plugin_instance, t); \ } \ } while (0) @@ -346,8 +439,7 @@ static int agg_instance_read (agg_instance_t *inst, cdtime_t t) /* {{{ */ /* lookup_class_callback_t for utils_vl_lookup */ static void *agg_lookup_class_callback ( /* {{{ */ - __attribute__((unused)) data_set_t const *ds, - value_list_t const *vl, void *user_class) + data_set_t const *ds, value_list_t const *vl, void *user_class) { return (agg_instance_create (ds, vl, (aggregation_t *) user_class)); } /* }}} void *agg_class_callback */ @@ -376,11 +468,11 @@ static void agg_lookup_free_obj_callback (void *user_obj) /* {{{ */ /* * * - * Host "/any/" * Plugin "cpu" - * PluginInstance "/all/" * Type "cpu" - * TypeInstance "/any/" + * + * GroupBy Host + * GroupBy TypeInstance * * CalculateNum true * CalculateSum true @@ -391,19 +483,63 @@ static void agg_lookup_free_obj_callback (void *user_obj) /* {{{ */ * * */ +static int agg_config_handle_group_by (oconfig_item_t const *ci, /* {{{ */ + aggregation_t *agg) +{ + int i; + + for (i = 0; i < ci->values_num; i++) + { + char const *value; + + if (ci->values[i].type != OCONFIG_TYPE_STRING) + { + ERROR ("aggregation plugin: Argument %i of the \"GroupBy\" option " + "is not a string.", i + 1); + continue; + } + + value = ci->values[i].value.string; + + if (strcasecmp ("Host", value) == 0) + agg->group_by |= LU_GROUP_BY_HOST; + else if (strcasecmp ("Plugin", value) == 0) + agg->group_by |= LU_GROUP_BY_PLUGIN; + else if (strcasecmp ("PluginInstance", value) == 0) + agg->group_by |= LU_GROUP_BY_PLUGIN_INSTANCE; + else if (strcasecmp ("TypeInstance", value) == 0) + agg->group_by |= LU_GROUP_BY_TYPE_INSTANCE; + else if (strcasecmp ("Type", value) == 0) + ERROR ("aggregation plugin: Grouping by type is not supported."); + else + WARNING ("aggregation plugin: The \"%s\" argument to the \"GroupBy\" " + "option is invalid and will be ignored.", value); + } /* for (ci->values) */ + + return (0); +} /* }}} int agg_config_handle_group_by */ + static int agg_config_aggregation (oconfig_item_t *ci) /* {{{ */ { aggregation_t *agg; + _Bool is_valid; int status; int i; - agg = malloc (sizeof (*agg)); + agg = calloc (1, sizeof (*agg)); if (agg == NULL) { - ERROR ("aggregation plugin: malloc failed."); + ERROR ("aggregation plugin: calloc failed."); return (-1); } - memset (agg, 0, sizeof (*agg)); + + sstrncpy (agg->ident.host, "/.*/", sizeof (agg->ident.host)); + sstrncpy (agg->ident.plugin, "/.*/", sizeof (agg->ident.plugin)); + sstrncpy (agg->ident.plugin_instance, "/.*/", + sizeof (agg->ident.plugin_instance)); + sstrncpy (agg->ident.type, "/.*/", sizeof (agg->ident.type)); + sstrncpy (agg->ident.type_instance, "/.*/", + sizeof (agg->ident.type_instance)); for (i = 0; i < ci->children_num; i++) { @@ -424,6 +560,16 @@ static int agg_config_aggregation (oconfig_item_t *ci) /* {{{ */ else if (strcasecmp ("TypeInstance", child->key) == 0) cf_util_get_string_buffer (child, agg->ident.type_instance, sizeof (agg->ident.type_instance)); + else if (strcasecmp ("SetHost", child->key) == 0) + cf_util_get_string (child, &agg->set_host); + else if (strcasecmp ("SetPlugin", child->key) == 0) + cf_util_get_string (child, &agg->set_plugin); + else if (strcasecmp ("SetPluginInstance", child->key) == 0) + cf_util_get_string (child, &agg->set_plugin_instance); + else if (strcasecmp ("SetTypeInstance", child->key) == 0) + cf_util_get_string (child, &agg->set_type_instance); + else if (strcasecmp ("GroupBy", child->key) == 0) + agg_config_handle_group_by (child, agg); else if (strcasecmp ("CalculateNum", child->key) == 0) cf_util_get_boolean (child, &agg->calc_num); else if (strcasecmp ("CalculateSum", child->key) == 0) @@ -441,12 +587,81 @@ static int agg_config_aggregation (oconfig_item_t *ci) /* {{{ */ " blocks and will be ignored.", child->key); } - /* TODO(octo): Check identifier: - * - At least one wildcard. - * - Type is set. - */ + if (agg_is_regex (agg->ident.host)) + agg->regex_fields |= LU_GROUP_BY_HOST; + if (agg_is_regex (agg->ident.plugin)) + agg->regex_fields |= LU_GROUP_BY_PLUGIN; + if (agg_is_regex (agg->ident.plugin_instance)) + agg->regex_fields |= LU_GROUP_BY_PLUGIN_INSTANCE; + if (agg_is_regex (agg->ident.type_instance)) + agg->regex_fields |= LU_GROUP_BY_TYPE_INSTANCE; + + /* Sanity checking */ + is_valid = 1; + if (strcmp ("/.*/", agg->ident.type) == 0) /* {{{ */ + { + ERROR ("aggregation plugin: It appears you did not specify the required " + "\"Type\" option in this aggregation. " + "(Host \"%s\", Plugin \"%s\", PluginInstance \"%s\", " + "Type \"%s\", TypeInstance \"%s\")", + agg->ident.host, agg->ident.plugin, agg->ident.plugin_instance, + agg->ident.type, agg->ident.type_instance); + is_valid = 0; + } + else if (strchr (agg->ident.type, '/') != NULL) + { + ERROR ("aggregation plugin: The \"Type\" may not contain the '/' " + "character. Especially, it may not be a regex. The current " + "value is \"%s\".", agg->ident.type); + is_valid = 0; + } /* }}} */ + + /* Check that there is at least one regex field without a grouping. {{{ */ + if ((agg->regex_fields & ~agg->group_by) == 0) + { + ERROR ("aggregation plugin: An aggregation must contain at least one " + "wildcard. This is achieved by leaving at least one of the \"Host\", " + "\"Plugin\", \"PluginInstance\" and \"TypeInstance\" options blank " + "or using a regular expression and not grouping by that field. " + "(Host \"%s\", Plugin \"%s\", PluginInstance \"%s\", " + "Type \"%s\", TypeInstance \"%s\")", + agg->ident.host, agg->ident.plugin, agg->ident.plugin_instance, + agg->ident.type, agg->ident.type_instance); + is_valid = 0; + } /* }}} */ + + /* Check that all grouping fields are regular expressions. {{{ */ + if (agg->group_by & ~agg->regex_fields) + { + ERROR ("aggregation plugin: Only wildcard fields (fields for which a " + "regular expression is configured or which are left blank) can be " + "specified in the \"GroupBy\" option. " + "(Host \"%s\", Plugin \"%s\", PluginInstance \"%s\", " + "Type \"%s\", TypeInstance \"%s\")", + agg->ident.host, agg->ident.plugin, agg->ident.plugin_instance, + agg->ident.type, agg->ident.type_instance); + is_valid = 0; + } /* }}} */ + + if (!agg->calc_num && !agg->calc_sum && !agg->calc_average /* {{{ */ + && !agg->calc_min && !agg->calc_max && !agg->calc_stddev) + { + ERROR ("aggregation plugin: No aggregation function has been specified. " + "Without this, I don't know what I should be calculating. " + "(Host \"%s\", Plugin \"%s\", PluginInstance \"%s\", " + "Type \"%s\", TypeInstance \"%s\")", + agg->ident.host, agg->ident.plugin, agg->ident.plugin_instance, + agg->ident.type, agg->ident.type_instance); + is_valid = 0; + } /* }}} */ + + if (!is_valid) /* {{{ */ + { + sfree (agg); + return (-1); + } /* }}} */ - status = lookup_add (lookup, &agg->ident, agg); + status = lookup_add (lookup, &agg->ident, agg->group_by, agg); if (status != 0) { ERROR ("aggregation plugin: lookup_add failed with status %i.", status); @@ -454,6 +669,11 @@ static int agg_config_aggregation (oconfig_item_t *ci) /* {{{ */ return (-1); } + DEBUG ("aggregation plugin: Successfully added aggregation: " + "(Host \"%s\", Plugin \"%s\", PluginInstance \"%s\", " + "Type \"%s\", TypeInstance \"%s\")", + agg->ident.host, agg->ident.plugin, agg->ident.plugin_instance, + agg->ident.type, agg->ident.type_instance); return (0); } /* }}} int agg_config_aggregation */ @@ -461,6 +681,8 @@ static int agg_config (oconfig_item_t *ci) /* {{{ */ { int i; + pthread_mutex_lock (&agg_instance_list_lock); + if (lookup == NULL) { lookup = lookup_create (agg_lookup_class_callback, @@ -469,6 +691,7 @@ static int agg_config (oconfig_item_t *ci) /* {{{ */ agg_lookup_free_obj_callback); if (lookup == NULL) { + pthread_mutex_unlock (&agg_instance_list_lock); ERROR ("aggregation plugin: lookup_create failed."); return (-1); } @@ -485,6 +708,8 @@ static int agg_config (oconfig_item_t *ci) /* {{{ */ " blocks and will be ignored.", child->key); } + pthread_mutex_unlock (&agg_instance_list_lock); + return (0); } /* }}} int agg_config */ @@ -499,6 +724,18 @@ static int agg_read (void) /* {{{ */ pthread_mutex_lock (&agg_instance_list_lock); + /* agg_instance_list_head only holds data, after the "write" callback has + * been called with a matching value list at least once. So on startup, + * there's a race between the aggregations read() and write() callback. If + * the read() callback is called first, agg_instance_list_head is NULL and + * "success" may be zero. This is expected and should not result in an error. + * Therefore we need to handle this case separately. */ + if (agg_instance_list_head == NULL) + { + pthread_mutex_unlock (&agg_instance_list_lock); + return (0); + } + for (this = agg_instance_list_head; this != NULL; this = this->next) { int status; @@ -510,6 +747,7 @@ static int agg_read (void) /* {{{ */ else success++; } + pthread_mutex_unlock (&agg_instance_list_lock); return ((success > 0) ? 0 : -1);