Merge branch 'collectd-5.5' into collectd-5.6
[collectd.git] / src / grpc.cc
1 /**
2  * collectd - src/grpc.cc
3  * Copyright (C) 2015-2016 Sebastian Harl
4  *
5  * Permission is hereby granted, free of charge, to any person obtaining a
6  * copy of this software and associated documentation files (the "Software"),
7  * to deal in the Software without restriction, including without limitation
8  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
9  * and/or sell copies of the Software, and to permit persons to whom the
10  * Software is furnished to do so, subject to the following conditions:
11  *
12  * The above copyright notice and this permission notice shall be included in
13  * all copies or substantial portions of the Software.
14  *
15  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
21  * DEALINGS IN THE SOFTWARE.
22  *
23  * Authors:
24  *   Sebastian Harl <sh at tokkee.org>
25  **/
26
27 #include <grpc++/grpc++.h>
28 #include <google/protobuf/util/time_util.h>
29
30 #include <fstream>
31 #include <iostream>
32 #include <queue>
33 #include <vector>
34
35 #include "collectd.grpc.pb.h"
36
37 extern "C" {
38 #include <fnmatch.h>
39 #include <stdbool.h>
40
41 #include "collectd.h"
42 #include "common.h"
43 #include "plugin.h"
44
45 #include "daemon/utils_cache.h"
46 }
47
48 using collectd::Collectd;
49
50 using collectd::DispatchValuesRequest;
51 using collectd::DispatchValuesResponse;
52 using collectd::QueryValuesRequest;
53 using collectd::QueryValuesResponse;
54
55 using google::protobuf::util::TimeUtil;
56
57 /*
58  * private types
59  */
60
/* Description of one endpoint the gRPC server should listen on. */
struct Listener {
        /* Address part of the endpoint; joined with `port` as "addr:port". */
        grpc::string addr;
        /* Port (service) part of the endpoint. */
        grpc::string port;

