Merge pull request #2157 from maryamtahhan/dpdkevents_upstream
authorRuben Kerkhof <ruben@rubenkerkhof.com>
Fri, 24 Feb 2017 16:19:21 +0000 (17:19 +0100)
committerGitHub <noreply@github.com>
Fri, 24 Feb 2017 16:19:21 +0000 (17:19 +0100)
Plugin for getting DPDK ports link status and keep alive events.

22 files changed:
AUTHORS
ChangeLog
bindings/java/org/collectd/java/GenericJMXConfValue.java
configure.ac
contrib/redhat/collectd.spec
contrib/systemd.collectd.service
src/collectd.conf.in
src/collectd.conf.pod
src/daemon/collectd.c
src/daemon/plugin.c
src/dpdkstat.c
src/intel_rdt.c
src/log_logstash.c
src/logfile.c
src/mqtt.c
src/nut.c
src/ovs_events.c
src/utils_dpdk.c
src/utils_dpdk.h
src/virt.c
src/write_riemann_threshold.c
version-gen.sh

diff --git a/AUTHORS b/AUTHORS
index 8962e77..d866c70 100644 (file)
--- a/AUTHORS
+++ b/AUTHORS
@@ -294,6 +294,9 @@ Sjoerd van der Berg <harekiet at gmail.com>
 Stefan Hacker <stefan.hacker at web.de>
  - teamspeak2 plugin.
 
+Steven Bell <stv.bell07 at gmail.com>
+ - nut plugin.
+
 Sven Trenkel <collectd at semidefinite.de>
  - netapp plugin.
  - python plugin.
index db0b062..9f1b639 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,22 @@
+2017-01-23, Version 5.7.1
+       * collectd: Handling of boolean configuration options has been unified.
+         Thanks to Sebastian Harl. #2083, #2098
+       * collectd: Reporting of internal statistics has been fixed. Thanks to
+         Florian Forster. #2108
+       * collectd, various plugins: Bugs and issues reported by scan-build and
+         coverity-scan have been fixed. Thanks to Ruben Kerkhof and Florian
+         Forster.
+       * Build system: Parallel build have been fixed. Thanks to Ruben Kerkhof.
+         #2110
+       * DPDKStat plugin: Portability issues and a double-close bug have been
+         fixed. Thanks to Ruben Kerkhof and Marc Fournier.
+       * Intel RDT plugin: A check for the libpqos library version has been
+         added. Thanks to Serhiy Pshyk.
+       * NetApp plugin: Compilation problems have been corrected. Thanks to
+         Florian Forster. #2120
+       * Write Prometheus plugin: A memory leak has been fixed. Thanks to Ruben
+         Kerkhof.
+
 2016-12-12, Version 5.7.0
        * Documentation: The Turbostat plugin section has been improved. Thanks
          to Florian Forster
index 63b7628..6d3d688 100644 (file)
@@ -30,6 +30,8 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Collection;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.Iterator;
 import java.util.ArrayList;
 
@@ -128,6 +130,14 @@ class GenericJMXConfValue
     {
       return (BigInteger.ZERO.add ((BigInteger) obj));
     }
+    else if (obj instanceof AtomicInteger)
+    {
+        return (new Integer(((AtomicInteger) obj).get()));
+    }
+    else if (obj instanceof AtomicLong)
+    {
+        return (new Long(((AtomicLong) obj).get()));
+    }
 
     return (null);
   } /* }}} Number genericObjectToNumber */
index 0c9368d..0154d82 100644 (file)
@@ -5202,6 +5202,11 @@ if test "x$with_libupsclient" = "xyes"; then
     [with_libupsclient="no (symbol upscli_connect not found)"]
   )
 
+  AC_CHECK_LIB([upsclient], [upscli_init],
+    [AC_DEFINE([WITH_UPSCLIENT_27], [1], [At least version 2-7])],
+    []
+  )
+
   LDFLAGS="$SAVE_LDFLAGS"
 fi
 
index 350c357..451589d 100644 (file)
 
 Summary:       Statistics collection and monitoring daemon
 Name:          collectd
-Version:       5.7.0
-Release:       4%{?dist}
+Version:       5.7.1
+Release:       2%{?dist}
 URL:           https://collectd.org
 Source:                https://collectd.org/files/%{name}-%{version}.tar.bz2
 License:       GPLv2
 Group:         System Environment/Daemons
 BuildRoot:     %{_tmppath}/%{name}-%{version}-root
-BuildRequires: libgcrypt-devel, kernel-headers, libcap-devel, which
+BuildRequires: libgcrypt-devel, kernel-headers, libcap-devel, which, xfsprogs-devel
 Vendor:                collectd development team <collectd@verplant.org>
 
 %if 0%{?fedora} || 0%{?rhel} >= 7
@@ -2632,6 +2632,13 @@ fi
 %doc contrib/
 
 %changelog
+* Wed Feb 22 2017 Ruben Kerkhof <ruben@rubenkerkhof.com> - 5.7.1-2
+- Enable XFS support in df plugin
+- Fix bogus date in changelog
+
+* Sun Jan 01 2017 Marc Fournier <marc.fournier@camptocamp.com> - 5.7.1-1
+- New upstream version
+
 * Sat Dec 31 2016 Ruben Kerkhof <ruben@rubenkerkhof.com> - 5.7.0-4
 - Add new ovs_events plugin
 
index d0f1bde..8a1fc27 100644 (file)
@@ -18,6 +18,7 @@ ProtectHome=true
 #   ceph            CAP_DAC_OVERRIDE
 #   dns             CAP_NET_RAW
 #   exec            CAP_SETUID CAP_SETGID
+#   intel_rdt       CAP_SYS_RAWIO
 #   iptables        CAP_NET_ADMIN
 #   ping            CAP_NET_RAW
 #   turbostat       CAP_SYS_RAWIO
index 4ee8001..dc7d6eb 100644 (file)
 #    ProcessType "secondary"
 #    FilePrefix "rte"
 #  </EAL>
+#  SharedMemObj "dpdk_collectd_stats_0"
 #  EnabledPortMask 0xffff
 #  PortName "interface1"
 #  PortName "interface2"
 
 #<Plugin nut>
 #      UPS "upsname@hostname:port"
+#      ForceSSL true
+#      VerifyPeer true
+#      CAPath "/path/to/folder"
 #</Plugin>
 
 #<Plugin olsrd>
 #  Socket "/var/run/openvswitch/db.sock"
 #  Interfaces "br0" "veth0"
 #  SendNotification false
+#  DispatchValues true
 #</Plugin>
 
 #<Plugin perl>
 #      InterfaceFormat name
 #      PluginInstanceFormat name
 #      Instances 1
+#      ExtraStats "disk pcpu"
 #</Plugin>
 
 #<Plugin vmem>
index a222da7..6d927d2 100644 (file)
@@ -2520,6 +2520,7 @@ B<Synopsis:>
      FilePrefix "rte"
      SocketMemory "1024"
    </EAL>
+   SharedMemObj "dpdk_collectd_stats_0"
    EnabledPortMask 0xffff
    PortName "interface1"
    PortName "interface2"
@@ -2556,7 +2557,12 @@ sockets in MB. This is an optional value.
 
 =back
 
-=over 4
+=over 3
+
+=item B<SharedMemObj> I<Mask>
+A string containing the name of the shared memory object that should be used to
+share stats from the DPDK secondary process to the collectd dpdkstat plugin.
+Defaults to dpdk_collectd_stats if no other value is configured.
 
 =item B<EnabledPortMask> I<Mask>
 
@@ -3080,6 +3086,10 @@ allows to monitor instructions per clock (IPC).
 Monitor events are hardware dependant. Monitoring capabilities are detected on
 plugin initialization and only supported events are monitored.
 
+B<Note:> I<intel_rdt> plugin is using model-specific registers (MSRs), which
+require an additional capability to be enabled if collectd is run as a service.
+Please refer to I<contrib/systemd.collectd.service> file for more details.
+
 B<Synopsis:>
 
   <Plugin "intel_rdt">
@@ -5239,6 +5249,35 @@ making it through.
 Add a UPS to collect data from. The format is identical to the one accepted by
 L<upsc(8)>.
 
