Merge pull request #1876 from octo/issue/1819
[collectd.git] / src / grpc.cc
index 9c4f258..42c6b32 100644 (file)
@@ -29,6 +29,7 @@
 
 #include <fstream>
 #include <iostream>
+#include <queue>
 #include <vector>
 
 #include "collectd.grpc.pb.h"
@@ -46,7 +47,6 @@ extern "C" {
 }
 
 using collectd::Collectd;
-using collectd::Dispatch;
 
 using collectd::DispatchValuesRequest;
 using collectd::DispatchValuesResponse;
@@ -258,189 +258,119 @@ static grpc::Status unmarshal_value_list(const collectd::types::ValueList &msg,
 } /* unmarshal_value_list() */
 
 /*
- * request call-backs and call objects
+ * Collectd service
  */
-static grpc::Status DispatchValue(grpc::ServerContext *ctx, DispatchValuesRequest request, DispatchValuesResponse *reply)
-{
-       value_list_t vl = VALUE_LIST_INIT;
-       auto status = unmarshal_value_list(request.value_list(), &vl);
-       if (!status.ok())
-               return status;
-
-       if (plugin_dispatch_values(&vl))
-               status = grpc::Status(grpc::StatusCode::INTERNAL,
-                               grpc::string("failed to enqueue values for writing"));
+class CollectdImpl : public collectd::Collectd::Service {
+public:
+       grpc::Status QueryValues(grpc::ServerContext *ctx, QueryValuesRequest const *req, grpc::ServerWriter<QueryValuesResponse> *writer) override {
+               value_list_t match;
+               auto status = unmarshal_ident(req->identifier(), &match, false);
+               if (!status.ok()) {
+                       return status;
+               }
 
-       reply->Clear();
-       return status;
-} /* grpc::Status DispatchValue */
+               std::queue<value_list_t> value_lists;
+               status = this->queryValuesRead(&match, &value_lists);
+               if (status.ok()) {
+                       status = this->queryValuesWrite(ctx, writer, &value_lists);
+               }
 
-static grpc::Status QueryValues(grpc::ServerContext *ctx, QueryValuesRequest req, QueryValuesResponse *res)
-{
-       uc_iter_t *iter;
-       char *name = NULL;
+               while (!value_lists.empty()) {
+                       auto vl = value_lists.front();
+                       value_lists.pop();
+                       sfree(vl.values);
+               }
 
-       value_list_t matcher;
-       auto status = unmarshal_ident(req.identifier(), &matcher, false);
-       if (!status.ok())
                return status;
-
-       if ((iter = uc_get_iterator()) == NULL) {
-               return grpc::Status(grpc::StatusCode::INTERNAL,
-                               grpc::string("failed to query values: cannot create iterator"));
        }
 
-       status = grpc::Status::OK;
-       while (uc_iterator_next(iter, &name) == 0) {
-               value_list_t vl;
-               if (parse_identifier_vl(name, &vl) != 0) {
-                       status = grpc::Status(grpc::StatusCode::INTERNAL,
-                                       grpc::string("failed to parse identifier"));
-                       break;
-               }
+       grpc::Status DispatchValues(grpc::ServerContext *ctx,
+                                                               grpc::ServerReader<DispatchValuesRequest> *reader,
+                                                               DispatchValuesResponse *res) override {
+               DispatchValuesRequest req;
 
-               if (!ident_matches(&vl, &matcher))
-                       continue;
+               while (reader->Read(&req)) {
+                       value_list_t vl = VALUE_LIST_INIT;
+                       auto status = unmarshal_value_list(req.value_list(), &vl);
+                       if (!status.ok())
+                               return status;
 
-               if (uc_iterator_get_time(iter, &vl.time) < 0) {
-                       status = grpc::Status(grpc::StatusCode::INTERNAL,
-                                       grpc::string("failed to retrieve value timestamp"));
-                       break;
-               }
-               if (uc_iterator_get_interval(iter, &vl.interval) < 0) {
-                       status = grpc::Status(grpc::StatusCode::INTERNAL,
-                                       grpc::string("failed to retrieve value interval"));
-                       break;
-               }
-               if (uc_iterator_get_values(iter, &vl.values, &vl.values_len) < 0) {
-                       status = grpc::Status(grpc::StatusCode::INTERNAL,
-                                       grpc::string("failed to retrieve values"));
-                       break;
+                       if (plugin_dispatch_values(&vl))
+                               return grpc::Status(grpc::StatusCode::INTERNAL,
+                                                                       grpc::string("failed to enqueue values for writing"));
                }
 
-               auto pb_vl = res->add_value_lists();
-               status = marshal_value_list(&vl, pb_vl);
-               free(vl.values);
-               if (!status.ok())
-                       break;
+               res->Clear();
+               return grpc::Status::OK;
        }
 
-       uc_iterator_destroy(iter);
+private:
+       grpc::Status queryValuesRead(value_list_t const *match, std::queue<value_list_t> *value_lists) {
+               uc_iter_t *iter;
+               if ((iter = uc_get_iterator()) == NULL) {
+                       return grpc::Status(grpc::StatusCode::INTERNAL,
+                                                               grpc::string("failed to query values: cannot create iterator"));
+               }
 
-       return status;
-} /* grpc::Status QueryValues */
+               grpc::Status status = grpc::Status::OK;
+               char *name = NULL;
+               while (uc_iterator_next(iter, &name) == 0) {
+                       value_list_t vl;
+                       if (parse_identifier_vl(name, &vl) != 0) {
+                               status = grpc::Status(grpc::StatusCode::INTERNAL,
+                                                                         grpc::string("failed to parse identifier"));
+                               break;
+                       }
 
-// CallData is the abstract base class for asynchronous calls.
-class CallData {
-public:
-  virtual ~CallData() {}
-  virtual void process(bool ok) = 0;
+                       if (!ident_matches(&vl, match))
+                               continue;
 
-protected:
-  CallData() {}
+                       if (uc_iterator_get_time(iter, &vl.time) < 0) {
+                               status = grpc::Status(grpc::StatusCode::INTERNAL,
+                                                                         grpc::string("failed to retrieve value timestamp"));
+                               break;
+                       }
+                       if (uc_iterator_get_interval(iter, &vl.interval) < 0) {
+                               status = grpc::Status(grpc::StatusCode::INTERNAL,
+                                                                         grpc::string("failed to retrieve value interval"));
+                               break;
+                       }
+                       if (uc_iterator_get_values(iter, &vl.values, &vl.values_len) < 0) {
+                               status = grpc::Status(grpc::StatusCode::INTERNAL,
+                                                                         grpc::string("failed to retrieve values"));
+                               break;
+                       }
 
-private:
-  CallData(const CallData&) = delete;
-  CallData& operator=(const CallData&) = delete;
-};
+                       value_lists->push(vl);
+               } // while (uc_iterator_next(iter, &name) == 0)
 
-/*
- * Collectd service
- */
-// QueryValuesCallData holds the state and implements the logic for QueryValues calls.
-class QueryValuesCallData : public CallData {
-public:
-       QueryValuesCallData(Collectd::AsyncService* service, grpc::ServerCompletionQueue* cq)
-                       : cq_(cq), service_(service), writer_(&context_) {
-               // As part of the initialization, we *request* that the system start
-               // processing QueryValues requests. In this request, "this" acts as
-               // the tag uniquely identifying the request (so that different
-               // QueryValuesCallData instances can serve different requests
-               // concurrently), in this case the memory address of this
-               // QueryValuesCallData instance.
-               service->RequestQueryValues(&context_, &request_, &writer_, cq_, cq_, this);
+               uc_iterator_destroy(iter);
+               return status;
        }
 
-       void process(bool ok) final {
-               if (done_) {
-                       delete this;
-               } else {
-                       // Spawn a new QueryValuesCallData instance to serve new clients
-                       // while we process the one for this QueryValuesCallData. The
-                       // instance will deallocate itself as part of its FINISH state.
-                       new QueryValuesCallData(service_, cq_);
+       grpc::Status queryValuesWrite(grpc::ServerContext *ctx,
+                                          grpc::ServerWriter<QueryValuesResponse> *writer,
+                                          std::queue<value_list_t> *value_lists) {
+               while (!value_lists->empty()) {
+                       auto vl = value_lists->front();
+                       QueryValuesResponse res;
+                       res.Clear();
 
-                       auto status = QueryValues(&context_, request_, &response_);
+                       auto status = marshal_value_list(&vl, res.mutable_value_list());
                        if (!status.ok()) {
-                               writer_.FinishWithError(status, this);
-                       } else {
-                               writer_.Finish(response_, grpc::Status::OK, this);
+                               return status;
                        }
 
-                       done_ = true;
-               }
-       }
-
-private:
-       bool done_ = false;
-       grpc::ServerContext context_;
-       grpc::ServerCompletionQueue* cq_;
-       Collectd::AsyncService* service_;
-       QueryValuesRequest request_;
-       QueryValuesResponse response_;
-       grpc::ServerAsyncResponseWriter<QueryValuesResponse> writer_;
-};
-
-/*
- * Dispatch service
- */
-// DispatchValuesCallData holds the state and implements the logic for DispatchValues calls.
-class DispatchValuesCallData : public CallData {
-public:
-       DispatchValuesCallData(Dispatch::AsyncService* service, grpc::ServerCompletionQueue* cq)
-                       : cq_(cq), service_(service), reader_(&context_) {
-               process(true);
-       }
+                       if (!writer->Write(res)) {
+                               return grpc::Status::CANCELLED;
+                       }
 
-       void process(bool ok) final {
-               if (status == Status::INIT) {
-                       service_->RequestDispatchValues(&context_, &reader_, cq_, cq_, this);
-                       status = Status::CALL;
-               } else if (status == Status::CALL) {
-                       reader_.Read(&request_, this);
-                       status = Status::READ;
-               } else if (status == Status::READ && ok) {
-                       (void) DispatchValue(&context_, request_, &response_);
-
-                       reader_.Read(&request_, this);
-               } else if (status == Status::READ) {
-                       response_.Clear();
-
-                       status = Status::DONE;
-               } else if (status == Status::DONE) {
-                       new DispatchValuesCallData(service_, cq_);
-                       delete this;
-               } else {
-                       ERROR("grpc: DispatchValuesCallData: invalid state");
+                       value_lists->pop();
+                       sfree(vl.values);
                }
-       }
 
-private:
-       enum class Status {
-               INIT,
-               CALL,
-               READ,
-               DONE,
-       };
-       Status status = Status::INIT;
-
-       grpc::ServerContext          context_;
-       grpc::ServerCompletionQueue* cq_;
-       Dispatch::AsyncService*      service_;
-
-       DispatchValuesRequest request_;
-       DispatchValuesResponse response_;
-       grpc::ServerAsyncReader<DispatchValuesResponse, DispatchValuesRequest> reader_;
+               return grpc::Status::OK;
+       }
 };
 
 /*
@@ -475,44 +405,20 @@ public:
                        }
                }
 
-               cq_ = builder.AddCompletionQueue();
-
                builder.RegisterService(&collectd_service_);
-               builder.RegisterService(&dispatch_service_);
 
                server_ = builder.BuildAndStart();
-               new QueryValuesCallData(&collectd_service_, cq_.get());
-               new DispatchValuesCallData(&dispatch_service_, cq_.get());
        } /* Start() */
 
        void Shutdown()
        {
                server_->Shutdown();
-               cq_->Shutdown();
        } /* Shutdown() */
 
-       void Mainloop()
-       {
-               while (true) {
-                       void *tag = NULL;
-                       bool ok = false;
-
-                       // Block waiting to read the next event from the completion queue.
-                       // The event is uniquely identified by its tag, which in this case
-                       // is the memory address of a CallData instance.
-                       if (!cq_->Next(&tag, &ok))
-                               break; // Queue shut down.
-
-                       static_cast<CallData*>(tag)->process(ok);
-               }
-       } /* Mainloop() */
-
 private:
-       Collectd::AsyncService collectd_service_;
-       Dispatch::AsyncService dispatch_service_;
+       CollectdImpl collectd_service_;
 
        std::unique_ptr<grpc::Server> server_;
-       std::unique_ptr<grpc::ServerCompletionQueue> cq_;
 }; /* class CollectdServer */
 
 static CollectdServer *server = nullptr;
@@ -521,16 +427,6 @@ static CollectdServer *server = nullptr;
  * collectd plugin interface
  */
 extern "C" {
-       static pthread_t *workers;
-       static size_t workers_num = 5;
-
-       static void *worker_thread(void *arg)
-       {
-               CollectdServer *s = (CollectdServer *)arg;
-               s->Mainloop();
-               return NULL;
-       } /* worker_thread() */
-
        static int c_grpc_config_listen(oconfig_item_t *ci)
        {
                if ((ci->values_num != 2)
@@ -614,12 +510,6 @@ extern "C" {
                                if (c_grpc_config_listen(child))
                                        return -1;
                        }
-                       else if (!strcasecmp("WorkerThreads", child->key)) {
-                               int n;
-                               if (cf_util_get_int(child, &n))
-                                       return -1;
-                               workers_num = (size_t)n;
-                       }
                        else {
                                WARNING("grpc: Option `%s` not allowed here.", child->key);
                        }
@@ -631,47 +521,22 @@ extern "C" {
        static int c_grpc_init(void)
        {
                server = new CollectdServer();
-               size_t i;
-
-               if (! server) {
+               if (!server) {
                        ERROR("grpc: Failed to create server");
                        return -1;
                }
 
-               workers = (pthread_t *)calloc(workers_num, sizeof(*workers));
-               if (! workers) {
-                       delete server;
-                       server = nullptr;
-
-                       ERROR("grpc: Failed to allocate worker threads");
-                       return -1;
-               }
-
                server->Start();
-               for (i = 0; i < workers_num; i++) {
-                       plugin_thread_create(&workers[i], /* attr = */ NULL,
-                                       worker_thread, server);
-               }
-               INFO("grpc: Started %zu workers", workers_num);
                return 0;
        } /* c_grpc_init() */
 
        static int c_grpc_shutdown(void)
        {
-               size_t i;
-
                if (!server)
-                       return -1;
+                       return 0;
 
                server->Shutdown();
 
-               INFO("grpc: Waiting for %zu workers to terminate", workers_num);
-               for (i = 0; i < workers_num; i++)
-                       pthread_join(workers[i], NULL);
-               free(workers);
-               workers = NULL;
-               workers_num = 0;
-
                delete server;
                server = nullptr;