Merge branch 'collectd-5.1'
authorFlorian Forster <octo@collectd.org>
Mon, 12 Nov 2012 07:28:16 +0000 (08:28 +0100)
committerFlorian Forster <octo@collectd.org>
Mon, 12 Nov 2012 07:28:16 +0000 (08:28 +0100)
32 files changed:
AUTHORS
configure.in
contrib/README
contrib/collectd.service [new file with mode: 0644]
src/Makefile.am
src/aggregation.c [new file with mode: 0644]
src/amqp.c
src/collectd.conf.in
src/collectd.conf.pod
src/common.c
src/common.h
src/contextswitch.c
src/disk.c
src/liboconfig/parser.y
src/memcached.c
src/ntpd.c
src/oracle.c
src/postgresql.c
src/processes.c
src/redis.c
src/rrdcached.c
src/swap.c
src/tcpconns.c
src/uptime.c
src/utils_cache.c
src/utils_format_graphite.c [new file with mode: 0644]
src/utils_format_graphite.h [new file with mode: 0644]
src/utils_format_json.c
src/utils_vl_lookup.c [new file with mode: 0644]
src/utils_vl_lookup.h [new file with mode: 0644]
src/utils_vl_lookup_test.c [new file with mode: 0644]
src/write_graphite.c

diff --git a/AUTHORS b/AUTHORS
index 90560e3..78dbad1 100644 (file)
--- a/AUTHORS
+++ b/AUTHORS
@@ -200,6 +200,9 @@ Sven Trenkel <collectd at semidefinite.de>
  - netapp plugin.
  - python plugin.
 
+Thomas Meson <zllak at hycik.org>
+ - Graphite support for the AMQP plugin.
+
 Tomasz Pala <gotar at pld-linux.org>
  - conntrack plugin.
 
index eff8d61..4a15976 100644 (file)
@@ -4636,11 +4636,13 @@ fi
 if test "x$with_perfstat" = "xyes"
 then
        plugin_cpu="yes"
+       plugin_contextswitch="yes"
        plugin_disk="yes"
        plugin_memory="yes"
        plugin_swap="yes"
        plugin_interface="yes"
        plugin_load="yes"
+       plugin_uptime="yes"
 fi
 
 if test "x$with_procinfo" = "xyes"
@@ -4850,6 +4852,7 @@ AC_ARG_ENABLE([all-plugins],
 
 m4_divert_once([HELP_ENABLE], [])
 
+AC_PLUGIN([aggregation], [yes],                [Aggregation plugin])
 AC_PLUGIN([amqp],        [$with_librabbitmq],  [AMQP output plugin])
 AC_PLUGIN([apache],      [$with_libcurl],      [Apache httpd statistics])
 AC_PLUGIN([apcups],      [yes],                [Statistics of UPSes by APC])
@@ -5182,6 +5185,7 @@ Configuration:
     perl  . . . . . . . . $with_perl_bindings
 
   Modules:
+    aggregation . . . . . $enable_aggregation
     amqp    . . . . . . . $enable_amqp
     apache  . . . . . . . $enable_apache
     apcups  . . . . . . . $enable_apcups
index bc1fe9f..1ebf1f1 100644 (file)
@@ -101,3 +101,8 @@ solaris-smf
 -----------
   Manifest file for the Solaris SMF system and detailed information on how to
 register collectd as a service with this system.
+
+collectd.service
+----------------
+  Service file for systemd. Please ship this file as
+  /lib/systemd/system/collectd.service in any linux package of collectd.
diff --git a/contrib/collectd.service b/contrib/collectd.service
new file mode 100644 (file)
index 0000000..ee4d596
--- /dev/null
@@ -0,0 +1,15 @@
+[Unit]
+Description=statistics collection daemon
+Documentation=man:collectd(1)
+After=local-fs.target network.target
+Requires=local-fs.target network.target
+
+[Service]
+ExecStart=/usr/sbin/collectd -C /etc/collectd/collectd.conf -f
+Restart=always
+RestartSec=10
+StandardOutput=syslog
+StandardError=syslog
+
+[Install]
+WantedBy=multi-user.target
index 1891f7a..58ab17e 100644 (file)
@@ -119,10 +119,21 @@ pkglib_LTLIBRARIES =
 BUILT_SOURCES = 
 CLEANFILES = 
 
+if BUILD_PLUGIN_AGGREGATION
+pkglib_LTLIBRARIES += aggregation.la
+aggregation_la_SOURCES = aggregation.c \
+                         utils_vl_lookup.c utils_vl_lookup.h
+aggregation_la_LDFLAGS = -module -avoid-version
+aggregation_la_LIBADD =
+collectd_LDADD += "-dlopen" aggregation.la
+collectd_DEPENDENCIES += aggregation.la
+endif
+
 if BUILD_PLUGIN_AMQP
 pkglib_LTLIBRARIES += amqp.la
 amqp_la_SOURCES = amqp.c \
                  utils_cmd_putval.c utils_cmd_putval.h \
+                 utils_format_graphite.c utils_format_graphite.h \
                  utils_format_json.c utils_format_json.h
 amqp_la_LDFLAGS = -module -avoid-version $(BUILD_WITH_LIBRABBITMQ_LDFLAGS)
 amqp_la_CPPFLAGS = $(AM_CPPFLAGS) $(BUILD_WITH_LIBRABBITMQ_CPPFLAGS)
@@ -212,6 +223,10 @@ if BUILD_PLUGIN_CONTEXTSWITCH
 pkglib_LTLIBRARIES += contextswitch.la
 contextswitch_la_SOURCES = contextswitch.c
 contextswitch_la_LDFLAGS = -module -avoid-version
+contextswitch_la_LIBADD =
+if BUILD_WITH_PERFSTAT
+contextswitch_la_LIBADD += -lperfstat
+endif
 collectd_LDADD += "-dlopen" contextswitch.la
 collectd_DEPENDENCIES += contextswitch.la
 endif
@@ -1191,6 +1206,9 @@ uptime_la_LIBADD =
 if BUILD_WITH_LIBKSTAT
 uptime_la_LIBADD += -lkstat
 endif
+if BUILD_WITH_PERFSTAT
+uptime_la_LIBADD += -lperfstat
+endif
 collectd_LDADD += "-dlopen" uptime.la
 collectd_DEPENDENCIES += uptime.la
 endif
@@ -1256,7 +1274,8 @@ endif
 if BUILD_PLUGIN_WRITE_GRAPHITE
 pkglib_LTLIBRARIES += write_graphite.la
 write_graphite_la_SOURCES = write_graphite.c \
-                       utils_format_json.c utils_format_json.h
+                        utils_format_graphite.c utils_format_graphite.h \
+                        utils_format_json.c utils_format_json.h
 write_graphite_la_LDFLAGS = -module -avoid-version
 collectd_LDADD += "-dlopen" write_graphite.la
 collectd_DEPENDENCIES += write_graphite.la
@@ -1390,3 +1409,16 @@ uninstall-hook:
        rm -f $(DESTDIR)$(pkgdatadir)/types.db;
        rm -f $(DESTDIR)$(sysconfdir)/collectd.conf
        rm -f $(DESTDIR)$(pkgdatadir)/postgresql_default.conf;
+
+if BUILD_FEATURE_DEBUG
+bin_PROGRAMS += utils_vl_lookup_test
+utils_vl_lookup_test_SOURCES = utils_vl_lookup_test.c \
+                               utils_vl_lookup.h utils_vl_lookup.c \
+                               utils_avltree.c utils_avltree.h \
+                               common.h
+
+utils_vl_lookup_test_CPPFLAGS =  $(AM_CPPFLAGS) $(LTDLINCL) -DBUILD_TEST=1
+utils_vl_lookup_test_CFLAGS = $(AM_CFLAGS)
+utils_vl_lookup_test_LDFLAGS = -export-dynamic
+utils_vl_lookup_test_LDADD =
+endif
diff --git a/src/aggregation.c b/src/aggregation.c
new file mode 100644 (file)
index 0000000..db33c17
--- /dev/null
@@ -0,0 +1,684 @@
+/**
+ * collectd - src/aggregation.c
+ * Copyright (C) 2012       Florian Forster
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a
+ * copy of this software and associated documentation files (the "Software"),
+ * to deal in the Software without restriction, including without limitation
+ * the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ * and/or sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ *
+ * Authors:
+ *   Florian Forster <octo at collectd.org>
+ **/
+
+#include "collectd.h"
+#include "plugin.h"
+#include "common.h"
+#include "configfile.h"
+#include "meta_data.h"
+#include "utils_cache.h" /* for uc_get_rate() */
+#include "utils_vl_lookup.h"
+
+#include <pthread.h>
+
+struct aggregation_s /* {{{ */
+{
+  identifier_t ident;
+
+  _Bool calc_num;
+  _Bool calc_sum;
+  _Bool calc_average;
+  _Bool calc_min;
+  _Bool calc_max;
+  _Bool calc_stddev;
+}; /* }}} */
+typedef struct aggregation_s aggregation_t;
+
+struct agg_instance_s;
+typedef struct agg_instance_s agg_instance_t;
+struct agg_instance_s /* {{{ */
+{
+  pthread_mutex_t lock;
+  identifier_t ident;
+
+  int ds_type;
+
+  derive_t num;
+  gauge_t sum;
+  gauge_t squares_sum;
+
+  gauge_t min;
+  gauge_t max;
+
+  rate_to_value_state_t *state_num;
+  rate_to_value_state_t *state_sum;
+  rate_to_value_state_t *state_average;
+  rate_to_value_state_t *state_min;
+  rate_to_value_state_t *state_max;
+  rate_to_value_state_t *state_stddev;
+
+  agg_instance_t *next;
+}; /* }}} */
+
+static lookup_t *lookup = NULL;
+
+static pthread_mutex_t agg_instance_list_lock = PTHREAD_MUTEX_INITIALIZER;
+static agg_instance_t *agg_instance_list_head = NULL;
+
+static void agg_destroy (aggregation_t *agg) /* {{{ */
+{
+  sfree (agg);
+} /* }}} void agg_destroy */
+
+/* Frees all dynamically allocated memory within the instance. */
+static void agg_instance_destroy (agg_instance_t *inst) /* {{{ */
+{
+  if (inst == NULL)
+    return;
+
+  /* Remove this instance from the global list of instances. */
+  pthread_mutex_lock (&agg_instance_list_lock);
+  if (agg_instance_list_head == inst)
+    agg_instance_list_head = inst->next;
+  else if (agg_instance_list_head != NULL)
+  {
+    agg_instance_t *prev = agg_instance_list_head;
+    while ((prev != NULL) && (prev->next != inst))
+      prev = prev->next;
+    if (prev != NULL)
+      prev->next = inst->next;
+  }
+  pthread_mutex_unlock (&agg_instance_list_lock);
+
+  sfree (inst->state_num);
+  sfree (inst->state_sum);
+  sfree (inst->state_average);
+  sfree (inst->state_min);
+  sfree (inst->state_max);
+  sfree (inst->state_stddev);
+
+  memset (inst, 0, sizeof (*inst));
+  inst->ds_type = -1;
+  inst->min = NAN;
+  inst->max = NAN;
+} /* }}} void agg_instance_destroy */
+
+/* Create a new aggregation instance. */
+static agg_instance_t *agg_instance_create (data_set_t const *ds, /* {{{ */
+    value_list_t const *vl, aggregation_t *agg)
+{
+  agg_instance_t *inst;
+
+  DEBUG ("aggregation plugin: Creating new instance.");
+
+  inst = malloc (sizeof (*inst));
+  if (inst == NULL)
+  {
+    ERROR ("aggregation plugin: malloc() failed.");
+    return (NULL);
+  }
+  memset (inst, 0, sizeof (*inst));
+  pthread_mutex_init (&inst->lock, /* attr = */ NULL);
+
+  inst->ds_type = ds->ds[0].type;
+
+#define COPY_FIELD(fld) do { \
+  sstrncpy (inst->ident.fld, \
+      LU_IS_ANY (agg->ident.fld) ? vl->fld : agg->ident.fld, \
+      sizeof (inst->ident.fld)); \
+} while (0)
+
+  COPY_FIELD (host);
+  COPY_FIELD (plugin);
+  COPY_FIELD (plugin_instance);
+  COPY_FIELD (type);
+  COPY_FIELD (type_instance);
+
+#undef COPY_FIELD
+
+  inst->min = NAN;
+  inst->max = NAN;
+
+#define INIT_STATE(field) do { \
+  inst->state_ ## field = NULL; \
+  if (agg->calc_ ## field) { \
+    inst->state_ ## field = malloc (sizeof (*inst->state_ ## field)); \
+    if (inst->state_ ## field == NULL) { \
+      agg_instance_destroy (inst); \
+      ERROR ("aggregation plugin: malloc() failed."); \
+      return (NULL); \
+    } \
+    memset (inst->state_ ## field, 0, sizeof (*inst->state_ ## field)); \
+  } \
+} while (0)
+
+  INIT_STATE (num);
+  INIT_STATE (sum);
+  INIT_STATE (average);
+  INIT_STATE (min);
+  INIT_STATE (max);
+  INIT_STATE (stddev);
+
+#undef INIT_STATE
+
+  pthread_mutex_lock (&agg_instance_list_lock);
+  inst->next = agg_instance_list_head;
+  agg_instance_list_head = inst;
+  pthread_mutex_unlock (&agg_instance_list_lock);
+
+  return (inst);
+} /* }}} agg_instance_t *agg_instance_create */
+
+/* Update the num, sum, min, max, ... fields of the aggregation instance, if
+ * the rate of the value list is available. Value lists with more than one data
+ * source are not supported and will return an error. Returns zero on success
+ * and non-zero otherwise. */
+static int agg_instance_update (agg_instance_t *inst, /* {{{ */
+    data_set_t const *ds, value_list_t const *vl)
+{
+  gauge_t *rate;
+
+  if (ds->ds_num != 1)
+  {
+    ERROR ("aggregation plugin: The \"%s\" type (data set) has more than one "
+        "data source. This is currently not supported by this plugin. "
+        "Sorry.", ds->type);
+    return (EINVAL);
+  }
+
+  rate = uc_get_rate (ds, vl);
+  if (rate == NULL)
+  {
+    char ident[6 * DATA_MAX_NAME_LEN];
+    FORMAT_VL (ident, sizeof (ident), vl);
+    ERROR ("aggregation plugin: Unable to read the current rate of \"%s\".",
+        ident);
+    return (ENOENT);
+  }
+
+  if (isnan (rate[0]))
+  {
+    sfree (rate);
+    return (0);
+  }
+
+  pthread_mutex_lock (&inst->lock);
+
+  inst->num++;
+  inst->sum += rate[0];
+  inst->squares_sum += (rate[0] * rate[0]);
+
+  if (isnan (inst->min) || (inst->min > rate[0]))
+    inst->min = rate[0];
+  if (isnan (inst->max) || (inst->max < rate[0]))
+    inst->max = rate[0];
+
+  pthread_mutex_unlock (&inst->lock);
+
+  sfree (rate);
+  return (0);
+} /* }}} int agg_instance_update */
+
+static int agg_instance_read_func (agg_instance_t *inst, /* {{{ */
+  char const *func, gauge_t rate, rate_to_value_state_t *state,
+  value_list_t *vl, char const *pi_prefix, cdtime_t t)
+{
+  value_t v;
+  int status;
+
+  if (pi_prefix[0] != 0)
+    ssnprintf (vl->plugin_instance, sizeof (vl->plugin_instance), "%s-%s",
+        pi_prefix, func);
+  else
+    sstrncpy (vl->plugin_instance, func, sizeof (vl->plugin_instance));
+
+  memset (&v, 0, sizeof (v));
+  status = rate_to_value (&v, rate, state, inst->ds_type, t);
+  if (status != 0)
+  {
+    /* If this is the first iteration and rate_to_value() was asked to return a
+     * COUNTER or a DERIVE, it will return EAGAIN. Catch this and handle
+     * gracefully. */
+    if (status == EAGAIN)
+      return (0);
+
+    WARNING ("aggregation plugin: rate_to_value failed with status %i.",
+        status);
+    return (-1);
+  }
+
+  vl->values = &v;
+  vl->values_len = 1;
+
+  plugin_dispatch_values_secure (vl);
+
+  vl->values = NULL;
+  vl->values_len = 0;
+
+  return (0);
+} /* }}} int agg_instance_read_func */
+
+static int agg_instance_read (agg_instance_t *inst, cdtime_t t) /* {{{ */
+{
+  value_list_t vl = VALUE_LIST_INIT;
+  char pi_prefix[DATA_MAX_NAME_LEN];
+
+  /* Pre-set all the fields in the value list that will not change per
+   * aggregation type (sum, average, ...). The struct will be re-used and must
+   * therefore be dispatched using the "secure" function. */
+
+  vl.time = t;
+  vl.interval = 0;
+
+  vl.meta = meta_data_create ();
+  if (vl.meta == NULL)
+  {
+    ERROR ("aggregation plugin: meta_data_create failed.");
+    return (-1);
+  }
+  meta_data_add_boolean (vl.meta, "aggregation:created", 1);
+
+  if (LU_IS_ALL (inst->ident.host))
+    sstrncpy (vl.host, "global", sizeof (vl.host));
+  else
+    sstrncpy (vl.host, inst->ident.host, sizeof (vl.host));
+
+  sstrncpy (vl.plugin, "aggregation", sizeof (vl.plugin));
+
+  if (LU_IS_ALL (inst->ident.plugin))
+  {
+    if (LU_IS_ALL (inst->ident.plugin_instance))
+      sstrncpy (pi_prefix, "", sizeof (pi_prefix));
+    else
+      sstrncpy (pi_prefix, inst->ident.plugin_instance, sizeof (pi_prefix));
+  }
+  else
+  {
+    if (LU_IS_ALL (inst->ident.plugin_instance))
+      sstrncpy (pi_prefix, inst->ident.plugin, sizeof (pi_prefix));
+    else
+      ssnprintf (pi_prefix, sizeof (pi_prefix),
+          "%s-%s", inst->ident.plugin, inst->ident.plugin_instance);
+  }
+
+  sstrncpy (vl.type, inst->ident.type, sizeof (vl.type));
+
+  if (!LU_IS_ALL (inst->ident.type_instance))
+    sstrncpy (vl.type_instance, inst->ident.type_instance,
+        sizeof (vl.type_instance));
+
+#define READ_FUNC(func, rate) do { \
+  if (inst->state_ ## func != NULL) { \
+    agg_instance_read_func (inst, #func, rate, \
+        inst->state_ ## func, &vl, pi_prefix, t); \
+  } \
+} while (0)
+
+  pthread_mutex_lock (&inst->lock);
+
+  READ_FUNC (num, (gauge_t) inst->num);
+
+  /* All other aggregations are only defined when there have been any values
+   * at all. */
+  if (inst->num > 0)
+  {
+    READ_FUNC (sum, inst->sum);
+    READ_FUNC (average, (inst->sum / ((gauge_t) inst->num)));
+    READ_FUNC (min, inst->min);
+    READ_FUNC (max, inst->max);
+    READ_FUNC (stddev, sqrt((((gauge_t) inst->num) * inst->squares_sum)
+          - (inst->sum * inst->sum)) / ((gauge_t) inst->num));
+  }
+
+  /* Reset internal state. */
+  inst->num = 0;
+  inst->sum = 0.0;
+  inst->squares_sum = 0.0;
+  inst->min = NAN;
+  inst->max = NAN;
+
+  pthread_mutex_unlock (&inst->lock);
+
+  meta_data_destroy (vl.meta);
+  vl.meta = NULL;
+
+  return (0);
+} /* }}} int agg_instance_read */
+
+/* lookup_class_callback_t for utils_vl_lookup */
+static void *agg_lookup_class_callback ( /* {{{ */
+    __attribute__((unused)) data_set_t const *ds,
+    value_list_t const *vl, void *user_class)
+{
+  return (agg_instance_create (ds, vl, (aggregation_t *) user_class));
+} /* }}} void *agg_class_callback */
+
+/* lookup_obj_callback_t for utils_vl_lookup */
+static int agg_lookup_obj_callback (data_set_t const *ds, /* {{{ */
+    value_list_t const *vl,
+    __attribute__((unused)) void *user_class,
+    void *user_obj)
+{
+  return (agg_instance_update ((agg_instance_t *) user_obj, ds, vl));
+} /* }}} int agg_lookup_obj_callback */
+
+/* lookup_free_class_callback_t for utils_vl_lookup */
+static void agg_lookup_free_class_callback (void *user_class) /* {{{ */
+{
+  agg_destroy ((aggregation_t *) user_class);
+} /* }}} void agg_lookup_free_class_callback */
+
+/* lookup_free_obj_callback_t for utils_vl_lookup */
+static void agg_lookup_free_obj_callback (void *user_obj) /* {{{ */
+{
+  agg_instance_destroy ((agg_instance_t *) user_obj);
+} /* }}} void agg_lookup_free_obj_callback */
+
+/*
+ * <Plugin "aggregation">
+ *   <Aggregation>
+ *     Plugin "cpu"
+ *     Type "cpu"
+ *
+ *     GroupBy Host
+ *     GroupBy TypeInstance
+ *
+ *     CalculateNum true
+ *     CalculateSum true
+ *     CalculateAverage true
+ *     CalculateMinimum true
+ *     CalculateMaximum true
+ *     CalculateStddev true
+ *   </Aggregation>
+ * </Plugin>
+ */
+static int agg_config_handle_group_by (oconfig_item_t const *ci, /* {{{ */
+    aggregation_t *agg)
+{
+  int i;
+
+  for (i = 0; i < ci->values_num; i++)
+  {
+    char const *value;
+
+    if (ci->values[i].type != OCONFIG_TYPE_STRING)
+    {
+      ERROR ("aggregation plugin: Argument %i of the \"GroupBy\" option "
+          "is not a string.", i + 1);
+      continue;
+    }
+
+    value = ci->values[i].value.string;
+
+    if (strcasecmp ("Host", value) == 0)
+      sstrncpy (agg->ident.host, LU_ANY, sizeof (agg->ident.host));
+    else if (strcasecmp ("Plugin", value) == 0)
+      sstrncpy (agg->ident.plugin, LU_ANY, sizeof (agg->ident.plugin));
+    else if (strcasecmp ("PluginInstance", value) == 0)
+      sstrncpy (agg->ident.plugin_instance, LU_ANY,
+          sizeof (agg->ident.plugin_instance));
+    else if (strcasecmp ("TypeInstance", value) == 0)
+      sstrncpy (agg->ident.type_instance, LU_ANY, sizeof (agg->ident.type_instance));
+    else if (strcasecmp ("Type", value) == 0)
+      ERROR ("aggregation plugin: Grouping by type is not supported.");
+    else
+      WARNING ("aggregation plugin: The \"%s\" argument to the \"GroupBy\" "
+          "option is invalid and will be ignored.", value);
+  } /* for (ci->values) */
+
+  return (0);
+} /* }}} int agg_config_handle_group_by */
+
+static int agg_config_aggregation (oconfig_item_t *ci) /* {{{ */
+{
+  aggregation_t *agg;
+  _Bool is_valid;
+  int status;
+  int i;
+
+  agg = malloc (sizeof (*agg));
+  if (agg == NULL)
+  {
+    ERROR ("aggregation plugin: malloc failed.");
+    return (-1);
+  }
+  memset (agg, 0, sizeof (*agg));
+
+  sstrncpy (agg->ident.host, LU_ALL, sizeof (agg->ident.host));
+  sstrncpy (agg->ident.plugin, LU_ALL, sizeof (agg->ident.plugin));
+  sstrncpy (agg->ident.plugin_instance, LU_ALL,
+      sizeof (agg->ident.plugin_instance));
+  sstrncpy (agg->ident.type, LU_ALL, sizeof (agg->ident.type));
+  sstrncpy (agg->ident.type_instance, LU_ALL,
+      sizeof (agg->ident.type_instance));
+
+  for (i = 0; i < ci->children_num; i++)
+  {
+    oconfig_item_t *child = ci->children + i;
+
+    if (strcasecmp ("Host", child->key) == 0)
+      cf_util_get_string_buffer (child, agg->ident.host,
+          sizeof (agg->ident.host));
+    else if (strcasecmp ("Plugin", child->key) == 0)
+      cf_util_get_string_buffer (child, agg->ident.plugin,
+          sizeof (agg->ident.plugin));
+    else if (strcasecmp ("PluginInstance", child->key) == 0)
+      cf_util_get_string_buffer (child, agg->ident.plugin_instance,
+          sizeof (agg->ident.plugin_instance));
+    else if (strcasecmp ("Type", child->key) == 0)
+      cf_util_get_string_buffer (child, agg->ident.type,
+          sizeof (agg->ident.type));
+    else if (strcasecmp ("TypeInstance", child->key) == 0)
+      cf_util_get_string_buffer (child, agg->ident.type_instance,
+          sizeof (agg->ident.type_instance));
+    else if (strcasecmp ("GroupBy", child->key) == 0)
+      agg_config_handle_group_by (child, agg);
+    else if (strcasecmp ("CalculateNum", child->key) == 0)
+      cf_util_get_boolean (child, &agg->calc_num);
+    else if (strcasecmp ("CalculateSum", child->key) == 0)
+      cf_util_get_boolean (child, &agg->calc_sum);
+    else if (strcasecmp ("CalculateAverage", child->key) == 0)
+      cf_util_get_boolean (child, &agg->calc_average);
+    else if (strcasecmp ("CalculateMinimum", child->key) == 0)
+      cf_util_get_boolean (child, &agg->calc_min);
+    else if (strcasecmp ("CalculateMaximum", child->key) == 0)
+      cf_util_get_boolean (child, &agg->calc_max);
+    else if (strcasecmp ("CalculateStddev", child->key) == 0)
+      cf_util_get_boolean (child, &agg->calc_stddev);
+    else
+      WARNING ("aggregation plugin: The \"%s\" key is not allowed inside "
+          "<Aggregation /> blocks and will be ignored.", child->key);
+  }
+
+  /* Sanity checking */
+  is_valid = 1;
+  if (LU_IS_ALL (agg->ident.type)) /* {{{ */
+  {
+    ERROR ("aggregation plugin: It appears you did not specify the required "
+        "\"Type\" option in this aggregation. "
+        "(Host \"%s\", Plugin \"%s\", PluginInstance \"%s\", "
+        "Type \"%s\", TypeInstance \"%s\")",
+        agg->ident.host, agg->ident.plugin, agg->ident.plugin_instance,
+        agg->ident.type, agg->ident.type_instance);
+    is_valid = 0;
+  }
+  else if (strchr (agg->ident.type, '/') != NULL)
+  {
+    ERROR ("aggregation plugin: The \"Type\" may not contain the '/' "
+        "character. Especially, it may not be a wildcard. The current "
+        "value is \"%s\".", agg->ident.type);
+    is_valid = 0;
+  } /* }}} */
+
+  if (!LU_IS_ALL (agg->ident.host) /* {{{ */
+      && !LU_IS_ALL (agg->ident.plugin)
+      && !LU_IS_ALL (agg->ident.plugin_instance)
+      && !LU_IS_ALL (agg->ident.type_instance))
+  {
+    ERROR ("aggregation plugin: An aggregation must contain at least one "
+        "wildcard. This is achieved by leaving at least one of the \"Host\", "
+        "\"Plugin\", \"PluginInstance\" and \"TypeInstance\" options blank "
+        "and not grouping by that field. "
+        "(Host \"%s\", Plugin \"%s\", PluginInstance \"%s\", "
+        "Type \"%s\", TypeInstance \"%s\")",
+        agg->ident.host, agg->ident.plugin, agg->ident.plugin_instance,
+        agg->ident.type, agg->ident.type_instance);
+    is_valid = 0;
+  } /* }}} */
+
+  if (!agg->calc_num && !agg->calc_sum && !agg->calc_average /* {{{ */
+      && !agg->calc_min && !agg->calc_max && !agg->calc_stddev)
+  {
+    ERROR ("aggregation plugin: No aggregation function has been specified. "
+        "Without this, I don't know what I should be calculating. "
+        "(Host \"%s\", Plugin \"%s\", PluginInstance \"%s\", "
+        "Type \"%s\", TypeInstance \"%s\")",
+        agg->ident.host, agg->ident.plugin, agg->ident.plugin_instance,
+        agg->ident.type, agg->ident.type_instance);
+    is_valid = 0;
+  } /* }}} */
+
+  if (!is_valid) /* {{{ */
+  {
+    sfree (agg);
+    return (-1);
+  } /* }}} */
+
+  status = lookup_add (lookup, &agg->ident, agg);
+  if (status != 0)
+  {
+    ERROR ("aggregation plugin: lookup_add failed with status %i.", status);
+    sfree (agg);
+    return (-1);
+  }
+
+  DEBUG ("aggregation plugin: Successfully added aggregation: "
+      "(Host \"%s\", Plugin \"%s\", PluginInstance \"%s\", "
+      "Type \"%s\", TypeInstance \"%s\")",
+      agg->ident.host, agg->ident.plugin, agg->ident.plugin_instance,
+      agg->ident.type, agg->ident.type_instance);
+  return (0);
+} /* }}} int agg_config_aggregation */
+
+static int agg_config (oconfig_item_t *ci) /* {{{ */
+{
+  int i;
+
+  pthread_mutex_lock (&agg_instance_list_lock);
+
+  if (lookup == NULL)
+  {
+    lookup = lookup_create (agg_lookup_class_callback,
+        agg_lookup_obj_callback,
+        agg_lookup_free_class_callback,
+        agg_lookup_free_obj_callback);
+    if (lookup == NULL)
+    {
+      pthread_mutex_unlock (&agg_instance_list_lock);
+      ERROR ("aggregation plugin: lookup_create failed.");
+      return (-1);
+    }
+  }
+
+  for (i = 0; i < ci->children_num; i++)
+  {
+    oconfig_item_t *child = ci->children + i;
+
+    if (strcasecmp ("Aggregation", child->key) == 0)
+      agg_config_aggregation (child);
+    else
+      WARNING ("aggregation plugin: The \"%s\" key is not allowed inside "
+          "<Plugin aggregation /> blocks and will be ignored.", child->key);
+  }
+
+  pthread_mutex_unlock (&agg_instance_list_lock);
+
+  return (0);
+} /* }}} int agg_config */
+
+static int agg_read (void) /* {{{ */
+{
+  agg_instance_t *this;
+  cdtime_t t;
+  int success;
+
+  t = cdtime ();
+  success = 0;
+
+  pthread_mutex_lock (&agg_instance_list_lock);
+
+  /* agg_instance_list_head only holds data, after the "write" callback has
+   * been called with a matching value list at least once. So on startup,
+   * there's a race between the aggregations read() and write() callback. If
+   * the read() callback is called first, agg_instance_list_head is NULL and
+   * "success" may be zero. This is expected and should not result in an error.
+   * Therefore we need to handle this case separately. */
+  if (agg_instance_list_head == NULL)
+  {
+    pthread_mutex_unlock (&agg_instance_list_lock);
+    return (0);
+  }
+
+  for (this = agg_instance_list_head; this != NULL; this = this->next)
+  {
+    int status;
+
+    status = agg_instance_read (this, t);
+    if (status != 0)
+      WARNING ("aggregation plugin: Reading an aggregation instance "
+          "failed with status %i.", status);
+    else
+      success++;
+  }
+
+  pthread_mutex_unlock (&agg_instance_list_lock);
+
+  return ((success > 0) ? 0 : -1);
+} /* }}} int agg_read */
+
+static int agg_write (data_set_t const *ds, value_list_t const *vl, /* {{{ */
+    __attribute__((unused)) user_data_t *user_data)
+{
+  _Bool created_by_aggregation = 0;
+  int status;
+
+  /* Ignore values that were created by the aggregation plugin to avoid weird
+   * effects. */
+  (void) meta_data_get_boolean (vl->meta, "aggregation:created",
+      &created_by_aggregation);
+  if (created_by_aggregation)
+    return (0);
+
+  if (lookup == NULL)
+    status = ENOENT;
+  else
+  {
+    status = lookup_search (lookup, ds, vl);
+    if (status > 0)
+      status = 0;
+  }
+
+  return (status);
+} /* }}} int agg_write */
+
+void module_register (void)
+{
+  plugin_register_complex_config ("aggregation", agg_config);
+  plugin_register_read ("aggregation", agg_read);
+  plugin_register_write ("aggregation", agg_write, /* user_data = */ NULL);
+}
+
+/* vim: set sw=2 sts=2 tw=78 et fdm=marker : */
index 89284c8..c9e46c4 100644 (file)
@@ -31,6 +31,7 @@
 #include "plugin.h"
 #include "utils_cmd_putval.h"
 #include "utils_format_json.h"
