Merge branch 'ff/highres'
authorFlorian Forster <octo@leeloo.lan.home.verplant.org>
Wed, 17 Nov 2010 14:18:03 +0000 (15:18 +0100)
committerFlorian Forster <octo@leeloo.lan.home.verplant.org>
Wed, 17 Nov 2010 14:20:11 +0000 (15:20 +0100)
Conflicts:
src/netapp.c

25 files changed:
AUTHORS
README
configure.in
contrib/collection3/etc/collection.conf
contrib/collection3/lib/Collectd/Graph/Common.pm
src/Makefile.am
src/amqp.c [new file with mode: 0644]
src/collectd-python.pod
src/collectd.conf.in
src/collectd.conf.pod
src/collectd.h
src/curl_json.c
src/lpar.c [new file with mode: 0644]
src/match_value.c
src/netapp.c
src/network.c
src/notify_email.c
src/python.c
src/target_v5upgrade.c
src/types.db
src/unixsock.c
src/utils_cache.c
src/utils_match.c
src/utils_tail.c
src/zfs_arc.c

diff --git a/AUTHORS b/AUTHORS
index e83c2f8..c57f90b 100644 (file)
--- a/AUTHORS
+++ b/AUTHORS
@@ -34,6 +34,10 @@ Anthony Gialluca <tonyabg at charter.net>
 Antony Dovgal <tony at daylessday.org>
  - memcached plugin.
 
+Aurélien Reynaud <collectd at wattapower.net>
+ - LPAR plugin.
+ - Various fixes for AIX, HP-UX and Solaris.
+
 Bruno Prémont <bonbons at linux-vserver.org>
  - BIND plugin.
  - Many bugreports and -fixes in various plugins,
@@ -162,6 +166,9 @@ Rodolphe Quiédeville <rquiedeville at bearstech.com>
 Scott Garrett <sgarrett at technomancer.com>
  - tape plugin.
 
+Sebastien Pahl <sebastien.pahl at dotcloud.com>
+ - AMQP plugin.
+
 Simon Kuhnle <simon at blarzwurst.de>
  - OpenBSD code for the cpu and memory plugins.
 
diff --git a/README b/README
index 0c7a422..2ed8934 100644 (file)
--- a/README
+++ b/README
@@ -125,6 +125,10 @@ Features
     - load
       System load average over the last 1, 5 and 15 minutes.
 
+    - lpar
+      Detailed CPU statistics of the “Logical Partitions” virtualization
+      technique built into IBM's POWER processors.
+
     - libvirt
       CPU, disk and network I/O statistics from virtual machines.
 
@@ -315,6 +319,10 @@ Features
   * Output can be written or sent to various destinations by the following
     plugins:
 
+    - amqp
+      Sends JSON-encoded data to an Advanced Message Queuing Protocol (AMQP)
+      server, such as RabbitMQ.
+
     - csv
       Write to comma separated values (CSV) files. This needs lots of
       diskspace but is extremely portable and can be analysed with almost
@@ -597,6 +605,9 @@ Prerequisites
     Used to capture packets by the `dns' plugin.
     <http://www.tcpdump.org/>
 
+  * libperfstat (optional)
+    Used by various plugins to gather statistics under AIX.
+
   * libperl (optional)
     Obviously used by the `perl' plugin. The library has to be compiled with
     ithread support (introduced in Perl 5.6.0).
@@ -615,6 +626,10 @@ Prerequisites
     Used by the `python' plugin. Currently, only 2.3 ≦ Python < 3 is supported.
     <http://www.python.org/>
 
+  * librabbitmq (optional; also called “rabbitmq-c”)
+    Used by the AMQP plugin for AMQP connections, for example to RabbitMQ.
+    <http://hg.rabbitmq.com/rabbitmq-c/>
+
   * librouteros (optional)
     Used by the `routeros' plugin to connect to a device running `RouterOS'.
     <http://verplant.org/librouteros/>
index 48942b4..fd6a257 100644 (file)
@@ -91,6 +91,10 @@ if test "x$ac_system" = "xSolaris"
 then
        AC_DEFINE(_POSIX_PTHREAD_SEMANTICS, 1, [Define to enforce POSIX thread semantics under Solaris.])
 fi
+if test "x$ac_system" = "xAIX"
+then
+       AC_DEFINE(_THREAD_SAFE_ERRNO, 1, [Define to use the thread-safe version of errno under AIX.])
+fi
 
 # Where to install .pc files.
 pkgconfigdir="${libdir}/pkgconfig"
@@ -1204,6 +1208,12 @@ fi
 if test "x$with_perfstat" = "xyes"
 then
         AC_DEFINE(HAVE_PERFSTAT, 1, [Define to 1 if you have the 'perfstat' library (-lperfstat)])