+=item B<ForceSSL> B<true>|B<false>
+
+Stops connections from falling back to unsecured if an SSL connection
+cannot be established. Defaults to false if undeclared.
+
+=item B<VerifyPeer> I<true>|I<false>
+
+If set to true, requires a CAPath be provided. Will use the CAPath to find
+certificates to use as Trusted Certificates to validate a upsd server certificate.
+If validation of the upsd server certificate fails, the connection will not be
+established. If ForceSSL is undeclared or set to false, setting VerifyPeer to true
+will override and set ForceSSL to true.
+
+=item B<CAPath> I/path/to/certs/folder
+
+If VerifyPeer is set to true, this is required. Otherwise this is ignored.
+The folder pointed at must contain certificate(s) named according to their hash.
+Ex: XXXXXXXX.Y where X is the hash value of a cert and Y is 0. If name collisions
+occur because two different certs have the same hash value, Y can be  incremented
+in order to avoid conflict. To create a symbolic link to a certificate the following
+command can be used from within the directory where the cert resides:
+
+C<ln -s some.crt ./$(openssl x509 -hash -noout -in some.crt).0>
+
+Alternatively, the package openssl-perl provides a command C<c_rehash> that will
+generate links like the one described above for ALL certs in a given folder.
+Example usage:
+C<c_rehash /path/to/certs/folder>
+
 =back
 
 =head2 Plugin C<olsrd>
@@ -5599,6 +5638,7 @@ B<Synopsis:>
    Socket "/var/run/openvswitch/db.sock"
    Interfaces "br0" "veth0"
    SendNotification false
+   DispatchValues true
  </Plugin>
 
 The plugin provides the following configuration options:
@@ -5638,6 +5678,12 @@ Default: empty (all interfaces on all bridges are monitored)
 If set to true, OVS link notifications (interface status and OVS DB connection
 terminate) are sent to collectd. Default value is false.
 
+=item B<DispatchValues> I<true|false>
+
+Dispatch the OVS DB interface link status value with configured plugin interval.
+Defaults to true. Please note, if B<SendNotification> and B<DispatchValues>
+options are false, no OVS information will be provided by the plugin.
+
 =back
 
 B<Note:> By default, the global interval setting is used within which to
@@ -8194,6 +8240,17 @@ How many read instances you want to use for this plugin. The default is one,
 and the sensible setting is a multiple of the B<ReadThreads> value.
 If you are not sure, just use the default setting.
 
+=item B<ExtraStats> B<string>
+
+Report additional extra statistics. The default is no extra statistics, preserving
+the previous behaviour of the plugin. If unsure, leave the default. If enabled,
+allows the plugin to reported more detailed statistics about the behaviour of
+Virtual Machines. The argument is a space-separated list of selectors.
+Currently supported selectors are:
+B<disk> report extra statistics like number of flush operations and total
+service time for read, write and flush operations.
+B<pcpu> report the physical user/system cpu time consumed by the hypervisor, per-vm.
+
 =back
 
 =head2 Plugin C<vmem>
index f3df795..6b7c413 100644 (file)
@@ -521,7 +521,7 @@ int main(int argc, char **argv) {
    */
   if (cf_read(configfile)) {
     fprintf(stderr, "Error: Reading the config file failed!\n"
-                    "Read the syslog for details.\n");
+                    "Read the logs for details.\n");
     return (1);
   }
 
index c8fc15a..b37e9b3 100644 (file)
@@ -153,14 +153,13 @@ static const char *plugin_get_dir(void) {
     return (plugindir);
 }
 