+#include "utils_format_graphite.h"
 
 #include <pthread.h>
 
@@ -42,8 +43,9 @@
 #define CAMQP_DM_VOLATILE   1
 #define CAMQP_DM_PERSISTENT 2
 
-#define CAMQP_FORMAT_COMMAND 1
-#define CAMQP_FORMAT_JSON    2
+#define CAMQP_FORMAT_COMMAND    1
+#define CAMQP_FORMAT_JSON       2
+#define CAMQP_FORMAT_GRAPHITE   3
 
 #define CAMQP_CHANNEL 1
 
@@ -68,6 +70,10 @@ struct camqp_config_s
     uint8_t delivery_mode;
     _Bool   store_rates;
     int     format;
+    /* publish & graphite format only */
+    char    *prefix;
+    char    *postfix;
+    char    escape_char;
 
     /* subscribe only */
     char   *exchange_type;
@@ -129,6 +135,9 @@ static void camqp_config_free (void *ptr) /* {{{ */
     sfree (conf->exchange_type);
     sfree (conf->queue);
     sfree (conf->routing_key);
+    sfree (conf->prefix);
+    sfree (conf->postfix);
+
 
     sfree (conf);
 } /* }}} void camqp_config_free */
@@ -699,6 +708,8 @@ static int camqp_write_locked (camqp_config_t *conf, /* {{{ */
         props.content_type = amqp_cstring_bytes("text/collectd");
     else if (conf->format == CAMQP_FORMAT_JSON)
         props.content_type = amqp_cstring_bytes("application/json");
+    else if (conf->format == CAMQP_FORMAT_GRAPHITE)
+        props.content_type = amqp_cstring_bytes("text/graphite");
     else
         assert (23 == 42);
     props.delivery_mode = conf->delivery_mode;
@@ -777,6 +788,18 @@ static int camqp_write (const data_set_t *ds, const value_list_t *vl, /* {{{ */
         format_json_value_list (buffer, &bfill, &bfree, ds, vl, conf->store_rates);
         format_json_finalize (buffer, &bfill, &bfree);
     }
+    else if (conf->format == CAMQP_FORMAT_GRAPHITE)
+    {
+        status = format_graphite (buffer, sizeof (buffer), ds, vl,
+                    conf->prefix, conf->postfix, conf->escape_char,
+                    conf->store_rates);
+        if (status != 0)
+        {
+            ERROR ("amqp plugin: format_graphite failed with status %i.",
+                    status);
+            return (status);
+        }
+    }
     else
     {
         ERROR ("amqp plugin: Invalid format (%i).", conf->format);
@@ -809,6 +832,8 @@ static int camqp_config_set_format (oconfig_item_t *ci, /* {{{ */
         conf->format = CAMQP_FORMAT_COMMAND;
     else if (strcasecmp ("JSON", string) == 0)
         conf->format = CAMQP_FORMAT_JSON;
+    else if (strcasecmp ("Graphite", string) == 0)
+        conf->format = CAMQP_FORMAT_GRAPHITE;
     else
     {
         WARNING ("amqp plugin: Invalid format string: %s",
@@ -849,6 +874,10 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */
     /* publish only */
     conf->delivery_mode = CAMQP_DM_VOLATILE;
     conf->store_rates = 0;
+    /* publish & graphite only */
+    conf->prefix = NULL;
+    conf->postfix = NULL;
+    conf->escape_char = '_';
     /* subscribe only */
     conf->exchange_type = NULL;
     conf->queue = NULL;
@@ -906,6 +935,20 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */
             status = cf_util_get_boolean (child, &conf->store_rates);
         else if ((strcasecmp ("Format", child->key) == 0) && publish)
             status = camqp_config_set_format (child, conf);
+        else if ((strcasecmp ("GraphitePrefix", child->key) == 0) && publish)
+            status = cf_util_get_string (child, &conf->prefix);
+        else if ((strcasecmp ("GraphitePostfix", child->key) == 0) && publish)
+            status = cf_util_get_string (child, &conf->postfix);
+        else if ((strcasecmp ("GraphiteEscapeChar", child->key) == 0) && publish)
+        {
+            char *tmp_buff = NULL;
+            status = cf_util_get_string (child, &tmp_buff);
+            if (strlen (tmp_buff) > 1)
+                WARNING ("amqp plugin: The option \"GraphiteEscapeChar\" handles "
+                        "only one character. Others will be ignored.");
+            conf->escape_char = tmp_buff[0];
+            sfree (tmp_buff);
+        }
         else
             WARNING ("amqp plugin: Ignoring unknown "
                     "configuration option \"%s\".", child->key);
index f3ef675..9f5dedc 100644 (file)
@@ -52,6 +52,7 @@
 # to missing dependencies or because they have been deactivated explicitly.  #
 ##############################################################################
 
+#@BUILD_PLUGIN_AGGREGATION_TRUE@LoadPlugin aggregation
 #@BUILD_PLUGIN_AMQP_TRUE@LoadPlugin amqp
 #@BUILD_PLUGIN_APACHE_TRUE@LoadPlugin apache
 #@BUILD_PLUGIN_APCUPS_TRUE@LoadPlugin apcups
 # ription of those options is available in the collectd.conf(5) manual page. #
 ##############################################################################
 
+#<Plugin "aggregation">
+#  <Aggregation>
+#    #Host "unspecified"
+#    Plugin "cpu"
+#    #PluginInstance "unspecified"
+#    Type "cpu"
+#    #TypeInstance "unspecified"
+#
+#    GroupBy "Host"
+#    GroupBy "TypeInstance"
+#
+#    CalculateNum false
+#    CalculateSum false
+#    CalculateAverage true
+#    CalculateMinimum false
+#    CalculateMaximum false
+#    CalculateStddev false
+#  </Aggregation>
+#</Plugin>
+
 #<Plugin "amqp">
 #  <Publish "name">
 #    Host "localhost"
 #</Plugin>
 
 #<Plugin memcached>
-#      Host "127.0.0.1"
-#      Port "11211"
+#      <Instance "local">
+#              Host "127.0.0.1"
+#              Port "11211"
+#      </Instance>
 #</Plugin>
 
 #<Plugin modbus>
 #      Host "localhost"
 #      Port 123
 #      ReverseLookups false
+#      IncludeUnitID true
 #</Plugin>
 
 #<Plugin nut>
 
 #<Plugin "swap">
 #      ReportByDevice false
+#      ReportBytes true
 #</Plugin>
 
 #<Plugin "table">
index c025f94..f6f61c7 100644 (file)
@@ -193,12 +193,122 @@ C<Plugin>-Section. Which options exist depends on the plugin used. Some plugins
 require external configuration, too. The C<apache plugin>, for example,
 required C<mod_status> to be configured in the webserver you're going to
 collect data from. These plugins are listed below as well, even if they don't
-require any configuration within collectd's configfile.
+require any configuration within collectd's configuration file.
 
 A list of all plugins and a short summary for each plugin can be found in the
 F<README> file shipped with the sourcecode and hopefully binary packets as
 well.
 
+=head2 Plugin C<aggregation>
+
+The I<Aggregation plugin> makes it possible to aggregate several values into
+one using aggregation functions such as I<sum>, I<average>, I<min> and I<max>.
+This can be put to a wide variety of uses, e.g. average and total CPU
+statistics for your entire fleet.
+
+The grouping is powerful but, as with many powerful tools, may be a bit
+difficult to wrap your head around. The grouping will therefore be
+demonstrated using an example: The average and sum of the CPU usage across
+all CPUs of each host is to be calculated.
+
+To select all the affected values for our example, set C<Plugin cpu> and
+C<Type cpu>. The other values are left unspecified, meaning "all values". The
+I<Host>, I<Plugin>, I<PluginInstance>, I<Type> and I<TypeInstance> options
+work as if they were specified in the C<WHERE> clause of an C<SELECT> SQL
+statement.
+
+  Plugin "cpu"
+  Type "cpu"
+
+Although the I<Host>, I<PluginInstance> (CPU number, i.e. 0, 1, 2, ...)  and
+I<TypeInstance> (idle, user, system, ...) fields are left unspecified in the
+example, the intention is to have a new value for each host / type instance
+pair. This is achieved by "grouping" the values using the C<GroupBy> option.
+It can be specified multiple times to group by more than one field.
+
+  GroupBy "Host"
+  GroupBy "TypeInstance"
+
+We do neither specify nor group by I<plugin instance> (the CPU number), so all
+metrics that differ in the CPU number only will be aggregated. Each
+aggregation needs I<at least one> such field, otherwise no aggregation would
+take place.
+
+The full example configuration looks like this:
+
+ <Plugin "aggregation">
+   <Aggregation>
+     Plugin "cpu"
+     Type "cpu"
+     
+     GroupBy "Host"
+     GroupBy "TypeInstance"
+     
+     CalculateSum true
+     CalculateAverage true
+   </Aggregation>
+ </Plugin>
+
+There are a couple of limitations you should be aware of:
+
+=over 4
+
+=item
+
+The I<Type> cannot be left unspecified, because it is not reasonable to add
+apples to oranges. Also, the internal lookup structure won't work if you try
+to group by type.
+
+=item
+
+There must be at least one unspecified, ungrouped field. Otherwise nothing
+will be aggregated.
+
+=back
+
+As you can see in the example above, each aggregation has its own
+B<Aggregation> block. You can have multiple aggregation blocks and aggregation
+blocks may match the same values, i.e. one value list can update multiple
+aggregations. The following options are valid inside B<Aggregation> blocks:
+
+=over 4
+
+=item B<Host> I<Host>
+
+=item B<Plugin> I<Plugin>
+
+=item B<PluginInstance> I<PluginInstance>
+
+=item B<Type> I<Type>
+
+=item B<TypeInstance> I<TypeInstance>
+
+Selects the value lists to be added to this aggregation. B<Type> must be a
+valid data set name, see L<types.db(5)> for details.
+
+=item B<GroupBy> B<Host>|B<Plugin>|B<PluginInstance>|B<TypeInstance>
+
+Group valued by the specified field. The B<GroupBy> option may be repeated to
+group by multiple fields.
+
+=item B<CalculateNum> B<true>|B<false>
+
+=item B<CalculateSum> B<true>|B<false>
+
+=item B<CalculateAverage> B<true>|B<false>
+
+=item B<CalculateMinimum> B<true>|B<false>
+
+=item B<CalculateMaximum> B<true>|B<false>
+
+=item B<CalculateStddev> B<true>|B<false>
+
+Boolean options for enabling calculation of the number of value lists, their
+sum, average, minimum, maximum andE<nbsp>/ or standard deviation. All options
+are disabled by default.
+
+=back
+
 =head2 Plugin C<amqp>
 
 The I<AMQMP plugin> can be used to communicate with other instances of
@@ -220,6 +330,8 @@ possibly filtering or messages.
  #   Persistent false
  #   Format "command"
  #   StoreRates false
+ #   GraphitePrefix "collectd."
+ #   GraphiteEscapeChar "_"
    </Publish>
    
    # Receive values from an AMQP broker
@@ -320,6 +432,10 @@ If set to B<JSON>, the values are encoded in the I<JavaScript Object Notation>,
 an easy and straight forward exchange format. The C<Content-Type> header field
 will be set to C<application/json>.
 
+If set to B<Graphite>, values are encoded in the I<Graphite> format, which is
+"<metric> <value> <timestamp>\n". The C<Content-Type> header field will be set to
+C<text/graphite>.
+
 A subscribing client I<should> use the C<Content-Type> header field to
 determine how to decode the values. Currently, the I<AMQP plugin> itself can
 only decode the B<Command> format.
@@ -334,6 +450,25 @@ using the internal value cache.
 Please note that currently this option is only used if the B<Format> option has
 been set to B<JSON>.
 
+=item B<GraphitePrefix> (Publish and B<Format>=I<Graphite> only)
+
+A prefix can be added in the metric name when outputting in the I<Graphite> format.
+It's added before the I<Host> name.
+Metric name will be "<prefix><host><postfix><plugin><type><name>"
+
+=item B<GraphitePostfix> (Publish and B<Format>=I<Graphite> only)
+
+A postfix can be added in the metric name when outputting in the I<Graphite> format.
+It's added after the I<Host> name.
+Metric name will be "<prefix><host><postfix><plugin><type><name>"
+
+=item B<GraphiteEscapeChar> (Publish and B<Format>=I<Graphite> only)
+
+Specify a character to replace dots (.) in the host part of the metric name.
+In I<Graphite> metric name, dots are used as separators between different
+metric parts (host, plugin, type).
+Default is "_" (I<Underscore>).
+
 =back
 
 =head2 Plugin C<apache>
@@ -1949,6 +2084,17 @@ The C<memcached plugin> connects to a memcached server and queries statistics
 about cache utilization, memory and bandwidth used.
 L<http://www.danga.com/memcached/>
 
+ <Plugin "memcached">
+   <Instance "name">
+     Host "memcache.example.com"
+     Port 11211
+   </Instance>
+ </Plugin>
+
+The plugin configuration consists of one or more B<Instance> blocks which
+specify one I<memcached> connection each. Within the B<Instance> blocks, the
+following options are allowed:
+
 =over 4
 
 =item B<Host> I<Hostname>
@@ -1959,6 +2105,11 @@ Hostname to connect to. Defaults to B<127.0.0.1>.
 
 TCP-Port to connect to. Defaults to B<11211>.
 
+=item B<Socket> I<Path>
+
+Connect to I<memcached> using the UNIX domain socket at I<Path>. If this
+setting is given, the B<Host> and B<Port> settings are ignored.
+
 =back
 
 =head2 Plugin C<modbus>
@@ -3053,6 +3204,16 @@ IP-address may be used in a filename it is recommended to disable reverse
 lookups. The default is to do reverse lookups to preserve backwards
 compatibility, though.
 
+=item B<IncludeUnitID> B<true>|B<false>
+
+When a peer is a refclock, include the unit ID in the I<type instance>.
+Defaults to B<false> for backward compatibility.
+
+If two refclock peers use the same driver and this is B<false>, the plugin will
+try to write simultaneous measurements from both to the same type instance.
+This will result in error messages in the log and only one set of measurements
+making it through.
+
 =back
 
 =head2 Plugin C<nut>
@@ -3278,6 +3439,11 @@ values submitted to the daemon. Other than that, that name is not used.
 Defines the "database alias" or "service name" to connect to. Usually, these
 names are defined in the file named C<$ORACLE_HOME/network/admin/tnsnames.ora>.
 
+=item B<Host> I<Host>
+
+Hostname to use when dispatching values for this database. Defaults to using
+the global hostname of the I<collectd> instance.
+
 =item B<Username> I<Username>
 
 Username used for authentication.
@@ -3548,6 +3714,11 @@ used, the parameter expands to "localhost".
 
 The name of the database of the current connection.
 
+=item I<instance>
+
+The name of the database plugin instance. See the B<Instance> option of the
+database specification below for details.
+
 =item I<username>
 
 The username used to connect to the database.
@@ -3706,6 +3877,13 @@ Specify the password to be used when connecting to the server.
 Specify whether to use an SSL connection when contacting the server. The
 following modes are supported:
 
+=item B<Instance> I<name>
+
+Specify the plugin instance name that should be used instead of the database
+name (which is the default, if this option has not been specified). This
+allows to query multiple databases of the same name on the same host (e.g.
+when running multiple database server versions in parallel).
+
 =over 4
 
 =item I<disable>
@@ -4066,6 +4244,10 @@ The B<Port> option is the TCP port on which the Redis instance accepts
 connections. Either a service name of a port number may be given. Please note
 that numerical port numbers must be given as a string, too.
 
+=item B<Password> I<Password>
+
+Use I<Password> to authenticate when connecting to I<Redis>.
+
 =item B<Timeout> I<Timeout in miliseconds>
 
 The B<Timeout> option set the socket timeout for node response. Since the Redis
@@ -4123,6 +4305,50 @@ Enables or disables the creation of RRD files. If the daemon is not running
 locally, or B<DataDir> is set to a relative path, this will not work as
 expected. Default is B<true>.
 
+=item B<StepSize> I<Seconds>
+
+B<Force> the stepsize of newly created RRD-files. Ideally (and per default)
+this setting is unset and the stepsize is set to the interval in which the data
+is collected. Do not use this option unless you absolutely have to for some
+reason. Setting this option may cause problems with the C<snmp plugin>, the
+C<exec plugin> or when the daemon is set up to receive data from other hosts.
+
+=item B<HeartBeat> I<Seconds>
+
+B<Force> the heartbeat of newly created RRD-files. This setting should be unset
+in which case the heartbeat is set to twice the B<StepSize> which should equal
+the interval in which data is collected. Do not set this option unless you have
+a very good reason to do so.
+
+=item B<RRARows> I<NumRows>
+
+The C<rrdtool plugin> calculates the number of PDPs per CDP based on the
+B<StepSize>, this setting and a timespan. This plugin creates RRD-files with
+three times five RRAs, i. e. five RRAs with the CFs B<MIN>, B<AVERAGE>, and
+B<MAX>. The five RRAs are optimized for graphs covering one hour, one day, one
+week, one month, and one year.
+
+So for each timespan, it calculates how many PDPs need to be consolidated into
+one CDP by calculating:
+  number of PDPs = timespan / (stepsize * rrarows)
+
+Bottom line is, set this no smaller than the width of you graphs in pixels. The
+default is 1200.
+
+=item B<RRATimespan> I<Seconds>
+
+Adds an RRA-timespan, given in seconds. Use this option multiple times to have
+more then one RRA. If this option is never used, the built-in default of (3600,
+86400, 604800, 2678400, 31622400) is used.
+
+For more information on how RRA-sizes are calculated see B<RRARows> above.
+
+=item B<XFF> I<Factor>
+
+Set the "XFiles Factor". The default is 0.1. If unsure, don't set this option.
+I<Factor> must be in the range C<[0.0-1.0)>, i.e. between zero (inclusive) and
+one (exclusive).
+
 =back
 
 =head2 Plugin C<rrdtool>
@@ -4180,6 +4406,8 @@ For more information on how RRA-sizes are calculated see B<RRARows> above.
 =item B<XFF> I<Factor>
 
 Set the "XFiles Factor". The default is 0.1. If unsure, don't set this option.
+I<Factor> must be in the range C<[0.0-1.0)>, i.e. between zero (inclusive) and
+one (exclusive).
 
 =item B<CacheFlush> I<Seconds>
 
@@ -4295,6 +4523,11 @@ and available space of each device will be reported separately.
 This option is only available if the I<Swap plugin> can read C</proc/swaps>
 (under Linux) or use the L<swapctl(2)> mechanism (under I<Solaris>).
 
+=item B<ReportBytes> B<false>|B<true>
+
+When enabled, the I<swap I/O> is reported in bytes. When disabled, the default,
+I<swap I/O> is reported in pages. This option is available under Linux only.
+
 =back
 
 =head2 Plugin C<syslog>
index 0bd9f68..b679bf7 100644 (file)
@@ -1229,7 +1229,102 @@ counter_t counter_diff (counter_t old_value, counter_t new_value)
        }
 
        return (diff);
-} /* counter_t counter_to_gauge */
+} /* counter_t counter_diff */
+
+int rate_to_value (value_t *ret_value, gauge_t rate, /* {{{ */
+               rate_to_value_state_t *state,
+               int ds_type, cdtime_t t)
+{
+       gauge_t delta_gauge;
+       cdtime_t delta_t;
+
+       if (ds_type == DS_TYPE_GAUGE)
+       {
+               state->last_value.gauge = rate;
+               state->last_time = t;
+
+               *ret_value = state->last_value;
+               return (0);
+       }
+
+       /* Counter and absolute can't handle negative rates. Reset "last time"
+        * to zero, so that the next valid rate will re-initialize the
+        * structure. */
+       if ((rate < 0.0)
+                       && ((ds_type == DS_TYPE_COUNTER)
+                               || (ds_type == DS_TYPE_ABSOLUTE)))
+       {
+               memset (state, 0, sizeof (*state));
+               return (EINVAL);
+       }
+
+       /* Another invalid state: The time is not increasing. */
+       if (t <= state->last_time)
+       {
+               memset (state, 0, sizeof (*state));
+               return (EINVAL);
+       }
+
+       delta_t = t - state->last_time;
+       delta_gauge = (rate * CDTIME_T_TO_DOUBLE (delta_t)) + state->residual;
+
+       /* Previous value is invalid. */
+       if (state->last_time == 0) /* {{{ */
+       {
+               if (ds_type == DS_TYPE_DERIVE)
+               {
+                       state->last_value.derive = (derive_t) rate;
+                       state->residual = rate - ((gauge_t) state->last_value.derive);
+               }
+               else if (ds_type == DS_TYPE_COUNTER)
+               {
+                       state->last_value.counter = (counter_t) rate;
+                       state->residual = rate - ((gauge_t) state->last_value.counter);
+               }
+               else if (ds_type == DS_TYPE_ABSOLUTE)
+               {
+                       state->last_value.absolute = (absolute_t) rate;
+                       state->residual = rate - ((gauge_t) state->last_value.absolute);
+               }
+               else
+               {
+                       assert (23 == 42);
+               }
+
+               state->last_time = t;
+               return (EAGAIN);
+       } /* }}} */
+
+       if (ds_type == DS_TYPE_DERIVE)
+       {
+               derive_t delta_derive = (derive_t) delta_gauge;
+
+               state->last_value.derive += delta_derive;
+               state->residual = delta_gauge - ((gauge_t) delta_derive);
+       }
+       else if (ds_type == DS_TYPE_COUNTER)
+       {
+               counter_t delta_counter = (counter_t) delta_gauge;
+
+               state->last_value.counter += delta_counter;
+               state->residual = delta_gauge - ((gauge_t) delta_counter);
+       }
+       else if (ds_type == DS_TYPE_ABSOLUTE)
+       {
+               absolute_t delta_absolute = (absolute_t) delta_gauge;
+
+               state->last_value.absolute = delta_absolute;
+               state->residual = delta_gauge - ((gauge_t) delta_absolute);
+       }
+       else
+       {
+               assert (23 == 42);
+       }
+
+        state->last_time = t;
+       *ret_value = state->last_value;
+       return (0);
+} /* }}} value_t rate_to_value */
 
 int service_name_to_port_number (const char *service_name)
 {
index 6b11b53..8a7d986 100644 (file)
                || (strcasecmp ("no", (s)) == 0) \
                || (strcasecmp ("off", (s)) == 0))
 
+struct rate_to_value_state_s
+{
+  value_t last_value;
+  cdtime_t last_time;
+  gauge_t residual;
+};
+typedef struct rate_to_value_state_s rate_to_value_state_t;
+
 char *sstrncpy (char *dest, const char *src, size_t n);
 int ssnprintf (char *dest, size_t n, const char *format, ...);
 char *sstrdup(const char *s);
@@ -292,6 +300,15 @@ int read_file_contents (const char *filename, char *buf, int bufsize);
 
 counter_t counter_diff (counter_t old_value, counter_t new_value);
 
+/* Convert a rate back to a value_t. When converting to a derive_t, counter_t
+ * or absoltue_t, take fractional residuals into account. This is important
+ * when scaling counters, for example.
+ * Returns zero on success. Returns EAGAIN when called for the first time; in
+ * this case the value_t is invalid and the next call should succeed. Other
+ * return values indicate an error. */
+int rate_to_value (value_t *ret_value, gauge_t rate,
+               rate_to_value_state_t *state, int ds_type, cdtime_t t);
+
 /* Converts a service name (a string) to a port number
  * (in the range [1-65535]). Returns less than zero on error. */
 int service_name_to_port_number (const char *service_name);
index c207318..344f76e 100644 (file)
 /* no global variables */
 /* #endif KERNEL_LINUX */
 
+#elif HAVE_PERFSTAT
+# include <sys/protosw.h>
+# include <libperfstat.h>
+/* #endif HAVE_PERFSTAT */
+
 #else
 # error "No applicable input method."
 #endif
@@ -121,7 +126,24 @@ static int cs_read (void)
 
        if (status == -2)
                ERROR ("contextswitch plugin: Unable to find context switch value.");
-#endif /* KERNEL_LINUX */
+/* #endif  KERNEL_LINUX */
+
+#elif HAVE_PERFSTAT
+       int status = 0;
+       perfstat_cpu_total_t perfcputotal;
+
+       status = perfstat_cpu_total(NULL, &perfcputotal, sizeof(perfstat_cpu_total_t), 1);
+       if (status < 0)
+       {
+               char errbuf[1024];
+               ERROR ("contextswitch plugin: perfstat_cpu_total: %s",
+                       sstrerror (errno, errbuf, sizeof (errbuf)));
+               return (-1);
+       }
+
+       cs_submit(perfcputotal.pswitch);
+       status = 0;
+#endif /* defined(HAVE_PERFSTAT) */
 
        return status;
 }
index 7411c22..375583c 100644 (file)
@@ -1,6 +1,6 @@
 /**
  * collectd - src/disk.c
- * Copyright (C) 2005-2010  Florian octo Forster
+ * Copyright (C) 2005-2012  Florian octo Forster
  * Copyright (C) 2009       Manuel Sanmartin
  *
  * This program is free software; you can redistribute it and/or modify it
@@ -17,7 +17,7 @@
  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
  *
  * Authors:
- *   Florian octo Forster <octo at verplant.org>
+ *   Florian octo Forster <octo at collectd.org>
  *   Manuel Sanmartin
  **/
 
@@ -75,6 +75,9 @@
 
 #if HAVE_IOKIT_IOKITLIB_H
 static mach_port_t io_master_port = MACH_PORT_NULL;
+/* This defaults to false for backwards compatibility. Please fix in the next
+ * major version. */
+static _Bool use_bsd_name = 0;
 /* #endif HAVE_IOKIT_IOKITLIB_H */
 
 #elif KERNEL_LINUX
@@ -128,6 +131,7 @@ static int pnumdisk;
 static const char *config_keys[] =
 {
        "Disk",
+       "UseBSDName",
        "IgnoreSelected"
 };
 static int config_keys_num = STATIC_ARRAY_SIZE (config_keys);
@@ -152,6 +156,15 @@ static int disk_config (const char *key, const char *value)
       invert = 0;
     ignorelist_set_invert (ignorelist, invert);
   }
