Merge pull request #2776 from mfournier/unblock_pr_2737
authorMarc Fournier <marc.fournier@camptocamp.com>
Tue, 15 May 2018 13:37:45 +0000 (14:37 +0100)
committerGitHub <noreply@github.com>
Tue, 15 May 2018 13:37:45 +0000 (14:37 +0100)
Unblock #2737

1  2 
docs/BUILD.dpdkstat.md
src/perl.c
src/write_kafka.c
src/write_prometheus.c

diff --combined docs/BUILD.dpdkstat.md
@@@ -1,7 -1,17 +1,17 @@@
  # The dpdkstat plugin
  
+ This plugin is optional and only has a specific use case: monitoring DPDK applications
+ that don't expose stats in any other way than the DPDK xstats API.
  **Data Plane Development Kit** (DPDK) is a set of drivers and libraries for fast
- packet processing.
+ packet processing. Please note that this plugin is a polling based plugin rather
+ than an events based plugin (using it will drive up core utilization on a system).
+ **PLEASE DO NOT USE THIS PLUGIN FOR OVS-DPDK**. dpdkstat is really for DPDK
+ applications that have no other way of exposing stats. For OVS or OVS-with-DPDK the
+ Open vSwitch plugins available in collectd 5.8.0 should be used for
+ collecting stats and events. In addition the OVS plugin is events based rather
+ than polling based and will have a smaller footprint on the system.
  
  ## Summary
  
@@@ -148,6 -158,34 +158,6 @@@ instruction set manually
  
       *  Run `ldconfig` to update the shared library cache.
  
 -### Static library
 -
 -To build static DPDK library for use with collectd:
 -
 - *  To configure DPDK to build the combined static library `libdpdk.a` ensure
 -    that `CONFIG_RTE_BUILD_SHARED_LIB` is set to ā€œnā€ in `config/common_base` in
 -    your DPDK as follows:
 -
 -        #
 -        # Compile to share library
 -        #
 -        CONFIG_RTE_BUILD_SHARED_LIB=n
 -
 - *  Prepare the configuration for the appropriate target as specified at:
 -    http://dpdk.org/doc/guides/linux_gsg/build_dpdk.html.
 -
 -    For example:
 -
 -        make config T=x86_64-native-linuxapp-gcc
 -
 - *  Build the target using `-fPIC`:
 -
 -        make EXTRA_CFLAGS=-fPIC -j
 -
 - *  Install DPDK to `/usr`:
 -
 -        sudo make install prefix=/usr
 -
  ## Build collectd with DPDK
  
  **Note:** DPDK 16.04 is the minimum version and currently supported version of
@@@ -171,32 -209,33 +181,32 @@@ implications
  See also: http://dpdk.org/doc/guides/prog_guide/multi_proc_support.html
  
   *  Generate the build script as specified below. (i.e. run `build.sh`).
 - *  Configure collectd with the DPDK shared library:
 -
 -        ./configure --with-libdpdk=/usr
 + *  Configure collectd with the DPDK shared library. If DPDK is installed in
 +    custom installation path you can specify headers include path using
 +    LIBDPDK_CPPFLAGS variable and libraries path with LIBDPDK_LDFLAGS.
 +    Example:
  
 -### Build with the static DPDK library
 +        ./configure
  
 -To configure collectd with the DPDK static library:
 +        or for custom DPKD installation:
  
 - *  Run *configure* with the following CFLAGS:
 +        ./configure LIBDPDK_CPPFLAGS="-I/home/joe/include/dpdk" LIBDPDK_LDFLAGS="-L/home/joe/usr/lib"
  
 -        ./configure --with-libdpdk=/usr CFLAGS=" -lpthread -Wl,--whole-archive -Wl,-ldpdk -Wl,-lm -Wl,-lrt -Wl,-lpcap -Wl,-ldl -Wl,--no-whole-archive"
 -
 - *  Make sure that dpdk and dpdkstat are enabled in the *configure* output.
 + *  Make sure that libdpdk and dpdkstat are enabled in the *configure* output.
  
      Expected output:
  
          Libraries:
          ...
          libdpdk  . . . . . . . . yes
 -        
 +
          Modules:
          ...
          dpdkstat . . . . . . .yes
  
   *  Build collectd:
  
 -        make -j && make -j install.
 +    make -j && make -j install
  
      **Note:** As mentioned above, if you are building on Ubuntu 14.04 with
      GCC <= 4.8.X, you need to use:
   *  The same PCI device configuration should be passed to the primary process as
      the secondary process uses the same port indexes as the primary.
   *  A blacklist / whitelist of NICs isn't supported yet.
 + *  Plugin initialization time depends on read interval. It requires 5 read
 +    cycles to set up internal buffers and states. During that time no statistics
 +    are submitted.
 + *  If number of DPDK ports is increased while plugin is running, internal
 +    buffers are resized. That requires 3 read cycles and no port statistics
 +    are submitted in that time.
  
  ## License
  
