/**
* collectd - src/grpc.cc
- * Copyright (C) 2015 Sebastian Harl
+ * Copyright (C) 2015-2016 Sebastian Harl
*
* Permission is hereby granted, free of charge, to any person obtaining a
* copy of this software and associated documentation files (the "Software"),
#include <grpc++/grpc++.h>
#include <google/protobuf/util/time_util.h>
+#include <fstream>
+#include <iostream>
+#include <vector>
+
#include "collectd.grpc.pb.h"
extern "C" {
+#include <fnmatch.h>
#include <stdbool.h>
-#include <pthread.h>
#include "collectd.h"
#include "common.h"
#include "plugin.h"
#include "daemon/utils_cache.h"
+}
- typedef struct {
- char *addr;
- char *port;
- } listener_t;
+using collectd::Collectd;
- static listener_t *listeners;
- static size_t listeners_num;
-}
+using collectd::DispatchValuesRequest;
+using collectd::DispatchValuesReply;
+using collectd::QueryValuesRequest;
+using collectd::QueryValuesReply;
using google::protobuf::util::TimeUtil;
/*
- * proto conversion
+ * private types
*/
-static grpc::Status unmarshal_value_list(const collectd::types::ValueList &msg, value_list_t *vl)
+struct Listener {
+ grpc::string addr;
+ grpc::string port;
+
+ grpc::SslServerCredentialsOptions *ssl;
+};
+static std::vector<Listener> listeners;
+static grpc::string default_addr("0.0.0.0:50051");
+
+/*
+ * helper functions
+ */
+
+static bool ident_matches(const value_list_t *vl, const value_list_t *matcher)
{
- vl->time = NS_TO_CDTIME_T(TimeUtil::TimestampToNanoseconds(msg.time()));
- vl->interval = NS_TO_CDTIME_T(TimeUtil::DurationToNanoseconds(msg.interval()));
+ if (fnmatch(matcher->host, vl->host, 0))
+ return false;
+
+ if (fnmatch(matcher->plugin, vl->plugin, 0))
+ return false;
+ if (fnmatch(matcher->plugin_instance, vl->plugin_instance, 0))
+ return false;
+
+ if (fnmatch(matcher->type, vl->type, 0))
+ return false;
+ if (fnmatch(matcher->type_instance, vl->type_instance, 0))
+ return false;
+
+ return true;
+} /* ident_matches */
+
+static grpc::string read_file(const char *filename)
+{
+ std::ifstream f;
+ grpc::string s, content;
+ f.open(filename);
+ if (!f.is_open()) {
+ ERROR("grpc: Failed to open '%s'", filename);
+ return "";
+ }
+
+ while (std::getline(f, s)) {
+ content += s;
+ content.push_back('\n');
+ }
+ f.close();
+ return content;
+} /* read_file */
+
+/*
+ * proto conversion
+ */
+
+static void marshal_ident(const value_list_t *vl, collectd::types::Identifier *msg)
+{
+ msg->set_host(vl->host);
+ msg->set_plugin(vl->plugin);
+ if (vl->plugin_instance[0] != '\0')
+ msg->set_plugin_instance(vl->plugin_instance);
+ msg->set_type(vl->type);
+ if (vl->type_instance[0] != '\0')
+ msg->set_type_instance(vl->type_instance);
+} /* marshal_ident */
+
+static grpc::Status unmarshal_ident(const collectd::types::Identifier &msg, value_list_t *vl,
+ bool require_fields)
+{
std::string s;
s = msg.host();
- if (!s.length())
+ if (!s.length() && require_fields)
return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
grpc::string("missing host name"));
sstrncpy(vl->host, s.c_str(), sizeof(vl->host));
s = msg.plugin();
- if (!s.length())
+ if (!s.length() && require_fields)
return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
grpc::string("missing plugin name"));
sstrncpy(vl->plugin, s.c_str(), sizeof(vl->plugin));
s = msg.type();
- if (!s.length())
+ if (!s.length() && require_fields)
return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
grpc::string("missing type name"));
sstrncpy(vl->type, s.c_str(), sizeof(vl->type));
s = msg.plugin_instance();
- if (s.length())
- sstrncpy(vl->plugin_instance, s.c_str(), sizeof(vl->plugin_instance));
+ sstrncpy(vl->plugin_instance, s.c_str(), sizeof(vl->plugin_instance));
s = msg.type_instance();
- if (s.length())
- sstrncpy(vl->type_instance, s.c_str(), sizeof(vl->type_instance));
+ sstrncpy(vl->type_instance, s.c_str(), sizeof(vl->type_instance));
+
+ return grpc::Status::OK;
+} /* unmarshal_ident() */
+
+static grpc::Status marshal_value_list(const value_list_t *vl, collectd::types::ValueList *msg)
+{
+ auto id = msg->mutable_identifier();
+ marshal_ident(vl, id);
+
+ auto ds = plugin_get_ds(vl->type);
+ if ((ds == NULL) || (ds->ds_num != vl->values_len)) {
+ return grpc::Status(grpc::StatusCode::INTERNAL,
+ grpc::string("failed to retrieve data-set for values"));
+ }
+
+ auto t = TimeUtil::NanosecondsToTimestamp(CDTIME_T_TO_NS(vl->time));
+ auto d = TimeUtil::NanosecondsToDuration(CDTIME_T_TO_NS(vl->interval));
+ msg->set_allocated_time(new google::protobuf::Timestamp(t));
+ msg->set_allocated_interval(new google::protobuf::Duration(d));
+
+ for (size_t i = 0; i < vl->values_len; ++i) {
+ auto v = msg->add_value();
+ switch (ds->ds[i].type) {
+ case DS_TYPE_COUNTER:
+ v->set_counter(vl->values[i].counter);
+ break;
+ case DS_TYPE_GAUGE:
+ v->set_gauge(vl->values[i].gauge);
+ break;
+ case DS_TYPE_DERIVE:
+ v->set_derive(vl->values[i].derive);
+ break;
+ case DS_TYPE_ABSOLUTE:
+ v->set_absolute(vl->values[i].absolute);
+ break;
+ default:
+ return grpc::Status(grpc::StatusCode::INTERNAL,
+ grpc::string("unknown value type"));
+ }
+ }
+
+ return grpc::Status::OK;
+} /* marshal_value_list */
+
+static grpc::Status unmarshal_value_list(const collectd::types::ValueList &msg, value_list_t *vl)
+{
+ vl->time = NS_TO_CDTIME_T(TimeUtil::TimestampToNanoseconds(msg.time()));
+ vl->interval = NS_TO_CDTIME_T(TimeUtil::DurationToNanoseconds(msg.interval()));
+
+ auto status = unmarshal_ident(msg.identifier(), vl, true);
+ if (!status.ok())
+ return status;
value_t *values = NULL;
size_t values_len = 0;
- auto status = grpc::Status::OK;
+ status = grpc::Status::OK;
for (auto v : msg.value()) {
value_t *val = (value_t *)realloc(values, (values_len + 1) * sizeof(*values));
if (!val) {
break;
default:
status = grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
- grpc::string("unkown value type"));
+ grpc::string("unknown value type"));
break;
}
} /* unmarshal_value_list() */
/*
- * request call objects
+ * request call-backs and call objects
*/
+static grpc::Status Process(grpc::ServerContext *ctx,
+ DispatchValuesRequest request, DispatchValuesReply *reply)
+{
+ value_list_t vl = VALUE_LIST_INIT;
+ auto status = unmarshal_value_list(request.values(), &vl);
+ if (!status.ok())
+ return status;
+
+ if (plugin_dispatch_values(&vl))
+ status = grpc::Status(grpc::StatusCode::INTERNAL,
+ grpc::string("failed to enqueue values for writing"));
+ return status;
+} /* Process(): DispatchValues */
+
+static grpc::Status Process(grpc::ServerContext *ctx,
+ QueryValuesRequest request, QueryValuesReply *reply)
+{
+ uc_iter_t *iter;
+ char *name = NULL;
+
+ value_list_t matcher;
+ auto status = unmarshal_ident(request.identifier(), &matcher, false);
+ if (!status.ok())
+ return status;
+
+ if ((iter = uc_get_iterator()) == NULL) {
+ return grpc::Status(grpc::StatusCode::INTERNAL,
+ grpc::string("failed to query values: cannot create iterator"));
+ }
+
+ status = grpc::Status::OK;
+ while (uc_iterator_next(iter, &name) == 0) {
+ value_list_t res;
+ if (parse_identifier_vl(name, &res) != 0) {
+ status = grpc::Status(grpc::StatusCode::INTERNAL,
+ grpc::string("failed to parse identifier"));
+ break;
+ }
+
+ if (!ident_matches(&res, &matcher))
+ continue;
+
+ if (uc_iterator_get_time(iter, &res.time) < 0) {
+ status = grpc::Status(grpc::StatusCode::INTERNAL,
+ grpc::string("failed to retrieve value timestamp"));
+ break;
+ }
+ if (uc_iterator_get_interval(iter, &res.interval) < 0) {
+ status = grpc::Status(grpc::StatusCode::INTERNAL,
+ grpc::string("failed to retrieve value interval"));
+ break;
+ }
+ if (uc_iterator_get_values(iter, &res.values, &res.values_len) < 0) {
+ status = grpc::Status(grpc::StatusCode::INTERNAL,
+ grpc::string("failed to retrieve values"));
+ break;
+ }
+
+ auto vl = reply->add_values();
+ status = marshal_value_list(&res, vl);
+ free(res.values);
+ if (!status.ok())
+ break;
+ }
+
+ uc_iterator_destroy(iter);
+
+ return status;
+} /* Process(): QueryValues */
+
class Call
{
public:
- Call(collectd::Collectd::AsyncService *service, grpc::ServerCompletionQueue *cq)
+ Call(Collectd::AsyncService *service, grpc::ServerCompletionQueue *cq)
: service_(service), cq_(cq), status_(CREATE)
{ }
virtual void Process() = 0;
virtual void Finish() = 0;
- collectd::Collectd::AsyncService *service_;
+ Collectd::AsyncService *service_;
grpc::ServerCompletionQueue *cq_;
grpc::ServerContext ctx_;
CallStatus status_;
}; /* class Call */
-class DispatchValuesCall : public Call
+template<typename RequestT, typename ReplyT>
+class RpcCall final : public Call
{
+ typedef void (Collectd::AsyncService::*CreatorT)(grpc::ServerContext *,
+ RequestT *, grpc::ServerAsyncResponseWriter<ReplyT> *,
+ grpc::CompletionQueue *, grpc::ServerCompletionQueue *, void *);
+
public:
- DispatchValuesCall(collectd::Collectd::AsyncService *service, grpc::ServerCompletionQueue *cq)
- : Call(service, cq), responder_(&ctx_)
+ RpcCall(Collectd::AsyncService *service,
+ CreatorT creator, grpc::ServerCompletionQueue *cq)
+ : Call(service, cq), creator_(creator), responder_(&ctx_)
{
Handle();
- } /* DispatchValuesCall() */
+ } /* RpcCall() */
- virtual ~DispatchValuesCall()
+ virtual ~RpcCall()
{ }
private:
void Create()
{
- service_->RequestDispatchValues(&ctx_, &request_, &responder_, cq_, cq_, this);
+ (service_->*creator_)(&ctx_, &request_, &responder_, cq_, cq_, this);
} /* Create() */
void Process()
{
// Add a new request object to the queue.
- new DispatchValuesCall(service_, cq_);
-
- value_list_t vl = VALUE_LIST_INIT;
- auto status = unmarshal_value_list(request_.values(), &vl);
- if (!status.ok()) {
- responder_.Finish(reply_, status, this);
- return;
- }
-
- if (plugin_dispatch_values(&vl))
- status = grpc::Status(grpc::StatusCode::INTERNAL,
- grpc::string("failed to enqueue values for writing"));
-
+ new RpcCall<RequestT, ReplyT>(service_, creator_, cq_);
+ grpc::Status status = ::Process(&ctx_, request_, &reply_);
responder_.Finish(reply_, status, this);
} /* Process() */
delete this;
} /* Finish() */
- collectd::DispatchValuesRequest request_;
- collectd::DispatchValuesReply reply_;
-
- grpc::ServerAsyncResponseWriter<collectd::DispatchValuesReply> responder_;
-}; /* class DispatchValuesCall */
-
-class ListValuesCall : public Call
-{
-public:
- ListValuesCall(collectd::Collectd::AsyncService *service, grpc::ServerCompletionQueue *cq)
- : Call(service, cq), responder_(&ctx_)
- {
- Handle();
- } /* ListValuesCall() */
-
- virtual ~ListValuesCall()
- { }
-
-private:
- void Create()
- {
- service_->RequestListValues(&ctx_, &request_, &responder_, cq_, cq_, this);
- } /* Create() */
-
- void Process()
- {
- new ListValuesCall(service_, cq_);
-
- char **names = NULL;
- cdtime_t *times = NULL;
- size_t i, n = 0;
+ CreatorT creator_;
- auto status = grpc::Status::OK;
- if (uc_get_names(&names, ×, &n)) {
- status = grpc::Status(grpc::StatusCode::INTERNAL,
- grpc::string("failed to retrieve values"));
- }
+ RequestT request_;
+ ReplyT reply_;
- for (i = 0; i < n; i++) {
- auto v = reply_.add_value();
- auto t = TimeUtil::NanosecondsToTimestamp(CDTIME_T_TO_NS(times[i]));
- v->set_name(names[i]);
- v->set_allocated_time(new google::protobuf::Timestamp(t));
- sfree(names[i]);
- }
- sfree(names);
- sfree(times);
-
- responder_.Finish(reply_, status, this);
- } /* Process() */
-
- void Finish()
- {
- delete this;
- } /* Finish() */
-
- collectd::ListValuesRequest request_;
- collectd::ListValuesReply reply_;
-
- grpc::ServerAsyncResponseWriter<collectd::ListValuesReply> responder_;
-}; /* class ListValuesCall */
+ grpc::ServerAsyncResponseWriter<ReplyT> responder_;
+}; /* class RpcCall */
/*
* gRPC server implementation
public:
void Start()
{
- // TODO: make configurable
auto auth = grpc::InsecureServerCredentials();
grpc::ServerBuilder builder;
- if (!listeners_num) {
- std::string default_addr("0.0.0.0:50051");
+ if (listeners.empty()) {
builder.AddListeningPort(default_addr, auth);
INFO("grpc: Listening on %s", default_addr.c_str());
}
else {
- size_t i;
- for (i = 0; i < listeners_num; i++) {
- auto l = listeners[i];
- std::string addr(l.addr);
- addr += std::string(":") + std::string(l.port);
- builder.AddListeningPort(addr, auth);
- INFO("grpc: Listening on %s", addr.c_str());
+ for (auto l : listeners) {
+ grpc::string addr = l.addr + ":" + l.port;
+
+ auto use_ssl = grpc::string("");
+ auto a = auth;
+ if (l.ssl != nullptr) {
+ use_ssl = grpc::string(" (SSL enabled)");
+ a = grpc::SslServerCredentials(*l.ssl);
+ }
+
+ builder.AddListeningPort(addr, a);
+ INFO("grpc: Listening on %s%s", addr.c_str(), use_ssl.c_str());
}
}
- builder.RegisterAsyncService(&service_);
+ builder.RegisterService(&service_);
cq_ = builder.AddCompletionQueue();
server_ = builder.BuildAndStart();
} /* Start() */
void Mainloop()
{
// Register request types.
- new DispatchValuesCall(&service_, cq_.get());
- new ListValuesCall(&service_, cq_.get());
+ new RpcCall<DispatchValuesRequest, DispatchValuesReply>(&service_,
+ &Collectd::AsyncService::RequestDispatchValues, cq_.get());
+ new RpcCall<QueryValuesRequest, QueryValuesReply>(&service_,
+ &Collectd::AsyncService::RequestQueryValues, cq_.get());
while (true) {
void *req = NULL;
} /* Mainloop() */
private:
- collectd::Collectd::AsyncService service_;
+ Collectd::AsyncService service_;
std::unique_ptr<grpc::Server> server_;
std::unique_ptr<grpc::ServerCompletionQueue> cq_;
static int c_grpc_config_listen(oconfig_item_t *ci)
{
- listener_t *listener;
- int i;
-
if ((ci->values_num != 2)
|| (ci->values[0].type != OCONFIG_TYPE_STRING)
|| (ci->values[1].type != OCONFIG_TYPE_STRING)) {
return -1;
}
- listener = (listener_t *)realloc(listeners,
- (listeners_num + 1) * sizeof(*listeners));
- if (!listener) {
- ERROR("grpc: Failed to allocate listeners");
- return -1;
- }
- listeners = listener;
- listener = listeners + listeners_num;
- listeners_num++;
+ auto listener = Listener();
+ listener.addr = grpc::string(ci->values[0].value.string);
+ listener.port = grpc::string(ci->values[1].value.string);
+ listener.ssl = nullptr;
- listener->addr = strdup(ci->values[0].value.string);
- listener->port = strdup(ci->values[1].value.string);
+ auto ssl_opts = new(grpc::SslServerCredentialsOptions);
+ grpc::SslServerCredentialsOptions::PemKeyCertPair pkcp = {};
+ bool use_ssl = false;
- for (i = 0; i < ci->children_num; i++) {
+ for (int i = 0; i < ci->children_num; i++) {
oconfig_item_t *child = ci->children + i;
- WARNING("grpc: Option `%s` not allowed in <%s> block.",
- child->key, ci->key);
+
+ if (!strcasecmp("EnableSSL", child->key)) {
+ if (cf_util_get_boolean(child, &use_ssl)) {
+ ERROR("grpc: Option `%s` expects a boolean value",
+ child->key);
+ return -1;
+ }
+ }
+ else if (!strcasecmp("SSLRootCerts", child->key)) {
+ char *certs = NULL;
+ if (cf_util_get_string(child, &certs)) {
+ ERROR("grpc: Option `%s` expects a string value",
+ child->key);
+ return -1;
+ }
+ ssl_opts->pem_root_certs = read_file(certs);
+ }
+ else if (!strcasecmp("SSLServerKey", child->key)) {
+ char *key = NULL;
+ if (cf_util_get_string(child, &key)) {
+ ERROR("grpc: Option `%s` expects a string value",
+ child->key);
+ return -1;
+ }
+ pkcp.private_key = read_file(key);
+ }
+ else if (!strcasecmp("SSLServerCert", child->key)) {
+ char *cert = NULL;
+ if (cf_util_get_string(child, &cert)) {
+ ERROR("grpc: Option `%s` expects a string value",
+ child->key);
+ return -1;
+ }
+ pkcp.cert_chain = read_file(cert);
+ }
+ else {
+ WARNING("grpc: Option `%s` not allowed in <%s> block.",
+ child->key, ci->key);
+ }
}
+ ssl_opts->pem_key_cert_pairs.push_back(pkcp);
+ if (use_ssl)
+ listener.ssl = ssl_opts;
+ else
+ delete(ssl_opts);
+
+ listeners.push_back(listener);
return 0;
} /* c_grpc_config_listen() */
server->Start();
for (i = 0; i < workers_num; i++) {
- pthread_create(&workers[i], /* attr = */ NULL,
+ plugin_thread_create(&workers[i], /* attr = */ NULL,
worker_thread, server);
}
INFO("grpc: Started %zu workers", workers_num);