write_prometheus plugin: New plugin for exposing metrics to Prometheus.
authorFlorian Forster <octo@collectd.org>
Thu, 29 Sep 2016 19:38:24 +0000 (21:38 +0200)
committerFlorian Forster <octo@collectd.org>
Fri, 11 Nov 2016 13:42:03 +0000 (14:42 +0100)
configure.ac
proto/Makefile.am
proto/prometheus.proto [new file with mode: 0644]
src/Makefile.am
src/collectd.conf.in
src/collectd.conf.pod
src/write_prometheus.c [new file with mode: 0644]

index 5d3fcae..55e80eb 100644 (file)
@@ -3045,6 +3045,70 @@ fi
 AM_CONDITIONAL(BUILD_WITH_LIBMEMCACHED, test "x$with_libmemcached" = "xyes")
 # }}}
 
+# --with-libmicrohttpd {{{
+with_libmicrohttpd_cppflags=""
+with_libmicrohttpd_ldflags=""
+AC_ARG_WITH([libmicrohttpd], [AS_HELP_STRING([--with-libmicrohttpd@<:@=PREFIX@:>@], [Path to libmicrohttpd.])],
+  [
+    if test "x$withval" != "xno" && test "x$withval" != "xyes"
+    then
+      with_libmicrohttpd_cppflags="-I$withval/include"
+      with_libmicrohttpd_ldflags="-L$withval/lib"
+      with_libmicrohttpd="yes"
+    fi
+    if test "x$withval" = "xno"
+    then
+      with_libmicrohttpd="no (disabled on command line)"
+    fi
+  ],
+  [withval="yes"]
+)
+if test "x$withval" = "xyes"
+then
+PKG_CHECK_MODULES([MICROHTTPD], [libmicrohttpd],
+  [with_libmicrohttpd="yes"],
+  [with_libmicrohttpd="no (pkg-config could not find libmicrohttpd)"]
+)
+fi
+
+if test "x$MICROHTTPD_LIBS" = "x"
+then
+  MICROHTTPD_LIBS="-lmicrohttpd"
+fi
+
+SAVE_CPPFLAGS="$CPPFLAGS"
+SAVE_LDFLAGS="$LDFLAGS"
+SAVE_LIBS="$LIBS"
+CPPFLAGS="$with_libmicrohttpd_cppflags $MICROHTTPD_CFLAGS"
+LDFLAGS="$with_libmicrohttpd_ldflags $LDFLAGS"
+LIBS="$LIBS $MICROHTTPD_LIBS"
+
+if test "x$with_libmicrohttpd" = "xyes"
+then
+  AC_CHECK_HEADERS([microhttpd.h],
+                   [with_libmicrohttpd="yes"],
+                   [with_libmicrohttpd="no (<microhttpd.h> not found)"])
+fi
+
+if test "x$with_libmicrohttpd" = "xyes"
+then
+  AC_CHECK_LIB([microhttpd], [MHD_start_daemon],
+               [with_libmicrohttpd="yes"],
+               [with_libmicrohttpd="no (libmicrohttpd not found)"])
+fi
+
+CPPFLAGS="$SAVE_CPPFLAGS"
+LDFLAGS="$SAVE_LDFLAGS"
+LIBS="$SAVE_LIBS"
+
+BUILD_WITH_LIBMICROHTTPD_CPPFLAGS="$with_libmicrohttpd_cppflags $MICROHTTPD_CFLAGS"
+BUILD_WITH_LIBMICROHTTPD_LDFLAGS="$with_libmicrohttpd_ldflags"
+BUILD_WITH_LIBMICROHTTPD_LIBS="$MICROHTTPD_LIBS"
+AC_SUBST([BUILD_WITH_LIBMICROHTTPD_CPPFLAGS])
+AC_SUBST([BUILD_WITH_LIBMICROHTTPD_LDFLAGS])
+AC_SUBST([BUILD_WITH_LIBMICROHTTPD_LIBS])
+# }}}
+
 # --with-libmodbus {{{
 with_libmodbus_config=""
 with_libmodbus_cflags=""
@@ -5878,6 +5942,7 @@ plugin_virt="no"
 plugin_vmem="no"
 plugin_vserver="no"
 plugin_wireless="no"
+plugin_write_prometheus="no"
 plugin_xencpu="no"
 plugin_zfs_arc="no"
 plugin_zone="no"
@@ -6165,6 +6230,10 @@ fi
 if test "x$have_protoc_c" = "xyes" && test "x$with_libprotobuf_c" = "xyes"
 then
        plugin_pinba="yes"
+       if test "x$with_libmicrohttpd" = "xyes"
+       then
+               plugin_write_prometheus="yes"
+       fi
 fi
 
 # Mac OS X memory interface
