contrib/format.sh src/aggregation.c
[collectd.git] / src / aggregation.c
1 /**
2  * collectd - src/aggregation.c
3  * Copyright (C) 2012       Florian Forster
4  *
5  * Permission is hereby granted, free of charge, to any person obtaining a
6  * copy of this software and associated documentation files (the "Software"),
7  * to deal in the Software without restriction, including without limitation
8  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
9  * and/or sell copies of the Software, and to permit persons to whom the
10  * Software is furnished to do so, subject to the following conditions:
11  *
12  * The above copyright notice and this permission notice shall be included in
13  * all copies or substantial portions of the Software.
14  *
15  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
21  * DEALINGS IN THE SOFTWARE.
22  *
23  * Authors:
24  *   Florian Forster <octo at collectd.org>
25  **/
26
27 #include "collectd.h"
28
29 #include "common.h"
30 #include "meta_data.h"
31 #include "plugin.h"
32 #include "utils_cache.h" /* for uc_get_rate() */
33 #include "utils_subst.h"
34 #include "utils_vl_lookup.h"
35
36 #define AGG_MATCHES_ALL(str) (strcmp("/.*/", str) == 0)
37 #define AGG_FUNC_PLACEHOLDER "%{aggregation}"
38
39 struct aggregation_s /* {{{ */
40 {
41   lookup_identifier_t ident;
42   unsigned int group_by;
43
44   unsigned int regex_fields;
45
46   char *set_host;
47   char *set_plugin;
48   char *set_plugin_instance;
49   char *set_type_instance;
50
51   _Bool calc_num;
52   _Bool calc_sum;
53   _Bool calc_average;
54   _Bool calc_min;
55   _Bool calc_max;
56   _Bool calc_stddev;
57 }; /* }}} */
58 typedef struct aggregation_s aggregation_t;
59
60 struct agg_instance_s;
61 typedef struct agg_instance_s agg_instance_t;
62 struct agg_instance_s /* {{{ */
63 {
64   pthread_mutex_t lock;
65   lookup_identifier_t ident;
66
67   int ds_type;
68
69   derive_t num;
70   gauge_t sum;
71   gauge_t squares_sum;
72
73   gauge_t min;
74   gauge_t max;
75
76   rate_to_value_state_t *state_num;
77   rate_to_value_state_t *state_sum;
78   rate_to_value_state_t *state_average;
79   rate_to_value_state_t *state_min;
80   rate_to_value_state_t *state_max;
81   rate_to_value_state_t *state_stddev;
82
83   agg_instance_t *next;
84 }; /* }}} */
85
86 static lookup_t *lookup = NULL;
87
88 static pthread_mutex_t agg_instance_list_lock = PTHREAD_MUTEX_INITIALIZER;
89 static agg_instance_t *agg_instance_list_head = NULL;
90
91 static _Bool agg_is_regex(char const *str) /* {{{ */
92 {
93   size_t len;
94
95   if (str == NULL)
96     return (0);
97
98   len = strlen(str);
99   if (len < 3)
100     return (0);
101
102   if ((str[0] == '/') && (str[len - 1] == '/'))
103     return (1);
104   else
105     return (0);
106 } /* }}} _Bool agg_is_regex */
107
108 static void agg_destroy(aggregation_t *agg) /* {{{ */
109 {
110   sfree(agg);
111 } /* }}} void agg_destroy */
112
113 /* Frees all dynamically allocated memory within the instance. */
114 static void agg_instance_destroy(agg_instance_t *inst) /* {{{ */
115 {
116   if (inst == NULL)
117     return;
118
119   /* Remove this instance from the global list of instances. */
120   pthread_mutex_lock(&agg_instance_list_lock);
121   if (agg_instance_list_head == inst)
122     agg_instance_list_head = inst->next;
123   else if (agg_instance_list_head != NULL) {
124     agg_instance_t *prev = agg_instance_list_head;
125     while ((prev != NULL) && (prev->next != inst))
126       prev = prev->next;
127     if (prev != NULL)
128       prev->next = inst->next;
129   }
130   pthread_mutex_unlock(&agg_instance_list_lock);
131
132   sfree(inst->state_num);
133   sfree(inst->state_sum);
134   sfree(inst->state_average);
135   sfree(inst->state_min);
136   sfree(inst->state_max);
137   sfree(inst->state_stddev);
138
139   memset(inst, 0, sizeof(*inst));
140   inst->ds_type = -1;
141   inst->min = NAN;
142   inst->max = NAN;
143 } /* }}} void agg_instance_destroy */
144
145 static int agg_instance_create_name(agg_instance_t *inst, /* {{{ */
146                                     value_list_t const *vl,
147                                     aggregation_t const *agg) {
148 #define COPY_FIELD(buffer, buffer_size, field, group_mask, all_value)          \
149   do {                                                                         \
150     if (agg->set_##field != NULL)                                              \
151       sstrncpy(buffer, agg->set_##field, buffer_size);                         \
152     else if ((agg->regex_fields & group_mask) && (agg->group_by & group_mask)) \
153       sstrncpy(buffer, vl->field, buffer_size);                                \
154     else if ((agg->regex_fields & group_mask) &&                               \
155              (AGG_MATCHES_ALL(agg->ident.field)))                              \
156       sstrncpy(buffer, all_value, buffer_size);                                \
157     else                                                                       \
158       sstrncpy(buffer, agg->ident.field, buffer_size);                         \
159   } while (0)
160
161   /* Host */
162   COPY_FIELD(inst->ident.host, sizeof(inst->ident.host), host, LU_GROUP_BY_HOST,
163              "global");
164
165   /* Plugin */
166   if (agg->set_plugin != NULL)
167     sstrncpy(inst->ident.plugin, agg->set_plugin, sizeof(inst->ident.plugin));
168   else
169     sstrncpy(inst->ident.plugin, "aggregation", sizeof(inst->ident.plugin));
170
171   /* Plugin instance */
172   if (agg->set_plugin_instance != NULL)
173     sstrncpy(inst->ident.plugin_instance, agg->set_plugin_instance,
174              sizeof(inst->ident.plugin_instance));
175   else {
176     char tmp_plugin[DATA_MAX_NAME_LEN];
177     char tmp_plugin_instance[DATA_MAX_NAME_LEN] = "";
178
179     if ((agg->regex_fields & LU_GROUP_BY_PLUGIN) &&
180         (agg->group_by & LU_GROUP_BY_PLUGIN))
181       sstrncpy(tmp_plugin, vl->plugin, sizeof(tmp_plugin));
182     else if ((agg->regex_fields & LU_GROUP_BY_PLUGIN) &&
183              (AGG_MATCHES_ALL(agg->ident.plugin)))
184       sstrncpy(tmp_plugin, "", sizeof(tmp_plugin));
185     else
186       sstrncpy(tmp_plugin, agg->ident.plugin, sizeof(tmp_plugin));
187
188     if ((agg->regex_fields & LU_GROUP_BY_PLUGIN_INSTANCE) &&
189         (agg->group_by & LU_GROUP_BY_PLUGIN_INSTANCE))
190       sstrncpy(tmp_plugin_instance, vl->plugin_instance,
191                sizeof(tmp_plugin_instance));
192     else if ((agg->regex_fields & LU_GROUP_BY_PLUGIN_INSTANCE) &&
193              (AGG_MATCHES_ALL(agg->ident.plugin_instance)))
194       sstrncpy(tmp_plugin_instance, "", sizeof(tmp_plugin_instance));
195     else
196       sstrncpy(tmp_plugin_instance, agg->ident.plugin_instance,
197                sizeof(tmp_plugin_instance));
198
199     if ((strcmp("", tmp_plugin) == 0) && (strcmp("", tmp_plugin_instance) == 0))
200       sstrncpy(inst->ident.plugin_instance, AGG_FUNC_PLACEHOLDER,
201                sizeof(inst->ident.plugin_instance));
202     else if (strcmp("", tmp_plugin) != 0)
203       ssnprintf(inst->ident.plugin_instance,
204                 sizeof(inst->ident.plugin_instance), "%s-%s", tmp_plugin,
205                 AGG_FUNC_PLACEHOLDER);
206     else if (strcmp("", tmp_plugin_instance) != 0)
207       ssnprintf(inst->ident.plugin_instance,
208                 sizeof(inst->ident.plugin_instance), "%s-%s",
209                 tmp_plugin_instance, AGG_FUNC_PLACEHOLDER);
210     else
211       ssnprintf(inst->ident.plugin_instance,
212                 sizeof(inst->ident.plugin_instance), "%s-%s-%s", tmp_plugin,
213                 tmp_plugin_instance, AGG_FUNC_PLACEHOLDER);
214   }
215
216   /* Type */
217   sstrncpy(inst->ident.type, agg->ident.type, sizeof(inst->ident.type));
218
219   /* Type instance */
220   COPY_FIELD(inst->ident.type_instance, sizeof(inst->ident.type_instance),
221              type_instance, LU_GROUP_BY_TYPE_INSTANCE, "");
222
223 #undef COPY_FIELD
224
225   return (0);
226 } /* }}} int agg_instance_create_name */
227
228 /* Create a new aggregation instance. */
229 static agg_instance_t *agg_instance_create(data_set_t const *ds, /* {{{ */
230                                            value_list_t const *vl,
231                                            aggregation_t *agg) {
232   agg_instance_t *inst;
233
234   DEBUG("aggregation plugin: Creating new instance.");
235
236   inst = calloc(1, sizeof(*inst));
237   if (inst == NULL) {
238     ERROR("aggregation plugin: calloc() failed.");
239     return (NULL);
240   }
241   pthread_mutex_init(&inst->lock, /* attr = */ NULL);
242
243   inst->ds_type = ds->ds[0].type;
244
245   agg_instance_create_name(inst, vl, agg);
246
247   inst->min = NAN;
248   inst->max = NAN;
249
250 #define INIT_STATE(field)                                                      \
251   do {                                                                         \
252     inst->state_##field = NULL;                                                \
253     if (agg->calc_##field) {                                                   \
254       inst->state_##field = calloc(1, sizeof(*inst->state_##field));           \
255       if (inst->state_##field == NULL) {                                       \
256         agg_instance_destroy(inst);                                            \
257         free(inst);                                                            \
258         ERROR("aggregation plugin: calloc() failed.");                         \
259         return (NULL);                                                         \
260       }                                                                        \
261     }                                                                          \
262   } while (0)
263
264   INIT_STATE(num);
265   INIT_STATE(sum);
266   INIT_STATE(average);
267   INIT_STATE(min);
268   INIT_STATE(max);
269   INIT_STATE(stddev);
270
271 #undef INIT_STATE
272
273   pthread_mutex_lock(&agg_instance_list_lock);
274   inst->next = agg_instance_list_head;
275   agg_instance_list_head = inst;
276   pthread_mutex_unlock(&agg_instance_list_lock);
277
278   return (inst);
279 } /* }}} agg_instance_t *agg_instance_create */
280
281 /* Update the num, sum, min, max, ... fields of the aggregation instance, if
282  * the rate of the value list is available. Value lists with more than one data
283  * source are not supported and will return an error. Returns zero on success
284  * and non-zero otherwise. */
285 static int agg_instance_update(agg_instance_t *inst, /* {{{ */
286                                data_set_t const *ds, value_list_t const *vl) {
287   gauge_t *rate;
288
289   if (ds->ds_num != 1) {
290     ERROR("aggregation plugin: The \"%s\" type (data set) has more than one "
291           "data source. This is currently not supported by this plugin. "
292           "Sorry.",
293           ds->type);
294     return (EINVAL);
295   }
296
297   rate = uc_get_rate(ds, vl);
298   if (rate == NULL) {
299     char ident[6 * DATA_MAX_NAME_LEN];
300     FORMAT_VL(ident, sizeof(ident), vl);
301     ERROR("aggregation plugin: Unable to read the current rate of \"%s\".",
302           ident);
303     return (ENOENT);
304   }
305
306   if (isnan(rate[0])) {
307     sfree(rate);
308     return (0);
309   }
310
311   pthread_mutex_lock(&inst->lock);
312
313   inst->num++;
314   inst->sum += rate[0];
315   inst->squares_sum += (rate[0] * rate[0]);
316
317   if (isnan(inst->min) || (inst->min > rate[0]))
318     inst->min = rate[0];
319   if (isnan(inst->max) || (inst->max < rate[0]))
320     inst->max = rate[0];
321
322   pthread_mutex_unlock(&inst->lock);
323
324   sfree(rate);
325   return (0);
326 } /* }}} int agg_instance_update */
327
328 static int agg_instance_read_func(agg_instance_t *inst, /* {{{ */
329                                   char const *func, gauge_t rate,
330                                   rate_to_value_state_t *state,
331                                   value_list_t *vl, char const *pi_prefix,
332                                   cdtime_t t) {
333   value_t v;
334   int status;
335
336   if (pi_prefix[0] != 0)
337     subst_string(vl->plugin_instance, sizeof(vl->plugin_instance), pi_prefix,
338                  AGG_FUNC_PLACEHOLDER, func);
339   else
340     sstrncpy(vl->plugin_instance, func, sizeof(vl->plugin_instance));
341
342   status = rate_to_value(&v, rate, state, inst->ds_type, t);
343   if (status != 0) {
344     /* If this is the first iteration and rate_to_value() was asked to return a
345      * COUNTER or a DERIVE, it will return EAGAIN. Catch this and handle
346      * gracefully. */
347     if (status == EAGAIN)
348       return (0);
349
350     WARNING("aggregation plugin: rate_to_value failed with status %i.", status);
351     return (-1);
352   }
353
354   vl->values = &v;
355   vl->values_len = 1;
356
357   plugin_dispatch_values(vl);
358
359   vl->values = NULL;
360   vl->values_len = 0;
361
362   return (0);
363 } /* }}} int agg_instance_read_func */
364
365 static int agg_instance_read(agg_instance_t *inst, cdtime_t t) /* {{{ */
366 {
367   value_list_t vl = VALUE_LIST_INIT;
368
369   /* Pre-set all the fields in the value list that will not change per
370    * aggregation type (sum, average, ...). The struct will be re-used and must
371    * therefore be dispatched using the "secure" function. */
372
373   vl.time = t;
374   vl.interval = 0;
375
376   vl.meta = meta_data_create();
377   if (vl.meta == NULL) {
378     ERROR("aggregation plugin: meta_data_create failed.");
379     return (-1);
380   }
381   meta_data_add_boolean(vl.meta, "aggregation:created", 1);
382
383   sstrncpy(vl.host, inst->ident.host, sizeof(vl.host));
384   sstrncpy(vl.plugin, inst->ident.plugin, sizeof(vl.plugin));
385   sstrncpy(vl.type, inst->ident.type, sizeof(vl.type));
386   sstrncpy(vl.type_instance, inst->ident.type_instance,
387            sizeof(vl.type_instance));
388
389 #define READ_FUNC(func, rate)                                                  \
390   do {                                                                         \
391     if (inst->state_##func != NULL) {                                          \
392       agg_instance_read_func(inst, #func, rate, inst->state_##func, &vl,       \
393                              inst->ident.plugin_instance, t);                  \
394     }                                                                          \
395   } while (0)
396
397   pthread_mutex_lock(&inst->lock);
398
399   READ_FUNC(num, (gauge_t)inst->num);
400
401   /* All other aggregations are only defined when there have been any values
402    * at all. */
403   if (inst->num > 0) {
404     READ_FUNC(sum, inst->sum);
405     READ_FUNC(average, (inst->sum / ((gauge_t)inst->num)));
406     READ_FUNC(min, inst->min);
407     READ_FUNC(max, inst->max);
408     READ_FUNC(stddev,
409               sqrt((((gauge_t)inst->num) * inst->squares_sum) -
410                    (inst->sum * inst->sum)) /
411                   ((gauge_t)inst->num));
412   }
413
414   /* Reset internal state. */
415   inst->num = 0;
416   inst->sum = 0.0;
417   inst->squares_sum = 0.0;
418   inst->min = NAN;
419   inst->max = NAN;
420
421   pthread_mutex_unlock(&inst->lock);
422
423   meta_data_destroy(vl.meta);
424   vl.meta = NULL;
425
426   return (0);
427 } /* }}} int agg_instance_read */
428
429 /* lookup_class_callback_t for utils_vl_lookup */
430 static void *agg_lookup_class_callback(/* {{{ */
431                                        data_set_t const *ds,
432                                        value_list_t const *vl,
433                                        void *user_class) {
434   return (agg_instance_create(ds, vl, (aggregation_t *)user_class));
435 } /* }}} void *agg_class_callback */
436
437 /* lookup_obj_callback_t for utils_vl_lookup */
438 static int agg_lookup_obj_callback(data_set_t const *ds, /* {{{ */
439                                    value_list_t const *vl,
440                                    __attribute__((unused)) void *user_class,
441                                    void *user_obj) {
442   return (agg_instance_update((agg_instance_t *)user_obj, ds, vl));
443 } /* }}} int agg_lookup_obj_callback */
444
445 /* lookup_free_class_callback_t for utils_vl_lookup */
446 static void agg_lookup_free_class_callback(void *user_class) /* {{{ */
447 {
448   agg_destroy((aggregation_t *)user_class);
449 } /* }}} void agg_lookup_free_class_callback */
450
451 /* lookup_free_obj_callback_t for utils_vl_lookup */
452 static void agg_lookup_free_obj_callback(void *user_obj) /* {{{ */
453 {
454   agg_instance_destroy((agg_instance_t *)user_obj);
455 } /* }}} void agg_lookup_free_obj_callback */
456
457 /*
458  * <Plugin "aggregation">
459  *   <Aggregation>
460  *     Plugin "cpu"
461  *     Type "cpu"
462  *
463  *     GroupBy Host
464  *     GroupBy TypeInstance
465  *
466  *     CalculateNum true
467  *     CalculateSum true
468  *     CalculateAverage true
469  *     CalculateMinimum true
470  *     CalculateMaximum true
471  *     CalculateStddev true
472  *   </Aggregation>
473  * </Plugin>
474  */
475 static int agg_config_handle_group_by(oconfig_item_t const *ci, /* {{{ */
476                                       aggregation_t *agg) {
477   for (int i = 0; i < ci->values_num; i++) {
478     char const *value;
479
480     if (ci->values[i].type != OCONFIG_TYPE_STRING) {
481       ERROR("aggregation plugin: Argument %i of the \"GroupBy\" option "
482             "is not a string.",
483             i + 1);
484       continue;
485     }
486
487     value = ci->values[i].value.string;
488
489     if (strcasecmp("Host", value) == 0)
490       agg->group_by |= LU_GROUP_BY_HOST;
491     else if (strcasecmp("Plugin", value) == 0)
492       agg->group_by |= LU_GROUP_BY_PLUGIN;
493     else if (strcasecmp("PluginInstance", value) == 0)
494       agg->group_by |= LU_GROUP_BY_PLUGIN_INSTANCE;
495     else if (strcasecmp("TypeInstance", value) == 0)
496       agg->group_by |= LU_GROUP_BY_TYPE_INSTANCE;
497     else if (strcasecmp("Type", value) == 0)
498       ERROR("aggregation plugin: Grouping by type is not supported.");
499     else
500       WARNING("aggregation plugin: The \"%s\" argument to the \"GroupBy\" "
501               "option is invalid and will be ignored.",
502               value);
503   } /* for (ci->values) */
504
505   return (0);
506 } /* }}} int agg_config_handle_group_by */
507
508 static int agg_config_aggregation(oconfig_item_t *ci) /* {{{ */
509 {
510   aggregation_t *agg = calloc(1, sizeof(*agg));
511   if (agg == NULL) {
512     ERROR("aggregation plugin: calloc failed.");
513     return (-1);
514   }
515
516   sstrncpy(agg->ident.host, "/.*/", sizeof(agg->ident.host));
517   sstrncpy(agg->ident.plugin, "/.*/", sizeof(agg->ident.plugin));
518   sstrncpy(agg->ident.plugin_instance, "/.*/",
519            sizeof(agg->ident.plugin_instance));
520   sstrncpy(agg->ident.type, "/.*/", sizeof(agg->ident.type));
521   sstrncpy(agg->ident.type_instance, "/.*/", sizeof(agg->ident.type_instance));
522
523   for (int i = 0; i < ci->children_num; i++) {
524     oconfig_item_t *child = ci->children + i;
525     int status = 0;
526
527     if (strcasecmp("Host", child->key) == 0)
528       status = cf_util_get_string_buffer(child, agg->ident.host,
529                                          sizeof(agg->ident.host));
530     else if (strcasecmp("Plugin", child->key) == 0)
531       status = cf_util_get_string_buffer(child, agg->ident.plugin,
532                                          sizeof(agg->ident.plugin));
533     else if (strcasecmp("PluginInstance", child->key) == 0)
534       status = cf_util_get_string_buffer(child, agg->ident.plugin_instance,
535                                          sizeof(agg->ident.plugin_instance));
536     else if (strcasecmp("Type", child->key) == 0)
537       status = cf_util_get_string_buffer(child, agg->ident.type,
538                                          sizeof(agg->ident.type));
539     else if (strcasecmp("TypeInstance", child->key) == 0)
540       status = cf_util_get_string_buffer(child, agg->ident.type_instance,
541                                          sizeof(agg->ident.type_instance));
542     else if (strcasecmp("SetHost", child->key) == 0)
543       status = cf_util_get_string(child, &agg->set_host);
544     else if (strcasecmp("SetPlugin", child->key) == 0)
545       status = cf_util_get_string(child, &agg->set_plugin);
546     else if (strcasecmp("SetPluginInstance", child->key) == 0)
547       status = cf_util_get_string(child, &agg->set_plugin_instance);
548     else if (strcasecmp("SetTypeInstance", child->key) == 0)
549       status = cf_util_get_string(child, &agg->set_type_instance);
550     else if (strcasecmp("GroupBy", child->key) == 0)
551       status = agg_config_handle_group_by(child, agg);
552     else if (strcasecmp("CalculateNum", child->key) == 0)
553       status = cf_util_get_boolean(child, &agg->calc_num);
554     else if (strcasecmp("CalculateSum", child->key) == 0)
555       status = cf_util_get_boolean(child, &agg->calc_sum);
556     else if (strcasecmp("CalculateAverage", child->key) == 0)
557       status = cf_util_get_boolean(child, &agg->calc_average);
558     else if (strcasecmp("CalculateMinimum", child->key) == 0)
559       status = cf_util_get_boolean(child, &agg->calc_min);
560     else if (strcasecmp("CalculateMaximum", child->key) == 0)
561       status = cf_util_get_boolean(child, &agg->calc_max);
562     else if (strcasecmp("CalculateStddev", child->key) == 0)
563       status = cf_util_get_boolean(child, &agg->calc_stddev);
564     else
565       WARNING("aggregation plugin: The \"%s\" key is not allowed inside "
566               "<Aggregation /> blocks and will be ignored.",
567               child->key);
568
569     if (status != 0) {
570       sfree(agg);
571       return status;
572     }
573   } /* for (int i = 0; i < ci->children_num; i++) */
574
575   if (agg_is_regex(agg->ident.host))
576     agg->regex_fields |= LU_GROUP_BY_HOST;
577   if (agg_is_regex(agg->ident.plugin))
578     agg->regex_fields |= LU_GROUP_BY_PLUGIN;
579   if (agg_is_regex(agg->ident.plugin_instance))
580     agg->regex_fields |= LU_GROUP_BY_PLUGIN_INSTANCE;
581   if (agg_is_regex(agg->ident.type_instance))
582     agg->regex_fields |= LU_GROUP_BY_TYPE_INSTANCE;
583
584   /* Sanity checking */
585   _Bool is_valid = 1;
586   if (strcmp("/.*/", agg->ident.type) == 0) /* {{{ */
587   {
588     ERROR("aggregation plugin: It appears you did not specify the required "
589           "\"Type\" option in this aggregation. "
590           "(Host \"%s\", Plugin \"%s\", PluginInstance \"%s\", "
591           "Type \"%s\", TypeInstance \"%s\")",
592           agg->ident.host, agg->ident.plugin, agg->ident.plugin_instance,
593           agg->ident.type, agg->ident.type_instance);
594     is_valid = 0;
595   } else if (strchr(agg->ident.type, '/') != NULL) {
596     ERROR("aggregation plugin: The \"Type\" may not contain the '/' "
597           "character. Especially, it may not be a regex. The current "
598           "value is \"%s\".",
599           agg->ident.type);
600     is_valid = 0;
601   } /* }}} */
602
603   /* Check that there is at least one regex field without a grouping. {{{ */
604   if ((agg->regex_fields & ~agg->group_by) == 0) {
605     ERROR("aggregation plugin: An aggregation must contain at least one "
606           "wildcard. This is achieved by leaving at least one of the \"Host\", "
607           "\"Plugin\", \"PluginInstance\" and \"TypeInstance\" options blank "
608           "or using a regular expression and not grouping by that field. "
609           "(Host \"%s\", Plugin \"%s\", PluginInstance \"%s\", "
610           "Type \"%s\", TypeInstance \"%s\")",
611           agg->ident.host, agg->ident.plugin, agg->ident.plugin_instance,
612           agg->ident.type, agg->ident.type_instance);
613     is_valid = 0;
614   } /* }}} */
615
616   /* Check that all grouping fields are regular expressions. {{{ */
617   if (agg->group_by & ~agg->regex_fields) {
618     ERROR("aggregation plugin: Only wildcard fields (fields for which a "
619           "regular expression is configured or which are left blank) can be "
620           "specified in the \"GroupBy\" option. "
621           "(Host \"%s\", Plugin \"%s\", PluginInstance \"%s\", "
622           "Type \"%s\", TypeInstance \"%s\")",
623           agg->ident.host, agg->ident.plugin, agg->ident.plugin_instance,
624           agg->ident.type, agg->ident.type_instance);
625     is_valid = 0;
626   } /* }}} */
627
628   if (!agg->calc_num && !agg->calc_sum && !agg->calc_average /* {{{ */
629       && !agg->calc_min && !agg->calc_max && !agg->calc_stddev) {
630     ERROR("aggregation plugin: No aggregation function has been specified. "
631           "Without this, I don't know what I should be calculating. "
632           "(Host \"%s\", Plugin \"%s\", PluginInstance \"%s\", "
633           "Type \"%s\", TypeInstance \"%s\")",
634           agg->ident.host, agg->ident.plugin, agg->ident.plugin_instance,
635           agg->ident.type, agg->ident.type_instance);
636     is_valid = 0;
637   } /* }}} */
638
639   if (!is_valid) { /* {{{ */
640     sfree(agg);
641     return (-1);
642   } /* }}} */
643
644   int status = lookup_add(lookup, &agg->ident, agg->group_by, agg);
645   if (status != 0) {
646     ERROR("aggregation plugin: lookup_add failed with status %i.", status);
647     sfree(agg);
648     return (-1);
649   }
650
651   DEBUG("aggregation plugin: Successfully added aggregation: "
652         "(Host \"%s\", Plugin \"%s\", PluginInstance \"%s\", "
653         "Type \"%s\", TypeInstance \"%s\")",
654         agg->ident.host, agg->ident.plugin, agg->ident.plugin_instance,
655         agg->ident.type, agg->ident.type_instance);
656   return (0);
657 } /* }}} int agg_config_aggregation */
658
659 static int agg_config(oconfig_item_t *ci) /* {{{ */
660 {
661   pthread_mutex_lock(&agg_instance_list_lock);
662
663   if (lookup == NULL) {
664     lookup = lookup_create(agg_lookup_class_callback, agg_lookup_obj_callback,
665                            agg_lookup_free_class_callback,
666                            agg_lookup_free_obj_callback);
667     if (lookup == NULL) {
668       pthread_mutex_unlock(&agg_instance_list_lock);
669       ERROR("aggregation plugin: lookup_create failed.");
670       return (-1);
671     }
672   }
673
674   for (int i = 0; i < ci->children_num; i++) {
675     oconfig_item_t *child = ci->children + i;
676
677     if (strcasecmp("Aggregation", child->key) == 0)
678       agg_config_aggregation(child);
679     else
680       WARNING("aggregation plugin: The \"%s\" key is not allowed inside "
681               "<Plugin aggregation /> blocks and will be ignored.",
682               child->key);
683   }
684
685   pthread_mutex_unlock(&agg_instance_list_lock);
686
687   return (0);
688 } /* }}} int agg_config */
689
690 static int agg_read(void) /* {{{ */
691 {
692   cdtime_t t;
693   int success;
694
695   t = cdtime();
696   success = 0;
697
698   pthread_mutex_lock(&agg_instance_list_lock);
699
700   /* agg_instance_list_head only holds data, after the "write" callback has
701    * been called with a matching value list at least once. So on startup,
702    * there's a race between the aggregations read() and write() callback. If
703    * the read() callback is called first, agg_instance_list_head is NULL and
704    * "success" may be zero. This is expected and should not result in an error.
705    * Therefore we need to handle this case separately. */
706   if (agg_instance_list_head == NULL) {
707     pthread_mutex_unlock(&agg_instance_list_lock);
708     return (0);
709   }
710
711   for (agg_instance_t *this = agg_instance_list_head; this != NULL;
712        this = this->next) {
713     int status;
714
715     status = agg_instance_read(this, t);
716     if (status != 0)
717       WARNING("aggregation plugin: Reading an aggregation instance "
718               "failed with status %i.",
719               status);
720     else
721       success++;
722   }
723
724   pthread_mutex_unlock(&agg_instance_list_lock);
725
726   return ((success > 0) ? 0 : -1);
727 } /* }}} int agg_read */
728
729 static int agg_write(data_set_t const *ds, value_list_t const *vl, /* {{{ */
730                      __attribute__((unused)) user_data_t *user_data) {
731   _Bool created_by_aggregation = 0;
732   int status;
733
734   /* Ignore values that were created by the aggregation plugin to avoid weird
735    * effects. */
736   (void)meta_data_get_boolean(vl->meta, "aggregation:created",
737                               &created_by_aggregation);
738   if (created_by_aggregation)
739     return (0);
740
741   if (lookup == NULL)
742     status = ENOENT;
743   else {
744     status = lookup_search(lookup, ds, vl);
745     if (status > 0)
746       status = 0;
747   }
748
749   return (status);
750 } /* }}} int agg_write */
751
752 void module_register(void) {
753   plugin_register_complex_config("aggregation", agg_config);
754   plugin_register_read("aggregation", agg_read);
755   plugin_register_write("aggregation", agg_write, /* user_data = */ NULL);
756 }
757
758 /* vim: set sw=2 sts=2 tw=78 et fdm=marker : */