grpc plugin: Create a "Dispatch" service and use streaming RPCs.
[collectd.git] / src / grpc.cc
1 /**
2  * collectd - src/grpc.cc
3  * Copyright (C) 2015-2016 Sebastian Harl
4  *
5  * Permission is hereby granted, free of charge, to any person obtaining a
6  * copy of this software and associated documentation files (the "Software"),
7  * to deal in the Software without restriction, including without limitation
8  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
9  * and/or sell copies of the Software, and to permit persons to whom the
10  * Software is furnished to do so, subject to the following conditions:
11  *
12  * The above copyright notice and this permission notice shall be included in
13  * all copies or substantial portions of the Software.
14  *
15  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
21  * DEALINGS IN THE SOFTWARE.
22  *
23  * Authors:
24  *   Sebastian Harl <sh at tokkee.org>
25  **/
26
27 #include <grpc++/grpc++.h>
28 #include <google/protobuf/util/time_util.h>
29
30 #include <fstream>
31 #include <iostream>
32 #include <vector>
33
34 #include "collectd.grpc.pb.h"
35
36 extern "C" {
37 #include <fnmatch.h>
38 #include <stdbool.h>
39
40 #include "collectd.h"
41 #include "common.h"
42 #include "configfile.h"
43 #include "plugin.h"
44
45 #include "daemon/utils_cache.h"
46 }
47
48 using collectd::Collectd;
49 using collectd::Dispatch;
50
51 using collectd::DispatchValuesRequest;
52 using collectd::DispatchValuesReply;
53 using collectd::QueryValuesRequest;
54 using collectd::QueryValuesReply;
55
56 using google::protobuf::util::TimeUtil;
57
58 /*
59  * private types
60  */
61
62 struct Listener {
63         grpc::string addr;
64         grpc::string port;
65
66         grpc::SslServerCredentialsOptions *ssl;
67 };
68 static std::vector<Listener> listeners;
69 static grpc::string default_addr("0.0.0.0:50051");
70
71 /*
72  * helper functions
73  */
74
75 static bool ident_matches(const value_list_t *vl, const value_list_t *matcher)
76 {
77         if (fnmatch(matcher->host, vl->host, 0))
78                 return false;
79
80         if (fnmatch(matcher->plugin, vl->plugin, 0))
81                 return false;
82         if (fnmatch(matcher->plugin_instance, vl->plugin_instance, 0))
83                 return false;
84
85         if (fnmatch(matcher->type, vl->type, 0))
86                 return false;
87         if (fnmatch(matcher->type_instance, vl->type_instance, 0))
88                 return false;
89
90         return true;
91 } /* ident_matches */
92
93 static grpc::string read_file(const char *filename)
94 {
95         std::ifstream f;
96         grpc::string s, content;
97
98         f.open(filename);
99         if (!f.is_open()) {
100                 ERROR("grpc: Failed to open '%s'", filename);
101                 return "";
102         }
103
104         while (std::getline(f, s)) {
105                 content += s;
106                 content.push_back('\n');
107         }
108         f.close();
109         return content;
110 } /* read_file */
111
112 /*
113  * proto conversion
114  */
115
116 static void marshal_ident(const value_list_t *vl, collectd::types::Identifier *msg)
117 {
118         msg->set_host(vl->host);
119         msg->set_plugin(vl->plugin);
120         if (vl->plugin_instance[0] != '\0')
121                 msg->set_plugin_instance(vl->plugin_instance);
122         msg->set_type(vl->type);
123         if (vl->type_instance[0] != '\0')
124                 msg->set_type_instance(vl->type_instance);
125 } /* marshal_ident */
126
127 static grpc::Status unmarshal_ident(const collectd::types::Identifier &msg, value_list_t *vl,
128                 bool require_fields)
129 {
130         std::string s;
131
132         s = msg.host();
133         if (!s.length() && require_fields)
134                 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
135                                 grpc::string("missing host name"));
136         sstrncpy(vl->host, s.c_str(), sizeof(vl->host));
137
138         s = msg.plugin();
139         if (!s.length() && require_fields)
140                 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
141                                 grpc::string("missing plugin name"));
142         sstrncpy(vl->plugin, s.c_str(), sizeof(vl->plugin));
143
144         s = msg.type();
145         if (!s.length() && require_fields)
146                 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
147                                 grpc::string("missing type name"));
148         sstrncpy(vl->type, s.c_str(), sizeof(vl->type));
149
150         s = msg.plugin_instance();
151         sstrncpy(vl->plugin_instance, s.c_str(), sizeof(vl->plugin_instance));
152
153         s = msg.type_instance();
154         sstrncpy(vl->type_instance, s.c_str(), sizeof(vl->type_instance));
155
156         return grpc::Status::OK;
157 } /* unmarshal_ident() */
158
159 static grpc::Status marshal_value_list(const value_list_t *vl, collectd::types::ValueList *msg)
160 {
161         auto id = msg->mutable_identifier();
162         marshal_ident(vl, id);
163
164         auto ds = plugin_get_ds(vl->type);
165         if ((ds == NULL) || (ds->ds_num != vl->values_len)) {
166                 return grpc::Status(grpc::StatusCode::INTERNAL,
167                                 grpc::string("failed to retrieve data-set for values"));
168         }
169
170         auto t = TimeUtil::NanosecondsToTimestamp(CDTIME_T_TO_NS(vl->time));
171         auto d = TimeUtil::NanosecondsToDuration(CDTIME_T_TO_NS(vl->interval));
172         msg->set_allocated_time(new google::protobuf::Timestamp(t));
173         msg->set_allocated_interval(new google::protobuf::Duration(d));
174
175         for (size_t i = 0; i < vl->values_len; ++i) {
176                 auto v = msg->add_values();
177                 switch (ds->ds[i].type) {
178                         case DS_TYPE_COUNTER:
179                                 v->set_counter(vl->values[i].counter);
180                                 break;
181                         case DS_TYPE_GAUGE:
182                                 v->set_gauge(vl->values[i].gauge);
183                                 break;
184                         case DS_TYPE_DERIVE:
185                                 v->set_derive(vl->values[i].derive);
186                                 break;
187                         case DS_TYPE_ABSOLUTE:
188                                 v->set_absolute(vl->values[i].absolute);
189                                 break;
190                         default:
191                                 return grpc::Status(grpc::StatusCode::INTERNAL,
192                                                 grpc::string("unknown value type"));
193                 }
194
195                 auto name = msg->add_ds_names();
196                 name->assign(ds->ds[i].name);
197         }
198
199         return grpc::Status::OK;
200 } /* marshal_value_list */
201
202 static grpc::Status unmarshal_value_list(const collectd::types::ValueList &msg, value_list_t *vl)
203 {
204         vl->time = NS_TO_CDTIME_T(TimeUtil::TimestampToNanoseconds(msg.time()));
205         vl->interval = NS_TO_CDTIME_T(TimeUtil::DurationToNanoseconds(msg.interval()));
206
207         auto status = unmarshal_ident(msg.identifier(), vl, true);
208         if (!status.ok())
209                 return status;
210
211         value_t *values = NULL;
212         size_t values_len = 0;
213
214         status = grpc::Status::OK;
215         for (auto v : msg.values()) {
216                 value_t *val = (value_t *)realloc(values, (values_len + 1) * sizeof(*values));
217                 if (!val) {
218                         status = grpc::Status(grpc::StatusCode::RESOURCE_EXHAUSTED,
219                                         grpc::string("failed to allocate values array"));
220                         break;
221                 }
222
223                 values = val;
224                 val = values + values_len;
225                 values_len++;
226
227                 switch (v.value_case()) {
228                 case collectd::types::Value::ValueCase::kCounter:
229                         val->counter = counter_t(v.counter());
230                         break;
231                 case collectd::types::Value::ValueCase::kGauge:
232                         val->gauge = gauge_t(v.gauge());
233                         break;
234                 case collectd::types::Value::ValueCase::kDerive:
235                         val->derive = derive_t(v.derive());
236                         break;
237                 case collectd::types::Value::ValueCase::kAbsolute:
238                         val->absolute = absolute_t(v.absolute());
239                         break;
240                 default:
241                         status = grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
242                                         grpc::string("unknown value type"));
243                         break;
244                 }
245
246                 if (!status.ok())
247                         break;
248         }
249         if (status.ok()) {
250                 vl->values = values;
251                 vl->values_len = values_len;
252         }
253         else if (values) {
254                 free(values);
255         }
256
257         return status;
258 } /* unmarshal_value_list() */
259
260 /*
261  * request call-backs and call objects
262  */
263 static grpc::Status DispatchValue(grpc::ServerContext *ctx, DispatchValuesRequest request, DispatchValuesReply *reply)
264 {
265         value_list_t vl = VALUE_LIST_INIT;
266         auto status = unmarshal_value_list(request.value_list(), &vl);
267         if (!status.ok())
268                 return status;
269
270         if (plugin_dispatch_values(&vl))
271                 status = grpc::Status(grpc::StatusCode::INTERNAL,
272                                 grpc::string("failed to enqueue values for writing"));
273
274         reply->Clear();
275         return status;
276 } /* grpc::Status DispatchValue */
277
278 static grpc::Status QueryValues(grpc::ServerContext *ctx, QueryValuesRequest req, QueryValuesReply *res)
279 {
280         uc_iter_t *iter;
281         char *name = NULL;
282
283         value_list_t matcher;
284         auto status = unmarshal_ident(req.identifier(), &matcher, false);
285         if (!status.ok())
286                 return status;
287
288         if ((iter = uc_get_iterator()) == NULL) {
289                 return grpc::Status(grpc::StatusCode::INTERNAL,
290                                 grpc::string("failed to query values: cannot create iterator"));
291         }
292
293         status = grpc::Status::OK;
294         while (uc_iterator_next(iter, &name) == 0) {
295                 value_list_t vl;
296                 if (parse_identifier_vl(name, &vl) != 0) {
297                         status = grpc::Status(grpc::StatusCode::INTERNAL,
298                                         grpc::string("failed to parse identifier"));
299                         break;
300                 }
301
302                 if (!ident_matches(&vl, &matcher))
303                         continue;
304
305                 if (uc_iterator_get_time(iter, &vl.time) < 0) {
306                         status = grpc::Status(grpc::StatusCode::INTERNAL,
307                                         grpc::string("failed to retrieve value timestamp"));
308                         break;
309                 }
310                 if (uc_iterator_get_interval(iter, &vl.interval) < 0) {
311                         status = grpc::Status(grpc::StatusCode::INTERNAL,
312                                         grpc::string("failed to retrieve value interval"));
313                         break;
314                 }
315                 if (uc_iterator_get_values(iter, &vl.values, &vl.values_len) < 0) {
316                         status = grpc::Status(grpc::StatusCode::INTERNAL,
317                                         grpc::string("failed to retrieve values"));
318                         break;
319                 }
320
321                 auto pb_vl = res->add_value_lists();
322                 status = marshal_value_list(&vl, pb_vl);
323                 free(vl.values);
324                 if (!status.ok())
325                         break;
326         }
327
328         uc_iterator_destroy(iter);
329
330         return status;
331 } /* grpc::Status QueryValues */
332
// CallData is the abstract base class for asynchronous calls. Every
// operation queued on the server's completion queue uses a CallData* as its
// tag; the worker thread that dequeues the event invokes process() on it
// with the event's "ok" flag. Instances are neither copyable nor
// assignable, since their address is their identity on the queue.
class CallData {
public:
        virtual ~CallData() = default;
        virtual void process(bool ok) = 0;

