Merge branch 'collectd-5.5' into collectd-5.6
[collectd.git] / src / aggregation.c
1 /**
2  * collectd - src/aggregation.c
3  * Copyright (C) 2012       Florian Forster
4  *
5  * Permission is hereby granted, free of charge, to any person obtaining a
6  * copy of this software and associated documentation files (the "Software"),
7  * to deal in the Software without restriction, including without limitation
8  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
9  * and/or sell copies of the Software, and to permit persons to whom the
10  * Software is furnished to do so, subject to the following conditions:
11  *
12  * The above copyright notice and this permission notice shall be included in
13  * all copies or substantial portions of the Software.
14  *
15  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
21  * DEALINGS IN THE SOFTWARE.
22  *
23  * Authors:
24  *   Florian Forster <octo at collectd.org>
25  **/
26
27 #include "collectd.h"
28
29 #include "plugin.h"
30 #include "common.h"
31 #include "meta_data.h"
32 #include "utils_cache.h" /* for uc_get_rate() */
33 #include "utils_subst.h"
34 #include "utils_vl_lookup.h"
35
36 #define AGG_MATCHES_ALL(str) (strcmp ("/.*/", str) == 0)
37 #define AGG_FUNC_PLACEHOLDER "%{aggregation}"
38
39 struct aggregation_s /* {{{ */
40 {
41   identifier_t ident;
42   unsigned int group_by;
43
44   unsigned int regex_fields;
45
46   char *set_host;
47   char *set_plugin;
48   char *set_plugin_instance;
49   char *set_type_instance;
50
51   _Bool calc_num;
52   _Bool calc_sum;
53   _Bool calc_average;
54   _Bool calc_min;
55   _Bool calc_max;
56   _Bool calc_stddev;
57 }; /* }}} */
58 typedef struct aggregation_s aggregation_t;
59
60 struct agg_instance_s;
61 typedef struct agg_instance_s agg_instance_t;
62 struct agg_instance_s /* {{{ */
63 {
64   pthread_mutex_t lock;
65   identifier_t ident;
66
67   int ds_type;
68
69   derive_t num;
70   gauge_t sum;
71   gauge_t squares_sum;
72
73   gauge_t min;
74   gauge_t max;
75
76   rate_to_value_state_t *state_num;
77   rate_to_value_state_t *state_sum;
78   rate_to_value_state_t *state_average;
79   rate_to_value_state_t *state_min;
80   rate_to_value_state_t *state_max;
81   rate_to_value_state_t *state_stddev;
82
83   agg_instance_t *next;
84 }; /* }}} */
85
86 static lookup_t *lookup = NULL;
87
88 static pthread_mutex_t agg_instance_list_lock = PTHREAD_MUTEX_INITIALIZER;
89 static agg_instance_t *agg_instance_list_head = NULL;
90
91 static _Bool agg_is_regex (char const *str) /* {{{ */
92 {
93   size_t len;
94
95   if (str == NULL)
96     return (0);
97
98   len = strlen (str);
99   if (len < 3)
100     return (0);
101
102   if ((str[0] == '/') && (str[len - 1] == '/'))
103     return (1);
104   else
105     return (0);
106 } /* }}} _Bool agg_is_regex */
107
108 static void agg_destroy (aggregation_t *agg) /* {{{ */
109 {
110   sfree (agg);
111 } /* }}} void agg_destroy */
112
113 /* Frees all dynamically allocated memory within the instance. */
114 static void agg_instance_destroy (agg_instance_t *inst) /* {{{ */
115 {
116   if (inst == NULL)
117     return;
118
119   /* Remove this instance from the global list of instances. */
120   pthread_mutex_lock (&agg_instance_list_lock);
121   if (agg_instance_list_head == inst)
122     agg_instance_list_head = inst->next;
123   else if (agg_instance_list_head != NULL)
124   {
125     agg_instance_t *prev = agg_instance_list_head;
126     while ((prev != NULL) && (prev->next != inst))
127       prev = prev->next;
128     if (prev != NULL)
129       prev->next = inst->next;
130   }
131   pthread_mutex_unlock (&agg_instance_list_lock);
132
133   sfree (inst->state_num);
134   sfree (inst->state_sum);
135   sfree (inst->state_average);
136   sfree (inst->state_min);
137   sfree (inst->state_max);
138   sfree (inst->state_stddev);
139
140   memset (inst, 0, sizeof (*inst));
141   inst->ds_type = -1;
142   inst->min = NAN;
143   inst->max = NAN;
144 } /* }}} void agg_instance_destroy */
145
146 static int agg_instance_create_name (agg_instance_t *inst, /* {{{ */
147     value_list_t const *vl, aggregation_t const *agg)
148 {
149 #define COPY_FIELD(buffer, buffer_size, field, group_mask, all_value) do { \
150   if (agg->set_ ## field != NULL) \
151     sstrncpy (buffer, agg->set_ ## field, buffer_size); \
152   else if ((agg->regex_fields & group_mask) \
153       && (agg->group_by & group_mask)) \
154     sstrncpy (buffer, vl->field, buffer_size); \
155   else if ((agg->regex_fields & group_mask) \
156       && (AGG_MATCHES_ALL (agg->ident.field))) \
157     sstrncpy (buffer, all_value, buffer_size); \
158   else \
159     sstrncpy (buffer, agg->ident.field, buffer_size); \
160 } while (0)
161
162   /* Host */
163   COPY_FIELD (inst->ident.host, sizeof (inst->ident.host),
164       host, LU_GROUP_BY_HOST, "global");
165
166   /* Plugin */
167   if (agg->set_plugin != NULL)
168     sstrncpy (inst->ident.plugin, agg->set_plugin,
169         sizeof (inst->ident.plugin));
170   else
171     sstrncpy (inst->ident.plugin, "aggregation", sizeof (inst->ident.plugin));
172
173   /* Plugin instance */
174   if (agg->set_plugin_instance != NULL)
175     sstrncpy (inst->ident.plugin_instance, agg->set_plugin_instance,
176         sizeof (inst->ident.plugin_instance));
177   else
178   {
179     char tmp_plugin[DATA_MAX_NAME_LEN];
180     char tmp_plugin_instance[DATA_MAX_NAME_LEN] = "";
181
182     if ((agg->regex_fields & LU_GROUP_BY_PLUGIN)
183         && (agg->group_by & LU_GROUP_BY_PLUGIN))
184       sstrncpy (tmp_plugin, vl->plugin, sizeof (tmp_plugin));
185     else if ((agg->regex_fields & LU_GROUP_BY_PLUGIN)
186         && (AGG_MATCHES_ALL (agg->ident.plugin)))
187       sstrncpy (tmp_plugin, "", sizeof (tmp_plugin));
188     else
189       sstrncpy (tmp_plugin, agg->ident.plugin, sizeof (tmp_plugin));
190
191     if ((agg->regex_fields & LU_GROUP_BY_PLUGIN_INSTANCE)
192         && (agg->group_by & LU_GROUP_BY_PLUGIN_INSTANCE))
193       sstrncpy (tmp_plugin_instance, vl->plugin_instance,
194           sizeof (tmp_plugin_instance));
195     else if ((agg->regex_fields & LU_GROUP_BY_PLUGIN_INSTANCE)
196         && (AGG_MATCHES_ALL (agg->ident.plugin_instance)))
197       sstrncpy (tmp_plugin_instance, "", sizeof (tmp_plugin_instance));
198     else
199       sstrncpy (tmp_plugin_instance, agg->ident.plugin_instance,
200           sizeof (tmp_plugin_instance));
201
202     if ((strcmp ("", tmp_plugin) == 0)
203         && (strcmp ("", tmp_plugin_instance) == 0))
204       sstrncpy (inst->ident.plugin_instance, AGG_FUNC_PLACEHOLDER,
205           sizeof (inst->ident.plugin_instance));
206     else if (strcmp ("", tmp_plugin) != 0)
207       ssnprintf (inst->ident.plugin_instance,
208           sizeof (inst->ident.plugin_instance),
209           "%s-%s", tmp_plugin, AGG_FUNC_PLACEHOLDER);
210     else if (strcmp ("", tmp_plugin_instance) != 0)
211       ssnprintf (inst->ident.plugin_instance,
212           sizeof (inst->ident.plugin_instance),
213           "%s-%s", tmp_plugin_instance, AGG_FUNC_PLACEHOLDER);
214     else
215       ssnprintf (inst->ident.plugin_instance,
216           sizeof (inst->ident.plugin_instance),
217           "%s-%s-%s", tmp_plugin, tmp_plugin_instance, AGG_FUNC_PLACEHOLDER);
218   }
219
220   /* Type */
221   sstrncpy (inst->ident.type, agg->ident.type, sizeof (inst->ident.type));
222
223   /* Type instance */
224   COPY_FIELD (inst->ident.type_instance, sizeof (inst->ident.type_instance),
225       type_instance, LU_GROUP_BY_TYPE_INSTANCE, "");
226
227 #undef COPY_FIELD
228
229   return (0);
230 } /* }}} int agg_instance_create_name */
231
232 /* Create a new aggregation instance. */
233 static agg_instance_t *agg_instance_create (data_set_t const *ds, /* {{{ */
234     value_list_t const *vl, aggregation_t *agg)
235 {
236   agg_instance_t *inst;
237
238   DEBUG ("aggregation plugin: Creating new instance.");
239
240   inst = calloc (1, sizeof (*inst));
241   if (inst == NULL)
242   {
243     ERROR ("aggregation plugin: calloc() failed.");
244     return (NULL);
245   }
246   pthread_mutex_init (&inst->lock, /* attr = */ NULL);
247
248   inst->ds_type = ds->ds[0].type;
249
250   agg_instance_create_name (inst, vl, agg);
251
252   inst->min = NAN;
253   inst->max = NAN;
254
255 #define INIT_STATE(field) do { \
256   inst->state_ ## field = NULL; \
257   if (agg->calc_ ## field) { \
258     inst->state_ ## field = calloc (1, sizeof (*inst->state_ ## field)); \
259     if (inst->state_ ## field == NULL) { \
260       agg_instance_destroy (inst); \
261       free (inst); \
262       ERROR ("aggregation plugin: calloc() failed."); \
263       return (NULL); \
264     } \
265   } \
266 } while (0)
267
268   INIT_STATE (num);
269   INIT_STATE (sum);
270   INIT_STATE (average);
271   INIT_STATE (min);
272   INIT_STATE (max);
273   INIT_STATE (stddev);
274
275 #undef INIT_STATE
276
277   pthread_mutex_lock (&agg_instance_list_lock);
278   inst->next = agg_instance_list_head;
279   agg_instance_list_head = inst;
280   pthread_mutex_unlock (&agg_instance_list_lock);
281
282   return (inst);
283 } /* }}} agg_instance_t *agg_instance_create */
284
285 /* Update the num, sum, min, max, ... fields of the aggregation instance, if
286  * the rate of the value list is available. Value lists with more than one data
287  * source are not supported and will return an error. Returns zero on success
288  * and non-zero otherwise. */
289 static int agg_instance_update (agg_instance_t *inst, /* {{{ */
290     data_set_t const *ds, value_list_t const *vl)
291 {
292   gauge_t *rate;
293
294   if (ds->ds_num != 1)
295   {
296     ERROR ("aggregation plugin: The \"%s\" type (data set) has more than one "
297         "data source. This is currently not supported by this plugin. "
298         "Sorry.", ds->type);
299     return (EINVAL);
300   }
301
302   rate = uc_get_rate (ds, vl);
303   if (rate == NULL)
304   {
305     char ident[6 * DATA_MAX_NAME_LEN];
306     FORMAT_VL (ident, sizeof (ident), vl);
307     ERROR ("aggregation plugin: Unable to read the current rate of \"%s\".",
308         ident);
309     return (ENOENT);
310   }
311
312   if (isnan (rate[0]))
313   {
314     sfree (rate);
315     return (0);
316   }
317
318   pthread_mutex_lock (&inst->lock);
319
320   inst->num++;
321   inst->sum += rate[0];
322   inst->squares_sum += (rate[0] * rate[0]);
323
324   if (isnan (inst->min) || (inst->min > rate[0]))
325     inst->min = rate[0];
326   if (isnan (inst->max) || (inst->max < rate[0]))
327     inst->max = rate[0];
328
329   pthread_mutex_unlock (&inst->lock);
330
331   sfree (rate);
332   return (0);
333 } /* }}} int agg_instance_update */
334
335 static int agg_instance_read_func (agg_instance_t *inst, /* {{{ */
336   char const *func, gauge_t rate, rate_to_value_state_t *state,
337   value_list_t *vl, char const *pi_prefix, cdtime_t t)
338 {
339   value_t v;
340   int status;
341
342   if (pi_prefix[0] != 0)
343     subst_string (vl->plugin_instance, sizeof (vl->plugin_instance),
344         pi_prefix, AGG_FUNC_PLACEHOLDER, func);
345   else
346     sstrncpy (vl->plugin_instance, func, sizeof (vl->plugin_instance));
347
348   status = rate_to_value (&v, rate, state, inst->ds_type, t);
349   if (status != 0)
350   {
351     /* If this is the first iteration and rate_to_value() was asked to return a
352      * COUNTER or a DERIVE, it will return EAGAIN. Catch this and handle
353      * gracefully. */
354     if (status == EAGAIN)
355       return (0);
356
357     WARNING ("aggregation plugin: rate_to_value failed with status %i.",
358         status);
359     return (-1);
360   }
361
362   vl->values = &v;
363   vl->values_len = 1;
364
365   plugin_dispatch_values (vl);
366
367   vl->values = NULL;
368   vl->values_len = 0;
369
370   return (0);
371 } /* }}} int agg_instance_read_func */
372
373 static int agg_instance_read (agg_instance_t *inst, cdtime_t t) /* {{{ */
374 {
375   value_list_t vl = VALUE_LIST_INIT;
376
377   /* Pre-set all the fields in the value list that will not change per
378    * aggregation type (sum, average, ...). The struct will be re-used and must
379    * therefore be dispatched using the "secure" function. */
380
381   vl.time = t;
382   vl.interval = 0;
383
384   vl.meta = meta_data_create ();
385   if (vl.meta == NULL)
386   {
387     ERROR ("aggregation plugin: meta_data_create failed.");
388     return (-1);
389   }
390   meta_data_add_boolean (vl.meta, "aggregation:created", 1);
391
392   sstrncpy (vl.host, inst->ident.host, sizeof (vl.host));
393   sstrncpy (vl.plugin, inst->ident.plugin, sizeof (vl.plugin));
394   sstrncpy (vl.type, inst->ident.type, sizeof (vl.type));
395   sstrncpy (vl.type_instance, inst->ident.type_instance,
396       sizeof (vl.type_instance));
397
398 #define READ_FUNC(func, rate) do { \
399   if (inst->state_ ## func != NULL) { \
400     agg_instance_read_func (inst, #func, rate, \
401         inst->state_ ## func, &vl, inst->ident.plugin_instance, t); \
402   } \
403 } while (0)
404
405   pthread_mutex_lock (&inst->lock);
406
407   READ_FUNC (num, (gauge_t) inst->num);
408
409   /* All other aggregations are only defined when there have been any values
410    * at all. */
411   if (inst->num > 0)
412   {
413     READ_FUNC (sum, inst->sum);
414     READ_FUNC (average, (inst->sum / ((gauge_t) inst->num)));
415     READ_FUNC (min, inst->min);
416     READ_FUNC (max, inst->max);
417     READ_FUNC (stddev, sqrt((((gauge_t) inst->num) * inst->squares_sum)
418           - (inst->sum * inst->sum)) / ((gauge_t) inst->num));
419   }
420
421   /* Reset internal state. */
422   inst->num = 0;
423   inst->sum = 0.0;
424   inst->squares_sum = 0.0;
425   inst->min = NAN;
426   inst->max = NAN;
427
428   pthread_mutex_unlock (&inst->lock);
429
430   meta_data_destroy (vl.meta);
431   vl.meta = NULL;
432
433   return (0);
434 } /* }}} int agg_instance_read */
435
436 /* lookup_class_callback_t for utils_vl_lookup */
437 static void *agg_lookup_class_callback ( /* {{{ */
438     data_set_t const *ds, value_list_t const *vl, void *user_class)
439 {
440   return (agg_instance_create (ds, vl, (aggregation_t *) user_class));
441 } /* }}} void *agg_class_callback */
442
443 /* lookup_obj_callback_t for utils_vl_lookup */
444 static int agg_lookup_obj_callback (data_set_t const *ds, /* {{{ */
445     value_list_t const *vl,
446     __attribute__((unused)) void *user_class,
447     void *user_obj)
448 {
449   return (agg_instance_update ((agg_instance_t *) user_obj, ds, vl));
450 } /* }}} int agg_lookup_obj_callback */
451
452 /* lookup_free_class_callback_t for utils_vl_lookup */
453 static void agg_lookup_free_class_callback (void *user_class) /* {{{ */
454 {
455   agg_destroy ((aggregation_t *) user_class);
456 } /* }}} void agg_lookup_free_class_callback */
457
458 /* lookup_free_obj_callback_t for utils_vl_lookup */
459 static void agg_lookup_free_obj_callback (void *user_obj) /* {{{ */
460 {
461   agg_instance_destroy ((agg_instance_t *) user_obj);
462 } /* }}} void agg_lookup_free_obj_callback */
463
464 /*
465  * <Plugin "aggregation">
466  *   <Aggregation>
467  *     Plugin "cpu"
468  *     Type "cpu"
469  *
470  *     GroupBy Host
471  *     GroupBy TypeInstance
472  *
473  *     CalculateNum true
474  *     CalculateSum true
475  *     CalculateAverage true
476  *     CalculateMinimum true
477  *     CalculateMaximum true
478  *     CalculateStddev true
479  *   </Aggregation>
480  * </Plugin>
481  */
482 static int agg_config_handle_group_by (oconfig_item_t const *ci, /* {{{ */
483     aggregation_t *agg)
484 {
485   for (int i = 0; i < ci->values_num; i++)
486   {
487     char const *value;
488
489     if (ci->values[i].type != OCONFIG_TYPE_STRING)
490     {
491       ERROR ("aggregation plugin: Argument %i of the \"GroupBy\" option "
492           "is not a string.", i + 1);
493       continue;
494     }
495
496     value = ci->values[i].value.string;
497
498     if (strcasecmp ("Host", value) == 0)
499       agg->group_by |= LU_GROUP_BY_HOST;
500     else if (strcasecmp ("Plugin", value) == 0)
501       agg->group_by |= LU_GROUP_BY_PLUGIN;
502     else if (strcasecmp ("PluginInstance", value) == 0)
503       agg->group_by |= LU_GROUP_BY_PLUGIN_INSTANCE;
504     else if (strcasecmp ("TypeInstance", value) == 0)
505       agg->group_by |= LU_GROUP_BY_TYPE_INSTANCE;
506     else if (strcasecmp ("Type", value) == 0)
507       ERROR ("aggregation plugin: Grouping by type is not supported.");
508     else
509       WARNING ("aggregation plugin: The \"%s\" argument to the \"GroupBy\" "
510           "option is invalid and will be ignored.", value);
511   } /* for (ci->values) */
512
513   return (0);
514 } /* }}} int agg_config_handle_group_by */
515
516 static int agg_config_aggregation (oconfig_item_t *ci) /* {{{ */
517 {
518   aggregation_t *agg;
519   _Bool is_valid;
520   int status;
521
522   agg = calloc (1, sizeof (*agg));
523   if (agg == NULL)
524   {
525     ERROR ("aggregation plugin: calloc failed.");
526     return (-1);
527   }
528
529   sstrncpy (agg->ident.host, "/.*/", sizeof (agg->ident.host));
530   sstrncpy (agg->ident.plugin, "/.*/", sizeof (agg->ident.plugin));
531   sstrncpy (agg->ident.plugin_instance, "/.*/",
532       sizeof (agg->ident.plugin_instance));
533   sstrncpy (agg->ident.type, "/.*/", sizeof (agg->ident.type));
534   sstrncpy (agg->ident.type_instance, "/.*/",
535       sizeof (agg->ident.type_instance));
536
537   for (int i = 0; i < ci->children_num; i++)
538   {
539     oconfig_item_t *child = ci->children + i;
540
541     if (strcasecmp ("Host", child->key) == 0)
542       cf_util_get_string_buffer (child, agg->ident.host,
543           sizeof (agg->ident.host));
544     else if (strcasecmp ("Plugin", child->key) == 0)
545       cf_util_get_string_buffer (child, agg->ident.plugin,
546           sizeof (agg->ident.plugin));
547     else if (strcasecmp ("PluginInstance", child->key) == 0)
548       cf_util_get_string_buffer (child, agg->ident.plugin_instance,
549           sizeof (agg->ident.plugin_instance));
550     else if (strcasecmp ("Type", child->key) == 0)
551       cf_util_get_string_buffer (child, agg->ident.type,
552           sizeof (agg->ident.type));
553     else if (strcasecmp ("TypeInstance", child->key) == 0)
554       cf_util_get_string_buffer (child, agg->ident.type_instance,
555           sizeof (agg->ident.type_instance));
556     else if (strcasecmp ("SetHost", child->key) == 0)
557       cf_util_get_string (child, &agg->set_host);
558     else if (strcasecmp ("SetPlugin", child->key) == 0)
559       cf_util_get_string (child, &agg->set_plugin);
560     else if (strcasecmp ("SetPluginInstance", child->key) == 0)
561       cf_util_get_string (child, &agg->set_plugin_instance);
562     else if (strcasecmp ("SetTypeInstance", child->key) == 0)
563       cf_util_get_string (child, &agg->set_type_instance);
564     else if (strcasecmp ("GroupBy", child->key) == 0)
565       agg_config_handle_group_by (child, agg);
566     else if (strcasecmp ("CalculateNum", child->key) == 0)
567       cf_util_get_boolean (child, &agg->calc_num);
568     else if (strcasecmp ("CalculateSum", child->key) == 0)
569       cf_util_get_boolean (child, &agg->calc_sum);
570     else if (strcasecmp ("CalculateAverage", child->key) == 0)
571       cf_util_get_boolean (child, &agg->calc_average);
572     else if (strcasecmp ("CalculateMinimum", child->key) == 0)
573       cf_util_get_boolean (child, &agg->calc_min);
574     else if (strcasecmp ("CalculateMaximum", child->key) == 0)
575       cf_util_get_boolean (child, &agg->calc_max);
576     else if (strcasecmp ("CalculateStddev", child->key) == 0)
577       cf_util_get_boolean (child, &agg->calc_stddev);
578     else
579       WARNING ("aggregation plugin: The \"%s\" key is not allowed inside "
580           "<Aggregation /> blocks and will be ignored.", child->key);
581   }
582
583   if (agg_is_regex (agg->ident.host))
584     agg->regex_fields |= LU_GROUP_BY_HOST;
585   if (agg_is_regex (agg->ident.plugin))
586     agg->regex_fields |= LU_GROUP_BY_PLUGIN;
587   if (agg_is_regex (agg->ident.plugin_instance))
588     agg->regex_fields |= LU_GROUP_BY_PLUGIN_INSTANCE;
589   if (agg_is_regex (agg->ident.type_instance))
590     agg->regex_fields |= LU_GROUP_BY_TYPE_INSTANCE;
591
592   /* Sanity checking */
593   is_valid = 1;
594   if (strcmp ("/.*/", agg->ident.type) == 0) /* {{{ */
595   {
596     ERROR ("aggregation plugin: It appears you did not specify the required "
597         "\"Type\" option in this aggregation. "
598         "(Host \"%s\", Plugin \"%s\", PluginInstance \"%s\", "
599         "Type \"%s\", TypeInstance \"%s\")",
600         agg->ident.host, agg->ident.plugin, agg->ident.plugin_instance,
601         agg->ident.type, agg->ident.type_instance);
602     is_valid = 0;
603   }
604   else if (strchr (agg->ident.type, '/') != NULL)
605   {
606     ERROR ("aggregation plugin: The \"Type\" may not contain the '/' "
607         "character. Especially, it may not be a regex. The current "
608         "value is \"%s\".", agg->ident.type);
609     is_valid = 0;
610   } /* }}} */
611
612   /* Check that there is at least one regex field without a grouping. {{{ */
613   if ((agg->regex_fields & ~agg->group_by) == 0)
614   {
615     ERROR ("aggregation plugin: An aggregation must contain at least one "
616         "wildcard. This is achieved by leaving at least one of the \"Host\", "
617         "\"Plugin\", \"PluginInstance\" and \"TypeInstance\" options blank "
618         "or using a regular expression and not grouping by that field. "
619         "(Host \"%s\", Plugin \"%s\", PluginInstance \"%s\", "
620         "Type \"%s\", TypeInstance \"%s\")",
621         agg->ident.host, agg->ident.plugin, agg->ident.plugin_instance,
622         agg->ident.type, agg->ident.type_instance);
623     is_valid = 0;
624   } /* }}} */
625
626   /* Check that all grouping fields are regular expressions. {{{ */
627   if (agg->group_by & ~agg->regex_fields)
628   {
629     ERROR ("aggregation plugin: Only wildcard fields (fields for which a "
630         "regular expression is configured or which are left blank) can be "
631         "specified in the \"GroupBy\" option. "
632         "(Host \"%s\", Plugin \"%s\", PluginInstance \"%s\", "
633         "Type \"%s\", TypeInstance \"%s\")",
634         agg->ident.host, agg->ident.plugin, agg->ident.plugin_instance,
635         agg->ident.type, agg->ident.type_instance);
636     is_valid = 0;
637   } /* }}} */
638
639   if (!agg->calc_num && !agg->calc_sum && !agg->calc_average /* {{{ */
640       && !agg->calc_min && !agg->calc_max && !agg->calc_stddev)
641   {
642     ERROR ("aggregation plugin: No aggregation function has been specified. "
643         "Without this, I don't know what I should be calculating. "
644         "(Host \"%s\", Plugin \"%s\", PluginInstance \"%s\", "
645         "Type \"%s\", TypeInstance \"%s\")",
646         agg->ident.host, agg->ident.plugin, agg->ident.plugin_instance,
647         agg->ident.type, agg->ident.type_instance);
648     is_valid = 0;
649   } /* }}} */
650
651   if (!is_valid) /* {{{ */
652   {
653     sfree (agg);
654     return (-1);
655   } /* }}} */
656
657   status = lookup_add (lookup, &agg->ident, agg->group_by, agg);
658   if (status != 0)
659   {
660     ERROR ("aggregation plugin: lookup_add failed with status %i.", status);
661     sfree (agg);
662     return (-1);
663   }
664
665   DEBUG ("aggregation plugin: Successfully added aggregation: "
666       "(Host \"%s\", Plugin \"%s\", PluginInstance \"%s\", "
667       "Type \"%s\", TypeInstance \"%s\")",
668       agg->ident.host, agg->ident.plugin, agg->ident.plugin_instance,
669       agg->ident.type, agg->ident.type_instance);
670   return (0);
671 } /* }}} int agg_config_aggregation */
672
673 static int agg_config (oconfig_item_t *ci) /* {{{ */
674 {
675   pthread_mutex_lock (&agg_instance_list_lock);
676
677   if (lookup == NULL)
678   {
679     lookup = lookup_create (agg_lookup_class_callback,
680         agg_lookup_obj_callback,
681         agg_lookup_free_class_callback,
682         agg_lookup_free_obj_callback);
683     if (lookup == NULL)
684     {
685       pthread_mutex_unlock (&agg_instance_list_lock);
686       ERROR ("aggregation plugin: lookup_create failed.");
687       return (-1);
688     }
689   }
690
691   for (int i = 0; i < ci->children_num; i++)
692   {
693     oconfig_item_t *child = ci->children + i;
694
695     if (strcasecmp ("Aggregation", child->key) == 0)
696       agg_config_aggregation (child);
697     else
698       WARNING ("aggregation plugin: The \"%s\" key is not allowed inside "
699           "<Plugin aggregation /> blocks and will be ignored.", child->key);
700   }
701
702   pthread_mutex_unlock (&agg_instance_list_lock);
703
704   return (0);
705 } /* }}} int agg_config */
706
707 static int agg_read (void) /* {{{ */
708 {
709   cdtime_t t;
710   int success;
711
712   t = cdtime ();
713   success = 0;
714
715   pthread_mutex_lock (&agg_instance_list_lock);
716
717   /* agg_instance_list_head only holds data, after the "write" callback has
718    * been called with a matching value list at least once. So on startup,
719    * there's a race between the aggregations read() and write() callback. If
720    * the read() callback is called first, agg_instance_list_head is NULL and
721    * "success" may be zero. This is expected and should not result in an error.
722    * Therefore we need to handle this case separately. */
723   if (agg_instance_list_head == NULL)
724   {
725     pthread_mutex_unlock (&agg_instance_list_lock);
726     return (0);
727   }
728
729   for (agg_instance_t *this = agg_instance_list_head; this != NULL; this = this->next)
730   {
731     int status;
732
733     status = agg_instance_read (this, t);
734     if (status != 0)
735       WARNING ("aggregation plugin: Reading an aggregation instance "
736           "failed with status %i.", status);
737     else
738       success++;
739   }
740
741   pthread_mutex_unlock (&agg_instance_list_lock);
742
743   return ((success > 0) ? 0 : -1);
744 } /* }}} int agg_read */
745
746 static int agg_write (data_set_t const *ds, value_list_t const *vl, /* {{{ */
747     __attribute__((unused)) user_data_t *user_data)
748 {
749   _Bool created_by_aggregation = 0;
750   int status;
751
752   /* Ignore values that were created by the aggregation plugin to avoid weird
753    * effects. */
754   (void) meta_data_get_boolean (vl->meta, "aggregation:created",
755       &created_by_aggregation);
756   if (created_by_aggregation)
757     return (0);
758
759   if (lookup == NULL)
760     status = ENOENT;
761   else
762   {
763     status = lookup_search (lookup, ds, vl);
764     if (status > 0)
765       status = 0;
766   }
767
768   return (status);
769 } /* }}} int agg_write */
770
771 void module_register (void)
772 {
773   plugin_register_complex_config ("aggregation", agg_config);
774   plugin_register_read ("aggregation", agg_read);
775   plugin_register_write ("aggregation", agg_write, /* user_data = */ NULL);
776 }
777
778 /* vim: set sw=2 sts=2 tw=78 et fdm=marker : */