Merge branch 'collectd-5.7' into collectd-5.8
[collectd.git] / src / dpdkevents.c
1 /*
2  * collectd - src/dpdkevents.c
3  * MIT License
4  *
5  * Copyright(c) 2017 Intel Corporation. All rights reserved.
6  *
7  * Permission is hereby granted, free of charge, to any person obtaining a copy
8  * of this software and associated documentation files (the "Software"), to deal
9  * in the Software without restriction, including without limitation the rights
10  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11  * copies of the Software, and to permit persons to whom the Software is
12  * furnished to do so, subject to the following conditions:
13  *
14  * The above copyright notice and this permission notice shall be included in
15  * all copies or substantial portions of the Software.
16  *
17  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
23  * SOFTWARE.
24  *
25  * Authors:
26  *   Maryam Tahhan <maryam.tahhan@intel.com>
27  *   Harry van Haaren <harry.van.haaren@intel.com>
28  *   Serhiy Pshyk <serhiyx.pshyk@intel.com>
29  *   Kim-Marie Jones <kim-marie.jones@intel.com>
30  *   Krzysztof Matczak <krzysztofx@intel.com>
31  */
32
33 #include "collectd.h"
34
35 #include "common.h"
36 #include "plugin.h"
37
38 #include "semaphore.h"
39 #include "sys/mman.h"
40 #include "utils_dpdk.h"
41 #include "utils_time.h"
42
43 #include <rte_config.h>
44 #include <rte_eal.h>
45 #include <rte_ethdev.h>
46 #include <rte_keepalive.h>
47
48 #define DPDK_EVENTS_PLUGIN "dpdkevents"
49 #define DPDK_EVENTS_NAME "dpdk_collectd_events"
50 #define ETH_LINK_NA 0xFF
51
52 #define INT64_BIT_SIZE 64
53 #define KEEPALIVE_PLUGIN_INSTANCE "keepalive"
54 #define RTE_KEEPALIVE_SHM_NAME "/dpdk_keepalive_shm_name"
55
56 typedef struct dpdk_keepalive_shm_s {
57   sem_t core_died;
58   enum rte_keepalive_state core_state[RTE_KEEPALIVE_MAXCORES];
59   uint64_t core_last_seen_times[RTE_KEEPALIVE_MAXCORES];
60 } dpdk_keepalive_shm_t;
61
62 typedef struct dpdk_ka_monitor_s {
63   cdtime_t read_time;
64   int lcore_state;
65 } dpdk_ka_monitor_t;
66
67 typedef struct dpdk_link_status_config_s {
68   int enabled;
69   _Bool send_updated;
70   uint32_t enabled_port_mask;
71   char port_name[RTE_MAX_ETHPORTS][DATA_MAX_NAME_LEN];
72   _Bool notify;
73 } dpdk_link_status_config_t;
74
75 typedef struct dpdk_keep_alive_config_s {
76   int enabled;
77   _Bool send_updated;
78   uint128_t lcore_mask;
79   dpdk_keepalive_shm_t *shm;
80   char shm_name[DATA_MAX_NAME_LEN];
81   _Bool notify;
82   int fd;
83 } dpdk_keep_alive_config_t;
84
85 typedef struct dpdk_events_config_s {
86   cdtime_t interval;
87   dpdk_link_status_config_t link_status;
88   dpdk_keep_alive_config_t keep_alive;
89 } dpdk_events_config_t;
90
91 typedef struct dpdk_link_info_s {
92   cdtime_t read_time;
93   int status_updated;
94   int link_status;
95 } dpdk_link_info_t;
96
97 typedef struct dpdk_events_ctx_s {
98   dpdk_events_config_t config;
99   uint32_t nb_ports;
100   dpdk_link_info_t link_info[RTE_MAX_ETHPORTS];
101   dpdk_ka_monitor_t core_info[RTE_KEEPALIVE_MAXCORES];
102 } dpdk_events_ctx_t;
103
104 typedef enum {
105   DPDK_EVENTS_STATE_CFG_ERR = 1 << 0,
106   DPDK_EVENTS_STATE_KA_CFG_ERR = 1 << 1,
107   DPDK_EVENTS_STATE_LS_CFG_ERR = 1 << 2,
108
109 } dpdk_events_cfg_status;
110
111 #define DPDK_EVENTS_CTX_GET(a) ((dpdk_events_ctx_t *)dpdk_helper_priv_get(a))
112
113 #define DPDK_EVENTS_TRACE()                                                    \
114   DEBUG("%s:%s:%d pid=%u", DPDK_EVENTS_PLUGIN, __FUNCTION__, __LINE__, getpid())
115
116 static dpdk_helper_ctx_t *g_hc;
117 static dpdk_events_cfg_status g_state;
118
119 static int dpdk_event_keep_alive_shm_open(void) {
120   dpdk_events_ctx_t *ec = DPDK_EVENTS_CTX_GET(g_hc);
121   char *shm_name;
122
123   if (strlen(ec->config.keep_alive.shm_name)) {
124     shm_name = ec->config.keep_alive.shm_name;
125   } else {
126     shm_name = RTE_KEEPALIVE_SHM_NAME;
127     WARNING(DPDK_EVENTS_PLUGIN ": Keep alive shared memory identifier is not "
128                                "specified, using default one: %s",
129             shm_name);
130   }
131
132   char errbuf[ERR_BUF_SIZE];
133   int fd = shm_open(shm_name, O_RDONLY, 0);
134   if (fd < 0) {
135     ERROR(DPDK_EVENTS_PLUGIN ": Failed to open %s as SHM:%s. Is DPDK KA "
136                              "primary application running?",
137           shm_name, sstrerror(errno, errbuf, sizeof(errbuf)));
138     return errno;
139   }
140
141   if (ec->config.keep_alive.fd != -1) {
142     struct stat stat_old, stat_new;
143
144     if (fstat(ec->config.keep_alive.fd, &stat_old) || fstat(fd, &stat_new)) {
145       ERROR(DPDK_EVENTS_PLUGIN ": failed to get information about a file");
146       close(fd);
147       return -1;
148     }
149
150     /* Check if inode number has changed. If yes, then create a new mapping */
151     if (stat_old.st_ino == stat_new.st_ino) {
152       close(fd);
153       return 0;
154     }
155
156     if (munmap(ec->config.keep_alive.shm, sizeof(dpdk_keepalive_shm_t)) != 0) {
157       ERROR(DPDK_EVENTS_PLUGIN ": munmap KA monitor failed");
158       close(fd);
159       return -1;
160     }
161
162     close(ec->config.keep_alive.fd);
163     ec->config.keep_alive.fd = -1;
164   }
165
166   ec->config.keep_alive.shm = (dpdk_keepalive_shm_t *)mmap(
167       0, sizeof(*(ec->config.keep_alive.shm)), PROT_READ, MAP_SHARED, fd, 0);
168   if (ec->config.keep_alive.shm == MAP_FAILED) {
169     ERROR(DPDK_EVENTS_PLUGIN ": Failed to mmap KA SHM:%s",
170           sstrerror(errno, errbuf, sizeof(errbuf)));
171     close(fd);
172     return errno;
173   }
174   ec->config.keep_alive.fd = fd;
175
176   return 0;
177 }
178
179 static void dpdk_events_default_config(void) {
180   dpdk_events_ctx_t *ec = DPDK_EVENTS_CTX_GET(g_hc);
181
182   ec->config.interval = plugin_get_interval();
183
184   /* In configless mode when no <Plugin/> section is defined in config file
185    * both link_status and keep_alive should be enabled */
186
187   /* Link Status */
188   ec->config.link_status.enabled = 1;
189   ec->config.link_status.enabled_port_mask = ~0;
190   ec->config.link_status.send_updated = 1;
191   ec->config.link_status.notify = 0;
192
193   for (int i = 0; i < RTE_MAX_ETHPORTS; i++) {
194     ec->config.link_status.port_name[i][0] = 0;
195   }
196
197   /* Keep Alive */
198   ec->config.keep_alive.enabled = 1;
199   ec->config.keep_alive.send_updated = 1;
200   ec->config.keep_alive.notify = 0;
201   /* by default enable 128 cores */
202   memset(&ec->config.keep_alive.lcore_mask, 1,
203          sizeof(ec->config.keep_alive.lcore_mask));
204   memset(&ec->config.keep_alive.shm_name, 0,
205          sizeof(ec->config.keep_alive.shm_name));
206   ec->config.keep_alive.shm = MAP_FAILED;
207   ec->config.keep_alive.fd = -1;
208 }
209
210 static int dpdk_events_preinit(void) {
211   DPDK_EVENTS_TRACE();
212
213   if (g_hc != NULL) {
214     /* already initialized if config callback was called before init callback */
215     DEBUG("dpdk_events_preinit: helper already initialized.");
216     return 0;
217   }
218
219   int ret =
220       dpdk_helper_init(DPDK_EVENTS_NAME, sizeof(dpdk_events_ctx_t), &g_hc);
221   if (ret != 0) {
222     ERROR(DPDK_EVENTS_PLUGIN ": failed to initialize %s helper(error: %s)",
223           DPDK_EVENTS_NAME, strerror(ret));
224     return ret;
225   }
226
227   dpdk_events_default_config();
228
229   dpdk_events_ctx_t *ec = DPDK_EVENTS_CTX_GET(g_hc);
230   for (int i = 0; i < RTE_MAX_ETHPORTS; i++) {
231     ec->link_info[i].link_status = ETH_LINK_NA;
232   }
233
234   for (int i = 0; i < RTE_KEEPALIVE_MAXCORES; i++) {
235     ec->core_info[i].lcore_state = ETH_LINK_NA;
236   }
237
238   return ret;
239 }
240
241 static int dpdk_events_link_status_config(dpdk_events_ctx_t *ec,
242                                           oconfig_item_t *ci) {
243   ec->config.link_status.enabled = 1;
244
245   DEBUG(DPDK_EVENTS_PLUGIN ": Subscribed for Link Status Events.");
246
247   for (int i = 0; i < ci->children_num; i++) {
248     oconfig_item_t *child = ci->children + i;
249
250     if (strcasecmp("EnabledPortMask", child->key) == 0) {
251       if (cf_util_get_int(child,
252                           (int *)&ec->config.link_status.enabled_port_mask))
253         return -1;
254       DEBUG(DPDK_EVENTS_PLUGIN ": LinkStatus:Enabled Port Mask 0x%X",
255             ec->config.link_status.enabled_port_mask);
256     } else if (strcasecmp("SendEventsOnUpdate", child->key) == 0) {
257       if (cf_util_get_boolean(child, &ec->config.link_status.send_updated))
258         return -1;
259       DEBUG(DPDK_EVENTS_PLUGIN ": LinkStatus:SendEventsOnUpdate %d",
260             ec->config.link_status.send_updated);
261     } else if (strcasecmp("SendNotification", child->key) == 0) {
262       if (cf_util_get_boolean(child, &ec->config.link_status.notify))
263         return -1;
264
265       DEBUG(DPDK_EVENTS_PLUGIN ": LinkStatus:SendNotification %d",
266             ec->config.link_status.notify);
267     } else if (strcasecmp("PortName", child->key) != 0) {
268       ERROR(DPDK_EVENTS_PLUGIN ": unrecognized configuration option %s.",
269             child->key);
270       return -1;
271     }
272   }
273
274   int port_num = 0;
275
276   /* parse port names after EnabledPortMask was parsed */
277   for (int i = 0; i < ci->children_num; i++) {
278     oconfig_item_t *child = ci->children + i;
279     if (strcasecmp("PortName", child->key) == 0) {
280       while (!(ec->config.link_status.enabled_port_mask & (1 << port_num)))
281         port_num++;
282
283       if (cf_util_get_string_buffer(
284               child, ec->config.link_status.port_name[port_num],
285               sizeof(ec->config.link_status.port_name[port_num]))) {
286         return -1;
287       }
288       DEBUG(DPDK_EVENTS_PLUGIN ": LinkStatus:Port %d Name: %s", port_num,
289             ec->config.link_status.port_name[port_num]);
290       port_num++;
291     }
292   }
293
294   return 0;
295 }
296
297 static int dpdk_events_keep_alive_config(dpdk_events_ctx_t *ec,
298                                          oconfig_item_t *ci) {
299   ec->config.keep_alive.enabled = 1;
300   DEBUG(DPDK_EVENTS_PLUGIN ": Subscribed for Keep Alive Events.");
301
302   for (int i = 0; i < ci->children_num; i++) {
303     oconfig_item_t *child = ci->children + i;
304
305     if (strcasecmp("SendEventsOnUpdate", child->key) == 0) {
306       if (cf_util_get_boolean(child, &ec->config.keep_alive.send_updated))
307         return -1;
308
309       DEBUG(DPDK_EVENTS_PLUGIN ": KeepAlive:SendEventsOnUpdate %d",
310             ec->config.keep_alive.send_updated);
311     } else if (strcasecmp("LCoreMask", child->key) == 0) {
312       char lcore_mask[DATA_MAX_NAME_LEN];
313
314       if (cf_util_get_string_buffer(child, lcore_mask, sizeof(lcore_mask)))
315         return -1;
316       ec->config.keep_alive.lcore_mask =
317           str_to_uint128(lcore_mask, strlen(lcore_mask));
318       DEBUG(DPDK_EVENTS_PLUGIN ": KeepAlive:LCoreMask 0x%" PRIX64 "%" PRIX64 "",
319             ec->config.keep_alive.lcore_mask.high,
320             ec->config.keep_alive.lcore_mask.low);
321     } else if (strcasecmp("KeepAliveShmName", child->key) == 0) {
322       if (cf_util_get_string_buffer(child, ec->config.keep_alive.shm_name,
323                                     sizeof(ec->config.keep_alive.shm_name)))
324         return -1;
325
326       DEBUG(DPDK_EVENTS_PLUGIN ": KeepAlive:KeepAliveShmName %s",
327             ec->config.keep_alive.shm_name);
328     } else if (strcasecmp("SendNotification", child->key) == 0) {
329       if (cf_util_get_boolean(child, &ec->config.keep_alive.notify))
330         return -1;
331
332       DEBUG(DPDK_EVENTS_PLUGIN ": KeepAlive:SendNotification %d",
333             ec->config.keep_alive.notify);
334     } else {
335       ERROR(DPDK_EVENTS_PLUGIN ": unrecognized configuration option %s.",
336             child->key);
337       return -1;
338     }
339   }
340
341   return 0;
342 }
343
344 static int dpdk_events_config(oconfig_item_t *ci) {
345   DPDK_EVENTS_TRACE();
346   int ret = dpdk_events_preinit();
347   if (ret) {
348     g_state |= DPDK_EVENTS_STATE_CFG_ERR;
349     return 0;
350   }
351
352   dpdk_events_ctx_t *ec = DPDK_EVENTS_CTX_GET(g_hc);
353
354   /* Disabling link_status and keep_alive since <Plugin/> config section
355    * specifies if those should be enabled */
356   ec->config.keep_alive.enabled = ec->config.link_status.enabled = 0;
357   memset(&ec->config.keep_alive.lcore_mask, 0,
358          sizeof(ec->config.keep_alive.lcore_mask));
359
360   for (int i = 0; i < ci->children_num; i++) {
361     oconfig_item_t *child = ci->children + i;
362
363     if (strcasecmp("EAL", child->key) == 0)
364       ret = dpdk_helper_eal_config_parse(g_hc, child);
365     else if (strcasecmp("Event", child->key) == 0) {
366       char event_type[DATA_MAX_NAME_LEN];
367
368       if (cf_util_get_string_buffer(child, event_type, sizeof(event_type)))
369         ret = -1;
370       else if (strcasecmp(event_type, "link_status") == 0) {
371         ret = dpdk_events_link_status_config(ec, child);
372         if (ret) {
373           g_state |= DPDK_EVENTS_STATE_LS_CFG_ERR;
374           continue;
375         }
376       } else if (strcasecmp(event_type, "keep_alive") == 0) {
377         ret = dpdk_events_keep_alive_config(ec, child);
378         if (ret) {
379           g_state |= DPDK_EVENTS_STATE_KA_CFG_ERR;
380           continue;
381         }
382       } else {
383         ERROR(DPDK_EVENTS_PLUGIN ": The selected event \"%s\" is unknown.",
384               event_type);
385         ret = -1;
386       }
387     } else {
388       ERROR(DPDK_EVENTS_PLUGIN ": unrecognized configuration option %s.",
389             child->key);
390       ret = -1;
391     }
392
393     if (ret != 0) {
394       g_state |= DPDK_EVENTS_STATE_CFG_ERR;
395       return 0;
396     }
397   }
398
399   if (g_state & DPDK_EVENTS_STATE_KA_CFG_ERR) {
400     ERROR(DPDK_EVENTS_PLUGIN
401           ": Invalid keep alive configuration. Event disabled.");
402     ec->config.keep_alive.enabled = 0;
403   }
404
405   if (g_state & DPDK_EVENTS_STATE_LS_CFG_ERR) {
406     ERROR(DPDK_EVENTS_PLUGIN
407           ": Invalid link status configuration. Event disabled.");
408     ec->config.link_status.enabled = 0;
409   }
410
411   if (!ec->config.keep_alive.enabled && !ec->config.link_status.enabled) {
412     ERROR(DPDK_EVENTS_PLUGIN ": At least one type of events should be "
413                              "configured for collecting. Plugin misconfigured");
414     g_state |= DPDK_EVENTS_STATE_CFG_ERR;
415     return 0;
416   }
417
418   return 0;
419 }
420
421 static int dpdk_helper_link_status_get(dpdk_helper_ctx_t *phc) {
422   dpdk_events_ctx_t *ec = DPDK_EVENTS_CTX_GET(phc);
423
424   /* get Link Status values from DPDK */
425   uint8_t nb_ports = rte_eth_dev_count();
426   if (nb_ports == 0) {
427     DPDK_CHILD_LOG("dpdkevent-helper: No DPDK ports available. "
428                    "Check bound devices to DPDK driver.\n");
429     return -ENODEV;
430   }
431   ec->nb_ports = nb_ports > RTE_MAX_ETHPORTS ? RTE_MAX_ETHPORTS : nb_ports;
432
433   for (int i = 0; i < ec->nb_ports; i++) {
434     if (ec->config.link_status.enabled_port_mask & (1 << i)) {
435       struct rte_eth_link link;
436       ec->link_info[i].read_time = cdtime();
437       rte_eth_link_get_nowait(i, &link);
438       if ((link.link_status == ETH_LINK_NA) ||
439           (link.link_status != ec->link_info[i].link_status)) {
440         ec->link_info[i].link_status = link.link_status;
441         ec->link_info[i].status_updated = 1;
442         DPDK_CHILD_LOG(" === PORT %d Link Status: %s\n", i,
443                        link.link_status ? "UP" : "DOWN");
444       }
445     }
446   }
447
448   return 0;
449 }
450
451 /* this function is called from helper context */
452 int dpdk_helper_command_handler(dpdk_helper_ctx_t *phc, enum DPDK_CMD cmd) {
453   if (phc == NULL) {
454     DPDK_CHILD_LOG(DPDK_EVENTS_PLUGIN ": Invalid argument(phc)\n");
455     return -EINVAL;
456   }
457
458   if (cmd != DPDK_CMD_GET_EVENTS) {
459     DPDK_CHILD_LOG(DPDK_EVENTS_PLUGIN ": Unknown command (cmd=%d)\n", cmd);
460     return -EINVAL;
461   }
462
463   dpdk_events_ctx_t *ec = DPDK_EVENTS_CTX_GET(phc);
464   int ret = 0;
465   if (ec->config.link_status.enabled)
466     ret = dpdk_helper_link_status_get(phc);
467
468   return ret;
469 }
470
471 static void dpdk_events_notification_dispatch(int severity,
472                                               const char *plugin_instance,
473                                               cdtime_t time, const char *msg) {
474   notification_t n = {
475       .severity = severity, .time = time, .plugin = DPDK_EVENTS_PLUGIN};
476   sstrncpy(n.host, hostname_g, sizeof(n.host));
477   sstrncpy(n.plugin_instance, plugin_instance, sizeof(n.plugin_instance));
478   sstrncpy(n.message, msg, sizeof(n.message));
479   plugin_dispatch_notification(&n);
480 }
481
482 static void dpdk_events_gauge_submit(const char *plugin_instance,
483                                      const char *type_instance, gauge_t value,
484                                      cdtime_t time) {
485   value_list_t vl = {.values = &(value_t){.gauge = value},
486                      .values_len = 1,
487                      .time = time,
488                      .plugin = DPDK_EVENTS_PLUGIN,
489                      .type = "gauge",
490                      .meta = NULL};
491   sstrncpy(vl.plugin_instance, plugin_instance, sizeof(vl.plugin_instance));
492   sstrncpy(vl.type_instance, type_instance, sizeof(vl.type_instance));
493   plugin_dispatch_values(&vl);
494 }
495
496 static int dpdk_events_link_status_dispatch(dpdk_helper_ctx_t *phc) {
497   dpdk_events_ctx_t *ec = DPDK_EVENTS_CTX_GET(phc);
498   DEBUG(DPDK_EVENTS_PLUGIN ": %s:%d ports=%u", __FUNCTION__, __LINE__,
499         ec->nb_ports);
500
501   /* dispatch Link Status values to collectd */
502   for (int i = 0; i < ec->nb_ports; i++) {
503     if (ec->config.link_status.enabled_port_mask & (1 << i)) {
504       if (!ec->config.link_status.send_updated ||
505           ec->link_info[i].status_updated) {
506
507         DEBUG(DPDK_EVENTS_PLUGIN ": Dispatch PORT %d Link Status: %s", i,
508               ec->link_info[i].link_status ? "UP" : "DOWN");
509
510         char dev_name[DATA_MAX_NAME_LEN];
511         if (ec->config.link_status.port_name[i][0] != 0) {
512           snprintf(dev_name, sizeof(dev_name), "%s",
513                    ec->config.link_status.port_name[i]);
514         } else {
515           snprintf(dev_name, sizeof(dev_name), "port.%d", i);
516         }
517
518         if (ec->config.link_status.notify) {
519           int sev = ec->link_info[i].link_status ? NOTIF_OKAY : NOTIF_WARNING;
520           char msg[DATA_MAX_NAME_LEN];
521           snprintf(msg, sizeof(msg), "Link Status: %s",
522                    ec->link_info[i].link_status ? "UP" : "DOWN");
523           dpdk_events_notification_dispatch(sev, dev_name,
524                                             ec->link_info[i].read_time, msg);
525         } else {
526           dpdk_events_gauge_submit(dev_name, "link_status",
527                                    (gauge_t)ec->link_info[i].link_status,
528                                    ec->link_info[i].read_time);
529         }
530         ec->link_info[i].status_updated = 0;
531       }
532     }
533   }
534
535   return 0;
536 }
537
538 static void dpdk_events_keep_alive_dispatch(dpdk_helper_ctx_t *phc) {
539   dpdk_events_ctx_t *ec = DPDK_EVENTS_CTX_GET(phc);
540
541   /* dispatch Keep Alive values to collectd */
542   for (int i = 0; i < RTE_KEEPALIVE_MAXCORES; i++) {
543     if (i < INT64_BIT_SIZE) {
544       if (!(ec->config.keep_alive.lcore_mask.low & ((uint64_t)1 << i)))
545         continue;
546     } else if (i >= INT64_BIT_SIZE && i < INT64_BIT_SIZE * 2) {
547       if (!(ec->config.keep_alive.lcore_mask.high &
548             ((uint64_t)1 << (i - INT64_BIT_SIZE))))
549         continue;
550     } else {
551       WARNING(DPDK_EVENTS_PLUGIN
552               ": %s:%d Core id %u is out of 0 to %u range, skipping",
553               __FUNCTION__, __LINE__, i, INT64_BIT_SIZE * 2);
554       continue;
555     }
556
557     char core_name[DATA_MAX_NAME_LEN];
558     snprintf(core_name, sizeof(core_name), "lcore%u", i);
559
560     if (!ec->config.keep_alive.send_updated ||
561         (ec->core_info[i].lcore_state !=
562          ec->config.keep_alive.shm->core_state[i])) {
563       ec->core_info[i].lcore_state = ec->config.keep_alive.shm->core_state[i];
564       ec->core_info[i].read_time = cdtime();
565
566       if (ec->config.keep_alive.notify) {
567         char msg[DATA_MAX_NAME_LEN];
568         int sev;
569
570         switch (ec->config.keep_alive.shm->core_state[i]) {
571         case RTE_KA_STATE_ALIVE:
572           sev = NOTIF_OKAY;
573           snprintf(msg, sizeof(msg), "lcore %u Keep Alive Status: ALIVE", i);
574           break;
575         case RTE_KA_STATE_MISSING:
576           snprintf(msg, sizeof(msg), "lcore %u Keep Alive Status: MISSING", i);
577           sev = NOTIF_WARNING;
578           break;
579         case RTE_KA_STATE_DEAD:
580           snprintf(msg, sizeof(msg), "lcore %u Keep Alive Status: DEAD", i);
581           sev = NOTIF_FAILURE;
582           break;
583         case RTE_KA_STATE_UNUSED:
584           snprintf(msg, sizeof(msg), "lcore %u Keep Alive Status: UNUSED", i);
585           sev = NOTIF_OKAY;
586           break;
587         case RTE_KA_STATE_GONE:
588           snprintf(msg, sizeof(msg), "lcore %u Keep Alive Status: GONE", i);
589           sev = NOTIF_FAILURE;
590           break;
591         case RTE_KA_STATE_DOZING:
592           snprintf(msg, sizeof(msg), "lcore %u Keep Alive Status: DOZING", i);
593           sev = NOTIF_OKAY;
594           break;
595         case RTE_KA_STATE_SLEEP:
596           snprintf(msg, sizeof(msg), "lcore %u Keep Alive Status: SLEEP", i);
597           sev = NOTIF_OKAY;
598           break;
599         default:
600           snprintf(msg, sizeof(msg), "lcore %u Keep Alive Status: UNKNOWN", i);
601           sev = NOTIF_FAILURE;
602         }
603
604         dpdk_events_notification_dispatch(sev, KEEPALIVE_PLUGIN_INSTANCE,
605                                           ec->core_info[i].read_time, msg);
606       } else {
607         dpdk_events_gauge_submit(KEEPALIVE_PLUGIN_INSTANCE, core_name,
608                                  ec->config.keep_alive.shm->core_state[i],
609                                  ec->core_info[i].read_time);
610       }
611     }
612   }
613 }
614
615 static int dpdk_events_read(user_data_t *ud) {
616   DPDK_EVENTS_TRACE();
617
618   if (g_hc == NULL) {
619     ERROR(DPDK_EVENTS_PLUGIN ": plugin not initialized.");
620     return -1;
621   }
622
623   dpdk_events_ctx_t *ec = DPDK_EVENTS_CTX_GET(g_hc);
624   int ls_ret = -1, ka_ret = -1;
625
626   int cmd_res = 0;
627   if (ec->config.link_status.enabled) {
628     ls_ret = dpdk_helper_command(g_hc, DPDK_CMD_GET_EVENTS, &cmd_res,
629                                  ec->config.interval);
630     if (cmd_res == 0 && ls_ret == 0) {
631       dpdk_events_link_status_dispatch(g_hc);
632     }
633   }
634
635   if (ec->config.keep_alive.enabled) {
636     ka_ret = dpdk_event_keep_alive_shm_open();
637     if (ka_ret) {
638       ERROR(DPDK_EVENTS_PLUGIN
639             ": %s : error %d in dpdk_event_keep_alive_shm_open()",
640             __FUNCTION__, ka_ret);
641     } else
642       dpdk_events_keep_alive_dispatch(g_hc);
643   }
644
645   if (!((cmd_res || ls_ret) == 0 || ka_ret == 0)) {
646     ERROR(DPDK_EVENTS_PLUGIN ": Read failure for all enabled event types");
647     return -1;
648   }
649
650   return 0;
651 }
652
653 static int dpdk_events_shutdown(void) {
654   DPDK_EVENTS_TRACE();
655
656   if (g_hc == NULL)
657     return 0;
658
659   dpdk_events_ctx_t *ec = DPDK_EVENTS_CTX_GET(g_hc);
660   if (ec->config.keep_alive.enabled) {
661     if (ec->config.keep_alive.fd != -1) {
662       close(ec->config.keep_alive.fd);
663       ec->config.keep_alive.fd = -1;
664     }
665
666     if (ec->config.keep_alive.shm != MAP_FAILED) {
667       if (munmap(ec->config.keep_alive.shm, sizeof(dpdk_keepalive_shm_t))) {
668         ERROR(DPDK_EVENTS_PLUGIN ": munmap KA monitor failed");
669         return -1;
670       }
671       ec->config.keep_alive.shm = MAP_FAILED;
672     }
673   }
674
675   dpdk_helper_shutdown(g_hc);
676   g_hc = NULL;
677
678   return 0;
679 }
680
681 static int dpdk_events_init(void) {
682   DPDK_EVENTS_TRACE();
683
684   if (g_state & DPDK_EVENTS_STATE_CFG_ERR) {
685     dpdk_events_shutdown();
686     return -1;
687   }
688
689   int ret = dpdk_events_preinit();
690   if (ret)
691     return ret;
692
693   return 0;
694 }
695
696 void module_register(void) {
697   plugin_register_init(DPDK_EVENTS_PLUGIN, dpdk_events_init);
698   plugin_register_complex_config(DPDK_EVENTS_PLUGIN, dpdk_events_config);
699   plugin_register_complex_read(NULL, DPDK_EVENTS_PLUGIN, dpdk_events_read, 0,
700                                NULL);
701   plugin_register_shutdown(DPDK_EVENTS_PLUGIN, dpdk_events_shutdown);
702 }