Merge pull request #1705 from rpv-tomsk/oracle-memleak-fix
authorPavel Rochnyak <pavel2000@ngs.ru>
Sat, 26 May 2018 17:09:29 +0000 (00:09 +0700)
committerGitHub <noreply@github.com>
Sat, 26 May 2018 17:09:29 +0000 (00:09 +0700)
dbi/postgresql/oracle plugins: Fixes and improvements.

12 files changed:
AUTHORS
Makefile.am
README
configure.ac
contrib/redhat/collectd.spec
src/amqp1.c [new file with mode: 0644]
src/collectd.conf.in
src/collectd.conf.pod
src/modbus.c
src/statsd.c
src/utils_deq.h [new file with mode: 0644]
src/virt.c

diff --git a/AUTHORS b/AUTHORS
index 4df743c..409655a 100644 (file)
--- a/AUTHORS
+++ b/AUTHORS
@@ -59,6 +59,9 @@ Andreas Henriksson <andreas at fatal.se>
 Andy Parkins <andyp at fussylogic.co.uk>
  - battery plugin: sysfs code.
 
+Andy Smith <ansmith at redhat.com>
+ - AMQP 1.0 plugin.
+
 Anthony Dewhurst <dewhurst at gmail.com>
  - zfs_arc plugin.
 
@@ -286,7 +289,7 @@ Scott Sanders <scott at jssjr.com>
  - Write-Graphite plugin.
 
 Sebastien Pahl <sebastien.pahl at dotcloud.com>
- - AMQP plugin.
+ - AMQP 0.9 plugin.
 
 Serhiy Pshyk <serhiyx.pshyk at intel.com>
  - intel_pmu plugin
index 98b71ab..c345904 100644 (file)
@@ -550,6 +550,20 @@ amqp_la_LIBADD = \
        libformat_json.la
 endif
 
+if BUILD_PLUGIN_AMQP1
+pkglib_LTLIBRARIES += amqp1.la
+amqp1_la_SOURCES = \
+       src/amqp1.c \
+       src/utils_deq.h
+amqp1_la_CPPFLAGS = $(AM_CPPFLAGS) $(BUILD_WITH_LIBQPIDPROTON_CPPFLAGS)
+amqp1_la_LDFLAGS = $(PLUGIN_LDFLAGS) $(BUILD_WITH_LIBQPIDPROTON_LDFLAGS)
+amqp1_la_LIBADD = \
+       $(BUILD_WITH_LIBQPIDPROTON_LIBS) \
+       libcmds.la \
+       libformat_graphite.la \
+       libformat_json.la
+endif
+
 if BUILD_PLUGIN_APACHE
 pkglib_LTLIBRARIES += apache.la
 apache_la_SOURCES = src/apache.c
diff --git a/README b/README
index 43df03b..2210b2b 100644 (file)
--- a/README
+++ b/README
@@ -465,7 +465,11 @@ Features
 
     - amqp
       Sends JSON-encoded data to an Advanced Message Queuing Protocol (AMQP)
-      server, such as RabbitMQ.
+      0.9.1 server, such as RabbitMQ.
+
+    - amqp1
+      Sends JSON-encoded data to an Advanced Message Queuing Protocol (AMQP)
+      1.0 server, such as Qpid Dispatch Router or Apache Artemis Broker.
 
     - csv
       Write to comma separated values (CSV) files. This needs lots of
@@ -908,8 +912,14 @@ Prerequisites
     are supported.
     <http://www.python.org/>
 
