Rename write_gcm to write_stackdriver.
[collectd.git] / src / write_stackdriver.c
1 /**
2  * collectd - src/write_stackdriver.c
3  * ISC license
4  *
5  * Copyright (C) 2017  Florian Forster
6  *
7  * Permission to use, copy, modify, and/or distribute this software for any
8  * purpose with or without fee is hereby granted, provided that the above
9  * copyright notice and this permission notice appear in all copies.
10  *
11  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
12  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
13  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
14  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
15  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
16  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
17  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
18  *
19  * Authors:
20  *   Florian Forster <octo at collectd.org>
21  **/
22
23 #include "collectd.h"
24
25 #include "common.h"
26 #include "configfile.h"
27 #include "plugin.h"
28 #include "utils_format_stackdriver.h"
29 #include "utils_gce.h"
30 #include "utils_oauth.h"
31
32 #include <curl/curl.h>
33 #include <pthread.h>
34
35 /*
36  * Private variables
37  */
38 #ifndef GCM_API_URL
39 #define GCM_API_URL "https://monitoring.googleapis.com/v3"
40 #endif
41
42 #ifndef MONITORING_SCOPE
43 #define MONITORING_SCOPE "https://www.googleapis.com/auth/monitoring"
44 #endif
45
46 struct wg_callback_s {
47   /* config */
48   char *email;
49   char *project;
50   char *url;
51   sd_resource_t *resource;
52
53   /* runtime */
54   oauth_t *auth;
55   sd_output_t *formatter;
56   CURL *curl;
57   char curl_errbuf[CURL_ERROR_SIZE];
58   /* used by flush */
59   size_t timeseries_count;
60   cdtime_t send_buffer_init_time;
61
62   pthread_mutex_t lock;
63 };
64 typedef struct wg_callback_s wg_callback_t;
65
66 struct wg_memory_s {
67   char *memory;
68   size_t size;
69 };
70 typedef struct wg_memory_s wg_memory_t;
71
72 static size_t wg_write_memory_cb(void *contents, size_t size,
73                                  size_t nmemb, /* {{{ */
74                                  void *userp) {
75   size_t realsize = size * nmemb;
76   wg_memory_t *mem = (wg_memory_t *)userp;
77
78   if (0x7FFFFFF0 < mem->size || 0x7FFFFFF0 - mem->size < realsize) {
79     ERROR("integer overflow");
80     return 0;
81   }
82
83   mem->memory = (char *)realloc((void *)mem->memory, mem->size + realsize + 1);
84   if (mem->memory == NULL) {
85     /* out of memory! */
86     ERROR("wg_write_memory_cb: not enough memory (realloc returned NULL)");
87     return 0;
88   }
89
90   memcpy(&(mem->memory[mem->size]), contents, realsize);
91   mem->size += realsize;
92   mem->memory[mem->size] = 0;
93   return realsize;
94 } /* }}} size_t wg_write_memory_cb */
95
96 static char *wg_get_authorization_header(wg_callback_t *cb) { /* {{{ */
97   int status = 0;
98   char access_token[256];
99   char authorization_header[256];
100
101   assert((cb->auth != NULL) || gce_check());
102   if (cb->auth != NULL)
103     status = oauth_access_token(cb->auth, access_token, sizeof(access_token));
104   else
105     status = gce_access_token(cb->email, access_token, sizeof(access_token));
106   if (status != 0) {
107     ERROR("write_stackdriver plugin: Failed to get access token");
108     return NULL;
109   }
110
111   status = snprintf(authorization_header, sizeof(authorization_header),
112                     "Authorization: Bearer %s", access_token);
113   if ((status < 1) || ((size_t)status >= sizeof(authorization_header)))
114     return NULL;
115
116   return strdup(authorization_header);
117 } /* }}} char *wg_get_authorization_header */
118
119 static int wg_call_metricdescriptor_create(wg_callback_t *cb,
120                                            char const *payload) {
121   /* {{{ */
122   char final_url[1024];
123   int status =
124       snprintf(final_url, sizeof(final_url), "%s/projects/%s/metricDescriptors",
125                cb->url, cb->project);
126   if ((status < 1) || ((size_t)status >= sizeof(final_url)))
127     return -1;
128
129   char *authorization_header = wg_get_authorization_header(cb);
130   if (authorization_header == NULL)
131     return -1;
132
133   struct curl_slist *headers = NULL;
134   headers = curl_slist_append(headers, "Content-Type: application/json");
135   headers = curl_slist_append(headers, authorization_header);
136
137   CURL *curl = curl_easy_init();
138   if (!curl) {
139     ERROR("write_stackdriver plugin: curl_easy_init failed.");
140     curl_slist_free_all(headers);
141     sfree(authorization_header);
142     return -1;
143   }
144
145   curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1L);
146   curl_easy_setopt(cb->curl, CURLOPT_USERAGENT,
147                    PACKAGE_NAME "/" PACKAGE_VERSION);
148   char curl_errbuf[CURL_ERROR_SIZE];
149   curl_easy_setopt(curl, CURLOPT_ERRORBUFFER, curl_errbuf);
150   curl_easy_setopt(curl, CURLOPT_URL, final_url);
151   curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers);
152   curl_easy_setopt(curl, CURLOPT_POST, 1L);
153   curl_easy_setopt(curl, CURLOPT_POSTFIELDS, payload);
154
155   wg_memory_t res = {
156       .memory = NULL, .size = 0,
157   };
158   curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, wg_write_memory_cb);
159   curl_easy_setopt(curl, CURLOPT_WRITEDATA, &res);
160
161   status = curl_easy_perform(curl);
162   if (status != CURLE_OK) {
163     ERROR(
164         "write_stackdriver plugin: curl_easy_perform failed with status %d: %s",
165         status, curl_errbuf);
166     sfree(res.memory);
167     curl_easy_cleanup(curl);
168     curl_slist_free_all(headers);
169     sfree(authorization_header);
170     return -1;
171   }
172
173   long http_code = 0;
174   curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &http_code);
175   if ((http_code < 200) || (http_code >= 300)) {
176     ERROR("write_stackdriver plugin: POST request to %s failed: HTTP error %ld",
177           final_url, http_code);
178     INFO("write_stackdriver plugin: Server replied: %s", res.memory);
179     sfree(res.memory);
180     curl_easy_cleanup(curl);
181     curl_slist_free_all(headers);
182     sfree(authorization_header);
183     return -1;
184   }
185
186   sfree(res.memory);
187   curl_easy_cleanup(curl);
188   curl_slist_free_all(headers);
189   sfree(authorization_header);
190   return 0;
191 } /* }}} int wg_call_metricdescriptor_create */
192
193 static void wg_reset_buffer(wg_callback_t *cb) /* {{{ */
194 {
195   cb->timeseries_count = 0;
196   cb->send_buffer_init_time = cdtime();
197 } /* }}} wg_reset_buffer */
198
199 static int wg_call_timeseries_write(wg_callback_t *cb,
200                                     char const *payload) /* {{{ */
201 {
202   char final_url[1024];
203   int status = snprintf(final_url, sizeof(final_url),
204                         "%s/projects/%s/timeSeries", cb->url, cb->project);
205   if ((status < 1) || ((size_t)status >= sizeof(final_url)))
206     return -1;
207
208   char *authorization_header = wg_get_authorization_header(cb);
209   if (authorization_header == NULL)
210     return -1;
211
212   struct curl_slist *headers = NULL;
213   headers = curl_slist_append(headers, authorization_header);
214   headers = curl_slist_append(headers, "Content-Type: application/json");
215
216   curl_easy_setopt(cb->curl, CURLOPT_URL, final_url);
217   curl_easy_setopt(cb->curl, CURLOPT_HTTPHEADER, headers);
218   curl_easy_setopt(cb->curl, CURLOPT_POST, 1L);
219   curl_easy_setopt(cb->curl, CURLOPT_POSTFIELDS, payload);
220
221   wg_memory_t res = {
222       .memory = NULL, .size = 0,
223   };
224   curl_easy_setopt(cb->curl, CURLOPT_WRITEFUNCTION, wg_write_memory_cb);
225   curl_easy_setopt(cb->curl, CURLOPT_WRITEDATA, &res);
226
227   status = curl_easy_perform(cb->curl);
228   if (status != CURLE_OK) {
229     ERROR(
230         "write_stackdriver plugin: curl_easy_perform failed with status %d: %s",
231         status, cb->curl_errbuf);
232     sfree(res.memory);
233     curl_slist_free_all(headers);
234     sfree(authorization_header);
235     return -1;
236   }
237
238   long http_code = 0;
239   curl_easy_getinfo(cb->curl, CURLINFO_RESPONSE_CODE, &http_code);
240   if ((http_code < 200) || (http_code >= 300)) {
241     ERROR("write_stackdriver plugin: POST request to %s failed: HTTP error %ld",
242           final_url, http_code);
243     INFO("write_stackdriver plugin: Server replied: %s", res.memory);
244     sfree(res.memory);
245     curl_slist_free_all(headers);
246     sfree(authorization_header);
247     return -1;
248   }
249
250   sfree(res.memory);
251   curl_slist_free_all(headers);
252   sfree(authorization_header);
253   return status;
254 } /* }}} wg_call_timeseries_write */
255
256 static int wg_callback_init(wg_callback_t *cb) /* {{{ */
257 {
258   if (cb->curl != NULL)
259     return 0;
260
261   cb->formatter = sd_output_create(cb->resource);
262   if (cb->formatter == NULL) {
263     ERROR("write_stackdriver plugin: sd_output_create failed.");
264     return -1;
265   }
266
267   cb->curl = curl_easy_init();
268   if (cb->curl == NULL) {
269     ERROR("write_stackdriver plugin: curl_easy_init failed.");
270     return -1;
271   }
272
273   curl_easy_setopt(cb->curl, CURLOPT_NOSIGNAL, 1L);
274   curl_easy_setopt(cb->curl, CURLOPT_USERAGENT,
275                    PACKAGE_NAME "/" PACKAGE_VERSION);
276   curl_easy_setopt(cb->curl, CURLOPT_ERRORBUFFER, cb->curl_errbuf);
277   wg_reset_buffer(cb);
278
279   return 0;
280 } /* }}} int wg_callback_init */
281
282 static int wg_flush_nolock(cdtime_t timeout, wg_callback_t *cb) /* {{{ */
283 {
284   if (cb->timeseries_count == 0) {
285     cb->send_buffer_init_time = cdtime();
286     return 0;
287   }
288
289   /* timeout == 0  => flush unconditionally */
290   if (timeout > 0) {
291     cdtime_t now = cdtime();
292
293     if ((cb->send_buffer_init_time + timeout) > now)
294       return 0;
295   }
296
297   char *payload = sd_output_reset(cb->formatter);
298   int status = wg_call_timeseries_write(cb, payload);
299   if (status != 0) {
300     ERROR("write_stackdriver plugin: Sending buffer failed with status %d.",
301           status);
302   }
303
304   wg_reset_buffer(cb);
305   return status;
306 } /* }}} wg_flush_nolock */
307
308 static int wg_flush(cdtime_t timeout, /* {{{ */
309                     const char *identifier __attribute__((unused)),
310                     user_data_t *user_data) {
311   wg_callback_t *cb;
312   int status;
313
314   if (user_data == NULL)
315     return -EINVAL;
316
317   cb = user_data->data;
318
319   pthread_mutex_lock(&cb->lock);
320
321   if (cb->curl == NULL) {
322     status = wg_callback_init(cb);
323     if (status != 0) {
324       ERROR("write_stackdriver plugin: wg_callback_init failed.");
325       pthread_mutex_unlock(&cb->lock);
326       return -1;
327     }
328   }
329
330   status = wg_flush_nolock(timeout, cb);
331   pthread_mutex_unlock(&cb->lock);
332
333   return status;
334 } /* }}} int wg_flush */
335
336 static void wg_callback_free(void *data) /* {{{ */
337 {
338   wg_callback_t *cb = data;
339   if (cb == NULL)
340     return;
341
342   sd_output_destroy(cb->formatter);
343   cb->formatter = NULL;
344
345   sfree(cb->email);
346   sfree(cb->project);
347   sfree(cb->url);
348
349   oauth_destroy(cb->auth);
350   if (cb->curl) {
351     curl_easy_cleanup(cb->curl);
352   }
353
354   sfree(cb);
355 } /* }}} void wg_callback_free */
356
357 static int wg_metric_descriptors_create(wg_callback_t *cb, const data_set_t *ds,
358                                         const value_list_t *vl) {
359   /* {{{ */
360   for (size_t i = 0; i < ds->ds_num; i++) {
361     char buffer[4096];
362
363     int status = sd_format_metric_descriptor(buffer, sizeof(buffer), ds, vl, i);
364     if (status != 0) {
365       ERROR("write_stackdriver plugin: sd_format_metric_descriptor failed "
366             "with status "
367             "%d",
368             status);
369       return status;
370     }
371
372     status = wg_call_metricdescriptor_create(cb, buffer);
373     if (status != 0) {
374       ERROR("write_stackdriver plugin: wg_call_metricdescriptor_create failed "
375             "with "
376             "status %d",
377             status);
378       return status;
379     }
380   }
381
382   return sd_output_register_metric(cb->formatter, ds, vl);
383 } /* }}} int wg_metric_descriptors_create */
384
385 static int wg_write(const data_set_t *ds, const value_list_t *vl, /* {{{ */
386                     user_data_t *user_data) {
387   wg_callback_t *cb = user_data->data;
388   if (cb == NULL)
389     return EINVAL;
390
391   pthread_mutex_lock(&cb->lock);
392
393   if (cb->curl == NULL) {
394     int status = wg_callback_init(cb);
395     if (status != 0) {
396       ERROR("write_stackdriver plugin: wg_callback_init failed.");
397       pthread_mutex_unlock(&cb->lock);
398       return status;
399     }
400   }
401
402   int status;
403   while (42) {
404     status = sd_output_add(cb->formatter, ds, vl);
405     if (status == 0) { /* success */
406       break;
407     } else if (status == ENOBUFS) { /* success, flush */
408       wg_flush_nolock(0, cb);
409       status = 0;
410       break;
411     } else if (status == EEXIST) {
412       /* metric already in the buffer; flush and retry */
413       wg_flush_nolock(0, cb);
414       continue;
415     } else if (status == ENOENT) {
416       /* new metric, create metric descriptor first */
417       status = wg_metric_descriptors_create(cb, ds, vl);
418       if (status != 0) {
419         break;
420       }
421       continue;
422     } else {
423       break;
424     }
425   }
426
427   if (status == 0) {
428     cb->timeseries_count++;
429   }
430
431   pthread_mutex_unlock(&cb->lock);
432   return status;
433 } /* }}} int wg_write */
434
435 static void wg_check_scope(char const *email) /* {{{ */
436 {
437   char *scope = gce_scope(email);
438   if (scope == NULL) {
439     WARNING("write_stackdriver plugin: Unable to determine scope of this "
440             "instance.");
441     return;
442   }
443
444   if (strstr(scope, MONITORING_SCOPE) == NULL) {
445     size_t scope_len;
446
447     /* Strip trailing newline characers for printing. */
448     scope_len = strlen(scope);
449     while ((scope_len > 0) && (iscntrl((int)scope[scope_len - 1])))
450       scope[--scope_len] = 0;
451
452     WARNING("write_stackdriver plugin: The determined scope of this instance "
453             "(\"%s\") does not contain the monitoring scope (\"%s\"). You need "
454             "to add this scope to the list of scopes passed to gcutil with "
455             "--service_account_scopes when creating the instance. "
456             "Alternatively, to use this plugin on an instance which does not "
457             "have this scope, use a Service Account.",
458             scope, MONITORING_SCOPE);
459   }
460
461   sfree(scope);
462 } /* }}} void wg_check_scope */
463
464 static int wg_config_resource(oconfig_item_t *ci, wg_callback_t *cb) /* {{{ */
465 {
466   if ((ci->values_num != 1) || (ci->values[0].type != OCONFIG_TYPE_STRING)) {
467     ERROR("write_stackdriver plugin: The \"%s\" option requires exactly one "
468           "string "
469           "argument.",
470           ci->key);
471     return EINVAL;
472   }
473   char *resource_type = ci->values[0].value.string;
474
475   if (cb->resource != NULL) {
476     sd_resource_destroy(cb->resource);
477   }
478
479   cb->resource = sd_resource_create(resource_type);
480   if (cb->resource == NULL) {
481     ERROR("write_stackdriver plugin: sd_resource_create(\"%s\") failed.",
482           resource_type);
483     return ENOMEM;
484   }
485
486   for (int i = 0; i < ci->children_num; i++) {
487     oconfig_item_t *child = ci->children + i;
488
489     if ((child->values_num != 1) ||
490         (child->values[0].type != OCONFIG_TYPE_STRING)) {
491       ERROR("write_stackdriver plugin: Resource labels must have exactly one "
492             "string "
493             "value. Ignoring label \"%s\".",
494             child->key);
495       continue;
496     }
497
498     sd_resource_add_label(cb->resource, child->key,
499                           child->values[0].value.string);
500   }
501
502   return 0;
503 } /* }}} int wg_config_resource */
504
505 static int wg_config(oconfig_item_t *ci) /* {{{ */
506 {
507   if (ci == NULL) {
508     return EINVAL;
509   }
510
511   wg_callback_t *cb = calloc(1, sizeof(*cb));
512   if (cb == NULL) {
513     ERROR("write_stackdriver plugin: calloc failed.");
514     return ENOMEM;
515   }
516   cb->url = strdup(GCM_API_URL);
517   pthread_mutex_init(&cb->lock, /* attr = */ NULL);
518
519   char *credential_file = NULL;
520
521   for (int i = 0; i < ci->children_num; i++) {
522     oconfig_item_t *child = ci->children + i;
523     if (strcasecmp("Project", child->key) == 0)
524       cf_util_get_string(child, &cb->project);
525     else if (strcasecmp("Email", child->key) == 0)
526       cf_util_get_string(child, &cb->email);
527     else if (strcasecmp("Url", child->key) == 0)
528       cf_util_get_string(child, &cb->url);
529     else if (strcasecmp("CredentialFile", child->key) == 0)
530       cf_util_get_string(child, &credential_file);
531     else if (strcasecmp("Resource", child->key) == 0)
532       wg_config_resource(child, cb);
533     else {
534       ERROR("write_stackdriver plugin: Invalid configuration option: %s.",
535             child->key);
536       wg_callback_free(cb);
537       return EINVAL;
538     }
539   }
540
541   /* Set up authentication */
542   /* Option 1: Credentials file given => use service account */
543   if (credential_file != NULL) {
544     oauth_google_t cfg =
545         oauth_create_google_file(credential_file, MONITORING_SCOPE);
546     if (cfg.oauth == NULL) {
547       ERROR("write_stackdriver plugin: oauth_create_google_file failed");
548       wg_callback_free(cb);
549       return EINVAL;
550     }
551     cb->auth = cfg.oauth;
552
553     if (cb->project == NULL) {
554       cb->project = cfg.project_id;
555       INFO(
556           "write_stackdriver plugin: Automatically detected project ID: \"%s\"",
557           cb->project);
558     } else {
559       sfree(cfg.project_id);
560     }
561   }
562   /* Option 2: Look for credentials in well-known places */
563   if (cb->auth == NULL) {
564     oauth_google_t cfg = oauth_create_google_default(MONITORING_SCOPE);
565     cb->auth = cfg.oauth;
566
567     if (cb->project == NULL) {
568       cb->project = cfg.project_id;
569       INFO(
570           "write_stackdriver plugin: Automatically detected project ID: \"%s\"",
571           cb->project);
572     } else {
573       sfree(cfg.project_id);
574     }
575   }
576
577   if ((cb->auth != NULL) && (cb->email != NULL)) {
578     NOTICE("write_stackdriver plugin: A service account email was configured "
579            "but is "
580            "not used for authentication because %s used instead.",
581            (credential_file != NULL) ? "a credential file was"
582                                      : "application default credentials were");
583   }
584
585   /* Option 3: Running on GCE => use metadata service */
586   if ((cb->auth == NULL) && gce_check()) {
587     wg_check_scope(cb->email);
588   } else if (cb->auth == NULL) {
589     ERROR("write_stackdriver plugin: Unable to determine credentials. Please "
590           "either "
591           "specify the \"Credentials\" option or set up Application Default "
592           "Credentials.");
593     wg_callback_free(cb);
594     return EINVAL;
595   }
596
597   if ((cb->project == NULL) && gce_check()) {
598     cb->project = gce_project_id();
599   }
600   if (cb->project == NULL) {
601     ERROR("write_stackdriver plugin: Unable to determine the project number. "
602           "Please specify the \"Project\" option manually.");
603     wg_callback_free(cb);
604     return EINVAL;
605   }
606
607   if ((cb->resource == NULL) && gce_check()) {
608     /* TODO(octo): add error handling */
609     cb->resource = sd_resource_create("gce_instance");
610     sd_resource_add_label(cb->resource, "project_id", gce_project_id());
611     sd_resource_add_label(cb->resource, "instance_id", gce_instance_id());
612     sd_resource_add_label(cb->resource, "zone", gce_zone());
613   }
614   if (cb->resource == NULL) {
615     /* TODO(octo): add error handling */
616     cb->resource = sd_resource_create("global");
617     sd_resource_add_label(cb->resource, "project_id", cb->project);
618   }
619
620   DEBUG("write_stackdriver plugin: Registering write callback with URL %s",
621         cb->url);
622   assert((cb->auth != NULL) || gce_check());
623
624   user_data_t user_data = {
625       .data = cb,
626   };
627   plugin_register_flush("write_stackdriver", wg_flush, &user_data);
628
629   user_data.free_func = wg_callback_free;
630   plugin_register_write("write_stackdriver", wg_write, &user_data);
631
632   return 0;
633 } /* }}} int wg_config */
634
635 static int wg_init(void) {
636   /* {{{ */
637   /* Call this while collectd is still single-threaded to avoid
638    * initialization issues in libgcrypt. */
639   curl_global_init(CURL_GLOBAL_SSL);
640
641   return 0;
642 } /* }}} int wg_init */
643
644 void module_register(void) /* {{{ */
645 {
646   plugin_register_complex_config("write_stackdriver", wg_config);
647   plugin_register_init("write_stackdriver", wg_init);
648 } /* }}} void module_register */
649
650 /* vim: set sw=2 sts=2 et fdm=marker : */