grpc plugin: Switch to the synchronous interface.
authorFlorian Forster <octo@collectd.org>
Tue, 2 Aug 2016 12:15:45 +0000 (14:15 +0200)
committerFlorian Forster <octo@collectd.org>
Wed, 10 Aug 2016 13:02:46 +0000 (15:02 +0200)
src/collectd.conf.in
src/collectd.conf.pod
src/grpc.cc

index b678c42..9375202 100644 (file)
 #</Plugin>
 
 #<Plugin grpc>
-#      WorkerThreads 5
 #      <Listen "0.0.0.0" "50051">
 #              EnableSSL true
 #              SSLRootCerts "/path/to/root.pem"
index ac06547..381d672 100644 (file)
@@ -2669,11 +2669,6 @@ connections.
 
 =back
 
-=item B<WorkerThreads> I<Num>
-
-Number of threads to start for handling incoming connections. The default
-value is B<5>.
-
 =back
 
 =head2 Plugin C<hddtemp>
index aeb1c6d..5a5899d 100644 (file)
@@ -259,94 +259,42 @@ static grpc::Status unmarshal_value_list(const collectd::types::ValueList &msg,
 } /* unmarshal_value_list() */
 
 /*
- * request call-backs and call objects
- */
-static grpc::Status DispatchValue(grpc::ServerContext *ctx, DispatchValuesRequest request, DispatchValuesResponse *reply)
-{
-       value_list_t vl = VALUE_LIST_INIT;
-       auto status = unmarshal_value_list(request.value_list(), &vl);
-       if (!status.ok())
-               return status;
-
-       if (plugin_dispatch_values(&vl))
-               status = grpc::Status(grpc::StatusCode::INTERNAL,
-                               grpc::string("failed to enqueue values for writing"));
-
-       reply->Clear();
-       return status;
-} /* grpc::Status DispatchValue */
-
-// CallData is the abstract base class for asynchronous calls.
-class CallData {
-public:
-  virtual ~CallData() {}
-  virtual void process(bool ok) = 0;
-
-protected:
-  CallData() {}
-
-private:
-  CallData(const CallData&) = delete;
-  CallData& operator=(const CallData&) = delete;
-};
-
-/*
  * Collectd service
  */
