connectivity notifications
authorAndrew Bays <abays@redhat.com>
Thu, 18 Jan 2018 14:20:11 +0000 (09:20 -0500)
committerAndrew Bays <abays@redhat.com>
Thu, 18 Jan 2018 14:20:11 +0000 (09:20 -0500)
Makefile.am
configure.ac
src/connectivity.c

index e9a3813..df4cccd 100644 (file)
@@ -641,8 +641,9 @@ if BUILD_PLUGIN_CONNECTIVITY
 pkglib_LTLIBRARIES += connectivity.la
 connectivity_la_SOURCES = src/connectivity.c
 connectivity_la_CFLAGS = $(AM_CFLAGS) $(BUILD_WITH_LIBMNL_CFLAGS)
-connectivity_la_LDFLAGS = $(PLUGIN_LDFLAGS)
-connectivity_la_LIBADD = $(BUILD_WITH_LIBMNL_LIBS)
+connectivity_la_CPPFLAGS = $(AM_CPPFLAGS) $(BUILD_WITH_LIBYAJL_CPPFLAGS)
+connectivity_la_LDFLAGS = $(PLUGIN_LDFLAGS) $(BUILD_WITH_LIBYAJL_LDFLAGS)
+connectivity_la_LIBADD = $(BUILD_WITH_LIBYAJL_LIBS) $(BUILD_WITH_LIBMNL_LIBS)
 endif
 
 if BUILD_PLUGIN_CONNTRACK
index e5b7eba..d664448 100644 (file)
@@ -6111,6 +6111,7 @@ plugin_battery="no"
 plugin_bind="no"
 plugin_ceph="no"
 plugin_cgroups="no"
+plugin_connectivity="no"
 plugin_conntrack="no"
 plugin_contextswitch="no"
 plugin_cpu="no"
@@ -6223,6 +6224,10 @@ if test "x$ac_system" = "xLinux"; then
   if test "x$with_libyajl" = "xyes" && test "x$with_libyajl2" = "xyes"; then
     plugin_ovs_events="yes"
     plugin_ovs_stats="yes"
+
+    if test "x$with_libmnl" = "xyes"; then
+      plugin_connectivity="yes"
+    fi
   fi
 fi
 
