#include "common.h"
#include "configfile.h"
#include "plugin.h"
+
+#include "daemon/utils_cache.h"
+
+ typedef struct {
+ char *addr;
+ char *port;
+ } listener_t;
+
+ static listener_t *listeners;
+ static size_t listeners_num;
}
+using collectd::Collectd;
+
+using collectd::DispatchValuesRequest;
+using collectd::DispatchValuesReply;
+using collectd::ListValuesRequest;
+using collectd::ListValuesReply;
+
using google::protobuf::util::TimeUtil;
/*
sstrncpy(vl->type, s.c_str(), sizeof(vl->type));
s = msg.plugin_instance();
- if (s.length())
- sstrncpy(vl->plugin_instance, s.c_str(), sizeof(vl->plugin_instance));
+ sstrncpy(vl->plugin_instance, s.c_str(), sizeof(vl->plugin_instance));
s = msg.type_instance();
- if (s.length())
- sstrncpy(vl->type_instance, s.c_str(), sizeof(vl->type_instance));
+ sstrncpy(vl->type_instance, s.c_str(), sizeof(vl->type_instance));
value_t *values = NULL;
size_t values_len = 0;
break;
default:
status = grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
- grpc::string("unkown value type"));
+ grpc::string("unknown value type"));
break;
}
} /* unmarshal_value_list() */
/*
- * request call objects
+ * request call-backs and call objects
*/
+static grpc::Status Process(grpc::ServerContext *ctx,
+ DispatchValuesRequest request, DispatchValuesReply *reply)
+{
+ value_list_t vl = VALUE_LIST_INIT;
+ auto status = unmarshal_value_list(request.values(), &vl);
+ if (!status.ok())
+ return status;
+
+ if (plugin_dispatch_values(&vl))
+ status = grpc::Status(grpc::StatusCode::INTERNAL,
+ grpc::string("failed to enqueue values for writing"));
+ return status;
+} /* Process(): DispatchValues */
+
+static grpc::Status Process(grpc::ServerContext *ctx,
+ ListValuesRequest request, ListValuesReply *reply)
+{
+ char **names = NULL;
+ cdtime_t *times = NULL;
+ size_t i, n = 0;
+
+ if (uc_get_names(&names, ×, &n))
+ return grpc::Status(grpc::StatusCode::INTERNAL,
+ grpc::string("failed to retrieve values"));
+
+ for (i = 0; i < n; i++) {
+ auto v = reply->add_value();
+ auto t = TimeUtil::NanosecondsToTimestamp(CDTIME_T_TO_NS(times[i]));
+ v->set_name(names[i]);
+ v->set_allocated_time(new google::protobuf::Timestamp(t));
+ sfree(names[i]);
+ }
+ sfree(names);
+ sfree(times);
+
+ return grpc::Status::OK;
+} /* Process(): ListValues */
+
class Call
{
public:
- Call(collectd::Collectd::AsyncService *service, grpc::ServerCompletionQueue *cq)
+ Call(Collectd::AsyncService *service, grpc::ServerCompletionQueue *cq)
: service_(service), cq_(cq), status_(CREATE)
{ }
virtual void Process() = 0;
virtual void Finish() = 0;
- collectd::Collectd::AsyncService *service_;
+ Collectd::AsyncService *service_;
grpc::ServerCompletionQueue *cq_;
grpc::ServerContext ctx_;
CallStatus status_;
}; /* class Call */
-class DispatchValuesCall : public Call
+template<typename RequestT, typename ReplyT>
+class RpcCall final : public Call
{
+ typedef void (Collectd::AsyncService::*CreatorT)(grpc::ServerContext *,
+ RequestT *, grpc::ServerAsyncResponseWriter<ReplyT> *,
+ grpc::CompletionQueue *, grpc::ServerCompletionQueue *, void *);
+
public:
- DispatchValuesCall(collectd::Collectd::AsyncService *service, grpc::ServerCompletionQueue *cq)
- : Call(service, cq), responder_(&ctx_)
+ RpcCall(Collectd::AsyncService *service,
+ CreatorT creator, grpc::ServerCompletionQueue *cq)
+ : Call(service, cq), creator_(creator), responder_(&ctx_)
{
Handle();
- } /* DispatchValuesCall() */
+ } /* RpcCall() */
- virtual ~DispatchValuesCall()
+ virtual ~RpcCall()
{ }
private:
void Create()
{
- service_->RequestDispatchValues(&ctx_, &request_, &responder_, cq_, cq_, this);
+ (service_->*creator_)(&ctx_, &request_, &responder_, cq_, cq_, this);
} /* Create() */
void Process()
{
// Add a new request object to the queue.
- new DispatchValuesCall(service_, cq_);
-
- value_list_t vl = VALUE_LIST_INIT;
- auto status = unmarshal_value_list(request_.values(), &vl);
- if (!status.ok()) {
- responder_.Finish(reply_, status, this);
- return;
- }
-
- if (plugin_dispatch_values(&vl))
- status = grpc::Status(grpc::StatusCode::INTERNAL,
- grpc::string("failed to enqueue values for writing"));
-
+ new RpcCall<RequestT, ReplyT>(service_, creator_, cq_);
+ grpc::Status status = ::Process(&ctx_, request_, &reply_);
responder_.Finish(reply_, status, this);
} /* Process() */
delete this;
} /* Finish() */
- collectd::DispatchValuesRequest request_;
- collectd::DispatchValuesReply reply_;
+ CreatorT creator_;
+
+ RequestT request_;
+ ReplyT reply_;
- grpc::ServerAsyncResponseWriter<collectd::DispatchValuesReply> responder_;
-};
+ grpc::ServerAsyncResponseWriter<ReplyT> responder_;
+}; /* class RpcCall */
/*
* gRPC server implementation
void Start()
{
// TODO: make configurable
- std::string addr("0.0.0.0:50051");
-
- // TODO: make configurable
auto auth = grpc::InsecureServerCredentials();
grpc::ServerBuilder builder;
- builder.AddListeningPort(addr, auth);
- builder.RegisterAsyncService(&service_);
+
+ if (!listeners_num) {
+ std::string default_addr("0.0.0.0:50051");
+ builder.AddListeningPort(default_addr, auth);
+ INFO("grpc: Listening on %s", default_addr.c_str());
+ }
+ else {
+ size_t i;
+ for (i = 0; i < listeners_num; i++) {
+ auto l = listeners[i];
+ std::string addr(l.addr);
+ addr += std::string(":") + std::string(l.port);
+ builder.AddListeningPort(addr, auth);
+ INFO("grpc: Listening on %s", addr.c_str());
+ }
+ }
+
+ builder.RegisterService(&service_);
cq_ = builder.AddCompletionQueue();
server_ = builder.BuildAndStart();
-
- INFO("grpc: Listening on %s", addr.c_str());
} /* Start() */
void Shutdown()
void Mainloop()
{
// Register request types.
- new DispatchValuesCall(&service_, cq_.get());
+ new RpcCall<DispatchValuesRequest, DispatchValuesReply>(&service_,
+ &Collectd::AsyncService::RequestDispatchValues, cq_.get());
+ new RpcCall<ListValuesRequest, ListValuesReply>(&service_,
+ &Collectd::AsyncService::RequestListValues, cq_.get());
while (true) {
void *req = NULL;
} /* Mainloop() */
private:
- collectd::Collectd::AsyncService service_;
+ Collectd::AsyncService service_;
std::unique_ptr<grpc::Server> server_;
std::unique_ptr<grpc::ServerCompletionQueue> cq_;
extern "C" {
static pthread_t *workers;
- static size_t workers_num;
+ static size_t workers_num = 5;
static void *worker_thread(void *arg)
{
return NULL;
} /* worker_thread() */
+ static int c_grpc_config_listen(oconfig_item_t *ci)
+ {
+ listener_t *listener;
+ int i;
+
+ if ((ci->values_num != 2)
+ || (ci->values[0].type != OCONFIG_TYPE_STRING)
+ || (ci->values[1].type != OCONFIG_TYPE_STRING)) {
+ ERROR("grpc: The `%s` config option needs exactly "
+ "two string argument (address and port).", ci->key);
+ return -1;
+ }
+
+ listener = (listener_t *)realloc(listeners,
+ (listeners_num + 1) * sizeof(*listeners));
+ if (!listener) {
+ ERROR("grpc: Failed to allocate listeners");
+ return -1;
+ }
+ listeners = listener;
+ listener = listeners + listeners_num;
+ listeners_num++;
+
+ listener->addr = strdup(ci->values[0].value.string);
+ listener->port = strdup(ci->values[1].value.string);
+
+ for (i = 0; i < ci->children_num; i++) {
+ oconfig_item_t *child = ci->children + i;
+ WARNING("grpc: Option `%s` not allowed in <%s> block.",
+ child->key, ci->key);
+ }
+
+ return 0;
+ } /* c_grpc_config_listen() */
+
+ static int c_grpc_config(oconfig_item_t *ci)
+ {
+ int i;
+
+ for (i = 0; i < ci->children_num; i++) {
+ oconfig_item_t *child = ci->children + i;
+
+ if (!strcasecmp("Listen", child->key)) {
+ if (c_grpc_config_listen(child))
+ return -1;
+ }
+ else if (!strcasecmp("WorkerThreads", child->key)) {
+ int n;
+ if (cf_util_get_int(child, &n))
+ return -1;
+ workers_num = (size_t)n;
+ }
+ else {
+ WARNING("grpc: Option `%s` not allowed here.", child->key);
+ }
+ }
+
+ return 0;
+ } /* c_grpc_config() */
+
static int c_grpc_init(void)
{
server = new CollectdServer();
return -1;
}
- workers = (pthread_t *)calloc(5, sizeof(*workers));
+ workers = (pthread_t *)calloc(workers_num, sizeof(*workers));
if (! workers) {
delete server;
server = nullptr;
ERROR("grpc: Failed to allocate worker threads");
return -1;
}
- workers_num = 5;
server->Start();
for (i = 0; i < workers_num; i++) {
- pthread_create(&workers[i], /* attr = */ NULL,
+ plugin_thread_create(&workers[i], /* attr = */ NULL,
worker_thread, server);
}
INFO("grpc: Started %zu workers", workers_num);
void module_register(void)
{
+ plugin_register_complex_config("grpc", c_grpc_config);
plugin_register_init("grpc", c_grpc_init);
plugin_register_shutdown("grpc", c_grpc_shutdown);
} /* module_register() */