+        # struct members pertaining to donation have been added to libperfstat somewhere between AIX5.3ML5 and AIX5.3ML9
+        AC_CHECK_MEMBER([perfstat_partition_type_t.b.donate_enabled], [], [], [[#include <libperfstat.h]])
+        if test "x$av_cv_member_perfstat_partition_type_t_b_donate_enabled" = "xyes"
+        then
+               AC_DEFINE(PERFSTAT_SUPPORTS_DONATION, 1, [Define to 1 if your version of the 'perfstat' library supports donation])
+        fi
 fi
 AM_CONDITIONAL(BUILD_WITH_PERFSTAT, test "x$with_perfstat" = "xyes")
 
@@ -3120,6 +3130,57 @@ then
 fi
 # }}} --with-python
 
+# --with-librabbitmq {{{
+with_librabbitmq_cppflags=""
+with_librabbitmq_ldflags=""
+AC_ARG_WITH(librabbitmq, [AS_HELP_STRING([--with-librabbitmq@<:@=PREFIX@:>@], [Path to librabbitmq.])],
+[
+       if test "x$withval" != "xno" && test "x$withval" != "xyes"
+       then
+               with_librabbitmq_cppflags="-I$withval/include"
+               with_librabbitmq_ldflags="-L$withval/lib"
+               with_librabbitmq="yes"
+       else
+               with_librabbitmq="$withval"
+       fi
+],
+[
+       with_librabbitmq="yes"
+])
+if test "x$with_librabbitmq" = "xyes"
+then
+       SAVE_CPPFLAGS="$CPPFLAGS"
+       CPPFLAGS="$CPPFLAGS $with_librabbitmq_cppflags"
+
+       AC_CHECK_HEADERS(amqp.h, [with_librabbitmq="yes"], [with_librabbitmq="no (amqp.h not found)"])
+
+       CPPFLAGS="$SAVE_CPPFLAGS"
+fi
+if test "x$with_librabbitmq" = "xyes"
+then
+       SAVE_CPPFLAGS="$CPPFLAGS"
+       SAVE_LDFLAGS="$LDFLAGS"
+       CPPFLAGS="$CPPFLAGS $with_librabbitmq_cppflags"
+       LDFLAGS="$LDFLAGS $with_librabbitmq_ldflags"
+
+       AC_CHECK_LIB(rabbitmq, amqp_basic_publish, [with_librabbitmq="yes"], [with_librabbitmq="no (Symbol 'amqp_basic_publish' not found)"])
+
+       CPPFLAGS="$SAVE_CPPFLAGS"
+       LDFLAGS="$SAVE_LDFLAGS"
+fi
+if test "x$with_librabbitmq" = "xyes"
+then
+       BUILD_WITH_LIBRABBITMQ_CPPFLAGS="$with_librabbitmq_cppflags"
+       BUILD_WITH_LIBRABBITMQ_LDFLAGS="$with_librabbitmq_ldflags"
+       BUILD_WITH_LIBRABBITMQ_LIBS="-lrabbitmq"
+       AC_SUBST(BUILD_WITH_LIBRABBITMQ_CPPFLAGS)
+       AC_SUBST(BUILD_WITH_LIBRABBITMQ_LDFLAGS)
+       AC_SUBST(BUILD_WITH_LIBRABBITMQ_LIBS)
+       AC_DEFINE(HAVE_LIBRABBITMQ, 1, [Define if librabbitmq is present and usable.])
+fi
+AM_CONDITIONAL(BUILD_WITH_LIBRABBITMQ, test "x$with_librabbitmq" = "xyes")
+# }}}
+
 # --with-librouteros {{{
 AC_ARG_WITH(librouteros, [AS_HELP_STRING([--with-librouteros@<:@=PREFIX@:>@], [Path to librouteros.])],
 [
@@ -4473,6 +4534,7 @@ AC_ARG_ENABLE([all-plugins],
 
 m4_divert_once([HELP_ENABLE], [])
 
+AC_PLUGIN([amqp],        [$with_librabbitmq],  [AMQP output plugin])
 AC_PLUGIN([apache],      [$with_libcurl],      [Apache httpd statistics])
 AC_PLUGIN([apcups],      [yes],                [Statistics of UPSes by APC])
 AC_PLUGIN([apple_sensors], [$with_libiokit],   [Apple's hardware sensors])
@@ -4507,6 +4569,7 @@ AC_PLUGIN([java],        [$with_java],         [Embed the Java Virtual Machine])
 AC_PLUGIN([libvirt],     [$plugin_libvirt],    [Virtual machine statistics])
 AC_PLUGIN([load],        [$plugin_load],       [System load])
 AC_PLUGIN([logfile],     [yes],                [File logging plugin])
+AC_PLUGIN([lpar],        [$with_perfstat],     [AIX logical partitions statistics])
 AC_PLUGIN([madwifi],     [$have_linux_wireless_h], [Madwifi wireless statistics])
 AC_PLUGIN([match_empty_counter], [yes],        [The empty counter match])
 AC_PLUGIN([match_hashed], [yes],               [The hashed match])
@@ -4772,6 +4835,7 @@ Configuration:
     libperl . . . . . . . $with_libperl
     libpq . . . . . . . . $with_libpq
     libpthread  . . . . . $with_libpthread
+    librabbitmq . . . . . $with_librabbitmq
     librouteros . . . . . $with_librouteros
     librrd  . . . . . . . $with_librrd
     libsensors  . . . . . $with_libsensors
@@ -4796,6 +4860,7 @@ Configuration:
     perl  . . . . . . . . $with_perl_bindings
 
   Modules:
+    amqp    . . . . . . . $enable_amqp
     apache  . . . . . . . $enable_apache
     apcups  . . . . . . . $enable_apcups
     apple_sensors . . . . $enable_apple_sensors
@@ -4830,6 +4895,7 @@ Configuration:
     libvirt . . . . . . . $enable_libvirt
     load  . . . . . . . . $enable_load
     logfile . . . . . . . $enable_logfile
+    lpar... . . . . . . . $enable_lpar
     madwifi . . . . . . . $enable_madwifi
     match_empty_counter . $enable_match_empty_counter
     match_hashed  . . . . $enable_match_hashed
index 9c5e3d1..3bb3d8b 100644 (file)
@@ -567,6 +567,16 @@ GraphWidth 400
 <Type ps_cputime>
   Module PsCputime
 </Type>
+<Type ps_disk_octets>
+  Module GenericIO
+  DataSources read write
+  DSName "read Read   "
+  DSName write Written
+  RRDTitle "Process disk traffic ({instance})"
+  RRDVerticalLabel "Bytes per second"
+# RRDOptions ...
+  RRDFormat "%5.1lf%s"
+</Type>
 <Type ps_rss>
   DataSources value
   DSName value RSS
index f88c22b..c6e2508 100644 (file)
@@ -106,7 +106,9 @@ sub group_files_by_plugin_instance
   for (my $i = 0; $i < @files; $i++)
   {
     my $file = $files[$i];
-    my $key = $file->{'plugin_instance'} || '';
+    my $key1 = $file->{'hostname'} || '';
+    my $key2 = $file->{'plugin_instance'} || '';
+    my $key = "$key1-$key2";
 
     $data->{$key} ||= [];
     push (@{$data->{$key}}, $file);
index 69329bf..1a7ba5b 100644 (file)
@@ -124,6 +124,18 @@ pkglib_LTLIBRARIES =
 BUILT_SOURCES = 
 CLEANFILES = 
 
+if BUILD_PLUGIN_AMQP
+pkglib_LTLIBRARIES += amqp.la
+amqp_la_SOURCES = amqp.c \
+                 utils_cmd_putval.c utils_cmd_putval.h \
+                 utils_format_json.c utils_format_json.h
+amqp_la_LDFLAGS = -module -avoid-version $(BUILD_WITH_LIBRABBITMQ_LDFLAGS)
+amqp_la_CPPFLAGS = $(AM_CPPFLAGS) $(BUILD_WITH_LIBRABBITMQ_CPPFLAGS)
+amqp_la_LIBADD = $(BUILD_WITH_LIBRABBITMQ_LIBS)
+collectd_LDADD += "-dlopen" amqp.la
+collectd_DEPENDENCIES += amqp.la
+endif
+
 if BUILD_PLUGIN_APACHE
 pkglib_LTLIBRARIES += apache.la
 apache_la_SOURCES = apache.c
@@ -513,6 +525,15 @@ collectd_LDADD += "-dlopen" logfile.la
 collectd_DEPENDENCIES += logfile.la
 endif
 
+if BUILD_PLUGIN_LPAR
+pkglib_LTLIBRARIES += lpar.la
+lpar_la_SOURCES = lpar.c
+lpar_la_LDFLAGS = -module -avoid-version
+collectd_LDADD += "-dlopen" lpar.la
+collectd_DEPENDENCIES += lpar.la
+lpar_la_LIBADD = -lperfstat
+endif
+
 if BUILD_PLUGIN_MADWIFI
 pkglib_LTLIBRARIES += madwifi.la
 madwifi_la_SOURCES = madwifi.c madwifi.h
diff --git a/src/amqp.c b/src/amqp.c
new file mode 100644 (file)
index 0000000..f0abd44
--- /dev/null
@@ -0,0 +1,939 @@
+/**
+ * collectd - src/amqp.c
+ * Copyright (C) 2009  Sebastien Pahl
+ * Copyright (C) 2010  Florian Forster
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a
+ * copy of this software and associated documentation files (the "Software"),
+ * to deal in the Software without restriction, including without limitation
+ * the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ * and/or sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ *
+ * Authors:
+ *   Sebastien Pahl <sebastien.pahl at dotcloud.com>
+ *   Florian Forster <octo at verplant.org>
+ **/
+
+#include "collectd.h"
+#include "common.h"
+#include "plugin.h"
+#include "utils_cmd_putval.h"
+#include "utils_format_json.h"
+
+#include <pthread.h>
+
+#include <amqp.h>
+#include <amqp_framing.h>
+
+/* Defines for the delivery mode. I have no idea why they're not defined by the
+ * library.. */
+#define CAMQP_DM_VOLATILE   1
+#define CAMQP_DM_PERSISTENT 2
+
+#define CAMQP_FORMAT_COMMAND 1
+#define CAMQP_FORMAT_JSON    2
+
+#define CAMQP_CHANNEL 1
+
+/*
+ * Data types
+ */
+struct camqp_config_s
+{
+    _Bool   publish;
+    char   *name;
+
+    char   *host;
+    int     port;
+    char   *vhost;
+    char   *user;
+    char   *password;
+
+    char   *exchange;
+    char   *routing_key;
+
+    /* publish only */
+    uint8_t delivery_mode;
+    _Bool   store_rates;
+    int     format;
+
+    /* subscribe only */
+    char   *exchange_type;
+    char   *queue;
+
+    amqp_connection_state_t connection;
+    pthread_mutex_t lock;
+};
+typedef struct camqp_config_s camqp_config_t;
+
+/*
+ * Global variables
+ */
+static const char *def_host       = "localhost";
+static const char *def_vhost      = "/";
+static const char *def_user       = "guest";
+static const char *def_password   = "guest";
+static const char *def_exchange   = "amq.fanout";
+
+static pthread_t *subscriber_threads     = NULL;
+static size_t     subscriber_threads_num = 0;
+static _Bool      subscriber_threads_running = 1;
+
+#define CONF(c,f) (((c)->f != NULL) ? (c)->f : def_##f)
+
+/*
+ * Functions
+ */
+static void camqp_close_connection (camqp_config_t *conf) /* {{{ */
+{
+    int sockfd;
+
+    if ((conf == NULL) || (conf->connection == NULL))
+        return;
+
+    sockfd = amqp_get_sockfd (conf->connection);
+    amqp_channel_close (conf->connection, CAMQP_CHANNEL, AMQP_REPLY_SUCCESS);
+    amqp_connection_close (conf->connection, AMQP_REPLY_SUCCESS);
+    amqp_destroy_connection (conf->connection);
+    close (sockfd);
+    conf->connection = NULL;
+} /* }}} void camqp_close_connection */
+
+static void camqp_config_free (void *ptr) /* {{{ */
+{
+    camqp_config_t *conf = ptr;
+
+    if (conf == NULL)
+        return;
+
+    camqp_close_connection (conf);
+
+    sfree (conf->name);
+    sfree (conf->host);
+    sfree (conf->vhost);
+    sfree (conf->user);
+    sfree (conf->password);
+    sfree (conf->exchange);
+    sfree (conf->exchange_type);
+    sfree (conf->queue);
+    sfree (conf->routing_key);
+
+    sfree (conf);
+} /* }}} void camqp_config_free */
+
+static char *camqp_bytes_cstring (amqp_bytes_t *in) /* {{{ */
+{
+    char *ret;
+
+    if ((in == NULL) || (in->bytes == NULL))
+        return (NULL);
+
+    ret = malloc (in->len + 1);
+    if (ret == NULL)
+        return (NULL);
+
+    memcpy (ret, in->bytes, in->len);
+    ret[in->len] = 0;
+
+    return (ret);
+} /* }}} char *camqp_bytes_cstring */
+
+static _Bool camqp_is_error (camqp_config_t *conf) /* {{{ */
+{
+    amqp_rpc_reply_t r;
+
+    r = amqp_get_rpc_reply (conf->connection);
+    if (r.reply_type == AMQP_RESPONSE_NORMAL)
+        return (0);
+
+    return (1);
+} /* }}} _Bool camqp_is_error */
+
+static char *camqp_strerror (camqp_config_t *conf, /* {{{ */
+        char *buffer, size_t buffer_size)
+{
+    amqp_rpc_reply_t r;
+
+    r = amqp_get_rpc_reply (conf->connection);
+    switch (r.reply_type)
+    {
+        case AMQP_RESPONSE_NORMAL:
+            sstrncpy (buffer, "Success", sizeof (buffer));
+            break;
+
+        case AMQP_RESPONSE_NONE:
+            sstrncpy (buffer, "Missing RPC reply type", sizeof (buffer));
+            break;
+
+        case AMQP_RESPONSE_LIBRARY_EXCEPTION:
+            if (r.library_errno)
+                return (sstrerror (r.library_errno, buffer, buffer_size));
+            else
+                sstrncpy (buffer, "End of stream", sizeof (buffer));
+            break;
+
+        case AMQP_RESPONSE_SERVER_EXCEPTION:
+            if (r.reply.id == AMQP_CONNECTION_CLOSE_METHOD)
+            {
+                amqp_connection_close_t *m = r.reply.decoded;
+                char *tmp = camqp_bytes_cstring (&m->reply_text);
+                ssnprintf (buffer, buffer_size, "Server connection error %d: %s",
+                        m->reply_code, tmp);
+                sfree (tmp);
+            }
+            else if (r.reply.id == AMQP_CHANNEL_CLOSE_METHOD)
+            {
+                amqp_channel_close_t *m = r.reply.decoded;
+                char *tmp = camqp_bytes_cstring (&m->reply_text);
+                ssnprintf (buffer, buffer_size, "Server channel error %d: %s",
+                        m->reply_code, tmp);
+                sfree (tmp);
+            }
+            else
+            {
+                ssnprintf (buffer, buffer_size, "Server error method %#"PRIx32,
+                        r.reply.id);
+            }
+            break;
+
+        default:
+            ssnprintf (buffer, buffer_size, "Unknown reply type %i",
+                    (int) r.reply_type);
+    }
+
+    return (buffer);
+} /* }}} char *camqp_strerror */
+
+static int camqp_create_exchange (camqp_config_t *conf) /* {{{ */
+{
+    amqp_exchange_declare_ok_t *ed_ret;
+
+    if (conf->exchange_type == NULL)
+        return (0);
+
+    ed_ret = amqp_exchange_declare (conf->connection,
+            /* channel     = */ CAMQP_CHANNEL,
+            /* exchange    = */ amqp_cstring_bytes (conf->exchange),
+            /* type        = */ amqp_cstring_bytes (conf->exchange_type),
+            /* passive     = */ 0,
+            /* durable     = */ 0,
+            /* auto_delete = */ 1,
+            /* arguments   = */ AMQP_EMPTY_TABLE);
+    if ((ed_ret == NULL) && camqp_is_error (conf))
+    {
+        char errbuf[1024];
+        ERROR ("amqp plugin: amqp_exchange_declare failed: %s",
+                camqp_strerror (conf, errbuf, sizeof (errbuf)));
+        camqp_close_connection (conf);
+        return (-1);
+    }
+
+    INFO ("amqp plugin: Successfully created exchange \"%s\" "
+            "with type \"%s\".",
+            conf->exchange, conf->exchange_type);
+
+    return (0);
+} /* }}} int camqp_create_exchange */
+
+static int camqp_setup_queue (camqp_config_t *conf) /* {{{ */
+{
+    amqp_queue_declare_ok_t *qd_ret;
+    amqp_basic_consume_ok_t *cm_ret;
+
+    qd_ret = amqp_queue_declare (conf->connection,
+            /* channel     = */ CAMQP_CHANNEL,
+            /* queue       = */ (conf->queue != NULL)
+            ? amqp_cstring_bytes (conf->queue)
+            : AMQP_EMPTY_BYTES,
+            /* passive     = */ 0,
+            /* durable     = */ 0,
+            /* exclusive   = */ 0,
+            /* auto_delete = */ 1,
+            /* arguments   = */ AMQP_EMPTY_TABLE);
+    if (qd_ret == NULL)
+    {
+        ERROR ("amqp plugin: amqp_queue_declare failed.");
+        camqp_close_connection (conf);
+        return (-1);
+    }
+
+    if (conf->queue == NULL)
+    {
+        conf->queue = camqp_bytes_cstring (&qd_ret->queue);
+        if (conf->queue == NULL)
+        {
+            ERROR ("amqp plugin: camqp_bytes_cstring failed.");
+            camqp_close_connection (conf);
+            return (-1);
+        }
+
+        INFO ("amqp plugin: Created queue \"%s\".", conf->queue);
+    }
+    DEBUG ("amqp plugin: Successfully created queue \"%s\".", conf->queue);
+
+    /* bind to an exchange */
+    if (conf->exchange != NULL)
+    {
+        amqp_queue_bind_ok_t *qb_ret;
+
+        assert (conf->queue != NULL);
+        qb_ret = amqp_queue_bind (conf->connection,
+                /* channel     = */ CAMQP_CHANNEL,
+                /* queue       = */ amqp_cstring_bytes (conf->queue),
+                /* exchange    = */ amqp_cstring_bytes (conf->exchange),
+                /* routing_key = */ (conf->routing_key != NULL)
+                ? amqp_cstring_bytes (conf->routing_key)
+                : AMQP_EMPTY_BYTES,
+                /* arguments   = */ AMQP_EMPTY_TABLE);
+        if ((qb_ret == NULL) && camqp_is_error (conf))
+        {
+            char errbuf[1024];
+            ERROR ("amqp plugin: amqp_queue_bind failed: %s",
+                    camqp_strerror (conf, errbuf, sizeof (errbuf)));
+            camqp_close_connection (conf);
+            return (-1);
+        }
+
+        DEBUG ("amqp plugin: Successfully bound queue \"%s\" to exchange \"%s\".",
+                conf->queue, conf->exchange);
+    } /* if (conf->exchange != NULL) */
+
+    cm_ret = amqp_basic_consume (conf->connection,
+            /* channel      = */ CAMQP_CHANNEL,
+            /* queue        = */ amqp_cstring_bytes (conf->queue),
+            /* consumer_tag = */ AMQP_EMPTY_BYTES,
+            /* no_local     = */ 0,
+            /* no_ack       = */ 1,
+            /* exclusive    = */ 0);
+    if ((cm_ret == NULL) && camqp_is_error (conf))
+    {
+        char errbuf[1024];
+        ERROR ("amqp plugin: amqp_basic_consume failed: %s",
+                    camqp_strerror (conf, errbuf, sizeof (errbuf)));
+        camqp_close_connection (conf);
+        return (-1);
+    }
+
+    return (0);
+} /* }}} int camqp_setup_queue */
+
+static int camqp_connect (camqp_config_t *conf) /* {{{ */
+{
+    amqp_rpc_reply_t reply;
+    int sockfd;
+    int status;
+
+    if (conf->connection != NULL)
+        return (0);
+
+    conf->connection = amqp_new_connection ();
+    if (conf->connection == NULL)
+    {
+        ERROR ("amqp plugin: amqp_new_connection failed.");
+        return (ENOMEM);
+    }
+
+    sockfd = amqp_open_socket (CONF(conf, host), conf->port);
+    if (sockfd < 0)
+    {
+        char errbuf[1024];
+        status = (-1) * sockfd;
+        ERROR ("amqp plugin: amqp_open_socket failed: %s",
+                sstrerror (status, errbuf, sizeof (errbuf)));
+        amqp_destroy_connection (conf->connection);
+        conf->connection = NULL;
+        return (status);
+    }
+    amqp_set_sockfd (conf->connection, sockfd);
+
+    reply = amqp_login (conf->connection, CONF(conf, vhost),
+            /* channel max = */      0,
+            /* frame max   = */ 131072,
+            /* heartbeat   = */      0,
+            /* authentication = */ AMQP_SASL_METHOD_PLAIN,
+            CONF(conf, user), CONF(conf, password));
+    if (reply.reply_type != AMQP_RESPONSE_NORMAL)
+    {
+        ERROR ("amqp plugin: amqp_login (vhost = %s, user = %s) failed.",
+                CONF(conf, vhost), CONF(conf, user));
+        amqp_destroy_connection (conf->connection);
+        close (sockfd);
+        conf->connection = NULL;
+        return (1);
+    }
+
+    amqp_channel_open (conf->connection, /* channel = */ 1);
+    /* FIXME: Is checking "reply.reply_type" really correct here? How does
+     * it get set? --octo */
+    if (reply.reply_type != AMQP_RESPONSE_NORMAL)
+    {
+        ERROR ("amqp plugin: amqp_channel_open failed.");
+        amqp_connection_close (conf->connection, AMQP_REPLY_SUCCESS);
+        amqp_destroy_connection (conf->connection);
+        close(sockfd);
+        conf->connection = NULL;
+        return (1);
+    }
+
+    INFO ("amqp plugin: Successfully opened connection to vhost \"%s\" "
+            "on %s:%i.", CONF(conf, vhost), CONF(conf, host), conf->port);
+
+    status = camqp_create_exchange (conf);
+    if (status != 0)
+        return (status);
+
+    if (!conf->publish)
+        return (camqp_setup_queue (conf));
+    return (0);
+} /* }}} int camqp_connect */
+
+static int camqp_shutdown (void) /* {{{ */
+{
+    size_t i;
+
+    DEBUG ("amqp plugin: Shutting down %zu subscriber threads.",
+            subscriber_threads_num);
+
+    subscriber_threads_running = 0;
+    for (i = 0; i < subscriber_threads_num; i++)
+    {
+        /* FIXME: Sending a signal is not very elegant here. Maybe find out how
+         * to use a timeout in the thread and check for the variable in regular
+         * intervals. */
+        pthread_kill (subscriber_threads[i], SIGTERM);
+        pthread_join (subscriber_threads[i], /* retval = */ NULL);
+    }
+
+    subscriber_threads_num = 0;
+    sfree (subscriber_threads);
+
+    DEBUG ("amqp plugin: All subscriber threads exited.");
+
+    return (0);
+} /* }}} int camqp_shutdown */
+
+/*
+ * Subscribing code
+ */
+static int camqp_read_body (camqp_config_t *conf, /* {{{ */
+        size_t body_size, const char *content_type)
+{
+    char body[body_size + 1];
+    char *body_ptr;
+    size_t received;
+    amqp_frame_t frame;
+    int status;
+
+    memset (body, 0, sizeof (body));
+    body_ptr = &body[0];
+    received = 0;
+
+    while (received < body_size)
+    {
+        status = amqp_simple_wait_frame (conf->connection, &frame);
+        if (status < 0)
+        {
+            char errbuf[1024];
+            status = (-1) * status;
+            ERROR ("amqp plugin: amqp_simple_wait_frame failed: %s",
+                    sstrerror (status, errbuf, sizeof (errbuf)));
+            camqp_close_connection (conf);
+            return (status);
+        }
+
+        if (frame.frame_type != AMQP_FRAME_BODY)
+        {
+            NOTICE ("amqp plugin: Unexpected frame type: %#"PRIx8,
+                    frame.frame_type);
+            return (-1);
+        }
+
+        if ((body_size - received) < frame.payload.body_fragment.len)
+        {
+            WARNING ("amqp plugin: Body is larger than indicated by header.");
+            return (-1);
+        }
+
+        memcpy (body_ptr, frame.payload.body_fragment.bytes,
+                frame.payload.body_fragment.len);
+        body_ptr += frame.payload.body_fragment.len;
+        received += frame.payload.body_fragment.len;
+    } /* while (received < body_size) */
+
+    if (strcasecmp ("text/collectd", content_type) == 0)
+    {
+        status = handle_putval (stderr, body);
+        if (status != 0)
+            ERROR ("amqp plugin: handle_putval failed with status %i.",
+                    status);
+        return (status);
+    }
+    else if (strcasecmp ("application/json", content_type) == 0)
+    {
+        ERROR ("amqp plugin: camqp_read_body: Parsing JSON data has not "
+                "been implemented yet. FIXME!");
+        return (0);
+    }
+    else
+    {
+        ERROR ("amqp plugin: camqp_read_body: Unknown content type \"%s\".",
+                content_type);
+        return (EINVAL);
+    }
+
+    /* not reached */
+    return (0);
+} /* }}} int camqp_read_body */
+
+static int camqp_read_header (camqp_config_t *conf) /* {{{ */
+{
+    int status;
+    amqp_frame_t frame;
+    amqp_basic_properties_t *properties;
+    char *content_type;
+
+    status = amqp_simple_wait_frame (conf->connection, &frame);
+    if (status < 0)
+    {
+        char errbuf[1024];
+        status = (-1) * status;
+        ERROR ("amqp plugin: amqp_simple_wait_frame failed: %s",
+                    sstrerror (status, errbuf, sizeof (errbuf)));
+        camqp_close_connection (conf);
+        return (status);
+    }
+
+    if (frame.frame_type != AMQP_FRAME_HEADER)
+    {
+        NOTICE ("amqp plugin: Unexpected frame type: %#"PRIx8,
+                frame.frame_type);
+        return (-1);
+    }
+
+    properties = frame.payload.properties.decoded;
+    content_type = camqp_bytes_cstring (&properties->content_type);
+    if (content_type == NULL)
+    {
+        ERROR ("amqp plugin: Unable to determine content type.");
+        return (-1);
+    }
+
+    status = camqp_read_body (conf,
+            (size_t) frame.payload.properties.body_size,
+            content_type);
+
+    sfree (content_type);
+    return (status);
+} /* }}} int camqp_read_header */
+
+static void *camqp_subscribe_thread (void *user_data) /* {{{ */
+{
+    camqp_config_t *conf = user_data;
+    int status;
+
+    while (subscriber_threads_running)
+    {
+        amqp_frame_t frame;
+
+        status = camqp_connect (conf);
+        if (status != 0)
+        {
+            ERROR ("amqp plugin: camqp_connect failed. "
+                    "Will sleep for %i seconds.", interval_g);
+            sleep (interval_g);
+            continue;
+        }
+
+        status = amqp_simple_wait_frame (conf->connection, &frame);
+        if (status < 0)
+        {
+            ERROR ("amqp plugin: amqp_simple_wait_frame failed. "
+                    "Will sleep for %i seconds.", interval_g);
+            camqp_close_connection (conf);
+            sleep (interval_g);
+            continue;
+        }
+
+        if (frame.frame_type != AMQP_FRAME_METHOD)
+        {
+            DEBUG ("amqp plugin: Unexpected frame type: %#"PRIx8,
+                    frame.frame_type);
+            continue;
+        }
+
+        if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD)
+        {
+            DEBUG ("amqp plugin: Unexpected method id: %#"PRIx32,
+                    frame.payload.method.id);
+            continue;
+        }
+
+        status = camqp_read_header (conf);
+
+        amqp_maybe_release_buffers (conf->connection);
+    } /* while (subscriber_threads_running) */
+
+    camqp_config_free (conf);
+    pthread_exit (NULL);
+} /* }}} void *camqp_subscribe_thread */
+
+static int camqp_subscribe_init (camqp_config_t *conf) /* {{{ */
+{
+    int status;
+    pthread_t *tmp;
+
+    tmp = realloc (subscriber_threads,
+            sizeof (*subscriber_threads) * (subscriber_threads_num + 1));
+    if (tmp == NULL)
+    {
+        ERROR ("amqp plugin: realloc failed.");
+        camqp_config_free (conf);
+        return (ENOMEM);
+    }
+    subscriber_threads = tmp;
+    tmp = subscriber_threads + subscriber_threads_num;
+    memset (tmp, 0, sizeof (*tmp));
+
+    status = pthread_create (tmp, /* attr = */ NULL,
+            camqp_subscribe_thread, conf);
+    if (status != 0)
+    {
+        char errbuf[1024];
+        ERROR ("amqp plugin: pthread_create failed: %s",
+                sstrerror (status, errbuf, sizeof (errbuf)));
+        camqp_config_free (conf);
+        return (status);
+    }
+
+    subscriber_threads_num++;
+
+    return (0);
+} /* }}} int camqp_subscribe_init */
+
+/*
+ * Publishing code
+ */
+/* XXX: You must hold "conf->lock" when calling this function! */
+static int camqp_write_locked (camqp_config_t *conf, /* {{{ */
+        const char *buffer, const char *routing_key)
+{
+    amqp_basic_properties_t props;
+    int status;
+
+    status = camqp_connect (conf);
+    if (status != 0)
+        return (status);
+
+    memset (&props, 0, sizeof (props));
+    props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG
+        | AMQP_BASIC_DELIVERY_MODE_FLAG
+        | AMQP_BASIC_APP_ID_FLAG;
+    if (conf->format == CAMQP_FORMAT_COMMAND)
+        props.content_type = amqp_cstring_bytes("text/collectd");
+    else if (conf->format == CAMQP_FORMAT_JSON)
+        props.content_type = amqp_cstring_bytes("application/json");
+    else
+        assert (23 == 42);
+    props.delivery_mode = conf->delivery_mode;
+    props.app_id = amqp_cstring_bytes("collectd");
+
+    status = amqp_basic_publish(conf->connection,
+                /* channel = */ 1,
+                amqp_cstring_bytes(CONF(conf, exchange)),
+                amqp_cstring_bytes (routing_key),
+                /* mandatory = */ 0,
+                /* immediate = */ 0,
+                &props,
+                amqp_cstring_bytes(buffer));
+    if (status != 0)
+    {
+        ERROR ("amqp plugin: amqp_basic_publish failed with status %i.",
+                status);
+        camqp_close_connection (conf);
+    }
+
+    return (status);
+} /* }}} int camqp_write_locked */
+
+static int camqp_write (const data_set_t *ds, const value_list_t *vl, /* {{{ */
+        user_data_t *user_data)
+{
+    camqp_config_t *conf = user_data->data;
+    char routing_key[6 * DATA_MAX_NAME_LEN];
+    char buffer[4096];
+    int status;
+
+    if ((ds == NULL) || (vl == NULL) || (conf == NULL))
+        return (EINVAL);
+
+    memset (buffer, 0, sizeof (buffer));
+
+    if (conf->routing_key != NULL)
+    {
+        sstrncpy (routing_key, conf->routing_key, sizeof (routing_key));
+    }
+    else
+    {
+        size_t i;
+        ssnprintf (routing_key, sizeof (routing_key), "collectd/%s/%s/%s/%s/%s",
+                vl->host,
+                vl->plugin, vl->plugin_instance,
+                vl->type, vl->type_instance);
+
+        /* Switch slashes (the only character forbidden by collectd) and dots
+         * (the separation character used by AMQP). */
+        for (i = 0; routing_key[i] != 0; i++)
+        {
+            if (routing_key[i] == '.')
+                routing_key[i] = '/';
+            else if (routing_key[i] == '/')
+                routing_key[i] = '.';
+        }
+    }
+
+    if (conf->format == CAMQP_FORMAT_COMMAND)
+    {
+        status = create_putval (buffer, sizeof (buffer), ds, vl);
+        if (status != 0)
+        {
+            ERROR ("amqp plugin: create_putval failed with status %i.",
+                    status);
+            return (status);
+        }
+    }
+    else if (conf->format == CAMQP_FORMAT_JSON)
+    {
+        size_t bfree = sizeof (buffer);
+        size_t bfill = 0;
+
+        format_json_initialize (buffer, &bfill, &bfree);
+        format_json_value_list (buffer, &bfill, &bfree, ds, vl, conf->store_rates);
+        format_json_finalize (buffer, &bfill, &bfree);
+    }
+    else
+    {
+        ERROR ("amqp plugin: Invalid format (%i).", conf->format);
+        return (-1);
+    }
+
+    pthread_mutex_lock (&conf->lock);
+    status = camqp_write_locked (conf, buffer, routing_key);
+    pthread_mutex_unlock (&conf->lock);
+
+    return (status);
+} /* }}} int camqp_write */
+
+/*
+ * Config handling
+ */
+static int camqp_config_set_format (oconfig_item_t *ci, /* {{{ */
+        camqp_config_t *conf)
+{
+    char *string;
+    int status;
+
+    string = NULL;
+    status = cf_util_get_string (ci, &string);
+    if (status != 0)
+        return (status);
+
+    assert (string != NULL);
+    if (strcasecmp ("Command", string) == 0)
+        conf->format = CAMQP_FORMAT_COMMAND;
+    else if (strcasecmp ("JSON", string) == 0)
+        conf->format = CAMQP_FORMAT_JSON;
+    else
+    {
+        WARNING ("amqp plugin: Invalid format string: %s",
+                string);
+    }
+
+    free (string);
+
+    return (0);
+} /* }}} int config_set_string */
+
+static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */
+        _Bool publish)
+{
+    camqp_config_t *conf;
+    int status;
+    int i;
+
+    conf = malloc (sizeof (*conf));
+    if (conf == NULL)
+    {
+        ERROR ("amqp plugin: malloc failed.");
+        return (ENOMEM);
+    }
+
+    /* Initialize "conf" {{{ */
+    memset (conf, 0, sizeof (*conf));
+    conf->publish = publish;
+    conf->name = NULL;
+    conf->format = CAMQP_FORMAT_COMMAND;
+    conf->host = NULL;
+    conf->port = 5672;
+    conf->vhost = NULL;
+    conf->user = NULL;
+    conf->password = NULL;
+    conf->exchange = NULL;
+    conf->routing_key = NULL;
+    /* publish only */
+    conf->delivery_mode = CAMQP_DM_VOLATILE;
+    conf->store_rates = 0;
+    /* subscribe only */
+    conf->exchange_type = NULL;
+    conf->queue = NULL;
+    /* general */
+    conf->connection = NULL;
+    pthread_mutex_init (&conf->lock, /* attr = */ NULL);
+    /* }}} */
+
+    status = cf_util_get_string (ci, &conf->name);
+    if (status != 0)
+    {
+        sfree (conf);
+        return (status);
+    }
+
+    for (i = 0; i < ci->children_num; i++)
+    {
+        oconfig_item_t *child = ci->children + i;
+
+        if (strcasecmp ("Host", child->key) == 0)
+            status = cf_util_get_string (child, &conf->host);
+        else if (strcasecmp ("Port", child->key) == 0)
+        {
+            status = cf_util_get_port_number (child);
+            if (status > 0)
+            {
+                conf->port = status;
+                status = 0;
+            }
+        }
+        else if (strcasecmp ("VHost", child->key) == 0)
+            status = cf_util_get_string (child, &conf->vhost);
+        else if (strcasecmp ("User", child->key) == 0)
+            status = cf_util_get_string (child, &conf->user);
+        else if (strcasecmp ("Password", child->key) == 0)
+            status = cf_util_get_string (child, &conf->password);
+        else if (strcasecmp ("Exchange", child->key) == 0)
+            status = cf_util_get_string (child, &conf->exchange);
+        else if ((strcasecmp ("ExchangeType", child->key) == 0) && !publish)
+            status = cf_util_get_string (child, &conf->exchange_type);
+        else if ((strcasecmp ("Queue", child->key) == 0) && !publish)
+            status = cf_util_get_string (child, &conf->queue);
+        else if (strcasecmp ("RoutingKey", child->key) == 0)
+            status = cf_util_get_string (child, &conf->routing_key);
+        else if ((strcasecmp ("Persistent", child->key) == 0) && publish)
+        {
+            _Bool tmp = 0;
+            status = cf_util_get_boolean (child, &tmp);
+            if (tmp)
+                conf->delivery_mode = CAMQP_DM_PERSISTENT;
+            else
+                conf->delivery_mode = CAMQP_DM_VOLATILE;
+        }
+        else if ((strcasecmp ("StoreRates", child->key) == 0) && publish)
+            status = cf_util_get_boolean (child, &conf->store_rates);
+        else if ((strcasecmp ("Format", child->key) == 0) && publish)
+            status = camqp_config_set_format (child, conf);
+        else
+            WARNING ("amqp plugin: Ignoring unknown "
+                    "configuration option \"%s\".", child->key);
+
+        if (status != 0)
+            break;
+    } /* for (i = 0; i < ci->children_num; i++) */
+
+    if ((status == 0) && (conf->exchange == NULL))
+    {
+        if (conf->exchange_type != NULL)
+            WARNING ("amqp plugin: The option \"ExchangeType\" was given "
+                    "without the \"Exchange\" option. It will be ignored.");
+
+        if (!publish && (conf->routing_key != NULL))
+            WARNING ("amqp plugin: The option \"RoutingKey\" was given "
+                    "without the \"Exchange\" option. It will be ignored.");
+
+    }
+
+    if (status != 0)
+    {
+        camqp_config_free (conf);
+        return (status);
+    }
+
+    if (conf->exchange != NULL)
+    {
+        DEBUG ("amqp plugin: camqp_config_connection: exchange = %s;",
+                conf->exchange);
+    }
+
+    if (publish)
+    {
+        char cbname[128];
+        user_data_t ud = { conf, camqp_config_free };
+
+        ssnprintf (cbname, sizeof (cbname), "amqp/%s", conf->name);
+
+        status = plugin_register_write (cbname, camqp_write, &ud);
+        if (status != 0)
+        {
+            camqp_config_free (conf);
+            return (status);
+        }
+    }
+    else
+    {
+        status = camqp_subscribe_init (conf);
+        if (status != 0)
+        {
+            camqp_config_free (conf);
+            return (status);
+        }
+    }
+
+    return (0);
+} /* }}} int camqp_config_connection */
+
+static int camqp_config (oconfig_item_t *ci) /* {{{ */
+{
+    int i;
+
+    for (i = 0; i < ci->children_num; i++)
+    {
+        oconfig_item_t *child = ci->children + i;
+
+        if (strcasecmp ("Publish", child->key) == 0)
+            camqp_config_connection (child, /* publish = */ 1);
+        else if (strcasecmp ("Subscribe", child->key) == 0)
+            camqp_config_connection (child, /* publish = */ 0);
+        else
+            WARNING ("amqp plugin: Ignoring unknown config option \"%s\".",
+                    child->key);
+    } /* for (ci->children_num) */
+
+    return (0);
+} /* }}} int camqp_config */
+
+void module_register (void)
+{
+    plugin_register_complex_config ("amqp", camqp_config);
+    plugin_register_shutdown ("amqp", camqp_shutdown);
+} /* void module_register */
+
+/* vim: set sw=4 sts=4 et fdm=marker : */
index b9408a3..267296c 100644 (file)
@@ -1,3 +1,13 @@
+# Permission is hereby granted, free of charge, to any person obtaining a
+# copy of this software and associated documentation files (the "Software"),
+# to deal in the Software without restriction, including without limitation
+# the rights to use, copy, modify, merge, publish, distribute, sublicense,
+# and/or sell copies of the Software, and to permit persons to whom the
+# Software is furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+
 =head1 NAME
 
 collectd-python - Documentation of collectd's C<python plugin>
@@ -693,14 +703,8 @@ dispatched by the python plugin after upgrades.
 
 =item
 
-This plugin is not compatible with python3. Trying to compile it with python3
-will fail because of the ways string, unicode and bytearray behavior was
-changed.
-
-=item
-
 Not all aspects of the collectd API are accessible from python. This includes
-but is not limited to meta-data, filters and data sets.
+but is not limited to filters and data sets.
 
 =back
 
index 12cec75..e194d14 100644 (file)
@@ -52,6 +52,7 @@
 # to missing dependencies or because they have been deactivated explicitly.  #
 ##############################################################################
 
+#@BUILD_PLUGIN_AMQP_TRUE@LoadPlugin amqp
 #@BUILD_PLUGIN_APACHE_TRUE@LoadPlugin apache
 #@BUILD_PLUGIN_APCUPS_TRUE@LoadPlugin apcups
 #@BUILD_PLUGIN_APPLE_SENSORS_TRUE@LoadPlugin apple_sensors
 # ription of those options is available in the collectd.conf(5) manual page. #
 ##############################################################################
 
+#<Plugin "amqp">
+#  <Publish "name">
+#    Host "localhost"
+#    Port "5672"
+#    VHost "/"
+#    User "guest"
+#    Password "guest"
+#    Exchange "amq.fanout"
+#    RoutingKey "collectd"
+#    Persistent false
+#    StoreRates false
+#  </Publish>
+#</Plugin>
+
 #<Plugin apache>
 #  <Instance "local">
 #    URL "http://localhost/status?auto"
 #      SocketFile "@prefix@/var/run/@PACKAGE_NAME@-unixsock"
 #      SocketGroup "collectd"
 #      SocketPerms "0660"
+#      DeleteSocket false
 #</Plugin>
 
 #<Plugin uuid>
index 8481a54..1c31d97 100644 (file)
@@ -181,6 +181,143 @@ A list of all plugins and a short summary for each plugin can be found in the
 F<README> file shipped with the sourcecode and hopefully binary packets as
 well.
 
+=head2 Plugin C<amqp>
+
+The I<AMQMP plugin> can be used to communicate with other instances of
+I<collectd> or third party applications using an AMQP message broker. Values
+are sent to or received from the broker, which handles routing, queueing and
+possibly filtering or messages.
+
+ <Plugin "amqp">
+   # Send values to an AMQP broker
+   <Publish "some_name">
+     Host "localhost"
+     Port "5672"
+     VHost "/"
+     User "guest"
+     Password "guest"
+     Exchange "amq.fanout"
+ #   ExchangeType "fanout"
+ #   RoutingKey "collectd"
+ #   Persistent false
+ #   Format "command"
+ #   StoreRates false
+   </Publish>
+   
+   # Receive values from an AMQP broker
+   <Subscribe "some_name">
+     Host "localhost"
+     Port "5672"
+     VHost "/"
+     User "guest"
+     Password "guest"
+     Exchange "amq.fanout"
+ #   ExchangeType "fanout"
+ #   Queue "queue_name"
+ #   RoutingKey "collectd.#"
+   </Subscribe>
+ </Plugin>
+
+The plugin's configuration consists of a number of I<Publish> and I<Subscribe>
+blocks, which configure sending and receiving of values respectively. The two
+blocks are very similar, so unless otherwise noted, an option can be used in
+either block. The name given in the blocks starting tag is only used for
+reporting messages, but may be used to support I<flushing> of certain
+I<Publish> blocks in the future.
+
+=over 4
+
+=item B<Host> I<Host>
+
+Hostname or IP-address of the AMQP broker. Defaults to the default behavior of
+the underlying communications library, I<rabbitmq-c>, which is "localhost".
+
+=item B<Port> I<Port>
+
+Service name or port number on which the AMQP broker accepts connections. This
+argument must be a string, even if the numeric form is used. Defaults to
+"5672".
+
+=item B<VHost> I<VHost>
+
+Name of the I<virtual host> on the AMQP broker to use. Defaults to "/".
+
+=item B<User> I<User>
+
+=item B<Password> I<Password>
+
+Credentials used to authenticate to the AMQP broker. By default "guest"/"guest"
+is used.
+
+=item B<Exchange> I<Exchange>
+
+In I<Publish> blocks, this option specifies the I<exchange> to send values to.
+By default, "amq.fanout" will be used.
+
+In I<Subscribe> blocks this option is optional. If given, a I<binding> between
+the given exchange and the I<queue> is created, using the I<routing key> if
+configured. See the B<Queue> and B<RoutingKey> options below.
+
+=item B<ExchangeType> I<Type>
+
+If given, the plugin will try to create the configured I<exchange> with this
+I<type> after connecting. When in a I<Subscribe> block, the I<queue> will then
+be bound to this exchange.
+
+=item B<Queue> I<Queue> (Subscribe only)
+
+Configures the I<queue> name to subscribe to. If no queue name was configures
+explicitly, a unique queue name will be created by the broker.
+
+=item B<RoutingKey> I<Key>
+
+In I<Publish> blocks, this configures the routing key to set on all outgoing
+messages. If not given, the routing key will be computed from the I<identifier>
+of the value. The host, plugin, type and the two instances are concatenated
+together using dots as the separator and all containing dots replaced with
+slashes. For example "collectd.host/example/com.cpu.0.cpu.user". This makes it
+possible to receive only specific values using a "topic" exchange.
+
+In I<Subscribe> blocks, configures the I<routing key> used when creating a
+I<binding> between an I<exchange> and the I<queue>. The usual wildcards can be
+used to filter messages when using a "topic" exchange. If you're only
+interested in CPU statistics, you could use the routing key "collectd.*.cpu.#"
+for example.
+
+=item B<Persistent> B<true>|B<false> (Publish only)
+
+Selects the I<delivery method> to use. If set to B<true>, the I<persistent>
+mode will be used, i.e. delivery is guaranteed. If set to B<false> (the
+default), the I<transient> delivery mode will be used, i.e. messages may be
+lost due to high load, overflowing queues or similar issues.
+
+=item B<Format> B<Command>|B<JSON> (Publish only)
+
+Selects the format in which messages are sent to the broker. If set to
+B<Command> (the default), values are sent as C<PUTVAL> commands which are
+identical to the syntax used by the I<Exec> and I<UnixSock plugins>. In this
+case, the C<Content-Type> header field will be set to C<text/collectd>.
+
+If set to B<JSON>, the values are encoded in the I<JavaScript Object Notation>,
+an easy and straight forward exchange format. The C<Content-Type> header field
+will be set to C<application/json>.
+
+A subscribing client I<should> use the C<Content-Type> header field to
+determine how to decode the values. Currently, the I<AMQP plugin> itself can
+only decode the B<Command> format.
+
+=item B<StoreRates> B<true>|B<false> (Publish only)
+
+Determines whether or not C<COUNTER>, C<DERIVE> and C<ABSOLUTE> data sources
+are converted to a I<rate> (i.e. a C<GAUGE> value). If set to B<false> (the
+default), no conversion is performed. Otherwise the conversion is performed
+using the internal value cache.
+
+Please note that currently this option is only used if the B<Format> option has
+been set to B<JSON>.
+
+=back
+
 =head2 Plugin C<apache>
 
 To configure the C<apache>-plugin you first need to configure the Apache
@@ -4421,6 +4558,13 @@ Change the file permissions of the UNIX-socket after it has been created. The
 permissions must be given as a numeric, octal value as you would pass to
 L<chmod(1)>. Defaults to B<0770>.
 
+=item B<DeleteSocket> B<false>|B<true>
+
+If set to B<true>, delete the socket file before calling L<bind(2)>, if a file
+with the given name already exists. If I<collectd> crashes a socket file may be
+left over, preventing the daemon from opening a new socket when restarted.
+Since this is potentially dangerous, this defaults to B<false>.
+
 =back
 
 =head2 Plugin C<uuid>
index af0033a..8dd0f42 100644 (file)
 # include <kstat.h>
 #endif
 
-#if HAVE_SENSORS_SENSORS_H
-# include <sensors/sensors.h>
-#endif
-
 #ifndef PACKAGE_NAME
 #define PACKAGE_NAME "collectd"
 #endif
index fbac7ad..433764e 100644 (file)
@@ -758,6 +758,7 @@ static int cj_curl_perform (cj_t *db, CURL *curl) /* {{{ */
   if (db->yajl == NULL)
   {
     ERROR ("curl_json plugin: yajl_alloc failed.");
+    db->yajl = yprev;
     return (-1);
   }
 
diff --git a/src/lpar.c b/src/lpar.c
new file mode 100644 (file)
index 0000000..4d53447
--- /dev/null
@@ -0,0 +1,273 @@
+/**
+ * collectd - src/lpar.c
+ * Copyright (C) 2010  Aurélien Reynaud
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; only version 2 of the License is applicable.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
+ *
+ * Authors:
+ *   Aurélien Reynaud <collectd at wattapower.net>
+ **/
+
+#include "collectd.h"
+#include "common.h"
+#include "plugin.h"
+
+#include <sys/protosw.h>
+#include <libperfstat.h>
+#include <sys/utsname.h>
+
+/* XINTFRAC was defined in libperfstat.h somewhere between AIX 5.3 and 6.1 */
+#ifndef XINTFRAC
+# include <sys/systemcfg.h>
+# define XINTFRAC ((double)(_system_configuration.Xint) / \
+                   (double)(_system_configuration.Xfrac))
+#endif
+
+#define CLOCKTICKS_TO_TICKS(cticks) ((cticks) / XINTFRAC)
+
+static const char *config_keys[] =
+{
+  "CpuPoolStats",
+  "ReportBySerial"
+};
+static int config_keys_num = STATIC_ARRAY_SIZE (config_keys);
+
+static _Bool pool_stats = 0;
+static _Bool report_by_serial = 0;
+#if PERFSTAT_SUPPORTS_DONATION
+static _Bool donate_flag = 0;
+#endif
+static char serial[SYS_NMLN];
+
+static perfstat_partition_total_t lparstats_old;
+
+static int lpar_config (const char *key, const char *value)
+{
+       if (strcasecmp ("CpuPoolStats", key) == 0)
+       {
+               if (IS_TRUE (value))
+                       pool_stats = 1;
+               else
+                       pool_stats = 0;
+       }
+       else if (strcasecmp ("ReportBySerial", key) == 0)
+       {
+               if (IS_TRUE (value))
+                       report_by_serial = 1;
+               else
+                       report_by_serial = 0;
+       }
+       else
+       {
+               return (-1);
+       }
+
+       return (0);
+} /* int lpar_config */
+
+static int lpar_init (void)
+{
+       int status;
+
+       /* Retrieve the initial metrics. Returns the number of structures filled. */
+       status = perfstat_partition_total (/* name = */ NULL, /* (must be NULL) */
+                       &lparstats_old, sizeof (perfstat_partition_total_t),
+                       /* number = */ 1 /* (must be 1) */);
+       if (status != 1)
+       {
+               char errbuf[1024];
+               ERROR ("lpar plugin: perfstat_partition_total failed: %s (%i)",
+                               sstrerror (errno, errbuf, sizeof (errbuf)),
+                               status);
+               return (-1);
+       }
+
+#if PERFSTAT_SUPPORTS_DONATION
+       if (!lparstats_old.type.b.shared_enabled
+                       && lparstats_old.type.b.donate_enabled)
+       {
+               donate_flag = 1;
+       }
+#endif
+
+       if (pool_stats && !lparstats_old.type.b.pool_util_authority)
+       {
+               WARNING ("lpar plugin: This partition does not have pool authority. "
+                               "Disabling CPU pool statistics collection.");
+               pool_stats = 0;
+       }
+
+       return (0);
+} /* int lpar_init */
+
+static void lpar_submit (const char *type_instance, double value)
+{
+       value_t values[1];
+       value_list_t vl = VALUE_LIST_INIT;
+
+       values[0].gauge = (gauge_t)value;
+
+       vl.values = values;
+       vl.values_len = 1;
+       if (report_by_serial)
+       {
+               sstrncpy (vl.host, serial, sizeof (vl.host));
+               sstrncpy (vl.plugin_instance, hostname_g, sizeof (vl.plugin));
+       }
+       else
+       {
+               sstrncpy (vl.host, hostname_g, sizeof (vl.host));
+       }
+       sstrncpy (vl.plugin, "lpar", sizeof (vl.plugin));
+       sstrncpy (vl.type, "vcpu", sizeof (vl.type));
+       sstrncpy (vl.type_instance, type_instance, sizeof (vl.type_instance));
+
+       plugin_dispatch_values (&vl);
+} /* void lpar_submit */
+
+static int lpar_read (void)
+{
+       perfstat_partition_total_t lparstats;
+       int status;
+       struct utsname name;
+       u_longlong_t ticks;
+       u_longlong_t user_ticks, syst_ticks, wait_ticks, idle_ticks;
+       u_longlong_t consumed_ticks;
+       double entitled_proc_capacity;
+
+       /* An LPAR has the same serial number as the physical system it is currently
+          running on. It is a convenient way of tracking LPARs as they are moved
+          from chassis to chassis through Live Partition Mobility (LPM). */
+       if (uname (&name) != 0)
+       {
+               ERROR ("lpar plugin: uname failed.");
+               return (-1);
+       }
+       sstrncpy (serial, name.machine, sizeof (serial));
+
+       /* Retrieve the current metrics. Returns the number of structures filled. */
+       status = perfstat_partition_total (/* name = */ NULL, /* (must be NULL) */
+                       &lparstats, sizeof (perfstat_partition_total_t),
+                       /* number = */ 1 /* (must be 1) */);
+       if (status != 1)
+       {
+               char errbuf[1024];
+               ERROR ("lpar plugin: perfstat_partition_total failed: %s (%i)",
+                               sstrerror (errno, errbuf, sizeof (errbuf)),
+                               status);
+               return (-1);
+       }
+
+       /* Number of ticks since we last run. */
+       ticks = lparstats.timebase_last - lparstats_old.timebase_last;
+       if (ticks == 0)
+       {
+               /* The stats have not been updated. Return now to avoid
+                * dividing by zero */
+               return (0);
+       }
+
+       /*
+        * On a shared partition, we're "entitled" to a certain amount of
+        * processing power, for example 250/100 of a physical CPU. Processing
+        * capacity not used by the partition may be assigned to a different
+        * partition by the hypervisor, so "idle" is hopefully a very small
+        * number.
+        *
+        * A dedicated partition may donate its CPUs to another partition and
+        * may steal ticks from somewhere else (another partition or maybe the
+        * shared pool, I don't know --octo).
+        */
+
+       /* entitled_proc_capacity is in 1/100th of a CPU */
+       entitled_proc_capacity = 0.01 * ((double) lparstats.entitled_proc_capacity);
+       lpar_submit ("entitled", entitled_proc_capacity);
+
+       /* The number of ticks actually spent in the various states */
+       user_ticks = lparstats.puser - lparstats_old.puser;
+       syst_ticks = lparstats.psys  - lparstats_old.psys;
+       wait_ticks = lparstats.pwait - lparstats_old.pwait;
+       idle_ticks = lparstats.pidle - lparstats_old.pidle;
+       consumed_ticks = user_ticks + syst_ticks + wait_ticks + idle_ticks;
+
+       lpar_submit ("user", (double) user_ticks / (double) ticks);
+       lpar_submit ("system", (double) syst_ticks / (double) ticks);
+       lpar_submit ("wait", (double) wait_ticks / (double) ticks);
+       lpar_submit ("idle", (double) idle_ticks / (double) ticks);
+
+#if PERFSTAT_SUPPORTS_DONATION
+       if (donate_flag)
+       {
+               /* donated => ticks given to another partition
+                * stolen  => ticks received from another partition */
+               u_longlong_t idle_donated_ticks, busy_donated_ticks;
+               u_longlong_t idle_stolen_ticks, busy_stolen_ticks;
+
+               /* FYI:  PURR == Processor Utilization of Resources Register
+                *      SPURR == Scaled PURR */
+               idle_donated_ticks = lparstats.idle_donated_purr - lparstats_old.idle_donated_purr;
+               busy_donated_ticks = lparstats.busy_donated_purr - lparstats_old.busy_donated_purr;
+               idle_stolen_ticks  = lparstats.idle_stolen_purr  - lparstats_old.idle_stolen_purr;
+               busy_stolen_ticks  = lparstats.busy_stolen_purr  - lparstats_old.busy_stolen_purr;
+
+               lpar_submit ("idle_donated", (double) idle_donated_ticks / (double) ticks);
+               lpar_submit ("busy_donated", (double) busy_donated_ticks / (double) ticks);
+               lpar_submit ("idle_stolen",  (double) idle_stolen_ticks  / (double) ticks);
+               lpar_submit ("busy_stolen",  (double) busy_stolen_ticks  / (double) ticks);
+
+               /* Donated ticks will be accounted for as stolen ticks in other LPARs */
+               consumed_ticks += idle_stolen_ticks + busy_stolen_ticks;
+       }
+#endif
+
+       lpar_submit ("consumed", (double) consumed_ticks / (double) ticks);
+
+       if (pool_stats)
+       {
+               char typinst[DATA_MAX_NAME_LEN];
+               u_longlong_t pool_idle_cticks;
+               double pool_idle_cpus;
+               double pool_busy_cpus;
+
+               /* We're calculating "busy" from "idle" and the total number of
+                * CPUs, because the "busy" member didn't exist in early versions
+                * of libperfstat. It was added somewhere between AIX 5.3 ML5 and ML9. */
+               pool_idle_cticks = lparstats.pool_idle_time - lparstats_old.pool_idle_time;
+               pool_idle_cpus = CLOCKTICKS_TO_TICKS ((double) pool_idle_cticks) / (double) ticks;
+               pool_busy_cpus = ((double) lparstats.phys_cpus_pool) - pool_idle_cpus;
+               if (pool_busy_cpus < 0.0)
+                       pool_busy_cpus = 0.0;
+
+               ssnprintf (typinst, sizeof (typinst), "pool-%X-busy", lparstats.pool_id);
+               lpar_submit (typinst, pool_busy_cpus);
+
+               ssnprintf (typinst, sizeof (typinst), "pool-%X-idle", lparstats.pool_id);
+               lpar_submit (typinst, pool_idle_cpus);
+       }
+
+       memcpy (&lparstats_old, &lparstats, sizeof (lparstats_old));
+
+       return (0);
+} /* int lpar_read */
+
+void module_register (void)
+{
+       plugin_register_config ("lpar", lpar_config,
+                               config_keys, config_keys_num);
+       plugin_register_init ("lpar", lpar_init);
+       plugin_register_read ("lpar", lpar_read);
+} /* void module_register */
+
+/* vim: set sw=8 noet : */
+
index 9f02226..ae6282c 100644 (file)
@@ -53,9 +53,18 @@ struct mv_match_s
  */
 static void mv_free_match (mv_match_t *m) /* {{{ */
 {
+  int i;
+  
   if (m == NULL)
     return;
 
+  if (m->data_sources != NULL)
+  {
+    for (i = 0; i < m->data_sources_num; ++i)
+      free(m->data_sources[i]);
+    free(m->data_sources);
+  }
+  
   free (m);
 } /* }}} void mv_free_match */
 
index 1640cfd..5c4b6e7 100644 (file)
@@ -566,7 +566,7 @@ static int submit_values (const char *host, /* {{{ */
                const char *plugin_inst,
                const char *type, const char *type_inst,
                value_t *values, int values_len,
-               cdtime_t timestamp)
+               cdtime_t timestamp, cdtime_t interval)
 {
        value_list_t vl = VALUE_LIST_INIT;
 
@@ -576,6 +576,9 @@ static int submit_values (const char *host, /* {{{ */
        if (timestamp > 0)
                vl.time = timestamp;
 
+       if (interval > 0)
+               vl.interval = interval;
+
        if (host != NULL)
                sstrncpy (vl.host, host, sizeof (vl.host));
        else
@@ -592,7 +595,7 @@ static int submit_values (const char *host, /* {{{ */
 
 static int submit_two_counters (const char *host, const char *plugin_inst, /* {{{ */
                const char *type, const char *type_inst, counter_t val0, counter_t val1,
-               cdtime_t timestamp)
+               cdtime_t timestamp, cdtime_t interval)
 {
        value_t values[2];
 
@@ -600,23 +603,24 @@ static int submit_two_counters (const char *host, const char *plugin_inst, /* {{
        values[1].counter = val1;
 
        return (submit_values (host, plugin_inst, type, type_inst,
-                               values, 2, timestamp));
+                               values, 2, timestamp, interval));
 } /* }}} int submit_two_counters */
 
 static int submit_counter (const char *host, const char *plugin_inst, /* {{{ */
-               const char *type, const char *type_inst, counter_t counter, cdtime_t timestamp)
+               const char *type, const char *type_inst, counter_t counter,
+               cdtime_t timestamp, cdtime_t interval)
 {
        value_t v;
 
        v.counter = counter;
 
        return (submit_values (host, plugin_inst, type, type_inst,
-                               &v, 1, timestamp));
+                               &v, 1, timestamp, interval));
 } /* }}} int submit_counter */
 
 static int submit_two_gauge (const char *host, const char *plugin_inst, /* {{{ */
                const char *type, const char *type_inst, gauge_t val0, gauge_t val1,
-               cdtime_t timestamp)
+               cdtime_t timestamp, cdtime_t interval)
 {
        value_t values[2];
 
@@ -624,18 +628,19 @@ static int submit_two_gauge (const char *host, const char *plugin_inst, /* {{{ *
        values[1].gauge = val1;
 
        return (submit_values (host, plugin_inst, type, type_inst,
-                               values, 2, timestamp));
+                               values, 2, timestamp, interval));
 } /* }}} int submit_two_gauge */
 
 static int submit_double (const char *host, const char *plugin_inst, /* {{{ */
-               const char *type, const char *type_inst, double d, cdtime_t timestamp)
+               const char *type, const char *type_inst, double d,
+               cdtime_t timestamp, cdtime_t interval)
 {
        value_t v;
 
        v.gauge = (gauge_t) d;
 
        return (submit_values (host, plugin_inst, type, type_inst,
-                               &v, 1, timestamp));
+                               &v, 1, timestamp, interval));
 } /* }}} int submit_uint64 */
 
 /* Calculate hit ratio from old and new counters and submit the resulting
@@ -647,7 +652,8 @@ static int submit_cache_ratio (const char *host, /* {{{ */
                uint64_t new_misses,
                uint64_t old_hits,
                uint64_t old_misses,
-               cdtime_t timestamp)
+               cdtime_t timestamp,
+               cdtime_t interval)
 {
        value_t v;
 
@@ -664,12 +670,12 @@ static int submit_cache_ratio (const char *host, /* {{{ */
        }
 
        return (submit_values (host, plugin_inst, "cache_ratio", type_inst,
-                               &v, 1, timestamp));
+                               &v, 1, timestamp, interval));
 } /* }}} int submit_cache_ratio */
 
 /* Submits all the caches used by WAFL. Uses "submit_cache_ratio". */
 static int submit_wafl_data (const char *hostname, const char *instance, /* {{{ */
-               cfg_wafl_t *old_data, const cfg_wafl_t *new_data)
+               cfg_wafl_t *old_data, const cfg_wafl_t *new_data, int interval)
 {
        /* Submit requested counters */
        if (HAS_ALL_FLAGS (old_data->flags, CFG_WAFL_NAME_CACHE | HAVE_WAFL_NAME_CACHE)
@@ -677,28 +683,28 @@ static int submit_wafl_data (const char *hostname, const char *instance, /* {{{
                submit_cache_ratio (hostname, instance, "name_cache_hit",
                                new_data->name_cache_hit, new_data->name_cache_miss,
                                old_data->name_cache_hit, old_data->name_cache_miss,
-                               new_data->timestamp);
+                               new_data->timestamp, interval);
 
        if (HAS_ALL_FLAGS (old_data->flags, CFG_WAFL_DIR_CACHE | HAVE_WAFL_FIND_DIR)
                        && HAS_ALL_FLAGS (new_data->flags, HAVE_WAFL_FIND_DIR))
                submit_cache_ratio (hostname, instance, "find_dir_hit",
                                new_data->find_dir_hit, new_data->find_dir_miss,
                                old_data->find_dir_hit, old_data->find_dir_miss,
-                               new_data->timestamp);
+                               new_data->timestamp, interval);
 
        if (HAS_ALL_FLAGS (old_data->flags, CFG_WAFL_BUF_CACHE | HAVE_WAFL_BUF_HASH)
                        && HAS_ALL_FLAGS (new_data->flags, HAVE_WAFL_BUF_HASH))
                submit_cache_ratio (hostname, instance, "buf_hash_hit",
                                new_data->buf_hash_hit, new_data->buf_hash_miss,
                                old_data->buf_hash_hit, old_data->buf_hash_miss,
-                               new_data->timestamp);
+                               new_data->timestamp, interval);
 
        if (HAS_ALL_FLAGS (old_data->flags, CFG_WAFL_INODE_CACHE | HAVE_WAFL_INODE_CACHE)
                        && HAS_ALL_FLAGS (new_data->flags, HAVE_WAFL_INODE_CACHE))
                submit_cache_ratio (hostname, instance, "inode_cache_hit",
                                new_data->inode_cache_hit, new_data->inode_cache_miss,
                                old_data->inode_cache_hit, old_data->inode_cache_miss,
-                               new_data->timestamp);
+                               new_data->timestamp, interval);
 
        /* Clear old HAVE_* flags */
        old_data->flags &= ~HAVE_WAFL_ALL;
@@ -724,7 +730,7 @@ static int submit_wafl_data (const char *hostname, const char *instance, /* {{{
  * update flags appropriately. */
 static int submit_volume_perf_data (const char *hostname, /* {{{ */
                data_volume_perf_t *old_data,
-               const data_volume_perf_t *new_data)
+               const data_volume_perf_t *new_data, int interval)
 {
        char plugin_instance[DATA_MAX_NAME_LEN];
 
@@ -739,7 +745,7 @@ static int submit_volume_perf_data (const char *hostname, /* {{{ */
                        && HAS_ALL_FLAGS (new_data->flags, HAVE_VOLUME_PERF_BYTES_READ | HAVE_VOLUME_PERF_BYTES_WRITE))
        {
                submit_two_counters (hostname, plugin_instance, "disk_octets", /* type instance = */ NULL,
-                               (counter_t) new_data->read_bytes, (counter_t) new_data->write_bytes, new_data->timestamp);
+                               (counter_t) new_data->read_bytes, (counter_t) new_data->write_bytes, new_data->timestamp, interval);
        }
 
        /* Check for and submit disk-operations values */
@@ -747,7 +753,7 @@ static int submit_volume_perf_data (const char *hostname, /* {{{ */
                        && HAS_ALL_FLAGS (new_data->flags, HAVE_VOLUME_PERF_OPS_READ | HAVE_VOLUME_PERF_OPS_WRITE))
        {
                submit_two_counters (hostname, plugin_instance, "disk_ops", /* type instance = */ NULL,
-                               (counter_t) new_data->read_ops, (counter_t) new_data->write_ops, new_data->timestamp);
+                               (counter_t) new_data->read_ops, (counter_t) new_data->write_ops, new_data->timestamp, interval);
        }
 
        /* Check for, calculate and submit disk-latency values */
@@ -791,7 +797,7 @@ static int submit_volume_perf_data (const char *hostname, /* {{{ */
                }
 
                submit_two_gauge (hostname, plugin_instance, "disk_latency", /* type instance = */ NULL,
-                               latency_per_op_read, latency_per_op_write, new_data->timestamp);
+                               latency_per_op_read, latency_per_op_write, new_data->timestamp, interval);
        }
 
        /* Clear all HAVE_* flags. */
