7a214ca3390b6c60ba561bf4d591d9c9cf00f3bb
[collectd.git] / src / aggregation.c
1 /**
2  * collectd - src/aggregation.c
3  * Copyright (C) 2012       Florian Forster
4  *
5  * Permission is hereby granted, free of charge, to any person obtaining a
6  * copy of this software and associated documentation files (the "Software"),
7  * to deal in the Software without restriction, including without limitation
8  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
9  * and/or sell copies of the Software, and to permit persons to whom the
10  * Software is furnished to do so, subject to the following conditions:
11  *
12  * The above copyright notice and this permission notice shall be included in
13  * all copies or substantial portions of the Software.
14  *
15  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
21  * DEALINGS IN THE SOFTWARE.
22  *
23  * Authors:
24  *   Florian Forster <octo at collectd.org>
25  **/
26
27 #include "collectd.h"
28
29 #include "plugin.h"
30 #include "utils/common/common.h"
31 #include "utils/lookup/vl_lookup.h"
32 #include "utils/metadata/meta_data.h"
33 #include "utils_cache.h" /* for uc_get_rate() */
34 #include "utils_subst.h"
35
36 #define AGG_MATCHES_ALL(str) (strcmp("/.*/", str) == 0)
37 #define AGG_FUNC_PLACEHOLDER "%{aggregation}"
38
39 struct aggregation_s /* {{{ */
40 {
41   lookup_identifier_t ident;
42   unsigned int group_by;
43
44   unsigned int regex_fields;
45
46   char *set_host;
47   char *set_plugin;
48   char *set_plugin_instance;
49   char *set_type_instance;
50
51   bool calc_num;
52   bool calc_sum;
53   bool calc_average;
54   bool calc_min;
55   bool calc_max;
56   bool calc_stddev;
57 }; /* }}} */
58 typedef struct aggregation_s aggregation_t;
59
60 struct agg_instance_s;
61 typedef struct agg_instance_s agg_instance_t;
62 struct agg_instance_s /* {{{ */
63 {
64   pthread_mutex_t lock;
65   lookup_identifier_t ident;
66
67   int ds_type;
68
69   derive_t num;
70   gauge_t sum;
71   gauge_t squares_sum;
72
73   gauge_t min;
74   gauge_t max;
75
76   rate_to_value_state_t *state_num;
77   rate_to_value_state_t *state_sum;
78   rate_to_value_state_t *state_average;
79   rate_to_value_state_t *state_min;
80   rate_to_value_state_t *state_max;
81   rate_to_value_state_t *state_stddev;
82
83   agg_instance_t *next;
84 }; /* }}} */
85
86 static lookup_t *lookup;
87
88 static pthread_mutex_t agg_instance_list_lock = PTHREAD_MUTEX_INITIALIZER;
89 static agg_instance_t *agg_instance_list_head;
90
91 static bool agg_is_regex(char const *str) /* {{{ */
92 {
93   if (str == NULL)
94     return false;
95
96   size_t len = strlen(str);
97   if (len < 3)
98     return false;
99
100   if ((str[0] == '/') && (str[len - 1] == '/'))
101     return true;
102   else
103     return false;
104 } /* }}} bool agg_is_regex */
105
106 static void agg_destroy(aggregation_t *agg) /* {{{ */
107 {
108   sfree(agg);
109 } /* }}} void agg_destroy */
110
111 /* Frees all dynamically allocated memory within the instance. */
112 static void agg_instance_destroy(agg_instance_t *inst) /* {{{ */
113 {
114   if (inst == NULL)
115     return;
116
117   /* Remove this instance from the global list of instances. */
118   pthread_mutex_lock(&agg_instance_list_lock);
119   if (agg_instance_list_head == inst)
120     agg_instance_list_head = inst->next;
121   else if (agg_instance_list_head != NULL) {
122     agg_instance_t *prev = agg_instance_list_head;
123     while ((prev != NULL) && (prev->next != inst))
124       prev = prev->next;
125     if (prev != NULL)
126       prev->next = inst->next;
127   }
128   pthread_mutex_unlock(&agg_instance_list_lock);
129
130   sfree(inst->state_num);
131   sfree(inst->state_sum);
132   sfree(inst->state_average);
133   sfree(inst->state_min);
134   sfree(inst->state_max);
135   sfree(inst->state_stddev);
136
137   memset(inst, 0, sizeof(*inst));
138   inst->ds_type = -1;
139   inst->min = NAN;
140   inst->max = NAN;
141 } /* }}} void agg_instance_destroy */
142
143 static int agg_instance_create_name(agg_instance_t *inst, /* {{{ */
144                                     value_list_t const *vl,
145                                     aggregation_t const *agg) {
146 #define COPY_FIELD(buffer, buffer_size, field, group_mask, all_value)          \
147   do {                                                                         \
148     if (agg->set_##field != NULL)                                              \
149       sstrncpy(buffer, agg->set_##field, buffer_size);                         \
150     else if ((agg->regex_fields & group_mask) && (agg->group_by & group_mask)) \
151       sstrncpy(buffer, vl->field, buffer_size);                                \
152     else if ((agg->regex_fields & group_mask) &&                               \
153              (AGG_MATCHES_ALL(agg->ident.field)))                              \
154       sstrncpy(buffer, all_value, buffer_size);                                \
155     else                                                                       \
156       sstrncpy(buffer, agg->ident.field, buffer_size);                         \
157   } while (0)
158
159   /* Host */
160   COPY_FIELD(inst->ident.host, sizeof(inst->ident.host), host, LU_GROUP_BY_HOST,
161              "global");
162
163   /* Plugin */
164   if (agg->set_plugin != NULL)
165     sstrncpy(inst->ident.plugin, agg->set_plugin, sizeof(inst->ident.plugin));
166   else
167     sstrncpy(inst->ident.plugin, "aggregation", sizeof(inst->ident.plugin));
168
169   /* Plugin instance */
170   if (agg->set_plugin_instance != NULL)
171     sstrncpy(inst->ident.plugin_instance, agg->set_plugin_instance,
172              sizeof(inst->ident.plugin_instance));
173   else {
174     char tmp_plugin[DATA_MAX_NAME_LEN];
175     char tmp_plugin_instance[DATA_MAX_NAME_LEN] = "";
176
177     if ((agg->regex_fields & LU_GROUP_BY_PLUGIN) &&
178         (agg->group_by & LU_GROUP_BY_PLUGIN))
179       sstrncpy(tmp_plugin, vl->plugin, sizeof(tmp_plugin));
180     else if ((agg->regex_fields & LU_GROUP_BY_PLUGIN) &&
181              (AGG_MATCHES_ALL(agg->ident.plugin)))
182       sstrncpy(tmp_plugin, "", sizeof(tmp_plugin));
183     else
184       sstrncpy(tmp_plugin, agg->ident.plugin, sizeof(tmp_plugin));
185
186     if ((agg->regex_fields & LU_GROUP_BY_PLUGIN_INSTANCE) &&
187         (agg->group_by & LU_GROUP_BY_PLUGIN_INSTANCE))
188       sstrncpy(tmp_plugin_instance, vl->plugin_instance,
189                sizeof(tmp_plugin_instance));
190     else if ((agg->regex_fields & LU_GROUP_BY_PLUGIN_INSTANCE) &&
191              (AGG_MATCHES_ALL(agg->ident.plugin_instance)))
192       sstrncpy(tmp_plugin_instance, "", sizeof(tmp_plugin_instance));
193     else
194       sstrncpy(tmp_plugin_instance, agg->ident.plugin_instance,
195                sizeof(tmp_plugin_instance));
196
197     if ((strcmp("", tmp_plugin) == 0) && (strcmp("", tmp_plugin_instance) == 0))
198       sstrncpy(inst->ident.plugin_instance, AGG_FUNC_PLACEHOLDER,
199                sizeof(inst->ident.plugin_instance));
200     else if (strcmp("", tmp_plugin) != 0)
201       ssnprintf(inst->ident.plugin_instance,
202                 sizeof(inst->ident.plugin_instance), "%s-%s", tmp_plugin,
203                 AGG_FUNC_PLACEHOLDER);
204     else if (strcmp("", tmp_plugin_instance) != 0)
205       ssnprintf(inst->ident.plugin_instance,
206                 sizeof(inst->ident.plugin_instance), "%s-%s",
207                 tmp_plugin_instance, AGG_FUNC_PLACEHOLDER);
208     else
209       ssnprintf(inst->ident.plugin_instance,
210                 sizeof(inst->ident.plugin_instance), "%s-%s-%s", tmp_plugin,
211                 tmp_plugin_instance, AGG_FUNC_PLACEHOLDER);
212   }
213
214   /* Type */
215   sstrncpy(inst->ident.type, agg->ident.type, sizeof(inst->ident.type));
216
217   /* Type instance */
218   COPY_FIELD(inst->ident.type_instance, sizeof(inst->ident.type_instance),
219              type_instance, LU_GROUP_BY_TYPE_INSTANCE, "");
220
221 #undef COPY_FIELD
222
223   return 0;
224 } /* }}} int agg_instance_create_name */
225
226 /* Create a new aggregation instance. */
227 static agg_instance_t *agg_instance_create(data_set_t const *ds, /* {{{ */
228                                            value_list_t const *vl,
229                                            aggregation_t *agg) {
230   DEBUG("aggregation plugin: Creating new instance.");
231
232   agg_instance_t *inst = calloc(1, sizeof(*inst));
233   if (inst == NULL) {
234     ERROR("aggregation plugin: calloc() failed.");
235     return NULL;
236   }
237   pthread_mutex_init(&inst->lock, /* attr = */ NULL);
238
239   inst->ds_type = ds->ds[0].type;
240
241   agg_instance_create_name(inst, vl, agg);
242
243   inst->min = NAN;
244   inst->max = NAN;
245
246 #define INIT_STATE(field)                                                      \
247   do {                                                                         \
248     inst->state_##field = NULL;                                                \
249     if (agg->calc_##field) {                                                   \
250       inst->state_##field = calloc(1, sizeof(*inst->state_##field));           \
251       if (inst->state_##field == NULL) {                                       \
252         agg_instance_destroy(inst);                                            \
253         free(inst);                                                            \
254         ERROR("aggregation plugin: calloc() failed.");                         \
255         return NULL;                                                           \
256       }                                                                        \
257     }                                                                          \
258   } while (0)
259
260   INIT_STATE(num);
261   INIT_STATE(sum);
262   INIT_STATE(average);
263   INIT_STATE(min);
264   INIT_STATE(max);
265   INIT_STATE(stddev);
266
267 #undef INIT_STATE
268
269   pthread_mutex_lock(&agg_instance_list_lock);
270   inst->next = agg_instance_list_head;
271   agg_instance_list_head = inst;
272   pthread_mutex_unlock(&agg_instance_list_lock);
273
274   return inst;
275 } /* }}} agg_instance_t *agg_instance_create */
276
277 /* Update the num, sum, min, max, ... fields of the aggregation instance, if
278  * the rate of the value list is available. Value lists with more than one data
279  * source are not supported and will return an error. Returns zero on success
280  * and non-zero otherwise. */
281 static int agg_instance_update(agg_instance_t *inst, /* {{{ */
282                                data_set_t const *ds, value_list_t const *vl) {
283   if (ds->ds_num != 1) {
284     ERROR("aggregation plugin: The \"%s\" type (data set) has more than one "
285           "data source. This is currently not supported by this plugin. "
286           "Sorry.",
287           ds->type);
288     return EINVAL;
289   }
290
291   gauge_t *rate = uc_get_rate(ds, vl);
292   if (rate == NULL) {
293     char ident[6 * DATA_MAX_NAME_LEN];
294     FORMAT_VL(ident, sizeof(ident), vl);
295     ERROR("aggregation plugin: Unable to read the current rate of \"%s\".",
296           ident);
297     return ENOENT;
298   }
299
300   if (isnan(rate[0])) {
301     sfree(rate);
302     return 0;
303   }
304
305   pthread_mutex_lock(&inst->lock);
306
307   inst->num++;
308   inst->sum += rate[0];
309   inst->squares_sum += (rate[0] * rate[0]);
310
311   if (isnan(inst->min) || (inst->min > rate[0]))
312     inst->min = rate[0];
313   if (isnan(inst->max) || (inst->max < rate[0]))
314     inst->max = rate[0];
315
316   pthread_mutex_unlock(&inst->lock);
317
318   sfree(rate);
319   return 0;
320 } /* }}} int agg_instance_update */
321
322 static int agg_instance_read_func(agg_instance_t *inst, /* {{{ */
323                                   char const *func, gauge_t rate,
324                                   rate_to_value_state_t *state,
325                                   value_list_t *vl, char const *pi_prefix,
326                                   cdtime_t t) {
327   if (pi_prefix[0] != 0)
328     subst_string(vl->plugin_instance, sizeof(vl->plugin_instance), pi_prefix,
329                  AGG_FUNC_PLACEHOLDER, func);
330   else
331     sstrncpy(vl->plugin_instance, func, sizeof(vl->plugin_instance));
332
333   value_t v;
334
335   int status = rate_to_value(&v, rate, state, inst->ds_type, t);
336   if (status != 0) {
337     /* If this is the first iteration and rate_to_value() was asked to return a
338      * COUNTER or a DERIVE, it will return EAGAIN. Catch this and handle
339      * gracefully. */
340     if (status == EAGAIN)
341       return 0;
342
343     WARNING("aggregation plugin: rate_to_value failed with status %i.", status);
344     return -1;
345   }
346
347   vl->values = &v;
348   vl->values_len = 1;
349
350   plugin_dispatch_values(vl);
351
352   vl->values = NULL;
353   vl->values_len = 0;
354
355   return 0;
356 } /* }}} int agg_instance_read_func */
357
358 static int agg_instance_read(agg_instance_t *inst, cdtime_t t) /* {{{ */
359 {
360   value_list_t vl = VALUE_LIST_INIT;
361
362   /* Pre-set all the fields in the value list that will not change per
363    * aggregation type (sum, average, ...). The struct will be re-used and must
364    * therefore be dispatched using the "secure" function. */
365
366   vl.time = t;
367   vl.interval = 0;
368
369   vl.meta = meta_data_create();
370   if (vl.meta == NULL) {
371     ERROR("aggregation plugin: meta_data_create failed.");
372     return -1;
373   }
374   meta_data_add_boolean(vl.meta, "aggregation:created", 1);
375
376   sstrncpy(vl.host, inst->ident.host, sizeof(vl.host));
377   sstrncpy(vl.plugin, inst->ident.plugin, sizeof(vl.plugin));
378   sstrncpy(vl.type, inst->ident.type, sizeof(vl.type));
379   sstrncpy(vl.type_instance, inst->ident.type_instance,
380            sizeof(vl.type_instance));
381
382 #define READ_FUNC(func, rate)                                                  \
383   do {                                                                         \
384     if (inst->state_##func != NULL) {                                          \
385       agg_instance_read_func(inst, #func, rate, inst->state_##func, &vl,       \
386                              inst->ident.plugin_instance, t);                  \
387     }                                                                          \
388   } while (0)
389
390   pthread_mutex_lock(&inst->lock);
391
392   READ_FUNC(num, (gauge_t)inst->num);
393
394   /* All other aggregations are only defined when there have been any values
395    * at all. */
396   if (inst->num > 0) {
397     READ_FUNC(sum, inst->sum);
398     READ_FUNC(average, (inst->sum / ((gauge_t)inst->num)));
399     READ_FUNC(min, inst->min);
400     READ_FUNC(max, inst->max);
401     READ_FUNC(stddev,
402               sqrt((((gauge_t)inst->num) * inst->squares_sum) -
403                    (inst->sum * inst->sum)) /
404                   ((gauge_t)inst->num));
405   }
406
407   /* Reset internal state. */
408   inst->num = 0;
409   inst->sum = 0.0;
410   inst->squares_sum = 0.0;
411   inst->min = NAN;
412   inst->max = NAN;
413
414   pthread_mutex_unlock(&inst->lock);
415
416   meta_data_destroy(vl.meta);
417   vl.meta = NULL;
418
419   return 0;
420 } /* }}} int agg_instance_read */
421
422 /* lookup_class_callback_t for utils_vl_lookup */
423 static void *agg_lookup_class_callback(/* {{{ */
424                                        data_set_t const *ds,
425                                        value_list_t const *vl,
426                                        void *user_class) {
427   return agg_instance_create(ds, vl, (aggregation_t *)user_class);
428 } /* }}} void *agg_class_callback */
429
430 /* lookup_obj_callback_t for utils_vl_lookup */
431 static int agg_lookup_obj_callback(data_set_t const *ds, /* {{{ */
432                                    value_list_t const *vl,
433                                    __attribute__((unused)) void *user_class,
434                                    void *user_obj) {
435   return agg_instance_update((agg_instance_t *)user_obj, ds, vl);
436 } /* }}} int agg_lookup_obj_callback */
437
438 /* lookup_free_class_callback_t for utils_vl_lookup */
439 static void agg_lookup_free_class_callback(void *user_class) /* {{{ */
440 {
441   agg_destroy((aggregation_t *)user_class);
442 } /* }}} void agg_lookup_free_class_callback */
443
444 /* lookup_free_obj_callback_t for utils_vl_lookup */
445 static void agg_lookup_free_obj_callback(void *user_obj) /* {{{ */
446 {
447   agg_instance_destroy((agg_instance_t *)user_obj);
448 } /* }}} void agg_lookup_free_obj_callback */
449
450 /*
451  * <Plugin "aggregation">
452  *   <Aggregation>
453  *     Plugin "cpu"
454  *     Type "cpu"
455  *
456  *     GroupBy Host
457  *     GroupBy TypeInstance
458  *
459  *     CalculateNum true
460  *     CalculateSum true
461  *     CalculateAverage true
462  *     CalculateMinimum true
463  *     CalculateMaximum true
464  *     CalculateStddev true
465  *   </Aggregation>
466  * </Plugin>
467  */
468 static int agg_config_handle_group_by(oconfig_item_t const *ci, /* {{{ */
469                                       aggregation_t *agg) {
470   for (int i = 0; i < ci->values_num; i++) {
471     if (ci->values[i].type != OCONFIG_TYPE_STRING) {
472       ERROR("aggregation plugin: Argument %i of the \"GroupBy\" option "
473             "is not a string.",
474             i + 1);
475       continue;
476     }
477
478     const char *value = ci->values[i].value.string;
479
480     if (strcasecmp("Host", value) == 0)
481       agg->group_by |= LU_GROUP_BY_HOST;
482     else if (strcasecmp("Plugin", value) == 0)
483       agg->group_by |= LU_GROUP_BY_PLUGIN;
484     else if (strcasecmp("PluginInstance", value) == 0)
485       agg->group_by |= LU_GROUP_BY_PLUGIN_INSTANCE;
486     else if (strcasecmp("TypeInstance", value) == 0)
487       agg->group_by |= LU_GROUP_BY_TYPE_INSTANCE;
488     else if (strcasecmp("Type", value) == 0)
489       ERROR("aggregation plugin: Grouping by type is not supported.");
490     else
491       WARNING("aggregation plugin: The \"%s\" argument to the \"GroupBy\" "
492               "option is invalid and will be ignored.",
493               value);
494   } /* for (ci->values) */
495
496   return 0;
497 } /* }}} int agg_config_handle_group_by */
498
499 static int agg_config_aggregation(oconfig_item_t *ci) /* {{{ */
500 {
501   aggregation_t *agg = calloc(1, sizeof(*agg));
502   if (agg == NULL) {
503     ERROR("aggregation plugin: calloc failed.");
504     return -1;
505   }
506
507   sstrncpy(agg->ident.host, "/.*/", sizeof(agg->ident.host));
508   sstrncpy(agg->ident.plugin, "/.*/", sizeof(agg->ident.plugin));
509   sstrncpy(agg->ident.plugin_instance, "/.*/",
510            sizeof(agg->ident.plugin_instance));
511   sstrncpy(agg->ident.type, "/.*/", sizeof(agg->ident.type));
512   sstrncpy(agg->ident.type_instance, "/.*/", sizeof(agg->ident.type_instance));
513
514   for (int i = 0; i < ci->children_num; i++) {
515     oconfig_item_t *child = ci->children + i;
516     int status = 0;
517
518     if (strcasecmp("Host", child->key) == 0)
519       status = cf_util_get_string_buffer(child, agg->ident.host,
520                                          sizeof(agg->ident.host));
521     else if (strcasecmp("Plugin", child->key) == 0)
522       status = cf_util_get_string_buffer(child, agg->ident.plugin,
523                                          sizeof(agg->ident.plugin));
524     else if (strcasecmp("PluginInstance", child->key) == 0)
525       status = cf_util_get_string_buffer(child, agg->ident.plugin_instance,
526                                          sizeof(agg->ident.plugin_instance));
527     else if (strcasecmp("Type", child->key) == 0)
528       status = cf_util_get_string_buffer(child, agg->ident.type,
529                                          sizeof(agg->ident.type));
530     else if (strcasecmp("TypeInstance", child->key) == 0)
531       status = cf_util_get_string_buffer(child, agg->ident.type_instance,
532                                          sizeof(agg->ident.type_instance));
533     else if (strcasecmp("SetHost", child->key) == 0)
534       status = cf_util_get_string(child, &agg->set_host);
535     else if (strcasecmp("SetPlugin", child->key) == 0)
536       status = cf_util_get_string(child, &agg->set_plugin);
537     else if (strcasecmp("SetPluginInstance", child->key) == 0)
538       status = cf_util_get_string(child, &agg->set_plugin_instance);
539     else if (strcasecmp("SetTypeInstance", child->key) == 0)
540       status = cf_util_get_string(child, &agg->set_type_instance);
541     else if (strcasecmp("GroupBy", child->key) == 0)
542       status = agg_config_handle_group_by(child, agg);
543     else if (strcasecmp("CalculateNum", child->key) == 0)
544       status = cf_util_get_boolean(child, &agg->calc_num);
545     else if (strcasecmp("CalculateSum", child->key) == 0)
546       status = cf_util_get_boolean(child, &agg->calc_sum);
547     else if (strcasecmp("CalculateAverage", child->key) == 0)
548       status = cf_util_get_boolean(child, &agg->calc_average);
549     else if (strcasecmp("CalculateMinimum", child->key) == 0)
550       status = cf_util_get_boolean(child, &agg->calc_min);
551     else if (strcasecmp("CalculateMaximum", child->key) == 0)
552       status = cf_util_get_boolean(child, &agg->calc_max);
553     else if (strcasecmp("CalculateStddev", child->key) == 0)
554       status = cf_util_get_boolean(child, &agg->calc_stddev);
555     else
556       WARNING("aggregation plugin: The \"%s\" key is not allowed inside "
557               "<Aggregation /> blocks and will be ignored.",
558               child->key);
559
560     if (status != 0) {
561       sfree(agg);
562       return status;
563     }
564   } /* for (int i = 0; i < ci->children_num; i++) */
565
566   if (agg_is_regex(agg->ident.host))
567     agg->regex_fields |= LU_GROUP_BY_HOST;
568   if (agg_is_regex(agg->ident.plugin))
569     agg->regex_fields |= LU_GROUP_BY_PLUGIN;
570   if (agg_is_regex(agg->ident.plugin_instance))
571     agg->regex_fields |= LU_GROUP_BY_PLUGIN_INSTANCE;
572   if (agg_is_regex(agg->ident.type_instance))
573     agg->regex_fields |= LU_GROUP_BY_TYPE_INSTANCE;
574
575   /* Sanity checking */
576   bool is_valid = true;
577   if (strcmp("/.*/", agg->ident.type) == 0) /* {{{ */
578   {
579     ERROR("aggregation plugin: It appears you did not specify the required "
580           "\"Type\" option in this aggregation. "
581           "(Host \"%s\", Plugin \"%s\", PluginInstance \"%s\", "
582           "Type \"%s\", TypeInstance \"%s\")",
583           agg->ident.host, agg->ident.plugin, agg->ident.plugin_instance,
584           agg->ident.type, agg->ident.type_instance);
585     is_valid = false;
586   } else if (strchr(agg->ident.type, '/') != NULL) {
587     ERROR("aggregation plugin: The \"Type\" may not contain the '/' "
588           "character. Especially, it may not be a regex. The current "
589           "value is \"%s\".",
590           agg->ident.type);
591     is_valid = false;
592   } /* }}} */
593
594   /* Check that there is at least one regex field without a grouping. {{{ */
595   if ((agg->regex_fields & ~agg->group_by) == 0) {
596     ERROR("aggregation plugin: An aggregation must contain at least one "
597           "wildcard. This is achieved by leaving at least one of the \"Host\", "
598           "\"Plugin\", \"PluginInstance\" and \"TypeInstance\" options blank "
599           "or using a regular expression and not grouping by that field. "
600           "(Host \"%s\", Plugin \"%s\", PluginInstance \"%s\", "
601           "Type \"%s\", TypeInstance \"%s\")",
602           agg->ident.host, agg->ident.plugin, agg->ident.plugin_instance,
603           agg->ident.type, agg->ident.type_instance);
604     is_valid = false;
605   } /* }}} */
606
607   /* Check that all grouping fields are regular expressions. {{{ */
608   if (agg->group_by & ~agg->regex_fields) {
609     ERROR("aggregation plugin: Only wildcard fields (fields for which a "
610           "regular expression is configured or which are left blank) can be "
611           "specified in the \"GroupBy\" option. "
612           "(Host \"%s\", Plugin \"%s\", PluginInstance \"%s\", "
613           "Type \"%s\", TypeInstance \"%s\")",
614           agg->ident.host, agg->ident.plugin, agg->ident.plugin_instance,
615           agg->ident.type, agg->ident.type_instance);
616     is_valid = false;
617   } /* }}} */
618
619   if (!agg->calc_num && !agg->calc_sum && !agg->calc_average /* {{{ */
620       && !agg->calc_min && !agg->calc_max && !agg->calc_stddev) {
621     ERROR("aggregation plugin: No aggregation function has been specified. "
622           "Without this, I don't know what I should be calculating. "
623           "(Host \"%s\", Plugin \"%s\", PluginInstance \"%s\", "
624           "Type \"%s\", TypeInstance \"%s\")",
625           agg->ident.host, agg->ident.plugin, agg->ident.plugin_instance,
626           agg->ident.type, agg->ident.type_instance);
627     is_valid = false;
628   } /* }}} */
629
630   if (!is_valid) { /* {{{ */
631     sfree(agg);
632     return -1;
633   } /* }}} */
634
635   int status = lookup_add(lookup, &agg->ident, agg->group_by, agg);
636   if (status != 0) {
637     ERROR("aggregation plugin: lookup_add failed with status %i.", status);
638     sfree(agg);
639     return -1;
640   }
641
642   DEBUG("aggregation plugin: Successfully added aggregation: "
643         "(Host \"%s\", Plugin \"%s\", PluginInstance \"%s\", "
644         "Type \"%s\", TypeInstance \"%s\")",
645         agg->ident.host, agg->ident.plugin, agg->ident.plugin_instance,
646         agg->ident.type, agg->ident.type_instance);
647   return 0;
648 } /* }}} int agg_config_aggregation */
649
650 static int agg_config(oconfig_item_t *ci) /* {{{ */
651 {
652   pthread_mutex_lock(&agg_instance_list_lock);
653
654   if (lookup == NULL) {
655     lookup = lookup_create(agg_lookup_class_callback, agg_lookup_obj_callback,
656                            agg_lookup_free_class_callback,
657                            agg_lookup_free_obj_callback);
658     if (lookup == NULL) {
659       pthread_mutex_unlock(&agg_instance_list_lock);
660       ERROR("aggregation plugin: lookup_create failed.");
661       return -1;
662     }
663   }
664
665   for (int i = 0; i < ci->children_num; i++) {
666     oconfig_item_t *child = ci->children + i;
667
668     if (strcasecmp("Aggregation", child->key) == 0)
669       agg_config_aggregation(child);
670     else
671       WARNING("aggregation plugin: The \"%s\" key is not allowed inside "
672               "<Plugin aggregation /> blocks and will be ignored.",
673               child->key);
674   }
675
676   pthread_mutex_unlock(&agg_instance_list_lock);
677
678   return 0;
679 } /* }}} int agg_config */
680
681 static int agg_read(void) /* {{{ */
682 {
683   cdtime_t t = cdtime();
684   int success = 0;
685
686   pthread_mutex_lock(&agg_instance_list_lock);
687
688   /* agg_instance_list_head only holds data, after the "write" callback has
689    * been called with a matching value list at least once. So on startup,
690    * there's a race between the aggregations read() and write() callback. If
691    * the read() callback is called first, agg_instance_list_head is NULL and
692    * "success" may be zero. This is expected and should not result in an error.
693    * Therefore we need to handle this case separately. */
694   if (agg_instance_list_head == NULL) {
695     pthread_mutex_unlock(&agg_instance_list_lock);
696     return 0;
697   }
698
699   for (agg_instance_t *this = agg_instance_list_head; this != NULL;
700        this = this->next) {
701     int status = agg_instance_read(this, t);
702     if (status != 0)
703       WARNING("aggregation plugin: Reading an aggregation instance "
704               "failed with status %i.",
705               status);
706     else
707       success++;
708   }
709
710   pthread_mutex_unlock(&agg_instance_list_lock);
711
712   return (success > 0) ? 0 : -1;
713 } /* }}} int agg_read */
714
715 static int agg_write(data_set_t const *ds, value_list_t const *vl, /* {{{ */
716                      __attribute__((unused)) user_data_t *user_data) {
717   bool created_by_aggregation = false;
718   /* Ignore values that were created by the aggregation plugin to avoid weird
719    * effects. */
720   (void)meta_data_get_boolean(vl->meta, "aggregation:created",
721                               &created_by_aggregation);
722   if (created_by_aggregation)
723     return 0;
724
725   int status;
726
727   if (lookup == NULL)
728     status = ENOENT;
729   else {
730     status = lookup_search(lookup, ds, vl);
731     if (status > 0)
732       status = 0;
733   }
734
735   return status;
736 } /* }}} int agg_write */
737
738 void module_register(void) {
739   plugin_register_complex_config("aggregation", agg_config);
740   plugin_register_read("aggregation", agg_read);
741   plugin_register_write("aggregation", agg_write, /* user_data = */ NULL);
742 }