Merge pull request #797 from vincentbernat/feature/libatasmart
authorPierre-Yves Ritschard <pyr@spootnik.org>
Sat, 15 Nov 2014 09:49:32 +0000 (10:49 +0100)
committerPierre-Yves Ritschard <pyr@spootnik.org>
Sat, 15 Nov 2014 09:49:32 +0000 (10:49 +0100)
smart: add a SMART plugin

src/Makefile.am
src/collectd.conf.pod
src/daemon/Makefile.am
src/daemon/collectd.c
src/libcollectdclient/Makefile.am
src/libcollectdclient/network_buffer.c
src/network.c
src/write_riemann.c

index 9ffcdbc..74c5007 100644 (file)
@@ -30,7 +30,7 @@ collectdmon_SOURCES = collectdmon.c
 collectdmon_CPPFLAGS = $(AM_CPPFLAGS)
 
 collectd_nagios_SOURCES = collectd-nagios.c
-collectd_nagios_CPPFLAGS = $(AM_CPPFLAGS) -I$(top_builddir)/src/libcollectdclient/collectd
+collectd_nagios_CPPFLAGS = $(AM_CPPFLAGS) -I$(top_srcdir)/src/libcollectdclient/collectd -I$(top_builddir)/src/libcollectdclient/collectd
 collectd_nagios_LDADD =
 if BUILD_WITH_LIBSOCKET
 collectd_nagios_LDADD += -lsocket
@@ -44,7 +44,7 @@ collectd_nagios_DEPENDENCIES = libcollectdclient/libcollectdclient.la
 
 
 collectdctl_SOURCES = collectdctl.c
-collectdctl_CPPFLAGS = $(AM_CPPFLAGS) -I$(top_builddir)/src/libcollectdclient/collectd
+collectdctl_CPPFLAGS = $(AM_CPPFLAGS) -I$(top_srcdir)/src/libcollectdclient/collectd -I$(top_builddir)/src/libcollectdclient/collectd
 collectdctl_LDADD =
 if BUILD_WITH_LIBSOCKET
 collectdctl_LDADD += -lsocket
@@ -57,7 +57,7 @@ collectdctl_DEPENDENCIES = libcollectdclient/libcollectdclient.la
 
 collectd_tg_SOURCES = collectd-tg.c \
                      daemon/utils_heap.c daemon/utils_heap.h
-collectd_tg_CPPFLAGS = $(AM_CPPFLAGS) -I$(top_builddir)/src/libcollectdclient/collectd
+collectd_tg_CPPFLAGS = $(AM_CPPFLAGS) -I$(top_srcdir)/src/libcollectdclient/collectd -I$(top_builddir)/src/libcollectdclient/collectd
 collectd_tg_LDADD =
 if BUILD_WITH_LIBSOCKET
 collectd_tg_LDADD += -lsocket
@@ -85,7 +85,7 @@ pkglib_LTLIBRARIES += aggregation.la
 aggregation_la_SOURCES = aggregation.c \
                          utils_vl_lookup.c utils_vl_lookup.h
 aggregation_la_LDFLAGS = $(PLUGIN_LDFLAGS)
-aggregation_la_LIBADD =
+aggregation_la_LIBADD = -lm
 endif
 
 if BUILD_PLUGIN_AMQP
index 81c1ac7..7da36b8 100644 (file)
@@ -9,17 +9,17 @@ collectd.conf - Configuration for the system statistics collection daemon B<coll
   BaseDir "/var/lib/collectd"
   PIDFile "/run/collectd.pid"
   Interval 10.0
-  
+
   LoadPlugin cpu
   LoadPlugin load
-  
+
   <LoadPlugin df>
     Interval 3600
   </LoadPlugin>
   <Plugin df>
     ValuesPercentage true
   </Plugin>
-  
+
   LoadPlugin ping
   <Plugin ping>
     Host "example.org"
@@ -7044,6 +7044,20 @@ Service name or port number to connect to. Defaults to C<5555>.
 Specify the protocol to use when communicating with I<Riemann>. Defaults to
 B<UDP>.
 
