Add metadata support to GRPC plugin
[collectd.git] / src / grpc.cc
1 /**
2  * collectd - src/grpc.cc
3  * Copyright (C) 2015-2016 Sebastian Harl
4  * Copyright (C) 2016      Florian octo Forster
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining a
7  * copy of this software and associated documentation files (the "Software"),
8  * to deal in the Software without restriction, including without limitation
9  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
10  * and/or sell copies of the Software, and to permit persons to whom the
11  * Software is furnished to do so, subject to the following conditions:
12  *
13  * The above copyright notice and this permission notice shall be included in
14  * all copies or substantial portions of the Software.
15  *
16  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
22  * DEALINGS IN THE SOFTWARE.
23  *
24  * Authors:
25  *   Sebastian Harl <sh at tokkee.org>
26  *   Florian octo Forster <octo at collectd.org>
27  **/
28
29 #include <google/protobuf/util/time_util.h>
30 #include <grpc++/grpc++.h>
31
32 #include <fstream>
33 #include <iostream>
34 #include <queue>
35 #include <vector>
36
37 #include "collectd.grpc.pb.h"
38
39 extern "C" {
40 #include <fnmatch.h>
41 #include <stdbool.h>
42
43 #include "collectd.h"
44 #include "common.h"
45 #include "plugin.h"
46
47 #include "daemon/utils_cache.h"
48 }
49
50 using collectd::Collectd;
51
52 using collectd::PutValuesRequest;
53 using collectd::PutValuesResponse;
54 using collectd::QueryValuesRequest;
55 using collectd::QueryValuesResponse;
56
57 using google::protobuf::util::TimeUtil;
58
59 /*
60  * private types
61  */
62
63 struct Listener {
64   grpc::string addr;
65   grpc::string port;
66
67   grpc::SslServerCredentialsOptions *ssl;
68 };
69 static std::vector<Listener> listeners;
70 static grpc::string default_addr("0.0.0.0:50051");
71
72 /*
73  * helper functions
74  */
75
76 static bool ident_matches(const value_list_t *vl, const value_list_t *matcher) {
77   if (fnmatch(matcher->host, vl->host, 0))
78     return false;
79
80   if (fnmatch(matcher->plugin, vl->plugin, 0))
81     return false;
82   if (fnmatch(matcher->plugin_instance, vl->plugin_instance, 0))
83     return false;
84
85   if (fnmatch(matcher->type, vl->type, 0))
86     return false;
87   if (fnmatch(matcher->type_instance, vl->type_instance, 0))
88     return false;
89
90   return true;
91 } /* ident_matches */
92
93 static grpc::string read_file(const char *filename) {
94   std::ifstream f;
95   grpc::string s, content;
96
97   f.open(filename);
98   if (!f.is_open()) {
99     ERROR("grpc: Failed to open '%s'", filename);
100     return "";
101   }
102
103   while (std::getline(f, s)) {
104     content += s;
105     content.push_back('\n');
106   }
107   f.close();
108   return content;
109 } /* read_file */
110
111 /*
112  * proto conversion
113  */
114
115 static void marshal_ident(const value_list_t *vl,
116                           collectd::types::Identifier *msg) {
117   msg->set_host(vl->host);
118   msg->set_plugin(vl->plugin);
119   if (vl->plugin_instance[0] != '\0')
120     msg->set_plugin_instance(vl->plugin_instance);
121   msg->set_type(vl->type);
122   if (vl->type_instance[0] != '\0')
123     msg->set_type_instance(vl->type_instance);
124 } /* marshal_ident */
125
126 static grpc::Status unmarshal_ident(const collectd::types::Identifier &msg,
127                                     value_list_t *vl, bool require_fields) {
128   std::string s;
129
130   s = msg.host();
131   if (!s.length() && require_fields)
132     return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
133                         grpc::string("missing host name"));
134   sstrncpy(vl->host, s.c_str(), sizeof(vl->host));
135
136   s = msg.plugin();
137   if (!s.length() && require_fields)
138     return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
139                         grpc::string("missing plugin name"));
140   sstrncpy(vl->plugin, s.c_str(), sizeof(vl->plugin));
141
142   s = msg.type();
143   if (!s.length() && require_fields)
144     return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
145                         grpc::string("missing type name"));
146   sstrncpy(vl->type, s.c_str(), sizeof(vl->type));
147
148   s = msg.plugin_instance();
149   sstrncpy(vl->plugin_instance, s.c_str(), sizeof(vl->plugin_instance));
150
151   s = msg.type_instance();
152   sstrncpy(vl->type_instance, s.c_str(), sizeof(vl->type_instance));
153
154   return grpc::Status::OK;
155 } /* unmarshal_ident() */
156
157 static grpc::Status marshal_value_list(const value_list_t *vl,
158                                        collectd::types::ValueList *msg) {
159   auto id = msg->mutable_identifier();
160   marshal_ident(vl, id);
161
162   auto ds = plugin_get_ds(vl->type);
163   if ((ds == NULL) || (ds->ds_num != vl->values_len)) {
164     return grpc::Status(grpc::StatusCode::INTERNAL,
165                         grpc::string("failed to retrieve data-set for values"));
166   }
167
168   auto t = TimeUtil::NanosecondsToTimestamp(CDTIME_T_TO_NS(vl->time));
169   auto d = TimeUtil::NanosecondsToDuration(CDTIME_T_TO_NS(vl->interval));
170   msg->set_allocated_time(new google::protobuf::Timestamp(t));
171   msg->set_allocated_interval(new google::protobuf::Duration(d));
172
173   msg->clear_meta_data();
174   if (vl->meta != nullptr) {
175     char **meta_data_keys = nullptr;
176     int meta_data_keys_len = meta_data_toc(vl->meta, &meta_data_keys);
177     if (meta_data_keys_len < 0) {
178       return grpc::Status(grpc::StatusCode::INTERNAL,
179                           grpc::string("error getting metadata keys"));
180     }
181
182     for (int i = 0; i < meta_data_keys_len; i++) {
183       char *key = meta_data_keys[i];
184       int md_type = meta_data_type(vl->meta, key);
185
186       collectd::types::MetadataValue md_value;
187       md_value.Clear();
188
189       switch (md_type) {
190       case MD_TYPE_STRING:
191         char *md_string;
192         if (meta_data_get_string(vl->meta, key, &md_string) != 0 || md_string == nullptr) {
193           strarray_free(meta_data_keys, meta_data_keys_len);
194           return grpc::Status(grpc::StatusCode::INTERNAL,
195                             grpc::string("missing metadata"));
196         }
197         md_value.set_string_value(md_string);
198         free(md_string);
199         break;
200       case MD_TYPE_SIGNED_INT:
201         int64_t int64_value;
202         if (meta_data_get_signed_int(vl->meta, key, &int64_value) != 0) {
203           strarray_free(meta_data_keys, meta_data_keys_len);
204           return grpc::Status(grpc::StatusCode::INTERNAL,
205                             grpc::string("missing metadata"));
206         }
207         md_value.set_int64_value(int64_value);
208         break;
209       case MD_TYPE_UNSIGNED_INT:
210         uint64_t uint64_value;
211         if (meta_data_get_unsigned_int(vl->meta, key, &uint64_value) != 0) {
212           strarray_free(meta_data_keys, meta_data_keys_len);
213           return grpc::Status(grpc::StatusCode::INTERNAL,
214                             grpc::string("missing metadata"));
215         }
216         md_value.set_uint64_value(uint64_value);
217         break;
218       case MD_TYPE_DOUBLE:
219         double double_value;
220         if (meta_data_get_double(vl->meta, key, &double_value) != 0) {
221           strarray_free(meta_data_keys, meta_data_keys_len);
222           return grpc::Status(grpc::StatusCode::INTERNAL,
223                             grpc::string("missing metadata"));
224         }
225         md_value.set_double_value(double_value);
226         break;
227       case MD_TYPE_BOOLEAN:
228         bool bool_value;
229         if (meta_data_get_boolean(vl->meta, key, &bool_value) != 0) {
230           strarray_free(meta_data_keys, meta_data_keys_len);
231           return grpc::Status(grpc::StatusCode::INTERNAL,
232                             grpc::string("missing metadata"));
233         }
234         md_value.set_bool_value(bool_value);
235         break;
236       default:
237         strarray_free(meta_data_keys, meta_data_keys_len);
238         return grpc::Status(grpc::StatusCode::INTERNAL,
239                             grpc::string("unknown metadata type"));
240       }
241
242       (*msg->mutable_meta_data())[grpc::string(key)] = md_value;
243
244       if (meta_data_keys != nullptr) {
245         strarray_free(meta_data_keys, meta_data_keys_len);
246       }
247     }
248   }
249
250   for (size_t i = 0; i < vl->values_len; ++i) {
251     auto v = msg->add_values();
252     int value_type = ds->ds[i].type;
253     switch (value_type) {
254     case DS_TYPE_COUNTER:
255       v->set_counter(vl->values[i].counter);
256       break;
257     case DS_TYPE_GAUGE:
258       v->set_gauge(vl->values[i].gauge);
259       break;
260     case DS_TYPE_DERIVE:
261       v->set_derive(vl->values[i].derive);
262       break;
263     case DS_TYPE_ABSOLUTE:
264       v->set_absolute(vl->values[i].absolute);
265       break;
266     default:
267       ERROR("grpc: invalid value type (%d)", value_type);
268       return grpc::Status(grpc::StatusCode::INTERNAL,
269                           grpc::string("unknown value type"));
270     }
271
272     auto name = msg->add_ds_names();
273     name->assign(ds->ds[i].name);
274   }
275
276   return grpc::Status::OK;
277 } /* marshal_value_list */
278
279 static grpc::Status unmarshal_value_list(const collectd::types::ValueList &msg,
280                                          value_list_t *vl) {
281   vl->time = NS_TO_CDTIME_T(TimeUtil::TimestampToNanoseconds(msg.time()));
282   vl->interval =
283       NS_TO_CDTIME_T(TimeUtil::DurationToNanoseconds(msg.interval()));
284
285   auto status = unmarshal_ident(msg.identifier(), vl, true);
286   if (!status.ok())
287     return status;
288
289   vl->meta = meta_data_create();
290   if (vl->meta == nullptr) {
291     return grpc::Status(grpc::StatusCode::RESOURCE_EXHAUSTED,
292                         grpc::string("failed to metadata list"));
293   }
294   for (auto kv: msg.meta_data()) {
295     auto k = kv.first.c_str();
296     auto v = kv.second;
297
298     // The meta_data collection individually allocates copies of the keys and
299     // string values for each entry, so it's safe for us to pass a reference
300     // to our short-lived strings.
301
302     switch (v.value_case()) {
303     case collectd::types::MetadataValue::ValueCase::kStringValue:
304       meta_data_add_string(vl->meta, k, v.string_value().c_str());
305       break;
306     case collectd::types::MetadataValue::ValueCase::kInt64Value:
307       meta_data_add_signed_int(vl->meta, k, v.int64_value());
308       break;
309     case collectd::types::MetadataValue::ValueCase::kUint64Value:
310       meta_data_add_unsigned_int(vl->meta, k, v.uint64_value());
311       break;
312     case collectd::types::MetadataValue::ValueCase::kDoubleValue:
313       meta_data_add_double(vl->meta, k, v.double_value());
314       break;
315     case collectd::types::MetadataValue::ValueCase::kBoolValue:
316       meta_data_add_boolean(vl->meta, k, v.bool_value());
317       break;
318     default:
319       meta_data_destroy(vl->meta);
320       return  grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
321                            grpc::string("Metadata of unknown type"));
322     }
323   }
324
325   value_t *values = NULL;
326   size_t values_len = 0;
327
328   status = grpc::Status::OK;
329   for (auto v : msg.values()) {
330     value_t *val =
331         (value_t *)realloc(values, (values_len + 1) * sizeof(*values));
332     if (!val) {
333       status = grpc::Status(grpc::StatusCode::RESOURCE_EXHAUSTED,
334                             grpc::string("failed to allocate values array"));
335       break;
336     }
337
338     values = val;
339     val = values + values_len;
340     values_len++;
341
342     switch (v.value_case()) {
343     case collectd::types::Value::ValueCase::kCounter:
344       val->counter = counter_t(v.counter());
345       break;
346     case collectd::types::Value::ValueCase::kGauge:
347       val->gauge = gauge_t(v.gauge());
348       break;
349     case collectd::types::Value::ValueCase::kDerive:
350       val->derive = derive_t(v.derive());
351       break;
352     case collectd::types::Value::ValueCase::kAbsolute:
353       val->absolute = absolute_t(v.absolute());
354       break;
355     default:
356       status = grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
357                             grpc::string("unknown value type"));
358       break;
359     }
360
361     if (!status.ok())
362       break;
363   }
364   if (status.ok()) {
365     vl->values = values;
366     vl->values_len = values_len;
367   } else {
368     meta_data_destroy(vl->meta);
369     if (values) {
370       free(values);
371     }
372   }
373
374   return status;
375 } /* unmarshal_value_list() */
376
377 /*
378  * Collectd service
379  */
380 class CollectdImpl : public collectd::Collectd::Service {
381 public:
382   grpc::Status
383   QueryValues(grpc::ServerContext *ctx, QueryValuesRequest const *req,
384               grpc::ServerWriter<QueryValuesResponse> *writer) override {
385     value_list_t match;
386     auto status = unmarshal_ident(req->identifier(), &match, false);
387     if (!status.ok()) {
388       return status;
389     }
390
391     std::queue<value_list_t> value_lists;
392     status = this->queryValuesRead(&match, &value_lists);
393     if (status.ok()) {
394       status = this->queryValuesWrite(ctx, writer, &value_lists);
395     }
396
397     while (!value_lists.empty()) {
398       auto vl = value_lists.front();
399       value_lists.pop();
400       sfree(vl.values);
401       meta_data_destroy(vl.meta);
402     }
403
404     return status;
405   }
406
407   grpc::Status PutValues(grpc::ServerContext *ctx,
408                          grpc::ServerReader<PutValuesRequest> *reader,
409                          PutValuesResponse *res) override {
410     PutValuesRequest req;
411
412     while (reader->Read(&req)) {
413       value_list_t vl = {0};
414       auto status = unmarshal_value_list(req.value_list(), &vl);
415       if (!status.ok())
416         return status;
417
418       if (plugin_dispatch_values(&vl))
419         return grpc::Status(
420             grpc::StatusCode::INTERNAL,
421             grpc::string("failed to enqueue values for writing"));
422     }
423
424     res->Clear();
425     return grpc::Status::OK;
426   }
427
428 private:
429   grpc::Status queryValuesRead(value_list_t const *match,
430                                std::queue<value_list_t> *value_lists) {
431     uc_iter_t *iter;
432     if ((iter = uc_get_iterator()) == NULL) {
433       return grpc::Status(
434           grpc::StatusCode::INTERNAL,
435           grpc::string("failed to query values: cannot create iterator"));
436     }
437
438     grpc::Status status = grpc::Status::OK;
439     char *name = NULL;
440     while (uc_iterator_next(iter, &name) == 0) {
441       value_list_t vl;
442       if (parse_identifier_vl(name, &vl) != 0) {
443         status = grpc::Status(grpc::StatusCode::INTERNAL,
444                               grpc::string("failed to parse identifier"));
445         break;
446       }
447
448       if (!ident_matches(&vl, match))
449         continue;
450       if (uc_iterator_get_time(iter, &vl.time) < 0) {
451         status =
452             grpc::Status(grpc::StatusCode::INTERNAL,
453                          grpc::string("failed to retrieve value timestamp"));
454         break;
455       }
456       if (uc_iterator_get_interval(iter, &vl.interval) < 0) {
457         status =
458             grpc::Status(grpc::StatusCode::INTERNAL,
459                          grpc::string("failed to retrieve value interval"));
460         break;
461       }
462       if (uc_iterator_get_values(iter, &vl.values, &vl.values_len) < 0) {
463         status = grpc::Status(grpc::StatusCode::INTERNAL,
464                               grpc::string("failed to retrieve values"));
465         break;
466       }
467       if (uc_iterator_get_meta(iter, &vl.meta) < 0) {
468         status = grpc::Status(grpc::StatusCode::INTERNAL,
469                               grpc::string("failed to retrieve value metadata"));
470       }
471
472       value_lists->push(vl);
473     } // while (uc_iterator_next(iter, &name) == 0)
474
475     uc_iterator_destroy(iter);
476     return status;
477   }
478
479   grpc::Status queryValuesWrite(grpc::ServerContext *ctx,
480                                 grpc::ServerWriter<QueryValuesResponse> *writer,
481                                 std::queue<value_list_t> *value_lists) {
482     while (!value_lists->empty()) {
483       auto vl = value_lists->front();
484       QueryValuesResponse res;
485       res.Clear();
486
487       auto status = marshal_value_list(&vl, res.mutable_value_list());
488       if (!status.ok()) {
489         return status;
490       }
491
492       if (!writer->Write(res)) {
493         return grpc::Status::CANCELLED;
494       }
495
496       value_lists->pop();
497       sfree(vl.values);
498     }
499
500     return grpc::Status::OK;
501   }
502 };
503
504 /*
505  * gRPC server implementation
506  */
507 class CollectdServer final {
508 public:
509   void Start() {
510     auto auth = grpc::InsecureServerCredentials();
511
512     grpc::ServerBuilder builder;
513
514     if (listeners.empty()) {
515       builder.AddListeningPort(default_addr, auth);
516       INFO("grpc: Listening on %s", default_addr.c_str());
517     } else {
518       for (auto l : listeners) {
519         grpc::string addr = l.addr + ":" + l.port;
520
521         auto use_ssl = grpc::string("");
522         auto a = auth;
523         if (l.ssl != nullptr) {
524           use_ssl = grpc::string(" (SSL enabled)");
525           a = grpc::SslServerCredentials(*l.ssl);
526         }
527
528         builder.AddListeningPort(addr, a);
529         INFO("grpc: Listening on %s%s", addr.c_str(), use_ssl.c_str());
530       }
531     }
532
533     builder.RegisterService(&collectd_service_);
534
535     server_ = builder.BuildAndStart();
536   } /* Start() */
537
538   void Shutdown() { server_->Shutdown(); } /* Shutdown() */
539
540 private:
541   CollectdImpl collectd_service_;
542
543   std::unique_ptr<grpc::Server> server_;
544 }; /* class CollectdServer */
545
546 class CollectdClient final {
547 public:
548   CollectdClient(std::shared_ptr<grpc::ChannelInterface> channel)
549       : stub_(Collectd::NewStub(channel)) {}
550
551   int PutValues(value_list_t const *vl) {
552     grpc::ClientContext ctx;
553
554     PutValuesRequest req;
555     auto status = marshal_value_list(vl, req.mutable_value_list());
556     if (!status.ok()) {
557       ERROR("grpc: Marshalling value_list_t failed.");
558       return -1;
559     }
560
561     PutValuesResponse res;
562     auto stream = stub_->PutValues(&ctx, &res);
563     if (!stream->Write(req)) {
564       NOTICE("grpc: Broken stream.");
565       /* intentionally not returning. */
566     }
567
568     stream->WritesDone();
569     status = stream->Finish();
570     if (!status.ok()) {
571       ERROR("grpc: Error while closing stream.");
572       return -1;
573     }
574
575     return 0;
576   } /* int PutValues */
577
578 private:
579   std::unique_ptr<Collectd::Stub> stub_;
580 };
581
582 static CollectdServer *server = nullptr;
583
584 /*
585  * collectd plugin interface
586  */
587 extern "C" {
588 static void c_grpc_destroy_write_callback(void *ptr) {
589   delete (CollectdClient *)ptr;
590 }
591
592 static int c_grpc_write(__attribute__((unused)) data_set_t const *ds,
593                         value_list_t const *vl, user_data_t *ud) {
594   CollectdClient *c = (CollectdClient *)ud->data;
595   return c->PutValues(vl);
596 }
597
598 static int c_grpc_config_listen(oconfig_item_t *ci) {
599   if ((ci->values_num != 2) || (ci->values[0].type != OCONFIG_TYPE_STRING) ||
600       (ci->values[1].type != OCONFIG_TYPE_STRING)) {
601     ERROR("grpc: The `%s` config option needs exactly "
602           "two string argument (address and port).",
603           ci->key);
604     return -1;
605   }
606
607   auto listener = Listener();
608   listener.addr = grpc::string(ci->values[0].value.string);
609   listener.port = grpc::string(ci->values[1].value.string);
610   listener.ssl = nullptr;
611
612   auto ssl_opts = new (grpc::SslServerCredentialsOptions);
613   grpc::SslServerCredentialsOptions::PemKeyCertPair pkcp = {};
614   bool use_ssl = false;
615
616   for (int i = 0; i < ci->children_num; i++) {
617     oconfig_item_t *child = ci->children + i;
618
619     if (!strcasecmp("EnableSSL", child->key)) {
620       if (cf_util_get_boolean(child, &use_ssl)) {
621         ERROR("grpc: Option `%s` expects a boolean value", child->key);
622         return -1;
623       }
624     } else if (!strcasecmp("SSLCACertificateFile", child->key)) {
625       char *certs = NULL;
626       if (cf_util_get_string(child, &certs)) {
627         ERROR("grpc: Option `%s` expects a string value", child->key);
628         return -1;
629       }
630       ssl_opts->pem_root_certs = read_file(certs);
631     } else if (!strcasecmp("SSLCertificateKeyFile", child->key)) {
632       char *key = NULL;
633       if (cf_util_get_string(child, &key)) {
634         ERROR("grpc: Option `%s` expects a string value", child->key);
635         return -1;
636       }
637       pkcp.private_key = read_file(key);
638     } else if (!strcasecmp("SSLCertificateFile", child->key)) {
639       char *cert = NULL;
640       if (cf_util_get_string(child, &cert)) {
641         ERROR("grpc: Option `%s` expects a string value", child->key);
642         return -1;
643       }
644       pkcp.cert_chain = read_file(cert);
645     } else {
646       WARNING("grpc: Option `%s` not allowed in <%s> block.", child->key,
647               ci->key);
648     }
649   }
650
651   ssl_opts->pem_key_cert_pairs.push_back(pkcp);
652   if (use_ssl)
653     listener.ssl = ssl_opts;
654   else
655     delete (ssl_opts);
656
657   listeners.push_back(listener);
658   return 0;
659 } /* c_grpc_config_listen() */
660
661 static int c_grpc_config_server(oconfig_item_t *ci) {
662   if ((ci->values_num != 2) || (ci->values[0].type != OCONFIG_TYPE_STRING) ||
663       (ci->values[1].type != OCONFIG_TYPE_STRING)) {
664     ERROR("grpc: The `%s` config option needs exactly "
665           "two string argument (address and port).",
666           ci->key);
667     return -1;
668   }
669
670   grpc::SslCredentialsOptions ssl_opts;
671   bool use_ssl = false;
672
673   for (int i = 0; i < ci->children_num; i++) {
674     oconfig_item_t *child = ci->children + i;
675
676     if (!strcasecmp("EnableSSL", child->key)) {
677       if (cf_util_get_boolean(child, &use_ssl)) {
678         return -1;
679       }
680     } else if (!strcasecmp("SSLCACertificateFile", child->key)) {
681       char *certs = NULL;
682       if (cf_util_get_string(child, &certs)) {
683         return -1;
684       }
685       ssl_opts.pem_root_certs = read_file(certs);
686     } else if (!strcasecmp("SSLCertificateKeyFile", child->key)) {
687       char *key = NULL;
688       if (cf_util_get_string(child, &key)) {
689         return -1;
690       }
691       ssl_opts.pem_private_key = read_file(key);
692     } else if (!strcasecmp("SSLCertificateFile", child->key)) {
693       char *cert = NULL;
694       if (cf_util_get_string(child, &cert)) {
695         return -1;
696       }
697       ssl_opts.pem_cert_chain = read_file(cert);
698     } else {
699       WARNING("grpc: Option `%s` not allowed in <%s> block.", child->key,
700               ci->key);
701     }
702   }
703
704   auto node = grpc::string(ci->values[0].value.string);
705   auto service = grpc::string(ci->values[1].value.string);
706   auto addr = node + ":" + service;
707
708   CollectdClient *client;
709   if (use_ssl) {
710     auto channel_creds = grpc::SslCredentials(ssl_opts);
711     auto channel = grpc::CreateChannel(addr, channel_creds);
712     client = new CollectdClient(channel);
713   } else {
714     auto channel =
715         grpc::CreateChannel(addr, grpc::InsecureChannelCredentials());
716     client = new CollectdClient(channel);
717   }
718
719   auto callback_name = grpc::string("grpc/") + addr;
720   user_data_t ud = {
721       .data = client, .free_func = c_grpc_destroy_write_callback,
722   };
723
724   plugin_register_write(callback_name.c_str(), c_grpc_write, &ud);
725   return 0;
726 } /* c_grpc_config_server() */
727
728 static int c_grpc_config(oconfig_item_t *ci) {
729   int i;
730
731   for (i = 0; i < ci->children_num; i++) {
732     oconfig_item_t *child = ci->children + i;
733
734     if (!strcasecmp("Listen", child->key)) {
735       if (c_grpc_config_listen(child))
736         return -1;
737     } else if (!strcasecmp("Server", child->key)) {
738       if (c_grpc_config_server(child))
739         return -1;
740     }
741
742     else {
743       WARNING("grpc: Option `%s` not allowed here.", child->key);
744     }
745   }
746
747   return 0;
748 } /* c_grpc_config() */
749
750 static int c_grpc_init(void) {
751   server = new CollectdServer();
752   if (!server) {
753     ERROR("grpc: Failed to create server");
754     return -1;
755   }
756
757   server->Start();
758   return 0;
759 } /* c_grpc_init() */
760
761 static int c_grpc_shutdown(void) {
762   if (!server)
763     return 0;
764
765   server->Shutdown();
766
767   delete server;
768   server = nullptr;
769
770   return 0;
771 } /* c_grpc_shutdown() */
772
773 void module_register(void) {
774   plugin_register_complex_config("grpc", c_grpc_config);
775   plugin_register_init("grpc", c_grpc_init);
776   plugin_register_shutdown("grpc", c_grpc_shutdown);
777 } /* module_register() */
778 } /* extern "C" */