X-Git-Url: https://git.octo.it/?a=blobdiff_plain;f=src%2Fwrite_http.c;h=ab8757ed67f151875bc806279425f3c86d971c87;hb=7665e23080bb08021efff821ed160c4360ed2cca;hp=912c92dd432f6352fa3a5087c2f40b72d0dd5a7a;hpb=cf7c7d8538822a2ad09021adf1a919b7ce1892c0;p=collectd.git diff --git a/src/write_http.c b/src/write_http.c index 912c92dd..ab8757ed 100644 --- a/src/write_http.c +++ b/src/write_http.c @@ -28,6 +28,7 @@ #include "common.h" #include "utils_cache.h" #include "utils_parse_option.h" +#include "utils_format_json.h" #if HAVE_PTHREAD_H # include @@ -45,6 +46,14 @@ struct wh_callback_s char *user; char *pass; char *credentials; + int verify_peer; + int verify_host; + char *cacert; + int store_rates; + +#define WH_FORMAT_COMMAND 0 +#define WH_FORMAT_JSON 1 + int format; CURL *curl; char curl_errbuf[CURL_ERROR_SIZE]; @@ -64,6 +73,13 @@ static void wh_reset_buffer (wh_callback_t *cb) /* {{{ */ cb->send_buffer_free = sizeof (cb->send_buffer); cb->send_buffer_fill = 0; cb->send_buffer_init_time = time (NULL); + + if (cb->format == WH_FORMAT_JSON) + { + format_json_initialize (cb->send_buffer, + &cb->send_buffer_fill, + &cb->send_buffer_free); + } } /* }}} wh_reset_buffer */ static int wh_send_buffer (wh_callback_t *cb) /* {{{ */ @@ -75,7 +91,7 @@ static int wh_send_buffer (wh_callback_t *cb) /* {{{ */ if (status != 0) { ERROR ("write_http plugin: curl_easy_perform failed with " - "staus %i: %s", + "status %i: %s", status, cb->curl_errbuf); } return (status); @@ -99,7 +115,11 @@ static int wh_callback_init (wh_callback_t *cb) /* {{{ */ headers = NULL; headers = curl_slist_append (headers, "Accept: */*"); - headers = curl_slist_append (headers, "Content-Type: text/plain"); + if (cb->format == WH_FORMAT_JSON) + headers = curl_slist_append (headers, "Content-Type: application/json"); + else + headers = curl_slist_append (headers, "Content-Type: text/plain"); + headers = curl_slist_append (headers, "Expect:"); curl_easy_setopt (cb->curl, CURLOPT_HTTPHEADER, headers); curl_easy_setopt (cb->curl, CURLOPT_ERRORBUFFER, cb->curl_errbuf); @@ -123,9 +143,15 @@ static int wh_callback_init (wh_callback_t *cb) /* {{{ */ ssnprintf (cb->credentials, credentials_size, "%s:%s", cb->user, (cb->pass == NULL) ? "" : cb->pass); curl_easy_setopt (cb->curl, CURLOPT_USERPWD, cb->credentials); - curl_easy_setopt (cb->curl, CURLOPT_HTTPAUTH, CURLAUTH_DIGEST); + curl_easy_setopt (cb->curl, CURLOPT_HTTPAUTH, CURLAUTH_ANY); } + curl_easy_setopt (cb->curl, CURLOPT_SSL_VERIFYPEER, cb->verify_peer); + curl_easy_setopt (cb->curl, CURLOPT_SSL_VERIFYHOST, + cb->verify_host ? 2 : 0); + if (cb->cacert != NULL) + curl_easy_setopt (cb->curl, CURLOPT_CAINFO, cb->cacert); + wh_reset_buffer (cb); return (0); @@ -148,14 +174,46 @@ static int wh_flush_nolock (int timeout, wh_callback_t *cb) /* {{{ */ return (0); } - if (cb->send_buffer_fill <= 0) + if (cb->format == WH_FORMAT_COMMAND) { - cb->send_buffer_init_time = time (NULL); - return (0); + if (cb->send_buffer_fill <= 0) + { + cb->send_buffer_init_time = time (NULL); + return (0); + } + + status = wh_send_buffer (cb); + wh_reset_buffer (cb); } + else if (cb->format == WH_FORMAT_JSON) + { + if (cb->send_buffer_fill <= 2) + { + cb->send_buffer_init_time = time (NULL); + return (0); + } - status = wh_send_buffer (cb); - wh_reset_buffer (cb); + status = format_json_finalize (cb->send_buffer, + &cb->send_buffer_fill, + &cb->send_buffer_free); + if (status != 0) + { + ERROR ("write_http: wh_flush_nolock: " + "format_json_finalize failed."); + wh_reset_buffer (cb); + return (status); + } + + status = wh_send_buffer (cb); + wh_reset_buffer (cb); + } + else + { + ERROR ("write_http: wh_flush_nolock: " + "Unknown format: %i", + cb->format); + return (-1); + } return (status); } /* }}} wh_flush_nolock */ @@ -207,17 +265,20 @@ static void wh_callback_free (void *data) /* {{{ */ sfree (cb->user); sfree (cb->pass); sfree (cb->credentials); + sfree (cb->cacert); sfree (cb); } /* }}} void wh_callback_free */ static int wh_value_list_to_string (char *buffer, /* {{{ */ size_t buffer_size, - const data_set_t *ds, const value_list_t *vl) + const data_set_t *ds, const value_list_t *vl, + wh_callback_t *cb) { size_t offset = 0; int status; int i; + gauge_t *rates = NULL; assert (0 == strcmp (ds->type, vl->type)); @@ -227,9 +288,15 @@ static int wh_value_list_to_string (char *buffer, /* {{{ */ status = ssnprintf (buffer + offset, buffer_size - offset, \ __VA_ARGS__); \ if (status < 1) \ + { \ + sfree (rates); \ return (-1); \ + } \ else if (((size_t) status) >= (buffer_size - offset)) \ + { \ + sfree (rates); \ return (-1); \ + } \ else \ offset += ((size_t) status); \ } while (0) @@ -237,54 +304,41 @@ static int wh_value_list_to_string (char *buffer, /* {{{ */ BUFFER_ADD ("%lu", (unsigned long) vl->time); for (i = 0; i < ds->ds_num; i++) -{ - if (ds->ds[i].type == DS_TYPE_GAUGE) - BUFFER_ADD (":%f", vl->values[i].gauge); - else if (ds->ds[i].type == DS_TYPE_COUNTER) - BUFFER_ADD (":%llu", vl->values[i].counter); - else if (ds->ds[i].type == DS_TYPE_DERIVE) - BUFFER_ADD (":%"PRIi64, vl->values[i].derive); - else if (ds->ds[i].type == DS_TYPE_ABSOLUTE) - BUFFER_ADD (":%"PRIu64, vl->values[i].absolute); - else { - ERROR ("write_http plugin: Unknown data source type: %i", - ds->ds[i].type); - return (-1); - } -} /* for ds->ds_num */ + if (ds->ds[i].type == DS_TYPE_GAUGE) + BUFFER_ADD (":%f", vl->values[i].gauge); + else if (cb->store_rates) + { + if (rates == NULL) + rates = uc_get_rate (ds, vl); + if (rates == NULL) + { + WARNING ("write_http plugin: " + "uc_get_rate failed."); + return (-1); + } + BUFFER_ADD (":%g", rates[i]); + } + else if (ds->ds[i].type == DS_TYPE_COUNTER) + BUFFER_ADD (":%llu", vl->values[i].counter); + else if (ds->ds[i].type == DS_TYPE_DERIVE) + BUFFER_ADD (":%"PRIi64, vl->values[i].derive); + else if (ds->ds[i].type == DS_TYPE_ABSOLUTE) + BUFFER_ADD (":%"PRIu64, vl->values[i].absolute); + else + { + ERROR ("write_http plugin: Unknown data source type: %i", + ds->ds[i].type); + sfree (rates); + return (-1); + } + } /* for ds->ds_num */ #undef BUFFER_ADD -return (0); -} /* }}} int wh_value_list_to_string */ - -static int config_set_string (char **ret_string, /* {{{ */ - oconfig_item_t *ci) -{ - char *string; - - if ((ci->values_num != 1) - || (ci->values[0].type != OCONFIG_TYPE_STRING)) - { - WARNING ("write_http plugin: The `%s' config option " - "needs exactly one string argument.", ci->key); - return (-1); - } - - string = strdup (ci->values[0].value.string); - if (string == NULL) - { - ERROR ("write_http plugin: strdup failed."); - return (-1); - } - - if (*ret_string != NULL) - free (*ret_string); - *ret_string = string; - + sfree (rates); return (0); -} /* }}} int config_set_string */ +} /* }}} int wh_value_list_to_string */ static int wh_write_command (const data_set_t *ds, const value_list_t *vl, /* {{{ */ wh_callback_t *cb) @@ -312,7 +366,7 @@ static int wh_write_command (const data_set_t *ds, const value_list_t *vl, /* {{ /* Convert the values to an ASCII representation and put that into * `values'. */ - status = wh_value_list_to_string (values, sizeof (values), ds, vl); + status = wh_value_list_to_string (values, sizeof (values), ds, vl, cb); if (status != 0) { ERROR ("write_http plugin: error with " "wh_value_list_to_string"); @@ -320,7 +374,7 @@ static int wh_write_command (const data_set_t *ds, const value_list_t *vl, /* {{ } command_len = (size_t) ssnprintf (command, sizeof (command), - "PUTVAL %s interval=%i %s\n", + "PUTVAL %s interval=%i %s\r\n", key, vl->interval, values); if (command_len >= sizeof (command)) { ERROR ("write_http plugin: Command buffer too small: " @@ -371,6 +425,60 @@ static int wh_write_command (const data_set_t *ds, const value_list_t *vl, /* {{ return (0); } /* }}} int wh_write_command */ +static int wh_write_json (const data_set_t *ds, const value_list_t *vl, /* {{{ */ + wh_callback_t *cb) +{ + int status; + + pthread_mutex_lock (&cb->send_lock); + + if (cb->curl == NULL) + { + status = wh_callback_init (cb); + if (status != 0) + { + ERROR ("write_http plugin: wh_callback_init failed."); + pthread_mutex_unlock (&cb->send_lock); + return (-1); + } + } + + status = format_json_value_list (cb->send_buffer, + &cb->send_buffer_fill, + &cb->send_buffer_free, + ds, vl, cb->store_rates); + if (status == (-ENOMEM)) + { + status = wh_flush_nolock (/* timeout = */ -1, cb); + if (status != 0) + { + wh_reset_buffer (cb); + pthread_mutex_unlock (&cb->send_lock); + return (status); + } + + status = format_json_value_list (cb->send_buffer, + &cb->send_buffer_fill, + &cb->send_buffer_free, + ds, vl, cb->store_rates); + } + if (status != 0) + { + pthread_mutex_unlock (&cb->send_lock); + return (status); + } + + DEBUG ("write_http plugin: <%s> buffer %zu/%zu (%g%%)", + cb->location, + cb->send_buffer_fill, sizeof (cb->send_buffer), + 100.0 * ((double) cb->send_buffer_fill) / ((double) sizeof (cb->send_buffer))); + + /* Check if we have enough space for this command. */ + pthread_mutex_unlock (&cb->send_lock); + + return (0); +} /* }}} int wh_write_json */ + static int wh_write (const data_set_t *ds, const value_list_t *vl, /* {{{ */ user_data_t *user_data) { @@ -382,10 +490,83 @@ static int wh_write (const data_set_t *ds, const value_list_t *vl, /* {{{ */ cb = user_data->data; - status = wh_write_command (ds, vl, cb); + if (cb->format == WH_FORMAT_JSON) + status = wh_write_json (ds, vl, cb); + else + status = wh_write_command (ds, vl, cb); + return (status); } /* }}} int wh_write */ +static int config_set_string (char **ret_string, /* {{{ */ + oconfig_item_t *ci) +{ + char *string; + + if ((ci->values_num != 1) + || (ci->values[0].type != OCONFIG_TYPE_STRING)) + { + WARNING ("write_http plugin: The `%s' config option " + "needs exactly one string argument.", ci->key); + return (-1); + } + + string = strdup (ci->values[0].value.string); + if (string == NULL) + { + ERROR ("write_http plugin: strdup failed."); + return (-1); + } + + if (*ret_string != NULL) + free (*ret_string); + *ret_string = string; + + return (0); +} /* }}} int config_set_string */ + +static int config_set_boolean (int *dest, oconfig_item_t *ci) /* {{{ */ +{ + if ((ci->values_num != 1) || (ci->values[0].type != OCONFIG_TYPE_BOOLEAN)) + { + WARNING ("write_http plugin: The `%s' config option " + "needs exactly one boolean argument.", ci->key); + return (-1); + } + + *dest = ci->values[0].value.boolean ? 1 : 0; + + return (0); +} /* }}} int config_set_boolean */ + +static int config_set_format (wh_callback_t *cb, /* {{{ */ + oconfig_item_t *ci) +{ + char *string; + + if ((ci->values_num != 1) + || (ci->values[0].type != OCONFIG_TYPE_STRING)) + { + WARNING ("write_http plugin: The `%s' config option " + "needs exactly one string argument.", ci->key); + return (-1); + } + + string = ci->values[0].value.string; + if (strcasecmp ("Command", string) == 0) + cb->format = WH_FORMAT_COMMAND; + else if (strcasecmp ("JSON", string) == 0) + cb->format = WH_FORMAT_JSON; + else + { + ERROR ("write_http plugin: Invalid format string: %s", + string); + return (-1); + } + + return (0); +} /* }}} int config_set_string */ + static int wh_config_url (oconfig_item_t *ci) /* {{{ */ { wh_callback_t *cb; @@ -399,6 +580,15 @@ static int wh_config_url (oconfig_item_t *ci) /* {{{ */ return (-1); } memset (cb, 0, sizeof (*cb)); + cb->location = NULL; + cb->user = NULL; + cb->pass = NULL; + cb->credentials = NULL; + cb->verify_peer = 1; + cb->verify_host = 1; + cb->cacert = NULL; + cb->format = WH_FORMAT_COMMAND; + cb->curl = NULL; pthread_mutex_init (&cb->send_lock, /* attr = */ NULL); @@ -414,6 +604,16 @@ static int wh_config_url (oconfig_item_t *ci) /* {{{ */ config_set_string (&cb->user, child); else if (strcasecmp ("Password", child->key) == 0) config_set_string (&cb->pass, child); + else if (strcasecmp ("VerifyPeer", child->key) == 0) + config_set_boolean (&cb->verify_peer, child); + else if (strcasecmp ("VerifyHost", child->key) == 0) + config_set_boolean (&cb->verify_host, child); + else if (strcasecmp ("CACert", child->key) == 0) + config_set_string (&cb->cacert, child); + else if (strcasecmp ("Format", child->key) == 0) + config_set_format (cb, child); + else if (strcasecmp ("StoreRates", child->key) == 0) + config_set_boolean (&cb->store_rates, child); else { ERROR ("write_http plugin: Invalid configuration "