Merge pull request #795 from vincentbernat/fix/libstatgrab
authorPierre-Yves Ritschard <pyr@spootnik.org>
Sat, 15 Nov 2014 09:51:49 +0000 (10:51 +0100)
committerPierre-Yves Ritschard <pyr@spootnik.org>
Sat, 15 Nov 2014 09:51:49 +0000 (10:51 +0100)
libstatgrab: fix sg_init() invocation for libstatgrab >= 0.9

README
configure.ac
src/Makefile.am
src/collectd.conf.in
src/collectd.conf.pod
src/daemon/collectd.c
src/smart.c [new file with mode: 0644]
src/types.db
src/write_riemann.c

diff --git a/README b/README
index 3e3a030..7aa83b0 100644 (file)
--- a/README
+++ b/README
@@ -293,6 +293,10 @@ Features
       to have its measurements fed to collectd. This includes multimeters,
       sound level meters, thermometers, and much more.
 
+    - smart
+      Collect SMART statistics, notably load cycle count, temperature
+      and bad sectors.
+
     - snmp
       Read values from SNMP (Simple Network Management Protocol) enabled
       network devices such as switches, routers, thermometers, rack monitoring
index 7341050..13577c9 100644 (file)
@@ -4897,6 +4897,62 @@ then
 fi
 # }}}
 