+  else if (strcasecmp ("UseBSDName", key) == 0)
+  {
+#if HAVE_IOKIT_IOKITLIB_H
+    use_bsd_name = IS_TRUE (value) ? 1 : 0;
+#else
+    WARNING ("disk plugin: The \"UseBSDName\" option is only supported "
+        "on Mach / Mac OS X and will be ignored.");
+#endif
+  }
   else
   {
     return (-1);
@@ -291,7 +304,8 @@ static int disk_read (void)
        CFDictionaryRef         props_dict;
        CFDictionaryRef         stats_dict;
        CFDictionaryRef         child_dict;
-       kern_return_t           status;
+       CFStringRef             tmp_cf_string_ref;
+       kern_return_t           status;
 
        signed long long read_ops;
        signed long long read_byt;
@@ -302,7 +316,8 @@ static int disk_read (void)
 
        int  disk_major;
        int  disk_minor;
-       char disk_name[64];
+       char disk_name[DATA_MAX_NAME_LEN];
+       char disk_name_bsd[DATA_MAX_NAME_LEN];
 
        /* Get the list of all disk objects. */
        if (IOServiceGetMatchingServices (io_master_port,
@@ -350,12 +365,41 @@ static int disk_read (void)
                        continue;
                }
 
+               /* tmp_cf_string_ref doesn't need to be released. */
+               tmp_cf_string_ref = (CFStringRef) CFDictionaryGetValue (props_dict,
+                               CFSTR(kIOBSDNameKey));
+               if (!tmp_cf_string_ref)
+               {
+                       DEBUG ("disk plugin: CFDictionaryGetValue("
+                                       "kIOBSDNameKey) failed.");
+                       CFRelease (props_dict);
+                       IOObjectRelease (disk_child);
+                       IOObjectRelease (disk);
+                       continue;
+               }
+               assert (CFGetTypeID (tmp_cf_string_ref) == CFStringGetTypeID ());
+
+               memset (disk_name_bsd, 0, sizeof (disk_name_bsd));
+               CFStringGetCString (tmp_cf_string_ref,
+                               disk_name_bsd, sizeof (disk_name_bsd),
+                               kCFStringEncodingUTF8);
+               if (disk_name_bsd[0] == 0)
+               {
+                       ERROR ("disk plugin: CFStringGetCString() failed.");
+                       CFRelease (props_dict);
+                       IOObjectRelease (disk_child);
+                       IOObjectRelease (disk);
+                       continue;
+               }
+               DEBUG ("disk plugin: disk_name_bsd = \"%s\"", disk_name_bsd);
+
                stats_dict = (CFDictionaryRef) CFDictionaryGetValue (props_dict,
                                CFSTR (kIOBlockStorageDriverStatisticsKey));
 
                if (stats_dict == NULL)
                {
-                       DEBUG ("CFDictionaryGetValue (%s) failed.",
+                       DEBUG ("disk plugin: CFDictionaryGetValue ("
+                                       "%s) failed.",
                                        kIOBlockStorageDriverStatisticsKey);
                        CFRelease (props_dict);
                        IOObjectRelease (disk_child);
@@ -369,7 +413,8 @@ static int disk_read (void)
                                        kNilOptions)
                                != kIOReturnSuccess)
                {
-                       DEBUG ("IORegistryEntryCreateCFProperties (disk_child) failed.");
+                       DEBUG ("disk plugin: IORegistryEntryCreateCFProperties ("
+                                       "disk_child) failed.");
                        IOObjectRelease (disk_child);
                        CFRelease (props_dict);
                        IOObjectRelease (disk);
@@ -399,17 +444,12 @@ static int disk_read (void)
                write_tme = dict_get_value (stats_dict,
                                kIOBlockStorageDriverStatisticsTotalWriteTimeKey);
 
-               if (ssnprintf (disk_name, sizeof (disk_name),
-                               "%i-%i", disk_major, disk_minor) >= sizeof (disk_name))
-               {
-                       DEBUG ("snprintf (major, minor) failed.");
-                       CFRelease (child_dict);
-                       IOObjectRelease (disk_child);
-                       CFRelease (props_dict);
-                       IOObjectRelease (disk);
-                       continue;
-               }
-               DEBUG ("disk_name = %s", disk_name);
+               if (use_bsd_name)
+                       sstrncpy (disk_name, disk_name_bsd, sizeof (disk_name));
+               else
+                       ssnprintf (disk_name, sizeof (disk_name), "%i-%i",
+                                       disk_major, disk_minor);
+               DEBUG ("disk plugin: disk_name = \"%s\"", disk_name);
 
                if ((read_byt != -1LL) || (write_byt != -1LL))
                        disk_submit (disk_name, "disk_octets", read_byt, write_byt);
index 5b7aa94..19f58b2 100644 (file)
@@ -148,6 +148,19 @@ block:
         $$.children = $2.statement;
         $$.children_num = $2.statement_num;
        }
+       | block_begin block_end
+       {
+        if (strcmp ($1.key, $2) != 0)
+        {
+               printf ("block_begin = %s; block_end = %s;\n", $1.key, $2);
+               yyerror ("Block not closed..\n");
+               exit (1);
+        }
+        free ($2); $2 = NULL;
+        $$ = $1;
+        $$.children = NULL;
+        $$.children_num = 0;
+       }
        ;
 
 statement:
@@ -191,6 +204,13 @@ entire_file:
         ci_root->children = $1.statement;
         ci_root->children_num = $1.statement_num;
        }
+       | /* epsilon */
+       {
+        ci_root = malloc (sizeof (oconfig_item_t));
+        memset (ci_root, '\0', sizeof (oconfig_item_t));
+        ci_root->children = NULL;
+        ci_root->children_num = 0;
+       }
        ;
 
 %%
index 48fa713..a09f45e 100644 (file)
@@ -1,9 +1,10 @@
 /**
  * collectd - src/memcached.c, based on src/hddtemp.c
  * Copyright (C) 2007       Antony Dovgal
- * Copyright (C) 2007-2010  Florian Forster
+ * Copyright (C) 2007-2012  Florian Forster
  * Copyright (C) 2009       Doug MacEachern
  * Copyright (C) 2009       Franck Lombardi
+ * Copyright (C) 2012       Nicolas Szalay
  *
  * This program is free software; you can redistribute it and/or modify it
  * under the terms of the GNU General Public License as published by the
@@ -24,6 +25,7 @@
  *   Florian octo Forster <octo at collectd.org>
  *   Doug MacEachern <dougm at hyperic.com>
  *   Franck Lombardi
+ *   Nicolas Szalay
  **/
 
 #include "collectd.h"
 #include "plugin.h"
 #include "configfile.h"
 
-# include <poll.h>
-# include <netdb.h>
-# include <sys/socket.h>
-# include <sys/un.h>
-# include <netinet/in.h>
-# include <netinet/tcp.h>
-
-/* Hack to work around the missing define in AIX */
-#ifndef MSG_DONTWAIT
-# define MSG_DONTWAIT MSG_NONBLOCK
-#endif
+#include <netdb.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
 
 #define MEMCACHED_DEF_HOST "127.0.0.1"
 #define MEMCACHED_DEF_PORT "11211"
 
-#define MEMCACHED_RETRY_COUNT 100
-
-static const char *config_keys[] =
+struct memcached_s
 {
-       "Socket",
-       "Host",
-       "Port"
+  char *name;
+  char *socket;
+  char *host;
+  char *port;
 };
-static int config_keys_num = STATIC_ARRAY_SIZE (config_keys);
+typedef struct memcached_s memcached_t;
 
-static char *memcached_socket = NULL;
-static char *memcached_host = NULL;
-static char memcached_port[16];
+static _Bool memcached_have_instances = 0;
 
-static int memcached_query_daemon (char *buffer, int buffer_size) /* {{{ */
+static void memcached_free (memcached_t *st)
 {
-       int fd;
-       ssize_t status;
-       int buffer_fill;
-       int i = 0;
-
-       if (memcached_socket != NULL) {
-               struct sockaddr_un serv_addr;
-
-               memset (&serv_addr, 0, sizeof (serv_addr));
-               serv_addr.sun_family = AF_UNIX;
-               sstrncpy (serv_addr.sun_path, memcached_socket,
-                               sizeof (serv_addr.sun_path));
-
-               /* create our socket descriptor */
-               fd = socket (AF_UNIX, SOCK_STREAM, 0);
-               if (fd < 0) {
-                       char errbuf[1024];
-                       ERROR ("memcached: unix socket: %s", sstrerror (errno, errbuf,
-                                               sizeof (errbuf)));
-                       return -1;
-               }
-
-               /* connect to the memcached daemon */
-               status = (ssize_t) connect (fd, (struct sockaddr *) &serv_addr,
-                               sizeof (serv_addr));
-               if (status != 0) {
-                       shutdown (fd, SHUT_RDWR);
-                       close (fd);
-                       fd = -1;
-               }
-       }
-       else { /* if (memcached_socket == NULL) */
-               const char *host;
-               const char *port;
-
-               struct addrinfo  ai_hints;
-               struct addrinfo *ai_list, *ai_ptr;
-               int              ai_return = 0;
-
-               memset (&ai_hints, '\0', sizeof (ai_hints));
-               ai_hints.ai_flags    = 0;
+  if (st == NULL)
+    return;
+
+  sfree (st->name);
+  sfree (st->socket);
+  sfree (st->host);
+  sfree (st->port);
+}
+
+static int memcached_connect_unix (memcached_t *st)
+{
+  struct sockaddr_un serv_addr;
+  int fd;
+
+  memset (&serv_addr, 0, sizeof (serv_addr));
+  serv_addr.sun_family = AF_UNIX;
+  sstrncpy (serv_addr.sun_path, st->socket,
+      sizeof (serv_addr.sun_path));
+
+  /* create our socket descriptor */
+  fd = socket (AF_UNIX, SOCK_STREAM, 0);
+  if (fd < 0)
+  {
+    char errbuf[1024];
+    ERROR ("memcached plugin: memcached_connect_unix: socket(2) failed: %s",
+        sstrerror (errno, errbuf, sizeof (errbuf)));
+    return (-1);
+  }
+
+  return (fd);
+} /* int memcached_connect_unix */
+
+static int memcached_connect_inet (memcached_t *st)
+{
+  char *host;
+  char *port;
+
+  struct addrinfo  ai_hints;
+  struct addrinfo *ai_list, *ai_ptr;
+  int status;
+  int fd = -1;
+
+  memset (&ai_hints, 0, sizeof (ai_hints));
+  ai_hints.ai_flags    = 0;
 #ifdef AI_ADDRCONFIG
-               /*      ai_hints.ai_flags   |= AI_ADDRCONFIG; */
+  ai_hints.ai_flags   |= AI_ADDRCONFIG;
 #endif
-               ai_hints.ai_family   = AF_INET;
-               ai_hints.ai_socktype = SOCK_STREAM;
-               ai_hints.ai_protocol = 0;
-
-               host = memcached_host;
-               if (host == NULL) {
-                       host = MEMCACHED_DEF_HOST;
-               }
-
-               port = memcached_port;
-               if (strlen (port) == 0) {
-                       port = MEMCACHED_DEF_PORT;
-               }
-
-               if ((ai_return = getaddrinfo (host, port, &ai_hints, &ai_list)) != 0) {
-                       char errbuf[1024];
-                       ERROR ("memcached: getaddrinfo (%s, %s): %s",
-                                       host, port,
-                                       (ai_return == EAI_SYSTEM)
-                                       ? sstrerror (errno, errbuf, sizeof (errbuf))
-                                       : gai_strerror (ai_return));
-                       return -1;
-               }
-
-               fd = -1;
-               for (ai_ptr = ai_list; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next) {
-                       /* create our socket descriptor */
-                       fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
-                       if (fd < 0) {
-                               char errbuf[1024];
-                               ERROR ("memcached: socket: %s", sstrerror (errno, errbuf, sizeof (errbuf)));
-                               continue;
-                       }
-
-                       /* connect to the memcached daemon */
-                       status = (ssize_t) connect (fd, (struct sockaddr *) ai_ptr->ai_addr, ai_ptr->ai_addrlen);
-                       if (status != 0) {
-                               shutdown (fd, SHUT_RDWR);
-                               close (fd);
-                               fd = -1;
-                               continue;
-                       }
-
-                       /* A socket could be opened and connecting succeeded. We're
-                        * done. */
-                       break;
-               }
-
-               freeaddrinfo (ai_list);
-       }
-
-       if (fd < 0) {
-               ERROR ("memcached: Could not connect to daemon.");
-               return -1;
-       }
-
-       if (send(fd, "stats\r\n", sizeof("stats\r\n") - 1, MSG_DONTWAIT) != (sizeof("stats\r\n") - 1)) {
-               ERROR ("memcached: Could not send command to the memcached daemon.");
-               return -1;
-       }
-
-       {
-               struct pollfd p;
-               int status;
-
-               memset (&p, 0, sizeof (p));
-               p.fd = fd;
-               p.events = POLLIN | POLLERR | POLLHUP;
-               p.revents = 0;
-
-               status = poll (&p, /* nfds = */ 1,
-                               /* timeout = */ CDTIME_T_TO_MS (interval_g));
-               if (status <= 0)
-               {
-                       if (status == 0)
-                       {
-                               ERROR ("memcached: poll(2) timed out after %.3f seconds.",
-                                               CDTIME_T_TO_DOUBLE (interval_g));
-                       }
-                       else
-                       {
-                               char errbuf[1024];
-                               ERROR ("memcached: poll(2) failed: %s",
-                                               sstrerror (errno, errbuf, sizeof (errbuf)));
-                       }
-                       shutdown (fd, SHUT_RDWR);
-                       close (fd);
-                       return (-1);
-               }
-       }
-
-       /* receive data from the memcached daemon */
-       memset (buffer, '\0', buffer_size);
-
-       buffer_fill = 0;
-       while ((status = recv (fd, buffer + buffer_fill, buffer_size - buffer_fill, MSG_DONTWAIT)) != 0) {
-               if (i > MEMCACHED_RETRY_COUNT) {
-                       ERROR("recv() timed out");
-                       break;
-               }
-               i++;
-
-               if (status == -1) {
-                       char errbuf[1024];
-
-                       if (errno == EAGAIN) {
-                               continue;
-                       }
-
-                       ERROR ("memcached: Error reading from socket: %s",
-                                       sstrerror (errno, errbuf, sizeof (errbuf)));
-                       shutdown(fd, SHUT_RDWR);
-                       close (fd);
-                       return -1;
-               }
-               buffer_fill += status;
-
-               if (buffer_fill > 3 && buffer[buffer_fill-5] == 'E' && buffer[buffer_fill-4] == 'N' && buffer[buffer_fill-3] == 'D') {
-                       /* we got all the data */
-                       break;
-               }
-       }
-
-       if (buffer_fill >= buffer_size) {
-               buffer[buffer_size - 1] = '\0';
-               WARNING ("memcached: Message from memcached has been truncated.");
-       } else if (buffer_fill == 0) {
-               WARNING ("memcached: Peer has unexpectedly shut down the socket. "
-                               "Buffer: `%s'", buffer);
-               shutdown(fd, SHUT_RDWR);
-               close(fd);
-               return -1;
-       }
-
-       shutdown(fd, SHUT_RDWR);
-       close(fd);
-       return 0;
+  ai_hints.ai_family   = AF_UNSPEC;
+  ai_hints.ai_socktype = SOCK_STREAM;
+  ai_hints.ai_protocol = 0;
+
+  host = (st->host != NULL) ? st->host : MEMCACHED_DEF_HOST;
+  port = (st->port != NULL) ? st->port : MEMCACHED_DEF_PORT;
+
+  ai_list = NULL;
+  status = getaddrinfo (host, port, &ai_hints, &ai_list);
+  if (status != 0)
+  {
+    char errbuf[1024];
+    ERROR ("memcached plugin: memcached_connect_inet: "
+        "getaddrinfo(%s,%s) failed: %s",
+        host, port,
+        (status == EAI_SYSTEM)
+        ? sstrerror (errno, errbuf, sizeof (errbuf))
+        : gai_strerror (status));
+    return (-1);
+  }
+
+  for (ai_ptr = ai_list; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
+  {
+    /* create our socket descriptor */
+    fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
+    if (fd < 0)
+    {
+      char errbuf[1024];
+      WARNING ("memcached plugin: memcached_connect_inet: "
+          "socket(2) failed: %s",
+          sstrerror (errno, errbuf, sizeof (errbuf)));
+      continue;
+    }
+
+    /* connect to the memcached daemon */
+    status = (int) connect (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
+    if (status != 0)
+    {
+      shutdown (fd, SHUT_RDWR);
+      close (fd);
+      fd = -1;
+      continue;
+    }
+
+    /* A socket could be opened and connecting succeeded. We're done. */
+    break;
+  }
+
+  freeaddrinfo (ai_list);
+  return (fd);
+} /* int memcached_connect_inet */
+
+static int memcached_connect (memcached_t *st)
+{
+  if (st->socket != NULL)
+    return (memcached_connect_unix (st));
+  else
+    return (memcached_connect_inet (st));
 }
-/* }}} */
 
-static int memcached_config (const char *key, const char *value) /* {{{ */
+static int memcached_query_daemon (char *buffer, size_t buffer_size, memcached_t *st)
 {
-       if (strcasecmp (key, "Socket") == 0) {
-               if (memcached_socket != NULL) {
-                       free (memcached_socket);
-               }
-               memcached_socket = strdup (value);
-       } else if (strcasecmp (key, "Host") == 0) {
-               if (memcached_host != NULL) {
-                       free (memcached_host);
-               }
-               memcached_host = strdup (value);
-       } else if (strcasecmp (key, "Port") == 0) {
-               int port = (int) (atof (value));
-               if ((port > 0) && (port <= 65535)) {
-                       ssnprintf (memcached_port, sizeof (memcached_port), "%i", port);
-               } else {
-                       sstrncpy (memcached_port, value, sizeof (memcached_port));
-               }
-       } else {
-               return -1;
-       }
-
-       return 0;
+  int fd = -1;
+  int status;
+  size_t buffer_fill;
+
+  fd = memcached_connect (st);
+  if (fd < 0) {
+    ERROR ("memcached plugin: Instance \"%s\" could not connect to daemon.",
+        st->name);
+    return -1;
+  }
+
+  status = (int) swrite (fd, "stats\r\n", strlen ("stats\r\n"));
+  if (status != 0)
+  {
+    char errbuf[1024];
+    ERROR ("memcached plugin: write(2) failed: %s",
+        sstrerror (errno, errbuf, sizeof (errbuf)));
+    shutdown(fd, SHUT_RDWR);
+    close (fd);
+    return (-1);
+  }
+
+  /* receive data from the memcached daemon */
+  memset (buffer, 0, buffer_size);
+
+  buffer_fill = 0;
+  while ((status = (int) recv (fd, buffer + buffer_fill,
+          buffer_size - buffer_fill, /* flags = */ 0)) != 0)
+  {
+    char const end_token[5] = {'E', 'N', 'D', '\r', '\n'};
+    if (status < 0)
+    {
+      char errbuf[1024];
+
+      if ((errno == EAGAIN) || (errno == EINTR))
+          continue;
+
+      ERROR ("memcached: Error reading from socket: %s",
+          sstrerror (errno, errbuf, sizeof (errbuf)));
+      shutdown(fd, SHUT_RDWR);
+      close (fd);
+      return (-1);
+    }
+
+    buffer_fill += (size_t) status;
+    if (buffer_fill > buffer_size)
+    {
+      buffer_fill = buffer_size;
+      WARNING ("memcached plugin: Message was truncated.");
+      break;
+    }
+
+    /* If buffer ends in end_token, we have all the data. */
+    if (memcmp (buffer + buffer_fill - sizeof (end_token),
+          end_token, sizeof (end_token)) == 0)
+      break;
+  } /* while (recv) */
+
+  status = 0;
+  if (buffer_fill == 0)
+  {
+    WARNING ("memcached plugin: No data returned by memcached.");
+    status = -1;
+  }
+
+  shutdown(fd, SHUT_RDWR);
+  close(fd);
+  return (status);
+} /* int memcached_query_daemon */
+
+static void memcached_init_vl (value_list_t *vl, memcached_t const *st)
+{
+  sstrncpy (vl->plugin, "memcached", sizeof (vl->plugin));
+  if (strcmp (st->name, "__legacy__") == 0) /* legacy mode */
+  {
+    sstrncpy (vl->host, hostname_g, sizeof (vl->host));
+  }
+  else
+  {
+    if (st->socket != NULL)
+      sstrncpy (vl->host, hostname_g, sizeof (vl->host));
+    else
+      sstrncpy (vl->host,
+          (st->host != NULL) ? st->host : MEMCACHED_DEF_HOST,
+          sizeof (vl->host));
+    sstrncpy (vl->plugin_instance, st->name, sizeof (vl->plugin_instance));
+  }
 }
-/* }}} */
 
 static void submit_derive (const char *type, const char *type_inst,
-               derive_t value) /* {{{ */
+    derive_t value, memcached_t *st)
 {
-       value_t values[1];
-       value_list_t vl = VALUE_LIST_INIT;
+  value_t values[1];
+  value_list_t vl = VALUE_LIST_INIT;
+  memcached_init_vl (&vl, st);
 
-       values[0].derive = value;
+  values[0].derive = value;
 
-       vl.values = values;
-       vl.values_len = 1;
-       sstrncpy (vl.host, hostname_g, sizeof (vl.host));
-       sstrncpy (vl.plugin, "memcached", sizeof (vl.plugin));
-       sstrncpy (vl.type, type, sizeof (vl.type));
-       if (type_inst != NULL)
-               sstrncpy (vl.type_instance, type_inst, sizeof (vl.type_instance));
+  vl.values = values;
+  vl.values_len = 1;
+  sstrncpy (vl.type, type, sizeof (vl.type));
+  if (type_inst != NULL)
+    sstrncpy (vl.type_instance, type_inst, sizeof (vl.type_instance));
 
-       plugin_dispatch_values (&vl);
-} /* void memcached_submit_cmd */
-/* }}} */
+  plugin_dispatch_values (&vl);
+}
 
 static void submit_derive2 (const char *type, const char *type_inst,
-               derive_t value0, derive_t value1) /* {{{ */
+    derive_t value0, derive_t value1, memcached_t *st)
 {
-       value_t values[2];
-       value_list_t vl = VALUE_LIST_INIT;
+  value_t values[2];
+  value_list_t vl = VALUE_LIST_INIT;
+  memcached_init_vl (&vl, st);
 
-       values[0].derive = value0;
-       values[1].derive = value1;
+  values[0].derive = value0;
+  values[1].derive = value1;
 
-       vl.values = values;
-       vl.values_len = 2;
-       sstrncpy (vl.host, hostname_g, sizeof (vl.host));
-       sstrncpy (vl.plugin, "memcached", sizeof (vl.plugin));
-       sstrncpy (vl.type, type, sizeof (vl.type));
-       if (type_inst != NULL)
-               sstrncpy (vl.type_instance, type_inst, sizeof (vl.type_instance));
+  vl.values = values;
+  vl.values_len = 2;
+  sstrncpy (vl.type, type, sizeof (vl.type));
+  if (type_inst != NULL)
+    sstrncpy (vl.type_instance, type_inst, sizeof (vl.type_instance));
 
-       plugin_dispatch_values (&vl);
-} /* void memcached_submit_cmd */
-/* }}} */
+  plugin_dispatch_values (&vl);
+}
 
 static void submit_gauge (const char *type, const char *type_inst,
-               gauge_t value) /* {{{ */
+    gauge_t value, memcached_t *st)
 {
-       value_t values[1];
-       value_list_t vl = VALUE_LIST_INIT;
+  value_t values[1];
+  value_list_t vl = VALUE_LIST_INIT;
+  memcached_init_vl (&vl, st);
 
-       values[0].gauge = value;
+  values[0].gauge = value;
 
-       vl.values = values;
-       vl.values_len = 1;
-       sstrncpy (vl.host, hostname_g, sizeof (vl.host));
-       sstrncpy (vl.plugin, "memcached", sizeof (vl.plugin));
-       sstrncpy (vl.type, type, sizeof (vl.type));
-       if (type_inst != NULL)
-               sstrncpy (vl.type_instance, type_inst, sizeof (vl.type_instance));
+  vl.values = values;
+  vl.values_len = 1;
+  sstrncpy (vl.type, type, sizeof (vl.type));
+  if (type_inst != NULL)
+    sstrncpy (vl.type_instance, type_inst, sizeof (vl.type_instance));
 
-       plugin_dispatch_values (&vl);
+  plugin_dispatch_values (&vl);
 }
