dpdkstat: refactored pipe error checking
[collectd.git] / src / dpdkstat.c
index e451990..e5aafde 100644 (file)
 #include "plugin.h" /* plugin_register_*, plugin_dispatch_values */
 #include "utils_time.h"
 
-#include <stdio.h>
-#include <string.h>
-#include <stdint.h>
-#include <signal.h>
-#include <stdarg.h>
-#include <stdlib.h>
-#include <errno.h>
 #include <getopt.h>
-#include <inttypes.h>
-#include <fcntl.h>
 #include <semaphore.h>
 #include <sys/mman.h>
 #include <sys/queue.h>
-#include <sys/stat.h>
-#include <sys/types.h>
-#include <sys/time.h>
-#include <sys/wait.h>
 #include <poll.h>
-#include <unistd.h>
-#include <string.h>
 
 #include <rte_config.h>
 #include <rte_eal.h>
@@ -71,8 +56,6 @@
 #include <rte_branch_prediction.h>
 #include <rte_string_fns.h>
 
-
-#define DATA_MAX_NAME_LEN        64
 #define DPDKSTAT_MAX_BUFFER_SIZE (4096*4)
 #define DPDK_SHM_NAME "dpdk_collectd_stats_shm"
 #define REINIT_SHM 1
@@ -122,22 +105,21 @@ struct dpdk_config_s {
 };
 typedef struct dpdk_config_s dpdk_config_t;
 
-static int g_configured = 0;
-static dpdk_config_t *g_configuration = 0;
+static int g_configured;
+static dpdk_config_t *g_configuration;
 
-static int dpdk_config_init_default(void);
+static void dpdk_config_init_default(void);
 static int dpdk_config(oconfig_item_t *ci);
 static int dpdk_helper_init_eal(void);
 static int dpdk_helper_run(void);
 static int dpdk_helper_spawn(enum DPDK_HELPER_ACTION action);
-static int dpdk_init (void);
+static int dpdk_init(void);
 static int dpdk_read(user_data_t *ud);
 static int dpdk_shm_cleanup(void);
 static int dpdk_shm_init(size_t size);
-void module_register(void);
 
 /* Write the default configuration to the g_configuration instances */
