Merge pull request #1876 from octo/issue/1819
authorSebastian Harl <sh@tokkee.org>
Thu, 11 Aug 2016 20:51:50 +0000 (22:51 +0200)
committerGitHub <noreply@github.com>
Thu, 11 Aug 2016 20:51:50 +0000 (22:51 +0200)
grpc plugin: Refactor plugin.

proto/collectd.proto
proto/types.proto
src/collectd.conf.in
src/collectd.conf.pod
src/grpc.cc

index 5134dbf..917c5de 100644 (file)
 syntax = "proto3";
 
 package collectd;
+option go_package = "collectd.org/rpc/proto";
 
 import "types.proto";
 
 service Collectd {
-       // Dispatch collected values to collectd.
-       rpc DispatchValues(DispatchValuesRequest) returns (DispatchValuesReply);
-
-       // Query a list of values available from collectd's value cache.
-       rpc QueryValues(QueryValuesRequest) returns (QueryValuesReply);
+  // DispatchValues reads the value lists from the DispatchValuesRequest stream.
+  // The gRPC server embedded into collectd will inject them into the system
+  // just like the network plugin.
+  rpc DispatchValues(stream DispatchValuesRequest)
+      returns (DispatchValuesResponse);
+
+  // QueryValues returns a stream of matching value lists from collectd's
+  // internal cache.
+  rpc QueryValues(QueryValuesRequest) returns (stream QueryValuesResponse);
 }
 
 // The arguments to DispatchValues.
 message DispatchValuesRequest {
-       collectd.types.ValueList values = 1;
+  // value_list is the metric to be sent to the server.
+  collectd.types.ValueList value_list = 1;
 }
 
 // The response from DispatchValues.
-message DispatchValuesReply {
-}
+message DispatchValuesResponse {}
 
 // The arguments to QueryValues.
 message QueryValuesRequest {
-       // Query by the fields of the identifier. Only return values matching the
-       // specified shell wildcard patterns (see fnmatch(3)). Use '*' to match
-       // any value.
-       collectd.types.Identifier identifier = 1;
+  // Query by the fields of the identifier. Only return values matching the
+  // specified shell wildcard patterns (see fnmatch(3)). Use '*' to match
+  // any value.
+  collectd.types.Identifier identifier = 1;
 }
 
 // The response from QueryValues.
-message QueryValuesReply {
-       repeated collectd.types.ValueList values = 1;
-}
+message QueryValuesResponse { collectd.types.ValueList value_list = 1; }
index 4a852e4..952c541 100644 (file)
 syntax = "proto3";
 
 package collectd.types;
+option go_package = "collectd.org/rpc/proto/types";
 
 import "google/protobuf/duration.proto";
 import "google/protobuf/timestamp.proto";
 
 message Identifier {
-       string host = 1;
-       string plugin = 2;
-       string plugin_instance = 3;
-       string type = 4;
-       string type_instance = 5;
+  string host = 1;
+  string plugin = 2;
+  string plugin_instance = 3;
+  string type = 4;
+  string type_instance = 5;
 }
 
 message Value {
-       oneof value {
-               uint64 counter = 1;
-               double gauge = 2;
-               int64 derive = 3;
-               uint64 absolute = 4;
-       };
+  oneof value {
+    uint64 counter = 1;
+    double gauge = 2;
+    int64 derive = 3;
+    uint64 absolute = 4;
+  };
 }
 
 message ValueList {
-       repeated Value value = 1;
+  repeated Value values = 1;
 
-       google.protobuf.Timestamp time = 2;
-       google.protobuf.Duration interval = 3;
+  google.protobuf.Timestamp time = 2;
+  google.protobuf.Duration interval = 3;
 
-       Identifier identifier = 4;
+  Identifier identifier = 4;
+
+  repeated string ds_names = 5;
 }
index 8eb08a6..e06465b 100644 (file)
 #</Plugin>
 
 #<Plugin grpc>
