From: Florian Forster Date: Mon, 12 Nov 2012 07:28:16 +0000 (+0100) Subject: Merge branch 'collectd-5.1' X-Git-Tag: collectd-5.2.0~21 X-Git-Url: https://git.octo.it/?a=commitdiff_plain;h=e2bdca73d28ff16ab23af1d8eec984badb1a2513;hp=3066976a238c67c0f6ac874d3d08394a4e2a247f;p=collectd.git Merge branch 'collectd-5.1' --- diff --git a/AUTHORS b/AUTHORS index 90560e34..78dbad14 100644 --- a/AUTHORS +++ b/AUTHORS @@ -200,6 +200,9 @@ Sven Trenkel - netapp plugin. - python plugin. +Thomas Meson + - Graphite support for the AMQP plugin. + Tomasz Pala - conntrack plugin. diff --git a/configure.in b/configure.in index eff8d611..4a159767 100644 --- a/configure.in +++ b/configure.in @@ -4636,11 +4636,13 @@ fi if test "x$with_perfstat" = "xyes" then plugin_cpu="yes" + plugin_contextswitch="yes" plugin_disk="yes" plugin_memory="yes" plugin_swap="yes" plugin_interface="yes" plugin_load="yes" + plugin_uptime="yes" fi if test "x$with_procinfo" = "xyes" @@ -4850,6 +4852,7 @@ AC_ARG_ENABLE([all-plugins], m4_divert_once([HELP_ENABLE], []) +AC_PLUGIN([aggregation], [yes], [Aggregation plugin]) AC_PLUGIN([amqp], [$with_librabbitmq], [AMQP output plugin]) AC_PLUGIN([apache], [$with_libcurl], [Apache httpd statistics]) AC_PLUGIN([apcups], [yes], [Statistics of UPSes by APC]) @@ -5182,6 +5185,7 @@ Configuration: perl . . . . . . . . $with_perl_bindings Modules: + aggregation . . . . . $enable_aggregation amqp . . . . . . . $enable_amqp apache . . . . . . . $enable_apache apcups . . . . . . . $enable_apcups diff --git a/contrib/README b/contrib/README index bc1fe9ff..1ebf1f14 100644 --- a/contrib/README +++ b/contrib/README @@ -101,3 +101,8 @@ solaris-smf ----------- Manifest file for the Solaris SMF system and detailed information on how to register collectd as a service with this system. + +collectd.service +---------------- + Service file for systemd. Please ship this file as + /lib/systemd/system/collectd.service in any linux package of collectd. diff --git a/contrib/collectd.service b/contrib/collectd.service new file mode 100644 index 00000000..ee4d596d --- /dev/null +++ b/contrib/collectd.service @@ -0,0 +1,15 @@ +[Unit] +Description=statistics collection daemon +Documentation=man:collectd(1) +After=local-fs.target network.target +Requires=local-fs.target network.target + +[Service] +ExecStart=/usr/sbin/collectd -C /etc/collectd/collectd.conf -f +Restart=always +RestartSec=10 +StandardOutput=syslog +StandardError=syslog + +[Install] +WantedBy=multi-user.target diff --git a/src/Makefile.am b/src/Makefile.am index 1891f7a9..58ab17e5 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -119,10 +119,21 @@ pkglib_LTLIBRARIES = BUILT_SOURCES = CLEANFILES = +if BUILD_PLUGIN_AGGREGATION +pkglib_LTLIBRARIES += aggregation.la +aggregation_la_SOURCES = aggregation.c \ + utils_vl_lookup.c utils_vl_lookup.h +aggregation_la_LDFLAGS = -module -avoid-version +aggregation_la_LIBADD = +collectd_LDADD += "-dlopen" aggregation.la +collectd_DEPENDENCIES += aggregation.la +endif + if BUILD_PLUGIN_AMQP pkglib_LTLIBRARIES += amqp.la amqp_la_SOURCES = amqp.c \ utils_cmd_putval.c utils_cmd_putval.h \ + utils_format_graphite.c utils_format_graphite.h \ utils_format_json.c utils_format_json.h amqp_la_LDFLAGS = -module -avoid-version $(BUILD_WITH_LIBRABBITMQ_LDFLAGS) amqp_la_CPPFLAGS = $(AM_CPPFLAGS) $(BUILD_WITH_LIBRABBITMQ_CPPFLAGS) @@ -212,6 +223,10 @@ if BUILD_PLUGIN_CONTEXTSWITCH pkglib_LTLIBRARIES += contextswitch.la contextswitch_la_SOURCES = contextswitch.c contextswitch_la_LDFLAGS = -module -avoid-version +contextswitch_la_LIBADD = +if BUILD_WITH_PERFSTAT +contextswitch_la_LIBADD += -lperfstat +endif collectd_LDADD += "-dlopen" contextswitch.la collectd_DEPENDENCIES += contextswitch.la endif @@ -1191,6 +1206,9 @@ uptime_la_LIBADD = if BUILD_WITH_LIBKSTAT uptime_la_LIBADD += -lkstat endif +if BUILD_WITH_PERFSTAT +uptime_la_LIBADD += -lperfstat +endif collectd_LDADD += "-dlopen" uptime.la collectd_DEPENDENCIES += uptime.la endif @@ -1256,7 +1274,8 @@ endif if BUILD_PLUGIN_WRITE_GRAPHITE pkglib_LTLIBRARIES += write_graphite.la write_graphite_la_SOURCES = write_graphite.c \ - utils_format_json.c utils_format_json.h + utils_format_graphite.c utils_format_graphite.h \ + utils_format_json.c utils_format_json.h write_graphite_la_LDFLAGS = -module -avoid-version collectd_LDADD += "-dlopen" write_graphite.la collectd_DEPENDENCIES += write_graphite.la @@ -1390,3 +1409,16 @@ uninstall-hook: rm -f $(DESTDIR)$(pkgdatadir)/types.db; rm -f $(DESTDIR)$(sysconfdir)/collectd.conf rm -f $(DESTDIR)$(pkgdatadir)/postgresql_default.conf; + +if BUILD_FEATURE_DEBUG +bin_PROGRAMS += utils_vl_lookup_test +utils_vl_lookup_test_SOURCES = utils_vl_lookup_test.c \ + utils_vl_lookup.h utils_vl_lookup.c \ + utils_avltree.c utils_avltree.h \ + common.h + +utils_vl_lookup_test_CPPFLAGS = $(AM_CPPFLAGS) $(LTDLINCL) -DBUILD_TEST=1 +utils_vl_lookup_test_CFLAGS = $(AM_CFLAGS) +utils_vl_lookup_test_LDFLAGS = -export-dynamic +utils_vl_lookup_test_LDADD = +endif diff --git a/src/aggregation.c b/src/aggregation.c new file mode 100644 index 00000000..db33c177 --- /dev/null +++ b/src/aggregation.c @@ -0,0 +1,684 @@ +/** + * collectd - src/aggregation.c + * Copyright (C) 2012 Florian Forster + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + * + * Authors: + * Florian Forster + **/ + +#include "collectd.h" +#include "plugin.h" +#include "common.h" +#include "configfile.h" +#include "meta_data.h" +#include "utils_cache.h" /* for uc_get_rate() */ +#include "utils_vl_lookup.h" + +#include + +struct aggregation_s /* {{{ */ +{ + identifier_t ident; + + _Bool calc_num; + _Bool calc_sum; + _Bool calc_average; + _Bool calc_min; + _Bool calc_max; + _Bool calc_stddev; +}; /* }}} */ +typedef struct aggregation_s aggregation_t; + +struct agg_instance_s; +typedef struct agg_instance_s agg_instance_t; +struct agg_instance_s /* {{{ */ +{ + pthread_mutex_t lock; + identifier_t ident; + + int ds_type; + + derive_t num; + gauge_t sum; + gauge_t squares_sum; + + gauge_t min; + gauge_t max; + + rate_to_value_state_t *state_num; + rate_to_value_state_t *state_sum; + rate_to_value_state_t *state_average; + rate_to_value_state_t *state_min; + rate_to_value_state_t *state_max; + rate_to_value_state_t *state_stddev; + + agg_instance_t *next; +}; /* }}} */ + +static lookup_t *lookup = NULL; + +static pthread_mutex_t agg_instance_list_lock = PTHREAD_MUTEX_INITIALIZER; +static agg_instance_t *agg_instance_list_head = NULL; + +static void agg_destroy (aggregation_t *agg) /* {{{ */ +{ + sfree (agg); +} /* }}} void agg_destroy */ + +/* Frees all dynamically allocated memory within the instance. */ +static void agg_instance_destroy (agg_instance_t *inst) /* {{{ */ +{ + if (inst == NULL) + return; + + /* Remove this instance from the global list of instances. */ + pthread_mutex_lock (&agg_instance_list_lock); + if (agg_instance_list_head == inst) + agg_instance_list_head = inst->next; + else if (agg_instance_list_head != NULL) + { + agg_instance_t *prev = agg_instance_list_head; + while ((prev != NULL) && (prev->next != inst)) + prev = prev->next; + if (prev != NULL) + prev->next = inst->next; + } + pthread_mutex_unlock (&agg_instance_list_lock); + + sfree (inst->state_num); + sfree (inst->state_sum); + sfree (inst->state_average); + sfree (inst->state_min); + sfree (inst->state_max); + sfree (inst->state_stddev); + + memset (inst, 0, sizeof (*inst)); + inst->ds_type = -1; + inst->min = NAN; + inst->max = NAN; +} /* }}} void agg_instance_destroy */ + +/* Create a new aggregation instance. */ +static agg_instance_t *agg_instance_create (data_set_t const *ds, /* {{{ */ + value_list_t const *vl, aggregation_t *agg) +{ + agg_instance_t *inst; + + DEBUG ("aggregation plugin: Creating new instance."); + + inst = malloc (sizeof (*inst)); + if (inst == NULL) + { + ERROR ("aggregation plugin: malloc() failed."); + return (NULL); + } + memset (inst, 0, sizeof (*inst)); + pthread_mutex_init (&inst->lock, /* attr = */ NULL); + + inst->ds_type = ds->ds[0].type; + +#define COPY_FIELD(fld) do { \ + sstrncpy (inst->ident.fld, \ + LU_IS_ANY (agg->ident.fld) ? vl->fld : agg->ident.fld, \ + sizeof (inst->ident.fld)); \ +} while (0) + + COPY_FIELD (host); + COPY_FIELD (plugin); + COPY_FIELD (plugin_instance); + COPY_FIELD (type); + COPY_FIELD (type_instance); + +#undef COPY_FIELD + + inst->min = NAN; + inst->max = NAN; + +#define INIT_STATE(field) do { \ + inst->state_ ## field = NULL; \ + if (agg->calc_ ## field) { \ + inst->state_ ## field = malloc (sizeof (*inst->state_ ## field)); \ + if (inst->state_ ## field == NULL) { \ + agg_instance_destroy (inst); \ + ERROR ("aggregation plugin: malloc() failed."); \ + return (NULL); \ + } \ + memset (inst->state_ ## field, 0, sizeof (*inst->state_ ## field)); \ + } \ +} while (0) + + INIT_STATE (num); + INIT_STATE (sum); + INIT_STATE (average); + INIT_STATE (min); + INIT_STATE (max); + INIT_STATE (stddev); + +#undef INIT_STATE + + pthread_mutex_lock (&agg_instance_list_lock); + inst->next = agg_instance_list_head; + agg_instance_list_head = inst; + pthread_mutex_unlock (&agg_instance_list_lock); + + return (inst); +} /* }}} agg_instance_t *agg_instance_create */ + +/* Update the num, sum, min, max, ... fields of the aggregation instance, if + * the rate of the value list is available. Value lists with more than one data + * source are not supported and will return an error. Returns zero on success + * and non-zero otherwise. */ +static int agg_instance_update (agg_instance_t *inst, /* {{{ */ + data_set_t const *ds, value_list_t const *vl) +{ + gauge_t *rate; + + if (ds->ds_num != 1) + { + ERROR ("aggregation plugin: The \"%s\" type (data set) has more than one " + "data source. This is currently not supported by this plugin. " + "Sorry.", ds->type); + return (EINVAL); + } + + rate = uc_get_rate (ds, vl); + if (rate == NULL) + { + char ident[6 * DATA_MAX_NAME_LEN]; + FORMAT_VL (ident, sizeof (ident), vl); + ERROR ("aggregation plugin: Unable to read the current rate of \"%s\".", + ident); + return (ENOENT); + } + + if (isnan (rate[0])) + { + sfree (rate); + return (0); + } + + pthread_mutex_lock (&inst->lock); + + inst->num++; + inst->sum += rate[0]; + inst->squares_sum += (rate[0] * rate[0]); + + if (isnan (inst->min) || (inst->min > rate[0])) + inst->min = rate[0]; + if (isnan (inst->max) || (inst->max < rate[0])) + inst->max = rate[0]; + + pthread_mutex_unlock (&inst->lock); + + sfree (rate); + return (0); +} /* }}} int agg_instance_update */ + +static int agg_instance_read_func (agg_instance_t *inst, /* {{{ */ + char const *func, gauge_t rate, rate_to_value_state_t *state, + value_list_t *vl, char const *pi_prefix, cdtime_t t) +{ + value_t v; + int status; + + if (pi_prefix[0] != 0) + ssnprintf (vl->plugin_instance, sizeof (vl->plugin_instance), "%s-%s", + pi_prefix, func); + else + sstrncpy (vl->plugin_instance, func, sizeof (vl->plugin_instance)); + + memset (&v, 0, sizeof (v)); + status = rate_to_value (&v, rate, state, inst->ds_type, t); + if (status != 0) + { + /* If this is the first iteration and rate_to_value() was asked to return a + * COUNTER or a DERIVE, it will return EAGAIN. Catch this and handle + * gracefully. */ + if (status == EAGAIN) + return (0); + + WARNING ("aggregation plugin: rate_to_value failed with status %i.", + status); + return (-1); + } + + vl->values = &v; + vl->values_len = 1; + + plugin_dispatch_values_secure (vl); + + vl->values = NULL; + vl->values_len = 0; + + return (0); +} /* }}} int agg_instance_read_func */ + +static int agg_instance_read (agg_instance_t *inst, cdtime_t t) /* {{{ */ +{ + value_list_t vl = VALUE_LIST_INIT; + char pi_prefix[DATA_MAX_NAME_LEN]; + + /* Pre-set all the fields in the value list that will not change per + * aggregation type (sum, average, ...). The struct will be re-used and must + * therefore be dispatched using the "secure" function. */ + + vl.time = t; + vl.interval = 0; + + vl.meta = meta_data_create (); + if (vl.meta == NULL) + { + ERROR ("aggregation plugin: meta_data_create failed."); + return (-1); + } + meta_data_add_boolean (vl.meta, "aggregation:created", 1); + + if (LU_IS_ALL (inst->ident.host)) + sstrncpy (vl.host, "global", sizeof (vl.host)); + else + sstrncpy (vl.host, inst->ident.host, sizeof (vl.host)); + + sstrncpy (vl.plugin, "aggregation", sizeof (vl.plugin)); + + if (LU_IS_ALL (inst->ident.plugin)) + { + if (LU_IS_ALL (inst->ident.plugin_instance)) + sstrncpy (pi_prefix, "", sizeof (pi_prefix)); + else + sstrncpy (pi_prefix, inst->ident.plugin_instance, sizeof (pi_prefix)); + } + else + { + if (LU_IS_ALL (inst->ident.plugin_instance)) + sstrncpy (pi_prefix, inst->ident.plugin, sizeof (pi_prefix)); + else + ssnprintf (pi_prefix, sizeof (pi_prefix), + "%s-%s", inst->ident.plugin, inst->ident.plugin_instance); + } + + sstrncpy (vl.type, inst->ident.type, sizeof (vl.type)); + + if (!LU_IS_ALL (inst->ident.type_instance)) + sstrncpy (vl.type_instance, inst->ident.type_instance, + sizeof (vl.type_instance)); + +#define READ_FUNC(func, rate) do { \ + if (inst->state_ ## func != NULL) { \ + agg_instance_read_func (inst, #func, rate, \ + inst->state_ ## func, &vl, pi_prefix, t); \ + } \ +} while (0) + + pthread_mutex_lock (&inst->lock); + + READ_FUNC (num, (gauge_t) inst->num); + + /* All other aggregations are only defined when there have been any values + * at all. */ + if (inst->num > 0) + { + READ_FUNC (sum, inst->sum); + READ_FUNC (average, (inst->sum / ((gauge_t) inst->num))); + READ_FUNC (min, inst->min); + READ_FUNC (max, inst->max); + READ_FUNC (stddev, sqrt((((gauge_t) inst->num) * inst->squares_sum) + - (inst->sum * inst->sum)) / ((gauge_t) inst->num)); + } + + /* Reset internal state. */ + inst->num = 0; + inst->sum = 0.0; + inst->squares_sum = 0.0; + inst->min = NAN; + inst->max = NAN; + + pthread_mutex_unlock (&inst->lock); + + meta_data_destroy (vl.meta); + vl.meta = NULL; + + return (0); +} /* }}} int agg_instance_read */ + +/* lookup_class_callback_t for utils_vl_lookup */ +static void *agg_lookup_class_callback ( /* {{{ */ + __attribute__((unused)) data_set_t const *ds, + value_list_t const *vl, void *user_class) +{ + return (agg_instance_create (ds, vl, (aggregation_t *) user_class)); +} /* }}} void *agg_class_callback */ + +/* lookup_obj_callback_t for utils_vl_lookup */ +static int agg_lookup_obj_callback (data_set_t const *ds, /* {{{ */ + value_list_t const *vl, + __attribute__((unused)) void *user_class, + void *user_obj) +{ + return (agg_instance_update ((agg_instance_t *) user_obj, ds, vl)); +} /* }}} int agg_lookup_obj_callback */ + +/* lookup_free_class_callback_t for utils_vl_lookup */ +static void agg_lookup_free_class_callback (void *user_class) /* {{{ */ +{ + agg_destroy ((aggregation_t *) user_class); +} /* }}} void agg_lookup_free_class_callback */ + +/* lookup_free_obj_callback_t for utils_vl_lookup */ +static void agg_lookup_free_obj_callback (void *user_obj) /* {{{ */ +{ + agg_instance_destroy ((agg_instance_t *) user_obj); +} /* }}} void agg_lookup_free_obj_callback */ + +/* + * + * + * Plugin "cpu" + * Type "cpu" + * + * GroupBy Host + * GroupBy TypeInstance + * + * CalculateNum true + * CalculateSum true + * CalculateAverage true + * CalculateMinimum true + * CalculateMaximum true + * CalculateStddev true + * + * + */ +static int agg_config_handle_group_by (oconfig_item_t const *ci, /* {{{ */ + aggregation_t *agg) +{ + int i; + + for (i = 0; i < ci->values_num; i++) + { + char const *value; + + if (ci->values[i].type != OCONFIG_TYPE_STRING) + { + ERROR ("aggregation plugin: Argument %i of the \"GroupBy\" option " + "is not a string.", i + 1); + continue; + } + + value = ci->values[i].value.string; + + if (strcasecmp ("Host", value) == 0) + sstrncpy (agg->ident.host, LU_ANY, sizeof (agg->ident.host)); + else if (strcasecmp ("Plugin", value) == 0) + sstrncpy (agg->ident.plugin, LU_ANY, sizeof (agg->ident.plugin)); + else if (strcasecmp ("PluginInstance", value) == 0) + sstrncpy (agg->ident.plugin_instance, LU_ANY, + sizeof (agg->ident.plugin_instance)); + else if (strcasecmp ("TypeInstance", value) == 0) + sstrncpy (agg->ident.type_instance, LU_ANY, sizeof (agg->ident.type_instance)); + else if (strcasecmp ("Type", value) == 0) + ERROR ("aggregation plugin: Grouping by type is not supported."); + else + WARNING ("aggregation plugin: The \"%s\" argument to the \"GroupBy\" " + "option is invalid and will be ignored.", value); + } /* for (ci->values) */ + + return (0); +} /* }}} int agg_config_handle_group_by */ + +static int agg_config_aggregation (oconfig_item_t *ci) /* {{{ */ +{ + aggregation_t *agg; + _Bool is_valid; + int status; + int i; + + agg = malloc (sizeof (*agg)); + if (agg == NULL) + { + ERROR ("aggregation plugin: malloc failed."); + return (-1); + } + memset (agg, 0, sizeof (*agg)); + + sstrncpy (agg->ident.host, LU_ALL, sizeof (agg->ident.host)); + sstrncpy (agg->ident.plugin, LU_ALL, sizeof (agg->ident.plugin)); + sstrncpy (agg->ident.plugin_instance, LU_ALL, + sizeof (agg->ident.plugin_instance)); + sstrncpy (agg->ident.type, LU_ALL, sizeof (agg->ident.type)); + sstrncpy (agg->ident.type_instance, LU_ALL, + sizeof (agg->ident.type_instance)); + + for (i = 0; i < ci->children_num; i++) + { + oconfig_item_t *child = ci->children + i; + + if (strcasecmp ("Host", child->key) == 0) + cf_util_get_string_buffer (child, agg->ident.host, + sizeof (agg->ident.host)); + else if (strcasecmp ("Plugin", child->key) == 0) + cf_util_get_string_buffer (child, agg->ident.plugin, + sizeof (agg->ident.plugin)); + else if (strcasecmp ("PluginInstance", child->key) == 0) + cf_util_get_string_buffer (child, agg->ident.plugin_instance, + sizeof (agg->ident.plugin_instance)); + else if (strcasecmp ("Type", child->key) == 0) + cf_util_get_string_buffer (child, agg->ident.type, + sizeof (agg->ident.type)); + else if (strcasecmp ("TypeInstance", child->key) == 0) + cf_util_get_string_buffer (child, agg->ident.type_instance, + sizeof (agg->ident.type_instance)); + else if (strcasecmp ("GroupBy", child->key) == 0) + agg_config_handle_group_by (child, agg); + else if (strcasecmp ("CalculateNum", child->key) == 0) + cf_util_get_boolean (child, &agg->calc_num); + else if (strcasecmp ("CalculateSum", child->key) == 0) + cf_util_get_boolean (child, &agg->calc_sum); + else if (strcasecmp ("CalculateAverage", child->key) == 0) + cf_util_get_boolean (child, &agg->calc_average); + else if (strcasecmp ("CalculateMinimum", child->key) == 0) + cf_util_get_boolean (child, &agg->calc_min); + else if (strcasecmp ("CalculateMaximum", child->key) == 0) + cf_util_get_boolean (child, &agg->calc_max); + else if (strcasecmp ("CalculateStddev", child->key) == 0) + cf_util_get_boolean (child, &agg->calc_stddev); + else + WARNING ("aggregation plugin: The \"%s\" key is not allowed inside " + " blocks and will be ignored.", child->key); + } + + /* Sanity checking */ + is_valid = 1; + if (LU_IS_ALL (agg->ident.type)) /* {{{ */ + { + ERROR ("aggregation plugin: It appears you did not specify the required " + "\"Type\" option in this aggregation. " + "(Host \"%s\", Plugin \"%s\", PluginInstance \"%s\", " + "Type \"%s\", TypeInstance \"%s\")", + agg->ident.host, agg->ident.plugin, agg->ident.plugin_instance, + agg->ident.type, agg->ident.type_instance); + is_valid = 0; + } + else if (strchr (agg->ident.type, '/') != NULL) + { + ERROR ("aggregation plugin: The \"Type\" may not contain the '/' " + "character. Especially, it may not be a wildcard. The current " + "value is \"%s\".", agg->ident.type); + is_valid = 0; + } /* }}} */ + + if (!LU_IS_ALL (agg->ident.host) /* {{{ */ + && !LU_IS_ALL (agg->ident.plugin) + && !LU_IS_ALL (agg->ident.plugin_instance) + && !LU_IS_ALL (agg->ident.type_instance)) + { + ERROR ("aggregation plugin: An aggregation must contain at least one " + "wildcard. This is achieved by leaving at least one of the \"Host\", " + "\"Plugin\", \"PluginInstance\" and \"TypeInstance\" options blank " + "and not grouping by that field. " + "(Host \"%s\", Plugin \"%s\", PluginInstance \"%s\", " + "Type \"%s\", TypeInstance \"%s\")", + agg->ident.host, agg->ident.plugin, agg->ident.plugin_instance, + agg->ident.type, agg->ident.type_instance); + is_valid = 0; + } /* }}} */ + + if (!agg->calc_num && !agg->calc_sum && !agg->calc_average /* {{{ */ + && !agg->calc_min && !agg->calc_max && !agg->calc_stddev) + { + ERROR ("aggregation plugin: No aggregation function has been specified. " + "Without this, I don't know what I should be calculating. " + "(Host \"%s\", Plugin \"%s\", PluginInstance \"%s\", " + "Type \"%s\", TypeInstance \"%s\")", + agg->ident.host, agg->ident.plugin, agg->ident.plugin_instance, + agg->ident.type, agg->ident.type_instance); + is_valid = 0; + } /* }}} */ + + if (!is_valid) /* {{{ */ + { + sfree (agg); + return (-1); + } /* }}} */ + + status = lookup_add (lookup, &agg->ident, agg); + if (status != 0) + { + ERROR ("aggregation plugin: lookup_add failed with status %i.", status); + sfree (agg); + return (-1); + } + + DEBUG ("aggregation plugin: Successfully added aggregation: " + "(Host \"%s\", Plugin \"%s\", PluginInstance \"%s\", " + "Type \"%s\", TypeInstance \"%s\")", + agg->ident.host, agg->ident.plugin, agg->ident.plugin_instance, + agg->ident.type, agg->ident.type_instance); + return (0); +} /* }}} int agg_config_aggregation */ + +static int agg_config (oconfig_item_t *ci) /* {{{ */ +{ + int i; + + pthread_mutex_lock (&agg_instance_list_lock); + + if (lookup == NULL) + { + lookup = lookup_create (agg_lookup_class_callback, + agg_lookup_obj_callback, + agg_lookup_free_class_callback, + agg_lookup_free_obj_callback); + if (lookup == NULL) + { + pthread_mutex_unlock (&agg_instance_list_lock); + ERROR ("aggregation plugin: lookup_create failed."); + return (-1); + } + } + + for (i = 0; i < ci->children_num; i++) + { + oconfig_item_t *child = ci->children + i; + + if (strcasecmp ("Aggregation", child->key) == 0) + agg_config_aggregation (child); + else + WARNING ("aggregation plugin: The \"%s\" key is not allowed inside " + " blocks and will be ignored.", child->key); + } + + pthread_mutex_unlock (&agg_instance_list_lock); + + return (0); +} /* }}} int agg_config */ + +static int agg_read (void) /* {{{ */ +{ + agg_instance_t *this; + cdtime_t t; + int success; + + t = cdtime (); + success = 0; + + pthread_mutex_lock (&agg_instance_list_lock); + + /* agg_instance_list_head only holds data, after the "write" callback has + * been called with a matching value list at least once. So on startup, + * there's a race between the aggregations read() and write() callback. If + * the read() callback is called first, agg_instance_list_head is NULL and + * "success" may be zero. This is expected and should not result in an error. + * Therefore we need to handle this case separately. */ + if (agg_instance_list_head == NULL) + { + pthread_mutex_unlock (&agg_instance_list_lock); + return (0); + } + + for (this = agg_instance_list_head; this != NULL; this = this->next) + { + int status; + + status = agg_instance_read (this, t); + if (status != 0) + WARNING ("aggregation plugin: Reading an aggregation instance " + "failed with status %i.", status); + else + success++; + } + + pthread_mutex_unlock (&agg_instance_list_lock); + + return ((success > 0) ? 0 : -1); +} /* }}} int agg_read */ + +static int agg_write (data_set_t const *ds, value_list_t const *vl, /* {{{ */ + __attribute__((unused)) user_data_t *user_data) +{ + _Bool created_by_aggregation = 0; + int status; + + /* Ignore values that were created by the aggregation plugin to avoid weird + * effects. */ + (void) meta_data_get_boolean (vl->meta, "aggregation:created", + &created_by_aggregation); + if (created_by_aggregation) + return (0); + + if (lookup == NULL) + status = ENOENT; + else + { + status = lookup_search (lookup, ds, vl); + if (status > 0) + status = 0; + } + + return (status); +} /* }}} int agg_write */ + +void module_register (void) +{ + plugin_register_complex_config ("aggregation", agg_config); + plugin_register_read ("aggregation", agg_read); + plugin_register_write ("aggregation", agg_write, /* user_data = */ NULL); +} + +/* vim: set sw=2 sts=2 tw=78 et fdm=marker : */ diff --git a/src/amqp.c b/src/amqp.c index 89284c81..c9e46c45 100644 --- a/src/amqp.c +++ b/src/amqp.c @@ -31,6 +31,7 @@ #include "plugin.h" #include "utils_cmd_putval.h" #include "utils_format_json.h" +#include "utils_format_graphite.h" #include @@ -42,8 +43,9 @@ #define CAMQP_DM_VOLATILE 1 #define CAMQP_DM_PERSISTENT 2 -#define CAMQP_FORMAT_COMMAND 1 -#define CAMQP_FORMAT_JSON 2 +#define CAMQP_FORMAT_COMMAND 1 +#define CAMQP_FORMAT_JSON 2 +#define CAMQP_FORMAT_GRAPHITE 3 #define CAMQP_CHANNEL 1 @@ -68,6 +70,10 @@ struct camqp_config_s uint8_t delivery_mode; _Bool store_rates; int format; + /* publish & graphite format only */ + char *prefix; + char *postfix; + char escape_char; /* subscribe only */ char *exchange_type; @@ -129,6 +135,9 @@ static void camqp_config_free (void *ptr) /* {{{ */ sfree (conf->exchange_type); sfree (conf->queue); sfree (conf->routing_key); + sfree (conf->prefix); + sfree (conf->postfix); + sfree (conf); } /* }}} void camqp_config_free */ @@ -699,6 +708,8 @@ static int camqp_write_locked (camqp_config_t *conf, /* {{{ */ props.content_type = amqp_cstring_bytes("text/collectd"); else if (conf->format == CAMQP_FORMAT_JSON) props.content_type = amqp_cstring_bytes("application/json"); + else if (conf->format == CAMQP_FORMAT_GRAPHITE) + props.content_type = amqp_cstring_bytes("text/graphite"); else assert (23 == 42); props.delivery_mode = conf->delivery_mode; @@ -777,6 +788,18 @@ static int camqp_write (const data_set_t *ds, const value_list_t *vl, /* {{{ */ format_json_value_list (buffer, &bfill, &bfree, ds, vl, conf->store_rates); format_json_finalize (buffer, &bfill, &bfree); } + else if (conf->format == CAMQP_FORMAT_GRAPHITE) + { + status = format_graphite (buffer, sizeof (buffer), ds, vl, + conf->prefix, conf->postfix, conf->escape_char, + conf->store_rates); + if (status != 0) + { + ERROR ("amqp plugin: format_graphite failed with status %i.", + status); + return (status); + } + } else { ERROR ("amqp plugin: Invalid format (%i).", conf->format); @@ -809,6 +832,8 @@ static int camqp_config_set_format (oconfig_item_t *ci, /* {{{ */ conf->format = CAMQP_FORMAT_COMMAND; else if (strcasecmp ("JSON", string) == 0) conf->format = CAMQP_FORMAT_JSON; + else if (strcasecmp ("Graphite", string) == 0) + conf->format = CAMQP_FORMAT_GRAPHITE; else { WARNING ("amqp plugin: Invalid format string: %s", @@ -849,6 +874,10 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */ /* publish only */ conf->delivery_mode = CAMQP_DM_VOLATILE; conf->store_rates = 0; + /* publish & graphite only */ + conf->prefix = NULL; + conf->postfix = NULL; + conf->escape_char = '_'; /* subscribe only */ conf->exchange_type = NULL; conf->queue = NULL; @@ -906,6 +935,20 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */ status = cf_util_get_boolean (child, &conf->store_rates); else if ((strcasecmp ("Format", child->key) == 0) && publish) status = camqp_config_set_format (child, conf); + else if ((strcasecmp ("GraphitePrefix", child->key) == 0) && publish) + status = cf_util_get_string (child, &conf->prefix); + else if ((strcasecmp ("GraphitePostfix", child->key) == 0) && publish) + status = cf_util_get_string (child, &conf->postfix); + else if ((strcasecmp ("GraphiteEscapeChar", child->key) == 0) && publish) + { + char *tmp_buff = NULL; + status = cf_util_get_string (child, &tmp_buff); + if (strlen (tmp_buff) > 1) + WARNING ("amqp plugin: The option \"GraphiteEscapeChar\" handles " + "only one character. Others will be ignored."); + conf->escape_char = tmp_buff[0]; + sfree (tmp_buff); + } else WARNING ("amqp plugin: Ignoring unknown " "configuration option \"%s\".", child->key); diff --git a/src/collectd.conf.in b/src/collectd.conf.in index f3ef6759..9f5dedcd 100644 --- a/src/collectd.conf.in +++ b/src/collectd.conf.in @@ -52,6 +52,7 @@ # to missing dependencies or because they have been deactivated explicitly. # ############################################################################## +#@BUILD_PLUGIN_AGGREGATION_TRUE@LoadPlugin aggregation #@BUILD_PLUGIN_AMQP_TRUE@LoadPlugin amqp #@BUILD_PLUGIN_APACHE_TRUE@LoadPlugin apache #@BUILD_PLUGIN_APCUPS_TRUE@LoadPlugin apcups @@ -161,6 +162,26 @@ # ription of those options is available in the collectd.conf(5) manual page. # ############################################################################## +# +# +# #Host "unspecified" +# Plugin "cpu" +# #PluginInstance "unspecified" +# Type "cpu" +# #TypeInstance "unspecified" +# +# GroupBy "Host" +# GroupBy "TypeInstance" +# +# CalculateNum false +# CalculateSum false +# CalculateAverage true +# CalculateMinimum false +# CalculateMaximum false +# CalculateStddev false +# +# + # # # Host "localhost" @@ -465,8 +486,10 @@ # # -# Host "127.0.0.1" -# Port "11211" +# +# Host "127.0.0.1" +# Port "11211" +# # # @@ -626,6 +649,7 @@ # Host "localhost" # Port 123 # ReverseLookups false +# IncludeUnitID true # # @@ -867,6 +891,7 @@ # # ReportByDevice false +# ReportBytes true # # diff --git a/src/collectd.conf.pod b/src/collectd.conf.pod index c025f949..f6f61c7e 100644 --- a/src/collectd.conf.pod +++ b/src/collectd.conf.pod @@ -193,12 +193,122 @@ C-Section. Which options exist depends on the plugin used. Some plugins require external configuration, too. The C, for example, required C to be configured in the webserver you're going to collect data from. These plugins are listed below as well, even if they don't -require any configuration within collectd's configfile. +require any configuration within collectd's configuration file. A list of all plugins and a short summary for each plugin can be found in the F file shipped with the sourcecode and hopefully binary packets as well. +=head2 Plugin C + +The I makes it possible to aggregate several values into +one using aggregation functions such as I, I, I and I. +This can be put to a wide variety of uses, e.g. average and total CPU +statistics for your entire fleet. + +The grouping is powerful but, as with many powerful tools, may be a bit +difficult to wrap your head around. The grouping will therefore be +demonstrated using an example: The average and sum of the CPU usage across +all CPUs of each host is to be calculated. + +To select all the affected values for our example, set C and +C. The other values are left unspecified, meaning "all values". The +I, I, I, I and I options +work as if they were specified in the C clause of an C