@@ -830,7 +836,7 @@ static cdtime_t cna_child_get_cdtime (na_elem_t *data) /* {{{ */
  */
 /* Data corresponding to <WAFL /> */
 static int cna_handle_wafl_data (const char *hostname, cfg_wafl_t *cfg_wafl, /* {{{ */
-               na_elem_t *data)
+               na_elem_t *data, int interval)
 {
        cfg_wafl_t perf_data;
        const char *plugin_inst;
@@ -909,7 +915,7 @@ static int cna_handle_wafl_data (const char *hostname, cfg_wafl_t *cfg_wafl, /*
                }
        }
 
-       return (submit_wafl_data (hostname, plugin_inst, cfg_wafl, &perf_data));
+       return (submit_wafl_data (hostname, plugin_inst, cfg_wafl, &perf_data, interval));
 } /* }}} void cna_handle_wafl_data */
 
 static int cna_setup_wafl (cfg_wafl_t *cw) /* {{{ */
@@ -983,7 +989,7 @@ static int cna_query_wafl (host_config_t *host) /* {{{ */
                return (-1);
        }
 
-       status = cna_handle_wafl_data (host->name, host->cfg_wafl, data);
+       status = cna_handle_wafl_data (host->name, host->cfg_wafl, data, host->interval);
 
        if (status == 0)
                host->cfg_wafl->interval.last_read = now;
@@ -994,7 +1000,7 @@ static int cna_query_wafl (host_config_t *host) /* {{{ */
 
 /* Data corresponding to <Disks /> */
 static int cna_handle_disk_data (const char *hostname, /* {{{ */
-               cfg_disk_t *cfg_disk, na_elem_t *data)
+               cfg_disk_t *cfg_disk, na_elem_t *data, cdtime_t interval)
 {
        cdtime_t timestamp;
        na_elem_t *instances;
@@ -1108,7 +1114,7 @@ static int cna_handle_disk_data (const char *hostname, /* {{{ */
 
        if ((cfg_disk->flags & CFG_DISK_BUSIEST) && (worst_disk != NULL))
                submit_double (hostname, "system", "percent", "disk_busy",
-                               worst_disk->disk_busy_percent, timestamp);
+                               worst_disk->disk_busy_percent, timestamp, interval);
 
        return (0);
 } /* }}} int cna_handle_disk_data */
