From b43eedf73eab4848a94c42cc8b9cff3349f37171 Mon Sep 17 00:00:00 2001 From: Sebastian Harl Date: Tue, 3 Nov 2015 00:03:11 +0100 Subject: [PATCH] grpc plugin: Make RPC call implementation more modular. Use a template class for the RPC call object implementation to handle all common functionality. The actual RPC implementation has been moved into overloaded functions, one for each RPC. This approach only works as long as the request and response type pairs are unique per call. That's common practice, though, and in case there's an exception, it would have to fall back to the previous approach (one class per call). --- src/grpc.cc | 156 ++++++++++++++++++++++++++++-------------------------------- 1 file changed, 72 insertions(+), 84 deletions(-) diff --git a/src/grpc.cc b/src/grpc.cc index abdd6b02..2f79f42d 100644 --- a/src/grpc.cc +++ b/src/grpc.cc @@ -49,6 +49,13 @@ extern "C" { static size_t listeners_num; } +using collectd::Collectd; + +using collectd::DispatchValuesRequest; +using collectd::DispatchValuesReply; +using collectd::ListValuesRequest; +using collectd::ListValuesReply; + using google::protobuf::util::TimeUtil; /* @@ -138,13 +145,51 @@ static grpc::Status unmarshal_value_list(const collectd::types::ValueList &msg, } /* unmarshal_value_list() */ /* - * request call objects + * request call-backs and call objects */ +static grpc::Status Process(grpc::ServerContext *ctx, + DispatchValuesRequest request, DispatchValuesReply *reply) +{ + value_list_t vl = VALUE_LIST_INIT; + auto status = unmarshal_value_list(request.values(), &vl); + if (!status.ok()) + return status; + + if (plugin_dispatch_values(&vl)) + status = grpc::Status(grpc::StatusCode::INTERNAL, + grpc::string("failed to enqueue values for writing")); + return status; +} /* Process(): DispatchValues */ + +static grpc::Status Process(grpc::ServerContext *ctx, + ListValuesRequest request, ListValuesReply *reply) +{ + char **names = NULL; + cdtime_t *times = NULL; + size_t i, n = 0; + + if (uc_get_names(&names, ×, &n)) + return grpc::Status(grpc::StatusCode::INTERNAL, + grpc::string("failed to retrieve values")); + + for (i = 0; i < n; i++) { + auto v = reply->add_value(); + auto t = TimeUtil::NanosecondsToTimestamp(CDTIME_T_TO_NS(times[i])); + v->set_name(names[i]); + v->set_allocated_time(new google::protobuf::Timestamp(t)); + sfree(names[i]); + } + sfree(names); + sfree(times); + + return grpc::Status::OK; +} /* Process(): ListValues */ + class Call { public: - Call(collectd::Collectd::AsyncService *service, grpc::ServerCompletionQueue *cq) + Call(Collectd::AsyncService *service, grpc::ServerCompletionQueue *cq) : service_(service), cq_(cq), status_(CREATE) { } @@ -172,7 +217,7 @@ protected: virtual void Process() = 0; virtual void Finish() = 0; - collectd::Collectd::AsyncService *service_; + Collectd::AsyncService *service_; grpc::ServerCompletionQueue *cq_; grpc::ServerContext ctx_; @@ -181,40 +226,35 @@ private: CallStatus status_; }; /* class Call */ -class DispatchValuesCall : public Call +template +class RpcCall final : public Call { + typedef void (Collectd::AsyncService::*CreatorT)(grpc::ServerContext *, + RequestT *, grpc::ServerAsyncResponseWriter *, + grpc::CompletionQueue *, grpc::ServerCompletionQueue *, void *); + public: - DispatchValuesCall(collectd::Collectd::AsyncService *service, grpc::ServerCompletionQueue *cq) - : Call(service, cq), responder_(&ctx_) + RpcCall(Collectd::AsyncService *service, + CreatorT creator, grpc::ServerCompletionQueue *cq) + : Call(service, cq), creator_(creator), responder_(&ctx_) { Handle(); - } /* DispatchValuesCall() */ + } /* RpcCall() */ - virtual ~DispatchValuesCall() + virtual ~RpcCall() { } private: void Create() { - service_->RequestDispatchValues(&ctx_, &request_, &responder_, cq_, cq_, this); + (service_->*creator_)(&ctx_, &request_, &responder_, cq_, cq_, this); } /* Create() */ void Process() { // Add a new request object to the queue. - new DispatchValuesCall(service_, cq_); - - value_list_t vl = VALUE_LIST_INIT; - auto status = unmarshal_value_list(request_.values(), &vl); - if (!status.ok()) { - responder_.Finish(reply_, status, this); - return; - } - - if (plugin_dispatch_values(&vl)) - status = grpc::Status(grpc::StatusCode::INTERNAL, - grpc::string("failed to enqueue values for writing")); - + new RpcCall(service_, creator_, cq_); + grpc::Status status = ::Process(&ctx_, request_, &reply_); responder_.Finish(reply_, status, this); } /* Process() */ @@ -223,67 +263,13 @@ private: delete this; } /* Finish() */ - collectd::DispatchValuesRequest request_; - collectd::DispatchValuesReply reply_; - - grpc::ServerAsyncResponseWriter responder_; -}; /* class DispatchValuesCall */ - -class ListValuesCall : public Call -{ -public: - ListValuesCall(collectd::Collectd::AsyncService *service, grpc::ServerCompletionQueue *cq) - : Call(service, cq), responder_(&ctx_) - { - Handle(); - } /* ListValuesCall() */ - - virtual ~ListValuesCall() - { } - -private: - void Create() - { - service_->RequestListValues(&ctx_, &request_, &responder_, cq_, cq_, this); - } /* Create() */ - - void Process() - { - new ListValuesCall(service_, cq_); - - char **names = NULL; - cdtime_t *times = NULL; - size_t i, n = 0; - - auto status = grpc::Status::OK; - if (uc_get_names(&names, ×, &n)) { - status = grpc::Status(grpc::StatusCode::INTERNAL, - grpc::string("failed to retrieve values")); - } - - for (i = 0; i < n; i++) { - auto v = reply_.add_value(); - auto t = TimeUtil::NanosecondsToTimestamp(CDTIME_T_TO_NS(times[i])); - v->set_name(names[i]); - v->set_allocated_time(new google::protobuf::Timestamp(t)); - sfree(names[i]); - } - sfree(names); - sfree(times); - - responder_.Finish(reply_, status, this); - } /* Process() */ - - void Finish() - { - delete this; - } /* Finish() */ + CreatorT creator_; - collectd::ListValuesRequest request_; - collectd::ListValuesReply reply_; + RequestT request_; + ReplyT reply_; - grpc::ServerAsyncResponseWriter responder_; -}; /* class ListValuesCall */ + grpc::ServerAsyncResponseWriter responder_; +}; /* class RpcCall */ /* * gRPC server implementation @@ -329,8 +315,10 @@ public: void Mainloop() { // Register request types. - new DispatchValuesCall(&service_, cq_.get()); - new ListValuesCall(&service_, cq_.get()); + new RpcCall(&service_, + &Collectd::AsyncService::RequestDispatchValues, cq_.get()); + new RpcCall(&service_, + &Collectd::AsyncService::RequestListValues, cq_.get()); while (true) { void *req = NULL; @@ -348,7 +336,7 @@ public: } /* Mainloop() */ private: - collectd::Collectd::AsyncService service_; + Collectd::AsyncService service_; std::unique_ptr server_; std::unique_ptr cq_; -- 2.11.0