grpc plugin: Make RPC call implementation more modular.
authorSebastian Harl <sh@tokkee.org>
Mon, 2 Nov 2015 23:03:11 +0000 (00:03 +0100)
committerSebastian Harl <sh@tokkee.org>
Mon, 30 May 2016 21:44:19 +0000 (23:44 +0200)
Use a template class for the RPC call object implementation to handle all
common functionality. The actual RPC implementation has been moved into
overloaded functions, one for each RPC. This approach only works as long as
the request and response type pairs are unique per call. That's common
practice, though, and in case there's an exception, it would have to fall back
to the previous approach (one class per call).

src/grpc.cc

index abdd6b0..2f79f42 100644 (file)
@@ -49,6 +49,13 @@ extern "C" {
        static size_t listeners_num;
 }
 
+using collectd::Collectd;
+
+using collectd::DispatchValuesRequest;
+using collectd::DispatchValuesReply;
+using collectd::ListValuesRequest;
+using collectd::ListValuesReply;
+
 using google::protobuf::util::TimeUtil;
 
 /*
@@ -138,13 +145,51 @@ static grpc::Status unmarshal_value_list(const collectd::types::ValueList &msg,
 } /* unmarshal_value_list() */
 
 /*
- * request call objects
+ * request call-backs and call objects
  */
 
+static grpc::Status Process(grpc::ServerContext *ctx,
+               DispatchValuesRequest request, DispatchValuesReply *reply)
+{
+       value_list_t vl = VALUE_LIST_INIT;
+       auto status = unmarshal_value_list(request.values(), &vl);
+       if (!status.ok())
+               return status;
+
+       if (plugin_dispatch_values(&vl))
+               status = grpc::Status(grpc::StatusCode::INTERNAL,
+                               grpc::string("failed to enqueue values for writing"));
+       return status;
+} /* Process(): DispatchValues */
+
+static grpc::Status Process(grpc::ServerContext *ctx,
+               ListValuesRequest request, ListValuesReply *reply)
+{
+       char **names = NULL;
+       cdtime_t *times = NULL;
+       size_t i, n = 0;
+
+       if (uc_get_names(&names, &times, &n))
+               return grpc::Status(grpc::StatusCode::INTERNAL,
+                               grpc::string("failed to retrieve values"));
+
+       for (i = 0; i < n; i++) {
+               auto v = reply->add_value();
+               auto t = TimeUtil::NanosecondsToTimestamp(CDTIME_T_TO_NS(times[i]));
+               v->set_name(names[i]);
+               v->set_allocated_time(new google::protobuf::Timestamp(t));
+               sfree(names[i]);
+       }
+       sfree(names);
+       sfree(times);
+
+       return grpc::Status::OK;
+} /* Process(): ListValues */
+
 class Call
 {
 public:
-       Call(collectd::Collectd::AsyncService *service, grpc::ServerCompletionQueue *cq)
+       Call(Collectd::AsyncService *service, grpc::ServerCompletionQueue *cq)
                : service_(service), cq_(cq), status_(CREATE)
        { }
 
@@ -172,7 +217,7 @@ protected:
        virtual void Process() = 0;
        virtual void Finish() = 0;
 
-       collectd::Collectd::AsyncService *service_;
+       Collectd::AsyncService *service_;
        grpc::ServerCompletionQueue *cq_;
        grpc::ServerContext ctx_;
 
@@ -181,40 +226,35 @@ private:
        CallStatus status_;
 }; /* class Call */
 
-class DispatchValuesCall : public Call
+template<typename RequestT, typename ReplyT>
+class RpcCall final : public Call
 {
+       typedef void (Collectd::AsyncService::*CreatorT)(grpc::ServerContext *,
+                       RequestT *, grpc::ServerAsyncResponseWriter<ReplyT> *,
+                       grpc::CompletionQueue *, grpc::ServerCompletionQueue *, void *);
+
 public:
-       DispatchValuesCall(collectd::Collectd::AsyncService *service, grpc::ServerCompletionQueue *cq)
-               : Call(service, cq), responder_(&ctx_)
+       RpcCall(Collectd::AsyncService *service,
+                       CreatorT creator, grpc::ServerCompletionQueue *cq)
+               : Call(service, cq), creator_(creator), responder_(&ctx_)
        {
                Handle();
-       } /* DispatchValuesCall() */
+       } /* RpcCall() */
 