@@ -1178,7 +1184,7 @@ static int cna_query_disk (host_config_t *host) /* {{{ */
                return (-1);
        }
 
-       status = cna_handle_disk_data (host->name, host->cfg_disk, data);
+       status = cna_handle_disk_data (host->name, host->cfg_disk, data, host->interval);
 
        if (status == 0)
                host->cfg_disk->interval.last_read = now;
@@ -1189,7 +1195,7 @@ static int cna_query_disk (host_config_t *host) /* {{{ */
 
 /* Data corresponding to <VolumePerf /> */
 static int cna_handle_volume_perf_data (const char *hostname, /* {{{ */
-               cfg_volume_perf_t *cvp, na_elem_t *data)
+               cfg_volume_perf_t *cvp, na_elem_t *data, cdtime_t interval)
 {
        cdtime_t timestamp;
        na_elem_t *elem_instances;
@@ -1274,7 +1280,7 @@ static int cna_handle_volume_perf_data (const char *hostname, /* {{{ */
                        }
                } /* for (elem_counter) */
 
-               submit_volume_perf_data (hostname, v, &perf_data);
+               submit_volume_perf_data (hostname, v, &perf_data, interval);
        } /* for (volume) */
 
        return (0);
@@ -1349,7 +1355,7 @@ static int cna_query_volume_perf (host_config_t *host) /* {{{ */
                return (-1);
        }
 
