grpc plugin: Turn QueryValues into a server-side streaming RPC.
[collectd.git] / src / grpc.cc
1 /**
2  * collectd - src/grpc.cc
3  * Copyright (C) 2015-2016 Sebastian Harl
4  *
5  * Permission is hereby granted, free of charge, to any person obtaining a
6  * copy of this software and associated documentation files (the "Software"),
7  * to deal in the Software without restriction, including without limitation
8  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
9  * and/or sell copies of the Software, and to permit persons to whom the
10  * Software is furnished to do so, subject to the following conditions:
11  *
12  * The above copyright notice and this permission notice shall be included in
13  * all copies or substantial portions of the Software.
14  *
15  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
21  * DEALINGS IN THE SOFTWARE.
22  *
23  * Authors:
24  *   Sebastian Harl <sh at tokkee.org>
25  **/
26
27 #include <grpc++/grpc++.h>
28 #include <google/protobuf/util/time_util.h>
29
30 #include <fstream>
31 #include <iostream>
32 #include <queue>
33 #include <vector>
34
35 #include "collectd.grpc.pb.h"
36
37 extern "C" {
38 #include <fnmatch.h>
39 #include <stdbool.h>
40
41 #include "collectd.h"
42 #include "common.h"
43 #include "configfile.h"
44 #include "plugin.h"
45
46 #include "daemon/utils_cache.h"
47 }
48
49 using collectd::Collectd;
50 using collectd::Dispatch;
51
52 using collectd::DispatchValuesRequest;
53 using collectd::DispatchValuesResponse;
54 using collectd::QueryValuesRequest;
55 using collectd::QueryValuesResponse;
56
57 using google::protobuf::util::TimeUtil;
58
59 /*
60  * private types
61  */
62
63 struct Listener {
64         grpc::string addr;
65         grpc::string port;
66
67         grpc::SslServerCredentialsOptions *ssl;
68 };
69 static std::vector<Listener> listeners;
70 static grpc::string default_addr("0.0.0.0:50051");
71
72 /*
73  * helper functions
74  */
75
76 static bool ident_matches(const value_list_t *vl, const value_list_t *matcher)
77 {
78         if (fnmatch(matcher->host, vl->host, 0))
79                 return false;
80
81         if (fnmatch(matcher->plugin, vl->plugin, 0))
82                 return false;
83         if (fnmatch(matcher->plugin_instance, vl->plugin_instance, 0))
84                 return false;
85
86         if (fnmatch(matcher->type, vl->type, 0))
87                 return false;
88         if (fnmatch(matcher->type_instance, vl->type_instance, 0))
89                 return false;
90
91         return true;
92 } /* ident_matches */
93
94 static grpc::string read_file(const char *filename)
95 {
96         std::ifstream f;
97         grpc::string s, content;
98
99         f.open(filename);
100         if (!f.is_open()) {
101                 ERROR("grpc: Failed to open '%s'", filename);
102                 return "";
103         }
104
105         while (std::getline(f, s)) {
106                 content += s;
107                 content.push_back('\n');
108         }
109         f.close();
110         return content;
111 } /* read_file */
112
113 /*
114  * proto conversion
115  */
116
117 static void marshal_ident(const value_list_t *vl, collectd::types::Identifier *msg)
118 {
119         msg->set_host(vl->host);
120         msg->set_plugin(vl->plugin);
121         if (vl->plugin_instance[0] != '\0')
122                 msg->set_plugin_instance(vl->plugin_instance);
123         msg->set_type(vl->type);
124         if (vl->type_instance[0] != '\0')
125                 msg->set_type_instance(vl->type_instance);
126 } /* marshal_ident */
127
128 static grpc::Status unmarshal_ident(const collectd::types::Identifier &msg, value_list_t *vl,
129                 bool require_fields)
130 {
131         std::string s;
132
133         s = msg.host();
134         if (!s.length() && require_fields)
135                 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
136                                 grpc::string("missing host name"));
137         sstrncpy(vl->host, s.c_str(), sizeof(vl->host));
138
139         s = msg.plugin();
140         if (!s.length() && require_fields)
141                 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
142                                 grpc::string("missing plugin name"));
143         sstrncpy(vl->plugin, s.c_str(), sizeof(vl->plugin));
144
145         s = msg.type();
146         if (!s.length() && require_fields)
147                 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
148                                 grpc::string("missing type name"));
149         sstrncpy(vl->type, s.c_str(), sizeof(vl->type));
150
151         s = msg.plugin_instance();
152         sstrncpy(vl->plugin_instance, s.c_str(), sizeof(vl->plugin_instance));
153
154         s = msg.type_instance();
155         sstrncpy(vl->type_instance, s.c_str(), sizeof(vl->type_instance));
156
157         return grpc::Status::OK;
158 } /* unmarshal_ident() */
159
160 static grpc::Status marshal_value_list(const value_list_t *vl, collectd::types::ValueList *msg)
161 {
162         auto id = msg->mutable_identifier();
163         marshal_ident(vl, id);
164
165         auto ds = plugin_get_ds(vl->type);
166         if ((ds == NULL) || (ds->ds_num != vl->values_len)) {
167                 return grpc::Status(grpc::StatusCode::INTERNAL,
168                                 grpc::string("failed to retrieve data-set for values"));
169         }
170
171         auto t = TimeUtil::NanosecondsToTimestamp(CDTIME_T_TO_NS(vl->time));
172         auto d = TimeUtil::NanosecondsToDuration(CDTIME_T_TO_NS(vl->interval));
173         msg->set_allocated_time(new google::protobuf::Timestamp(t));
174         msg->set_allocated_interval(new google::protobuf::Duration(d));
175
176         for (size_t i = 0; i < vl->values_len; ++i) {
177                 auto v = msg->add_values();
178                 switch (ds->ds[i].type) {
179                         case DS_TYPE_COUNTER:
180                                 v->set_counter(vl->values[i].counter);
181                                 break;
182                         case DS_TYPE_GAUGE:
183                                 v->set_gauge(vl->values[i].gauge);
184                                 break;
185                         case DS_TYPE_DERIVE:
186                                 v->set_derive(vl->values[i].derive);
187                                 break;
188                         case DS_TYPE_ABSOLUTE:
189                                 v->set_absolute(vl->values[i].absolute);
190                                 break;
191                         default:
192                                 return grpc::Status(grpc::StatusCode::INTERNAL,
193                                                 grpc::string("unknown value type"));
194                 }
195
196                 auto name = msg->add_ds_names();
197                 name->assign(ds->ds[i].name);
198         }
199
200         return grpc::Status::OK;
201 } /* marshal_value_list */
202
203 static grpc::Status unmarshal_value_list(const collectd::types::ValueList &msg, value_list_t *vl)
204 {
205         vl->time = NS_TO_CDTIME_T(TimeUtil::TimestampToNanoseconds(msg.time()));
206         vl->interval = NS_TO_CDTIME_T(TimeUtil::DurationToNanoseconds(msg.interval()));
207
208         auto status = unmarshal_ident(msg.identifier(), vl, true);
209         if (!status.ok())
210                 return status;
211
212         value_t *values = NULL;
213         size_t values_len = 0;
214
215         status = grpc::Status::OK;
216         for (auto v : msg.values()) {
217                 value_t *val = (value_t *)realloc(values, (values_len + 1) * sizeof(*values));
218                 if (!val) {
219                         status = grpc::Status(grpc::StatusCode::RESOURCE_EXHAUSTED,
220                                         grpc::string("failed to allocate values array"));
221                         break;
222                 }
223
224                 values = val;
225                 val = values + values_len;
226                 values_len++;
227
228                 switch (v.value_case()) {
229                 case collectd::types::Value::ValueCase::kCounter:
230                         val->counter = counter_t(v.counter());
231                         break;
232                 case collectd::types::Value::ValueCase::kGauge:
233                         val->gauge = gauge_t(v.gauge());
234                         break;
235                 case collectd::types::Value::ValueCase::kDerive:
236                         val->derive = derive_t(v.derive());
237                         break;
238                 case collectd::types::Value::ValueCase::kAbsolute:
239                         val->absolute = absolute_t(v.absolute());
240                         break;
241                 default:
242                         status = grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
243                                         grpc::string("unknown value type"));
244                         break;
245                 }
246
247                 if (!status.ok())
248                         break;
249         }
250         if (status.ok()) {
251                 vl->values = values;
252                 vl->values_len = values_len;
253         }
254         else if (values) {
255                 free(values);
256         }
257
258         return status;
259 } /* unmarshal_value_list() */
260
261 /*
262  * request call-backs and call objects
263  */
264 static grpc::Status DispatchValue(grpc::ServerContext *ctx, DispatchValuesRequest request, DispatchValuesResponse *reply)
265 {
266         value_list_t vl = VALUE_LIST_INIT;
267         auto status = unmarshal_value_list(request.value_list(), &vl);
268         if (!status.ok())
269                 return status;
270
271         if (plugin_dispatch_values(&vl))
272                 status = grpc::Status(grpc::StatusCode::INTERNAL,
273                                 grpc::string("failed to enqueue values for writing"));
274
275         reply->Clear();
276         return status;
277 } /* grpc::Status DispatchValue */
278
// CallData is the abstract base class for the objects driving asynchronous
// calls. Each instance uses its own address as the tag of the operations it
// starts; the main loop calls process() with the `ok` flag reported by the
// completion queue once such an operation completes. Instances are neither
// copyable nor assignable.
class CallData {
public:
	CallData(const CallData &) = delete;
	CallData &operator=(const CallData &) = delete;
	virtual ~CallData() = default;

