X-Git-Url: https://git.octo.it/?a=blobdiff_plain;ds=sidebyside;f=src%2Faggregation.c;h=c4c1627af360ce463ba030cb64e80a4f6667b723;hb=95dcf60c822e4ab92c0dd1a7ff6cba73bc55bcd1;hp=7ca26ca4698fa34bdc3e43269f180234805a959b;hpb=16202999622d778521903ca2feef7d55c5b1b5b3;p=collectd.git diff --git a/src/aggregation.c b/src/aggregation.c index 7ca26ca4..c4c1627a 100644 --- a/src/aggregation.c +++ b/src/aggregation.c @@ -25,22 +25,30 @@ **/ #include "collectd.h" + #include "plugin.h" #include "common.h" #include "configfile.h" #include "meta_data.h" #include "utils_cache.h" /* for uc_get_rate() */ +#include "utils_subst.h" #include "utils_vl_lookup.h" -#include - #define AGG_MATCHES_ALL(str) (strcmp ("/.*/", str) == 0) +#define AGG_FUNC_PLACEHOLDER "%{aggregation}" struct aggregation_s /* {{{ */ { identifier_t ident; unsigned int group_by; + unsigned int regex_fields; + + char *set_host; + char *set_plugin; + char *set_plugin_instance; + char *set_type_instance; + _Bool calc_num; _Bool calc_sum; _Bool calc_average; @@ -81,6 +89,23 @@ static lookup_t *lookup = NULL; static pthread_mutex_t agg_instance_list_lock = PTHREAD_MUTEX_INITIALIZER; static agg_instance_t *agg_instance_list_head = NULL; +static _Bool agg_is_regex (char const *str) /* {{{ */ +{ + size_t len; + + if (str == NULL) + return (0); + + len = strlen (str); + if (len < 3) + return (0); + + if ((str[0] == '/') && (str[len - 1] == '/')) + return (1); + else + return (0); +} /* }}} _Bool agg_is_regex */ + static void agg_destroy (aggregation_t *agg) /* {{{ */ { sfree (agg); @@ -119,6 +144,92 @@ static void agg_instance_destroy (agg_instance_t *inst) /* {{{ */ inst->max = NAN; } /* }}} void agg_instance_destroy */ +static int agg_instance_create_name (agg_instance_t *inst, /* {{{ */ + value_list_t const *vl, aggregation_t const *agg) +{ +#define COPY_FIELD(buffer, buffer_size, field, group_mask, all_value) do { \ + if (agg->set_ ## field != NULL) \ + sstrncpy (buffer, agg->set_ ## field, buffer_size); \ + else if ((agg->regex_fields & group_mask) \ + && (agg->group_by & group_mask)) \ + sstrncpy (buffer, vl->field, buffer_size); \ + else if ((agg->regex_fields & group_mask) \ + && (AGG_MATCHES_ALL (agg->ident.field))) \ + sstrncpy (buffer, all_value, buffer_size); \ + else \ + sstrncpy (buffer, agg->ident.field, buffer_size); \ +} while (0) + + /* Host */ + COPY_FIELD (inst->ident.host, sizeof (inst->ident.host), + host, LU_GROUP_BY_HOST, "global"); + + /* Plugin */ + if (agg->set_plugin != NULL) + sstrncpy (inst->ident.plugin, agg->set_plugin, + sizeof (inst->ident.plugin)); + else + sstrncpy (inst->ident.plugin, "aggregation", sizeof (inst->ident.plugin)); + + /* Plugin instance */ + if (agg->set_plugin_instance != NULL) + sstrncpy (inst->ident.plugin_instance, agg->set_plugin_instance, + sizeof (inst->ident.plugin_instance)); + else + { + char tmp_plugin[DATA_MAX_NAME_LEN]; + char tmp_plugin_instance[DATA_MAX_NAME_LEN] = ""; + + if ((agg->regex_fields & LU_GROUP_BY_PLUGIN) + && (agg->group_by & LU_GROUP_BY_PLUGIN)) + sstrncpy (tmp_plugin, vl->plugin, sizeof (tmp_plugin)); + else if ((agg->regex_fields & LU_GROUP_BY_PLUGIN) + && (AGG_MATCHES_ALL (agg->ident.plugin))) + sstrncpy (tmp_plugin, "", sizeof (tmp_plugin)); + else + sstrncpy (tmp_plugin, agg->ident.plugin, sizeof (tmp_plugin)); + + if ((agg->regex_fields & LU_GROUP_BY_PLUGIN_INSTANCE) + && (agg->group_by & LU_GROUP_BY_PLUGIN_INSTANCE)) + sstrncpy (tmp_plugin_instance, vl->plugin_instance, + sizeof (tmp_plugin_instance)); + else if ((agg->regex_fields & LU_GROUP_BY_PLUGIN_INSTANCE) + && (AGG_MATCHES_ALL (agg->ident.plugin_instance))) + sstrncpy (tmp_plugin_instance, "", sizeof (tmp_plugin_instance)); + else + sstrncpy (tmp_plugin_instance, agg->ident.plugin_instance, + sizeof (tmp_plugin_instance)); + + if ((strcmp ("", tmp_plugin) == 0) + && (strcmp ("", tmp_plugin_instance) == 0)) + sstrncpy (inst->ident.plugin_instance, AGG_FUNC_PLACEHOLDER, + sizeof (inst->ident.plugin_instance)); + else if (strcmp ("", tmp_plugin) != 0) + ssnprintf (inst->ident.plugin_instance, + sizeof (inst->ident.plugin_instance), + "%s-%s", tmp_plugin, AGG_FUNC_PLACEHOLDER); + else if (strcmp ("", tmp_plugin_instance) != 0) + ssnprintf (inst->ident.plugin_instance, + sizeof (inst->ident.plugin_instance), + "%s-%s", tmp_plugin_instance, AGG_FUNC_PLACEHOLDER); + else + ssnprintf (inst->ident.plugin_instance, + sizeof (inst->ident.plugin_instance), + "%s-%s-%s", tmp_plugin, tmp_plugin_instance, AGG_FUNC_PLACEHOLDER); + } + + /* Type */ + sstrncpy (inst->ident.type, agg->ident.type, sizeof (inst->ident.type)); + + /* Type instance */ + COPY_FIELD (inst->ident.type_instance, sizeof (inst->ident.type_instance), + type_instance, LU_GROUP_BY_TYPE_INSTANCE, ""); + +#undef COPY_FIELD + + return (0); +} /* }}} int agg_instance_create_name */ + /* Create a new aggregation instance. */ static agg_instance_t *agg_instance_create (data_set_t const *ds, /* {{{ */ value_list_t const *vl, aggregation_t *agg) @@ -127,30 +238,17 @@ static agg_instance_t *agg_instance_create (data_set_t const *ds, /* {{{ */ DEBUG ("aggregation plugin: Creating new instance."); - inst = malloc (sizeof (*inst)); + inst = calloc (1, sizeof (*inst)); if (inst == NULL) { - ERROR ("aggregation plugin: malloc() failed."); + ERROR ("aggregation plugin: calloc() failed."); return (NULL); } - memset (inst, 0, sizeof (*inst)); pthread_mutex_init (&inst->lock, /* attr = */ NULL); inst->ds_type = ds->ds[0].type; -#define COPY_FIELD(field, group_mask) do { \ - sstrncpy (inst->ident.field, \ - (agg->group_by & group_mask) ? vl->field : agg->ident.field, \ - sizeof (inst->ident.field)); \ -} while (0) - - COPY_FIELD (host, LU_GROUP_BY_HOST); - COPY_FIELD (plugin, LU_GROUP_BY_PLUGIN); - COPY_FIELD (plugin_instance, LU_GROUP_BY_PLUGIN_INSTANCE); - COPY_FIELD (type, /* group_mask = */ 0); - COPY_FIELD (type_instance, LU_GROUP_BY_TYPE_INSTANCE); - -#undef COPY_FIELD + agg_instance_create_name (inst, vl, agg); inst->min = NAN; inst->max = NAN; @@ -158,13 +256,13 @@ static agg_instance_t *agg_instance_create (data_set_t const *ds, /* {{{ */ #define INIT_STATE(field) do { \ inst->state_ ## field = NULL; \ if (agg->calc_ ## field) { \ - inst->state_ ## field = malloc (sizeof (*inst->state_ ## field)); \ + inst->state_ ## field = calloc (1, sizeof (*inst->state_ ## field)); \ if (inst->state_ ## field == NULL) { \ agg_instance_destroy (inst); \ - ERROR ("aggregation plugin: malloc() failed."); \ + free (inst); \ + ERROR ("aggregation plugin: calloc() failed."); \ return (NULL); \ } \ - memset (inst->state_ ## field, 0, sizeof (*inst->state_ ## field)); \ } \ } while (0) @@ -243,8 +341,8 @@ static int agg_instance_read_func (agg_instance_t *inst, /* {{{ */ int status; if (pi_prefix[0] != 0) - ssnprintf (vl->plugin_instance, sizeof (vl->plugin_instance), "%s-%s", - pi_prefix, func); + subst_string (vl->plugin_instance, sizeof (vl->plugin_instance), + pi_prefix, AGG_FUNC_PLACEHOLDER, func); else sstrncpy (vl->plugin_instance, func, sizeof (vl->plugin_instance)); @@ -266,7 +364,7 @@ static int agg_instance_read_func (agg_instance_t *inst, /* {{{ */ vl->values = &v; vl->values_len = 1; - plugin_dispatch_values_secure (vl); + plugin_dispatch_values (vl); vl->values = NULL; vl->values_len = 0; @@ -277,7 +375,6 @@ static int agg_instance_read_func (agg_instance_t *inst, /* {{{ */ static int agg_instance_read (agg_instance_t *inst, cdtime_t t) /* {{{ */ { value_list_t vl = VALUE_LIST_INIT; - char pi_prefix[DATA_MAX_NAME_LEN]; /* Pre-set all the fields in the value list that will not change per * aggregation type (sum, average, ...). The struct will be re-used and must @@ -294,39 +391,16 @@ static int agg_instance_read (agg_instance_t *inst, cdtime_t t) /* {{{ */ } meta_data_add_boolean (vl.meta, "aggregation:created", 1); - if (AGG_MATCHES_ALL (inst->ident.host)) - sstrncpy (vl.host, "global", sizeof (vl.host)); - else - sstrncpy (vl.host, inst->ident.host, sizeof (vl.host)); - - sstrncpy (vl.plugin, "aggregation", sizeof (vl.plugin)); - - if (AGG_MATCHES_ALL (inst->ident.plugin)) - { - if (AGG_MATCHES_ALL (inst->ident.plugin_instance)) - sstrncpy (pi_prefix, "", sizeof (pi_prefix)); - else - sstrncpy (pi_prefix, inst->ident.plugin_instance, sizeof (pi_prefix)); - } - else - { - if (AGG_MATCHES_ALL (inst->ident.plugin_instance)) - sstrncpy (pi_prefix, inst->ident.plugin, sizeof (pi_prefix)); - else - ssnprintf (pi_prefix, sizeof (pi_prefix), - "%s-%s", inst->ident.plugin, inst->ident.plugin_instance); - } - + sstrncpy (vl.host, inst->ident.host, sizeof (vl.host)); + sstrncpy (vl.plugin, inst->ident.plugin, sizeof (vl.plugin)); sstrncpy (vl.type, inst->ident.type, sizeof (vl.type)); - - if (!AGG_MATCHES_ALL (inst->ident.type_instance)) - sstrncpy (vl.type_instance, inst->ident.type_instance, - sizeof (vl.type_instance)); + sstrncpy (vl.type_instance, inst->ident.type_instance, + sizeof (vl.type_instance)); #define READ_FUNC(func, rate) do { \ if (inst->state_ ## func != NULL) { \ agg_instance_read_func (inst, #func, rate, \ - inst->state_ ## func, &vl, pi_prefix, t); \ + inst->state_ ## func, &vl, inst->ident.plugin_instance, t); \ } \ } while (0) @@ -363,8 +437,7 @@ static int agg_instance_read (agg_instance_t *inst, cdtime_t t) /* {{{ */ /* lookup_class_callback_t for utils_vl_lookup */ static void *agg_lookup_class_callback ( /* {{{ */ - __attribute__((unused)) data_set_t const *ds, - value_list_t const *vl, void *user_class) + data_set_t const *ds, value_list_t const *vl, void *user_class) { return (agg_instance_create (ds, vl, (aggregation_t *) user_class)); } /* }}} void *agg_class_callback */ @@ -451,13 +524,12 @@ static int agg_config_aggregation (oconfig_item_t *ci) /* {{{ */ int status; int i; - agg = malloc (sizeof (*agg)); + agg = calloc (1, sizeof (*agg)); if (agg == NULL) { - ERROR ("aggregation plugin: malloc failed."); + ERROR ("aggregation plugin: calloc failed."); return (-1); } - memset (agg, 0, sizeof (*agg)); sstrncpy (agg->ident.host, "/.*/", sizeof (agg->ident.host)); sstrncpy (agg->ident.plugin, "/.*/", sizeof (agg->ident.plugin)); @@ -486,6 +558,14 @@ static int agg_config_aggregation (oconfig_item_t *ci) /* {{{ */ else if (strcasecmp ("TypeInstance", child->key) == 0) cf_util_get_string_buffer (child, agg->ident.type_instance, sizeof (agg->ident.type_instance)); + else if (strcasecmp ("SetHost", child->key) == 0) + cf_util_get_string (child, &agg->set_host); + else if (strcasecmp ("SetPlugin", child->key) == 0) + cf_util_get_string (child, &agg->set_plugin); + else if (strcasecmp ("SetPluginInstance", child->key) == 0) + cf_util_get_string (child, &agg->set_plugin_instance); + else if (strcasecmp ("SetTypeInstance", child->key) == 0) + cf_util_get_string (child, &agg->set_type_instance); else if (strcasecmp ("GroupBy", child->key) == 0) agg_config_handle_group_by (child, agg); else if (strcasecmp ("CalculateNum", child->key) == 0) @@ -505,6 +585,15 @@ static int agg_config_aggregation (oconfig_item_t *ci) /* {{{ */ " blocks and will be ignored.", child->key); } + if (agg_is_regex (agg->ident.host)) + agg->regex_fields |= LU_GROUP_BY_HOST; + if (agg_is_regex (agg->ident.plugin)) + agg->regex_fields |= LU_GROUP_BY_PLUGIN; + if (agg_is_regex (agg->ident.plugin_instance)) + agg->regex_fields |= LU_GROUP_BY_PLUGIN_INSTANCE; + if (agg_is_regex (agg->ident.type_instance)) + agg->regex_fields |= LU_GROUP_BY_TYPE_INSTANCE; + /* Sanity checking */ is_valid = 1; if (strcmp ("/.*/", agg->ident.type) == 0) /* {{{ */ @@ -525,15 +614,26 @@ static int agg_config_aggregation (oconfig_item_t *ci) /* {{{ */ is_valid = 0; } /* }}} */ - if (!AGG_MATCHES_ALL (agg->ident.host) /* {{{ */ - && !AGG_MATCHES_ALL (agg->ident.plugin) - && !AGG_MATCHES_ALL (agg->ident.plugin_instance) - && !AGG_MATCHES_ALL (agg->ident.type_instance)) + /* Check that there is at least one regex field without a grouping. {{{ */ + if ((agg->regex_fields & ~agg->group_by) == 0) { ERROR ("aggregation plugin: An aggregation must contain at least one " "wildcard. This is achieved by leaving at least one of the \"Host\", " "\"Plugin\", \"PluginInstance\" and \"TypeInstance\" options blank " - "and not grouping by that field. " + "or using a regular expression and not grouping by that field. " + "(Host \"%s\", Plugin \"%s\", PluginInstance \"%s\", " + "Type \"%s\", TypeInstance \"%s\")", + agg->ident.host, agg->ident.plugin, agg->ident.plugin_instance, + agg->ident.type, agg->ident.type_instance); + is_valid = 0; + } /* }}} */ + + /* Check that all grouping fields are regular expressions. {{{ */ + if (agg->group_by & ~agg->regex_fields) + { + ERROR ("aggregation plugin: Only wildcard fields (fields for which a " + "regular expression is configured or which are left blank) can be " + "specified in the \"GroupBy\" option. " "(Host \"%s\", Plugin \"%s\", PluginInstance \"%s\", " "Type \"%s\", TypeInstance \"%s\")", agg->ident.host, agg->ident.plugin, agg->ident.plugin_instance,