-       status = cna_handle_volume_perf_data (host->name, host->cfg_volume_perf, data);
+       status = cna_handle_volume_perf_data (host->name, host->cfg_volume_perf, data, host->interval);
 
        if (status == 0)
                host->cfg_volume_perf->interval.last_read = now;
@@ -1360,7 +1366,7 @@ static int cna_query_volume_perf (host_config_t *host) /* {{{ */
 
 /* Data corresponding to <VolumeUsage /> */
 static int cna_submit_volume_usage_data (const char *hostname, /* {{{ */
-               cfg_volume_usage_t *cfg_volume)
+               cfg_volume_usage_t *cfg_volume, int interval)
 {
        data_volume_usage_t *v;
 
@@ -1408,32 +1414,32 @@ static int cna_submit_volume_usage_data (const char *hostname, /* {{{ */
                if (HAS_ALL_FLAGS (v->flags, HAVE_VOLUME_USAGE_NORM_FREE))
                        submit_double (hostname, /* plugin instance = */ plugin_instance,
                                        "df_complex", "free",
-                                       (double) norm_free, /* timestamp = */ 0);
+                                       (double) norm_free, /* timestamp = */ 0, interval);
 
                if (HAS_ALL_FLAGS (v->flags, HAVE_VOLUME_USAGE_SIS_SAVED))
                        submit_double (hostname, /* plugin instance = */ plugin_instance,
                                        "df_complex", "sis_saved",
-                                       (double) sis_saved, /* timestamp = */ 0);
+                                       (double) sis_saved, /* timestamp = */ 0, interval);
 
                if (HAS_ALL_FLAGS (v->flags, HAVE_VOLUME_USAGE_NORM_USED))
                        submit_double (hostname, /* plugin instance = */ plugin_instance,
                                        "df_complex", "used",
-                                       (double) norm_used, /* timestamp = */ 0);
+                                       (double) norm_used, /* timestamp = */ 0, interval);
 
                if (HAS_ALL_FLAGS (v->flags, HAVE_VOLUME_USAGE_SNAP_RSVD))
                        submit_double (hostname, /* plugin instance = */ plugin_instance,
                                        "df_complex", "snap_reserved",
-                                       (double) snap_reserve_free, /* timestamp = */ 0);
+                                       (double) snap_reserve_free, /* timestamp = */ 0, interval);
 
                if (HAS_ALL_FLAGS (v->flags, HAVE_VOLUME_USAGE_SNAP_USED | HAVE_VOLUME_USAGE_SNAP_RSVD))
                        submit_double (hostname, /* plugin instance = */ plugin_instance,
                                        "df_complex", "snap_reserve_used",
-                                       (double) snap_reserve_used, /* timestamp = */ 0);
+                                       (double) snap_reserve_used, /* timestamp = */ 0, interval);
 
                if (HAS_ALL_FLAGS (v->flags, HAVE_VOLUME_USAGE_SNAP_USED))
                        submit_double (hostname, /* plugin instance = */ plugin_instance,
                                        "df_complex", "snap_normal_used",
-                                       (double) snap_norm_used, /* timestamp = */ 0);
+                                       (double) snap_norm_used, /* timestamp = */ 0, interval);
 
                /* Clear all the HAVE_* flags */
                v->flags &= ~HAVE_VOLUME_USAGE_ALL;
