X-Git-Url: https://git.octo.it/?p=collectd.git;a=blobdiff_plain;f=src%2Fwrite_http.c;h=7cd19c3b69750c05fcdf019cf24167583d1ec8f2;hp=468892808f82e8bf5293664caa10c48ba79c4378;hb=54619dc85fd308b21ed09a0271e5c7383c7921b9;hpb=01d23e3f5daf016d03f82d92a76be2fe3decdca4 diff --git a/src/write_http.c b/src/write_http.c index 46889280..7cd19c3b 100644 --- a/src/write_http.c +++ b/src/write_http.c @@ -25,10 +25,10 @@ #include "collectd.h" -#include "common.h" #include "plugin.h" -#include "utils_format_json.h" -#include "utils_format_kairosdb.h" +#include "utils/common/common.h" +#include "utils/format_json/format_json.h" +#include "utils/format_kairosdb/format_kairosdb.h" #include @@ -36,6 +36,10 @@ #define WRITE_HTTP_DEFAULT_BUFFER_SIZE 4096 #endif +#ifndef WRITE_HTTP_DEFAULT_PREFIX +#define WRITE_HTTP_DEFAULT_PREFIX "collectd" +#endif + /* * Private variables */ @@ -46,16 +50,16 @@ struct wh_callback_s { char *user; char *pass; char *credentials; - _Bool verify_peer; - _Bool verify_host; + bool verify_peer; + bool verify_host; char *cacert; char *capath; char *clientkey; char *clientcert; char *clientkeypass; long sslversion; - _Bool store_rates; - _Bool log_http_error; + bool store_rates; + bool log_http_error; int low_speed_limit; time_t low_speed_time; int timeout; @@ -64,8 +68,8 @@ struct wh_callback_s { #define WH_FORMAT_JSON 1 #define WH_FORMAT_KAIROSDB 2 int format; - _Bool send_metrics; - _Bool send_notifications; + bool send_metrics; + bool send_notifications; CURL *curl; struct curl_slist *headers; @@ -78,9 +82,15 @@ struct wh_callback_s { cdtime_t send_buffer_init_time; pthread_mutex_t send_lock; + + int data_ttl; + char *metrics_prefix; }; typedef struct wh_callback_s wh_callback_t; +static char **http_attrs; +static size_t http_attrs_num; + static void wh_log_http_error(wh_callback_t *cb) { if (!cb->log_http_error) return; @@ -125,18 +135,18 @@ static int wh_post_nolock(wh_callback_t *cb, char const *data) /* {{{ */ "status %i: %s", status, cb->curl_errbuf); } - return (status); + return status; } /* }}} wh_post_nolock */ static int wh_callback_init(wh_callback_t *cb) /* {{{ */ { if (cb->curl != NULL) - return (0); + return 0; cb->curl = curl_easy_init(); if (cb->curl == NULL) { ERROR("curl plugin: curl_easy_init failed."); - return (-1); + return -1; } if (cb->low_speed_limit > 0 && cb->low_speed_time > 0) { @@ -182,11 +192,11 @@ static int wh_callback_init(wh_callback_t *cb) /* {{{ */ cb->credentials = malloc(credentials_size); if (cb->credentials == NULL) { ERROR("curl plugin: malloc failed."); - return (-1); + return -1; } - ssnprintf(cb->credentials, credentials_size, "%s:%s", cb->user, - (cb->pass == NULL) ? "" : cb->pass); + snprintf(cb->credentials, credentials_size, "%s:%s", cb->user, + (cb->pass == NULL) ? "" : cb->pass); curl_easy_setopt(cb->curl, CURLOPT_USERPWD, cb->credentials); #endif curl_easy_setopt(cb->curl, CURLOPT_HTTPAUTH, CURLAUTH_ANY); @@ -210,7 +220,7 @@ static int wh_callback_init(wh_callback_t *cb) /* {{{ */ wh_reset_buffer(cb); - return (0); + return 0; } /* }}} int wh_callback_init */ static int wh_flush_nolock(cdtime_t timeout, wh_callback_t *cb) /* {{{ */ @@ -218,7 +228,7 @@ static int wh_flush_nolock(cdtime_t timeout, wh_callback_t *cb) /* {{{ */ int status; DEBUG("write_http plugin: wh_flush_nolock: timeout = %.3f; " - "send_buffer_fill = %zu;", + "send_buffer_fill = %" PRIsz ";", CDTIME_T_TO_DOUBLE(timeout), cb->send_buffer_fill); /* timeout == 0 => flush unconditionally */ @@ -227,13 +237,13 @@ static int wh_flush_nolock(cdtime_t timeout, wh_callback_t *cb) /* {{{ */ now = cdtime(); if ((cb->send_buffer_init_time + timeout) > now) - return (0); + return 0; } if (cb->format == WH_FORMAT_COMMAND) { if (cb->send_buffer_fill == 0) { cb->send_buffer_init_time = cdtime(); - return (0); + return 0; } status = wh_post_nolock(cb, cb->send_buffer); @@ -241,7 +251,7 @@ static int wh_flush_nolock(cdtime_t timeout, wh_callback_t *cb) /* {{{ */ } else if (cb->format == WH_FORMAT_JSON || cb->format == WH_FORMAT_KAIROSDB) { if (cb->send_buffer_fill <= 2) { cb->send_buffer_init_time = cdtime(); - return (0); + return 0; } status = format_json_finalize(cb->send_buffer, &cb->send_buffer_fill, @@ -250,7 +260,7 @@ static int wh_flush_nolock(cdtime_t timeout, wh_callback_t *cb) /* {{{ */ ERROR("write_http: wh_flush_nolock: " "format_json_finalize failed."); wh_reset_buffer(cb); - return (status); + return status; } status = wh_post_nolock(cb, cb->send_buffer); @@ -259,10 +269,10 @@ static int wh_flush_nolock(cdtime_t timeout, wh_callback_t *cb) /* {{{ */ ERROR("write_http: wh_flush_nolock: " "Unknown format: %i", cb->format); - return (-1); + return -1; } - return (status); + return status; } /* }}} wh_flush_nolock */ static int wh_flush(cdtime_t timeout, /* {{{ */ @@ -272,7 +282,7 @@ static int wh_flush(cdtime_t timeout, /* {{{ */ int status; if (user_data == NULL) - return (-EINVAL); + return -EINVAL; cb = user_data->data; @@ -281,13 +291,13 @@ static int wh_flush(cdtime_t timeout, /* {{{ */ if (wh_callback_init(cb) != 0) { ERROR("write_http plugin: wh_callback_init failed."); pthread_mutex_unlock(&cb->send_lock); - return (-1); + return -1; } status = wh_flush_nolock(timeout, cb); pthread_mutex_unlock(&cb->send_lock); - return (status); + return status; } /* }}} int wh_flush */ static void wh_callback_free(void *data) /* {{{ */ @@ -323,6 +333,7 @@ static void wh_callback_free(void *data) /* {{{ */ sfree(cb->clientcert); sfree(cb->clientkeypass); sfree(cb->send_buffer); + sfree(cb->metrics_prefix); sfree(cb); } /* }}} void wh_callback_free */ @@ -351,7 +362,7 @@ static int wh_write_command(const data_set_t *ds, status = FORMAT_VL(key, sizeof(key), vl); if (status != 0) { ERROR("write_http plugin: error with format_name"); - return (status); + return status; } escape_string(key, sizeof(key)); @@ -361,31 +372,31 @@ static int wh_write_command(const data_set_t *ds, if (status != 0) { ERROR("write_http plugin: error with " "wh_value_list_to_string"); - return (status); + return status; } - command_len = (size_t)ssnprintf(command, sizeof(command), - "PUTVAL %s interval=%.3f %s\r\n", key, - CDTIME_T_TO_DOUBLE(vl->interval), values); + command_len = (size_t)snprintf(command, sizeof(command), + "PUTVAL %s interval=%.3f %s\r\n", key, + CDTIME_T_TO_DOUBLE(vl->interval), values); if (command_len >= sizeof(command)) { ERROR("write_http plugin: Command buffer too small: " - "Need %zu bytes.", + "Need %" PRIsz " bytes.", command_len + 1); - return (-1); + return -1; } pthread_mutex_lock(&cb->send_lock); if (wh_callback_init(cb) != 0) { ERROR("write_http plugin: wh_callback_init failed."); pthread_mutex_unlock(&cb->send_lock); - return (-1); + return -1; } if (command_len >= cb->send_buffer_free) { status = wh_flush_nolock(/* timeout = */ 0, cb); if (status != 0) { pthread_mutex_unlock(&cb->send_lock); - return (status); + return status; } } assert(command_len < cb->send_buffer_free); @@ -399,15 +410,15 @@ static int wh_write_command(const data_set_t *ds, cb->send_buffer_fill += command_len; cb->send_buffer_free -= command_len; - DEBUG("write_http plugin: <%s> buffer %zu/%zu (%g%%) \"%s\"", cb->location, - cb->send_buffer_fill, cb->send_buffer_size, + DEBUG("write_http plugin: <%s> buffer %" PRIsz "/%" PRIsz " (%g%%) \"%s\"", + cb->location, cb->send_buffer_fill, cb->send_buffer_size, 100.0 * ((double)cb->send_buffer_fill) / ((double)cb->send_buffer_size), command); /* Check if we have enough space for this command. */ pthread_mutex_unlock(&cb->send_lock); - return (0); + return 0; } /* }}} int wh_write_command */ static int wh_write_json(const data_set_t *ds, const value_list_t *vl, /* {{{ */ @@ -418,7 +429,7 @@ static int wh_write_json(const data_set_t *ds, const value_list_t *vl, /* {{{ */ if (wh_callback_init(cb) != 0) { ERROR("write_http plugin: wh_callback_init failed."); pthread_mutex_unlock(&cb->send_lock); - return (-1); + return -1; } status = @@ -429,7 +440,7 @@ static int wh_write_json(const data_set_t *ds, const value_list_t *vl, /* {{{ */ if (status != 0) { wh_reset_buffer(cb); pthread_mutex_unlock(&cb->send_lock); - return (status); + return status; } status = @@ -438,18 +449,18 @@ static int wh_write_json(const data_set_t *ds, const value_list_t *vl, /* {{{ */ } if (status != 0) { pthread_mutex_unlock(&cb->send_lock); - return (status); + return status; } - DEBUG("write_http plugin: <%s> buffer %zu/%zu (%g%%)", cb->location, - cb->send_buffer_fill, cb->send_buffer_size, + DEBUG("write_http plugin: <%s> buffer %" PRIsz "/%" PRIsz " (%g%%)", + cb->location, cb->send_buffer_fill, cb->send_buffer_size, 100.0 * ((double)cb->send_buffer_fill) / ((double)cb->send_buffer_size)); /* Check if we have enough space for this command. */ pthread_mutex_unlock(&cb->send_lock); - return (0); + return 0; } /* }}} int wh_write_json */ static int wh_write_kairosdb(const data_set_t *ds, @@ -464,39 +475,41 @@ static int wh_write_kairosdb(const data_set_t *ds, if (status != 0) { ERROR("write_http plugin: wh_callback_init failed."); pthread_mutex_unlock(&cb->send_lock); - return (-1); + return -1; } } - status = format_kairosdb_value_list(cb->send_buffer, &cb->send_buffer_fill, - &cb->send_buffer_free, ds, vl, - cb->store_rates); + status = format_kairosdb_value_list( + cb->send_buffer, &cb->send_buffer_fill, &cb->send_buffer_free, ds, vl, + cb->store_rates, (char const *const *)http_attrs, http_attrs_num, + cb->data_ttl, cb->metrics_prefix); if (status == -ENOMEM) { status = wh_flush_nolock(/* timeout = */ 0, cb); if (status != 0) { wh_reset_buffer(cb); pthread_mutex_unlock(&cb->send_lock); - return (status); + return status; } - status = format_kairosdb_value_list(cb->send_buffer, &cb->send_buffer_fill, - &cb->send_buffer_free, ds, vl, - cb->store_rates); + status = format_kairosdb_value_list( + cb->send_buffer, &cb->send_buffer_fill, &cb->send_buffer_free, ds, vl, + cb->store_rates, (char const *const *)http_attrs, http_attrs_num, + cb->data_ttl, cb->metrics_prefix); } if (status != 0) { pthread_mutex_unlock(&cb->send_lock); - return (status); + return status; } - DEBUG("write_http plugin: <%s> buffer %zu/%zu (%g%%)", cb->location, - cb->send_buffer_fill, cb->send_buffer_size, + DEBUG("write_http plugin: <%s> buffer %" PRIsz "/%" PRIsz " (%g%%)", + cb->location, cb->send_buffer_fill, cb->send_buffer_size, 100.0 * ((double)cb->send_buffer_fill) / ((double)cb->send_buffer_size)); /* Check if we have enough space for this command. */ pthread_mutex_unlock(&cb->send_lock); - return (0); + return 0; } /* }}} int wh_write_kairosdb */ static int wh_write(const data_set_t *ds, const value_list_t *vl, /* {{{ */ @@ -505,7 +518,7 @@ static int wh_write(const data_set_t *ds, const value_list_t *vl, /* {{{ */ int status; if (user_data == NULL) - return (-EINVAL); + return -EINVAL; cb = user_data->data; assert(cb->send_metrics); @@ -521,7 +534,7 @@ static int wh_write(const data_set_t *ds, const value_list_t *vl, /* {{{ */ status = wh_write_command(ds, vl, cb); break; } - return (status); + return status; } /* }}} int wh_write */ static int wh_notify(notification_t const *n, user_data_t *ud) /* {{{ */ @@ -531,7 +544,7 @@ static int wh_notify(notification_t const *n, user_data_t *ud) /* {{{ */ int status; if ((ud == NULL) || (ud->data == NULL)) - return (EINVAL); + return EINVAL; cb = ud->data; assert(cb->send_notifications); @@ -546,13 +559,13 @@ static int wh_notify(notification_t const *n, user_data_t *ud) /* {{{ */ if (wh_callback_init(cb) != 0) { ERROR("write_http plugin: wh_callback_init failed."); pthread_mutex_unlock(&cb->send_lock); - return (-1); + return -1; } status = wh_post_nolock(cb, alert); pthread_mutex_unlock(&cb->send_lock); - return (status); + return status; } /* }}} int wh_notify */ static int config_set_format(wh_callback_t *cb, /* {{{ */ @@ -563,7 +576,7 @@ static int config_set_format(wh_callback_t *cb, /* {{{ */ WARNING("write_http plugin: The `%s' config option " "needs exactly one string argument.", ci->key); - return (-1); + return -1; } string = ci->values[0].value.string; @@ -575,10 +588,10 @@ static int config_set_format(wh_callback_t *cb, /* {{{ */ cb->format = WH_FORMAT_KAIROSDB; else { ERROR("write_http plugin: Invalid format string: %s", string); - return (-1); + return -1; } - return (0); + return 0; } /* }}} int config_set_format */ static int wh_config_append_string(const char *name, @@ -587,16 +600,16 @@ static int wh_config_append_string(const char *name, struct curl_slist *temp = NULL; if ((ci->values_num != 1) || (ci->values[0].type != OCONFIG_TYPE_STRING)) { WARNING("write_http plugin: `%s' needs exactly one string argument.", name); - return (-1); + return -1; } temp = curl_slist_append(*dest, ci->values[0].value.string); if (temp == NULL) - return (-1); + return -1; *dest = temp; - return (0); + return 0; } /* }}} int wh_config_append_string */ static int wh_config_node(oconfig_item_t *ci) /* {{{ */ @@ -609,18 +622,26 @@ static int wh_config_node(oconfig_item_t *ci) /* {{{ */ cb = calloc(1, sizeof(*cb)); if (cb == NULL) { ERROR("write_http plugin: calloc failed."); - return (-1); + return -1; } - cb->verify_peer = 1; - cb->verify_host = 1; + cb->verify_peer = true; + cb->verify_host = true; cb->format = WH_FORMAT_COMMAND; cb->sslversion = CURL_SSLVERSION_DEFAULT; cb->low_speed_limit = 0; cb->timeout = 0; - cb->log_http_error = 0; + cb->log_http_error = false; cb->headers = NULL; - cb->send_metrics = 1; - cb->send_notifications = 0; + cb->send_metrics = true; + cb->send_notifications = false; + cb->data_ttl = 0; + cb->metrics_prefix = strdup(WRITE_HTTP_DEFAULT_PREFIX); + + if (cb->metrics_prefix == NULL) { + ERROR("write_http plugin: strdup failed."); + sfree(cb); + return -1; + } pthread_mutex_init(&cb->send_lock, /* attr = */ NULL); @@ -703,7 +724,38 @@ static int wh_config_node(oconfig_item_t *ci) /* {{{ */ status = cf_util_get_boolean(child, &cb->log_http_error); else if (strcasecmp("Header", child->key) == 0) status = wh_config_append_string("Header", &cb->headers, child); - else { + else if (strcasecmp("Attribute", child->key) == 0) { + char *key = NULL; + char *val = NULL; + + if (child->values_num != 2) { + WARNING("write_http plugin: Attribute need both a key and a value."); + break; + } + if (child->values[0].type != OCONFIG_TYPE_STRING || + child->values[1].type != OCONFIG_TYPE_STRING) { + WARNING("write_http plugin: Attribute needs string arguments."); + break; + } + if ((key = strdup(child->values[0].value.string)) == NULL) { + WARNING("cannot allocate memory for attribute key."); + break; + } + if ((val = strdup(child->values[1].value.string)) == NULL) { + WARNING("cannot allocate memory for attribute value."); + sfree(key); + break; + } + strarray_add(&http_attrs, &http_attrs_num, key); + strarray_add(&http_attrs, &http_attrs_num, val); + DEBUG("write_http plugin: got attribute: %s => %s", key, val); + sfree(key); + sfree(val); + } else if (strcasecmp("TTL", child->key) == 0) { + status = cf_util_get_int(child, &cb->data_ttl); + } else if (strcasecmp("Prefix", child->key) == 0) { + status = cf_util_get_string(child, &cb->metrics_prefix); + } else { ERROR("write_http plugin: Invalid configuration " "option: %s.", child->key); @@ -716,13 +768,13 @@ static int wh_config_node(oconfig_item_t *ci) /* {{{ */ if (status != 0) { wh_callback_free(cb); - return (status); + return status; } if (cb->location == NULL) { ERROR("write_http plugin: no URL defined for instance '%s'", cb->name); wh_callback_free(cb); - return (-1); + return -1; } if (!cb->send_metrics && !cb->send_notifications) { @@ -730,9 +782,12 @@ static int wh_config_node(oconfig_item_t *ci) /* {{{ */ "are enabled for \"%s\".", cb->name); wh_callback_free(cb); - return (-1); + return -1; } + if (strlen(cb->metrics_prefix) == 0) + sfree(cb->metrics_prefix); + if (cb->low_speed_limit > 0) cb->low_speed_time = CDTIME_T_TO_TIME_T(plugin_get_interval()); @@ -747,19 +802,21 @@ static int wh_config_node(oconfig_item_t *ci) /* {{{ */ /* Allocate the buffer. */ cb->send_buffer = malloc(cb->send_buffer_size); if (cb->send_buffer == NULL) { - ERROR("write_http plugin: malloc(%zu) failed.", cb->send_buffer_size); + ERROR("write_http plugin: malloc(%" PRIsz ") failed.", + cb->send_buffer_size); wh_callback_free(cb); - return (-1); + return -1; } /* Nulls the buffer and sets ..._free and ..._fill. */ wh_reset_buffer(cb); - ssnprintf(callback_name, sizeof(callback_name), "write_http/%s", cb->name); + snprintf(callback_name, sizeof(callback_name), "write_http/%s", cb->name); DEBUG("write_http: Registering write callback '%s' with URL '%s'", callback_name, cb->location); user_data_t user_data = { - .data = cb, .free_func = wh_callback_free, + .data = cb, + .free_func = wh_callback_free, }; if (cb->send_metrics) { @@ -774,7 +831,7 @@ static int wh_config_node(oconfig_item_t *ci) /* {{{ */ user_data.free_func = NULL; } - return (0); + return 0; } /* }}} int wh_config_node */ static int wh_config(oconfig_item_t *ci) /* {{{ */ @@ -796,7 +853,7 @@ static int wh_config(oconfig_item_t *ci) /* {{{ */ } } - return (0); + return 0; } /* }}} int wh_config */ static int wh_init(void) /* {{{ */ @@ -804,7 +861,7 @@ static int wh_init(void) /* {{{ */ /* Call this while collectd is still single-threaded to avoid * initialization issues in libgcrypt. */ curl_global_init(CURL_GLOBAL_SSL); - return (0); + return 0; } /* }}} int wh_init */ void module_register(void) /* {{{ */ @@ -812,5 +869,3 @@ void module_register(void) /* {{{ */ plugin_register_complex_config("write_http", wh_config); plugin_register_init("write_http", wh_init); } /* }}} void module_register */ - -/* vim: set fdm=marker sw=8 ts=8 tw=78 et : */