grpc plugin: Implement the ListValues() RPC.
[collectd.git] / src / grpc.cc
1 /**
2  * collectd - src/grpc.cc
3  * Copyright (C) 2015 Sebastian Harl
4  *
5  * Permission is hereby granted, free of charge, to any person obtaining a
6  * copy of this software and associated documentation files (the "Software"),
7  * to deal in the Software without restriction, including without limitation
8  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
9  * and/or sell copies of the Software, and to permit persons to whom the
10  * Software is furnished to do so, subject to the following conditions:
11  *
12  * The above copyright notice and this permission notice shall be included in
13  * all copies or substantial portions of the Software.
14  *
15  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
21  * DEALINGS IN THE SOFTWARE.
22  *
23  * Authors:
24  *   Sebastian Harl <sh at tokkee.org>
25  **/
26
27 #include <grpc++/grpc++.h>
28 #include <google/protobuf/util/time_util.h>
29
30 #include "collectd.grpc.pb.h"
31
32 extern "C" {
33 #include <stdbool.h>
34 #include <pthread.h>
35
36 #include "collectd.h"
37 #include "common.h"
38 #include "configfile.h"
39 #include "plugin.h"
40
41 #include "daemon/utils_cache.h"
42
/* One address/port pair the gRPC server is asked to listen on; both
 * members are heap-allocated C strings. */
typedef struct {
        char *addr;
        char *port;
} listener_t;

/* Array of all configured listeners together with its element count.
 * An empty array makes the server fall back to its default address. */
static listener_t *listeners = NULL;
static size_t listeners_num = 0;
50 }
51
52 using google::protobuf::util::TimeUtil;
53
54 /*
55  * proto conversion
56  */
57
58 static grpc::Status unmarshal_value_list(const collectd::types::ValueList &msg, value_list_t *vl)
59 {
60         vl->time = NS_TO_CDTIME_T(TimeUtil::TimestampToNanoseconds(msg.time()));
61         vl->interval = NS_TO_CDTIME_T(TimeUtil::DurationToNanoseconds(msg.interval()));
62
63         std::string s;
64
65         s = msg.host();
66         if (!s.length())
67                 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
68                                 grpc::string("missing host name"));
69         sstrncpy(vl->host, s.c_str(), sizeof(vl->host));
70
71         s = msg.plugin();
72         if (!s.length())
73                 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
74                                 grpc::string("missing plugin name"));
75         sstrncpy(vl->plugin, s.c_str(), sizeof(vl->plugin));
76
77         s = msg.type();
78         if (!s.length())
79                 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
80                                 grpc::string("missing type name"));
81         sstrncpy(vl->type, s.c_str(), sizeof(vl->type));
82
83         s = msg.plugin_instance();
84         if (s.length())
85                 sstrncpy(vl->plugin_instance, s.c_str(), sizeof(vl->plugin_instance));
86
87         s = msg.type_instance();
88         if (s.length())
89                 sstrncpy(vl->type_instance, s.c_str(), sizeof(vl->type_instance));
90
91         value_t *values = NULL;
92         size_t values_len = 0;
93         auto status = grpc::Status::OK;
94
95         for (auto v : msg.value()) {
96                 value_t *val = (value_t *)realloc(values, (values_len + 1) * sizeof(*values));
97                 if (!val) {
98                         status = grpc::Status(grpc::StatusCode::RESOURCE_EXHAUSTED,
99                                         grpc::string("failed to allocate values array"));
100                         break;
101                 }
102
103                 values = val;
104                 val = values + values_len;
105                 values_len++;
106
107                 switch (v.value_case()) {
108                 case collectd::types::Value::ValueCase::kCounter:
109                         val->counter = counter_t(v.counter());
110                         break;
111                 case collectd::types::Value::ValueCase::kGauge:
112                         val->gauge = gauge_t(v.gauge());
113                         break;
114                 case collectd::types::Value::ValueCase::kDerive:
115                         val->derive = derive_t(v.derive());
116                         break;
117                 case collectd::types::Value::ValueCase::kAbsolute:
118                         val->absolute = absolute_t(v.absolute());
119                         break;
120                 default:
121                         status = grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
122                                         grpc::string("unkown value type"));
123                         break;
124                 }
125
126                 if (!status.ok())
127                         break;
128         }
129         if (status.ok()) {
130                 vl->values = values;
131                 vl->values_len = values_len;
132         }
133         else if (values) {
134                 free(values);
135         }
136
137         return status;
138 } /* unmarshal_value_list() */
139
140 /*
141  * request call objects
142  */
143
144 class Call
145 {
146 public:
147         Call(collectd::Collectd::AsyncService *service, grpc::ServerCompletionQueue *cq)
148                 : service_(service), cq_(cq), status_(CREATE)
149         { }
150
151         virtual ~Call()
152         { }
153
154         void Handle()
155         {
156                 if (status_ == CREATE) {
157                         Create();
158                         status_ = PROCESS;
159                 }
160                 else if (status_ == PROCESS) {
161                         Process();
162                         status_ = FINISH;
163                 }
164                 else {
165                         GPR_ASSERT(status_ == FINISH);
166                         Finish();
167                 }
168         } /* Handle() */
169
170 protected:
171         virtual void Create() = 0;
172         virtual void Process() = 0;
173         virtual void Finish() = 0;
174
175         collectd::Collectd::AsyncService *service_;
176         grpc::ServerCompletionQueue *cq_;
177         grpc::ServerContext ctx_;
178
179 private:
180         enum CallStatus { CREATE, PROCESS, FINISH };
181         CallStatus status_;
182 }; /* class Call */
183
184 class DispatchValuesCall : public Call
185 {
186 public:
187         DispatchValuesCall(collectd::Collectd::AsyncService *service, grpc::ServerCompletionQueue *cq)
188                 : Call(service, cq), responder_(&ctx_)
189         {
190                 Handle();
191         } /* DispatchValuesCall() */
192
193         virtual ~DispatchValuesCall()
194         { }
195
196 private:
197         void Create()
198         {
199                 service_->RequestDispatchValues(&ctx_, &request_, &responder_, cq_, cq_, this);
200         } /* Create() */
201
202         void Process()
203         {
204                 // Add a new request object to the queue.
205                 new DispatchValuesCall(service_, cq_);
206
207                 value_list_t vl = VALUE_LIST_INIT;
208                 auto status = unmarshal_value_list(request_.values(), &vl);
209                 if (!status.ok()) {
210                         responder_.Finish(reply_, status, this);
211                         return;
212                 }
213
214                 if (plugin_dispatch_values(&vl))
215                         status = grpc::Status(grpc::StatusCode::INTERNAL,
216                                         grpc::string("failed to enqueue values for writing"));
217
218                 responder_.Finish(reply_, status, this);
219         } /* Process() */
220
221         void Finish()
222         {
223                 delete this;
224         } /* Finish() */
225
226         collectd::DispatchValuesRequest request_;
227         collectd::DispatchValuesReply reply_;
228
229         grpc::ServerAsyncResponseWriter<collectd::DispatchValuesReply> responder_;
230 }; /* class DispatchValuesCall */
231
232 class ListValuesCall : public Call
233 {
234 public:
235         ListValuesCall(collectd::Collectd::AsyncService *service, grpc::ServerCompletionQueue *cq)
236                 : Call(service, cq), responder_(&ctx_)
237         {
238                 Handle();
239         } /* ListValuesCall() */
240
241         virtual ~ListValuesCall()
242         { }
243
244 private:
245         void Create()
246         {
247                 service_->RequestListValues(&ctx_, &request_, &responder_, cq_, cq_, this);
248         } /* Create() */
249
250         void Process()
251         {
252                 new ListValuesCall(service_, cq_);
253
254                 char **names = NULL;
255                 cdtime_t *times = NULL;
256                 size_t i, n = 0;
257
258                 auto status = grpc::Status::OK;
259                 if (uc_get_names(&names, &times, &n)) {
260                         status = grpc::Status(grpc::StatusCode::INTERNAL,
261                                         grpc::string("failed to retrieve values"));
262                 }
263
264                 for (i = 0; i < n; i++) {
265                         auto v = reply_.add_value();
266                         auto t = TimeUtil::NanosecondsToTimestamp(CDTIME_T_TO_NS(times[i]));
267                         v->set_name(names[i]);
268                         v->set_allocated_time(new google::protobuf::Timestamp(t));
269                         sfree(names[i]);
270                 }
271                 sfree(names);
272                 sfree(times);
273
274                 responder_.Finish(reply_, status, this);
275         } /* Process() */
276
277         void Finish()
278         {
279                 delete this;
280         } /* Finish() */
281
282         collectd::ListValuesRequest request_;
283         collectd::ListValuesReply reply_;
284
285         grpc::ServerAsyncResponseWriter<collectd::ListValuesReply> responder_;
286 }; /* class ListValuesCall */
287
288 /*
289  * gRPC server implementation
290  */
291
292 class CollectdServer final
293 {
294 public:
295         void Start()
296         {
297                 // TODO: make configurable
298                 auto auth = grpc::InsecureServerCredentials();
299
300                 grpc::ServerBuilder builder;
301
302                 if (!listeners_num) {
303                         std::string default_addr("0.0.0.0:50051");
304                         builder.AddListeningPort(default_addr, auth);
305                         INFO("grpc: Listening on %s", default_addr.c_str());
306                 }
307                 else {
308                         size_t i;
309                         for (i = 0; i < listeners_num; i++) {
310                                 auto l = listeners[i];
311                                 std::string addr(l.addr);
312                                 addr += std::string(":") + std::string(l.port);
313                                 builder.AddListeningPort(addr, auth);
314                                 INFO("grpc: Listening on %s", addr.c_str());
315                         }
316                 }
317
318                 builder.RegisterAsyncService(&service_);
319                 cq_ = builder.AddCompletionQueue();
320                 server_ = builder.BuildAndStart();
321         } /* Start() */
322
323         void Shutdown()
324         {
325                 server_->Shutdown();
326                 cq_->Shutdown();
327         } /* Shutdown() */
328
329         void Mainloop()
330         {
331                 // Register request types.
332                 new DispatchValuesCall(&service_, cq_.get());
333                 new ListValuesCall(&service_, cq_.get());
334
335                 while (true) {
336                         void *req = NULL;
337                         bool ok = false;
338
339                         if (!cq_->Next(&req, &ok))
340                                 break; // Queue shut down.
341                         if (!ok) {
342                                 ERROR("grpc: Failed to read from queue");
343                                 break;
344                         }
345
346                         static_cast<Call *>(req)->Handle();
347                 }
348         } /* Mainloop() */
349
350 private:
351         collectd::Collectd::AsyncService service_;
352
353         std::unique_ptr<grpc::Server> server_;
354         std::unique_ptr<grpc::ServerCompletionQueue> cq_;
355 }; /* class CollectdServer */
356
357 static CollectdServer *server = nullptr;
358
359 /*
360  * collectd plugin interface
361  */
362
363 extern "C" {
364         static pthread_t *workers;
365         static size_t workers_num = 5;
366
367         static void *worker_thread(void *arg)
368         {
369                 CollectdServer *s = (CollectdServer *)arg;
370                 s->Mainloop();
371                 return NULL;
372         } /* worker_thread() */
373
374         static int c_grpc_config_listen(oconfig_item_t *ci)
375         {
376                 listener_t *listener;
377                 int i;
378
379                 if ((ci->values_num != 2)
380                                 || (ci->values[0].type != OCONFIG_TYPE_STRING)
381                                 || (ci->values[1].type != OCONFIG_TYPE_STRING)) {
382                         ERROR("grpc: The `%s` config option needs exactly "
383                                         "two string argument (address and port).", ci->key);
384                         return -1;
385                 }
386
387                 listener = (listener_t *)realloc(listeners,
388                                 (listeners_num + 1) * sizeof(*listeners));
389                 if (!listener) {
390                         ERROR("grpc: Failed to allocate listeners");
391                         return -1;
392                 }
393                 listeners = listener;
394                 listener = listeners + listeners_num;
395                 listeners_num++;
396
397                 listener->addr = strdup(ci->values[0].value.string);
398                 listener->port = strdup(ci->values[1].value.string);
399
400                 for (i = 0; i < ci->children_num; i++) {
401                         oconfig_item_t *child = ci->children + i;
402                         WARNING("grpc: Option `%s` not allowed in <%s> block.",
403                                         child->key, ci->key);
404                 }
405
406                 return 0;
407         } /* c_grpc_config_listen() */
408
409         static int c_grpc_config(oconfig_item_t *ci)
410         {
411                 int i;
412
413                 for (i = 0; i < ci->children_num; i++) {
414                         oconfig_item_t *child = ci->children + i;
415
416                         if (!strcasecmp("Listen", child->key)) {
417                                 if (c_grpc_config_listen(child))
418                                         return -1;
419                         }
420                         else if (!strcasecmp("WorkerThreads", child->key)) {
421                                 int n;
422                                 if (cf_util_get_int(child, &n))
423                                         return -1;
424                                 workers_num = (size_t)n;
425                         }
426                         else {
427                                 WARNING("grpc: Option `%s` not allowed here.", child->key);
428                         }
429                 }
430
431                 return 0;
432         } /* c_grpc_config() */
433
434         static int c_grpc_init(void)
435         {
436                 server = new CollectdServer();
437                 size_t i;
438
439                 if (! server) {
440                         ERROR("grpc: Failed to create server");
441                         return -1;
442                 }
443
444                 workers = (pthread_t *)calloc(workers_num, sizeof(*workers));
445                 if (! workers) {
446                         delete server;
447                         server = nullptr;
448
449                         ERROR("grpc: Failed to allocate worker threads");
450                         return -1;
451                 }
452
453                 server->Start();
454                 for (i = 0; i < workers_num; i++) {
455                         pthread_create(&workers[i], /* attr = */ NULL,
456                                         worker_thread, server);
457                 }
458                 INFO("grpc: Started %zu workers", workers_num);
459                 return 0;
460         } /* c_grpc_init() */
461
462         static int c_grpc_shutdown(void)
463         {
464                 size_t i;
465
466                 if (!server)
467                         return -1;
468
469                 server->Shutdown();
470
471                 INFO("grpc: Waiting for %zu workers to terminate", workers_num);
472                 for (i = 0; i < workers_num; i++)
473                         pthread_join(workers[i], NULL);
474                 free(workers);
475                 workers = NULL;
476                 workers_num = 0;
477
478                 delete server;
479                 server = nullptr;
480
481                 return 0;
482         } /* c_grpc_shutdown() */
483
484         void module_register(void)
485         {
486                 plugin_register_complex_config("grpc", c_grpc_config);
487                 plugin_register_init("grpc", c_grpc_init);
488                 plugin_register_shutdown("grpc", c_grpc_shutdown);
489         } /* module_register() */
490 } /* extern "C" */
491
492 /* vim: set sw=4 ts=4 tw=78 noexpandtab : */