dpdk plugins: bug fixes and improvements
[collectd.git] / src / dpdkevents.c
1 /*
2  * collectd - src/dpdkevents.c
3  * MIT License
4  *
5  * Copyright(c) 2017 Intel Corporation. All rights reserved.
6  *
7  * Permission is hereby granted, free of charge, to any person obtaining a copy
8  * of this software and associated documentation files (the "Software"), to deal
9  * in the Software without restriction, including without limitation the rights
10  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11  * copies of the Software, and to permit persons to whom the Software is
12  * furnished to do so, subject to the following conditions:
13  *
14  * The above copyright notice and this permission notice shall be included in
15  * all copies or substantial portions of the Software.
16  *
17  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
23  * SOFTWARE.
24  *
25  * Authors:
26  *   Maryam Tahhan <maryam.tahhan@intel.com>
27  *   Harry van Haaren <harry.van.haaren@intel.com>
28  *   Serhiy Pshyk <serhiyx.pshyk@intel.com>
29  *   Kim-Marie Jones <kim-marie.jones@intel.com>
30  *   Krzysztof Matczak <krzysztofx@intel.com>
31  */
32
33 #include "collectd.h"
34
35 #include "common.h"
36 #include "plugin.h"
37
38 #include "semaphore.h"
39 #include "sys/mman.h"
40 #include "utils_dpdk.h"
41 #include "utils_time.h"
42
43 #include <rte_config.h>
44 #include <rte_eal.h>
45 #include <rte_ethdev.h>
46 #include <rte_keepalive.h>
47
48 #define DPDK_EVENTS_PLUGIN "dpdkevents"
49 #define DPDK_EVENTS_NAME "dpdk_collectd_events"
50 #define ETH_LINK_NA 0xFF
51
52 #define INT64_BIT_SIZE 64
53 #define KEEPALIVE_PLUGIN_INSTANCE "keepalive"
54 #define RTE_KEEPALIVE_SHM_NAME "/dpdk_keepalive_shm_name"
55
56 typedef struct dpdk_keepalive_shm_s {
57   sem_t core_died;
58   enum rte_keepalive_state core_state[RTE_KEEPALIVE_MAXCORES];
59   uint64_t core_last_seen_times[RTE_KEEPALIVE_MAXCORES];
60 } dpdk_keepalive_shm_t;
61
62 typedef struct dpdk_ka_monitor_s {
63   cdtime_t read_time;
64   int lcore_state;
65 } dpdk_ka_monitor_t;
66
67 typedef struct dpdk_link_status_config_s {
68   int enabled;
69   _Bool send_updated;
70   uint32_t enabled_port_mask;
71   char port_name[RTE_MAX_ETHPORTS][DATA_MAX_NAME_LEN];
72   _Bool notify;
73 } dpdk_link_status_config_t;
74
75 typedef struct dpdk_keep_alive_config_s {
76   int enabled;
77   _Bool send_updated;
78   uint128_t lcore_mask;
79   dpdk_keepalive_shm_t *shm;
80   char shm_name[DATA_MAX_NAME_LEN];
81   _Bool notify;
82   int fd;
83 } dpdk_keep_alive_config_t;
84
85 typedef struct dpdk_events_config_s {
86   cdtime_t interval;
87   dpdk_link_status_config_t link_status;
88   dpdk_keep_alive_config_t keep_alive;
89 } dpdk_events_config_t;
90
91 typedef struct dpdk_link_info_s {
92   cdtime_t read_time;
93   int status_updated;
94   int link_status;
95 } dpdk_link_info_t;
96
97 typedef struct dpdk_events_ctx_s {
98   dpdk_events_config_t config;
99   uint32_t nb_ports;
100   dpdk_link_info_t link_info[RTE_MAX_ETHPORTS];
101   dpdk_ka_monitor_t core_info[RTE_KEEPALIVE_MAXCORES];
102 } dpdk_events_ctx_t;
103
104 typedef enum {
105   DPDK_EVENTS_STATE_CFG_ERR = 1 << 0,
106   DPDK_EVENTS_STATE_KA_CFG_ERR = 1 << 1,
107   DPDK_EVENTS_STATE_LS_CFG_ERR = 1 << 2,
108
109 } dpdk_events_cfg_status;
110
111 #define DPDK_EVENTS_CTX_GET(a) ((dpdk_events_ctx_t *)dpdk_helper_priv_get(a))
112
113 #define DPDK_EVENTS_TRACE()                                                    \
114   DEBUG("%s:%s:%d pid=%u", DPDK_EVENTS_PLUGIN, __FUNCTION__, __LINE__, getpid())
115
116 static dpdk_helper_ctx_t *g_hc;
117 static dpdk_events_cfg_status g_state;
118
119 static int dpdk_event_keep_alive_shm_open(void) {
120   dpdk_events_ctx_t *ec = DPDK_EVENTS_CTX_GET(g_hc);
121   char *shm_name;
122
123   if (strlen(ec->config.keep_alive.shm_name)) {
124     shm_name = ec->config.keep_alive.shm_name;
125   } else {
126     shm_name = RTE_KEEPALIVE_SHM_NAME;
127     WARNING(DPDK_EVENTS_PLUGIN ": Keep alive shared memory identifier is not "
128                                "specified, using default one: %s",
129             shm_name);
130   }
131
132   char errbuf[ERR_BUF_SIZE];
133   int fd = shm_open(shm_name, O_RDONLY, 0);
134   if (fd < 0) {
135     ERROR(DPDK_EVENTS_PLUGIN ": Failed to open %s as SHM:%s. Is DPDK KA "
136                              "primary application running?",
137           shm_name, sstrerror(errno, errbuf, sizeof(errbuf)));
138     return errno;
139   }
140
141   if (ec->config.keep_alive.fd != -1) {
142     struct stat stat_old, stat_new;
143
144     if (fstat(ec->config.keep_alive.fd, &stat_old) || fstat(fd, &stat_new)) {
145       ERROR(DPDK_EVENTS_PLUGIN ": failed to get information about a file");
146       close(fd);
147       return -1;
148     }
149
150     /* Check if inode number has changed. If yes, then create a new mapping */
151     if (stat_old.st_ino == stat_new.st_ino) {
152       close(fd);
153       return 0;
154     }
155
156     if (munmap(ec->config.keep_alive.shm, sizeof(dpdk_keepalive_shm_t)) != 0) {
157       ERROR(DPDK_EVENTS_PLUGIN ": munmap KA monitor failed");
158       close(fd);
159       return -1;
160     }
161
162     close(ec->config.keep_alive.fd);
163     ec->config.keep_alive.fd = -1;
164   }
165
166   ec->config.keep_alive.shm = (dpdk_keepalive_shm_t *)mmap(
167       0, sizeof(*(ec->config.keep_alive.shm)), PROT_READ, MAP_SHARED, fd, 0);
168   if (ec->config.keep_alive.shm == MAP_FAILED) {
169     ERROR(DPDK_EVENTS_PLUGIN ": Failed to mmap KA SHM:%s",
170           sstrerror(errno, errbuf, sizeof(errbuf)));
171     close(fd);
172     return errno;
173   }
174   ec->config.keep_alive.fd = fd;
175
176   return 0;
177 }
178
179 static void dpdk_events_default_config(void) {
180   dpdk_events_ctx_t *ec = DPDK_EVENTS_CTX_GET(g_hc);
181
182   ec->config.interval = plugin_get_interval();
183
184   /* In configless mode when no <Plugin/> section is defined in config file
185    * both link_status and keep_alive should be enabled */
186
187   /* Link Status */
188   ec->config.link_status.enabled = 1;
189   ec->config.link_status.enabled_port_mask = ~0;
190   ec->config.link_status.send_updated = 1;
191   ec->config.link_status.notify = 0;
192
193   for (int i = 0; i < RTE_MAX_ETHPORTS; i++) {
194     ec->config.link_status.port_name[i][0] = 0;
195   }
196
197   /* Keep Alive */
198   ec->config.keep_alive.enabled = 1;
199   ec->config.keep_alive.send_updated = 1;
200   ec->config.keep_alive.notify = 0;
201   /* by default enable 128 cores */
202   memset(&ec->config.keep_alive.lcore_mask, 1,
203          sizeof(ec->config.keep_alive.lcore_mask));
204   memset(&ec->config.keep_alive.shm_name, 0,
205          sizeof(ec->config.keep_alive.shm_name));
206   ec->config.keep_alive.shm = MAP_FAILED;
207   ec->config.keep_alive.fd = -1;
208 }
209
210 static int dpdk_events_preinit(void) {
211   DPDK_EVENTS_TRACE();
212
213   if (g_hc != NULL) {
214     /* already initialized if config callback was called before init callback */
215     DEBUG("dpdk_events_preinit: helper already initialized.");
216     return 0;
217   }
218
219   int ret =
220       dpdk_helper_init(DPDK_EVENTS_NAME, sizeof(dpdk_events_ctx_t), &g_hc);
221   if (ret != 0) {
222     ERROR(DPDK_EVENTS_PLUGIN ": failed to initialize %s helper(error: %s)",
223           DPDK_EVENTS_NAME, strerror(ret));
224     return ret;
225   }
226
227   dpdk_events_default_config();
228
229   dpdk_events_ctx_t *ec = DPDK_EVENTS_CTX_GET(g_hc);
230   for (int i = 0; i < RTE_MAX_ETHPORTS; i++) {
231     ec->link_info[i].link_status = ETH_LINK_NA;
232   }
233
234   for (int i = 0; i < RTE_KEEPALIVE_MAXCORES; i++) {
235     ec->core_info[i].lcore_state = ETH_LINK_NA;
236   }
237
238   return ret;
239 }
240
241 static int dpdk_events_link_status_config(dpdk_events_ctx_t *ec,
242                                           oconfig_item_t *ci) {
243   ec->config.link_status.enabled = 1;
244
245   DEBUG(DPDK_EVENTS_PLUGIN ": Subscribed for Link Status Events.");
246
247   for (int i = 0; i < ci->children_num; i++) {
248     oconfig_item_t *child = ci->children + i;
249
250     if (strcasecmp("EnabledPortMask", child->key) == 0) {
251       if (cf_util_get_int(child,
252                           (int *)&ec->config.link_status.enabled_port_mask))
253         return -1;
254       DEBUG(DPDK_EVENTS_PLUGIN ": LinkStatus:Enabled Port Mask 0x%X",
255             ec->config.link_status.enabled_port_mask);
256     } else if (strcasecmp("SendEventsOnUpdate", child->key) == 0) {
257       if (cf_util_get_boolean(child, &ec->config.link_status.send_updated))
258         return -1;
259       DEBUG(DPDK_EVENTS_PLUGIN ": LinkStatus:SendEventsOnUpdate %d",
260             ec->config.link_status.send_updated);
261     } else if (strcasecmp("SendNotification", child->key) == 0) {
262       if (cf_util_get_boolean(child, &ec->config.link_status.notify))
263         return -1;
264
265       DEBUG(DPDK_EVENTS_PLUGIN ": LinkStatus:SendNotification %d",
266             ec->config.link_status.notify);
267     } else if (strcasecmp("PortName", child->key) != 0) {
268       ERROR(DPDK_EVENTS_PLUGIN ": unrecognized configuration option %s.",
269             child->key);
270       return -1;
271     }
272   }
273
274   int port_num = 0;
275
276   /* parse port names after EnabledPortMask was parsed */
277   for (int i = 0; i < ci->children_num; i++) {
278     oconfig_item_t *child = ci->children + i;
279     if (strcasecmp("PortName", child->key) == 0) {
280       while (!(ec->config.link_status.enabled_port_mask & (1 << port_num)))
281         port_num++;
282
283       if (cf_util_get_string_buffer(
284               child, ec->config.link_status.port_name[port_num],
285               sizeof(ec->config.link_status.port_name[port_num]))) {
286         return -1;
287       }
288       DEBUG(DPDK_EVENTS_PLUGIN ": LinkStatus:Port %d Name: %s", port_num,
289             ec->config.link_status.port_name[port_num]);
290       port_num++;
291     }
292   }
293
294   return 0;
295 }
296
297 static int dpdk_events_keep_alive_config(dpdk_events_ctx_t *ec,
298                                          oconfig_item_t *ci) {
299   ec->config.keep_alive.enabled = 1;
300   DEBUG(DPDK_EVENTS_PLUGIN ": Subscribed for Keep Alive Events.");
301
302   for (int i = 0; i < ci->children_num; i++) {
303     oconfig_item_t *child = ci->children + i;
304
305     if (strcasecmp("SendEventsOnUpdate", child->key) == 0) {
306       if (cf_util_get_boolean(child, &ec->config.keep_alive.send_updated))
307         return -1;
308
309       DEBUG(DPDK_EVENTS_PLUGIN ": KeepAlive:SendEventsOnUpdate %d",
310             ec->config.keep_alive.send_updated);
311     } else if (strcasecmp("LCoreMask", child->key) == 0) {
312       char lcore_mask[DATA_MAX_NAME_LEN];
313
314       if (cf_util_get_string_buffer(child, lcore_mask, sizeof(lcore_mask)))
315         return -1;
316       ec->config.keep_alive.lcore_mask =
317           str_to_uint128(lcore_mask, strlen(lcore_mask));
318       DEBUG(DPDK_EVENTS_PLUGIN ": KeepAlive:LCoreMask 0x%" PRIX64 "%" PRIX64 "",
319             ec->config.keep_alive.lcore_mask.high,
320             ec->config.keep_alive.lcore_mask.low);
321     } else if (strcasecmp("KeepAliveShmName", child->key) == 0) {
322       if (cf_util_get_string_buffer(child, ec->config.keep_alive.shm_name,
323                                     sizeof(ec->config.keep_alive.shm_name)))
324         return -1;
325
326       DEBUG(DPDK_EVENTS_PLUGIN ": KeepAlive:KeepAliveShmName %s",
327             ec->config.keep_alive.shm_name);
328     } else if (strcasecmp("SendNotification", child->key) == 0) {
329       if (cf_util_get_boolean(child, &ec->config.keep_alive.notify))
330         return -1;
331
332       DEBUG(DPDK_EVENTS_PLUGIN ": KeepAlive:SendNotification %d",
333             ec->config.keep_alive.notify);
334     } else {
335       ERROR(DPDK_EVENTS_PLUGIN ": unrecognized configuration option %s.",
336             child->key);
337       return -1;
338     }
339   }
340
341   return 0;
342 }
343
344 static int dpdk_events_config(oconfig_item_t *ci) {
345   DPDK_EVENTS_TRACE();
346   int ret = dpdk_events_preinit();
347   if (ret) {
348     g_state |= DPDK_EVENTS_STATE_CFG_ERR;
349     return 0;
350   }
351
352   dpdk_events_ctx_t *ec = DPDK_EVENTS_CTX_GET(g_hc);
353
354   /* Disabling link_status and keep_alive since <Plugin/> config section
355    * specifies if those should be enabled */
356   ec->config.keep_alive.enabled = ec->config.link_status.enabled = 0;
357   memset(&ec->config.keep_alive.lcore_mask, 0,
358          sizeof(ec->config.keep_alive.lcore_mask));
359
360   for (int i = 0; i < ci->children_num; i++) {
361     oconfig_item_t *child = ci->children + i;
362
363     if (strcasecmp("EAL", child->key) == 0)
364       ret = dpdk_helper_eal_config_parse(g_hc, child);
365     else if (strcasecmp("Event", child->key) == 0) {
366       char event_type[DATA_MAX_NAME_LEN];
367
368       if (cf_util_get_string_buffer(child, event_type, sizeof(event_type)))
369         ret = -1;
370       else if (strcasecmp(event_type, "link_status") == 0) {
371         ret = dpdk_events_link_status_config(ec, child);
372         if (ret) {
373           g_state |= DPDK_EVENTS_STATE_LS_CFG_ERR;
374           continue;
375         }
376       } else if (strcasecmp(event_type, "keep_alive") == 0) {
377         ret = dpdk_events_keep_alive_config(ec, child);
378         if (ret) {
379           g_state |= DPDK_EVENTS_STATE_KA_CFG_ERR;
380           continue;
381         }
382       } else {
383         ERROR(DPDK_EVENTS_PLUGIN ": The selected event \"%s\" is unknown.",
384               event_type);
385         ret = -1;
386       }
387     } else {
388       ERROR(DPDK_EVENTS_PLUGIN ": unrecognized configuration option %s.",
389             child->key);
390       ret = -1;
391     }
392
393     if (ret != 0) {
394       g_state |= DPDK_EVENTS_STATE_CFG_ERR;
395       return 0;
396     }
397   }
398
399   if (g_state & DPDK_EVENTS_STATE_KA_CFG_ERR) {
400     ERROR(DPDK_EVENTS_PLUGIN
401           ": Invalid keep alive configuration. Event disabled.");
402     ec->config.keep_alive.enabled = 0;
403   }
404
405   if (g_state & DPDK_EVENTS_STATE_LS_CFG_ERR) {
406     ERROR(DPDK_EVENTS_PLUGIN
407           ": Invalid link status configuration. Event disabled.");
408     ec->config.link_status.enabled = 0;
409   }
410
411   if (!ec->config.keep_alive.enabled && !ec->config.link_status.enabled) {
412     ERROR(DPDK_EVENTS_PLUGIN ": At least one type of events should be "
413                              "configured for collecting. Plugin misconfigured");
414     g_state |= DPDK_EVENTS_STATE_CFG_ERR;
415     return 0;
416   }
417
418   return 0;
419 }
420
421 static int dpdk_helper_link_status_get(dpdk_helper_ctx_t *phc) {
422   dpdk_events_ctx_t *ec = DPDK_EVENTS_CTX_GET(phc);
423
424   /* get Link Status values from DPDK */
425   uint8_t nb_ports = rte_eth_dev_count();
426   if (nb_ports == 0) {
427     DPDK_CHILD_LOG("dpdkevent-helper: No DPDK ports available. "
428                    "Check bound devices to DPDK driver.\n");
429     return -ENODEV;
430   }
431   ec->nb_ports = nb_ports > RTE_MAX_ETHPORTS ? RTE_MAX_ETHPORTS : nb_ports;
432
433   for (int i = 0; i < ec->nb_ports; i++) {
434     if (ec->config.link_status.enabled_port_mask & (1 << i)) {
435       struct rte_eth_link link;
436       ec->link_info[i].read_time = cdtime();
437       rte_eth_link_get_nowait(i, &link);
438       if ((link.link_status == ETH_LINK_NA) ||
439           (link.link_status != ec->link_info[i].link_status)) {
440         ec->link_info[i].link_status = link.link_status;
441         ec->link_info[i].status_updated = 1;
442         DPDK_CHILD_LOG(" === PORT %d Link Status: %s\n", i,
443                        link.link_status ? "UP" : "DOWN");
444       }
445     }
446   }
447
448   return 0;
449 }
450
451 /* this function is called from helper context */
452 int dpdk_helper_command_handler(dpdk_helper_ctx_t *phc, enum DPDK_CMD cmd) {
453   if (phc == NULL) {
454     DPDK_CHILD_LOG(DPDK_EVENTS_PLUGIN ": Invalid argument(phc)\n");
455     return -EINVAL;
456   }
457
458   if (cmd != DPDK_CMD_GET_EVENTS) {
459     DPDK_CHILD_LOG(DPDK_EVENTS_PLUGIN ": Unknown command (cmd=%d)\n", cmd);
460     return -EINVAL;
461   }
462
463   dpdk_events_ctx_t *ec = DPDK_EVENTS_CTX_GET(phc);
464   int ret = 0;
465   if (ec->config.link_status.enabled)
466     ret = dpdk_helper_link_status_get(phc);
467
468   return ret;
469 }
470
471 static void dpdk_events_notification_dispatch(int severity,
472                                               const char *plugin_instance,
473                                               cdtime_t time, const char *msg) {
474   notification_t n = {
475       .severity = severity, .time = time, .plugin = DPDK_EVENTS_PLUGIN};
476   sstrncpy(n.host, hostname_g, sizeof(n.host));
477   sstrncpy(n.plugin_instance, plugin_instance, sizeof(n.plugin_instance));
478   sstrncpy(n.message, msg, sizeof(n.message));
479   plugin_dispatch_notification(&n);
480 }
481
482 static void dpdk_events_gauge_submit(const char *plugin_instance,
483                                      const char *type_instance, gauge_t value,
484                                      cdtime_t time) {
485   value_list_t vl = {.values = &(value_t){.gauge = value},
486                      .values_len = 1,
487                      .time = time,
488                      .plugin = DPDK_EVENTS_PLUGIN,
489                      .type = "gauge",
490                      .meta = NULL};
491   sstrncpy(vl.host, hostname_g, sizeof(vl.host));
492   sstrncpy(vl.plugin_instance, plugin_instance, sizeof(vl.plugin_instance));
493   sstrncpy(vl.type_instance, type_instance, sizeof(vl.type_instance));
494   plugin_dispatch_values(&vl);
495 }
496
497 static int dpdk_events_link_status_dispatch(dpdk_helper_ctx_t *phc) {
498   dpdk_events_ctx_t *ec = DPDK_EVENTS_CTX_GET(phc);
499   DEBUG(DPDK_EVENTS_PLUGIN ": %s:%d ports=%u", __FUNCTION__, __LINE__,
500         ec->nb_ports);
501
502   /* dispatch Link Status values to collectd */
503   for (int i = 0; i < ec->nb_ports; i++) {
504     if (ec->config.link_status.enabled_port_mask & (1 << i)) {
505       if (!ec->config.link_status.send_updated ||
506           ec->link_info[i].status_updated) {
507
508         DEBUG(DPDK_EVENTS_PLUGIN ": Dispatch PORT %d Link Status: %s", i,
509               ec->link_info[i].link_status ? "UP" : "DOWN");
510
511         char dev_name[DATA_MAX_NAME_LEN];
512         if (ec->config.link_status.port_name[i][0] != 0) {
513           ssnprintf(dev_name, sizeof(dev_name), "%s",
514                     ec->config.link_status.port_name[i]);
515         } else {
516           ssnprintf(dev_name, sizeof(dev_name), "port.%d", i);
517         }
518
519         if (ec->config.link_status.notify) {
520           int sev = ec->link_info[i].link_status ? NOTIF_OKAY : NOTIF_WARNING;
521           char msg[DATA_MAX_NAME_LEN];
522           ssnprintf(msg, sizeof(msg), "Link Status: %s",
523                     ec->link_info[i].link_status ? "UP" : "DOWN");
524           dpdk_events_notification_dispatch(sev, dev_name,
525                                             ec->link_info[i].read_time, msg);
526         } else {
527           dpdk_events_gauge_submit(dev_name, "link_status",
528                                    (gauge_t)ec->link_info[i].link_status,
529                                    ec->link_info[i].read_time);
530         }
531         ec->link_info[i].status_updated = 0;
532       }
533     }
534   }
535
536   return 0;
537 }
538
539 static void dpdk_events_keep_alive_dispatch(dpdk_helper_ctx_t *phc) {
540   dpdk_events_ctx_t *ec = DPDK_EVENTS_CTX_GET(phc);
541
542   /* dispatch Keep Alive values to collectd */
543   for (int i = 0; i < RTE_KEEPALIVE_MAXCORES; i++) {
544     if (i < INT64_BIT_SIZE) {
545       if (!(ec->config.keep_alive.lcore_mask.low & ((uint64_t)1 << i)))
546         continue;
547     } else if (i >= INT64_BIT_SIZE && i < INT64_BIT_SIZE * 2) {
548       if (!(ec->config.keep_alive.lcore_mask.high &
549             ((uint64_t)1 << (i - INT64_BIT_SIZE))))
550         continue;
551     } else {
552       WARNING(DPDK_EVENTS_PLUGIN
553               ": %s:%d Core id %u is out of 0 to %u range, skipping",
554               __FUNCTION__, __LINE__, i, INT64_BIT_SIZE * 2);
555       continue;
556     }
557
558     char core_name[DATA_MAX_NAME_LEN];
559     ssnprintf(core_name, sizeof(core_name), "lcore%u", i);
560
561     if (!ec->config.keep_alive.send_updated ||
562         (ec->core_info[i].lcore_state !=
563          ec->config.keep_alive.shm->core_state[i])) {
564       ec->core_info[i].lcore_state = ec->config.keep_alive.shm->core_state[i];
565       ec->core_info[i].read_time = cdtime();
566
567       if (ec->config.keep_alive.notify) {
568         char msg[DATA_MAX_NAME_LEN];
569         int sev;
570
571         switch (ec->config.keep_alive.shm->core_state[i]) {
572         case RTE_KA_STATE_ALIVE:
573           sev = NOTIF_OKAY;
574           ssnprintf(msg, sizeof(msg), "lcore %u Keep Alive Status: ALIVE", i);
575           break;
576         case RTE_KA_STATE_MISSING:
577           ssnprintf(msg, sizeof(msg), "lcore %u Keep Alive Status: MISSING", i);
578           sev = NOTIF_WARNING;
579           break;
580         case RTE_KA_STATE_DEAD:
581           ssnprintf(msg, sizeof(msg), "lcore %u Keep Alive Status: DEAD", i);
582           sev = NOTIF_FAILURE;
583           break;
584         case RTE_KA_STATE_UNUSED:
585           ssnprintf(msg, sizeof(msg), "lcore %u Keep Alive Status: UNUSED", i);
586           sev = NOTIF_OKAY;
587           break;
588         case RTE_KA_STATE_GONE:
589           ssnprintf(msg, sizeof(msg), "lcore %u Keep Alive Status: GONE", i);
590           sev = NOTIF_FAILURE;
591           break;
592         case RTE_KA_STATE_DOZING:
593           ssnprintf(msg, sizeof(msg), "lcore %u Keep Alive Status: DOZING", i);
594           sev = NOTIF_OKAY;
595           break;
596         case RTE_KA_STATE_SLEEP:
597           ssnprintf(msg, sizeof(msg), "lcore %u Keep Alive Status: SLEEP", i);
598           sev = NOTIF_OKAY;
599           break;
600         default:
601           ssnprintf(msg, sizeof(msg), "lcore %u Keep Alive Status: UNKNOWN", i);
602           sev = NOTIF_FAILURE;
603         }
604
605         dpdk_events_notification_dispatch(sev, KEEPALIVE_PLUGIN_INSTANCE,
606                                           ec->core_info[i].read_time, msg);
607       } else {
608         dpdk_events_gauge_submit(KEEPALIVE_PLUGIN_INSTANCE, core_name,
609                                  ec->config.keep_alive.shm->core_state[i],
610                                  ec->core_info[i].read_time);
611       }
612     }
613   }
614 }
615
616 static int dpdk_events_read(user_data_t *ud) {
617   DPDK_EVENTS_TRACE();
618
619   if (g_hc == NULL) {
620     ERROR(DPDK_EVENTS_PLUGIN ": plugin not initialized.");
621     return -1;
622   }
623
624   dpdk_events_ctx_t *ec = DPDK_EVENTS_CTX_GET(g_hc);
625   int ls_ret = -1, ka_ret = -1;
626
627   int cmd_res = 0;
628   if (ec->config.link_status.enabled) {
629     ls_ret = dpdk_helper_command(g_hc, DPDK_CMD_GET_EVENTS, &cmd_res,
630                                  ec->config.interval);
631     if (cmd_res == 0 && ls_ret == 0) {
632       dpdk_events_link_status_dispatch(g_hc);
633     }
634   }
635
636   if (ec->config.keep_alive.enabled) {
637     ka_ret = dpdk_event_keep_alive_shm_open();
638     if (ka_ret) {
639       ERROR(DPDK_EVENTS_PLUGIN
640             ": %s : error %d in dpdk_event_keep_alive_shm_open()",
641             __FUNCTION__, ka_ret);
642     } else
643       dpdk_events_keep_alive_dispatch(g_hc);
644   }
645
646   if (!((cmd_res || ls_ret) == 0 || ka_ret == 0)) {
647     ERROR(DPDK_EVENTS_PLUGIN ": Read failure for all enabled event types");
648     return -1;
649   }
650
651   return 0;
652 }
653
654 static int dpdk_events_shutdown(void) {
655   DPDK_EVENTS_TRACE();
656
657   if (g_hc == NULL)
658     return 0;
659
660   dpdk_events_ctx_t *ec = DPDK_EVENTS_CTX_GET(g_hc);
661   if (ec->config.keep_alive.enabled) {
662     if (ec->config.keep_alive.fd != -1) {
663       close(ec->config.keep_alive.fd);
664       ec->config.keep_alive.fd = -1;
665     }
666
667     if (ec->config.keep_alive.shm != MAP_FAILED) {
668       if (munmap(ec->config.keep_alive.shm, sizeof(dpdk_keepalive_shm_t))) {
669         ERROR(DPDK_EVENTS_PLUGIN ": munmap KA monitor failed");
670         return -1;
671       }
672       ec->config.keep_alive.shm = MAP_FAILED;
673     }
674   }
675
676   dpdk_helper_shutdown(g_hc);
677   g_hc = NULL;
678
679   return 0;
680 }
681
682 static int dpdk_events_init(void) {
683   DPDK_EVENTS_TRACE();
684
685   if (g_state & DPDK_EVENTS_STATE_CFG_ERR) {
686     dpdk_events_shutdown();
687     return -1;
688   }
689
690   int ret = dpdk_events_preinit();
691   if (ret)
692     return ret;
693
694   return 0;
695 }
696
697 void module_register(void) {
698   plugin_register_init(DPDK_EVENTS_PLUGIN, dpdk_events_init);
699   plugin_register_complex_config(DPDK_EVENTS_PLUGIN, dpdk_events_config);
700   plugin_register_complex_read(NULL, DPDK_EVENTS_PLUGIN, dpdk_events_read, 0,
701                                NULL);
702   plugin_register_shutdown(DPDK_EVENTS_PLUGIN, dpdk_events_shutdown);
703 }