grpc plugin: Replace the ListValues RPC with QueryValues.
authorSebastian Harl <sh@tokkee.org>
Fri, 6 May 2016 22:15:49 +0000 (00:15 +0200)
committerSebastian Harl <sh@tokkee.org>
Mon, 30 May 2016 21:44:19 +0000 (23:44 +0200)
This is a more powerful function to query values from collectd's cache.

proto/collectd.proto
proto/types.proto
src/grpc.cc

index ba60793..608fcbb 100644 (file)
@@ -1,5 +1,5 @@
 // collectd - proto/collectd.proto
-// Copyright (C) 2015 Sebastian Harl
+// Copyright (C) 2015-2016 Sebastian Harl
 //
 // Permission is hereby granted, free of charge, to any person obtaining a
 // copy of this software and associated documentation files (the "Software"),
@@ -33,8 +33,8 @@ service Collectd {
        // Dispatch collected values to collectd.
        rpc DispatchValues(DispatchValuesRequest) returns (DispatchValuesReply);
 
-       // Retrieve a list of all values available in collectd's value cache.
-       rpc ListValues(ListValuesRequest) returns (ListValuesReply);
+       // Query a list of values available from collectd's value cache.
+       rpc QueryValues(QueryValuesRequest) returns (QueryValuesReply);
 }
 
 // The arguments to DispatchValues.
