#include <grpc++/grpc++.h>
#include <google/protobuf/util/time_util.h>
+#include <fstream>
+#include <iostream>
+#include <vector>
+
#include "collectd.grpc.pb.h"
extern "C" {
#include <fnmatch.h>
#include <stdbool.h>
-#include <pthread.h>
#include "collectd.h"
#include "common.h"
#include "plugin.h"
#include "daemon/utils_cache.h"
-
- typedef struct {
- char *addr;
- char *port;
- } listener_t;
-
- static listener_t *listeners;
- static size_t listeners_num;
}
using collectd::Collectd;
using google::protobuf::util::TimeUtil;
/*
+ * private types
+ */
+
+struct Listener {
+ grpc::string addr;
+ grpc::string port;
+
+ grpc::SslServerCredentialsOptions *ssl;
+};
+static std::vector<Listener> listeners;
+static grpc::string default_addr("0.0.0.0:50051");
+
+/*
* helper functions
*/
return true;
} /* ident_matches */
+static grpc::string read_file(const char *filename)
+{
+ std::ifstream f;
+ grpc::string s, content;
+
+ f.open(filename);
+ if (!f.is_open()) {
+ ERROR("grpc: Failed to open '%s'", filename);
+ return "";
+ }
+
+ while (std::getline(f, s)) {
+ content += s;
+ content.push_back('\n');
+ }
+ f.close();
+ return content;
+} /* read_file */
+
/*
* proto conversion
*/
grpc::string("failed to query values: cannot create iterator"));
}
+ status = grpc::Status::OK;
while (uc_iterator_next(iter, &name) == 0) {
value_list_t res;
- if (parse_identifier_vl(name, &res) != 0)
- return grpc::Status(grpc::StatusCode::INTERNAL,
+ if (parse_identifier_vl(name, &res) != 0) {
+ status = grpc::Status(grpc::StatusCode::INTERNAL,
grpc::string("failed to parse identifier"));
+ break;
+ }
if (!ident_matches(&res, &matcher))
continue;
- if (uc_iterator_get_time(iter, &res.time) < 0)
- return grpc::Status(grpc::StatusCode::INTERNAL,
+ if (uc_iterator_get_time(iter, &res.time) < 0) {
+ status = grpc::Status(grpc::StatusCode::INTERNAL,
grpc::string("failed to retrieve value timestamp"));
- if (uc_iterator_get_interval(iter, &res.interval) < 0)
- return grpc::Status(grpc::StatusCode::INTERNAL,
+ break;
+ }
+ if (uc_iterator_get_interval(iter, &res.interval) < 0) {
+ status = grpc::Status(grpc::StatusCode::INTERNAL,
grpc::string("failed to retrieve value interval"));
- if (uc_iterator_get_values(iter, &res.values, &res.values_len) < 0)
- return grpc::Status(grpc::StatusCode::INTERNAL,
+ break;
+ }
+ if (uc_iterator_get_values(iter, &res.values, &res.values_len) < 0) {
+ status = grpc::Status(grpc::StatusCode::INTERNAL,
grpc::string("failed to retrieve values"));
+ break;
+ }
auto vl = reply->add_values();
status = marshal_value_list(&res, vl);
free(res.values);
if (!status.ok())
- return status;
+ break;
}
uc_iterator_destroy(iter);
- return grpc::Status::OK;
+ return status;
} /* Process(): QueryValues */
class Call
public:
void Start()
{
- // TODO: make configurable
auto auth = grpc::InsecureServerCredentials();
grpc::ServerBuilder builder;
- if (!listeners_num) {
- std::string default_addr("0.0.0.0:50051");
+ if (listeners.empty()) {
builder.AddListeningPort(default_addr, auth);
INFO("grpc: Listening on %s", default_addr.c_str());
}
else {
- size_t i;
- for (i = 0; i < listeners_num; i++) {
- auto l = listeners[i];
- std::string addr(l.addr);
- addr += std::string(":") + std::string(l.port);
- builder.AddListeningPort(addr, auth);
- INFO("grpc: Listening on %s", addr.c_str());
+ for (auto l : listeners) {
+ grpc::string addr = l.addr + ":" + l.port;
+
+ auto use_ssl = grpc::string("");
+ auto a = auth;
+ if (l.ssl != nullptr) {
+ use_ssl = grpc::string(" (SSL enabled)");
+ a = grpc::SslServerCredentials(*l.ssl);
+ }
+
+ builder.AddListeningPort(addr, a);
+ INFO("grpc: Listening on %s%s", addr.c_str(), use_ssl.c_str());
}
}
static int c_grpc_config_listen(oconfig_item_t *ci)
{
- listener_t *listener;
- int i;
-
if ((ci->values_num != 2)
|| (ci->values[0].type != OCONFIG_TYPE_STRING)
|| (ci->values[1].type != OCONFIG_TYPE_STRING)) {
return -1;
}
- listener = (listener_t *)realloc(listeners,
- (listeners_num + 1) * sizeof(*listeners));
- if (!listener) {
- ERROR("grpc: Failed to allocate listeners");
- return -1;
- }
- listeners = listener;
- listener = listeners + listeners_num;
- listeners_num++;
+ auto listener = Listener();
+ listener.addr = grpc::string(ci->values[0].value.string);
+ listener.port = grpc::string(ci->values[1].value.string);
+ listener.ssl = nullptr;
- listener->addr = strdup(ci->values[0].value.string);
- listener->port = strdup(ci->values[1].value.string);
+ auto ssl_opts = new(grpc::SslServerCredentialsOptions);
+ grpc::SslServerCredentialsOptions::PemKeyCertPair pkcp = {};
+ bool use_ssl = false;
- for (i = 0; i < ci->children_num; i++) {
+ for (int i = 0; i < ci->children_num; i++) {
oconfig_item_t *child = ci->children + i;
- WARNING("grpc: Option `%s` not allowed in <%s> block.",
- child->key, ci->key);
+
+ if (!strcasecmp("EnableSSL", child->key)) {
+ if (cf_util_get_boolean(child, &use_ssl)) {
+ ERROR("grpc: Option `%s` expects a boolean value",
+ child->key);
+ return -1;
+ }
+ }
+ else if (!strcasecmp("SSLRootCerts", child->key)) {
+ char *certs = NULL;
+ if (cf_util_get_string(child, &certs)) {
+ ERROR("grpc: Option `%s` expects a string value",
+ child->key);
+ return -1;
+ }
+ ssl_opts->pem_root_certs = read_file(certs);
+ }
+ else if (!strcasecmp("SSLServerKey", child->key)) {
+ char *key = NULL;
+ if (cf_util_get_string(child, &key)) {
+ ERROR("grpc: Option `%s` expects a string value",
+ child->key);
+ return -1;
+ }
+ pkcp.private_key = read_file(key);
+ }
+ else if (!strcasecmp("SSLServerCert", child->key)) {
+ char *cert = NULL;
+ if (cf_util_get_string(child, &cert)) {
+ ERROR("grpc: Option `%s` expects a string value",
+ child->key);
+ return -1;
+ }
+ pkcp.cert_chain = read_file(cert);
+ }
+ else {
+ WARNING("grpc: Option `%s` not allowed in <%s> block.",
+ child->key, ci->key);
+ }
}
+ ssl_opts->pem_key_cert_pairs.push_back(pkcp);
+ if (use_ssl)
+ listener.ssl = ssl_opts;
+ else
+ delete(ssl_opts);
+
+ listeners.push_back(listener);
return 0;
} /* c_grpc_config_listen() */