Remove configfile.h from plugins
[collectd.git] / src / grpc.cc
1 /**
2  * collectd - src/grpc.cc
3  * Copyright (C) 2015-2016 Sebastian Harl
4  *
5  * Permission is hereby granted, free of charge, to any person obtaining a
6  * copy of this software and associated documentation files (the "Software"),
7  * to deal in the Software without restriction, including without limitation
8  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
9  * and/or sell copies of the Software, and to permit persons to whom the
10  * Software is furnished to do so, subject to the following conditions:
11  *
12  * The above copyright notice and this permission notice shall be included in
13  * all copies or substantial portions of the Software.
14  *
15  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
21  * DEALINGS IN THE SOFTWARE.
22  *
23  * Authors:
24  *   Sebastian Harl <sh at tokkee.org>
25  **/
26
27 #include <grpc++/grpc++.h>
28 #include <google/protobuf/util/time_util.h>
29
30 #include <fstream>
31 #include <iostream>
32 #include <vector>
33
34 #include "collectd.grpc.pb.h"
35
36 extern "C" {
37 #include <fnmatch.h>
38 #include <stdbool.h>
39
40 #include "collectd.h"
41 #include "common.h"
42 #include "plugin.h"
43
44 #include "daemon/utils_cache.h"
45 }
46
47 using collectd::Collectd;
48
49 using collectd::DispatchValuesRequest;
50 using collectd::DispatchValuesReply;
51 using collectd::QueryValuesRequest;
52 using collectd::QueryValuesReply;
53
54 using google::protobuf::util::TimeUtil;
55
56 /*
57  * private types
58  */
59
60 struct Listener {
61         grpc::string addr;
62         grpc::string port;
63
64         grpc::SslServerCredentialsOptions *ssl;
65 };
66 static std::vector<Listener> listeners;
67 static grpc::string default_addr("0.0.0.0:50051");
68
69 /*
70  * helper functions
71  */
72
73 static bool ident_matches(const value_list_t *vl, const value_list_t *matcher)
74 {
75         if (fnmatch(matcher->host, vl->host, 0))
76                 return false;
77
78         if (fnmatch(matcher->plugin, vl->plugin, 0))
79                 return false;
80         if (fnmatch(matcher->plugin_instance, vl->plugin_instance, 0))
81                 return false;
82
83         if (fnmatch(matcher->type, vl->type, 0))
84                 return false;
85         if (fnmatch(matcher->type_instance, vl->type_instance, 0))
86                 return false;
87
88         return true;
89 } /* ident_matches */
90
91 static grpc::string read_file(const char *filename)
92 {
93         std::ifstream f;
94         grpc::string s, content;
95
96         f.open(filename);
97         if (!f.is_open()) {
98                 ERROR("grpc: Failed to open '%s'", filename);
99                 return "";
100         }
101
102         while (std::getline(f, s)) {
103                 content += s;
104                 content.push_back('\n');
105         }
106         f.close();
107         return content;
108 } /* read_file */
109
110 /*
111  * proto conversion
112  */
113
114 static void marshal_ident(const value_list_t *vl, collectd::types::Identifier *msg)
115 {
116         msg->set_host(vl->host);
117         msg->set_plugin(vl->plugin);
118         if (vl->plugin_instance[0] != '\0')
119                 msg->set_plugin_instance(vl->plugin_instance);
120         msg->set_type(vl->type);
121         if (vl->type_instance[0] != '\0')
122                 msg->set_type_instance(vl->type_instance);
123 } /* marshal_ident */
124
125 static grpc::Status unmarshal_ident(const collectd::types::Identifier &msg, value_list_t *vl,
126                 bool require_fields)
127 {
128         std::string s;
129
130         s = msg.host();
131         if (!s.length() && require_fields)
132                 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
133                                 grpc::string("missing host name"));
134         sstrncpy(vl->host, s.c_str(), sizeof(vl->host));
135
136         s = msg.plugin();
137         if (!s.length() && require_fields)
138                 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
139                                 grpc::string("missing plugin name"));
140         sstrncpy(vl->plugin, s.c_str(), sizeof(vl->plugin));
141
142         s = msg.type();
143         if (!s.length() && require_fields)
144                 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
145                                 grpc::string("missing type name"));
146         sstrncpy(vl->type, s.c_str(), sizeof(vl->type));
147
148         s = msg.plugin_instance();
149         sstrncpy(vl->plugin_instance, s.c_str(), sizeof(vl->plugin_instance));
150
151         s = msg.type_instance();
152         sstrncpy(vl->type_instance, s.c_str(), sizeof(vl->type_instance));
153
154         return grpc::Status::OK;
155 } /* unmarshal_ident() */
156
157 static grpc::Status marshal_value_list(const value_list_t *vl, collectd::types::ValueList *msg)
158 {
159         auto id = msg->mutable_identifier();
160         marshal_ident(vl, id);
161
162         auto ds = plugin_get_ds(vl->type);
163         if ((ds == NULL) || (ds->ds_num != vl->values_len)) {
164                 return grpc::Status(grpc::StatusCode::INTERNAL,
165                                 grpc::string("failed to retrieve data-set for values"));
166         }
167
168         auto t = TimeUtil::NanosecondsToTimestamp(CDTIME_T_TO_NS(vl->time));
169         auto d = TimeUtil::NanosecondsToDuration(CDTIME_T_TO_NS(vl->interval));
170         msg->set_allocated_time(new google::protobuf::Timestamp(t));
171         msg->set_allocated_interval(new google::protobuf::Duration(d));
172
173         for (size_t i = 0; i < vl->values_len; ++i) {
174                 auto v = msg->add_value();
175                 switch (ds->ds[i].type) {
176                         case DS_TYPE_COUNTER:
177                                 v->set_counter(vl->values[i].counter);
178                                 break;
179                         case DS_TYPE_GAUGE:
180                                 v->set_gauge(vl->values[i].gauge);
181                                 break;
182                         case DS_TYPE_DERIVE:
183                                 v->set_derive(vl->values[i].derive);
184                                 break;
185                         case DS_TYPE_ABSOLUTE:
186                                 v->set_absolute(vl->values[i].absolute);
187                                 break;
188                         default:
189                                 return grpc::Status(grpc::StatusCode::INTERNAL,
190                                                 grpc::string("unknown value type"));
191                 }
192         }
193
194         return grpc::Status::OK;
195 } /* marshal_value_list */
196
197 static grpc::Status unmarshal_value_list(const collectd::types::ValueList &msg, value_list_t *vl)
198 {
199         vl->time = NS_TO_CDTIME_T(TimeUtil::TimestampToNanoseconds(msg.time()));
200         vl->interval = NS_TO_CDTIME_T(TimeUtil::DurationToNanoseconds(msg.interval()));
201
202         auto status = unmarshal_ident(msg.identifier(), vl, true);
203         if (!status.ok())
204                 return status;
205
206         value_t *values = NULL;
207         size_t values_len = 0;
208
209         status = grpc::Status::OK;
210         for (auto v : msg.value()) {
211                 value_t *val = (value_t *)realloc(values, (values_len + 1) * sizeof(*values));
212                 if (!val) {
213                         status = grpc::Status(grpc::StatusCode::RESOURCE_EXHAUSTED,
214                                         grpc::string("failed to allocate values array"));
215                         break;
216                 }
217
218                 values = val;
219                 val = values + values_len;
220                 values_len++;
221
222                 switch (v.value_case()) {
223                 case collectd::types::Value::ValueCase::kCounter:
224                         val->counter = counter_t(v.counter());
225                         break;
226                 case collectd::types::Value::ValueCase::kGauge:
227                         val->gauge = gauge_t(v.gauge());
228                         break;
229                 case collectd::types::Value::ValueCase::kDerive:
230                         val->derive = derive_t(v.derive());
231                         break;
232                 case collectd::types::Value::ValueCase::kAbsolute:
233                         val->absolute = absolute_t(v.absolute());
234                         break;
235                 default:
236                         status = grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
237                                         grpc::string("unknown value type"));
238                         break;
239                 }
240
241                 if (!status.ok())
242                         break;
243         }
244         if (status.ok()) {
245                 vl->values = values;
246                 vl->values_len = values_len;
247         }
248         else if (values) {
249                 free(values);
250         }
251
252         return status;
253 } /* unmarshal_value_list() */
254
255 /*
256  * request call-backs and call objects
257  */
258
259 static grpc::Status Process(grpc::ServerContext *ctx,
260                 DispatchValuesRequest request, DispatchValuesReply *reply)
261 {
262         value_list_t vl = VALUE_LIST_INIT;
263         auto status = unmarshal_value_list(request.values(), &vl);
264         if (!status.ok())
265                 return status;
266
267         if (plugin_dispatch_values(&vl))
268                 status = grpc::Status(grpc::StatusCode::INTERNAL,
269                                 grpc::string("failed to enqueue values for writing"));
270         return status;
271 } /* Process(): DispatchValues */
272
273 static grpc::Status Process(grpc::ServerContext *ctx,
274                 QueryValuesRequest request, QueryValuesReply *reply)
275 {
276         uc_iter_t *iter;
277         char *name = NULL;
278
279         value_list_t matcher;
280         auto status = unmarshal_ident(request.identifier(), &matcher, false);
281         if (!status.ok())
282                 return status;
283
284         if ((iter = uc_get_iterator()) == NULL) {
285                 return grpc::Status(grpc::StatusCode::INTERNAL,
286                                 grpc::string("failed to query values: cannot create iterator"));
287         }
288
289         status = grpc::Status::OK;
290         while (uc_iterator_next(iter, &name) == 0) {
291                 value_list_t res;
292                 if (parse_identifier_vl(name, &res) != 0) {
293                         status = grpc::Status(grpc::StatusCode::INTERNAL,
294                                         grpc::string("failed to parse identifier"));
295                         break;
296                 }
297
298                 if (!ident_matches(&res, &matcher))
299                         continue;
300
301                 if (uc_iterator_get_time(iter, &res.time) < 0) {
302                         status = grpc::Status(grpc::StatusCode::INTERNAL,
303                                         grpc::string("failed to retrieve value timestamp"));
304                         break;
305                 }
306                 if (uc_iterator_get_interval(iter, &res.interval) < 0) {
307                         status = grpc::Status(grpc::StatusCode::INTERNAL,
308                                         grpc::string("failed to retrieve value interval"));
309                         break;
310                 }
311                 if (uc_iterator_get_values(iter, &res.values, &res.values_len) < 0) {
312                         status = grpc::Status(grpc::StatusCode::INTERNAL,
313                                         grpc::string("failed to retrieve values"));
314                         break;
315                 }
316
317                 auto vl = reply->add_values();
318                 status = marshal_value_list(&res, vl);
319                 free(res.values);
320                 if (!status.ok())
321                         break;
322         }
323
324         uc_iterator_destroy(iter);
325
326         return status;
327 } /* Process(): QueryValues */
328
329 class Call
330 {
331 public:
332         Call(Collectd::AsyncService *service, grpc::ServerCompletionQueue *cq)
333                 : service_(service), cq_(cq), status_(CREATE)
334         { }
335
336         virtual ~Call()
337         { }
338
339         void Handle()
340         {
341                 if (status_ == CREATE) {
342                         Create();
343                         status_ = PROCESS;
344                 }
345                 else if (status_ == PROCESS) {
346                         Process();
347                         status_ = FINISH;
348                 }
349                 else {
350                         GPR_ASSERT(status_ == FINISH);
351                         Finish();
352                 }
353         } /* Handle() */
354
355 protected:
356         virtual void Create() = 0;
357         virtual void Process() = 0;
358         virtual void Finish() = 0;
359
360         Collectd::AsyncService *service_;
361         grpc::ServerCompletionQueue *cq_;
362         grpc::ServerContext ctx_;
363
364 private:
365         enum CallStatus { CREATE, PROCESS, FINISH };
366         CallStatus status_;
367 }; /* class Call */
368
369 template<typename RequestT, typename ReplyT>
370 class RpcCall final : public Call
371 {
372         typedef void (Collectd::AsyncService::*CreatorT)(grpc::ServerContext *,
373                         RequestT *, grpc::ServerAsyncResponseWriter<ReplyT> *,
374                         grpc::CompletionQueue *, grpc::ServerCompletionQueue *, void *);
375
376 public:
377         RpcCall(Collectd::AsyncService *service,
378                         CreatorT creator, grpc::ServerCompletionQueue *cq)
379                 : Call(service, cq), creator_(creator), responder_(&ctx_)
380         {
381                 Handle();
382         } /* RpcCall() */
383
384         virtual ~RpcCall()
385         { }
386
387 private:
388         void Create()
389         {
390                 (service_->*creator_)(&ctx_, &request_, &responder_, cq_, cq_, this);
391         } /* Create() */
392
393         void Process()
394         {
395                 // Add a new request object to the queue.
396                 new RpcCall<RequestT, ReplyT>(service_, creator_, cq_);
397                 grpc::Status status = ::Process(&ctx_, request_, &reply_);
398                 responder_.Finish(reply_, status, this);
399         } /* Process() */
400
401         void Finish()
402         {
403                 delete this;
404         } /* Finish() */
405
406         CreatorT creator_;
407
408         RequestT request_;
409         ReplyT reply_;
410
411         grpc::ServerAsyncResponseWriter<ReplyT> responder_;
412 }; /* class RpcCall */
413
414 /*
415  * gRPC server implementation
416  */
417
418 class CollectdServer final
419 {
420 public:
421         void Start()
422         {
423                 auto auth = grpc::InsecureServerCredentials();
424
425                 grpc::ServerBuilder builder;
426
427                 if (listeners.empty()) {
428                         builder.AddListeningPort(default_addr, auth);
429                         INFO("grpc: Listening on %s", default_addr.c_str());
430                 }
431                 else {
432                         for (auto l : listeners) {
433                                 grpc::string addr = l.addr + ":" + l.port;
434
435                                 auto use_ssl = grpc::string("");
436                                 auto a = auth;
437                                 if (l.ssl != nullptr) {
438                                         use_ssl = grpc::string(" (SSL enabled)");
439                                         a = grpc::SslServerCredentials(*l.ssl);
440                                 }
441
442                                 builder.AddListeningPort(addr, a);
443                                 INFO("grpc: Listening on %s%s", addr.c_str(), use_ssl.c_str());
444                         }
445                 }
446
447                 builder.RegisterService(&service_);
448                 cq_ = builder.AddCompletionQueue();
449                 server_ = builder.BuildAndStart();
450         } /* Start() */
451
452         void Shutdown()
453         {
454                 server_->Shutdown();
455                 cq_->Shutdown();
456         } /* Shutdown() */
457
458         void Mainloop()
459         {
460                 // Register request types.
461                 new RpcCall<DispatchValuesRequest, DispatchValuesReply>(&service_,
462                                 &Collectd::AsyncService::RequestDispatchValues, cq_.get());
463                 new RpcCall<QueryValuesRequest, QueryValuesReply>(&service_,
464                                 &Collectd::AsyncService::RequestQueryValues, cq_.get());
465
466                 while (true) {
467                         void *req = NULL;
468                         bool ok = false;
469
470                         if (!cq_->Next(&req, &ok))
471                                 break; // Queue shut down.
472                         if (!ok) {
473                                 ERROR("grpc: Failed to read from queue");
474                                 break;
475                         }
476
477                         static_cast<Call *>(req)->Handle();
478                 }
479         } /* Mainloop() */
480
481 private:
482         Collectd::AsyncService service_;
483
484         std::unique_ptr<grpc::Server> server_;
485         std::unique_ptr<grpc::ServerCompletionQueue> cq_;
486 }; /* class CollectdServer */
487
488 static CollectdServer *server = nullptr;
489
490 /*
491  * collectd plugin interface
492  */
493
494 extern "C" {
495         static pthread_t *workers;
496         static size_t workers_num = 5;
497
498         static void *worker_thread(void *arg)
499         {
500                 CollectdServer *s = (CollectdServer *)arg;
501                 s->Mainloop();
502                 return NULL;
503         } /* worker_thread() */
504
505         static int c_grpc_config_listen(oconfig_item_t *ci)
506         {
507                 if ((ci->values_num != 2)
508                                 || (ci->values[0].type != OCONFIG_TYPE_STRING)
509                                 || (ci->values[1].type != OCONFIG_TYPE_STRING)) {
510                         ERROR("grpc: The `%s` config option needs exactly "
511                                         "two string argument (address and port).", ci->key);
512                         return -1;
513                 }
514
515                 auto listener = Listener();
516                 listener.addr = grpc::string(ci->values[0].value.string);
517                 listener.port = grpc::string(ci->values[1].value.string);
518                 listener.ssl = nullptr;
519
520                 auto ssl_opts = new(grpc::SslServerCredentialsOptions);
521                 grpc::SslServerCredentialsOptions::PemKeyCertPair pkcp = {};
522                 bool use_ssl = false;
523
524                 for (int i = 0; i < ci->children_num; i++) {
525                         oconfig_item_t *child = ci->children + i;
526
527                         if (!strcasecmp("EnableSSL", child->key)) {
528                                 if (cf_util_get_boolean(child, &use_ssl)) {
529                                         ERROR("grpc: Option `%s` expects a boolean value",
530                                                         child->key);
531                                         return -1;
532                                 }
533                         }
534                         else if (!strcasecmp("SSLRootCerts", child->key)) {
535                                 char *certs = NULL;
536                                 if (cf_util_get_string(child, &certs)) {
537                                         ERROR("grpc: Option `%s` expects a string value",
538                                                         child->key);
539                                         return -1;
540                                 }
541                                 ssl_opts->pem_root_certs = read_file(certs);
542                         }
543                         else if (!strcasecmp("SSLServerKey", child->key)) {
544                                 char *key = NULL;
545                                 if (cf_util_get_string(child, &key)) {
546                                         ERROR("grpc: Option `%s` expects a string value",
547                                                         child->key);
548                                         return -1;
549                                 }
550                                 pkcp.private_key = read_file(key);
551                         }
552                         else if (!strcasecmp("SSLServerCert", child->key)) {
553                                 char *cert = NULL;
554                                 if (cf_util_get_string(child, &cert)) {
555                                         ERROR("grpc: Option `%s` expects a string value",
556                                                         child->key);
557                                         return -1;
558                                 }
559                                 pkcp.cert_chain = read_file(cert);
560                         }
561                         else {
562                                 WARNING("grpc: Option `%s` not allowed in <%s> block.",
563                                                 child->key, ci->key);
564                         }
565                 }
566
567                 ssl_opts->pem_key_cert_pairs.push_back(pkcp);
568                 if (use_ssl)
569                         listener.ssl = ssl_opts;
570                 else
571                         delete(ssl_opts);
572
573                 listeners.push_back(listener);
574                 return 0;
575         } /* c_grpc_config_listen() */
576
577         static int c_grpc_config(oconfig_item_t *ci)
578         {
579                 int i;
580
581                 for (i = 0; i < ci->children_num; i++) {
582                         oconfig_item_t *child = ci->children + i;
583
584                         if (!strcasecmp("Listen", child->key)) {
585                                 if (c_grpc_config_listen(child))
586                                         return -1;
587                         }
588                         else if (!strcasecmp("WorkerThreads", child->key)) {
589                                 int n;
590                                 if (cf_util_get_int(child, &n))
591                                         return -1;
592                                 workers_num = (size_t)n;
593                         }
594                         else {
595                                 WARNING("grpc: Option `%s` not allowed here.", child->key);
596                         }
597                 }
598
599                 return 0;
600         } /* c_grpc_config() */
601
602         static int c_grpc_init(void)
603         {
604                 server = new CollectdServer();
605                 size_t i;
606
607                 if (! server) {
608                         ERROR("grpc: Failed to create server");
609                         return -1;
610                 }
611
612                 workers = (pthread_t *)calloc(workers_num, sizeof(*workers));
613                 if (! workers) {
614                         delete server;
615                         server = nullptr;
616
617                         ERROR("grpc: Failed to allocate worker threads");
618                         return -1;
619                 }
620
621                 server->Start();
622                 for (i = 0; i < workers_num; i++) {
623                         plugin_thread_create(&workers[i], /* attr = */ NULL,
624                                         worker_thread, server);
625                 }
626                 INFO("grpc: Started %zu workers", workers_num);
627                 return 0;
628         } /* c_grpc_init() */
629
630         static int c_grpc_shutdown(void)
631         {
632                 size_t i;
633
634                 if (!server)
635                         return -1;
636
637                 server->Shutdown();
638
639                 INFO("grpc: Waiting for %zu workers to terminate", workers_num);
640                 for (i = 0; i < workers_num; i++)
641                         pthread_join(workers[i], NULL);
642                 free(workers);
643                 workers = NULL;
644                 workers_num = 0;
645
646                 delete server;
647                 server = nullptr;
648
649                 return 0;
650         } /* c_grpc_shutdown() */
651
652         void module_register(void)
653         {
654                 plugin_register_complex_config("grpc", c_grpc_config);
655                 plugin_register_init("grpc", c_grpc_init);
656                 plugin_register_shutdown("grpc", c_grpc_shutdown);
657         } /* module_register() */
658 } /* extern "C" */
659
660 /* vim: set sw=4 ts=4 tw=78 noexpandtab : */