grpc plugin: Replace the ListValues RPC with QueryValues.
[collectd.git] / src / grpc.cc
1 /**
2  * collectd - src/grpc.cc
3  * Copyright (C) 2015-2016 Sebastian Harl
4  *
5  * Permission is hereby granted, free of charge, to any person obtaining a
6  * copy of this software and associated documentation files (the "Software"),
7  * to deal in the Software without restriction, including without limitation
8  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
9  * and/or sell copies of the Software, and to permit persons to whom the
10  * Software is furnished to do so, subject to the following conditions:
11  *
12  * The above copyright notice and this permission notice shall be included in
13  * all copies or substantial portions of the Software.
14  *
15  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
21  * DEALINGS IN THE SOFTWARE.
22  *
23  * Authors:
24  *   Sebastian Harl <sh at tokkee.org>
25  **/
26
27 #include <grpc++/grpc++.h>
28 #include <google/protobuf/util/time_util.h>
29
30 #include "collectd.grpc.pb.h"
31
32 extern "C" {
33 #include <fnmatch.h>
34 #include <stdbool.h>
35 #include <pthread.h>
36
37 #include "collectd.h"
38 #include "common.h"
39 #include "configfile.h"
40 #include "plugin.h"
41
42 #include "daemon/utils_cache.h"
43
44         typedef struct {
45                 char *addr;
46                 char *port;
47         } listener_t;
48
49         static listener_t *listeners;
50         static size_t listeners_num;
51 }
52
53 using collectd::Collectd;
54
55 using collectd::DispatchValuesRequest;
56 using collectd::DispatchValuesReply;
57 using collectd::QueryValuesRequest;
58 using collectd::QueryValuesReply;
59
60 using google::protobuf::util::TimeUtil;
61
62 /*
63  * helper functions
64  */
65
66 static bool ident_matches(const value_list_t *vl, const value_list_t *matcher)
67 {
68         if (fnmatch(matcher->host, vl->host, 0))
69                 return false;
70
71         if (fnmatch(matcher->plugin, vl->plugin, 0))
72                 return false;
73         if (fnmatch(matcher->plugin_instance, vl->plugin_instance, 0))
74                 return false;
75
76         if (fnmatch(matcher->type, vl->type, 0))
77                 return false;
78         if (fnmatch(matcher->type_instance, vl->type_instance, 0))
79                 return false;
80
81         return true;
82 } /* ident_matches */
83
84 /*
85  * proto conversion
86  */
87
88 static void marshal_ident(const value_list_t *vl, collectd::types::Identifier *msg)
89 {
90         msg->set_host(vl->host);
91         msg->set_plugin(vl->plugin);
92         if (vl->plugin_instance[0] != '\0')
93                 msg->set_plugin_instance(vl->plugin_instance);
94         msg->set_type(vl->type);
95         if (vl->type_instance[0] != '\0')
96                 msg->set_type_instance(vl->type_instance);
97 } /* marshal_ident */
98
99 static grpc::Status unmarshal_ident(const collectd::types::Identifier &msg, value_list_t *vl,
100                 bool require_fields)
101 {
102         std::string s;
103
104         s = msg.host();
105         if (!s.length() && require_fields)
106                 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
107                                 grpc::string("missing host name"));
108         sstrncpy(vl->host, s.c_str(), sizeof(vl->host));
109
110         s = msg.plugin();
111         if (!s.length() && require_fields)
112                 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
113                                 grpc::string("missing plugin name"));
114         sstrncpy(vl->plugin, s.c_str(), sizeof(vl->plugin));
115
116         s = msg.type();
117         if (!s.length() && require_fields)
118                 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
119                                 grpc::string("missing type name"));
120         sstrncpy(vl->type, s.c_str(), sizeof(vl->type));
121
122         s = msg.plugin_instance();
123         sstrncpy(vl->plugin_instance, s.c_str(), sizeof(vl->plugin_instance));
124
125         s = msg.type_instance();
126         sstrncpy(vl->type_instance, s.c_str(), sizeof(vl->type_instance));
127
128         return grpc::Status::OK;
129 } /* unmarshal_ident() */
130
131 static grpc::Status marshal_value_list(const value_list_t *vl, collectd::types::ValueList *msg)
132 {
133         auto id = msg->mutable_identifier();
134         marshal_ident(vl, id);
135
136         auto ds = plugin_get_ds(vl->type);
137         if ((ds == NULL) || (ds->ds_num != vl->values_len)) {
138                 return grpc::Status(grpc::StatusCode::INTERNAL,
139                                 grpc::string("failed to retrieve data-set for values"));
140         }
141
142         auto t = TimeUtil::NanosecondsToTimestamp(CDTIME_T_TO_NS(vl->time));
143         auto d = TimeUtil::NanosecondsToDuration(CDTIME_T_TO_NS(vl->interval));
144         msg->set_allocated_time(new google::protobuf::Timestamp(t));
145         msg->set_allocated_interval(new google::protobuf::Duration(d));
146
147         for (size_t i = 0; i < vl->values_len; ++i) {
148                 auto v = msg->add_value();
149                 switch (ds->ds[i].type) {
150                         case DS_TYPE_COUNTER:
151                                 v->set_counter(vl->values[i].counter);
152                                 break;
153                         case DS_TYPE_GAUGE:
154                                 v->set_gauge(vl->values[i].gauge);
155                                 break;
156                         case DS_TYPE_DERIVE:
157                                 v->set_derive(vl->values[i].derive);
158                                 break;
159                         case DS_TYPE_ABSOLUTE:
160                                 v->set_absolute(vl->values[i].absolute);
161                                 break;
162                         default:
163                                 return grpc::Status(grpc::StatusCode::INTERNAL,
164                                                 grpc::string("unknown value type"));
165                 }
166         }
167
168         return grpc::Status::OK;
169 } /* marshal_value_list */
170
171 static grpc::Status unmarshal_value_list(const collectd::types::ValueList &msg, value_list_t *vl)
172 {
173         vl->time = NS_TO_CDTIME_T(TimeUtil::TimestampToNanoseconds(msg.time()));
174         vl->interval = NS_TO_CDTIME_T(TimeUtil::DurationToNanoseconds(msg.interval()));
175
176         auto status = unmarshal_ident(msg.identifier(), vl, true);
177         if (!status.ok())
178                 return status;
179
180         value_t *values = NULL;
181         size_t values_len = 0;
182
183         status = grpc::Status::OK;
184         for (auto v : msg.value()) {
185                 value_t *val = (value_t *)realloc(values, (values_len + 1) * sizeof(*values));
186                 if (!val) {
187                         status = grpc::Status(grpc::StatusCode::RESOURCE_EXHAUSTED,
188                                         grpc::string("failed to allocate values array"));
189                         break;
190                 }
191
192                 values = val;
193                 val = values + values_len;
194                 values_len++;
195
196                 switch (v.value_case()) {
197                 case collectd::types::Value::ValueCase::kCounter:
198                         val->counter = counter_t(v.counter());
199                         break;
200                 case collectd::types::Value::ValueCase::kGauge:
201                         val->gauge = gauge_t(v.gauge());
202                         break;
203                 case collectd::types::Value::ValueCase::kDerive:
204                         val->derive = derive_t(v.derive());
205                         break;
206                 case collectd::types::Value::ValueCase::kAbsolute:
207                         val->absolute = absolute_t(v.absolute());
208                         break;
209                 default:
210                         status = grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
211                                         grpc::string("unknown value type"));
212                         break;
213                 }
214
215                 if (!status.ok())
216                         break;
217         }
218         if (status.ok()) {
219                 vl->values = values;
220                 vl->values_len = values_len;
221         }
222         else if (values) {
223                 free(values);
224         }
225
226         return status;
227 } /* unmarshal_value_list() */
228
229 /*
230  * request call-backs and call objects
231  */
232
233 static grpc::Status Process(grpc::ServerContext *ctx,
234                 DispatchValuesRequest request, DispatchValuesReply *reply)
235 {
236         value_list_t vl = VALUE_LIST_INIT;
237         auto status = unmarshal_value_list(request.values(), &vl);
238         if (!status.ok())
239                 return status;
240
241         if (plugin_dispatch_values(&vl))
242                 status = grpc::Status(grpc::StatusCode::INTERNAL,
243                                 grpc::string("failed to enqueue values for writing"));
244         return status;
245 } /* Process(): DispatchValues */
246
247 static grpc::Status Process(grpc::ServerContext *ctx,
248                 QueryValuesRequest request, QueryValuesReply *reply)
249 {
250         uc_iter_t *iter;
251         char *name = NULL;
252
253         value_list_t matcher;
254         auto status = unmarshal_ident(request.identifier(), &matcher, false);
255         if (!status.ok())
256                 return status;
257
258         if ((iter = uc_get_iterator()) == NULL) {
259                 return grpc::Status(grpc::StatusCode::INTERNAL,
260                                 grpc::string("failed to query values: cannot create iterator"));
261         }
262
263         while (uc_iterator_next(iter, &name) == 0) {
264                 value_list_t res;
265                 if (parse_identifier_vl(name, &res) != 0)
266                         return grpc::Status(grpc::StatusCode::INTERNAL,
267                                         grpc::string("failed to parse identifier"));
268
269                 if (!ident_matches(&res, &matcher))
270                         continue;
271
272                 if (uc_iterator_get_time(iter, &res.time) < 0)
273                         return grpc::Status(grpc::StatusCode::INTERNAL,
274                                         grpc::string("failed to retrieve value timestamp"));
275                 if (uc_iterator_get_interval(iter, &res.interval) < 0)
276                         return grpc::Status(grpc::StatusCode::INTERNAL,
277                                         grpc::string("failed to retrieve value interval"));
278                 if (uc_iterator_get_values(iter, &res.values, &res.values_len) < 0)
279                         return grpc::Status(grpc::StatusCode::INTERNAL,
280                                         grpc::string("failed to retrieve values"));
281
282                 auto vl = reply->add_values();
283                 status = marshal_value_list(&res, vl);
284                 free(res.values);
285                 if (!status.ok())
286                         return status;
287         }
288
289         uc_iterator_destroy(iter);
290
291         return grpc::Status::OK;
292 } /* Process(): QueryValues */
293
294 class Call
295 {
296 public:
297         Call(Collectd::AsyncService *service, grpc::ServerCompletionQueue *cq)
298                 : service_(service), cq_(cq), status_(CREATE)
299         { }
300
301         virtual ~Call()
302         { }
303
304         void Handle()
305         {
306                 if (status_ == CREATE) {
307                         Create();
308                         status_ = PROCESS;
309                 }
310                 else if (status_ == PROCESS) {
311                         Process();
312                         status_ = FINISH;
313                 }
314                 else {
315                         GPR_ASSERT(status_ == FINISH);
316                         Finish();
317                 }
318         } /* Handle() */
319
320 protected:
321         virtual void Create() = 0;
322         virtual void Process() = 0;
323         virtual void Finish() = 0;
324
325         Collectd::AsyncService *service_;
326         grpc::ServerCompletionQueue *cq_;
327         grpc::ServerContext ctx_;
328
329 private:
330         enum CallStatus { CREATE, PROCESS, FINISH };
331         CallStatus status_;
332 }; /* class Call */
333
334 template<typename RequestT, typename ReplyT>
335 class RpcCall final : public Call
336 {
337         typedef void (Collectd::AsyncService::*CreatorT)(grpc::ServerContext *,
338                         RequestT *, grpc::ServerAsyncResponseWriter<ReplyT> *,
339                         grpc::CompletionQueue *, grpc::ServerCompletionQueue *, void *);
340
341 public:
342         RpcCall(Collectd::AsyncService *service,
343                         CreatorT creator, grpc::ServerCompletionQueue *cq)
344                 : Call(service, cq), creator_(creator), responder_(&ctx_)
345         {
346                 Handle();
347         } /* RpcCall() */
348
349         virtual ~RpcCall()
350         { }
351
352 private:
353         void Create()
354         {
355                 (service_->*creator_)(&ctx_, &request_, &responder_, cq_, cq_, this);
356         } /* Create() */
357
358         void Process()
359         {
360                 // Add a new request object to the queue.
361                 new RpcCall<RequestT, ReplyT>(service_, creator_, cq_);
362                 grpc::Status status = ::Process(&ctx_, request_, &reply_);
363                 responder_.Finish(reply_, status, this);
364         } /* Process() */
365
366         void Finish()
367         {
368                 delete this;
369         } /* Finish() */
370
371         CreatorT creator_;
372
373         RequestT request_;
374         ReplyT reply_;
375
376         grpc::ServerAsyncResponseWriter<ReplyT> responder_;
377 }; /* class RpcCall */
378
379 /*
380  * gRPC server implementation
381  */
382
383 class CollectdServer final
384 {
385 public:
386         void Start()
387         {
388                 // TODO: make configurable
389                 auto auth = grpc::InsecureServerCredentials();
390
391                 grpc::ServerBuilder builder;
392
393                 if (!listeners_num) {
394                         std::string default_addr("0.0.0.0:50051");
395                         builder.AddListeningPort(default_addr, auth);
396                         INFO("grpc: Listening on %s", default_addr.c_str());
397                 }
398                 else {
399                         size_t i;
400                         for (i = 0; i < listeners_num; i++) {
401                                 auto l = listeners[i];
402                                 std::string addr(l.addr);
403                                 addr += std::string(":") + std::string(l.port);
404                                 builder.AddListeningPort(addr, auth);
405                                 INFO("grpc: Listening on %s", addr.c_str());
406                         }
407                 }
408
409                 builder.RegisterService(&service_);
410                 cq_ = builder.AddCompletionQueue();
411                 server_ = builder.BuildAndStart();
412         } /* Start() */
413
414         void Shutdown()
415         {
416                 server_->Shutdown();
417                 cq_->Shutdown();
418         } /* Shutdown() */
419
420         void Mainloop()
421         {
422                 // Register request types.
423                 new RpcCall<DispatchValuesRequest, DispatchValuesReply>(&service_,
424                                 &Collectd::AsyncService::RequestDispatchValues, cq_.get());
425                 new RpcCall<QueryValuesRequest, QueryValuesReply>(&service_,
426                                 &Collectd::AsyncService::RequestQueryValues, cq_.get());
427
428                 while (true) {
429                         void *req = NULL;
430                         bool ok = false;
431
432                         if (!cq_->Next(&req, &ok))
433                                 break; // Queue shut down.
434                         if (!ok) {
435                                 ERROR("grpc: Failed to read from queue");
436                                 break;
437                         }
438
439                         static_cast<Call *>(req)->Handle();
440                 }
441         } /* Mainloop() */
442
443 private:
444         Collectd::AsyncService service_;
445
446         std::unique_ptr<grpc::Server> server_;
447         std::unique_ptr<grpc::ServerCompletionQueue> cq_;
448 }; /* class CollectdServer */
449
450 static CollectdServer *server = nullptr;
451
452 /*
453  * collectd plugin interface
454  */
455
456 extern "C" {
457         static pthread_t *workers;
458         static size_t workers_num = 5;
459
460         static void *worker_thread(void *arg)
461         {
462                 CollectdServer *s = (CollectdServer *)arg;
463                 s->Mainloop();
464                 return NULL;
465         } /* worker_thread() */
466
467         static int c_grpc_config_listen(oconfig_item_t *ci)
468         {
469                 listener_t *listener;
470                 int i;
471
472                 if ((ci->values_num != 2)
473                                 || (ci->values[0].type != OCONFIG_TYPE_STRING)
474                                 || (ci->values[1].type != OCONFIG_TYPE_STRING)) {
475                         ERROR("grpc: The `%s` config option needs exactly "
476                                         "two string argument (address and port).", ci->key);
477                         return -1;
478                 }
479
480                 listener = (listener_t *)realloc(listeners,
481                                 (listeners_num + 1) * sizeof(*listeners));
482                 if (!listener) {
483                         ERROR("grpc: Failed to allocate listeners");
484                         return -1;
485                 }
486                 listeners = listener;
487                 listener = listeners + listeners_num;
488                 listeners_num++;
489
490                 listener->addr = strdup(ci->values[0].value.string);
491                 listener->port = strdup(ci->values[1].value.string);
492
493                 for (i = 0; i < ci->children_num; i++) {
494                         oconfig_item_t *child = ci->children + i;
495                         WARNING("grpc: Option `%s` not allowed in <%s> block.",
496                                         child->key, ci->key);
497                 }
498
499                 return 0;
500         } /* c_grpc_config_listen() */
501
502         static int c_grpc_config(oconfig_item_t *ci)
503         {
504                 int i;
505
506                 for (i = 0; i < ci->children_num; i++) {
507                         oconfig_item_t *child = ci->children + i;
508
509                         if (!strcasecmp("Listen", child->key)) {
510                                 if (c_grpc_config_listen(child))
511                                         return -1;
512                         }
513                         else if (!strcasecmp("WorkerThreads", child->key)) {
514                                 int n;
515                                 if (cf_util_get_int(child, &n))
516                                         return -1;
517                                 workers_num = (size_t)n;
518                         }
519                         else {
520                                 WARNING("grpc: Option `%s` not allowed here.", child->key);
521                         }
522                 }
523
524                 return 0;
525         } /* c_grpc_config() */
526
527         static int c_grpc_init(void)
528         {
529                 server = new CollectdServer();
530                 size_t i;
531
532                 if (! server) {
533                         ERROR("grpc: Failed to create server");
534                         return -1;
535                 }
536
537                 workers = (pthread_t *)calloc(workers_num, sizeof(*workers));
538                 if (! workers) {
539                         delete server;
540                         server = nullptr;
541
542                         ERROR("grpc: Failed to allocate worker threads");
543                         return -1;
544                 }
545
546                 server->Start();
547                 for (i = 0; i < workers_num; i++) {
548                         plugin_thread_create(&workers[i], /* attr = */ NULL,
549                                         worker_thread, server);
550                 }
551                 INFO("grpc: Started %zu workers", workers_num);
552                 return 0;
553         } /* c_grpc_init() */
554
555         static int c_grpc_shutdown(void)
556         {
557                 size_t i;
558
559                 if (!server)
560                         return -1;
561
562                 server->Shutdown();
563
564                 INFO("grpc: Waiting for %zu workers to terminate", workers_num);
565                 for (i = 0; i < workers_num; i++)
566                         pthread_join(workers[i], NULL);
567                 free(workers);
568                 workers = NULL;
569                 workers_num = 0;
570
571                 delete server;
572                 server = nullptr;
573
574                 return 0;
575         } /* c_grpc_shutdown() */
576
577         void module_register(void)
578         {
579                 plugin_register_complex_config("grpc", c_grpc_config);
580                 plugin_register_init("grpc", c_grpc_init);
581                 plugin_register_shutdown("grpc", c_grpc_shutdown);
582         } /* module_register() */
583 } /* extern "C" */
584
585 /* vim: set sw=4 ts=4 tw=78 noexpandtab : */