X-Git-Url: https://git.octo.it/?a=blobdiff_plain;f=src%2Fwrite_riemann.c;h=ac6b8fbf7b2b7a42afe1eecd07895ef1f2d82cef;hb=73a6f82a63747f088352a61e201beac2d185b2ac;hp=fd82650e779931d43b68e2544712d9b33445e175;hpb=2742a564d13f713c8e339749fda25222623c4e5e;p=collectd.git diff --git a/src/write_riemann.c b/src/write_riemann.c index fd82650e..ac6b8fbf 100644 --- a/src/write_riemann.c +++ b/src/write_riemann.c @@ -28,11 +28,9 @@ * Gergely Nagy */ -#include -#include -#include - #include "collectd.h" + + #include "plugin.h" #include "common.h" #include "configfile.h" @@ -40,12 +38,16 @@ #include "utils_complain.h" #include "write_riemann_threshold.h" +#include +#include + #define RIEMANN_HOST "localhost" #define RIEMANN_PORT 5555 #define RIEMANN_TTL_FACTOR 2.0 #define RIEMANN_BATCH_MAX 8192 struct riemann_host { + c_complain_t init_complaint; char *name; char *event_service_prefix; pthread_mutex_t lock; @@ -56,13 +58,12 @@ struct riemann_host { _Bool always_append_ds; char *node; int port; - c_complain_t init_complaint; - c_complain_t init_send_complaint; riemann_client_type_t client_type; riemann_client_t *client; double ttl_factor; cdtime_t batch_init; int batch_max; + int batch_timeout; int reference_count; riemann_message_t *batch_msg; char *tls_ca_file; @@ -136,21 +137,18 @@ static int wrr_disconnect(struct riemann_host *host) /* {{{ */ * * Acquires the host lock, disconnects on errors. */ -static int wrr_send(struct riemann_host *host, riemann_message_t *msg) /* {{{ */ +static int wrr_send_nolock(struct riemann_host *host, riemann_message_t *msg) /* {{{ */ { int status = 0; - pthread_mutex_lock (&host->lock); status = wrr_connect(host); if (status != 0) { - pthread_mutex_unlock(&host->lock); return status; } status = riemann_client_send_message(host->client, msg); if (status != 0) { wrr_disconnect(host); - pthread_mutex_unlock(&host->lock); return status; } @@ -166,16 +164,24 @@ static int wrr_send(struct riemann_host *host, riemann_message_t *msg) /* {{{ */ if (response == NULL) { wrr_disconnect(host); - pthread_mutex_unlock(&host->lock); return errno; } riemann_message_free(response); } - pthread_mutex_unlock (&host->lock); return 0; } /* }}} int wrr_send */ +static int wrr_send(struct riemann_host *host, riemann_message_t *msg) +{ + int status = 0; + + pthread_mutex_lock (&host->lock); + status = wrr_send_nolock(host, msg); + pthread_mutex_unlock (&host->lock); + return status; +} + static riemann_message_t *wrr_notification_to_message(struct riemann_host *host, /* {{{ */ notification_t const *n) { @@ -456,30 +462,16 @@ static int wrr_batch_flush_nolock(cdtime_t timeout, cdtime_t now; int status = 0; + now = cdtime(); if (timeout > 0) { - now = cdtime(); - if ((host->batch_init + timeout) > now) + if ((host->batch_init + timeout) > now) { return status; + } } - wrr_send(host, host->batch_msg); + wrr_send_nolock(host, host->batch_msg); riemann_message_free(host->batch_msg); - if (host->client_type != RIEMANN_CLIENT_UDP) - { - riemann_message_t *response; - - response = riemann_client_recv_message(host->client); - - if (!response) - { - wrr_disconnect(host); - return errno; - } - - riemann_message_free(response); - } - - host->batch_init = cdtime(); + host->batch_init = now; host->batch_msg = NULL; return status; } @@ -516,6 +508,7 @@ static int wrr_batch_add_value_list(struct riemann_host *host, /* {{{ */ riemann_message_t *msg; size_t len; int ret; + cdtime_t timeout; msg = wrr_value_list_to_message(host, ds, vl, statuses); if (msg == NULL) @@ -547,7 +540,12 @@ static int wrr_batch_add_value_list(struct riemann_host *host, /* {{{ */ ret = 0; if ((host->batch_max < 0) || (((size_t) host->batch_max) <= len)) { ret = wrr_batch_flush_nolock(0, host); - } + } else { + if (host->batch_timeout > 0) { + timeout = TIME_T_TO_CDTIME_T((time_t)host->batch_timeout); + ret = wrr_batch_flush_nolock(timeout, host); + } + } pthread_mutex_unlock(&host->lock); return ret; @@ -600,7 +598,7 @@ static int wrr_write(const data_set_t *ds, /* {{{ */ } if (host->client_type != RIEMANN_CLIENT_UDP && host->batch_mode) { - wrr_batch_add_value_list(host, ds, vl, statuses); + wrr_batch_add_value_list(host, ds, vl, statuses); } else { msg = wrr_value_list_to_message(host, ds, vl, statuses); if (msg == NULL) @@ -660,6 +658,7 @@ static int wrr_config_node(oconfig_item_t *ci) /* {{{ */ host->batch_mode = 1; host->batch_max = RIEMANN_BATCH_MAX; /* typical MSS */ host->batch_init = cdtime(); + host->batch_timeout = 0; host->ttl_factor = RIEMANN_TTL_FACTOR; host->client = NULL; host->client_type = RIEMANN_CLIENT_TCP; @@ -705,6 +704,10 @@ static int wrr_config_node(oconfig_item_t *ci) /* {{{ */ status = cf_util_get_int(child, &host->batch_max); if (status != 0) break; + } else if (strcasecmp("BatchFlushTimeout", child->key) == 0) { + status = cf_util_get_int(child, &host->batch_timeout); + if (status != 0) + break; } else if (strcasecmp("Timeout", child->key) == 0) { status = cf_util_get_int(child, (int *)&host->timeout.tv_sec); if (status != 0)