Correction of tracing from child process
[collectd.git] / src / utils_dpdk.c
1 /*
2  * collectd - src/utils_dpdk.c
3  * MIT License
4  *
5  * Copyright(c) 2016 Intel Corporation. All rights reserved.
6  *
7  * Permission is hereby granted, free of charge, to any person obtaining a copy
8  * of this software and associated documentation files (the "Software"), to deal
9  * in the Software without restriction, including without limitation the rights
10  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11  * copies of the Software, and to permit persons to whom the Software is
12  * furnished to do so, subject to the following conditions:
13  *
14  * The above copyright notice and this permission notice shall be included in
15  * all copies or substantial portions of the Software.
16  *
17  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
23  * SOFTWARE.
24  *
25  * Authors:
26  *   Maryam Tahhan <maryam.tahhan@intel.com>
27  *   Harry van Haaren <harry.van.haaren@intel.com>
28  *   Taras Chornyi <tarasx.chornyi@intel.com>
29  *   Serhiy Pshyk <serhiyx.pshyk@intel.com>
30  *   Krzysztof Matczak <krzysztofx.matczak@intel.com>
31  */
32
33 #include "collectd.h"
34
35 #include <poll.h>
36 #include <semaphore.h>
37 #include <sys/mman.h>
38
39 #include <rte_config.h>
40 #include <rte_eal.h>
41 #include <rte_ethdev.h>
42
43 #include "common.h"
44 #include "utils_dpdk.h"
45
46 #define DPDK_DEFAULT_RTE_CONFIG "/var/run/.rte_config"
47 #define DPDK_EAL_ARGC 10
48 //Complete trace should fit into 1024 chars
49 #define DPDK_MAX_BUFFER_SIZE 896
50 #define DPDK_CDM_DEFAULT_TIMEOUT 10000
51
52 enum DPDK_HELPER_STATUS {
53   DPDK_HELPER_NOT_INITIALIZED = 0,
54   DPDK_HELPER_INITIALIZING,
55   DPDK_HELPER_WAITING_ON_PRIMARY,
56   DPDK_HELPER_INITIALIZING_EAL,
57   DPDK_HELPER_ALIVE_SENDING_EVENTS,
58   DPDK_HELPER_GRACEFUL_QUIT,
59 };
60
61 #define DPDK_HELPER_TRACE(_name)                                               \
62   DEBUG("%s:%s:%d pid=%ld", _name, __FUNCTION__, __LINE__, (long)getpid())
63
64 struct dpdk_helper_ctx_s {
65
66   dpdk_eal_config_t eal_config;
67   int eal_initialized;
68
69   size_t shm_size;
70   char shm_name[DATA_MAX_NAME_LEN];
71
72   sem_t sema_cmd_start;
73   sem_t sema_cmd_complete;
74   cdtime_t cmd_wait_time;
75
76   pid_t pid;
77   int pipes[2];
78   int status;
79
80   int cmd;
81   int cmd_result;
82
83   char priv_data[];
84 };
85
86 static int dpdk_shm_init(const char *name, size_t size, void **map);
87 static void dpdk_shm_cleanup(const char *name, size_t size, void *map);
88
89 static int dpdk_helper_spawn(dpdk_helper_ctx_t *phc);
90 static int dpdk_helper_worker(dpdk_helper_ctx_t *phc);
91 static int dpdk_helper_eal_init(dpdk_helper_ctx_t *phc);
92 static int dpdk_helper_cmd_wait(dpdk_helper_ctx_t *phc, pid_t ppid);
93 static int dpdk_helper_exit_command(dpdk_helper_ctx_t *phc,
94                                     enum DPDK_HELPER_STATUS status);
95 static int dpdk_helper_exit(dpdk_helper_ctx_t *phc,
96                             enum DPDK_HELPER_STATUS status);
97 static int dpdk_helper_status_check(dpdk_helper_ctx_t *phc);
98 static void dpdk_helper_config_default(dpdk_helper_ctx_t *phc);
99 static const char *dpdk_helper_status_str(enum DPDK_HELPER_STATUS status);
100
101 static void dpdk_helper_config_default(dpdk_helper_ctx_t *phc) {
102   if (phc == NULL)
103     return;
104
105   DPDK_HELPER_TRACE(phc->shm_name);
106
107   snprintf(phc->eal_config.coremask, DATA_MAX_NAME_LEN, "%s", "0xf");
108   snprintf(phc->eal_config.memory_channels, DATA_MAX_NAME_LEN, "%s", "1");
109   snprintf(phc->eal_config.file_prefix, DATA_MAX_NAME_LEN, "%s",
110            DPDK_DEFAULT_RTE_CONFIG);
111 }
112
113 int dpdk_helper_eal_config_set(dpdk_helper_ctx_t *phc, dpdk_eal_config_t *ec) {
114   if (phc == NULL) {
115     ERROR("Invalid argument (phc)");
116     return -EINVAL;
117   }
118
119   DPDK_HELPER_TRACE(phc->shm_name);
120
121   if (ec == NULL) {
122     ERROR("Invalid argument (ec)");
123     return -EINVAL;
124   }
125
126   memcpy(&phc->eal_config, ec, sizeof(phc->eal_config));
127
128   return 0;
129 }
130
131 int dpdk_helper_eal_config_get(dpdk_helper_ctx_t *phc, dpdk_eal_config_t *ec) {
132   if (phc == NULL) {
133     ERROR("Invalid argument (phc)");
134     return -EINVAL;
135   }
136
137   DPDK_HELPER_TRACE(phc->shm_name);
138
139   if (ec == NULL) {
140     ERROR("Invalid argument (ec)");
141     return -EINVAL;
142   }
143
144   memcpy(ec, &phc->eal_config, sizeof(*ec));
145
146   return 0;
147 }
148
149 int dpdk_helper_eal_config_parse(dpdk_helper_ctx_t *phc, oconfig_item_t *ci) {
150   DPDK_HELPER_TRACE(phc->shm_name);
151
152   if (phc == NULL) {
153     ERROR("Invalid argument (phc)");
154     return -EINVAL;
155   }
156
157   if (ci == NULL) {
158     ERROR("Invalid argument (ci)");
159     return -EINVAL;
160   }
161
162   int status = 0;
163   for (int i = 0; i < ci->children_num; i++) {
164     oconfig_item_t *child = ci->children + i;
165
166     if (strcasecmp("Coremask", child->key) == 0) {
167       status = cf_util_get_string_buffer(child, phc->eal_config.coremask,
168                                          sizeof(phc->eal_config.coremask));
169       DEBUG("dpdk_common: EAL:Coremask %s", phc->eal_config.coremask);
170     } else if (strcasecmp("MemoryChannels", child->key) == 0) {
171       status =
172           cf_util_get_string_buffer(child, phc->eal_config.memory_channels,
173                                     sizeof(phc->eal_config.memory_channels));
174       DEBUG("dpdk_common: EAL:Memory Channels %s",
175             phc->eal_config.memory_channels);
176     } else if (strcasecmp("SocketMemory", child->key) == 0) {
177       status = cf_util_get_string_buffer(child, phc->eal_config.socket_memory,
178                                          sizeof(phc->eal_config.socket_memory));
179       DEBUG("dpdk_common: EAL:Socket memory %s", phc->eal_config.socket_memory);
180     } else if (strcasecmp("FilePrefix", child->key) == 0) {
181       char prefix[DATA_MAX_NAME_LEN];
182
183       status = cf_util_get_string_buffer(child, prefix, sizeof(prefix));
184       if (status == 0) {
185         snprintf(phc->eal_config.file_prefix, DATA_MAX_NAME_LEN,
186                   "/var/run/.%s_config", prefix);
187         DEBUG("dpdk_common: EAL:File prefix %s", phc->eal_config.file_prefix);
188       }
189     } else if (strcasecmp("LogLevel", child->key) == 0) {
190       status = cf_util_get_string_buffer(child, phc->eal_config.log_level,
191                                          sizeof(phc->eal_config.log_level));
192       DEBUG("dpdk_common: EAL:LogLevel %s", phc->eal_config.log_level);
193     } else if (strcasecmp("RteDriverLibPath", child->key) == 0) {
194       status = cf_util_get_string_buffer(child, phc->eal_config.rte_driver_lib_path,
195                                          sizeof(phc->eal_config.rte_driver_lib_path));
196       DEBUG("dpdk_common: EAL:RteDriverLibPath %s", phc->eal_config.rte_driver_lib_path);
197     } else {
198       ERROR("dpdk_common: Invalid '%s' configuration option", child->key);
199       status = -EINVAL;
200     }
201
202     if (status != 0) {
203       ERROR("dpdk_common: Parsing EAL configuration failed");
204       break;
205     }
206   }
207
208   return status;
209 }
210
211 static int dpdk_shm_init(const char *name, size_t size, void **map) {
212   DPDK_HELPER_TRACE(name);
213
214   char errbuf[ERR_BUF_SIZE];
215
216   int fd = shm_open(name, O_CREAT | O_TRUNC | O_RDWR, 0666);
217   if (fd < 0) {
218     WARNING("dpdk_shm_init: Failed to open %s as SHM:%s", name,
219             sstrerror(errno, errbuf, sizeof(errbuf)));
220     *map = NULL;
221     return -1;
222   }
223
224   int ret = ftruncate(fd, size);
225   if (ret != 0) {
226     WARNING("dpdk_shm_init: Failed to resize SHM:%s",
227             sstrerror(errno, errbuf, sizeof(errbuf)));
228     close(fd);
229     *map = NULL;
230     dpdk_shm_cleanup(name, size, NULL);
231     return -1;
232   }
233
234   *map = mmap(0, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
235   if (*map == MAP_FAILED) {
236     WARNING("dpdk_shm_init:Failed to mmap SHM:%s",
237             sstrerror(errno, errbuf, sizeof(errbuf)));
238     close(fd);
239     *map = NULL;
240     dpdk_shm_cleanup(name, size, NULL);
241     return -1;
242   }
243   /* File descriptor no longer needed for shared memory operations */
244   close(fd);
245   memset(*map, 0, size);
246
247   return 0;
248 }
249
250 static void dpdk_shm_cleanup(const char *name, size_t size, void *map) {
251   DPDK_HELPER_TRACE(name);
252   char errbuf[ERR_BUF_SIZE];
253
254   /*
255    * Call shm_unlink first, as 'name' might be no longer accessible after munmap
256    */
257   if (shm_unlink(name))
258     ERROR("shm_unlink failure %s", sstrerror(errno, errbuf, sizeof(errbuf)));
259
260   if (map != NULL) {
261     if (munmap(map, size))
262       ERROR("munmap failure %s", sstrerror(errno, errbuf, sizeof(errbuf)));
263   }
264 }
265
266 void *dpdk_helper_priv_get(dpdk_helper_ctx_t *phc) {
267   if (phc)
268     return phc->priv_data;
269
270   return NULL;
271 }
272
273 int dpdk_helper_data_size_get(dpdk_helper_ctx_t *phc) {
274   if (phc == NULL) {
275     DPDK_CHILD_LOG("Invalid argument(phc)\n");
276     return -EINVAL;
277   }
278
279   return phc->shm_size - sizeof(dpdk_helper_ctx_t);
280 }
281
282 int dpdk_helper_init(const char *name, size_t data_size,
283                      dpdk_helper_ctx_t **pphc) {
284   dpdk_helper_ctx_t *phc = NULL;
285   size_t shm_size = sizeof(dpdk_helper_ctx_t) + data_size;
286   char errbuf[ERR_BUF_SIZE];
287
288   if (pphc == NULL) {
289     ERROR("%s:Invalid argument(pphc)", __FUNCTION__);
290     return -EINVAL;
291   }
292
293   if (name == NULL) {
294     ERROR("%s:Invalid argument(name)", __FUNCTION__);
295     return -EINVAL;
296   }
297
298   DPDK_HELPER_TRACE(name);
299
300   /* Allocate dpdk_helper_ctx_t and
301   * initialize a POSIX SHared Memory (SHM) object.
302   */
303   int err = dpdk_shm_init(name, shm_size, (void **)&phc);
304   if (err != 0) {
305     return -errno;
306   }
307
308   err = sem_init(&phc->sema_cmd_start, 1, 0);
309   if (err != 0) {
310     ERROR("sema_cmd_start semaphore init failed: %s",
311           sstrerror(errno, errbuf, sizeof(errbuf)));
312     int errno_m = errno;
313     dpdk_shm_cleanup(name, shm_size, (void *)phc);
314     return -errno_m;
315   }
316
317   err = sem_init(&phc->sema_cmd_complete, 1, 0);
318   if (err != 0) {
319     ERROR("sema_cmd_complete semaphore init failed: %s",
320           sstrerror(errno, errbuf, sizeof(errbuf)));
321     sem_destroy(&phc->sema_cmd_start);
322     int errno_m = errno;
323     dpdk_shm_cleanup(name, shm_size, (void *)phc);
324     return -errno_m;
325   }
326
327   phc->shm_size = shm_size;
328   sstrncpy(phc->shm_name, name, sizeof(phc->shm_name));
329
330   dpdk_helper_config_default(phc);
331
332   *pphc = phc;
333
334   return 0;
335 }
336
337 void dpdk_helper_shutdown(dpdk_helper_ctx_t *phc) {
338   if (phc == NULL)
339     return;
340
341   DPDK_HELPER_TRACE(phc->shm_name);
342
343   close(phc->pipes[1]);
344
345   if (phc->status != DPDK_HELPER_NOT_INITIALIZED) {
346     dpdk_helper_exit_command(phc, DPDK_HELPER_GRACEFUL_QUIT);
347   }
348
349   sem_destroy(&phc->sema_cmd_start);
350   sem_destroy(&phc->sema_cmd_complete);
351   dpdk_shm_cleanup(phc->shm_name, phc->shm_size, (void *)phc);
352 }
353
354 static int dpdk_helper_spawn(dpdk_helper_ctx_t *phc) {
355   char errbuf[ERR_BUF_SIZE];
356   if (phc == NULL) {
357     ERROR("Invalid argument(phc)");
358     return -EINVAL;
359   }
360
361   DPDK_HELPER_TRACE(phc->shm_name);
362
363   phc->eal_initialized = 0;
364   phc->cmd_wait_time = MS_TO_CDTIME_T(DPDK_CDM_DEFAULT_TIMEOUT);
365
366   /*
367    * Create a pipe for helper stdout back to collectd. This is necessary for
368    * logging EAL failures, as rte_eal_init() calls rte_panic().
369    */
370   if (phc->pipes[1]) {
371     DEBUG("dpdk_helper_spawn: collectd closing helper pipe %d", phc->pipes[1]);
372   } else {
373     DEBUG("dpdk_helper_spawn: collectd helper pipe %d, not closing",
374           phc->pipes[1]);
375   }
376
377   if (pipe(phc->pipes) != 0) {
378     DEBUG("dpdk_helper_spawn: Could not create helper pipe: %s",
379           sstrerror(errno, errbuf, sizeof(errbuf)));
380     return -1;
381   }
382
383   int pipe0_flags = fcntl(phc->pipes[0], F_GETFL, 0);
384   int pipe1_flags = fcntl(phc->pipes[1], F_GETFL, 0);
385   if (pipe0_flags == -1 || pipe1_flags == -1) {
386     WARNING("dpdk_helper_spawn: error setting up pipe flags: %s",
387             sstrerror(errno, errbuf, sizeof(errbuf)));
388   }
389   int pipe0_err = fcntl(phc->pipes[0], F_SETFL, pipe1_flags | O_NONBLOCK);
390   int pipe1_err = fcntl(phc->pipes[1], F_SETFL, pipe0_flags | O_NONBLOCK);
391   if (pipe0_err == -1 || pipe1_err == -1) {
392     WARNING("dpdk_helper_spawn: error setting up pipes: %s",
393             sstrerror(errno, errbuf, sizeof(errbuf)));
394   }
395
396   pid_t pid = fork();
397   if (pid > 0) {
398     phc->pid = pid;
399     close(phc->pipes[1]);
400     DEBUG("%s:dpdk_helper_spawn: helper pid %lu", phc->shm_name,
401           (long)phc->pid);
402   } else if (pid == 0) {
403     /* Replace stdout with a pipe to collectd. */
404     close(phc->pipes[0]);
405     close(STDOUT_FILENO);
406     dup2(phc->pipes[1], STDOUT_FILENO);
407     DPDK_CHILD_TRACE(phc->shm_name);
408     dpdk_helper_worker(phc);
409     exit(0);
410   } else {
411     ERROR("dpdk_helper_start: Failed to fork helper process: %s",
412           sstrerror(errno, errbuf, sizeof(errbuf)));
413     return -1;
414   }
415
416   return 0;
417 }
418
419 static int dpdk_helper_exit(dpdk_helper_ctx_t *phc,
420                             enum DPDK_HELPER_STATUS status) {
421   DPDK_CHILD_LOG("%s:%s:%d %s\n", phc->shm_name, __FUNCTION__, __LINE__,
422                  dpdk_helper_status_str(status));
423
424   close(phc->pipes[1]);
425
426   phc->status = status;
427
428   exit(0);
429
430   return 0;
431 }
432
433 static int dpdk_helper_exit_command(dpdk_helper_ctx_t *phc,
434                                     enum DPDK_HELPER_STATUS status) {
435   char errbuf[ERR_BUF_SIZE];
436   DPDK_HELPER_TRACE(phc->shm_name);
437
438   close(phc->pipes[1]);
439
440   if (phc->status == DPDK_HELPER_ALIVE_SENDING_EVENTS) {
441     phc->status = status;
442     DEBUG("%s:%s:%d %s", phc->shm_name, __FUNCTION__, __LINE__,
443           dpdk_helper_status_str(status));
444
445     int ret = dpdk_helper_command(phc, DPDK_CMD_QUIT, NULL, 0);
446     if (ret != 0) {
447       DEBUG("%s:%s:%d kill helper (pid=%lu)", phc->shm_name, __FUNCTION__,
448             __LINE__, (long)phc->pid);
449
450       int err = kill(phc->pid, SIGKILL);
451       if (err) {
452         ERROR("%s error sending kill to helper: %s", __FUNCTION__,
453               sstrerror(errno, errbuf, sizeof(errbuf)));
454       }
455     }
456   } else {
457
458     DEBUG("%s:%s:%d kill helper (pid=%lu)", phc->shm_name, __FUNCTION__,
459           __LINE__, (long)phc->pid);
460
461     int err = kill(phc->pid, SIGKILL);
462     if (err) {
463       ERROR("%s error sending kill to helper: %s", __FUNCTION__,
464             sstrerror(errno, errbuf, sizeof(errbuf)));
465     }
466   }
467
468   return 0;
469 }
470
471 static int dpdk_helper_eal_init(dpdk_helper_ctx_t *phc) {
472   phc->status = DPDK_HELPER_INITIALIZING_EAL;
473   DPDK_CHILD_LOG("%s:%s:%d DPDK_HELPER_INITIALIZING_EAL (start)\n",
474                  phc->shm_name, __FUNCTION__, __LINE__);
475
476   char *argp[DPDK_EAL_ARGC * 2 + 1];
477   int argc = 0;
478
479   /* EAL config must be initialized */
480   assert(phc->eal_config.coremask[0] != 0);
481   assert(phc->eal_config.memory_channels[0] != 0);
482   assert(phc->eal_config.file_prefix[0] != 0);
483
484   argp[argc++] = "collectd-dpdk";
485
486   argp[argc++] = "-c";
487   argp[argc++] = phc->eal_config.coremask;
488
489   argp[argc++] = "-n";
490   argp[argc++] = phc->eal_config.memory_channels;
491
492   if (strcasecmp(phc->eal_config.socket_memory, "") != 0) {
493     argp[argc++] = "--socket-mem";
494     argp[argc++] = phc->eal_config.socket_memory;
495   }
496
497   if (strcasecmp(phc->eal_config.file_prefix, DPDK_DEFAULT_RTE_CONFIG) != 0) {
498     argp[argc++] = "--file-prefix";
499     argp[argc++] = phc->eal_config.file_prefix;
500   }
501
502   argp[argc++] = "--proc-type";
503   argp[argc++] = "secondary";
504
505   if (strcasecmp(phc->eal_config.log_level, "") != 0) {
506     argp[argc++] = "--log-level";
507     argp[argc++] = phc->eal_config.log_level;
508   }
509   if (strcasecmp(phc->eal_config.rte_driver_lib_path, "") != 0) {
510       argp[argc++] = "-d";
511       argp[argc++] = phc->eal_config.rte_driver_lib_path;
512   }
513
514   assert(argc <= (DPDK_EAL_ARGC * 2 + 1));
515
516   int ret = rte_eal_init(argc, argp);
517
518   if (ret < 0) {
519
520     phc->eal_initialized = 0;
521
522     DPDK_CHILD_LOG("dpdk_helper_eal_init: ERROR initializing EAL ret=%d\n",
523                    ret);
524
525     printf("dpdk_helper_eal_init: EAL arguments: ");
526     for (int i = 0; i < argc; i++) {
527       printf("%s ", argp[i]);
528     }
529     printf("\n");
530
531     return ret;
532   }
533
534   phc->eal_initialized = 1;
535
536   DPDK_CHILD_LOG("%s:%s:%d DPDK_HELPER_INITIALIZING_EAL (done)\n",
537                  phc->shm_name, __FUNCTION__, __LINE__);
538
539   return 0;
540 }
541
542 static int dpdk_helper_cmd_wait(dpdk_helper_ctx_t *phc, pid_t ppid) {
543   DPDK_CHILD_TRACE(phc->shm_name);
544
545   struct timespec ts;
546   cdtime_t now = cdtime();
547   cdtime_t cmd_wait_time = MS_TO_CDTIME_T(1500) + phc->cmd_wait_time * 2;
548   ts = CDTIME_T_TO_TIMESPEC(now + cmd_wait_time);
549
550   int ret = sem_timedwait(&phc->sema_cmd_start, &ts);
551   DPDK_CHILD_LOG("%s:%s:%d pid=%lu got sema_cmd_start (ret=%d, errno=%d)\n",
552                  phc->shm_name, __FUNCTION__, __LINE__, (long)getpid(), ret,
553                  errno);
554
555   if (phc->cmd == DPDK_CMD_QUIT) {
556     DPDK_CHILD_LOG("%s:%s:%d pid=%lu exiting\n", phc->shm_name, __FUNCTION__,
557                    __LINE__, (long)getpid());
558     exit(0);
559   } else if (ret == -1 && errno == ETIMEDOUT) {
560     if (phc->status == DPDK_HELPER_ALIVE_SENDING_EVENTS) {
561       DPDK_CHILD_LOG("%s:dpdk_helper_cmd_wait: sem timedwait()"
562                      " timeout, did collectd terminate?\n",
563                      phc->shm_name);
564       dpdk_helper_exit(phc, DPDK_HELPER_GRACEFUL_QUIT);
565     }
566   }
567 #if COLLECT_DEBUG
568   int val = 0;
569   if (sem_getvalue(&phc->sema_cmd_start, &val) == 0)
570     DPDK_CHILD_LOG("%s:%s:%d pid=%lu wait sema_cmd_start (value=%d)\n",
571                    phc->shm_name, __FUNCTION__, __LINE__, (long)getpid(), val);
572 #endif
573
574   /* Parent PID change means collectd died so quit the helper process. */
575   if (ppid != getppid()) {
576     DPDK_CHILD_LOG("dpdk_helper_cmd_wait: parent PID changed, quitting.\n");
577     dpdk_helper_exit(phc, DPDK_HELPER_GRACEFUL_QUIT);
578   }
579
580   /* Checking for DPDK primary process. */
581   if (!rte_eal_primary_proc_alive(phc->eal_config.file_prefix)) {
582     if (phc->eal_initialized) {
583       DPDK_CHILD_LOG(
584           "%s:dpdk_helper_cmd_wait: no primary alive but EAL initialized:"
585           " quitting.\n",
586           phc->shm_name);
587       dpdk_helper_exit(phc, DPDK_HELPER_NOT_INITIALIZED);
588     }
589
590     phc->status = DPDK_HELPER_WAITING_ON_PRIMARY;
591     DPDK_CHILD_LOG("%s:%s:%d DPDK_HELPER_WAITING_ON_PRIMARY\n", phc->shm_name,
592                    __FUNCTION__, __LINE__);
593
594     return -1;
595   }
596
597   if (!phc->eal_initialized) {
598     int ret = dpdk_helper_eal_init(phc);
599     if (ret != 0) {
600       DPDK_CHILD_LOG("Error initializing EAL\n");
601       dpdk_helper_exit(phc, DPDK_HELPER_NOT_INITIALIZED);
602     }
603     phc->status = DPDK_HELPER_ALIVE_SENDING_EVENTS;
604     DPDK_CHILD_LOG("%s:%s:%d DPDK_HELPER_ALIVE_SENDING_EVENTS\n", phc->shm_name,
605                    __FUNCTION__, __LINE__);
606     return -1;
607   }
608
609   return 0;
610 }
611
612 static int dpdk_helper_worker(dpdk_helper_ctx_t *phc) {
613   DPDK_CHILD_TRACE(phc->shm_name);
614
615   pid_t ppid = getppid();
616
617   while (1) {
618     if (dpdk_helper_cmd_wait(phc, ppid) == 0) {
619       DPDK_CHILD_LOG("%s:%s:%d DPDK command handle (cmd=%d, pid=%lu)\n",
620                      phc->shm_name, __FUNCTION__, __LINE__, phc->cmd,
621                      (long)getpid());
622       phc->cmd_result = dpdk_helper_command_handler(phc, phc->cmd);
623     } else {
624       phc->cmd_result = -1;
625     }
626
627     /* now kick collectd to get results */
628     int err = sem_post(&phc->sema_cmd_complete);
629     DPDK_CHILD_LOG("%s:%s:%d post sema_cmd_complete (pid=%lu)\n", phc->shm_name,
630                    __FUNCTION__, __LINE__, (long)getpid());
631     if (err) {
632       char errbuf[ERR_BUF_SIZE];
633       DPDK_CHILD_LOG("dpdk_helper_worker: error posting sema_cmd_complete "
634                      "semaphore (%s)\n",
635                      sstrerror(errno, errbuf, sizeof(errbuf)));
636     }
637
638 #if COLLECT_DEBUG
639     int val = 0;
640     if (sem_getvalue(&phc->sema_cmd_complete, &val) == 0)
641       DPDK_CHILD_LOG("%s:%s:%d pid=%lu sema_cmd_complete (value=%d)\n",
642                      phc->shm_name, __FUNCTION__, __LINE__, (long)getpid(),
643                      val);
644 #endif
645
646   } /* while(1) */
647
648   return 0;
649 }
650
651 static const char *dpdk_helper_status_str(enum DPDK_HELPER_STATUS status) {
652   switch (status) {
653   case DPDK_HELPER_ALIVE_SENDING_EVENTS:
654     return "DPDK_HELPER_ALIVE_SENDING_EVENTS";
655   case DPDK_HELPER_WAITING_ON_PRIMARY:
656     return "DPDK_HELPER_WAITING_ON_PRIMARY";
657   case DPDK_HELPER_INITIALIZING:
658     return "DPDK_HELPER_INITIALIZING";
659   case DPDK_HELPER_INITIALIZING_EAL:
660     return "DPDK_HELPER_INITIALIZING_EAL";
661   case DPDK_HELPER_GRACEFUL_QUIT:
662     return "DPDK_HELPER_GRACEFUL_QUIT";
663   case DPDK_HELPER_NOT_INITIALIZED:
664     return "DPDK_HELPER_NOT_INITIALIZED";
665   default:
666     return "UNKNOWN";
667   }
668 }
669
670 static int dpdk_helper_status_check(dpdk_helper_ctx_t *phc) {
671   DEBUG("%s:%s:%d pid=%u %s", phc->shm_name, __FUNCTION__, __LINE__, getpid(),
672         dpdk_helper_status_str(phc->status));
673   char errbuf[ERR_BUF_SIZE];
674
675   if (phc->status == DPDK_HELPER_GRACEFUL_QUIT) {
676     return 0;
677   } else if (phc->status == DPDK_HELPER_NOT_INITIALIZED) {
678     phc->status = DPDK_HELPER_INITIALIZING;
679     DEBUG("%s:%s:%d DPDK_HELPER_INITIALIZING", phc->shm_name, __FUNCTION__,
680           __LINE__);
681     int err = dpdk_helper_spawn(phc);
682     if (err) {
683       ERROR("dpdkstat: error spawning helper %s",
684             sstrerror(errno, errbuf, sizeof(errbuf)));
685     }
686     return -1;
687   }
688
689   pid_t ws = waitpid(phc->pid, NULL, WNOHANG);
690   if (ws != 0) {
691     phc->status = DPDK_HELPER_INITIALIZING;
692     DEBUG("%s:%s:%d DPDK_HELPER_INITIALIZING", phc->shm_name, __FUNCTION__,
693           __LINE__);
694     int err = dpdk_helper_spawn(phc);
695     if (err) {
696       ERROR("dpdkstat: error spawning helper %s",
697             sstrerror(errno, errbuf, sizeof(errbuf)));
698     }
699     return -1;
700   }
701
702   if (phc->status == DPDK_HELPER_INITIALIZING_EAL) {
703     return -1;
704   }
705
706   return 0;
707 }
708
709 static void dpdk_helper_check_pipe(dpdk_helper_ctx_t *phc) {
710   char buf[DPDK_MAX_BUFFER_SIZE];
711   char out[DPDK_MAX_BUFFER_SIZE];
712
713   /* non blocking check on helper logging pipe */
714   struct pollfd fds = {
715       .fd = phc->pipes[0], .events = POLLIN,
716   };
717   int data_avail = poll(&fds, 1, 0);
718   DEBUG("%s:dpdk_helper_check_pipe: poll data_avail=%d", phc->shm_name, data_avail);
719   if (data_avail < 0) {
720     if (errno != EINTR || errno != EAGAIN) {
721       char errbuf[ERR_BUF_SIZE];
722       ERROR("%s: poll(2) failed: %s", phc->shm_name,
723             sstrerror(errno, errbuf, sizeof(errbuf)));
724     }
725   }
726   while (data_avail) {
727     int nbytes = read(phc->pipes[0], buf, (sizeof(buf) - 1));
728     DEBUG("%s:dpdk_helper_check_pipe: read nbytes=%d", phc->shm_name, nbytes);
729     if (nbytes <= 0)
730       break;
731     buf[nbytes] = '\n';
732     sstrncpy(out, buf, (nbytes + 1));
733     DEBUG("%s: helper process:\n%s", phc->shm_name, out);
734   }
735 }
736
737 int dpdk_helper_command(dpdk_helper_ctx_t *phc, enum DPDK_CMD cmd, int *result,
738                         cdtime_t cmd_wait_time) {
739   if (phc == NULL) {
740     ERROR("Invalid argument(phc)");
741     return -EINVAL;
742   }
743
744   DEBUG("%s:%s:%d pid=%lu, cmd=%d", phc->shm_name, __FUNCTION__, __LINE__,
745         (long)getpid(), cmd);
746
747   phc->cmd_wait_time = cmd_wait_time;
748
749   int ret = dpdk_helper_status_check(phc);
750
751   dpdk_helper_check_pipe(phc);
752
753   if (ret != 0) {
754     return ret;
755   }
756
757   DEBUG("%s: DPDK command execute (cmd=%d)", phc->shm_name, cmd);
758
759   phc->cmd_result = 0;
760   phc->cmd = cmd;
761
762   /* kick helper to process command */
763   int err = sem_post(&phc->sema_cmd_start);
764   if (err) {
765     char errbuf[ERR_BUF_SIZE];
766     ERROR("dpdk_helper_worker: error posting sema_cmd_start semaphore (%s)",
767           sstrerror(errno, errbuf, sizeof(errbuf)));
768   }
769
770 #if COLLECT_DEBUG
771   int val = 0;
772   if (sem_getvalue(&phc->sema_cmd_start, &val) == 0)
773     DEBUG("%s:dpdk_helper_command: post sema_cmd_start (value=%d)",
774           phc->shm_name, val);
775 #endif
776
777   if (phc->cmd != DPDK_CMD_QUIT) {
778
779     /* wait for helper to complete processing */
780     struct timespec ts;
781     cdtime_t now = cdtime();
782
783     if (phc->status != DPDK_HELPER_ALIVE_SENDING_EVENTS) {
784       cmd_wait_time = MS_TO_CDTIME_T(DPDK_CDM_DEFAULT_TIMEOUT);
785     }
786
787     ts = CDTIME_T_TO_TIMESPEC(now + cmd_wait_time);
788     ret = sem_timedwait(&phc->sema_cmd_complete, &ts);
789     if (ret == -1 && errno == ETIMEDOUT) {
790       DPDK_HELPER_TRACE(phc->shm_name);
791       DEBUG("%s:sema_cmd_start: timeout in collectd thread: is a DPDK Primary "
792             "running?",
793             phc->shm_name);
794       return -ETIMEDOUT;
795     }
796
797 #if COLLECT_DEBUG
798     val = 0;
799     if (sem_getvalue(&phc->sema_cmd_complete, &val) == 0)
800       DEBUG("%s:dpdk_helper_command: wait sema_cmd_complete (value=%d)",
801             phc->shm_name, val);
802 #endif
803
804     if (result) {
805       *result = phc->cmd_result;
806     }
807   }
808
809   dpdk_helper_check_pipe(phc);
810
811   DEBUG("%s: DPDK command complete (cmd=%d, result=%d)", phc->shm_name,
812         phc->cmd, phc->cmd_result);
813
814   return 0;
815 }
816
817 uint64_t strtoull_safe(const char *str, int *err) {
818   uint64_t val = 0;
819   char *endptr;
820   int res = 0;
821
822   val = strtoull(str, &endptr, 16);
823   if (*endptr) {
824     ERROR("%s Failed to parse the value %s, endptr=%c", __FUNCTION__, str,
825           *endptr);
826     res = -EINVAL;
827   }
828   if (err != NULL)
829     *err = res;
830   return val;
831 }
832
833 uint128_t str_to_uint128(const char *str, int len) {
834   uint128_t lcore_mask;
835   int err = 0;
836
837   memset(&lcore_mask, 0, sizeof(lcore_mask));
838
839   if (len <= 2 || strncmp(str, "0x", 2) != 0) {
840     ERROR("%s Value %s should be represened in hexadecimal format",
841           __FUNCTION__, str);
842     return lcore_mask;
843   }
844   /* If str is <= 64 bit long ('0x' + 16 chars = 18 chars) then
845    * conversion is straightforward. Otherwise str is splitted into 64b long
846    * blocks */
847   if (len <= 18) {
848     lcore_mask.low = strtoull_safe(str, &err);
849     if (err)
850       return lcore_mask;
851   } else {
852     char low_str[DATA_MAX_NAME_LEN];
853     char high_str[DATA_MAX_NAME_LEN];
854
855     memset(high_str, 0, sizeof(high_str));
856     memset(low_str, 0, sizeof(low_str));
857
858     strncpy(high_str, str, len - 16);
859     strncpy(low_str, str + len - 16, 16);
860
861     lcore_mask.low = strtoull_safe(low_str, &err);
862     if (err)
863       return lcore_mask;
864
865     lcore_mask.high = strtoull_safe(high_str, &err);
866     if (err) {
867       lcore_mask.low = 0;
868       return lcore_mask;
869     }
870   }
871   return lcore_mask;
872 }
873
874 uint8_t dpdk_helper_eth_dev_count() {
875   uint8_t ports = rte_eth_dev_count();
876   if (ports == 0) {
877     ERROR(
878         "%s:%d: No DPDK ports available. Check bound devices to DPDK driver.\n",
879         __FUNCTION__, __LINE__);
880     return ports;
881   }
882
883   if (ports > RTE_MAX_ETHPORTS) {
884     ERROR("%s:%d: Number of DPDK ports (%u) is greater than "
885           "RTE_MAX_ETHPORTS=%d. Ignoring extra ports\n",
886           __FUNCTION__, __LINE__, ports, RTE_MAX_ETHPORTS);
887     ports = RTE_MAX_ETHPORTS;
888   }
889
890   return ports;
891 }