dpdkstat plugin rework
[collectd.git] / src / utils_dpdk.c
1 /**
2  * collectd - src/utils_dpdk.c
3  * MIT License
4  *
5  * Copyright(c) 2016 Intel Corporation. All rights reserved.
6  *
7  * Permission is hereby granted, free of charge, to any person obtaining a
8  * copy of this software and associated documentation files (the "Software"),
9  * to deal in the Software without restriction, including without limitation
10  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
11  * and/or sell copies of the Software, and to permit persons to whom the
12  * Software is furnished to do so, subject to the following conditions:
13  *
14  * The above copyright notice and this permission notice shall be included in
15  * all copies or substantial portions of the Software.
16  *
17  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
22  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
23  * DEALINGS IN THE SOFTWARE.
24  *
25  * Authors:
26  *   Maryam Tahhan <maryam.tahhan@intel.com>
27  *   Harry van Haaren <harry.van.haaren@intel.com>
28  *   Serhiy Pshyk <serhiyx.pshyk@intel.com>
29  **/
30
31 #include "collectd.h"
32
33 #include <poll.h>
34 #include <semaphore.h>
35 #include <sys/mman.h>
36
37 #include <rte_config.h>
38 #include <rte_eal.h>
39
40 #include "common.h"
41 #include "utils_dpdk.h"
42
43 #define DPDK_DEFAULT_RTE_CONFIG "/var/run/.rte_config"
44 #define DPDK_EAL_ARGC 5
45 #define DPDK_MAX_BUFFER_SIZE (4096 * 4)
46 #define DPDK_CDM_DEFAULT_TIMEOUT 10000
47
48 enum DPDK_HELPER_STATUS {
49   DPDK_HELPER_NOT_INITIALIZED = 0,
50   DPDK_HELPER_INITIALIZING,
51   DPDK_HELPER_WAITING_ON_PRIMARY,
52   DPDK_HELPER_INITIALIZING_EAL,
53   DPDK_HELPER_ALIVE_SENDING_EVENTS,
54   DPDK_HELPER_GRACEFUL_QUIT,
55 };
56
57 #define DPDK_HELPER_TRACE(_name)                                               \
58   DEBUG("%s:%s:%d pid=%lu", _name, __FUNCTION__, __LINE__, (long)getpid())
59
60 #define DPDK_HELPER_USE_PIPES
61
62 struct dpdk_helper_ctx_s {
63
64   dpdk_eal_config_t eal_config;
65   int eal_initialized;
66
67   size_t shm_size;
68   const char *shm_name;
69
70   sem_t sema_cmd_start;
71   sem_t sema_cmd_complete;
72   cdtime_t cmd_wait_time;
73
74   pid_t pid;
75 #ifdef DPDK_HELPER_USE_PIPES
76   int pipes[2];
77 #endif /* DPDK_HELPER_USE_PIPES */
78   int status;
79
80   int cmd;
81   int cmd_result;
82
83   char priv_data[];
84 };
85
86 static int dpdk_shm_init(const char *name, size_t size, void **map);
87 static int dpdk_shm_cleanup(const char *name, size_t size, void *map);
88
89 static int dpdk_helper_spawn(dpdk_helper_ctx_t *phc);
90 static int dpdk_helper_worker(dpdk_helper_ctx_t *phc);
91 static int dpdk_helper_eal_init(dpdk_helper_ctx_t *phc);
92 static int dpdk_helper_cmd_wait(dpdk_helper_ctx_t *phc, pid_t ppid);
93 static int dpdk_helper_exit_command(dpdk_helper_ctx_t *phc,
94                                     enum DPDK_HELPER_STATUS status);
95 static int dpdk_helper_exit(dpdk_helper_ctx_t *phc,
96                             enum DPDK_HELPER_STATUS status);
97 static int dpdk_helper_status_check(dpdk_helper_ctx_t *phc);
98 static void dpdk_helper_config_default(dpdk_helper_ctx_t *phc);
99 static const char *dpdk_helper_status_str(enum DPDK_HELPER_STATUS status);
100
101 static void dpdk_helper_config_default(dpdk_helper_ctx_t *phc) {
102   if (phc == NULL)
103     return;
104
105   DPDK_HELPER_TRACE(phc->shm_name);
106
107   ssnprintf(phc->eal_config.coremask, DATA_MAX_NAME_LEN, "%s", "0xf");
108   ssnprintf(phc->eal_config.memory_channels, DATA_MAX_NAME_LEN, "%s", "1");
109   ssnprintf(phc->eal_config.process_type, DATA_MAX_NAME_LEN, "%s", "secondary");
110   ssnprintf(phc->eal_config.file_prefix, DATA_MAX_NAME_LEN, "%s",
111             DPDK_DEFAULT_RTE_CONFIG);
112 }
113
114 int dpdk_helper_eal_config_set(dpdk_helper_ctx_t *phc, dpdk_eal_config_t *ec) {
115   if (phc == NULL) {
116     ERROR("Invalid argument (phc)");
117     return -EINVAL;
118   }
119
120   DPDK_HELPER_TRACE(phc->shm_name);
121
122   if (ec == NULL) {
123     ERROR("Invalid argument (ec)");
124     return -EINVAL;
125   }
126
127   memcpy(&phc->eal_config, ec, sizeof(dpdk_eal_config_t));
128
129   return 0;
130 }
131
132 int dpdk_helper_eal_config_get(dpdk_helper_ctx_t *phc, dpdk_eal_config_t *ec) {
133   if (phc == NULL) {
134     ERROR("Invalid argument (phc)");
135     return -EINVAL;
136   }
137
138   DPDK_HELPER_TRACE(phc->shm_name);
139
140   if (ec == NULL) {
141     ERROR("Invalid argument (ec)");
142     return -EINVAL;
143   }
144
145   memcpy(ec, &phc->eal_config, sizeof(dpdk_eal_config_t));
146
147   return 0;
148 }
149
150 int dpdk_helper_eal_config_parse(dpdk_helper_ctx_t *phc, oconfig_item_t *ci) {
151   DPDK_HELPER_TRACE(phc->shm_name);
152
153   if (phc == NULL) {
154     ERROR("Invalid argument (phc)");
155     return -EINVAL;
156   }
157
158   if (ci == NULL) {
159     ERROR("Invalid argument (ci)");
160     return -EINVAL;
161   }
162
163   for (int i = 0; i < ci->children_num; i++) {
164     oconfig_item_t *child = ci->children + i;
165     if (strcasecmp("Coremask", child->key) == 0) {
166       cf_util_get_string_buffer(child, phc->eal_config.coremask,
167                                 sizeof(phc->eal_config.coremask));
168       DEBUG("dpdk_common: EAL:Coremask %s", phc->eal_config.coremask);
169     } else if (strcasecmp("MemoryChannels", child->key) == 0) {
170       cf_util_get_string_buffer(child, phc->eal_config.memory_channels,
171                                 sizeof(phc->eal_config.memory_channels));
172       DEBUG("dpdk_common: EAL:Memory Channels %s",
173             phc->eal_config.memory_channels);
174     } else if (strcasecmp("SocketMemory", child->key) == 0) {
175       cf_util_get_string_buffer(child, phc->eal_config.socket_memory,
176                                 sizeof(phc->eal_config.socket_memory));
177       DEBUG("dpdk_common: EAL:Socket memory %s", phc->eal_config.socket_memory);
178     } else if (strcasecmp("ProcessType", child->key) == 0) {
179       cf_util_get_string_buffer(child, phc->eal_config.process_type,
180                                 sizeof(phc->eal_config.process_type));
181       DEBUG("dpdk_common: EAL:Process type %s", phc->eal_config.process_type);
182     } else if ((strcasecmp("FilePrefix", child->key) == 0) &&
183                (child->values[0].type == OCONFIG_TYPE_STRING)) {
184       ssnprintf(phc->eal_config.file_prefix, DATA_MAX_NAME_LEN,
185                 "/var/run/.%s_config", child->values[0].value.string);
186       DEBUG("dpdk_common: EAL:File prefix %s", phc->eal_config.file_prefix);
187     }
188   }
189
190   return 0;
191 }
192
193 static int dpdk_shm_init(const char *name, size_t size, void **map) {
194   DPDK_HELPER_TRACE(name);
195
196   char errbuf[ERR_BUF_SIZE];
197
198   int fd = shm_open(name, O_CREAT | O_TRUNC | O_RDWR, 0666);
199   if (fd < 0) {
200     WARNING("dpdk_shm_init: Failed to open %s as SHM:%s\n", name,
201             sstrerror(errno, errbuf, sizeof(errbuf)));
202     goto fail;
203   }
204
205   int ret = ftruncate(fd, size);
206   if (ret != 0) {
207     WARNING("dpdk_shm_init: Failed to resize SHM:%s\n",
208             sstrerror(errno, errbuf, sizeof(errbuf)));
209     goto fail_close;
210   }
211
212   *map = mmap(0, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
213   if (*map == MAP_FAILED) {
214     WARNING("dpdk_shm_init:Failed to mmap SHM:%s\n",
215             sstrerror(errno, errbuf, sizeof(errbuf)));
216     goto fail_close;
217   }
218   /*
219    * Close the file descriptor, the shared memory object still exists
220    * and can only be removed by calling shm_unlink().
221    */
222   close(fd);
223
224   memset(*map, 0, size);
225
226   return 0;
227
228 fail_close:
229   close(fd);
230 fail:
231   *map = NULL;
232   return -1;
233 }
234
235 static int dpdk_shm_cleanup(const char *name, size_t size, void *map) {
236   DPDK_HELPER_TRACE(name);
237
238   int ret = munmap(map, size);
239   if (ret) {
240     ERROR("munmap returned %d\n", ret);
241   }
242
243   ret = shm_unlink(name);
244   if (ret) {
245     ERROR("shm_unlink returned %d\n", ret);
246   }
247
248   return 0;
249 }
250
251 inline void *dpdk_helper_priv_get(dpdk_helper_ctx_t *phc) {
252   if (phc)
253     return (void *)phc->priv_data;
254
255   return NULL;
256 }
257
258 int dpdk_helper_data_size_get(dpdk_helper_ctx_t *phc) {
259   if (phc == NULL) {
260     DPDK_CHILD_LOG("Invalid argument(phc)\n");
261     return -EINVAL;
262   }
263
264   return (phc->shm_size - sizeof(dpdk_helper_ctx_t));
265 }
266
267 int dpdk_helper_init(const char *name, size_t data_size,
268                      dpdk_helper_ctx_t **pphc) {
269   int err = 0;
270   dpdk_helper_ctx_t *phc = NULL;
271   size_t shm_size = sizeof(dpdk_helper_ctx_t) + data_size;
272   char errbuf[ERR_BUF_SIZE];
273
274   if (pphc == NULL) {
275     ERROR("%s:Invalid argument(pphc)", __FUNCTION__);
276     return -EINVAL;
277   }
278
279   if (name == NULL) {
280     ERROR("%s:Invalid argument(name)", __FUNCTION__);
281     return -EINVAL;
282   }
283
284   DPDK_HELPER_TRACE(name);
285
286   /* Allocate dpdk_helper_ctx_t and
287   * initialize a POSIX SHared Memory (SHM) object.
288   */
289   err = dpdk_shm_init(name, shm_size, (void **)&phc);
290   if (err != 0) {
291     return -errno;
292   }
293
294   err = sem_init(&phc->sema_cmd_start, 1, 0);
295   if (err != 0) {
296     ERROR("sema_cmd_start semaphore init failed: %s\n",
297           sstrerror(errno, errbuf, sizeof(errbuf)));
298     dpdk_shm_cleanup(name, shm_size, (void *)phc);
299     return -errno;
300   }
301
302   err = sem_init(&phc->sema_cmd_complete, 1, 0);
303   if (err != 0) {
304     ERROR("sema_cmd_complete semaphore init failed: %s\n",
305           sstrerror(errno, errbuf, sizeof(errbuf)));
306     sem_destroy(&phc->sema_cmd_start);
307     dpdk_shm_cleanup(name, shm_size, (void *)phc);
308     return -errno;
309   }
310
311   phc->shm_size = shm_size;
312   phc->shm_name = name;
313
314   dpdk_helper_config_default(phc);
315
316   *pphc = phc;
317
318   return 0;
319 }
320
321 int dpdk_helper_shutdown(dpdk_helper_ctx_t *phc) {
322   if (phc == NULL) {
323     ERROR("%s:Invalid argument(phc)", __FUNCTION__);
324     return -EINVAL;
325   }
326
327   DPDK_HELPER_TRACE(phc->shm_name);
328
329 #ifdef DPDK_HELPER_USE_PIPES
330   close(phc->pipes[1]);
331 #endif
332
333   if (phc->status != DPDK_HELPER_NOT_INITIALIZED) {
334     dpdk_helper_exit_command(phc, DPDK_HELPER_GRACEFUL_QUIT);
335   }
336
337   sem_destroy(&phc->sema_cmd_start);
338   sem_destroy(&phc->sema_cmd_complete);
339   dpdk_shm_cleanup(phc->shm_name, phc->shm_size, (void *)phc);
340
341   return 0;
342 }
343
344 static int dpdk_helper_spawn(dpdk_helper_ctx_t *phc) {
345   char errbuf[ERR_BUF_SIZE];
346   if (phc == NULL) {
347     ERROR("Invalid argument(phc)");
348     return -EINVAL;
349   }
350
351   DPDK_HELPER_TRACE(phc->shm_name);
352
353   phc->eal_initialized = 0;
354   phc->cmd_wait_time = MS_TO_CDTIME_T(DPDK_CDM_DEFAULT_TIMEOUT);
355
356 #ifdef DPDK_HELPER_USE_PIPES
357   /*
358    * Create a pipe for helper stdout back to collectd. This is necessary for
359    * logging EAL failures, as rte_eal_init() calls rte_panic().
360    */
361   if (phc->pipes[1]) {
362     DEBUG("dpdk_helper_spawn: collectd closing helper pipe %d", phc->pipes[1]);
363   } else {
364     DEBUG("dpdk_helper_spawn: collectd helper pipe %d, not closing",
365           phc->pipes[1]);
366   }
367
368   if (pipe(phc->pipes) != 0) {
369     DEBUG("dpdk_helper_spawn: Could not create helper pipe: %s\n",
370           sstrerror(errno, errbuf, sizeof(errbuf)));
371     return -1;
372   }
373
374   int pipe0_flags = fcntl(phc->pipes[0], F_GETFL, 0);
375   int pipe1_flags = fcntl(phc->pipes[1], F_GETFL, 0);
376   if (pipe0_flags == -1 || pipe1_flags == -1) {
377     WARNING("dpdk_helper_spawn: error setting up pipe flags: %s",
378             sstrerror(errno, errbuf, sizeof(errbuf)));
379   }
380   int pipe0_err = fcntl(phc->pipes[0], F_SETFL, pipe1_flags | O_NONBLOCK);
381   int pipe1_err = fcntl(phc->pipes[1], F_SETFL, pipe0_flags | O_NONBLOCK);
382   if (pipe0_err == -1 || pipe1_err == -1) {
383     WARNING("dpdk_helper_spawn: error setting up pipes: %s",
384             sstrerror(errno, errbuf, sizeof(errbuf)));
385   }
386 #endif /* DPDK_HELPER_USE_PIPES */
387
388   pid_t pid = fork();
389   if (pid > 0) {
390     phc->pid = pid;
391 #ifdef DPDK_HELPER_USE_PIPES
392     close(phc->pipes[1]);
393 #endif /* DPDK_HELPER_USE_PIPES */
394     DEBUG("%s:dpdk_helper_spawn: helper pid %lu", phc->shm_name,
395           (long)phc->pid);
396   } else if (pid == 0) {
397 #ifdef DPDK_HELPER_USE_PIPES
398     /* Replace stdout with a pipe to collectd. */
399     close(phc->pipes[0]);
400     close(STDOUT_FILENO);
401     dup2(phc->pipes[1], STDOUT_FILENO);
402 #endif /* DPDK_HELPER_USE_PIPES */
403     DPDK_CHILD_TRACE(phc->shm_name);
404     dpdk_helper_worker(phc);
405     exit(0);
406   } else {
407     ERROR("dpdk_helper_start: Failed to fork helper process: %s\n",
408           sstrerror(errno, errbuf, sizeof(errbuf)));
409     return -1;
410   }
411
412   return 0;
413 }
414
415 static int dpdk_helper_exit(dpdk_helper_ctx_t *phc,
416                             enum DPDK_HELPER_STATUS status) {
417   DPDK_CHILD_LOG("%s:%s:%d %s\n", phc->shm_name, __FUNCTION__, __LINE__,
418                  dpdk_helper_status_str(status));
419
420 #ifdef DPDK_HELPER_USE_PIPES
421   close(phc->pipes[1]);
422 #endif /* DPDK_HELPER_USE_PIPES */
423
424   phc->status = status;
425
426   exit(0);
427
428   return 0;
429 }
430
431 static int dpdk_helper_exit_command(dpdk_helper_ctx_t *phc,
432                                     enum DPDK_HELPER_STATUS status) {
433   char errbuf[ERR_BUF_SIZE];
434   DPDK_HELPER_TRACE(phc->shm_name);
435
436 #ifdef DPDK_HELPER_USE_PIPES
437   close(phc->pipes[1]);
438 #endif /* DPDK_HELPER_USE_PIPES */
439
440   if (phc->status == DPDK_HELPER_ALIVE_SENDING_EVENTS) {
441     phc->status = status;
442     DEBUG("%s:%s:%d %s", phc->shm_name, __FUNCTION__, __LINE__,
443           dpdk_helper_status_str(status));
444
445     int ret = dpdk_helper_command(phc, DPDK_CMD_QUIT, NULL, 0);
446     if (ret != 0) {
447       DEBUG("%s:%s:%d kill helper (pid=%lu)", phc->shm_name, __FUNCTION__,
448             __LINE__, (long)phc->pid);
449
450       int err = kill(phc->pid, SIGKILL);
451       if (err) {
452         ERROR("%s error sending kill to helper: %s\n", __FUNCTION__,
453               sstrerror(errno, errbuf, sizeof(errbuf)));
454       }
455     }
456   } else {
457
458     DEBUG("%s:%s:%d kill helper (pid=%lu)", phc->shm_name, __FUNCTION__,
459           __LINE__, (long)phc->pid);
460
461     int err = kill(phc->pid, SIGKILL);
462     if (err) {
463       ERROR("%s error sending kill to helper: %s\n", __FUNCTION__,
464             sstrerror(errno, errbuf, sizeof(errbuf)));
465     }
466   }
467
468   return 0;
469 }
470
471 static int dpdk_helper_eal_init(dpdk_helper_ctx_t *phc) {
472   phc->status = DPDK_HELPER_INITIALIZING_EAL;
473   DPDK_CHILD_LOG("%s:%s:%d DPDK_HELPER_INITIALIZING_EAL (start)\n",
474                  phc->shm_name, __FUNCTION__, __LINE__);
475
476   char *argp[DPDK_EAL_ARGC * 2 + 1];
477   int argc = 0;
478
479   /* EAL config must be initialized */
480   assert(phc->eal_config.coremask[0] != 0);
481   assert(phc->eal_config.memory_channels[0] != 0);
482   assert(phc->eal_config.process_type[0] != 0);
483   assert(phc->eal_config.file_prefix[0] != 0);
484
485   argp[argc++] = "collectd-dpdk";
486
487   argp[argc++] = "-c";
488   argp[argc++] = phc->eal_config.coremask;
489
490   argp[argc++] = "-n";
491   argp[argc++] = phc->eal_config.memory_channels;
492
493   if (strcasecmp(phc->eal_config.socket_memory, "") != 0) {
494     argp[argc++] = "--socket-mem";
495     argp[argc++] = phc->eal_config.socket_memory;
496   }
497
498   if (strcasecmp(phc->eal_config.file_prefix, DPDK_DEFAULT_RTE_CONFIG) != 0) {
499     argp[argc++] = "--file-prefix";
500     argp[argc++] = phc->eal_config.file_prefix;
501   }
502
503   argp[argc++] = "--proc-type";
504   argp[argc++] = phc->eal_config.process_type;
505
506   assert(argc <= (DPDK_EAL_ARGC * 2 + 1));
507
508   int ret = rte_eal_init(argc, argp);
509
510   if (ret < 0) {
511
512     phc->eal_initialized = 0;
513
514     DPDK_CHILD_LOG("dpdk_helper_eal_init: ERROR initializing EAL ret=%d\n",
515                    ret);
516
517     printf("dpdk_helper_eal_init: EAL arguments: ");
518     for (int i = 0; i < argc; i++) {
519       printf("%s ", argp[i]);
520     }
521     printf("\n");
522
523     return ret;
524   }
525
526   phc->eal_initialized = 1;
527
528   DPDK_CHILD_LOG("%s:%s:%d DPDK_HELPER_INITIALIZING_EAL (done)\n",
529                  phc->shm_name, __FUNCTION__, __LINE__);
530
531   return 0;
532 }
533
534 static int dpdk_helper_cmd_wait(dpdk_helper_ctx_t *phc, pid_t ppid) {
535   DPDK_CHILD_TRACE(phc->shm_name);
536
537   struct timespec ts;
538   cdtime_t now = cdtime();
539   cdtime_t cmd_wait_time = MS_TO_CDTIME_T(1500) + phc->cmd_wait_time * 2;
540   ts = CDTIME_T_TO_TIMESPEC(now + cmd_wait_time);
541
542   int ret = sem_timedwait(&phc->sema_cmd_start, &ts);
543   DPDK_CHILD_LOG("%s:%s:%d pid=%lu got sema_cmd_start (ret=%d, errno=%d)\n",
544                  phc->shm_name, __FUNCTION__, __LINE__, (long)getpid(), ret,
545                  errno);
546
547   if (phc->cmd == DPDK_CMD_QUIT) {
548     DPDK_CHILD_LOG("%s:%s:%d pid=%lu exiting\n", phc->shm_name, __FUNCTION__,
549                    __LINE__, (long)getpid());
550     exit(0);
551   } else if (ret == -1 && errno == ETIMEDOUT) {
552     if (phc->status == DPDK_HELPER_ALIVE_SENDING_EVENTS) {
553       DPDK_CHILD_LOG("%s:dpdk_helper_cmd_wait: sem timedwait()"
554                      " timeout, did collectd terminate?\n",
555                      phc->shm_name);
556       dpdk_helper_exit(phc, DPDK_HELPER_GRACEFUL_QUIT);
557     }
558   }
559 #if COLLECT_DEBUG
560   int val = 0;
561   if (sem_getvalue(&phc->sema_cmd_start, &val) == 0)
562     DPDK_CHILD_LOG("%s:%s:%d pid=%lu wait sema_cmd_start (value=%d)\n",
563                    phc->shm_name, __FUNCTION__, __LINE__, (long)getpid(), val);
564 #endif
565
566   /* Parent PID change means collectd died so quit the helper process. */
567   if (ppid != getppid()) {
568     DPDK_CHILD_LOG("dpdk_helper_cmd_wait: parent PID changed, quitting.\n");
569     dpdk_helper_exit(phc, DPDK_HELPER_GRACEFUL_QUIT);
570   }
571
572   /* Checking for DPDK primary process. */
573   if (!rte_eal_primary_proc_alive(phc->eal_config.file_prefix)) {
574     if (phc->eal_initialized) {
575       DPDK_CHILD_LOG(
576           "%s:dpdk_helper_cmd_wait: no primary alive but EAL initialized:"
577           " quitting.\n",
578           phc->shm_name);
579       dpdk_helper_exit(phc, DPDK_HELPER_NOT_INITIALIZED);
580     }
581
582     phc->status = DPDK_HELPER_WAITING_ON_PRIMARY;
583     DPDK_CHILD_LOG("%s:%s:%d DPDK_HELPER_WAITING_ON_PRIMARY\n", phc->shm_name,
584                    __FUNCTION__, __LINE__);
585
586     return -1;
587   }
588
589   if (!phc->eal_initialized) {
590     int ret = dpdk_helper_eal_init(phc);
591     if (ret != 0) {
592       DPDK_CHILD_LOG("Error initializing EAL\n");
593       dpdk_helper_exit(phc, DPDK_HELPER_NOT_INITIALIZED);
594     }
595     phc->status = DPDK_HELPER_ALIVE_SENDING_EVENTS;
596     DPDK_CHILD_LOG("%s:%s:%d DPDK_HELPER_ALIVE_SENDING_EVENTS\n", phc->shm_name,
597                    __FUNCTION__, __LINE__);
598     return -1;
599   }
600
601   return 0;
602 }
603
604 static int dpdk_helper_worker(dpdk_helper_ctx_t *phc) {
605   DPDK_CHILD_TRACE(phc->shm_name);
606
607   pid_t ppid = getppid();
608
609   while (1) {
610     if (dpdk_helper_cmd_wait(phc, ppid) == 0) {
611       DPDK_CHILD_LOG("%s:%s:%d DPDK command handle (cmd=%d, pid=%lu)\n",
612                      phc->shm_name, __FUNCTION__, __LINE__, phc->cmd,
613                      (long)getpid());
614       phc->cmd_result = dpdk_helper_command_handler(phc, phc->cmd);
615     } else {
616       phc->cmd_result = -1;
617     }
618
619     /* now kick collectd to get results */
620     int err = sem_post(&phc->sema_cmd_complete);
621     DPDK_CHILD_LOG("%s:%s:%d post sema_cmd_complete (pid=%lu)\n", phc->shm_name,
622                    __FUNCTION__, __LINE__, (long)getpid());
623     if (err) {
624       char errbuf[ERR_BUF_SIZE];
625       DPDK_CHILD_LOG("dpdk_helper_worker: error posting sema_cmd_complete "
626                      "semaphore (%s)\n",
627                      sstrerror(errno, errbuf, sizeof(errbuf)));
628     }
629
630 #if COLLECT_DEBUG
631     int val = 0;
632     if (sem_getvalue(&phc->sema_cmd_complete, &val) == 0)
633       DPDK_CHILD_LOG("%s:%s:%d pid=%lu sema_cmd_complete (value=%d)\n",
634                      phc->shm_name, __FUNCTION__, __LINE__, (long)getpid(),
635                      val);
636 #endif
637
638   } /* while(1) */
639
640   return 0;
641 }
642
643 static const char *dpdk_helper_status_str(enum DPDK_HELPER_STATUS status) {
644   switch (status) {
645   case DPDK_HELPER_ALIVE_SENDING_EVENTS:
646     return "DPDK_HELPER_ALIVE_SENDING_EVENTS";
647   case DPDK_HELPER_WAITING_ON_PRIMARY:
648     return "DPDK_HELPER_WAITING_ON_PRIMARY";
649   case DPDK_HELPER_INITIALIZING:
650     return "DPDK_HELPER_INITIALIZING";
651   case DPDK_HELPER_INITIALIZING_EAL:
652     return "DPDK_HELPER_INITIALIZING_EAL";
653   case DPDK_HELPER_GRACEFUL_QUIT:
654     return "DPDK_HELPER_GRACEFUL_QUIT";
655   case DPDK_HELPER_NOT_INITIALIZED:
656     return "DPDK_HELPER_NOT_INITIALIZED";
657   default:
658     return "UNKNOWN";
659   }
660 }
661
662 static int dpdk_helper_status_check(dpdk_helper_ctx_t *phc) {
663   DEBUG("%s:%s:%d pid=%u %s", phc->shm_name, __FUNCTION__, __LINE__, getpid(),
664         dpdk_helper_status_str(phc->status));
665   char errbuf[ERR_BUF_SIZE];
666
667   if (phc->status == DPDK_HELPER_GRACEFUL_QUIT) {
668     return 0;
669   } else if (phc->status == DPDK_HELPER_NOT_INITIALIZED) {
670     phc->status = DPDK_HELPER_INITIALIZING;
671     DEBUG("%s:%s:%d DPDK_HELPER_INITIALIZING", phc->shm_name, __FUNCTION__,
672           __LINE__);
673     int err = dpdk_helper_spawn(phc);
674     if (err) {
675       ERROR("dpdkstat: error spawning helper %s",
676             sstrerror(errno, errbuf, sizeof(errbuf)));
677     }
678     return -1;
679   }
680
681   pid_t ws = waitpid(phc->pid, NULL, WNOHANG);
682   if (ws != 0) {
683     phc->status = DPDK_HELPER_INITIALIZING;
684     DEBUG("%s:%s:%d DPDK_HELPER_INITIALIZING", phc->shm_name, __FUNCTION__,
685           __LINE__);
686     int err = dpdk_helper_spawn(phc);
687     if (err) {
688       ERROR("dpdkstat: error spawning helper %s",
689             sstrerror(errno, errbuf, sizeof(errbuf)));
690     }
691     return -1;
692   }
693
694   if (phc->status == DPDK_HELPER_INITIALIZING_EAL) {
695     return -1;
696   }
697
698   return 0;
699 }
700
701 #ifdef DPDK_HELPER_USE_PIPES
702 static void dpdk_helper_check_pipe(dpdk_helper_ctx_t *phc) {
703   char buf[DPDK_MAX_BUFFER_SIZE];
704   char out[DPDK_MAX_BUFFER_SIZE];
705
706   /* non blocking check on helper logging pipe */
707   struct pollfd fds = {
708       .fd = phc->pipes[0], .events = POLLIN,
709   };
710   int data_avail = poll(&fds, 1, 0);
711   if (data_avail < 0) {
712     if (errno != EINTR || errno != EAGAIN) {
713       char errbuf[ERR_BUF_SIZE];
714       ERROR("%s: poll(2) failed: %s", phc->shm_name,
715             sstrerror(errno, errbuf, sizeof(errbuf)));
716     }
717   }
718   while (data_avail) {
719     int nbytes = read(phc->pipes[0], buf, sizeof(buf));
720     if (nbytes <= 0)
721       break;
722     sstrncpy(out, buf, nbytes);
723     DEBUG("%s: helper process:\n%s", phc->shm_name, out);
724   }
725 }
726 #endif /* DPDK_HELPER_USE_PIPES */
727
728 int dpdk_helper_command(dpdk_helper_ctx_t *phc, enum DPDK_CMD cmd, int *result,
729                         cdtime_t cmd_wait_time) {
730   if (phc == NULL) {
731     ERROR("Invalid argument(phc)");
732     return -EINVAL;
733   }
734
735   DEBUG("%s:%s:%d pid=%lu, cmd=%d", phc->shm_name, __FUNCTION__, __LINE__,
736         (long)getpid(), cmd);
737
738   phc->cmd_wait_time = cmd_wait_time;
739
740   int ret = 0;
741
742   ret = dpdk_helper_status_check(phc);
743
744 #ifdef DPDK_HELPER_USE_PIPES
745   dpdk_helper_check_pipe(phc);
746 #endif /* DPDK_HELPER_USE_PIPES */
747
748   if (ret != 0) {
749     return ret;
750   }
751
752   DEBUG("%s: DPDK command execute (cmd=%d)", phc->shm_name, cmd);
753
754   phc->cmd_result = 0;
755   phc->cmd = cmd;
756
757   /* kick helper to process command */
758   int err = sem_post(&phc->sema_cmd_start);
759   if (err) {
760     char errbuf[ERR_BUF_SIZE];
761     ERROR("dpdk_helper_worker: error posting sema_cmd_start semaphore (%s)\n",
762           sstrerror(errno, errbuf, sizeof(errbuf)));
763   }
764
765 #if COLLECT_DEBUG
766   int val = 0;
767   if (sem_getvalue(&phc->sema_cmd_start, &val) == 0)
768     DEBUG("%s:dpdk_helper_command: post sema_cmd_start (value=%d)",
769           phc->shm_name, val);
770 #endif
771
772   if (phc->cmd != DPDK_CMD_QUIT) {
773
774     /* wait for helper to complete processing */
775     struct timespec ts;
776     cdtime_t now = cdtime();
777
778     if (phc->status != DPDK_HELPER_ALIVE_SENDING_EVENTS) {
779       cmd_wait_time = MS_TO_CDTIME_T(DPDK_CDM_DEFAULT_TIMEOUT);
780     }
781
782     ts = CDTIME_T_TO_TIMESPEC(now + cmd_wait_time);
783     ret = sem_timedwait(&phc->sema_cmd_complete, &ts);
784     if (ret == -1 && errno == ETIMEDOUT) {
785       DPDK_HELPER_TRACE(phc->shm_name);
786       DEBUG("%s:sema_cmd_start: timeout in collectd thread: is a DPDK Primary "
787             "running?",
788             phc->shm_name);
789       return -ETIMEDOUT;
790     }
791
792 #if COLLECT_DEBUG
793     val = 0;
794     if (sem_getvalue(&phc->sema_cmd_complete, &val) == 0)
795       DEBUG("%s:dpdk_helper_command: wait sema_cmd_complete (value=%d)",
796             phc->shm_name, val);
797 #endif
798
799     if (result) {
800       *result = phc->cmd_result;
801     }
802   }
803
804 #ifdef DPDK_HELPER_USE_PIPES
805   dpdk_helper_check_pipe(phc);
806 #endif /* DPDK_HELPER_USE_PIPES */
807
808   DEBUG("%s: DPDK command complete (cmd=%d, result=%d)", phc->shm_name,
809         phc->cmd, phc->cmd_result);
810
811   return 0;
812 }
813
814 uint64_t strtoull_safe(const char *str, int *err) {
815   uint64_t val = 0;
816   char *endptr;
817   int res = 0;
818
819   val = strtoull(str, &endptr, 16);
820   if (*endptr) {
821     ERROR("%s Failed to parse the value %s, endptr=%c", __FUNCTION__, str,
822           *endptr);
823     res = -EINVAL;
824   }
825   if (err != NULL)
826     *err = res;
827   return val;
828 }
829
830 uint128_t str_to_uint128(const char *str, int len) {
831   uint128_t lcore_mask;
832   int err = 0;
833
834   memset(&lcore_mask, 0, sizeof(uint128_t));
835
836   if (len <= 2 || strncmp(str, "0x", 2) != 0) {
837     ERROR("%s Value %s should be represened in hexadecimal format",
838           __FUNCTION__, str);
839     return lcore_mask;
840   }
841
842   if (len <= 18) {
843     lcore_mask.low = strtoull_safe(str, &err);
844     if (err)
845       goto parse_out;
846   } else {
847     char low_str[DATA_MAX_NAME_LEN];
848     char high_str[DATA_MAX_NAME_LEN];
849
850     memset(high_str, 0, sizeof(high_str));
851     memset(low_str, 0, sizeof(low_str));
852
853     strncpy(high_str, str, len - 16);
854     strncpy(low_str, str + len - 16, 16);
855
856     lcore_mask.low = strtoull_safe(low_str, &err);
857     if (err)
858       goto parse_out;
859
860     lcore_mask.high = strtoull_safe(high_str, &err);
861     if (err) {
862       lcore_mask.low = 0;
863       goto parse_out;
864     }
865   }
866
867 parse_out:
868   return lcore_mask;
869 }