#define RIEMANN_HOST "localhost"
#define RIEMANN_PORT "5555"
+#define RIEMANN_TTL_FACTOR 2.0
struct riemann_host {
char *name;
char *service;
_Bool use_tcp;
int s;
+ double ttl_factor;
int reference_count;
};
sfree (event);
} /* }}} void riemann_event_protobuf_free */
-static void riemann_msg_protobuf_free (Msg *msg) /* {{{ */
+static void riemann_msg_protobuf_free(Msg *msg) /* {{{ */
{
size_t i;
} /* }}} void riemann_msg_protobuf_free */
/* host->lock must be held when calling this function. */
-static int
-riemann_connect(struct riemann_host *host)
+static int riemann_connect(struct riemann_host *host) /* {{{ */
{
int e;
struct addrinfo *ai, *res, hints;
return -1;
}
return 0;
-}
+} /* }}} int riemann_connect */
/* host->lock must be held when calling this function. */
-static int
-riemann_disconnect (struct riemann_host *host)
+static int riemann_disconnect (struct riemann_host *host) /* {{{ */
{
if ((host->flags & F_CONNECT) == 0)
return (0);
host->flags &= ~F_CONNECT;
return (0);
-}
+} /* }}} int riemann_disconnect */
-static int
-riemann_send(struct riemann_host *host, Msg const *msg)
+static int riemann_send_msg (struct riemann_host *host, const Msg *msg) /* {{{ */
{
- u_char *buffer;
+ int status = 0;
+ u_char *buffer = NULL;
size_t buffer_len;
- int status;
-
- pthread_mutex_lock (&host->lock);
status = riemann_connect (host);
if (status != 0)
- {
- pthread_mutex_unlock (&host->lock);
return status;
- }
buffer_len = msg__get_packed_size(msg);
+
if (host->use_tcp)
buffer_len += 4;
buffer = malloc (buffer_len);
if (buffer == NULL) {
- pthread_mutex_unlock (&host->lock);
ERROR ("write_riemann plugin: malloc failed.");
return ENOMEM;
}
if (status != 0)
{
char errbuf[1024];
-
- riemann_disconnect (host);
- pthread_mutex_unlock (&host->lock);
-
ERROR ("write_riemann plugin: Sending to Riemann at %s:%s failed: %s",
(host->node != NULL) ? host->node : RIEMANN_HOST,
(host->service != NULL) ? host->service : RIEMANN_PORT,
return -1;
}
- pthread_mutex_unlock (&host->lock);
sfree (buffer);
return 0;
-}
+} /* }}} int riemann_send_msg */
+
+static int riemann_recv_ack(struct riemann_host *host) /* {{{ */
+{
+ int status = 0;
+ Msg *msg = NULL;
+ uint32_t header;
+
+ status = (int) sread (host->s, &header, 4);
+
+ if (status != 0)
+ return -1;
+
+ size_t size = ntohl(header);
+
+ // Buffer on the stack since acknowledges are typically small.
+ u_char buffer[size];
+ memset (buffer, 0, size);
+
+ status = (int) sread (host->s, buffer, size);
+
+ if (status != 0)
+ return status;
+
+ msg = msg__unpack (NULL, size, buffer);
+
+ if (msg == NULL)
+ return -1;
+
+ if (!msg->ok)
+ {
+ ERROR ("write_riemann plugin: Sending to Riemann at %s:%s acknowledgement message reported error: %s",
+ (host->node != NULL) ? host->node : RIEMANN_HOST,
+ (host->service != NULL) ? host->service : RIEMANN_PORT,
+ msg->error);
+
+ msg__free_unpacked(msg, NULL);
+ return -1;
+ }
+
+ msg__free_unpacked (msg, NULL);
+ return 0;
+} /* }}} int riemann_recv_ack */
+
+/**
+ * Function to send messages (Msg) to riemann.
+ *
+ * Acquires the host lock, disconnects on errors.
+ */
+static int riemann_send(struct riemann_host *host, Msg const *msg) /* {{{ */
+{
+ int status = 0;
+ pthread_mutex_lock (&host->lock);
+
+ status = riemann_send_msg(host, msg);
+ if (status != 0) {
+ riemann_disconnect (host);
+ pthread_mutex_unlock (&host->lock);
+ return status;
+ }
+
+ /*
+ * For TCP we need to receive message acknowledgemenent.
+ */
+ if (host->use_tcp)
+ {
+ status = riemann_recv_ack(host);
+
+ if (status != 0)
+ {
+ riemann_disconnect (host);
+ pthread_mutex_unlock (&host->lock);
+ return status;
+ }
+ }
+
+ pthread_mutex_unlock (&host->lock);
+ return 0;
+} /* }}} int riemann_send */
static int riemann_event_add_tag (Event *event, char const *tag) /* {{{ */
{
return (strarray_add (&event->tags, &event->n_tags, tag));
} /* }}} int riemann_event_add_tag */
-static int riemann_event_add_attribute (Event *event, /* {{{ */
+static int riemann_event_add_attribute(Event *event, /* {{{ */
char const *key, char const *value)
{
Attribute **new_attributes;
return (0);
} /* }}} int riemann_event_add_attribute */
-static Msg *riemann_notification_to_protobuf (struct riemann_host *host, /* {{{ */
+static Msg *riemann_notification_to_protobuf(struct riemann_host *host, /* {{{ */
notification_t const *n)
{
Msg *msg;
return (msg);
} /* }}} Msg *riemann_notification_to_protobuf */
-static Event *riemann_value_to_protobuf (struct riemann_host const *host, /* {{{ */
+static Event *riemann_value_to_protobuf(struct riemann_host const *host, /* {{{ */
data_set_t const *ds,
value_list_t const *vl, size_t index,
gauge_t const *rates)
Event *event;
char name_buffer[5 * DATA_MAX_NAME_LEN];
char service_buffer[6 * DATA_MAX_NAME_LEN];
+ double ttl;
int i;
event = malloc (sizeof (*event));
event->host = strdup (vl->host);
event->time = CDTIME_T_TO_TIME_T (vl->time);
event->has_time = 1;
- event->ttl = CDTIME_T_TO_TIME_T (2 * vl->interval);
+
+ ttl = CDTIME_T_TO_DOUBLE (vl->interval) * host->ttl_factor;
+ event->ttl = (float) ttl;
event->has_ttl = 1;
riemann_event_add_attribute (event, "plugin", vl->plugin);
return (event);
} /* }}} Event *riemann_value_to_protobuf */
-static Msg *riemann_value_list_to_protobuf (struct riemann_host const *host, /* {{{ */
+static Msg *riemann_value_list_to_protobuf(struct riemann_host const *host, /* {{{ */
data_set_t const *ds,
value_list_t const *vl)
{
return (msg);
} /* }}} Msg *riemann_value_list_to_protobuf */
-static int
-riemann_notification(const notification_t *n, user_data_t *ud)
+static int riemann_notification(const notification_t *n, user_data_t *ud) /* {{{ */
{
int status;
struct riemann_host *host = ud->data;
return (status);
} /* }}} int riemann_notification */
-static int
-riemann_write(const data_set_t *ds,
+static int riemann_write(const data_set_t *ds, /* {{{ */
const value_list_t *vl,
user_data_t *ud)
{
riemann_msg_protobuf_free (msg);
return status;
-}
+} /* }}} int riemann_write */
-static void
-riemann_free(void *p)
+static void riemann_free(void *p) /* {{{ */
{
struct riemann_host *host = p;
sfree(host->service);
pthread_mutex_destroy (&host->lock);
sfree(host);
-}
+} /* }}} void riemann_free */
-static int
-riemann_config_node(oconfig_item_t *ci)
+static int riemann_config_node(oconfig_item_t *ci) /* {{{ */
{
struct riemann_host *host = NULL;
int status = 0;
host->store_rates = 1;
host->always_append_ds = 0;
host->use_tcp = 0;
+ host->ttl_factor = RIEMANN_TTL_FACTOR;
status = cf_util_get_string (ci, &host->name);
if (status != 0) {
&host->always_append_ds);
if (status != 0)
break;
+ } else if (strcasecmp ("TTLFactor", child->key) == 0) {
+ double tmp = NAN;
+ status = cf_util_get_double (child, &tmp);
+ if (status != 0)
+ break;
+ if (tmp >= 2.0) {
+ host->ttl_factor = tmp;
+ } else if (tmp >= 1.0) {
+ NOTICE ("write_riemann plugin: The configured "
+ "TTLFactor is very small "
+ "(%.1f). A value of 2.0 or "
+ "greater is recommended.",
+ tmp);
+ host->ttl_factor = tmp;
+ } else if (tmp > 0.0) {
+ WARNING ("write_riemann plugin: The configured "
+ "TTLFactor is too small to be "
+ "useful (%.1f). I'll use it "
+ "since the user knows best, "
+ "but under protest.",
+ tmp);
+ host->ttl_factor = tmp;
+ } else { /* zero, negative and NAN */
+ ERROR ("write_riemann plugin: The configured "
+ "TTLFactor is invalid (%.1f).",
+ tmp);
+ }
} else {
WARNING("write_riemann plugin: ignoring unknown config "
"option: \"%s\"", child->key);
pthread_mutex_unlock (&host->lock);
return status;
-}
+} /* }}} int riemann_config_node */
-static int
-riemann_config(oconfig_item_t *ci)
+static int riemann_config(oconfig_item_t *ci) /* {{{ */
{
int i;
oconfig_item_t *child;
}
}
return (0);
-}
+} /* }}} int riemann_config */
-void
-module_register(void)
+void module_register(void)
{
plugin_register_complex_config ("write_riemann", riemann_config);
}