        CallData(const CallData &) = delete;
        CallData &operator=(const CallData &) = delete;

protected:
        CallData() = default;
};
346
347 /*
348  * Collectd service
349  */
350 // QueryValuesCallData holds the state and implements the logic for QueryValues calls.
351 class QueryValuesCallData : public CallData {
352 public:
353         QueryValuesCallData(Collectd::AsyncService* service, grpc::ServerCompletionQueue* cq)
354                         : cq_(cq), service_(service), writer_(&context_) {
355                 // As part of the initialization, we *request* that the system start
356                 // processing QueryValues requests. In this request, "this" acts as
357                 // the tag uniquely identifying the request (so that different
358                 // QueryValuesCallData instances can serve different requests
359                 // concurrently), in this case the memory address of this
360                 // QueryValuesCallData instance.
361                 service->RequestQueryValues(&context_, &request_, &writer_, cq_, cq_, this);
362         }
363
364         void process(bool ok) final {
365                 if (done_) {
366                         delete this;
367                 } else {
368                         // Spawn a new QueryValuesCallData instance to serve new clients
369                         // while we process the one for this QueryValuesCallData. The
370                         // instance will deallocate itself as part of its FINISH state.
371                         new QueryValuesCallData(service_, cq_);
372
373                         auto status = QueryValues(&context_, request_, &response_);
374                         if (!status.ok()) {
375                                 writer_.FinishWithError(status, this);
376                         } else {
377                                 writer_.Finish(response_, grpc::Status::OK, this);
378                         }
379
380                         done_ = true;
381                 }
382         }
383
384 private:
385         bool done_ = false;
386         grpc::ServerContext context_;
387         grpc::ServerCompletionQueue* cq_;
388         Collectd::AsyncService* service_;
389         QueryValuesRequest request_;
390         QueryValuesReply response_;
391         grpc::ServerAsyncResponseWriter<QueryValuesReply> writer_;
392 };
393
394 /*
395  * Dispatch service
396  */
397 // DispatchValuesCallData holds the state and implements the logic for DispatchValues calls.
398 class DispatchValuesCallData : public CallData {
399 public:
400         DispatchValuesCallData(Dispatch::AsyncService* service, grpc::ServerCompletionQueue* cq)
401                         : cq_(cq), service_(service), reader_(&context_) {
402                 process(true);
403         }
404
405         void process(bool ok) final {
406                 if (status == Status::INIT) {
407                         service_->RequestDispatchValues(&context_, &reader_, cq_, cq_, this);
408                         status = Status::CALL;
409                 } else if (status == Status::CALL) {
410                         reader_.Read(&request_, this);
411                         status = Status::READ;
412                 } else if (status == Status::READ && ok) {
413                         (void) DispatchValue(&context_, request_, &response_);
414
415                         reader_.Read(&request_, this);
416                 } else if (status == Status::READ) {
417                         response_.Clear();
418
419                         status = Status::DONE;
420                 } else if (status == Status::DONE) {
421                         new DispatchValuesCallData(service_, cq_);
422                         delete this;
423                 } else {
424                         ERROR("grpc: DispatchValuesCallData: invalid state");
425                 }
426         }
427
428 private:
429         enum class Status {
430                 INIT,
431                 CALL,
432                 READ,
433                 DONE,
434         };
435         Status status = Status::INIT;
436
437         grpc::ServerContext          context_;
438         grpc::ServerCompletionQueue* cq_;
439         Dispatch::AsyncService*      service_;
440
441         DispatchValuesRequest request_;
442         DispatchValuesReply response_;
443         grpc::ServerAsyncReader<DispatchValuesReply, DispatchValuesRequest> reader_;
444 };
445
446 /*
447  * gRPC server implementation
448  */
449 class CollectdServer final
450 {
451 public:
452         void Start()
453         {
454                 auto auth = grpc::InsecureServerCredentials();
455
456                 grpc::ServerBuilder builder;
457
458                 if (listeners.empty()) {
459                         builder.AddListeningPort(default_addr, auth);
460                         INFO("grpc: Listening on %s", default_addr.c_str());
461                 }
462                 else {
463                         for (auto l : listeners) {
464                                 grpc::string addr = l.addr + ":" + l.port;
465
466                                 auto use_ssl = grpc::string("");
467                                 auto a = auth;
468                                 if (l.ssl != nullptr) {
469                                         use_ssl = grpc::string(" (SSL enabled)");
470                                         a = grpc::SslServerCredentials(*l.ssl);
471                                 }
472
473                                 builder.AddListeningPort(addr, a);
474                                 INFO("grpc: Listening on %s%s", addr.c_str(), use_ssl.c_str());
475                         }
476                 }
477
478                 cq_ = builder.AddCompletionQueue();
479
480                 builder.RegisterService(&collectd_service_);
481                 builder.RegisterService(&dispatch_service_);
482
483                 server_ = builder.BuildAndStart();
484                 new QueryValuesCallData(&collectd_service_, cq_.get());
485                 new DispatchValuesCallData(&dispatch_service_, cq_.get());
486         } /* Start() */
487
488         void Shutdown()
489         {
490                 server_->Shutdown();
491                 cq_->Shutdown();
492         } /* Shutdown() */
493
494         void Mainloop()
495         {
496                 while (true) {
497                         void *tag = NULL;
498                         bool ok = false;
499
500                         // Block waiting to read the next event from the completion queue.
501                         // The event is uniquely identified by its tag, which in this case
502                         // is the memory address of a CallData instance.
503                         if (!cq_->Next(&tag, &ok))
504                                 break; // Queue shut down.
505
506                         static_cast<CallData*>(tag)->process(ok);
507                 }
508         } /* Mainloop() */
509
510 private:
511         Collectd::AsyncService collectd_service_;
512         Dispatch::AsyncService dispatch_service_;
513
514         std::unique_ptr<grpc::Server> server_;
515         std::unique_ptr<grpc::ServerCompletionQueue> cq_;
516 }; /* class CollectdServer */
517
518 static CollectdServer *server = nullptr;
519
520 /*
521  * collectd plugin interface
522  */
523 extern "C" {
524         static pthread_t *workers;
525         static size_t workers_num = 5;
526
527         static void *worker_thread(void *arg)
528         {
529                 CollectdServer *s = (CollectdServer *)arg;
530                 s->Mainloop();
531                 return NULL;
532         } /* worker_thread() */
533
534         static int c_grpc_config_listen(oconfig_item_t *ci)
535         {
536                 if ((ci->values_num != 2)
537                                 || (ci->values[0].type != OCONFIG_TYPE_STRING)
538                                 || (ci->values[1].type != OCONFIG_TYPE_STRING)) {
539                         ERROR("grpc: The `%s` config option needs exactly "
540                                         "two string argument (address and port).", ci->key);
541                         return -1;
542                 }
543
544                 auto listener = Listener();
545                 listener.addr = grpc::string(ci->values[0].value.string);
546                 listener.port = grpc::string(ci->values[1].value.string);
547                 listener.ssl = nullptr;
548
549                 auto ssl_opts = new(grpc::SslServerCredentialsOptions);
550                 grpc::SslServerCredentialsOptions::PemKeyCertPair pkcp = {};
551                 bool use_ssl = false;
552
553                 for (int i = 0; i < ci->children_num; i++) {
554                         oconfig_item_t *child = ci->children + i;
555
556                         if (!strcasecmp("EnableSSL", child->key)) {
557                                 if (cf_util_get_boolean(child, &use_ssl)) {
558                                         ERROR("grpc: Option `%s` expects a boolean value",
559                                                         child->key);
560                                         return -1;
561                                 }
562                         }
563                         else if (!strcasecmp("SSLRootCerts", child->key)) {
564                                 char *certs = NULL;
565                                 if (cf_util_get_string(child, &certs)) {
566                                         ERROR("grpc: Option `%s` expects a string value",
567                                                         child->key);
568                                         return -1;
569                                 }
570                                 ssl_opts->pem_root_certs = read_file(certs);
571                         }
572                         else if (!strcasecmp("SSLServerKey", child->key)) {
573                                 char *key = NULL;
574                                 if (cf_util_get_string(child, &key)) {
575                                         ERROR("grpc: Option `%s` expects a string value",
576                                                         child->key);
577                                         return -1;
578                                 }
579                                 pkcp.private_key = read_file(key);
580                         }
581                         else if (!strcasecmp("SSLServerCert", child->key)) {
582                                 char *cert = NULL;
583                                 if (cf_util_get_string(child, &cert)) {
584                                         ERROR("grpc: Option `%s` expects a string value",
585                                                         child->key);
586                                         return -1;
587                                 }
588                                 pkcp.cert_chain = read_file(cert);
589                         }
590                         else {
591                                 WARNING("grpc: Option `%s` not allowed in <%s> block.",
592                                                 child->key, ci->key);
593                         }
594                 }
595
596                 ssl_opts->pem_key_cert_pairs.push_back(pkcp);
597                 if (use_ssl)
598                         listener.ssl = ssl_opts;
599                 else
600                         delete(ssl_opts);
601
602                 listeners.push_back(listener);
603                 return 0;
604         } /* c_grpc_config_listen() */
605
606         static int c_grpc_config(oconfig_item_t *ci)
607         {
608                 int i;
609
610                 for (i = 0; i < ci->children_num; i++) {
611                         oconfig_item_t *child = ci->children + i;
612
613                         if (!strcasecmp("Listen", child->key)) {
614                                 if (c_grpc_config_listen(child))
615                                         return -1;
616                         }
617                         else if (!strcasecmp("WorkerThreads", child->key)) {
618                                 int n;
619                                 if (cf_util_get_int(child, &n))
620                                         return -1;
621                                 workers_num = (size_t)n;
622                         }
623                         else {
624                                 WARNING("grpc: Option `%s` not allowed here.", child->key);
625                         }
626                 }
627
628                 return 0;
629         } /* c_grpc_config() */
630
631         static int c_grpc_init(void)
632         {
633                 server = new CollectdServer();
634                 size_t i;
635
636                 if (! server) {
637                         ERROR("grpc: Failed to create server");
638                         return -1;
639                 }
640
641                 workers = (pthread_t *)calloc(workers_num, sizeof(*workers));
642                 if (! workers) {
643                         delete server;
644                         server = nullptr;
645
646                         ERROR("grpc: Failed to allocate worker threads");
647                         return -1;
648                 }
649
650                 server->Start();
651                 for (i = 0; i < workers_num; i++) {
652                         plugin_thread_create(&workers[i], /* attr = */ NULL,
653                                         worker_thread, server);
654                 }
655                 INFO("grpc: Started %zu workers", workers_num);
656                 return 0;
657         } /* c_grpc_init() */
658
659         static int c_grpc_shutdown(void)
660         {
661                 size_t i;
662
663                 if (!server)
664                         return -1;
665
666                 server->Shutdown();
667
668                 INFO("grpc: Waiting for %zu workers to terminate", workers_num);
669                 for (i = 0; i < workers_num; i++)
670                         pthread_join(workers[i], NULL);
671                 free(workers);
672                 workers = NULL;
673                 workers_num = 0;
674
675                 delete server;
676                 server = nullptr;
677
678                 return 0;
679         } /* c_grpc_shutdown() */
680
681         void module_register(void)
682         {
683                 plugin_register_complex_config("grpc", c_grpc_config);
684                 plugin_register_init("grpc", c_grpc_init);
685                 plugin_register_shutdown("grpc", c_grpc_shutdown);
686         } /* module_register() */
687 } /* extern "C" */
688
689 /* vim: set sw=4 ts=4 tw=78 noexpandtab : */