diff --combined src/perl.c
@@@ -38,7 -38,9 +38,7 @@@
  
  #undef DONT_POISON_SPRINTF_YET
  
 -#if HAVE_STDBOOL_H
  #include <stdbool.h>
 -#endif
  
  #include <EXTERN.h>
  #include <perl.h>
@@@ -261,6 -263,12 +261,6 @@@ struct 
                   {"Collectd::NOTIF_WARNING", NOTIF_WARNING},
                   {"Collectd::NOTIF_OKAY", NOTIF_OKAY},
                   {"", 0}};
 -
 -struct {
 -  char name[64];
 -  char *var;
 -} g_strings[] = {{"Collectd::hostname_g", hostname_g}, {"", NULL}};
 -
  /*
   * Helper functions for data type conversion.
   */
@@@ -418,6 -426,8 +418,6 @@@ static int hv2value_list(pTHX_ HV *hash
  
    if (NULL != (tmp = hv_fetch(hash, "host", 4, 0)))
      sstrncpy(vl->host, SvPV_nolen(*tmp), sizeof(vl->host));
 -  else
 -    sstrncpy(vl->host, hostname_g, sizeof(vl->host));
  
    if (NULL != (tmp = hv_fetch(hash, "plugin", 6, 0)))
      sstrncpy(vl->plugin, SvPV_nolen(*tmp), sizeof(vl->plugin));
@@@ -872,12 -882,12 +872,12 @@@ static int oconfig_item2hv(pTHX_ oconfi
  static char *get_module_name(char *buf, size_t buf_len, const char *module) {
    int status = 0;
    if (base_name[0] == '\0')
 -    status = ssnprintf(buf, buf_len, "%s", module);
 +    status = snprintf(buf, buf_len, "%s", module);
    else
 -    status = ssnprintf(buf, buf_len, "%s::%s", base_name, module);
 +    status = snprintf(buf, buf_len, "%s::%s", base_name, module);
    if ((status < 0) || ((unsigned int)status >= buf_len))
 -    return (NULL);
 -  return (buf);
 +    return NULL;
 +  return buf;
  } /* char *get_module_name */
  
  /*
@@@ -1616,19 -1626,18 +1616,19 @@@ static void _plugin_register_generic_us
        ret = plugin_register_flush("perl", perl_flush, /* user_data = */ NULL);
      }
  
 -    if (0 == ret)
 +    if (0 == ret) {
        ret = plugin_register_flush(pluginname, perl_flush, &userdata);
 +    } else {
 +      free(userdata.data);
 +    }
    } else {
      ret = -1;
    }
  
    if (0 == ret)
      XSRETURN_YES;
 -  else {
 -    free(userdata.data);
 +  else
      XSRETURN_EMPTY;
 -  }
  } /* static void _plugin_register_generic_userdata ( ... ) */
  
  /*
   */
  
  static XS(Collectd_plugin_register_read) {
-   return _plugin_register_generic_userdata(aTHX, PLUGIN_READ, "read");
+   _plugin_register_generic_userdata(aTHX, PLUGIN_READ, "read");
  }
  
  static XS(Collectd_plugin_register_write) {
-   return _plugin_register_generic_userdata(aTHX, PLUGIN_WRITE, "write");
+   _plugin_register_generic_userdata(aTHX, PLUGIN_WRITE, "write");
  }
  
  static XS(Collectd_plugin_register_log) {
-   return _plugin_register_generic_userdata(aTHX, PLUGIN_LOG, "log");
+   _plugin_register_generic_userdata(aTHX, PLUGIN_LOG, "log");
  }
  
  static XS(Collectd_plugin_register_notification) {
-   return _plugin_register_generic_userdata(aTHX, PLUGIN_NOTIF, "notification");
+   _plugin_register_generic_userdata(aTHX, PLUGIN_NOTIF, "notification");
  }
  
  static XS(Collectd_plugin_register_flush) {
-   return _plugin_register_generic_userdata(aTHX, PLUGIN_FLUSH, "flush");
+   _plugin_register_generic_userdata(aTHX, PLUGIN_FLUSH, "flush");
  }
  
  typedef int perl_unregister_function_t(const char *name);
