Auto-Merge pull request #2822 from rpv-tomsk/issue-976
[collectd.git] / src / write_prometheus.c
1 /**
2  * collectd - src/write_prometheus.c
3  * Copyright (C) 2016       Florian octo Forster
4  *
5  * Permission is hereby granted, free of charge, to any person obtaining a copy
6  * of this software and associated documentation files (the "Software"), to deal
7  * in the Software without restriction, including without limitation the rights
8  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9  * copies of the Software, and to permit persons to whom the Software is
10  * furnished to do so, subject to the following conditions:
11  *
12  * The above copyright notice and this permission notice shall be included in
13  * all copies or substantial portions of the Software.
14  *
15  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21  * SOFTWARE.
22  *
23  * Authors:
24  *   Florian octo Forster <octo at collectd.org>
25  */
26
27 #include "collectd.h"
28
29 #include "common.h"
30 #include "plugin.h"
31 #include "utils_avltree.h"
32 #include "utils_complain.h"
33 #include "utils_time.h"
34
35 #include "prometheus.pb-c.h"
36
37 #include <microhttpd.h>
38
39 #include <netdb.h>
40 #include <sys/socket.h>
41 #include <sys/types.h>
42
43 #ifndef PROMETHEUS_DEFAULT_STALENESS_DELTA
44 #define PROMETHEUS_DEFAULT_STALENESS_DELTA TIME_T_TO_CDTIME_T_STATIC(300)
45 #endif
46
47 #define VARINT_UINT32_BYTES 5
48
49 #define CONTENT_TYPE_PROTO                                                     \
50   "application/vnd.google.protobuf; proto=io.prometheus.client.MetricFamily; " \
51   "encoding=delimited"
52 #define CONTENT_TYPE_TEXT "text/plain; version=0.0.4"
53
54 static c_avl_tree_t *metrics;
55 static pthread_mutex_t metrics_lock = PTHREAD_MUTEX_INITIALIZER;
56
57 static unsigned short httpd_port = 9103;
58 static struct MHD_Daemon *httpd;
59
60 static cdtime_t staleness_delta = PROMETHEUS_DEFAULT_STALENESS_DELTA;
61
62 /* Unfortunately, protoc-c doesn't export its implementation of varint, so we
63  * need to implement our own. */
64 static size_t varint(uint8_t buffer[static VARINT_UINT32_BYTES],
65                      uint32_t value) {
66   for (size_t i = 0; i < VARINT_UINT32_BYTES; i++) {
67     buffer[i] = (uint8_t)(value & 0x7f);
68     value >>= 7;
69
70     if (value == 0)
71       return i + 1;
72
73     buffer[i] |= 0x80;
74   }
75
76   return 0;
77 }
78
79 /* format_protobuf iterates over all metric families in "metrics" and adds them
80  * to a buffer in ProtoBuf format. It prefixes each protobuf with its encoded
81  * size, the so called "delimited" format. */
82 static void format_protobuf(ProtobufCBuffer *buffer) {
83   pthread_mutex_lock(&metrics_lock);
84
85   char *unused_name;
86   Io__Prometheus__Client__MetricFamily *fam;
87   c_avl_iterator_t *iter = c_avl_get_iterator(metrics);
88   while (c_avl_iterator_next(iter, (void *)&unused_name, (void *)&fam) == 0) {
89     /* Prometheus uses a message length prefix to determine where one
90      * MetricFamily ends and the next begins. This delimiter is encoded as a
91      * "varint", which is common in Protobufs. */
92     uint8_t delim[VARINT_UINT32_BYTES] = {0};
93     size_t delim_len = varint(
94         delim,
95         (uint32_t)io__prometheus__client__metric_family__get_packed_size(fam));
96     buffer->append(buffer, delim_len, delim);
97
98     io__prometheus__client__metric_family__pack_to_buffer(fam, buffer);
99   }
100   c_avl_iterator_destroy(iter);
101
102   pthread_mutex_unlock(&metrics_lock);
103 }
104
105 static char const *escape_label_value(char *buffer, size_t buffer_size,
106                                       char const *value) {
107   /* shortcut for values that don't need escaping. */
108   if (strpbrk(value, "\n\"\\") == NULL)
109     return value;
110
111   size_t value_len = strlen(value);
112   size_t buffer_len = 0;
113
114   for (size_t i = 0; i < value_len; i++) {
115     switch (value[i]) {
116     case '\n':
117     case '"':
118     case '\\':
119       if ((buffer_size - buffer_len) < 3) {
120         break;
121       }
122       buffer[buffer_len] = '\\';
123       buffer[buffer_len + 1] = (value[i] == '\n') ? 'n' : value[i];
124       buffer_len += 2;
125       break;
126
127     default:
128       if ((buffer_size - buffer_len) < 2) {
129         break;
130       }
131       buffer[buffer_len] = value[i];
132       buffer_len++;
133       break;
134     }
135   }
136
137   assert(buffer_len < buffer_size);
138   buffer[buffer_len] = 0;
139   return buffer;
140 }
141
142 /* format_labels formats a metric's labels in Prometheus-compatible format. This
143  * format looks like this:
144  *
145  *   key0="value0",key1="value1"
146  */
147 static char *format_labels(char *buffer, size_t buffer_size,
148                            Io__Prometheus__Client__Metric const *m) {
149   /* our metrics always have at least one and at most three labels. */
150   assert(m->n_label >= 1);
151   assert(m->n_label <= 3);
152
153 #define LABEL_KEY_SIZE DATA_MAX_NAME_LEN
154 #define LABEL_VALUE_SIZE (2 * DATA_MAX_NAME_LEN - 1)
155 #define LABEL_BUFFER_SIZE (LABEL_KEY_SIZE + LABEL_VALUE_SIZE + 4)
156
157   char *labels[3] = {
158       (char[LABEL_BUFFER_SIZE]){0}, (char[LABEL_BUFFER_SIZE]){0},
159       (char[LABEL_BUFFER_SIZE]){0},
160   };
161
162   /* N.B.: the label *names* are hard-coded by this plugin and therefore we
163    * know that they are sane. */
164   for (size_t i = 0; i < m->n_label; i++) {
165     char value[LABEL_VALUE_SIZE];
166     snprintf(labels[i], LABEL_BUFFER_SIZE, "%s=\"%s\"", m->label[i]->name,
167              escape_label_value(value, sizeof(value), m->label[i]->value));
168   }
169
170   strjoin(buffer, buffer_size, labels, m->n_label, ",");
171   return buffer;
172 }
173
174 /* format_protobuf iterates over all metric families in "metrics" and adds them
175  * to a buffer in plain text format. */
176 static void format_text(ProtobufCBuffer *buffer) {
177   pthread_mutex_lock(&metrics_lock);
178
179   char *unused_name;
180   Io__Prometheus__Client__MetricFamily *fam;
181   c_avl_iterator_t *iter = c_avl_get_iterator(metrics);
182   while (c_avl_iterator_next(iter, (void *)&unused_name, (void *)&fam) == 0) {
183     char line[1024]; /* 4x DATA_MAX_NAME_LEN? */
184
185     snprintf(line, sizeof(line), "# HELP %s %s\n", fam->name, fam->help);
186     buffer->append(buffer, strlen(line), (uint8_t *)line);
187
188     snprintf(line, sizeof(line), "# TYPE %s %s\n", fam->name,
189              (fam->type == IO__PROMETHEUS__CLIENT__METRIC_TYPE__GAUGE)
190                  ? "gauge"
191                  : "counter");
192     buffer->append(buffer, strlen(line), (uint8_t *)line);
193
194     for (size_t i = 0; i < fam->n_metric; i++) {
195       Io__Prometheus__Client__Metric *m = fam->metric[i];
196
197       char labels[1024];
198
199       char timestamp_ms[24] = "";
200       if (m->has_timestamp_ms)
201         snprintf(timestamp_ms, sizeof(timestamp_ms), " %" PRIi64,
202                  m->timestamp_ms);
203
204       if (fam->type == IO__PROMETHEUS__CLIENT__METRIC_TYPE__GAUGE)
205         snprintf(line, sizeof(line), "%s{%s} " GAUGE_FORMAT "%s\n", fam->name,
206                  format_labels(labels, sizeof(labels), m), m->gauge->value,
207                  timestamp_ms);
208       else /* if (fam->type == IO__PROMETHEUS__CLIENT__METRIC_TYPE__COUNTER) */
209         snprintf(line, sizeof(line), "%s{%s} %.0f%s\n", fam->name,
210                  format_labels(labels, sizeof(labels), m), m->counter->value,
211                  timestamp_ms);
212
213       buffer->append(buffer, strlen(line), (uint8_t *)line);
214     }
215   }
216   c_avl_iterator_destroy(iter);
217
218   char server[1024];
219   snprintf(server, sizeof(server), "\n# collectd/write_prometheus %s at %s\n",
220            PACKAGE_VERSION, hostname_g);
221   buffer->append(buffer, strlen(server), (uint8_t *)server);
222
223   pthread_mutex_unlock(&metrics_lock);
224 }
225
226 /* http_handler is the callback called by the microhttpd library. It essentially
227  * handles all HTTP request aspects and creates an HTTP response. */
228 static int http_handler(void *cls, struct MHD_Connection *connection,
229                         const char *url, const char *method,
230                         const char *version, const char *upload_data,
231                         size_t *upload_data_size, void **connection_state) {
232   if (strcmp(method, MHD_HTTP_METHOD_GET) != 0) {
233     return MHD_NO;
234   }
235
236   /* On the first call for each connection, return without anything further.
237    * Apparently not everything has been initialized yet or so; the docs are not
238    * very specific on the issue. */
239   if (*connection_state == NULL) {
240     /* set to a random non-NULL pointer. */
241     *connection_state = &(int){42};
242     return MHD_YES;
243   }
244
245   char const *accept = MHD_lookup_connection_value(connection, MHD_HEADER_KIND,
246                                                    MHD_HTTP_HEADER_ACCEPT);
247   bool want_proto = (accept != NULL) &&
248                     (strstr(accept, "application/vnd.google.protobuf") != NULL);
249
250   uint8_t scratch[4096] = {0};
251   ProtobufCBufferSimple simple = PROTOBUF_C_BUFFER_SIMPLE_INIT(scratch);
252   ProtobufCBuffer *buffer = (ProtobufCBuffer *)&simple;
253
254   if (want_proto)
255     format_protobuf(buffer);
256   else
257     format_text(buffer);
258
259 #if defined(MHD_VERSION) && MHD_VERSION >= 0x00090500
260   struct MHD_Response *res = MHD_create_response_from_buffer(
261       simple.len, simple.data, MHD_RESPMEM_MUST_COPY);
262 #else
263   struct MHD_Response *res = MHD_create_response_from_data(
264       simple.len, simple.data, /* must_free = */ 0, /* must_copy = */ 1);
265 #endif
266   MHD_add_response_header(res, MHD_HTTP_HEADER_CONTENT_TYPE,
267                           want_proto ? CONTENT_TYPE_PROTO : CONTENT_TYPE_TEXT);
268
269   int status = MHD_queue_response(connection, MHD_HTTP_OK, res);
270
271   MHD_destroy_response(res);
272   PROTOBUF_C_BUFFER_SIMPLE_CLEAR(&simple);
273   return status;
274 }
275
276 /*
277  * Functions for manipulating the global state in "metrics". This is organized
278  * in two tiers: the global "metrics" tree holds "metric families", which are
279  * identified by a name (a string). Each metric family has one or more
280  * "metrics", which are identified by a unique set of key-value-pairs. For
281  * example:
282  *
283  * collectd_cpu_total
284  *   {cpu="0",type="idle"}
285  *   {cpu="0",type="user"}
286  *   ...
287  * collectd_memory
288  *   {memory="used"}
289  *   {memory="free"}
290  *   ...
291  * {{{ */
292 /* label_pair_destroy frees the memory used by a label pair. */
293 static void label_pair_destroy(Io__Prometheus__Client__LabelPair *msg) {
294   if (msg == NULL)
295     return;
296
297   sfree(msg->name);
298   sfree(msg->value);
299
300   sfree(msg);
301 }
302
303 /* label_pair_clone allocates and initializes a new label pair. */
304 static Io__Prometheus__Client__LabelPair *
305 label_pair_clone(Io__Prometheus__Client__LabelPair const *orig) {
306   Io__Prometheus__Client__LabelPair *copy = calloc(1, sizeof(*copy));
307   if (copy == NULL)
308     return NULL;
309   io__prometheus__client__label_pair__init(copy);
310
311   copy->name = strdup(orig->name);
312   copy->value = strdup(orig->value);
313   if ((copy->name == NULL) || (copy->value == NULL)) {
314     label_pair_destroy(copy);
315     return NULL;
316   }
317
318   return copy;
319 }
320
321 /* metric_destroy frees the memory used by a metric. */
322 static void metric_destroy(Io__Prometheus__Client__Metric *msg) {
323   if (msg == NULL)
324     return;
325
326   for (size_t i = 0; i < msg->n_label; i++) {
327     label_pair_destroy(msg->label[i]);
328   }
329   sfree(msg->label);
330
331   sfree(msg->gauge);
332   sfree(msg->counter);
333
334   sfree(msg);
335 }
336
337 /* metric_cmp compares two metrics. It's prototype makes it easy to use with
338  * qsort(3) and bsearch(3). */
339 static int metric_cmp(void const *a, void const *b) {
340   Io__Prometheus__Client__Metric const *m_a =
341       *((Io__Prometheus__Client__Metric **)a);
342   Io__Prometheus__Client__Metric const *m_b =
343       *((Io__Prometheus__Client__Metric **)b);
344
345   if (m_a->n_label < m_b->n_label)
346     return -1;
347   else if (m_a->n_label > m_b->n_label)
348     return 1;
349
350   /* Prometheus does not care about the order of labels. All labels in this
351    * plugin are created by METRIC_ADD_LABELS(), though, and therefore always
352    * appear in the same order. We take advantage of this and simplify the check
353    * by making sure all labels are the same in each position.
354    *
355    * We also only need to check the label values, because the label names are
356    * the same for all metrics in a metric family.
357    *
358    * 3 labels:
359    * [0] $plugin="$plugin_instance" => $plugin is the same within a family
360    * [1] type="$type_instance"      => "type" is a static string
361    * [2] instance="$host"           => "instance" is a static string
362    *
363    * 2 labels, variant 1:
364    * [0] $plugin="$plugin_instance" => $plugin is the same within a family
365    * [1] instance="$host"           => "instance" is a static string
366    *
367    * 2 labels, variant 2:
368    * [0] $plugin="$type_instance"   => $plugin is the same within a family
369    * [1] instance="$host"           => "instance" is a static string
370    *
371    * 1 label:
372    * [1] instance="$host"           => "instance" is a static string
373    */
374   for (size_t i = 0; i < m_a->n_label; i++) {
375     int status = strcmp(m_a->label[i]->value, m_b->label[i]->value);
376     if (status != 0)
377       return status;
378
379 #if COLLECT_DEBUG
380     assert(strcmp(m_a->label[i]->name, m_b->label[i]->name) == 0);
381 #endif
382   }
383
384   return 0;
385 }
386
387 #define METRIC_INIT                                                            \
388   &(Io__Prometheus__Client__Metric) {                                          \
389     .label =                                                                   \
390         (Io__Prometheus__Client__LabelPair *[]){                               \
391             &(Io__Prometheus__Client__LabelPair){                              \
392                 .name = NULL,                                                  \
393             },                                                                 \
394             &(Io__Prometheus__Client__LabelPair){                              \
395                 .name = NULL,                                                  \
396             },                                                                 \
397             &(Io__Prometheus__Client__LabelPair){                              \
398                 .name = NULL,                                                  \
399             },                                                                 \
400         },                                                                     \
401     .n_label = 0,                                                              \
402   }
403
404 #define METRIC_ADD_LABELS(m, vl)                                               \
405   do {                                                                         \
406     if (strlen((vl)->plugin_instance) != 0) {                                  \
407       (m)->label[(m)->n_label]->name = (char *)(vl)->plugin;                   \
408       (m)->label[(m)->n_label]->value = (char *)(vl)->plugin_instance;         \
409       (m)->n_label++;                                                          \
410     }                                                                          \
411                                                                                \
412     if (strlen((vl)->type_instance) != 0) {                                    \
413       (m)->label[(m)->n_label]->name = "type";                                 \
414       if (strlen((vl)->plugin_instance) == 0)                                  \
415         (m)->label[(m)->n_label]->name = (char *)(vl)->plugin;                 \
416       (m)->label[(m)->n_label]->value = (char *)(vl)->type_instance;           \
417       (m)->n_label++;                                                          \
418     }                                                                          \
419                                                                                \
420     (m)->label[(m)->n_label]->name = "instance";                               \
421     (m)->label[(m)->n_label]->value = (char *)(vl)->host;                      \
422     (m)->n_label++;                                                            \
423   } while (0)
424
425 /* metric_clone allocates and initializes a new metric based on orig. */
426 static Io__Prometheus__Client__Metric *
427 metric_clone(Io__Prometheus__Client__Metric const *orig) {
428   Io__Prometheus__Client__Metric *copy = calloc(1, sizeof(*copy));
429   if (copy == NULL)
430     return NULL;
431   io__prometheus__client__metric__init(copy);
432
433   copy->n_label = orig->n_label;
434   copy->label = calloc(copy->n_label, sizeof(*copy->label));
435   if (copy->label == NULL) {
436     sfree(copy);
437     return NULL;
438   }
439
440   for (size_t i = 0; i < copy->n_label; i++) {
441     copy->label[i] = label_pair_clone(orig->label[i]);
442     if (copy->label[i] == NULL) {
443       metric_destroy(copy);
444       return NULL;
445     }
446   }
447
448   return copy;
449 }
450
451 /* metric_update stores the new value and timestamp in m. */
452 static int metric_update(Io__Prometheus__Client__Metric *m, value_t value,
453                          int ds_type, cdtime_t t, cdtime_t interval) {
454   if (ds_type == DS_TYPE_GAUGE) {
455     sfree(m->counter);
456     if (m->gauge == NULL) {
457       m->gauge = calloc(1, sizeof(*m->gauge));
458       if (m->gauge == NULL)
459         return ENOMEM;
460       io__prometheus__client__gauge__init(m->gauge);
461     }
462
463     m->gauge->value = (double)value.gauge;
464     m->gauge->has_value = 1;
465   } else { /* not gauge */
466     sfree(m->gauge);
467     if (m->counter == NULL) {
468       m->counter = calloc(1, sizeof(*m->counter));
469       if (m->counter == NULL)
470         return ENOMEM;
471       io__prometheus__client__counter__init(m->counter);
472     }
473
474     switch (ds_type) {
475     case DS_TYPE_ABSOLUTE:
476       m->counter->value = (double)value.absolute;
477       break;
478     case DS_TYPE_COUNTER:
479       m->counter->value = (double)value.counter;
480       break;
481     default:
482       m->counter->value = (double)value.derive;
483       break;
484     }
485     m->counter->has_value = 1;
486   }
487
488   /* Prometheus has a globally configured timeout after which metrics are
489    * considered stale. This causes problems when metrics have an interval
490    * exceeding that limit. We emulate the behavior of "pushgateway" and *not*
491    * send a timestamp value â€“ Prometheus will fill in the current time. */
492   if (interval <= staleness_delta) {
493     m->timestamp_ms = CDTIME_T_TO_MS(t);
494     m->has_timestamp_ms = 1;
495   } else {
496     static c_complain_t long_metric = C_COMPLAIN_INIT_STATIC;
497     c_complain(
498         LOG_NOTICE, &long_metric,
499         "write_prometheus plugin: You have metrics with an interval exceeding "
500         "\"StalenessDelta\" setting (%.3fs). This is suboptimal, please check "
501         "the collectd.conf(5) manual page to understand what's going on.",
502         CDTIME_T_TO_DOUBLE(staleness_delta));
503
504     m->timestamp_ms = 0;
505     m->has_timestamp_ms = 0;
506   }
507
508   return 0;
509 }
510
511 /* metric_family_add_metric adds m to the metric list of fam. */
512 static int metric_family_add_metric(Io__Prometheus__Client__MetricFamily *fam,
513                                     Io__Prometheus__Client__Metric *m) {
514   Io__Prometheus__Client__Metric **tmp =
515       realloc(fam->metric, (fam->n_metric + 1) * sizeof(*fam->metric));
516   if (tmp == NULL)
517     return ENOMEM;
518   fam->metric = tmp;
519
520   fam->metric[fam->n_metric] = m;
521   fam->n_metric++;
522
523   /* Sort the metrics so that lookup is fast. */
524   qsort(fam->metric, fam->n_metric, sizeof(*fam->metric), metric_cmp);
525
526   return 0;
527 }
528
529 /* metric_family_delete_metric looks up and deletes the metric corresponding to
530  * vl. */
531 static int
532 metric_family_delete_metric(Io__Prometheus__Client__MetricFamily *fam,
533                             value_list_t const *vl) {
534   Io__Prometheus__Client__Metric *key = METRIC_INIT;
535   METRIC_ADD_LABELS(key, vl);
536
537   size_t i;
538   for (i = 0; i < fam->n_metric; i++) {
539     if (metric_cmp(&key, &fam->metric[i]) == 0)
540       break;
541   }
542
543   if (i >= fam->n_metric)
544     return ENOENT;
545
546   metric_destroy(fam->metric[i]);
547   if ((fam->n_metric - 1) > i)
548     memmove(&fam->metric[i], &fam->metric[i + 1],
549             ((fam->n_metric - 1) - i) * sizeof(fam->metric[i]));
550   fam->n_metric--;
551
552   if (fam->n_metric == 0) {
553     sfree(fam->metric);
554     return 0;
555   }
556
557   Io__Prometheus__Client__Metric **tmp =
558       realloc(fam->metric, fam->n_metric * sizeof(*fam->metric));
559   if (tmp != NULL)
560     fam->metric = tmp;
561
562   return 0;
563 }
564
565 /* metric_family_get_metric looks up the matching metric in a metric family,
566  * allocating it if necessary. */
567 static Io__Prometheus__Client__Metric *
568 metric_family_get_metric(Io__Prometheus__Client__MetricFamily *fam,
569                          value_list_t const *vl) {
570   Io__Prometheus__Client__Metric *key = METRIC_INIT;
571   METRIC_ADD_LABELS(key, vl);
572
573   /* Metrics are sorted in metric_family_add_metric() so that we can do a binary
574    * search here. */
575   Io__Prometheus__Client__Metric **m = bsearch(
576       &key, fam->metric, fam->n_metric, sizeof(*fam->metric), metric_cmp);
577
578   if (m != NULL) {
579     return *m;
580   }
581
582   Io__Prometheus__Client__Metric *new_metric = metric_clone(key);
583   if (new_metric == NULL)
584     return NULL;
585
586   DEBUG("write_prometheus plugin: created new metric in family");
587   int status = metric_family_add_metric(fam, new_metric);
588   if (status != 0) {
589     metric_destroy(new_metric);
590     return NULL;
591   }
592
593   return new_metric;
594 }
595
596 /* metric_family_update looks up the matching metric in a metric family,
597  * allocating it if necessary, and updates the metric to the latest value. */
598 static int metric_family_update(Io__Prometheus__Client__MetricFamily *fam,
599                                 data_set_t const *ds, value_list_t const *vl,
600                                 size_t ds_index) {
601   Io__Prometheus__Client__Metric *m = metric_family_get_metric(fam, vl);
602   if (m == NULL)
603     return -1;
604
605   return metric_update(m, vl->values[ds_index], ds->ds[ds_index].type, vl->time,
606                        vl->interval);
607 }
608
609 /* metric_family_destroy frees the memory used by a metric family. */
610 static void metric_family_destroy(Io__Prometheus__Client__MetricFamily *msg) {
611   if (msg == NULL)
612     return;
613
614   sfree(msg->name);
615   sfree(msg->help);
616
617   for (size_t i = 0; i < msg->n_metric; i++) {
618     metric_destroy(msg->metric[i]);
619   }
620   sfree(msg->metric);
621
622   sfree(msg);
623 }
624
625 /* metric_family_create allocates and initializes a new metric family. */
626 static Io__Prometheus__Client__MetricFamily *
627 metric_family_create(char *name, data_set_t const *ds, value_list_t const *vl,
628                      size_t ds_index) {
629   Io__Prometheus__Client__MetricFamily *msg = calloc(1, sizeof(*msg));
630   if (msg == NULL)
631     return NULL;
632   io__prometheus__client__metric_family__init(msg);
633
634   msg->name = name;
635
636   char help[1024];
637   snprintf(
638       help, sizeof(help),
639       "write_prometheus plugin: '%s' Type: '%s', Dstype: '%s', Dsname: '%s'",
640       vl->plugin, vl->type, DS_TYPE_TO_STRING(ds->ds[ds_index].type),
641       ds->ds[ds_index].name);
642   msg->help = strdup(help);
643
644   msg->type = (ds->ds[ds_index].type == DS_TYPE_GAUGE)
645                   ? IO__PROMETHEUS__CLIENT__METRIC_TYPE__GAUGE
646                   : IO__PROMETHEUS__CLIENT__METRIC_TYPE__COUNTER;
647   msg->has_type = 1;
648
649   return msg;
650 }
651
652 /* metric_family_name creates a metric family's name from a data source. This is
653  * done in the same way as done by the "collectd_exporter" for best possible
654  * compatibility. In essence, the plugin, type and data source name go in the
655  * metric family name, while hostname, plugin instance and type instance go into
656  * the labels of a metric. */
657 static char *metric_family_name(data_set_t const *ds, value_list_t const *vl,
658                                 size_t ds_index) {
659   char const *fields[5] = {"collectd"};
660   size_t fields_num = 1;
661
662   if (strcmp(vl->plugin, vl->type) != 0) {
663     fields[fields_num] = vl->plugin;
664     fields_num++;
665   }
666   fields[fields_num] = vl->type;
667   fields_num++;
668
669   if (strcmp("value", ds->ds[ds_index].name) != 0) {
670     fields[fields_num] = ds->ds[ds_index].name;
671     fields_num++;
672   }
673
674   /* Prometheus best practices:
675    * cumulative metrics should have a "total" suffix. */
676   if ((ds->ds[ds_index].type == DS_TYPE_COUNTER) ||
677       (ds->ds[ds_index].type == DS_TYPE_DERIVE)) {
678     fields[fields_num] = "total";
679     fields_num++;
680   }
681
682   char name[5 * DATA_MAX_NAME_LEN];
683   strjoin(name, sizeof(name), (char **)fields, fields_num, "_");
684   return strdup(name);
685 }
686
687 /* metric_family_get looks up the matching metric family, allocating it if
688  * necessary. */
689 static Io__Prometheus__Client__MetricFamily *
690 metric_family_get(data_set_t const *ds, value_list_t const *vl, size_t ds_index,
691                   bool allocate) {
692   char *name = metric_family_name(ds, vl, ds_index);
693   if (name == NULL) {
694     ERROR("write_prometheus plugin: Allocating metric family name failed.");
695     return NULL;
696   }
697
698   Io__Prometheus__Client__MetricFamily *fam = NULL;
699   if (c_avl_get(metrics, name, (void *)&fam) == 0) {
700     sfree(name);
701     assert(fam != NULL);
702     return fam;
703   }
704
705   if (!allocate) {
706     sfree(name);
707     return NULL;
708   }
709
710   fam = metric_family_create(name, ds, vl, ds_index);
711   if (fam == NULL) {
712     ERROR("write_prometheus plugin: Allocating metric family failed.");
713     sfree(name);
714     return NULL;
715   }
716
717   /* If successful, "name" is owned by "fam", i.e. don't free it here. */
718   DEBUG("write_prometheus plugin: metric family \"%s\" has been created.",
719         name);
720   name = NULL;
721
722   int status = c_avl_insert(metrics, fam->name, fam);
723   if (status != 0) {
724     ERROR("write_prometheus plugin: Adding \"%s\" failed.", name);
725     metric_family_destroy(fam);
726     return NULL;
727   }
728
729   return fam;
730 }
731 /* }}} */
732
733 static void prom_logger(__attribute__((unused)) void *arg, char const *fmt,
734                         va_list ap) {
735   /* {{{ */
736   char errbuf[1024];
737   vsnprintf(errbuf, sizeof(errbuf), fmt, ap);
738
739   ERROR("write_prometheus plugin: %s", errbuf);
740 } /* }}} prom_logger */
741
742 #if MHD_VERSION >= 0x00090000
743 static int prom_open_socket(int addrfamily) {
744   /* {{{ */
745   char service[NI_MAXSERV];
746   snprintf(service, sizeof(service), "%hu", httpd_port);
747
748   struct addrinfo *res;
749   int status = getaddrinfo(NULL, service,
750                            &(struct addrinfo){
751                                .ai_flags = AI_PASSIVE | AI_ADDRCONFIG,
752                                .ai_family = addrfamily,
753                                .ai_socktype = SOCK_STREAM,
754                            },
755                            &res);
756   if (status != 0) {
757     return -1;
758   }
759
760   int fd = -1;
761   for (struct addrinfo *ai = res; ai != NULL; ai = ai->ai_next) {
762     fd = socket(ai->ai_family, ai->ai_socktype | SOCK_CLOEXEC, 0);
763     if (fd == -1)
764       continue;
765
766     int tmp = 1;
767     if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &tmp, sizeof(tmp)) != 0) {
768       WARNING("write_prometheus: setsockopt(SO_REUSEADDR) failed: %s",
769               STRERRNO);
770       close(fd);
771       fd = -1;
772       continue;
773     }
774
775     if (bind(fd, ai->ai_addr, ai->ai_addrlen) != 0) {
776       close(fd);
777       fd = -1;
778       continue;
779     }
780
781     if (listen(fd, /* backlog = */ 16) != 0) {
782       close(fd);
783       fd = -1;
784       continue;
785     }
786
787     break;
788   }
789
790   freeaddrinfo(res);
791
792   return fd;
793 } /* }}} int prom_open_socket */
794
795 static struct MHD_Daemon *prom_start_daemon() {
796   /* {{{ */
797   int fd = prom_open_socket(PF_INET6);
798   if (fd == -1)
799     fd = prom_open_socket(PF_INET);
800   if (fd == -1) {
801     ERROR("write_prometheus plugin: Opening a listening socket failed.");
802     return NULL;
803   }
804
805   struct MHD_Daemon *d = MHD_start_daemon(
806       MHD_USE_THREAD_PER_CONNECTION | MHD_USE_DEBUG, httpd_port,
807       /* MHD_AcceptPolicyCallback = */ NULL,
808       /* MHD_AcceptPolicyCallback arg = */ NULL, http_handler, NULL,
809       MHD_OPTION_LISTEN_SOCKET, fd, MHD_OPTION_EXTERNAL_LOGGER, prom_logger,
810       NULL, MHD_OPTION_END);
811   if (d == NULL) {
812     ERROR("write_prometheus plugin: MHD_start_daemon() failed.");
813     close(fd);
814     return NULL;
815   }
816
817   return d;
818 } /* }}} struct MHD_Daemon *prom_start_daemon */
819 #else /* if MHD_VERSION < 0x00090000 */
820 static struct MHD_Daemon *prom_start_daemon() {
821   /* {{{ */
822   struct MHD_Daemon *d = MHD_start_daemon(
823       MHD_USE_THREAD_PER_CONNECTION | MHD_USE_DEBUG, httpd_port,
824       /* MHD_AcceptPolicyCallback = */ NULL,
825       /* MHD_AcceptPolicyCallback arg = */ NULL, http_handler, NULL,
826       MHD_OPTION_EXTERNAL_LOGGER, prom_logger, NULL, MHD_OPTION_END);
827   if (d == NULL) {
828     ERROR("write_prometheus plugin: MHD_start_daemon() failed.");
829     return NULL;
830   }
831
832   return d;
833 } /* }}} struct MHD_Daemon *prom_start_daemon */
834 #endif
835
836 /*
837  * collectd callbacks
838  */
839 static int prom_config(oconfig_item_t *ci) {
840   for (int i = 0; i < ci->children_num; i++) {
841     oconfig_item_t *child = ci->children + i;
842
843     if (strcasecmp("Port", child->key) == 0) {
844       int status = cf_util_get_port_number(child);
845       if (status > 0)
846         httpd_port = (unsigned short)status;
847     } else if (strcasecmp("StalenessDelta", child->key) == 0) {
848       cf_util_get_cdtime(child, &staleness_delta);
849     } else {
850       WARNING("write_prometheus plugin: Ignoring unknown configuration option "
851               "\"%s\".",
852               child->key);
853     }
854   }
855
856   return 0;
857 }
858
859 static int prom_init() {
860   if (metrics == NULL) {
861     metrics = c_avl_create((void *)strcmp);
862     if (metrics == NULL) {
863       ERROR("write_prometheus plugin: c_avl_create() failed.");
864       return -1;
865     }
866   }
867
868   if (httpd == NULL) {
869     httpd = prom_start_daemon();
870     if (httpd == NULL) {
871       ERROR("write_prometheus plugin: MHD_start_daemon() failed.");
872       return -1;
873     }
874     DEBUG("write_prometheus plugin: Successfully started microhttpd %s",
875           MHD_get_version());
876   }
877
878   return 0;
879 }
880
881 static int prom_write(data_set_t const *ds, value_list_t const *vl,
882                       __attribute__((unused)) user_data_t *ud) {
883   pthread_mutex_lock(&metrics_lock);
884
885   for (size_t i = 0; i < ds->ds_num; i++) {
886     Io__Prometheus__Client__MetricFamily *fam =
887         metric_family_get(ds, vl, i, /* allocate = */ true);
888     if (fam == NULL)
889       continue;
890
891     int status = metric_family_update(fam, ds, vl, i);
892     if (status != 0) {
893       ERROR("write_prometheus plugin: Updating metric \"%s\" failed with "
894             "status %d",
895             fam->name, status);
896       continue;
897     }
898   }
899
900   pthread_mutex_unlock(&metrics_lock);
901   return 0;
902 }
903
904 static int prom_missing(value_list_t const *vl,
905                         __attribute__((unused)) user_data_t *ud) {
906   data_set_t const *ds = plugin_get_ds(vl->type);
907   if (ds == NULL)
908     return ENOENT;
909
910   pthread_mutex_lock(&metrics_lock);
911
912   for (size_t i = 0; i < ds->ds_num; i++) {
913     Io__Prometheus__Client__MetricFamily *fam =
914         metric_family_get(ds, vl, i, /* allocate = */ false);
915     if (fam == NULL)
916       continue;
917
918     int status = metric_family_delete_metric(fam, vl);
919     if (status != 0) {
920       ERROR("write_prometheus plugin: Deleting a metric in family \"%s\" "
921             "failed with status %d",
922             fam->name, status);
923
924       continue;
925     }
926
927     if (fam->n_metric == 0) {
928       int status = c_avl_remove(metrics, fam->name, NULL, NULL);
929       if (status != 0) {
930         ERROR("write_prometheus plugin: Deleting metric family \"%s\" failed "
931               "with status %d",
932               fam->name, status);
933         continue;
934       }
935       metric_family_destroy(fam);
936     }
937   }
938
939   pthread_mutex_unlock(&metrics_lock);
940   return 0;
941 }
942
943 static int prom_shutdown() {
944   if (httpd != NULL) {
945     MHD_stop_daemon(httpd);
946     httpd = NULL;
947   }
948
949   pthread_mutex_lock(&metrics_lock);
950   if (metrics != NULL) {
951     char *name;
952     Io__Prometheus__Client__MetricFamily *fam;
953     while (c_avl_pick(metrics, (void *)&name, (void *)&fam) == 0) {
954       assert(name == fam->name);
955       name = NULL;
956
957       metric_family_destroy(fam);
958     }
959     c_avl_destroy(metrics);
960     metrics = NULL;
961   }
962   pthread_mutex_unlock(&metrics_lock);
963
964   return 0;
965 }
966
967 void module_register() {
968   plugin_register_complex_config("write_prometheus", prom_config);
969   plugin_register_init("write_prometheus", prom_init);
970   plugin_register_write("write_prometheus", prom_write,
971                         /* user data = */ NULL);
972   plugin_register_missing("write_prometheus", prom_missing,
973                           /* user data = */ NULL);
974   plugin_register_shutdown("write_prometheus", prom_shutdown);
975 }