src/utils_vl_lookup.[ch]: Support selecting values by regex.
[collectd.git] / src / aggregation.c
index 985fa7c..7ca26ca 100644 (file)
 
 #include <pthread.h>
 
+#define AGG_MATCHES_ALL(str) (strcmp ("/.*/", str) == 0)
+
 struct aggregation_s /* {{{ */
 {
   identifier_t ident;
+  unsigned int group_by;
 
   _Bool calc_num;
   _Bool calc_sum;
@@ -135,17 +138,17 @@ static agg_instance_t *agg_instance_create (data_set_t const *ds, /* {{{ */
 
   inst->ds_type = ds->ds[0].type;
 
-#define COPY_FIELD(fld) do { \
-  sstrncpy (inst->ident.fld, \
-      LU_IS_ANY (agg->ident.fld) ? vl->fld : agg->ident.fld, \
-      sizeof (inst->ident.fld)); \
+#define COPY_FIELD(field, group_mask) do { \
+  sstrncpy (inst->ident.field, \
+      (agg->group_by & group_mask) ? vl->field : agg->ident.field, \
+      sizeof (inst->ident.field)); \
 } while (0)
 
-  COPY_FIELD (host);
-  COPY_FIELD (plugin);
-  COPY_FIELD (plugin_instance);
-  COPY_FIELD (type);
-  COPY_FIELD (type_instance);
+  COPY_FIELD (host, LU_GROUP_BY_HOST);
+  COPY_FIELD (plugin, LU_GROUP_BY_PLUGIN);
+  COPY_FIELD (plugin_instance, LU_GROUP_BY_PLUGIN_INSTANCE);
+  COPY_FIELD (type, /* group_mask = */ 0);
+  COPY_FIELD (type_instance, LU_GROUP_BY_TYPE_INSTANCE);
 
 #undef COPY_FIELD
 
@@ -192,13 +195,21 @@ static int agg_instance_update (agg_instance_t *inst, /* {{{ */
   gauge_t *rate;
 
   if (ds->ds_num != 1)
-    return (-1);
+  {
+    ERROR ("aggregation plugin: The \"%s\" type (data set) has more than one "
+        "data source. This is currently not supported by this plugin. "
+        "Sorry.", ds->type);
+    return (EINVAL);
+  }
 
   rate = uc_get_rate (ds, vl);
   if (rate == NULL)
   {
-    ERROR ("aggregation plugin: uc_get_rate() failed.");
-    return (-1);
+    char ident[6 * DATA_MAX_NAME_LEN];
+    FORMAT_VL (ident, sizeof (ident), vl);
+    ERROR ("aggregation plugin: Unable to read the current rate of \"%s\".",
+        ident);
+    return (ENOENT);
   }
 
   if (isnan (rate[0]))
@@ -241,6 +252,12 @@ static int agg_instance_read_func (agg_instance_t *inst, /* {{{ */
   status = rate_to_value (&v, rate, state, inst->ds_type, t);
   if (status != 0)
   {
+    /* If this is the first iteration and rate_to_value() was asked to return a
+     * COUNTER or a DERIVE, it will return EAGAIN. Catch this and handle
+     * gracefully. */
+    if (status == EAGAIN)
+      return (0);
+
     WARNING ("aggregation plugin: rate_to_value failed with status %i.",
         status);
     return (-1);
@@ -277,23 +294,23 @@ static int agg_instance_read (agg_instance_t *inst, cdtime_t t) /* {{{ */
   }
   meta_data_add_boolean (vl.meta, "aggregation:created", 1);
 
-  if (LU_IS_ALL (inst->ident.host))
+  if (AGG_MATCHES_ALL (inst->ident.host))
     sstrncpy (vl.host, "global", sizeof (vl.host));
   else
     sstrncpy (vl.host, inst->ident.host, sizeof (vl.host));
 
   sstrncpy (vl.plugin, "aggregation", sizeof (vl.plugin));
 
-  if (LU_IS_ALL (inst->ident.plugin))
+  if (AGG_MATCHES_ALL (inst->ident.plugin))
   {
-    if (LU_IS_ALL (inst->ident.plugin_instance))
+    if (AGG_MATCHES_ALL (inst->ident.plugin_instance))
       sstrncpy (pi_prefix, "", sizeof (pi_prefix));
     else
       sstrncpy (pi_prefix, inst->ident.plugin_instance, sizeof (pi_prefix));
   }
   else
   {
-    if (LU_IS_ALL (inst->ident.plugin_instance))
+    if (AGG_MATCHES_ALL (inst->ident.plugin_instance))
       sstrncpy (pi_prefix, inst->ident.plugin, sizeof (pi_prefix));
     else
       ssnprintf (pi_prefix, sizeof (pi_prefix),
@@ -302,7 +319,7 @@ static int agg_instance_read (agg_instance_t *inst, cdtime_t t) /* {{{ */
 
   sstrncpy (vl.type, inst->ident.type, sizeof (vl.type));
 
-  if (!LU_IS_ALL (inst->ident.type_instance))
+  if (!AGG_MATCHES_ALL (inst->ident.type_instance))
     sstrncpy (vl.type_instance, inst->ident.type_instance,
         sizeof (vl.type_instance));
 
@@ -410,14 +427,13 @@ static int agg_config_handle_group_by (oconfig_item_t const *ci, /* {{{ */
     value = ci->values[i].value.string;
 
     if (strcasecmp ("Host", value) == 0)
-      sstrncpy (agg->ident.host, LU_ANY, sizeof (agg->ident.host));
+      agg->group_by |= LU_GROUP_BY_HOST;
     else if (strcasecmp ("Plugin", value) == 0)
-      sstrncpy (agg->ident.plugin, LU_ANY, sizeof (agg->ident.plugin));
+      agg->group_by |= LU_GROUP_BY_PLUGIN;
     else if (strcasecmp ("PluginInstance", value) == 0)
-      sstrncpy (agg->ident.plugin_instance, LU_ANY,
-          sizeof (agg->ident.plugin_instance));
+      agg->group_by |= LU_GROUP_BY_PLUGIN_INSTANCE;
     else if (strcasecmp ("TypeInstance", value) == 0)
-      sstrncpy (agg->ident.type_instance, LU_ANY, sizeof (agg->ident.type_instance));
+      agg->group_by |= LU_GROUP_BY_TYPE_INSTANCE;
     else if (strcasecmp ("Type", value) == 0)
       ERROR ("aggregation plugin: Grouping by type is not supported.");
     else
@@ -443,12 +459,12 @@ static int agg_config_aggregation (oconfig_item_t *ci) /* {{{ */
   }
   memset (agg, 0, sizeof (*agg));
 
-  sstrncpy (agg->ident.host, LU_ALL, sizeof (agg->ident.host));
-  sstrncpy (agg->ident.plugin, LU_ALL, sizeof (agg->ident.plugin));
-  sstrncpy (agg->ident.plugin_instance, LU_ALL,
+  sstrncpy (agg->ident.host, "/.*/", sizeof (agg->ident.host));
+  sstrncpy (agg->ident.plugin, "/.*/", sizeof (agg->ident.plugin));
+  sstrncpy (agg->ident.plugin_instance, "/.*/",
       sizeof (agg->ident.plugin_instance));
-  sstrncpy (agg->ident.type, LU_ALL, sizeof (agg->ident.type));
-  sstrncpy (agg->ident.type_instance, LU_ALL,
+  sstrncpy (agg->ident.type, "/.*/", sizeof (agg->ident.type));
+  sstrncpy (agg->ident.type_instance, "/.*/",
       sizeof (agg->ident.type_instance));
 
   for (i = 0; i < ci->children_num; i++)
@@ -491,7 +507,7 @@ static int agg_config_aggregation (oconfig_item_t *ci) /* {{{ */
 
   /* Sanity checking */
   is_valid = 1;
-  if (LU_IS_ALL (agg->ident.type)) /* {{{ */
+  if (strcmp ("/.*/", agg->ident.type) == 0) /* {{{ */
   {
     ERROR ("aggregation plugin: It appears you did not specify the required "
         "\"Type\" option in this aggregation. "
@@ -504,15 +520,15 @@ static int agg_config_aggregation (oconfig_item_t *ci) /* {{{ */
   else if (strchr (agg->ident.type, '/') != NULL)
   {
     ERROR ("aggregation plugin: The \"Type\" may not contain the '/' "
-        "character. Especially, it may not be a wildcard. The current "
+        "character. Especially, it may not be a regex. The current "
         "value is \"%s\".", agg->ident.type);
     is_valid = 0;
   } /* }}} */
 
-  if (!LU_IS_ALL (agg->ident.host) /* {{{ */
-      && !LU_IS_ALL (agg->ident.plugin)
-      && !LU_IS_ALL (agg->ident.plugin_instance)
-      && !LU_IS_ALL (agg->ident.type_instance))
+  if (!AGG_MATCHES_ALL (agg->ident.host) /* {{{ */
+      && !AGG_MATCHES_ALL (agg->ident.plugin)
+      && !AGG_MATCHES_ALL (agg->ident.plugin_instance)
+      && !AGG_MATCHES_ALL (agg->ident.type_instance))
   {
     ERROR ("aggregation plugin: An aggregation must contain at least one "
         "wildcard. This is achieved by leaving at least one of the \"Host\", "
@@ -543,7 +559,7 @@ static int agg_config_aggregation (oconfig_item_t *ci) /* {{{ */
     return (-1);
   } /* }}} */
 
-  status = lookup_add (lookup, &agg->ident, agg);
+  status = lookup_add (lookup, &agg->ident, agg->group_by, agg);
   if (status != 0)
   {
     ERROR ("aggregation plugin: lookup_add failed with status %i.", status);
@@ -606,6 +622,18 @@ static int agg_read (void) /* {{{ */
 
   pthread_mutex_lock (&agg_instance_list_lock);
 
+  /* agg_instance_list_head only holds data, after the "write" callback has
+   * been called with a matching value list at least once. So on startup,
+   * there's a race between the aggregations read() and write() callback. If
+   * the read() callback is called first, agg_instance_list_head is NULL and
+   * "success" may be zero. This is expected and should not result in an error.
+   * Therefore we need to handle this case separately. */
+  if (agg_instance_list_head == NULL)
+  {
+    pthread_mutex_unlock (&agg_instance_list_lock);
+    return (0);
+  }
+
   for (this = agg_instance_list_head; this != NULL; this = this->next)
   {
     int status;