From: Florian Forster Date: Thu, 14 Aug 2014 15:49:52 +0000 (+0200) Subject: Merge remote-tracking branch 'github/pr/703' X-Git-Tag: collectd-5.5.0~223 X-Git-Url: https://git.octo.it/?a=commitdiff_plain;h=d4547a2861caad836701e70f805f7be68a91272e;hp=831665cda72d63624b758860a1b89bd234f6de49;p=collectd.git Merge remote-tracking branch 'github/pr/703' --- diff --git a/AUTHORS b/AUTHORS index 6d0eae7e..d5dff3de 100644 --- a/AUTHORS +++ b/AUTHORS @@ -53,6 +53,9 @@ Benjamin Gilbert Bert Vermeulen - sigrok plugin +Brett Hawn + - write_tsdb plugin for http://opentsdb.net/ + Bruno Prémont - BIND plugin. - Many bugreports and -fixes in various plugins, @@ -112,6 +115,9 @@ J. Javier Maestro Jérôme Renard - varnish plugin. +Kevin Bowling + - write_tsdb plugin for http://opentsdb.net/ + Kris Nielander - tail_csv plugin. diff --git a/README b/README index 60de1edd..cd581425 100644 --- a/README +++ b/README @@ -411,6 +411,10 @@ Features can be configured to avoid logging send errors (especially useful when using UDP). + - write_tsdb + Sends data OpenTSDB, a scalable no master, no shared state time series + database. + - write_http Sends the values collected by collectd to a web-server using HTTP POST requests. The transmitted data is either in a form understood by the diff --git a/configure.ac b/configure.ac index fe206091..92fd8778 100644 --- a/configure.ac +++ b/configure.ac @@ -5354,6 +5354,7 @@ AC_PLUGIN([write_kafka], [$with_librdkafka], [Kafka output plugin]) AC_PLUGIN([write_mongodb], [$with_libmongoc], [MongoDB output plugin]) AC_PLUGIN([write_redis], [$with_libcredis], [Redis output plugin]) AC_PLUGIN([write_riemann], [$have_protoc_c], [Riemann output plugin]) +AC_PLUGIN([write_tsdb], [yes], [TSDB output plugin]) AC_PLUGIN([xmms], [$with_libxmms], [XMMS statistics]) AC_PLUGIN([zfs_arc], [$plugin_zfs_arc], [ZFS ARC statistics]) @@ -5717,6 +5718,7 @@ Configuration: write_mongodb . . . . $enable_write_mongodb write_redis . . . . . $enable_write_redis write_riemann . . . . $enable_write_riemann + write_tsdb . . . . . $enable_write_tsdb xmms . . . . . . . . $enable_xmms zfs_arc . . . . . . . $enable_zfs_arc diff --git a/src/Makefile.am b/src/Makefile.am index 1a8ebbb8..d25370a5 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -1467,6 +1467,14 @@ collectd_LDADD += "-dlopen" write_riemann.la collectd_DEPENDENCIES += write_riemann.la endif +if BUILD_PLUGIN_WRITE_TSDB +pkglib_LTLIBRARIES += write_tsdb.la +write_tsdb_la_SOURCES = write_tsdb.c +write_tsdb_la_LDFLAGS = -module -avoid-version +collectd_LDADD += "-dlopen" write_tsdb.la +collectd_DEPENDENCIES += write_tsdb.la +endif + if BUILD_PLUGIN_XMMS pkglib_LTLIBRARIES += xmms.la xmms_la_SOURCES = xmms.c diff --git a/src/collectd.conf.in b/src/collectd.conf.in index 5780722e..18a74c65 100644 --- a/src/collectd.conf.in +++ b/src/collectd.conf.in @@ -191,6 +191,7 @@ #@BUILD_PLUGIN_WRITE_MONGODB_TRUE@LoadPlugin write_mongodb #@BUILD_PLUGIN_WRITE_REDIS_TRUE@LoadPlugin write_redis #@BUILD_PLUGIN_WRITE_RIEMANN_TRUE@LoadPlugin write_riemann +#@BUILD_PLUGIN_WRITE_TSDB_TRUE@LoadPlugin write_tsdb #@BUILD_PLUGIN_XMMS_TRUE@LoadPlugin xmms #@BUILD_PLUGIN_ZFS_ARC_TRUE@LoadPlugin zfs_arc @@ -1243,6 +1244,16 @@ # Attribute "foo" "bar" # +# +# +# Host "localhost" +# Port "4242" +# HostTags "status=production" +# StoreRates false +# AlwaysAppendDS false +# +# + ############################################################################## # Filter configuration # #----------------------------------------------------------------------------# diff --git a/src/collectd.conf.pod b/src/collectd.conf.pod index 3acf0f89..acd0cbbe 100644 --- a/src/collectd.conf.pod +++ b/src/collectd.conf.pod @@ -6518,6 +6518,59 @@ more than one DS. =back +=head2 Plugin C + +The C plugin writes data to I, a scalable open-source +time series database. The plugin connects to a I, a masterless, no shared +state daemon that ingests metrics and stores them in HBase. The plugin uses +I over the "line based" protocol with a default port 4242. The data will +be sent in blocks of at most 1428 bytes to minimize the number of network +packets. + +Synopsis: + + + + Host "tsd-1.my.domain" + Port "4242" + HostTags "status=production" + + + +The configuration consists of one or more EBEIE +blocks. Inside the B blocks, the following options are recognized: + +=over 4 + +=item B I
+ +Hostname or address to connect to. Defaults to C. + +=item B I + +Service name or port number to connect to. Defaults to C<4242>. + + +=item B I + +When set, I is added to the end of the metric. It is intended to be +used for name=value pairs that the TSD will tag the metric with. Dots and +whitespace are I escaped in this string. + +=item B B|B + +If set to B, convert counter values to rates. If set to B +(the default) counter values are stored as is, as an increasing +integer number. + +=item B B|B + +If set the B, append the name of the I (DS) to the "metric" +identifier. If set to B (the default), this is only done when there is +more than one DS. + +=back + =head2 Plugin C The I will send values to I, a schema-less diff --git a/src/write_tsdb.c b/src/write_tsdb.c new file mode 100644 index 00000000..2eca77e1 --- /dev/null +++ b/src/write_tsdb.c @@ -0,0 +1,647 @@ +/** + * collectd - src/write_tsdb.c + * Copyright (C) 2012 Pierre-Yves Ritschard + * Copyright (C) 2011 Scott Sanders + * Copyright (C) 2009 Paul Sadauskas + * Copyright (C) 2009 Doug MacEachern + * Copyright (C) 2007-2012 Florian octo Forster + * Copyright (C) 2013-2014 Limelight Networks, Inc. + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; only version 2 of the License is applicable. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + * + * Based on the write_graphite plugin. Authors: + * Florian octo Forster + * Doug MacEachern + * Paul Sadauskas + * Scott Sanders + * Pierre-Yves Ritschard + * write_tsdb Authors: + * Brett Hawn + * Kevin Bowling + **/ + +/* write_tsdb plugin configuation example + * + * + * + * Host "localhost" + * Port "4242" + * HostTags "status=production deviceclass=www" + * + * + */ + +#include "collectd.h" +#include "common.h" +#include "plugin.h" +#include "configfile.h" + +#include "utils_cache.h" +#include "utils_parse_option.h" + +#include +#include +#include + +#ifndef WT_DEFAULT_NODE +# define WT_DEFAULT_NODE "localhost" +#endif + +#ifndef WT_DEFAULT_SERVICE +# define WT_DEFAULT_SERVICE "4242" +#endif + +#ifndef WT_DEFAULT_ESCAPE +# define WT_DEFAULT_ESCAPE '.' +#endif + +/* Ethernet - (IPv6 + TCP) = 1500 - (40 + 32) = 1428 */ +#ifndef WT_SEND_BUF_SIZE +# define WT_SEND_BUF_SIZE 1428 +#endif + +/* + * Private variables + */ +struct wt_callback +{ + int sock_fd; + + char *node; + char *service; + char *host_tags; + + _Bool store_rates; + _Bool always_append_ds; + + char send_buf[WT_SEND_BUF_SIZE]; + size_t send_buf_free; + size_t send_buf_fill; + cdtime_t send_buf_init_time; + + pthread_mutex_t send_lock; +}; + + +/* + * Functions + */ +static void wt_reset_buffer(struct wt_callback *cb) +{ + memset(cb->send_buf, 0, sizeof(cb->send_buf)); + cb->send_buf_free = sizeof(cb->send_buf); + cb->send_buf_fill = 0; + cb->send_buf_init_time = cdtime(); +} + +static int wt_send_buffer(struct wt_callback *cb) +{ + ssize_t status = 0; + + status = swrite(cb->sock_fd, cb->send_buf, strlen(cb->send_buf)); + if (status < 0) + { + char errbuf[1024]; + ERROR("write_tsdb plugin: send failed with status %zi (%s)", + status, sstrerror (errno, errbuf, sizeof (errbuf))); + + close (cb->sock_fd); + cb->sock_fd = -1; + + return -1; + } + + return 0; +} + +/* NOTE: You must hold cb->send_lock when calling this function! */ +static int wt_flush_nolock(cdtime_t timeout, struct wt_callback *cb) +{ + int status; + + DEBUG("write_tsdb plugin: wt_flush_nolock: timeout = %.3f; " + "send_buf_fill = %zu;", + (double)timeout, + cb->send_buf_fill); + + /* timeout == 0 => flush unconditionally */ + if (timeout > 0) + { + cdtime_t now; + + now = cdtime(); + if ((cb->send_buf_init_time + timeout) > now) + return 0; + } + + if (cb->send_buf_fill <= 0) + { + cb->send_buf_init_time = cdtime(); + return 0; + } + + status = wt_send_buffer(cb); + wt_reset_buffer(cb); + + return status; +} + +static int wt_callback_init(struct wt_callback *cb) +{ + struct addrinfo ai_hints; + struct addrinfo *ai_list; + struct addrinfo *ai_ptr; + int status; + + const char *node = cb->node ? cb->node : WT_DEFAULT_NODE; + const char *service = cb->service ? cb->service : WT_DEFAULT_SERVICE; + + if (cb->sock_fd > 0) + return 0; + + memset(&ai_hints, 0, sizeof(ai_hints)); +#ifdef AI_ADDRCONFIG + ai_hints.ai_flags |= AI_ADDRCONFIG; +#endif + ai_hints.ai_family = AF_UNSPEC; + ai_hints.ai_socktype = SOCK_STREAM; + + ai_list = NULL; + + status = getaddrinfo(node, service, &ai_hints, &ai_list); + if (status != 0) + { + ERROR("write_tsdb plugin: getaddrinfo (%s, %s) failed: %s", + node, service, gai_strerror (status)); + return -1; + } + + assert (ai_list != NULL); + for (ai_ptr = ai_list; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next) + { + cb->sock_fd = socket(ai_ptr->ai_family, ai_ptr->ai_socktype, + ai_ptr->ai_protocol); + if (cb->sock_fd < 0) + continue; + + status = connect(cb->sock_fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen); + if (status != 0) + { + close(cb->sock_fd); + cb->sock_fd = -1; + continue; + } + + break; + } + + freeaddrinfo(ai_list); + + if (cb->sock_fd < 0) + { + char errbuf[1024]; + ERROR("write_tsdb plugin: Connecting to %s:%s failed. " + "The last error was: %s", node, service, + sstrerror (errno, errbuf, sizeof(errbuf))); + close(cb->sock_fd); + return -1; + } + + wt_reset_buffer(cb); + + return 0; +} + +static void wt_callback_free(void *data) +{ + struct wt_callback *cb; + + if (data == NULL) + return; + + cb = data; + + pthread_mutex_lock(&cb->send_lock); + + wt_flush_nolock(0, cb); + + close(cb->sock_fd); + cb->sock_fd = -1; + + sfree(cb->node); + sfree(cb->service); + sfree(cb->host_tags); + + pthread_mutex_destroy(&cb->send_lock); + + sfree(cb); +} + +static int wt_flush(cdtime_t timeout, + const char *identifier __attribute__((unused)), + user_data_t *user_data) +{ + struct wt_callback *cb; + int status; + + if (user_data == NULL) + return -EINVAL; + + cb = user_data->data; + + pthread_mutex_lock(&cb->send_lock); + + if (cb->sock_fd < 0) + { + status = wt_callback_init(cb); + if (status != 0) + { + ERROR("write_tsdb plugin: wt_callback_init failed."); + pthread_mutex_unlock(&cb->send_lock); + return -1; + } + } + + status = wt_flush_nolock(timeout, cb); + pthread_mutex_unlock(&cb->send_lock); + + return status; +} + +static int wt_format_values(char *ret, size_t ret_len, + int ds_num, const data_set_t *ds, + const value_list_t *vl, + _Bool store_rates) +{ + size_t offset = 0; + int status; + gauge_t *rates = NULL; + + assert(0 == strcmp (ds->type, vl->type)); + + memset(ret, 0, ret_len); + +#define BUFFER_ADD(...) do { \ + status = ssnprintf (ret + offset, ret_len - offset, \ + __VA_ARGS__); \ + if (status < 1) \ + { \ + sfree(rates); \ + return -1; \ + } \ + else if (((size_t) status) >= (ret_len - offset)) \ + { \ + sfree(rates); \ + return -1; \ + } \ + else \ + offset += ((size_t) status); \ +} while (0) + + if (ds->ds[ds_num].type == DS_TYPE_GAUGE) + BUFFER_ADD("%f", vl->values[ds_num].gauge); + else if (store_rates) + { + if (rates == NULL) + rates = uc_get_rate (ds, vl); + if (rates == NULL) + { + WARNING("format_values: " + "uc_get_rate failed."); + return -1; + } + BUFFER_ADD("%f", rates[ds_num]); + } + else if (ds->ds[ds_num].type == DS_TYPE_COUNTER) + BUFFER_ADD("%llu", vl->values[ds_num].counter); + else if (ds->ds[ds_num].type == DS_TYPE_DERIVE) + BUFFER_ADD("%" PRIi64, vl->values[ds_num].derive); + else if (ds->ds[ds_num].type == DS_TYPE_ABSOLUTE) + BUFFER_ADD("%" PRIu64, vl->values[ds_num].absolute); + else + { + ERROR("format_values plugin: Unknown data source type: %i", + ds->ds[ds_num].type); + sfree(rates); + return -1; + } + +#undef BUFFER_ADD + + sfree(rates); + return 0; +} + +static int wt_format_name(char *ret, int ret_len, + const value_list_t *vl, + const struct wt_callback *cb, + const char *ds_name) +{ + int status; + char *temp = NULL; + char *prefix = ""; + const char *meta_prefix = "tsdb_prefix"; + + if (vl->meta) { + status = meta_data_get_string(vl->meta, meta_prefix, &temp); + if (status == -ENOENT) { + /* defaults to empty string */ + } else if (status < 0) { + sfree(temp); + return status; + } else { + prefix = temp; + } + } + + if (ds_name != NULL) { + if (vl->plugin_instance[0] == '\0') { + ssnprintf(ret, ret_len, "%s%s.%s", + prefix, vl->plugin, ds_name); + } else if (vl->type_instance == '\0') { + ssnprintf(ret, ret_len, "%s%s.%s.%s.%s", + prefix, vl->plugin, vl->plugin_instance, + vl->type_instance, ds_name); + } else { + ssnprintf(ret, ret_len, "%s%s.%s.%s.%s", + prefix, vl->plugin, vl->plugin_instance, vl->type, + ds_name); + } + } else if (vl->plugin_instance[0] == '\0') { + if (vl->type_instance[0] == '\0') + ssnprintf(ret, ret_len, "%s%s.%s", + prefix, vl->plugin, vl->type); + else + ssnprintf(ret, ret_len, "%s%s.%s", + prefix, vl->plugin, vl->type_instance); + } else if (vl->type_instance[0] == '\0') { + ssnprintf(ret, ret_len, "%s%s.%s.%s", + prefix, vl->plugin, vl->plugin_instance, vl->type); + } else { + ssnprintf(ret, ret_len, "%s%s.%s.%s", + prefix, vl->plugin, vl->plugin_instance, vl->type_instance); + } + + sfree(temp); + return 0; +} + +static int wt_send_message (const char* key, const char* value, + cdtime_t time, struct wt_callback *cb, + const char* host, meta_data_t *md) +{ + int status; + int message_len; + char *temp = NULL; + char *tags = ""; + char message[1024]; + char *host_tags = cb->host_tags ? cb->host_tags : ""; + const char *meta_tsdb = "tsdb_tags"; + + /* skip if value is NaN */ + if (value[0] == 'n') + return 0; + + if (md) { + status = meta_data_get_string(md, meta_tsdb, &temp); + if (status == -ENOENT) { + /* defaults to empty string */ + } else if (status < 0) { + ERROR("write_tsdb plugin: tags metadata get failure"); + sfree(temp); + pthread_mutex_unlock(&cb->send_lock); + return status; + } else { + tags = temp; + } + } + + message_len = ssnprintf (message, + sizeof(message), + "put %s %.0f %s fqdn=%s %s %s\r\n", + key, + CDTIME_T_TO_DOUBLE(time), + value, + host, + tags, + host_tags); + + sfree(temp); + + if (message_len >= sizeof(message)) { + ERROR("write_tsdb plugin: message buffer too small: " + "Need %d bytes.", message_len + 1); + return -1; + } + + pthread_mutex_lock(&cb->send_lock); + + if (cb->sock_fd < 0) + { + status = wt_callback_init(cb); + if (status != 0) + { + ERROR("write_tsdb plugin: wt_callback_init failed."); + pthread_mutex_unlock(&cb->send_lock); + return -1; + } + } + + if (message_len >= cb->send_buf_free) + { + status = wt_flush_nolock(0, cb); + if (status != 0) + { + pthread_mutex_unlock(&cb->send_lock); + return status; + } + } + + /* Assert that we have enough space for this message. */ + assert(message_len < cb->send_buf_free); + + /* `message_len + 1' because `message_len' does not include the + * trailing null byte. Neither does `send_buffer_fill'. */ + memcpy(cb->send_buf + cb->send_buf_fill, + message, message_len + 1); + cb->send_buf_fill += message_len; + cb->send_buf_free -= message_len; + + DEBUG("write_tsdb plugin: [%s]:%s buf %zu/%zu (%.1f %%) \"%s\"", + cb->node, + cb->service, + cb->send_buf_fill, sizeof(cb->send_buf), + 100.0 * ((double) cb->send_buf_fill) / + ((double) sizeof(cb->send_buf)), + message); + + pthread_mutex_unlock(&cb->send_lock); + + return 0; +} + +static int wt_write_messages(const data_set_t *ds, const value_list_t *vl, + struct wt_callback *cb) +{ + char key[10*DATA_MAX_NAME_LEN]; + char values[512]; + + int status, i; + + if (0 != strcmp(ds->type, vl->type)) + { + ERROR("write_tsdb plugin: DS type does not match " + "value list type"); + return -1; + } + + for (i = 0; i < ds->ds_num; i++) + { + const char *ds_name = NULL; + + if (cb->always_append_ds || (ds->ds_num > 1)) + ds_name = ds->ds[i].name; + + /* Copy the identifier to 'key' and escape it. */ + status = wt_format_name(key, sizeof(key), vl, cb, ds_name); + if (status != 0) + { + ERROR("write_tsdb plugin: error with format_name"); + return status; + } + + escape_string(key, sizeof(key)); + /* Convert the values to an ASCII representation and put that into + * 'values'. */ + status = wt_format_values(values, sizeof(values), i, ds, vl, + cb->store_rates); + if (status != 0) + { + ERROR("write_tsdb plugin: error with " + "wt_format_values"); + return status; + } + + /* Send the message to tsdb */ + status = wt_send_message(key, values, vl->time, cb, vl->host, vl->meta); + if (status != 0) + { + ERROR("write_tsdb plugin: error with " + "wt_send_message"); + return status; + } + } + + return 0; +} + +static int wt_write(const data_set_t *ds, const value_list_t *vl, + user_data_t *user_data) +{ + struct wt_callback *cb; + int status; + + if (user_data == NULL) + return EINVAL; + + cb = user_data->data; + + status = wt_write_messages(ds, vl, cb); + + return status; +} + +static int wt_config_tsd(oconfig_item_t *ci) +{ + struct wt_callback *cb; + user_data_t user_data; + char callback_name[DATA_MAX_NAME_LEN]; + int i; + + cb = malloc(sizeof(*cb)); + if (cb == NULL) + { + ERROR("write_tsdb plugin: malloc failed."); + return -1; + } + memset(cb, 0, sizeof(*cb)); + cb->sock_fd = -1; + cb->node = NULL; + cb->service = NULL; + cb->host_tags = NULL; + cb->store_rates = 0; + + pthread_mutex_init (&cb->send_lock, NULL); + + for (i = 0; i < ci->children_num; i++) + { + oconfig_item_t *child = ci->children + i; + + if (strcasecmp("Host", child->key) == 0) + cf_util_get_string(child, &cb->node); + else if (strcasecmp("Port", child->key) == 0) + cf_util_get_service(child, &cb->service); + else if (strcasecmp("HostTags", child->key) == 0) + cf_util_get_string(child, &cb->host_tags); + else if (strcasecmp("StoreRates", child->key) == 0) + cf_util_get_boolean(child, &cb->store_rates); + else if (strcasecmp("AlwaysAppendDS", child->key) == 0) + cf_util_get_boolean(child, &cb->always_append_ds); + else + { + ERROR("write_tsdb plugin: Invalid configuration " + "option: %s.", child->key); + } + } + + ssnprintf(callback_name, sizeof(callback_name), "write_tsdb/%s/%s", + cb->node != NULL ? cb->node : WT_DEFAULT_NODE, + cb->service != NULL ? cb->service : WT_DEFAULT_SERVICE); + + memset(&user_data, 0, sizeof(user_data)); + user_data.data = cb; + user_data.free_func = wt_callback_free; + plugin_register_write(callback_name, wt_write, &user_data); + + user_data.free_func = NULL; + plugin_register_flush(callback_name, wt_flush, &user_data); + + return 0; +} + +static int wt_config(oconfig_item_t *ci) +{ + int i; + + for (i = 0; i < ci->children_num; i++) + { + oconfig_item_t *child = ci->children + i; + + if (strcasecmp("Node", child->key) == 0) + wt_config_tsd(child); + else + { + ERROR("write_tsdb plugin: Invalid configuration " + "option: %s.", child->key); + } + } + + return 0; +} + +void module_register(void) +{ + plugin_register_complex_config("write_tsdb", wt_config); +} + +/* vim: set sw=4 ts=4 sts=4 tw=78 et : */