Merge branch 'collectd-5.8'
[collectd.git] / src / dpdkevents.c
index 62e28b9..6fd642b 100644 (file)
  *   Krzysztof Matczak <krzysztofx@intel.com>
  */
 
+#include "collectd.h"
+
 #include "common.h"
 #include "plugin.h"
+
 #include "semaphore.h"
 #include "sys/mman.h"
 #include "utils_dpdk.h"
 #include "utils_time.h"
-#include "collectd.h"
 
 #include <rte_config.h>
 #include <rte_eal.h>
@@ -51,8 +53,6 @@
 #define KEEPALIVE_PLUGIN_INSTANCE "keepalive"
 #define RTE_KEEPALIVE_SHM_NAME "/dpdk_keepalive_shm_name"
 
-#define RTE_VERSION_16_07 RTE_VERSION_NUM(16, 7, 0, 16)
-
 typedef struct dpdk_keepalive_shm_s {
   sem_t core_died;
   enum rte_keepalive_state core_state[RTE_KEEPALIVE_MAXCORES];
@@ -66,19 +66,20 @@ typedef struct dpdk_ka_monitor_s {
 
 typedef struct dpdk_link_status_config_s {
   int enabled;
-  int send_updated;
+  bool send_updated;
   uint32_t enabled_port_mask;
   char port_name[RTE_MAX_ETHPORTS][DATA_MAX_NAME_LEN];
-  int notify;
+  bool notify;
 } dpdk_link_status_config_t;
 
 typedef struct dpdk_keep_alive_config_s {
   int enabled;
-  int send_updated;
+  bool send_updated;
   uint128_t lcore_mask;
   dpdk_keepalive_shm_t *shm;
   char shm_name[DATA_MAX_NAME_LEN];
-  int notify;
+  bool notify;
+  int fd;
 } dpdk_keep_alive_config_t;
 
 typedef struct dpdk_events_config_s {
@@ -100,14 +101,22 @@ typedef struct dpdk_events_ctx_s {
   dpdk_ka_monitor_t core_info[RTE_KEEPALIVE_MAXCORES];
 } dpdk_events_ctx_t;
 
+typedef enum {
+  DPDK_EVENTS_STATE_CFG_ERR = 1 << 0,
+  DPDK_EVENTS_STATE_KA_CFG_ERR = 1 << 1,
+  DPDK_EVENTS_STATE_LS_CFG_ERR = 1 << 2,
+
+} dpdk_events_cfg_status;
+
 #define DPDK_EVENTS_CTX_GET(a) ((dpdk_events_ctx_t *)dpdk_helper_priv_get(a))
 
 #define DPDK_EVENTS_TRACE()                                                    \
   DEBUG("%s:%s:%d pid=%u", DPDK_EVENTS_PLUGIN, __FUNCTION__, __LINE__, getpid())
 
 static dpdk_helper_ctx_t *g_hc;
+static dpdk_events_cfg_status g_state;
 
-static int dpdk_event_keep_alive_shm_create(void) {
+static int dpdk_event_keep_alive_shm_open(void) {
   dpdk_events_ctx_t *ec = DPDK_EVENTS_CTX_GET(g_hc);
   char *shm_name;
 
@@ -120,24 +129,47 @@ static int dpdk_event_keep_alive_shm_create(void) {
             shm_name);
   }
 
-  char errbuf[ERR_BUF_SIZE];
-  int fd = shm_open(shm_name, O_RDWR, 0);
+  int fd = shm_open(shm_name, O_RDONLY, 0);
   if (fd < 0) {
     ERROR(DPDK_EVENTS_PLUGIN ": Failed to open %s as SHM:%s. Is DPDK KA "
                              "primary application running?",
-          shm_name, sstrerror(errno, errbuf, sizeof(errbuf)));
+          shm_name, STRERRNO);
     return errno;
-  } else {
-    ec->config.keep_alive.shm =
-        (dpdk_keepalive_shm_t *)mmap(0, sizeof(*(ec->config.keep_alive.shm)),
-                                     PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
-    close(fd);
-    if (ec->config.keep_alive.shm == MAP_FAILED) {
-      ERROR(DPDK_EVENTS_PLUGIN ": Failed to mmap KA SHM:%s",
-            sstrerror(errno, errbuf, sizeof(errbuf)));
-      return errno;
+  }
+
+  if (ec->config.keep_alive.fd != -1) {
+    struct stat stat_old, stat_new;
+
+    if (fstat(ec->config.keep_alive.fd, &stat_old) || fstat(fd, &stat_new)) {
+      ERROR(DPDK_EVENTS_PLUGIN ": failed to get information about a file");
+      close(fd);
+      return -1;
     }
+
+    /* Check if inode number has changed. If yes, then create a new mapping */
+    if (stat_old.st_ino == stat_new.st_ino) {
+      close(fd);
+      return 0;
+    }
+
+    if (munmap(ec->config.keep_alive.shm, sizeof(dpdk_keepalive_shm_t)) != 0) {
+      ERROR(DPDK_EVENTS_PLUGIN ": munmap KA monitor failed");
+      close(fd);
+      return -1;
+    }
+
+    close(ec->config.keep_alive.fd);
+    ec->config.keep_alive.fd = -1;
+  }
+
+  ec->config.keep_alive.shm = (dpdk_keepalive_shm_t *)mmap(
+      0, sizeof(*(ec->config.keep_alive.shm)), PROT_READ, MAP_SHARED, fd, 0);
+  if (ec->config.keep_alive.shm == MAP_FAILED) {
+    ERROR(DPDK_EVENTS_PLUGIN ": Failed to mmap KA SHM:%s", STRERRNO);
+    close(fd);
+    return errno;
   }
+  ec->config.keep_alive.fd = fd;
 
   return 0;
 }
@@ -147,24 +179,30 @@ static void dpdk_events_default_config(void) {
 
   ec->config.interval = plugin_get_interval();
 
+  /* In configless mode when no <Plugin/> section is defined in config file
+   * both link_status and keep_alive should be enabled */
+
   /* Link Status */
-  ec->config.link_status.enabled = 0;
+  ec->config.link_status.enabled = 1;
   ec->config.link_status.enabled_port_mask = ~0;
-  ec->config.link_status.send_updated = 1;
-  ec->config.link_status.notify = 0;
+  ec->config.link_status.send_updated = true;
+  ec->config.link_status.notify = false;
 
   for (int i = 0; i < RTE_MAX_ETHPORTS; i++) {
     ec->config.link_status.port_name[i][0] = 0;
   }
 
   /* Keep Alive */
-  ec->config.keep_alive.enabled = 0;
-  ec->config.keep_alive.send_updated = 1;
-  ec->config.keep_alive.notify = 0;
-  memset(&ec->config.keep_alive.lcore_mask, 0,
+  ec->config.keep_alive.enabled = 1;
+  ec->config.keep_alive.send_updated = true;
+  ec->config.keep_alive.notify = false;
+  /* by default enable 128 cores */
+  memset(&ec->config.keep_alive.lcore_mask, 1,
          sizeof(ec->config.keep_alive.lcore_mask));
   memset(&ec->config.keep_alive.shm_name, 0,
          sizeof(ec->config.keep_alive.shm_name));
+  ec->config.keep_alive.shm = MAP_FAILED;
+  ec->config.keep_alive.fd = -1;
 }
 
 static int dpdk_events_preinit(void) {
@@ -208,18 +246,26 @@ static int dpdk_events_link_status_config(dpdk_events_ctx_t *ec,
     oconfig_item_t *child = ci->children + i;
 
     if (strcasecmp("EnabledPortMask", child->key) == 0) {
-      ec->config.link_status.enabled_port_mask =
-          (uint32_t)child->values[0].value.number;
+      if (cf_util_get_int(child,
+                          (int *)&ec->config.link_status.enabled_port_mask))
+        return -1;
       DEBUG(DPDK_EVENTS_PLUGIN ": LinkStatus:Enabled Port Mask 0x%X",
             ec->config.link_status.enabled_port_mask);
     } else if (strcasecmp("SendEventsOnUpdate", child->key) == 0) {
-      ec->config.link_status.send_updated = child->values[0].value.boolean;
+      if (cf_util_get_boolean(child, &ec->config.link_status.send_updated))
+        return -1;
       DEBUG(DPDK_EVENTS_PLUGIN ": LinkStatus:SendEventsOnUpdate %d",
-            (int)child->values[0].value.boolean);
+            ec->config.link_status.send_updated);
     } else if (strcasecmp("SendNotification", child->key) == 0) {
-      ec->config.link_status.notify = child->values[0].value.boolean;
+      if (cf_util_get_boolean(child, &ec->config.link_status.notify))
+        return -1;
+
       DEBUG(DPDK_EVENTS_PLUGIN ": LinkStatus:SendNotification %d",
-            (int)child->values[0].value.boolean);
+            ec->config.link_status.notify);
+    } else if (strcasecmp("PortName", child->key) != 0) {
+      ERROR(DPDK_EVENTS_PLUGIN ": unrecognized configuration option %s.",
+            child->key);
+      return -1;
     }
   }
 
@@ -231,8 +277,12 @@ static int dpdk_events_link_status_config(dpdk_events_ctx_t *ec,
     if (strcasecmp("PortName", child->key) == 0) {
       while (!(ec->config.link_status.enabled_port_mask & (1 << port_num)))
         port_num++;
-      ssnprintf(ec->config.link_status.port_name[port_num], DATA_MAX_NAME_LEN,
-                "%s", child->values[0].value.string);
+
+      if (cf_util_get_string_buffer(
+              child, ec->config.link_status.port_name[port_num],
+              sizeof(ec->config.link_status.port_name[port_num]))) {
+        return -1;
+      }
       DEBUG(DPDK_EVENTS_PLUGIN ": LinkStatus:Port %d Name: %s", port_num,
             ec->config.link_status.port_name[port_num]);
       port_num++;
@@ -251,28 +301,38 @@ static int dpdk_events_keep_alive_config(dpdk_events_ctx_t *ec,
     oconfig_item_t *child = ci->children + i;
 
     if (strcasecmp("SendEventsOnUpdate", child->key) == 0) {
-      ec->config.keep_alive.send_updated = child->values[0].value.boolean;
+      if (cf_util_get_boolean(child, &ec->config.keep_alive.send_updated))
+        return -1;
+
       DEBUG(DPDK_EVENTS_PLUGIN ": KeepAlive:SendEventsOnUpdate %d",
-            (int)child->values[0].value.boolean);
+            ec->config.keep_alive.send_updated);
     } else if (strcasecmp("LCoreMask", child->key) == 0) {
       char lcore_mask[DATA_MAX_NAME_LEN];
-      ssnprintf(lcore_mask, sizeof(lcore_mask), "%s",
-                child->values[0].value.string);
+
+      if (cf_util_get_string_buffer(child, lcore_mask, sizeof(lcore_mask)))
+        return -1;
       ec->config.keep_alive.lcore_mask =
           str_to_uint128(lcore_mask, strlen(lcore_mask));
       DEBUG(DPDK_EVENTS_PLUGIN ": KeepAlive:LCoreMask 0x%" PRIX64 "%" PRIX64 "",
             ec->config.keep_alive.lcore_mask.high,
             ec->config.keep_alive.lcore_mask.low);
     } else if (strcasecmp("KeepAliveShmName", child->key) == 0) {
-      ssnprintf(ec->config.keep_alive.shm_name,
-                sizeof(ec->config.keep_alive.shm_name), "%s",
-                child->values[0].value.string);
+      if (cf_util_get_string_buffer(child, ec->config.keep_alive.shm_name,
+                                    sizeof(ec->config.keep_alive.shm_name)))
+        return -1;
+
       DEBUG(DPDK_EVENTS_PLUGIN ": KeepAlive:KeepAliveShmName %s",
             ec->config.keep_alive.shm_name);
     } else if (strcasecmp("SendNotification", child->key) == 0) {
-      ec->config.keep_alive.notify = child->values[0].value.boolean;
+      if (cf_util_get_boolean(child, &ec->config.keep_alive.notify))
+        return -1;
+
       DEBUG(DPDK_EVENTS_PLUGIN ": KeepAlive:SendNotification %d",
-            (int)child->values[0].value.boolean);
+            ec->config.keep_alive.notify);
+    } else {
+      ERROR(DPDK_EVENTS_PLUGIN ": unrecognized configuration option %s.",
+            child->key);
+      return -1;
     }
   }
 
@@ -281,30 +341,79 @@ static int dpdk_events_keep_alive_config(dpdk_events_ctx_t *ec,
 
 static int dpdk_events_config(oconfig_item_t *ci) {
   DPDK_EVENTS_TRACE();
-
   int ret = dpdk_events_preinit();
-  if (ret)
-    return ret;
+  if (ret) {
+    g_state |= DPDK_EVENTS_STATE_CFG_ERR;
+    return 0;
+  }
 
   dpdk_events_ctx_t *ec = DPDK_EVENTS_CTX_GET(g_hc);
 
+  /* Disabling link_status and keep_alive since <Plugin/> config section
+   * specifies if those should be enabled */
+  ec->config.keep_alive.enabled = ec->config.link_status.enabled = 0;
+  memset(&ec->config.keep_alive.lcore_mask, 0,
+         sizeof(ec->config.keep_alive.lcore_mask));
+
   for (int i = 0; i < ci->children_num; i++) {
     oconfig_item_t *child = ci->children + i;
-    if (strcasecmp("EAL", child->key) == 0) {
-      dpdk_helper_eal_config_parse(g_hc, child);
-    } else if (strcasecmp("Event", child->key) == 0) {
-      if (strcasecmp(child->values[0].value.string, "link_status") == 0) {
-        dpdk_events_link_status_config(ec, child);
-      } else if (strcasecmp(child->values[0].value.string, "keep_alive") == 0) {
-        dpdk_events_keep_alive_config(ec, child);
+
+    if (strcasecmp("EAL", child->key) == 0)
+      ret = dpdk_helper_eal_config_parse(g_hc, child);
+    else if (strcasecmp("Event", child->key) == 0) {
+      char event_type[DATA_MAX_NAME_LEN];
+
+      if (cf_util_get_string_buffer(child, event_type, sizeof(event_type)))
+        ret = -1;
+      else if (strcasecmp(event_type, "link_status") == 0) {
+        ret = dpdk_events_link_status_config(ec, child);
+        if (ret) {
+          g_state |= DPDK_EVENTS_STATE_LS_CFG_ERR;
+          continue;
+        }
+      } else if (strcasecmp(event_type, "keep_alive") == 0) {
+        ret = dpdk_events_keep_alive_config(ec, child);
+        if (ret) {
+          g_state |= DPDK_EVENTS_STATE_KA_CFG_ERR;
+          continue;
+        }
       } else {
         ERROR(DPDK_EVENTS_PLUGIN ": The selected event \"%s\" is unknown.",
-              child->values[0].value.string);
+              event_type);
+        ret = -1;
       }
+    } else {
+      ERROR(DPDK_EVENTS_PLUGIN ": unrecognized configuration option %s.",
+            child->key);
+      ret = -1;
+    }
+
+    if (ret != 0) {
+      g_state |= DPDK_EVENTS_STATE_CFG_ERR;
+      return 0;
     }
   }
 
-  return ret;
+  if (g_state & DPDK_EVENTS_STATE_KA_CFG_ERR) {
+    ERROR(DPDK_EVENTS_PLUGIN
+          ": Invalid keep alive configuration. Event disabled.");
+    ec->config.keep_alive.enabled = 0;
+  }
+
+  if (g_state & DPDK_EVENTS_STATE_LS_CFG_ERR) {
+    ERROR(DPDK_EVENTS_PLUGIN
+          ": Invalid link status configuration. Event disabled.");
+    ec->config.link_status.enabled = 0;
+  }
+
+  if (!ec->config.keep_alive.enabled && !ec->config.link_status.enabled) {
+    ERROR(DPDK_EVENTS_PLUGIN ": At least one type of events should be "
+                             "configured for collecting. Plugin misconfigured");
+    g_state |= DPDK_EVENTS_STATE_CFG_ERR;
+    return 0;
+  }
+
+  return 0;
 }
 
 static int dpdk_helper_link_status_get(dpdk_helper_ctx_t *phc) {
@@ -350,37 +459,35 @@ int dpdk_helper_command_handler(dpdk_helper_ctx_t *phc, enum DPDK_CMD cmd) {
   }
 
   dpdk_events_ctx_t *ec = DPDK_EVENTS_CTX_GET(phc);
-
+  int ret = 0;
   if (ec->config.link_status.enabled)
-    dpdk_helper_link_status_get(phc);
+    ret = dpdk_helper_link_status_get(phc);
 
-  return 0;
+  return ret;
 }
 
 static void dpdk_events_notification_dispatch(int severity,
-                                              char *plugin_instance,
-                                              cdtime_t time, char *msg) {
-  notification_t n = {0};
-  n.severity = severity;
-  n.time = time;
+                                              const char *plugin_instance,
+                                              cdtime_t time, const char *msg) {
+  notification_t n = {
+      .severity = severity, .time = time, .plugin = DPDK_EVENTS_PLUGIN};
   sstrncpy(n.host, hostname_g, sizeof(n.host));
-  sstrncpy(n.plugin, DPDK_EVENTS_PLUGIN, sizeof(n.plugin));
   sstrncpy(n.plugin_instance, plugin_instance, sizeof(n.plugin_instance));
   sstrncpy(n.message, msg, sizeof(n.message));
   plugin_dispatch_notification(&n);
 }
 
-static void dpdk_events_gauge_submit(char *plugin_instance, char *type,
-                                     gauge_t value, cdtime_t time) {
+static void dpdk_events_gauge_submit(const char *plugin_instance,
+                                     const char *type_instance, gauge_t value,
+                                     cdtime_t time) {
   value_list_t vl = {.values = &(value_t){.gauge = value},
                      .values_len = 1,
                      .time = time,
                      .plugin = DPDK_EVENTS_PLUGIN,
                      .type = "gauge",
                      .meta = NULL};
-  sstrncpy(vl.host, hostname_g, sizeof(vl.host));
   sstrncpy(vl.plugin_instance, plugin_instance, sizeof(vl.plugin_instance));
-  sstrncpy(vl.type_instance, type, sizeof(vl.type_instance));
+  sstrncpy(vl.type_instance, type_instance, sizeof(vl.type_instance));
   plugin_dispatch_values(&vl);
 }
 
@@ -400,17 +507,17 @@ static int dpdk_events_link_status_dispatch(dpdk_helper_ctx_t *phc) {
 
         char dev_name[DATA_MAX_NAME_LEN];
         if (ec->config.link_status.port_name[i][0] != 0) {
-          ssnprintf(dev_name, sizeof(dev_name), "%s",
-                    ec->config.link_status.port_name[i]);
+          snprintf(dev_name, sizeof(dev_name), "%s",
+                   ec->config.link_status.port_name[i]);
         } else {
-          ssnprintf(dev_name, sizeof(dev_name), "port.%d", i);
+          snprintf(dev_name, sizeof(dev_name), "port.%d", i);
         }
 
         if (ec->config.link_status.notify) {
           int sev = ec->link_info[i].link_status ? NOTIF_OKAY : NOTIF_WARNING;
           char msg[DATA_MAX_NAME_LEN];
-          ssnprintf(msg, sizeof(msg), "Link Status: %s",
-                    ec->link_info[i].link_status ? "UP" : "DOWN");
+          snprintf(msg, sizeof(msg), "Link Status: %s",
+                   ec->link_info[i].link_status ? "UP" : "DOWN");
           dpdk_events_notification_dispatch(sev, dev_name,
                                             ec->link_info[i].read_time, msg);
         } else {
@@ -426,7 +533,7 @@ static int dpdk_events_link_status_dispatch(dpdk_helper_ctx_t *phc) {
   return 0;
 }
 
-static int dpdk_events_keep_alive_dispatch(dpdk_helper_ctx_t *phc) {
+static void dpdk_events_keep_alive_dispatch(dpdk_helper_ctx_t *phc) {
   dpdk_events_ctx_t *ec = DPDK_EVENTS_CTX_GET(phc);
 
   /* dispatch Keep Alive values to collectd */
@@ -446,7 +553,7 @@ static int dpdk_events_keep_alive_dispatch(dpdk_helper_ctx_t *phc) {
     }
 
     char core_name[DATA_MAX_NAME_LEN];
-    ssnprintf(core_name, sizeof(core_name), "lcore%u", i);
+    snprintf(core_name, sizeof(core_name), "lcore%u", i);
 
     if (!ec->config.keep_alive.send_updated ||
         (ec->core_info[i].lcore_state !=
@@ -454,7 +561,6 @@ static int dpdk_events_keep_alive_dispatch(dpdk_helper_ctx_t *phc) {
       ec->core_info[i].lcore_state = ec->config.keep_alive.shm->core_state[i];
       ec->core_info[i].read_time = cdtime();
 
-#if RTE_VERSION >= RTE_VERSION_16_07
       if (ec->config.keep_alive.notify) {
         char msg[DATA_MAX_NAME_LEN];
         int sev;
@@ -462,34 +568,34 @@ static int dpdk_events_keep_alive_dispatch(dpdk_helper_ctx_t *phc) {
         switch (ec->config.keep_alive.shm->core_state[i]) {
         case RTE_KA_STATE_ALIVE:
           sev = NOTIF_OKAY;
-          ssnprintf(msg, sizeof(msg), "lcore %u Keep Alive Status: ALIVE", i);
+          snprintf(msg, sizeof(msg), "lcore %u Keep Alive Status: ALIVE", i);
           break;
         case RTE_KA_STATE_MISSING:
-          ssnprintf(msg, sizeof(msg), "lcore %u Keep Alive Status: MISSING", i);
+          snprintf(msg, sizeof(msg), "lcore %u Keep Alive Status: MISSING", i);
           sev = NOTIF_WARNING;
           break;
         case RTE_KA_STATE_DEAD:
-          ssnprintf(msg, sizeof(msg), "lcore %u Keep Alive Status: DEAD", i);
+          snprintf(msg, sizeof(msg), "lcore %u Keep Alive Status: DEAD", i);
           sev = NOTIF_FAILURE;
           break;
         case RTE_KA_STATE_UNUSED:
-          ssnprintf(msg, sizeof(msg), "lcore %u Keep Alive Status: UNUSED", i);
+          snprintf(msg, sizeof(msg), "lcore %u Keep Alive Status: UNUSED", i);
           sev = NOTIF_OKAY;
           break;
         case RTE_KA_STATE_GONE:
-          ssnprintf(msg, sizeof(msg), "lcore %u Keep Alive Status: GONE", i);
+          snprintf(msg, sizeof(msg), "lcore %u Keep Alive Status: GONE", i);
           sev = NOTIF_FAILURE;
           break;
         case RTE_KA_STATE_DOZING:
-          ssnprintf(msg, sizeof(msg), "lcore %u Keep Alive Status: DOZING", i);
+          snprintf(msg, sizeof(msg), "lcore %u Keep Alive Status: DOZING", i);
           sev = NOTIF_OKAY;
           break;
         case RTE_KA_STATE_SLEEP:
-          ssnprintf(msg, sizeof(msg), "lcore %u Keep Alive Status: SLEEP", i);
+          snprintf(msg, sizeof(msg), "lcore %u Keep Alive Status: SLEEP", i);
           sev = NOTIF_OKAY;
           break;
         default:
-          ssnprintf(msg, sizeof(msg), "lcore %u Keep Alive Status: UNKNOWN", i);
+          snprintf(msg, sizeof(msg), "lcore %u Keep Alive Status: UNKNOWN", i);
           sev = NOTIF_FAILURE;
         }
 
@@ -500,15 +606,8 @@ static int dpdk_events_keep_alive_dispatch(dpdk_helper_ctx_t *phc) {
                                  ec->config.keep_alive.shm->core_state[i],
                                  ec->core_info[i].read_time);
       }
-#else
-      dpdk_events_gauge_submit(KEEPALIVE_PLUGIN_INSTANCE, core_name,
-                               ec->config.keep_alive.shm->core_state[i],
-                               ec->core_info[i].read_time);
-#endif /* #if RTE_VERSION >= RTE_VERSION_16_07 */
     }
   }
-
-  return 0;
 }
 
 static int dpdk_events_read(user_data_t *ud) {
@@ -516,71 +615,80 @@ static int dpdk_events_read(user_data_t *ud) {
 
   if (g_hc == NULL) {
     ERROR(DPDK_EVENTS_PLUGIN ": plugin not initialized.");
-    return -EINVAL;
+    return -1;
   }
 
   dpdk_events_ctx_t *ec = DPDK_EVENTS_CTX_GET(g_hc);
+  int ls_ret = -1, ka_ret = -1;
 
-  if (!ec->config.keep_alive.enabled && !ec->config.link_status.enabled) {
-    /* nothing to do */
-    return 0;
-  }
-
+  int cmd_res = 0;
   if (ec->config.link_status.enabled) {
-    int cmd_res = 0;
-    int ret = dpdk_helper_command(g_hc, DPDK_CMD_GET_EVENTS, &cmd_res,
-                                  ec->config.interval);
-    if (cmd_res == 0 && ret == 0) {
+    ls_ret = dpdk_helper_command(g_hc, DPDK_CMD_GET_EVENTS, &cmd_res,
+                                 ec->config.interval);
+    if (cmd_res == 0 && ls_ret == 0) {
       dpdk_events_link_status_dispatch(g_hc);
     }
   }
 
   if (ec->config.keep_alive.enabled) {
-    dpdk_events_keep_alive_dispatch(g_hc);
+    ka_ret = dpdk_event_keep_alive_shm_open();
+    if (ka_ret) {
+      ERROR(DPDK_EVENTS_PLUGIN
+            ": %s : error %d in dpdk_event_keep_alive_shm_open()",
+            __FUNCTION__, ka_ret);
+    } else
+      dpdk_events_keep_alive_dispatch(g_hc);
+  }
+
+  if (!((cmd_res || ls_ret) == 0 || ka_ret == 0)) {
+    ERROR(DPDK_EVENTS_PLUGIN ": Read failure for all enabled event types");
+    return -1;
   }
 
   return 0;
 }
 
-static int dpdk_events_init(void) {
+static int dpdk_events_shutdown(void) {
   DPDK_EVENTS_TRACE();
 
-  int ret = dpdk_events_preinit();
-  if (ret)
-    return ret;
+  if (g_hc == NULL)
+    return 0;
 
   dpdk_events_ctx_t *ec = DPDK_EVENTS_CTX_GET(g_hc);
-
   if (ec->config.keep_alive.enabled) {
-    ret = dpdk_event_keep_alive_shm_create();
-    if (ret) {
-      ERROR(DPDK_EVENTS_PLUGIN ": %s : error %d in ka_shm_create()",
-            __FUNCTION__, ret);
-      return ret;
+    if (ec->config.keep_alive.fd != -1) {
+      close(ec->config.keep_alive.fd);
+      ec->config.keep_alive.fd = -1;
+    }
+
+    if (ec->config.keep_alive.shm != MAP_FAILED) {
+      if (munmap(ec->config.keep_alive.shm, sizeof(dpdk_keepalive_shm_t))) {
+        ERROR(DPDK_EVENTS_PLUGIN ": munmap KA monitor failed");
+        return -1;
+      }
+      ec->config.keep_alive.shm = MAP_FAILED;
     }
   }
+
+  dpdk_helper_shutdown(g_hc);
+  g_hc = NULL;
+
   return 0;
 }
 
-static int dpdk_events_shutdown(void) {
+static int dpdk_events_init(void) {
   DPDK_EVENTS_TRACE();
-  int ret = 0;
 
-  dpdk_events_ctx_t *ec = DPDK_EVENTS_CTX_GET(g_hc);
-  if (ec->config.keep_alive.enabled) {
-    ret = munmap(ec->config.keep_alive.shm, sizeof(dpdk_keepalive_shm_t));
-    if (ret) {
-      ERROR(DPDK_EVENTS_PLUGIN ": munmap KA monitor returned %d", ret);
-      return ret;
-    }
+  if (g_state & DPDK_EVENTS_STATE_CFG_ERR) {
+    dpdk_events_shutdown();
+    return -1;
   }
 
-  ret = dpdk_helper_shutdown(g_hc);
-  g_hc = NULL;
+  int ret = dpdk_events_preinit();
   if (ret)
-    ERROR(DPDK_EVENTS_PLUGIN ": failed to cleanup %s helper", DPDK_EVENTS_NAME);
+    return ret;
 
-  return ret;
+  return 0;
 }
 
 void module_register(void) {