grpc plugin: Implement the <Server "host" "port"> option.
authorFlorian Forster <octo@collectd.org>
Fri, 12 Aug 2016 13:22:49 +0000 (15:22 +0200)
committerFlorian Forster <octo@collectd.org>
Fri, 12 Aug 2016 14:14:29 +0000 (16:14 +0200)
This allows the gRPC plugin to send metrics, via gRPC, to another
collectd instance or other implementation of the "Collectd" gRPC
service.

src/collectd.conf.in
src/collectd.conf.pod
src/grpc.cc

index e06465b..e1e02a8 100644 (file)
 #</Plugin>
 
 #<Plugin grpc>
-#      <Listen "0.0.0.0" "50051">
+#      <Server "example.com" "50051">
 #              EnableSSL true
 #              SSLRootCerts "/path/to/root.pem"
 #              SSLServerCert "/path/to/server.pem"
 #              SSLServerKey "/path/to/server.key"
+#      </Server>
+#      <Listen "0.0.0.0" "50051">
+#              EnableSSL true
+#              SSLCACertificateFile "/path/to/root.pem"
+#              SSLCertificateFile "/path/to/client.pem"
+#              SSLCertificateKeyFile "/path/to/client.key"
 #      </Listen>
 #</Plugin>
 
index 6d1547a..d8faba5 100644 (file)
@@ -2728,6 +2728,33 @@ The B<gRPC> homepage can be found at L<https://grpc.io/>.
 
 =over 4
 
+=item B<Server> I<Host> I<Port>
+
+The B<Server> statement sets the address of a server to which to send metrics
+via the C<DispatchValues> function.
+
+The argument I<Host> may be a hostname, an IPv4 address, or an IPv6 address.
+
+Optionally, B<Server> may be specified as a configuration block which supports
+the following options:
+
+=over 4
+
+=item B<EnableSSL> B<false>|B<true>
+
+Whether to require SSL for outgoing connections. Default: false.
+
+=item B<SSLCACertificateFile> I<Filename>
+
+=item B<SSLCertificateFile> I<Filename>
+
+=item B<SSLCertificateKeyFile> I<Filename>
+
+Filenames specifying SSL certificate and key material to be used with SSL
+connections.
+
+=back
+
 =item B<Listen> I<Host> I<Port>
 
 The B<Listen> statement sets the network address to bind to. When multiple
index a38abc1..70c8e6c 100644 (file)
@@ -420,12 +420,60 @@ private:
        std::unique_ptr<grpc::Server> server_;
 }; /* class CollectdServer */
 
+class CollectdClient final
+{
+public:
+       CollectdClient(std::shared_ptr<grpc::ChannelInterface> channel) : stub_(Collectd::NewStub(channel)) {
+       }
+
+       int DispatchValues(value_list_t const *vl) {
+               grpc::ClientContext ctx;
+
+               DispatchValuesRequest req;
+               auto status = marshal_value_list(vl, req.mutable_value_list());
+               if (!status.ok()) {
+                       ERROR("grpc: Marshalling value_list_t failed.");
+                       return -1;
+               }
+
+               DispatchValuesResponse res;
+               auto stream = stub_->DispatchValues(&ctx, &res);
+               if (!stream->Write(req)) {
+                       NOTICE("grpc: Broken stream.");
+                       /* intentionally not returning. */
+               }
+
+               stream->WritesDone();
+               status = stream->Finish();
+               if (!status.ok()) {
+                       ERROR ("grpc: Error while closing stream.");
+                       return -1;
+               }
+
+               return 0;
+       } /* int DispatchValues */
+
+private:
+       std::unique_ptr<Collectd::Stub> stub_;
+};
+
 static CollectdServer *server = nullptr;
 
 /*
  * collectd plugin interface
  */
 extern "C" {
+       static void c_grpc_destroy_write_callback (void *ptr) {
+               delete (CollectdClient *) ptr;
+       }
+
+       static int c_grpc_write(__attribute__((unused)) data_set_t const *ds,
+                       value_list_t const *vl,
+                       user_data_t *ud) {
+               CollectdClient *c = (CollectdClient *) ud->data;
+               return c->DispatchValues(vl);
+       }
+
        static int c_grpc_config_listen(oconfig_item_t *ci)
        {
                if ((ci->values_num != 2)
@@ -498,6 +546,78 @@ extern "C" {
                return 0;
        } /* c_grpc_config_listen() */
 
+       static int c_grpc_config_server(oconfig_item_t *ci)
+       {
+               if ((ci->values_num != 2)
+                               || (ci->values[0].type != OCONFIG_TYPE_STRING)
+                               || (ci->values[1].type != OCONFIG_TYPE_STRING)) {
+                       ERROR("grpc: The `%s` config option needs exactly "
+                                       "two string argument (address and port).", ci->key);
+                       return -1;
+               }
+
+               grpc::SslCredentialsOptions ssl_opts;
+               bool use_ssl = false;
+
+               for (int i = 0; i < ci->children_num; i++) {
+                       oconfig_item_t *child = ci->children + i;
+
+                       if (!strcasecmp("EnableSSL", child->key)) {
+                               if (cf_util_get_boolean(child, &use_ssl)) {
+                                       return -1;
+                               }
+                       }
+                       else if (!strcasecmp("SSLCACertificateFile", child->key)) {
+                               char *certs = NULL;
+                               if (cf_util_get_string(child, &certs)) {
+                                       return -1;
+                               }
+                               ssl_opts.pem_root_certs = read_file(certs);
+                       }
+                       else if (!strcasecmp("SSLCertificateKeyFile", child->key)) {
+                               char *key = NULL;
+                               if (cf_util_get_string(child, &key)) {
+                                       return -1;
+                               }
+                               ssl_opts.pem_private_key = read_file(key);
+                       }
+                       else if (!strcasecmp("SSLCertificateFile", child->key)) {
+                               char *cert = NULL;
+                               if (cf_util_get_string(child, &cert)) {
+                                       return -1;
+                               }
+                               ssl_opts.pem_cert_chain = read_file(cert);
+                       }
+                       else {
+                               WARNING("grpc: Option `%s` not allowed in <%s> block.",
+                                               child->key, ci->key);
+                       }
+               }
+
+               auto node    = grpc::string(ci->values[0].value.string);
+               auto service = grpc::string(ci->values[1].value.string);
+               auto addr    = node + ":" + service;
+
+               CollectdClient *client;
+               if (use_ssl) {
+                       auto channel_creds = grpc::SslCredentials(ssl_opts);
+                       auto channel = grpc::CreateChannel(addr, channel_creds);
+                       client = new CollectdClient(channel);
+               } else {
+                       auto channel = grpc::CreateChannel(addr, grpc::InsecureChannelCredentials());
+                       client = new CollectdClient(channel);
+               }
+
+               auto callback_name = grpc::string("grpc/") + addr;
+               user_data_t ud = {
+                       .data = client,
+                       .free_func = c_grpc_destroy_write_callback,
+               };
+
+               plugin_register_write (callback_name.c_str(), c_grpc_write, &ud);
+               return 0;
+       } /* c_grpc_config_server() */
+
        static int c_grpc_config(oconfig_item_t *ci)
        {
                int i;
@@ -509,6 +629,11 @@ extern "C" {
                                if (c_grpc_config_listen(child))
                                        return -1;
                        }
+                       else if (!strcasecmp("Server", child->key)) {
+                               if (c_grpc_config_server(child))
+                                       return -1;
+                       }
+
                        else {
                                WARNING("grpc: Option `%s` not allowed here.", child->key);
                        }