Merge branch 'collectd-5.6' into collectd-5.7
[collectd.git] / src / grpc.cc
index 9c4f258..0ae80bb 100644 (file)
@@ -1,6 +1,7 @@
 /**
  * collectd - src/grpc.cc
  * Copyright (C) 2015-2016 Sebastian Harl
+ * Copyright (C) 2016      Florian octo Forster
  *
  * Permission is hereby granted, free of charge, to any person obtaining a
  * copy of this software and associated documentation files (the "Software"),
@@ -22,6 +23,7 @@
  *
  * Authors:
  *   Sebastian Harl <sh at tokkee.org>
+ *   Florian octo Forster <octo at collectd.org>
  **/
 
 #include <grpc++/grpc++.h>
@@ -29,6 +31,7 @@
 
 #include <fstream>
 #include <iostream>
+#include <queue>
 #include <vector>
 
 #include "collectd.grpc.pb.h"
@@ -39,17 +42,15 @@ extern "C" {
 
 #include "collectd.h"
 #include "common.h"
-#include "configfile.h"
 #include "plugin.h"
 
 #include "daemon/utils_cache.h"
 }
 
 using collectd::Collectd;
-using collectd::Dispatch;
 
-using collectd::DispatchValuesRequest;
-using collectd::DispatchValuesResponse;
+using collectd::PutValuesRequest;
+using collectd::PutValuesResponse;
 using collectd::QueryValuesRequest;
 using collectd::QueryValuesResponse;
 
@@ -258,189 +259,119 @@ static grpc::Status unmarshal_value_list(const collectd::types::ValueList &msg,
 } /* unmarshal_value_list() */
 
 /*
- * request call-backs and call objects
+ * Collectd service
  */
