Plugin for getting DPDK ports link status and keep alive events.
authorKrzysztof Matczak <krzysztofx.matczak@intel.com>
Wed, 25 Jan 2017 14:01:39 +0000 (14:01 +0000)
committerKrzysztof Matczak <krzysztofx.matczak@intel.com>
Thu, 26 Jan 2017 21:19:39 +0000 (21:19 +0000)
Dpdkevents plugin collects and reports following events from DPDK based
applications:
- link status of network ports bound with DPDK
- keep alive events related to DPDK logical cores

In order to get link status plugin forks child process that attaches itself to
DPDK application as secondary DPDK process and fetches link status data using
DPDK API. This is the same approach like for dpdkstat plugin, also the same
utils_dpdk.c helper is utilized.

For getting keep alive events plugin communicates with DPDK application
via shared memory area. This is possible only if keep alive feature
is implemented in monitored application.
More details can be found under link
http://dpdk.org/doc/guides/sample_app_ug/keep_alive.html

Minimal required version of DPDK is 16.07.

Change-Id: Id820407ea933b1ecc31a72b6e344a6ec186ec780
Signed-off-by: Krzysztof Matczak <krzysztofx.matczak@intel.com>
Makefile.am
configure.ac
src/collectd.conf.in
src/collectd.conf.pod
src/dpdkevents.c [new file with mode: 0644]

index 805733e..0825fd7 100644 (file)
@@ -766,6 +766,15 @@ dns_la_LDFLAGS = $(PLUGIN_LDFLAGS) $(BUILD_WITH_LIBPCAP_LDFLAGS)
 dns_la_LIBADD = $(BUILD_WITH_LIBPCAP_LIBS)
 endif
 
+if BUILD_PLUGIN_DPDKEVENTS
+pkglib_LTLIBRARIES += dpdkevents.la
+dpdkevents_la_SOURCES = src/dpdkevents.c src/utils_dpdk.c src/utils_dpdk.h
+dpdkevents_la_CPPFLAGS = $(AM_CPPFLAGS) $(LIBDPDK_CPPFLAGS)
+#dpdkevents_la_CFLAGS = $(AM_CFLAGS) $(BUILD_WITH_DPDK_CFLAGS)
+dpdkevents_la_LDFLAGS = $(PLUGIN_LDFLAGS) $(LIBDPDK_LDFLAGS)
+dpdkevents_la_LIBADD = -ldpdk
+endif
+
 if BUILD_PLUGIN_DPDKSTAT
 pkglib_LTLIBRARIES += dpdkstat.la
 dpdkstat_la_SOURCES = src/dpdkstat.c src/utils_dpdk.c src/utils_dpdk.h
index 1b9b3fb..2d4e9dc 100644 (file)
@@ -2700,6 +2700,8 @@ AC_ARG_VAR([LIBDPDK_LDFLAGS], [Linker flags for libdpdk])
 
 AC_ARG_WITH([libdpdk], [AS_HELP_STRING([--without-libdpdk], [Disable libdpdk.])])
 
+rte_version_major=""
+rte_version_minor=""
 if test "x$with_libdpdk" != "xno"
 then
        if test "x$LIBDPDK_CPPFLAGS" = "x"
@@ -2709,7 +2711,37 @@ then
        SAVE_CPPFLAGS="$CPPFLAGS"
        CPPFLAGS="$LIBDPDK_CPPFLAGS $CPPFLAGS"
        AC_CHECK_HEADERS([rte_config.h],
-               [with_libdpdk="yes"],
+               [
+      with_libdpdk="yes"
+      AC_RUN_IFELSE(
+        [
+          AC_LANG_PROGRAM(
+            [[
+              #include <rte_version.h>
+            ]],
+            [[
+              return (RTE_VER_YEAR & 0xFF)
+            ]]
+          )
+        ],
+        [rte_version_major=0],
+        [rte_version_major=$?]
+      )
+      AC_RUN_IFELSE(
+        [
+          AC_LANG_PROGRAM(
+            [[
+              #include <rte_version.h>
+            ]],
+            [[
+              return (RTE_VER_MONTH & 0xFF)
+            ]]
+          )
+        ],
+        [rte_version_minor=0],
+        [rte_version_minor=$?]
+      )
+    ],
                [with_libdpdk="no (rte_config.h not found)"]
        )
        CPPFLAGS="$SAVE_CPPFLAGS"
@@ -5881,6 +5913,7 @@ plugin_curl_xml="no"
 plugin_df="no"
 plugin_disk="no"
 plugin_drbd="no"
+plugin_dpdkevents="no"
 plugin_dpdkstat="no"
 plugin_entropy="no"
 plugin_ethstat="no"
@@ -6231,6 +6264,14 @@ fi
 
 if test "x$with_libdpdk" = "xyes"
 then
