4e10783d6d3984849835af987daa5eae5e8e4dc6
[collectd.git] / src / grpc.cc
1 /**
2  * collectd - src/grpc.cc
3  * Copyright (C) 2015-2016 Sebastian Harl
4  *
5  * Permission is hereby granted, free of charge, to any person obtaining a
6  * copy of this software and associated documentation files (the "Software"),
7  * to deal in the Software without restriction, including without limitation
8  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
9  * and/or sell copies of the Software, and to permit persons to whom the
10  * Software is furnished to do so, subject to the following conditions:
11  *
12  * The above copyright notice and this permission notice shall be included in
13  * all copies or substantial portions of the Software.
14  *
15  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
21  * DEALINGS IN THE SOFTWARE.
22  *
23  * Authors:
24  *   Sebastian Harl <sh at tokkee.org>
25  **/
26
27 #include <grpc++/grpc++.h>
28 #include <google/protobuf/util/time_util.h>
29
30 #include <vector>
31
32 #include "collectd.grpc.pb.h"
33
34 extern "C" {
35 #include <fnmatch.h>
36 #include <stdbool.h>
37 #include <pthread.h>
38
39 #include "collectd.h"
40 #include "common.h"
41 #include "configfile.h"
42 #include "plugin.h"
43
44 #include "daemon/utils_cache.h"
45 }
46
47 using collectd::Collectd;
48
49 using collectd::DispatchValuesRequest;
50 using collectd::DispatchValuesReply;
51 using collectd::QueryValuesRequest;
52 using collectd::QueryValuesReply;
53
54 using google::protobuf::util::TimeUtil;
55
56 /*
57  * private types
58  */
59
60 struct Listener {
61         grpc::string addr;
62         grpc::string port;
63 };
64 static std::vector<Listener> listeners;
65 static grpc::string default_addr("0.0.0.0:50051");
66
67 /*
68  * helper functions
69  */
70
71 static bool ident_matches(const value_list_t *vl, const value_list_t *matcher)
72 {
73         if (fnmatch(matcher->host, vl->host, 0))
74                 return false;
75
76         if (fnmatch(matcher->plugin, vl->plugin, 0))
77                 return false;
78         if (fnmatch(matcher->plugin_instance, vl->plugin_instance, 0))
79                 return false;
80
81         if (fnmatch(matcher->type, vl->type, 0))
82                 return false;
83         if (fnmatch(matcher->type_instance, vl->type_instance, 0))
84                 return false;
85
86         return true;
87 } /* ident_matches */
88
89 /*
90  * proto conversion
91  */
92
93 static void marshal_ident(const value_list_t *vl, collectd::types::Identifier *msg)
94 {
95         msg->set_host(vl->host);
96         msg->set_plugin(vl->plugin);
97         if (vl->plugin_instance[0] != '\0')
98                 msg->set_plugin_instance(vl->plugin_instance);
99         msg->set_type(vl->type);
100         if (vl->type_instance[0] != '\0')
101                 msg->set_type_instance(vl->type_instance);
102 } /* marshal_ident */
103
104 static grpc::Status unmarshal_ident(const collectd::types::Identifier &msg, value_list_t *vl,
105                 bool require_fields)
106 {
107         std::string s;
108
109         s = msg.host();
110         if (!s.length() && require_fields)
111                 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
112                                 grpc::string("missing host name"));
113         sstrncpy(vl->host, s.c_str(), sizeof(vl->host));
114
115         s = msg.plugin();
116         if (!s.length() && require_fields)
117                 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
118                                 grpc::string("missing plugin name"));
119         sstrncpy(vl->plugin, s.c_str(), sizeof(vl->plugin));
120
121         s = msg.type();
122         if (!s.length() && require_fields)
123                 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
124                                 grpc::string("missing type name"));
125         sstrncpy(vl->type, s.c_str(), sizeof(vl->type));
126
127         s = msg.plugin_instance();
128         sstrncpy(vl->plugin_instance, s.c_str(), sizeof(vl->plugin_instance));
129
130         s = msg.type_instance();
131         sstrncpy(vl->type_instance, s.c_str(), sizeof(vl->type_instance));
132
133         return grpc::Status::OK;
134 } /* unmarshal_ident() */
135
136 static grpc::Status marshal_value_list(const value_list_t *vl, collectd::types::ValueList *msg)
137 {
138         auto id = msg->mutable_identifier();
139         marshal_ident(vl, id);
140
141         auto ds = plugin_get_ds(vl->type);
142         if ((ds == NULL) || (ds->ds_num != vl->values_len)) {
143                 return grpc::Status(grpc::StatusCode::INTERNAL,
144                                 grpc::string("failed to retrieve data-set for values"));
145         }
146
147         auto t = TimeUtil::NanosecondsToTimestamp(CDTIME_T_TO_NS(vl->time));
148         auto d = TimeUtil::NanosecondsToDuration(CDTIME_T_TO_NS(vl->interval));
149         msg->set_allocated_time(new google::protobuf::Timestamp(t));
150         msg->set_allocated_interval(new google::protobuf::Duration(d));
151
152         for (size_t i = 0; i < vl->values_len; ++i) {
153                 auto v = msg->add_value();
154                 switch (ds->ds[i].type) {
155                         case DS_TYPE_COUNTER:
156                                 v->set_counter(vl->values[i].counter);
157                                 break;
158                         case DS_TYPE_GAUGE:
159                                 v->set_gauge(vl->values[i].gauge);
160                                 break;
161                         case DS_TYPE_DERIVE:
162                                 v->set_derive(vl->values[i].derive);
163                                 break;
164                         case DS_TYPE_ABSOLUTE:
165                                 v->set_absolute(vl->values[i].absolute);
166                                 break;
167                         default:
168                                 return grpc::Status(grpc::StatusCode::INTERNAL,
169                                                 grpc::string("unknown value type"));
170                 }
171         }
172
173         return grpc::Status::OK;
174 } /* marshal_value_list */
175
176 static grpc::Status unmarshal_value_list(const collectd::types::ValueList &msg, value_list_t *vl)
177 {
178         vl->time = NS_TO_CDTIME_T(TimeUtil::TimestampToNanoseconds(msg.time()));
179         vl->interval = NS_TO_CDTIME_T(TimeUtil::DurationToNanoseconds(msg.interval()));
180
181         auto status = unmarshal_ident(msg.identifier(), vl, true);
182         if (!status.ok())
183                 return status;
184
185         value_t *values = NULL;
186         size_t values_len = 0;
187
188         status = grpc::Status::OK;
189         for (auto v : msg.value()) {
190                 value_t *val = (value_t *)realloc(values, (values_len + 1) * sizeof(*values));
191                 if (!val) {
192                         status = grpc::Status(grpc::StatusCode::RESOURCE_EXHAUSTED,
193                                         grpc::string("failed to allocate values array"));
194                         break;
195                 }
196
197                 values = val;
198                 val = values + values_len;
199                 values_len++;
200
201                 switch (v.value_case()) {
202                 case collectd::types::Value::ValueCase::kCounter:
203                         val->counter = counter_t(v.counter());
204                         break;
205                 case collectd::types::Value::ValueCase::kGauge:
206                         val->gauge = gauge_t(v.gauge());
207                         break;
208                 case collectd::types::Value::ValueCase::kDerive:
209                         val->derive = derive_t(v.derive());
210                         break;
211                 case collectd::types::Value::ValueCase::kAbsolute:
212                         val->absolute = absolute_t(v.absolute());
213                         break;
214                 default:
215                         status = grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
216                                         grpc::string("unknown value type"));
217                         break;
218                 }
219
220                 if (!status.ok())
221                         break;
222         }
223         if (status.ok()) {
224                 vl->values = values;
225                 vl->values_len = values_len;
226         }
227         else if (values) {
228                 free(values);
229         }
230
231         return status;
232 } /* unmarshal_value_list() */
233
234 /*
235  * request call-backs and call objects
236  */
237
238 static grpc::Status Process(grpc::ServerContext *ctx,
239                 DispatchValuesRequest request, DispatchValuesReply *reply)
240 {
241         value_list_t vl = VALUE_LIST_INIT;
242         auto status = unmarshal_value_list(request.values(), &vl);
243         if (!status.ok())
244                 return status;
245
246         if (plugin_dispatch_values(&vl))
247                 status = grpc::Status(grpc::StatusCode::INTERNAL,
248                                 grpc::string("failed to enqueue values for writing"));
249         return status;
250 } /* Process(): DispatchValues */
251
252 static grpc::Status Process(grpc::ServerContext *ctx,
253                 QueryValuesRequest request, QueryValuesReply *reply)
254 {
255         uc_iter_t *iter;
256         char *name = NULL;
257
258         value_list_t matcher;
259         auto status = unmarshal_ident(request.identifier(), &matcher, false);
260         if (!status.ok())
261                 return status;
262
263         if ((iter = uc_get_iterator()) == NULL) {
264                 return grpc::Status(grpc::StatusCode::INTERNAL,
265                                 grpc::string("failed to query values: cannot create iterator"));
266         }
267
268         while (uc_iterator_next(iter, &name) == 0) {
269                 value_list_t res;
270                 if (parse_identifier_vl(name, &res) != 0)
271                         return grpc::Status(grpc::StatusCode::INTERNAL,
272                                         grpc::string("failed to parse identifier"));
273
274                 if (!ident_matches(&res, &matcher))
275                         continue;
276
277                 if (uc_iterator_get_time(iter, &res.time) < 0)
278                         return grpc::Status(grpc::StatusCode::INTERNAL,
279                                         grpc::string("failed to retrieve value timestamp"));
280                 if (uc_iterator_get_interval(iter, &res.interval) < 0)
281                         return grpc::Status(grpc::StatusCode::INTERNAL,
282                                         grpc::string("failed to retrieve value interval"));
283                 if (uc_iterator_get_values(iter, &res.values, &res.values_len) < 0)
284                         return grpc::Status(grpc::StatusCode::INTERNAL,
285                                         grpc::string("failed to retrieve values"));
286
287                 auto vl = reply->add_values();
288                 status = marshal_value_list(&res, vl);
289                 free(res.values);
290                 if (!status.ok())
291                         return status;
292         }
293
294         uc_iterator_destroy(iter);
295
296         return grpc::Status::OK;
297 } /* Process(): QueryValues */
298
299 class Call
300 {
301 public:
302         Call(Collectd::AsyncService *service, grpc::ServerCompletionQueue *cq)
303                 : service_(service), cq_(cq), status_(CREATE)
304         { }
305
306         virtual ~Call()
307         { }
308
309         void Handle()
310         {
311                 if (status_ == CREATE) {
312                         Create();
313                         status_ = PROCESS;
314                 }
315                 else if (status_ == PROCESS) {
316                         Process();
317                         status_ = FINISH;
318                 }
319                 else {
320                         GPR_ASSERT(status_ == FINISH);
321                         Finish();
322                 }
323         } /* Handle() */
324
325 protected:
326         virtual void Create() = 0;
327         virtual void Process() = 0;
328         virtual void Finish() = 0;
329
330         Collectd::AsyncService *service_;
331         grpc::ServerCompletionQueue *cq_;
332         grpc::ServerContext ctx_;
333
334 private:
335         enum CallStatus { CREATE, PROCESS, FINISH };
336         CallStatus status_;
337 }; /* class Call */
338
339 template<typename RequestT, typename ReplyT>
340 class RpcCall final : public Call
341 {
342         typedef void (Collectd::AsyncService::*CreatorT)(grpc::ServerContext *,
343                         RequestT *, grpc::ServerAsyncResponseWriter<ReplyT> *,
344                         grpc::CompletionQueue *, grpc::ServerCompletionQueue *, void *);
345
346 public:
347         RpcCall(Collectd::AsyncService *service,
348                         CreatorT creator, grpc::ServerCompletionQueue *cq)
349                 : Call(service, cq), creator_(creator), responder_(&ctx_)
350         {
351                 Handle();
352         } /* RpcCall() */
353
354         virtual ~RpcCall()
355         { }
356
357 private:
358         void Create()
359         {
360                 (service_->*creator_)(&ctx_, &request_, &responder_, cq_, cq_, this);
361         } /* Create() */
362
363         void Process()
364         {
365                 // Add a new request object to the queue.
366                 new RpcCall<RequestT, ReplyT>(service_, creator_, cq_);
367                 grpc::Status status = ::Process(&ctx_, request_, &reply_);
368                 responder_.Finish(reply_, status, this);
369         } /* Process() */
370
371         void Finish()
372         {
373                 delete this;
374         } /* Finish() */
375
376         CreatorT creator_;
377
378         RequestT request_;
379         ReplyT reply_;
380
381         grpc::ServerAsyncResponseWriter<ReplyT> responder_;
382 }; /* class RpcCall */
383
384 /*
385  * gRPC server implementation
386  */
387
388 class CollectdServer final
389 {
390 public:
391         void Start()
392         {
393                 // TODO: make configurable
394                 auto auth = grpc::InsecureServerCredentials();
395
396                 grpc::ServerBuilder builder;
397
398                 if (listeners.empty()) {
399                         builder.AddListeningPort(default_addr, auth);
400                         INFO("grpc: Listening on %s", default_addr.c_str());
401                 }
402                 else {
403                         for (auto l : listeners) {
404                                 grpc::string addr = l.addr + ":" + l.port;
405                                 builder.AddListeningPort(addr, auth);
406                                 INFO("grpc: Listening on %s", addr.c_str());
407                         }
408                 }
409
410                 builder.RegisterService(&service_);
411                 cq_ = builder.AddCompletionQueue();
412                 server_ = builder.BuildAndStart();
413         } /* Start() */
414
415         void Shutdown()
416         {
417                 server_->Shutdown();
418                 cq_->Shutdown();
419         } /* Shutdown() */
420
421         void Mainloop()
422         {
423                 // Register request types.
424                 new RpcCall<DispatchValuesRequest, DispatchValuesReply>(&service_,
425                                 &Collectd::AsyncService::RequestDispatchValues, cq_.get());
426                 new RpcCall<QueryValuesRequest, QueryValuesReply>(&service_,
427                                 &Collectd::AsyncService::RequestQueryValues, cq_.get());
428
429                 while (true) {
430                         void *req = NULL;
431                         bool ok = false;
432
433                         if (!cq_->Next(&req, &ok))
434                                 break; // Queue shut down.
435                         if (!ok) {
436                                 ERROR("grpc: Failed to read from queue");
437                                 break;
438                         }
439
440                         static_cast<Call *>(req)->Handle();
441                 }
442         } /* Mainloop() */
443
444 private:
445         Collectd::AsyncService service_;
446
447         std::unique_ptr<grpc::Server> server_;
448         std::unique_ptr<grpc::ServerCompletionQueue> cq_;
449 }; /* class CollectdServer */
450
451 static CollectdServer *server = nullptr;
452
453 /*
454  * collectd plugin interface
455  */
456
457 extern "C" {
458         static pthread_t *workers;
459         static size_t workers_num = 5;
460
461         static void *worker_thread(void *arg)
462         {
463                 CollectdServer *s = (CollectdServer *)arg;
464                 s->Mainloop();
465                 return NULL;
466         } /* worker_thread() */
467
468         static int c_grpc_config_listen(oconfig_item_t *ci)
469         {
470                 if ((ci->values_num != 2)
471                                 || (ci->values[0].type != OCONFIG_TYPE_STRING)
472                                 || (ci->values[1].type != OCONFIG_TYPE_STRING)) {
473                         ERROR("grpc: The `%s` config option needs exactly "
474                                         "two string argument (address and port).", ci->key);
475                         return -1;
476                 }
477
478                 auto listener = Listener();
479                 listener.addr = grpc::string(ci->values[0].value.string);
480                 listener.port = grpc::string(ci->values[1].value.string);
481                 listeners.push_back(listener);
482
483                 for (int i = 0; i < ci->children_num; i++) {
484                         oconfig_item_t *child = ci->children + i;
485                         WARNING("grpc: Option `%s` not allowed in <%s> block.",
486                                         child->key, ci->key);
487                 }
488
489                 return 0;
490         } /* c_grpc_config_listen() */
491
492         static int c_grpc_config(oconfig_item_t *ci)
493         {
494                 int i;
495
496                 for (i = 0; i < ci->children_num; i++) {
497                         oconfig_item_t *child = ci->children + i;
498
499                         if (!strcasecmp("Listen", child->key)) {
500                                 if (c_grpc_config_listen(child))
501                                         return -1;
502                         }
503                         else if (!strcasecmp("WorkerThreads", child->key)) {
504                                 int n;
505                                 if (cf_util_get_int(child, &n))
506                                         return -1;
507                                 workers_num = (size_t)n;
508                         }
509                         else {
510                                 WARNING("grpc: Option `%s` not allowed here.", child->key);
511                         }
512                 }
513
514                 return 0;
515         } /* c_grpc_config() */
516
517         static int c_grpc_init(void)
518         {
519                 server = new CollectdServer();
520                 size_t i;
521
522                 if (! server) {
523                         ERROR("grpc: Failed to create server");
524                         return -1;
525                 }
526
527                 workers = (pthread_t *)calloc(workers_num, sizeof(*workers));
528                 if (! workers) {
529                         delete server;
530                         server = nullptr;
531
532                         ERROR("grpc: Failed to allocate worker threads");
533                         return -1;
534                 }
535
536                 server->Start();
537                 for (i = 0; i < workers_num; i++) {
538                         plugin_thread_create(&workers[i], /* attr = */ NULL,
539                                         worker_thread, server);
540                 }
541                 INFO("grpc: Started %zu workers", workers_num);
542                 return 0;
543         } /* c_grpc_init() */
544
545         static int c_grpc_shutdown(void)
546         {
547                 size_t i;
548
549                 if (!server)
550                         return -1;
551
552                 server->Shutdown();
553
554                 INFO("grpc: Waiting for %zu workers to terminate", workers_num);
555                 for (i = 0; i < workers_num; i++)
556                         pthread_join(workers[i], NULL);
557                 free(workers);
558                 workers = NULL;
559                 workers_num = 0;
560
561                 delete server;
562                 server = nullptr;
563
564                 return 0;
565         } /* c_grpc_shutdown() */
566
567         void module_register(void)
568         {
569                 plugin_register_complex_config("grpc", c_grpc_config);
570                 plugin_register_init("grpc", c_grpc_init);
571                 plugin_register_shutdown("grpc", c_grpc_shutdown);
572         } /* module_register() */
573 } /* extern "C" */
574
575 /* vim: set sw=4 ts=4 tw=78 noexpandtab : */