-
-// QueryValuesCallData holds the state and implements the logic for QueryValues calls.
-class QueryValuesCallData : public CallData {
+class CollectdImpl : public collectd::Collectd::Service {
 public:
-       QueryValuesCallData(Collectd::AsyncService* service, grpc::ServerCompletionQueue* cq)
-                       : cq_(cq), service_(service), writer_(&context_) {
-               process(true);
-       }
+       grpc::Status QueryValues(grpc::ServerContext *ctx, QueryValuesRequest const *req, grpc::ServerWriter<QueryValuesResponse> *writer) override {
+               value_list_t match;
+               auto err = unmarshal_ident(req->identifier(), &match, false);
+               if (!err.ok()) {
+                       return err;
+               }
 
-       void process(bool ok) final {
-               if (status_ == Status::INIT) {
-                       status_ = Status::READ;
-                       service_->RequestQueryValues(&context_, &request_, &writer_, cq_, cq_, this);
-               } else if (status_ == Status::READ) {
-                       auto err = queryValues();
-                       if (!err.ok()) {
-                               writer_.Finish(err, this);
-                               status_ = Status::DONE;
-                               return;
-                       }
-                       respond();
-               } else if (status_ == Status::WRITE) {
-                       respond();
-               } else if (status_ == Status::DONE) {
-                       new QueryValuesCallData(service_, cq_);
-                       delete this;
-               } else {
-                       throw std::logic_error("Unhandled state enum.");
+               std::queue<value_list_t> value_lists;
+               err = this->read(&match, &value_lists);
+               if (err.ok()) {
+                       err = this->write(ctx, writer, &value_lists);
+               }
+
+               while (!value_lists.empty()) {
+                       auto vl = value_lists.front();
+                       value_lists.pop();
+                       sfree(vl.values);
                }
+
+               return err;
        }
 
 private:
-       enum class Status {
-               INIT,
-               READ,
-               WRITE,
-               DONE,
-       };
-
-       grpc::Status queryValues (void) {
+       grpc::Status read(value_list_t const *match, std::queue<value_list_t> *value_lists) {
                uc_iter_t *iter;
-               char *name = NULL;
-
-               value_list_t matcher;
-               auto status = unmarshal_ident(request_.identifier(), &matcher, false);
-               if (!status.ok())
-                       return status;
-
                if ((iter = uc_get_iterator()) == NULL) {
                        return grpc::Status(grpc::StatusCode::INTERNAL,
                                                                grpc::string("failed to query values: cannot create iterator"));
                }
 
-               status = grpc::Status::OK;
+               grpc::Status status = grpc::Status::OK;
+               char *name = NULL;
                while (uc_iterator_next(iter, &name) == 0) {
                        value_list_t vl;
                        if (parse_identifier_vl(name, &vl) != 0) {
@@ -355,7 +303,7 @@ private:
                                break;
                        }
 
-                       if (!ident_matches(&vl, &matcher))
+                       if (!ident_matches(&vl, match))
                                continue;
 
                        if (uc_iterator_get_time(iter, &vl.time) < 0) {
@@ -374,97 +322,62 @@ private:
                                break;
                        }
 
-                       value_lists_.push(vl);
-               }
+                       value_lists->push(vl);
+               } // while (uc_iterator_next(iter, &name) == 0)
 
                uc_iterator_destroy(iter);
                return status;
        }
 
-       void respond() {
-               if (value_lists_.empty()) {
-                       writer_.Finish(grpc::Status::OK, this);
-                       status_ = Status::DONE;
-                       return;
-               }
+       grpc::Status write(grpc::ServerContext *ctx,
+                                          grpc::ServerWriter<QueryValuesResponse> *writer,
+                                          std::queue<value_list_t> *value_lists) {
+               while (!value_lists->empty()) {
+                       auto vl = value_lists->front();
+                       QueryValuesResponse res;
+                       res.Clear();
+
+                       auto err = marshal_value_list(&vl, res.mutable_value_list());
+                       if (!err.ok()) {
+                               return err;
+                       }
 
-               auto vl = value_lists_.front();
+                       if (!writer->Write(res)) {
+                               return grpc::Status::CANCELLED;
+                       }
 
-               response_.Clear();
-               auto err = marshal_value_list(&vl, response_.mutable_value_list());
-               if (!err.ok()) {
-                       writer_.Finish(err, this);
-                       status_ = Status::DONE;
-                       return;
+                       value_lists->pop();
+                       sfree(vl.values);
                }
 
-               value_lists_.pop();
-               free(vl.values);
-
-               writer_.Write(response_, this);
-               status_ = Status::WRITE;
+               return grpc::Status::OK;
        }
-
-       Status status_ = Status::INIT;
-       grpc::ServerContext context_;
-       grpc::ServerCompletionQueue* cq_;
-       Collectd::AsyncService* service_;
-       QueryValuesRequest request_;
-       std::queue<value_list_t> value_lists_;
-       QueryValuesResponse response_;
-       grpc::ServerAsyncWriter<QueryValuesResponse> writer_;
 };
 
 /*
  * Dispatch service
  */
-// DispatchValuesCallData holds the state and implements the logic for DispatchValues calls.
-class DispatchValuesCallData : public CallData {
+class DispatchImpl : public collectd::Dispatch::Service {
 public:
-       DispatchValuesCallData(Dispatch::AsyncService* service, grpc::ServerCompletionQueue* cq)
-                       : cq_(cq), service_(service), reader_(&context_) {
-               process(true);
-       }
-
-       void process(bool ok) final {
-               if (status == Status::INIT) {
-                       service_->RequestDispatchValues(&context_, &reader_, cq_, cq_, this);
-                       status = Status::CALL;
-               } else if (status == Status::CALL) {
-                       reader_.Read(&request_, this);
-                       status = Status::READ;
-               } else if (status == Status::READ && ok) {
-                       (void) DispatchValue(&context_, request_, &response_);
-
-                       reader_.Read(&request_, this);
-               } else if (status == Status::READ) {
-                       response_.Clear();
-
-                       status = Status::DONE;
-               } else if (status == Status::DONE) {
-                       new DispatchValuesCallData(service_, cq_);
-                       delete this;
-               } else {
-                       ERROR("grpc: DispatchValuesCallData: invalid state");
+       grpc::Status DispatchValues(grpc::ServerContext *ctx,
+                                                               grpc::ServerReader<DispatchValuesRequest> *reader,
+                                                               DispatchValuesResponse *res) override {
+               DispatchValuesRequest req;
+
+               while (reader->Read(&req)) {
+                       value_list_t vl = VALUE_LIST_INIT;
+                       auto status = unmarshal_value_list(req.value_list(), &vl);
+                       if (!status.ok())
+                               return status;
+
+                       if (plugin_dispatch_values(&vl))
+                               return grpc::Status(grpc::StatusCode::INTERNAL,
+                                                                       grpc::string("failed to enqueue values for writing"));
                }
-       }
 
-private:
-       enum class Status {
-               INIT,
-               CALL,
-               READ,
-               DONE,
-       };
-       Status status = Status::INIT;
-
-       grpc::ServerContext          context_;
-       grpc::ServerCompletionQueue* cq_;
-       Dispatch::AsyncService*      service_;
-
-       DispatchValuesRequest request_;
-       DispatchValuesResponse response_;
-       grpc::ServerAsyncReader<DispatchValuesResponse, DispatchValuesRequest> reader_;
+               res->Clear();
+               return grpc::Status::OK;
+       }
 };
 
 /*
@@ -499,44 +412,22 @@ public:
                        }
                }
 
-               cq_ = builder.AddCompletionQueue();
-
                builder.RegisterService(&collectd_service_);
                builder.RegisterService(&dispatch_service_);
 
                server_ = builder.BuildAndStart();
-               new QueryValuesCallData(&collectd_service_, cq_.get());
-               new DispatchValuesCallData(&dispatch_service_, cq_.get());
        } /* Start() */
 
        void Shutdown()
        {
                server_->Shutdown();
-               cq_->Shutdown();
        } /* Shutdown() */
 
-       void Mainloop()
-       {
-               while (true) {
-                       void *tag = NULL;
-                       bool ok = false;
-
-                       // Block waiting to read the next event from the completion queue.
-                       // The event is uniquely identified by its tag, which in this case
-                       // is the memory address of a CallData instance.
-                       if (!cq_->Next(&tag, &ok))
-                               break; // Queue shut down.
-
-                       static_cast<CallData*>(tag)->process(ok);
-               }
-       } /* Mainloop() */
-
 private:
-       Collectd::AsyncService collectd_service_;
-       Dispatch::AsyncService dispatch_service_;
+       CollectdImpl collectd_service_;
+       DispatchImpl dispatch_service_;
 
        std::unique_ptr<grpc::Server> server_;
-       std::unique_ptr<grpc::ServerCompletionQueue> cq_;
 }; /* class CollectdServer */
 
 static CollectdServer *server = nullptr;
@@ -545,16 +436,6 @@ static CollectdServer *server = nullptr;
  * collectd plugin interface
  */
 extern "C" {
-       static pthread_t *workers;
-       static size_t workers_num = 5;
-
-       static void *worker_thread(void *arg)
-       {
-               CollectdServer *s = (CollectdServer *)arg;
-               s->Mainloop();
-               return NULL;
-       } /* worker_thread() */
-
        static int c_grpc_config_listen(oconfig_item_t *ci)
        {
                if ((ci->values_num != 2)
@@ -638,12 +519,6 @@ extern "C" {
                                if (c_grpc_config_listen(child))
                                        return -1;
                        }
-                       else if (!strcasecmp("WorkerThreads", child->key)) {
-                               int n;
-                               if (cf_util_get_int(child, &n))
-                                       return -1;
-                               workers_num = (size_t)n;
-                       }
                        else {
                                WARNING("grpc: Option `%s` not allowed here.", child->key);
                        }
@@ -655,47 +530,22 @@ extern "C" {
        static int c_grpc_init(void)
        {
                server = new CollectdServer();
-               size_t i;
-
-               if (! server) {
+               if (!server) {
                        ERROR("grpc: Failed to create server");
                        return -1;
                }
 
-               workers = (pthread_t *)calloc(workers_num, sizeof(*workers));
-               if (! workers) {
-                       delete server;
-                       server = nullptr;
-
-                       ERROR("grpc: Failed to allocate worker threads");
-                       return -1;
-               }
-
                server->Start();
-               for (i = 0; i < workers_num; i++) {
-                       plugin_thread_create(&workers[i], /* attr = */ NULL,
-                                       worker_thread, server);
-               }
-               INFO("grpc: Started %zu workers", workers_num);
                return 0;
        } /* c_grpc_init() */
 
        static int c_grpc_shutdown(void)
        {
-               size_t i;
-
                if (!server)
-                       return -1;
+                       return 0;
 
                server->Shutdown();
 
-               INFO("grpc: Waiting for %zu workers to terminate", workers_num);
-               for (i = 0; i < workers_num; i++)
-                       pthread_join(workers[i], NULL);
-               free(workers);
-               workers = NULL;
-               workers_num = 0;
-
                delete server;
                server = nullptr;