-       virtual ~DispatchValuesCall()
+       virtual ~RpcCall()
        { }
 
 private:
        void Create()
        {
-               service_->RequestDispatchValues(&ctx_, &request_, &responder_, cq_, cq_, this);
+               (service_->*creator_)(&ctx_, &request_, &responder_, cq_, cq_, this);
        } /* Create() */
 
        void Process()
        {
                // Add a new request object to the queue.
-               new DispatchValuesCall(service_, cq_);
-
-               value_list_t vl = VALUE_LIST_INIT;
-               auto status = unmarshal_value_list(request_.values(), &vl);
-               if (!status.ok()) {
-                       responder_.Finish(reply_, status, this);
-                       return;
-               }
-
-               if (plugin_dispatch_values(&vl))
-                       status = grpc::Status(grpc::StatusCode::INTERNAL,
-                                       grpc::string("failed to enqueue values for writing"));
-
+               new RpcCall<RequestT, ReplyT>(service_, creator_, cq_);
+               grpc::Status status = ::Process(&ctx_, request_, &reply_);
                responder_.Finish(reply_, status, this);
        } /* Process() */
 
@@ -223,67 +263,13 @@ private:
                delete this;
        } /* Finish() */
 
-       collectd::DispatchValuesRequest request_;
-       collectd::DispatchValuesReply reply_;
-
-       grpc::ServerAsyncResponseWriter<collectd::DispatchValuesReply> responder_;
-}; /* class DispatchValuesCall */
-
-class ListValuesCall : public Call
-{
-public:
-       ListValuesCall(collectd::Collectd::AsyncService *service, grpc::ServerCompletionQueue *cq)
-               : Call(service, cq), responder_(&ctx_)
-       {
-               Handle();
-       } /* ListValuesCall() */
-
-       virtual ~ListValuesCall()
-       { }
-
-private:
-       void Create()
-       {
-               service_->RequestListValues(&ctx_, &request_, &responder_, cq_, cq_, this);
-       } /* Create() */
-
-       void Process()
-       {
-               new ListValuesCall(service_, cq_);
-
-               char **names = NULL;
-               cdtime_t *times = NULL;
-               size_t i, n = 0;
-
-               auto status = grpc::Status::OK;
-               if (uc_get_names(&names, &times, &n)) {
-                       status = grpc::Status(grpc::StatusCode::INTERNAL,
-                                       grpc::string("failed to retrieve values"));
-               }
-
-               for (i = 0; i < n; i++) {
-                       auto v = reply_.add_value();
-                       auto t = TimeUtil::NanosecondsToTimestamp(CDTIME_T_TO_NS(times[i]));
-                       v->set_name(names[i]);
-                       v->set_allocated_time(new google::protobuf::Timestamp(t));
-                       sfree(names[i]);
-               }
-               sfree(names);
-               sfree(times);
-
-               responder_.Finish(reply_, status, this);
-       } /* Process() */
-
-       void Finish()
-       {
-               delete this;
-       } /* Finish() */
+       CreatorT creator_;
 
-       collectd::ListValuesRequest request_;
-       collectd::ListValuesReply reply_;
+       RequestT request_;
+       ReplyT reply_;
 
-       grpc::ServerAsyncResponseWriter<collectd::ListValuesReply> responder_;
-}; /* class ListValuesCall */
+       grpc::ServerAsyncResponseWriter<ReplyT> responder_;
+}; /* class RpcCall */
 
 /*
  * gRPC server implementation
@@ -329,8 +315,10 @@ public:
        void Mainloop()
        {
                // Register request types.
-               new DispatchValuesCall(&service_, cq_.get());
-               new ListValuesCall(&service_, cq_.get());
+               new RpcCall<DispatchValuesRequest, DispatchValuesReply>(&service_,
+                               &Collectd::AsyncService::RequestDispatchValues, cq_.get());
+               new RpcCall<ListValuesRequest, ListValuesReply>(&service_,
+                               &Collectd::AsyncService::RequestListValues, cq_.get());
 
                while (true) {
                        void *req = NULL;
@@ -348,7 +336,7 @@ public:
        } /* Mainloop() */
 
 private:
-       collectd::Collectd::AsyncService service_;
+       Collectd::AsyncService service_;
 
        std::unique_ptr<grpc::Server> server_;
        std::unique_ptr<grpc::ServerCompletionQueue> cq_;