Merge branch 'collectd-5.7' into collectd-5.8
[collectd.git] / src / grpc.cc
1 /**
2  * collectd - src/grpc.cc
3  * Copyright (C) 2015-2016 Sebastian Harl
4  * Copyright (C) 2016      Florian octo Forster
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining a
7  * copy of this software and associated documentation files (the "Software"),
8  * to deal in the Software without restriction, including without limitation
9  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
10  * and/or sell copies of the Software, and to permit persons to whom the
11  * Software is furnished to do so, subject to the following conditions:
12  *
13  * The above copyright notice and this permission notice shall be included in
14  * all copies or substantial portions of the Software.
15  *
16  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
22  * DEALINGS IN THE SOFTWARE.
23  *
24  * Authors:
25  *   Sebastian Harl <sh at tokkee.org>
26  *   Florian octo Forster <octo at collectd.org>
27  **/
28
29 #include <google/protobuf/util/time_util.h>
30 #include <grpc++/grpc++.h>
31
32 #include <fstream>
33 #include <iostream>
34 #include <queue>
35 #include <vector>
36
37 #include "collectd.grpc.pb.h"
38
39 extern "C" {
40 #include <fnmatch.h>
41 #include <stdbool.h>
42
43 #include "collectd.h"
44 #include "common.h"
45 #include "plugin.h"
46
47 #include "daemon/utils_cache.h"
48 }
49
50 using collectd::Collectd;
51
52 using collectd::PutValuesRequest;
53 using collectd::PutValuesResponse;
54 using collectd::QueryValuesRequest;
55 using collectd::QueryValuesResponse;
56
57 using google::protobuf::util::TimeUtil;
58
59 typedef google::protobuf::Map<grpc::string, collectd::types::MetadataValue> grpcMetadata;
60
61 /*
62  * private types
63  */
64
65 struct Listener {
66   grpc::string addr;
67   grpc::string port;
68
69   grpc::SslServerCredentialsOptions *ssl;
70 };
71 static std::vector<Listener> listeners;
72 static grpc::string default_addr("0.0.0.0:50051");
73
74 /*
75  * helper functions
76  */
77
78 static bool ident_matches(const value_list_t *vl, const value_list_t *matcher) {
79   if (fnmatch(matcher->host, vl->host, 0))
80     return false;
81
82   if (fnmatch(matcher->plugin, vl->plugin, 0))
83     return false;
84   if (fnmatch(matcher->plugin_instance, vl->plugin_instance, 0))
85     return false;
86
87   if (fnmatch(matcher->type, vl->type, 0))
88     return false;
89   if (fnmatch(matcher->type_instance, vl->type_instance, 0))
90     return false;
91
92   return true;
93 } /* ident_matches */
94
95 static grpc::string read_file(const char *filename) {
96   std::ifstream f;
97   grpc::string s, content;
98
99   f.open(filename);
100   if (!f.is_open()) {
101     ERROR("grpc: Failed to open '%s'", filename);
102     return "";
103   }
104
105   while (std::getline(f, s)) {
106     content += s;
107     content.push_back('\n');
108   }
109   f.close();
110   return content;
111 } /* read_file */
112
113 /*
114  * proto conversion
115  */
116
117 static void marshal_ident(const value_list_t *vl,
118                           collectd::types::Identifier *msg) {
119   msg->set_host(vl->host);
120   msg->set_plugin(vl->plugin);
121   if (vl->plugin_instance[0] != '\0')
122     msg->set_plugin_instance(vl->plugin_instance);
123   msg->set_type(vl->type);
124   if (vl->type_instance[0] != '\0')
125     msg->set_type_instance(vl->type_instance);
126 } /* marshal_ident */
127
128 static grpc::Status unmarshal_ident(const collectd::types::Identifier &msg,
129                                     value_list_t *vl, bool require_fields) {
130   std::string s;
131
132   s = msg.host();
133   if (!s.length() && require_fields)
134     return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
135                         grpc::string("missing host name"));
136   sstrncpy(vl->host, s.c_str(), sizeof(vl->host));
137
138   s = msg.plugin();
139   if (!s.length() && require_fields)
140     return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
141                         grpc::string("missing plugin name"));
142   sstrncpy(vl->plugin, s.c_str(), sizeof(vl->plugin));
143
144   s = msg.type();
145   if (!s.length() && require_fields)
146     return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
147                         grpc::string("missing type name"));
148   sstrncpy(vl->type, s.c_str(), sizeof(vl->type));
149
150   s = msg.plugin_instance();
151   sstrncpy(vl->plugin_instance, s.c_str(), sizeof(vl->plugin_instance));
152
153   s = msg.type_instance();
154   sstrncpy(vl->type_instance, s.c_str(), sizeof(vl->type_instance));
155
156   return grpc::Status::OK;
157 } /* unmarshal_ident() */
158
159 static grpc::Status marshal_meta_data(meta_data_t *meta,
160                                       grpcMetadata *mutable_meta_data) {
161   char **meta_data_keys = nullptr;
162   int meta_data_keys_len = meta_data_toc(meta, &meta_data_keys);
163   if (meta_data_keys_len < 0) {
164     return grpc::Status(grpc::StatusCode::INTERNAL,
165                         grpc::string("error getting metadata keys"));
166   }
167
168   for (int i = 0; i < meta_data_keys_len; i++) {
169     char *key = meta_data_keys[i];
170     int md_type = meta_data_type(meta, key);
171
172     collectd::types::MetadataValue md_value;
173     md_value.Clear();
174
175     switch (md_type) {
176     case MD_TYPE_STRING:
177       char *md_string;
178       if (meta_data_get_string(meta, key, &md_string) != 0 || md_string == nullptr) {
179         strarray_free(meta_data_keys, meta_data_keys_len);
180         return grpc::Status(grpc::StatusCode::INTERNAL,
181                           grpc::string("missing metadata"));
182       }
183       md_value.set_string_value(md_string);
184       free(md_string);
185       break;
186     case MD_TYPE_SIGNED_INT:
187       int64_t int64_value;
188       if (meta_data_get_signed_int(meta, key, &int64_value) != 0) {
189         strarray_free(meta_data_keys, meta_data_keys_len);
190         return grpc::Status(grpc::StatusCode::INTERNAL,
191                           grpc::string("missing metadata"));
192       }
193       md_value.set_int64_value(int64_value);
194       break;
195     case MD_TYPE_UNSIGNED_INT:
196       uint64_t uint64_value;
197       if (meta_data_get_unsigned_int(meta, key, &uint64_value) != 0) {
198         strarray_free(meta_data_keys, meta_data_keys_len);
199         return grpc::Status(grpc::StatusCode::INTERNAL,
200                           grpc::string("missing metadata"));
201       }
202       md_value.set_uint64_value(uint64_value);
203       break;
204     case MD_TYPE_DOUBLE:
205       double double_value;
206       if (meta_data_get_double(meta, key, &double_value) != 0) {
207         strarray_free(meta_data_keys, meta_data_keys_len);
208         return grpc::Status(grpc::StatusCode::INTERNAL,
209                           grpc::string("missing metadata"));
210       }
211       md_value.set_double_value(double_value);
212       break;
213     case MD_TYPE_BOOLEAN:
214       bool bool_value;
215       if (meta_data_get_boolean(meta, key, &bool_value) != 0) {
216         strarray_free(meta_data_keys, meta_data_keys_len);
217         return grpc::Status(grpc::StatusCode::INTERNAL,
218                           grpc::string("missing metadata"));
219       }
220       md_value.set_bool_value(bool_value);
221       break;
222     default:
223       strarray_free(meta_data_keys, meta_data_keys_len);
224       ERROR("grpc: invalid metadata type (%d)", md_type);
225       return grpc::Status(grpc::StatusCode::INTERNAL,
226                           grpc::string("unknown metadata type"));
227     }
228
229     (*mutable_meta_data)[grpc::string(key)] = md_value;
230
231     strarray_free(meta_data_keys, meta_data_keys_len);
232   }
233
234   return grpc::Status::OK;
235 }
236
237 static grpc::Status unmarshal_meta_data(const grpcMetadata &rpc_metadata,
238                                         meta_data_t **md_out) {
239   *md_out = meta_data_create();
240   if (*md_out == nullptr) {
241     return grpc::Status(grpc::StatusCode::RESOURCE_EXHAUSTED,
242                         grpc::string("failed to metadata list"));
243   }
244   for (auto kv: rpc_metadata) {
245     auto k = kv.first.c_str();
246     auto v = kv.second;
247
248     // The meta_data collection individually allocates copies of the keys and
249     // string values for each entry, so it's safe for us to pass a reference
250     // to our short-lived strings.
251
252     switch (v.value_case()) {
253     case collectd::types::MetadataValue::ValueCase::kStringValue:
254       meta_data_add_string(*md_out, k, v.string_value().c_str());
255       break;
256     case collectd::types::MetadataValue::ValueCase::kInt64Value:
257       meta_data_add_signed_int(*md_out, k, v.int64_value());
258       break;
259     case collectd::types::MetadataValue::ValueCase::kUint64Value:
260       meta_data_add_unsigned_int(*md_out, k, v.uint64_value());
261       break;
262     case collectd::types::MetadataValue::ValueCase::kDoubleValue:
263       meta_data_add_double(*md_out, k, v.double_value());
264       break;
265     case collectd::types::MetadataValue::ValueCase::kBoolValue:
266       meta_data_add_boolean(*md_out, k, v.bool_value());
267       break;
268     default:
269       meta_data_destroy(*md_out);
270       return  grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
271                            grpc::string("Metadata of unknown type"));
272     }
273   }
274   return grpc::Status::OK;
275 }
276
277 static grpc::Status marshal_value_list(const value_list_t *vl,
278                                        collectd::types::ValueList *msg) {
279   auto id = msg->mutable_identifier();
280   marshal_ident(vl, id);
281
282   auto ds = plugin_get_ds(vl->type);
283   if ((ds == NULL) || (ds->ds_num != vl->values_len)) {
284     return grpc::Status(grpc::StatusCode::INTERNAL,
285                         grpc::string("failed to retrieve data-set for values"));
286   }
287
288   auto t = TimeUtil::NanosecondsToTimestamp(CDTIME_T_TO_NS(vl->time));
289   auto d = TimeUtil::NanosecondsToDuration(CDTIME_T_TO_NS(vl->interval));
290   msg->set_allocated_time(new google::protobuf::Timestamp(t));
291   msg->set_allocated_interval(new google::protobuf::Duration(d));
292
293   msg->clear_meta_data();
294   if (vl->meta != nullptr) {
295     grpc::Status status = marshal_meta_data(vl->meta, msg->mutable_meta_data());
296     if (!status.ok()) {
297       return status;
298     }
299   }
300
301   for (size_t i = 0; i < vl->values_len; ++i) {
302     auto v = msg->add_values();
303     int value_type = ds->ds[i].type;
304     switch (value_type) {
305     case DS_TYPE_COUNTER:
306       v->set_counter(vl->values[i].counter);
307       break;
308     case DS_TYPE_GAUGE:
309       v->set_gauge(vl->values[i].gauge);
310       break;
311     case DS_TYPE_DERIVE:
312       v->set_derive(vl->values[i].derive);
313       break;
314     case DS_TYPE_ABSOLUTE:
315       v->set_absolute(vl->values[i].absolute);
316       break;
317     default:
318       ERROR("grpc: invalid value type (%d)", value_type);
319       return grpc::Status(grpc::StatusCode::INTERNAL,
320                           grpc::string("unknown value type"));
321     }
322
323     auto name = msg->add_ds_names();
324     name->assign(ds->ds[i].name);
325   }
326
327   return grpc::Status::OK;
328 } /* marshal_value_list */
329
330 static grpc::Status unmarshal_value_list(const collectd::types::ValueList &msg,
331                                          value_list_t *vl) {
332   vl->time = NS_TO_CDTIME_T(TimeUtil::TimestampToNanoseconds(msg.time()));
333   vl->interval =
334       NS_TO_CDTIME_T(TimeUtil::DurationToNanoseconds(msg.interval()));
335
336   auto status = unmarshal_ident(msg.identifier(), vl, true);
337   if (!status.ok())
338     return status;
339
340   status = unmarshal_meta_data(msg.meta_data(), &vl->meta);
341   if (!status.ok())
342     return status;
343
344   value_t *values = NULL;
345   size_t values_len = 0;
346
347   status = grpc::Status::OK;
348   for (auto v : msg.values()) {
349     value_t *val =
350         (value_t *)realloc(values, (values_len + 1) * sizeof(*values));
351     if (!val) {
352       status = grpc::Status(grpc::StatusCode::RESOURCE_EXHAUSTED,
353                             grpc::string("failed to allocate values array"));
354       break;
355     }
356
357     values = val;
358     val = values + values_len;
359     values_len++;
360
361     switch (v.value_case()) {
362     case collectd::types::Value::ValueCase::kCounter:
363       val->counter = counter_t(v.counter());
364       break;
365     case collectd::types::Value::ValueCase::kGauge:
366       val->gauge = gauge_t(v.gauge());
367       break;
368     case collectd::types::Value::ValueCase::kDerive:
369       val->derive = derive_t(v.derive());
370       break;
371     case collectd::types::Value::ValueCase::kAbsolute:
372       val->absolute = absolute_t(v.absolute());
373       break;
374     default:
375       status = grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
376                             grpc::string("unknown value type"));
377       break;
378     }
379
380     if (!status.ok())
381       break;
382   }
383   if (status.ok()) {
384     vl->values = values;
385     vl->values_len = values_len;
386   } else {
387     meta_data_destroy(vl->meta);
388     free(values);
389   }
390
391   return status;
392 } /* unmarshal_value_list() */
393
394 /*
395  * Collectd service
396  */
397 class CollectdImpl : public collectd::Collectd::Service {
398 public:
399   grpc::Status
400   QueryValues(grpc::ServerContext *ctx, QueryValuesRequest const *req,
401               grpc::ServerWriter<QueryValuesResponse> *writer) override {
402     value_list_t match;
403     auto status = unmarshal_ident(req->identifier(), &match, false);
404     if (!status.ok()) {
405       return status;
406     }
407
408     std::queue<value_list_t> value_lists;
409     status = this->queryValuesRead(&match, &value_lists);
410     if (status.ok()) {
411       status = this->queryValuesWrite(ctx, writer, &value_lists);
412     }
413
414     while (!value_lists.empty()) {
415       auto vl = value_lists.front();
416       value_lists.pop();
417       sfree(vl.values);
418       meta_data_destroy(vl.meta);
419     }
420
421     return status;
422   }
423
424   grpc::Status PutValues(grpc::ServerContext *ctx,
425                          grpc::ServerReader<PutValuesRequest> *reader,
426                          PutValuesResponse *res) override {
427     PutValuesRequest req;
428
429     while (reader->Read(&req)) {
430       value_list_t vl = {0};
431       auto status = unmarshal_value_list(req.value_list(), &vl);
432       if (!status.ok())
433         return status;
434
435       if (plugin_dispatch_values(&vl))
436         return grpc::Status(
437             grpc::StatusCode::INTERNAL,
438             grpc::string("failed to enqueue values for writing"));
439     }
440
441     res->Clear();
442     return grpc::Status::OK;
443   }
444
445 private:
446   grpc::Status queryValuesRead(value_list_t const *match,
447                                std::queue<value_list_t> *value_lists) {
448     uc_iter_t *iter;
449     if ((iter = uc_get_iterator()) == NULL) {
450       return grpc::Status(
451           grpc::StatusCode::INTERNAL,
452           grpc::string("failed to query values: cannot create iterator"));
453     }
454
455     grpc::Status status = grpc::Status::OK;
456     char *name = NULL;
457     while (uc_iterator_next(iter, &name) == 0) {
458       value_list_t vl;
459       if (parse_identifier_vl(name, &vl) != 0) {
460         status = grpc::Status(grpc::StatusCode::INTERNAL,
461                               grpc::string("failed to parse identifier"));
462         break;
463       }
464
465       if (!ident_matches(&vl, match))
466         continue;
467       if (uc_iterator_get_time(iter, &vl.time) < 0) {
468         status =
469             grpc::Status(grpc::StatusCode::INTERNAL,
470                          grpc::string("failed to retrieve value timestamp"));
471         break;
472       }
473       if (uc_iterator_get_interval(iter, &vl.interval) < 0) {
474         status =
475             grpc::Status(grpc::StatusCode::INTERNAL,
476                          grpc::string("failed to retrieve value interval"));
477         break;
478       }
479       if (uc_iterator_get_values(iter, &vl.values, &vl.values_len) < 0) {
480         status = grpc::Status(grpc::StatusCode::INTERNAL,
481                               grpc::string("failed to retrieve values"));
482         break;
483       }
484       if (uc_iterator_get_meta(iter, &vl.meta) < 0) {
485         status = grpc::Status(grpc::StatusCode::INTERNAL,
486                               grpc::string("failed to retrieve value metadata"));
487       }
488
489       value_lists->push(vl);
490     } // while (uc_iterator_next(iter, &name) == 0)
491
492     uc_iterator_destroy(iter);
493     return status;
494   }
495
496   grpc::Status queryValuesWrite(grpc::ServerContext *ctx,
497                                 grpc::ServerWriter<QueryValuesResponse> *writer,
498                                 std::queue<value_list_t> *value_lists) {
499     while (!value_lists->empty()) {
500       auto vl = value_lists->front();
501       QueryValuesResponse res;
502       res.Clear();
503
504       auto status = marshal_value_list(&vl, res.mutable_value_list());
505       if (!status.ok()) {
506         return status;
507       }
508
509       if (!writer->Write(res)) {
510         return grpc::Status::CANCELLED;
511       }
512
513       value_lists->pop();
514       sfree(vl.values);
515     }
516
517     return grpc::Status::OK;
518   }
519 };
520
521 /*
522  * gRPC server implementation
523  */
524 class CollectdServer final {
525 public:
526   void Start() {
527     auto auth = grpc::InsecureServerCredentials();
528
529     grpc::ServerBuilder builder;
530
531     if (listeners.empty()) {
532       builder.AddListeningPort(default_addr, auth);
533       INFO("grpc: Listening on %s", default_addr.c_str());
534     } else {
535       for (auto l : listeners) {
536         grpc::string addr = l.addr + ":" + l.port;
537
538         auto use_ssl = grpc::string("");
539         auto a = auth;
540         if (l.ssl != nullptr) {
541           use_ssl = grpc::string(" (SSL enabled)");
542           a = grpc::SslServerCredentials(*l.ssl);
543         }
544
545         builder.AddListeningPort(addr, a);
546         INFO("grpc: Listening on %s%s", addr.c_str(), use_ssl.c_str());
547       }
548     }
549
550     builder.RegisterService(&collectd_service_);
551
552     server_ = builder.BuildAndStart();
553   } /* Start() */
554
555   void Shutdown() { server_->Shutdown(); } /* Shutdown() */
556
557 private:
558   CollectdImpl collectd_service_;
559
560   std::unique_ptr<grpc::Server> server_;
561 }; /* class CollectdServer */
562
563 class CollectdClient final {
564 public:
565   CollectdClient(std::shared_ptr<grpc::ChannelInterface> channel)
566       : stub_(Collectd::NewStub(channel)) {}
567
568   int PutValues(value_list_t const *vl) {
569     grpc::ClientContext ctx;
570
571     PutValuesRequest req;
572     auto status = marshal_value_list(vl, req.mutable_value_list());
573     if (!status.ok()) {
574       ERROR("grpc: Marshalling value_list_t failed.");
575       return -1;
576     }
577
578     PutValuesResponse res;
579     auto stream = stub_->PutValues(&ctx, &res);
580     if (!stream->Write(req)) {
581       NOTICE("grpc: Broken stream.");
582       /* intentionally not returning. */
583     }
584
585     stream->WritesDone();
586     status = stream->Finish();
587     if (!status.ok()) {
588       ERROR("grpc: Error while closing stream.");
589       return -1;
590     }
591
592     return 0;
593   } /* int PutValues */
594
595 private:
596   std::unique_ptr<Collectd::Stub> stub_;
597 };
598
599 static CollectdServer *server = nullptr;
600
601 /*
602  * collectd plugin interface
603  */
604 extern "C" {
605 static void c_grpc_destroy_write_callback(void *ptr) {
606   delete (CollectdClient *)ptr;
607 }
608
609 static int c_grpc_write(__attribute__((unused)) data_set_t const *ds,
610                         value_list_t const *vl, user_data_t *ud) {
611   CollectdClient *c = (CollectdClient *)ud->data;
612   return c->PutValues(vl);
613 }
614
615 static int c_grpc_config_listen(oconfig_item_t *ci) {
616   if ((ci->values_num != 2) || (ci->values[0].type != OCONFIG_TYPE_STRING) ||
617       (ci->values[1].type != OCONFIG_TYPE_STRING)) {
618     ERROR("grpc: The `%s` config option needs exactly "
619           "two string argument (address and port).",
620           ci->key);
621     return -1;
622   }
623
624   auto listener = Listener();
625   listener.addr = grpc::string(ci->values[0].value.string);
626   listener.port = grpc::string(ci->values[1].value.string);
627   listener.ssl = nullptr;
628
629   auto ssl_opts = new (grpc::SslServerCredentialsOptions);
630   grpc::SslServerCredentialsOptions::PemKeyCertPair pkcp = {};
631   bool use_ssl = false;
632
633   for (int i = 0; i < ci->children_num; i++) {
634     oconfig_item_t *child = ci->children + i;
635
636     if (!strcasecmp("EnableSSL", child->key)) {
637       if (cf_util_get_boolean(child, &use_ssl)) {
638         ERROR("grpc: Option `%s` expects a boolean value", child->key);
639         return -1;
640       }
641     } else if (!strcasecmp("SSLCACertificateFile", child->key)) {
642       char *certs = NULL;
643       if (cf_util_get_string(child, &certs)) {
644         ERROR("grpc: Option `%s` expects a string value", child->key);
645         return -1;
646       }
647       ssl_opts->pem_root_certs = read_file(certs);
648     } else if (!strcasecmp("SSLCertificateKeyFile", child->key)) {
649       char *key = NULL;
650       if (cf_util_get_string(child, &key)) {
651         ERROR("grpc: Option `%s` expects a string value", child->key);
652         return -1;
653       }
654       pkcp.private_key = read_file(key);
655     } else if (!strcasecmp("SSLCertificateFile", child->key)) {
656       char *cert = NULL;
657       if (cf_util_get_string(child, &cert)) {
658         ERROR("grpc: Option `%s` expects a string value", child->key);
659         return -1;
660       }
661       pkcp.cert_chain = read_file(cert);
662     } else {
663       WARNING("grpc: Option `%s` not allowed in <%s> block.", child->key,
664               ci->key);
665     }
666   }
667
668   ssl_opts->pem_key_cert_pairs.push_back(pkcp);
669   if (use_ssl)
670     listener.ssl = ssl_opts;
671   else
672     delete (ssl_opts);
673
674   listeners.push_back(listener);
675   return 0;
676 } /* c_grpc_config_listen() */
677
678 static int c_grpc_config_server(oconfig_item_t *ci) {
679   if ((ci->values_num != 2) || (ci->values[0].type != OCONFIG_TYPE_STRING) ||
680       (ci->values[1].type != OCONFIG_TYPE_STRING)) {
681     ERROR("grpc: The `%s` config option needs exactly "
682           "two string argument (address and port).",
683           ci->key);
684     return -1;
685   }
686
687   grpc::SslCredentialsOptions ssl_opts;
688   bool use_ssl = false;
689
690   for (int i = 0; i < ci->children_num; i++) {
691     oconfig_item_t *child = ci->children + i;
692
693     if (!strcasecmp("EnableSSL", child->key)) {
694       if (cf_util_get_boolean(child, &use_ssl)) {
695         return -1;
696       }
697     } else if (!strcasecmp("SSLCACertificateFile", child->key)) {
698       char *certs = NULL;
699       if (cf_util_get_string(child, &certs)) {
700         return -1;
701       }
702       ssl_opts.pem_root_certs = read_file(certs);
703     } else if (!strcasecmp("SSLCertificateKeyFile", child->key)) {
704       char *key = NULL;
705       if (cf_util_get_string(child, &key)) {
706         return -1;
707       }
708       ssl_opts.pem_private_key = read_file(key);
709     } else if (!strcasecmp("SSLCertificateFile", child->key)) {
710       char *cert = NULL;
711       if (cf_util_get_string(child, &cert)) {
712         return -1;
713       }
714       ssl_opts.pem_cert_chain = read_file(cert);
715     } else {
716       WARNING("grpc: Option `%s` not allowed in <%s> block.", child->key,
717               ci->key);
718     }
719   }
720
721   auto node = grpc::string(ci->values[0].value.string);
722   auto service = grpc::string(ci->values[1].value.string);
723   auto addr = node + ":" + service;
724
725   CollectdClient *client;
726   if (use_ssl) {
727     auto channel_creds = grpc::SslCredentials(ssl_opts);
728     auto channel = grpc::CreateChannel(addr, channel_creds);
729     client = new CollectdClient(channel);
730   } else {
731     auto channel =
732         grpc::CreateChannel(addr, grpc::InsecureChannelCredentials());
733     client = new CollectdClient(channel);
734   }
735
736   auto callback_name = grpc::string("grpc/") + addr;
737   user_data_t ud = {
738       .data = client, .free_func = c_grpc_destroy_write_callback,
739   };
740
741   plugin_register_write(callback_name.c_str(), c_grpc_write, &ud);
742   return 0;
743 } /* c_grpc_config_server() */
744
745 static int c_grpc_config(oconfig_item_t *ci) {
746   int i;
747
748   for (i = 0; i < ci->children_num; i++) {
749     oconfig_item_t *child = ci->children + i;
750
751     if (!strcasecmp("Listen", child->key)) {
752       if (c_grpc_config_listen(child))
753         return -1;
754     } else if (!strcasecmp("Server", child->key)) {
755       if (c_grpc_config_server(child))
756         return -1;
757     }
758
759     else {
760       WARNING("grpc: Option `%s` not allowed here.", child->key);
761     }
762   }
763
764   return 0;
765 } /* c_grpc_config() */
766
767 static int c_grpc_init(void) {
768   server = new CollectdServer();
769   if (!server) {
770     ERROR("grpc: Failed to create server");
771     return -1;
772   }
773
774   server->Start();
775   return 0;
776 } /* c_grpc_init() */
777
778 static int c_grpc_shutdown(void) {
779   if (!server)
780     return 0;
781
782   server->Shutdown();
783
784   delete server;
785   server = nullptr;
786
787   return 0;
788 } /* c_grpc_shutdown() */
789
790 void module_register(void) {
791   plugin_register_complex_config("grpc", c_grpc_config);
792   plugin_register_init("grpc", c_grpc_init);
793   plugin_register_shutdown("grpc", c_grpc_shutdown);
794 } /* module_register() */
795 } /* extern "C" */