-/* }}} */
 
 static void submit_gauge2 (const char *type, const char *type_inst,
-               gauge_t value0, gauge_t value1) /* {{{ */
+    gauge_t value0, gauge_t value1, memcached_t *st)
 {
-       value_t values[2];
-       value_list_t vl = VALUE_LIST_INIT;
+  value_t values[2];
+  value_list_t vl = VALUE_LIST_INIT;
+  memcached_init_vl (&vl, st);
 
-       values[0].gauge = value0;
-       values[1].gauge = value1;
+  values[0].gauge = value0;
+  values[1].gauge = value1;
 
-       vl.values = values;
-       vl.values_len = 2;
-       sstrncpy (vl.host, hostname_g, sizeof (vl.host));
-       sstrncpy (vl.plugin, "memcached", sizeof (vl.plugin));
-       sstrncpy (vl.type, type, sizeof (vl.type));
-       if (type_inst != NULL)
-               sstrncpy (vl.type_instance, type_inst, sizeof (vl.type_instance));
+  vl.values = values;
+  vl.values_len = 2;
+  sstrncpy (vl.type, type, sizeof (vl.type));
+  if (type_inst != NULL)
+    sstrncpy (vl.type_instance, type_inst, sizeof (vl.type_instance));
 
-       plugin_dispatch_values (&vl);
+  plugin_dispatch_values (&vl);
 }
-/* }}} */
 
-static int memcached_read (void) /* {{{ */
+static int memcached_read (user_data_t *user_data)
 {
-       char buf[4096];
-       char *fields[3];
-       char *ptr;
-       char *line;
-       char *saveptr;
-       int fields_num;
-
-       gauge_t bytes_used = NAN;
-       gauge_t bytes_total = NAN;
-       gauge_t hits = NAN;
-       gauge_t gets = NAN;
-       derive_t rusage_user = 0;
-       derive_t rusage_syst = 0;
-       derive_t octets_rx = 0;
-       derive_t octets_tx = 0;
-
-       /* get data from daemon */
-       if (memcached_query_daemon (buf, sizeof (buf)) < 0) {
-               return -1;
-       }
+  char buf[4096];
+  char *fields[3];
+  char *ptr;
+  char *line;
+  char *saveptr;
+  int fields_num;
+
+  gauge_t bytes_used = NAN;
+  gauge_t bytes_total = NAN;
+  gauge_t hits = NAN;
+  gauge_t gets = NAN;
+  derive_t rusage_user = 0;
+  derive_t rusage_syst = 0;
+  derive_t octets_rx = 0;
+  derive_t octets_tx = 0;
+
+  memcached_t *st;
+  st = user_data->data;
+
+  /* get data from daemon */
+  if (memcached_query_daemon (buf, sizeof (buf), st) < 0) {
+    return -1;
+  }
 
 #define FIELD_IS(cnst) \
-       (((sizeof(cnst) - 1) == name_len) && (strcmp (cnst, fields[1]) == 0))
-
-       ptr = buf;
-       saveptr = NULL;
-       while ((line = strtok_r (ptr, "\n\r", &saveptr)) != NULL)
-       {
-               int name_len;
-
-               ptr = NULL;
-
-               fields_num = strsplit(line, fields, 3);
-               if (fields_num != 3)
-                       continue;
-
-               name_len = strlen(fields[1]);
-               if (name_len == 0)
-                       continue;
-
-               /*
-                * For an explanation on these fields please refer to
-                * <http://code.sixapart.com/svn/memcached/trunk/server/doc/protocol.txt>
-                */
-
-               /*
-                * CPU time consumed by the memcached process
-                */
-               if (FIELD_IS ("rusage_user"))
-               {
-                       rusage_user = atoll (fields[2]);
-               }
-               else if (FIELD_IS ("rusage_system"))
-               {
-                       rusage_syst = atoll(fields[2]);
-               }
-
-               /*
-                * Number of threads of this instance
-                */
-               else if (FIELD_IS ("threads"))
-               {
-                       submit_gauge2 ("ps_count", NULL, NAN, atof (fields[2]));
-               }
-
-               /*
-                * Number of items stored
-                */
-               else if (FIELD_IS ("curr_items"))
-               {
-                       submit_gauge ("memcached_items", "current", atof (fields[2]));
-               }
-
-               /*
-                * Number of bytes used and available (total - used)
-                */
-               else if (FIELD_IS ("bytes"))
-               {
-                       bytes_used = atof (fields[2]);
-               }
-               else if (FIELD_IS ("limit_maxbytes"))
-               {
-                       bytes_total = atof(fields[2]);
-               }
-
-               /*
-                * Connections
-                */
-               else if (FIELD_IS ("curr_connections"))
-               {
-                       submit_gauge ("memcached_connections", "current", atof (fields[2]));
-               }
-
-               /*
-                * Commands
-                */
-               else if ((name_len > 4) && (strncmp (fields[1], "cmd_", 4) == 0))
-               {
-                       const char *name = fields[1] + 4;
-                       submit_derive ("memcached_command", name, atoll (fields[2]));
-                       if (strcmp (name, "get") == 0)
-                               gets = atof (fields[2]);
-               }
-
-               /*
-                * Operations on the cache, i. e. cache hits, cache misses and evictions of items
-                */
-               else if (FIELD_IS ("get_hits"))
-               {
-                       submit_derive ("memcached_ops", "hits", atoll (fields[2]));
-                       hits = atof (fields[2]);
-               }
-               else if (FIELD_IS ("get_misses"))
-               {
-                       submit_derive ("memcached_ops", "misses", atoll (fields[2]));
-               }
-               else if (FIELD_IS ("evictions"))
-               {
-                       submit_derive ("memcached_ops", "evictions", atoll (fields[2]));
-               }
-
-               /*
-                * Network traffic
-                */
-               else if (FIELD_IS ("bytes_read"))
-               {
-                       octets_rx = atoll (fields[2]);
-               }
-               else if (FIELD_IS ("bytes_written"))
-               {
-                       octets_tx = atoll (fields[2]);
-               }
-       } /* while ((line = strtok_r (ptr, "\n\r", &saveptr)) != NULL) */
-
-       if (!isnan (bytes_used) && !isnan (bytes_total) && (bytes_used <= bytes_total))
-               submit_gauge2 ("df", "cache", bytes_used, bytes_total - bytes_used);
-
-       if ((rusage_user != 0) || (rusage_syst != 0))
-               submit_derive2 ("ps_cputime", NULL, rusage_user, rusage_syst);
-
-       if ((octets_rx != 0) || (octets_tx != 0))
-               submit_derive2 ("memcached_octets", NULL, octets_rx, octets_tx);
-
-       if (!isnan (gets) && !isnan (hits))
-       {
-               gauge_t rate = NAN;
-
-               if (gets != 0.0)
-                       rate = 100.0 * hits / gets;
-
-               submit_gauge ("percent", "hitratio", rate);
-       }
-
-       return 0;
+  (((sizeof(cnst) - 1) == name_len) && (strcmp (cnst, fields[1]) == 0))
+
+  ptr = buf;
+  saveptr = NULL;
+  while ((line = strtok_r (ptr, "\n\r", &saveptr)) != NULL)
+  {
+    int name_len;
+
+    ptr = NULL;
+
+    fields_num = strsplit(line, fields, 3);
+    if (fields_num != 3)
+      continue;
+
+    name_len = strlen(fields[1]);
+    if (name_len == 0)
+      continue;
+
+    /*
+     * For an explanation on these fields please refer to
+     * <http://code.sixapart.com/svn/memcached/trunk/server/doc/protocol.txt>
+     */
+
+    /*
+     * CPU time consumed by the memcached process
+     */
+    if (FIELD_IS ("rusage_user"))
+    {
+      rusage_user = atoll (fields[2]);
+    }
+    else if (FIELD_IS ("rusage_system"))
+    {
+      rusage_syst = atoll(fields[2]);
+    }
+
+    /*
+     * Number of threads of this instance
+     */
+    else if (FIELD_IS ("threads"))
+    {
+      submit_gauge2 ("ps_count", NULL, NAN, atof (fields[2]), st);
+    }
+
+    /*
+     * Number of items stored
+     */
+    else if (FIELD_IS ("curr_items"))
+    {
+      submit_gauge ("memcached_items", "current", atof (fields[2]), st);
+    }
+
+    /*
+     * Number of bytes used and available (total - used)
+     */
+    else if (FIELD_IS ("bytes"))
+    {
+      bytes_used = atof (fields[2]);
+    }
+    else if (FIELD_IS ("limit_maxbytes"))
+    {
+      bytes_total = atof(fields[2]);
+    }
+
+    /*
+     * Connections
+     */
+    else if (FIELD_IS ("curr_connections"))
+    {
+      submit_gauge ("memcached_connections", "current", atof (fields[2]), st);
+    }
+
+    /*
+     * Commands
+     */
+    else if ((name_len > 4) && (strncmp (fields[1], "cmd_", 4) == 0))
+    {
+      const char *name = fields[1] + 4;
+      submit_derive ("memcached_command", name, atoll (fields[2]), st);
+      if (strcmp (name, "get") == 0)
+        gets = atof (fields[2]);
+    }
+
+    /*
+     * Operations on the cache, i. e. cache hits, cache misses and evictions of items
+     */
+    else if (FIELD_IS ("get_hits"))
+    {
+      submit_derive ("memcached_ops", "hits", atoll (fields[2]), st);
+      hits = atof (fields[2]);
+    }
+    else if (FIELD_IS ("get_misses"))
+    {
+      submit_derive ("memcached_ops", "misses", atoll (fields[2]), st);
+    }
+    else if (FIELD_IS ("evictions"))
+    {
+      submit_derive ("memcached_ops", "evictions", atoll (fields[2]), st);
+    }
+
+    /*
+     * Network traffic
+     */
+    else if (FIELD_IS ("bytes_read"))
+    {
+      octets_rx = atoll (fields[2]);
+    }
+    else if (FIELD_IS ("bytes_written"))
+    {
+      octets_tx = atoll (fields[2]);
+    }
+  } /* while ((line = strtok_r (ptr, "\n\r", &saveptr)) != NULL) */
+
+  if (!isnan (bytes_used) && !isnan (bytes_total) && (bytes_used <= bytes_total))
+    submit_gauge2 ("df", "cache", bytes_used, bytes_total - bytes_used, st);
+
+  if ((rusage_user != 0) || (rusage_syst != 0))
+    submit_derive2 ("ps_cputime", NULL, rusage_user, rusage_syst, st);
+
+  if ((octets_rx != 0) || (octets_tx != 0))
+    submit_derive2 ("memcached_octets", NULL, octets_rx, octets_tx, st);
+
+  if (!isnan (gets) && !isnan (hits))
+  {
+    gauge_t rate = NAN;
+
+    if (gets != 0.0)
+      rate = 100.0 * hits / gets;
+
+    submit_gauge ("percent", "hitratio", rate, st);
+  }
+
+  return 0;
+} /* int memcached_read */
+
+static int memcached_add_read_callback (memcached_t *st)
+{
+  user_data_t ud;
+  char callback_name[3*DATA_MAX_NAME_LEN];
+  int status;
+
+  memset (&ud, 0, sizeof (ud));
+  ud.data = st;
+  ud.free_func = (void *) memcached_free;
+
+  assert (st->name != NULL);
+  ssnprintf (callback_name, sizeof (callback_name), "memcached/%s", st->name);
+
+  status = plugin_register_complex_read (/* group = */ "memcached",
+      /* name      = */ callback_name,
+      /* callback  = */ memcached_read,
+      /* interval  = */ NULL,
+      /* user_data = */ &ud);
+  return (status);
+} /* int memcached_add_read_callback */
+
+/* Configuration handling functiions
+ * <Plugin memcached>
+ *   <Instance "instance_name">
+ *     Host foo.zomg.com
+ *     Port "1234"
+ *   </Instance>
+ * </Plugin>
+ */
+static int config_add_instance(oconfig_item_t *ci)
+{
+  memcached_t *st;
+  int i;
+  int status = 0;
+
+  /* Disable automatic generation of default instance in the init callback. */
+  memcached_have_instances = 1;
+
+  st = malloc (sizeof (*st));
+  if (st == NULL)
+  {
+    ERROR ("memcached plugin: malloc failed.");
+    return (-1);
+  }
+
+  memset (st, 0, sizeof (*st));
+  st->name = NULL;
+  st->socket = NULL;
+  st->host = NULL;
+  st->port = NULL;
+
+  if (strcasecmp (ci->key, "Plugin") == 0) /* default instance */
+    st->name = sstrdup ("__legacy__");
+  else /* <Instance /> block */
+    status = cf_util_get_string (ci, &st->name);
+  if (status != 0)
+  {
+    sfree (st);
+    return (status);
+  }
+  assert (st->name != NULL);
+
+  for (i = 0; i < ci->children_num; i++)
+  {
+    oconfig_item_t *child = ci->children + i;
+
+    if (strcasecmp ("Socket", child->key) == 0)
+      status = cf_util_get_string (child, &st->socket);
+    else if (strcasecmp ("Host", child->key) == 0)
+      status = cf_util_get_string (child, &st->host);
+    else if (strcasecmp ("Port", child->key) == 0)
+      status = cf_util_get_service (child, &st->port);
+    else
+    {
+      WARNING ("memcached plugin: Option `%s' not allowed here.",
+          child->key);
+      status = -1;
+    }
+
+    if (status != 0)
+      break;
+  }
+
+  if (status == 0)
+    status = memcached_add_read_callback (st);
+
+  if (status != 0)
+  {
+    memcached_free(st);
+    return (-1);
+  }
+
+  return (0);
 }
-/* }}} */
 
-void module_register (void) /* {{{ */
+static int memcached_config (oconfig_item_t *ci)
 {
-       plugin_register_config ("memcached", memcached_config, config_keys, config_keys_num);
-       plugin_register_read ("memcached", memcached_read);
+  int status = 0;
+  _Bool have_instance_block = 0;
+  int i;
+
+  for (i = 0; i < ci->children_num; i++)
+  {
+    oconfig_item_t *child = ci->children + i;
+
+    if (strcasecmp ("Instance", child->key) == 0)
+    {
+      config_add_instance (child);
+      have_instance_block = 1;
+    }
+    else if (!have_instance_block)
+    {
+      /* Non-instance option: Assume legacy configuration (without <Instance />
+       * blocks) and call config_add_instance() with the <Plugin /> block. */
+      return (config_add_instance (ci));
+    }
+    else
+      WARNING ("memcached plugin: The configuration option "
+          "\"%s\" is not allowed here. Did you "
+          "forget to add an <Instance /> block "
+          "around the configuration?",
+          child->key);
+  } /* for (ci->children) */
+
+  return (status);
 }
-/* }}} */
-
-/*
- * Local variables:
- * tab-width: 4
- * c-basic-offset: 4
- * End:
- * vim600: sw=4 ts=4 fdm=marker noexpandtab
- * vim<600: sw=4 ts=4 noexpandtab
- */
 
+static int memcached_init (void)
+{
+  memcached_t *st;
+  int status;
+
+  if (memcached_have_instances)
+    return (0);
+
+  /* No instances were configured, lets start a default instance. */
+  st = malloc (sizeof (*st));
+  if (st == NULL)
+    return (ENOMEM);
+  memset (st, 0, sizeof (*st));
+  st->name = sstrdup ("__legacy__");
+  st->socket = NULL;
+  st->host = NULL;
+  st->port = NULL;
+
+  status = memcached_add_read_callback (st);
+  if (status == 0)
+    memcached_have_instances = 1;
+  else
+    memcached_free (st);
+
+  return (status);
+} /* int memcached_init */
+
+void module_register (void)
+{
+  plugin_register_complex_config ("memcached", memcached_config);
+  plugin_register_init ("memcached", memcached_init);
+}
index 8bbf74d..bbc455f 100644 (file)
@@ -1,6 +1,6 @@
 /**
  * collectd - src/ntpd.c
- * Copyright (C) 2006-2007  Florian octo Forster
+ * Copyright (C) 2006-2012  Florian octo Forster
  *
  * This program is free software; you can redistribute it and/or modify it
  * under the terms of the GNU General Public License as published by the
@@ -16,7 +16,7 @@
  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
  *
  * Authors:
- *   Florian octo Forster <octo at verplant.org>
+ *   Florian octo Forster <octo at collectd.org>
  **/
 
 #define _BSD_SOURCE /* For NI_MAXHOST */
@@ -52,11 +52,17 @@ static const char *config_keys[] =
 {
        "Host",
        "Port",
-       "ReverseLookups"
+       "ReverseLookups",
+       "IncludeUnitID"
 };
 static int config_keys_num = STATIC_ARRAY_SIZE (config_keys);
 
-static int do_reverse_lookups = 1;
+static _Bool do_reverse_lookups = 1;
+
+/* This option only exists for backward compatibility. If it is false and two
+ * ntpd peers use the same refclock driver, the plugin will try to write
+ * simultaneous measurements from both to the same type instance. */
+static _Bool include_unit_id = 0;
 
 # define NTPD_DEFAULT_HOST "localhost"
 # define NTPD_DEFAULT_PORT "123"
@@ -283,6 +289,13 @@ static int ntpd_config (const char *key, const char *value)
                else
                        do_reverse_lookups = 0;
        }
+       else if (strcasecmp (key, "IncludeUnitID") == 0)
+       {
+               if (IS_TRUE (value))
+                       include_unit_id = 1;
+               else
+                       include_unit_id = 0;
+       }
        else
        {
                return (-1);
@@ -309,6 +322,18 @@ static void ntpd_submit (char *type, char *type_inst, double value)
        plugin_dispatch_values (&vl);
 }
 
+/* Each time a peer is polled, ntpd shifts the reach register to the left and
+ * sets the LSB based on whether the peer was reachable. If the LSB is zero,
+ * the values are out of date. */
+static void ntpd_submit_reach (char *type, char *type_inst, uint8_t reach,
+               double value)
+{
+       if (!(reach & 1))
+               value = NAN;
+
+       ntpd_submit (type, type_inst, value);
+}
+
 static int ntpd_connect (void)
 {
        char *host;
@@ -763,6 +788,108 @@ static double ntpd_read_fp (int32_t val_int)
        return (val_double);
 }
 
+static uint32_t ntpd_get_refclock_id (struct info_peer_summary const *peer_info)
+{
+       uint32_t addr = ntohl (peer_info->srcadr);
+       uint32_t refclock_id = (addr >> 8) & 0x00FF;
+
+       return (refclock_id);
+}
+
+static int ntpd_get_name_from_address (char *buffer, size_t buffer_size,
+               struct info_peer_summary const *peer_info, _Bool do_reverse_lookup)
+{
+       struct sockaddr_storage sa;
+       socklen_t sa_len;
+       int flags = 0;
+       int status;
+
+       memset (&sa, 0, sizeof (sa));
+
+       if (peer_info->v6_flag)
+       {
+               struct sockaddr_in6 sa6;
+
+               assert (sizeof (sa) >= sizeof (sa6));
+
+               memset (&sa6, 0, sizeof (sa6));
+               sa6.sin6_family = AF_INET6;
+               sa6.sin6_port = htons (123);
+               memcpy (&sa6.sin6_addr, &peer_info->srcadr6,
+                               sizeof (struct in6_addr));
+               sa_len = sizeof (sa6);
+
+               memcpy (&sa, &sa6, sizeof (sa6));
+       }
+       else
+       {
+               struct sockaddr_in sa4;
+
+               assert (sizeof (sa) >= sizeof (sa4));
+
+               memset (&sa4, 0, sizeof (sa4));
+               sa4.sin_family = AF_INET;
+               sa4.sin_port = htons (123);
+               memcpy (&sa4.sin_addr, &peer_info->srcadr,
+                               sizeof (struct in_addr));
+               sa_len = sizeof (sa4);
+
+               memcpy (&sa, &sa4, sizeof (sa4));
+       }
+
+       if (!do_reverse_lookup)
+               flags |= NI_NUMERICHOST;
+
+       status = getnameinfo ((struct sockaddr const *) &sa, sa_len,
+                       buffer, buffer_size,
+                       NULL, 0, /* No port name */
+                       flags);
+       if (status != 0)
+       {
+               char errbuf[1024];
+               ERROR ("ntpd plugin: getnameinfo failed: %s",
+                               (status == EAI_SYSTEM)
+                               ? sstrerror (errno, errbuf, sizeof (errbuf))
+                               : gai_strerror (status));
+               return (-1);
+       }
+
+       return (0);
+} /* ntpd_get_name_from_address */
+
+static int ntpd_get_name_refclock (char *buffer, size_t buffer_size,
+               struct info_peer_summary const *peer_info)
+{
+       uint32_t refclock_id = ntpd_get_refclock_id (peer_info);
+       uint32_t unit_id = ntohl (peer_info->srcadr) & 0x00FF;
+
+       if (refclock_id >= refclock_names_num)
+               return (ntpd_get_name_from_address (buffer, buffer_size,
+                                       peer_info,
+                                       /* do_reverse_lookup = */ 0));
+
+       if (include_unit_id)
+               ssnprintf (buffer, buffer_size, "%s-%"PRIu32,
+                               refclock_names[refclock_id], unit_id);
+       else
+               sstrncpy (buffer, refclock_names[refclock_id], buffer_size);
+
+       return (0);
+} /* int ntpd_get_name_refclock */
+
+static int ntpd_get_name (char *buffer, size_t buffer_size,
+               struct info_peer_summary const *peer_info)
+{
+       uint32_t addr = ntohl (peer_info->srcadr);
+
+       if (!peer_info->v6_flag && ((addr & REFCLOCK_MASK) == REFCLOCK_ADDR))
+               return (ntpd_get_name_refclock (buffer, buffer_size,
+                                       peer_info));
+       else
+               return (ntpd_get_name_from_address (buffer, buffer_size,
+                                       peer_info, do_reverse_lookups));
+} /* int ntpd_addr_to_name */
+
 static int ntpd_read (void)
 {
        struct info_kernel *ik;
@@ -837,99 +964,26 @@ static int ntpd_read (void)
                double offset;
 
                char peername[NI_MAXHOST];
-               int refclock_id;
-               
-               ptr = ps + i;
-               refclock_id = 0;
+               uint32_t refclock_id;
 
-               /* Convert the `long floating point' offset value to double */
-               M_LFPTOD (ntohl (ptr->offset_int), ntohl (ptr->offset_frc), offset);
+               ptr = ps + i;
 
-               /* Special IP addresses for hardware clocks and stuff.. */
-               if (!ptr->v6_flag
-                               && ((ntohl (ptr->srcadr) & REFCLOCK_MASK)
-                                       == REFCLOCK_ADDR))
+               status = ntpd_get_name (peername, sizeof (peername), ptr);
+               if (status != 0)
                {
-                       struct in_addr  addr_obj;
-                       char *addr_str;
-
-                       refclock_id = (ntohl (ptr->srcadr) >> 8) & 0x000000FF;
-
-                       if (refclock_id < refclock_names_num)
-                       {
-                               sstrncpy (peername, refclock_names[refclock_id],
-                                               sizeof (peername));
-                       }
-                       else
-                       {
-                               memset ((void *) &addr_obj, '\0', sizeof (addr_obj));
-                               addr_obj.s_addr = ptr->srcadr;
-                               addr_str = inet_ntoa (addr_obj);
-
-                               sstrncpy (peername, addr_str, sizeof (peername));
-                       }
+                       ERROR ("ntpd plugin: Determining name of peer failed.");
+                       continue;
                }
-               else /* Normal network host. */
-               {
-                       struct sockaddr_storage sa;
-                       socklen_t sa_len;
-                       int flags = 0;
-
-                       memset (&sa, '\0', sizeof (sa));
-
-                       if (ptr->v6_flag)
-                       {
-                               struct sockaddr_in6 sa6;
-
-                               assert (sizeof (sa) >= sizeof (sa6));
-
-                               memset (&sa6, 0, sizeof (sa6));
-                               sa6.sin6_family = AF_INET6;
-                               sa6.sin6_port = htons (123);
-                               memcpy (&sa6.sin6_addr, &ptr->srcadr6,
-                                               sizeof (struct in6_addr));
-                               sa_len = sizeof (sa6);
-
-                               memcpy (&sa, &sa6, sizeof (sa6));
-                       }
-                       else
-                       {
-                               struct sockaddr_in sa4;
-
-                               assert (sizeof (sa) >= sizeof (sa4));
 
-                               memset (&sa4, 0, sizeof (sa4));
-                               sa4.sin_family = AF_INET;
-                               sa4.sin_port = htons (123);
-                               memcpy (&sa4.sin_addr, &ptr->srcadr,
-                                               sizeof (struct in_addr));
-                               sa_len = sizeof (sa4);
+               refclock_id = ntpd_get_refclock_id (ptr);
 
-                               memcpy (&sa, &sa4, sizeof (sa4));
-                       }
-
-                       if (do_reverse_lookups == 0)
-                               flags |= NI_NUMERICHOST;
-
-                       status = getnameinfo ((const struct sockaddr *) &sa,
-                                       sa_len,
-                                       peername, sizeof (peername),
-                                       NULL, 0, /* No port name */
-                                       flags);
-                       if (status != 0)
-                       {
-                               char errbuf[1024];
-                               ERROR ("ntpd plugin: getnameinfo failed: %s",
-                                               (status == EAI_SYSTEM)
-                                               ? sstrerror (errno, errbuf, sizeof (errbuf))
-                                               : gai_strerror (status));
-                               continue;
-                       }
-               }
+               /* Convert the `long floating point' offset value to double */
+               M_LFPTOD (ntohl (ptr->offset_int), ntohl (ptr->offset_frc), offset);
 
                DEBUG ("peer %i:\n"
                                "  peername   = %s\n"
                                "  srcadr     = 0x%08x\n"
+                               "  reach      = 0%03o\n"
                                "  delay      = %f\n"
                                "  offset_int = %i\n"
                                "  offset_frc = %i\n"
@@ -938,6 +992,7 @@ static int ntpd_read (void)
                                i,
                                peername,
                                ntohl (ptr->srcadr),
+                               ptr->reach,
                                ntpd_read_fp (ptr->delay),
                                ntohl (ptr->offset_int),
                                ntohl (ptr->offset_frc),
@@ -945,10 +1000,13 @@ static int ntpd_read (void)
                                ntpd_read_fp (ptr->dispersion));
 
                if (refclock_id != 1) /* not the system clock (offset will always be zero.. */
-                       ntpd_submit ("time_offset", peername, offset);
-               ntpd_submit ("time_dispersion", peername, ntpd_read_fp (ptr->dispersion));
+                       ntpd_submit_reach ("time_offset", peername, ptr->reach,
+                                       offset);
+               ntpd_submit_reach ("time_dispersion", peername, ptr->reach,
+                               ntpd_read_fp (ptr->dispersion));
                if (refclock_id == 0) /* not a reference clock */
-                       ntpd_submit ("delay", peername, ntpd_read_fp (ptr->delay));
+                       ntpd_submit_reach ("delay", peername, ptr->reach,
+                                       ntpd_read_fp (ptr->delay));
        }
 
        free (ps);
