Merge remote-tracking branch 'github/pr/2258'
[collectd.git] / src / utils_dpdk.c
1 /*
2  * collectd - src/utils_dpdk.c
3  * MIT License
4  *
5  * Copyright(c) 2016 Intel Corporation. All rights reserved.
6  *
7  * Permission is hereby granted, free of charge, to any person obtaining a copy
8  * of this software and associated documentation files (the "Software"), to deal
9  * in the Software without restriction, including without limitation the rights
10  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11  * copies of the Software, and to permit persons to whom the Software is
12  * furnished to do so, subject to the following conditions:
13  *
14  * The above copyright notice and this permission notice shall be included in
15  * all copies or substantial portions of the Software.
16  *
17  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
23  * SOFTWARE.
24  *
25  * Authors:
26  *   Maryam Tahhan <maryam.tahhan@intel.com>
27  *   Harry van Haaren <harry.van.haaren@intel.com>
28  *   Taras Chornyi <tarasx.chornyi@intel.com>
29  *   Serhiy Pshyk <serhiyx.pshyk@intel.com>
30  *   Krzysztof Matczak <krzysztofx.matczak@intel.com>
31  */
32
33 #include "collectd.h"
34
35 #include <poll.h>
36 #include <semaphore.h>
37 #include <sys/mman.h>
38
39 #include <rte_config.h>
40 #include <rte_eal.h>
41 #include <rte_ethdev.h>
42
43 #include "common.h"
44 #include "utils_dpdk.h"
45
46 #define DPDK_DEFAULT_RTE_CONFIG "/var/run/.rte_config"
47 #define DPDK_EAL_ARGC 5
48 #define DPDK_MAX_BUFFER_SIZE (4096 * 4)
49 #define DPDK_CDM_DEFAULT_TIMEOUT 10000
50
51 enum DPDK_HELPER_STATUS {
52   DPDK_HELPER_NOT_INITIALIZED = 0,
53   DPDK_HELPER_INITIALIZING,
54   DPDK_HELPER_WAITING_ON_PRIMARY,
55   DPDK_HELPER_INITIALIZING_EAL,
56   DPDK_HELPER_ALIVE_SENDING_EVENTS,
57   DPDK_HELPER_GRACEFUL_QUIT,
58 };
59
60 #define DPDK_HELPER_TRACE(_name)                                               \
61   DEBUG("%s:%s:%d pid=%ld", _name, __FUNCTION__, __LINE__, (long)getpid())
62
63 struct dpdk_helper_ctx_s {
64
65   dpdk_eal_config_t eal_config;
66   int eal_initialized;
67
68   size_t shm_size;
69   char shm_name[DATA_MAX_NAME_LEN];
70
71   sem_t sema_cmd_start;
72   sem_t sema_cmd_complete;
73   cdtime_t cmd_wait_time;
74
75   pid_t pid;
76   int pipes[2];
77   int status;
78
79   int cmd;
80   int cmd_result;
81
82   char priv_data[];
83 };
84
85 static int dpdk_shm_init(const char *name, size_t size, void **map);
86 static void dpdk_shm_cleanup(const char *name, size_t size, void *map);
87
88 static int dpdk_helper_spawn(dpdk_helper_ctx_t *phc);
89 static int dpdk_helper_worker(dpdk_helper_ctx_t *phc);
90 static int dpdk_helper_eal_init(dpdk_helper_ctx_t *phc);
91 static int dpdk_helper_cmd_wait(dpdk_helper_ctx_t *phc, pid_t ppid);
92 static int dpdk_helper_exit_command(dpdk_helper_ctx_t *phc,
93                                     enum DPDK_HELPER_STATUS status);
94 static int dpdk_helper_exit(dpdk_helper_ctx_t *phc,
95                             enum DPDK_HELPER_STATUS status);
96 static int dpdk_helper_status_check(dpdk_helper_ctx_t *phc);
97 static void dpdk_helper_config_default(dpdk_helper_ctx_t *phc);
98 static const char *dpdk_helper_status_str(enum DPDK_HELPER_STATUS status);
99
100 static void dpdk_helper_config_default(dpdk_helper_ctx_t *phc) {
101   if (phc == NULL)
102     return;
103
104   DPDK_HELPER_TRACE(phc->shm_name);
105
106   snprintf(phc->eal_config.coremask, DATA_MAX_NAME_LEN, "%s", "0xf");
107   snprintf(phc->eal_config.memory_channels, DATA_MAX_NAME_LEN, "%s", "1");
108   snprintf(phc->eal_config.file_prefix, DATA_MAX_NAME_LEN, "%s",
109            DPDK_DEFAULT_RTE_CONFIG);
110 }
111
112 int dpdk_helper_eal_config_set(dpdk_helper_ctx_t *phc, dpdk_eal_config_t *ec) {
113   if (phc == NULL) {
114     ERROR("Invalid argument (phc)");
115     return -EINVAL;
116   }
117
118   DPDK_HELPER_TRACE(phc->shm_name);
119
120   if (ec == NULL) {
121     ERROR("Invalid argument (ec)");
122     return -EINVAL;
123   }
124
125   memcpy(&phc->eal_config, ec, sizeof(phc->eal_config));
126
127   return 0;
128 }
129
130 int dpdk_helper_eal_config_get(dpdk_helper_ctx_t *phc, dpdk_eal_config_t *ec) {
131   if (phc == NULL) {
132     ERROR("Invalid argument (phc)");
133     return -EINVAL;
134   }
135
136   DPDK_HELPER_TRACE(phc->shm_name);
137
138   if (ec == NULL) {
139     ERROR("Invalid argument (ec)");
140     return -EINVAL;
141   }
142
143   memcpy(ec, &phc->eal_config, sizeof(*ec));
144
145   return 0;
146 }
147
148 int dpdk_helper_eal_config_parse(dpdk_helper_ctx_t *phc, oconfig_item_t *ci) {
149   DPDK_HELPER_TRACE(phc->shm_name);
150
151   if (phc == NULL) {
152     ERROR("Invalid argument (phc)");
153     return -EINVAL;
154   }
155
156   if (ci == NULL) {
157     ERROR("Invalid argument (ci)");
158     return -EINVAL;
159   }
160
161   int status = 0;
162   for (int i = 0; i < ci->children_num; i++) {
163     oconfig_item_t *child = ci->children + i;
164
165     if (strcasecmp("Coremask", child->key) == 0) {
166       status = cf_util_get_string_buffer(child, phc->eal_config.coremask,
167                                          sizeof(phc->eal_config.coremask));
168       DEBUG("dpdk_common: EAL:Coremask %s", phc->eal_config.coremask);
169     } else if (strcasecmp("MemoryChannels", child->key) == 0) {
170       status =
171           cf_util_get_string_buffer(child, phc->eal_config.memory_channels,
172                                     sizeof(phc->eal_config.memory_channels));
173       DEBUG("dpdk_common: EAL:Memory Channels %s",
174             phc->eal_config.memory_channels);
175     } else if (strcasecmp("SocketMemory", child->key) == 0) {
176       status = cf_util_get_string_buffer(child, phc->eal_config.socket_memory,
177                                          sizeof(phc->eal_config.socket_memory));
178       DEBUG("dpdk_common: EAL:Socket memory %s", phc->eal_config.socket_memory);
179     } else if (strcasecmp("FilePrefix", child->key) == 0) {
180       char prefix[DATA_MAX_NAME_LEN];
181
182       status = cf_util_get_string_buffer(child, prefix, sizeof(prefix));
183       if (status == 0) {
184         snprintf(phc->eal_config.file_prefix, DATA_MAX_NAME_LEN,
185                   "/var/run/.%s_config", prefix);
186         DEBUG("dpdk_common: EAL:File prefix %s", phc->eal_config.file_prefix);
187       }
188     } else {
189       ERROR("dpdk_common: Invalid '%s' configuration option", child->key);
190       status = -EINVAL;
191     }
192
193     if (status != 0) {
194       ERROR("dpdk_common: Parsing EAL configuration failed");
195       break;
196     }
197   }
198
199   return status;
200 }
201
202 static int dpdk_shm_init(const char *name, size_t size, void **map) {
203   DPDK_HELPER_TRACE(name);
204
205   char errbuf[ERR_BUF_SIZE];
206
207   int fd = shm_open(name, O_CREAT | O_TRUNC | O_RDWR, 0666);
208   if (fd < 0) {
209     WARNING("dpdk_shm_init: Failed to open %s as SHM:%s", name,
210             sstrerror(errno, errbuf, sizeof(errbuf)));
211     *map = NULL;
212     return -1;
213   }
214
215   int ret = ftruncate(fd, size);
216   if (ret != 0) {
217     WARNING("dpdk_shm_init: Failed to resize SHM:%s",
218             sstrerror(errno, errbuf, sizeof(errbuf)));
219     close(fd);
220     *map = NULL;
221     dpdk_shm_cleanup(name, size, NULL);
222     return -1;
223   }
224
225   *map = mmap(0, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
226   if (*map == MAP_FAILED) {
227     WARNING("dpdk_shm_init:Failed to mmap SHM:%s",
228             sstrerror(errno, errbuf, sizeof(errbuf)));
229     close(fd);
230     *map = NULL;
231     dpdk_shm_cleanup(name, size, NULL);
232     return -1;
233   }
234   /* File descriptor no longer needed for shared memory operations */
235   close(fd);
236   memset(*map, 0, size);
237
238   return 0;
239 }
240
241 static void dpdk_shm_cleanup(const char *name, size_t size, void *map) {
242   DPDK_HELPER_TRACE(name);
243   char errbuf[ERR_BUF_SIZE];
244
245   /*
246    * Call shm_unlink first, as 'name' might be no longer accessible after munmap
247    */
248   if (shm_unlink(name))
249     ERROR("shm_unlink failure %s", sstrerror(errno, errbuf, sizeof(errbuf)));
250
251   if (map != NULL) {
252     if (munmap(map, size))
253       ERROR("munmap failure %s", sstrerror(errno, errbuf, sizeof(errbuf)));
254   }
255 }
256
257 void *dpdk_helper_priv_get(dpdk_helper_ctx_t *phc) {
258   if (phc)
259     return phc->priv_data;
260
261   return NULL;
262 }
263
264 int dpdk_helper_data_size_get(dpdk_helper_ctx_t *phc) {
265   if (phc == NULL) {
266     DPDK_CHILD_LOG("Invalid argument(phc)\n");
267     return -EINVAL;
268   }
269
270   return phc->shm_size - sizeof(dpdk_helper_ctx_t);
271 }
272
273 int dpdk_helper_init(const char *name, size_t data_size,
274                      dpdk_helper_ctx_t **pphc) {
275   dpdk_helper_ctx_t *phc = NULL;
276   size_t shm_size = sizeof(dpdk_helper_ctx_t) + data_size;
277   char errbuf[ERR_BUF_SIZE];
278
279   if (pphc == NULL) {
280     ERROR("%s:Invalid argument(pphc)", __FUNCTION__);
281     return -EINVAL;
282   }
283
284   if (name == NULL) {
285     ERROR("%s:Invalid argument(name)", __FUNCTION__);
286     return -EINVAL;
287   }
288
289   DPDK_HELPER_TRACE(name);
290
291   /* Allocate dpdk_helper_ctx_t and
292   * initialize a POSIX SHared Memory (SHM) object.
293   */
294   int err = dpdk_shm_init(name, shm_size, (void **)&phc);
295   if (err != 0) {
296     return -errno;
297   }
298
299   err = sem_init(&phc->sema_cmd_start, 1, 0);
300   if (err != 0) {
301     ERROR("sema_cmd_start semaphore init failed: %s",
302           sstrerror(errno, errbuf, sizeof(errbuf)));
303     int errno_m = errno;
304     dpdk_shm_cleanup(name, shm_size, (void *)phc);
305     return -errno_m;
306   }
307
308   err = sem_init(&phc->sema_cmd_complete, 1, 0);
309   if (err != 0) {
310     ERROR("sema_cmd_complete semaphore init failed: %s",
311           sstrerror(errno, errbuf, sizeof(errbuf)));
312     sem_destroy(&phc->sema_cmd_start);
313     int errno_m = errno;
314     dpdk_shm_cleanup(name, shm_size, (void *)phc);
315     return -errno_m;
316   }
317
318   phc->shm_size = shm_size;
319   sstrncpy(phc->shm_name, name, sizeof(phc->shm_name));
320
321   dpdk_helper_config_default(phc);
322
323   *pphc = phc;
324
325   return 0;
326 }
327
328 void dpdk_helper_shutdown(dpdk_helper_ctx_t *phc) {
329   if (phc == NULL)
330     return;
331
332   DPDK_HELPER_TRACE(phc->shm_name);
333
334   close(phc->pipes[1]);
335
336   if (phc->status != DPDK_HELPER_NOT_INITIALIZED) {
337     dpdk_helper_exit_command(phc, DPDK_HELPER_GRACEFUL_QUIT);
338   }
339
340   sem_destroy(&phc->sema_cmd_start);
341   sem_destroy(&phc->sema_cmd_complete);
342   dpdk_shm_cleanup(phc->shm_name, phc->shm_size, (void *)phc);
343 }
344
345 static int dpdk_helper_spawn(dpdk_helper_ctx_t *phc) {
346   char errbuf[ERR_BUF_SIZE];
347   if (phc == NULL) {
348     ERROR("Invalid argument(phc)");
349     return -EINVAL;
350   }
351
352   DPDK_HELPER_TRACE(phc->shm_name);
353
354   phc->eal_initialized = 0;
355   phc->cmd_wait_time = MS_TO_CDTIME_T(DPDK_CDM_DEFAULT_TIMEOUT);
356
357   /*
358    * Create a pipe for helper stdout back to collectd. This is necessary for
359    * logging EAL failures, as rte_eal_init() calls rte_panic().
360    */
361   if (phc->pipes[1]) {
362     DEBUG("dpdk_helper_spawn: collectd closing helper pipe %d", phc->pipes[1]);
363   } else {
364     DEBUG("dpdk_helper_spawn: collectd helper pipe %d, not closing",
365           phc->pipes[1]);
366   }
367
368   if (pipe(phc->pipes) != 0) {
369     DEBUG("dpdk_helper_spawn: Could not create helper pipe: %s",
370           sstrerror(errno, errbuf, sizeof(errbuf)));
371     return -1;
372   }
373
374   int pipe0_flags = fcntl(phc->pipes[0], F_GETFL, 0);
375   int pipe1_flags = fcntl(phc->pipes[1], F_GETFL, 0);
376   if (pipe0_flags == -1 || pipe1_flags == -1) {
377     WARNING("dpdk_helper_spawn: error setting up pipe flags: %s",
378             sstrerror(errno, errbuf, sizeof(errbuf)));
379   }
380   int pipe0_err = fcntl(phc->pipes[0], F_SETFL, pipe1_flags | O_NONBLOCK);
381   int pipe1_err = fcntl(phc->pipes[1], F_SETFL, pipe0_flags | O_NONBLOCK);
382   if (pipe0_err == -1 || pipe1_err == -1) {
383     WARNING("dpdk_helper_spawn: error setting up pipes: %s",
384             sstrerror(errno, errbuf, sizeof(errbuf)));
385   }
386
387   pid_t pid = fork();
388   if (pid > 0) {
389     phc->pid = pid;
390     close(phc->pipes[1]);
391     DEBUG("%s:dpdk_helper_spawn: helper pid %lu", phc->shm_name,
392           (long)phc->pid);
393   } else if (pid == 0) {
394     /* Replace stdout with a pipe to collectd. */
395     close(phc->pipes[0]);
396     close(STDOUT_FILENO);
397     dup2(phc->pipes[1], STDOUT_FILENO);
398     DPDK_CHILD_TRACE(phc->shm_name);
399     dpdk_helper_worker(phc);
400     exit(0);
401   } else {
402     ERROR("dpdk_helper_start: Failed to fork helper process: %s",
403           sstrerror(errno, errbuf, sizeof(errbuf)));
404     return -1;
405   }
406
407   return 0;
408 }
409
410 static int dpdk_helper_exit(dpdk_helper_ctx_t *phc,
411                             enum DPDK_HELPER_STATUS status) {
412   DPDK_CHILD_LOG("%s:%s:%d %s\n", phc->shm_name, __FUNCTION__, __LINE__,
413                  dpdk_helper_status_str(status));
414
415   close(phc->pipes[1]);
416
417   phc->status = status;
418
419   exit(0);
420
421   return 0;
422 }
423
424 static int dpdk_helper_exit_command(dpdk_helper_ctx_t *phc,
425                                     enum DPDK_HELPER_STATUS status) {
426   char errbuf[ERR_BUF_SIZE];
427   DPDK_HELPER_TRACE(phc->shm_name);
428
429   close(phc->pipes[1]);
430
431   if (phc->status == DPDK_HELPER_ALIVE_SENDING_EVENTS) {
432     phc->status = status;
433     DEBUG("%s:%s:%d %s", phc->shm_name, __FUNCTION__, __LINE__,
434           dpdk_helper_status_str(status));
435
436     int ret = dpdk_helper_command(phc, DPDK_CMD_QUIT, NULL, 0);
437     if (ret != 0) {
438       DEBUG("%s:%s:%d kill helper (pid=%lu)", phc->shm_name, __FUNCTION__,
439             __LINE__, (long)phc->pid);
440
441       int err = kill(phc->pid, SIGKILL);
442       if (err) {
443         ERROR("%s error sending kill to helper: %s", __FUNCTION__,
444               sstrerror(errno, errbuf, sizeof(errbuf)));
445       }
446     }
447   } else {
448
449     DEBUG("%s:%s:%d kill helper (pid=%lu)", phc->shm_name, __FUNCTION__,
450           __LINE__, (long)phc->pid);
451
452     int err = kill(phc->pid, SIGKILL);
453     if (err) {
454       ERROR("%s error sending kill to helper: %s", __FUNCTION__,
455             sstrerror(errno, errbuf, sizeof(errbuf)));
456     }
457   }
458
459   return 0;
460 }
461
462 static int dpdk_helper_eal_init(dpdk_helper_ctx_t *phc) {
463   phc->status = DPDK_HELPER_INITIALIZING_EAL;
464   DPDK_CHILD_LOG("%s:%s:%d DPDK_HELPER_INITIALIZING_EAL (start)\n",
465                  phc->shm_name, __FUNCTION__, __LINE__);
466
467   char *argp[DPDK_EAL_ARGC * 2 + 1];
468   int argc = 0;
469
470   /* EAL config must be initialized */
471   assert(phc->eal_config.coremask[0] != 0);
472   assert(phc->eal_config.memory_channels[0] != 0);
473   assert(phc->eal_config.file_prefix[0] != 0);
474
475   argp[argc++] = "collectd-dpdk";
476
477   argp[argc++] = "-c";
478   argp[argc++] = phc->eal_config.coremask;
479
480   argp[argc++] = "-n";
481   argp[argc++] = phc->eal_config.memory_channels;
482
483   if (strcasecmp(phc->eal_config.socket_memory, "") != 0) {
484     argp[argc++] = "--socket-mem";
485     argp[argc++] = phc->eal_config.socket_memory;
486   }
487
488   if (strcasecmp(phc->eal_config.file_prefix, DPDK_DEFAULT_RTE_CONFIG) != 0) {
489     argp[argc++] = "--file-prefix";
490     argp[argc++] = phc->eal_config.file_prefix;
491   }
492
493   argp[argc++] = "--proc-type";
494   argp[argc++] = "secondary";
495
496   assert(argc <= (DPDK_EAL_ARGC * 2 + 1));
497
498   int ret = rte_eal_init(argc, argp);
499
500   if (ret < 0) {
501
502     phc->eal_initialized = 0;
503
504     DPDK_CHILD_LOG("dpdk_helper_eal_init: ERROR initializing EAL ret=%d\n",
505                    ret);
506
507     printf("dpdk_helper_eal_init: EAL arguments: ");
508     for (int i = 0; i < argc; i++) {
509       printf("%s ", argp[i]);
510     }
511     printf("\n");
512
513     return ret;
514   }
515
516   phc->eal_initialized = 1;
517
518   DPDK_CHILD_LOG("%s:%s:%d DPDK_HELPER_INITIALIZING_EAL (done)\n",
519                  phc->shm_name, __FUNCTION__, __LINE__);
520
521   return 0;
522 }
523
524 static int dpdk_helper_cmd_wait(dpdk_helper_ctx_t *phc, pid_t ppid) {
525   DPDK_CHILD_TRACE(phc->shm_name);
526
527   struct timespec ts;
528   cdtime_t now = cdtime();
529   cdtime_t cmd_wait_time = MS_TO_CDTIME_T(1500) + phc->cmd_wait_time * 2;
530   ts = CDTIME_T_TO_TIMESPEC(now + cmd_wait_time);
531
532   int ret = sem_timedwait(&phc->sema_cmd_start, &ts);
533   DPDK_CHILD_LOG("%s:%s:%d pid=%lu got sema_cmd_start (ret=%d, errno=%d)\n",
534                  phc->shm_name, __FUNCTION__, __LINE__, (long)getpid(), ret,
535                  errno);
536
537   if (phc->cmd == DPDK_CMD_QUIT) {
538     DPDK_CHILD_LOG("%s:%s:%d pid=%lu exiting\n", phc->shm_name, __FUNCTION__,
539                    __LINE__, (long)getpid());
540     exit(0);
541   } else if (ret == -1 && errno == ETIMEDOUT) {
542     if (phc->status == DPDK_HELPER_ALIVE_SENDING_EVENTS) {
543       DPDK_CHILD_LOG("%s:dpdk_helper_cmd_wait: sem timedwait()"
544                      " timeout, did collectd terminate?\n",
545                      phc->shm_name);
546       dpdk_helper_exit(phc, DPDK_HELPER_GRACEFUL_QUIT);
547     }
548   }
549 #if COLLECT_DEBUG
550   int val = 0;
551   if (sem_getvalue(&phc->sema_cmd_start, &val) == 0)
552     DPDK_CHILD_LOG("%s:%s:%d pid=%lu wait sema_cmd_start (value=%d)\n",
553                    phc->shm_name, __FUNCTION__, __LINE__, (long)getpid(), val);
554 #endif
555
556   /* Parent PID change means collectd died so quit the helper process. */
557   if (ppid != getppid()) {
558     DPDK_CHILD_LOG("dpdk_helper_cmd_wait: parent PID changed, quitting.\n");
559     dpdk_helper_exit(phc, DPDK_HELPER_GRACEFUL_QUIT);
560   }
561
562   /* Checking for DPDK primary process. */
563   if (!rte_eal_primary_proc_alive(phc->eal_config.file_prefix)) {
564     if (phc->eal_initialized) {
565       DPDK_CHILD_LOG(
566           "%s:dpdk_helper_cmd_wait: no primary alive but EAL initialized:"
567           " quitting.\n",
568           phc->shm_name);
569       dpdk_helper_exit(phc, DPDK_HELPER_NOT_INITIALIZED);
570     }
571
572     phc->status = DPDK_HELPER_WAITING_ON_PRIMARY;
573     DPDK_CHILD_LOG("%s:%s:%d DPDK_HELPER_WAITING_ON_PRIMARY\n", phc->shm_name,
574                    __FUNCTION__, __LINE__);
575
576     return -1;
577   }
578
579   if (!phc->eal_initialized) {
580     int ret = dpdk_helper_eal_init(phc);
581     if (ret != 0) {
582       DPDK_CHILD_LOG("Error initializing EAL\n");
583       dpdk_helper_exit(phc, DPDK_HELPER_NOT_INITIALIZED);
584     }
585     phc->status = DPDK_HELPER_ALIVE_SENDING_EVENTS;
586     DPDK_CHILD_LOG("%s:%s:%d DPDK_HELPER_ALIVE_SENDING_EVENTS\n", phc->shm_name,
587                    __FUNCTION__, __LINE__);
588     return -1;
589   }
590
591   return 0;
592 }
593
594 static int dpdk_helper_worker(dpdk_helper_ctx_t *phc) {
595   DPDK_CHILD_TRACE(phc->shm_name);
596
597   pid_t ppid = getppid();
598
599   while (1) {
600     if (dpdk_helper_cmd_wait(phc, ppid) == 0) {
601       DPDK_CHILD_LOG("%s:%s:%d DPDK command handle (cmd=%d, pid=%lu)\n",
602                      phc->shm_name, __FUNCTION__, __LINE__, phc->cmd,
603                      (long)getpid());
604       phc->cmd_result = dpdk_helper_command_handler(phc, phc->cmd);
605     } else {
606       phc->cmd_result = -1;
607     }
608
609     /* now kick collectd to get results */
610     int err = sem_post(&phc->sema_cmd_complete);
611     DPDK_CHILD_LOG("%s:%s:%d post sema_cmd_complete (pid=%lu)\n", phc->shm_name,
612                    __FUNCTION__, __LINE__, (long)getpid());
613     if (err) {
614       char errbuf[ERR_BUF_SIZE];
615       DPDK_CHILD_LOG("dpdk_helper_worker: error posting sema_cmd_complete "
616                      "semaphore (%s)\n",
617                      sstrerror(errno, errbuf, sizeof(errbuf)));
618     }
619
620 #if COLLECT_DEBUG
621     int val = 0;
622     if (sem_getvalue(&phc->sema_cmd_complete, &val) == 0)
623       DPDK_CHILD_LOG("%s:%s:%d pid=%lu sema_cmd_complete (value=%d)\n",
624                      phc->shm_name, __FUNCTION__, __LINE__, (long)getpid(),
625                      val);
626 #endif
627
628   } /* while(1) */
629
630   return 0;
631 }
632
633 static const char *dpdk_helper_status_str(enum DPDK_HELPER_STATUS status) {
634   switch (status) {
635   case DPDK_HELPER_ALIVE_SENDING_EVENTS:
636     return "DPDK_HELPER_ALIVE_SENDING_EVENTS";
637   case DPDK_HELPER_WAITING_ON_PRIMARY:
638     return "DPDK_HELPER_WAITING_ON_PRIMARY";
639   case DPDK_HELPER_INITIALIZING:
640     return "DPDK_HELPER_INITIALIZING";
641   case DPDK_HELPER_INITIALIZING_EAL:
642     return "DPDK_HELPER_INITIALIZING_EAL";
643   case DPDK_HELPER_GRACEFUL_QUIT:
644     return "DPDK_HELPER_GRACEFUL_QUIT";
645   case DPDK_HELPER_NOT_INITIALIZED:
646     return "DPDK_HELPER_NOT_INITIALIZED";
647   default:
648     return "UNKNOWN";
649   }
650 }
651
652 static int dpdk_helper_status_check(dpdk_helper_ctx_t *phc) {
653   DEBUG("%s:%s:%d pid=%u %s", phc->shm_name, __FUNCTION__, __LINE__, getpid(),
654         dpdk_helper_status_str(phc->status));
655   char errbuf[ERR_BUF_SIZE];
656
657   if (phc->status == DPDK_HELPER_GRACEFUL_QUIT) {
658     return 0;
659   } else if (phc->status == DPDK_HELPER_NOT_INITIALIZED) {
660     phc->status = DPDK_HELPER_INITIALIZING;
661     DEBUG("%s:%s:%d DPDK_HELPER_INITIALIZING", phc->shm_name, __FUNCTION__,
662           __LINE__);
663     int err = dpdk_helper_spawn(phc);
664     if (err) {
665       ERROR("dpdkstat: error spawning helper %s",
666             sstrerror(errno, errbuf, sizeof(errbuf)));
667     }
668     return -1;
669   }
670
671   pid_t ws = waitpid(phc->pid, NULL, WNOHANG);
672   if (ws != 0) {
673     phc->status = DPDK_HELPER_INITIALIZING;
674     DEBUG("%s:%s:%d DPDK_HELPER_INITIALIZING", phc->shm_name, __FUNCTION__,
675           __LINE__);
676     int err = dpdk_helper_spawn(phc);
677     if (err) {
678       ERROR("dpdkstat: error spawning helper %s",
679             sstrerror(errno, errbuf, sizeof(errbuf)));
680     }
681     return -1;
682   }
683
684   if (phc->status == DPDK_HELPER_INITIALIZING_EAL) {
685     return -1;
686   }
687
688   return 0;
689 }
690
691 static void dpdk_helper_check_pipe(dpdk_helper_ctx_t *phc) {
692   char buf[DPDK_MAX_BUFFER_SIZE];
693   char out[DPDK_MAX_BUFFER_SIZE];
694
695   /* non blocking check on helper logging pipe */
696   struct pollfd fds = {
697       .fd = phc->pipes[0], .events = POLLIN,
698   };
699   int data_avail = poll(&fds, 1, 0);
700   if (data_avail < 0) {
701     if (errno != EINTR || errno != EAGAIN) {
702       char errbuf[ERR_BUF_SIZE];
703       ERROR("%s: poll(2) failed: %s", phc->shm_name,
704             sstrerror(errno, errbuf, sizeof(errbuf)));
705     }
706   }
707   while (data_avail) {
708     int nbytes = read(phc->pipes[0], buf, sizeof(buf));
709     if (nbytes <= 0)
710       break;
711     sstrncpy(out, buf, nbytes);
712     DEBUG("%s: helper process:\n%s", phc->shm_name, out);
713   }
714 }
715
716 int dpdk_helper_command(dpdk_helper_ctx_t *phc, enum DPDK_CMD cmd, int *result,
717                         cdtime_t cmd_wait_time) {
718   if (phc == NULL) {
719     ERROR("Invalid argument(phc)");
720     return -EINVAL;
721   }
722
723   DEBUG("%s:%s:%d pid=%lu, cmd=%d", phc->shm_name, __FUNCTION__, __LINE__,
724         (long)getpid(), cmd);
725
726   phc->cmd_wait_time = cmd_wait_time;
727
728   int ret = dpdk_helper_status_check(phc);
729
730   dpdk_helper_check_pipe(phc);
731
732   if (ret != 0) {
733     return ret;
734   }
735
736   DEBUG("%s: DPDK command execute (cmd=%d)", phc->shm_name, cmd);
737
738   phc->cmd_result = 0;
739   phc->cmd = cmd;
740
741   /* kick helper to process command */
742   int err = sem_post(&phc->sema_cmd_start);
743   if (err) {
744     char errbuf[ERR_BUF_SIZE];
745     ERROR("dpdk_helper_worker: error posting sema_cmd_start semaphore (%s)",
746           sstrerror(errno, errbuf, sizeof(errbuf)));
747   }
748
749 #if COLLECT_DEBUG
750   int val = 0;
751   if (sem_getvalue(&phc->sema_cmd_start, &val) == 0)
752     DEBUG("%s:dpdk_helper_command: post sema_cmd_start (value=%d)",
753           phc->shm_name, val);
754 #endif
755
756   if (phc->cmd != DPDK_CMD_QUIT) {
757
758     /* wait for helper to complete processing */
759     struct timespec ts;
760     cdtime_t now = cdtime();
761
762     if (phc->status != DPDK_HELPER_ALIVE_SENDING_EVENTS) {
763       cmd_wait_time = MS_TO_CDTIME_T(DPDK_CDM_DEFAULT_TIMEOUT);
764     }
765
766     ts = CDTIME_T_TO_TIMESPEC(now + cmd_wait_time);
767     ret = sem_timedwait(&phc->sema_cmd_complete, &ts);
768     if (ret == -1 && errno == ETIMEDOUT) {
769       DPDK_HELPER_TRACE(phc->shm_name);
770       DEBUG("%s:sema_cmd_start: timeout in collectd thread: is a DPDK Primary "
771             "running?",
772             phc->shm_name);
773       return -ETIMEDOUT;
774     }
775
776 #if COLLECT_DEBUG
777     val = 0;
778     if (sem_getvalue(&phc->sema_cmd_complete, &val) == 0)
779       DEBUG("%s:dpdk_helper_command: wait sema_cmd_complete (value=%d)",
780             phc->shm_name, val);
781 #endif
782
783     if (result) {
784       *result = phc->cmd_result;
785     }
786   }
787
788   dpdk_helper_check_pipe(phc);
789
790   DEBUG("%s: DPDK command complete (cmd=%d, result=%d)", phc->shm_name,
791         phc->cmd, phc->cmd_result);
792
793   return 0;
794 }
795
796 uint64_t strtoull_safe(const char *str, int *err) {
797   uint64_t val = 0;
798   char *endptr;
799   int res = 0;
800
801   val = strtoull(str, &endptr, 16);
802   if (*endptr) {
803     ERROR("%s Failed to parse the value %s, endptr=%c", __FUNCTION__, str,
804           *endptr);
805     res = -EINVAL;
806   }
807   if (err != NULL)
808     *err = res;
809   return val;
810 }
811
812 uint128_t str_to_uint128(const char *str, int len) {
813   uint128_t lcore_mask;
814   int err = 0;
815
816   memset(&lcore_mask, 0, sizeof(lcore_mask));
817
818   if (len <= 2 || strncmp(str, "0x", 2) != 0) {
819     ERROR("%s Value %s should be represened in hexadecimal format",
820           __FUNCTION__, str);
821     return lcore_mask;
822   }
823   /* If str is <= 64 bit long ('0x' + 16 chars = 18 chars) then
824    * conversion is straightforward. Otherwise str is splitted into 64b long
825    * blocks */
826   if (len <= 18) {
827     lcore_mask.low = strtoull_safe(str, &err);
828     if (err)
829       return lcore_mask;
830   } else {
831     char low_str[DATA_MAX_NAME_LEN];
832     char high_str[DATA_MAX_NAME_LEN];
833
834     memset(high_str, 0, sizeof(high_str));
835     memset(low_str, 0, sizeof(low_str));
836
837     strncpy(high_str, str, len - 16);
838     strncpy(low_str, str + len - 16, 16);
839
840     lcore_mask.low = strtoull_safe(low_str, &err);
841     if (err)
842       return lcore_mask;
843
844     lcore_mask.high = strtoull_safe(high_str, &err);
845     if (err) {
846       lcore_mask.low = 0;
847       return lcore_mask;
848     }
849   }
850   return lcore_mask;
851 }
852
853 uint8_t dpdk_helper_eth_dev_count() {
854   uint8_t ports = rte_eth_dev_count();
855   if (ports == 0) {
856     ERROR(
857         "%s:%d: No DPDK ports available. Check bound devices to DPDK driver.\n",
858         __FUNCTION__, __LINE__);
859     return ports;
860   }
861
862   if (ports > RTE_MAX_ETHPORTS) {
863     ERROR("%s:%d: Number of DPDK ports (%u) is greater than "
864           "RTE_MAX_ETHPORTS=%d. Ignoring extra ports\n",
865           __FUNCTION__, __LINE__, ports, RTE_MAX_ETHPORTS);
866     ports = RTE_MAX_ETHPORTS;
867   }
868
869   return ports;
870 }