@@ -46,16 +46,15 @@ message DispatchValuesRequest {
 message DispatchValuesReply {
 }
 
-// The arguments to ListValues.
-message ListValuesRequest {
+// The arguments to QueryValues.
+message QueryValuesRequest {
+       // Query by the fields of the identifier. Only return values matching the
+       // specified shell wildcard patterns (see fnmatch(3)). Use '*' to match
+       // any value.
+       collectd.types.Identifier identifier = 1;
 }
 
-// The response from ListValues.
-message ListValuesReply {
-       message Value {
-               string name = 1;
-               google.protobuf.Timestamp time = 2;
-       }
-
-       repeated Value value = 1;
+// The response from QueryValues.
+message QueryValuesReply {
+       repeated collectd.types.ValueList values = 1;
 }
index 6e6714b..4a852e4 100644 (file)
@@ -1,5 +1,5 @@
 // collectd - proto/types.proto
-// Copyright (C) 2015 Sebastian Harl
+// Copyright (C) 2015-2016 Sebastian Harl
 //
 // Permission is hereby granted, free of charge, to any person obtaining a
 // copy of this software and associated documentation files (the "Software"),
index 45fb029..4e47098 100644 (file)
@@ -1,6 +1,6 @@
 /**
  * collectd - src/grpc.cc
- * Copyright (C) 2015 Sebastian Harl
+ * Copyright (C) 2015-2016 Sebastian Harl
  *
  * Permission is hereby granted, free of charge, to any person obtaining a
  * copy of this software and associated documentation files (the "Software"),
@@ -30,6 +30,7 @@
 #include "collectd.grpc.pb.h"
 
 extern "C" {
+#include <fnmatch.h>
 #include <stdbool.h>
 #include <pthread.h>
 
@@ -53,33 +54,67 @@ using collectd::Collectd;
 
 using collectd::DispatchValuesRequest;
 using collectd::DispatchValuesReply;
-using collectd::ListValuesRequest;
-using collectd::ListValuesReply;
+using collectd::QueryValuesRequest;
+using collectd::QueryValuesReply;
 
 using google::protobuf::util::TimeUtil;
 
 /*
+ * helper functions
+ */
+
+static bool ident_matches(const value_list_t *vl, const value_list_t *matcher)
+{
+       if (fnmatch(matcher->host, vl->host, 0))
+               return false;
+
+       if (fnmatch(matcher->plugin, vl->plugin, 0))
+               return false;
+       if (fnmatch(matcher->plugin_instance, vl->plugin_instance, 0))
+               return false;
+
+       if (fnmatch(matcher->type, vl->type, 0))
+               return false;
+       if (fnmatch(matcher->type_instance, vl->type_instance, 0))
+               return false;
+
+       return true;
+} /* ident_matches */
+
+/*
  * proto conversion
  */
 
-static grpc::Status unmarshal_ident(const collectd::types::Identifier &msg, value_list_t *vl)
+static void marshal_ident(const value_list_t *vl, collectd::types::Identifier *msg)
+{
+       msg->set_host(vl->host);
+       msg->set_plugin(vl->plugin);
+       if (vl->plugin_instance[0] != '\0')
+               msg->set_plugin_instance(vl->plugin_instance);
+       msg->set_type(vl->type);
+       if (vl->type_instance[0] != '\0')
+               msg->set_type_instance(vl->type_instance);
+} /* marshal_ident */
+
+static grpc::Status unmarshal_ident(const collectd::types::Identifier &msg, value_list_t *vl,
+               bool require_fields)
 {
        std::string s;
 
        s = msg.host();
-       if (!s.length())
+       if (!s.length() && require_fields)
                return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
                                grpc::string("missing host name"));
        sstrncpy(vl->host, s.c_str(), sizeof(vl->host));
 
        s = msg.plugin();
-       if (!s.length())
+       if (!s.length() && require_fields)
                return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
                                grpc::string("missing plugin name"));
        sstrncpy(vl->plugin, s.c_str(), sizeof(vl->plugin));
 
        s = msg.type();
-       if (!s.length())
+       if (!s.length() && require_fields)
                return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
                                grpc::string("missing type name"));
        sstrncpy(vl->type, s.c_str(), sizeof(vl->type));
@@ -93,19 +128,59 @@ static grpc::Status unmarshal_ident(const collectd::types::Identifier &msg, valu
        return grpc::Status::OK;
 } /* unmarshal_ident() */
 
+static grpc::Status marshal_value_list(const value_list_t *vl, collectd::types::ValueList *msg)
+{
+       auto id = msg->mutable_identifier();
+       marshal_ident(vl, id);
+
+       auto ds = plugin_get_ds(vl->type);
+       if ((ds == NULL) || (ds->ds_num != vl->values_len)) {
+               return grpc::Status(grpc::StatusCode::INTERNAL,
+                               grpc::string("failed to retrieve data-set for values"));
+       }
+
+       auto t = TimeUtil::NanosecondsToTimestamp(CDTIME_T_TO_NS(vl->time));
+       auto d = TimeUtil::NanosecondsToDuration(CDTIME_T_TO_NS(vl->interval));
+       msg->set_allocated_time(new google::protobuf::Timestamp(t));
+       msg->set_allocated_interval(new google::protobuf::Duration(d));
+
+       for (size_t i = 0; i < vl->values_len; ++i) {
+               auto v = msg->add_value();
+               switch (ds->ds[i].type) {
+                       case DS_TYPE_COUNTER:
+                               v->set_counter(vl->values[i].counter);
+                               break;
+                       case DS_TYPE_GAUGE:
+                               v->set_gauge(vl->values[i].gauge);
+                               break;
+                       case DS_TYPE_DERIVE:
+                               v->set_derive(vl->values[i].derive);
+                               break;
+                       case DS_TYPE_ABSOLUTE:
+                               v->set_absolute(vl->values[i].absolute);
+                               break;
+                       default:
+                               return grpc::Status(grpc::StatusCode::INTERNAL,
+                                               grpc::string("unknown value type"));
+               }
+       }
+
+       return grpc::Status::OK;
+} /* marshal_value_list */
+
 static grpc::Status unmarshal_value_list(const collectd::types::ValueList &msg, value_list_t *vl)
 {
        vl->time = NS_TO_CDTIME_T(TimeUtil::TimestampToNanoseconds(msg.time()));
        vl->interval = NS_TO_CDTIME_T(TimeUtil::DurationToNanoseconds(msg.interval()));
 
-       auto status = unmarshal_ident(msg.identifier(), vl);
+       auto status = unmarshal_ident(msg.identifier(), vl, true);
        if (!status.ok())
                return status;
 
        value_t *values = NULL;
        size_t values_len = 0;
-       status = grpc::Status::OK;
 
+       status = grpc::Status::OK;
        for (auto v : msg.value()) {
                value_t *val = (value_t *)realloc(values, (values_len + 1) * sizeof(*values));
                if (!val) {
@@ -170,28 +245,51 @@ static grpc::Status Process(grpc::ServerContext *ctx,
 } /* Process(): DispatchValues */
 
 static grpc::Status Process(grpc::ServerContext *ctx,
-               ListValuesRequest request, ListValuesReply *reply)
+               QueryValuesRequest request, QueryValuesReply *reply)
 {
-       char **names = NULL;
-       cdtime_t *times = NULL;
-       size_t i, n = 0;
+       uc_iter_t *iter;
+       char *name = NULL;
+
+       value_list_t matcher;
+       auto status = unmarshal_ident(request.identifier(), &matcher, false);
+       if (!status.ok())
+               return status;
 
-       if (uc_get_names(&names, &times, &n))
+       if ((iter = uc_get_iterator()) == NULL) {
                return grpc::Status(grpc::StatusCode::INTERNAL,
-                               grpc::string("failed to retrieve values"));
-
-       for (i = 0; i < n; i++) {
-               auto v = reply->add_value();
-               auto t = TimeUtil::NanosecondsToTimestamp(CDTIME_T_TO_NS(times[i]));
-               v->set_name(names[i]);
-               v->set_allocated_time(new google::protobuf::Timestamp(t));
-               sfree(names[i]);
+                               grpc::string("failed to query values: cannot create iterator"));
        }
-       sfree(names);
-       sfree(times);
+
+       while (uc_iterator_next(iter, &name) == 0) {
+               value_list_t res;
+               if (parse_identifier_vl(name, &res) != 0)
+                       return grpc::Status(grpc::StatusCode::INTERNAL,
+                                       grpc::string("failed to parse identifier"));
+
+               if (!ident_matches(&res, &matcher))
+                       continue;
+
+               if (uc_iterator_get_time(iter, &res.time) < 0)
+                       return grpc::Status(grpc::StatusCode::INTERNAL,
+                                       grpc::string("failed to retrieve value timestamp"));
+               if (uc_iterator_get_interval(iter, &res.interval) < 0)
+                       return grpc::Status(grpc::StatusCode::INTERNAL,
+                                       grpc::string("failed to retrieve value interval"));
+               if (uc_iterator_get_values(iter, &res.values, &res.values_len) < 0)
+                       return grpc::Status(grpc::StatusCode::INTERNAL,
+                                       grpc::string("failed to retrieve values"));
+
+               auto vl = reply->add_values();
+               status = marshal_value_list(&res, vl);
+               free(res.values);
+               if (!status.ok())
+                       return status;
+       }
+
+       uc_iterator_destroy(iter);
 
        return grpc::Status::OK;
-} /* Process(): ListValues */
+} /* Process(): QueryValues */
 
 class Call
 {
@@ -324,8 +422,8 @@ public:
                // Register request types.
                new RpcCall<DispatchValuesRequest, DispatchValuesReply>(&service_,
                                &Collectd::AsyncService::RequestDispatchValues, cq_.get());
-               new RpcCall<ListValuesRequest, ListValuesReply>(&service_,
-                               &Collectd::AsyncService::RequestListValues, cq_.get());
+               new RpcCall<QueryValuesRequest, QueryValuesReply>(&service_,
+                               &Collectd::AsyncService::RequestQueryValues, cq_.get());
 
                while (true) {
                        void *req = NULL;