From cc0fe32830bc2cfd1353c151aa1028107f5c05de Mon Sep 17 00:00:00 2001 From: Shirly Radco Date: Wed, 5 Dec 2018 21:32:16 +0200 Subject: [PATCH] Add write_syslog output plugin This patch adds an output plugin to send metrics as CEE-enhanced Syslog log messages by TCP . The syslog message includes the metrics data in human readable structured data format and in json format. It allows adding additional metedata. This plugin is based on the write_tsdb plugin. Signed-off-by: Shirly Radco --- AUTHORS | 3 + ChangeLog | 2 + Makefile.am | 6 + README | 4 + configure.ac | 2 + contrib/redhat/collectd.spec | 11 + src/collectd.conf.in | 13 + src/collectd.conf.pod | 135 +++++++++ src/write_syslog.c | 639 +++++++++++++++++++++++++++++++++++++++++++ 9 files changed, 815 insertions(+) create mode 100644 src/write_syslog.c diff --git a/AUTHORS b/AUTHORS index 409655ac..d05b86f0 100644 --- a/AUTHORS +++ b/AUTHORS @@ -172,6 +172,9 @@ Jiri Tyr Julien Ammous - Lua plugin. +Shirly Radco + - write_syslog plugin. + Kevin Bowling - write_tsdb plugin for http://opentsdb.net/ diff --git a/ChangeLog b/ChangeLog index e9a8415b..7ea5a07f 100644 --- a/ChangeLog +++ b/ChangeLog @@ -72,6 +72,8 @@ newer libmicrohttpd. Thanks to Pavel Rochnyak. #2849 * Write Prometheus plugin: set "SO_REUSEADDRESS" on listening socket. Thanks to Pavel Rochnyak. #2570, #2673 + * Write Syslog plugin: The new "write_syslog" plugin writes value + lists as syslog messages. Thanks to Shirly Radco. #3019 2017-11-17, Version 5.8.0 * collectd: The core daemon is now completely licensed under the MIT diff --git a/Makefile.am b/Makefile.am index 485869e8..5957e59f 100644 --- a/Makefile.am +++ b/Makefile.am @@ -2057,6 +2057,12 @@ write_stackdriver_la_LIBADD = libformat_stackdriver.la libgce.la liboauth.la \ $(BUILD_WITH_LIBCURL_LIBS) endif +if BUILD_PLUGIN_WRITE_SYSLOG +pkglib_LTLIBRARIES += write_syslog.la +write_syslog_la_SOURCES = src/write_syslog.c +write_syslog_la_LDFLAGS = $(PLUGIN_LDFLAGS) +endif + if BUILD_PLUGIN_WRITE_TSDB pkglib_LTLIBRARIES += write_tsdb.la write_tsdb_la_SOURCES = src/write_tsdb.c diff --git a/README b/README index 87760548..f28d4992 100644 --- a/README +++ b/README @@ -563,6 +563,10 @@ Features Sends data to Sensu, a stream processing and monitoring system, via the Sensu client local TCP socket. + - write_syslog + Sends data in syslog format, using TCP, where the message + contains the metric in human or JSON format. + - write_tsdb Sends data OpenTSDB, a scalable no master, no shared state time series database. diff --git a/configure.ac b/configure.ac index 5df84b6a..b272364b 100644 --- a/configure.ac +++ b/configure.ac @@ -6931,6 +6931,7 @@ AC_PLUGIN([write_redis], [$with_libhiredis], [Redis output plug AC_PLUGIN([write_riemann], [$with_libriemann_client], [Riemann output plugin]) AC_PLUGIN([write_sensu], [yes], [Sensu output plugin]) AC_PLUGIN([write_stackdriver], [$plugin_write_stackdriver], [Google Stackdriver Monitoring output plugin]) +AC_PLUGIN([write_syslog], [yes], [Syslog output plugin]) AC_PLUGIN([write_tsdb], [yes], [TSDB output plugin]) AC_PLUGIN([xencpu], [$plugin_xencpu], [Xen Host CPU usage]) AC_PLUGIN([xmms], [$with_libxmms], [XMMS statistics]) @@ -7356,6 +7357,7 @@ AC_MSG_RESULT([ write_redis . . . . . $enable_write_redis]) AC_MSG_RESULT([ write_riemann . . . . $enable_write_riemann]) AC_MSG_RESULT([ write_sensu . . . . . $enable_write_sensu]) AC_MSG_RESULT([ write_stackdriver . . $enable_write_stackdriver]) +AC_MSG_RESULT([ write_syslog . . . . $enable_write_syslog]) AC_MSG_RESULT([ write_tsdb . . . . . $enable_write_tsdb]) AC_MSG_RESULT([ xencpu . . . . . . . $enable_xencpu]) AC_MSG_RESULT([ xmms . . . . . . . . $enable_xmms]) diff --git a/contrib/redhat/collectd.spec b/contrib/redhat/collectd.spec index 6f86b7e8..1ba7d119 100644 --- a/contrib/redhat/collectd.spec +++ b/contrib/redhat/collectd.spec @@ -161,6 +161,7 @@ %define with_write_redis 0%{!?_without_write_redis:1} %define with_write_riemann 0%{!?_without_write_riemann:1} %define with_write_sensu 0%{!?_without_write_sensu:1} +%define with_write_syslog 0%{!?_without_write_syslog:1} %define with_write_tsdb 0%{!?_without_write_tsdb:1} %define with_xmms 0%{!?_without_xmms:0%{?_has_xmms}} %define with_zfs_arc 0%{!?_without_zfs_arc:1} @@ -1857,6 +1858,12 @@ Collectd utilities %define _with_write_sensu --disable-write_sensu %endif +%if %{with_write_syslog} +%define _with_write_syslog --enable-write_syslog +%else +%define _with_write_syslog --disable-write_syslog +%endif + %if %{with_write_tsdb} %define _with_write_tsdb --enable-write_tsdb %else @@ -2054,6 +2061,7 @@ Collectd utilities %{?_with_write_redis} \ %{?_with_write_riemann} \ %{?_with_write_sensu} \ + %{?_with_write_syslog} \ %{?_with_write_tsdb} \ %{?_with_xencpu} \ %{?_with_xmms} \ @@ -2388,6 +2396,9 @@ fi %if %{with_write_log} %{_libdir}/%{name}/write_log.so %endif +%if %{with_write_syslog} +%{_libdir}/%{name}/write_syslog.so +%endif %if %{with_write_sensu} %{_libdir}/%{name}/write_sensu.so %endif diff --git a/src/collectd.conf.in b/src/collectd.conf.in index f58d3b49..e37c5e86 100644 --- a/src/collectd.conf.in +++ b/src/collectd.conf.in @@ -223,6 +223,7 @@ #@BUILD_PLUGIN_WRITE_RIEMANN_TRUE@LoadPlugin write_riemann #@BUILD_PLUGIN_WRITE_SENSU_TRUE@LoadPlugin write_sensu #@BUILD_PLUGIN_WRITE_STACKDRIVER_TRUE@LoadPlugin write_stackdriver +#@BUILD_PLUGIN_WRITE_SYSLOG_TRUE@LoadPlugin write_syslog #@BUILD_PLUGIN_WRITE_TSDB_TRUE@LoadPlugin write_tsdb #@BUILD_PLUGIN_XENCPU_TRUE@LoadPlugin xencpu #@BUILD_PLUGIN_XMMS_TRUE@LoadPlugin xmms @@ -1808,6 +1809,18 @@ # Url "https://monitoring.googleapis.com/v3" # +# +# +# Host "localhost" +# Port "44514" +# Prefix "collectd" +# MessageFormat "human" +# HostTags "" +# StoreRates false +# AlwaysAppendDS false +# +# + # # # Host "localhost" diff --git a/src/collectd.conf.pod b/src/collectd.conf.pod index 88fa37c4..d13932bb 100644 --- a/src/collectd.conf.pod +++ b/src/collectd.conf.pod @@ -10596,6 +10596,141 @@ C. =back +=head2 Plugin C + +The C plugin writes data in I format log messages. +It implements the basic syslog protocol, RFC 5424, extends it with +content-based filtering, rich filtering capabilities, +flexible configuration options and adds features such as using TCP for transport. +The plugin can connect to a I daemon, like syslog-ng and rsyslog, that will +ingest metrics, transform and ship them to the specified output. +The plugin uses I over the "line based" protocol with a default port 44514. +The data will be sent in blocks of at most 1428 bytes to minimize the number of +network packets. + +Synopsis: + + + ResolveInterval 60 + ResolveJitter 60 + + Host "syslog-1.my.domain" + Port "44514" + Prefix "collectd" + MessageFormat "human" + HostTags "" + + + +The configuration consists of one or more EBEIE +blocks and global directives. + +Global directives are: + +=over 4 + +=item B I + +=item B I + +When I connects to a syslog node, it will request the hostname from +DNS. This can become a problem if the syslog node is unavailable or badly +configured because collectd will request DNS in order to reconnect for every +metric, which can flood your DNS. So you can cache the last value for +I seconds. +Defaults to the I of the I, e.g. 10Eseconds. + +You can also define a jitter, a random interval to wait in addition to +I. This prevents all your collectd servers to resolve the +hostname at the same time when the connection fails. +Defaults to the I of the I, e.g. 10Eseconds. + +B If the DNS resolution has already been successful when the socket +closes, the plugin will try to reconnect immediately with the cached +information. DNS is queried only when the socket is closed for a longer than +I + I seconds. + +=back + +Inside the B blocks, the following options are recognized: + +=over 4 + +=item B I
+ +Hostname or address to connect to. Defaults to C. + +=item B I + +Service name or port number to connect to. Defaults to C<44514>. + + +=item B I + +When set, I is added to the end of the metric. +It is intended to be used for adding additional metadata to tag the metric with. +Dots and whitespace are I escaped in this string. + +Examples: + +When MessageFormat is set to "human". + + ["prefix1" "example1"="example1_v"]["prefix2" "example2"="example2_v"]" + +When MessageFormat is set to "JSON", text should be in JSON format. +Escaping the quotation marks is required. + + HostTags "\"prefix1\": {\"example1\":\"example1_v\",\"example2\":\"example2_v\"}" + +=item B I + +I selects the format in which messages are sent to the +syslog deamon, human or JSON. Defaults to human. + +Syslog message format: + +VERSION ISOTIMESTAMP HOSTNAME APPLICATION PID MESSAGEID STRUCTURED-DATA MSG + +The difference between the message formats are in the STRUCTURED-DATA and MSG parts. + +Human format: + + <166>1 ISOTIMESTAMP HOSTNAME collectd PID MESSAGEID + ["collectd" "value": "v1" "plugin"="plugin_v" "plugin_instance"="plugin_instance_v" + "type_instance"="type_instance_v" "type"="type_v" "ds_name"="ds_name_v" "interval"="interval_v" ] + "host_tag_example"="host_tag_example_v" plugin_v.type_v.ds_name_v="v1" + +JSON format: + + <166>1 ISOTIMESTAMP HOSTNAME collectd PID MESSAGEID STRUCTURED-DATA + { + "collectd": { + "time": time_as_epoch, "interval": interval_v, "plugin": "plugin_v", + "plugin_instance": "plugin_instance_v", "type":"type_v", + "type_instance": "type_instance_v", "plugin_v": {"type_v": v1} + } , "host":"host_v", "host_tag_example": "host_tag_example_v" + } + +=item B B|B + +If set to B, convert counter values to rates. If set to B +(the default) counter values are stored as is, as an increasing +integer number. + +=item B B|B + +If set to B, append the name of the I (DS) to the "metric" +identifier. If set to B (the default), this is only done when there is +more than one DS. + +=item B I + +When set, I is added to all metrics names as a prefix. It is intended in +case you want to be able to define the source of the specific metric. Dots and +whitespace are I escaped in this string. + +=back + =head2 Plugin C This plugin collects metrics of hardware CPU load for machine running Xen diff --git a/src/write_syslog.c b/src/write_syslog.c new file mode 100644 index 00000000..92c5ddeb --- /dev/null +++ b/src/write_syslog.c @@ -0,0 +1,639 @@ +/** + * collectd - src/write_syslog.c + * Copyright (C) 2012 Pierre-Yves Ritschard + * Copyright (C) 2011 Scott Sanders + * Copyright (C) 2009 Paul Sadauskas + * Copyright (C) 2009 Doug MacEachern + * Copyright (C) 2007-2012 Florian octo Forster + * Copyright (C) 2013-2014 Limelight Networks, Inc. + * Copyright (C) 2019 Shirly Radco + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; only version 2 of the License is applicable. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + * + * Based on the write_graphite plugin. Authors: + * Florian octo Forster + * Doug MacEachern + * Paul Sadauskas + * Scott Sanders + * Pierre-Yves Ritschard + * Based on the write_tsdb plugin. Authors: + * Brett Hawn + * Kevin Bowling + * write_syslog. Authors: + * Shirly Radco + **/ + +/* write_syslog plugin configuration example + * + * + * + * Host "localhost" + * Port "44514" + * Prefix "collectd" + * MessageFormat "human" + * HostTags "["prefix1" "example1"="example1_v"] + * + * + * + */ + +#include "collectd.h" +#include "utils/common/common.h" + +#include "plugin.h" +#include "utils_cache.h" +#include "utils_random.h" + +#include + +#define WS_DEFAULT_NODE "localhost" + +#define WS_DEFAULT_SERVICE "44514" + +#define WS_DEFAULT_FORMAT "human" + +#define WS_DEFAULT_PREFIX "collectd" + +#define WS_DEFAULT_ESCAPE '.' + +/* Ethernet - (IPv6 + TCP) = 1500 - (40 + 32) = 1428 */ +#define WS_SEND_BUF_SIZE 1428 + +/* + * Private variables + */ +struct ws_callback { + struct addrinfo *ai; + cdtime_t ai_last_update; + int sock_fd; + + char *node; + char *service; + char *host_tags; + char *msg_format; + char *metrics_prefix; + bool store_rates; + bool always_append_ds; + + char send_buf[WS_SEND_BUF_SIZE]; + size_t send_buf_free; + size_t send_buf_fill; + cdtime_t send_buf_init_time; + + pthread_mutex_t send_lock; + + bool connect_failed_log_enabled; + int connect_dns_failed_attempts_remaining; + cdtime_t next_random_ttl; +}; + +static cdtime_t resolve_interval; +static cdtime_t resolve_jitter; + +/* + * Functions + */ +static void ws_reset_buffer(struct ws_callback *cb) { + memset(cb->send_buf, 0, sizeof(cb->send_buf)); + cb->send_buf_free = sizeof(cb->send_buf); + cb->send_buf_fill = 0; + cb->send_buf_init_time = cdtime(); +} + +static int ws_send_buffer(struct ws_callback *cb) { + ssize_t status = 0; + + status = swrite(cb->sock_fd, cb->send_buf, strlen(cb->send_buf)); + if (status != 0) { + ERROR("write_syslog plugin: send failed with status %zi (%s)", status, + STRERRNO); + + if (cb->sock_fd > 0) { + close(cb->sock_fd); + cb->sock_fd = -1; + } + + return -1; + } + + return 0; +} + +/* NOTE: You must hold cb->send_lock when calling this function! */ +static int ws_flush_nolock(cdtime_t timeout, struct ws_callback *cb) { + int status; + + DEBUG("write_syslog plugin: ws_flush_nolock: timeout = %.3f; " + "send_buf_fill = %" PRIsz ";", + (double)timeout, cb->send_buf_fill); + + /* timeout == 0 => flush unconditionally */ + if (timeout > 0) { + cdtime_t now; + + now = cdtime(); + if ((cb->send_buf_init_time + timeout) > now) + return 0; + } + + if (cb->send_buf_fill == 0) { + cb->send_buf_init_time = cdtime(); + return 0; + } + + status = ws_send_buffer(cb); + ws_reset_buffer(cb); + + return status; +} + +static cdtime_t new_random_ttl(void) { + if (resolve_jitter == 0) + return 0; + + return (cdtime_t)cdrand_range(0, (long)resolve_jitter); +} + +static int ws_callback_init(struct ws_callback *cb) { + int status; + cdtime_t now; + + const char *node = cb->node ? cb->node : WS_DEFAULT_NODE; + const char *service = cb->service ? cb->service : WS_DEFAULT_SERVICE; + + if (cb->sock_fd > 0) + return 0; + + now = cdtime(); + if (cb->ai) { + /* When we are here, we still have the IP in cache. + * If we have remaining attempts without calling the DNS, we update the + * last_update date so we keep the info until next time. + * If there is no more attempts, we need to flush the cache. + */ + + if ((cb->ai_last_update + resolve_interval + cb->next_random_ttl) < now) { + cb->next_random_ttl = new_random_ttl(); + if (cb->connect_dns_failed_attempts_remaining > 0) { + /* Warning : this is run under send_lock mutex. + * This is why we do not use another mutex here. + * */ + cb->ai_last_update = now; + cb->connect_dns_failed_attempts_remaining--; + } else { + freeaddrinfo(cb->ai); + cb->ai = NULL; + } + } + } + + if (cb->ai == NULL) { + if ((cb->ai_last_update + resolve_interval + cb->next_random_ttl) >= now) { + DEBUG("write_syslog plugin: too many getaddrinfo(%s, %s) failures", node, + service); + return -1; + } + cb->ai_last_update = now; + cb->next_random_ttl = new_random_ttl(); + + struct addrinfo ai_hints = { + .ai_family = AF_UNSPEC, + .ai_flags = AI_ADDRCONFIG, + .ai_socktype = SOCK_STREAM, + }; + + status = getaddrinfo(node, service, &ai_hints, &cb->ai); + if (status != 0) { + if (cb->ai) { + freeaddrinfo(cb->ai); + cb->ai = NULL; + } + if (cb->connect_failed_log_enabled) { + ERROR("write_syslog plugin: getaddrinfo(%s, %s) failed: %s", node, + service, gai_strerror(status)); + cb->connect_failed_log_enabled = 0; + } + return -1; + } + } + + assert(cb->ai != NULL); + for (struct addrinfo *ai = cb->ai; ai != NULL; ai = ai->ai_next) { + cb->sock_fd = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol); + if (cb->sock_fd < 0) + continue; + + set_sock_opts(cb->sock_fd); + + status = connect(cb->sock_fd, ai->ai_addr, ai->ai_addrlen); + if (status != 0) { + close(cb->sock_fd); + cb->sock_fd = -1; + continue; + } + + break; + } + + if (cb->sock_fd < 0) { + ERROR("write_syslog plugin: Connecting to %s:%s failed. " + "The last error was: %s", + node, service, STRERRNO); + return -1; + } + + if (cb->connect_failed_log_enabled == 0) { + INFO("write_syslog plugin: Connecting to %s:%s succeeded.", node, service); + cb->connect_failed_log_enabled = 1; + } + cb->connect_dns_failed_attempts_remaining = 1; + + ws_reset_buffer(cb); + + return 0; +} + +static void ws_callback_free(void *data) { + struct ws_callback *cb; + + if (data == NULL) + return; + + cb = data; + + pthread_mutex_lock(&cb->send_lock); + + ws_flush_nolock(0, cb); + + close(cb->sock_fd); + cb->sock_fd = -1; + + sfree(cb->node); + sfree(cb->service); + sfree(cb->host_tags); + sfree(cb->msg_format); + sfree(cb->metrics_prefix); + + pthread_mutex_unlock(&cb->send_lock); + pthread_mutex_destroy(&cb->send_lock); + + sfree(cb); +} + +static int ws_flush(cdtime_t timeout, + const char *identifier __attribute__((unused)), + user_data_t *user_data) { + struct ws_callback *cb; + int status; + + if (user_data == NULL) + return -EINVAL; + + cb = user_data->data; + + pthread_mutex_lock(&cb->send_lock); + + if (cb->sock_fd < 0) { + status = ws_callback_init(cb); + if (status != 0) { + ERROR("write_syslog plugin: ws_callback_init failed."); + pthread_mutex_unlock(&cb->send_lock); + return -1; + } + } + + status = ws_flush_nolock(timeout, cb); + pthread_mutex_unlock(&cb->send_lock); + + return status; +} + +static int ws_format_values(char *ret, size_t ret_len, int ds_num, + const data_set_t *ds, const value_list_t *vl, + bool store_rates) { + size_t offset = 0; + int status; + gauge_t *rates = NULL; + + assert(strcmp(ds->type, vl->type) == 0); + + memset(ret, 0, ret_len); + +#define BUFFER_ADD(...) \ + do { \ + status = snprintf(ret + offset, ret_len - offset, __VA_ARGS__); \ + if (status < 1) { \ + sfree(rates); \ + return -1; \ + } else if (((size_t)status) >= (ret_len - offset)) { \ + sfree(rates); \ + return -1; \ + } else \ + offset += ((size_t)status); \ + } while (0) + + if (ds->ds[ds_num].type == DS_TYPE_GAUGE) + BUFFER_ADD(GAUGE_FORMAT, vl->values[ds_num].gauge); + else if (store_rates) { + if (rates == NULL) + rates = uc_get_rate(ds, vl); + if (rates == NULL) { + WARNING("format_values: " + "uc_get_rate failed."); + return -1; + } + BUFFER_ADD(GAUGE_FORMAT, rates[ds_num]); + } else if (ds->ds[ds_num].type == DS_TYPE_COUNTER) + BUFFER_ADD("%" PRIu64, (uint64_t)vl->values[ds_num].counter); + else if (ds->ds[ds_num].type == DS_TYPE_DERIVE) + BUFFER_ADD("%" PRIi64, vl->values[ds_num].derive); + else if (ds->ds[ds_num].type == DS_TYPE_ABSOLUTE) + BUFFER_ADD("%" PRIu64, vl->values[ds_num].absolute); + else { + ERROR("format_values plugin: Unknown data source type: %i", + ds->ds[ds_num].type); + sfree(rates); + return -1; + } + +#undef BUFFER_ADD + + sfree(rates); + return 0; +} + +static int ws_format_name(char *ret, int ret_len, const value_list_t *vl, + const struct ws_callback *cb, const char *ds_name) { + + if (ds_name != NULL) { + snprintf(ret, ret_len, "%s.%s", vl->type, ds_name); + } else { /* ds_name == NULL */ + snprintf(ret, ret_len, "%s", vl->type); + } + + return 0; +} + +static int ws_send_message(const char *key, const char *value, cdtime_t time, + struct ws_callback *cb, const char *plugin, + const char *plugin_instance, + const char *type_instance, const char *type, + const char *ds_name, cdtime_t interval, + const char *host) { + int status; + size_t message_len; + char message[1024]; + char rfc3339_timestamp[64]; + const char *host_tags = cb->host_tags ? cb->host_tags : ""; + const char *host_tags_json_prefix = ""; + const char *metrics_prefix = + cb->metrics_prefix ? cb->metrics_prefix : WS_DEFAULT_PREFIX; + const char *msg_format = cb->msg_format ? cb->msg_format : WS_DEFAULT_FORMAT; + int pid; + + pid = getpid(); + + rfc3339_local(rfc3339_timestamp, sizeof(rfc3339_timestamp), time); + + /* skip if value is NaN */ + if (value[0] == 'n') + return 0; + + if (strcasecmp("JSON", msg_format) == 0) { + if (cb->host_tags) { + host_tags_json_prefix = ","; + } + status = snprintf( + /* The metric key-values are are part of the syslog msg, in json + format */ + message, sizeof(message), + "<166>1 %s %s collectd %d - - {\"time\":%.0f, \"%s\":{ \"%s\":{ " + "\"%s\":%s }, " + "\"plugin\":\"%s\", \"plugin_instance\":\"%s\", " + "\"type_instance\":\"%s\"," + " \"type\":\"%s\", \"interval\":%.0f }, \"hostname\":\"%s\" %s " + "%s}\n", + rfc3339_timestamp, host, pid, CDTIME_T_TO_DOUBLE(time), metrics_prefix, + plugin, key, value, plugin, plugin_instance, type_instance, type, + CDTIME_T_TO_DOUBLE(interval), host, host_tags_json_prefix, host_tags); + } else { + status = snprintf( + /* The metric key-values are part of the syslog structrude data, + * MessageFormat = "human" */ + message, sizeof(message), + "<166>1 %s %s collectd %d - [%s value=\"%s\"" + " plugin=\"%s\" plugin_instance=\"%s\"" + " type_instance=\"%s\" type=\"%s\"" + " ds_name=\"%s\" interval=\"%.0f\"] %s %s.%s=\"%s\"\n", + rfc3339_timestamp, host, pid, metrics_prefix, value, plugin, + plugin_instance, type_instance, type, ds_name, + CDTIME_T_TO_DOUBLE(interval), host_tags, plugin, key, value); + } + if (status < 0) + return -1; + message_len = (size_t)status; + + if (message_len >= sizeof(message)) { + ERROR("write_syslog plugin: message buffer too small: " + "Need %" PRIsz " bytes.", + message_len + 1); + return -1; + } + + pthread_mutex_lock(&cb->send_lock); + + if (cb->sock_fd < 0) { + status = ws_callback_init(cb); + if (status != 0) { + ERROR("write_syslog plugin: ws_callback_init failed."); + pthread_mutex_unlock(&cb->send_lock); + return -1; + } + } + + if (message_len >= cb->send_buf_free) { + status = ws_flush_nolock(0, cb); + if (status != 0) { + pthread_mutex_unlock(&cb->send_lock); + return status; + } + } + + /* Assert that we have enough space for this message. */ + assert(message_len < cb->send_buf_free); + + /* `message_len + 1' because `message_len' does not include the + * trailing null byte. Neither does `send_buffer_fill'. */ + memcpy(cb->send_buf + cb->send_buf_fill, message, message_len + 1); + cb->send_buf_fill += message_len; + cb->send_buf_free -= message_len; + + DEBUG("write_syslog plugin: [%s]:%s buf %" PRIsz "/%" PRIsz + " (%.1f %%) \"%s\"", + cb->node, cb->service, cb->send_buf_fill, sizeof(cb->send_buf), + 100.0 * ((double)cb->send_buf_fill) / ((double)sizeof(cb->send_buf)), + message); + + pthread_mutex_unlock(&cb->send_lock); + + return 0; +} + +static int ws_write_messages(const data_set_t *ds, const value_list_t *vl, + struct ws_callback *cb) { + char key[10 * DATA_MAX_NAME_LEN]; + char values[512]; + + int status; + + if (0 != strcmp(ds->type, vl->type)) { + ERROR("write_syslog plugin: DS type does not match " + "value list type"); + return -1; + } + + for (size_t i = 0; i < ds->ds_num; i++) { + const char *ds_name = NULL; + + if (cb->always_append_ds || (ds->ds_num > 1)) + ds_name = ds->ds[i].name; + + /* Copy the identifier to 'key' and escape it. */ + status = ws_format_name(key, sizeof(key), vl, cb, ds_name); + if (status != 0) { + ERROR("write_syslog plugin: error with format_name"); + return status; + } + + escape_string(key, sizeof(key)); + /* Convert the values to an ASCII representation and put that into + * 'values'. */ + status = + ws_format_values(values, sizeof(values), i, ds, vl, cb->store_rates); + if (status != 0) { + ERROR("write_syslog plugin: error with " + "ws_format_values"); + return status; + } + + /* Send the message to tcp */ + status = ws_send_message(key, values, vl->time, cb, vl->plugin, + vl->plugin_instance, vl->type_instance, vl->type, + ds_name, vl->interval, vl->host); + if (status != 0) { + ERROR("write_syslog plugin: error with " + "ws_send_message"); + return status; + } + } + + return 0; +} + +static int ws_write(const data_set_t *ds, const value_list_t *vl, + user_data_t *user_data) { + struct ws_callback *cb; + int status; + + if (user_data == NULL) + return EINVAL; + + cb = user_data->data; + + status = ws_write_messages(ds, vl, cb); + + return status; +} + +static int ws_config_tsd(oconfig_item_t *ci) { + struct ws_callback *cb; + char callback_name[DATA_MAX_NAME_LEN]; + + cb = calloc(1, sizeof(*cb)); + if (cb == NULL) { + ERROR("write_syslog plugin: calloc failed."); + return -1; + } + cb->sock_fd = -1; + cb->connect_failed_log_enabled = 1; + cb->next_random_ttl = new_random_ttl(); + + pthread_mutex_init(&cb->send_lock, NULL); + + for (int i = 0; i < ci->children_num; i++) { + oconfig_item_t *child = ci->children + i; + + if (strcasecmp("Host", child->key) == 0) + cf_util_get_string(child, &cb->node); + else if (strcasecmp("Port", child->key) == 0) + cf_util_get_service(child, &cb->service); + else if (strcasecmp("MessageFormat", child->key) == 0) + cf_util_get_string(child, &cb->msg_format); + else if (strcasecmp("HostTags", child->key) == 0) + cf_util_get_string(child, &cb->host_tags); + else if (strcasecmp("StoreRates", child->key) == 0) + cf_util_get_boolean(child, &cb->store_rates); + else if (strcasecmp("AlwaysAppendDS", child->key) == 0) + cf_util_get_boolean(child, &cb->always_append_ds); + else if (strcasecmp("Prefix", child->key) == 0) + cf_util_get_string(child, &cb->metrics_prefix); + else { + ERROR("write_syslog plugin: Invalid configuration " + "option: %s.", + child->key); + return -1; + } + } + + snprintf(callback_name, sizeof(callback_name), "write_syslog/%s/%s", + cb->node != NULL ? cb->node : WS_DEFAULT_NODE, + cb->service != NULL ? cb->service : WS_DEFAULT_SERVICE); + + user_data_t user_data = {.data = cb, .free_func = ws_callback_free}; + + plugin_register_write(callback_name, ws_write, &user_data); + + user_data.free_func = NULL; + plugin_register_flush(callback_name, ws_flush, &user_data); + + return 0; +} + +static int ws_config(oconfig_item_t *ci) { + if ((resolve_interval == 0) && (resolve_jitter == 0)) + resolve_interval = resolve_jitter = plugin_get_interval(); + + for (int i = 0; i < ci->children_num; i++) { + oconfig_item_t *child = ci->children + i; + + if (strcasecmp("Node", child->key) == 0) { + if (ws_config_tsd(child) < 0) + return -1; + } else if (strcasecmp("ResolveInterval", child->key) == 0) + cf_util_get_cdtime(child, &resolve_interval); + else if (strcasecmp("ResolveJitter", child->key) == 0) + cf_util_get_cdtime(child, &resolve_jitter); + else { + ERROR("write_syslog plugin: Invalid configuration " + "option: %s.", + child->key); + return -1; + } + } + + return 0; +} + +void module_register(void) { + plugin_register_complex_config("write_syslog", ws_config); +} -- 2.11.0