2 * collectd - src/cloud_pubsub.c
3 * Copyright (C) 2018 Florian Forster
7 * Permission is hereby granted, free of charge, to any person obtaining a copy
8 * of this software and associated documentation files (the "Software"), to deal
9 * in the Software without restriction, including without limitation the rights
10 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11 * copies of the Software, and to permit persons to whom the Software is
12 * furnished to do so, subject to the following conditions:
14 * The above copyright notice and this permission notice shall be included in
15 * all copies or substantial portions of the Software.
17 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
26 * Florian Forster <octo at collectd.org>
33 #include "utils_format_json.h"
34 #include "utils_gce.h"
35 #include "utils_oauth.h"
36 #include "utils_parse_json.h"
37 #include "utils_strbuf.h"
39 #include <curl/curl.h>
40 #include <openssl/bio.h>
41 #include <openssl/buffer.h>
42 #include <openssl/err.h>
43 #include <openssl/evp.h> /* for BIO_f_base64() */
45 #include <yajl/yajl_gen.h>
46 #include <yajl/yajl_tree.h>
/* Default size passed to strbuf_create() for a topic's publish buffer when
 * the "BufferSize" option is not given. May be overridden at compile time. */
#ifndef CLOUD_PUBSUB_BUFFER_SIZE
#define CLOUD_PUBSUB_BUFFER_SIZE 65536
#endif

/* Default base URL of the REST API; used unless the "Url" option is set. */
#ifndef CLOUD_PUBSUB_URL
#define CLOUD_PUBSUB_URL "https://pubsub.googleapis.com/v1"
#endif

/* OAuth scope requested when creating credentials and looked for in the
 * scopes reported for the instance by check_scope(). */
#ifndef CLOUD_PUBSUB_SCOPE
#define CLOUD_PUBSUB_SCOPE "https://www.googleapis.com/auth/pubsub"
#endif
/* Buffer handed to libcurl via CURLOPT_ERRORBUFFER in do_post(); its content
 * is logged when a transfer fails. */
71 char curl_errbuf[256];
/* State of one configured topic / subscription. */
75 typedef struct topic_s topic_t;
/* One received message; parse_pull() fills its ack_id, message_id and data
 * members with heap-allocated strings. */
