Plugin for getting DPDK ports link status and keep alive events.
[collectd.git] / src / dpdkevents.c
1 /*
2  * collectd - src/dpdkevents.c
3  * MIT License
4  *
5  * Copyright(c) 2017 Intel Corporation. All rights reserved.
6  *
7  * Permission is hereby granted, free of charge, to any person obtaining a copy
8  * of this software and associated documentation files (the "Software"), to deal
9  * in the Software without restriction, including without limitation the rights
10  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11  * copies of the Software, and to permit persons to whom the Software is
12  * furnished to do so, subject to the following conditions:
13  *
14  * The above copyright notice and this permission notice shall be included in
15  * all copies or substantial portions of the Software.
16  *
17  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
23  * SOFTWARE.
24  *
25  * Authors:
26  *   Maryam Tahhan <maryam.tahhan@intel.com>
27  *   Harry van Haaren <harry.van.haaren@intel.com>
28  *   Serhiy Pshyk <serhiyx.pshyk@intel.com>
29  *   Kim-Marie Jones <kim-marie.jones@intel.com>
30  *   Krzysztof Matczak <krzysztofx@intel.com>
31  */
32
33 #include "common.h"
34 #include "plugin.h"
35 #include "semaphore.h"
36 #include "sys/mman.h"
37 #include "utils_dpdk.h"
38 #include "utils_time.h"
39 #include "collectd.h"
40
41 #include <rte_config.h>
42 #include <rte_eal.h>
43 #include <rte_ethdev.h>
44 #include <rte_keepalive.h>
45
46 #define DPDK_EVENTS_PLUGIN "dpdkevents"
47 #define DPDK_EVENTS_NAME "dpdk_collectd_events"
48 #define ETH_LINK_NA 0xFF
49
50 #define INT64_BIT_SIZE 64
51 #define KEEPALIVE_PLUGIN_INSTANCE "keepalive"
52 #define RTE_KEEPALIVE_SHM_NAME "/dpdk_keepalive_shm_name"
53
54 #define RTE_VERSION_16_07 RTE_VERSION_NUM(16, 7, 0, 16)
55
56 typedef struct dpdk_keepalive_shm_s {
57   sem_t core_died;
58   enum rte_keepalive_state core_state[RTE_KEEPALIVE_MAXCORES];
59   uint64_t core_last_seen_times[RTE_KEEPALIVE_MAXCORES];
60 } dpdk_keepalive_shm_t;
61
62 typedef struct dpdk_ka_monitor_s {
63   cdtime_t read_time;
64   int lcore_state;
65 } dpdk_ka_monitor_t;
66
67 typedef struct dpdk_link_status_config_s {
68   int enabled;
69   int send_updated;
70   uint32_t enabled_port_mask;
71   char port_name[RTE_MAX_ETHPORTS][DATA_MAX_NAME_LEN];
72   int notify;
73 } dpdk_link_status_config_t;
74
75 typedef struct dpdk_keep_alive_config_s {
76   int enabled;
77   int send_updated;
78   uint128_t lcore_mask;
79   dpdk_keepalive_shm_t *shm;
80   char shm_name[DATA_MAX_NAME_LEN];
81   int notify;
82 } dpdk_keep_alive_config_t;
83
84 typedef struct dpdk_events_config_s {
85   cdtime_t interval;
86   dpdk_link_status_config_t link_status;
87   dpdk_keep_alive_config_t keep_alive;
88 } dpdk_events_config_t;
89
90 typedef struct dpdk_link_info_s {
91   cdtime_t read_time;
92   int status_updated;
93   int link_status;
94 } dpdk_link_info_t;
95
96 typedef struct dpdk_events_ctx_s {
97   dpdk_events_config_t config;
98   uint32_t nb_ports;
99   dpdk_link_info_t link_info[RTE_MAX_ETHPORTS];
100   dpdk_ka_monitor_t core_info[RTE_KEEPALIVE_MAXCORES];
101 } dpdk_events_ctx_t;
102
103 #define DPDK_EVENTS_CTX_GET(a) ((dpdk_events_ctx_t *)dpdk_helper_priv_get(a))
104
105 #define DPDK_EVENTS_TRACE()                                                    \
106   DEBUG("%s:%s:%d pid=%u", DPDK_EVENTS_PLUGIN, __FUNCTION__, __LINE__, getpid())
107
108 static dpdk_helper_ctx_t *g_hc;
109
110 static int dpdk_event_keep_alive_shm_create(void) {
111   dpdk_events_ctx_t *ec = DPDK_EVENTS_CTX_GET(g_hc);
112   char *shm_name;
113
114   if (strlen(ec->config.keep_alive.shm_name)) {
115     shm_name = ec->config.keep_alive.shm_name;
116   } else {
117     shm_name = RTE_KEEPALIVE_SHM_NAME;
118     WARNING(DPDK_EVENTS_PLUGIN ": Keep alive shared memory identifier is not "
119                                "specified, using default one: %s",
120             shm_name);
121   }
122
123   char errbuf[ERR_BUF_SIZE];
124   int fd = shm_open(shm_name, O_RDWR, 0);
125   if (fd < 0) {
126     ERROR(DPDK_EVENTS_PLUGIN ": Failed to open %s as SHM:%s. Is DPDK KA "
127                              "primary application running?",
128           shm_name, sstrerror(errno, errbuf, sizeof(errbuf)));
129     return errno;
130   } else {
131     ec->config.keep_alive.shm =
132         (dpdk_keepalive_shm_t *)mmap(0, sizeof(*(ec->config.keep_alive.shm)),
133                                      PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
134     close(fd);
135     if (ec->config.keep_alive.shm == MAP_FAILED) {
136       ERROR(DPDK_EVENTS_PLUGIN ": Failed to mmap KA SHM:%s",
137             sstrerror(errno, errbuf, sizeof(errbuf)));
138       return errno;
139     }
140   }
141
142   return 0;
143 }
144
145 static void dpdk_events_default_config(void) {
146   dpdk_events_ctx_t *ec = DPDK_EVENTS_CTX_GET(g_hc);
147
148   ec->config.interval = plugin_get_interval();
149
150   /* Link Status */
151   ec->config.link_status.enabled = 0;
152   ec->config.link_status.enabled_port_mask = ~0;
153   ec->config.link_status.send_updated = 1;
154   ec->config.link_status.notify = 0;
155
156   for (int i = 0; i < RTE_MAX_ETHPORTS; i++) {
157     ec->config.link_status.port_name[i][0] = 0;
158   }
159
160   /* Keep Alive */
161   ec->config.keep_alive.enabled = 0;
162   ec->config.keep_alive.send_updated = 1;
163   ec->config.keep_alive.notify = 0;
164   memset(&ec->config.keep_alive.lcore_mask, 0,
165          sizeof(ec->config.keep_alive.lcore_mask));
166   memset(&ec->config.keep_alive.shm_name, 0,
167          sizeof(ec->config.keep_alive.shm_name));
168 }
169
170 static int dpdk_events_preinit(void) {
171   DPDK_EVENTS_TRACE();
172
173   if (g_hc != NULL) {
174     /* already initialized if config callback was called before init callback */
175     DEBUG("dpdk_events_preinit: helper already initialized.");
176     return 0;
177   }
178
179   int ret =
180       dpdk_helper_init(DPDK_EVENTS_NAME, sizeof(dpdk_events_ctx_t), &g_hc);
181   if (ret != 0) {
182     ERROR(DPDK_EVENTS_PLUGIN ": failed to initialize %s helper(error: %s)",
183           DPDK_EVENTS_NAME, strerror(ret));
184     return ret;
185   }
186
187   dpdk_events_default_config();
188
189   dpdk_events_ctx_t *ec = DPDK_EVENTS_CTX_GET(g_hc);
190   for (int i = 0; i < RTE_MAX_ETHPORTS; i++) {
191     ec->link_info[i].link_status = ETH_LINK_NA;
192   }
193
194   for (int i = 0; i < RTE_KEEPALIVE_MAXCORES; i++) {
195     ec->core_info[i].lcore_state = ETH_LINK_NA;
196   }
197
198   return ret;
199 }
200
201 static int dpdk_events_link_status_config(dpdk_events_ctx_t *ec,
202                                           oconfig_item_t *ci) {
203   ec->config.link_status.enabled = 1;
204
205   DEBUG(DPDK_EVENTS_PLUGIN ": Subscribed for Link Status Events.");
206
207   for (int i = 0; i < ci->children_num; i++) {
208     oconfig_item_t *child = ci->children + i;
209
210     if (strcasecmp("EnabledPortMask", child->key) == 0) {
211       ec->config.link_status.enabled_port_mask =
212           (uint32_t)child->values[0].value.number;
213       DEBUG(DPDK_EVENTS_PLUGIN ": LinkStatus:Enabled Port Mask 0x%X",
214             ec->config.link_status.enabled_port_mask);
215     } else if (strcasecmp("SendEventsOnUpdate", child->key) == 0) {
216       ec->config.link_status.send_updated = child->values[0].value.boolean;
217       DEBUG(DPDK_EVENTS_PLUGIN ": LinkStatus:SendEventsOnUpdate %d",
218             (int)child->values[0].value.boolean);
219     } else if (strcasecmp("SendNotification", child->key) == 0) {
220       ec->config.link_status.notify = child->values[0].value.boolean;
221       DEBUG(DPDK_EVENTS_PLUGIN ": LinkStatus:SendNotification %d",
222             (int)child->values[0].value.boolean);
223     }
224   }
225
226   int port_num = 0;
227
228   /* parse port names after EnabledPortMask was parsed */
229   for (int i = 0; i < ci->children_num; i++) {
230     oconfig_item_t *child = ci->children + i;
231     if (strcasecmp("PortName", child->key) == 0) {
232       while (!(ec->config.link_status.enabled_port_mask & (1 << port_num)))
233         port_num++;
234       ssnprintf(ec->config.link_status.port_name[port_num], DATA_MAX_NAME_LEN,
235                 "%s", child->values[0].value.string);
236       DEBUG(DPDK_EVENTS_PLUGIN ": LinkStatus:Port %d Name: %s", port_num,
237             ec->config.link_status.port_name[port_num]);
238       port_num++;
239     }
240   }
241
242   return 0;
243 }
244
245 static int dpdk_events_keep_alive_config(dpdk_events_ctx_t *ec,
246                                          oconfig_item_t *ci) {
247   ec->config.keep_alive.enabled = 1;
248   DEBUG(DPDK_EVENTS_PLUGIN ": Subscribed for Keep Alive Events.");
249
250   for (int i = 0; i < ci->children_num; i++) {
251     oconfig_item_t *child = ci->children + i;
252
253     if (strcasecmp("SendEventsOnUpdate", child->key) == 0) {
254       ec->config.keep_alive.send_updated = child->values[0].value.boolean;
255       DEBUG(DPDK_EVENTS_PLUGIN ": KeepAlive:SendEventsOnUpdate %d",
256             (int)child->values[0].value.boolean);
257     } else if (strcasecmp("LCoreMask", child->key) == 0) {
258       char lcore_mask[DATA_MAX_NAME_LEN];
259       ssnprintf(lcore_mask, sizeof(lcore_mask), "%s",
260                 child->values[0].value.string);
261       ec->config.keep_alive.lcore_mask =
262           str_to_uint128(lcore_mask, strlen(lcore_mask));
263       DEBUG(DPDK_EVENTS_PLUGIN ": KeepAlive:LCoreMask 0x%" PRIX64 "%" PRIX64 "",
264             ec->config.keep_alive.lcore_mask.high,
265             ec->config.keep_alive.lcore_mask.low);
266     } else if (strcasecmp("KeepAliveShmName", child->key) == 0) {
267       ssnprintf(ec->config.keep_alive.shm_name,
268                 sizeof(ec->config.keep_alive.shm_name), "%s",
269                 child->values[0].value.string);
270       DEBUG(DPDK_EVENTS_PLUGIN ": KeepAlive:KeepAliveShmName %s",
271             ec->config.keep_alive.shm_name);
272     } else if (strcasecmp("SendNotification", child->key) == 0) {
273       ec->config.keep_alive.notify = child->values[0].value.boolean;
274       DEBUG(DPDK_EVENTS_PLUGIN ": KeepAlive:SendNotification %d",
275             (int)child->values[0].value.boolean);
276     }
277   }
278
279   return 0;
280 }
281
282 static int dpdk_events_config(oconfig_item_t *ci) {
283   DPDK_EVENTS_TRACE();
284
285   int ret = dpdk_events_preinit();
286   if (ret)
287     return ret;
288
289   dpdk_events_ctx_t *ec = DPDK_EVENTS_CTX_GET(g_hc);
290
291   for (int i = 0; i < ci->children_num; i++) {
292     oconfig_item_t *child = ci->children + i;
293     if (strcasecmp("EAL", child->key) == 0) {
294       dpdk_helper_eal_config_parse(g_hc, child);
295     } else if (strcasecmp("Event", child->key) == 0) {
296       if (strcasecmp(child->values[0].value.string, "link_status") == 0) {
297         dpdk_events_link_status_config(ec, child);
298       } else if (strcasecmp(child->values[0].value.string, "keep_alive") == 0) {
299         dpdk_events_keep_alive_config(ec, child);
300       } else {
301         ERROR(DPDK_EVENTS_PLUGIN ": The selected event \"%s\" is unknown.",
302               child->values[0].value.string);
303       }
304     }
305   }
306
307   return ret;
308 }
309
310 static int dpdk_helper_link_status_get(dpdk_helper_ctx_t *phc) {
311   dpdk_events_ctx_t *ec = DPDK_EVENTS_CTX_GET(phc);
312
313   /* get Link Status values from DPDK */
314   uint8_t nb_ports = rte_eth_dev_count();
315   if (nb_ports == 0) {
316     DPDK_CHILD_LOG("dpdkevent-helper: No DPDK ports available. "
317                    "Check bound devices to DPDK driver.\n");
318     return -ENODEV;
319   }
320   ec->nb_ports = nb_ports > RTE_MAX_ETHPORTS ? RTE_MAX_ETHPORTS : nb_ports;
321
322   for (int i = 0; i < ec->nb_ports; i++) {
323     if (ec->config.link_status.enabled_port_mask & (1 << i)) {
324       struct rte_eth_link link;
325       ec->link_info[i].read_time = cdtime();
326       rte_eth_link_get_nowait(i, &link);
327       if ((link.link_status == ETH_LINK_NA) ||
328           (link.link_status != ec->link_info[i].link_status)) {
329         ec->link_info[i].link_status = link.link_status;
330         ec->link_info[i].status_updated = 1;
331         DPDK_CHILD_LOG(" === PORT %d Link Status: %s\n", i,
332                        link.link_status ? "UP" : "DOWN");
333       }
334     }
335   }
336
337   return 0;
338 }
339
340 /* this function is called from helper context */
341 int dpdk_helper_command_handler(dpdk_helper_ctx_t *phc, enum DPDK_CMD cmd) {
342   if (phc == NULL) {
343     DPDK_CHILD_LOG(DPDK_EVENTS_PLUGIN ": Invalid argument(phc)\n");
344     return -EINVAL;
345   }
346
347   if (cmd != DPDK_CMD_GET_EVENTS) {
348     DPDK_CHILD_LOG(DPDK_EVENTS_PLUGIN ": Unknown command (cmd=%d)\n", cmd);
349     return -EINVAL;
350   }
351
352   dpdk_events_ctx_t *ec = DPDK_EVENTS_CTX_GET(phc);
353
354   if (ec->config.link_status.enabled)
355     dpdk_helper_link_status_get(phc);
356
357   return 0;
358 }
359
360 static void dpdk_events_notification_dispatch(int severity,
361                                               char *plugin_instance,
362                                               cdtime_t time, char *msg) {
363   notification_t n = {0};
364   n.severity = severity;
365   n.time = time;
366   sstrncpy(n.host, hostname_g, sizeof(n.host));
367   sstrncpy(n.plugin, DPDK_EVENTS_PLUGIN, sizeof(n.plugin));
368   sstrncpy(n.plugin_instance, plugin_instance, sizeof(n.plugin_instance));
369   sstrncpy(n.message, msg, sizeof(n.message));
370   plugin_dispatch_notification(&n);
371 }
372
373 static void dpdk_events_gauge_submit(char *plugin_instance, char *type,
374                                      gauge_t value, cdtime_t time) {
375   value_list_t vl = {.values = &(value_t){.gauge = value},
376                      .values_len = 1,
377                      .time = time,
378                      .plugin = DPDK_EVENTS_PLUGIN,
379                      .type = "gauge",
380                      .meta = NULL};
381   sstrncpy(vl.host, hostname_g, sizeof(vl.host));
382   sstrncpy(vl.plugin_instance, plugin_instance, sizeof(vl.plugin_instance));
383   sstrncpy(vl.type_instance, type, sizeof(vl.type_instance));
384   plugin_dispatch_values(&vl);
385 }
386
387 static int dpdk_events_link_status_dispatch(dpdk_helper_ctx_t *phc) {
388   dpdk_events_ctx_t *ec = DPDK_EVENTS_CTX_GET(phc);
389   DEBUG(DPDK_EVENTS_PLUGIN ": %s:%d ports=%u", __FUNCTION__, __LINE__,
390         ec->nb_ports);
391
392   /* dispatch Link Status values to collectd */
393   for (int i = 0; i < ec->nb_ports; i++) {
394     if (ec->config.link_status.enabled_port_mask & (1 << i)) {
395       if (!ec->config.link_status.send_updated ||
396           ec->link_info[i].status_updated) {
397
398         DEBUG(DPDK_EVENTS_PLUGIN ": Dispatch PORT %d Link Status: %s", i,
399               ec->link_info[i].link_status ? "UP" : "DOWN");
400
401         char dev_name[DATA_MAX_NAME_LEN];
402         if (ec->config.link_status.port_name[i][0] != 0) {
403           ssnprintf(dev_name, sizeof(dev_name), "%s",
404                     ec->config.link_status.port_name[i]);
405         } else {
406           ssnprintf(dev_name, sizeof(dev_name), "port.%d", i);
407         }
408
409         if (ec->config.link_status.notify) {
410           int sev = ec->link_info[i].link_status ? NOTIF_OKAY : NOTIF_WARNING;
411           char msg[DATA_MAX_NAME_LEN];
412           ssnprintf(msg, sizeof(msg), "Link Status: %s",
413                     ec->link_info[i].link_status ? "UP" : "DOWN");
414           dpdk_events_notification_dispatch(sev, dev_name,
415                                             ec->link_info[i].read_time, msg);
416         } else {
417           dpdk_events_gauge_submit(dev_name, "link_status",
418                                    (gauge_t)ec->link_info[i].link_status,
419                                    ec->link_info[i].read_time);
420         }
421         ec->link_info[i].status_updated = 0;
422       }
423     }
424   }
425
426   return 0;
427 }
428
429 static int dpdk_events_keep_alive_dispatch(dpdk_helper_ctx_t *phc) {
430   dpdk_events_ctx_t *ec = DPDK_EVENTS_CTX_GET(phc);
431
432   /* dispatch Keep Alive values to collectd */
433   for (int i = 0; i < RTE_KEEPALIVE_MAXCORES; i++) {
434     if (i < INT64_BIT_SIZE) {
435       if (!(ec->config.keep_alive.lcore_mask.low & ((uint64_t)1 << i)))
436         continue;
437     } else if (i >= INT64_BIT_SIZE && i < INT64_BIT_SIZE * 2) {
438       if (!(ec->config.keep_alive.lcore_mask.high &
439             ((uint64_t)1 << (i - INT64_BIT_SIZE))))
440         continue;
441     } else {
442       WARNING(DPDK_EVENTS_PLUGIN
443               ": %s:%d Core id %u is out of 0 to %u range, skipping",
444               __FUNCTION__, __LINE__, i, INT64_BIT_SIZE * 2);
445       continue;
446     }
447
448     char core_name[DATA_MAX_NAME_LEN];
449     ssnprintf(core_name, sizeof(core_name), "lcore%u", i);
450
451     if (!ec->config.keep_alive.send_updated ||
452         (ec->core_info[i].lcore_state !=
453          ec->config.keep_alive.shm->core_state[i])) {
454       ec->core_info[i].lcore_state = ec->config.keep_alive.shm->core_state[i];
455       ec->core_info[i].read_time = cdtime();
456
457 #if RTE_VERSION >= RTE_VERSION_16_07
458       if (ec->config.keep_alive.notify) {
459         char msg[DATA_MAX_NAME_LEN];
460         int sev;
461
462         switch (ec->config.keep_alive.shm->core_state[i]) {
463         case RTE_KA_STATE_ALIVE:
464           sev = NOTIF_OKAY;
465           ssnprintf(msg, sizeof(msg), "lcore %u Keep Alive Status: ALIVE", i);
466           break;
467         case RTE_KA_STATE_MISSING:
468           ssnprintf(msg, sizeof(msg), "lcore %u Keep Alive Status: MISSING", i);
469           sev = NOTIF_WARNING;
470           break;
471         case RTE_KA_STATE_DEAD:
472           ssnprintf(msg, sizeof(msg), "lcore %u Keep Alive Status: DEAD", i);
473           sev = NOTIF_FAILURE;
474           break;
475         case RTE_KA_STATE_UNUSED:
476           ssnprintf(msg, sizeof(msg), "lcore %u Keep Alive Status: UNUSED", i);
477           sev = NOTIF_OKAY;
478           break;
479         case RTE_KA_STATE_GONE:
480           ssnprintf(msg, sizeof(msg), "lcore %u Keep Alive Status: GONE", i);
481           sev = NOTIF_FAILURE;
482           break;
483         case RTE_KA_STATE_DOZING:
484           ssnprintf(msg, sizeof(msg), "lcore %u Keep Alive Status: DOZING", i);
485           sev = NOTIF_OKAY;
486           break;
487         case RTE_KA_STATE_SLEEP:
488           ssnprintf(msg, sizeof(msg), "lcore %u Keep Alive Status: SLEEP", i);
489           sev = NOTIF_OKAY;
490           break;
491         default:
492           ssnprintf(msg, sizeof(msg), "lcore %u Keep Alive Status: UNKNOWN", i);
493           sev = NOTIF_FAILURE;
494         }
495
496         dpdk_events_notification_dispatch(sev, KEEPALIVE_PLUGIN_INSTANCE,
497                                           ec->core_info[i].read_time, msg);
498       } else {
499         dpdk_events_gauge_submit(KEEPALIVE_PLUGIN_INSTANCE, core_name,
500                                  ec->config.keep_alive.shm->core_state[i],
501                                  ec->core_info[i].read_time);
502       }
503 #else
504       dpdk_events_gauge_submit(KEEPALIVE_PLUGIN_INSTANCE, core_name,
505                                ec->config.keep_alive.shm->core_state[i],
506                                ec->core_info[i].read_time);
507 #endif /* #if RTE_VERSION >= RTE_VERSION_16_07 */
508     }
509   }
510
511   return 0;
512 }
513
514 static int dpdk_events_read(user_data_t *ud) {
515   DPDK_EVENTS_TRACE();
516
517   if (g_hc == NULL) {
518     ERROR(DPDK_EVENTS_PLUGIN ": plugin not initialized.");
519     return -EINVAL;
520   }
521
522   dpdk_events_ctx_t *ec = DPDK_EVENTS_CTX_GET(g_hc);
523
524   if (!ec->config.keep_alive.enabled && !ec->config.link_status.enabled) {
525     /* nothing to do */
526     return 0;
527   }
528
529   if (ec->config.link_status.enabled) {
530     int cmd_res = 0;
531     int ret = dpdk_helper_command(g_hc, DPDK_CMD_GET_EVENTS, &cmd_res,
532                                   ec->config.interval);
533     if (cmd_res == 0 && ret == 0) {
534       dpdk_events_link_status_dispatch(g_hc);
535     }
536   }
537
538   if (ec->config.keep_alive.enabled) {
539     dpdk_events_keep_alive_dispatch(g_hc);
540   }
541
542   return 0;
543 }
544
545 static int dpdk_events_init(void) {
546   DPDK_EVENTS_TRACE();
547
548   int ret = dpdk_events_preinit();
549   if (ret)
550     return ret;
551
552   dpdk_events_ctx_t *ec = DPDK_EVENTS_CTX_GET(g_hc);
553
554   if (ec->config.keep_alive.enabled) {
555     ret = dpdk_event_keep_alive_shm_create();
556     if (ret) {
557       ERROR(DPDK_EVENTS_PLUGIN ": %s : error %d in ka_shm_create()",
558             __FUNCTION__, ret);
559       return ret;
560     }
561   }
562   return 0;
563 }
564
565 static int dpdk_events_shutdown(void) {
566   DPDK_EVENTS_TRACE();
567   int ret = 0;
568
569   dpdk_events_ctx_t *ec = DPDK_EVENTS_CTX_GET(g_hc);
570   if (ec->config.keep_alive.enabled) {
571     ret = munmap(ec->config.keep_alive.shm, sizeof(dpdk_keepalive_shm_t));
572     if (ret) {
573       ERROR(DPDK_EVENTS_PLUGIN ": munmap KA monitor returned %d", ret);
574       return ret;
575     }
576   }
577
578   ret = dpdk_helper_shutdown(g_hc);
579   g_hc = NULL;
580   if (ret)
581     ERROR(DPDK_EVENTS_PLUGIN ": failed to cleanup %s helper", DPDK_EVENTS_NAME);
582
583   return ret;
584 }
585
586 void module_register(void) {
587   plugin_register_init(DPDK_EVENTS_PLUGIN, dpdk_events_init);
588   plugin_register_complex_config(DPDK_EVENTS_PLUGIN, dpdk_events_config);
589   plugin_register_complex_read(NULL, DPDK_EVENTS_PLUGIN, dpdk_events_read, 0,
590                                NULL);
591   plugin_register_shutdown(DPDK_EVENTS_PLUGIN, dpdk_events_shutdown);
592 }