From cc893903f8453dc96a797b319bdd4e294052de6f Mon Sep 17 00:00:00 2001 From: Florian Forster Date: Thu, 29 Sep 2016 21:38:24 +0200 Subject: [PATCH] write_prometheus plugin: New plugin for exposing metrics to Prometheus. --- configure.ac | 72 +++++ proto/Makefile.am | 2 +- proto/prometheus.proto | 81 +++++ src/Makefile.am | 18 ++ src/collectd.conf.in | 5 + src/collectd.conf.pod | 35 +++ src/write_prometheus.c | 789 +++++++++++++++++++++++++++++++++++++++++++++++++ 7 files changed, 1001 insertions(+), 1 deletion(-) create mode 100644 proto/prometheus.proto create mode 100644 src/write_prometheus.c diff --git a/configure.ac b/configure.ac index 5d3fcaec..55e80eb1 100644 --- a/configure.ac +++ b/configure.ac @@ -3045,6 +3045,70 @@ fi AM_CONDITIONAL(BUILD_WITH_LIBMEMCACHED, test "x$with_libmemcached" = "xyes") # }}} +# --with-libmicrohttpd {{{ +with_libmicrohttpd_cppflags="" +with_libmicrohttpd_ldflags="" +AC_ARG_WITH([libmicrohttpd], [AS_HELP_STRING([--with-libmicrohttpd@<:@=PREFIX@:>@], [Path to libmicrohttpd.])], + [ + if test "x$withval" != "xno" && test "x$withval" != "xyes" + then + with_libmicrohttpd_cppflags="-I$withval/include" + with_libmicrohttpd_ldflags="-L$withval/lib" + with_libmicrohttpd="yes" + fi + if test "x$withval" = "xno" + then + with_libmicrohttpd="no (disabled on command line)" + fi + ], + [withval="yes"] +) +if test "x$withval" = "xyes" +then +PKG_CHECK_MODULES([MICROHTTPD], [libmicrohttpd], + [with_libmicrohttpd="yes"], + [with_libmicrohttpd="no (pkg-config could not find libmicrohttpd)"] +) +fi + +if test "x$MICROHTTPD_LIBS" = "x" +then + MICROHTTPD_LIBS="-lmicrohttpd" +fi + +SAVE_CPPFLAGS="$CPPFLAGS" +SAVE_LDFLAGS="$LDFLAGS" +SAVE_LIBS="$LIBS" +CPPFLAGS="$with_libmicrohttpd_cppflags $MICROHTTPD_CFLAGS" +LDFLAGS="$with_libmicrohttpd_ldflags $LDFLAGS" +LIBS="$LIBS $MICROHTTPD_LIBS" + +if test "x$with_libmicrohttpd" = "xyes" +then + AC_CHECK_HEADERS([microhttpd.h], + [with_libmicrohttpd="yes"], + [with_libmicrohttpd="no ( not found)"]) +fi + +if test "x$with_libmicrohttpd" = "xyes" +then + AC_CHECK_LIB([microhttpd], [MHD_start_daemon], + [with_libmicrohttpd="yes"], + [with_libmicrohttpd="no (libmicrohttpd not found)"]) +fi + +CPPFLAGS="$SAVE_CPPFLAGS" +LDFLAGS="$SAVE_LDFLAGS" +LIBS="$SAVE_LIBS" + +BUILD_WITH_LIBMICROHTTPD_CPPFLAGS="$with_libmicrohttpd_cppflags $MICROHTTPD_CFLAGS" +BUILD_WITH_LIBMICROHTTPD_LDFLAGS="$with_libmicrohttpd_ldflags" +BUILD_WITH_LIBMICROHTTPD_LIBS="$MICROHTTPD_LIBS" +AC_SUBST([BUILD_WITH_LIBMICROHTTPD_CPPFLAGS]) +AC_SUBST([BUILD_WITH_LIBMICROHTTPD_LDFLAGS]) +AC_SUBST([BUILD_WITH_LIBMICROHTTPD_LIBS]) +# }}} + # --with-libmodbus {{{ with_libmodbus_config="" with_libmodbus_cflags="" @@ -5878,6 +5942,7 @@ plugin_virt="no" plugin_vmem="no" plugin_vserver="no" plugin_wireless="no" +plugin_write_prometheus="no" plugin_xencpu="no" plugin_zfs_arc="no" plugin_zone="no" @@ -6165,6 +6230,10 @@ fi if test "x$have_protoc_c" = "xyes" && test "x$with_libprotobuf_c" = "xyes" then plugin_pinba="yes" + if test "x$with_libmicrohttpd" = "xyes" + then + plugin_write_prometheus="yes" + fi fi # Mac OS X memory interface @@ -6404,6 +6473,7 @@ AC_PLUGIN([write_http], [$with_libcurl], [HTTP output plugin] AC_PLUGIN([write_kafka], [$with_librdkafka], [Kafka output plugin]) AC_PLUGIN([write_log], [yes], [Log output plugin]) AC_PLUGIN([write_mongodb], [$with_libmongoc], [MongoDB output plugin]) +AC_PLUGIN([write_prometheus], [$plugin_write_prometheus], [Prometheus write plugin]) AC_PLUGIN([write_redis], [$with_libhiredis], [Redis output plugin]) AC_PLUGIN([write_riemann], [$with_libriemann_client], [Riemann output plugin]) AC_PLUGIN([write_sensu], [yes], [Sensu output plugin]) @@ -6652,6 +6722,7 @@ AC_MSG_RESULT([ libldap . . . . . . . $with_libldap]) AC_MSG_RESULT([ liblua . . . . . . . $with_liblua]) AC_MSG_RESULT([ liblvm2app . . . . . $with_liblvm2app]) AC_MSG_RESULT([ libmemcached . . . . $with_libmemcached]) +AC_MSG_RESULT([ libmicrohttpd . . . . $with_libmicrohttpd]) AC_MSG_RESULT([ libmnl . . . . . . . $with_libmnl]) AC_MSG_RESULT([ libmodbus . . . . . . $with_libmodbus]) AC_MSG_RESULT([ libmongoc . . . . . . $with_libmongoc]) @@ -6838,6 +6909,7 @@ AC_MSG_RESULT([ write_http . . . . . $enable_write_http]) AC_MSG_RESULT([ write_kafka . . . . . $enable_write_kafka]) AC_MSG_RESULT([ write_log . . . . . . $enable_write_log]) AC_MSG_RESULT([ write_mongodb . . . . $enable_write_mongodb]) +AC_MSG_RESULT([ write_prometheus. . . $enable_write_prometheus]) AC_MSG_RESULT([ write_redis . . . . . $enable_write_redis]) AC_MSG_RESULT([ write_riemann . . . . $enable_write_riemann]) AC_MSG_RESULT([ write_sensu . . . . . $enable_write_sensu]) diff --git a/proto/Makefile.am b/proto/Makefile.am index 3c0bfd7f..62d3bed5 100644 --- a/proto/Makefile.am +++ b/proto/Makefile.am @@ -1 +1 @@ -EXTRA_DIST = collectd.proto types.proto +EXTRA_DIST = collectd.proto types.proto prometheus.proto diff --git a/proto/prometheus.proto b/proto/prometheus.proto new file mode 100644 index 00000000..0b84af92 --- /dev/null +++ b/proto/prometheus.proto @@ -0,0 +1,81 @@ +// Copyright 2013 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto2"; + +package io.prometheus.client; +option java_package = "io.prometheus.client"; + +message LabelPair { + optional string name = 1; + optional string value = 2; +} + +enum MetricType { + COUNTER = 0; + GAUGE = 1; + SUMMARY = 2; + UNTYPED = 3; + HISTOGRAM = 4; +} + +message Gauge { + optional double value = 1; +} + +message Counter { + optional double value = 1; +} + +message Quantile { + optional double quantile = 1; + optional double value = 2; +} + +message Summary { + optional uint64 sample_count = 1; + optional double sample_sum = 2; + repeated Quantile quantile = 3; +} + +message Untyped { + optional double value = 1; +} + +message Histogram { + optional uint64 sample_count = 1; + optional double sample_sum = 2; + repeated Bucket bucket = 3; // Ordered in increasing order of upper_bound, +Inf bucket is optional. +} + +message Bucket { + optional uint64 cumulative_count = 1; // Cumulative in increasing order. + optional double upper_bound = 2; // Inclusive. +} + +message Metric { + repeated LabelPair label = 1; + optional Gauge gauge = 2; + optional Counter counter = 3; + optional Summary summary = 4; + optional Untyped untyped = 5; + optional Histogram histogram = 7; + optional int64 timestamp_ms = 6; +} + +message MetricFamily { + optional string name = 1; + optional string help = 2; + optional MetricType type = 3; + repeated Metric metric = 4; +} diff --git a/src/Makefile.am b/src/Makefile.am index 7254bd9c..18dddf77 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -1330,6 +1330,15 @@ write_mongodb_la_LDFLAGS = $(PLUGIN_LDFLAGS) $(BUILD_WITH_LIBMONGOC_LDFLAGS) write_mongodb_la_LIBADD = -lmongoc endif +if BUILD_PLUGIN_WRITE_PROMETHEUS +pkglib_LTLIBRARIES += write_prometheus.la +write_prometheus_la_SOURCES = write_prometheus.c +nodist_write_prometheus_la_SOURCES = prometheus.pb-c.c prometheus.pb-c.h +write_prometheus_la_CPPFLAGS = $(AM_CPPFLAGS) $(BUILD_WITH_LIBPROTOBUF_C_CPPFLAGS) $(BUILD_WITH_LIBMICROHTTPD_CPPFLAGS) +write_prometheus_la_LDFLAGS = $(PLUGIN_LDFLAGS) $(BUILD_WITH_LIBPROTOBUF_C_LDFLAGS) $(BUILD_WITH_LIBMICROHTTPD_LDFLAGS) +write_prometheus_la_LIBADD = $(BUILD_WITH_LIBPROTOBUF_C_LIBS) $(BUILD_WITH_LIBMICROHTTPD_LIBS) +endif + if BUILD_PLUGIN_WRITE_REDIS pkglib_LTLIBRARIES += write_redis.la write_redis_la_SOURCES = write_redis.c @@ -1472,6 +1481,15 @@ pinba.pb-c.c pinba.pb-c.h: pinba.proto $(AM_V_PROTOC_C)$(PROTOC_C) -I$(srcdir) --c_out . $(srcdir)/pinba.proto endif +# Protocol buffer for the "write_prometheus" plugin. +if BUILD_PLUGIN_WRITE_PROMETHEUS +CLEANFILES += prometheus.pb-c.c prometheus.pb-c.h +BUILT_SOURCES += prometheus.pb-c.c prometheus.pb-c.h + +prometheus.pb-c.c prometheus.pb-c.h: $(top_srcdir)/proto/prometheus.proto + $(AM_V_PROTOC_C)$(PROTOC_C) -I$(top_srcdir)/proto --c_out=$(builddir) $(top_srcdir)/proto/prometheus.proto +endif + install-exec-hook: $(mkinstalldirs) $(DESTDIR)$(sysconfdir) if test -e $(DESTDIR)$(sysconfdir)/collectd.conf; \ diff --git a/src/collectd.conf.in b/src/collectd.conf.in index bbe7b186..8ad24d60 100644 --- a/src/collectd.conf.in +++ b/src/collectd.conf.in @@ -210,6 +210,7 @@ #@BUILD_PLUGIN_WRITE_KAFKA_TRUE@LoadPlugin write_kafka #@BUILD_PLUGIN_WRITE_LOG_TRUE@LoadPlugin write_log #@BUILD_PLUGIN_WRITE_MONGODB_TRUE@LoadPlugin write_mongodb +#@BUILD_PLUGIN_WRITE_PROMETHEUS_TRUE@LoadPlugin write_prometheus #@BUILD_PLUGIN_WRITE_REDIS_TRUE@LoadPlugin write_redis #@BUILD_PLUGIN_WRITE_RIEMANN_TRUE@LoadPlugin write_riemann #@BUILD_PLUGIN_WRITE_SENSU_TRUE@LoadPlugin write_sensu @@ -1485,6 +1486,10 @@ # # +# +# Port "9103" +# + # # # Host "localhost" diff --git a/src/collectd.conf.pod b/src/collectd.conf.pod index c21dac97..dcf6c36d 100644 --- a/src/collectd.conf.pod +++ b/src/collectd.conf.pod @@ -8148,6 +8148,41 @@ want to use authentication all three fields must be set. =back +=head2 Plugin C + +The I implements a tiny webserver that can be scraped +using I. + +B + +=over 4 + +=item B I + +Port the embedded webserver should listen on. Defaults to B<9103>. + +=item B I + +Time in seconds after which I considers a metric "stale" if it +hasn't seen any update for it. This value must match the setting in Prometheus. +It defaults to B<300> seconds (5 minutes), same as Prometheus. + +B + +I has a global setting, C, which controls after +which time a metric without updates is considered "stale". This setting +effectively puts an upper limit on the interval in which metrics are reported. + +When the I encounters a metric with an interval +exceeding this limit, it will inform you, the user, and provide the metric to +I B a timestamp. That causes I to consider the +metric "fresh" each time it is scraped, with the time of the scrape being +considered the time of the update. The result is that there appear more +datapoints in I than were actually created, but at least the metric +doesn't disappear periodically. + +=back + =head2 Plugin C This output plugin submits values to an HTTP server using POST requests and diff --git a/src/write_prometheus.c b/src/write_prometheus.c new file mode 100644 index 00000000..c91f012f --- /dev/null +++ b/src/write_prometheus.c @@ -0,0 +1,789 @@ +/** + * collectd - src/write_prometheus.c + * Copyright (C) 2016 Florian octo Forster + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * + * Authors: + * Florian octo Forster + */ + +#include "collectd.h" + +#include "common.h" +#include "plugin.h" +#include "utils_avltree.h" +#include "utils_complain.h" +#include "utils_time.h" + +#include "prometheus.pb-c.h" + +#include + +#ifndef PROMETHEUS_DEFAULT_STALENESS_DELTA +#define PROMETHEUS_DEFAULT_STALENESS_DELTA TIME_T_TO_CDTIME_T(300) +#endif + +#define VARINT_UINT32_BYTES 5 + +#define CONTENT_TYPE_PROTO \ + "application/vnd.google.protobuf; proto=io.prometheus.client.MetricFamily; " \ + "encoding=delimited" +#define CONTENT_TYPE_TEXT "text/plain; version=0.0.4" + +static c_avl_tree_t *metrics; +static pthread_mutex_t metrics_lock = PTHREAD_MUTEX_INITIALIZER; + +static unsigned short httpd_port = 9103; +static struct MHD_Daemon *httpd; + +static cdtime_t staleness_delta = PROMETHEUS_DEFAULT_STALENESS_DELTA; + +/* Unfortunately, protoc-c doesn't export it's implementation of varint, so we + * need to implement our own. */ +static size_t varint(uint8_t buffer[static VARINT_UINT32_BYTES], + uint32_t value) { + for (size_t i = 0; i < VARINT_UINT32_BYTES; i++) { + buffer[i] = (uint8_t)(value & 0x7f); + value >>= 7; + + if (value == 0) + return i + 1; + + buffer[i] |= 0x80; + } + + return 0; +} + +/* format_protobuf iterates over all metric families in "metrics" and adds them + * to a buffer in ProtoBuf format. It prefixes each protobuf with its encoded + * size, the so called "delimited" format. */ +static void format_protobuf(ProtobufCBuffer *buffer) { + pthread_mutex_lock(&metrics_lock); + + char *unused_name; + Io__Prometheus__Client__MetricFamily *fam; + c_avl_iterator_t *iter = c_avl_get_iterator(metrics); + while (c_avl_iterator_next(iter, (void *)&unused_name, (void *)&fam) == 0) { + /* Prometheus uses a message length prefix to determine where one + * MetricFamily ends and the next begins. This delimiter is encoded as a + * "varint", which is common in Protobufs. */ + uint8_t delim[VARINT_UINT32_BYTES] = {0}; + size_t delim_len = varint( + delim, + (uint32_t)io__prometheus__client__metric_family__get_packed_size(fam)); + buffer->append(buffer, delim_len, delim); + + io__prometheus__client__metric_family__pack_to_buffer(fam, buffer); + } + c_avl_iterator_destroy(iter); + + pthread_mutex_unlock(&metrics_lock); +} + +/* format_labels formats a metric's labels in Prometheus-compatible format. This + * format looks like this: + * + * key0="value0",key1="value1" + */ +static char *format_labels(char *buffer, size_t buffer_size, + Io__Prometheus__Client__Metric const *m) { + /* our metrics always have at least one and at most three labels. */ + assert(m->n_label >= 1); + assert(m->n_label <= 3); + +#define LABEL_BUFFER_SIZE (2 * DATA_MAX_NAME_LEN + 4) + + char *labels[3] = { + (char[LABEL_BUFFER_SIZE]){0}, (char[LABEL_BUFFER_SIZE]){0}, + (char[LABEL_BUFFER_SIZE]){0}, + }; + + for (size_t i = 0; i < m->n_label; i++) + ssnprintf(labels[i], LABEL_BUFFER_SIZE, "%s=\"%s\"", m->label[i]->name, + m->label[i]->value); + + strjoin(buffer, buffer_size, labels, m->n_label, ","); + return buffer; +} + +/* format_protobuf iterates over all metric families in "metrics" and adds them + * to a buffer in plain text format. */ +static void format_text(ProtobufCBuffer *buffer) { + pthread_mutex_lock(&metrics_lock); + + char *unused_name; + Io__Prometheus__Client__MetricFamily *fam; + c_avl_iterator_t *iter = c_avl_get_iterator(metrics); + while (c_avl_iterator_next(iter, (void *)&unused_name, (void *)&fam) == 0) { + char line[1024]; /* 4x DATA_MAX_NAME_LEN? */ + + ssnprintf(line, sizeof(line), "# HELP %s %s\n", fam->name, fam->help); + buffer->append(buffer, strlen(line), (uint8_t *)line); + + ssnprintf(line, sizeof(line), "# TYPE %s %s\n", fam->name, + (fam->type == IO__PROMETHEUS__CLIENT__METRIC_TYPE__GAUGE) + ? "gauge" + : "counter"); + buffer->append(buffer, strlen(line), (uint8_t *)line); + + for (size_t i = 0; i < fam->n_metric; i++) { + Io__Prometheus__Client__Metric *m = fam->metric[i]; + + char labels[1024]; + + char timestamp_ms[24] = ""; + if (m->has_timestamp_ms) + ssnprintf(timestamp_ms, sizeof(timestamp_ms), " %" PRIi64, + m->timestamp_ms); + + if (fam->type == IO__PROMETHEUS__CLIENT__METRIC_TYPE__GAUGE) + ssnprintf(line, sizeof(line), "%s{%s} " GAUGE_FORMAT "%s\n", fam->name, + format_labels(labels, sizeof(labels), m), m->gauge->value, + timestamp_ms); + else /* if (fam->type == IO__PROMETHEUS__CLIENT__METRIC_TYPE__COUNTER) */ + ssnprintf(line, sizeof(line), "%s{%s} %.0f%s\n", fam->name, + format_labels(labels, sizeof(labels), m), m->counter->value, + timestamp_ms); + + buffer->append(buffer, strlen(line), (uint8_t *)line); + } + } + c_avl_iterator_destroy(iter); + + char server[1024]; + ssnprintf(server, sizeof(server), "\n# collectd/write_prometheus %s at %s\n", + PACKAGE_VERSION, hostname_g); + buffer->append(buffer, strlen(server), (uint8_t *)server); + + pthread_mutex_unlock(&metrics_lock); +} + +/* http_handler is the callback called by the microhttpd library. It essentially + * handles all HTTP request aspects and creates an HTTP response. */ +static int http_handler(void *cls, struct MHD_Connection *connection, + const char *url, const char *method, + const char *version, const char *upload_data, + size_t *upload_data_size, void **connection_state) { + if (strcmp(method, MHD_HTTP_METHOD_GET) != 0) { + return MHD_NO; + } + + /* On the first call for each connection, return without anything further. + * Apparently not everything has been initialized yet or so; the docs are not + * very specific on the issue. */ + if (*connection_state == NULL) { + /* set to a random non-NULL pointer. */ + *connection_state = &(int){42}; + return MHD_YES; + } + + char const *accept = MHD_lookup_connection_value(connection, MHD_HEADER_KIND, + MHD_HTTP_HEADER_ACCEPT); + _Bool want_proto = + (accept != NULL) && + (strstr(accept, "application/vnd.google.protobuf") != NULL); + + uint8_t scratch[4096] = {0}; + ProtobufCBufferSimple simple = PROTOBUF_C_BUFFER_SIMPLE_INIT(scratch); + ProtobufCBuffer *buffer = (ProtobufCBuffer *)&simple; + + if (want_proto) + format_protobuf(buffer); + else + format_text(buffer); + + struct MHD_Response *res = MHD_create_response_from_data( + simple.len, simple.data, /* must_free = */ 0, /* must_copy = */ 1); + MHD_add_response_header(res, MHD_HTTP_HEADER_CONTENT_TYPE, + want_proto ? CONTENT_TYPE_PROTO : CONTENT_TYPE_TEXT); + + int status = MHD_queue_response(connection, MHD_HTTP_OK, res); + + MHD_destroy_response(res); + PROTOBUF_C_BUFFER_SIMPLE_CLEAR(&simple); + return status; +} + +/* + * Functions for manipulating the global state in "metrics". This is organized + * in two tiers: the global "metrics" tree holds "metric families", which are + * identified by a name (a string). Each metric family has one or more + * "metrics", which are identified by a unique set of key-value-pairs. For + * example: + * + * collectd_cpu_total + * {cpu="0",type="idle"} + * {cpu="0",type="user"} + * ... + * collectd_memory + * {memory="used"} + * {memory="free"} + * ... + * {{{ */ +/* label_pair_destroy frees the memory used by a label pair. */ +static void label_pair_destroy(Io__Prometheus__Client__LabelPair *msg) { + if (msg == NULL) + return; + + sfree(msg->name); + sfree(msg->value); + + sfree(msg); +} + +/* label_pair_create allocates and initializes a new label pair. */ +static Io__Prometheus__Client__LabelPair *label_pair_create(char const *name, + char const *value) { + Io__Prometheus__Client__LabelPair *msg = calloc(1, sizeof(*msg)); + if (msg == NULL) + return NULL; + io__prometheus__client__label_pair__init(msg); + + msg->name = strdup(name); + msg->value = strdup(value); + if ((msg->name == NULL) || (msg->value == NULL)) { + label_pair_destroy(msg); + return NULL; + } + + return msg; +} + +/* metric_destroy frees the memory used by a metric. */ +static void metric_destroy(Io__Prometheus__Client__Metric *msg) { + if (msg == NULL) + return; + + for (size_t i = 0; i < msg->n_label; i++) { + label_pair_destroy(msg->label[i]); + } + sfree(msg->label); + + sfree(msg->gauge); + sfree(msg->counter); + + sfree(msg); +} + +/* metric_add_labels adds the labels that identify this metric to m. + * The logic is copied from the "collectd_exporter". Essentially, the labels + * contain the hostname, the plugin instance and the type instance of a + * value_list_t. */ +static int metric_add_labels(Io__Prometheus__Client__Metric *m, + value_list_t const *vl) { + size_t n_label = 1; + if (strlen(vl->plugin_instance) != 0) + n_label++; + if (strlen(vl->type_instance) != 0) + n_label++; + + m->label = calloc(n_label, sizeof(*m->label)); + if (m->label == NULL) + return ENOMEM; + + if (strlen(vl->plugin_instance) != 0) { + m->label[m->n_label] = label_pair_create(vl->plugin, vl->plugin_instance); + m->n_label++; + } + + if (strlen(vl->type_instance) != 0) { + char const *name = "type"; + if (strlen(vl->plugin_instance) == 0) + name = vl->plugin; + + m->label[m->n_label] = label_pair_create(name, vl->type_instance); + m->n_label++; + } + + m->label[m->n_label] = label_pair_create("instance", vl->host); + m->n_label++; + + for (size_t i = 0; i < m->n_label; i++) { + if (m->label[i] == NULL) + return ENOMEM; + } + + return 0; +} + +/* metric_cmp compares two metrics. It's prototype makes it easy to use with + * qsort(3) and bsearch(3). */ +static int metric_cmp(void const *a, void const *b) { + Io__Prometheus__Client__Metric const *m_a = + *((Io__Prometheus__Client__Metric **)a); + Io__Prometheus__Client__Metric const *m_b = + *((Io__Prometheus__Client__Metric **)b); + + if (m_a->n_label < m_b->n_label) + return -1; + else if (m_a->n_label > m_b->n_label) + return 1; + + /* Prometheus does not care about the order of labels. All labels in this + * plugin are created by metric_add_labels(), though, and therefore always + * appear in the same order. We take advantage of this and simplify the check + * by making sure all labels are the same in each position. */ + for (size_t i = 0; i < m_a->n_label; i++) { + int status = strcmp(m_a->label[i]->name, m_b->label[i]->name); + if (status != 0) + return status; + + status = strcmp(m_a->label[i]->value, m_b->label[i]->value); + if (status != 0) + return status; + } + + return 0; +} + +/* metric_create allocates and initializes a new metric. */ +static Io__Prometheus__Client__Metric *metric_create(value_list_t const *vl) { + Io__Prometheus__Client__Metric *msg = calloc(1, sizeof(*msg)); + if (msg == NULL) + return NULL; + io__prometheus__client__metric__init(msg); + + if (metric_add_labels(msg, vl) != 0) { + metric_destroy(msg); + return NULL; + } + + return msg; +} + +/* metric_update stores the new value and timestamp in m. */ +static int metric_update(Io__Prometheus__Client__Metric *m, value_t value, + int ds_type, cdtime_t t, cdtime_t interval) { + if (ds_type == DS_TYPE_GAUGE) { + sfree(m->counter); + if (m->gauge == NULL) { + m->gauge = calloc(1, sizeof(*m->gauge)); + if (m->gauge == NULL) + return ENOMEM; + io__prometheus__client__gauge__init(m->gauge); + } + + m->gauge->value = (double)value.gauge; + m->gauge->has_value = 1; + } else { /* not gauge */ + sfree(m->gauge); + if (m->counter == NULL) { + m->counter = calloc(1, sizeof(*m->counter)); + if (m->counter == NULL) + return ENOMEM; + io__prometheus__client__counter__init(m->counter); + } + + switch (ds_type) { + case DS_TYPE_ABSOLUTE: + m->counter->value = (double)value.absolute; + break; + case DS_TYPE_COUNTER: + m->counter->value = (double)value.counter; + break; + default: + m->counter->value = (double)value.derive; + break; + } + m->counter->has_value = 1; + } + + /* Prometheus has a globally configured timeout after which metrics are + * considered stale. This causes problems when metrics have an interval + * exceeding that limit. We emulate the behavior of "pushgateway" and *not* + * send a timestamp value – Prometheus will fill in the current time. */ + if (interval <= staleness_delta) { + m->timestamp_ms = CDTIME_T_TO_MS(t); + m->has_timestamp_ms = 1; + } else { + static c_complain_t long_metric = C_COMPLAIN_INIT_STATIC; + c_complain( + LOG_NOTICE, &long_metric, + "write_prometheus plugin: You have metrics with an interval exceeding " + "\"StalenessDelta\" setting (%.3fs). This is suboptimal, please check " + "the collectd.conf(5) manual page to understand what's going on.", + CDTIME_T_TO_DOUBLE(staleness_delta)); + + m->timestamp_ms = 0; + m->has_timestamp_ms = 0; + } + + return 0; +} + +/* metric_family_add_metric adds m to the metric list of fam. */ +static int metric_family_add_metric(Io__Prometheus__Client__MetricFamily *fam, + Io__Prometheus__Client__Metric *m) { + Io__Prometheus__Client__Metric **tmp = + realloc(fam->metric, (fam->n_metric + 1) * sizeof(*fam->metric)); + if (tmp == NULL) + return ENOMEM; + fam->metric = tmp; + + fam->metric[fam->n_metric] = m; + fam->n_metric++; + + /* Sort the metrics so that lookup is fast. */ + qsort(fam->metric, fam->n_metric, sizeof(*fam->metric), metric_cmp); + + return 0; +} + +/* metric_family_delete_metric looks up and deletes the metric corresponding to + * vl. */ +static int +metric_family_delete_metric(Io__Prometheus__Client__MetricFamily *fam, + value_list_t const *vl) { + Io__Prometheus__Client__Metric *key = metric_create(vl); + if (key == NULL) + return ENOMEM; + + size_t i; + for (i = 0; i < fam->n_metric; i++) { + if (metric_cmp(&key, &fam->metric[i]) == 0) + break; + } + + if (i >= fam->n_metric) + return ENOENT; + + metric_destroy(fam->metric[i]); + if ((fam->n_metric - 1) > i) + memmove(&fam->metric[i], &fam->metric[i + 1], + ((fam->n_metric - 1) - i) * sizeof(fam->metric[i])); + fam->n_metric--; + + Io__Prometheus__Client__Metric **tmp = + realloc(fam->metric, fam->n_metric * sizeof(*fam->metric)); + if ((tmp != NULL) || (fam->n_metric == 0)) + fam->metric = tmp; + + return 0; +} + +/* metric_family_get_metric looks up the matching metric in a metric family, + * allocating it if necessary. */ +static Io__Prometheus__Client__Metric * +metric_family_get_metric(Io__Prometheus__Client__MetricFamily *fam, + value_list_t const *vl) { + Io__Prometheus__Client__Metric *key = metric_create(vl); + if (key == NULL) + return NULL; + + /* Metrics are sorted in metric_family_add_metric() so that we can do a binary + * search here. */ + Io__Prometheus__Client__Metric **m = bsearch( + &key, fam->metric, fam->n_metric, sizeof(*fam->metric), metric_cmp); + + if (m != NULL) { + metric_destroy(key); + return *m; + } + + DEBUG("write_prometheus plugin: created new metric in family"); + int status = metric_family_add_metric(fam, key); + if (status != 0) { + metric_destroy(key); + return NULL; + } + + return key; +} + +/* metric_family_update looks up the matching metric in a metric family, + * allocating it if necessary, and updates the metric to the latest value. */ +static int metric_family_update(Io__Prometheus__Client__MetricFamily *fam, + data_set_t const *ds, value_list_t const *vl, + size_t ds_index) { + Io__Prometheus__Client__Metric *m = metric_family_get_metric(fam, vl); + if (m == NULL) + return -1; + + return metric_update(m, vl->values[ds_index], ds->ds[ds_index].type, vl->time, + vl->interval); +} + +/* metric_family_destroy frees the memory used by a metric family. */ +static void metric_family_destroy(Io__Prometheus__Client__MetricFamily *msg) { + if (msg == NULL) + return; + + sfree(msg->name); + sfree(msg->help); + + for (size_t i = 0; i < msg->n_metric; i++) { + metric_destroy(msg->metric[i]); + } + sfree(msg->metric); + + sfree(msg); +} + +/* metric_family_create allocates and initializes a new metric family. */ +static Io__Prometheus__Client__MetricFamily * +metric_family_create(char *name, data_set_t const *ds, value_list_t const *vl, + size_t ds_index) { + Io__Prometheus__Client__MetricFamily *msg = calloc(1, sizeof(*msg)); + if (msg == NULL) + return NULL; + io__prometheus__client__metric_family__init(msg); + + msg->name = name; + + char help[1024]; + ssnprintf( + help, sizeof(help), + "write_prometheus plugin: '%s' Type: '%s', Dstype: '%s', Dsname: '%s'", + vl->plugin, vl->type, DS_TYPE_TO_STRING(ds->ds[ds_index].type), + ds->ds[ds_index].name); + msg->help = strdup(help); + + msg->type = (ds->ds[ds_index].type == DS_TYPE_GAUGE) + ? IO__PROMETHEUS__CLIENT__METRIC_TYPE__GAUGE + : IO__PROMETHEUS__CLIENT__METRIC_TYPE__COUNTER; + msg->has_type = 1; + + return msg; +} + +/* metric_family_name creates a metric family's name from a data source. This is + * done in the same way as done by the "collectd_exporter" for best possible + * compatibility. In essence, the plugin, type and data source name go in the + * metric family name, while hostname, plugin instance and type instance go into + * the labels of a metric. */ +static char *metric_family_name(data_set_t const *ds, value_list_t const *vl, + size_t ds_index) { + char const *fields[5] = {"collectd"}; + size_t fields_num = 1; + + if (strcmp(vl->plugin, vl->type) != 0) { + fields[fields_num] = vl->plugin; + fields_num++; + } + fields[fields_num] = vl->type; + fields_num++; + + if (strcmp("value", ds->ds[ds_index].name) != 0) { + fields[fields_num] = ds->ds[ds_index].name; + fields_num++; + } + + /* Prometheus best practices: + * cumulative metrics should have a "total" suffix. */ + if ((ds->ds[ds_index].type == DS_TYPE_COUNTER) || + (ds->ds[ds_index].type == DS_TYPE_DERIVE)) { + fields[fields_num] = "total"; + fields_num++; + } + + char name[5 * DATA_MAX_NAME_LEN]; + strjoin(name, sizeof(name), (char **)fields, fields_num, "_"); + return strdup(name); +} + +/* metric_family_get looks up the matching metric family, allocating it if + * necessary. */ +static Io__Prometheus__Client__MetricFamily * +metric_family_get(data_set_t const *ds, value_list_t const *vl, + size_t ds_index) { + char *name = metric_family_name(ds, vl, ds_index); + if (name == NULL) { + ERROR("write_prometheus plugin: Allocating metric family name failed."); + return NULL; + } + + Io__Prometheus__Client__MetricFamily *fam = NULL; + if (c_avl_get(metrics, name, (void *)&fam) == 0) { + sfree(name); + assert(fam != NULL); + return fam; + } + + fam = metric_family_create(name, ds, vl, ds_index); + if (fam == NULL) { + ERROR("write_prometheus plugin: Allocating metric family failed."); + sfree(name); + return NULL; + } + + /* If successful, "name" is owned by "fam", i.e. don't free it here. */ + DEBUG("write_prometheus plugin: metric family \"%s\" has been created.", + name); + name = NULL; + + int status = c_avl_insert(metrics, fam->name, fam); + if (status != 0) { + ERROR("write_prometheus plugin: Adding \"%s\" failed.", name); + metric_family_destroy(fam); + return NULL; + } + + return fam; +} +/* }}} */ + +/* + * collectd callbacks + */ +static int prom_config(oconfig_item_t *ci) { + for (int i = 0; i < ci->children_num; i++) { + oconfig_item_t *child = ci->children + i; + + if (strcasecmp("Port", child->key) == 0) { + int status = cf_util_get_port_number(child); + if (status > 0) + httpd_port = (unsigned short)status; + } else if (strcasecmp("StalenessDelta", child->key) == 0) { + cf_util_get_cdtime(child, &staleness_delta); + } else { + WARNING("write_prometheus plugin: Ignoring unknown configuration option " + "\"%s\".", + child->key); + } + } + + return 0; +} + +static int prom_init() { + if (metrics == NULL) { + metrics = c_avl_create((void *)strcmp); + if (metrics == NULL) { + ERROR("write_prometheus plugin: c_avl_create() failed."); + return -1; + } + } + + if (httpd == NULL) { + unsigned int flags = MHD_USE_THREAD_PER_CONNECTION; +#if MHD_VERSION >= 0x00093300 + flags |= MHD_USE_DUAL_STACK; +#endif + + httpd = MHD_start_daemon(flags, httpd_port, + /* MHD_AcceptPolicyCallback = */ NULL, + /* MHD_AcceptPolicyCallback arg = */ NULL, + http_handler, NULL, MHD_OPTION_END); + if (httpd == NULL) { + ERROR("write_prometheus plugin: MHD_start_daemon() failed."); + return -1; + } + DEBUG("write_prometheus plugin: Successfully started microhttpd %s", + MHD_get_version()); + } + + return 0; +} + +static int prom_write(data_set_t const *ds, value_list_t const *vl, + __attribute__((unused)) user_data_t *ud) { + pthread_mutex_lock(&metrics_lock); + + for (size_t i = 0; i < ds->ds_num; i++) { + Io__Prometheus__Client__MetricFamily *fam = metric_family_get(ds, vl, i); + if (fam == NULL) + continue; + + int status = metric_family_update(fam, ds, vl, i); + if (status != 0) { + ERROR("write_prometheus plugin: Updating metric \"%s\" failed with " + "status %d", + fam->name, status); + continue; + } + } + + pthread_mutex_unlock(&metrics_lock); + return 0; +} + +static int prom_missing(value_list_t const *vl, + __attribute__((unused)) user_data_t *ud) { + data_set_t const *ds = plugin_get_ds(vl->type); + if (ds == NULL) + return ENOENT; + + pthread_mutex_lock(&metrics_lock); + + for (size_t i = 0; i < ds->ds_num; i++) { + Io__Prometheus__Client__MetricFamily *fam = metric_family_get(ds, vl, i); + if (fam == NULL) + continue; + + int status = metric_family_delete_metric(fam, vl); + if (status != 0) { + ERROR("write_prometheus plugin: Deleting a metric in family \"%s\" " + "failed with status %d", + fam->name, status); + continue; + } + + if (fam->n_metric == 0) { + int status = c_avl_remove(metrics, fam->name, NULL, NULL); + if (status != 0) { + ERROR("write_prometheus plugin: Deleting metric family \"%s\" failed " + "with status %d", + fam->name, status); + continue; + } + metric_family_destroy(fam); + } + } + + pthread_mutex_unlock(&metrics_lock); + return 0; +} + +static int prom_shutdown() { + if (httpd != NULL) { + MHD_stop_daemon(httpd); + httpd = NULL; + } + + pthread_mutex_lock(&metrics_lock); + if (metrics != NULL) { + char *name; + Io__Prometheus__Client__MetricFamily *fam; + while (c_avl_pick(metrics, (void *)&name, (void *)&fam) == 0) { + assert(name == fam->name); + name = NULL; + + metric_family_destroy(fam); + } + c_avl_destroy(metrics); + metrics = NULL; + } + pthread_mutex_unlock(&metrics_lock); + + return 0; +} + +void module_register() { + plugin_register_complex_config("write_prometheus", prom_config); + plugin_register_init("write_prometheus", prom_init); + plugin_register_write("write_prometheus", prom_write, + /* user data = */ NULL); + plugin_register_missing("write_prometheus", prom_missing, + /* user data = */ NULL); + plugin_register_shutdown("write_prometheus", prom_shutdown); +} + +/* vim: set sw=2 sts=2 et fdm=marker : */ -- 2.11.0