From 5aabca01574c7ab85d4dd85aa35e41f4297007d4 Mon Sep 17 00:00:00 2001 From: Sebastian Harl Date: Sat, 7 May 2016 00:15:49 +0200 Subject: [PATCH] grpc plugin: Replace the ListValues RPC with QueryValues. This is a more powerful function to query values from collectd's cache. --- proto/collectd.proto | 25 ++++----- proto/types.proto | 2 +- src/grpc.cc | 152 ++++++++++++++++++++++++++++++++++++++++++--------- 3 files changed, 138 insertions(+), 41 deletions(-) diff --git a/proto/collectd.proto b/proto/collectd.proto index ba60793b..608fcbb1 100644 --- a/proto/collectd.proto +++ b/proto/collectd.proto @@ -1,5 +1,5 @@ // collectd - proto/collectd.proto -// Copyright (C) 2015 Sebastian Harl +// Copyright (C) 2015-2016 Sebastian Harl // // Permission is hereby granted, free of charge, to any person obtaining a // copy of this software and associated documentation files (the "Software"), @@ -33,8 +33,8 @@ service Collectd { // Dispatch collected values to collectd. rpc DispatchValues(DispatchValuesRequest) returns (DispatchValuesReply); - // Retrieve a list of all values available in collectd's value cache. - rpc ListValues(ListValuesRequest) returns (ListValuesReply); + // Query a list of values available from collectd's value cache. + rpc QueryValues(QueryValuesRequest) returns (QueryValuesReply); } // The arguments to DispatchValues. @@ -46,16 +46,15 @@ message DispatchValuesRequest { message DispatchValuesReply { } -// The arguments to ListValues. -message ListValuesRequest { +// The arguments to QueryValues. +message QueryValuesRequest { + // Query by the fields of the identifier. Only return values matching the + // specified shell wildcard patterns (see fnmatch(3)). Use '*' to match + // any value. + collectd.types.Identifier identifier = 1; } -// The response from ListValues. -message ListValuesReply { - message Value { - string name = 1; - google.protobuf.Timestamp time = 2; - } - - repeated Value value = 1; +// The response from QueryValues. +message QueryValuesReply { + repeated collectd.types.ValueList values = 1; } diff --git a/proto/types.proto b/proto/types.proto index 6e6714b6..4a852e42 100644 --- a/proto/types.proto +++ b/proto/types.proto @@ -1,5 +1,5 @@ // collectd - proto/types.proto -// Copyright (C) 2015 Sebastian Harl +// Copyright (C) 2015-2016 Sebastian Harl // // Permission is hereby granted, free of charge, to any person obtaining a // copy of this software and associated documentation files (the "Software"), diff --git a/src/grpc.cc b/src/grpc.cc index 45fb0290..4e470986 100644 --- a/src/grpc.cc +++ b/src/grpc.cc @@ -1,6 +1,6 @@ /** * collectd - src/grpc.cc - * Copyright (C) 2015 Sebastian Harl + * Copyright (C) 2015-2016 Sebastian Harl * * Permission is hereby granted, free of charge, to any person obtaining a * copy of this software and associated documentation files (the "Software"), @@ -30,6 +30,7 @@ #include "collectd.grpc.pb.h" extern "C" { +#include #include #include @@ -53,33 +54,67 @@ using collectd::Collectd; using collectd::DispatchValuesRequest; using collectd::DispatchValuesReply; -using collectd::ListValuesRequest; -using collectd::ListValuesReply; +using collectd::QueryValuesRequest; +using collectd::QueryValuesReply; using google::protobuf::util::TimeUtil; /* + * helper functions + */ + +static bool ident_matches(const value_list_t *vl, const value_list_t *matcher) +{ + if (fnmatch(matcher->host, vl->host, 0)) + return false; + + if (fnmatch(matcher->plugin, vl->plugin, 0)) + return false; + if (fnmatch(matcher->plugin_instance, vl->plugin_instance, 0)) + return false; + + if (fnmatch(matcher->type, vl->type, 0)) + return false; + if (fnmatch(matcher->type_instance, vl->type_instance, 0)) + return false; + + return true; +} /* ident_matches */ + +/* * proto conversion */ -static grpc::Status unmarshal_ident(const collectd::types::Identifier &msg, value_list_t *vl) +static void marshal_ident(const value_list_t *vl, collectd::types::Identifier *msg) +{ + msg->set_host(vl->host); + msg->set_plugin(vl->plugin); + if (vl->plugin_instance[0] != '\0') + msg->set_plugin_instance(vl->plugin_instance); + msg->set_type(vl->type); + if (vl->type_instance[0] != '\0') + msg->set_type_instance(vl->type_instance); +} /* marshal_ident */ + +static grpc::Status unmarshal_ident(const collectd::types::Identifier &msg, value_list_t *vl, + bool require_fields) { std::string s; s = msg.host(); - if (!s.length()) + if (!s.length() && require_fields) return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, grpc::string("missing host name")); sstrncpy(vl->host, s.c_str(), sizeof(vl->host)); s = msg.plugin(); - if (!s.length()) + if (!s.length() && require_fields) return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, grpc::string("missing plugin name")); sstrncpy(vl->plugin, s.c_str(), sizeof(vl->plugin)); s = msg.type(); - if (!s.length()) + if (!s.length() && require_fields) return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, grpc::string("missing type name")); sstrncpy(vl->type, s.c_str(), sizeof(vl->type)); @@ -93,19 +128,59 @@ static grpc::Status unmarshal_ident(const collectd::types::Identifier &msg, valu return grpc::Status::OK; } /* unmarshal_ident() */ +static grpc::Status marshal_value_list(const value_list_t *vl, collectd::types::ValueList *msg) +{ + auto id = msg->mutable_identifier(); + marshal_ident(vl, id); + + auto ds = plugin_get_ds(vl->type); + if ((ds == NULL) || (ds->ds_num != vl->values_len)) { + return grpc::Status(grpc::StatusCode::INTERNAL, + grpc::string("failed to retrieve data-set for values")); + } + + auto t = TimeUtil::NanosecondsToTimestamp(CDTIME_T_TO_NS(vl->time)); + auto d = TimeUtil::NanosecondsToDuration(CDTIME_T_TO_NS(vl->interval)); + msg->set_allocated_time(new google::protobuf::Timestamp(t)); + msg->set_allocated_interval(new google::protobuf::Duration(d)); + + for (size_t i = 0; i < vl->values_len; ++i) { + auto v = msg->add_value(); + switch (ds->ds[i].type) { + case DS_TYPE_COUNTER: + v->set_counter(vl->values[i].counter); + break; + case DS_TYPE_GAUGE: + v->set_gauge(vl->values[i].gauge); + break; + case DS_TYPE_DERIVE: + v->set_derive(vl->values[i].derive); + break; + case DS_TYPE_ABSOLUTE: + v->set_absolute(vl->values[i].absolute); + break; + default: + return grpc::Status(grpc::StatusCode::INTERNAL, + grpc::string("unknown value type")); + } + } + + return grpc::Status::OK; +} /* marshal_value_list */ + static grpc::Status unmarshal_value_list(const collectd::types::ValueList &msg, value_list_t *vl) { vl->time = NS_TO_CDTIME_T(TimeUtil::TimestampToNanoseconds(msg.time())); vl->interval = NS_TO_CDTIME_T(TimeUtil::DurationToNanoseconds(msg.interval())); - auto status = unmarshal_ident(msg.identifier(), vl); + auto status = unmarshal_ident(msg.identifier(), vl, true); if (!status.ok()) return status; value_t *values = NULL; size_t values_len = 0; - status = grpc::Status::OK; + status = grpc::Status::OK; for (auto v : msg.value()) { value_t *val = (value_t *)realloc(values, (values_len + 1) * sizeof(*values)); if (!val) { @@ -170,28 +245,51 @@ static grpc::Status Process(grpc::ServerContext *ctx, } /* Process(): DispatchValues */ static grpc::Status Process(grpc::ServerContext *ctx, - ListValuesRequest request, ListValuesReply *reply) + QueryValuesRequest request, QueryValuesReply *reply) { - char **names = NULL; - cdtime_t *times = NULL; - size_t i, n = 0; + uc_iter_t *iter; + char *name = NULL; + + value_list_t matcher; + auto status = unmarshal_ident(request.identifier(), &matcher, false); + if (!status.ok()) + return status; - if (uc_get_names(&names, ×, &n)) + if ((iter = uc_get_iterator()) == NULL) { return grpc::Status(grpc::StatusCode::INTERNAL, - grpc::string("failed to retrieve values")); - - for (i = 0; i < n; i++) { - auto v = reply->add_value(); - auto t = TimeUtil::NanosecondsToTimestamp(CDTIME_T_TO_NS(times[i])); - v->set_name(names[i]); - v->set_allocated_time(new google::protobuf::Timestamp(t)); - sfree(names[i]); + grpc::string("failed to query values: cannot create iterator")); } - sfree(names); - sfree(times); + + while (uc_iterator_next(iter, &name) == 0) { + value_list_t res; + if (parse_identifier_vl(name, &res) != 0) + return grpc::Status(grpc::StatusCode::INTERNAL, + grpc::string("failed to parse identifier")); + + if (!ident_matches(&res, &matcher)) + continue; + + if (uc_iterator_get_time(iter, &res.time) < 0) + return grpc::Status(grpc::StatusCode::INTERNAL, + grpc::string("failed to retrieve value timestamp")); + if (uc_iterator_get_interval(iter, &res.interval) < 0) + return grpc::Status(grpc::StatusCode::INTERNAL, + grpc::string("failed to retrieve value interval")); + if (uc_iterator_get_values(iter, &res.values, &res.values_len) < 0) + return grpc::Status(grpc::StatusCode::INTERNAL, + grpc::string("failed to retrieve values")); + + auto vl = reply->add_values(); + status = marshal_value_list(&res, vl); + free(res.values); + if (!status.ok()) + return status; + } + + uc_iterator_destroy(iter); return grpc::Status::OK; -} /* Process(): ListValues */ +} /* Process(): QueryValues */ class Call { @@ -324,8 +422,8 @@ public: // Register request types. new RpcCall(&service_, &Collectd::AsyncService::RequestDispatchValues, cq_.get()); - new RpcCall(&service_, - &Collectd::AsyncService::RequestListValues, cq_.get()); + new RpcCall(&service_, + &Collectd::AsyncService::RequestQueryValues, cq_.get()); while (true) { void *req = NULL; -- 2.11.0