X-Git-Url: https://git.octo.it/?p=collectd.git;a=blobdiff_plain;f=src%2Fgrpc.cc;h=977c5b2cc2a5392a7950bd9c7805dff272dbd6f9;hp=2f16dbcb0f23c099b602b415c2e2008bbcc6497f;hb=a9e50e9e30ecde17e167e271060c8183bfcbf407;hpb=9717b1a55d60d992c16e66e2ae5bdfb42f80aca8 diff --git a/src/grpc.cc b/src/grpc.cc index 2f16dbcb..977c5b2c 100644 --- a/src/grpc.cc +++ b/src/grpc.cc @@ -56,6 +56,9 @@ using collectd::QueryValuesResponse; using google::protobuf::util::TimeUtil; +typedef google::protobuf::Map + grpcMetadata; + /* * private types */ @@ -154,6 +157,125 @@ static grpc::Status unmarshal_ident(const collectd::types::Identifier &msg, return grpc::Status::OK; } /* unmarshal_ident() */ +static grpc::Status marshal_meta_data(meta_data_t *meta, + grpcMetadata *mutable_meta_data) { + char **meta_data_keys = nullptr; + int meta_data_keys_len = meta_data_toc(meta, &meta_data_keys); + if (meta_data_keys_len < 0) { + return grpc::Status(grpc::StatusCode::INTERNAL, + grpc::string("error getting metadata keys")); + } + + for (int i = 0; i < meta_data_keys_len; i++) { + char *key = meta_data_keys[i]; + int md_type = meta_data_type(meta, key); + + collectd::types::MetadataValue md_value; + md_value.Clear(); + + switch (md_type) { + case MD_TYPE_STRING: + char *md_string; + if (meta_data_get_string(meta, key, &md_string) != 0 || + md_string == nullptr) { + strarray_free(meta_data_keys, meta_data_keys_len); + return grpc::Status(grpc::StatusCode::INTERNAL, + grpc::string("missing metadata")); + } + md_value.set_string_value(md_string); + free(md_string); + break; + case MD_TYPE_SIGNED_INT: + int64_t int64_value; + if (meta_data_get_signed_int(meta, key, &int64_value) != 0) { + strarray_free(meta_data_keys, meta_data_keys_len); + return grpc::Status(grpc::StatusCode::INTERNAL, + grpc::string("missing metadata")); + } + md_value.set_int64_value(int64_value); + break; + case MD_TYPE_UNSIGNED_INT: + uint64_t uint64_value; + if (meta_data_get_unsigned_int(meta, key, &uint64_value) != 0) { + strarray_free(meta_data_keys, meta_data_keys_len); + return grpc::Status(grpc::StatusCode::INTERNAL, + grpc::string("missing metadata")); + } + md_value.set_uint64_value(uint64_value); + break; + case MD_TYPE_DOUBLE: + double double_value; + if (meta_data_get_double(meta, key, &double_value) != 0) { + strarray_free(meta_data_keys, meta_data_keys_len); + return grpc::Status(grpc::StatusCode::INTERNAL, + grpc::string("missing metadata")); + } + md_value.set_double_value(double_value); + break; + case MD_TYPE_BOOLEAN: + bool bool_value; + if (meta_data_get_boolean(meta, key, &bool_value) != 0) { + strarray_free(meta_data_keys, meta_data_keys_len); + return grpc::Status(grpc::StatusCode::INTERNAL, + grpc::string("missing metadata")); + } + md_value.set_bool_value(bool_value); + break; + default: + strarray_free(meta_data_keys, meta_data_keys_len); + ERROR("grpc: invalid metadata type (%d)", md_type); + return grpc::Status(grpc::StatusCode::INTERNAL, + grpc::string("unknown metadata type")); + } + + (*mutable_meta_data)[grpc::string(key)] = md_value; + + strarray_free(meta_data_keys, meta_data_keys_len); + } + + return grpc::Status::OK; +} + +static grpc::Status unmarshal_meta_data(const grpcMetadata &rpc_metadata, + meta_data_t **md_out) { + *md_out = meta_data_create(); + if (*md_out == nullptr) { + return grpc::Status(grpc::StatusCode::RESOURCE_EXHAUSTED, + grpc::string("failed to create metadata list")); + } + for (auto kv : rpc_metadata) { + auto k = kv.first.c_str(); + auto v = kv.second; + + // The meta_data collection individually allocates copies of the keys and + // string values for each entry, so it's safe for us to pass a reference + // to our short-lived strings. + + switch (v.value_case()) { + case collectd::types::MetadataValue::ValueCase::kStringValue: + meta_data_add_string(*md_out, k, v.string_value().c_str()); + break; + case collectd::types::MetadataValue::ValueCase::kInt64Value: + meta_data_add_signed_int(*md_out, k, v.int64_value()); + break; + case collectd::types::MetadataValue::ValueCase::kUint64Value: + meta_data_add_unsigned_int(*md_out, k, v.uint64_value()); + break; + case collectd::types::MetadataValue::ValueCase::kDoubleValue: + meta_data_add_double(*md_out, k, v.double_value()); + break; + case collectd::types::MetadataValue::ValueCase::kBoolValue: + meta_data_add_boolean(*md_out, k, v.bool_value()); + break; + default: + meta_data_destroy(*md_out); + return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, + grpc::string("Metadata of unknown type")); + } + } + return grpc::Status::OK; +} + static grpc::Status marshal_value_list(const value_list_t *vl, collectd::types::ValueList *msg) { auto id = msg->mutable_identifier(); @@ -170,9 +292,18 @@ static grpc::Status marshal_value_list(const value_list_t *vl, msg->set_allocated_time(new google::protobuf::Timestamp(t)); msg->set_allocated_interval(new google::protobuf::Duration(d)); + msg->clear_meta_data(); + if (vl->meta != nullptr) { + grpc::Status status = marshal_meta_data(vl->meta, msg->mutable_meta_data()); + if (!status.ok()) { + return status; + } + } + for (size_t i = 0; i < vl->values_len; ++i) { auto v = msg->add_values(); - switch (ds->ds[i].type) { + int value_type = ds->ds[i].type; + switch (value_type) { case DS_TYPE_COUNTER: v->set_counter(vl->values[i].counter); break; @@ -186,6 +317,7 @@ static grpc::Status marshal_value_list(const value_list_t *vl, v->set_absolute(vl->values[i].absolute); break; default: + ERROR("grpc: invalid value type (%d)", value_type); return grpc::Status(grpc::StatusCode::INTERNAL, grpc::string("unknown value type")); } @@ -207,6 +339,10 @@ static grpc::Status unmarshal_value_list(const collectd::types::ValueList &msg, if (!status.ok()) return status; + status = unmarshal_meta_data(msg.meta_data(), &vl->meta); + if (!status.ok()) + return status; + value_t *values = NULL; size_t values_len = 0; @@ -249,7 +385,8 @@ static grpc::Status unmarshal_value_list(const collectd::types::ValueList &msg, if (status.ok()) { vl->values = values; vl->values_len = values_len; - } else if (values) { + } else { + meta_data_destroy(vl->meta); free(values); } @@ -280,6 +417,7 @@ public: auto vl = value_lists.front(); value_lists.pop(); sfree(vl.values); + meta_data_destroy(vl.meta); } return status; @@ -328,7 +466,6 @@ private: if (!ident_matches(&vl, match)) continue; - if (uc_iterator_get_time(iter, &vl.time) < 0) { status = grpc::Status(grpc::StatusCode::INTERNAL, @@ -346,6 +483,11 @@ private: grpc::string("failed to retrieve values")); break; } + if (uc_iterator_get_meta(iter, &vl.meta) < 0) { + status = + grpc::Status(grpc::StatusCode::INTERNAL, + grpc::string("failed to retrieve value metadata")); + } value_lists->push(vl); } // while (uc_iterator_next(iter, &name) == 0) @@ -487,7 +629,8 @@ static int c_grpc_config_listen(oconfig_item_t *ci) { listener.port = grpc::string(ci->values[1].value.string); listener.ssl = nullptr; - auto ssl_opts = new (grpc::SslServerCredentialsOptions); + auto ssl_opts = new grpc::SslServerCredentialsOptions( + GRPC_SSL_REQUEST_AND_REQUIRE_CLIENT_CERTIFICATE_AND_VERIFY); grpc::SslServerCredentialsOptions::PemKeyCertPair pkcp = {}; bool use_ssl = false; @@ -520,6 +663,14 @@ static int c_grpc_config_listen(oconfig_item_t *ci) { return -1; } pkcp.cert_chain = read_file(cert); + } else if (!strcasecmp("VerifyPeer", child->key)) { + _Bool verify = 0; + if (cf_util_get_boolean(child, &verify)) { + return -1; + } + ssl_opts->client_certificate_request = + verify ? GRPC_SSL_REQUEST_AND_REQUIRE_CLIENT_CERTIFICATE_AND_VERIFY + : GRPC_SSL_DONT_REQUEST_CLIENT_CERTIFICATE; } else { WARNING("grpc: Option `%s` not allowed in <%s> block.", child->key, ci->key);