+=item B<Batch> B<true>|B<false>
+
+If set to B<true> and B<Protocol> is set to B<TCP>,
+events will be batched in memory and flushed at
+regular intervals or when B<BatchMaxSize> is exceeded.
+
+Notifications are not batched and sent as soon as possible.
+
+Defaults to false
+
+=item B<BatchMaxSize> I<size>
+
+Maximum payload size for a riemann packet. Defaults to 8192
+
 =item B<StoreRates> B<true>|B<false>
 
 If set to B<true> (the default), convert counter values to rates. If set to
index d6067d7..fc81554 100644 (file)
@@ -2,7 +2,8 @@ if COMPILER_IS_GCC
 AM_CFLAGS = -Wall -Werror
 endif
 
-AM_CPPFLAGS = -DPREFIX='"${prefix}"'
+AM_CPPFLAGS = -I$(top_srcdir)/src
+AM_CPPFLAGS += -DPREFIX='"${prefix}"'
 AM_CPPFLAGS += -DCONFIGFILE='"${sysconfdir}/${PACKAGE_NAME}.conf"'
 AM_CPPFLAGS += -DLOCALSTATEDIR='"${localstatedir}"'
 AM_CPPFLAGS += -DPKGLOCALSTATEDIR='"${localstatedir}/lib/${PACKAGE_NAME}"'
index 2e2d821..eb5404f 100644 (file)
@@ -33,6 +33,7 @@
 
 #include <sys/types.h>
 #include <sys/socket.h>
+#include <sys/un.h>
 #include <netdb.h>
 
 #include <pthread.h>
@@ -191,7 +192,7 @@ static int change_basedir (const char *orig_dir)
                                sstrerror (errno, errbuf, sizeof (errbuf)));
                return (-1);
        }
-       
+
        dirlen = strlen (dir);
        while ((dirlen > 0) && (dir[dirlen - 1] == '/'))
                dir[--dirlen] = '\0';
