all plugins: remove pthread.h include
[collectd.git] / src / aggregation.c
1 /**
2  * collectd - src/aggregation.c
3  * Copyright (C) 2012       Florian Forster
4  *
5  * Permission is hereby granted, free of charge, to any person obtaining a
6  * copy of this software and associated documentation files (the "Software"),
7  * to deal in the Software without restriction, including without limitation
8  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
9  * and/or sell copies of the Software, and to permit persons to whom the
10  * Software is furnished to do so, subject to the following conditions:
11  *
12  * The above copyright notice and this permission notice shall be included in
13  * all copies or substantial portions of the Software.
14  *
15  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
21  * DEALINGS IN THE SOFTWARE.
22  *
23  * Authors:
24  *   Florian Forster <octo at collectd.org>
25  **/
26
27 #include "collectd.h"
28
29 #include "plugin.h"
30 #include "common.h"
31 #include "configfile.h"
32 #include "meta_data.h"
33 #include "utils_cache.h" /* for uc_get_rate() */
34 #include "utils_subst.h"
35 #include "utils_vl_lookup.h"
36
37 #define AGG_MATCHES_ALL(str) (strcmp ("/.*/", str) == 0)
38 #define AGG_FUNC_PLACEHOLDER "%{aggregation}"
39
40 struct aggregation_s /* {{{ */
41 {
42   identifier_t ident;
43   unsigned int group_by;
44
45   unsigned int regex_fields;
46
47   char *set_host;
48   char *set_plugin;
49   char *set_plugin_instance;
50   char *set_type_instance;
51
52   _Bool calc_num;
53   _Bool calc_sum;
54   _Bool calc_average;
55   _Bool calc_min;
56   _Bool calc_max;
57   _Bool calc_stddev;
58 }; /* }}} */
59 typedef struct aggregation_s aggregation_t;
60
61 struct agg_instance_s;
62 typedef struct agg_instance_s agg_instance_t;
63 struct agg_instance_s /* {{{ */
64 {
65   pthread_mutex_t lock;
66   identifier_t ident;
67
68   int ds_type;
69
70   derive_t num;
71   gauge_t sum;
72   gauge_t squares_sum;
73
74   gauge_t min;
75   gauge_t max;
76
77   rate_to_value_state_t *state_num;
78   rate_to_value_state_t *state_sum;
79   rate_to_value_state_t *state_average;
80   rate_to_value_state_t *state_min;
81   rate_to_value_state_t *state_max;
82   rate_to_value_state_t *state_stddev;
83
84   agg_instance_t *next;
85 }; /* }}} */
86
87 static lookup_t *lookup = NULL;
88
89 static pthread_mutex_t agg_instance_list_lock = PTHREAD_MUTEX_INITIALIZER;
90 static agg_instance_t *agg_instance_list_head = NULL;
91
92 static _Bool agg_is_regex (char const *str) /* {{{ */
93 {
94   size_t len;
95
96   if (str == NULL)
97     return (0);
98
99   len = strlen (str);
100   if (len < 3)
101     return (0);
102
103   if ((str[0] == '/') && (str[len - 1] == '/'))
104     return (1);
105   else
106     return (0);
107 } /* }}} _Bool agg_is_regex */
108
109 static void agg_destroy (aggregation_t *agg) /* {{{ */
110 {
111   sfree (agg);
112 } /* }}} void agg_destroy */
113
114 /* Frees all dynamically allocated memory within the instance. */
115 static void agg_instance_destroy (agg_instance_t *inst) /* {{{ */
116 {
117   if (inst == NULL)
118     return;
119
120   /* Remove this instance from the global list of instances. */
121   pthread_mutex_lock (&agg_instance_list_lock);
122   if (agg_instance_list_head == inst)
123     agg_instance_list_head = inst->next;
124   else if (agg_instance_list_head != NULL)
125   {
126     agg_instance_t *prev = agg_instance_list_head;
127     while ((prev != NULL) && (prev->next != inst))
128       prev = prev->next;
129     if (prev != NULL)
130       prev->next = inst->next;
131   }
132   pthread_mutex_unlock (&agg_instance_list_lock);
133
134   sfree (inst->state_num);
135   sfree (inst->state_sum);
136   sfree (inst->state_average);
137   sfree (inst->state_min);
138   sfree (inst->state_max);
139   sfree (inst->state_stddev);
140
141   memset (inst, 0, sizeof (*inst));
142   inst->ds_type = -1;
143   inst->min = NAN;
144   inst->max = NAN;
145 } /* }}} void agg_instance_destroy */
146
147 static int agg_instance_create_name (agg_instance_t *inst, /* {{{ */
148     value_list_t const *vl, aggregation_t const *agg)
149 {
150 #define COPY_FIELD(buffer, buffer_size, field, group_mask, all_value) do { \
151   if (agg->set_ ## field != NULL) \
152     sstrncpy (buffer, agg->set_ ## field, buffer_size); \
153   else if ((agg->regex_fields & group_mask) \
154       && (agg->group_by & group_mask)) \
155     sstrncpy (buffer, vl->field, buffer_size); \
156   else if ((agg->regex_fields & group_mask) \
157       && (AGG_MATCHES_ALL (agg->ident.field))) \
158     sstrncpy (buffer, all_value, buffer_size); \
159   else \
160     sstrncpy (buffer, agg->ident.field, buffer_size); \
161 } while (0)
162
163   /* Host */
164   COPY_FIELD (inst->ident.host, sizeof (inst->ident.host),
165       host, LU_GROUP_BY_HOST, "global");
166
167   /* Plugin */
168   if (agg->set_plugin != NULL)
169     sstrncpy (inst->ident.plugin, agg->set_plugin,
170         sizeof (inst->ident.plugin));
171   else
172     sstrncpy (inst->ident.plugin, "aggregation", sizeof (inst->ident.plugin));
173
174   /* Plugin instance */
175   if (agg->set_plugin_instance != NULL)
176     sstrncpy (inst->ident.plugin_instance, agg->set_plugin_instance,
177         sizeof (inst->ident.plugin_instance));
178   else
179   {
180     char tmp_plugin[DATA_MAX_NAME_LEN];
181     char tmp_plugin_instance[DATA_MAX_NAME_LEN] = "";
182
183     if ((agg->regex_fields & LU_GROUP_BY_PLUGIN)
184         && (agg->group_by & LU_GROUP_BY_PLUGIN))
185       sstrncpy (tmp_plugin, vl->plugin, sizeof (tmp_plugin));
186     else if ((agg->regex_fields & LU_GROUP_BY_PLUGIN)
187         && (AGG_MATCHES_ALL (agg->ident.plugin)))
188       sstrncpy (tmp_plugin, "", sizeof (tmp_plugin));
189     else
190       sstrncpy (tmp_plugin, agg->ident.plugin, sizeof (tmp_plugin));
191
192     if ((agg->regex_fields & LU_GROUP_BY_PLUGIN_INSTANCE)
193         && (agg->group_by & LU_GROUP_BY_PLUGIN_INSTANCE))
194       sstrncpy (tmp_plugin_instance, vl->plugin_instance,
195           sizeof (tmp_plugin_instance));
196     else if ((agg->regex_fields & LU_GROUP_BY_PLUGIN_INSTANCE)
197         && (AGG_MATCHES_ALL (agg->ident.plugin_instance)))
198       sstrncpy (tmp_plugin_instance, "", sizeof (tmp_plugin_instance));
199     else
200       sstrncpy (tmp_plugin_instance, agg->ident.plugin_instance,
201           sizeof (tmp_plugin_instance));
202
203     if ((strcmp ("", tmp_plugin) == 0)
204         && (strcmp ("", tmp_plugin_instance) == 0))
205       sstrncpy (inst->ident.plugin_instance, AGG_FUNC_PLACEHOLDER,
206           sizeof (inst->ident.plugin_instance));
207     else if (strcmp ("", tmp_plugin) != 0)
208       ssnprintf (inst->ident.plugin_instance,
209           sizeof (inst->ident.plugin_instance),
210           "%s-%s", tmp_plugin, AGG_FUNC_PLACEHOLDER);
211     else if (strcmp ("", tmp_plugin_instance) != 0)
212       ssnprintf (inst->ident.plugin_instance,
213           sizeof (inst->ident.plugin_instance),
214           "%s-%s", tmp_plugin_instance, AGG_FUNC_PLACEHOLDER);
215     else
216       ssnprintf (inst->ident.plugin_instance,
217           sizeof (inst->ident.plugin_instance),
218           "%s-%s-%s", tmp_plugin, tmp_plugin_instance, AGG_FUNC_PLACEHOLDER);
219   }
220
221   /* Type */
222   sstrncpy (inst->ident.type, agg->ident.type, sizeof (inst->ident.type));
223
224   /* Type instance */
225   COPY_FIELD (inst->ident.type_instance, sizeof (inst->ident.type_instance),
226       type_instance, LU_GROUP_BY_TYPE_INSTANCE, "");
227
228 #undef COPY_FIELD
229
230   return (0);
231 } /* }}} int agg_instance_create_name */
232
233 /* Create a new aggregation instance. */
234 static agg_instance_t *agg_instance_create (data_set_t const *ds, /* {{{ */
235     value_list_t const *vl, aggregation_t *agg)
236 {
237   agg_instance_t *inst;
238
239   DEBUG ("aggregation plugin: Creating new instance.");
240
241   inst = calloc (1, sizeof (*inst));
242   if (inst == NULL)
243   {
244     ERROR ("aggregation plugin: calloc() failed.");
245     return (NULL);
246   }
247   pthread_mutex_init (&inst->lock, /* attr = */ NULL);
248
249   inst->ds_type = ds->ds[0].type;
250
251   agg_instance_create_name (inst, vl, agg);
252
253   inst->min = NAN;
254   inst->max = NAN;
255
256 #define INIT_STATE(field) do { \
257   inst->state_ ## field = NULL; \
258   if (agg->calc_ ## field) { \
259     inst->state_ ## field = calloc (1, sizeof (*inst->state_ ## field)); \
260     if (inst->state_ ## field == NULL) { \
261       agg_instance_destroy (inst); \
262       free (inst); \
263       ERROR ("aggregation plugin: calloc() failed."); \
264       return (NULL); \
265     } \
266   } \
267 } while (0)
268
269   INIT_STATE (num);
270   INIT_STATE (sum);
271   INIT_STATE (average);
272   INIT_STATE (min);
273   INIT_STATE (max);
274   INIT_STATE (stddev);
275
276 #undef INIT_STATE
277
278   pthread_mutex_lock (&agg_instance_list_lock);
279   inst->next = agg_instance_list_head;
280   agg_instance_list_head = inst;
281   pthread_mutex_unlock (&agg_instance_list_lock);
282
283   return (inst);
284 } /* }}} agg_instance_t *agg_instance_create */
285
286 /* Update the num, sum, min, max, ... fields of the aggregation instance, if
287  * the rate of the value list is available. Value lists with more than one data
288  * source are not supported and will return an error. Returns zero on success
289  * and non-zero otherwise. */
290 static int agg_instance_update (agg_instance_t *inst, /* {{{ */
291     data_set_t const *ds, value_list_t const *vl)
292 {
293   gauge_t *rate;
294
295   if (ds->ds_num != 1)
296   {
297     ERROR ("aggregation plugin: The \"%s\" type (data set) has more than one "
298         "data source. This is currently not supported by this plugin. "
299         "Sorry.", ds->type);
300     return (EINVAL);
301   }
302
303   rate = uc_get_rate (ds, vl);
304   if (rate == NULL)
305   {
306     char ident[6 * DATA_MAX_NAME_LEN];
307     FORMAT_VL (ident, sizeof (ident), vl);
308     ERROR ("aggregation plugin: Unable to read the current rate of \"%s\".",
309         ident);
310     return (ENOENT);
311   }
312
313   if (isnan (rate[0]))
314   {
315     sfree (rate);
316     return (0);
317   }
318
319   pthread_mutex_lock (&inst->lock);
320
321   inst->num++;
322   inst->sum += rate[0];
323   inst->squares_sum += (rate[0] * rate[0]);
324
325   if (isnan (inst->min) || (inst->min > rate[0]))
326     inst->min = rate[0];
327   if (isnan (inst->max) || (inst->max < rate[0]))
328     inst->max = rate[0];
329
330   pthread_mutex_unlock (&inst->lock);
331
332   sfree (rate);
333   return (0);
334 } /* }}} int agg_instance_update */
335
336 static int agg_instance_read_func (agg_instance_t *inst, /* {{{ */
337   char const *func, gauge_t rate, rate_to_value_state_t *state,
338   value_list_t *vl, char const *pi_prefix, cdtime_t t)
339 {
340   value_t v;
341   int status;
342
343   if (pi_prefix[0] != 0)
344     subst_string (vl->plugin_instance, sizeof (vl->plugin_instance),
345         pi_prefix, AGG_FUNC_PLACEHOLDER, func);
346   else
347     sstrncpy (vl->plugin_instance, func, sizeof (vl->plugin_instance));
348
349   memset (&v, 0, sizeof (v));
350   status = rate_to_value (&v, rate, state, inst->ds_type, t);
351   if (status != 0)
352   {
353     /* If this is the first iteration and rate_to_value() was asked to return a
354      * COUNTER or a DERIVE, it will return EAGAIN. Catch this and handle
355      * gracefully. */
356     if (status == EAGAIN)
357       return (0);
358
359     WARNING ("aggregation plugin: rate_to_value failed with status %i.",
360         status);
361     return (-1);
362   }
363
364   vl->values = &v;
365   vl->values_len = 1;
366
367   plugin_dispatch_values (vl);
368
369   vl->values = NULL;
370   vl->values_len = 0;
371
372   return (0);
373 } /* }}} int agg_instance_read_func */
374
375 static int agg_instance_read (agg_instance_t *inst, cdtime_t t) /* {{{ */
376 {
377   value_list_t vl = VALUE_LIST_INIT;
378
379   /* Pre-set all the fields in the value list that will not change per
380    * aggregation type (sum, average, ...). The struct will be re-used and must
381    * therefore be dispatched using the "secure" function. */
382
383   vl.time = t;
384   vl.interval = 0;
385
386   vl.meta = meta_data_create ();
387   if (vl.meta == NULL)
388   {
389     ERROR ("aggregation plugin: meta_data_create failed.");
390     return (-1);
391   }
392   meta_data_add_boolean (vl.meta, "aggregation:created", 1);
393
394   sstrncpy (vl.host, inst->ident.host, sizeof (vl.host));
395   sstrncpy (vl.plugin, inst->ident.plugin, sizeof (vl.plugin));
396   sstrncpy (vl.type, inst->ident.type, sizeof (vl.type));
397   sstrncpy (vl.type_instance, inst->ident.type_instance,
398       sizeof (vl.type_instance));
399
400 #define READ_FUNC(func, rate) do { \
401   if (inst->state_ ## func != NULL) { \
402     agg_instance_read_func (inst, #func, rate, \
403         inst->state_ ## func, &vl, inst->ident.plugin_instance, t); \
404   } \
405 } while (0)
406
407   pthread_mutex_lock (&inst->lock);
408
409   READ_FUNC (num, (gauge_t) inst->num);
410
411   /* All other aggregations are only defined when there have been any values
412    * at all. */
413   if (inst->num > 0)
414   {
415     READ_FUNC (sum, inst->sum);
416     READ_FUNC (average, (inst->sum / ((gauge_t) inst->num)));
417     READ_FUNC (min, inst->min);
418     READ_FUNC (max, inst->max);
419     READ_FUNC (stddev, sqrt((((gauge_t) inst->num) * inst->squares_sum)
420           - (inst->sum * inst->sum)) / ((gauge_t) inst->num));
421   }
422
423   /* Reset internal state. */
424   inst->num = 0;
425   inst->sum = 0.0;
426   inst->squares_sum = 0.0;
427   inst->min = NAN;
428   inst->max = NAN;
429
430   pthread_mutex_unlock (&inst->lock);
431
432   meta_data_destroy (vl.meta);
433   vl.meta = NULL;
434
435   return (0);
436 } /* }}} int agg_instance_read */
437
438 /* lookup_class_callback_t for utils_vl_lookup */
439 static void *agg_lookup_class_callback ( /* {{{ */
440     data_set_t const *ds, value_list_t const *vl, void *user_class)
441 {
442   return (agg_instance_create (ds, vl, (aggregation_t *) user_class));
443 } /* }}} void *agg_class_callback */
444
445 /* lookup_obj_callback_t for utils_vl_lookup */
446 static int agg_lookup_obj_callback (data_set_t const *ds, /* {{{ */
447     value_list_t const *vl,
448     __attribute__((unused)) void *user_class,
449     void *user_obj)
450 {
451   return (agg_instance_update ((agg_instance_t *) user_obj, ds, vl));
452 } /* }}} int agg_lookup_obj_callback */
453
454 /* lookup_free_class_callback_t for utils_vl_lookup */
455 static void agg_lookup_free_class_callback (void *user_class) /* {{{ */
456 {
457   agg_destroy ((aggregation_t *) user_class);
458 } /* }}} void agg_lookup_free_class_callback */
459
460 /* lookup_free_obj_callback_t for utils_vl_lookup */
461 static void agg_lookup_free_obj_callback (void *user_obj) /* {{{ */
462 {
463   agg_instance_destroy ((agg_instance_t *) user_obj);
464 } /* }}} void agg_lookup_free_obj_callback */
465
466 /*
467  * <Plugin "aggregation">
468  *   <Aggregation>
469  *     Plugin "cpu"
470  *     Type "cpu"
471  *
472  *     GroupBy Host
473  *     GroupBy TypeInstance
474  *
475  *     CalculateNum true
476  *     CalculateSum true
477  *     CalculateAverage true
478  *     CalculateMinimum true
479  *     CalculateMaximum true
480  *     CalculateStddev true
481  *   </Aggregation>
482  * </Plugin>
483  */
484 static int agg_config_handle_group_by (oconfig_item_t const *ci, /* {{{ */
485     aggregation_t *agg)
486 {
487   int i;
488
489   for (i = 0; i < ci->values_num; i++)
490   {
491     char const *value;
492
493     if (ci->values[i].type != OCONFIG_TYPE_STRING)
494     {
495       ERROR ("aggregation plugin: Argument %i of the \"GroupBy\" option "
496           "is not a string.", i + 1);
497       continue;
498     }
499
500     value = ci->values[i].value.string;
501
502     if (strcasecmp ("Host", value) == 0)
503       agg->group_by |= LU_GROUP_BY_HOST;
504     else if (strcasecmp ("Plugin", value) == 0)
505       agg->group_by |= LU_GROUP_BY_PLUGIN;
506     else if (strcasecmp ("PluginInstance", value) == 0)
507       agg->group_by |= LU_GROUP_BY_PLUGIN_INSTANCE;
508     else if (strcasecmp ("TypeInstance", value) == 0)
509       agg->group_by |= LU_GROUP_BY_TYPE_INSTANCE;
510     else if (strcasecmp ("Type", value) == 0)
511       ERROR ("aggregation plugin: Grouping by type is not supported.");
512     else
513       WARNING ("aggregation plugin: The \"%s\" argument to the \"GroupBy\" "
514           "option is invalid and will be ignored.", value);
515   } /* for (ci->values) */
516
517   return (0);
518 } /* }}} int agg_config_handle_group_by */
519
520 static int agg_config_aggregation (oconfig_item_t *ci) /* {{{ */
521 {
522   aggregation_t *agg;
523   _Bool is_valid;
524   int status;
525   int i;
526
527   agg = calloc (1, sizeof (*agg));
528   if (agg == NULL)
529   {
530     ERROR ("aggregation plugin: calloc failed.");
531     return (-1);
532   }
533
534   sstrncpy (agg->ident.host, "/.*/", sizeof (agg->ident.host));
535   sstrncpy (agg->ident.plugin, "/.*/", sizeof (agg->ident.plugin));
536   sstrncpy (agg->ident.plugin_instance, "/.*/",
537       sizeof (agg->ident.plugin_instance));
538   sstrncpy (agg->ident.type, "/.*/", sizeof (agg->ident.type));
539   sstrncpy (agg->ident.type_instance, "/.*/",
540       sizeof (agg->ident.type_instance));
541
542   for (i = 0; i < ci->children_num; i++)
543   {
544     oconfig_item_t *child = ci->children + i;
545
546     if (strcasecmp ("Host", child->key) == 0)
547       cf_util_get_string_buffer (child, agg->ident.host,
548           sizeof (agg->ident.host));
549     else if (strcasecmp ("Plugin", child->key) == 0)
550       cf_util_get_string_buffer (child, agg->ident.plugin,
551           sizeof (agg->ident.plugin));
552     else if (strcasecmp ("PluginInstance", child->key) == 0)
553       cf_util_get_string_buffer (child, agg->ident.plugin_instance,
554           sizeof (agg->ident.plugin_instance));
555     else if (strcasecmp ("Type", child->key) == 0)
556       cf_util_get_string_buffer (child, agg->ident.type,
557           sizeof (agg->ident.type));
558     else if (strcasecmp ("TypeInstance", child->key) == 0)
559       cf_util_get_string_buffer (child, agg->ident.type_instance,
560           sizeof (agg->ident.type_instance));
561     else if (strcasecmp ("SetHost", child->key) == 0)
562       cf_util_get_string (child, &agg->set_host);
563     else if (strcasecmp ("SetPlugin", child->key) == 0)
564       cf_util_get_string (child, &agg->set_plugin);
565     else if (strcasecmp ("SetPluginInstance", child->key) == 0)
566       cf_util_get_string (child, &agg->set_plugin_instance);
567     else if (strcasecmp ("SetTypeInstance", child->key) == 0)
568       cf_util_get_string (child, &agg->set_type_instance);
569     else if (strcasecmp ("GroupBy", child->key) == 0)
570       agg_config_handle_group_by (child, agg);
571     else if (strcasecmp ("CalculateNum", child->key) == 0)
572       cf_util_get_boolean (child, &agg->calc_num);
573     else if (strcasecmp ("CalculateSum", child->key) == 0)
574       cf_util_get_boolean (child, &agg->calc_sum);
575     else if (strcasecmp ("CalculateAverage", child->key) == 0)
576       cf_util_get_boolean (child, &agg->calc_average);
577     else if (strcasecmp ("CalculateMinimum", child->key) == 0)
578       cf_util_get_boolean (child, &agg->calc_min);
579     else if (strcasecmp ("CalculateMaximum", child->key) == 0)
580       cf_util_get_boolean (child, &agg->calc_max);
581     else if (strcasecmp ("CalculateStddev", child->key) == 0)
582       cf_util_get_boolean (child, &agg->calc_stddev);
583     else
584       WARNING ("aggregation plugin: The \"%s\" key is not allowed inside "
585           "<Aggregation /> blocks and will be ignored.", child->key);
586   }
587
588   if (agg_is_regex (agg->ident.host))
589     agg->regex_fields |= LU_GROUP_BY_HOST;
590   if (agg_is_regex (agg->ident.plugin))
591     agg->regex_fields |= LU_GROUP_BY_PLUGIN;
592   if (agg_is_regex (agg->ident.plugin_instance))
593     agg->regex_fields |= LU_GROUP_BY_PLUGIN_INSTANCE;
594   if (agg_is_regex (agg->ident.type_instance))
595     agg->regex_fields |= LU_GROUP_BY_TYPE_INSTANCE;
596
597   /* Sanity checking */
598   is_valid = 1;
599   if (strcmp ("/.*/", agg->ident.type) == 0) /* {{{ */
600   {
601     ERROR ("aggregation plugin: It appears you did not specify the required "
602         "\"Type\" option in this aggregation. "
603         "(Host \"%s\", Plugin \"%s\", PluginInstance \"%s\", "
604         "Type \"%s\", TypeInstance \"%s\")",
605         agg->ident.host, agg->ident.plugin, agg->ident.plugin_instance,
606         agg->ident.type, agg->ident.type_instance);
607     is_valid = 0;
608   }
609   else if (strchr (agg->ident.type, '/') != NULL)
610   {
611     ERROR ("aggregation plugin: The \"Type\" may not contain the '/' "
612         "character. Especially, it may not be a regex. The current "
613         "value is \"%s\".", agg->ident.type);
614     is_valid = 0;
615   } /* }}} */
616
617   /* Check that there is at least one regex field without a grouping. {{{ */
618   if ((agg->regex_fields & ~agg->group_by) == 0)
619   {
620     ERROR ("aggregation plugin: An aggregation must contain at least one "
621         "wildcard. This is achieved by leaving at least one of the \"Host\", "
622         "\"Plugin\", \"PluginInstance\" and \"TypeInstance\" options blank "
623         "or using a regular expression and not grouping by that field. "
624         "(Host \"%s\", Plugin \"%s\", PluginInstance \"%s\", "
625         "Type \"%s\", TypeInstance \"%s\")",
626         agg->ident.host, agg->ident.plugin, agg->ident.plugin_instance,
627         agg->ident.type, agg->ident.type_instance);
628     is_valid = 0;
629   } /* }}} */
630
631   /* Check that all grouping fields are regular expressions. {{{ */
632   if (agg->group_by & ~agg->regex_fields)
633   {
634     ERROR ("aggregation plugin: Only wildcard fields (fields for which a "
635         "regular expression is configured or which are left blank) can be "
636         "specified in the \"GroupBy\" option. "
637         "(Host \"%s\", Plugin \"%s\", PluginInstance \"%s\", "
638         "Type \"%s\", TypeInstance \"%s\")",
639         agg->ident.host, agg->ident.plugin, agg->ident.plugin_instance,
640         agg->ident.type, agg->ident.type_instance);
641     is_valid = 0;
642   } /* }}} */
643
644   if (!agg->calc_num && !agg->calc_sum && !agg->calc_average /* {{{ */
645       && !agg->calc_min && !agg->calc_max && !agg->calc_stddev)
646   {
647     ERROR ("aggregation plugin: No aggregation function has been specified. "
648         "Without this, I don't know what I should be calculating. "
649         "(Host \"%s\", Plugin \"%s\", PluginInstance \"%s\", "
650         "Type \"%s\", TypeInstance \"%s\")",
651         agg->ident.host, agg->ident.plugin, agg->ident.plugin_instance,
652         agg->ident.type, agg->ident.type_instance);
653     is_valid = 0;
654   } /* }}} */
655
656   if (!is_valid) /* {{{ */
657   {
658     sfree (agg);
659     return (-1);
660   } /* }}} */
661
662   status = lookup_add (lookup, &agg->ident, agg->group_by, agg);
663   if (status != 0)
664   {
665     ERROR ("aggregation plugin: lookup_add failed with status %i.", status);
666     sfree (agg);
667     return (-1);
668   }
669
670   DEBUG ("aggregation plugin: Successfully added aggregation: "
671       "(Host \"%s\", Plugin \"%s\", PluginInstance \"%s\", "
672       "Type \"%s\", TypeInstance \"%s\")",
673       agg->ident.host, agg->ident.plugin, agg->ident.plugin_instance,
674       agg->ident.type, agg->ident.type_instance);
675   return (0);
676 } /* }}} int agg_config_aggregation */
677
678 static int agg_config (oconfig_item_t *ci) /* {{{ */
679 {
680   int i;
681
682   pthread_mutex_lock (&agg_instance_list_lock);
683
684   if (lookup == NULL)
685   {
686     lookup = lookup_create (agg_lookup_class_callback,
687         agg_lookup_obj_callback,
688         agg_lookup_free_class_callback,
689         agg_lookup_free_obj_callback);
690     if (lookup == NULL)
691     {
692       pthread_mutex_unlock (&agg_instance_list_lock);
693       ERROR ("aggregation plugin: lookup_create failed.");
694       return (-1);
695     }
696   }
697
698   for (i = 0; i < ci->children_num; i++)
699   {
700     oconfig_item_t *child = ci->children + i;
701
702     if (strcasecmp ("Aggregation", child->key) == 0)
703       agg_config_aggregation (child);
704     else
705       WARNING ("aggregation plugin: The \"%s\" key is not allowed inside "
706           "<Plugin aggregation /> blocks and will be ignored.", child->key);
707   }
708
709   pthread_mutex_unlock (&agg_instance_list_lock);
710
711   return (0);
712 } /* }}} int agg_config */
713
714 static int agg_read (void) /* {{{ */
715 {
716   agg_instance_t *this;
717   cdtime_t t;
718   int success;
719
720   t = cdtime ();
721   success = 0;
722
723   pthread_mutex_lock (&agg_instance_list_lock);
724
725   /* agg_instance_list_head only holds data, after the "write" callback has
726    * been called with a matching value list at least once. So on startup,
727    * there's a race between the aggregations read() and write() callback. If
728    * the read() callback is called first, agg_instance_list_head is NULL and
729    * "success" may be zero. This is expected and should not result in an error.
730    * Therefore we need to handle this case separately. */
731   if (agg_instance_list_head == NULL)
732   {
733     pthread_mutex_unlock (&agg_instance_list_lock);
734     return (0);
735   }
736
737   for (this = agg_instance_list_head; this != NULL; this = this->next)
738   {
739     int status;
740
741     status = agg_instance_read (this, t);
742     if (status != 0)
743       WARNING ("aggregation plugin: Reading an aggregation instance "
744           "failed with status %i.", status);
745     else
746       success++;
747   }
748
749   pthread_mutex_unlock (&agg_instance_list_lock);
750
751   return ((success > 0) ? 0 : -1);
752 } /* }}} int agg_read */
753
754 static int agg_write (data_set_t const *ds, value_list_t const *vl, /* {{{ */
755     __attribute__((unused)) user_data_t *user_data)
756 {
757   _Bool created_by_aggregation = 0;
758   int status;
759
760   /* Ignore values that were created by the aggregation plugin to avoid weird
761    * effects. */
762   (void) meta_data_get_boolean (vl->meta, "aggregation:created",
763       &created_by_aggregation);
764   if (created_by_aggregation)
765     return (0);
766
767   if (lookup == NULL)
768     status = ENOENT;
769   else
770   {
771     status = lookup_search (lookup, ds, vl);
772     if (status > 0)
773       status = 0;
774   }
775
776   return (status);
777 } /* }}} int agg_write */
778
779 void module_register (void)
780 {
781   plugin_register_complex_config ("aggregation", agg_config);
782   plugin_register_read ("aggregation", agg_read);
783   plugin_register_write ("aggregation", agg_write, /* user_data = */ NULL);
784 }
785
786 /* vim: set sw=2 sts=2 tw=78 et fdm=marker : */