cloud_pubsub plugin: New plugin publishing to or subscribing from Google Cloud Pub... cloud_pubsub
authorFlorian Forster <octo@collectd.org>
Thu, 4 Oct 2018 18:07:14 +0000 (20:07 +0200)
committerFlorian Forster <octo@collectd.org>
Mon, 8 Oct 2018 08:24:58 +0000 (10:24 +0200)
Makefile.am
configure.ac
src/cloud_pubsub.c [new file with mode: 0644]
src/collectd.conf.in
src/collectd.conf.pod

index 39f1d3a..059f30a 100644 (file)
@@ -792,6 +792,20 @@ chrony_la_LDFLAGS = $(PLUGIN_LDFLAGS)
 chrony_la_LIBADD = -lm
 endif
 
+if BUILD_PLUGIN_CLOUD_PUBSUB
+pkglib_LTLIBRARIES += cloud_pubsub.la
+cloud_pubsub_la_SOURCES = src/cloud_pubsub.c
+cloud_pubsub_la_LDFLAGS = $(PLUGIN_LDFLAGS)
+cloud_pubsub_la_CPPFLAGS = $(AM_CPPFLAGS) $(BUILD_WITH_LIBCURL_CFLAGS)
+cloud_pubsub_la_LIBADD = \
+       libformat_json.la \
+       libgce.la \
+       liboauth.la \
+       libparse_json.la \
+       libstrbuf.la \
+       $(BUILD_WITH_LIBCURL_LIBS)
+endif
+
 if BUILD_PLUGIN_CONNTRACK
 pkglib_LTLIBRARIES += conntrack.la
 conntrack_la_SOURCES = src/conntrack.c
index 4627965..e27f6c5 100644 (file)
@@ -6345,6 +6345,7 @@ plugin_battery="no"
 plugin_bind="no"
 plugin_ceph="no"
 plugin_cgroups="no"
+plugin_cloud_pubsub="no"
 plugin_conntrack="no"
 plugin_contextswitch="no"
 plugin_cpu="no"
@@ -6562,6 +6563,7 @@ if test "x$with_libcurl" = "xyes" && test "x$with_libyajl" = "xyes"; then
 fi
 
 if test "x$with_libcurl" = "xyes" && test "x$with_libssl" = "xyes" && test "x$with_libyajl" = "xyes" && test "x$with_libyajl2" = "xyes"; then
+  plugin_cloud_pubsub="yes"
   plugin_write_stackdriver="yes"
 fi
 