        /* SSL credential options for this endpoint; a null pointer means the
         * endpoint uses insecure credentials. */
        grpc::SslServerCredentialsOptions *ssl;
};
/* Endpoints collected while parsing the plugin configuration. */
static std::vector<Listener> listeners;
/* Endpoint used by the server when `listeners` is empty. */
static grpc::string default_addr("0.0.0.0:50051");
69
70 /*
71  * helper functions
72  */
73
74 static bool ident_matches(const value_list_t *vl, const value_list_t *matcher)
75 {
76         if (fnmatch(matcher->host, vl->host, 0))
77                 return false;
78
79         if (fnmatch(matcher->plugin, vl->plugin, 0))
80                 return false;
81         if (fnmatch(matcher->plugin_instance, vl->plugin_instance, 0))
82                 return false;
83
84         if (fnmatch(matcher->type, vl->type, 0))
85                 return false;
86         if (fnmatch(matcher->type_instance, vl->type_instance, 0))
87                 return false;
88
89         return true;
90 } /* ident_matches */
91
92 static grpc::string read_file(const char *filename)
93 {
94         std::ifstream f;
95         grpc::string s, content;
96
97         f.open(filename);
98         if (!f.is_open()) {
99                 ERROR("grpc: Failed to open '%s'", filename);
100                 return "";
101         }
102
103         while (std::getline(f, s)) {
104                 content += s;
105                 content.push_back('\n');
106         }
107         f.close();
108         return content;
109 } /* read_file */
110
111 /*
112  * proto conversion
113  */
114
115 static void marshal_ident(const value_list_t *vl, collectd::types::Identifier *msg)
116 {
117         msg->set_host(vl->host);
118         msg->set_plugin(vl->plugin);
119         if (vl->plugin_instance[0] != '\0')
120                 msg->set_plugin_instance(vl->plugin_instance);
121         msg->set_type(vl->type);
122         if (vl->type_instance[0] != '\0')
123                 msg->set_type_instance(vl->type_instance);
124 } /* marshal_ident */
125
126 static grpc::Status unmarshal_ident(const collectd::types::Identifier &msg, value_list_t *vl,
127                 bool require_fields)
128 {
129         std::string s;
130
131         s = msg.host();
132         if (!s.length() && require_fields)
133                 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
134                                 grpc::string("missing host name"));
135         sstrncpy(vl->host, s.c_str(), sizeof(vl->host));
136
137         s = msg.plugin();
138         if (!s.length() && require_fields)
139                 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
140                                 grpc::string("missing plugin name"));
141         sstrncpy(vl->plugin, s.c_str(), sizeof(vl->plugin));
142
143         s = msg.type();
144         if (!s.length() && require_fields)
145                 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
146                                 grpc::string("missing type name"));
147         sstrncpy(vl->type, s.c_str(), sizeof(vl->type));
148
149         s = msg.plugin_instance();
150         sstrncpy(vl->plugin_instance, s.c_str(), sizeof(vl->plugin_instance));
151
152         s = msg.type_instance();
153         sstrncpy(vl->type_instance, s.c_str(), sizeof(vl->type_instance));
154
155         return grpc::Status::OK;
156 } /* unmarshal_ident() */
157
158 static grpc::Status marshal_value_list(const value_list_t *vl, collectd::types::ValueList *msg)
159 {
160         auto id = msg->mutable_identifier();
161         marshal_ident(vl, id);
162
163         auto ds = plugin_get_ds(vl->type);
164         if ((ds == NULL) || (ds->ds_num != vl->values_len)) {
165                 return grpc::Status(grpc::StatusCode::INTERNAL,
166                                 grpc::string("failed to retrieve data-set for values"));
167         }
168
169         auto t = TimeUtil::NanosecondsToTimestamp(CDTIME_T_TO_NS(vl->time));
170         auto d = TimeUtil::NanosecondsToDuration(CDTIME_T_TO_NS(vl->interval));
171         msg->set_allocated_time(new google::protobuf::Timestamp(t));
172         msg->set_allocated_interval(new google::protobuf::Duration(d));
173
174         for (size_t i = 0; i < vl->values_len; ++i) {
175                 auto v = msg->add_values();
176                 switch (ds->ds[i].type) {
177                         case DS_TYPE_COUNTER:
178                                 v->set_counter(vl->values[i].counter);
179                                 break;
180                         case DS_TYPE_GAUGE:
181                                 v->set_gauge(vl->values[i].gauge);
182                                 break;
183                         case DS_TYPE_DERIVE:
184                                 v->set_derive(vl->values[i].derive);
185                                 break;
186                         case DS_TYPE_ABSOLUTE:
187                                 v->set_absolute(vl->values[i].absolute);
188                                 break;
189                         default:
190                                 return grpc::Status(grpc::StatusCode::INTERNAL,
191                                                 grpc::string("unknown value type"));
192                 }
193
194                 auto name = msg->add_ds_names();
195                 name->assign(ds->ds[i].name);
196         }
197
198         return grpc::Status::OK;
199 } /* marshal_value_list */
200
201 static grpc::Status unmarshal_value_list(const collectd::types::ValueList &msg, value_list_t *vl)
202 {
203         vl->time = NS_TO_CDTIME_T(TimeUtil::TimestampToNanoseconds(msg.time()));
204         vl->interval = NS_TO_CDTIME_T(TimeUtil::DurationToNanoseconds(msg.interval()));
205
206         auto status = unmarshal_ident(msg.identifier(), vl, true);
207         if (!status.ok())
208                 return status;
209
210         value_t *values = NULL;
211         size_t values_len = 0;
212
213         status = grpc::Status::OK;
214         for (auto v : msg.values()) {
215                 value_t *val = (value_t *)realloc(values, (values_len + 1) * sizeof(*values));
216                 if (!val) {
217                         status = grpc::Status(grpc::StatusCode::RESOURCE_EXHAUSTED,
218                                         grpc::string("failed to allocate values array"));
219                         break;
220                 }
221
222                 values = val;
223                 val = values + values_len;
224                 values_len++;
225
226                 switch (v.value_case()) {
227                 case collectd::types::Value::ValueCase::kCounter:
228                         val->counter = counter_t(v.counter());
229                         break;
230                 case collectd::types::Value::ValueCase::kGauge:
231                         val->gauge = gauge_t(v.gauge());
232                         break;
233                 case collectd::types::Value::ValueCase::kDerive:
234                         val->derive = derive_t(v.derive());
235                         break;
236                 case collectd::types::Value::ValueCase::kAbsolute:
237                         val->absolute = absolute_t(v.absolute());
238                         break;
239                 default:
240                         status = grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
241                                         grpc::string("unknown value type"));
242                         break;
243                 }
244
245                 if (!status.ok())
246                         break;
247         }
248         if (status.ok()) {
249                 vl->values = values;
250                 vl->values_len = values_len;
251         }
252         else if (values) {
253                 free(values);
254         }
255
256         return status;
257 } /* unmarshal_value_list() */
258
259 /*
260  * Collectd service
261  */
262 class CollectdImpl : public collectd::Collectd::Service {
263 public:
264         grpc::Status QueryValues(grpc::ServerContext *ctx, QueryValuesRequest const *req, grpc::ServerWriter<QueryValuesResponse> *writer) override {
265                 value_list_t match;
266                 auto status = unmarshal_ident(req->identifier(), &match, false);
267                 if (!status.ok()) {
268                         return status;
269                 }
270
271                 std::queue<value_list_t> value_lists;
272                 status = this->queryValuesRead(&match, &value_lists);
273                 if (status.ok()) {
274                         status = this->queryValuesWrite(ctx, writer, &value_lists);
275                 }
276
277                 while (!value_lists.empty()) {
278                         auto vl = value_lists.front();
279                         value_lists.pop();
280                         sfree(vl.values);
281                 }
282
283                 return status;
284         }
285
286         grpc::Status DispatchValues(grpc::ServerContext *ctx,
287                                                                 grpc::ServerReader<DispatchValuesRequest> *reader,
288                                                                 DispatchValuesResponse *res) override {
289                 DispatchValuesRequest req;
290
291                 while (reader->Read(&req)) {
292                         value_list_t vl = VALUE_LIST_INIT;
293                         auto status = unmarshal_value_list(req.value_list(), &vl);
294                         if (!status.ok())
295                                 return status;
296
297                         if (plugin_dispatch_values(&vl))
298                                 return grpc::Status(grpc::StatusCode::INTERNAL,
299                                                                         grpc::string("failed to enqueue values for writing"));
300                 }
301
302                 res->Clear();
303                 return grpc::Status::OK;
304         }
305
306 private:
307         grpc::Status queryValuesRead(value_list_t const *match, std::queue<value_list_t> *value_lists) {
308                 uc_iter_t *iter;
309                 if ((iter = uc_get_iterator()) == NULL) {
310                         return grpc::Status(grpc::StatusCode::INTERNAL,
311                                                                 grpc::string("failed to query values: cannot create iterator"));
312                 }
313
314                 grpc::Status status = grpc::Status::OK;
315                 char *name = NULL;
316                 while (uc_iterator_next(iter, &name) == 0) {
317                         value_list_t vl;
318                         if (parse_identifier_vl(name, &vl) != 0) {
319                                 status = grpc::Status(grpc::StatusCode::INTERNAL,
320                                                                           grpc::string("failed to parse identifier"));
321                                 break;
322                         }
323
324                         if (!ident_matches(&vl, match))
325                                 continue;
326
327                         if (uc_iterator_get_time(iter, &vl.time) < 0) {
328                                 status = grpc::Status(grpc::StatusCode::INTERNAL,
329                                                                           grpc::string("failed to retrieve value timestamp"));
330                                 break;
331                         }
332                         if (uc_iterator_get_interval(iter, &vl.interval) < 0) {
333                                 status = grpc::Status(grpc::StatusCode::INTERNAL,
334                                                                           grpc::string("failed to retrieve value interval"));
335                                 break;
336                         }
337                         if (uc_iterator_get_values(iter, &vl.values, &vl.values_len) < 0) {
338                                 status = grpc::Status(grpc::StatusCode::INTERNAL,
339                                                                           grpc::string("failed to retrieve values"));
340                                 break;
341                         }
342
343                         value_lists->push(vl);
344                 } // while (uc_iterator_next(iter, &name) == 0)
345
346                 uc_iterator_destroy(iter);
347                 return status;
348         }
349
350         grpc::Status queryValuesWrite(grpc::ServerContext *ctx,
351                                            grpc::ServerWriter<QueryValuesResponse> *writer,
352                                            std::queue<value_list_t> *value_lists) {
353                 while (!value_lists->empty()) {
354                         auto vl = value_lists->front();
355                         QueryValuesResponse res;
356                         res.Clear();
357
358                         auto status = marshal_value_list(&vl, res.mutable_value_list());
359                         if (!status.ok()) {
360                                 return status;
361                         }
362
363                         if (!writer->Write(res)) {
364                                 return grpc::Status::CANCELLED;
365                         }
366
367                         value_lists->pop();
368                         sfree(vl.values);
369                 }
370
371                 return grpc::Status::OK;
372         }
373 };
374
375 /*
376  * gRPC server implementation
377  */
378 class CollectdServer final
379 {
380 public:
381         void Start()
382         {
383                 auto auth = grpc::InsecureServerCredentials();
384
385                 grpc::ServerBuilder builder;
386
387                 if (listeners.empty()) {
388                         builder.AddListeningPort(default_addr, auth);
389                         INFO("grpc: Listening on %s", default_addr.c_str());
390                 }
391                 else {
392                         for (auto l : listeners) {
393                                 grpc::string addr = l.addr + ":" + l.port;
394
395                                 auto use_ssl = grpc::string("");
396                                 auto a = auth;
397                                 if (l.ssl != nullptr) {
398                                         use_ssl = grpc::string(" (SSL enabled)");
399                                         a = grpc::SslServerCredentials(*l.ssl);
400                                 }
401
402                                 builder.AddListeningPort(addr, a);
403                                 INFO("grpc: Listening on %s%s", addr.c_str(), use_ssl.c_str());
404                         }
405                 }
406
407                 builder.RegisterService(&collectd_service_);
408
409                 server_ = builder.BuildAndStart();
410         } /* Start() */
411
412         void Shutdown()
413         {
414                 server_->Shutdown();
415         } /* Shutdown() */
416
417 private:
418         CollectdImpl collectd_service_;
419
420         std::unique_ptr<grpc::Server> server_;
421 }; /* class CollectdServer */
422
423 class CollectdClient final
424 {
425 public:
426         CollectdClient(std::shared_ptr<grpc::ChannelInterface> channel) : stub_(Collectd::NewStub(channel)) {
427         }
428
429         int DispatchValues(value_list_t const *vl) {
430                 grpc::ClientContext ctx;
431
432                 DispatchValuesRequest req;
433                 auto status = marshal_value_list(vl, req.mutable_value_list());
434                 if (!status.ok()) {
435                         ERROR("grpc: Marshalling value_list_t failed.");
436                         return -1;
437                 }
438
439                 DispatchValuesResponse res;
440                 auto stream = stub_->DispatchValues(&ctx, &res);
441                 if (!stream->Write(req)) {
442                         NOTICE("grpc: Broken stream.");
443                         /* intentionally not returning. */
444                 }
445
446                 stream->WritesDone();
447                 status = stream->Finish();
448                 if (!status.ok()) {
449                         ERROR ("grpc: Error while closing stream.");
450                         return -1;
451                 }
452
453                 return 0;
454         } /* int DispatchValues */
455
456 private:
457         std::unique_ptr<Collectd::Stub> stub_;
458 };
459
/* The plugin's server instance; created by the init callback and destroyed
 * (and reset to nullptr) by the shutdown callback. */
