2 * collectd - src/write_stackdriver.c
5 * Copyright (C) 2017 Florian Forster
7 * Permission to use, copy, modify, and/or distribute this software for any
8 * purpose with or without fee is hereby granted, provided that the above
9 * copyright notice and this permission notice appear in all copies.
11 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
12 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
13 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
14 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
15 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
16 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
17 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
20 * Florian Forster <octo at collectd.org>
26 #include "configfile.h"
28 #include "utils_format_stackdriver.h"
29 #include "utils_gce.h"
30 #include "utils_oauth.h"
32 #include <curl/curl.h>
34 #include <yajl/yajl_tree.h>
40 #define GCM_API_URL "https://monitoring.googleapis.com/v3"
43 #ifndef MONITORING_SCOPE
44 #define MONITORING_SCOPE "https://www.googleapis.com/auth/monitoring"
47 struct wg_callback_s {
52 sd_resource_t *resource;
56 sd_output_t *formatter;
58 char curl_errbuf[CURL_ERROR_SIZE];
60 size_t timeseries_count;
61 cdtime_t send_buffer_init_time;
65 typedef struct wg_callback_s wg_callback_t;
71 typedef struct wg_memory_s wg_memory_t;
73 static size_t wg_write_memory_cb(void *contents, size_t size,
74 size_t nmemb, /* {{{ */
76 size_t realsize = size * nmemb;
77 wg_memory_t *mem = (wg_memory_t *)userp;
79 if (0x7FFFFFF0 < mem->size || 0x7FFFFFF0 - mem->size < realsize) {
80 ERROR("integer overflow");
84 mem->memory = (char *)realloc((void *)mem->memory, mem->size + realsize + 1);
85 if (mem->memory == NULL) {
87 ERROR("wg_write_memory_cb: not enough memory (realloc returned NULL)");
91 memcpy(&(mem->memory[mem->size]), contents, realsize);
92 mem->size += realsize;
93 mem->memory[mem->size] = 0;
95 } /* }}} size_t wg_write_memory_cb */
97 static char *wg_get_authorization_header(wg_callback_t *cb) { /* {{{ */
99 char access_token[256];
100 char authorization_header[256];
102 assert((cb->auth != NULL) || gce_check());
103 if (cb->auth != NULL)
104 status = oauth_access_token(cb->auth, access_token, sizeof(access_token));
106 status = gce_access_token(cb->email, access_token, sizeof(access_token));
108 ERROR("write_stackdriver plugin: Failed to get access token");
112 status = snprintf(authorization_header, sizeof(authorization_header),
113 "Authorization: Bearer %s", access_token);
114 if ((status < 1) || ((size_t)status >= sizeof(authorization_header)))
117 return strdup(authorization_header);
118 } /* }}} char *wg_get_authorization_header */
125 static api_error_t *parse_api_error(char const *body) {
127 yajl_val root = yajl_tree_parse(body, errbuf, sizeof(errbuf));
129 ERROR("write_stackdriver plugin: yajl_tree_parse failed: %s", errbuf);
133 api_error_t *err = calloc(1, sizeof(*err));
135 ERROR("write_stackdriver plugin: calloc failed");
136 yajl_tree_free(root);
140 yajl_val code = yajl_tree_get(root, (char const *[]){"error", "code", NULL},
143 err->code = YAJL_GET_INTEGER(code);
146 yajl_val message = yajl_tree_get(
147 root, (char const *[]){"error", "message", NULL}, yajl_t_string);
148 if (message != NULL) {
149 err->message = strdup(YAJL_GET_STRING(message));
155 static char *api_error_string(api_error_t *err, char *buffer,
156 size_t buffer_size) {
158 strncpy(buffer, "Unknown error (API error is NULL)", buffer_size);
159 } else if (err->message == NULL) {
160 snprintf(buffer, buffer_size, "API error %d", err->code);
162 snprintf(buffer, buffer_size, "API error %d: %s", err->code, err->message);
167 #define API_ERROR_STRING(err) api_error_string(err, (char[1024]){""}, 1024)
169 // do_post does a HTTP POST request, assuming a JSON payload and using OAuth
170 // authentication. Returns -1 on error and the HTTP status code otherwise.
171 // ret_content, if not NULL, will contain the server's response.
172 // If ret_content is provided and the server responds with a 4xx or 5xx error,
173 // an appropriate message will be logged.
174 static int do_post(wg_callback_t *cb, char const *url, void const *payload,
175 wg_memory_t *ret_content) {
176 if (cb->curl == NULL) {
177 cb->curl = curl_easy_init();
178 if (cb->curl == NULL) {
179 ERROR("write_stackdriver plugin: curl_easy_init() failed");
183 curl_easy_setopt(cb->curl, CURLOPT_ERRORBUFFER, cb->curl_errbuf);
184 curl_easy_setopt(cb->curl, CURLOPT_NOSIGNAL, 1L);
187 curl_easy_setopt(cb->curl, CURLOPT_POST, 1L);
188 curl_easy_setopt(cb->curl, CURLOPT_URL, url);
191 char *auth_header = wg_get_authorization_header(cb);
192 if (auth_header == NULL) {
193 ERROR("write_stackdriver plugin: getting access token failed with");
197 struct curl_slist *headers =
198 curl_slist_append(NULL, "Content-Type: application/json");
199 headers = curl_slist_append(headers, auth_header);
200 curl_easy_setopt(cb->curl, CURLOPT_HTTPHEADER, headers);
202 curl_easy_setopt(cb->curl, CURLOPT_POSTFIELDS, payload);
204 curl_easy_setopt(cb->curl, CURLOPT_WRITEFUNCTION,
205 ret_content ? wg_write_memory_cb : NULL);
206 curl_easy_setopt(cb->curl, CURLOPT_WRITEDATA, ret_content);
208 int status = curl_easy_perform(cb->curl);
210 /* clean up that has to happen in any case */
211 curl_slist_free_all(headers);
213 curl_easy_setopt(cb->curl, CURLOPT_HTTPHEADER, NULL);
214 curl_easy_setopt(cb->curl, CURLOPT_WRITEFUNCTION, NULL);
215 curl_easy_setopt(cb->curl, CURLOPT_WRITEDATA, NULL);
217 if (status != CURLE_OK) {
218 ERROR("write_stackdriver plugin: POST %s failed: %s", url, cb->curl_errbuf);
219 sfree(ret_content->memory);
220 ret_content->size = 0;
225 curl_easy_getinfo(cb->curl, CURLINFO_RESPONSE_CODE, &http_code);
227 if (ret_content != NULL) {
228 if ((status >= 400) && (status < 500)) {
229 ERROR("write_stackdriver plugin: POST %s: %s", url,
230 API_ERROR_STRING(parse_api_error(ret_content->memory)));
231 } else if (status >= 500) {
232 WARNING("write_stackdriver plugin: POST %s: %s", url,
233 ret_content->memory);
237 return (int)http_code;
240 static int wg_call_metricdescriptor_create(wg_callback_t *cb,
241 char const *payload) {
243 snprintf(url, sizeof(url), "%s/projects/%s/metricDescriptors", cb->url,
245 wg_memory_t response = {0};
247 int status = do_post(cb, url, payload, &response);
249 ERROR("write_stackdriver plugin: POST %s failed", url);
252 sfree(response.memory);
255 ERROR("write_stackdriver plugin: POST %s: unexpected response code: got "
261 } /* int wg_call_metricdescriptor_create */
263 static int wg_call_timeseries_write(wg_callback_t *cb, char const *payload) {
265 snprintf(url, sizeof(url), "%s/projects/%s/timeSeries", cb->url, cb->project);
266 wg_memory_t response = {0};
268 int status = do_post(cb, url, payload, &response);
270 ERROR("write_stackdriver plugin: POST %s failed", url);
273 sfree(response.memory);
276 ERROR("write_stackdriver plugin: POST %s: unexpected response code: got "
282 } /* int wg_call_timeseries_write */
284 static void wg_reset_buffer(wg_callback_t *cb) /* {{{ */
286 cb->timeseries_count = 0;
287 cb->send_buffer_init_time = cdtime();
288 } /* }}} wg_reset_buffer */
290 static int wg_callback_init(wg_callback_t *cb) /* {{{ */
292 if (cb->curl != NULL)
295 cb->formatter = sd_output_create(cb->resource);
296 if (cb->formatter == NULL) {
297 ERROR("write_stackdriver plugin: sd_output_create failed.");
301 cb->curl = curl_easy_init();
302 if (cb->curl == NULL) {
303 ERROR("write_stackdriver plugin: curl_easy_init failed.");
307 curl_easy_setopt(cb->curl, CURLOPT_NOSIGNAL, 1L);
308 curl_easy_setopt(cb->curl, CURLOPT_USERAGENT,
309 PACKAGE_NAME "/" PACKAGE_VERSION);
310 curl_easy_setopt(cb->curl, CURLOPT_ERRORBUFFER, cb->curl_errbuf);
314 } /* }}} int wg_callback_init */
316 static int wg_flush_nolock(cdtime_t timeout, wg_callback_t *cb) /* {{{ */
318 if (cb->timeseries_count == 0) {
319 cb->send_buffer_init_time = cdtime();
323 /* timeout == 0 => flush unconditionally */
325 cdtime_t now = cdtime();
327 if ((cb->send_buffer_init_time + timeout) > now)
331 char *payload = sd_output_reset(cb->formatter);
332 int status = wg_call_timeseries_write(cb, payload);
334 ERROR("write_stackdriver plugin: Sending buffer failed with status %d.",
340 } /* }}} wg_flush_nolock */
342 static int wg_flush(cdtime_t timeout, /* {{{ */
343 const char *identifier __attribute__((unused)),
344 user_data_t *user_data) {
348 if (user_data == NULL)
351 cb = user_data->data;
353 pthread_mutex_lock(&cb->lock);
355 if (cb->curl == NULL) {
356 status = wg_callback_init(cb);
358 ERROR("write_stackdriver plugin: wg_callback_init failed.");
359 pthread_mutex_unlock(&cb->lock);
364 status = wg_flush_nolock(timeout, cb);
365 pthread_mutex_unlock(&cb->lock);
368 } /* }}} int wg_flush */
370 static void wg_callback_free(void *data) /* {{{ */
372 wg_callback_t *cb = data;
376 sd_output_destroy(cb->formatter);
377 cb->formatter = NULL;
383 oauth_destroy(cb->auth);
385 curl_easy_cleanup(cb->curl);
389 } /* }}} void wg_callback_free */
391 static int wg_metric_descriptors_create(wg_callback_t *cb, const data_set_t *ds,
392 const value_list_t *vl) {
394 for (size_t i = 0; i < ds->ds_num; i++) {
397 int status = sd_format_metric_descriptor(buffer, sizeof(buffer), ds, vl, i);
399 ERROR("write_stackdriver plugin: sd_format_metric_descriptor failed "
406 status = wg_call_metricdescriptor_create(cb, buffer);
408 ERROR("write_stackdriver plugin: wg_call_metricdescriptor_create failed "
416 return sd_output_register_metric(cb->formatter, ds, vl);
417 } /* }}} int wg_metric_descriptors_create */
419 static int wg_write(const data_set_t *ds, const value_list_t *vl, /* {{{ */
420 user_data_t *user_data) {
421 wg_callback_t *cb = user_data->data;
425 pthread_mutex_lock(&cb->lock);
427 if (cb->curl == NULL) {
428 int status = wg_callback_init(cb);
430 ERROR("write_stackdriver plugin: wg_callback_init failed.");
431 pthread_mutex_unlock(&cb->lock);
438 status = sd_output_add(cb->formatter, ds, vl);
439 if (status == 0) { /* success */
441 } else if (status == ENOBUFS) { /* success, flush */
442 wg_flush_nolock(0, cb);
445 } else if (status == EEXIST) {
446 /* metric already in the buffer; flush and retry */
447 wg_flush_nolock(0, cb);
449 } else if (status == ENOENT) {
450 /* new metric, create metric descriptor first */
451 status = wg_metric_descriptors_create(cb, ds, vl);
462 cb->timeseries_count++;
465 pthread_mutex_unlock(&cb->lock);
467 } /* }}} int wg_write */
469 static void wg_check_scope(char const *email) /* {{{ */
471 char *scope = gce_scope(email);
473 WARNING("write_stackdriver plugin: Unable to determine scope of this "
478 if (strstr(scope, MONITORING_SCOPE) == NULL) {
481 /* Strip trailing newline characers for printing. */
482 scope_len = strlen(scope);
483 while ((scope_len > 0) && (iscntrl((int)scope[scope_len - 1])))
484 scope[--scope_len] = 0;
486 WARNING("write_stackdriver plugin: The determined scope of this instance "
487 "(\"%s\") does not contain the monitoring scope (\"%s\"). You need "
488 "to add this scope to the list of scopes passed to gcutil with "
489 "--service_account_scopes when creating the instance. "
490 "Alternatively, to use this plugin on an instance which does not "
491 "have this scope, use a Service Account.",
492 scope, MONITORING_SCOPE);
496 } /* }}} void wg_check_scope */
498 static int wg_config_resource(oconfig_item_t *ci, wg_callback_t *cb) /* {{{ */
500 if ((ci->values_num != 1) || (ci->values[0].type != OCONFIG_TYPE_STRING)) {
501 ERROR("write_stackdriver plugin: The \"%s\" option requires exactly one "
507 char *resource_type = ci->values[0].value.string;
509 if (cb->resource != NULL) {
510 sd_resource_destroy(cb->resource);
513 cb->resource = sd_resource_create(resource_type);
514 if (cb->resource == NULL) {
515 ERROR("write_stackdriver plugin: sd_resource_create(\"%s\") failed.",
520 for (int i = 0; i < ci->children_num; i++) {
521 oconfig_item_t *child = ci->children + i;
523 if (strcasecmp("Label", child->key) == 0) {
524 if ((child->values_num != 2) ||
525 (child->values[0].type != OCONFIG_TYPE_STRING) ||
526 (child->values[1].type != OCONFIG_TYPE_STRING)) {
527 ERROR("write_stackdriver plugin: The \"Label\" option needs exactly "
528 "two string arguments.");
532 sd_resource_add_label(cb->resource, child->values[0].value.string,
533 child->values[1].value.string);
538 } /* }}} int wg_config_resource */
540 static int wg_config(oconfig_item_t *ci) /* {{{ */
546 wg_callback_t *cb = calloc(1, sizeof(*cb));
548 ERROR("write_stackdriver plugin: calloc failed.");
551 cb->url = strdup(GCM_API_URL);
552 pthread_mutex_init(&cb->lock, /* attr = */ NULL);
554 char *credential_file = NULL;
556 for (int i = 0; i < ci->children_num; i++) {
557 oconfig_item_t *child = ci->children + i;
558 if (strcasecmp("Project", child->key) == 0)
559 cf_util_get_string(child, &cb->project);
560 else if (strcasecmp("Email", child->key) == 0)
561 cf_util_get_string(child, &cb->email);
562 else if (strcasecmp("Url", child->key) == 0)
563 cf_util_get_string(child, &cb->url);
564 else if (strcasecmp("CredentialFile", child->key) == 0)
565 cf_util_get_string(child, &credential_file);
566 else if (strcasecmp("Resource", child->key) == 0)
567 wg_config_resource(child, cb);
569 ERROR("write_stackdriver plugin: Invalid configuration option: %s.",
571 wg_callback_free(cb);
576 /* Set up authentication */
577 /* Option 1: Credentials file given => use service account */
578 if (credential_file != NULL) {
580 oauth_create_google_file(credential_file, MONITORING_SCOPE);
581 if (cfg.oauth == NULL) {
582 ERROR("write_stackdriver plugin: oauth_create_google_file failed");
583 wg_callback_free(cb);
586 cb->auth = cfg.oauth;
588 if (cb->project == NULL) {
589 cb->project = cfg.project_id;
590 INFO("write_stackdriver plugin: Automatically detected project ID: "
594 sfree(cfg.project_id);
597 /* Option 2: Look for credentials in well-known places */
598 if (cb->auth == NULL) {
599 oauth_google_t cfg = oauth_create_google_default(MONITORING_SCOPE);
600 cb->auth = cfg.oauth;
602 if (cb->project == NULL) {
603 cb->project = cfg.project_id;
604 INFO("write_stackdriver plugin: Automatically detected project ID: "
608 sfree(cfg.project_id);
612 if ((cb->auth != NULL) && (cb->email != NULL)) {
613 NOTICE("write_stackdriver plugin: A service account email was configured "
615 "not used for authentication because %s used instead.",
616 (credential_file != NULL) ? "a credential file was"
617 : "application default credentials were");
620 /* Option 3: Running on GCE => use metadata service */
621 if ((cb->auth == NULL) && gce_check()) {
622 wg_check_scope(cb->email);
623 } else if (cb->auth == NULL) {
624 ERROR("write_stackdriver plugin: Unable to determine credentials. Please "
626 "specify the \"Credentials\" option or set up Application Default "
628 wg_callback_free(cb);
632 if ((cb->project == NULL) && gce_check()) {
633 cb->project = gce_project_id();
635 if (cb->project == NULL) {
636 ERROR("write_stackdriver plugin: Unable to determine the project number. "
637 "Please specify the \"Project\" option manually.");
638 wg_callback_free(cb);
642 if ((cb->resource == NULL) && gce_check()) {
643 /* TODO(octo): add error handling */
644 cb->resource = sd_resource_create("gce_instance");
645 sd_resource_add_label(cb->resource, "project_id", gce_project_id());
646 sd_resource_add_label(cb->resource, "instance_id", gce_instance_id());
647 sd_resource_add_label(cb->resource, "zone", gce_zone());
649 if (cb->resource == NULL) {
650 /* TODO(octo): add error handling */
651 cb->resource = sd_resource_create("global");
652 sd_resource_add_label(cb->resource, "project_id", cb->project);
655 DEBUG("write_stackdriver plugin: Registering write callback with URL %s",
657 assert((cb->auth != NULL) || gce_check());
659 user_data_t user_data = {
662 plugin_register_flush("write_stackdriver", wg_flush, &user_data);
664 user_data.free_func = wg_callback_free;
665 plugin_register_write("write_stackdriver", wg_write, &user_data);
668 } /* }}} int wg_config */
670 static int wg_init(void) {
672 /* Call this while collectd is still single-threaded to avoid
673 * initialization issues in libgcrypt. */
674 curl_global_init(CURL_GLOBAL_SSL);
677 } /* }}} int wg_init */
679 void module_register(void) /* {{{ */
681 plugin_register_complex_config("write_stackdriver", wg_config);
682 plugin_register_init("write_stackdriver", wg_init);
683 } /* }}} void module_register */