+# --with-libatasmart {{{
+with_libatasmart_cppflags=""
+with_libatasmart_ldflags=""
+AC_ARG_WITH(libatasmart, [AS_HELP_STRING([--with-libatasmart@<:@=PREFIX@:>@], [Path to libatasmart.])],
+[
+       if test "x$withval" != "xno" && test "x$withval" != "xyes"
+       then
+               with_libatasmart_cppflags="-I$withval/include"
+               with_libatasmart_ldflags="-L$withval/lib"
+               with_libatasmart="yes"
+       else
+               with_libatasmart="$withval"
+       fi
+],
+[
+       if test "x$ac_system" = "xLinux"
+       then
+               with_libatasmart="yes"
+       else
+               with_libatasmart="no (Linux only library)"
+       fi
+])
+if test "x$with_libatasmart" = "xyes"
+then
+       SAVE_CPPFLAGS="$CPPFLAGS"
+       CPPFLAGS="$CPPFLAGS $with_libatasmart_cppflags"
+
+       AC_CHECK_HEADERS(atasmart.h, [with_libatasmart="yes"], [with_libatasmart="no (atasmart.h not found)"])
+
+       CPPFLAGS="$SAVE_CPPFLAGS"
+fi
+if test "x$with_libatasmart" = "xyes"
+then
+       SAVE_CPPFLAGS="$CPPFLAGS"
+       SAVE_LDFLAGS="$LDFLAGS"
+       CPPFLAGS="$CPPFLAGS $with_libatasmart_cppflags"
+       LDFLAGS="$LDFLAGS $with_libatasmart_ldflags"
+
+       AC_CHECK_LIB(atasmart, sk_disk_open, [with_libatasmart="yes"], [with_libatasmart="no (Symbol 'sk_disk_open' not found)"])
+
+       CPPFLAGS="$SAVE_CPPFLAGS"
+       LDFLAGS="$SAVE_LDFLAGS"
+fi
+if test "x$with_libatasmart" = "xyes"
+then
+       BUILD_WITH_LIBATASMART_CPPFLAGS="$with_libatasmart_cppflags"
+       BUILD_WITH_LIBATASMART_LDFLAGS="$with_libatasmart_ldflags"
+       BUILD_WITH_LIBATASMART_LIBS="-latasmart"
+       AC_SUBST(BUILD_WITH_LIBATASMART_CPPFLAGS)
+       AC_SUBST(BUILD_WITH_LIBATASMART_LDFLAGS)
+       AC_SUBST(BUILD_WITH_LIBATASMART_LIBS)
+       AC_DEFINE(HAVE_LIBATASMART, 1, [Define if libatasmart is present and usable.])
+fi
+AM_CONDITIONAL(BUILD_WITH_LIBATASMART, test "x$with_libatasmart" = "xyes")
+# }}}
+
 PKG_CHECK_MODULES([LIBNOTIFY], [libnotify],
                [with_libnotify="yes"],
                [if test "x$LIBNOTIFY_PKG_ERRORS" = "x"; then
@@ -5478,6 +5534,7 @@ AC_PLUGIN([rrdtool],     [$with_librrd],       [RRDTool output plugin])
 AC_PLUGIN([sensors],     [$with_libsensors],   [lm_sensors statistics])
 AC_PLUGIN([serial],      [$plugin_serial],     [serial port traffic])
 AC_PLUGIN([sigrok],      [$with_libsigrok],    [sigrok acquisition sources])
+AC_PLUGIN([smart],       [$with_libatasmart],  [SMART statistics])
 AC_PLUGIN([snmp],        [$with_libnetsnmp],   [SNMP querying plugin])
 AC_PLUGIN([statsd],      [yes],                [StatsD plugin])
 AC_PLUGIN([swap],        [$plugin_swap],       [Swap usage statistics])
@@ -5698,6 +5755,7 @@ Configuration:
   Libraries:
     intel mic . . . . . . $with_mic
     libaquaero5 . . . . . $with_libaquaero5
+    libatasmart . . . . . $with_libatasmart
     libcurl . . . . . . . $with_libcurl
     libdbi  . . . . . . . $with_libdbi
     libcredis . . . . . . $with_libcredis
@@ -5842,6 +5900,7 @@ Configuration:
     sensors . . . . . . . $enable_sensors
     serial  . . . . . . . $enable_serial
     sigrok  . . . . . . . $enable_sigrok
+    smart . . . . . . . . $enable_smart
     snmp  . . . . . . . . $enable_snmp
     statsd  . . . . . . . $enable_statsd
     swap  . . . . . . . . $enable_swap
index b8aab9a..74c5007 100644 (file)
@@ -885,6 +885,17 @@ sigrok_la_LDFLAGS = $(PLUGIN_LDFLAGS) $(BUILD_WITH_LIBSIGROK_LDFLAGS)
 sigrok_la_LIBADD = -lsigrok
 endif
 
+if BUILD_PLUGIN_SMART
+if BUILD_WITH_LIBUDEV
+pkglib_LTLIBRARIES += smart.la
+smart_la_SOURCES = smart.c \
+                  utils_ignorelist.c utils_ignorelist.h
+smart_la_CFLAGS = $(AM_CFLAGS) $(BUILD_WITH_LIBATASMART_CPPFLAGS)
+smart_la_LDFLAGS = $(PLUGIN_LDFLAGS) $(BUILD_WITH_LIBATASMART_LDFLAGS)
+smart_la_LIBADD = $(BUILD_WITH_LIBATASMART_LIBS) -ludev
+endif
+endif
+
 if BUILD_PLUGIN_SNMP
 pkglib_LTLIBRARIES += snmp.la
 snmp_la_SOURCES = snmp.c
index fabf634..8e7f3fc 100644 (file)
 #@BUILD_PLUGIN_SENSORS_TRUE@LoadPlugin sensors
 #@BUILD_PLUGIN_SERIAL_TRUE@LoadPlugin serial
 #@BUILD_PLUGIN_SIGROK_TRUE@LoadPlugin sigrok
+#@BUILD_PLUGIN_SMART_TRUE@LoadPlugin smart
 #@BUILD_PLUGIN_SNMP_TRUE@LoadPlugin snmp
 #@BUILD_PLUGIN_STATSD_TRUE@LoadPlugin statsd
 #@BUILD_PLUGIN_SWAP_TRUE@LoadPlugin swap
 #  </Device>
 #</Plugin>
 
+#<Plugin smart>
+#  Disk "/^[hs]d[a-f][0-9]?$/"
+#  IgnoreSelected false
+#</Plugin>
+
 #<Plugin snmp>
 #   <Data "powerplus_voltge_input">
 #       Type "voltage"
index da2030d..7da36b8 100644 (file)
@@ -9,17 +9,17 @@ collectd.conf - Configuration for the system statistics collection daemon B<coll
   BaseDir "/var/lib/collectd"
   PIDFile "/run/collectd.pid"
   Interval 10.0
-  
+
   LoadPlugin cpu
   LoadPlugin load
-  
+
   <LoadPlugin df>
     Interval 3600
   </LoadPlugin>
   <Plugin df>
     ValuesPercentage true
   </Plugin>
-  
+
   LoadPlugin ping
   <Plugin ping>
     Host "example.org"
@@ -5639,6 +5639,40 @@ measurements are discarded.
 
 =back
 
+=head2 Plugin C<smart>
+
+The C<smart> plugin collects SMART information from physical
+disks. Values collectd include temperature, power cycle count, poweron
+time and bad sectors. Also, all SMART attributes are collected along
+with the normalized current value, the worst value, the threshold and
+a human readable value.
+
+Using the following two options you can ignore some disks or configure the
+collection only of specific disks.
+
+=over 4
+
+=item B<Disk> I<Name>
+
+Select the disk I<Name>. Whether it is collected or ignored depends on the
+B<IgnoreSelected> setting, see below. As with other plugins that use the
+daemon's ignorelist functionality, a string that starts and ends with a slash
+is interpreted as a regular expression. Examples:
+
+  Disk "sdd"
+  Disk "/hda[34]/"
+
+=item B<IgnoreSelected> B<true>|B<false>
+
+Sets whether selected disks, i.E<nbsp>e. the ones matches by any of the B<Disk>
+statements, are ignored or if all other disks are ignored. The behavior
+(hopefully) is intuitive: If no B<Disk> option is configured, all disks are
+collected. If at least one B<Disk> option is given and no B<IgnoreSelected> or
+set to B<false>, B<only> matching disks will be collected. If B<IgnoreSelected>
+is set to B<true>, all disks are collected B<except> the ones matched.
+
+=back
+
 =head2 Plugin C<snmp>
 
 Since the configuration of the C<snmp plugin> is a little more complicated than
@@ -7010,6 +7044,20 @@ Service name or port number to connect to. Defaults to C<5555>.
 Specify the protocol to use when communicating with I<Riemann>. Defaults to
 B<UDP>.
 
+=item B<Batch> B<true>|B<false>
+
+If set to B<true> and B<Protocol> is set to B<TCP>,
+events will be batched in memory and flushed at
+regular intervals or when B<BatchMaxSize> is exceeded.
+
+Notifications are not batched and sent as soon as possible.
+
+Defaults to false
+
+=item B<BatchMaxSize> I<size>
+
+Maximum payload size for a riemann packet. Defaults to 8192
+
 =item B<StoreRates> B<true>|B<false>
 
 If set to B<true> (the default), convert counter values to rates. If set to
index 8c4d6e6..9cba913 100644 (file)
@@ -33,6 +33,7 @@
 
 #include <sys/types.h>
 #include <sys/socket.h>
+#include <sys/un.h>
 #include <netdb.h>
 
 #include <pthread.h>
@@ -191,7 +192,7 @@ static int change_basedir (const char *orig_dir)
                                sstrerror (errno, errbuf, sizeof (errbuf)));
                return (-1);
        }