-static grpc::Status DispatchValue(grpc::ServerContext *ctx, DispatchValuesRequest request, DispatchValuesResponse *reply)
-{
-       value_list_t vl = VALUE_LIST_INIT;
-       auto status = unmarshal_value_list(request.value_list(), &vl);
-       if (!status.ok())
-               return status;
-
-       if (plugin_dispatch_values(&vl))
-               status = grpc::Status(grpc::StatusCode::INTERNAL,
-                               grpc::string("failed to enqueue values for writing"));
+class CollectdImpl : public collectd::Collectd::Service {
+public:
+       grpc::Status QueryValues(grpc::ServerContext *ctx, QueryValuesRequest const *req, grpc::ServerWriter<QueryValuesResponse> *writer) override {
+               value_list_t match;
+               auto status = unmarshal_ident(req->identifier(), &match, false);
+               if (!status.ok()) {
+                       return status;
+               }
 
-       reply->Clear();
-       return status;
-} /* grpc::Status DispatchValue */
+               std::queue<value_list_t> value_lists;
+               status = this->queryValuesRead(&match, &value_lists);
+               if (status.ok()) {
+                       status = this->queryValuesWrite(ctx, writer, &value_lists);
+               }
 
-static grpc::Status QueryValues(grpc::ServerContext *ctx, QueryValuesRequest req, QueryValuesResponse *res)
-{
-       uc_iter_t *iter;
-       char *name = NULL;
+               while (!value_lists.empty()) {
+                       auto vl = value_lists.front();
+                       value_lists.pop();
+                       sfree(vl.values);
+               }
 
-       value_list_t matcher;
-       auto status = unmarshal_ident(req.identifier(), &matcher, false);
-       if (!status.ok())
                return status;
-
-       if ((iter = uc_get_iterator()) == NULL) {
-               return grpc::Status(grpc::StatusCode::INTERNAL,
-                               grpc::string("failed to query values: cannot create iterator"));
        }
 
-       status = grpc::Status::OK;
-       while (uc_iterator_next(iter, &name) == 0) {
-               value_list_t vl;
-               if (parse_identifier_vl(name, &vl) != 0) {
-                       status = grpc::Status(grpc::StatusCode::INTERNAL,
-                                       grpc::string("failed to parse identifier"));
-                       break;
-               }
+       grpc::Status PutValues(grpc::ServerContext *ctx,
+                                                  grpc::ServerReader<PutValuesRequest> *reader,
+                                                  PutValuesResponse *res) override {
+               PutValuesRequest req;
 
-               if (!ident_matches(&vl, &matcher))
-                       continue;
+               while (reader->Read(&req)) {
+                       value_list_t vl = {0};
+                       auto status = unmarshal_value_list(req.value_list(), &vl);
+                       if (!status.ok())
+                               return status;
 
-               if (uc_iterator_get_time(iter, &vl.time) < 0) {
-                       status = grpc::Status(grpc::StatusCode::INTERNAL,
-                                       grpc::string("failed to retrieve value timestamp"));
-                       break;
-               }
-               if (uc_iterator_get_interval(iter, &vl.interval) < 0) {
-                       status = grpc::Status(grpc::StatusCode::INTERNAL,
-                                       grpc::string("failed to retrieve value interval"));
-                       break;
-               }
-               if (uc_iterator_get_values(iter, &vl.values, &vl.values_len) < 0) {
-                       status = grpc::Status(grpc::StatusCode::INTERNAL,
-                                       grpc::string("failed to retrieve values"));
-                       break;
+                       if (plugin_dispatch_values(&vl))
+                               return grpc::Status(grpc::StatusCode::INTERNAL,
+                                                                       grpc::string("failed to enqueue values for writing"));
                }
 
-               auto pb_vl = res->add_value_lists();
-               status = marshal_value_list(&vl, pb_vl);
-               free(vl.values);
-               if (!status.ok())
-                       break;
+               res->Clear();
+               return grpc::Status::OK;
        }
 
-       uc_iterator_destroy(iter);
+private:
+       grpc::Status queryValuesRead(value_list_t const *match, std::queue<value_list_t> *value_lists) {
+               uc_iter_t *iter;
+               if ((iter = uc_get_iterator()) == NULL) {
+                       return grpc::Status(grpc::StatusCode::INTERNAL,
+                                                               grpc::string("failed to query values: cannot create iterator"));
+               }
 
-       return status;
-} /* grpc::Status QueryValues */
+               grpc::Status status = grpc::Status::OK;
+               char *name = NULL;
+               while (uc_iterator_next(iter, &name) == 0) {
+                       value_list_t vl;
+                       if (parse_identifier_vl(name, &vl) != 0) {
+                               status = grpc::Status(grpc::StatusCode::INTERNAL,
+                                                                         grpc::string("failed to parse identifier"));
+                               break;
+                       }
 
-// CallData is the abstract base class for asynchronous calls.
-class CallData {
-public:
-  virtual ~CallData() {}
-  virtual void process(bool ok) = 0;
+                       if (!ident_matches(&vl, match))
+                               continue;
 
-protected:
-  CallData() {}
+                       if (uc_iterator_get_time(iter, &vl.time) < 0) {
+                               status = grpc::Status(grpc::StatusCode::INTERNAL,
+                                                                         grpc::string("failed to retrieve value timestamp"));
+                               break;
+                       }
+                       if (uc_iterator_get_interval(iter, &vl.interval) < 0) {
+                               status = grpc::Status(grpc::StatusCode::INTERNAL,
+                                                                         grpc::string("failed to retrieve value interval"));
+                               break;
+                       }
+                       if (uc_iterator_get_values(iter, &vl.values, &vl.values_len) < 0) {
+                               status = grpc::Status(grpc::StatusCode::INTERNAL,
+                                                                         grpc::string("failed to retrieve values"));
+                               break;
+                       }
 
-private:
-  CallData(const CallData&) = delete;
-  CallData& operator=(const CallData&) = delete;
-};
+                       value_lists->push(vl);
+               } // while (uc_iterator_next(iter, &name) == 0)
 
-/*
- * Collectd service
- */
-// QueryValuesCallData holds the state and implements the logic for QueryValues calls.
-class QueryValuesCallData : public CallData {
-public:
-       QueryValuesCallData(Collectd::AsyncService* service, grpc::ServerCompletionQueue* cq)
-                       : cq_(cq), service_(service), writer_(&context_) {
-               // As part of the initialization, we *request* that the system start
-               // processing QueryValues requests. In this request, "this" acts as
-               // the tag uniquely identifying the request (so that different
-               // QueryValuesCallData instances can serve different requests
-               // concurrently), in this case the memory address of this
-               // QueryValuesCallData instance.
-               service->RequestQueryValues(&context_, &request_, &writer_, cq_, cq_, this);
+               uc_iterator_destroy(iter);
+               return status;
        }
 
-       void process(bool ok) final {
-               if (done_) {
-                       delete this;
-               } else {
-                       // Spawn a new QueryValuesCallData instance to serve new clients
-                       // while we process the one for this QueryValuesCallData. The
-                       // instance will deallocate itself as part of its FINISH state.
-                       new QueryValuesCallData(service_, cq_);
+       grpc::Status queryValuesWrite(grpc::ServerContext *ctx,
+                                          grpc::ServerWriter<QueryValuesResponse> *writer,
+                                          std::queue<value_list_t> *value_lists) {
+               while (!value_lists->empty()) {
+                       auto vl = value_lists->front();
+                       QueryValuesResponse res;
+                       res.Clear();
 
-                       auto status = QueryValues(&context_, request_, &response_);
+                       auto status = marshal_value_list(&vl, res.mutable_value_list());
                        if (!status.ok()) {
-                               writer_.FinishWithError(status, this);
-                       } else {
-                               writer_.Finish(response_, grpc::Status::OK, this);
+                               return status;
                        }
 
-                       done_ = true;
-               }
-       }
-
-private:
-       bool done_ = false;
-       grpc::ServerContext context_;
-       grpc::ServerCompletionQueue* cq_;
-       Collectd::AsyncService* service_;
-       QueryValuesRequest request_;
-       QueryValuesResponse response_;
-       grpc::ServerAsyncResponseWriter<QueryValuesResponse> writer_;
-};
-
-/*
- * Dispatch service
- */
-// DispatchValuesCallData holds the state and implements the logic for DispatchValues calls.
-class DispatchValuesCallData : public CallData {
-public:
-       DispatchValuesCallData(Dispatch::AsyncService* service, grpc::ServerCompletionQueue* cq)
-                       : cq_(cq), service_(service), reader_(&context_) {
-               process(true);
-       }
+                       if (!writer->Write(res)) {
+                               return grpc::Status::CANCELLED;
+                       }
 
-       void process(bool ok) final {
-               if (status == Status::INIT) {
-                       service_->RequestDispatchValues(&context_, &reader_, cq_, cq_, this);
-                       status = Status::CALL;
-               } else if (status == Status::CALL) {
-                       reader_.Read(&request_, this);
-                       status = Status::READ;
-               } else if (status == Status::READ && ok) {
-                       (void) DispatchValue(&context_, request_, &response_);
-
-                       reader_.Read(&request_, this);
-               } else if (status == Status::READ) {
-                       response_.Clear();
-
-                       status = Status::DONE;
-               } else if (status == Status::DONE) {
-                       new DispatchValuesCallData(service_, cq_);
-                       delete this;
-               } else {
-                       ERROR("grpc: DispatchValuesCallData: invalid state");
+                       value_lists->pop();
+                       sfree(vl.values);
                }
-       }
 
-private:
-       enum class Status {
-               INIT,
-               CALL,
-               READ,
-               DONE,
-       };
-       Status status = Status::INIT;
-
-       grpc::ServerContext          context_;
-       grpc::ServerCompletionQueue* cq_;
-       Dispatch::AsyncService*      service_;
-
-       DispatchValuesRequest request_;
-       DispatchValuesResponse response_;
-       grpc::ServerAsyncReader<DispatchValuesResponse, DispatchValuesRequest> reader_;
+               return grpc::Status::OK;
+       }
 };
 
 /*
@@ -475,45 +406,58 @@ public:
                        }
                }
 
-               cq_ = builder.AddCompletionQueue();
-
                builder.RegisterService(&collectd_service_);
-               builder.RegisterService(&dispatch_service_);
 
                server_ = builder.BuildAndStart();
-               new QueryValuesCallData(&collectd_service_, cq_.get());
-               new DispatchValuesCallData(&dispatch_service_, cq_.get());
        } /* Start() */
 
        void Shutdown()
        {
                server_->Shutdown();
-               cq_->Shutdown();
        } /* Shutdown() */
 
-       void Mainloop()
-       {
-               while (true) {
-                       void *tag = NULL;
-                       bool ok = false;
+private:
+       CollectdImpl collectd_service_;
+
+       std::unique_ptr<grpc::Server> server_;
+}; /* class CollectdServer */
+
+class CollectdClient final
+{
+public:
+       CollectdClient(std::shared_ptr<grpc::ChannelInterface> channel) : stub_(Collectd::NewStub(channel)) {
+       }
+
+       int PutValues(value_list_t const *vl) {
+               grpc::ClientContext ctx;
 
-                       // Block waiting to read the next event from the completion queue.
-                       // The event is uniquely identified by its tag, which in this case
-                       // is the memory address of a CallData instance.
-                       if (!cq_->Next(&tag, &ok))
-                               break; // Queue shut down.
+               PutValuesRequest req;
+               auto status = marshal_value_list(vl, req.mutable_value_list());
+               if (!status.ok()) {
+                       ERROR("grpc: Marshalling value_list_t failed.");
+                       return -1;
+               }
 
-                       static_cast<CallData*>(tag)->process(ok);
+               PutValuesResponse res;
+               auto stream = stub_->PutValues(&ctx, &res);
+               if (!stream->Write(req)) {
+                       NOTICE("grpc: Broken stream.");
+                       /* intentionally not returning. */
                }
-       } /* Mainloop() */
 
-private:
-       Collectd::AsyncService collectd_service_;
-       Dispatch::AsyncService dispatch_service_;
+               stream->WritesDone();
+               status = stream->Finish();
+               if (!status.ok()) {
+                       ERROR ("grpc: Error while closing stream.");
+                       return -1;
+               }
 
-       std::unique_ptr<grpc::Server> server_;
-       std::unique_ptr<grpc::ServerCompletionQueue> cq_;
-}; /* class CollectdServer */
+               return 0;
+       } /* int PutValues */
+
+private:
+       std::unique_ptr<Collectd::Stub> stub_;
+};
 
 static CollectdServer *server = nullptr;
 
@@ -521,15 +465,16 @@ static CollectdServer *server = nullptr;
  * collectd plugin interface
  */
 extern "C" {
-       static pthread_t *workers;
-       static size_t workers_num = 5;
+       static void c_grpc_destroy_write_callback (void *ptr) {
+               delete (CollectdClient *) ptr;
+       }
 
-       static void *worker_thread(void *arg)
-       {
-               CollectdServer *s = (CollectdServer *)arg;
-               s->Mainloop();
-               return NULL;
-       } /* worker_thread() */
+       static int c_grpc_write(__attribute__((unused)) data_set_t const *ds,
+                       value_list_t const *vl,
+                       user_data_t *ud) {
+               CollectdClient *c = (CollectdClient *) ud->data;
+               return c->PutValues(vl);
+       }
 
        static int c_grpc_config_listen(oconfig_item_t *ci)
        {
@@ -560,7 +505,7 @@ extern "C" {
                                        return -1;
                                }
                        }
-                       else if (!strcasecmp("SSLRootCerts", child->key)) {
+                       else if (!strcasecmp("SSLCACertificateFile", child->key)) {
                                char *certs = NULL;
                                if (cf_util_get_string(child, &certs)) {
                                        ERROR("grpc: Option `%s` expects a string value",
@@ -569,7 +514,7 @@ extern "C" {
                                }
                                ssl_opts->pem_root_certs = read_file(certs);
                        }
-                       else if (!strcasecmp("SSLServerKey", child->key)) {
+                       else if (!strcasecmp("SSLCertificateKeyFile", child->key)) {
                                char *key = NULL;
                                if (cf_util_get_string(child, &key)) {
                                        ERROR("grpc: Option `%s` expects a string value",
@@ -578,7 +523,7 @@ extern "C" {
                                }
                                pkcp.private_key = read_file(key);
                        }
-                       else if (!strcasecmp("SSLServerCert", child->key)) {
+                       else if (!strcasecmp("SSLCertificateFile", child->key)) {
                                char *cert = NULL;
                                if (cf_util_get_string(child, &cert)) {
                                        ERROR("grpc: Option `%s` expects a string value",
@@ -603,6 +548,78 @@ extern "C" {
                return 0;
        } /* c_grpc_config_listen() */
 
+       static int c_grpc_config_server(oconfig_item_t *ci)
+       {
+               if ((ci->values_num != 2)
+                               || (ci->values[0].type != OCONFIG_TYPE_STRING)
+                               || (ci->values[1].type != OCONFIG_TYPE_STRING)) {
+                       ERROR("grpc: The `%s` config option needs exactly "
+                                       "two string argument (address and port).", ci->key);
+                       return -1;
+               }
+
+               grpc::SslCredentialsOptions ssl_opts;
+               bool use_ssl = false;
+
+               for (int i = 0; i < ci->children_num; i++) {
+                       oconfig_item_t *child = ci->children + i;
+
+                       if (!strcasecmp("EnableSSL", child->key)) {
+                               if (cf_util_get_boolean(child, &use_ssl)) {
+                                       return -1;
+                               }
+                       }
+                       else if (!strcasecmp("SSLCACertificateFile", child->key)) {
+                               char *certs = NULL;
+                               if (cf_util_get_string(child, &certs)) {
+                                       return -1;
+                               }
+                               ssl_opts.pem_root_certs = read_file(certs);
+                       }
+                       else if (!strcasecmp("SSLCertificateKeyFile", child->key)) {
+                               char *key = NULL;
+                               if (cf_util_get_string(child, &key)) {
+                                       return -1;
+                               }
+                               ssl_opts.pem_private_key = read_file(key);
+                       }
+                       else if (!strcasecmp("SSLCertificateFile", child->key)) {
+                               char *cert = NULL;
+                               if (cf_util_get_string(child, &cert)) {
+                                       return -1;
+                               }
+                               ssl_opts.pem_cert_chain = read_file(cert);
+                       }
+                       else {
+                               WARNING("grpc: Option `%s` not allowed in <%s> block.",
+                                               child->key, ci->key);
+                       }
+               }
+
+               auto node    = grpc::string(ci->values[0].value.string);
+               auto service = grpc::string(ci->values[1].value.string);
+               auto addr    = node + ":" + service;
+
+               CollectdClient *client;
+               if (use_ssl) {
+                       auto channel_creds = grpc::SslCredentials(ssl_opts);
+                       auto channel = grpc::CreateChannel(addr, channel_creds);
+                       client = new CollectdClient(channel);
+               } else {
+                       auto channel = grpc::CreateChannel(addr, grpc::InsecureChannelCredentials());
+                       client = new CollectdClient(channel);
+               }
+
+               auto callback_name = grpc::string("grpc/") + addr;
+               user_data_t ud = {
+                       .data = client,
+                       .free_func = c_grpc_destroy_write_callback,
+               };
+
+               plugin_register_write (callback_name.c_str(), c_grpc_write, &ud);
+               return 0;
+       } /* c_grpc_config_server() */
+
        static int c_grpc_config(oconfig_item_t *ci)
        {
                int i;
@@ -614,12 +631,11 @@ extern "C" {
                                if (c_grpc_config_listen(child))
                                        return -1;
                        }
-                       else if (!strcasecmp("WorkerThreads", child->key)) {
-                               int n;
-                               if (cf_util_get_int(child, &n))
+                       else if (!strcasecmp("Server", child->key)) {
+                               if (c_grpc_config_server(child))
                                        return -1;
-                               workers_num = (size_t)n;
                        }
+
                        else {
                                WARNING("grpc: Option `%s` not allowed here.", child->key);
                        }
@@ -631,47 +647,22 @@ extern "C" {
        static int c_grpc_init(void)
        {
                server = new CollectdServer();
-               size_t i;
-
-               if (! server) {
+               if (!server) {
                        ERROR("grpc: Failed to create server");
                        return -1;
                }
 
-               workers = (pthread_t *)calloc(workers_num, sizeof(*workers));
-               if (! workers) {
-                       delete server;
-                       server = nullptr;
-
-                       ERROR("grpc: Failed to allocate worker threads");
-                       return -1;
-               }
-
                server->Start();
-               for (i = 0; i < workers_num; i++) {
-                       plugin_thread_create(&workers[i], /* attr = */ NULL,
-                                       worker_thread, server);
-               }
-               INFO("grpc: Started %zu workers", workers_num);
                return 0;
        } /* c_grpc_init() */
 
        static int c_grpc_shutdown(void)
        {
-               size_t i;
-
                if (!server)
-                       return -1;
+                       return 0;
 
                server->Shutdown();
 
-               INFO("grpc: Waiting for %zu workers to terminate", workers_num);
-               for (i = 0; i < workers_num; i++)
-                       pthread_join(workers[i], NULL);
-               free(workers);
-               workers = NULL;
-               workers_num = 0;
-
                delete server;
                server = nullptr;