+  * libqpid-proton (optional)
+    Used by the `amqp1' plugin for AMQP 1.0 connections, for example to
+    Qdrouterd.
+    <http://qpid.apache.org/>
+
   * librabbitmq (optional; also called “rabbitmq-c”)
-    Used by the `amqp' plugin for AMQP connections, for example to RabbitMQ.
+    Used by the `amqp' plugin for AMQP 0.9.1 connections, for example to
+    RabbitMQ.
     <http://hg.rabbitmq.com/rabbitmq-c/>
 
   * librdkafka (optional; also called “rdkafka”)
index 4be85bb..1e31e21 100644 (file)
@@ -4719,6 +4719,56 @@ if test "$with_libpython" != "xno"; then
 fi
 # }}} --with-libpython
 
+# --with-libqpid_proton {{{
+AC_ARG_WITH([libqpid_proton],
+  [AS_HELP_STRING([--with-libqpid_proton@<:@=PREFIX@:>@], [Path to libqpid_proton.])],
+  [
+    if test "x$withval" != "xno" && test "x$withval" != "xyes"; then
+      with_libqpid_proton_cppflags="-I$withval/include"
+      with_libqpid_proton_ldflags="-L$withval/lib"
+      with_libqpid_proton="yes"
+    else
+      with_libqpid_proton="$withval"
+    fi
+  ],
+  [with_libqpid_proton="yes"]
+)
+
+if test "x$with_libqpid_proton" = "xyes"; then
+  SAVE_CPPFLAGS="$CPPFLAGS"
+  CPPFLAGS="$CPPFLAGS $with_libqpid_proton_cppflags"
+
+  AC_CHECK_HEADERS([proton/proactor.h],
+    [with_libqpid_proton="yes"],
+    [with_libqpid_proton="no (proton/proactor.h not found)"]
+  )
+
+  CPPFLAGS="$SAVE_CPPFLAGS"
+fi
+
+if test "x$with_libqpid_proton" = "xyes"; then
+  SAVE_LDFLAGS="$LDFLAGS"
+  LDFLAGS="$LDFLAGS $with_libqpid_proton_ldflags"
+
+  AC_CHECK_LIB([qpid-proton], [pn_connection],
+    [with_libqpid_proton="yes"],
+    [with_libqpid_proton="no (Symbol 'pn_connection' not found)"])
+
+  LDFLAGS="$SAVE_LDFLAGS"
+fi
+
+if test "x$with_libqpid_proton" = "xyes"; then
+  BUILD_WITH_LIBQPIDPROTON_CPPFLAGS="$with_libqpid_proton_cppflags"
+  BUILD_WITH_LIBQPIDPROTON_LDFLAGS="$with_libqpid_proton_ldflags"
+  BUILD_WITH_LIBQPIDPROTON_LIBS="-lqpid-proton"
+fi
+
+AC_SUBST(BUILD_WITH_LIBQPIDPROTON_CPPFLAGS)
+AC_SUBST(BUILD_WITH_LIBQPIDPROTON_LDFLAGS)
+AC_SUBST(BUILD_WITH_LIBQPIDPROTON_LIBS)
+
+# }}}
+
 # --with-librabbitmq {{{
 AC_ARG_WITH([librabbitmq],
   [AS_HELP_STRING([--with-librabbitmq@<:@=PREFIX@:>@], [Path to librabbitmq.])],
@@ -6544,6 +6594,7 @@ m4_divert_once([HELP_ENABLE], [])
 
 AC_PLUGIN([aggregation],         [yes],                     [Aggregation plugin])
 AC_PLUGIN([amqp],                [$with_librabbitmq],       [AMQP output plugin])
+AC_PLUGIN([amqp1],               [$with_libqpid_proton],    [AMQP 1.0 output plugin])
 AC_PLUGIN([apache],              [$with_libcurl],           [Apache httpd statistics])
 AC_PLUGIN([apcups],              [yes],                     [Statistics of UPSes by APC])
 AC_PLUGIN([apple_sensors],       [$with_libiokit],          [Apple hardware sensors])
@@ -6932,6 +6983,7 @@ AC_MSG_RESULT([    libpqos . . . . . . . $with_libpqos])
 AC_MSG_RESULT([    libprotobuf . . . . . $with_libprotobuf])
 AC_MSG_RESULT([    libprotobuf-c . . . . $with_libprotobuf_c])
 AC_MSG_RESULT([    libpython . . . . . . $with_libpython])
+AC_MSG_RESULT([    libqpid-proton .  . . $with_libqpid_proton])
 AC_MSG_RESULT([    librabbitmq . . . . . $with_librabbitmq])
 AC_MSG_RESULT([    libriemann-client . . $with_libriemann_client])
 AC_MSG_RESULT([    librdkafka  . . . . . $with_librdkafka])
@@ -6963,6 +7015,7 @@ AC_MSG_RESULT()
 AC_MSG_RESULT([  Modules:])
 AC_MSG_RESULT([    aggregation . . . . . $enable_aggregation])
 AC_MSG_RESULT([    amqp    . . . . . . . $enable_amqp])
+AC_MSG_RESULT([    amqp1   . . . . . . . $enable_amqp1])
 AC_MSG_RESULT([    apache  . . . . . . . $enable_apache])
 AC_MSG_RESULT([    apcups  . . . . . . . $enable_apcups])
 AC_MSG_RESULT([    apple_sensors . . . . $enable_apple_sensors])
index 752b024..6f86b7e 100644 (file)
@@ -44,6 +44,7 @@
 # plugins enabled by default
 %define with_aggregation 0%{!?_without_aggregation:1}
 %define with_amqp 0%{!?_without_amqp:1}
+%define with_amqp1 0%{!?_without_amqp1:1}
 %define with_apache 0%{!?_without_apache:1}
 %define with_apcups 0%{!?_without_apcups:1}
 %define with_ascent 0%{!?_without_ascent:1}
@@ -280,13 +281,24 @@ every 10 seconds by default.
 
 %if %{with_amqp}
 %package amqp
-Summary:       AMQP plugin for collectd
+Summary:       AMQP 0.9 plugin for collectd
 Group:         System Environment/Daemons
 Requires:      %{name}%{?_isa} = %{version}-%{release}
 BuildRequires: librabbitmq-devel
 %description amqp
-The AMQP plugin transmits or receives values collected by collectd via the
-Advanced Message Queuing Protocol (AMQP).
+The AMQP 0.9 plugin transmits or receives values collected by collectd via the
+Advanced Message Queuing Protocol v0.9 (AMQP).
+%endif
+
+%if %{with_amqp1}
+%package amqp1
+Summary:       AMQP 1.0 plugin for collectd
+Group:         System Environment/Daemons
+Requires:      %{name}%{?_isa} = %{version}-%{release}
+BuildRequires: qpid-proton-c-devel
+%description amqp1
+The AMQP 1.0 plugin transmits or receives values collected by collectd via the
+Advanced Message Queuing Protocol v1.0 (AMQP1).
 %endif
 
 %if %{with_apache}
@@ -1018,6 +1030,12 @@ Collectd utilities
 %define _with_amqp --disable-amqp
 %endif
 
+%if %{with_amqp1}
+%define _with_amqp1 --enable-amqp1
+%else
+%define _with_amqp1 --disable-amqp1
+%endif
+
 %if %{with_apache}
 %define _with_apache --enable-apache
 %else
@@ -1898,6 +1916,7 @@ Collectd utilities
        --enable-target_v5upgrade \
        %{?_with_aggregation} \
        %{?_with_amqp} \
+       %{?_with_amqp1} \
        %{?_with_apache} \
        %{?_with_apcups} \
        %{?_with_apple_sensors} \
@@ -2409,6 +2428,11 @@ fi
 %{_libdir}/%{name}/amqp.so
 %endif
 
+%if %{with_amqp1}
+%files amqp1
+%{_libdir}/%{name}/amqp1.so
+%endif
+
 %if %{with_apache}
 %files apache
 %{_libdir}/%{name}/apache.so
diff --git a/src/amqp1.c b/src/amqp1.c
new file mode 100644 (file)
index 0000000..e60142b
--- /dev/null
@@ -0,0 +1,701 @@
+/**
+ * collectd - src/amqp1.c
+ * Copyright(c) 2017 Red Hat Inc.
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a
+ * copy of this software and associated documentation files (the "Software"),
+ * to deal in the Software without restriction, including without limitation
+ * the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ * and/or sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ *
+ * Authors:
+ *   Andy Smith <ansmith@redhat.com>
+ */
+
+#include "collectd.h"
+
+#include "common.h"
+#include "plugin.h"
+#include "utils_cmd_putval.h"
+#include "utils_deq.h"
+#include "utils_format_graphite.h"
+#include "utils_format_json.h"
+#include "utils_random.h"
+
+#include <proton/condition.h>
+#include <proton/connection.h>
+#include <proton/delivery.h>
+#include <proton/link.h>
+#include <proton/message.h>
+#include <proton/proactor.h>
+#include <proton/sasl.h>
+#include <proton/session.h>
+#include <proton/transport.h>
+
+#include <errno.h>
+#include <stdint.h>
+#include <stdio.h>
+#include <stdlib.h>
+
+#define BUFSIZE 8192
+#define AMQP1_FORMAT_JSON 0
+#define AMQP1_FORMAT_COMMAND 1
+#define AMQP1_FORMAT_GRAPHITE 2
+
+typedef struct amqp1_config_transport_s {
+  DEQ_LINKS(struct amqp1_config_transport_s);
+  char *name;
+  char *host;
+  char *port;
+  char *user;
+  char *password;
+  char *address;
+  int retry_delay;
+} amqp1_config_transport_t;
+
+typedef struct amqp1_config_instance_s {
+  DEQ_LINKS(struct amqp1_config_instance_s);
+  char *name;
+  bool notify;
+  uint8_t format;
+  unsigned int graphite_flags;
+  bool store_rates;
+  char *prefix;
+  char *postfix;
+  char escape_char;
+  bool pre_settle;
+  char send_to[1024];
+} amqp1_config_instance_t;
+
+DEQ_DECLARE(amqp1_config_instance_t, amqp1_config_instance_list_t);
+
+typedef struct cd_message_s {
+  DEQ_LINKS(struct cd_message_s);
+  pn_rwbytes_t mbuf;
+  amqp1_config_instance_t *instance;
+} cd_message_t;
+
+DEQ_DECLARE(cd_message_t, cd_message_list_t);
+
+/*
+ * Globals
+ */
+static pn_connection_t *conn;
+static pn_link_t *sender;
+static pn_proactor_t *proactor;
+static pthread_mutex_t send_lock;
+static cd_message_list_t out_messages;
+static uint64_t cd_tag = 1;
+static uint64_t acknowledged;
+static amqp1_config_transport_t *transport;
+static bool stopping;
+static bool event_thread_running;
+static pthread_t event_thread_id;
+
+/*
+ * Functions
+ */
+static void cd_message_free(cd_message_t *cdm) {
+  free(cdm->mbuf.start);
+  free(cdm);
+} /* }}} void cd_message_free */
+
+static int amqp1_send_out_messages(pn_link_t *link) /* {{{ */
+{
+  uint64_t dtag;
+  cd_message_list_t to_send;
+  cd_message_t *cdm;
+  int link_credit = pn_link_credit(link);
+  int event_count = 0;
+  pn_delivery_t *dlv;
+
+  if (stopping) {
+    return 0;
+  }
+
+  DEQ_INIT(to_send);
+
+  pthread_mutex_lock(&send_lock);
+
+  if (link_credit > 0) {
+    dtag = cd_tag;
+    cdm = DEQ_HEAD(out_messages);
+    while (cdm) {
+      DEQ_REMOVE_HEAD(out_messages);
+      DEQ_INSERT_TAIL(to_send, cdm);
+      if (DEQ_SIZE(to_send) == link_credit)
+        break;
+      cdm = DEQ_HEAD(out_messages);
+    }
+    cd_tag += DEQ_SIZE(to_send);
+  }
+
+  pthread_mutex_unlock(&send_lock);
+
+  /* message is already formatted and encoded */
+  cdm = DEQ_HEAD(to_send);
+  while (cdm) {
+    DEQ_REMOVE_HEAD(to_send);
+    dtag++;
+    dlv = pn_delivery(link, pn_dtag((const char *)&dtag, sizeof(dtag)));
+    pn_link_send(link, cdm->mbuf.start, cdm->mbuf.size);
+    pn_link_advance(link);
+    if (cdm->instance->pre_settle == true) {
+      pn_delivery_settle(dlv);
+    }
+    event_count++;
+    cd_message_free(cdm);
+    cdm = DEQ_HEAD(to_send);
+  }
+
+  return event_count;
+} /* }}} int amqp1_send_out_messages */
+
+static void check_condition(pn_event_t *e, pn_condition_t *cond) /* {{{ */
+{
+  if (pn_condition_is_set(cond)) {
+    ERROR("amqp1 plugin: %s: %s: %s", pn_event_type_name(pn_event_type(e)),
+          pn_condition_get_name(cond), pn_condition_get_description(cond));
+    pn_connection_close(pn_event_connection(e));
+    conn = NULL;
+  }
+} /* }}} void check_condition */
+
+static bool handle(pn_event_t *event) /* {{{ */
+{
+
+  switch (pn_event_type(event)) {
+
+  case PN_CONNECTION_INIT: {
+    conn = pn_event_connection(event);
+    pn_connection_set_container(conn, transport->name);
+    pn_connection_open(conn);
+    pn_session_t *ssn = pn_session(conn);
+    pn_session_open(ssn);
+    sender = pn_sender(ssn, "cd-sender");
+    pn_link_set_snd_settle_mode(sender, PN_SND_MIXED);
+    pn_link_open(sender);
+    break;
+  }
+
+  case PN_LINK_FLOW: {
+    /* peer has given us credit, send outbound messages */
+    amqp1_send_out_messages(sender);
+    break;
+  }
+
+  case PN_DELIVERY: {
+    /* acknowledgement from peer that a message was delivered */
+    pn_delivery_t *dlv = pn_event_delivery(event);
+    if (pn_delivery_remote_state(dlv) == PN_ACCEPTED) {
+      pn_delivery_settle(dlv);
+      acknowledged++;
+    }
+    break;
+  }
+
+  case PN_CONNECTION_WAKE: {
+    if (!stopping) {
+      amqp1_send_out_messages(sender);
+    }
+    break;
+  }
+
+  case PN_TRANSPORT_CLOSED: {
+    check_condition(event, pn_transport_condition(pn_event_transport(event)));
+    break;
+  }
+
+  case PN_CONNECTION_REMOTE_CLOSE: {
+    check_condition(event,
+                    pn_session_remote_condition(pn_event_session(event)));
+    pn_connection_close(pn_event_connection(event));
+    break;
+  }
+
+  case PN_SESSION_REMOTE_CLOSE: {
+    check_condition(event,
+                    pn_session_remote_condition(pn_event_session(event)));
+    pn_connection_close(pn_event_connection(event));
+    break;
+  }
+
+  case PN_LINK_REMOTE_CLOSE:
+  case PN_LINK_REMOTE_DETACH: {
+    check_condition(event, pn_link_remote_condition(pn_event_link(event)));
+    pn_connection_close(pn_event_connection(event));
+    break;
+  }
+
+  case PN_PROACTOR_INACTIVE: {
+    return false;
+  }
+
+  default:
+    break;
+  }
+  return true;
+} /* }}} bool handle */
+
+static void *event_thread(void __attribute__((unused)) * arg) /* {{{ */
+{
+  char addr[PN_MAX_ADDR];
+  cd_message_t *cdm;
+
+  /* setup proactor */
+  proactor = pn_proactor();
+  pn_proactor_addr(addr, sizeof(addr), transport->host, transport->port);
+
+  while (!stopping) {
+    /* make connection */
+    conn = pn_connection();
+    if (transport->user != NULL) {
+      pn_connection_set_user(conn, transport->user);
+      pn_connection_set_password(conn, transport->password);
+    }
+    pn_proactor_connect(proactor, conn, addr);
+
+    bool engine_running = true;
+    while (engine_running && !stopping) {
+      pn_event_batch_t *events = pn_proactor_wait(proactor);
+      pn_event_t *e;
+      while ((e = pn_event_batch_next(events))) {
+        engine_running = handle(e);
+        if (!engine_running) {
+          break;
+        }
+      }
+      pn_proactor_done(proactor, events);
+    }
+
+    pn_proactor_release_connection(conn);
+
+    DEBUG("amqp1 plugin: retrying connection");
+    int delay = transport->retry_delay;
+    while (delay-- > 0 && !stopping) {
+      sleep(1.0);
+    }
+  }
+
+  pn_proactor_disconnect(proactor, NULL);
+
+  /* Free the remaining out_messages */
+  cdm = DEQ_HEAD(out_messages);
+  while (cdm) {
+    DEQ_REMOVE_HEAD(out_messages);
+    cd_message_free(cdm);
+    cdm = DEQ_HEAD(out_messages);
+  }
+
+  event_thread_running = false;
+
+  return NULL;
+} /* }}} void event_thread */
+
+static int encqueue(cd_message_t *cdm,
+                    amqp1_config_instance_t *instance) /* {{{ */
+{
+  /* encode message */
+  pn_message_t *message = pn_message();
+  pn_message_set_address(message, instance->send_to);
+  pn_data_t *body = pn_message_body(message);
+  pn_data_clear(body);
+  pn_data_put_binary(body, pn_bytes(cdm->mbuf.size, cdm->mbuf.start));
+  pn_data_exit(body);
+
+  /* put_binary copies and stores so ok to use mbuf */
+  cdm->mbuf.size = BUFSIZE;
+
+  int status;
+  while ((status = pn_message_encode(message, cdm->mbuf.start,
+                                     &cdm->mbuf.size)) == PN_OVERFLOW) {
+    DEBUG("amqp1 plugin: increasing message buffer size %zu", cdm->mbuf.size);
+    cdm->mbuf.size *= 2;
+    cdm->mbuf.start = realloc(cdm->mbuf.start, cdm->mbuf.size);
+  }
+
+  if (status != 0) {
+    ERROR("amqp1 plugin: error encoding message: %s",
+          pn_error_text(pn_message_error(message)));
+    pn_message_free(message);
+    cd_message_free(cdm);
+    return -1;
+  }
+
+  pthread_mutex_lock(&send_lock);
+  DEQ_INSERT_TAIL(out_messages, cdm);
+  pthread_mutex_unlock(&send_lock);
+
+  pn_message_free(message);
+
+  /* activate the sender */
+  if (conn) {
+    pn_connection_wake(conn);
+  }
+
+  return 0;
+} /* }}} int encqueue */
+
+static int amqp1_notify(notification_t const *n,
+                        user_data_t *user_data) /* {{{ */
+{
+  size_t bfree = BUFSIZE;
+  size_t bfill = 0;
+  size_t bufsize = BUFSIZE;
+
+  if (n == NULL || user_data == NULL)
+    return EINVAL;
+
+  amqp1_config_instance_t *instance = user_data->data;
+
+  if (instance->notify != true) {
+    ERROR("amqp1 plugin: write notification failed");
+  }
+
+  cd_message_t *cdm = malloc(sizeof(*cdm));
+  DEQ_ITEM_INIT(cdm);
+  cdm->mbuf = pn_rwbytes(bufsize, malloc(bufsize));
+  cdm->instance = instance;
+
+  switch (instance->format) {
+  case AMQP1_FORMAT_JSON:
+    format_json_initialize(cdm->mbuf.start, &bfill, &bfree);
+    int status = format_json_notification(cdm->mbuf.start, bufsize, n);
+    if (status != 0) {
+      ERROR("amqp1 plugin: formatting notification failed");
+      return status;
+    }
+    cdm->mbuf.size = strlen(cdm->mbuf.start);
+    break;
+  default:
+    ERROR("amqp1 plugin: Invalid notify format (%i).", instance->format);
+    return -1;
+  }
+
+  /* encode message and place on outbound queue */
+  return encqueue(cdm, instance);
+
+} /* }}} int amqp1_notify */
+
+static int amqp1_write(const data_set_t *ds, const value_list_t *vl, /* {{{ */
+                       user_data_t *user_data) {
+  int status = 0;
+  size_t bfree = BUFSIZE;
+  size_t bfill = 0;
+  size_t bufsize = BUFSIZE;
+
+  if (ds == NULL || vl == NULL || transport == NULL || user_data == NULL)
+    return EINVAL;
+
+  amqp1_config_instance_t *instance = user_data->data;
+
+  if (instance->notify != false) {
+    ERROR("amqp1 plugin: write failed");
+  }
+
+  cd_message_t *cdm = malloc(sizeof(*cdm));
+  DEQ_ITEM_INIT(cdm);
+  cdm->mbuf = pn_rwbytes(bufsize, malloc(bufsize));
+  cdm->instance = instance;
+
+  switch (instance->format) {
+  case AMQP1_FORMAT_COMMAND:
+    status = cmd_create_putval((char *)cdm->mbuf.start, bufsize, ds, vl);
+    if (status != 0) {
+      ERROR("amqp1 plugin: cmd_create_putval failed with status %i.", status);
+      return status;
+    }
+    cdm->mbuf.size = strlen(cdm->mbuf.start);
+    break;
+  case AMQP1_FORMAT_JSON:
+    format_json_initialize((char *)cdm->mbuf.start, &bfill, &bfree);
+    format_json_value_list((char *)cdm->mbuf.start, &bfill, &bfree, ds, vl,
+                           instance->store_rates);
+    format_json_finalize((char *)cdm->mbuf.start, &bfill, &bfree);
+    cdm->mbuf.size = strlen(cdm->mbuf.start);
+    break;
+  case AMQP1_FORMAT_GRAPHITE:
+    status = format_graphite((char *)cdm->mbuf.start, bufsize, ds, vl,
+                             instance->prefix, instance->postfix,
+                             instance->escape_char, instance->graphite_flags);
+    if (status != 0) {
+      ERROR("amqp1 plugin: format_graphite failed with status %i.", status);
+      return status;
+    }
+    cdm->mbuf.size = strlen(cdm->mbuf.start);
+    break;
+  default:
+    ERROR("amqp1 plugin: Invalid write format (%i).", instance->format);
+    return -1;
+  }
+
+  /* encode message and place on outbound queue */
+  return encqueue(cdm, instance);
+
+} /* }}} int amqp1_write */
+
+static void amqp1_config_transport_free(void *ptr) /* {{{ */
+{
+  amqp1_config_transport_t *transport = ptr;
+
+  if (transport == NULL)
+    return;
+
+  sfree(transport->name);
+  sfree(transport->host);
+  sfree(transport->user);
+  sfree(transport->password);
+  sfree(transport->address);
+
+  sfree(transport);
+} /* }}} void amqp1_config_transport_free */
+
+static void amqp1_config_instance_free(void *ptr) /* {{{ */
+{
+  amqp1_config_instance_t *instance = ptr;
+
+  if (instance == NULL)
+    return;
+
+  sfree(instance->name);
+  sfree(instance->prefix);
+  sfree(instance->postfix);
+
+  sfree(instance);
+} /* }}} void amqp1_config_instance_free */
+
+static int amqp1_config_instance(oconfig_item_t *ci) /* {{{ */
+{
+  amqp1_config_instance_t *instance = calloc(1, sizeof(*instance));
+  if (instance == NULL) {
+    ERROR("amqp1 plugin: calloc failed.");
+    return ENOMEM;
+  }
+
+  int status = cf_util_get_string(ci, &instance->name);
+  if (status != 0) {
+    sfree(instance);
+    return status;
+  }
+
+  for (int i = 0; i < ci->children_num; i++) {
+    oconfig_item_t *child = ci->children + i;
+
+    if (strcasecmp("PreSettle", child->key) == 0)
+      status = cf_util_get_boolean(child, &instance->pre_settle);
+    else if (strcasecmp("Notify", child->key) == 0)
+      status = cf_util_get_boolean(child, &instance->notify);
+    else if (strcasecmp("Format", child->key) == 0) {
+      char *key = NULL;
+      status = cf_util_get_string(child, &key);
+      if (status != 0)
+        return status;
+      assert(key != NULL);
+      if (strcasecmp(key, "Command") == 0) {
+        instance->format = AMQP1_FORMAT_COMMAND;
+      } else if (strcasecmp(key, "Graphite") == 0) {
+        instance->format = AMQP1_FORMAT_GRAPHITE;
+      } else if (strcasecmp(key, "JSON") == 0) {
+        instance->format = AMQP1_FORMAT_JSON;
+      } else {
+        WARNING("amqp1 plugin: Invalid format string: %s", key);
+      }
+      sfree(key);
+    } else if (strcasecmp("StoreRates", child->key) == 0)
+      status = cf_util_get_boolean(child, &instance->store_rates);
+    else if (strcasecmp("GraphiteSeparateInstances", child->key) == 0)
+      status = cf_util_get_flag(child, &instance->graphite_flags,
+                                GRAPHITE_SEPARATE_INSTANCES);
+    else if (strcasecmp("GraphiteAlwaysAppendDS", child->key) == 0)
+      status = cf_util_get_flag(child, &instance->graphite_flags,
+                                GRAPHITE_ALWAYS_APPEND_DS);
+    else if (strcasecmp("GraphitePreserveSeparator", child->key) == 0)
+      status = cf_util_get_flag(child, &instance->graphite_flags,
+                                GRAPHITE_PRESERVE_SEPARATOR);
+    else if (strcasecmp("GraphitePrefix", child->key) == 0)
+      status = cf_util_get_string(child, &instance->prefix);
+    else if (strcasecmp("GraphitePostfix", child->key) == 0)
+      status = cf_util_get_string(child, &instance->postfix);
+    else if (strcasecmp("GraphiteEscapeChar", child->key) == 0) {
+      char *tmp_buff = NULL;
+      status = cf_util_get_string(child, &tmp_buff);
+      if (status == 0) {
+        if (strlen(tmp_buff) > 1)
+          WARNING("amqp1 plugin: The option \"GraphiteEscapeChar\" handles "
+                  "only one character. Others will be ignored.");
+        instance->escape_char = tmp_buff[0];
+      }
+      sfree(tmp_buff);
+    } else
+      WARNING("amqp1 plugin: Ignoring unknown "
+              "instance configuration option "
+              "\%s\".",
+              child->key);
+    if (status != 0)
+      break;
+  }
+
+  if (status != 0) {
+    amqp1_config_instance_free(instance);
+    return status;
+  } else {
+    char tpname[DATA_MAX_NAME_LEN];
+    status = snprintf(tpname, sizeof(tpname), "amqp1/%s", instance->name);
+    if ((status < 0) || (size_t)status >= sizeof(tpname)) {
+      ERROR("amqp1 plugin: Instance name would have been truncated.");
+      return -1;
+    }
+    status = snprintf(instance->send_to, sizeof(instance->send_to), "/%s/%s",
+                      transport->address, instance->name);
+    if ((status < 0) || (size_t)status >= sizeof(instance->send_to)) {
+      ERROR("amqp1 plugin: send_to address would have been truncated.");
+      return -1;
+    }
+    if (instance->notify) {
+      status = plugin_register_notification(
+          tpname, amqp1_notify,
+          &(user_data_t){
+              .data = instance, .free_func = amqp1_config_instance_free,
+          });
+    } else {
+      status = plugin_register_write(
+          tpname, amqp1_write,
+          &(user_data_t){
+              .data = instance, .free_func = amqp1_config_instance_free,
+          });
+    }
+
+    if (status != 0) {
+      amqp1_config_instance_free(instance);
+    }
+  }
+
+  return status;
+} /* }}} int amqp1_config_instance */
+
+static int amqp1_config_transport(oconfig_item_t *ci) /* {{{ */
+{
+  transport = calloc(1, sizeof(*transport));
+  if (transport == NULL) {
+    ERROR("amqp1 plugin: calloc failed.");
+    return ENOMEM;
+  }
+
+  /* Initialize transport configuration {{{ */
+  transport->retry_delay = 1;
+
+  int status = cf_util_get_string(ci, &transport->name);
+  if (status != 0) {
+    sfree(transport);
+    return status;
+  }
+
+  for (int i = 0; i < ci->children_num; i++) {
+    oconfig_item_t *child = ci->children + i;
+
+    if (strcasecmp("Host", child->key) == 0)
+      status = cf_util_get_string(child, &transport->host);
+    else if (strcasecmp("Port", child->key) == 0)
+      status = cf_util_get_string(child, &transport->port);
+    else if (strcasecmp("User", child->key) == 0)
+      status = cf_util_get_string(child, &transport->user);
+    else if (strcasecmp("Password", child->key) == 0)
+      status = cf_util_get_string(child, &transport->password);
+    else if (strcasecmp("Address", child->key) == 0)
+      status = cf_util_get_string(child, &transport->address);
+    else if (strcasecmp("RetryDelay", child->key) == 0)
+      status = cf_util_get_int(child, &transport->retry_delay);
+    else if (strcasecmp("Instance", child->key) == 0)
+      amqp1_config_instance(child);
+    else
+      WARNING("amqp1 plugin: Ignoring unknown "
+              "transport configuration option "
+              "\%s\".",
+              child->key);
+
+    if (status != 0)
+      break;
+  }
+
+  if (status != 0) {
+    amqp1_config_transport_free(transport);
+  }
+  return status;
+} /* }}} int amqp1_config_transport */
+
+static int amqp1_config(oconfig_item_t *ci) /* {{{ */
+{
+
+  for (int i = 0; i < ci->children_num; i++) {
+    oconfig_item_t *child = ci->children + i;
+
+    if (strcasecmp("Transport", child->key) == 0)
+      amqp1_config_transport(child);
+    else
+      WARNING("amqp1 plugin: Ignoring unknown config option \%s\".",
+              child->key);
+  }
+
+  return 0;
+} /* }}} int amqp1_config */
+
+static int amqp1_init(void) /* {{{ */
+{
+  if (transport == NULL) {
+    ERROR("amqp1: init failed, no transport configured");
+    return -1;
+  }
+
+  if (proactor == NULL) {
+    pthread_mutex_init(&send_lock, /* attr = */ NULL);
+    /* start_thread */
+    int status =
+        plugin_thread_create(&event_thread_id, NULL /* no attributes */,
+                             event_thread, NULL /* no argument */, "handle");
+    if (status != 0) {
+      ERROR("amqp1 plugin: pthread_create failed: %s", STRERRNO);
+    } else {
+      event_thread_running = true;
+    }
+  }
+  return 0;
+} /* }}} int amqp1_init */
+
+static int amqp1_shutdown(void) /* {{{ */
+{
+  stopping = true;
+
+  /* Stop the proactor thread */
+  if (event_thread_running) {
+    DEBUG("amqp1 plugin: Shutting down proactor thread.");
+    pn_connection_wake(conn);
+  }
+  pthread_join(event_thread_id, NULL /* no return value */);
+  memset(&event_thread_id, 0, sizeof(event_thread_id));
+
+  DEBUG("amqp1 plugin: proactor thread exited.");
+
+  if (transport) {
+    amqp1_config_transport_free(transport);
+  }
+
+  return 0;
+} /* }}} int amqp1_shutdown */
+
+void module_register(void) {
+  plugin_register_complex_config("amqp1", amqp1_config);
+  plugin_register_init("amqp1", amqp1_init);
+  plugin_register_shutdown("amqp1", amqp1_shutdown);
+} /* void module_register */
index 96c2630..662b483 100644 (file)
@@ -90,6 +90,7 @@
 
 #@BUILD_PLUGIN_AGGREGATION_TRUE@LoadPlugin aggregation
 #@BUILD_PLUGIN_AMQP_TRUE@LoadPlugin amqp
+#@BUILD_PLUGIN_AMQP1_TRUE@LoadPlugin amqp1
 #@BUILD_PLUGIN_APACHE_TRUE@LoadPlugin apache
 #@BUILD_PLUGIN_APCUPS_TRUE@LoadPlugin apcups
 #@BUILD_PLUGIN_APPLE_SENSORS_TRUE@LoadPlugin apple_sensors
 #  </Publish>
 #</Plugin>
 
+#<Plugin amqp1>
+#  <Transport "name">
+#    Host "localhost"
+#    Port "5672"
+#    User "guest"
+#    Password "guest"
+#    Address "collectd"
+#    RetryDelay 1
+#    <Instance "log">
+#        Format JSON
+#        PreSettle false
+#    </Instance>
+#    <Instance "notify">
+#        Format JSON
+#        PreSettle true
+#    </Instance>
+#    <Instance "telemetry">
+#        Format JSON
+#        PreSettle false
+#    </Instance>
+#  </Transport>
+#</Plugin>
+
 #<Plugin apache>
 #  <Instance "local">
 #    URL "http://localhost/status?auto"
 #              RegisterType float
 #              Type gauge
 #              Instance "..."
+#              #Scale 1.0
+#              #Shift 0.0
 #      </Data>
 #
 #      <Host "name">
index 17bc680..7a21ba2 100644 (file)
@@ -530,9 +530,9 @@ are disabled by default.
 =head2 Plugin C<amqp>
 
 The I<AMQP plugin> can be used to communicate with other instances of
-I<collectd> or third party applications using an AMQP message broker. Values
-are sent to or received from the broker, which handles routing, queueing and
-possibly filtering out messages.
+I<collectd> or third party applications using an AMQP 0.9.1 message broker.
+Values are sent to or received from the broker, which handles routing,
+queueing and possibly filtering out messages.
 
 B<Synopsis:>
 
@@ -738,6 +738,171 @@ is preserved, i.e. passed through.
 
 =back
 
+=head2 Plugin C<amqp1>
+
+The I<AMQP1 plugin> can be used to communicate with other instances of
+I<collectd> or third party applications using an AMQP 1.0 message
+intermediary. Metric values or notifications are sent to the
+messaging intermediary which may handle direct messaging or
+queue based transfer.
+
+B<Synopsis:>
+
+ <Plugin "amqp1">
+   # Send values to an AMQP 1.0 intermediary
+  <Transport "name">
+    Host "localhost"
+    Port "5672"
+    User "guest"
+    Password "guest"
+    Address "collectd"
+#    RetryDelay 1
+    <Instance "some_name">
+        Format "command"
+        PreSettle false
+        Notify false
+ #      StoreRates false
+ #      GraphitePrefix "collectd."
+ #      GraphiteEscapeChar "_"
+ #      GraphiteSeparateInstances false
+ #      GraphiteAlwaysAppendDS false
+ #      GraphitePreserveSeparator false
+    </Instance>
+  </Transport>
+ </Plugin>
+
+The plugin's configuration consists of a I<Transport> that configures
+communications to the AMQP 1.0 messaging bus and one or more I<Instance>
+corresponding to metric or event publishers to the messaging system.
+
+The address in the I<Transport> block concatenated with the name given in the
+I<Instance> block starting tag will be used as the send-to address for
+communications over the messaging link.
+
+The following options are accepted within each I<Transport> block:
+
+=over 4
+
+=item B<Host> I<Host>
+
+Hostname or IP-address of the AMQP 1.0 intermediary. Defaults to the
+default behavior of the underlying communications library,
+I<libqpid-proton>, which is "localhost".
+
+=item B<Port> I<Port>
+
+Service name or port number on which the AMQP 1.0 intermediary accepts
+connections. This argument must be a string, even if the numeric form
+is used. Defaults to "5672".
+
+=item B<User> I<User>
+
+=item B<Password> I<Password>
+
+Credentials used to authenticate to the AMQP 1.0 intermediary. By
+default "guest"/"guest" is used.
+
+=item B<Address> I<Address>
+
+This option specifies the prefix for the send-to value in the message.
+By default, "collectd" will be used.
+
+=item B<RetryDelay> I<RetryDelay>
+
+When the AMQP1 connection is lost, defines the time in seconds to wait
+before attempting to reconnect. Defaults to 1, which implies attempt
+to reconnect at 1 second intervals.
+
+=back
+
+The following options are accepted within each I<Instance> block:
+
+=over 4
+
+=item B<Format> B<Command>|B<JSON>|B<Graphite>
+
+Selects the format in which messages are sent to the intermediary. If set to
+B<Command> (the default), values are sent as C<PUTVAL> commands which are
+identical to the syntax used by the I<Exec> and I<UnixSock plugins>. In this
+case, the C<Content-Type> header field will be set to C<text/collectd>.
+
+If set to B<JSON>, the values are encoded in the I<JavaScript Object Notation>,
+an easy and straight forward exchange format. The C<Content-Type> header field
+will be set to C<application/json>.
+
+If set to B<Graphite>, values are encoded in the I<Graphite> format, which is
+"<metric> <value> <timestamp>\n". The C<Content-Type> header field will be set to
+C<text/graphite>.
+
+A subscribing client I<should> use the C<Content-Type> header field to
+determine how to decode the values.
+
+=item B<PreSettle> B<true>|B<false>
+
+If set to B<false> (the default), the plugin will wait for a message
+acknowledgement from the messaging bus before sending the next
+message. This indicates transfer of ownership to the messaging
+system. If set to B<true>, the plugin will not wait for a message
+acknowledgement and the message may be dropped prior to transfer of
+ownership.
+
+=item B<Notify> B<true>|B<false>
+
+If set to B<false> (the default), the plugin will service the
+instance write call back as a value list. If set to B<true> the
+plugin will service the instance as a write notification callback
+for alert formatting.
+
+=item B<StoreRates> B<true>|B<false>
+
+Determines whether or not C<COUNTER>, C<DERIVE> and C<ABSOLUTE> data sources
+are converted to a I<rate> (i.e. a C<GAUGE> value). If set to B<false> (the
+default), no conversion is performed. Otherwise the conversion is performed
+using the internal value cache.
+
+Please note that currently this option is only used if the B<Format> option has
+been set to B<JSON>.
+
+=item B<GraphitePrefix>
+
+A prefix can be added in the metric name when outputting in the I<Graphite> format.
+It's added before the I<Host> name.
+Metric name will be "<prefix><host><postfix><plugin><type><name>"
+
+=item B<GraphitePostfix>
+
+A postfix can be added in the metric name when outputting in the I<Graphite> format.
+It's added after the I<Host> name.
+Metric name will be "<prefix><host><postfix><plugin><type><name>"
+
+=item B<GraphiteEscapeChar>
+
+Specify a character to replace dots (.) in the host part of the metric name.
+In I<Graphite> metric name, dots are used as separators between different
+metric parts (host, plugin, type).
+Default is "_" (I<Underscore>).
+
+=item B<GraphiteSeparateInstances> B<true>|B<false>
+
+If set to B<true>, the plugin instance and type instance will be in their own
+path component, for example C<host.cpu.0.cpu.idle>. If set to B<false> (the
+default), the plugin and plugin instance (and likewise the type and type
+instance) are put into one component, for example C<host.cpu-0.cpu-idle>.
+
+=item B<GraphiteAlwaysAppendDS> B<true>|B<false>
+
+If set to B<true>, append the name of the I<Data Source> (DS) to the "metric"
+identifier. If set to B<false> (the default), this is only done when there is
+more than one DS.
+
+=item B<GraphitePreserveSeparator> B<false>|B<true>
+
+If set to B<false> (the default) the C<.> (dot) character is replaced with
+I<GraphiteEscapeChar>. Otherwise, if set to B<true>, the C<.> (dot) character
+is preserved, i.e. passed through.
+
+=back
+
 =head2 Plugin C<apache>
 
 To configure the C<apache>-plugin you first need to configure the Apache
@@ -4077,8 +4242,9 @@ which the sizes of physical memory vary.
 
 The B<modbus plugin> connects to a Modbus "slave" via Modbus/TCP or Modbus/RTU and
 reads register values. It supports reading single registers (unsigned 16E<nbsp>bit
-values), large integer values (unsigned 32E<nbsp>bit values) and floating point
-values (two registers interpreted as IEEE floats in big endian notation).
+values), large integer values (unsigned 32E<nbsp>bit and 64E<nbsp>bit values) and
+floating point values (two registers interpreted as IEEE floats in big endian
+notation).
 
 B<Synopsis:>
 
@@ -4088,6 +4254,8 @@ B<Synopsis:>
    RegisterCmd ReadHolding
    Type voltage
    Instance "input-1"
+   #Scale 1.0
+   #Shift 0.0
  </Data>
 
  <Data "voltage-input-2">
@@ -4146,7 +4314,7 @@ Configures the base register to read from the device. If the option
 B<RegisterType> has been set to B<Uint32> or B<Float>, this and the next
 register will be read (the register number is increased by one).
 
-=item B<RegisterType> B<Int16>|B<Int32>|B<Uint16>|B<Uint32>|B<Float>|B<Int32LE>|B<Uint32LE>|B<FloatLE>
+=item B<RegisterType> B<Int16>|B<Int32>|B<Int64>|B<Uint16>|B<Uint32>|B<UInt64>|B<Float>|B<Int32LE>|B<Uint32LE>|B<FloatLE>
 
 Specifies what kind of data is returned by the device. This defaults to
 B<Uint16>.  If the type is B<Int32>, B<Int32LE>, B<Uint32>, B<Uint32LE>,
@@ -4158,7 +4326,10 @@ significant 16E<nbsp>bits are in the register at B<RegisterBase+1>.
 For B<Int32LE>, B<Uint32LE>, or B<Float32LE>, the high and low order
 registers are swapped with the most significant 16E<nbsp>bits in
 the B<RegisterBase+1> and the least significant 16E<nbsp>bits in
-B<RegisterBase>.
+B<RegisterBase>. If the type is B<Int64> or B<UInt64>, four 16E<nbsp>bit
+registers at B<RegisterBase>, B<RegisterBase+1>, B<RegisterBase+2> and
+B<RegisterBase+3> will be read and the data combined into one
+64E<nbsp>value.
 
 =item B<RegisterCmd> B<ReadHolding>|B<ReadInput>
 
@@ -4173,9 +4344,19 @@ supported.
 
 =item B<Instance> I<Instance>
 
-Sets the type instance to use when dispatching the value to I<collectd>. If
+Sets the type instance to use when dispatching the value to I<Instance>. If
 unset, an empty string (no type instance) is used.
 
+=item B<Scale> I<Value>
+
+The values taken from device are multiplied by I<Value>. The field is optional
+and the default is B<1.0>.
+
+=item B<Shift> I<Value>
+
+I<Value> is added to values from device after they have been multiplied by
+B<Scale> value. The field is optional and the default value is B<0.0>.
+
 =back
 
 =item E<lt>B<Host> I<Name>E<gt> blocks
index efcf6be..bb9eaa0 100644 (file)
@@ -80,6 +80,8 @@ enum mb_register_type_e /* {{{ */
   REG_TYPE_UINT16,
   REG_TYPE_UINT32,
   REG_TYPE_UINT32_CDAB,
+  REG_TYPE_INT64,
+  REG_TYPE_UINT64,
   REG_TYPE_FLOAT,
   REG_TYPE_FLOAT_CDAB }; /* }}} */
 
@@ -105,6 +107,8 @@ struct mb_data_s /* {{{ */
   mb_mreg_type_t modbus_register_type;
   char type[DATA_MAX_NAME_LEN];
   char instance[DATA_MAX_NAME_LEN];
+  double scale;
+  double shift;
 
   mb_data_t *next;
 }; /* }}} */
@@ -392,21 +396,21 @@ static int mb_init_connection(mb_host_t *host) /* {{{ */
 } /* }}} int mb_init_connection */
 #endif /* !LEGACY_LIBMODBUS */
 
-#define CAST_TO_VALUE_T(ds, vt, raw)                                           \
+#define CAST_TO_VALUE_T(ds, vt, raw, scale, shift)                             \
   do {                                                                         \
     if ((ds)->ds[0].type == DS_TYPE_COUNTER)                                   \
-      (vt).counter = (counter_t)(raw);                                         \
+      (vt).counter = (((counter_t)(raw)*scale) + shift);                       \
     else if ((ds)->ds[0].type == DS_TYPE_GAUGE)                                \
-      (vt).gauge = (gauge_t)(raw);                                             \
+      (vt).gauge = (((gauge_t)(raw)*scale) + shift);                           \
     else if ((ds)->ds[0].type == DS_TYPE_DERIVE)                               \
-      (vt).derive = (derive_t)(raw);                                           \
+      (vt).derive = (((derive_t)(raw)*scale) + shift);                         \
     else /* if (ds->ds[0].type == DS_TYPE_ABSOLUTE) */                         \
-      (vt).absolute = (absolute_t)(raw);                                       \
+      (vt).absolute = (((absolute_t)(raw)*scale) + shift);                     \
   } while (0)
 
 static int mb_read_data(mb_host_t *host, mb_slave_t *slave, /* {{{ */
                         mb_data_t *data) {
-  uint16_t values[2] = {0};
+  uint16_t values[4] = {0};
   int values_num;
   const data_set_t *ds;
   int status = 0;
@@ -431,11 +435,13 @@ static int mb_read_data(mb_host_t *host, mb_slave_t *slave, /* {{{ */
       (data->register_type != REG_TYPE_INT32) &&
       (data->register_type != REG_TYPE_INT32_CDAB) &&
       (data->register_type != REG_TYPE_UINT32) &&
-      (data->register_type != REG_TYPE_UINT32_CDAB)) {
+      (data->register_type != REG_TYPE_UINT32_CDAB) &&
+      (data->register_type != REG_TYPE_INT64) &&
+      (data->register_type != REG_TYPE_UINT64)) {
     NOTICE(
         "Modbus plugin: The data source of type \"%s\" is %s, not gauge. "
         "This will most likely result in problems, because the register type "
-        "is not UINT32.",
+        "is not UINT32 or UINT64.",
         data->type, DS_TYPE_TO_STRING(ds->ds[0].type));
   }
 
@@ -446,6 +452,9 @@ static int mb_read_data(mb_host_t *host, mb_slave_t *slave, /* {{{ */
       (data->register_type == REG_TYPE_FLOAT) ||
       (data->register_type == REG_TYPE_FLOAT_CDAB))
     values_num = 2;
+  else if ((data->register_type == REG_TYPE_INT64) ||
+           (data->register_type == REG_TYPE_UINT64))
+    values_num = 4;
   else
     values_num = 1;
 
@@ -530,7 +539,7 @@ static int mb_read_data(mb_host_t *host, mb_slave_t *slave, /* {{{ */
           "Returned float value is %g",
           (double)float_value);
 
-    CAST_TO_VALUE_T(ds, vt, float_value);
+    CAST_TO_VALUE_T(ds, vt, float_value, data->scale, data->shift);
     mb_submit(host, slave, data, vt);
   } else if (data->register_type == REG_TYPE_FLOAT_CDAB) {
     float float_value;
@@ -541,7 +550,7 @@ static int mb_read_data(mb_host_t *host, mb_slave_t *slave, /* {{{ */
           "Returned float value is %g",
           (double)float_value);
 
-    CAST_TO_VALUE_T(ds, vt, float_value);
+    CAST_TO_VALUE_T(ds, vt, float_value, data->scale, data->shift);
     mb_submit(host, slave, data, vt);
   } else if (data->register_type == REG_TYPE_INT32) {
     union {
@@ -555,7 +564,7 @@ static int mb_read_data(mb_host_t *host, mb_slave_t *slave, /* {{{ */
           "Returned int32 value is %" PRIi32,
           v.i32);
 
-    CAST_TO_VALUE_T(ds, vt, v.i32);
+    CAST_TO_VALUE_T(ds, vt, v.i32, data->scale, data->shift);
     mb_submit(host, slave, data, vt);
   } else if (data->register_type == REG_TYPE_INT32_CDAB) {
     union {
@@ -569,7 +578,7 @@ static int mb_read_data(mb_host_t *host, mb_slave_t *slave, /* {{{ */
           "Returned int32 value is %" PRIi32,
           v.i32);
 
-    CAST_TO_VALUE_T(ds, vt, v.i32);
+    CAST_TO_VALUE_T(ds, vt, v.i32, data->scale, data->shift);
     mb_submit(host, slave, data, vt);
   } else if (data->register_type == REG_TYPE_INT16) {
     union {
@@ -584,7 +593,7 @@ static int mb_read_data(mb_host_t *host, mb_slave_t *slave, /* {{{ */
           "Returned int16 value is %" PRIi16,
           v.i16);
 
-    CAST_TO_VALUE_T(ds, vt, v.i16);
+    CAST_TO_VALUE_T(ds, vt, v.i16, data->scale, data->shift);
     mb_submit(host, slave, data, vt);
   } else if (data->register_type == REG_TYPE_UINT32) {
     uint32_t v32;
@@ -595,7 +604,7 @@ static int mb_read_data(mb_host_t *host, mb_slave_t *slave, /* {{{ */
           "Returned uint32 value is %" PRIu32,
           v32);
 
-    CAST_TO_VALUE_T(ds, vt, v32);
+    CAST_TO_VALUE_T(ds, vt, v32, data->scale, data->shift);
     mb_submit(host, slave, data, vt);
   } else if (data->register_type == REG_TYPE_UINT32_CDAB) {
     uint32_t v32;
@@ -606,7 +615,34 @@ static int mb_read_data(mb_host_t *host, mb_slave_t *slave, /* {{{ */
           "Returned uint32 value is %" PRIu32,
           v32);
 
-    CAST_TO_VALUE_T(ds, vt, v32);
+    CAST_TO_VALUE_T(ds, vt, v32, data->scale, data->shift);
+    mb_submit(host, slave, data, vt);
+  } else if (data->register_type == REG_TYPE_UINT64) {
+    uint64_t v64;
+    value_t vt;
+
+    v64 = (((uint64_t)values[0]) << 48) | (((uint64_t)values[1]) << 32) |
+          (((uint64_t)values[2]) << 16) | (((uint64_t)values[3]));
+    DEBUG("Modbus plugin: mb_read_data: "
+          "Returned uint64 value is %" PRIu64,
+          v64);
+
+    CAST_TO_VALUE_T(ds, vt, v64, data->scale, data->shift);
+    mb_submit(host, slave, data, vt);
+  } else if (data->register_type == REG_TYPE_INT64) {
+    union {
+      uint64_t u64;
+      int64_t i64;
+    } v;
+    value_t vt;
+
+    v.u64 = (((uint64_t)values[0]) << 48) | (((uint64_t)values[1]) << 32) |
+            (((uint64_t)values[2]) << 16) | ((uint64_t)values[3]);
+    DEBUG("Modbus plugin: mb_read_data: "
+          "Returned uint64 value is %" PRIi64,
+          v.i64);
+
+    CAST_TO_VALUE_T(ds, vt, v.i64, data->scale, data->shift);
     mb_submit(host, slave, data, vt);
   } else /* if (data->register_type == REG_TYPE_UINT16) */
   {
@@ -616,7 +652,7 @@ static int mb_read_data(mb_host_t *host, mb_slave_t *slave, /* {{{ */
           "Returned uint16 value is %" PRIu16,
           values[0]);
 
-    CAST_TO_VALUE_T(ds, vt, values[0]);
+    CAST_TO_VALUE_T(ds, vt, values[0], data->scale, data->shift);
     mb_submit(host, slave, data, vt);
   }
 
@@ -723,6 +759,8 @@ static int mb_config_add_data(oconfig_item_t *ci) /* {{{ */
   data.name = NULL;
   data.register_type = REG_TYPE_UINT16;
   data.next = NULL;
+  data.scale = 1;
+  data.shift = 0;
 
   status = cf_util_get_string(ci, &data.name);
   if (status != 0)
@@ -736,6 +774,10 @@ static int mb_config_add_data(oconfig_item_t *ci) /* {{{ */
     else if (strcasecmp("Instance", child->key) == 0)
       status = cf_util_get_string_buffer(child, data.instance,
                                          sizeof(data.instance));
+    else if (strcasecmp("Scale", child->key) == 0)
+      status = cf_util_get_double(child, &data.scale);
+    else if (strcasecmp("Shift", child->key) == 0)
+      status = cf_util_get_double(child, &data.shift);
     else if (strcasecmp("RegisterBase", child->key) == 0)
       status = cf_util_get_int(child, &data.register_base);
     else if (strcasecmp("RegisterType", child->key) == 0) {
@@ -759,6 +801,10 @@ static int mb_config_add_data(oconfig_item_t *ci) /* {{{ */
         data.register_type = REG_TYPE_FLOAT;
       else if (strcasecmp("FloatLE", tmp) == 0)
         data.register_type = REG_TYPE_FLOAT_CDAB;
+      else if (strcasecmp("Uint64", tmp) == 0)
+        data.register_type = REG_TYPE_UINT64;
+      else if (strcasecmp("Int64", tmp) == 0)
+        data.register_type = REG_TYPE_INT64;
       else {
         ERROR("Modbus plugin: The register type \"%s\" is unknown.", tmp);
         status = -1;
index 28ee337..444e8ea 100644 (file)
@@ -352,9 +352,8 @@ static int statsd_handle_set(char const *name, /* {{{ */
   status = c_avl_insert(metric->set, set_key, /* value = */ NULL);
   if (status < 0) {
     pthread_mutex_unlock(&metrics_lock);
-    if (status < 0)
-      ERROR("statsd plugin: c_avl_insert (\"%s\") failed with status %i.",
-            set_key, status);
+    ERROR("statsd plugin: c_avl_insert (\"%s\") failed with status %i.",
+          set_key, status);
     sfree(set_key);
     return -1;
   } else if (status > 0) /* key already exists */
diff --git a/src/utils_deq.h b/src/utils_deq.h
new file mode 100644 (file)
index 0000000..3182baa
--- /dev/null
@@ -0,0 +1,214 @@
+/**
+ * collectd - src/utils_deq.h
+ * Copyright(c) 2017 Red Hat Inc.
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a
+ * copy of this software and associated documentation files (the "Software"),
+ * to deal in the Software without restriction, including without limitation
+ * the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ * and/or sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ *
+ * Authors:
+ *   Andy Smith <ansmith@redhat.com>
+ */
+
+#ifndef utils_deq_h
+#define utils_deq_h 1
+
+#include <assert.h>
+#include <memory.h>
+#include <stdlib.h>
+
+#define CT_ASSERT(exp)                                                         \
+  { assert(exp); }
+
+#define NEW(t) (t *)malloc(sizeof(t))
+#define NEW_ARRAY(t, n) (t *)malloc(sizeof(t) * (n))
+#define NEW_PTR_ARRAY(t, n) (t **)malloc(sizeof(t *) * (n))
+
+#define ZERO(p) memset(p, 0, sizeof(*p))
+
+#define DEQ_DECLARE(i, d)                                                      \
+  typedef struct {                                                             \
+    i *head;                                                                   \
+    i *tail;                                                                   \
+    i *scratch;                                                                \
+    size_t size;                                                               \
+  } d
+
+#define DEQ_LINKS_N(n, t)                                                      \
+  t *prev##n;                                                                  \
+  t *next##n
+#define DEQ_LINKS(t) DEQ_LINKS_N(, t)
+#define DEQ_EMPTY                                                              \
+  { 0, 0, 0, 0 }
+
+#define DEQ_INIT(d)                                                            \
+  do {                                                                         \
+    (d).head = 0;                                                              \
+    (d).tail = 0;                                                              \
+    (d).scratch = 0;                                                           \
+    (d).size = 0;                                                              \
+  } while (0)
+#define DEQ_IS_EMPTY(d) ((d).head == 0)
+#define DEQ_ITEM_INIT_N(n, i)                                                  \
+  do {                                                                         \
+    (i)->next##n = 0;                                                          \
+    (i)->prev##n = 0;                                                          \
+  } while (0)
+#define DEQ_ITEM_INIT(i) DEQ_ITEM_INIT_N(, i)
+#define DEQ_HEAD(d) ((d).head)
+#define DEQ_TAIL(d) ((d).tail)
+#define DEQ_SIZE(d) ((d).size)
+#define DEQ_NEXT_N(n, i) (i)->next##n
+#define DEQ_NEXT(i) DEQ_NEXT_N(, i)
+#define DEQ_PREV_N(n, i) (i)->prev##n
+#define DEQ_PREV(i) DEQ_PREV_N(, i)
+#define DEQ_MOVE(d1, d2)                                                       \
+  do {                                                                         \
+    d2 = d1;                                                                   \
+    DEQ_INIT(d1);                                                              \
+  } while (0)
+/**
+ *@pre ptr points to first element of deq
+ *@post ptr points to first element of deq that passes test, or 0. Test should
+ *involve ptr.
+ */
+#define DEQ_FIND_N(n, ptr, test)                                               \
+  while ((ptr) && !(test))                                                     \
+    ptr = DEQ_NEXT_N(n, ptr);
+#define DEQ_FIND(ptr, test) DEQ_FIND_N(, ptr, test)
+
+#define DEQ_INSERT_HEAD_N(n, d, i)                                             \
+  do {                                                                         \
+    CT_ASSERT((i)->next##n == 0);                                              \
+    CT_ASSERT((i)->prev##n == 0);                                              \
+    if ((d).head) {                                                            \
+      (i)->next##n = (d).head;                                                 \
+      (d).head->prev##n = i;                                                   \
+    } else {                                                                   \
+      (d).tail = i;                                                            \
+      (i)->next##n = 0;                                                        \
+      CT_ASSERT((d).size == 0);                                                \
+    }                                                                          \
+    (i)->prev##n = 0;                                                          \
+    (d).head = i;                                                              \
+    (d).size++;                                                                \
+  } while (0)
+#define DEQ_INSERT_HEAD(d, i) DEQ_INSERT_HEAD_N(, d, i)
+
+#define DEQ_INSERT_TAIL_N(n, d, i)                                             \
+  do {                                                                         \
+    CT_ASSERT((i)->next##n == 0);                                              \
+    CT_ASSERT((i)->prev##n == 0);                                              \
+    if ((d).tail) {                                                            \
+      (i)->prev##n = (d).tail;                                                 \
+      (d).tail->next##n = i;                                                   \
+    } else {                                                                   \
+      (d).head = i;                                                            \
+      (i)->prev##n = 0;                                                        \
+      CT_ASSERT((d).size == 0);                                                \
+    }                                                                          \
+    (i)->next##n = 0;                                                          \
+    (d).tail = i;                                                              \
+    (d).size++;                                                                \
+  } while (0)
+#define DEQ_INSERT_TAIL(d, i) DEQ_INSERT_TAIL_N(, d, i)
+
+#define DEQ_REMOVE_HEAD_N(n, d)                                                \
+  do {                                                                         \
+    CT_ASSERT((d).head);                                                       \
+    if ((d).head) {                                                            \
+      (d).scratch = (d).head;                                                  \
+      (d).head = (d).head->next##n;                                            \
+      if ((d).head == 0) {                                                     \
+        (d).tail = 0;                                                          \
+        CT_ASSERT((d).size == 1);                                              \
+      } else                                                                   \
+        (d).head->prev##n = 0;                                                 \
+      (d).size--;                                                              \
+      (d).scratch->next##n = 0;                                                \
+      (d).scratch->prev##n = 0;                                                \
+    }                                                                          \
+  } while (0)
+#define DEQ_REMOVE_HEAD(d) DEQ_REMOVE_HEAD_N(, d)
+
+#define DEQ_REMOVE_TAIL_N(n, d)                                                \
+  do {                                                                         \
+    CT_ASSERT((d).tail);                                                       \
+    if ((d).tail) {                                                            \
+      (d).scratch = (d).tail;                                                  \
+      (d).tail = (d).tail->prev##n;                                            \
+      if ((d).tail == 0) {                                                     \
+        (d).head = 0;                                                          \
+        CT_ASSERT((d).size == 1);                                              \
+      } else                                                                   \
+        (d).tail->next##n = 0;                                                 \
+      (d).size--;                                                              \
+      (d).scratch->next##n = 0;                                                \
+      (d).scratch->prev##n = 0;                                                \
+    }                                                                          \
+  } while (0)
+#define DEQ_REMOVE_TAIL(d) DEQ_REMOVE_TAIL_N(, d)
+
+#define DEQ_INSERT_AFTER_N(n, d, i, a)                                         \
+  do {                                                                         \
+    CT_ASSERT((i)->next##n == 0);                                              \
+    CT_ASSERT((i)->prev##n == 0);                                              \
+    CT_ASSERT(a);                                                              \
+    if ((a)->next##n)                                                          \
+      (a)->next##n->prev##n = (i);                                             \
+    else                                                                       \
+      (d).tail = (i);                                                          \
+    (i)->next##n = (a)->next##n;                                               \
+    (i)->prev##n = (a);                                                        \
+    (a)->next##n = (i);                                                        \
+    (d).size++;                                                                \
+  } while (0)
+#define DEQ_INSERT_AFTER(d, i, a) DEQ_INSERT_AFTER_N(, d, i, a)
+
+#define DEQ_REMOVE_N(n, d, i)                                                  \
+  do {                                                                         \
+    if ((i)->next##n)                                                          \
+      (i)->next##n->prev##n = (i)->prev##n;                                    \
+    else                                                                       \
+      (d).tail = (i)->prev##n;                                                 \
+    if ((i)->prev##n)                                                          \
+      (i)->prev##n->next##n = (i)->next##n;                                    \
+    else                                                                       \
+      (d).head = (i)->next##n;                                                 \
+    CT_ASSERT((d).size > 0);                                                   \
+    (d).size--;                                                                \
+    (i)->next##n = 0;                                                          \
+    (i)->prev##n = 0;                                                          \
+    CT_ASSERT((d).size || (!(d).head && !(d).tail));                           \
+  } while (0)
+#define DEQ_REMOVE(d, i) DEQ_REMOVE_N(, d, i)
+
+#define DEQ_APPEND_N(n, d1, d2)                                                \
+  do {                                                                         \
+    if (!(d1).head)                                                            \
+      (d1) = (d2);                                                             \
+    else if ((d2).head) {                                                      \
+      (d1).tail->next##n = (d2).head;                                          \
+      (d2).head->prev##n = (d1).tail;                                          \
+      (d1).tail = (d2).tail;                                                   \
+      (d1).size += (d2).size;                                                  \
+    }                                                                          \
+    DEQ_INIT(d2);                                                              \
+  } while (0)
+#define DEQ_APPEND(d1, d2) DEQ_APPEND_N(, d1, d2)
+
+#endif
index 3785385..9f87d2c 100644 (file)
@@ -1898,7 +1898,9 @@ static int virt_notif_thread_init(virt_notif_thread_t *thread_data) {
    * domain_event_cb_id to '-1'
    */
   thread_data->domain_event_cb_id = -1;
-  thread_data->is_active = 0;
+  pthread_mutex_lock(&thread_data->active_mutex);
+  thread_data->is_active = false;
+  pthread_mutex_unlock(&thread_data->active_mutex);
 
   return 0;
 }