+  plugin_dpdkevents="no (DPDK version < 16.07)"
+  if test $rte_version_major -eq 16
+  then
+    if test $rte_version_minor -ge 7
+    then
+    plugin_dpdkevents="yes"
+    fi
+  fi
   plugin_dpdkstat="yes"
 fi
 
@@ -6279,6 +6320,7 @@ AC_PLUGIN([dbi],                 [$with_libdbi],            [General database st
 AC_PLUGIN([df],                  [$plugin_df],              [Filesystem usage statistics])
 AC_PLUGIN([disk],                [$plugin_disk],            [Disk usage statistics])
 AC_PLUGIN([dns],                 [$with_libpcap],           [DNS traffic analysis])
+AC_PLUGIN([dpdkevents],          [$plugin_dpdkevents],      [Events from DPDK])
 AC_PLUGIN([dpdkstat],            [$plugin_dpdkstat],        [Stats from DPDK])
 AC_PLUGIN([drbd],                [$plugin_drbd],            [DRBD statistics])
 AC_PLUGIN([email],               [yes],                     [EMail statistics])
@@ -6704,6 +6746,7 @@ AC_MSG_RESULT([    dbi . . . . . . . . . $enable_dbi])
 AC_MSG_RESULT([    df  . . . . . . . . . $enable_df])
 AC_MSG_RESULT([    disk  . . . . . . . . $enable_disk])
 AC_MSG_RESULT([    dns . . . . . . . . . $enable_dns])
+AC_MSG_RESULT([    dpdkevents. . . . . . $enable_dpdkevents])
 AC_MSG_RESULT([    dpdkstat  . . . . . . $enable_dpdkstat])
 AC_MSG_RESULT([    drbd  . . . . . . . . $enable_drbd])
 AC_MSG_RESULT([    email . . . . . . . . $enable_email])
index 1cc86af..4ee8001 100644 (file)
 #@BUILD_PLUGIN_DF_TRUE@LoadPlugin df
 #@BUILD_PLUGIN_DISK_TRUE@LoadPlugin disk
 #@BUILD_PLUGIN_DNS_TRUE@LoadPlugin dns
+#@BUILD_PLUGIN_DPDKEVENTS_TRUE@LoadPlugin dpdkevents
 #@BUILD_PLUGIN_DPDKSTAT_TRUE@LoadPlugin dpdkstat
 #@BUILD_PLUGIN_DRBD_TRUE@LoadPlugin drbd
 #@BUILD_PLUGIN_EMAIL_TRUE@LoadPlugin email
 #      SelectNumericQueryTypes true
 #</Plugin>
 
+#<Plugin "dpdkevents">
+#  <EAL>
+#    Coremask "0x1"
+#    MemoryChannels "4"
+#    ProcessType "secondary"
+#    FilePrefix "rte"
+#  </EAL>
+#  <Event "link_status">
+#    SendEventsOnUpdate true
+#    EnabledPortMask 0xffff
+#    PortName "interface1"
+#    PortName "interface2"
+#    SendNotification false
+#  </Event>
+#  <Event "keep_alive">
+#    SendEventsOnUpdate true
+#    LCoreMask "0xf"
+#    KeepAliveShmName "/dpdk_keepalive_shm_name"
+#    SendNotification false
+#  </Event>
+#</Plugin>
+
 #<Plugin dpdkstat>
 #  <EAL>
 #    Coremask "0x2"
index 51fe68d..e7c00c3 100644 (file)
@@ -2383,6 +2383,128 @@ Enabled by default, collects unknown (and thus presented as numeric only) query
 
 =back
 
+=head2 Plugin C<dpdkevents>
+
+The I<dpdkevents plugin> collects events from DPDK such as link status of
+network ports and Keep Alive status of DPDK logical cores.
+In order to get Keep Alive events following requirements must be met:
+- DPDK >= 16.07
+- support for Keep Alive implemented in DPDK application. More details can
+be found here: http://dpdk.org/doc/guides/sample_app_ug/keep_alive.html
+
+B<Synopsis:>
+
+ <Plugin "dpdkevents">
+   <EAL>
+     Coremask "0x1"
+     MemoryChannels "4"
+     ProcessType "secondary"
+     FilePrefix "rte"
+   </EAL>
+   <Event "link_status">
+     SendEventsOnUpdate true
+     EnabledPortMask 0xffff
+     PortName "interface1"
+     PortName "interface2"
+     SendNotification false
+   </Event>
+   <Event "keep_alive">
+     SendEventsOnUpdate true
+     LCoreMask "0xf"
+     KeepAliveShmName "/dpdk_keepalive_shm_name"
+     SendNotification false
+   </Event>
+ </Plugin>
+
+B<Options:>
+
+
+=head3 The EAL block
+
+=over 5
+
+=item B<Coremask> I<Mask>
+
+=item B<Memorychannels> I<Channels>
+
+Number of memory channels per processor socket.
+
+=item B<ProcessType> I<type>
+
+The type of DPDK process instance.
+
+=item B<FilePrefix> I<File>
+
+The prefix text used for hugepage filenames. The filename will be set to
+/var/run/.<prefix>_config where prefix is what is passed in by the user.
+
+=back
+
+=head3 The Event block
+
+The B<Event> block defines configuration for specific event. It accepts a
+single argument which specifies the name of the event.
+
+=head4 Link Status event
+
+=over 5
+
+=item B<SendEventOnUpdate> I<true|false>
+
+If set to true link status value will be dispatched only when it is
+different from previously read value. This is an optional argument - default
+value is true.
+
+=item B<EnabledPortMask> I<Mask>
+
+A hexidecimal bit mask of the DPDK ports which should be enabled. A mask
+of 0x0 means that all ports will be disabled. A bitmask of all Fs means
+that all ports will be enabled. This is an optional argument - default
+is all ports enabled.
+
+=item B<PortName> I<Name>
+
+A string containing an optional name for the enabled DPDK ports. Each PortName
+option should contain only one port name; specify as many PortName options as
+desired. Default naming convention will be used if PortName is blank. If there
+are less PortName options than there are enabled ports, the default naming
+convention will be used for the additional ports.
+
+=item B<SendNotification> I<true|false>
+
+If set to true, link status notifications will be sent, instead of link
+status being collected as a statistic. This is an optional argument - default
+value is false.
+
+=back
+
+=head4 Keep Alive event
+
+=over 5
+
+=item B<SendEventOnUpdate> I<true|false>
+
+If set to true keep alive value will be dispatched only when it is
+different from previously read value. This is an optional argument - default
+value is true.
+
+=item B<LCoreMask> I<Mask>
+
+An hexadecimal bit mask of the logical cores to monitor keep alive state.
+
+=item B<KeepAliveShmName> I<Name>
+
+Shared memory name identifier that is used by secondary process to monitor
+the keep alive cores state.
+
+=item B<SendNotification> I<true|false>
+
+If set to true, keep alive notifications will be sent, instead of keep
+alive information being collected as a statistic. This is an optional
+argument - default value is false.
+
+=back
+
 =head2 Plugin C<dpdkstat>
 
 The I<dpdkstat plugin> collects information about DPDK interfaces using the
diff --git a/src/dpdkevents.c b/src/dpdkevents.c
new file mode 100644 (file)
index 0000000..62e28b9
--- /dev/null
@@ -0,0 +1,592 @@
+/*
+ * collectd - src/dpdkevents.c
+ * MIT License
+ *
+ * Copyright(c) 2017 Intel Corporation. All rights reserved.
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ *
+ * Authors:
+ *   Maryam Tahhan <maryam.tahhan@intel.com>
+ *   Harry van Haaren <harry.van.haaren@intel.com>
+ *   Serhiy Pshyk <serhiyx.pshyk@intel.com>
+ *   Kim-Marie Jones <kim-marie.jones@intel.com>
+ *   Krzysztof Matczak <krzysztofx@intel.com>
+ */
+
+#include "common.h"
+#include "plugin.h"
+#include "semaphore.h"
+#include "sys/mman.h"
+#include "utils_dpdk.h"
+#include "utils_time.h"
+#include "collectd.h"
+
+#include <rte_config.h>
+#include <rte_eal.h>
+#include <rte_ethdev.h>
+#include <rte_keepalive.h>
+
+#define DPDK_EVENTS_PLUGIN "dpdkevents"
+#define DPDK_EVENTS_NAME "dpdk_collectd_events"
+#define ETH_LINK_NA 0xFF
+
+#define INT64_BIT_SIZE 64
+#define KEEPALIVE_PLUGIN_INSTANCE "keepalive"
+#define RTE_KEEPALIVE_SHM_NAME "/dpdk_keepalive_shm_name"
+
+#define RTE_VERSION_16_07 RTE_VERSION_NUM(16, 7, 0, 16)
+
+typedef struct dpdk_keepalive_shm_s {
+  sem_t core_died;
+  enum rte_keepalive_state core_state[RTE_KEEPALIVE_MAXCORES];
+  uint64_t core_last_seen_times[RTE_KEEPALIVE_MAXCORES];
+} dpdk_keepalive_shm_t;
+
+typedef struct dpdk_ka_monitor_s {
+  cdtime_t read_time;
+  int lcore_state;
+} dpdk_ka_monitor_t;
+
+typedef struct dpdk_link_status_config_s {
+  int enabled;
+  int send_updated;
+  uint32_t enabled_port_mask;
+  char port_name[RTE_MAX_ETHPORTS][DATA_MAX_NAME_LEN];
+  int notify;
+} dpdk_link_status_config_t;
+
+typedef struct dpdk_keep_alive_config_s {
+  int enabled;
+  int send_updated;
+  uint128_t lcore_mask;
+  dpdk_keepalive_shm_t *shm;
+  char shm_name[DATA_MAX_NAME_LEN];
+  int notify;
+} dpdk_keep_alive_config_t;
+
+typedef struct dpdk_events_config_s {
+  cdtime_t interval;
+  dpdk_link_status_config_t link_status;
+  dpdk_keep_alive_config_t keep_alive;
+} dpdk_events_config_t;
+
+typedef struct dpdk_link_info_s {
+  cdtime_t read_time;
+  int status_updated;
+  int link_status;
+} dpdk_link_info_t;
+
+typedef struct dpdk_events_ctx_s {
+  dpdk_events_config_t config;
+  uint32_t nb_ports;
+  dpdk_link_info_t link_info[RTE_MAX_ETHPORTS];
+  dpdk_ka_monitor_t core_info[RTE_KEEPALIVE_MAXCORES];
+} dpdk_events_ctx_t;
+
+#define DPDK_EVENTS_CTX_GET(a) ((dpdk_events_ctx_t *)dpdk_helper_priv_get(a))
+
+#define DPDK_EVENTS_TRACE()                                                    \
+  DEBUG("%s:%s:%d pid=%u", DPDK_EVENTS_PLUGIN, __FUNCTION__, __LINE__, getpid())
+
+static dpdk_helper_ctx_t *g_hc;
+
+static int dpdk_event_keep_alive_shm_create(void) {
+  dpdk_events_ctx_t *ec = DPDK_EVENTS_CTX_GET(g_hc);
+  char *shm_name;
+
+  if (strlen(ec->config.keep_alive.shm_name)) {
+    shm_name = ec->config.keep_alive.shm_name;
+  } else {
+    shm_name = RTE_KEEPALIVE_SHM_NAME;
+    WARNING(DPDK_EVENTS_PLUGIN ": Keep alive shared memory identifier is not "
+                               "specified, using default one: %s",
+            shm_name);
+  }
+
+  char errbuf[ERR_BUF_SIZE];
+  int fd = shm_open(shm_name, O_RDWR, 0);
+  if (fd < 0) {
+    ERROR(DPDK_EVENTS_PLUGIN ": Failed to open %s as SHM:%s. Is DPDK KA "
+                             "primary application running?",
+          shm_name, sstrerror(errno, errbuf, sizeof(errbuf)));
+    return errno;
+  } else {
+    ec->config.keep_alive.shm =
+        (dpdk_keepalive_shm_t *)mmap(0, sizeof(*(ec->config.keep_alive.shm)),
+                                     PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
+    close(fd);
+    if (ec->config.keep_alive.shm == MAP_FAILED) {
+      ERROR(DPDK_EVENTS_PLUGIN ": Failed to mmap KA SHM:%s",
+            sstrerror(errno, errbuf, sizeof(errbuf)));
+      return errno;
+    }
+  }
+
+  return 0;
+}
+
+static void dpdk_events_default_config(void) {
+  dpdk_events_ctx_t *ec = DPDK_EVENTS_CTX_GET(g_hc);
+
+  ec->config.interval = plugin_get_interval();
+
+  /* Link Status */
+  ec->config.link_status.enabled = 0;
+  ec->config.link_status.enabled_port_mask = ~0;
+  ec->config.link_status.send_updated = 1;
+  ec->config.link_status.notify = 0;
+
+  for (int i = 0; i < RTE_MAX_ETHPORTS; i++) {
+    ec->config.link_status.port_name[i][0] = 0;
+  }
+
+  /* Keep Alive */
+  ec->config.keep_alive.enabled = 0;
+  ec->config.keep_alive.send_updated = 1;
+  ec->config.keep_alive.notify = 0;
+  memset(&ec->config.keep_alive.lcore_mask, 0,
+         sizeof(ec->config.keep_alive.lcore_mask));
+  memset(&ec->config.keep_alive.shm_name, 0,
+         sizeof(ec->config.keep_alive.shm_name));
+}
+
+static int dpdk_events_preinit(void) {
+  DPDK_EVENTS_TRACE();
+
+  if (g_hc != NULL) {
+    /* already initialized if config callback was called before init callback */
+    DEBUG("dpdk_events_preinit: helper already initialized.");
+    return 0;
+  }
+
+  int ret =
+      dpdk_helper_init(DPDK_EVENTS_NAME, sizeof(dpdk_events_ctx_t), &g_hc);
+  if (ret != 0) {
+    ERROR(DPDK_EVENTS_PLUGIN ": failed to initialize %s helper(error: %s)",
+          DPDK_EVENTS_NAME, strerror(ret));
+    return ret;
+  }
+
+  dpdk_events_default_config();
+
+  dpdk_events_ctx_t *ec = DPDK_EVENTS_CTX_GET(g_hc);
+  for (int i = 0; i < RTE_MAX_ETHPORTS; i++) {
+    ec->link_info[i].link_status = ETH_LINK_NA;
+  }
+
+  for (int i = 0; i < RTE_KEEPALIVE_MAXCORES; i++) {
+    ec->core_info[i].lcore_state = ETH_LINK_NA;
+  }
+
+  return ret;
+}
+
+static int dpdk_events_link_status_config(dpdk_events_ctx_t *ec,
+                                          oconfig_item_t *ci) {
+  ec->config.link_status.enabled = 1;
+
+  DEBUG(DPDK_EVENTS_PLUGIN ": Subscribed for Link Status Events.");
+
+  for (int i = 0; i < ci->children_num; i++) {
+    oconfig_item_t *child = ci->children + i;
+
+    if (strcasecmp("EnabledPortMask", child->key) == 0) {
+      ec->config.link_status.enabled_port_mask =
+          (uint32_t)child->values[0].value.number;
+      DEBUG(DPDK_EVENTS_PLUGIN ": LinkStatus:Enabled Port Mask 0x%X",
+            ec->config.link_status.enabled_port_mask);
+    } else if (strcasecmp("SendEventsOnUpdate", child->key) == 0) {
+      ec->config.link_status.send_updated = child->values[0].value.boolean;
+      DEBUG(DPDK_EVENTS_PLUGIN ": LinkStatus:SendEventsOnUpdate %d",
+            (int)child->values[0].value.boolean);
+    } else if (strcasecmp("SendNotification", child->key) == 0) {
+      ec->config.link_status.notify = child->values[0].value.boolean;
+      DEBUG(DPDK_EVENTS_PLUGIN ": LinkStatus:SendNotification %d",
+            (int)child->values[0].value.boolean);
+    }
+  }
+
+  int port_num = 0;
+
+  /* parse port names after EnabledPortMask was parsed */
+  for (int i = 0; i < ci->children_num; i++) {
+    oconfig_item_t *child = ci->children + i;
+    if (strcasecmp("PortName", child->key) == 0) {
+      while (!(ec->config.link_status.enabled_port_mask & (1 << port_num)))
+        port_num++;
+      ssnprintf(ec->config.link_status.port_name[port_num], DATA_MAX_NAME_LEN,
+                "%s", child->values[0].value.string);
+      DEBUG(DPDK_EVENTS_PLUGIN ": LinkStatus:Port %d Name: %s", port_num,
+            ec->config.link_status.port_name[port_num]);
+      port_num++;
+    }
+  }
+
+  return 0;
+}
+
+static int dpdk_events_keep_alive_config(dpdk_events_ctx_t *ec,
+                                         oconfig_item_t *ci) {
+  ec->config.keep_alive.enabled = 1;
+  DEBUG(DPDK_EVENTS_PLUGIN ": Subscribed for Keep Alive Events.");
+
+  for (int i = 0; i < ci->children_num; i++) {
+    oconfig_item_t *child = ci->children + i;
+
+    if (strcasecmp("SendEventsOnUpdate", child->key) == 0) {
+      ec->config.keep_alive.send_updated = child->values[0].value.boolean;
+      DEBUG(DPDK_EVENTS_PLUGIN ": KeepAlive:SendEventsOnUpdate %d",
+            (int)child->values[0].value.boolean);
+    } else if (strcasecmp("LCoreMask", child->key) == 0) {
+      char lcore_mask[DATA_MAX_NAME_LEN];
+      ssnprintf(lcore_mask, sizeof(lcore_mask), "%s",
+                child->values[0].value.string);
+      ec->config.keep_alive.lcore_mask =
+          str_to_uint128(lcore_mask, strlen(lcore_mask));
+      DEBUG(DPDK_EVENTS_PLUGIN ": KeepAlive:LCoreMask 0x%" PRIX64 "%" PRIX64 "",
+            ec->config.keep_alive.lcore_mask.high,
+            ec->config.keep_alive.lcore_mask.low);
+    } else if (strcasecmp("KeepAliveShmName", child->key) == 0) {
+      ssnprintf(ec->config.keep_alive.shm_name,
+                sizeof(ec->config.keep_alive.shm_name), "%s",
+                child->values[0].value.string);
+      DEBUG(DPDK_EVENTS_PLUGIN ": KeepAlive:KeepAliveShmName %s",
+            ec->config.keep_alive.shm_name);
+    } else if (strcasecmp("SendNotification", child->key) == 0) {
+      ec->config.keep_alive.notify = child->values[0].value.boolean;
+      DEBUG(DPDK_EVENTS_PLUGIN ": KeepAlive:SendNotification %d",
+            (int)child->values[0].value.boolean);
+    }
+  }
+
+  return 0;
+}
+
+static int dpdk_events_config(oconfig_item_t *ci) {
+  DPDK_EVENTS_TRACE();
+
+  int ret = dpdk_events_preinit();
+  if (ret)
+    return ret;
+
+  dpdk_events_ctx_t *ec = DPDK_EVENTS_CTX_GET(g_hc);
+
+  for (int i = 0; i < ci->children_num; i++) {
+    oconfig_item_t *child = ci->children + i;
+    if (strcasecmp("EAL", child->key) == 0) {
+      dpdk_helper_eal_config_parse(g_hc, child);
+    } else if (strcasecmp("Event", child->key) == 0) {
+      if (strcasecmp(child->values[0].value.string, "link_status") == 0) {
+        dpdk_events_link_status_config(ec, child);
+      } else if (strcasecmp(child->values[0].value.string, "keep_alive") == 0) {
+        dpdk_events_keep_alive_config(ec, child);
+      } else {
+        ERROR(DPDK_EVENTS_PLUGIN ": The selected event \"%s\" is unknown.",
+              child->values[0].value.string);
+      }
+    }
+  }
+
+  return ret;
+}
+
+static int dpdk_helper_link_status_get(dpdk_helper_ctx_t *phc) {
+  dpdk_events_ctx_t *ec = DPDK_EVENTS_CTX_GET(phc);
+
+  /* get Link Status values from DPDK */
+  uint8_t nb_ports = rte_eth_dev_count();
+  if (nb_ports == 0) {
+    DPDK_CHILD_LOG("dpdkevent-helper: No DPDK ports available. "
+                   "Check bound devices to DPDK driver.\n");
+    return -ENODEV;
+  }
+  ec->nb_ports = nb_ports > RTE_MAX_ETHPORTS ? RTE_MAX_ETHPORTS : nb_ports;
+
+  for (int i = 0; i < ec->nb_ports; i++) {
+    if (ec->config.link_status.enabled_port_mask & (1 << i)) {
+      struct rte_eth_link link;
+      ec->link_info[i].read_time = cdtime();
+      rte_eth_link_get_nowait(i, &link);
+      if ((link.link_status == ETH_LINK_NA) ||
+          (link.link_status != ec->link_info[i].link_status)) {
+        ec->link_info[i].link_status = link.link_status;
+        ec->link_info[i].status_updated = 1;
+        DPDK_CHILD_LOG(" === PORT %d Link Status: %s\n", i,
+                       link.link_status ? "UP" : "DOWN");
+      }
+    }
+  }
+
+  return 0;
+}
+
+/* this function is called from helper context */
+int dpdk_helper_command_handler(dpdk_helper_ctx_t *phc, enum DPDK_CMD cmd) {
+  if (phc == NULL) {
+    DPDK_CHILD_LOG(DPDK_EVENTS_PLUGIN ": Invalid argument(phc)\n");
+    return -EINVAL;
+  }
+
+  if (cmd != DPDK_CMD_GET_EVENTS) {
+    DPDK_CHILD_LOG(DPDK_EVENTS_PLUGIN ": Unknown command (cmd=%d)\n", cmd);
+    return -EINVAL;
+  }
+
+  dpdk_events_ctx_t *ec = DPDK_EVENTS_CTX_GET(phc);
+
+  if (ec->config.link_status.enabled)
+    dpdk_helper_link_status_get(phc);
+
+  return 0;
+}
+
+static void dpdk_events_notification_dispatch(int severity,
+                                              char *plugin_instance,
+                                              cdtime_t time, char *msg) {
+  notification_t n = {0};
+  n.severity = severity;
+  n.time = time;
+  sstrncpy(n.host, hostname_g, sizeof(n.host));
+  sstrncpy(n.plugin, DPDK_EVENTS_PLUGIN, sizeof(n.plugin));
+  sstrncpy(n.plugin_instance, plugin_instance, sizeof(n.plugin_instance));
+  sstrncpy(n.message, msg, sizeof(n.message));
+  plugin_dispatch_notification(&n);
+}
+
+static void dpdk_events_gauge_submit(char *plugin_instance, char *type,
+                                     gauge_t value, cdtime_t time) {
+  value_list_t vl = {.values = &(value_t){.gauge = value},
+                     .values_len = 1,
+                     .time = time,
+                     .plugin = DPDK_EVENTS_PLUGIN,
+                     .type = "gauge",
+                     .meta = NULL};
+  sstrncpy(vl.host, hostname_g, sizeof(vl.host));
+  sstrncpy(vl.plugin_instance, plugin_instance, sizeof(vl.plugin_instance));
+  sstrncpy(vl.type_instance, type, sizeof(vl.type_instance));
+  plugin_dispatch_values(&vl);
+}
+
+static int dpdk_events_link_status_dispatch(dpdk_helper_ctx_t *phc) {
+  dpdk_events_ctx_t *ec = DPDK_EVENTS_CTX_GET(phc);
+  DEBUG(DPDK_EVENTS_PLUGIN ": %s:%d ports=%u", __FUNCTION__, __LINE__,
+        ec->nb_ports);
+
+  /* dispatch Link Status values to collectd */
+  for (int i = 0; i < ec->nb_ports; i++) {
+    if (ec->config.link_status.enabled_port_mask & (1 << i)) {
+      if (!ec->config.link_status.send_updated ||
+          ec->link_info[i].status_updated) {
+
+        DEBUG(DPDK_EVENTS_PLUGIN ": Dispatch PORT %d Link Status: %s", i,
+              ec->link_info[i].link_status ? "UP" : "DOWN");
+
+        char dev_name[DATA_MAX_NAME_LEN];
+        if (ec->config.link_status.port_name[i][0] != 0) {
+          ssnprintf(dev_name, sizeof(dev_name), "%s",
+                    ec->config.link_status.port_name[i]);
+        } else {
+          ssnprintf(dev_name, sizeof(dev_name), "port.%d", i);
+        }
+
+        if (ec->config.link_status.notify) {
+          int sev = ec->link_info[i].link_status ? NOTIF_OKAY : NOTIF_WARNING;
+          char msg[DATA_MAX_NAME_LEN];
+          ssnprintf(msg, sizeof(msg), "Link Status: %s",
+                    ec->link_info[i].link_status ? "UP" : "DOWN");
+          dpdk_events_notification_dispatch(sev, dev_name,
+                                            ec->link_info[i].read_time, msg);
+        } else {
+          dpdk_events_gauge_submit(dev_name, "link_status",
+                                   (gauge_t)ec->link_info[i].link_status,
+                                   ec->link_info[i].read_time);
+        }
+        ec->link_info[i].status_updated = 0;
+      }
+    }
+  }
+
+  return 0;
+}
+
+static int dpdk_events_keep_alive_dispatch(dpdk_helper_ctx_t *phc) {
+  dpdk_events_ctx_t *ec = DPDK_EVENTS_CTX_GET(phc);
+
+  /* dispatch Keep Alive values to collectd */
+  for (int i = 0; i < RTE_KEEPALIVE_MAXCORES; i++) {
+    if (i < INT64_BIT_SIZE) {
+      if (!(ec->config.keep_alive.lcore_mask.low & ((uint64_t)1 << i)))
+        continue;
+    } else if (i >= INT64_BIT_SIZE && i < INT64_BIT_SIZE * 2) {
+      if (!(ec->config.keep_alive.lcore_mask.high &
+            ((uint64_t)1 << (i - INT64_BIT_SIZE))))
+        continue;
+    } else {
+      WARNING(DPDK_EVENTS_PLUGIN
+              ": %s:%d Core id %u is out of 0 to %u range, skipping",
+              __FUNCTION__, __LINE__, i, INT64_BIT_SIZE * 2);
+      continue;
+    }
+
+    char core_name[DATA_MAX_NAME_LEN];
+    ssnprintf(core_name, sizeof(core_name), "lcore%u", i);
+
+    if (!ec->config.keep_alive.send_updated ||
+        (ec->core_info[i].lcore_state !=
+         ec->config.keep_alive.shm->core_state[i])) {
+      ec->core_info[i].lcore_state = ec->config.keep_alive.shm->core_state[i];
+      ec->core_info[i].read_time = cdtime();
+
+#if RTE_VERSION >= RTE_VERSION_16_07
+      if (ec->config.keep_alive.notify) {
+        char msg[DATA_MAX_NAME_LEN];
+        int sev;
+
+        switch (ec->config.keep_alive.shm->core_state[i]) {
+        case RTE_KA_STATE_ALIVE:
+          sev = NOTIF_OKAY;
+          ssnprintf(msg, sizeof(msg), "lcore %u Keep Alive Status: ALIVE", i);
+          break;
+        case RTE_KA_STATE_MISSING:
+          ssnprintf(msg, sizeof(msg), "lcore %u Keep Alive Status: MISSING", i);
+          sev = NOTIF_WARNING;
+          break;
+        case RTE_KA_STATE_DEAD:
+          ssnprintf(msg, sizeof(msg), "lcore %u Keep Alive Status: DEAD", i);
+          sev = NOTIF_FAILURE;
+          break;
+        case RTE_KA_STATE_UNUSED:
+          ssnprintf(msg, sizeof(msg), "lcore %u Keep Alive Status: UNUSED", i);
+          sev = NOTIF_OKAY;
+          break;
+        case RTE_KA_STATE_GONE:
+          ssnprintf(msg, sizeof(msg), "lcore %u Keep Alive Status: GONE", i);
+          sev = NOTIF_FAILURE;
+          break;
+        case RTE_KA_STATE_DOZING:
+          ssnprintf(msg, sizeof(msg), "lcore %u Keep Alive Status: DOZING", i);
+          sev = NOTIF_OKAY;
+          break;
+        case RTE_KA_STATE_SLEEP:
+          ssnprintf(msg, sizeof(msg), "lcore %u Keep Alive Status: SLEEP", i);
+          sev = NOTIF_OKAY;
+          break;
+        default:
+          ssnprintf(msg, sizeof(msg), "lcore %u Keep Alive Status: UNKNOWN", i);
+          sev = NOTIF_FAILURE;
+        }
+
+        dpdk_events_notification_dispatch(sev, KEEPALIVE_PLUGIN_INSTANCE,
+                                          ec->core_info[i].read_time, msg);
+      } else {
+        dpdk_events_gauge_submit(KEEPALIVE_PLUGIN_INSTANCE, core_name,
+                                 ec->config.keep_alive.shm->core_state[i],
+                                 ec->core_info[i].read_time);
+      }
+#else
+      dpdk_events_gauge_submit(KEEPALIVE_PLUGIN_INSTANCE, core_name,
+                               ec->config.keep_alive.shm->core_state[i],
+                               ec->core_info[i].read_time);
+#endif /* #if RTE_VERSION >= RTE_VERSION_16_07 */
+    }
+  }
+
+  return 0;
+}
+
+static int dpdk_events_read(user_data_t *ud) {
+  DPDK_EVENTS_TRACE();
+
+  if (g_hc == NULL) {
+    ERROR(DPDK_EVENTS_PLUGIN ": plugin not initialized.");
+    return -EINVAL;
+  }
+
+  dpdk_events_ctx_t *ec = DPDK_EVENTS_CTX_GET(g_hc);
+
+  if (!ec->config.keep_alive.enabled && !ec->config.link_status.enabled) {
+    /* nothing to do */
+    return 0;
+  }
+
+  if (ec->config.link_status.enabled) {
+    int cmd_res = 0;
+    int ret = dpdk_helper_command(g_hc, DPDK_CMD_GET_EVENTS, &cmd_res,
+                                  ec->config.interval);
+    if (cmd_res == 0 && ret == 0) {
+      dpdk_events_link_status_dispatch(g_hc);
+    }
+  }
+
+  if (ec->config.keep_alive.enabled) {
+    dpdk_events_keep_alive_dispatch(g_hc);
+  }
+
+  return 0;
+}
+
+static int dpdk_events_init(void) {
+  DPDK_EVENTS_TRACE();
+
+  int ret = dpdk_events_preinit();
+  if (ret)
+    return ret;
+
+  dpdk_events_ctx_t *ec = DPDK_EVENTS_CTX_GET(g_hc);
+
+  if (ec->config.keep_alive.enabled) {
+    ret = dpdk_event_keep_alive_shm_create();
+    if (ret) {
+      ERROR(DPDK_EVENTS_PLUGIN ": %s : error %d in ka_shm_create()",
+            __FUNCTION__, ret);
+      return ret;
+    }
+  }
+  return 0;
+}
+
+static int dpdk_events_shutdown(void) {
+  DPDK_EVENTS_TRACE();
+  int ret = 0;
+
+  dpdk_events_ctx_t *ec = DPDK_EVENTS_CTX_GET(g_hc);
+  if (ec->config.keep_alive.enabled) {
+    ret = munmap(ec->config.keep_alive.shm, sizeof(dpdk_keepalive_shm_t));
+    if (ret) {
+      ERROR(DPDK_EVENTS_PLUGIN ": munmap KA monitor returned %d", ret);
+      return ret;
+    }
+  }
+
+  ret = dpdk_helper_shutdown(g_hc);
+  g_hc = NULL;
+  if (ret)
+    ERROR(DPDK_EVENTS_PLUGIN ": failed to cleanup %s helper", DPDK_EVENTS_NAME);
+
+  return ret;
+}
+
+void module_register(void) {
+  plugin_register_init(DPDK_EVENTS_PLUGIN, dpdk_events_init);
+  plugin_register_complex_config(DPDK_EVENTS_PLUGIN, dpdk_events_config);
+  plugin_register_complex_read(NULL, DPDK_EVENTS_PLUGIN, dpdk_events_read, 0,
+                               NULL);
+  plugin_register_shutdown(DPDK_EVENTS_PLUGIN, dpdk_events_shutdown);
+}