write_gcm plugin: New plugin for Google Cloud Monitoring.
[collectd.git] / src / write_gcm.c
1 /**
2  * collectd - src/write_gcm.c
3  * ISC license
4  *
5  * Copyright (C) 2017  Florian Forster
6  *
7  * Permission to use, copy, modify, and/or distribute this software for any
8  * purpose with or without fee is hereby granted, provided that the above
9  * copyright notice and this permission notice appear in all copies.
10  *
11  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
12  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
13  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
14  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
15  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
16  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
17  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
18  *
19  * Authors:
20  *   Florian Forster <octo at collectd.org>
21  **/
22
23 #include "collectd.h"
24
25 #include "common.h"
26 #include "configfile.h"
27 #include "plugin.h"
28 #include "utils_format_gcm.h"
29 #include "utils_gce.h"
30 #include "utils_oauth.h"
31
32 #include <curl/curl.h>
33 #include <pthread.h>
34
35 /*
36  * Private variables
37  */
38 #ifndef GCM_API_URL
39 #define GCM_API_URL "https://monitoring.googleapis.com/v3"
40 #endif
41
42 #ifndef MONITORING_SCOPE
43 #define MONITORING_SCOPE "https://www.googleapis.com/auth/monitoring"
44 #endif
45
46 struct wg_callback_s {
47   /* config */
48   char *email;
49   char *project;
50   char *url;
51   gcm_resource_t *resource;
52
53   /* runtime */
54   oauth_t *auth;
55   gcm_output_t *formatter;
56   CURL *curl;
57   char curl_errbuf[CURL_ERROR_SIZE];
58   /* used by flush */
59   size_t timeseries_count;
60   cdtime_t send_buffer_init_time;
61
62   pthread_mutex_t lock;
63 };
64 typedef struct wg_callback_s wg_callback_t;
65
66 struct wg_memory_s {
67   char *memory;
68   size_t size;
69 };
70 typedef struct wg_memory_s wg_memory_t;
71
72 static size_t wg_write_memory_cb(void *contents, size_t size,
73                                  size_t nmemb, /* {{{ */
74                                  void *userp) {
75   size_t realsize = size * nmemb;
76   wg_memory_t *mem = (wg_memory_t *)userp;
77
78   if (0x7FFFFFF0 < mem->size || 0x7FFFFFF0 - mem->size < realsize) {
79     ERROR("integer overflow");
80     return 0;
81   }
82
83   mem->memory = (char *)realloc((void *)mem->memory, mem->size + realsize + 1);
84   if (mem->memory == NULL) {
85     /* out of memory! */
86     ERROR("wg_write_memory_cb: not enough memory (realloc returned NULL)");
87     return 0;
88   }
89
90   memcpy(&(mem->memory[mem->size]), contents, realsize);
91   mem->size += realsize;
92   mem->memory[mem->size] = 0;
93   return realsize;
94 } /* }}} size_t wg_write_memory_cb */
95
96 static char *wg_get_authorization_header(wg_callback_t *cb) { /* {{{ */
97   int status = 0;
98   char access_token[256];
99   char authorization_header[256];
100
101   assert((cb->auth != NULL) || gce_check());
102   if (cb->auth != NULL)
103     status = oauth_access_token(cb->auth, access_token, sizeof(access_token));
104   else
105     status = gce_access_token(cb->email, access_token, sizeof(access_token));
106   if (status != 0) {
107     ERROR("write_gcm plugin: Failed to get access token");
108     return NULL;
109   }
110
111   status = snprintf(authorization_header, sizeof(authorization_header),
112                     "Authorization: Bearer %s", access_token);
113   if ((status < 1) || ((size_t)status >= sizeof(authorization_header)))
114     return NULL;
115
116   return strdup(authorization_header);
117 } /* }}} char *wg_get_authorization_header */
118
119 static int wg_call_metricdescriptor_create(wg_callback_t *cb,
120                                            char const *payload) {
121   /* {{{ */
122   char final_url[1024];
123   int status =
124       snprintf(final_url, sizeof(final_url), "%s/projects/%s/metricDescriptors",
125                cb->url, cb->project);
126   if ((status < 1) || ((size_t)status >= sizeof(final_url)))
127     return -1;
128
129   char *authorization_header = wg_get_authorization_header(cb);
130   if (authorization_header == NULL)
131     return -1;
132
133   struct curl_slist *headers = NULL;
134   headers = curl_slist_append(headers, "Content-Type: application/json");
135   headers = curl_slist_append(headers, authorization_header);
136
137   CURL *curl = curl_easy_init();
138   if (!curl) {
139     ERROR("write_gcm plugin: curl_easy_init failed.");
140     curl_slist_free_all(headers);
141     sfree(authorization_header);
142     return -1;
143   }
144
145   curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1L);
146   curl_easy_setopt(cb->curl, CURLOPT_USERAGENT,
147                    PACKAGE_NAME "/" PACKAGE_VERSION);
148   char curl_errbuf[CURL_ERROR_SIZE];
149   curl_easy_setopt(curl, CURLOPT_ERRORBUFFER, curl_errbuf);
150   curl_easy_setopt(curl, CURLOPT_URL, final_url);
151   curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers);
152   curl_easy_setopt(curl, CURLOPT_POST, 1L);
153   curl_easy_setopt(curl, CURLOPT_POSTFIELDS, payload);
154
155   wg_memory_t res = {
156       .memory = NULL, .size = 0,
157   };
158   curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, wg_write_memory_cb);
159   curl_easy_setopt(curl, CURLOPT_WRITEDATA, &res);
160
161   status = curl_easy_perform(curl);
162   if (status != CURLE_OK) {
163     ERROR("write_gcm plugin: curl_easy_perform failed with status %d: %s",
164           status, curl_errbuf);
165     sfree(res.memory);
166     curl_easy_cleanup(curl);
167     curl_slist_free_all(headers);
168     sfree(authorization_header);
169     return -1;
170   }
171
172   long http_code = 0;
173   curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &http_code);
174   if ((http_code < 200) || (http_code >= 300)) {
175     ERROR("write_gcm plugin: POST request to %s failed: HTTP error %ld",
176           final_url, http_code);
177     INFO("write_gcm plugin: Server replied: %s", res.memory);
178     sfree(res.memory);
179     curl_easy_cleanup(curl);
180     curl_slist_free_all(headers);
181     sfree(authorization_header);
182     return -1;
183   }
184
185   sfree(res.memory);
186   curl_easy_cleanup(curl);
187   curl_slist_free_all(headers);
188   sfree(authorization_header);
189   return 0;
190 } /* }}} int wg_call_metricdescriptor_create */
191
192 static void wg_reset_buffer(wg_callback_t *cb) /* {{{ */
193 {
194   cb->timeseries_count = 0;
195   cb->send_buffer_init_time = cdtime();
196 } /* }}} wg_reset_buffer */
197
198 static int wg_call_timeseries_write(wg_callback_t *cb,
199                                     char const *payload) /* {{{ */
200 {
201   char final_url[1024];
202   int status = snprintf(final_url, sizeof(final_url),
203                         "%s/projects/%s/timeSeries", cb->url, cb->project);
204   if ((status < 1) || ((size_t)status >= sizeof(final_url)))
205     return -1;
206
207   char *authorization_header = wg_get_authorization_header(cb);
208   if (authorization_header == NULL)
209     return -1;
210
211   struct curl_slist *headers = NULL;
212   headers = curl_slist_append(headers, authorization_header);
213   headers = curl_slist_append(headers, "Content-Type: application/json");
214
215   curl_easy_setopt(cb->curl, CURLOPT_URL, final_url);
216   curl_easy_setopt(cb->curl, CURLOPT_HTTPHEADER, headers);
217   curl_easy_setopt(cb->curl, CURLOPT_POST, 1L);
218   curl_easy_setopt(cb->curl, CURLOPT_POSTFIELDS, payload);
219
220   wg_memory_t res = {
221       .memory = NULL, .size = 0,
222   };
223   curl_easy_setopt(cb->curl, CURLOPT_WRITEFUNCTION, wg_write_memory_cb);
224   curl_easy_setopt(cb->curl, CURLOPT_WRITEDATA, &res);
225
226   status = curl_easy_perform(cb->curl);
227   if (status != CURLE_OK) {
228     ERROR("write_gcm plugin: curl_easy_perform failed with status %d: %s",
229           status, cb->curl_errbuf);
230     sfree(res.memory);
231     curl_slist_free_all(headers);
232     sfree(authorization_header);
233     return -1;
234   }
235
236   long http_code = 0;
237   curl_easy_getinfo(cb->curl, CURLINFO_RESPONSE_CODE, &http_code);
238   if ((http_code < 200) || (http_code >= 300)) {
239     ERROR("write_gcm plugin: POST request to %s failed: HTTP error %ld",
240           final_url, http_code);
241     INFO("write_gcm plugin: Server replied: %s", res.memory);
242     sfree(res.memory);
243     curl_slist_free_all(headers);
244     sfree(authorization_header);
245     return -1;
246   }
247
248   sfree(res.memory);
249   curl_slist_free_all(headers);
250   sfree(authorization_header);
251   return status;
252 } /* }}} wg_call_timeseries_write */
253
254 static int wg_callback_init(wg_callback_t *cb) /* {{{ */
255 {
256   if (cb->curl != NULL)
257     return 0;
258
259   cb->formatter = gcm_output_create(cb->resource);
260   if (cb->formatter == NULL) {
261     ERROR("write_gcm plugin: gcm_output_create failed.");
262     return -1;
263   }
264
265   cb->curl = curl_easy_init();
266   if (cb->curl == NULL) {
267     ERROR("write_gcm plugin: curl_easy_init failed.");
268     return -1;
269   }
270
271   curl_easy_setopt(cb->curl, CURLOPT_NOSIGNAL, 1L);
272   curl_easy_setopt(cb->curl, CURLOPT_USERAGENT,
273                    PACKAGE_NAME "/" PACKAGE_VERSION);
274   curl_easy_setopt(cb->curl, CURLOPT_ERRORBUFFER, cb->curl_errbuf);
275   wg_reset_buffer(cb);
276
277   return 0;
278 } /* }}} int wg_callback_init */
279
280 static int wg_flush_nolock(cdtime_t timeout, wg_callback_t *cb) /* {{{ */
281 {
282   if (cb->timeseries_count == 0) {
283     cb->send_buffer_init_time = cdtime();
284     return 0;
285   }
286
287   /* timeout == 0  => flush unconditionally */
288   if (timeout > 0) {
289     cdtime_t now = cdtime();
290
291     if ((cb->send_buffer_init_time + timeout) > now)
292       return 0;
293   }
294
295   char *payload = gcm_output_reset(cb->formatter);
296   int status = wg_call_timeseries_write(cb, payload);
297   if (status != 0) {
298     ERROR("write_gcm plugin: Sending buffer failed with status %d.", status);
299   }
300
301   wg_reset_buffer(cb);
302   return status;
303 } /* }}} wg_flush_nolock */
304
305 static int wg_flush(cdtime_t timeout, /* {{{ */
306                     const char *identifier __attribute__((unused)),
307                     user_data_t *user_data) {
308   wg_callback_t *cb;
309   int status;
310
311   if (user_data == NULL)
312     return -EINVAL;
313
314   cb = user_data->data;
315
316   pthread_mutex_lock(&cb->lock);
317
318   if (cb->curl == NULL) {
319     status = wg_callback_init(cb);
320     if (status != 0) {
321       ERROR("write_gcm plugin: wg_callback_init failed.");
322       pthread_mutex_unlock(&cb->lock);
323       return -1;
324     }
325   }
326
327   status = wg_flush_nolock(timeout, cb);
328   pthread_mutex_unlock(&cb->lock);
329
330   return status;
331 } /* }}} int wg_flush */
332
333 static void wg_callback_free(void *data) /* {{{ */
334 {
335   wg_callback_t *cb = data;
336   if (cb == NULL)
337     return;
338
339   gcm_output_destroy(cb->formatter);
340   cb->formatter = NULL;
341
342   sfree(cb->email);
343   sfree(cb->project);
344   sfree(cb->url);
345
346   oauth_destroy(cb->auth);
347   if (cb->curl) {
348     curl_easy_cleanup(cb->curl);
349   }
350
351   sfree(cb);
352 } /* }}} void wg_callback_free */
353
354 static int wg_metric_descriptors_create(wg_callback_t *cb, const data_set_t *ds,
355                                         const value_list_t *vl) {
356   /* {{{ */
357   for (size_t i = 0; i < ds->ds_num; i++) {
358     char buffer[4096];
359
360     int status =
361         gcm_format_metric_descriptor(buffer, sizeof(buffer), ds, vl, i);
362     if (status != 0) {
363       ERROR("write_gcm plugin: gcm_format_metric_descriptor failed with status "
364             "%d",
365             status);
366       return status;
367     }
368
369     status = wg_call_metricdescriptor_create(cb, buffer);
370     if (status != 0) {
371       ERROR("write_gcm plugin: wg_call_metricdescriptor_create failed with "
372             "status %d",
373             status);
374       return status;
375     }
376   }
377
378   return gcm_output_register_metric(cb->formatter, ds, vl);
379 } /* }}} int wg_metric_descriptors_create */
380
381 static int wg_write(const data_set_t *ds, const value_list_t *vl, /* {{{ */
382                     user_data_t *user_data) {
383   wg_callback_t *cb = user_data->data;
384   if (cb == NULL)
385     return EINVAL;
386
387   pthread_mutex_lock(&cb->lock);
388
389   if (cb->curl == NULL) {
390     int status = wg_callback_init(cb);
391     if (status != 0) {
392       ERROR("write_gcm plugin: wg_callback_init failed.");
393       pthread_mutex_unlock(&cb->lock);
394       return status;
395     }
396   }
397
398   int status;
399   while (42) {
400     status = gcm_output_add(cb->formatter, ds, vl);
401     if (status == 0) { /* success */
402       break;
403     } else if (status == ENOBUFS) { /* success, flush */
404       wg_flush_nolock(0, cb);
405       status = 0;
406       break;
407     } else if (status == EEXIST) {
408       /* metric already in the buffer; flush and retry */
409       wg_flush_nolock(0, cb);
410       continue;
411     } else if (status == ENOENT) {
412       /* new metric, create metric descriptor first */
413       status = wg_metric_descriptors_create(cb, ds, vl);
414       if (status != 0) {
415         break;
416       }
417       continue;
418     } else {
419       break;
420     }
421   }
422
423   if (status == 0) {
424     cb->timeseries_count++;
425   }
426
427   pthread_mutex_unlock(&cb->lock);
428   return status;
429 } /* }}} int wg_write */
430
431 static void wg_check_scope(char const *email) /* {{{ */
432 {
433   char *scope = gce_scope(email);
434   if (scope == NULL) {
435     WARNING("write_gcm plugin: Unable to determine scope of this instance.");
436     return;
437   }
438
439   if (strstr(scope, MONITORING_SCOPE) == NULL) {
440     size_t scope_len;
441
442     /* Strip trailing newline characers for printing. */
443     scope_len = strlen(scope);
444     while ((scope_len > 0) && (iscntrl((int)scope[scope_len - 1])))
445       scope[--scope_len] = 0;
446
447     WARNING("write_gcm plugin: The determined scope of this instance "
448             "(\"%s\") does not contain the monitoring scope (\"%s\"). You need "
449             "to add this scope to the list of scopes passed to gcutil with "
450             "--service_account_scopes when creating the instance. "
451             "Alternatively, to use this plugin on an instance which does not "
452             "have this scope, use a Service Account.",
453             scope, MONITORING_SCOPE);
454   }
455
456   sfree(scope);
457 } /* }}} void wg_check_scope */
458
459 static int wg_config_resource(oconfig_item_t *ci, wg_callback_t *cb) /* {{{ */
460 {
461   if ((ci->values_num != 1) || (ci->values[0].type != OCONFIG_TYPE_STRING)) {
462     ERROR("write_gcm plugin: The \"%s\" option requires exactly one string "
463           "argument.",
464           ci->key);
465     return EINVAL;
466   }
467   char *resource_type = ci->values[0].value.string;
468
469   if (cb->resource != NULL) {
470     gcm_resource_destroy(cb->resource);
471   }
472
473   cb->resource = gcm_resource_create(resource_type);
474   if (cb->resource == NULL) {
475     ERROR("write_gcm plugin: gcm_resource_create(\"%s\") failed.",
476           resource_type);
477     return ENOMEM;
478   }
479
480   for (int i = 0; i < ci->children_num; i++) {
481     oconfig_item_t *child = ci->children + i;
482
483     if ((child->values_num != 1) ||
484         (child->values[0].type != OCONFIG_TYPE_STRING)) {
485       ERROR("write_gcm plugin: Resource labels must have exactly one string "
486             "value. Ignoring label \"%s\".",
487             child->key);
488       continue;
489     }
490
491     gcm_resource_add_label(cb->resource, child->key,
492                            child->values[0].value.string);
493   }
494
495   return 0;
496 } /* }}} int wg_config_resource */
497
498 static int wg_config(oconfig_item_t *ci) /* {{{ */
499 {
500   if (ci == NULL) {
501     return EINVAL;
502   }
503
504   wg_callback_t *cb = calloc(1, sizeof(*cb));
505   if (cb == NULL) {
506     ERROR("write_gcm plugin: calloc failed.");
507     return ENOMEM;
508   }
509   cb->url = strdup(GCM_API_URL);
510   pthread_mutex_init(&cb->lock, /* attr = */ NULL);
511
512   char *credential_file = NULL;
513
514   for (int i = 0; i < ci->children_num; i++) {
515     oconfig_item_t *child = ci->children + i;
516     if (strcasecmp("Project", child->key) == 0)
517       cf_util_get_string(child, &cb->project);
518     else if (strcasecmp("Email", child->key) == 0)
519       cf_util_get_string(child, &cb->email);
520     else if (strcasecmp("Url", child->key) == 0)
521       cf_util_get_string(child, &cb->url);
522     else if (strcasecmp("CredentialFile", child->key) == 0)
523       cf_util_get_string(child, &credential_file);
524     else if (strcasecmp("Resource", child->key) == 0)
525       wg_config_resource(child, cb);
526     else {
527       ERROR("write_gcm plugin: Invalid configuration option: %s.", child->key);
528       wg_callback_free(cb);
529       return EINVAL;
530     }
531   }
532
533   /* Set up authentication */
534   /* Option 1: Credentials file given => use service account */
535   if (credential_file != NULL) {
536     oauth_google_t cfg =
537         oauth_create_google_file(credential_file, MONITORING_SCOPE);
538     if (cfg.oauth == NULL) {
539       ERROR("write_gcm plugin: oauth_create_google_file failed");
540       wg_callback_free(cb);
541       return EINVAL;
542     }
543     cb->auth = cfg.oauth;
544
545     if (cb->project == NULL) {
546       cb->project = cfg.project_id;
547       INFO("write_gcm plugin: Automatically detected project ID: \"%s\"",
548            cb->project);
549     } else {
550       sfree(cfg.project_id);
551     }
552   }
553   /* Option 2: Look for credentials in well-known places */
554   if (cb->auth == NULL) {
555     oauth_google_t cfg = oauth_create_google_default(MONITORING_SCOPE);
556     cb->auth = cfg.oauth;
557
558     if (cb->project == NULL) {
559       cb->project = cfg.project_id;
560       INFO("write_gcm plugin: Automatically detected project ID: \"%s\"",
561            cb->project);
562     } else {
563       sfree(cfg.project_id);
564     }
565   }
566
567   if ((cb->auth != NULL) && (cb->email != NULL)) {
568     NOTICE("write_gcm plugin: A service account email was configured but is "
569            "not used for authentication because %s used instead.",
570            (credential_file != NULL) ? "a credential file was"
571                                      : "application default credentials were");
572   }
573
574   /* Option 3: Running on GCE => use metadata service */
575   if ((cb->auth == NULL) && gce_check()) {
576     wg_check_scope(cb->email);
577   } else if (cb->auth == NULL) {
578     ERROR("write_gcm plugin: Unable to determine credentials. Please either "
579           "specify the \"Credentials\" option or set up Application Default "
580           "Credentials.");
581     wg_callback_free(cb);
582     return EINVAL;
583   }
584
585   if ((cb->project == NULL) && gce_check()) {
586     cb->project = gce_project_id();
587   }
588   if (cb->project == NULL) {
589     ERROR("write_gcm plugin: Unable to determine the project number. "
590           "Please specify the \"Project\" option manually.");
591     wg_callback_free(cb);
592     return EINVAL;
593   }
594
595   if ((cb->resource == NULL) && gce_check()) {
596     /* TODO(octo): add error handling */
597     cb->resource = gcm_resource_create("gce_instance");
598     gcm_resource_add_label(cb->resource, "project_id", gce_project_id());
599     gcm_resource_add_label(cb->resource, "instance_id", gce_instance_id());
600     gcm_resource_add_label(cb->resource, "zone", gce_zone());
601   }
602   if (cb->resource == NULL) {
603     /* TODO(octo): add error handling */
604     cb->resource = gcm_resource_create("global");
605     gcm_resource_add_label(cb->resource, "project_id", cb->project);
606   }
607
608   DEBUG("write_gcm plugin: Registering write callback with URL %s", cb->url);
609   assert((cb->auth != NULL) || gce_check());
610
611   user_data_t user_data = {
612       .data = cb,
613   };
614   plugin_register_flush("write_gcm", wg_flush, &user_data);
615
616   user_data.free_func = wg_callback_free;
617   plugin_register_write("write_gcm", wg_write, &user_data);
618
619   return 0;
620 } /* }}} int wg_config */
621
622 static int wg_init(void) {
623   /* {{{ */
624   /* Call this while collectd is still single-threaded to avoid
625    * initialization issues in libgcrypt. */
626   curl_global_init(CURL_GLOBAL_SSL);
627
628   return 0;
629 } /* }}} int wg_init */
630
631 void module_register(void) /* {{{ */
632 {
633   plugin_register_complex_config("write_gcm", wg_config);
634   plugin_register_init("write_gcm", wg_init);
635 } /* }}} void module_register */
636
637 /* vim: set sw=2 sts=2 et fdm=marker : */