Merge pull request #2088 from landryb/fix/2061
[collectd.git] / src / grpc.cc
1 /**
2  * collectd - src/grpc.cc
3  * Copyright (C) 2015-2016 Sebastian Harl
4  * Copyright (C) 2016      Florian octo Forster
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining a
7  * copy of this software and associated documentation files (the "Software"),
8  * to deal in the Software without restriction, including without limitation
9  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
10  * and/or sell copies of the Software, and to permit persons to whom the
11  * Software is furnished to do so, subject to the following conditions:
12  *
13  * The above copyright notice and this permission notice shall be included in
14  * all copies or substantial portions of the Software.
15  *
16  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
22  * DEALINGS IN THE SOFTWARE.
23  *
24  * Authors:
25  *   Sebastian Harl <sh at tokkee.org>
26  *   Florian octo Forster <octo at collectd.org>
27  **/
28
29 #include <google/protobuf/util/time_util.h>
30 #include <grpc++/grpc++.h>
31
32 #include <fstream>
33 #include <iostream>
34 #include <queue>
35 #include <vector>
36
37 #include "collectd.grpc.pb.h"
38
39 extern "C" {
40 #include <fnmatch.h>
41 #include <stdbool.h>
42
43 #include "collectd.h"
44 #include "common.h"
45 #include "plugin.h"
46
47 #include "daemon/utils_cache.h"
48 }
49
50 using collectd::Collectd;
51
52 using collectd::PutValuesRequest;
53 using collectd::PutValuesResponse;
54 using collectd::QueryValuesRequest;
55 using collectd::QueryValuesResponse;
56
57 using google::protobuf::util::TimeUtil;
58
59 /*
60  * private types
61  */
62
63 struct Listener {
64   grpc::string addr;
65   grpc::string port;
66
67   grpc::SslServerCredentialsOptions *ssl;
68 };
69 static std::vector<Listener> listeners;
70 static grpc::string default_addr("0.0.0.0:50051");
71
72 /*
73  * helper functions
74  */
75
76 static bool ident_matches(const value_list_t *vl, const value_list_t *matcher) {
77   if (fnmatch(matcher->host, vl->host, 0))
78     return false;
79
80   if (fnmatch(matcher->plugin, vl->plugin, 0))
81     return false;
82   if (fnmatch(matcher->plugin_instance, vl->plugin_instance, 0))
83     return false;
84
85   if (fnmatch(matcher->type, vl->type, 0))
86     return false;
87   if (fnmatch(matcher->type_instance, vl->type_instance, 0))
88     return false;
89
90   return true;
91 } /* ident_matches */
92
93 static grpc::string read_file(const char *filename) {
94   std::ifstream f;
95   grpc::string s, content;
96
97   f.open(filename);
98   if (!f.is_open()) {
99     ERROR("grpc: Failed to open '%s'", filename);
100     return "";
101   }
102
103   while (std::getline(f, s)) {
104     content += s;
105     content.push_back('\n');
106   }
107   f.close();
108   return content;
109 } /* read_file */
110
111 /*
112  * proto conversion
113  */
114
115 static void marshal_ident(const value_list_t *vl,
116                           collectd::types::Identifier *msg) {
117   msg->set_host(vl->host);
118   msg->set_plugin(vl->plugin);
119   if (vl->plugin_instance[0] != '\0')
120     msg->set_plugin_instance(vl->plugin_instance);
121   msg->set_type(vl->type);
122   if (vl->type_instance[0] != '\0')
123     msg->set_type_instance(vl->type_instance);
124 } /* marshal_ident */
125
126 static grpc::Status unmarshal_ident(const collectd::types::Identifier &msg,
127                                     value_list_t *vl, bool require_fields) {
128   std::string s;
129
130   s = msg.host();
131   if (!s.length() && require_fields)
132     return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
133                         grpc::string("missing host name"));
134   sstrncpy(vl->host, s.c_str(), sizeof(vl->host));
135
136   s = msg.plugin();
137   if (!s.length() && require_fields)
138     return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
139                         grpc::string("missing plugin name"));
140   sstrncpy(vl->plugin, s.c_str(), sizeof(vl->plugin));
141
142   s = msg.type();
143   if (!s.length() && require_fields)
144     return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
145                         grpc::string("missing type name"));
146   sstrncpy(vl->type, s.c_str(), sizeof(vl->type));
147
148   s = msg.plugin_instance();
149   sstrncpy(vl->plugin_instance, s.c_str(), sizeof(vl->plugin_instance));
150
151   s = msg.type_instance();
152   sstrncpy(vl->type_instance, s.c_str(), sizeof(vl->type_instance));
153
154   return grpc::Status::OK;
155 } /* unmarshal_ident() */
156
157 static grpc::Status marshal_value_list(const value_list_t *vl,
158                                        collectd::types::ValueList *msg) {
159   auto id = msg->mutable_identifier();
160   marshal_ident(vl, id);
161
162   auto ds = plugin_get_ds(vl->type);
163   if ((ds == NULL) || (ds->ds_num != vl->values_len)) {
164     return grpc::Status(grpc::StatusCode::INTERNAL,
165                         grpc::string("failed to retrieve data-set for values"));
166   }
167
168   auto t = TimeUtil::NanosecondsToTimestamp(CDTIME_T_TO_NS(vl->time));
169   auto d = TimeUtil::NanosecondsToDuration(CDTIME_T_TO_NS(vl->interval));
170   msg->set_allocated_time(new google::protobuf::Timestamp(t));
171   msg->set_allocated_interval(new google::protobuf::Duration(d));
172
173   for (size_t i = 0; i < vl->values_len; ++i) {
174     auto v = msg->add_values();
175     switch (ds->ds[i].type) {
176     case DS_TYPE_COUNTER:
177       v->set_counter(vl->values[i].counter);
178       break;
179     case DS_TYPE_GAUGE:
180       v->set_gauge(vl->values[i].gauge);
181       break;
182     case DS_TYPE_DERIVE:
183       v->set_derive(vl->values[i].derive);
184       break;
185     case DS_TYPE_ABSOLUTE:
186       v->set_absolute(vl->values[i].absolute);
187       break;
188     default:
189       return grpc::Status(grpc::StatusCode::INTERNAL,
190                           grpc::string("unknown value type"));
191     }
192
193     auto name = msg->add_ds_names();
194     name->assign(ds->ds[i].name);
195   }
196
197   return grpc::Status::OK;
198 } /* marshal_value_list */
199
200 static grpc::Status unmarshal_value_list(const collectd::types::ValueList &msg,
201                                          value_list_t *vl) {
202   vl->time = NS_TO_CDTIME_T(TimeUtil::TimestampToNanoseconds(msg.time()));
203   vl->interval =
204       NS_TO_CDTIME_T(TimeUtil::DurationToNanoseconds(msg.interval()));
205
206   auto status = unmarshal_ident(msg.identifier(), vl, true);
207   if (!status.ok())
208     return status;
209
210   value_t *values = NULL;
211   size_t values_len = 0;
212
213   status = grpc::Status::OK;
214   for (auto v : msg.values()) {
215     value_t *val =
216         (value_t *)realloc(values, (values_len + 1) * sizeof(*values));
217     if (!val) {
218       status = grpc::Status(grpc::StatusCode::RESOURCE_EXHAUSTED,
219                             grpc::string("failed to allocate values array"));
220       break;
221     }
222
223     values = val;
224     val = values + values_len;
225     values_len++;
226
227     switch (v.value_case()) {
228     case collectd::types::Value::ValueCase::kCounter:
229       val->counter = counter_t(v.counter());
230       break;
231     case collectd::types::Value::ValueCase::kGauge:
232       val->gauge = gauge_t(v.gauge());
233       break;
234     case collectd::types::Value::ValueCase::kDerive:
235       val->derive = derive_t(v.derive());
236       break;
237     case collectd::types::Value::ValueCase::kAbsolute:
238       val->absolute = absolute_t(v.absolute());
239       break;
240     default:
241       status = grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
242                             grpc::string("unknown value type"));
243       break;
244     }
245
246     if (!status.ok())
247       break;
248   }
249   if (status.ok()) {
250     vl->values = values;
251     vl->values_len = values_len;
252   } else if (values) {
253     free(values);
254   }
255
256   return status;
257 } /* unmarshal_value_list() */
258
259 /*
260  * Collectd service
261  */
262 class CollectdImpl : public collectd::Collectd::Service {
263 public:
264   grpc::Status
265   QueryValues(grpc::ServerContext *ctx, QueryValuesRequest const *req,
266               grpc::ServerWriter<QueryValuesResponse> *writer) override {
267     value_list_t match;
268     auto status = unmarshal_ident(req->identifier(), &match, false);
269     if (!status.ok()) {
270       return status;
271     }
272
273     std::queue<value_list_t> value_lists;
274     status = this->queryValuesRead(&match, &value_lists);
275     if (status.ok()) {
276       status = this->queryValuesWrite(ctx, writer, &value_lists);
277     }
278
279     while (!value_lists.empty()) {
280       auto vl = value_lists.front();
281       value_lists.pop();
282       sfree(vl.values);
283     }
284
285     return status;
286   }
287
288   grpc::Status PutValues(grpc::ServerContext *ctx,
289                          grpc::ServerReader<PutValuesRequest> *reader,
290                          PutValuesResponse *res) override {
291     PutValuesRequest req;
292
293     while (reader->Read(&req)) {
294       value_list_t vl = {0};
295       auto status = unmarshal_value_list(req.value_list(), &vl);
296       if (!status.ok())
297         return status;
298
299       if (plugin_dispatch_values(&vl))
300         return grpc::Status(
301             grpc::StatusCode::INTERNAL,
302             grpc::string("failed to enqueue values for writing"));
303     }
304
305     res->Clear();
306     return grpc::Status::OK;
307   }
308
309 private:
310   grpc::Status queryValuesRead(value_list_t const *match,
311                                std::queue<value_list_t> *value_lists) {
312     uc_iter_t *iter;
313     if ((iter = uc_get_iterator()) == NULL) {
314       return grpc::Status(
315           grpc::StatusCode::INTERNAL,
316           grpc::string("failed to query values: cannot create iterator"));
317     }
318
319     grpc::Status status = grpc::Status::OK;
320     char *name = NULL;
321     while (uc_iterator_next(iter, &name) == 0) {
322       value_list_t vl;
323       if (parse_identifier_vl(name, &vl) != 0) {
324         status = grpc::Status(grpc::StatusCode::INTERNAL,
325                               grpc::string("failed to parse identifier"));
326         break;
327       }
328
329       if (!ident_matches(&vl, match))
330         continue;
331
332       if (uc_iterator_get_time(iter, &vl.time) < 0) {
333         status =
334             grpc::Status(grpc::StatusCode::INTERNAL,
335                          grpc::string("failed to retrieve value timestamp"));
336         break;
337       }
338       if (uc_iterator_get_interval(iter, &vl.interval) < 0) {
339         status =
340             grpc::Status(grpc::StatusCode::INTERNAL,
341                          grpc::string("failed to retrieve value interval"));
342         break;
343       }
344       if (uc_iterator_get_values(iter, &vl.values, &vl.values_len) < 0) {
345         status = grpc::Status(grpc::StatusCode::INTERNAL,
346                               grpc::string("failed to retrieve values"));
347         break;
348       }
349
350       value_lists->push(vl);
351     } // while (uc_iterator_next(iter, &name) == 0)
352
353     uc_iterator_destroy(iter);
354     return status;
355   }
356
357   grpc::Status queryValuesWrite(grpc::ServerContext *ctx,
358                                 grpc::ServerWriter<QueryValuesResponse> *writer,
359                                 std::queue<value_list_t> *value_lists) {
360     while (!value_lists->empty()) {
361       auto vl = value_lists->front();
362       QueryValuesResponse res;
363       res.Clear();
364
365       auto status = marshal_value_list(&vl, res.mutable_value_list());
366       if (!status.ok()) {
367         return status;
368       }
369
370       if (!writer->Write(res)) {
371         return grpc::Status::CANCELLED;
372       }
373
374       value_lists->pop();
375       sfree(vl.values);
376     }
377
378     return grpc::Status::OK;
379   }
380 };
381
382 /*
383  * gRPC server implementation
384  */
385 class CollectdServer final {
386 public:
387   void Start() {
388     auto auth = grpc::InsecureServerCredentials();
389
390     grpc::ServerBuilder builder;
391
392     if (listeners.empty()) {
393       builder.AddListeningPort(default_addr, auth);
394       INFO("grpc: Listening on %s", default_addr.c_str());
395     } else {
396       for (auto l : listeners) {
397         grpc::string addr = l.addr + ":" + l.port;
398
399         auto use_ssl = grpc::string("");
400         auto a = auth;
401         if (l.ssl != nullptr) {
402           use_ssl = grpc::string(" (SSL enabled)");
403           a = grpc::SslServerCredentials(*l.ssl);
404         }
405
406         builder.AddListeningPort(addr, a);
407         INFO("grpc: Listening on %s%s", addr.c_str(), use_ssl.c_str());
408       }
409     }
410
411     builder.RegisterService(&collectd_service_);
412
413     server_ = builder.BuildAndStart();
414   } /* Start() */
415
416   void Shutdown() { server_->Shutdown(); } /* Shutdown() */
417
418 private:
419   CollectdImpl collectd_service_;
420
421   std::unique_ptr<grpc::Server> server_;
422 }; /* class CollectdServer */
423
424 class CollectdClient final {
425 public:
426   CollectdClient(std::shared_ptr<grpc::ChannelInterface> channel)
427       : stub_(Collectd::NewStub(channel)) {}
428
429   int PutValues(value_list_t const *vl) {
430     grpc::ClientContext ctx;
431
432     PutValuesRequest req;
433     auto status = marshal_value_list(vl, req.mutable_value_list());
434     if (!status.ok()) {
435       ERROR("grpc: Marshalling value_list_t failed.");
436       return -1;
437     }
438
439     PutValuesResponse res;
440     auto stream = stub_->PutValues(&ctx, &res);
441     if (!stream->Write(req)) {
442       NOTICE("grpc: Broken stream.");
443       /* intentionally not returning. */
444     }
445
446     stream->WritesDone();
447     status = stream->Finish();
448     if (!status.ok()) {
449       ERROR("grpc: Error while closing stream.");
450       return -1;
451     }
452
453     return 0;
454   } /* int PutValues */
455
456 private:
457   std::unique_ptr<Collectd::Stub> stub_;
458 };
459
460 static CollectdServer *server = nullptr;
461
462 /*
463  * collectd plugin interface
464  */
465 extern "C" {
466 static void c_grpc_destroy_write_callback(void *ptr) {
467   delete (CollectdClient *)ptr;
468 }
469
470 static int c_grpc_write(__attribute__((unused)) data_set_t const *ds,
471                         value_list_t const *vl, user_data_t *ud) {
472   CollectdClient *c = (CollectdClient *)ud->data;
473   return c->PutValues(vl);
474 }
475
476 static int c_grpc_config_listen(oconfig_item_t *ci) {
477   if ((ci->values_num != 2) || (ci->values[0].type != OCONFIG_TYPE_STRING) ||
478       (ci->values[1].type != OCONFIG_TYPE_STRING)) {
479     ERROR("grpc: The `%s` config option needs exactly "
480           "two string argument (address and port).",
481           ci->key);
482     return -1;
483   }
484
485   auto listener = Listener();
486   listener.addr = grpc::string(ci->values[0].value.string);
487   listener.port = grpc::string(ci->values[1].value.string);
488   listener.ssl = nullptr;
489
490   auto ssl_opts = new (grpc::SslServerCredentialsOptions);
491   grpc::SslServerCredentialsOptions::PemKeyCertPair pkcp = {};
492   bool use_ssl = false;
493
494   for (int i = 0; i < ci->children_num; i++) {
495     oconfig_item_t *child = ci->children + i;
496
497     if (!strcasecmp("EnableSSL", child->key)) {
498       if (cf_util_get_boolean(child, &use_ssl)) {
499         ERROR("grpc: Option `%s` expects a boolean value", child->key);
500         return -1;
501       }
502     } else if (!strcasecmp("SSLCACertificateFile", child->key)) {
503       char *certs = NULL;
504       if (cf_util_get_string(child, &certs)) {
505         ERROR("grpc: Option `%s` expects a string value", child->key);
506         return -1;
507       }
508       ssl_opts->pem_root_certs = read_file(certs);
509     } else if (!strcasecmp("SSLCertificateKeyFile", child->key)) {
510       char *key = NULL;
511       if (cf_util_get_string(child, &key)) {
512         ERROR("grpc: Option `%s` expects a string value", child->key);
513         return -1;
514       }
515       pkcp.private_key = read_file(key);
516     } else if (!strcasecmp("SSLCertificateFile", child->key)) {
517       char *cert = NULL;
518       if (cf_util_get_string(child, &cert)) {
519         ERROR("grpc: Option `%s` expects a string value", child->key);
520         return -1;
521       }
522       pkcp.cert_chain = read_file(cert);
523     } else {
524       WARNING("grpc: Option `%s` not allowed in <%s> block.", child->key,
525               ci->key);
526     }
527   }
528
529   ssl_opts->pem_key_cert_pairs.push_back(pkcp);
530   if (use_ssl)
531     listener.ssl = ssl_opts;
532   else
533     delete (ssl_opts);
534
535   listeners.push_back(listener);
536   return 0;
537 } /* c_grpc_config_listen() */
538
539 static int c_grpc_config_server(oconfig_item_t *ci) {
540   if ((ci->values_num != 2) || (ci->values[0].type != OCONFIG_TYPE_STRING) ||
541       (ci->values[1].type != OCONFIG_TYPE_STRING)) {
542     ERROR("grpc: The `%s` config option needs exactly "
543           "two string argument (address and port).",
544           ci->key);
545     return -1;
546   }
547
548   grpc::SslCredentialsOptions ssl_opts;
549   bool use_ssl = false;
550
551   for (int i = 0; i < ci->children_num; i++) {
552     oconfig_item_t *child = ci->children + i;
553
554     if (!strcasecmp("EnableSSL", child->key)) {
555       if (cf_util_get_boolean(child, &use_ssl)) {
556         return -1;
557       }
558     } else if (!strcasecmp("SSLCACertificateFile", child->key)) {
559       char *certs = NULL;
560       if (cf_util_get_string(child, &certs)) {
561         return -1;
562       }
563       ssl_opts.pem_root_certs = read_file(certs);
564     } else if (!strcasecmp("SSLCertificateKeyFile", child->key)) {
565       char *key = NULL;
566       if (cf_util_get_string(child, &key)) {
567         return -1;
568       }
569       ssl_opts.pem_private_key = read_file(key);
570     } else if (!strcasecmp("SSLCertificateFile", child->key)) {
571       char *cert = NULL;
572       if (cf_util_get_string(child, &cert)) {
573         return -1;
574       }
575       ssl_opts.pem_cert_chain = read_file(cert);
576     } else {
577       WARNING("grpc: Option `%s` not allowed in <%s> block.", child->key,
578               ci->key);
579     }
580   }
581
582   auto node = grpc::string(ci->values[0].value.string);
583   auto service = grpc::string(ci->values[1].value.string);
584   auto addr = node + ":" + service;
585
586   CollectdClient *client;
587   if (use_ssl) {
588     auto channel_creds = grpc::SslCredentials(ssl_opts);
589     auto channel = grpc::CreateChannel(addr, channel_creds);
590     client = new CollectdClient(channel);
591   } else {
592     auto channel =
593         grpc::CreateChannel(addr, grpc::InsecureChannelCredentials());
594     client = new CollectdClient(channel);
595   }
596
597   auto callback_name = grpc::string("grpc/") + addr;
598   user_data_t ud = {
599       .data = client, .free_func = c_grpc_destroy_write_callback,
600   };
601
602   plugin_register_write(callback_name.c_str(), c_grpc_write, &ud);
603   return 0;
604 } /* c_grpc_config_server() */
605
606 static int c_grpc_config(oconfig_item_t *ci) {
607   int i;
608
609   for (i = 0; i < ci->children_num; i++) {
610     oconfig_item_t *child = ci->children + i;
611
612     if (!strcasecmp("Listen", child->key)) {
613       if (c_grpc_config_listen(child))
614         return -1;
615     } else if (!strcasecmp("Server", child->key)) {
616       if (c_grpc_config_server(child))
617         return -1;
618     }
619
620     else {
621       WARNING("grpc: Option `%s` not allowed here.", child->key);
622     }
623   }
624
625   return 0;
626 } /* c_grpc_config() */
627
628 static int c_grpc_init(void) {
629   server = new CollectdServer();
630   if (!server) {
631     ERROR("grpc: Failed to create server");
632     return -1;
633   }
634
635   server->Start();
636   return 0;
637 } /* c_grpc_init() */
638
639 static int c_grpc_shutdown(void) {
640   if (!server)
641     return 0;
642
643   server->Shutdown();
644
645   delete server;
646   server = nullptr;
647
648   return 0;
649 } /* c_grpc_shutdown() */
650
651 void module_register(void) {
652   plugin_register_complex_config("grpc", c_grpc_config);
653   plugin_register_init("grpc", c_grpc_init);
654   plugin_register_shutdown("grpc", c_grpc_shutdown);
655 } /* module_register() */
656 } /* extern "C" */