Merge branch 'collectd-5.6' into collectd-5.7
[collectd.git] / src / grpc.cc
1 /**
2  * collectd - src/grpc.cc
3  * Copyright (C) 2015-2016 Sebastian Harl
4  * Copyright (C) 2016      Florian octo Forster
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining a
7  * copy of this software and associated documentation files (the "Software"),
8  * to deal in the Software without restriction, including without limitation
9  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
10  * and/or sell copies of the Software, and to permit persons to whom the
11  * Software is furnished to do so, subject to the following conditions:
12  *
13  * The above copyright notice and this permission notice shall be included in
14  * all copies or substantial portions of the Software.
15  *
16  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
22  * DEALINGS IN THE SOFTWARE.
23  *
24  * Authors:
25  *   Sebastian Harl <sh at tokkee.org>
26  *   Florian octo Forster <octo at collectd.org>
27  **/
28
29 #include <grpc++/grpc++.h>
30 #include <google/protobuf/util/time_util.h>
31
32 #include <fstream>
33 #include <iostream>
34 #include <queue>
35 #include <vector>
36
37 #include "collectd.grpc.pb.h"
38
39 extern "C" {
40 #include <fnmatch.h>
41 #include <stdbool.h>
42
43 #include "collectd.h"
44 #include "common.h"
45 #include "plugin.h"
46
47 #include "daemon/utils_cache.h"
48 }
49
50 using collectd::Collectd;
51
52 using collectd::PutValuesRequest;
53 using collectd::PutValuesResponse;
54 using collectd::QueryValuesRequest;
55 using collectd::QueryValuesResponse;
56
57 using google::protobuf::util::TimeUtil;
58
59 /*
60  * private types
61  */
62
63 struct Listener {
64         grpc::string addr;
65         grpc::string port;
66
67         grpc::SslServerCredentialsOptions *ssl;
68 };
69 static std::vector<Listener> listeners;
70 static grpc::string default_addr("0.0.0.0:50051");
71
72 /*
73  * helper functions
74  */
75
76 static bool ident_matches(const value_list_t *vl, const value_list_t *matcher)
77 {
78         if (fnmatch(matcher->host, vl->host, 0))
79                 return false;
80
81         if (fnmatch(matcher->plugin, vl->plugin, 0))
82                 return false;
83         if (fnmatch(matcher->plugin_instance, vl->plugin_instance, 0))
84                 return false;
85
86         if (fnmatch(matcher->type, vl->type, 0))
87                 return false;
88         if (fnmatch(matcher->type_instance, vl->type_instance, 0))
89                 return false;
90
91         return true;
92 } /* ident_matches */
93
94 static grpc::string read_file(const char *filename)
95 {
96         std::ifstream f;
97         grpc::string s, content;
98
99         f.open(filename);
100         if (!f.is_open()) {
101                 ERROR("grpc: Failed to open '%s'", filename);
102                 return "";
103         }
104
105         while (std::getline(f, s)) {
106                 content += s;
107                 content.push_back('\n');
108         }
109         f.close();
110         return content;
111 } /* read_file */
112
113 /*
114  * proto conversion
115  */
116
117 static void marshal_ident(const value_list_t *vl, collectd::types::Identifier *msg)
118 {
119         msg->set_host(vl->host);
120         msg->set_plugin(vl->plugin);
121         if (vl->plugin_instance[0] != '\0')
122                 msg->set_plugin_instance(vl->plugin_instance);
123         msg->set_type(vl->type);
124         if (vl->type_instance[0] != '\0')
125                 msg->set_type_instance(vl->type_instance);
126 } /* marshal_ident */
127
128 static grpc::Status unmarshal_ident(const collectd::types::Identifier &msg, value_list_t *vl,
129                 bool require_fields)
130 {
131         std::string s;
132
133         s = msg.host();
134         if (!s.length() && require_fields)
135                 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
136                                 grpc::string("missing host name"));
137         sstrncpy(vl->host, s.c_str(), sizeof(vl->host));
138
139         s = msg.plugin();
140         if (!s.length() && require_fields)
141                 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
142                                 grpc::string("missing plugin name"));
143         sstrncpy(vl->plugin, s.c_str(), sizeof(vl->plugin));
144
145         s = msg.type();
146         if (!s.length() && require_fields)
147                 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
148                                 grpc::string("missing type name"));
149         sstrncpy(vl->type, s.c_str(), sizeof(vl->type));
150
151         s = msg.plugin_instance();
152         sstrncpy(vl->plugin_instance, s.c_str(), sizeof(vl->plugin_instance));
153
154         s = msg.type_instance();
155         sstrncpy(vl->type_instance, s.c_str(), sizeof(vl->type_instance));
156
157         return grpc::Status::OK;
158 } /* unmarshal_ident() */
159
160 static grpc::Status marshal_value_list(const value_list_t *vl, collectd::types::ValueList *msg)
161 {
162         auto id = msg->mutable_identifier();
163         marshal_ident(vl, id);
164
165         auto ds = plugin_get_ds(vl->type);
166         if ((ds == NULL) || (ds->ds_num != vl->values_len)) {
167                 return grpc::Status(grpc::StatusCode::INTERNAL,
168                                 grpc::string("failed to retrieve data-set for values"));
169         }
170
171         auto t = TimeUtil::NanosecondsToTimestamp(CDTIME_T_TO_NS(vl->time));
172         auto d = TimeUtil::NanosecondsToDuration(CDTIME_T_TO_NS(vl->interval));
173         msg->set_allocated_time(new google::protobuf::Timestamp(t));
174         msg->set_allocated_interval(new google::protobuf::Duration(d));
175
176         for (size_t i = 0; i < vl->values_len; ++i) {
177                 auto v = msg->add_values();
178                 switch (ds->ds[i].type) {
179                         case DS_TYPE_COUNTER:
180                                 v->set_counter(vl->values[i].counter);
181                                 break;
182                         case DS_TYPE_GAUGE:
183                                 v->set_gauge(vl->values[i].gauge);
184                                 break;
185                         case DS_TYPE_DERIVE:
186                                 v->set_derive(vl->values[i].derive);
187                                 break;
188                         case DS_TYPE_ABSOLUTE:
189                                 v->set_absolute(vl->values[i].absolute);
190                                 break;
191                         default:
192                                 return grpc::Status(grpc::StatusCode::INTERNAL,
193                                                 grpc::string("unknown value type"));
194                 }
195
196                 auto name = msg->add_ds_names();
197                 name->assign(ds->ds[i].name);
198         }
199
200         return grpc::Status::OK;
201 } /* marshal_value_list */
202
203 static grpc::Status unmarshal_value_list(const collectd::types::ValueList &msg, value_list_t *vl)
204 {
205         vl->time = NS_TO_CDTIME_T(TimeUtil::TimestampToNanoseconds(msg.time()));
206         vl->interval = NS_TO_CDTIME_T(TimeUtil::DurationToNanoseconds(msg.interval()));
207
208         auto status = unmarshal_ident(msg.identifier(), vl, true);
209         if (!status.ok())
210                 return status;
211
212         value_t *values = NULL;
213         size_t values_len = 0;
214
215         status = grpc::Status::OK;
216         for (auto v : msg.values()) {
217                 value_t *val = (value_t *)realloc(values, (values_len + 1) * sizeof(*values));
218                 if (!val) {
219                         status = grpc::Status(grpc::StatusCode::RESOURCE_EXHAUSTED,
220                                         grpc::string("failed to allocate values array"));
221                         break;
222                 }
223
224                 values = val;
225                 val = values + values_len;
226                 values_len++;
227
228                 switch (v.value_case()) {
229                 case collectd::types::Value::ValueCase::kCounter:
230                         val->counter = counter_t(v.counter());
231                         break;
232                 case collectd::types::Value::ValueCase::kGauge:
233                         val->gauge = gauge_t(v.gauge());
234                         break;
235                 case collectd::types::Value::ValueCase::kDerive:
236                         val->derive = derive_t(v.derive());
237                         break;
238                 case collectd::types::Value::ValueCase::kAbsolute:
239                         val->absolute = absolute_t(v.absolute());
240                         break;
241                 default:
242                         status = grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
243                                         grpc::string("unknown value type"));
244                         break;
245                 }
246
247                 if (!status.ok())
248                         break;
249         }
250         if (status.ok()) {
251                 vl->values = values;
252                 vl->values_len = values_len;
253         }
254         else if (values) {
255                 free(values);
256         }
257
258         return status;
259 } /* unmarshal_value_list() */
260
261 /*
262  * Collectd service
263  */
264 class CollectdImpl : public collectd::Collectd::Service {
265 public:
266         grpc::Status QueryValues(grpc::ServerContext *ctx, QueryValuesRequest const *req, grpc::ServerWriter<QueryValuesResponse> *writer) override {
267                 value_list_t match;
268                 auto status = unmarshal_ident(req->identifier(), &match, false);
269                 if (!status.ok()) {
270                         return status;
271                 }
272
273                 std::queue<value_list_t> value_lists;
274                 status = this->queryValuesRead(&match, &value_lists);
275                 if (status.ok()) {
276                         status = this->queryValuesWrite(ctx, writer, &value_lists);
277                 }
278
279                 while (!value_lists.empty()) {
280                         auto vl = value_lists.front();
281                         value_lists.pop();
282                         sfree(vl.values);
283                 }
284
285                 return status;
286         }
287
288         grpc::Status PutValues(grpc::ServerContext *ctx,
289                                                    grpc::ServerReader<PutValuesRequest> *reader,
290                                                    PutValuesResponse *res) override {
291                 PutValuesRequest req;
292
293                 while (reader->Read(&req)) {
294                         value_list_t vl = {0};
295                         auto status = unmarshal_value_list(req.value_list(), &vl);
296                         if (!status.ok())
297                                 return status;
298
299                         if (plugin_dispatch_values(&vl))
300                                 return grpc::Status(grpc::StatusCode::INTERNAL,
301                                                                         grpc::string("failed to enqueue values for writing"));
302                 }
303
304                 res->Clear();
305                 return grpc::Status::OK;
306         }
307
308 private:
309         grpc::Status queryValuesRead(value_list_t const *match, std::queue<value_list_t> *value_lists) {
310                 uc_iter_t *iter;
311                 if ((iter = uc_get_iterator()) == NULL) {
312                         return grpc::Status(grpc::StatusCode::INTERNAL,
313                                                                 grpc::string("failed to query values: cannot create iterator"));
314                 }
315
316                 grpc::Status status = grpc::Status::OK;
317                 char *name = NULL;
318                 while (uc_iterator_next(iter, &name) == 0) {
319                         value_list_t vl;
320                         if (parse_identifier_vl(name, &vl) != 0) {
321                                 status = grpc::Status(grpc::StatusCode::INTERNAL,
322                                                                           grpc::string("failed to parse identifier"));
323                                 break;
324                         }
325
326                         if (!ident_matches(&vl, match))
327                                 continue;
328
329                         if (uc_iterator_get_time(iter, &vl.time) < 0) {
330                                 status = grpc::Status(grpc::StatusCode::INTERNAL,
331                                                                           grpc::string("failed to retrieve value timestamp"));
332                                 break;
333                         }
334                         if (uc_iterator_get_interval(iter, &vl.interval) < 0) {
335                                 status = grpc::Status(grpc::StatusCode::INTERNAL,
336                                                                           grpc::string("failed to retrieve value interval"));
337                                 break;
338                         }
339                         if (uc_iterator_get_values(iter, &vl.values, &vl.values_len) < 0) {
340                                 status = grpc::Status(grpc::StatusCode::INTERNAL,
341                                                                           grpc::string("failed to retrieve values"));
342                                 break;
343                         }
344
345                         value_lists->push(vl);
346                 } // while (uc_iterator_next(iter, &name) == 0)
347
348                 uc_iterator_destroy(iter);
349                 return status;
350         }
351
352         grpc::Status queryValuesWrite(grpc::ServerContext *ctx,
353                                            grpc::ServerWriter<QueryValuesResponse> *writer,
354                                            std::queue<value_list_t> *value_lists) {
355                 while (!value_lists->empty()) {
356                         auto vl = value_lists->front();
357                         QueryValuesResponse res;
358                         res.Clear();
359
360                         auto status = marshal_value_list(&vl, res.mutable_value_list());
361                         if (!status.ok()) {
362                                 return status;
363                         }
364
365                         if (!writer->Write(res)) {
366                                 return grpc::Status::CANCELLED;
367                         }
368
369                         value_lists->pop();
370                         sfree(vl.values);
371                 }
372
373                 return grpc::Status::OK;
374         }
375 };
376
377 /*
378  * gRPC server implementation
379  */
380 class CollectdServer final
381 {
382 public:
383         void Start()
384         {
385                 auto auth = grpc::InsecureServerCredentials();
386
387                 grpc::ServerBuilder builder;
388
389                 if (listeners.empty()) {
390                         builder.AddListeningPort(default_addr, auth);
391                         INFO("grpc: Listening on %s", default_addr.c_str());
392                 }
393                 else {
394                         for (auto l : listeners) {
395                                 grpc::string addr = l.addr + ":" + l.port;
396
397                                 auto use_ssl = grpc::string("");
398                                 auto a = auth;
399                                 if (l.ssl != nullptr) {
400                                         use_ssl = grpc::string(" (SSL enabled)");
401                                         a = grpc::SslServerCredentials(*l.ssl);
402                                 }
403
404                                 builder.AddListeningPort(addr, a);
405                                 INFO("grpc: Listening on %s%s", addr.c_str(), use_ssl.c_str());
406                         }
407                 }
408
409                 builder.RegisterService(&collectd_service_);
410
411                 server_ = builder.BuildAndStart();
412         } /* Start() */
413
414         void Shutdown()
415         {
416                 server_->Shutdown();
417         } /* Shutdown() */
418
419 private:
420         CollectdImpl collectd_service_;
421
422         std::unique_ptr<grpc::Server> server_;
423 }; /* class CollectdServer */
424
425 class CollectdClient final
426 {
427 public:
428         CollectdClient(std::shared_ptr<grpc::ChannelInterface> channel) : stub_(Collectd::NewStub(channel)) {
429         }
430
431         int PutValues(value_list_t const *vl) {
432                 grpc::ClientContext ctx;
433
434                 PutValuesRequest req;
435                 auto status = marshal_value_list(vl, req.mutable_value_list());
436                 if (!status.ok()) {
437                         ERROR("grpc: Marshalling value_list_t failed.");
438                         return -1;
439                 }
440
441                 PutValuesResponse res;
442                 auto stream = stub_->PutValues(&ctx, &res);
443                 if (!stream->Write(req)) {
444                         NOTICE("grpc: Broken stream.");
445                         /* intentionally not returning. */
446                 }
447
448                 stream->WritesDone();
449                 status = stream->Finish();
450                 if (!status.ok()) {
451                         ERROR ("grpc: Error while closing stream.");
452                         return -1;
453                 }
454
455                 return 0;
456         } /* int PutValues */
457
458 private:
459         std::unique_ptr<Collectd::Stub> stub_;
460 };
461
462 static CollectdServer *server = nullptr;
463
464 /*
465  * collectd plugin interface
466  */
467 extern "C" {
468         static void c_grpc_destroy_write_callback (void *ptr) {
469                 delete (CollectdClient *) ptr;
470         }
471
472         static int c_grpc_write(__attribute__((unused)) data_set_t const *ds,
473                         value_list_t const *vl,
474                         user_data_t *ud) {
475                 CollectdClient *c = (CollectdClient *) ud->data;
476                 return c->PutValues(vl);
477         }
478
479         static int c_grpc_config_listen(oconfig_item_t *ci)
480         {
481                 if ((ci->values_num != 2)
482                                 || (ci->values[0].type != OCONFIG_TYPE_STRING)
483                                 || (ci->values[1].type != OCONFIG_TYPE_STRING)) {
484                         ERROR("grpc: The `%s` config option needs exactly "
485                                         "two string argument (address and port).", ci->key);
486                         return -1;
487                 }
488
489                 auto listener = Listener();
490                 listener.addr = grpc::string(ci->values[0].value.string);
491                 listener.port = grpc::string(ci->values[1].value.string);
492                 listener.ssl = nullptr;
493
494                 auto ssl_opts = new(grpc::SslServerCredentialsOptions);
495                 grpc::SslServerCredentialsOptions::PemKeyCertPair pkcp = {};
496                 bool use_ssl = false;
497
498                 for (int i = 0; i < ci->children_num; i++) {
499                         oconfig_item_t *child = ci->children + i;
500
501                         if (!strcasecmp("EnableSSL", child->key)) {
502                                 if (cf_util_get_boolean(child, &use_ssl)) {
503                                         ERROR("grpc: Option `%s` expects a boolean value",
504                                                         child->key);
505                                         return -1;
506                                 }
507                         }
508                         else if (!strcasecmp("SSLCACertificateFile", child->key)) {
509                                 char *certs = NULL;
510                                 if (cf_util_get_string(child, &certs)) {
511                                         ERROR("grpc: Option `%s` expects a string value",
512                                                         child->key);
513                                         return -1;
514                                 }
515                                 ssl_opts->pem_root_certs = read_file(certs);
516                         }
517                         else if (!strcasecmp("SSLCertificateKeyFile", child->key)) {
518                                 char *key = NULL;
519                                 if (cf_util_get_string(child, &key)) {
520                                         ERROR("grpc: Option `%s` expects a string value",
521                                                         child->key);
522                                         return -1;
523                                 }
524                                 pkcp.private_key = read_file(key);
525                         }
526                         else if (!strcasecmp("SSLCertificateFile", child->key)) {
527                                 char *cert = NULL;
528                                 if (cf_util_get_string(child, &cert)) {
529                                         ERROR("grpc: Option `%s` expects a string value",
530                                                         child->key);
531                                         return -1;
532                                 }
533                                 pkcp.cert_chain = read_file(cert);
534                         }
535                         else {
536                                 WARNING("grpc: Option `%s` not allowed in <%s> block.",
537                                                 child->key, ci->key);
538                         }
539                 }
540
541                 ssl_opts->pem_key_cert_pairs.push_back(pkcp);
542                 if (use_ssl)
543                         listener.ssl = ssl_opts;
544                 else
545                         delete(ssl_opts);
546
547                 listeners.push_back(listener);
548                 return 0;
549         } /* c_grpc_config_listen() */
550
551         static int c_grpc_config_server(oconfig_item_t *ci)
552         {
553                 if ((ci->values_num != 2)
554                                 || (ci->values[0].type != OCONFIG_TYPE_STRING)
555                                 || (ci->values[1].type != OCONFIG_TYPE_STRING)) {
556                         ERROR("grpc: The `%s` config option needs exactly "
557                                         "two string argument (address and port).", ci->key);
558                         return -1;
559                 }
560
561                 grpc::SslCredentialsOptions ssl_opts;
562                 bool use_ssl = false;
563
564                 for (int i = 0; i < ci->children_num; i++) {
565                         oconfig_item_t *child = ci->children + i;
566
567                         if (!strcasecmp("EnableSSL", child->key)) {
568                                 if (cf_util_get_boolean(child, &use_ssl)) {
569                                         return -1;
570                                 }
571                         }
572                         else if (!strcasecmp("SSLCACertificateFile", child->key)) {
573                                 char *certs = NULL;
574                                 if (cf_util_get_string(child, &certs)) {
575                                         return -1;
576                                 }
577                                 ssl_opts.pem_root_certs = read_file(certs);
578                         }
579                         else if (!strcasecmp("SSLCertificateKeyFile", child->key)) {
580                                 char *key = NULL;
581                                 if (cf_util_get_string(child, &key)) {
582                                         return -1;
583                                 }
584                                 ssl_opts.pem_private_key = read_file(key);
585                         }
586                         else if (!strcasecmp("SSLCertificateFile", child->key)) {
587                                 char *cert = NULL;
588                                 if (cf_util_get_string(child, &cert)) {
589                                         return -1;
590                                 }
591                                 ssl_opts.pem_cert_chain = read_file(cert);
592                         }
593                         else {
594                                 WARNING("grpc: Option `%s` not allowed in <%s> block.",
595                                                 child->key, ci->key);
596                         }
597                 }
598
599                 auto node    = grpc::string(ci->values[0].value.string);
600                 auto service = grpc::string(ci->values[1].value.string);
601                 auto addr    = node + ":" + service;
602
603                 CollectdClient *client;
604                 if (use_ssl) {
605                         auto channel_creds = grpc::SslCredentials(ssl_opts);
606                         auto channel = grpc::CreateChannel(addr, channel_creds);
607                         client = new CollectdClient(channel);
608                 } else {
609                         auto channel = grpc::CreateChannel(addr, grpc::InsecureChannelCredentials());
610                         client = new CollectdClient(channel);
611                 }
612
613                 auto callback_name = grpc::string("grpc/") + addr;
614                 user_data_t ud = {
615                         .data = client,
616                         .free_func = c_grpc_destroy_write_callback,
617                 };
618
619                 plugin_register_write (callback_name.c_str(), c_grpc_write, &ud);
620                 return 0;
621         } /* c_grpc_config_server() */
622
623         static int c_grpc_config(oconfig_item_t *ci)
624         {
625                 int i;
626
627                 for (i = 0; i < ci->children_num; i++) {
628                         oconfig_item_t *child = ci->children + i;
629
630                         if (!strcasecmp("Listen", child->key)) {
631                                 if (c_grpc_config_listen(child))
632                                         return -1;
633                         }
634                         else if (!strcasecmp("Server", child->key)) {
635                                 if (c_grpc_config_server(child))
636                                         return -1;
637                         }
638
639                         else {
640                                 WARNING("grpc: Option `%s` not allowed here.", child->key);
641                         }
642                 }
643
644                 return 0;
645         } /* c_grpc_config() */
646
647         static int c_grpc_init(void)
648         {
649                 server = new CollectdServer();
650                 if (!server) {
651                         ERROR("grpc: Failed to create server");
652                         return -1;
653                 }
654
655                 server->Start();
656                 return 0;
657         } /* c_grpc_init() */
658
659         static int c_grpc_shutdown(void)
660         {
661                 if (!server)
662                         return 0;
663
664                 server->Shutdown();
665
666                 delete server;
667                 server = nullptr;
668
669                 return 0;
670         } /* c_grpc_shutdown() */
671
672         void module_register(void)
673         {
674                 plugin_register_complex_config("grpc", c_grpc_config);
675                 plugin_register_init("grpc", c_grpc_init);
676                 plugin_register_shutdown("grpc", c_grpc_shutdown);
677         } /* module_register() */
678 } /* extern "C" */
679
680 /* vim: set sw=4 ts=4 tw=78 noexpandtab : */