#include "collectd.h"
-#include "common.h"
-#include "configfile.h"
#include "plugin.h"
+#include "utils/common/common.h"
#include "utils_cache.h"
#include "utils_complain.h"
#include "write_riemann_threshold.h"
-#include <errno.h>
#include <riemann/riemann-client.h>
#define RIEMANN_HOST "localhost"
char *name;
char *event_service_prefix;
pthread_mutex_t lock;
- _Bool batch_mode;
- _Bool notifications;
- _Bool check_thresholds;
- _Bool store_rates;
- _Bool always_append_ds;
+ bool batch_mode;
+ bool notifications;
+ bool check_thresholds;
+ bool store_rates;
+ bool always_append_ds;
char *node;
int port;
riemann_client_type_t client_type;
node, port);
return -1;
}
+#if RCC_VERSION_NUMBER >= 0x010800
if (host->timeout.tv_sec != 0) {
if (riemann_client_set_timeout(host->client, &host->timeout) != 0) {
riemann_client_free(host->client);
return -1;
}
}
+#endif
set_sock_opts(riemann_client_get_fd(host->client));
static int wrr_disconnect(struct riemann_host *host) /* {{{ */
{
if (!host->client)
- return (0);
+ return 0;
riemann_client_free(host->client);
host->client = NULL;
- return (0);
+ return 0;
} /* }}} int wrr_disconnect */
/**
return status;
}
-static riemann_message_t *
-wrr_notification_to_message(struct riemann_host *host, /* {{{ */
- notification_t const *n) {
+static riemann_message_t *wrr_notification_to_message(notification_t const *n) {
riemann_message_t *msg;
riemann_event_t *event;
char service_buffer[6 * DATA_MAX_NAME_LEN];
RIEMANN_EVENT_FIELD_SERVICE, &service_buffer[1],
RIEMANN_EVENT_FIELD_NONE);
+#if RCC_VERSION_NUMBER >= 0x010A00
+ riemann_event_set(event, RIEMANN_EVENT_FIELD_TIME_MICROS,
+ (int64_t)CDTIME_T_TO_US(n->time));
+#endif
+
if (n->host[0] != 0)
riemann_event_string_attribute_add(event, "host", n->host);
if (n->plugin[0] != 0)
if (msg == NULL) {
ERROR("write_riemann plugin: riemann_message_create_with_events() failed.");
riemann_event_free(event);
- return (NULL);
+ return NULL;
}
DEBUG("write_riemann plugin: Successfully created message for notification: "
"host = \"%s\", service = \"%s\", state = \"%s\"",
event->host, event->service, event->state);
- return (msg);
-} /* }}} riemann_message_t *wrr_notification_to_message */
+ return msg;
+}
static riemann_event_t *
wrr_value_to_event(struct riemann_host const *host, /* {{{ */
event = riemann_event_new();
if (event == NULL) {
ERROR("write_riemann plugin: riemann_event_new() failed.");
- return (NULL);
+ return NULL;
}
format_name(name_buffer, sizeof(name_buffer),
vl->type, "ds_name", ds->ds[index].name, NULL,
RIEMANN_EVENT_FIELD_SERVICE, service_buffer, RIEMANN_EVENT_FIELD_NONE);
+#if RCC_VERSION_NUMBER >= 0x010A00
+ riemann_event_set(event, RIEMANN_EVENT_FIELD_TIME_MICROS,
+ (int64_t)CDTIME_T_TO_US(vl->time));
+#endif
+
if (host->check_thresholds) {
const char *state = NULL;
{
char ds_index[DATA_MAX_NAME_LEN];
- ssnprintf(ds_index, sizeof(ds_index), "%zu", index);
+ ssnprintf(ds_index, sizeof(ds_index), "%" PRIsz, index);
riemann_event_string_attribute_add(event, "ds_index", ds_index);
}
RIEMANN_EVENT_FIELD_NONE);
}
+ if (vl->meta) {
+ char **toc;
+ int n = meta_data_toc(vl->meta, &toc);
+
+ for (int i = 0; i < n; i++) {
+ char *key = toc[i];
+ char *value;
+
+ if (0 == meta_data_as_string(vl->meta, key, &value)) {
+ riemann_event_string_attribute_add(event, key, value);
+ free(value);
+ }
+ }
+
+ free(toc);
+ }
+
DEBUG("write_riemann plugin: Successfully created message for metric: "
"host = \"%s\", service = \"%s\"",
event->host, event->service);
- return (event);
+ return event;
} /* }}} riemann_event_t *wrr_value_to_event */
static riemann_message_t *
msg = riemann_message_new();
if (msg == NULL) {
ERROR("write_riemann plugin: riemann_message_new failed.");
- return (NULL);
+ return NULL;
}
if (host->store_rates) {
if (rates == NULL) {
ERROR("write_riemann plugin: uc_get_rate failed.");
riemann_message_free(msg);
- return (NULL);
+ return NULL;
}
}
if (event == NULL) {
riemann_message_free(msg);
sfree(rates);
- return (NULL);
+ return NULL;
}
riemann_message_append_events(msg, event, NULL);
}
sfree(rates);
- return (msg);
+ return msg;
} /* }}} riemann_message_t *wrr_value_list_to_message */
/*
int status;
if (user_data == NULL)
- return (-EINVAL);
+ return -EINVAL;
host = user_data->data;
pthread_mutex_lock(&host->lock);
/*
* Never batch for notifications, send them ASAP
*/
- msg = wrr_notification_to_message(host, n);
+ msg = wrr_notification_to_message(n);
if (msg == NULL)
- return (-1);
+ return -1;
status = wrr_send(host, msg);
if (status != 0)
"write_riemann plugin: riemann_client_send succeeded");
riemann_message_free(msg);
- return (status);
+ return status;
} /* }}} int wrr_notification */
static int wrr_write(const data_set_t *ds, /* {{{ */
} else {
msg = wrr_value_list_to_message(host, ds, vl, statuses);
if (msg == NULL)
- return (-1);
+ return -1;
status = wrr_send(host, msg);
wrr_disconnect(host);
+ pthread_mutex_lock(&host->lock);
pthread_mutex_destroy(&host->lock);
sfree(host);
} /* }}} void wrr_free */
int i;
oconfig_item_t *child;
char callback_name[DATA_MAX_NAME_LEN];
- user_data_t ud;
if ((host = calloc(1, sizeof(*host))) == NULL) {
ERROR("write_riemann plugin: calloc failed.");
host->reference_count = 1;
host->node = NULL;
host->port = 0;
- host->notifications = 1;
- host->check_thresholds = 0;
- host->store_rates = 1;
- host->always_append_ds = 0;
- host->batch_mode = 1;
+ host->notifications = true;
+ host->check_thresholds = false;
+ host->store_rates = true;
+ host->always_append_ds = false;
+ host->batch_mode = true;
host->batch_max = RIEMANN_BATCH_MAX; /* typical MSS */
host->batch_init = cdtime();
host->batch_timeout = 0;
if (status != 0)
break;
} else if (strcasecmp("Timeout", child->key) == 0) {
+#if RCC_VERSION_NUMBER >= 0x010800
status = cf_util_get_int(child, (int *)&host->timeout.tv_sec);
if (status != 0)
break;
+#else
+ WARNING("write_riemann plugin: The Timeout option is not supported. "
+ "Please upgrade the Riemann client to at least 1.8.0.");
+#endif
} else if (strcasecmp("Port", child->key) == 0) {
host->port = cf_util_get_port_number(child);
if (host->port == -1) {
- ERROR("write_riemann plugin: Invalid argument "
- "configured for the \"Port\" "
- "option.");
break;
}
} else if (strcasecmp("Protocol", child->key) == 0) {
char tmp[16];
status = cf_util_get_string_buffer(child, tmp, sizeof(tmp));
- if (status != 0) {
- ERROR("write_riemann plugin: cf_util_get_"
- "string_buffer failed with "
- "status %i.",
- status);
+ if (status != 0)
break;
- }
if (strcasecmp("UDP", tmp) == 0)
host->client_type = RIEMANN_CLIENT_UDP;
tmp);
} else if (strcasecmp("TLSCAFile", child->key) == 0) {
status = cf_util_get_string(child, &host->tls_ca_file);
- if (status != 0) {
- ERROR("write_riemann plugin: cf_util_get_"
- "string_buffer failed with "
- "status %i.",
- status);
+ if (status != 0)
break;
- }
} else if (strcasecmp("TLSCertFile", child->key) == 0) {
status = cf_util_get_string(child, &host->tls_cert_file);
- if (status != 0) {
- ERROR("write_riemann plugin: cf_util_get_"
- "string_buffer failed with "
- "status %i.",
- status);
+ if (status != 0)
break;
- }
} else if (strcasecmp("TLSKeyFile", child->key) == 0) {
status = cf_util_get_string(child, &host->tls_key_file);
- if (status != 0) {
- ERROR("write_riemann plugin: cf_util_get_"
- "string_buffer failed with "
- "status %i.",
- status);
+ if (status != 0)
break;
- }
} else if (strcasecmp("StoreRates", child->key) == 0) {
status = cf_util_get_boolean(child, &host->store_rates);
if (status != 0)
ssnprintf(callback_name, sizeof(callback_name), "write_riemann/%s",
host->name);
- ud.data = host;
- ud.free_func = wrr_free;
+
+ user_data_t ud = {.data = host, .free_func = wrr_free};
pthread_mutex_lock(&host->lock);
* holding a reference. */
pthread_mutex_unlock(&host->lock);
wrr_free(host);
- return (-1);
+ return -1;
}
host->reference_count--;
if (child->values_num != 2) {
WARNING("riemann attributes need both a key and a value.");
- return (-1);
+ return -1;
}
if (child->values[0].type != OCONFIG_TYPE_STRING ||
child->values[1].type != OCONFIG_TYPE_STRING) {
WARNING("riemann attribute needs string arguments.");
- return (-1);
+ return -1;
}
if ((key = strdup(child->values[0].value.string)) == NULL) {
WARNING("cannot allocate memory for attribute key.");
- return (-1);
+ return -1;
}
if ((val = strdup(child->values[1].value.string)) == NULL) {
WARNING("cannot allocate memory for attribute value.");
sfree(key);
- return (-1);
+ return -1;
}
strarray_add(&riemann_attrs, &riemann_attrs_num, key);
strarray_add(&riemann_attrs, &riemann_attrs_num, val);
child->key);
}
}
- return (0);
+ return 0;
} /* }}} int wrr_config */
void module_register(void) {
plugin_register_complex_config("write_riemann", wrr_config);
}
-
-/* vim: set sw=8 sts=8 ts=8 noet : */