grpc plugin: Switch to the synchronous interface.
[collectd.git] / src / grpc.cc
1 /**
2  * collectd - src/grpc.cc
3  * Copyright (C) 2015-2016 Sebastian Harl
4  *
5  * Permission is hereby granted, free of charge, to any person obtaining a
6  * copy of this software and associated documentation files (the "Software"),
7  * to deal in the Software without restriction, including without limitation
8  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
9  * and/or sell copies of the Software, and to permit persons to whom the
10  * Software is furnished to do so, subject to the following conditions:
11  *
12  * The above copyright notice and this permission notice shall be included in
13  * all copies or substantial portions of the Software.
14  *
15  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
21  * DEALINGS IN THE SOFTWARE.
22  *
23  * Authors:
24  *   Sebastian Harl <sh at tokkee.org>
25  **/
26
27 #include <grpc++/grpc++.h>
28 #include <google/protobuf/util/time_util.h>
29
30 #include <fstream>
31 #include <iostream>
32 #include <queue>
33 #include <vector>
34
35 #include "collectd.grpc.pb.h"
36
37 extern "C" {
38 #include <fnmatch.h>
39 #include <stdbool.h>
40
41 #include "collectd.h"
42 #include "common.h"
43 #include "configfile.h"
44 #include "plugin.h"
45
46 #include "daemon/utils_cache.h"
47 }
48
49 using collectd::Collectd;
50 using collectd::Dispatch;
51
52 using collectd::DispatchValuesRequest;
53 using collectd::DispatchValuesResponse;
54 using collectd::QueryValuesRequest;
55 using collectd::QueryValuesResponse;
56
57 using google::protobuf::util::TimeUtil;
58
59 /*
60  * private types
61  */
62
63 struct Listener {
64         grpc::string addr;
65         grpc::string port;
66
67         grpc::SslServerCredentialsOptions *ssl;
68 };
69 static std::vector<Listener> listeners;
70 static grpc::string default_addr("0.0.0.0:50051");
71
72 /*
73  * helper functions
74  */
75
76 static bool ident_matches(const value_list_t *vl, const value_list_t *matcher)
77 {
78         if (fnmatch(matcher->host, vl->host, 0))
79                 return false;
80
81         if (fnmatch(matcher->plugin, vl->plugin, 0))
82                 return false;
83         if (fnmatch(matcher->plugin_instance, vl->plugin_instance, 0))
84                 return false;
85
86         if (fnmatch(matcher->type, vl->type, 0))
87                 return false;
88         if (fnmatch(matcher->type_instance, vl->type_instance, 0))
89                 return false;
90
91         return true;
92 } /* ident_matches */
93
94 static grpc::string read_file(const char *filename)
95 {
96         std::ifstream f;
97         grpc::string s, content;
98
99         f.open(filename);
100         if (!f.is_open()) {
101                 ERROR("grpc: Failed to open '%s'", filename);
102                 return "";
103         }
104
105         while (std::getline(f, s)) {
106                 content += s;
107                 content.push_back('\n');
108         }
109         f.close();
110         return content;
111 } /* read_file */
112
113 /*
114  * proto conversion
115  */
116
117 static void marshal_ident(const value_list_t *vl, collectd::types::Identifier *msg)
118 {
119         msg->set_host(vl->host);
120         msg->set_plugin(vl->plugin);
121         if (vl->plugin_instance[0] != '\0')
122                 msg->set_plugin_instance(vl->plugin_instance);
123         msg->set_type(vl->type);
124         if (vl->type_instance[0] != '\0')
125                 msg->set_type_instance(vl->type_instance);
126 } /* marshal_ident */
127
128 static grpc::Status unmarshal_ident(const collectd::types::Identifier &msg, value_list_t *vl,
129                 bool require_fields)
130 {
131         std::string s;
132
133         s = msg.host();
134         if (!s.length() && require_fields)
135                 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
136                                 grpc::string("missing host name"));
137         sstrncpy(vl->host, s.c_str(), sizeof(vl->host));
138
139         s = msg.plugin();
140         if (!s.length() && require_fields)
141                 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
142                                 grpc::string("missing plugin name"));
143         sstrncpy(vl->plugin, s.c_str(), sizeof(vl->plugin));
144
145         s = msg.type();
146         if (!s.length() && require_fields)
147                 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
148                                 grpc::string("missing type name"));
149         sstrncpy(vl->type, s.c_str(), sizeof(vl->type));
150
151         s = msg.plugin_instance();
152         sstrncpy(vl->plugin_instance, s.c_str(), sizeof(vl->plugin_instance));
153
154         s = msg.type_instance();
155         sstrncpy(vl->type_instance, s.c_str(), sizeof(vl->type_instance));
156
157         return grpc::Status::OK;
158 } /* unmarshal_ident() */
159
160 static grpc::Status marshal_value_list(const value_list_t *vl, collectd::types::ValueList *msg)
161 {
162         auto id = msg->mutable_identifier();
163         marshal_ident(vl, id);
164
165         auto ds = plugin_get_ds(vl->type);
166         if ((ds == NULL) || (ds->ds_num != vl->values_len)) {
167                 return grpc::Status(grpc::StatusCode::INTERNAL,
168                                 grpc::string("failed to retrieve data-set for values"));
169         }
170
171         auto t = TimeUtil::NanosecondsToTimestamp(CDTIME_T_TO_NS(vl->time));
172         auto d = TimeUtil::NanosecondsToDuration(CDTIME_T_TO_NS(vl->interval));
173         msg->set_allocated_time(new google::protobuf::Timestamp(t));
174         msg->set_allocated_interval(new google::protobuf::Duration(d));
175
176         for (size_t i = 0; i < vl->values_len; ++i) {
177                 auto v = msg->add_values();
178                 switch (ds->ds[i].type) {
179                         case DS_TYPE_COUNTER:
180                                 v->set_counter(vl->values[i].counter);
181                                 break;
182                         case DS_TYPE_GAUGE:
183                                 v->set_gauge(vl->values[i].gauge);
184                                 break;
185                         case DS_TYPE_DERIVE:
186                                 v->set_derive(vl->values[i].derive);
187                                 break;
188                         case DS_TYPE_ABSOLUTE:
189                                 v->set_absolute(vl->values[i].absolute);
190                                 break;
191                         default:
192                                 return grpc::Status(grpc::StatusCode::INTERNAL,
193                                                 grpc::string("unknown value type"));
194                 }
195
196                 auto name = msg->add_ds_names();
197                 name->assign(ds->ds[i].name);
198         }
199
200         return grpc::Status::OK;
201 } /* marshal_value_list */
202
203 static grpc::Status unmarshal_value_list(const collectd::types::ValueList &msg, value_list_t *vl)
204 {
205         vl->time = NS_TO_CDTIME_T(TimeUtil::TimestampToNanoseconds(msg.time()));
206         vl->interval = NS_TO_CDTIME_T(TimeUtil::DurationToNanoseconds(msg.interval()));
207
208         auto status = unmarshal_ident(msg.identifier(), vl, true);
209         if (!status.ok())
210                 return status;
211
212         value_t *values = NULL;
213         size_t values_len = 0;
214
215         status = grpc::Status::OK;
216         for (auto v : msg.values()) {
217                 value_t *val = (value_t *)realloc(values, (values_len + 1) * sizeof(*values));
218                 if (!val) {
219                         status = grpc::Status(grpc::StatusCode::RESOURCE_EXHAUSTED,
220                                         grpc::string("failed to allocate values array"));
221                         break;
222                 }
223
224                 values = val;
225                 val = values + values_len;
226                 values_len++;
227
228                 switch (v.value_case()) {
229                 case collectd::types::Value::ValueCase::kCounter:
230                         val->counter = counter_t(v.counter());
231                         break;
232                 case collectd::types::Value::ValueCase::kGauge:
233                         val->gauge = gauge_t(v.gauge());
234                         break;
235                 case collectd::types::Value::ValueCase::kDerive:
236                         val->derive = derive_t(v.derive());
237                         break;
238                 case collectd::types::Value::ValueCase::kAbsolute:
239                         val->absolute = absolute_t(v.absolute());
240                         break;
241                 default:
242                         status = grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
243                                         grpc::string("unknown value type"));
244                         break;
245                 }
246
247                 if (!status.ok())
248                         break;
249         }
250         if (status.ok()) {
251                 vl->values = values;
252                 vl->values_len = values_len;
253         }
254         else if (values) {
255                 free(values);
256         }
257
258         return status;
259 } /* unmarshal_value_list() */
260
261 /*
262  * Collectd service
263  */
264 class CollectdImpl : public collectd::Collectd::Service {
265 public:
266         grpc::Status QueryValues(grpc::ServerContext *ctx, QueryValuesRequest const *req, grpc::ServerWriter<QueryValuesResponse> *writer) override {
267                 value_list_t match;
268                 auto err = unmarshal_ident(req->identifier(), &match, false);
269                 if (!err.ok()) {
270                         return err;
271                 }
272
273                 std::queue<value_list_t> value_lists;
274                 err = this->read(&match, &value_lists);
275                 if (err.ok()) {
276                         err = this->write(ctx, writer, &value_lists);
277                 }
278
279                 while (!value_lists.empty()) {
280                         auto vl = value_lists.front();
281                         value_lists.pop();
282                         sfree(vl.values);
283                 }
284
285                 return err;
286         }
287
288 private:
289         grpc::Status read(value_list_t const *match, std::queue<value_list_t> *value_lists) {
290                 uc_iter_t *iter;
291                 if ((iter = uc_get_iterator()) == NULL) {
292                         return grpc::Status(grpc::StatusCode::INTERNAL,
293                                                                 grpc::string("failed to query values: cannot create iterator"));
294                 }
295
296                 grpc::Status status = grpc::Status::OK;
297                 char *name = NULL;
298                 while (uc_iterator_next(iter, &name) == 0) {
299                         value_list_t vl;
300                         if (parse_identifier_vl(name, &vl) != 0) {
301                                 status = grpc::Status(grpc::StatusCode::INTERNAL,
302                                                                           grpc::string("failed to parse identifier"));
303                                 break;
304                         }
305
306                         if (!ident_matches(&vl, match))
307                                 continue;
308
309                         if (uc_iterator_get_time(iter, &vl.time) < 0) {
310                                 status = grpc::Status(grpc::StatusCode::INTERNAL,
311                                                                           grpc::string("failed to retrieve value timestamp"));
312                                 break;
313                         }
314                         if (uc_iterator_get_interval(iter, &vl.interval) < 0) {
315                                 status = grpc::Status(grpc::StatusCode::INTERNAL,
316                                                                           grpc::string("failed to retrieve value interval"));
317                                 break;
318                         }
319                         if (uc_iterator_get_values(iter, &vl.values, &vl.values_len) < 0) {
320                                 status = grpc::Status(grpc::StatusCode::INTERNAL,
321                                                                           grpc::string("failed to retrieve values"));
322                                 break;
323                         }
324
325                         value_lists->push(vl);
326                 } // while (uc_iterator_next(iter, &name) == 0)
327
328                 uc_iterator_destroy(iter);
329                 return status;
330         }
331
332         grpc::Status write(grpc::ServerContext *ctx,
333                                            grpc::ServerWriter<QueryValuesResponse> *writer,
334                                            std::queue<value_list_t> *value_lists) {
335                 while (!value_lists->empty()) {
336                         auto vl = value_lists->front();
337                         QueryValuesResponse res;
338                         res.Clear();
339
340                         auto err = marshal_value_list(&vl, res.mutable_value_list());
341                         if (!err.ok()) {
342                                 return err;
343                         }
344
345                         if (!writer->Write(res)) {
346                                 return grpc::Status::CANCELLED;
347                         }
348
349                         value_lists->pop();
350                         sfree(vl.values);
351                 }
352
353                 return grpc::Status::OK;
354         }
355 };
356
357 /*
358  * Dispatch service
359  */
360 class DispatchImpl : public collectd::Dispatch::Service {
361 public:
362         grpc::Status DispatchValues(grpc::ServerContext *ctx,
363                                                                 grpc::ServerReader<DispatchValuesRequest> *reader,
364                                                                 DispatchValuesResponse *res) override {
365                 DispatchValuesRequest req;
366
367                 while (reader->Read(&req)) {
368                         value_list_t vl = VALUE_LIST_INIT;
369                         auto status = unmarshal_value_list(req.value_list(), &vl);
370                         if (!status.ok())
371                                 return status;
372
373                         if (plugin_dispatch_values(&vl))
374                                 return grpc::Status(grpc::StatusCode::INTERNAL,
375                                                                         grpc::string("failed to enqueue values for writing"));
376                 }
377
378                 res->Clear();
379                 return grpc::Status::OK;
380         }
381 };
382
383 /*
384  * gRPC server implementation
385  */
386 class CollectdServer final
387 {
388 public:
389         void Start()
390         {
391                 auto auth = grpc::InsecureServerCredentials();
392
393                 grpc::ServerBuilder builder;
394
395                 if (listeners.empty()) {
396                         builder.AddListeningPort(default_addr, auth);
397                         INFO("grpc: Listening on %s", default_addr.c_str());
398                 }
399                 else {
400                         for (auto l : listeners) {
401                                 grpc::string addr = l.addr + ":" + l.port;
402
403                                 auto use_ssl = grpc::string("");
404                                 auto a = auth;
405                                 if (l.ssl != nullptr) {
406                                         use_ssl = grpc::string(" (SSL enabled)");
407                                         a = grpc::SslServerCredentials(*l.ssl);
408                                 }
409
410                                 builder.AddListeningPort(addr, a);
411                                 INFO("grpc: Listening on %s%s", addr.c_str(), use_ssl.c_str());
412                         }
413                 }
414
415                 builder.RegisterService(&collectd_service_);
416                 builder.RegisterService(&dispatch_service_);
417
418                 server_ = builder.BuildAndStart();
419         } /* Start() */
420
421         void Shutdown()
422         {
423                 server_->Shutdown();
424         } /* Shutdown() */
425
426 private:
427         CollectdImpl collectd_service_;
428         DispatchImpl dispatch_service_;
429
430         std::unique_ptr<grpc::Server> server_;
431 }; /* class CollectdServer */
432
433 static CollectdServer *server = nullptr;
434
435 /*
436  * collectd plugin interface
437  */
438 extern "C" {
439         static int c_grpc_config_listen(oconfig_item_t *ci)
440         {
441                 if ((ci->values_num != 2)
442                                 || (ci->values[0].type != OCONFIG_TYPE_STRING)
443                                 || (ci->values[1].type != OCONFIG_TYPE_STRING)) {
444                         ERROR("grpc: The `%s` config option needs exactly "
445                                         "two string argument (address and port).", ci->key);
446                         return -1;
447                 }
448
449                 auto listener = Listener();
450                 listener.addr = grpc::string(ci->values[0].value.string);
451                 listener.port = grpc::string(ci->values[1].value.string);
452                 listener.ssl = nullptr;
453
454                 auto ssl_opts = new(grpc::SslServerCredentialsOptions);
455                 grpc::SslServerCredentialsOptions::PemKeyCertPair pkcp = {};
456                 bool use_ssl = false;
457
458                 for (int i = 0; i < ci->children_num; i++) {
459                         oconfig_item_t *child = ci->children + i;
460
461                         if (!strcasecmp("EnableSSL", child->key)) {
462                                 if (cf_util_get_boolean(child, &use_ssl)) {
463                                         ERROR("grpc: Option `%s` expects a boolean value",
464                                                         child->key);
465                                         return -1;
466                                 }
467                         }
468                         else if (!strcasecmp("SSLRootCerts", child->key)) {
469                                 char *certs = NULL;
470                                 if (cf_util_get_string(child, &certs)) {
471                                         ERROR("grpc: Option `%s` expects a string value",
472                                                         child->key);
473                                         return -1;
474                                 }
475                                 ssl_opts->pem_root_certs = read_file(certs);
476                         }
477                         else if (!strcasecmp("SSLServerKey", child->key)) {
478                                 char *key = NULL;
479                                 if (cf_util_get_string(child, &key)) {
480                                         ERROR("grpc: Option `%s` expects a string value",
481                                                         child->key);
482                                         return -1;
483                                 }
484                                 pkcp.private_key = read_file(key);
485                         }
486                         else if (!strcasecmp("SSLServerCert", child->key)) {
487                                 char *cert = NULL;
488                                 if (cf_util_get_string(child, &cert)) {
489                                         ERROR("grpc: Option `%s` expects a string value",
490                                                         child->key);
491                                         return -1;
492                                 }
493                                 pkcp.cert_chain = read_file(cert);
494                         }
495                         else {
496                                 WARNING("grpc: Option `%s` not allowed in <%s> block.",
497                                                 child->key, ci->key);
498                         }
499                 }
500
501                 ssl_opts->pem_key_cert_pairs.push_back(pkcp);
502                 if (use_ssl)
503                         listener.ssl = ssl_opts;
504                 else
505                         delete(ssl_opts);
506
507                 listeners.push_back(listener);
508                 return 0;
509         } /* c_grpc_config_listen() */
510
511         static int c_grpc_config(oconfig_item_t *ci)
512         {
513                 int i;
514
515                 for (i = 0; i < ci->children_num; i++) {
516                         oconfig_item_t *child = ci->children + i;
517
518                         if (!strcasecmp("Listen", child->key)) {
519                                 if (c_grpc_config_listen(child))
520                                         return -1;
521                         }
522                         else {
523                                 WARNING("grpc: Option `%s` not allowed here.", child->key);
524                         }
525                 }
526
527                 return 0;
528         } /* c_grpc_config() */
529
530         static int c_grpc_init(void)
531         {
532                 server = new CollectdServer();
533                 if (!server) {
534                         ERROR("grpc: Failed to create server");
535                         return -1;
536                 }
537
538                 server->Start();
539                 return 0;
540         } /* c_grpc_init() */
541
542         static int c_grpc_shutdown(void)
543         {
544                 if (!server)
545                         return 0;
546
547                 server->Shutdown();
548
549                 delete server;
550                 server = nullptr;
551
552                 return 0;
553         } /* c_grpc_shutdown() */
554
555         void module_register(void)
556         {
557                 plugin_register_complex_config("grpc", c_grpc_config);
558                 plugin_register_init("grpc", c_grpc_init);
559                 plugin_register_shutdown("grpc", c_grpc_shutdown);
560         } /* module_register() */
561 } /* extern "C" */
562
563 /* vim: set sw=4 ts=4 tw=78 noexpandtab : */