Merge pull request #2618 from ajssmith/amqp1_dev1_branch
[collectd.git] / src / dpdkevents.c
1 /*
2  * collectd - src/dpdkevents.c
3  * MIT License
4  *
5  * Copyright(c) 2017 Intel Corporation. All rights reserved.
6  *
7  * Permission is hereby granted, free of charge, to any person obtaining a copy
8  * of this software and associated documentation files (the "Software"), to deal
9  * in the Software without restriction, including without limitation the rights
10  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11  * copies of the Software, and to permit persons to whom the Software is
12  * furnished to do so, subject to the following conditions:
13  *
14  * The above copyright notice and this permission notice shall be included in
15  * all copies or substantial portions of the Software.
16  *
17  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
23  * SOFTWARE.
24  *
25  * Authors:
26  *   Maryam Tahhan <maryam.tahhan@intel.com>
27  *   Harry van Haaren <harry.van.haaren@intel.com>
28  *   Serhiy Pshyk <serhiyx.pshyk@intel.com>
29  *   Kim-Marie Jones <kim-marie.jones@intel.com>
30  *   Krzysztof Matczak <krzysztofx@intel.com>
31  */
32
33 #include "collectd.h"
34
35 #include "common.h"
36 #include "plugin.h"
37
38 #include "semaphore.h"
39 #include "sys/mman.h"
40 #include "utils_dpdk.h"
41 #include "utils_time.h"
42
43 #include <rte_config.h>
44 #include <rte_eal.h>
45 #include <rte_ethdev.h>
46 #include <rte_keepalive.h>
47
48 #define DPDK_EVENTS_PLUGIN "dpdkevents"
49 #define DPDK_EVENTS_NAME "dpdk_collectd_events"
50 #define ETH_LINK_NA 0xFF
51
52 #define INT64_BIT_SIZE 64
53 #define KEEPALIVE_PLUGIN_INSTANCE "keepalive"
54 #define RTE_KEEPALIVE_SHM_NAME "/dpdk_keepalive_shm_name"
55
56 typedef struct dpdk_keepalive_shm_s {
57   sem_t core_died;
58   enum rte_keepalive_state core_state[RTE_KEEPALIVE_MAXCORES];
59   uint64_t core_last_seen_times[RTE_KEEPALIVE_MAXCORES];
60 } dpdk_keepalive_shm_t;
61
62 typedef struct dpdk_ka_monitor_s {
63   cdtime_t read_time;
64   int lcore_state;
65 } dpdk_ka_monitor_t;
66
67 typedef struct dpdk_link_status_config_s {
68   int enabled;
69   bool send_updated;
70   uint32_t enabled_port_mask;
71   char port_name[RTE_MAX_ETHPORTS][DATA_MAX_NAME_LEN];
72   bool notify;
73 } dpdk_link_status_config_t;
74
75 typedef struct dpdk_keep_alive_config_s {
76   int enabled;
77   bool send_updated;
78   uint128_t lcore_mask;
79   dpdk_keepalive_shm_t *shm;
80   char shm_name[DATA_MAX_NAME_LEN];
81   bool notify;
82   int fd;
83 } dpdk_keep_alive_config_t;
84
85 typedef struct dpdk_events_config_s {
86   cdtime_t interval;
87   dpdk_link_status_config_t link_status;
88   dpdk_keep_alive_config_t keep_alive;
89 } dpdk_events_config_t;
90
91 typedef struct dpdk_link_info_s {
92   cdtime_t read_time;
93   int status_updated;
94   int link_status;
95 } dpdk_link_info_t;
96
97 typedef struct dpdk_events_ctx_s {
98   dpdk_events_config_t config;
99   uint32_t nb_ports;
100   dpdk_link_info_t link_info[RTE_MAX_ETHPORTS];
101   dpdk_ka_monitor_t core_info[RTE_KEEPALIVE_MAXCORES];
102 } dpdk_events_ctx_t;
103
104 typedef enum {
105   DPDK_EVENTS_STATE_CFG_ERR = 1 << 0,
106   DPDK_EVENTS_STATE_KA_CFG_ERR = 1 << 1,
107   DPDK_EVENTS_STATE_LS_CFG_ERR = 1 << 2,
108
109 } dpdk_events_cfg_status;
110
111 #define DPDK_EVENTS_CTX_GET(a) ((dpdk_events_ctx_t *)dpdk_helper_priv_get(a))
112
113 #define DPDK_EVENTS_TRACE()                                                    \
114   DEBUG("%s:%s:%d pid=%u", DPDK_EVENTS_PLUGIN, __FUNCTION__, __LINE__, getpid())
115
116 static dpdk_helper_ctx_t *g_hc;
117 static dpdk_events_cfg_status g_state;
118
119 static int dpdk_event_keep_alive_shm_open(void) {
120   dpdk_events_ctx_t *ec = DPDK_EVENTS_CTX_GET(g_hc);
121   char *shm_name;
122
123   if (strlen(ec->config.keep_alive.shm_name)) {
124     shm_name = ec->config.keep_alive.shm_name;
125   } else {
126     shm_name = RTE_KEEPALIVE_SHM_NAME;
127     WARNING(DPDK_EVENTS_PLUGIN ": Keep alive shared memory identifier is not "
128                                "specified, using default one: %s",
129             shm_name);
130   }
131
132   int fd = shm_open(shm_name, O_RDONLY, 0);
133   if (fd < 0) {
134     ERROR(DPDK_EVENTS_PLUGIN ": Failed to open %s as SHM:%s. Is DPDK KA "
135                              "primary application running?",
136           shm_name, STRERRNO);
137     return errno;
138   }
139
140   if (ec->config.keep_alive.fd != -1) {
141     struct stat stat_old, stat_new;
142
143     if (fstat(ec->config.keep_alive.fd, &stat_old) || fstat(fd, &stat_new)) {
144       ERROR(DPDK_EVENTS_PLUGIN ": failed to get information about a file");
145       close(fd);
146       return -1;
147     }
148
149     /* Check if inode number has changed. If yes, then create a new mapping */
150     if (stat_old.st_ino == stat_new.st_ino) {
151       close(fd);
152       return 0;
153     }
154
155     if (munmap(ec->config.keep_alive.shm, sizeof(dpdk_keepalive_shm_t)) != 0) {
156       ERROR(DPDK_EVENTS_PLUGIN ": munmap KA monitor failed");
157       close(fd);
158       return -1;
159     }
160
161     close(ec->config.keep_alive.fd);
162     ec->config.keep_alive.fd = -1;
163   }
164
165   ec->config.keep_alive.shm = (dpdk_keepalive_shm_t *)mmap(
166       0, sizeof(*(ec->config.keep_alive.shm)), PROT_READ, MAP_SHARED, fd, 0);
167   if (ec->config.keep_alive.shm == MAP_FAILED) {
168     ERROR(DPDK_EVENTS_PLUGIN ": Failed to mmap KA SHM:%s", STRERRNO);
169     close(fd);
170     return errno;
171   }
172   ec->config.keep_alive.fd = fd;
173
174   return 0;
175 }
176
177 static void dpdk_events_default_config(void) {
178   dpdk_events_ctx_t *ec = DPDK_EVENTS_CTX_GET(g_hc);
179
180   ec->config.interval = plugin_get_interval();
181
182   /* In configless mode when no <Plugin/> section is defined in config file
183    * both link_status and keep_alive should be enabled */
184
185   /* Link Status */
186   ec->config.link_status.enabled = 1;
187   ec->config.link_status.enabled_port_mask = ~0;
188   ec->config.link_status.send_updated = true;
189   ec->config.link_status.notify = false;
190
191   for (int i = 0; i < RTE_MAX_ETHPORTS; i++) {
192     ec->config.link_status.port_name[i][0] = 0;
193   }
194
195   /* Keep Alive */
196   ec->config.keep_alive.enabled = 1;
197   ec->config.keep_alive.send_updated = true;
198   ec->config.keep_alive.notify = false;
199   /* by default enable 128 cores */
200   memset(&ec->config.keep_alive.lcore_mask, 1,
201          sizeof(ec->config.keep_alive.lcore_mask));
202   memset(&ec->config.keep_alive.shm_name, 0,
203          sizeof(ec->config.keep_alive.shm_name));
204   ec->config.keep_alive.shm = MAP_FAILED;
205   ec->config.keep_alive.fd = -1;
206 }
207
208 static int dpdk_events_preinit(void) {
209   DPDK_EVENTS_TRACE();
210
211   if (g_hc != NULL) {
212     /* already initialized if config callback was called before init callback */
213     DEBUG("dpdk_events_preinit: helper already initialized.");
214     return 0;
215   }
216
217   int ret =
218       dpdk_helper_init(DPDK_EVENTS_NAME, sizeof(dpdk_events_ctx_t), &g_hc);
219   if (ret != 0) {
220     ERROR(DPDK_EVENTS_PLUGIN ": failed to initialize %s helper(error: %s)",
221           DPDK_EVENTS_NAME, strerror(ret));
222     return ret;
223   }
224
225   dpdk_events_default_config();
226
227   dpdk_events_ctx_t *ec = DPDK_EVENTS_CTX_GET(g_hc);
228   for (int i = 0; i < RTE_MAX_ETHPORTS; i++) {
229     ec->link_info[i].link_status = ETH_LINK_NA;
230   }
231
232   for (int i = 0; i < RTE_KEEPALIVE_MAXCORES; i++) {
233     ec->core_info[i].lcore_state = ETH_LINK_NA;
234   }
235
236   return ret;
237 }
238
239 static int dpdk_events_link_status_config(dpdk_events_ctx_t *ec,
240                                           oconfig_item_t *ci) {
241   ec->config.link_status.enabled = 1;
242
243   DEBUG(DPDK_EVENTS_PLUGIN ": Subscribed for Link Status Events.");
244
245   for (int i = 0; i < ci->children_num; i++) {
246     oconfig_item_t *child = ci->children + i;
247
248     if (strcasecmp("EnabledPortMask", child->key) == 0) {
249       if (cf_util_get_int(child,
250                           (int *)&ec->config.link_status.enabled_port_mask))
251         return -1;
252       DEBUG(DPDK_EVENTS_PLUGIN ": LinkStatus:Enabled Port Mask 0x%X",
253             ec->config.link_status.enabled_port_mask);
254     } else if (strcasecmp("SendEventsOnUpdate", child->key) == 0) {
255       if (cf_util_get_boolean(child, &ec->config.link_status.send_updated))
256         return -1;
257       DEBUG(DPDK_EVENTS_PLUGIN ": LinkStatus:SendEventsOnUpdate %d",
258             ec->config.link_status.send_updated);
259     } else if (strcasecmp("SendNotification", child->key) == 0) {
260       if (cf_util_get_boolean(child, &ec->config.link_status.notify))
261         return -1;
262
263       DEBUG(DPDK_EVENTS_PLUGIN ": LinkStatus:SendNotification %d",
264             ec->config.link_status.notify);
265     } else if (strcasecmp("PortName", child->key) != 0) {
266       ERROR(DPDK_EVENTS_PLUGIN ": unrecognized configuration option %s.",
267             child->key);
268       return -1;
269     }
270   }
271
272   int port_num = 0;
273
274   /* parse port names after EnabledPortMask was parsed */
275   for (int i = 0; i < ci->children_num; i++) {
276     oconfig_item_t *child = ci->children + i;
277     if (strcasecmp("PortName", child->key) == 0) {
278       while (!(ec->config.link_status.enabled_port_mask & (1 << port_num)))
279         port_num++;
280
281       if (cf_util_get_string_buffer(
282               child, ec->config.link_status.port_name[port_num],
283               sizeof(ec->config.link_status.port_name[port_num]))) {
284         return -1;
285       }
286       DEBUG(DPDK_EVENTS_PLUGIN ": LinkStatus:Port %d Name: %s", port_num,
287             ec->config.link_status.port_name[port_num]);
288       port_num++;
289     }
290   }
291
292   return 0;
293 }
294
295 static int dpdk_events_keep_alive_config(dpdk_events_ctx_t *ec,
296                                          oconfig_item_t *ci) {
297   ec->config.keep_alive.enabled = 1;
298   DEBUG(DPDK_EVENTS_PLUGIN ": Subscribed for Keep Alive Events.");
299
300   for (int i = 0; i < ci->children_num; i++) {
301     oconfig_item_t *child = ci->children + i;
302
303     if (strcasecmp("SendEventsOnUpdate", child->key) == 0) {
304       if (cf_util_get_boolean(child, &ec->config.keep_alive.send_updated))
305         return -1;
306
307       DEBUG(DPDK_EVENTS_PLUGIN ": KeepAlive:SendEventsOnUpdate %d",
308             ec->config.keep_alive.send_updated);
309     } else if (strcasecmp("LCoreMask", child->key) == 0) {
310       char lcore_mask[DATA_MAX_NAME_LEN];
311
312       if (cf_util_get_string_buffer(child, lcore_mask, sizeof(lcore_mask)))
313         return -1;
314       ec->config.keep_alive.lcore_mask =
315           str_to_uint128(lcore_mask, strlen(lcore_mask));
316       DEBUG(DPDK_EVENTS_PLUGIN ": KeepAlive:LCoreMask 0x%" PRIX64 "%" PRIX64 "",
317             ec->config.keep_alive.lcore_mask.high,
318             ec->config.keep_alive.lcore_mask.low);
319     } else if (strcasecmp("KeepAliveShmName", child->key) == 0) {
320       if (cf_util_get_string_buffer(child, ec->config.keep_alive.shm_name,
321                                     sizeof(ec->config.keep_alive.shm_name)))
322         return -1;
323
324       DEBUG(DPDK_EVENTS_PLUGIN ": KeepAlive:KeepAliveShmName %s",
325             ec->config.keep_alive.shm_name);
326     } else if (strcasecmp("SendNotification", child->key) == 0) {
327       if (cf_util_get_boolean(child, &ec->config.keep_alive.notify))
328         return -1;
329
330       DEBUG(DPDK_EVENTS_PLUGIN ": KeepAlive:SendNotification %d",
331             ec->config.keep_alive.notify);
332     } else {
333       ERROR(DPDK_EVENTS_PLUGIN ": unrecognized configuration option %s.",
334             child->key);
335       return -1;
336     }
337   }
338
339   return 0;
340 }
341
342 static int dpdk_events_config(oconfig_item_t *ci) {
343   DPDK_EVENTS_TRACE();
344   int ret = dpdk_events_preinit();
345   if (ret) {
346     g_state |= DPDK_EVENTS_STATE_CFG_ERR;
347     return 0;
348   }
349
350   dpdk_events_ctx_t *ec = DPDK_EVENTS_CTX_GET(g_hc);
351
352   /* Disabling link_status and keep_alive since <Plugin/> config section
353    * specifies if those should be enabled */
354   ec->config.keep_alive.enabled = ec->config.link_status.enabled = 0;
355   memset(&ec->config.keep_alive.lcore_mask, 0,
356          sizeof(ec->config.keep_alive.lcore_mask));
357
358   for (int i = 0; i < ci->children_num; i++) {
359     oconfig_item_t *child = ci->children + i;
360
361     if (strcasecmp("EAL", child->key) == 0)
362       ret = dpdk_helper_eal_config_parse(g_hc, child);
363     else if (strcasecmp("Event", child->key) == 0) {
364       char event_type[DATA_MAX_NAME_LEN];
365
366       if (cf_util_get_string_buffer(child, event_type, sizeof(event_type)))
367         ret = -1;
368       else if (strcasecmp(event_type, "link_status") == 0) {
369         ret = dpdk_events_link_status_config(ec, child);
370         if (ret) {
371           g_state |= DPDK_EVENTS_STATE_LS_CFG_ERR;
372           continue;
373         }
374       } else if (strcasecmp(event_type, "keep_alive") == 0) {
375         ret = dpdk_events_keep_alive_config(ec, child);
376         if (ret) {
377           g_state |= DPDK_EVENTS_STATE_KA_CFG_ERR;
378           continue;
379         }
380       } else {
381         ERROR(DPDK_EVENTS_PLUGIN ": The selected event \"%s\" is unknown.",
382               event_type);
383         ret = -1;
384       }
385     } else {
386       ERROR(DPDK_EVENTS_PLUGIN ": unrecognized configuration option %s.",
387             child->key);
388       ret = -1;
389     }
390
391     if (ret != 0) {
392       g_state |= DPDK_EVENTS_STATE_CFG_ERR;
393       return 0;
394     }
395   }
396
397   if (g_state & DPDK_EVENTS_STATE_KA_CFG_ERR) {
398     ERROR(DPDK_EVENTS_PLUGIN
399           ": Invalid keep alive configuration. Event disabled.");
400     ec->config.keep_alive.enabled = 0;
401   }
402
403   if (g_state & DPDK_EVENTS_STATE_LS_CFG_ERR) {
404     ERROR(DPDK_EVENTS_PLUGIN
405           ": Invalid link status configuration. Event disabled.");
406     ec->config.link_status.enabled = 0;
407   }
408
409   if (!ec->config.keep_alive.enabled && !ec->config.link_status.enabled) {
410     ERROR(DPDK_EVENTS_PLUGIN ": At least one type of events should be "
411                              "configured for collecting. Plugin misconfigured");
412     g_state |= DPDK_EVENTS_STATE_CFG_ERR;
413     return 0;
414   }
415
416   return 0;
417 }
418
419 static int dpdk_helper_link_status_get(dpdk_helper_ctx_t *phc) {
420   dpdk_events_ctx_t *ec = DPDK_EVENTS_CTX_GET(phc);
421
422   /* get Link Status values from DPDK */
423   uint8_t nb_ports = rte_eth_dev_count();
424   if (nb_ports == 0) {
425     DPDK_CHILD_LOG("dpdkevent-helper: No DPDK ports available. "
426                    "Check bound devices to DPDK driver.\n");
427     return -ENODEV;
428   }
429   ec->nb_ports = nb_ports > RTE_MAX_ETHPORTS ? RTE_MAX_ETHPORTS : nb_ports;
430
431   for (int i = 0; i < ec->nb_ports; i++) {
432     if (ec->config.link_status.enabled_port_mask & (1 << i)) {
433       struct rte_eth_link link;
434       ec->link_info[i].read_time = cdtime();
435       rte_eth_link_get_nowait(i, &link);
436       if ((link.link_status == ETH_LINK_NA) ||
437           (link.link_status != ec->link_info[i].link_status)) {
438         ec->link_info[i].link_status = link.link_status;
439         ec->link_info[i].status_updated = 1;
440         DPDK_CHILD_LOG(" === PORT %d Link Status: %s\n", i,
441                        link.link_status ? "UP" : "DOWN");
442       }
443     }
444   }
445
446   return 0;
447 }
448
449 /* this function is called from helper context */
450 int dpdk_helper_command_handler(dpdk_helper_ctx_t *phc, enum DPDK_CMD cmd) {
451   if (phc == NULL) {
452     DPDK_CHILD_LOG(DPDK_EVENTS_PLUGIN ": Invalid argument(phc)\n");
453     return -EINVAL;
454   }
455
456   if (cmd != DPDK_CMD_GET_EVENTS) {
457     DPDK_CHILD_LOG(DPDK_EVENTS_PLUGIN ": Unknown command (cmd=%d)\n", cmd);
458     return -EINVAL;
459   }
460
461   dpdk_events_ctx_t *ec = DPDK_EVENTS_CTX_GET(phc);
462   int ret = 0;
463   if (ec->config.link_status.enabled)
464     ret = dpdk_helper_link_status_get(phc);
465
466   return ret;
467 }
468
469 static void dpdk_events_notification_dispatch(int severity,
470                                               const char *plugin_instance,
471                                               cdtime_t time, const char *msg) {
472   notification_t n = {
473       .severity = severity, .time = time, .plugin = DPDK_EVENTS_PLUGIN};
474   sstrncpy(n.host, hostname_g, sizeof(n.host));
475   sstrncpy(n.plugin_instance, plugin_instance, sizeof(n.plugin_instance));
476   sstrncpy(n.message, msg, sizeof(n.message));
477   plugin_dispatch_notification(&n);
478 }
479
480 static void dpdk_events_gauge_submit(const char *plugin_instance,
481                                      const char *type_instance, gauge_t value,
482                                      cdtime_t time) {
483   value_list_t vl = {.values = &(value_t){.gauge = value},
484                      .values_len = 1,
485                      .time = time,
486                      .plugin = DPDK_EVENTS_PLUGIN,
487                      .type = "gauge",
488                      .meta = NULL};
489   sstrncpy(vl.plugin_instance, plugin_instance, sizeof(vl.plugin_instance));
490   sstrncpy(vl.type_instance, type_instance, sizeof(vl.type_instance));
491   plugin_dispatch_values(&vl);
492 }
493
494 static int dpdk_events_link_status_dispatch(dpdk_helper_ctx_t *phc) {
495   dpdk_events_ctx_t *ec = DPDK_EVENTS_CTX_GET(phc);
496   DEBUG(DPDK_EVENTS_PLUGIN ": %s:%d ports=%u", __FUNCTION__, __LINE__,
497         ec->nb_ports);
498
499   /* dispatch Link Status values to collectd */
500   for (int i = 0; i < ec->nb_ports; i++) {
501     if (ec->config.link_status.enabled_port_mask & (1 << i)) {
502       if (!ec->config.link_status.send_updated ||
503           ec->link_info[i].status_updated) {
504
505         DEBUG(DPDK_EVENTS_PLUGIN ": Dispatch PORT %d Link Status: %s", i,
506               ec->link_info[i].link_status ? "UP" : "DOWN");
507
508         char dev_name[DATA_MAX_NAME_LEN];
509         if (ec->config.link_status.port_name[i][0] != 0) {
510           snprintf(dev_name, sizeof(dev_name), "%s",
511                    ec->config.link_status.port_name[i]);
512         } else {
513           snprintf(dev_name, sizeof(dev_name), "port.%d", i);
514         }
515
516         if (ec->config.link_status.notify) {
517           int sev = ec->link_info[i].link_status ? NOTIF_OKAY : NOTIF_WARNING;
518           char msg[DATA_MAX_NAME_LEN];
519           snprintf(msg, sizeof(msg), "Link Status: %s",
520                    ec->link_info[i].link_status ? "UP" : "DOWN");
521           dpdk_events_notification_dispatch(sev, dev_name,
522                                             ec->link_info[i].read_time, msg);
523         } else {
524           dpdk_events_gauge_submit(dev_name, "link_status",
525                                    (gauge_t)ec->link_info[i].link_status,
526                                    ec->link_info[i].read_time);
527         }
528         ec->link_info[i].status_updated = 0;
529       }
530     }
531   }
532
533   return 0;
534 }
535
536 static void dpdk_events_keep_alive_dispatch(dpdk_helper_ctx_t *phc) {
537   dpdk_events_ctx_t *ec = DPDK_EVENTS_CTX_GET(phc);
538
539   /* dispatch Keep Alive values to collectd */
540   for (int i = 0; i < RTE_KEEPALIVE_MAXCORES; i++) {
541     if (i < INT64_BIT_SIZE) {
542       if (!(ec->config.keep_alive.lcore_mask.low & ((uint64_t)1 << i)))
543         continue;
544     } else if (i >= INT64_BIT_SIZE && i < INT64_BIT_SIZE * 2) {
545       if (!(ec->config.keep_alive.lcore_mask.high &
546             ((uint64_t)1 << (i - INT64_BIT_SIZE))))
547         continue;
548     } else {
549       WARNING(DPDK_EVENTS_PLUGIN
550               ": %s:%d Core id %u is out of 0 to %u range, skipping",
551               __FUNCTION__, __LINE__, i, INT64_BIT_SIZE * 2);
552       continue;
553     }
554
555     char core_name[DATA_MAX_NAME_LEN];
556     snprintf(core_name, sizeof(core_name), "lcore%u", i);
557
558     if (!ec->config.keep_alive.send_updated ||
559         (ec->core_info[i].lcore_state !=
560          ec->config.keep_alive.shm->core_state[i])) {
561       ec->core_info[i].lcore_state = ec->config.keep_alive.shm->core_state[i];
562       ec->core_info[i].read_time = cdtime();
563
564       if (ec->config.keep_alive.notify) {
565         char msg[DATA_MAX_NAME_LEN];
566         int sev;
567
568         switch (ec->config.keep_alive.shm->core_state[i]) {
569         case RTE_KA_STATE_ALIVE:
570           sev = NOTIF_OKAY;
571           snprintf(msg, sizeof(msg), "lcore %u Keep Alive Status: ALIVE", i);
572           break;
573         case RTE_KA_STATE_MISSING:
574           snprintf(msg, sizeof(msg), "lcore %u Keep Alive Status: MISSING", i);
575           sev = NOTIF_WARNING;
576           break;
577         case RTE_KA_STATE_DEAD:
578           snprintf(msg, sizeof(msg), "lcore %u Keep Alive Status: DEAD", i);
579           sev = NOTIF_FAILURE;
580           break;
581         case RTE_KA_STATE_UNUSED:
582           snprintf(msg, sizeof(msg), "lcore %u Keep Alive Status: UNUSED", i);
583           sev = NOTIF_OKAY;
584           break;
585         case RTE_KA_STATE_GONE:
586           snprintf(msg, sizeof(msg), "lcore %u Keep Alive Status: GONE", i);
587           sev = NOTIF_FAILURE;
588           break;
589         case RTE_KA_STATE_DOZING:
590           snprintf(msg, sizeof(msg), "lcore %u Keep Alive Status: DOZING", i);
591           sev = NOTIF_OKAY;
592           break;
593         case RTE_KA_STATE_SLEEP:
594           snprintf(msg, sizeof(msg), "lcore %u Keep Alive Status: SLEEP", i);
595           sev = NOTIF_OKAY;
596           break;
597         default:
598           snprintf(msg, sizeof(msg), "lcore %u Keep Alive Status: UNKNOWN", i);
599           sev = NOTIF_FAILURE;
600         }
601
602         dpdk_events_notification_dispatch(sev, KEEPALIVE_PLUGIN_INSTANCE,
603                                           ec->core_info[i].read_time, msg);
604       } else {
605         dpdk_events_gauge_submit(KEEPALIVE_PLUGIN_INSTANCE, core_name,
606                                  ec->config.keep_alive.shm->core_state[i],
607                                  ec->core_info[i].read_time);
608       }
609     }
610   }
611 }
612
613 static int dpdk_events_read(user_data_t *ud) {
614   DPDK_EVENTS_TRACE();
615
616   if (g_hc == NULL) {
617     ERROR(DPDK_EVENTS_PLUGIN ": plugin not initialized.");
618     return -1;
619   }
620
621   dpdk_events_ctx_t *ec = DPDK_EVENTS_CTX_GET(g_hc);
622   int ls_ret = -1, ka_ret = -1;
623
624   int cmd_res = 0;
625   if (ec->config.link_status.enabled) {
626     ls_ret = dpdk_helper_command(g_hc, DPDK_CMD_GET_EVENTS, &cmd_res,
627                                  ec->config.interval);
628     if (cmd_res == 0 && ls_ret == 0) {
629       dpdk_events_link_status_dispatch(g_hc);
630     }
631   }
632
633   if (ec->config.keep_alive.enabled) {
634     ka_ret = dpdk_event_keep_alive_shm_open();
635     if (ka_ret) {
636       ERROR(DPDK_EVENTS_PLUGIN
637             ": %s : error %d in dpdk_event_keep_alive_shm_open()",
638             __FUNCTION__, ka_ret);
639     } else
640       dpdk_events_keep_alive_dispatch(g_hc);
641   }
642
643   if (!((cmd_res || ls_ret) == 0 || ka_ret == 0)) {
644     ERROR(DPDK_EVENTS_PLUGIN ": Read failure for all enabled event types");
645     return -1;
646   }
647
648   return 0;
649 }
650
651 static int dpdk_events_shutdown(void) {
652   DPDK_EVENTS_TRACE();
653
654   if (g_hc == NULL)
655     return 0;
656
657   dpdk_events_ctx_t *ec = DPDK_EVENTS_CTX_GET(g_hc);
658   if (ec->config.keep_alive.enabled) {
659     if (ec->config.keep_alive.fd != -1) {
660       close(ec->config.keep_alive.fd);
661       ec->config.keep_alive.fd = -1;
662     }
663
664     if (ec->config.keep_alive.shm != MAP_FAILED) {
665       if (munmap(ec->config.keep_alive.shm, sizeof(dpdk_keepalive_shm_t))) {
666         ERROR(DPDK_EVENTS_PLUGIN ": munmap KA monitor failed");
667         return -1;
668       }
669       ec->config.keep_alive.shm = MAP_FAILED;
670     }
671   }
672
673   dpdk_helper_shutdown(g_hc);
674   g_hc = NULL;
675
676   return 0;
677 }
678
679 static int dpdk_events_init(void) {
680   DPDK_EVENTS_TRACE();
681
682   if (g_state & DPDK_EVENTS_STATE_CFG_ERR) {
683     dpdk_events_shutdown();
684     return -1;
685   }
686
687   int ret = dpdk_events_preinit();
688   if (ret)
689     return ret;
690
691   return 0;
692 }
693
694 void module_register(void) {
695   plugin_register_init(DPDK_EVENTS_PLUGIN, dpdk_events_init);
696   plugin_register_complex_config(DPDK_EVENTS_PLUGIN, dpdk_events_config);
697   plugin_register_complex_read(NULL, DPDK_EVENTS_PLUGIN, dpdk_events_read, 0,
698                                NULL);
699   plugin_register_shutdown(DPDK_EVENTS_PLUGIN, dpdk_events_shutdown);
700 }