Merge remote-tracking branch 'origin/collectd-5.8'
[collectd.git] / src / grpc.cc
1 /**
2  * collectd - src/grpc.cc
3  * Copyright (C) 2015-2016 Sebastian Harl
4  * Copyright (C) 2016      Florian octo Forster
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining a
7  * copy of this software and associated documentation files (the "Software"),
8  * to deal in the Software without restriction, including without limitation
9  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
10  * and/or sell copies of the Software, and to permit persons to whom the
11  * Software is furnished to do so, subject to the following conditions:
12  *
13  * The above copyright notice and this permission notice shall be included in
14  * all copies or substantial portions of the Software.
15  *
16  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
22  * DEALINGS IN THE SOFTWARE.
23  *
24  * Authors:
25  *   Sebastian Harl <sh at tokkee.org>
26  *   Florian octo Forster <octo at collectd.org>
27  **/
28
29 #include <google/protobuf/util/time_util.h>
30 #include <grpc++/grpc++.h>
31
32 #include <fstream>
33 #include <iostream>
34 #include <queue>
35 #include <vector>
36
37 #include "collectd.grpc.pb.h"
38
39 extern "C" {
40 #include <fnmatch.h>
41 #include <stdbool.h>
42
43 #include "collectd.h"
44 #include "common.h"
45 #include "plugin.h"
46
47 #include "daemon/utils_cache.h"
48 }
49
50 using collectd::Collectd;
51
52 using collectd::PutValuesRequest;
53 using collectd::PutValuesResponse;
54 using collectd::QueryValuesRequest;
55 using collectd::QueryValuesResponse;
56
57 using google::protobuf::util::TimeUtil;
58
59 typedef google::protobuf::Map<grpc::string, collectd::types::MetadataValue>
60     grpcMetadata;
61
62 /*
63  * private types
64  */
65
66 struct Listener {
67   grpc::string addr;
68   grpc::string port;
69
70   grpc::SslServerCredentialsOptions *ssl;
71 };
72 static std::vector<Listener> listeners;
73 static grpc::string default_addr("0.0.0.0:50051");
74
75 /*
76  * helper functions
77  */
78
79 static bool ident_matches(const value_list_t *vl, const value_list_t *matcher) {
80   if (fnmatch(matcher->host, vl->host, 0))
81     return false;
82
83   if (fnmatch(matcher->plugin, vl->plugin, 0))
84     return false;
85   if (fnmatch(matcher->plugin_instance, vl->plugin_instance, 0))
86     return false;
87
88   if (fnmatch(matcher->type, vl->type, 0))
89     return false;
90   if (fnmatch(matcher->type_instance, vl->type_instance, 0))
91     return false;
92
93   return true;
94 } /* ident_matches */
95
96 static grpc::string read_file(const char *filename) {
97   std::ifstream f;
98   grpc::string s, content;
99
100   f.open(filename);
101   if (!f.is_open()) {
102     ERROR("grpc: Failed to open '%s'", filename);
103     return "";
104   }
105
106   while (std::getline(f, s)) {
107     content += s;
108     content.push_back('\n');
109   }
110   f.close();
111   return content;
112 } /* read_file */
113
114 /*
115  * proto conversion
116  */
117
118 static void marshal_ident(const value_list_t *vl,
119                           collectd::types::Identifier *msg) {
120   msg->set_host(vl->host);
121   msg->set_plugin(vl->plugin);
122   if (vl->plugin_instance[0] != '\0')
123     msg->set_plugin_instance(vl->plugin_instance);
124   msg->set_type(vl->type);
125   if (vl->type_instance[0] != '\0')
126     msg->set_type_instance(vl->type_instance);
127 } /* marshal_ident */
128
129 static grpc::Status unmarshal_ident(const collectd::types::Identifier &msg,
130                                     value_list_t *vl, bool require_fields) {
131   std::string s;
132
133   s = msg.host();
134   if (!s.length() && require_fields)
135     return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
136                         grpc::string("missing host name"));
137   sstrncpy(vl->host, s.c_str(), sizeof(vl->host));
138
139   s = msg.plugin();
140   if (!s.length() && require_fields)
141     return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
142                         grpc::string("missing plugin name"));
143   sstrncpy(vl->plugin, s.c_str(), sizeof(vl->plugin));
144
145   s = msg.type();
146   if (!s.length() && require_fields)
147     return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
148                         grpc::string("missing type name"));
149   sstrncpy(vl->type, s.c_str(), sizeof(vl->type));
150
151   s = msg.plugin_instance();
152   sstrncpy(vl->plugin_instance, s.c_str(), sizeof(vl->plugin_instance));
153
154   s = msg.type_instance();
155   sstrncpy(vl->type_instance, s.c_str(), sizeof(vl->type_instance));
156
157   return grpc::Status::OK;
158 } /* unmarshal_ident() */
159
160 static grpc::Status marshal_meta_data(meta_data_t *meta,
161                                       grpcMetadata *mutable_meta_data) {
162   char **meta_data_keys = nullptr;
163   int meta_data_keys_len = meta_data_toc(meta, &meta_data_keys);
164   if (meta_data_keys_len < 0) {
165     return grpc::Status(grpc::StatusCode::INTERNAL,
166                         grpc::string("error getting metadata keys"));
167   }
168
169   for (int i = 0; i < meta_data_keys_len; i++) {
170     char *key = meta_data_keys[i];
171     int md_type = meta_data_type(meta, key);
172
173     collectd::types::MetadataValue md_value;
174     md_value.Clear();
175
176     switch (md_type) {
177     case MD_TYPE_STRING:
178       char *md_string;
179       if (meta_data_get_string(meta, key, &md_string) != 0 ||
180           md_string == nullptr) {
181         strarray_free(meta_data_keys, meta_data_keys_len);
182         return grpc::Status(grpc::StatusCode::INTERNAL,
183                             grpc::string("missing metadata"));
184       }
185       md_value.set_string_value(md_string);
186       free(md_string);
187       break;
188     case MD_TYPE_SIGNED_INT:
189       int64_t int64_value;
190       if (meta_data_get_signed_int(meta, key, &int64_value) != 0) {
191         strarray_free(meta_data_keys, meta_data_keys_len);
192         return grpc::Status(grpc::StatusCode::INTERNAL,
193                             grpc::string("missing metadata"));
194       }
195       md_value.set_int64_value(int64_value);
196       break;
197     case MD_TYPE_UNSIGNED_INT:
198       uint64_t uint64_value;
199       if (meta_data_get_unsigned_int(meta, key, &uint64_value) != 0) {
200         strarray_free(meta_data_keys, meta_data_keys_len);
201         return grpc::Status(grpc::StatusCode::INTERNAL,
202                             grpc::string("missing metadata"));
203       }
204       md_value.set_uint64_value(uint64_value);
205       break;
206     case MD_TYPE_DOUBLE:
207       double double_value;
208       if (meta_data_get_double(meta, key, &double_value) != 0) {
209         strarray_free(meta_data_keys, meta_data_keys_len);
210         return grpc::Status(grpc::StatusCode::INTERNAL,
211                             grpc::string("missing metadata"));
212       }
213       md_value.set_double_value(double_value);
214       break;
215     case MD_TYPE_BOOLEAN:
216       bool bool_value;
217       if (meta_data_get_boolean(meta, key, &bool_value) != 0) {
218         strarray_free(meta_data_keys, meta_data_keys_len);
219         return grpc::Status(grpc::StatusCode::INTERNAL,
220                             grpc::string("missing metadata"));
221       }
222       md_value.set_bool_value(bool_value);
223       break;
224     default:
225       strarray_free(meta_data_keys, meta_data_keys_len);
226       ERROR("grpc: invalid metadata type (%d)", md_type);
227       return grpc::Status(grpc::StatusCode::INTERNAL,
228                           grpc::string("unknown metadata type"));
229     }
230
231     (*mutable_meta_data)[grpc::string(key)] = md_value;
232
233     strarray_free(meta_data_keys, meta_data_keys_len);
234   }
235
236   return grpc::Status::OK;
237 }
238
239 static grpc::Status unmarshal_meta_data(const grpcMetadata &rpc_metadata,
240                                         meta_data_t **md_out) {
241   *md_out = meta_data_create();
242   if (*md_out == nullptr) {
243     return grpc::Status(grpc::StatusCode::RESOURCE_EXHAUSTED,
244                         grpc::string("failed to create metadata list"));
245   }
246   for (auto kv : rpc_metadata) {
247     auto k = kv.first.c_str();
248     auto v = kv.second;
249
250     // The meta_data collection individually allocates copies of the keys and
251     // string values for each entry, so it's safe for us to pass a reference
252     // to our short-lived strings.
253
254     switch (v.value_case()) {
255     case collectd::types::MetadataValue::ValueCase::kStringValue:
256       meta_data_add_string(*md_out, k, v.string_value().c_str());
257       break;
258     case collectd::types::MetadataValue::ValueCase::kInt64Value:
259       meta_data_add_signed_int(*md_out, k, v.int64_value());
260       break;
261     case collectd::types::MetadataValue::ValueCase::kUint64Value:
262       meta_data_add_unsigned_int(*md_out, k, v.uint64_value());
263       break;
264     case collectd::types::MetadataValue::ValueCase::kDoubleValue:
265       meta_data_add_double(*md_out, k, v.double_value());
266       break;
267     case collectd::types::MetadataValue::ValueCase::kBoolValue:
268       meta_data_add_boolean(*md_out, k, v.bool_value());
269       break;
270     default:
271       meta_data_destroy(*md_out);
272       return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
273                           grpc::string("Metadata of unknown type"));
274     }
275   }
276   return grpc::Status::OK;
277 }
278
279 static grpc::Status marshal_value_list(const value_list_t *vl,
280                                        collectd::types::ValueList *msg) {
281   auto id = msg->mutable_identifier();
282   marshal_ident(vl, id);
283
284   auto ds = plugin_get_ds(vl->type);
285   if ((ds == NULL) || (ds->ds_num != vl->values_len)) {
286     return grpc::Status(grpc::StatusCode::INTERNAL,
287                         grpc::string("failed to retrieve data-set for values"));
288   }
289
290   auto t = TimeUtil::NanosecondsToTimestamp(CDTIME_T_TO_NS(vl->time));
291   auto d = TimeUtil::NanosecondsToDuration(CDTIME_T_TO_NS(vl->interval));
292   msg->set_allocated_time(new google::protobuf::Timestamp(t));
293   msg->set_allocated_interval(new google::protobuf::Duration(d));
294
295   msg->clear_meta_data();
296   if (vl->meta != nullptr) {
297     grpc::Status status = marshal_meta_data(vl->meta, msg->mutable_meta_data());
298     if (!status.ok()) {
299       return status;
300     }
301   }
302
303   for (size_t i = 0; i < vl->values_len; ++i) {
304     auto v = msg->add_values();
305     int value_type = ds->ds[i].type;
306     switch (value_type) {
307     case DS_TYPE_COUNTER:
308       v->set_counter(vl->values[i].counter);
309       break;
310     case DS_TYPE_GAUGE:
311       v->set_gauge(vl->values[i].gauge);
312       break;
313     case DS_TYPE_DERIVE:
314       v->set_derive(vl->values[i].derive);
315       break;
316     case DS_TYPE_ABSOLUTE:
317       v->set_absolute(vl->values[i].absolute);
318       break;
319     default:
320       ERROR("grpc: invalid value type (%d)", value_type);
321       return grpc::Status(grpc::StatusCode::INTERNAL,
322                           grpc::string("unknown value type"));
323     }
324
325     auto name = msg->add_ds_names();
326     name->assign(ds->ds[i].name);
327   }
328
329   return grpc::Status::OK;
330 } /* marshal_value_list */
331
332 static grpc::Status unmarshal_value_list(const collectd::types::ValueList &msg,
333                                          value_list_t *vl) {
334   vl->time = NS_TO_CDTIME_T(TimeUtil::TimestampToNanoseconds(msg.time()));
335   vl->interval =
336       NS_TO_CDTIME_T(TimeUtil::DurationToNanoseconds(msg.interval()));
337
338   auto status = unmarshal_ident(msg.identifier(), vl, true);
339   if (!status.ok())
340     return status;
341
342   status = unmarshal_meta_data(msg.meta_data(), &vl->meta);
343   if (!status.ok())
344     return status;
345
346   value_t *values = NULL;
347   size_t values_len = 0;
348
349   status = grpc::Status::OK;
350   for (auto v : msg.values()) {
351     value_t *val =
352         (value_t *)realloc(values, (values_len + 1) * sizeof(*values));
353     if (!val) {
354       status = grpc::Status(grpc::StatusCode::RESOURCE_EXHAUSTED,
355                             grpc::string("failed to allocate values array"));
356       break;
357     }
358
359     values = val;
360     val = values + values_len;
361     values_len++;
362
363     switch (v.value_case()) {
364     case collectd::types::Value::ValueCase::kCounter:
365       val->counter = counter_t(v.counter());
366       break;
367     case collectd::types::Value::ValueCase::kGauge:
368       val->gauge = gauge_t(v.gauge());
369       break;
370     case collectd::types::Value::ValueCase::kDerive:
371       val->derive = derive_t(v.derive());
372       break;
373     case collectd::types::Value::ValueCase::kAbsolute:
374       val->absolute = absolute_t(v.absolute());
375       break;
376     default:
377       status = grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
378                             grpc::string("unknown value type"));
379       break;
380     }
381
382     if (!status.ok())
383       break;
384   }
385   if (status.ok()) {
386     vl->values = values;
387     vl->values_len = values_len;
388   } else {
389     meta_data_destroy(vl->meta);
390     free(values);
391   }
392
393   return status;
394 } /* unmarshal_value_list() */
395
396 /*
397  * Collectd service
398  */
399 class CollectdImpl : public collectd::Collectd::Service {
400 public:
401   grpc::Status
402   QueryValues(grpc::ServerContext *ctx, QueryValuesRequest const *req,
403               grpc::ServerWriter<QueryValuesResponse> *writer) override {
404     value_list_t match;
405     auto status = unmarshal_ident(req->identifier(), &match, false);
406     if (!status.ok()) {
407       return status;
408     }
409
410     std::queue<value_list_t> value_lists;
411     status = this->queryValuesRead(&match, &value_lists);
412     if (status.ok()) {
413       status = this->queryValuesWrite(ctx, writer, &value_lists);
414     }
415
416     while (!value_lists.empty()) {
417       auto vl = value_lists.front();
418       value_lists.pop();
419       sfree(vl.values);
420       meta_data_destroy(vl.meta);
421     }
422
423     return status;
424   }
425
426   grpc::Status PutValues(grpc::ServerContext *ctx,
427                          grpc::ServerReader<PutValuesRequest> *reader,
428                          PutValuesResponse *res) override {
429     PutValuesRequest req;
430
431     while (reader->Read(&req)) {
432       value_list_t vl = {0};
433       auto status = unmarshal_value_list(req.value_list(), &vl);
434       if (!status.ok())
435         return status;
436
437       if (plugin_dispatch_values(&vl))
438         return grpc::Status(
439             grpc::StatusCode::INTERNAL,
440             grpc::string("failed to enqueue values for writing"));
441     }
442
443     res->Clear();
444     return grpc::Status::OK;
445   }
446
447 private:
448   grpc::Status queryValuesRead(value_list_t const *match,
449                                std::queue<value_list_t> *value_lists) {
450     uc_iter_t *iter;
451     if ((iter = uc_get_iterator()) == NULL) {
452       return grpc::Status(
453           grpc::StatusCode::INTERNAL,
454           grpc::string("failed to query values: cannot create iterator"));
455     }
456
457     grpc::Status status = grpc::Status::OK;
458     char *name = NULL;
459     while (uc_iterator_next(iter, &name) == 0) {
460       value_list_t vl;
461       if (parse_identifier_vl(name, &vl) != 0) {
462         status = grpc::Status(grpc::StatusCode::INTERNAL,
463                               grpc::string("failed to parse identifier"));
464         break;
465       }
466
467       if (!ident_matches(&vl, match))
468         continue;
469       if (uc_iterator_get_time(iter, &vl.time) < 0) {
470         status =
471             grpc::Status(grpc::StatusCode::INTERNAL,
472                          grpc::string("failed to retrieve value timestamp"));
473         break;
474       }
475       if (uc_iterator_get_interval(iter, &vl.interval) < 0) {
476         status =
477             grpc::Status(grpc::StatusCode::INTERNAL,
478                          grpc::string("failed to retrieve value interval"));
479         break;
480       }
481       if (uc_iterator_get_values(iter, &vl.values, &vl.values_len) < 0) {
482         status = grpc::Status(grpc::StatusCode::INTERNAL,
483                               grpc::string("failed to retrieve values"));
484         break;
485       }
486       if (uc_iterator_get_meta(iter, &vl.meta) < 0) {
487         status =
488             grpc::Status(grpc::StatusCode::INTERNAL,
489                          grpc::string("failed to retrieve value metadata"));
490       }
491
492       value_lists->push(vl);
493     } // while (uc_iterator_next(iter, &name) == 0)
494
495     uc_iterator_destroy(iter);
496     return status;
497   }
498
499   grpc::Status queryValuesWrite(grpc::ServerContext *ctx,
500                                 grpc::ServerWriter<QueryValuesResponse> *writer,
501                                 std::queue<value_list_t> *value_lists) {
502     while (!value_lists->empty()) {
503       auto vl = value_lists->front();
504       QueryValuesResponse res;
505       res.Clear();
506
507       auto status = marshal_value_list(&vl, res.mutable_value_list());
508       if (!status.ok()) {
509         return status;
510       }
511
512       if (!writer->Write(res)) {
513         return grpc::Status::CANCELLED;
514       }
515
516       value_lists->pop();
517       sfree(vl.values);
518     }
519
520     return grpc::Status::OK;
521   }
522 };
523
524 /*
525  * gRPC server implementation
526  */
527 class CollectdServer final {
528 public:
529   void Start() {
530     auto auth = grpc::InsecureServerCredentials();
531
532     grpc::ServerBuilder builder;
533
534     if (listeners.empty()) {
535       builder.AddListeningPort(default_addr, auth);
536       INFO("grpc: Listening on %s", default_addr.c_str());
537     } else {
538       for (auto l : listeners) {
539         grpc::string addr = l.addr + ":" + l.port;
540
541         auto use_ssl = grpc::string("");
542         auto a = auth;
543         if (l.ssl != nullptr) {
544           use_ssl = grpc::string(" (SSL enabled)");
545           a = grpc::SslServerCredentials(*l.ssl);
546         }
547
548         builder.AddListeningPort(addr, a);
549         INFO("grpc: Listening on %s%s", addr.c_str(), use_ssl.c_str());
550       }
551     }
552
553     builder.RegisterService(&collectd_service_);
554
555     server_ = builder.BuildAndStart();
556   } /* Start() */
557
558   void Shutdown() { server_->Shutdown(); } /* Shutdown() */
559
560 private:
561   CollectdImpl collectd_service_;
562
563   std::unique_ptr<grpc::Server> server_;
564 }; /* class CollectdServer */
565
566 class CollectdClient final {
567 public:
568   CollectdClient(std::shared_ptr<grpc::ChannelInterface> channel)
569       : stub_(Collectd::NewStub(channel)) {}
570
571   int PutValues(value_list_t const *vl) {
572     grpc::ClientContext ctx;
573
574     PutValuesRequest req;
575     auto status = marshal_value_list(vl, req.mutable_value_list());
576     if (!status.ok()) {
577       ERROR("grpc: Marshalling value_list_t failed.");
578       return -1;
579     }
580
581     PutValuesResponse res;
582     auto stream = stub_->PutValues(&ctx, &res);
583     if (!stream->Write(req)) {
584       NOTICE("grpc: Broken stream.");
585       /* intentionally not returning. */
586     }
587
588     stream->WritesDone();
589     status = stream->Finish();
590     if (!status.ok()) {
591       ERROR("grpc: Error while closing stream.");
592       return -1;
593     }
594
595     return 0;
596   } /* int PutValues */
597
598 private:
599   std::unique_ptr<Collectd::Stub> stub_;
600 };
601
602 static CollectdServer *server = nullptr;
603
604 /*
605  * collectd plugin interface
606  */
607 extern "C" {
608 static void c_grpc_destroy_write_callback(void *ptr) {
609   delete (CollectdClient *)ptr;
610 }
611
612 static int c_grpc_write(__attribute__((unused)) data_set_t const *ds,
613                         value_list_t const *vl, user_data_t *ud) {
614   CollectdClient *c = (CollectdClient *)ud->data;
615   return c->PutValues(vl);
616 }
617
618 static int c_grpc_config_listen(oconfig_item_t *ci) {
619   if ((ci->values_num != 2) || (ci->values[0].type != OCONFIG_TYPE_STRING) ||
620       (ci->values[1].type != OCONFIG_TYPE_STRING)) {
621     ERROR("grpc: The `%s` config option needs exactly "
622           "two string argument (address and port).",
623           ci->key);
624     return -1;
625   }
626
627   auto listener = Listener();
628   listener.addr = grpc::string(ci->values[0].value.string);
629   listener.port = grpc::string(ci->values[1].value.string);
630   listener.ssl = nullptr;
631
632   auto ssl_opts = new grpc::SslServerCredentialsOptions(
633       GRPC_SSL_REQUEST_AND_REQUIRE_CLIENT_CERTIFICATE_AND_VERIFY);
634   grpc::SslServerCredentialsOptions::PemKeyCertPair pkcp = {};
635   bool use_ssl = false;
636
637   for (int i = 0; i < ci->children_num; i++) {
638     oconfig_item_t *child = ci->children + i;
639
640     if (!strcasecmp("EnableSSL", child->key)) {
641       if (cf_util_get_boolean(child, &use_ssl)) {
642         ERROR("grpc: Option `%s` expects a boolean value", child->key);
643         return -1;
644       }
645     } else if (!strcasecmp("SSLCACertificateFile", child->key)) {
646       char *certs = NULL;
647       if (cf_util_get_string(child, &certs)) {
648         ERROR("grpc: Option `%s` expects a string value", child->key);
649         return -1;
650       }
651       ssl_opts->pem_root_certs = read_file(certs);
652     } else if (!strcasecmp("SSLCertificateKeyFile", child->key)) {
653       char *key = NULL;
654       if (cf_util_get_string(child, &key)) {
655         ERROR("grpc: Option `%s` expects a string value", child->key);
656         return -1;
657       }
658       pkcp.private_key = read_file(key);
659     } else if (!strcasecmp("SSLCertificateFile", child->key)) {
660       char *cert = NULL;
661       if (cf_util_get_string(child, &cert)) {
662         ERROR("grpc: Option `%s` expects a string value", child->key);
663         return -1;
664       }
665       pkcp.cert_chain = read_file(cert);
666     } else if (!strcasecmp("VerifyPeer", child->key)) {
667       _Bool verify = 0;
668       if (cf_util_get_boolean(child, &verify)) {
669         return -1;
670       }
671       ssl_opts->client_certificate_request =
672           verify ? GRPC_SSL_REQUEST_AND_REQUIRE_CLIENT_CERTIFICATE_AND_VERIFY
673                  : GRPC_SSL_DONT_REQUEST_CLIENT_CERTIFICATE;
674     } else {
675       WARNING("grpc: Option `%s` not allowed in <%s> block.", child->key,
676               ci->key);
677     }
678   }
679
680   ssl_opts->pem_key_cert_pairs.push_back(pkcp);
681   if (use_ssl)
682     listener.ssl = ssl_opts;
683   else
684     delete (ssl_opts);
685
686   listeners.push_back(listener);
687   return 0;
688 } /* c_grpc_config_listen() */
689
690 static int c_grpc_config_server(oconfig_item_t *ci) {
691   if ((ci->values_num != 2) || (ci->values[0].type != OCONFIG_TYPE_STRING) ||
692       (ci->values[1].type != OCONFIG_TYPE_STRING)) {
693     ERROR("grpc: The `%s` config option needs exactly "
694           "two string argument (address and port).",
695           ci->key);
696     return -1;
697   }
698
699   grpc::SslCredentialsOptions ssl_opts;
700   bool use_ssl = false;
701
702   for (int i = 0; i < ci->children_num; i++) {
703     oconfig_item_t *child = ci->children + i;
704
705     if (!strcasecmp("EnableSSL", child->key)) {
706       if (cf_util_get_boolean(child, &use_ssl)) {
707         return -1;
708       }
709     } else if (!strcasecmp("SSLCACertificateFile", child->key)) {
710       char *certs = NULL;
711       if (cf_util_get_string(child, &certs)) {
712         return -1;
713       }
714       ssl_opts.pem_root_certs = read_file(certs);
715     } else if (!strcasecmp("SSLCertificateKeyFile", child->key)) {
716       char *key = NULL;
717       if (cf_util_get_string(child, &key)) {
718         return -1;
719       }
720       ssl_opts.pem_private_key = read_file(key);
721     } else if (!strcasecmp("SSLCertificateFile", child->key)) {
722       char *cert = NULL;
723       if (cf_util_get_string(child, &cert)) {
724         return -1;
725       }
726       ssl_opts.pem_cert_chain = read_file(cert);
727     } else {
728       WARNING("grpc: Option `%s` not allowed in <%s> block.", child->key,
729               ci->key);
730     }
731   }
732
733   auto node = grpc::string(ci->values[0].value.string);
734   auto service = grpc::string(ci->values[1].value.string);
735   auto addr = node + ":" + service;
736
737   CollectdClient *client;
738   if (use_ssl) {
739     auto channel_creds = grpc::SslCredentials(ssl_opts);
740     auto channel = grpc::CreateChannel(addr, channel_creds);
741     client = new CollectdClient(channel);
742   } else {
743     auto channel =
744         grpc::CreateChannel(addr, grpc::InsecureChannelCredentials());
745     client = new CollectdClient(channel);
746   }
747
748   auto callback_name = grpc::string("grpc/") + addr;
749   user_data_t ud = {
750       .data = client, .free_func = c_grpc_destroy_write_callback,
751   };
752
753   plugin_register_write(callback_name.c_str(), c_grpc_write, &ud);
754   return 0;
755 } /* c_grpc_config_server() */
756
757 static int c_grpc_config(oconfig_item_t *ci) {
758   int i;
759
760   for (i = 0; i < ci->children_num; i++) {
761     oconfig_item_t *child = ci->children + i;
762
763     if (!strcasecmp("Listen", child->key)) {
764       if (c_grpc_config_listen(child))
765         return -1;
766     } else if (!strcasecmp("Server", child->key)) {
767       if (c_grpc_config_server(child))
768         return -1;
769     }
770
771     else {
772       WARNING("grpc: Option `%s` not allowed here.", child->key);
773     }
774   }
775
776   return 0;
777 } /* c_grpc_config() */
778
779 static int c_grpc_init(void) {
780   server = new CollectdServer();
781   if (!server) {
782     ERROR("grpc: Failed to create server");
783     return -1;
784   }
785
786   server->Start();
787   return 0;
788 } /* c_grpc_init() */
789
790 static int c_grpc_shutdown(void) {
791   if (!server)
792     return 0;
793
794   server->Shutdown();
795
796   delete server;
797   server = nullptr;
798
799   return 0;
800 } /* c_grpc_shutdown() */
801
802 void module_register(void) {
803   plugin_register_complex_config("grpc", c_grpc_config);
804   plugin_register_init("grpc", c_grpc_init);
805   plugin_register_shutdown("grpc", c_grpc_shutdown);
806 } /* module_register() */
807 } /* extern "C" */