write_stackdriver plugin: Check "http_code" instead of "status".
[collectd.git] / src / write_stackdriver.c
1 /**
2  * collectd - src/write_stackdriver.c
3  * ISC license
4  *
5  * Copyright (C) 2017  Florian Forster
6  *
7  * Permission to use, copy, modify, and/or distribute this software for any
8  * purpose with or without fee is hereby granted, provided that the above
9  * copyright notice and this permission notice appear in all copies.
10  *
11  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
12  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
13  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
14  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
15  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
16  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
17  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
18  *
19  * Authors:
20  *   Florian Forster <octo at collectd.org>
21  **/
22
23 #include "collectd.h"
24
25 #include "common.h"
26 #include "configfile.h"
27 #include "plugin.h"
28 #include "utils_format_stackdriver.h"
29 #include "utils_gce.h"
30 #include "utils_oauth.h"
31
32 #include <curl/curl.h>
33 #include <pthread.h>
34 #include <yajl/yajl_tree.h>
35
36 /*
37  * Private variables
38  */
39 #ifndef GCM_API_URL
40 #define GCM_API_URL "https://monitoring.googleapis.com/v3"
41 #endif
42
43 #ifndef MONITORING_SCOPE
44 #define MONITORING_SCOPE "https://www.googleapis.com/auth/monitoring"
45 #endif
46
47 struct wg_callback_s {
48   /* config */
49   char *email;
50   char *project;
51   char *url;
52   sd_resource_t *resource;
53
54   /* runtime */
55   oauth_t *auth;
56   sd_output_t *formatter;
57   CURL *curl;
58   char curl_errbuf[CURL_ERROR_SIZE];
59   /* used by flush */
60   size_t timeseries_count;
61   cdtime_t send_buffer_init_time;
62
63   pthread_mutex_t lock;
64 };
65 typedef struct wg_callback_s wg_callback_t;
66
67 struct wg_memory_s {
68   char *memory;
69   size_t size;
70 };
71 typedef struct wg_memory_s wg_memory_t;
72
73 static size_t wg_write_memory_cb(void *contents, size_t size,
74                                  size_t nmemb, /* {{{ */
75                                  void *userp) {
76   size_t realsize = size * nmemb;
77   wg_memory_t *mem = (wg_memory_t *)userp;
78
79   if (0x7FFFFFF0 < mem->size || 0x7FFFFFF0 - mem->size < realsize) {
80     ERROR("integer overflow");
81     return 0;
82   }
83
84   mem->memory = (char *)realloc((void *)mem->memory, mem->size + realsize + 1);
85   if (mem->memory == NULL) {
86     /* out of memory! */
87     ERROR("wg_write_memory_cb: not enough memory (realloc returned NULL)");
88     return 0;
89   }
90
91   memcpy(&(mem->memory[mem->size]), contents, realsize);
92   mem->size += realsize;
93   mem->memory[mem->size] = 0;
94   return realsize;
95 } /* }}} size_t wg_write_memory_cb */
96
97 static char *wg_get_authorization_header(wg_callback_t *cb) { /* {{{ */
98   int status = 0;
99   char access_token[256];
100   char authorization_header[256];
101
102   assert((cb->auth != NULL) || gce_check());
103   if (cb->auth != NULL)
104     status = oauth_access_token(cb->auth, access_token, sizeof(access_token));
105   else
106     status = gce_access_token(cb->email, access_token, sizeof(access_token));
107   if (status != 0) {
108     ERROR("write_stackdriver plugin: Failed to get access token");
109     return NULL;
110   }
111
112   status = snprintf(authorization_header, sizeof(authorization_header),
113                     "Authorization: Bearer %s", access_token);
114   if ((status < 1) || ((size_t)status >= sizeof(authorization_header)))
115     return NULL;
116
117   return strdup(authorization_header);
118 } /* }}} char *wg_get_authorization_header */
119
120 typedef struct {
121   int code;
122   char *message;
123 } api_error_t;
124
125 static api_error_t *parse_api_error(char const *body) {
126   char errbuf[1024];
127   yajl_val root = yajl_tree_parse(body, errbuf, sizeof(errbuf));
128   if (root == NULL) {
129     ERROR("write_stackdriver plugin: yajl_tree_parse failed: %s", errbuf);
130     return NULL;
131   }
132
133   api_error_t *err = calloc(1, sizeof(*err));
134   if (err == NULL) {
135     ERROR("write_stackdriver plugin: calloc failed");
136     yajl_tree_free(root);
137     return NULL;
138   }
139
140   yajl_val code = yajl_tree_get(root, (char const *[]){"error", "code", NULL},
141                                 yajl_t_number);
142   if (YAJL_IS_INTEGER(code)) {
143     err->code = YAJL_GET_INTEGER(code);
144   }
145
146   yajl_val message = yajl_tree_get(
147       root, (char const *[]){"error", "message", NULL}, yajl_t_string);
148   if (YAJL_IS_STRING(message)) {
149     char const *m = YAJL_GET_STRING(message);
150     if (m != NULL) {
151       err->message = strdup(m);
152     }
153   }
154
155   return err;
156 }
157
158 static char *api_error_string(api_error_t *err, char *buffer,
159                               size_t buffer_size) {
160   if (err == NULL) {
161     strncpy(buffer, "Unknown error (API error is NULL)", buffer_size);
162   } else if (err->message == NULL) {
163     snprintf(buffer, buffer_size, "API error %d", err->code);
164   } else {
165     snprintf(buffer, buffer_size, "API error %d: %s", err->code, err->message);
166   }
167
168   return buffer;
169 }
170 #define API_ERROR_STRING(err) api_error_string(err, (char[1024]){""}, 1024)
171
172 // do_post does a HTTP POST request, assuming a JSON payload and using OAuth
173 // authentication. Returns -1 on error and the HTTP status code otherwise.
174 // ret_content, if not NULL, will contain the server's response.
175 // If ret_content is provided and the server responds with a 4xx or 5xx error,
176 // an appropriate message will be logged.
177 static int do_post(wg_callback_t *cb, char const *url, void const *payload,
178                    wg_memory_t *ret_content) {
179   if (cb->curl == NULL) {
180     cb->curl = curl_easy_init();
181     if (cb->curl == NULL) {
182       ERROR("write_stackdriver plugin: curl_easy_init() failed");
183       return -1;
184     }
185
186     curl_easy_setopt(cb->curl, CURLOPT_ERRORBUFFER, cb->curl_errbuf);
187     curl_easy_setopt(cb->curl, CURLOPT_NOSIGNAL, 1L);
188   }
189
190   curl_easy_setopt(cb->curl, CURLOPT_POST, 1L);
191   curl_easy_setopt(cb->curl, CURLOPT_URL, url);
192
193   long timeout_ms = 2 * CDTIME_T_TO_MS(plugin_get_interval());
194   if (timeout_ms < 10000) {
195     timeout_ms = 10000;
196   }
197   curl_easy_setopt(cb->curl, CURLOPT_TIMEOUT_MS, timeout_ms);
198
199   /* header */
200   char *auth_header = wg_get_authorization_header(cb);
201   if (auth_header == NULL) {
202     ERROR("write_stackdriver plugin: getting access token failed with");
203     return -1;
204   }
205
206   struct curl_slist *headers =
207       curl_slist_append(NULL, "Content-Type: application/json");
208   headers = curl_slist_append(headers, auth_header);
209   curl_easy_setopt(cb->curl, CURLOPT_HTTPHEADER, headers);
210
211   curl_easy_setopt(cb->curl, CURLOPT_POSTFIELDS, payload);
212
213   curl_easy_setopt(cb->curl, CURLOPT_WRITEFUNCTION,
214                    ret_content ? wg_write_memory_cb : NULL);
215   curl_easy_setopt(cb->curl, CURLOPT_WRITEDATA, ret_content);
216
217   int status = curl_easy_perform(cb->curl);
218
219   /* clean up that has to happen in any case */
220   curl_slist_free_all(headers);
221   sfree(auth_header);
222   curl_easy_setopt(cb->curl, CURLOPT_HTTPHEADER, NULL);
223   curl_easy_setopt(cb->curl, CURLOPT_WRITEFUNCTION, NULL);
224   curl_easy_setopt(cb->curl, CURLOPT_WRITEDATA, NULL);
225
226   if (status != CURLE_OK) {
227     ERROR("write_stackdriver plugin: POST %s failed: %s", url, cb->curl_errbuf);
228     if (ret_content != NULL) {
229       sfree(ret_content->memory);
230       ret_content->size = 0;
231     }
232     return -1;
233   }
234
235   long http_code = 0;
236   curl_easy_getinfo(cb->curl, CURLINFO_RESPONSE_CODE, &http_code);
237
238   if (ret_content != NULL) {
239     if ((http_code >= 400) && (http_code < 500)) {
240       ERROR("write_stackdriver plugin: POST %s: %s", url,
241             API_ERROR_STRING(parse_api_error(ret_content->memory)));
242     } else if (http_code >= 500) {
243       WARNING("write_stackdriver plugin: POST %s: %s", url,
244               ret_content->memory);
245     }
246   }
247
248   return (int)http_code;
249 } /* int do_post */
250
251 static int wg_call_metricdescriptor_create(wg_callback_t *cb,
252                                            char const *payload) {
253   char url[1024];
254   snprintf(url, sizeof(url), "%s/projects/%s/metricDescriptors", cb->url,
255            cb->project);
256   wg_memory_t response = {0};
257
258   int status = do_post(cb, url, payload, &response);
259   if (status == -1) {
260     ERROR("write_stackdriver plugin: POST %s failed", url);
261     return -1;
262   }
263   sfree(response.memory);
264
265   if (status != 200) {
266     ERROR("write_stackdriver plugin: POST %s: unexpected response code: got "
267           "%d, want 200",
268           url, status);
269     return -1;
270   }
271   return 0;
272 } /* int wg_call_metricdescriptor_create */
273
274 static int wg_call_timeseries_write(wg_callback_t *cb, char const *payload) {
275   char url[1024];
276   snprintf(url, sizeof(url), "%s/projects/%s/timeSeries", cb->url, cb->project);
277   wg_memory_t response = {0};
278
279   int status = do_post(cb, url, payload, &response);
280   if (status == -1) {
281     ERROR("write_stackdriver plugin: POST %s failed", url);
282     return -1;
283   }
284   sfree(response.memory);
285
286   if (status != 200) {
287     ERROR("write_stackdriver plugin: POST %s: unexpected response code: got "
288           "%d, want 200",
289           url, status);
290     return -1;
291   }
292   return 0;
293 } /* int wg_call_timeseries_write */
294
295 static void wg_reset_buffer(wg_callback_t *cb) /* {{{ */
296 {
297   cb->timeseries_count = 0;
298   cb->send_buffer_init_time = cdtime();
299 } /* }}} wg_reset_buffer */
300
301 static int wg_callback_init(wg_callback_t *cb) /* {{{ */
302 {
303   if (cb->curl != NULL)
304     return 0;
305
306   cb->formatter = sd_output_create(cb->resource);
307   if (cb->formatter == NULL) {
308     ERROR("write_stackdriver plugin: sd_output_create failed.");
309     return -1;
310   }
311
312   cb->curl = curl_easy_init();
313   if (cb->curl == NULL) {
314     ERROR("write_stackdriver plugin: curl_easy_init failed.");
315     return -1;
316   }
317
318   curl_easy_setopt(cb->curl, CURLOPT_NOSIGNAL, 1L);
319   curl_easy_setopt(cb->curl, CURLOPT_USERAGENT,
320                    PACKAGE_NAME "/" PACKAGE_VERSION);
321   curl_easy_setopt(cb->curl, CURLOPT_ERRORBUFFER, cb->curl_errbuf);
322   wg_reset_buffer(cb);
323
324   return 0;
325 } /* }}} int wg_callback_init */
326
327 static int wg_flush_nolock(cdtime_t timeout, wg_callback_t *cb) /* {{{ */
328 {
329   if (cb->timeseries_count == 0) {
330     cb->send_buffer_init_time = cdtime();
331     return 0;
332   }
333
334   /* timeout == 0  => flush unconditionally */
335   if (timeout > 0) {
336     cdtime_t now = cdtime();
337
338     if ((cb->send_buffer_init_time + timeout) > now)
339       return 0;
340   }
341
342   char *payload = sd_output_reset(cb->formatter);
343   int status = wg_call_timeseries_write(cb, payload);
344   if (status != 0) {
345     ERROR("write_stackdriver plugin: Sending buffer failed with status %d.",
346           status);
347   }
348
349   wg_reset_buffer(cb);
350   return status;
351 } /* }}} wg_flush_nolock */
352
353 static int wg_flush(cdtime_t timeout, /* {{{ */
354                     const char *identifier __attribute__((unused)),
355                     user_data_t *user_data) {
356   wg_callback_t *cb;
357   int status;
358
359   if (user_data == NULL)
360     return -EINVAL;
361
362   cb = user_data->data;
363
364   pthread_mutex_lock(&cb->lock);
365
366   if (cb->curl == NULL) {
367     status = wg_callback_init(cb);
368     if (status != 0) {
369       ERROR("write_stackdriver plugin: wg_callback_init failed.");
370       pthread_mutex_unlock(&cb->lock);
371       return -1;
372     }
373   }
374
375   status = wg_flush_nolock(timeout, cb);
376   pthread_mutex_unlock(&cb->lock);
377
378   return status;
379 } /* }}} int wg_flush */
380
381 static void wg_callback_free(void *data) /* {{{ */
382 {
383   wg_callback_t *cb = data;
384   if (cb == NULL)
385     return;
386
387   sd_output_destroy(cb->formatter);
388   cb->formatter = NULL;
389
390   sfree(cb->email);
391   sfree(cb->project);
392   sfree(cb->url);
393
394   oauth_destroy(cb->auth);
395   if (cb->curl) {
396     curl_easy_cleanup(cb->curl);
397   }
398
399   sfree(cb);
400 } /* }}} void wg_callback_free */
401
402 static int wg_metric_descriptors_create(wg_callback_t *cb, const data_set_t *ds,
403                                         const value_list_t *vl) {
404   /* {{{ */
405   for (size_t i = 0; i < ds->ds_num; i++) {
406     char buffer[4096];
407
408     int status = sd_format_metric_descriptor(buffer, sizeof(buffer), ds, vl, i);
409     if (status != 0) {
410       ERROR("write_stackdriver plugin: sd_format_metric_descriptor failed "
411             "with status "
412             "%d",
413             status);
414       return status;
415     }
416
417     status = wg_call_metricdescriptor_create(cb, buffer);
418     if (status != 0) {
419       ERROR("write_stackdriver plugin: wg_call_metricdescriptor_create failed "
420             "with "
421             "status %d",
422             status);
423       return status;
424     }
425   }
426
427   return sd_output_register_metric(cb->formatter, ds, vl);
428 } /* }}} int wg_metric_descriptors_create */
429
430 static int wg_write(const data_set_t *ds, const value_list_t *vl, /* {{{ */
431                     user_data_t *user_data) {
432   wg_callback_t *cb = user_data->data;
433   if (cb == NULL)
434     return EINVAL;
435
436   pthread_mutex_lock(&cb->lock);
437
438   if (cb->curl == NULL) {
439     int status = wg_callback_init(cb);
440     if (status != 0) {
441       ERROR("write_stackdriver plugin: wg_callback_init failed.");
442       pthread_mutex_unlock(&cb->lock);
443       return status;
444     }
445   }
446
447   int status;
448   while (42) {
449     status = sd_output_add(cb->formatter, ds, vl);
450     if (status == 0) { /* success */
451       break;
452     } else if (status == ENOBUFS) { /* success, flush */
453       wg_flush_nolock(0, cb);
454       status = 0;
455       break;
456     } else if (status == EEXIST) {
457       /* metric already in the buffer; flush and retry */
458       wg_flush_nolock(0, cb);
459       continue;
460     } else if (status == ENOENT) {
461       /* new metric, create metric descriptor first */
462       status = wg_metric_descriptors_create(cb, ds, vl);
463       if (status != 0) {
464         break;
465       }
466       continue;
467     } else {
468       break;
469     }
470   }
471
472   if (status == 0) {
473     cb->timeseries_count++;
474   }
475
476   pthread_mutex_unlock(&cb->lock);
477   return status;
478 } /* }}} int wg_write */
479
480 static void wg_check_scope(char const *email) /* {{{ */
481 {
482   char *scope = gce_scope(email);
483   if (scope == NULL) {
484     WARNING("write_stackdriver plugin: Unable to determine scope of this "
485             "instance.");
486     return;
487   }
488
489   if (strstr(scope, MONITORING_SCOPE) == NULL) {
490     size_t scope_len;
491
492     /* Strip trailing newline characers for printing. */
493     scope_len = strlen(scope);
494     while ((scope_len > 0) && (iscntrl((int)scope[scope_len - 1])))
495       scope[--scope_len] = 0;
496
497     WARNING("write_stackdriver plugin: The determined scope of this instance "
498             "(\"%s\") does not contain the monitoring scope (\"%s\"). You need "
499             "to add this scope to the list of scopes passed to gcutil with "
500             "--service_account_scopes when creating the instance. "
501             "Alternatively, to use this plugin on an instance which does not "
502             "have this scope, use a Service Account.",
503             scope, MONITORING_SCOPE);
504   }
505
506   sfree(scope);
507 } /* }}} void wg_check_scope */
508
509 static int wg_config_resource(oconfig_item_t *ci, wg_callback_t *cb) /* {{{ */
510 {
511   if ((ci->values_num != 1) || (ci->values[0].type != OCONFIG_TYPE_STRING)) {
512     ERROR("write_stackdriver plugin: The \"%s\" option requires exactly one "
513           "string "
514           "argument.",
515           ci->key);
516     return EINVAL;
517   }
518   char *resource_type = ci->values[0].value.string;
519
520   if (cb->resource != NULL) {
521     sd_resource_destroy(cb->resource);
522   }
523
524   cb->resource = sd_resource_create(resource_type);
525   if (cb->resource == NULL) {
526     ERROR("write_stackdriver plugin: sd_resource_create(\"%s\") failed.",
527           resource_type);
528     return ENOMEM;
529   }
530
531   for (int i = 0; i < ci->children_num; i++) {
532     oconfig_item_t *child = ci->children + i;
533
534     if (strcasecmp("Label", child->key) == 0) {
535       if ((child->values_num != 2) ||
536           (child->values[0].type != OCONFIG_TYPE_STRING) ||
537           (child->values[1].type != OCONFIG_TYPE_STRING)) {
538         ERROR("write_stackdriver plugin: The \"Label\" option needs exactly "
539               "two string arguments.");
540         continue;
541       }
542
543       sd_resource_add_label(cb->resource, child->values[0].value.string,
544                             child->values[1].value.string);
545     }
546   }
547
548   return 0;
549 } /* }}} int wg_config_resource */
550
551 static int wg_config(oconfig_item_t *ci) /* {{{ */
552 {
553   if (ci == NULL) {
554     return EINVAL;
555   }
556
557   wg_callback_t *cb = calloc(1, sizeof(*cb));
558   if (cb == NULL) {
559     ERROR("write_stackdriver plugin: calloc failed.");
560     return ENOMEM;
561   }
562   cb->url = strdup(GCM_API_URL);
563   pthread_mutex_init(&cb->lock, /* attr = */ NULL);
564
565   char *credential_file = NULL;
566
567   for (int i = 0; i < ci->children_num; i++) {
568     oconfig_item_t *child = ci->children + i;
569     if (strcasecmp("Project", child->key) == 0)
570       cf_util_get_string(child, &cb->project);
571     else if (strcasecmp("Email", child->key) == 0)
572       cf_util_get_string(child, &cb->email);
573     else if (strcasecmp("Url", child->key) == 0)
574       cf_util_get_string(child, &cb->url);
575     else if (strcasecmp("CredentialFile", child->key) == 0)
576       cf_util_get_string(child, &credential_file);
577     else if (strcasecmp("Resource", child->key) == 0)
578       wg_config_resource(child, cb);
579     else {
580       ERROR("write_stackdriver plugin: Invalid configuration option: %s.",
581             child->key);
582       wg_callback_free(cb);
583       return EINVAL;
584     }
585   }
586
587   /* Set up authentication */
588   /* Option 1: Credentials file given => use service account */
589   if (credential_file != NULL) {
590     oauth_google_t cfg =
591         oauth_create_google_file(credential_file, MONITORING_SCOPE);
592     if (cfg.oauth == NULL) {
593       ERROR("write_stackdriver plugin: oauth_create_google_file failed");
594       wg_callback_free(cb);
595       return EINVAL;
596     }
597     cb->auth = cfg.oauth;
598
599     if (cb->project == NULL) {
600       cb->project = cfg.project_id;
601       INFO("write_stackdriver plugin: Automatically detected project ID: "
602            "\"%s\"",
603            cb->project);
604     } else {
605       sfree(cfg.project_id);
606     }
607   }
608   /* Option 2: Look for credentials in well-known places */
609   if (cb->auth == NULL) {
610     oauth_google_t cfg = oauth_create_google_default(MONITORING_SCOPE);
611     cb->auth = cfg.oauth;
612
613     if (cb->project == NULL) {
614       cb->project = cfg.project_id;
615       INFO("write_stackdriver plugin: Automatically detected project ID: "
616            "\"%s\"",
617            cb->project);
618     } else {
619       sfree(cfg.project_id);
620     }
621   }
622
623   if ((cb->auth != NULL) && (cb->email != NULL)) {
624     NOTICE("write_stackdriver plugin: A service account email was configured "
625            "but is "
626            "not used for authentication because %s used instead.",
627            (credential_file != NULL) ? "a credential file was"
628                                      : "application default credentials were");
629   }
630
631   /* Option 3: Running on GCE => use metadata service */
632   if ((cb->auth == NULL) && gce_check()) {
633     wg_check_scope(cb->email);
634   } else if (cb->auth == NULL) {
635     ERROR("write_stackdriver plugin: Unable to determine credentials. Please "
636           "either "
637           "specify the \"Credentials\" option or set up Application Default "
638           "Credentials.");
639     wg_callback_free(cb);
640     return EINVAL;
641   }
642
643   if ((cb->project == NULL) && gce_check()) {
644     cb->project = gce_project_id();
645   }
646   if (cb->project == NULL) {
647     ERROR("write_stackdriver plugin: Unable to determine the project number. "
648           "Please specify the \"Project\" option manually.");
649     wg_callback_free(cb);
650     return EINVAL;
651   }
652
653   if ((cb->resource == NULL) && gce_check()) {
654     /* TODO(octo): add error handling */
655     cb->resource = sd_resource_create("gce_instance");
656     sd_resource_add_label(cb->resource, "project_id", gce_project_id());
657     sd_resource_add_label(cb->resource, "instance_id", gce_instance_id());
658     sd_resource_add_label(cb->resource, "zone", gce_zone());
659   }
660   if (cb->resource == NULL) {
661     /* TODO(octo): add error handling */
662     cb->resource = sd_resource_create("global");
663     sd_resource_add_label(cb->resource, "project_id", cb->project);
664   }
665
666   DEBUG("write_stackdriver plugin: Registering write callback with URL %s",
667         cb->url);
668   assert((cb->auth != NULL) || gce_check());
669
670   user_data_t user_data = {
671       .data = cb,
672   };
673   plugin_register_flush("write_stackdriver", wg_flush, &user_data);
674
675   user_data.free_func = wg_callback_free;
676   plugin_register_write("write_stackdriver", wg_write, &user_data);
677
678   return 0;
679 } /* }}} int wg_config */
680
681 static int wg_init(void) {
682   /* {{{ */
683   /* Call this while collectd is still single-threaded to avoid
684    * initialization issues in libgcrypt. */
685   curl_global_init(CURL_GLOBAL_SSL);
686
687   return 0;
688 } /* }}} int wg_init */
689
690 void module_register(void) /* {{{ */
691 {
692   plugin_register_complex_config("write_stackdriver", wg_config);
693   plugin_register_init("write_stackdriver", wg_init);
694 } /* }}} void module_register */