-       
+
        dirlen = strlen (dir);
        while ((dirlen > 0) && (dir[dirlen - 1] == '/'))
                dir[--dirlen] = '\0';
@@ -270,7 +271,7 @@ static void update_kstat (void)
 static void exit_usage (int status)
 {
        printf ("Usage: "PACKAGE" [OPTIONS]\n\n"
-                       
+
                        "Available options:\n"
                        "  General:\n"
                        "    -C <file>       Configuration file.\n"
@@ -413,6 +414,72 @@ static int pidfile_remove (void)
 } /* static int pidfile_remove (const char *file) */
 #endif /* COLLECT_DAEMON */
 
+int notify_upstart (void)
+{
+    const char  *upstart_job = getenv("UPSTART_JOB");
+
+    if (upstart_job == NULL)
+        return 0;
+
+    if (strcmp(upstart_job, "collectd") != 0)
+        return 0;
+
+    WARNING ("supervised by upstart, will stop to signal readyness");
+    raise(SIGSTOP);
+    unsetenv("UPSTART_JOB");
+
+    return 1;
+}
+
+int notify_systemd (void)
+{
+    int                  fd = -1;
+    const char          *notifysocket = getenv("NOTIFY_SOCKET");
+    struct sockaddr_un   su;
+    struct iovec         iov;
+    struct msghdr        hdr;
+
+    if (notifysocket == NULL)
+        return 0;
+
+    if ((strchr("@/", notifysocket[0])) == NULL ||
+        strlen(notifysocket) < 2)
+        return 0;
+
+    WARNING ("supervised by systemd, will signal readyness");
+    if ((fd = socket(AF_UNIX, SOCK_DGRAM, 0)) < 0) {
+        WARNING ("cannot contact systemd socket %s", notifysocket);
+        return 0;
+    }
+
+    bzero(&su, sizeof(su));
+    su.sun_family = AF_UNIX;
+    sstrncpy (su.sun_path, notifysocket, sizeof(su.sun_path));
+
+    if (notifysocket[0] == '@')
+        su.sun_path[0] = 0;
+
+    bzero(&iov, sizeof(iov));
+    iov.iov_base = "READY=1";
+    iov.iov_len = strlen("READY=1");
+
+    bzero(&hdr, sizeof(hdr));
+    hdr.msg_name = &su;
+    hdr.msg_namelen = offsetof(struct sockaddr_un, sun_path) +
+        strlen(notifysocket);
+    hdr.msg_iov = &iov;
+    hdr.msg_iovlen = 1;
+
+    unsetenv("NOTIFY_SOCKET");
+    if (sendmsg(fd, &hdr, MSG_NOSIGNAL) < 0) {
+        WARNING ("cannot send notification to systemd");
+        close(fd);
+        return 0;
+    }
+    close(fd);
+    return 1;
+}
+
 int main (int argc, char **argv)
 {
        struct sigaction sig_int_action;
@@ -529,7 +596,11 @@ int main (int argc, char **argv)
        sig_chld_action.sa_handler = SIG_IGN;
        sigaction (SIGCHLD, &sig_chld_action, NULL);
 
-       if (daemonize)
+    /*
+     * Only daemonize if we're not being supervised
+     * by upstart or systemd.
+     */
+       if (daemonize && notify_upstart() == 0 && notify_systemd() == 0)
        {
                if ((pid = fork ()) == -1)
                {
diff --git a/src/smart.c b/src/smart.c
new file mode 100644 (file)
index 0000000..3b113bd
--- /dev/null
@@ -0,0 +1,269 @@
+/**
+ * collectd - src/smart.c
+ * Copyright (C) 2014       Vincent Bernat
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a
+ * copy of this software and associated documentation files (the "Software"),
+ * to deal in the Software without restriction, including without limitation
+ * the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ * and/or sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ *
+ * Authors:
+ *   Vincent Bernat <vbe at exoscale.ch>
+ **/
+
+#include "collectd.h"
+#include "common.h"
+#include "plugin.h"
+#include "utils_ignorelist.h"
+
+#include <atasmart.h>
+#include <libudev.h>
+
+static const char *config_keys[] =
+{
+  "Disk",
+  "IgnoreSelected"
+};
+
+static int config_keys_num = STATIC_ARRAY_SIZE (config_keys);
+
+static ignorelist_t *ignorelist = NULL;
+
+static int smart_config (const char *key, const char *value)
+{
+  if (ignorelist == NULL)
+    ignorelist = ignorelist_create (/* invert = */ 1);
+  if (ignorelist == NULL)
+    return (1);
+
+  if (strcasecmp ("Disk", key) == 0)
+  {
+    ignorelist_add (ignorelist, value);
+  }
+  else if (strcasecmp ("IgnoreSelected", key) == 0)
+  {
+    int invert = 1;
+    if (IS_TRUE (value))
+      invert = 0;
+    ignorelist_set_invert (ignorelist, invert);
+  }
+  else
+  {
+    return (-1);
+  }
+
+  return (0);
+} /* int smart_config */
+
+static void smart_submit (const char *dev, char *type, char *type_inst, double value)
+{
+       value_t values[1];
+       value_list_t vl = VALUE_LIST_INIT;
+
+       values[0].gauge = value;
+
+       vl.values = values;
+       vl.values_len = 1;
+       sstrncpy (vl.host, hostname_g, sizeof (vl.host));
+       sstrncpy (vl.plugin, "smart", sizeof (vl.plugin));
+       sstrncpy (vl.plugin_instance, dev, sizeof (vl.plugin_instance));
+       sstrncpy (vl.type, type, sizeof (vl.type));
+       sstrncpy (vl.type_instance, type_inst, sizeof (vl.type_instance));
+
+       plugin_dispatch_values (&vl);
+}
+
+static void smart_handle_disk_attribute(SkDisk *d, const SkSmartAttributeParsedData *a,
+                                        void* userdata)
+{
+  const char *dev = userdata;
+  value_t values[4];
+  value_list_t vl = VALUE_LIST_INIT;
+
+  if (!a->current_value_valid || !a->worst_value_valid) return;
+  values[0].gauge = a->current_value;
+  values[1].gauge = a->worst_value;
+  values[2].gauge = a->threshold_valid?a->threshold:0;
+  values[3].gauge = a->pretty_value;
+
+  vl.values = values;
+  vl.values_len = 4;
+  sstrncpy (vl.host, hostname_g, sizeof (vl.host));
+  sstrncpy (vl.plugin, "smart", sizeof (vl.plugin));
+  sstrncpy (vl.plugin_instance, dev, sizeof (vl.plugin_instance));
+  sstrncpy (vl.type, "smart_attribute", sizeof (vl.type));
+  sstrncpy (vl.type_instance, a->name, sizeof (vl.type_instance));
+
+  plugin_dispatch_values (&vl);
+
+  if (a->threshold_valid && a->current_value <= a->threshold)
+  {
+    notification_t notif = { NOTIF_WARNING,
+                             cdtime (),
+                             "",
+                             "",
+                             "smart", "",
+                             "smart_attribute",
+                             "",
+                             NULL };
+    sstrncpy (notif.host, hostname_g, sizeof (notif.host));
+    sstrncpy (notif.plugin_instance, dev, sizeof (notif.plugin_instance));
+    sstrncpy (notif.type_instance, a->name, sizeof (notif.type_instance));
+    ssnprintf (notif.message, sizeof (notif.message),
+               "attribute %s is below allowed threshold (%d < %d)",
+               a->name, a->current_value, a->threshold);
+    plugin_dispatch_notification (&notif);
+  }
+}
+
+static void smart_handle_disk (const char *dev)
+{
+  SkDisk *d = NULL;
+  SkBool awake = FALSE;
+  SkBool available = FALSE;
+  const char *shortname;
+  const SkSmartParsedData *spd;
+  uint64_t poweron, powercycles, badsectors, temperature;
+
+  shortname = strrchr(dev, '/');
+  if (!shortname) return;
+  shortname++;
+  if (ignorelist_match (ignorelist, shortname) != 0) {
+    DEBUG ("smart plugin: ignoring %s.", dev);
+    return;
+  }
+
+  DEBUG ("smart plugin: checking SMART status of %s.",
+         dev);
+
+  if (sk_disk_open (dev, &d) < 0)
+  {
+    ERROR ("smart plugin: unable to open %s.", dev);
+    return;
+  }
+  if (sk_disk_identify_is_available (d, &available) < 0 || !available)
+  {
+    DEBUG ("smart plugin: disk %s cannot be identified.", dev);
+    goto end;
+  }
+  if (sk_disk_smart_is_available (d, &available) < 0 || !available)
+  {
+    DEBUG ("smart plugin: disk %s has no SMART support.", dev);
+    goto end;
+  }
+  if (sk_disk_check_sleep_mode (d, &awake) < 0 || !awake)
+  {
+    DEBUG ("smart plugin: disk %s is sleeping.", dev);
+    goto end;
+  }
+  if (sk_disk_smart_read_data (d) < 0)
+  {
+    ERROR ("smart plugin: unable to get SMART data for disk %s.", dev);
+    goto end;
+  }
+  if (sk_disk_smart_parse (d, &spd) < 0)
+  {
+    ERROR ("smart plugin: unable to parse SMART data for disk %s.", dev);
+    goto end;
+  }
+
+  /* Get some specific values */
+  if (sk_disk_smart_get_power_on (d, &poweron) < 0)
+  {
+    WARNING ("smart plugin: unable to get milliseconds since power on for %s.",
+             dev);
+  }
+  else
+    smart_submit (shortname, "smart_poweron", "", poweron / 1000.);
+
+  if (sk_disk_smart_get_power_cycle (d, &powercycles) < 0)
+  {
+    WARNING ("smart plugin: unable to get number of power cycles for %s.",
+             dev);
+  }
+  else
+    smart_submit (shortname, "smart_powercycles", "", powercycles);
+
+  if (sk_disk_smart_get_bad (d, &badsectors) < 0)
+  {
+    WARNING ("smart plugin: unable to get number of bad sectors for %s.",
+             dev);
+  }
+  else
+    smart_submit (shortname, "smart_badsectors", "", badsectors);
+
+  if (sk_disk_smart_get_temperature (d, &temperature) < 0)
+  {
+    WARNING ("smart plugin: unable to get temperature for %s.",
+             dev);
+  }
+  else
+    smart_submit (shortname, "smart_temperature", "", temperature / 1000. - 273.15);
+
+  /* Grab all attributes */
+  if (sk_disk_smart_parse_attributes(d, smart_handle_disk_attribute,
+                                     (char *)shortname) < 0)
+  {
+    ERROR ("smart plugin: unable to handle SMART attributes for %s.",
+           dev);
+  }
+
+end:
+  sk_disk_free(d);
+}
+
+static int smart_read (void)
+{
+  struct udev *handle_udev;
+  struct udev_enumerate *enumerate;
+  struct udev_list_entry *devices, *dev_list_entry;
+  struct udev_device *dev;
+
+  /* Use udev to get a list of disks */
+  handle_udev = udev_new();
+  if (!handle_udev)
+  {
+    ERROR ("smart plugin: unable to initialize udev.");
+    return (-1);
+  }
+  enumerate = udev_enumerate_new (handle_udev);
+  udev_enumerate_add_match_subsystem (enumerate, "block");
+  udev_enumerate_add_match_property (enumerate, "DEVTYPE", "disk");
+  udev_enumerate_scan_devices (enumerate);
+  devices = udev_enumerate_get_list_entry (enumerate);
+  udev_list_entry_foreach (dev_list_entry, devices)
+  {
+    const char *path, *devpath;
+    path = udev_list_entry_get_name (dev_list_entry);
+    dev = udev_device_new_from_syspath (handle_udev, path);
+    devpath = udev_device_get_devnode (dev);
+
+    /* Query status with libatasmart */
+    smart_handle_disk (devpath);
+  }
+
+  udev_enumerate_unref (enumerate);
+  udev_unref (handle_udev);
+
+  return (0);
+} /* int smart_read */
+
+void module_register (void)
+{
+  plugin_register_config ("smart", smart_config,
+                          config_keys, config_keys_num);
+  plugin_register_read ("smart", smart_read);
+} /* void module_register */
index 64137b0..ec34bd4 100644 (file)
@@ -168,6 +168,11 @@ serial_octets              rx:DERIVE:0:U, tx:DERIVE:0:U
 signal_noise           value:GAUGE:U:0
 signal_power           value:GAUGE:U:0
 signal_quality         value:GAUGE:0:U
+smart_poweron          value:GAUGE:0:U
+smart_powercycles      value:GAUGE:0:U
+smart_badsectors       value:GAUGE:0:U
+smart_temperature      value:GAUGE:-300:300
+smart_attribute         current:GAUGE:0:255, worst:GAUGE:0:255, threshold:GAUGE:0:255, pretty:GAUGE:0:U
 snr                    value:GAUGE:0:U
 spam_check             value:GAUGE:0:U
 spam_score             value:GAUGE:U:U
index c3740e1..0a8df6f 100644 (file)
 #define RIEMANN_HOST           "localhost"
 #define RIEMANN_PORT           "5555"
 #define RIEMANN_TTL_FACTOR      2.0
+#define RIEMANN_BATCH_MAX      8192
 
 int write_riemann_threshold_check(const data_set_t *, const value_list_t *, int *);
 
 struct riemann_host {
        char                    *name;
        char                    *event_service_prefix;
-#define F_CONNECT               0x01
+#define F_CONNECT       0x01
        uint8_t                  flags;
-       pthread_mutex_t          lock;
-       _Bool                    notifications;
-       _Bool                    check_thresholds;
+       pthread_mutex_t  lock;
+    _Bool            batch_mode;
+       _Bool            notifications;
+       _Bool            check_thresholds;
        _Bool                    store_rates;
        _Bool                    always_append_ds;
        char                    *node;
        char                    *service;
        _Bool                    use_tcp;
-       int                      s;
+       int                          s;
        double                   ttl_factor;
-
-       int                      reference_count;
+    Msg             *batch_msg;
+    cdtime_t         batch_init;
+    int              batch_max;
+       int                          reference_count;
 };
 
 static char    **riemann_tags;
@@ -651,6 +655,103 @@ static Msg *riemann_value_list_to_protobuf (struct riemann_host const *host, /*
        return (msg);
 } /* }}} Msg *riemann_value_list_to_protobuf */
 
+
+/*
+ * Always call while holding host->lock !
+ */
+static int riemann_batch_flush_nolock (cdtime_t timeout,
+                                       struct riemann_host *host)
+{
+    cdtime_t    now;
+    int         status = 0;
+
+    if (timeout > 0) {
+        now = cdtime ();
+        if ((host->batch_init + timeout) > now)
+            return status;
+    }
+    riemann_send_msg(host, host->batch_msg);
+    riemann_msg_protobuf_free(host->batch_msg);
+
+       if (host->use_tcp && ((status = riemann_recv_ack(host)) != 0))
+        riemann_disconnect (host);
+
+    host->batch_init = cdtime();
+    host->batch_msg = NULL;
+    return status;
+}
+
+static int riemann_batch_flush (cdtime_t timeout,
+        const char *identifier __attribute__((unused)),
+        user_data_t *user_data)
+{
+    struct riemann_host *host;
+    int status;
+
+    if (user_data == NULL)
+        return (-EINVAL);
+
+    host = user_data->data;
+    pthread_mutex_lock (&host->lock);
+    status = riemann_batch_flush_nolock (timeout, host);
+    if (status != 0)
+        ERROR ("write_riemann plugin: riemann_send failed with status %i",
+               status);
+
+    pthread_mutex_unlock(&host->lock);
+    return status;
+}
+
+static int riemann_batch_add_value_list (struct riemann_host *host, /* {{{ */
+                                         data_set_t const *ds,
+                                         value_list_t const *vl,
+                                         int *statuses)
+{
+       size_t i;
+    Event **events;
+    Msg *msg;
+    size_t len;
+    int ret;
+
+    msg = riemann_value_list_to_protobuf (host, ds, vl, statuses);
+    if (msg == NULL)
+        return -1;
+
+    pthread_mutex_lock(&host->lock);
+
+    if (host->batch_msg == NULL) {
+        host->batch_msg = msg;
+    } else {
+        len = msg->n_events + host->batch_msg->n_events;
+        events = realloc(host->batch_msg->events,
+                         (len * sizeof(*host->batch_msg->events)));
+        if (events == NULL) {
+            pthread_mutex_unlock(&host->lock);
+            ERROR ("write_riemann plugin: out of memory");
+            riemann_msg_protobuf_free (msg);
+            return -1;
+        }
+        host->batch_msg->events = events;
+
+        for (i = host->batch_msg->n_events; i < len; i++)
+            host->batch_msg->events[i] = msg->events[i - host->batch_msg->n_events];
+
+        host->batch_msg->n_events = len;
+        sfree (msg->events);
+        msg->n_events = 0;
+        sfree (msg);
+    }
+
+       len = msg__get_packed_size(host->batch_msg);
+    ret = 0;
+    if (len >= host->batch_max) {
+        ret = riemann_batch_flush_nolock(0, host);
+    }
+
+    pthread_mutex_unlock(&host->lock);
+    return ret;
+} /* }}} Msg *riemann_batch_add_value_list */
+
 static int riemann_notification(const notification_t *n, user_data_t *ud) /* {{{ */
 {
        int                      status;
@@ -660,6 +761,9 @@ static int riemann_notification(const notification_t *n, user_data_t *ud) /* {{{
        if (!host->notifications)
                return 0;
 
+    /*
+     * Never batch for notifications, send them ASAP
+     */
        msg = riemann_notification_to_protobuf (host, n);
        if (msg == NULL)
                return (-1);
@@ -677,23 +781,32 @@ static int riemann_write(const data_set_t *ds, /* {{{ */
              const value_list_t *vl,
              user_data_t *ud)
 {
-       int                      status;
+       int                      status = 0;
        int                      statuses[vl->values_len];
        struct riemann_host     *host = ud->data;
        Msg                     *msg;
 
        if (host->check_thresholds)
                write_riemann_threshold_check(ds, vl, statuses);
-       msg = riemann_value_list_to_protobuf (host, ds, vl, statuses);
-       if (msg == NULL)
-               return (-1);
 
-       status = riemann_send (host, msg);
-       if (status != 0)
-               ERROR ("write_riemann plugin: riemann_send failed with status %i",
-                               status);
+    if (host->use_tcp == 1 && host->batch_mode) {
 
-       riemann_msg_protobuf_free (msg);
+        riemann_batch_add_value_list (host, ds, vl, statuses);
+
+
+    } else {
+
+        msg = riemann_value_list_to_protobuf (host, ds, vl, statuses);
+        if (msg == NULL)
+            return (-1);
+
+        status = riemann_send (host, msg);
+        if (status != 0)
+            ERROR ("write_riemann plugin: riemann_send failed with status %i",
+                   status);
+
+        riemann_msg_protobuf_free (msg);
+    }
        return status;
 } /* }}} int riemann_write */
 
@@ -742,6 +855,9 @@ static int riemann_config_node(oconfig_item_t *ci) /* {{{ */
        host->store_rates = 1;
        host->always_append_ds = 0;
        host->use_tcp = 0;
+    host->batch_mode = 0;
+    host->batch_max = RIEMANN_BATCH_MAX; /* typical MSS */
+    host->batch_init = cdtime();
        host->ttl_factor = RIEMANN_TTL_FACTOR;
 
        status = cf_util_get_string (ci, &host->name);
@@ -775,6 +891,14 @@ static int riemann_config_node(oconfig_item_t *ci) /* {{{ */
                        status = cf_util_get_boolean(child, &host->check_thresholds);
                        if (status != 0)
                                break;
+        } else if (strcasecmp ("Batch", child->key) == 0) {
+            status = cf_util_get_boolean(child, &host->batch_mode);
+            if (status != 0)
+                break;
+        } else if (strcasecmp("BatchMaxSize", child->key) == 0) {
+            status = cf_util_get_int(child, &host->batch_max);
+            if (status != 0)
+                break;
                } else if (strcasecmp ("Port", child->key) == 0) {
                        status = cf_util_get_service (child, &host->service);
                        if (status != 0) {
@@ -859,6 +983,11 @@ static int riemann_config_node(oconfig_item_t *ci) /* {{{ */
        pthread_mutex_lock (&host->lock);
 
        status = plugin_register_write (callback_name, riemann_write, &ud);
+
+    if (host->use_tcp == 1 && host->batch_mode) {
+        ud.free_func = NULL;
+        plugin_register_flush(callback_name, riemann_batch_flush, &ud);
+    }
        if (status != 0)
                WARNING ("write_riemann plugin: plugin_register_write (\"%s\") "
                                "failed with status %i.",