	virtual void process(bool ok) = 0;

protected:
	CallData() = default;
};
292
293 /*
294  * Collectd service
295  */
296
297 // QueryValuesCallData holds the state and implements the logic for QueryValues calls.
298 class QueryValuesCallData : public CallData {
299 public:
300         QueryValuesCallData(Collectd::AsyncService* service, grpc::ServerCompletionQueue* cq)
301                         : cq_(cq), service_(service), writer_(&context_) {
302                 process(true);
303         }
304
305         void process(bool ok) final {
306                 if (status_ == Status::INIT) {
307                         status_ = Status::READ;
308                         service_->RequestQueryValues(&context_, &request_, &writer_, cq_, cq_, this);
309                 } else if (status_ == Status::READ) {
310                         auto err = queryValues();
311                         if (!err.ok()) {
312                                 writer_.Finish(err, this);
313                                 status_ = Status::DONE;
314                                 return;
315                         }
316                         respond();
317                 } else if (status_ == Status::WRITE) {
318                         respond();
319                 } else if (status_ == Status::DONE) {
320                         new QueryValuesCallData(service_, cq_);
321                         delete this;
322                 } else {
323                         throw std::logic_error("Unhandled state enum.");
324                 }
325         }
326
327 private:
328         enum class Status {
329                 INIT,
330                 READ,
331                 WRITE,
332                 DONE,
333         };
334
335         grpc::Status queryValues (void) {
336                 uc_iter_t *iter;
337                 char *name = NULL;
338
339                 value_list_t matcher;
340                 auto status = unmarshal_ident(request_.identifier(), &matcher, false);
341                 if (!status.ok())
342                         return status;
343
344                 if ((iter = uc_get_iterator()) == NULL) {
345                         return grpc::Status(grpc::StatusCode::INTERNAL,
346                                                                 grpc::string("failed to query values: cannot create iterator"));
347                 }
348
349                 status = grpc::Status::OK;
350                 while (uc_iterator_next(iter, &name) == 0) {
351                         value_list_t vl;
352                         if (parse_identifier_vl(name, &vl) != 0) {
353                                 status = grpc::Status(grpc::StatusCode::INTERNAL,
354                                                                           grpc::string("failed to parse identifier"));
355                                 break;
356                         }
357
358                         if (!ident_matches(&vl, &matcher))
359                                 continue;
360
361                         if (uc_iterator_get_time(iter, &vl.time) < 0) {
362                                 status = grpc::Status(grpc::StatusCode::INTERNAL,
363                                                                           grpc::string("failed to retrieve value timestamp"));
364                                 break;
365                         }
366                         if (uc_iterator_get_interval(iter, &vl.interval) < 0) {
367                                 status = grpc::Status(grpc::StatusCode::INTERNAL,
368                                                                           grpc::string("failed to retrieve value interval"));
369                                 break;
370                         }
371                         if (uc_iterator_get_values(iter, &vl.values, &vl.values_len) < 0) {
372                                 status = grpc::Status(grpc::StatusCode::INTERNAL,
373                                                                           grpc::string("failed to retrieve values"));
374                                 break;
375                         }
376
377                         value_lists_.push(vl);
378                 }
379
380                 uc_iterator_destroy(iter);
381                 return status;
382         }
383
384         void respond() {
385                 if (value_lists_.empty()) {
386                         writer_.Finish(grpc::Status::OK, this);
387                         status_ = Status::DONE;
388                         return;
389                 }
390
391                 auto vl = value_lists_.front();
392
393                 response_.Clear();
394                 auto err = marshal_value_list(&vl, response_.mutable_value_list());
395                 if (!err.ok()) {
396                         writer_.Finish(err, this);
397                         status_ = Status::DONE;
398                         return;
399                 }
400
401                 value_lists_.pop();
402                 free(vl.values);
403
404                 writer_.Write(response_, this);
405                 status_ = Status::WRITE;
406         }
407
408         Status status_ = Status::INIT;
409         grpc::ServerContext context_;
410         grpc::ServerCompletionQueue* cq_;
411         Collectd::AsyncService* service_;
412         QueryValuesRequest request_;
413         std::queue<value_list_t> value_lists_;
414         QueryValuesResponse response_;
415         grpc::ServerAsyncWriter<QueryValuesResponse> writer_;
416 };
417
418 /*
419  * Dispatch service
420  */
421 // DispatchValuesCallData holds the state and implements the logic for DispatchValues calls.
422 class DispatchValuesCallData : public CallData {
423 public:
424         DispatchValuesCallData(Dispatch::AsyncService* service, grpc::ServerCompletionQueue* cq)
425                         : cq_(cq), service_(service), reader_(&context_) {
426                 process(true);
427         }
428
429         void process(bool ok) final {
430                 if (status == Status::INIT) {
431                         service_->RequestDispatchValues(&context_, &reader_, cq_, cq_, this);
432                         status = Status::CALL;
433                 } else if (status == Status::CALL) {
434                         reader_.Read(&request_, this);
435                         status = Status::READ;
436                 } else if (status == Status::READ && ok) {
437                         (void) DispatchValue(&context_, request_, &response_);
438
439                         reader_.Read(&request_, this);
440                 } else if (status == Status::READ) {
441                         response_.Clear();
442
443                         status = Status::DONE;
444                 } else if (status == Status::DONE) {
445                         new DispatchValuesCallData(service_, cq_);
446                         delete this;
447                 } else {
448                         ERROR("grpc: DispatchValuesCallData: invalid state");
449                 }
450         }
451
452 private:
453         enum class Status {
454                 INIT,
455                 CALL,
456                 READ,
457                 DONE,
458         };
459         Status status = Status::INIT;
460
461         grpc::ServerContext          context_;
462         grpc::ServerCompletionQueue* cq_;
463         Dispatch::AsyncService*      service_;
464
465         DispatchValuesRequest request_;
466         DispatchValuesResponse response_;
467         grpc::ServerAsyncReader<DispatchValuesResponse, DispatchValuesRequest> reader_;
468 };
469
470 /*
471  * gRPC server implementation
472  */
473 class CollectdServer final
474 {
475 public:
476         void Start()
477         {
478                 auto auth = grpc::InsecureServerCredentials();
479
480                 grpc::ServerBuilder builder;
481
482                 if (listeners.empty()) {
483                         builder.AddListeningPort(default_addr, auth);
484                         INFO("grpc: Listening on %s", default_addr.c_str());
485                 }
486                 else {
487                         for (auto l : listeners) {
488                                 grpc::string addr = l.addr + ":" + l.port;
489
490                                 auto use_ssl = grpc::string("");
491                                 auto a = auth;
492                                 if (l.ssl != nullptr) {
493                                         use_ssl = grpc::string(" (SSL enabled)");
494                                         a = grpc::SslServerCredentials(*l.ssl);
495                                 }
496
497                                 builder.AddListeningPort(addr, a);
498                                 INFO("grpc: Listening on %s%s", addr.c_str(), use_ssl.c_str());
499                         }
500                 }
501
502                 cq_ = builder.AddCompletionQueue();
503
504                 builder.RegisterService(&collectd_service_);
505                 builder.RegisterService(&dispatch_service_);
506
507                 server_ = builder.BuildAndStart();
508                 new QueryValuesCallData(&collectd_service_, cq_.get());
509                 new DispatchValuesCallData(&dispatch_service_, cq_.get());
510         } /* Start() */
511
512         void Shutdown()
513         {
514                 server_->Shutdown();
515                 cq_->Shutdown();
516         } /* Shutdown() */
517
518         void Mainloop()
519         {
520                 while (true) {
521                         void *tag = NULL;
522                         bool ok = false;
523
524                         // Block waiting to read the next event from the completion queue.
525                         // The event is uniquely identified by its tag, which in this case
526                         // is the memory address of a CallData instance.
527                         if (!cq_->Next(&tag, &ok))
528                                 break; // Queue shut down.
529
530                         static_cast<CallData*>(tag)->process(ok);
531                 }
532         } /* Mainloop() */
533
534 private:
535         Collectd::AsyncService collectd_service_;
536         Dispatch::AsyncService dispatch_service_;
537
538         std::unique_ptr<grpc::Server> server_;
539         std::unique_ptr<grpc::ServerCompletionQueue> cq_;
540 }; /* class CollectdServer */
541
542 static CollectdServer *server = nullptr;
543
544 /*
545  * collectd plugin interface
546  */
547 extern "C" {
548         static pthread_t *workers;
549         static size_t workers_num = 5;
550
551         static void *worker_thread(void *arg)
552         {
553                 CollectdServer *s = (CollectdServer *)arg;
554                 s->Mainloop();
555                 return NULL;
556         } /* worker_thread() */
557
558         static int c_grpc_config_listen(oconfig_item_t *ci)
559         {
560                 if ((ci->values_num != 2)
561                                 || (ci->values[0].type != OCONFIG_TYPE_STRING)
562                                 || (ci->values[1].type != OCONFIG_TYPE_STRING)) {
563                         ERROR("grpc: The `%s` config option needs exactly "
564                                         "two string argument (address and port).", ci->key);
565                         return -1;
566                 }
567
568                 auto listener = Listener();
569                 listener.addr = grpc::string(ci->values[0].value.string);
570                 listener.port = grpc::string(ci->values[1].value.string);
571                 listener.ssl = nullptr;
572
573                 auto ssl_opts = new(grpc::SslServerCredentialsOptions);
574                 grpc::SslServerCredentialsOptions::PemKeyCertPair pkcp = {};
575                 bool use_ssl = false;
576
577                 for (int i = 0; i < ci->children_num; i++) {
578                         oconfig_item_t *child = ci->children + i;
579
580                         if (!strcasecmp("EnableSSL", child->key)) {
581                                 if (cf_util_get_boolean(child, &use_ssl)) {
582                                         ERROR("grpc: Option `%s` expects a boolean value",
583                                                         child->key);
584                                         return -1;
585                                 }
586                         }
587                         else if (!strcasecmp("SSLRootCerts", child->key)) {
588                                 char *certs = NULL;
589                                 if (cf_util_get_string(child, &certs)) {
590                                         ERROR("grpc: Option `%s` expects a string value",
591                                                         child->key);
592                                         return -1;
593                                 }
594                                 ssl_opts->pem_root_certs = read_file(certs);
595                         }
596                         else if (!strcasecmp("SSLServerKey", child->key)) {
597                                 char *key = NULL;
598                                 if (cf_util_get_string(child, &key)) {
599                                         ERROR("grpc: Option `%s` expects a string value",
600                                                         child->key);
601                                         return -1;
602                                 }
603                                 pkcp.private_key = read_file(key);
604                         }
605                         else if (!strcasecmp("SSLServerCert", child->key)) {
606                                 char *cert = NULL;
607                                 if (cf_util_get_string(child, &cert)) {
608                                         ERROR("grpc: Option `%s` expects a string value",
609                                                         child->key);
610                                         return -1;
611                                 }
612                                 pkcp.cert_chain = read_file(cert);
613                         }
614                         else {
615                                 WARNING("grpc: Option `%s` not allowed in <%s> block.",
616                                                 child->key, ci->key);
617                         }
618                 }
619
620                 ssl_opts->pem_key_cert_pairs.push_back(pkcp);
621                 if (use_ssl)
622                         listener.ssl = ssl_opts;
623                 else
624                         delete(ssl_opts);
625
626                 listeners.push_back(listener);
627                 return 0;
628         } /* c_grpc_config_listen() */
629
630         static int c_grpc_config(oconfig_item_t *ci)
631         {
632                 int i;
633
634                 for (i = 0; i < ci->children_num; i++) {
635                         oconfig_item_t *child = ci->children + i;
636
637                         if (!strcasecmp("Listen", child->key)) {
638                                 if (c_grpc_config_listen(child))
639                                         return -1;
640                         }
641                         else if (!strcasecmp("WorkerThreads", child->key)) {
642                                 int n;
643                                 if (cf_util_get_int(child, &n))
644                                         return -1;
645                                 workers_num = (size_t)n;
646                         }
647                         else {
648                                 WARNING("grpc: Option `%s` not allowed here.", child->key);
649                         }
650                 }
651
652                 return 0;
653         } /* c_grpc_config() */
654
655         static int c_grpc_init(void)
656         {
657                 server = new CollectdServer();
658                 size_t i;
659
660                 if (! server) {
661                         ERROR("grpc: Failed to create server");
662                         return -1;
663                 }
664
665                 workers = (pthread_t *)calloc(workers_num, sizeof(*workers));
666                 if (! workers) {
667                         delete server;
668                         server = nullptr;
669
670                         ERROR("grpc: Failed to allocate worker threads");
671                         return -1;
672                 }
673
674                 server->Start();
675                 for (i = 0; i < workers_num; i++) {
676                         plugin_thread_create(&workers[i], /* attr = */ NULL,
677                                         worker_thread, server);
678                 }
679                 INFO("grpc: Started %zu workers", workers_num);
680                 return 0;
681         } /* c_grpc_init() */
682
683         static int c_grpc_shutdown(void)
684         {
685                 size_t i;
686
687                 if (!server)
688                         return -1;
689
690                 server->Shutdown();
691
692                 INFO("grpc: Waiting for %zu workers to terminate", workers_num);
693                 for (i = 0; i < workers_num; i++)
694                         pthread_join(workers[i], NULL);
695                 free(workers);
696                 workers = NULL;
697                 workers_num = 0;
698
699                 delete server;
700                 server = nullptr;
701
702                 return 0;
703         } /* c_grpc_shutdown() */
704
705         void module_register(void)
706         {
707                 plugin_register_complex_config("grpc", c_grpc_config);
708                 plugin_register_init("grpc", c_grpc_init);
709                 plugin_register_shutdown("grpc", c_grpc_shutdown);
710         } /* module_register() */
711 } /* extern "C" */
712
713 /* vim: set sw=4 ts=4 tw=78 noexpandtab : */