to have its measurements fed to collectd. This includes multimeters,
sound level meters, thermometers, and much more.
+ - smart
+ Collect SMART statistics, notably load cycle count, temperature
+ and bad sectors.
+
- snmp
Read values from SNMP (Simple Network Management Protocol) enabled
network devices such as switches, routers, thermometers, rack monitoring
fi
# }}}
+# --with-libatasmart {{{
+with_libatasmart_cppflags=""
+with_libatasmart_ldflags=""
+AC_ARG_WITH(libatasmart, [AS_HELP_STRING([--with-libatasmart@<:@=PREFIX@:>@], [Path to libatasmart.])],
+[
+ if test "x$withval" != "xno" && test "x$withval" != "xyes"
+ then
+ with_libatasmart_cppflags="-I$withval/include"
+ with_libatasmart_ldflags="-L$withval/lib"
+ with_libatasmart="yes"
+ else
+ with_libatasmart="$withval"
+ fi
+],
+[
+ if test "x$ac_system" = "xLinux"
+ then
+ with_libatasmart="yes"
+ else
+ with_libatasmart="no (Linux only library)"
+ fi
+])
+if test "x$with_libatasmart" = "xyes"
+then
+ SAVE_CPPFLAGS="$CPPFLAGS"
+ CPPFLAGS="$CPPFLAGS $with_libatasmart_cppflags"
+
+ AC_CHECK_HEADERS(atasmart.h, [with_libatasmart="yes"], [with_libatasmart="no (atasmart.h not found)"])
+
+ CPPFLAGS="$SAVE_CPPFLAGS"
+fi
+if test "x$with_libatasmart" = "xyes"
+then
+ SAVE_CPPFLAGS="$CPPFLAGS"
+ SAVE_LDFLAGS="$LDFLAGS"
+ CPPFLAGS="$CPPFLAGS $with_libatasmart_cppflags"
+ LDFLAGS="$LDFLAGS $with_libatasmart_ldflags"
+
+ AC_CHECK_LIB(atasmart, sk_disk_open, [with_libatasmart="yes"], [with_libatasmart="no (Symbol 'sk_disk_open' not found)"])
+
+ CPPFLAGS="$SAVE_CPPFLAGS"
+ LDFLAGS="$SAVE_LDFLAGS"
+fi
+if test "x$with_libatasmart" = "xyes"
+then
+ BUILD_WITH_LIBATASMART_CPPFLAGS="$with_libatasmart_cppflags"
+ BUILD_WITH_LIBATASMART_LDFLAGS="$with_libatasmart_ldflags"
+ BUILD_WITH_LIBATASMART_LIBS="-latasmart"
+ AC_SUBST(BUILD_WITH_LIBATASMART_CPPFLAGS)
+ AC_SUBST(BUILD_WITH_LIBATASMART_LDFLAGS)
+ AC_SUBST(BUILD_WITH_LIBATASMART_LIBS)
+ AC_DEFINE(HAVE_LIBATASMART, 1, [Define if libatasmart is present and usable.])
+fi
+AM_CONDITIONAL(BUILD_WITH_LIBATASMART, test "x$with_libatasmart" = "xyes")
+# }}}
+
PKG_CHECK_MODULES([LIBNOTIFY], [libnotify],
[with_libnotify="yes"],
[if test "x$LIBNOTIFY_PKG_ERRORS" = "x"; then
AC_PLUGIN([sensors], [$with_libsensors], [lm_sensors statistics])
AC_PLUGIN([serial], [$plugin_serial], [serial port traffic])
AC_PLUGIN([sigrok], [$with_libsigrok], [sigrok acquisition sources])
+AC_PLUGIN([smart], [$with_libatasmart], [SMART statistics])
AC_PLUGIN([snmp], [$with_libnetsnmp], [SNMP querying plugin])
AC_PLUGIN([statsd], [yes], [StatsD plugin])
AC_PLUGIN([swap], [$plugin_swap], [Swap usage statistics])
Libraries:
intel mic . . . . . . $with_mic
libaquaero5 . . . . . $with_libaquaero5
+ libatasmart . . . . . $with_libatasmart
libcurl . . . . . . . $with_libcurl
libdbi . . . . . . . $with_libdbi
libcredis . . . . . . $with_libcredis
sensors . . . . . . . $enable_sensors
serial . . . . . . . $enable_serial
sigrok . . . . . . . $enable_sigrok
+ smart . . . . . . . . $enable_smart
snmp . . . . . . . . $enable_snmp
statsd . . . . . . . $enable_statsd
swap . . . . . . . . $enable_swap
sigrok_la_LIBADD = -lsigrok
endif
+if BUILD_PLUGIN_SMART
+if BUILD_WITH_LIBUDEV
+pkglib_LTLIBRARIES += smart.la
+smart_la_SOURCES = smart.c \
+ utils_ignorelist.c utils_ignorelist.h
+smart_la_CFLAGS = $(AM_CFLAGS) $(BUILD_WITH_LIBATASMART_CPPFLAGS)
+smart_la_LDFLAGS = $(PLUGIN_LDFLAGS) $(BUILD_WITH_LIBATASMART_LDFLAGS)
+smart_la_LIBADD = $(BUILD_WITH_LIBATASMART_LIBS) -ludev
+endif
+endif
+
if BUILD_PLUGIN_SNMP
pkglib_LTLIBRARIES += snmp.la
snmp_la_SOURCES = snmp.c
#@BUILD_PLUGIN_SENSORS_TRUE@LoadPlugin sensors
#@BUILD_PLUGIN_SERIAL_TRUE@LoadPlugin serial
#@BUILD_PLUGIN_SIGROK_TRUE@LoadPlugin sigrok
+#@BUILD_PLUGIN_SMART_TRUE@LoadPlugin smart
#@BUILD_PLUGIN_SNMP_TRUE@LoadPlugin snmp
#@BUILD_PLUGIN_STATSD_TRUE@LoadPlugin statsd
#@BUILD_PLUGIN_SWAP_TRUE@LoadPlugin swap
# </Device>
#</Plugin>
+#<Plugin smart>
+# Disk "/^[hs]d[a-f][0-9]?$/"
+# IgnoreSelected false
+#</Plugin>
+
#<Plugin snmp>
# <Data "powerplus_voltge_input">
# Type "voltage"
BaseDir "/var/lib/collectd"
PIDFile "/run/collectd.pid"
Interval 10.0
-
+
LoadPlugin cpu
LoadPlugin load
-
+
<LoadPlugin df>
Interval 3600
</LoadPlugin>
<Plugin df>
ValuesPercentage true
</Plugin>
-
+
LoadPlugin ping
<Plugin ping>
Host "example.org"
=back
+=head2 Plugin C<smart>
+
+The C<smart> plugin collects SMART information from physical
+disks. Values collectd include temperature, power cycle count, poweron
+time and bad sectors. Also, all SMART attributes are collected along
+with the normalized current value, the worst value, the threshold and
+a human readable value.
+
+Using the following two options you can ignore some disks or configure the
+collection only of specific disks.
+
+=over 4
+
+=item B<Disk> I<Name>
+
+Select the disk I<Name>. Whether it is collected or ignored depends on the
+B<IgnoreSelected> setting, see below. As with other plugins that use the
+daemon's ignorelist functionality, a string that starts and ends with a slash
+is interpreted as a regular expression. Examples:
+
+ Disk "sdd"
+ Disk "/hda[34]/"
+
+=item B<IgnoreSelected> B<true>|B<false>
+
+Sets whether selected disks, i.E<nbsp>e. the ones matches by any of the B<Disk>
+statements, are ignored or if all other disks are ignored. The behavior
+(hopefully) is intuitive: If no B<Disk> option is configured, all disks are
+collected. If at least one B<Disk> option is given and no B<IgnoreSelected> or
+set to B<false>, B<only> matching disks will be collected. If B<IgnoreSelected>
+is set to B<true>, all disks are collected B<except> the ones matched.
+
+=back
+
=head2 Plugin C<snmp>
Since the configuration of the C<snmp plugin> is a little more complicated than
Specify the protocol to use when communicating with I<Riemann>. Defaults to
B<UDP>.
+=item B<Batch> B<true>|B<false>
+
+If set to B<true> and B<Protocol> is set to B<TCP>,
+events will be batched in memory and flushed at
+regular intervals or when B<BatchMaxSize> is exceeded.
+
+Notifications are not batched and sent as soon as possible.
+
+Defaults to false
+
+=item B<BatchMaxSize> I<size>
+
+Maximum payload size for a riemann packet. Defaults to 8192
+
=item B<StoreRates> B<true>|B<false>
If set to B<true> (the default), convert counter values to rates. If set to
#include <sys/types.h>
#include <sys/socket.h>
+#include <sys/un.h>
#include <netdb.h>
#include <pthread.h>
sstrerror (errno, errbuf, sizeof (errbuf)));
return (-1);
}
-
+
dirlen = strlen (dir);
while ((dirlen > 0) && (dir[dirlen - 1] == '/'))
dir[--dirlen] = '\0';
static void exit_usage (int status)
{
printf ("Usage: "PACKAGE" [OPTIONS]\n\n"
-
+
"Available options:\n"
" General:\n"
" -C <file> Configuration file.\n"
} /* static int pidfile_remove (const char *file) */
#endif /* COLLECT_DAEMON */
+int notify_upstart (void)
+{
+ const char *upstart_job = getenv("UPSTART_JOB");
+
+ if (upstart_job == NULL)
+ return 0;
+
+ if (strcmp(upstart_job, "collectd") != 0)
+ return 0;
+
+ WARNING ("supervised by upstart, will stop to signal readyness");
+ raise(SIGSTOP);
+ unsetenv("UPSTART_JOB");
+
+ return 1;
+}
+
+int notify_systemd (void)
+{
+ int fd = -1;
+ const char *notifysocket = getenv("NOTIFY_SOCKET");
+ struct sockaddr_un su;
+ struct iovec iov;
+ struct msghdr hdr;
+
+ if (notifysocket == NULL)
+ return 0;
+
+ if ((strchr("@/", notifysocket[0])) == NULL ||
+ strlen(notifysocket) < 2)
+ return 0;
+
+ WARNING ("supervised by systemd, will signal readyness");
+ if ((fd = socket(AF_UNIX, SOCK_DGRAM, 0)) < 0) {
+ WARNING ("cannot contact systemd socket %s", notifysocket);
+ return 0;
+ }
+
+ bzero(&su, sizeof(su));
+ su.sun_family = AF_UNIX;
+ sstrncpy (su.sun_path, notifysocket, sizeof(su.sun_path));
+
+ if (notifysocket[0] == '@')
+ su.sun_path[0] = 0;
+
+ bzero(&iov, sizeof(iov));
+ iov.iov_base = "READY=1";
+ iov.iov_len = strlen("READY=1");
+
+ bzero(&hdr, sizeof(hdr));
+ hdr.msg_name = &su;
+ hdr.msg_namelen = offsetof(struct sockaddr_un, sun_path) +
+ strlen(notifysocket);
+ hdr.msg_iov = &iov;
+ hdr.msg_iovlen = 1;
+
+ unsetenv("NOTIFY_SOCKET");
+ if (sendmsg(fd, &hdr, MSG_NOSIGNAL) < 0) {
+ WARNING ("cannot send notification to systemd");
+ close(fd);
+ return 0;
+ }
+ close(fd);
+ return 1;
+}
+
int main (int argc, char **argv)
{
struct sigaction sig_int_action;
sig_chld_action.sa_handler = SIG_IGN;
sigaction (SIGCHLD, &sig_chld_action, NULL);
- if (daemonize)
+ /*
+ * Only daemonize if we're not being supervised
+ * by upstart or systemd.
+ */
+ if (daemonize && notify_upstart() == 0 && notify_systemd() == 0)
{
if ((pid = fork ()) == -1)
{
--- /dev/null
+/**
+ * collectd - src/smart.c
+ * Copyright (C) 2014 Vincent Bernat
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a
+ * copy of this software and associated documentation files (the "Software"),
+ * to deal in the Software without restriction, including without limitation
+ * the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ * and/or sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ *
+ * Authors:
+ * Vincent Bernat <vbe at exoscale.ch>
+ **/
+
+#include "collectd.h"
+#include "common.h"
+#include "plugin.h"
+#include "utils_ignorelist.h"
+
+#include <atasmart.h>
+#include <libudev.h>
+
+static const char *config_keys[] =
+{
+ "Disk",
+ "IgnoreSelected"
+};
+
+static int config_keys_num = STATIC_ARRAY_SIZE (config_keys);
+
+static ignorelist_t *ignorelist = NULL;
+
+static int smart_config (const char *key, const char *value)
+{
+ if (ignorelist == NULL)
+ ignorelist = ignorelist_create (/* invert = */ 1);
+ if (ignorelist == NULL)
+ return (1);
+
+ if (strcasecmp ("Disk", key) == 0)
+ {
+ ignorelist_add (ignorelist, value);
+ }
+ else if (strcasecmp ("IgnoreSelected", key) == 0)
+ {
+ int invert = 1;
+ if (IS_TRUE (value))
+ invert = 0;
+ ignorelist_set_invert (ignorelist, invert);
+ }
+ else
+ {
+ return (-1);
+ }
+
+ return (0);
+} /* int smart_config */
+
+static void smart_submit (const char *dev, char *type, char *type_inst, double value)
+{
+ value_t values[1];
+ value_list_t vl = VALUE_LIST_INIT;
+
+ values[0].gauge = value;
+
+ vl.values = values;
+ vl.values_len = 1;
+ sstrncpy (vl.host, hostname_g, sizeof (vl.host));
+ sstrncpy (vl.plugin, "smart", sizeof (vl.plugin));
+ sstrncpy (vl.plugin_instance, dev, sizeof (vl.plugin_instance));
+ sstrncpy (vl.type, type, sizeof (vl.type));
+ sstrncpy (vl.type_instance, type_inst, sizeof (vl.type_instance));
+
+ plugin_dispatch_values (&vl);
+}
+
+static void smart_handle_disk_attribute(SkDisk *d, const SkSmartAttributeParsedData *a,
+ void* userdata)
+{
+ const char *dev = userdata;
+ value_t values[4];
+ value_list_t vl = VALUE_LIST_INIT;
+
+ if (!a->current_value_valid || !a->worst_value_valid) return;
+ values[0].gauge = a->current_value;
+ values[1].gauge = a->worst_value;
+ values[2].gauge = a->threshold_valid?a->threshold:0;
+ values[3].gauge = a->pretty_value;
+
+ vl.values = values;
+ vl.values_len = 4;
+ sstrncpy (vl.host, hostname_g, sizeof (vl.host));
+ sstrncpy (vl.plugin, "smart", sizeof (vl.plugin));
+ sstrncpy (vl.plugin_instance, dev, sizeof (vl.plugin_instance));
+ sstrncpy (vl.type, "smart_attribute", sizeof (vl.type));
+ sstrncpy (vl.type_instance, a->name, sizeof (vl.type_instance));
+
+ plugin_dispatch_values (&vl);
+
+ if (a->threshold_valid && a->current_value <= a->threshold)
+ {
+ notification_t notif = { NOTIF_WARNING,
+ cdtime (),
+ "",
+ "",
+ "smart", "",
+ "smart_attribute",
+ "",
+ NULL };
+ sstrncpy (notif.host, hostname_g, sizeof (notif.host));
+ sstrncpy (notif.plugin_instance, dev, sizeof (notif.plugin_instance));
+ sstrncpy (notif.type_instance, a->name, sizeof (notif.type_instance));
+ ssnprintf (notif.message, sizeof (notif.message),
+ "attribute %s is below allowed threshold (%d < %d)",
+ a->name, a->current_value, a->threshold);
+ plugin_dispatch_notification (¬if);
+ }
+}
+
+static void smart_handle_disk (const char *dev)
+{
+ SkDisk *d = NULL;
+ SkBool awake = FALSE;
+ SkBool available = FALSE;
+ const char *shortname;
+ const SkSmartParsedData *spd;
+ uint64_t poweron, powercycles, badsectors, temperature;
+
+ shortname = strrchr(dev, '/');
+ if (!shortname) return;
+ shortname++;
+ if (ignorelist_match (ignorelist, shortname) != 0) {
+ DEBUG ("smart plugin: ignoring %s.", dev);
+ return;
+ }
+
+ DEBUG ("smart plugin: checking SMART status of %s.",
+ dev);
+
+ if (sk_disk_open (dev, &d) < 0)
+ {
+ ERROR ("smart plugin: unable to open %s.", dev);
+ return;
+ }
+ if (sk_disk_identify_is_available (d, &available) < 0 || !available)
+ {
+ DEBUG ("smart plugin: disk %s cannot be identified.", dev);
+ goto end;
+ }
+ if (sk_disk_smart_is_available (d, &available) < 0 || !available)
+ {
+ DEBUG ("smart plugin: disk %s has no SMART support.", dev);
+ goto end;
+ }
+ if (sk_disk_check_sleep_mode (d, &awake) < 0 || !awake)
+ {
+ DEBUG ("smart plugin: disk %s is sleeping.", dev);
+ goto end;
+ }
+ if (sk_disk_smart_read_data (d) < 0)
+ {
+ ERROR ("smart plugin: unable to get SMART data for disk %s.", dev);
+ goto end;
+ }
+ if (sk_disk_smart_parse (d, &spd) < 0)
+ {
+ ERROR ("smart plugin: unable to parse SMART data for disk %s.", dev);
+ goto end;
+ }
+
+ /* Get some specific values */
+ if (sk_disk_smart_get_power_on (d, &poweron) < 0)
+ {
+ WARNING ("smart plugin: unable to get milliseconds since power on for %s.",
+ dev);
+ }
+ else
+ smart_submit (shortname, "smart_poweron", "", poweron / 1000.);
+
+ if (sk_disk_smart_get_power_cycle (d, &powercycles) < 0)
+ {
+ WARNING ("smart plugin: unable to get number of power cycles for %s.",
+ dev);
+ }
+ else
+ smart_submit (shortname, "smart_powercycles", "", powercycles);
+
+ if (sk_disk_smart_get_bad (d, &badsectors) < 0)
+ {
+ WARNING ("smart plugin: unable to get number of bad sectors for %s.",
+ dev);
+ }
+ else
+ smart_submit (shortname, "smart_badsectors", "", badsectors);
+
+ if (sk_disk_smart_get_temperature (d, &temperature) < 0)
+ {
+ WARNING ("smart plugin: unable to get temperature for %s.",
+ dev);
+ }
+ else
+ smart_submit (shortname, "smart_temperature", "", temperature / 1000. - 273.15);
+
+ /* Grab all attributes */
+ if (sk_disk_smart_parse_attributes(d, smart_handle_disk_attribute,
+ (char *)shortname) < 0)
+ {
+ ERROR ("smart plugin: unable to handle SMART attributes for %s.",
+ dev);
+ }
+
+end:
+ sk_disk_free(d);
+}
+
+static int smart_read (void)
+{
+ struct udev *handle_udev;
+ struct udev_enumerate *enumerate;
+ struct udev_list_entry *devices, *dev_list_entry;
+ struct udev_device *dev;
+
+ /* Use udev to get a list of disks */
+ handle_udev = udev_new();
+ if (!handle_udev)
+ {
+ ERROR ("smart plugin: unable to initialize udev.");
+ return (-1);
+ }
+ enumerate = udev_enumerate_new (handle_udev);
+ udev_enumerate_add_match_subsystem (enumerate, "block");
+ udev_enumerate_add_match_property (enumerate, "DEVTYPE", "disk");
+ udev_enumerate_scan_devices (enumerate);
+ devices = udev_enumerate_get_list_entry (enumerate);
+ udev_list_entry_foreach (dev_list_entry, devices)
+ {
+ const char *path, *devpath;
+ path = udev_list_entry_get_name (dev_list_entry);
+ dev = udev_device_new_from_syspath (handle_udev, path);
+ devpath = udev_device_get_devnode (dev);
+
+ /* Query status with libatasmart */
+ smart_handle_disk (devpath);
+ }
+
+ udev_enumerate_unref (enumerate);
+ udev_unref (handle_udev);
+
+ return (0);
+} /* int smart_read */
+
+void module_register (void)
+{
+ plugin_register_config ("smart", smart_config,
+ config_keys, config_keys_num);
+ plugin_register_read ("smart", smart_read);
+} /* void module_register */
signal_noise value:GAUGE:U:0
signal_power value:GAUGE:U:0
signal_quality value:GAUGE:0:U
+smart_poweron value:GAUGE:0:U
+smart_powercycles value:GAUGE:0:U
+smart_badsectors value:GAUGE:0:U
+smart_temperature value:GAUGE:-300:300
+smart_attribute current:GAUGE:0:255, worst:GAUGE:0:255, threshold:GAUGE:0:255, pretty:GAUGE:0:U
snr value:GAUGE:0:U
spam_check value:GAUGE:0:U
spam_score value:GAUGE:U:U
#define RIEMANN_HOST "localhost"
#define RIEMANN_PORT "5555"
#define RIEMANN_TTL_FACTOR 2.0
+#define RIEMANN_BATCH_MAX 8192
int write_riemann_threshold_check(const data_set_t *, const value_list_t *, int *);
struct riemann_host {
char *name;
char *event_service_prefix;
-#define F_CONNECT 0x01
+#define F_CONNECT 0x01
uint8_t flags;
- pthread_mutex_t lock;
- _Bool notifications;
- _Bool check_thresholds;
+ pthread_mutex_t lock;
+ _Bool batch_mode;
+ _Bool notifications;
+ _Bool check_thresholds;
_Bool store_rates;
_Bool always_append_ds;
char *node;
char *service;
_Bool use_tcp;
- int s;
+ int s;
double ttl_factor;
-
- int reference_count;
+ Msg *batch_msg;
+ cdtime_t batch_init;
+ int batch_max;
+ int reference_count;
};
static char **riemann_tags;
return (msg);
} /* }}} Msg *riemann_value_list_to_protobuf */
+
+/*
+ * Always call while holding host->lock !
+ */
+static int riemann_batch_flush_nolock (cdtime_t timeout,
+ struct riemann_host *host)
+{
+ cdtime_t now;
+ int status = 0;
+
+ if (timeout > 0) {
+ now = cdtime ();
+ if ((host->batch_init + timeout) > now)
+ return status;
+ }
+ riemann_send_msg(host, host->batch_msg);
+ riemann_msg_protobuf_free(host->batch_msg);
+
+ if (host->use_tcp && ((status = riemann_recv_ack(host)) != 0))
+ riemann_disconnect (host);
+
+ host->batch_init = cdtime();
+ host->batch_msg = NULL;
+ return status;
+}
+
+static int riemann_batch_flush (cdtime_t timeout,
+ const char *identifier __attribute__((unused)),
+ user_data_t *user_data)
+{
+ struct riemann_host *host;
+ int status;
+
+ if (user_data == NULL)
+ return (-EINVAL);
+
+ host = user_data->data;
+ pthread_mutex_lock (&host->lock);
+ status = riemann_batch_flush_nolock (timeout, host);
+ if (status != 0)
+ ERROR ("write_riemann plugin: riemann_send failed with status %i",
+ status);
+
+ pthread_mutex_unlock(&host->lock);
+ return status;
+}
+
+static int riemann_batch_add_value_list (struct riemann_host *host, /* {{{ */
+ data_set_t const *ds,
+ value_list_t const *vl,
+ int *statuses)
+{
+ size_t i;
+ Event **events;
+ Msg *msg;
+ size_t len;
+ int ret;
+
+ msg = riemann_value_list_to_protobuf (host, ds, vl, statuses);
+ if (msg == NULL)
+ return -1;
+
+ pthread_mutex_lock(&host->lock);
+
+ if (host->batch_msg == NULL) {
+ host->batch_msg = msg;
+ } else {
+ len = msg->n_events + host->batch_msg->n_events;
+ events = realloc(host->batch_msg->events,
+ (len * sizeof(*host->batch_msg->events)));
+ if (events == NULL) {
+ pthread_mutex_unlock(&host->lock);
+ ERROR ("write_riemann plugin: out of memory");
+ riemann_msg_protobuf_free (msg);
+ return -1;
+ }
+ host->batch_msg->events = events;
+
+ for (i = host->batch_msg->n_events; i < len; i++)
+ host->batch_msg->events[i] = msg->events[i - host->batch_msg->n_events];
+
+ host->batch_msg->n_events = len;
+ sfree (msg->events);
+ msg->n_events = 0;
+ sfree (msg);
+ }
+
+ len = msg__get_packed_size(host->batch_msg);
+ ret = 0;
+ if (len >= host->batch_max) {
+ ret = riemann_batch_flush_nolock(0, host);
+ }
+
+ pthread_mutex_unlock(&host->lock);
+ return ret;
+} /* }}} Msg *riemann_batch_add_value_list */
+
static int riemann_notification(const notification_t *n, user_data_t *ud) /* {{{ */
{
int status;
if (!host->notifications)
return 0;
+ /*
+ * Never batch for notifications, send them ASAP
+ */
msg = riemann_notification_to_protobuf (host, n);
if (msg == NULL)
return (-1);
const value_list_t *vl,
user_data_t *ud)
{
- int status;
+ int status = 0;
int statuses[vl->values_len];
struct riemann_host *host = ud->data;
Msg *msg;
if (host->check_thresholds)
write_riemann_threshold_check(ds, vl, statuses);
- msg = riemann_value_list_to_protobuf (host, ds, vl, statuses);
- if (msg == NULL)
- return (-1);
- status = riemann_send (host, msg);
- if (status != 0)
- ERROR ("write_riemann plugin: riemann_send failed with status %i",
- status);
+ if (host->use_tcp == 1 && host->batch_mode) {
- riemann_msg_protobuf_free (msg);
+ riemann_batch_add_value_list (host, ds, vl, statuses);
+
+
+ } else {
+
+ msg = riemann_value_list_to_protobuf (host, ds, vl, statuses);
+ if (msg == NULL)
+ return (-1);
+
+ status = riemann_send (host, msg);
+ if (status != 0)
+ ERROR ("write_riemann plugin: riemann_send failed with status %i",
+ status);
+
+ riemann_msg_protobuf_free (msg);
+ }
return status;
} /* }}} int riemann_write */
host->store_rates = 1;
host->always_append_ds = 0;
host->use_tcp = 0;
+ host->batch_mode = 0;
+ host->batch_max = RIEMANN_BATCH_MAX; /* typical MSS */
+ host->batch_init = cdtime();
host->ttl_factor = RIEMANN_TTL_FACTOR;
status = cf_util_get_string (ci, &host->name);
status = cf_util_get_boolean(child, &host->check_thresholds);
if (status != 0)
break;
+ } else if (strcasecmp ("Batch", child->key) == 0) {
+ status = cf_util_get_boolean(child, &host->batch_mode);
+ if (status != 0)
+ break;
+ } else if (strcasecmp("BatchMaxSize", child->key) == 0) {
+ status = cf_util_get_int(child, &host->batch_max);
+ if (status != 0)
+ break;
} else if (strcasecmp ("Port", child->key) == 0) {
status = cf_util_get_service (child, &host->service);
if (status != 0) {
pthread_mutex_lock (&host->lock);
status = plugin_register_write (callback_name, riemann_write, &ud);
+
+ if (host->use_tcp == 1 && host->batch_mode) {
+ ud.free_func = NULL;
+ plugin_register_flush(callback_name, riemann_batch_flush, &ud);
+ }
if (status != 0)
WARNING ("write_riemann plugin: plugin_register_write (\"%s\") "
"failed with status %i.",