Format correction.
[collectd.git] / src / utils_dpdk.c
1 /*
2  * collectd - src/utils_dpdk.c
3  * MIT License
4  *
5  * Copyright(c) 2016 Intel Corporation. All rights reserved.
6  *
7  * Permission is hereby granted, free of charge, to any person obtaining a copy
8  * of this software and associated documentation files (the "Software"), to deal
9  * in the Software without restriction, including without limitation the rights
10  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11  * copies of the Software, and to permit persons to whom the Software is
12  * furnished to do so, subject to the following conditions:
13  *
14  * The above copyright notice and this permission notice shall be included in
15  * all copies or substantial portions of the Software.
16  *
17  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
23  * SOFTWARE.
24  *
25  * Authors:
26  *   Maryam Tahhan <maryam.tahhan@intel.com>
27  *   Harry van Haaren <harry.van.haaren@intel.com>
28  *   Taras Chornyi <tarasx.chornyi@intel.com>
29  *   Serhiy Pshyk <serhiyx.pshyk@intel.com>
30  *   Krzysztof Matczak <krzysztofx.matczak@intel.com>
31  */
32
33 #include "collectd.h"
34
35 #include <poll.h>
36 #include <semaphore.h>
37 #include <sys/mman.h>
38
39 #include <rte_config.h>
40 #include <rte_eal.h>
41 #include <rte_ethdev.h>
42
43 #include "common.h"
44 #include "utils_dpdk.h"
45
46 #define DPDK_DEFAULT_RTE_CONFIG "/var/run/.rte_config"
47 #define DPDK_EAL_ARGC 10
48 // Complete trace should fit into 1024 chars
49 #define DPDK_MAX_BUFFER_SIZE 896
50 #define DPDK_CDM_DEFAULT_TIMEOUT 10000
51
52 enum DPDK_HELPER_STATUS {
53   DPDK_HELPER_NOT_INITIALIZED = 0,
54   DPDK_HELPER_INITIALIZING,
55   DPDK_HELPER_WAITING_ON_PRIMARY,
56   DPDK_HELPER_INITIALIZING_EAL,
57   DPDK_HELPER_ALIVE_SENDING_EVENTS,
58   DPDK_HELPER_GRACEFUL_QUIT,
59 };
60
61 #define DPDK_HELPER_TRACE(_name)                                               \
62   DEBUG("%s:%s:%d pid=%ld", _name, __FUNCTION__, __LINE__, (long)getpid())
63
64 struct dpdk_helper_ctx_s {
65
66   dpdk_eal_config_t eal_config;
67   int eal_initialized;
68
69   size_t shm_size;
70   char shm_name[DATA_MAX_NAME_LEN];
71
72   sem_t sema_cmd_start;
73   sem_t sema_cmd_complete;
74   cdtime_t cmd_wait_time;
75
76   pid_t pid;
77   int pipes[2];
78   int status;
79
80   int cmd;
81   int cmd_result;
82
83   char priv_data[];
84 };
85
86 static int dpdk_shm_init(const char *name, size_t size, void **map);
87 static void dpdk_shm_cleanup(const char *name, size_t size, void *map);
88
89 static int dpdk_helper_spawn(dpdk_helper_ctx_t *phc);
90 static int dpdk_helper_worker(dpdk_helper_ctx_t *phc);
91 static int dpdk_helper_eal_init(dpdk_helper_ctx_t *phc);
92 static int dpdk_helper_cmd_wait(dpdk_helper_ctx_t *phc, pid_t ppid);
93 static int dpdk_helper_exit_command(dpdk_helper_ctx_t *phc,
94                                     enum DPDK_HELPER_STATUS status);
95 static int dpdk_helper_exit(dpdk_helper_ctx_t *phc,
96                             enum DPDK_HELPER_STATUS status);
97 static int dpdk_helper_status_check(dpdk_helper_ctx_t *phc);
98 static void dpdk_helper_config_default(dpdk_helper_ctx_t *phc);
99 static const char *dpdk_helper_status_str(enum DPDK_HELPER_STATUS status);
100
101 static void dpdk_helper_config_default(dpdk_helper_ctx_t *phc) {
102   if (phc == NULL)
103     return;
104
105   DPDK_HELPER_TRACE(phc->shm_name);
106
107   snprintf(phc->eal_config.coremask, DATA_MAX_NAME_LEN, "%s", "0xf");
108   snprintf(phc->eal_config.memory_channels, DATA_MAX_NAME_LEN, "%s", "1");
109   snprintf(phc->eal_config.file_prefix, DATA_MAX_NAME_LEN, "%s",
110            DPDK_DEFAULT_RTE_CONFIG);
111 }
112
113 int dpdk_helper_eal_config_set(dpdk_helper_ctx_t *phc, dpdk_eal_config_t *ec) {
114   if (phc == NULL) {
115     ERROR("Invalid argument (phc)");
116     return -EINVAL;
117   }
118
119   DPDK_HELPER_TRACE(phc->shm_name);
120
121   if (ec == NULL) {
122     ERROR("Invalid argument (ec)");
123     return -EINVAL;
124   }
125
126   memcpy(&phc->eal_config, ec, sizeof(phc->eal_config));
127
128   return 0;
129 }
130
131 int dpdk_helper_eal_config_get(dpdk_helper_ctx_t *phc, dpdk_eal_config_t *ec) {
132   if (phc == NULL) {
133     ERROR("Invalid argument (phc)");
134     return -EINVAL;
135   }
136
137   DPDK_HELPER_TRACE(phc->shm_name);
138
139   if (ec == NULL) {
140     ERROR("Invalid argument (ec)");
141     return -EINVAL;
142   }
143
144   memcpy(ec, &phc->eal_config, sizeof(*ec));
145
146   return 0;
147 }
148
149 int dpdk_helper_eal_config_parse(dpdk_helper_ctx_t *phc, oconfig_item_t *ci) {
150   DPDK_HELPER_TRACE(phc->shm_name);
151
152   if (phc == NULL) {
153     ERROR("Invalid argument (phc)");
154     return -EINVAL;
155   }
156
157   if (ci == NULL) {
158     ERROR("Invalid argument (ci)");
159     return -EINVAL;
160   }
161
162   int status = 0;
163   for (int i = 0; i < ci->children_num; i++) {
164     oconfig_item_t *child = ci->children + i;
165
166     if (strcasecmp("Coremask", child->key) == 0) {
167       status = cf_util_get_string_buffer(child, phc->eal_config.coremask,
168                                          sizeof(phc->eal_config.coremask));
169       DEBUG("dpdk_common: EAL:Coremask %s", phc->eal_config.coremask);
170     } else if (strcasecmp("MemoryChannels", child->key) == 0) {
171       status =
172           cf_util_get_string_buffer(child, phc->eal_config.memory_channels,
173                                     sizeof(phc->eal_config.memory_channels));
174       DEBUG("dpdk_common: EAL:Memory Channels %s",
175             phc->eal_config.memory_channels);
176     } else if (strcasecmp("SocketMemory", child->key) == 0) {
177       status = cf_util_get_string_buffer(child, phc->eal_config.socket_memory,
178                                          sizeof(phc->eal_config.socket_memory));
179       DEBUG("dpdk_common: EAL:Socket memory %s", phc->eal_config.socket_memory);
180     } else if (strcasecmp("FilePrefix", child->key) == 0) {
181       char prefix[DATA_MAX_NAME_LEN];
182
183       status = cf_util_get_string_buffer(child, prefix, sizeof(prefix));
184       if (status == 0) {
185         snprintf(phc->eal_config.file_prefix, DATA_MAX_NAME_LEN,
186                  "/var/run/.%s_config", prefix);
187         DEBUG("dpdk_common: EAL:File prefix %s", phc->eal_config.file_prefix);
188       }
189     } else if (strcasecmp("LogLevel", child->key) == 0) {
190       status = cf_util_get_string_buffer(child, phc->eal_config.log_level,
191                                          sizeof(phc->eal_config.log_level));
192       DEBUG("dpdk_common: EAL:LogLevel %s", phc->eal_config.log_level);
193     } else if (strcasecmp("RteDriverLibPath", child->key) == 0) {
194       status = cf_util_get_string_buffer(
195           child, phc->eal_config.rte_driver_lib_path,
196           sizeof(phc->eal_config.rte_driver_lib_path));
197       DEBUG("dpdk_common: EAL:RteDriverLibPath %s",
198             phc->eal_config.rte_driver_lib_path);
199     } else {
200       ERROR("dpdk_common: Invalid '%s' configuration option", child->key);
201       status = -EINVAL;
202     }
203
204     if (status != 0) {
205       ERROR("dpdk_common: Parsing EAL configuration failed");
206       break;
207     }
208   }
209
210   return status;
211 }
212
213 static int dpdk_shm_init(const char *name, size_t size, void **map) {
214   DPDK_HELPER_TRACE(name);
215
216   char errbuf[ERR_BUF_SIZE];
217
218   int fd = shm_open(name, O_CREAT | O_TRUNC | O_RDWR, 0666);
219   if (fd < 0) {
220     WARNING("dpdk_shm_init: Failed to open %s as SHM:%s", name,
221             sstrerror(errno, errbuf, sizeof(errbuf)));
222     *map = NULL;
223     return -1;
224   }
225
226   int ret = ftruncate(fd, size);
227   if (ret != 0) {
228     WARNING("dpdk_shm_init: Failed to resize SHM:%s",
229             sstrerror(errno, errbuf, sizeof(errbuf)));
230     close(fd);
231     *map = NULL;
232     dpdk_shm_cleanup(name, size, NULL);
233     return -1;
234   }
235
236   *map = mmap(0, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
237   if (*map == MAP_FAILED) {
238     WARNING("dpdk_shm_init:Failed to mmap SHM:%s",
239             sstrerror(errno, errbuf, sizeof(errbuf)));
240     close(fd);
241     *map = NULL;
242     dpdk_shm_cleanup(name, size, NULL);
243     return -1;
244   }
245   /* File descriptor no longer needed for shared memory operations */
246   close(fd);
247   memset(*map, 0, size);
248
249   return 0;
250 }
251
252 static void dpdk_shm_cleanup(const char *name, size_t size, void *map) {
253   DPDK_HELPER_TRACE(name);
254   char errbuf[ERR_BUF_SIZE];
255
256   /*
257    * Call shm_unlink first, as 'name' might be no longer accessible after munmap
258    */
259   if (shm_unlink(name))
260     ERROR("shm_unlink failure %s", sstrerror(errno, errbuf, sizeof(errbuf)));
261
262   if (map != NULL) {
263     if (munmap(map, size))
264       ERROR("munmap failure %s", sstrerror(errno, errbuf, sizeof(errbuf)));
265   }
266 }
267
268 void *dpdk_helper_priv_get(dpdk_helper_ctx_t *phc) {
269   if (phc)
270     return phc->priv_data;
271
272   return NULL;
273 }
274
275 int dpdk_helper_data_size_get(dpdk_helper_ctx_t *phc) {
276   if (phc == NULL) {
277     DPDK_CHILD_LOG("Invalid argument(phc)\n");
278     return -EINVAL;
279   }
280
281   return phc->shm_size - sizeof(dpdk_helper_ctx_t);
282 }
283
284 int dpdk_helper_init(const char *name, size_t data_size,
285                      dpdk_helper_ctx_t **pphc) {
286   dpdk_helper_ctx_t *phc = NULL;
287   size_t shm_size = sizeof(dpdk_helper_ctx_t) + data_size;
288   char errbuf[ERR_BUF_SIZE];
289
290   if (pphc == NULL) {
291     ERROR("%s:Invalid argument(pphc)", __FUNCTION__);
292     return -EINVAL;
293   }
294
295   if (name == NULL) {
296     ERROR("%s:Invalid argument(name)", __FUNCTION__);
297     return -EINVAL;
298   }
299
300   DPDK_HELPER_TRACE(name);
301
302   /* Allocate dpdk_helper_ctx_t and
303   * initialize a POSIX SHared Memory (SHM) object.
304   */
305   int err = dpdk_shm_init(name, shm_size, (void **)&phc);
306   if (err != 0) {
307     return -errno;
308   }
309
310   err = sem_init(&phc->sema_cmd_start, 1, 0);
311   if (err != 0) {
312     ERROR("sema_cmd_start semaphore init failed: %s",
313           sstrerror(errno, errbuf, sizeof(errbuf)));
314     int errno_m = errno;
315     dpdk_shm_cleanup(name, shm_size, (void *)phc);
316     return -errno_m;
317   }
318
319   err = sem_init(&phc->sema_cmd_complete, 1, 0);
320   if (err != 0) {
321     ERROR("sema_cmd_complete semaphore init failed: %s",
322           sstrerror(errno, errbuf, sizeof(errbuf)));
323     sem_destroy(&phc->sema_cmd_start);
324     int errno_m = errno;
325     dpdk_shm_cleanup(name, shm_size, (void *)phc);
326     return -errno_m;
327   }
328
329   phc->shm_size = shm_size;
330   sstrncpy(phc->shm_name, name, sizeof(phc->shm_name));
331
332   dpdk_helper_config_default(phc);
333
334   *pphc = phc;
335
336   return 0;
337 }
338
339 void dpdk_helper_shutdown(dpdk_helper_ctx_t *phc) {
340   if (phc == NULL)
341     return;
342
343   DPDK_HELPER_TRACE(phc->shm_name);
344
345   close(phc->pipes[1]);
346
347   if (phc->status != DPDK_HELPER_NOT_INITIALIZED) {
348     dpdk_helper_exit_command(phc, DPDK_HELPER_GRACEFUL_QUIT);
349   }
350
351   sem_destroy(&phc->sema_cmd_start);
352   sem_destroy(&phc->sema_cmd_complete);
353   dpdk_shm_cleanup(phc->shm_name, phc->shm_size, (void *)phc);
354 }
355
356 static int dpdk_helper_spawn(dpdk_helper_ctx_t *phc) {
357   char errbuf[ERR_BUF_SIZE];
358   if (phc == NULL) {
359     ERROR("Invalid argument(phc)");
360     return -EINVAL;
361   }
362
363   DPDK_HELPER_TRACE(phc->shm_name);
364
365   phc->eal_initialized = 0;
366   phc->cmd_wait_time = MS_TO_CDTIME_T(DPDK_CDM_DEFAULT_TIMEOUT);
367
368   /*
369    * Create a pipe for helper stdout back to collectd. This is necessary for
370    * logging EAL failures, as rte_eal_init() calls rte_panic().
371    */
372   if (phc->pipes[1]) {
373     DEBUG("dpdk_helper_spawn: collectd closing helper pipe %d", phc->pipes[1]);
374   } else {
375     DEBUG("dpdk_helper_spawn: collectd helper pipe %d, not closing",
376           phc->pipes[1]);
377   }
378
379   if (pipe(phc->pipes) != 0) {
380     DEBUG("dpdk_helper_spawn: Could not create helper pipe: %s",
381           sstrerror(errno, errbuf, sizeof(errbuf)));
382     return -1;
383   }
384
385   int pipe0_flags = fcntl(phc->pipes[0], F_GETFL, 0);
386   int pipe1_flags = fcntl(phc->pipes[1], F_GETFL, 0);
387   if (pipe0_flags == -1 || pipe1_flags == -1) {
388     WARNING("dpdk_helper_spawn: error setting up pipe flags: %s",
389             sstrerror(errno, errbuf, sizeof(errbuf)));
390   }
391   int pipe0_err = fcntl(phc->pipes[0], F_SETFL, pipe1_flags | O_NONBLOCK);
392   int pipe1_err = fcntl(phc->pipes[1], F_SETFL, pipe0_flags | O_NONBLOCK);
393   if (pipe0_err == -1 || pipe1_err == -1) {
394     WARNING("dpdk_helper_spawn: error setting up pipes: %s",
395             sstrerror(errno, errbuf, sizeof(errbuf)));
396   }
397
398   pid_t pid = fork();
399   if (pid > 0) {
400     phc->pid = pid;
401     close(phc->pipes[1]);
402     DEBUG("%s:dpdk_helper_spawn: helper pid %lu", phc->shm_name,
403           (long)phc->pid);
404   } else if (pid == 0) {
405     /* Replace stdout with a pipe to collectd. */
406     close(phc->pipes[0]);
407     close(STDOUT_FILENO);
408     dup2(phc->pipes[1], STDOUT_FILENO);
409     DPDK_CHILD_TRACE(phc->shm_name);
410     dpdk_helper_worker(phc);
411     exit(0);
412   } else {
413     ERROR("dpdk_helper_start: Failed to fork helper process: %s",
414           sstrerror(errno, errbuf, sizeof(errbuf)));
415     return -1;
416   }
417
418   return 0;
419 }
420
421 static int dpdk_helper_exit(dpdk_helper_ctx_t *phc,
422                             enum DPDK_HELPER_STATUS status) {
423   DPDK_CHILD_LOG("%s:%s:%d %s\n", phc->shm_name, __FUNCTION__, __LINE__,
424                  dpdk_helper_status_str(status));
425
426   close(phc->pipes[1]);
427
428   phc->status = status;
429
430   exit(0);
431
432   return 0;
433 }
434
435 static int dpdk_helper_exit_command(dpdk_helper_ctx_t *phc,
436                                     enum DPDK_HELPER_STATUS status) {
437   char errbuf[ERR_BUF_SIZE];
438   DPDK_HELPER_TRACE(phc->shm_name);
439
440   close(phc->pipes[1]);
441
442   if (phc->status == DPDK_HELPER_ALIVE_SENDING_EVENTS) {
443     phc->status = status;
444     DEBUG("%s:%s:%d %s", phc->shm_name, __FUNCTION__, __LINE__,
445           dpdk_helper_status_str(status));
446
447     int ret = dpdk_helper_command(phc, DPDK_CMD_QUIT, NULL, 0);
448     if (ret != 0) {
449       DEBUG("%s:%s:%d kill helper (pid=%lu)", phc->shm_name, __FUNCTION__,
450             __LINE__, (long)phc->pid);
451
452       int err = kill(phc->pid, SIGKILL);
453       if (err) {
454         ERROR("%s error sending kill to helper: %s", __FUNCTION__,
455               sstrerror(errno, errbuf, sizeof(errbuf)));
456       }
457     }
458   } else {
459
460     DEBUG("%s:%s:%d kill helper (pid=%lu)", phc->shm_name, __FUNCTION__,
461           __LINE__, (long)phc->pid);
462
463     int err = kill(phc->pid, SIGKILL);
464     if (err) {
465       ERROR("%s error sending kill to helper: %s", __FUNCTION__,
466             sstrerror(errno, errbuf, sizeof(errbuf)));
467     }
468   }
469
470   return 0;
471 }
472
473 static int dpdk_helper_eal_init(dpdk_helper_ctx_t *phc) {
474   phc->status = DPDK_HELPER_INITIALIZING_EAL;
475   DPDK_CHILD_LOG("%s:%s:%d DPDK_HELPER_INITIALIZING_EAL (start)\n",
476                  phc->shm_name, __FUNCTION__, __LINE__);
477
478   char *argp[DPDK_EAL_ARGC * 2 + 1];
479   int argc = 0;
480
481   /* EAL config must be initialized */
482   assert(phc->eal_config.coremask[0] != 0);
483   assert(phc->eal_config.memory_channels[0] != 0);
484   assert(phc->eal_config.file_prefix[0] != 0);
485
486   argp[argc++] = "collectd-dpdk";
487
488   argp[argc++] = "-c";
489   argp[argc++] = phc->eal_config.coremask;
490
491   argp[argc++] = "-n";
492   argp[argc++] = phc->eal_config.memory_channels;
493
494   if (strcasecmp(phc->eal_config.socket_memory, "") != 0) {
495     argp[argc++] = "--socket-mem";
496     argp[argc++] = phc->eal_config.socket_memory;
497   }
498
499   if (strcasecmp(phc->eal_config.file_prefix, DPDK_DEFAULT_RTE_CONFIG) != 0) {
500     argp[argc++] = "--file-prefix";
501     argp[argc++] = phc->eal_config.file_prefix;
502   }
503
504   argp[argc++] = "--proc-type";
505   argp[argc++] = "secondary";
506
507   if (strcasecmp(phc->eal_config.log_level, "") != 0) {
508     argp[argc++] = "--log-level";
509     argp[argc++] = phc->eal_config.log_level;
510   }
511   if (strcasecmp(phc->eal_config.rte_driver_lib_path, "") != 0) {
512     argp[argc++] = "-d";
513     argp[argc++] = phc->eal_config.rte_driver_lib_path;
514   }
515
516   assert(argc <= (DPDK_EAL_ARGC * 2 + 1));
517
518   int ret = rte_eal_init(argc, argp);
519
520   if (ret < 0) {
521
522     phc->eal_initialized = 0;
523
524     DPDK_CHILD_LOG("dpdk_helper_eal_init: ERROR initializing EAL ret=%d\n",
525                    ret);
526
527     printf("dpdk_helper_eal_init: EAL arguments: ");
528     for (int i = 0; i < argc; i++) {
529       printf("%s ", argp[i]);
530     }
531     printf("\n");
532
533     return ret;
534   }
535
536   phc->eal_initialized = 1;
537
538   DPDK_CHILD_LOG("%s:%s:%d DPDK_HELPER_INITIALIZING_EAL (done)\n",
539                  phc->shm_name, __FUNCTION__, __LINE__);
540
541   return 0;
542 }
543
544 static int dpdk_helper_cmd_wait(dpdk_helper_ctx_t *phc, pid_t ppid) {
545   DPDK_CHILD_TRACE(phc->shm_name);
546
547   struct timespec ts;
548   cdtime_t now = cdtime();
549   cdtime_t cmd_wait_time = MS_TO_CDTIME_T(1500) + phc->cmd_wait_time * 2;
550   ts = CDTIME_T_TO_TIMESPEC(now + cmd_wait_time);
551
552   int ret = sem_timedwait(&phc->sema_cmd_start, &ts);
553   DPDK_CHILD_LOG("%s:%s:%d pid=%lu got sema_cmd_start (ret=%d, errno=%d)\n",
554                  phc->shm_name, __FUNCTION__, __LINE__, (long)getpid(), ret,
555                  errno);
556
557   if (phc->cmd == DPDK_CMD_QUIT) {
558     DPDK_CHILD_LOG("%s:%s:%d pid=%lu exiting\n", phc->shm_name, __FUNCTION__,
559                    __LINE__, (long)getpid());
560     exit(0);
561   } else if (ret == -1 && errno == ETIMEDOUT) {
562     if (phc->status == DPDK_HELPER_ALIVE_SENDING_EVENTS) {
563       DPDK_CHILD_LOG("%s:dpdk_helper_cmd_wait: sem timedwait()"
564                      " timeout, did collectd terminate?\n",
565                      phc->shm_name);
566       dpdk_helper_exit(phc, DPDK_HELPER_GRACEFUL_QUIT);
567     }
568   }
569 #if COLLECT_DEBUG
570   int val = 0;
571   if (sem_getvalue(&phc->sema_cmd_start, &val) == 0)
572     DPDK_CHILD_LOG("%s:%s:%d pid=%lu wait sema_cmd_start (value=%d)\n",
573                    phc->shm_name, __FUNCTION__, __LINE__, (long)getpid(), val);
574 #endif
575
576   /* Parent PID change means collectd died so quit the helper process. */
577   if (ppid != getppid()) {
578     DPDK_CHILD_LOG("dpdk_helper_cmd_wait: parent PID changed, quitting.\n");
579     dpdk_helper_exit(phc, DPDK_HELPER_GRACEFUL_QUIT);
580   }
581
582   /* Checking for DPDK primary process. */
583   if (!rte_eal_primary_proc_alive(phc->eal_config.file_prefix)) {
584     if (phc->eal_initialized) {
585       DPDK_CHILD_LOG(
586           "%s:dpdk_helper_cmd_wait: no primary alive but EAL initialized:"
587           " quitting.\n",
588           phc->shm_name);
589       dpdk_helper_exit(phc, DPDK_HELPER_NOT_INITIALIZED);
590     }
591
592     phc->status = DPDK_HELPER_WAITING_ON_PRIMARY;
593     DPDK_CHILD_LOG("%s:%s:%d DPDK_HELPER_WAITING_ON_PRIMARY\n", phc->shm_name,
594                    __FUNCTION__, __LINE__);
595
596     return -1;
597   }
598
599   if (!phc->eal_initialized) {
600     int ret = dpdk_helper_eal_init(phc);
601     if (ret != 0) {
602       DPDK_CHILD_LOG("Error initializing EAL\n");
603       dpdk_helper_exit(phc, DPDK_HELPER_NOT_INITIALIZED);
604     }
605     phc->status = DPDK_HELPER_ALIVE_SENDING_EVENTS;
606     DPDK_CHILD_LOG("%s:%s:%d DPDK_HELPER_ALIVE_SENDING_EVENTS\n", phc->shm_name,
607                    __FUNCTION__, __LINE__);
608     return -1;
609   }
610
611   return 0;
612 }
613
614 static int dpdk_helper_worker(dpdk_helper_ctx_t *phc) {
615   DPDK_CHILD_TRACE(phc->shm_name);
616
617   pid_t ppid = getppid();
618
619   while (1) {
620     if (dpdk_helper_cmd_wait(phc, ppid) == 0) {
621       DPDK_CHILD_LOG("%s:%s:%d DPDK command handle (cmd=%d, pid=%lu)\n",
622                      phc->shm_name, __FUNCTION__, __LINE__, phc->cmd,
623                      (long)getpid());
624       phc->cmd_result = dpdk_helper_command_handler(phc, phc->cmd);
625     } else {
626       phc->cmd_result = -1;
627     }
628
629     /* now kick collectd to get results */
630     int err = sem_post(&phc->sema_cmd_complete);
631     DPDK_CHILD_LOG("%s:%s:%d post sema_cmd_complete (pid=%lu)\n", phc->shm_name,
632                    __FUNCTION__, __LINE__, (long)getpid());
633     if (err) {
634       char errbuf[ERR_BUF_SIZE];
635       DPDK_CHILD_LOG("dpdk_helper_worker: error posting sema_cmd_complete "
636                      "semaphore (%s)\n",
637                      sstrerror(errno, errbuf, sizeof(errbuf)));
638     }
639
640 #if COLLECT_DEBUG
641     int val = 0;
642     if (sem_getvalue(&phc->sema_cmd_complete, &val) == 0)
643       DPDK_CHILD_LOG("%s:%s:%d pid=%lu sema_cmd_complete (value=%d)\n",
644                      phc->shm_name, __FUNCTION__, __LINE__, (long)getpid(),
645                      val);
646 #endif
647
648   } /* while(1) */
649
650   return 0;
651 }
652
653 static const char *dpdk_helper_status_str(enum DPDK_HELPER_STATUS status) {
654   switch (status) {
655   case DPDK_HELPER_ALIVE_SENDING_EVENTS:
656     return "DPDK_HELPER_ALIVE_SENDING_EVENTS";
657   case DPDK_HELPER_WAITING_ON_PRIMARY:
658     return "DPDK_HELPER_WAITING_ON_PRIMARY";
659   case DPDK_HELPER_INITIALIZING:
660     return "DPDK_HELPER_INITIALIZING";
661   case DPDK_HELPER_INITIALIZING_EAL:
662     return "DPDK_HELPER_INITIALIZING_EAL";
663   case DPDK_HELPER_GRACEFUL_QUIT:
664     return "DPDK_HELPER_GRACEFUL_QUIT";
665   case DPDK_HELPER_NOT_INITIALIZED:
666     return "DPDK_HELPER_NOT_INITIALIZED";
667   default:
668     return "UNKNOWN";
669   }
670 }
671
672 static int dpdk_helper_status_check(dpdk_helper_ctx_t *phc) {
673   DEBUG("%s:%s:%d pid=%u %s", phc->shm_name, __FUNCTION__, __LINE__, getpid(),
674         dpdk_helper_status_str(phc->status));
675   char errbuf[ERR_BUF_SIZE];
676
677   if (phc->status == DPDK_HELPER_GRACEFUL_QUIT) {
678     return 0;
679   } else if (phc->status == DPDK_HELPER_NOT_INITIALIZED) {
680     phc->status = DPDK_HELPER_INITIALIZING;
681     DEBUG("%s:%s:%d DPDK_HELPER_INITIALIZING", phc->shm_name, __FUNCTION__,
682           __LINE__);
683     int err = dpdk_helper_spawn(phc);
684     if (err) {
685       ERROR("dpdkstat: error spawning helper %s",
686             sstrerror(errno, errbuf, sizeof(errbuf)));
687     }
688     return -1;
689   }
690
691   pid_t ws = waitpid(phc->pid, NULL, WNOHANG);
692   if (ws != 0) {
693     phc->status = DPDK_HELPER_INITIALIZING;
694     DEBUG("%s:%s:%d DPDK_HELPER_INITIALIZING", phc->shm_name, __FUNCTION__,
695           __LINE__);
696     int err = dpdk_helper_spawn(phc);
697     if (err) {
698       ERROR("dpdkstat: error spawning helper %s",
699             sstrerror(errno, errbuf, sizeof(errbuf)));
700     }
701     return -1;
702   }
703
704   if (phc->status == DPDK_HELPER_INITIALIZING_EAL) {
705     return -1;
706   }
707
708   return 0;
709 }
710
711 static void dpdk_helper_check_pipe(dpdk_helper_ctx_t *phc) {
712   char buf[DPDK_MAX_BUFFER_SIZE];
713   char out[DPDK_MAX_BUFFER_SIZE];
714
715   /* non blocking check on helper logging pipe */
716   struct pollfd fds = {
717       .fd = phc->pipes[0], .events = POLLIN,
718   };
719   int data_avail = poll(&fds, 1, 0);
720   DEBUG("%s:dpdk_helper_check_pipe: poll data_avail=%d", phc->shm_name,
721         data_avail);
722   if (data_avail < 0) {
723     if (errno != EINTR || errno != EAGAIN) {
724       char errbuf[ERR_BUF_SIZE];
725       ERROR("%s: poll(2) failed: %s", phc->shm_name,
726             sstrerror(errno, errbuf, sizeof(errbuf)));
727     }
728   }
729   while (data_avail) {
730     int nbytes = read(phc->pipes[0], buf, (sizeof(buf) - 1));
731     DEBUG("%s:dpdk_helper_check_pipe: read nbytes=%d", phc->shm_name, nbytes);
732     if (nbytes <= 0)
733       break;
734     buf[nbytes] = '\n';
735     sstrncpy(out, buf, (nbytes + 1));
736     DEBUG("%s: helper process:\n%s", phc->shm_name, out);
737   }
738 }
739
740 int dpdk_helper_command(dpdk_helper_ctx_t *phc, enum DPDK_CMD cmd, int *result,
741                         cdtime_t cmd_wait_time) {
742   if (phc == NULL) {
743     ERROR("Invalid argument(phc)");
744     return -EINVAL;
745   }
746
747   DEBUG("%s:%s:%d pid=%lu, cmd=%d", phc->shm_name, __FUNCTION__, __LINE__,
748         (long)getpid(), cmd);
749
750   phc->cmd_wait_time = cmd_wait_time;
751
752   int ret = dpdk_helper_status_check(phc);
753
754   dpdk_helper_check_pipe(phc);
755
756   if (ret != 0) {
757     return ret;
758   }
759
760   DEBUG("%s: DPDK command execute (cmd=%d)", phc->shm_name, cmd);
761
762   phc->cmd_result = 0;
763   phc->cmd = cmd;
764
765   /* kick helper to process command */
766   int err = sem_post(&phc->sema_cmd_start);
767   if (err) {
768     char errbuf[ERR_BUF_SIZE];
769     ERROR("dpdk_helper_worker: error posting sema_cmd_start semaphore (%s)",
770           sstrerror(errno, errbuf, sizeof(errbuf)));
771   }
772
773 #if COLLECT_DEBUG
774   int val = 0;
775   if (sem_getvalue(&phc->sema_cmd_start, &val) == 0)
776     DEBUG("%s:dpdk_helper_command: post sema_cmd_start (value=%d)",
777           phc->shm_name, val);
778 #endif
779
780   if (phc->cmd != DPDK_CMD_QUIT) {
781
782     /* wait for helper to complete processing */
783     struct timespec ts;
784     cdtime_t now = cdtime();
785
786     if (phc->status != DPDK_HELPER_ALIVE_SENDING_EVENTS) {
787       cmd_wait_time = MS_TO_CDTIME_T(DPDK_CDM_DEFAULT_TIMEOUT);
788     }
789
790     ts = CDTIME_T_TO_TIMESPEC(now + cmd_wait_time);
791     ret = sem_timedwait(&phc->sema_cmd_complete, &ts);
792     if (ret == -1 && errno == ETIMEDOUT) {
793       DPDK_HELPER_TRACE(phc->shm_name);
794       DEBUG("%s:sema_cmd_start: timeout in collectd thread: is a DPDK Primary "
795             "running?",
796             phc->shm_name);
797       return -ETIMEDOUT;
798     }
799
800 #if COLLECT_DEBUG
801     val = 0;
802     if (sem_getvalue(&phc->sema_cmd_complete, &val) == 0)
803       DEBUG("%s:dpdk_helper_command: wait sema_cmd_complete (value=%d)",
804             phc->shm_name, val);
805 #endif
806
807     if (result) {
808       *result = phc->cmd_result;
809     }
810   }
811
812   dpdk_helper_check_pipe(phc);
813
814   DEBUG("%s: DPDK command complete (cmd=%d, result=%d)", phc->shm_name,
815         phc->cmd, phc->cmd_result);
816
817   return 0;
818 }
819
820 uint64_t strtoull_safe(const char *str, int *err) {
821   uint64_t val = 0;
822   char *endptr;
823   int res = 0;
824
825   val = strtoull(str, &endptr, 16);
826   if (*endptr) {
827     ERROR("%s Failed to parse the value %s, endptr=%c", __FUNCTION__, str,
828           *endptr);
829     res = -EINVAL;
830   }
831   if (err != NULL)
832     *err = res;
833   return val;
834 }
835
836 uint128_t str_to_uint128(const char *str, int len) {
837   uint128_t lcore_mask;
838   int err = 0;
839
840   memset(&lcore_mask, 0, sizeof(lcore_mask));
841
842   if (len <= 2 || strncmp(str, "0x", 2) != 0) {
843     ERROR("%s Value %s should be represened in hexadecimal format",
844           __FUNCTION__, str);
845     return lcore_mask;
846   }
847   /* If str is <= 64 bit long ('0x' + 16 chars = 18 chars) then
848    * conversion is straightforward. Otherwise str is splitted into 64b long
849    * blocks */
850   if (len <= 18) {
851     lcore_mask.low = strtoull_safe(str, &err);
852     if (err)
853       return lcore_mask;
854   } else {
855     char low_str[DATA_MAX_NAME_LEN];
856     char high_str[DATA_MAX_NAME_LEN];
857
858     memset(high_str, 0, sizeof(high_str));
859     memset(low_str, 0, sizeof(low_str));
860
861     strncpy(high_str, str, len - 16);
862     strncpy(low_str, str + len - 16, 16);
863
864     lcore_mask.low = strtoull_safe(low_str, &err);
865     if (err)
866       return lcore_mask;
867
868     lcore_mask.high = strtoull_safe(high_str, &err);
869     if (err) {
870       lcore_mask.low = 0;
871       return lcore_mask;
872     }
873   }
874   return lcore_mask;
875 }
876
877 uint8_t dpdk_helper_eth_dev_count() {
878   uint8_t ports = rte_eth_dev_count();
879   if (ports == 0) {
880     ERROR(
881         "%s:%d: No DPDK ports available. Check bound devices to DPDK driver.\n",
882         __FUNCTION__, __LINE__);
883     return ports;
884   }
885
886   if (ports > RTE_MAX_ETHPORTS) {
887     ERROR("%s:%d: Number of DPDK ports (%u) is greater than "
888           "RTE_MAX_ETHPORTS=%d. Ignoring extra ports\n",
889           __FUNCTION__, __LINE__, ports, RTE_MAX_ETHPORTS);
890     ports = RTE_MAX_ETHPORTS;
891   }
892
893   return ports;
894 }