From: Florian Forster Date: Thu, 4 Oct 2018 18:07:14 +0000 (+0200) Subject: cloud_pubsub plugin: New plugin publishing to or subscribing from Google Cloud Pub... X-Git-Url: https://git.octo.it/?p=collectd.git;a=commitdiff_plain;h=refs%2Fheads%2Fcloud_pubsub cloud_pubsub plugin: New plugin publishing to or subscribing from Google Cloud Pub/Sub. --- diff --git a/Makefile.am b/Makefile.am index 39f1d3ac..059f30a4 100644 --- a/Makefile.am +++ b/Makefile.am @@ -792,6 +792,20 @@ chrony_la_LDFLAGS = $(PLUGIN_LDFLAGS) chrony_la_LIBADD = -lm endif +if BUILD_PLUGIN_CLOUD_PUBSUB +pkglib_LTLIBRARIES += cloud_pubsub.la +cloud_pubsub_la_SOURCES = src/cloud_pubsub.c +cloud_pubsub_la_LDFLAGS = $(PLUGIN_LDFLAGS) +cloud_pubsub_la_CPPFLAGS = $(AM_CPPFLAGS) $(BUILD_WITH_LIBCURL_CFLAGS) +cloud_pubsub_la_LIBADD = \ + libformat_json.la \ + libgce.la \ + liboauth.la \ + libparse_json.la \ + libstrbuf.la \ + $(BUILD_WITH_LIBCURL_LIBS) +endif + if BUILD_PLUGIN_CONNTRACK pkglib_LTLIBRARIES += conntrack.la conntrack_la_SOURCES = src/conntrack.c diff --git a/configure.ac b/configure.ac index 46279652..e27f6c5d 100644 --- a/configure.ac +++ b/configure.ac @@ -6345,6 +6345,7 @@ plugin_battery="no" plugin_bind="no" plugin_ceph="no" plugin_cgroups="no" +plugin_cloud_pubsub="no" plugin_conntrack="no" plugin_contextswitch="no" plugin_cpu="no" @@ -6562,6 +6563,7 @@ if test "x$with_libcurl" = "xyes" && test "x$with_libyajl" = "xyes"; then fi if test "x$with_libcurl" = "xyes" && test "x$with_libssl" = "xyes" && test "x$with_libyajl" = "xyes" && test "x$with_libyajl2" = "xyes"; then + plugin_cloud_pubsub="yes" plugin_write_stackdriver="yes" fi @@ -6767,6 +6769,7 @@ AC_PLUGIN([bind], [$plugin_bind], [ISC Bind nameserv AC_PLUGIN([ceph], [$plugin_ceph], [Ceph daemon statistics]) AC_PLUGIN([cgroups], [$plugin_cgroups], [CGroups CPU usage accounting]) AC_PLUGIN([chrony], [yes], [Chrony statistics]) +AC_PLUGIN([cloud_pubsub], [$plugin_cloud_pubsub], [Google Cloud Pub/Sub plugin]) AC_PLUGIN([conntrack], [$plugin_conntrack], [nf_conntrack statistics]) AC_PLUGIN([contextswitch], [$plugin_contextswitch], [context switch statistics]) AC_PLUGIN([cpu], [$plugin_cpu], [CPU usage statistics]) @@ -7191,6 +7194,7 @@ AC_MSG_RESULT([ bind . . . . . . . . $enable_bind]) AC_MSG_RESULT([ ceph . . . . . . . . $enable_ceph]) AC_MSG_RESULT([ cgroups . . . . . . . $enable_cgroups]) AC_MSG_RESULT([ chrony. . . . . . . . $enable_chrony]) +AC_MSG_RESULT([ cloud_pubsub . . . . $enable_cloud_pubsub]) AC_MSG_RESULT([ conntrack . . . . . . $enable_conntrack]) AC_MSG_RESULT([ contextswitch . . . . $enable_contextswitch]) AC_MSG_RESULT([ cpu . . . . . . . . . $enable_cpu]) diff --git a/src/cloud_pubsub.c b/src/cloud_pubsub.c new file mode 100644 index 00000000..75043cd8 --- /dev/null +++ b/src/cloud_pubsub.c @@ -0,0 +1,947 @@ +/** + * collectd - src/cloud_pubsub.c + * Copyright (C) 2018 Florian Forster + * + * MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * + * Authors: + * Florian Forster + **/ + +#include "collectd.h" + +#include "common.h" +#include "plugin.h" +#include "utils_format_json.h" +#include "utils_gce.h" +#include "utils_oauth.h" +#include "utils_parse_json.h" +#include "utils_strbuf.h" + +#include +#include +#include +#include +#include /* for BIO_f_base64() */ + +#include +#include + +#ifndef CLOUD_PUBSUB_BUFFER_SIZE +#define CLOUD_PUBSUB_BUFFER_SIZE 65536 +#endif + +#ifndef CLOUD_PUBSUB_URL +#define CLOUD_PUBSUB_URL "https://pubsub.googleapis.com/v1" +#endif + +#ifndef CLOUD_PUBSUB_SCOPE +#define CLOUD_PUBSUB_SCOPE "https://www.googleapis.com/auth/pubsub" +#endif + +struct topic_s { + char *name; + + char *email; + char *project; + char *url; + _Bool store_rates; + size_t max_messages; + + oauth_t *auth; + CURL *curl; + char curl_errbuf[256]; + strbuf_t *buffer; + pthread_mutex_t lock; +}; +typedef struct topic_s topic_t; + +struct message_s { + char *ack_id; + char *data; + char *message_id; +}; +typedef struct message_s message_t; + +struct pull_response_s { + message_t *messages; + size_t messages_num; +}; +typedef struct pull_response_s pull_response_t; + +struct blob_s { + char *data; + size_t size; +}; +typedef struct blob_s blob_t; + +static size_t write_callback(void *contents, size_t size, size_t nmemb, + void *ud) { + size_t realsize = size * nmemb; + blob_t *blob = ud; + + if ((0x7FFFFFF0 < blob->size) || (0x7FFFFFF0 - blob->size < realsize)) { + ERROR("cloud_pubsub plugin: write_callback: integer overflow"); + return 0; + } + + blob->data = realloc(blob->data, blob->size + realsize + 1); + if (blob->data == NULL) { + /* out of memory! */ + ERROR("cloud_pubsub plugin: write_callback: not enough memory (realloc " + "returned NULL)"); + return 0; + } + + memcpy(blob->data + blob->size, contents, realsize); + blob->size += realsize; + blob->data[blob->size] = 0; + + return realsize; +} /* size_t write_callback */ + +static int access_token(topic_t *t, char *buffer, size_t buffer_size) { + /* t->auth is NULL only if we're running on GCE. */ + assert(t->auth || gce_check()); + + if (t->auth != NULL) + return oauth_access_token(t->auth, buffer, buffer_size); + + return gce_access_token(t->email, buffer, buffer_size); +} + +typedef struct { + int code; + char *message; +} api_error_t; + +static api_error_t *parse_api_error(char const *body) { + char errbuf[1024]; + yajl_val root = yajl_tree_parse(body, errbuf, sizeof(errbuf)); + if (root == NULL) { + ERROR("cloud_pubsub plugin: yajl_tree_parse failed: %s", errbuf); + return NULL; + } + + api_error_t *err = calloc(1, sizeof(*err)); + if (err == NULL) { + ERROR("cloud_pubsub plugin: calloc failed"); + yajl_tree_free(root); + return NULL; + } + + yajl_val code = yajl_tree_get(root, (char const *[]){"error", "code", NULL}, + yajl_t_number); + if (code != NULL) { + err->code = YAJL_GET_INTEGER(code); + } + + yajl_val message = yajl_tree_get( + root, (char const *[]){"error", "message", NULL}, yajl_t_string); + if (message != NULL) { + err->message = strdup(YAJL_GET_STRING(message)); + } + + return err; +} + +static char *api_error_string(api_error_t *err, char *buffer, + size_t buffer_size) { + if (err == NULL) { + strncpy(buffer, "Unknown error (API error is NULL)", buffer_size); + } else if (err->message == NULL) { + snprintf(buffer, buffer_size, "API error %d", err->code); + } else { + snprintf(buffer, buffer_size, "API error %d: %s", err->code, err->message); + } + + return buffer; +} +#define API_ERROR_STRING(err) api_error_string(err, (char[1024]){""}, 1024) + +// do_post does a HTTP POST request, assuming a JSON payload and using OAuth +// authentication. Returns -1 on error and the HTTP status code otherwise. +// ret_content, if not NULL, will contain the server's response. +// If ret_content is provided and the server responds with a 4xx or 5xx error, +// an appropriate message will be logged. +static int do_post(topic_t *t, char const *url, void const *payload, + blob_t *ret_content) { + if (t->curl == NULL) { + t->curl = curl_easy_init(); + if (t->curl == NULL) { + ERROR("cloud_pubsub plugin: curl_easy_init() failed"); + return -1; + } + + curl_easy_setopt(t->curl, CURLOPT_ERRORBUFFER, t->curl_errbuf); + curl_easy_setopt(t->curl, CURLOPT_NOSIGNAL, 1L); + } + + curl_easy_setopt(t->curl, CURLOPT_POST, 1L); + curl_easy_setopt(t->curl, CURLOPT_URL, url); + + /* header */ + char tok[256]; + int status = access_token(t, tok, sizeof(tok)); + if (status != 0) { + ERROR("cloud_pubsub plugin: getting access token failed with status %d", + status); + return -1; + } + char auth_header[384]; + snprintf(auth_header, sizeof(auth_header), "Authorization: Bearer %s", tok); + + struct curl_slist *headers = + curl_slist_append(NULL, "Content-Type: application/json"); + headers = curl_slist_append(headers, auth_header); + curl_easy_setopt(t->curl, CURLOPT_HTTPHEADER, headers); + + curl_easy_setopt(t->curl, CURLOPT_POSTFIELDS, payload); + + curl_easy_setopt(t->curl, CURLOPT_WRITEFUNCTION, + ret_content ? write_callback : NULL); + curl_easy_setopt(t->curl, CURLOPT_WRITEDATA, ret_content); + + status = curl_easy_perform(t->curl); + + /* clean up that has to happen in any case */ + curl_slist_free_all(headers); + curl_easy_setopt(t->curl, CURLOPT_HTTPHEADER, NULL); + curl_easy_setopt(t->curl, CURLOPT_WRITEFUNCTION, NULL); + curl_easy_setopt(t->curl, CURLOPT_WRITEDATA, NULL); + + if (status != CURLE_OK) { + ERROR("cloud_pubsub plugin: POST %s failed: %s", url, t->curl_errbuf); + sfree(ret_content->data); + ret_content->size = 0; + return -1; + } + + long http_code = 0; + curl_easy_getinfo(t->curl, CURLINFO_RESPONSE_CODE, &http_code); + + if (ret_content != NULL) { + if ((status >= 400) && (status < 500)) { + ERROR("write_stackdriver plugin: POST %s: %s", url, + API_ERROR_STRING(parse_api_error(ret_content->data))); + } else if (status >= 500) { + WARNING("write_stackdriver plugin: POST %s: %s", url, ret_content->data); + } + } + + return (int)http_code; +} + +static void topic_free(topic_t *t) { + if (t == NULL) + return; + + sfree(t->name); + oauth_destroy(t->auth); + sfree(t->email); + sfree(t->project); + if (t->curl != NULL) + curl_easy_cleanup(t->curl); + strbuf_destroy(t->buffer); + sfree(t); +} /* void topic_free */ + +static topic_t *topic_alloc(void) { + topic_t *t = calloc(1, sizeof(*t)); + if (t == NULL) + return NULL; + + t->url = strdup(CLOUD_PUBSUB_URL); + t->max_messages = 1000; + pthread_mutex_init(&t->lock, /* attr = */ NULL); + + return t; +} /* topic_t *topic_alloc */ + +/* pubsub_topic_publish calls a topic's "publish" method. */ +static int pubsub_topic_publish(topic_t *t) { + char url[1024]; + snprintf(url, sizeof(url), "%s/projects/%s/topics/%s:publish", t->url, + t->project, t->name); + + char *payload = t->buffer->buffer; + + blob_t response = {0}; + + int status = do_post(t, url, payload, &response); + if (status == -1) { + ERROR("cloud_pubsub plugin: POST %s failed", url); + return -1; + } + sfree(response.data); + + if (status != 200) { + ERROR("cloud_pubsub plugin: POST %s: unexpected response code: got %d, " + "want 200", + url, status); + return -1; + } + return 0; +} /* int pubsub_topic_publish */ + +static int base64_encode(char *buffer, size_t buffer_size) { + /* Set up the memory-base64 chain */ + BIO *b64 = BIO_new(BIO_f_base64()); + BIO_set_flags(b64, BIO_FLAGS_BASE64_NO_NL); + b64 = BIO_push(b64, BIO_new(BIO_s_mem())); + + /* Write data to the chain */ + BIO_write(b64, buffer, strlen(buffer)); + if (BIO_flush(b64) != 1) { + BIO_free_all(b64); + return (-1); + } + + /* Never fails */ + BUF_MEM *ptr; + BIO_get_mem_ptr(b64, &ptr); + + if (buffer_size <= ptr->length) { + BIO_free_all(b64); + return (ENOMEM); + } + + /* Copy data to buffer. */ + memcpy(buffer, ptr->data, ptr->length); + buffer[ptr->length] = 0; + + BIO_free_all(b64); + return 0; +} /* int base64_encode */ + +static int base64_decode(char const *in, char *buffer, size_t buffer_size) { + /* Set up the memory-base64 chain */ + BIO *b64 = BIO_new(BIO_f_base64()); + BIO_set_flags(b64, BIO_FLAGS_BASE64_NO_NL); + b64 = BIO_push(b64, BIO_new_mem_buf(in, -1)); + + int status = BIO_read(b64, buffer, buffer_size); + if (status < 0) { + BIO_free_all(b64); + return status; + } + if (status >= buffer_size) { + BIO_free_all(b64); + return ENOMEM; + } + + buffer[status] = 0; + + BIO_free_all(b64); + return 0; +} + +/* parse_pull parses the JSON returned by a "pull" request and returns the + * messages in a pull_response_t*. */ +static pull_response_t *parse_pull(char const *body) { + char const *path[] = {"receivedMessages", NULL}; + + char errbuf[1024]; + yajl_val root = yajl_tree_parse(body, errbuf, sizeof(errbuf)); + if (root == NULL) { + ERROR("cloud_pubsub plugin: yajl_tree_parse failed: %s", errbuf); + return NULL; + } + + yajl_val messages = yajl_tree_get(root, path, yajl_t_array); + if ((messages == NULL) || !YAJL_IS_ARRAY(messages)) { + yajl_tree_free(root); + return NULL; + } + + pull_response_t *res = calloc(1, sizeof(*res)); + if (res == NULL) { + yajl_tree_free(root); + return NULL; + } + + for (size_t i = 0; i < YAJL_GET_ARRAY(messages)->len; i++) { + yajl_val msg_var = YAJL_GET_ARRAY(messages)->values[i]; + char const *ackid_path[] = {"ackId", NULL}; + char const *data_path[] = {"message", "data", NULL}; + char const *msg_id_path[] = {"message", "messageId", NULL}; + + yajl_val ackid_var = yajl_tree_get(msg_var, ackid_path, yajl_t_string); + yajl_val data_var = yajl_tree_get(msg_var, data_path, yajl_t_string); + yajl_val msg_id_var = yajl_tree_get(msg_var, msg_id_path, yajl_t_string); + + if ((ackid_var == NULL) || (data_var == NULL) || (msg_id_var == NULL)) + continue; + + message_t *msg = realloc(res->messages, + sizeof(*res->messages) * (res->messages_num + 1)); + if (msg == NULL) + continue; + res->messages = msg; + msg = res->messages + res->messages_num; + + msg->ack_id = strdup(YAJL_GET_STRING(ackid_var)); + msg->message_id = strdup(YAJL_GET_STRING(msg_id_var)); + msg->data = strdup(YAJL_GET_STRING(data_var)); + + res->messages_num++; + } + + yajl_tree_free(root); + return res; +} /* pull_response_t *parse_pull */ + +/* pubsub_topic_pull calls a subscription's "pull" method. */ +static int pubsub_topic_pull(topic_t *t, blob_t *ret_content) { + char url[1024]; + snprintf(url, sizeof(url), "%s/projects/%s/subscriptions/%s:pull", t->url, + t->project, t->name); + + char payload[128]; + snprintf(payload, sizeof(payload), + "{\"returnImmediately\":\"false\",\"maxMessages\":\"%zu\"}", + t->max_messages); + + int status = do_post(t, url, payload, ret_content); + if (status == -1) + return -1; + + if (status != 200) { + ERROR("cloud_pubsub plugin: POST %s: unexpected response code: got %d, " + "want 200", + url, status); + sfree(ret_content->data); + ret_content->size = 0; + return -1; + } + return 0; +} /* char *pubsub_topic_pull */ + +static char *format_ack(char **ack_ids, size_t ack_ids_num) { + yajl_gen gen; + unsigned char const *out; + size_t out_size; + char *ret; + char key[] = "ackIds"; + size_t i; + + gen = yajl_gen_alloc(NULL); + if (gen == NULL) + return NULL; + + yajl_gen_map_open(gen); + yajl_gen_string(gen, (unsigned char *)&key[0], (unsigned int)strlen(key)); + yajl_gen_array_open(gen); + for (i = 0; i < ack_ids_num; i++) + yajl_gen_string(gen, (unsigned char *)ack_ids[i], + (unsigned int)strlen(ack_ids[i])); + yajl_gen_array_close(gen); + yajl_gen_map_close(gen); + + if (yajl_gen_get_buf(gen, &out, &out_size) != yajl_gen_status_ok) { + yajl_gen_free(gen); + return NULL; + } + + ret = strdup((char const *)out); + yajl_gen_free(gen); + return ret; +} /* char *format_ack */ + +/* pubsub_topic_ack calls a subscription's "ack" method. */ +static int pubsub_topic_ack(topic_t *t, char **ack_ids, size_t ack_ids_num) { + char url[1024]; + snprintf(url, sizeof(url), "%s/projects/%s/subscriptions/%s:acknowledge", + t->url, t->project, t->name); + + char *payload = format_ack(ack_ids, ack_ids_num); + + blob_t response = {0}; + + int status = do_post(t, url, payload, &response); + sfree(payload); + if (status == -1) { + ERROR("cloud_pubsub plugin: POST %s failed", url); + return -1; + } + sfree(response.data); + + if (status != 200) { + ERROR("cloud_pubsub plugin: POST %s: unexpected response code: got %d, " + "want 200", + url, status); + return -1; + } + return 0; +} /* int pubsub_topic_ack */ + +/* topic_tryadd tries to add a valud list to the topic's buffer. If + * this fails, for example because there is not enough space, it may leave the + * buffer in a modified / inconsistent state. */ +static int topic_tryadd(topic_t *t, data_set_t const *ds, + value_list_t const *vl) { + char json[1024]; + size_t json_fill = 0; + size_t json_free = sizeof(json); + int status; + + status = format_json_initialize(json, &json_fill, &json_free); + if (status != 0) + return status; + + status = format_json_value_list(json, &json_fill, &json_free, ds, vl, + t->store_rates); + if (status != 0) + return status; + + status = format_json_finalize(json, &json_fill, &json_free); + if (status != 0) + return status; + + status = base64_encode(json, sizeof(json)); + if (status != 0) + return -1; + + if (t->buffer->used == 0) /* initialize */ + status = strbuf_add(t->buffer, "{\"messages\":[{\"data\":\""); + else + status = strbuf_add(t->buffer, "\"},{\"data\":\""); + if (status != 0) + return status; + + return strbuf_add(t->buffer, json); +} /* int topic_tryadd */ + +/* topic_add adds a value list to the topic's buffer in an + * all-or-nothing fashion, i.e. if there is not enough space, it will not + * modify + * the buffer and return ENOMEM. */ +static int topic_add(topic_t *t, data_set_t const *ds, value_list_t const *vl) { + size_t saved_used = t->buffer->used; + size_t saved_free = t->buffer->free; + int status; + + status = topic_tryadd(t, ds, vl); + if ((status == 0) && (t->buffer->free < 5)) + status = ENOMEM; + + if (status == 0) + return 0; + + t->buffer->used = saved_used; + t->buffer->free = saved_free; + t->buffer->buffer[t->buffer->used] = 0; + + return status; +} /* int topic_add */ + +static int handle_message(message_t msg) { + size_t json_size = ((strlen(msg.data) * 3) / 4) + 1; + char json[json_size]; + + int status = base64_decode(msg.data, json, json_size); + if (status != 0) { + ERROR("cloud_pubsub plugin: base64_decode failed: %d", status); + return status; + } + + value_list_t **value_lists = NULL; + size_t value_lists_num = 0; + status = parse_json(json, &value_lists, &value_lists_num); + if (status != 0) { + ERROR("cloud_pubsub plugin: parse_json() failed: %d", status); + return status; + } + + for (size_t i = 0; i < value_lists_num; i++) { + value_list_t *vl = value_lists[i]; + + int status = plugin_dispatch_values(vl); + if (status != 0) { + NOTICE("cloud_pubsub plugin: plugin_dispatch_values() failed: %d", + status); + } + + sfree(vl->values); + sfree(vl); + } + + sfree(value_lists); + return 0; +} /* int handle_message */ + +static int topic_pull(topic_t *t) { + blob_t json = {0}; + int status = pubsub_topic_pull(t, &json); + if (status != 0) { + return status; + } + + pull_response_t *res = parse_pull(json.data); + sfree(json.data); + if (res == NULL) + return -1; + + char **ack_ids = NULL; + size_t ack_ids_num = 0; + for (size_t i = 0; i < res->messages_num; i++) { + message_t msg = res->messages[i]; + + int status = handle_message(msg); + if (status == 0) + strarray_add(&ack_ids, &ack_ids_num, msg.ack_id); + + sfree(msg.ack_id); + sfree(msg.data); + sfree(msg.message_id); + } + + sfree(res->messages); + sfree(res); + + if (ack_ids_num > 0) { + int status = pubsub_topic_ack(t, ack_ids, ack_ids_num); + strarray_free(ack_ids, ack_ids_num); + return status; + } + + return 0; +} /* int topic_pull */ + +static void *receive_thread(void *arg) { + topic_t *t = arg; + cdtime_t delay = 0; + + while (42) { + int status = topic_pull(t); + if (status == 0) { + delay = 0; + continue; + } + + /* failure: use exponential backoff. */ + if (delay == 0) { + delay = MS_TO_CDTIME_T(64); + } else { + delay = 2 * delay; + } + if (delay > TIME_T_TO_CDTIME_T(10)) { + delay = TIME_T_TO_CDTIME_T(10); + } + NOTICE( + "cloud_pubsub plugin: topic_pull() failed, sleeping for %.3f seconds.", + CDTIME_T_TO_DOUBLE(delay)); + + struct timespec ts = CDTIME_T_TO_TIMESPEC(delay); + while (ts.tv_sec != 0 || ts.tv_nsec != 0) { + int status = nanosleep(&ts, &ts); + if (status == 0) { + break; + } + } + } + + return (NULL); +} /* void *receive_thread */ + +/* topic_flush_locked flushes a topic's internal buffer. You must hold t->lock + * when calling this function. */ +static int topic_flush_locked(topic_t *t) { + int status; + + if (t->buffer->used == 0) + return 0; + + /* finalize the buffer */ + status = strbuf_add(t->buffer, "\"}]}"); + if (status != 0) { + strbuf_reset(t->buffer); + return status; + } + + status = pubsub_topic_publish(t); + strbuf_reset(t->buffer); + + return status; +} /* int topic_flush_locked */ + +static int cps_flush(__attribute__((unused)) cdtime_t timeout, + __attribute__((unused)) char const *id, user_data_t *ud) { + topic_t *t = ud->data; + + pthread_mutex_lock(&t->lock); + int status = topic_flush_locked(t); + pthread_mutex_unlock(&t->lock); + + return status; +} /* int cps_flush */ + +static int cps_write(data_set_t const *ds, value_list_t const *vl, + user_data_t *ud) { + topic_t *t = ud->data; + + pthread_mutex_lock(&t->lock); + + int status = topic_add(t, ds, vl); + if (status != ENOMEM) { + if (status != 0) + ERROR("cloud_pubsub plugin: topic_add (\"%s\") failed with status %d.", + t->name, status); + pthread_mutex_unlock(&t->lock); + return status; + } + + status = topic_flush_locked(t); + if (status != 0) { + ERROR("cloud_pubsub plugin: topic_flush_locked (\"%s\") failed with status " + "%d.", + t->name, status); + pthread_mutex_unlock(&t->lock); + return status; + } + + status = topic_add(t, ds, vl); + if (status != 0) + ERROR("cloud_pubsub plugin: topic_add[retry] (\"%s\") failed with status " + "%d.", + t->name, status); + + pthread_mutex_unlock(&t->lock); + return status; +} /* int cps_write */ + +static int cps_init(void) { + /* Call this while collectd is still single-threaded to avoid + * initialization issues in libgcrypt. */ + curl_global_init(CURL_GLOBAL_SSL); + + ERR_load_crypto_strings(); + + return 0; +} /* int cps_init */ + +static void check_scope(char const *email) /* {{{ */ +{ + char *scope = gce_scope(email); + if (scope == NULL) { + WARNING("cloud_pubsub plugin: Unable to determine scope of this " + "instance."); + return; + } + + if (strstr(scope, CLOUD_PUBSUB_SCOPE) == NULL) { + size_t scope_len; + + /* Strip trailing newline characers for printing. */ + scope_len = strlen(scope); + while ((scope_len > 0) && (iscntrl((int)scope[scope_len - 1]))) + scope[--scope_len] = 0; + + WARNING("cloud_pubsub plugin: The determined scope of this instance " + "(\"%s\") does not contain the monitoring scope (\"%s\"). You need " + "to add this scope to the list of scopes passed to gcutil with " + "--service_account_scopes when creating the instance. " + "Alternatively, to use this plugin on an instance which does not " + "have this scope, use a Service Account.", + scope, CLOUD_PUBSUB_SCOPE); + } + + sfree(scope); +} /* }}} void check_scope */ + +static int cps_config_topic(oconfig_item_t *ci) { + topic_t *t = topic_alloc(); + if (t == NULL) + return ENOMEM; + + if (cf_util_get_string(ci, &t->name) != 0) { + topic_free(t); + return -1; + } + + char *credential_file = NULL; + int conf_buffer_size = CLOUD_PUBSUB_BUFFER_SIZE; + + for (int i = 0; i < ci->children_num; i++) { + oconfig_item_t *child = ci->children + i; + + if (strcasecmp("Project", child->key) == 0) + cf_util_get_string(child, &t->project); + else if (strcasecmp("Email", child->key) == 0) + cf_util_get_string(child, &t->email); + else if (strcasecmp("Url", child->key) == 0) + cf_util_get_string(child, &t->url); + else if (strcasecmp("CredentialFile", child->key) == 0) + cf_util_get_string(child, &credential_file); + else if (strcasecmp("StoreRates", child->key) == 0) + cf_util_get_boolean(child, &t->store_rates); + else if (strcasecmp("BufferSize", child->key) == 0) { + cf_util_get_int(child, &conf_buffer_size); + if (conf_buffer_size < 1024) { + ERROR("cloud_pubsub plugin: BufferSize %d is too small. Using 1024 " + "byte.", + conf_buffer_size); + conf_buffer_size = 1024; + } + } else if (strcasecmp("MaxMessages", child->key) == 0) { + int max_messages = 0; + if (cf_util_get_int(child, &max_messages) != 0) { + continue; + } + if (max_messages < 1) { + ERROR("cloud_pubsub plugin: MaxMessages %d is too small.", + max_messages); + continue; + } + t->max_messages = (size_t)max_messages; + } else + ERROR("cloud_pubsub plugin: Unknown configuration option: \"%s\"", + child->key); + } + + /* Set up authentication */ + /* Option 1: Credentials file given => use service account */ + if (credential_file != NULL) { + oauth_google_t cfg = + oauth_create_google_file(credential_file, CLOUD_PUBSUB_SCOPE); + if (cfg.oauth == NULL) { + ERROR("cloud_pubsub plugin: oauth_create_google_file failed"); + topic_free(t); + return EINVAL; + } + t->auth = cfg.oauth; + + if (t->project == NULL) { + t->project = cfg.project_id; + INFO("cloud_pubsub plugin: Automatically detected project ID: \"%s\"", + t->project); + } else { + sfree(cfg.project_id); + } + } + + /* Option 2: Look for credentials in well-known places */ + if (t->auth == NULL) { + oauth_google_t cfg = oauth_create_google_default(CLOUD_PUBSUB_SCOPE); + t->auth = cfg.oauth; + + if (t->project == NULL) { + t->project = cfg.project_id; + INFO("cloud_pubsub plugin: Automatically detected project ID: \"%s\"", + t->project); + } else { + sfree(cfg.project_id); + } + } + + if ((t->auth != NULL) && (t->email != NULL)) { + NOTICE("cloud_pubsub plugin: A service account email was configured " + "but is " + "not used for authentication because %s used instead.", + (credential_file != NULL) ? "a credential file was" + : "application default credentials were"); + } + + /* Option 3: Running on GCE => use metadata service */ + if ((t->auth == NULL) && gce_check()) { + check_scope(t->email); + } else if (t->auth == NULL) { + ERROR("cloud_pubsub plugin: Unable to determine credentials. Please " + "either " + "specify the \"Credentials\" option or set up Application Default " + "Credentials."); + topic_free(t); + return EINVAL; + } + + if ((t->project == NULL) && gce_check()) { + t->project = gce_project_id(); + } + if (t->project == NULL) { + ERROR("cloud_pubsub plugin: Unable to determine the project number. " + "Please specify the \"Project\" option manually."); + topic_free(t); + return EINVAL; + } + + if (strcasecmp("Publish", ci->key) == 0) { + t->buffer = strbuf_create((size_t)conf_buffer_size); + if (t->buffer == NULL) { + ERROR("cloud_pubsub plugin: strbuf_create failed."); + topic_free(t); + return -1; + } + + assert(t->name != NULL); + assert(t->project != NULL); + + char cbname[128]; + snprintf(cbname, sizeof(cbname), "cloud_pubsub/%s", t->name); + + plugin_register_write(cbname, cps_write, + &(user_data_t){ + .data = t, .free_func = (void *)topic_free, + }); + plugin_register_flush(cbname, cps_flush, &(user_data_t){.data = t}); + } else { /* if (strcasecmp ("Subscribe", ci->key) == 0) */ + pthread_t tid; + + /* TODO(octo): Store thread_id and kill threads in shutdown. */ + int status = plugin_thread_create(&tid, + /* attrs = */ NULL, receive_thread, + /* arg = */ t, + /* name = */ "cloud_pubsub recv"); + if (status != 0) { + char errbuf[1024]; + ERROR("cloud_pubsub plugin: pthread_create failed: %s", + sstrerror(errno, errbuf, sizeof(errbuf))); + topic_free(t); + return -1; + } + } + + return 0; +} /* int cps_config_topic */ + +static int cps_config(oconfig_item_t *ci) { + for (int i = 0; i < ci->children_num; i++) { + oconfig_item_t *child = ci->children + i; + int status = -1; + + if (strcasecmp("Publish", child->key) == 0) + status = cps_config_topic(child); + else if (strcasecmp("Subscribe", child->key) == 0) + status = cps_config_topic(child); + else + ERROR("cloud_pubsub plugin: Unknown configuration option: \"%s\"", + child->key); + + if (status != 0) + return status; + } + + return 0; +} /* int cps_config */ + +void module_register(void) { + plugin_register_complex_config("cloud_pubsub", cps_config); + plugin_register_init("cloud_pubsub", cps_init); +} diff --git a/src/collectd.conf.in b/src/collectd.conf.in index 94214759..77698d89 100644 --- a/src/collectd.conf.in +++ b/src/collectd.conf.in @@ -104,6 +104,7 @@ #@BUILD_PLUGIN_CHRONY_TRUE@LoadPlugin chrony #@BUILD_PLUGIN_CONNTRACK_TRUE@LoadPlugin conntrack #@BUILD_PLUGIN_CONTEXTSWITCH_TRUE@LoadPlugin contextswitch +#@BUILD_PLUGIN_CLOUD_PUBSUB_TRUE@LoadPlugin cloud_pubsub @BUILD_PLUGIN_CPU_TRUE@@BUILD_PLUGIN_CPU_TRUE@LoadPlugin cpu #@BUILD_PLUGIN_CPUFREQ_TRUE@LoadPlugin cpufreq #@BUILD_PLUGIN_CPUSLEEP_TRUE@LoadPlugin cpusleep @@ -386,6 +387,16 @@ # IgnoreSelected false # +# +# +# CredentialFile "/path/to/creds.json" +# Project "project_id" +# StoreRates true +# BufferSize 65535 +# Url "https://pubsub.googleapis.com/v1" +# +# + # # ReportByCpu true # ReportByState true diff --git a/src/collectd.conf.pod b/src/collectd.conf.pod index da8e8736..a356017c 100644 --- a/src/collectd.conf.pod +++ b/src/collectd.conf.pod @@ -1574,6 +1574,114 @@ Connection timeout in seconds. Defaults to B<2>. =back +=head2 Plugin C + +The B plugin sends metrics to I, a highly +available message queueing service. The data is encoded in I JSON +export format. + +B + + + + CredentialFile "/path/to/credentials.json" + Project "project_name" + Email "0123456789012-abcdefghijklmnopqrstuvwxyzabcdef@developer.gserviceaccount.com + + StoreRates true + BufferSize 65536 + + + CredentialFile "/path/to/credentials.json" + Project "project_name" + Email "0123456789012-abcdefghijklmnopqrstuvwxyzabcdef@developer.gserviceaccount.com + + + +The configuration consists of one or more C blocks, each configuring +one topic to post metrics to. Inside each C block, the following +configuration options are available: + +=over 4 + +=item B I + +Path to a JSON credentials file holding the credentials for a GCP service +account. + +If B is not specified, the plugin uses I. That means which credentials are used depends on the environment: + +=over 4 + +=item + +The environment variable C is checked. If this +variable is specified it should point to a JSON file that defines the +credentials. + +=item + +The path C<${HOME}/.config/gcloud/application_default_credentials.json> is +checked. This is where credentials used by the I command line utility +are stored. You can use C to create +these credentials. + +Please note that these credentials are often of your personal account, not a +service account, and are therefore unfit to be used in a production +environment. + +=item + +When running on GCE, the built-in service account associated with the virtual +machine instance is used. +See also the B option below. + +=back + +For B blocks, the service account / user requires the +C role. + +=item B I + +The I or the I of the I. The +I is a string identifying the GCP project, which you can chose +freely when creating a new project. The I is a 12-digit decimal +number. You can look up both on the I. + +This setting is optional. If not set, the project ID is read from the +credentials file or determined from the GCE's metadata service. + +=item B I (GCE only) + +Choses the GCE I used for authentication. + +Each GCE instance has a C I but may also be +associated with additional I. This is often used to restrict +the permissions of services running on the GCE instance to the required +minimum. The I requires the +C scope. When multiple I are available, this option selects which one is used by +I. + +=item B B|B (Publish only) + +If set to true, counters are converted to a rate before submitting. +Defaults to B because this plugin itself cannot consume non-gauge +metrics that have been converted to rates. + +=item B I (Publish only) + +Sets the size of the buffer used to build requests to I, in +bytes. Must be at least 1024Ebytes. Defaults to 65536 (64EkiByte). + +=item B I (Subscribe only) + +Sets the maximum number of messages to receive from Cloud Pub/Sub at once. +Defaults to 1000. + +=back + =head2 Plugin C This plugin collects IP conntrack statistics.