Merge branch 'collectd-5.6' into collectd-5.7
[collectd.git] / src / grpc.cc
index 5a5899d..0ae80bb 100644 (file)
@@ -1,6 +1,7 @@
 /**
  * collectd - src/grpc.cc
  * Copyright (C) 2015-2016 Sebastian Harl
+ * Copyright (C) 2016      Florian octo Forster
  *
  * Permission is hereby granted, free of charge, to any person obtaining a
  * copy of this software and associated documentation files (the "Software"),
@@ -22,6 +23,7 @@
  *
  * Authors:
  *   Sebastian Harl <sh at tokkee.org>
+ *   Florian octo Forster <octo at collectd.org>
  **/
 
 #include <grpc++/grpc++.h>
@@ -40,17 +42,15 @@ extern "C" {
 
 #include "collectd.h"
 #include "common.h"
-#include "configfile.h"
 #include "plugin.h"
 
 #include "daemon/utils_cache.h"
 }
 
 using collectd::Collectd;
-using collectd::Dispatch;
 
-using collectd::DispatchValuesRequest;
-using collectd::DispatchValuesResponse;
+using collectd::PutValuesRequest;
+using collectd::PutValuesResponse;
 using collectd::QueryValuesRequest;
 using collectd::QueryValuesResponse;
 
