a1abe4a292d97cb8256afe475456d046b14c2167
[collectd.git] / src / grpc.cc
1 /**
2  * collectd - src/grpc.cc
3  * Copyright (C) 2015 Sebastian Harl
4  *
5  * Permission is hereby granted, free of charge, to any person obtaining a
6  * copy of this software and associated documentation files (the "Software"),
7  * to deal in the Software without restriction, including without limitation
8  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
9  * and/or sell copies of the Software, and to permit persons to whom the
10  * Software is furnished to do so, subject to the following conditions:
11  *
12  * The above copyright notice and this permission notice shall be included in
13  * all copies or substantial portions of the Software.
14  *
15  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
21  * DEALINGS IN THE SOFTWARE.
22  *
23  * Authors:
24  *   Sebastian Harl <sh at tokkee.org>
25  **/
26
27 #include <grpc++/grpc++.h>
28 #include <google/protobuf/util/time_util.h>
29
30 #include "collectd.grpc.pb.h"
31
32 extern "C" {
33 #include <stdbool.h>
34 #include <pthread.h>
35
36 #include "collectd.h"
37 #include "common.h"
38 #include "configfile.h"
39 #include "plugin.h"
40
41 #include "daemon/utils_cache.h"
42
43         typedef struct {
44                 char *addr;
45                 char *port;
46         } listener_t;
47
48         static listener_t *listeners;
49         static size_t listeners_num;
50 }
51
52 using collectd::Collectd;
53
54 using collectd::DispatchValuesRequest;
55 using collectd::DispatchValuesReply;
56 using collectd::ListValuesRequest;
57 using collectd::ListValuesReply;
58
59 using google::protobuf::util::TimeUtil;
60
61 /*
62  * proto conversion
63  */
64
65 static grpc::Status unmarshal_value_list(const collectd::types::ValueList &msg, value_list_t *vl)
66 {
67         vl->time = NS_TO_CDTIME_T(TimeUtil::TimestampToNanoseconds(msg.time()));
68         vl->interval = NS_TO_CDTIME_T(TimeUtil::DurationToNanoseconds(msg.interval()));
69
70         std::string s;
71
72         s = msg.host();
73         if (!s.length())
74                 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
75                                 grpc::string("missing host name"));
76         sstrncpy(vl->host, s.c_str(), sizeof(vl->host));
77
78         s = msg.plugin();
79         if (!s.length())
80                 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
81                                 grpc::string("missing plugin name"));
82         sstrncpy(vl->plugin, s.c_str(), sizeof(vl->plugin));
83
84         s = msg.type();
85         if (!s.length())
86                 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
87                                 grpc::string("missing type name"));
88         sstrncpy(vl->type, s.c_str(), sizeof(vl->type));
89
90         s = msg.plugin_instance();
91         if (s.length())
92                 sstrncpy(vl->plugin_instance, s.c_str(), sizeof(vl->plugin_instance));
93
94         s = msg.type_instance();
95         if (s.length())
96                 sstrncpy(vl->type_instance, s.c_str(), sizeof(vl->type_instance));
97
98         value_t *values = NULL;
99         size_t values_len = 0;
100         auto status = grpc::Status::OK;
101
102         for (auto v : msg.value()) {
103                 value_t *val = (value_t *)realloc(values, (values_len + 1) * sizeof(*values));
104                 if (!val) {
105                         status = grpc::Status(grpc::StatusCode::RESOURCE_EXHAUSTED,
106                                         grpc::string("failed to allocate values array"));
107                         break;
108                 }
109
110                 values = val;
111                 val = values + values_len;
112                 values_len++;
113
114                 switch (v.value_case()) {
115                 case collectd::types::Value::ValueCase::kCounter:
116                         val->counter = counter_t(v.counter());
117                         break;
118                 case collectd::types::Value::ValueCase::kGauge:
119                         val->gauge = gauge_t(v.gauge());
120                         break;
121                 case collectd::types::Value::ValueCase::kDerive:
122                         val->derive = derive_t(v.derive());
123                         break;
124                 case collectd::types::Value::ValueCase::kAbsolute:
125                         val->absolute = absolute_t(v.absolute());
126                         break;
127                 default:
128                         status = grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
129                                         grpc::string("unkown value type"));
130                         break;
131                 }
132
133                 if (!status.ok())
134                         break;
135         }
136         if (status.ok()) {
137                 vl->values = values;
138                 vl->values_len = values_len;
139         }
140         else if (values) {
141                 free(values);
142         }
143
144         return status;
145 } /* unmarshal_value_list() */
146
147 /*
148  * request call-backs and call objects
149  */
150
151 static grpc::Status Process(grpc::ServerContext *ctx,
152                 DispatchValuesRequest request, DispatchValuesReply *reply)
153 {
154         value_list_t vl = VALUE_LIST_INIT;
155         auto status = unmarshal_value_list(request.values(), &vl);
156         if (!status.ok())
157                 return status;
158
159         if (plugin_dispatch_values(&vl))
160                 status = grpc::Status(grpc::StatusCode::INTERNAL,
161                                 grpc::string("failed to enqueue values for writing"));
162         return status;
163 } /* Process(): DispatchValues */
164
165 static grpc::Status Process(grpc::ServerContext *ctx,
166                 ListValuesRequest request, ListValuesReply *reply)
167 {
168         char **names = NULL;
169         cdtime_t *times = NULL;
170         size_t i, n = 0;
171
172         if (uc_get_names(&names, &times, &n))
173                 return grpc::Status(grpc::StatusCode::INTERNAL,
174                                 grpc::string("failed to retrieve values"));
175
176         for (i = 0; i < n; i++) {
177                 auto v = reply->add_value();
178                 auto t = TimeUtil::NanosecondsToTimestamp(CDTIME_T_TO_NS(times[i]));
179                 v->set_name(names[i]);
180                 v->set_allocated_time(new google::protobuf::Timestamp(t));
181                 sfree(names[i]);
182         }
183         sfree(names);
184         sfree(times);
185
186         return grpc::Status::OK;
187 } /* Process(): ListValues */
188
189 class Call
190 {
191 public:
192         Call(Collectd::AsyncService *service, grpc::ServerCompletionQueue *cq)
193                 : service_(service), cq_(cq), status_(CREATE)
194         { }
195
196         virtual ~Call()
197         { }
198
199         void Handle()
200         {
201                 if (status_ == CREATE) {
202                         Create();
203                         status_ = PROCESS;
204                 }
205                 else if (status_ == PROCESS) {
206                         Process();
207                         status_ = FINISH;
208                 }
209                 else {
210                         GPR_ASSERT(status_ == FINISH);
211                         Finish();
212                 }
213         } /* Handle() */
214
215 protected:
216         virtual void Create() = 0;
217         virtual void Process() = 0;
218         virtual void Finish() = 0;
219
220         Collectd::AsyncService *service_;
221         grpc::ServerCompletionQueue *cq_;
222         grpc::ServerContext ctx_;
223
224 private:
225         enum CallStatus { CREATE, PROCESS, FINISH };
226         CallStatus status_;
227 }; /* class Call */
228
229 template<typename RequestT, typename ReplyT>
230 class RpcCall final : public Call
231 {
232         typedef void (Collectd::AsyncService::*CreatorT)(grpc::ServerContext *,
233                         RequestT *, grpc::ServerAsyncResponseWriter<ReplyT> *,
234                         grpc::CompletionQueue *, grpc::ServerCompletionQueue *, void *);
235
236 public:
237         RpcCall(Collectd::AsyncService *service,
238                         CreatorT creator, grpc::ServerCompletionQueue *cq)
239                 : Call(service, cq), creator_(creator), responder_(&ctx_)
240         {
241                 Handle();
242         } /* RpcCall() */
243
244         virtual ~RpcCall()
245         { }
246
247 private:
248         void Create()
249         {
250                 (service_->*creator_)(&ctx_, &request_, &responder_, cq_, cq_, this);
251         } /* Create() */
252
253         void Process()
254         {
255                 // Add a new request object to the queue.
256                 new RpcCall<RequestT, ReplyT>(service_, creator_, cq_);
257                 grpc::Status status = ::Process(&ctx_, request_, &reply_);
258                 responder_.Finish(reply_, status, this);
259         } /* Process() */
260
261         void Finish()
262         {
263                 delete this;
264         } /* Finish() */
265
266         CreatorT creator_;
267
268         RequestT request_;
269         ReplyT reply_;
270
271         grpc::ServerAsyncResponseWriter<ReplyT> responder_;
272 }; /* class RpcCall */
273
274 /*
275  * gRPC server implementation
276  */
277
278 class CollectdServer final
279 {
280 public:
281         void Start()
282         {
283                 // TODO: make configurable
284                 auto auth = grpc::InsecureServerCredentials();
285
286                 grpc::ServerBuilder builder;
287
288                 if (!listeners_num) {
289                         std::string default_addr("0.0.0.0:50051");
290                         builder.AddListeningPort(default_addr, auth);
291                         INFO("grpc: Listening on %s", default_addr.c_str());
292                 }
293                 else {
294                         size_t i;
295                         for (i = 0; i < listeners_num; i++) {
296                                 auto l = listeners[i];
297                                 std::string addr(l.addr);
298                                 addr += std::string(":") + std::string(l.port);
299                                 builder.AddListeningPort(addr, auth);
300                                 INFO("grpc: Listening on %s", addr.c_str());
301                         }
302                 }
303
304                 builder.RegisterService(&service_);
305                 cq_ = builder.AddCompletionQueue();
306                 server_ = builder.BuildAndStart();
307         } /* Start() */
308
309         void Shutdown()
310         {
311                 server_->Shutdown();
312                 cq_->Shutdown();
313         } /* Shutdown() */
314
315         void Mainloop()
316         {
317                 // Register request types.
318                 new RpcCall<DispatchValuesRequest, DispatchValuesReply>(&service_,
319                                 &Collectd::AsyncService::RequestDispatchValues, cq_.get());
320                 new RpcCall<ListValuesRequest, ListValuesReply>(&service_,
321                                 &Collectd::AsyncService::RequestListValues, cq_.get());
322
323                 while (true) {
324                         void *req = NULL;
325                         bool ok = false;
326
327                         if (!cq_->Next(&req, &ok))
328                                 break; // Queue shut down.
329                         if (!ok) {
330                                 ERROR("grpc: Failed to read from queue");
331                                 break;
332                         }
333
334                         static_cast<Call *>(req)->Handle();
335                 }
336         } /* Mainloop() */
337
338 private:
339         Collectd::AsyncService service_;
340
341         std::unique_ptr<grpc::Server> server_;
342         std::unique_ptr<grpc::ServerCompletionQueue> cq_;
343 }; /* class CollectdServer */
344
345 static CollectdServer *server = nullptr;
346
347 /*
348  * collectd plugin interface
349  */
350
351 extern "C" {
352         static pthread_t *workers;
353         static size_t workers_num = 5;
354
355         static void *worker_thread(void *arg)
356         {
357                 CollectdServer *s = (CollectdServer *)arg;
358                 s->Mainloop();
359                 return NULL;
360         } /* worker_thread() */
361
362         static int c_grpc_config_listen(oconfig_item_t *ci)
363         {
364                 listener_t *listener;
365                 int i;
366
367                 if ((ci->values_num != 2)
368                                 || (ci->values[0].type != OCONFIG_TYPE_STRING)
369                                 || (ci->values[1].type != OCONFIG_TYPE_STRING)) {
370                         ERROR("grpc: The `%s` config option needs exactly "
371                                         "two string argument (address and port).", ci->key);
372                         return -1;
373                 }
374
375                 listener = (listener_t *)realloc(listeners,
376                                 (listeners_num + 1) * sizeof(*listeners));
377                 if (!listener) {
378                         ERROR("grpc: Failed to allocate listeners");
379                         return -1;
380                 }
381                 listeners = listener;
382                 listener = listeners + listeners_num;
383                 listeners_num++;
384
385                 listener->addr = strdup(ci->values[0].value.string);
386                 listener->port = strdup(ci->values[1].value.string);
387
388                 for (i = 0; i < ci->children_num; i++) {
389                         oconfig_item_t *child = ci->children + i;
390                         WARNING("grpc: Option `%s` not allowed in <%s> block.",
391                                         child->key, ci->key);
392                 }
393
394                 return 0;
395         } /* c_grpc_config_listen() */
396
397         static int c_grpc_config(oconfig_item_t *ci)
398         {
399                 int i;
400
401                 for (i = 0; i < ci->children_num; i++) {
402                         oconfig_item_t *child = ci->children + i;
403
404                         if (!strcasecmp("Listen", child->key)) {
405                                 if (c_grpc_config_listen(child))
406                                         return -1;
407                         }
408                         else if (!strcasecmp("WorkerThreads", child->key)) {
409                                 int n;
410                                 if (cf_util_get_int(child, &n))
411                                         return -1;
412                                 workers_num = (size_t)n;
413                         }
414                         else {
415                                 WARNING("grpc: Option `%s` not allowed here.", child->key);
416                         }
417                 }
418
419                 return 0;
420         } /* c_grpc_config() */
421
422         static int c_grpc_init(void)
423         {
424                 server = new CollectdServer();
425                 size_t i;
426
427                 if (! server) {
428                         ERROR("grpc: Failed to create server");
429                         return -1;
430                 }
431
432                 workers = (pthread_t *)calloc(workers_num, sizeof(*workers));
433                 if (! workers) {
434                         delete server;
435                         server = nullptr;
436
437                         ERROR("grpc: Failed to allocate worker threads");
438                         return -1;
439                 }
440
441                 server->Start();
442                 for (i = 0; i < workers_num; i++) {
443                         pthread_create(&workers[i], /* attr = */ NULL,
444                                         worker_thread, server);
445                 }
446                 INFO("grpc: Started %zu workers", workers_num);
447                 return 0;
448         } /* c_grpc_init() */
449
450         static int c_grpc_shutdown(void)
451         {
452                 size_t i;
453
454                 if (!server)
455                         return -1;
456
457                 server->Shutdown();
458
459                 INFO("grpc: Waiting for %zu workers to terminate", workers_num);
460                 for (i = 0; i < workers_num; i++)
461                         pthread_join(workers[i], NULL);
462                 free(workers);
463                 workers = NULL;
464                 workers_num = 0;
465
466                 delete server;
467                 server = nullptr;
468
469                 return 0;
470         } /* c_grpc_shutdown() */
471
472         void module_register(void)
473         {
474                 plugin_register_complex_config("grpc", c_grpc_config);
475                 plugin_register_init("grpc", c_grpc_init);
476                 plugin_register_shutdown("grpc", c_grpc_shutdown);
477         } /* module_register() */
478 } /* extern "C" */
479
480 /* vim: set sw=4 ts=4 tw=78 noexpandtab : */