#include <fstream>
#include <iostream>
+#include <queue>
#include <vector>
#include "collectd.grpc.pb.h"
#include "collectd.h"
#include "common.h"
-#include "configfile.h"
#include "plugin.h"
#include "daemon/utils_cache.h"
using collectd::Collectd;
using collectd::DispatchValuesRequest;
-using collectd::DispatchValuesReply;
+using collectd::DispatchValuesResponse;
using collectd::QueryValuesRequest;
-using collectd::QueryValuesReply;
+using collectd::QueryValuesResponse;
using google::protobuf::util::TimeUtil;
msg->set_allocated_interval(new google::protobuf::Duration(d));
for (size_t i = 0; i < vl->values_len; ++i) {
- auto v = msg->add_value();
+ auto v = msg->add_values();
switch (ds->ds[i].type) {
case DS_TYPE_COUNTER:
v->set_counter(vl->values[i].counter);
return grpc::Status(grpc::StatusCode::INTERNAL,
grpc::string("unknown value type"));
}
+
+ auto name = msg->add_ds_names();
+ name->assign(ds->ds[i].name);
}
return grpc::Status::OK;
size_t values_len = 0;
status = grpc::Status::OK;
- for (auto v : msg.value()) {
+ for (auto v : msg.values()) {
value_t *val = (value_t *)realloc(values, (values_len + 1) * sizeof(*values));
if (!val) {
status = grpc::Status(grpc::StatusCode::RESOURCE_EXHAUSTED,
} /* unmarshal_value_list() */
/*
- * request call-backs and call objects
+ * Collectd service
*/
+class CollectdImpl : public collectd::Collectd::Service {
+public:
+ grpc::Status QueryValues(grpc::ServerContext *ctx, QueryValuesRequest const *req, grpc::ServerWriter<QueryValuesResponse> *writer) override {
+ value_list_t match;
+ auto status = unmarshal_ident(req->identifier(), &match, false);
+ if (!status.ok()) {
+ return status;
+ }
-static grpc::Status Process(grpc::ServerContext *ctx,
- DispatchValuesRequest request, DispatchValuesReply *reply)
-{
- value_list_t vl = VALUE_LIST_INIT;
- auto status = unmarshal_value_list(request.values(), &vl);
- if (!status.ok())
- return status;
-
- if (plugin_dispatch_values(&vl))
- status = grpc::Status(grpc::StatusCode::INTERNAL,
- grpc::string("failed to enqueue values for writing"));
- return status;
-} /* Process(): DispatchValues */
+ std::queue<value_list_t> value_lists;
+ status = this->queryValuesRead(&match, &value_lists);
+ if (status.ok()) {
+ status = this->queryValuesWrite(ctx, writer, &value_lists);
+ }
-static grpc::Status Process(grpc::ServerContext *ctx,
- QueryValuesRequest request, QueryValuesReply *reply)
-{
- uc_iter_t *iter;
- char *name = NULL;
+ while (!value_lists.empty()) {
+ auto vl = value_lists.front();
+ value_lists.pop();
+ sfree(vl.values);
+ }
- value_list_t matcher;
- auto status = unmarshal_ident(request.identifier(), &matcher, false);
- if (!status.ok())
return status;
-
- if ((iter = uc_get_iterator()) == NULL) {
- return grpc::Status(grpc::StatusCode::INTERNAL,
- grpc::string("failed to query values: cannot create iterator"));
}
- while (uc_iterator_next(iter, &name) == 0) {
- value_list_t res;
- if (parse_identifier_vl(name, &res) != 0)
- return grpc::Status(grpc::StatusCode::INTERNAL,
- grpc::string("failed to parse identifier"));
+ grpc::Status DispatchValues(grpc::ServerContext *ctx,
+ grpc::ServerReader<DispatchValuesRequest> *reader,
+ DispatchValuesResponse *res) override {
+ DispatchValuesRequest req;
- if (!ident_matches(&res, &matcher))
- continue;
+ while (reader->Read(&req)) {
+ value_list_t vl = VALUE_LIST_INIT;
+ auto status = unmarshal_value_list(req.value_list(), &vl);
+ if (!status.ok())
+ return status;
- if (uc_iterator_get_time(iter, &res.time) < 0)
- return grpc::Status(grpc::StatusCode::INTERNAL,
- grpc::string("failed to retrieve value timestamp"));
- if (uc_iterator_get_interval(iter, &res.interval) < 0)
- return grpc::Status(grpc::StatusCode::INTERNAL,
- grpc::string("failed to retrieve value interval"));
- if (uc_iterator_get_values(iter, &res.values, &res.values_len) < 0)
- return grpc::Status(grpc::StatusCode::INTERNAL,
- grpc::string("failed to retrieve values"));
+ if (plugin_dispatch_values(&vl))
+ return grpc::Status(grpc::StatusCode::INTERNAL,
+ grpc::string("failed to enqueue values for writing"));
+ }
- auto vl = reply->add_values();
- status = marshal_value_list(&res, vl);
- free(res.values);
- if (!status.ok())
- return status;
+ res->Clear();
+ return grpc::Status::OK;
}
- uc_iterator_destroy(iter);
-
- return grpc::Status::OK;
-} /* Process(): QueryValues */
-
-class Call
-{
-public:
- Call(Collectd::AsyncService *service, grpc::ServerCompletionQueue *cq)
- : service_(service), cq_(cq), status_(CREATE)
- { }
-
- virtual ~Call()
- { }
-
- void Handle()
- {
- if (status_ == CREATE) {
- Create();
- status_ = PROCESS;
- }
- else if (status_ == PROCESS) {
- Process();
- status_ = FINISH;
- }
- else {
- GPR_ASSERT(status_ == FINISH);
- Finish();
+private:
+ grpc::Status queryValuesRead(value_list_t const *match, std::queue<value_list_t> *value_lists) {
+ uc_iter_t *iter;
+ if ((iter = uc_get_iterator()) == NULL) {
+ return grpc::Status(grpc::StatusCode::INTERNAL,
+ grpc::string("failed to query values: cannot create iterator"));
}
- } /* Handle() */
-
-protected:
- virtual void Create() = 0;
- virtual void Process() = 0;
- virtual void Finish() = 0;
-
- Collectd::AsyncService *service_;
- grpc::ServerCompletionQueue *cq_;
- grpc::ServerContext ctx_;
-private:
- enum CallStatus { CREATE, PROCESS, FINISH };
- CallStatus status_;
-}; /* class Call */
+ grpc::Status status = grpc::Status::OK;
+ char *name = NULL;
+ while (uc_iterator_next(iter, &name) == 0) {
+ value_list_t vl;
+ if (parse_identifier_vl(name, &vl) != 0) {
+ status = grpc::Status(grpc::StatusCode::INTERNAL,
+ grpc::string("failed to parse identifier"));
+ break;
+ }
-template<typename RequestT, typename ReplyT>
-class RpcCall final : public Call
-{
- typedef void (Collectd::AsyncService::*CreatorT)(grpc::ServerContext *,
- RequestT *, grpc::ServerAsyncResponseWriter<ReplyT> *,
- grpc::CompletionQueue *, grpc::ServerCompletionQueue *, void *);
+ if (!ident_matches(&vl, match))
+ continue;
-public:
- RpcCall(Collectd::AsyncService *service,
- CreatorT creator, grpc::ServerCompletionQueue *cq)
- : Call(service, cq), creator_(creator), responder_(&ctx_)
- {
- Handle();
- } /* RpcCall() */
-
- virtual ~RpcCall()
- { }
+ if (uc_iterator_get_time(iter, &vl.time) < 0) {
+ status = grpc::Status(grpc::StatusCode::INTERNAL,
+ grpc::string("failed to retrieve value timestamp"));
+ break;
+ }
+ if (uc_iterator_get_interval(iter, &vl.interval) < 0) {
+ status = grpc::Status(grpc::StatusCode::INTERNAL,
+ grpc::string("failed to retrieve value interval"));
+ break;
+ }
+ if (uc_iterator_get_values(iter, &vl.values, &vl.values_len) < 0) {
+ status = grpc::Status(grpc::StatusCode::INTERNAL,
+ grpc::string("failed to retrieve values"));
+ break;
+ }
-private:
- void Create()
- {
- (service_->*creator_)(&ctx_, &request_, &responder_, cq_, cq_, this);
- } /* Create() */
+ value_lists->push(vl);
+ } // while (uc_iterator_next(iter, &name) == 0)
- void Process()
- {
- // Add a new request object to the queue.
- new RpcCall<RequestT, ReplyT>(service_, creator_, cq_);
- grpc::Status status = ::Process(&ctx_, request_, &reply_);
- responder_.Finish(reply_, status, this);
- } /* Process() */
+ uc_iterator_destroy(iter);
+ return status;
+ }
- void Finish()
- {
- delete this;
- } /* Finish() */
+ grpc::Status queryValuesWrite(grpc::ServerContext *ctx,
+ grpc::ServerWriter<QueryValuesResponse> *writer,
+ std::queue<value_list_t> *value_lists) {
+ while (!value_lists->empty()) {
+ auto vl = value_lists->front();
+ QueryValuesResponse res;
+ res.Clear();
+
+ auto status = marshal_value_list(&vl, res.mutable_value_list());
+ if (!status.ok()) {
+ return status;
+ }
- CreatorT creator_;
+ if (!writer->Write(res)) {
+ return grpc::Status::CANCELLED;
+ }
- RequestT request_;
- ReplyT reply_;
+ value_lists->pop();
+ sfree(vl.values);
+ }
- grpc::ServerAsyncResponseWriter<ReplyT> responder_;
-}; /* class RpcCall */
+ return grpc::Status::OK;
+ }
+};
/*
* gRPC server implementation
*/
-
class CollectdServer final
{
public:
}
}
- builder.RegisterService(&service_);
- cq_ = builder.AddCompletionQueue();
+ builder.RegisterService(&collectd_service_);
+
server_ = builder.BuildAndStart();
} /* Start() */
void Shutdown()
{
server_->Shutdown();
- cq_->Shutdown();
} /* Shutdown() */
- void Mainloop()
- {
- // Register request types.
- new RpcCall<DispatchValuesRequest, DispatchValuesReply>(&service_,
- &Collectd::AsyncService::RequestDispatchValues, cq_.get());
- new RpcCall<QueryValuesRequest, QueryValuesReply>(&service_,
- &Collectd::AsyncService::RequestQueryValues, cq_.get());
-
- while (true) {
- void *req = NULL;
- bool ok = false;
-
- if (!cq_->Next(&req, &ok))
- break; // Queue shut down.
- if (!ok) {
- ERROR("grpc: Failed to read from queue");
- break;
- }
-
- static_cast<Call *>(req)->Handle();
- }
- } /* Mainloop() */
-
private:
- Collectd::AsyncService service_;
+ CollectdImpl collectd_service_;
std::unique_ptr<grpc::Server> server_;
- std::unique_ptr<grpc::ServerCompletionQueue> cq_;
}; /* class CollectdServer */
static CollectdServer *server = nullptr;
/*
* collectd plugin interface
*/
-
extern "C" {
- static pthread_t *workers;
- static size_t workers_num = 5;
-
- static void *worker_thread(void *arg)
- {
- CollectdServer *s = (CollectdServer *)arg;
- s->Mainloop();
- return NULL;
- } /* worker_thread() */
-
static int c_grpc_config_listen(oconfig_item_t *ci)
{
if ((ci->values_num != 2)
if (c_grpc_config_listen(child))
return -1;
}
- else if (!strcasecmp("WorkerThreads", child->key)) {
- int n;
- if (cf_util_get_int(child, &n))
- return -1;
- workers_num = (size_t)n;
- }
else {
WARNING("grpc: Option `%s` not allowed here.", child->key);
}
static int c_grpc_init(void)
{
server = new CollectdServer();
- size_t i;
-
- if (! server) {
+ if (!server) {
ERROR("grpc: Failed to create server");
return -1;
}
- workers = (pthread_t *)calloc(workers_num, sizeof(*workers));
- if (! workers) {
- delete server;
- server = nullptr;
-
- ERROR("grpc: Failed to allocate worker threads");
- return -1;
- }
-
server->Start();
- for (i = 0; i < workers_num; i++) {
- plugin_thread_create(&workers[i], /* attr = */ NULL,
- worker_thread, server);
- }
- INFO("grpc: Started %zu workers", workers_num);
return 0;
} /* c_grpc_init() */
static int c_grpc_shutdown(void)
{
- size_t i;
-
if (!server)
- return -1;
+ return 0;
server->Shutdown();
- INFO("grpc: Waiting for %zu workers to terminate", workers_num);
- for (i = 0; i < workers_num; i++)
- pthread_join(workers[i], NULL);
- free(workers);
- workers = NULL;
- workers_num = 0;
-
delete server;
server = nullptr;