@@ -270,7 +271,7 @@ static void update_kstat (void)
 static void exit_usage (int status)
 {
        printf ("Usage: "PACKAGE" [OPTIONS]\n\n"
-                       
+
                        "Available options:\n"
                        "  General:\n"
                        "    -C <file>       Configuration file.\n"
@@ -409,6 +410,72 @@ static int pidfile_remove (void)
 } /* static int pidfile_remove (const char *file) */
 #endif /* COLLECT_DAEMON */
 
+int notify_upstart (void)
+{
+    const char  *upstart_job = getenv("UPSTART_JOB");
+
+    if (upstart_job == NULL)
+        return 0;
+
+    if (strcmp(upstart_job, "collectd") != 0)
+        return 0;
+
+    WARNING ("supervised by upstart, will stop to signal readyness");
+    raise(SIGSTOP);
+    unsetenv("UPSTART_JOB");
+
+    return 1;
+}
+
+int notify_systemd (void)
+{
+    int                  fd = -1;
+    const char          *notifysocket = getenv("NOTIFY_SOCKET");
+    struct sockaddr_un   su;
+    struct iovec         iov;
+    struct msghdr        hdr;
+
+    if (notifysocket == NULL)
+        return 0;
+
+    if ((strchr("@/", notifysocket[0])) == NULL ||
+        strlen(notifysocket) < 2)
+        return 0;
+
+    WARNING ("supervised by systemd, will signal readyness");
+    if ((fd = socket(AF_UNIX, SOCK_DGRAM, 0)) < 0) {
+        WARNING ("cannot contact systemd socket %s", notifysocket);
+        return 0;
+    }
+
+    bzero(&su, sizeof(su));
+    su.sun_family = AF_UNIX;
+    sstrncpy (su.sun_path, notifysocket, sizeof(su.sun_path));
+
+    if (notifysocket[0] == '@')
+        su.sun_path[0] = 0;
+
+    bzero(&iov, sizeof(iov));
+    iov.iov_base = "READY=1";
+    iov.iov_len = strlen("READY=1");
+
+    bzero(&hdr, sizeof(hdr));
+    hdr.msg_name = &su;
+    hdr.msg_namelen = offsetof(struct sockaddr_un, sun_path) +
+        strlen(notifysocket);
+    hdr.msg_iov = &iov;
+    hdr.msg_iovlen = 1;
+
+    unsetenv("NOTIFY_SOCKET");
+    if (sendmsg(fd, &hdr, MSG_NOSIGNAL) < 0) {
+        WARNING ("cannot send notification to systemd");
+        close(fd);
+        return 0;
+    }
+    close(fd);
+    return 1;
+}
+
 int main (int argc, char **argv)
 {
        struct sigaction sig_int_action;
@@ -525,7 +592,11 @@ int main (int argc, char **argv)
        sig_chld_action.sa_handler = SIG_IGN;
        sigaction (SIGCHLD, &sig_chld_action, NULL);
 
-       if (daemonize)
+    /*
+     * Only daemonize if we're not being supervised
+     * by upstart or systemd.
+     */
+       if (daemonize && notify_upstart() == 0 && notify_systemd() == 0)
        {
                if ((pid = fork ()) == -1)
                {
index 2fc3152..5abee2f 100644 (file)
@@ -11,7 +11,10 @@ nodist_pkgconfig_DATA = libcollectdclient.pc
 BUILT_SOURCES = collectd/lcc_features.h
 
 libcollectdclient_la_SOURCES = client.c network.c network_buffer.c
-libcollectdclient_la_CPPFLAGS = $(AM_CPPFLAGS) -I$(top_builddir)/src/libcollectdclient/collectd -I$(top_srcdir)/src/daemon
+libcollectdclient_la_CPPFLAGS = $(AM_CPPFLAGS) \
+                               -I$(top_srcdir)/src/libcollectdclient/collectd \
+                               -I$(top_builddir)/src/libcollectdclient/collectd \
+                               -I$(top_srcdir)/src/daemon
 libcollectdclient_la_LDFLAGS = -version-info 1:0:0
 libcollectdclient_la_LIBADD = 
 if BUILD_WITH_LIBGCRYPT
index 7b06620..61c7c22 100644 (file)
@@ -54,7 +54,9 @@
 /* Re enable deprecation warnings */
 #  pragma GCC diagnostic warning "-Wdeprecated-declarations"
 # endif
+# if GCRYPT_VERSION_NUMBER < 0x010600
 GCRY_THREAD_OPTION_PTHREAD_IMPL;
+# endif
 #endif
 
 #include "collectd/network_buffer.h"
@@ -131,7 +133,9 @@ static _Bool have_gcrypt (void) /* {{{ */
   need_init = 0;
 
 #if HAVE_LIBGCRYPT
+# if GCRYPT_VERSION_NUMBER < 0x010600
   gcry_control (GCRYCTL_SET_THREAD_CBS, &gcry_threads_pthread);
+# endif
 
   if (!gcry_check_version (GCRYPT_VERSION))
     return (0);
index 9fbba96..551bd5c 100644 (file)
@@ -77,7 +77,9 @@
 /* Re enable deprecation warnings */
 #  pragma GCC diagnostic warning "-Wdeprecated-declarations"
 # endif
+# if GCRYPT_VERSION_NUMBER < 0x010600
 GCRY_THREAD_OPTION_PTHREAD_IMPL;
+# endif
 #endif
 
 #ifndef IPV6_ADD_MEMBERSHIP
@@ -511,7 +513,9 @@ static void network_init_gcrypt (void) /* {{{ */
   * above doesn't count, as it doesn't implicitly initalize Libgcrypt.
   *
   * tl;dr: keep all these gry_* statements in this exact order please. */
+# if GCRYPT_VERSION_NUMBER < 0x010600
   gcry_control (GCRYCTL_SET_THREAD_CBS, &gcry_threads_pthread);
+# endif
   gcry_check_version (NULL);
   gcry_control (GCRYCTL_INIT_SECMEM, 32768);
   gcry_control (GCRYCTL_INITIALIZATION_FINISHED);
index c3740e1..0a8df6f 100644 (file)
 #define RIEMANN_HOST           "localhost"
 #define RIEMANN_PORT           "5555"
 #define RIEMANN_TTL_FACTOR      2.0
+#define RIEMANN_BATCH_MAX      8192
 
 int write_riemann_threshold_check(const data_set_t *, const value_list_t *, int *);
 
 struct riemann_host {
        char                    *name;
        char                    *event_service_prefix;
-#define F_CONNECT               0x01
+#define F_CONNECT       0x01
        uint8_t                  flags;
-       pthread_mutex_t          lock;
-       _Bool                    notifications;
-       _Bool                    check_thresholds;
+       pthread_mutex_t  lock;
+    _Bool            batch_mode;
+       _Bool            notifications;
+       _Bool            check_thresholds;
        _Bool                    store_rates;
        _Bool                    always_append_ds;
        char                    *node;
        char                    *service;
        _Bool                    use_tcp;
-       int                      s;
+       int                          s;
        double                   ttl_factor;
-
-       int                      reference_count;
+    Msg             *batch_msg;
+    cdtime_t         batch_init;
+    int              batch_max;
+       int                          reference_count;
 };
 
 static char    **riemann_tags;
@@ -651,6 +655,103 @@ static Msg *riemann_value_list_to_protobuf (struct riemann_host const *host, /*
        return (msg);
 } /* }}} Msg *riemann_value_list_to_protobuf */
 
+
+/*
+ * Always call while holding host->lock !
+ */
+static int riemann_batch_flush_nolock (cdtime_t timeout,
+                                       struct riemann_host *host)
+{
+    cdtime_t    now;
+    int         status = 0;
+
+    if (timeout > 0) {
+        now = cdtime ();
+        if ((host->batch_init + timeout) > now)
+            return status;
+    }
+    riemann_send_msg(host, host->batch_msg);
+    riemann_msg_protobuf_free(host->batch_msg);
+
+       if (host->use_tcp && ((status = riemann_recv_ack(host)) != 0))
+        riemann_disconnect (host);
+
+    host->batch_init = cdtime();
+    host->batch_msg = NULL;
+    return status;
+}
+
+static int riemann_batch_flush (cdtime_t timeout,
+        const char *identifier __attribute__((unused)),
+        user_data_t *user_data)
+{
+    struct riemann_host *host;
+    int status;
+
+    if (user_data == NULL)
+        return (-EINVAL);
+
+    host = user_data->data;
+    pthread_mutex_lock (&host->lock);
+    status = riemann_batch_flush_nolock (timeout, host);
+    if (status != 0)
+        ERROR ("write_riemann plugin: riemann_send failed with status %i",
+               status);
+
+    pthread_mutex_unlock(&host->lock);
+    return status;
+}
+
+static int riemann_batch_add_value_list (struct riemann_host *host, /* {{{ */
+                                         data_set_t const *ds,
+                                         value_list_t const *vl,
+                                         int *statuses)
+{
+       size_t i;
+    Event **events;
+    Msg *msg;
+    size_t len;
+    int ret;
+
+    msg = riemann_value_list_to_protobuf (host, ds, vl, statuses);
+    if (msg == NULL)
+        return -1;
+
+    pthread_mutex_lock(&host->lock);
+
+    if (host->batch_msg == NULL) {
+        host->batch_msg = msg;
+    } else {
+        len = msg->n_events + host->batch_msg->n_events;
+        events = realloc(host->batch_msg->events,
+                         (len * sizeof(*host->batch_msg->events)));
+        if (events == NULL) {
+            pthread_mutex_unlock(&host->lock);
+            ERROR ("write_riemann plugin: out of memory");
+            riemann_msg_protobuf_free (msg);
+            return -1;
+        }
+        host->batch_msg->events = events;
+
+        for (i = host->batch_msg->n_events; i < len; i++)
+            host->batch_msg->events[i] = msg->events[i - host->batch_msg->n_events];
+
+        host->batch_msg->n_events = len;
+        sfree (msg->events);
+        msg->n_events = 0;
+        sfree (msg);
+    }
+
+       len = msg__get_packed_size(host->batch_msg);
+    ret = 0;
+    if (len >= host->batch_max) {
+        ret = riemann_batch_flush_nolock(0, host);
+    }
+
+    pthread_mutex_unlock(&host->lock);
+    return ret;
+} /* }}} Msg *riemann_batch_add_value_list */
+
 static int riemann_notification(const notification_t *n, user_data_t *ud) /* {{{ */
 {
        int                      status;
@@ -660,6 +761,9 @@ static int riemann_notification(const notification_t *n, user_data_t *ud) /* {{{
        if (!host->notifications)
                return 0;
 
+    /*
+     * Never batch for notifications, send them ASAP
+     */
        msg = riemann_notification_to_protobuf (host, n);
        if (msg == NULL)
                return (-1);
@@ -677,23 +781,32 @@ static int riemann_write(const data_set_t *ds, /* {{{ */
              const value_list_t *vl,
              user_data_t *ud)
 {
-       int                      status;
+       int                      status = 0;
        int                      statuses[vl->values_len];
        struct riemann_host     *host = ud->data;
        Msg                     *msg;
 
        if (host->check_thresholds)
                write_riemann_threshold_check(ds, vl, statuses);
-       msg = riemann_value_list_to_protobuf (host, ds, vl, statuses);
-       if (msg == NULL)
-               return (-1);
 
-       status = riemann_send (host, msg);
-       if (status != 0)
-               ERROR ("write_riemann plugin: riemann_send failed with status %i",
-                               status);
+    if (host->use_tcp == 1 && host->batch_mode) {
 
-       riemann_msg_protobuf_free (msg);
+        riemann_batch_add_value_list (host, ds, vl, statuses);
+
+
+    } else {
+
+        msg = riemann_value_list_to_protobuf (host, ds, vl, statuses);
+        if (msg == NULL)
+            return (-1);
+
+        status = riemann_send (host, msg);
+        if (status != 0)
+            ERROR ("write_riemann plugin: riemann_send failed with status %i",
+                   status);
+
+        riemann_msg_protobuf_free (msg);
+    }
        return status;
 } /* }}} int riemann_write */
 
@@ -742,6 +855,9 @@ static int riemann_config_node(oconfig_item_t *ci) /* {{{ */
        host->store_rates = 1;
        host->always_append_ds = 0;
        host->use_tcp = 0;
+    host->batch_mode = 0;
+    host->batch_max = RIEMANN_BATCH_MAX; /* typical MSS */
+    host->batch_init = cdtime();
        host->ttl_factor = RIEMANN_TTL_FACTOR;
 
        status = cf_util_get_string (ci, &host->name);
@@ -775,6 +891,14 @@ static int riemann_config_node(oconfig_item_t *ci) /* {{{ */
                        status = cf_util_get_boolean(child, &host->check_thresholds);
                        if (status != 0)
                                break;
+        } else if (strcasecmp ("Batch", child->key) == 0) {
+            status = cf_util_get_boolean(child, &host->batch_mode);
+            if (status != 0)
+                break;
+        } else if (strcasecmp("BatchMaxSize", child->key) == 0) {
+            status = cf_util_get_int(child, &host->batch_max);
+            if (status != 0)
+                break;
                } else if (strcasecmp ("Port", child->key) == 0) {
                        status = cf_util_get_service (child, &host->service);
                        if (status != 0) {
@@ -859,6 +983,11 @@ static int riemann_config_node(oconfig_item_t *ci) /* {{{ */
        pthread_mutex_lock (&host->lock);
 
        status = plugin_register_write (callback_name, riemann_write, &ud);
+
+    if (host->use_tcp == 1 && host->batch_mode) {
+        ud.free_func = NULL;
+        plugin_register_flush(callback_name, riemann_batch_flush, &ud);
+    }
        if (status != 0)
                WARNING ("write_riemann plugin: plugin_register_write (\"%s\") "
                                "failed with status %i.",