@@ -1660,7 +1666,7 @@ static int cna_handle_volume_usage_data (const host_config_t *host, /* {{{ */
                } /* }}} end of 32-bit workaround */
        } /* for (elem_volume) */
 
-       return (cna_submit_volume_usage_data (host->name, cfg_volume));
+       return (cna_submit_volume_usage_data (host->name, cfg_volume, host->interval));
 } /* }}} int cna_handle_volume_usage_data */
 
 static int cna_setup_volume_usage (cfg_volume_usage_t *cvu) /* {{{ */
@@ -1724,7 +1730,7 @@ static int cna_query_volume_usage (host_config_t *host) /* {{{ */
 
 /* Data corresponding to <System /> */
 static int cna_handle_system_data (const char *hostname, /* {{{ */
-               cfg_system_t *cfg_system, na_elem_t *data)
+               cfg_system_t *cfg_system, na_elem_t *data, int interval)
 {
        na_elem_t *instances;
        na_elem_t *counter;
@@ -1796,27 +1802,27 @@ static int cna_handle_system_data (const char *hostname, /* {{{ */
                                && (value > 0) && (strlen(name) > 4)
                                && (!strcmp(name + strlen(name) - 4, "_ops"))) {
                        submit_counter (hostname, instance, "disk_ops_complex", name,
-                                       (counter_t) value, timestamp);
+                                       (counter_t) value, timestamp, interval);
                }
        } /* for (counter) */
 
        if ((cfg_system->flags & CFG_SYSTEM_DISK)
                        && (HAS_ALL_FLAGS (counter_flags, 0x01 | 0x02)))
                submit_two_counters (hostname, instance, "disk_octets", NULL,
-                               disk_read, disk_written, timestamp);
+                               disk_read, disk_written, timestamp, interval);
                                
        if ((cfg_system->flags & CFG_SYSTEM_NET)
                        && (HAS_ALL_FLAGS (counter_flags, 0x04 | 0x08)))
                submit_two_counters (hostname, instance, "if_octets", NULL,
-                               net_recv, net_sent, timestamp);
+                               net_recv, net_sent, timestamp, interval);
 
        if ((cfg_system->flags & CFG_SYSTEM_CPU)
                        && (HAS_ALL_FLAGS (counter_flags, 0x10 | 0x20)))
        {
                submit_counter (hostname, instance, "cpu", "system",
-                               cpu_busy, timestamp);
+                               cpu_busy, timestamp, interval);
                submit_counter (hostname, instance, "cpu", "idle",
-                               cpu_total - cpu_busy, timestamp);
+                               cpu_total - cpu_busy, timestamp, interval);
        }
 
        return (0);
@@ -1872,7 +1878,7 @@ static int cna_query_system (host_config_t *host) /* {{{ */
                return (-1);
        }
 
-       status = cna_handle_system_data (host->name, host->cfg_system, data);
+       status = cna_handle_system_data (host->name, host->cfg_system, data, host->interval);
 
        if (status == 0)
                host->cfg_system->interval.last_read = now;
index eb32ad1..7a1fcf7 100644 (file)
@@ -31,6 +31,7 @@
 #include "utils_fbhash.h"
 #include "utils_avltree.h"
 #include "utils_cache.h"
+#include "utils_complain.h"
 
 #include "network.h"
 
