X-Git-Url: https://git.octo.it/?p=collectd.git;a=blobdiff_plain;f=src%2Fwrite_http.c;h=87e518b6857605a814abed1460a014958dc016ec;hp=68f6dc42b47f4295baa642773341ebb30a0f00a4;hb=da11ce02eb202b3e01d3e2d1b40f248a84430973;hpb=c0aeca0049de03c3da80f6072eda47eb722f931a diff --git a/src/write_http.c b/src/write_http.c index 68f6dc42..87e518b6 100644 --- a/src/write_http.c +++ b/src/write_http.c @@ -36,6 +36,10 @@ #define WRITE_HTTP_DEFAULT_BUFFER_SIZE 4096 #endif +#ifndef WRITE_HTTP_DEFAULT_PREFIX +#define WRITE_HTTP_DEFAULT_PREFIX "collectd" +#endif + /* * Private variables */ @@ -78,6 +82,9 @@ struct wh_callback_s { cdtime_t send_buffer_init_time; pthread_mutex_t send_lock; + + int data_ttl; + char *metrics_prefix; }; typedef struct wh_callback_s wh_callback_t; @@ -117,6 +124,7 @@ static int wh_post_nolock(wh_callback_t *cb, char const *data) /* {{{ */ { int status = 0; + curl_easy_setopt(cb->curl, CURLOPT_URL, cb->location); curl_easy_setopt(cb->curl, CURLOPT_POSTFIELDS, data); status = curl_easy_perform(cb->curl); @@ -127,18 +135,18 @@ static int wh_post_nolock(wh_callback_t *cb, char const *data) /* {{{ */ "status %i: %s", status, cb->curl_errbuf); } - return (status); + return status; } /* }}} wh_post_nolock */ static int wh_callback_init(wh_callback_t *cb) /* {{{ */ { if (cb->curl != NULL) - return (0); + return 0; cb->curl = curl_easy_init(); if (cb->curl == NULL) { ERROR("curl plugin: curl_easy_init failed."); - return (-1); + return -1; } if (cb->low_speed_limit > 0 && cb->low_speed_time > 0) { @@ -166,7 +174,6 @@ static int wh_callback_init(wh_callback_t *cb) /* {{{ */ curl_easy_setopt(cb->curl, CURLOPT_HTTPHEADER, cb->headers); curl_easy_setopt(cb->curl, CURLOPT_ERRORBUFFER, cb->curl_errbuf); - curl_easy_setopt(cb->curl, CURLOPT_URL, cb->location); curl_easy_setopt(cb->curl, CURLOPT_FOLLOWLOCATION, 1L); curl_easy_setopt(cb->curl, CURLOPT_MAXREDIRS, 50L); @@ -185,11 +192,11 @@ static int wh_callback_init(wh_callback_t *cb) /* {{{ */ cb->credentials = malloc(credentials_size); if (cb->credentials == NULL) { ERROR("curl plugin: malloc failed."); - return (-1); + return -1; } - ssnprintf(cb->credentials, credentials_size, "%s:%s", cb->user, - (cb->pass == NULL) ? "" : cb->pass); + snprintf(cb->credentials, credentials_size, "%s:%s", cb->user, + (cb->pass == NULL) ? "" : cb->pass); curl_easy_setopt(cb->curl, CURLOPT_USERPWD, cb->credentials); #endif curl_easy_setopt(cb->curl, CURLOPT_HTTPAUTH, CURLAUTH_ANY); @@ -213,7 +220,7 @@ static int wh_callback_init(wh_callback_t *cb) /* {{{ */ wh_reset_buffer(cb); - return (0); + return 0; } /* }}} int wh_callback_init */ static int wh_flush_nolock(cdtime_t timeout, wh_callback_t *cb) /* {{{ */ @@ -230,13 +237,13 @@ static int wh_flush_nolock(cdtime_t timeout, wh_callback_t *cb) /* {{{ */ now = cdtime(); if ((cb->send_buffer_init_time + timeout) > now) - return (0); + return 0; } if (cb->format == WH_FORMAT_COMMAND) { if (cb->send_buffer_fill == 0) { cb->send_buffer_init_time = cdtime(); - return (0); + return 0; } status = wh_post_nolock(cb, cb->send_buffer); @@ -244,7 +251,7 @@ static int wh_flush_nolock(cdtime_t timeout, wh_callback_t *cb) /* {{{ */ } else if (cb->format == WH_FORMAT_JSON || cb->format == WH_FORMAT_KAIROSDB) { if (cb->send_buffer_fill <= 2) { cb->send_buffer_init_time = cdtime(); - return (0); + return 0; } status = format_json_finalize(cb->send_buffer, &cb->send_buffer_fill, @@ -253,7 +260,7 @@ static int wh_flush_nolock(cdtime_t timeout, wh_callback_t *cb) /* {{{ */ ERROR("write_http: wh_flush_nolock: " "format_json_finalize failed."); wh_reset_buffer(cb); - return (status); + return status; } status = wh_post_nolock(cb, cb->send_buffer); @@ -262,10 +269,10 @@ static int wh_flush_nolock(cdtime_t timeout, wh_callback_t *cb) /* {{{ */ ERROR("write_http: wh_flush_nolock: " "Unknown format: %i", cb->format); - return (-1); + return -1; } - return (status); + return status; } /* }}} wh_flush_nolock */ static int wh_flush(cdtime_t timeout, /* {{{ */ @@ -275,7 +282,7 @@ static int wh_flush(cdtime_t timeout, /* {{{ */ int status; if (user_data == NULL) - return (-EINVAL); + return -EINVAL; cb = user_data->data; @@ -284,13 +291,13 @@ static int wh_flush(cdtime_t timeout, /* {{{ */ if (wh_callback_init(cb) != 0) { ERROR("write_http plugin: wh_callback_init failed."); pthread_mutex_unlock(&cb->send_lock); - return (-1); + return -1; } status = wh_flush_nolock(timeout, cb); pthread_mutex_unlock(&cb->send_lock); - return (status); + return status; } /* }}} int wh_flush */ static void wh_callback_free(void *data) /* {{{ */ @@ -326,6 +333,7 @@ static void wh_callback_free(void *data) /* {{{ */ sfree(cb->clientcert); sfree(cb->clientkeypass); sfree(cb->send_buffer); + sfree(cb->metrics_prefix); sfree(cb); } /* }}} void wh_callback_free */ @@ -354,7 +362,7 @@ static int wh_write_command(const data_set_t *ds, status = FORMAT_VL(key, sizeof(key), vl); if (status != 0) { ERROR("write_http plugin: error with format_name"); - return (status); + return status; } escape_string(key, sizeof(key)); @@ -364,31 +372,31 @@ static int wh_write_command(const data_set_t *ds, if (status != 0) { ERROR("write_http plugin: error with " "wh_value_list_to_string"); - return (status); + return status; } - command_len = (size_t)ssnprintf(command, sizeof(command), - "PUTVAL %s interval=%.3f %s\r\n", key, - CDTIME_T_TO_DOUBLE(vl->interval), values); + command_len = (size_t)snprintf(command, sizeof(command), + "PUTVAL %s interval=%.3f %s\r\n", key, + CDTIME_T_TO_DOUBLE(vl->interval), values); if (command_len >= sizeof(command)) { ERROR("write_http plugin: Command buffer too small: " "Need %zu bytes.", command_len + 1); - return (-1); + return -1; } pthread_mutex_lock(&cb->send_lock); if (wh_callback_init(cb) != 0) { ERROR("write_http plugin: wh_callback_init failed."); pthread_mutex_unlock(&cb->send_lock); - return (-1); + return -1; } if (command_len >= cb->send_buffer_free) { status = wh_flush_nolock(/* timeout = */ 0, cb); if (status != 0) { pthread_mutex_unlock(&cb->send_lock); - return (status); + return status; } } assert(command_len < cb->send_buffer_free); @@ -410,7 +418,7 @@ static int wh_write_command(const data_set_t *ds, /* Check if we have enough space for this command. */ pthread_mutex_unlock(&cb->send_lock); - return (0); + return 0; } /* }}} int wh_write_command */ static int wh_write_json(const data_set_t *ds, const value_list_t *vl, /* {{{ */ @@ -421,7 +429,7 @@ static int wh_write_json(const data_set_t *ds, const value_list_t *vl, /* {{{ */ if (wh_callback_init(cb) != 0) { ERROR("write_http plugin: wh_callback_init failed."); pthread_mutex_unlock(&cb->send_lock); - return (-1); + return -1; } status = @@ -432,7 +440,7 @@ static int wh_write_json(const data_set_t *ds, const value_list_t *vl, /* {{{ */ if (status != 0) { wh_reset_buffer(cb); pthread_mutex_unlock(&cb->send_lock); - return (status); + return status; } status = @@ -441,7 +449,7 @@ static int wh_write_json(const data_set_t *ds, const value_list_t *vl, /* {{{ */ } if (status != 0) { pthread_mutex_unlock(&cb->send_lock); - return (status); + return status; } DEBUG("write_http plugin: <%s> buffer %zu/%zu (%g%%)", cb->location, @@ -452,7 +460,7 @@ static int wh_write_json(const data_set_t *ds, const value_list_t *vl, /* {{{ */ /* Check if we have enough space for this command. */ pthread_mutex_unlock(&cb->send_lock); - return (0); + return 0; } /* }}} int wh_write_json */ static int wh_write_kairosdb(const data_set_t *ds, @@ -467,28 +475,30 @@ static int wh_write_kairosdb(const data_set_t *ds, if (status != 0) { ERROR("write_http plugin: wh_callback_init failed."); pthread_mutex_unlock(&cb->send_lock); - return (-1); + return -1; } } status = format_kairosdb_value_list( cb->send_buffer, &cb->send_buffer_fill, &cb->send_buffer_free, ds, vl, - cb->store_rates, (char const *const *)http_attrs, http_attrs_num); + cb->store_rates, (char const *const *)http_attrs, http_attrs_num, + cb->data_ttl, cb->metrics_prefix); if (status == -ENOMEM) { status = wh_flush_nolock(/* timeout = */ 0, cb); if (status != 0) { wh_reset_buffer(cb); pthread_mutex_unlock(&cb->send_lock); - return (status); + return status; } status = format_kairosdb_value_list( cb->send_buffer, &cb->send_buffer_fill, &cb->send_buffer_free, ds, vl, - cb->store_rates, (char const *const *)http_attrs, http_attrs_num); + cb->store_rates, (char const *const *)http_attrs, http_attrs_num, + cb->data_ttl, cb->metrics_prefix); } if (status != 0) { pthread_mutex_unlock(&cb->send_lock); - return (status); + return status; } DEBUG("write_http plugin: <%s> buffer %zu/%zu (%g%%)", cb->location, @@ -499,7 +509,7 @@ static int wh_write_kairosdb(const data_set_t *ds, /* Check if we have enough space for this command. */ pthread_mutex_unlock(&cb->send_lock); - return (0); + return 0; } /* }}} int wh_write_kairosdb */ static int wh_write(const data_set_t *ds, const value_list_t *vl, /* {{{ */ @@ -508,7 +518,7 @@ static int wh_write(const data_set_t *ds, const value_list_t *vl, /* {{{ */ int status; if (user_data == NULL) - return (-EINVAL); + return -EINVAL; cb = user_data->data; assert(cb->send_metrics); @@ -524,7 +534,7 @@ static int wh_write(const data_set_t *ds, const value_list_t *vl, /* {{{ */ status = wh_write_command(ds, vl, cb); break; } - return (status); + return status; } /* }}} int wh_write */ static int wh_notify(notification_t const *n, user_data_t *ud) /* {{{ */ @@ -534,7 +544,7 @@ static int wh_notify(notification_t const *n, user_data_t *ud) /* {{{ */ int status; if ((ud == NULL) || (ud->data == NULL)) - return (EINVAL); + return EINVAL; cb = ud->data; assert(cb->send_notifications); @@ -549,13 +559,13 @@ static int wh_notify(notification_t const *n, user_data_t *ud) /* {{{ */ if (wh_callback_init(cb) != 0) { ERROR("write_http plugin: wh_callback_init failed."); pthread_mutex_unlock(&cb->send_lock); - return (-1); + return -1; } status = wh_post_nolock(cb, alert); pthread_mutex_unlock(&cb->send_lock); - return (status); + return status; } /* }}} int wh_notify */ static int config_set_format(wh_callback_t *cb, /* {{{ */ @@ -566,7 +576,7 @@ static int config_set_format(wh_callback_t *cb, /* {{{ */ WARNING("write_http plugin: The `%s' config option " "needs exactly one string argument.", ci->key); - return (-1); + return -1; } string = ci->values[0].value.string; @@ -578,10 +588,10 @@ static int config_set_format(wh_callback_t *cb, /* {{{ */ cb->format = WH_FORMAT_KAIROSDB; else { ERROR("write_http plugin: Invalid format string: %s", string); - return (-1); + return -1; } - return (0); + return 0; } /* }}} int config_set_format */ static int wh_config_append_string(const char *name, @@ -590,16 +600,16 @@ static int wh_config_append_string(const char *name, struct curl_slist *temp = NULL; if ((ci->values_num != 1) || (ci->values[0].type != OCONFIG_TYPE_STRING)) { WARNING("write_http plugin: `%s' needs exactly one string argument.", name); - return (-1); + return -1; } temp = curl_slist_append(*dest, ci->values[0].value.string); if (temp == NULL) - return (-1); + return -1; *dest = temp; - return (0); + return 0; } /* }}} int wh_config_append_string */ static int wh_config_node(oconfig_item_t *ci) /* {{{ */ @@ -612,7 +622,7 @@ static int wh_config_node(oconfig_item_t *ci) /* {{{ */ cb = calloc(1, sizeof(*cb)); if (cb == NULL) { ERROR("write_http plugin: calloc failed."); - return (-1); + return -1; } cb->verify_peer = 1; cb->verify_host = 1; @@ -624,6 +634,14 @@ static int wh_config_node(oconfig_item_t *ci) /* {{{ */ cb->headers = NULL; cb->send_metrics = 1; cb->send_notifications = 0; + cb->data_ttl = 0; + cb->metrics_prefix = strdup(WRITE_HTTP_DEFAULT_PREFIX); + + if (cb->metrics_prefix == NULL) { + ERROR("write_http plugin: strdup failed."); + sfree(cb); + return -1; + } pthread_mutex_init(&cb->send_lock, /* attr = */ NULL); @@ -730,9 +748,13 @@ static int wh_config_node(oconfig_item_t *ci) /* {{{ */ } strarray_add(&http_attrs, &http_attrs_num, key); strarray_add(&http_attrs, &http_attrs_num, val); - DEBUG("write_http plugins: got attribute: %s => %s", key, val); + DEBUG("write_http plugin: got attribute: %s => %s", key, val); sfree(key); sfree(val); + } else if (strcasecmp("TTL", child->key) == 0) { + status = cf_util_get_int(child, &cb->data_ttl); + } else if (strcasecmp("Prefix", child->key) == 0) { + status = cf_util_get_string(child, &cb->metrics_prefix); } else { ERROR("write_http plugin: Invalid configuration " "option: %s.", @@ -746,13 +768,13 @@ static int wh_config_node(oconfig_item_t *ci) /* {{{ */ if (status != 0) { wh_callback_free(cb); - return (status); + return status; } if (cb->location == NULL) { ERROR("write_http plugin: no URL defined for instance '%s'", cb->name); wh_callback_free(cb); - return (-1); + return -1; } if (!cb->send_metrics && !cb->send_notifications) { @@ -760,9 +782,12 @@ static int wh_config_node(oconfig_item_t *ci) /* {{{ */ "are enabled for \"%s\".", cb->name); wh_callback_free(cb); - return (-1); + return -1; } + if (strlen(cb->metrics_prefix) == 0) + sfree(cb->metrics_prefix); + if (cb->low_speed_limit > 0) cb->low_speed_time = CDTIME_T_TO_TIME_T(plugin_get_interval()); @@ -779,12 +804,12 @@ static int wh_config_node(oconfig_item_t *ci) /* {{{ */ if (cb->send_buffer == NULL) { ERROR("write_http plugin: malloc(%zu) failed.", cb->send_buffer_size); wh_callback_free(cb); - return (-1); + return -1; } /* Nulls the buffer and sets ..._free and ..._fill. */ wh_reset_buffer(cb); - ssnprintf(callback_name, sizeof(callback_name), "write_http/%s", cb->name); + snprintf(callback_name, sizeof(callback_name), "write_http/%s", cb->name); DEBUG("write_http: Registering write callback '%s' with URL '%s'", callback_name, cb->location); @@ -804,7 +829,7 @@ static int wh_config_node(oconfig_item_t *ci) /* {{{ */ user_data.free_func = NULL; } - return (0); + return 0; } /* }}} int wh_config_node */ static int wh_config(oconfig_item_t *ci) /* {{{ */ @@ -826,7 +851,7 @@ static int wh_config(oconfig_item_t *ci) /* {{{ */ } } - return (0); + return 0; } /* }}} int wh_config */ static int wh_init(void) /* {{{ */ @@ -834,7 +859,7 @@ static int wh_init(void) /* {{{ */ /* Call this while collectd is still single-threaded to avoid * initialization issues in libgcrypt. */ curl_global_init(CURL_GLOBAL_SSL); - return (0); + return 0; } /* }}} int wh_init */ void module_register(void) /* {{{ */