@@ -6767,6 +6769,7 @@ AC_PLUGIN([bind],                [$plugin_bind],              [ISC Bind nameserv
 AC_PLUGIN([ceph],                [$plugin_ceph],              [Ceph daemon statistics])
 AC_PLUGIN([cgroups],             [$plugin_cgroups],           [CGroups CPU usage accounting])
 AC_PLUGIN([chrony],              [yes],                       [Chrony statistics])
+AC_PLUGIN([cloud_pubsub],        [$plugin_cloud_pubsub],      [Google Cloud Pub/Sub plugin])
 AC_PLUGIN([conntrack],           [$plugin_conntrack],         [nf_conntrack statistics])
 AC_PLUGIN([contextswitch],       [$plugin_contextswitch],     [context switch statistics])
 AC_PLUGIN([cpu],                 [$plugin_cpu],               [CPU usage statistics])
@@ -7191,6 +7194,7 @@ AC_MSG_RESULT([    bind  . . . . . . . . $enable_bind])
 AC_MSG_RESULT([    ceph  . . . . . . . . $enable_ceph])
 AC_MSG_RESULT([    cgroups . . . . . . . $enable_cgroups])
 AC_MSG_RESULT([    chrony. . . . . . . . $enable_chrony])
+AC_MSG_RESULT([    cloud_pubsub  . . . . $enable_cloud_pubsub])
 AC_MSG_RESULT([    conntrack . . . . . . $enable_conntrack])
 AC_MSG_RESULT([    contextswitch . . . . $enable_contextswitch])
 AC_MSG_RESULT([    cpu . . . . . . . . . $enable_cpu])
diff --git a/src/cloud_pubsub.c b/src/cloud_pubsub.c
new file mode 100644 (file)
index 0000000..75043cd
--- /dev/null
@@ -0,0 +1,947 @@
+/**
+ * collectd - src/cloud_pubsub.c
+ * Copyright (C) 2018  Florian Forster
+ *
+ * MIT License
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ *
+ * Authors:
+ *   Florian Forster <octo at collectd.org>
+ **/
+
+#include "collectd.h"
+
+#include "common.h"
+#include "plugin.h"
+#include "utils_format_json.h"
+#include "utils_gce.h"
+#include "utils_oauth.h"
+#include "utils_parse_json.h"
+#include "utils_strbuf.h"
+
+#include <curl/curl.h>
+#include <openssl/bio.h>
+#include <openssl/buffer.h>
+#include <openssl/err.h>
+#include <openssl/evp.h> /* for BIO_f_base64() */
+
+#include <yajl/yajl_gen.h>
+#include <yajl/yajl_tree.h>
+
+#ifndef CLOUD_PUBSUB_BUFFER_SIZE
+#define CLOUD_PUBSUB_BUFFER_SIZE 65536
+#endif
+
+#ifndef CLOUD_PUBSUB_URL
+#define CLOUD_PUBSUB_URL "https://pubsub.googleapis.com/v1"
+#endif
+
+#ifndef CLOUD_PUBSUB_SCOPE
+#define CLOUD_PUBSUB_SCOPE "https://www.googleapis.com/auth/pubsub"
+#endif
+
+struct topic_s {
+  char *name;
+
+  char *email;
+  char *project;
+  char *url;
+  _Bool store_rates;
+  size_t max_messages;
+
+  oauth_t *auth;
+  CURL *curl;
+  char curl_errbuf[256];
+  strbuf_t *buffer;
+  pthread_mutex_t lock;
+};
+typedef struct topic_s topic_t;
+
+struct message_s {
+  char *ack_id;
+  char *data;
+  char *message_id;
+};
+typedef struct message_s message_t;
+
+struct pull_response_s {
+  message_t *messages;
+  size_t messages_num;
+};
+typedef struct pull_response_s pull_response_t;
+
+struct blob_s {
+  char *data;
+  size_t size;
+};
+typedef struct blob_s blob_t;
+
+static size_t write_callback(void *contents, size_t size, size_t nmemb,
+                             void *ud) {
+  size_t realsize = size * nmemb;
+  blob_t *blob = ud;
+
+  if ((0x7FFFFFF0 < blob->size) || (0x7FFFFFF0 - blob->size < realsize)) {
+    ERROR("cloud_pubsub plugin: write_callback: integer overflow");
+    return 0;
+  }
+
+  blob->data = realloc(blob->data, blob->size + realsize + 1);
+  if (blob->data == NULL) {
+    /* out of memory! */
+    ERROR("cloud_pubsub plugin: write_callback: not enough memory (realloc "
+          "returned NULL)");
+    return 0;
+  }
+
+  memcpy(blob->data + blob->size, contents, realsize);
+  blob->size += realsize;
+  blob->data[blob->size] = 0;
+
+  return realsize;
+} /* size_t write_callback */
+
+static int access_token(topic_t *t, char *buffer, size_t buffer_size) {
+  /* t->auth is NULL only if we're running on GCE. */
+  assert(t->auth || gce_check());
+
+  if (t->auth != NULL)
+    return oauth_access_token(t->auth, buffer, buffer_size);
+
+  return gce_access_token(t->email, buffer, buffer_size);
+}
+
+typedef struct {
+  int code;
+  char *message;
+} api_error_t;
+
+static api_error_t *parse_api_error(char const *body) {
+  char errbuf[1024];
+  yajl_val root = yajl_tree_parse(body, errbuf, sizeof(errbuf));
+  if (root == NULL) {
+    ERROR("cloud_pubsub plugin: yajl_tree_parse failed: %s", errbuf);
+    return NULL;
+  }
+
+  api_error_t *err = calloc(1, sizeof(*err));
+  if (err == NULL) {
+    ERROR("cloud_pubsub plugin: calloc failed");
+    yajl_tree_free(root);
+    return NULL;
+  }
+
+  yajl_val code = yajl_tree_get(root, (char const *[]){"error", "code", NULL},
+                                yajl_t_number);
+  if (code != NULL) {
+    err->code = YAJL_GET_INTEGER(code);
+  }
+
+  yajl_val message = yajl_tree_get(
+      root, (char const *[]){"error", "message", NULL}, yajl_t_string);
+  if (message != NULL) {
+    err->message = strdup(YAJL_GET_STRING(message));
+  }
+
+  return err;
+}
+
+static char *api_error_string(api_error_t *err, char *buffer,
+                              size_t buffer_size) {
+  if (err == NULL) {
+    strncpy(buffer, "Unknown error (API error is NULL)", buffer_size);
+  } else if (err->message == NULL) {
+    snprintf(buffer, buffer_size, "API error %d", err->code);
+  } else {
+    snprintf(buffer, buffer_size, "API error %d: %s", err->code, err->message);
+  }
+
+  return buffer;
+}
+#define API_ERROR_STRING(err) api_error_string(err, (char[1024]){""}, 1024)
+
+// do_post does a HTTP POST request, assuming a JSON payload and using OAuth
+// authentication. Returns -1 on error and the HTTP status code otherwise.
+// ret_content, if not NULL, will contain the server's response.
+// If ret_content is provided and the server responds with a 4xx or 5xx error,
+// an appropriate message will be logged.
+static int do_post(topic_t *t, char const *url, void const *payload,
+                   blob_t *ret_content) {
+  if (t->curl == NULL) {
+    t->curl = curl_easy_init();
+    if (t->curl == NULL) {
+      ERROR("cloud_pubsub plugin: curl_easy_init() failed");
+      return -1;
+    }
+
+    curl_easy_setopt(t->curl, CURLOPT_ERRORBUFFER, t->curl_errbuf);
+    curl_easy_setopt(t->curl, CURLOPT_NOSIGNAL, 1L);
+  }
+
+  curl_easy_setopt(t->curl, CURLOPT_POST, 1L);
+  curl_easy_setopt(t->curl, CURLOPT_URL, url);
+
+  /* header */
+  char tok[256];
+  int status = access_token(t, tok, sizeof(tok));
+  if (status != 0) {
+    ERROR("cloud_pubsub plugin: getting access token failed with status %d",
+          status);
+    return -1;
+  }
+  char auth_header[384];
+  snprintf(auth_header, sizeof(auth_header), "Authorization: Bearer %s", tok);
+
+  struct curl_slist *headers =
+      curl_slist_append(NULL, "Content-Type: application/json");
+  headers = curl_slist_append(headers, auth_header);
+  curl_easy_setopt(t->curl, CURLOPT_HTTPHEADER, headers);
+
+  curl_easy_setopt(t->curl, CURLOPT_POSTFIELDS, payload);
+
+  curl_easy_setopt(t->curl, CURLOPT_WRITEFUNCTION,
+                   ret_content ? write_callback : NULL);
+  curl_easy_setopt(t->curl, CURLOPT_WRITEDATA, ret_content);
+
+  status = curl_easy_perform(t->curl);
+
+  /* clean up that has to happen in any case */
+  curl_slist_free_all(headers);
+  curl_easy_setopt(t->curl, CURLOPT_HTTPHEADER, NULL);
+  curl_easy_setopt(t->curl, CURLOPT_WRITEFUNCTION, NULL);
+  curl_easy_setopt(t->curl, CURLOPT_WRITEDATA, NULL);
+
+  if (status != CURLE_OK) {
+    ERROR("cloud_pubsub plugin: POST %s failed: %s", url, t->curl_errbuf);
+    sfree(ret_content->data);
+    ret_content->size = 0;
+    return -1;
+  }
+
+  long http_code = 0;
+  curl_easy_getinfo(t->curl, CURLINFO_RESPONSE_CODE, &http_code);
+
+  if (ret_content != NULL) {
+    if ((status >= 400) && (status < 500)) {
+      ERROR("write_stackdriver plugin: POST %s: %s", url,
+            API_ERROR_STRING(parse_api_error(ret_content->data)));
+    } else if (status >= 500) {
+      WARNING("write_stackdriver plugin: POST %s: %s", url, ret_content->data);
+    }
+  }
+
+  return (int)http_code;
+}
+
+static void topic_free(topic_t *t) {
+  if (t == NULL)
+    return;
+
+  sfree(t->name);
+  oauth_destroy(t->auth);
+  sfree(t->email);
+  sfree(t->project);
+  if (t->curl != NULL)
+    curl_easy_cleanup(t->curl);
+  strbuf_destroy(t->buffer);
+  sfree(t);
+} /* void topic_free */
+
+static topic_t *topic_alloc(void) {
+  topic_t *t = calloc(1, sizeof(*t));
+  if (t == NULL)
+    return NULL;
+
+  t->url = strdup(CLOUD_PUBSUB_URL);
+  t->max_messages = 1000;
+  pthread_mutex_init(&t->lock, /* attr = */ NULL);
+
+  return t;
+} /* topic_t *topic_alloc */
+
+/* pubsub_topic_publish calls a topic's "publish" method. */
+static int pubsub_topic_publish(topic_t *t) {
+  char url[1024];
+  snprintf(url, sizeof(url), "%s/projects/%s/topics/%s:publish", t->url,
+           t->project, t->name);
+
+  char *payload = t->buffer->buffer;
+
+  blob_t response = {0};
+
+  int status = do_post(t, url, payload, &response);
+  if (status == -1) {
+    ERROR("cloud_pubsub plugin: POST %s failed", url);
+    return -1;
+  }
+  sfree(response.data);
+
+  if (status != 200) {
+    ERROR("cloud_pubsub plugin: POST %s: unexpected response code: got %d, "
+          "want 200",
+          url, status);
+    return -1;
+  }
+  return 0;
+} /* int pubsub_topic_publish */
+
+static int base64_encode(char *buffer, size_t buffer_size) {
+  /* Set up the memory-base64 chain */
+  BIO *b64 = BIO_new(BIO_f_base64());
+  BIO_set_flags(b64, BIO_FLAGS_BASE64_NO_NL);
+  b64 = BIO_push(b64, BIO_new(BIO_s_mem()));
+
+  /* Write data to the chain */
+  BIO_write(b64, buffer, strlen(buffer));
+  if (BIO_flush(b64) != 1) {
+    BIO_free_all(b64);
+    return (-1);
+  }
+
+  /* Never fails */
+  BUF_MEM *ptr;
+  BIO_get_mem_ptr(b64, &ptr);
+
+  if (buffer_size <= ptr->length) {
+    BIO_free_all(b64);
+    return (ENOMEM);
+  }
+
+  /* Copy data to buffer. */
+  memcpy(buffer, ptr->data, ptr->length);
+  buffer[ptr->length] = 0;
+
+  BIO_free_all(b64);
+  return 0;
+} /* int base64_encode */
+
+static int base64_decode(char const *in, char *buffer, size_t buffer_size) {
+  /* Set up the memory-base64 chain */
+  BIO *b64 = BIO_new(BIO_f_base64());
+  BIO_set_flags(b64, BIO_FLAGS_BASE64_NO_NL);
+  b64 = BIO_push(b64, BIO_new_mem_buf(in, -1));
+
+  int status = BIO_read(b64, buffer, buffer_size);
+  if (status < 0) {
+    BIO_free_all(b64);
+    return status;
+  }
+  if (status >= buffer_size) {
+    BIO_free_all(b64);
+    return ENOMEM;
+  }
+
+  buffer[status] = 0;
+
+  BIO_free_all(b64);
+  return 0;
+}
+
+/* parse_pull parses the JSON returned by a "pull" request and returns the
+ * messages in a pull_response_t*. */
+static pull_response_t *parse_pull(char const *body) {
+  char const *path[] = {"receivedMessages", NULL};
+
+  char errbuf[1024];
+  yajl_val root = yajl_tree_parse(body, errbuf, sizeof(errbuf));
+  if (root == NULL) {
+    ERROR("cloud_pubsub plugin: yajl_tree_parse failed: %s", errbuf);
+    return NULL;
+  }
+
+  yajl_val messages = yajl_tree_get(root, path, yajl_t_array);
+  if ((messages == NULL) || !YAJL_IS_ARRAY(messages)) {
+    yajl_tree_free(root);
+    return NULL;
+  }
+
+  pull_response_t *res = calloc(1, sizeof(*res));
+  if (res == NULL) {
+    yajl_tree_free(root);
+    return NULL;
+  }
+
+  for (size_t i = 0; i < YAJL_GET_ARRAY(messages)->len; i++) {
+    yajl_val msg_var = YAJL_GET_ARRAY(messages)->values[i];
+    char const *ackid_path[] = {"ackId", NULL};
+    char const *data_path[] = {"message", "data", NULL};
+    char const *msg_id_path[] = {"message", "messageId", NULL};
+
+    yajl_val ackid_var = yajl_tree_get(msg_var, ackid_path, yajl_t_string);
+    yajl_val data_var = yajl_tree_get(msg_var, data_path, yajl_t_string);
+    yajl_val msg_id_var = yajl_tree_get(msg_var, msg_id_path, yajl_t_string);
+
+    if ((ackid_var == NULL) || (data_var == NULL) || (msg_id_var == NULL))
+      continue;
+
+    message_t *msg = realloc(res->messages,
+                             sizeof(*res->messages) * (res->messages_num + 1));
+    if (msg == NULL)
+      continue;
+    res->messages = msg;
+    msg = res->messages + res->messages_num;
+
+    msg->ack_id = strdup(YAJL_GET_STRING(ackid_var));
+    msg->message_id = strdup(YAJL_GET_STRING(msg_id_var));
+    msg->data = strdup(YAJL_GET_STRING(data_var));
+
+    res->messages_num++;
+  }
+
+  yajl_tree_free(root);
+  return res;
+} /* pull_response_t *parse_pull */
+
+/* pubsub_topic_pull calls a subscription's "pull" method. */
+static int pubsub_topic_pull(topic_t *t, blob_t *ret_content) {
+  char url[1024];
+  snprintf(url, sizeof(url), "%s/projects/%s/subscriptions/%s:pull", t->url,
+           t->project, t->name);
+
+  char payload[128];
+  snprintf(payload, sizeof(payload),
+           "{\"returnImmediately\":\"false\",\"maxMessages\":\"%zu\"}",
+           t->max_messages);
+
+  int status = do_post(t, url, payload, ret_content);
+  if (status == -1)
+    return -1;
+
+  if (status != 200) {
+    ERROR("cloud_pubsub plugin: POST %s: unexpected response code: got %d, "
+          "want 200",
+          url, status);
+    sfree(ret_content->data);
+    ret_content->size = 0;
+    return -1;
+  }
+  return 0;
+} /* char *pubsub_topic_pull */
+
+static char *format_ack(char **ack_ids, size_t ack_ids_num) {
+  yajl_gen gen;
+  unsigned char const *out;
+  size_t out_size;
+  char *ret;
+  char key[] = "ackIds";
+  size_t i;
+
+  gen = yajl_gen_alloc(NULL);
+  if (gen == NULL)
+    return NULL;
+
+  yajl_gen_map_open(gen);
+  yajl_gen_string(gen, (unsigned char *)&key[0], (unsigned int)strlen(key));
+  yajl_gen_array_open(gen);
+  for (i = 0; i < ack_ids_num; i++)
+    yajl_gen_string(gen, (unsigned char *)ack_ids[i],
+                    (unsigned int)strlen(ack_ids[i]));
+  yajl_gen_array_close(gen);
+  yajl_gen_map_close(gen);
+
+  if (yajl_gen_get_buf(gen, &out, &out_size) != yajl_gen_status_ok) {
+    yajl_gen_free(gen);
+    return NULL;
+  }
+
+  ret = strdup((char const *)out);
+  yajl_gen_free(gen);
+  return ret;
+} /* char *format_ack */
+
+/* pubsub_topic_ack calls a subscription's "ack" method. */
+static int pubsub_topic_ack(topic_t *t, char **ack_ids, size_t ack_ids_num) {
+  char url[1024];
+  snprintf(url, sizeof(url), "%s/projects/%s/subscriptions/%s:acknowledge",
+           t->url, t->project, t->name);
+
+  char *payload = format_ack(ack_ids, ack_ids_num);
+
+  blob_t response = {0};
+
+  int status = do_post(t, url, payload, &response);
+  sfree(payload);
+  if (status == -1) {
+    ERROR("cloud_pubsub plugin: POST %s failed", url);
+    return -1;
+  }
+  sfree(response.data);
+
+  if (status != 200) {
+    ERROR("cloud_pubsub plugin: POST %s: unexpected response code: got %d, "
+          "want 200",
+          url, status);
+    return -1;
+  }
+  return 0;
+} /* int pubsub_topic_ack */
+
+/* topic_tryadd tries to add a valud list to the topic's buffer. If
+ * this fails, for example because there is not enough space, it may leave the
+ * buffer in a modified / inconsistent state. */
+static int topic_tryadd(topic_t *t, data_set_t const *ds,
+                        value_list_t const *vl) {
+  char json[1024];
+  size_t json_fill = 0;
+  size_t json_free = sizeof(json);
+  int status;
+
+  status = format_json_initialize(json, &json_fill, &json_free);
+  if (status != 0)
+    return status;
+
+  status = format_json_value_list(json, &json_fill, &json_free, ds, vl,
+                                  t->store_rates);
+  if (status != 0)
+    return status;
+
+  status = format_json_finalize(json, &json_fill, &json_free);
+  if (status != 0)
+    return status;
+
+  status = base64_encode(json, sizeof(json));
+  if (status != 0)
+    return -1;
+
+  if (t->buffer->used == 0) /* initialize */
+    status = strbuf_add(t->buffer, "{\"messages\":[{\"data\":\"");
+  else
+    status = strbuf_add(t->buffer, "\"},{\"data\":\"");
+  if (status != 0)
+    return status;
+
+  return strbuf_add(t->buffer, json);
+} /* int topic_tryadd */
+
+/* topic_add adds a value list to the topic's buffer in an
+ * all-or-nothing fashion, i.e. if there is not enough space, it will not
+ * modify
+ * the buffer and return ENOMEM. */
+static int topic_add(topic_t *t, data_set_t const *ds, value_list_t const *vl) {
+  size_t saved_used = t->buffer->used;
+  size_t saved_free = t->buffer->free;
+  int status;
+
+  status = topic_tryadd(t, ds, vl);
+  if ((status == 0) && (t->buffer->free < 5))
+    status = ENOMEM;
+
+  if (status == 0)
+    return 0;
+
+  t->buffer->used = saved_used;
+  t->buffer->free = saved_free;
+  t->buffer->buffer[t->buffer->used] = 0;
+
+  return status;
+} /* int topic_add */
+
+static int handle_message(message_t msg) {
+  size_t json_size = ((strlen(msg.data) * 3) / 4) + 1;
+  char json[json_size];
+
+  int status = base64_decode(msg.data, json, json_size);
+  if (status != 0) {
+    ERROR("cloud_pubsub plugin: base64_decode failed: %d", status);
+    return status;
+  }
+
+  value_list_t **value_lists = NULL;
+  size_t value_lists_num = 0;
+  status = parse_json(json, &value_lists, &value_lists_num);
+  if (status != 0) {
+    ERROR("cloud_pubsub plugin: parse_json() failed: %d", status);
+    return status;
+  }
+
+  for (size_t i = 0; i < value_lists_num; i++) {
+    value_list_t *vl = value_lists[i];
+
+    int status = plugin_dispatch_values(vl);
+    if (status != 0) {
+      NOTICE("cloud_pubsub plugin: plugin_dispatch_values() failed: %d",
+             status);
+    }
+
+    sfree(vl->values);
+    sfree(vl);
+  }
+
+  sfree(value_lists);
+  return 0;
+} /* int handle_message */
+
+static int topic_pull(topic_t *t) {
+  blob_t json = {0};
+  int status = pubsub_topic_pull(t, &json);
+  if (status != 0) {
+    return status;
+  }
+
+  pull_response_t *res = parse_pull(json.data);
+  sfree(json.data);
+  if (res == NULL)
+    return -1;
+
+  char **ack_ids = NULL;
+  size_t ack_ids_num = 0;
+  for (size_t i = 0; i < res->messages_num; i++) {
+    message_t msg = res->messages[i];
+
+    int status = handle_message(msg);
+    if (status == 0)
+      strarray_add(&ack_ids, &ack_ids_num, msg.ack_id);
+
+    sfree(msg.ack_id);
+    sfree(msg.data);
+    sfree(msg.message_id);
+  }
+
+  sfree(res->messages);
+  sfree(res);
+
+  if (ack_ids_num > 0) {
+    int status = pubsub_topic_ack(t, ack_ids, ack_ids_num);
+    strarray_free(ack_ids, ack_ids_num);
+    return status;
+  }
+
+  return 0;
+} /* int topic_pull */
+
+static void *receive_thread(void *arg) {
+  topic_t *t = arg;
+  cdtime_t delay = 0;
+
+  while (42) {
+    int status = topic_pull(t);
+    if (status == 0) {
+      delay = 0;
+      continue;
+    }
+
+    /* failure: use exponential backoff. */
+    if (delay == 0) {
+      delay = MS_TO_CDTIME_T(64);
+    } else {
+      delay = 2 * delay;
+    }
+    if (delay > TIME_T_TO_CDTIME_T(10)) {
+      delay = TIME_T_TO_CDTIME_T(10);
+    }
+    NOTICE(
+        "cloud_pubsub plugin: topic_pull() failed, sleeping for %.3f seconds.",
+        CDTIME_T_TO_DOUBLE(delay));
+
+    struct timespec ts = CDTIME_T_TO_TIMESPEC(delay);
+    while (ts.tv_sec != 0 || ts.tv_nsec != 0) {
+      int status = nanosleep(&ts, &ts);
+      if (status == 0) {
+        break;
+      }
+    }
+  }
+
+  return (NULL);
+} /* void *receive_thread */
+
+/* topic_flush_locked flushes a topic's internal buffer. You must hold t->lock
+ * when calling this function. */
+static int topic_flush_locked(topic_t *t) {
+  int status;
+
+  if (t->buffer->used == 0)
+    return 0;
+
+  /* finalize the buffer */
+  status = strbuf_add(t->buffer, "\"}]}");
+  if (status != 0) {
+    strbuf_reset(t->buffer);
+    return status;
+  }
+
+  status = pubsub_topic_publish(t);
+  strbuf_reset(t->buffer);
+
+  return status;
+} /* int topic_flush_locked */
+
+static int cps_flush(__attribute__((unused)) cdtime_t timeout,
+                     __attribute__((unused)) char const *id, user_data_t *ud) {
+  topic_t *t = ud->data;
+
+  pthread_mutex_lock(&t->lock);
+  int status = topic_flush_locked(t);
+  pthread_mutex_unlock(&t->lock);
+
+  return status;
+} /* int cps_flush */
+
+static int cps_write(data_set_t const *ds, value_list_t const *vl,
+                     user_data_t *ud) {
+  topic_t *t = ud->data;
+
+  pthread_mutex_lock(&t->lock);
+
+  int status = topic_add(t, ds, vl);
+  if (status != ENOMEM) {
+    if (status != 0)
+      ERROR("cloud_pubsub plugin: topic_add (\"%s\") failed with status %d.",
+            t->name, status);
+    pthread_mutex_unlock(&t->lock);
+    return status;
+  }
+
+  status = topic_flush_locked(t);
+  if (status != 0) {
+    ERROR("cloud_pubsub plugin: topic_flush_locked (\"%s\") failed with status "
+          "%d.",
+          t->name, status);
+    pthread_mutex_unlock(&t->lock);
+    return status;
+  }
+
+  status = topic_add(t, ds, vl);
+  if (status != 0)
+    ERROR("cloud_pubsub plugin: topic_add[retry] (\"%s\") failed with status "
+          "%d.",
+          t->name, status);
+
+  pthread_mutex_unlock(&t->lock);
+  return status;
+} /* int cps_write */
+
+static int cps_init(void) {
+  /* Call this while collectd is still single-threaded to avoid
+   * initialization issues in libgcrypt. */
+  curl_global_init(CURL_GLOBAL_SSL);
+
+  ERR_load_crypto_strings();
+
+  return 0;
+} /* int cps_init */
+
+static void check_scope(char const *email) /* {{{ */
+{
+  char *scope = gce_scope(email);
+  if (scope == NULL) {
+    WARNING("cloud_pubsub plugin: Unable to determine scope of this "
+            "instance.");
+    return;
+  }
+
+  if (strstr(scope, CLOUD_PUBSUB_SCOPE) == NULL) {
+    size_t scope_len;
+
+    /* Strip trailing newline characers for printing. */
+    scope_len = strlen(scope);
+    while ((scope_len > 0) && (iscntrl((int)scope[scope_len - 1])))
+      scope[--scope_len] = 0;
+
+    WARNING("cloud_pubsub plugin: The determined scope of this instance "
+            "(\"%s\") does not contain the monitoring scope (\"%s\"). You need "
+            "to add this scope to the list of scopes passed to gcutil with "
+            "--service_account_scopes when creating the instance. "
+            "Alternatively, to use this plugin on an instance which does not "
+            "have this scope, use a Service Account.",
+            scope, CLOUD_PUBSUB_SCOPE);
+  }
+
+  sfree(scope);
+} /* }}} void check_scope */
+
+static int cps_config_topic(oconfig_item_t *ci) {
+  topic_t *t = topic_alloc();
+  if (t == NULL)
+    return ENOMEM;
+
+  if (cf_util_get_string(ci, &t->name) != 0) {
+    topic_free(t);
+    return -1;
+  }
+
+  char *credential_file = NULL;
+  int conf_buffer_size = CLOUD_PUBSUB_BUFFER_SIZE;
+
+  for (int i = 0; i < ci->children_num; i++) {
+    oconfig_item_t *child = ci->children + i;
+
+    if (strcasecmp("Project", child->key) == 0)
+      cf_util_get_string(child, &t->project);
+    else if (strcasecmp("Email", child->key) == 0)
+      cf_util_get_string(child, &t->email);
+    else if (strcasecmp("Url", child->key) == 0)
+      cf_util_get_string(child, &t->url);
+    else if (strcasecmp("CredentialFile", child->key) == 0)
+      cf_util_get_string(child, &credential_file);
+    else if (strcasecmp("StoreRates", child->key) == 0)
+      cf_util_get_boolean(child, &t->store_rates);
+    else if (strcasecmp("BufferSize", child->key) == 0) {
+      cf_util_get_int(child, &conf_buffer_size);
+      if (conf_buffer_size < 1024) {
+        ERROR("cloud_pubsub plugin: BufferSize %d is too small. Using 1024 "
+              "byte.",
+              conf_buffer_size);
+        conf_buffer_size = 1024;
+      }
+    } else if (strcasecmp("MaxMessages", child->key) == 0) {
+      int max_messages = 0;
+      if (cf_util_get_int(child, &max_messages) != 0) {
+        continue;
+      }
+      if (max_messages < 1) {
+        ERROR("cloud_pubsub plugin: MaxMessages %d is too small.",
+              max_messages);
+        continue;
+      }
+      t->max_messages = (size_t)max_messages;
+    } else
+      ERROR("cloud_pubsub plugin: Unknown configuration option: \"%s\"",
+            child->key);
+  }
+
+  /* Set up authentication */
+  /* Option 1: Credentials file given => use service account */
+  if (credential_file != NULL) {
+    oauth_google_t cfg =
+        oauth_create_google_file(credential_file, CLOUD_PUBSUB_SCOPE);
+    if (cfg.oauth == NULL) {
+      ERROR("cloud_pubsub plugin: oauth_create_google_file failed");
+      topic_free(t);
+      return EINVAL;
+    }
+    t->auth = cfg.oauth;
+
+    if (t->project == NULL) {
+      t->project = cfg.project_id;
+      INFO("cloud_pubsub plugin: Automatically detected project ID: \"%s\"",
+           t->project);
+    } else {
+      sfree(cfg.project_id);
+    }
+  }
+
+  /* Option 2: Look for credentials in well-known places */
+  if (t->auth == NULL) {
+    oauth_google_t cfg = oauth_create_google_default(CLOUD_PUBSUB_SCOPE);
+    t->auth = cfg.oauth;
+
+    if (t->project == NULL) {
+      t->project = cfg.project_id;
+      INFO("cloud_pubsub plugin: Automatically detected project ID: \"%s\"",
+           t->project);
+    } else {
+      sfree(cfg.project_id);
+    }
+  }
+
+  if ((t->auth != NULL) && (t->email != NULL)) {
+    NOTICE("cloud_pubsub plugin: A service account email was configured "
+           "but is "
+           "not used for authentication because %s used instead.",
+           (credential_file != NULL) ? "a credential file was"
+                                     : "application default credentials were");
+  }
+
+  /* Option 3: Running on GCE => use metadata service */
+  if ((t->auth == NULL) && gce_check()) {
+    check_scope(t->email);
+  } else if (t->auth == NULL) {
+    ERROR("cloud_pubsub plugin: Unable to determine credentials. Please "
+          "either "
+          "specify the \"Credentials\" option or set up Application Default "
+          "Credentials.");
+    topic_free(t);
+    return EINVAL;
+  }
+
+  if ((t->project == NULL) && gce_check()) {
+    t->project = gce_project_id();
+  }
+  if (t->project == NULL) {
+    ERROR("cloud_pubsub plugin: Unable to determine the project number. "
+          "Please specify the \"Project\" option manually.");
+    topic_free(t);
+    return EINVAL;
+  }
+
+  if (strcasecmp("Publish", ci->key) == 0) {
+    t->buffer = strbuf_create((size_t)conf_buffer_size);
+    if (t->buffer == NULL) {
+      ERROR("cloud_pubsub plugin: strbuf_create failed.");
+      topic_free(t);
+      return -1;
+    }
+
+    assert(t->name != NULL);
+    assert(t->project != NULL);
+
+    char cbname[128];
+    snprintf(cbname, sizeof(cbname), "cloud_pubsub/%s", t->name);
+
+    plugin_register_write(cbname, cps_write,
+                          &(user_data_t){
+                              .data = t, .free_func = (void *)topic_free,
+                          });
+    plugin_register_flush(cbname, cps_flush, &(user_data_t){.data = t});
+  } else { /* if (strcasecmp ("Subscribe", ci->key) == 0) */
+    pthread_t tid;
+
+    /* TODO(octo): Store thread_id and kill threads in shutdown. */
+    int status = plugin_thread_create(&tid,
+                                      /* attrs = */ NULL, receive_thread,
+                                      /* arg = */ t,
+                                      /* name = */ "cloud_pubsub recv");
+    if (status != 0) {
+      char errbuf[1024];
+      ERROR("cloud_pubsub plugin: pthread_create failed: %s",
+            sstrerror(errno, errbuf, sizeof(errbuf)));
+      topic_free(t);
+      return -1;
+    }
+  }
+
+  return 0;
+} /* int cps_config_topic */
+
+static int cps_config(oconfig_item_t *ci) {
+  for (int i = 0; i < ci->children_num; i++) {
+    oconfig_item_t *child = ci->children + i;
+    int status = -1;
+
+    if (strcasecmp("Publish", child->key) == 0)
+      status = cps_config_topic(child);
+    else if (strcasecmp("Subscribe", child->key) == 0)
+      status = cps_config_topic(child);
+    else
+      ERROR("cloud_pubsub plugin: Unknown configuration option: \"%s\"",
+            child->key);
+
+    if (status != 0)
+      return status;
+  }
+
+  return 0;
+} /* int cps_config */
+
+void module_register(void) {
+  plugin_register_complex_config("cloud_pubsub", cps_config);
+  plugin_register_init("cloud_pubsub", cps_init);
+}
index 9421475..77698d8 100644 (file)
 #@BUILD_PLUGIN_CHRONY_TRUE@LoadPlugin chrony
 #@BUILD_PLUGIN_CONNTRACK_TRUE@LoadPlugin conntrack
 #@BUILD_PLUGIN_CONTEXTSWITCH_TRUE@LoadPlugin contextswitch
+#@BUILD_PLUGIN_CLOUD_PUBSUB_TRUE@LoadPlugin cloud_pubsub
 @BUILD_PLUGIN_CPU_TRUE@@BUILD_PLUGIN_CPU_TRUE@LoadPlugin cpu
 #@BUILD_PLUGIN_CPUFREQ_TRUE@LoadPlugin cpufreq
 #@BUILD_PLUGIN_CPUSLEEP_TRUE@LoadPlugin cpusleep
 #  IgnoreSelected false
 #</Plugin>
 
+#<Plugin cloud_pubsub>
+#  <Publish "topic_name">
+#    CredentialFile "/path/to/creds.json"
+#    Project "project_id"
+#    StoreRates true
+#    BufferSize 65535
+#    Url "https://pubsub.googleapis.com/v1"
+#  </Publish>
+#</Plugin>
+
 #<Plugin cpu>
 #  ReportByCpu true
 #  ReportByState true
index da8e873..a356017 100644 (file)
@@ -1574,6 +1574,114 @@ Connection timeout in seconds. Defaults to B<2>.
 
 =back
 
+=head2 Plugin C<cloud_pubsub>
+
+The B<cloud_pubsub> plugin sends metrics to I<Google Cloud Pub/Sub>, a highly
+available message queueing service. The data is encoded in I<collectd's> JSON
+export format.
+
+B<Synopsis:>
+
+ <Plugin "cloud_pubsub">
+   <Publish "topic_name">
+     CredentialFile "/path/to/credentials.json"
+     Project "project_name"
+     Email "0123456789012-abcdefghijklmnopqrstuvwxyzabcdef@developer.gserviceaccount.com
+
+     StoreRates true
+     BufferSize 65536
+   </Publish>
+   <Subscribe "subscription_name">
+     CredentialFile "/path/to/credentials.json"
+     Project "project_name"
+     Email "0123456789012-abcdefghijklmnopqrstuvwxyzabcdef@developer.gserviceaccount.com
+   </Subscribe>
+ </Plugin>
+
+The configuration consists of one or more C<Publish> blocks, each configuring
+one topic to post metrics to. Inside each C<Publish> block, the following
+configuration options are available:
+
+=over 4
+
+=item B<CredentialFile> I<file>
+
+Path to a JSON credentials file holding the credentials for a GCP service
+account.
+
+If B<CredentialFile> is not specified, the plugin uses I<Application Default
+Credentials>. That means which credentials are used depends on the environment:
+
+=over 4
+
+=item
+
+The environment variable C<GOOGLE_APPLICATION_CREDENTIALS> is checked. If this
+variable is specified it should point to a JSON file that defines the
+credentials.
+
+=item
+
+The path C<${HOME}/.config/gcloud/application_default_credentials.json> is
+checked. This is where credentials used by the I<gcloud> command line utility
+are stored. You can use C<gcloud auth application-default login> to create
+these credentials.
+
+Please note that these credentials are often of your personal account, not a
+service account, and are therefore unfit to be used in a production
+environment.
+
+=item
+
+When running on GCE, the built-in service account associated with the virtual
+machine instance is used.
+See also the B<Email> option below.
+
+=back
+
+For B<Publish> blocks, the service account / user requires the
+C<roles/pubsub.publisher> role.
+
+=item B<Project> I<Name>
+
+The I<Project ID> or the I<Project Number> of the I<Stackdriver Account>. The
+I<Project ID> is a string identifying the GCP project, which you can chose
+freely when creating a new project. The I<Project Number> is a 12-digit decimal
+number. You can look up both on the I<Developer Console>.
+
+This setting is optional. If not set, the project ID is read from the
+credentials file or determined from the GCE's metadata service.
+
+=item B<Email> I<Email> (GCE only)
+
+Choses the GCE I<Service Account> used for authentication.
+
+Each GCE instance has a C<default> I<Service Account> but may also be
+associated with additional I<Service Accounts>. This is often used to restrict
+the permissions of services running on the GCE instance to the required
+minimum. The I<cloud_pubsub plugin> requires the
+C<https://www.googleapis.com/auth/pubsub> scope. When multiple I<Service
+Accounts> are available, this option selects which one is used by
+I<cloud_pubsub plugin>.
+
+=item B<StoreRates> B<false>|B<true> (Publish only)
+
+If set to true, counters are converted to a rate before submitting.
+Defaults to B<false> because this plugin itself cannot consume non-gauge
+metrics that have been converted to rates.
+
+=item B<BufferSize> I<Bytes> (Publish only)
+
+Sets the size of the buffer used to build requests to I<Cloud Pub/Sub>, in
+bytes. Must be at least 1024E<nbsp>bytes. Defaults to 65536 (64E<nbsp>kiByte).
+
+=item B<MaxMessages> I<Number> (Subscribe only)
+
+Sets the maximum number of messages to receive from Cloud Pub/Sub at once.
+Defaults to 1000.
+
+=back
+
 =head2 Plugin C<conntrack>
 
 This plugin collects IP conntrack statistics.