index 80ae699..ab0812b 100644 (file)
@@ -59,6 +59,7 @@
 struct o_database_s
 {
   char *name;
+  char *host;
   char *connect_id;
   char *username;
   char *password;
@@ -183,33 +184,6 @@ static void o_database_free (o_database_t *db) /* {{{ */
  * </Plugin>
  */
 
-static int o_config_set_string (char **ret_string, /* {{{ */
-    oconfig_item_t *ci)
-{
-  char *string;
-
-  if ((ci->values_num != 1)
-      || (ci->values[0].type != OCONFIG_TYPE_STRING))
-  {
-    WARNING ("oracle plugin: The `%s' config option "
-        "needs exactly one string argument.", ci->key);
-    return (-1);
-  }
-
-  string = strdup (ci->values[0].value.string);
-  if (string == NULL)
-  {
-    ERROR ("oracle plugin: strdup failed.");
-    return (-1);
-  }
-
-  if (*ret_string != NULL)
-    free (*ret_string);
-  *ret_string = string;
-
-  return (0);
-} /* }}} int o_config_set_string */
-
 static int o_config_add_database (oconfig_item_t *ci) /* {{{ */
 {
   o_database_t *db;
@@ -231,8 +205,13 @@ static int o_config_add_database (oconfig_item_t *ci) /* {{{ */
     return (-1);
   }
   memset (db, 0, sizeof (*db));
+  db->name = NULL;
+  db->host = NULL;
+  db->connect_id = NULL;
+  db->username = NULL;
+  db->password = NULL;
 
-  status = o_config_set_string (&db->name, ci);
+  status = cf_util_get_string (ci, &db->name);
   if (status != 0)
   {
     sfree (db);
@@ -245,11 +224,13 @@ static int o_config_add_database (oconfig_item_t *ci) /* {{{ */
     oconfig_item_t *child = ci->children + i;
 
     if (strcasecmp ("ConnectID", child->key) == 0)
-      status = o_config_set_string (&db->connect_id, child);
+      status = cf_util_get_string (child, &db->connect_id);
+    else if (strcasecmp ("Host", child->key) == 0)
+      status = cf_util_get_string (child, &db->host);
     else if (strcasecmp ("Username", child->key) == 0)
-      status = o_config_set_string (&db->username, child);
+      status = cf_util_get_string (child, &db->username);
     else if (strcasecmp ("Password", child->key) == 0)
-      status = o_config_set_string (&db->password, child);
+      status = cf_util_get_string (child, &db->password);
     else if (strcasecmp ("Query", child->key) == 0)
       status = udb_query_pick_from_list (child, queries, queries_num,
           &db->queries, &db->queries_num);
@@ -613,7 +594,8 @@ static int o_read_database_query (o_database_t *db, /* {{{ */
   } /* for (j = 1; j <= param_counter; j++) */
   /* }}} End of the ``define'' stuff. */
 
-  status = udb_query_prepare_result (q, prep_area, hostname_g,
+  status = udb_query_prepare_result (q, prep_area,
+      (db->host != NULL) ? db->host : hostname_g,
       /* plugin = */ "oracle", db->name, column_names, column_num,
       /* interval = */ 0);
   if (status != 0)
index 0a5e66c..675505b 100644 (file)
@@ -95,6 +95,7 @@ typedef enum {
        C_PSQL_PARAM_DB,
        C_PSQL_PARAM_USER,
        C_PSQL_PARAM_INTERVAL,
+       C_PSQL_PARAM_INSTANCE,
 } c_psql_param_t;
 
 /* Parameter configuration. Stored as `user data' in the query objects. */
@@ -125,6 +126,8 @@ typedef struct {
        char *user;
        char *password;
 
+       char *instance;
+
        char *sslmode;
 
        char *krbsrvname;
@@ -177,6 +180,8 @@ static c_psql_database_t *c_psql_database_new (const char *name)
        db->user       = NULL;
        db->password   = NULL;
 
+       db->instance   = sstrdup (name);
+
        db->sslmode    = NULL;
 
        db->krbsrvname = NULL;
@@ -208,6 +213,8 @@ static void c_psql_database_delete (void *data)
        sfree (db->user);
        sfree (db->password);
 
+       sfree (db->instance);
+
        sfree (db->sslmode);
 
        sfree (db->krbsrvname);
@@ -271,8 +278,9 @@ static int c_psql_check_connection (c_psql_database_t *db)
 
                if (CONNECTION_OK != PQstatus (db->conn)) {
                        c_complain (LOG_ERR, &db->conn_complaint,
-                                       "Failed to connect to database %s: %s",
-                                       db->database, PQerrorMessage (db->conn));
+                                       "Failed to connect to database %s (%s): %s",
+                                       db->database, db->instance,
+                                       PQerrorMessage (db->conn));
                        return -1;
                }
 
@@ -340,6 +348,9 @@ static PGresult *c_psql_exec_query_params (c_psql_database_t *db,
                                                ? CDTIME_T_TO_DOUBLE (db->interval) : interval_g);
                                params[i] = interval;
                                break;
+                       case C_PSQL_PARAM_INSTANCE:
+                               params[i] = db->instance;
+                               break;
                        default:
                                assert (0);
                }
@@ -377,9 +388,10 @@ static int c_psql_exec_query (c_psql_database_t *db, udb_query_t *q,
        else if ((NULL == data) || (0 == data->params_num))
                res = c_psql_exec_query_noparams (db, q);
        else {
-               log_err ("Connection to database \"%s\" does not support parameters "
-                               "(protocol version %d) - cannot execute query \"%s\".",
-                               db->database, db->proto_version,
+               log_err ("Connection to database \"%s\" (%s) does not support "
+                               "parameters (protocol version %d) - "
+                               "cannot execute query \"%s\".",
+                               db->database, db->instance, db->proto_version,
                                udb_query_get_name (q));
                return -1;
        }
@@ -436,7 +448,7 @@ static int c_psql_exec_query (c_psql_database_t *db, udb_query_t *q,
                host = db->host;
 
        status = udb_query_prepare_result (q, prep_area, host, "postgresql",
-                       db->database, column_names, (size_t) column_num, db->interval);
+                       db->instance, column_names, (size_t) column_num, db->interval);
        if (0 != status) {
                log_err ("udb_query_prepare_result failed with status %i.",
                                status);
@@ -487,6 +499,7 @@ static int c_psql_read (user_data_t *ud)
        db = ud->data;
 
        assert (NULL != db->database);
+       assert (NULL != db->instance);
 
        if (0 != c_psql_check_connection (db))
                return -1;
@@ -558,6 +571,8 @@ static int config_query_param_add (udb_query_t *q, oconfig_item_t *ci)
                data->params[data->params_num] = C_PSQL_PARAM_USER;
        else if (0 == strcasecmp (param_str, "interval"))
                data->params[data->params_num] = C_PSQL_PARAM_INTERVAL;
+       else if (0 == strcasecmp (param_str, "instance"))
+               data->params[data->params_num] = C_PSQL_PARAM_INSTANCE;
        else {
                log_err ("Invalid parameter \"%s\".", param_str);
                return 1;
@@ -612,6 +627,8 @@ static int c_psql_config_database (oconfig_item_t *ci)
                        cf_util_get_string (c, &db->user);
                else if (0 == strcasecmp (c->key, "Password"))
                        cf_util_get_string (c, &db->password);
+               else if (0 == strcasecmp (c->key, "Instance"))
+                       cf_util_get_string (c, &db->instance);
                else if (0 == strcasecmp (c->key, "SSLMode"))
                        cf_util_get_string (c, &db->sslmode);
                else if (0 == strcasecmp (c->key, "KRBSrvName"))
@@ -665,7 +682,7 @@ static int c_psql_config_database (oconfig_item_t *ci)
        ud.data = db;
        ud.free_func = c_psql_database_delete;
 
-       ssnprintf (cb_name, sizeof (cb_name), "postgresql-%s", db->database);
+       ssnprintf (cb_name, sizeof (cb_name), "postgresql-%s", db->instance);
 
        CDTIME_T_TO_TIMESPEC (db->interval, &cb_interval);
 
index 66237dc..c77859d 100644 (file)
@@ -7,6 +7,7 @@
  * Copyright (C) 2009       Andrés J. Díaz
  * Copyright (C) 2009       Manuel Sanmartin
  * Copyright (C) 2010       Clément Stenac
+ * Copyright (C) 2012       Cosmin Ioiart
  *
  * This program is free software; you can redistribute it and/or modify it
  * under the terms of the GNU General Public License as published by the
@@ -30,6 +31,7 @@
  *   Andrés J. Díaz <ajdiaz at connectical.com>
  *   Manuel Sanmartin
  *   Clément Stenac <clement.stenac at diwi.org>
+ *   Cosmin Ioiart <cioiart at gmail.com>
  **/
 
 #include "collectd.h"
 #define MAXARGLN 1024
 /* #endif HAVE_PROCINFO_H */
 
+#elif KERNEL_SOLARIS
+# include <procfs.h>
+# include <dirent.h>
+/* #endif KERNEL_SOLARIS */
+
 #else
 # error "No applicable input method."
 #endif
 # include <regex.h>
 #endif
 
+#if HAVE_KSTAT_H
+# include <kstat.h>
+#endif
+
 #ifndef ARG_MAX
 #  define ARG_MAX 4096
 #endif
@@ -216,8 +227,8 @@ int getargs (struct procentry64 *processBuffer, int bufferLen, char *argsBuffer,
 #endif /* HAVE_PROCINFO_H */
 
 /* put name of process from config to list_head_g tree
  list_head_g is a list of 'procstat_t' structs with
  processes names we want to watch */
* list_head_g is a list of 'procstat_t' structs with
* processes names we want to watch */
 static void ps_list_register (const char *name, const char *regexp)
 {
        procstat_t *new;
@@ -712,7 +723,7 @@ static void ps_submit_proc_list (procstat_t *ps)
        }
 
        DEBUG ("name = %s; num_proc = %lu; num_lwp = %lu; "
-                        "vmem_size = %lu; vmem_rss = %lu; vmem_data = %lu; "
+                       "vmem_size = %lu; vmem_rss = %lu; vmem_data = %lu; "
                        "vmem_code = %lu; "
                        "vmem_minflt_counter = %"PRIi64"; vmem_majflt_counter = %"PRIi64"; "
                        "cpu_user_counter = %"PRIi64"; cpu_system_counter = %"PRIi64"; "
@@ -726,6 +737,26 @@ static void ps_submit_proc_list (procstat_t *ps)
                        ps->io_rchar, ps->io_wchar, ps->io_syscr, ps->io_syscw);
 } /* void ps_submit_proc_list */
 
+#if KERNEL_LINUX || KERNEL_SOLARIS
+static void ps_submit_fork_rate (derive_t value)
+{
+       value_t values[1];
+       value_list_t vl = VALUE_LIST_INIT;
+
+       values[0].derive = value;
+
+       vl.values = values;
+       vl.values_len = 1;
+       sstrncpy(vl.host, hostname_g, sizeof (vl.host));
+       sstrncpy(vl.plugin, "processes", sizeof (vl.plugin));
+       sstrncpy(vl.plugin_instance, "", sizeof (vl.plugin_instance));
+       sstrncpy(vl.type, "fork_rate", sizeof (vl.type));
+       sstrncpy(vl.type_instance, "", sizeof (vl.type_instance));
+
+       plugin_dispatch_values(&vl);
+}
+#endif /* KERNEL_LINUX || KERNEL_SOLARIS*/
+
 /* ------- additional functions for KERNEL_LINUX/HAVE_THREAD_INFO ------- */
 #if KERNEL_LINUX
 static int ps_read_tasks (int pid)
@@ -780,7 +811,7 @@ static procstat_t *ps_read_vmem (int pid, procstat_t *ps)
                        continue;
 
                numfields = strsplit (buffer, fields,
-                                      STATIC_ARRAY_SIZE (fields));
+                               STATIC_ARRAY_SIZE (fields));
 
                if (numfields < 2)
                        continue;
@@ -1117,70 +1148,226 @@ static char *ps_get_cmdline (pid_t pid, char *name, char *buf, size_t buf_len)
        return buf;
 } /* char *ps_get_cmdline (...) */
 
-static unsigned long read_fork_rate ()
+static int read_fork_rate ()
 {
        FILE *proc_stat;
-       char buf[1024];
-       unsigned long result = 0;
-       int numfields;
-       char *fields[3];
+       char buffer[1024];
+       value_t value;
+       _Bool value_valid = 0;
 
-       proc_stat = fopen("/proc/stat", "r");
-       if (proc_stat == NULL) {
+       proc_stat = fopen ("/proc/stat", "r");
+       if (proc_stat == NULL)
+       {
                char errbuf[1024];
                ERROR ("processes plugin: fopen (/proc/stat) failed: %s",
                                sstrerror (errno, errbuf, sizeof (errbuf)));
-               return ULONG_MAX;
+               return (-1);
        }
 
-       while (fgets (buf, sizeof(buf), proc_stat) != NULL)
+       while (fgets (buffer, sizeof (buffer), proc_stat) != NULL)
        {
-               char *endptr;
+               int status;
+               char *fields[3];
+               int fields_num;
 
-               numfields = strsplit(buf, fields, STATIC_ARRAY_SIZE (fields));
-               if (numfields != 2)
+               fields_num = strsplit (buffer, fields,
+                               STATIC_ARRAY_SIZE (fields));
+               if (fields_num != 2)
                        continue;
 
                if (strcmp ("processes", fields[0]) != 0)
                        continue;
 
-               errno = 0;
-               endptr = NULL;
-               result = strtoul(fields[1], &endptr, /* base = */ 10);
-               if ((endptr == fields[1]) || (errno != 0)) {
-                       ERROR ("processes plugin: Cannot parse fork rate: %s",
-                                       fields[1]);
-                       result = ULONG_MAX;
-                       break;
-               }
+               status = parse_value (fields[1], &value, DS_TYPE_DERIVE);
+               if (status == 0)
+                       value_valid = 1;
 
                break;
        }
-
        fclose(proc_stat);
 
-       return result;
+       if (!value_valid)
+               return (-1);
+
+       ps_submit_fork_rate (value.derive);
+       return (0);
 }
+#endif /*KERNEL_LINUX */
 
-static void ps_submit_fork_rate (unsigned long value)
+#if KERNEL_SOLARIS
+static const char *ps_get_cmdline (pid_t pid, /* {{{ */
+               char *buffer, size_t buffer_size)
 {
-       value_t values[1];
-       value_list_t vl = VALUE_LIST_INIT;
+       char path[PATH_MAX];
+       psinfo_t info;
+       int status;
 
-       values[0].derive = (derive_t) value;
+       snprintf(path, sizeof (path), "/proc/%i/psinfo", pid);
 
-       vl.values = values;
-       vl.values_len = 1;
-       sstrncpy (vl.host, hostname_g, sizeof (vl.host));
-       sstrncpy (vl.plugin, "processes", sizeof (vl.plugin));
-       sstrncpy (vl.plugin_instance, "", sizeof (vl.plugin_instance));
-       sstrncpy (vl.type, "fork_rate", sizeof (vl.type));
-       sstrncpy (vl.type_instance, "", sizeof (vl.type_instance));
+       status = read_file_contents (path, (void *) &info, sizeof (info));
+       if (status != ((int) buffer_size))
+       {
+               ERROR ("processes plugin: Unexpected return value "
+                               "while reading \"%s\": "
+                               "Returned %i but expected %zu.",
+                               path, status, buffer_size);
+               return (NULL);
+       }
 
-       plugin_dispatch_values (&vl);
+       info.pr_psargs[sizeof (info.pr_psargs) - 1] = 0;
+       sstrncpy (buffer, info.pr_psargs, buffer_size);
+
+       return (buffer);
+} /* }}} int ps_get_cmdline */
+
+/*
+ * Reads process information on the Solaris OS. The information comes mainly from
+ * /proc/PID/status, /proc/PID/psinfo and /proc/PID/usage
+ * The values for input and ouput chars are calculated "by hand"
+ * Added a few "solaris" specific process states as well
+ */
+static int ps_read_process(int pid, procstat_t *ps, char *state)
+{
+       char filename[64];
+       char f_psinfo[64], f_usage[64];
+       char *buffer;
+
+       pstatus_t *myStatus;
+       psinfo_t *myInfo;
+       prusage_t *myUsage;
+
+       snprintf(filename, sizeof (filename), "/proc/%i/status", pid);
+       snprintf(f_psinfo, sizeof (f_psinfo), "/proc/%i/psinfo", pid);
+       snprintf(f_usage, sizeof (f_usage), "/proc/%i/usage", pid);
+
+
+       buffer = malloc(sizeof (pstatus_t));
+       memset(buffer, 0, sizeof (pstatus_t));
+       read_file_contents(filename, buffer, sizeof (pstatus_t));
+       myStatus = (pstatus_t *) buffer;
+
+       buffer = malloc(sizeof (psinfo_t));
+       memset(buffer, 0, sizeof(psinfo_t));
+       read_file_contents(f_psinfo, buffer, sizeof (psinfo_t));
+       myInfo = (psinfo_t *) buffer;
+
+       buffer = malloc(sizeof (prusage_t));
+       memset(buffer, 0, sizeof(prusage_t));
+       read_file_contents(f_usage, buffer, sizeof (prusage_t));
+       myUsage = (prusage_t *) buffer;
+
+       sstrncpy(ps->name, myInfo->pr_fname, sizeof (myInfo->pr_fname));
+       ps->num_lwp = myStatus->pr_nlwp;
+       if (myInfo->pr_wstat != 0) {
+               ps->num_proc = 0;
+               ps->num_lwp = 0;
+               *state = (char) 'Z';
+               return (0);
+       } else {
+               ps->num_proc = 1;
+               ps->num_lwp = myInfo->pr_nlwp;
+       }
+
+       /*
+        * Convert system time and user time from nanoseconds to microseconds
+        * for compatibility with the linux module
+        */
+       ps->cpu_system_counter = myStatus -> pr_stime.tv_nsec / 1000;
+       ps->cpu_user_counter = myStatus -> pr_utime.tv_nsec / 1000;
+
+       /*
+        * Convert rssize from KB to bytes to be consistent w/ the linux module
+        */
+       ps->vmem_rss = myInfo->pr_rssize * 1024;
+       ps->vmem_size = myInfo->pr_size * 1024;
+       ps->vmem_minflt_counter = myUsage->pr_minf;
+       ps->vmem_majflt_counter = myUsage->pr_majf;
+
+       /*
+        * TODO: Data and code segment calculations for Solaris
+        */
+
+       ps->vmem_data = -1;
+       ps->vmem_code = -1;
+       ps->stack_size = myStatus->pr_stksize;
+
+       /*
+        * Calculating input/ouput chars
+        * Formula used is total chars / total blocks => chars/block
+        * then convert input/output blocks to chars
+        */
+       ulong_t tot_chars = myUsage->pr_ioch;
+       ulong_t tot_blocks = myUsage->pr_inblk + myUsage->pr_oublk;
+       ulong_t chars_per_block = 1;
+       if (tot_blocks != 0)
+               chars_per_block = tot_chars / tot_blocks;
+       ps->io_rchar = myUsage->pr_inblk * chars_per_block;
+       ps->io_wchar = myUsage->pr_oublk * chars_per_block;
+       ps->io_syscr = myUsage->pr_sysc;
+       ps->io_syscw = myUsage->pr_sysc;
+
+
+       /*
+        * TODO: Find way of setting BLOCKED and PAGING status
+        */
+
+       *state = (char) 'R';
+       if (myStatus->pr_flags & PR_ASLEEP)
+               *state = (char) 'S';
+       else if (myStatus->pr_flags & PR_STOPPED)
+               *state = (char) 'T';
+       else if (myStatus->pr_flags & PR_DETACH)
+               *state = (char) 'E';
+       else if (myStatus->pr_flags & PR_DAEMON)
+               *state = (char) 'A';
+       else if (myStatus->pr_flags & PR_ISSYS)
+               *state = (char) 'Y';
+       else if (myStatus->pr_flags & PR_ORPHAN)
+               *state = (char) 'O';
+
+       sfree(myStatus);
+       sfree(myInfo);
+       sfree(myUsage);
+
+       return (0);
 }
 
-#endif /* KERNEL_LINUX */
+/*
+ * Reads the number of threads created since the last reboot. On Solaris these
+ * are retrieved from kstat (module cpu, name sys, class misc, stat nthreads).
+ * The result is the sum for all the threads created on each cpu
+ */
+static int read_fork_rate()
+{
+       extern kstat_ctl_t *kc;
+       kstat_t *ksp_chain = NULL;
+       derive_t result = 0;
+
+       if (kc == NULL)
+               return (-1);
+
+       for (ksp_chain = kc->kc_chain;
+                       ksp_chain != NULL;
+                       ksp_chain = ksp_chain->ks_next)
+       {
+               if ((strcmp (ksp_chain->ks_module, "cpu") == 0)
+                               && (strcmp (ksp_chain->ks_name, "sys") == 0)
+                               && (strcmp (ksp_chain->ks_class, "misc") == 0))
+               {
+                       long long tmp;
+
+                       kstat_read (kc, ksp_chain, NULL);
+
+                       tmp = get_kstat_value(ksp_chain, "nthreads");
+                       if (tmp != -1LL)
+                               result += tmp;
+               }
+       }
+
+       ps_submit_fork_rate (result);
+       return (0);
+}
+#endif /* KERNEL_SOLARIS */
 
 #if HAVE_THREAD_INFO
 static int mach_get_task_name (task_t t, int *pid, char *name, size_t name_max_len)
@@ -1506,8 +1693,6 @@ static int ps_read (void)
        procstat_entry_t pse;
        char       state;
 
-       unsigned long fork_rate;
-
        procstat_t *ps_ptr;
 
        running = sleeping = zombies = stopped = paging = blocked = 0;
@@ -1589,9 +1774,7 @@ static int ps_read (void)
        for (ps_ptr = list_head_g; ps_ptr != NULL; ps_ptr = ps_ptr->next)
                ps_submit_proc_list (ps_ptr);
 
-       fork_rate = read_fork_rate();
-       if (fork_rate != ULONG_MAX)
-               ps_submit_fork_rate(fork_rate);
+       read_fork_rate();
 /* #endif KERNEL_LINUX */
 
 #elif HAVE_LIBKVM_GETPROCS && HAVE_STRUCT_KINFO_PROC_FREEBSD
@@ -1605,9 +1788,9 @@ static int ps_read (void)
 
        kvm_t *kd;
        char errbuf[1024];
-       struct kinfo_proc *procs;          /* array of processes */
+       struct kinfo_proc *procs;          /* array of processes */
        struct kinfo_proc *proc_ptr = NULL;
-       int count;                         /* returns number of processes */
+       int count;                         /* returns number of processes */
        int i;
 
        procstat_t *ps_ptr;
@@ -1778,7 +1961,7 @@ static int ps_read (void)
                                if (procentry[i].pi_pid == 0)
                                        cmdline = "swapper";
                                cargs = cmdline;
-                       }
+                       }
                        else
                        {
                                if (getargs(&procentry[i], sizeof(struct procentry64), arglist, MAXARGLN) >= 0)
@@ -1870,7 +2053,118 @@ static int ps_read (void)
 
        for (ps = list_head_g; ps != NULL; ps = ps->next)
                ps_submit_proc_list (ps);
-#endif /* HAVE_PROCINFO_H */
+/* #endif HAVE_PROCINFO_H */
+
+#elif KERNEL_SOLARIS
+       /*
+        * The Solaris section adds a few more process states and removes some
+        * process states compared to linux. Most notably there is no "PAGING"
+        * and "BLOCKED" state for a process.  The rest is similar to the linux
+        * code.
+        */
+       int running = 0;
+       int sleeping = 0;
+       int zombies = 0;
+       int stopped = 0;
+       int detached = 0;
+       int daemon = 0;
+       int system = 0;
+       int orphan = 0;
+
+       struct dirent *ent;
+       DIR *proc;
+
+       int status;
+       procstat_t *ps_ptr;
+       char state;
+
+       char cmdline[PRARGSZ];
+
+       ps_list_reset ();
+
+       proc = opendir ("/proc");
+       if (proc == NULL)
+               return (-1);
+
+       while ((ent = readdir(proc)) != NULL)
+       {
+               int pid;
+               struct procstat ps;
+               procstat_entry_t pse;
+
+               if (!isdigit ((int) ent->d_name[0]))
+                       continue;
+
+               if ((pid = atoi (ent->d_name)) < 1)
+                       continue;
+
+               status = ps_read_process (pid, &ps, &state);
+               if (status != 0)
+               {
+                       DEBUG("ps_read_process failed: %i", status);
+                       continue;
+               }
+
+               pse.id = pid;
+               pse.age = 0;
+
+               pse.num_proc   = ps.num_proc;
+               pse.num_lwp    = ps.num_lwp;
+               pse.vmem_size  = ps.vmem_size;
+               pse.vmem_rss   = ps.vmem_rss;
+               pse.vmem_data  = ps.vmem_data;
+               pse.vmem_code  = ps.vmem_code;
+               pse.stack_size = ps.stack_size;
+
+               pse.vmem_minflt = 0;
+               pse.vmem_minflt_counter = ps.vmem_minflt_counter;
+               pse.vmem_majflt = 0;
+               pse.vmem_majflt_counter = ps.vmem_majflt_counter;
+
+               pse.cpu_user = 0;
+               pse.cpu_user_counter = ps.cpu_user_counter;
+               pse.cpu_system = 0;
+               pse.cpu_system_counter = ps.cpu_system_counter;
+
+               pse.io_rchar = ps.io_rchar;
+               pse.io_wchar = ps.io_wchar;
+               pse.io_syscr = ps.io_syscr;
+               pse.io_syscw = ps.io_syscw;
+
+               switch (state)
+               {
+                       case 'R': running++;  break;
+                       case 'S': sleeping++; break;
+                       case 'E': detached++; break;
+                       case 'Z': zombies++;  break;
+                       case 'T': stopped++;  break;
+                       case 'A': daemon++;   break;
+                       case 'Y': system++;   break;
+                       case 'O': orphan++;   break;
+               }
+
+
+               ps_list_add (ps.name,
+                               ps_get_cmdline ((pid_t) pid,
+                                       cmdline, sizeof (cmdline)),
+                               &pse);
+       } /* while(readdir) */
+       closedir (proc);
+
+       ps_submit_state ("running",  running);
+       ps_submit_state ("sleeping", sleeping);
+       ps_submit_state ("zombies",  zombies);
+       ps_submit_state ("stopped",  stopped);
+       ps_submit_state ("detached", detached);
+       ps_submit_state ("daemon",   daemon);
+       ps_submit_state ("system",   system);
+       ps_submit_state ("orphan",   orphan);
+
+       for (ps_ptr = list_head_g; ps_ptr != NULL; ps_ptr = ps_ptr->next)
+               ps_submit_proc_list (ps_ptr);
+
+       read_fork_rate();
+#endif /* KERNEL_SOLARIS */
 
        return (0);
 } /* int ps_read */
index 86062d9..439cf4b 100644 (file)
@@ -54,6 +54,7 @@ struct redis_node_s
 {
   char name[MAX_REDIS_NODE_NAME];
   char host[HOST_NAME_MAX];
+  char passwd[HOST_NAME_MAX];
   int port;
   int timeout;
 
@@ -136,6 +137,8 @@ static int redis_config_node (oconfig_item_t *ci) /* {{{ */
     }
     else if (strcasecmp ("Timeout", option->key) == 0)
       status = cf_util_get_int (option, &rn.timeout);
+    else if (strcasecmp ("Password", option->key) == 0)
+      status = cf_util_get_string_buffer (option, rn.passwd, sizeof (rn.passwd));
     else
       WARNING ("redis plugin: Option `%s' not allowed inside a `Node' "
           "block. I'll ignore this option.", option->key);
@@ -255,6 +258,18 @@ static int redis_read (void) /* {{{ */
       continue;
     }
 
+    if (strlen (rn->passwd) > 0)
+    {
+      DEBUG ("redis plugin: authenticanting node `%s' passwd(%s).", rn->name, rn->passwd);
+      status = credis_auth(rh, rn->passwd);
+      if (status != 0)
+      {
+        WARNING ("redis plugin: unable to authenticate on node `%s'.", rn->name);
+        credis_close (rh);
+        continue;
+      }
+    }
+
     memset (&info, 0, sizeof (info));
     status = credis_info (rh, &info);
     if (status != 0)
index 11c1c6a..45553b7 100644 (file)
@@ -1,6 +1,6 @@
 /**
  * collectd - src/rrdcached.c
- * Copyright (C) 2008  Florian octo Forster
+ * Copyright (C) 2008-2012  Florian octo Forster
  *
  * This program is free software; you can redistribute it and/or modify it
  * under the terms of the GNU General Public License as published by the
@@ -16,7 +16,7 @@
  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
  *
  * Authors:
- *   Florian octo Forster <octo at verplant.org>
+ *   Florian octo Forster <octo at collectd.org>
  **/
 
 #include "collectd.h"
@@ -33,8 +33,8 @@
  */
 static char *datadir = NULL;
 static char *daemon_address = NULL;
-static int config_create_files = 1;
-static int config_collect_stats = 1;
+static _Bool config_create_files = 1;
+static _Bool config_collect_stats = 1;
 static rrdcreate_config_t rrdcreate_config =
 {
        /* stepsize = */ 0,
@@ -161,89 +161,130 @@ static int value_list_to_filename (char *buffer, int buffer_len,
   return (0);
 } /* int value_list_to_filename */
 
-static const char *config_get_string (oconfig_item_t *ci)
+static int rc_config_get_int_positive (oconfig_item_t const *ci, int *ret)
 {
-  if ((ci->children_num != 0) || (ci->values_num != 1)
-      || ((ci->values[0].type != OCONFIG_TYPE_STRING)
-        && (ci->values[0].type != OCONFIG_TYPE_BOOLEAN)))
+  int status;
+  int tmp = 0;
+
+  status = cf_util_get_int (ci, &tmp);
+  if (status != 0)
+    return (status);
+  if (tmp < 0)
+    return (EINVAL);
+
+  *ret = tmp;
+  return (0);
+} /* int rc_config_get_int_positive */
+
+static int rc_config_get_xff (oconfig_item_t const *ci, double *ret)
+{
+  double value;
+
+  if ((ci->values_num != 1) || (ci->values[0].type != OCONFIG_TYPE_NUMBER))
   {
-    ERROR ("rrdcached plugin: %s expects a single string argument.",
-        ci->key);
-    return (NULL);
+    ERROR ("rrdcached plugin: The \"%s\" needs exactly one numeric argument "
+        "in the range [0.0, 1.0)", ci->key);
+    return (EINVAL);
   }
 
-  if (ci->values[0].type == OCONFIG_TYPE_BOOLEAN) {
-    if (ci->values[0].value.boolean)
-      return "true";
-    else
-      return "false";
+  value = ci->values[0].value.number;
+  if ((value >= 0.0) && (value < 1.0))
+  {
+    *ret = value;
+    return (0);
   }
-  return (ci->values[0].value.string);
-} /* const char *config_get_string */
+
+  ERROR ("rrdcached plugin: The \"%s\" needs exactly one numeric argument "
+      "in the range [0.0, 1.0)", ci->key);
+  return (EINVAL);
+} /* int rc_config_get_xff */
+
+static int rc_config_add_timespan (int timespan)
+{
+  int *tmp;
+
+  if (timespan <= 0)
+    return (EINVAL);
+
+  tmp = realloc (rrdcreate_config.timespans,
+      sizeof (*rrdcreate_config.timespans)
+      * (rrdcreate_config.timespans_num + 1));
+  if (tmp == NULL)
+    return (ENOMEM);
+  rrdcreate_config.timespans = tmp;
+
+  rrdcreate_config.timespans[rrdcreate_config.timespans_num] = timespan;
+  rrdcreate_config.timespans_num++;
+
+  return (0);
+} /* int rc_config_add_timespan */
 
 static int rc_config (oconfig_item_t *ci)
 {
   int i;
 
-  for (i = 0; i < ci->children_num; ++i) {
-    const char *key = ci->children[i].key;
-    const char *value = config_get_string (ci->children + i);
-
-    if (value == NULL) /* config_get_strings prints error message */
-      continue;
+  for (i = 0; i < ci->children_num; i++)
+  {
+    oconfig_item_t const *child = ci->children + i;
+    const char *key = child->key;
+    int status = 0;
 
     if (strcasecmp ("DataDir", key) == 0)
     {
-      if (datadir != NULL)
-        free (datadir);
-      datadir = strdup (value);
-      if (datadir != NULL)
+      status = cf_util_get_string (child, &datadir);
+      if (status == 0)
       {
         int len = strlen (datadir);
+
         while ((len > 0) && (datadir[len - 1] == '/'))
         {
           len--;
-          datadir[len] = '\0';
+          datadir[len] = 0;
         }
+
         if (len <= 0)
-        {
-          free (datadir);
-          datadir = NULL;
-        }
+          sfree (datadir);
       }
     }
     else if (strcasecmp ("DaemonAddress", key) == 0)
-    {
-      sfree (daemon_address);
-      daemon_address = strdup (value);
-      if (daemon_address == NULL)
-      {
-        ERROR ("rrdcached plugin: strdup failed.");
-        continue;
-      }
-    }
+      status = cf_util_get_string (child, &daemon_address);
     else if (strcasecmp ("CreateFiles", key) == 0)
+      status = cf_util_get_boolean (child, &config_create_files);
+    else if (strcasecmp ("CollectStatistics", key) == 0)
+      status = cf_util_get_boolean (child, &config_collect_stats);
+    else if (strcasecmp ("StepSize", key) == 0)
     {
-      if (IS_FALSE (value))
-        config_create_files = 0;
-      else
-        config_create_files = 1;
+      int tmp = -1;
+
+      status = rc_config_get_int_positive (child, &tmp);
+      if (status == 0)
+        rrdcreate_config.stepsize = (unsigned long) tmp;
     }
-    else if (strcasecmp ("CollectStatistics", key) == 0)
+    else if (strcasecmp ("HeartBeat", key) == 0)
+      status = rc_config_get_int_positive (child, &rrdcreate_config.heartbeat);
+    else if (strcasecmp ("RRARows", key) == 0)
+      status = rc_config_get_int_positive (child, &rrdcreate_config.rrarows);
+    else if (strcasecmp ("RRATimespan", key) == 0)
     {
-      if (IS_FALSE (value))
-        config_collect_stats = 0;
-      else
-        config_collect_stats = 1;
+      int tmp = -1;
+      status = rc_config_get_int_positive (child, &tmp);
+      if (status == 0)
+        status = rc_config_add_timespan (tmp);
     }
+    else if (strcasecmp ("XFF", key) == 0)
+      status = rc_config_get_xff (child, &rrdcreate_config.xff);
     else
     {
       WARNING ("rrdcached plugin: Ignoring invalid option %s.", key);
       continue;
     }
+
+    if (status != 0)
+      WARNING ("rrdcached plugin: Handling the \"%s\" option failed.", key);
   }
 
-  if (daemon_address != NULL) {
+  if (daemon_address != NULL)
+  {
     plugin_register_write ("rrdcached", rc_write, /* user_data = */ NULL);
     plugin_register_flush ("rrdcached", rc_flush, /* user_data = */ NULL);
   }
@@ -262,7 +303,7 @@ static int rc_read (void)
   if (daemon_address == NULL)
     return (-1);
 
-  if (config_collect_stats == 0)
+  if (!config_collect_stats)
     return (-1);
 
   vl.values = values;
@@ -354,7 +395,7 @@ static int rc_read (void)
 
 static int rc_init (void)
 {
-  if (config_collect_stats != 0)
+  if (config_collect_stats)
     plugin_register_read ("rrdcached", rc_read);
 
   return (0);
@@ -396,7 +437,7 @@ static int rc_write (const data_set_t *ds, const value_list_t *vl,
   values_array[0] = values;
   values_array[1] = NULL;
 
-  if (config_create_files != 0)
+  if (config_create_files)
   {
     struct stat statbuf;
 
index 397969e..46d3534 100644 (file)
@@ -1,6 +1,6 @@
 /**
  * collectd - src/swap.c
- * Copyright (C) 2005-2010  Florian octo Forster
+ * Copyright (C) 2005-2012  Florian octo Forster
  * Copyright (C) 2009       Stefan Völkel
  * Copyright (C) 2009       Manuel Sanmartin
  * Copyright (C) 2010       Aurélien Reynaud
 #define MAX(x,y) ((x) > (y) ? (x) : (y))
 
 #if KERNEL_LINUX
-# define SWAP_HAVE_CONFIG 1
-/* No global variables */
+# define SWAP_HAVE_REPORT_BY_DEVICE 1
+static derive_t pagesize;
+static _Bool report_bytes = 0;
+static _Bool report_by_device = 0;
 /* #endif KERNEL_LINUX */
 
 #elif HAVE_SWAPCTL && HAVE_SWAPCTL_TWO_ARGS
-# define SWAP_HAVE_CONFIG 1
+# define SWAP_HAVE_REPORT_BY_DEVICE 1
 static derive_t pagesize;
+static _Bool report_by_device = 0;
 /* #endif HAVE_SWAPCTL && HAVE_SWAPCTL_TWO_ARGS */
 
 #elif defined(VM_SWAPUSAGE)
@@ -101,23 +104,37 @@ static perfstat_memory_total_t pmemory;
 # error "No applicable input method."
 #endif /* HAVE_LIBSTATGRAB */
 
-#if SWAP_HAVE_CONFIG
 static const char *config_keys[] =
 {
+       "ReportBytes",
        "ReportByDevice"
 };
 static int config_keys_num = STATIC_ARRAY_SIZE (config_keys);
 
-static _Bool report_by_device = 0;
-
 static int swap_config (const char *key, const char *value) /* {{{ */
 {
-       if (strcasecmp ("ReportByDevice", key) == 0)
+       if (strcasecmp ("ReportBytes", key) == 0)
+       {
+#if KERNEL_LINUX
+               report_bytes = IS_TRUE (value) ? 1 : 0;
+#else
+               WARNING ("swap plugin: The \"ReportBytes\" option is only "
+                               "valid under Linux. "
+                               "The option is going to be ignored.");
+#endif
+       }
+       else if (strcasecmp ("ReportByDevice", key) == 0)
        {
+#if SWAP_HAVE_REPORT_BY_DEVICE
                if (IS_TRUE (value))
                        report_by_device = 1;
                else
                        report_by_device = 0;
+#else
+               WARNING ("swap plugin: The \"ReportByDevice\" option is not "
+                               "supported on this platform. "
+                               "The option is going to be ignored.");
+#endif /* SWAP_HAVE_REPORT_BY_DEVICE */
        }
        else
        {
@@ -126,12 +143,11 @@ static int swap_config (const char *key, const char *value) /* {{{ */
 
        return (0);
 } /* }}} int swap_config */
-#endif /* SWAP_HAVE_CONFIG */
 
 static int swap_init (void) /* {{{ */
 {
 #if KERNEL_LINUX
-       /* No init stuff */
+       pagesize = (derive_t) sysconf (_SC_PAGESIZE);
 /* #endif KERNEL_LINUX */
 
 #elif HAVE_SWAPCTL && HAVE_SWAPCTL_TWO_ARGS
@@ -205,7 +221,7 @@ static void swap_submit_gauge (const char *plugin_instance, /* {{{ */
        swap_submit (plugin_instance, "swap", type_instance, v);
 } /* }}} void swap_submit_gauge */
 
-#if KERNEL_LINUX
+#if KERNEL_LINUX || HAVE_PERFSTAT
 static void swap_submit_derive (const char *plugin_instance, /* {{{ */
                const char *type_instance, derive_t value)
 {
@@ -214,7 +230,9 @@ static void swap_submit_derive (const char *plugin_instance, /* {{{ */
        v.derive = value;
        swap_submit (plugin_instance, "swap_io", type_instance, v);
 } /* }}} void swap_submit_derive */
+#endif
 
+#if KERNEL_LINUX
 static int swap_read_separate (void) /* {{{ */
 {
        FILE *fh;
@@ -406,6 +424,12 @@ static int swap_read_io (void) /* {{{ */
        if (have_data != 0x03)
                return (ENOENT);
 
+       if (report_bytes)
+       {
+               swap_in = swap_in * pagesize;
+               swap_out = swap_out * pagesize;
+       }
+
        swap_submit_derive (NULL, "in",  swap_in);
        swap_submit_derive (NULL, "out", swap_out);
 
@@ -764,8 +788,12 @@ static int swap_read (void) /* {{{ */
                         sstrerror (errno, errbuf, sizeof (errbuf)));
                 return (-1);
         }
+
        swap_submit_gauge (NULL, "used", (gauge_t) (pmemory.pgsp_total - pmemory.pgsp_free) * pagesize);
        swap_submit_gauge (NULL, "free", (gauge_t) pmemory.pgsp_free * pagesize );
+       swap_submit_gauge (NULL, "reserved", (gauge_t) pmemory.pgsp_rsvd * pagesize);
+       swap_submit_derive (NULL, "in",  (derive_t) pmemory.pgspins * pagesize);
+       swap_submit_derive (NULL, "out", (derive_t) pmemory.pgspouts * pagesize);
 
        return (0);
 } /* }}} int swap_read */
@@ -773,9 +801,8 @@ static int swap_read (void) /* {{{ */
 
 void module_register (void)
 {
-#if SWAP_HAVE_CONFIG
-       plugin_register_config ("swap", swap_config, config_keys, config_keys_num);
-#endif
+       plugin_register_config ("swap", swap_config,
+                       config_keys, config_keys_num);
        plugin_register_init ("swap", swap_init);
        plugin_register_read ("swap", swap_read);
 } /* void module_register */
index 3c8fc72..765b892 100644 (file)
 #endif
 
 #if KERNEL_LINUX
+# include <asm/types.h>
+/* sys/socket.h is necessary to compile when using netlink on older systems. */
+# include <sys/socket.h>
+# include <linux/netlink.h>
+# include <linux/inet_diag.h>
+# include <sys/socket.h>
+# include <arpa/inet.h>
 /* #endif KERNEL_LINUX */
 
 #elif HAVE_SYSCTLBYNAME
 #endif /* KERNEL_AIX */
 
 #if KERNEL_LINUX
+struct nlreq {
+  struct nlmsghdr nlh;
+  struct inet_diag_req r;
+};
+
 static const char *tcp_state[] =
 {
   "", /* 0 */
@@ -263,6 +275,17 @@ static int config_keys_num = STATIC_ARRAY_SIZE (config_keys);
 static int port_collect_listening = 0;
 static port_entry_t *port_list_head = NULL;
 
+#if KERNEL_LINUX
+static uint32_t sequence_number = 0;
+
+enum
+{
+  SRC_DUNNO,
+  SRC_NETLINK,
+  SRC_PROC
+} linux_source = SRC_DUNNO;
+#endif
+
 static void conn_submit_port_entry (port_entry_t *pe)
 {
   value_t values[1];
@@ -419,6 +442,140 @@ static int conn_handle_ports (uint16_t port_local, uint16_t port_remote, uint8_t
 } /* int conn_handle_ports */
 
 #if KERNEL_LINUX
+/* Returns zero on success, less than zero on socket error and greater than
+ * zero on other errors. */
+static int conn_read_netlink (void)
+{
+  int fd;
+  struct sockaddr_nl nladdr;
+  struct nlreq req;
+  struct msghdr msg;
+  struct iovec iov;
+  struct inet_diag_msg *r;
+  char buf[8192];
+
+  /* If this fails, it's likely a permission problem. We'll fall back to
+   * reading this information from files below. */
+  fd = socket(AF_NETLINK, SOCK_RAW, NETLINK_INET_DIAG);
+  if (fd < 0)
+  {
+    ERROR ("tcpconns plugin: conn_read_netlink: socket(AF_NETLINK, SOCK_RAW, "
+       "NETLINK_INET_DIAG) failed: %s",
+       sstrerror (errno, buf, sizeof (buf)));
+    return (-1);
+  }
+
+  memset(&nladdr, 0, sizeof(nladdr));
+  nladdr.nl_family = AF_NETLINK;
+
+  memset(&req, 0, sizeof(req));
+  req.nlh.nlmsg_len = sizeof(req);
+  req.nlh.nlmsg_type = TCPDIAG_GETSOCK;
+  /* NLM_F_ROOT: return the complete table instead of a single entry.
+   * NLM_F_MATCH: return all entries matching criteria (not implemented)
+   * NLM_F_REQUEST: must be set on all request messages */
+  req.nlh.nlmsg_flags = NLM_F_ROOT | NLM_F_MATCH | NLM_F_REQUEST;
+  req.nlh.nlmsg_pid = 0;
+  /* The sequence_number is used to track our messages. Since netlink is not
+   * reliable, we don't want to end up with a corrupt or incomplete old
+   * message in case the system is/was out of memory. */
+  req.nlh.nlmsg_seq = ++sequence_number;
+  req.r.idiag_family = AF_INET;
+  req.r.idiag_states = 0xfff;
+  req.r.idiag_ext = 0;
+
+  memset(&iov, 0, sizeof(iov));
+  iov.iov_base = &req;
+  iov.iov_len = sizeof(req);
+
+  memset(&msg, 0, sizeof(msg));
+  msg.msg_name = (void*)&nladdr;
+  msg.msg_namelen = sizeof(nladdr);
+  msg.msg_iov = &iov;
+  msg.msg_iovlen = 1;
+
+  if (sendmsg (fd, &msg, 0) < 0)
+  {
+    ERROR ("tcpconns plugin: conn_read_netlink: sendmsg(2) failed: %s",
+       sstrerror (errno, buf, sizeof (buf)));
+    close (fd);
+    return (-1);
+  }
+
+  iov.iov_base = buf;
+  iov.iov_len = sizeof(buf);
+
+  while (1)
+  {
+    int status;
+    struct nlmsghdr *h;
+
+    memset(&msg, 0, sizeof(msg));
+    msg.msg_name = (void*)&nladdr;
+    msg.msg_namelen = sizeof(nladdr);
+    msg.msg_iov = &iov;
+    msg.msg_iovlen = 1;
+
+    status = recvmsg(fd, (void *) &msg, /* flags = */ 0);
+    if (status < 0)
+    {
+      if ((errno == EINTR) || (errno == EAGAIN))
+        continue;
+
+      ERROR ("tcpconns plugin: conn_read_netlink: recvmsg(2) failed: %s",
+         sstrerror (errno, buf, sizeof (buf)));
+      close (fd);
+      return (-1);
+    }
+    else if (status == 0)
+    {
+      close (fd);
+      DEBUG ("tcpconns plugin: conn_read_netlink: Unexpected zero-sized "
+         "reply from netlink socket.");
+      return (0);
+    }
+
+    h = (struct nlmsghdr*)buf;
+    while (NLMSG_OK(h, status))
+    {
+      if (h->nlmsg_seq != sequence_number)
+      {
+       h = NLMSG_NEXT(h, status);
+       continue;
+      }
+
+      if (h->nlmsg_type == NLMSG_DONE)
+      {
+       close (fd);
+       return (0);
+      }
+      else if (h->nlmsg_type == NLMSG_ERROR)
+      {
+       struct nlmsgerr *msg_error;
+
+       msg_error = NLMSG_DATA(h);
+       WARNING ("tcpconns plugin: conn_read_netlink: Received error %i.",
+           msg_error->error);
+
+       close (fd);
+       return (1);
+      }
+
+      r = NLMSG_DATA(h);
+
+      /* This code does not (need to) distinguish between IPv4 and IPv6. */
+      conn_handle_ports (ntohs(r->id.idiag_sport),
+         ntohs(r->id.idiag_dport),
+         r->idiag_state);
+
+      h = NLMSG_NEXT(h, status);
+    } /* while (NLMSG_OK) */
+  } /* while (1) */
+
+  /* Not reached because the while() loop above handles the exit condition. */
+  return (0);
+} /* int conn_read_netlink */
+
 static int conn_handle_line (char *buffer)
 {
   char *fields[32];
@@ -553,26 +710,55 @@ static int conn_init (void)
 
 static int conn_read (void)
 {
-  int errors_num = 0;
+  int status;
 
   conn_reset_port_entry ();
 
-  if (conn_read_file ("/proc/net/tcp") != 0)
-    errors_num++;
-  if (conn_read_file ("/proc/net/tcp6") != 0)
-    errors_num++;
-
-  if (errors_num < 2)
+  if (linux_source == SRC_NETLINK)
   {
-    conn_submit_all ();
+    status = conn_read_netlink ();
   }
-  else
+  else if (linux_source == SRC_PROC)
   {
-    ERROR ("tcpconns plugin: Neither /proc/net/tcp nor /proc/net/tcp6 "
-       "coult be read.");
-    return (-1);
+    int errors_num = 0;
+
+    if (conn_read_file ("/proc/net/tcp") != 0)
+      errors_num++;
+    if (conn_read_file ("/proc/net/tcp6") != 0)
+      errors_num++;
+
+    if (errors_num < 2)
+      status = 0;
+    else
+      status = ENOENT;
+  }
+  else /* if (linux_source == SRC_DUNNO) */
+  {
+    /* Try to use netlink for getting this data, it is _much_ faster on systems
+     * with a large amount of connections. */
+    status = conn_read_netlink ();
+    if (status == 0)
+    {
+      INFO ("tcpconns plugin: Reading from netlink succeeded. "
+         "Will use the netlink method from now on.");
+      linux_source = SRC_NETLINK;
+    }
+    else
+    {
+      INFO ("tcpconns plugin: Reading from netlink failed. "
+         "Will read from /proc from now on.");
+      linux_source = SRC_PROC;
+
+      /* return success here to avoid the "plugin failed" message. */
+      return (0);
+    }
   }
 
+  if (status == 0)
+    conn_submit_all ();
+  else
+    return (status);
+
   return (0);
 } /* int conn_read */
 /* #endif KERNEL_LINUX */
index d2ba963..064c3ce 100644 (file)
 /* Using sysctl interface to retrieve the boot time on *BSD / Darwin / OS X systems */
 /* #endif HAVE_SYS_SYSCTL_H */
 
+#elif HAVE_PERFSTAT
+# include <sys/protosw.h>
+# include <libperfstat.h>
+/* Using perfstat_cpu_total to retrive the boot time in AIX */
+/* #endif HAVE_PERFSTAT */
+
 #else
 # error "No applicable input method."
 #endif
@@ -203,7 +209,29 @@ static int uptime_init (void) /* {{{ */
                                "but `boottime' is zero!");
                return (-1);
        }
-#endif /* HAVE_SYS_SYSCTL_H */
+/* #endif HAVE_SYS_SYSCTL_H */
+
+#elif HAVE_PERFSTAT
+       int status;
+       perfstat_cpu_total_t cputotal;
+       int hertz;
+
+       status = perfstat_cpu_total(NULL, &cputotal,
+               sizeof(perfstat_cpu_total_t), 1);
+       if (status < 0)
+       {
+               char errbuf[1024];
+               ERROR ("uptime plugin: perfstat_cpu_total: %s",
+                       sstrerror (errno, errbuf, sizeof (errbuf)));
+               return (-1);
+       }
+
+       hertz = sysconf(_SC_CLK_TCK);
+       if (hertz <= 0)
+               hertz = HZ;
+
+       boottime = time(NULL) - cputotal.lbolt / hertz;
+#endif /* HAVE_PERFSTAT */
 
        return (0);
 } /* }}} int uptime_init */
index dd5bcb5..fa6e660 100644 (file)
@@ -572,6 +572,7 @@ int uc_get_names (char ***ret_names, cdtime_t **ret_times, size_t *ret_number)
   char **names = NULL;
   cdtime_t *times = NULL;
   size_t number = 0;
+  size_t size_arrays = 0;
 
   int status = 0;
 
@@ -580,42 +581,47 @@ int uc_get_names (char ***ret_names, cdtime_t **ret_times, size_t *ret_number)
 
   pthread_mutex_lock (&cache_lock);
 
+  size_arrays = (size_t) c_avl_size (cache_tree);
+  if (size_arrays < 1)
+  {
+    /* Handle the "no values" case here, to avoid the error message when
+     * calloc() returns NULL. */
+    pthread_mutex_unlock (&cache_lock);
+    return (0);
+  }
+
+  names = calloc (size_arrays, sizeof (*names));
+  times = calloc (size_arrays, sizeof (*times));
+  if ((names == NULL) || (times == NULL))
+  {
+    ERROR ("uc_get_names: calloc failed.");
+    sfree (names);
+    sfree (times);
+    pthread_mutex_unlock (&cache_lock);
+    return (ENOMEM);
+  }
+
   iter = c_avl_get_iterator (cache_tree);
   while (c_avl_iterator_next (iter, (void *) &key, (void *) &value) == 0)
   {
-    char **temp;
-
     /* remove missing values when list values */
     if (value->state == STATE_MISSING)
       continue;
 
-    if (ret_times != NULL)
-    {
-      cdtime_t *tmp_times;
+    /* c_avl_size does not return a number smaller than the number of elements
+     * returned by c_avl_iterator_next. */
+    assert (number < size_arrays);
 
-      tmp_times = (cdtime_t *) realloc (times, sizeof (cdtime_t) * (number + 1));
-      if (tmp_times == NULL)
-      {
-       status = -1;
-       break;
-      }
-      times = tmp_times;
+    if (ret_times != NULL)
       times[number] = value->last_time;
-    }
 
-    temp = (char **) realloc (names, sizeof (char *) * (number + 1));
-    if (temp == NULL)
-    {
-      status = -1;
-      break;
-    }
-    names = temp;
     names[number] = strdup (key);
     if (names[number] == NULL)
     {
       status = -1;
       break;
     }
+
     number++;
   } /* while (c_avl_iterator_next) */
 
diff --git a/src/utils_format_graphite.c b/src/utils_format_graphite.c
new file mode 100644 (file)
index 0000000..b9b906f
--- /dev/null
@@ -0,0 +1,240 @@
+/**
+ * collectd - src/utils_format_graphite.c
+ * Copyright (C) 2012  Thomas Meson
+ * Copyright (C) 2012  Florian octo Forster
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; only version 2 of the License is applicable.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
+ *
+ * Authors:
+ *   Thomas Meson <zllak at hycik.org>
+ *   Florian octo Forster <octo at collectd.org>
+ **/
+
+#include "collectd.h"
+#include "plugin.h"
+#include "common.h"
+
+#include "utils_cache.h"
+#include "utils_format_json.h"
+#include "utils_parse_option.h"
+
+/* Utils functions to format data sets in graphite format.
+ * Largely taken from write_graphite.c as it remains the same formatting */
+
+static int gr_format_values (char *ret, size_t ret_len,
+        int ds_num, const data_set_t *ds, const value_list_t *vl,
+        gauge_t const *rates)
+{
+    size_t offset = 0;
+    int status;
+
+    assert (0 == strcmp (ds->type, vl->type));
+
+    memset (ret, 0, ret_len);
+
+#define BUFFER_ADD(...) do { \
+    status = ssnprintf (ret + offset, ret_len - offset, \
+            __VA_ARGS__); \
+    if (status < 1) \
+    { \
+        return (-1); \
+    } \
+    else if (((size_t) status) >= (ret_len - offset)) \
+    { \
+        return (-1); \
+    } \
+    else \
+    offset += ((size_t) status); \
+} while (0)
+
+    if (ds->ds[ds_num].type == DS_TYPE_GAUGE)
+        BUFFER_ADD ("%f", vl->values[ds_num].gauge);
+    else if (rates != NULL)
+        BUFFER_ADD ("%f", rates[ds_num]);
+    else if (ds->ds[ds_num].type == DS_TYPE_COUNTER)
+        BUFFER_ADD ("%llu", vl->values[ds_num].counter);
+    else if (ds->ds[ds_num].type == DS_TYPE_DERIVE)
+        BUFFER_ADD ("%"PRIi64, vl->values[ds_num].derive);
+    else if (ds->ds[ds_num].type == DS_TYPE_ABSOLUTE)
+        BUFFER_ADD ("%"PRIu64, vl->values[ds_num].absolute);
+    else
+    {
+        ERROR ("gr_format_values plugin: Unknown data source type: %i",
+                ds->ds[ds_num].type);
+        return (-1);
+    }
+
+#undef BUFFER_ADD
+
+    return (0);
+}
+
+static void gr_copy_escape_part (char *dst, const char *src, size_t dst_len,
+    char escape_char)
+{
+    size_t i;
+
+    memset (dst, 0, dst_len);
+
+    if (src == NULL)
+        return;
+
+    for (i = 0; i < dst_len; i++)
+    {
+        if (src[i] == 0)
+        {
+            dst[i] = 0;
+            break;
+        }
+
+        if ((src[i] == '.')
+                || isspace ((int) src[i])
+                || iscntrl ((int) src[i]))
+            dst[i] = escape_char;
+        else
+            dst[i] = src[i];
+    }
+}
+
+static int gr_format_name (char *ret, int ret_len,
+        const value_list_t *vl,
+        const char *ds_name,
+        char *prefix,
+        char *postfix,
+        char escape_char)
+{
+    char n_host[DATA_MAX_NAME_LEN];
+    char n_plugin[DATA_MAX_NAME_LEN];
+    char n_plugin_instance[DATA_MAX_NAME_LEN];
+    char n_type[DATA_MAX_NAME_LEN];
+    char n_type_instance[DATA_MAX_NAME_LEN];
+
+    char tmp_plugin[2 * DATA_MAX_NAME_LEN + 1];
+    char tmp_type[2 * DATA_MAX_NAME_LEN + 1];
+
+    if (prefix == NULL)
+        prefix = "";
+
+    if (postfix == NULL)
+        postfix = "";
+
+    gr_copy_escape_part (n_host, vl->host,
+            sizeof (n_host), escape_char);
+    gr_copy_escape_part (n_plugin, vl->plugin,
+            sizeof (n_plugin), escape_char);
+    gr_copy_escape_part (n_plugin_instance, vl->plugin_instance,
+            sizeof (n_plugin_instance), escape_char);
+    gr_copy_escape_part (n_type, vl->type,
+            sizeof (n_type), escape_char);
+    gr_copy_escape_part (n_type_instance, vl->type_instance,
+            sizeof (n_type_instance), escape_char);
+
+    if (n_plugin_instance[0] != '\0')
+        ssnprintf (tmp_plugin, sizeof (tmp_plugin), "%s%c%s",
+            n_plugin,
+            '-',
+            n_plugin_instance);
+    else
+        sstrncpy (tmp_plugin, n_plugin, sizeof (tmp_plugin));
+
+    if (n_type_instance[0] != '\0')
+        ssnprintf (tmp_type, sizeof (tmp_type), "%s%c%s",
+            n_type,
+            '-',
+            n_type_instance);
+    else
+        sstrncpy (tmp_type, n_type, sizeof (tmp_type));
+
+    if (ds_name != NULL)
+        ssnprintf (ret, ret_len, "%s%s%s.%s.%s.%s",
+            prefix, n_host, postfix, tmp_plugin, tmp_type, ds_name);
+    else
+        ssnprintf (ret, ret_len, "%s%s%s.%s.%s",
+            prefix, n_host, postfix, tmp_plugin, tmp_type);
+
+    return (0);
+}
+
+int format_graphite (char *buffer, size_t buffer_size,
+    const data_set_t *ds, const value_list_t *vl, char *prefix,
+    char *postfix, char escape_char,
+    _Bool store_rates)
+{
+    int status = 0;
+    int i;
+    int buffer_pos = 0;
+
+    gauge_t *rates = NULL;
+    if (store_rates)
+      rates = uc_get_rate (ds, vl);
+
+    for (i = 0; i < ds->ds_num; i++)
+    {
+        const char *ds_name = NULL;
+        char        key[10*DATA_MAX_NAME_LEN];
+        char        values[512];
+        size_t      message_len;
+        char        message[1024];
+
+        ds_name = ds->ds[i].name;
+
+        /* Copy the identifier to `key' and escape it. */
+        status = gr_format_name (key, sizeof (key), vl, ds_name,
+                    prefix, postfix, escape_char);
+        if (status != 0)
+        {
+            ERROR ("format_graphite: error with gr_format_name");
+            sfree (rates);
+            return (status);
+        }
+
+        escape_string (key, sizeof (key));
+        /* Convert the values to an ASCII representation and put that into
+         * `values'. */
+        status = gr_format_values (values, sizeof (values), i, ds, vl, rates);
+        if (status != 0)
+        {
+            ERROR ("format_graphite: error with gr_format_values");
+            sfree (rates);
+            return (status);
+        }
+
+        /* Compute the graphite command */
+        message_len = (size_t) ssnprintf (message, sizeof (message),
+                "%s %s %u\r\n",
+                key,
+                values,
+                (unsigned int) CDTIME_T_TO_TIME_T (vl->time));
+        if (message_len >= sizeof (message)) {
+            ERROR ("format_graphite: message buffer too small: "
+                    "Need %zu bytes.", message_len + 1);
+            sfree (rates);
+            return (-ENOMEM);
+        }
+
+        /* Append it in case we got multiple data set */
+        if ((buffer_pos + message_len) >= buffer_size)
+        {
+            ERROR ("format_graphite: target buffer too small");
+            sfree (rates);
+            return (-ENOMEM);
+        }
+        memcpy((void *) (buffer + buffer_pos), message, message_len);
+        buffer_pos += message_len;
+    }
+    sfree (rates);
+    return (status);
+} /* int format_graphite */
+
+/* vim: set sw=2 sts=2 et fdm=marker : */
diff --git a/src/utils_format_graphite.h b/src/utils_format_graphite.h
new file mode 100644 (file)
index 0000000..a7a05bb
--- /dev/null
@@ -0,0 +1,34 @@
+/**
+ * collectd - src/utils_format_graphite.h
+ * Copyright (C) 2012  Thomas Meson
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; only version 2 of the License is applicable.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
+ *
+ * Author:
+ *   Thomas Meson <zllak at hycik.org>
+ **/
+
+#ifndef UTILS_FORMAT_GRAPHITE_H
+#define UTILS_FORMAT_GRAPHITE_H 1
+
+#include "collectd.h"
+#include "plugin.h"
+
+int format_graphite (char *buffer,
+    size_t buffer_size, const data_set_t *ds,
+    const value_list_t *vl, const char *prefix,
+    const char *postfix, const char escape_char,
+    _Bool store_rates);
+
+#endif /* UTILS_FORMAT_GRAPHITE_H */
index 2a5526b..bbc3dfd 100644 (file)
@@ -70,7 +70,7 @@ static int escape_string (char *buffer, size_t buffer_size, /* {{{ */
 #undef BUFFER_ADD
 
   return (0);
-} /* }}} int buffer_add_string */
+} /* }}} int escape_string */
 
 static int values_to_json (char *buffer, size_t buffer_size, /* {{{ */
                 const data_set_t *ds, const value_list_t *vl, int store_rates)
@@ -152,7 +152,7 @@ static int values_to_json (char *buffer, size_t buffer_size, /* {{{ */
 } /* }}} int values_to_json */
 
 static int dstypes_to_json (char *buffer, size_t buffer_size, /* {{{ */
-                const data_set_t *ds, const value_list_t *vl)
+                const data_set_t *ds)
 {
   size_t offset = 0;
   int i;
@@ -189,7 +189,7 @@ static int dstypes_to_json (char *buffer, size_t buffer_size, /* {{{ */
 } /* }}} int dstypes_to_json */
 
 static int dsnames_to_json (char *buffer, size_t buffer_size, /* {{{ */
-                const data_set_t *ds, const value_list_t *vl)
+                const data_set_t *ds)
 {
   size_t offset = 0;
   int i;
@@ -225,6 +225,86 @@ static int dsnames_to_json (char *buffer, size_t buffer_size, /* {{{ */
   return (0);
 } /* }}} int dsnames_to_json */
 
+static int meta_data_to_json (char *buffer, size_t buffer_size, /* {{{ */
+    meta_data_t *meta)
+{
+  size_t offset = 0;
+  char **keys = NULL;
+  int keys_num;
+  int status;
+  int i;
+
+  memset (buffer, 0, buffer_size);
+
+#define BUFFER_ADD(...) do { \
+  status = ssnprintf (buffer + offset, buffer_size - offset, \
+      __VA_ARGS__); \
+  if (status < 1) \
+    return (-1); \
+  else if (((size_t) status) >= (buffer_size - offset)) \
+    return (-ENOMEM); \
+  else \
+    offset += ((size_t) status); \
+} while (0)
+
+  keys_num = meta_data_toc (meta, &keys);
+  for (i = 0; i < keys_num; ++i)
+  {
+    int type;
+    char *key = keys[i];
+
+    type = meta_data_type (meta, key);
+    if (type == MD_TYPE_STRING)
+    {
+      char *value = NULL;
+      if (meta_data_get_string (meta, key, &value) == 0)
+      {
+        char temp[512] = "";
+        escape_string (temp, sizeof (temp), value);
+        sfree (value);
+        BUFFER_ADD (",\"%s\":%s", key, temp);
+      }
+    }
+    else if (type == MD_TYPE_SIGNED_INT)
+    {
+      int64_t value = 0;
+      if (meta_data_get_signed_int (meta, key, &value) == 0)
+        BUFFER_ADD (",\"%s\":%"PRIi64, key, value);
+    }
+    else if (type == MD_TYPE_UNSIGNED_INT)
+    {
+      uint64_t value = 0;
+      if (meta_data_get_unsigned_int (meta, key, &value) == 0)
+        BUFFER_ADD (",\"%s\":%"PRIu64, key, value);
+    }
+    else if (type == MD_TYPE_DOUBLE)
+    {
+      double value = 0.0;
+      if (meta_data_get_double (meta, key, &value) == 0)
+        BUFFER_ADD (",\"%s\":%f", key, value);
+    }
+    else if (type == MD_TYPE_BOOLEAN)
+    {
+      _Bool value = 0;
+      if (meta_data_get_boolean (meta, key, &value) == 0)
+        BUFFER_ADD (",\"%s\":%s", key, value ? "true" : "false");
+    }
+
+    free (key);
+  } /* for (keys) */
+  free (keys);
+
+  if (offset <= 0)
+    return (ENOENT);
+
+  buffer[0] = '{'; /* replace leading ',' */
+  BUFFER_ADD ("}");
+
+#undef BUFFER_ADD
+
+  return (0);
+} /* int meta_data_to_json */
+
 static int value_list_to_json (char *buffer, size_t buffer_size, /* {{{ */
                 const data_set_t *ds, const value_list_t *vl, int store_rates)
 {
@@ -254,12 +334,12 @@ static int value_list_to_json (char *buffer, size_t buffer_size, /* {{{ */
     return (status);
   BUFFER_ADD ("\"values\":%s", temp);
 
-  status = dstypes_to_json (temp, sizeof (temp), ds, vl);
+  status = dstypes_to_json (temp, sizeof (temp), ds);
   if (status != 0)
     return (status);
   BUFFER_ADD (",\"dstypes\":%s", temp);
 
-  status = dsnames_to_json (temp, sizeof (temp), ds, vl);
+  status = dsnames_to_json (temp, sizeof (temp), ds);
   if (status != 0)
     return (status);
   BUFFER_ADD (",\"dsnames\":%s", temp);
@@ -280,6 +360,17 @@ static int value_list_to_json (char *buffer, size_t buffer_size, /* {{{ */
   BUFFER_ADD_KEYVAL ("type", vl->type);
   BUFFER_ADD_KEYVAL ("type_instance", vl->type_instance);
 
+  if (vl->meta != NULL)
+  {
+    char meta_buffer[buffer_size];
+    memset (meta_buffer, 0, sizeof (meta_buffer));
+    status = meta_data_to_json (meta_buffer, sizeof (meta_buffer), vl->meta);
+    if (status != 0)
+      return (status);
+
+    BUFFER_ADD (",\"meta\":%s", meta_buffer);
+  } /* if (vl->meta != NULL) */
+
   BUFFER_ADD ("}");
 
 #undef BUFFER_ADD_KEYVAL
diff --git a/src/utils_vl_lookup.c b/src/utils_vl_lookup.c
new file mode 100644 (file)
index 0000000..2dada24
--- /dev/null
@@ -0,0 +1,541 @@
+/**
+ * collectd - src/utils_vl_lookup.c
+ * Copyright (C) 2012  Florian Forster
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a
+ * copy of this software and associated documentation files (the "Software"),
+ * to deal in the Software without restriction, including without limitation
+ * the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ * and/or sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ *
+ * Authors:
+ *   Florian Forster <octo at collectd.org>
+ **/
+
+#include "collectd.h"
+#include "common.h"
+#include "utils_vl_lookup.h"
+#include "utils_avltree.h"
+
+#if BUILD_TEST
+# define sstrncpy strncpy
+# define plugin_log(s, ...) do { \
+  printf ("[severity %i] ", s); \
+  printf (__VA_ARGS__); \
+  printf ("\n"); \
+} while (0)
+#endif
+
+/*
+ * Types
+ */
+struct lookup_s
+{
+  c_avl_tree_t *by_type_tree;
+
+  lookup_class_callback_t cb_user_class;
+  lookup_obj_callback_t cb_user_obj;
+  lookup_free_class_callback_t cb_free_class;
+  lookup_free_obj_callback_t cb_free_obj;
+};
+
+struct user_obj_s;
+typedef struct user_obj_s user_obj_t;
+struct user_obj_s
+{
+  void *user_obj;
+  identifier_t ident;
+
+  user_obj_t *next;
+};
+
+struct user_class_s
+{
+  void *user_class;
+  identifier_t ident;
+  user_obj_t *user_obj_list; /* list of user_obj */
+};
+typedef struct user_class_s user_class_t;
+
+struct user_class_list_s;
+typedef struct user_class_list_s user_class_list_t;
+struct user_class_list_s
+{
+  user_class_t entry;
+  user_class_list_t *next;
+};
+
+struct by_type_entry_s
+{
+  c_avl_tree_t *by_plugin_tree; /* plugin -> user_class_list_t */
+  user_class_list_t *wildcard_plugin_list;
+};
+typedef struct by_type_entry_s by_type_entry_t;
+
+/*
+ * Private functions
+ */
+static void *lu_create_user_obj (lookup_t *obj, /* {{{ */
+    data_set_t const *ds, value_list_t const *vl,
+    user_class_t *user_class)
+{
+  user_obj_t *user_obj;
+
+  user_obj = malloc (sizeof (*user_obj));
+  if (user_obj == NULL)
+  {
+    ERROR ("utils_vl_lookup: malloc failed.");
+    return (NULL);
+  }
+  memset (user_obj, 0, sizeof (*user_obj));
+  user_obj->next = NULL;
+
+  user_obj->user_obj = obj->cb_user_class (ds, vl, user_class->user_class);
+  if (user_obj->user_obj == NULL)
+  {
+    sfree (user_obj);
+    WARNING("utils_vl_lookup: User-provided constructor failed.");
+    return (NULL);
+  }
+
+  sstrncpy (user_obj->ident.host,
+    LU_IS_ALL (user_class->ident.host) ?  "/all/" : vl->host,
+    sizeof (user_obj->ident.host));
+  sstrncpy (user_obj->ident.plugin,
+    LU_IS_ALL (user_class->ident.plugin) ?  "/all/" : vl->plugin,
+    sizeof (user_obj->ident.plugin));
+  sstrncpy (user_obj->ident.plugin_instance,
+    LU_IS_ALL (user_class->ident.plugin_instance) ?  "/all/" : vl->plugin_instance,
+    sizeof (user_obj->ident.plugin_instance));
+  sstrncpy (user_obj->ident.type,
+    LU_IS_ALL (user_class->ident.type) ?  "/all/" : vl->type,
+    sizeof (user_obj->ident.type));
+  sstrncpy (user_obj->ident.type_instance,
+    LU_IS_ALL (user_class->ident.type_instance) ?  "/all/" : vl->type_instance,
+    sizeof (user_obj->ident.type_instance));
+
+  if (user_class->user_obj_list == NULL)
+  {
+    user_class->user_obj_list = user_obj;
+  }
+  else
+  {
+    user_obj_t *last = user_class->user_obj_list;
+    while (last->next != NULL)
+      last = last->next;
+    last->next = user_obj;
+  }
+
+  return (user_obj);
+} /* }}} void *lu_create_user_obj */
+
+static user_obj_t *lu_find_user_obj (user_class_t *user_class, /* {{{ */
+    value_list_t const *vl)
+{
+  user_obj_t *ptr;
+
+  for (ptr = user_class->user_obj_list;
+      ptr != NULL;
+      ptr = ptr->next)
+  {
+    if (!LU_IS_ALL (ptr->ident.host)
+        && (strcmp (ptr->ident.host, vl->host) != 0))
+      continue;
+    if (!LU_IS_ALL (ptr->ident.plugin_instance)
+        && (strcmp (ptr->ident.plugin_instance, vl->plugin_instance) != 0))
+      continue;
+    if (!LU_IS_ALL (ptr->ident.type_instance)
+        && (strcmp (ptr->ident.type_instance, vl->type_instance) != 0))
+      continue;
+
+    return (ptr);
+  }
+
+  return (NULL);
+} /* }}} user_obj_t *lu_find_user_obj */
+
+static int lu_handle_user_class (lookup_t *obj, /* {{{ */
+    data_set_t const *ds, value_list_t const *vl,
+    user_class_t *user_class)
+{
+  user_obj_t *user_obj;
+  int status;
+
+  assert (strcmp (vl->type, user_class->ident.type) == 0);
+  assert (LU_IS_WILDCARD (user_class->ident.plugin)
+      || (strcmp (vl->plugin, user_class->ident.plugin) == 0));
+
+  /* When we get here, type and plugin already match the user class. Now check
+   * the rest of the fields. */
+  if (!LU_IS_WILDCARD (user_class->ident.type_instance)
+      && (strcmp (vl->type_instance, user_class->ident.type_instance) != 0))
+    return (1);
+  if (!LU_IS_WILDCARD (user_class->ident.plugin_instance)
+      && (strcmp (vl->plugin_instance,
+          user_class->ident.plugin_instance) != 0))
+    return (1);
+  if (!LU_IS_WILDCARD (user_class->ident.host)
+      && (strcmp (vl->host, user_class->ident.host) != 0))
+    return (1);
+
+  user_obj = lu_find_user_obj (user_class, vl);
+  if (user_obj == NULL)
+  {
+    /* call lookup_class_callback_t() and insert into the list of user objects. */
+    user_obj = lu_create_user_obj (obj, ds, vl, user_class);
+    if (user_obj == NULL)
+      return (-1);
+  }
+
+  status = obj->cb_user_obj (ds, vl,
+      user_class->user_class, user_obj->user_obj);
+  if (status != 0)
+  {
+    ERROR ("utils_vl_lookup: The user object callback failed with status %i.",
+        status);
+    /* Returning a negative value means: abort! */
+    if (status < 0)
+      return (status);
+    else
+      return (1);
+  }
+
+  return (0);
+} /* }}} int lu_handle_user_class */
+
+static int lu_handle_user_class_list (lookup_t *obj, /* {{{ */
+    data_set_t const *ds, value_list_t const *vl,
+    user_class_list_t *user_class_list)
+{
+  user_class_list_t *ptr;
+  int retval = 0;
+  
+  for (ptr = user_class_list; ptr != NULL; ptr = ptr->next)
+  {
+    int status;
+
+    status = lu_handle_user_class (obj, ds, vl, &ptr->entry);
+    if (status < 0)
+      return (status);
+    else if (status == 0)
+      retval++;
+  }
+
+  return (retval);
+} /* }}} int lu_handle_user_class_list */
+
+static by_type_entry_t *lu_search_by_type (lookup_t *obj, /* {{{ */
+    char const *type, _Bool allocate_if_missing)
+{
+  by_type_entry_t *by_type;
+  char *type_copy;
+  int status;
+
+  status = c_avl_get (obj->by_type_tree, type, (void *) &by_type);
+  if (status == 0)
+    return (by_type);
+
+  if (!allocate_if_missing)
+    return (NULL);
+
+  type_copy = strdup (type);
+  if (type_copy == NULL)
+  {
+    ERROR ("utils_vl_lookup: strdup failed.");
+    return (NULL);
+  }
+
+  by_type = malloc (sizeof (*by_type));
+  if (by_type == NULL)
+  {
+    ERROR ("utils_vl_lookup: malloc failed.");
+    sfree (type_copy);
+    return (NULL);
+  }
+  memset (by_type, 0, sizeof (*by_type));
+  by_type->wildcard_plugin_list = NULL;
+  
+  by_type->by_plugin_tree = c_avl_create ((void *) strcmp);
+  if (by_type->by_plugin_tree == NULL)
+  {
+    ERROR ("utils_vl_lookup: c_avl_create failed.");
+    sfree (by_type);
+    sfree (type_copy);
+    return (NULL);
+  }
+
+  status = c_avl_insert (obj->by_type_tree,
+      /* key = */ type_copy, /* value = */ by_type);
+  assert (status <= 0); /* >0 => entry exists => race condition. */
+  if (status != 0)
+  {
+    ERROR ("utils_vl_lookup: c_avl_insert failed.");
+    c_avl_destroy (by_type->by_plugin_tree);
+    sfree (by_type);
+    sfree (type_copy);
+    return (NULL);
+  }
+  
+  return (by_type);
+} /* }}} by_type_entry_t *lu_search_by_type */
+
+static int lu_add_by_plugin (by_type_entry_t *by_type, /* {{{ */
+    identifier_t const *ident, user_class_list_t *user_class_list)
+{
+  user_class_list_t *ptr = NULL;
+
+  /* Lookup user_class_list from the per-plugin structure. If this is the first
+   * user_class to be added, the blocks return immediately. Otherwise they will
+   * set "ptr" to non-NULL. */
+  if (LU_IS_WILDCARD (ident->plugin))
+  {
+    if (by_type->wildcard_plugin_list == NULL)
+    {
+      by_type->wildcard_plugin_list = user_class_list;
+      return (0);
+    }
+
+    ptr = by_type->wildcard_plugin_list;
+  } /* if (plugin is wildcard) */
+  else /* (plugin is not wildcard) */
+  {
+    int status;
+
+    status = c_avl_get (by_type->by_plugin_tree,
+        ident->plugin, (void *) &ptr);
+
+    if (status != 0) /* plugin not yet in tree */
+    {
+      char *plugin_copy = strdup (ident->plugin);
+
+      if (plugin_copy == NULL)
+      {
+        ERROR ("utils_vl_lookup: strdup failed.");
+        sfree (user_class_list);
+        return (ENOMEM);
+      }
+
+      status = c_avl_insert (by_type->by_plugin_tree,
+          plugin_copy, user_class_list);
+      if (status != 0)
+      {
+        ERROR ("utils_vl_lookup: c_avl_insert(\"%s\") failed with status %i.",
+            plugin_copy, status);
+        sfree (plugin_copy);
+        sfree (user_class_list);
+        return (status);
+      }
+      else
+      {
+        return (0);
+      }
+    } /* if (plugin not yet in tree) */
+  } /* if (plugin is not wildcard) */
+
+  assert (ptr != NULL);
+
+  while (ptr->next != NULL)
+    ptr = ptr->next;
+  ptr->next = user_class_list;
+
+  return (0);
+} /* }}} int lu_add_by_plugin */
+
+static void lu_destroy_user_obj (lookup_t *obj, /* {{{ */
+    user_obj_t *user_obj)
+{
+  while (user_obj != NULL)
+  {
+    user_obj_t *next = user_obj->next;
+
+    if (obj->cb_free_obj != NULL)
+      obj->cb_free_obj (user_obj->user_obj);
+    user_obj->user_obj = NULL;
+
+    sfree (user_obj);
+    user_obj = next;
+  }
+} /* }}} void lu_destroy_user_obj */
+
+static void lu_destroy_user_class_list (lookup_t *obj, /* {{{ */
+    user_class_list_t *user_class_list)
+{
+  while (user_class_list != NULL)
+  {
+    user_class_list_t *next = user_class_list->next;
+
+    if (obj->cb_free_class != NULL)
+      obj->cb_free_class (user_class_list->entry.user_class);
+    user_class_list->entry.user_class = NULL;
+
+    lu_destroy_user_obj (obj, user_class_list->entry.user_obj_list);
+    user_class_list->entry.user_obj_list = NULL;
+
+    sfree (user_class_list);
+    user_class_list = next;
+  }
+} /* }}} void lu_destroy_user_class_list */
+
+static void lu_destroy_by_type (lookup_t *obj, /* {{{ */
+    by_type_entry_t *by_type)
+{
+  
+  while (42)
+  {
+    char *plugin = NULL;
+    user_class_list_t *user_class_list = NULL;
+    int status;
+
+    status = c_avl_pick (by_type->by_plugin_tree,
+        (void *) &plugin, (void *) &user_class_list);
+    if (status != 0)
+      break;
+
+    DEBUG ("utils_vl_lookup: lu_destroy_by_type: Destroying plugin \"%s\".",
+        plugin);
+    sfree (plugin);
+    lu_destroy_user_class_list (obj, user_class_list);
+  }
+
+  c_avl_destroy (by_type->by_plugin_tree);
+  by_type->by_plugin_tree = NULL;
+
+  lu_destroy_user_class_list (obj, by_type->wildcard_plugin_list);
+  by_type->wildcard_plugin_list = NULL;
+
+  sfree (by_type);
+} /* }}} int lu_destroy_by_type */
+
+/*
+ * Public functions
+ */
+lookup_t *lookup_create (lookup_class_callback_t cb_user_class, /* {{{ */
+    lookup_obj_callback_t cb_user_obj,
+    lookup_free_class_callback_t cb_free_class,
+    lookup_free_obj_callback_t cb_free_obj)
+{
+  lookup_t *obj = malloc (sizeof (*obj));
+  if (obj == NULL)
+  {
+    ERROR ("utils_vl_lookup: malloc failed.");
+    return (NULL);
+  }
+  memset (obj, 0, sizeof (*obj));
+
+  obj->by_type_tree = c_avl_create ((void *) strcmp);
+  if (obj->by_type_tree == NULL)
+  {
+    ERROR ("utils_vl_lookup: c_avl_create failed.");
+    sfree (obj);
+    return (NULL);
+  }
+
+  obj->cb_user_class = cb_user_class;
+  obj->cb_user_obj = cb_user_obj;
+  obj->cb_free_class = cb_free_class;
+  obj->cb_free_obj = cb_free_obj;
+
+  return (obj);
+} /* }}} lookup_t *lookup_create */
+
+void lookup_destroy (lookup_t *obj) /* {{{ */
+{
+  int status;
+
+  if (obj == NULL)
+    return;
+
+  while (42)
+  {
+    char *type = NULL;
+    by_type_entry_t *by_type = NULL;
+
+    status = c_avl_pick (obj->by_type_tree, (void *) &type, (void *) &by_type);
+    if (status != 0)
+      break;
+
+    DEBUG ("utils_vl_lookup: lookup_destroy: Destroying type \"%s\".", type);
+    sfree (type);
+    lu_destroy_by_type (obj, by_type);
+  }
+
+  c_avl_destroy (obj->by_type_tree);
+  obj->by_type_tree = NULL;
+
+  sfree (obj);
+} /* }}} void lookup_destroy */
+
+int lookup_add (lookup_t *obj, /* {{{ */
+    identifier_t const *ident, void *user_class)
+{
+  by_type_entry_t *by_type = NULL;
+  user_class_list_t *user_class_obj;
+
+  by_type = lu_search_by_type (obj, ident->type, /* allocate = */ 1);
+  if (by_type == NULL)
+    return (-1);
+
+  user_class_obj = malloc (sizeof (*user_class_obj));
+  if (user_class_obj == NULL)
+  {
+    ERROR ("utils_vl_lookup: malloc failed.");
+    return (ENOMEM);
+  }
+  memset (user_class_obj, 0, sizeof (*user_class_obj));
+  user_class_obj->entry.user_class = user_class;
+  memmove (&user_class_obj->entry.ident, ident, sizeof (*ident));
+  user_class_obj->entry.user_obj_list = NULL;
+  user_class_obj->next = NULL;
+
+  return (lu_add_by_plugin (by_type, ident, user_class_obj));
+} /* }}} int lookup_add */
+
+/* returns the number of successful calls to the callback function */
+int lookup_search (lookup_t *obj, /* {{{ */
+    data_set_t const *ds, value_list_t const *vl)
+{
+  by_type_entry_t *by_type = NULL;
+  user_class_list_t *user_class_list = NULL;
+  int retval = 0;
+  int status;
+
+  if ((obj == NULL) || (ds == NULL) || (vl == NULL))
+    return (-EINVAL);
+
+  by_type = lu_search_by_type (obj, vl->type, /* allocate = */ 0);
+  if (by_type == NULL)
+    return (0);
+
+  status = c_avl_get (by_type->by_plugin_tree,
+      vl->plugin, (void *) &user_class_list);
+  if (status == 0)
+  {
+    status = lu_handle_user_class_list (obj, ds, vl, user_class_list);
+    if (status < 0)
+      return (status);
+    retval += status;
+  }
+
+  if (by_type->wildcard_plugin_list != NULL)
+  {
+    status = lu_handle_user_class_list (obj, ds, vl,
+        by_type->wildcard_plugin_list);
+    if (status < 0)
+      return (status);
+    retval += status;
+  }
+    
+  return (retval);
+} /* }}} lookup_search */
diff --git a/src/utils_vl_lookup.h b/src/utils_vl_lookup.h
new file mode 100644 (file)
index 0000000..c006fc7
--- /dev/null
@@ -0,0 +1,90 @@
+/**
+ * collectd - src/utils_vl_lookup.h
+ * Copyright (C) 2012  Florian Forster
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a
+ * copy of this software and associated documentation files (the "Software"),
+ * to deal in the Software without restriction, including without limitation
+ * the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ * and/or sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ *
+ * Authors:
+ *   Florian Forster <octo at collectd.org>
+ **/
+
+#ifndef UTILS_VL_LOOKUP_H
+#define UTILS_VL_LOOKUP_H 1
+
+#include "plugin.h"
+
+/*
+ * Types
+ */
+struct lookup_s;
+typedef struct lookup_s lookup_t;
+
+/* Given a user_class, constructs a new user_obj. */
+typedef void *(*lookup_class_callback_t) (data_set_t const *ds,
+    value_list_t const *vl, void *user_class);
+
+/* Given a user_class and a ds/vl combination, does stuff with the data.
+ * This is the main working horse of the module. */
+typedef int (*lookup_obj_callback_t) (data_set_t const *ds,
+    value_list_t const *vl,
+    void *user_class, void *user_obj);
+
+/* Used to free user_class pointers. May be NULL in which case nothing is
+ * freed. */
+typedef void (*lookup_free_class_callback_t) (void *user_class);
+
+/* Used to free user_obj pointers. May be NULL in which case nothing is
+ * freed. */
+typedef void (*lookup_free_obj_callback_t) (void *user_obj);
+
+struct identifier_s
+{
+  char host[DATA_MAX_NAME_LEN];
+  char plugin[DATA_MAX_NAME_LEN];
+  char plugin_instance[DATA_MAX_NAME_LEN];
+  char type[DATA_MAX_NAME_LEN];
+  char type_instance[DATA_MAX_NAME_LEN];
+};
+typedef struct identifier_s identifier_t;
+
+#define LU_ANY "/any/"
+#define LU_ALL "/all/"
+
+#define LU_IS_ANY(str) (strcmp (str, LU_ANY) == 0)
+#define LU_IS_ALL(str) (strcmp (str, LU_ALL) == 0)
+#define LU_IS_WILDCARD(str) (LU_IS_ANY(str) || LU_IS_ALL(str))
+
+/*
+ * Functions
+ */
+__attribute__((nonnull(1,2)))
+lookup_t *lookup_create (lookup_class_callback_t,
+    lookup_obj_callback_t,
+    lookup_free_class_callback_t,
+    lookup_free_obj_callback_t);
+void lookup_destroy (lookup_t *obj);
+
+int lookup_add (lookup_t *obj,
+    identifier_t const *ident, void *user_class);
+
+/* TODO(octo): Pass lookup_obj_callback_t to lookup_search()? */
+int lookup_search (lookup_t *obj,
+    data_set_t const *ds, value_list_t const *vl);
+
+#endif /* UTILS_VL_LOOKUP_H */
diff --git a/src/utils_vl_lookup_test.c b/src/utils_vl_lookup_test.c
new file mode 100644 (file)
index 0000000..6265b32
--- /dev/null
@@ -0,0 +1,214 @@
+/**
+ * collectd - src/utils_vl_lookup_test.c
+ * Copyright (C) 2012  Florian Forster
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a
+ * copy of this software and associated documentation files (the "Software"),
+ * to deal in the Software without restriction, including without limitation
+ * the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ * and/or sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ *
+ * Authors:
+ *   Florian Forster <octo at collectd.org>
+ **/
+
+#include "collectd.h"
+#include "utils_vl_lookup.h"
+
+static _Bool expect_new_obj = 0;
+static _Bool have_new_obj = 0;
+
+static identifier_t last_class_ident;
+static identifier_t last_obj_ident;
+
+static data_source_t dsrc_test = { "value", DS_TYPE_DERIVE, 0.0, NAN };
+static data_set_t const ds_test = { "test", 1, &dsrc_test };
+
+static data_source_t dsrc_unknown = { "value", DS_TYPE_DERIVE, 0.0, NAN };
+static data_set_t const ds_unknown = { "unknown", 1, &dsrc_unknown };
+
+static int lookup_obj_callback (data_set_t const *ds,
+    value_list_t const *vl,
+    void *user_class, void *user_obj)
+{
+  identifier_t *class = user_class;
+  identifier_t *obj = user_obj;
+
+  assert (expect_new_obj == have_new_obj);
+
+  memcpy (&last_class_ident, class, sizeof (last_class_ident));
+  memcpy (&last_obj_ident, obj, sizeof (last_obj_ident));
+
+  if (strcmp (obj->plugin_instance, "failure") == 0)
+    return (-1);
+
+  return (0);
+}
+
+static void *lookup_class_callback (data_set_t const *ds,
+    value_list_t const *vl, void *user_class)
+{
+  identifier_t *class = user_class;
+  identifier_t *obj;
+
+  assert (expect_new_obj);
+
+  memcpy (&last_class_ident, class, sizeof (last_class_ident));
+  
+  obj = malloc (sizeof (*obj));
+  strncpy (obj->host, vl->host, sizeof (obj->host));
+  strncpy (obj->plugin, vl->plugin, sizeof (obj->plugin));
+  strncpy (obj->plugin_instance, vl->plugin_instance, sizeof (obj->plugin_instance));
+  strncpy (obj->type, vl->type, sizeof (obj->type));
+  strncpy (obj->type_instance, vl->type_instance, sizeof (obj->type_instance));
+
+  have_new_obj = 1;
+
+  return ((void *) obj);
+}
+
+static void checked_lookup_add (lookup_t *obj, /* {{{ */
+    char const *host,
+    char const *plugin, char const *plugin_instance,
+    char const *type, char const *type_instance)
+{
+  identifier_t ident;
+  void *user_class;
+  int status;
+
+  memset (&ident, 0, sizeof (ident));
+  strncpy (ident.host, host, sizeof (ident.host));
+  strncpy (ident.plugin, plugin, sizeof (ident.plugin));
+  strncpy (ident.plugin_instance, plugin_instance, sizeof (ident.plugin_instance));
+  strncpy (ident.type, type, sizeof (ident.type));
+  strncpy (ident.type_instance, type_instance, sizeof (ident.type_instance));
+
+  user_class = malloc (sizeof (ident));
+  memmove (user_class, &ident, sizeof (ident));
+
+  status = lookup_add (obj, &ident, user_class);
+  assert (status == 0);
+} /* }}} void test_add */
+
+static int checked_lookup_search (lookup_t *obj,
+    char const *host,
+    char const *plugin, char const *plugin_instance,
+    char const *type, char const *type_instance,
+    _Bool expect_new)
+{
+  int status;
+  value_list_t vl = VALUE_LIST_STATIC;
+  data_set_t const *ds = &ds_unknown;
+
+  strncpy (vl.host, host, sizeof (vl.host));
+  strncpy (vl.plugin, plugin, sizeof (vl.plugin));
+  strncpy (vl.plugin_instance, plugin_instance, sizeof (vl.plugin_instance));
+  strncpy (vl.type, type, sizeof (vl.type));
+  strncpy (vl.type_instance, type_instance, sizeof (vl.type_instance));
+
+  if (strcmp (vl.type, "test") == 0)
+    ds = &ds_test;
+
+  expect_new_obj = expect_new;
+  have_new_obj = 0;
+
+  status = lookup_search (obj, ds, &vl);
+  return (status);
+}
+
+static lookup_t *checked_lookup_create (void)
+{
+  lookup_t *obj = lookup_create (
+      lookup_class_callback,
+      lookup_obj_callback,
+      (void *) free,
+      (void *) free);
+  assert (obj != NULL);
+  return (obj);
+}
+
+static void testcase0 (void)
+{
+  lookup_t *obj = checked_lookup_create ();
+
+  checked_lookup_add (obj, "/any/", "test", "", "test", "/all/");
+  checked_lookup_search (obj, "host0", "test", "", "test", "0",
+      /* expect new = */ 1);
+  checked_lookup_search (obj, "host0", "test", "", "test", "1",
+      /* expect new = */ 0);
+  checked_lookup_search (obj, "host1", "test", "", "test", "0",
+      /* expect new = */ 1);
+  checked_lookup_search (obj, "host1", "test", "", "test", "1",
+      /* expect new = */ 0);
+
+  lookup_destroy (obj);
+}
+
+static void testcase1 (void)
+{
+  lookup_t *obj = checked_lookup_create ();
+
+  checked_lookup_add (obj, "/any/", "/all/", "/all/", "test", "/all/");
+  checked_lookup_search (obj, "host0", "plugin0", "", "test", "0",
+      /* expect new = */ 1);
+  checked_lookup_search (obj, "host0", "plugin0", "", "test", "1",
+      /* expect new = */ 0);
+  checked_lookup_search (obj, "host0", "plugin1", "", "test", "0",
+      /* expect new = */ 0);
+  checked_lookup_search (obj, "host0", "plugin1", "", "test", "1",
+      /* expect new = */ 0);
+  checked_lookup_search (obj, "host1", "plugin0", "", "test", "0",
+      /* expect new = */ 1);
+  checked_lookup_search (obj, "host1", "plugin0", "", "test", "1",
+      /* expect new = */ 0);
+  checked_lookup_search (obj, "host1", "plugin1", "", "test", "0",
+      /* expect new = */ 0);
+  checked_lookup_search (obj, "host1", "plugin1", "", "test", "1",
+      /* expect new = */ 0);
+
+  lookup_destroy (obj);
+}
+
+static void testcase2 (void)
+{
+  lookup_t *obj = checked_lookup_create ();
+  int status;
+
+  checked_lookup_add (obj, "/any/", "plugin0", "", "test", "/all/");
+  checked_lookup_add (obj, "/any/", "/all/", "", "test", "ti0");
+
+  status = checked_lookup_search (obj, "host0", "plugin1", "", "test", "",
+      /* expect new = */ 0);
+  assert (status == 0);
+  status = checked_lookup_search (obj, "host0", "plugin0", "", "test", "",
+      /* expect new = */ 1);
+  assert (status == 1);
+  status = checked_lookup_search (obj, "host0", "plugin1", "", "test", "ti0",
+      /* expect new = */ 1);
+  assert (status == 1);
+  status = checked_lookup_search (obj, "host0", "plugin0", "", "test", "ti0",
+      /* expect new = */ 0);
+  assert (status == 2);
+
+  lookup_destroy (obj);
+}
+
+int main (int argc, char **argv) /* {{{ */
+{
+  testcase0 ();
+  testcase1 ();
+  testcase2 ();
+  return (EXIT_SUCCESS);
+} /* }}} int main */
index d6583a7..2ae30ef 100644 (file)
@@ -47,6 +47,7 @@
 
 #include "utils_cache.h"
 #include "utils_parse_option.h"
+#include "utils_format_graphite.h"
 
 /* Folks without pthread will need to disable this plugin. */
 #include <pthread.h>
@@ -284,175 +285,12 @@ static int wg_flush (cdtime_t timeout,
     return (status);
 }
 
-static int wg_format_values (char *ret, size_t ret_len,
-        int ds_num, const data_set_t *ds, const value_list_t *vl,
-        _Bool store_rates)
-{
-    size_t offset = 0;
-    int status;
-    gauge_t *rates = NULL;
-
-    assert (0 == strcmp (ds->type, vl->type));
-
-    memset (ret, 0, ret_len);
-
-#define BUFFER_ADD(...) do { \
-    status = ssnprintf (ret + offset, ret_len - offset, \
-            __VA_ARGS__); \
-    if (status < 1) \
-    { \
-        sfree (rates); \
-        return (-1); \
-    } \
-    else if (((size_t) status) >= (ret_len - offset)) \
-    { \
-        sfree (rates); \
-        return (-1); \
-    } \
-    else \
-    offset += ((size_t) status); \
-} while (0)
-
-    if (ds->ds[ds_num].type == DS_TYPE_GAUGE)
-        BUFFER_ADD ("%f", vl->values[ds_num].gauge);
-    else if (store_rates)
-    {
-        if (rates == NULL)
-            rates = uc_get_rate (ds, vl);
-        if (rates == NULL)
-        {
-            WARNING ("format_values: "
-                    "uc_get_rate failed.");
-            return (-1);
-        }
-        BUFFER_ADD ("%g", rates[ds_num]);
-    }
-    else if (ds->ds[ds_num].type == DS_TYPE_COUNTER)
-        BUFFER_ADD ("%llu", vl->values[ds_num].counter);
-    else if (ds->ds[ds_num].type == DS_TYPE_DERIVE)
-        BUFFER_ADD ("%"PRIi64, vl->values[ds_num].derive);
-    else if (ds->ds[ds_num].type == DS_TYPE_ABSOLUTE)
-        BUFFER_ADD ("%"PRIu64, vl->values[ds_num].absolute);
-    else
-    {
-        ERROR ("format_values plugin: Unknown data source type: %i",
-                ds->ds[ds_num].type);
-        sfree (rates);
-        return (-1);
-    }
-
-#undef BUFFER_ADD
-
-    sfree (rates);
-    return (0);
-}
-
-static void wg_copy_escape_part (char *dst, const char *src, size_t dst_len,
-    char escape_char)
-{
-    size_t i;
-
-    memset (dst, 0, dst_len);
-
-    if (src == NULL)
-        return;
-
-    for (i = 0; i < dst_len; i++)
-    {
-        if (src[i] == 0)
-        {
-            dst[i] = 0;
-            break;
-        }
-
-        if ((src[i] == '.')
-                || isspace ((int) src[i])
-                || iscntrl ((int) src[i]))
-            dst[i] = escape_char;
-        else
-            dst[i] = src[i];
-    }
-}
-
-static int wg_format_name (char *ret, int ret_len,
-        const value_list_t *vl,
-        const struct wg_callback *cb,
-        const char *ds_name)
-{
-    char n_host[DATA_MAX_NAME_LEN];
-    char n_plugin[DATA_MAX_NAME_LEN];
-    char n_plugin_instance[DATA_MAX_NAME_LEN];
-    char n_type[DATA_MAX_NAME_LEN];
-    char n_type_instance[DATA_MAX_NAME_LEN];
-
-    char *prefix;
-    char *postfix;
-
-    char tmp_plugin[2 * DATA_MAX_NAME_LEN + 1];
-    char tmp_type[2 * DATA_MAX_NAME_LEN + 1];
-
-    prefix = cb->prefix;
-    if (prefix == NULL)
-        prefix = "";
-
-    postfix = cb->postfix;
-    if (postfix == NULL)
-        postfix = "";
-
-    wg_copy_escape_part (n_host, vl->host,
-            sizeof (n_host), cb->escape_char);
-    wg_copy_escape_part (n_plugin, vl->plugin,
-            sizeof (n_plugin), cb->escape_char);
-    wg_copy_escape_part (n_plugin_instance, vl->plugin_instance,
-            sizeof (n_plugin_instance), cb->escape_char);
-    wg_copy_escape_part (n_type, vl->type,
-            sizeof (n_type), cb->escape_char);
-    wg_copy_escape_part (n_type_instance, vl->type_instance,
-            sizeof (n_type_instance), cb->escape_char);
-
-    if (n_plugin_instance[0] != '\0')
-        ssnprintf (tmp_plugin, sizeof (tmp_plugin), "%s%c%s",
-            n_plugin,
-            cb->separate_instances ? '.' : '-',
-            n_plugin_instance);
-    else
-        sstrncpy (tmp_plugin, n_plugin, sizeof (tmp_plugin));
-
-    if (n_type_instance[0] != '\0')
-        ssnprintf (tmp_type, sizeof (tmp_type), "%s%c%s",
-            n_type,
-            cb->separate_instances ? '.' : '-',
-            n_type_instance);
-    else
-        sstrncpy (tmp_type, n_type, sizeof (tmp_type));
-
-    if (ds_name != NULL)
-        ssnprintf (ret, ret_len, "%s%s%s.%s.%s.%s",
-            prefix, n_host, postfix, tmp_plugin, tmp_type, ds_name);
-    else
-        ssnprintf (ret, ret_len, "%s%s%s.%s.%s",
-            prefix, n_host, postfix, tmp_plugin, tmp_type);
-
-    return (0);
-}
-
-static int wg_send_message (const char* key, const char* value,
-        cdtime_t time, struct wg_callback *cb)
+static int wg_send_message (char const *message, struct wg_callback *cb)
 {
     int status;
     size_t message_len;
-    char message[1024];
-
-    message_len = (size_t) ssnprintf (message, sizeof (message),
-            "%s %s %u\r\n",
-            key,
-            value,
-            (unsigned int) CDTIME_T_TO_TIME_T (time));
-    if (message_len >= sizeof (message)) {
-        ERROR ("write_graphite plugin: message buffer too small: "
-                "Need %zu bytes.", message_len + 1);
-        return (-1);
-    }
+
+    message_len = strlen (message);
 
     pthread_mutex_lock (&cb->send_lock);
 
@@ -502,10 +340,8 @@ static int wg_send_message (const char* key, const char* value,
 static int wg_write_messages (const data_set_t *ds, const value_list_t *vl,
         struct wg_callback *cb)
 {
-    char key[10*DATA_MAX_NAME_LEN];
-    char values[512];
-
-    int status, i;
+    char buffer[4096];
+    int status;
 
     if (0 != strcmp (ds->type, vl->type))
     {
@@ -514,45 +350,22 @@ static int wg_write_messages (const data_set_t *ds, const value_list_t *vl,
         return -1;
     }
 
-    for (i = 0; i < ds->ds_num; i++)
-    {
-        const char *ds_name = NULL;
-
-        if (cb->always_append_ds || (ds->ds_num > 1))
-            ds_name = ds->ds[i].name;
-
-        /* Copy the identifier to `key' and escape it. */
-        status = wg_format_name (key, sizeof (key), vl, cb, ds_name);
-        if (status != 0)
-        {
-            ERROR ("write_graphite plugin: error with format_name");
-            return (status);
-        }
-
-        escape_string (key, sizeof (key));
-        /* Convert the values to an ASCII representation and put that into
-         * `values'. */
-        status = wg_format_values (values, sizeof (values), i, ds, vl,
-                    cb->store_rates);
-        if (status != 0)
-        {
-            ERROR ("write_graphite plugin: error with "
-                    "wg_format_values");
-            return (status);
-        }
+    memset (buffer, 0, sizeof (buffer));
+    status = format_graphite (buffer, sizeof (buffer), ds, vl,
+            cb->prefix, cb->postfix, cb->escape_char, cb->store_rates);
+    if (status != 0) /* error message has been printed already. */
+        return (status);
 
-        /* Send the message to graphite */
-        status = wg_send_message (key, values, vl->time, cb);
-        if (status != 0)
-        {
-            ERROR ("write_graphite plugin: error with "
-                    "wg_send_message");
-            return (status);
-        }
+    wg_send_message (buffer, cb);
+    if (status != 0)
+    {
+        ERROR ("write_graphite plugin: wg_send_message failed "
+                "with status %i.", status);
+        return (status);
     }
 
     return (0);
-}
+} /* int wg_write_messages */
 
 static int wg_write (const data_set_t *ds, const value_list_t *vl,
         user_data_t *user_data)