grpc plugin: Create a "Dispatch" service and use streaming RPCs.
authorFlorian Forster <octo@collectd.org>
Fri, 29 Jul 2016 14:54:58 +0000 (16:54 +0200)
committerFlorian Forster <octo@collectd.org>
Tue, 2 Aug 2016 14:56:58 +0000 (16:56 +0200)
This patch, unfortunately, does two things:
* It breaks out the DispatchValues() call into a separate service. The
  intention of this is to make it easier to implement only part of the
  API, namely to not implement querying of values, which may not be
  useful for stateless proxies.
* It turns the DispatchValues() call into a (client side) streaming RPC.
  That means that a client can send many ValueLists to the server in one
  streaming request, rather than many self-contained requests.

This code is heavily influenced by test code of the "protobuf-qml"
project, which is MIT licensed.

proto/collectd.proto
src/grpc.cc

index 4bc3501..0ed6606 100644 (file)
@@ -29,13 +29,15 @@ package collectd;
 import "types.proto";
 
 service Collectd {
-       // Dispatch collected values to collectd.
-       rpc DispatchValues(DispatchValuesRequest) returns (DispatchValuesReply);
-
        // Query a list of values available from collectd's value cache.
        rpc QueryValues(QueryValuesRequest) returns (QueryValuesReply);
 }
 