static CollectdServer *server = nullptr;
461
462 /*
463  * collectd plugin interface
464  */
465 extern "C" {
466         static void c_grpc_destroy_write_callback (void *ptr) {
467                 delete (CollectdClient *) ptr;
468         }
469
470         static int c_grpc_write(__attribute__((unused)) data_set_t const *ds,
471                         value_list_t const *vl,
472                         user_data_t *ud) {
473                 CollectdClient *c = (CollectdClient *) ud->data;
474                 return c->DispatchValues(vl);
475         }
476
477         static int c_grpc_config_listen(oconfig_item_t *ci)
478         {
479                 if ((ci->values_num != 2)
480                                 || (ci->values[0].type != OCONFIG_TYPE_STRING)
481                                 || (ci->values[1].type != OCONFIG_TYPE_STRING)) {
482                         ERROR("grpc: The `%s` config option needs exactly "
483                                         "two string argument (address and port).", ci->key);
484                         return -1;
485                 }
486
487                 auto listener = Listener();
488                 listener.addr = grpc::string(ci->values[0].value.string);
489                 listener.port = grpc::string(ci->values[1].value.string);
490                 listener.ssl = nullptr;
491
492                 auto ssl_opts = new(grpc::SslServerCredentialsOptions);
493                 grpc::SslServerCredentialsOptions::PemKeyCertPair pkcp = {};
494                 bool use_ssl = false;
495
496                 for (int i = 0; i < ci->children_num; i++) {
497                         oconfig_item_t *child = ci->children + i;
498
499                         if (!strcasecmp("EnableSSL", child->key)) {
500                                 if (cf_util_get_boolean(child, &use_ssl)) {
501                                         ERROR("grpc: Option `%s` expects a boolean value",
502                                                         child->key);
503                                         return -1;
504                                 }
505                         }
506                         else if (!strcasecmp("SSLCACertificateFile", child->key)) {
507                                 char *certs = NULL;
508                                 if (cf_util_get_string(child, &certs)) {
509                                         ERROR("grpc: Option `%s` expects a string value",
510                                                         child->key);
511                                         return -1;
512                                 }
513                                 ssl_opts->pem_root_certs = read_file(certs);
514                         }
515                         else if (!strcasecmp("SSLCertificateKeyFile", child->key)) {
516                                 char *key = NULL;
517                                 if (cf_util_get_string(child, &key)) {
518                                         ERROR("grpc: Option `%s` expects a string value",
519                                                         child->key);
520                                         return -1;
521                                 }
522                                 pkcp.private_key = read_file(key);
523                         }
524                         else if (!strcasecmp("SSLCertificateFile", child->key)) {
525                                 char *cert = NULL;
526                                 if (cf_util_get_string(child, &cert)) {
527                                         ERROR("grpc: Option `%s` expects a string value",
528                                                         child->key);
529                                         return -1;
530                                 }
531                                 pkcp.cert_chain = read_file(cert);
532                         }
533                         else {
534                                 WARNING("grpc: Option `%s` not allowed in <%s> block.",
535                                                 child->key, ci->key);
536                         }
537                 }
538
539                 ssl_opts->pem_key_cert_pairs.push_back(pkcp);
540                 if (use_ssl)
541                         listener.ssl = ssl_opts;
542                 else
543                         delete(ssl_opts);
544
545                 listeners.push_back(listener);
546                 return 0;
547         } /* c_grpc_config_listen() */
548
549         static int c_grpc_config_server(oconfig_item_t *ci)
550         {
551                 if ((ci->values_num != 2)
552                                 || (ci->values[0].type != OCONFIG_TYPE_STRING)
553                                 || (ci->values[1].type != OCONFIG_TYPE_STRING)) {
554                         ERROR("grpc: The `%s` config option needs exactly "
555                                         "two string argument (address and port).", ci->key);
556                         return -1;
557                 }
558
559                 grpc::SslCredentialsOptions ssl_opts;
560                 bool use_ssl = false;
561
562                 for (int i = 0; i < ci->children_num; i++) {
563                         oconfig_item_t *child = ci->children + i;
564
565                         if (!strcasecmp("EnableSSL", child->key)) {
566                                 if (cf_util_get_boolean(child, &use_ssl)) {
567                                         return -1;
568                                 }
569                         }
570                         else if (!strcasecmp("SSLCACertificateFile", child->key)) {
571                                 char *certs = NULL;
572                                 if (cf_util_get_string(child, &certs)) {
573                                         return -1;
574                                 }
575                                 ssl_opts.pem_root_certs = read_file(certs);
576                         }
577                         else if (!strcasecmp("SSLCertificateKeyFile", child->key)) {
578                                 char *key = NULL;
579                                 if (cf_util_get_string(child, &key)) {
580                                         return -1;
581                                 }
582                                 ssl_opts.pem_private_key = read_file(key);
583                         }
584                         else if (!strcasecmp("SSLCertificateFile", child->key)) {
585                                 char *cert = NULL;
586                                 if (cf_util_get_string(child, &cert)) {
587                                         return -1;
588                                 }
589                                 ssl_opts.pem_cert_chain = read_file(cert);
590                         }
591                         else {
592                                 WARNING("grpc: Option `%s` not allowed in <%s> block.",
593                                                 child->key, ci->key);
594                         }
595                 }
596
597                 auto node    = grpc::string(ci->values[0].value.string);
598                 auto service = grpc::string(ci->values[1].value.string);
599                 auto addr    = node + ":" + service;
600
601                 CollectdClient *client;
602                 if (use_ssl) {
603                         auto channel_creds = grpc::SslCredentials(ssl_opts);
604                         auto channel = grpc::CreateChannel(addr, channel_creds);
605                         client = new CollectdClient(channel);
606                 } else {
607                         auto channel = grpc::CreateChannel(addr, grpc::InsecureChannelCredentials());
608                         client = new CollectdClient(channel);
609                 }
610
611                 auto callback_name = grpc::string("grpc/") + addr;
612                 user_data_t ud = {
613                         .data = client,
614                         .free_func = c_grpc_destroy_write_callback,
615                 };
616
617                 plugin_register_write (callback_name.c_str(), c_grpc_write, &ud);
618                 return 0;
619         } /* c_grpc_config_server() */
620
621         static int c_grpc_config(oconfig_item_t *ci)
622         {
623                 int i;
624
625                 for (i = 0; i < ci->children_num; i++) {
626                         oconfig_item_t *child = ci->children + i;
627
628                         if (!strcasecmp("Listen", child->key)) {
629                                 if (c_grpc_config_listen(child))
630                                         return -1;
631                         }
632                         else if (!strcasecmp("Server", child->key)) {
633                                 if (c_grpc_config_server(child))
634                                         return -1;
635                         }
636
637                         else {
638                                 WARNING("grpc: Option `%s` not allowed here.", child->key);
639                         }
640                 }
641
642                 return 0;
643         } /* c_grpc_config() */
644
645         static int c_grpc_init(void)
646         {
647                 server = new CollectdServer();
648                 if (!server) {
649                         ERROR("grpc: Failed to create server");
650                         return -1;
651                 }
652
653                 server->Start();
654                 return 0;
655         } /* c_grpc_init() */
656
657         static int c_grpc_shutdown(void)
658         {
659                 if (!server)
660                         return 0;
661
662                 server->Shutdown();
663
664                 delete server;
665                 server = nullptr;
666
667                 return 0;
668         } /* c_grpc_shutdown() */
669
670         void module_register(void)
671         {
672                 plugin_register_complex_config("grpc", c_grpc_config);
673                 plugin_register_init("grpc", c_grpc_init);
674                 plugin_register_shutdown("grpc", c_grpc_shutdown);
675         } /* module_register() */
676 } /* extern "C" */
677
678 /* vim: set sw=4 ts=4 tw=78 noexpandtab : */