2 * collectd - src/grpc.cc
3 * Copyright (C) 2015-2016 Sebastian Harl
4 * Copyright (C) 2016 Florian octo Forster
6 * Permission is hereby granted, free of charge, to any person obtaining a
7 * copy of this software and associated documentation files (the "Software"),
8 * to deal in the Software without restriction, including without limitation
9 * the rights to use, copy, modify, merge, publish, distribute, sublicense,
10 * and/or sell copies of the Software, and to permit persons to whom the
11 * Software is furnished to do so, subject to the following conditions:
13 * The above copyright notice and this permission notice shall be included in
14 * all copies or substantial portions of the Software.
16 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
22 * DEALINGS IN THE SOFTWARE.
25 * Sebastian Harl <sh at tokkee.org>
26 * Florian octo Forster <octo at collectd.org>
29 #include <google/protobuf/util/time_util.h>
30 #include <grpc++/grpc++.h>
37 #include "collectd.grpc.pb.h"
47 #include "daemon/utils_cache.h"
50 using collectd::Collectd;
52 using collectd::PutValuesRequest;
53 using collectd::PutValuesResponse;
54 using collectd::QueryValuesRequest;
55 using collectd::QueryValuesResponse;
57 using google::protobuf::util::TimeUtil;
// Map type pairing a string key with a MetadataValue message. The alias name
// is declared on a line not visible here; presumably it is the `grpcMetadata`
// type used by the metadata (un)marshalling functions -- TODO confirm.
59 typedef google::protobuf::Map<grpc::string, collectd::types::MetadataValue>
// Field of the Listener record (the rest of the record is not visible here):
// SSL credential options for this endpoint. c_grpc_config_listen() initialises
// it to nullptr and may later point it at heap-allocated options.
70 grpc::SslServerCredentialsOptions *ssl;
// Endpoints collected by c_grpc_config_listen().
72 static std::vector<Listener> listeners;
// Address the server listens on when `listeners` is empty.
73 static grpc::string default_addr("0.0.0.0:50051");
79 static bool ident_matches(const value_list_t *vl, const value_list_t *matcher) {
80 if (fnmatch(matcher->host, vl->host, 0))
83 if (fnmatch(matcher->plugin, vl->plugin, 0))
85 if (fnmatch(matcher->plugin_instance, vl->plugin_instance, 0))
88 if (fnmatch(matcher->type, vl->type, 0))
90 if (fnmatch(matcher->type_instance, vl->type_instance, 0))
96 static grpc::string read_file(const char *filename) {
98 grpc::string s, content;
102 ERROR("grpc: Failed to open '%s'", filename);
106 while (std::getline(f, s)) {
108 content.push_back('\n');
118 static void marshal_ident(const value_list_t *vl,
119 collectd::types::Identifier *msg) {
120 msg->set_host(vl->host);
121 msg->set_plugin(vl->plugin);
122 if (vl->plugin_instance[0] != '\0')
123 msg->set_plugin_instance(vl->plugin_instance);
124 msg->set_type(vl->type);
125 if (vl->type_instance[0] != '\0')
126 msg->set_type_instance(vl->type_instance);
127 } /* marshal_ident */
129 static grpc::Status unmarshal_ident(const collectd::types::Identifier &msg,
130 value_list_t *vl, bool require_fields) {
134 if (!s.length() && require_fields)
135 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
136 grpc::string("missing host name"));
137 sstrncpy(vl->host, s.c_str(), sizeof(vl->host));
140 if (!s.length() && require_fields)
141 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
142 grpc::string("missing plugin name"));
143 sstrncpy(vl->plugin, s.c_str(), sizeof(vl->plugin));
146 if (!s.length() && require_fields)
147 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
148 grpc::string("missing type name"));
149 sstrncpy(vl->type, s.c_str(), sizeof(vl->type));
151 s = msg.plugin_instance();
152 sstrncpy(vl->plugin_instance, s.c_str(), sizeof(vl->plugin_instance));
154 s = msg.type_instance();
155 sstrncpy(vl->type_instance, s.c_str(), sizeof(vl->type_instance));
157 return grpc::Status::OK;
158 } /* unmarshal_ident() */
160 static grpc::Status marshal_meta_data(meta_data_t *meta,
161 grpcMetadata *mutable_meta_data) {
162 char **meta_data_keys = nullptr;
163 int meta_data_keys_len = meta_data_toc(meta, &meta_data_keys);
164 if (meta_data_keys_len < 0) {
165 return grpc::Status(grpc::StatusCode::INTERNAL,
166 grpc::string("error getting metadata keys"));
169 for (int i = 0; i < meta_data_keys_len; i++) {
170 char *key = meta_data_keys[i];
171 int md_type = meta_data_type(meta, key);
173 collectd::types::MetadataValue md_value;
179 if (meta_data_get_string(meta, key, &md_string) != 0 ||
180 md_string == nullptr) {
181 strarray_free(meta_data_keys, meta_data_keys_len);
182 return grpc::Status(grpc::StatusCode::INTERNAL,
183 grpc::string("missing metadata"));
185 md_value.set_string_value(md_string);
188 case MD_TYPE_SIGNED_INT:
190 if (meta_data_get_signed_int(meta, key, &int64_value) != 0) {
191 strarray_free(meta_data_keys, meta_data_keys_len);
192 return grpc::Status(grpc::StatusCode::INTERNAL,
193 grpc::string("missing metadata"));
195 md_value.set_int64_value(int64_value);
197 case MD_TYPE_UNSIGNED_INT:
198 uint64_t uint64_value;
199 if (meta_data_get_unsigned_int(meta, key, &uint64_value) != 0) {
200 strarray_free(meta_data_keys, meta_data_keys_len);
201 return grpc::Status(grpc::StatusCode::INTERNAL,
202 grpc::string("missing metadata"));
204 md_value.set_uint64_value(uint64_value);
208 if (meta_data_get_double(meta, key, &double_value) != 0) {
209 strarray_free(meta_data_keys, meta_data_keys_len);
210 return grpc::Status(grpc::StatusCode::INTERNAL,
211 grpc::string("missing metadata"));
213 md_value.set_double_value(double_value);
215 case MD_TYPE_BOOLEAN:
217 if (meta_data_get_boolean(meta, key, &bool_value) != 0) {
218 strarray_free(meta_data_keys, meta_data_keys_len);
219 return grpc::Status(grpc::StatusCode::INTERNAL,
220 grpc::string("missing metadata"));
222 md_value.set_bool_value(bool_value);
225 strarray_free(meta_data_keys, meta_data_keys_len);
226 ERROR("grpc: invalid metadata type (%d)", md_type);
227 return grpc::Status(grpc::StatusCode::INTERNAL,
228 grpc::string("unknown metadata type"));
231 (*mutable_meta_data)[grpc::string(key)] = md_value;
233 strarray_free(meta_data_keys, meta_data_keys_len);
236 return grpc::Status::OK;
239 static grpc::Status unmarshal_meta_data(const grpcMetadata &rpc_metadata,
240 meta_data_t **md_out) {
241 *md_out = meta_data_create();
242 if (*md_out == nullptr) {
243 return grpc::Status(grpc::StatusCode::RESOURCE_EXHAUSTED,
244 grpc::string("failed to create metadata list"));
246 for (auto kv : rpc_metadata) {
247 auto k = kv.first.c_str();
250 // The meta_data collection individually allocates copies of the keys and
251 // string values for each entry, so it's safe for us to pass a reference
252 // to our short-lived strings.
254 switch (v.value_case()) {
255 case collectd::types::MetadataValue::ValueCase::kStringValue:
256 meta_data_add_string(*md_out, k, v.string_value().c_str());
258 case collectd::types::MetadataValue::ValueCase::kInt64Value:
259 meta_data_add_signed_int(*md_out, k, v.int64_value());
261 case collectd::types::MetadataValue::ValueCase::kUint64Value:
262 meta_data_add_unsigned_int(*md_out, k, v.uint64_value());
264 case collectd::types::MetadataValue::ValueCase::kDoubleValue:
265 meta_data_add_double(*md_out, k, v.double_value());
267 case collectd::types::MetadataValue::ValueCase::kBoolValue:
268 meta_data_add_boolean(*md_out, k, v.bool_value());
271 meta_data_destroy(*md_out);
272 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
273 grpc::string("Metadata of unknown type"));
276 return grpc::Status::OK;
279 static grpc::Status marshal_value_list(const value_list_t *vl,
280 collectd::types::ValueList *msg) {
281 auto id = msg->mutable_identifier();
282 marshal_ident(vl, id);
284 auto ds = plugin_get_ds(vl->type);
285 if ((ds == NULL) || (ds->ds_num != vl->values_len)) {
286 return grpc::Status(grpc::StatusCode::INTERNAL,
287 grpc::string("failed to retrieve data-set for values"));
290 auto t = TimeUtil::NanosecondsToTimestamp(CDTIME_T_TO_NS(vl->time));
291 auto d = TimeUtil::NanosecondsToDuration(CDTIME_T_TO_NS(vl->interval));
292 msg->set_allocated_time(new google::protobuf::Timestamp(t));
293 msg->set_allocated_interval(new google::protobuf::Duration(d));
295 msg->clear_meta_data();
296 if (vl->meta != nullptr) {
297 grpc::Status status = marshal_meta_data(vl->meta, msg->mutable_meta_data());
303 for (size_t i = 0; i < vl->values_len; ++i) {
304 auto v = msg->add_values();
305 int value_type = ds->ds[i].type;
306 switch (value_type) {
307 case DS_TYPE_COUNTER:
308 v->set_counter(vl->values[i].counter);
311 v->set_gauge(vl->values[i].gauge);
314 v->set_derive(vl->values[i].derive);
316 case DS_TYPE_ABSOLUTE:
317 v->set_absolute(vl->values[i].absolute);
320 ERROR("grpc: invalid value type (%d)", value_type);
321 return grpc::Status(grpc::StatusCode::INTERNAL,
322 grpc::string("unknown value type"));
325 auto name = msg->add_ds_names();
326 name->assign(ds->ds[i].name);
329 return grpc::Status::OK;
330 } /* marshal_value_list */
// Converts the protobuf ValueList `msg` into a collectd value_list_t.
// Visible steps: time and interval are converted from nanoseconds to cdtime_t,
// the identifier and the metadata are unmarshalled, and the values are copied
// one by one into an array grown with realloc().
// NOTE(review): the rest of the parameter list, the status checks after each
// step and parts of the loop bookkeeping are not visible in this excerpt.
332 static grpc::Status unmarshal_value_list(const collectd::types::ValueList &msg,
334 vl->time = NS_TO_CDTIME_T(TimeUtil::TimestampToNanoseconds(msg.time()));
336 NS_TO_CDTIME_T(TimeUtil::DurationToNanoseconds(msg.interval()));
// `true`: host, plugin and type must be non-empty.
338 auto status = unmarshal_ident(msg.identifier(), vl, true);
// Creates vl->meta from the message's metadata map.
342 status = unmarshal_meta_data(msg.meta_data(), &vl->meta);
// Output array and its current length; the array starts out empty.
346 value_t *values = NULL;
347 size_t values_len = 0;
349 status = grpc::Status::OK;
350 for (auto v : msg.values()) {
// Grow the array by one slot per value.
352 (value_t *)realloc(values, (values_len + 1) * sizeof(*values));
354 status = grpc::Status(grpc::StatusCode::RESOURCE_EXHAUSTED,
355 grpc::string("failed to allocate values array"));
// `val` now addresses the slot at index values_len.
360 val = values + values_len;
// Store the value in the union member that corresponds to the set oneof case.
363 switch (v.value_case()) {
364 case collectd::types::Value::ValueCase::kCounter:
365 val->counter = counter_t(v.counter());
367 case collectd::types::Value::ValueCase::kGauge:
368 val->gauge = gauge_t(v.gauge());
370 case collectd::types::Value::ValueCase::kDerive:
371 val->derive = derive_t(v.derive());
373 case collectd::types::Value::ValueCase::kAbsolute:
374 val->absolute = absolute_t(v.absolute());
377 status = grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
378 grpc::string("unknown value type"));
387 vl->values_len = values_len;
// NOTE(review): presumably the failure path -- the metadata created above is
// destroyed again; confirm against the enclosing condition.
389 meta_data_destroy(vl->meta);
394 } /* unmarshal_value_list() */
// gRPC service implementation derived from collectd::Collectd::Service. It
// overrides the QueryValues and PutValues handlers and contains the two helper
// steps queryValuesRead() and queryValuesWrite() used by QueryValues.
// NOTE(review): access specifiers, several declarations and most return
// statements are not visible in this excerpt.
399 class CollectdImpl : public collectd::Collectd::Service {
// QueryValues handler: unmarshals the request identifier into `match` (fields
// are optional here, require_fields is false), collects value lists with
// queryValuesRead(), sends them with queryValuesWrite(), and afterwards
// destroys the metadata of the entries still in the queue.
402 QueryValues(grpc::ServerContext *ctx, QueryValuesRequest const *req,
403 grpc::ServerWriter<QueryValuesResponse> *writer) override {
405 auto status = unmarshal_ident(req->identifier(), &match, false);
410 std::queue<value_list_t> value_lists;
411 status = this->queryValuesRead(&match, &value_lists);
413 status = this->queryValuesWrite(ctx, writer, &value_lists);
// Walk the remaining queue entries to release their metadata.
416 while (!value_lists.empty()) {
417 auto vl = value_lists.front();
420 meta_data_destroy(vl.meta);
// PutValues handler: reads requests from the stream until Read() reports no
// more messages, converts each to a value_list_t and passes it to
// plugin_dispatch_values(). A non-zero dispatch result leads to an INTERNAL
// status with the message below.
426 grpc::Status PutValues(grpc::ServerContext *ctx,
427 grpc::ServerReader<PutValuesRequest> *reader,
428 PutValuesResponse *res) override {
429 PutValuesRequest req;
431 while (reader->Read(&req)) {
432 value_list_t vl = {0};
433 auto status = unmarshal_value_list(req.value_list(), &vl);
437 if (plugin_dispatch_values(&vl))
439 grpc::StatusCode::INTERNAL,
440 grpc::string("failed to enqueue values for writing"));
444 return grpc::Status::OK;
// Iterates over the entries provided by the uc_* iterator API, parses each
// name into a value_list_t, skips entries that do not match `match`, and
// fills in time, interval, values and metadata before pushing the value list
// onto `value_lists`. Each failing step sets an INTERNAL status.
448 grpc::Status queryValuesRead(value_list_t const *match,
449 std::queue<value_list_t> *value_lists) {
451 if ((iter = uc_get_iterator()) == NULL) {
453 grpc::StatusCode::INTERNAL,
454 grpc::string("failed to query values: cannot create iterator"));
457 grpc::Status status = grpc::Status::OK;
459 while (uc_iterator_next(iter, &name) == 0) {
461 if (parse_identifier_vl(name, &vl) != 0) {
462 status = grpc::Status(grpc::StatusCode::INTERNAL,
463 grpc::string("failed to parse identifier"));
// Only entries matching the requested identifier patterns are kept.
467 if (!ident_matches(&vl, match))
469 if (uc_iterator_get_time(iter, &vl.time) < 0) {
471 grpc::Status(grpc::StatusCode::INTERNAL,
472 grpc::string("failed to retrieve value timestamp"));
475 if (uc_iterator_get_interval(iter, &vl.interval) < 0) {
477 grpc::Status(grpc::StatusCode::INTERNAL,
478 grpc::string("failed to retrieve value interval"));
481 if (uc_iterator_get_values(iter, &vl.values, &vl.values_len) < 0) {
482 status = grpc::Status(grpc::StatusCode::INTERNAL,
483 grpc::string("failed to retrieve values"));
486 if (uc_iterator_get_meta(iter, &vl.meta) < 0) {
488 grpc::Status(grpc::StatusCode::INTERNAL,
489 grpc::string("failed to retrieve value metadata"));
// The queue stores a copy of the value_list_t record.
492 value_lists->push(vl);
493 } // while (uc_iterator_next(iter, &name) == 0)
495 uc_iterator_destroy(iter);
// Takes value lists from the front of the queue, marshals each into a
// QueryValuesResponse and writes it to the client; a failed Write() ends the
// call with CANCELLED.
499 grpc::Status queryValuesWrite(grpc::ServerContext *ctx,
500 grpc::ServerWriter<QueryValuesResponse> *writer,
501 std::queue<value_list_t> *value_lists) {
502 while (!value_lists->empty()) {
503 auto vl = value_lists->front();
504 QueryValuesResponse res;
507 auto status = marshal_value_list(&vl, res.mutable_value_list());
512 if (!writer->Write(res)) {
513 return grpc::Status::CANCELLED;
520 return grpc::Status::OK;
525 * gRPC server implementation
// Holds the gRPC server object together with the CollectdImpl service that is
// registered on it.
// NOTE(review): the header of the method that builds the server is not
// visible in this excerpt.
527 class CollectdServer final {
// Insecure credentials, used for the default address below.
530 auto auth = grpc::InsecureServerCredentials();
532 grpc::ServerBuilder builder;
// No listener configured: listen on default_addr only.
534 if (listeners.empty()) {
535 builder.AddListeningPort(default_addr, auth);
536 INFO("grpc: Listening on %s", default_addr.c_str());
// Otherwise add one listening port per configured listener.
538 for (auto l : listeners) {
539 grpc::string addr = l.addr + ":" + l.port;
// Suffix for the log message; stays empty when the listener has no SSL options.
541 auto use_ssl = grpc::string("");
543 if (l.ssl != nullptr) {
544 use_ssl = grpc::string(" (SSL enabled)");
545 a = grpc::SslServerCredentials(*l.ssl);
548 builder.AddListeningPort(addr, a);
549 INFO("grpc: Listening on %s%s", addr.c_str(), use_ssl.c_str());
553 builder.RegisterService(&collectd_service_);
555 server_ = builder.BuildAndStart();
// Forwards to grpc::Server::Shutdown() on the owned server.
558 void Shutdown() { server_->Shutdown(); } /* Shutdown() */
// Service instance registered with the builder above.
561 CollectdImpl collectd_service_;
// Server returned by BuildAndStart().
563 std::unique_ptr<grpc::Server> server_;
564 }; /* class CollectdServer */
// Client-side wrapper around a Collectd stub created on a given channel.
566 class CollectdClient final {
// Creates the stub on `channel`.
568 CollectdClient(std::shared_ptr<grpc::ChannelInterface> channel)
569 : stub_(Collectd::NewStub(channel)) {}
// Sends one value list to the server: marshals it into a PutValuesRequest,
// opens a PutValues stream, writes the request, signals WritesDone() and
// collects the final status with Finish().
// NOTE(review): the status checks and return statements are not visible in
// this excerpt.
571 int PutValues(value_list_t const *vl) {
572 grpc::ClientContext ctx;
574 PutValuesRequest req;
575 auto status = marshal_value_list(vl, req.mutable_value_list());
577 ERROR("grpc: Marshalling value_list_t failed.");
581 PutValuesResponse res;
582 auto stream = stub_->PutValues(&ctx, &res);
// A failed write is only logged; the stream is still finished below.
583 if (!stream->Write(req)) {
584 NOTICE("grpc: Broken stream.");
585 /* intentionally not returning. */
588 stream->WritesDone();
589 status = stream->Finish();
591 ERROR("grpc: Error while closing stream.");
596 } /* int PutValues */
// Stub used for all calls made by this client.
599 std::unique_ptr<Collectd::Stub> stub_;
// File-level server instance; null until c_grpc_init() assigns it.
602 static CollectdServer *server = nullptr;
605 * collectd plugin interface
608 static void c_grpc_destroy_write_callback(void *ptr) {
609 delete (CollectdClient *)ptr;
612 static int c_grpc_write(__attribute__((unused)) data_set_t const *ds,
613 value_list_t const *vl, user_data_t *ud) {
614 CollectdClient *c = (CollectdClient *)ud->data;
615 return c->PutValues(vl);
// Handles one configuration block that describes a listening endpoint. The
// block must carry exactly two string values (address and port); its children
// may set the SSL options EnableSSL, SSLCACertificateFile,
// SSLCertificateKeyFile, SSLCertificateFile and VerifyPeer. The resulting
// Listener is appended to `listeners`.
// NOTE(review): local declarations and the return statements are not visible
// in this excerpt.
618 static int c_grpc_config_listen(oconfig_item_t *ci) {
619 if ((ci->values_num != 2) || (ci->values[0].type != OCONFIG_TYPE_STRING) ||
620 (ci->values[1].type != OCONFIG_TYPE_STRING)) {
621 ERROR("grpc: The `%s` config option needs exactly "
622 "two string argument (address and port).",
627 auto listener = Listener();
628 listener.addr = grpc::string(ci->values[0].value.string);
629 listener.port = grpc::string(ci->values[1].value.string);
630 listener.ssl = nullptr;
// SSL options start out requiring and verifying a client certificate;
// VerifyPeer below can change that.
// NOTE(review): ssl_opts is heap-allocated here -- confirm that every early
// return in the loop below releases it.
632 auto ssl_opts = new grpc::SslServerCredentialsOptions(
633 GRPC_SSL_REQUEST_AND_REQUIRE_CLIENT_CERTIFICATE_AND_VERIFY);
634 grpc::SslServerCredentialsOptions::PemKeyCertPair pkcp = {};
635 bool use_ssl = false;
637 for (int i = 0; i < ci->children_num; i++) {
638 oconfig_item_t *child = ci->children + i;
// Option names are compared case-insensitively.
640 if (!strcasecmp("EnableSSL", child->key)) {
641 if (cf_util_get_boolean(child, &use_ssl)) {
642 ERROR("grpc: Option `%s` expects a boolean value", child->key);
645 } else if (!strcasecmp("SSLCACertificateFile", child->key)) {
647 if (cf_util_get_string(child, &certs)) {
648 ERROR("grpc: Option `%s` expects a string value", child->key);
// The option value is a file name; the file's content becomes the root
// certificates.
651 ssl_opts->pem_root_certs = read_file(certs);
652 } else if (!strcasecmp("SSLCertificateKeyFile", child->key)) {
654 if (cf_util_get_string(child, &key)) {
655 ERROR("grpc: Option `%s` expects a string value", child->key);
658 pkcp.private_key = read_file(key);
659 } else if (!strcasecmp("SSLCertificateFile", child->key)) {
661 if (cf_util_get_string(child, &cert)) {
662 ERROR("grpc: Option `%s` expects a string value", child->key);
665 pkcp.cert_chain = read_file(cert);
666 } else if (!strcasecmp("VerifyPeer", child->key)) {
668 if (cf_util_get_boolean(child, &verify)) {
// true: request, require and verify a client certificate; false: do not
// request one.
671 ssl_opts->client_certificate_request =
672 verify ? GRPC_SSL_REQUEST_AND_REQUIRE_CLIENT_CERTIFICATE_AND_VERIFY
673 : GRPC_SSL_DONT_REQUEST_CLIENT_CERTIFICATE;
675 WARNING("grpc: Option `%s` not allowed in <%s> block.", child->key,
// The key/certificate pair collected above is added to the options.
680 ssl_opts->pem_key_cert_pairs.push_back(pkcp);
682 listener.ssl = ssl_opts;
686 listeners.push_back(listener);
688 } /* c_grpc_config_listen() */
// Handles one configuration block that describes a remote server to write to.
// The block must carry exactly two string values (address and port); its
// children may set EnableSSL, SSLCACertificateFile, SSLCertificateKeyFile and
// SSLCertificateFile. A CollectdClient for the resulting address is created
// and registered as a write callback named "grpc/<address>:<port>".
// NOTE(review): local declarations, the branch that selects between the two
// channel kinds and the return statements are not visible in this excerpt.
690 static int c_grpc_config_server(oconfig_item_t *ci) {
691 if ((ci->values_num != 2) || (ci->values[0].type != OCONFIG_TYPE_STRING) ||
692 (ci->values[1].type != OCONFIG_TYPE_STRING)) {
693 ERROR("grpc: The `%s` config option needs exactly "
694 "two string argument (address and port).",
699 grpc::SslCredentialsOptions ssl_opts;
700 bool use_ssl = false;
702 for (int i = 0; i < ci->children_num; i++) {
703 oconfig_item_t *child = ci->children + i;
// Option names are compared case-insensitively.
705 if (!strcasecmp("EnableSSL", child->key)) {
706 if (cf_util_get_boolean(child, &use_ssl)) {
709 } else if (!strcasecmp("SSLCACertificateFile", child->key)) {
711 if (cf_util_get_string(child, &certs)) {
// Each option value is a file name; the file's content is stored in the
// credential options.
714 ssl_opts.pem_root_certs = read_file(certs);
715 } else if (!strcasecmp("SSLCertificateKeyFile", child->key)) {
717 if (cf_util_get_string(child, &key)) {
720 ssl_opts.pem_private_key = read_file(key);
721 } else if (!strcasecmp("SSLCertificateFile", child->key)) {
723 if (cf_util_get_string(child, &cert)) {
726 ssl_opts.pem_cert_chain = read_file(cert);
728 WARNING("grpc: Option `%s` not allowed in <%s> block.", child->key,
// Target address in "<node>:<service>" form.
733 auto node = grpc::string(ci->values[0].value.string);
734 auto service = grpc::string(ci->values[1].value.string);
735 auto addr = node + ":" + service;
737 CollectdClient *client;
// Channel with SSL credentials built from the options collected above.
739 auto channel_creds = grpc::SslCredentials(ssl_opts);
740 auto channel = grpc::CreateChannel(addr, channel_creds);
741 client = new CollectdClient(channel);
// Channel with insecure credentials.
744 grpc::CreateChannel(addr, grpc::InsecureChannelCredentials());
745 client = new CollectdClient(channel);
748 auto callback_name = grpc::string("grpc/") + addr;
// The client is handed over as user data; c_grpc_destroy_write_callback is
// its free function.
750 .data = client, .free_func = c_grpc_destroy_write_callback,
753 plugin_register_write(callback_name.c_str(), c_grpc_write, &ud);
755 } /* c_grpc_config_server() */
757 static int c_grpc_config(oconfig_item_t *ci) {
760 for (i = 0; i < ci->children_num; i++) {
761 oconfig_item_t *child = ci->children + i;
763 if (!strcasecmp("Listen", child->key)) {
764 if (c_grpc_config_listen(child))
766 } else if (!strcasecmp("Server", child->key)) {
767 if (c_grpc_config_server(child))
772 WARNING("grpc: Option `%s` not allowed here.", child->key);
777 } /* c_grpc_config() */
// Init callback: creates the CollectdServer instance and stores it in the
// file-level `server` pointer.
// NOTE(review): the condition guarding the error message and the remaining
// statements of this function are not visible in this excerpt; presumably the
// message is logged when `server` is null -- TODO confirm.
779 static int c_grpc_init(void) {
780 server = new CollectdServer();
782 ERROR("grpc: Failed to create server");
788 } /* c_grpc_init() */
// Shutdown callback of the plugin (registered in module_register()).
// NOTE(review): the body of this function is not visible in this excerpt, so
// what it does with the `server` instance cannot be stated from here.
790 static int c_grpc_shutdown(void) {
800 } /* c_grpc_shutdown() */
802 void module_register(void) {
803 plugin_register_complex_config("grpc", c_grpc_config);
804 plugin_register_init("grpc", c_grpc_init);
805 plugin_register_shutdown("grpc", c_grpc_shutdown);
806 } /* module_register() */