@@@ -1685,8 -1694,6 +1685,6 @@@ static void _plugin_unregister_generic(
    unreg(SvPV_nolen(ST(0)));
  
    XSRETURN_EMPTY;
-   return;
  } /* static void _plugin_unregister_generic ( ... ) */
  
  /*
   */
  
  static XS(Collectd_plugin_unregister_read) {
-   return _plugin_unregister_generic(aTHX, plugin_unregister_read, "read");
+   _plugin_unregister_generic(aTHX, plugin_unregister_read, "read");
  }
  
  static XS(Collectd_plugin_unregister_write) {
-   return _plugin_unregister_generic(aTHX, plugin_unregister_write, "write");
+   _plugin_unregister_generic(aTHX, plugin_unregister_write, "write");
  }
  
  static XS(Collectd_plugin_unregister_log) {
-   return _plugin_unregister_generic(aTHX, plugin_unregister_log, "log");
+   _plugin_unregister_generic(aTHX, plugin_unregister_log, "log");
  }
  
  static XS(Collectd_plugin_unregister_notification) {
-   return _plugin_unregister_generic(aTHX, plugin_unregister_notification,
-                                     "notification");
+   _plugin_unregister_generic(aTHX, plugin_unregister_notification,
+                              "notification");
  }
  
  static XS(Collectd_plugin_unregister_flush) {
-   return _plugin_unregister_generic(aTHX, plugin_unregister_flush, "flush");
+   _plugin_unregister_generic(aTHX, plugin_unregister_flush, "flush");
  }
  
  /*
@@@ -2091,7 -2098,7 +2089,7 @@@ static int perl_init(void) 
    /* Lock the base thread to avoid race conditions with c_ithread_create().
     * See https://github.com/collectd/collectd/issues/9 and
     *     https://github.com/collectd/collectd/issues/1706 for details.
 -  */
 +   */
    assert(aTHX == perl_threads->head->interp);
    pthread_mutex_lock(&perl_threads->mutex);
  