-static void plugin_update_internal_statistics(void) { /* {{{ */
-
+static int plugin_update_internal_statistics(void) { /* {{{ */
   gauge_t copy_write_queue_length = (gauge_t)write_queue_length;
 
   /* Initialize `vl' */
   value_list_t vl = VALUE_LIST_INIT;
-  sstrncpy(vl.host, hostname_g, sizeof(vl.host));
   sstrncpy(vl.plugin, "collectd", sizeof(vl.plugin));
+  vl.interval = plugin_get_interval();
 
   /* Write queue */
   sstrncpy(vl.plugin_instance, "write_queue", sizeof(vl.plugin_instance));
@@ -189,8 +188,8 @@ static void plugin_update_internal_statistics(void) { /* {{{ */
   vl.type_instance[0] = 0;
   plugin_dispatch_values(&vl);
 
-  return;
-} /* }}} void plugin_update_internal_statistics */
+  return 0;
+} /* }}} int plugin_update_internal_statistics */
 
 static void destroy_callback(callback_func_t *cf) /* {{{ */
 {
@@ -1558,8 +1557,10 @@ int plugin_init_all(void) {
   /* Init the value cache */
   uc_init();
 
-  if (IS_TRUE(global_option_get("CollectInternalStats")))
+  if (IS_TRUE(global_option_get("CollectInternalStats"))) {
     record_statistics = 1;
+    plugin_register_read("collectd", plugin_update_internal_statistics);
+  }
 
   chain_name = global_option_get("PreCacheChain");
   pre_cache_chain = fc_chain_get_by_name(chain_name);
@@ -1647,9 +1648,6 @@ int plugin_init_all(void) {
 
 /* TODO: Rename this function. */
 void plugin_read_all(void) {
-  if (record_statistics) {
-    plugin_update_internal_statistics();
-  }
   uc_check_timeout();
 
   return;
index e335072..6b057f2 100644 (file)
@@ -94,12 +94,12 @@ typedef struct dpdk_stats_ctx_s dpdk_stats_ctx_t;
 #define DPDK_STATS_CTX_GET(a) ((dpdk_stats_ctx_t *)dpdk_helper_priv_get(a))
 
 dpdk_helper_ctx_t *g_hc = NULL;
-
+static char g_shm_name[DATA_MAX_NAME_LEN] = DPDK_STATS_NAME;
+static int dpdk_stats_reinit_helper();
 static void dpdk_stats_default_config(void) {
   dpdk_stats_ctx_t *ec = DPDK_STATS_CTX_GET(g_hc);
 
   ec->config.interval = plugin_get_interval();
-
   for (int i = 0; i < RTE_MAX_ETHPORTS; i++) {
     ec->config.port_name[i][0] = 0;
   }
@@ -114,16 +114,15 @@ static int dpdk_stats_preinit(void) {
     return 0;
   }
 
-  int ret = dpdk_helper_init(DPDK_STATS_NAME, sizeof(dpdk_stats_ctx_t), &g_hc);
+  int ret = dpdk_helper_init(g_shm_name, sizeof(dpdk_stats_ctx_t), &g_hc);
   if (ret != 0) {
     char errbuf[ERR_BUF_SIZE];
     ERROR("%s: failed to initialize %s helper(error: %s)", DPDK_STATS_PLUGIN,
-          DPDK_STATS_NAME, sstrerror(errno, errbuf, sizeof(errbuf)));
+          g_shm_name, sstrerror(errno, errbuf, sizeof(errbuf)));
     return ret;
   }
 
   dpdk_stats_default_config();
-
   return ret;
 }
 
@@ -144,6 +143,10 @@ static int dpdk_stats_config(oconfig_item_t *ci) {
       ctx->config.enabled_port_mask = child->values[0].value.number;
       DEBUG("%s: Enabled Port Mask 0x%X", DPDK_STATS_PLUGIN,
             ctx->config.enabled_port_mask);
+    } else if (strcasecmp("SharedMemObj", child->key) == 0) {
+      cf_util_get_string_buffer(child, g_shm_name, sizeof(g_shm_name));
+      DEBUG("%s: Shared memory object %s", DPDK_STATS_PLUGIN, g_shm_name);
+      dpdk_stats_reinit_helper();
     } else if (strcasecmp("EAL", child->key) == 0) {
       ret = dpdk_helper_eal_config_parse(g_hc, child);
       if (ret)
@@ -175,73 +178,56 @@ static int dpdk_stats_config(oconfig_item_t *ci) {
 }
 
 static int dpdk_helper_stats_get(dpdk_helper_ctx_t *phc) {
-  dpdk_stats_ctx_t *ctx = DPDK_STATS_CTX_GET(phc);
-
-  /* get stats from DPDK */
-
-  uint8_t ports_count = rte_eth_dev_count();
-  if (ports_count == 0) {
-    DPDK_CHILD_LOG("%s: No DPDK ports available. "
-                   "Check bound devices to DPDK driver.\n",
-                   DPDK_STATS_PLUGIN);
-    return -ENODEV;
-  }
-
-  if (ports_count > RTE_MAX_ETHPORTS)
-    ports_count = RTE_MAX_ETHPORTS;
-
-  ctx->ports_count = ports_count;
-
   int len = 0;
   int ret = 0;
   int stats = 0;
+  dpdk_stats_ctx_t *ctx = DPDK_STATS_CTX_GET(phc);
 
-  for (uint8_t i = 0; i < ports_count; i++) {
+  /* get stats from DPDK */
+  for (uint8_t i = 0; i < ctx->ports_count; i++) {
     if (!(ctx->config.enabled_port_mask & (1 << i)))
       continue;
+
     ctx->port_read_time[i] = cdtime();
+    /* Store available stats array length for port */
     len = ctx->port_stats_count[i];
+
     ret = rte_eth_xstats_get(i, &ctx->xstats[stats], len);
-    if (ret < 0 || ret != len) {
-      DPDK_CHILD_LOG("%s: Error reading stats (port=%d; len=%d)\n",
-                     DPDK_STATS_PLUGIN, i, len);
+    if (ret < 0 || ret > len) {
+      DPDK_CHILD_LOG(DPDK_STATS_PLUGIN
+                     ": Error reading stats (port=%d; len=%d, ret=%d)\n",
+                     i, len, ret);
+      ctx->port_stats_count[i] = 0;
       return -1;
     }
 #if RTE_VERSION >= RTE_VERSION_16_07
     ret = rte_eth_xstats_get_names(i, &ctx->xnames[stats], len);
-    if (ret < 0 || ret != len) {
-      DPDK_CHILD_LOG("%s: Error reading stat names (port=%d; len=%d)\n",
-                     DPDK_STATS_PLUGIN, i, len);
+    if (ret < 0 || ret > len) {
+      DPDK_CHILD_LOG(DPDK_STATS_PLUGIN
+                     ": Error reading stat names (port=%d; len=%d ret=%d)\n",
+                     i, len, ret);
+      ctx->port_stats_count[i] = 0;
       return -1;
     }
 #endif
-    stats += len;
+    ctx->port_stats_count[i] = ret;
+    stats += ctx->port_stats_count[i];
   }
 
-  assert(stats == ctx->stats_count);
-
+  assert(stats <= ctx->stats_count);
   return 0;
 }
 
 static int dpdk_helper_stats_count_get(dpdk_helper_ctx_t *phc) {
-  dpdk_stats_ctx_t *ctx = DPDK_STATS_CTX_GET(phc);
-
-  uint8_t ports = rte_eth_dev_count();
-  if (ports == 0) {
-    DPDK_CHILD_LOG("%s: No DPDK ports available. "
-                   "Check bound devices to DPDK driver.\n",
-                   DPDK_STATS_PLUGIN);
+  uint8_t ports = dpdk_helper_eth_dev_count();
+  if (ports == 0)
     return -ENODEV;
-  }
-
-  if (ports > RTE_MAX_ETHPORTS)
-    ports = RTE_MAX_ETHPORTS;
 
+  dpdk_stats_ctx_t *ctx = DPDK_STATS_CTX_GET(phc);
   ctx->ports_count = ports;
 
   int len = 0;
   int stats_count = 0;
-
   for (int i = 0; i < ports; i++) {
     if (!(ctx->config.enabled_port_mask & (1 << i)))
       continue;
@@ -264,9 +250,12 @@ static int dpdk_helper_stats_count_get(dpdk_helper_ctx_t *phc) {
   return stats_count;
 }
 
+static int dpdk_stats_get_size(dpdk_helper_ctx_t *phc) {
+  return (dpdk_helper_data_size_get(phc) - sizeof(dpdk_stats_ctx_t));
+}
+
 int dpdk_helper_command_handler(dpdk_helper_ctx_t *phc, enum DPDK_CMD cmd) {
   /* this function is called from helper context */
-  int ret = 0;
 
   if (phc == NULL) {
     DPDK_CHILD_LOG("%s: Invalid argument(phc)\n", DPDK_STATS_PLUGIN);
@@ -278,34 +267,23 @@ int dpdk_helper_command_handler(dpdk_helper_ctx_t *phc, enum DPDK_CMD cmd) {
     return -EINVAL;
   }
 
-  dpdk_stats_ctx_t *ctx = DPDK_STATS_CTX_GET(phc);
-
-  if (ctx->stats_count == 0) {
-
-    int stats_count = dpdk_helper_stats_count_get(phc);
+  int stats_count = dpdk_helper_stats_count_get(phc);
+  if (stats_count < 0) {
+    return stats_count;
+  }
 
-    if (stats_count < 0) {
-      return stats_count;
-    }
+  DPDK_STATS_CTX_GET(phc)->stats_count = stats_count;
+  int stats_size = stats_count * DPDK_STATS_CTX_GET_XSTAT_SIZE;
 
-    int stats_size = stats_count * DPDK_STATS_CTX_GET_XSTAT_SIZE;
-    ctx->stats_count = stats_count;
-
-    if ((dpdk_helper_data_size_get(phc) - sizeof(dpdk_stats_ctx_t)) <
-        stats_size) {
-      DPDK_CHILD_LOG(
-          "%s:%s:%d not enough space for stats (available=%d, "
-          "needed=%d)\n",
-          DPDK_STATS_PLUGIN, __FUNCTION__, __LINE__,
-          (int)(dpdk_helper_data_size_get(phc) - sizeof(dpdk_stats_ctx_t)),
-          stats_size);
-      return -ENOBUFS;
-    }
+  if (dpdk_stats_get_size(phc) < stats_size) {
+    DPDK_CHILD_LOG(
+        DPDK_STATS_PLUGIN
+        ":%s:%d not enough space for stats (available=%d, needed=%d)\n",
+        __FUNCTION__, __LINE__, (int)dpdk_stats_get_size(phc), stats_size);
+    return -ENOBUFS;
   }
 
-  ret = dpdk_helper_stats_get(phc);
-
-  return ret;
+  return dpdk_helper_stats_get(phc);
 }
 
 static void dpdk_stats_resolve_cnt_type(char *cnt_type, size_t cnt_type_len,
@@ -314,17 +292,17 @@ static void dpdk_stats_resolve_cnt_type(char *cnt_type, size_t cnt_type_len,
   type_end = strrchr(cnt_name, '_');
 
   if ((type_end != NULL) && (strncmp(cnt_name, "rx_", strlen("rx_")) == 0)) {
-    if (strncmp(type_end, "_errors", strlen("_errors")) == 0) {
+    if (strstr(type_end, "bytes") != NULL) {
+      sstrncpy(cnt_type, "if_rx_octets", cnt_type_len);
+    } else if (strstr(type_end, "error") != NULL) {
       sstrncpy(cnt_type, "if_rx_errors", cnt_type_len);
-    } else if (strncmp(type_end, "_dropped", strlen("_dropped")) == 0) {
+    } else if (strstr(type_end, "dropped") != NULL) {
       sstrncpy(cnt_type, "if_rx_dropped", cnt_type_len);
-    } else if (strncmp(type_end, "_bytes", strlen("_bytes")) == 0) {
-      sstrncpy(cnt_type, "if_rx_octets", cnt_type_len);
-    } else if (strncmp(type_end, "_packets", strlen("_packets")) == 0) {
+    } else if (strstr(type_end, "packets") != NULL) {
       sstrncpy(cnt_type, "if_rx_packets", cnt_type_len);
-    } else if (strncmp(type_end, "_placement", strlen("_placement")) == 0) {
+    } else if (strstr(type_end, "_placement") != NULL) {
       sstrncpy(cnt_type, "if_rx_errors", cnt_type_len);
-    } else if (strncmp(type_end, "_buff", strlen("_buff")) == 0) {
+    } else if (strstr(type_end, "_buff") != NULL) {
       sstrncpy(cnt_type, "if_rx_errors", cnt_type_len);
     } else {
       /* Does not fit obvious type: use a more generic one */
@@ -333,13 +311,13 @@ static void dpdk_stats_resolve_cnt_type(char *cnt_type, size_t cnt_type_len,
 
   } else if ((type_end != NULL) &&
              (strncmp(cnt_name, "tx_", strlen("tx_"))) == 0) {
-    if (strncmp(type_end, "_errors", strlen("_errors")) == 0) {
+    if (strstr(type_end, "bytes") != NULL) {
+      sstrncpy(cnt_type, "if_tx_octets", cnt_type_len);
+    } else if (strstr(type_end, "error") != NULL) {
       sstrncpy(cnt_type, "if_tx_errors", cnt_type_len);
-    } else if (strncmp(type_end, "_dropped", strlen("_dropped")) == 0) {
+    } else if (strstr(type_end, "dropped") != NULL) {
       sstrncpy(cnt_type, "if_tx_dropped", cnt_type_len);
-    } else if (strncmp(type_end, "_bytes", strlen("_bytes")) == 0) {
-      sstrncpy(cnt_type, "if_tx_octets", cnt_type_len);
-    } else if (strncmp(type_end, "_packets", strlen("_packets")) == 0) {
+    } else if (strstr(type_end, "packets") != NULL) {
       sstrncpy(cnt_type, "if_tx_packets", cnt_type_len);
     } else {
       /* Does not fit obvious type: use a more generic one */
@@ -348,16 +326,14 @@ static void dpdk_stats_resolve_cnt_type(char *cnt_type, size_t cnt_type_len,
   } else if ((type_end != NULL) &&
              (strncmp(cnt_name, "flow_", strlen("flow_"))) == 0) {
 
-    if (strncmp(type_end, "_filters", strlen("_filters")) == 0) {
+    if (strstr(type_end, "_filters") != NULL) {
       sstrncpy(cnt_type, "operations", cnt_type_len);
-    } else if (strncmp(type_end, "_errors", strlen("_errors")) == 0) {
+    } else if (strstr(type_end, "error") != NULL)
       sstrncpy(cnt_type, "errors", cnt_type_len);
-    } else if (strncmp(type_end, "_filters", strlen("_filters")) == 0) {
-      sstrncpy(cnt_type, "filter_result", cnt_type_len);
-    }
+
   } else if ((type_end != NULL) &&
              (strncmp(cnt_name, "mac_", strlen("mac_"))) == 0) {
-    if (strncmp(type_end, "_errors", strlen("_errors")) == 0) {
+    if (strstr(type_end, "error") != NULL) {
       sstrncpy(cnt_type, "errors", cnt_type_len);
     }
   } else {
@@ -446,11 +422,11 @@ static int dpdk_stats_reinit_helper() {
   g_hc = NULL;
 
   int ret;
-  ret = dpdk_helper_init(DPDK_STATS_NAME, data_size, &g_hc);
+  ret = dpdk_helper_init(g_shm_name, data_size, &g_hc);
   if (ret != 0) {
     char errbuf[ERR_BUF_SIZE];
     ERROR("%s: failed to initialize %s helper(error: %s)", DPDK_STATS_PLUGIN,
-          DPDK_STATS_NAME, sstrerror(errno, errbuf, sizeof(errbuf)));
+          g_shm_name, sstrerror(errno, errbuf, sizeof(errbuf)));
     return ret;
   }
 
@@ -494,7 +470,6 @@ static int dpdk_stats_read(user_data_t *ud) {
 
 static int dpdk_stats_init(void) {
   DPDK_STATS_TRACE();
-
   int ret = 0;
 
   ret = dpdk_stats_preinit();
@@ -513,8 +488,7 @@ static int dpdk_stats_shutdown(void) {
   ret = dpdk_helper_shutdown(g_hc);
   g_hc = NULL;
   if (ret != 0) {
-    ERROR("%s: failed to cleanup %s helper", DPDK_STATS_PLUGIN,
-          DPDK_STATS_NAME);
+    ERROR("%s: failed to cleanup %s helper", DPDK_STATS_PLUGIN, g_shm_name);
     return ret;
   }
 
index 6beac7b..e2e2e39 100644 (file)
 #define RDT_MAX_SOCKET_CORES 64
 #define RDT_MAX_CORES (RDT_MAX_SOCKET_CORES * RDT_MAX_SOCKETS)
 
+typedef enum {
+  UNKNOWN = 0,
+  CONFIGURATION_ERROR,
+} rdt_config_status;
+
 struct rdt_core_group_s {
   char *desc;
   size_t num_cores;
@@ -56,6 +61,8 @@ typedef struct rdt_ctx_s rdt_ctx_t;
 
 static rdt_ctx_t *g_rdt = NULL;
 
+static rdt_config_status g_state = UNKNOWN;
+
 static int isdup(const uint64_t *nums, size_t size, uint64_t val) {
   for (size_t i = 0; i < size; i++)
     if (nums[i] == val)
@@ -527,8 +534,14 @@ static int rdt_config(oconfig_item_t *ci) {
   int ret = 0;
 
   ret = rdt_preinit();
-  if (ret != 0)
-    return ret;
+  if (ret != 0) {
+    g_state = CONFIGURATION_ERROR;
+    /* if we return -1 at this point collectd
+      reports a failure in configuration and
+      aborts
+    */
+    goto exit;
+  }
 
   for (int i = 0; i < ci->children_num; i++) {
     oconfig_item_t *child = ci->children + i;
@@ -536,8 +549,14 @@ static int rdt_config(oconfig_item_t *ci) {
     if (strcasecmp("Cores", child->key) == 0) {
 
       ret = rdt_config_cgroups(child);
-      if (ret != 0)
-        return ret;
+      if (ret != 0) {
+        g_state = CONFIGURATION_ERROR;
+        /* if we return -1 at this point collectd
+           reports a failure in configuration and
+           aborts
+         */
+        goto exit;
+      }
 
 #if COLLECT_DEBUG
       rdt_dump_cgroups();
@@ -548,6 +567,7 @@ static int rdt_config(oconfig_item_t *ci) {
     }
   }
 
+exit:
   return (0);
 }
 
@@ -630,6 +650,9 @@ static int rdt_read(__attribute__((unused)) user_data_t *ud) {
 static int rdt_init(void) {
   int ret;
 
+  if(g_state == CONFIGURATION_ERROR)
+    return (-1);
+
   ret = rdt_preinit();
   if (ret != 0)
     return ret;
index 8df32a8..0605930 100644 (file)
@@ -41,8 +41,6 @@
 #define HAVE_YAJL_V2 1
 #endif
 
-#define DEFAULT_LOGFILE LOCALSTATEDIR "/log/" PACKAGE_NAME ".json.log"
-
 #if COLLECT_DEBUG
 static int log_level = LOG_DEBUG;
 #else
@@ -149,8 +147,7 @@ static void log_logstash_print(yajl_gen g, int severity,
   pthread_mutex_lock(&file_lock);
 
   if (log_file == NULL) {
-    fh = fopen(DEFAULT_LOGFILE, "a");
-    do_close = 1;
+    fh = stderr;
   } else if (strcasecmp(log_file, "stdout") == 0) {
     fh = stdout;
     do_close = 0;
@@ -164,8 +161,7 @@ static void log_logstash_print(yajl_gen g, int severity,
 
   if (fh == NULL) {
     char errbuf[1024];
-    fprintf(stderr, "log_logstash plugin: fopen (%s) failed: %s\n",
-            (log_file == NULL) ? DEFAULT_LOGFILE : log_file,
+    fprintf(stderr, "log_logstash plugin: fopen (%s) failed: %s\n", log_file,
             sstrerror(errno, errbuf, sizeof(errbuf)));
   } else {
     fprintf(fh, "%s\n", buf);
index 0de639e..eca8c34 100644 (file)
@@ -31,8 +31,6 @@
 #include "common.h"
 #include "plugin.h"
 
-#define DEFAULT_LOGFILE LOCALSTATEDIR "/log/collectd.log"
-
 #if COLLECT_DEBUG
 static int log_level = LOG_DEBUG;
 #else
@@ -117,8 +115,7 @@ static void logfile_print(const char *msg, int severity,
   pthread_mutex_lock(&file_lock);
 
   if (log_file == NULL) {
-    fh = fopen(DEFAULT_LOGFILE, "a");
-    do_close = 1;
+    fh = stderr;
   } else if (strcasecmp(log_file, "stderr") == 0)
     fh = stderr;
   else if (strcasecmp(log_file, "stdout") == 0)
@@ -130,8 +127,7 @@ static void logfile_print(const char *msg, int severity,
 
   if (fh == NULL) {
     char errbuf[1024];
-    fprintf(stderr, "logfile plugin: fopen (%s) failed: %s\n",
-            (log_file == NULL) ? DEFAULT_LOGFILE : log_file,
+    fprintf(stderr, "logfile plugin: fopen (%s) failed: %s\n", log_file,
             sstrerror(errno, errbuf, sizeof(errbuf)));
   } else {
     if (print_timestamp)
index 33785c8..a962514 100644 (file)
@@ -448,6 +448,7 @@ static int publish(mqtt_client_conf_t *conf, char const *topic,
      * measure; we will try to reconnect the next time we have to publish a
      * message */
     conf->connected = 0;
+    mosquitto_disconnect(conf->mosq);
 
     pthread_mutex_unlock(&conf->lock);
     return (-1);
@@ -461,6 +462,7 @@ static int format_topic(char *buf, size_t buf_len, data_set_t const *ds,
                         value_list_t const *vl, mqtt_client_conf_t *conf) {
   char name[MQTT_MAX_TOPIC_SIZE];
   int status;
+  char *c;
 
   if ((conf->topic_prefix == NULL) || (conf->topic_prefix[0] == 0))
     return (FORMAT_VL(buf, buf_len, vl));
@@ -473,6 +475,10 @@ static int format_topic(char *buf, size_t buf_len, data_set_t const *ds,
   if ((status < 0) || (((size_t)status) >= buf_len))
     return (ENOMEM);
 
+  while((c = strchr(buf, '#')) || (c = strchr(buf, '+'))) {
+       *c = '_';
+  }
+
   return (0);
 } /* int format_topic */
 
index c0ee7ab..5acbdde 100644 (file)
--- a/src/nut.c
+++ b/src/nut.c
@@ -54,8 +54,11 @@ static nut_ups_t *upslist_head = NULL;
 static pthread_mutex_t read_lock = PTHREAD_MUTEX_INITIALIZER;
 static int read_busy = 0;
 
-static const char *config_keys[] = {"UPS"};
+static const char *config_keys[] = {"UPS", "FORCESSL", "VERIFYPEER", "CAPATH"};
 static int config_keys_num = STATIC_ARRAY_SIZE(config_keys);
+static int force_ssl = 0;   // Initialized to default of 0 (false)
+static int verify_peer = 0; // Initialized to default of 0 (false)
+static char *ca_path = NULL;
 
 static void free_nut_ups_t(nut_ups_t *ups) {
   if (ups->conn != NULL) {
@@ -98,9 +101,51 @@ static int nut_add_ups(const char *name) {
   return (0);
 } /* int nut_add_ups */
 
+static int nut_force_ssl(const char *value) {
+  if (strcasecmp(value, "true") == 0)
+    force_ssl = 1;
+  else if (strcasecmp(value, "false") == 0)
+    force_ssl = 0; // Should already be set to 0 from initialization
+  else {
+    force_ssl = 0;
+    WARNING("nut plugin: nut_force_ssl: invalid FORCESSL value "
+            "found. Defaulting to false.");
+  }
+  return (0);
+} /* int nut_parse_force_ssl */
+
+static int nut_verify_peer(const char *value) {
+  if (strcasecmp(value, "true") == 0)
+    verify_peer = 1;
+  else if (strcasecmp(value, "false") == 0)
+    verify_peer = 0; // Should already be set to 0 from initialization
+  else {
+    verify_peer = 0;
+    WARNING("nut plugin: nut_verify_peer: invalid VERIFYPEER value "
+            "found. Defaulting to false.");
+  }
+  return (0);
+} /* int nut_verify_peer */
+
+static int nut_ca_path(const char *value) {
+  if (value != NULL && strcmp(value, "") != 0) {
+    ca_path = malloc(strlen(value) + 1);
+    strncpy(ca_path, value, (strlen(value) + 1));
+  } else {
+    ca_path = NULL; // Should alread be set to NULL from initialization
+  }
+  return (0);
+} /* int nut_ca_path */
+
 static int nut_config(const char *key, const char *value) {
   if (strcasecmp(key, "UPS") == 0)
     return (nut_add_ups(value));
+  else if (strcasecmp(key, "FORCESSL") == 0)
+    return (nut_force_ssl(value));
+  else if (strcasecmp(key, "VERIFYPEER") == 0)
+    return (nut_verify_peer(value));
+  else if (strcasecmp(key, "CAPATH") == 0)
+    return (nut_ca_path(value));
   else
     return (-1);
 } /* int nut_config */
@@ -123,6 +168,119 @@ static void nut_submit(nut_ups_t *ups, const char *type,
   plugin_dispatch_values(&vl);
 } /* void nut_submit */
 
+static int nut_connect(nut_ups_t *ups) {
+#ifdef WITH_UPSCLIENT_27
+  int status;
+  int ssl_status;
+  int ssl_flags;
+
+  if (verify_peer == 1 && force_ssl == 0) {
+    WARNING("nut plugin: nut_connect: VerifyPeer true but ForceSSL "
+            "false. Setting ForceSSL to true.");
+    force_ssl = 1;
+  }
+
+  if (verify_peer == 1 && ca_path == NULL) {
+    ERROR("nut plugin: nut_connect: VerifyPeer true but missing "
+          "CAPath value.");
+    return (-1);
+  }
+
+  if (verify_peer == 1) {
+    status = upscli_init(verify_peer, ca_path, NULL, NULL);
+
+    if (status != 1) {
+      ERROR("nut plugin: nut_connect: upscli_init (%i, %s) failed: %s",
+            verify_peer, ca_path, upscli_strerror(ups->conn));
+      upscli_cleanup();
+      return (-1);
+    }
+  } /* if (verify_peer == 1) */
+
+  if (verify_peer == 1)
+    ssl_flags = (UPSCLI_CONN_REQSSL | UPSCLI_CONN_CERTVERIF);
+  else if (force_ssl == 1)
+    ssl_flags = UPSCLI_CONN_REQSSL;
+  else
+    ssl_flags = UPSCLI_CONN_TRYSSL;
+
+  status = upscli_connect(ups->conn, ups->hostname, ups->port, ssl_flags);
+
+  if (status != 0) {
+    ERROR("nut plugin: nut_connect: upscli_connect (%s, %i) failed: %s",
+          ups->hostname, ups->port, upscli_strerror(ups->conn));
+    sfree(ups->conn);
+    upscli_cleanup();
+    return (-1);
+  } /* if (status != 0) */
+
+  INFO("nut plugin: Connection to (%s, %i) established.", ups->hostname,
+       ups->port);
+
+  // Output INFO or WARNING based on SSL and VERIFICATION
+  ssl_status = upscli_ssl(ups->conn); // 1 for SSL, 0 for not, -1 for error
+  if (ssl_status == 1 && verify_peer == 1) {
+    INFO("nut plugin: Connection is secured with SSL and certificate "
+         "has been verified.");
+  } else if (ssl_status == 1) {
+    INFO("nut plugin: Connection is secured with SSL with no verification "
+         "of server SSL certificate.");
+  } else if (ssl_status == 0) {
+    WARNING("nut plugin: Connection is unsecured (no SSL).");
+  } else {
+    ERROR("nut plugin: nut_connect: upscli_ssl failed: %s",
+          upscli_strerror(ups->conn));
+    sfree(ups->conn);
+    upscli_cleanup();
+    return (-1);
+  } /* if (ssl_status == 1 && verify_peer == 1) */
+  return (0);
+
+#else /* #ifdef WITH_UPSCLIENT_27 */
+  int status;
+  int ssl_status;
+  int ssl_flags;
+
+  if (verify_peer == 1 || ca_path != NULL) {
+    WARNING("nut plugin: nut_connect: Dependency libupsclient version "
+            "insufficient (<2.7) for VerifyPeer support. Ignoring VerifyPeer "
+            "and CAPath.");
+  }
+
+  if (force_ssl == 1)
+    ssl_flags = UPSCLI_CONN_REQSSL;
+  else
+    ssl_flags = UPSCLI_CONN_TRYSSL;
+
+  status = upscli_connect(ups->conn, ups->hostname, ups->port, ssl_flags);
+
+  if (status != 0) {
+    ERROR("nut plugin: nut_connect: upscli_connect (%s, %i) failed: %s",
+          ups->hostname, ups->port, upscli_strerror(ups->conn));
+    sfree(ups->conn);
+    return (-1);
+  } /* if (status != 0) */
+
+  INFO("nut plugin: Connection to (%s, %i) established.", ups->hostname,
+       ups->port);
+
+  // Output INFO or WARNING based on SSL
+  ssl_status = upscli_ssl(ups->conn); // 1 for SSL, 0 for not, -1 for error
+  if (ssl_status == 1) {
+    INFO("nut plugin: Connection is secured with SSL with no verification "
+         "of server SSL certificate.");
+  } else if (ssl_status == 0) {
+    WARNING("nut plugin: Connection is unsecured (no SSL).");
+  } else {
+    ERROR("nut plugin: nut_connect: upscli_ssl failed: %s",
+          upscli_strerror(ups->conn));
+    sfree(ups->conn);
+    return (-1);
+  } /* if (ssl_status == 1 && verify_peer == 1) */
+  return (0);
+#endif
+}
+
 static int nut_read_one(nut_ups_t *ups) {
   const char *query[3] = {"VAR", ups->upsname, NULL};
   unsigned int query_num = 2;
@@ -138,17 +296,10 @@ static int nut_read_one(nut_ups_t *ups) {
       return (-1);
     }
 
-    status =
-        upscli_connect(ups->conn, ups->hostname, ups->port, UPSCLI_CONN_TRYSSL);
-    if (status != 0) {
-      ERROR("nut plugin: nut_read_one: upscli_connect (%s, %i) failed: %s",
-            ups->hostname, ups->port, upscli_strerror(ups->conn));
-      sfree(ups->conn);
-      return (-1);
-    }
+    status = nut_connect(ups);
+    if (status == -1)
+      return -1;
 
-    INFO("nut plugin: Connection to (%s, %i) established.", ups->hostname,
-         ups->port);
   } /* if (ups->conn == NULL) */
 
   /* nut plugin: nut_read_one: upscli_list_start (adpos) failed: Protocol
@@ -159,6 +310,9 @@ static int nut_read_one(nut_ups_t *ups) {
           ups->upsname, upscli_strerror(ups->conn));
     upscli_disconnect(ups->conn);
     sfree(ups->conn);
+#ifdef WITH_UPSCLIENT_27
+    upscli_cleanup();
+#endif
     return (-1);
   }
 
@@ -246,6 +400,9 @@ static int nut_shutdown(void) {
     free_nut_ups_t(this);
     this = next;
   }
+#ifdef WITH_UPSCLIENT_27
+  upscli_cleanup();
+#endif
 
   return (0);
 } /* int nut_shutdown */
index c77bde4..8c2cd12 100644 (file)
@@ -90,6 +90,9 @@ static ovs_events_ctx_t ovs_events_ctx = {
                .ovs_db_serv = "6640"}      /* use default OVS DB service */
 };
 
+/* Forward declaration */
+static int ovs_events_plugin_read(user_data_t *u);
+
 /* This function is used only by "OVS_EVENTS_CTX_LOCK" define (see above).
  * It always returns 1 when context is locked.
  */
@@ -224,6 +227,7 @@ static int ovs_events_config_get_interfaces(const oconfig_item_t *ci) {
  * in allocated memory. Returns negative value in case of error.
  */
 static int ovs_events_plugin_config(oconfig_item_t *ci) {
+  _Bool dispatch_values = 1;
   for (int i = 0; i < ci->children_num; i++) {
     oconfig_item_t *child = ci->children + i;
     if (strcasecmp("SendNotification", child->key) == 0) {
@@ -260,12 +264,28 @@ static int ovs_events_plugin_config(oconfig_item_t *ci) {
         ovs_events_config_free();
         return (-1);
       }
+    } else if (strcasecmp("DispatchValues", child->key) == 0) {
+      if (cf_util_get_boolean(child, &dispatch_values) != 0) {
+        ovs_events_config_free();
+        return (-1);
+      }
     } else {
       ERROR(OVS_EVENTS_PLUGIN ": option '%s' is not allowed here", child->key);
       ovs_events_config_free();
       return (-1);
     }
   }
+  /* Check and warn about invalid configuration */
+  if (!ovs_events_ctx.config.send_notification && !dispatch_values) {
+      WARNING(OVS_EVENTS_PLUGIN ": send notification and dispatch values "
+              "options are disabled. No information will be dispatched by the "
+              "plugin. Please check your configuration");
+  }
+  /* Dispatch link status values if configured */
+  if (dispatch_values)
+    return plugin_register_complex_read(NULL, OVS_EVENTS_PLUGIN,
+                                        ovs_events_plugin_read, 0, NULL);
+
   return (0);
 }
 
@@ -633,7 +653,5 @@ static int ovs_events_plugin_shutdown(void) {
 void module_register(void) {
   plugin_register_complex_config(OVS_EVENTS_PLUGIN, ovs_events_plugin_config);
   plugin_register_init(OVS_EVENTS_PLUGIN, ovs_events_plugin_init);
-  plugin_register_complex_read(NULL, OVS_EVENTS_PLUGIN, ovs_events_plugin_read,
-                               0, NULL);
   plugin_register_shutdown(OVS_EVENTS_PLUGIN, ovs_events_plugin_shutdown);
 }
index 640f08b..e3c7379 100644 (file)
@@ -38,6 +38,7 @@
 
 #include <rte_config.h>
 #include <rte_eal.h>
+#include <rte_ethdev.h>
 
 #include "common.h"
 #include "utils_dpdk.h"
@@ -850,3 +851,22 @@ uint128_t str_to_uint128(const char *str, int len) {
   }
   return lcore_mask;
 }
+
+uint8_t dpdk_helper_eth_dev_count() {
+  uint8_t ports = rte_eth_dev_count();
+  if (ports == 0) {
+    ERROR(
+        "%s:%d: No DPDK ports available. Check bound devices to DPDK driver.\n",
+        __FUNCTION__, __LINE__);
+    return ports;
+  }
+
+  if (ports > RTE_MAX_ETHPORTS) {
+    ERROR("%s:%d: Number of DPDK ports (%u) is greater than "
+          "RTE_MAX_ETHPORTS=%d. Ignoring extra ports\n",
+          __FUNCTION__, __LINE__, ports, RTE_MAX_ETHPORTS);
+    ports = RTE_MAX_ETHPORTS;
+  }
+
+  return ports;
+}
index c9bb14b..f97a6b5 100644 (file)
@@ -73,6 +73,7 @@ int dpdk_helper_command(dpdk_helper_ctx_t *phc, enum DPDK_CMD cmd, int *result,
                         cdtime_t cmd_wait_time);
 void *dpdk_helper_priv_get(dpdk_helper_ctx_t *phc);
 int dpdk_helper_data_size_get(dpdk_helper_ctx_t *phc);
+uint8_t dpdk_helper_eth_dev_count();
 
 /* forward declaration of handler function that is called by helper from
  * child process. not implemented in helper. must be provided by client. */
index 830db51..692088c 100644 (file)
 /* Plugin name */
 #define PLUGIN_NAME "virt"
 
+#ifdef LIBVIR_CHECK_VERSION
+
+#if LIBVIR_CHECK_VERSION(0, 9, 5)
+#define HAVE_BLOCK_STATS_FLAGS 1
+#endif
+
+#if LIBVIR_CHECK_VERSION(0, 9, 11)
+#define HAVE_CPU_STATS 1
+#endif
+
+#endif /* LIBVIR_CHECK_VERSION */
+
 static const char *config_keys[] = {"Connection",
 
                                     "RefreshInterval",
@@ -54,6 +66,7 @@ static const char *config_keys[] = {"Connection",
                                     "PluginInstanceFormat",
 
                                     "Instances",
+                                    "ExtraStats",
 
                                     NULL};
 #define NR_CONFIG_KEYS ((sizeof config_keys / sizeof config_keys[0]) - 1)
@@ -158,6 +171,19 @@ enum bd_field { target, source };
 /* InterfaceFormat. */
 enum if_field { if_address, if_name, if_number };
 
+/* ExtraStats */
+#define EX_STATS_MAX_FIELDS 8
+enum ex_stats { ex_stats_none = 0, ex_stats_disk = 1, ex_stats_pcpu = 2 };
+static unsigned int extra_stats = ex_stats_none;
+
+struct ex_stats_item {
+  const char *name;
+  enum ex_stats flag;
+};
+static const struct ex_stats_item ex_stats_table[] = {
+    {"disk", ex_stats_disk}, {"pcpu", ex_stats_pcpu}, {NULL, ex_stats_none},
+};
+
 /* BlockDeviceFormatBasename */
 _Bool blockdevice_format_basename = 0;
 static enum bd_field blockdevice_format = target;
@@ -168,6 +194,71 @@ static time_t last_refresh = (time_t)0;
 
 static int refresh_lists(struct lv_read_instance *inst);
 
+struct lv_info {
+  virDomainInfo di;
+  unsigned long long total_user_cpu_time;
+  unsigned long long total_syst_cpu_time;
+};
+
+struct lv_block_info {
+  virDomainBlockStatsStruct bi;
+
+  long long rd_total_times;
+  long long wr_total_times;
+
+  long long fl_req;
+  long long fl_total_times;
+};
+
+static void init_block_info(struct lv_block_info *binfo) {
+  if (binfo == NULL)
+    return;
+
+  binfo->bi.rd_req = -1;
+  binfo->bi.wr_req = -1;
+  binfo->bi.rd_bytes = -1;
+  binfo->bi.wr_bytes = -1;
+
+  binfo->rd_total_times = -1;
+  binfo->wr_total_times = -1;
+  binfo->fl_req = -1;
+  binfo->fl_total_times = -1;
+}
+
+#ifdef HAVE_BLOCK_STATS_FLAGS
+
+#define GET_BLOCK_INFO_VALUE(NAME, FIELD)                                      \
+  do {                                                                         \
+    if (!strcmp(param[i].field, NAME)) {                                       \
+      binfo->FIELD = param[i].value.l;                                         \
+      continue;                                                                \
+    }                                                                          \
+  } while (0)
+
+static int get_block_info(struct lv_block_info *binfo,
+                          virTypedParameterPtr param, int nparams) {
+  if (binfo == NULL || param == NULL)
+    return -1;
+
+  for (int i = 0; i < nparams; ++i) {
+    /* ignore type. Everything must be LLONG anyway. */
+    GET_BLOCK_INFO_VALUE("rd_operations", bi.rd_req);
+    GET_BLOCK_INFO_VALUE("wr_operations", bi.wr_req);
+    GET_BLOCK_INFO_VALUE("rd_bytes", bi.rd_bytes);
+    GET_BLOCK_INFO_VALUE("wr_bytes", bi.wr_bytes);
+    GET_BLOCK_INFO_VALUE("rd_total_times", rd_total_times);
+    GET_BLOCK_INFO_VALUE("wr_total_times", wr_total_times);
+    GET_BLOCK_INFO_VALUE("flush_operations", fl_req);
+    GET_BLOCK_INFO_VALUE("flush_total_times", fl_total_times);
+  }
+
+  return 0;
+}
+
+#undef GET_BLOCK_INFO_VALUE
+
+#endif /* HAVE_BLOCK_STATS_FLAGS */
+
 /* ERROR(...) macro for virterrors. */
 #define VIRT_ERROR(conn, s)                                                    \
   do {                                                                         \
@@ -177,6 +268,54 @@ static int refresh_lists(struct lv_read_instance *inst);
       ERROR("%s: %s", (s), err->message);                                      \
   } while (0)
 
+static void init_lv_info(struct lv_info *info) {
+  if (info != NULL)
+    memset(info, 0, sizeof(*info));
+}
+
+static int lv_domain_info(virDomainPtr dom, struct lv_info *info) {
+#ifdef HAVE_CPU_STATS
+  virTypedParameterPtr param = NULL;
+  int nparams = 0;
+#endif /* HAVE_CPU_STATS */
+  int ret = virDomainGetInfo(dom, &(info->di));
+  if (ret != 0) {
+    return ret;
+  }
+
+#ifdef HAVE_CPU_STATS
+  nparams = virDomainGetCPUStats(dom, NULL, 0, -1, 1, 0);
+  if (nparams < 0) {
+    VIRT_ERROR(conn, "getting the CPU params count");
+    return -1;
+  }
+
+  param = calloc(nparams, sizeof(virTypedParameter));
+  if (param == NULL) {
+    ERROR("virt plugin: alloc(%i) for cpu parameters failed.", nparams);
+    return -1;
+  }
+
+  ret = virDomainGetCPUStats(dom, param, nparams, -1, 1, 0); // total stats.
+  if (ret < 0) {
+    virTypedParamsFree(param, nparams);
+    VIRT_ERROR(conn, "getting the disk params values");
+    return -1;
+  }
+
+  for (int i = 0; i < nparams; ++i) {
+    if (!strcmp(param[i].field, "user_time"))
+      info->total_user_cpu_time = param[i].value.ul;
+    else if (!strcmp(param[i].field, "system_time"))
+      info->total_syst_cpu_time = param[i].value.ul;
+  }
+
+  virTypedParamsFree(param, nparams);
+#endif /* HAVE_CPU_STATS */
+
+  return 0;
+}
+
 static void init_value_list(value_list_t *vl, virDomainPtr dom) {
   int n;
   const char *name;
@@ -283,6 +422,23 @@ static void memory_stats_submit(gauge_t value, virDomainPtr dom,
   submit(dom, "memory", tags[tag_index], &(value_t){.gauge = value}, 1);
 }
 
+static void submit_derive2(const char *type, derive_t v0, derive_t v1,
+                           virDomainPtr dom, const char *devname) {
+  value_t values[] = {
+      {.derive = v0}, {.derive = v1},
+  };
+
+  submit(dom, type, devname, values, STATIC_ARRAY_SIZE(values));
+} /* void submit_derive2 */
+
+static void pcpu_submit(virDomainPtr dom, struct lv_info *info) {
+#ifdef HAVE_CPU_STATS
+  if (extra_stats & ex_stats_pcpu)
+    submit_derive2("ps_cputime", info->total_user_cpu_time,
+                   info->total_syst_cpu_time, dom, NULL);
+#endif /* HAVE_CPU_STATS */
+}
+
 static void cpu_submit(unsigned long long value, virDomainPtr dom,
                        const char *type) {
   submit(dom, type, NULL, &(value_t){.derive = (derive_t)value}, 1);
@@ -297,14 +453,51 @@ static void vcpu_submit(derive_t value, virDomainPtr dom, int vcpu_nr,
   submit(dom, type, type_instance, &(value_t){.derive = value}, 1);
 }
 
-static void submit_derive2(const char *type, derive_t v0, derive_t v1,
-                           virDomainPtr dom, const char *devname) {
-  value_t values[] = {
-      {.derive = v0}, {.derive = v1},
-  };
+static void disk_submit(struct lv_block_info *binfo, virDomainPtr dom,
+                        const char *type_instance) {
+  char flush_type_instance[DATA_MAX_NAME_LEN];
+
+  ssnprintf(flush_type_instance, sizeof(flush_type_instance), "flush-%s",
+            type_instance);
+
+  if ((binfo->bi.rd_req != -1) && (binfo->bi.wr_req != -1))
+    submit_derive2("disk_ops", (derive_t)binfo->bi.rd_req,
+                   (derive_t)binfo->bi.wr_req, dom, type_instance);
+
+  if ((binfo->bi.rd_bytes != -1) && (binfo->bi.wr_bytes != -1))
+    submit_derive2("disk_octets", (derive_t)binfo->bi.rd_bytes,
+                   (derive_t)binfo->bi.wr_bytes, dom, type_instance);
+
+  if (extra_stats & ex_stats_disk) {
+    if ((binfo->rd_total_times != -1) && (binfo->wr_total_times != -1))
+      submit_derive2("disk_time", (derive_t)binfo->rd_total_times,
+                     (derive_t)binfo->wr_total_times, dom, type_instance);
+
+    if (binfo->fl_req != -1)
+      submit(dom, "total_requests", flush_type_instance,
+             &(value_t){.derive = (derive_t)binfo->fl_req}, 1);
+    if (binfo->fl_total_times != -1) {
+      derive_t value = binfo->fl_total_times / 1000; // ns -> ms
+      submit(dom, "total_time_in_ms", flush_type_instance,
+             &(value_t){.derive = value}, 1);
+    }
+  }
+}
 
-  submit(dom, type, devname, values, STATIC_ARRAY_SIZE(values));
-} /* void submit_derive2 */
+static unsigned int parse_ex_stats_flags(char **exstats, int numexstats) {
+  unsigned int ex_stats_flags = ex_stats_none;
+  for (int i = 0; i < numexstats; i++) {
+    for (int j = 0; ex_stats_table[j].name != NULL; j++) {
+      if (strcasecmp(exstats[i], ex_stats_table[j].name) == 0) {
+        DEBUG(PLUGIN_NAME " plugin: enabling extra stats for '%s'",
+              ex_stats_table[j].name);
+        ex_stats_flags |= ex_stats_table[j].flag;
+        break;
+      }
+    }
+  }
+  return ex_stats_flags;
+}
 
 static int lv_config(const char *key, const char *value) {
   if (virInitialize() != 0)
@@ -500,6 +693,17 @@ static int lv_config(const char *key, const char *value) {
     return 0;
   }
 
+  if (strcasecmp(key, "ExtraStats") == 0) {
+    char *localvalue = strdup(value);
+    if (localvalue != NULL) {
+      char *exstats[EX_STATS_MAX_FIELDS];
+      int numexstats =
+          strsplit(localvalue, exstats, STATIC_ARRAY_SIZE(exstats));
+      extra_stats = parse_ex_stats_flags(exstats, numexstats);
+      sfree(localvalue);
+    }
+  }
+
   /* Unrecognised option. */
   return -1;
 }
@@ -527,6 +731,40 @@ static void lv_disconnect(void) {
   WARNING(PLUGIN_NAME " plugin: closed connection to libvirt");
 }
 
+static int lv_domain_block_info(virDomainPtr dom, const char *path,
+                                struct lv_block_info *binfo) {
+#ifdef HAVE_BLOCK_STATS_FLAGS
+  virTypedParameterPtr params = NULL;
+  int nparams = 0;
+  int rc = -1;
+  int ret = virDomainBlockStatsFlags(dom, path, NULL, &nparams, 0);
+  if (ret < 0 || nparams == 0) {
+    VIRT_ERROR(conn, "getting the disk params count");
+    return -1;
+  }
+
+  params = calloc(nparams, sizeof(virTypedParameter));
+  if (params == NULL) {
+    ERROR("virt plugin: alloc(%i) for block=%s parameters failed.", nparams,
+          path);
+    return -1;
+  }
+  ret = virDomainBlockStatsFlags(dom, path, params, &nparams, 0);
+  if (ret < 0) {
+    VIRT_ERROR(conn, "getting the disk params values");
+    goto done;
+  }
+
+  rc = get_block_info(binfo, params, nparams);
+
+done:
+  virTypedParamsFree(params, nparams);
+  return rc;
+#else
+  return virDomainBlockStats(dom, path, &(binfo->bi), sizeof(binfo->bi));
+#endif /* HAVE_BLOCK_STATS_FLAGS */
+}
+
 static int lv_read(user_data_t *ud) {
   time_t t;
   struct lv_read_instance *inst = NULL;
@@ -573,33 +811,30 @@ static int lv_read(user_data_t *ud) {
 
   /* Get CPU usage, memory, VCPU usage for each domain. */
   for (int i = 0; i < state->nr_domains; ++i) {
-    virDomainInfo info;
+    struct lv_info info;
     virVcpuInfoPtr vinfo = NULL;
     virDomainMemoryStatPtr minfo = NULL;
     int status;
 
-    status = virDomainGetInfo(state->domains[i], &info);
+    init_lv_info(&info);
+    status = lv_domain_info(state->domains[i], &info);
     if (status != 0) {
       ERROR(PLUGIN_NAME " plugin: virDomainGetInfo failed with status %i.",
             status);
       continue;
     }
 
-    if (info.state != VIR_DOMAIN_RUNNING) {
-      /* only gather stats for running domains */
-      continue;
-    }
+    pcpu_submit(state->domains[i], &info);
+    cpu_submit(info.di.cpuTime, state->domains[i], "virt_cpu_total");
+    memory_submit((gauge_t)info.di.memory * 1024, state->domains[i]);
 
-    cpu_submit(info.cpuTime, state->domains[i], "virt_cpu_total");
-    memory_submit((gauge_t)info.memory * 1024, state->domains[i]);
-
-    vinfo = malloc(info.nrVirtCpu * sizeof(vinfo[0]));
+    vinfo = malloc(info.di.nrVirtCpu * sizeof(vinfo[0]));
     if (vinfo == NULL) {
       ERROR(PLUGIN_NAME " plugin: malloc failed.");
       continue;
     }
 
-    status = virDomainGetVcpus(state->domains[i], vinfo, info.nrVirtCpu,
+    status = virDomainGetVcpus(state->domains[i], vinfo, info.di.nrVirtCpu,
                                /* cpu map = */ NULL, /* cpu map length = */ 0);
     if (status < 0) {
       ERROR(PLUGIN_NAME " plugin: virDomainGetVcpus failed with status %i.",
@@ -608,7 +843,7 @@ static int lv_read(user_data_t *ud) {
       continue;
     }
 
-    for (int j = 0; j < info.nrVirtCpu; ++j)
+    for (int j = 0; j < info.di.nrVirtCpu; ++j)
       vcpu_submit(vinfo[j].cpuTime, state->domains[i], vinfo[j].number,
                   "virt_vcpu");
 
@@ -640,29 +875,29 @@ static int lv_read(user_data_t *ud) {
 
   /* Get block device stats for each domain. */
   for (int i = 0; i < state->nr_block_devices; ++i) {
-    struct _virDomainBlockStats stats;
+    struct block_device *bdev = &(state->block_devices[i]);
+    struct lv_block_info binfo;
+    init_block_info(&binfo);
 
-    if (virDomainBlockStats(state->block_devices[i].dom,
-                            state->block_devices[i].path, &stats,
-                            sizeof stats) != 0)
+    if (lv_domain_block_info(bdev->dom, bdev->path, &binfo) < 0)
       continue;
 
-    char *type_instance = NULL;
-    if (blockdevice_format_basename && blockdevice_format == source)
-      type_instance = strdup(basename(state->block_devices[i].path));
-    else
-      type_instance = strdup(state->block_devices[i].path);
-
-    if ((stats.rd_req != -1) && (stats.wr_req != -1))
-      submit_derive2("disk_ops", (derive_t)stats.rd_req, (derive_t)stats.wr_req,
-                     state->block_devices[i].dom, type_instance);
+    char *type_instance = bdev->path;
+    char *path = NULL;
+    if (blockdevice_format_basename && blockdevice_format == source) {
+      path = strdup(bdev->path);
+      if (path == NULL) {
+        WARNING(PLUGIN_NAME
+                " plugin: error extracting the basename for '%s', skipped",
+                bdev->path);
+        continue;
+      }
+      type_instance = basename(path);
+    }
 
-    if ((stats.rd_bytes != -1) && (stats.wr_bytes != -1))
-      submit_derive2("disk_octets", (derive_t)stats.rd_bytes,
-                     (derive_t)stats.wr_bytes, state->block_devices[i].dom,
-                     type_instance);
+    disk_submit(&binfo, bdev->dom, type_instance);
 
-    sfree(type_instance);
+    sfree(path);
   } /* for (nr_block_devices) */
 
   /* Get interface stats for each domain. */
@@ -889,6 +1124,8 @@ static int refresh_lists(struct lv_read_instance *inst) {
       xmlXPathContextPtr xpath_ctx = NULL;
       xmlXPathObjectPtr xpath_obj = NULL;
       char tag[PARTITION_TAG_MAX_LEN] = {'\0'};
+      virDomainInfo info;
+      int status;
 
       dom = virDomainLookupByID(conn, domids[i]);
       if (dom == NULL) {
@@ -903,6 +1140,18 @@ static int refresh_lists(struct lv_read_instance *inst) {
         goto cont;
       }
 
+      status = virDomainGetInfo(dom, &info);
+      if (status != 0) {
+        ERROR(PLUGIN_NAME " plugin: virDomainGetInfo failed with status %i.",
+              status);
+        continue;
+      }
+
+      if (info.state != VIR_DOMAIN_RUNNING) {
+        DEBUG(PLUGIN_NAME " plugin: skipping inactive domain %s", name);
+        continue;
+      }
+
       if (il_domains && ignorelist_match(il_domains, name) != 0)
         goto cont;
 
index 992d856..8f6b4be 100644 (file)
@@ -34,8 +34,6 @@
 #include "utils_threshold.h"
 #include "write_riemann_threshold.h"
 
-#include <ltdl.h>
-
 /*
  * Threshold management
  * ====================
index 2083ec3..bdbb847 100755 (executable)
@@ -1,6 +1,6 @@
 #!/bin/sh
 
-DEFAULT_VERSION="5.7.0.git"
+DEFAULT_VERSION="5.7.1.git"
 
 if [ -d .git ]; then
        VERSION="`git describe --dirty=+ --abbrev=7 2> /dev/null | grep collectd | sed -e 's/^collectd-//' -e 's/-/./g'`"