#include <grpc++/grpc++.h>
#include <google/protobuf/util/time_util.h>
+#include <fstream>
+#include <iostream>
+#include <queue>
#include <vector>
#include "collectd.grpc.pb.h"
extern "C" {
#include <fnmatch.h>
#include <stdbool.h>
-#include <pthread.h>
#include "collectd.h"
#include "common.h"
}
using collectd::Collectd;
+using collectd::Dispatch;
using collectd::DispatchValuesRequest;
-using collectd::DispatchValuesReply;
+using collectd::DispatchValuesResponse;
using collectd::QueryValuesRequest;
-using collectd::QueryValuesReply;
+using collectd::QueryValuesResponse;
using google::protobuf::util::TimeUtil;
struct Listener {
grpc::string addr;
grpc::string port;
+
+ grpc::SslServerCredentialsOptions *ssl;
};
static std::vector<Listener> listeners;
static grpc::string default_addr("0.0.0.0:50051");
return true;
} /* ident_matches */
+static grpc::string read_file(const char *filename)
+{
+ std::ifstream f;
+ grpc::string s, content;
+
+ f.open(filename);
+ if (!f.is_open()) {
+ ERROR("grpc: Failed to open '%s'", filename);
+ return "";
+ }
+
+ while (std::getline(f, s)) {
+ content += s;
+ content.push_back('\n');
+ }
+ f.close();
+ return content;
+} /* read_file */
+
/*
* proto conversion
*/
msg->set_allocated_interval(new google::protobuf::Duration(d));
for (size_t i = 0; i < vl->values_len; ++i) {
- auto v = msg->add_value();
+ auto v = msg->add_values();
switch (ds->ds[i].type) {
case DS_TYPE_COUNTER:
v->set_counter(vl->values[i].counter);
return grpc::Status(grpc::StatusCode::INTERNAL,
grpc::string("unknown value type"));
}
+
+ auto name = msg->add_ds_names();
+ name->assign(ds->ds[i].name);
}
return grpc::Status::OK;
size_t values_len = 0;
status = grpc::Status::OK;
- for (auto v : msg.value()) {
+ for (auto v : msg.values()) {
value_t *val = (value_t *)realloc(values, (values_len + 1) * sizeof(*values));
if (!val) {
status = grpc::Status(grpc::StatusCode::RESOURCE_EXHAUSTED,
/*
* request call-backs and call objects
*/
-
-static grpc::Status Process(grpc::ServerContext *ctx,
- DispatchValuesRequest request, DispatchValuesReply *reply)
+static grpc::Status DispatchValue(grpc::ServerContext *ctx, DispatchValuesRequest request, DispatchValuesResponse *reply)
{
value_list_t vl = VALUE_LIST_INIT;
- auto status = unmarshal_value_list(request.values(), &vl);
+ auto status = unmarshal_value_list(request.value_list(), &vl);
if (!status.ok())
return status;
if (plugin_dispatch_values(&vl))
status = grpc::Status(grpc::StatusCode::INTERNAL,
grpc::string("failed to enqueue values for writing"));
+
+ reply->Clear();
return status;
-} /* Process(): DispatchValues */
+} /* grpc::Status DispatchValue */
-static grpc::Status Process(grpc::ServerContext *ctx,
- QueryValuesRequest request, QueryValuesReply *reply)
-{
- uc_iter_t *iter;
- char *name = NULL;
+// CallData is the abstract base class for asynchronous calls.
+class CallData {
+public:
+ virtual ~CallData() {}
+ virtual void process(bool ok) = 0;
- value_list_t matcher;
- auto status = unmarshal_ident(request.identifier(), &matcher, false);
- if (!status.ok())
- return status;
+protected:
+ CallData() {}
- if ((iter = uc_get_iterator()) == NULL) {
- return grpc::Status(grpc::StatusCode::INTERNAL,
- grpc::string("failed to query values: cannot create iterator"));
- }
+private:
+ CallData(const CallData&) = delete;
+ CallData& operator=(const CallData&) = delete;
+};
- while (uc_iterator_next(iter, &name) == 0) {
- value_list_t res;
- if (parse_identifier_vl(name, &res) != 0)
- return grpc::Status(grpc::StatusCode::INTERNAL,
- grpc::string("failed to parse identifier"));
+/*
+ * Collectd service
+ */
- if (!ident_matches(&res, &matcher))
- continue;
+// QueryValuesCallData holds the state and implements the logic for QueryValues calls.
+class QueryValuesCallData : public CallData {
+public:
+ QueryValuesCallData(Collectd::AsyncService* service, grpc::ServerCompletionQueue* cq)
+ : cq_(cq), service_(service), writer_(&context_) {
+ process(true);
+ }
- if (uc_iterator_get_time(iter, &res.time) < 0)
- return grpc::Status(grpc::StatusCode::INTERNAL,
- grpc::string("failed to retrieve value timestamp"));
- if (uc_iterator_get_interval(iter, &res.interval) < 0)
- return grpc::Status(grpc::StatusCode::INTERNAL,
- grpc::string("failed to retrieve value interval"));
- if (uc_iterator_get_values(iter, &res.values, &res.values_len) < 0)
- return grpc::Status(grpc::StatusCode::INTERNAL,
- grpc::string("failed to retrieve values"));
+ void process(bool ok) final {
+ if (status_ == Status::INIT) {
+ status_ = Status::READ;
+ service_->RequestQueryValues(&context_, &request_, &writer_, cq_, cq_, this);
+ } else if (status_ == Status::READ) {
+ auto err = queryValues();
+ if (!err.ok()) {
+ writer_.Finish(err, this);
+ status_ = Status::DONE;
+ return;
+ }
+ respond();
+ } else if (status_ == Status::WRITE) {
+ respond();
+ } else if (status_ == Status::DONE) {
+ new QueryValuesCallData(service_, cq_);
+ delete this;
+ } else {
+ throw std::logic_error("Unhandled state enum.");
+ }
+ }
- auto vl = reply->add_values();
- status = marshal_value_list(&res, vl);
- free(res.values);
+private:
+ enum class Status {
+ INIT,
+ READ,
+ WRITE,
+ DONE,
+ };
+
+ grpc::Status queryValues (void) {
+ uc_iter_t *iter;
+ char *name = NULL;
+
+ value_list_t matcher;
+ auto status = unmarshal_ident(request_.identifier(), &matcher, false);
if (!status.ok())
return status;
- }
- uc_iterator_destroy(iter);
+ if ((iter = uc_get_iterator()) == NULL) {
+ return grpc::Status(grpc::StatusCode::INTERNAL,
+ grpc::string("failed to query values: cannot create iterator"));
+ }
- return grpc::Status::OK;
-} /* Process(): QueryValues */
+ status = grpc::Status::OK;
+ while (uc_iterator_next(iter, &name) == 0) {
+ value_list_t vl;
+ if (parse_identifier_vl(name, &vl) != 0) {
+ status = grpc::Status(grpc::StatusCode::INTERNAL,
+ grpc::string("failed to parse identifier"));
+ break;
+ }
-class Call
-{
-public:
- Call(Collectd::AsyncService *service, grpc::ServerCompletionQueue *cq)
- : service_(service), cq_(cq), status_(CREATE)
- { }
+ if (!ident_matches(&vl, &matcher))
+ continue;
- virtual ~Call()
- { }
+ if (uc_iterator_get_time(iter, &vl.time) < 0) {
+ status = grpc::Status(grpc::StatusCode::INTERNAL,
+ grpc::string("failed to retrieve value timestamp"));
+ break;
+ }
+ if (uc_iterator_get_interval(iter, &vl.interval) < 0) {
+ status = grpc::Status(grpc::StatusCode::INTERNAL,
+ grpc::string("failed to retrieve value interval"));
+ break;
+ }
+ if (uc_iterator_get_values(iter, &vl.values, &vl.values_len) < 0) {
+ status = grpc::Status(grpc::StatusCode::INTERNAL,
+ grpc::string("failed to retrieve values"));
+ break;
+ }
- void Handle()
- {
- if (status_ == CREATE) {
- Create();
- status_ = PROCESS;
- }
- else if (status_ == PROCESS) {
- Process();
- status_ = FINISH;
- }
- else {
- GPR_ASSERT(status_ == FINISH);
- Finish();
+ value_lists_.push(vl);
}
- } /* Handle() */
-protected:
- virtual void Create() = 0;
- virtual void Process() = 0;
- virtual void Finish() = 0;
-
- Collectd::AsyncService *service_;
- grpc::ServerCompletionQueue *cq_;
- grpc::ServerContext ctx_;
-
-private:
- enum CallStatus { CREATE, PROCESS, FINISH };
- CallStatus status_;
-}; /* class Call */
+ uc_iterator_destroy(iter);
+ return status;
+ }
-template<typename RequestT, typename ReplyT>
-class RpcCall final : public Call
-{
- typedef void (Collectd::AsyncService::*CreatorT)(grpc::ServerContext *,
- RequestT *, grpc::ServerAsyncResponseWriter<ReplyT> *,
- grpc::CompletionQueue *, grpc::ServerCompletionQueue *, void *);
+ void respond() {
+ if (value_lists_.empty()) {
+ writer_.Finish(grpc::Status::OK, this);
+ status_ = Status::DONE;
+ return;
+ }
-public:
- RpcCall(Collectd::AsyncService *service,
- CreatorT creator, grpc::ServerCompletionQueue *cq)
- : Call(service, cq), creator_(creator), responder_(&ctx_)
- {
- Handle();
- } /* RpcCall() */
+ auto vl = value_lists_.front();
- virtual ~RpcCall()
- { }
+ response_.Clear();
+ auto err = marshal_value_list(&vl, response_.mutable_value_list());
+ if (!err.ok()) {
+ writer_.Finish(err, this);
+ status_ = Status::DONE;
+ return;
+ }
-private:
- void Create()
- {
- (service_->*creator_)(&ctx_, &request_, &responder_, cq_, cq_, this);
- } /* Create() */
+ value_lists_.pop();
+ free(vl.values);
- void Process()
- {
- // Add a new request object to the queue.
- new RpcCall<RequestT, ReplyT>(service_, creator_, cq_);
- grpc::Status status = ::Process(&ctx_, request_, &reply_);
- responder_.Finish(reply_, status, this);
- } /* Process() */
+ writer_.Write(response_, this);
+ status_ = Status::WRITE;
+ }
- void Finish()
- {
- delete this;
- } /* Finish() */
+ Status status_ = Status::INIT;
+ grpc::ServerContext context_;
+ grpc::ServerCompletionQueue* cq_;
+ Collectd::AsyncService* service_;
+ QueryValuesRequest request_;
+ std::queue<value_list_t> value_lists_;
+ QueryValuesResponse response_;
+ grpc::ServerAsyncWriter<QueryValuesResponse> writer_;
+};
- CreatorT creator_;
+/*
+ * Dispatch service
+ */
+// DispatchValuesCallData holds the state and implements the logic for DispatchValues calls.
+class DispatchValuesCallData : public CallData {
+public:
+ DispatchValuesCallData(Dispatch::AsyncService* service, grpc::ServerCompletionQueue* cq)
+ : cq_(cq), service_(service), reader_(&context_) {
+ process(true);
+ }
- RequestT request_;
- ReplyT reply_;
+ void process(bool ok) final {
+ if (status == Status::INIT) {
+ service_->RequestDispatchValues(&context_, &reader_, cq_, cq_, this);
+ status = Status::CALL;
+ } else if (status == Status::CALL) {
+ reader_.Read(&request_, this);
+ status = Status::READ;
+ } else if (status == Status::READ && ok) {
+ (void) DispatchValue(&context_, request_, &response_);
+
+ reader_.Read(&request_, this);
+ } else if (status == Status::READ) {
+ response_.Clear();
+
+ status = Status::DONE;
+ } else if (status == Status::DONE) {
+ new DispatchValuesCallData(service_, cq_);
+ delete this;
+ } else {
+ ERROR("grpc: DispatchValuesCallData: invalid state");
+ }
+ }
- grpc::ServerAsyncResponseWriter<ReplyT> responder_;
-}; /* class RpcCall */
+private:
+ enum class Status {
+ INIT,
+ CALL,
+ READ,
+ DONE,
+ };
+ Status status = Status::INIT;
+
+ grpc::ServerContext context_;
+ grpc::ServerCompletionQueue* cq_;
+ Dispatch::AsyncService* service_;
+
+ DispatchValuesRequest request_;
+ DispatchValuesResponse response_;
+ grpc::ServerAsyncReader<DispatchValuesResponse, DispatchValuesRequest> reader_;
+};
/*
* gRPC server implementation
*/
-
class CollectdServer final
{
public:
void Start()
{
- // TODO: make configurable
auto auth = grpc::InsecureServerCredentials();
grpc::ServerBuilder builder;
else {
for (auto l : listeners) {
grpc::string addr = l.addr + ":" + l.port;
- builder.AddListeningPort(addr, auth);
- INFO("grpc: Listening on %s", addr.c_str());
+
+ auto use_ssl = grpc::string("");
+ auto a = auth;
+ if (l.ssl != nullptr) {
+ use_ssl = grpc::string(" (SSL enabled)");
+ a = grpc::SslServerCredentials(*l.ssl);
+ }
+
+ builder.AddListeningPort(addr, a);
+ INFO("grpc: Listening on %s%s", addr.c_str(), use_ssl.c_str());
}
}
- builder.RegisterService(&service_);
cq_ = builder.AddCompletionQueue();
+
+ builder.RegisterService(&collectd_service_);
+ builder.RegisterService(&dispatch_service_);
+
server_ = builder.BuildAndStart();
+ new QueryValuesCallData(&collectd_service_, cq_.get());
+ new DispatchValuesCallData(&dispatch_service_, cq_.get());
} /* Start() */
void Shutdown()
void Mainloop()
{
- // Register request types.
- new RpcCall<DispatchValuesRequest, DispatchValuesReply>(&service_,
- &Collectd::AsyncService::RequestDispatchValues, cq_.get());
- new RpcCall<QueryValuesRequest, QueryValuesReply>(&service_,
- &Collectd::AsyncService::RequestQueryValues, cq_.get());
-
while (true) {
- void *req = NULL;
+ void *tag = NULL;
bool ok = false;
- if (!cq_->Next(&req, &ok))
+ // Block waiting to read the next event from the completion queue.
+ // The event is uniquely identified by its tag, which in this case
+ // is the memory address of a CallData instance.
+ if (!cq_->Next(&tag, &ok))
break; // Queue shut down.
- if (!ok) {
- ERROR("grpc: Failed to read from queue");
- break;
- }
- static_cast<Call *>(req)->Handle();
+ static_cast<CallData*>(tag)->process(ok);
}
} /* Mainloop() */
private:
- Collectd::AsyncService service_;
+ Collectd::AsyncService collectd_service_;
+ Dispatch::AsyncService dispatch_service_;
std::unique_ptr<grpc::Server> server_;
std::unique_ptr<grpc::ServerCompletionQueue> cq_;
/*
* collectd plugin interface
*/
-
extern "C" {
static pthread_t *workers;
static size_t workers_num = 5;
auto listener = Listener();
listener.addr = grpc::string(ci->values[0].value.string);
listener.port = grpc::string(ci->values[1].value.string);
- listeners.push_back(listener);
+ listener.ssl = nullptr;
+
+ auto ssl_opts = new(grpc::SslServerCredentialsOptions);
+ grpc::SslServerCredentialsOptions::PemKeyCertPair pkcp = {};
+ bool use_ssl = false;
for (int i = 0; i < ci->children_num; i++) {
oconfig_item_t *child = ci->children + i;
- WARNING("grpc: Option `%s` not allowed in <%s> block.",
- child->key, ci->key);
+
+ if (!strcasecmp("EnableSSL", child->key)) {
+ if (cf_util_get_boolean(child, &use_ssl)) {
+ ERROR("grpc: Option `%s` expects a boolean value",
+ child->key);
+ return -1;
+ }
+ }
+ else if (!strcasecmp("SSLRootCerts", child->key)) {
+ char *certs = NULL;
+ if (cf_util_get_string(child, &certs)) {
+ ERROR("grpc: Option `%s` expects a string value",
+ child->key);
+ return -1;
+ }
+ ssl_opts->pem_root_certs = read_file(certs);
+ }
+ else if (!strcasecmp("SSLServerKey", child->key)) {
+ char *key = NULL;
+ if (cf_util_get_string(child, &key)) {
+ ERROR("grpc: Option `%s` expects a string value",
+ child->key);
+ return -1;
+ }
+ pkcp.private_key = read_file(key);
+ }
+ else if (!strcasecmp("SSLServerCert", child->key)) {
+ char *cert = NULL;
+ if (cf_util_get_string(child, &cert)) {
+ ERROR("grpc: Option `%s` expects a string value",
+ child->key);
+ return -1;
+ }
+ pkcp.cert_chain = read_file(cert);
+ }
+ else {
+ WARNING("grpc: Option `%s` not allowed in <%s> block.",
+ child->key, ci->key);
+ }
}
+ ssl_opts->pem_key_cert_pairs.push_back(pkcp);
+ if (use_ssl)
+ listener.ssl = ssl_opts;
+ else
+ delete(ssl_opts);
+
+ listeners.push_back(listener);
return 0;
} /* c_grpc_config_listen() */