#define WRITE_HTTP_DEFAULT_BUFFER_SIZE 4096
#endif
+#ifndef WRITE_HTTP_DEFAULT_PREFIX
+#define WRITE_HTTP_DEFAULT_PREFIX "collectd"
+#endif
+
/*
* Private variables
*/
cdtime_t send_buffer_init_time;
pthread_mutex_t send_lock;
+
+ int data_ttl;
+ char *metrics_prefix;
};
typedef struct wh_callback_s wh_callback_t;
+static char **http_attrs;
+static size_t http_attrs_num;
+
static void wh_log_http_error(wh_callback_t *cb) {
if (!cb->log_http_error)
return;
{
int status = 0;
+ curl_easy_setopt(cb->curl, CURLOPT_URL, cb->location);
curl_easy_setopt(cb->curl, CURLOPT_POSTFIELDS, data);
status = curl_easy_perform(cb->curl);
"status %i: %s",
status, cb->curl_errbuf);
}
- return (status);
+ return status;
} /* }}} wh_post_nolock */
static int wh_callback_init(wh_callback_t *cb) /* {{{ */
{
if (cb->curl != NULL)
- return (0);
+ return 0;
cb->curl = curl_easy_init();
if (cb->curl == NULL) {
ERROR("curl plugin: curl_easy_init failed.");
- return (-1);
+ return -1;
}
if (cb->low_speed_limit > 0 && cb->low_speed_time > 0) {
curl_easy_setopt(cb->curl, CURLOPT_HTTPHEADER, cb->headers);
curl_easy_setopt(cb->curl, CURLOPT_ERRORBUFFER, cb->curl_errbuf);
- curl_easy_setopt(cb->curl, CURLOPT_URL, cb->location);
curl_easy_setopt(cb->curl, CURLOPT_FOLLOWLOCATION, 1L);
curl_easy_setopt(cb->curl, CURLOPT_MAXREDIRS, 50L);
cb->credentials = malloc(credentials_size);
if (cb->credentials == NULL) {
ERROR("curl plugin: malloc failed.");
- return (-1);
+ return -1;
}
- ssnprintf(cb->credentials, credentials_size, "%s:%s", cb->user,
- (cb->pass == NULL) ? "" : cb->pass);
+ snprintf(cb->credentials, credentials_size, "%s:%s", cb->user,
+ (cb->pass == NULL) ? "" : cb->pass);
curl_easy_setopt(cb->curl, CURLOPT_USERPWD, cb->credentials);
#endif
curl_easy_setopt(cb->curl, CURLOPT_HTTPAUTH, CURLAUTH_ANY);
wh_reset_buffer(cb);
- return (0);
+ return 0;
} /* }}} int wh_callback_init */
static int wh_flush_nolock(cdtime_t timeout, wh_callback_t *cb) /* {{{ */
now = cdtime();
if ((cb->send_buffer_init_time + timeout) > now)
- return (0);
+ return 0;
}
if (cb->format == WH_FORMAT_COMMAND) {
if (cb->send_buffer_fill == 0) {
cb->send_buffer_init_time = cdtime();
- return (0);
+ return 0;
}
status = wh_post_nolock(cb, cb->send_buffer);
} else if (cb->format == WH_FORMAT_JSON || cb->format == WH_FORMAT_KAIROSDB) {
if (cb->send_buffer_fill <= 2) {
cb->send_buffer_init_time = cdtime();
- return (0);
+ return 0;
}
status = format_json_finalize(cb->send_buffer, &cb->send_buffer_fill,
ERROR("write_http: wh_flush_nolock: "
"format_json_finalize failed.");
wh_reset_buffer(cb);
- return (status);
+ return status;
}
status = wh_post_nolock(cb, cb->send_buffer);
ERROR("write_http: wh_flush_nolock: "
"Unknown format: %i",
cb->format);
- return (-1);
+ return -1;
}
- return (status);
+ return status;
} /* }}} wh_flush_nolock */
static int wh_flush(cdtime_t timeout, /* {{{ */
int status;
if (user_data == NULL)
- return (-EINVAL);
+ return -EINVAL;
cb = user_data->data;
if (wh_callback_init(cb) != 0) {
ERROR("write_http plugin: wh_callback_init failed.");
pthread_mutex_unlock(&cb->send_lock);
- return (-1);
+ return -1;
}
status = wh_flush_nolock(timeout, cb);
pthread_mutex_unlock(&cb->send_lock);
- return (status);
+ return status;
} /* }}} int wh_flush */
static void wh_callback_free(void *data) /* {{{ */
sfree(cb->clientcert);
sfree(cb->clientkeypass);
sfree(cb->send_buffer);
+ sfree(cb->metrics_prefix);
sfree(cb);
} /* }}} void wh_callback_free */
status = FORMAT_VL(key, sizeof(key), vl);
if (status != 0) {
ERROR("write_http plugin: error with format_name");
- return (status);
+ return status;
}
escape_string(key, sizeof(key));
if (status != 0) {
ERROR("write_http plugin: error with "
"wh_value_list_to_string");
- return (status);
+ return status;
}
- command_len = (size_t)ssnprintf(command, sizeof(command),
- "PUTVAL %s interval=%.3f %s\r\n", key,
- CDTIME_T_TO_DOUBLE(vl->interval), values);
+ command_len = (size_t)snprintf(command, sizeof(command),
+ "PUTVAL %s interval=%.3f %s\r\n", key,
+ CDTIME_T_TO_DOUBLE(vl->interval), values);
if (command_len >= sizeof(command)) {
ERROR("write_http plugin: Command buffer too small: "
"Need %zu bytes.",
command_len + 1);
- return (-1);
+ return -1;
}
pthread_mutex_lock(&cb->send_lock);
if (wh_callback_init(cb) != 0) {
ERROR("write_http plugin: wh_callback_init failed.");
pthread_mutex_unlock(&cb->send_lock);
- return (-1);
+ return -1;
}
if (command_len >= cb->send_buffer_free) {
status = wh_flush_nolock(/* timeout = */ 0, cb);
if (status != 0) {
pthread_mutex_unlock(&cb->send_lock);
- return (status);
+ return status;
}
}
assert(command_len < cb->send_buffer_free);
/* Check if we have enough space for this command. */
pthread_mutex_unlock(&cb->send_lock);
- return (0);
+ return 0;
} /* }}} int wh_write_command */
static int wh_write_json(const data_set_t *ds, const value_list_t *vl, /* {{{ */
if (wh_callback_init(cb) != 0) {
ERROR("write_http plugin: wh_callback_init failed.");
pthread_mutex_unlock(&cb->send_lock);
- return (-1);
+ return -1;
}
status =
if (status != 0) {
wh_reset_buffer(cb);
pthread_mutex_unlock(&cb->send_lock);
- return (status);
+ return status;
}
status =
}
if (status != 0) {
pthread_mutex_unlock(&cb->send_lock);
- return (status);
+ return status;
}
DEBUG("write_http plugin: <%s> buffer %zu/%zu (%g%%)", cb->location,
/* Check if we have enough space for this command. */
pthread_mutex_unlock(&cb->send_lock);
- return (0);
+ return 0;
} /* }}} int wh_write_json */
static int wh_write_kairosdb(const data_set_t *ds,
if (status != 0) {
ERROR("write_http plugin: wh_callback_init failed.");
pthread_mutex_unlock(&cb->send_lock);
- return (-1);
+ return -1;
}
}
- status = format_kairosdb_value_list(cb->send_buffer, &cb->send_buffer_fill,
- &cb->send_buffer_free, ds, vl,
- cb->store_rates);
+ status = format_kairosdb_value_list(
+ cb->send_buffer, &cb->send_buffer_fill, &cb->send_buffer_free, ds, vl,
+ cb->store_rates, (char const *const *)http_attrs, http_attrs_num,
+ cb->data_ttl, cb->metrics_prefix);
if (status == -ENOMEM) {
status = wh_flush_nolock(/* timeout = */ 0, cb);
if (status != 0) {
wh_reset_buffer(cb);
pthread_mutex_unlock(&cb->send_lock);
- return (status);
+ return status;
}
- status = format_kairosdb_value_list(cb->send_buffer, &cb->send_buffer_fill,
- &cb->send_buffer_free, ds, vl,
- cb->store_rates);
+ status = format_kairosdb_value_list(
+ cb->send_buffer, &cb->send_buffer_fill, &cb->send_buffer_free, ds, vl,
+ cb->store_rates, (char const *const *)http_attrs, http_attrs_num,
+ cb->data_ttl, cb->metrics_prefix);
}
if (status != 0) {
pthread_mutex_unlock(&cb->send_lock);
- return (status);
+ return status;
}
DEBUG("write_http plugin: <%s> buffer %zu/%zu (%g%%)", cb->location,
/* Check if we have enough space for this command. */
pthread_mutex_unlock(&cb->send_lock);
- return (0);
+ return 0;
} /* }}} int wh_write_kairosdb */
static int wh_write(const data_set_t *ds, const value_list_t *vl, /* {{{ */
int status;
if (user_data == NULL)
- return (-EINVAL);
+ return -EINVAL;
cb = user_data->data;
assert(cb->send_metrics);
status = wh_write_command(ds, vl, cb);
break;
}
- return (status);
+ return status;
} /* }}} int wh_write */
static int wh_notify(notification_t const *n, user_data_t *ud) /* {{{ */
int status;
if ((ud == NULL) || (ud->data == NULL))
- return (EINVAL);
+ return EINVAL;
cb = ud->data;
assert(cb->send_notifications);
if (wh_callback_init(cb) != 0) {
ERROR("write_http plugin: wh_callback_init failed.");
pthread_mutex_unlock(&cb->send_lock);
- return (-1);
+ return -1;
}
status = wh_post_nolock(cb, alert);
pthread_mutex_unlock(&cb->send_lock);
- return (status);
+ return status;
} /* }}} int wh_notify */
static int config_set_format(wh_callback_t *cb, /* {{{ */
WARNING("write_http plugin: The `%s' config option "
"needs exactly one string argument.",
ci->key);
- return (-1);
+ return -1;
}
string = ci->values[0].value.string;
cb->format = WH_FORMAT_KAIROSDB;
else {
ERROR("write_http plugin: Invalid format string: %s", string);
- return (-1);
+ return -1;
}
- return (0);
+ return 0;
} /* }}} int config_set_format */
static int wh_config_append_string(const char *name,
struct curl_slist *temp = NULL;
if ((ci->values_num != 1) || (ci->values[0].type != OCONFIG_TYPE_STRING)) {
WARNING("write_http plugin: `%s' needs exactly one string argument.", name);
- return (-1);
+ return -1;
}
temp = curl_slist_append(*dest, ci->values[0].value.string);
if (temp == NULL)
- return (-1);
+ return -1;
*dest = temp;
- return (0);
+ return 0;
} /* }}} int wh_config_append_string */
static int wh_config_node(oconfig_item_t *ci) /* {{{ */
cb = calloc(1, sizeof(*cb));
if (cb == NULL) {
ERROR("write_http plugin: calloc failed.");
- return (-1);
+ return -1;
}
cb->verify_peer = 1;
cb->verify_host = 1;
cb->headers = NULL;
cb->send_metrics = 1;
cb->send_notifications = 0;
+ cb->data_ttl = 0;
+ cb->metrics_prefix = strdup(WRITE_HTTP_DEFAULT_PREFIX);
+
+ if (cb->metrics_prefix == NULL) {
+ ERROR("write_http plugin: strdup failed.");
+ sfree(cb);
+ return -1;
+ }
pthread_mutex_init(&cb->send_lock, /* attr = */ NULL);
status = cf_util_get_boolean(child, &cb->log_http_error);
else if (strcasecmp("Header", child->key) == 0)
status = wh_config_append_string("Header", &cb->headers, child);
- else {
+ else if (strcasecmp("Attribute", child->key) == 0) {
+ char *key = NULL;
+ char *val = NULL;
+
+ if (child->values_num != 2) {
+ WARNING("write_http plugin: Attribute need both a key and a value.");
+ break;
+ }
+ if (child->values[0].type != OCONFIG_TYPE_STRING ||
+ child->values[1].type != OCONFIG_TYPE_STRING) {
+ WARNING("write_http plugin: Attribute needs string arguments.");
+ break;
+ }
+ if ((key = strdup(child->values[0].value.string)) == NULL) {
+ WARNING("cannot allocate memory for attribute key.");
+ break;
+ }
+ if ((val = strdup(child->values[1].value.string)) == NULL) {
+ WARNING("cannot allocate memory for attribute value.");
+ sfree(key);
+ break;
+ }
+ strarray_add(&http_attrs, &http_attrs_num, key);
+ strarray_add(&http_attrs, &http_attrs_num, val);
+ DEBUG("write_http plugin: got attribute: %s => %s", key, val);
+ sfree(key);
+ sfree(val);
+ } else if (strcasecmp("TTL", child->key) == 0) {
+ status = cf_util_get_int(child, &cb->data_ttl);
+ } else if (strcasecmp("Prefix", child->key) == 0) {
+ status = cf_util_get_string(child, &cb->metrics_prefix);
+ } else {
ERROR("write_http plugin: Invalid configuration "
"option: %s.",
child->key);
if (status != 0) {
wh_callback_free(cb);
- return (status);
+ return status;
}
if (cb->location == NULL) {
ERROR("write_http plugin: no URL defined for instance '%s'", cb->name);
wh_callback_free(cb);
- return (-1);
+ return -1;
}
if (!cb->send_metrics && !cb->send_notifications) {
"are enabled for \"%s\".",
cb->name);
wh_callback_free(cb);
- return (-1);
+ return -1;
}
+ if (strlen(cb->metrics_prefix) == 0)
+ sfree(cb->metrics_prefix);
+
if (cb->low_speed_limit > 0)
cb->low_speed_time = CDTIME_T_TO_TIME_T(plugin_get_interval());
if (cb->send_buffer == NULL) {
ERROR("write_http plugin: malloc(%zu) failed.", cb->send_buffer_size);
wh_callback_free(cb);
- return (-1);
+ return -1;
}
/* Nulls the buffer and sets ..._free and ..._fill. */
wh_reset_buffer(cb);
- ssnprintf(callback_name, sizeof(callback_name), "write_http/%s", cb->name);
+ snprintf(callback_name, sizeof(callback_name), "write_http/%s", cb->name);
DEBUG("write_http: Registering write callback '%s' with URL '%s'",
callback_name, cb->location);
user_data.free_func = NULL;
}
- return (0);
+ return 0;
} /* }}} int wh_config_node */
static int wh_config(oconfig_item_t *ci) /* {{{ */
}
}
- return (0);
+ return 0;
} /* }}} int wh_config */
static int wh_init(void) /* {{{ */
/* Call this while collectd is still single-threaded to avoid
* initialization issues in libgcrypt. */
curl_global_init(CURL_GLOBAL_SSL);
- return (0);
+ return 0;
} /* }}} int wh_init */
void module_register(void) /* {{{ */
plugin_register_complex_config("write_http", wh_config);
plugin_register_init("write_http", wh_init);
} /* }}} void module_register */
-
-/* vim: set fdm=marker sw=8 ts=8 tw=78 et : */