aggregation plugin: Actually dispatch values.
authorFlorian Forster <octo@collectd.org>
Mon, 25 Jun 2012 11:29:07 +0000 (13:29 +0200)
committerFlorian Forster <octo@collectd.org>
Mon, 25 Jun 2012 11:29:07 +0000 (13:29 +0200)
src/aggregation.c

index 2e13766..c8f5779 100644 (file)
@@ -51,6 +51,7 @@ struct agg_instance_s;
 typedef struct agg_instance_s agg_instance_t;
 struct agg_instance_s /* {{{ */
 {
+  pthread_mutex_t lock;
   identifier_t ident;
 
   int ds_type;
@@ -116,8 +117,8 @@ static void agg_instance_destroy (agg_instance_t *inst) /* {{{ */
 } /* }}} void agg_instance_destroy */
 
 /* Create a new aggregation instance. */
-static agg_instance_t *agg_instance_create (value_list_t const *vl, /* {{{ */
-    aggregation_t *agg)
+static agg_instance_t *agg_instance_create (data_set_t const *ds, /* {{{ */
+    value_list_t const *vl, aggregation_t *agg)
 {
   agg_instance_t *inst;
 
@@ -130,6 +131,9 @@ static agg_instance_t *agg_instance_create (value_list_t const *vl, /* {{{ */
     return (NULL);
   }
   memset (inst, 0, sizeof (*inst));
+  pthread_mutex_init (&inst->lock, /* attr = */ NULL);
+
+  inst->ds_type = ds->ds[0].type;
 
 #define COPY_FIELD(fld) do { \
   sstrncpy (inst->ident.fld, \
@@ -145,7 +149,6 @@ static agg_instance_t *agg_instance_create (value_list_t const *vl, /* {{{ */
 
 #undef COPY_FIELD
 
-  inst->ds_type = -1;
   inst->min = NAN;
   inst->max = NAN;
 
@@ -204,6 +207,8 @@ static int agg_instance_update (agg_instance_t *inst, /* {{{ */
     return (0);
   }
 
+  pthread_mutex_lock (&inst->lock);
+
   inst->num++;
   inst->sum += rate[0];
   inst->squares_sum += (rate[0] * rate[0]);
@@ -213,16 +218,138 @@ static int agg_instance_update (agg_instance_t *inst, /* {{{ */
   if (isnan (inst->max) || (inst->max < rate[0]))
     inst->max = rate[0];
 
+  pthread_mutex_unlock (&inst->lock);
+
   sfree (rate);
   return (0);
 } /* }}} int agg_instance_update */
 
+static int agg_instance_read_func (agg_instance_t *inst, /* {{{ */
+  char const *func, gauge_t rate, rate_to_value_state_t *state,
+  value_list_t *vl, char const *pi_prefix, cdtime_t t)
+{
+  value_t v;
+  int status;
+
+  if (pi_prefix[0] != 0)
+    ssnprintf (vl->plugin_instance, sizeof (vl->plugin_instance), "%s-%s",
+        pi_prefix, func);
+  else
+    sstrncpy (vl->plugin_instance, func, sizeof (vl->plugin_instance));
+
+  memset (&v, 0, sizeof (v));
+  status = rate_to_value (&v, rate, state, inst->ds_type, t);
+  if (status != 0)
+  {
+    WARNING ("aggregation plugin: rate_to_value failed with status %i.",
+        status);
+    return (-1);
+  }
+
+  vl->values = &v;
+  vl->values_len = 1;
+
+  plugin_dispatch_values_secure (vl);
+
+  vl->values = NULL;
+  vl->values_len = 0;
+
+  return (0);
+} /* }}} int agg_instance_read_func */
+
+static int agg_instance_read (agg_instance_t *inst, cdtime_t t) /* {{{ */
+{
+  value_list_t vl = VALUE_LIST_INIT;
+  char pi_prefix[DATA_MAX_NAME_LEN];
+
+  /* Pre-set all the fields in the value list that will not change per
+   * aggregation type (sum, average, ...). The struct will be re-used and must
+   * therefore be dispatched using the "secure" function. */
+
+  vl.time = t;
+  vl.interval = 0;
+
+  vl.meta = meta_data_create ();
+  if (vl.meta == NULL)
+  {
+    ERROR ("aggregation plugin: meta_data_create failed.");
+    return (-1);
+  }
+  meta_data_add_boolean (vl.meta, "aggregation:created", 1);
+
+  if (LU_IS_ALL (inst->ident.host))
+    sstrncpy (vl.host, "global", sizeof (vl.host));
+  else
+    sstrncpy (vl.host, inst->ident.host, sizeof (vl.host));
+
+  sstrncpy (vl.plugin, "aggregate", sizeof (vl.plugin));
+
+  if (LU_IS_ALL (inst->ident.plugin))
+  {
+    if (LU_IS_ALL (inst->ident.plugin_instance))
+      sstrncpy (pi_prefix, "", sizeof (pi_prefix));
+    else
+      sstrncpy (pi_prefix, inst->ident.plugin_instance, sizeof (pi_prefix));
+  }
+  else
+  {
+    if (LU_IS_ALL (inst->ident.plugin_instance))
+      sstrncpy (pi_prefix, inst->ident.plugin, sizeof (pi_prefix));
+    else
+      ssnprintf (pi_prefix, sizeof (pi_prefix),
+          "%s-%s", inst->ident.plugin, inst->ident.plugin_instance);
+  }
+
+  sstrncpy (vl.type, inst->ident.type, sizeof (vl.type));
+
+  if (!LU_IS_ALL (inst->ident.type_instance))
+    sstrncpy (vl.type_instance, inst->ident.type_instance,
+        sizeof (vl.type_instance));
+
+#define READ_FUNC(func, rate) do { \
+  if (inst->state_ ## func != NULL) { \
+    agg_instance_read_func (inst, #func, rate, \
+        inst->state_ ## func, &vl, pi_prefix, t); \
+  } \
+} while (0)
+
+  pthread_mutex_lock (&inst->lock);
+
+  READ_FUNC (num, (gauge_t) inst->num);
+
+  /* All other aggregations are only defined when there have been any values
+   * at all. */
+  if (inst->num > 0)
+  {
+    READ_FUNC (sum, inst->sum);
+    READ_FUNC (average, (inst->sum / ((gauge_t) inst->num)));
+    READ_FUNC (min, inst->min);
+    READ_FUNC (max, inst->max);
+    READ_FUNC (stddev, sqrt((((gauge_t) inst->num) * inst->squares_sum)
+          - (inst->sum * inst->sum)) / ((gauge_t) inst->num));
+  }
+
+  /* Reset internal state. */
+  inst->num = 0;
+  inst->sum = 0.0;
+  inst->squares_sum = 0.0;
+  inst->min = NAN;
+  inst->max = NAN;
+
+  pthread_mutex_unlock (&inst->lock);
+
+  meta_data_destroy (vl.meta);
+  vl.meta = NULL;
+
+  return (0);
+} /* }}} int agg_instance_read */
+
 /* lookup_class_callback_t for utils_vl_lookup */
 static void *agg_lookup_class_callback ( /* {{{ */
     __attribute__((unused)) data_set_t const *ds,
     value_list_t const *vl, void *user_class)
 {
-  return (agg_instance_create (vl, (aggregation_t *) user_class));
+  return (agg_instance_create (ds, vl, (aggregation_t *) user_class));
 } /* }}} void *agg_class_callback */
 
 /* lookup_obj_callback_t for utils_vl_lookup */
@@ -364,24 +491,28 @@ static int agg_config (oconfig_item_t *ci) /* {{{ */
 static int agg_read (void) /* {{{ */
 {
   agg_instance_t *this;
-  size_t i = 0;
+  cdtime_t t;
+  int success;
+
+  t = cdtime ();
+  success = 0;
 
   pthread_mutex_lock (&agg_instance_list_lock);
+
   for (this = agg_instance_list_head; this != NULL; this = this->next)
   {
-    DEBUG ("aggregation plugin: Handling instance: host = \"%s\", "
-        "plugin = \"%s\", plugin_instance = \"%s\", "
-        "type = \"%s\", type_instance = \"%s\"",
-        this->ident.host,
-        this->ident.plugin, this->ident.plugin_instance,
-        this->ident.type, this->ident.type_instance);
-    i++;
+    int status;
+
+    status = agg_instance_read (this, t);
+    if (status != 0)
+      WARNING ("aggregation plugin: Reading an aggregation instance "
+          "failed with status %i.", status);
+    else
+      success++;
   }
   pthread_mutex_unlock (&agg_instance_list_lock);
 
-  DEBUG ("aggregation plugin: There are currently %zu instances.", i);
-
-  return (0);
+  return ((success > 0) ? 0 : -1);
 } /* }}} int agg_read */
 
 static int agg_write (data_set_t const *ds, value_list_t const *vl, /* {{{ */