Merge branch 'ff/aggregate'
[collectd.git] / src / aggregation.c
1 /**
2  * collectd - src/aggregation.c
3  * Copyright (C) 2012       Florian Forster
4  *
5  * Permission is hereby granted, free of charge, to any person obtaining a
6  * copy of this software and associated documentation files (the "Software"),
7  * to deal in the Software without restriction, including without limitation
8  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
9  * and/or sell copies of the Software, and to permit persons to whom the
10  * Software is furnished to do so, subject to the following conditions:
11  *
12  * The above copyright notice and this permission notice shall be included in
13  * all copies or substantial portions of the Software.
14  *
15  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
21  * DEALINGS IN THE SOFTWARE.
22  *
23  * Authors:
24  *   Florian Forster <octo at collectd.org>
25  **/
26
27 #include "collectd.h"
28 #include "plugin.h"
29 #include "common.h"
30 #include "configfile.h"
31 #include "meta_data.h"
32 #include "utils_cache.h" /* for uc_get_rate() */
33 #include "utils_vl_lookup.h"
34
35 #include <pthread.h>
36
37 struct aggregation_s /* {{{ */
38 {
39   identifier_t ident;
40
41   _Bool calc_num;
42   _Bool calc_sum;
43   _Bool calc_average;
44   _Bool calc_min;
45   _Bool calc_max;
46   _Bool calc_stddev;
47 }; /* }}} */
48 typedef struct aggregation_s aggregation_t;
49
50 struct agg_instance_s;
51 typedef struct agg_instance_s agg_instance_t;
52 struct agg_instance_s /* {{{ */
53 {
54   pthread_mutex_t lock;
55   identifier_t ident;
56
57   int ds_type;
58
59   derive_t num;
60   gauge_t sum;
61   gauge_t squares_sum;
62
63   gauge_t min;
64   gauge_t max;
65
66   rate_to_value_state_t *state_num;
67   rate_to_value_state_t *state_sum;
68   rate_to_value_state_t *state_average;
69   rate_to_value_state_t *state_min;
70   rate_to_value_state_t *state_max;
71   rate_to_value_state_t *state_stddev;
72
73   agg_instance_t *next;
74 }; /* }}} */
75
76 static lookup_t *lookup = NULL;
77
78 static pthread_mutex_t agg_instance_list_lock = PTHREAD_MUTEX_INITIALIZER;
79 static agg_instance_t *agg_instance_list_head = NULL;
80
81 static void agg_destroy (aggregation_t *agg) /* {{{ */
82 {
83   sfree (agg);
84 } /* }}} void agg_destroy */
85
86 /* Frees all dynamically allocated memory within the instance. */
87 static void agg_instance_destroy (agg_instance_t *inst) /* {{{ */
88 {
89   if (inst == NULL)
90     return;
91
92   /* Remove this instance from the global list of instances. */
93   pthread_mutex_lock (&agg_instance_list_lock);
94   if (agg_instance_list_head == inst)
95     agg_instance_list_head = inst->next;
96   else if (agg_instance_list_head != NULL)
97   {
98     agg_instance_t *prev = agg_instance_list_head;
99     while ((prev != NULL) && (prev->next != inst))
100       prev = prev->next;
101     if (prev != NULL)
102       prev->next = inst->next;
103   }
104   pthread_mutex_unlock (&agg_instance_list_lock);
105
106   sfree (inst->state_num);
107   sfree (inst->state_sum);
108   sfree (inst->state_average);
109   sfree (inst->state_min);
110   sfree (inst->state_max);
111   sfree (inst->state_stddev);
112
113   memset (inst, 0, sizeof (*inst));
114   inst->ds_type = -1;
115   inst->min = NAN;
116   inst->max = NAN;
117 } /* }}} void agg_instance_destroy */
118
119 /* Create a new aggregation instance. */
120 static agg_instance_t *agg_instance_create (data_set_t const *ds, /* {{{ */
121     value_list_t const *vl, aggregation_t *agg)
122 {
123   agg_instance_t *inst;
124
125   DEBUG ("aggregation plugin: Creating new instance.");
126
127   inst = malloc (sizeof (*inst));
128   if (inst == NULL)
129   {
130     ERROR ("aggregation plugin: malloc() failed.");
131     return (NULL);
132   }
133   memset (inst, 0, sizeof (*inst));
134   pthread_mutex_init (&inst->lock, /* attr = */ NULL);
135
136   inst->ds_type = ds->ds[0].type;
137
138 #define COPY_FIELD(fld) do { \
139   sstrncpy (inst->ident.fld, \
140       LU_IS_ANY (agg->ident.fld) ? vl->fld : agg->ident.fld, \
141       sizeof (inst->ident.fld)); \
142 } while (0)
143
144   COPY_FIELD (host);
145   COPY_FIELD (plugin);
146   COPY_FIELD (plugin_instance);
147   COPY_FIELD (type);
148   COPY_FIELD (type_instance);
149
150 #undef COPY_FIELD
151
152   inst->min = NAN;
153   inst->max = NAN;
154
155 #define INIT_STATE(field) do { \
156   inst->state_ ## field = NULL; \
157   if (agg->calc_ ## field) { \
158     inst->state_ ## field = malloc (sizeof (*inst->state_ ## field)); \
159     if (inst->state_ ## field == NULL) { \
160       agg_instance_destroy (inst); \
161       ERROR ("aggregation plugin: malloc() failed."); \
162       return (NULL); \
163     } \
164     memset (inst->state_ ## field, 0, sizeof (*inst->state_ ## field)); \
165   } \
166 } while (0)
167
168   INIT_STATE (num);
169   INIT_STATE (sum);
170   INIT_STATE (average);
171   INIT_STATE (min);
172   INIT_STATE (max);
173   INIT_STATE (stddev);
174
175 #undef INIT_STATE
176
177   pthread_mutex_lock (&agg_instance_list_lock);
178   inst->next = agg_instance_list_head;
179   agg_instance_list_head = inst;
180   pthread_mutex_unlock (&agg_instance_list_lock);
181
182   return (inst);
183 } /* }}} agg_instance_t *agg_instance_create */
184
185 /* Update the num, sum, min, max, ... fields of the aggregation instance, if
186  * the rate of the value list is available. Value lists with more than one data
187  * source are not supported and will return an error. Returns zero on success
188  * and non-zero otherwise. */
189 static int agg_instance_update (agg_instance_t *inst, /* {{{ */
190     data_set_t const *ds, value_list_t const *vl)
191 {
192   gauge_t *rate;
193
194   if (ds->ds_num != 1)
195     return (-1);
196
197   rate = uc_get_rate (ds, vl);
198   if (rate == NULL)
199   {
200     ERROR ("aggregation plugin: uc_get_rate() failed.");
201     return (-1);
202   }
203
204   if (isnan (rate[0]))
205   {
206     sfree (rate);
207     return (0);
208   }
209
210   pthread_mutex_lock (&inst->lock);
211
212   inst->num++;
213   inst->sum += rate[0];
214   inst->squares_sum += (rate[0] * rate[0]);
215
216   if (isnan (inst->min) || (inst->min > rate[0]))
217     inst->min = rate[0];
218   if (isnan (inst->max) || (inst->max < rate[0]))
219     inst->max = rate[0];
220
221   pthread_mutex_unlock (&inst->lock);
222
223   sfree (rate);
224   return (0);
225 } /* }}} int agg_instance_update */
226
227 static int agg_instance_read_func (agg_instance_t *inst, /* {{{ */
228   char const *func, gauge_t rate, rate_to_value_state_t *state,
229   value_list_t *vl, char const *pi_prefix, cdtime_t t)
230 {
231   value_t v;
232   int status;
233
234   if (pi_prefix[0] != 0)
235     ssnprintf (vl->plugin_instance, sizeof (vl->plugin_instance), "%s-%s",
236         pi_prefix, func);
237   else
238     sstrncpy (vl->plugin_instance, func, sizeof (vl->plugin_instance));
239
240   memset (&v, 0, sizeof (v));
241   status = rate_to_value (&v, rate, state, inst->ds_type, t);
242   if (status != 0)
243   {
244     /* If this is the first iteration and rate_to_value() was asked to return a
245      * COUNTER or a DERIVE, it will return EAGAIN. Catch this and handle
246      * gracefully. */
247     if (status == EAGAIN)
248       return (0);
249
250     WARNING ("aggregation plugin: rate_to_value failed with status %i.",
251         status);
252     return (-1);
253   }
254
255   vl->values = &v;
256   vl->values_len = 1;
257
258   plugin_dispatch_values_secure (vl);
259
260   vl->values = NULL;
261   vl->values_len = 0;
262
263   return (0);
264 } /* }}} int agg_instance_read_func */
265
266 static int agg_instance_read (agg_instance_t *inst, cdtime_t t) /* {{{ */
267 {
268   value_list_t vl = VALUE_LIST_INIT;
269   char pi_prefix[DATA_MAX_NAME_LEN];
270
271   /* Pre-set all the fields in the value list that will not change per
272    * aggregation type (sum, average, ...). The struct will be re-used and must
273    * therefore be dispatched using the "secure" function. */
274
275   vl.time = t;
276   vl.interval = 0;
277
278   vl.meta = meta_data_create ();
279   if (vl.meta == NULL)
280   {
281     ERROR ("aggregation plugin: meta_data_create failed.");
282     return (-1);
283   }
284   meta_data_add_boolean (vl.meta, "aggregation:created", 1);
285
286   if (LU_IS_ALL (inst->ident.host))
287     sstrncpy (vl.host, "global", sizeof (vl.host));
288   else
289     sstrncpy (vl.host, inst->ident.host, sizeof (vl.host));
290
291   sstrncpy (vl.plugin, "aggregation", sizeof (vl.plugin));
292
293   if (LU_IS_ALL (inst->ident.plugin))
294   {
295     if (LU_IS_ALL (inst->ident.plugin_instance))
296       sstrncpy (pi_prefix, "", sizeof (pi_prefix));
297     else
298       sstrncpy (pi_prefix, inst->ident.plugin_instance, sizeof (pi_prefix));
299   }
300   else
301   {
302     if (LU_IS_ALL (inst->ident.plugin_instance))
303       sstrncpy (pi_prefix, inst->ident.plugin, sizeof (pi_prefix));
304     else
305       ssnprintf (pi_prefix, sizeof (pi_prefix),
306           "%s-%s", inst->ident.plugin, inst->ident.plugin_instance);
307   }
308
309   sstrncpy (vl.type, inst->ident.type, sizeof (vl.type));
310
311   if (!LU_IS_ALL (inst->ident.type_instance))
312     sstrncpy (vl.type_instance, inst->ident.type_instance,
313         sizeof (vl.type_instance));
314
315 #define READ_FUNC(func, rate) do { \
316   if (inst->state_ ## func != NULL) { \
317     agg_instance_read_func (inst, #func, rate, \
318         inst->state_ ## func, &vl, pi_prefix, t); \
319   } \
320 } while (0)
321
322   pthread_mutex_lock (&inst->lock);
323
324   READ_FUNC (num, (gauge_t) inst->num);
325
326   /* All other aggregations are only defined when there have been any values
327    * at all. */
328   if (inst->num > 0)
329   {
330     READ_FUNC (sum, inst->sum);
331     READ_FUNC (average, (inst->sum / ((gauge_t) inst->num)));
332     READ_FUNC (min, inst->min);
333     READ_FUNC (max, inst->max);
334     READ_FUNC (stddev, sqrt((((gauge_t) inst->num) * inst->squares_sum)
335           - (inst->sum * inst->sum)) / ((gauge_t) inst->num));
336   }
337
338   /* Reset internal state. */
339   inst->num = 0;
340   inst->sum = 0.0;
341   inst->squares_sum = 0.0;
342   inst->min = NAN;
343   inst->max = NAN;
344
345   pthread_mutex_unlock (&inst->lock);
346
347   meta_data_destroy (vl.meta);
348   vl.meta = NULL;
349
350   return (0);
351 } /* }}} int agg_instance_read */
352
353 /* lookup_class_callback_t for utils_vl_lookup */
354 static void *agg_lookup_class_callback ( /* {{{ */
355     __attribute__((unused)) data_set_t const *ds,
356     value_list_t const *vl, void *user_class)
357 {
358   return (agg_instance_create (ds, vl, (aggregation_t *) user_class));
359 } /* }}} void *agg_class_callback */
360
361 /* lookup_obj_callback_t for utils_vl_lookup */
362 static int agg_lookup_obj_callback (data_set_t const *ds, /* {{{ */
363     value_list_t const *vl,
364     __attribute__((unused)) void *user_class,
365     void *user_obj)
366 {
367   return (agg_instance_update ((agg_instance_t *) user_obj, ds, vl));
368 } /* }}} int agg_lookup_obj_callback */
369
370 /* lookup_free_class_callback_t for utils_vl_lookup */
371 static void agg_lookup_free_class_callback (void *user_class) /* {{{ */
372 {
373   agg_destroy ((aggregation_t *) user_class);
374 } /* }}} void agg_lookup_free_class_callback */
375
376 /* lookup_free_obj_callback_t for utils_vl_lookup */
377 static void agg_lookup_free_obj_callback (void *user_obj) /* {{{ */
378 {
379   agg_instance_destroy ((agg_instance_t *) user_obj);
380 } /* }}} void agg_lookup_free_obj_callback */
381
382 /*
383  * <Plugin "aggregation">
384  *   <Aggregation>
385  *     Plugin "cpu"
386  *     Type "cpu"
387  *
388  *     GroupBy Host
389  *     GroupBy TypeInstance
390  *
391  *     CalculateNum true
392  *     CalculateSum true
393  *     CalculateAverage true
394  *     CalculateMinimum true
395  *     CalculateMaximum true
396  *     CalculateStddev true
397  *   </Aggregation>
398  * </Plugin>
399  */
400 static int agg_config_handle_group_by (oconfig_item_t const *ci, /* {{{ */
401     aggregation_t *agg)
402 {
403   int i;
404
405   for (i = 0; i < ci->values_num; i++)
406   {
407     char const *value;
408
409     if (ci->values[i].type != OCONFIG_TYPE_STRING)
410     {
411       ERROR ("aggregation plugin: Argument %i of the \"GroupBy\" option "
412           "is not a string.", i + 1);
413       continue;
414     }
415
416     value = ci->values[i].value.string;
417
418     if (strcasecmp ("Host", value) == 0)
419       sstrncpy (agg->ident.host, LU_ANY, sizeof (agg->ident.host));
420     else if (strcasecmp ("Plugin", value) == 0)
421       sstrncpy (agg->ident.plugin, LU_ANY, sizeof (agg->ident.plugin));
422     else if (strcasecmp ("PluginInstance", value) == 0)
423       sstrncpy (agg->ident.plugin_instance, LU_ANY,
424           sizeof (agg->ident.plugin_instance));
425     else if (strcasecmp ("TypeInstance", value) == 0)
426       sstrncpy (agg->ident.type_instance, LU_ANY, sizeof (agg->ident.type_instance));
427     else if (strcasecmp ("Type", value) == 0)
428       ERROR ("aggregation plugin: Grouping by type is not supported.");
429     else
430       WARNING ("aggregation plugin: The \"%s\" argument to the \"GroupBy\" "
431           "option is invalid and will be ignored.", value);
432   } /* for (ci->values) */
433
434   return (0);
435 } /* }}} int agg_config_handle_group_by */
436
437 static int agg_config_aggregation (oconfig_item_t *ci) /* {{{ */
438 {
439   aggregation_t *agg;
440   _Bool is_valid;
441   int status;
442   int i;
443
444   agg = malloc (sizeof (*agg));
445   if (agg == NULL)
446   {
447     ERROR ("aggregation plugin: malloc failed.");
448     return (-1);
449   }
450   memset (agg, 0, sizeof (*agg));
451
452   sstrncpy (agg->ident.host, LU_ALL, sizeof (agg->ident.host));
453   sstrncpy (agg->ident.plugin, LU_ALL, sizeof (agg->ident.plugin));
454   sstrncpy (agg->ident.plugin_instance, LU_ALL,
455       sizeof (agg->ident.plugin_instance));
456   sstrncpy (agg->ident.type, LU_ALL, sizeof (agg->ident.type));
457   sstrncpy (agg->ident.type_instance, LU_ALL,
458       sizeof (agg->ident.type_instance));
459
460   for (i = 0; i < ci->children_num; i++)
461   {
462     oconfig_item_t *child = ci->children + i;
463
464     if (strcasecmp ("Host", child->key) == 0)
465       cf_util_get_string_buffer (child, agg->ident.host,
466           sizeof (agg->ident.host));
467     else if (strcasecmp ("Plugin", child->key) == 0)
468       cf_util_get_string_buffer (child, agg->ident.plugin,
469           sizeof (agg->ident.plugin));
470     else if (strcasecmp ("PluginInstance", child->key) == 0)
471       cf_util_get_string_buffer (child, agg->ident.plugin_instance,
472           sizeof (agg->ident.plugin_instance));
473     else if (strcasecmp ("Type", child->key) == 0)
474       cf_util_get_string_buffer (child, agg->ident.type,
475           sizeof (agg->ident.type));
476     else if (strcasecmp ("TypeInstance", child->key) == 0)
477       cf_util_get_string_buffer (child, agg->ident.type_instance,
478           sizeof (agg->ident.type_instance));
479     else if (strcasecmp ("GroupBy", child->key) == 0)
480       agg_config_handle_group_by (child, agg);
481     else if (strcasecmp ("CalculateNum", child->key) == 0)
482       cf_util_get_boolean (child, &agg->calc_num);
483     else if (strcasecmp ("CalculateSum", child->key) == 0)
484       cf_util_get_boolean (child, &agg->calc_sum);
485     else if (strcasecmp ("CalculateAverage", child->key) == 0)
486       cf_util_get_boolean (child, &agg->calc_average);
487     else if (strcasecmp ("CalculateMinimum", child->key) == 0)
488       cf_util_get_boolean (child, &agg->calc_min);
489     else if (strcasecmp ("CalculateMaximum", child->key) == 0)
490       cf_util_get_boolean (child, &agg->calc_max);
491     else if (strcasecmp ("CalculateStddev", child->key) == 0)
492       cf_util_get_boolean (child, &agg->calc_stddev);
493     else
494       WARNING ("aggregation plugin: The \"%s\" key is not allowed inside "
495           "<Aggregation /> blocks and will be ignored.", child->key);
496   }
497
498   /* Sanity checking */
499   is_valid = 1;
500   if (LU_IS_ALL (agg->ident.type)) /* {{{ */
501   {
502     ERROR ("aggregation plugin: It appears you did not specify the required "
503         "\"Type\" option in this aggregation. "
504         "(Host \"%s\", Plugin \"%s\", PluginInstance \"%s\", "
505         "Type \"%s\", TypeInstance \"%s\")",
506         agg->ident.host, agg->ident.plugin, agg->ident.plugin_instance,
507         agg->ident.type, agg->ident.type_instance);
508     is_valid = 0;
509   }
510   else if (strchr (agg->ident.type, '/') != NULL)
511   {
512     ERROR ("aggregation plugin: The \"Type\" may not contain the '/' "
513         "character. Especially, it may not be a wildcard. The current "
514         "value is \"%s\".", agg->ident.type);
515     is_valid = 0;
516   } /* }}} */
517
518   if (!LU_IS_ALL (agg->ident.host) /* {{{ */
519       && !LU_IS_ALL (agg->ident.plugin)
520       && !LU_IS_ALL (agg->ident.plugin_instance)
521       && !LU_IS_ALL (agg->ident.type_instance))
522   {
523     ERROR ("aggregation plugin: An aggregation must contain at least one "
524         "wildcard. This is achieved by leaving at least one of the \"Host\", "
525         "\"Plugin\", \"PluginInstance\" and \"TypeInstance\" options blank "
526         "and not grouping by that field. "
527         "(Host \"%s\", Plugin \"%s\", PluginInstance \"%s\", "
528         "Type \"%s\", TypeInstance \"%s\")",
529         agg->ident.host, agg->ident.plugin, agg->ident.plugin_instance,
530         agg->ident.type, agg->ident.type_instance);
531     is_valid = 0;
532   } /* }}} */
533
534   if (!agg->calc_num && !agg->calc_sum && !agg->calc_average /* {{{ */
535       && !agg->calc_min && !agg->calc_max && !agg->calc_stddev)
536   {
537     ERROR ("aggregation plugin: No aggregation function has been specified. "
538         "Without this, I don't know what I should be calculating. "
539         "(Host \"%s\", Plugin \"%s\", PluginInstance \"%s\", "
540         "Type \"%s\", TypeInstance \"%s\")",
541         agg->ident.host, agg->ident.plugin, agg->ident.plugin_instance,
542         agg->ident.type, agg->ident.type_instance);
543     is_valid = 0;
544   } /* }}} */
545
546   if (!is_valid) /* {{{ */
547   {
548     sfree (agg);
549     return (-1);
550   } /* }}} */
551
552   status = lookup_add (lookup, &agg->ident, agg);
553   if (status != 0)
554   {
555     ERROR ("aggregation plugin: lookup_add failed with status %i.", status);
556     sfree (agg);
557     return (-1);
558   }
559
560   DEBUG ("aggregation plugin: Successfully added aggregation: "
561       "(Host \"%s\", Plugin \"%s\", PluginInstance \"%s\", "
562       "Type \"%s\", TypeInstance \"%s\")",
563       agg->ident.host, agg->ident.plugin, agg->ident.plugin_instance,
564       agg->ident.type, agg->ident.type_instance);
565   return (0);
566 } /* }}} int agg_config_aggregation */
567
568 static int agg_config (oconfig_item_t *ci) /* {{{ */
569 {
570   int i;
571
572   pthread_mutex_lock (&agg_instance_list_lock);
573
574   if (lookup == NULL)
575   {
576     lookup = lookup_create (agg_lookup_class_callback,
577         agg_lookup_obj_callback,
578         agg_lookup_free_class_callback,
579         agg_lookup_free_obj_callback);
580     if (lookup == NULL)
581     {
582       pthread_mutex_unlock (&agg_instance_list_lock);
583       ERROR ("aggregation plugin: lookup_create failed.");
584       return (-1);
585     }
586   }
587
588   for (i = 0; i < ci->children_num; i++)
589   {
590     oconfig_item_t *child = ci->children + i;
591
592     if (strcasecmp ("Aggregation", child->key) == 0)
593       agg_config_aggregation (child);
594     else
595       WARNING ("aggregation plugin: The \"%s\" key is not allowed inside "
596           "<Plugin aggregation /> blocks and will be ignored.", child->key);
597   }
598
599   pthread_mutex_unlock (&agg_instance_list_lock);
600
601   return (0);
602 } /* }}} int agg_config */
603
604 static int agg_read (void) /* {{{ */
605 {
606   agg_instance_t *this;
607   cdtime_t t;
608   int success;
609
610   t = cdtime ();
611   success = 0;
612
613   pthread_mutex_lock (&agg_instance_list_lock);
614
615   /* agg_instance_list_head only holds data, after the "write" callback has
616    * been called with a matching value list at least once. So on startup,
617    * there's a race between the aggregations read() and write() callback. If
618    * the read() callback is called first, agg_instance_list_head is NULL and
619    * "success" may be zero. This is expected and should not result in an error.
620    * Therefore we need to handle this case separately. */
621   if (agg_instance_list_head == NULL)
622   {
623     pthread_mutex_unlock (&agg_instance_list_lock);
624     return (0);
625   }
626
627   for (this = agg_instance_list_head; this != NULL; this = this->next)
628   {
629     int status;
630
631     status = agg_instance_read (this, t);
632     if (status != 0)
633       WARNING ("aggregation plugin: Reading an aggregation instance "
634           "failed with status %i.", status);
635     else
636       success++;
637   }
638
639   pthread_mutex_unlock (&agg_instance_list_lock);
640
641   return ((success > 0) ? 0 : -1);
642 } /* }}} int agg_read */
643
644 static int agg_write (data_set_t const *ds, value_list_t const *vl, /* {{{ */
645     __attribute__((unused)) user_data_t *user_data)
646 {
647   _Bool created_by_aggregation = 0;
648   int status;
649
650   /* Ignore values that were created by the aggregation plugin to avoid weird
651    * effects. */
652   (void) meta_data_get_boolean (vl->meta, "aggregation:created",
653       &created_by_aggregation);
654   if (created_by_aggregation)
655     return (0);
656
657   if (lookup == NULL)
658     status = ENOENT;
659   else
660   {
661     status = lookup_search (lookup, ds, vl);
662     if (status > 0)
663       status = 0;
664   }
665
666   return (status);
667 } /* }}} int agg_write */
668
669 void module_register (void)
670 {
671   plugin_register_complex_config ("aggregation", agg_config);
672   plugin_register_read ("aggregation", agg_read);
673   plugin_register_write ("aggregation", agg_write, /* user_data = */ NULL);
674 }
675
676 /* vim: set sw=2 sts=2 tw=78 et fdm=marker : */