cloud_pubsub plugin: New plugin publishing to or subscribing from Google Cloud Pub...
[collectd.git] / src / cloud_pubsub.c
1 /**
2  * collectd - src/cloud_pubsub.c
3  * Copyright (C) 2018  Florian Forster
4  *
5  * MIT License
6  *
7  * Permission is hereby granted, free of charge, to any person obtaining a copy
8  * of this software and associated documentation files (the "Software"), to deal
9  * in the Software without restriction, including without limitation the rights
10  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11  * copies of the Software, and to permit persons to whom the Software is
12  * furnished to do so, subject to the following conditions:
13  *
14  * The above copyright notice and this permission notice shall be included in
15  * all copies or substantial portions of the Software.
16  *
17  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
23  * SOFTWARE.
24  *
25  * Authors:
26  *   Florian Forster <octo at collectd.org>
27  **/
28
29 #include "collectd.h"
30
31 #include "common.h"
32 #include "plugin.h"
33 #include "utils_format_json.h"
34 #include "utils_gce.h"
35 #include "utils_oauth.h"
36 #include "utils_parse_json.h"
37 #include "utils_strbuf.h"
38
39 #include <curl/curl.h>
40 #include <openssl/bio.h>
41 #include <openssl/buffer.h>
42 #include <openssl/err.h>
43 #include <openssl/evp.h> /* for BIO_f_base64() */
44
45 #include <yajl/yajl_gen.h>
46 #include <yajl/yajl_tree.h>
47
48 #ifndef CLOUD_PUBSUB_BUFFER_SIZE
49 #define CLOUD_PUBSUB_BUFFER_SIZE 65536
50 #endif
51
52 #ifndef CLOUD_PUBSUB_URL
53 #define CLOUD_PUBSUB_URL "https://pubsub.googleapis.com/v1"
54 #endif
55
56 #ifndef CLOUD_PUBSUB_SCOPE
57 #define CLOUD_PUBSUB_SCOPE "https://www.googleapis.com/auth/pubsub"
58 #endif
59
60 struct topic_s {
61   char *name;
62
63   char *email;
64   char *project;
65   char *url;
66   _Bool store_rates;
67   size_t max_messages;
68
69   oauth_t *auth;
70   CURL *curl;
71   char curl_errbuf[256];
72   strbuf_t *buffer;
73   pthread_mutex_t lock;
74 };
75 typedef struct topic_s topic_t;
76
77 struct message_s {
78   char *ack_id;
79   char *data;
80   char *message_id;
81 };
82 typedef struct message_s message_t;
83
84 struct pull_response_s {
85   message_t *messages;
86   size_t messages_num;
87 };
88 typedef struct pull_response_s pull_response_t;
89
90 struct blob_s {
91   char *data;
92   size_t size;
93 };
94 typedef struct blob_s blob_t;
95
96 static size_t write_callback(void *contents, size_t size, size_t nmemb,
97                              void *ud) {
98   size_t realsize = size * nmemb;
99   blob_t *blob = ud;
100
101   if ((0x7FFFFFF0 < blob->size) || (0x7FFFFFF0 - blob->size < realsize)) {
102     ERROR("cloud_pubsub plugin: write_callback: integer overflow");
103     return 0;
104   }
105
106   blob->data = realloc(blob->data, blob->size + realsize + 1);
107   if (blob->data == NULL) {
108     /* out of memory! */
109     ERROR("cloud_pubsub plugin: write_callback: not enough memory (realloc "
110           "returned NULL)");
111     return 0;
112   }
113
114   memcpy(blob->data + blob->size, contents, realsize);
115   blob->size += realsize;
116   blob->data[blob->size] = 0;
117
118   return realsize;
119 } /* size_t write_callback */
120
121 static int access_token(topic_t *t, char *buffer, size_t buffer_size) {
122   /* t->auth is NULL only if we're running on GCE. */
123   assert(t->auth || gce_check());
124
125   if (t->auth != NULL)
126     return oauth_access_token(t->auth, buffer, buffer_size);
127
128   return gce_access_token(t->email, buffer, buffer_size);
129 }
130
131 typedef struct {
132   int code;
133   char *message;
134 } api_error_t;
135
136 static api_error_t *parse_api_error(char const *body) {
137   char errbuf[1024];
138   yajl_val root = yajl_tree_parse(body, errbuf, sizeof(errbuf));
139   if (root == NULL) {
140     ERROR("cloud_pubsub plugin: yajl_tree_parse failed: %s", errbuf);
141     return NULL;
142   }
143
144   api_error_t *err = calloc(1, sizeof(*err));
145   if (err == NULL) {
146     ERROR("cloud_pubsub plugin: calloc failed");
147     yajl_tree_free(root);
148     return NULL;
149   }
150
151   yajl_val code = yajl_tree_get(root, (char const *[]){"error", "code", NULL},
152                                 yajl_t_number);
153   if (code != NULL) {
154     err->code = YAJL_GET_INTEGER(code);
155   }
156
157   yajl_val message = yajl_tree_get(
158       root, (char const *[]){"error", "message", NULL}, yajl_t_string);
159   if (message != NULL) {
160     err->message = strdup(YAJL_GET_STRING(message));
161   }
162
163   return err;
164 }
165
166 static char *api_error_string(api_error_t *err, char *buffer,
167                               size_t buffer_size) {
168   if (err == NULL) {
169     strncpy(buffer, "Unknown error (API error is NULL)", buffer_size);
170   } else if (err->message == NULL) {
171     snprintf(buffer, buffer_size, "API error %d", err->code);
172   } else {
173     snprintf(buffer, buffer_size, "API error %d: %s", err->code, err->message);
174   }
175
176   return buffer;
177 }
178 #define API_ERROR_STRING(err) api_error_string(err, (char[1024]){""}, 1024)
179
180 // do_post does a HTTP POST request, assuming a JSON payload and using OAuth
181 // authentication. Returns -1 on error and the HTTP status code otherwise.
182 // ret_content, if not NULL, will contain the server's response.
183 // If ret_content is provided and the server responds with a 4xx or 5xx error,
184 // an appropriate message will be logged.
185 static int do_post(topic_t *t, char const *url, void const *payload,
186                    blob_t *ret_content) {
187   if (t->curl == NULL) {
188     t->curl = curl_easy_init();
189     if (t->curl == NULL) {
190       ERROR("cloud_pubsub plugin: curl_easy_init() failed");
191       return -1;
192     }
193
194     curl_easy_setopt(t->curl, CURLOPT_ERRORBUFFER, t->curl_errbuf);
195     curl_easy_setopt(t->curl, CURLOPT_NOSIGNAL, 1L);
196   }
197
198   curl_easy_setopt(t->curl, CURLOPT_POST, 1L);
199   curl_easy_setopt(t->curl, CURLOPT_URL, url);
200
201   /* header */
202   char tok[256];
203   int status = access_token(t, tok, sizeof(tok));
204   if (status != 0) {
205     ERROR("cloud_pubsub plugin: getting access token failed with status %d",
206           status);
207     return -1;
208   }
209   char auth_header[384];
210   snprintf(auth_header, sizeof(auth_header), "Authorization: Bearer %s", tok);
211
212   struct curl_slist *headers =
213       curl_slist_append(NULL, "Content-Type: application/json");
214   headers = curl_slist_append(headers, auth_header);
215   curl_easy_setopt(t->curl, CURLOPT_HTTPHEADER, headers);
216
217   curl_easy_setopt(t->curl, CURLOPT_POSTFIELDS, payload);
218
219   curl_easy_setopt(t->curl, CURLOPT_WRITEFUNCTION,
220                    ret_content ? write_callback : NULL);
221   curl_easy_setopt(t->curl, CURLOPT_WRITEDATA, ret_content);
222
223   status = curl_easy_perform(t->curl);
224
225   /* clean up that has to happen in any case */
226   curl_slist_free_all(headers);
227   curl_easy_setopt(t->curl, CURLOPT_HTTPHEADER, NULL);
228   curl_easy_setopt(t->curl, CURLOPT_WRITEFUNCTION, NULL);
229   curl_easy_setopt(t->curl, CURLOPT_WRITEDATA, NULL);
230
231   if (status != CURLE_OK) {
232     ERROR("cloud_pubsub plugin: POST %s failed: %s", url, t->curl_errbuf);
233     sfree(ret_content->data);
234     ret_content->size = 0;
235     return -1;
236   }
237
238   long http_code = 0;
239   curl_easy_getinfo(t->curl, CURLINFO_RESPONSE_CODE, &http_code);
240
241   if (ret_content != NULL) {
242     if ((status >= 400) && (status < 500)) {
243       ERROR("write_stackdriver plugin: POST %s: %s", url,
244             API_ERROR_STRING(parse_api_error(ret_content->data)));
245     } else if (status >= 500) {
246       WARNING("write_stackdriver plugin: POST %s: %s", url, ret_content->data);
247     }
248   }
249
250   return (int)http_code;
251 }
252
253 static void topic_free(topic_t *t) {
254   if (t == NULL)
255     return;
256
257   sfree(t->name);
258   oauth_destroy(t->auth);
259   sfree(t->email);
260   sfree(t->project);
261   if (t->curl != NULL)
262     curl_easy_cleanup(t->curl);
263   strbuf_destroy(t->buffer);
264   sfree(t);
265 } /* void topic_free */
266
267 static topic_t *topic_alloc(void) {
268   topic_t *t = calloc(1, sizeof(*t));
269   if (t == NULL)
270     return NULL;
271
272   t->url = strdup(CLOUD_PUBSUB_URL);
273   t->max_messages = 1000;
274   pthread_mutex_init(&t->lock, /* attr = */ NULL);
275
276   return t;
277 } /* topic_t *topic_alloc */
278
279 /* pubsub_topic_publish calls a topic's "publish" method. */
280 static int pubsub_topic_publish(topic_t *t) {
281   char url[1024];
282   snprintf(url, sizeof(url), "%s/projects/%s/topics/%s:publish", t->url,
283            t->project, t->name);
284
285   char *payload = t->buffer->buffer;
286
287   blob_t response = {0};
288
289   int status = do_post(t, url, payload, &response);
290   if (status == -1) {
291     ERROR("cloud_pubsub plugin: POST %s failed", url);
292     return -1;
293   }
294   sfree(response.data);
295
296   if (status != 200) {
297     ERROR("cloud_pubsub plugin: POST %s: unexpected response code: got %d, "
298           "want 200",
299           url, status);
300     return -1;
301   }
302   return 0;
303 } /* int pubsub_topic_publish */
304
305 static int base64_encode(char *buffer, size_t buffer_size) {
306   /* Set up the memory-base64 chain */
307   BIO *b64 = BIO_new(BIO_f_base64());
308   BIO_set_flags(b64, BIO_FLAGS_BASE64_NO_NL);
309   b64 = BIO_push(b64, BIO_new(BIO_s_mem()));
310
311   /* Write data to the chain */
312   BIO_write(b64, buffer, strlen(buffer));
313   if (BIO_flush(b64) != 1) {
314     BIO_free_all(b64);
315     return (-1);
316   }
317
318   /* Never fails */
319   BUF_MEM *ptr;
320   BIO_get_mem_ptr(b64, &ptr);
321
322   if (buffer_size <= ptr->length) {
323     BIO_free_all(b64);
324     return (ENOMEM);
325   }
326
327   /* Copy data to buffer. */
328   memcpy(buffer, ptr->data, ptr->length);
329   buffer[ptr->length] = 0;
330
331   BIO_free_all(b64);
332   return 0;
333 } /* int base64_encode */
334
335 static int base64_decode(char const *in, char *buffer, size_t buffer_size) {
336   /* Set up the memory-base64 chain */
337   BIO *b64 = BIO_new(BIO_f_base64());
338   BIO_set_flags(b64, BIO_FLAGS_BASE64_NO_NL);
339   b64 = BIO_push(b64, BIO_new_mem_buf(in, -1));
340
341   int status = BIO_read(b64, buffer, buffer_size);
342   if (status < 0) {
343     BIO_free_all(b64);
344     return status;
345   }
346   if (status >= buffer_size) {
347     BIO_free_all(b64);
348     return ENOMEM;
349   }
350
351   buffer[status] = 0;
352
353   BIO_free_all(b64);
354   return 0;
355 }
356
357 /* parse_pull parses the JSON returned by a "pull" request and returns the
358  * messages in a pull_response_t*. */
359 static pull_response_t *parse_pull(char const *body) {
360   char const *path[] = {"receivedMessages", NULL};
361
362   char errbuf[1024];
363   yajl_val root = yajl_tree_parse(body, errbuf, sizeof(errbuf));
364   if (root == NULL) {
365     ERROR("cloud_pubsub plugin: yajl_tree_parse failed: %s", errbuf);
366     return NULL;
367   }
368
369   yajl_val messages = yajl_tree_get(root, path, yajl_t_array);
370   if ((messages == NULL) || !YAJL_IS_ARRAY(messages)) {
371     yajl_tree_free(root);
372     return NULL;
373   }
374
375   pull_response_t *res = calloc(1, sizeof(*res));
376   if (res == NULL) {
377     yajl_tree_free(root);
378     return NULL;
379   }
380
381   for (size_t i = 0; i < YAJL_GET_ARRAY(messages)->len; i++) {
382     yajl_val msg_var = YAJL_GET_ARRAY(messages)->values[i];
383     char const *ackid_path[] = {"ackId", NULL};
384     char const *data_path[] = {"message", "data", NULL};
385     char const *msg_id_path[] = {"message", "messageId", NULL};
386
387     yajl_val ackid_var = yajl_tree_get(msg_var, ackid_path, yajl_t_string);
388     yajl_val data_var = yajl_tree_get(msg_var, data_path, yajl_t_string);
389     yajl_val msg_id_var = yajl_tree_get(msg_var, msg_id_path, yajl_t_string);
390
391     if ((ackid_var == NULL) || (data_var == NULL) || (msg_id_var == NULL))
392       continue;
393
394     message_t *msg = realloc(res->messages,
395                              sizeof(*res->messages) * (res->messages_num + 1));
396     if (msg == NULL)
397       continue;
398     res->messages = msg;
399     msg = res->messages + res->messages_num;
400
401     msg->ack_id = strdup(YAJL_GET_STRING(ackid_var));
402     msg->message_id = strdup(YAJL_GET_STRING(msg_id_var));
403     msg->data = strdup(YAJL_GET_STRING(data_var));
404
405     res->messages_num++;
406   }
407
408   yajl_tree_free(root);
409   return res;
410 } /* pull_response_t *parse_pull */
411
412 /* pubsub_topic_pull calls a subscription's "pull" method. */
413 static int pubsub_topic_pull(topic_t *t, blob_t *ret_content) {
414   char url[1024];
415   snprintf(url, sizeof(url), "%s/projects/%s/subscriptions/%s:pull", t->url,
416            t->project, t->name);
417
418   char payload[128];
419   snprintf(payload, sizeof(payload),
420            "{\"returnImmediately\":\"false\",\"maxMessages\":\"%zu\"}",
421            t->max_messages);
422
423   int status = do_post(t, url, payload, ret_content);
424   if (status == -1)
425     return -1;
426
427   if (status != 200) {
428     ERROR("cloud_pubsub plugin: POST %s: unexpected response code: got %d, "
429           "want 200",
430           url, status);
431     sfree(ret_content->data);
432     ret_content->size = 0;
433     return -1;
434   }
435   return 0;
436 } /* char *pubsub_topic_pull */
437
438 static char *format_ack(char **ack_ids, size_t ack_ids_num) {
439   yajl_gen gen;
440   unsigned char const *out;
441   size_t out_size;
442   char *ret;
443   char key[] = "ackIds";
444   size_t i;
445
446   gen = yajl_gen_alloc(NULL);
447   if (gen == NULL)
448     return NULL;
449
450   yajl_gen_map_open(gen);
451   yajl_gen_string(gen, (unsigned char *)&key[0], (unsigned int)strlen(key));
452   yajl_gen_array_open(gen);
453   for (i = 0; i < ack_ids_num; i++)
454     yajl_gen_string(gen, (unsigned char *)ack_ids[i],
455                     (unsigned int)strlen(ack_ids[i]));
456   yajl_gen_array_close(gen);
457   yajl_gen_map_close(gen);
458
459   if (yajl_gen_get_buf(gen, &out, &out_size) != yajl_gen_status_ok) {
460     yajl_gen_free(gen);
461     return NULL;
462   }
463
464   ret = strdup((char const *)out);
465   yajl_gen_free(gen);
466   return ret;
467 } /* char *format_ack */
468
469 /* pubsub_topic_ack calls a subscription's "ack" method. */
470 static int pubsub_topic_ack(topic_t *t, char **ack_ids, size_t ack_ids_num) {
471   char url[1024];
472   snprintf(url, sizeof(url), "%s/projects/%s/subscriptions/%s:acknowledge",
473            t->url, t->project, t->name);
474
475   char *payload = format_ack(ack_ids, ack_ids_num);
476
477   blob_t response = {0};
478
479   int status = do_post(t, url, payload, &response);
480   sfree(payload);
481   if (status == -1) {
482     ERROR("cloud_pubsub plugin: POST %s failed", url);
483     return -1;
484   }
485   sfree(response.data);
486
487   if (status != 200) {
488     ERROR("cloud_pubsub plugin: POST %s: unexpected response code: got %d, "
489           "want 200",
490           url, status);
491     return -1;
492   }
493   return 0;
494 } /* int pubsub_topic_ack */
495
496 /* topic_tryadd tries to add a valud list to the topic's buffer. If
497  * this fails, for example because there is not enough space, it may leave the
498  * buffer in a modified / inconsistent state. */
499 static int topic_tryadd(topic_t *t, data_set_t const *ds,
500                         value_list_t const *vl) {
501   char json[1024];
502   size_t json_fill = 0;
503   size_t json_free = sizeof(json);
504   int status;
505
506   status = format_json_initialize(json, &json_fill, &json_free);
507   if (status != 0)
508     return status;
509
510   status = format_json_value_list(json, &json_fill, &json_free, ds, vl,
511                                   t->store_rates);
512   if (status != 0)
513     return status;
514
515   status = format_json_finalize(json, &json_fill, &json_free);
516   if (status != 0)
517     return status;
518
519   status = base64_encode(json, sizeof(json));
520   if (status != 0)
521     return -1;
522
523   if (t->buffer->used == 0) /* initialize */
524     status = strbuf_add(t->buffer, "{\"messages\":[{\"data\":\"");
525   else
526     status = strbuf_add(t->buffer, "\"},{\"data\":\"");
527   if (status != 0)
528     return status;
529
530   return strbuf_add(t->buffer, json);
531 } /* int topic_tryadd */
532
533 /* topic_add adds a value list to the topic's buffer in an
534  * all-or-nothing fashion, i.e. if there is not enough space, it will not
535  * modify
536  * the buffer and return ENOMEM. */
537 static int topic_add(topic_t *t, data_set_t const *ds, value_list_t const *vl) {
538   size_t saved_used = t->buffer->used;
539   size_t saved_free = t->buffer->free;
540   int status;
541
542   status = topic_tryadd(t, ds, vl);
543   if ((status == 0) && (t->buffer->free < 5))
544     status = ENOMEM;
545
546   if (status == 0)
547     return 0;
548
549   t->buffer->used = saved_used;
550   t->buffer->free = saved_free;
551   t->buffer->buffer[t->buffer->used] = 0;
552
553   return status;
554 } /* int topic_add */
555
556 static int handle_message(message_t msg) {
557   size_t json_size = ((strlen(msg.data) * 3) / 4) + 1;
558   char json[json_size];
559
560   int status = base64_decode(msg.data, json, json_size);
561   if (status != 0) {
562     ERROR("cloud_pubsub plugin: base64_decode failed: %d", status);
563     return status;
564   }
565
566   value_list_t **value_lists = NULL;
567   size_t value_lists_num = 0;
568   status = parse_json(json, &value_lists, &value_lists_num);
569   if (status != 0) {
570     ERROR("cloud_pubsub plugin: parse_json() failed: %d", status);
571     return status;
572   }
573
574   for (size_t i = 0; i < value_lists_num; i++) {
575     value_list_t *vl = value_lists[i];
576
577     int status = plugin_dispatch_values(vl);
578     if (status != 0) {
579       NOTICE("cloud_pubsub plugin: plugin_dispatch_values() failed: %d",
580              status);
581     }
582
583     sfree(vl->values);
584     sfree(vl);
585   }
586
587   sfree(value_lists);
588   return 0;
589 } /* int handle_message */
590
591 static int topic_pull(topic_t *t) {
592   blob_t json = {0};
593   int status = pubsub_topic_pull(t, &json);
594   if (status != 0) {
595     return status;
596   }
597
598   pull_response_t *res = parse_pull(json.data);
599   sfree(json.data);
600   if (res == NULL)
601     return -1;
602
603   char **ack_ids = NULL;
604   size_t ack_ids_num = 0;
605   for (size_t i = 0; i < res->messages_num; i++) {
606     message_t msg = res->messages[i];
607
608     int status = handle_message(msg);
609     if (status == 0)
610       strarray_add(&ack_ids, &ack_ids_num, msg.ack_id);
611
612     sfree(msg.ack_id);
613     sfree(msg.data);
614     sfree(msg.message_id);
615   }
616
617   sfree(res->messages);
618   sfree(res);
619
620   if (ack_ids_num > 0) {
621     int status = pubsub_topic_ack(t, ack_ids, ack_ids_num);
622     strarray_free(ack_ids, ack_ids_num);
623     return status;
624   }
625
626   return 0;
627 } /* int topic_pull */
628
629 static void *receive_thread(void *arg) {
630   topic_t *t = arg;
631   cdtime_t delay = 0;
632
633   while (42) {
634     int status = topic_pull(t);
635     if (status == 0) {
636       delay = 0;
637       continue;
638     }
639
640     /* failure: use exponential backoff. */
641     if (delay == 0) {
642       delay = MS_TO_CDTIME_T(64);
643     } else {
644       delay = 2 * delay;
645     }
646     if (delay > TIME_T_TO_CDTIME_T(10)) {
647       delay = TIME_T_TO_CDTIME_T(10);
648     }
649     NOTICE(
650         "cloud_pubsub plugin: topic_pull() failed, sleeping for %.3f seconds.",
651         CDTIME_T_TO_DOUBLE(delay));
652
653     struct timespec ts = CDTIME_T_TO_TIMESPEC(delay);
654     while (ts.tv_sec != 0 || ts.tv_nsec != 0) {
655       int status = nanosleep(&ts, &ts);
656       if (status == 0) {
657         break;
658       }
659     }
660   }
661
662   return (NULL);
663 } /* void *receive_thread */
664
665 /* topic_flush_locked flushes a topic's internal buffer. You must hold t->lock
666  * when calling this function. */
667 static int topic_flush_locked(topic_t *t) {
668   int status;
669
670   if (t->buffer->used == 0)
671     return 0;
672
673   /* finalize the buffer */
674   status = strbuf_add(t->buffer, "\"}]}");
675   if (status != 0) {
676     strbuf_reset(t->buffer);
677     return status;
678   }
679
680   status = pubsub_topic_publish(t);
681   strbuf_reset(t->buffer);
682
683   return status;
684 } /* int topic_flush_locked */
685
686 static int cps_flush(__attribute__((unused)) cdtime_t timeout,
687                      __attribute__((unused)) char const *id, user_data_t *ud) {
688   topic_t *t = ud->data;
689
690   pthread_mutex_lock(&t->lock);
691   int status = topic_flush_locked(t);
692   pthread_mutex_unlock(&t->lock);
693
694   return status;
695 } /* int cps_flush */
696
697 static int cps_write(data_set_t const *ds, value_list_t const *vl,
698                      user_data_t *ud) {
699   topic_t *t = ud->data;
700
701   pthread_mutex_lock(&t->lock);
702
703   int status = topic_add(t, ds, vl);
704   if (status != ENOMEM) {
705     if (status != 0)
706       ERROR("cloud_pubsub plugin: topic_add (\"%s\") failed with status %d.",
707             t->name, status);
708     pthread_mutex_unlock(&t->lock);
709     return status;
710   }
711
712   status = topic_flush_locked(t);
713   if (status != 0) {
714     ERROR("cloud_pubsub plugin: topic_flush_locked (\"%s\") failed with status "
715           "%d.",
716           t->name, status);
717     pthread_mutex_unlock(&t->lock);
718     return status;
719   }
720
721   status = topic_add(t, ds, vl);
722   if (status != 0)
723     ERROR("cloud_pubsub plugin: topic_add[retry] (\"%s\") failed with status "
724           "%d.",
725           t->name, status);
726
727   pthread_mutex_unlock(&t->lock);
728   return status;
729 } /* int cps_write */
730
731 static int cps_init(void) {
732   /* Call this while collectd is still single-threaded to avoid
733    * initialization issues in libgcrypt. */
734   curl_global_init(CURL_GLOBAL_SSL);
735
736   ERR_load_crypto_strings();
737
738   return 0;
739 } /* int cps_init */
740
741 static void check_scope(char const *email) /* {{{ */
742 {
743   char *scope = gce_scope(email);
744   if (scope == NULL) {
745     WARNING("cloud_pubsub plugin: Unable to determine scope of this "
746             "instance.");
747     return;
748   }
749
750   if (strstr(scope, CLOUD_PUBSUB_SCOPE) == NULL) {
751     size_t scope_len;
752
753     /* Strip trailing newline characers for printing. */
754     scope_len = strlen(scope);
755     while ((scope_len > 0) && (iscntrl((int)scope[scope_len - 1])))
756       scope[--scope_len] = 0;
757
758     WARNING("cloud_pubsub plugin: The determined scope of this instance "
759             "(\"%s\") does not contain the monitoring scope (\"%s\"). You need "
760             "to add this scope to the list of scopes passed to gcutil with "
761             "--service_account_scopes when creating the instance. "
762             "Alternatively, to use this plugin on an instance which does not "
763             "have this scope, use a Service Account.",
764             scope, CLOUD_PUBSUB_SCOPE);
765   }
766
767   sfree(scope);
768 } /* }}} void check_scope */
769
770 static int cps_config_topic(oconfig_item_t *ci) {
771   topic_t *t = topic_alloc();
772   if (t == NULL)
773     return ENOMEM;
774
775   if (cf_util_get_string(ci, &t->name) != 0) {
776     topic_free(t);
777     return -1;
778   }
779
780   char *credential_file = NULL;
781   int conf_buffer_size = CLOUD_PUBSUB_BUFFER_SIZE;
782
783   for (int i = 0; i < ci->children_num; i++) {
784     oconfig_item_t *child = ci->children + i;
785
786     if (strcasecmp("Project", child->key) == 0)
787       cf_util_get_string(child, &t->project);
788     else if (strcasecmp("Email", child->key) == 0)
789       cf_util_get_string(child, &t->email);
790     else if (strcasecmp("Url", child->key) == 0)
791       cf_util_get_string(child, &t->url);
792     else if (strcasecmp("CredentialFile", child->key) == 0)
793       cf_util_get_string(child, &credential_file);
794     else if (strcasecmp("StoreRates", child->key) == 0)
795       cf_util_get_boolean(child, &t->store_rates);
796     else if (strcasecmp("BufferSize", child->key) == 0) {
797       cf_util_get_int(child, &conf_buffer_size);
798       if (conf_buffer_size < 1024) {
799         ERROR("cloud_pubsub plugin: BufferSize %d is too small. Using 1024 "
800               "byte.",
801               conf_buffer_size);
802         conf_buffer_size = 1024;
803       }
804     } else if (strcasecmp("MaxMessages", child->key) == 0) {
805       int max_messages = 0;
806       if (cf_util_get_int(child, &max_messages) != 0) {
807         continue;
808       }
809       if (max_messages < 1) {
810         ERROR("cloud_pubsub plugin: MaxMessages %d is too small.",
811               max_messages);
812         continue;
813       }
814       t->max_messages = (size_t)max_messages;
815     } else
816       ERROR("cloud_pubsub plugin: Unknown configuration option: \"%s\"",
817             child->key);
818   }
819
820   /* Set up authentication */
821   /* Option 1: Credentials file given => use service account */
822   if (credential_file != NULL) {
823     oauth_google_t cfg =
824         oauth_create_google_file(credential_file, CLOUD_PUBSUB_SCOPE);
825     if (cfg.oauth == NULL) {
826       ERROR("cloud_pubsub plugin: oauth_create_google_file failed");
827       topic_free(t);
828       return EINVAL;
829     }
830     t->auth = cfg.oauth;
831
832     if (t->project == NULL) {
833       t->project = cfg.project_id;
834       INFO("cloud_pubsub plugin: Automatically detected project ID: \"%s\"",
835            t->project);
836     } else {
837       sfree(cfg.project_id);
838     }
839   }
840
841   /* Option 2: Look for credentials in well-known places */
842   if (t->auth == NULL) {
843     oauth_google_t cfg = oauth_create_google_default(CLOUD_PUBSUB_SCOPE);
844     t->auth = cfg.oauth;
845
846     if (t->project == NULL) {
847       t->project = cfg.project_id;
848       INFO("cloud_pubsub plugin: Automatically detected project ID: \"%s\"",
849            t->project);
850     } else {
851       sfree(cfg.project_id);
852     }
853   }
854
855   if ((t->auth != NULL) && (t->email != NULL)) {
856     NOTICE("cloud_pubsub plugin: A service account email was configured "
857            "but is "
858            "not used for authentication because %s used instead.",
859            (credential_file != NULL) ? "a credential file was"
860                                      : "application default credentials were");
861   }
862
863   /* Option 3: Running on GCE => use metadata service */
864   if ((t->auth == NULL) && gce_check()) {
865     check_scope(t->email);
866   } else if (t->auth == NULL) {
867     ERROR("cloud_pubsub plugin: Unable to determine credentials. Please "
868           "either "
869           "specify the \"Credentials\" option or set up Application Default "
870           "Credentials.");
871     topic_free(t);
872     return EINVAL;
873   }
874
875   if ((t->project == NULL) && gce_check()) {
876     t->project = gce_project_id();
877   }
878   if (t->project == NULL) {
879     ERROR("cloud_pubsub plugin: Unable to determine the project number. "
880           "Please specify the \"Project\" option manually.");
881     topic_free(t);
882     return EINVAL;
883   }
884
885   if (strcasecmp("Publish", ci->key) == 0) {
886     t->buffer = strbuf_create((size_t)conf_buffer_size);
887     if (t->buffer == NULL) {
888       ERROR("cloud_pubsub plugin: strbuf_create failed.");
889       topic_free(t);
890       return -1;
891     }
892
893     assert(t->name != NULL);
894     assert(t->project != NULL);
895
896     char cbname[128];
897     snprintf(cbname, sizeof(cbname), "cloud_pubsub/%s", t->name);
898
899     plugin_register_write(cbname, cps_write,
900                           &(user_data_t){
901                               .data = t, .free_func = (void *)topic_free,
902                           });
903     plugin_register_flush(cbname, cps_flush, &(user_data_t){.data = t});
904   } else { /* if (strcasecmp ("Subscribe", ci->key) == 0) */
905     pthread_t tid;
906
907     /* TODO(octo): Store thread_id and kill threads in shutdown. */
908     int status = plugin_thread_create(&tid,
909                                       /* attrs = */ NULL, receive_thread,
910                                       /* arg = */ t,
911                                       /* name = */ "cloud_pubsub recv");
912     if (status != 0) {
913       char errbuf[1024];
914       ERROR("cloud_pubsub plugin: pthread_create failed: %s",
915             sstrerror(errno, errbuf, sizeof(errbuf)));
916       topic_free(t);
917       return -1;
918     }
919   }
920
921   return 0;
922 } /* int cps_config_topic */
923
924 static int cps_config(oconfig_item_t *ci) {
925   for (int i = 0; i < ci->children_num; i++) {
926     oconfig_item_t *child = ci->children + i;
927     int status = -1;
928
929     if (strcasecmp("Publish", child->key) == 0)
930       status = cps_config_topic(child);
931     else if (strcasecmp("Subscribe", child->key) == 0)
932       status = cps_config_topic(child);
933     else
934       ERROR("cloud_pubsub plugin: Unknown configuration option: \"%s\"",
935             child->key);
936
937     if (status != 0)
938       return status;
939   }
940
941   return 0;
942 } /* int cps_config */
943
944 void module_register(void) {
945   plugin_register_complex_config("cloud_pubsub", cps_config);
946   plugin_register_init("cloud_pubsub", cps_init);
947 }