db33c177c039fa1676d9fcdc690b47d428f9a82b
[collectd.git] / src / aggregation.c
1 /**
2  * collectd - src/aggregation.c
3  * Copyright (C) 2012       Florian Forster
4  *
5  * Permission is hereby granted, free of charge, to any person obtaining a
6  * copy of this software and associated documentation files (the "Software"),
7  * to deal in the Software without restriction, including without limitation
8  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
9  * and/or sell copies of the Software, and to permit persons to whom the
10  * Software is furnished to do so, subject to the following conditions:
11  *
12  * The above copyright notice and this permission notice shall be included in
13  * all copies or substantial portions of the Software.
14  *
15  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
21  * DEALINGS IN THE SOFTWARE.
22  *
23  * Authors:
24  *   Florian Forster <octo at collectd.org>
25  **/
26
27 #include "collectd.h"
28 #include "plugin.h"
29 #include "common.h"
30 #include "configfile.h"
31 #include "meta_data.h"
32 #include "utils_cache.h" /* for uc_get_rate() */
33 #include "utils_vl_lookup.h"
34
35 #include <pthread.h>
36
37 struct aggregation_s /* {{{ */
38 {
39   identifier_t ident;
40
41   _Bool calc_num;
42   _Bool calc_sum;
43   _Bool calc_average;
44   _Bool calc_min;
45   _Bool calc_max;
46   _Bool calc_stddev;
47 }; /* }}} */
48 typedef struct aggregation_s aggregation_t;
49
50 struct agg_instance_s;
51 typedef struct agg_instance_s agg_instance_t;
52 struct agg_instance_s /* {{{ */
53 {
54   pthread_mutex_t lock;
55   identifier_t ident;
56
57   int ds_type;
58
59   derive_t num;
60   gauge_t sum;
61   gauge_t squares_sum;
62
63   gauge_t min;
64   gauge_t max;
65
66   rate_to_value_state_t *state_num;
67   rate_to_value_state_t *state_sum;
68   rate_to_value_state_t *state_average;
69   rate_to_value_state_t *state_min;
70   rate_to_value_state_t *state_max;
71   rate_to_value_state_t *state_stddev;
72
73   agg_instance_t *next;
74 }; /* }}} */
75
76 static lookup_t *lookup = NULL;
77
78 static pthread_mutex_t agg_instance_list_lock = PTHREAD_MUTEX_INITIALIZER;
79 static agg_instance_t *agg_instance_list_head = NULL;
80
81 static void agg_destroy (aggregation_t *agg) /* {{{ */
82 {
83   sfree (agg);
84 } /* }}} void agg_destroy */
85
86 /* Frees all dynamically allocated memory within the instance. */
87 static void agg_instance_destroy (agg_instance_t *inst) /* {{{ */
88 {
89   if (inst == NULL)
90     return;
91
92   /* Remove this instance from the global list of instances. */
93   pthread_mutex_lock (&agg_instance_list_lock);
94   if (agg_instance_list_head == inst)
95     agg_instance_list_head = inst->next;
96   else if (agg_instance_list_head != NULL)
97   {
98     agg_instance_t *prev = agg_instance_list_head;
99     while ((prev != NULL) && (prev->next != inst))
100       prev = prev->next;
101     if (prev != NULL)
102       prev->next = inst->next;
103   }
104   pthread_mutex_unlock (&agg_instance_list_lock);
105
106   sfree (inst->state_num);
107   sfree (inst->state_sum);
108   sfree (inst->state_average);
109   sfree (inst->state_min);
110   sfree (inst->state_max);
111   sfree (inst->state_stddev);
112
113   memset (inst, 0, sizeof (*inst));
114   inst->ds_type = -1;
115   inst->min = NAN;
116   inst->max = NAN;
117 } /* }}} void agg_instance_destroy */
118
119 /* Create a new aggregation instance. */
120 static agg_instance_t *agg_instance_create (data_set_t const *ds, /* {{{ */
121     value_list_t const *vl, aggregation_t *agg)
122 {
123   agg_instance_t *inst;
124
125   DEBUG ("aggregation plugin: Creating new instance.");
126
127   inst = malloc (sizeof (*inst));
128   if (inst == NULL)
129   {
130     ERROR ("aggregation plugin: malloc() failed.");
131     return (NULL);
132   }
133   memset (inst, 0, sizeof (*inst));
134   pthread_mutex_init (&inst->lock, /* attr = */ NULL);
135
136   inst->ds_type = ds->ds[0].type;
137
138 #define COPY_FIELD(fld) do { \
139   sstrncpy (inst->ident.fld, \
140       LU_IS_ANY (agg->ident.fld) ? vl->fld : agg->ident.fld, \
141       sizeof (inst->ident.fld)); \
142 } while (0)
143
144   COPY_FIELD (host);
145   COPY_FIELD (plugin);
146   COPY_FIELD (plugin_instance);
147   COPY_FIELD (type);
148   COPY_FIELD (type_instance);
149
150 #undef COPY_FIELD
151
152   inst->min = NAN;
153   inst->max = NAN;
154
155 #define INIT_STATE(field) do { \
156   inst->state_ ## field = NULL; \
157   if (agg->calc_ ## field) { \
158     inst->state_ ## field = malloc (sizeof (*inst->state_ ## field)); \
159     if (inst->state_ ## field == NULL) { \
160       agg_instance_destroy (inst); \
161       ERROR ("aggregation plugin: malloc() failed."); \
162       return (NULL); \
163     } \
164     memset (inst->state_ ## field, 0, sizeof (*inst->state_ ## field)); \
165   } \
166 } while (0)
167
168   INIT_STATE (num);
169   INIT_STATE (sum);
170   INIT_STATE (average);
171   INIT_STATE (min);
172   INIT_STATE (max);
173   INIT_STATE (stddev);
174
175 #undef INIT_STATE
176
177   pthread_mutex_lock (&agg_instance_list_lock);
178   inst->next = agg_instance_list_head;
179   agg_instance_list_head = inst;
180   pthread_mutex_unlock (&agg_instance_list_lock);
181
182   return (inst);
183 } /* }}} agg_instance_t *agg_instance_create */
184
185 /* Update the num, sum, min, max, ... fields of the aggregation instance, if
186  * the rate of the value list is available. Value lists with more than one data
187  * source are not supported and will return an error. Returns zero on success
188  * and non-zero otherwise. */
189 static int agg_instance_update (agg_instance_t *inst, /* {{{ */
190     data_set_t const *ds, value_list_t const *vl)
191 {
192   gauge_t *rate;
193
194   if (ds->ds_num != 1)
195   {
196     ERROR ("aggregation plugin: The \"%s\" type (data set) has more than one "
197         "data source. This is currently not supported by this plugin. "
198         "Sorry.", ds->type);
199     return (EINVAL);
200   }
201
202   rate = uc_get_rate (ds, vl);
203   if (rate == NULL)
204   {
205     char ident[6 * DATA_MAX_NAME_LEN];
206     FORMAT_VL (ident, sizeof (ident), vl);
207     ERROR ("aggregation plugin: Unable to read the current rate of \"%s\".",
208         ident);
209     return (ENOENT);
210   }
211
212   if (isnan (rate[0]))
213   {
214     sfree (rate);
215     return (0);
216   }
217
218   pthread_mutex_lock (&inst->lock);
219
220   inst->num++;
221   inst->sum += rate[0];
222   inst->squares_sum += (rate[0] * rate[0]);
223
224   if (isnan (inst->min) || (inst->min > rate[0]))
225     inst->min = rate[0];
226   if (isnan (inst->max) || (inst->max < rate[0]))
227     inst->max = rate[0];
228
229   pthread_mutex_unlock (&inst->lock);
230
231   sfree (rate);
232   return (0);
233 } /* }}} int agg_instance_update */
234
235 static int agg_instance_read_func (agg_instance_t *inst, /* {{{ */
236   char const *func, gauge_t rate, rate_to_value_state_t *state,
237   value_list_t *vl, char const *pi_prefix, cdtime_t t)
238 {
239   value_t v;
240   int status;
241
242   if (pi_prefix[0] != 0)
243     ssnprintf (vl->plugin_instance, sizeof (vl->plugin_instance), "%s-%s",
244         pi_prefix, func);
245   else
246     sstrncpy (vl->plugin_instance, func, sizeof (vl->plugin_instance));
247
248   memset (&v, 0, sizeof (v));
249   status = rate_to_value (&v, rate, state, inst->ds_type, t);
250   if (status != 0)
251   {
252     /* If this is the first iteration and rate_to_value() was asked to return a
253      * COUNTER or a DERIVE, it will return EAGAIN. Catch this and handle
254      * gracefully. */
255     if (status == EAGAIN)
256       return (0);
257
258     WARNING ("aggregation plugin: rate_to_value failed with status %i.",
259         status);
260     return (-1);
261   }
262
263   vl->values = &v;
264   vl->values_len = 1;
265
266   plugin_dispatch_values_secure (vl);
267
268   vl->values = NULL;
269   vl->values_len = 0;
270
271   return (0);
272 } /* }}} int agg_instance_read_func */
273
274 static int agg_instance_read (agg_instance_t *inst, cdtime_t t) /* {{{ */
275 {
276   value_list_t vl = VALUE_LIST_INIT;
277   char pi_prefix[DATA_MAX_NAME_LEN];
278
279   /* Pre-set all the fields in the value list that will not change per
280    * aggregation type (sum, average, ...). The struct will be re-used and must
281    * therefore be dispatched using the "secure" function. */
282
283   vl.time = t;
284   vl.interval = 0;
285
286   vl.meta = meta_data_create ();
287   if (vl.meta == NULL)
288   {
289     ERROR ("aggregation plugin: meta_data_create failed.");
290     return (-1);
291   }
292   meta_data_add_boolean (vl.meta, "aggregation:created", 1);
293
294   if (LU_IS_ALL (inst->ident.host))
295     sstrncpy (vl.host, "global", sizeof (vl.host));
296   else
297     sstrncpy (vl.host, inst->ident.host, sizeof (vl.host));
298
299   sstrncpy (vl.plugin, "aggregation", sizeof (vl.plugin));
300
301   if (LU_IS_ALL (inst->ident.plugin))
302   {
303     if (LU_IS_ALL (inst->ident.plugin_instance))
304       sstrncpy (pi_prefix, "", sizeof (pi_prefix));
305     else
306       sstrncpy (pi_prefix, inst->ident.plugin_instance, sizeof (pi_prefix));
307   }
308   else
309   {
310     if (LU_IS_ALL (inst->ident.plugin_instance))
311       sstrncpy (pi_prefix, inst->ident.plugin, sizeof (pi_prefix));
312     else
313       ssnprintf (pi_prefix, sizeof (pi_prefix),
314           "%s-%s", inst->ident.plugin, inst->ident.plugin_instance);
315   }
316
317   sstrncpy (vl.type, inst->ident.type, sizeof (vl.type));
318
319   if (!LU_IS_ALL (inst->ident.type_instance))
320     sstrncpy (vl.type_instance, inst->ident.type_instance,
321         sizeof (vl.type_instance));
322
323 #define READ_FUNC(func, rate) do { \
324   if (inst->state_ ## func != NULL) { \
325     agg_instance_read_func (inst, #func, rate, \
326         inst->state_ ## func, &vl, pi_prefix, t); \
327   } \
328 } while (0)
329
330   pthread_mutex_lock (&inst->lock);
331
332   READ_FUNC (num, (gauge_t) inst->num);
333
334   /* All other aggregations are only defined when there have been any values
335    * at all. */
336   if (inst->num > 0)
337   {
338     READ_FUNC (sum, inst->sum);
339     READ_FUNC (average, (inst->sum / ((gauge_t) inst->num)));
340     READ_FUNC (min, inst->min);
341     READ_FUNC (max, inst->max);
342     READ_FUNC (stddev, sqrt((((gauge_t) inst->num) * inst->squares_sum)
343           - (inst->sum * inst->sum)) / ((gauge_t) inst->num));
344   }
345
346   /* Reset internal state. */
347   inst->num = 0;
348   inst->sum = 0.0;
349   inst->squares_sum = 0.0;
350   inst->min = NAN;
351   inst->max = NAN;
352
353   pthread_mutex_unlock (&inst->lock);
354
355   meta_data_destroy (vl.meta);
356   vl.meta = NULL;
357
358   return (0);
359 } /* }}} int agg_instance_read */
360
361 /* lookup_class_callback_t for utils_vl_lookup */
362 static void *agg_lookup_class_callback ( /* {{{ */
363     __attribute__((unused)) data_set_t const *ds,
364     value_list_t const *vl, void *user_class)
365 {
366   return (agg_instance_create (ds, vl, (aggregation_t *) user_class));
367 } /* }}} void *agg_class_callback */
368
369 /* lookup_obj_callback_t for utils_vl_lookup */
370 static int agg_lookup_obj_callback (data_set_t const *ds, /* {{{ */
371     value_list_t const *vl,
372     __attribute__((unused)) void *user_class,
373     void *user_obj)
374 {
375   return (agg_instance_update ((agg_instance_t *) user_obj, ds, vl));
376 } /* }}} int agg_lookup_obj_callback */
377
378 /* lookup_free_class_callback_t for utils_vl_lookup */
379 static void agg_lookup_free_class_callback (void *user_class) /* {{{ */
380 {
381   agg_destroy ((aggregation_t *) user_class);
382 } /* }}} void agg_lookup_free_class_callback */
383
384 /* lookup_free_obj_callback_t for utils_vl_lookup */
385 static void agg_lookup_free_obj_callback (void *user_obj) /* {{{ */
386 {
387   agg_instance_destroy ((agg_instance_t *) user_obj);
388 } /* }}} void agg_lookup_free_obj_callback */
389
390 /*
391  * <Plugin "aggregation">
392  *   <Aggregation>
393  *     Plugin "cpu"
394  *     Type "cpu"
395  *
396  *     GroupBy Host
397  *     GroupBy TypeInstance
398  *
399  *     CalculateNum true
400  *     CalculateSum true
401  *     CalculateAverage true
402  *     CalculateMinimum true
403  *     CalculateMaximum true
404  *     CalculateStddev true
405  *   </Aggregation>
406  * </Plugin>
407  */
408 static int agg_config_handle_group_by (oconfig_item_t const *ci, /* {{{ */
409     aggregation_t *agg)
410 {
411   int i;
412
413   for (i = 0; i < ci->values_num; i++)
414   {
415     char const *value;
416
417     if (ci->values[i].type != OCONFIG_TYPE_STRING)
418     {
419       ERROR ("aggregation plugin: Argument %i of the \"GroupBy\" option "
420           "is not a string.", i + 1);
421       continue;
422     }
423
424     value = ci->values[i].value.string;
425
426     if (strcasecmp ("Host", value) == 0)
427       sstrncpy (agg->ident.host, LU_ANY, sizeof (agg->ident.host));
428     else if (strcasecmp ("Plugin", value) == 0)
429       sstrncpy (agg->ident.plugin, LU_ANY, sizeof (agg->ident.plugin));
430     else if (strcasecmp ("PluginInstance", value) == 0)
431       sstrncpy (agg->ident.plugin_instance, LU_ANY,
432           sizeof (agg->ident.plugin_instance));
433     else if (strcasecmp ("TypeInstance", value) == 0)
434       sstrncpy (agg->ident.type_instance, LU_ANY, sizeof (agg->ident.type_instance));
435     else if (strcasecmp ("Type", value) == 0)
436       ERROR ("aggregation plugin: Grouping by type is not supported.");
437     else
438       WARNING ("aggregation plugin: The \"%s\" argument to the \"GroupBy\" "
439           "option is invalid and will be ignored.", value);
440   } /* for (ci->values) */
441
442   return (0);
443 } /* }}} int agg_config_handle_group_by */
444
445 static int agg_config_aggregation (oconfig_item_t *ci) /* {{{ */
446 {
447   aggregation_t *agg;
448   _Bool is_valid;
449   int status;
450   int i;
451
452   agg = malloc (sizeof (*agg));
453   if (agg == NULL)
454   {
455     ERROR ("aggregation plugin: malloc failed.");
456     return (-1);
457   }
458   memset (agg, 0, sizeof (*agg));
459
460   sstrncpy (agg->ident.host, LU_ALL, sizeof (agg->ident.host));
461   sstrncpy (agg->ident.plugin, LU_ALL, sizeof (agg->ident.plugin));
462   sstrncpy (agg->ident.plugin_instance, LU_ALL,
463       sizeof (agg->ident.plugin_instance));
464   sstrncpy (agg->ident.type, LU_ALL, sizeof (agg->ident.type));
465   sstrncpy (agg->ident.type_instance, LU_ALL,
466       sizeof (agg->ident.type_instance));
467
468   for (i = 0; i < ci->children_num; i++)
469   {
470     oconfig_item_t *child = ci->children + i;
471
472     if (strcasecmp ("Host", child->key) == 0)
473       cf_util_get_string_buffer (child, agg->ident.host,
474           sizeof (agg->ident.host));
475     else if (strcasecmp ("Plugin", child->key) == 0)
476       cf_util_get_string_buffer (child, agg->ident.plugin,
477           sizeof (agg->ident.plugin));
478     else if (strcasecmp ("PluginInstance", child->key) == 0)
479       cf_util_get_string_buffer (child, agg->ident.plugin_instance,
480           sizeof (agg->ident.plugin_instance));
481     else if (strcasecmp ("Type", child->key) == 0)
482       cf_util_get_string_buffer (child, agg->ident.type,
483           sizeof (agg->ident.type));
484     else if (strcasecmp ("TypeInstance", child->key) == 0)
485       cf_util_get_string_buffer (child, agg->ident.type_instance,
486           sizeof (agg->ident.type_instance));
487     else if (strcasecmp ("GroupBy", child->key) == 0)
488       agg_config_handle_group_by (child, agg);
489     else if (strcasecmp ("CalculateNum", child->key) == 0)
490       cf_util_get_boolean (child, &agg->calc_num);
491     else if (strcasecmp ("CalculateSum", child->key) == 0)
492       cf_util_get_boolean (child, &agg->calc_sum);
493     else if (strcasecmp ("CalculateAverage", child->key) == 0)
494       cf_util_get_boolean (child, &agg->calc_average);
495     else if (strcasecmp ("CalculateMinimum", child->key) == 0)
496       cf_util_get_boolean (child, &agg->calc_min);
497     else if (strcasecmp ("CalculateMaximum", child->key) == 0)
498       cf_util_get_boolean (child, &agg->calc_max);
499     else if (strcasecmp ("CalculateStddev", child->key) == 0)
500       cf_util_get_boolean (child, &agg->calc_stddev);
501     else
502       WARNING ("aggregation plugin: The \"%s\" key is not allowed inside "
503           "<Aggregation /> blocks and will be ignored.", child->key);
504   }
505
506   /* Sanity checking */
507   is_valid = 1;
508   if (LU_IS_ALL (agg->ident.type)) /* {{{ */
509   {
510     ERROR ("aggregation plugin: It appears you did not specify the required "
511         "\"Type\" option in this aggregation. "
512         "(Host \"%s\", Plugin \"%s\", PluginInstance \"%s\", "
513         "Type \"%s\", TypeInstance \"%s\")",
514         agg->ident.host, agg->ident.plugin, agg->ident.plugin_instance,
515         agg->ident.type, agg->ident.type_instance);
516     is_valid = 0;
517   }
518   else if (strchr (agg->ident.type, '/') != NULL)
519   {
520     ERROR ("aggregation plugin: The \"Type\" may not contain the '/' "
521         "character. Especially, it may not be a wildcard. The current "
522         "value is \"%s\".", agg->ident.type);
523     is_valid = 0;
524   } /* }}} */
525
526   if (!LU_IS_ALL (agg->ident.host) /* {{{ */
527       && !LU_IS_ALL (agg->ident.plugin)
528       && !LU_IS_ALL (agg->ident.plugin_instance)
529       && !LU_IS_ALL (agg->ident.type_instance))
530   {
531     ERROR ("aggregation plugin: An aggregation must contain at least one "
532         "wildcard. This is achieved by leaving at least one of the \"Host\", "
533         "\"Plugin\", \"PluginInstance\" and \"TypeInstance\" options blank "
534         "and not grouping by that field. "
535         "(Host \"%s\", Plugin \"%s\", PluginInstance \"%s\", "
536         "Type \"%s\", TypeInstance \"%s\")",
537         agg->ident.host, agg->ident.plugin, agg->ident.plugin_instance,
538         agg->ident.type, agg->ident.type_instance);
539     is_valid = 0;
540   } /* }}} */
541
542   if (!agg->calc_num && !agg->calc_sum && !agg->calc_average /* {{{ */
543       && !agg->calc_min && !agg->calc_max && !agg->calc_stddev)
544   {
545     ERROR ("aggregation plugin: No aggregation function has been specified. "
546         "Without this, I don't know what I should be calculating. "
547         "(Host \"%s\", Plugin \"%s\", PluginInstance \"%s\", "
548         "Type \"%s\", TypeInstance \"%s\")",
549         agg->ident.host, agg->ident.plugin, agg->ident.plugin_instance,
550         agg->ident.type, agg->ident.type_instance);
551     is_valid = 0;
552   } /* }}} */
553
554   if (!is_valid) /* {{{ */
555   {
556     sfree (agg);
557     return (-1);
558   } /* }}} */
559
560   status = lookup_add (lookup, &agg->ident, agg);
561   if (status != 0)
562   {
563     ERROR ("aggregation plugin: lookup_add failed with status %i.", status);
564     sfree (agg);
565     return (-1);
566   }
567
568   DEBUG ("aggregation plugin: Successfully added aggregation: "
569       "(Host \"%s\", Plugin \"%s\", PluginInstance \"%s\", "
570       "Type \"%s\", TypeInstance \"%s\")",
571       agg->ident.host, agg->ident.plugin, agg->ident.plugin_instance,
572       agg->ident.type, agg->ident.type_instance);
573   return (0);
574 } /* }}} int agg_config_aggregation */
575
576 static int agg_config (oconfig_item_t *ci) /* {{{ */
577 {
578   int i;
579
580   pthread_mutex_lock (&agg_instance_list_lock);
581
582   if (lookup == NULL)
583   {
584     lookup = lookup_create (agg_lookup_class_callback,
585         agg_lookup_obj_callback,
586         agg_lookup_free_class_callback,
587         agg_lookup_free_obj_callback);
588     if (lookup == NULL)
589     {
590       pthread_mutex_unlock (&agg_instance_list_lock);
591       ERROR ("aggregation plugin: lookup_create failed.");
592       return (-1);
593     }
594   }
595
596   for (i = 0; i < ci->children_num; i++)
597   {
598     oconfig_item_t *child = ci->children + i;
599
600     if (strcasecmp ("Aggregation", child->key) == 0)
601       agg_config_aggregation (child);
602     else
603       WARNING ("aggregation plugin: The \"%s\" key is not allowed inside "
604           "<Plugin aggregation /> blocks and will be ignored.", child->key);
605   }
606
607   pthread_mutex_unlock (&agg_instance_list_lock);
608
609   return (0);
610 } /* }}} int agg_config */
611
612 static int agg_read (void) /* {{{ */
613 {
614   agg_instance_t *this;
615   cdtime_t t;
616   int success;
617
618   t = cdtime ();
619   success = 0;
620
621   pthread_mutex_lock (&agg_instance_list_lock);
622
623   /* agg_instance_list_head only holds data, after the "write" callback has
624    * been called with a matching value list at least once. So on startup,
625    * there's a race between the aggregations read() and write() callback. If
626    * the read() callback is called first, agg_instance_list_head is NULL and
627    * "success" may be zero. This is expected and should not result in an error.
628    * Therefore we need to handle this case separately. */
629   if (agg_instance_list_head == NULL)
630   {
631     pthread_mutex_unlock (&agg_instance_list_lock);
632     return (0);
633   }
634
635   for (this = agg_instance_list_head; this != NULL; this = this->next)
636   {
637     int status;
638
639     status = agg_instance_read (this, t);
640     if (status != 0)
641       WARNING ("aggregation plugin: Reading an aggregation instance "
642           "failed with status %i.", status);
643     else
644       success++;
645   }
646
647   pthread_mutex_unlock (&agg_instance_list_lock);
648
649   return ((success > 0) ? 0 : -1);
650 } /* }}} int agg_read */
651
652 static int agg_write (data_set_t const *ds, value_list_t const *vl, /* {{{ */
653     __attribute__((unused)) user_data_t *user_data)
654 {
655   _Bool created_by_aggregation = 0;
656   int status;
657
658   /* Ignore values that were created by the aggregation plugin to avoid weird
659    * effects. */
660   (void) meta_data_get_boolean (vl->meta, "aggregation:created",
661       &created_by_aggregation);
662   if (created_by_aggregation)
663     return (0);
664
665   if (lookup == NULL)
666     status = ENOENT;
667   else
668   {
669     status = lookup_search (lookup, ds, vl);
670     if (status > 0)
671       status = 0;
672   }
673
674   return (status);
675 } /* }}} int agg_write */
676
677 void module_register (void)
678 {
679   plugin_register_complex_config ("aggregation", agg_config);
680   plugin_register_read ("aggregation", agg_read);
681   plugin_register_write ("aggregation", agg_write, /* user_data = */ NULL);
682 }
683
684 /* vim: set sw=2 sts=2 tw=78 et fdm=marker : */