Merge branch 'master' into connectivity
authorAndrew Bays <andrew.bays@gmail.com>
Mon, 2 Jul 2018 10:41:44 +0000 (06:41 -0400)
committerGitHub <noreply@github.com>
Mon, 2 Jul 2018 10:41:44 +0000 (06:41 -0400)
AUTHORS
Makefile.am
README
configure.ac
contrib/redhat/collectd.spec
src/collectd.conf.in
src/collectd.conf.pod
src/connectivity.c [new file with mode: 0644]
src/types.db

diff --git a/AUTHORS b/AUTHORS
index 409655a..bf785f6 100644 (file)
--- a/AUTHORS
+++ b/AUTHORS
@@ -56,9 +56,15 @@ Amit Gupta <amit.gupta221 at gmail.com>
 Andreas Henriksson <andreas at fatal.se>
  - libmnl support in the netlink plugin.
 
+Andrew Bays <abays at redhat.com>
+ - connectivity plugin.
+
 Andy Parkins <andyp at fussylogic.co.uk>
  - battery plugin: sysfs code.
 
+Aneesh Puttur <aputtur at redhat.com>
+ - connectivity plugin.
+
 Andy Smith <ansmith at redhat.com>
  - AMQP 1.0 plugin.
 
index 190ce8e..6c6b30a 100644 (file)
@@ -661,6 +661,15 @@ chrony_la_LDFLAGS = $(PLUGIN_LDFLAGS)
 chrony_la_LIBADD = -lm
 endif
 
+if BUILD_PLUGIN_CONNECTIVITY
+pkglib_LTLIBRARIES += connectivity.la
+connectivity_la_SOURCES = src/connectivity.c
+connectivity_la_CFLAGS = $(AM_CFLAGS) $(BUILD_WITH_LIBMNL_CFLAGS)
+connectivity_la_CPPFLAGS = $(AM_CPPFLAGS) $(BUILD_WITH_LIBYAJL_CPPFLAGS)
+connectivity_la_LDFLAGS = $(PLUGIN_LDFLAGS) $(BUILD_WITH_LIBYAJL_LDFLAGS)
+connectivity_la_LIBADD = $(BUILD_WITH_LIBYAJL_LIBS) $(BUILD_WITH_LIBMNL_LIBS) libignorelist.la
+endif
+
 if BUILD_PLUGIN_CONNTRACK
 pkglib_LTLIBRARIES += conntrack.la
 conntrack_la_SOURCES = src/conntrack.c
diff --git a/README b/README
index 2210b2b..bd577f7 100644 (file)
--- a/README
+++ b/README
@@ -54,6 +54,9 @@ Features
     - chrony
       Chrony daemon statistics: Local clock drift, offset to peers, etc.
 
+    - connectivity
+      Event-based interface status.
+
     - conntrack
       Number of nf_conntrack entries.
 
index 7a14e01..992527d 100644 (file)
@@ -6253,6 +6253,7 @@ plugin_battery="no"
 plugin_bind="no"
 plugin_ceph="no"
 plugin_cgroups="no"
+plugin_connectivity="no"
 plugin_conntrack="no"
 plugin_contextswitch="no"
 plugin_cpu="no"
@@ -6365,6 +6366,10 @@ if test "x$ac_system" = "xLinux"; then
   if test "x$with_libyajl" = "xyes" && test "x$with_libyajl2" = "xyes"; then
     plugin_ovs_events="yes"
     plugin_ovs_stats="yes"
+
+    if test "x$with_libmnl" = "xyes"; then
+      plugin_connectivity="yes"
+    fi
   fi
 fi
 