@@ -6522,7 +6527,7 @@ AC_PLUGIN([bind],                [$plugin_bind],            [ISC Bind nameserver
 AC_PLUGIN([ceph],                [$plugin_ceph],            [Ceph daemon statistics])
 AC_PLUGIN([cgroups],             [$plugin_cgroups],         [CGroups CPU usage accounting])
 AC_PLUGIN([chrony],              [yes],                     [Chrony statistics])
-AC_PLUGIN([connectivity],        [$with_libmnl],            [Network interface up/down statistics])
+AC_PLUGIN([connectivity],        [$plugin_connectivity],    [Network interface up/down events])
 AC_PLUGIN([conntrack],           [$plugin_conntrack],       [nf_conntrack statistics])
 AC_PLUGIN([contextswitch],       [$plugin_contextswitch],   [context switch statistics])
 AC_PLUGIN([cpu],                 [$plugin_cpu],             [CPU usage statistics])
index 5a27fe1..fea24fe 100644 (file)
 #include <linux/netlink.h>
 #include <linux/rtnetlink.h>
 
+#include <yajl/yajl_common.h>
+#include <yajl/yajl_gen.h>
+#if HAVE_YAJL_YAJL_VERSION_H
+#include <yajl/yajl_version.h>
+#endif
+#if defined(YAJL_MAJOR) && (YAJL_MAJOR > 1)
+#define HAVE_YAJL_V2 1
+#endif
+
 #define MYPROTO NETLINK_ROUTE
 
+#define CONNECTIVITY_DOMAIN_FIELD "domain"
+#define CONNECTIVITY_DOMAIN_VALUE "stateChange"
+#define CONNECTIVITY_EVENT_ID_FIELD "eventId"
+#define CONNECTIVITY_EVENT_NAME_FIELD "eventName"
+#define CONNECTIVITY_EVENT_NAME_DOWN_VALUE "down"
+#define CONNECTIVITY_EVENT_NAME_UP_VALUE "up"
+#define CONNECTIVITY_LAST_EPOCH_MICROSEC_FIELD "lastEpochMicrosec"
+#define CONNECTIVITY_PRIORITY_FIELD "priority"
+#define CONNECTIVITY_PRIORITY_VALUE "high"
+#define CONNECTIVITY_REPORTING_ENTITY_NAME_FIELD "reportingEntityName"
+#define CONNECTIVITY_REPORTING_ENTITY_NAME_VALUE "collectd connectivity plugin"
+#define CONNECTIVITY_SEQUENCE_FIELD "sequence"
+#define CONNECTIVITY_SEQUENCE_VALUE "0"
+#define CONNECTIVITY_SOURCE_NAME_FIELD "sourceName"
+#define CONNECTIVITY_START_EPOCH_MICROSEC_FIELD "startEpochMicrosec"
+#define CONNECTIVITY_VERSION_FIELD "version"
+#define CONNECTIVITY_VERSION_VALUE "1.0"
+
+#define CONNECTIVITY_NEW_STATE_FIELD "newState"
+#define CONNECTIVITY_NEW_STATE_FIELD_DOWN_VALUE "outOfService"
+#define CONNECTIVITY_NEW_STATE_FIELD_UP_VALUE "inService"
+#define CONNECTIVITY_OLD_STATE_FIELD "oldState"
+#define CONNECTIVITY_OLD_STATE_FIELD_DOWN_VALUE "outOfService"
+#define CONNECTIVITY_OLD_STATE_FIELD_UP_VALUE "inService"
+#define CONNECTIVITY_STATE_CHANGE_FIELDS_FIELD "stateChangeFields"
+#define CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_FIELD                         \
+  "stateChangeFieldsVersion"
+#define CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_VALUE "1.0"
+#define CONNECTIVITY_STATE_INTERFACE_FIELD "stateInterface"
+
 /*
  * Private data types
  */
@@ -56,8 +95,7 @@ struct interfacelist_s {
   uint32_t status;
   uint32_t prev_status;
   uint32_t sent;
-  uint32_t sec;
-  uint32_t usec;
+  long long unsigned int timestamp;
 
   struct interfacelist_s *next;
 };
@@ -74,6 +112,7 @@ static pthread_t connectivity_thread_id;
 static pthread_mutex_t connectivity_lock = PTHREAD_MUTEX_INITIALIZER;
 static pthread_cond_t connectivity_cond = PTHREAD_COND_INITIALIZER;
 static struct mnl_socket *sock;
+static int event_id = 0;
 
 static const char *config_keys[] = {"Interface"};
 static int config_keys_num = STATIC_ARRAY_SIZE(config_keys);
@@ -82,6 +121,269 @@ static int config_keys_num = STATIC_ARRAY_SIZE(config_keys);
  * Private functions
  */
 
+static int gen_message_payload(int state, int old_state, const char *interface,
+                               long long unsigned int timestamp, char **buf) {
+  const unsigned char *buf2;
+  yajl_gen g;
+
+#if !defined(HAVE_YAJL_V2)
+  yajl_gen_config conf = {};
+
+  conf.beautify = 0;
+#endif
+
+#if HAVE_YAJL_V2
+  size_t len;
+  g = yajl_gen_alloc(NULL);
+  yajl_gen_config(g, yajl_gen_beautify, 0);
+#else
+  unsigned int len;
+  g = yajl_gen_alloc(&conf, NULL);
+#endif
+
+  yajl_gen_clear(g);
+
+  // *** BEGIN common event header ***
+
+  if (yajl_gen_map_open(g) != yajl_gen_status_ok)
+    goto err;
+
+  // domain
+  if (yajl_gen_string(g, (u_char *)CONNECTIVITY_DOMAIN_FIELD,
+                      strlen(CONNECTIVITY_DOMAIN_FIELD)) != yajl_gen_status_ok)
+    goto err;
+
+  if (yajl_gen_string(g, (u_char *)CONNECTIVITY_DOMAIN_VALUE,
+                      strlen(CONNECTIVITY_DOMAIN_VALUE)) != yajl_gen_status_ok)
+    goto err;
+
+  // eventId
+  if (yajl_gen_string(g, (u_char *)CONNECTIVITY_EVENT_ID_FIELD,
+                      strlen(CONNECTIVITY_EVENT_ID_FIELD)) !=
+      yajl_gen_status_ok)
+    goto err;
+
+  event_id = event_id + 1;
+  int event_id_len = sizeof(char) * sizeof(int) * 4 + 1;
+  char *event_id_str = malloc(event_id_len);
+  snprintf(event_id_str, event_id_len, "%d", event_id);
+
+  if (yajl_gen_number(g, event_id_str, strlen(event_id_str)) !=
+      yajl_gen_status_ok) {
+    sfree(event_id_str);
+    goto err;
+  }
+
+  sfree(event_id_str);
+
+  // eventName
+  if (yajl_gen_string(g, (u_char *)CONNECTIVITY_EVENT_NAME_FIELD,
+                      strlen(CONNECTIVITY_EVENT_NAME_FIELD)) !=
+      yajl_gen_status_ok)
+    goto err;
+
+  int event_name_len = 0;
+  event_name_len = event_name_len + strlen(interface);    // interface name
+  event_name_len = event_name_len + (state == 0 ? 4 : 2); // "down" or "up"
+  event_name_len =
+      event_name_len + 12; // "interface", 2 spaces and null-terminator
+  char *event_name_str = malloc(event_name_len);
+  memset(event_name_str, '\0', event_name_len);
+  snprintf(event_name_str, event_name_len, "interface %s %s", interface,
+           (state == 0 ? CONNECTIVITY_EVENT_NAME_DOWN_VALUE
+                       : CONNECTIVITY_EVENT_NAME_UP_VALUE));
+
+  if (yajl_gen_string(g, (u_char *)event_name_str, strlen(event_name_str)) !=
+      yajl_gen_status_ok) {
+    sfree(event_name_str);
+    goto err;
+  }
+
+  sfree(event_name_str);
+
+  // lastEpochMicrosec
+  if (yajl_gen_string(g, (u_char *)CONNECTIVITY_LAST_EPOCH_MICROSEC_FIELD,
+                      strlen(CONNECTIVITY_LAST_EPOCH_MICROSEC_FIELD)) !=
+      yajl_gen_status_ok)
+    goto err;
+
+  int last_epoch_microsec_len =
+      sizeof(char) * sizeof(long long unsigned int) * 4 + 1;
+  char *last_epoch_microsec_str = malloc(last_epoch_microsec_len);
+  snprintf(last_epoch_microsec_str, last_epoch_microsec_len, "%llu",
+           (long long unsigned int)CDTIME_T_TO_US(cdtime()));
+
+  if (yajl_gen_number(g, last_epoch_microsec_str,
+                      strlen(last_epoch_microsec_str)) != yajl_gen_status_ok) {
+    sfree(last_epoch_microsec_str);
+    goto err;
+  }
+
+  sfree(last_epoch_microsec_str);
+
+  // priority
+  if (yajl_gen_string(g, (u_char *)CONNECTIVITY_PRIORITY_FIELD,
+                      strlen(CONNECTIVITY_PRIORITY_FIELD)) !=
+      yajl_gen_status_ok)
+    goto err;
+
+  if (yajl_gen_string(g, (u_char *)CONNECTIVITY_PRIORITY_VALUE,
+                      strlen(CONNECTIVITY_PRIORITY_VALUE)) !=
+      yajl_gen_status_ok)
+    goto err;
+
+  // reportingEntityName
+  if (yajl_gen_string(g, (u_char *)CONNECTIVITY_REPORTING_ENTITY_NAME_FIELD,
+                      strlen(CONNECTIVITY_REPORTING_ENTITY_NAME_FIELD)) !=
+      yajl_gen_status_ok)
+    goto err;
+
+  if (yajl_gen_string(g, (u_char *)CONNECTIVITY_REPORTING_ENTITY_NAME_VALUE,
+                      strlen(CONNECTIVITY_REPORTING_ENTITY_NAME_VALUE)) !=
+      yajl_gen_status_ok)
+    goto err;
+
+  // sequence
+  if (yajl_gen_string(g, (u_char *)CONNECTIVITY_SEQUENCE_FIELD,
+                      strlen(CONNECTIVITY_SEQUENCE_FIELD)) !=
+      yajl_gen_status_ok)
+    goto err;
+
+  if (yajl_gen_number(g, CONNECTIVITY_SEQUENCE_VALUE,
+                      strlen(CONNECTIVITY_SEQUENCE_VALUE)) !=
+      yajl_gen_status_ok)
+    goto err;
+
+  // sourceName
+  if (yajl_gen_string(g, (u_char *)CONNECTIVITY_SOURCE_NAME_FIELD,
+                      strlen(CONNECTIVITY_SOURCE_NAME_FIELD)) !=
+      yajl_gen_status_ok)
+    goto err;
+
+  if (yajl_gen_string(g, (u_char *)interface, strlen(interface)) !=
+      yajl_gen_status_ok)
+    goto err;
+
+  // startEpochMicrosec
+  if (yajl_gen_string(g, (u_char *)CONNECTIVITY_START_EPOCH_MICROSEC_FIELD,
+                      strlen(CONNECTIVITY_START_EPOCH_MICROSEC_FIELD)) !=
+      yajl_gen_status_ok)
+    goto err;
+
+  int start_epoch_microsec_len =
+      sizeof(char) * sizeof(long long unsigned int) * 4 + 1;
+  char *start_epoch_microsec_str = malloc(start_epoch_microsec_len);
+  snprintf(start_epoch_microsec_str, start_epoch_microsec_len, "%llu",
+           (long long unsigned int)timestamp);
+
+  if (yajl_gen_number(g, start_epoch_microsec_str,
+                      strlen(start_epoch_microsec_str)) != yajl_gen_status_ok) {
+    sfree(start_epoch_microsec_str);
+    goto err;
+  }
+
+  sfree(start_epoch_microsec_str);
+
+  // version
+  if (yajl_gen_string(g, (u_char *)CONNECTIVITY_VERSION_FIELD,
+                      strlen(CONNECTIVITY_VERSION_FIELD)) != yajl_gen_status_ok)
+    goto err;
+
+  if (yajl_gen_number(g, CONNECTIVITY_VERSION_VALUE,
+                      strlen(CONNECTIVITY_VERSION_VALUE)) != yajl_gen_status_ok)
+    goto err;
+
+  // *** END common event header ***
+
+  // *** BEGIN state change fields ***
+
+  if (yajl_gen_string(g, (u_char *)CONNECTIVITY_STATE_CHANGE_FIELDS_FIELD,
+                      strlen(CONNECTIVITY_STATE_CHANGE_FIELDS_FIELD)) !=
+      yajl_gen_status_ok)
+    goto err;
+
+  if (yajl_gen_map_open(g) != yajl_gen_status_ok)
+    goto err;
+
+  // newState
+  if (yajl_gen_string(g, (u_char *)CONNECTIVITY_NEW_STATE_FIELD,
+                      strlen(CONNECTIVITY_NEW_STATE_FIELD)) !=
+      yajl_gen_status_ok)
+    goto err;
+
+  int new_state_len =
+      (state == 0 ? strlen(CONNECTIVITY_NEW_STATE_FIELD_DOWN_VALUE)
+                  : strlen(CONNECTIVITY_NEW_STATE_FIELD_UP_VALUE));
+
+  if (yajl_gen_string(
+          g, (u_char *)(state == 0 ? CONNECTIVITY_NEW_STATE_FIELD_DOWN_VALUE
+                                   : CONNECTIVITY_NEW_STATE_FIELD_UP_VALUE),
+          new_state_len) != yajl_gen_status_ok)
+    goto err;
+
+  // oldState
+  if (yajl_gen_string(g, (u_char *)CONNECTIVITY_OLD_STATE_FIELD,
+                      strlen(CONNECTIVITY_OLD_STATE_FIELD)) !=
+      yajl_gen_status_ok)
+    goto err;
+
+  int old_state_len =
+      (old_state == 0 ? strlen(CONNECTIVITY_OLD_STATE_FIELD_DOWN_VALUE)
+                      : strlen(CONNECTIVITY_OLD_STATE_FIELD_UP_VALUE));
+
+  if (yajl_gen_string(
+          g, (u_char *)(old_state == 0 ? CONNECTIVITY_OLD_STATE_FIELD_DOWN_VALUE
+                                       : CONNECTIVITY_OLD_STATE_FIELD_UP_VALUE),
+          old_state_len) != yajl_gen_status_ok)
+    goto err;
+
+  // stateChangeFieldsVersion
+  if (yajl_gen_string(g,
+                      (u_char *)CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_FIELD,
+                      strlen(CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_FIELD)) !=
+      yajl_gen_status_ok)
+    goto err;
+
+  if (yajl_gen_number(g, CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_VALUE,
+                      strlen(CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_VALUE)) !=
+      yajl_gen_status_ok)
+    goto err;
+
+  // stateInterface
+  if (yajl_gen_string(g, (u_char *)CONNECTIVITY_STATE_INTERFACE_FIELD,
+                      strlen(CONNECTIVITY_STATE_INTERFACE_FIELD)) !=
+      yajl_gen_status_ok)
+    goto err;
+
+  if (yajl_gen_string(g, (u_char *)interface, strlen(interface)) !=
+      yajl_gen_status_ok)
+    goto err;
+
+  if (yajl_gen_map_close(g) != yajl_gen_status_ok)
+    goto err;
+
+  // *** END state change fields ***
+
+  if (yajl_gen_map_close(g) != yajl_gen_status_ok)
+    goto err;
+
+  if (yajl_gen_get_buf(g, &buf2, &len) != yajl_gen_status_ok)
+    goto err;
+
+  *buf = malloc(strlen((char *)buf2) + 1);
+
+  sstrncpy(*buf, (char *)buf2, strlen((char *)buf2) + 1);
+
+  yajl_gen_free(g);
+
+  return 0;
+
+err:
+  yajl_gen_free(g);
+  ERROR("connectivity plugin: gen_message_payload failed to generate JSON");
+  return -1;
+}
+
 static int connectivity_link_state(struct nlmsghdr *msg) {
   int retval = 0;
   struct ifinfomsg *ifi = mnl_nlmsg_get_payload(msg);
@@ -112,35 +414,29 @@ static int connectivity_link_state(struct nlmsghdr *msg) {
         break;
 
     if (il == NULL) {
-      INFO("connectivity plugin: Ignoring link state change for unmonitored "
-           "interface: %s",
-           dev);
+      DEBUG("connectivity plugin: Ignoring link state change for unmonitored "
+            "interface: %s",
+            dev);
     } else {
       uint32_t prev_status;
-      struct timeval tv;
 
-      gettimeofday(&tv, NULL);
-
-      unsigned long long millisecondsSinceEpoch =
-          (unsigned long long)(tv.tv_sec) * 1000 +
-          (unsigned long long)(tv.tv_usec) / 1000;
-
-      INFO("connectivity plugin (%llu): Interface %s status is now %s",
-           millisecondsSinceEpoch, dev,
-           ((ifi->ifi_flags & IFF_RUNNING) ? "UP" : "DOWN"));
       prev_status = il->status;
       il->status = ((ifi->ifi_flags & IFF_RUNNING) ? 1 : 0);
-      il->sec = tv.tv_sec;
-      il->usec = tv.tv_usec;
+      il->timestamp = (long long unsigned int)CDTIME_T_TO_US(cdtime());
+
       // If the new status is different than the previous status,
       // store the previous status and set sent to zero
       if (il->status != prev_status) {
         il->prev_status = prev_status;
         il->sent = 0;
       }
+
+      DEBUG("connectivity plugin (%llu): Interface %s status is now %s",
+            il->timestamp, dev,
+            ((ifi->ifi_flags & IFF_RUNNING) ? "UP" : "DOWN"));
     }
 
-    // no need to loop again, we found the interface name
+    // no need to loop again, we found the interface name attr
     // (otherwise the first if-statement in the loop would
     // have moved us on with 'continue')
     break;
@@ -210,8 +506,7 @@ static int read_event(struct mnl_socket *nl,
 
     /* Message is some kind of error */
     if (h->nlmsg_type == NLMSG_ERROR) {
-      ERROR("connectivity plugin: read_event: Message is an error - decode "
-            "TBD\n");
+      ERROR("connectivity plugin: read_event: Message is an error\n");
       return -1; // Error
     }
 
@@ -335,7 +630,7 @@ static int stop_thread(int shutdown) /* {{{ */
     // the case of a shutdown is just assures that the thread is
     // gone and that the process has been fully terminated.
 
-    INFO("connectivity plugin: Canceling thread for process shutdown");
+    DEBUG("connectivity plugin: Canceling thread for process shutdown");
 
     status = pthread_cancel(connectivity_thread_id);
 
@@ -356,7 +651,7 @@ static int stop_thread(int shutdown) /* {{{ */
   connectivity_thread_error = 0;
   pthread_mutex_unlock(&connectivity_lock);
 
-  INFO("connectivity plugin: Finished requesting stop of thread");
+  DEBUG("connectivity plugin: Finished requesting stop of thread");
 
   return (status);
 } /* }}} int stop_thread */
@@ -397,6 +692,7 @@ static int connectivity_config(const char *key, const char *value) /* {{{ */
     il->interface = interface;
     il->status = 2; // "unknown"
     il->prev_status = 2;
+    il->timestamp = (long long unsigned int)CDTIME_T_TO_US(cdtime());
     il->sent = 0;
     il->next = interfacelist_head;
     interfacelist_head = il;
@@ -408,46 +704,54 @@ static int connectivity_config(const char *key, const char *value) /* {{{ */
   return (0);
 } /* }}} int connectivity_config */
 
-static void submit(const char *interface, const char *type, /* {{{ */
-                   gauge_t value, uint32_t sec, uint32_t usec) {
-  value_list_t vl = VALUE_LIST_INIT;
+static void connectivity_dispatch_notification(
+    const char *interface, const char *type, /* {{{ */
+    gauge_t value, gauge_t old_value, long long unsigned int timestamp) {
+  char *buf = NULL;
+  notification_t n = {
+      NOTIF_FAILURE, cdtime(), "", "", "connectivity", "", "", "", NULL};
+
+  if (value == 1)
+    n.severity = NOTIF_OKAY;
+
   char hostname[1024];
-  vl.values = &(value_t){.gauge = value};
-  vl.values_len = 1;
-  sstrncpy(vl.plugin, "connectivity", sizeof(vl.plugin));
-  sstrncpy(vl.type_instance, interface, sizeof(vl.type_instance));
-  sstrncpy(vl.type, type, sizeof(vl.type));
-
-  // Create metadata to store JSON key-values
-  meta_data_t *meta = meta_data_create();
-
-  vl.meta = meta;
-  // For latency measurement
-  struct timeval tv;
-  gettimeofday(&tv, NULL);
   gethostname(hostname, sizeof(hostname));
-  char strSec[11];
-  char struSec[11];
-  snprintf(strSec, sizeof strSec, "%" PRIu32, sec);
-  snprintf(struSec, sizeof struSec, "%" PRIu32, usec);
-  if (value == 1) {
-    meta_data_add_string(meta, "condition", "interface_up");
-    meta_data_add_string(meta, "entity", interface);
-    meta_data_add_string(meta, "source", hostname);
-    meta_data_add_string(meta, "sec", strSec);
-    meta_data_add_string(meta, "usec", struSec);
-    meta_data_add_string(meta, "dest", "interface_down");
-  } else {
-    meta_data_add_string(meta, "condition", "interface_down");
-    meta_data_add_string(meta, "entity", interface);
-    meta_data_add_string(meta, "source", hostname);
-    meta_data_add_string(meta, "sec", strSec);
-    meta_data_add_string(meta, "usec", struSec);
-    meta_data_add_string(meta, "dest", "interface_up");
+
+  sstrncpy(n.host, hostname, sizeof(n.host));
+  sstrncpy(n.plugin_instance, interface, sizeof(n.plugin_instance));
+  sstrncpy(n.type, "gauge", sizeof(n.type));
+  sstrncpy(n.type_instance, "interface_status", sizeof(n.type_instance));
+
+  gen_message_payload(value, old_value, interface, timestamp, &buf);
+
+  notification_meta_t *m = calloc(1, sizeof(*m));
+
+  if (m == NULL) {
+    char errbuf[1024];
+    sfree(buf);
+    ERROR("connectivity plugin: unable to allocate metadata: %s",
+          sstrerror(errno, errbuf, sizeof(errbuf)));
+    return;
   }
 
-  plugin_dispatch_values(&vl);
-} /* }}} void interface_submit */
+  sstrncpy(m->name, "ves", sizeof(m->name));
+  m->nm_value.nm_string = sstrdup(buf);
+  m->type = NM_TYPE_STRING;
+  n.meta = m;
+
+  DEBUG("connectivity plugin: notification message: %s",
+        n.meta->nm_value.nm_string);
+
+  DEBUG("connectivity plugin: dispatching state %d for interface %s",
+        (int)value, interface);
+
+  plugin_dispatch_notification(&n);
+  plugin_notification_meta_free(n.meta);
+
+  // malloc'd in gen_message_payload
+  if (buf != NULL)
+    sfree(buf);
+}
 
 static int connectivity_read(void) /* {{{ */
 {
@@ -475,8 +779,6 @@ static int connectivity_read(void) /* {{{ */
     uint32_t prev_status;
     uint32_t sent;
 
-    /* Locking here works, because the structure of the linked list is only
-     * changed during configure and shutdown. */
     pthread_mutex_lock(&connectivity_lock);
 
     status = il->status;
@@ -484,8 +786,8 @@ static int connectivity_read(void) /* {{{ */
     sent = il->sent;
 
     if (status != prev_status && sent == 0) {
-      submit(il->interface, "gauge", status, il->sec, il->usec);
-
+      connectivity_dispatch_notification(il->interface, "gauge", status,
+                                         prev_status, il->timestamp);
       il->sent = 1;
     }
 
@@ -499,7 +801,7 @@ static int connectivity_shutdown(void) /* {{{ */
 {
   interfacelist_t *il;
 
-  INFO("connectivity plugin: Shutting down thread.");
+  DEBUG("connectivity plugin: Shutting down thread.");
   if (stop_thread(1) < 0)
     return (-1);