grpc plugin: Add options to enable SSL protected connections.
[collectd.git] / src / grpc.cc
1 /**
2  * collectd - src/grpc.cc
3  * Copyright (C) 2015-2016 Sebastian Harl
4  *
5  * Permission is hereby granted, free of charge, to any person obtaining a
6  * copy of this software and associated documentation files (the "Software"),
7  * to deal in the Software without restriction, including without limitation
8  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
9  * and/or sell copies of the Software, and to permit persons to whom the
10  * Software is furnished to do so, subject to the following conditions:
11  *
12  * The above copyright notice and this permission notice shall be included in
13  * all copies or substantial portions of the Software.
14  *
15  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
21  * DEALINGS IN THE SOFTWARE.
22  *
23  * Authors:
24  *   Sebastian Harl <sh at tokkee.org>
25  **/
26
27 #include <grpc++/grpc++.h>
28 #include <google/protobuf/util/time_util.h>
29
30 #include <fstream>
31 #include <iostream>
32 #include <vector>
33
34 #include "collectd.grpc.pb.h"
35
36 extern "C" {
37 #include <fnmatch.h>
38 #include <stdbool.h>
39 #include <pthread.h>
40
41 #include "collectd.h"
42 #include "common.h"
43 #include "configfile.h"
44 #include "plugin.h"
45
46 #include "daemon/utils_cache.h"
47 }
48
49 using collectd::Collectd;
50
51 using collectd::DispatchValuesRequest;
52 using collectd::DispatchValuesReply;
53 using collectd::QueryValuesRequest;
54 using collectd::QueryValuesReply;
55
56 using google::protobuf::util::TimeUtil;
57
58 /*
59  * private types
60  */
61
62 struct Listener {
63         grpc::string addr;
64         grpc::string port;
65
66         grpc::SslServerCredentialsOptions *ssl;
67 };
68 static std::vector<Listener> listeners;
69 static grpc::string default_addr("0.0.0.0:50051");
70
71 /*
72  * helper functions
73  */
74
75 static bool ident_matches(const value_list_t *vl, const value_list_t *matcher)
76 {
77         if (fnmatch(matcher->host, vl->host, 0))
78                 return false;
79
80         if (fnmatch(matcher->plugin, vl->plugin, 0))
81                 return false;
82         if (fnmatch(matcher->plugin_instance, vl->plugin_instance, 0))
83                 return false;
84
85         if (fnmatch(matcher->type, vl->type, 0))
86                 return false;
87         if (fnmatch(matcher->type_instance, vl->type_instance, 0))
88                 return false;
89
90         return true;
91 } /* ident_matches */
92
93 static grpc::string read_file(const char *filename)
94 {
95         std::ifstream f;
96         grpc::string s, content;
97
98         f.open(filename);
99         if (!f.is_open()) {
100                 ERROR("grpc: Failed to open '%s'", filename);
101                 return "";
102         }
103
104         while (std::getline(f, s)) {
105                 content += s;
106                 content.push_back('\n');
107         }
108         f.close();
109         return content;
110 } /* read_file */
111
112 /*
113  * proto conversion
114  */
115
116 static void marshal_ident(const value_list_t *vl, collectd::types::Identifier *msg)
117 {
118         msg->set_host(vl->host);
119         msg->set_plugin(vl->plugin);
120         if (vl->plugin_instance[0] != '\0')
121                 msg->set_plugin_instance(vl->plugin_instance);
122         msg->set_type(vl->type);
123         if (vl->type_instance[0] != '\0')
124                 msg->set_type_instance(vl->type_instance);
125 } /* marshal_ident */
126
127 static grpc::Status unmarshal_ident(const collectd::types::Identifier &msg, value_list_t *vl,
128                 bool require_fields)
129 {
130         std::string s;
131
132         s = msg.host();
133         if (!s.length() && require_fields)
134                 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
135                                 grpc::string("missing host name"));
136         sstrncpy(vl->host, s.c_str(), sizeof(vl->host));
137
138         s = msg.plugin();
139         if (!s.length() && require_fields)
140                 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
141                                 grpc::string("missing plugin name"));
142         sstrncpy(vl->plugin, s.c_str(), sizeof(vl->plugin));
143
144         s = msg.type();
145         if (!s.length() && require_fields)
146                 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
147                                 grpc::string("missing type name"));
148         sstrncpy(vl->type, s.c_str(), sizeof(vl->type));
149
150         s = msg.plugin_instance();
151         sstrncpy(vl->plugin_instance, s.c_str(), sizeof(vl->plugin_instance));
152
153         s = msg.type_instance();
154         sstrncpy(vl->type_instance, s.c_str(), sizeof(vl->type_instance));
155
156         return grpc::Status::OK;
157 } /* unmarshal_ident() */
158
159 static grpc::Status marshal_value_list(const value_list_t *vl, collectd::types::ValueList *msg)
160 {
161         auto id = msg->mutable_identifier();
162         marshal_ident(vl, id);
163
164         auto ds = plugin_get_ds(vl->type);
165         if ((ds == NULL) || (ds->ds_num != vl->values_len)) {
166                 return grpc::Status(grpc::StatusCode::INTERNAL,
167                                 grpc::string("failed to retrieve data-set for values"));
168         }
169
170         auto t = TimeUtil::NanosecondsToTimestamp(CDTIME_T_TO_NS(vl->time));
171         auto d = TimeUtil::NanosecondsToDuration(CDTIME_T_TO_NS(vl->interval));
172         msg->set_allocated_time(new google::protobuf::Timestamp(t));
173         msg->set_allocated_interval(new google::protobuf::Duration(d));
174
175         for (size_t i = 0; i < vl->values_len; ++i) {
176                 auto v = msg->add_value();
177                 switch (ds->ds[i].type) {
178                         case DS_TYPE_COUNTER:
179                                 v->set_counter(vl->values[i].counter);
180                                 break;
181                         case DS_TYPE_GAUGE:
182                                 v->set_gauge(vl->values[i].gauge);
183                                 break;
184                         case DS_TYPE_DERIVE:
185                                 v->set_derive(vl->values[i].derive);
186                                 break;
187                         case DS_TYPE_ABSOLUTE:
188                                 v->set_absolute(vl->values[i].absolute);
189                                 break;
190                         default:
191                                 return grpc::Status(grpc::StatusCode::INTERNAL,
192                                                 grpc::string("unknown value type"));
193                 }
194         }
195
196         return grpc::Status::OK;
197 } /* marshal_value_list */
198
199 static grpc::Status unmarshal_value_list(const collectd::types::ValueList &msg, value_list_t *vl)
200 {
201         vl->time = NS_TO_CDTIME_T(TimeUtil::TimestampToNanoseconds(msg.time()));
202         vl->interval = NS_TO_CDTIME_T(TimeUtil::DurationToNanoseconds(msg.interval()));
203
204         auto status = unmarshal_ident(msg.identifier(), vl, true);
205         if (!status.ok())
206                 return status;
207
208         value_t *values = NULL;
209         size_t values_len = 0;
210
211         status = grpc::Status::OK;
212         for (auto v : msg.value()) {
213                 value_t *val = (value_t *)realloc(values, (values_len + 1) * sizeof(*values));
214                 if (!val) {
215                         status = grpc::Status(grpc::StatusCode::RESOURCE_EXHAUSTED,
216                                         grpc::string("failed to allocate values array"));
217                         break;
218                 }
219
220                 values = val;
221                 val = values + values_len;
222                 values_len++;
223
224                 switch (v.value_case()) {
225                 case collectd::types::Value::ValueCase::kCounter:
226                         val->counter = counter_t(v.counter());
227                         break;
228                 case collectd::types::Value::ValueCase::kGauge:
229                         val->gauge = gauge_t(v.gauge());
230                         break;
231                 case collectd::types::Value::ValueCase::kDerive:
232                         val->derive = derive_t(v.derive());
233                         break;
234                 case collectd::types::Value::ValueCase::kAbsolute:
235                         val->absolute = absolute_t(v.absolute());
236                         break;
237                 default:
238                         status = grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
239                                         grpc::string("unknown value type"));
240                         break;
241                 }
242
243                 if (!status.ok())
244                         break;
245         }
246         if (status.ok()) {
247                 vl->values = values;
248                 vl->values_len = values_len;
249         }
250         else if (values) {
251                 free(values);
252         }
253
254         return status;
255 } /* unmarshal_value_list() */
256
257 /*
258  * request call-backs and call objects
259  */
260
261 static grpc::Status Process(grpc::ServerContext *ctx,
262                 DispatchValuesRequest request, DispatchValuesReply *reply)
263 {
264         value_list_t vl = VALUE_LIST_INIT;
265         auto status = unmarshal_value_list(request.values(), &vl);
266         if (!status.ok())
267                 return status;
268
269         if (plugin_dispatch_values(&vl))
270                 status = grpc::Status(grpc::StatusCode::INTERNAL,
271                                 grpc::string("failed to enqueue values for writing"));
272         return status;
273 } /* Process(): DispatchValues */
274
275 static grpc::Status Process(grpc::ServerContext *ctx,
276                 QueryValuesRequest request, QueryValuesReply *reply)
277 {
278         uc_iter_t *iter;
279         char *name = NULL;
280
281         value_list_t matcher;
282         auto status = unmarshal_ident(request.identifier(), &matcher, false);
283         if (!status.ok())
284                 return status;
285
286         if ((iter = uc_get_iterator()) == NULL) {
287                 return grpc::Status(grpc::StatusCode::INTERNAL,
288                                 grpc::string("failed to query values: cannot create iterator"));
289         }
290
291         while (uc_iterator_next(iter, &name) == 0) {
292                 value_list_t res;
293                 if (parse_identifier_vl(name, &res) != 0)
294                         return grpc::Status(grpc::StatusCode::INTERNAL,
295                                         grpc::string("failed to parse identifier"));
296
297                 if (!ident_matches(&res, &matcher))
298                         continue;
299
300                 if (uc_iterator_get_time(iter, &res.time) < 0)
301                         return grpc::Status(grpc::StatusCode::INTERNAL,
302                                         grpc::string("failed to retrieve value timestamp"));
303                 if (uc_iterator_get_interval(iter, &res.interval) < 0)
304                         return grpc::Status(grpc::StatusCode::INTERNAL,
305                                         grpc::string("failed to retrieve value interval"));
306                 if (uc_iterator_get_values(iter, &res.values, &res.values_len) < 0)
307                         return grpc::Status(grpc::StatusCode::INTERNAL,
308                                         grpc::string("failed to retrieve values"));
309
310                 auto vl = reply->add_values();
311                 status = marshal_value_list(&res, vl);
312                 free(res.values);
313                 if (!status.ok())
314                         return status;
315         }
316
317         uc_iterator_destroy(iter);
318
319         return grpc::Status::OK;
320 } /* Process(): QueryValues */
321
322 class Call
323 {
324 public:
325         Call(Collectd::AsyncService *service, grpc::ServerCompletionQueue *cq)
326                 : service_(service), cq_(cq), status_(CREATE)
327         { }
328
329         virtual ~Call()
330         { }
331
332         void Handle()
333         {
334                 if (status_ == CREATE) {
335                         Create();
336                         status_ = PROCESS;
337                 }
338                 else if (status_ == PROCESS) {
339                         Process();
340                         status_ = FINISH;
341                 }
342                 else {
343                         GPR_ASSERT(status_ == FINISH);
344                         Finish();
345                 }
346         } /* Handle() */
347
348 protected:
349         virtual void Create() = 0;
350         virtual void Process() = 0;
351         virtual void Finish() = 0;
352
353         Collectd::AsyncService *service_;
354         grpc::ServerCompletionQueue *cq_;
355         grpc::ServerContext ctx_;
356
357 private:
358         enum CallStatus { CREATE, PROCESS, FINISH };
359         CallStatus status_;
360 }; /* class Call */
361
362 template<typename RequestT, typename ReplyT>
363 class RpcCall final : public Call
364 {
365         typedef void (Collectd::AsyncService::*CreatorT)(grpc::ServerContext *,
366                         RequestT *, grpc::ServerAsyncResponseWriter<ReplyT> *,
367                         grpc::CompletionQueue *, grpc::ServerCompletionQueue *, void *);
368
369 public:
370         RpcCall(Collectd::AsyncService *service,
371                         CreatorT creator, grpc::ServerCompletionQueue *cq)
372                 : Call(service, cq), creator_(creator), responder_(&ctx_)
373         {
374                 Handle();
375         } /* RpcCall() */
376
377         virtual ~RpcCall()
378         { }
379
380 private:
381         void Create()
382         {
383                 (service_->*creator_)(&ctx_, &request_, &responder_, cq_, cq_, this);
384         } /* Create() */
385
386         void Process()
387         {
388                 // Add a new request object to the queue.
389                 new RpcCall<RequestT, ReplyT>(service_, creator_, cq_);
390                 grpc::Status status = ::Process(&ctx_, request_, &reply_);
391                 responder_.Finish(reply_, status, this);
392         } /* Process() */
393
394         void Finish()
395         {
396                 delete this;
397         } /* Finish() */
398
399         CreatorT creator_;
400
401         RequestT request_;
402         ReplyT reply_;
403
404         grpc::ServerAsyncResponseWriter<ReplyT> responder_;
405 }; /* class RpcCall */
406
407 /*
408  * gRPC server implementation
409  */
410
411 class CollectdServer final
412 {
413 public:
414         void Start()
415         {
416                 auto auth = grpc::InsecureServerCredentials();
417
418                 grpc::ServerBuilder builder;
419
420                 if (listeners.empty()) {
421                         builder.AddListeningPort(default_addr, auth);
422                         INFO("grpc: Listening on %s", default_addr.c_str());
423                 }
424                 else {
425                         for (auto l : listeners) {
426                                 grpc::string addr = l.addr + ":" + l.port;
427
428                                 auto use_ssl = grpc::string("");
429                                 auto a = auth;
430                                 if (l.ssl != nullptr) {
431                                         use_ssl = grpc::string(" (SSL enabled)");
432                                         a = grpc::SslServerCredentials(*l.ssl);
433                                 }
434
435                                 builder.AddListeningPort(addr, a);
436                                 INFO("grpc: Listening on %s%s", addr.c_str(), use_ssl.c_str());
437                         }
438                 }
439
440                 builder.RegisterService(&service_);
441                 cq_ = builder.AddCompletionQueue();
442                 server_ = builder.BuildAndStart();
443         } /* Start() */
444
445         void Shutdown()
446         {
447                 server_->Shutdown();
448                 cq_->Shutdown();
449         } /* Shutdown() */
450
451         void Mainloop()
452         {
453                 // Register request types.
454                 new RpcCall<DispatchValuesRequest, DispatchValuesReply>(&service_,
455                                 &Collectd::AsyncService::RequestDispatchValues, cq_.get());
456                 new RpcCall<QueryValuesRequest, QueryValuesReply>(&service_,
457                                 &Collectd::AsyncService::RequestQueryValues, cq_.get());
458
459                 while (true) {
460                         void *req = NULL;
461                         bool ok = false;
462
463                         if (!cq_->Next(&req, &ok))
464                                 break; // Queue shut down.
465                         if (!ok) {
466                                 ERROR("grpc: Failed to read from queue");
467                                 break;
468                         }
469
470                         static_cast<Call *>(req)->Handle();
471                 }
472         } /* Mainloop() */
473
474 private:
475         Collectd::AsyncService service_;
476
477         std::unique_ptr<grpc::Server> server_;
478         std::unique_ptr<grpc::ServerCompletionQueue> cq_;
479 }; /* class CollectdServer */
480
481 static CollectdServer *server = nullptr;
482
483 /*
484  * collectd plugin interface
485  */
486
487 extern "C" {
488         static pthread_t *workers;
489         static size_t workers_num = 5;
490
491         static void *worker_thread(void *arg)
492         {
493                 CollectdServer *s = (CollectdServer *)arg;
494                 s->Mainloop();
495                 return NULL;
496         } /* worker_thread() */
497
498         static int c_grpc_config_listen(oconfig_item_t *ci)
499         {
500                 if ((ci->values_num != 2)
501                                 || (ci->values[0].type != OCONFIG_TYPE_STRING)
502                                 || (ci->values[1].type != OCONFIG_TYPE_STRING)) {
503                         ERROR("grpc: The `%s` config option needs exactly "
504                                         "two string argument (address and port).", ci->key);
505                         return -1;
506                 }
507
508                 auto listener = Listener();
509                 listener.addr = grpc::string(ci->values[0].value.string);
510                 listener.port = grpc::string(ci->values[1].value.string);
511                 listener.ssl = nullptr;
512
513                 auto ssl_opts = new(grpc::SslServerCredentialsOptions);
514                 grpc::SslServerCredentialsOptions::PemKeyCertPair pkcp = {};
515                 bool use_ssl = false;
516
517                 for (int i = 0; i < ci->children_num; i++) {
518                         oconfig_item_t *child = ci->children + i;
519
520                         if (!strcasecmp("EnableSSL", child->key)) {
521                                 if (cf_util_get_boolean(child, &use_ssl)) {
522                                         ERROR("grpc: Option `%s` expects a boolean value",
523                                                         child->key);
524                                         return -1;
525                                 }
526                         }
527                         else if (!strcasecmp("SSLRootCerts", child->key)) {
528                                 char *certs = NULL;
529                                 if (cf_util_get_string(child, &certs)) {
530                                         ERROR("grpc: Option `%s` expects a string value",
531                                                         child->key);
532                                         return -1;
533                                 }
534                                 ssl_opts->pem_root_certs = read_file(certs);
535                         }
536                         else if (!strcasecmp("SSLServerKey", child->key)) {
537                                 char *key = NULL;
538                                 if (cf_util_get_string(child, &key)) {
539                                         ERROR("grpc: Option `%s` expects a string value",
540                                                         child->key);
541                                         return -1;
542                                 }
543                                 pkcp.private_key = read_file(key);
544                         }
545                         else if (!strcasecmp("SSLServerCert", child->key)) {
546                                 char *cert = NULL;
547                                 if (cf_util_get_string(child, &cert)) {
548                                         ERROR("grpc: Option `%s` expects a string value",
549                                                         child->key);
550                                         return -1;
551                                 }
552                                 pkcp.cert_chain = read_file(cert);
553                         }
554                         else {
555                                 WARNING("grpc: Option `%s` not allowed in <%s> block.",
556                                                 child->key, ci->key);
557                         }
558                 }
559
560                 ssl_opts->pem_key_cert_pairs.push_back(pkcp);
561                 if (use_ssl)
562                         listener.ssl = ssl_opts;
563                 else
564                         delete(ssl_opts);
565
566                 listeners.push_back(listener);
567                 return 0;
568         } /* c_grpc_config_listen() */
569
570         static int c_grpc_config(oconfig_item_t *ci)
571         {
572                 int i;
573
574                 for (i = 0; i < ci->children_num; i++) {
575                         oconfig_item_t *child = ci->children + i;
576
577                         if (!strcasecmp("Listen", child->key)) {
578                                 if (c_grpc_config_listen(child))
579                                         return -1;
580                         }
581                         else if (!strcasecmp("WorkerThreads", child->key)) {
582                                 int n;
583                                 if (cf_util_get_int(child, &n))
584                                         return -1;
585                                 workers_num = (size_t)n;
586                         }
587                         else {
588                                 WARNING("grpc: Option `%s` not allowed here.", child->key);
589                         }
590                 }
591
592                 return 0;
593         } /* c_grpc_config() */
594
595         static int c_grpc_init(void)
596         {
597                 server = new CollectdServer();
598                 size_t i;
599
600                 if (! server) {
601                         ERROR("grpc: Failed to create server");
602                         return -1;
603                 }
604
605                 workers = (pthread_t *)calloc(workers_num, sizeof(*workers));
606                 if (! workers) {
607                         delete server;
608                         server = nullptr;
609
610                         ERROR("grpc: Failed to allocate worker threads");
611                         return -1;
612                 }
613
614                 server->Start();
615                 for (i = 0; i < workers_num; i++) {
616                         plugin_thread_create(&workers[i], /* attr = */ NULL,
617                                         worker_thread, server);
618                 }
619                 INFO("grpc: Started %zu workers", workers_num);
620                 return 0;
621         } /* c_grpc_init() */
622
623         static int c_grpc_shutdown(void)
624         {
625                 size_t i;
626
627                 if (!server)
628                         return -1;
629
630                 server->Shutdown();
631
632                 INFO("grpc: Waiting for %zu workers to terminate", workers_num);
633                 for (i = 0; i < workers_num; i++)
634                         pthread_join(workers[i], NULL);
635                 free(workers);
636                 workers = NULL;
637                 workers_num = 0;
638
639                 delete server;
640                 server = nullptr;
641
642                 return 0;
643         } /* c_grpc_shutdown() */
644
645         void module_register(void)
646         {
647                 plugin_register_complex_config("grpc", c_grpc_config);
648                 plugin_register_init("grpc", c_grpc_init);
649                 plugin_register_shutdown("grpc", c_grpc_shutdown);
650         } /* module_register() */
651 } /* extern "C" */
652
653 /* vim: set sw=4 ts=4 tw=78 noexpandtab : */