Antony Dovgal <tony at daylessday.org>
- memcached plugin.
+Aurélien Reynaud <collectd at wattapower.net>
+ - LPAR plugin.
+ - Various fixes for AIX, HP-UX and Solaris.
+
Bruno Prémont <bonbons at linux-vserver.org>
- BIND plugin.
- Many bugreports and -fixes in various plugins,
Scott Garrett <sgarrett at technomancer.com>
- tape plugin.
+Sebastien Pahl <sebastien.pahl at dotcloud.com>
+ - AMQP plugin.
+
Simon Kuhnle <simon at blarzwurst.de>
- OpenBSD code for the cpu and memory plugins.
- load
System load average over the last 1, 5 and 15 minutes.
+ - lpar
+ Detailed CPU statistics of the “Logical Partitions” virtualization
+ technique built into IBM's POWER processors.
+
- libvirt
CPU, disk and network I/O statistics from virtual machines.
* Output can be written or sent to various destinations by the following
plugins:
+ - amqp
+ Sends JSON-encoded data to an Advanced Message Queuing Protocol (AMQP)
+ server, such as RabbitMQ.
+
- csv
Write to comma separated values (CSV) files. This needs lots of
diskspace but is extremely portable and can be analysed with almost
Used to capture packets by the `dns' plugin.
<http://www.tcpdump.org/>
+ * libperfstat (optional)
+ Used by various plugins to gather statistics under AIX.
+
* libperl (optional)
Obviously used by the `perl' plugin. The library has to be compiled with
ithread support (introduced in Perl 5.6.0).
Used by the `python' plugin. Currently, only 2.3 ≦ Python < 3 is supported.
<http://www.python.org/>
+ * librabbitmq (optional; also called “rabbitmq-c”)
+ Used by the AMQP plugin for AMQP connections, for example to RabbitMQ.
+ <http://hg.rabbitmq.com/rabbitmq-c/>
+
* librouteros (optional)
Used by the `routeros' plugin to connect to a device running `RouterOS'.
<http://verplant.org/librouteros/>
then
AC_DEFINE(_POSIX_PTHREAD_SEMANTICS, 1, [Define to enforce POSIX thread semantics under Solaris.])
fi
+if test "x$ac_system" = "xAIX"
+then
+ AC_DEFINE(_THREAD_SAFE_ERRNO, 1, [Define to use the thread-safe version of errno under AIX.])
+fi
# Where to install .pc files.
pkgconfigdir="${libdir}/pkgconfig"
if test "x$with_perfstat" = "xyes"
then
AC_DEFINE(HAVE_PERFSTAT, 1, [Define to 1 if you have the 'perfstat' library (-lperfstat)])
+ # struct members pertaining to donation have been added to libperfstat somewhere between AIX5.3ML5 and AIX5.3ML9
+ AC_CHECK_MEMBER([perfstat_partition_type_t.b.donate_enabled], [], [], [[#include <libperfstat.h]])
+ if test "x$av_cv_member_perfstat_partition_type_t_b_donate_enabled" = "xyes"
+ then
+ AC_DEFINE(PERFSTAT_SUPPORTS_DONATION, 1, [Define to 1 if your version of the 'perfstat' library supports donation])
+ fi
fi
AM_CONDITIONAL(BUILD_WITH_PERFSTAT, test "x$with_perfstat" = "xyes")
fi
# }}} --with-python
+# --with-librabbitmq {{{
+with_librabbitmq_cppflags=""
+with_librabbitmq_ldflags=""
+AC_ARG_WITH(librabbitmq, [AS_HELP_STRING([--with-librabbitmq@<:@=PREFIX@:>@], [Path to librabbitmq.])],
+[
+ if test "x$withval" != "xno" && test "x$withval" != "xyes"
+ then
+ with_librabbitmq_cppflags="-I$withval/include"
+ with_librabbitmq_ldflags="-L$withval/lib"
+ with_librabbitmq="yes"
+ else
+ with_librabbitmq="$withval"
+ fi
+],
+[
+ with_librabbitmq="yes"
+])
+if test "x$with_librabbitmq" = "xyes"
+then
+ SAVE_CPPFLAGS="$CPPFLAGS"
+ CPPFLAGS="$CPPFLAGS $with_librabbitmq_cppflags"
+
+ AC_CHECK_HEADERS(amqp.h, [with_librabbitmq="yes"], [with_librabbitmq="no (amqp.h not found)"])
+
+ CPPFLAGS="$SAVE_CPPFLAGS"
+fi
+if test "x$with_librabbitmq" = "xyes"
+then
+ SAVE_CPPFLAGS="$CPPFLAGS"
+ SAVE_LDFLAGS="$LDFLAGS"
+ CPPFLAGS="$CPPFLAGS $with_librabbitmq_cppflags"
+ LDFLAGS="$LDFLAGS $with_librabbitmq_ldflags"
+
+ AC_CHECK_LIB(rabbitmq, amqp_basic_publish, [with_librabbitmq="yes"], [with_librabbitmq="no (Symbol 'amqp_basic_publish' not found)"])
+
+ CPPFLAGS="$SAVE_CPPFLAGS"
+ LDFLAGS="$SAVE_LDFLAGS"
+fi
+if test "x$with_librabbitmq" = "xyes"
+then
+ BUILD_WITH_LIBRABBITMQ_CPPFLAGS="$with_librabbitmq_cppflags"
+ BUILD_WITH_LIBRABBITMQ_LDFLAGS="$with_librabbitmq_ldflags"
+ BUILD_WITH_LIBRABBITMQ_LIBS="-lrabbitmq"
+ AC_SUBST(BUILD_WITH_LIBRABBITMQ_CPPFLAGS)
+ AC_SUBST(BUILD_WITH_LIBRABBITMQ_LDFLAGS)
+ AC_SUBST(BUILD_WITH_LIBRABBITMQ_LIBS)
+ AC_DEFINE(HAVE_LIBRABBITMQ, 1, [Define if librabbitmq is present and usable.])
+fi
+AM_CONDITIONAL(BUILD_WITH_LIBRABBITMQ, test "x$with_librabbitmq" = "xyes")
+# }}}
+
# --with-librouteros {{{
AC_ARG_WITH(librouteros, [AS_HELP_STRING([--with-librouteros@<:@=PREFIX@:>@], [Path to librouteros.])],
[
m4_divert_once([HELP_ENABLE], [])
+AC_PLUGIN([amqp], [$with_librabbitmq], [AMQP output plugin])
AC_PLUGIN([apache], [$with_libcurl], [Apache httpd statistics])
AC_PLUGIN([apcups], [yes], [Statistics of UPSes by APC])
AC_PLUGIN([apple_sensors], [$with_libiokit], [Apple's hardware sensors])
AC_PLUGIN([libvirt], [$plugin_libvirt], [Virtual machine statistics])
AC_PLUGIN([load], [$plugin_load], [System load])
AC_PLUGIN([logfile], [yes], [File logging plugin])
+AC_PLUGIN([lpar], [$with_perfstat], [AIX logical partitions statistics])
AC_PLUGIN([madwifi], [$have_linux_wireless_h], [Madwifi wireless statistics])
AC_PLUGIN([match_empty_counter], [yes], [The empty counter match])
AC_PLUGIN([match_hashed], [yes], [The hashed match])
libperl . . . . . . . $with_libperl
libpq . . . . . . . . $with_libpq
libpthread . . . . . $with_libpthread
+ librabbitmq . . . . . $with_librabbitmq
librouteros . . . . . $with_librouteros
librrd . . . . . . . $with_librrd
libsensors . . . . . $with_libsensors
perl . . . . . . . . $with_perl_bindings
Modules:
+ amqp . . . . . . . $enable_amqp
apache . . . . . . . $enable_apache
apcups . . . . . . . $enable_apcups
apple_sensors . . . . $enable_apple_sensors
libvirt . . . . . . . $enable_libvirt
load . . . . . . . . $enable_load
logfile . . . . . . . $enable_logfile
+ lpar... . . . . . . . $enable_lpar
madwifi . . . . . . . $enable_madwifi
match_empty_counter . $enable_match_empty_counter
match_hashed . . . . $enable_match_hashed
<Type ps_cputime>
Module PsCputime
</Type>
+<Type ps_disk_octets>
+ Module GenericIO
+ DataSources read write
+ DSName "read Read "
+ DSName write Written
+ RRDTitle "Process disk traffic ({instance})"
+ RRDVerticalLabel "Bytes per second"
+# RRDOptions ...
+ RRDFormat "%5.1lf%s"
+</Type>
<Type ps_rss>
DataSources value
DSName value RSS
for (my $i = 0; $i < @files; $i++)
{
my $file = $files[$i];
- my $key = $file->{'plugin_instance'} || '';
+ my $key1 = $file->{'hostname'} || '';
+ my $key2 = $file->{'plugin_instance'} || '';
+ my $key = "$key1-$key2";
$data->{$key} ||= [];
push (@{$data->{$key}}, $file);
BUILT_SOURCES =
CLEANFILES =
+if BUILD_PLUGIN_AMQP
+pkglib_LTLIBRARIES += amqp.la
+amqp_la_SOURCES = amqp.c \
+ utils_cmd_putval.c utils_cmd_putval.h \
+ utils_format_json.c utils_format_json.h
+amqp_la_LDFLAGS = -module -avoid-version $(BUILD_WITH_LIBRABBITMQ_LDFLAGS)
+amqp_la_CPPFLAGS = $(AM_CPPFLAGS) $(BUILD_WITH_LIBRABBITMQ_CPPFLAGS)
+amqp_la_LIBADD = $(BUILD_WITH_LIBRABBITMQ_LIBS)
+collectd_LDADD += "-dlopen" amqp.la
+collectd_DEPENDENCIES += amqp.la
+endif
+
if BUILD_PLUGIN_APACHE
pkglib_LTLIBRARIES += apache.la
apache_la_SOURCES = apache.c
collectd_DEPENDENCIES += logfile.la
endif
+if BUILD_PLUGIN_LPAR
+pkglib_LTLIBRARIES += lpar.la
+lpar_la_SOURCES = lpar.c
+lpar_la_LDFLAGS = -module -avoid-version
+collectd_LDADD += "-dlopen" lpar.la
+collectd_DEPENDENCIES += lpar.la
+lpar_la_LIBADD = -lperfstat
+endif
+
if BUILD_PLUGIN_MADWIFI
pkglib_LTLIBRARIES += madwifi.la
madwifi_la_SOURCES = madwifi.c madwifi.h
--- /dev/null
+/**
+ * collectd - src/amqp.c
+ * Copyright (C) 2009 Sebastien Pahl
+ * Copyright (C) 2010 Florian Forster
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a
+ * copy of this software and associated documentation files (the "Software"),
+ * to deal in the Software without restriction, including without limitation
+ * the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ * and/or sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ *
+ * Authors:
+ * Sebastien Pahl <sebastien.pahl at dotcloud.com>
+ * Florian Forster <octo at verplant.org>
+ **/
+
+#include "collectd.h"
+#include "common.h"
+#include "plugin.h"
+#include "utils_cmd_putval.h"
+#include "utils_format_json.h"
+
+#include <pthread.h>
+
+#include <amqp.h>
+#include <amqp_framing.h>
+
+/* Defines for the delivery mode. I have no idea why they're not defined by the
+ * library.. */
+#define CAMQP_DM_VOLATILE 1
+#define CAMQP_DM_PERSISTENT 2
+
+#define CAMQP_FORMAT_COMMAND 1
+#define CAMQP_FORMAT_JSON 2
+
+#define CAMQP_CHANNEL 1
+
+/*
+ * Data types
+ */
+struct camqp_config_s
+{
+ _Bool publish;
+ char *name;
+
+ char *host;
+ int port;
+ char *vhost;
+ char *user;
+ char *password;
+
+ char *exchange;
+ char *routing_key;
+
+ /* publish only */
+ uint8_t delivery_mode;
+ _Bool store_rates;
+ int format;
+
+ /* subscribe only */
+ char *exchange_type;
+ char *queue;
+
+ amqp_connection_state_t connection;
+ pthread_mutex_t lock;
+};
+typedef struct camqp_config_s camqp_config_t;
+
+/*
+ * Global variables
+ */
+static const char *def_host = "localhost";
+static const char *def_vhost = "/";
+static const char *def_user = "guest";
+static const char *def_password = "guest";
+static const char *def_exchange = "amq.fanout";
+
+static pthread_t *subscriber_threads = NULL;
+static size_t subscriber_threads_num = 0;
+static _Bool subscriber_threads_running = 1;
+
+#define CONF(c,f) (((c)->f != NULL) ? (c)->f : def_##f)
+
+/*
+ * Functions
+ */
+static void camqp_close_connection (camqp_config_t *conf) /* {{{ */
+{
+ int sockfd;
+
+ if ((conf == NULL) || (conf->connection == NULL))
+ return;
+
+ sockfd = amqp_get_sockfd (conf->connection);
+ amqp_channel_close (conf->connection, CAMQP_CHANNEL, AMQP_REPLY_SUCCESS);
+ amqp_connection_close (conf->connection, AMQP_REPLY_SUCCESS);
+ amqp_destroy_connection (conf->connection);
+ close (sockfd);
+ conf->connection = NULL;
+} /* }}} void camqp_close_connection */
+
+static void camqp_config_free (void *ptr) /* {{{ */
+{
+ camqp_config_t *conf = ptr;
+
+ if (conf == NULL)
+ return;
+
+ camqp_close_connection (conf);
+
+ sfree (conf->name);
+ sfree (conf->host);
+ sfree (conf->vhost);
+ sfree (conf->user);
+ sfree (conf->password);
+ sfree (conf->exchange);
+ sfree (conf->exchange_type);
+ sfree (conf->queue);
+ sfree (conf->routing_key);
+
+ sfree (conf);
+} /* }}} void camqp_config_free */
+
+static char *camqp_bytes_cstring (amqp_bytes_t *in) /* {{{ */
+{
+ char *ret;
+
+ if ((in == NULL) || (in->bytes == NULL))
+ return (NULL);
+
+ ret = malloc (in->len + 1);
+ if (ret == NULL)
+ return (NULL);
+
+ memcpy (ret, in->bytes, in->len);
+ ret[in->len] = 0;
+
+ return (ret);
+} /* }}} char *camqp_bytes_cstring */
+
+static _Bool camqp_is_error (camqp_config_t *conf) /* {{{ */
+{
+ amqp_rpc_reply_t r;
+
+ r = amqp_get_rpc_reply (conf->connection);
+ if (r.reply_type == AMQP_RESPONSE_NORMAL)
+ return (0);
+
+ return (1);
+} /* }}} _Bool camqp_is_error */
+
+static char *camqp_strerror (camqp_config_t *conf, /* {{{ */
+ char *buffer, size_t buffer_size)
+{
+ amqp_rpc_reply_t r;
+
+ r = amqp_get_rpc_reply (conf->connection);
+ switch (r.reply_type)
+ {
+ case AMQP_RESPONSE_NORMAL:
+ sstrncpy (buffer, "Success", sizeof (buffer));
+ break;
+
+ case AMQP_RESPONSE_NONE:
+ sstrncpy (buffer, "Missing RPC reply type", sizeof (buffer));
+ break;
+
+ case AMQP_RESPONSE_LIBRARY_EXCEPTION:
+ if (r.library_errno)
+ return (sstrerror (r.library_errno, buffer, buffer_size));
+ else
+ sstrncpy (buffer, "End of stream", sizeof (buffer));
+ break;
+
+ case AMQP_RESPONSE_SERVER_EXCEPTION:
+ if (r.reply.id == AMQP_CONNECTION_CLOSE_METHOD)
+ {
+ amqp_connection_close_t *m = r.reply.decoded;
+ char *tmp = camqp_bytes_cstring (&m->reply_text);
+ ssnprintf (buffer, buffer_size, "Server connection error %d: %s",
+ m->reply_code, tmp);
+ sfree (tmp);
+ }
+ else if (r.reply.id == AMQP_CHANNEL_CLOSE_METHOD)
+ {
+ amqp_channel_close_t *m = r.reply.decoded;
+ char *tmp = camqp_bytes_cstring (&m->reply_text);
+ ssnprintf (buffer, buffer_size, "Server channel error %d: %s",
+ m->reply_code, tmp);
+ sfree (tmp);
+ }
+ else
+ {
+ ssnprintf (buffer, buffer_size, "Server error method %#"PRIx32,
+ r.reply.id);
+ }
+ break;
+
+ default:
+ ssnprintf (buffer, buffer_size, "Unknown reply type %i",
+ (int) r.reply_type);
+ }
+
+ return (buffer);
+} /* }}} char *camqp_strerror */
+
+static int camqp_create_exchange (camqp_config_t *conf) /* {{{ */
+{
+ amqp_exchange_declare_ok_t *ed_ret;
+
+ if (conf->exchange_type == NULL)
+ return (0);
+
+ ed_ret = amqp_exchange_declare (conf->connection,
+ /* channel = */ CAMQP_CHANNEL,
+ /* exchange = */ amqp_cstring_bytes (conf->exchange),
+ /* type = */ amqp_cstring_bytes (conf->exchange_type),
+ /* passive = */ 0,
+ /* durable = */ 0,
+ /* auto_delete = */ 1,
+ /* arguments = */ AMQP_EMPTY_TABLE);
+ if ((ed_ret == NULL) && camqp_is_error (conf))
+ {
+ char errbuf[1024];
+ ERROR ("amqp plugin: amqp_exchange_declare failed: %s",
+ camqp_strerror (conf, errbuf, sizeof (errbuf)));
+ camqp_close_connection (conf);
+ return (-1);
+ }
+
+ INFO ("amqp plugin: Successfully created exchange \"%s\" "
+ "with type \"%s\".",
+ conf->exchange, conf->exchange_type);
+
+ return (0);
+} /* }}} int camqp_create_exchange */
+
+static int camqp_setup_queue (camqp_config_t *conf) /* {{{ */
+{
+ amqp_queue_declare_ok_t *qd_ret;
+ amqp_basic_consume_ok_t *cm_ret;
+
+ qd_ret = amqp_queue_declare (conf->connection,
+ /* channel = */ CAMQP_CHANNEL,
+ /* queue = */ (conf->queue != NULL)
+ ? amqp_cstring_bytes (conf->queue)
+ : AMQP_EMPTY_BYTES,
+ /* passive = */ 0,
+ /* durable = */ 0,
+ /* exclusive = */ 0,
+ /* auto_delete = */ 1,
+ /* arguments = */ AMQP_EMPTY_TABLE);
+ if (qd_ret == NULL)
+ {
+ ERROR ("amqp plugin: amqp_queue_declare failed.");
+ camqp_close_connection (conf);
+ return (-1);
+ }
+
+ if (conf->queue == NULL)
+ {
+ conf->queue = camqp_bytes_cstring (&qd_ret->queue);
+ if (conf->queue == NULL)
+ {
+ ERROR ("amqp plugin: camqp_bytes_cstring failed.");
+ camqp_close_connection (conf);
+ return (-1);
+ }
+
+ INFO ("amqp plugin: Created queue \"%s\".", conf->queue);
+ }
+ DEBUG ("amqp plugin: Successfully created queue \"%s\".", conf->queue);
+
+ /* bind to an exchange */
+ if (conf->exchange != NULL)
+ {
+ amqp_queue_bind_ok_t *qb_ret;
+
+ assert (conf->queue != NULL);
+ qb_ret = amqp_queue_bind (conf->connection,
+ /* channel = */ CAMQP_CHANNEL,
+ /* queue = */ amqp_cstring_bytes (conf->queue),
+ /* exchange = */ amqp_cstring_bytes (conf->exchange),
+ /* routing_key = */ (conf->routing_key != NULL)
+ ? amqp_cstring_bytes (conf->routing_key)
+ : AMQP_EMPTY_BYTES,
+ /* arguments = */ AMQP_EMPTY_TABLE);
+ if ((qb_ret == NULL) && camqp_is_error (conf))
+ {
+ char errbuf[1024];
+ ERROR ("amqp plugin: amqp_queue_bind failed: %s",
+ camqp_strerror (conf, errbuf, sizeof (errbuf)));
+ camqp_close_connection (conf);
+ return (-1);
+ }
+
+ DEBUG ("amqp plugin: Successfully bound queue \"%s\" to exchange \"%s\".",
+ conf->queue, conf->exchange);
+ } /* if (conf->exchange != NULL) */
+
+ cm_ret = amqp_basic_consume (conf->connection,
+ /* channel = */ CAMQP_CHANNEL,
+ /* queue = */ amqp_cstring_bytes (conf->queue),
+ /* consumer_tag = */ AMQP_EMPTY_BYTES,
+ /* no_local = */ 0,
+ /* no_ack = */ 1,
+ /* exclusive = */ 0);
+ if ((cm_ret == NULL) && camqp_is_error (conf))
+ {
+ char errbuf[1024];
+ ERROR ("amqp plugin: amqp_basic_consume failed: %s",
+ camqp_strerror (conf, errbuf, sizeof (errbuf)));
+ camqp_close_connection (conf);
+ return (-1);
+ }
+
+ return (0);
+} /* }}} int camqp_setup_queue */
+
+static int camqp_connect (camqp_config_t *conf) /* {{{ */
+{
+ amqp_rpc_reply_t reply;
+ int sockfd;
+ int status;
+
+ if (conf->connection != NULL)
+ return (0);
+
+ conf->connection = amqp_new_connection ();
+ if (conf->connection == NULL)
+ {
+ ERROR ("amqp plugin: amqp_new_connection failed.");
+ return (ENOMEM);
+ }
+
+ sockfd = amqp_open_socket (CONF(conf, host), conf->port);
+ if (sockfd < 0)
+ {
+ char errbuf[1024];
+ status = (-1) * sockfd;
+ ERROR ("amqp plugin: amqp_open_socket failed: %s",
+ sstrerror (status, errbuf, sizeof (errbuf)));
+ amqp_destroy_connection (conf->connection);
+ conf->connection = NULL;
+ return (status);
+ }
+ amqp_set_sockfd (conf->connection, sockfd);
+
+ reply = amqp_login (conf->connection, CONF(conf, vhost),
+ /* channel max = */ 0,
+ /* frame max = */ 131072,
+ /* heartbeat = */ 0,
+ /* authentication = */ AMQP_SASL_METHOD_PLAIN,
+ CONF(conf, user), CONF(conf, password));
+ if (reply.reply_type != AMQP_RESPONSE_NORMAL)
+ {
+ ERROR ("amqp plugin: amqp_login (vhost = %s, user = %s) failed.",
+ CONF(conf, vhost), CONF(conf, user));
+ amqp_destroy_connection (conf->connection);
+ close (sockfd);
+ conf->connection = NULL;
+ return (1);
+ }
+
+ amqp_channel_open (conf->connection, /* channel = */ 1);
+ /* FIXME: Is checking "reply.reply_type" really correct here? How does
+ * it get set? --octo */
+ if (reply.reply_type != AMQP_RESPONSE_NORMAL)
+ {
+ ERROR ("amqp plugin: amqp_channel_open failed.");
+ amqp_connection_close (conf->connection, AMQP_REPLY_SUCCESS);
+ amqp_destroy_connection (conf->connection);
+ close(sockfd);
+ conf->connection = NULL;
+ return (1);
+ }
+
+ INFO ("amqp plugin: Successfully opened connection to vhost \"%s\" "
+ "on %s:%i.", CONF(conf, vhost), CONF(conf, host), conf->port);
+
+ status = camqp_create_exchange (conf);
+ if (status != 0)
+ return (status);
+
+ if (!conf->publish)
+ return (camqp_setup_queue (conf));
+ return (0);
+} /* }}} int camqp_connect */
+
+static int camqp_shutdown (void) /* {{{ */
+{
+ size_t i;
+
+ DEBUG ("amqp plugin: Shutting down %zu subscriber threads.",
+ subscriber_threads_num);
+
+ subscriber_threads_running = 0;
+ for (i = 0; i < subscriber_threads_num; i++)
+ {
+ /* FIXME: Sending a signal is not very elegant here. Maybe find out how
+ * to use a timeout in the thread and check for the variable in regular
+ * intervals. */
+ pthread_kill (subscriber_threads[i], SIGTERM);
+ pthread_join (subscriber_threads[i], /* retval = */ NULL);
+ }
+
+ subscriber_threads_num = 0;
+ sfree (subscriber_threads);
+
+ DEBUG ("amqp plugin: All subscriber threads exited.");
+
+ return (0);
+} /* }}} int camqp_shutdown */
+
+/*
+ * Subscribing code
+ */
+static int camqp_read_body (camqp_config_t *conf, /* {{{ */
+ size_t body_size, const char *content_type)
+{
+ char body[body_size + 1];
+ char *body_ptr;
+ size_t received;
+ amqp_frame_t frame;
+ int status;
+
+ memset (body, 0, sizeof (body));
+ body_ptr = &body[0];
+ received = 0;
+
+ while (received < body_size)
+ {
+ status = amqp_simple_wait_frame (conf->connection, &frame);
+ if (status < 0)
+ {
+ char errbuf[1024];
+ status = (-1) * status;
+ ERROR ("amqp plugin: amqp_simple_wait_frame failed: %s",
+ sstrerror (status, errbuf, sizeof (errbuf)));
+ camqp_close_connection (conf);
+ return (status);
+ }
+
+ if (frame.frame_type != AMQP_FRAME_BODY)
+ {
+ NOTICE ("amqp plugin: Unexpected frame type: %#"PRIx8,
+ frame.frame_type);
+ return (-1);
+ }
+
+ if ((body_size - received) < frame.payload.body_fragment.len)
+ {
+ WARNING ("amqp plugin: Body is larger than indicated by header.");
+ return (-1);
+ }
+
+ memcpy (body_ptr, frame.payload.body_fragment.bytes,
+ frame.payload.body_fragment.len);
+ body_ptr += frame.payload.body_fragment.len;
+ received += frame.payload.body_fragment.len;
+ } /* while (received < body_size) */
+
+ if (strcasecmp ("text/collectd", content_type) == 0)
+ {
+ status = handle_putval (stderr, body);
+ if (status != 0)
+ ERROR ("amqp plugin: handle_putval failed with status %i.",
+ status);
+ return (status);
+ }
+ else if (strcasecmp ("application/json", content_type) == 0)
+ {
+ ERROR ("amqp plugin: camqp_read_body: Parsing JSON data has not "
+ "been implemented yet. FIXME!");
+ return (0);
+ }
+ else
+ {
+ ERROR ("amqp plugin: camqp_read_body: Unknown content type \"%s\".",
+ content_type);
+ return (EINVAL);
+ }
+
+ /* not reached */
+ return (0);
+} /* }}} int camqp_read_body */
+
+static int camqp_read_header (camqp_config_t *conf) /* {{{ */
+{
+ int status;
+ amqp_frame_t frame;
+ amqp_basic_properties_t *properties;
+ char *content_type;
+
+ status = amqp_simple_wait_frame (conf->connection, &frame);
+ if (status < 0)
+ {
+ char errbuf[1024];
+ status = (-1) * status;
+ ERROR ("amqp plugin: amqp_simple_wait_frame failed: %s",
+ sstrerror (status, errbuf, sizeof (errbuf)));
+ camqp_close_connection (conf);
+ return (status);
+ }
+
+ if (frame.frame_type != AMQP_FRAME_HEADER)
+ {
+ NOTICE ("amqp plugin: Unexpected frame type: %#"PRIx8,
+ frame.frame_type);
+ return (-1);
+ }
+
+ properties = frame.payload.properties.decoded;
+ content_type = camqp_bytes_cstring (&properties->content_type);
+ if (content_type == NULL)
+ {
+ ERROR ("amqp plugin: Unable to determine content type.");
+ return (-1);
+ }
+
+ status = camqp_read_body (conf,
+ (size_t) frame.payload.properties.body_size,
+ content_type);
+
+ sfree (content_type);
+ return (status);
+} /* }}} int camqp_read_header */
+
+static void *camqp_subscribe_thread (void *user_data) /* {{{ */
+{
+ camqp_config_t *conf = user_data;
+ int status;
+
+ while (subscriber_threads_running)
+ {
+ amqp_frame_t frame;
+
+ status = camqp_connect (conf);
+ if (status != 0)
+ {
+ ERROR ("amqp plugin: camqp_connect failed. "
+ "Will sleep for %i seconds.", interval_g);
+ sleep (interval_g);
+ continue;
+ }
+
+ status = amqp_simple_wait_frame (conf->connection, &frame);
+ if (status < 0)
+ {
+ ERROR ("amqp plugin: amqp_simple_wait_frame failed. "
+ "Will sleep for %i seconds.", interval_g);
+ camqp_close_connection (conf);
+ sleep (interval_g);
+ continue;
+ }
+
+ if (frame.frame_type != AMQP_FRAME_METHOD)
+ {
+ DEBUG ("amqp plugin: Unexpected frame type: %#"PRIx8,
+ frame.frame_type);
+ continue;
+ }
+
+ if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD)
+ {
+ DEBUG ("amqp plugin: Unexpected method id: %#"PRIx32,
+ frame.payload.method.id);
+ continue;
+ }
+
+ status = camqp_read_header (conf);
+
+ amqp_maybe_release_buffers (conf->connection);
+ } /* while (subscriber_threads_running) */
+
+ camqp_config_free (conf);
+ pthread_exit (NULL);
+} /* }}} void *camqp_subscribe_thread */
+
+static int camqp_subscribe_init (camqp_config_t *conf) /* {{{ */
+{
+ int status;
+ pthread_t *tmp;
+
+ tmp = realloc (subscriber_threads,
+ sizeof (*subscriber_threads) * (subscriber_threads_num + 1));
+ if (tmp == NULL)
+ {
+ ERROR ("amqp plugin: realloc failed.");
+ camqp_config_free (conf);
+ return (ENOMEM);
+ }
+ subscriber_threads = tmp;
+ tmp = subscriber_threads + subscriber_threads_num;
+ memset (tmp, 0, sizeof (*tmp));
+
+ status = pthread_create (tmp, /* attr = */ NULL,
+ camqp_subscribe_thread, conf);
+ if (status != 0)
+ {
+ char errbuf[1024];
+ ERROR ("amqp plugin: pthread_create failed: %s",
+ sstrerror (status, errbuf, sizeof (errbuf)));
+ camqp_config_free (conf);
+ return (status);
+ }
+
+ subscriber_threads_num++;
+
+ return (0);
+} /* }}} int camqp_subscribe_init */
+
+/*
+ * Publishing code
+ */
+/* XXX: You must hold "conf->lock" when calling this function! */
+static int camqp_write_locked (camqp_config_t *conf, /* {{{ */
+ const char *buffer, const char *routing_key)
+{
+ amqp_basic_properties_t props;
+ int status;
+
+ status = camqp_connect (conf);
+ if (status != 0)
+ return (status);
+
+ memset (&props, 0, sizeof (props));
+ props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG
+ | AMQP_BASIC_DELIVERY_MODE_FLAG
+ | AMQP_BASIC_APP_ID_FLAG;
+ if (conf->format == CAMQP_FORMAT_COMMAND)
+ props.content_type = amqp_cstring_bytes("text/collectd");
+ else if (conf->format == CAMQP_FORMAT_JSON)
+ props.content_type = amqp_cstring_bytes("application/json");
+ else
+ assert (23 == 42);
+ props.delivery_mode = conf->delivery_mode;
+ props.app_id = amqp_cstring_bytes("collectd");
+
+ status = amqp_basic_publish(conf->connection,
+ /* channel = */ 1,
+ amqp_cstring_bytes(CONF(conf, exchange)),
+ amqp_cstring_bytes (routing_key),
+ /* mandatory = */ 0,
+ /* immediate = */ 0,
+ &props,
+ amqp_cstring_bytes(buffer));
+ if (status != 0)
+ {
+ ERROR ("amqp plugin: amqp_basic_publish failed with status %i.",
+ status);
+ camqp_close_connection (conf);
+ }
+
+ return (status);
+} /* }}} int camqp_write_locked */
+
+static int camqp_write (const data_set_t *ds, const value_list_t *vl, /* {{{ */
+ user_data_t *user_data)
+{
+ camqp_config_t *conf = user_data->data;
+ char routing_key[6 * DATA_MAX_NAME_LEN];
+ char buffer[4096];
+ int status;
+
+ if ((ds == NULL) || (vl == NULL) || (conf == NULL))
+ return (EINVAL);
+
+ memset (buffer, 0, sizeof (buffer));
+
+ if (conf->routing_key != NULL)
+ {
+ sstrncpy (routing_key, conf->routing_key, sizeof (routing_key));
+ }
+ else
+ {
+ size_t i;
+ ssnprintf (routing_key, sizeof (routing_key), "collectd/%s/%s/%s/%s/%s",
+ vl->host,
+ vl->plugin, vl->plugin_instance,
+ vl->type, vl->type_instance);
+
+ /* Switch slashes (the only character forbidden by collectd) and dots
+ * (the separation character used by AMQP). */
+ for (i = 0; routing_key[i] != 0; i++)
+ {
+ if (routing_key[i] == '.')
+ routing_key[i] = '/';
+ else if (routing_key[i] == '/')
+ routing_key[i] = '.';
+ }
+ }
+
+ if (conf->format == CAMQP_FORMAT_COMMAND)
+ {
+ status = create_putval (buffer, sizeof (buffer), ds, vl);
+ if (status != 0)
+ {
+ ERROR ("amqp plugin: create_putval failed with status %i.",
+ status);
+ return (status);
+ }
+ }
+ else if (conf->format == CAMQP_FORMAT_JSON)
+ {
+ size_t bfree = sizeof (buffer);
+ size_t bfill = 0;
+
+ format_json_initialize (buffer, &bfill, &bfree);
+ format_json_value_list (buffer, &bfill, &bfree, ds, vl, conf->store_rates);
+ format_json_finalize (buffer, &bfill, &bfree);
+ }
+ else
+ {
+ ERROR ("amqp plugin: Invalid format (%i).", conf->format);
+ return (-1);
+ }
+
+ pthread_mutex_lock (&conf->lock);
+ status = camqp_write_locked (conf, buffer, routing_key);
+ pthread_mutex_unlock (&conf->lock);
+
+ return (status);
+} /* }}} int camqp_write */
+
+/*
+ * Config handling
+ */
+static int camqp_config_set_format (oconfig_item_t *ci, /* {{{ */
+ camqp_config_t *conf)
+{
+ char *string;
+ int status;
+
+ string = NULL;
+ status = cf_util_get_string (ci, &string);
+ if (status != 0)
+ return (status);
+
+ assert (string != NULL);
+ if (strcasecmp ("Command", string) == 0)
+ conf->format = CAMQP_FORMAT_COMMAND;
+ else if (strcasecmp ("JSON", string) == 0)
+ conf->format = CAMQP_FORMAT_JSON;
+ else
+ {
+ WARNING ("amqp plugin: Invalid format string: %s",
+ string);
+ }
+
+ free (string);
+
+ return (0);
+} /* }}} int config_set_string */
+
+static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */
+ _Bool publish)
+{
+ camqp_config_t *conf;
+ int status;
+ int i;
+
+ conf = malloc (sizeof (*conf));
+ if (conf == NULL)
+ {
+ ERROR ("amqp plugin: malloc failed.");
+ return (ENOMEM);
+ }
+
+ /* Initialize "conf" {{{ */
+ memset (conf, 0, sizeof (*conf));
+ conf->publish = publish;
+ conf->name = NULL;
+ conf->format = CAMQP_FORMAT_COMMAND;
+ conf->host = NULL;
+ conf->port = 5672;
+ conf->vhost = NULL;
+ conf->user = NULL;
+ conf->password = NULL;
+ conf->exchange = NULL;
+ conf->routing_key = NULL;
+ /* publish only */
+ conf->delivery_mode = CAMQP_DM_VOLATILE;
+ conf->store_rates = 0;
+ /* subscribe only */
+ conf->exchange_type = NULL;
+ conf->queue = NULL;
+ /* general */
+ conf->connection = NULL;
+ pthread_mutex_init (&conf->lock, /* attr = */ NULL);
+ /* }}} */
+
+ status = cf_util_get_string (ci, &conf->name);
+ if (status != 0)
+ {
+ sfree (conf);
+ return (status);
+ }
+
+ for (i = 0; i < ci->children_num; i++)
+ {
+ oconfig_item_t *child = ci->children + i;
+
+ if (strcasecmp ("Host", child->key) == 0)
+ status = cf_util_get_string (child, &conf->host);
+ else if (strcasecmp ("Port", child->key) == 0)
+ {
+ status = cf_util_get_port_number (child);
+ if (status > 0)
+ {
+ conf->port = status;
+ status = 0;
+ }
+ }
+ else if (strcasecmp ("VHost", child->key) == 0)
+ status = cf_util_get_string (child, &conf->vhost);
+ else if (strcasecmp ("User", child->key) == 0)
+ status = cf_util_get_string (child, &conf->user);
+ else if (strcasecmp ("Password", child->key) == 0)
+ status = cf_util_get_string (child, &conf->password);
+ else if (strcasecmp ("Exchange", child->key) == 0)
+ status = cf_util_get_string (child, &conf->exchange);
+ else if ((strcasecmp ("ExchangeType", child->key) == 0) && !publish)
+ status = cf_util_get_string (child, &conf->exchange_type);
+ else if ((strcasecmp ("Queue", child->key) == 0) && !publish)
+ status = cf_util_get_string (child, &conf->queue);
+ else if (strcasecmp ("RoutingKey", child->key) == 0)
+ status = cf_util_get_string (child, &conf->routing_key);
+ else if ((strcasecmp ("Persistent", child->key) == 0) && publish)
+ {
+ _Bool tmp = 0;
+ status = cf_util_get_boolean (child, &tmp);
+ if (tmp)
+ conf->delivery_mode = CAMQP_DM_PERSISTENT;
+ else
+ conf->delivery_mode = CAMQP_DM_VOLATILE;
+ }
+ else if ((strcasecmp ("StoreRates", child->key) == 0) && publish)
+ status = cf_util_get_boolean (child, &conf->store_rates);
+ else if ((strcasecmp ("Format", child->key) == 0) && publish)
+ status = camqp_config_set_format (child, conf);
+ else
+ WARNING ("amqp plugin: Ignoring unknown "
+ "configuration option \"%s\".", child->key);
+
+ if (status != 0)
+ break;
+ } /* for (i = 0; i < ci->children_num; i++) */
+
+ if ((status == 0) && (conf->exchange == NULL))
+ {
+ if (conf->exchange_type != NULL)
+ WARNING ("amqp plugin: The option \"ExchangeType\" was given "
+ "without the \"Exchange\" option. It will be ignored.");
+
+ if (!publish && (conf->routing_key != NULL))
+ WARNING ("amqp plugin: The option \"RoutingKey\" was given "
+ "without the \"Exchange\" option. It will be ignored.");
+
+ }
+
+ if (status != 0)
+ {
+ camqp_config_free (conf);
+ return (status);
+ }
+
+ if (conf->exchange != NULL)
+ {
+ DEBUG ("amqp plugin: camqp_config_connection: exchange = %s;",
+ conf->exchange);
+ }
+
+ if (publish)
+ {
+ char cbname[128];
+ user_data_t ud = { conf, camqp_config_free };
+
+ ssnprintf (cbname, sizeof (cbname), "amqp/%s", conf->name);
+
+ status = plugin_register_write (cbname, camqp_write, &ud);
+ if (status != 0)
+ {
+ camqp_config_free (conf);
+ return (status);
+ }
+ }
+ else
+ {
+ status = camqp_subscribe_init (conf);
+ if (status != 0)
+ {
+ camqp_config_free (conf);
+ return (status);
+ }
+ }
+
+ return (0);
+} /* }}} int camqp_config_connection */
+
+static int camqp_config (oconfig_item_t *ci) /* {{{ */
+{
+ int i;
+
+ for (i = 0; i < ci->children_num; i++)
+ {
+ oconfig_item_t *child = ci->children + i;
+
+ if (strcasecmp ("Publish", child->key) == 0)
+ camqp_config_connection (child, /* publish = */ 1);
+ else if (strcasecmp ("Subscribe", child->key) == 0)
+ camqp_config_connection (child, /* publish = */ 0);
+ else
+ WARNING ("amqp plugin: Ignoring unknown config option \"%s\".",
+ child->key);
+ } /* for (ci->children_num) */
+
+ return (0);
+} /* }}} int camqp_config */
+
+void module_register (void)
+{
+ plugin_register_complex_config ("amqp", camqp_config);
+ plugin_register_shutdown ("amqp", camqp_shutdown);
+} /* void module_register */
+
+/* vim: set sw=4 sts=4 et fdm=marker : */
+# Permission is hereby granted, free of charge, to any person obtaining a
+# copy of this software and associated documentation files (the "Software"),
+# to deal in the Software without restriction, including without limitation
+# the rights to use, copy, modify, merge, publish, distribute, sublicense,
+# and/or sell copies of the Software, and to permit persons to whom the
+# Software is furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+
=head1 NAME
collectd-python - Documentation of collectd's C<python plugin>
=item
-This plugin is not compatible with python3. Trying to compile it with python3
-will fail because of the ways string, unicode and bytearray behavior was
-changed.
-
-=item
-
Not all aspects of the collectd API are accessible from python. This includes
-but is not limited to meta-data, filters and data sets.
+but is not limited to filters and data sets.
=back
# to missing dependencies or because they have been deactivated explicitly. #
##############################################################################
+#@BUILD_PLUGIN_AMQP_TRUE@LoadPlugin amqp
#@BUILD_PLUGIN_APACHE_TRUE@LoadPlugin apache
#@BUILD_PLUGIN_APCUPS_TRUE@LoadPlugin apcups
#@BUILD_PLUGIN_APPLE_SENSORS_TRUE@LoadPlugin apple_sensors
# ription of those options is available in the collectd.conf(5) manual page. #
##############################################################################
+#<Plugin "amqp">
+# <Publish "name">
+# Host "localhost"
+# Port "5672"
+# VHost "/"
+# User "guest"
+# Password "guest"
+# Exchange "amq.fanout"
+# RoutingKey "collectd"
+# Persistent false
+# StoreRates false
+# </Publish>
+#</Plugin>
+
#<Plugin apache>
# <Instance "local">
# URL "http://localhost/status?auto"
# SocketFile "@prefix@/var/run/@PACKAGE_NAME@-unixsock"
# SocketGroup "collectd"
# SocketPerms "0660"
+# DeleteSocket false
#</Plugin>
#<Plugin uuid>
F<README> file shipped with the sourcecode and hopefully binary packets as
well.
+=head2 Plugin C<amqp>
+
+The I<AMQMP plugin> can be used to communicate with other instances of
+I<collectd> or third party applications using an AMQP message broker. Values
+are sent to or received from the broker, which handles routing, queueing and
+possibly filtering or messages.
+
+ <Plugin "amqp">
+ # Send values to an AMQP broker
+ <Publish "some_name">
+ Host "localhost"
+ Port "5672"
+ VHost "/"
+ User "guest"
+ Password "guest"
+ Exchange "amq.fanout"
+ # ExchangeType "fanout"
+ # RoutingKey "collectd"
+ # Persistent false
+ # Format "command"
+ # StoreRates false
+ </Publish>
+
+ # Receive values from an AMQP broker
+ <Subscribe "some_name">
+ Host "localhost"
+ Port "5672"
+ VHost "/"
+ User "guest"
+ Password "guest"
+ Exchange "amq.fanout"
+ # ExchangeType "fanout"
+ # Queue "queue_name"
+ # RoutingKey "collectd.#"
+ </Subscribe>
+ </Plugin>
+
+The plugin's configuration consists of a number of I<Publish> and I<Subscribe>
+blocks, which configure sending and receiving of values respectively. The two
+blocks are very similar, so unless otherwise noted, an option can be used in
+either block. The name given in the blocks starting tag is only used for
+reporting messages, but may be used to support I<flushing> of certain
+I<Publish> blocks in the future.
+
+=over 4
+
+=item B<Host> I<Host>
+
+Hostname or IP-address of the AMQP broker. Defaults to the default behavior of
+the underlying communications library, I<rabbitmq-c>, which is "localhost".
+
+=item B<Port> I<Port>
+
+Service name or port number on which the AMQP broker accepts connections. This
+argument must be a string, even if the numeric form is used. Defaults to
+"5672".
+
+=item B<VHost> I<VHost>
+
+Name of the I<virtual host> on the AMQP broker to use. Defaults to "/".
+
+=item B<User> I<User>
+
+=item B<Password> I<Password>
+
+Credentials used to authenticate to the AMQP broker. By default "guest"/"guest"
+is used.
+
+=item B<Exchange> I<Exchange>
+
+In I<Publish> blocks, this option specifies the I<exchange> to send values to.
+By default, "amq.fanout" will be used.
+
+In I<Subscribe> blocks this option is optional. If given, a I<binding> between
+the given exchange and the I<queue> is created, using the I<routing key> if
+configured. See the B<Queue> and B<RoutingKey> options below.
+
+=item B<ExchangeType> I<Type>
+
+If given, the plugin will try to create the configured I<exchange> with this
+I<type> after connecting. When in a I<Subscribe> block, the I<queue> will then
+be bound to this exchange.
+
+=item B<Queue> I<Queue> (Subscribe only)
+
+Configures the I<queue> name to subscribe to. If no queue name was configures
+explicitly, a unique queue name will be created by the broker.
+
+=item B<RoutingKey> I<Key>
+
+In I<Publish> blocks, this configures the routing key to set on all outgoing
+messages. If not given, the routing key will be computed from the I<identifier>
+of the value. The host, plugin, type and the two instances are concatenated
+together using dots as the separator and all containing dots replaced with
+slashes. For example "collectd.host/example/com.cpu.0.cpu.user". This makes it
+possible to receive only specific values using a "topic" exchange.
+
+In I<Subscribe> blocks, configures the I<routing key> used when creating a
+I<binding> between an I<exchange> and the I<queue>. The usual wildcards can be
+used to filter messages when using a "topic" exchange. If you're only
+interested in CPU statistics, you could use the routing key "collectd.*.cpu.#"
+for example.
+
+=item B<Persistent> B<true>|B<false> (Publish only)
+
+Selects the I<delivery method> to use. If set to B<true>, the I<persistent>
+mode will be used, i.e. delivery is guaranteed. If set to B<false> (the
+default), the I<transient> delivery mode will be used, i.e. messages may be
+lost due to high load, overflowing queues or similar issues.
+
+=item B<Format> B<Command>|B<JSON> (Publish only)
+
+Selects the format in which messages are sent to the broker. If set to
+B<Command> (the default), values are sent as C<PUTVAL> commands which are
+identical to the syntax used by the I<Exec> and I<UnixSock plugins>. In this
+case, the C<Content-Type> header field will be set to C<text/collectd>.
+
+If set to B<JSON>, the values are encoded in the I<JavaScript Object Notation>,
+an easy and straight forward exchange format. The C<Content-Type> header field
+will be set to C<application/json>.
+
+A subscribing client I<should> use the C<Content-Type> header field to
+determine how to decode the values. Currently, the I<AMQP plugin> itself can
+only decode the B<Command> format.
+
+=item B<StoreRates> B<true>|B<false> (Publish only)
+
+Determines whether or not C<COUNTER>, C<DERIVE> and C<ABSOLUTE> data sources
+are converted to a I<rate> (i.e. a C<GAUGE> value). If set to B<false> (the
+default), no conversion is performed. Otherwise the conversion is performed
+using the internal value cache.
+
+Please note that currently this option is only used if the B<Format> option has
+been set to B<JSON>.
+
+=back
+
=head2 Plugin C<apache>
To configure the C<apache>-plugin you first need to configure the Apache
permissions must be given as a numeric, octal value as you would pass to
L<chmod(1)>. Defaults to B<0770>.
+=item B<DeleteSocket> B<false>|B<true>
+
+If set to B<true>, delete the socket file before calling L<bind(2)>, if a file
+with the given name already exists. If I<collectd> crashes a socket file may be
+left over, preventing the daemon from opening a new socket when restarted.
+Since this is potentially dangerous, this defaults to B<false>.
+
=back
=head2 Plugin C<uuid>
# include <kstat.h>
#endif
-#if HAVE_SENSORS_SENSORS_H
-# include <sensors/sensors.h>
-#endif
-
#ifndef PACKAGE_NAME
#define PACKAGE_NAME "collectd"
#endif
if (db->yajl == NULL)
{
ERROR ("curl_json plugin: yajl_alloc failed.");
+ db->yajl = yprev;
return (-1);
}
--- /dev/null
+/**
+ * collectd - src/lpar.c
+ * Copyright (C) 2010 Aurélien Reynaud
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; only version 2 of the License is applicable.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+ *
+ * Authors:
+ * Aurélien Reynaud <collectd at wattapower.net>
+ **/
+
+#include "collectd.h"
+#include "common.h"
+#include "plugin.h"
+
+#include <sys/protosw.h>
+#include <libperfstat.h>
+#include <sys/utsname.h>
+
+/* XINTFRAC was defined in libperfstat.h somewhere between AIX 5.3 and 6.1 */
+#ifndef XINTFRAC
+# include <sys/systemcfg.h>
+# define XINTFRAC ((double)(_system_configuration.Xint) / \
+ (double)(_system_configuration.Xfrac))
+#endif
+
+#define CLOCKTICKS_TO_TICKS(cticks) ((cticks) / XINTFRAC)
+
+static const char *config_keys[] =
+{
+ "CpuPoolStats",
+ "ReportBySerial"
+};
+static int config_keys_num = STATIC_ARRAY_SIZE (config_keys);
+
+static _Bool pool_stats = 0;
+static _Bool report_by_serial = 0;
+#if PERFSTAT_SUPPORTS_DONATION
+static _Bool donate_flag = 0;
+#endif
+static char serial[SYS_NMLN];
+
+static perfstat_partition_total_t lparstats_old;
+
+static int lpar_config (const char *key, const char *value)
+{
+ if (strcasecmp ("CpuPoolStats", key) == 0)
+ {
+ if (IS_TRUE (value))
+ pool_stats = 1;
+ else
+ pool_stats = 0;
+ }
+ else if (strcasecmp ("ReportBySerial", key) == 0)
+ {
+ if (IS_TRUE (value))
+ report_by_serial = 1;
+ else
+ report_by_serial = 0;
+ }
+ else
+ {
+ return (-1);
+ }
+
+ return (0);
+} /* int lpar_config */
+
+static int lpar_init (void)
+{
+ int status;
+
+ /* Retrieve the initial metrics. Returns the number of structures filled. */
+ status = perfstat_partition_total (/* name = */ NULL, /* (must be NULL) */
+ &lparstats_old, sizeof (perfstat_partition_total_t),
+ /* number = */ 1 /* (must be 1) */);
+ if (status != 1)
+ {
+ char errbuf[1024];
+ ERROR ("lpar plugin: perfstat_partition_total failed: %s (%i)",
+ sstrerror (errno, errbuf, sizeof (errbuf)),
+ status);
+ return (-1);
+ }
+
+#if PERFSTAT_SUPPORTS_DONATION
+ if (!lparstats_old.type.b.shared_enabled
+ && lparstats_old.type.b.donate_enabled)
+ {
+ donate_flag = 1;
+ }
+#endif
+
+ if (pool_stats && !lparstats_old.type.b.pool_util_authority)
+ {
+ WARNING ("lpar plugin: This partition does not have pool authority. "
+ "Disabling CPU pool statistics collection.");
+ pool_stats = 0;
+ }
+
+ return (0);
+} /* int lpar_init */
+
+static void lpar_submit (const char *type_instance, double value)
+{
+ value_t values[1];
+ value_list_t vl = VALUE_LIST_INIT;
+
+ values[0].gauge = (gauge_t)value;
+
+ vl.values = values;
+ vl.values_len = 1;
+ if (report_by_serial)
+ {
+ sstrncpy (vl.host, serial, sizeof (vl.host));
+ sstrncpy (vl.plugin_instance, hostname_g, sizeof (vl.plugin));
+ }
+ else
+ {
+ sstrncpy (vl.host, hostname_g, sizeof (vl.host));
+ }
+ sstrncpy (vl.plugin, "lpar", sizeof (vl.plugin));
+ sstrncpy (vl.type, "vcpu", sizeof (vl.type));
+ sstrncpy (vl.type_instance, type_instance, sizeof (vl.type_instance));
+
+ plugin_dispatch_values (&vl);
+} /* void lpar_submit */
+
+static int lpar_read (void)
+{
+ perfstat_partition_total_t lparstats;
+ int status;
+ struct utsname name;
+ u_longlong_t ticks;
+ u_longlong_t user_ticks, syst_ticks, wait_ticks, idle_ticks;
+ u_longlong_t consumed_ticks;
+ double entitled_proc_capacity;
+
+ /* An LPAR has the same serial number as the physical system it is currently
+ running on. It is a convenient way of tracking LPARs as they are moved
+ from chassis to chassis through Live Partition Mobility (LPM). */
+ if (uname (&name) != 0)
+ {
+ ERROR ("lpar plugin: uname failed.");
+ return (-1);
+ }
+ sstrncpy (serial, name.machine, sizeof (serial));
+
+ /* Retrieve the current metrics. Returns the number of structures filled. */
+ status = perfstat_partition_total (/* name = */ NULL, /* (must be NULL) */
+ &lparstats, sizeof (perfstat_partition_total_t),
+ /* number = */ 1 /* (must be 1) */);
+ if (status != 1)
+ {
+ char errbuf[1024];
+ ERROR ("lpar plugin: perfstat_partition_total failed: %s (%i)",
+ sstrerror (errno, errbuf, sizeof (errbuf)),
+ status);
+ return (-1);
+ }
+
+ /* Number of ticks since we last run. */
+ ticks = lparstats.timebase_last - lparstats_old.timebase_last;
+ if (ticks == 0)
+ {
+ /* The stats have not been updated. Return now to avoid
+ * dividing by zero */
+ return (0);
+ }
+
+ /*
+ * On a shared partition, we're "entitled" to a certain amount of
+ * processing power, for example 250/100 of a physical CPU. Processing
+ * capacity not used by the partition may be assigned to a different
+ * partition by the hypervisor, so "idle" is hopefully a very small
+ * number.
+ *
+ * A dedicated partition may donate its CPUs to another partition and
+ * may steal ticks from somewhere else (another partition or maybe the
+ * shared pool, I don't know --octo).
+ */
+
+ /* entitled_proc_capacity is in 1/100th of a CPU */
+ entitled_proc_capacity = 0.01 * ((double) lparstats.entitled_proc_capacity);
+ lpar_submit ("entitled", entitled_proc_capacity);
+
+ /* The number of ticks actually spent in the various states */
+ user_ticks = lparstats.puser - lparstats_old.puser;
+ syst_ticks = lparstats.psys - lparstats_old.psys;
+ wait_ticks = lparstats.pwait - lparstats_old.pwait;
+ idle_ticks = lparstats.pidle - lparstats_old.pidle;
+ consumed_ticks = user_ticks + syst_ticks + wait_ticks + idle_ticks;
+
+ lpar_submit ("user", (double) user_ticks / (double) ticks);
+ lpar_submit ("system", (double) syst_ticks / (double) ticks);
+ lpar_submit ("wait", (double) wait_ticks / (double) ticks);
+ lpar_submit ("idle", (double) idle_ticks / (double) ticks);
+
+#if PERFSTAT_SUPPORTS_DONATION
+ if (donate_flag)
+ {
+ /* donated => ticks given to another partition
+ * stolen => ticks received from another partition */
+ u_longlong_t idle_donated_ticks, busy_donated_ticks;
+ u_longlong_t idle_stolen_ticks, busy_stolen_ticks;
+
+ /* FYI: PURR == Processor Utilization of Resources Register
+ * SPURR == Scaled PURR */
+ idle_donated_ticks = lparstats.idle_donated_purr - lparstats_old.idle_donated_purr;
+ busy_donated_ticks = lparstats.busy_donated_purr - lparstats_old.busy_donated_purr;
+ idle_stolen_ticks = lparstats.idle_stolen_purr - lparstats_old.idle_stolen_purr;
+ busy_stolen_ticks = lparstats.busy_stolen_purr - lparstats_old.busy_stolen_purr;
+
+ lpar_submit ("idle_donated", (double) idle_donated_ticks / (double) ticks);
+ lpar_submit ("busy_donated", (double) busy_donated_ticks / (double) ticks);
+ lpar_submit ("idle_stolen", (double) idle_stolen_ticks / (double) ticks);
+ lpar_submit ("busy_stolen", (double) busy_stolen_ticks / (double) ticks);
+
+ /* Donated ticks will be accounted for as stolen ticks in other LPARs */
+ consumed_ticks += idle_stolen_ticks + busy_stolen_ticks;
+ }
+#endif
+
+ lpar_submit ("consumed", (double) consumed_ticks / (double) ticks);
+
+ if (pool_stats)
+ {
+ char typinst[DATA_MAX_NAME_LEN];
+ u_longlong_t pool_idle_cticks;
+ double pool_idle_cpus;
+ double pool_busy_cpus;
+
+ /* We're calculating "busy" from "idle" and the total number of
+ * CPUs, because the "busy" member didn't exist in early versions
+ * of libperfstat. It was added somewhere between AIX 5.3 ML5 and ML9. */
+ pool_idle_cticks = lparstats.pool_idle_time - lparstats_old.pool_idle_time;
+ pool_idle_cpus = CLOCKTICKS_TO_TICKS ((double) pool_idle_cticks) / (double) ticks;
+ pool_busy_cpus = ((double) lparstats.phys_cpus_pool) - pool_idle_cpus;
+ if (pool_busy_cpus < 0.0)
+ pool_busy_cpus = 0.0;
+
+ ssnprintf (typinst, sizeof (typinst), "pool-%X-busy", lparstats.pool_id);
+ lpar_submit (typinst, pool_busy_cpus);
+
+ ssnprintf (typinst, sizeof (typinst), "pool-%X-idle", lparstats.pool_id);
+ lpar_submit (typinst, pool_idle_cpus);
+ }
+
+ memcpy (&lparstats_old, &lparstats, sizeof (lparstats_old));
+
+ return (0);
+} /* int lpar_read */
+
+void module_register (void)
+{
+ plugin_register_config ("lpar", lpar_config,
+ config_keys, config_keys_num);
+ plugin_register_init ("lpar", lpar_init);
+ plugin_register_read ("lpar", lpar_read);
+} /* void module_register */
+
+/* vim: set sw=8 noet : */
+
*/
static void mv_free_match (mv_match_t *m) /* {{{ */
{
+ int i;
+
if (m == NULL)
return;
+ if (m->data_sources != NULL)
+ {
+ for (i = 0; i < m->data_sources_num; ++i)
+ free(m->data_sources[i]);
+ free(m->data_sources);
+ }
+
free (m);
} /* }}} void mv_free_match */
const char *plugin_inst,
const char *type, const char *type_inst,
value_t *values, int values_len,
- cdtime_t timestamp)
+ cdtime_t timestamp, cdtime_t interval)
{
value_list_t vl = VALUE_LIST_INIT;
if (timestamp > 0)
vl.time = timestamp;
+ if (interval > 0)
+ vl.interval = interval;
+
if (host != NULL)
sstrncpy (vl.host, host, sizeof (vl.host));
else
static int submit_two_counters (const char *host, const char *plugin_inst, /* {{{ */
const char *type, const char *type_inst, counter_t val0, counter_t val1,
- cdtime_t timestamp)
+ cdtime_t timestamp, cdtime_t interval)
{
value_t values[2];
values[1].counter = val1;
return (submit_values (host, plugin_inst, type, type_inst,
- values, 2, timestamp));
+ values, 2, timestamp, interval));
} /* }}} int submit_two_counters */
static int submit_counter (const char *host, const char *plugin_inst, /* {{{ */
- const char *type, const char *type_inst, counter_t counter, cdtime_t timestamp)
+ const char *type, const char *type_inst, counter_t counter,
+ cdtime_t timestamp, cdtime_t interval)
{
value_t v;
v.counter = counter;
return (submit_values (host, plugin_inst, type, type_inst,
- &v, 1, timestamp));
+ &v, 1, timestamp, interval));
} /* }}} int submit_counter */
static int submit_two_gauge (const char *host, const char *plugin_inst, /* {{{ */
const char *type, const char *type_inst, gauge_t val0, gauge_t val1,
- cdtime_t timestamp)
+ cdtime_t timestamp, cdtime_t interval)
{
value_t values[2];
values[1].gauge = val1;
return (submit_values (host, plugin_inst, type, type_inst,
- values, 2, timestamp));
+ values, 2, timestamp, interval));
} /* }}} int submit_two_gauge */
static int submit_double (const char *host, const char *plugin_inst, /* {{{ */
- const char *type, const char *type_inst, double d, cdtime_t timestamp)
+ const char *type, const char *type_inst, double d,
+ cdtime_t timestamp, cdtime_t interval)
{
value_t v;
v.gauge = (gauge_t) d;
return (submit_values (host, plugin_inst, type, type_inst,
- &v, 1, timestamp));
+ &v, 1, timestamp, interval));
} /* }}} int submit_uint64 */
/* Calculate hit ratio from old and new counters and submit the resulting
uint64_t new_misses,
uint64_t old_hits,
uint64_t old_misses,
- cdtime_t timestamp)
+ cdtime_t timestamp,
+ cdtime_t interval)
{
value_t v;
}
return (submit_values (host, plugin_inst, "cache_ratio", type_inst,
- &v, 1, timestamp));
+ &v, 1, timestamp, interval));
} /* }}} int submit_cache_ratio */
/* Submits all the caches used by WAFL. Uses "submit_cache_ratio". */
static int submit_wafl_data (const char *hostname, const char *instance, /* {{{ */
- cfg_wafl_t *old_data, const cfg_wafl_t *new_data)
+ cfg_wafl_t *old_data, const cfg_wafl_t *new_data, int interval)
{
/* Submit requested counters */
if (HAS_ALL_FLAGS (old_data->flags, CFG_WAFL_NAME_CACHE | HAVE_WAFL_NAME_CACHE)
submit_cache_ratio (hostname, instance, "name_cache_hit",
new_data->name_cache_hit, new_data->name_cache_miss,
old_data->name_cache_hit, old_data->name_cache_miss,
- new_data->timestamp);
+ new_data->timestamp, interval);
if (HAS_ALL_FLAGS (old_data->flags, CFG_WAFL_DIR_CACHE | HAVE_WAFL_FIND_DIR)
&& HAS_ALL_FLAGS (new_data->flags, HAVE_WAFL_FIND_DIR))
submit_cache_ratio (hostname, instance, "find_dir_hit",
new_data->find_dir_hit, new_data->find_dir_miss,
old_data->find_dir_hit, old_data->find_dir_miss,
- new_data->timestamp);
+ new_data->timestamp, interval);
if (HAS_ALL_FLAGS (old_data->flags, CFG_WAFL_BUF_CACHE | HAVE_WAFL_BUF_HASH)
&& HAS_ALL_FLAGS (new_data->flags, HAVE_WAFL_BUF_HASH))
submit_cache_ratio (hostname, instance, "buf_hash_hit",
new_data->buf_hash_hit, new_data->buf_hash_miss,
old_data->buf_hash_hit, old_data->buf_hash_miss,
- new_data->timestamp);
+ new_data->timestamp, interval);
if (HAS_ALL_FLAGS (old_data->flags, CFG_WAFL_INODE_CACHE | HAVE_WAFL_INODE_CACHE)
&& HAS_ALL_FLAGS (new_data->flags, HAVE_WAFL_INODE_CACHE))
submit_cache_ratio (hostname, instance, "inode_cache_hit",
new_data->inode_cache_hit, new_data->inode_cache_miss,
old_data->inode_cache_hit, old_data->inode_cache_miss,
- new_data->timestamp);
+ new_data->timestamp, interval);
/* Clear old HAVE_* flags */
old_data->flags &= ~HAVE_WAFL_ALL;
* update flags appropriately. */
static int submit_volume_perf_data (const char *hostname, /* {{{ */
data_volume_perf_t *old_data,
- const data_volume_perf_t *new_data)
+ const data_volume_perf_t *new_data, int interval)
{
char plugin_instance[DATA_MAX_NAME_LEN];
&& HAS_ALL_FLAGS (new_data->flags, HAVE_VOLUME_PERF_BYTES_READ | HAVE_VOLUME_PERF_BYTES_WRITE))
{
submit_two_counters (hostname, plugin_instance, "disk_octets", /* type instance = */ NULL,
- (counter_t) new_data->read_bytes, (counter_t) new_data->write_bytes, new_data->timestamp);
+ (counter_t) new_data->read_bytes, (counter_t) new_data->write_bytes, new_data->timestamp, interval);
}
/* Check for and submit disk-operations values */
&& HAS_ALL_FLAGS (new_data->flags, HAVE_VOLUME_PERF_OPS_READ | HAVE_VOLUME_PERF_OPS_WRITE))
{
submit_two_counters (hostname, plugin_instance, "disk_ops", /* type instance = */ NULL,
- (counter_t) new_data->read_ops, (counter_t) new_data->write_ops, new_data->timestamp);
+ (counter_t) new_data->read_ops, (counter_t) new_data->write_ops, new_data->timestamp, interval);
}
/* Check for, calculate and submit disk-latency values */
}
submit_two_gauge (hostname, plugin_instance, "disk_latency", /* type instance = */ NULL,
- latency_per_op_read, latency_per_op_write, new_data->timestamp);
+ latency_per_op_read, latency_per_op_write, new_data->timestamp, interval);
}
/* Clear all HAVE_* flags. */
*/
/* Data corresponding to <WAFL /> */
static int cna_handle_wafl_data (const char *hostname, cfg_wafl_t *cfg_wafl, /* {{{ */
- na_elem_t *data)
+ na_elem_t *data, int interval)
{
cfg_wafl_t perf_data;
const char *plugin_inst;
}
}
- return (submit_wafl_data (hostname, plugin_inst, cfg_wafl, &perf_data));
+ return (submit_wafl_data (hostname, plugin_inst, cfg_wafl, &perf_data, interval));
} /* }}} void cna_handle_wafl_data */
static int cna_setup_wafl (cfg_wafl_t *cw) /* {{{ */
return (-1);
}
- status = cna_handle_wafl_data (host->name, host->cfg_wafl, data);
+ status = cna_handle_wafl_data (host->name, host->cfg_wafl, data, host->interval);
if (status == 0)
host->cfg_wafl->interval.last_read = now;
/* Data corresponding to <Disks /> */
static int cna_handle_disk_data (const char *hostname, /* {{{ */
- cfg_disk_t *cfg_disk, na_elem_t *data)
+ cfg_disk_t *cfg_disk, na_elem_t *data, cdtime_t interval)
{
cdtime_t timestamp;
na_elem_t *instances;
if ((cfg_disk->flags & CFG_DISK_BUSIEST) && (worst_disk != NULL))
submit_double (hostname, "system", "percent", "disk_busy",
- worst_disk->disk_busy_percent, timestamp);
+ worst_disk->disk_busy_percent, timestamp, interval);
return (0);
} /* }}} int cna_handle_disk_data */
return (-1);
}
- status = cna_handle_disk_data (host->name, host->cfg_disk, data);
+ status = cna_handle_disk_data (host->name, host->cfg_disk, data, host->interval);
if (status == 0)
host->cfg_disk->interval.last_read = now;
/* Data corresponding to <VolumePerf /> */
static int cna_handle_volume_perf_data (const char *hostname, /* {{{ */
- cfg_volume_perf_t *cvp, na_elem_t *data)
+ cfg_volume_perf_t *cvp, na_elem_t *data, cdtime_t interval)
{
cdtime_t timestamp;
na_elem_t *elem_instances;
}
} /* for (elem_counter) */
- submit_volume_perf_data (hostname, v, &perf_data);
+ submit_volume_perf_data (hostname, v, &perf_data, interval);
} /* for (volume) */
return (0);
return (-1);
}
- status = cna_handle_volume_perf_data (host->name, host->cfg_volume_perf, data);
+ status = cna_handle_volume_perf_data (host->name, host->cfg_volume_perf, data, host->interval);
if (status == 0)
host->cfg_volume_perf->interval.last_read = now;
/* Data corresponding to <VolumeUsage /> */
static int cna_submit_volume_usage_data (const char *hostname, /* {{{ */
- cfg_volume_usage_t *cfg_volume)
+ cfg_volume_usage_t *cfg_volume, int interval)
{
data_volume_usage_t *v;
if (HAS_ALL_FLAGS (v->flags, HAVE_VOLUME_USAGE_NORM_FREE))
submit_double (hostname, /* plugin instance = */ plugin_instance,
"df_complex", "free",
- (double) norm_free, /* timestamp = */ 0);
+ (double) norm_free, /* timestamp = */ 0, interval);
if (HAS_ALL_FLAGS (v->flags, HAVE_VOLUME_USAGE_SIS_SAVED))
submit_double (hostname, /* plugin instance = */ plugin_instance,
"df_complex", "sis_saved",
- (double) sis_saved, /* timestamp = */ 0);
+ (double) sis_saved, /* timestamp = */ 0, interval);
if (HAS_ALL_FLAGS (v->flags, HAVE_VOLUME_USAGE_NORM_USED))
submit_double (hostname, /* plugin instance = */ plugin_instance,
"df_complex", "used",
- (double) norm_used, /* timestamp = */ 0);
+ (double) norm_used, /* timestamp = */ 0, interval);
if (HAS_ALL_FLAGS (v->flags, HAVE_VOLUME_USAGE_SNAP_RSVD))
submit_double (hostname, /* plugin instance = */ plugin_instance,
"df_complex", "snap_reserved",
- (double) snap_reserve_free, /* timestamp = */ 0);
+ (double) snap_reserve_free, /* timestamp = */ 0, interval);
if (HAS_ALL_FLAGS (v->flags, HAVE_VOLUME_USAGE_SNAP_USED | HAVE_VOLUME_USAGE_SNAP_RSVD))
submit_double (hostname, /* plugin instance = */ plugin_instance,
"df_complex", "snap_reserve_used",
- (double) snap_reserve_used, /* timestamp = */ 0);
+ (double) snap_reserve_used, /* timestamp = */ 0, interval);
if (HAS_ALL_FLAGS (v->flags, HAVE_VOLUME_USAGE_SNAP_USED))
submit_double (hostname, /* plugin instance = */ plugin_instance,
"df_complex", "snap_normal_used",
- (double) snap_norm_used, /* timestamp = */ 0);
+ (double) snap_norm_used, /* timestamp = */ 0, interval);
/* Clear all the HAVE_* flags */
v->flags &= ~HAVE_VOLUME_USAGE_ALL;
} /* }}} end of 32-bit workaround */
} /* for (elem_volume) */
- return (cna_submit_volume_usage_data (host->name, cfg_volume));
+ return (cna_submit_volume_usage_data (host->name, cfg_volume, host->interval));
} /* }}} int cna_handle_volume_usage_data */
static int cna_setup_volume_usage (cfg_volume_usage_t *cvu) /* {{{ */
/* Data corresponding to <System /> */
static int cna_handle_system_data (const char *hostname, /* {{{ */
- cfg_system_t *cfg_system, na_elem_t *data)
+ cfg_system_t *cfg_system, na_elem_t *data, int interval)
{
na_elem_t *instances;
na_elem_t *counter;
&& (value > 0) && (strlen(name) > 4)
&& (!strcmp(name + strlen(name) - 4, "_ops"))) {
submit_counter (hostname, instance, "disk_ops_complex", name,
- (counter_t) value, timestamp);
+ (counter_t) value, timestamp, interval);
}
} /* for (counter) */
if ((cfg_system->flags & CFG_SYSTEM_DISK)
&& (HAS_ALL_FLAGS (counter_flags, 0x01 | 0x02)))
submit_two_counters (hostname, instance, "disk_octets", NULL,
- disk_read, disk_written, timestamp);
+ disk_read, disk_written, timestamp, interval);
if ((cfg_system->flags & CFG_SYSTEM_NET)
&& (HAS_ALL_FLAGS (counter_flags, 0x04 | 0x08)))
submit_two_counters (hostname, instance, "if_octets", NULL,
- net_recv, net_sent, timestamp);
+ net_recv, net_sent, timestamp, interval);
if ((cfg_system->flags & CFG_SYSTEM_CPU)
&& (HAS_ALL_FLAGS (counter_flags, 0x10 | 0x20)))
{
submit_counter (hostname, instance, "cpu", "system",
- cpu_busy, timestamp);
+ cpu_busy, timestamp, interval);
submit_counter (hostname, instance, "cpu", "idle",
- cpu_total - cpu_busy, timestamp);
+ cpu_total - cpu_busy, timestamp, interval);
}
return (0);
return (-1);
}
- status = cna_handle_system_data (host->name, host->cfg_system, data);
+ status = cna_handle_system_data (host->name, host->cfg_system, data, host->interval);
if (status == 0)
host->cfg_system->interval.last_read = now;
#include "utils_fbhash.h"
#include "utils_avltree.h"
#include "utils_cache.h"
+#include "utils_complain.h"
#include "network.h"
static int parse_part_sign_sha256 (sockent_t *se, /* {{{ */
void **ret_buffer, size_t *ret_buffer_len, int flags)
{
+ static c_complain_t complain_no_users = C_COMPLAIN_INIT_STATIC;
+
char *buffer;
size_t buffer_len;
size_t buffer_offset;
if (se->data.server.userdb == NULL)
{
- NOTICE ("network plugin: Received signed network packet but can't verify "
- "it because no user DB has been configured. Will accept it.");
+ c_complain (LOG_NOTICE, &complain_no_users,
+ "network plugin: Received signed network packet but can't verify it "
+ "because no user DB has been configured. Will accept it.");
return (0);
}
/**
* collectd - src/notify_email.c
* Copyright (C) 2008 Oleg King
+ * Copyright (C) 2010 Florian Forster
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by the
*
* Authors:
* Oleg King <king2 at kaluga.ru>
+ * Florian Forster <octo at collectd.org>
**/
#include "collectd.h"
#include <auth-client.h>
#include <libesmtp.h>
+#include <pthread.h>
#define MAXSTRING 256
static int recipients_len = 0;
static smtp_session_t session;
+static pthread_mutex_t session_lock = PTHREAD_MUTEX_INITIALIZER;
static smtp_message_t message;
static auth_context_t authctx = NULL;
{
char server[MAXSTRING];
+ ssnprintf(server, sizeof (server), "%s:%i",
+ (smtp_host == NULL) ? DEFAULT_SMTP_HOST : smtp_host,
+ smtp_port);
+
+ pthread_mutex_lock (&session_lock);
+
auth_client_init();
- if (!(session = smtp_create_session ())) {
+
+ session = smtp_create_session ();
+ if (session == NULL) {
+ pthread_mutex_unlock (&session_lock);
ERROR ("notify_email plugin: cannot create SMTP session");
return (-1);
}
smtp_set_monitorcb (session, monitor_cb, NULL, 1);
smtp_set_hostname (session, hostname_g);
- ssnprintf(server, sizeof (server), "%s:%i",
- (smtp_host == NULL) ? DEFAULT_SMTP_HOST : smtp_host,
- smtp_port);
smtp_set_server (session, server);
if (smtp_user && smtp_password) {
}
if ( !smtp_auth_set_context (session, authctx)) {
+ pthread_mutex_unlock (&session_lock);
ERROR ("notify_email plugin: cannot set SMTP auth context");
return (-1);
}
+ pthread_mutex_unlock (&session_lock);
return (0);
} /* int notify_email_init */
static int notify_email_shutdown (void)
{
- smtp_destroy_session (session);
- auth_destroy_context (authctx);
+ pthread_mutex_lock (&session_lock);
+
+ if (session != NULL)
+ smtp_destroy_session (session);
+ session = NULL;
+
+ if (authctx != NULL)
+ auth_destroy_context (authctx);
+ authctx = NULL;
+
auth_client_exit();
+
+ pthread_mutex_unlock (&session_lock);
return (0);
} /* int notify_email_shutdown */
n->host,
n->message);
+ pthread_mutex_lock (&session_lock);
+
+ if (session == NULL) {
+ /* Initialization failed or we're in the process of shutting down. */
+ pthread_mutex_unlock (&session_lock);
+ return (-1);
+ }
+
if (!(message = smtp_add_message (session))) {
+ pthread_mutex_unlock (&session_lock);
ERROR ("notify_email plugin: cannot set SMTP message");
return (-1);
}
char buf[MAXSTRING];
ERROR ("notify_email plugin: SMTP server problem: %s",
smtp_strerror (smtp_errno (), buf, sizeof buf));
+ pthread_mutex_unlock (&session_lock);
return (-1);
} else {
const smtp_status_t *status;
smtp_enumerate_recipients (message, print_recipient_status, NULL);
}
+ pthread_mutex_unlock (&session_lock);
return (0);
} /* int notify_email_notification */
static int cpy_config(oconfig_item_t *ci) {
int i;
+ char *argv = "";
PyObject *sys, *tb;
PyObject *sys_path;
PyObject *module;
cpy_log_exception("python initialization");
return 1;
}
+ PySys_SetArgv(1, &argv);
+ PyList_SetSlice(sys_path, 0, 1, NULL);
+
#ifdef IS_PY3K
module = PyImport_ImportModule("collectd");
#else
return (FC_TARGET_STOP);
} /* }}} int v5_mysql_threads */
+/*
+ * ZFS ARC hit and miss counters
+ *
+ * 4.* uses the flawed "arc_counts" type. In 5.* this has been replaced by the
+ * more generic "cache_result" type.
+ */
+static int v5_zfs_arc_counts (const data_set_t *ds, value_list_t *vl) /* {{{ */
+{
+ value_list_t new_vl;
+ value_t new_value;
+ _Bool is_hits;
+
+ if (vl->values_len != 4)
+ return (FC_TARGET_STOP);
+
+ if (strcmp ("hits", vl->type_instance) == 0)
+ is_hits = 1;
+ else if (strcmp ("misses", vl->type_instance) == 0)
+ is_hits = 0;
+ else
+ return (FC_TARGET_STOP);
+
+ /* Copy everything: Time, interval, host, ... */
+ memcpy (&new_vl, vl, sizeof (new_vl));
+
+ /* Reset data we can't simply copy */
+ new_vl.values = &new_value;
+ new_vl.values_len = 1;
+ new_vl.meta = NULL;
+
+ /* Change the type to "cache_result" */
+ sstrncpy (new_vl.type, "cache_result", sizeof (new_vl.type));
+
+ /* Dispatch new value lists instead of this one */
+ new_vl.values[0].derive = (derive_t) vl->values[0].counter;
+ ssnprintf (new_vl.type_instance, sizeof (new_vl.type_instance),
+ "demand_data-%s",
+ is_hits ? "hit" : "miss");
+ plugin_dispatch_values (&new_vl);
+
+ new_vl.values[0].derive = (derive_t) vl->values[1].counter;
+ ssnprintf (new_vl.type_instance, sizeof (new_vl.type_instance),
+ "demand_metadata-%s",
+ is_hits ? "hit" : "miss");
+ plugin_dispatch_values (&new_vl);
+
+ new_vl.values[0].derive = (derive_t) vl->values[2].counter;
+ ssnprintf (new_vl.type_instance, sizeof (new_vl.type_instance),
+ "prefetch_data-%s",
+ is_hits ? "hit" : "miss");
+ plugin_dispatch_values (&new_vl);
+
+ new_vl.values[0].derive = (derive_t) vl->values[3].counter;
+ ssnprintf (new_vl.type_instance, sizeof (new_vl.type_instance),
+ "prefetch_metadata-%s",
+ is_hits ? "hit" : "miss");
+ plugin_dispatch_values (&new_vl);
+
+ /* Abort processing */
+ return (FC_TARGET_STOP);
+} /* }}} int v5_zfs_arc_counts */
+
+/*
+ * ZFS ARC L2 bytes
+ *
+ * "arc_l2_bytes" -> "io_octets-L2".
+ */
+static int v5_zfs_arc_l2_bytes (const data_set_t *ds, value_list_t *vl) /* {{{ */
+{
+ value_list_t new_vl;
+ value_t new_values[2];
+
+ if (vl->values_len != 2)
+ return (FC_TARGET_STOP);
+
+ /* Copy everything: Time, interval, host, ... */
+ memcpy (&new_vl, vl, sizeof (new_vl));
+
+ /* Reset data we can't simply copy */
+ new_vl.values = new_values;
+ new_vl.values_len = 2;
+ new_vl.meta = NULL;
+
+ /* Change the type/-instance to "io_octets-L2" */
+ sstrncpy (new_vl.type, "io_octets", sizeof (new_vl.type));
+ sstrncpy (new_vl.type_instance, "L2", sizeof (new_vl.type_instance));
+
+ /* Copy the actual values. */
+ new_vl.values[0].derive = (derive_t) vl->values[0].counter;
+ new_vl.values[1].derive = (derive_t) vl->values[1].counter;
+
+ /* Dispatch new value lists instead of this one */
+ plugin_dispatch_values (&new_vl);
+
+ /* Abort processing */
+ return (FC_TARGET_STOP);
+} /* }}} int v5_zfs_arc_l2_bytes */
+
+/*
+ * ZFS ARC L2 cache size
+ *
+ * 4.* uses a separate type for this. 5.* uses the generic "cache_size" type
+ * instead.
+ */
+static int v5_zfs_arc_l2_size (const data_set_t *ds, value_list_t *vl) /* {{{ */
+{
+ value_list_t new_vl;
+ value_t new_value;
+
+ if (vl->values_len != 1)
+ return (FC_TARGET_STOP);
+
+ /* Copy everything: Time, interval, host, ... */
+ memcpy (&new_vl, vl, sizeof (new_vl));
+
+ /* Reset data we can't simply copy */
+ new_vl.values = &new_value;
+ new_vl.values_len = 1;
+ new_vl.meta = NULL;
+
+ new_vl.values[0].gauge = (gauge_t) vl->values[0].gauge;
+
+ /* Change the type to "cache_size" */
+ sstrncpy (new_vl.type, "cache_size", sizeof (new_vl.type));
+
+ /* Adapt the type instance */
+ sstrncpy (new_vl.type_instance, "L2", sizeof (new_vl.type_instance));
+
+ /* Dispatch new value lists instead of this one */
+ plugin_dispatch_values (&new_vl);
+
+ /* Abort processing */
+ return (FC_TARGET_STOP);
+} /* }}} int v5_zfs_arc_l2_size */
+
+/*
+ * ZFS ARC ratio
+ *
+ * "arc_ratio-L1" -> "cache_ratio-arc"
+ * "arc_ratio-L2" -> "cache_ratio-L2"
+ */
+static int v5_zfs_arc_ratio (const data_set_t *ds, value_list_t *vl) /* {{{ */
+{
+ value_list_t new_vl;
+ value_t new_value;
+
+ if (vl->values_len != 1)
+ return (FC_TARGET_STOP);
+
+ /* Copy everything: Time, interval, host, ... */
+ memcpy (&new_vl, vl, sizeof (new_vl));
+
+ /* Reset data we can't simply copy */
+ new_vl.values = &new_value;
+ new_vl.values_len = 1;
+ new_vl.meta = NULL;
+
+ new_vl.values[0].gauge = (gauge_t) vl->values[0].gauge;
+
+ /* Change the type to "cache_ratio" */
+ sstrncpy (new_vl.type, "cache_ratio", sizeof (new_vl.type));
+
+ /* Adapt the type instance */
+ if (strcmp ("L1", vl->type_instance) == 0)
+ sstrncpy (new_vl.type_instance, "arc", sizeof (new_vl.type_instance));
+
+ /* Dispatch new value lists instead of this one */
+ plugin_dispatch_values (&new_vl);
+
+ /* Abort processing */
+ return (FC_TARGET_STOP);
+} /* }}} int v5_zfs_arc_ratio */
+
+/*
+ * ZFS ARC size
+ *
+ * 4.* uses the "arc_size" type with four data sources. In 5.* this has been
+ * replaces with the "cache_size" type and static data has been removed.
+ */
+static int v5_zfs_arc_size (const data_set_t *ds, value_list_t *vl) /* {{{ */
+{
+ value_list_t new_vl;
+ value_t new_value;
+
+ if (vl->values_len != 4)
+ return (FC_TARGET_STOP);
+
+ /* Copy everything: Time, interval, host, ... */
+ memcpy (&new_vl, vl, sizeof (new_vl));
+
+ /* Reset data we can't simply copy */
+ new_vl.values = &new_value;
+ new_vl.values_len = 1;
+ new_vl.meta = NULL;
+
+ /* Change the type to "cache_size" */
+ sstrncpy (new_vl.type, "cache_size", sizeof (new_vl.type));
+
+ /* Dispatch new value lists instead of this one */
+ new_vl.values[0].derive = (derive_t) vl->values[0].counter;
+ sstrncpy (new_vl.type_instance, "arc", sizeof (new_vl.type_instance));
+ plugin_dispatch_values (&new_vl);
+
+ /* Abort processing */
+ return (FC_TARGET_STOP);
+} /* }}} int v5_zfs_arc_size */
+
static int v5_destroy (void **user_data) /* {{{ */
{
return (0);
return (v5_mysql_qcache (ds, vl));
else if (strcmp ("mysql_threads", vl->type) == 0)
return (v5_mysql_threads (ds, vl));
+ else if (strcmp ("arc_counts", vl->type) == 0)
+ return (v5_zfs_arc_counts (ds, vl));
+ else if (strcmp ("arc_l2_bytes", vl->type) == 0)
+ return (v5_zfs_arc_l2_bytes (ds, vl));
+ else if (strcmp ("arc_l2_size", vl->type) == 0)
+ return (v5_zfs_arc_l2_size (ds, vl));
+ else if (strcmp ("arc_ratio", vl->type) == 0)
+ return (v5_zfs_arc_ratio (ds, vl));
+ else if (strcmp ("arc_size", vl->type) == 0)
+ return (v5_zfs_arc_size (ds, vl));
return (FC_TARGET_CONTINUE);
} /* }}} int v5_invoke */
cpu value:COUNTER:0:4294967295
current value:GAUGE:U:U
current_connections value:GAUGE:0:U
+current_sessions value:GAUGE:0:U
delay seconds:GAUGE:-1000000:1000000
derive value:DERIVE:0:U
df used:GAUGE:0:1125899906842623, free:GAUGE:0:1125899906842623
total_values value:DERIVE:0:U
uptime value:GAUGE:0:4294967295
users users:GAUGE:0:65535
+vcpu value:GAUGE:0:U
virt_cpu_total ns:COUNTER:0:256000000000
virt_vcpu ns:COUNTER:0:1000000000
vmpage_action value:COUNTER:0:4294967295
vs_memory value:GAUGE:0:9223372036854775807
vs_processes value:GAUGE:0:65535
vs_threads value:GAUGE:0:65535
-pinba_view req_per_sec:GAUGE:0:U, req_time:GAUGE:0:U, ru_utime:GAUGE:0:U, ru_stime:GAUGE:0:U, doc_size:GAUGE:0:U, mem_peak:GAUGE:0:U
{
"SocketFile",
"SocketGroup",
- "SocketPerms"
+ "SocketPerms",
+ "DeleteSocket"
};
static int config_keys_num = STATIC_ARRAY_SIZE (config_keys);
static char *sock_file = NULL;
static char *sock_group = NULL;
static int sock_perms = S_IRWXU | S_IRWXG;
+static _Bool delete_socket = 0;
static pthread_t listen_thread = (pthread_t) 0;
sa.sun_family = AF_UNIX;
sstrncpy (sa.sun_path, (sock_file != NULL) ? sock_file : US_DEFAULT_PATH,
sizeof (sa.sun_path));
- /* unlink (sa.sun_path); */
DEBUG ("unixsock plugin: socket path = %s", sa.sun_path);
+ if (delete_socket)
+ {
+ errno = 0;
+ status = unlink (sa.sun_path);
+ if ((status != 0) && (errno != ENOENT))
+ {
+ char errbuf[1024];
+ WARNING ("unixsock plugin: Deleting socket file \"%s\" failed: %s",
+ sa.sun_path,
+ sstrerror (errno, errbuf, sizeof (errbuf)));
+ }
+ else if (status == 0)
+ {
+ INFO ("unixsock plugin: Successfully deleted socket file \"%s\".",
+ sa.sun_path);
+ }
+ }
+
status = bind (sock_fd, (struct sockaddr *) &sa, sizeof (sa));
if (status != 0)
{
{
sock_perms = (int) strtol (val, NULL, 8);
}
+ else if (strcasecmp (key, "DeleteSocket") == 0)
+ {
+ if (IS_TRUE (val))
+ delete_socket = 1;
+ else
+ delete_socket = 0;
+ }
else
{
return (-1);
}
/* Check if the entry has been updated in the meantime */
- if ((n.time - ce->last_update) < (2 * ce->interval))
+ if ((n.time - ce->last_update) < (timeout_g * ce->interval))
{
ce->state = STATE_OKAY;
pthread_mutex_unlock (&cache_lock);
return (NULL);
memset (obj, '\0', sizeof (cu_match_t));
- status = regcomp (&obj->regex, regex, REG_EXTENDED);
+ status = regcomp (&obj->regex, regex, REG_EXTENDED | REG_NEWLINE);
if (status != 0)
{
ERROR ("Compiling the regular expression \"%s\" failed.", regex);
while (42)
{
+ size_t len;
+
status = cu_tail_readline (obj, buf, buflen);
if (status != 0)
{
if (buf[0] == 0)
break;
+ len = strlen (buf);
+ while (len > 0) {
+ if (buf[len - 1] != '\n')
+ break;
+ buf[len - 1] = '\0';
+ }
+
status = callback (data, buf, buflen);
if (status != 0)
{
static void za_submit_gauge (const char* type, const char* type_instance, gauge_t value)
{
- value_t values[1];
+ value_t vv;
- values[0].gauge = value;
-
- za_submit (type, type_instance, values, STATIC_ARRAY_SIZE(values));
+ vv.gauge = value;
+ za_submit (type, type_instance, &vv, 1);
}
-static void za_submit_size (gauge_t size, gauge_t size_target, gauge_t limit_min, gauge_t limit_max)
+static void za_submit_derive (const char* type, const char* type_instance, derive_t dv)
{
- value_t values[4];
-
- values[0].gauge = size;
- values[1].gauge = size_target;
- values[2].gauge = limit_min;
- values[3].gauge = limit_max;
+ value_t vv;
- za_submit ("arc_size", "", values, STATIC_ARRAY_SIZE(values));
+ vv.derive = dv;
+ za_submit (type, type_instance, &vv, 1);
}
-static void za_submit_bytes (counter_t read, counter_t write)
+static void za_submit_ratio (const char* type_instance, gauge_t hits, gauge_t misses)
{
- value_t values[2];
+ gauge_t ratio = NAN;
- values[0].counter = read;
- values[1].counter = write;
+ if (!isfinite (hits) || (hits < 0.0))
+ hits = 0.0;
+ if (!isfinite (misses) || (misses < 0.0))
+ misses = 0.0;
- za_submit ("arc_l2_bytes", "", values, STATIC_ARRAY_SIZE(values));
-}
-
-static void za_submit_counts (char *type_instance, counter_t demand_data, counter_t demand_metadata,
- counter_t prefetch_data, counter_t prefetch_metadata)
-{
- value_t values[4];
+ if ((hits != 0.0) || (misses != 0.0))
+ ratio = hits / (hits + misses);
- values[0].counter = demand_data;
- values[1].counter = demand_metadata;
- values[2].counter = prefetch_data;
- values[3].counter = prefetch_metadata;
-
- za_submit ("arc_counts", type_instance, values, STATIC_ARRAY_SIZE(values));
+ za_submit_gauge ("cache_ratio", type_instance, ratio);
}
static int za_read (void)
{
- gauge_t arcsize, targetsize, minlimit, maxlimit, hits, misses, l2_size, l2_hits, l2_misses;
- counter_t demand_data_hits, demand_metadata_hits, prefetch_data_hits, prefetch_metadata_hits;
- counter_t demand_data_misses, demand_metadata_misses, prefetch_data_misses, prefetch_metadata_misses;
- counter_t l2_read_bytes, l2_write_bytes;
+ gauge_t arc_size, l2_size;
+ derive_t demand_data_hits,
+ demand_metadata_hits,
+ prefetch_data_hits,
+ prefetch_metadata_hits,
+ demand_data_misses,
+ demand_metadata_misses,
+ prefetch_data_misses,
+ prefetch_metadata_misses;
+ gauge_t arc_hits, arc_misses, arc_ratio, l2_hits, l2_misses, l2_ratio;
+ value_t l2_io[2];
get_kstat (&ksp, "zfs", 0, "arcstats");
if (ksp == NULL)
return (-1);
}
- arcsize = get_kstat_value(ksp, "size");
- targetsize = get_kstat_value(ksp, "c");
- minlimit = get_kstat_value(ksp, "c_min");
- maxlimit = get_kstat_value(ksp, "c_max");
+ /* Sizes */
+ arc_size = get_kstat_value(ksp, "size");
+ l2_size = get_kstat_value(ksp, "l2_size");
+
+ za_submit_gauge ("cache_size", "arc", arc_size);
+ za_submit_gauge ("cache_size", "L2", l2_size);
+ /* Hits / misses */
demand_data_hits = get_kstat_value(ksp, "demand_data_hits");
demand_metadata_hits = get_kstat_value(ksp, "demand_metadata_hits");
prefetch_data_hits = get_kstat_value(ksp, "prefetch_data_hits");
prefetch_data_misses = get_kstat_value(ksp, "prefetch_data_misses");
prefetch_metadata_misses = get_kstat_value(ksp, "prefetch_metadata_misses");
- hits = get_kstat_value(ksp, "hits");
- misses = get_kstat_value(ksp, "misses");
+ za_submit_derive ("cache_result", "demand_data-hit", demand_data_hits);
+ za_submit_derive ("cache_result", "demand_metadata-hit", demand_metadata_hits);
+ za_submit_derive ("cache_result", "prefetch_data-hit", prefetch_data_hits);
+ za_submit_derive ("cache_result", "prefetch_metadata-hit", prefetch_metadata_hits);
- l2_size = get_kstat_value(ksp, "l2_size");
- l2_read_bytes = get_kstat_value(ksp, "l2_read_bytes");
- l2_write_bytes = get_kstat_value(ksp, "l2_write_bytes");
- l2_hits = get_kstat_value(ksp, "l2_hits");
- l2_misses = get_kstat_value(ksp, "l2_misses");
+ za_submit_derive ("cache_result", "demand_data-miss", demand_data_misses);
+ za_submit_derive ("cache_result", "demand_metadata-miss", demand_metadata_misses);
+ za_submit_derive ("cache_result", "prefetch_data-miss", prefetch_data_misses);
+ za_submit_derive ("cache_result", "prefetch_metadata-miss", prefetch_metadata_misses);
+ /* Ratios */
+ arc_hits = (gauge_t) get_kstat_value(ksp, "hits");
+ arc_misses = (gauge_t) get_kstat_value(ksp, "misses");
+ l2_hits = (gauge_t) get_kstat_value(ksp, "l2_hits");
+ l2_misses = (gauge_t) get_kstat_value(ksp, "l2_misses");
- za_submit_size (arcsize, targetsize, minlimit, maxlimit);
- za_submit_gauge ("arc_l2_size", "", l2_size);
+ za_submit_ratio ("arc", arc_hits, arc_misses);
+ za_submit_ratio ("L2", l2_hits, l2_misses);
- za_submit_counts ("hits", demand_data_hits, demand_metadata_hits,
- prefetch_data_hits, prefetch_metadata_hits);
- za_submit_counts ("misses", demand_data_misses, demand_metadata_misses,
- prefetch_data_misses, prefetch_metadata_misses);
+ /* I/O */
+ l2_io[0].derive = get_kstat_value(ksp, "l2_read_bytes");
+ l2_io[1].derive = get_kstat_value(ksp, "l2_write_bytes");
- za_submit_gauge ("arc_ratio", "L1", hits / (hits + misses));
- za_submit_gauge ("arc_ratio", "L2", l2_hits / (l2_hits + l2_misses));
-
- za_submit_bytes (l2_read_bytes, l2_write_bytes);
+ za_submit ("io_octets", "L2", l2_io, /* num values = */ 2);
return (0);
-}
+} /* int za_read */
static int za_init (void) /* {{{ */
{