@@ -6665,6 +6670,7 @@ AC_PLUGIN([bind],                [$plugin_bind],            [ISC Bind nameserver
 AC_PLUGIN([ceph],                [$plugin_ceph],            [Ceph daemon statistics])
 AC_PLUGIN([cgroups],             [$plugin_cgroups],         [CGroups CPU usage accounting])
 AC_PLUGIN([chrony],              [yes],                     [Chrony statistics])
+AC_PLUGIN([connectivity],        [$plugin_connectivity],    [Network interface up/down events])
 AC_PLUGIN([conntrack],           [$plugin_conntrack],       [nf_conntrack statistics])
 AC_PLUGIN([contextswitch],       [$plugin_contextswitch],   [context switch statistics])
 AC_PLUGIN([cpu],                 [$plugin_cpu],             [CPU usage statistics])
@@ -7086,6 +7092,7 @@ AC_MSG_RESULT([    bind  . . . . . . . . $enable_bind])
 AC_MSG_RESULT([    ceph  . . . . . . . . $enable_ceph])
 AC_MSG_RESULT([    cgroups . . . . . . . $enable_cgroups])
 AC_MSG_RESULT([    chrony. . . . . . . . $enable_chrony])
+AC_MSG_RESULT([    connectivity. . . . . $enable_connectivity])
 AC_MSG_RESULT([    conntrack . . . . . . $enable_conntrack])
 AC_MSG_RESULT([    contextswitch . . . . $enable_contextswitch])
 AC_MSG_RESULT([    cpu . . . . . . . . . $enable_cpu])
index 6f86b7e..144ff50 100644 (file)
@@ -53,6 +53,7 @@
 %define with_ceph 0%{!?_without_ceph:1}
 %define with_cgroups 0%{!?_without_cgroups:1}
 %define with_chrony 0%{!?_without_chrony:1}
+%define with_connectivity 0%{!?_without_connectivity:1}
 %define with_conntrack 0%{!?_without_conntrack:1}
 %define with_contextswitch 0%{!?_without_contextswitch:1}
 %define with_cpu 0%{!?_without_cpu:1}
 
 # Plugins not buildable on RHEL < 7
 %if 0%{?rhel} && 0%{?rhel} < 7
+%define with_connectivity 0
 %define with_cpusleep 0
 %define with_gps 0
 %define with_mqtt 0
@@ -371,6 +373,16 @@ Requires:      %{name}%{?_isa} = %{version}-%{release}
 Chrony plugin for collectd
 %endif
 
+%if %{with_connectivity}
+%package connectivity
+Summary:       Connectivity plugin for collectd
+Group:         System Environment/Daemons
+Requires:      %{name}%{?_isa} = %{version}-%{release}
+BuildRequires: libmnl-devel, yajl-devel
+%description connectivity
+Monitors network interface up/down status via netlink library.
+%endif
+
 %if %{with_curl}
 %package curl
 Summary:       Curl plugin for collectd
@@ -1096,6 +1108,12 @@ Collectd utilities
 %define _with_chrony --disable-chrony
 %endif
 
+%if %{with_connectivity}
+%define _with_connectivity --enable-connectivity
+%else
+%define _with_connectivity --disable-connectivity
+%endif
+
 %if %{with_conntrack}
 %define _with_conntrack --enable-conntrack
 %else
@@ -1928,6 +1946,7 @@ Collectd utilities
        %{?_with_ceph} \
        %{?_with_cgroups} \
        %{?_with_chrony} \
+       %{?_with_connectivity} \
        %{?_with_conntrack} \
        %{?_with_contextswitch} \
        %{?_with_cpufreq} \
@@ -2468,6 +2487,11 @@ fi
 %{_libdir}/%{name}/chrony.so
 %endif
 
+%if %{with_connectivity}
+%files connectivity
+%{_libdir}/%{name}/connectivity.so
+%endif
+
 %if %{with_curl}
 %files curl
 %{_libdir}/%{name}/curl.so
index 74b6c88..4fd507a 100644 (file)
 #@BUILD_PLUGIN_CEPH_TRUE@LoadPlugin ceph
 #@BUILD_PLUGIN_CGROUPS_TRUE@LoadPlugin cgroups
 #@BUILD_PLUGIN_CHRONY_TRUE@LoadPlugin chrony
+#@BUILD_PLUGIN_CONNECTIVITY_TRUE@LoadPlugin connectivity
 #@BUILD_PLUGIN_CONNTRACK_TRUE@LoadPlugin conntrack
 #@BUILD_PLUGIN_CONTEXTSWITCH_TRUE@LoadPlugin contextswitch
 @BUILD_PLUGIN_CPU_TRUE@@BUILD_PLUGIN_CPU_TRUE@LoadPlugin cpu
 #      Timeout "2"
 #</Plugin>
 
+#<Plugin connectivity>
+#  Interface eth0
+#</Plugin>
+
 #<Plugin cgroups>
 #  CGroup "libvirt"
 #  IgnoreSelected false
index ccc6949..4f12d3b 100644 (file)
@@ -1574,6 +1574,45 @@ Connection timeout in seconds. Defaults to B<2>.
 
 =back
 
+=head2 Plugin C<connectivity>
+
+connectivity - Documentation of collectd's C<connectivity plugin>
+
+
+  LoadPlugin connectivity
+  # ...
+  <Plugin connectivity>
+    Interface eth0
+  </Plugin>
+
+The C<connectivity plugin> queries interface status using netlink (man 7 netlink) which provides information about network interfaces via the NETLINK_ROUTE family (man 7 rtnetlink). The plugin translates the value it receives to collectd's internal format and, depending on the write plugins you have loaded, it may be written to disk or submitted to another instance.
+The plugin listens to interfaces enumerated within the plugin configuration (see below).  If no interfaces are listed, then the default is for all interfaces to be monitored.
+
+This example shows C<connectivity plugin> monitoring all interfaces.
+
+  LoadPlugin connectivity
+  <Plugin connectivity>
+  </Plugin>
+
+This example shows C<connectivity plugin> monitoring 2 interfaces, "eth0" and "eth1".
+
+  LoadPlugin connectivity
+  <Plugin connectivity>
+    Interface eth0
+    Interface eth1
+  </Plugin>
+
+=over 4
+
+=item B<Interface> I<interface_name>
+
+Name of an interface to monitor. This option may be given multiple times to monitor several interfaces.
+
+=item I<Status>
+
+If I<Status> is greater than or equal to zero the message indicates interface is up,
+if I<Status> is less than zero the message indicates interface is down. 
+
+=back
+
 =head2 Plugin C<conntrack>
 
 This plugin collects IP conntrack statistics.
diff --git a/src/connectivity.c b/src/connectivity.c
new file mode 100644 (file)
index 0000000..e39bce1
--- /dev/null
@@ -0,0 +1,854 @@
+/**
+ * collectd - src/connectivity.c
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a
+ * copy of this software and associated documentation files (the "Software"),
+ * to deal in the Software without restriction, including without limitation
+ * the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ * and/or sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ *
+ * Authors:
+ *   Red Hat NFVPE
+ *     Andrew Bays <abays at redhat.com>
+ *     Aneesh Puttur <aputtur at redhat.com>
+ **/
+
+#include "collectd.h"
+
+#include "common.h"
+#include "plugin.h"
+#include "utils_complain.h"
+#include "utils_ignorelist.h"
+
+#include <asm/types.h>
+#include <errno.h>
+#include <net/if.h>
+#include <netinet/in.h>
+#include <pthread.h>
+#include <stdio.h>
+#include <string.h>
+#include <sys/socket.h>
+#include <unistd.h>
+
+#include <libmnl/libmnl.h>
+#include <linux/netlink.h>
+#include <linux/rtnetlink.h>
+
+#include <yajl/yajl_common.h>
+#include <yajl/yajl_gen.h>
+#if HAVE_YAJL_YAJL_VERSION_H
+#include <yajl/yajl_version.h>
+#endif
+#if defined(YAJL_MAJOR) && (YAJL_MAJOR > 1)
+#define HAVE_YAJL_V2 1
+#endif
+
+/* Netlink protocol family; not referenced elsewhere in this file
+ * (start_thread() passes NETLINK_ROUTE directly). */
+#define MYPROTO NETLINK_ROUTE
+
+/* Link states stored in interface_list_t.status / .prev_status. UP and DOWN
+ * are derived from IFF_RUNNING; UNKNOWN is the state of a freshly added or
+ * reset entry. */
+#define LINK_STATE_DOWN 0
+#define LINK_STATE_UP 1
+#define LINK_STATE_UNKNOWN 2
+
+/* Keys and fixed values of the "common event header" part of the JSON
+ * payload built by gen_message_payload(). */
+#define CONNECTIVITY_DOMAIN_FIELD "domain"
+#define CONNECTIVITY_DOMAIN_VALUE "stateChange"
+#define CONNECTIVITY_EVENT_ID_FIELD "eventId"
+#define CONNECTIVITY_EVENT_NAME_FIELD "eventName"
+#define CONNECTIVITY_EVENT_NAME_DOWN_VALUE "down"
+#define CONNECTIVITY_EVENT_NAME_UP_VALUE "up"
+#define CONNECTIVITY_LAST_EPOCH_MICROSEC_FIELD "lastEpochMicrosec"
+#define CONNECTIVITY_PRIORITY_FIELD "priority"
+#define CONNECTIVITY_PRIORITY_VALUE "high"
+#define CONNECTIVITY_REPORTING_ENTITY_NAME_FIELD "reportingEntityName"
+#define CONNECTIVITY_REPORTING_ENTITY_NAME_VALUE "collectd connectivity plugin"
+#define CONNECTIVITY_SEQUENCE_FIELD "sequence"
+#define CONNECTIVITY_SEQUENCE_VALUE "0"
+#define CONNECTIVITY_SOURCE_NAME_FIELD "sourceName"
+#define CONNECTIVITY_START_EPOCH_MICROSEC_FIELD "startEpochMicrosec"
+#define CONNECTIVITY_VERSION_FIELD "version"
+#define CONNECTIVITY_VERSION_VALUE "1.0"
+
+/* Keys and fixed values of the nested "stateChangeFields" map of the JSON
+ * payload. */
+#define CONNECTIVITY_NEW_STATE_FIELD "newState"
+#define CONNECTIVITY_NEW_STATE_FIELD_DOWN_VALUE "outOfService"
+#define CONNECTIVITY_NEW_STATE_FIELD_UP_VALUE "inService"
+#define CONNECTIVITY_OLD_STATE_FIELD "oldState"
+#define CONNECTIVITY_OLD_STATE_FIELD_DOWN_VALUE "outOfService"
+#define CONNECTIVITY_OLD_STATE_FIELD_UP_VALUE "inService"
+#define CONNECTIVITY_STATE_CHANGE_FIELDS_FIELD "stateChangeFields"
+#define CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_FIELD                         \
+  "stateChangeFieldsVersion"
+#define CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_VALUE "1.0"
+#define CONNECTIVITY_STATE_INTERFACE_FIELD "stateInterface"
+
+/*
+ * Private data types
+ */
+/* One node of the singly linked list of interfaces seen so far. New nodes
+ * are pushed onto the front of the list by add_interface(). */
+struct interface_list_s {
+  char *interface; /* heap copy of the interface name (strdup'd) */
+
+  uint32_t status;      /* most recent LINK_STATE_* value */
+  uint32_t prev_status; /* LINK_STATE_* value before the last change */
+  uint32_t sent; /* 0 until connectivity_read() has dispatched the change */
+  long long unsigned int timestamp; /* CDTIME_T_TO_US(cdtime()) of last update */
+
+  struct interface_list_s *next; /* next node, NULL at the end of the list */
+};
+typedef struct interface_list_s interface_list_t;
+
+/*
+ * Private variables
+ */
+/* Created (inverted) by connectivity_config(); stays NULL when no
+ * "Interface" option is configured. */
+static ignorelist_t *ignorelist = NULL;
+
+/* Head of the list of interfaces seen so far. */
+static interface_list_t *interface_list_head = NULL;
+/* Cleared by connectivity_config() once an "Interface" option is seen. */
+static int monitor_all_interfaces = 1;
+
+/* Non-zero while the netlink thread is supposed to keep running. */
+static int connectivity_thread_loop = 0;
+/* Set to 1 by the netlink thread when read_event() fails. */
+static int connectivity_thread_error = 0;
+static pthread_t connectivity_thread_id;
+/* Taken around accesses to the thread flags and the interface list. */
+static pthread_mutex_t connectivity_lock = PTHREAD_MUTEX_INITIALIZER;
+/* Broadcast by stop_thread(); nothing in this file waits on it. */
+static pthread_cond_t connectivity_cond = PTHREAD_COND_INITIALIZER;
+/* Netlink socket opened and bound in start_thread(). */
+static struct mnl_socket *sock;
+/* Incremented for every payload generated; emitted as "eventId". */
+static int event_id = 0;
+
+static const char *config_keys[] = {"Interface"};
+static int config_keys_num = STATIC_ARRAY_SIZE(config_keys);
+
+/*
+ * Private functions
+ */
+
+/* Emits a single JSON string token. yajl uses the same call for map keys and
+ * for string values. Returns 0 on success, -1 if yajl reports an error. */
+static int gen_json_string(yajl_gen g, const char *str) {
+  if (yajl_gen_string(g, (const unsigned char *)str, strlen(str)) !=
+      yajl_gen_status_ok)
+    return -1;
+  return 0;
+}
+
+/* Emits a `"key": "value"` pair. Returns 0 on success, -1 on error. */
+static int gen_json_kv_string(yajl_gen g, const char *key, const char *value) {
+  if (gen_json_string(g, key) != 0)
+    return -1;
+  return gen_json_string(g, value);
+}
+
+/* Emits a `"key": number` pair; `number` is the already formatted numeric
+ * text. Returns 0 on success, -1 on error. */
+static int gen_json_kv_number(yajl_gen g, const char *key, const char *number) {
+  if (gen_json_string(g, key) != 0)
+    return -1;
+  if (yajl_gen_number(g, number, strlen(number)) != yajl_gen_status_ok)
+    return -1;
+  return 0;
+}
+
+/* Emits a `"key": number` pair for an unsigned integer value.
+ * Returns 0 on success, -1 on error. */
+static int gen_json_kv_ull(yajl_gen g, const char *key,
+                           long long unsigned int value) {
+  char number[32]; /* 20 digits are enough for a 64 bit value */
+
+  snprintf(number, sizeof(number), "%llu", value);
+  return gen_json_kv_number(g, key, number);
+}
+
+/*
+ * Builds the JSON document describing one interface state change.
+ *
+ *   state      new link state; 0 is rendered as "down"/"outOfService", any
+ *              other value (including LINK_STATE_UNKNOWN) as
+ *              "up"/"inService"
+ *   old_state  previous link state, rendered with the same rule
+ *   interface  interface name, used for eventName, sourceName and
+ *              stateInterface
+ *   timestamp  value emitted as startEpochMicrosec
+ *   buf        on success receives a malloc'd, NUL-terminated copy of the
+ *              JSON text which the caller has to free
+ *
+ * Every call that gets as far as the eventId value increments the global
+ * event_id counter. Returns 0 on success and -1 on failure.
+ */
+static int gen_message_payload(int state, int old_state, const char *interface,
+                               long long unsigned int timestamp, char **buf) {
+  const unsigned char *buf2;
+  yajl_gen g;
+  char json_str[DATA_MAX_NAME_LEN];
+
+#if !defined(HAVE_YAJL_V2)
+  yajl_gen_config conf = {};
+
+  conf.beautify = 0;
+#endif
+
+#if HAVE_YAJL_V2
+  size_t len;
+  g = yajl_gen_alloc(NULL);
+#else
+  unsigned int len;
+  g = yajl_gen_alloc(&conf, NULL);
+#endif
+
+  // Without a generator there is nothing to free, so do not go through "err"
+  if (g == NULL) {
+    ERROR("connectivity plugin: gen_message_payload: yajl_gen_alloc failed");
+    return -1;
+  }
+
+#if HAVE_YAJL_V2
+  yajl_gen_config(g, yajl_gen_beautify, 0);
+#endif
+
+  yajl_gen_clear(g);
+
+  // *** BEGIN common event header ***
+
+  if (yajl_gen_map_open(g) != yajl_gen_status_ok)
+    goto err;
+
+  // domain
+  if (gen_json_kv_string(g, CONNECTIVITY_DOMAIN_FIELD,
+                         CONNECTIVITY_DOMAIN_VALUE) != 0)
+    goto err;
+
+  // eventId
+  if (gen_json_string(g, CONNECTIVITY_EVENT_ID_FIELD) != 0)
+    goto err;
+
+  event_id = event_id + 1;
+  snprintf(json_str, sizeof(json_str), "%d", event_id);
+
+  if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok)
+    goto err;
+
+  // eventName; bounded by the buffer size so that an over-long interface
+  // name is truncated instead of overflowing json_str
+  snprintf(json_str, sizeof(json_str), "interface %s %s", interface,
+           (state == 0 ? CONNECTIVITY_EVENT_NAME_DOWN_VALUE
+                       : CONNECTIVITY_EVENT_NAME_UP_VALUE));
+
+  if (gen_json_kv_string(g, CONNECTIVITY_EVENT_NAME_FIELD, json_str) != 0)
+    goto err;
+
+  // lastEpochMicrosec
+  if (gen_json_kv_ull(g, CONNECTIVITY_LAST_EPOCH_MICROSEC_FIELD,
+                      (long long unsigned int)CDTIME_T_TO_US(cdtime())) != 0)
+    goto err;
+
+  // priority
+  if (gen_json_kv_string(g, CONNECTIVITY_PRIORITY_FIELD,
+                         CONNECTIVITY_PRIORITY_VALUE) != 0)
+    goto err;
+
+  // reportingEntityName
+  if (gen_json_kv_string(g, CONNECTIVITY_REPORTING_ENTITY_NAME_FIELD,
+                         CONNECTIVITY_REPORTING_ENTITY_NAME_VALUE) != 0)
+    goto err;
+
+  // sequence
+  if (gen_json_kv_number(g, CONNECTIVITY_SEQUENCE_FIELD,
+                         CONNECTIVITY_SEQUENCE_VALUE) != 0)
+    goto err;
+
+  // sourceName
+  if (gen_json_kv_string(g, CONNECTIVITY_SOURCE_NAME_FIELD, interface) != 0)
+    goto err;
+
+  // startEpochMicrosec
+  if (gen_json_kv_ull(g, CONNECTIVITY_START_EPOCH_MICROSEC_FIELD, timestamp) !=
+      0)
+    goto err;
+
+  // version
+  if (gen_json_kv_number(g, CONNECTIVITY_VERSION_FIELD,
+                         CONNECTIVITY_VERSION_VALUE) != 0)
+    goto err;
+
+  // *** END common event header ***
+
+  // *** BEGIN state change fields ***
+
+  if (gen_json_string(g, CONNECTIVITY_STATE_CHANGE_FIELDS_FIELD) != 0)
+    goto err;
+
+  if (yajl_gen_map_open(g) != yajl_gen_status_ok)
+    goto err;
+
+  // newState
+  if (gen_json_kv_string(g, CONNECTIVITY_NEW_STATE_FIELD,
+                         (state == 0 ? CONNECTIVITY_NEW_STATE_FIELD_DOWN_VALUE
+                                     : CONNECTIVITY_NEW_STATE_FIELD_UP_VALUE)) !=
+      0)
+    goto err;
+
+  // oldState
+  if (gen_json_kv_string(
+          g, CONNECTIVITY_OLD_STATE_FIELD,
+          (old_state == 0 ? CONNECTIVITY_OLD_STATE_FIELD_DOWN_VALUE
+                          : CONNECTIVITY_OLD_STATE_FIELD_UP_VALUE)) != 0)
+    goto err;
+
+  // stateChangeFieldsVersion
+  if (gen_json_kv_number(g, CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_FIELD,
+                         CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_VALUE) != 0)
+    goto err;
+
+  // stateInterface
+  if (gen_json_kv_string(g, CONNECTIVITY_STATE_INTERFACE_FIELD, interface) != 0)
+    goto err;
+
+  if (yajl_gen_map_close(g) != yajl_gen_status_ok)
+    goto err;
+
+  // *** END state change fields ***
+
+  if (yajl_gen_map_close(g) != yajl_gen_status_ok)
+    goto err;
+
+  if (yajl_gen_get_buf(g, &buf2, &len) != yajl_gen_status_ok)
+    goto err;
+
+  // The generator owns buf2, so hand the caller a private copy
+  *buf = malloc(len + 1);
+
+  if (*buf == NULL) {
+    char errbuf[1024];
+    ERROR("connectivity plugin: malloc failed during gen_message_payload: %s",
+          sstrerror(errno, errbuf, sizeof(errbuf)));
+    goto err;
+  }
+
+  memcpy(*buf, buf2, len);
+  (*buf)[len] = '\0';
+
+  yajl_gen_free(g);
+
+  return 0;
+
+err:
+  yajl_gen_free(g);
+  ERROR("connectivity plugin: gen_message_payload failed to generate JSON");
+  return -1;
+}
+
+/*
+ * Allocates a list node for `interface`, initialises it with the given
+ * status values and the current time, and pushes it onto the front of the
+ * interface list. Returns the new node, or NULL if an allocation failed.
+ */
+static interface_list_t *add_interface(const char *interface, int status,
+                                       int prev_status) {
+  interface_list_t *node = malloc(sizeof(*node));
+
+  if (node == NULL) {
+    char errbuf[1024];
+    ERROR("connectivity plugin: malloc failed during add_interface: %s",
+          sstrerror(errno, errbuf, sizeof(errbuf)));
+    return NULL;
+  }
+
+  char *name_copy = strdup(interface);
+
+  if (name_copy == NULL) {
+    char errbuf[1024];
+    sfree(node);
+    ERROR("connectivity plugin: strdup failed during add_interface: %s",
+          sstrerror(errno, errbuf, sizeof(errbuf)));
+    return NULL;
+  }
+
+  node->interface = name_copy;
+  node->status = status;
+  node->prev_status = prev_status;
+  node->sent = 0;
+  node->timestamp = (long long unsigned int)CDTIME_T_TO_US(cdtime());
+
+  // Link the node in as the new head of the list
+  node->next = interface_list_head;
+  interface_list_head = node;
+
+  DEBUG("connectivity plugin: added interface %s", name_copy);
+
+  return node;
+}
+
+/*
+ * Handles one link message: looks up the IFLA_IFNAME attribute, and if the
+ * interface is monitored, records its new state (derived from IFF_RUNNING)
+ * in the interface list, adding a list entry on first sight.
+ *
+ * The whole update runs with connectivity_lock held; every exit path goes
+ * through the single unlock at the end of the function.
+ *
+ * Returns 0 normally and MNL_CB_ERROR if the name attribute is invalid or a
+ * new list entry cannot be allocated.
+ */
+static int connectivity_link_state(struct nlmsghdr *msg) {
+  int retval = 0;
+  struct ifinfomsg *ifi = mnl_nlmsg_get_payload(msg);
+  struct nlattr *attr;
+  const char *dev = NULL;
+
+  pthread_mutex_lock(&connectivity_lock);
+
+  interface_list_t *il = NULL;
+
+  /* Scan attribute list for device name. */
+  mnl_attr_for_each(attr, msg, sizeof(*ifi)) {
+    if (mnl_attr_get_type(attr) != IFLA_IFNAME)
+      continue;
+
+    if (mnl_attr_validate(attr, MNL_TYPE_STRING) < 0) {
+      ERROR("connectivity plugin: connectivity_link_state: IFLA_IFNAME "
+            "mnl_attr_validate "
+            "failed.");
+      retval = MNL_CB_ERROR;
+      break;
+    }
+
+    dev = mnl_attr_get_str(attr);
+
+    // Check the list of interfaces we should monitor, if we've chosen
+    // a subset.  If we don't care about this one, abort.
+    if (ignorelist_match(ignorelist, dev) != 0) {
+      DEBUG("connectivity plugin: Ignoring link state change for unmonitored "
+            "interface: %s",
+            dev);
+      break;
+    }
+
+    for (il = interface_list_head; il != NULL; il = il->next)
+      if (strcmp(dev, il->interface) == 0)
+        break;
+
+    uint32_t prev_status;
+
+    if (il == NULL) {
+      // We haven't encountered this interface yet, so add it to the linked list
+      il = add_interface(dev, LINK_STATE_UNKNOWN, LINK_STATE_UNKNOWN);
+
+      if (il == NULL) {
+        ERROR("connectivity plugin: unable to add interface %s during "
+              "connectivity_link_state",
+              dev);
+        // Leave through the common exit so that the mutex gets released
+        retval = MNL_CB_ERROR;
+        break;
+      }
+    }
+
+    prev_status = il->status;
+    il->status =
+        ((ifi->ifi_flags & IFF_RUNNING) ? LINK_STATE_UP : LINK_STATE_DOWN);
+    il->timestamp = (long long unsigned int)CDTIME_T_TO_US(cdtime());
+
+    // If the new status is different than the previous status,
+    // store the previous status and set sent to zero
+    if (il->status != prev_status) {
+      il->prev_status = prev_status;
+      il->sent = 0;
+    }
+
+    DEBUG("connectivity plugin (%llu): Interface %s status is now %s",
+          il->timestamp, dev, ((ifi->ifi_flags & IFF_RUNNING) ? "UP" : "DOWN"));
+
+    // no need to loop again, we found the interface name attr
+    // (otherwise the first if-statement in the loop would
+    // have moved us on with 'continue')
+    break;
+  }
+
+  pthread_mutex_unlock(&connectivity_lock);
+
+  return retval;
+}
+
+/*
+ * Dispatches one netlink message by type. RTM_NEWLINK messages are passed to
+ * connectivity_link_state(); address, route and link-deletion messages are
+ * ignored; any other type is logged. Always returns 0.
+ */
+static int msg_handler(struct nlmsghdr *msg) {
+  if (msg->nlmsg_type == RTM_NEWLINK) {
+    connectivity_link_state(msg);
+    return 0;
+  }
+
+  // Not of interest in current version
+  if (msg->nlmsg_type == RTM_NEWADDR || msg->nlmsg_type == RTM_DELADDR ||
+      msg->nlmsg_type == RTM_NEWROUTE || msg->nlmsg_type == RTM_DELROUTE ||
+      msg->nlmsg_type == RTM_DELLINK)
+    return 0;
+
+  ERROR("connectivity plugin: msg_handler: Unknown netlink nlmsg_type %d\n",
+        msg->nlmsg_type);
+  return 0;
+}
+
+/*
+ * Receives one datagram from the netlink socket `nl` and feeds every message
+ * it contains to `msg_handler`.
+ *
+ * Returns 0 when `nl` is NULL or the receive reports EAGAIN/EWOULDBLOCK, the
+ * negative receive status on any other receive error, -1 for an NLMSG_ERROR
+ * message or a NULL handler, a negative handler result as soon as one occurs,
+ * and otherwise the result of the last handler call (0 if none was made).
+ */
+static int read_event(struct mnl_socket *nl,
+                      int (*msg_handler)(struct nlmsghdr *)) {
+  char buf[4096];
+
+  if (nl == NULL)
+    return 0;
+
+  int status = mnl_socket_recvfrom(nl, buf, sizeof(buf));
+
+  if (status < 0) {
+    /* Nothing to read right now is not treated as an error */
+    if (errno == EAGAIN || errno == EWOULDBLOCK)
+      return 0;
+
+    /* Anything else is an error */
+    ERROR("connectivity plugin: read_event: Error mnl_socket_recvfrom: %d\n",
+          status);
+    return status;
+  }
+
+  if (status == 0) {
+    DEBUG("connectivity plugin: read_event: EOF\n");
+  }
+
+  int ret = 0;
+  struct nlmsghdr *h = (struct nlmsghdr *)buf;
+
+  /* A single receive may carry several messages; NLMSG_NEXT also reduces
+   * `status` by the length of the message just consumed. */
+  while (NLMSG_OK(h, (unsigned int)status)) {
+    /* Finish reading */
+    if (h->nlmsg_type == NLMSG_DONE)
+      return ret;
+
+    /* Message is some kind of error */
+    if (h->nlmsg_type == NLMSG_ERROR) {
+      ERROR("connectivity plugin: read_event: Message is an error\n");
+      return -1; // Error
+    }
+
+    if (msg_handler == NULL) {
+      ERROR("connectivity plugin: read_event: Error NULL message handler\n");
+      return -1;
+    }
+
+    /* Call message handler */
+    ret = (*msg_handler)(h);
+    if (ret < 0) {
+      ERROR("connectivity plugin: read_event: Message handler error %d\n",
+            ret);
+      return ret;
+    }
+
+    h = NLMSG_NEXT(h, status);
+  }
+
+  return ret;
+}
+
+/*
+ * Thread body: keeps calling read_event() on the netlink socket for as long
+ * as connectivity_thread_loop is positive. The loop flag is only examined
+ * with connectivity_lock held; the lock is dropped around read_event(). A
+ * negative read_event() result sets connectivity_thread_error and ends the
+ * thread. Always returns NULL.
+ */
+static void *connectivity_thread(void *arg) /* {{{ */
+{
+  pthread_mutex_lock(&connectivity_lock);
+
+  for (;;) {
+    if (connectivity_thread_loop <= 0)
+      break;
+
+    pthread_mutex_unlock(&connectivity_lock);
+
+    int rc = read_event(sock, msg_handler);
+
+    pthread_mutex_lock(&connectivity_lock);
+
+    if (rc < 0) {
+      connectivity_thread_error = 1;
+      break;
+    }
+  } /* for (;;) */
+
+  pthread_mutex_unlock(&connectivity_lock);
+
+  return NULL;
+} /* }}} void *connectivity_thread */
+
+/*
+ * Opens and binds the netlink socket (unless one is already open) and starts
+ * the thread running connectivity_thread().
+ *
+ * Returns 0 if the thread was started or is already marked as running, and
+ * -1 on failure. On every failure path connectivity_thread_loop is left at 0
+ * and a socket that is no longer usable is closed and reset to NULL, so that
+ * a later call starts again from a clean state.
+ */
+static int start_thread(void) /* {{{ */
+{
+  int status;
+
+  pthread_mutex_lock(&connectivity_lock);
+
+  if (connectivity_thread_loop != 0) {
+    pthread_mutex_unlock(&connectivity_lock);
+    return (0);
+  }
+
+  if (sock == NULL) {
+    sock = mnl_socket_open(NETLINK_ROUTE);
+    if (sock == NULL) {
+      ERROR(
+          "connectivity plugin: connectivity_thread: mnl_socket_open failed.");
+      pthread_mutex_unlock(&connectivity_lock);
+      return (-1);
+    }
+
+    if (mnl_socket_bind(sock, RTMGRP_LINK, MNL_SOCKET_AUTOPID) < 0) {
+      ERROR(
+          "connectivity plugin: connectivity_thread: mnl_socket_bind failed.");
+      // An unbound socket is of no use; do not keep it around
+      mnl_socket_close(sock);
+      sock = NULL;
+      pthread_mutex_unlock(&connectivity_lock);
+      return (-1);
+    }
+  }
+
+  // The new thread takes connectivity_lock before it looks at the loop flag,
+  // so setting the flags here, while we still hold the lock, is early enough
+  connectivity_thread_loop = 1;
+  connectivity_thread_error = 0;
+
+  status = plugin_thread_create(&connectivity_thread_id, /* attr = */ NULL,
+                                connectivity_thread,
+                                /* arg = */ (void *)0, "connectivity");
+  if (status != 0) {
+    connectivity_thread_loop = 0;
+    ERROR("connectivity plugin: Starting thread failed.");
+    mnl_socket_close(sock);
+    sock = NULL; // never leave a pointer to the closed socket behind
+    pthread_mutex_unlock(&connectivity_lock);
+    return (-1);
+  }
+
+  pthread_mutex_unlock(&connectivity_lock);
+  return (0);
+} /* }}} int start_thread */
+
+/*
+ * Stops the netlink thread and releases the netlink socket.
+ *
+ *   shutdown  1 when the process is shutting down: the thread is cancelled
+ *             and then joined. Any other value: the thread is only joined,
+ *             which returns once the thread has left its read loop.
+ *
+ * The socket is closed only after the thread has been dealt with, and `sock`
+ * is reset to NULL so that start_thread() opens a fresh socket instead of
+ * reusing a closed one.
+ *
+ * Returns 0 on success and -1 if no thread was running or if cancelling or
+ * joining it failed.
+ */
+static int stop_thread(int shutdown) /* {{{ */
+{
+  int status;
+
+  pthread_mutex_lock(&connectivity_lock);
+
+  if (connectivity_thread_loop == 0) {
+    // No thread to stop, but a socket may be left over from a failed start
+    if (sock != NULL) {
+      mnl_socket_close(sock);
+      sock = NULL;
+    }
+    pthread_mutex_unlock(&connectivity_lock);
+    return (-1);
+  }
+
+  connectivity_thread_loop = 0;
+  pthread_cond_broadcast(&connectivity_cond);
+  pthread_mutex_unlock(&connectivity_lock);
+
+  if (shutdown == 1) {
+    // The thread blocks in the socket read and only notices that
+    // "connectivity_thread_loop" is 0 once another netlink message
+    // arrives.  That is acceptable while the process keeps running,
+    // but on shutdown we must not leave the thread behind, so cancel
+    // it explicitly and then join it to make sure it is really gone
+    // before its socket and the interface list are released.
+
+    DEBUG("connectivity plugin: Canceling thread for process shutdown");
+
+    status = pthread_cancel(connectivity_thread_id);
+
+    if (status != 0) {
+      ERROR("connectivity plugin: Unable to cancel thread: %d", status);
+      status = -1;
+    } else {
+      status = pthread_join(connectivity_thread_id, /* return = */ NULL);
+      if (status != 0) {
+        ERROR("connectivity plugin: Stopping thread failed.");
+        status = -1;
+      }
+    }
+  } else {
+    status = pthread_join(connectivity_thread_id, /* return = */ NULL);
+    if (status != 0) {
+      ERROR("connectivity plugin: Stopping thread failed.");
+      status = -1;
+    }
+  }
+
+  pthread_mutex_lock(&connectivity_lock);
+  if (sock != NULL) {
+    mnl_socket_close(sock);
+    sock = NULL;
+  }
+  memset(&connectivity_thread_id, 0, sizeof(connectivity_thread_id));
+  connectivity_thread_error = 0;
+  pthread_mutex_unlock(&connectivity_lock);
+
+  DEBUG("connectivity plugin: Finished requesting stop of thread");
+
+  return (status);
+} /* }}} int stop_thread */
+
+/*
+ * Init callback: notes in the log when no interface was configured (all
+ * interfaces are then monitored) and starts the netlink thread. Returns the
+ * result of start_thread().
+ */
+static int connectivity_init(void) /* {{{ */
+{
+  if (monitor_all_interfaces != 0) {
+    NOTICE("connectivity plugin: No interfaces have been selected, so all will "
+           "be monitored");
+  }
+
+  int status = start_thread();
+
+  return status;
+} /* }}} int connectivity_init */
+
+/*
+ * Config callback. The only supported key is "Interface"; each occurrence
+ * adds `value` to the (inverted) ignorelist and turns off "monitor all
+ * interfaces".
+ *
+ * Returns 0 on success and -1 for an unknown key, or when the ignorelist
+ * cannot be created or the entry cannot be added.
+ */
+static int connectivity_config(const char *key, const char *value) /* {{{ */
+{
+  if (ignorelist == NULL) {
+    ignorelist = ignorelist_create(/* invert = */ 1);
+
+    if (ignorelist == NULL) {
+      ERROR("connectivity plugin: ignorelist_create failed");
+      return (-1);
+    }
+  }
+
+  if (strcasecmp(key, "Interface") != 0)
+    return (-1);
+
+  if (ignorelist_add(ignorelist, value) != 0) {
+    ERROR("connectivity plugin: unable to add interface \"%s\" to the list "
+          "of monitored interfaces",
+          value);
+    return (-1);
+  }
+
+  // Only stop monitoring everything once an entry was really recorded
+  monitor_all_interfaces = 0;
+
+  return (0);
+} /* }}} int connectivity_config */
+
+/*
+ * Dispatches a notification for a state change of `interface`.
+ *
+ *   interface  interface name; becomes the plugin instance
+ *   type       not used by this function
+ *   value      new state; LINK_STATE_UP yields severity NOTIF_OKAY, anything
+ *              else NOTIF_FAILURE
+ *   old_value  previous state, only used for the JSON payload
+ *   timestamp  passed on to gen_message_payload()
+ *
+ * The JSON document produced by gen_message_payload() is attached as a
+ * string metadata entry named "ves". If the payload cannot be generated the
+ * notification is still dispatched, just without metadata, so that the state
+ * change itself is not lost.
+ */
+static void connectivity_dispatch_notification(
+    const char *interface, const char *type, /* {{{ */
+    gauge_t value, gauge_t old_value, long long unsigned int timestamp) {
+  char *buf = NULL;
+  notification_t n = {
+      NOTIF_FAILURE, cdtime(), "", "", "connectivity", "", "", "", NULL};
+
+  if (value == LINK_STATE_UP)
+    n.severity = NOTIF_OKAY;
+
+  sstrncpy(n.host, hostname_g, sizeof(n.host));
+  sstrncpy(n.plugin_instance, interface, sizeof(n.plugin_instance));
+  sstrncpy(n.type, "gauge", sizeof(n.type));
+  sstrncpy(n.type_instance, "interface_status", sizeof(n.type_instance));
+
+  if (gen_message_payload(value, old_value, interface, timestamp, &buf) != 0) {
+    ERROR("connectivity plugin: unable to generate notification payload for "
+          "interface %s",
+          interface);
+    sfree(buf);
+  }
+
+  if (buf != NULL) {
+    notification_meta_t *m = calloc(1, sizeof(*m));
+
+    if (m == NULL) {
+      char errbuf[1024];
+      sfree(buf);
+      ERROR("connectivity plugin: unable to allocate metadata: %s",
+            sstrerror(errno, errbuf, sizeof(errbuf)));
+      return;
+    }
+
+    sstrncpy(m->name, "ves", sizeof(m->name));
+    m->nm_value.nm_string = sstrdup(buf);
+    m->type = NM_TYPE_STRING;
+    n.meta = m;
+
+    DEBUG("connectivity plugin: notification message: %s",
+          n.meta->nm_value.nm_string);
+  }
+
+  DEBUG("connectivity plugin: dispatching state %d for interface %s",
+        (int)value, interface);
+
+  plugin_dispatch_notification(&n);
+
+  if (n.meta != NULL)
+    plugin_notification_meta_free(n.meta);
+
+  // malloc'd in gen_message_payload
+  sfree(buf);
+}
+
+/*
+ * Read callback.
+ *
+ * If the netlink thread reported an error, the thread is stopped, every
+ * known interface is reset to LINK_STATE_UNKNOWN, the thread is started
+ * again and -1 is returned.
+ *
+ * Otherwise a notification is dispatched for every interface whose status
+ * differs from its previous status and has not been reported yet, and 0 is
+ * returned. The error flag and the interface list are shared with the
+ * netlink thread and are therefore only accessed with connectivity_lock
+ * held.
+ */
+static int connectivity_read(void) /* {{{ */
+{
+  pthread_mutex_lock(&connectivity_lock);
+  int thread_failed = (connectivity_thread_error != 0);
+  pthread_mutex_unlock(&connectivity_lock);
+
+  if (thread_failed) {
+    ERROR("connectivity plugin: The interface thread had a problem. Restarting "
+          "it.");
+
+    // stop_thread() and start_thread() take the lock themselves
+    stop_thread(0);
+
+    pthread_mutex_lock(&connectivity_lock);
+    for (interface_list_t *il = interface_list_head; il != NULL;
+         il = il->next) {
+      il->status = LINK_STATE_UNKNOWN;
+      il->prev_status = LINK_STATE_UNKNOWN;
+      il->sent = 0;
+    }
+    pthread_mutex_unlock(&connectivity_lock);
+
+    if (start_thread() != 0)
+      ERROR("connectivity plugin: Restarting the interface thread failed.");
+
+    return (-1);
+  } /* if (thread_failed) */
+
+  // Hold the lock for the whole traversal: the netlink thread may push new
+  // entries onto the head of the list at any time
+  pthread_mutex_lock(&connectivity_lock);
+
+  for (interface_list_t *il = interface_list_head; il != NULL;
+       il = il->next) /* {{{ */
+  {
+    uint32_t status = il->status;
+    uint32_t prev_status = il->prev_status;
+
+    if (status != prev_status && il->sent == 0) {
+      connectivity_dispatch_notification(il->interface, "gauge", status,
+                                         prev_status, il->timestamp);
+      il->sent = 1;
+    }
+  } /* }}} for (il = interface_list_head; il != NULL; il = il->next) */
+
+  pthread_mutex_unlock(&connectivity_lock);
+
+  return (0);
+} /* }}} int connectivity_read */
+
+/*
+ * Shutdown callback: stops the netlink thread, then frees the interface list
+ * and the ignorelist.
+ *
+ * Returns -1, without freeing anything, if stop_thread() fails; 0 otherwise.
+ * The list head and the ignorelist pointer are reset to NULL after freeing
+ * so that no dangling pointers remain in the file-scope state.
+ */
+static int connectivity_shutdown(void) /* {{{ */
+{
+  interface_list_t *il;
+
+  DEBUG("connectivity plugin: Shutting down thread.");
+  if (stop_thread(1) < 0)
+    return (-1);
+
+  il = interface_list_head;
+  while (il != NULL) {
+    interface_list_t *il_next;
+
+    il_next = il->next;
+
+    sfree(il->interface);
+    sfree(il);
+
+    il = il_next;
+  }
+  interface_list_head = NULL;
+
+  ignorelist_free(ignorelist);
+  ignorelist = NULL;
+
+  return (0);
+} /* }}} int connectivity_shutdown */
+
+/* Registers the config, init, read and shutdown callbacks of this plugin
+ * under the name "connectivity". */
+void module_register(void) {
+  static const char plugin_name[] = "connectivity";
+
+  plugin_register_config(plugin_name, connectivity_config, config_keys,
+                         config_keys_num);
+  plugin_register_init(plugin_name, connectivity_init);
+  plugin_register_read(plugin_name, connectivity_read);
+  plugin_register_shutdown(plugin_name, connectivity_shutdown);
+} /* void module_register */
index f4933ee..3f8b581 100644 (file)
@@ -33,6 +33,7 @@ compression             uncompressed:DERIVE:0:U, compressed:DERIVE:0:U
 compression_ratio       value:GAUGE:0:2
 commands                value:DERIVE:0:U
 connections             value:DERIVE:0:U
+connectivity            value:GAUGE:0:2
 conntrack               value:GAUGE:0:4294967295
 contextswitch           value:DERIVE:0:U
 cookies                 value:DERIVE:0:U