-#      WorkerThreads 5
 #      <Listen "0.0.0.0" "50051">
 #              EnableSSL true
 #              SSLRootCerts "/path/to/root.pem"
index f3ff4bb..9ae7310 100644 (file)
@@ -2751,11 +2751,6 @@ connections.
 
 =back
 
-=item B<WorkerThreads> I<Num>
-
-Number of threads to start for handling incoming connections. The default
-value is B<5>.
-
 =back
 
 =head2 Plugin C<hddtemp>
index ae3dab2..42c6b32 100644 (file)
@@ -29,6 +29,7 @@
 
 #include <fstream>
 #include <iostream>
+#include <queue>
 #include <vector>
 
 #include "collectd.grpc.pb.h"
@@ -48,9 +49,9 @@ extern "C" {
 using collectd::Collectd;
 
 using collectd::DispatchValuesRequest;
-using collectd::DispatchValuesReply;
+using collectd::DispatchValuesResponse;
 using collectd::QueryValuesRequest;
-using collectd::QueryValuesReply;
+using collectd::QueryValuesResponse;
 
 using google::protobuf::util::TimeUtil;
 
@@ -172,7 +173,7 @@ static grpc::Status marshal_value_list(const value_list_t *vl, collectd::types::
        msg->set_allocated_interval(new google::protobuf::Duration(d));
 
        for (size_t i = 0; i < vl->values_len; ++i) {
-               auto v = msg->add_value();
+               auto v = msg->add_values();
                switch (ds->ds[i].type) {
                        case DS_TYPE_COUNTER:
                                v->set_counter(vl->values[i].counter);
@@ -190,6 +191,9 @@ static grpc::Status marshal_value_list(const value_list_t *vl, collectd::types::
                                return grpc::Status(grpc::StatusCode::INTERNAL,
                                                grpc::string("unknown value type"));
                }
+
+               auto name = msg->add_ds_names();
+               name->assign(ds->ds[i].name);
        }
 
        return grpc::Status::OK;
@@ -208,7 +212,7 @@ static grpc::Status unmarshal_value_list(const collectd::types::ValueList &msg,
        size_t values_len = 0;
 
        status = grpc::Status::OK;
-       for (auto v : msg.value()) {
+       for (auto v : msg.values()) {
                value_t *val = (value_t *)realloc(values, (values_len + 1) * sizeof(*values));
                if (!val) {
                        status = grpc::Status(grpc::StatusCode::RESOURCE_EXHAUSTED,
@@ -254,168 +258,124 @@ static grpc::Status unmarshal_value_list(const collectd::types::ValueList &msg,
 } /* unmarshal_value_list() */
 
 /*
- * request call-backs and call objects
+ * Collectd service
  */
-
-static grpc::Status Process(grpc::ServerContext *ctx,
-               DispatchValuesRequest request, DispatchValuesReply *reply)
-{
-       value_list_t vl = VALUE_LIST_INIT;
-       auto status = unmarshal_value_list(request.values(), &vl);
-       if (!status.ok())
-               return status;
-
-       if (plugin_dispatch_values(&vl))
-               status = grpc::Status(grpc::StatusCode::INTERNAL,
-                               grpc::string("failed to enqueue values for writing"));
-       return status;
-} /* Process(): DispatchValues */
-
-static grpc::Status Process(grpc::ServerContext *ctx,
-               QueryValuesRequest request, QueryValuesReply *reply)
-{
-       uc_iter_t *iter;
-       char *name = NULL;
-
-       value_list_t matcher;
-       auto status = unmarshal_ident(request.identifier(), &matcher, false);
-       if (!status.ok())
-               return status;
-
-       if ((iter = uc_get_iterator()) == NULL) {
-               return grpc::Status(grpc::StatusCode::INTERNAL,
-                               grpc::string("failed to query values: cannot create iterator"));
-       }
-
-       status = grpc::Status::OK;
-       while (uc_iterator_next(iter, &name) == 0) {
-               value_list_t res;
-               if (parse_identifier_vl(name, &res) != 0) {
-                       status = grpc::Status(grpc::StatusCode::INTERNAL,
-                                       grpc::string("failed to parse identifier"));
-                       break;
+class CollectdImpl : public collectd::Collectd::Service {
+public:
+       grpc::Status QueryValues(grpc::ServerContext *ctx, QueryValuesRequest const *req, grpc::ServerWriter<QueryValuesResponse> *writer) override {
+               value_list_t match;
+               auto status = unmarshal_ident(req->identifier(), &match, false);
+               if (!status.ok()) {
+                       return status;
                }
 
-               if (!ident_matches(&res, &matcher))
-                       continue;
-
-               if (uc_iterator_get_time(iter, &res.time) < 0) {
-                       status = grpc::Status(grpc::StatusCode::INTERNAL,
-                                       grpc::string("failed to retrieve value timestamp"));
-                       break;
+               std::queue<value_list_t> value_lists;
+               status = this->queryValuesRead(&match, &value_lists);
+               if (status.ok()) {
+                       status = this->queryValuesWrite(ctx, writer, &value_lists);
                }
-               if (uc_iterator_get_interval(iter, &res.interval) < 0) {
-                       status = grpc::Status(grpc::StatusCode::INTERNAL,
-                                       grpc::string("failed to retrieve value interval"));
-                       break;
-               }
-               if (uc_iterator_get_values(iter, &res.values, &res.values_len) < 0) {
-                       status = grpc::Status(grpc::StatusCode::INTERNAL,
-                                       grpc::string("failed to retrieve values"));
-                       break;
+
+               while (!value_lists.empty()) {
+                       auto vl = value_lists.front();
+                       value_lists.pop();
+                       sfree(vl.values);
                }
 
-               auto vl = reply->add_values();
-               status = marshal_value_list(&res, vl);
-               free(res.values);
-               if (!status.ok())
-                       break;
+               return status;
        }
 
-       uc_iterator_destroy(iter);
-
-       return status;
-} /* Process(): QueryValues */
-
-class Call
-{
-public:
-       Call(Collectd::AsyncService *service, grpc::ServerCompletionQueue *cq)
-               : service_(service), cq_(cq), status_(CREATE)
-       { }
+       grpc::Status DispatchValues(grpc::ServerContext *ctx,
+                                                               grpc::ServerReader<DispatchValuesRequest> *reader,
+                                                               DispatchValuesResponse *res) override {
+               DispatchValuesRequest req;
 
-       virtual ~Call()
-       { }
+               while (reader->Read(&req)) {
+                       value_list_t vl = VALUE_LIST_INIT;
+                       auto status = unmarshal_value_list(req.value_list(), &vl);
+                       if (!status.ok())
+                               return status;
 
-       void Handle()
-       {
-               if (status_ == CREATE) {
-                       Create();
-                       status_ = PROCESS;
-               }
-               else if (status_ == PROCESS) {
-                       Process();
-                       status_ = FINISH;
-               }
-               else {
-                       GPR_ASSERT(status_ == FINISH);
-                       Finish();
+                       if (plugin_dispatch_values(&vl))
+                               return grpc::Status(grpc::StatusCode::INTERNAL,
+                                                                       grpc::string("failed to enqueue values for writing"));
                }
-       } /* Handle() */
 
-protected:
-       virtual void Create() = 0;
-       virtual void Process() = 0;
-       virtual void Finish() = 0;
-
-       Collectd::AsyncService *service_;
-       grpc::ServerCompletionQueue *cq_;
-       grpc::ServerContext ctx_;
+               res->Clear();
+               return grpc::Status::OK;
+       }
 
 private:
-       enum CallStatus { CREATE, PROCESS, FINISH };
-       CallStatus status_;
-}; /* class Call */
+       grpc::Status queryValuesRead(value_list_t const *match, std::queue<value_list_t> *value_lists) {
+               uc_iter_t *iter;
+               if ((iter = uc_get_iterator()) == NULL) {
+                       return grpc::Status(grpc::StatusCode::INTERNAL,
+                                                               grpc::string("failed to query values: cannot create iterator"));
+               }
 
-template<typename RequestT, typename ReplyT>
-class RpcCall final : public Call
-{
-       typedef void (Collectd::AsyncService::*CreatorT)(grpc::ServerContext *,
-                       RequestT *, grpc::ServerAsyncResponseWriter<ReplyT> *,
-                       grpc::CompletionQueue *, grpc::ServerCompletionQueue *, void *);
+               grpc::Status status = grpc::Status::OK;
+               char *name = NULL;
+               while (uc_iterator_next(iter, &name) == 0) {
+                       value_list_t vl;
+                       if (parse_identifier_vl(name, &vl) != 0) {
+                               status = grpc::Status(grpc::StatusCode::INTERNAL,
+                                                                         grpc::string("failed to parse identifier"));
+                               break;
+                       }
 
-public:
-       RpcCall(Collectd::AsyncService *service,
-                       CreatorT creator, grpc::ServerCompletionQueue *cq)
-               : Call(service, cq), creator_(creator), responder_(&ctx_)
-       {
-               Handle();
-       } /* RpcCall() */
+                       if (!ident_matches(&vl, match))
+                               continue;
 
-       virtual ~RpcCall()
-       { }
+                       if (uc_iterator_get_time(iter, &vl.time) < 0) {
+                               status = grpc::Status(grpc::StatusCode::INTERNAL,
+                                                                         grpc::string("failed to retrieve value timestamp"));
+                               break;
+                       }
+                       if (uc_iterator_get_interval(iter, &vl.interval) < 0) {
+                               status = grpc::Status(grpc::StatusCode::INTERNAL,
+                                                                         grpc::string("failed to retrieve value interval"));
+                               break;
+                       }
+                       if (uc_iterator_get_values(iter, &vl.values, &vl.values_len) < 0) {
+                               status = grpc::Status(grpc::StatusCode::INTERNAL,
+                                                                         grpc::string("failed to retrieve values"));
+                               break;
+                       }
 
-private:
-       void Create()
-       {
-               (service_->*creator_)(&ctx_, &request_, &responder_, cq_, cq_, this);
-       } /* Create() */
+                       value_lists->push(vl);
+               } // while (uc_iterator_next(iter, &name) == 0)
 
-       void Process()
-       {
-               // Add a new request object to the queue.
-               new RpcCall<RequestT, ReplyT>(service_, creator_, cq_);
-               grpc::Status status = ::Process(&ctx_, request_, &reply_);
-               responder_.Finish(reply_, status, this);
-       } /* Process() */
+               uc_iterator_destroy(iter);
+               return status;
+       }
 
-       void Finish()
-       {
-               delete this;
-       } /* Finish() */
+       grpc::Status queryValuesWrite(grpc::ServerContext *ctx,
+                                          grpc::ServerWriter<QueryValuesResponse> *writer,
+                                          std::queue<value_list_t> *value_lists) {
+               while (!value_lists->empty()) {
+                       auto vl = value_lists->front();
+                       QueryValuesResponse res;
+                       res.Clear();
+
+                       auto status = marshal_value_list(&vl, res.mutable_value_list());
+                       if (!status.ok()) {
+                               return status;
+                       }
 
-       CreatorT creator_;
+                       if (!writer->Write(res)) {
+                               return grpc::Status::CANCELLED;
+                       }
 
-       RequestT request_;
-       ReplyT reply_;
+                       value_lists->pop();
+                       sfree(vl.values);
+               }
 
-       grpc::ServerAsyncResponseWriter<ReplyT> responder_;
-}; /* class RpcCall */
+               return grpc::Status::OK;
+       }
+};
 
 /*
  * gRPC server implementation
  */
-
 class CollectdServer final
 {
 public:
@@ -445,45 +405,20 @@ public:
                        }
                }
 
-               builder.RegisterService(&service_);
-               cq_ = builder.AddCompletionQueue();
+               builder.RegisterService(&collectd_service_);
+
                server_ = builder.BuildAndStart();
        } /* Start() */
 
        void Shutdown()
        {
                server_->Shutdown();
-               cq_->Shutdown();
        } /* Shutdown() */
 
-       void Mainloop()
-       {
-               // Register request types.
-               new RpcCall<DispatchValuesRequest, DispatchValuesReply>(&service_,
-                               &Collectd::AsyncService::RequestDispatchValues, cq_.get());
-               new RpcCall<QueryValuesRequest, QueryValuesReply>(&service_,
-                               &Collectd::AsyncService::RequestQueryValues, cq_.get());
-
-               while (true) {
-                       void *req = NULL;
-                       bool ok = false;
-
-                       if (!cq_->Next(&req, &ok))
-                               break; // Queue shut down.
-                       if (!ok) {
-                               ERROR("grpc: Failed to read from queue");
-                               break;
-                       }
-
-                       static_cast<Call *>(req)->Handle();
-               }
-       } /* Mainloop() */
-
 private:
-       Collectd::AsyncService service_;
+       CollectdImpl collectd_service_;
 
        std::unique_ptr<grpc::Server> server_;
-       std::unique_ptr<grpc::ServerCompletionQueue> cq_;
 }; /* class CollectdServer */
 
 static CollectdServer *server = nullptr;
@@ -491,18 +426,7 @@ static CollectdServer *server = nullptr;
 /*
  * collectd plugin interface
  */
-
 extern "C" {
-       static pthread_t *workers;
-       static size_t workers_num = 5;
-
-       static void *worker_thread(void *arg)
-       {
-               CollectdServer *s = (CollectdServer *)arg;
-               s->Mainloop();
-               return NULL;
-       } /* worker_thread() */
-
        static int c_grpc_config_listen(oconfig_item_t *ci)
        {
                if ((ci->values_num != 2)
@@ -586,12 +510,6 @@ extern "C" {
                                if (c_grpc_config_listen(child))
                                        return -1;
                        }
-                       else if (!strcasecmp("WorkerThreads", child->key)) {
-                               int n;
-                               if (cf_util_get_int(child, &n))
-                                       return -1;
-                               workers_num = (size_t)n;
-                       }
                        else {
                                WARNING("grpc: Option `%s` not allowed here.", child->key);
                        }
@@ -603,47 +521,22 @@ extern "C" {
        static int c_grpc_init(void)
        {
                server = new CollectdServer();
-               size_t i;
-
-               if (! server) {
+               if (!server) {
                        ERROR("grpc: Failed to create server");
                        return -1;
                }
 
-               workers = (pthread_t *)calloc(workers_num, sizeof(*workers));
-               if (! workers) {
-                       delete server;
-                       server = nullptr;
-
-                       ERROR("grpc: Failed to allocate worker threads");
-                       return -1;
-               }
-
                server->Start();
-               for (i = 0; i < workers_num; i++) {
-                       plugin_thread_create(&workers[i], /* attr = */ NULL,
-                                       worker_thread, server);
-               }
-               INFO("grpc: Started %zu workers", workers_num);
                return 0;
        } /* c_grpc_init() */
 
        static int c_grpc_shutdown(void)
        {
-               size_t i;
-
                if (!server)
-                       return -1;
+                       return 0;
 
                server->Shutdown();
 
-               INFO("grpc: Waiting for %zu workers to terminate", workers_num);
-               for (i = 0; i < workers_num; i++)
-                       pthread_join(workers[i], NULL);
-               free(workers);
-               workers = NULL;
-               workers_num = 0;
-
                delete server;
                server = nullptr;