@@ -6404,6 +6473,7 @@ AC_PLUGIN([write_http],          [$with_libcurl],           [HTTP output plugin]
 AC_PLUGIN([write_kafka],         [$with_librdkafka],        [Kafka output plugin])
 AC_PLUGIN([write_log],           [yes],                     [Log output plugin])
 AC_PLUGIN([write_mongodb],       [$with_libmongoc],         [MongoDB output plugin])
+AC_PLUGIN([write_prometheus],    [$plugin_write_prometheus], [Prometheus write plugin])
 AC_PLUGIN([write_redis],         [$with_libhiredis],        [Redis output plugin])
 AC_PLUGIN([write_riemann],       [$with_libriemann_client], [Riemann output plugin])
 AC_PLUGIN([write_sensu],         [yes],                     [Sensu output plugin])
@@ -6652,6 +6722,7 @@ AC_MSG_RESULT([    libldap . . . . . . . $with_libldap])
 AC_MSG_RESULT([    liblua  . . . . . . . $with_liblua])
 AC_MSG_RESULT([    liblvm2app  . . . . . $with_liblvm2app])
 AC_MSG_RESULT([    libmemcached  . . . . $with_libmemcached])
+AC_MSG_RESULT([    libmicrohttpd . . . . $with_libmicrohttpd])
 AC_MSG_RESULT([    libmnl  . . . . . . . $with_libmnl])
 AC_MSG_RESULT([    libmodbus . . . . . . $with_libmodbus])
 AC_MSG_RESULT([    libmongoc . . . . . . $with_libmongoc])
@@ -6838,6 +6909,7 @@ AC_MSG_RESULT([    write_http  . . . . . $enable_write_http])
 AC_MSG_RESULT([    write_kafka . . . . . $enable_write_kafka])
 AC_MSG_RESULT([    write_log . . . . . . $enable_write_log])
 AC_MSG_RESULT([    write_mongodb . . . . $enable_write_mongodb])
+AC_MSG_RESULT([    write_prometheus. . . $enable_write_prometheus])
 AC_MSG_RESULT([    write_redis . . . . . $enable_write_redis])
 AC_MSG_RESULT([    write_riemann . . . . $enable_write_riemann])
 AC_MSG_RESULT([    write_sensu . . . . . $enable_write_sensu])
index 3c0bfd7..62d3bed 100644 (file)
@@ -1 +1 @@
-EXTRA_DIST = collectd.proto types.proto
+EXTRA_DIST = collectd.proto types.proto prometheus.proto
diff --git a/proto/prometheus.proto b/proto/prometheus.proto
new file mode 100644 (file)
index 0000000..0b84af9
--- /dev/null
@@ -0,0 +1,81 @@
+// Copyright 2013 Prometheus Team
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+syntax = "proto2";
+
+package io.prometheus.client;
+option java_package = "io.prometheus.client";
+
+message LabelPair {
+  optional string name  = 1;
+  optional string value = 2;
+}
+
+enum MetricType {
+  COUNTER    = 0;
+  GAUGE      = 1;
+  SUMMARY    = 2;
+  UNTYPED    = 3;
+  HISTOGRAM  = 4;
+}
+
+message Gauge {
+  optional double value = 1;
+}
+
+message Counter {
+  optional double value = 1;
+}
+
+message Quantile {
+  optional double quantile = 1;
+  optional double value    = 2;
+}
+
+message Summary {
+  optional uint64   sample_count = 1;
+  optional double   sample_sum   = 2;
+  repeated Quantile quantile     = 3;
+}
+
+message Untyped {
+  optional double value = 1;
+}
+
+message Histogram {
+  optional uint64 sample_count = 1;
+  optional double sample_sum   = 2;
+  repeated Bucket bucket       = 3; // Ordered in increasing order of upper_bound, +Inf bucket is optional.
+}
+
+message Bucket {
+  optional uint64 cumulative_count = 1; // Cumulative in increasing order.
+  optional double upper_bound = 2;      // Inclusive.
+}
+
+message Metric {
+  repeated LabelPair label        = 1;
+  optional Gauge     gauge        = 2;
+  optional Counter   counter      = 3;
+  optional Summary   summary      = 4;
+  optional Untyped   untyped      = 5;
+  optional Histogram histogram    = 7;
+  optional int64     timestamp_ms = 6;
+}
+
+message MetricFamily {
+  optional string     name   = 1;
+  optional string     help   = 2;
+  optional MetricType type   = 3;
+  repeated Metric     metric = 4;
+}
index 7254bd9..18dddf7 100644 (file)
@@ -1330,6 +1330,15 @@ write_mongodb_la_LDFLAGS = $(PLUGIN_LDFLAGS) $(BUILD_WITH_LIBMONGOC_LDFLAGS)
 write_mongodb_la_LIBADD = -lmongoc
 endif
 
+if BUILD_PLUGIN_WRITE_PROMETHEUS
+pkglib_LTLIBRARIES += write_prometheus.la
+write_prometheus_la_SOURCES = write_prometheus.c
+nodist_write_prometheus_la_SOURCES = prometheus.pb-c.c prometheus.pb-c.h
+write_prometheus_la_CPPFLAGS = $(AM_CPPFLAGS) $(BUILD_WITH_LIBPROTOBUF_C_CPPFLAGS) $(BUILD_WITH_LIBMICROHTTPD_CPPFLAGS)
+write_prometheus_la_LDFLAGS = $(PLUGIN_LDFLAGS) $(BUILD_WITH_LIBPROTOBUF_C_LDFLAGS) $(BUILD_WITH_LIBMICROHTTPD_LDFLAGS)
+write_prometheus_la_LIBADD = $(BUILD_WITH_LIBPROTOBUF_C_LIBS) $(BUILD_WITH_LIBMICROHTTPD_LIBS)
+endif
+
 if BUILD_PLUGIN_WRITE_REDIS
 pkglib_LTLIBRARIES += write_redis.la
 write_redis_la_SOURCES = write_redis.c
@@ -1472,6 +1481,15 @@ pinba.pb-c.c pinba.pb-c.h: pinba.proto
        $(AM_V_PROTOC_C)$(PROTOC_C) -I$(srcdir) --c_out . $(srcdir)/pinba.proto
 endif
 
+# Protocol buffer for the "write_prometheus" plugin.
+if BUILD_PLUGIN_WRITE_PROMETHEUS
+CLEANFILES += prometheus.pb-c.c prometheus.pb-c.h
+BUILT_SOURCES += prometheus.pb-c.c prometheus.pb-c.h
+
+prometheus.pb-c.c prometheus.pb-c.h: $(top_srcdir)/proto/prometheus.proto
+       $(AM_V_PROTOC_C)$(PROTOC_C) -I$(top_srcdir)/proto --c_out=$(builddir) $(top_srcdir)/proto/prometheus.proto
+endif
+
 install-exec-hook:
        $(mkinstalldirs) $(DESTDIR)$(sysconfdir)
        if test -e $(DESTDIR)$(sysconfdir)/collectd.conf; \
index bbe7b18..8ad24d6 100644 (file)
 #@BUILD_PLUGIN_WRITE_KAFKA_TRUE@LoadPlugin write_kafka
 #@BUILD_PLUGIN_WRITE_LOG_TRUE@LoadPlugin write_log
 #@BUILD_PLUGIN_WRITE_MONGODB_TRUE@LoadPlugin write_mongodb
+#@BUILD_PLUGIN_WRITE_PROMETHEUS_TRUE@LoadPlugin write_prometheus
 #@BUILD_PLUGIN_WRITE_REDIS_TRUE@LoadPlugin write_redis
 #@BUILD_PLUGIN_WRITE_RIEMANN_TRUE@LoadPlugin write_riemann
 #@BUILD_PLUGIN_WRITE_SENSU_TRUE@LoadPlugin write_sensu
 #      </Node>
 #</Plugin>
 
+#<Plugin write_prometheus>
+#      Port "9103"
+#</Plugin>
+
 #<Plugin write_redis>
 #      <Node "example">
 #              Host "localhost"
index c21dac9..dcf6c36 100644 (file)
@@ -8148,6 +8148,41 @@ want to use authentication all three fields must be set.
 
 =back
 
+=head2 Plugin C<write_prometheus>
+
+The I<write_prometheus plugin> implements a tiny webserver that can be scraped
+using I<Prometheus>.
+
+B<Options:>
+
+=over 4
+
+=item B<Port> I<Port>
+
+Port the embedded webserver should listen on. Defaults to B<9103>.
+
+=item B<StalenessDelta> I<Seconds>
+
+Time in seconds after which I<Prometheus> considers a metric "stale" if it
+hasn't seen any update for it. This value must match the setting in Prometheus.
+It defaults to B<300> seconds (5 minutes), same as Prometheus.
+
+B<Background:>
+
+I<Prometheus> has a global setting, C<StalenessDelta>, which controls after
+which time a metric without updates is considered "stale". This setting
+effectively puts an upper limit on the interval in which metrics are reported.
+
+When the I<write_prometheus plugin> encounters a metric with an interval
+exceeding this limit, it will inform you, the user, and provide the metric to
+I<Prometheus> B<without> a timestamp. That causes I<Prometheus> to consider the
+metric "fresh" each time it is scraped, with the time of the scrape being
+considered the time of the update. The result is that there appear more
+datapoints in I<Prometheus> than were actually created, but at least the metric
+doesn't disappear periodically.
+
+=back
+
 =head2 Plugin C<write_http>
 
 This output plugin submits values to an HTTP server using POST requests and
diff --git a/src/write_prometheus.c b/src/write_prometheus.c
new file mode 100644 (file)
index 0000000..c91f012
--- /dev/null
@@ -0,0 +1,789 @@
+/**
+ * collectd - src/write_prometheus.c
+ * Copyright (C) 2016       Florian octo Forster
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ *
+ * Authors:
+ *   Florian octo Forster <octo at collectd.org>
+ */
+
+#include "collectd.h"
+
+#include "common.h"
+#include "plugin.h"
+#include "utils_avltree.h"
+#include "utils_complain.h"
+#include "utils_time.h"
+
+#include "prometheus.pb-c.h"
+
+#include <microhttpd.h>
+
+#ifndef PROMETHEUS_DEFAULT_STALENESS_DELTA
+#define PROMETHEUS_DEFAULT_STALENESS_DELTA TIME_T_TO_CDTIME_T(300)
+#endif
+
+#define VARINT_UINT32_BYTES 5
+
+#define CONTENT_TYPE_PROTO                                                     \
+  "application/vnd.google.protobuf; proto=io.prometheus.client.MetricFamily; " \
+  "encoding=delimited"
+#define CONTENT_TYPE_TEXT "text/plain; version=0.0.4"
+
+static c_avl_tree_t *metrics;
+static pthread_mutex_t metrics_lock = PTHREAD_MUTEX_INITIALIZER;
+
+static unsigned short httpd_port = 9103;
+static struct MHD_Daemon *httpd;
+
+static cdtime_t staleness_delta = PROMETHEUS_DEFAULT_STALENESS_DELTA;
+
+/* Unfortunately, protoc-c doesn't export it's implementation of varint, so we
+ * need to implement our own. */
+static size_t varint(uint8_t buffer[static VARINT_UINT32_BYTES],
+                     uint32_t value) {
+  for (size_t i = 0; i < VARINT_UINT32_BYTES; i++) {
+    buffer[i] = (uint8_t)(value & 0x7f);
+    value >>= 7;
+
+    if (value == 0)
+      return i + 1;
+
+    buffer[i] |= 0x80;
+  }
+
+  return 0;
+}
+
+/* format_protobuf iterates over all metric families in "metrics" and adds them
+ * to a buffer in ProtoBuf format. It prefixes each protobuf with its encoded
+ * size, the so called "delimited" format. */
+static void format_protobuf(ProtobufCBuffer *buffer) {
+  pthread_mutex_lock(&metrics_lock);
+
+  char *unused_name;
+  Io__Prometheus__Client__MetricFamily *fam;
+  c_avl_iterator_t *iter = c_avl_get_iterator(metrics);
+  while (c_avl_iterator_next(iter, (void *)&unused_name, (void *)&fam) == 0) {
+    /* Prometheus uses a message length prefix to determine where one
+     * MetricFamily ends and the next begins. This delimiter is encoded as a
+     * "varint", which is common in Protobufs. */
+    uint8_t delim[VARINT_UINT32_BYTES] = {0};
+    size_t delim_len = varint(
+        delim,
+        (uint32_t)io__prometheus__client__metric_family__get_packed_size(fam));
+    buffer->append(buffer, delim_len, delim);
+
+    io__prometheus__client__metric_family__pack_to_buffer(fam, buffer);
+  }
+  c_avl_iterator_destroy(iter);
+
+  pthread_mutex_unlock(&metrics_lock);
+}
+
+/* format_labels formats a metric's labels in Prometheus-compatible format. This
+ * format looks like this:
+ *
+ *   key0="value0",key1="value1"
+ */
+static char *format_labels(char *buffer, size_t buffer_size,
+                           Io__Prometheus__Client__Metric const *m) {
+  /* our metrics always have at least one and at most three labels. */
+  assert(m->n_label >= 1);
+  assert(m->n_label <= 3);
+
+#define LABEL_BUFFER_SIZE (2 * DATA_MAX_NAME_LEN + 4)
+
+  char *labels[3] = {
+      (char[LABEL_BUFFER_SIZE]){0}, (char[LABEL_BUFFER_SIZE]){0},
+      (char[LABEL_BUFFER_SIZE]){0},
+  };
+
+  for (size_t i = 0; i < m->n_label; i++)
+    ssnprintf(labels[i], LABEL_BUFFER_SIZE, "%s=\"%s\"", m->label[i]->name,
+              m->label[i]->value);
+
+  strjoin(buffer, buffer_size, labels, m->n_label, ",");
+  return buffer;
+}
+
+/* format_protobuf iterates over all metric families in "metrics" and adds them
+ * to a buffer in plain text format. */
+static void format_text(ProtobufCBuffer *buffer) {
+  pthread_mutex_lock(&metrics_lock);
+
+  char *unused_name;
+  Io__Prometheus__Client__MetricFamily *fam;
+  c_avl_iterator_t *iter = c_avl_get_iterator(metrics);
+  while (c_avl_iterator_next(iter, (void *)&unused_name, (void *)&fam) == 0) {
+    char line[1024]; /* 4x DATA_MAX_NAME_LEN? */
+
+    ssnprintf(line, sizeof(line), "# HELP %s %s\n", fam->name, fam->help);
+    buffer->append(buffer, strlen(line), (uint8_t *)line);
+
+    ssnprintf(line, sizeof(line), "# TYPE %s %s\n", fam->name,
+              (fam->type == IO__PROMETHEUS__CLIENT__METRIC_TYPE__GAUGE)
+                  ? "gauge"
+                  : "counter");
+    buffer->append(buffer, strlen(line), (uint8_t *)line);
+
+    for (size_t i = 0; i < fam->n_metric; i++) {
+      Io__Prometheus__Client__Metric *m = fam->metric[i];
+
+      char labels[1024];
+
+      char timestamp_ms[24] = "";
+      if (m->has_timestamp_ms)
+        ssnprintf(timestamp_ms, sizeof(timestamp_ms), " %" PRIi64,
+                  m->timestamp_ms);
+
+      if (fam->type == IO__PROMETHEUS__CLIENT__METRIC_TYPE__GAUGE)
+        ssnprintf(line, sizeof(line), "%s{%s} " GAUGE_FORMAT "%s\n", fam->name,
+                  format_labels(labels, sizeof(labels), m), m->gauge->value,
+                  timestamp_ms);
+      else /* if (fam->type == IO__PROMETHEUS__CLIENT__METRIC_TYPE__COUNTER) */
+        ssnprintf(line, sizeof(line), "%s{%s} %.0f%s\n", fam->name,
+                  format_labels(labels, sizeof(labels), m), m->counter->value,
+                  timestamp_ms);
+
+      buffer->append(buffer, strlen(line), (uint8_t *)line);
+    }
+  }
+  c_avl_iterator_destroy(iter);
+
+  char server[1024];
+  ssnprintf(server, sizeof(server), "\n# collectd/write_prometheus %s at %s\n",
+            PACKAGE_VERSION, hostname_g);
+  buffer->append(buffer, strlen(server), (uint8_t *)server);
+
+  pthread_mutex_unlock(&metrics_lock);
+}
+
+/* http_handler is the callback called by the microhttpd library. It essentially
+ * handles all HTTP request aspects and creates an HTTP response. */
+static int http_handler(void *cls, struct MHD_Connection *connection,
+                        const char *url, const char *method,
+                        const char *version, const char *upload_data,
+                        size_t *upload_data_size, void **connection_state) {
+  if (strcmp(method, MHD_HTTP_METHOD_GET) != 0) {
+    return MHD_NO;
+  }
+
+  /* On the first call for each connection, return without anything further.
+   * Apparently not everything has been initialized yet or so; the docs are not
+   * very specific on the issue. */
+  if (*connection_state == NULL) {
+    /* set to a random non-NULL pointer. */
+    *connection_state = &(int){42};
+    return MHD_YES;
+  }
+
+  char const *accept = MHD_lookup_connection_value(connection, MHD_HEADER_KIND,
+                                                   MHD_HTTP_HEADER_ACCEPT);
+  _Bool want_proto =
+      (accept != NULL) &&
+      (strstr(accept, "application/vnd.google.protobuf") != NULL);
+
+  uint8_t scratch[4096] = {0};
+  ProtobufCBufferSimple simple = PROTOBUF_C_BUFFER_SIMPLE_INIT(scratch);
+  ProtobufCBuffer *buffer = (ProtobufCBuffer *)&simple;
+
+  if (want_proto)
+    format_protobuf(buffer);
+  else
+    format_text(buffer);
+
+  struct MHD_Response *res = MHD_create_response_from_data(
+      simple.len, simple.data, /* must_free = */ 0, /* must_copy = */ 1);
+  MHD_add_response_header(res, MHD_HTTP_HEADER_CONTENT_TYPE,
+                          want_proto ? CONTENT_TYPE_PROTO : CONTENT_TYPE_TEXT);
+
+  int status = MHD_queue_response(connection, MHD_HTTP_OK, res);
+
+  MHD_destroy_response(res);
+  PROTOBUF_C_BUFFER_SIMPLE_CLEAR(&simple);
+  return status;
+}
+
+/*
+ * Functions for manipulating the global state in "metrics". This is organized
+ * in two tiers: the global "metrics" tree holds "metric families", which are
+ * identified by a name (a string). Each metric family has one or more
+ * "metrics", which are identified by a unique set of key-value-pairs. For
+ * example:
+ *
+ * collectd_cpu_total
+ *   {cpu="0",type="idle"}
+ *   {cpu="0",type="user"}
+ *   ...
+ * collectd_memory
+ *   {memory="used"}
+ *   {memory="free"}
+ *   ...
+ * {{{ */
+/* label_pair_destroy frees the memory used by a label pair. */
+static void label_pair_destroy(Io__Prometheus__Client__LabelPair *msg) {
+  if (msg == NULL)
+    return;
+
+  sfree(msg->name);
+  sfree(msg->value);
+
+  sfree(msg);
+}
+
+/* label_pair_create allocates and initializes a new label pair. */
+static Io__Prometheus__Client__LabelPair *label_pair_create(char const *name,
+                                                            char const *value) {
+  Io__Prometheus__Client__LabelPair *msg = calloc(1, sizeof(*msg));
+  if (msg == NULL)
+    return NULL;
+  io__prometheus__client__label_pair__init(msg);
+
+  msg->name = strdup(name);
+  msg->value = strdup(value);
+  if ((msg->name == NULL) || (msg->value == NULL)) {
+    label_pair_destroy(msg);
+    return NULL;
+  }
+
+  return msg;
+}
+
+/* metric_destroy frees the memory used by a metric. */
+static void metric_destroy(Io__Prometheus__Client__Metric *msg) {
+  if (msg == NULL)
+    return;
+
+  for (size_t i = 0; i < msg->n_label; i++) {
+    label_pair_destroy(msg->label[i]);
+  }
+  sfree(msg->label);
+
+  sfree(msg->gauge);
+  sfree(msg->counter);
+
+  sfree(msg);
+}
+
+/* metric_add_labels adds the labels that identify this metric to m.
+ * The logic is copied from the "collectd_exporter". Essentially, the labels
+ * contain the hostname, the plugin instance and the type instance of a
+ * value_list_t. */
+static int metric_add_labels(Io__Prometheus__Client__Metric *m,
+                             value_list_t const *vl) {
+  size_t n_label = 1;
+  if (strlen(vl->plugin_instance) != 0)
+    n_label++;
+  if (strlen(vl->type_instance) != 0)
+    n_label++;
+
+  m->label = calloc(n_label, sizeof(*m->label));
+  if (m->label == NULL)
+    return ENOMEM;
+
+  if (strlen(vl->plugin_instance) != 0) {
+    m->label[m->n_label] = label_pair_create(vl->plugin, vl->plugin_instance);
+    m->n_label++;
+  }
+
+  if (strlen(vl->type_instance) != 0) {
+    char const *name = "type";
+    if (strlen(vl->plugin_instance) == 0)
+      name = vl->plugin;
+
+    m->label[m->n_label] = label_pair_create(name, vl->type_instance);
+    m->n_label++;
+  }
+
+  m->label[m->n_label] = label_pair_create("instance", vl->host);
+  m->n_label++;
+
+  for (size_t i = 0; i < m->n_label; i++) {
+    if (m->label[i] == NULL)
+      return ENOMEM;
+  }
+
+  return 0;
+}
+
+/* metric_cmp compares two metrics. It's prototype makes it easy to use with
+ * qsort(3) and bsearch(3). */
+static int metric_cmp(void const *a, void const *b) {
+  Io__Prometheus__Client__Metric const *m_a =
+      *((Io__Prometheus__Client__Metric **)a);
+  Io__Prometheus__Client__Metric const *m_b =
+      *((Io__Prometheus__Client__Metric **)b);
+
+  if (m_a->n_label < m_b->n_label)
+    return -1;
+  else if (m_a->n_label > m_b->n_label)
+    return 1;
+
+  /* Prometheus does not care about the order of labels. All labels in this
+   * plugin are created by metric_add_labels(), though, and therefore always
+   * appear in the same order. We take advantage of this and simplify the check
+   * by making sure all labels are the same in each position. */
+  for (size_t i = 0; i < m_a->n_label; i++) {
+    int status = strcmp(m_a->label[i]->name, m_b->label[i]->name);
+    if (status != 0)
+      return status;
+
+    status = strcmp(m_a->label[i]->value, m_b->label[i]->value);
+    if (status != 0)
+      return status;
+  }
+
+  return 0;
+}
+
+/* metric_create allocates and initializes a new metric. */
+static Io__Prometheus__Client__Metric *metric_create(value_list_t const *vl) {
+  Io__Prometheus__Client__Metric *msg = calloc(1, sizeof(*msg));
+  if (msg == NULL)
+    return NULL;
+  io__prometheus__client__metric__init(msg);
+
+  if (metric_add_labels(msg, vl) != 0) {
+    metric_destroy(msg);
+    return NULL;
+  }
+
+  return msg;
+}
+
+/* metric_update stores the new value and timestamp in m. */
+static int metric_update(Io__Prometheus__Client__Metric *m, value_t value,
+                         int ds_type, cdtime_t t, cdtime_t interval) {
+  if (ds_type == DS_TYPE_GAUGE) {
+    sfree(m->counter);
+    if (m->gauge == NULL) {
+      m->gauge = calloc(1, sizeof(*m->gauge));
+      if (m->gauge == NULL)
+        return ENOMEM;
+      io__prometheus__client__gauge__init(m->gauge);
+    }
+
+    m->gauge->value = (double)value.gauge;
+    m->gauge->has_value = 1;
+  } else { /* not gauge */
+    sfree(m->gauge);
+    if (m->counter == NULL) {
+      m->counter = calloc(1, sizeof(*m->counter));
+      if (m->counter == NULL)
+        return ENOMEM;
+      io__prometheus__client__counter__init(m->counter);
+    }
+
+    switch (ds_type) {
+    case DS_TYPE_ABSOLUTE:
+      m->counter->value = (double)value.absolute;
+      break;
+    case DS_TYPE_COUNTER:
+      m->counter->value = (double)value.counter;
+      break;
+    default:
+      m->counter->value = (double)value.derive;
+      break;
+    }
+    m->counter->has_value = 1;
+  }
+
+  /* Prometheus has a globally configured timeout after which metrics are
+   * considered stale. This causes problems when metrics have an interval
+   * exceeding that limit. We emulate the behavior of "pushgateway" and *not*
+   * send a timestamp value â€“ Prometheus will fill in the current time. */
+  if (interval <= staleness_delta) {
+    m->timestamp_ms = CDTIME_T_TO_MS(t);
+    m->has_timestamp_ms = 1;
+  } else {
+    static c_complain_t long_metric = C_COMPLAIN_INIT_STATIC;
+    c_complain(
+        LOG_NOTICE, &long_metric,
+        "write_prometheus plugin: You have metrics with an interval exceeding "
+        "\"StalenessDelta\" setting (%.3fs). This is suboptimal, please check "
+        "the collectd.conf(5) manual page to understand what's going on.",
+        CDTIME_T_TO_DOUBLE(staleness_delta));
+
+    m->timestamp_ms = 0;
+    m->has_timestamp_ms = 0;
+  }
+
+  return 0;
+}
+
+/* metric_family_add_metric adds m to the metric list of fam. */
+static int metric_family_add_metric(Io__Prometheus__Client__MetricFamily *fam,
+                                    Io__Prometheus__Client__Metric *m) {
+  Io__Prometheus__Client__Metric **tmp =
+      realloc(fam->metric, (fam->n_metric + 1) * sizeof(*fam->metric));
+  if (tmp == NULL)
+    return ENOMEM;
+  fam->metric = tmp;
+
+  fam->metric[fam->n_metric] = m;
+  fam->n_metric++;
+
+  /* Sort the metrics so that lookup is fast. */
+  qsort(fam->metric, fam->n_metric, sizeof(*fam->metric), metric_cmp);
+
+  return 0;
+}
+
+/* metric_family_delete_metric looks up and deletes the metric corresponding to
+ * vl. */
+static int
+metric_family_delete_metric(Io__Prometheus__Client__MetricFamily *fam,
+                            value_list_t const *vl) {
+  Io__Prometheus__Client__Metric *key = metric_create(vl);
+  if (key == NULL)
+    return ENOMEM;
+
+  size_t i;
+  for (i = 0; i < fam->n_metric; i++) {
+    if (metric_cmp(&key, &fam->metric[i]) == 0)
+      break;
+  }
+
+  if (i >= fam->n_metric)
+    return ENOENT;
+
+  metric_destroy(fam->metric[i]);
+  if ((fam->n_metric - 1) > i)
+    memmove(&fam->metric[i], &fam->metric[i + 1],
+            ((fam->n_metric - 1) - i) * sizeof(fam->metric[i]));
+  fam->n_metric--;
+
+  Io__Prometheus__Client__Metric **tmp =
+      realloc(fam->metric, fam->n_metric * sizeof(*fam->metric));
+  if ((tmp != NULL) || (fam->n_metric == 0))
+    fam->metric = tmp;
+
+  return 0;
+}
+
+/* metric_family_get_metric looks up the matching metric in a metric family,
+ * allocating it if necessary. */
+static Io__Prometheus__Client__Metric *
+metric_family_get_metric(Io__Prometheus__Client__MetricFamily *fam,
+                         value_list_t const *vl) {
+  Io__Prometheus__Client__Metric *key = metric_create(vl);
+  if (key == NULL)
+    return NULL;
+
+  /* Metrics are sorted in metric_family_add_metric() so that we can do a binary
+   * search here. */
+  Io__Prometheus__Client__Metric **m = bsearch(
+      &key, fam->metric, fam->n_metric, sizeof(*fam->metric), metric_cmp);
+
+  if (m != NULL) {
+    metric_destroy(key);
+    return *m;
+  }
+
+  DEBUG("write_prometheus plugin: created new metric in family");
+  int status = metric_family_add_metric(fam, key);
+  if (status != 0) {
+    metric_destroy(key);
+    return NULL;
+  }
+
+  return key;
+}
+
+/* metric_family_update looks up the matching metric in a metric family,
+ * allocating it if necessary, and updates the metric to the latest value. */
+static int metric_family_update(Io__Prometheus__Client__MetricFamily *fam,
+                                data_set_t const *ds, value_list_t const *vl,
+                                size_t ds_index) {
+  Io__Prometheus__Client__Metric *m = metric_family_get_metric(fam, vl);
+  if (m == NULL)
+    return -1;
+
+  return metric_update(m, vl->values[ds_index], ds->ds[ds_index].type, vl->time,
+                       vl->interval);
+}
+
+/* metric_family_destroy frees the memory used by a metric family. */
+static void metric_family_destroy(Io__Prometheus__Client__MetricFamily *msg) {
+  if (msg == NULL)
+    return;
+
+  sfree(msg->name);
+  sfree(msg->help);
+
+  for (size_t i = 0; i < msg->n_metric; i++) {
+    metric_destroy(msg->metric[i]);
+  }
+  sfree(msg->metric);
+
+  sfree(msg);
+}
+
+/* metric_family_create allocates and initializes a new metric family. */
+static Io__Prometheus__Client__MetricFamily *
+metric_family_create(char *name, data_set_t const *ds, value_list_t const *vl,
+                     size_t ds_index) {
+  Io__Prometheus__Client__MetricFamily *msg = calloc(1, sizeof(*msg));
+  if (msg == NULL)
+    return NULL;
+  io__prometheus__client__metric_family__init(msg);
+
+  msg->name = name;
+
+  char help[1024];
+  ssnprintf(
+      help, sizeof(help),
+      "write_prometheus plugin: '%s' Type: '%s', Dstype: '%s', Dsname: '%s'",
+      vl->plugin, vl->type, DS_TYPE_TO_STRING(ds->ds[ds_index].type),
+      ds->ds[ds_index].name);
+  msg->help = strdup(help);
+
+  msg->type = (ds->ds[ds_index].type == DS_TYPE_GAUGE)
+                  ? IO__PROMETHEUS__CLIENT__METRIC_TYPE__GAUGE
+                  : IO__PROMETHEUS__CLIENT__METRIC_TYPE__COUNTER;
+  msg->has_type = 1;
+
+  return msg;
+}
+
+/* metric_family_name creates a metric family's name from a data source. This is
+ * done in the same way as done by the "collectd_exporter" for best possible
+ * compatibility. In essence, the plugin, type and data source name go in the
+ * metric family name, while hostname, plugin instance and type instance go into
+ * the labels of a metric. */
+static char *metric_family_name(data_set_t const *ds, value_list_t const *vl,
+                                size_t ds_index) {
+  char const *fields[5] = {"collectd"};
+  size_t fields_num = 1;
+
+  if (strcmp(vl->plugin, vl->type) != 0) {
+    fields[fields_num] = vl->plugin;
+    fields_num++;
+  }
+  fields[fields_num] = vl->type;
+  fields_num++;
+
+  if (strcmp("value", ds->ds[ds_index].name) != 0) {
+    fields[fields_num] = ds->ds[ds_index].name;
+    fields_num++;
+  }
+
+  /* Prometheus best practices:
+   * cumulative metrics should have a "total" suffix. */
+  if ((ds->ds[ds_index].type == DS_TYPE_COUNTER) ||
+      (ds->ds[ds_index].type == DS_TYPE_DERIVE)) {
+    fields[fields_num] = "total";
+    fields_num++;
+  }
+
+  char name[5 * DATA_MAX_NAME_LEN];
+  strjoin(name, sizeof(name), (char **)fields, fields_num, "_");
+  return strdup(name);
+}
+
+/* metric_family_get looks up the matching metric family, allocating it if
+ * necessary. */
+static Io__Prometheus__Client__MetricFamily *
+metric_family_get(data_set_t const *ds, value_list_t const *vl,
+                  size_t ds_index) {
+  char *name = metric_family_name(ds, vl, ds_index);
+  if (name == NULL) {
+    ERROR("write_prometheus plugin: Allocating metric family name failed.");
+    return NULL;
+  }
+
+  Io__Prometheus__Client__MetricFamily *fam = NULL;
+  if (c_avl_get(metrics, name, (void *)&fam) == 0) {
+    sfree(name);
+    assert(fam != NULL);
+    return fam;
+  }
+
+  fam = metric_family_create(name, ds, vl, ds_index);
+  if (fam == NULL) {
+    ERROR("write_prometheus plugin: Allocating metric family failed.");
+    sfree(name);
+    return NULL;
+  }
+
+  /* If successful, "name" is owned by "fam", i.e. don't free it here. */
+  DEBUG("write_prometheus plugin: metric family \"%s\" has been created.",
+        name);
+  name = NULL;
+
+  int status = c_avl_insert(metrics, fam->name, fam);
+  if (status != 0) {
+    ERROR("write_prometheus plugin: Adding \"%s\" failed.", name);
+    metric_family_destroy(fam);
+    return NULL;
+  }
+
+  return fam;
+}
+/* }}} */
+
+/*
+ * collectd callbacks
+ */
+static int prom_config(oconfig_item_t *ci) {
+  for (int i = 0; i < ci->children_num; i++) {
+    oconfig_item_t *child = ci->children + i;
+
+    if (strcasecmp("Port", child->key) == 0) {
+      int status = cf_util_get_port_number(child);
+      if (status > 0)
+        httpd_port = (unsigned short)status;
+    } else if (strcasecmp("StalenessDelta", child->key) == 0) {
+      cf_util_get_cdtime(child, &staleness_delta);
+    } else {
+      WARNING("write_prometheus plugin: Ignoring unknown configuration option "
+              "\"%s\".",
+              child->key);
+    }
+  }
+
+  return 0;
+}
+
+static int prom_init() {
+  if (metrics == NULL) {
+    metrics = c_avl_create((void *)strcmp);
+    if (metrics == NULL) {
+      ERROR("write_prometheus plugin: c_avl_create() failed.");
+      return -1;
+    }
+  }
+
+  if (httpd == NULL) {
+    unsigned int flags = MHD_USE_THREAD_PER_CONNECTION;
+#if MHD_VERSION >= 0x00093300
+    flags |= MHD_USE_DUAL_STACK;
+#endif
+
+    httpd = MHD_start_daemon(flags, httpd_port,
+                             /* MHD_AcceptPolicyCallback = */ NULL,
+                             /* MHD_AcceptPolicyCallback arg = */ NULL,
+                             http_handler, NULL, MHD_OPTION_END);
+    if (httpd == NULL) {
+      ERROR("write_prometheus plugin: MHD_start_daemon() failed.");
+      return -1;
+    }
+    DEBUG("write_prometheus plugin: Successfully started microhttpd %s",
+          MHD_get_version());
+  }
+
+  return 0;
+}
+
+static int prom_write(data_set_t const *ds, value_list_t const *vl,
+                      __attribute__((unused)) user_data_t *ud) {
+  pthread_mutex_lock(&metrics_lock);
+
+  for (size_t i = 0; i < ds->ds_num; i++) {
+    Io__Prometheus__Client__MetricFamily *fam = metric_family_get(ds, vl, i);
+    if (fam == NULL)
+      continue;
+
+    int status = metric_family_update(fam, ds, vl, i);
+    if (status != 0) {
+      ERROR("write_prometheus plugin: Updating metric \"%s\" failed with "
+            "status %d",
+            fam->name, status);
+      continue;
+    }
+  }
+
+  pthread_mutex_unlock(&metrics_lock);
+  return 0;
+}
+
+static int prom_missing(value_list_t const *vl,
+                        __attribute__((unused)) user_data_t *ud) {
+  data_set_t const *ds = plugin_get_ds(vl->type);
+  if (ds == NULL)
+    return ENOENT;
+
+  pthread_mutex_lock(&metrics_lock);
+
+  for (size_t i = 0; i < ds->ds_num; i++) {
+    Io__Prometheus__Client__MetricFamily *fam = metric_family_get(ds, vl, i);
+    if (fam == NULL)
+      continue;
+
+    int status = metric_family_delete_metric(fam, vl);
+    if (status != 0) {
+      ERROR("write_prometheus plugin: Deleting a metric in family \"%s\" "
+            "failed with status %d",
+            fam->name, status);
+      continue;
+    }
+
+    if (fam->n_metric == 0) {
+      int status = c_avl_remove(metrics, fam->name, NULL, NULL);
+      if (status != 0) {
+        ERROR("write_prometheus plugin: Deleting metric family \"%s\" failed "
+              "with status %d",
+              fam->name, status);
+        continue;
+      }
+      metric_family_destroy(fam);
+    }
+  }
+
+  pthread_mutex_unlock(&metrics_lock);
+  return 0;
+}
+
+static int prom_shutdown() {
+  if (httpd != NULL) {
+    MHD_stop_daemon(httpd);
+    httpd = NULL;
+  }
+
+  pthread_mutex_lock(&metrics_lock);
+  if (metrics != NULL) {
+    char *name;
+    Io__Prometheus__Client__MetricFamily *fam;
+    while (c_avl_pick(metrics, (void *)&name, (void *)&fam) == 0) {
+      assert(name == fam->name);
+      name = NULL;
+
+      metric_family_destroy(fam);
+    }
+    c_avl_destroy(metrics);
+    metrics = NULL;
+  }
+  pthread_mutex_unlock(&metrics_lock);
+
+  return 0;
+}
+
+void module_register() {
+  plugin_register_complex_config("write_prometheus", prom_config);
+  plugin_register_init("write_prometheus", prom_init);
+  plugin_register_write("write_prometheus", prom_write,
+                        /* user data = */ NULL);
+  plugin_register_missing("write_prometheus", prom_missing,
+                          /* user data = */ NULL);
+  plugin_register_shutdown("write_prometheus", prom_shutdown);
+}
+
+/* vim: set sw=2 sts=2 et fdm=marker : */