- netapp plugin.
- python plugin.
+Thomas Meson <zllak at hycik.org>
+ - Graphite support for the AMQP plugin.
+
Tomasz Pala <gotar at pld-linux.org>
- conntrack plugin.
if test "x$with_perfstat" = "xyes"
then
plugin_cpu="yes"
+ plugin_contextswitch="yes"
plugin_disk="yes"
plugin_memory="yes"
plugin_swap="yes"
plugin_interface="yes"
plugin_load="yes"
+ plugin_uptime="yes"
fi
if test "x$with_procinfo" = "xyes"
m4_divert_once([HELP_ENABLE], [])
+AC_PLUGIN([aggregation], [yes], [Aggregation plugin])
AC_PLUGIN([amqp], [$with_librabbitmq], [AMQP output plugin])
AC_PLUGIN([apache], [$with_libcurl], [Apache httpd statistics])
AC_PLUGIN([apcups], [yes], [Statistics of UPSes by APC])
perl . . . . . . . . $with_perl_bindings
Modules:
+ aggregation . . . . . $enable_aggregation
amqp . . . . . . . $enable_amqp
apache . . . . . . . $enable_apache
apcups . . . . . . . $enable_apcups
-----------
Manifest file for the Solaris SMF system and detailed information on how to
register collectd as a service with this system.
+
+collectd.service
+----------------
+ Service file for systemd. Please ship this file as
+ /lib/systemd/system/collectd.service in any linux package of collectd.
--- /dev/null
+[Unit]
+Description=statistics collection daemon
+Documentation=man:collectd(1)
+After=local-fs.target network.target
+Requires=local-fs.target network.target
+
+[Service]
+ExecStart=/usr/sbin/collectd -C /etc/collectd/collectd.conf -f
+Restart=always
+RestartSec=10
+StandardOutput=syslog
+StandardError=syslog
+
+[Install]
+WantedBy=multi-user.target
BUILT_SOURCES =
CLEANFILES =
+if BUILD_PLUGIN_AGGREGATION
+pkglib_LTLIBRARIES += aggregation.la
+aggregation_la_SOURCES = aggregation.c \
+ utils_vl_lookup.c utils_vl_lookup.h
+aggregation_la_LDFLAGS = -module -avoid-version
+aggregation_la_LIBADD =
+collectd_LDADD += "-dlopen" aggregation.la
+collectd_DEPENDENCIES += aggregation.la
+endif
+
if BUILD_PLUGIN_AMQP
pkglib_LTLIBRARIES += amqp.la
amqp_la_SOURCES = amqp.c \
utils_cmd_putval.c utils_cmd_putval.h \
+ utils_format_graphite.c utils_format_graphite.h \
utils_format_json.c utils_format_json.h
amqp_la_LDFLAGS = -module -avoid-version $(BUILD_WITH_LIBRABBITMQ_LDFLAGS)
amqp_la_CPPFLAGS = $(AM_CPPFLAGS) $(BUILD_WITH_LIBRABBITMQ_CPPFLAGS)
pkglib_LTLIBRARIES += contextswitch.la
contextswitch_la_SOURCES = contextswitch.c
contextswitch_la_LDFLAGS = -module -avoid-version
+contextswitch_la_LIBADD =
+if BUILD_WITH_PERFSTAT
+contextswitch_la_LIBADD += -lperfstat
+endif
collectd_LDADD += "-dlopen" contextswitch.la
collectd_DEPENDENCIES += contextswitch.la
endif
if BUILD_WITH_LIBKSTAT
uptime_la_LIBADD += -lkstat
endif
+if BUILD_WITH_PERFSTAT
+uptime_la_LIBADD += -lperfstat
+endif
collectd_LDADD += "-dlopen" uptime.la
collectd_DEPENDENCIES += uptime.la
endif
if BUILD_PLUGIN_WRITE_GRAPHITE
pkglib_LTLIBRARIES += write_graphite.la
write_graphite_la_SOURCES = write_graphite.c \
- utils_format_json.c utils_format_json.h
+ utils_format_graphite.c utils_format_graphite.h \
+ utils_format_json.c utils_format_json.h
write_graphite_la_LDFLAGS = -module -avoid-version
collectd_LDADD += "-dlopen" write_graphite.la
collectd_DEPENDENCIES += write_graphite.la
rm -f $(DESTDIR)$(pkgdatadir)/types.db;
rm -f $(DESTDIR)$(sysconfdir)/collectd.conf
rm -f $(DESTDIR)$(pkgdatadir)/postgresql_default.conf;
+
+if BUILD_FEATURE_DEBUG
+bin_PROGRAMS += utils_vl_lookup_test
+utils_vl_lookup_test_SOURCES = utils_vl_lookup_test.c \
+ utils_vl_lookup.h utils_vl_lookup.c \
+ utils_avltree.c utils_avltree.h \
+ common.h
+
+utils_vl_lookup_test_CPPFLAGS = $(AM_CPPFLAGS) $(LTDLINCL) -DBUILD_TEST=1
+utils_vl_lookup_test_CFLAGS = $(AM_CFLAGS)
+utils_vl_lookup_test_LDFLAGS = -export-dynamic
+utils_vl_lookup_test_LDADD =
+endif
--- /dev/null
+/**
+ * collectd - src/aggregation.c
+ * Copyright (C) 2012 Florian Forster
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a
+ * copy of this software and associated documentation files (the "Software"),
+ * to deal in the Software without restriction, including without limitation
+ * the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ * and/or sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ *
+ * Authors:
+ * Florian Forster <octo at collectd.org>
+ **/
+
+#include "collectd.h"
+#include "plugin.h"
+#include "common.h"
+#include "configfile.h"
+#include "meta_data.h"
+#include "utils_cache.h" /* for uc_get_rate() */
+#include "utils_vl_lookup.h"
+
+#include <pthread.h>
+
+struct aggregation_s /* {{{ */
+{
+ identifier_t ident;
+
+ _Bool calc_num;
+ _Bool calc_sum;
+ _Bool calc_average;
+ _Bool calc_min;
+ _Bool calc_max;
+ _Bool calc_stddev;
+}; /* }}} */
+typedef struct aggregation_s aggregation_t;
+
+struct agg_instance_s;
+typedef struct agg_instance_s agg_instance_t;
+struct agg_instance_s /* {{{ */
+{
+ pthread_mutex_t lock;
+ identifier_t ident;
+
+ int ds_type;
+
+ derive_t num;
+ gauge_t sum;
+ gauge_t squares_sum;
+
+ gauge_t min;
+ gauge_t max;
+
+ rate_to_value_state_t *state_num;
+ rate_to_value_state_t *state_sum;
+ rate_to_value_state_t *state_average;
+ rate_to_value_state_t *state_min;
+ rate_to_value_state_t *state_max;
+ rate_to_value_state_t *state_stddev;
+
+ agg_instance_t *next;
+}; /* }}} */
+
+static lookup_t *lookup = NULL;
+
+static pthread_mutex_t agg_instance_list_lock = PTHREAD_MUTEX_INITIALIZER;
+static agg_instance_t *agg_instance_list_head = NULL;
+
+static void agg_destroy (aggregation_t *agg) /* {{{ */
+{
+ sfree (agg);
+} /* }}} void agg_destroy */
+
+/* Frees all dynamically allocated memory within the instance. */
+static void agg_instance_destroy (agg_instance_t *inst) /* {{{ */
+{
+ if (inst == NULL)
+ return;
+
+ /* Remove this instance from the global list of instances. */
+ pthread_mutex_lock (&agg_instance_list_lock);
+ if (agg_instance_list_head == inst)
+ agg_instance_list_head = inst->next;
+ else if (agg_instance_list_head != NULL)
+ {
+ agg_instance_t *prev = agg_instance_list_head;
+ while ((prev != NULL) && (prev->next != inst))
+ prev = prev->next;
+ if (prev != NULL)
+ prev->next = inst->next;
+ }
+ pthread_mutex_unlock (&agg_instance_list_lock);
+
+ sfree (inst->state_num);
+ sfree (inst->state_sum);
+ sfree (inst->state_average);
+ sfree (inst->state_min);
+ sfree (inst->state_max);
+ sfree (inst->state_stddev);
+
+ memset (inst, 0, sizeof (*inst));
+ inst->ds_type = -1;
+ inst->min = NAN;
+ inst->max = NAN;
+} /* }}} void agg_instance_destroy */
+
+/* Create a new aggregation instance. */
+static agg_instance_t *agg_instance_create (data_set_t const *ds, /* {{{ */
+ value_list_t const *vl, aggregation_t *agg)
+{
+ agg_instance_t *inst;
+
+ DEBUG ("aggregation plugin: Creating new instance.");
+
+ inst = malloc (sizeof (*inst));
+ if (inst == NULL)
+ {
+ ERROR ("aggregation plugin: malloc() failed.");
+ return (NULL);
+ }
+ memset (inst, 0, sizeof (*inst));
+ pthread_mutex_init (&inst->lock, /* attr = */ NULL);
+
+ inst->ds_type = ds->ds[0].type;
+
+#define COPY_FIELD(fld) do { \
+ sstrncpy (inst->ident.fld, \
+ LU_IS_ANY (agg->ident.fld) ? vl->fld : agg->ident.fld, \
+ sizeof (inst->ident.fld)); \
+} while (0)
+
+ COPY_FIELD (host);
+ COPY_FIELD (plugin);
+ COPY_FIELD (plugin_instance);
+ COPY_FIELD (type);
+ COPY_FIELD (type_instance);
+
+#undef COPY_FIELD
+
+ inst->min = NAN;
+ inst->max = NAN;
+
+#define INIT_STATE(field) do { \
+ inst->state_ ## field = NULL; \
+ if (agg->calc_ ## field) { \
+ inst->state_ ## field = malloc (sizeof (*inst->state_ ## field)); \
+ if (inst->state_ ## field == NULL) { \
+ agg_instance_destroy (inst); \
+ ERROR ("aggregation plugin: malloc() failed."); \
+ return (NULL); \
+ } \
+ memset (inst->state_ ## field, 0, sizeof (*inst->state_ ## field)); \
+ } \
+} while (0)
+
+ INIT_STATE (num);
+ INIT_STATE (sum);
+ INIT_STATE (average);
+ INIT_STATE (min);
+ INIT_STATE (max);
+ INIT_STATE (stddev);
+
+#undef INIT_STATE
+
+ pthread_mutex_lock (&agg_instance_list_lock);
+ inst->next = agg_instance_list_head;
+ agg_instance_list_head = inst;
+ pthread_mutex_unlock (&agg_instance_list_lock);
+
+ return (inst);
+} /* }}} agg_instance_t *agg_instance_create */
+
+/* Update the num, sum, min, max, ... fields of the aggregation instance, if
+ * the rate of the value list is available. Value lists with more than one data
+ * source are not supported and will return an error. Returns zero on success
+ * and non-zero otherwise. */
+static int agg_instance_update (agg_instance_t *inst, /* {{{ */
+ data_set_t const *ds, value_list_t const *vl)
+{
+ gauge_t *rate;
+
+ if (ds->ds_num != 1)
+ {
+ ERROR ("aggregation plugin: The \"%s\" type (data set) has more than one "
+ "data source. This is currently not supported by this plugin. "
+ "Sorry.", ds->type);
+ return (EINVAL);
+ }
+
+ rate = uc_get_rate (ds, vl);
+ if (rate == NULL)
+ {
+ char ident[6 * DATA_MAX_NAME_LEN];
+ FORMAT_VL (ident, sizeof (ident), vl);
+ ERROR ("aggregation plugin: Unable to read the current rate of \"%s\".",
+ ident);
+ return (ENOENT);
+ }
+
+ if (isnan (rate[0]))
+ {
+ sfree (rate);
+ return (0);
+ }
+
+ pthread_mutex_lock (&inst->lock);
+
+ inst->num++;
+ inst->sum += rate[0];
+ inst->squares_sum += (rate[0] * rate[0]);
+
+ if (isnan (inst->min) || (inst->min > rate[0]))
+ inst->min = rate[0];
+ if (isnan (inst->max) || (inst->max < rate[0]))
+ inst->max = rate[0];
+
+ pthread_mutex_unlock (&inst->lock);
+
+ sfree (rate);
+ return (0);
+} /* }}} int agg_instance_update */
+
+static int agg_instance_read_func (agg_instance_t *inst, /* {{{ */
+ char const *func, gauge_t rate, rate_to_value_state_t *state,
+ value_list_t *vl, char const *pi_prefix, cdtime_t t)
+{
+ value_t v;
+ int status;
+
+ if (pi_prefix[0] != 0)
+ ssnprintf (vl->plugin_instance, sizeof (vl->plugin_instance), "%s-%s",
+ pi_prefix, func);
+ else
+ sstrncpy (vl->plugin_instance, func, sizeof (vl->plugin_instance));
+
+ memset (&v, 0, sizeof (v));
+ status = rate_to_value (&v, rate, state, inst->ds_type, t);
+ if (status != 0)
+ {
+ /* If this is the first iteration and rate_to_value() was asked to return a
+ * COUNTER or a DERIVE, it will return EAGAIN. Catch this and handle
+ * gracefully. */
+ if (status == EAGAIN)
+ return (0);
+
+ WARNING ("aggregation plugin: rate_to_value failed with status %i.",
+ status);
+ return (-1);
+ }
+
+ vl->values = &v;
+ vl->values_len = 1;
+
+ plugin_dispatch_values_secure (vl);
+
+ vl->values = NULL;
+ vl->values_len = 0;
+
+ return (0);
+} /* }}} int agg_instance_read_func */
+
+static int agg_instance_read (agg_instance_t *inst, cdtime_t t) /* {{{ */
+{
+ value_list_t vl = VALUE_LIST_INIT;
+ char pi_prefix[DATA_MAX_NAME_LEN];
+
+ /* Pre-set all the fields in the value list that will not change per
+ * aggregation type (sum, average, ...). The struct will be re-used and must
+ * therefore be dispatched using the "secure" function. */
+
+ vl.time = t;
+ vl.interval = 0;
+
+ vl.meta = meta_data_create ();
+ if (vl.meta == NULL)
+ {
+ ERROR ("aggregation plugin: meta_data_create failed.");
+ return (-1);
+ }
+ meta_data_add_boolean (vl.meta, "aggregation:created", 1);
+
+ if (LU_IS_ALL (inst->ident.host))
+ sstrncpy (vl.host, "global", sizeof (vl.host));
+ else
+ sstrncpy (vl.host, inst->ident.host, sizeof (vl.host));
+
+ sstrncpy (vl.plugin, "aggregation", sizeof (vl.plugin));
+
+ if (LU_IS_ALL (inst->ident.plugin))
+ {
+ if (LU_IS_ALL (inst->ident.plugin_instance))
+ sstrncpy (pi_prefix, "", sizeof (pi_prefix));
+ else
+ sstrncpy (pi_prefix, inst->ident.plugin_instance, sizeof (pi_prefix));
+ }
+ else
+ {
+ if (LU_IS_ALL (inst->ident.plugin_instance))
+ sstrncpy (pi_prefix, inst->ident.plugin, sizeof (pi_prefix));
+ else
+ ssnprintf (pi_prefix, sizeof (pi_prefix),
+ "%s-%s", inst->ident.plugin, inst->ident.plugin_instance);
+ }
+
+ sstrncpy (vl.type, inst->ident.type, sizeof (vl.type));
+
+ if (!LU_IS_ALL (inst->ident.type_instance))
+ sstrncpy (vl.type_instance, inst->ident.type_instance,
+ sizeof (vl.type_instance));
+
+#define READ_FUNC(func, rate) do { \
+ if (inst->state_ ## func != NULL) { \
+ agg_instance_read_func (inst, #func, rate, \
+ inst->state_ ## func, &vl, pi_prefix, t); \
+ } \
+} while (0)
+
+ pthread_mutex_lock (&inst->lock);
+
+ READ_FUNC (num, (gauge_t) inst->num);
+
+ /* All other aggregations are only defined when there have been any values
+ * at all. */
+ if (inst->num > 0)
+ {
+ READ_FUNC (sum, inst->sum);
+ READ_FUNC (average, (inst->sum / ((gauge_t) inst->num)));
+ READ_FUNC (min, inst->min);
+ READ_FUNC (max, inst->max);
+ READ_FUNC (stddev, sqrt((((gauge_t) inst->num) * inst->squares_sum)
+ - (inst->sum * inst->sum)) / ((gauge_t) inst->num));
+ }
+
+ /* Reset internal state. */
+ inst->num = 0;
+ inst->sum = 0.0;
+ inst->squares_sum = 0.0;
+ inst->min = NAN;
+ inst->max = NAN;
+
+ pthread_mutex_unlock (&inst->lock);
+
+ meta_data_destroy (vl.meta);
+ vl.meta = NULL;
+
+ return (0);
+} /* }}} int agg_instance_read */
+
+/* lookup_class_callback_t for utils_vl_lookup */
+static void *agg_lookup_class_callback ( /* {{{ */
+ __attribute__((unused)) data_set_t const *ds,
+ value_list_t const *vl, void *user_class)
+{
+ return (agg_instance_create (ds, vl, (aggregation_t *) user_class));
+} /* }}} void *agg_class_callback */
+
+/* lookup_obj_callback_t for utils_vl_lookup */
+static int agg_lookup_obj_callback (data_set_t const *ds, /* {{{ */
+ value_list_t const *vl,
+ __attribute__((unused)) void *user_class,
+ void *user_obj)
+{
+ return (agg_instance_update ((agg_instance_t *) user_obj, ds, vl));
+} /* }}} int agg_lookup_obj_callback */
+
+/* lookup_free_class_callback_t for utils_vl_lookup */
+static void agg_lookup_free_class_callback (void *user_class) /* {{{ */
+{
+ agg_destroy ((aggregation_t *) user_class);
+} /* }}} void agg_lookup_free_class_callback */
+
+/* lookup_free_obj_callback_t for utils_vl_lookup */
+static void agg_lookup_free_obj_callback (void *user_obj) /* {{{ */
+{
+ agg_instance_destroy ((agg_instance_t *) user_obj);
+} /* }}} void agg_lookup_free_obj_callback */
+
+/*
+ * <Plugin "aggregation">
+ * <Aggregation>
+ * Plugin "cpu"
+ * Type "cpu"
+ *
+ * GroupBy Host
+ * GroupBy TypeInstance
+ *
+ * CalculateNum true
+ * CalculateSum true
+ * CalculateAverage true
+ * CalculateMinimum true
+ * CalculateMaximum true
+ * CalculateStddev true
+ * </Aggregation>
+ * </Plugin>
+ */
+static int agg_config_handle_group_by (oconfig_item_t const *ci, /* {{{ */
+ aggregation_t *agg)
+{
+ int i;
+
+ for (i = 0; i < ci->values_num; i++)
+ {
+ char const *value;
+
+ if (ci->values[i].type != OCONFIG_TYPE_STRING)
+ {
+ ERROR ("aggregation plugin: Argument %i of the \"GroupBy\" option "
+ "is not a string.", i + 1);
+ continue;
+ }
+
+ value = ci->values[i].value.string;
+
+ if (strcasecmp ("Host", value) == 0)
+ sstrncpy (agg->ident.host, LU_ANY, sizeof (agg->ident.host));
+ else if (strcasecmp ("Plugin", value) == 0)
+ sstrncpy (agg->ident.plugin, LU_ANY, sizeof (agg->ident.plugin));
+ else if (strcasecmp ("PluginInstance", value) == 0)
+ sstrncpy (agg->ident.plugin_instance, LU_ANY,
+ sizeof (agg->ident.plugin_instance));
+ else if (strcasecmp ("TypeInstance", value) == 0)
+ sstrncpy (agg->ident.type_instance, LU_ANY, sizeof (agg->ident.type_instance));
+ else if (strcasecmp ("Type", value) == 0)
+ ERROR ("aggregation plugin: Grouping by type is not supported.");
+ else
+ WARNING ("aggregation plugin: The \"%s\" argument to the \"GroupBy\" "
+ "option is invalid and will be ignored.", value);
+ } /* for (ci->values) */
+
+ return (0);
+} /* }}} int agg_config_handle_group_by */
+
+static int agg_config_aggregation (oconfig_item_t *ci) /* {{{ */
+{
+ aggregation_t *agg;
+ _Bool is_valid;
+ int status;
+ int i;
+
+ agg = malloc (sizeof (*agg));
+ if (agg == NULL)
+ {
+ ERROR ("aggregation plugin: malloc failed.");
+ return (-1);
+ }
+ memset (agg, 0, sizeof (*agg));
+
+ sstrncpy (agg->ident.host, LU_ALL, sizeof (agg->ident.host));
+ sstrncpy (agg->ident.plugin, LU_ALL, sizeof (agg->ident.plugin));
+ sstrncpy (agg->ident.plugin_instance, LU_ALL,
+ sizeof (agg->ident.plugin_instance));
+ sstrncpy (agg->ident.type, LU_ALL, sizeof (agg->ident.type));
+ sstrncpy (agg->ident.type_instance, LU_ALL,
+ sizeof (agg->ident.type_instance));
+
+ for (i = 0; i < ci->children_num; i++)
+ {
+ oconfig_item_t *child = ci->children + i;
+
+ if (strcasecmp ("Host", child->key) == 0)
+ cf_util_get_string_buffer (child, agg->ident.host,
+ sizeof (agg->ident.host));
+ else if (strcasecmp ("Plugin", child->key) == 0)
+ cf_util_get_string_buffer (child, agg->ident.plugin,
+ sizeof (agg->ident.plugin));
+ else if (strcasecmp ("PluginInstance", child->key) == 0)
+ cf_util_get_string_buffer (child, agg->ident.plugin_instance,
+ sizeof (agg->ident.plugin_instance));
+ else if (strcasecmp ("Type", child->key) == 0)
+ cf_util_get_string_buffer (child, agg->ident.type,
+ sizeof (agg->ident.type));
+ else if (strcasecmp ("TypeInstance", child->key) == 0)
+ cf_util_get_string_buffer (child, agg->ident.type_instance,
+ sizeof (agg->ident.type_instance));
+ else if (strcasecmp ("GroupBy", child->key) == 0)
+ agg_config_handle_group_by (child, agg);
+ else if (strcasecmp ("CalculateNum", child->key) == 0)
+ cf_util_get_boolean (child, &agg->calc_num);
+ else if (strcasecmp ("CalculateSum", child->key) == 0)
+ cf_util_get_boolean (child, &agg->calc_sum);
+ else if (strcasecmp ("CalculateAverage", child->key) == 0)
+ cf_util_get_boolean (child, &agg->calc_average);
+ else if (strcasecmp ("CalculateMinimum", child->key) == 0)
+ cf_util_get_boolean (child, &agg->calc_min);
+ else if (strcasecmp ("CalculateMaximum", child->key) == 0)
+ cf_util_get_boolean (child, &agg->calc_max);
+ else if (strcasecmp ("CalculateStddev", child->key) == 0)
+ cf_util_get_boolean (child, &agg->calc_stddev);
+ else
+ WARNING ("aggregation plugin: The \"%s\" key is not allowed inside "
+ "<Aggregation /> blocks and will be ignored.", child->key);
+ }
+
+ /* Sanity checking */
+ is_valid = 1;
+ if (LU_IS_ALL (agg->ident.type)) /* {{{ */
+ {
+ ERROR ("aggregation plugin: It appears you did not specify the required "
+ "\"Type\" option in this aggregation. "
+ "(Host \"%s\", Plugin \"%s\", PluginInstance \"%s\", "
+ "Type \"%s\", TypeInstance \"%s\")",
+ agg->ident.host, agg->ident.plugin, agg->ident.plugin_instance,
+ agg->ident.type, agg->ident.type_instance);
+ is_valid = 0;
+ }
+ else if (strchr (agg->ident.type, '/') != NULL)
+ {
+ ERROR ("aggregation plugin: The \"Type\" may not contain the '/' "
+ "character. Especially, it may not be a wildcard. The current "
+ "value is \"%s\".", agg->ident.type);
+ is_valid = 0;
+ } /* }}} */
+
+ if (!LU_IS_ALL (agg->ident.host) /* {{{ */
+ && !LU_IS_ALL (agg->ident.plugin)
+ && !LU_IS_ALL (agg->ident.plugin_instance)
+ && !LU_IS_ALL (agg->ident.type_instance))
+ {
+ ERROR ("aggregation plugin: An aggregation must contain at least one "
+ "wildcard. This is achieved by leaving at least one of the \"Host\", "
+ "\"Plugin\", \"PluginInstance\" and \"TypeInstance\" options blank "
+ "and not grouping by that field. "
+ "(Host \"%s\", Plugin \"%s\", PluginInstance \"%s\", "
+ "Type \"%s\", TypeInstance \"%s\")",
+ agg->ident.host, agg->ident.plugin, agg->ident.plugin_instance,
+ agg->ident.type, agg->ident.type_instance);
+ is_valid = 0;
+ } /* }}} */
+
+ if (!agg->calc_num && !agg->calc_sum && !agg->calc_average /* {{{ */
+ && !agg->calc_min && !agg->calc_max && !agg->calc_stddev)
+ {
+ ERROR ("aggregation plugin: No aggregation function has been specified. "
+ "Without this, I don't know what I should be calculating. "
+ "(Host \"%s\", Plugin \"%s\", PluginInstance \"%s\", "
+ "Type \"%s\", TypeInstance \"%s\")",
+ agg->ident.host, agg->ident.plugin, agg->ident.plugin_instance,
+ agg->ident.type, agg->ident.type_instance);
+ is_valid = 0;
+ } /* }}} */
+
+ if (!is_valid) /* {{{ */
+ {
+ sfree (agg);
+ return (-1);
+ } /* }}} */
+
+ status = lookup_add (lookup, &agg->ident, agg);
+ if (status != 0)
+ {
+ ERROR ("aggregation plugin: lookup_add failed with status %i.", status);
+ sfree (agg);
+ return (-1);
+ }
+
+ DEBUG ("aggregation plugin: Successfully added aggregation: "
+ "(Host \"%s\", Plugin \"%s\", PluginInstance \"%s\", "
+ "Type \"%s\", TypeInstance \"%s\")",
+ agg->ident.host, agg->ident.plugin, agg->ident.plugin_instance,
+ agg->ident.type, agg->ident.type_instance);
+ return (0);
+} /* }}} int agg_config_aggregation */
+
+static int agg_config (oconfig_item_t *ci) /* {{{ */
+{
+ int i;
+
+ pthread_mutex_lock (&agg_instance_list_lock);
+
+ if (lookup == NULL)
+ {
+ lookup = lookup_create (agg_lookup_class_callback,
+ agg_lookup_obj_callback,
+ agg_lookup_free_class_callback,
+ agg_lookup_free_obj_callback);
+ if (lookup == NULL)
+ {
+ pthread_mutex_unlock (&agg_instance_list_lock);
+ ERROR ("aggregation plugin: lookup_create failed.");
+ return (-1);
+ }
+ }
+
+ for (i = 0; i < ci->children_num; i++)
+ {
+ oconfig_item_t *child = ci->children + i;
+
+ if (strcasecmp ("Aggregation", child->key) == 0)
+ agg_config_aggregation (child);
+ else
+ WARNING ("aggregation plugin: The \"%s\" key is not allowed inside "
+ "<Plugin aggregation /> blocks and will be ignored.", child->key);
+ }
+
+ pthread_mutex_unlock (&agg_instance_list_lock);
+
+ return (0);
+} /* }}} int agg_config */
+
+static int agg_read (void) /* {{{ */
+{
+ agg_instance_t *this;
+ cdtime_t t;
+ int success;
+
+ t = cdtime ();
+ success = 0;
+
+ pthread_mutex_lock (&agg_instance_list_lock);
+
+ /* agg_instance_list_head only holds data, after the "write" callback has
+ * been called with a matching value list at least once. So on startup,
+ * there's a race between the aggregations read() and write() callback. If
+ * the read() callback is called first, agg_instance_list_head is NULL and
+ * "success" may be zero. This is expected and should not result in an error.
+ * Therefore we need to handle this case separately. */
+ if (agg_instance_list_head == NULL)
+ {
+ pthread_mutex_unlock (&agg_instance_list_lock);
+ return (0);
+ }
+
+ for (this = agg_instance_list_head; this != NULL; this = this->next)
+ {
+ int status;
+
+ status = agg_instance_read (this, t);
+ if (status != 0)
+ WARNING ("aggregation plugin: Reading an aggregation instance "
+ "failed with status %i.", status);
+ else
+ success++;
+ }
+
+ pthread_mutex_unlock (&agg_instance_list_lock);
+
+ return ((success > 0) ? 0 : -1);
+} /* }}} int agg_read */
+
+static int agg_write (data_set_t const *ds, value_list_t const *vl, /* {{{ */
+ __attribute__((unused)) user_data_t *user_data)
+{
+ _Bool created_by_aggregation = 0;
+ int status;
+
+ /* Ignore values that were created by the aggregation plugin to avoid weird
+ * effects. */
+ (void) meta_data_get_boolean (vl->meta, "aggregation:created",
+ &created_by_aggregation);
+ if (created_by_aggregation)
+ return (0);
+
+ if (lookup == NULL)
+ status = ENOENT;
+ else
+ {
+ status = lookup_search (lookup, ds, vl);
+ if (status > 0)
+ status = 0;
+ }
+
+ return (status);
+} /* }}} int agg_write */
+
+void module_register (void)
+{
+ plugin_register_complex_config ("aggregation", agg_config);
+ plugin_register_read ("aggregation", agg_read);
+ plugin_register_write ("aggregation", agg_write, /* user_data = */ NULL);
+}
+
+/* vim: set sw=2 sts=2 tw=78 et fdm=marker : */
#include "plugin.h"
#include "utils_cmd_putval.h"
#include "utils_format_json.h"
+#include "utils_format_graphite.h"
#include <pthread.h>
#define CAMQP_DM_VOLATILE 1
#define CAMQP_DM_PERSISTENT 2
-#define CAMQP_FORMAT_COMMAND 1
-#define CAMQP_FORMAT_JSON 2
+#define CAMQP_FORMAT_COMMAND 1
+#define CAMQP_FORMAT_JSON 2
+#define CAMQP_FORMAT_GRAPHITE 3
#define CAMQP_CHANNEL 1
uint8_t delivery_mode;
_Bool store_rates;
int format;
+ /* publish & graphite format only */
+ char *prefix;
+ char *postfix;
+ char escape_char;
/* subscribe only */
char *exchange_type;
sfree (conf->exchange_type);
sfree (conf->queue);
sfree (conf->routing_key);
+ sfree (conf->prefix);
+ sfree (conf->postfix);
+
sfree (conf);
} /* }}} void camqp_config_free */
props.content_type = amqp_cstring_bytes("text/collectd");
else if (conf->format == CAMQP_FORMAT_JSON)
props.content_type = amqp_cstring_bytes("application/json");
+ else if (conf->format == CAMQP_FORMAT_GRAPHITE)
+ props.content_type = amqp_cstring_bytes("text/graphite");
else
assert (23 == 42);
props.delivery_mode = conf->delivery_mode;
format_json_value_list (buffer, &bfill, &bfree, ds, vl, conf->store_rates);
format_json_finalize (buffer, &bfill, &bfree);
}
+ else if (conf->format == CAMQP_FORMAT_GRAPHITE)
+ {
+ status = format_graphite (buffer, sizeof (buffer), ds, vl,
+ conf->prefix, conf->postfix, conf->escape_char,
+ conf->store_rates);
+ if (status != 0)
+ {
+ ERROR ("amqp plugin: format_graphite failed with status %i.",
+ status);
+ return (status);
+ }
+ }
else
{
ERROR ("amqp plugin: Invalid format (%i).", conf->format);
conf->format = CAMQP_FORMAT_COMMAND;
else if (strcasecmp ("JSON", string) == 0)
conf->format = CAMQP_FORMAT_JSON;
+ else if (strcasecmp ("Graphite", string) == 0)
+ conf->format = CAMQP_FORMAT_GRAPHITE;
else
{
WARNING ("amqp plugin: Invalid format string: %s",
/* publish only */
conf->delivery_mode = CAMQP_DM_VOLATILE;
conf->store_rates = 0;
+ /* publish & graphite only */
+ conf->prefix = NULL;
+ conf->postfix = NULL;
+ conf->escape_char = '_';
/* subscribe only */
conf->exchange_type = NULL;
conf->queue = NULL;
status = cf_util_get_boolean (child, &conf->store_rates);
else if ((strcasecmp ("Format", child->key) == 0) && publish)
status = camqp_config_set_format (child, conf);
+ else if ((strcasecmp ("GraphitePrefix", child->key) == 0) && publish)
+ status = cf_util_get_string (child, &conf->prefix);
+ else if ((strcasecmp ("GraphitePostfix", child->key) == 0) && publish)
+ status = cf_util_get_string (child, &conf->postfix);
+ else if ((strcasecmp ("GraphiteEscapeChar", child->key) == 0) && publish)
+ {
+ char *tmp_buff = NULL;
+ status = cf_util_get_string (child, &tmp_buff);
+ if (strlen (tmp_buff) > 1)
+ WARNING ("amqp plugin: The option \"GraphiteEscapeChar\" handles "
+ "only one character. Others will be ignored.");
+ conf->escape_char = tmp_buff[0];
+ sfree (tmp_buff);
+ }
else
WARNING ("amqp plugin: Ignoring unknown "
"configuration option \"%s\".", child->key);
# to missing dependencies or because they have been deactivated explicitly. #
##############################################################################
+#@BUILD_PLUGIN_AGGREGATION_TRUE@LoadPlugin aggregation
#@BUILD_PLUGIN_AMQP_TRUE@LoadPlugin amqp
#@BUILD_PLUGIN_APACHE_TRUE@LoadPlugin apache
#@BUILD_PLUGIN_APCUPS_TRUE@LoadPlugin apcups
# ription of those options is available in the collectd.conf(5) manual page. #
##############################################################################
+#<Plugin "aggregation">
+# <Aggregation>
+# #Host "unspecified"
+# Plugin "cpu"
+# #PluginInstance "unspecified"
+# Type "cpu"
+# #TypeInstance "unspecified"
+#
+# GroupBy "Host"
+# GroupBy "TypeInstance"
+#
+# CalculateNum false
+# CalculateSum false
+# CalculateAverage true
+# CalculateMinimum false
+# CalculateMaximum false
+# CalculateStddev false
+# </Aggregation>
+#</Plugin>
+
#<Plugin "amqp">
# <Publish "name">
# Host "localhost"
#</Plugin>
#<Plugin memcached>
-# Host "127.0.0.1"
-# Port "11211"
+# <Instance "local">
+# Host "127.0.0.1"
+# Port "11211"
+# </Instance>
#</Plugin>
#<Plugin modbus>
# Host "localhost"
# Port 123
# ReverseLookups false
+# IncludeUnitID true
#</Plugin>
#<Plugin nut>
#<Plugin "swap">
# ReportByDevice false
+# ReportBytes true
#</Plugin>
#<Plugin "table">
require external configuration, too. The C<apache plugin>, for example,
required C<mod_status> to be configured in the webserver you're going to
collect data from. These plugins are listed below as well, even if they don't
-require any configuration within collectd's configfile.
+require any configuration within collectd's configuration file.
A list of all plugins and a short summary for each plugin can be found in the
F<README> file shipped with the sourcecode and hopefully binary packets as
well.
+=head2 Plugin C<aggregation>
+
+The I<Aggregation plugin> makes it possible to aggregate several values into
+one using aggregation functions such as I<sum>, I<average>, I<min> and I<max>.
+This can be put to a wide variety of uses, e.g. average and total CPU
+statistics for your entire fleet.
+
+The grouping is powerful but, as with many powerful tools, may be a bit
+difficult to wrap your head around. The grouping will therefore be
+demonstrated using an example: The average and sum of the CPU usage across
+all CPUs of each host is to be calculated.
+
+To select all the affected values for our example, set C<Plugin cpu> and
+C<Type cpu>. The other values are left unspecified, meaning "all values". The
+I<Host>, I<Plugin>, I<PluginInstance>, I<Type> and I<TypeInstance> options
+work as if they were specified in the C<WHERE> clause of an C<SELECT> SQL
+statement.
+
+ Plugin "cpu"
+ Type "cpu"
+
+Although the I<Host>, I<PluginInstance> (CPU number, i.e. 0, 1, 2, ...) and
+I<TypeInstance> (idle, user, system, ...) fields are left unspecified in the
+example, the intention is to have a new value for each host / type instance
+pair. This is achieved by "grouping" the values using the C<GroupBy> option.
+It can be specified multiple times to group by more than one field.
+
+ GroupBy "Host"
+ GroupBy "TypeInstance"
+
+We do neither specify nor group by I<plugin instance> (the CPU number), so all
+metrics that differ in the CPU number only will be aggregated. Each
+aggregation needs I<at least one> such field, otherwise no aggregation would
+take place.
+
+The full example configuration looks like this:
+
+ <Plugin "aggregation">
+ <Aggregation>
+ Plugin "cpu"
+ Type "cpu"
+
+ GroupBy "Host"
+ GroupBy "TypeInstance"
+
+ CalculateSum true
+ CalculateAverage true
+ </Aggregation>
+ </Plugin>
+
+There are a couple of limitations you should be aware of:
+
+=over 4
+
+=item
+
+The I<Type> cannot be left unspecified, because it is not reasonable to add
+apples to oranges. Also, the internal lookup structure won't work if you try
+to group by type.
+
+=item
+
+There must be at least one unspecified, ungrouped field. Otherwise nothing
+will be aggregated.
+
+=back
+
+As you can see in the example above, each aggregation has its own
+B<Aggregation> block. You can have multiple aggregation blocks and aggregation
+blocks may match the same values, i.e. one value list can update multiple
+aggregations. The following options are valid inside B<Aggregation> blocks:
+
+=over 4
+
+=item B<Host> I<Host>
+
+=item B<Plugin> I<Plugin>
+
+=item B<PluginInstance> I<PluginInstance>
+
+=item B<Type> I<Type>
+
+=item B<TypeInstance> I<TypeInstance>
+
+Selects the value lists to be added to this aggregation. B<Type> must be a
+valid data set name, see L<types.db(5)> for details.
+
+=item B<GroupBy> B<Host>|B<Plugin>|B<PluginInstance>|B<TypeInstance>
+
+Group valued by the specified field. The B<GroupBy> option may be repeated to
+group by multiple fields.
+
+=item B<CalculateNum> B<true>|B<false>
+
+=item B<CalculateSum> B<true>|B<false>
+
+=item B<CalculateAverage> B<true>|B<false>
+
+=item B<CalculateMinimum> B<true>|B<false>
+
+=item B<CalculateMaximum> B<true>|B<false>
+
+=item B<CalculateStddev> B<true>|B<false>
+
+Boolean options for enabling calculation of the number of value lists, their
+sum, average, minimum, maximum andE<nbsp>/ or standard deviation. All options
+are disabled by default.
+
+=back
+
=head2 Plugin C<amqp>
The I<AMQMP plugin> can be used to communicate with other instances of
# Persistent false
# Format "command"
# StoreRates false
+ # GraphitePrefix "collectd."
+ # GraphiteEscapeChar "_"
</Publish>
# Receive values from an AMQP broker
an easy and straight forward exchange format. The C<Content-Type> header field
will be set to C<application/json>.
+If set to B<Graphite>, values are encoded in the I<Graphite> format, which is
+"<metric> <value> <timestamp>\n". The C<Content-Type> header field will be set to
+C<text/graphite>.
+
A subscribing client I<should> use the C<Content-Type> header field to
determine how to decode the values. Currently, the I<AMQP plugin> itself can
only decode the B<Command> format.
Please note that currently this option is only used if the B<Format> option has
been set to B<JSON>.
+=item B<GraphitePrefix> (Publish and B<Format>=I<Graphite> only)
+
+A prefix can be added in the metric name when outputting in the I<Graphite> format.
+It's added before the I<Host> name.
+Metric name will be "<prefix><host><postfix><plugin><type><name>"
+
+=item B<GraphitePostfix> (Publish and B<Format>=I<Graphite> only)
+
+A postfix can be added in the metric name when outputting in the I<Graphite> format.
+It's added after the I<Host> name.
+Metric name will be "<prefix><host><postfix><plugin><type><name>"
+
+=item B<GraphiteEscapeChar> (Publish and B<Format>=I<Graphite> only)
+
+Specify a character to replace dots (.) in the host part of the metric name.
+In I<Graphite> metric name, dots are used as separators between different
+metric parts (host, plugin, type).
+Default is "_" (I<Underscore>).
+
=back
=head2 Plugin C<apache>
about cache utilization, memory and bandwidth used.
L<http://www.danga.com/memcached/>
+ <Plugin "memcached">
+ <Instance "name">
+ Host "memcache.example.com"
+ Port 11211
+ </Instance>
+ </Plugin>
+
+The plugin configuration consists of one or more B<Instance> blocks which
+specify one I<memcached> connection each. Within the B<Instance> blocks, the
+following options are allowed:
+
=over 4
=item B<Host> I<Hostname>
TCP-Port to connect to. Defaults to B<11211>.
+=item B<Socket> I<Path>
+
+Connect to I<memcached> using the UNIX domain socket at I<Path>. If this
+setting is given, the B<Host> and B<Port> settings are ignored.
+
=back
=head2 Plugin C<modbus>
lookups. The default is to do reverse lookups to preserve backwards
compatibility, though.
+=item B<IncludeUnitID> B<true>|B<false>
+
+When a peer is a refclock, include the unit ID in the I<type instance>.
+Defaults to B<false> for backward compatibility.
+
+If two refclock peers use the same driver and this is B<false>, the plugin will
+try to write simultaneous measurements from both to the same type instance.
+This will result in error messages in the log and only one set of measurements
+making it through.
+
=back
=head2 Plugin C<nut>
Defines the "database alias" or "service name" to connect to. Usually, these
names are defined in the file named C<$ORACLE_HOME/network/admin/tnsnames.ora>.
+=item B<Host> I<Host>
+
+Hostname to use when dispatching values for this database. Defaults to using
+the global hostname of the I<collectd> instance.
+
=item B<Username> I<Username>
Username used for authentication.
The name of the database of the current connection.
+=item I<instance>
+
+The name of the database plugin instance. See the B<Instance> option of the
+database specification below for details.
+
=item I<username>
The username used to connect to the database.
Specify whether to use an SSL connection when contacting the server. The
following modes are supported:
+=item B<Instance> I<name>
+
+Specify the plugin instance name that should be used instead of the database
+name (which is the default, if this option has not been specified). This
+allows to query multiple databases of the same name on the same host (e.g.
+when running multiple database server versions in parallel).
+
=over 4
=item I<disable>
connections. Either a service name of a port number may be given. Please note
that numerical port numbers must be given as a string, too.
+=item B<Password> I<Password>
+
+Use I<Password> to authenticate when connecting to I<Redis>.
+
=item B<Timeout> I<Timeout in miliseconds>
The B<Timeout> option set the socket timeout for node response. Since the Redis
locally, or B<DataDir> is set to a relative path, this will not work as
expected. Default is B<true>.
+=item B<StepSize> I<Seconds>
+
+B<Force> the stepsize of newly created RRD-files. Ideally (and per default)
+this setting is unset and the stepsize is set to the interval in which the data
+is collected. Do not use this option unless you absolutely have to for some
+reason. Setting this option may cause problems with the C<snmp plugin>, the
+C<exec plugin> or when the daemon is set up to receive data from other hosts.
+
+=item B<HeartBeat> I<Seconds>
+
+B<Force> the heartbeat of newly created RRD-files. This setting should be unset
+in which case the heartbeat is set to twice the B<StepSize> which should equal
+the interval in which data is collected. Do not set this option unless you have
+a very good reason to do so.
+
+=item B<RRARows> I<NumRows>
+
+The C<rrdtool plugin> calculates the number of PDPs per CDP based on the
+B<StepSize>, this setting and a timespan. This plugin creates RRD-files with
+three times five RRAs, i. e. five RRAs with the CFs B<MIN>, B<AVERAGE>, and
+B<MAX>. The five RRAs are optimized for graphs covering one hour, one day, one
+week, one month, and one year.
+
+So for each timespan, it calculates how many PDPs need to be consolidated into
+one CDP by calculating:
+ number of PDPs = timespan / (stepsize * rrarows)
+
+Bottom line is, set this no smaller than the width of you graphs in pixels. The
+default is 1200.
+
+=item B<RRATimespan> I<Seconds>
+
+Adds an RRA-timespan, given in seconds. Use this option multiple times to have
+more then one RRA. If this option is never used, the built-in default of (3600,
+86400, 604800, 2678400, 31622400) is used.
+
+For more information on how RRA-sizes are calculated see B<RRARows> above.
+
+=item B<XFF> I<Factor>
+
+Set the "XFiles Factor". The default is 0.1. If unsure, don't set this option.
+I<Factor> must be in the range C<[0.0-1.0)>, i.e. between zero (inclusive) and
+one (exclusive).
+
=back
=head2 Plugin C<rrdtool>
=item B<XFF> I<Factor>
Set the "XFiles Factor". The default is 0.1. If unsure, don't set this option.
+I<Factor> must be in the range C<[0.0-1.0)>, i.e. between zero (inclusive) and
+one (exclusive).
=item B<CacheFlush> I<Seconds>
This option is only available if the I<Swap plugin> can read C</proc/swaps>
(under Linux) or use the L<swapctl(2)> mechanism (under I<Solaris>).
+=item B<ReportBytes> B<false>|B<true>
+
+When enabled, the I<swap I/O> is reported in bytes. When disabled, the default,
+I<swap I/O> is reported in pages. This option is available under Linux only.
+
=back
=head2 Plugin C<syslog>
}
return (diff);
-} /* counter_t counter_to_gauge */
+} /* counter_t counter_diff */
+
+int rate_to_value (value_t *ret_value, gauge_t rate, /* {{{ */
+ rate_to_value_state_t *state,
+ int ds_type, cdtime_t t)
+{
+ gauge_t delta_gauge;
+ cdtime_t delta_t;
+
+ if (ds_type == DS_TYPE_GAUGE)
+ {
+ state->last_value.gauge = rate;
+ state->last_time = t;
+
+ *ret_value = state->last_value;
+ return (0);
+ }
+
+ /* Counter and absolute can't handle negative rates. Reset "last time"
+ * to zero, so that the next valid rate will re-initialize the
+ * structure. */
+ if ((rate < 0.0)
+ && ((ds_type == DS_TYPE_COUNTER)
+ || (ds_type == DS_TYPE_ABSOLUTE)))
+ {
+ memset (state, 0, sizeof (*state));
+ return (EINVAL);
+ }
+
+ /* Another invalid state: The time is not increasing. */
+ if (t <= state->last_time)
+ {
+ memset (state, 0, sizeof (*state));
+ return (EINVAL);
+ }
+
+ delta_t = t - state->last_time;
+ delta_gauge = (rate * CDTIME_T_TO_DOUBLE (delta_t)) + state->residual;
+
+ /* Previous value is invalid. */
+ if (state->last_time == 0) /* {{{ */
+ {
+ if (ds_type == DS_TYPE_DERIVE)
+ {
+ state->last_value.derive = (derive_t) rate;
+ state->residual = rate - ((gauge_t) state->last_value.derive);
+ }
+ else if (ds_type == DS_TYPE_COUNTER)
+ {
+ state->last_value.counter = (counter_t) rate;
+ state->residual = rate - ((gauge_t) state->last_value.counter);
+ }
+ else if (ds_type == DS_TYPE_ABSOLUTE)
+ {
+ state->last_value.absolute = (absolute_t) rate;
+ state->residual = rate - ((gauge_t) state->last_value.absolute);
+ }
+ else
+ {
+ assert (23 == 42);
+ }
+
+ state->last_time = t;
+ return (EAGAIN);
+ } /* }}} */
+
+ if (ds_type == DS_TYPE_DERIVE)
+ {
+ derive_t delta_derive = (derive_t) delta_gauge;
+
+ state->last_value.derive += delta_derive;
+ state->residual = delta_gauge - ((gauge_t) delta_derive);
+ }
+ else if (ds_type == DS_TYPE_COUNTER)
+ {
+ counter_t delta_counter = (counter_t) delta_gauge;
+
+ state->last_value.counter += delta_counter;
+ state->residual = delta_gauge - ((gauge_t) delta_counter);
+ }
+ else if (ds_type == DS_TYPE_ABSOLUTE)
+ {
+ absolute_t delta_absolute = (absolute_t) delta_gauge;
+
+ state->last_value.absolute = delta_absolute;
+ state->residual = delta_gauge - ((gauge_t) delta_absolute);
+ }
+ else
+ {
+ assert (23 == 42);
+ }
+
+ state->last_time = t;
+ *ret_value = state->last_value;
+ return (0);
+} /* }}} value_t rate_to_value */
int service_name_to_port_number (const char *service_name)
{
|| (strcasecmp ("no", (s)) == 0) \
|| (strcasecmp ("off", (s)) == 0))
+struct rate_to_value_state_s
+{
+ value_t last_value;
+ cdtime_t last_time;
+ gauge_t residual;
+};
+typedef struct rate_to_value_state_s rate_to_value_state_t;
+
char *sstrncpy (char *dest, const char *src, size_t n);
int ssnprintf (char *dest, size_t n, const char *format, ...);
char *sstrdup(const char *s);
counter_t counter_diff (counter_t old_value, counter_t new_value);
+/* Convert a rate back to a value_t. When converting to a derive_t, counter_t
+ * or absoltue_t, take fractional residuals into account. This is important
+ * when scaling counters, for example.
+ * Returns zero on success. Returns EAGAIN when called for the first time; in
+ * this case the value_t is invalid and the next call should succeed. Other
+ * return values indicate an error. */
+int rate_to_value (value_t *ret_value, gauge_t rate,
+ rate_to_value_state_t *state, int ds_type, cdtime_t t);
+
/* Converts a service name (a string) to a port number
* (in the range [1-65535]). Returns less than zero on error. */
int service_name_to_port_number (const char *service_name);
/* no global variables */
/* #endif KERNEL_LINUX */
+#elif HAVE_PERFSTAT
+# include <sys/protosw.h>
+# include <libperfstat.h>
+/* #endif HAVE_PERFSTAT */
+
#else
# error "No applicable input method."
#endif
if (status == -2)
ERROR ("contextswitch plugin: Unable to find context switch value.");
-#endif /* KERNEL_LINUX */
+/* #endif KERNEL_LINUX */
+
+#elif HAVE_PERFSTAT
+ int status = 0;
+ perfstat_cpu_total_t perfcputotal;
+
+ status = perfstat_cpu_total(NULL, &perfcputotal, sizeof(perfstat_cpu_total_t), 1);
+ if (status < 0)
+ {
+ char errbuf[1024];
+ ERROR ("contextswitch plugin: perfstat_cpu_total: %s",
+ sstrerror (errno, errbuf, sizeof (errbuf)));
+ return (-1);
+ }
+
+ cs_submit(perfcputotal.pswitch);
+ status = 0;
+#endif /* defined(HAVE_PERFSTAT) */
return status;
}
/**
* collectd - src/disk.c
- * Copyright (C) 2005-2010 Florian octo Forster
+ * Copyright (C) 2005-2012 Florian octo Forster
* Copyright (C) 2009 Manuel Sanmartin
*
* This program is free software; you can redistribute it and/or modify it
* 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*
* Authors:
- * Florian octo Forster <octo at verplant.org>
+ * Florian octo Forster <octo at collectd.org>
* Manuel Sanmartin
**/
#if HAVE_IOKIT_IOKITLIB_H
static mach_port_t io_master_port = MACH_PORT_NULL;
+/* This defaults to false for backwards compatibility. Please fix in the next
+ * major version. */
+static _Bool use_bsd_name = 0;
/* #endif HAVE_IOKIT_IOKITLIB_H */
#elif KERNEL_LINUX
static const char *config_keys[] =
{
"Disk",
+ "UseBSDName",
"IgnoreSelected"
};
static int config_keys_num = STATIC_ARRAY_SIZE (config_keys);
invert = 0;
ignorelist_set_invert (ignorelist, invert);
}
+ else if (strcasecmp ("UseBSDName", key) == 0)
+ {
+#if HAVE_IOKIT_IOKITLIB_H
+ use_bsd_name = IS_TRUE (value) ? 1 : 0;
+#else
+ WARNING ("disk plugin: The \"UseBSDName\" option is only supported "
+ "on Mach / Mac OS X and will be ignored.");
+#endif
+ }
else
{
return (-1);
CFDictionaryRef props_dict;
CFDictionaryRef stats_dict;
CFDictionaryRef child_dict;
- kern_return_t status;
+ CFStringRef tmp_cf_string_ref;
+ kern_return_t status;
signed long long read_ops;
signed long long read_byt;
int disk_major;
int disk_minor;
- char disk_name[64];
+ char disk_name[DATA_MAX_NAME_LEN];
+ char disk_name_bsd[DATA_MAX_NAME_LEN];
/* Get the list of all disk objects. */
if (IOServiceGetMatchingServices (io_master_port,
continue;
}
+ /* tmp_cf_string_ref doesn't need to be released. */
+ tmp_cf_string_ref = (CFStringRef) CFDictionaryGetValue (props_dict,
+ CFSTR(kIOBSDNameKey));
+ if (!tmp_cf_string_ref)
+ {
+ DEBUG ("disk plugin: CFDictionaryGetValue("
+ "kIOBSDNameKey) failed.");
+ CFRelease (props_dict);
+ IOObjectRelease (disk_child);
+ IOObjectRelease (disk);
+ continue;
+ }
+ assert (CFGetTypeID (tmp_cf_string_ref) == CFStringGetTypeID ());
+
+ memset (disk_name_bsd, 0, sizeof (disk_name_bsd));
+ CFStringGetCString (tmp_cf_string_ref,
+ disk_name_bsd, sizeof (disk_name_bsd),
+ kCFStringEncodingUTF8);
+ if (disk_name_bsd[0] == 0)
+ {
+ ERROR ("disk plugin: CFStringGetCString() failed.");
+ CFRelease (props_dict);
+ IOObjectRelease (disk_child);
+ IOObjectRelease (disk);
+ continue;
+ }
+ DEBUG ("disk plugin: disk_name_bsd = \"%s\"", disk_name_bsd);
+
stats_dict = (CFDictionaryRef) CFDictionaryGetValue (props_dict,
CFSTR (kIOBlockStorageDriverStatisticsKey));
if (stats_dict == NULL)
{
- DEBUG ("CFDictionaryGetValue (%s) failed.",
+ DEBUG ("disk plugin: CFDictionaryGetValue ("
+ "%s) failed.",
kIOBlockStorageDriverStatisticsKey);
CFRelease (props_dict);
IOObjectRelease (disk_child);
kNilOptions)
!= kIOReturnSuccess)
{
- DEBUG ("IORegistryEntryCreateCFProperties (disk_child) failed.");
+ DEBUG ("disk plugin: IORegistryEntryCreateCFProperties ("
+ "disk_child) failed.");
IOObjectRelease (disk_child);
CFRelease (props_dict);
IOObjectRelease (disk);
write_tme = dict_get_value (stats_dict,
kIOBlockStorageDriverStatisticsTotalWriteTimeKey);
- if (ssnprintf (disk_name, sizeof (disk_name),
- "%i-%i", disk_major, disk_minor) >= sizeof (disk_name))
- {
- DEBUG ("snprintf (major, minor) failed.");
- CFRelease (child_dict);
- IOObjectRelease (disk_child);
- CFRelease (props_dict);
- IOObjectRelease (disk);
- continue;
- }
- DEBUG ("disk_name = %s", disk_name);
+ if (use_bsd_name)
+ sstrncpy (disk_name, disk_name_bsd, sizeof (disk_name));
+ else
+ ssnprintf (disk_name, sizeof (disk_name), "%i-%i",
+ disk_major, disk_minor);
+ DEBUG ("disk plugin: disk_name = \"%s\"", disk_name);
if ((read_byt != -1LL) || (write_byt != -1LL))
disk_submit (disk_name, "disk_octets", read_byt, write_byt);
$$.children = $2.statement;
$$.children_num = $2.statement_num;
}
+ | block_begin block_end
+ {
+ if (strcmp ($1.key, $2) != 0)
+ {
+ printf ("block_begin = %s; block_end = %s;\n", $1.key, $2);
+ yyerror ("Block not closed..\n");
+ exit (1);
+ }
+ free ($2); $2 = NULL;
+ $$ = $1;
+ $$.children = NULL;
+ $$.children_num = 0;
+ }
;
statement:
ci_root->children = $1.statement;
ci_root->children_num = $1.statement_num;
}
+ | /* epsilon */
+ {
+ ci_root = malloc (sizeof (oconfig_item_t));
+ memset (ci_root, '\0', sizeof (oconfig_item_t));
+ ci_root->children = NULL;
+ ci_root->children_num = 0;
+ }
;
%%
/**
* collectd - src/memcached.c, based on src/hddtemp.c
* Copyright (C) 2007 Antony Dovgal
- * Copyright (C) 2007-2010 Florian Forster
+ * Copyright (C) 2007-2012 Florian Forster
* Copyright (C) 2009 Doug MacEachern
* Copyright (C) 2009 Franck Lombardi
+ * Copyright (C) 2012 Nicolas Szalay
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by the
* Florian octo Forster <octo at collectd.org>
* Doug MacEachern <dougm at hyperic.com>
* Franck Lombardi
+ * Nicolas Szalay
**/
#include "collectd.h"
#include "plugin.h"
#include "configfile.h"
-# include <poll.h>
-# include <netdb.h>
-# include <sys/socket.h>
-# include <sys/un.h>
-# include <netinet/in.h>
-# include <netinet/tcp.h>
-
-/* Hack to work around the missing define in AIX */
-#ifndef MSG_DONTWAIT
-# define MSG_DONTWAIT MSG_NONBLOCK
-#endif
+#include <netdb.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
#define MEMCACHED_DEF_HOST "127.0.0.1"
#define MEMCACHED_DEF_PORT "11211"
-#define MEMCACHED_RETRY_COUNT 100
-
-static const char *config_keys[] =
+struct memcached_s
{
- "Socket",
- "Host",
- "Port"
+ char *name;
+ char *socket;
+ char *host;
+ char *port;
};
-static int config_keys_num = STATIC_ARRAY_SIZE (config_keys);
+typedef struct memcached_s memcached_t;
-static char *memcached_socket = NULL;
-static char *memcached_host = NULL;
-static char memcached_port[16];
+static _Bool memcached_have_instances = 0;
-static int memcached_query_daemon (char *buffer, int buffer_size) /* {{{ */
+static void memcached_free (memcached_t *st)
{
- int fd;
- ssize_t status;
- int buffer_fill;
- int i = 0;
-
- if (memcached_socket != NULL) {
- struct sockaddr_un serv_addr;
-
- memset (&serv_addr, 0, sizeof (serv_addr));
- serv_addr.sun_family = AF_UNIX;
- sstrncpy (serv_addr.sun_path, memcached_socket,
- sizeof (serv_addr.sun_path));
-
- /* create our socket descriptor */
- fd = socket (AF_UNIX, SOCK_STREAM, 0);
- if (fd < 0) {
- char errbuf[1024];
- ERROR ("memcached: unix socket: %s", sstrerror (errno, errbuf,
- sizeof (errbuf)));
- return -1;
- }
-
- /* connect to the memcached daemon */
- status = (ssize_t) connect (fd, (struct sockaddr *) &serv_addr,
- sizeof (serv_addr));
- if (status != 0) {
- shutdown (fd, SHUT_RDWR);
- close (fd);
- fd = -1;
- }
- }
- else { /* if (memcached_socket == NULL) */
- const char *host;
- const char *port;
-
- struct addrinfo ai_hints;
- struct addrinfo *ai_list, *ai_ptr;
- int ai_return = 0;
-
- memset (&ai_hints, '\0', sizeof (ai_hints));
- ai_hints.ai_flags = 0;
+ if (st == NULL)
+ return;
+
+ sfree (st->name);
+ sfree (st->socket);
+ sfree (st->host);
+ sfree (st->port);
+}
+
+static int memcached_connect_unix (memcached_t *st)
+{
+ struct sockaddr_un serv_addr;
+ int fd;
+
+ memset (&serv_addr, 0, sizeof (serv_addr));
+ serv_addr.sun_family = AF_UNIX;
+ sstrncpy (serv_addr.sun_path, st->socket,
+ sizeof (serv_addr.sun_path));
+
+ /* create our socket descriptor */
+ fd = socket (AF_UNIX, SOCK_STREAM, 0);
+ if (fd < 0)
+ {
+ char errbuf[1024];
+ ERROR ("memcached plugin: memcached_connect_unix: socket(2) failed: %s",
+ sstrerror (errno, errbuf, sizeof (errbuf)));
+ return (-1);
+ }
+
+ return (fd);
+} /* int memcached_connect_unix */
+
+static int memcached_connect_inet (memcached_t *st)
+{
+ char *host;
+ char *port;
+
+ struct addrinfo ai_hints;
+ struct addrinfo *ai_list, *ai_ptr;
+ int status;
+ int fd = -1;
+
+ memset (&ai_hints, 0, sizeof (ai_hints));
+ ai_hints.ai_flags = 0;
#ifdef AI_ADDRCONFIG
- /* ai_hints.ai_flags |= AI_ADDRCONFIG; */
+ ai_hints.ai_flags |= AI_ADDRCONFIG;
#endif
- ai_hints.ai_family = AF_INET;
- ai_hints.ai_socktype = SOCK_STREAM;
- ai_hints.ai_protocol = 0;
-
- host = memcached_host;
- if (host == NULL) {
- host = MEMCACHED_DEF_HOST;
- }
-
- port = memcached_port;
- if (strlen (port) == 0) {
- port = MEMCACHED_DEF_PORT;
- }
-
- if ((ai_return = getaddrinfo (host, port, &ai_hints, &ai_list)) != 0) {
- char errbuf[1024];
- ERROR ("memcached: getaddrinfo (%s, %s): %s",
- host, port,
- (ai_return == EAI_SYSTEM)
- ? sstrerror (errno, errbuf, sizeof (errbuf))
- : gai_strerror (ai_return));
- return -1;
- }
-
- fd = -1;
- for (ai_ptr = ai_list; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next) {
- /* create our socket descriptor */
- fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
- if (fd < 0) {
- char errbuf[1024];
- ERROR ("memcached: socket: %s", sstrerror (errno, errbuf, sizeof (errbuf)));
- continue;
- }
-
- /* connect to the memcached daemon */
- status = (ssize_t) connect (fd, (struct sockaddr *) ai_ptr->ai_addr, ai_ptr->ai_addrlen);
- if (status != 0) {
- shutdown (fd, SHUT_RDWR);
- close (fd);
- fd = -1;
- continue;
- }
-
- /* A socket could be opened and connecting succeeded. We're
- * done. */
- break;
- }
-
- freeaddrinfo (ai_list);
- }
-
- if (fd < 0) {
- ERROR ("memcached: Could not connect to daemon.");
- return -1;
- }
-
- if (send(fd, "stats\r\n", sizeof("stats\r\n") - 1, MSG_DONTWAIT) != (sizeof("stats\r\n") - 1)) {
- ERROR ("memcached: Could not send command to the memcached daemon.");
- return -1;
- }
-
- {
- struct pollfd p;
- int status;
-
- memset (&p, 0, sizeof (p));
- p.fd = fd;
- p.events = POLLIN | POLLERR | POLLHUP;
- p.revents = 0;
-
- status = poll (&p, /* nfds = */ 1,
- /* timeout = */ CDTIME_T_TO_MS (interval_g));
- if (status <= 0)
- {
- if (status == 0)
- {
- ERROR ("memcached: poll(2) timed out after %.3f seconds.",
- CDTIME_T_TO_DOUBLE (interval_g));
- }
- else
- {
- char errbuf[1024];
- ERROR ("memcached: poll(2) failed: %s",
- sstrerror (errno, errbuf, sizeof (errbuf)));
- }
- shutdown (fd, SHUT_RDWR);
- close (fd);
- return (-1);
- }
- }
-
- /* receive data from the memcached daemon */
- memset (buffer, '\0', buffer_size);
-
- buffer_fill = 0;
- while ((status = recv (fd, buffer + buffer_fill, buffer_size - buffer_fill, MSG_DONTWAIT)) != 0) {
- if (i > MEMCACHED_RETRY_COUNT) {
- ERROR("recv() timed out");
- break;
- }
- i++;
-
- if (status == -1) {
- char errbuf[1024];
-
- if (errno == EAGAIN) {
- continue;
- }
-
- ERROR ("memcached: Error reading from socket: %s",
- sstrerror (errno, errbuf, sizeof (errbuf)));
- shutdown(fd, SHUT_RDWR);
- close (fd);
- return -1;
- }
- buffer_fill += status;
-
- if (buffer_fill > 3 && buffer[buffer_fill-5] == 'E' && buffer[buffer_fill-4] == 'N' && buffer[buffer_fill-3] == 'D') {
- /* we got all the data */
- break;
- }
- }
-
- if (buffer_fill >= buffer_size) {
- buffer[buffer_size - 1] = '\0';
- WARNING ("memcached: Message from memcached has been truncated.");
- } else if (buffer_fill == 0) {
- WARNING ("memcached: Peer has unexpectedly shut down the socket. "
- "Buffer: `%s'", buffer);
- shutdown(fd, SHUT_RDWR);
- close(fd);
- return -1;
- }
-
- shutdown(fd, SHUT_RDWR);
- close(fd);
- return 0;
+ ai_hints.ai_family = AF_UNSPEC;
+ ai_hints.ai_socktype = SOCK_STREAM;
+ ai_hints.ai_protocol = 0;
+
+ host = (st->host != NULL) ? st->host : MEMCACHED_DEF_HOST;
+ port = (st->port != NULL) ? st->port : MEMCACHED_DEF_PORT;
+
+ ai_list = NULL;
+ status = getaddrinfo (host, port, &ai_hints, &ai_list);
+ if (status != 0)
+ {
+ char errbuf[1024];
+ ERROR ("memcached plugin: memcached_connect_inet: "
+ "getaddrinfo(%s,%s) failed: %s",
+ host, port,
+ (status == EAI_SYSTEM)
+ ? sstrerror (errno, errbuf, sizeof (errbuf))
+ : gai_strerror (status));
+ return (-1);
+ }
+
+ for (ai_ptr = ai_list; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
+ {
+ /* create our socket descriptor */
+ fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
+ if (fd < 0)
+ {
+ char errbuf[1024];
+ WARNING ("memcached plugin: memcached_connect_inet: "
+ "socket(2) failed: %s",
+ sstrerror (errno, errbuf, sizeof (errbuf)));
+ continue;
+ }
+
+ /* connect to the memcached daemon */
+ status = (int) connect (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
+ if (status != 0)
+ {
+ shutdown (fd, SHUT_RDWR);
+ close (fd);
+ fd = -1;
+ continue;
+ }
+
+ /* A socket could be opened and connecting succeeded. We're done. */
+ break;
+ }
+
+ freeaddrinfo (ai_list);
+ return (fd);
+} /* int memcached_connect_inet */
+
+static int memcached_connect (memcached_t *st)
+{
+ if (st->socket != NULL)
+ return (memcached_connect_unix (st));
+ else
+ return (memcached_connect_inet (st));
}
-/* }}} */
-static int memcached_config (const char *key, const char *value) /* {{{ */
+static int memcached_query_daemon (char *buffer, size_t buffer_size, memcached_t *st)
{
- if (strcasecmp (key, "Socket") == 0) {
- if (memcached_socket != NULL) {
- free (memcached_socket);
- }
- memcached_socket = strdup (value);
- } else if (strcasecmp (key, "Host") == 0) {
- if (memcached_host != NULL) {
- free (memcached_host);
- }
- memcached_host = strdup (value);
- } else if (strcasecmp (key, "Port") == 0) {
- int port = (int) (atof (value));
- if ((port > 0) && (port <= 65535)) {
- ssnprintf (memcached_port, sizeof (memcached_port), "%i", port);
- } else {
- sstrncpy (memcached_port, value, sizeof (memcached_port));
- }
- } else {
- return -1;
- }
-
- return 0;
+ int fd = -1;
+ int status;
+ size_t buffer_fill;
+
+ fd = memcached_connect (st);
+ if (fd < 0) {
+ ERROR ("memcached plugin: Instance \"%s\" could not connect to daemon.",
+ st->name);
+ return -1;
+ }
+
+ status = (int) swrite (fd, "stats\r\n", strlen ("stats\r\n"));
+ if (status != 0)
+ {
+ char errbuf[1024];
+ ERROR ("memcached plugin: write(2) failed: %s",
+ sstrerror (errno, errbuf, sizeof (errbuf)));
+ shutdown(fd, SHUT_RDWR);
+ close (fd);
+ return (-1);
+ }
+
+ /* receive data from the memcached daemon */
+ memset (buffer, 0, buffer_size);
+
+ buffer_fill = 0;
+ while ((status = (int) recv (fd, buffer + buffer_fill,
+ buffer_size - buffer_fill, /* flags = */ 0)) != 0)
+ {
+ char const end_token[5] = {'E', 'N', 'D', '\r', '\n'};
+ if (status < 0)
+ {
+ char errbuf[1024];
+
+ if ((errno == EAGAIN) || (errno == EINTR))
+ continue;
+
+ ERROR ("memcached: Error reading from socket: %s",
+ sstrerror (errno, errbuf, sizeof (errbuf)));
+ shutdown(fd, SHUT_RDWR);
+ close (fd);
+ return (-1);
+ }
+
+ buffer_fill += (size_t) status;
+ if (buffer_fill > buffer_size)
+ {
+ buffer_fill = buffer_size;
+ WARNING ("memcached plugin: Message was truncated.");
+ break;
+ }
+
+ /* If buffer ends in end_token, we have all the data. */
+ if (memcmp (buffer + buffer_fill - sizeof (end_token),
+ end_token, sizeof (end_token)) == 0)
+ break;
+ } /* while (recv) */
+
+ status = 0;
+ if (buffer_fill == 0)
+ {
+ WARNING ("memcached plugin: No data returned by memcached.");
+ status = -1;
+ }
+
+ shutdown(fd, SHUT_RDWR);
+ close(fd);
+ return (status);
+} /* int memcached_query_daemon */
+
+static void memcached_init_vl (value_list_t *vl, memcached_t const *st)
+{
+ sstrncpy (vl->plugin, "memcached", sizeof (vl->plugin));
+ if (strcmp (st->name, "__legacy__") == 0) /* legacy mode */
+ {
+ sstrncpy (vl->host, hostname_g, sizeof (vl->host));
+ }
+ else
+ {
+ if (st->socket != NULL)
+ sstrncpy (vl->host, hostname_g, sizeof (vl->host));
+ else
+ sstrncpy (vl->host,
+ (st->host != NULL) ? st->host : MEMCACHED_DEF_HOST,
+ sizeof (vl->host));
+ sstrncpy (vl->plugin_instance, st->name, sizeof (vl->plugin_instance));
+ }
}
-/* }}} */
static void submit_derive (const char *type, const char *type_inst,
- derive_t value) /* {{{ */
+ derive_t value, memcached_t *st)
{
- value_t values[1];
- value_list_t vl = VALUE_LIST_INIT;
+ value_t values[1];
+ value_list_t vl = VALUE_LIST_INIT;
+ memcached_init_vl (&vl, st);
- values[0].derive = value;
+ values[0].derive = value;
- vl.values = values;
- vl.values_len = 1;
- sstrncpy (vl.host, hostname_g, sizeof (vl.host));
- sstrncpy (vl.plugin, "memcached", sizeof (vl.plugin));
- sstrncpy (vl.type, type, sizeof (vl.type));
- if (type_inst != NULL)
- sstrncpy (vl.type_instance, type_inst, sizeof (vl.type_instance));
+ vl.values = values;
+ vl.values_len = 1;
+ sstrncpy (vl.type, type, sizeof (vl.type));
+ if (type_inst != NULL)
+ sstrncpy (vl.type_instance, type_inst, sizeof (vl.type_instance));
- plugin_dispatch_values (&vl);
-} /* void memcached_submit_cmd */
-/* }}} */
+ plugin_dispatch_values (&vl);
+}
static void submit_derive2 (const char *type, const char *type_inst,
- derive_t value0, derive_t value1) /* {{{ */
+ derive_t value0, derive_t value1, memcached_t *st)
{
- value_t values[2];
- value_list_t vl = VALUE_LIST_INIT;
+ value_t values[2];
+ value_list_t vl = VALUE_LIST_INIT;
+ memcached_init_vl (&vl, st);
- values[0].derive = value0;
- values[1].derive = value1;
+ values[0].derive = value0;
+ values[1].derive = value1;
- vl.values = values;
- vl.values_len = 2;
- sstrncpy (vl.host, hostname_g, sizeof (vl.host));
- sstrncpy (vl.plugin, "memcached", sizeof (vl.plugin));
- sstrncpy (vl.type, type, sizeof (vl.type));
- if (type_inst != NULL)
- sstrncpy (vl.type_instance, type_inst, sizeof (vl.type_instance));
+ vl.values = values;
+ vl.values_len = 2;
+ sstrncpy (vl.type, type, sizeof (vl.type));
+ if (type_inst != NULL)
+ sstrncpy (vl.type_instance, type_inst, sizeof (vl.type_instance));
- plugin_dispatch_values (&vl);
-} /* void memcached_submit_cmd */
-/* }}} */
+ plugin_dispatch_values (&vl);
+}
static void submit_gauge (const char *type, const char *type_inst,
- gauge_t value) /* {{{ */
+ gauge_t value, memcached_t *st)
{
- value_t values[1];
- value_list_t vl = VALUE_LIST_INIT;
+ value_t values[1];
+ value_list_t vl = VALUE_LIST_INIT;
+ memcached_init_vl (&vl, st);
- values[0].gauge = value;
+ values[0].gauge = value;
- vl.values = values;
- vl.values_len = 1;
- sstrncpy (vl.host, hostname_g, sizeof (vl.host));
- sstrncpy (vl.plugin, "memcached", sizeof (vl.plugin));
- sstrncpy (vl.type, type, sizeof (vl.type));
- if (type_inst != NULL)
- sstrncpy (vl.type_instance, type_inst, sizeof (vl.type_instance));
+ vl.values = values;
+ vl.values_len = 1;
+ sstrncpy (vl.type, type, sizeof (vl.type));
+ if (type_inst != NULL)
+ sstrncpy (vl.type_instance, type_inst, sizeof (vl.type_instance));
- plugin_dispatch_values (&vl);
+ plugin_dispatch_values (&vl);
}
-/* }}} */
static void submit_gauge2 (const char *type, const char *type_inst,
- gauge_t value0, gauge_t value1) /* {{{ */
+ gauge_t value0, gauge_t value1, memcached_t *st)
{
- value_t values[2];
- value_list_t vl = VALUE_LIST_INIT;
+ value_t values[2];
+ value_list_t vl = VALUE_LIST_INIT;
+ memcached_init_vl (&vl, st);
- values[0].gauge = value0;
- values[1].gauge = value1;
+ values[0].gauge = value0;
+ values[1].gauge = value1;
- vl.values = values;
- vl.values_len = 2;
- sstrncpy (vl.host, hostname_g, sizeof (vl.host));
- sstrncpy (vl.plugin, "memcached", sizeof (vl.plugin));
- sstrncpy (vl.type, type, sizeof (vl.type));
- if (type_inst != NULL)
- sstrncpy (vl.type_instance, type_inst, sizeof (vl.type_instance));
+ vl.values = values;
+ vl.values_len = 2;
+ sstrncpy (vl.type, type, sizeof (vl.type));
+ if (type_inst != NULL)
+ sstrncpy (vl.type_instance, type_inst, sizeof (vl.type_instance));
- plugin_dispatch_values (&vl);
+ plugin_dispatch_values (&vl);
}
-/* }}} */
-static int memcached_read (void) /* {{{ */
+static int memcached_read (user_data_t *user_data)
{
- char buf[4096];
- char *fields[3];
- char *ptr;
- char *line;
- char *saveptr;
- int fields_num;
-
- gauge_t bytes_used = NAN;
- gauge_t bytes_total = NAN;
- gauge_t hits = NAN;
- gauge_t gets = NAN;
- derive_t rusage_user = 0;
- derive_t rusage_syst = 0;
- derive_t octets_rx = 0;
- derive_t octets_tx = 0;
-
- /* get data from daemon */
- if (memcached_query_daemon (buf, sizeof (buf)) < 0) {
- return -1;
- }
+ char buf[4096];
+ char *fields[3];
+ char *ptr;
+ char *line;
+ char *saveptr;
+ int fields_num;
+
+ gauge_t bytes_used = NAN;
+ gauge_t bytes_total = NAN;
+ gauge_t hits = NAN;
+ gauge_t gets = NAN;
+ derive_t rusage_user = 0;
+ derive_t rusage_syst = 0;
+ derive_t octets_rx = 0;
+ derive_t octets_tx = 0;
+
+ memcached_t *st;
+ st = user_data->data;
+
+ /* get data from daemon */
+ if (memcached_query_daemon (buf, sizeof (buf), st) < 0) {
+ return -1;
+ }
#define FIELD_IS(cnst) \
- (((sizeof(cnst) - 1) == name_len) && (strcmp (cnst, fields[1]) == 0))
-
- ptr = buf;
- saveptr = NULL;
- while ((line = strtok_r (ptr, "\n\r", &saveptr)) != NULL)
- {
- int name_len;
-
- ptr = NULL;
-
- fields_num = strsplit(line, fields, 3);
- if (fields_num != 3)
- continue;
-
- name_len = strlen(fields[1]);
- if (name_len == 0)
- continue;
-
- /*
- * For an explanation on these fields please refer to
- * <http://code.sixapart.com/svn/memcached/trunk/server/doc/protocol.txt>
- */
-
- /*
- * CPU time consumed by the memcached process
- */
- if (FIELD_IS ("rusage_user"))
- {
- rusage_user = atoll (fields[2]);
- }
- else if (FIELD_IS ("rusage_system"))
- {
- rusage_syst = atoll(fields[2]);
- }
-
- /*
- * Number of threads of this instance
- */
- else if (FIELD_IS ("threads"))
- {
- submit_gauge2 ("ps_count", NULL, NAN, atof (fields[2]));
- }
-
- /*
- * Number of items stored
- */
- else if (FIELD_IS ("curr_items"))
- {
- submit_gauge ("memcached_items", "current", atof (fields[2]));
- }
-
- /*
- * Number of bytes used and available (total - used)
- */
- else if (FIELD_IS ("bytes"))
- {
- bytes_used = atof (fields[2]);
- }
- else if (FIELD_IS ("limit_maxbytes"))
- {
- bytes_total = atof(fields[2]);
- }
-
- /*
- * Connections
- */
- else if (FIELD_IS ("curr_connections"))
- {
- submit_gauge ("memcached_connections", "current", atof (fields[2]));
- }
-
- /*
- * Commands
- */
- else if ((name_len > 4) && (strncmp (fields[1], "cmd_", 4) == 0))
- {
- const char *name = fields[1] + 4;
- submit_derive ("memcached_command", name, atoll (fields[2]));
- if (strcmp (name, "get") == 0)
- gets = atof (fields[2]);
- }
-
- /*
- * Operations on the cache, i. e. cache hits, cache misses and evictions of items
- */
- else if (FIELD_IS ("get_hits"))
- {
- submit_derive ("memcached_ops", "hits", atoll (fields[2]));
- hits = atof (fields[2]);
- }
- else if (FIELD_IS ("get_misses"))
- {
- submit_derive ("memcached_ops", "misses", atoll (fields[2]));
- }
- else if (FIELD_IS ("evictions"))
- {
- submit_derive ("memcached_ops", "evictions", atoll (fields[2]));
- }
-
- /*
- * Network traffic
- */
- else if (FIELD_IS ("bytes_read"))
- {
- octets_rx = atoll (fields[2]);
- }
- else if (FIELD_IS ("bytes_written"))
- {
- octets_tx = atoll (fields[2]);
- }
- } /* while ((line = strtok_r (ptr, "\n\r", &saveptr)) != NULL) */
-
- if (!isnan (bytes_used) && !isnan (bytes_total) && (bytes_used <= bytes_total))
- submit_gauge2 ("df", "cache", bytes_used, bytes_total - bytes_used);
-
- if ((rusage_user != 0) || (rusage_syst != 0))
- submit_derive2 ("ps_cputime", NULL, rusage_user, rusage_syst);
-
- if ((octets_rx != 0) || (octets_tx != 0))
- submit_derive2 ("memcached_octets", NULL, octets_rx, octets_tx);
-
- if (!isnan (gets) && !isnan (hits))
- {
- gauge_t rate = NAN;
-
- if (gets != 0.0)
- rate = 100.0 * hits / gets;
-
- submit_gauge ("percent", "hitratio", rate);
- }
-
- return 0;
+ (((sizeof(cnst) - 1) == name_len) && (strcmp (cnst, fields[1]) == 0))
+
+ ptr = buf;
+ saveptr = NULL;
+ while ((line = strtok_r (ptr, "\n\r", &saveptr)) != NULL)
+ {
+ int name_len;
+
+ ptr = NULL;
+
+ fields_num = strsplit(line, fields, 3);
+ if (fields_num != 3)
+ continue;
+
+ name_len = strlen(fields[1]);
+ if (name_len == 0)
+ continue;
+
+ /*
+ * For an explanation on these fields please refer to
+ * <http://code.sixapart.com/svn/memcached/trunk/server/doc/protocol.txt>
+ */
+
+ /*
+ * CPU time consumed by the memcached process
+ */
+ if (FIELD_IS ("rusage_user"))
+ {
+ rusage_user = atoll (fields[2]);
+ }
+ else if (FIELD_IS ("rusage_system"))
+ {
+ rusage_syst = atoll(fields[2]);
+ }
+
+ /*
+ * Number of threads of this instance
+ */
+ else if (FIELD_IS ("threads"))
+ {
+ submit_gauge2 ("ps_count", NULL, NAN, atof (fields[2]), st);
+ }
+
+ /*
+ * Number of items stored
+ */
+ else if (FIELD_IS ("curr_items"))
+ {
+ submit_gauge ("memcached_items", "current", atof (fields[2]), st);
+ }
+
+ /*
+ * Number of bytes used and available (total - used)
+ */
+ else if (FIELD_IS ("bytes"))
+ {
+ bytes_used = atof (fields[2]);
+ }
+ else if (FIELD_IS ("limit_maxbytes"))
+ {
+ bytes_total = atof(fields[2]);
+ }
+
+ /*
+ * Connections
+ */
+ else if (FIELD_IS ("curr_connections"))
+ {
+ submit_gauge ("memcached_connections", "current", atof (fields[2]), st);
+ }
+
+ /*
+ * Commands
+ */
+ else if ((name_len > 4) && (strncmp (fields[1], "cmd_", 4) == 0))
+ {
+ const char *name = fields[1] + 4;
+ submit_derive ("memcached_command", name, atoll (fields[2]), st);
+ if (strcmp (name, "get") == 0)
+ gets = atof (fields[2]);
+ }
+
+ /*
+ * Operations on the cache, i. e. cache hits, cache misses and evictions of items
+ */
+ else if (FIELD_IS ("get_hits"))
+ {
+ submit_derive ("memcached_ops", "hits", atoll (fields[2]), st);
+ hits = atof (fields[2]);
+ }
+ else if (FIELD_IS ("get_misses"))
+ {
+ submit_derive ("memcached_ops", "misses", atoll (fields[2]), st);
+ }
+ else if (FIELD_IS ("evictions"))
+ {
+ submit_derive ("memcached_ops", "evictions", atoll (fields[2]), st);
+ }
+
+ /*
+ * Network traffic
+ */
+ else if (FIELD_IS ("bytes_read"))
+ {
+ octets_rx = atoll (fields[2]);
+ }
+ else if (FIELD_IS ("bytes_written"))
+ {
+ octets_tx = atoll (fields[2]);
+ }
+ } /* while ((line = strtok_r (ptr, "\n\r", &saveptr)) != NULL) */
+
+ if (!isnan (bytes_used) && !isnan (bytes_total) && (bytes_used <= bytes_total))
+ submit_gauge2 ("df", "cache", bytes_used, bytes_total - bytes_used, st);
+
+ if ((rusage_user != 0) || (rusage_syst != 0))
+ submit_derive2 ("ps_cputime", NULL, rusage_user, rusage_syst, st);
+
+ if ((octets_rx != 0) || (octets_tx != 0))
+ submit_derive2 ("memcached_octets", NULL, octets_rx, octets_tx, st);
+
+ if (!isnan (gets) && !isnan (hits))
+ {
+ gauge_t rate = NAN;
+
+ if (gets != 0.0)
+ rate = 100.0 * hits / gets;
+
+ submit_gauge ("percent", "hitratio", rate, st);
+ }
+
+ return 0;
+} /* int memcached_read */
+
+static int memcached_add_read_callback (memcached_t *st)
+{
+ user_data_t ud;
+ char callback_name[3*DATA_MAX_NAME_LEN];
+ int status;
+
+ memset (&ud, 0, sizeof (ud));
+ ud.data = st;
+ ud.free_func = (void *) memcached_free;
+
+ assert (st->name != NULL);
+ ssnprintf (callback_name, sizeof (callback_name), "memcached/%s", st->name);
+
+ status = plugin_register_complex_read (/* group = */ "memcached",
+ /* name = */ callback_name,
+ /* callback = */ memcached_read,
+ /* interval = */ NULL,
+ /* user_data = */ &ud);
+ return (status);
+} /* int memcached_add_read_callback */
+
+/* Configuration handling functiions
+ * <Plugin memcached>
+ * <Instance "instance_name">
+ * Host foo.zomg.com
+ * Port "1234"
+ * </Instance>
+ * </Plugin>
+ */
+static int config_add_instance(oconfig_item_t *ci)
+{
+ memcached_t *st;
+ int i;
+ int status = 0;
+
+ /* Disable automatic generation of default instance in the init callback. */
+ memcached_have_instances = 1;
+
+ st = malloc (sizeof (*st));
+ if (st == NULL)
+ {
+ ERROR ("memcached plugin: malloc failed.");
+ return (-1);
+ }
+
+ memset (st, 0, sizeof (*st));
+ st->name = NULL;
+ st->socket = NULL;
+ st->host = NULL;
+ st->port = NULL;
+
+ if (strcasecmp (ci->key, "Plugin") == 0) /* default instance */
+ st->name = sstrdup ("__legacy__");
+ else /* <Instance /> block */
+ status = cf_util_get_string (ci, &st->name);
+ if (status != 0)
+ {
+ sfree (st);
+ return (status);
+ }
+ assert (st->name != NULL);
+
+ for (i = 0; i < ci->children_num; i++)
+ {
+ oconfig_item_t *child = ci->children + i;
+
+ if (strcasecmp ("Socket", child->key) == 0)
+ status = cf_util_get_string (child, &st->socket);
+ else if (strcasecmp ("Host", child->key) == 0)
+ status = cf_util_get_string (child, &st->host);
+ else if (strcasecmp ("Port", child->key) == 0)
+ status = cf_util_get_service (child, &st->port);
+ else
+ {
+ WARNING ("memcached plugin: Option `%s' not allowed here.",
+ child->key);
+ status = -1;
+ }
+
+ if (status != 0)
+ break;
+ }
+
+ if (status == 0)
+ status = memcached_add_read_callback (st);
+
+ if (status != 0)
+ {
+ memcached_free(st);
+ return (-1);
+ }
+
+ return (0);
}
-/* }}} */
-void module_register (void) /* {{{ */
+static int memcached_config (oconfig_item_t *ci)
{
- plugin_register_config ("memcached", memcached_config, config_keys, config_keys_num);
- plugin_register_read ("memcached", memcached_read);
+ int status = 0;
+ _Bool have_instance_block = 0;
+ int i;
+
+ for (i = 0; i < ci->children_num; i++)
+ {
+ oconfig_item_t *child = ci->children + i;
+
+ if (strcasecmp ("Instance", child->key) == 0)
+ {
+ config_add_instance (child);
+ have_instance_block = 1;
+ }
+ else if (!have_instance_block)
+ {
+ /* Non-instance option: Assume legacy configuration (without <Instance />
+ * blocks) and call config_add_instance() with the <Plugin /> block. */
+ return (config_add_instance (ci));
+ }
+ else
+ WARNING ("memcached plugin: The configuration option "
+ "\"%s\" is not allowed here. Did you "
+ "forget to add an <Instance /> block "
+ "around the configuration?",
+ child->key);
+ } /* for (ci->children) */
+
+ return (status);
}
-/* }}} */
-
-/*
- * Local variables:
- * tab-width: 4
- * c-basic-offset: 4
- * End:
- * vim600: sw=4 ts=4 fdm=marker noexpandtab
- * vim<600: sw=4 ts=4 noexpandtab
- */
+static int memcached_init (void)
+{
+ memcached_t *st;
+ int status;
+
+ if (memcached_have_instances)
+ return (0);
+
+ /* No instances were configured, lets start a default instance. */
+ st = malloc (sizeof (*st));
+ if (st == NULL)
+ return (ENOMEM);
+ memset (st, 0, sizeof (*st));
+ st->name = sstrdup ("__legacy__");
+ st->socket = NULL;
+ st->host = NULL;
+ st->port = NULL;
+
+ status = memcached_add_read_callback (st);
+ if (status == 0)
+ memcached_have_instances = 1;
+ else
+ memcached_free (st);
+
+ return (status);
+} /* int memcached_init */
+
+void module_register (void)
+{
+ plugin_register_complex_config ("memcached", memcached_config);
+ plugin_register_init ("memcached", memcached_init);
+}
/**
* collectd - src/ntpd.c
- * Copyright (C) 2006-2007 Florian octo Forster
+ * Copyright (C) 2006-2012 Florian octo Forster
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by the
* 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*
* Authors:
- * Florian octo Forster <octo at verplant.org>
+ * Florian octo Forster <octo at collectd.org>
**/
#define _BSD_SOURCE /* For NI_MAXHOST */
{
"Host",
"Port",
- "ReverseLookups"
+ "ReverseLookups",
+ "IncludeUnitID"
};
static int config_keys_num = STATIC_ARRAY_SIZE (config_keys);
-static int do_reverse_lookups = 1;
+static _Bool do_reverse_lookups = 1;
+
+/* This option only exists for backward compatibility. If it is false and two
+ * ntpd peers use the same refclock driver, the plugin will try to write
+ * simultaneous measurements from both to the same type instance. */
+static _Bool include_unit_id = 0;
# define NTPD_DEFAULT_HOST "localhost"
# define NTPD_DEFAULT_PORT "123"
else
do_reverse_lookups = 0;
}
+ else if (strcasecmp (key, "IncludeUnitID") == 0)
+ {
+ if (IS_TRUE (value))
+ include_unit_id = 1;
+ else
+ include_unit_id = 0;
+ }
else
{
return (-1);
plugin_dispatch_values (&vl);
}
+/* Each time a peer is polled, ntpd shifts the reach register to the left and
+ * sets the LSB based on whether the peer was reachable. If the LSB is zero,
+ * the values are out of date. */
+static void ntpd_submit_reach (char *type, char *type_inst, uint8_t reach,
+ double value)
+{
+ if (!(reach & 1))
+ value = NAN;
+
+ ntpd_submit (type, type_inst, value);
+}
+
static int ntpd_connect (void)
{
char *host;
return (val_double);
}
+static uint32_t ntpd_get_refclock_id (struct info_peer_summary const *peer_info)
+{
+ uint32_t addr = ntohl (peer_info->srcadr);
+ uint32_t refclock_id = (addr >> 8) & 0x00FF;
+
+ return (refclock_id);
+}
+
+static int ntpd_get_name_from_address (char *buffer, size_t buffer_size,
+ struct info_peer_summary const *peer_info, _Bool do_reverse_lookup)
+{
+ struct sockaddr_storage sa;
+ socklen_t sa_len;
+ int flags = 0;
+ int status;
+
+ memset (&sa, 0, sizeof (sa));
+
+ if (peer_info->v6_flag)
+ {
+ struct sockaddr_in6 sa6;
+
+ assert (sizeof (sa) >= sizeof (sa6));
+
+ memset (&sa6, 0, sizeof (sa6));
+ sa6.sin6_family = AF_INET6;
+ sa6.sin6_port = htons (123);
+ memcpy (&sa6.sin6_addr, &peer_info->srcadr6,
+ sizeof (struct in6_addr));
+ sa_len = sizeof (sa6);
+
+ memcpy (&sa, &sa6, sizeof (sa6));
+ }
+ else
+ {
+ struct sockaddr_in sa4;
+
+ assert (sizeof (sa) >= sizeof (sa4));
+
+ memset (&sa4, 0, sizeof (sa4));
+ sa4.sin_family = AF_INET;
+ sa4.sin_port = htons (123);
+ memcpy (&sa4.sin_addr, &peer_info->srcadr,
+ sizeof (struct in_addr));
+ sa_len = sizeof (sa4);
+
+ memcpy (&sa, &sa4, sizeof (sa4));
+ }
+
+ if (!do_reverse_lookup)
+ flags |= NI_NUMERICHOST;
+
+ status = getnameinfo ((struct sockaddr const *) &sa, sa_len,
+ buffer, buffer_size,
+ NULL, 0, /* No port name */
+ flags);
+ if (status != 0)
+ {
+ char errbuf[1024];
+ ERROR ("ntpd plugin: getnameinfo failed: %s",
+ (status == EAI_SYSTEM)
+ ? sstrerror (errno, errbuf, sizeof (errbuf))
+ : gai_strerror (status));
+ return (-1);
+ }
+
+ return (0);
+} /* ntpd_get_name_from_address */
+
+static int ntpd_get_name_refclock (char *buffer, size_t buffer_size,
+ struct info_peer_summary const *peer_info)
+{
+ uint32_t refclock_id = ntpd_get_refclock_id (peer_info);
+ uint32_t unit_id = ntohl (peer_info->srcadr) & 0x00FF;
+
+ if (refclock_id >= refclock_names_num)
+ return (ntpd_get_name_from_address (buffer, buffer_size,
+ peer_info,
+ /* do_reverse_lookup = */ 0));
+
+ if (include_unit_id)
+ ssnprintf (buffer, buffer_size, "%s-%"PRIu32,
+ refclock_names[refclock_id], unit_id);
+ else
+ sstrncpy (buffer, refclock_names[refclock_id], buffer_size);
+
+ return (0);
+} /* int ntpd_get_name_refclock */
+
+static int ntpd_get_name (char *buffer, size_t buffer_size,
+ struct info_peer_summary const *peer_info)
+{
+ uint32_t addr = ntohl (peer_info->srcadr);
+
+ if (!peer_info->v6_flag && ((addr & REFCLOCK_MASK) == REFCLOCK_ADDR))
+ return (ntpd_get_name_refclock (buffer, buffer_size,
+ peer_info));
+ else
+ return (ntpd_get_name_from_address (buffer, buffer_size,
+ peer_info, do_reverse_lookups));
+} /* int ntpd_addr_to_name */
+
static int ntpd_read (void)
{
struct info_kernel *ik;
double offset;
char peername[NI_MAXHOST];
- int refclock_id;
-
- ptr = ps + i;
- refclock_id = 0;
+ uint32_t refclock_id;
- /* Convert the `long floating point' offset value to double */
- M_LFPTOD (ntohl (ptr->offset_int), ntohl (ptr->offset_frc), offset);
+ ptr = ps + i;
- /* Special IP addresses for hardware clocks and stuff.. */
- if (!ptr->v6_flag
- && ((ntohl (ptr->srcadr) & REFCLOCK_MASK)
- == REFCLOCK_ADDR))
+ status = ntpd_get_name (peername, sizeof (peername), ptr);
+ if (status != 0)
{
- struct in_addr addr_obj;
- char *addr_str;
-
- refclock_id = (ntohl (ptr->srcadr) >> 8) & 0x000000FF;
-
- if (refclock_id < refclock_names_num)
- {
- sstrncpy (peername, refclock_names[refclock_id],
- sizeof (peername));
- }
- else
- {
- memset ((void *) &addr_obj, '\0', sizeof (addr_obj));
- addr_obj.s_addr = ptr->srcadr;
- addr_str = inet_ntoa (addr_obj);
-
- sstrncpy (peername, addr_str, sizeof (peername));
- }
+ ERROR ("ntpd plugin: Determining name of peer failed.");
+ continue;
}
- else /* Normal network host. */
- {
- struct sockaddr_storage sa;
- socklen_t sa_len;
- int flags = 0;
-
- memset (&sa, '\0', sizeof (sa));
-
- if (ptr->v6_flag)
- {
- struct sockaddr_in6 sa6;
-
- assert (sizeof (sa) >= sizeof (sa6));
-
- memset (&sa6, 0, sizeof (sa6));
- sa6.sin6_family = AF_INET6;
- sa6.sin6_port = htons (123);
- memcpy (&sa6.sin6_addr, &ptr->srcadr6,
- sizeof (struct in6_addr));
- sa_len = sizeof (sa6);
-
- memcpy (&sa, &sa6, sizeof (sa6));
- }
- else
- {
- struct sockaddr_in sa4;
-
- assert (sizeof (sa) >= sizeof (sa4));
- memset (&sa4, 0, sizeof (sa4));
- sa4.sin_family = AF_INET;
- sa4.sin_port = htons (123);
- memcpy (&sa4.sin_addr, &ptr->srcadr,
- sizeof (struct in_addr));
- sa_len = sizeof (sa4);
+ refclock_id = ntpd_get_refclock_id (ptr);
- memcpy (&sa, &sa4, sizeof (sa4));
- }
-
- if (do_reverse_lookups == 0)
- flags |= NI_NUMERICHOST;
-
- status = getnameinfo ((const struct sockaddr *) &sa,
- sa_len,
- peername, sizeof (peername),
- NULL, 0, /* No port name */
- flags);
- if (status != 0)
- {
- char errbuf[1024];
- ERROR ("ntpd plugin: getnameinfo failed: %s",
- (status == EAI_SYSTEM)
- ? sstrerror (errno, errbuf, sizeof (errbuf))
- : gai_strerror (status));
- continue;
- }
- }
+ /* Convert the `long floating point' offset value to double */
+ M_LFPTOD (ntohl (ptr->offset_int), ntohl (ptr->offset_frc), offset);
DEBUG ("peer %i:\n"
" peername = %s\n"
" srcadr = 0x%08x\n"
+ " reach = 0%03o\n"
" delay = %f\n"
" offset_int = %i\n"
" offset_frc = %i\n"
i,
peername,
ntohl (ptr->srcadr),
+ ptr->reach,
ntpd_read_fp (ptr->delay),
ntohl (ptr->offset_int),
ntohl (ptr->offset_frc),
ntpd_read_fp (ptr->dispersion));
if (refclock_id != 1) /* not the system clock (offset will always be zero.. */
- ntpd_submit ("time_offset", peername, offset);
- ntpd_submit ("time_dispersion", peername, ntpd_read_fp (ptr->dispersion));
+ ntpd_submit_reach ("time_offset", peername, ptr->reach,
+ offset);
+ ntpd_submit_reach ("time_dispersion", peername, ptr->reach,
+ ntpd_read_fp (ptr->dispersion));
if (refclock_id == 0) /* not a reference clock */
- ntpd_submit ("delay", peername, ntpd_read_fp (ptr->delay));
+ ntpd_submit_reach ("delay", peername, ptr->reach,
+ ntpd_read_fp (ptr->delay));
}
free (ps);
struct o_database_s
{
char *name;
+ char *host;
char *connect_id;
char *username;
char *password;
* </Plugin>
*/
-static int o_config_set_string (char **ret_string, /* {{{ */
- oconfig_item_t *ci)
-{
- char *string;
-
- if ((ci->values_num != 1)
- || (ci->values[0].type != OCONFIG_TYPE_STRING))
- {
- WARNING ("oracle plugin: The `%s' config option "
- "needs exactly one string argument.", ci->key);
- return (-1);
- }
-
- string = strdup (ci->values[0].value.string);
- if (string == NULL)
- {
- ERROR ("oracle plugin: strdup failed.");
- return (-1);
- }
-
- if (*ret_string != NULL)
- free (*ret_string);
- *ret_string = string;
-
- return (0);
-} /* }}} int o_config_set_string */
-
static int o_config_add_database (oconfig_item_t *ci) /* {{{ */
{
o_database_t *db;
return (-1);
}
memset (db, 0, sizeof (*db));
+ db->name = NULL;
+ db->host = NULL;
+ db->connect_id = NULL;
+ db->username = NULL;
+ db->password = NULL;
- status = o_config_set_string (&db->name, ci);
+ status = cf_util_get_string (ci, &db->name);
if (status != 0)
{
sfree (db);
oconfig_item_t *child = ci->children + i;
if (strcasecmp ("ConnectID", child->key) == 0)
- status = o_config_set_string (&db->connect_id, child);
+ status = cf_util_get_string (child, &db->connect_id);
+ else if (strcasecmp ("Host", child->key) == 0)
+ status = cf_util_get_string (child, &db->host);
else if (strcasecmp ("Username", child->key) == 0)
- status = o_config_set_string (&db->username, child);
+ status = cf_util_get_string (child, &db->username);
else if (strcasecmp ("Password", child->key) == 0)
- status = o_config_set_string (&db->password, child);
+ status = cf_util_get_string (child, &db->password);
else if (strcasecmp ("Query", child->key) == 0)
status = udb_query_pick_from_list (child, queries, queries_num,
&db->queries, &db->queries_num);
} /* for (j = 1; j <= param_counter; j++) */
/* }}} End of the ``define'' stuff. */
- status = udb_query_prepare_result (q, prep_area, hostname_g,
+ status = udb_query_prepare_result (q, prep_area,
+ (db->host != NULL) ? db->host : hostname_g,
/* plugin = */ "oracle", db->name, column_names, column_num,
/* interval = */ 0);
if (status != 0)
C_PSQL_PARAM_DB,
C_PSQL_PARAM_USER,
C_PSQL_PARAM_INTERVAL,
+ C_PSQL_PARAM_INSTANCE,
} c_psql_param_t;
/* Parameter configuration. Stored as `user data' in the query objects. */
char *user;
char *password;
+ char *instance;
+
char *sslmode;
char *krbsrvname;
db->user = NULL;
db->password = NULL;
+ db->instance = sstrdup (name);
+
db->sslmode = NULL;
db->krbsrvname = NULL;
sfree (db->user);
sfree (db->password);
+ sfree (db->instance);
+
sfree (db->sslmode);
sfree (db->krbsrvname);
if (CONNECTION_OK != PQstatus (db->conn)) {
c_complain (LOG_ERR, &db->conn_complaint,
- "Failed to connect to database %s: %s",
- db->database, PQerrorMessage (db->conn));
+ "Failed to connect to database %s (%s): %s",
+ db->database, db->instance,
+ PQerrorMessage (db->conn));
return -1;
}
? CDTIME_T_TO_DOUBLE (db->interval) : interval_g);
params[i] = interval;
break;
+ case C_PSQL_PARAM_INSTANCE:
+ params[i] = db->instance;
+ break;
default:
assert (0);
}
else if ((NULL == data) || (0 == data->params_num))
res = c_psql_exec_query_noparams (db, q);
else {
- log_err ("Connection to database \"%s\" does not support parameters "
- "(protocol version %d) - cannot execute query \"%s\".",
- db->database, db->proto_version,
+ log_err ("Connection to database \"%s\" (%s) does not support "
+ "parameters (protocol version %d) - "
+ "cannot execute query \"%s\".",
+ db->database, db->instance, db->proto_version,
udb_query_get_name (q));
return -1;
}
host = db->host;
status = udb_query_prepare_result (q, prep_area, host, "postgresql",
- db->database, column_names, (size_t) column_num, db->interval);
+ db->instance, column_names, (size_t) column_num, db->interval);
if (0 != status) {
log_err ("udb_query_prepare_result failed with status %i.",
status);
db = ud->data;
assert (NULL != db->database);
+ assert (NULL != db->instance);
if (0 != c_psql_check_connection (db))
return -1;
data->params[data->params_num] = C_PSQL_PARAM_USER;
else if (0 == strcasecmp (param_str, "interval"))
data->params[data->params_num] = C_PSQL_PARAM_INTERVAL;
+ else if (0 == strcasecmp (param_str, "instance"))
+ data->params[data->params_num] = C_PSQL_PARAM_INSTANCE;
else {
log_err ("Invalid parameter \"%s\".", param_str);
return 1;
cf_util_get_string (c, &db->user);
else if (0 == strcasecmp (c->key, "Password"))
cf_util_get_string (c, &db->password);
+ else if (0 == strcasecmp (c->key, "Instance"))
+ cf_util_get_string (c, &db->instance);
else if (0 == strcasecmp (c->key, "SSLMode"))
cf_util_get_string (c, &db->sslmode);
else if (0 == strcasecmp (c->key, "KRBSrvName"))
ud.data = db;
ud.free_func = c_psql_database_delete;
- ssnprintf (cb_name, sizeof (cb_name), "postgresql-%s", db->database);
+ ssnprintf (cb_name, sizeof (cb_name), "postgresql-%s", db->instance);
CDTIME_T_TO_TIMESPEC (db->interval, &cb_interval);
* Copyright (C) 2009 Andrés J. DÃaz
* Copyright (C) 2009 Manuel Sanmartin
* Copyright (C) 2010 Clément Stenac
+ * Copyright (C) 2012 Cosmin Ioiart
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by the
* Andrés J. DÃaz <ajdiaz at connectical.com>
* Manuel Sanmartin
* Clément Stenac <clement.stenac at diwi.org>
+ * Cosmin Ioiart <cioiart at gmail.com>
**/
#include "collectd.h"
#define MAXARGLN 1024
/* #endif HAVE_PROCINFO_H */
+#elif KERNEL_SOLARIS
+# include <procfs.h>
+# include <dirent.h>
+/* #endif KERNEL_SOLARIS */
+
#else
# error "No applicable input method."
#endif
# include <regex.h>
#endif
+#if HAVE_KSTAT_H
+# include <kstat.h>
+#endif
+
#ifndef ARG_MAX
# define ARG_MAX 4096
#endif
#endif /* HAVE_PROCINFO_H */
/* put name of process from config to list_head_g tree
- list_head_g is a list of 'procstat_t' structs with
- processes names we want to watch */
+ * list_head_g is a list of 'procstat_t' structs with
+ * processes names we want to watch */
static void ps_list_register (const char *name, const char *regexp)
{
procstat_t *new;
}
DEBUG ("name = %s; num_proc = %lu; num_lwp = %lu; "
- "vmem_size = %lu; vmem_rss = %lu; vmem_data = %lu; "
+ "vmem_size = %lu; vmem_rss = %lu; vmem_data = %lu; "
"vmem_code = %lu; "
"vmem_minflt_counter = %"PRIi64"; vmem_majflt_counter = %"PRIi64"; "
"cpu_user_counter = %"PRIi64"; cpu_system_counter = %"PRIi64"; "
ps->io_rchar, ps->io_wchar, ps->io_syscr, ps->io_syscw);
} /* void ps_submit_proc_list */
+#if KERNEL_LINUX || KERNEL_SOLARIS
+static void ps_submit_fork_rate (derive_t value)
+{
+ value_t values[1];
+ value_list_t vl = VALUE_LIST_INIT;
+
+ values[0].derive = value;
+
+ vl.values = values;
+ vl.values_len = 1;
+ sstrncpy(vl.host, hostname_g, sizeof (vl.host));
+ sstrncpy(vl.plugin, "processes", sizeof (vl.plugin));
+ sstrncpy(vl.plugin_instance, "", sizeof (vl.plugin_instance));
+ sstrncpy(vl.type, "fork_rate", sizeof (vl.type));
+ sstrncpy(vl.type_instance, "", sizeof (vl.type_instance));
+
+ plugin_dispatch_values(&vl);
+}
+#endif /* KERNEL_LINUX || KERNEL_SOLARIS*/
+
/* ------- additional functions for KERNEL_LINUX/HAVE_THREAD_INFO ------- */
#if KERNEL_LINUX
static int ps_read_tasks (int pid)
continue;
numfields = strsplit (buffer, fields,
- STATIC_ARRAY_SIZE (fields));
+ STATIC_ARRAY_SIZE (fields));
if (numfields < 2)
continue;
return buf;
} /* char *ps_get_cmdline (...) */
-static unsigned long read_fork_rate ()
+static int read_fork_rate ()
{
FILE *proc_stat;
- char buf[1024];
- unsigned long result = 0;
- int numfields;
- char *fields[3];
+ char buffer[1024];
+ value_t value;
+ _Bool value_valid = 0;
- proc_stat = fopen("/proc/stat", "r");
- if (proc_stat == NULL) {
+ proc_stat = fopen ("/proc/stat", "r");
+ if (proc_stat == NULL)
+ {
char errbuf[1024];
ERROR ("processes plugin: fopen (/proc/stat) failed: %s",
sstrerror (errno, errbuf, sizeof (errbuf)));
- return ULONG_MAX;
+ return (-1);
}
- while (fgets (buf, sizeof(buf), proc_stat) != NULL)
+ while (fgets (buffer, sizeof (buffer), proc_stat) != NULL)
{
- char *endptr;
+ int status;
+ char *fields[3];
+ int fields_num;
- numfields = strsplit(buf, fields, STATIC_ARRAY_SIZE (fields));
- if (numfields != 2)
+ fields_num = strsplit (buffer, fields,
+ STATIC_ARRAY_SIZE (fields));
+ if (fields_num != 2)
continue;
if (strcmp ("processes", fields[0]) != 0)
continue;
- errno = 0;
- endptr = NULL;
- result = strtoul(fields[1], &endptr, /* base = */ 10);
- if ((endptr == fields[1]) || (errno != 0)) {
- ERROR ("processes plugin: Cannot parse fork rate: %s",
- fields[1]);
- result = ULONG_MAX;
- break;
- }
+ status = parse_value (fields[1], &value, DS_TYPE_DERIVE);
+ if (status == 0)
+ value_valid = 1;
break;
}
-
fclose(proc_stat);
- return result;
+ if (!value_valid)
+ return (-1);
+
+ ps_submit_fork_rate (value.derive);
+ return (0);
}
+#endif /*KERNEL_LINUX */
-static void ps_submit_fork_rate (unsigned long value)
+#if KERNEL_SOLARIS
+static const char *ps_get_cmdline (pid_t pid, /* {{{ */
+ char *buffer, size_t buffer_size)
{
- value_t values[1];
- value_list_t vl = VALUE_LIST_INIT;
+ char path[PATH_MAX];
+ psinfo_t info;
+ int status;
- values[0].derive = (derive_t) value;
+ snprintf(path, sizeof (path), "/proc/%i/psinfo", pid);
- vl.values = values;
- vl.values_len = 1;
- sstrncpy (vl.host, hostname_g, sizeof (vl.host));
- sstrncpy (vl.plugin, "processes", sizeof (vl.plugin));
- sstrncpy (vl.plugin_instance, "", sizeof (vl.plugin_instance));
- sstrncpy (vl.type, "fork_rate", sizeof (vl.type));
- sstrncpy (vl.type_instance, "", sizeof (vl.type_instance));
+ status = read_file_contents (path, (void *) &info, sizeof (info));
+ if (status != ((int) buffer_size))
+ {
+ ERROR ("processes plugin: Unexpected return value "
+ "while reading \"%s\": "
+ "Returned %i but expected %zu.",
+ path, status, buffer_size);
+ return (NULL);
+ }
- plugin_dispatch_values (&vl);
+ info.pr_psargs[sizeof (info.pr_psargs) - 1] = 0;
+ sstrncpy (buffer, info.pr_psargs, buffer_size);
+
+ return (buffer);
+} /* }}} int ps_get_cmdline */
+
+/*
+ * Reads process information on the Solaris OS. The information comes mainly from
+ * /proc/PID/status, /proc/PID/psinfo and /proc/PID/usage
+ * The values for input and ouput chars are calculated "by hand"
+ * Added a few "solaris" specific process states as well
+ */
+static int ps_read_process(int pid, procstat_t *ps, char *state)
+{
+ char filename[64];
+ char f_psinfo[64], f_usage[64];
+ char *buffer;
+
+ pstatus_t *myStatus;
+ psinfo_t *myInfo;
+ prusage_t *myUsage;
+
+ snprintf(filename, sizeof (filename), "/proc/%i/status", pid);
+ snprintf(f_psinfo, sizeof (f_psinfo), "/proc/%i/psinfo", pid);
+ snprintf(f_usage, sizeof (f_usage), "/proc/%i/usage", pid);
+
+
+ buffer = malloc(sizeof (pstatus_t));
+ memset(buffer, 0, sizeof (pstatus_t));
+ read_file_contents(filename, buffer, sizeof (pstatus_t));
+ myStatus = (pstatus_t *) buffer;
+
+ buffer = malloc(sizeof (psinfo_t));
+ memset(buffer, 0, sizeof(psinfo_t));
+ read_file_contents(f_psinfo, buffer, sizeof (psinfo_t));
+ myInfo = (psinfo_t *) buffer;
+
+ buffer = malloc(sizeof (prusage_t));
+ memset(buffer, 0, sizeof(prusage_t));
+ read_file_contents(f_usage, buffer, sizeof (prusage_t));
+ myUsage = (prusage_t *) buffer;
+
+ sstrncpy(ps->name, myInfo->pr_fname, sizeof (myInfo->pr_fname));
+ ps->num_lwp = myStatus->pr_nlwp;
+ if (myInfo->pr_wstat != 0) {
+ ps->num_proc = 0;
+ ps->num_lwp = 0;
+ *state = (char) 'Z';
+ return (0);
+ } else {
+ ps->num_proc = 1;
+ ps->num_lwp = myInfo->pr_nlwp;
+ }
+
+ /*
+ * Convert system time and user time from nanoseconds to microseconds
+ * for compatibility with the linux module
+ */
+ ps->cpu_system_counter = myStatus -> pr_stime.tv_nsec / 1000;
+ ps->cpu_user_counter = myStatus -> pr_utime.tv_nsec / 1000;
+
+ /*
+ * Convert rssize from KB to bytes to be consistent w/ the linux module
+ */
+ ps->vmem_rss = myInfo->pr_rssize * 1024;
+ ps->vmem_size = myInfo->pr_size * 1024;
+ ps->vmem_minflt_counter = myUsage->pr_minf;
+ ps->vmem_majflt_counter = myUsage->pr_majf;
+
+ /*
+ * TODO: Data and code segment calculations for Solaris
+ */
+
+ ps->vmem_data = -1;
+ ps->vmem_code = -1;
+ ps->stack_size = myStatus->pr_stksize;
+
+ /*
+ * Calculating input/ouput chars
+ * Formula used is total chars / total blocks => chars/block
+ * then convert input/output blocks to chars
+ */
+ ulong_t tot_chars = myUsage->pr_ioch;
+ ulong_t tot_blocks = myUsage->pr_inblk + myUsage->pr_oublk;
+ ulong_t chars_per_block = 1;
+ if (tot_blocks != 0)
+ chars_per_block = tot_chars / tot_blocks;
+ ps->io_rchar = myUsage->pr_inblk * chars_per_block;
+ ps->io_wchar = myUsage->pr_oublk * chars_per_block;
+ ps->io_syscr = myUsage->pr_sysc;
+ ps->io_syscw = myUsage->pr_sysc;
+
+
+ /*
+ * TODO: Find way of setting BLOCKED and PAGING status
+ */
+
+ *state = (char) 'R';
+ if (myStatus->pr_flags & PR_ASLEEP)
+ *state = (char) 'S';
+ else if (myStatus->pr_flags & PR_STOPPED)
+ *state = (char) 'T';
+ else if (myStatus->pr_flags & PR_DETACH)
+ *state = (char) 'E';
+ else if (myStatus->pr_flags & PR_DAEMON)
+ *state = (char) 'A';
+ else if (myStatus->pr_flags & PR_ISSYS)
+ *state = (char) 'Y';
+ else if (myStatus->pr_flags & PR_ORPHAN)
+ *state = (char) 'O';
+
+ sfree(myStatus);
+ sfree(myInfo);
+ sfree(myUsage);
+
+ return (0);
}
-#endif /* KERNEL_LINUX */
+/*
+ * Reads the number of threads created since the last reboot. On Solaris these
+ * are retrieved from kstat (module cpu, name sys, class misc, stat nthreads).
+ * The result is the sum for all the threads created on each cpu
+ */
+static int read_fork_rate()
+{
+ extern kstat_ctl_t *kc;
+ kstat_t *ksp_chain = NULL;
+ derive_t result = 0;
+
+ if (kc == NULL)
+ return (-1);
+
+ for (ksp_chain = kc->kc_chain;
+ ksp_chain != NULL;
+ ksp_chain = ksp_chain->ks_next)
+ {
+ if ((strcmp (ksp_chain->ks_module, "cpu") == 0)
+ && (strcmp (ksp_chain->ks_name, "sys") == 0)
+ && (strcmp (ksp_chain->ks_class, "misc") == 0))
+ {
+ long long tmp;
+
+ kstat_read (kc, ksp_chain, NULL);
+
+ tmp = get_kstat_value(ksp_chain, "nthreads");
+ if (tmp != -1LL)
+ result += tmp;
+ }
+ }
+
+ ps_submit_fork_rate (result);
+ return (0);
+}
+#endif /* KERNEL_SOLARIS */
#if HAVE_THREAD_INFO
static int mach_get_task_name (task_t t, int *pid, char *name, size_t name_max_len)
procstat_entry_t pse;
char state;
- unsigned long fork_rate;
-
procstat_t *ps_ptr;
running = sleeping = zombies = stopped = paging = blocked = 0;
for (ps_ptr = list_head_g; ps_ptr != NULL; ps_ptr = ps_ptr->next)
ps_submit_proc_list (ps_ptr);
- fork_rate = read_fork_rate();
- if (fork_rate != ULONG_MAX)
- ps_submit_fork_rate(fork_rate);
+ read_fork_rate();
/* #endif KERNEL_LINUX */
#elif HAVE_LIBKVM_GETPROCS && HAVE_STRUCT_KINFO_PROC_FREEBSD
kvm_t *kd;
char errbuf[1024];
- struct kinfo_proc *procs; /* array of processes */
+ struct kinfo_proc *procs; /* array of processes */
struct kinfo_proc *proc_ptr = NULL;
- int count; /* returns number of processes */
+ int count; /* returns number of processes */
int i;
procstat_t *ps_ptr;
if (procentry[i].pi_pid == 0)
cmdline = "swapper";
cargs = cmdline;
- }
+ }
else
{
if (getargs(&procentry[i], sizeof(struct procentry64), arglist, MAXARGLN) >= 0)
for (ps = list_head_g; ps != NULL; ps = ps->next)
ps_submit_proc_list (ps);
-#endif /* HAVE_PROCINFO_H */
+/* #endif HAVE_PROCINFO_H */
+
+#elif KERNEL_SOLARIS
+ /*
+ * The Solaris section adds a few more process states and removes some
+ * process states compared to linux. Most notably there is no "PAGING"
+ * and "BLOCKED" state for a process. The rest is similar to the linux
+ * code.
+ */
+ int running = 0;
+ int sleeping = 0;
+ int zombies = 0;
+ int stopped = 0;
+ int detached = 0;
+ int daemon = 0;
+ int system = 0;
+ int orphan = 0;
+
+ struct dirent *ent;
+ DIR *proc;
+
+ int status;
+ procstat_t *ps_ptr;
+ char state;
+
+ char cmdline[PRARGSZ];
+
+ ps_list_reset ();
+
+ proc = opendir ("/proc");
+ if (proc == NULL)
+ return (-1);
+
+ while ((ent = readdir(proc)) != NULL)
+ {
+ int pid;
+ struct procstat ps;
+ procstat_entry_t pse;
+
+ if (!isdigit ((int) ent->d_name[0]))
+ continue;
+
+ if ((pid = atoi (ent->d_name)) < 1)
+ continue;
+
+ status = ps_read_process (pid, &ps, &state);
+ if (status != 0)
+ {
+ DEBUG("ps_read_process failed: %i", status);
+ continue;
+ }
+
+ pse.id = pid;
+ pse.age = 0;
+
+ pse.num_proc = ps.num_proc;
+ pse.num_lwp = ps.num_lwp;
+ pse.vmem_size = ps.vmem_size;
+ pse.vmem_rss = ps.vmem_rss;
+ pse.vmem_data = ps.vmem_data;
+ pse.vmem_code = ps.vmem_code;
+ pse.stack_size = ps.stack_size;
+
+ pse.vmem_minflt = 0;
+ pse.vmem_minflt_counter = ps.vmem_minflt_counter;
+ pse.vmem_majflt = 0;
+ pse.vmem_majflt_counter = ps.vmem_majflt_counter;
+
+ pse.cpu_user = 0;
+ pse.cpu_user_counter = ps.cpu_user_counter;
+ pse.cpu_system = 0;
+ pse.cpu_system_counter = ps.cpu_system_counter;
+
+ pse.io_rchar = ps.io_rchar;
+ pse.io_wchar = ps.io_wchar;
+ pse.io_syscr = ps.io_syscr;
+ pse.io_syscw = ps.io_syscw;
+
+ switch (state)
+ {
+ case 'R': running++; break;
+ case 'S': sleeping++; break;
+ case 'E': detached++; break;
+ case 'Z': zombies++; break;
+ case 'T': stopped++; break;
+ case 'A': daemon++; break;
+ case 'Y': system++; break;
+ case 'O': orphan++; break;
+ }
+
+
+ ps_list_add (ps.name,
+ ps_get_cmdline ((pid_t) pid,
+ cmdline, sizeof (cmdline)),
+ &pse);
+ } /* while(readdir) */
+ closedir (proc);
+
+ ps_submit_state ("running", running);
+ ps_submit_state ("sleeping", sleeping);
+ ps_submit_state ("zombies", zombies);
+ ps_submit_state ("stopped", stopped);
+ ps_submit_state ("detached", detached);
+ ps_submit_state ("daemon", daemon);
+ ps_submit_state ("system", system);
+ ps_submit_state ("orphan", orphan);
+
+ for (ps_ptr = list_head_g; ps_ptr != NULL; ps_ptr = ps_ptr->next)
+ ps_submit_proc_list (ps_ptr);
+
+ read_fork_rate();
+#endif /* KERNEL_SOLARIS */
return (0);
} /* int ps_read */
{
char name[MAX_REDIS_NODE_NAME];
char host[HOST_NAME_MAX];
+ char passwd[HOST_NAME_MAX];
int port;
int timeout;
}
else if (strcasecmp ("Timeout", option->key) == 0)
status = cf_util_get_int (option, &rn.timeout);
+ else if (strcasecmp ("Password", option->key) == 0)
+ status = cf_util_get_string_buffer (option, rn.passwd, sizeof (rn.passwd));
else
WARNING ("redis plugin: Option `%s' not allowed inside a `Node' "
"block. I'll ignore this option.", option->key);
continue;
}
+ if (strlen (rn->passwd) > 0)
+ {
+ DEBUG ("redis plugin: authenticanting node `%s' passwd(%s).", rn->name, rn->passwd);
+ status = credis_auth(rh, rn->passwd);
+ if (status != 0)
+ {
+ WARNING ("redis plugin: unable to authenticate on node `%s'.", rn->name);
+ credis_close (rh);
+ continue;
+ }
+ }
+
memset (&info, 0, sizeof (info));
status = credis_info (rh, &info);
if (status != 0)
/**
* collectd - src/rrdcached.c
- * Copyright (C) 2008 Florian octo Forster
+ * Copyright (C) 2008-2012 Florian octo Forster
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by the
* 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*
* Authors:
- * Florian octo Forster <octo at verplant.org>
+ * Florian octo Forster <octo at collectd.org>
**/
#include "collectd.h"
*/
static char *datadir = NULL;
static char *daemon_address = NULL;
-static int config_create_files = 1;
-static int config_collect_stats = 1;
+static _Bool config_create_files = 1;
+static _Bool config_collect_stats = 1;
static rrdcreate_config_t rrdcreate_config =
{
/* stepsize = */ 0,
return (0);
} /* int value_list_to_filename */
-static const char *config_get_string (oconfig_item_t *ci)
+static int rc_config_get_int_positive (oconfig_item_t const *ci, int *ret)
{
- if ((ci->children_num != 0) || (ci->values_num != 1)
- || ((ci->values[0].type != OCONFIG_TYPE_STRING)
- && (ci->values[0].type != OCONFIG_TYPE_BOOLEAN)))
+ int status;
+ int tmp = 0;
+
+ status = cf_util_get_int (ci, &tmp);
+ if (status != 0)
+ return (status);
+ if (tmp < 0)
+ return (EINVAL);
+
+ *ret = tmp;
+ return (0);
+} /* int rc_config_get_int_positive */
+
+static int rc_config_get_xff (oconfig_item_t const *ci, double *ret)
+{
+ double value;
+
+ if ((ci->values_num != 1) || (ci->values[0].type != OCONFIG_TYPE_NUMBER))
{
- ERROR ("rrdcached plugin: %s expects a single string argument.",
- ci->key);
- return (NULL);
+ ERROR ("rrdcached plugin: The \"%s\" needs exactly one numeric argument "
+ "in the range [0.0, 1.0)", ci->key);
+ return (EINVAL);
}
- if (ci->values[0].type == OCONFIG_TYPE_BOOLEAN) {
- if (ci->values[0].value.boolean)
- return "true";
- else
- return "false";
+ value = ci->values[0].value.number;
+ if ((value >= 0.0) && (value < 1.0))
+ {
+ *ret = value;
+ return (0);
}
- return (ci->values[0].value.string);
-} /* const char *config_get_string */
+
+ ERROR ("rrdcached plugin: The \"%s\" needs exactly one numeric argument "
+ "in the range [0.0, 1.0)", ci->key);
+ return (EINVAL);
+} /* int rc_config_get_xff */
+
+static int rc_config_add_timespan (int timespan)
+{
+ int *tmp;
+
+ if (timespan <= 0)
+ return (EINVAL);
+
+ tmp = realloc (rrdcreate_config.timespans,
+ sizeof (*rrdcreate_config.timespans)
+ * (rrdcreate_config.timespans_num + 1));
+ if (tmp == NULL)
+ return (ENOMEM);
+ rrdcreate_config.timespans = tmp;
+
+ rrdcreate_config.timespans[rrdcreate_config.timespans_num] = timespan;
+ rrdcreate_config.timespans_num++;
+
+ return (0);
+} /* int rc_config_add_timespan */
static int rc_config (oconfig_item_t *ci)
{
int i;
- for (i = 0; i < ci->children_num; ++i) {
- const char *key = ci->children[i].key;
- const char *value = config_get_string (ci->children + i);
-
- if (value == NULL) /* config_get_strings prints error message */
- continue;
+ for (i = 0; i < ci->children_num; i++)
+ {
+ oconfig_item_t const *child = ci->children + i;
+ const char *key = child->key;
+ int status = 0;
if (strcasecmp ("DataDir", key) == 0)
{
- if (datadir != NULL)
- free (datadir);
- datadir = strdup (value);
- if (datadir != NULL)
+ status = cf_util_get_string (child, &datadir);
+ if (status == 0)
{
int len = strlen (datadir);
+
while ((len > 0) && (datadir[len - 1] == '/'))
{
len--;
- datadir[len] = '\0';
+ datadir[len] = 0;
}
+
if (len <= 0)
- {
- free (datadir);
- datadir = NULL;
- }
+ sfree (datadir);
}
}
else if (strcasecmp ("DaemonAddress", key) == 0)
- {
- sfree (daemon_address);
- daemon_address = strdup (value);
- if (daemon_address == NULL)
- {
- ERROR ("rrdcached plugin: strdup failed.");
- continue;
- }
- }
+ status = cf_util_get_string (child, &daemon_address);
else if (strcasecmp ("CreateFiles", key) == 0)
+ status = cf_util_get_boolean (child, &config_create_files);
+ else if (strcasecmp ("CollectStatistics", key) == 0)
+ status = cf_util_get_boolean (child, &config_collect_stats);
+ else if (strcasecmp ("StepSize", key) == 0)
{
- if (IS_FALSE (value))
- config_create_files = 0;
- else
- config_create_files = 1;
+ int tmp = -1;
+
+ status = rc_config_get_int_positive (child, &tmp);
+ if (status == 0)
+ rrdcreate_config.stepsize = (unsigned long) tmp;
}
- else if (strcasecmp ("CollectStatistics", key) == 0)
+ else if (strcasecmp ("HeartBeat", key) == 0)
+ status = rc_config_get_int_positive (child, &rrdcreate_config.heartbeat);
+ else if (strcasecmp ("RRARows", key) == 0)
+ status = rc_config_get_int_positive (child, &rrdcreate_config.rrarows);
+ else if (strcasecmp ("RRATimespan", key) == 0)
{
- if (IS_FALSE (value))
- config_collect_stats = 0;
- else
- config_collect_stats = 1;
+ int tmp = -1;
+ status = rc_config_get_int_positive (child, &tmp);
+ if (status == 0)
+ status = rc_config_add_timespan (tmp);
}
+ else if (strcasecmp ("XFF", key) == 0)
+ status = rc_config_get_xff (child, &rrdcreate_config.xff);
else
{
WARNING ("rrdcached plugin: Ignoring invalid option %s.", key);
continue;
}
+
+ if (status != 0)
+ WARNING ("rrdcached plugin: Handling the \"%s\" option failed.", key);
}
- if (daemon_address != NULL) {
+ if (daemon_address != NULL)
+ {
plugin_register_write ("rrdcached", rc_write, /* user_data = */ NULL);
plugin_register_flush ("rrdcached", rc_flush, /* user_data = */ NULL);
}
if (daemon_address == NULL)
return (-1);
- if (config_collect_stats == 0)
+ if (!config_collect_stats)
return (-1);
vl.values = values;
static int rc_init (void)
{
- if (config_collect_stats != 0)
+ if (config_collect_stats)
plugin_register_read ("rrdcached", rc_read);
return (0);
values_array[0] = values;
values_array[1] = NULL;
- if (config_create_files != 0)
+ if (config_create_files)
{
struct stat statbuf;
/**
* collectd - src/swap.c
- * Copyright (C) 2005-2010 Florian octo Forster
+ * Copyright (C) 2005-2012 Florian octo Forster
* Copyright (C) 2009 Stefan Völkel
* Copyright (C) 2009 Manuel Sanmartin
* Copyright (C) 2010 Aurélien Reynaud
#define MAX(x,y) ((x) > (y) ? (x) : (y))
#if KERNEL_LINUX
-# define SWAP_HAVE_CONFIG 1
-/* No global variables */
+# define SWAP_HAVE_REPORT_BY_DEVICE 1
+static derive_t pagesize;
+static _Bool report_bytes = 0;
+static _Bool report_by_device = 0;
/* #endif KERNEL_LINUX */
#elif HAVE_SWAPCTL && HAVE_SWAPCTL_TWO_ARGS
-# define SWAP_HAVE_CONFIG 1
+# define SWAP_HAVE_REPORT_BY_DEVICE 1
static derive_t pagesize;
+static _Bool report_by_device = 0;
/* #endif HAVE_SWAPCTL && HAVE_SWAPCTL_TWO_ARGS */
#elif defined(VM_SWAPUSAGE)
# error "No applicable input method."
#endif /* HAVE_LIBSTATGRAB */
-#if SWAP_HAVE_CONFIG
static const char *config_keys[] =
{
+ "ReportBytes",
"ReportByDevice"
};
static int config_keys_num = STATIC_ARRAY_SIZE (config_keys);
-static _Bool report_by_device = 0;
-
static int swap_config (const char *key, const char *value) /* {{{ */
{
- if (strcasecmp ("ReportByDevice", key) == 0)
+ if (strcasecmp ("ReportBytes", key) == 0)
+ {
+#if KERNEL_LINUX
+ report_bytes = IS_TRUE (value) ? 1 : 0;
+#else
+ WARNING ("swap plugin: The \"ReportBytes\" option is only "
+ "valid under Linux. "
+ "The option is going to be ignored.");
+#endif
+ }
+ else if (strcasecmp ("ReportByDevice", key) == 0)
{
+#if SWAP_HAVE_REPORT_BY_DEVICE
if (IS_TRUE (value))
report_by_device = 1;
else
report_by_device = 0;
+#else
+ WARNING ("swap plugin: The \"ReportByDevice\" option is not "
+ "supported on this platform. "
+ "The option is going to be ignored.");
+#endif /* SWAP_HAVE_REPORT_BY_DEVICE */
}
else
{
return (0);
} /* }}} int swap_config */
-#endif /* SWAP_HAVE_CONFIG */
static int swap_init (void) /* {{{ */
{
#if KERNEL_LINUX
- /* No init stuff */
+ pagesize = (derive_t) sysconf (_SC_PAGESIZE);
/* #endif KERNEL_LINUX */
#elif HAVE_SWAPCTL && HAVE_SWAPCTL_TWO_ARGS
swap_submit (plugin_instance, "swap", type_instance, v);
} /* }}} void swap_submit_gauge */
-#if KERNEL_LINUX
+#if KERNEL_LINUX || HAVE_PERFSTAT
static void swap_submit_derive (const char *plugin_instance, /* {{{ */
const char *type_instance, derive_t value)
{
v.derive = value;
swap_submit (plugin_instance, "swap_io", type_instance, v);
} /* }}} void swap_submit_derive */
+#endif
+#if KERNEL_LINUX
static int swap_read_separate (void) /* {{{ */
{
FILE *fh;
if (have_data != 0x03)
return (ENOENT);
+ if (report_bytes)
+ {
+ swap_in = swap_in * pagesize;
+ swap_out = swap_out * pagesize;
+ }
+
swap_submit_derive (NULL, "in", swap_in);
swap_submit_derive (NULL, "out", swap_out);
sstrerror (errno, errbuf, sizeof (errbuf)));
return (-1);
}
+
swap_submit_gauge (NULL, "used", (gauge_t) (pmemory.pgsp_total - pmemory.pgsp_free) * pagesize);
swap_submit_gauge (NULL, "free", (gauge_t) pmemory.pgsp_free * pagesize );
+ swap_submit_gauge (NULL, "reserved", (gauge_t) pmemory.pgsp_rsvd * pagesize);
+ swap_submit_derive (NULL, "in", (derive_t) pmemory.pgspins * pagesize);
+ swap_submit_derive (NULL, "out", (derive_t) pmemory.pgspouts * pagesize);
return (0);
} /* }}} int swap_read */
void module_register (void)
{
-#if SWAP_HAVE_CONFIG
- plugin_register_config ("swap", swap_config, config_keys, config_keys_num);
-#endif
+ plugin_register_config ("swap", swap_config,
+ config_keys, config_keys_num);
plugin_register_init ("swap", swap_init);
plugin_register_read ("swap", swap_read);
} /* void module_register */
#endif
#if KERNEL_LINUX
+# include <asm/types.h>
+/* sys/socket.h is necessary to compile when using netlink on older systems. */
+# include <sys/socket.h>
+# include <linux/netlink.h>
+# include <linux/inet_diag.h>
+# include <sys/socket.h>
+# include <arpa/inet.h>
/* #endif KERNEL_LINUX */
#elif HAVE_SYSCTLBYNAME
#endif /* KERNEL_AIX */
#if KERNEL_LINUX
+struct nlreq {
+ struct nlmsghdr nlh;
+ struct inet_diag_req r;
+};
+
static const char *tcp_state[] =
{
"", /* 0 */
static int port_collect_listening = 0;
static port_entry_t *port_list_head = NULL;
+#if KERNEL_LINUX
+static uint32_t sequence_number = 0;
+
+enum
+{
+ SRC_DUNNO,
+ SRC_NETLINK,
+ SRC_PROC
+} linux_source = SRC_DUNNO;
+#endif
+
static void conn_submit_port_entry (port_entry_t *pe)
{
value_t values[1];
} /* int conn_handle_ports */
#if KERNEL_LINUX
+/* Returns zero on success, less than zero on socket error and greater than
+ * zero on other errors. */
+static int conn_read_netlink (void)
+{
+ int fd;
+ struct sockaddr_nl nladdr;
+ struct nlreq req;
+ struct msghdr msg;
+ struct iovec iov;
+ struct inet_diag_msg *r;
+ char buf[8192];
+
+ /* If this fails, it's likely a permission problem. We'll fall back to
+ * reading this information from files below. */
+ fd = socket(AF_NETLINK, SOCK_RAW, NETLINK_INET_DIAG);
+ if (fd < 0)
+ {
+ ERROR ("tcpconns plugin: conn_read_netlink: socket(AF_NETLINK, SOCK_RAW, "
+ "NETLINK_INET_DIAG) failed: %s",
+ sstrerror (errno, buf, sizeof (buf)));
+ return (-1);
+ }
+
+ memset(&nladdr, 0, sizeof(nladdr));
+ nladdr.nl_family = AF_NETLINK;
+
+ memset(&req, 0, sizeof(req));
+ req.nlh.nlmsg_len = sizeof(req);
+ req.nlh.nlmsg_type = TCPDIAG_GETSOCK;
+ /* NLM_F_ROOT: return the complete table instead of a single entry.
+ * NLM_F_MATCH: return all entries matching criteria (not implemented)
+ * NLM_F_REQUEST: must be set on all request messages */
+ req.nlh.nlmsg_flags = NLM_F_ROOT | NLM_F_MATCH | NLM_F_REQUEST;
+ req.nlh.nlmsg_pid = 0;
+ /* The sequence_number is used to track our messages. Since netlink is not
+ * reliable, we don't want to end up with a corrupt or incomplete old
+ * message in case the system is/was out of memory. */
+ req.nlh.nlmsg_seq = ++sequence_number;
+ req.r.idiag_family = AF_INET;
+ req.r.idiag_states = 0xfff;
+ req.r.idiag_ext = 0;
+
+ memset(&iov, 0, sizeof(iov));
+ iov.iov_base = &req;
+ iov.iov_len = sizeof(req);
+
+ memset(&msg, 0, sizeof(msg));
+ msg.msg_name = (void*)&nladdr;
+ msg.msg_namelen = sizeof(nladdr);
+ msg.msg_iov = &iov;
+ msg.msg_iovlen = 1;
+
+ if (sendmsg (fd, &msg, 0) < 0)
+ {
+ ERROR ("tcpconns plugin: conn_read_netlink: sendmsg(2) failed: %s",
+ sstrerror (errno, buf, sizeof (buf)));
+ close (fd);
+ return (-1);
+ }
+
+ iov.iov_base = buf;
+ iov.iov_len = sizeof(buf);
+
+ while (1)
+ {
+ int status;
+ struct nlmsghdr *h;
+
+ memset(&msg, 0, sizeof(msg));
+ msg.msg_name = (void*)&nladdr;
+ msg.msg_namelen = sizeof(nladdr);
+ msg.msg_iov = &iov;
+ msg.msg_iovlen = 1;
+
+ status = recvmsg(fd, (void *) &msg, /* flags = */ 0);
+ if (status < 0)
+ {
+ if ((errno == EINTR) || (errno == EAGAIN))
+ continue;
+
+ ERROR ("tcpconns plugin: conn_read_netlink: recvmsg(2) failed: %s",
+ sstrerror (errno, buf, sizeof (buf)));
+ close (fd);
+ return (-1);
+ }
+ else if (status == 0)
+ {
+ close (fd);
+ DEBUG ("tcpconns plugin: conn_read_netlink: Unexpected zero-sized "
+ "reply from netlink socket.");
+ return (0);
+ }
+
+ h = (struct nlmsghdr*)buf;
+ while (NLMSG_OK(h, status))
+ {
+ if (h->nlmsg_seq != sequence_number)
+ {
+ h = NLMSG_NEXT(h, status);
+ continue;
+ }
+
+ if (h->nlmsg_type == NLMSG_DONE)
+ {
+ close (fd);
+ return (0);
+ }
+ else if (h->nlmsg_type == NLMSG_ERROR)
+ {
+ struct nlmsgerr *msg_error;
+
+ msg_error = NLMSG_DATA(h);
+ WARNING ("tcpconns plugin: conn_read_netlink: Received error %i.",
+ msg_error->error);
+
+ close (fd);
+ return (1);
+ }
+
+ r = NLMSG_DATA(h);
+
+ /* This code does not (need to) distinguish between IPv4 and IPv6. */
+ conn_handle_ports (ntohs(r->id.idiag_sport),
+ ntohs(r->id.idiag_dport),
+ r->idiag_state);
+
+ h = NLMSG_NEXT(h, status);
+ } /* while (NLMSG_OK) */
+ } /* while (1) */
+
+ /* Not reached because the while() loop above handles the exit condition. */
+ return (0);
+} /* int conn_read_netlink */
+
static int conn_handle_line (char *buffer)
{
char *fields[32];
static int conn_read (void)
{
- int errors_num = 0;
+ int status;
conn_reset_port_entry ();
- if (conn_read_file ("/proc/net/tcp") != 0)
- errors_num++;
- if (conn_read_file ("/proc/net/tcp6") != 0)
- errors_num++;
-
- if (errors_num < 2)
+ if (linux_source == SRC_NETLINK)
{
- conn_submit_all ();
+ status = conn_read_netlink ();
}
- else
+ else if (linux_source == SRC_PROC)
{
- ERROR ("tcpconns plugin: Neither /proc/net/tcp nor /proc/net/tcp6 "
- "coult be read.");
- return (-1);
+ int errors_num = 0;
+
+ if (conn_read_file ("/proc/net/tcp") != 0)
+ errors_num++;
+ if (conn_read_file ("/proc/net/tcp6") != 0)
+ errors_num++;
+
+ if (errors_num < 2)
+ status = 0;
+ else
+ status = ENOENT;
+ }
+ else /* if (linux_source == SRC_DUNNO) */
+ {
+ /* Try to use netlink for getting this data, it is _much_ faster on systems
+ * with a large amount of connections. */
+ status = conn_read_netlink ();
+ if (status == 0)
+ {
+ INFO ("tcpconns plugin: Reading from netlink succeeded. "
+ "Will use the netlink method from now on.");
+ linux_source = SRC_NETLINK;
+ }
+ else
+ {
+ INFO ("tcpconns plugin: Reading from netlink failed. "
+ "Will read from /proc from now on.");
+ linux_source = SRC_PROC;
+
+ /* return success here to avoid the "plugin failed" message. */
+ return (0);
+ }
}
+ if (status == 0)
+ conn_submit_all ();
+ else
+ return (status);
+
return (0);
} /* int conn_read */
/* #endif KERNEL_LINUX */
/* Using sysctl interface to retrieve the boot time on *BSD / Darwin / OS X systems */
/* #endif HAVE_SYS_SYSCTL_H */
+#elif HAVE_PERFSTAT
+# include <sys/protosw.h>
+# include <libperfstat.h>
+/* Using perfstat_cpu_total to retrive the boot time in AIX */
+/* #endif HAVE_PERFSTAT */
+
#else
# error "No applicable input method."
#endif
"but `boottime' is zero!");
return (-1);
}
-#endif /* HAVE_SYS_SYSCTL_H */
+/* #endif HAVE_SYS_SYSCTL_H */
+
+#elif HAVE_PERFSTAT
+ int status;
+ perfstat_cpu_total_t cputotal;
+ int hertz;
+
+ status = perfstat_cpu_total(NULL, &cputotal,
+ sizeof(perfstat_cpu_total_t), 1);
+ if (status < 0)
+ {
+ char errbuf[1024];
+ ERROR ("uptime plugin: perfstat_cpu_total: %s",
+ sstrerror (errno, errbuf, sizeof (errbuf)));
+ return (-1);
+ }
+
+ hertz = sysconf(_SC_CLK_TCK);
+ if (hertz <= 0)
+ hertz = HZ;
+
+ boottime = time(NULL) - cputotal.lbolt / hertz;
+#endif /* HAVE_PERFSTAT */
return (0);
} /* }}} int uptime_init */
char **names = NULL;
cdtime_t *times = NULL;
size_t number = 0;
+ size_t size_arrays = 0;
int status = 0;
pthread_mutex_lock (&cache_lock);
+ size_arrays = (size_t) c_avl_size (cache_tree);
+ if (size_arrays < 1)
+ {
+ /* Handle the "no values" case here, to avoid the error message when
+ * calloc() returns NULL. */
+ pthread_mutex_unlock (&cache_lock);
+ return (0);
+ }
+
+ names = calloc (size_arrays, sizeof (*names));
+ times = calloc (size_arrays, sizeof (*times));
+ if ((names == NULL) || (times == NULL))
+ {
+ ERROR ("uc_get_names: calloc failed.");
+ sfree (names);
+ sfree (times);
+ pthread_mutex_unlock (&cache_lock);
+ return (ENOMEM);
+ }
+
iter = c_avl_get_iterator (cache_tree);
while (c_avl_iterator_next (iter, (void *) &key, (void *) &value) == 0)
{
- char **temp;
-
/* remove missing values when list values */
if (value->state == STATE_MISSING)
continue;
- if (ret_times != NULL)
- {
- cdtime_t *tmp_times;
+ /* c_avl_size does not return a number smaller than the number of elements
+ * returned by c_avl_iterator_next. */
+ assert (number < size_arrays);
- tmp_times = (cdtime_t *) realloc (times, sizeof (cdtime_t) * (number + 1));
- if (tmp_times == NULL)
- {
- status = -1;
- break;
- }
- times = tmp_times;
+ if (ret_times != NULL)
times[number] = value->last_time;
- }
- temp = (char **) realloc (names, sizeof (char *) * (number + 1));
- if (temp == NULL)
- {
- status = -1;
- break;
- }
- names = temp;
names[number] = strdup (key);
if (names[number] == NULL)
{
status = -1;
break;
}
+
number++;
} /* while (c_avl_iterator_next) */
--- /dev/null
+/**
+ * collectd - src/utils_format_graphite.c
+ * Copyright (C) 2012 Thomas Meson
+ * Copyright (C) 2012 Florian octo Forster
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; only version 2 of the License is applicable.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+ *
+ * Authors:
+ * Thomas Meson <zllak at hycik.org>
+ * Florian octo Forster <octo at collectd.org>
+ **/
+
+#include "collectd.h"
+#include "plugin.h"
+#include "common.h"
+
+#include "utils_cache.h"
+#include "utils_format_json.h"
+#include "utils_parse_option.h"
+
+/* Utils functions to format data sets in graphite format.
+ * Largely taken from write_graphite.c as it remains the same formatting */
+
+static int gr_format_values (char *ret, size_t ret_len,
+ int ds_num, const data_set_t *ds, const value_list_t *vl,
+ gauge_t const *rates)
+{
+ size_t offset = 0;
+ int status;
+
+ assert (0 == strcmp (ds->type, vl->type));
+
+ memset (ret, 0, ret_len);
+
+#define BUFFER_ADD(...) do { \
+ status = ssnprintf (ret + offset, ret_len - offset, \
+ __VA_ARGS__); \
+ if (status < 1) \
+ { \
+ return (-1); \
+ } \
+ else if (((size_t) status) >= (ret_len - offset)) \
+ { \
+ return (-1); \
+ } \
+ else \
+ offset += ((size_t) status); \
+} while (0)
+
+ if (ds->ds[ds_num].type == DS_TYPE_GAUGE)
+ BUFFER_ADD ("%f", vl->values[ds_num].gauge);
+ else if (rates != NULL)
+ BUFFER_ADD ("%f", rates[ds_num]);
+ else if (ds->ds[ds_num].type == DS_TYPE_COUNTER)
+ BUFFER_ADD ("%llu", vl->values[ds_num].counter);
+ else if (ds->ds[ds_num].type == DS_TYPE_DERIVE)
+ BUFFER_ADD ("%"PRIi64, vl->values[ds_num].derive);
+ else if (ds->ds[ds_num].type == DS_TYPE_ABSOLUTE)
+ BUFFER_ADD ("%"PRIu64, vl->values[ds_num].absolute);
+ else
+ {
+ ERROR ("gr_format_values plugin: Unknown data source type: %i",
+ ds->ds[ds_num].type);
+ return (-1);
+ }
+
+#undef BUFFER_ADD
+
+ return (0);
+}
+
+static void gr_copy_escape_part (char *dst, const char *src, size_t dst_len,
+ char escape_char)
+{
+ size_t i;
+
+ memset (dst, 0, dst_len);
+
+ if (src == NULL)
+ return;
+
+ for (i = 0; i < dst_len; i++)
+ {
+ if (src[i] == 0)
+ {
+ dst[i] = 0;
+ break;
+ }
+
+ if ((src[i] == '.')
+ || isspace ((int) src[i])
+ || iscntrl ((int) src[i]))
+ dst[i] = escape_char;
+ else
+ dst[i] = src[i];
+ }
+}
+
+static int gr_format_name (char *ret, int ret_len,
+ const value_list_t *vl,
+ const char *ds_name,
+ char *prefix,
+ char *postfix,
+ char escape_char)
+{
+ char n_host[DATA_MAX_NAME_LEN];
+ char n_plugin[DATA_MAX_NAME_LEN];
+ char n_plugin_instance[DATA_MAX_NAME_LEN];
+ char n_type[DATA_MAX_NAME_LEN];
+ char n_type_instance[DATA_MAX_NAME_LEN];
+
+ char tmp_plugin[2 * DATA_MAX_NAME_LEN + 1];
+ char tmp_type[2 * DATA_MAX_NAME_LEN + 1];
+
+ if (prefix == NULL)
+ prefix = "";
+
+ if (postfix == NULL)
+ postfix = "";
+
+ gr_copy_escape_part (n_host, vl->host,
+ sizeof (n_host), escape_char);
+ gr_copy_escape_part (n_plugin, vl->plugin,
+ sizeof (n_plugin), escape_char);
+ gr_copy_escape_part (n_plugin_instance, vl->plugin_instance,
+ sizeof (n_plugin_instance), escape_char);
+ gr_copy_escape_part (n_type, vl->type,
+ sizeof (n_type), escape_char);
+ gr_copy_escape_part (n_type_instance, vl->type_instance,
+ sizeof (n_type_instance), escape_char);
+
+ if (n_plugin_instance[0] != '\0')
+ ssnprintf (tmp_plugin, sizeof (tmp_plugin), "%s%c%s",
+ n_plugin,
+ '-',
+ n_plugin_instance);
+ else
+ sstrncpy (tmp_plugin, n_plugin, sizeof (tmp_plugin));
+
+ if (n_type_instance[0] != '\0')
+ ssnprintf (tmp_type, sizeof (tmp_type), "%s%c%s",
+ n_type,
+ '-',
+ n_type_instance);
+ else
+ sstrncpy (tmp_type, n_type, sizeof (tmp_type));
+
+ if (ds_name != NULL)
+ ssnprintf (ret, ret_len, "%s%s%s.%s.%s.%s",
+ prefix, n_host, postfix, tmp_plugin, tmp_type, ds_name);
+ else
+ ssnprintf (ret, ret_len, "%s%s%s.%s.%s",
+ prefix, n_host, postfix, tmp_plugin, tmp_type);
+
+ return (0);
+}
+
+int format_graphite (char *buffer, size_t buffer_size,
+ const data_set_t *ds, const value_list_t *vl, char *prefix,
+ char *postfix, char escape_char,
+ _Bool store_rates)
+{
+ int status = 0;
+ int i;
+ int buffer_pos = 0;
+
+ gauge_t *rates = NULL;
+ if (store_rates)
+ rates = uc_get_rate (ds, vl);
+
+ for (i = 0; i < ds->ds_num; i++)
+ {
+ const char *ds_name = NULL;
+ char key[10*DATA_MAX_NAME_LEN];
+ char values[512];
+ size_t message_len;
+ char message[1024];
+
+ ds_name = ds->ds[i].name;
+
+ /* Copy the identifier to `key' and escape it. */
+ status = gr_format_name (key, sizeof (key), vl, ds_name,
+ prefix, postfix, escape_char);
+ if (status != 0)
+ {
+ ERROR ("format_graphite: error with gr_format_name");
+ sfree (rates);
+ return (status);
+ }
+
+ escape_string (key, sizeof (key));
+ /* Convert the values to an ASCII representation and put that into
+ * `values'. */
+ status = gr_format_values (values, sizeof (values), i, ds, vl, rates);
+ if (status != 0)
+ {
+ ERROR ("format_graphite: error with gr_format_values");
+ sfree (rates);
+ return (status);
+ }
+
+ /* Compute the graphite command */
+ message_len = (size_t) ssnprintf (message, sizeof (message),
+ "%s %s %u\r\n",
+ key,
+ values,
+ (unsigned int) CDTIME_T_TO_TIME_T (vl->time));
+ if (message_len >= sizeof (message)) {
+ ERROR ("format_graphite: message buffer too small: "
+ "Need %zu bytes.", message_len + 1);
+ sfree (rates);
+ return (-ENOMEM);
+ }
+
+ /* Append it in case we got multiple data set */
+ if ((buffer_pos + message_len) >= buffer_size)
+ {
+ ERROR ("format_graphite: target buffer too small");
+ sfree (rates);
+ return (-ENOMEM);
+ }
+ memcpy((void *) (buffer + buffer_pos), message, message_len);
+ buffer_pos += message_len;
+ }
+ sfree (rates);
+ return (status);
+} /* int format_graphite */
+
+/* vim: set sw=2 sts=2 et fdm=marker : */
--- /dev/null
+/**
+ * collectd - src/utils_format_graphite.h
+ * Copyright (C) 2012 Thomas Meson
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; only version 2 of the License is applicable.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+ *
+ * Author:
+ * Thomas Meson <zllak at hycik.org>
+ **/
+
+#ifndef UTILS_FORMAT_GRAPHITE_H
+#define UTILS_FORMAT_GRAPHITE_H 1
+
+#include "collectd.h"
+#include "plugin.h"
+
+int format_graphite (char *buffer,
+ size_t buffer_size, const data_set_t *ds,
+ const value_list_t *vl, const char *prefix,
+ const char *postfix, const char escape_char,
+ _Bool store_rates);
+
+#endif /* UTILS_FORMAT_GRAPHITE_H */
#undef BUFFER_ADD
return (0);
-} /* }}} int buffer_add_string */
+} /* }}} int escape_string */
static int values_to_json (char *buffer, size_t buffer_size, /* {{{ */
const data_set_t *ds, const value_list_t *vl, int store_rates)
} /* }}} int values_to_json */
static int dstypes_to_json (char *buffer, size_t buffer_size, /* {{{ */
- const data_set_t *ds, const value_list_t *vl)
+ const data_set_t *ds)
{
size_t offset = 0;
int i;
} /* }}} int dstypes_to_json */
static int dsnames_to_json (char *buffer, size_t buffer_size, /* {{{ */
- const data_set_t *ds, const value_list_t *vl)
+ const data_set_t *ds)
{
size_t offset = 0;
int i;
return (0);
} /* }}} int dsnames_to_json */
+static int meta_data_to_json (char *buffer, size_t buffer_size, /* {{{ */
+ meta_data_t *meta)
+{
+ size_t offset = 0;
+ char **keys = NULL;
+ int keys_num;
+ int status;
+ int i;
+
+ memset (buffer, 0, buffer_size);
+
+#define BUFFER_ADD(...) do { \
+ status = ssnprintf (buffer + offset, buffer_size - offset, \
+ __VA_ARGS__); \
+ if (status < 1) \
+ return (-1); \
+ else if (((size_t) status) >= (buffer_size - offset)) \
+ return (-ENOMEM); \
+ else \
+ offset += ((size_t) status); \
+} while (0)
+
+ keys_num = meta_data_toc (meta, &keys);
+ for (i = 0; i < keys_num; ++i)
+ {
+ int type;
+ char *key = keys[i];
+
+ type = meta_data_type (meta, key);
+ if (type == MD_TYPE_STRING)
+ {
+ char *value = NULL;
+ if (meta_data_get_string (meta, key, &value) == 0)
+ {
+ char temp[512] = "";
+ escape_string (temp, sizeof (temp), value);
+ sfree (value);
+ BUFFER_ADD (",\"%s\":%s", key, temp);
+ }
+ }
+ else if (type == MD_TYPE_SIGNED_INT)
+ {
+ int64_t value = 0;
+ if (meta_data_get_signed_int (meta, key, &value) == 0)
+ BUFFER_ADD (",\"%s\":%"PRIi64, key, value);
+ }
+ else if (type == MD_TYPE_UNSIGNED_INT)
+ {
+ uint64_t value = 0;
+ if (meta_data_get_unsigned_int (meta, key, &value) == 0)
+ BUFFER_ADD (",\"%s\":%"PRIu64, key, value);
+ }
+ else if (type == MD_TYPE_DOUBLE)
+ {
+ double value = 0.0;
+ if (meta_data_get_double (meta, key, &value) == 0)
+ BUFFER_ADD (",\"%s\":%f", key, value);
+ }
+ else if (type == MD_TYPE_BOOLEAN)
+ {
+ _Bool value = 0;
+ if (meta_data_get_boolean (meta, key, &value) == 0)
+ BUFFER_ADD (",\"%s\":%s", key, value ? "true" : "false");
+ }
+
+ free (key);
+ } /* for (keys) */
+ free (keys);
+
+ if (offset <= 0)
+ return (ENOENT);
+
+ buffer[0] = '{'; /* replace leading ',' */
+ BUFFER_ADD ("}");
+
+#undef BUFFER_ADD
+
+ return (0);
+} /* int meta_data_to_json */
+
static int value_list_to_json (char *buffer, size_t buffer_size, /* {{{ */
const data_set_t *ds, const value_list_t *vl, int store_rates)
{
return (status);
BUFFER_ADD ("\"values\":%s", temp);
- status = dstypes_to_json (temp, sizeof (temp), ds, vl);
+ status = dstypes_to_json (temp, sizeof (temp), ds);
if (status != 0)
return (status);
BUFFER_ADD (",\"dstypes\":%s", temp);
- status = dsnames_to_json (temp, sizeof (temp), ds, vl);
+ status = dsnames_to_json (temp, sizeof (temp), ds);
if (status != 0)
return (status);
BUFFER_ADD (",\"dsnames\":%s", temp);
BUFFER_ADD_KEYVAL ("type", vl->type);
BUFFER_ADD_KEYVAL ("type_instance", vl->type_instance);
+ if (vl->meta != NULL)
+ {
+ char meta_buffer[buffer_size];
+ memset (meta_buffer, 0, sizeof (meta_buffer));
+ status = meta_data_to_json (meta_buffer, sizeof (meta_buffer), vl->meta);
+ if (status != 0)
+ return (status);
+
+ BUFFER_ADD (",\"meta\":%s", meta_buffer);
+ } /* if (vl->meta != NULL) */
+
BUFFER_ADD ("}");
#undef BUFFER_ADD_KEYVAL
--- /dev/null
+/**
+ * collectd - src/utils_vl_lookup.c
+ * Copyright (C) 2012 Florian Forster
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a
+ * copy of this software and associated documentation files (the "Software"),
+ * to deal in the Software without restriction, including without limitation
+ * the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ * and/or sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ *
+ * Authors:
+ * Florian Forster <octo at collectd.org>
+ **/
+
+#include "collectd.h"
+#include "common.h"
+#include "utils_vl_lookup.h"
+#include "utils_avltree.h"
+
+#if BUILD_TEST
+# define sstrncpy strncpy
+# define plugin_log(s, ...) do { \
+ printf ("[severity %i] ", s); \
+ printf (__VA_ARGS__); \
+ printf ("\n"); \
+} while (0)
+#endif
+
+/*
+ * Types
+ */
+struct lookup_s
+{
+ c_avl_tree_t *by_type_tree;
+
+ lookup_class_callback_t cb_user_class;
+ lookup_obj_callback_t cb_user_obj;
+ lookup_free_class_callback_t cb_free_class;
+ lookup_free_obj_callback_t cb_free_obj;
+};
+
+struct user_obj_s;
+typedef struct user_obj_s user_obj_t;
+struct user_obj_s
+{
+ void *user_obj;
+ identifier_t ident;
+
+ user_obj_t *next;
+};
+
+struct user_class_s
+{
+ void *user_class;
+ identifier_t ident;
+ user_obj_t *user_obj_list; /* list of user_obj */
+};
+typedef struct user_class_s user_class_t;
+
+struct user_class_list_s;
+typedef struct user_class_list_s user_class_list_t;
+struct user_class_list_s
+{
+ user_class_t entry;
+ user_class_list_t *next;
+};
+
+struct by_type_entry_s
+{
+ c_avl_tree_t *by_plugin_tree; /* plugin -> user_class_list_t */
+ user_class_list_t *wildcard_plugin_list;
+};
+typedef struct by_type_entry_s by_type_entry_t;
+
+/*
+ * Private functions
+ */
+static void *lu_create_user_obj (lookup_t *obj, /* {{{ */
+ data_set_t const *ds, value_list_t const *vl,
+ user_class_t *user_class)
+{
+ user_obj_t *user_obj;
+
+ user_obj = malloc (sizeof (*user_obj));
+ if (user_obj == NULL)
+ {
+ ERROR ("utils_vl_lookup: malloc failed.");
+ return (NULL);
+ }
+ memset (user_obj, 0, sizeof (*user_obj));
+ user_obj->next = NULL;
+
+ user_obj->user_obj = obj->cb_user_class (ds, vl, user_class->user_class);
+ if (user_obj->user_obj == NULL)
+ {
+ sfree (user_obj);
+ WARNING("utils_vl_lookup: User-provided constructor failed.");
+ return (NULL);
+ }
+
+ sstrncpy (user_obj->ident.host,
+ LU_IS_ALL (user_class->ident.host) ? "/all/" : vl->host,
+ sizeof (user_obj->ident.host));
+ sstrncpy (user_obj->ident.plugin,
+ LU_IS_ALL (user_class->ident.plugin) ? "/all/" : vl->plugin,
+ sizeof (user_obj->ident.plugin));
+ sstrncpy (user_obj->ident.plugin_instance,
+ LU_IS_ALL (user_class->ident.plugin_instance) ? "/all/" : vl->plugin_instance,
+ sizeof (user_obj->ident.plugin_instance));
+ sstrncpy (user_obj->ident.type,
+ LU_IS_ALL (user_class->ident.type) ? "/all/" : vl->type,
+ sizeof (user_obj->ident.type));
+ sstrncpy (user_obj->ident.type_instance,
+ LU_IS_ALL (user_class->ident.type_instance) ? "/all/" : vl->type_instance,
+ sizeof (user_obj->ident.type_instance));
+
+ if (user_class->user_obj_list == NULL)
+ {
+ user_class->user_obj_list = user_obj;
+ }
+ else
+ {
+ user_obj_t *last = user_class->user_obj_list;
+ while (last->next != NULL)
+ last = last->next;
+ last->next = user_obj;
+ }
+
+ return (user_obj);
+} /* }}} void *lu_create_user_obj */
+
+static user_obj_t *lu_find_user_obj (user_class_t *user_class, /* {{{ */
+ value_list_t const *vl)
+{
+ user_obj_t *ptr;
+
+ for (ptr = user_class->user_obj_list;
+ ptr != NULL;
+ ptr = ptr->next)
+ {
+ if (!LU_IS_ALL (ptr->ident.host)
+ && (strcmp (ptr->ident.host, vl->host) != 0))
+ continue;
+ if (!LU_IS_ALL (ptr->ident.plugin_instance)
+ && (strcmp (ptr->ident.plugin_instance, vl->plugin_instance) != 0))
+ continue;
+ if (!LU_IS_ALL (ptr->ident.type_instance)
+ && (strcmp (ptr->ident.type_instance, vl->type_instance) != 0))
+ continue;
+
+ return (ptr);
+ }
+
+ return (NULL);
+} /* }}} user_obj_t *lu_find_user_obj */
+
+static int lu_handle_user_class (lookup_t *obj, /* {{{ */
+ data_set_t const *ds, value_list_t const *vl,
+ user_class_t *user_class)
+{
+ user_obj_t *user_obj;
+ int status;
+
+ assert (strcmp (vl->type, user_class->ident.type) == 0);
+ assert (LU_IS_WILDCARD (user_class->ident.plugin)
+ || (strcmp (vl->plugin, user_class->ident.plugin) == 0));
+
+ /* When we get here, type and plugin already match the user class. Now check
+ * the rest of the fields. */
+ if (!LU_IS_WILDCARD (user_class->ident.type_instance)
+ && (strcmp (vl->type_instance, user_class->ident.type_instance) != 0))
+ return (1);
+ if (!LU_IS_WILDCARD (user_class->ident.plugin_instance)
+ && (strcmp (vl->plugin_instance,
+ user_class->ident.plugin_instance) != 0))
+ return (1);
+ if (!LU_IS_WILDCARD (user_class->ident.host)
+ && (strcmp (vl->host, user_class->ident.host) != 0))
+ return (1);
+
+ user_obj = lu_find_user_obj (user_class, vl);
+ if (user_obj == NULL)
+ {
+ /* call lookup_class_callback_t() and insert into the list of user objects. */
+ user_obj = lu_create_user_obj (obj, ds, vl, user_class);
+ if (user_obj == NULL)
+ return (-1);
+ }
+
+ status = obj->cb_user_obj (ds, vl,
+ user_class->user_class, user_obj->user_obj);
+ if (status != 0)
+ {
+ ERROR ("utils_vl_lookup: The user object callback failed with status %i.",
+ status);
+ /* Returning a negative value means: abort! */
+ if (status < 0)
+ return (status);
+ else
+ return (1);
+ }
+
+ return (0);
+} /* }}} int lu_handle_user_class */
+
+static int lu_handle_user_class_list (lookup_t *obj, /* {{{ */
+ data_set_t const *ds, value_list_t const *vl,
+ user_class_list_t *user_class_list)
+{
+ user_class_list_t *ptr;
+ int retval = 0;
+
+ for (ptr = user_class_list; ptr != NULL; ptr = ptr->next)
+ {
+ int status;
+
+ status = lu_handle_user_class (obj, ds, vl, &ptr->entry);
+ if (status < 0)
+ return (status);
+ else if (status == 0)
+ retval++;
+ }
+
+ return (retval);
+} /* }}} int lu_handle_user_class_list */
+
+static by_type_entry_t *lu_search_by_type (lookup_t *obj, /* {{{ */
+ char const *type, _Bool allocate_if_missing)
+{
+ by_type_entry_t *by_type;
+ char *type_copy;
+ int status;
+
+ status = c_avl_get (obj->by_type_tree, type, (void *) &by_type);
+ if (status == 0)
+ return (by_type);
+
+ if (!allocate_if_missing)
+ return (NULL);
+
+ type_copy = strdup (type);
+ if (type_copy == NULL)
+ {
+ ERROR ("utils_vl_lookup: strdup failed.");
+ return (NULL);
+ }
+
+ by_type = malloc (sizeof (*by_type));
+ if (by_type == NULL)
+ {
+ ERROR ("utils_vl_lookup: malloc failed.");
+ sfree (type_copy);
+ return (NULL);
+ }
+ memset (by_type, 0, sizeof (*by_type));
+ by_type->wildcard_plugin_list = NULL;
+
+ by_type->by_plugin_tree = c_avl_create ((void *) strcmp);
+ if (by_type->by_plugin_tree == NULL)
+ {
+ ERROR ("utils_vl_lookup: c_avl_create failed.");
+ sfree (by_type);
+ sfree (type_copy);
+ return (NULL);
+ }
+
+ status = c_avl_insert (obj->by_type_tree,
+ /* key = */ type_copy, /* value = */ by_type);
+ assert (status <= 0); /* >0 => entry exists => race condition. */
+ if (status != 0)
+ {
+ ERROR ("utils_vl_lookup: c_avl_insert failed.");
+ c_avl_destroy (by_type->by_plugin_tree);
+ sfree (by_type);
+ sfree (type_copy);
+ return (NULL);
+ }
+
+ return (by_type);
+} /* }}} by_type_entry_t *lu_search_by_type */
+
+static int lu_add_by_plugin (by_type_entry_t *by_type, /* {{{ */
+ identifier_t const *ident, user_class_list_t *user_class_list)
+{
+ user_class_list_t *ptr = NULL;
+
+ /* Lookup user_class_list from the per-plugin structure. If this is the first
+ * user_class to be added, the blocks return immediately. Otherwise they will
+ * set "ptr" to non-NULL. */
+ if (LU_IS_WILDCARD (ident->plugin))
+ {
+ if (by_type->wildcard_plugin_list == NULL)
+ {
+ by_type->wildcard_plugin_list = user_class_list;
+ return (0);
+ }
+
+ ptr = by_type->wildcard_plugin_list;
+ } /* if (plugin is wildcard) */
+ else /* (plugin is not wildcard) */
+ {
+ int status;
+
+ status = c_avl_get (by_type->by_plugin_tree,
+ ident->plugin, (void *) &ptr);
+
+ if (status != 0) /* plugin not yet in tree */
+ {
+ char *plugin_copy = strdup (ident->plugin);
+
+ if (plugin_copy == NULL)
+ {
+ ERROR ("utils_vl_lookup: strdup failed.");
+ sfree (user_class_list);
+ return (ENOMEM);
+ }
+
+ status = c_avl_insert (by_type->by_plugin_tree,
+ plugin_copy, user_class_list);
+ if (status != 0)
+ {
+ ERROR ("utils_vl_lookup: c_avl_insert(\"%s\") failed with status %i.",
+ plugin_copy, status);
+ sfree (plugin_copy);
+ sfree (user_class_list);
+ return (status);
+ }
+ else
+ {
+ return (0);
+ }
+ } /* if (plugin not yet in tree) */
+ } /* if (plugin is not wildcard) */
+
+ assert (ptr != NULL);
+
+ while (ptr->next != NULL)
+ ptr = ptr->next;
+ ptr->next = user_class_list;
+
+ return (0);
+} /* }}} int lu_add_by_plugin */
+
+static void lu_destroy_user_obj (lookup_t *obj, /* {{{ */
+ user_obj_t *user_obj)
+{
+ while (user_obj != NULL)
+ {
+ user_obj_t *next = user_obj->next;
+
+ if (obj->cb_free_obj != NULL)
+ obj->cb_free_obj (user_obj->user_obj);
+ user_obj->user_obj = NULL;
+
+ sfree (user_obj);
+ user_obj = next;
+ }
+} /* }}} void lu_destroy_user_obj */
+
+static void lu_destroy_user_class_list (lookup_t *obj, /* {{{ */
+ user_class_list_t *user_class_list)
+{
+ while (user_class_list != NULL)
+ {
+ user_class_list_t *next = user_class_list->next;
+
+ if (obj->cb_free_class != NULL)
+ obj->cb_free_class (user_class_list->entry.user_class);
+ user_class_list->entry.user_class = NULL;
+
+ lu_destroy_user_obj (obj, user_class_list->entry.user_obj_list);
+ user_class_list->entry.user_obj_list = NULL;
+
+ sfree (user_class_list);
+ user_class_list = next;
+ }
+} /* }}} void lu_destroy_user_class_list */
+
+static void lu_destroy_by_type (lookup_t *obj, /* {{{ */
+ by_type_entry_t *by_type)
+{
+
+ while (42)
+ {
+ char *plugin = NULL;
+ user_class_list_t *user_class_list = NULL;
+ int status;
+
+ status = c_avl_pick (by_type->by_plugin_tree,
+ (void *) &plugin, (void *) &user_class_list);
+ if (status != 0)
+ break;
+
+ DEBUG ("utils_vl_lookup: lu_destroy_by_type: Destroying plugin \"%s\".",
+ plugin);
+ sfree (plugin);
+ lu_destroy_user_class_list (obj, user_class_list);
+ }
+
+ c_avl_destroy (by_type->by_plugin_tree);
+ by_type->by_plugin_tree = NULL;
+
+ lu_destroy_user_class_list (obj, by_type->wildcard_plugin_list);
+ by_type->wildcard_plugin_list = NULL;
+
+ sfree (by_type);
+} /* }}} int lu_destroy_by_type */
+
+/*
+ * Public functions
+ */
+lookup_t *lookup_create (lookup_class_callback_t cb_user_class, /* {{{ */
+ lookup_obj_callback_t cb_user_obj,
+ lookup_free_class_callback_t cb_free_class,
+ lookup_free_obj_callback_t cb_free_obj)
+{
+ lookup_t *obj = malloc (sizeof (*obj));
+ if (obj == NULL)
+ {
+ ERROR ("utils_vl_lookup: malloc failed.");
+ return (NULL);
+ }
+ memset (obj, 0, sizeof (*obj));
+
+ obj->by_type_tree = c_avl_create ((void *) strcmp);
+ if (obj->by_type_tree == NULL)
+ {
+ ERROR ("utils_vl_lookup: c_avl_create failed.");
+ sfree (obj);
+ return (NULL);
+ }
+
+ obj->cb_user_class = cb_user_class;
+ obj->cb_user_obj = cb_user_obj;
+ obj->cb_free_class = cb_free_class;
+ obj->cb_free_obj = cb_free_obj;
+
+ return (obj);
+} /* }}} lookup_t *lookup_create */
+
+void lookup_destroy (lookup_t *obj) /* {{{ */
+{
+ int status;
+
+ if (obj == NULL)
+ return;
+
+ while (42)
+ {
+ char *type = NULL;
+ by_type_entry_t *by_type = NULL;
+
+ status = c_avl_pick (obj->by_type_tree, (void *) &type, (void *) &by_type);
+ if (status != 0)
+ break;
+
+ DEBUG ("utils_vl_lookup: lookup_destroy: Destroying type \"%s\".", type);
+ sfree (type);
+ lu_destroy_by_type (obj, by_type);
+ }
+
+ c_avl_destroy (obj->by_type_tree);
+ obj->by_type_tree = NULL;
+
+ sfree (obj);
+} /* }}} void lookup_destroy */
+
+int lookup_add (lookup_t *obj, /* {{{ */
+ identifier_t const *ident, void *user_class)
+{
+ by_type_entry_t *by_type = NULL;
+ user_class_list_t *user_class_obj;
+
+ by_type = lu_search_by_type (obj, ident->type, /* allocate = */ 1);
+ if (by_type == NULL)
+ return (-1);
+
+ user_class_obj = malloc (sizeof (*user_class_obj));
+ if (user_class_obj == NULL)
+ {
+ ERROR ("utils_vl_lookup: malloc failed.");
+ return (ENOMEM);
+ }
+ memset (user_class_obj, 0, sizeof (*user_class_obj));
+ user_class_obj->entry.user_class = user_class;
+ memmove (&user_class_obj->entry.ident, ident, sizeof (*ident));
+ user_class_obj->entry.user_obj_list = NULL;
+ user_class_obj->next = NULL;
+
+ return (lu_add_by_plugin (by_type, ident, user_class_obj));
+} /* }}} int lookup_add */
+
+/* returns the number of successful calls to the callback function */
+int lookup_search (lookup_t *obj, /* {{{ */
+ data_set_t const *ds, value_list_t const *vl)
+{
+ by_type_entry_t *by_type = NULL;
+ user_class_list_t *user_class_list = NULL;
+ int retval = 0;
+ int status;
+
+ if ((obj == NULL) || (ds == NULL) || (vl == NULL))
+ return (-EINVAL);
+
+ by_type = lu_search_by_type (obj, vl->type, /* allocate = */ 0);
+ if (by_type == NULL)
+ return (0);
+
+ status = c_avl_get (by_type->by_plugin_tree,
+ vl->plugin, (void *) &user_class_list);
+ if (status == 0)
+ {
+ status = lu_handle_user_class_list (obj, ds, vl, user_class_list);
+ if (status < 0)
+ return (status);
+ retval += status;
+ }
+
+ if (by_type->wildcard_plugin_list != NULL)
+ {
+ status = lu_handle_user_class_list (obj, ds, vl,
+ by_type->wildcard_plugin_list);
+ if (status < 0)
+ return (status);
+ retval += status;
+ }
+
+ return (retval);
+} /* }}} lookup_search */
--- /dev/null
+/**
+ * collectd - src/utils_vl_lookup.h
+ * Copyright (C) 2012 Florian Forster
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a
+ * copy of this software and associated documentation files (the "Software"),
+ * to deal in the Software without restriction, including without limitation
+ * the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ * and/or sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ *
+ * Authors:
+ * Florian Forster <octo at collectd.org>
+ **/
+
+#ifndef UTILS_VL_LOOKUP_H
+#define UTILS_VL_LOOKUP_H 1
+
+#include "plugin.h"
+
+/*
+ * Types
+ */
+struct lookup_s;
+typedef struct lookup_s lookup_t;
+
+/* Given a user_class, constructs a new user_obj. */
+typedef void *(*lookup_class_callback_t) (data_set_t const *ds,
+ value_list_t const *vl, void *user_class);
+
+/* Given a user_class and a ds/vl combination, does stuff with the data.
+ * This is the main working horse of the module. */
+typedef int (*lookup_obj_callback_t) (data_set_t const *ds,
+ value_list_t const *vl,
+ void *user_class, void *user_obj);
+
+/* Used to free user_class pointers. May be NULL in which case nothing is
+ * freed. */
+typedef void (*lookup_free_class_callback_t) (void *user_class);
+
+/* Used to free user_obj pointers. May be NULL in which case nothing is
+ * freed. */
+typedef void (*lookup_free_obj_callback_t) (void *user_obj);
+
+struct identifier_s
+{
+ char host[DATA_MAX_NAME_LEN];
+ char plugin[DATA_MAX_NAME_LEN];
+ char plugin_instance[DATA_MAX_NAME_LEN];
+ char type[DATA_MAX_NAME_LEN];
+ char type_instance[DATA_MAX_NAME_LEN];
+};
+typedef struct identifier_s identifier_t;
+
+#define LU_ANY "/any/"
+#define LU_ALL "/all/"
+
+#define LU_IS_ANY(str) (strcmp (str, LU_ANY) == 0)
+#define LU_IS_ALL(str) (strcmp (str, LU_ALL) == 0)
+#define LU_IS_WILDCARD(str) (LU_IS_ANY(str) || LU_IS_ALL(str))
+
+/*
+ * Functions
+ */
+__attribute__((nonnull(1,2)))
+lookup_t *lookup_create (lookup_class_callback_t,
+ lookup_obj_callback_t,
+ lookup_free_class_callback_t,
+ lookup_free_obj_callback_t);
+void lookup_destroy (lookup_t *obj);
+
+int lookup_add (lookup_t *obj,
+ identifier_t const *ident, void *user_class);
+
+/* TODO(octo): Pass lookup_obj_callback_t to lookup_search()? */
+int lookup_search (lookup_t *obj,
+ data_set_t const *ds, value_list_t const *vl);
+
+#endif /* UTILS_VL_LOOKUP_H */
--- /dev/null
+/**
+ * collectd - src/utils_vl_lookup_test.c
+ * Copyright (C) 2012 Florian Forster
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a
+ * copy of this software and associated documentation files (the "Software"),
+ * to deal in the Software without restriction, including without limitation
+ * the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ * and/or sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ *
+ * Authors:
+ * Florian Forster <octo at collectd.org>
+ **/
+
+#include "collectd.h"
+#include "utils_vl_lookup.h"
+
+static _Bool expect_new_obj = 0;
+static _Bool have_new_obj = 0;
+
+static identifier_t last_class_ident;
+static identifier_t last_obj_ident;
+
+static data_source_t dsrc_test = { "value", DS_TYPE_DERIVE, 0.0, NAN };
+static data_set_t const ds_test = { "test", 1, &dsrc_test };
+
+static data_source_t dsrc_unknown = { "value", DS_TYPE_DERIVE, 0.0, NAN };
+static data_set_t const ds_unknown = { "unknown", 1, &dsrc_unknown };
+
+static int lookup_obj_callback (data_set_t const *ds,
+ value_list_t const *vl,
+ void *user_class, void *user_obj)
+{
+ identifier_t *class = user_class;
+ identifier_t *obj = user_obj;
+
+ assert (expect_new_obj == have_new_obj);
+
+ memcpy (&last_class_ident, class, sizeof (last_class_ident));
+ memcpy (&last_obj_ident, obj, sizeof (last_obj_ident));
+
+ if (strcmp (obj->plugin_instance, "failure") == 0)
+ return (-1);
+
+ return (0);
+}
+
+static void *lookup_class_callback (data_set_t const *ds,
+ value_list_t const *vl, void *user_class)
+{
+ identifier_t *class = user_class;
+ identifier_t *obj;
+
+ assert (expect_new_obj);
+
+ memcpy (&last_class_ident, class, sizeof (last_class_ident));
+
+ obj = malloc (sizeof (*obj));
+ strncpy (obj->host, vl->host, sizeof (obj->host));
+ strncpy (obj->plugin, vl->plugin, sizeof (obj->plugin));
+ strncpy (obj->plugin_instance, vl->plugin_instance, sizeof (obj->plugin_instance));
+ strncpy (obj->type, vl->type, sizeof (obj->type));
+ strncpy (obj->type_instance, vl->type_instance, sizeof (obj->type_instance));
+
+ have_new_obj = 1;
+
+ return ((void *) obj);
+}
+
+static void checked_lookup_add (lookup_t *obj, /* {{{ */
+ char const *host,
+ char const *plugin, char const *plugin_instance,
+ char const *type, char const *type_instance)
+{
+ identifier_t ident;
+ void *user_class;
+ int status;
+
+ memset (&ident, 0, sizeof (ident));
+ strncpy (ident.host, host, sizeof (ident.host));
+ strncpy (ident.plugin, plugin, sizeof (ident.plugin));
+ strncpy (ident.plugin_instance, plugin_instance, sizeof (ident.plugin_instance));
+ strncpy (ident.type, type, sizeof (ident.type));
+ strncpy (ident.type_instance, type_instance, sizeof (ident.type_instance));
+
+ user_class = malloc (sizeof (ident));
+ memmove (user_class, &ident, sizeof (ident));
+
+ status = lookup_add (obj, &ident, user_class);
+ assert (status == 0);
+} /* }}} void test_add */
+
+static int checked_lookup_search (lookup_t *obj,
+ char const *host,
+ char const *plugin, char const *plugin_instance,
+ char const *type, char const *type_instance,
+ _Bool expect_new)
+{
+ int status;
+ value_list_t vl = VALUE_LIST_STATIC;
+ data_set_t const *ds = &ds_unknown;
+
+ strncpy (vl.host, host, sizeof (vl.host));
+ strncpy (vl.plugin, plugin, sizeof (vl.plugin));
+ strncpy (vl.plugin_instance, plugin_instance, sizeof (vl.plugin_instance));
+ strncpy (vl.type, type, sizeof (vl.type));
+ strncpy (vl.type_instance, type_instance, sizeof (vl.type_instance));
+
+ if (strcmp (vl.type, "test") == 0)
+ ds = &ds_test;
+
+ expect_new_obj = expect_new;
+ have_new_obj = 0;
+
+ status = lookup_search (obj, ds, &vl);
+ return (status);
+}
+
+static lookup_t *checked_lookup_create (void)
+{
+ lookup_t *obj = lookup_create (
+ lookup_class_callback,
+ lookup_obj_callback,
+ (void *) free,
+ (void *) free);
+ assert (obj != NULL);
+ return (obj);
+}
+
+static void testcase0 (void)
+{
+ lookup_t *obj = checked_lookup_create ();
+
+ checked_lookup_add (obj, "/any/", "test", "", "test", "/all/");
+ checked_lookup_search (obj, "host0", "test", "", "test", "0",
+ /* expect new = */ 1);
+ checked_lookup_search (obj, "host0", "test", "", "test", "1",
+ /* expect new = */ 0);
+ checked_lookup_search (obj, "host1", "test", "", "test", "0",
+ /* expect new = */ 1);
+ checked_lookup_search (obj, "host1", "test", "", "test", "1",
+ /* expect new = */ 0);
+
+ lookup_destroy (obj);
+}
+
+static void testcase1 (void)
+{
+ lookup_t *obj = checked_lookup_create ();
+
+ checked_lookup_add (obj, "/any/", "/all/", "/all/", "test", "/all/");
+ checked_lookup_search (obj, "host0", "plugin0", "", "test", "0",
+ /* expect new = */ 1);
+ checked_lookup_search (obj, "host0", "plugin0", "", "test", "1",
+ /* expect new = */ 0);
+ checked_lookup_search (obj, "host0", "plugin1", "", "test", "0",
+ /* expect new = */ 0);
+ checked_lookup_search (obj, "host0", "plugin1", "", "test", "1",
+ /* expect new = */ 0);
+ checked_lookup_search (obj, "host1", "plugin0", "", "test", "0",
+ /* expect new = */ 1);
+ checked_lookup_search (obj, "host1", "plugin0", "", "test", "1",
+ /* expect new = */ 0);
+ checked_lookup_search (obj, "host1", "plugin1", "", "test", "0",
+ /* expect new = */ 0);
+ checked_lookup_search (obj, "host1", "plugin1", "", "test", "1",
+ /* expect new = */ 0);
+
+ lookup_destroy (obj);
+}
+
+static void testcase2 (void)
+{
+ lookup_t *obj = checked_lookup_create ();
+ int status;
+
+ checked_lookup_add (obj, "/any/", "plugin0", "", "test", "/all/");
+ checked_lookup_add (obj, "/any/", "/all/", "", "test", "ti0");
+
+ status = checked_lookup_search (obj, "host0", "plugin1", "", "test", "",
+ /* expect new = */ 0);
+ assert (status == 0);
+ status = checked_lookup_search (obj, "host0", "plugin0", "", "test", "",
+ /* expect new = */ 1);
+ assert (status == 1);
+ status = checked_lookup_search (obj, "host0", "plugin1", "", "test", "ti0",
+ /* expect new = */ 1);
+ assert (status == 1);
+ status = checked_lookup_search (obj, "host0", "plugin0", "", "test", "ti0",
+ /* expect new = */ 0);
+ assert (status == 2);
+
+ lookup_destroy (obj);
+}
+
+int main (int argc, char **argv) /* {{{ */
+{
+ testcase0 ();
+ testcase1 ();
+ testcase2 ();
+ return (EXIT_SUCCESS);
+} /* }}} int main */
#include "utils_cache.h"
#include "utils_parse_option.h"
+#include "utils_format_graphite.h"
/* Folks without pthread will need to disable this plugin. */
#include <pthread.h>
return (status);
}
-static int wg_format_values (char *ret, size_t ret_len,
- int ds_num, const data_set_t *ds, const value_list_t *vl,
- _Bool store_rates)
-{
- size_t offset = 0;
- int status;
- gauge_t *rates = NULL;
-
- assert (0 == strcmp (ds->type, vl->type));
-
- memset (ret, 0, ret_len);
-
-#define BUFFER_ADD(...) do { \
- status = ssnprintf (ret + offset, ret_len - offset, \
- __VA_ARGS__); \
- if (status < 1) \
- { \
- sfree (rates); \
- return (-1); \
- } \
- else if (((size_t) status) >= (ret_len - offset)) \
- { \
- sfree (rates); \
- return (-1); \
- } \
- else \
- offset += ((size_t) status); \
-} while (0)
-
- if (ds->ds[ds_num].type == DS_TYPE_GAUGE)
- BUFFER_ADD ("%f", vl->values[ds_num].gauge);
- else if (store_rates)
- {
- if (rates == NULL)
- rates = uc_get_rate (ds, vl);
- if (rates == NULL)
- {
- WARNING ("format_values: "
- "uc_get_rate failed.");
- return (-1);
- }
- BUFFER_ADD ("%g", rates[ds_num]);
- }
- else if (ds->ds[ds_num].type == DS_TYPE_COUNTER)
- BUFFER_ADD ("%llu", vl->values[ds_num].counter);
- else if (ds->ds[ds_num].type == DS_TYPE_DERIVE)
- BUFFER_ADD ("%"PRIi64, vl->values[ds_num].derive);
- else if (ds->ds[ds_num].type == DS_TYPE_ABSOLUTE)
- BUFFER_ADD ("%"PRIu64, vl->values[ds_num].absolute);
- else
- {
- ERROR ("format_values plugin: Unknown data source type: %i",
- ds->ds[ds_num].type);
- sfree (rates);
- return (-1);
- }
-
-#undef BUFFER_ADD
-
- sfree (rates);
- return (0);
-}
-
-static void wg_copy_escape_part (char *dst, const char *src, size_t dst_len,
- char escape_char)
-{
- size_t i;
-
- memset (dst, 0, dst_len);
-
- if (src == NULL)
- return;
-
- for (i = 0; i < dst_len; i++)
- {
- if (src[i] == 0)
- {
- dst[i] = 0;
- break;
- }
-
- if ((src[i] == '.')
- || isspace ((int) src[i])
- || iscntrl ((int) src[i]))
- dst[i] = escape_char;
- else
- dst[i] = src[i];
- }
-}
-
-static int wg_format_name (char *ret, int ret_len,
- const value_list_t *vl,
- const struct wg_callback *cb,
- const char *ds_name)
-{
- char n_host[DATA_MAX_NAME_LEN];
- char n_plugin[DATA_MAX_NAME_LEN];
- char n_plugin_instance[DATA_MAX_NAME_LEN];
- char n_type[DATA_MAX_NAME_LEN];
- char n_type_instance[DATA_MAX_NAME_LEN];
-
- char *prefix;
- char *postfix;
-
- char tmp_plugin[2 * DATA_MAX_NAME_LEN + 1];
- char tmp_type[2 * DATA_MAX_NAME_LEN + 1];
-
- prefix = cb->prefix;
- if (prefix == NULL)
- prefix = "";
-
- postfix = cb->postfix;
- if (postfix == NULL)
- postfix = "";
-
- wg_copy_escape_part (n_host, vl->host,
- sizeof (n_host), cb->escape_char);
- wg_copy_escape_part (n_plugin, vl->plugin,
- sizeof (n_plugin), cb->escape_char);
- wg_copy_escape_part (n_plugin_instance, vl->plugin_instance,
- sizeof (n_plugin_instance), cb->escape_char);
- wg_copy_escape_part (n_type, vl->type,
- sizeof (n_type), cb->escape_char);
- wg_copy_escape_part (n_type_instance, vl->type_instance,
- sizeof (n_type_instance), cb->escape_char);
-
- if (n_plugin_instance[0] != '\0')
- ssnprintf (tmp_plugin, sizeof (tmp_plugin), "%s%c%s",
- n_plugin,
- cb->separate_instances ? '.' : '-',
- n_plugin_instance);
- else
- sstrncpy (tmp_plugin, n_plugin, sizeof (tmp_plugin));
-
- if (n_type_instance[0] != '\0')
- ssnprintf (tmp_type, sizeof (tmp_type), "%s%c%s",
- n_type,
- cb->separate_instances ? '.' : '-',
- n_type_instance);
- else
- sstrncpy (tmp_type, n_type, sizeof (tmp_type));
-
- if (ds_name != NULL)
- ssnprintf (ret, ret_len, "%s%s%s.%s.%s.%s",
- prefix, n_host, postfix, tmp_plugin, tmp_type, ds_name);
- else
- ssnprintf (ret, ret_len, "%s%s%s.%s.%s",
- prefix, n_host, postfix, tmp_plugin, tmp_type);
-
- return (0);
-}
-
-static int wg_send_message (const char* key, const char* value,
- cdtime_t time, struct wg_callback *cb)
+static int wg_send_message (char const *message, struct wg_callback *cb)
{
int status;
size_t message_len;
- char message[1024];
-
- message_len = (size_t) ssnprintf (message, sizeof (message),
- "%s %s %u\r\n",
- key,
- value,
- (unsigned int) CDTIME_T_TO_TIME_T (time));
- if (message_len >= sizeof (message)) {
- ERROR ("write_graphite plugin: message buffer too small: "
- "Need %zu bytes.", message_len + 1);
- return (-1);
- }
+
+ message_len = strlen (message);
pthread_mutex_lock (&cb->send_lock);
static int wg_write_messages (const data_set_t *ds, const value_list_t *vl,
struct wg_callback *cb)
{
- char key[10*DATA_MAX_NAME_LEN];
- char values[512];
-
- int status, i;
+ char buffer[4096];
+ int status;
if (0 != strcmp (ds->type, vl->type))
{
return -1;
}
- for (i = 0; i < ds->ds_num; i++)
- {
- const char *ds_name = NULL;
-
- if (cb->always_append_ds || (ds->ds_num > 1))
- ds_name = ds->ds[i].name;
-
- /* Copy the identifier to `key' and escape it. */
- status = wg_format_name (key, sizeof (key), vl, cb, ds_name);
- if (status != 0)
- {
- ERROR ("write_graphite plugin: error with format_name");
- return (status);
- }
-
- escape_string (key, sizeof (key));
- /* Convert the values to an ASCII representation and put that into
- * `values'. */
- status = wg_format_values (values, sizeof (values), i, ds, vl,
- cb->store_rates);
- if (status != 0)
- {
- ERROR ("write_graphite plugin: error with "
- "wg_format_values");
- return (status);
- }
+ memset (buffer, 0, sizeof (buffer));
+ status = format_graphite (buffer, sizeof (buffer), ds, vl,
+ cb->prefix, cb->postfix, cb->escape_char, cb->store_rates);
+ if (status != 0) /* error message has been printed already. */
+ return (status);
- /* Send the message to graphite */
- status = wg_send_message (key, values, vl->time, cb);
- if (status != 0)
- {
- ERROR ("write_graphite plugin: error with "
- "wg_send_message");
- return (status);
- }
+ wg_send_message (buffer, cb);
+ if (status != 0)
+ {
+ ERROR ("write_graphite plugin: wg_send_message failed "
+ "with status %i.", status);
+ return (status);
}
return (0);
-}
+} /* int wg_write_messages */
static int wg_write (const data_set_t *ds, const value_list_t *vl,
user_data_t *user_data)