proto/collectd.proto: Remove unused import "google/protobuf/timestamp".
[collectd.git] / src / grpc.cc
1 /**
2  * collectd - src/grpc.cc
3  * Copyright (C) 2015-2016 Sebastian Harl
4  *
5  * Permission is hereby granted, free of charge, to any person obtaining a
6  * copy of this software and associated documentation files (the "Software"),
7  * to deal in the Software without restriction, including without limitation
8  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
9  * and/or sell copies of the Software, and to permit persons to whom the
10  * Software is furnished to do so, subject to the following conditions:
11  *
12  * The above copyright notice and this permission notice shall be included in
13  * all copies or substantial portions of the Software.
14  *
15  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
21  * DEALINGS IN THE SOFTWARE.
22  *
23  * Authors:
24  *   Sebastian Harl <sh at tokkee.org>
25  **/
26
27 #include <grpc++/grpc++.h>
28 #include <google/protobuf/util/time_util.h>
29
30 #include <fstream>
31 #include <iostream>
32 #include <vector>
33
34 #include "collectd.grpc.pb.h"
35
36 extern "C" {
37 #include <fnmatch.h>
38 #include <stdbool.h>
39
40 #include "collectd.h"
41 #include "common.h"
42 #include "configfile.h"
43 #include "plugin.h"
44
45 #include "daemon/utils_cache.h"
46 }
47
48 using collectd::Collectd;
49
50 using collectd::DispatchValuesRequest;
51 using collectd::DispatchValuesReply;
52 using collectd::QueryValuesRequest;
53 using collectd::QueryValuesReply;
54
55 using google::protobuf::util::TimeUtil;
56
57 /*
58  * private types
59  */
60
61 struct Listener {
62         grpc::string addr;
63         grpc::string port;
64
65         grpc::SslServerCredentialsOptions *ssl;
66 };
67 static std::vector<Listener> listeners;
68 static grpc::string default_addr("0.0.0.0:50051");
69
70 /*
71  * helper functions
72  */
73
74 static bool ident_matches(const value_list_t *vl, const value_list_t *matcher)
75 {
76         if (fnmatch(matcher->host, vl->host, 0))
77                 return false;
78
79         if (fnmatch(matcher->plugin, vl->plugin, 0))
80                 return false;
81         if (fnmatch(matcher->plugin_instance, vl->plugin_instance, 0))
82                 return false;
83
84         if (fnmatch(matcher->type, vl->type, 0))
85                 return false;
86         if (fnmatch(matcher->type_instance, vl->type_instance, 0))
87                 return false;
88
89         return true;
90 } /* ident_matches */
91
92 static grpc::string read_file(const char *filename)
93 {
94         std::ifstream f;
95         grpc::string s, content;
96
97         f.open(filename);
98         if (!f.is_open()) {
99                 ERROR("grpc: Failed to open '%s'", filename);
100                 return "";
101         }
102
103         while (std::getline(f, s)) {
104                 content += s;
105                 content.push_back('\n');
106         }
107         f.close();
108         return content;
109 } /* read_file */
110
111 /*
112  * proto conversion
113  */
114
115 static void marshal_ident(const value_list_t *vl, collectd::types::Identifier *msg)
116 {
117         msg->set_host(vl->host);
118         msg->set_plugin(vl->plugin);
119         if (vl->plugin_instance[0] != '\0')
120                 msg->set_plugin_instance(vl->plugin_instance);
121         msg->set_type(vl->type);
122         if (vl->type_instance[0] != '\0')
123                 msg->set_type_instance(vl->type_instance);
124 } /* marshal_ident */
125
126 static grpc::Status unmarshal_ident(const collectd::types::Identifier &msg, value_list_t *vl,
127                 bool require_fields)
128 {
129         std::string s;
130
131         s = msg.host();
132         if (!s.length() && require_fields)
133                 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
134                                 grpc::string("missing host name"));
135         sstrncpy(vl->host, s.c_str(), sizeof(vl->host));
136
137         s = msg.plugin();
138         if (!s.length() && require_fields)
139                 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
140                                 grpc::string("missing plugin name"));
141         sstrncpy(vl->plugin, s.c_str(), sizeof(vl->plugin));
142
143         s = msg.type();
144         if (!s.length() && require_fields)
145                 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
146                                 grpc::string("missing type name"));
147         sstrncpy(vl->type, s.c_str(), sizeof(vl->type));
148
149         s = msg.plugin_instance();
150         sstrncpy(vl->plugin_instance, s.c_str(), sizeof(vl->plugin_instance));
151
152         s = msg.type_instance();
153         sstrncpy(vl->type_instance, s.c_str(), sizeof(vl->type_instance));
154
155         return grpc::Status::OK;
156 } /* unmarshal_ident() */
157
158 static grpc::Status marshal_value_list(const value_list_t *vl, collectd::types::ValueList *msg)
159 {
160         auto id = msg->mutable_identifier();
161         marshal_ident(vl, id);
162
163         auto ds = plugin_get_ds(vl->type);
164         if ((ds == NULL) || (ds->ds_num != vl->values_len)) {
165                 return grpc::Status(grpc::StatusCode::INTERNAL,
166                                 grpc::string("failed to retrieve data-set for values"));
167         }
168
169         auto t = TimeUtil::NanosecondsToTimestamp(CDTIME_T_TO_NS(vl->time));
170         auto d = TimeUtil::NanosecondsToDuration(CDTIME_T_TO_NS(vl->interval));
171         msg->set_allocated_time(new google::protobuf::Timestamp(t));
172         msg->set_allocated_interval(new google::protobuf::Duration(d));
173
174         for (size_t i = 0; i < vl->values_len; ++i) {
175                 auto v = msg->add_value();
176                 switch (ds->ds[i].type) {
177                         case DS_TYPE_COUNTER:
178                                 v->set_counter(vl->values[i].counter);
179                                 break;
180                         case DS_TYPE_GAUGE:
181                                 v->set_gauge(vl->values[i].gauge);
182                                 break;
183                         case DS_TYPE_DERIVE:
184                                 v->set_derive(vl->values[i].derive);
185                                 break;
186                         case DS_TYPE_ABSOLUTE:
187                                 v->set_absolute(vl->values[i].absolute);
188                                 break;
189                         default:
190                                 return grpc::Status(grpc::StatusCode::INTERNAL,
191                                                 grpc::string("unknown value type"));
192                 }
193         }
194
195         return grpc::Status::OK;
196 } /* marshal_value_list */
197
198 static grpc::Status unmarshal_value_list(const collectd::types::ValueList &msg, value_list_t *vl)
199 {
200         vl->time = NS_TO_CDTIME_T(TimeUtil::TimestampToNanoseconds(msg.time()));
201         vl->interval = NS_TO_CDTIME_T(TimeUtil::DurationToNanoseconds(msg.interval()));
202
203         auto status = unmarshal_ident(msg.identifier(), vl, true);
204         if (!status.ok())
205                 return status;
206
207         value_t *values = NULL;
208         size_t values_len = 0;
209
210         status = grpc::Status::OK;
211         for (auto v : msg.value()) {
212                 value_t *val = (value_t *)realloc(values, (values_len + 1) * sizeof(*values));
213                 if (!val) {
214                         status = grpc::Status(grpc::StatusCode::RESOURCE_EXHAUSTED,
215                                         grpc::string("failed to allocate values array"));
216                         break;
217                 }
218
219                 values = val;
220                 val = values + values_len;
221                 values_len++;
222
223                 switch (v.value_case()) {
224                 case collectd::types::Value::ValueCase::kCounter:
225                         val->counter = counter_t(v.counter());
226                         break;
227                 case collectd::types::Value::ValueCase::kGauge:
228                         val->gauge = gauge_t(v.gauge());
229                         break;
230                 case collectd::types::Value::ValueCase::kDerive:
231                         val->derive = derive_t(v.derive());
232                         break;
233                 case collectd::types::Value::ValueCase::kAbsolute:
234                         val->absolute = absolute_t(v.absolute());
235                         break;
236                 default:
237                         status = grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
238                                         grpc::string("unknown value type"));
239                         break;
240                 }
241
242                 if (!status.ok())
243                         break;
244         }
245         if (status.ok()) {
246                 vl->values = values;
247                 vl->values_len = values_len;
248         }
249         else if (values) {
250                 free(values);
251         }
252
253         return status;
254 } /* unmarshal_value_list() */
255
256 /*
257  * request call-backs and call objects
258  */
259
260 static grpc::Status Process(grpc::ServerContext *ctx,
261                 DispatchValuesRequest request, DispatchValuesReply *reply)
262 {
263         value_list_t vl = VALUE_LIST_INIT;
264         auto status = unmarshal_value_list(request.values(), &vl);
265         if (!status.ok())
266                 return status;
267
268         if (plugin_dispatch_values(&vl))
269                 status = grpc::Status(grpc::StatusCode::INTERNAL,
270                                 grpc::string("failed to enqueue values for writing"));
271         return status;
272 } /* Process(): DispatchValues */
273
274 static grpc::Status Process(grpc::ServerContext *ctx,
275                 QueryValuesRequest request, QueryValuesReply *reply)
276 {
277         uc_iter_t *iter;
278         char *name = NULL;
279
280         value_list_t matcher;
281         auto status = unmarshal_ident(request.identifier(), &matcher, false);
282         if (!status.ok())
283                 return status;
284
285         if ((iter = uc_get_iterator()) == NULL) {
286                 return grpc::Status(grpc::StatusCode::INTERNAL,
287                                 grpc::string("failed to query values: cannot create iterator"));
288         }
289
290         while (uc_iterator_next(iter, &name) == 0) {
291                 value_list_t res;
292                 if (parse_identifier_vl(name, &res) != 0) {
293                         uc_iterator_destroy(iter);
294                         return grpc::Status(grpc::StatusCode::INTERNAL,
295                                         grpc::string("failed to parse identifier"));
296                 }
297
298                 if (!ident_matches(&res, &matcher))
299                         continue;
300
301                 if (uc_iterator_get_time(iter, &res.time) < 0) {
302                         uc_iterator_destroy(iter);
303                         return grpc::Status(grpc::StatusCode::INTERNAL,
304                                         grpc::string("failed to retrieve value timestamp"));
305                 }
306                 if (uc_iterator_get_interval(iter, &res.interval) < 0) {
307                         uc_iterator_destroy(iter);
308                         return grpc::Status(grpc::StatusCode::INTERNAL,
309                                         grpc::string("failed to retrieve value interval"));
310                 }
311                 if (uc_iterator_get_values(iter, &res.values, &res.values_len) < 0) {
312                         uc_iterator_destroy(iter);
313                         return grpc::Status(grpc::StatusCode::INTERNAL,
314                                         grpc::string("failed to retrieve values"));
315                 }
316
317                 auto vl = reply->add_values();
318                 status = marshal_value_list(&res, vl);
319                 free(res.values);
320                 if (!status.ok()) {
321                         uc_iterator_destroy(iter);
322                         return status;
323                 }
324         }
325
326         uc_iterator_destroy(iter);
327
328         return grpc::Status::OK;
329 } /* Process(): QueryValues */
330
331 class Call
332 {
333 public:
334         Call(Collectd::AsyncService *service, grpc::ServerCompletionQueue *cq)
335                 : service_(service), cq_(cq), status_(CREATE)
336         { }
337
338         virtual ~Call()
339         { }
340
341         void Handle()
342         {
343                 if (status_ == CREATE) {
344                         Create();
345                         status_ = PROCESS;
346                 }
347                 else if (status_ == PROCESS) {
348                         Process();
349                         status_ = FINISH;
350                 }
351                 else {
352                         GPR_ASSERT(status_ == FINISH);
353                         Finish();
354                 }
355         } /* Handle() */
356
357 protected:
358         virtual void Create() = 0;
359         virtual void Process() = 0;
360         virtual void Finish() = 0;
361
362         Collectd::AsyncService *service_;
363         grpc::ServerCompletionQueue *cq_;
364         grpc::ServerContext ctx_;
365
366 private:
367         enum CallStatus { CREATE, PROCESS, FINISH };
368         CallStatus status_;
369 }; /* class Call */
370
371 template<typename RequestT, typename ReplyT>
372 class RpcCall final : public Call
373 {
374         typedef void (Collectd::AsyncService::*CreatorT)(grpc::ServerContext *,
375                         RequestT *, grpc::ServerAsyncResponseWriter<ReplyT> *,
376                         grpc::CompletionQueue *, grpc::ServerCompletionQueue *, void *);
377
378 public:
379         RpcCall(Collectd::AsyncService *service,
380                         CreatorT creator, grpc::ServerCompletionQueue *cq)
381                 : Call(service, cq), creator_(creator), responder_(&ctx_)
382         {
383                 Handle();
384         } /* RpcCall() */
385
386         virtual ~RpcCall()
387         { }
388
389 private:
390         void Create()
391         {
392                 (service_->*creator_)(&ctx_, &request_, &responder_, cq_, cq_, this);
393         } /* Create() */
394
395         void Process()
396         {
397                 // Add a new request object to the queue.
398                 new RpcCall<RequestT, ReplyT>(service_, creator_, cq_);
399                 grpc::Status status = ::Process(&ctx_, request_, &reply_);
400                 responder_.Finish(reply_, status, this);
401         } /* Process() */
402
403         void Finish()
404         {
405                 delete this;
406         } /* Finish() */
407
408         CreatorT creator_;
409
410         RequestT request_;
411         ReplyT reply_;
412
413         grpc::ServerAsyncResponseWriter<ReplyT> responder_;
414 }; /* class RpcCall */
415
416 /*
417  * gRPC server implementation
418  */
419
420 class CollectdServer final
421 {
422 public:
423         void Start()
424         {
425                 auto auth = grpc::InsecureServerCredentials();
426
427                 grpc::ServerBuilder builder;
428
429                 if (listeners.empty()) {
430                         builder.AddListeningPort(default_addr, auth);
431                         INFO("grpc: Listening on %s", default_addr.c_str());
432                 }
433                 else {
434                         for (auto l : listeners) {
435                                 grpc::string addr = l.addr + ":" + l.port;
436
437                                 auto use_ssl = grpc::string("");
438                                 auto a = auth;
439                                 if (l.ssl != nullptr) {
440                                         use_ssl = grpc::string(" (SSL enabled)");
441                                         a = grpc::SslServerCredentials(*l.ssl);
442                                 }
443
444                                 builder.AddListeningPort(addr, a);
445                                 INFO("grpc: Listening on %s%s", addr.c_str(), use_ssl.c_str());
446                         }
447                 }
448
449                 builder.RegisterService(&service_);
450                 cq_ = builder.AddCompletionQueue();
451                 server_ = builder.BuildAndStart();
452         } /* Start() */
453
454         void Shutdown()
455         {
456                 server_->Shutdown();
457                 cq_->Shutdown();
458         } /* Shutdown() */
459
460         void Mainloop()
461         {
462                 // Register request types.
463                 new RpcCall<DispatchValuesRequest, DispatchValuesReply>(&service_,
464                                 &Collectd::AsyncService::RequestDispatchValues, cq_.get());
465                 new RpcCall<QueryValuesRequest, QueryValuesReply>(&service_,
466                                 &Collectd::AsyncService::RequestQueryValues, cq_.get());
467
468                 while (true) {
469                         void *req = NULL;
470                         bool ok = false;
471
472                         if (!cq_->Next(&req, &ok))
473                                 break; // Queue shut down.
474                         if (!ok) {
475                                 ERROR("grpc: Failed to read from queue");
476                                 break;
477                         }
478
479                         static_cast<Call *>(req)->Handle();
480                 }
481         } /* Mainloop() */
482
483 private:
484         Collectd::AsyncService service_;
485
486         std::unique_ptr<grpc::Server> server_;
487         std::unique_ptr<grpc::ServerCompletionQueue> cq_;
488 }; /* class CollectdServer */
489
490 static CollectdServer *server = nullptr;
491
492 /*
493  * collectd plugin interface
494  */
495
496 extern "C" {
497         static pthread_t *workers;
498         static size_t workers_num = 5;
499
500         static void *worker_thread(void *arg)
501         {
502                 CollectdServer *s = (CollectdServer *)arg;
503                 s->Mainloop();
504                 return NULL;
505         } /* worker_thread() */
506
507         static int c_grpc_config_listen(oconfig_item_t *ci)
508         {
509                 if ((ci->values_num != 2)
510                                 || (ci->values[0].type != OCONFIG_TYPE_STRING)
511                                 || (ci->values[1].type != OCONFIG_TYPE_STRING)) {
512                         ERROR("grpc: The `%s` config option needs exactly "
513                                         "two string argument (address and port).", ci->key);
514                         return -1;
515                 }
516
517                 auto listener = Listener();
518                 listener.addr = grpc::string(ci->values[0].value.string);
519                 listener.port = grpc::string(ci->values[1].value.string);
520                 listener.ssl = nullptr;
521
522                 auto ssl_opts = new(grpc::SslServerCredentialsOptions);
523                 grpc::SslServerCredentialsOptions::PemKeyCertPair pkcp = {};
524                 bool use_ssl = false;
525
526                 for (int i = 0; i < ci->children_num; i++) {
527                         oconfig_item_t *child = ci->children + i;
528
529                         if (!strcasecmp("EnableSSL", child->key)) {
530                                 if (cf_util_get_boolean(child, &use_ssl)) {
531                                         ERROR("grpc: Option `%s` expects a boolean value",
532                                                         child->key);
533                                         return -1;
534                                 }
535                         }
536                         else if (!strcasecmp("SSLRootCerts", child->key)) {
537                                 char *certs = NULL;
538                                 if (cf_util_get_string(child, &certs)) {
539                                         ERROR("grpc: Option `%s` expects a string value",
540                                                         child->key);
541                                         return -1;
542                                 }
543                                 ssl_opts->pem_root_certs = read_file(certs);
544                         }
545                         else if (!strcasecmp("SSLServerKey", child->key)) {
546                                 char *key = NULL;
547                                 if (cf_util_get_string(child, &key)) {
548                                         ERROR("grpc: Option `%s` expects a string value",
549                                                         child->key);
550                                         return -1;
551                                 }
552                                 pkcp.private_key = read_file(key);
553                         }
554                         else if (!strcasecmp("SSLServerCert", child->key)) {
555                                 char *cert = NULL;
556                                 if (cf_util_get_string(child, &cert)) {
557                                         ERROR("grpc: Option `%s` expects a string value",
558                                                         child->key);
559                                         return -1;
560                                 }
561                                 pkcp.cert_chain = read_file(cert);
562                         }
563                         else {
564                                 WARNING("grpc: Option `%s` not allowed in <%s> block.",
565                                                 child->key, ci->key);
566                         }
567                 }
568
569                 ssl_opts->pem_key_cert_pairs.push_back(pkcp);
570                 if (use_ssl)
571                         listener.ssl = ssl_opts;
572                 else
573                         delete(ssl_opts);
574
575                 listeners.push_back(listener);
576                 return 0;
577         } /* c_grpc_config_listen() */
578
579         static int c_grpc_config(oconfig_item_t *ci)
580         {
581                 int i;
582
583                 for (i = 0; i < ci->children_num; i++) {
584                         oconfig_item_t *child = ci->children + i;
585
586                         if (!strcasecmp("Listen", child->key)) {
587                                 if (c_grpc_config_listen(child))
588                                         return -1;
589                         }
590                         else if (!strcasecmp("WorkerThreads", child->key)) {
591                                 int n;
592                                 if (cf_util_get_int(child, &n))
593                                         return -1;
594                                 workers_num = (size_t)n;
595                         }
596                         else {
597                                 WARNING("grpc: Option `%s` not allowed here.", child->key);
598                         }
599                 }
600
601                 return 0;
602         } /* c_grpc_config() */
603
604         static int c_grpc_init(void)
605         {
606                 server = new CollectdServer();
607                 size_t i;
608
609                 if (! server) {
610                         ERROR("grpc: Failed to create server");
611                         return -1;
612                 }
613
614                 workers = (pthread_t *)calloc(workers_num, sizeof(*workers));
615                 if (! workers) {
616                         delete server;
617                         server = nullptr;
618
619                         ERROR("grpc: Failed to allocate worker threads");
620                         return -1;
621                 }
622
623                 server->Start();
624                 for (i = 0; i < workers_num; i++) {
625                         plugin_thread_create(&workers[i], /* attr = */ NULL,
626                                         worker_thread, server);
627                 }
628                 INFO("grpc: Started %zu workers", workers_num);
629                 return 0;
630         } /* c_grpc_init() */
631
632         static int c_grpc_shutdown(void)
633         {
634                 size_t i;
635
636                 if (!server)
637                         return -1;
638
639                 server->Shutdown();
640
641                 INFO("grpc: Waiting for %zu workers to terminate", workers_num);
642                 for (i = 0; i < workers_num; i++)
643                         pthread_join(workers[i], NULL);
644                 free(workers);
645                 workers = NULL;
646                 workers_num = 0;
647
648                 delete server;
649                 server = nullptr;
650
651                 return 0;
652         } /* c_grpc_shutdown() */
653
654         void module_register(void)
655         {
656                 plugin_register_complex_config("grpc", c_grpc_config);
657                 plugin_register_init("grpc", c_grpc_init);
658                 plugin_register_shutdown("grpc", c_grpc_shutdown);
659         } /* module_register() */
660 } /* extern "C" */
661
662 /* vim: set sw=4 ts=4 tw=78 noexpandtab : */