-static int dpdk_config_init_default(void)
+static void dpdk_config_init_default(void)
 {
     g_configuration->interval = plugin_get_interval();
     WARNING("dpdkstat: No time interval was configured, default value %lu ms is set\n",
@@ -150,22 +132,22 @@ static int dpdk_config_init_default(void)
     ssnprintf(g_configuration->process_type, DATA_MAX_NAME_LEN, "%s", "secondary");
     ssnprintf(g_configuration->file_prefix, DATA_MAX_NAME_LEN, "%s",
              "/var/run/.rte_config");
-  return 0;
 }
 
 static int dpdk_config(oconfig_item_t *ci)
 {
-  int i = 0, ret = 0;
+  int i = 0;
 
   /* Initialize a POSIX SHared Memory (SHM) object. */
-  dpdk_shm_init(sizeof(dpdk_config_t));
-
-  /* Set defaults for config, overwritten by loop if config item exists */
-  ret = dpdk_config_init_default();
-  if(ret != 0) {
+  int err = dpdk_shm_init(sizeof(dpdk_config_t));
+  if (err) {
+    DEBUG("dpdkstat: error in shm_init, %s", strerror(errno));
     return -1;
   }
 
+  /* Set defaults for config, overwritten by loop if config item exists */
+  dpdk_config_init_default();
+
   for (i = 0; i < ci->children_num; i++) {
     oconfig_item_t *child = ci->children + i;
 
@@ -236,8 +218,7 @@ static int dpdk_shm_init(size_t size)
     goto fail_close;
   }
   /* Map the shared memory object into this process' virtual address space. */
-  g_configuration = (dpdk_config_t *)
-                    mmap(0, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
+  g_configuration = mmap(0, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
   if (g_configuration == MAP_FAILED) {
     WARNING("dpdkstat:Failed to mmap SHM:%s\n", strerror(errno));
     goto fail_close;
@@ -252,8 +233,17 @@ static int dpdk_shm_init(size_t size)
   memset(g_configuration, 0, size);
 
   /* Initialize the semaphores for SHM use */
-  sem_init(&g_configuration->sema_helper_get_stats, 1, 0);
-  sem_init(&g_configuration->sema_stats_in_shm, 1, 0);
+  int err = sem_init(&g_configuration->sema_helper_get_stats, 1, 0);
+  if(err) {
+    ERROR("dpdkstat semaphore init failed: %s\n", strerror(errno));
+    goto fail_close;
+  }
+  err = sem_init(&g_configuration->sema_stats_in_shm, 1, 0);
+  if(err) {
+    ERROR("dpdkstat semaphore init failed: %s\n", strerror(errno));
+    goto fail_close;
+  }
+
   return 0;
 
 fail_close:
@@ -269,7 +259,7 @@ fail:
 static int dpdk_re_init_shm()
 {
   dpdk_config_t temp_config;
-  memcpy(&temp_config,g_configuration, sizeof(dpdk_config_t));
+  memcpy(&temp_config, g_configuration, sizeof(dpdk_config_t));
   DEBUG("dpdkstat: %s: ports %d, xstats %d\n", __func__, temp_config.num_ports,
         temp_config.num_xstats);
 
@@ -289,7 +279,7 @@ static int dpdk_re_init_shm()
   if(!g_configured)
     dpdk_config_init_default();
 
-  memcpy(g_configuration,&temp_config, sizeof(dpdk_config_t));
+  memcpy(g_configuration, &temp_config, sizeof(dpdk_config_t));
   g_configuration->collectd_reinit_shm = 0;
 
   return 0;
@@ -297,17 +287,13 @@ static int dpdk_re_init_shm()
 
 static int dpdk_init (void)
 {
-  int ret = 0;
   int err = dpdk_shm_init(sizeof(dpdk_config_t));
   if (err)
     ERROR("dpdkstat: %s : error %d in shm_init()", __func__, err);
 
   /* If the XML config() function has been run, dont re-initialize defaults */
   if(!g_configured) {
-    ret = dpdk_config_init_default();
-    if (ret != 0) {
-      return -1;
-    }
+    dpdk_config_init_default();
   }
 
   plugin_register_complex_read (NULL, "dpdkstat", dpdk_read,
@@ -318,17 +304,21 @@ static int dpdk_init (void)
 static int dpdk_helper_exit(int reset)
 {
   g_configuration->helper_status = DPDK_HELPER_GRACEFUL_QUIT;
-  if(reset) {
+  if (reset) {
     g_configuration->eal_initialized = 0;
     g_configuration->num_ports = 0;
     memset(&g_configuration->xstats, 0, g_configuration->num_xstats* sizeof(struct rte_eth_xstats));
     g_configuration->num_xstats = 0;
-    int i =0;
-    for(;i < RTE_MAX_ETHPORTS; i++)
+    int i = 0;
+    for (; i < RTE_MAX_ETHPORTS; i++)
       g_configuration->num_stats_in_port[i] = 0;
   }
   close(g_configuration->helper_pipes[1]);
-  kill(g_configuration->helper_pid, SIGKILL);
+  int err = kill(g_configuration->helper_pid, SIGKILL);
+  if (err) {
+    ERROR("dpdkstat: error sending kill to helper: %s\n", strerror(errno));
+  }
+
   return 0;
 }
 
@@ -340,7 +330,7 @@ static int dpdk_helper_spawn(enum DPDK_HELPER_ACTION action)
    * Create a pipe for helper stdout back to collectd. This is necessary for
    * logging EAL failures, as rte_eal_init() calls rte_panic().
    */
-  if(g_configuration->helper_pipes[1]) {
+  if (g_configuration->helper_pipes[1]) {
     DEBUG("dpdkstat: collectd closing helper pipe %d\n",
           g_configuration->helper_pipes[1]);
   } else {
@@ -352,10 +342,18 @@ static int dpdk_helper_spawn(enum DPDK_HELPER_ACTION action)
     return -1;
   }
 
-  int pipe0_flags = fcntl(g_configuration->helper_pipes[1], F_GETFL, 0);
-  int pipe1_flags = fcntl(g_configuration->helper_pipes[0], F_GETFL, 0);
-  fcntl(g_configuration->helper_pipes[1], F_SETFL, pipe1_flags | O_NONBLOCK);
-  fcntl(g_configuration->helper_pipes[0], F_SETFL, pipe0_flags | O_NONBLOCK);
+  int pipe0_flags = fcntl(g_configuration->helper_pipes[0], F_GETFL, 0);
+  int pipe1_flags = fcntl(g_configuration->helper_pipes[1], F_GETFL, 0);
+  if (pipe0_flags == -1 || pipe1_flags == -1) {
+    ERROR("dpdkstat: error setting up pipe flags: %s\n", strerror(errno));
+  }
+  int  pipe0_err = fcntl(g_configuration->helper_pipes[0], F_SETFL, pipe1_flags
+                         | O_NONBLOCK);
+  int  pipe1_err = fcntl(g_configuration->helper_pipes[1], F_SETFL, pipe0_flags
+                         | O_NONBLOCK);
+  if (pipe0_err == -1 || pipe1_err == -1) {
+    ERROR("dpdkstat: error setting up pipes: %s\n", strerror(errno));
+  }
 
   pid_t pid = fork();
   if (pid > 0) {
@@ -388,24 +386,24 @@ static int dpdk_helper_init_eal(void)
   int i = 0;
 
   argp[i++] = "collectd-dpdk";
-  if(strcasecmp(g_configuration->coremask, "") != 0) {
+  if (strcasecmp(g_configuration->coremask, "") != 0) {
     argp[i++] = "-c";
     argp[i++] = g_configuration->coremask;
   }
-  if(strcasecmp(g_configuration->memory_channels, "") != 0) {
+  if (strcasecmp(g_configuration->memory_channels, "") != 0) {
     argp[i++] = "-n";
     argp[i++] = g_configuration->memory_channels;
   }
-  if(strcasecmp(g_configuration->socket_memory, "") != 0) {
+  if (strcasecmp(g_configuration->socket_memory, "") != 0) {
     argp[i++] = "--socket-mem";
     argp[i++] = g_configuration->socket_memory;
   }
-  if(strcasecmp(g_configuration->file_prefix, "") != 0 &&
+  if (strcasecmp(g_configuration->file_prefix, "") != 0 &&
      strcasecmp(g_configuration->file_prefix, "/var/run/.rte_config") != 0) {
     argp[i++] = "--file-prefix";
     argp[i++] = g_configuration->file_prefix;
   }
-  if(strcasecmp(g_configuration->process_type, "") != 0) {
+  if (strcasecmp(g_configuration->process_type, "") != 0) {
     argp[i++] = "--proc-type";
     argp[i++] = g_configuration->process_type;
   }
@@ -421,7 +419,7 @@ static int dpdk_helper_init_eal(void)
       printf("%s ", argp[i]);
     }
     printf("\n");
-    return -1;
+    return ret;
   }
   return 0;
 }
@@ -431,7 +429,7 @@ static int dpdk_helper_run (void)
   pid_t ppid = getppid();
   g_configuration->helper_status = DPDK_HELPER_WAITING_ON_PRIMARY;
 
-   while(1) {
+   while (1) {
     /* sem_timedwait() to avoid blocking forever */
     struct timespec ts;
     cdtime_t now = cdtime();
@@ -439,12 +437,11 @@ static int dpdk_helper_run (void)
     CDTIME_T_TO_TIMESPEC(now + half_sec + g_configuration->interval *2, &ts);
     int ret = sem_timedwait(&g_configuration->sema_helper_get_stats, &ts);
 
-    if(ret == -1 && errno == ETIMEDOUT) {
+    if (ret == -1 && errno == ETIMEDOUT) {
       ERROR("dpdkstat-helper: sem timedwait()"
              " timeout, did collectd terminate?\n");
       dpdk_helper_exit(RESET);
     }
-
     /* Parent PID change means collectd died so quit the helper process. */
     if (ppid != getppid()) {
       WARNING("dpdkstat-helper: parent PID changed, quitting.\n");
@@ -463,17 +460,18 @@ static int dpdk_helper_run (void)
       continue;
     }
 
-    if(!g_configuration->eal_initialized) {
+    if (!g_configuration->eal_initialized) {
       /* Initialize EAL. */
       int ret = dpdk_helper_init_eal();
-      if(ret != 0)
+      if(ret != 0) {
+        WARNING("ERROR INITIALIZING EAL\n");
         dpdk_helper_exit(RESET);
+      }
     }
 
     g_configuration->helper_status = DPDK_HELPER_ALIVE_SENDING_STATS;
 
-    uint8_t nb_ports;
-    nb_ports = rte_eth_dev_count();
+    uint8_t nb_ports = rte_eth_dev_count();
     if (nb_ports == 0) {
       DEBUG("dpdkstat-helper: No DPDK ports available. "
               "Check bound devices to DPDK driver.\n");
@@ -486,8 +484,8 @@ static int dpdk_helper_run (void)
     if (g_configuration->enabled_port_mask == 0)
       g_configuration->enabled_port_mask = 0xffff;
 
-    int i, len = 0, enabled_port_count = 0, num_xstats = 0;
-    for (i = 0; i < nb_ports; i++) {
+    int len = 0, enabled_port_count = 0, num_xstats = 0, i = 0;
+    for (; i < nb_ports; i++) {
       if (g_configuration->enabled_port_mask & (1 << i)) {
         if(g_configuration->helper_action == DPDK_HELPER_ACTION_COUNT_STATS) {
           len = rte_eth_xstats_get(i, NULL, 0);
@@ -503,7 +501,7 @@ static int dpdk_helper_run (void)
           len = g_configuration->num_stats_in_port[enabled_port_count];
           g_configuration->port_read_time[enabled_port_count] = cdtime();
           ret = rte_eth_xstats_get(i, &g_configuration->xstats + num_xstats,
-                                   g_configuration->num_stats_in_port[i]);
+                                   g_configuration->num_stats_in_port[enabled_port_count]);
           if (ret < 0 || ret != len) {
             DEBUG("dpdkstat-helper: Error reading xstats on port %d len = %d\n",
                   i, len);
@@ -515,7 +513,7 @@ static int dpdk_helper_run (void)
       } /* if (enabled_port_mask) */
     } /* for (nb_ports) */
 
-    if(g_configuration->helper_action == DPDK_HELPER_ACTION_COUNT_STATS) {
+    if (g_configuration->helper_action == DPDK_HELPER_ACTION_COUNT_STATS) {
       g_configuration->num_ports  = enabled_port_count;
       g_configuration->num_xstats = num_xstats;
       DEBUG("dpdkstat-helper ports: %d, num stats: %d\n",
@@ -525,7 +523,9 @@ static int dpdk_helper_run (void)
       dpdk_helper_exit(NO_RESET);
     }
     /* Now kick collectd send thread to send the stats */
-    sem_post(&g_configuration->sema_stats_in_shm);
+    int err = sem_post(&g_configuration->sema_stats_in_shm);
+    if (err)
+      ERROR("dpdkstat: error posting semaphore to helper %s\n", strerror(errno));
   } /* while(1) */
 
   return 0;
@@ -539,7 +539,7 @@ static int dpdk_read (user_data_t *ud)
    * Check if SHM flag is set to be re-initialized. AKA DPDK ports have been
    * counted, so re-init SHM to be large enough to fit all the statistics.
    */
-  if(g_configuration->collectd_reinit_shm) {
+  if (g_configuration->collectd_reinit_shm) {
     DEBUG("dpdkstat: read() now reinit SHM then launching send-thread\n");
     dpdk_re_init_shm();
   }
@@ -549,13 +549,17 @@ static int dpdk_read (user_data_t *ud)
    * must be done in dpdk_read(), because the DPDK primary process may not be
    * alive at dpdk_init() time.
    */
-  if(g_configuration->helper_status == DPDK_HELPER_NOT_INITIALIZED ||
+  if (g_configuration->helper_status == DPDK_HELPER_NOT_INITIALIZED ||
      g_configuration->helper_status == DPDK_HELPER_GRACEFUL_QUIT) {
       int action = DPDK_HELPER_ACTION_SEND_STATS;
       if(g_configuration->num_xstats == 0)
         action = DPDK_HELPER_ACTION_COUNT_STATS;
       /* Spawn the helper thread to count stats or to read stats. */
-      dpdk_helper_spawn(action);
+      int err = dpdk_helper_spawn(action);
+      if (err) {
+        ERROR("dpdkstat: error spawning helper %s\n", strerror(errno));
+        return -1;
+      }
     }
 
   int exit_status;
@@ -565,7 +569,7 @@ static int dpdk_read (user_data_t *ud)
    *  waitpid() fails, helper process died (or quit), so respawn
    */
   int respawn_helper = 0;
-  if(ws != 0) {
+  if (ws != 0) {
     respawn_helper = 1;
   }
 
@@ -577,15 +581,15 @@ static int dpdk_read (user_data_t *ud)
   fds.fd = g_configuration->helper_pipes[0];
   fds.events = POLLIN;
   int data_avail = poll(&fds, 1, 0);
-  while(data_avail) {
+  while (data_avail) {
     int nbytes = read(g_configuration->helper_pipes[0], buf, sizeof(buf));
-    if(nbytes <= 0)
+    if (nbytes <= 0)
       break;
     ssnprintf( out, nbytes, "%s", buf);
     DEBUG("dpdkstat: helper-proc: %s\n", out);
   }
 
-  if(respawn_helper) {
+  if (respawn_helper) {
     if (g_configuration->helper_pid)
       dpdk_helper_exit(RESET);
     dpdk_helper_spawn(DPDK_HELPER_ACTION_COUNT_STATS);
@@ -600,41 +604,42 @@ static int dpdk_read (user_data_t *ud)
   cdtime_t now = cdtime();
   CDTIME_T_TO_TIMESPEC(now + g_configuration->interval, &ts);
   ret = sem_timedwait(&g_configuration->sema_stats_in_shm, &ts);
-  if(ret == -1 && errno == ETIMEDOUT) {
+  if (ret == -1 && errno == ETIMEDOUT) {
     DEBUG("dpdkstat: timeout in collectd thread: is a DPDK Primary running? \n");
     return 0;
   }
 
   /* Dispatch the stats.*/
-    int i, j, count = 0;
-
-    for (i = 0; i < g_configuration->num_ports; i++) {
-      cdtime_t time = g_configuration->port_read_time[i];
-      char dev_name[64];
-      int len = g_configuration->num_stats_in_port[i];
-      ssnprintf(dev_name, sizeof(dev_name), "port.%d", i);
-      struct rte_eth_xstats *xstats = (&g_configuration->xstats);
-      xstats += count; /* pointer arithmetic to jump to each stats struct */
-      for (j = 0; j < len; j++) {
-        value_t dpdkstat_values[1];
-        value_list_t dpdkstat_vl = VALUE_LIST_INIT;
-
-        dpdkstat_values[0].counter = xstats[j].value;
-        dpdkstat_vl.values = dpdkstat_values;
-        dpdkstat_vl.values_len = 1; /* Submit stats one at a time */
-        dpdkstat_vl.time = time;
-        sstrncpy (dpdkstat_vl.host, hostname_g, sizeof (dpdkstat_vl.host));
-        sstrncpy (dpdkstat_vl.plugin, "dpdkstat", sizeof (dpdkstat_vl.plugin));
-        sstrncpy (dpdkstat_vl.plugin_instance, dev_name,
-                  sizeof (dpdkstat_vl.plugin_instance));
-        sstrncpy (dpdkstat_vl.type, "counter",
-                  sizeof (dpdkstat_vl.type));
-        sstrncpy (dpdkstat_vl.type_instance, xstats[j].name,
-                  sizeof (dpdkstat_vl.type_instance));
-        plugin_dispatch_values (&dpdkstat_vl);
-      }
-      count += len;
-    } /* for each port */
+  int count = 0, i = 0;
+
+  for (; i < g_configuration->num_ports; i++) {
+    cdtime_t time = g_configuration->port_read_time[i];
+    char dev_name[64];
+    int len = g_configuration->num_stats_in_port[i];
+    ssnprintf(dev_name, sizeof(dev_name), "port.%d", i);
+    struct rte_eth_xstats *xstats = (&g_configuration->xstats);
+    xstats += count; /* pointer arithmetic to jump to each stats struct */
+    int j = 0;
+    for (; j < len; j++) {
+      value_t dpdkstat_values[1];
+      value_list_t dpdkstat_vl = VALUE_LIST_INIT;
+
+      dpdkstat_values[0].counter = xstats[j].value;
+      dpdkstat_vl.values = dpdkstat_values;
+      dpdkstat_vl.values_len = 1; /* Submit stats one at a time */
+      dpdkstat_vl.time = time;
+      sstrncpy (dpdkstat_vl.host, hostname_g, sizeof (dpdkstat_vl.host));
+      sstrncpy (dpdkstat_vl.plugin, "dpdkstat", sizeof (dpdkstat_vl.plugin));
+      sstrncpy (dpdkstat_vl.plugin_instance, dev_name,
+                sizeof (dpdkstat_vl.plugin_instance));
+      sstrncpy (dpdkstat_vl.type, "counter",
+                sizeof (dpdkstat_vl.type));
+      sstrncpy (dpdkstat_vl.type_instance, xstats[j].name,
+                sizeof (dpdkstat_vl.type_instance));
+      plugin_dispatch_values (&dpdkstat_vl);
+    }
+    count += len;
+  } /* for each port */
   return 0;
 }
 
@@ -642,12 +647,12 @@ static int dpdk_shm_cleanup(void)
 {
   int ret = munmap(g_configuration, sizeof(dpdk_config_t));
   g_configuration = 0;
-  if(ret) {
+  if (ret) {
     WARNING("dpdkstat: munmap returned %d\n", ret);
     return ret;
   }
   ret = shm_unlink(DPDK_SHM_NAME);
-  if(ret) {
+  if (ret) {
     WARNING("dpdkstat: shm_unlink returned %d\n", ret);
     return ret;
   }
@@ -656,9 +661,18 @@ static int dpdk_shm_cleanup(void)
 
 static int dpdk_shutdown (void)
 {
+  int ret = 0;
   close(g_configuration->helper_pipes[1]);
-  kill(g_configuration->helper_pid, SIGKILL);
-  int ret = dpdk_shm_cleanup();
+  int err = kill(g_configuration->helper_pid, SIGKILL);
+  if (err) {
+    ERROR("dpdkstat: error sending sigkill to helper %s\n", strerror(errno));
+    ret = -1;
+  }
+  err = dpdk_shm_cleanup();
+  if (err) {
+    ERROR("dpdkstat: error cleaning up SHM: %s\n", strerror(errno));
+    ret = -1;
+  }
 
   return ret;
 }