+service Dispatch {
+  // DispatchValues sends a stream of ValueLists to the server.
+  rpc DispatchValues(stream DispatchValuesRequest) returns (DispatchValuesReply);
+}
+
 // The arguments to DispatchValues.
 message DispatchValuesRequest {
        collectd.types.ValueList value_list = 1;
index 5c8fa15..ca3314e 100644 (file)
@@ -46,6 +46,7 @@ extern "C" {
 }
 
 using collectd::Collectd;
+using collectd::Dispatch;
 
 using collectd::DispatchValuesRequest;
 using collectd::DispatchValuesReply;
@@ -259,9 +260,7 @@ static grpc::Status unmarshal_value_list(const collectd::types::ValueList &msg,
 /*
  * request call-backs and call objects
  */
-
-static grpc::Status Process(grpc::ServerContext *ctx,
-               DispatchValuesRequest request, DispatchValuesReply *reply)
+static grpc::Status DispatchValue(grpc::ServerContext *ctx, DispatchValuesRequest request, DispatchValuesReply *reply)
 {
        value_list_t vl = VALUE_LIST_INIT;
        auto status = unmarshal_value_list(request.value_list(), &vl);
@@ -271,17 +270,18 @@ static grpc::Status Process(grpc::ServerContext *ctx,
        if (plugin_dispatch_values(&vl))
                status = grpc::Status(grpc::StatusCode::INTERNAL,
                                grpc::string("failed to enqueue values for writing"));
+
+       reply->Clear();
        return status;
-} /* Process(): DispatchValues */
+} /* grpc::Status DispatchValue */
 
-static grpc::Status Process(grpc::ServerContext *ctx,
-               QueryValuesRequest request, QueryValuesReply *reply)
+static grpc::Status QueryValues(grpc::ServerContext *ctx, QueryValuesRequest req, QueryValuesReply *res)
 {
        uc_iter_t *iter;
        char *name = NULL;
 
        value_list_t matcher;
-       auto status = unmarshal_ident(request.identifier(), &matcher, false);
+       auto status = unmarshal_ident(req.identifier(), &matcher, false);
        if (!status.ok())
                return status;
 
@@ -292,35 +292,35 @@ static grpc::Status Process(grpc::ServerContext *ctx,
 
        status = grpc::Status::OK;
        while (uc_iterator_next(iter, &name) == 0) {
-               value_list_t res;
-               if (parse_identifier_vl(name, &res) != 0) {
+               value_list_t vl;
+               if (parse_identifier_vl(name, &vl) != 0) {
                        status = grpc::Status(grpc::StatusCode::INTERNAL,
                                        grpc::string("failed to parse identifier"));
                        break;
                }
 
-               if (!ident_matches(&res, &matcher))
+               if (!ident_matches(&vl, &matcher))
                        continue;
 
-               if (uc_iterator_get_time(iter, &res.time) < 0) {
+               if (uc_iterator_get_time(iter, &vl.time) < 0) {
                        status = grpc::Status(grpc::StatusCode::INTERNAL,
                                        grpc::string("failed to retrieve value timestamp"));
                        break;
                }
-               if (uc_iterator_get_interval(iter, &res.interval) < 0) {
+               if (uc_iterator_get_interval(iter, &vl.interval) < 0) {
                        status = grpc::Status(grpc::StatusCode::INTERNAL,
                                        grpc::string("failed to retrieve value interval"));
                        break;
                }
-               if (uc_iterator_get_values(iter, &res.values, &res.values_len) < 0) {
+               if (uc_iterator_get_values(iter, &vl.values, &vl.values_len) < 0) {
                        status = grpc::Status(grpc::StatusCode::INTERNAL,
                                        grpc::string("failed to retrieve values"));
                        break;
                }
 
-               auto vl = reply->add_value_lists();
-               status = marshal_value_list(&res, vl);
-               free(res.values);
+               auto pb_vl = res->add_value_lists();
+               status = marshal_value_list(&vl, pb_vl);
+               free(vl.values);
                if (!status.ok())
                        break;
        }
@@ -328,97 +328,124 @@ static grpc::Status Process(grpc::ServerContext *ctx,
        uc_iterator_destroy(iter);
 
        return status;
-} /* Process(): QueryValues */
+} /* grpc::Status QueryValues */
 
-class Call
-{
+// CallData is the abstract base class for asynchronous calls.
+class CallData {
 public:
-       Call(Collectd::AsyncService *service, grpc::ServerCompletionQueue *cq)
-               : service_(service), cq_(cq), status_(CREATE)
-       { }
-
-       virtual ~Call()
-       { }
-
-       void Handle()
-       {
-               if (status_ == CREATE) {
-                       Create();
-                       status_ = PROCESS;
-               }
-               else if (status_ == PROCESS) {
-                       Process();
-                       status_ = FINISH;
-               }
-               else {
-                       GPR_ASSERT(status_ == FINISH);
-                       Finish();
-               }
-       } /* Handle() */
+  virtual ~CallData() {}
+  virtual void process(bool ok) = 0;
 
 protected:
-       virtual void Create() = 0;
-       virtual void Process() = 0;
-       virtual void Finish() = 0;
-
-       Collectd::AsyncService *service_;
-       grpc::ServerCompletionQueue *cq_;
-       grpc::ServerContext ctx_;
+  CallData() {}
 
 private:
-       enum CallStatus { CREATE, PROCESS, FINISH };
-       CallStatus status_;
-}; /* class Call */
-
-template<typename RequestT, typename ReplyT>
-class RpcCall final : public Call
-{
-       typedef void (Collectd::AsyncService::*CreatorT)(grpc::ServerContext *,
-                       RequestT *, grpc::ServerAsyncResponseWriter<ReplyT> *,
-                       grpc::CompletionQueue *, grpc::ServerCompletionQueue *, void *);
+  CallData(const CallData&) = delete;
+  CallData& operator=(const CallData&) = delete;
+};
 
+/*
+ * Collectd service
+ */
+// QueryValuesCallData holds the state and implements the logic for QueryValues calls.
+class QueryValuesCallData : public CallData {
 public:
-       RpcCall(Collectd::AsyncService *service,
-                       CreatorT creator, grpc::ServerCompletionQueue *cq)
-               : Call(service, cq), creator_(creator), responder_(&ctx_)
-       {
-               Handle();
-       } /* RpcCall() */
-
-       virtual ~RpcCall()
-       { }
+       QueryValuesCallData(Collectd::AsyncService* service, grpc::ServerCompletionQueue* cq)
+                       : cq_(cq), service_(service), writer_(&context_) {
+               // As part of the initialization, we *request* that the system start
+               // processing QueryValues requests. In this request, "this" acts as
+               // the tag uniquely identifying the request (so that different
+               // QueryValuesCallData instances can serve different requests
+               // concurrently), in this case the memory address of this
+               // QueryValuesCallData instance.
+               service->RequestQueryValues(&context_, &request_, &writer_, cq_, cq_, this);
+       }
 
-private:
-       void Create()
-       {
-               (service_->*creator_)(&ctx_, &request_, &responder_, cq_, cq_, this);
-       } /* Create() */
+       void process(bool ok) final {
+               if (done_) {
+                       delete this;
+               } else {
+                       // Spawn a new QueryValuesCallData instance to serve new clients
+                       // while we process the one for this QueryValuesCallData. The
+                       // instance will deallocate itself as part of its FINISH state.
+                       new QueryValuesCallData(service_, cq_);
+
+                       auto status = QueryValues(&context_, request_, &response_);
+                       if (!status.ok()) {
+                               writer_.FinishWithError(status, this);
+                       } else {
+                               writer_.Finish(response_, grpc::Status::OK, this);
+                       }
 
-       void Process()
-       {
-               // Add a new request object to the queue.
-               new RpcCall<RequestT, ReplyT>(service_, creator_, cq_);
-               grpc::Status status = ::Process(&ctx_, request_, &reply_);
-               responder_.Finish(reply_, status, this);
-       } /* Process() */
+                       done_ = true;
+               }
+       }
 
-       void Finish()
-       {
-               delete this;
-       } /* Finish() */
+private:
+       bool done_ = false;
+       grpc::ServerContext context_;
+       grpc::ServerCompletionQueue* cq_;
+       Collectd::AsyncService* service_;
+       QueryValuesRequest request_;
+       QueryValuesReply response_;
+       grpc::ServerAsyncResponseWriter<QueryValuesReply> writer_;
+};
 
-       CreatorT creator_;
+/*
+ * Dispatch service
+ */
+// DispatchValuesCallData holds the state and implements the logic for DispatchValues calls.
+class DispatchValuesCallData : public CallData {
+public:
+       DispatchValuesCallData(Dispatch::AsyncService* service, grpc::ServerCompletionQueue* cq)
+                       : cq_(cq), service_(service), reader_(&context_) {
+               process(true);
+       }
 
-       RequestT request_;
-       ReplyT reply_;
+       void process(bool ok) final {
+               if (status == Status::INIT) {
+                       service_->RequestDispatchValues(&context_, &reader_, cq_, cq_, this);
+                       status = Status::CALL;
+               } else if (status == Status::CALL) {
+                       reader_.Read(&request_, this);
+                       status = Status::READ;
+               } else if (status == Status::READ && ok) {
+                       (void) DispatchValue(&context_, request_, &response_);
+
+                       reader_.Read(&request_, this);
+               } else if (status == Status::READ) {
+                       response_.Clear();
+
+                       status = Status::DONE;
+               } else if (status == Status::DONE) {
+                       new DispatchValuesCallData(service_, cq_);
+                       delete this;
+               } else {
+                       ERROR("grpc: DispatchValuesCallData: invalid state");
+               }
+       }
 
-       grpc::ServerAsyncResponseWriter<ReplyT> responder_;
-}; /* class RpcCall */
+private:
+       enum class Status {
+               INIT,
+               CALL,
+               READ,
+               DONE,
+       };
+       Status status = Status::INIT;
+
+       grpc::ServerContext          context_;
+       grpc::ServerCompletionQueue* cq_;
+       Dispatch::AsyncService*      service_;
+
+       DispatchValuesRequest request_;
+       DispatchValuesReply response_;
+       grpc::ServerAsyncReader<DispatchValuesReply, DispatchValuesRequest> reader_;
+};
 
 /*
  * gRPC server implementation
  */
-
 class CollectdServer final
 {
 public:
@@ -448,9 +475,14 @@ public:
                        }
                }
 
-               builder.RegisterService(&service_);
                cq_ = builder.AddCompletionQueue();
+
+               builder.RegisterService(&collectd_service_);
+               builder.RegisterService(&dispatch_service_);
+
                server_ = builder.BuildAndStart();
+               new QueryValuesCallData(&collectd_service_, cq_.get());
+               new DispatchValuesCallData(&dispatch_service_, cq_.get());
        } /* Start() */
 
        void Shutdown()
@@ -461,29 +493,23 @@ public:
 
        void Mainloop()
        {
-               // Register request types.
-               new RpcCall<DispatchValuesRequest, DispatchValuesReply>(&service_,
-                               &Collectd::AsyncService::RequestDispatchValues, cq_.get());
-               new RpcCall<QueryValuesRequest, QueryValuesReply>(&service_,
-                               &Collectd::AsyncService::RequestQueryValues, cq_.get());
-
                while (true) {
-                       void *req = NULL;
+                       void *tag = NULL;
                        bool ok = false;
 
-                       if (!cq_->Next(&req, &ok))
+                       // Block waiting to read the next event from the completion queue.
+                       // The event is uniquely identified by its tag, which in this case
+                       // is the memory address of a CallData instance.
+                       if (!cq_->Next(&tag, &ok))
                                break; // Queue shut down.
-                       if (!ok) {
-                               ERROR("grpc: Failed to read from queue");
-                               break;
-                       }
 
-                       static_cast<Call *>(req)->Handle();
+                       static_cast<CallData*>(tag)->process(ok);
                }
        } /* Mainloop() */
 
 private:
-       Collectd::AsyncService service_;
+       Collectd::AsyncService collectd_service_;
+       Dispatch::AsyncService dispatch_service_;
 
        std::unique_ptr<grpc::Server> server_;
        std::unique_ptr<grpc::ServerCompletionQueue> cq_;
@@ -494,7 +520,6 @@ static CollectdServer *server = nullptr;
 /*
  * collectd plugin interface
  */
-
 extern "C" {
        static pthread_t *workers;
        static size_t workers_num = 5;