dpdkevents: initialization fixes
[collectd.git] / src / dpdkevents.c
1 /*
2  * collectd - src/dpdkevents.c
3  * MIT License
4  *
5  * Copyright(c) 2017 Intel Corporation. All rights reserved.
6  *
7  * Permission is hereby granted, free of charge, to any person obtaining a copy
8  * of this software and associated documentation files (the "Software"), to deal
9  * in the Software without restriction, including without limitation the rights
10  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11  * copies of the Software, and to permit persons to whom the Software is
12  * furnished to do so, subject to the following conditions:
13  *
14  * The above copyright notice and this permission notice shall be included in
15  * all copies or substantial portions of the Software.
16  *
17  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
23  * SOFTWARE.
24  *
25  * Authors:
26  *   Maryam Tahhan <maryam.tahhan@intel.com>
27  *   Harry van Haaren <harry.van.haaren@intel.com>
28  *   Serhiy Pshyk <serhiyx.pshyk@intel.com>
29  *   Kim-Marie Jones <kim-marie.jones@intel.com>
30  *   Krzysztof Matczak <krzysztofx@intel.com>
31  */
32
33 #include "collectd.h"
34
35 #include "common.h"
36 #include "plugin.h"
37
38 #include "semaphore.h"
39 #include "sys/mman.h"
40 #include "utils_dpdk.h"
41 #include "utils_time.h"
42
43 #include <rte_config.h>
44 #include <rte_eal.h>
45 #include <rte_ethdev.h>
46 #include <rte_keepalive.h>
47
48 #define DPDK_EVENTS_PLUGIN "dpdkevents"
49 #define DPDK_EVENTS_NAME "dpdk_collectd_events"
50 #define ETH_LINK_NA 0xFF
51
52 #define INT64_BIT_SIZE 64
53 #define KEEPALIVE_PLUGIN_INSTANCE "keepalive"
54 #define RTE_KEEPALIVE_SHM_NAME "/dpdk_keepalive_shm_name"
55
56 typedef struct dpdk_keepalive_shm_s {
57   sem_t core_died;
58   enum rte_keepalive_state core_state[RTE_KEEPALIVE_MAXCORES];
59   uint64_t core_last_seen_times[RTE_KEEPALIVE_MAXCORES];
60 } dpdk_keepalive_shm_t;
61
62 typedef struct dpdk_ka_monitor_s {
63   cdtime_t read_time;
64   int lcore_state;
65 } dpdk_ka_monitor_t;
66
67 typedef struct dpdk_link_status_config_s {
68   int enabled;
69   int send_updated;
70   uint32_t enabled_port_mask;
71   char port_name[RTE_MAX_ETHPORTS][DATA_MAX_NAME_LEN];
72   int notify;
73 } dpdk_link_status_config_t;
74
75 typedef struct dpdk_keep_alive_config_s {
76   int enabled;
77   int send_updated;
78   uint128_t lcore_mask;
79   dpdk_keepalive_shm_t *shm;
80   char shm_name[DATA_MAX_NAME_LEN];
81   int notify;
82   int fd;
83 } dpdk_keep_alive_config_t;
84
85 typedef struct dpdk_events_config_s {
86   cdtime_t interval;
87   dpdk_link_status_config_t link_status;
88   dpdk_keep_alive_config_t keep_alive;
89 } dpdk_events_config_t;
90
91 typedef struct dpdk_link_info_s {
92   cdtime_t read_time;
93   int status_updated;
94   int link_status;
95 } dpdk_link_info_t;
96
97 typedef struct dpdk_events_ctx_s {
98   dpdk_events_config_t config;
99   uint32_t nb_ports;
100   dpdk_link_info_t link_info[RTE_MAX_ETHPORTS];
101   dpdk_ka_monitor_t core_info[RTE_KEEPALIVE_MAXCORES];
102 } dpdk_events_ctx_t;
103
104 #define DPDK_EVENTS_CTX_GET(a) ((dpdk_events_ctx_t *)dpdk_helper_priv_get(a))
105
106 #define DPDK_EVENTS_TRACE()                                                    \
107   DEBUG("%s:%s:%d pid=%u", DPDK_EVENTS_PLUGIN, __FUNCTION__, __LINE__, getpid())
108
109 static dpdk_helper_ctx_t *g_hc;
110
111 static int dpdk_event_keep_alive_shm_open(void) {
112   dpdk_events_ctx_t *ec = DPDK_EVENTS_CTX_GET(g_hc);
113   char *shm_name;
114
115   if (strlen(ec->config.keep_alive.shm_name)) {
116     shm_name = ec->config.keep_alive.shm_name;
117   } else {
118     shm_name = RTE_KEEPALIVE_SHM_NAME;
119     WARNING(DPDK_EVENTS_PLUGIN ": Keep alive shared memory identifier is not "
120                                "specified, using default one: %s",
121             shm_name);
122   }
123
124   char errbuf[ERR_BUF_SIZE];
125   int fd = shm_open(shm_name, O_RDONLY, 0);
126   if (fd < 0) {
127     ERROR(DPDK_EVENTS_PLUGIN ": Failed to open %s as SHM:%s. Is DPDK KA "
128                              "primary application running?",
129           shm_name, sstrerror(errno, errbuf, sizeof(errbuf)));
130     return errno;
131   }
132
133   if (ec->config.keep_alive.fd != -1) {
134     struct stat stat_old, stat_new;
135
136     if (fstat(ec->config.keep_alive.fd, &stat_old) || fstat(fd, &stat_new)) {
137       ERROR(DPDK_EVENTS_PLUGIN ": failed to get information about a file");
138       close(fd);
139       return -1;
140     }
141
142     /* Check if inode number has changed. If yes, then create a new mapping */
143     if (stat_old.st_ino == stat_new.st_ino) {
144       close(fd);
145       return 0;
146     }
147
148     if (munmap(ec->config.keep_alive.shm, sizeof(dpdk_keepalive_shm_t)) != 0) {
149       ERROR(DPDK_EVENTS_PLUGIN ": munmap KA monitor failed");
150       close(fd);
151       return -1;
152     }
153
154     close(ec->config.keep_alive.fd);
155     ec->config.keep_alive.fd = -1;
156   }
157
158   ec->config.keep_alive.shm = (dpdk_keepalive_shm_t *)mmap(
159       0, sizeof(*(ec->config.keep_alive.shm)), PROT_READ, MAP_SHARED, fd, 0);
160   if (ec->config.keep_alive.shm == MAP_FAILED) {
161     ERROR(DPDK_EVENTS_PLUGIN ": Failed to mmap KA SHM:%s",
162           sstrerror(errno, errbuf, sizeof(errbuf)));
163     close(fd);
164     return errno;
165   }
166   ec->config.keep_alive.fd = fd;
167
168   return 0;
169 }
170
171 static void dpdk_events_default_config(void) {
172   dpdk_events_ctx_t *ec = DPDK_EVENTS_CTX_GET(g_hc);
173
174   ec->config.interval = plugin_get_interval();
175
176   /* In configless mode when no <Plugin/> section is defined in config file
177    * both link_status and keep_alive should be enabled */
178
179   /* Link Status */
180   ec->config.link_status.enabled = 1;
181   ec->config.link_status.enabled_port_mask = ~0;
182   ec->config.link_status.send_updated = 1;
183   ec->config.link_status.notify = 0;
184
185   for (int i = 0; i < RTE_MAX_ETHPORTS; i++) {
186     ec->config.link_status.port_name[i][0] = 0;
187   }
188
189   /* Keep Alive */
190   ec->config.keep_alive.enabled = 1;
191   ec->config.keep_alive.send_updated = 1;
192   ec->config.keep_alive.notify = 0;
193   /* by default enable 128 cores */
194   memset(&ec->config.keep_alive.lcore_mask, 1,
195          sizeof(ec->config.keep_alive.lcore_mask));
196   memset(&ec->config.keep_alive.shm_name, 0,
197          sizeof(ec->config.keep_alive.shm_name));
198   ec->config.keep_alive.shm = MAP_FAILED;
199   ec->config.keep_alive.fd = -1;
200 }
201
202 static int dpdk_events_preinit(void) {
203   DPDK_EVENTS_TRACE();
204
205   if (g_hc != NULL) {
206     /* already initialized if config callback was called before init callback */
207     DEBUG("dpdk_events_preinit: helper already initialized.");
208     return 0;
209   }
210
211   int ret =
212       dpdk_helper_init(DPDK_EVENTS_NAME, sizeof(dpdk_events_ctx_t), &g_hc);
213   if (ret != 0) {
214     ERROR(DPDK_EVENTS_PLUGIN ": failed to initialize %s helper(error: %s)",
215           DPDK_EVENTS_NAME, strerror(ret));
216     return ret;
217   }
218
219   dpdk_events_default_config();
220
221   dpdk_events_ctx_t *ec = DPDK_EVENTS_CTX_GET(g_hc);
222   for (int i = 0; i < RTE_MAX_ETHPORTS; i++) {
223     ec->link_info[i].link_status = ETH_LINK_NA;
224   }
225
226   for (int i = 0; i < RTE_KEEPALIVE_MAXCORES; i++) {
227     ec->core_info[i].lcore_state = ETH_LINK_NA;
228   }
229
230   return ret;
231 }
232
233 static int dpdk_events_link_status_config(dpdk_events_ctx_t *ec,
234                                           oconfig_item_t *ci) {
235   ec->config.link_status.enabled = 1;
236
237   DEBUG(DPDK_EVENTS_PLUGIN ": Subscribed for Link Status Events.");
238
239   for (int i = 0; i < ci->children_num; i++) {
240     oconfig_item_t *child = ci->children + i;
241
242     if (strcasecmp("EnabledPortMask", child->key) == 0) {
243       ec->config.link_status.enabled_port_mask =
244           (uint32_t)child->values[0].value.number;
245       DEBUG(DPDK_EVENTS_PLUGIN ": LinkStatus:Enabled Port Mask 0x%X",
246             ec->config.link_status.enabled_port_mask);
247     } else if (strcasecmp("SendEventsOnUpdate", child->key) == 0) {
248       ec->config.link_status.send_updated = child->values[0].value.boolean;
249       DEBUG(DPDK_EVENTS_PLUGIN ": LinkStatus:SendEventsOnUpdate %d",
250             (int)child->values[0].value.boolean);
251     } else if (strcasecmp("SendNotification", child->key) == 0) {
252       ec->config.link_status.notify = child->values[0].value.boolean;
253       DEBUG(DPDK_EVENTS_PLUGIN ": LinkStatus:SendNotification %d",
254             (int)child->values[0].value.boolean);
255     }
256   }
257
258   int port_num = 0;
259
260   /* parse port names after EnabledPortMask was parsed */
261   for (int i = 0; i < ci->children_num; i++) {
262     oconfig_item_t *child = ci->children + i;
263     if (strcasecmp("PortName", child->key) == 0) {
264       while (!(ec->config.link_status.enabled_port_mask & (1 << port_num)))
265         port_num++;
266       ssnprintf(ec->config.link_status.port_name[port_num], DATA_MAX_NAME_LEN,
267                 "%s", child->values[0].value.string);
268       DEBUG(DPDK_EVENTS_PLUGIN ": LinkStatus:Port %d Name: %s", port_num,
269             ec->config.link_status.port_name[port_num]);
270       port_num++;
271     }
272   }
273
274   return 0;
275 }
276
277 static int dpdk_events_keep_alive_config(dpdk_events_ctx_t *ec,
278                                          oconfig_item_t *ci) {
279   ec->config.keep_alive.enabled = 1;
280   DEBUG(DPDK_EVENTS_PLUGIN ": Subscribed for Keep Alive Events.");
281
282   for (int i = 0; i < ci->children_num; i++) {
283     oconfig_item_t *child = ci->children + i;
284
285     if (strcasecmp("SendEventsOnUpdate", child->key) == 0) {
286       ec->config.keep_alive.send_updated = child->values[0].value.boolean;
287       DEBUG(DPDK_EVENTS_PLUGIN ": KeepAlive:SendEventsOnUpdate %d",
288             (int)child->values[0].value.boolean);
289     } else if (strcasecmp("LCoreMask", child->key) == 0) {
290       char lcore_mask[DATA_MAX_NAME_LEN];
291       ssnprintf(lcore_mask, sizeof(lcore_mask), "%s",
292                 child->values[0].value.string);
293       ec->config.keep_alive.lcore_mask =
294           str_to_uint128(lcore_mask, strlen(lcore_mask));
295       DEBUG(DPDK_EVENTS_PLUGIN ": KeepAlive:LCoreMask 0x%" PRIX64 "%" PRIX64 "",
296             ec->config.keep_alive.lcore_mask.high,
297             ec->config.keep_alive.lcore_mask.low);
298     } else if (strcasecmp("KeepAliveShmName", child->key) == 0) {
299       ssnprintf(ec->config.keep_alive.shm_name,
300                 sizeof(ec->config.keep_alive.shm_name), "%s",
301                 child->values[0].value.string);
302       DEBUG(DPDK_EVENTS_PLUGIN ": KeepAlive:KeepAliveShmName %s",
303             ec->config.keep_alive.shm_name);
304     } else if (strcasecmp("SendNotification", child->key) == 0) {
305       ec->config.keep_alive.notify = child->values[0].value.boolean;
306       DEBUG(DPDK_EVENTS_PLUGIN ": KeepAlive:SendNotification %d",
307             (int)child->values[0].value.boolean);
308     }
309   }
310
311   return 0;
312 }
313
314 static int dpdk_events_config(oconfig_item_t *ci) {
315   DPDK_EVENTS_TRACE();
316   int ret = dpdk_events_preinit();
317   if (ret)
318     return ret;
319
320   dpdk_events_ctx_t *ec = DPDK_EVENTS_CTX_GET(g_hc);
321
322   /* Disabling link_status and keep_alive since <Plugin/> config section
323    * specifies if those should be enabled */
324   ec->config.keep_alive.enabled = ec->config.link_status.enabled = 0;
325   memset(&ec->config.keep_alive.lcore_mask, 0,
326          sizeof(ec->config.keep_alive.lcore_mask));
327
328   for (int i = 0; i < ci->children_num; i++) {
329     oconfig_item_t *child = ci->children + i;
330     if (strcasecmp("EAL", child->key) == 0) {
331       dpdk_helper_eal_config_parse(g_hc, child);
332     } else if (strcasecmp("Event", child->key) == 0) {
333       if (strcasecmp(child->values[0].value.string, "link_status") == 0) {
334         dpdk_events_link_status_config(ec, child);
335       } else if (strcasecmp(child->values[0].value.string, "keep_alive") == 0) {
336         dpdk_events_keep_alive_config(ec, child);
337       } else {
338         ERROR(DPDK_EVENTS_PLUGIN ": The selected event \"%s\" is unknown.",
339               child->values[0].value.string);
340       }
341     }
342   }
343
344   if (!ec->config.keep_alive.enabled && !ec->config.link_status.enabled) {
345     ERROR(DPDK_EVENTS_PLUGIN ": At least one type of events should be "
346                              "configured for collecting. Plugin misconfigured");
347     return -1;
348   }
349
350   return ret;
351 }
352
353 static int dpdk_helper_link_status_get(dpdk_helper_ctx_t *phc) {
354   dpdk_events_ctx_t *ec = DPDK_EVENTS_CTX_GET(phc);
355
356   /* get Link Status values from DPDK */
357   uint8_t nb_ports = rte_eth_dev_count();
358   if (nb_ports == 0) {
359     DPDK_CHILD_LOG("dpdkevent-helper: No DPDK ports available. "
360                    "Check bound devices to DPDK driver.\n");
361     return -ENODEV;
362   }
363   ec->nb_ports = nb_ports > RTE_MAX_ETHPORTS ? RTE_MAX_ETHPORTS : nb_ports;
364
365   for (int i = 0; i < ec->nb_ports; i++) {
366     if (ec->config.link_status.enabled_port_mask & (1 << i)) {
367       struct rte_eth_link link;
368       ec->link_info[i].read_time = cdtime();
369       rte_eth_link_get_nowait(i, &link);
370       if ((link.link_status == ETH_LINK_NA) ||
371           (link.link_status != ec->link_info[i].link_status)) {
372         ec->link_info[i].link_status = link.link_status;
373         ec->link_info[i].status_updated = 1;
374         DPDK_CHILD_LOG(" === PORT %d Link Status: %s\n", i,
375                        link.link_status ? "UP" : "DOWN");
376       }
377     }
378   }
379
380   return 0;
381 }
382
383 /* this function is called from helper context */
384 int dpdk_helper_command_handler(dpdk_helper_ctx_t *phc, enum DPDK_CMD cmd) {
385   if (phc == NULL) {
386     DPDK_CHILD_LOG(DPDK_EVENTS_PLUGIN ": Invalid argument(phc)\n");
387     return -EINVAL;
388   }
389
390   if (cmd != DPDK_CMD_GET_EVENTS) {
391     DPDK_CHILD_LOG(DPDK_EVENTS_PLUGIN ": Unknown command (cmd=%d)\n", cmd);
392     return -EINVAL;
393   }
394
395   dpdk_events_ctx_t *ec = DPDK_EVENTS_CTX_GET(phc);
396   int ret = 0;
397   if (ec->config.link_status.enabled)
398     ret = dpdk_helper_link_status_get(phc);
399
400   return ret;
401 }
402
403 static void dpdk_events_notification_dispatch(int severity,
404                                               const char *plugin_instance,
405                                               cdtime_t time, const char *msg) {
406   notification_t n = {
407       .severity = severity, .time = time, .plugin = DPDK_EVENTS_PLUGIN};
408   sstrncpy(n.host, hostname_g, sizeof(n.host));
409   sstrncpy(n.plugin_instance, plugin_instance, sizeof(n.plugin_instance));
410   sstrncpy(n.message, msg, sizeof(n.message));
411   plugin_dispatch_notification(&n);
412 }
413
414 static void dpdk_events_gauge_submit(const char *plugin_instance,
415                                      const char *type_instance, gauge_t value,
416                                      cdtime_t time) {
417   value_list_t vl = {.values = &(value_t){.gauge = value},
418                      .values_len = 1,
419                      .time = time,
420                      .plugin = DPDK_EVENTS_PLUGIN,
421                      .type = "gauge",
422                      .meta = NULL};
423   sstrncpy(vl.host, hostname_g, sizeof(vl.host));
424   sstrncpy(vl.plugin_instance, plugin_instance, sizeof(vl.plugin_instance));
425   sstrncpy(vl.type_instance, type_instance, sizeof(vl.type_instance));
426   plugin_dispatch_values(&vl);
427 }
428
429 static int dpdk_events_link_status_dispatch(dpdk_helper_ctx_t *phc) {
430   dpdk_events_ctx_t *ec = DPDK_EVENTS_CTX_GET(phc);
431   DEBUG(DPDK_EVENTS_PLUGIN ": %s:%d ports=%u", __FUNCTION__, __LINE__,
432         ec->nb_ports);
433
434   /* dispatch Link Status values to collectd */
435   for (int i = 0; i < ec->nb_ports; i++) {
436     if (ec->config.link_status.enabled_port_mask & (1 << i)) {
437       if (!ec->config.link_status.send_updated ||
438           ec->link_info[i].status_updated) {
439
440         DEBUG(DPDK_EVENTS_PLUGIN ": Dispatch PORT %d Link Status: %s", i,
441               ec->link_info[i].link_status ? "UP" : "DOWN");
442
443         char dev_name[DATA_MAX_NAME_LEN];
444         if (ec->config.link_status.port_name[i][0] != 0) {
445           ssnprintf(dev_name, sizeof(dev_name), "%s",
446                     ec->config.link_status.port_name[i]);
447         } else {
448           ssnprintf(dev_name, sizeof(dev_name), "port.%d", i);
449         }
450
451         if (ec->config.link_status.notify) {
452           int sev = ec->link_info[i].link_status ? NOTIF_OKAY : NOTIF_WARNING;
453           char msg[DATA_MAX_NAME_LEN];
454           ssnprintf(msg, sizeof(msg), "Link Status: %s",
455                     ec->link_info[i].link_status ? "UP" : "DOWN");
456           dpdk_events_notification_dispatch(sev, dev_name,
457                                             ec->link_info[i].read_time, msg);
458         } else {
459           dpdk_events_gauge_submit(dev_name, "link_status",
460                                    (gauge_t)ec->link_info[i].link_status,
461                                    ec->link_info[i].read_time);
462         }
463         ec->link_info[i].status_updated = 0;
464       }
465     }
466   }
467
468   return 0;
469 }
470
471 static void dpdk_events_keep_alive_dispatch(dpdk_helper_ctx_t *phc) {
472   dpdk_events_ctx_t *ec = DPDK_EVENTS_CTX_GET(phc);
473
474   /* dispatch Keep Alive values to collectd */
475   for (int i = 0; i < RTE_KEEPALIVE_MAXCORES; i++) {
476     if (i < INT64_BIT_SIZE) {
477       if (!(ec->config.keep_alive.lcore_mask.low & ((uint64_t)1 << i)))
478         continue;
479     } else if (i >= INT64_BIT_SIZE && i < INT64_BIT_SIZE * 2) {
480       if (!(ec->config.keep_alive.lcore_mask.high &
481             ((uint64_t)1 << (i - INT64_BIT_SIZE))))
482         continue;
483     } else {
484       WARNING(DPDK_EVENTS_PLUGIN
485               ": %s:%d Core id %u is out of 0 to %u range, skipping",
486               __FUNCTION__, __LINE__, i, INT64_BIT_SIZE * 2);
487       continue;
488     }
489
490     char core_name[DATA_MAX_NAME_LEN];
491     ssnprintf(core_name, sizeof(core_name), "lcore%u", i);
492
493     if (!ec->config.keep_alive.send_updated ||
494         (ec->core_info[i].lcore_state !=
495          ec->config.keep_alive.shm->core_state[i])) {
496       ec->core_info[i].lcore_state = ec->config.keep_alive.shm->core_state[i];
497       ec->core_info[i].read_time = cdtime();
498
499       if (ec->config.keep_alive.notify) {
500         char msg[DATA_MAX_NAME_LEN];
501         int sev;
502
503         switch (ec->config.keep_alive.shm->core_state[i]) {
504         case RTE_KA_STATE_ALIVE:
505           sev = NOTIF_OKAY;
506           ssnprintf(msg, sizeof(msg), "lcore %u Keep Alive Status: ALIVE", i);
507           break;
508         case RTE_KA_STATE_MISSING:
509           ssnprintf(msg, sizeof(msg), "lcore %u Keep Alive Status: MISSING", i);
510           sev = NOTIF_WARNING;
511           break;
512         case RTE_KA_STATE_DEAD:
513           ssnprintf(msg, sizeof(msg), "lcore %u Keep Alive Status: DEAD", i);
514           sev = NOTIF_FAILURE;
515           break;
516         case RTE_KA_STATE_UNUSED:
517           ssnprintf(msg, sizeof(msg), "lcore %u Keep Alive Status: UNUSED", i);
518           sev = NOTIF_OKAY;
519           break;
520         case RTE_KA_STATE_GONE:
521           ssnprintf(msg, sizeof(msg), "lcore %u Keep Alive Status: GONE", i);
522           sev = NOTIF_FAILURE;
523           break;
524         case RTE_KA_STATE_DOZING:
525           ssnprintf(msg, sizeof(msg), "lcore %u Keep Alive Status: DOZING", i);
526           sev = NOTIF_OKAY;
527           break;
528         case RTE_KA_STATE_SLEEP:
529           ssnprintf(msg, sizeof(msg), "lcore %u Keep Alive Status: SLEEP", i);
530           sev = NOTIF_OKAY;
531           break;
532         default:
533           ssnprintf(msg, sizeof(msg), "lcore %u Keep Alive Status: UNKNOWN", i);
534           sev = NOTIF_FAILURE;
535         }
536
537         dpdk_events_notification_dispatch(sev, KEEPALIVE_PLUGIN_INSTANCE,
538                                           ec->core_info[i].read_time, msg);
539       } else {
540         dpdk_events_gauge_submit(KEEPALIVE_PLUGIN_INSTANCE, core_name,
541                                  ec->config.keep_alive.shm->core_state[i],
542                                  ec->core_info[i].read_time);
543       }
544     }
545   }
546 }
547
548 static int dpdk_events_read(user_data_t *ud) {
549   DPDK_EVENTS_TRACE();
550
551   if (g_hc == NULL) {
552     ERROR(DPDK_EVENTS_PLUGIN ": plugin not initialized.");
553     return -1;
554   }
555
556   dpdk_events_ctx_t *ec = DPDK_EVENTS_CTX_GET(g_hc);
557   int ls_ret = -1, ka_ret = -1;
558
559   int cmd_res = 0;
560   if (ec->config.link_status.enabled) {
561     ls_ret = dpdk_helper_command(g_hc, DPDK_CMD_GET_EVENTS, &cmd_res,
562                                  ec->config.interval);
563     if (cmd_res == 0 && ls_ret == 0) {
564       dpdk_events_link_status_dispatch(g_hc);
565     }
566   }
567
568   if (ec->config.keep_alive.enabled) {
569     ka_ret = dpdk_event_keep_alive_shm_open();
570     if (ka_ret) {
571       ERROR(DPDK_EVENTS_PLUGIN
572             ": %s : error %d in dpdk_event_keep_alive_shm_open()",
573             __FUNCTION__, ka_ret);
574     } else
575       dpdk_events_keep_alive_dispatch(g_hc);
576   }
577
578   if (!((cmd_res || ls_ret) == 0 || ka_ret == 0)) {
579     ERROR(DPDK_EVENTS_PLUGIN ": Read failure for all enabled event types");
580     return -1;
581   }
582
583   return 0;
584 }
585
586 static int dpdk_events_init(void) {
587   DPDK_EVENTS_TRACE();
588
589   int ret = dpdk_events_preinit();
590   if (ret)
591     return ret;
592
593   return 0;
594 }
595
596 static int dpdk_events_shutdown(void) {
597   DPDK_EVENTS_TRACE();
598   int ret;
599
600   dpdk_events_ctx_t *ec = DPDK_EVENTS_CTX_GET(g_hc);
601   if (ec->config.keep_alive.enabled) {
602     if (ec->config.keep_alive.fd != -1) {
603       close(ec->config.keep_alive.fd);
604       ec->config.keep_alive.fd = -1;
605     }
606
607     if (ec->config.keep_alive.shm != MAP_FAILED) {
608       if (munmap(ec->config.keep_alive.shm, sizeof(dpdk_keepalive_shm_t))) {
609         ERROR(DPDK_EVENTS_PLUGIN ": munmap KA monitor failed");
610         return -1;
611       }
612       ec->config.keep_alive.shm = MAP_FAILED;
613     }
614   }
615
616   ret = dpdk_helper_shutdown(g_hc);
617   g_hc = NULL;
618   if (ret)
619     ERROR(DPDK_EVENTS_PLUGIN ": failed to cleanup %s helper", DPDK_EVENTS_NAME);
620
621   return ret;
622 }
623
624 void module_register(void) {
625   plugin_register_init(DPDK_EVENTS_PLUGIN, dpdk_events_init);
626   plugin_register_complex_config(DPDK_EVENTS_PLUGIN, dpdk_events_config);
627   plugin_register_complex_read(NULL, DPDK_EVENTS_PLUGIN, dpdk_events_read, 0,
628                                NULL);
629   plugin_register_shutdown(DPDK_EVENTS_PLUGIN, dpdk_events_shutdown);
630 }