574191d4aca54384075df67ab1b864aea26a5acf
[collectd.git] / src / write_stackdriver.c
1 /**
2  * collectd - src/write_stackdriver.c
3  * ISC license
4  *
5  * Copyright (C) 2017  Florian Forster
6  *
7  * Permission to use, copy, modify, and/or distribute this software for any
8  * purpose with or without fee is hereby granted, provided that the above
9  * copyright notice and this permission notice appear in all copies.
10  *
11  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
12  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
13  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
14  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
15  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
16  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
17  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
18  *
19  * Authors:
20  *   Florian Forster <octo at collectd.org>
21  **/
22
23 #include "collectd.h"
24
25 #include "common.h"
26 #include "configfile.h"
27 #include "plugin.h"
28 #include "utils_format_stackdriver.h"
29 #include "utils_gce.h"
30 #include "utils_oauth.h"
31
32 #include <curl/curl.h>
33 #include <pthread.h>
34
35 /*
36  * Private variables
37  */
38 #ifndef GCM_API_URL
39 #define GCM_API_URL "https://monitoring.googleapis.com/v3"
40 #endif
41
42 #ifndef MONITORING_SCOPE
43 #define MONITORING_SCOPE "https://www.googleapis.com/auth/monitoring"
44 #endif
45
46 struct wg_callback_s {
47   /* config */
48   char *email;
49   char *project;
50   char *url;
51   sd_resource_t *resource;
52
53   /* runtime */
54   oauth_t *auth;
55   sd_output_t *formatter;
56   CURL *curl;
57   char curl_errbuf[CURL_ERROR_SIZE];
58   /* used by flush */
59   size_t timeseries_count;
60   cdtime_t send_buffer_init_time;
61
62   pthread_mutex_t lock;
63 };
64 typedef struct wg_callback_s wg_callback_t;
65
66 struct wg_memory_s {
67   char *memory;
68   size_t size;
69 };
70 typedef struct wg_memory_s wg_memory_t;
71
72 static size_t wg_write_memory_cb(void *contents, size_t size,
73                                  size_t nmemb, /* {{{ */
74                                  void *userp) {
75   size_t realsize = size * nmemb;
76   wg_memory_t *mem = (wg_memory_t *)userp;
77
78   if (0x7FFFFFF0 < mem->size || 0x7FFFFFF0 - mem->size < realsize) {
79     ERROR("integer overflow");
80     return 0;
81   }
82
83   mem->memory = (char *)realloc((void *)mem->memory, mem->size + realsize + 1);
84   if (mem->memory == NULL) {
85     /* out of memory! */
86     ERROR("wg_write_memory_cb: not enough memory (realloc returned NULL)");
87     return 0;
88   }
89
90   memcpy(&(mem->memory[mem->size]), contents, realsize);
91   mem->size += realsize;
92   mem->memory[mem->size] = 0;
93   return realsize;
94 } /* }}} size_t wg_write_memory_cb */
95
96 static char *wg_get_authorization_header(wg_callback_t *cb) { /* {{{ */
97   int status = 0;
98   char access_token[256];
99   char authorization_header[256];
100
101   assert((cb->auth != NULL) || gce_check());
102   if (cb->auth != NULL)
103     status = oauth_access_token(cb->auth, access_token, sizeof(access_token));
104   else
105     status = gce_access_token(cb->email, access_token, sizeof(access_token));
106   if (status != 0) {
107     ERROR("write_stackdriver plugin: Failed to get access token");
108     return NULL;
109   }
110
111   status = snprintf(authorization_header, sizeof(authorization_header),
112                     "Authorization: Bearer %s", access_token);
113   if ((status < 1) || ((size_t)status >= sizeof(authorization_header)))
114     return NULL;
115
116   return strdup(authorization_header);
117 } /* }}} char *wg_get_authorization_header */
118
119 static int wg_call_metricdescriptor_create(wg_callback_t *cb,
120                                            char const *payload) {
121   /* {{{ */
122   char final_url[1024];
123   int status =
124       snprintf(final_url, sizeof(final_url), "%s/projects/%s/metricDescriptors",
125                cb->url, cb->project);
126   if ((status < 1) || ((size_t)status >= sizeof(final_url)))
127     return -1;
128
129   char *authorization_header = wg_get_authorization_header(cb);
130   if (authorization_header == NULL)
131     return -1;
132
133   struct curl_slist *headers = NULL;
134   headers = curl_slist_append(headers, "Content-Type: application/json");
135   headers = curl_slist_append(headers, authorization_header);
136
137   CURL *curl = curl_easy_init();
138   if (!curl) {
139     ERROR("write_stackdriver plugin: curl_easy_init failed.");
140     curl_slist_free_all(headers);
141     sfree(authorization_header);
142     return -1;
143   }
144
145   curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1L);
146   curl_easy_setopt(cb->curl, CURLOPT_USERAGENT,
147                    PACKAGE_NAME "/" PACKAGE_VERSION);
148   char curl_errbuf[CURL_ERROR_SIZE];
149   curl_easy_setopt(curl, CURLOPT_ERRORBUFFER, curl_errbuf);
150   curl_easy_setopt(curl, CURLOPT_URL, final_url);
151   curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers);
152   curl_easy_setopt(curl, CURLOPT_POST, 1L);
153   curl_easy_setopt(curl, CURLOPT_POSTFIELDS, payload);
154
155   wg_memory_t res = {
156       .memory = NULL, .size = 0,
157   };
158   curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, wg_write_memory_cb);
159   curl_easy_setopt(curl, CURLOPT_WRITEDATA, &res);
160
161   status = curl_easy_perform(curl);
162   if (status != CURLE_OK) {
163     ERROR(
164         "write_stackdriver plugin: curl_easy_perform failed with status %d: %s",
165         status, curl_errbuf);
166     sfree(res.memory);
167     curl_easy_cleanup(curl);
168     curl_slist_free_all(headers);
169     sfree(authorization_header);
170     return -1;
171   }
172
173   long http_code = 0;
174   curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &http_code);
175   if ((http_code < 200) || (http_code >= 300)) {
176     ERROR("write_stackdriver plugin: POST request to %s failed: HTTP error %ld",
177           final_url, http_code);
178     INFO("write_stackdriver plugin: Server replied: %s", res.memory);
179     sfree(res.memory);
180     curl_easy_cleanup(curl);
181     curl_slist_free_all(headers);
182     sfree(authorization_header);
183     return -1;
184   }
185
186   sfree(res.memory);
187   curl_easy_cleanup(curl);
188   curl_slist_free_all(headers);
189   sfree(authorization_header);
190   return 0;
191 } /* }}} int wg_call_metricdescriptor_create */
192
193 static void wg_reset_buffer(wg_callback_t *cb) /* {{{ */
194 {
195   cb->timeseries_count = 0;
196   cb->send_buffer_init_time = cdtime();
197 } /* }}} wg_reset_buffer */
198
199 static int wg_call_timeseries_write(wg_callback_t *cb,
200                                     char const *payload) /* {{{ */
201 {
202   char final_url[1024];
203   int status = snprintf(final_url, sizeof(final_url),
204                         "%s/projects/%s/timeSeries", cb->url, cb->project);
205   if ((status < 1) || ((size_t)status >= sizeof(final_url)))
206     return -1;
207
208   char *authorization_header = wg_get_authorization_header(cb);
209   if (authorization_header == NULL)
210     return -1;
211
212   struct curl_slist *headers = NULL;
213   headers = curl_slist_append(headers, authorization_header);
214   headers = curl_slist_append(headers, "Content-Type: application/json");
215
216   curl_easy_setopt(cb->curl, CURLOPT_URL, final_url);
217   curl_easy_setopt(cb->curl, CURLOPT_HTTPHEADER, headers);
218   curl_easy_setopt(cb->curl, CURLOPT_POST, 1L);
219   curl_easy_setopt(cb->curl, CURLOPT_POSTFIELDS, payload);
220
221   wg_memory_t res = {
222       .memory = NULL, .size = 0,
223   };
224   curl_easy_setopt(cb->curl, CURLOPT_WRITEFUNCTION, wg_write_memory_cb);
225   curl_easy_setopt(cb->curl, CURLOPT_WRITEDATA, &res);
226
227   status = curl_easy_perform(cb->curl);
228   if (status != CURLE_OK) {
229     ERROR(
230         "write_stackdriver plugin: curl_easy_perform failed with status %d: %s",
231         status, cb->curl_errbuf);
232     sfree(res.memory);
233     curl_slist_free_all(headers);
234     sfree(authorization_header);
235     return -1;
236   }
237
238   long http_code = 0;
239   curl_easy_getinfo(cb->curl, CURLINFO_RESPONSE_CODE, &http_code);
240   if ((http_code < 200) || (http_code >= 300)) {
241     ERROR("write_stackdriver plugin: POST request to %s failed: HTTP error %ld",
242           final_url, http_code);
243     INFO("write_stackdriver plugin: Server replied: %s", res.memory);
244     sfree(res.memory);
245     curl_slist_free_all(headers);
246     sfree(authorization_header);
247     return -1;
248   }
249
250   sfree(res.memory);
251   curl_slist_free_all(headers);
252   sfree(authorization_header);
253   return status;
254 } /* }}} wg_call_timeseries_write */
255
256 static int wg_callback_init(wg_callback_t *cb) /* {{{ */
257 {
258   if (cb->curl != NULL)
259     return 0;
260
261   cb->formatter = sd_output_create(cb->resource);
262   if (cb->formatter == NULL) {
263     ERROR("write_stackdriver plugin: sd_output_create failed.");
264     return -1;
265   }
266
267   cb->curl = curl_easy_init();
268   if (cb->curl == NULL) {
269     ERROR("write_stackdriver plugin: curl_easy_init failed.");
270     return -1;
271   }
272
273   curl_easy_setopt(cb->curl, CURLOPT_NOSIGNAL, 1L);
274   curl_easy_setopt(cb->curl, CURLOPT_USERAGENT,
275                    PACKAGE_NAME "/" PACKAGE_VERSION);
276   curl_easy_setopt(cb->curl, CURLOPT_ERRORBUFFER, cb->curl_errbuf);
277   wg_reset_buffer(cb);
278
279   return 0;
280 } /* }}} int wg_callback_init */
281
282 static int wg_flush_nolock(cdtime_t timeout, wg_callback_t *cb) /* {{{ */
283 {
284   if (cb->timeseries_count == 0) {
285     cb->send_buffer_init_time = cdtime();
286     return 0;
287   }
288
289   /* timeout == 0  => flush unconditionally */
290   if (timeout > 0) {
291     cdtime_t now = cdtime();
292
293     if ((cb->send_buffer_init_time + timeout) > now)
294       return 0;
295   }
296
297   char *payload = sd_output_reset(cb->formatter);
298   int status = wg_call_timeseries_write(cb, payload);
299   if (status != 0) {
300     ERROR("write_stackdriver plugin: Sending buffer failed with status %d.",
301           status);
302   }
303
304   wg_reset_buffer(cb);
305   return status;
306 } /* }}} wg_flush_nolock */
307
308 static int wg_flush(cdtime_t timeout, /* {{{ */
309                     const char *identifier __attribute__((unused)),
310                     user_data_t *user_data) {
311   wg_callback_t *cb;
312   int status;
313
314   if (user_data == NULL)
315     return -EINVAL;
316
317   cb = user_data->data;
318
319   pthread_mutex_lock(&cb->lock);
320
321   if (cb->curl == NULL) {
322     status = wg_callback_init(cb);
323     if (status != 0) {
324       ERROR("write_stackdriver plugin: wg_callback_init failed.");
325       pthread_mutex_unlock(&cb->lock);
326       return -1;
327     }
328   }
329
330   status = wg_flush_nolock(timeout, cb);
331   pthread_mutex_unlock(&cb->lock);
332
333   return status;
334 } /* }}} int wg_flush */
335
336 static void wg_callback_free(void *data) /* {{{ */
337 {
338   wg_callback_t *cb = data;
339   if (cb == NULL)
340     return;
341
342   sd_output_destroy(cb->formatter);
343   cb->formatter = NULL;
344
345   sfree(cb->email);
346   sfree(cb->project);
347   sfree(cb->url);
348
349   oauth_destroy(cb->auth);
350   if (cb->curl) {
351     curl_easy_cleanup(cb->curl);
352   }
353
354   sfree(cb);
355 } /* }}} void wg_callback_free */
356
357 static int wg_metric_descriptors_create(wg_callback_t *cb, const data_set_t *ds,
358                                         const value_list_t *vl) {
359   /* {{{ */
360   for (size_t i = 0; i < ds->ds_num; i++) {
361     char buffer[4096];
362
363     int status = sd_format_metric_descriptor(buffer, sizeof(buffer), ds, vl, i);
364     if (status != 0) {
365       ERROR("write_stackdriver plugin: sd_format_metric_descriptor failed "
366             "with status "
367             "%d",
368             status);
369       return status;
370     }
371
372     status = wg_call_metricdescriptor_create(cb, buffer);
373     if (status != 0) {
374       ERROR("write_stackdriver plugin: wg_call_metricdescriptor_create failed "
375             "with "
376             "status %d",
377             status);
378       return status;
379     }
380   }
381
382   return sd_output_register_metric(cb->formatter, ds, vl);
383 } /* }}} int wg_metric_descriptors_create */
384
385 static int wg_write(const data_set_t *ds, const value_list_t *vl, /* {{{ */
386                     user_data_t *user_data) {
387   wg_callback_t *cb = user_data->data;
388   if (cb == NULL)
389     return EINVAL;
390
391   pthread_mutex_lock(&cb->lock);
392
393   if (cb->curl == NULL) {
394     int status = wg_callback_init(cb);
395     if (status != 0) {
396       ERROR("write_stackdriver plugin: wg_callback_init failed.");
397       pthread_mutex_unlock(&cb->lock);
398       return status;
399     }
400   }
401
402   int status;
403   while (42) {
404     status = sd_output_add(cb->formatter, ds, vl);
405     if (status == 0) { /* success */
406       break;
407     } else if (status == ENOBUFS) { /* success, flush */
408       wg_flush_nolock(0, cb);
409       status = 0;
410       break;
411     } else if (status == EEXIST) {
412       /* metric already in the buffer; flush and retry */
413       wg_flush_nolock(0, cb);
414       continue;
415     } else if (status == ENOENT) {
416       /* new metric, create metric descriptor first */
417       status = wg_metric_descriptors_create(cb, ds, vl);
418       if (status != 0) {
419         break;
420       }
421       continue;
422     } else {
423       break;
424     }
425   }
426
427   if (status == 0) {
428     cb->timeseries_count++;
429   }
430
431   pthread_mutex_unlock(&cb->lock);
432   return status;
433 } /* }}} int wg_write */
434
435 static void wg_check_scope(char const *email) /* {{{ */
436 {
437   char *scope = gce_scope(email);
438   if (scope == NULL) {
439     WARNING("write_stackdriver plugin: Unable to determine scope of this "
440             "instance.");
441     return;
442   }
443
444   if (strstr(scope, MONITORING_SCOPE) == NULL) {
445     size_t scope_len;
446
447     /* Strip trailing newline characers for printing. */
448     scope_len = strlen(scope);
449     while ((scope_len > 0) && (iscntrl((int)scope[scope_len - 1])))
450       scope[--scope_len] = 0;
451
452     WARNING("write_stackdriver plugin: The determined scope of this instance "
453             "(\"%s\") does not contain the monitoring scope (\"%s\"). You need "
454             "to add this scope to the list of scopes passed to gcutil with "
455             "--service_account_scopes when creating the instance. "
456             "Alternatively, to use this plugin on an instance which does not "
457             "have this scope, use a Service Account.",
458             scope, MONITORING_SCOPE);
459   }
460
461   sfree(scope);
462 } /* }}} void wg_check_scope */
463
464 static int wg_config_resource(oconfig_item_t *ci, wg_callback_t *cb) /* {{{ */
465 {
466   if ((ci->values_num != 1) || (ci->values[0].type != OCONFIG_TYPE_STRING)) {
467     ERROR("write_stackdriver plugin: The \"%s\" option requires exactly one "
468           "string "
469           "argument.",
470           ci->key);
471     return EINVAL;
472   }
473   char *resource_type = ci->values[0].value.string;
474
475   if (cb->resource != NULL) {
476     sd_resource_destroy(cb->resource);
477   }
478
479   cb->resource = sd_resource_create(resource_type);
480   if (cb->resource == NULL) {
481     ERROR("write_stackdriver plugin: sd_resource_create(\"%s\") failed.",
482           resource_type);
483     return ENOMEM;
484   }
485
486   for (int i = 0; i < ci->children_num; i++) {
487     oconfig_item_t *child = ci->children + i;
488
489     if (strcasecmp("Label", child->key) == 0) {
490       if ((child->values_num != 2) ||
491           (child->values[0].type != OCONFIG_TYPE_STRING) ||
492           (child->values[1].type != OCONFIG_TYPE_STRING)) {
493         ERROR("write_stackdriver plugin: The \"Label\" option needs exactly "
494               "two string arguments.");
495         continue;
496       }
497
498       sd_resource_add_label(cb->resource, child->values[0].value.string,
499                             child->values[1].value.string);
500     }
501   }
502
503   return 0;
504 } /* }}} int wg_config_resource */
505
506 static int wg_config(oconfig_item_t *ci) /* {{{ */
507 {
508   if (ci == NULL) {
509     return EINVAL;
510   }
511
512   wg_callback_t *cb = calloc(1, sizeof(*cb));
513   if (cb == NULL) {
514     ERROR("write_stackdriver plugin: calloc failed.");
515     return ENOMEM;
516   }
517   cb->url = strdup(GCM_API_URL);
518   pthread_mutex_init(&cb->lock, /* attr = */ NULL);
519
520   char *credential_file = NULL;
521
522   for (int i = 0; i < ci->children_num; i++) {
523     oconfig_item_t *child = ci->children + i;
524     if (strcasecmp("Project", child->key) == 0)
525       cf_util_get_string(child, &cb->project);
526     else if (strcasecmp("Email", child->key) == 0)
527       cf_util_get_string(child, &cb->email);
528     else if (strcasecmp("Url", child->key) == 0)
529       cf_util_get_string(child, &cb->url);
530     else if (strcasecmp("CredentialFile", child->key) == 0)
531       cf_util_get_string(child, &credential_file);
532     else if (strcasecmp("Resource", child->key) == 0)
533       wg_config_resource(child, cb);
534     else {
535       ERROR("write_stackdriver plugin: Invalid configuration option: %s.",
536             child->key);
537       wg_callback_free(cb);
538       return EINVAL;
539     }
540   }
541
542   /* Set up authentication */
543   /* Option 1: Credentials file given => use service account */
544   if (credential_file != NULL) {
545     oauth_google_t cfg =
546         oauth_create_google_file(credential_file, MONITORING_SCOPE);
547     if (cfg.oauth == NULL) {
548       ERROR("write_stackdriver plugin: oauth_create_google_file failed");
549       wg_callback_free(cb);
550       return EINVAL;
551     }
552     cb->auth = cfg.oauth;
553
554     if (cb->project == NULL) {
555       cb->project = cfg.project_id;
556       INFO(
557           "write_stackdriver plugin: Automatically detected project ID: \"%s\"",
558           cb->project);
559     } else {
560       sfree(cfg.project_id);
561     }
562   }
563   /* Option 2: Look for credentials in well-known places */
564   if (cb->auth == NULL) {
565     oauth_google_t cfg = oauth_create_google_default(MONITORING_SCOPE);
566     cb->auth = cfg.oauth;
567
568     if (cb->project == NULL) {
569       cb->project = cfg.project_id;
570       INFO(
571           "write_stackdriver plugin: Automatically detected project ID: \"%s\"",
572           cb->project);
573     } else {
574       sfree(cfg.project_id);
575     }
576   }
577
578   if ((cb->auth != NULL) && (cb->email != NULL)) {
579     NOTICE("write_stackdriver plugin: A service account email was configured "
580            "but is "
581            "not used for authentication because %s used instead.",
582            (credential_file != NULL) ? "a credential file was"
583                                      : "application default credentials were");
584   }
585
586   /* Option 3: Running on GCE => use metadata service */
587   if ((cb->auth == NULL) && gce_check()) {
588     wg_check_scope(cb->email);
589   } else if (cb->auth == NULL) {
590     ERROR("write_stackdriver plugin: Unable to determine credentials. Please "
591           "either "
592           "specify the \"Credentials\" option or set up Application Default "
593           "Credentials.");
594     wg_callback_free(cb);
595     return EINVAL;
596   }
597
598   if ((cb->project == NULL) && gce_check()) {
599     cb->project = gce_project_id();
600   }
601   if (cb->project == NULL) {
602     ERROR("write_stackdriver plugin: Unable to determine the project number. "
603           "Please specify the \"Project\" option manually.");
604     wg_callback_free(cb);
605     return EINVAL;
606   }
607
608   if ((cb->resource == NULL) && gce_check()) {
609     /* TODO(octo): add error handling */
610     cb->resource = sd_resource_create("gce_instance");
611     sd_resource_add_label(cb->resource, "project_id", gce_project_id());
612     sd_resource_add_label(cb->resource, "instance_id", gce_instance_id());
613     sd_resource_add_label(cb->resource, "zone", gce_zone());
614   }
615   if (cb->resource == NULL) {
616     /* TODO(octo): add error handling */
617     cb->resource = sd_resource_create("global");
618     sd_resource_add_label(cb->resource, "project_id", cb->project);
619   }
620
621   DEBUG("write_stackdriver plugin: Registering write callback with URL %s",
622         cb->url);
623   assert((cb->auth != NULL) || gce_check());
624
625   user_data_t user_data = {
626       .data = cb,
627   };
628   plugin_register_flush("write_stackdriver", wg_flush, &user_data);
629
630   user_data.free_func = wg_callback_free;
631   plugin_register_write("write_stackdriver", wg_write, &user_data);
632
633   return 0;
634 } /* }}} int wg_config */
635
636 static int wg_init(void) {
637   /* {{{ */
638   /* Call this while collectd is still single-threaded to avoid
639    * initialization issues in libgcrypt. */
640   curl_global_init(CURL_GLOBAL_SSL);
641
642   return 0;
643 } /* }}} int wg_init */
644
645 void module_register(void) /* {{{ */
646 {
647   plugin_register_complex_config("write_stackdriver", wg_config);
648   plugin_register_init("write_stackdriver", wg_init);
649 } /* }}} void module_register */
650
651 /* vim: set sw=2 sts=2 et fdm=marker : */