grpc plugin: Split out identifier into it's own message type.
[collectd.git] / src / grpc.cc
1 /**
2  * collectd - src/grpc.cc
3  * Copyright (C) 2015 Sebastian Harl
4  *
5  * Permission is hereby granted, free of charge, to any person obtaining a
6  * copy of this software and associated documentation files (the "Software"),
7  * to deal in the Software without restriction, including without limitation
8  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
9  * and/or sell copies of the Software, and to permit persons to whom the
10  * Software is furnished to do so, subject to the following conditions:
11  *
12  * The above copyright notice and this permission notice shall be included in
13  * all copies or substantial portions of the Software.
14  *
15  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
21  * DEALINGS IN THE SOFTWARE.
22  *
23  * Authors:
24  *   Sebastian Harl <sh at tokkee.org>
25  **/
26
27 #include <grpc++/grpc++.h>
28 #include <google/protobuf/util/time_util.h>
29
30 #include "collectd.grpc.pb.h"
31
32 extern "C" {
33 #include <stdbool.h>
34 #include <pthread.h>
35
36 #include "collectd.h"
37 #include "common.h"
38 #include "configfile.h"
39 #include "plugin.h"
40
41 #include "daemon/utils_cache.h"
42
43         typedef struct {
44                 char *addr;
45                 char *port;
46         } listener_t;
47
48         static listener_t *listeners;
49         static size_t listeners_num;
50 }
51
52 using collectd::Collectd;
53
54 using collectd::DispatchValuesRequest;
55 using collectd::DispatchValuesReply;
56 using collectd::ListValuesRequest;
57 using collectd::ListValuesReply;
58
59 using google::protobuf::util::TimeUtil;
60
61 /*
62  * proto conversion
63  */
64
65 static grpc::Status unmarshal_ident(const collectd::types::Identifier &msg, value_list_t *vl)
66 {
67         std::string s;
68
69         s = msg.host();
70         if (!s.length())
71                 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
72                                 grpc::string("missing host name"));
73         sstrncpy(vl->host, s.c_str(), sizeof(vl->host));
74
75         s = msg.plugin();
76         if (!s.length())
77                 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
78                                 grpc::string("missing plugin name"));
79         sstrncpy(vl->plugin, s.c_str(), sizeof(vl->plugin));
80
81         s = msg.type();
82         if (!s.length())
83                 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
84                                 grpc::string("missing type name"));
85         sstrncpy(vl->type, s.c_str(), sizeof(vl->type));
86
87         s = msg.plugin_instance();
88         sstrncpy(vl->plugin_instance, s.c_str(), sizeof(vl->plugin_instance));
89
90         s = msg.type_instance();
91         sstrncpy(vl->type_instance, s.c_str(), sizeof(vl->type_instance));
92
93         return grpc::Status::OK;
94 } /* unmarshal_ident() */
95
96 static grpc::Status unmarshal_value_list(const collectd::types::ValueList &msg, value_list_t *vl)
97 {
98         vl->time = NS_TO_CDTIME_T(TimeUtil::TimestampToNanoseconds(msg.time()));
99         vl->interval = NS_TO_CDTIME_T(TimeUtil::DurationToNanoseconds(msg.interval()));
100
101         auto status = unmarshal_ident(msg.identifier(), vl);
102         if (!status.ok())
103                 return status;
104
105         value_t *values = NULL;
106         size_t values_len = 0;
107         status = grpc::Status::OK;
108
109         for (auto v : msg.value()) {
110                 value_t *val = (value_t *)realloc(values, (values_len + 1) * sizeof(*values));
111                 if (!val) {
112                         status = grpc::Status(grpc::StatusCode::RESOURCE_EXHAUSTED,
113                                         grpc::string("failed to allocate values array"));
114                         break;
115                 }
116
117                 values = val;
118                 val = values + values_len;
119                 values_len++;
120
121                 switch (v.value_case()) {
122                 case collectd::types::Value::ValueCase::kCounter:
123                         val->counter = counter_t(v.counter());
124                         break;
125                 case collectd::types::Value::ValueCase::kGauge:
126                         val->gauge = gauge_t(v.gauge());
127                         break;
128                 case collectd::types::Value::ValueCase::kDerive:
129                         val->derive = derive_t(v.derive());
130                         break;
131                 case collectd::types::Value::ValueCase::kAbsolute:
132                         val->absolute = absolute_t(v.absolute());
133                         break;
134                 default:
135                         status = grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
136                                         grpc::string("unknown value type"));
137                         break;
138                 }
139
140                 if (!status.ok())
141                         break;
142         }
143         if (status.ok()) {
144                 vl->values = values;
145                 vl->values_len = values_len;
146         }
147         else if (values) {
148                 free(values);
149         }
150
151         return status;
152 } /* unmarshal_value_list() */
153
154 /*
155  * request call-backs and call objects
156  */
157
158 static grpc::Status Process(grpc::ServerContext *ctx,
159                 DispatchValuesRequest request, DispatchValuesReply *reply)
160 {
161         value_list_t vl = VALUE_LIST_INIT;
162         auto status = unmarshal_value_list(request.values(), &vl);
163         if (!status.ok())
164                 return status;
165
166         if (plugin_dispatch_values(&vl))
167                 status = grpc::Status(grpc::StatusCode::INTERNAL,
168                                 grpc::string("failed to enqueue values for writing"));
169         return status;
170 } /* Process(): DispatchValues */
171
172 static grpc::Status Process(grpc::ServerContext *ctx,
173                 ListValuesRequest request, ListValuesReply *reply)
174 {
175         char **names = NULL;
176         cdtime_t *times = NULL;
177         size_t i, n = 0;
178
179         if (uc_get_names(&names, &times, &n))
180                 return grpc::Status(grpc::StatusCode::INTERNAL,
181                                 grpc::string("failed to retrieve values"));
182
183         for (i = 0; i < n; i++) {
184                 auto v = reply->add_value();
185                 auto t = TimeUtil::NanosecondsToTimestamp(CDTIME_T_TO_NS(times[i]));
186                 v->set_name(names[i]);
187                 v->set_allocated_time(new google::protobuf::Timestamp(t));
188                 sfree(names[i]);
189         }
190         sfree(names);
191         sfree(times);
192
193         return grpc::Status::OK;
194 } /* Process(): ListValues */
195
196 class Call
197 {
198 public:
199         Call(Collectd::AsyncService *service, grpc::ServerCompletionQueue *cq)
200                 : service_(service), cq_(cq), status_(CREATE)
201         { }
202
203         virtual ~Call()
204         { }
205
206         void Handle()
207         {
208                 if (status_ == CREATE) {
209                         Create();
210                         status_ = PROCESS;
211                 }
212                 else if (status_ == PROCESS) {
213                         Process();
214                         status_ = FINISH;
215                 }
216                 else {
217                         GPR_ASSERT(status_ == FINISH);
218                         Finish();
219                 }
220         } /* Handle() */
221
222 protected:
223         virtual void Create() = 0;
224         virtual void Process() = 0;
225         virtual void Finish() = 0;
226
227         Collectd::AsyncService *service_;
228         grpc::ServerCompletionQueue *cq_;
229         grpc::ServerContext ctx_;
230
231 private:
232         enum CallStatus { CREATE, PROCESS, FINISH };
233         CallStatus status_;
234 }; /* class Call */
235
236 template<typename RequestT, typename ReplyT>
237 class RpcCall final : public Call
238 {
239         typedef void (Collectd::AsyncService::*CreatorT)(grpc::ServerContext *,
240                         RequestT *, grpc::ServerAsyncResponseWriter<ReplyT> *,
241                         grpc::CompletionQueue *, grpc::ServerCompletionQueue *, void *);
242
243 public:
244         RpcCall(Collectd::AsyncService *service,
245                         CreatorT creator, grpc::ServerCompletionQueue *cq)
246                 : Call(service, cq), creator_(creator), responder_(&ctx_)
247         {
248                 Handle();
249         } /* RpcCall() */
250
251         virtual ~RpcCall()
252         { }
253
254 private:
255         void Create()
256         {
257                 (service_->*creator_)(&ctx_, &request_, &responder_, cq_, cq_, this);
258         } /* Create() */
259
260         void Process()
261         {
262                 // Add a new request object to the queue.
263                 new RpcCall<RequestT, ReplyT>(service_, creator_, cq_);
264                 grpc::Status status = ::Process(&ctx_, request_, &reply_);
265                 responder_.Finish(reply_, status, this);
266         } /* Process() */
267
268         void Finish()
269         {
270                 delete this;
271         } /* Finish() */
272
273         CreatorT creator_;
274
275         RequestT request_;
276         ReplyT reply_;
277
278         grpc::ServerAsyncResponseWriter<ReplyT> responder_;
279 }; /* class RpcCall */
280
281 /*
282  * gRPC server implementation
283  */
284
285 class CollectdServer final
286 {
287 public:
288         void Start()
289         {
290                 // TODO: make configurable
291                 auto auth = grpc::InsecureServerCredentials();
292
293                 grpc::ServerBuilder builder;
294
295                 if (!listeners_num) {
296                         std::string default_addr("0.0.0.0:50051");
297                         builder.AddListeningPort(default_addr, auth);
298                         INFO("grpc: Listening on %s", default_addr.c_str());
299                 }
300                 else {
301                         size_t i;
302                         for (i = 0; i < listeners_num; i++) {
303                                 auto l = listeners[i];
304                                 std::string addr(l.addr);
305                                 addr += std::string(":") + std::string(l.port);
306                                 builder.AddListeningPort(addr, auth);
307                                 INFO("grpc: Listening on %s", addr.c_str());
308                         }
309                 }
310
311                 builder.RegisterService(&service_);
312                 cq_ = builder.AddCompletionQueue();
313                 server_ = builder.BuildAndStart();
314         } /* Start() */
315
316         void Shutdown()
317         {
318                 server_->Shutdown();
319                 cq_->Shutdown();
320         } /* Shutdown() */
321
322         void Mainloop()
323         {
324                 // Register request types.
325                 new RpcCall<DispatchValuesRequest, DispatchValuesReply>(&service_,
326                                 &Collectd::AsyncService::RequestDispatchValues, cq_.get());
327                 new RpcCall<ListValuesRequest, ListValuesReply>(&service_,
328                                 &Collectd::AsyncService::RequestListValues, cq_.get());
329
330                 while (true) {
331                         void *req = NULL;
332                         bool ok = false;
333
334                         if (!cq_->Next(&req, &ok))
335                                 break; // Queue shut down.
336                         if (!ok) {
337                                 ERROR("grpc: Failed to read from queue");
338                                 break;
339                         }
340
341                         static_cast<Call *>(req)->Handle();
342                 }
343         } /* Mainloop() */
344
345 private:
346         Collectd::AsyncService service_;
347
348         std::unique_ptr<grpc::Server> server_;
349         std::unique_ptr<grpc::ServerCompletionQueue> cq_;
350 }; /* class CollectdServer */
351
352 static CollectdServer *server = nullptr;
353
354 /*
355  * collectd plugin interface
356  */
357
358 extern "C" {
359         static pthread_t *workers;
360         static size_t workers_num = 5;
361
362         static void *worker_thread(void *arg)
363         {
364                 CollectdServer *s = (CollectdServer *)arg;
365                 s->Mainloop();
366                 return NULL;
367         } /* worker_thread() */
368
369         static int c_grpc_config_listen(oconfig_item_t *ci)
370         {
371                 listener_t *listener;
372                 int i;
373
374                 if ((ci->values_num != 2)
375                                 || (ci->values[0].type != OCONFIG_TYPE_STRING)
376                                 || (ci->values[1].type != OCONFIG_TYPE_STRING)) {
377                         ERROR("grpc: The `%s` config option needs exactly "
378                                         "two string argument (address and port).", ci->key);
379                         return -1;
380                 }
381
382                 listener = (listener_t *)realloc(listeners,
383                                 (listeners_num + 1) * sizeof(*listeners));
384                 if (!listener) {
385                         ERROR("grpc: Failed to allocate listeners");
386                         return -1;
387                 }
388                 listeners = listener;
389                 listener = listeners + listeners_num;
390                 listeners_num++;
391
392                 listener->addr = strdup(ci->values[0].value.string);
393                 listener->port = strdup(ci->values[1].value.string);
394
395                 for (i = 0; i < ci->children_num; i++) {
396                         oconfig_item_t *child = ci->children + i;
397                         WARNING("grpc: Option `%s` not allowed in <%s> block.",
398                                         child->key, ci->key);
399                 }
400
401                 return 0;
402         } /* c_grpc_config_listen() */
403
404         static int c_grpc_config(oconfig_item_t *ci)
405         {
406                 int i;
407
408                 for (i = 0; i < ci->children_num; i++) {
409                         oconfig_item_t *child = ci->children + i;
410
411                         if (!strcasecmp("Listen", child->key)) {
412                                 if (c_grpc_config_listen(child))
413                                         return -1;
414                         }
415                         else if (!strcasecmp("WorkerThreads", child->key)) {
416                                 int n;
417                                 if (cf_util_get_int(child, &n))
418                                         return -1;
419                                 workers_num = (size_t)n;
420                         }
421                         else {
422                                 WARNING("grpc: Option `%s` not allowed here.", child->key);
423                         }
424                 }
425
426                 return 0;
427         } /* c_grpc_config() */
428
429         static int c_grpc_init(void)
430         {
431                 server = new CollectdServer();
432                 size_t i;
433
434                 if (! server) {
435                         ERROR("grpc: Failed to create server");
436                         return -1;
437                 }
438
439                 workers = (pthread_t *)calloc(workers_num, sizeof(*workers));
440                 if (! workers) {
441                         delete server;
442                         server = nullptr;
443
444                         ERROR("grpc: Failed to allocate worker threads");
445                         return -1;
446                 }
447
448                 server->Start();
449                 for (i = 0; i < workers_num; i++) {
450                         plugin_thread_create(&workers[i], /* attr = */ NULL,
451                                         worker_thread, server);
452                 }
453                 INFO("grpc: Started %zu workers", workers_num);
454                 return 0;
455         } /* c_grpc_init() */
456
457         static int c_grpc_shutdown(void)
458         {
459                 size_t i;
460
461                 if (!server)
462                         return -1;
463
464                 server->Shutdown();
465
466                 INFO("grpc: Waiting for %zu workers to terminate", workers_num);
467                 for (i = 0; i < workers_num; i++)
468                         pthread_join(workers[i], NULL);
469                 free(workers);
470                 workers = NULL;
471                 workers_num = 0;
472
473                 delete server;
474                 server = nullptr;
475
476                 return 0;
477         } /* c_grpc_shutdown() */
478
479         void module_register(void)
480         {
481                 plugin_register_complex_config("grpc", c_grpc_config);
482                 plugin_register_init("grpc", c_grpc_init);
483                 plugin_register_shutdown("grpc", c_grpc_shutdown);
484         } /* module_register() */
485 } /* extern "C" */
486
487 /* vim: set sw=4 ts=4 tw=78 noexpandtab : */