@@@ -2182,7 -2189,7 +2180,7 @@@ static void perl_log(int level, const c
    /* Lock the base thread if this is not called from one of the read threads
     * to avoid race conditions with c_ithread_create(). See
     * https://github.com/collectd/collectd/issues/9 for details.
 -  */
 +   */
  
    if (aTHX == perl_threads->head->interp)
      pthread_mutex_lock(&perl_threads->mutex);
@@@ -2393,11 -2400,6 +2391,11 @@@ static void xs_init(pTHX) 
     * accessing any such variable (this is basically the same as using
     * tie() in Perl) */
    /* global strings */
 +  struct {
 +    char name[64];
 +    char *var;
 +  } g_strings[] = {{"Collectd::hostname_g", hostname_g}, {"", NULL}};
 +
    for (int i = 0; '\0' != g_strings[i].name[0]; ++i) {
      tmp = get_sv(g_strings[i].name, 1);
      sv_magicext(tmp, NULL, PERL_MAGIC_ext, &g_pv_vtbl, g_strings[i].var, 0);
@@@ -2505,7 -2507,7 +2503,7 @@@ static int perl_config_loadplugin(pTHX
  
    if (NULL == get_module_name(module_name, sizeof(module_name), value)) {
      log_err("Invalid module name %s", value);
 -    return (1);
 +    return 1;
    }
  
    if (0 != init_pi(perl_argc, perl_argv))
@@@ -2731,3 -2733,5 +2729,3 @@@ void module_register(void) 
    plugin_register_complex_config("perl", perl_config);
    return;
  } /* void module_register (void) */
 -
 -/* vim: set sw=4 ts=4 tw=78 noexpandtab : */
diff --combined src/write_kafka.c
@@@ -77,6 -77,14 +77,14 @@@ static void kafka_log(const rd_kafka_t 
  }
  #endif
  
+ static rd_kafka_resp_err_t kafka_error() {
+ #if RD_KAFKA_VERSION >= 0x000b00ff
+   return rd_kafka_last_error();
+ #else
+   return rd_kafka_errno2err(errno);
+ #endif
+ }
  static uint32_t kafka_hash(const char *keydata, size_t keylen) {
    uint32_t hash = 5381;
    for (; keylen > 0; keylen--)
@@@ -89,7 -97,7 +97,7 @@@
  #define KAFKA_RANDOM_KEY_BUFFER                                                \
    (char[KAFKA_RANDOM_KEY_SIZE]) { "" }
  static char *kafka_random_key(char buffer[static KAFKA_RANDOM_KEY_SIZE]) {
 -  ssnprintf(buffer, KAFKA_RANDOM_KEY_SIZE, "%08" PRIX32, cdrand_u());
 +  snprintf(buffer, KAFKA_RANDOM_KEY_SIZE, "%08" PRIX32, cdrand_u());
    return buffer;
  }
  
@@@ -113,12 -121,12 +121,12 @@@ static int kafka_handle(struct kafka_to
    rd_kafka_topic_conf_t *topic_conf;
  
    if (ctx->kafka != NULL && ctx->topic != NULL)
 -    return (0);
 +    return 0;
  
    if (ctx->kafka == NULL) {
      if ((conf = rd_kafka_conf_dup(ctx->kafka_conf)) == NULL) {
        ERROR("write_kafka plugin: cannot duplicate kafka config");
 -      return (1);
 +      return 1;
      }
  
      if ((ctx->kafka = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errbuf,
      if ((ctx->topic = rd_kafka_topic_new(ctx->kafka, ctx->topic_name,
                                           topic_conf)) == NULL) {
        ERROR("write_kafka plugin: cannot create topic : %s\n",
-             rd_kafka_err2str(rd_kafka_errno2err(errno)));
+             rd_kafka_err2str(kafka_error()));
        return errno;
      }
  
           rd_kafka_topic_name(ctx->topic));
    }
  
 -  return (0);
 +  return 0;
  
  } /* }}} int kafka_handle */
  
@@@ -398,8 -406,8 +406,8 @@@ static void kafka_config_topic(rd_kafka
    rd_kafka_topic_conf_set_partitioner_cb(tctx->conf, kafka_partition);
    rd_kafka_topic_conf_set_opaque(tctx->conf, tctx);
  
 -  ssnprintf(callback_name, sizeof(callback_name), "write_kafka/%s",
 -            tctx->topic_name);
 +  snprintf(callback_name, sizeof(callback_name), "write_kafka/%s",
 +           tctx->topic_name);
  
    status = plugin_register_write(
        callback_name, kafka_write,
@@@ -481,7 -489,7 +489,7 @@@ static int kafka_config(oconfig_item_t 
    }
    if (conf != NULL)
      rd_kafka_conf_destroy(conf);
 -  return (0);
 +  return 0;
  errout:
    if (conf != NULL)
      rd_kafka_conf_destroy(conf);
diff --combined src/write_prometheus.c
@@@ -163,8 -163,8 +163,8 @@@ static char *format_labels(char *buffer
     * know that they are sane. */
    for (size_t i = 0; i < m->n_label; i++) {
      char value[LABEL_VALUE_SIZE];
 -    ssnprintf(labels[i], LABEL_BUFFER_SIZE, "%s=\"%s\"", m->label[i]->name,
 -              escape_label_value(value, sizeof(value), m->label[i]->value));
 +    snprintf(labels[i], LABEL_BUFFER_SIZE, "%s=\"%s\"", m->label[i]->name,
 +             escape_label_value(value, sizeof(value), m->label[i]->value));
    }
  
    strjoin(buffer, buffer_size, labels, m->n_label, ",");
@@@ -182,13 -182,13 +182,13 @@@ static void format_text(ProtobufCBuffe
    while (c_avl_iterator_next(iter, (void *)&unused_name, (void *)&fam) == 0) {
      char line[1024]; /* 4x DATA_MAX_NAME_LEN? */
  
 -    ssnprintf(line, sizeof(line), "# HELP %s %s\n", fam->name, fam->help);
 +    snprintf(line, sizeof(line), "# HELP %s %s\n", fam->name, fam->help);
      buffer->append(buffer, strlen(line), (uint8_t *)line);
  
 -    ssnprintf(line, sizeof(line), "# TYPE %s %s\n", fam->name,
 -              (fam->type == IO__PROMETHEUS__CLIENT__METRIC_TYPE__GAUGE)
 -                  ? "gauge"
 -                  : "counter");
 +    snprintf(line, sizeof(line), "# TYPE %s %s\n", fam->name,
 +             (fam->type == IO__PROMETHEUS__CLIENT__METRIC_TYPE__GAUGE)
 +                 ? "gauge"
 +                 : "counter");
      buffer->append(buffer, strlen(line), (uint8_t *)line);
  
      for (size_t i = 0; i < fam->n_metric; i++) {
  
        char timestamp_ms[24] = "";
        if (m->has_timestamp_ms)
 -        ssnprintf(timestamp_ms, sizeof(timestamp_ms), " %" PRIi64,
 -                  m->timestamp_ms);
 +        snprintf(timestamp_ms, sizeof(timestamp_ms), " %" PRIi64,
 +                 m->timestamp_ms);
  
        if (fam->type == IO__PROMETHEUS__CLIENT__METRIC_TYPE__GAUGE)
 -        ssnprintf(line, sizeof(line), "%s{%s} " GAUGE_FORMAT "%s\n", fam->name,
 -                  format_labels(labels, sizeof(labels), m), m->gauge->value,
 -                  timestamp_ms);
 +        snprintf(line, sizeof(line), "%s{%s} " GAUGE_FORMAT "%s\n", fam->name,
 +                 format_labels(labels, sizeof(labels), m), m->gauge->value,
 +                 timestamp_ms);
        else /* if (fam->type == IO__PROMETHEUS__CLIENT__METRIC_TYPE__COUNTER) */
 -        ssnprintf(line, sizeof(line), "%s{%s} %.0f%s\n", fam->name,
 -                  format_labels(labels, sizeof(labels), m), m->counter->value,
 -                  timestamp_ms);
 +        snprintf(line, sizeof(line), "%s{%s} %.0f%s\n", fam->name,
 +                 format_labels(labels, sizeof(labels), m), m->counter->value,
 +                 timestamp_ms);
  
        buffer->append(buffer, strlen(line), (uint8_t *)line);
      }
    c_avl_iterator_destroy(iter);
  
    char server[1024];
 -  ssnprintf(server, sizeof(server), "\n# collectd/write_prometheus %s at %s\n",
 -            PACKAGE_VERSION, hostname_g);
 +  snprintf(server, sizeof(server), "\n# collectd/write_prometheus %s at %s\n",
 +           PACKAGE_VERSION, hostname_g);
    buffer->append(buffer, strlen(server), (uint8_t *)server);
  
    pthread_mutex_unlock(&metrics_lock);
@@@ -635,7 -635,7 +635,7 @@@ metric_family_create(char *name, data_s
    msg->name = name;
  
    char help[1024];
 -  ssnprintf(
 +  snprintf(
        help, sizeof(help),
        "write_prometheus plugin: '%s' Type: '%s', Dstype: '%s', Dsname: '%s'",
        vl->plugin, vl->type, DS_TYPE_TO_STRING(ds->ds[ds_index].type),
@@@ -764,6 -764,16 +764,16 @@@ static int prom_open_socket(int addrfam
      if (fd == -1)
        continue;
  
+     int tmp = 1;
+     if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &tmp, sizeof(tmp)) != 0) {
+       char errbuf[1024];
+       WARNING("write_prometheus: setsockopt(SO_REUSEADDR) failed: %s",
+               sstrerror(errno, errbuf, sizeof(errbuf)));
+       close(fd);
+       fd = -1;
+       continue;
+     }
      if (bind(fd, ai->ai_addr, ai->ai_addrlen) != 0) {
        close(fd);
        fd = -1;
@@@ -965,3 -975,5 +975,3 @@@ void module_register() 
                            /* user data = */ NULL);
    plugin_register_shutdown("write_prometheus", prom_shutdown);
  }
 -
 -/* vim: set sw=2 sts=2 et fdm=marker : */