2 * collectd - src/grpc.cc
3 * Copyright (C) 2015-2016 Sebastian Harl
4 * Copyright (C) 2016 Florian octo Forster
6 * Permission is hereby granted, free of charge, to any person obtaining a
7 * copy of this software and associated documentation files (the "Software"),
8 * to deal in the Software without restriction, including without limitation
9 * the rights to use, copy, modify, merge, publish, distribute, sublicense,
10 * and/or sell copies of the Software, and to permit persons to whom the
11 * Software is furnished to do so, subject to the following conditions:
13 * The above copyright notice and this permission notice shall be included in
14 * all copies or substantial portions of the Software.
16 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
22 * DEALINGS IN THE SOFTWARE.
25 * Sebastian Harl <sh at tokkee.org>
26 * Florian octo Forster <octo at collectd.org>
29 #include <google/protobuf/util/time_util.h>
30 #include <grpc++/grpc++.h>
37 #include "collectd.grpc.pb.h"
47 #include "daemon/utils_cache.h"
50 using collectd::Collectd;
52 using collectd::PutValuesRequest;
53 using collectd::PutValuesResponse;
54 using collectd::QueryValuesRequest;
55 using collectd::QueryValuesResponse;
57 using google::protobuf::util::TimeUtil;
59 typedef google::protobuf::Map<grpc::string, collectd::types::MetadataValue> grpcMetadata;
69 grpc::SslServerCredentialsOptions *ssl;
71 static std::vector<Listener> listeners;
72 static grpc::string default_addr("0.0.0.0:50051");
78 static bool ident_matches(const value_list_t *vl, const value_list_t *matcher) {
79 if (fnmatch(matcher->host, vl->host, 0))
82 if (fnmatch(matcher->plugin, vl->plugin, 0))
84 if (fnmatch(matcher->plugin_instance, vl->plugin_instance, 0))
87 if (fnmatch(matcher->type, vl->type, 0))
89 if (fnmatch(matcher->type_instance, vl->type_instance, 0))
95 static grpc::string read_file(const char *filename) {
97 grpc::string s, content;
101 ERROR("grpc: Failed to open '%s'", filename);
105 while (std::getline(f, s)) {
107 content.push_back('\n');
117 static void marshal_ident(const value_list_t *vl,
118 collectd::types::Identifier *msg) {
119 msg->set_host(vl->host);
120 msg->set_plugin(vl->plugin);
121 if (vl->plugin_instance[0] != '\0')
122 msg->set_plugin_instance(vl->plugin_instance);
123 msg->set_type(vl->type);
124 if (vl->type_instance[0] != '\0')
125 msg->set_type_instance(vl->type_instance);
126 } /* marshal_ident */
128 static grpc::Status unmarshal_ident(const collectd::types::Identifier &msg,
129 value_list_t *vl, bool require_fields) {
133 if (!s.length() && require_fields)
134 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
135 grpc::string("missing host name"));
136 sstrncpy(vl->host, s.c_str(), sizeof(vl->host));
139 if (!s.length() && require_fields)
140 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
141 grpc::string("missing plugin name"));
142 sstrncpy(vl->plugin, s.c_str(), sizeof(vl->plugin));
145 if (!s.length() && require_fields)
146 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
147 grpc::string("missing type name"));
148 sstrncpy(vl->type, s.c_str(), sizeof(vl->type));
150 s = msg.plugin_instance();
151 sstrncpy(vl->plugin_instance, s.c_str(), sizeof(vl->plugin_instance));
153 s = msg.type_instance();
154 sstrncpy(vl->type_instance, s.c_str(), sizeof(vl->type_instance));
156 return grpc::Status::OK;
157 } /* unmarshal_ident() */
159 static grpc::Status marshal_meta_data(meta_data_t *meta,
160 grpcMetadata *mutable_meta_data) {
161 char **meta_data_keys = nullptr;
162 int meta_data_keys_len = meta_data_toc(meta, &meta_data_keys);
163 if (meta_data_keys_len < 0) {
164 return grpc::Status(grpc::StatusCode::INTERNAL,
165 grpc::string("error getting metadata keys"));
168 for (int i = 0; i < meta_data_keys_len; i++) {
169 char *key = meta_data_keys[i];
170 int md_type = meta_data_type(meta, key);
172 collectd::types::MetadataValue md_value;
178 if (meta_data_get_string(meta, key, &md_string) != 0 || md_string == nullptr) {
179 strarray_free(meta_data_keys, meta_data_keys_len);
180 return grpc::Status(grpc::StatusCode::INTERNAL,
181 grpc::string("missing metadata"));
183 md_value.set_string_value(md_string);
186 case MD_TYPE_SIGNED_INT:
188 if (meta_data_get_signed_int(meta, key, &int64_value) != 0) {
189 strarray_free(meta_data_keys, meta_data_keys_len);
190 return grpc::Status(grpc::StatusCode::INTERNAL,
191 grpc::string("missing metadata"));
193 md_value.set_int64_value(int64_value);
195 case MD_TYPE_UNSIGNED_INT:
196 uint64_t uint64_value;
197 if (meta_data_get_unsigned_int(meta, key, &uint64_value) != 0) {
198 strarray_free(meta_data_keys, meta_data_keys_len);
199 return grpc::Status(grpc::StatusCode::INTERNAL,
200 grpc::string("missing metadata"));
202 md_value.set_uint64_value(uint64_value);
206 if (meta_data_get_double(meta, key, &double_value) != 0) {
207 strarray_free(meta_data_keys, meta_data_keys_len);
208 return grpc::Status(grpc::StatusCode::INTERNAL,
209 grpc::string("missing metadata"));
211 md_value.set_double_value(double_value);
213 case MD_TYPE_BOOLEAN:
215 if (meta_data_get_boolean(meta, key, &bool_value) != 0) {
216 strarray_free(meta_data_keys, meta_data_keys_len);
217 return grpc::Status(grpc::StatusCode::INTERNAL,
218 grpc::string("missing metadata"));
220 md_value.set_bool_value(bool_value);
223 strarray_free(meta_data_keys, meta_data_keys_len);
224 ERROR("grpc: invalid metadata type (%d)", md_type);
225 return grpc::Status(grpc::StatusCode::INTERNAL,
226 grpc::string("unknown metadata type"));
229 (*mutable_meta_data)[grpc::string(key)] = md_value;
231 strarray_free(meta_data_keys, meta_data_keys_len);
234 return grpc::Status::OK;
237 static grpc::Status unmarshal_meta_data(const grpcMetadata &rpc_metadata,
238 meta_data_t **md_out) {
239 *md_out = meta_data_create();
240 if (*md_out == nullptr) {
241 return grpc::Status(grpc::StatusCode::RESOURCE_EXHAUSTED,
242 grpc::string("failed to metadata list"));
244 for (auto kv: rpc_metadata) {
245 auto k = kv.first.c_str();
248 // The meta_data collection individually allocates copies of the keys and
249 // string values for each entry, so it's safe for us to pass a reference
250 // to our short-lived strings.
252 switch (v.value_case()) {
253 case collectd::types::MetadataValue::ValueCase::kStringValue:
254 meta_data_add_string(*md_out, k, v.string_value().c_str());
256 case collectd::types::MetadataValue::ValueCase::kInt64Value:
257 meta_data_add_signed_int(*md_out, k, v.int64_value());
259 case collectd::types::MetadataValue::ValueCase::kUint64Value:
260 meta_data_add_unsigned_int(*md_out, k, v.uint64_value());
262 case collectd::types::MetadataValue::ValueCase::kDoubleValue:
263 meta_data_add_double(*md_out, k, v.double_value());
265 case collectd::types::MetadataValue::ValueCase::kBoolValue:
266 meta_data_add_boolean(*md_out, k, v.bool_value());
269 meta_data_destroy(*md_out);
270 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
271 grpc::string("Metadata of unknown type"));
274 return grpc::Status::OK;
277 static grpc::Status marshal_value_list(const value_list_t *vl,
278 collectd::types::ValueList *msg) {
279 auto id = msg->mutable_identifier();
280 marshal_ident(vl, id);
282 auto ds = plugin_get_ds(vl->type);
283 if ((ds == NULL) || (ds->ds_num != vl->values_len)) {
284 return grpc::Status(grpc::StatusCode::INTERNAL,
285 grpc::string("failed to retrieve data-set for values"));
288 auto t = TimeUtil::NanosecondsToTimestamp(CDTIME_T_TO_NS(vl->time));
289 auto d = TimeUtil::NanosecondsToDuration(CDTIME_T_TO_NS(vl->interval));
290 msg->set_allocated_time(new google::protobuf::Timestamp(t));
291 msg->set_allocated_interval(new google::protobuf::Duration(d));
293 msg->clear_meta_data();
294 if (vl->meta != nullptr) {
295 grpc::Status status = marshal_meta_data(vl->meta, msg->mutable_meta_data());
301 for (size_t i = 0; i < vl->values_len; ++i) {
302 auto v = msg->add_values();
303 int value_type = ds->ds[i].type;
304 switch (value_type) {
305 case DS_TYPE_COUNTER:
306 v->set_counter(vl->values[i].counter);
309 v->set_gauge(vl->values[i].gauge);
312 v->set_derive(vl->values[i].derive);
314 case DS_TYPE_ABSOLUTE:
315 v->set_absolute(vl->values[i].absolute);
318 ERROR("grpc: invalid value type (%d)", value_type);
319 return grpc::Status(grpc::StatusCode::INTERNAL,
320 grpc::string("unknown value type"));
323 auto name = msg->add_ds_names();
324 name->assign(ds->ds[i].name);
327 return grpc::Status::OK;
328 } /* marshal_value_list */
330 static grpc::Status unmarshal_value_list(const collectd::types::ValueList &msg,
332 vl->time = NS_TO_CDTIME_T(TimeUtil::TimestampToNanoseconds(msg.time()));
334 NS_TO_CDTIME_T(TimeUtil::DurationToNanoseconds(msg.interval()));
336 auto status = unmarshal_ident(msg.identifier(), vl, true);
340 status = unmarshal_meta_data(msg.meta_data(), &vl->meta);
344 value_t *values = NULL;
345 size_t values_len = 0;
347 status = grpc::Status::OK;
348 for (auto v : msg.values()) {
350 (value_t *)realloc(values, (values_len + 1) * sizeof(*values));
352 status = grpc::Status(grpc::StatusCode::RESOURCE_EXHAUSTED,
353 grpc::string("failed to allocate values array"));
358 val = values + values_len;
361 switch (v.value_case()) {
362 case collectd::types::Value::ValueCase::kCounter:
363 val->counter = counter_t(v.counter());
365 case collectd::types::Value::ValueCase::kGauge:
366 val->gauge = gauge_t(v.gauge());
368 case collectd::types::Value::ValueCase::kDerive:
369 val->derive = derive_t(v.derive());
371 case collectd::types::Value::ValueCase::kAbsolute:
372 val->absolute = absolute_t(v.absolute());
375 status = grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
376 grpc::string("unknown value type"));
385 vl->values_len = values_len;
387 meta_data_destroy(vl->meta);
392 } /* unmarshal_value_list() */
397 class CollectdImpl : public collectd::Collectd::Service {
400 QueryValues(grpc::ServerContext *ctx, QueryValuesRequest const *req,
401 grpc::ServerWriter<QueryValuesResponse> *writer) override {
403 auto status = unmarshal_ident(req->identifier(), &match, false);
408 std::queue<value_list_t> value_lists;
409 status = this->queryValuesRead(&match, &value_lists);
411 status = this->queryValuesWrite(ctx, writer, &value_lists);
414 while (!value_lists.empty()) {
415 auto vl = value_lists.front();
418 meta_data_destroy(vl.meta);
424 grpc::Status PutValues(grpc::ServerContext *ctx,
425 grpc::ServerReader<PutValuesRequest> *reader,
426 PutValuesResponse *res) override {
427 PutValuesRequest req;
429 while (reader->Read(&req)) {
430 value_list_t vl = {0};
431 auto status = unmarshal_value_list(req.value_list(), &vl);
435 if (plugin_dispatch_values(&vl))
437 grpc::StatusCode::INTERNAL,
438 grpc::string("failed to enqueue values for writing"));
442 return grpc::Status::OK;
446 grpc::Status queryValuesRead(value_list_t const *match,
447 std::queue<value_list_t> *value_lists) {
449 if ((iter = uc_get_iterator()) == NULL) {
451 grpc::StatusCode::INTERNAL,
452 grpc::string("failed to query values: cannot create iterator"));
455 grpc::Status status = grpc::Status::OK;
457 while (uc_iterator_next(iter, &name) == 0) {
459 if (parse_identifier_vl(name, &vl) != 0) {
460 status = grpc::Status(grpc::StatusCode::INTERNAL,
461 grpc::string("failed to parse identifier"));
465 if (!ident_matches(&vl, match))
467 if (uc_iterator_get_time(iter, &vl.time) < 0) {
469 grpc::Status(grpc::StatusCode::INTERNAL,
470 grpc::string("failed to retrieve value timestamp"));
473 if (uc_iterator_get_interval(iter, &vl.interval) < 0) {
475 grpc::Status(grpc::StatusCode::INTERNAL,
476 grpc::string("failed to retrieve value interval"));
479 if (uc_iterator_get_values(iter, &vl.values, &vl.values_len) < 0) {
480 status = grpc::Status(grpc::StatusCode::INTERNAL,
481 grpc::string("failed to retrieve values"));
484 if (uc_iterator_get_meta(iter, &vl.meta) < 0) {
485 status = grpc::Status(grpc::StatusCode::INTERNAL,
486 grpc::string("failed to retrieve value metadata"));
489 value_lists->push(vl);
490 } // while (uc_iterator_next(iter, &name) == 0)
492 uc_iterator_destroy(iter);
496 grpc::Status queryValuesWrite(grpc::ServerContext *ctx,
497 grpc::ServerWriter<QueryValuesResponse> *writer,
498 std::queue<value_list_t> *value_lists) {
499 while (!value_lists->empty()) {
500 auto vl = value_lists->front();
501 QueryValuesResponse res;
504 auto status = marshal_value_list(&vl, res.mutable_value_list());
509 if (!writer->Write(res)) {
510 return grpc::Status::CANCELLED;
517 return grpc::Status::OK;
522 * gRPC server implementation
524 class CollectdServer final {
527 auto auth = grpc::InsecureServerCredentials();
529 grpc::ServerBuilder builder;
531 if (listeners.empty()) {
532 builder.AddListeningPort(default_addr, auth);
533 INFO("grpc: Listening on %s", default_addr.c_str());
535 for (auto l : listeners) {
536 grpc::string addr = l.addr + ":" + l.port;
538 auto use_ssl = grpc::string("");
540 if (l.ssl != nullptr) {
541 use_ssl = grpc::string(" (SSL enabled)");
542 a = grpc::SslServerCredentials(*l.ssl);
545 builder.AddListeningPort(addr, a);
546 INFO("grpc: Listening on %s%s", addr.c_str(), use_ssl.c_str());
550 builder.RegisterService(&collectd_service_);
552 server_ = builder.BuildAndStart();
555 void Shutdown() { server_->Shutdown(); } /* Shutdown() */
558 CollectdImpl collectd_service_;
560 std::unique_ptr<grpc::Server> server_;
561 }; /* class CollectdServer */
563 class CollectdClient final {
565 CollectdClient(std::shared_ptr<grpc::ChannelInterface> channel)
566 : stub_(Collectd::NewStub(channel)) {}
568 int PutValues(value_list_t const *vl) {
569 grpc::ClientContext ctx;
571 PutValuesRequest req;
572 auto status = marshal_value_list(vl, req.mutable_value_list());
574 ERROR("grpc: Marshalling value_list_t failed.");
578 PutValuesResponse res;
579 auto stream = stub_->PutValues(&ctx, &res);
580 if (!stream->Write(req)) {
581 NOTICE("grpc: Broken stream.");
582 /* intentionally not returning. */
585 stream->WritesDone();
586 status = stream->Finish();
588 ERROR("grpc: Error while closing stream.");
593 } /* int PutValues */
596 std::unique_ptr<Collectd::Stub> stub_;
599 static CollectdServer *server = nullptr;
602 * collectd plugin interface
605 static void c_grpc_destroy_write_callback(void *ptr) {
606 delete (CollectdClient *)ptr;
609 static int c_grpc_write(__attribute__((unused)) data_set_t const *ds,
610 value_list_t const *vl, user_data_t *ud) {
611 CollectdClient *c = (CollectdClient *)ud->data;
612 return c->PutValues(vl);
615 static int c_grpc_config_listen(oconfig_item_t *ci) {
616 if ((ci->values_num != 2) || (ci->values[0].type != OCONFIG_TYPE_STRING) ||
617 (ci->values[1].type != OCONFIG_TYPE_STRING)) {
618 ERROR("grpc: The `%s` config option needs exactly "
619 "two string argument (address and port).",
624 auto listener = Listener();
625 listener.addr = grpc::string(ci->values[0].value.string);
626 listener.port = grpc::string(ci->values[1].value.string);
627 listener.ssl = nullptr;
629 auto ssl_opts = new (grpc::SslServerCredentialsOptions);
630 grpc::SslServerCredentialsOptions::PemKeyCertPair pkcp = {};
631 bool use_ssl = false;
633 for (int i = 0; i < ci->children_num; i++) {
634 oconfig_item_t *child = ci->children + i;
636 if (!strcasecmp("EnableSSL", child->key)) {
637 if (cf_util_get_boolean(child, &use_ssl)) {
638 ERROR("grpc: Option `%s` expects a boolean value", child->key);
641 } else if (!strcasecmp("SSLCACertificateFile", child->key)) {
643 if (cf_util_get_string(child, &certs)) {
644 ERROR("grpc: Option `%s` expects a string value", child->key);
647 ssl_opts->pem_root_certs = read_file(certs);
648 } else if (!strcasecmp("SSLCertificateKeyFile", child->key)) {
650 if (cf_util_get_string(child, &key)) {
651 ERROR("grpc: Option `%s` expects a string value", child->key);
654 pkcp.private_key = read_file(key);
655 } else if (!strcasecmp("SSLCertificateFile", child->key)) {
657 if (cf_util_get_string(child, &cert)) {
658 ERROR("grpc: Option `%s` expects a string value", child->key);
661 pkcp.cert_chain = read_file(cert);
663 WARNING("grpc: Option `%s` not allowed in <%s> block.", child->key,
668 ssl_opts->pem_key_cert_pairs.push_back(pkcp);
670 listener.ssl = ssl_opts;
674 listeners.push_back(listener);
676 } /* c_grpc_config_listen() */
678 static int c_grpc_config_server(oconfig_item_t *ci) {
679 if ((ci->values_num != 2) || (ci->values[0].type != OCONFIG_TYPE_STRING) ||
680 (ci->values[1].type != OCONFIG_TYPE_STRING)) {
681 ERROR("grpc: The `%s` config option needs exactly "
682 "two string argument (address and port).",
687 grpc::SslCredentialsOptions ssl_opts;
688 bool use_ssl = false;
690 for (int i = 0; i < ci->children_num; i++) {
691 oconfig_item_t *child = ci->children + i;
693 if (!strcasecmp("EnableSSL", child->key)) {
694 if (cf_util_get_boolean(child, &use_ssl)) {
697 } else if (!strcasecmp("SSLCACertificateFile", child->key)) {
699 if (cf_util_get_string(child, &certs)) {
702 ssl_opts.pem_root_certs = read_file(certs);
703 } else if (!strcasecmp("SSLCertificateKeyFile", child->key)) {
705 if (cf_util_get_string(child, &key)) {
708 ssl_opts.pem_private_key = read_file(key);
709 } else if (!strcasecmp("SSLCertificateFile", child->key)) {
711 if (cf_util_get_string(child, &cert)) {
714 ssl_opts.pem_cert_chain = read_file(cert);
716 WARNING("grpc: Option `%s` not allowed in <%s> block.", child->key,
721 auto node = grpc::string(ci->values[0].value.string);
722 auto service = grpc::string(ci->values[1].value.string);
723 auto addr = node + ":" + service;
725 CollectdClient *client;
727 auto channel_creds = grpc::SslCredentials(ssl_opts);
728 auto channel = grpc::CreateChannel(addr, channel_creds);
729 client = new CollectdClient(channel);
732 grpc::CreateChannel(addr, grpc::InsecureChannelCredentials());
733 client = new CollectdClient(channel);
736 auto callback_name = grpc::string("grpc/") + addr;
738 .data = client, .free_func = c_grpc_destroy_write_callback,
741 plugin_register_write(callback_name.c_str(), c_grpc_write, &ud);
743 } /* c_grpc_config_server() */
745 static int c_grpc_config(oconfig_item_t *ci) {
748 for (i = 0; i < ci->children_num; i++) {
749 oconfig_item_t *child = ci->children + i;
751 if (!strcasecmp("Listen", child->key)) {
752 if (c_grpc_config_listen(child))
754 } else if (!strcasecmp("Server", child->key)) {
755 if (c_grpc_config_server(child))
760 WARNING("grpc: Option `%s` not allowed here.", child->key);
765 } /* c_grpc_config() */
767 static int c_grpc_init(void) {
768 server = new CollectdServer();
770 ERROR("grpc: Failed to create server");
776 } /* c_grpc_init() */
778 static int c_grpc_shutdown(void) {
788 } /* c_grpc_shutdown() */
790 void module_register(void) {
791 plugin_register_complex_config("grpc", c_grpc_config);
792 plugin_register_init("grpc", c_grpc_init);
793 plugin_register_shutdown("grpc", c_grpc_shutdown);
794 } /* module_register() */