Merge pull request #1876 from octo/issue/1819
[collectd.git] / src / grpc.cc
1 /**
2  * collectd - src/grpc.cc
3  * Copyright (C) 2015-2016 Sebastian Harl
4  *
5  * Permission is hereby granted, free of charge, to any person obtaining a
6  * copy of this software and associated documentation files (the "Software"),
7  * to deal in the Software without restriction, including without limitation
8  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
9  * and/or sell copies of the Software, and to permit persons to whom the
10  * Software is furnished to do so, subject to the following conditions:
11  *
12  * The above copyright notice and this permission notice shall be included in
13  * all copies or substantial portions of the Software.
14  *
15  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
21  * DEALINGS IN THE SOFTWARE.
22  *
23  * Authors:
24  *   Sebastian Harl <sh at tokkee.org>
25  **/
26
27 #include <grpc++/grpc++.h>
28 #include <google/protobuf/util/time_util.h>
29
30 #include <fstream>
31 #include <iostream>
32 #include <queue>
33 #include <vector>
34
35 #include "collectd.grpc.pb.h"
36
37 extern "C" {
38 #include <fnmatch.h>
39 #include <stdbool.h>
40
41 #include "collectd.h"
42 #include "common.h"
43 #include "configfile.h"
44 #include "plugin.h"
45
46 #include "daemon/utils_cache.h"
47 }
48
49 using collectd::Collectd;
50
51 using collectd::DispatchValuesRequest;
52 using collectd::DispatchValuesResponse;
53 using collectd::QueryValuesRequest;
54 using collectd::QueryValuesResponse;
55
56 using google::protobuf::util::TimeUtil;
57
58 /*
59  * private types
60  */
61
62 struct Listener {
63         grpc::string addr;
64         grpc::string port;
65
66         grpc::SslServerCredentialsOptions *ssl;
67 };
68 static std::vector<Listener> listeners;
69 static grpc::string default_addr("0.0.0.0:50051");
70
71 /*
72  * helper functions
73  */
74
75 static bool ident_matches(const value_list_t *vl, const value_list_t *matcher)
76 {
77         if (fnmatch(matcher->host, vl->host, 0))
78                 return false;
79
80         if (fnmatch(matcher->plugin, vl->plugin, 0))
81                 return false;
82         if (fnmatch(matcher->plugin_instance, vl->plugin_instance, 0))
83                 return false;
84
85         if (fnmatch(matcher->type, vl->type, 0))
86                 return false;
87         if (fnmatch(matcher->type_instance, vl->type_instance, 0))
88                 return false;
89
90         return true;
91 } /* ident_matches */
92
93 static grpc::string read_file(const char *filename)
94 {
95         std::ifstream f;
96         grpc::string s, content;
97
98         f.open(filename);
99         if (!f.is_open()) {
100                 ERROR("grpc: Failed to open '%s'", filename);
101                 return "";
102         }
103
104         while (std::getline(f, s)) {
105                 content += s;
106                 content.push_back('\n');
107         }
108         f.close();
109         return content;
110 } /* read_file */
111
112 /*
113  * proto conversion
114  */
115
116 static void marshal_ident(const value_list_t *vl, collectd::types::Identifier *msg)
117 {
118         msg->set_host(vl->host);
119         msg->set_plugin(vl->plugin);
120         if (vl->plugin_instance[0] != '\0')
121                 msg->set_plugin_instance(vl->plugin_instance);
122         msg->set_type(vl->type);
123         if (vl->type_instance[0] != '\0')
124                 msg->set_type_instance(vl->type_instance);
125 } /* marshal_ident */
126
127 static grpc::Status unmarshal_ident(const collectd::types::Identifier &msg, value_list_t *vl,
128                 bool require_fields)
129 {
130         std::string s;
131
132         s = msg.host();
133         if (!s.length() && require_fields)
134                 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
135                                 grpc::string("missing host name"));
136         sstrncpy(vl->host, s.c_str(), sizeof(vl->host));
137
138         s = msg.plugin();
139         if (!s.length() && require_fields)
140                 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
141                                 grpc::string("missing plugin name"));
142         sstrncpy(vl->plugin, s.c_str(), sizeof(vl->plugin));
143
144         s = msg.type();
145         if (!s.length() && require_fields)
146                 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
147                                 grpc::string("missing type name"));
148         sstrncpy(vl->type, s.c_str(), sizeof(vl->type));
149
150         s = msg.plugin_instance();
151         sstrncpy(vl->plugin_instance, s.c_str(), sizeof(vl->plugin_instance));
152
153         s = msg.type_instance();
154         sstrncpy(vl->type_instance, s.c_str(), sizeof(vl->type_instance));
155
156         return grpc::Status::OK;
157 } /* unmarshal_ident() */
158
159 static grpc::Status marshal_value_list(const value_list_t *vl, collectd::types::ValueList *msg)
160 {
161         auto id = msg->mutable_identifier();
162         marshal_ident(vl, id);
163
164         auto ds = plugin_get_ds(vl->type);
165         if ((ds == NULL) || (ds->ds_num != vl->values_len)) {
166                 return grpc::Status(grpc::StatusCode::INTERNAL,
167                                 grpc::string("failed to retrieve data-set for values"));
168         }
169
170         auto t = TimeUtil::NanosecondsToTimestamp(CDTIME_T_TO_NS(vl->time));
171         auto d = TimeUtil::NanosecondsToDuration(CDTIME_T_TO_NS(vl->interval));
172         msg->set_allocated_time(new google::protobuf::Timestamp(t));
173         msg->set_allocated_interval(new google::protobuf::Duration(d));
174
175         for (size_t i = 0; i < vl->values_len; ++i) {
176                 auto v = msg->add_values();
177                 switch (ds->ds[i].type) {
178                         case DS_TYPE_COUNTER:
179                                 v->set_counter(vl->values[i].counter);
180                                 break;
181                         case DS_TYPE_GAUGE:
182                                 v->set_gauge(vl->values[i].gauge);
183                                 break;
184                         case DS_TYPE_DERIVE:
185                                 v->set_derive(vl->values[i].derive);
186                                 break;
187                         case DS_TYPE_ABSOLUTE:
188                                 v->set_absolute(vl->values[i].absolute);
189                                 break;
190                         default:
191                                 return grpc::Status(grpc::StatusCode::INTERNAL,
192                                                 grpc::string("unknown value type"));
193                 }
194
195                 auto name = msg->add_ds_names();
196                 name->assign(ds->ds[i].name);
197         }
198
199         return grpc::Status::OK;
200 } /* marshal_value_list */
201
202 static grpc::Status unmarshal_value_list(const collectd::types::ValueList &msg, value_list_t *vl)
203 {
204         vl->time = NS_TO_CDTIME_T(TimeUtil::TimestampToNanoseconds(msg.time()));
205         vl->interval = NS_TO_CDTIME_T(TimeUtil::DurationToNanoseconds(msg.interval()));
206
207         auto status = unmarshal_ident(msg.identifier(), vl, true);
208         if (!status.ok())
209                 return status;
210
211         value_t *values = NULL;
212         size_t values_len = 0;
213
214         status = grpc::Status::OK;
215         for (auto v : msg.values()) {
216                 value_t *val = (value_t *)realloc(values, (values_len + 1) * sizeof(*values));
217                 if (!val) {
218                         status = grpc::Status(grpc::StatusCode::RESOURCE_EXHAUSTED,
219                                         grpc::string("failed to allocate values array"));
220                         break;
221                 }
222
223                 values = val;
224                 val = values + values_len;
225                 values_len++;
226
227                 switch (v.value_case()) {
228                 case collectd::types::Value::ValueCase::kCounter:
229                         val->counter = counter_t(v.counter());
230                         break;
231                 case collectd::types::Value::ValueCase::kGauge:
232                         val->gauge = gauge_t(v.gauge());
233                         break;
234                 case collectd::types::Value::ValueCase::kDerive:
235                         val->derive = derive_t(v.derive());
236                         break;
237                 case collectd::types::Value::ValueCase::kAbsolute:
238                         val->absolute = absolute_t(v.absolute());
239                         break;
240                 default:
241                         status = grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
242                                         grpc::string("unknown value type"));
243                         break;
244                 }
245
246                 if (!status.ok())
247                         break;
248         }
249         if (status.ok()) {
250                 vl->values = values;
251                 vl->values_len = values_len;
252         }
253         else if (values) {
254                 free(values);
255         }
256
257         return status;
258 } /* unmarshal_value_list() */
259
260 /*
261  * Collectd service
262  */
263 class CollectdImpl : public collectd::Collectd::Service {
264 public:
265         grpc::Status QueryValues(grpc::ServerContext *ctx, QueryValuesRequest const *req, grpc::ServerWriter<QueryValuesResponse> *writer) override {
266                 value_list_t match;
267                 auto status = unmarshal_ident(req->identifier(), &match, false);
268                 if (!status.ok()) {
269                         return status;
270                 }
271
272                 std::queue<value_list_t> value_lists;
273                 status = this->queryValuesRead(&match, &value_lists);
274                 if (status.ok()) {
275                         status = this->queryValuesWrite(ctx, writer, &value_lists);
276                 }
277
278                 while (!value_lists.empty()) {
279                         auto vl = value_lists.front();
280                         value_lists.pop();
281                         sfree(vl.values);
282                 }
283
284                 return status;
285         }
286
287         grpc::Status DispatchValues(grpc::ServerContext *ctx,
288                                                                 grpc::ServerReader<DispatchValuesRequest> *reader,
289                                                                 DispatchValuesResponse *res) override {
290                 DispatchValuesRequest req;
291
292                 while (reader->Read(&req)) {
293                         value_list_t vl = VALUE_LIST_INIT;
294                         auto status = unmarshal_value_list(req.value_list(), &vl);
295                         if (!status.ok())
296                                 return status;
297
298                         if (plugin_dispatch_values(&vl))
299                                 return grpc::Status(grpc::StatusCode::INTERNAL,
300                                                                         grpc::string("failed to enqueue values for writing"));
301                 }
302
303                 res->Clear();
304                 return grpc::Status::OK;
305         }
306
307 private:
308         grpc::Status queryValuesRead(value_list_t const *match, std::queue<value_list_t> *value_lists) {
309                 uc_iter_t *iter;
310                 if ((iter = uc_get_iterator()) == NULL) {
311                         return grpc::Status(grpc::StatusCode::INTERNAL,
312                                                                 grpc::string("failed to query values: cannot create iterator"));
313                 }
314
315                 grpc::Status status = grpc::Status::OK;
316                 char *name = NULL;
317                 while (uc_iterator_next(iter, &name) == 0) {
318                         value_list_t vl;
319                         if (parse_identifier_vl(name, &vl) != 0) {
320                                 status = grpc::Status(grpc::StatusCode::INTERNAL,
321                                                                           grpc::string("failed to parse identifier"));
322                                 break;
323                         }
324
325                         if (!ident_matches(&vl, match))
326                                 continue;
327
328                         if (uc_iterator_get_time(iter, &vl.time) < 0) {
329                                 status = grpc::Status(grpc::StatusCode::INTERNAL,
330                                                                           grpc::string("failed to retrieve value timestamp"));
331                                 break;
332                         }
333                         if (uc_iterator_get_interval(iter, &vl.interval) < 0) {
334                                 status = grpc::Status(grpc::StatusCode::INTERNAL,
335                                                                           grpc::string("failed to retrieve value interval"));
336                                 break;
337                         }
338                         if (uc_iterator_get_values(iter, &vl.values, &vl.values_len) < 0) {
339                                 status = grpc::Status(grpc::StatusCode::INTERNAL,
340                                                                           grpc::string("failed to retrieve values"));
341                                 break;
342                         }
343
344                         value_lists->push(vl);
345                 } // while (uc_iterator_next(iter, &name) == 0)
346
347                 uc_iterator_destroy(iter);
348                 return status;
349         }
350
351         grpc::Status queryValuesWrite(grpc::ServerContext *ctx,
352                                            grpc::ServerWriter<QueryValuesResponse> *writer,
353                                            std::queue<value_list_t> *value_lists) {
354                 while (!value_lists->empty()) {
355                         auto vl = value_lists->front();
356                         QueryValuesResponse res;
357                         res.Clear();
358
359                         auto status = marshal_value_list(&vl, res.mutable_value_list());
360                         if (!status.ok()) {
361                                 return status;
362                         }
363
364                         if (!writer->Write(res)) {
365                                 return grpc::Status::CANCELLED;
366                         }
367
368                         value_lists->pop();
369                         sfree(vl.values);
370                 }
371
372                 return grpc::Status::OK;
373         }
374 };
375
376 /*
377  * gRPC server implementation
378  */
379 class CollectdServer final
380 {
381 public:
382         void Start()
383         {
384                 auto auth = grpc::InsecureServerCredentials();
385
386                 grpc::ServerBuilder builder;
387
388                 if (listeners.empty()) {
389                         builder.AddListeningPort(default_addr, auth);
390                         INFO("grpc: Listening on %s", default_addr.c_str());
391                 }
392                 else {
393                         for (auto l : listeners) {
394                                 grpc::string addr = l.addr + ":" + l.port;
395
396                                 auto use_ssl = grpc::string("");
397                                 auto a = auth;
398                                 if (l.ssl != nullptr) {
399                                         use_ssl = grpc::string(" (SSL enabled)");
400                                         a = grpc::SslServerCredentials(*l.ssl);
401                                 }
402
403                                 builder.AddListeningPort(addr, a);
404                                 INFO("grpc: Listening on %s%s", addr.c_str(), use_ssl.c_str());
405                         }
406                 }
407
408                 builder.RegisterService(&collectd_service_);
409
410                 server_ = builder.BuildAndStart();
411         } /* Start() */
412
413         void Shutdown()
414         {
415                 server_->Shutdown();
416         } /* Shutdown() */
417
418 private:
419         CollectdImpl collectd_service_;
420
421         std::unique_ptr<grpc::Server> server_;
422 }; /* class CollectdServer */
423
424 static CollectdServer *server = nullptr;
425
426 /*
427  * collectd plugin interface
428  */
429 extern "C" {
430         static int c_grpc_config_listen(oconfig_item_t *ci)
431         {
432                 if ((ci->values_num != 2)
433                                 || (ci->values[0].type != OCONFIG_TYPE_STRING)
434                                 || (ci->values[1].type != OCONFIG_TYPE_STRING)) {
435                         ERROR("grpc: The `%s` config option needs exactly "
436                                         "two string argument (address and port).", ci->key);
437                         return -1;
438                 }
439
440                 auto listener = Listener();
441                 listener.addr = grpc::string(ci->values[0].value.string);
442                 listener.port = grpc::string(ci->values[1].value.string);
443                 listener.ssl = nullptr;
444
445                 auto ssl_opts = new(grpc::SslServerCredentialsOptions);
446                 grpc::SslServerCredentialsOptions::PemKeyCertPair pkcp = {};
447                 bool use_ssl = false;
448
449                 for (int i = 0; i < ci->children_num; i++) {
450                         oconfig_item_t *child = ci->children + i;
451
452                         if (!strcasecmp("EnableSSL", child->key)) {
453                                 if (cf_util_get_boolean(child, &use_ssl)) {
454                                         ERROR("grpc: Option `%s` expects a boolean value",
455                                                         child->key);
456                                         return -1;
457                                 }
458                         }
459                         else if (!strcasecmp("SSLRootCerts", child->key)) {
460                                 char *certs = NULL;
461                                 if (cf_util_get_string(child, &certs)) {
462                                         ERROR("grpc: Option `%s` expects a string value",
463                                                         child->key);
464                                         return -1;
465                                 }
466                                 ssl_opts->pem_root_certs = read_file(certs);
467                         }
468                         else if (!strcasecmp("SSLServerKey", child->key)) {
469                                 char *key = NULL;
470                                 if (cf_util_get_string(child, &key)) {
471                                         ERROR("grpc: Option `%s` expects a string value",
472                                                         child->key);
473                                         return -1;
474                                 }
475                                 pkcp.private_key = read_file(key);
476                         }
477                         else if (!strcasecmp("SSLServerCert", child->key)) {
478                                 char *cert = NULL;
479                                 if (cf_util_get_string(child, &cert)) {
480                                         ERROR("grpc: Option `%s` expects a string value",
481                                                         child->key);
482                                         return -1;
483                                 }
484                                 pkcp.cert_chain = read_file(cert);
485                         }
486                         else {
487                                 WARNING("grpc: Option `%s` not allowed in <%s> block.",
488                                                 child->key, ci->key);
489                         }
490                 }
491
492                 ssl_opts->pem_key_cert_pairs.push_back(pkcp);
493                 if (use_ssl)
494                         listener.ssl = ssl_opts;
495                 else
496                         delete(ssl_opts);
497
498                 listeners.push_back(listener);
499                 return 0;
500         } /* c_grpc_config_listen() */
501
502         static int c_grpc_config(oconfig_item_t *ci)
503         {
504                 int i;
505
506                 for (i = 0; i < ci->children_num; i++) {
507                         oconfig_item_t *child = ci->children + i;
508
509                         if (!strcasecmp("Listen", child->key)) {
510                                 if (c_grpc_config_listen(child))
511                                         return -1;
512                         }
513                         else {
514                                 WARNING("grpc: Option `%s` not allowed here.", child->key);
515                         }
516                 }
517
518                 return 0;
519         } /* c_grpc_config() */
520
521         static int c_grpc_init(void)
522         {
523                 server = new CollectdServer();
524                 if (!server) {
525                         ERROR("grpc: Failed to create server");
526                         return -1;
527                 }
528
529                 server->Start();
530                 return 0;
531         } /* c_grpc_init() */
532
533         static int c_grpc_shutdown(void)
534         {
535                 if (!server)
536                         return 0;
537
538                 server->Shutdown();
539
540                 delete server;
541                 server = nullptr;
542
543                 return 0;
544         } /* c_grpc_shutdown() */
545
546         void module_register(void)
547         {
548                 plugin_register_complex_config("grpc", c_grpc_config);
549                 plugin_register_init("grpc", c_grpc_init);
550                 plugin_register_shutdown("grpc", c_grpc_shutdown);
551         } /* module_register() */
552 } /* extern "C" */
553
554 /* vim: set sw=4 ts=4 tw=78 noexpandtab : */