@@ -265,15 +265,15 @@ class CollectdImpl : public collectd::Collectd::Service {
 public:
        grpc::Status QueryValues(grpc::ServerContext *ctx, QueryValuesRequest const *req, grpc::ServerWriter<QueryValuesResponse> *writer) override {
                value_list_t match;
-               auto err = unmarshal_ident(req->identifier(), &match, false);
-               if (!err.ok()) {
-                       return err;
+               auto status = unmarshal_ident(req->identifier(), &match, false);
+               if (!status.ok()) {
+                       return status;
                }
 
                std::queue<value_list_t> value_lists;
-               err = this->read(&match, &value_lists);
-               if (err.ok()) {
-                       err = this->write(ctx, writer, &value_lists);
+               status = this->queryValuesRead(&match, &value_lists);
+               if (status.ok()) {
+                       status = this->queryValuesWrite(ctx, writer, &value_lists);
                }
 
                while (!value_lists.empty()) {
@@ -282,11 +282,31 @@ public:
                        sfree(vl.values);
                }
 
-               return err;
+               return status;
+       }
+
+       grpc::Status PutValues(grpc::ServerContext *ctx,
+                                                  grpc::ServerReader<PutValuesRequest> *reader,
+                                                  PutValuesResponse *res) override {
+               PutValuesRequest req;
+
+               while (reader->Read(&req)) {
+                       value_list_t vl = {0};
+                       auto status = unmarshal_value_list(req.value_list(), &vl);
+                       if (!status.ok())
+                               return status;
+
+                       if (plugin_dispatch_values(&vl))
+                               return grpc::Status(grpc::StatusCode::INTERNAL,
+                                                                       grpc::string("failed to enqueue values for writing"));
+               }
+
+               res->Clear();
+               return grpc::Status::OK;
        }
 
 private:
-       grpc::Status read(value_list_t const *match, std::queue<value_list_t> *value_lists) {
+       grpc::Status queryValuesRead(value_list_t const *match, std::queue<value_list_t> *value_lists) {
                uc_iter_t *iter;
                if ((iter = uc_get_iterator()) == NULL) {
                        return grpc::Status(grpc::StatusCode::INTERNAL,
@@ -329,7 +349,7 @@ private:
                return status;
        }
 
-       grpc::Status write(grpc::ServerContext *ctx,
+       grpc::Status queryValuesWrite(grpc::ServerContext *ctx,
                                           grpc::ServerWriter<QueryValuesResponse> *writer,
                                           std::queue<value_list_t> *value_lists) {
                while (!value_lists->empty()) {
@@ -337,9 +357,9 @@ private:
                        QueryValuesResponse res;
                        res.Clear();
 
-                       auto err = marshal_value_list(&vl, res.mutable_value_list());
-                       if (!err.ok()) {
-                               return err;
+                       auto status = marshal_value_list(&vl, res.mutable_value_list());
+                       if (!status.ok()) {
+                               return status;
                        }
 
                        if (!writer->Write(res)) {
@@ -355,32 +375,6 @@ private:
 };
 
 /*
- * Dispatch service
- */
-class DispatchImpl : public collectd::Dispatch::Service {
-public:
-       grpc::Status DispatchValues(grpc::ServerContext *ctx,
-                                                               grpc::ServerReader<DispatchValuesRequest> *reader,
-                                                               DispatchValuesResponse *res) override {
-               DispatchValuesRequest req;
-
-               while (reader->Read(&req)) {
-                       value_list_t vl = VALUE_LIST_INIT;
-                       auto status = unmarshal_value_list(req.value_list(), &vl);
-                       if (!status.ok())
-                               return status;
-
-                       if (plugin_dispatch_values(&vl))
-                               return grpc::Status(grpc::StatusCode::INTERNAL,
-                                                                       grpc::string("failed to enqueue values for writing"));
-               }
-
-               res->Clear();
-               return grpc::Status::OK;
-       }
-};
-
-/*
  * gRPC server implementation
  */
 class CollectdServer final
@@ -413,7 +407,6 @@ public:
                }
 
                builder.RegisterService(&collectd_service_);
-               builder.RegisterService(&dispatch_service_);
 
                server_ = builder.BuildAndStart();
        } /* Start() */
@@ -425,17 +418,64 @@ public:
 
 private:
        CollectdImpl collectd_service_;
-       DispatchImpl dispatch_service_;
 
        std::unique_ptr<grpc::Server> server_;
 }; /* class CollectdServer */
 
+class CollectdClient final
+{
+public:
+       CollectdClient(std::shared_ptr<grpc::ChannelInterface> channel) : stub_(Collectd::NewStub(channel)) {
+       }
+
+       int PutValues(value_list_t const *vl) {
+               grpc::ClientContext ctx;
+
+               PutValuesRequest req;
+               auto status = marshal_value_list(vl, req.mutable_value_list());
+               if (!status.ok()) {
+                       ERROR("grpc: Marshalling value_list_t failed.");
+                       return -1;
+               }
+
+               PutValuesResponse res;
+               auto stream = stub_->PutValues(&ctx, &res);
+               if (!stream->Write(req)) {
+                       NOTICE("grpc: Broken stream.");
+                       /* intentionally not returning. */
+               }
+
+               stream->WritesDone();
+               status = stream->Finish();
+               if (!status.ok()) {
+                       ERROR ("grpc: Error while closing stream.");
+                       return -1;
+               }
+
+               return 0;
+       } /* int PutValues */
+
+private:
+       std::unique_ptr<Collectd::Stub> stub_;
+};
+
 static CollectdServer *server = nullptr;
 
 /*
  * collectd plugin interface
  */
 extern "C" {
+       static void c_grpc_destroy_write_callback (void *ptr) {
+               delete (CollectdClient *) ptr;
+       }
+
+       static int c_grpc_write(__attribute__((unused)) data_set_t const *ds,
+                       value_list_t const *vl,
+                       user_data_t *ud) {
+               CollectdClient *c = (CollectdClient *) ud->data;
+               return c->PutValues(vl);
+       }
+
        static int c_grpc_config_listen(oconfig_item_t *ci)
        {
                if ((ci->values_num != 2)
@@ -465,7 +505,7 @@ extern "C" {
                                        return -1;
                                }
                        }
-                       else if (!strcasecmp("SSLRootCerts", child->key)) {
+                       else if (!strcasecmp("SSLCACertificateFile", child->key)) {
                                char *certs = NULL;
                                if (cf_util_get_string(child, &certs)) {
                                        ERROR("grpc: Option `%s` expects a string value",
@@ -474,7 +514,7 @@ extern "C" {
                                }
                                ssl_opts->pem_root_certs = read_file(certs);
                        }
-                       else if (!strcasecmp("SSLServerKey", child->key)) {
+                       else if (!strcasecmp("SSLCertificateKeyFile", child->key)) {
                                char *key = NULL;
                                if (cf_util_get_string(child, &key)) {
                                        ERROR("grpc: Option `%s` expects a string value",
@@ -483,7 +523,7 @@ extern "C" {
                                }
                                pkcp.private_key = read_file(key);
                        }
-                       else if (!strcasecmp("SSLServerCert", child->key)) {
+                       else if (!strcasecmp("SSLCertificateFile", child->key)) {
                                char *cert = NULL;
                                if (cf_util_get_string(child, &cert)) {
                                        ERROR("grpc: Option `%s` expects a string value",
@@ -508,6 +548,78 @@ extern "C" {
                return 0;
        } /* c_grpc_config_listen() */
 
+       static int c_grpc_config_server(oconfig_item_t *ci)
+       {
+               if ((ci->values_num != 2)
+                               || (ci->values[0].type != OCONFIG_TYPE_STRING)
+                               || (ci->values[1].type != OCONFIG_TYPE_STRING)) {
+                       ERROR("grpc: The `%s` config option needs exactly "
+                                       "two string argument (address and port).", ci->key);
+                       return -1;
+               }
+
+               grpc::SslCredentialsOptions ssl_opts;
+               bool use_ssl = false;
+
+               for (int i = 0; i < ci->children_num; i++) {
+                       oconfig_item_t *child = ci->children + i;
+
+                       if (!strcasecmp("EnableSSL", child->key)) {
+                               if (cf_util_get_boolean(child, &use_ssl)) {
+                                       return -1;
+                               }
+                       }
+                       else if (!strcasecmp("SSLCACertificateFile", child->key)) {
+                               char *certs = NULL;
+                               if (cf_util_get_string(child, &certs)) {
+                                       return -1;
+                               }
+                               ssl_opts.pem_root_certs = read_file(certs);
+                       }
+                       else if (!strcasecmp("SSLCertificateKeyFile", child->key)) {
+                               char *key = NULL;
+                               if (cf_util_get_string(child, &key)) {
+                                       return -1;
+                               }
+                               ssl_opts.pem_private_key = read_file(key);
+                       }
+                       else if (!strcasecmp("SSLCertificateFile", child->key)) {
+                               char *cert = NULL;
+                               if (cf_util_get_string(child, &cert)) {
+                                       return -1;
+                               }
+                               ssl_opts.pem_cert_chain = read_file(cert);
+                       }
+                       else {
+                               WARNING("grpc: Option `%s` not allowed in <%s> block.",
+                                               child->key, ci->key);
+                       }
+               }
+
+               auto node    = grpc::string(ci->values[0].value.string);
+               auto service = grpc::string(ci->values[1].value.string);
+               auto addr    = node + ":" + service;
+
+               CollectdClient *client;
+               if (use_ssl) {
+                       auto channel_creds = grpc::SslCredentials(ssl_opts);
+                       auto channel = grpc::CreateChannel(addr, channel_creds);
+                       client = new CollectdClient(channel);
+               } else {
+                       auto channel = grpc::CreateChannel(addr, grpc::InsecureChannelCredentials());
+                       client = new CollectdClient(channel);
+               }
+
+               auto callback_name = grpc::string("grpc/") + addr;
+               user_data_t ud = {
+                       .data = client,
+                       .free_func = c_grpc_destroy_write_callback,
+               };
+
+               plugin_register_write (callback_name.c_str(), c_grpc_write, &ud);
+               return 0;
+       } /* c_grpc_config_server() */
+
        static int c_grpc_config(oconfig_item_t *ci)
        {
                int i;
@@ -519,6 +631,11 @@ extern "C" {
                                if (c_grpc_config_listen(child))
                                        return -1;
                        }
+                       else if (!strcasecmp("Server", child->key)) {
+                               if (c_grpc_config_server(child))
+                                       return -1;
+                       }
+
                        else {
                                WARNING("grpc: Option `%s` not allowed here.", child->key);
                        }