2 * collectd - src/grpc.cc
3 * Copyright (C) 2015 Sebastian Harl
5 * Permission is hereby granted, free of charge, to any person obtaining a
6 * copy of this software and associated documentation files (the "Software"),
7 * to deal in the Software without restriction, including without limitation
8 * the rights to use, copy, modify, merge, publish, distribute, sublicense,
9 * and/or sell copies of the Software, and to permit persons to whom the
10 * Software is furnished to do so, subject to the following conditions:
12 * The above copyright notice and this permission notice shall be included in
13 * all copies or substantial portions of the Software.
15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
21 * DEALINGS IN THE SOFTWARE.
24 * Sebastian Harl <sh at tokkee.org>
27 #include <grpc++/grpc++.h>
28 #include <google/protobuf/util/time_util.h>
30 #include "collectd.grpc.pb.h"
38 #include "configfile.h"
42 using google::protobuf::util::TimeUtil;
48 static grpc::Status unmarshal_value_list(const collectd::types::ValueList &msg, value_list_t *vl)
50 vl->time = NS_TO_CDTIME_T(TimeUtil::TimestampToNanoseconds(msg.time()));
51 vl->interval = NS_TO_CDTIME_T(TimeUtil::DurationToNanoseconds(msg.interval()));
57 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
58 grpc::string("missing host name"));
59 sstrncpy(vl->host, s.c_str(), sizeof(vl->host));
63 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
64 grpc::string("missing plugin name"));
65 sstrncpy(vl->plugin, s.c_str(), sizeof(vl->plugin));
69 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
70 grpc::string("missing type name"));
71 sstrncpy(vl->type, s.c_str(), sizeof(vl->type));
73 s = msg.plugin_instance();
75 sstrncpy(vl->plugin_instance, s.c_str(), sizeof(vl->plugin_instance));
77 s = msg.type_instance();
79 sstrncpy(vl->type_instance, s.c_str(), sizeof(vl->type_instance));
81 value_t *values = NULL;
82 size_t values_len = 0;
83 auto status = grpc::Status::OK;
85 for (auto v : msg.value()) {
86 value_t *val = (value_t *)realloc(values, (values_len + 1) * sizeof(*values));
88 status = grpc::Status(grpc::StatusCode::RESOURCE_EXHAUSTED,
89 grpc::string("failed to allocate values array"));
94 val = values + values_len;
97 switch (v.value_case()) {
98 case collectd::types::Value::ValueCase::kCounter:
99 val->counter = counter_t(v.counter());
101 case collectd::types::Value::ValueCase::kGauge:
102 val->gauge = gauge_t(v.gauge());
104 case collectd::types::Value::ValueCase::kDerive:
105 val->derive = derive_t(v.derive());
107 case collectd::types::Value::ValueCase::kAbsolute:
108 val->absolute = absolute_t(v.absolute());
111 status = grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
112 grpc::string("unkown value type"));
121 vl->values_len = values_len;
128 } /* unmarshal_value_list() */
131 * request call objects
137 Call(collectd::Collectd::AsyncService *service, grpc::ServerCompletionQueue *cq)
138 : service_(service), cq_(cq), status_(CREATE)
146 if (status_ == CREATE) {
150 else if (status_ == PROCESS) {
155 GPR_ASSERT(status_ == FINISH);
161 virtual void Create() = 0;
162 virtual void Process() = 0;
163 virtual void Finish() = 0;
165 collectd::Collectd::AsyncService *service_;
166 grpc::ServerCompletionQueue *cq_;
167 grpc::ServerContext ctx_;
170 enum CallStatus { CREATE, PROCESS, FINISH };
174 class DispatchValuesCall : public Call
177 DispatchValuesCall(collectd::Collectd::AsyncService *service, grpc::ServerCompletionQueue *cq)
178 : Call(service, cq), responder_(&ctx_)
181 } /* DispatchValuesCall() */
183 virtual ~DispatchValuesCall()
189 service_->RequestDispatchValues(&ctx_, &request_, &responder_, cq_, cq_, this);
194 // Add a new request object to the queue.
195 new DispatchValuesCall(service_, cq_);
197 value_list_t vl = VALUE_LIST_INIT;
198 auto status = unmarshal_value_list(request_.values(), &vl);
200 responder_.Finish(reply_, status, this);
204 if (plugin_dispatch_values(&vl))
205 status = grpc::Status(grpc::StatusCode::INTERNAL,
206 grpc::string("failed to enqueue values for writing"));
208 responder_.Finish(reply_, status, this);
216 collectd::DispatchValuesRequest request_;
217 collectd::DispatchValuesReply reply_;
219 grpc::ServerAsyncResponseWriter<collectd::DispatchValuesReply> responder_;
223 * gRPC server implementation
226 class CollectdServer final
231 // TODO: make configurable
232 std::string addr("0.0.0.0:50051");
234 // TODO: make configurable
235 auto auth = grpc::InsecureServerCredentials();
237 grpc::ServerBuilder builder;
238 builder.AddListeningPort(addr, auth);
239 builder.RegisterAsyncService(&service_);
240 cq_ = builder.AddCompletionQueue();
241 server_ = builder.BuildAndStart();
243 INFO("grpc: Listening on %s", addr.c_str());
254 // Register request types.
255 new DispatchValuesCall(&service_, cq_.get());
261 if (!cq_->Next(&req, &ok))
262 break; // Queue shut down.
264 ERROR("grpc: Failed to read from queue");
268 static_cast<Call *>(req)->Handle();
273 collectd::Collectd::AsyncService service_;
275 std::unique_ptr<grpc::Server> server_;
276 std::unique_ptr<grpc::ServerCompletionQueue> cq_;
277 }; /* class CollectdServer */
279 static CollectdServer *server = nullptr;
282 * collectd plugin interface
286 static pthread_t *workers;
287 static size_t workers_num;
289 static void *worker_thread(void *arg)
291 CollectdServer *s = (CollectdServer *)arg;
294 } /* worker_thread() */
296 static int c_grpc_init(void)
298 server = new CollectdServer();
302 ERROR("grpc: Failed to create server");
306 workers = (pthread_t *)calloc(5, sizeof(*workers));
311 ERROR("grpc: Failed to allocate worker threads");
317 for (i = 0; i < workers_num; i++) {
318 pthread_create(&workers[i], /* attr = */ NULL,
319 worker_thread, server);
321 INFO("grpc: Started %zu workers", workers_num);
323 } /* c_grpc_init() */
325 static int c_grpc_shutdown(void)
334 INFO("grpc: Waiting for %zu workers to terminate", workers_num);
335 for (i = 0; i < workers_num; i++)
336 pthread_join(workers[i], NULL);
345 } /* c_grpc_shutdown() */
347 void module_register(void)
349 plugin_register_init("grpc", c_grpc_init);
350 plugin_register_shutdown("grpc", c_grpc_shutdown);
351 } /* module_register() */
354 /* vim: set sw=4 ts=4 tw=78 noexpandtab : */