82 typedef struct message_s message_t;
/* Result of parse_pull(): an array "messages" with "messages_num" entries. */
84 struct pull_response_s {
88 typedef struct pull_response_s pull_response_t;
/* Growable byte buffer ("data" / "size") that write_callback() appends the
 * HTTP response body to. */
94 typedef struct blob_s blob_t;
96 static size_t write_callback(void *contents, size_t size, size_t nmemb,
98 size_t realsize = size * nmemb;
101 if ((0x7FFFFFF0 < blob->size) || (0x7FFFFFF0 - blob->size < realsize)) {
102 ERROR("cloud_pubsub plugin: write_callback: integer overflow");
106 blob->data = realloc(blob->data, blob->size + realsize + 1);
107 if (blob->data == NULL) {
109 ERROR("cloud_pubsub plugin: write_callback: not enough memory (realloc "
114 memcpy(blob->data + blob->size, contents, realsize);
115 blob->size += realsize;
116 blob->data[blob->size] = 0;
119 } /* size_t write_callback */
121 static int access_token(topic_t *t, char *buffer, size_t buffer_size) {
122 /* t->auth is NULL only if we're running on GCE. */
123 assert(t->auth || gce_check());
126 return oauth_access_token(t->auth, buffer, buffer_size);
128 return gce_access_token(t->email, buffer, buffer_size);
136 static api_error_t *parse_api_error(char const *body) {
138 yajl_val root = yajl_tree_parse(body, errbuf, sizeof(errbuf));
140 ERROR("cloud_pubsub plugin: yajl_tree_parse failed: %s", errbuf);
144 api_error_t *err = calloc(1, sizeof(*err));
146 ERROR("cloud_pubsub plugin: calloc failed");
147 yajl_tree_free(root);
151 yajl_val code = yajl_tree_get(root, (char const *[]){"error", "code", NULL},
154 err->code = YAJL_GET_INTEGER(code);
157 yajl_val message = yajl_tree_get(
158 root, (char const *[]){"error", "message", NULL}, yajl_t_string);
159 if (message != NULL) {
160 err->message = strdup(YAJL_GET_STRING(message));
166 static char *api_error_string(api_error_t *err, char *buffer,
167 size_t buffer_size) {
169 strncpy(buffer, "Unknown error (API error is NULL)", buffer_size);
170 } else if (err->message == NULL) {
171 snprintf(buffer, buffer_size, "API error %d", err->code);
173 snprintf(buffer, buffer_size, "API error %d: %s", err->code, err->message);
178 #define API_ERROR_STRING(err) api_error_string(err, (char[1024]){""}, 1024)
180 // do_post does a HTTP POST request, assuming a JSON payload and using OAuth
181 // authentication. Returns -1 on error and the HTTP status code otherwise.
182 // ret_content, if not NULL, will contain the server's response.
183 // If ret_content is provided and the server responds with a 4xx or 5xx error,
184 // an appropriate message will be logged.
185 static int do_post(topic_t *t, char const *url, void const *payload,
186 blob_t *ret_content) {
187 if (t->curl == NULL) {
188 t->curl = curl_easy_init();
189 if (t->curl == NULL) {
190 ERROR("cloud_pubsub plugin: curl_easy_init() failed");
194 curl_easy_setopt(t->curl, CURLOPT_ERRORBUFFER, t->curl_errbuf);
195 curl_easy_setopt(t->curl, CURLOPT_NOSIGNAL, 1L);
198 curl_easy_setopt(t->curl, CURLOPT_POST, 1L);
199 curl_easy_setopt(t->curl, CURLOPT_URL, url);
203 int status = access_token(t, tok, sizeof(tok));
205 ERROR("cloud_pubsub plugin: getting access token failed with status %d",
209 char auth_header[384];
210 snprintf(auth_header, sizeof(auth_header), "Authorization: Bearer %s", tok);
212 struct curl_slist *headers =
213 curl_slist_append(NULL, "Content-Type: application/json");
214 headers = curl_slist_append(headers, auth_header);
215 curl_easy_setopt(t->curl, CURLOPT_HTTPHEADER, headers);
217 curl_easy_setopt(t->curl, CURLOPT_POSTFIELDS, payload);
219 curl_easy_setopt(t->curl, CURLOPT_WRITEFUNCTION,
220 ret_content ? write_callback : NULL);
221 curl_easy_setopt(t->curl, CURLOPT_WRITEDATA, ret_content);
223 status = curl_easy_perform(t->curl);
225 /* clean up that has to happen in any case */
226 curl_slist_free_all(headers);
227 curl_easy_setopt(t->curl, CURLOPT_HTTPHEADER, NULL);
228 curl_easy_setopt(t->curl, CURLOPT_WRITEFUNCTION, NULL);
229 curl_easy_setopt(t->curl, CURLOPT_WRITEDATA, NULL);
231 if (status != CURLE_OK) {
232 ERROR("cloud_pubsub plugin: POST %s failed: %s", url, t->curl_errbuf);
233 sfree(ret_content->data);
234 ret_content->size = 0;
239 curl_easy_getinfo(t->curl, CURLINFO_RESPONSE_CODE, &http_code);
241 if (ret_content != NULL) {
242 if ((status >= 400) && (status < 500)) {
243 ERROR("write_stackdriver plugin: POST %s: %s", url,
244 API_ERROR_STRING(parse_api_error(ret_content->data)));
245 } else if (status >= 500) {
246 WARNING("write_stackdriver plugin: POST %s: %s", url, ret_content->data);
250 return (int)http_code;
/* topic_free releases resources owned by a topic. The lines visible here
 * destroy its OAuth context, clean up its curl handle and destroy its string
 * buffer. It is also installed as the free_func of the write callback's user
 * data in cps_config_topic(). */
253 static void topic_free(topic_t *t) {
258 oauth_destroy(t->auth);
262 curl_easy_cleanup(t->curl);
263 strbuf_destroy(t->buffer);
265 } /* void topic_free */
267 static topic_t *topic_alloc(void) {
268 topic_t *t = calloc(1, sizeof(*t));
272 t->url = strdup(CLOUD_PUBSUB_URL);
273 t->max_messages = 1000;
274 pthread_mutex_init(&t->lock, /* attr = */ NULL);
277 } /* topic_t *topic_alloc */
279 /* pubsub_topic_publish calls a topic's "publish" method: the content of the
 * topic's buffer is POSTed to "<url>/projects/<project>/topics/<name>:publish"
 * using do_post(). */
280 static int pubsub_topic_publish(topic_t *t) {
282 snprintf(url, sizeof(url), "%s/projects/%s/topics/%s:publish", t->url,
283 t->project, t->name);
/* The request body is the text accumulated in the topic's string buffer. */
285 char *payload = t->buffer->buffer;
/* Receives the server's reply; only needed for do_post()'s error logging and
 * released again below. */
287 blob_t response = {0};
289 int status = do_post(t, url, payload, &response);
291 ERROR("cloud_pubsub plugin: POST %s failed", url);
294 sfree(response.data);
297 ERROR("cloud_pubsub plugin: POST %s: unexpected response code: got %d, "
303 } /* int pubsub_topic_publish */
/* base64_encode replaces the NUL-terminated string in "buffer" with its
 * base64 encoding (without line breaks), using an OpenSSL BIO chain. The
 * encoded text and its terminating NUL must fit into buffer_size bytes.
 * NOTE(review): in the lines visible here the results of BIO_new() and
 * BIO_write() are not checked -- confirm this is acceptable. */
305 static int base64_encode(char *buffer, size_t buffer_size) {
306 /* Set up the memory-base64 chain */
307 BIO *b64 = BIO_new(BIO_f_base64());
308 BIO_set_flags(b64, BIO_FLAGS_BASE64_NO_NL);
309 b64 = BIO_push(b64, BIO_new(BIO_s_mem()));
311 /* Write data to the chain */
312 BIO_write(b64, buffer, strlen(buffer));
313 if (BIO_flush(b64) != 1) {
320 BIO_get_mem_ptr(b64, &ptr);
/* "<=" because one more byte is needed for the terminating NUL written
 * below. */
322 if (buffer_size <= ptr->length) {
327 /* Copy data to buffer. */
328 memcpy(buffer, ptr->data, ptr->length);
329 buffer[ptr->length] = 0;
333 } /* int base64_encode */
/* base64_decode decodes the NUL-terminated base64 string "in" (no line
 * breaks expected) through an OpenSSL BIO chain, reading at most buffer_size
 * bytes of decoded data into "buffer". */
335 static int base64_decode(char const *in, char *buffer, size_t buffer_size) {
336 /* Set up the memory-base64 chain */
337 BIO *b64 = BIO_new(BIO_f_base64());
338 BIO_set_flags(b64, BIO_FLAGS_BASE64_NO_NL);
/* A length of -1 tells OpenSSL to determine the input length with strlen(). */
339 b64 = BIO_push(b64, BIO_new_mem_buf(in, -1));
341 int status = BIO_read(b64, buffer, buffer_size);
/* NOTE(review): "status" is an int compared against a size_t; a negative
 * BIO_read() result would be converted to a large unsigned value here.
 * Confirm that negative results are handled before this check. */
346 if (status >= buffer_size) {
357 /* parse_pull parses the JSON returned by a "pull" request and returns the
 * messages in a pull_response_t*. Entries of "receivedMessages" lacking a
 * string "ackId", "message.data" or "message.messageId" are not added. The
 * strings of each stored message are heap copies made with strdup(). */
359 static pull_response_t *parse_pull(char const *body) {
360 char const *path[] = {"receivedMessages", NULL};
363 yajl_val root = yajl_tree_parse(body, errbuf, sizeof(errbuf));
365 ERROR("cloud_pubsub plugin: yajl_tree_parse failed: %s", errbuf);
369 yajl_val messages = yajl_tree_get(root, path, yajl_t_array);
370 if ((messages == NULL) || !YAJL_IS_ARRAY(messages)) {
371 yajl_tree_free(root);
375 pull_response_t *res = calloc(1, sizeof(*res));
377 yajl_tree_free(root);
381 for (size_t i = 0; i < YAJL_GET_ARRAY(messages)->len; i++) {
382 yajl_val msg_var = YAJL_GET_ARRAY(messages)->values[i];
383 char const *ackid_path[] = {"ackId", NULL};
384 char const *data_path[] = {"message", "data", NULL};
385 char const *msg_id_path[] = {"message", "messageId", NULL};
387 yajl_val ackid_var = yajl_tree_get(msg_var, ackid_path, yajl_t_string);
388 yajl_val data_var = yajl_tree_get(msg_var, data_path, yajl_t_string);
389 yajl_val msg_id_var = yajl_tree_get(msg_var, msg_id_path, yajl_t_string);
391 if ((ackid_var == NULL) || (data_var == NULL) || (msg_id_var == NULL))
/* Grow the array by one element; the result goes into a temporary so that
 * res->messages is not overwritten directly by realloc(). */
394 message_t *msg = realloc(res->messages,
395 sizeof(*res->messages) * (res->messages_num + 1));
/* Point "msg" at the new last slot of the array. */
399 msg = res->messages + res->messages_num;
/* NOTE(review): the strdup() results are not checked in the lines visible
 * here. */
401 msg->ack_id = strdup(YAJL_GET_STRING(ackid_var));
402 msg->message_id = strdup(YAJL_GET_STRING(msg_id_var));
403 msg->data = strdup(YAJL_GET_STRING(data_var));
408 yajl_tree_free(root);
410 } /* pull_response_t *parse_pull */
412 /* pubsub_topic_pull calls a subscription's "pull" method by POSTing to
 * "<url>/projects/<project>/subscriptions/<name>:pull". The server's reply is
 * stored in ret_content. */
413 static int pubsub_topic_pull(topic_t *t, blob_t *ret_content) {
415 snprintf(url, sizeof(url), "%s/projects/%s/subscriptions/%s:pull", t->url,
416 t->project, t->name);
/* NOTE(review): both values are sent as JSON strings ("false" and a quoted
 * number) rather than as a boolean / number -- confirm the API accepts
 * this. */
419 snprintf(payload, sizeof(payload),
420 "{\"returnImmediately\":\"false\",\"maxMessages\":\"%zu\"}",
423 int status = do_post(t, url, payload, ret_content);
428 ERROR("cloud_pubsub plugin: POST %s: unexpected response code: got %d, "
/* Drop the response body so the caller does not see a stale reply. */
431 sfree(ret_content->data);
432 ret_content->size = 0;
436 } /* int pubsub_topic_pull */
/* format_ack builds the JSON document {"ackIds":[...]} from the given
 * acknowledgement IDs using yajl's generator and returns a heap copy of it
 * made with strdup(). */
438 static char *format_ack(char **ack_ids, size_t ack_ids_num) {
440 unsigned char const *out;
443 char key[] = "ackIds";
446 gen = yajl_gen_alloc(NULL);
450 yajl_gen_map_open(gen);
451 yajl_gen_string(gen, (unsigned char *)&key[0], (unsigned int)strlen(key));
452 yajl_gen_array_open(gen);
453 for (i = 0; i < ack_ids_num; i++)
454 yajl_gen_string(gen, (unsigned char *)ack_ids[i],
455 (unsigned int)strlen(ack_ids[i]));
456 yajl_gen_array_close(gen);
457 yajl_gen_map_close(gen);
/* "out" points into memory managed by the generator, hence the copy below. */
459 if (yajl_gen_get_buf(gen, &out, &out_size) != yajl_gen_status_ok) {
464 ret = strdup((char const *)out);
467 } /* char *format_ack */
469 /* pubsub_topic_ack calls a subscription's "ack" method: the IDs are
 * formatted with format_ack() and POSTed to
 * "<url>/projects/<project>/subscriptions/<name>:acknowledge". */
470 static int pubsub_topic_ack(topic_t *t, char **ack_ids, size_t ack_ids_num) {
472 snprintf(url, sizeof(url), "%s/projects/%s/subscriptions/%s:acknowledge",
473 t->url, t->project, t->name);
/* NOTE(review): format_ack() returns heap memory; confirm that "payload" is
 * checked for NULL and freed in the lines not visible here. */
475 char *payload = format_ack(ack_ids, ack_ids_num);
477 blob_t response = {0};
479 int status = do_post(t, url, payload, &response);
482 ERROR("cloud_pubsub plugin: POST %s failed", url);
485 sfree(response.data);
488 ERROR("cloud_pubsub plugin: POST %s: unexpected response code: got %d, "
494 } /* int pubsub_topic_ack */
496 /* topic_tryadd tries to add a value list to the topic's buffer. If
 * this fails, for example because there is not enough space, it may leave the
 * buffer in a modified / inconsistent state.
 * The value list is formatted as JSON, base64-encoded in place and appended
 * as the "data" member of a new entry of the "messages" array. */
499 static int topic_tryadd(topic_t *t, data_set_t const *ds,
500 value_list_t const *vl) {
502 size_t json_fill = 0;
503 size_t json_free = sizeof(json);
506 status = format_json_initialize(json, &json_fill, &json_free);
510 status = format_json_value_list(json, &json_fill, &json_free, ds, vl,
515 status = format_json_finalize(json, &json_fill, &json_free);
519 status = base64_encode(json, sizeof(json));
/* An empty buffer gets the opening of the request document; otherwise the
 * previous message is closed and a new one is started. */
523 if (t->buffer->used == 0) /* initialize */
524 status = strbuf_add(t->buffer, "{\"messages\":[{\"data\":\"");
526 status = strbuf_add(t->buffer, "\"},{\"data\":\"");
530 return strbuf_add(t->buffer, json);
531 } /* int topic_tryadd */
533 /* topic_add adds a value list to the topic's buffer in an
 * all-or-nothing fashion, i.e. if there is not enough space, it will not
 * modify the buffer and return ENOMEM. */
537 static int topic_add(topic_t *t, data_set_t const *ds, value_list_t const *vl) {
/* Remember the fill state so a failed attempt can be rolled back. */
538 size_t saved_used = t->buffer->used;
539 size_t saved_free = t->buffer->free;
542 status = topic_tryadd(t, ds, vl);
/* Presumably keeps room for the trailer ("\"}]}") that topic_flush_locked()
 * appends, plus the terminating NUL -- TODO confirm. */
543 if ((status == 0) && (t->buffer->free < 5))
/* Roll back: restore the counters and cut the string at its old end. */
549 t->buffer->used = saved_used;
550 t->buffer->free = saved_free;
551 t->buffer->buffer[t->buffer->used] = 0;
554 } /* int topic_add */
/* handle_message base64-decodes the data of one received message, parses the
 * result as JSON into value lists and dispatches each of them with
 * plugin_dispatch_values(). */
556 static int handle_message(message_t msg) {
/* Base64 turns every 3 bytes into 4 characters, so the decoded data needs at
 * most 3/4 of the encoded length, plus one byte for a terminating NUL. */
557 size_t json_size = ((strlen(msg.data) * 3) / 4) + 1;
/* NOTE(review): variable-length array on the stack, sized by the length of
 * data received from the server -- consider a heap allocation for large
 * messages. */
558 char json[json_size];
560 int status = base64_decode(msg.data, json, json_size);
562 ERROR("cloud_pubsub plugin: base64_decode failed: %d", status);
566 value_list_t **value_lists = NULL;
567 size_t value_lists_num = 0;
568 status = parse_json(json, &value_lists, &value_lists_num);
570 ERROR("cloud_pubsub plugin: parse_json() failed: %d", status);
574 for (size_t i = 0; i < value_lists_num; i++) {
575 value_list_t *vl = value_lists[i];
/* This "status" shadows the function-level variable of the same name. */
577 int status = plugin_dispatch_values(vl);
579 NOTICE("cloud_pubsub plugin: plugin_dispatch_values() failed: %d",
589 } /* int handle_message */
/* topic_pull performs one pull cycle: it fetches messages with
 * pubsub_topic_pull(), parses them with parse_pull(), passes each one to
 * handle_message() and acknowledges the collected ack IDs with
 * pubsub_topic_ack(). */
591 static int topic_pull(topic_t *t) {
593 int status = pubsub_topic_pull(t, &json);
598 pull_response_t *res = parse_pull(json.data);
603 char **ack_ids = NULL;
604 size_t ack_ids_num = 0;
605 for (size_t i = 0; i < res->messages_num; i++) {
/* Struct copy: "msg" shares its string pointers with res->messages[i]. */
606 message_t msg = res->messages[i];
608 int status = handle_message(msg);
610 strarray_add(&ack_ids, &ack_ids_num, msg.ack_id);
614 sfree(msg.message_id);
617 sfree(res->messages);
/* Nothing to acknowledge means no request is sent. */
620 if (ack_ids_num > 0) {
621 int status = pubsub_topic_ack(t, ack_ids, ack_ids_num);
622 strarray_free(ack_ids, ack_ids_num);
627 } /* int topic_pull */
/* receive_thread is the thread function started for subscriptions by
 * cps_config_topic(). It calls topic_pull() and, when that fails, sleeps for
 * a delay that is capped at 10 seconds before continuing. */
629 static void *receive_thread(void *arg) {
634 int status = topic_pull(t);
640 /* failure: use exponential backoff. */
642 delay = MS_TO_CDTIME_T(64);
/* Upper bound for the back-off delay. */
646 if (delay > TIME_T_TO_CDTIME_T(10)) {
647 delay = TIME_T_TO_CDTIME_T(10);
650 "cloud_pubsub plugin: topic_pull() failed, sleeping for %.3f seconds.",
651 CDTIME_T_TO_DOUBLE(delay));
/* nanosleep() gets "ts" both as the requested and as the remaining time, and
 * the loop runs until no time is left -- presumably so that an interrupted
 * sleep is resumed; the handling of "status" is not visible here. */
653 struct timespec ts = CDTIME_T_TO_TIMESPEC(delay);
654 while (ts.tv_sec != 0 || ts.tv_nsec != 0) {
655 int status = nanosleep(&ts, &ts);
663 } /* void *receive_thread */
665 /* topic_flush_locked flushes a topic's internal buffer. You must hold t->lock
 * when calling this function.
 * The buffered document is closed, sent with pubsub_topic_publish() and the
 * buffer is reset afterwards. */
667 static int topic_flush_locked(topic_t *t) {
/* An empty buffer means there is nothing to send. */
670 if (t->buffer->used == 0)
673 /* finalize the buffer */
674 status = strbuf_add(t->buffer, "\"}]}");
676 strbuf_reset(t->buffer);
/* The buffer is reset right after the publish call, before its status is
 * looked at. */
680 status = pubsub_topic_publish(t);
681 strbuf_reset(t->buffer);
684 } /* int topic_flush_locked */
686 static int cps_flush(__attribute__((unused)) cdtime_t timeout,
687 __attribute__((unused)) char const *id, user_data_t *ud) {
688 topic_t *t = ud->data;
690 pthread_mutex_lock(&t->lock);
691 int status = topic_flush_locked(t);
692 pthread_mutex_unlock(&t->lock);
695 } /* int cps_flush */
/* cps_write is the write callback registered for "Publish" topics. While
 * holding the topic's lock it adds the value list to the buffer; if that
 * reports ENOMEM the buffer is flushed and the add is tried once more. */
697 static int cps_write(data_set_t const *ds, value_list_t const *vl,
699 topic_t *t = ud->data;
701 pthread_mutex_lock(&t->lock);
703 int status = topic_add(t, ds, vl);
/* Any result other than ENOMEM (success included) ends the call here; only
 * ENOMEM continues to the flush-and-retry path below. */
704 if (status != ENOMEM) {
706 ERROR("cloud_pubsub plugin: topic_add (\"%s\") failed with status %d.",
708 pthread_mutex_unlock(&t->lock);
712 status = topic_flush_locked(t);
714 ERROR("cloud_pubsub plugin: topic_flush_locked (\"%s\") failed with status "
717 pthread_mutex_unlock(&t->lock);
/* Second attempt, after the flush above. */
721 status = topic_add(t, ds, vl);
723 ERROR("cloud_pubsub plugin: topic_add[retry] (\"%s\") failed with status "
727 pthread_mutex_unlock(&t->lock);
729 } /* int cps_write */
/* cps_init is the plugin's init callback (registered in module_register()).
 * It performs the global libcurl initialization and loads OpenSSL's error
 * strings. */
731 static int cps_init(void) {
732 /* Call this while collectd is still single-threaded to avoid
 * initialization issues in libgcrypt. */
734 curl_global_init(CURL_GLOBAL_SSL);
736 ERR_load_crypto_strings();
741 static void check_scope(char const *email) /* {{{ */
743 char *scope = gce_scope(email);
745 WARNING("cloud_pubsub plugin: Unable to determine scope of this "
750 if (strstr(scope, CLOUD_PUBSUB_SCOPE) == NULL) {
753 /* Strip trailing newline characers for printing. */
754 scope_len = strlen(scope);
755 while ((scope_len > 0) && (iscntrl((int)scope[scope_len - 1])))
756 scope[--scope_len] = 0;
758 WARNING("cloud_pubsub plugin: The determined scope of this instance "
759 "(\"%s\") does not contain the monitoring scope (\"%s\"). You need "
760 "to add this scope to the list of scopes passed to gcutil with "
761 "--service_account_scopes when creating the instance. "
762 "Alternatively, to use this plugin on an instance which does not "
763 "have this scope, use a Service Account.",
764 scope, CLOUD_PUBSUB_SCOPE);
768 } /* }}} void check_scope */
/* cps_config_topic handles one "Publish" or "Subscribe" configuration block.
 * It allocates a topic, reads the block's options, sets up authentication
 * (credential file, application default credentials, or the GCE metadata
 * service), determines the project and then either registers the write and
 * flush callbacks ("Publish") or starts a receive thread (any other key). */
770 static int cps_config_topic(oconfig_item_t *ci) {
771 topic_t *t = topic_alloc();
/* The block's argument is the topic / subscription name. */
775 if (cf_util_get_string(ci, &t->name) != 0) {
780 char *credential_file = NULL;
781 int conf_buffer_size = CLOUD_PUBSUB_BUFFER_SIZE;
783 for (int i = 0; i < ci->children_num; i++) {
784 oconfig_item_t *child = ci->children + i;
786 if (strcasecmp("Project", child->key) == 0)
787 cf_util_get_string(child, &t->project);
788 else if (strcasecmp("Email", child->key) == 0)
789 cf_util_get_string(child, &t->email);
790 else if (strcasecmp("Url", child->key) == 0)
791 cf_util_get_string(child, &t->url);
792 else if (strcasecmp("CredentialFile", child->key) == 0)
793 cf_util_get_string(child, &credential_file);
794 else if (strcasecmp("StoreRates", child->key) == 0)
795 cf_util_get_boolean(child, &t->store_rates);
796 else if (strcasecmp("BufferSize", child->key) == 0) {
/* Values below 1024 are raised to 1024. */
797 cf_util_get_int(child, &conf_buffer_size);
798 if (conf_buffer_size < 1024) {
799 ERROR("cloud_pubsub plugin: BufferSize %d is too small. Using 1024 "
802 conf_buffer_size = 1024;
804 } else if (strcasecmp("MaxMessages", child->key) == 0) {
805 int max_messages = 0;
806 if (cf_util_get_int(child, &max_messages) != 0) {
809 if (max_messages < 1) {
810 ERROR("cloud_pubsub plugin: MaxMessages %d is too small.",
814 t->max_messages = (size_t)max_messages;
816 ERROR("cloud_pubsub plugin: Unknown configuration option: \"%s\"",
820 /* Set up authentication */
821 /* Option 1: Credentials file given => use service account */
822 if (credential_file != NULL) {
824 oauth_create_google_file(credential_file, CLOUD_PUBSUB_SCOPE);
825 if (cfg.oauth == NULL) {
826 ERROR("cloud_pubsub plugin: oauth_create_google_file failed");
/* A project configured explicitly wins; otherwise the project ID that came
 * with the credentials is adopted. */
832 if (t->project == NULL) {
833 t->project = cfg.project_id;
834 INFO("cloud_pubsub plugin: Automatically detected project ID: \"%s\"",
837 sfree(cfg.project_id);
841 /* Option 2: Look for credentials in well-known places */
842 if (t->auth == NULL) {
843 oauth_google_t cfg = oauth_create_google_default(CLOUD_PUBSUB_SCOPE);
846 if (t->project == NULL) {
847 t->project = cfg.project_id;
848 INFO("cloud_pubsub plugin: Automatically detected project ID: \"%s\"",
851 sfree(cfg.project_id);
/* The "Email" option only matters for the metadata service (option 3). */
855 if ((t->auth != NULL) && (t->email != NULL)) {
856 NOTICE("cloud_pubsub plugin: A service account email was configured "
858 "not used for authentication because %s used instead.",
859 (credential_file != NULL) ? "a credential file was"
860 : "application default credentials were");
863 /* Option 3: Running on GCE => use metadata service */
864 if ((t->auth == NULL) && gce_check()) {
865 check_scope(t->email);
866 } else if (t->auth == NULL) {
/* NOTE(review): the message names a "Credentials" option, while the option
 * parsed above is called "CredentialFile" -- confirm the wording. */
867 ERROR("cloud_pubsub plugin: Unable to determine credentials. Please "
869 "specify the \"Credentials\" option or set up Application Default "
/* Last resort for the project: ask the GCE metadata service. */
875 if ((t->project == NULL) && gce_check()) {
876 t->project = gce_project_id();
878 if (t->project == NULL) {
879 ERROR("cloud_pubsub plugin: Unable to determine the project number. "
880 "Please specify the \"Project\" option manually.");
/* Only publishing topics need a buffer. */
885 if (strcasecmp("Publish", ci->key) == 0) {
886 t->buffer = strbuf_create((size_t)conf_buffer_size);
887 if (t->buffer == NULL) {
888 ERROR("cloud_pubsub plugin: strbuf_create failed.");
893 assert(t->name != NULL);
894 assert(t->project != NULL);
897 snprintf(cbname, sizeof(cbname), "cloud_pubsub/%s", t->name);
/* The write callback's user data carries the free function; the flush
 * callback below gets the same topic without one. */
899 plugin_register_write(cbname, cps_write,
901 .data = t, .free_func = (void *)topic_free,
903 plugin_register_flush(cbname, cps_flush, &(user_data_t){.data = t});
904 } else { /* if (strcasecmp ("Subscribe", ci->key) == 0) */
907 /* TODO(octo): Store thread_id and kill threads in shutdown. */
908 int status = plugin_thread_create(&tid,
909 /* attrs = */ NULL, receive_thread,
911 /* name = */ "cloud_pubsub recv");
/* NOTE(review): the message is built from errno although the result of
 * plugin_thread_create() is held in "status" -- confirm errno is set. */
914 ERROR("cloud_pubsub plugin: pthread_create failed: %s",
915 sstrerror(errno, errbuf, sizeof(errbuf)));
922 } /* int cps_config_topic */
/* cps_config is the plugin's top-level configuration callback (registered in
 * module_register()). Every "Publish" and "Subscribe" child block is passed
 * to cps_config_topic(); any other key is reported as an unknown option. */
924 static int cps_config(oconfig_item_t *ci) {
925 for (int i = 0; i < ci->children_num; i++) {
926 oconfig_item_t *child = ci->children + i;
/* Both block types share one handler, which tells them apart by the key. */
929 if (strcasecmp("Publish", child->key) == 0)
930 status = cps_config_topic(child);
931 else if (strcasecmp("Subscribe", child->key) == 0)
932 status = cps_config_topic(child);
934 ERROR("cloud_pubsub plugin: Unknown configuration option: \"%s\"",
942 } /* int cps_config */
944 void module_register(void) {
945 plugin_register_complex_config("cloud_pubsub", cps_config);
946 plugin_register_init("cloud_pubsub", cps_init);