**/
#include "collectd.h"
+
#include "plugin.h"
#include "common.h"
-#include "utils_cache.h"
#include "utils_format_json.h"
+#include "utils_format_kairosdb.h"
#include <curl/curl.h>
time_t low_speed_time;
int timeout;
-#define WH_FORMAT_COMMAND 0
-#define WH_FORMAT_JSON 1
+#define WH_FORMAT_COMMAND 0
+#define WH_FORMAT_JSON 1
+#define WH_FORMAT_KAIROSDB 2
int format;
CURL *curl;
static void wh_reset_buffer (wh_callback_t *cb) /* {{{ */
{
+ if ((cb == NULL) || (cb->send_buffer == NULL))
+ return;
+
memset (cb->send_buffer, 0, cb->send_buffer_size);
cb->send_buffer_free = cb->send_buffer_size;
cb->send_buffer_fill = 0;
cb->send_buffer_init_time = cdtime ();
- if (cb->format == WH_FORMAT_JSON)
+ if (cb->format == WH_FORMAT_JSON || cb->format == WH_FORMAT_KAIROSDB)
{
format_json_initialize (cb->send_buffer,
&cb->send_buffer_fill,
curl_easy_setopt (cb->curl, CURLOPT_USERAGENT, COLLECTD_USERAGENT);
cb->headers = curl_slist_append (cb->headers, "Accept: */*");
- if (cb->format == WH_FORMAT_JSON)
+ if (cb->format == WH_FORMAT_JSON || cb->format == WH_FORMAT_KAIROSDB)
cb->headers = curl_slist_append (cb->headers, "Content-Type: application/json");
else
cb->headers = curl_slist_append (cb->headers, "Content-Type: text/plain");
status = wh_send_buffer (cb);
wh_reset_buffer (cb);
}
- else if (cb->format == WH_FORMAT_JSON)
+ else if (cb->format == WH_FORMAT_JSON || cb->format == WH_FORMAT_KAIROSDB)
{
if (cb->send_buffer_fill <= 2)
{
cb = data;
- wh_flush_nolock (/* timeout = */ 0, cb);
+ if (cb->send_buffer != NULL)
+ wh_flush_nolock (/* timeout = */ 0, cb);
if (cb->curl != NULL)
{
&cb->send_buffer_fill,
&cb->send_buffer_free,
ds, vl, cb->store_rates);
- if (status == (-ENOMEM))
+ if (status == -ENOMEM)
{
status = wh_flush_nolock (/* timeout = */ 0, cb);
if (status != 0)
return (0);
} /* }}} int wh_write_json */
+static int wh_write_kairosdb (const data_set_t *ds, const value_list_t *vl, /* {{{ */
+ wh_callback_t *cb)
+{
+ int status;
+
+ pthread_mutex_lock (&cb->send_lock);
+
+ if (cb->curl == NULL)
+ {
+ status = wh_callback_init (cb);
+ if (status != 0)
+ {
+ ERROR ("write_http plugin: wh_callback_init failed.");
+ pthread_mutex_unlock (&cb->send_lock);
+ return (-1);
+ }
+ }
+
+ status = format_kairosdb_value_list (cb->send_buffer,
+ &cb->send_buffer_fill,
+ &cb->send_buffer_free,
+ ds, vl, cb->store_rates);
+ if (status == -ENOMEM)
+ {
+ status = wh_flush_nolock (/* timeout = */ 0, cb);
+ if (status != 0)
+ {
+ wh_reset_buffer (cb);
+ pthread_mutex_unlock (&cb->send_lock);
+ return (status);
+ }
+
+ status = format_kairosdb_value_list (cb->send_buffer,
+ &cb->send_buffer_fill,
+ &cb->send_buffer_free,
+ ds, vl, cb->store_rates);
+ }
+ if (status != 0)
+ {
+ pthread_mutex_unlock (&cb->send_lock);
+ return (status);
+ }
+
+ DEBUG ("write_http plugin: <%s> buffer %zu/%zu (%g%%)",
+ cb->location,
+ cb->send_buffer_fill, cb->send_buffer_size,
+ 100.0 * ((double) cb->send_buffer_fill) / ((double) cb->send_buffer_size));
+
+ /* Check if we have enough space for this command. */
+ pthread_mutex_unlock (&cb->send_lock);
+
+ return (0);
+} /* }}} int wh_write_kairosdb */
+
static int wh_write (const data_set_t *ds, const value_list_t *vl, /* {{{ */
user_data_t *user_data)
{
cb = user_data->data;
- if (cb->format == WH_FORMAT_JSON)
+ switch(cb->format) {
+ case WH_FORMAT_JSON:
status = wh_write_json (ds, vl, cb);
- else
+ break;
+ case WH_FORMAT_KAIROSDB:
+ status = wh_write_kairosdb (ds, vl, cb);
+ break;
+ default:
status = wh_write_command (ds, vl, cb);
-
+ break;
+ }
return (status);
} /* }}} int wh_write */
cb->format = WH_FORMAT_COMMAND;
else if (strcasecmp ("JSON", string) == 0)
cb->format = WH_FORMAT_JSON;
+ else if (strcasecmp ("KAIROSDB", string) == 0)
+ cb->format = WH_FORMAT_KAIROSDB;
else
{
ERROR ("write_http plugin: Invalid format string: %s",
{
wh_callback_t *cb;
int buffer_size = 0;
- user_data_t user_data;
+ user_data_t user_data = { 0 };
char callback_name[DATA_MAX_NAME_LEN];
int status = 0;
- int i;
cb = calloc (1, sizeof (*cb));
if (cb == NULL)
if (strcasecmp ("URL", ci->key) == 0)
cf_util_get_string (ci, &cb->location);
- for (i = 0; i < ci->children_num; i++)
+ for (int i = 0; i < ci->children_num; i++)
{
oconfig_item_t *child = ci->children + i;
DEBUG ("write_http: Registering write callback '%s' with URL '%s'",
callback_name, cb->location);
- memset (&user_data, 0, sizeof (user_data));
user_data.data = cb;
- user_data.free_func = NULL;
plugin_register_flush (callback_name, wh_flush, &user_data);
user_data.free_func = wh_callback_free;
static int wh_config (oconfig_item_t *ci) /* {{{ */
{
- int i;
-
- for (i = 0; i < ci->children_num; i++)
+ for (int i = 0; i < ci->children_num; i++)
{
oconfig_item_t *child = ci->children + i;