Merge branch 'collectd-5.5'
[collectd.git] / src / aggregation.c
1 /**
2  * collectd - src/aggregation.c
3  * Copyright (C) 2012       Florian Forster
4  *
5  * Permission is hereby granted, free of charge, to any person obtaining a
6  * copy of this software and associated documentation files (the "Software"),
7  * to deal in the Software without restriction, including without limitation
8  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
9  * and/or sell copies of the Software, and to permit persons to whom the
10  * Software is furnished to do so, subject to the following conditions:
11  *
12  * The above copyright notice and this permission notice shall be included in
13  * all copies or substantial portions of the Software.
14  *
15  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
21  * DEALINGS IN THE SOFTWARE.
22  *
23  * Authors:
24  *   Florian Forster <octo at collectd.org>
25  **/
26
27 #include "collectd.h"
28
29 #include <pthread.h>
30
31 #include "plugin.h"
32 #include "common.h"
33 #include "configfile.h"
34 #include "meta_data.h"
35 #include "utils_cache.h" /* for uc_get_rate() */
36 #include "utils_subst.h"
37 #include "utils_vl_lookup.h"
38
39 #define AGG_MATCHES_ALL(str) (strcmp ("/.*/", str) == 0)
40 #define AGG_FUNC_PLACEHOLDER "%{aggregation}"
41
42 struct aggregation_s /* {{{ */
43 {
44   identifier_t ident;
45   unsigned int group_by;
46
47   unsigned int regex_fields;
48
49   char *set_host;
50   char *set_plugin;
51   char *set_plugin_instance;
52   char *set_type_instance;
53
54   _Bool calc_num;
55   _Bool calc_sum;
56   _Bool calc_average;
57   _Bool calc_min;
58   _Bool calc_max;
59   _Bool calc_stddev;
60 }; /* }}} */
61 typedef struct aggregation_s aggregation_t;
62
63 struct agg_instance_s;
64 typedef struct agg_instance_s agg_instance_t;
65 struct agg_instance_s /* {{{ */
66 {
67   pthread_mutex_t lock;
68   identifier_t ident;
69
70   int ds_type;
71
72   derive_t num;
73   gauge_t sum;
74   gauge_t squares_sum;
75
76   gauge_t min;
77   gauge_t max;
78
79   rate_to_value_state_t *state_num;
80   rate_to_value_state_t *state_sum;
81   rate_to_value_state_t *state_average;
82   rate_to_value_state_t *state_min;
83   rate_to_value_state_t *state_max;
84   rate_to_value_state_t *state_stddev;
85
86   agg_instance_t *next;
87 }; /* }}} */
88
89 static lookup_t *lookup = NULL;
90
91 static pthread_mutex_t agg_instance_list_lock = PTHREAD_MUTEX_INITIALIZER;
92 static agg_instance_t *agg_instance_list_head = NULL;
93
94 static _Bool agg_is_regex (char const *str) /* {{{ */
95 {
96   size_t len;
97
98   if (str == NULL)
99     return (0);
100
101   len = strlen (str);
102   if (len < 3)
103     return (0);
104
105   if ((str[0] == '/') && (str[len - 1] == '/'))
106     return (1);
107   else
108     return (0);
109 } /* }}} _Bool agg_is_regex */
110
111 static void agg_destroy (aggregation_t *agg) /* {{{ */
112 {
113   sfree (agg);
114 } /* }}} void agg_destroy */
115
116 /* Frees all dynamically allocated memory within the instance. */
117 static void agg_instance_destroy (agg_instance_t *inst) /* {{{ */
118 {
119   if (inst == NULL)
120     return;
121
122   /* Remove this instance from the global list of instances. */
123   pthread_mutex_lock (&agg_instance_list_lock);
124   if (agg_instance_list_head == inst)
125     agg_instance_list_head = inst->next;
126   else if (agg_instance_list_head != NULL)
127   {
128     agg_instance_t *prev = agg_instance_list_head;
129     while ((prev != NULL) && (prev->next != inst))
130       prev = prev->next;
131     if (prev != NULL)
132       prev->next = inst->next;
133   }
134   pthread_mutex_unlock (&agg_instance_list_lock);
135
136   sfree (inst->state_num);
137   sfree (inst->state_sum);
138   sfree (inst->state_average);
139   sfree (inst->state_min);
140   sfree (inst->state_max);
141   sfree (inst->state_stddev);
142
143   memset (inst, 0, sizeof (*inst));
144   inst->ds_type = -1;
145   inst->min = NAN;
146   inst->max = NAN;
147 } /* }}} void agg_instance_destroy */
148
149 static int agg_instance_create_name (agg_instance_t *inst, /* {{{ */
150     value_list_t const *vl, aggregation_t const *agg)
151 {
152 #define COPY_FIELD(buffer, buffer_size, field, group_mask, all_value) do { \
153   if (agg->set_ ## field != NULL) \
154     sstrncpy (buffer, agg->set_ ## field, buffer_size); \
155   else if ((agg->regex_fields & group_mask) \
156       && (agg->group_by & group_mask)) \
157     sstrncpy (buffer, vl->field, buffer_size); \
158   else if ((agg->regex_fields & group_mask) \
159       && (AGG_MATCHES_ALL (agg->ident.field))) \
160     sstrncpy (buffer, all_value, buffer_size); \
161   else \
162     sstrncpy (buffer, agg->ident.field, buffer_size); \
163 } while (0)
164
165   /* Host */
166   COPY_FIELD (inst->ident.host, sizeof (inst->ident.host),
167       host, LU_GROUP_BY_HOST, "global");
168
169   /* Plugin */
170   if (agg->set_plugin != NULL)
171     sstrncpy (inst->ident.plugin, agg->set_plugin,
172         sizeof (inst->ident.plugin));
173   else
174     sstrncpy (inst->ident.plugin, "aggregation", sizeof (inst->ident.plugin));
175
176   /* Plugin instance */
177   if (agg->set_plugin_instance != NULL)
178     sstrncpy (inst->ident.plugin_instance, agg->set_plugin_instance,
179         sizeof (inst->ident.plugin_instance));
180   else
181   {
182     char tmp_plugin[DATA_MAX_NAME_LEN];
183     char tmp_plugin_instance[DATA_MAX_NAME_LEN] = "";
184
185     if ((agg->regex_fields & LU_GROUP_BY_PLUGIN)
186         && (agg->group_by & LU_GROUP_BY_PLUGIN))
187       sstrncpy (tmp_plugin, vl->plugin, sizeof (tmp_plugin));
188     else if ((agg->regex_fields & LU_GROUP_BY_PLUGIN)
189         && (AGG_MATCHES_ALL (agg->ident.plugin)))
190       sstrncpy (tmp_plugin, "", sizeof (tmp_plugin));
191     else
192       sstrncpy (tmp_plugin, agg->ident.plugin, sizeof (tmp_plugin));
193
194     if ((agg->regex_fields & LU_GROUP_BY_PLUGIN_INSTANCE)
195         && (agg->group_by & LU_GROUP_BY_PLUGIN_INSTANCE))
196       sstrncpy (tmp_plugin_instance, vl->plugin_instance,
197           sizeof (tmp_plugin_instance));
198     else if ((agg->regex_fields & LU_GROUP_BY_PLUGIN_INSTANCE)
199         && (AGG_MATCHES_ALL (agg->ident.plugin_instance)))
200       sstrncpy (tmp_plugin_instance, "", sizeof (tmp_plugin_instance));
201     else
202       sstrncpy (tmp_plugin_instance, agg->ident.plugin_instance,
203           sizeof (tmp_plugin_instance));
204
205     if ((strcmp ("", tmp_plugin) == 0)
206         && (strcmp ("", tmp_plugin_instance) == 0))
207       sstrncpy (inst->ident.plugin_instance, AGG_FUNC_PLACEHOLDER,
208           sizeof (inst->ident.plugin_instance));
209     else if (strcmp ("", tmp_plugin) != 0)
210       ssnprintf (inst->ident.plugin_instance,
211           sizeof (inst->ident.plugin_instance),
212           "%s-%s", tmp_plugin, AGG_FUNC_PLACEHOLDER);
213     else if (strcmp ("", tmp_plugin_instance) != 0)
214       ssnprintf (inst->ident.plugin_instance,
215           sizeof (inst->ident.plugin_instance),
216           "%s-%s", tmp_plugin_instance, AGG_FUNC_PLACEHOLDER);
217     else
218       ssnprintf (inst->ident.plugin_instance,
219           sizeof (inst->ident.plugin_instance),
220           "%s-%s-%s", tmp_plugin, tmp_plugin_instance, AGG_FUNC_PLACEHOLDER);
221   }
222
223   /* Type */
224   sstrncpy (inst->ident.type, agg->ident.type, sizeof (inst->ident.type));
225
226   /* Type instance */
227   COPY_FIELD (inst->ident.type_instance, sizeof (inst->ident.type_instance),
228       type_instance, LU_GROUP_BY_TYPE_INSTANCE, "");
229
230 #undef COPY_FIELD
231
232   return (0);
233 } /* }}} int agg_instance_create_name */
234
235 /* Create a new aggregation instance. */
236 static agg_instance_t *agg_instance_create (data_set_t const *ds, /* {{{ */
237     value_list_t const *vl, aggregation_t *agg)
238 {
239   agg_instance_t *inst;
240
241   DEBUG ("aggregation plugin: Creating new instance.");
242
243   inst = malloc (sizeof (*inst));
244   if (inst == NULL)
245   {
246     ERROR ("aggregation plugin: malloc() failed.");
247     return (NULL);
248   }
249   memset (inst, 0, sizeof (*inst));
250   pthread_mutex_init (&inst->lock, /* attr = */ NULL);
251
252   inst->ds_type = ds->ds[0].type;
253
254   agg_instance_create_name (inst, vl, agg);
255
256   inst->min = NAN;
257   inst->max = NAN;
258
259 #define INIT_STATE(field) do { \
260   inst->state_ ## field = NULL; \
261   if (agg->calc_ ## field) { \
262     inst->state_ ## field = malloc (sizeof (*inst->state_ ## field)); \
263     if (inst->state_ ## field == NULL) { \
264       agg_instance_destroy (inst); \
265       free (inst); \
266       ERROR ("aggregation plugin: malloc() failed."); \
267       return (NULL); \
268     } \
269     memset (inst->state_ ## field, 0, sizeof (*inst->state_ ## field)); \
270   } \
271 } while (0)
272
273   INIT_STATE (num);
274   INIT_STATE (sum);
275   INIT_STATE (average);
276   INIT_STATE (min);
277   INIT_STATE (max);
278   INIT_STATE (stddev);
279
280 #undef INIT_STATE
281
282   pthread_mutex_lock (&agg_instance_list_lock);
283   inst->next = agg_instance_list_head;
284   agg_instance_list_head = inst;
285   pthread_mutex_unlock (&agg_instance_list_lock);
286
287   return (inst);
288 } /* }}} agg_instance_t *agg_instance_create */
289
290 /* Update the num, sum, min, max, ... fields of the aggregation instance, if
291  * the rate of the value list is available. Value lists with more than one data
292  * source are not supported and will return an error. Returns zero on success
293  * and non-zero otherwise. */
294 static int agg_instance_update (agg_instance_t *inst, /* {{{ */
295     data_set_t const *ds, value_list_t const *vl)
296 {
297   gauge_t *rate;
298
299   if (ds->ds_num != 1)
300   {
301     ERROR ("aggregation plugin: The \"%s\" type (data set) has more than one "
302         "data source. This is currently not supported by this plugin. "
303         "Sorry.", ds->type);
304     return (EINVAL);
305   }
306
307   rate = uc_get_rate (ds, vl);
308   if (rate == NULL)
309   {
310     char ident[6 * DATA_MAX_NAME_LEN];
311     FORMAT_VL (ident, sizeof (ident), vl);
312     ERROR ("aggregation plugin: Unable to read the current rate of \"%s\".",
313         ident);
314     return (ENOENT);
315   }
316
317   if (isnan (rate[0]))
318   {
319     sfree (rate);
320     return (0);
321   }
322
323   pthread_mutex_lock (&inst->lock);
324
325   inst->num++;
326   inst->sum += rate[0];
327   inst->squares_sum += (rate[0] * rate[0]);
328
329   if (isnan (inst->min) || (inst->min > rate[0]))
330     inst->min = rate[0];
331   if (isnan (inst->max) || (inst->max < rate[0]))
332     inst->max = rate[0];
333
334   pthread_mutex_unlock (&inst->lock);
335
336   sfree (rate);
337   return (0);
338 } /* }}} int agg_instance_update */
339
340 static int agg_instance_read_func (agg_instance_t *inst, /* {{{ */
341   char const *func, gauge_t rate, rate_to_value_state_t *state,
342   value_list_t *vl, char const *pi_prefix, cdtime_t t)
343 {
344   value_t v;
345   int status;
346
347   if (pi_prefix[0] != 0)
348     subst_string (vl->plugin_instance, sizeof (vl->plugin_instance),
349         pi_prefix, AGG_FUNC_PLACEHOLDER, func);
350   else
351     sstrncpy (vl->plugin_instance, func, sizeof (vl->plugin_instance));
352
353   memset (&v, 0, sizeof (v));
354   status = rate_to_value (&v, rate, state, inst->ds_type, t);
355   if (status != 0)
356   {
357     /* If this is the first iteration and rate_to_value() was asked to return a
358      * COUNTER or a DERIVE, it will return EAGAIN. Catch this and handle
359      * gracefully. */
360     if (status == EAGAIN)
361       return (0);
362
363     WARNING ("aggregation plugin: rate_to_value failed with status %i.",
364         status);
365     return (-1);
366   }
367
368   vl->values = &v;
369   vl->values_len = 1;
370
371   plugin_dispatch_values (vl);
372
373   vl->values = NULL;
374   vl->values_len = 0;
375
376   return (0);
377 } /* }}} int agg_instance_read_func */
378
379 static int agg_instance_read (agg_instance_t *inst, cdtime_t t) /* {{{ */
380 {
381   value_list_t vl = VALUE_LIST_INIT;
382
383   /* Pre-set all the fields in the value list that will not change per
384    * aggregation type (sum, average, ...). The struct will be re-used and must
385    * therefore be dispatched using the "secure" function. */
386
387   vl.time = t;
388   vl.interval = 0;
389
390   vl.meta = meta_data_create ();
391   if (vl.meta == NULL)
392   {
393     ERROR ("aggregation plugin: meta_data_create failed.");
394     return (-1);
395   }
396   meta_data_add_boolean (vl.meta, "aggregation:created", 1);
397
398   sstrncpy (vl.host, inst->ident.host, sizeof (vl.host));
399   sstrncpy (vl.plugin, inst->ident.plugin, sizeof (vl.plugin));
400   sstrncpy (vl.type, inst->ident.type, sizeof (vl.type));
401   sstrncpy (vl.type_instance, inst->ident.type_instance,
402       sizeof (vl.type_instance));
403
404 #define READ_FUNC(func, rate) do { \
405   if (inst->state_ ## func != NULL) { \
406     agg_instance_read_func (inst, #func, rate, \
407         inst->state_ ## func, &vl, inst->ident.plugin_instance, t); \
408   } \
409 } while (0)
410
411   pthread_mutex_lock (&inst->lock);
412
413   READ_FUNC (num, (gauge_t) inst->num);
414
415   /* All other aggregations are only defined when there have been any values
416    * at all. */
417   if (inst->num > 0)
418   {
419     READ_FUNC (sum, inst->sum);
420     READ_FUNC (average, (inst->sum / ((gauge_t) inst->num)));
421     READ_FUNC (min, inst->min);
422     READ_FUNC (max, inst->max);
423     READ_FUNC (stddev, sqrt((((gauge_t) inst->num) * inst->squares_sum)
424           - (inst->sum * inst->sum)) / ((gauge_t) inst->num));
425   }
426
427   /* Reset internal state. */
428   inst->num = 0;
429   inst->sum = 0.0;
430   inst->squares_sum = 0.0;
431   inst->min = NAN;
432   inst->max = NAN;
433
434   pthread_mutex_unlock (&inst->lock);
435
436   meta_data_destroy (vl.meta);
437   vl.meta = NULL;
438
439   return (0);
440 } /* }}} int agg_instance_read */
441
442 /* lookup_class_callback_t for utils_vl_lookup */
443 static void *agg_lookup_class_callback ( /* {{{ */
444     data_set_t const *ds, value_list_t const *vl, void *user_class)
445 {
446   return (agg_instance_create (ds, vl, (aggregation_t *) user_class));
447 } /* }}} void *agg_class_callback */
448
449 /* lookup_obj_callback_t for utils_vl_lookup */
450 static int agg_lookup_obj_callback (data_set_t const *ds, /* {{{ */
451     value_list_t const *vl,
452     __attribute__((unused)) void *user_class,
453     void *user_obj)
454 {
455   return (agg_instance_update ((agg_instance_t *) user_obj, ds, vl));
456 } /* }}} int agg_lookup_obj_callback */
457
458 /* lookup_free_class_callback_t for utils_vl_lookup */
459 static void agg_lookup_free_class_callback (void *user_class) /* {{{ */
460 {
461   agg_destroy ((aggregation_t *) user_class);
462 } /* }}} void agg_lookup_free_class_callback */
463
464 /* lookup_free_obj_callback_t for utils_vl_lookup */
465 static void agg_lookup_free_obj_callback (void *user_obj) /* {{{ */
466 {
467   agg_instance_destroy ((agg_instance_t *) user_obj);
468 } /* }}} void agg_lookup_free_obj_callback */
469
470 /*
471  * <Plugin "aggregation">
472  *   <Aggregation>
473  *     Plugin "cpu"
474  *     Type "cpu"
475  *
476  *     GroupBy Host
477  *     GroupBy TypeInstance
478  *
479  *     CalculateNum true
480  *     CalculateSum true
481  *     CalculateAverage true
482  *     CalculateMinimum true
483  *     CalculateMaximum true
484  *     CalculateStddev true
485  *   </Aggregation>
486  * </Plugin>
487  */
488 static int agg_config_handle_group_by (oconfig_item_t const *ci, /* {{{ */
489     aggregation_t *agg)
490 {
491   int i;
492
493   for (i = 0; i < ci->values_num; i++)
494   {
495     char const *value;
496
497     if (ci->values[i].type != OCONFIG_TYPE_STRING)
498     {
499       ERROR ("aggregation plugin: Argument %i of the \"GroupBy\" option "
500           "is not a string.", i + 1);
501       continue;
502     }
503
504     value = ci->values[i].value.string;
505
506     if (strcasecmp ("Host", value) == 0)
507       agg->group_by |= LU_GROUP_BY_HOST;
508     else if (strcasecmp ("Plugin", value) == 0)
509       agg->group_by |= LU_GROUP_BY_PLUGIN;
510     else if (strcasecmp ("PluginInstance", value) == 0)
511       agg->group_by |= LU_GROUP_BY_PLUGIN_INSTANCE;
512     else if (strcasecmp ("TypeInstance", value) == 0)
513       agg->group_by |= LU_GROUP_BY_TYPE_INSTANCE;
514     else if (strcasecmp ("Type", value) == 0)
515       ERROR ("aggregation plugin: Grouping by type is not supported.");
516     else
517       WARNING ("aggregation plugin: The \"%s\" argument to the \"GroupBy\" "
518           "option is invalid and will be ignored.", value);
519   } /* for (ci->values) */
520
521   return (0);
522 } /* }}} int agg_config_handle_group_by */
523
524 static int agg_config_aggregation (oconfig_item_t *ci) /* {{{ */
525 {
526   aggregation_t *agg;
527   _Bool is_valid;
528   int status;
529   int i;
530
531   agg = malloc (sizeof (*agg));
532   if (agg == NULL)
533   {
534     ERROR ("aggregation plugin: malloc failed.");
535     return (-1);
536   }
537   memset (agg, 0, sizeof (*agg));
538
539   sstrncpy (agg->ident.host, "/.*/", sizeof (agg->ident.host));
540   sstrncpy (agg->ident.plugin, "/.*/", sizeof (agg->ident.plugin));
541   sstrncpy (agg->ident.plugin_instance, "/.*/",
542       sizeof (agg->ident.plugin_instance));
543   sstrncpy (agg->ident.type, "/.*/", sizeof (agg->ident.type));
544   sstrncpy (agg->ident.type_instance, "/.*/",
545       sizeof (agg->ident.type_instance));
546
547   for (i = 0; i < ci->children_num; i++)
548   {
549     oconfig_item_t *child = ci->children + i;
550
551     if (strcasecmp ("Host", child->key) == 0)
552       cf_util_get_string_buffer (child, agg->ident.host,
553           sizeof (agg->ident.host));
554     else if (strcasecmp ("Plugin", child->key) == 0)
555       cf_util_get_string_buffer (child, agg->ident.plugin,
556           sizeof (agg->ident.plugin));
557     else if (strcasecmp ("PluginInstance", child->key) == 0)
558       cf_util_get_string_buffer (child, agg->ident.plugin_instance,
559           sizeof (agg->ident.plugin_instance));
560     else if (strcasecmp ("Type", child->key) == 0)
561       cf_util_get_string_buffer (child, agg->ident.type,
562           sizeof (agg->ident.type));
563     else if (strcasecmp ("TypeInstance", child->key) == 0)
564       cf_util_get_string_buffer (child, agg->ident.type_instance,
565           sizeof (agg->ident.type_instance));
566     else if (strcasecmp ("SetHost", child->key) == 0)
567       cf_util_get_string (child, &agg->set_host);
568     else if (strcasecmp ("SetPlugin", child->key) == 0)
569       cf_util_get_string (child, &agg->set_plugin);
570     else if (strcasecmp ("SetPluginInstance", child->key) == 0)
571       cf_util_get_string (child, &agg->set_plugin_instance);
572     else if (strcasecmp ("SetTypeInstance", child->key) == 0)
573       cf_util_get_string (child, &agg->set_type_instance);
574     else if (strcasecmp ("GroupBy", child->key) == 0)
575       agg_config_handle_group_by (child, agg);
576     else if (strcasecmp ("CalculateNum", child->key) == 0)
577       cf_util_get_boolean (child, &agg->calc_num);
578     else if (strcasecmp ("CalculateSum", child->key) == 0)
579       cf_util_get_boolean (child, &agg->calc_sum);
580     else if (strcasecmp ("CalculateAverage", child->key) == 0)
581       cf_util_get_boolean (child, &agg->calc_average);
582     else if (strcasecmp ("CalculateMinimum", child->key) == 0)
583       cf_util_get_boolean (child, &agg->calc_min);
584     else if (strcasecmp ("CalculateMaximum", child->key) == 0)
585       cf_util_get_boolean (child, &agg->calc_max);
586     else if (strcasecmp ("CalculateStddev", child->key) == 0)
587       cf_util_get_boolean (child, &agg->calc_stddev);
588     else
589       WARNING ("aggregation plugin: The \"%s\" key is not allowed inside "
590           "<Aggregation /> blocks and will be ignored.", child->key);
591   }
592
593   if (agg_is_regex (agg->ident.host))
594     agg->regex_fields |= LU_GROUP_BY_HOST;
595   if (agg_is_regex (agg->ident.plugin))
596     agg->regex_fields |= LU_GROUP_BY_PLUGIN;
597   if (agg_is_regex (agg->ident.plugin_instance))
598     agg->regex_fields |= LU_GROUP_BY_PLUGIN_INSTANCE;
599   if (agg_is_regex (agg->ident.type_instance))
600     agg->regex_fields |= LU_GROUP_BY_TYPE_INSTANCE;
601
602   /* Sanity checking */
603   is_valid = 1;
604   if (strcmp ("/.*/", agg->ident.type) == 0) /* {{{ */
605   {
606     ERROR ("aggregation plugin: It appears you did not specify the required "
607         "\"Type\" option in this aggregation. "
608         "(Host \"%s\", Plugin \"%s\", PluginInstance \"%s\", "
609         "Type \"%s\", TypeInstance \"%s\")",
610         agg->ident.host, agg->ident.plugin, agg->ident.plugin_instance,
611         agg->ident.type, agg->ident.type_instance);
612     is_valid = 0;
613   }
614   else if (strchr (agg->ident.type, '/') != NULL)
615   {
616     ERROR ("aggregation plugin: The \"Type\" may not contain the '/' "
617         "character. Especially, it may not be a regex. The current "
618         "value is \"%s\".", agg->ident.type);
619     is_valid = 0;
620   } /* }}} */
621
622   /* Check that there is at least one regex field without a grouping. {{{ */
623   if ((agg->regex_fields & ~agg->group_by) == 0)
624   {
625     ERROR ("aggregation plugin: An aggregation must contain at least one "
626         "wildcard. This is achieved by leaving at least one of the \"Host\", "
627         "\"Plugin\", \"PluginInstance\" and \"TypeInstance\" options blank "
628         "or using a regular expression and not grouping by that field. "
629         "(Host \"%s\", Plugin \"%s\", PluginInstance \"%s\", "
630         "Type \"%s\", TypeInstance \"%s\")",
631         agg->ident.host, agg->ident.plugin, agg->ident.plugin_instance,
632         agg->ident.type, agg->ident.type_instance);
633     is_valid = 0;
634   } /* }}} */
635
636   /* Check that all grouping fields are regular expressions. {{{ */
637   if (agg->group_by & ~agg->regex_fields)
638   {
639     ERROR ("aggregation plugin: Only wildcard fields (fields for which a "
640         "regular expression is configured or which are left blank) can be "
641         "specified in the \"GroupBy\" option. "
642         "(Host \"%s\", Plugin \"%s\", PluginInstance \"%s\", "
643         "Type \"%s\", TypeInstance \"%s\")",
644         agg->ident.host, agg->ident.plugin, agg->ident.plugin_instance,
645         agg->ident.type, agg->ident.type_instance);
646     is_valid = 0;
647   } /* }}} */
648
649   if (!agg->calc_num && !agg->calc_sum && !agg->calc_average /* {{{ */
650       && !agg->calc_min && !agg->calc_max && !agg->calc_stddev)
651   {
652     ERROR ("aggregation plugin: No aggregation function has been specified. "
653         "Without this, I don't know what I should be calculating. "
654         "(Host \"%s\", Plugin \"%s\", PluginInstance \"%s\", "
655         "Type \"%s\", TypeInstance \"%s\")",
656         agg->ident.host, agg->ident.plugin, agg->ident.plugin_instance,
657         agg->ident.type, agg->ident.type_instance);
658     is_valid = 0;
659   } /* }}} */
660
661   if (!is_valid) /* {{{ */
662   {
663     sfree (agg);
664     return (-1);
665   } /* }}} */
666
667   status = lookup_add (lookup, &agg->ident, agg->group_by, agg);
668   if (status != 0)
669   {
670     ERROR ("aggregation plugin: lookup_add failed with status %i.", status);
671     sfree (agg);
672     return (-1);
673   }
674
675   DEBUG ("aggregation plugin: Successfully added aggregation: "
676       "(Host \"%s\", Plugin \"%s\", PluginInstance \"%s\", "
677       "Type \"%s\", TypeInstance \"%s\")",
678       agg->ident.host, agg->ident.plugin, agg->ident.plugin_instance,
679       agg->ident.type, agg->ident.type_instance);
680   return (0);
681 } /* }}} int agg_config_aggregation */
682
683 static int agg_config (oconfig_item_t *ci) /* {{{ */
684 {
685   int i;
686
687   pthread_mutex_lock (&agg_instance_list_lock);
688
689   if (lookup == NULL)
690   {
691     lookup = lookup_create (agg_lookup_class_callback,
692         agg_lookup_obj_callback,
693         agg_lookup_free_class_callback,
694         agg_lookup_free_obj_callback);
695     if (lookup == NULL)
696     {
697       pthread_mutex_unlock (&agg_instance_list_lock);
698       ERROR ("aggregation plugin: lookup_create failed.");
699       return (-1);
700     }
701   }
702
703   for (i = 0; i < ci->children_num; i++)
704   {
705     oconfig_item_t *child = ci->children + i;
706
707     if (strcasecmp ("Aggregation", child->key) == 0)
708       agg_config_aggregation (child);
709     else
710       WARNING ("aggregation plugin: The \"%s\" key is not allowed inside "
711           "<Plugin aggregation /> blocks and will be ignored.", child->key);
712   }
713
714   pthread_mutex_unlock (&agg_instance_list_lock);
715
716   return (0);
717 } /* }}} int agg_config */
718
719 static int agg_read (void) /* {{{ */
720 {
721   agg_instance_t *this;
722   cdtime_t t;
723   int success;
724
725   t = cdtime ();
726   success = 0;
727
728   pthread_mutex_lock (&agg_instance_list_lock);
729
730   /* agg_instance_list_head only holds data, after the "write" callback has
731    * been called with a matching value list at least once. So on startup,
732    * there's a race between the aggregations read() and write() callback. If
733    * the read() callback is called first, agg_instance_list_head is NULL and
734    * "success" may be zero. This is expected and should not result in an error.
735    * Therefore we need to handle this case separately. */
736   if (agg_instance_list_head == NULL)
737   {
738     pthread_mutex_unlock (&agg_instance_list_lock);
739     return (0);
740   }
741
742   for (this = agg_instance_list_head; this != NULL; this = this->next)
743   {
744     int status;
745
746     status = agg_instance_read (this, t);
747     if (status != 0)
748       WARNING ("aggregation plugin: Reading an aggregation instance "
749           "failed with status %i.", status);
750     else
751       success++;
752   }
753
754   pthread_mutex_unlock (&agg_instance_list_lock);
755
756   return ((success > 0) ? 0 : -1);
757 } /* }}} int agg_read */
758
759 static int agg_write (data_set_t const *ds, value_list_t const *vl, /* {{{ */
760     __attribute__((unused)) user_data_t *user_data)
761 {
762   _Bool created_by_aggregation = 0;
763   int status;
764
765   /* Ignore values that were created by the aggregation plugin to avoid weird
766    * effects. */
767   (void) meta_data_get_boolean (vl->meta, "aggregation:created",
768       &created_by_aggregation);
769   if (created_by_aggregation)
770     return (0);
771
772   if (lookup == NULL)
773     status = ENOENT;
774   else
775   {
776     status = lookup_search (lookup, ds, vl);
777     if (status > 0)
778       status = 0;
779   }
780
781   return (status);
782 } /* }}} int agg_write */
783
784 void module_register (void)
785 {
786   plugin_register_complex_config ("aggregation", agg_config);
787   plugin_register_read ("aggregation", agg_read);
788   plugin_register_write ("aggregation", agg_write, /* user_data = */ NULL);
789 }
790
791 /* vim: set sw=2 sts=2 tw=78 et fdm=marker : */