grpc plugin: Implement the ListValues() RPC.
authorSebastian Harl <sh@tokkee.org>
Thu, 29 Oct 2015 23:01:52 +0000 (00:01 +0100)
committerSebastian Harl <sh@tokkee.org>
Mon, 30 May 2016 21:44:19 +0000 (23:44 +0200)
proto/collectd.proto
src/collectd.conf.pod
src/grpc.cc

index 84db755..ba60793 100644 (file)
@@ -27,10 +27,14 @@ syntax = "proto3";
 package collectd;
 
 import "types.proto";
 package collectd;
 
 import "types.proto";
+import "google/protobuf/timestamp.proto";
 
 service Collectd {
        // Dispatch collected values to collectd.
        rpc DispatchValues(DispatchValuesRequest) returns (DispatchValuesReply);
 
 service Collectd {
        // Dispatch collected values to collectd.
        rpc DispatchValues(DispatchValuesRequest) returns (DispatchValuesReply);
+
+       // Retrieve a list of all values available in collectd's value cache.
+       rpc ListValues(ListValuesRequest) returns (ListValuesReply);
 }
 
 // The arguments to DispatchValues.
 }
 
 // The arguments to DispatchValues.
@@ -41,3 +45,17 @@ message DispatchValuesRequest {
 // The response from DispatchValues.
 message DispatchValuesReply {
 }
 // The response from DispatchValues.
 message DispatchValuesReply {
 }
+
+// The arguments to ListValues.
+message ListValuesRequest {
+}
+
+// The response from ListValues.
+message ListValuesReply {
+       message Value {
+               string name = 1;
+               google.protobuf.Timestamp time = 2;
+       }
+
+       repeated Value value = 1;
+}
index 82562ed..083aae8 100644 (file)
@@ -2499,9 +2499,9 @@ source, this is optional. Otherwise the option is required.
 
 =head2 Plugin C<grpc>
 
 
 =head2 Plugin C<grpc>
 
-The I<grpc> plugin provides an RPC interface to submit values to collectd
-based on the open source gRPC framework. It exposes and end-point for
-dispatching values to the daemon.
+The I<grpc> plugin provides an RPC interface to submit values to or query
+values from collectd based on the open source gRPC framework. It exposes and
+end-point for dispatching values to the daemon.
 
 The B<gRPC> homepage can be found at L<https://grpc.io/>.
 
 
 The B<gRPC> homepage can be found at L<https://grpc.io/>.
 
index 9523fc2..abdd6b0 100644 (file)
@@ -38,6 +38,8 @@ extern "C" {
 #include "configfile.h"
 #include "plugin.h"
 
 #include "configfile.h"
 #include "plugin.h"
 
+#include "daemon/utils_cache.h"
+
        typedef struct {
                char *addr;
                char *port;
        typedef struct {
                char *addr;
                char *port;
@@ -225,7 +227,63 @@ private:
        collectd::DispatchValuesReply reply_;
 
        grpc::ServerAsyncResponseWriter<collectd::DispatchValuesReply> responder_;
        collectd::DispatchValuesReply reply_;
 
        grpc::ServerAsyncResponseWriter<collectd::DispatchValuesReply> responder_;
-};
+}; /* class DispatchValuesCall */
+
+class ListValuesCall : public Call
+{
+public:
+       ListValuesCall(collectd::Collectd::AsyncService *service, grpc::ServerCompletionQueue *cq)
+               : Call(service, cq), responder_(&ctx_)
+       {
+               Handle();
+       } /* ListValuesCall() */
+
+       virtual ~ListValuesCall()
+       { }
+
+private:
+       void Create()
+       {
+               service_->RequestListValues(&ctx_, &request_, &responder_, cq_, cq_, this);
+       } /* Create() */
+
+       void Process()
+       {
+               new ListValuesCall(service_, cq_);
+
+               char **names = NULL;
+               cdtime_t *times = NULL;
+               size_t i, n = 0;
+
+               auto status = grpc::Status::OK;
+               if (uc_get_names(&names, &times, &n)) {
+                       status = grpc::Status(grpc::StatusCode::INTERNAL,
+                                       grpc::string("failed to retrieve values"));
+               }
+
+               for (i = 0; i < n; i++) {
+                       auto v = reply_.add_value();
+                       auto t = TimeUtil::NanosecondsToTimestamp(CDTIME_T_TO_NS(times[i]));
+                       v->set_name(names[i]);
+                       v->set_allocated_time(new google::protobuf::Timestamp(t));
+                       sfree(names[i]);
+               }
+               sfree(names);
+               sfree(times);
+
+               responder_.Finish(reply_, status, this);
+       } /* Process() */
+
+       void Finish()
+       {
+               delete this;
+       } /* Finish() */
+
+       collectd::ListValuesRequest request_;
+       collectd::ListValuesReply reply_;
+
+       grpc::ServerAsyncResponseWriter<collectd::ListValuesReply> responder_;
+}; /* class ListValuesCall */
 
 /*
  * gRPC server implementation
 
 /*
  * gRPC server implementation
@@ -272,6 +330,7 @@ public:
        {
                // Register request types.
                new DispatchValuesCall(&service_, cq_.get());
        {
                // Register request types.
                new DispatchValuesCall(&service_, cq_.get());
+               new ListValuesCall(&service_, cq_.get());
 
                while (true) {
                        void *req = NULL;
 
                while (true) {
                        void *req = NULL;