Merge pull request #3339 from jkohen/patch-1
[collectd.git] / src / write_prometheus.c
1 /**
2  * collectd - src/write_prometheus.c
3  * Copyright (C) 2016       Florian octo Forster
4  *
5  * Permission is hereby granted, free of charge, to any person obtaining a copy
6  * of this software and associated documentation files (the "Software"), to deal
7  * in the Software without restriction, including without limitation the rights
8  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9  * copies of the Software, and to permit persons to whom the Software is
10  * furnished to do so, subject to the following conditions:
11  *
12  * The above copyright notice and this permission notice shall be included in
13  * all copies or substantial portions of the Software.
14  *
15  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21  * SOFTWARE.
22  *
23  * Authors:
24  *   Florian octo Forster <octo at collectd.org>
25  */
26
27 #include "collectd.h"
28
29 #include "plugin.h"
30 #include "utils/avltree/avltree.h"
31 #include "utils/common/common.h"
32 #include "utils_complain.h"
33 #include "utils_time.h"
34
35 #include "prometheus.pb-c.h"
36
37 #include <microhttpd.h>
38
39 #include <netdb.h>
40 #include <sys/socket.h>
41 #include <sys/types.h>
42
43 #ifndef PROMETHEUS_DEFAULT_STALENESS_DELTA
44 #define PROMETHEUS_DEFAULT_STALENESS_DELTA TIME_T_TO_CDTIME_T_STATIC(300)
45 #endif
46
47 #define VARINT_UINT32_BYTES 5
48
49 #define CONTENT_TYPE_PROTO                                                     \
50   "application/vnd.google.protobuf; proto=io.prometheus.client.MetricFamily; " \
51   "encoding=delimited"
52 #define CONTENT_TYPE_TEXT "text/plain; version=0.0.4"
53
54 static c_avl_tree_t *metrics;
55 static pthread_mutex_t metrics_lock = PTHREAD_MUTEX_INITIALIZER;
56
57 static char *httpd_host = NULL;
58 static unsigned short httpd_port = 9103;
59 static struct MHD_Daemon *httpd;
60
61 static cdtime_t staleness_delta = PROMETHEUS_DEFAULT_STALENESS_DELTA;
62
63 /* Unfortunately, protoc-c doesn't export its implementation of varint, so we
64  * need to implement our own. */
65 static size_t varint(uint8_t buffer[static VARINT_UINT32_BYTES],
66                      uint32_t value) {
67   for (size_t i = 0; i < VARINT_UINT32_BYTES; i++) {
68     buffer[i] = (uint8_t)(value & 0x7f);
69     value >>= 7;
70
71     if (value == 0)
72       return i + 1;
73
74     buffer[i] |= 0x80;
75   }
76
77   return 0;
78 }
79
80 /* format_protobuf iterates over all metric families in "metrics" and adds them
81  * to a buffer in ProtoBuf format. It prefixes each protobuf with its encoded
82  * size, the so called "delimited" format. */
83 static void format_protobuf(ProtobufCBuffer *buffer) {
84   pthread_mutex_lock(&metrics_lock);
85
86   char *unused_name;
87   Io__Prometheus__Client__MetricFamily *fam;
88   c_avl_iterator_t *iter = c_avl_get_iterator(metrics);
89   while (c_avl_iterator_next(iter, (void *)&unused_name, (void *)&fam) == 0) {
90     /* Prometheus uses a message length prefix to determine where one
91      * MetricFamily ends and the next begins. This delimiter is encoded as a
92      * "varint", which is common in Protobufs. */
93     uint8_t delim[VARINT_UINT32_BYTES] = {0};
94     size_t delim_len = varint(
95         delim,
96         (uint32_t)io__prometheus__client__metric_family__get_packed_size(fam));
97     buffer->append(buffer, delim_len, delim);
98
99     io__prometheus__client__metric_family__pack_to_buffer(fam, buffer);
100   }
101   c_avl_iterator_destroy(iter);
102
103   pthread_mutex_unlock(&metrics_lock);
104 }
105
106 static char const *escape_label_value(char *buffer, size_t buffer_size,
107                                       char const *value) {
108   /* shortcut for values that don't need escaping. */
109   if (strpbrk(value, "\n\"\\") == NULL)
110     return value;
111
112   size_t value_len = strlen(value);
113   size_t buffer_len = 0;
114
115   for (size_t i = 0; i < value_len; i++) {
116     switch (value[i]) {
117     case '\n':
118     case '"':
119     case '\\':
120       if ((buffer_size - buffer_len) < 3) {
121         break;
122       }
123       buffer[buffer_len] = '\\';
124       buffer[buffer_len + 1] = (value[i] == '\n') ? 'n' : value[i];
125       buffer_len += 2;
126       break;
127
128     default:
129       if ((buffer_size - buffer_len) < 2) {
130         break;
131       }
132       buffer[buffer_len] = value[i];
133       buffer_len++;
134       break;
135     }
136   }
137
138   assert(buffer_len < buffer_size);
139   buffer[buffer_len] = 0;
140   return buffer;
141 }
142
143 /* format_labels formats a metric's labels in Prometheus-compatible format. This
144  * format looks like this:
145  *
146  *   key0="value0",key1="value1"
147  */
148 static char *format_labels(char *buffer, size_t buffer_size,
149                            Io__Prometheus__Client__Metric const *m) {
150   /* our metrics always have at least one and at most three labels. */
151   assert(m->n_label >= 1);
152   assert(m->n_label <= 3);
153
154 #define LABEL_KEY_SIZE DATA_MAX_NAME_LEN
155 #define LABEL_VALUE_SIZE (2 * DATA_MAX_NAME_LEN - 1)
156 #define LABEL_BUFFER_SIZE (LABEL_KEY_SIZE + LABEL_VALUE_SIZE + 4)
157
158   char *labels[3] = {
159       (char[LABEL_BUFFER_SIZE]){0},
160       (char[LABEL_BUFFER_SIZE]){0},
161       (char[LABEL_BUFFER_SIZE]){0},
162   };
163
164   /* N.B.: the label *names* are hard-coded by this plugin and therefore we
165    * know that they are sane. */
166   for (size_t i = 0; i < m->n_label; i++) {
167     char value[LABEL_VALUE_SIZE];
168     ssnprintf(labels[i], LABEL_BUFFER_SIZE, "%s=\"%s\"", m->label[i]->name,
169               escape_label_value(value, sizeof(value), m->label[i]->value));
170   }
171
172   strjoin(buffer, buffer_size, labels, m->n_label, ",");
173   return buffer;
174 }
175
176 /* format_protobuf iterates over all metric families in "metrics" and adds them
177  * to a buffer in plain text format. */
178 static void format_text(ProtobufCBuffer *buffer) {
179   pthread_mutex_lock(&metrics_lock);
180
181   char *unused_name;
182   Io__Prometheus__Client__MetricFamily *fam;
183   c_avl_iterator_t *iter = c_avl_get_iterator(metrics);
184   while (c_avl_iterator_next(iter, (void *)&unused_name, (void *)&fam) == 0) {
185     char line[1024]; /* 4x DATA_MAX_NAME_LEN? */
186
187     ssnprintf(line, sizeof(line), "# HELP %s %s\n", fam->name, fam->help);
188     buffer->append(buffer, strlen(line), (uint8_t *)line);
189
190     ssnprintf(line, sizeof(line), "# TYPE %s %s\n", fam->name,
191               (fam->type == IO__PROMETHEUS__CLIENT__METRIC_TYPE__GAUGE)
192                   ? "gauge"
193                   : "counter");
194     buffer->append(buffer, strlen(line), (uint8_t *)line);
195
196     for (size_t i = 0; i < fam->n_metric; i++) {
197       Io__Prometheus__Client__Metric *m = fam->metric[i];
198
199       char labels[1024];
200
201       char timestamp_ms[24] = "";
202       if (m->has_timestamp_ms)
203         ssnprintf(timestamp_ms, sizeof(timestamp_ms), " %" PRIi64,
204                   m->timestamp_ms);
205
206       if (fam->type == IO__PROMETHEUS__CLIENT__METRIC_TYPE__GAUGE)
207         ssnprintf(line, sizeof(line), "%s{%s} " GAUGE_FORMAT "%s\n", fam->name,
208                   format_labels(labels, sizeof(labels), m), m->gauge->value,
209                   timestamp_ms);
210       else /* if (fam->type == IO__PROMETHEUS__CLIENT__METRIC_TYPE__COUNTER) */
211         ssnprintf(line, sizeof(line), "%s{%s} %.0f%s\n", fam->name,
212                   format_labels(labels, sizeof(labels), m), m->counter->value,
213                   timestamp_ms);
214
215       buffer->append(buffer, strlen(line), (uint8_t *)line);
216     }
217   }
218   c_avl_iterator_destroy(iter);
219
220   char server[1024];
221   ssnprintf(server, sizeof(server), "\n# collectd/write_prometheus %s at %s\n",
222             PACKAGE_VERSION, hostname_g);
223   buffer->append(buffer, strlen(server), (uint8_t *)server);
224
225   pthread_mutex_unlock(&metrics_lock);
226 }
227
228 /* http_handler is the callback called by the microhttpd library. It essentially
229  * handles all HTTP request aspects and creates an HTTP response. */
230 static int http_handler(void *cls, struct MHD_Connection *connection,
231                         const char *url, const char *method,
232                         const char *version, const char *upload_data,
233                         size_t *upload_data_size, void **connection_state) {
234   if (strcmp(method, MHD_HTTP_METHOD_GET) != 0) {
235     return MHD_NO;
236   }
237
238   /* On the first call for each connection, return without anything further.
239    * Apparently not everything has been initialized yet or so; the docs are not
240    * very specific on the issue. */
241   if (*connection_state == NULL) {
242     /* set to a random non-NULL pointer. */
243     *connection_state = &(int){42};
244     return MHD_YES;
245   }
246
247   char const *accept = MHD_lookup_connection_value(connection, MHD_HEADER_KIND,
248                                                    MHD_HTTP_HEADER_ACCEPT);
249   bool want_proto = (accept != NULL) &&
250                     (strstr(accept, "application/vnd.google.protobuf") != NULL);
251
252   uint8_t scratch[4096] = {0};
253   ProtobufCBufferSimple simple = PROTOBUF_C_BUFFER_SIMPLE_INIT(scratch);
254   ProtobufCBuffer *buffer = (ProtobufCBuffer *)&simple;
255
256   if (want_proto)
257     format_protobuf(buffer);
258   else
259     format_text(buffer);
260
261 #if defined(MHD_VERSION) && MHD_VERSION >= 0x00090500
262   struct MHD_Response *res = MHD_create_response_from_buffer(
263       simple.len, simple.data, MHD_RESPMEM_MUST_COPY);
264 #else
265   struct MHD_Response *res = MHD_create_response_from_data(
266       simple.len, simple.data, /* must_free = */ 0, /* must_copy = */ 1);
267 #endif
268   MHD_add_response_header(res, MHD_HTTP_HEADER_CONTENT_TYPE,
269                           want_proto ? CONTENT_TYPE_PROTO : CONTENT_TYPE_TEXT);
270
271   int status = MHD_queue_response(connection, MHD_HTTP_OK, res);
272
273   MHD_destroy_response(res);
274   PROTOBUF_C_BUFFER_SIMPLE_CLEAR(&simple);
275   return status;
276 }
277
278 /*
279  * Functions for manipulating the global state in "metrics". This is organized
280  * in two tiers: the global "metrics" tree holds "metric families", which are
281  * identified by a name (a string). Each metric family has one or more
282  * "metrics", which are identified by a unique set of key-value-pairs. For
283  * example:
284  *
285  * collectd_cpu_total
286  *   {cpu="0",type="idle"}
287  *   {cpu="0",type="user"}
288  *   ...
289  * collectd_memory
290  *   {memory="used"}
291  *   {memory="free"}
292  *   ...
293  * {{{ */
294 /* label_pair_destroy frees the memory used by a label pair. */
295 static void label_pair_destroy(Io__Prometheus__Client__LabelPair *msg) {
296   if (msg == NULL)
297     return;
298
299   sfree(msg->name);
300   sfree(msg->value);
301
302   sfree(msg);
303 }
304
305 /* label_pair_clone allocates and initializes a new label pair. */
306 static Io__Prometheus__Client__LabelPair *
307 label_pair_clone(Io__Prometheus__Client__LabelPair const *orig) {
308   Io__Prometheus__Client__LabelPair *copy = calloc(1, sizeof(*copy));
309   if (copy == NULL)
310     return NULL;
311   io__prometheus__client__label_pair__init(copy);
312
313   copy->name = strdup(orig->name);
314   copy->value = strdup(orig->value);
315   if ((copy->name == NULL) || (copy->value == NULL)) {
316     label_pair_destroy(copy);
317     return NULL;
318   }
319
320   return copy;
321 }
322
323 /* metric_destroy frees the memory used by a metric. */
324 static void metric_destroy(Io__Prometheus__Client__Metric *msg) {
325   if (msg == NULL)
326     return;
327
328   for (size_t i = 0; i < msg->n_label; i++) {
329     label_pair_destroy(msg->label[i]);
330   }
331   sfree(msg->label);
332
333   sfree(msg->gauge);
334   sfree(msg->counter);
335
336   sfree(msg);
337 }
338
339 /* metric_cmp compares two metrics. It's prototype makes it easy to use with
340  * qsort(3) and bsearch(3). */
341 static int metric_cmp(void const *a, void const *b) {
342   Io__Prometheus__Client__Metric const *m_a =
343       *((Io__Prometheus__Client__Metric **)a);
344   Io__Prometheus__Client__Metric const *m_b =
345       *((Io__Prometheus__Client__Metric **)b);
346
347   if (m_a->n_label < m_b->n_label)
348     return -1;
349   else if (m_a->n_label > m_b->n_label)
350     return 1;
351
352   /* Prometheus does not care about the order of labels. All labels in this
353    * plugin are created by METRIC_ADD_LABELS(), though, and therefore always
354    * appear in the same order. We take advantage of this and simplify the check
355    * by making sure all labels are the same in each position.
356    *
357    * We also only need to check the label values, because the label names are
358    * the same for all metrics in a metric family.
359    *
360    * 3 labels:
361    * [0] $plugin="$plugin_instance" => $plugin is the same within a family
362    * [1] type="$type_instance"      => "type" is a static string
363    * [2] instance="$host"           => "instance" is a static string
364    *
365    * 2 labels, variant 1:
366    * [0] $plugin="$plugin_instance" => $plugin is the same within a family
367    * [1] instance="$host"           => "instance" is a static string
368    *
369    * 2 labels, variant 2:
370    * [0] $plugin="$type_instance"   => $plugin is the same within a family
371    * [1] instance="$host"           => "instance" is a static string
372    *
373    * 1 label:
374    * [1] instance="$host"           => "instance" is a static string
375    */
376   for (size_t i = 0; i < m_a->n_label; i++) {
377     int status = strcmp(m_a->label[i]->value, m_b->label[i]->value);
378     if (status != 0)
379       return status;
380
381 #if COLLECT_DEBUG
382     assert(strcmp(m_a->label[i]->name, m_b->label[i]->name) == 0);
383 #endif
384   }
385
386   return 0;
387 }
388
389 #define METRIC_INIT                                                            \
390   &(Io__Prometheus__Client__Metric) {                                          \
391     .label =                                                                   \
392         (Io__Prometheus__Client__LabelPair *[]){                               \
393             &(Io__Prometheus__Client__LabelPair){                              \
394                 .name = NULL,                                                  \
395             },                                                                 \
396             &(Io__Prometheus__Client__LabelPair){                              \
397                 .name = NULL,                                                  \
398             },                                                                 \
399             &(Io__Prometheus__Client__LabelPair){                              \
400                 .name = NULL,                                                  \
401             },                                                                 \
402         },                                                                     \
403     .n_label = 0,                                                              \
404   }
405
406 #define METRIC_ADD_LABELS(m, vl)                                               \
407   do {                                                                         \
408     if (strlen((vl)->plugin_instance) != 0) {                                  \
409       (m)->label[(m)->n_label]->name = (char *)(vl)->plugin;                   \
410       (m)->label[(m)->n_label]->value = (char *)(vl)->plugin_instance;         \
411       (m)->n_label++;                                                          \
412     }                                                                          \
413                                                                                \
414     if (strlen((vl)->type_instance) != 0) {                                    \
415       (m)->label[(m)->n_label]->name = "type";                                 \
416       if (strlen((vl)->plugin_instance) == 0)                                  \
417         (m)->label[(m)->n_label]->name = (char *)(vl)->plugin;                 \
418       (m)->label[(m)->n_label]->value = (char *)(vl)->type_instance;           \
419       (m)->n_label++;                                                          \
420     }                                                                          \
421                                                                                \
422     (m)->label[(m)->n_label]->name = "instance";                               \
423     (m)->label[(m)->n_label]->value = (char *)(vl)->host;                      \
424     (m)->n_label++;                                                            \
425   } while (0)
426
427 /* metric_clone allocates and initializes a new metric based on orig. */
428 static Io__Prometheus__Client__Metric *
429 metric_clone(Io__Prometheus__Client__Metric const *orig) {
430   Io__Prometheus__Client__Metric *copy = calloc(1, sizeof(*copy));
431   if (copy == NULL)
432     return NULL;
433   io__prometheus__client__metric__init(copy);
434
435   copy->n_label = orig->n_label;
436   copy->label = calloc(copy->n_label, sizeof(*copy->label));
437   if (copy->label == NULL) {
438     sfree(copy);
439     return NULL;
440   }
441
442   for (size_t i = 0; i < copy->n_label; i++) {
443     copy->label[i] = label_pair_clone(orig->label[i]);
444     if (copy->label[i] == NULL) {
445       metric_destroy(copy);
446       return NULL;
447     }
448   }
449
450   return copy;
451 }
452
453 /* metric_update stores the new value and timestamp in m. */
454 static int metric_update(Io__Prometheus__Client__Metric *m, value_t value,
455                          int ds_type, cdtime_t t, cdtime_t interval) {
456   if (ds_type == DS_TYPE_GAUGE) {
457     sfree(m->counter);
458     if (m->gauge == NULL) {
459       m->gauge = calloc(1, sizeof(*m->gauge));
460       if (m->gauge == NULL)
461         return ENOMEM;
462       io__prometheus__client__gauge__init(m->gauge);
463     }
464
465     m->gauge->value = (double)value.gauge;
466     m->gauge->has_value = 1;
467   } else { /* not gauge */
468     sfree(m->gauge);
469     if (m->counter == NULL) {
470       m->counter = calloc(1, sizeof(*m->counter));
471       if (m->counter == NULL)
472         return ENOMEM;
473       io__prometheus__client__counter__init(m->counter);
474     }
475
476     switch (ds_type) {
477     case DS_TYPE_ABSOLUTE:
478       m->counter->value = (double)value.absolute;
479       break;
480     case DS_TYPE_COUNTER:
481       m->counter->value = (double)value.counter;
482       break;
483     default:
484       m->counter->value = (double)value.derive;
485       break;
486     }
487     m->counter->has_value = 1;
488   }
489
490   /* Prometheus has a globally configured timeout after which metrics are
491    * considered stale. This causes problems when metrics have an interval
492    * exceeding that limit. We emulate the behavior of "pushgateway" and *not*
493    * send a timestamp value â€“ Prometheus will fill in the current time. */
494   if (interval <= staleness_delta) {
495     m->timestamp_ms = CDTIME_T_TO_MS(t);
496     m->has_timestamp_ms = 1;
497   } else {
498     static c_complain_t long_metric = C_COMPLAIN_INIT_STATIC;
499     c_complain(
500         LOG_NOTICE, &long_metric,
501         "write_prometheus plugin: You have metrics with an interval exceeding "
502         "\"StalenessDelta\" setting (%.3fs). This is suboptimal, please check "
503         "the collectd.conf(5) manual page to understand what's going on.",
504         CDTIME_T_TO_DOUBLE(staleness_delta));
505
506     m->timestamp_ms = 0;
507     m->has_timestamp_ms = 0;
508   }
509
510   return 0;
511 }
512
513 /* metric_family_add_metric adds m to the metric list of fam. */
514 static int metric_family_add_metric(Io__Prometheus__Client__MetricFamily *fam,
515                                     Io__Prometheus__Client__Metric *m) {
516   Io__Prometheus__Client__Metric **tmp =
517       realloc(fam->metric, (fam->n_metric + 1) * sizeof(*fam->metric));
518   if (tmp == NULL)
519     return ENOMEM;
520   fam->metric = tmp;
521
522   fam->metric[fam->n_metric] = m;
523   fam->n_metric++;
524
525   /* Sort the metrics so that lookup is fast. */
526   qsort(fam->metric, fam->n_metric, sizeof(*fam->metric), metric_cmp);
527
528   return 0;
529 }
530
531 /* metric_family_delete_metric looks up and deletes the metric corresponding to
532  * vl. */
533 static int
534 metric_family_delete_metric(Io__Prometheus__Client__MetricFamily *fam,
535                             value_list_t const *vl) {
536   Io__Prometheus__Client__Metric *key = METRIC_INIT;
537   METRIC_ADD_LABELS(key, vl);
538
539   size_t i;
540   for (i = 0; i < fam->n_metric; i++) {
541     if (metric_cmp(&key, &fam->metric[i]) == 0)
542       break;
543   }
544
545   if (i >= fam->n_metric)
546     return ENOENT;
547
548   metric_destroy(fam->metric[i]);
549   if ((fam->n_metric - 1) > i)
550     memmove(&fam->metric[i], &fam->metric[i + 1],
551             ((fam->n_metric - 1) - i) * sizeof(fam->metric[i]));
552   fam->n_metric--;
553
554   if (fam->n_metric == 0) {
555     sfree(fam->metric);
556     return 0;
557   }
558
559   Io__Prometheus__Client__Metric **tmp =
560       realloc(fam->metric, fam->n_metric * sizeof(*fam->metric));
561   if (tmp != NULL)
562     fam->metric = tmp;
563
564   return 0;
565 }
566
567 /* metric_family_get_metric looks up the matching metric in a metric family,
568  * allocating it if necessary. */
569 static Io__Prometheus__Client__Metric *
570 metric_family_get_metric(Io__Prometheus__Client__MetricFamily *fam,
571                          value_list_t const *vl) {
572   Io__Prometheus__Client__Metric *key = METRIC_INIT;
573   METRIC_ADD_LABELS(key, vl);
574
575   /* Metrics are sorted in metric_family_add_metric() so that we can do a binary
576    * search here. */
577   Io__Prometheus__Client__Metric **m = bsearch(
578       &key, fam->metric, fam->n_metric, sizeof(*fam->metric), metric_cmp);
579
580   if (m != NULL) {
581     return *m;
582   }
583
584   Io__Prometheus__Client__Metric *new_metric = metric_clone(key);
585   if (new_metric == NULL)
586     return NULL;
587
588   DEBUG("write_prometheus plugin: created new metric in family");
589   int status = metric_family_add_metric(fam, new_metric);
590   if (status != 0) {
591     metric_destroy(new_metric);
592     return NULL;
593   }
594
595   return new_metric;
596 }
597
598 /* metric_family_update looks up the matching metric in a metric family,
599  * allocating it if necessary, and updates the metric to the latest value. */
600 static int metric_family_update(Io__Prometheus__Client__MetricFamily *fam,
601                                 data_set_t const *ds, value_list_t const *vl,
602                                 size_t ds_index) {
603   Io__Prometheus__Client__Metric *m = metric_family_get_metric(fam, vl);
604   if (m == NULL)
605     return -1;
606
607   return metric_update(m, vl->values[ds_index], ds->ds[ds_index].type, vl->time,
608                        vl->interval);
609 }
610
611 /* metric_family_destroy frees the memory used by a metric family. */
612 static void metric_family_destroy(Io__Prometheus__Client__MetricFamily *msg) {
613   if (msg == NULL)
614     return;
615
616   sfree(msg->name);
617   sfree(msg->help);
618
619   for (size_t i = 0; i < msg->n_metric; i++) {
620     metric_destroy(msg->metric[i]);
621   }
622   sfree(msg->metric);
623
624   sfree(msg);
625 }
626
627 /* metric_family_create allocates and initializes a new metric family. */
628 static Io__Prometheus__Client__MetricFamily *
629 metric_family_create(char *name, data_set_t const *ds, value_list_t const *vl,
630                      size_t ds_index) {
631   Io__Prometheus__Client__MetricFamily *msg = calloc(1, sizeof(*msg));
632   if (msg == NULL)
633     return NULL;
634   io__prometheus__client__metric_family__init(msg);
635
636   msg->name = name;
637
638   char help[1024];
639   ssnprintf(
640       help, sizeof(help),
641       "write_prometheus plugin: '%s' Type: '%s', Dstype: '%s', Dsname: '%s'",
642       vl->plugin, vl->type, DS_TYPE_TO_STRING(ds->ds[ds_index].type),
643       ds->ds[ds_index].name);
644   msg->help = strdup(help);
645
646   msg->type = (ds->ds[ds_index].type == DS_TYPE_GAUGE)
647                   ? IO__PROMETHEUS__CLIENT__METRIC_TYPE__GAUGE
648                   : IO__PROMETHEUS__CLIENT__METRIC_TYPE__COUNTER;
649   msg->has_type = 1;
650
651   return msg;
652 }
653
654 /* metric_family_name creates a metric family's name from a data source. This is
655  * done in the same way as done by the "collectd_exporter" for best possible
656  * compatibility. In essence, the plugin, type and data source name go in the
657  * metric family name, while hostname, plugin instance and type instance go into
658  * the labels of a metric. */
659 static char *metric_family_name(data_set_t const *ds, value_list_t const *vl,
660                                 size_t ds_index) {
661   char const *fields[5] = {"collectd"};
662   size_t fields_num = 1;
663
664   if (strcmp(vl->plugin, vl->type) != 0) {
665     fields[fields_num] = vl->plugin;
666     fields_num++;
667   }
668   fields[fields_num] = vl->type;
669   fields_num++;
670
671   if (strcmp("value", ds->ds[ds_index].name) != 0) {
672     fields[fields_num] = ds->ds[ds_index].name;
673     fields_num++;
674   }
675
676   /* Prometheus best practices:
677    * cumulative metrics should have a "total" suffix. */
678   if ((ds->ds[ds_index].type == DS_TYPE_COUNTER) ||
679       (ds->ds[ds_index].type == DS_TYPE_DERIVE)) {
680     fields[fields_num] = "total";
681     fields_num++;
682   }
683
684   char name[5 * DATA_MAX_NAME_LEN];
685   strjoin(name, sizeof(name), (char **)fields, fields_num, "_");
686   return strdup(name);
687 }
688
689 /* metric_family_get looks up the matching metric family, allocating it if
690  * necessary. */
691 static Io__Prometheus__Client__MetricFamily *
692 metric_family_get(data_set_t const *ds, value_list_t const *vl, size_t ds_index,
693                   bool allocate) {
694   char *name = metric_family_name(ds, vl, ds_index);
695   if (name == NULL) {
696     ERROR("write_prometheus plugin: Allocating metric family name failed.");
697     return NULL;
698   }
699
700   Io__Prometheus__Client__MetricFamily *fam = NULL;
701   if (c_avl_get(metrics, name, (void *)&fam) == 0) {
702     sfree(name);
703     assert(fam != NULL);
704     return fam;
705   }
706
707   if (!allocate) {
708     sfree(name);
709     return NULL;
710   }
711
712   fam = metric_family_create(name, ds, vl, ds_index);
713   if (fam == NULL) {
714     ERROR("write_prometheus plugin: Allocating metric family failed.");
715     sfree(name);
716     return NULL;
717   }
718
719   /* If successful, "name" is owned by "fam", i.e. don't free it here. */
720   DEBUG("write_prometheus plugin: metric family \"%s\" has been created.",
721         name);
722   name = NULL;
723
724   int status = c_avl_insert(metrics, fam->name, fam);
725   if (status != 0) {
726     ERROR("write_prometheus plugin: Adding \"%s\" failed.", fam->name);
727     metric_family_destroy(fam);
728     return NULL;
729   }
730
731   return fam;
732 }
733 /* }}} */
734
735 static void prom_logger(__attribute__((unused)) void *arg, char const *fmt,
736                         va_list ap) {
737   /* {{{ */
738   char errbuf[1024];
739   vsnprintf(errbuf, sizeof(errbuf), fmt, ap);
740
741   ERROR("write_prometheus plugin: %s", errbuf);
742 } /* }}} prom_logger */
743
744 #if MHD_VERSION >= 0x00090000
745 static int prom_open_socket(int addrfamily) {
746   /* {{{ */
747   char service[NI_MAXSERV];
748   ssnprintf(service, sizeof(service), "%hu", httpd_port);
749
750   struct addrinfo *res;
751   int status = getaddrinfo(httpd_host, service,
752                            &(struct addrinfo){
753                                .ai_flags = AI_PASSIVE | AI_ADDRCONFIG,
754                                .ai_family = addrfamily,
755                                .ai_socktype = SOCK_STREAM,
756                            },
757                            &res);
758   if (status != 0) {
759     return -1;
760   }
761
762   int fd = -1;
763   for (struct addrinfo *ai = res; ai != NULL; ai = ai->ai_next) {
764     int flags = ai->ai_socktype;
765 #ifdef SOCK_CLOEXEC
766     flags |= SOCK_CLOEXEC;
767 #endif
768
769     fd = socket(ai->ai_family, flags, 0);
770     if (fd == -1)
771       continue;
772
773     if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &(int){1}, sizeof(int)) != 0) {
774       WARNING("write_prometheus plugin: setsockopt(SO_REUSEADDR) failed: %s",
775               STRERRNO);
776       close(fd);
777       fd = -1;
778       continue;
779     }
780
781     if (bind(fd, ai->ai_addr, ai->ai_addrlen) != 0) {
782       close(fd);
783       fd = -1;
784       continue;
785     }
786
787     if (listen(fd, /* backlog = */ 16) != 0) {
788       close(fd);
789       fd = -1;
790       continue;
791     }
792
793     char str_node[NI_MAXHOST];
794     char str_service[NI_MAXSERV];
795
796     getnameinfo(ai->ai_addr, ai->ai_addrlen, str_node, sizeof(str_node),
797                 str_service, sizeof(str_service),
798                 NI_NUMERICHOST | NI_NUMERICSERV);
799
800     INFO("write_prometheus plugin: Listening on [%s]:%s.", str_node,
801          str_service);
802     break;
803   }
804
805   freeaddrinfo(res);
806
807   return fd;
808 } /* }}} int prom_open_socket */
809
810 static struct MHD_Daemon *prom_start_daemon() {
811   /* {{{ */
812   int fd = prom_open_socket(PF_INET6);
813   if (fd == -1)
814     fd = prom_open_socket(PF_INET);
815   if (fd == -1) {
816     ERROR("write_prometheus plugin: Opening a listening socket for [%s]:%hu "
817           "failed.",
818           (httpd_host != NULL) ? httpd_host : "::", httpd_port);
819     return NULL;
820   }
821
822   unsigned int flags = MHD_USE_THREAD_PER_CONNECTION | MHD_USE_DEBUG;
823 #if MHD_VERSION >= 0x00095300
824   flags |= MHD_USE_INTERNAL_POLLING_THREAD;
825 #endif
826
827   struct MHD_Daemon *d = MHD_start_daemon(
828       flags, httpd_port,
829       /* MHD_AcceptPolicyCallback = */ NULL,
830       /* MHD_AcceptPolicyCallback arg = */ NULL, http_handler, NULL,
831       MHD_OPTION_LISTEN_SOCKET, fd, MHD_OPTION_EXTERNAL_LOGGER, prom_logger,
832       NULL, MHD_OPTION_END);
833   if (d == NULL) {
834     ERROR("write_prometheus plugin: MHD_start_daemon() failed.");
835     close(fd);
836     return NULL;
837   }
838
839   return d;
840 } /* }}} struct MHD_Daemon *prom_start_daemon */
841 #else /* if MHD_VERSION < 0x00090000 */
842 static struct MHD_Daemon *prom_start_daemon() {
843   /* {{{ */
844   struct MHD_Daemon *d = MHD_start_daemon(
845       MHD_USE_THREAD_PER_CONNECTION | MHD_USE_DEBUG, httpd_port,
846       /* MHD_AcceptPolicyCallback = */ NULL,
847       /* MHD_AcceptPolicyCallback arg = */ NULL, http_handler, NULL,
848       MHD_OPTION_EXTERNAL_LOGGER, prom_logger, NULL, MHD_OPTION_END);
849   if (d == NULL) {
850     ERROR("write_prometheus plugin: MHD_start_daemon() failed.");
851     return NULL;
852   }
853
854   return d;
855 } /* }}} struct MHD_Daemon *prom_start_daemon */
856 #endif
857
858 /*
859  * collectd callbacks
860  */
861 static int prom_config(oconfig_item_t *ci) {
862   for (int i = 0; i < ci->children_num; i++) {
863     oconfig_item_t *child = ci->children + i;
864
865     if (strcasecmp("Host", child->key) == 0) {
866 #if MHD_VERSION >= 0x00090000
867       cf_util_get_string(child, &httpd_host);
868 #else
869       ERROR("write_prometheus plugin: Option `Host' not supported. Please "
870             "upgrade libmicrohttpd to at least 0.9.0");
871       return -1;
872 #endif
873     } else if (strcasecmp("Port", child->key) == 0) {
874       int status = cf_util_get_port_number(child);
875       if (status > 0)
876         httpd_port = (unsigned short)status;
877     } else if (strcasecmp("StalenessDelta", child->key) == 0) {
878       cf_util_get_cdtime(child, &staleness_delta);
879     } else {
880       WARNING("write_prometheus plugin: Ignoring unknown configuration option "
881               "\"%s\".",
882               child->key);
883     }
884   }
885
886   return 0;
887 }
888
889 static int prom_init() {
890   if (metrics == NULL) {
891     metrics = c_avl_create((void *)strcmp);
892     if (metrics == NULL) {
893       ERROR("write_prometheus plugin: c_avl_create() failed.");
894       return -1;
895     }
896   }
897
898   if (httpd == NULL) {
899     httpd = prom_start_daemon();
900     if (httpd == NULL) {
901       return -1;
902     }
903     DEBUG("write_prometheus plugin: Successfully started microhttpd %s",
904           MHD_get_version());
905   }
906
907   return 0;
908 }
909
910 static int prom_write(data_set_t const *ds, value_list_t const *vl,
911                       __attribute__((unused)) user_data_t *ud) {
912   pthread_mutex_lock(&metrics_lock);
913
914   for (size_t i = 0; i < ds->ds_num; i++) {
915     Io__Prometheus__Client__MetricFamily *fam =
916         metric_family_get(ds, vl, i, /* allocate = */ true);
917     if (fam == NULL)
918       continue;
919
920     int status = metric_family_update(fam, ds, vl, i);
921     if (status != 0) {
922       ERROR("write_prometheus plugin: Updating metric \"%s\" failed with "
923             "status %d",
924             fam->name, status);
925       continue;
926     }
927   }
928
929   pthread_mutex_unlock(&metrics_lock);
930   return 0;
931 }
932
933 static int prom_missing(value_list_t const *vl,
934                         __attribute__((unused)) user_data_t *ud) {
935   data_set_t const *ds = plugin_get_ds(vl->type);
936   if (ds == NULL)
937     return ENOENT;
938
939   pthread_mutex_lock(&metrics_lock);
940
941   for (size_t i = 0; i < ds->ds_num; i++) {
942     Io__Prometheus__Client__MetricFamily *fam =
943         metric_family_get(ds, vl, i, /* allocate = */ false);
944     if (fam == NULL)
945       continue;
946
947     int status = metric_family_delete_metric(fam, vl);
948     if (status != 0) {
949       ERROR("write_prometheus plugin: Deleting a metric in family \"%s\" "
950             "failed with status %d",
951             fam->name, status);
952
953       continue;
954     }
955
956     if (fam->n_metric == 0) {
957       int status = c_avl_remove(metrics, fam->name, NULL, NULL);
958       if (status != 0) {
959         ERROR("write_prometheus plugin: Deleting metric family \"%s\" failed "
960               "with status %d",
961               fam->name, status);
962         continue;
963       }
964       metric_family_destroy(fam);
965     }
966   }
967
968   pthread_mutex_unlock(&metrics_lock);
969   return 0;
970 }
971
972 static int prom_shutdown() {
973   if (httpd != NULL) {
974     MHD_stop_daemon(httpd);
975     httpd = NULL;
976   }
977
978   pthread_mutex_lock(&metrics_lock);
979   if (metrics != NULL) {
980     char *name;
981     Io__Prometheus__Client__MetricFamily *fam;
982     while (c_avl_pick(metrics, (void *)&name, (void *)&fam) == 0) {
983       assert(name == fam->name);
984       name = NULL;
985
986       metric_family_destroy(fam);
987     }
988     c_avl_destroy(metrics);
989     metrics = NULL;
990   }
991   pthread_mutex_unlock(&metrics_lock);
992
993   sfree(httpd_host);
994
995   return 0;
996 }
997
998 void module_register() {
999   plugin_register_complex_config("write_prometheus", prom_config);
1000   plugin_register_init("write_prometheus", prom_init);
1001   plugin_register_write("write_prometheus", prom_write,
1002                         /* user data = */ NULL);
1003   plugin_register_missing("write_prometheus", prom_missing,
1004                           /* user data = */ NULL);
1005   plugin_register_shutdown("write_prometheus", prom_shutdown);
1006 }