@@ -917,6 +918,8 @@ static int parse_packet (sockent_t *se,
 static int parse_part_sign_sha256 (sockent_t *se, /* {{{ */
     void **ret_buffer, size_t *ret_buffer_len, int flags)
 {
+  static c_complain_t complain_no_users = C_COMPLAIN_INIT_STATIC;
+
   char *buffer;
   size_t buffer_len;
   size_t buffer_offset;
@@ -938,8 +941,9 @@ static int parse_part_sign_sha256 (sockent_t *se, /* {{{ */
 
   if (se->data.server.userdb == NULL)
   {
-    NOTICE ("network plugin: Received signed network packet but can't verify "
-        "it because no user DB has been configured. Will accept it.");
+    c_complain (LOG_NOTICE, &complain_no_users,
+        "network plugin: Received signed network packet but can't verify it "
+        "because no user DB has been configured. Will accept it.");
     return (0);
   }
 
index a13b1f9..da6894a 100644 (file)
@@ -1,6 +1,7 @@
 /**
  * collectd - src/notify_email.c
  * Copyright (C) 2008  Oleg King
+ * Copyright (C) 2010  Florian Forster
  *
  * This program is free software; you can redistribute it and/or modify it
  * under the terms of the GNU General Public License as published by the
@@ -18,6 +19,7 @@
  *
  * Authors:
  *   Oleg King <king2 at kaluga.ru>
+ *   Florian Forster <octo at collectd.org>
  **/
 
 #include "collectd.h"
@@ -26,6 +28,7 @@
 
 #include <auth-client.h>
 #include <libesmtp.h>
+#include <pthread.h>
 
 #define MAXSTRING               256
 
@@ -45,6 +48,7 @@ static char **recipients;
 static int recipients_len = 0;
 
 static smtp_session_t session;
+static pthread_mutex_t session_lock = PTHREAD_MUTEX_INITIALIZER;
 static smtp_message_t message;
 static auth_context_t authctx = NULL;
 
@@ -113,17 +117,23 @@ static int notify_email_init (void)
 {
   char server[MAXSTRING];
 
+  ssnprintf(server, sizeof (server), "%s:%i",
+      (smtp_host == NULL) ? DEFAULT_SMTP_HOST : smtp_host,
+      smtp_port);
+
+  pthread_mutex_lock (&session_lock);
+
   auth_client_init();
-  if (!(session = smtp_create_session ())) {
+
+  session = smtp_create_session ();
+  if (session == NULL) {
+    pthread_mutex_unlock (&session_lock);
     ERROR ("notify_email plugin: cannot create SMTP session");
     return (-1);
   }
 
   smtp_set_monitorcb (session, monitor_cb, NULL, 1);
   smtp_set_hostname (session, hostname_g);
-  ssnprintf(server, sizeof (server), "%s:%i",
-      (smtp_host == NULL) ? DEFAULT_SMTP_HOST : smtp_host,
-      smtp_port);
   smtp_set_server (session, server);
 
   if (smtp_user && smtp_password) {
@@ -133,18 +143,30 @@ static int notify_email_init (void)
   }
 
   if ( !smtp_auth_set_context (session, authctx)) {
+    pthread_mutex_unlock (&session_lock);
     ERROR ("notify_email plugin: cannot set SMTP auth context");
     return (-1);   
   }
 
+  pthread_mutex_unlock (&session_lock);
   return (0);
 } /* int notify_email_init */
 
 static int notify_email_shutdown (void)
 {
-  smtp_destroy_session (session);
-  auth_destroy_context (authctx);
+  pthread_mutex_lock (&session_lock);
+
+  if (session != NULL)
+    smtp_destroy_session (session);
+  session = NULL;
+
+  if (authctx != NULL)
+    auth_destroy_context (authctx);
+  authctx = NULL;
+
   auth_client_exit();
+
+  pthread_mutex_unlock (&session_lock);
   return (0);
 } /* int notify_email_shutdown */
 
@@ -250,7 +272,16 @@ static int notify_email_notification (const notification_t *n,
       n->host,
       n->message);
 
+  pthread_mutex_lock (&session_lock);
+
+  if (session == NULL) {
+    /* Initialization failed or we're in the process of shutting down. */
+    pthread_mutex_unlock (&session_lock);
+    return (-1);
+  }
+
   if (!(message = smtp_add_message (session))) {
+    pthread_mutex_unlock (&session_lock);
     ERROR ("notify_email plugin: cannot set SMTP message");
     return (-1);   
   }
@@ -266,6 +297,7 @@ static int notify_email_notification (const notification_t *n,
     char buf[MAXSTRING];
     ERROR ("notify_email plugin: SMTP server problem: %s",
         smtp_strerror (smtp_errno (), buf, sizeof buf));
+    pthread_mutex_unlock (&session_lock);
     return (-1);
   } else {
     const smtp_status_t *status;
@@ -276,6 +308,7 @@ static int notify_email_notification (const notification_t *n,
     smtp_enumerate_recipients (message, print_recipient_status, NULL);
   }
 
+  pthread_mutex_unlock (&session_lock);
   return (0);
 } /* int notify_email_notification */
 
index a3027e0..eed0591 100644 (file)
@@ -978,6 +978,7 @@ PyMODINIT_FUNC PyInit_collectd(void) {
 
 static int cpy_config(oconfig_item_t *ci) {
        int i;
+       char *argv = "";
        PyObject *sys, *tb;
        PyObject *sys_path;
        PyObject *module;
@@ -1017,6 +1018,9 @@ static int cpy_config(oconfig_item_t *ci) {
                cpy_log_exception("python initialization");
                return 1;
        }
+       PySys_SetArgv(1, &argv);
+       PyList_SetSlice(sys_path, 0, 1, NULL);
+
 #ifdef IS_PY3K
        module = PyImport_ImportModule("collectd");
 #else
index 7fc0d42..25f4637 100644 (file)
@@ -210,6 +210,213 @@ static int v5_mysql_threads (const data_set_t *ds, value_list_t *vl) /* {{{ */
   return (FC_TARGET_STOP);
 } /* }}} int v5_mysql_threads */
 
+/*
+ * ZFS ARC hit and miss counters
+ *
+ * 4.* uses the flawed "arc_counts" type. In 5.* this has been replaced by the
+ * more generic "cache_result" type.
+ */
+static int v5_zfs_arc_counts (const data_set_t *ds, value_list_t *vl) /* {{{ */
+{
+  value_list_t new_vl;
+  value_t new_value;
+  _Bool is_hits;
+
+  if (vl->values_len != 4)
+    return (FC_TARGET_STOP);
+
+  if (strcmp ("hits", vl->type_instance) == 0)
+    is_hits = 1;
+  else if (strcmp ("misses", vl->type_instance) == 0)
+    is_hits = 0;
+  else
+    return (FC_TARGET_STOP);
+
+  /* Copy everything: Time, interval, host, ... */
+  memcpy (&new_vl, vl, sizeof (new_vl));
+
+  /* Reset data we can't simply copy */
+  new_vl.values = &new_value;
+  new_vl.values_len = 1;
+  new_vl.meta = NULL;
+
+  /* Change the type to "cache_result" */
+  sstrncpy (new_vl.type, "cache_result", sizeof (new_vl.type));
+
+  /* Dispatch new value lists instead of this one */
+  new_vl.values[0].derive = (derive_t) vl->values[0].counter;
+  ssnprintf (new_vl.type_instance, sizeof (new_vl.type_instance),
+      "demand_data-%s",
+      is_hits ? "hit" : "miss");
+  plugin_dispatch_values (&new_vl);
+
+  new_vl.values[0].derive = (derive_t) vl->values[1].counter;
+  ssnprintf (new_vl.type_instance, sizeof (new_vl.type_instance),
+      "demand_metadata-%s",
+      is_hits ? "hit" : "miss");
+  plugin_dispatch_values (&new_vl);
+
+  new_vl.values[0].derive = (derive_t) vl->values[2].counter;
+  ssnprintf (new_vl.type_instance, sizeof (new_vl.type_instance),
+      "prefetch_data-%s",
+      is_hits ? "hit" : "miss");
+  plugin_dispatch_values (&new_vl);
+
+  new_vl.values[0].derive = (derive_t) vl->values[3].counter;
+  ssnprintf (new_vl.type_instance, sizeof (new_vl.type_instance),
+      "prefetch_metadata-%s",
+      is_hits ? "hit" : "miss");
+  plugin_dispatch_values (&new_vl);
+
+  /* Abort processing */
+  return (FC_TARGET_STOP);
+} /* }}} int v5_zfs_arc_counts */
+
+/*
+ * ZFS ARC L2 bytes
+ *
+ * "arc_l2_bytes" -> "io_octets-L2".
+ */
+static int v5_zfs_arc_l2_bytes (const data_set_t *ds, value_list_t *vl) /* {{{ */
+{
+  value_list_t new_vl;
+  value_t new_values[2];
+
+  if (vl->values_len != 2)
+    return (FC_TARGET_STOP);
+
+  /* Copy everything: Time, interval, host, ... */
+  memcpy (&new_vl, vl, sizeof (new_vl));
+
+  /* Reset data we can't simply copy */
+  new_vl.values = new_values;
+  new_vl.values_len = 2;
+  new_vl.meta = NULL;
+
+  /* Change the type/-instance to "io_octets-L2" */
+  sstrncpy (new_vl.type, "io_octets", sizeof (new_vl.type));
+  sstrncpy (new_vl.type_instance, "L2", sizeof (new_vl.type_instance));
+
+  /* Copy the actual values. */
+  new_vl.values[0].derive = (derive_t) vl->values[0].counter;
+  new_vl.values[1].derive = (derive_t) vl->values[1].counter;
+
+  /* Dispatch new value lists instead of this one */
+  plugin_dispatch_values (&new_vl);
+
+  /* Abort processing */
+  return (FC_TARGET_STOP);
+} /* }}} int v5_zfs_arc_l2_bytes */
+
+/*
+ * ZFS ARC L2 cache size
+ *
+ * 4.* uses a separate type for this. 5.* uses the generic "cache_size" type
+ * instead.
+ */
+static int v5_zfs_arc_l2_size (const data_set_t *ds, value_list_t *vl) /* {{{ */
+{
+  value_list_t new_vl;
+  value_t new_value;
+
+  if (vl->values_len != 1)
+    return (FC_TARGET_STOP);
+
+  /* Copy everything: Time, interval, host, ... */
+  memcpy (&new_vl, vl, sizeof (new_vl));
+
+  /* Reset data we can't simply copy */
+  new_vl.values = &new_value;
+  new_vl.values_len = 1;
+  new_vl.meta = NULL;
+
+  new_vl.values[0].gauge = (gauge_t) vl->values[0].gauge;
+
+  /* Change the type to "cache_size" */
+  sstrncpy (new_vl.type, "cache_size", sizeof (new_vl.type));
+
+  /* Adapt the type instance */
+  sstrncpy (new_vl.type_instance, "L2", sizeof (new_vl.type_instance));
+
+  /* Dispatch new value lists instead of this one */
+  plugin_dispatch_values (&new_vl);
+
+  /* Abort processing */
+  return (FC_TARGET_STOP);
+} /* }}} int v5_zfs_arc_l2_size */
+
+/*
+ * ZFS ARC ratio
+ *
+ * "arc_ratio-L1" -> "cache_ratio-arc"
+ * "arc_ratio-L2" -> "cache_ratio-L2"
+ */
+static int v5_zfs_arc_ratio (const data_set_t *ds, value_list_t *vl) /* {{{ */
+{
+  value_list_t new_vl;
+  value_t new_value;
+
+  if (vl->values_len != 1)
+    return (FC_TARGET_STOP);
+
+  /* Copy everything: Time, interval, host, ... */
+  memcpy (&new_vl, vl, sizeof (new_vl));
+
+  /* Reset data we can't simply copy */
+  new_vl.values = &new_value;
+  new_vl.values_len = 1;
+  new_vl.meta = NULL;
+
+  new_vl.values[0].gauge = (gauge_t) vl->values[0].gauge;
+
+  /* Change the type to "cache_ratio" */
+  sstrncpy (new_vl.type, "cache_ratio", sizeof (new_vl.type));
+
+  /* Adapt the type instance */
+  if (strcmp ("L1", vl->type_instance) == 0)
+    sstrncpy (new_vl.type_instance, "arc", sizeof (new_vl.type_instance));
+
+  /* Dispatch new value lists instead of this one */
+  plugin_dispatch_values (&new_vl);
+
+  /* Abort processing */
+  return (FC_TARGET_STOP);
+} /* }}} int v5_zfs_arc_ratio */
+
+/*
+ * ZFS ARC size
+ *
+ * 4.* uses the "arc_size" type with four data sources. In 5.* this has been
+ * replaces with the "cache_size" type and static data has been removed.
+ */
+static int v5_zfs_arc_size (const data_set_t *ds, value_list_t *vl) /* {{{ */
+{
+  value_list_t new_vl;
+  value_t new_value;
+
+  if (vl->values_len != 4)
+    return (FC_TARGET_STOP);
+
+  /* Copy everything: Time, interval, host, ... */
+  memcpy (&new_vl, vl, sizeof (new_vl));
+
+  /* Reset data we can't simply copy */
+  new_vl.values = &new_value;
+  new_vl.values_len = 1;
+  new_vl.meta = NULL;
+
+  /* Change the type to "cache_size" */
+  sstrncpy (new_vl.type, "cache_size", sizeof (new_vl.type));
+
+  /* Dispatch new value lists instead of this one */
+  new_vl.values[0].derive = (derive_t) vl->values[0].counter;
+  sstrncpy (new_vl.type_instance, "arc", sizeof (new_vl.type_instance));
+  plugin_dispatch_values (&new_vl);
+
+  /* Abort processing */
+  return (FC_TARGET_STOP);
+} /* }}} int v5_zfs_arc_size */
+
 static int v5_destroy (void **user_data) /* {{{ */
 {
   return (0);
@@ -236,6 +443,16 @@ static int v5_invoke (const data_set_t *ds, value_list_t *vl, /* {{{ */
     return (v5_mysql_qcache (ds, vl));
   else if (strcmp ("mysql_threads", vl->type) == 0)
     return (v5_mysql_threads (ds, vl));
+  else if (strcmp ("arc_counts", vl->type) == 0)
+    return (v5_zfs_arc_counts (ds, vl));
+  else if (strcmp ("arc_l2_bytes", vl->type) == 0)
+    return (v5_zfs_arc_l2_bytes (ds, vl));
+  else if (strcmp ("arc_l2_size", vl->type) == 0)
+    return (v5_zfs_arc_l2_size (ds, vl));
+  else if (strcmp ("arc_ratio", vl->type) == 0)
+    return (v5_zfs_arc_ratio (ds, vl));
+  else if (strcmp ("arc_size", vl->type) == 0)
+    return (v5_zfs_arc_size (ds, vl));
 
   return (FC_TARGET_CONTINUE);
 } /* }}} int v5_invoke */
index 1b0020f..0484983 100644 (file)
@@ -28,6 +28,7 @@ cpufreq                       value:GAUGE:0:U
 cpu                    value:COUNTER:0:4294967295
 current                        value:GAUGE:U:U
 current_connections    value:GAUGE:0:U
+current_sessions       value:GAUGE:0:U
 delay                  seconds:GAUGE:-1000000:1000000
 derive                 value:DERIVE:0:U
 df                     used:GAUGE:0:1125899906842623, free:GAUGE:0:1125899906842623
@@ -165,6 +166,7 @@ total_time_in_ms    value:DERIVE:0:U
 total_values           value:DERIVE:0:U
 uptime                 value:GAUGE:0:4294967295
 users                  users:GAUGE:0:65535
+vcpu                   value:GAUGE:0:U
 virt_cpu_total         ns:COUNTER:0:256000000000
 virt_vcpu              ns:COUNTER:0:1000000000
 vmpage_action          value:COUNTER:0:4294967295
@@ -177,4 +179,3 @@ voltage                     value:GAUGE:U:U
 vs_memory              value:GAUGE:0:9223372036854775807
 vs_processes           value:GAUGE:0:65535
 vs_threads             value:GAUGE:0:65535
-pinba_view              req_per_sec:GAUGE:0:U, req_time:GAUGE:0:U, ru_utime:GAUGE:0:U, ru_stime:GAUGE:0:U, doc_size:GAUGE:0:U, mem_peak:GAUGE:0:U
index 0b89748..6de1395 100644 (file)
@@ -54,7 +54,8 @@ static const char *config_keys[] =
 {
        "SocketFile",
        "SocketGroup",
-       "SocketPerms"
+       "SocketPerms",
+       "DeleteSocket"
 };
 static int config_keys_num = STATIC_ARRAY_SIZE (config_keys);
 
@@ -65,6 +66,7 @@ static int   sock_fd    = -1;
 static char *sock_file  = NULL;
 static char *sock_group = NULL;
 static int   sock_perms = S_IRWXU | S_IRWXG;
+static _Bool delete_socket = 0;
 
 static pthread_t listen_thread = (pthread_t) 0;
 
@@ -89,10 +91,27 @@ static int us_open_socket (void)
        sa.sun_family = AF_UNIX;
        sstrncpy (sa.sun_path, (sock_file != NULL) ? sock_file : US_DEFAULT_PATH,
                        sizeof (sa.sun_path));
-       /* unlink (sa.sun_path); */
 
        DEBUG ("unixsock plugin: socket path = %s", sa.sun_path);
 
+       if (delete_socket)
+       {
+               errno = 0;
+               status = unlink (sa.sun_path);
+               if ((status != 0) && (errno != ENOENT))
+               {
+                       char errbuf[1024];
+                       WARNING ("unixsock plugin: Deleting socket file \"%s\" failed: %s",
+                                       sa.sun_path,
+                                       sstrerror (errno, errbuf, sizeof (errbuf)));
+               }
+               else if (status == 0)
+               {
+                       INFO ("unixsock plugin: Successfully deleted socket file \"%s\".",
+                                       sa.sun_path);
+               }
+       }
+
        status = bind (sock_fd, (struct sockaddr *) &sa, sizeof (sa));
        if (status != 0)
        {
@@ -392,6 +411,13 @@ static int us_config (const char *key, const char *val)
        {
                sock_perms = (int) strtol (val, NULL, 8);
        }
+       else if (strcasecmp (key, "DeleteSocket") == 0)
+       {
+               if (IS_TRUE (val))
+                       delete_socket = 1;
+               else
+                       delete_socket = 0;
+       }
        else
        {
                return (-1);
index 10e8a48..20b1237 100644 (file)
@@ -175,7 +175,7 @@ static int uc_send_notification (const char *name)
   }
     
   /* Check if the entry has been updated in the meantime */
-  if ((n.time - ce->last_update) < (2 * ce->interval))
+  if ((n.time - ce->last_update) < (timeout_g * ce->interval))
   {
     ce->state = STATE_OKAY;
     pthread_mutex_unlock (&cache_lock);
index 4d4b57d..062bcfe 100644 (file)
@@ -228,7 +228,7 @@ cu_match_t *match_create_callback (const char *regex, const char *excluderegex,
     return (NULL);
   memset (obj, '\0', sizeof (cu_match_t));
 
-  status = regcomp (&obj->regex, regex, REG_EXTENDED);
+  status = regcomp (&obj->regex, regex, REG_EXTENDED | REG_NEWLINE);
   if (status != 0)
   {
     ERROR ("Compiling the regular expression \"%s\" failed.", regex);
index 904a521..5b7551d 100644 (file)
@@ -220,6 +220,8 @@ int cu_tail_read (cu_tail_t *obj, char *buf, int buflen, tailfunc_t *callback,
 
        while (42)
        {
+               size_t len;
+
                status = cu_tail_readline (obj, buf, buflen);
                if (status != 0)
                {
@@ -232,6 +234,13 @@ int cu_tail_read (cu_tail_t *obj, char *buf, int buflen, tailfunc_t *callback,
                if (buf[0] == 0)
                        break;
 
+               len = strlen (buf);
+               while (len > 0) {
+                       if (buf[len - 1] != '\n')
+                               break;
+                       buf[len - 1] = '\0';
+               }
+
                status = callback (data, buf, buflen);
                if (status != 0)
                {
index 5f14e90..bc2b0d2 100644 (file)
@@ -46,54 +46,48 @@ static void za_submit (const char* type, const char* type_instance, value_t* val
 
 static void za_submit_gauge (const char* type, const char* type_instance, gauge_t value)
 {
-       value_t values[1];
+       value_t vv;
 
-       values[0].gauge = value;
-
-       za_submit (type, type_instance, values, STATIC_ARRAY_SIZE(values));
+       vv.gauge = value;
+       za_submit (type, type_instance, &vv, 1);
 }
 
-static void za_submit_size (gauge_t size, gauge_t size_target, gauge_t limit_min, gauge_t limit_max)
+static void za_submit_derive (const char* type, const char* type_instance, derive_t dv)
 {
-       value_t values[4];
-
-       values[0].gauge = size;
-       values[1].gauge = size_target;
-       values[2].gauge = limit_min;
-       values[3].gauge = limit_max;
+       value_t vv;
 
-       za_submit ("arc_size", "", values, STATIC_ARRAY_SIZE(values));
+       vv.derive = dv;
+       za_submit (type, type_instance, &vv, 1);
 }
 
-static void za_submit_bytes (counter_t read, counter_t write)
+static void za_submit_ratio (const char* type_instance, gauge_t hits, gauge_t misses)
 {
-       value_t values[2];
+       gauge_t ratio = NAN;
 
-       values[0].counter = read;
-       values[1].counter = write;
+       if (!isfinite (hits) || (hits < 0.0))
+               hits = 0.0;
+       if (!isfinite (misses) || (misses < 0.0))
+               misses = 0.0;
 
-       za_submit ("arc_l2_bytes", "", values, STATIC_ARRAY_SIZE(values));
-}
-
-static void za_submit_counts (char *type_instance, counter_t demand_data, counter_t demand_metadata,
-       counter_t prefetch_data, counter_t prefetch_metadata)
-{
-       value_t values[4];
+       if ((hits != 0.0) || (misses != 0.0))
+               ratio = hits / (hits + misses);
 
-       values[0].counter = demand_data;
-       values[1].counter = demand_metadata;
-       values[2].counter = prefetch_data;
-       values[3].counter = prefetch_metadata;
-
-       za_submit ("arc_counts", type_instance, values, STATIC_ARRAY_SIZE(values));
+       za_submit_gauge ("cache_ratio", type_instance, ratio);
 }
 
 static int za_read (void)
 {
-       gauge_t   arcsize, targetsize, minlimit, maxlimit, hits, misses, l2_size, l2_hits, l2_misses;
-       counter_t demand_data_hits, demand_metadata_hits, prefetch_data_hits, prefetch_metadata_hits;
-       counter_t demand_data_misses, demand_metadata_misses, prefetch_data_misses, prefetch_metadata_misses;
-       counter_t l2_read_bytes, l2_write_bytes;
+       gauge_t  arc_size, l2_size;
+       derive_t demand_data_hits,
+                demand_metadata_hits,
+                prefetch_data_hits,
+                prefetch_metadata_hits,
+                demand_data_misses,
+                demand_metadata_misses,
+                prefetch_data_misses,
+                prefetch_metadata_misses;
+       gauge_t  arc_hits, arc_misses, arc_ratio, l2_hits, l2_misses, l2_ratio;
+       value_t  l2_io[2];
 
        get_kstat (&ksp, "zfs", 0, "arcstats");
        if (ksp == NULL)
@@ -102,11 +96,14 @@ static int za_read (void)
                return (-1);
        }
 
-       arcsize    = get_kstat_value(ksp, "size");
-       targetsize = get_kstat_value(ksp, "c");
-       minlimit   = get_kstat_value(ksp, "c_min");
-       maxlimit   = get_kstat_value(ksp, "c_max");
+       /* Sizes */
+       arc_size   = get_kstat_value(ksp, "size");
+       l2_size    = get_kstat_value(ksp, "l2_size");
+
+       za_submit_gauge ("cache_size", "arc", arc_size);
+       za_submit_gauge ("cache_size", "L2", l2_size);
 
+       /* Hits / misses */
        demand_data_hits       = get_kstat_value(ksp, "demand_data_hits");
        demand_metadata_hits   = get_kstat_value(ksp, "demand_metadata_hits");
        prefetch_data_hits     = get_kstat_value(ksp, "prefetch_data_hits");
@@ -117,31 +114,33 @@ static int za_read (void)
        prefetch_data_misses     = get_kstat_value(ksp, "prefetch_data_misses");
        prefetch_metadata_misses = get_kstat_value(ksp, "prefetch_metadata_misses");
 
-       hits   = get_kstat_value(ksp, "hits");
-       misses = get_kstat_value(ksp, "misses");
+       za_submit_derive ("cache_result", "demand_data-hit",       demand_data_hits);
+       za_submit_derive ("cache_result", "demand_metadata-hit",   demand_metadata_hits);
+       za_submit_derive ("cache_result", "prefetch_data-hit",     prefetch_data_hits);
+       za_submit_derive ("cache_result", "prefetch_metadata-hit", prefetch_metadata_hits);
 
-       l2_size        = get_kstat_value(ksp, "l2_size");
-       l2_read_bytes  = get_kstat_value(ksp, "l2_read_bytes");
-       l2_write_bytes = get_kstat_value(ksp, "l2_write_bytes");
-       l2_hits        = get_kstat_value(ksp, "l2_hits");
-       l2_misses      = get_kstat_value(ksp, "l2_misses");
+       za_submit_derive ("cache_result", "demand_data-miss",       demand_data_misses);
+       za_submit_derive ("cache_result", "demand_metadata-miss",   demand_metadata_misses);
+       za_submit_derive ("cache_result", "prefetch_data-miss",     prefetch_data_misses);
+       za_submit_derive ("cache_result", "prefetch_metadata-miss", prefetch_metadata_misses);
 
+       /* Ratios */
+       arc_hits   = (gauge_t) get_kstat_value(ksp, "hits");
+       arc_misses = (gauge_t) get_kstat_value(ksp, "misses");
+       l2_hits    = (gauge_t) get_kstat_value(ksp, "l2_hits");
+       l2_misses  = (gauge_t) get_kstat_value(ksp, "l2_misses");
 
-       za_submit_size (arcsize, targetsize, minlimit, maxlimit);
-       za_submit_gauge ("arc_l2_size", "", l2_size);
+       za_submit_ratio ("arc", arc_hits, arc_misses);
+       za_submit_ratio ("L2", l2_hits, l2_misses);
 
-       za_submit_counts ("hits",   demand_data_hits,     demand_metadata_hits,
-                                   prefetch_data_hits,   prefetch_metadata_hits);
-       za_submit_counts ("misses", demand_data_misses,   demand_metadata_misses,
-                                   prefetch_data_misses, prefetch_metadata_misses);
+       /* I/O */
+       l2_io[0].derive = get_kstat_value(ksp, "l2_read_bytes");
+       l2_io[1].derive = get_kstat_value(ksp, "l2_write_bytes");
 
-       za_submit_gauge ("arc_ratio", "L1", hits / (hits + misses));
-       za_submit_gauge ("arc_ratio", "L2", l2_hits / (l2_hits + l2_misses));
-
-       za_submit_bytes (l2_read_bytes, l2_write_bytes);
+       za_submit ("io_octets", "L2", l2_io, /* num values = */ 2);
 
        return (0);
-}
+} /* int za_read */
 
 static int za_init (void) /* {{{ */
 {