9523fc2513dd2cf6804a2fdec8ab2029bbeb04c8
[collectd.git] / src / grpc.cc
1 /**
2  * collectd - src/grpc.cc
3  * Copyright (C) 2015 Sebastian Harl
4  *
5  * Permission is hereby granted, free of charge, to any person obtaining a
6  * copy of this software and associated documentation files (the "Software"),
7  * to deal in the Software without restriction, including without limitation
8  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
9  * and/or sell copies of the Software, and to permit persons to whom the
10  * Software is furnished to do so, subject to the following conditions:
11  *
12  * The above copyright notice and this permission notice shall be included in
13  * all copies or substantial portions of the Software.
14  *
15  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
21  * DEALINGS IN THE SOFTWARE.
22  *
23  * Authors:
24  *   Sebastian Harl <sh at tokkee.org>
25  **/
26
27 #include <grpc++/grpc++.h>
28 #include <google/protobuf/util/time_util.h>
29
30 #include "collectd.grpc.pb.h"
31
32 extern "C" {
33 #include <stdbool.h>
34 #include <pthread.h>
35
36 #include "collectd.h"
37 #include "common.h"
38 #include "configfile.h"
39 #include "plugin.h"
40
41         typedef struct {
42                 char *addr;
43                 char *port;
44         } listener_t;
45
46         static listener_t *listeners;
47         static size_t listeners_num;
48 }
49
50 using google::protobuf::util::TimeUtil;
51
52 /*
53  * proto conversion
54  */
55
56 static grpc::Status unmarshal_value_list(const collectd::types::ValueList &msg, value_list_t *vl)
57 {
58         vl->time = NS_TO_CDTIME_T(TimeUtil::TimestampToNanoseconds(msg.time()));
59         vl->interval = NS_TO_CDTIME_T(TimeUtil::DurationToNanoseconds(msg.interval()));
60
61         std::string s;
62
63         s = msg.host();
64         if (!s.length())
65                 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
66                                 grpc::string("missing host name"));
67         sstrncpy(vl->host, s.c_str(), sizeof(vl->host));
68
69         s = msg.plugin();
70         if (!s.length())
71                 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
72                                 grpc::string("missing plugin name"));
73         sstrncpy(vl->plugin, s.c_str(), sizeof(vl->plugin));
74
75         s = msg.type();
76         if (!s.length())
77                 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
78                                 grpc::string("missing type name"));
79         sstrncpy(vl->type, s.c_str(), sizeof(vl->type));
80
81         s = msg.plugin_instance();
82         if (s.length())
83                 sstrncpy(vl->plugin_instance, s.c_str(), sizeof(vl->plugin_instance));
84
85         s = msg.type_instance();
86         if (s.length())
87                 sstrncpy(vl->type_instance, s.c_str(), sizeof(vl->type_instance));
88
89         value_t *values = NULL;
90         size_t values_len = 0;
91         auto status = grpc::Status::OK;
92
93         for (auto v : msg.value()) {
94                 value_t *val = (value_t *)realloc(values, (values_len + 1) * sizeof(*values));
95                 if (!val) {
96                         status = grpc::Status(grpc::StatusCode::RESOURCE_EXHAUSTED,
97                                         grpc::string("failed to allocate values array"));
98                         break;
99                 }
100
101                 values = val;
102                 val = values + values_len;
103                 values_len++;
104
105                 switch (v.value_case()) {
106                 case collectd::types::Value::ValueCase::kCounter:
107                         val->counter = counter_t(v.counter());
108                         break;
109                 case collectd::types::Value::ValueCase::kGauge:
110                         val->gauge = gauge_t(v.gauge());
111                         break;
112                 case collectd::types::Value::ValueCase::kDerive:
113                         val->derive = derive_t(v.derive());
114                         break;
115                 case collectd::types::Value::ValueCase::kAbsolute:
116                         val->absolute = absolute_t(v.absolute());
117                         break;
118                 default:
119                         status = grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
120                                         grpc::string("unkown value type"));
121                         break;
122                 }
123
124                 if (!status.ok())
125                         break;
126         }
127         if (status.ok()) {
128                 vl->values = values;
129                 vl->values_len = values_len;
130         }
131         else if (values) {
132                 free(values);
133         }
134
135         return status;
136 } /* unmarshal_value_list() */
137
138 /*
139  * request call objects
140  */
141
142 class Call
143 {
144 public:
145         Call(collectd::Collectd::AsyncService *service, grpc::ServerCompletionQueue *cq)
146                 : service_(service), cq_(cq), status_(CREATE)
147         { }
148
149         virtual ~Call()
150         { }
151
152         void Handle()
153         {
154                 if (status_ == CREATE) {
155                         Create();
156                         status_ = PROCESS;
157                 }
158                 else if (status_ == PROCESS) {
159                         Process();
160                         status_ = FINISH;
161                 }
162                 else {
163                         GPR_ASSERT(status_ == FINISH);
164                         Finish();
165                 }
166         } /* Handle() */
167
168 protected:
169         virtual void Create() = 0;
170         virtual void Process() = 0;
171         virtual void Finish() = 0;
172
173         collectd::Collectd::AsyncService *service_;
174         grpc::ServerCompletionQueue *cq_;
175         grpc::ServerContext ctx_;
176
177 private:
178         enum CallStatus { CREATE, PROCESS, FINISH };
179         CallStatus status_;
180 }; /* class Call */
181
182 class DispatchValuesCall : public Call
183 {
184 public:
185         DispatchValuesCall(collectd::Collectd::AsyncService *service, grpc::ServerCompletionQueue *cq)
186                 : Call(service, cq), responder_(&ctx_)
187         {
188                 Handle();
189         } /* DispatchValuesCall() */
190
191         virtual ~DispatchValuesCall()
192         { }
193
194 private:
195         void Create()
196         {
197                 service_->RequestDispatchValues(&ctx_, &request_, &responder_, cq_, cq_, this);
198         } /* Create() */
199
200         void Process()
201         {
202                 // Add a new request object to the queue.
203                 new DispatchValuesCall(service_, cq_);
204
205                 value_list_t vl = VALUE_LIST_INIT;
206                 auto status = unmarshal_value_list(request_.values(), &vl);
207                 if (!status.ok()) {
208                         responder_.Finish(reply_, status, this);
209                         return;
210                 }
211
212                 if (plugin_dispatch_values(&vl))
213                         status = grpc::Status(grpc::StatusCode::INTERNAL,
214                                         grpc::string("failed to enqueue values for writing"));
215
216                 responder_.Finish(reply_, status, this);
217         } /* Process() */
218
219         void Finish()
220         {
221                 delete this;
222         } /* Finish() */
223
224         collectd::DispatchValuesRequest request_;
225         collectd::DispatchValuesReply reply_;
226
227         grpc::ServerAsyncResponseWriter<collectd::DispatchValuesReply> responder_;
228 };
229
230 /*
231  * gRPC server implementation
232  */
233
234 class CollectdServer final
235 {
236 public:
237         void Start()
238         {
239                 // TODO: make configurable
240                 auto auth = grpc::InsecureServerCredentials();
241
242                 grpc::ServerBuilder builder;
243
244                 if (!listeners_num) {
245                         std::string default_addr("0.0.0.0:50051");
246                         builder.AddListeningPort(default_addr, auth);
247                         INFO("grpc: Listening on %s", default_addr.c_str());
248                 }
249                 else {
250                         size_t i;
251                         for (i = 0; i < listeners_num; i++) {
252                                 auto l = listeners[i];
253                                 std::string addr(l.addr);
254                                 addr += std::string(":") + std::string(l.port);
255                                 builder.AddListeningPort(addr, auth);
256                                 INFO("grpc: Listening on %s", addr.c_str());
257                         }
258                 }
259
260                 builder.RegisterAsyncService(&service_);
261                 cq_ = builder.AddCompletionQueue();
262                 server_ = builder.BuildAndStart();
263         } /* Start() */
264
265         void Shutdown()
266         {
267                 server_->Shutdown();
268                 cq_->Shutdown();
269         } /* Shutdown() */
270
271         void Mainloop()
272         {
273                 // Register request types.
274                 new DispatchValuesCall(&service_, cq_.get());
275
276                 while (true) {
277                         void *req = NULL;
278                         bool ok = false;
279
280                         if (!cq_->Next(&req, &ok))
281                                 break; // Queue shut down.
282                         if (!ok) {
283                                 ERROR("grpc: Failed to read from queue");
284                                 break;
285                         }
286
287                         static_cast<Call *>(req)->Handle();
288                 }
289         } /* Mainloop() */
290
291 private:
292         collectd::Collectd::AsyncService service_;
293
294         std::unique_ptr<grpc::Server> server_;
295         std::unique_ptr<grpc::ServerCompletionQueue> cq_;
296 }; /* class CollectdServer */
297
298 static CollectdServer *server = nullptr;
299
300 /*
301  * collectd plugin interface
302  */
303
304 extern "C" {
305         static pthread_t *workers;
306         static size_t workers_num = 5;
307
308         static void *worker_thread(void *arg)
309         {
310                 CollectdServer *s = (CollectdServer *)arg;
311                 s->Mainloop();
312                 return NULL;
313         } /* worker_thread() */
314
315         static int c_grpc_config_listen(oconfig_item_t *ci)
316         {
317                 listener_t *listener;
318                 int i;
319
320                 if ((ci->values_num != 2)
321                                 || (ci->values[0].type != OCONFIG_TYPE_STRING)
322                                 || (ci->values[1].type != OCONFIG_TYPE_STRING)) {
323                         ERROR("grpc: The `%s` config option needs exactly "
324                                         "two string argument (address and port).", ci->key);
325                         return -1;
326                 }
327
328                 listener = (listener_t *)realloc(listeners,
329                                 (listeners_num + 1) * sizeof(*listeners));
330                 if (!listener) {
331                         ERROR("grpc: Failed to allocate listeners");
332                         return -1;
333                 }
334                 listeners = listener;
335                 listener = listeners + listeners_num;
336                 listeners_num++;
337
338                 listener->addr = strdup(ci->values[0].value.string);
339                 listener->port = strdup(ci->values[1].value.string);
340
341                 for (i = 0; i < ci->children_num; i++) {
342                         oconfig_item_t *child = ci->children + i;
343                         WARNING("grpc: Option `%s` not allowed in <%s> block.",
344                                         child->key, ci->key);
345                 }
346
347                 return 0;
348         } /* c_grpc_config_listen() */
349
350         static int c_grpc_config(oconfig_item_t *ci)
351         {
352                 int i;
353
354                 for (i = 0; i < ci->children_num; i++) {
355                         oconfig_item_t *child = ci->children + i;
356
357                         if (!strcasecmp("Listen", child->key)) {
358                                 if (c_grpc_config_listen(child))
359                                         return -1;
360                         }
361                         else if (!strcasecmp("WorkerThreads", child->key)) {
362                                 int n;
363                                 if (cf_util_get_int(child, &n))
364                                         return -1;
365                                 workers_num = (size_t)n;
366                         }
367                         else {
368                                 WARNING("grpc: Option `%s` not allowed here.", child->key);
369                         }
370                 }
371
372                 return 0;
373         } /* c_grpc_config() */
374
375         static int c_grpc_init(void)
376         {
377                 server = new CollectdServer();
378                 size_t i;
379
380                 if (! server) {
381                         ERROR("grpc: Failed to create server");
382                         return -1;
383                 }
384
385                 workers = (pthread_t *)calloc(workers_num, sizeof(*workers));
386                 if (! workers) {
387                         delete server;
388                         server = nullptr;
389
390                         ERROR("grpc: Failed to allocate worker threads");
391                         return -1;
392                 }
393
394                 server->Start();
395                 for (i = 0; i < workers_num; i++) {
396                         pthread_create(&workers[i], /* attr = */ NULL,
397                                         worker_thread, server);
398                 }
399                 INFO("grpc: Started %zu workers", workers_num);
400                 return 0;
401         } /* c_grpc_init() */
402
403         static int c_grpc_shutdown(void)
404         {
405                 size_t i;
406
407                 if (!server)
408                         return -1;
409
410                 server->Shutdown();
411
412                 INFO("grpc: Waiting for %zu workers to terminate", workers_num);
413                 for (i = 0; i < workers_num; i++)
414                         pthread_join(workers[i], NULL);
415                 free(workers);
416                 workers = NULL;
417                 workers_num = 0;
418
419                 delete server;
420                 server = nullptr;
421
422                 return 0;
423         } /* c_grpc_shutdown() */
424
425         void module_register(void)
426         {
427                 plugin_register_complex_config("grpc", c_grpc_config);
428                 plugin_register_init("grpc", c_grpc_init);
429                 plugin_register_shutdown("grpc", c_grpc_shutdown);
430         } /* module_register() */
431 } /* extern "C" */
432
433 /* vim: set sw=4 ts=4 tw=78 noexpandtab : */