More styling + remove superfluous dequeue thread error logic
[collectd.git] / src / connectivity.c
index f6356f0..ec2f7e3 100644 (file)
@@ -120,13 +120,13 @@ static int connectivity_netlink_thread_loop = 0;
 static int connectivity_netlink_thread_error = 0;
 static pthread_t connectivity_netlink_thread_id;
 static int connectivity_dequeue_thread_loop = 0;
-static int connectivity_dequeue_thread_error = 0;
 static pthread_t connectivity_dequeue_thread_id;
-static pthread_mutex_t connectivity_lock = PTHREAD_MUTEX_INITIALIZER;
+static pthread_mutex_t connectivity_threads_lock = PTHREAD_MUTEX_INITIALIZER;
+static pthread_mutex_t connectivity_data_lock = PTHREAD_MUTEX_INITIALIZER;
 static pthread_cond_t connectivity_cond = PTHREAD_COND_INITIALIZER;
-// static struct mnl_socket *sock;
 static int nl_sock = -1;
 static int event_id = 0;
+static int unsent_statuses = 0;
 
 static const char *config_keys[] = {"Interface", "IgnoreSelected"};
 static int config_keys_num = STATIC_ARRAY_SIZE(config_keys);
@@ -378,17 +378,14 @@ static int gen_message_payload(int state, int old_state, const char *interface,
   if (yajl_gen_get_buf(g, &buf2, &len) != yajl_gen_status_ok)
     goto err;
 
-  *buf = malloc(strlen((char *)buf2) + 1);
+  *buf = strdup((char *)buf2);
 
   if (*buf == NULL) {
-    char errbuf[1024];
-    ERROR("connectivity plugin: malloc failed during gen_message_payload: %s",
-          sstrerror(errno, errbuf, sizeof(errbuf)));
+    ERROR("connectivity plugin: strdup failed during gen_message_payload: %s",
+          STRERRNO);
     goto err;
   }
 
-  sstrncpy(*buf, (char *)buf2, strlen((char *)buf2) + 1);
-
   yajl_gen_free(g);
 
   return 0;
@@ -401,23 +398,19 @@ err:
 
 static interface_list_t *add_interface(const char *interface, int status,
                                        int prev_status) {
-  interface_list_t *il;
-  char *interface2;
+  interface_list_t *il = calloc(1, sizeof(*il));
 
-  il = malloc(sizeof(*il));
   if (il == NULL) {
-    char errbuf[1024];
-    ERROR("connectivity plugin: malloc failed during add_interface: %s",
-          sstrerror(errno, errbuf, sizeof(errbuf)));
+    ERROR("connectivity plugin: calloc failed during add_interface: %s",
+          STRERRNO);
     return NULL;
   }
 
-  interface2 = strdup(interface);
+  char *interface2 = strdup(interface);
   if (interface2 == NULL) {
-    char errbuf[1024];
     sfree(il);
     ERROR("connectivity plugin: strdup failed during add_interface: %s",
-          sstrerror(errno, errbuf, sizeof(errbuf)));
+          STRERRNO);
     return NULL;
   }
 
@@ -435,14 +428,10 @@ static interface_list_t *add_interface(const char *interface, int status,
 }
 
 static int connectivity_link_state(struct nlmsghdr *msg) {
-  int retval = 0;
-  struct ifinfomsg *ifi = mnl_nlmsg_get_payload(msg);
-  struct nlattr *attr;
-  const char *dev = NULL;
+  pthread_mutex_lock(&connectivity_data_lock);
 
-  pthread_mutex_lock(&connectivity_lock);
-
-  interface_list_t *il = NULL;
+  struct nlattr *attr;
+  struct ifinfomsg *ifi = mnl_nlmsg_get_payload(msg);
 
   /* Scan attribute list for device name. */
   mnl_attr_for_each(attr, msg, sizeof(*ifi)) {
@@ -453,11 +442,11 @@ static int connectivity_link_state(struct nlmsghdr *msg) {
       ERROR("connectivity plugin: connectivity_link_state: IFLA_IFNAME "
             "mnl_attr_validate "
             "failed.");
-      pthread_mutex_unlock(&connectivity_lock);
+      pthread_mutex_unlock(&connectivity_data_lock);
       return MNL_CB_ERROR;
     }
 
-    dev = mnl_attr_get_str(attr);
+    const char *dev = mnl_attr_get_str(attr);
 
     // Check the list of interfaces we should monitor, if we've chosen
     // a subset.  If we don't care about this one, abort.
@@ -468,12 +457,12 @@ static int connectivity_link_state(struct nlmsghdr *msg) {
       break;
     }
 
+    interface_list_t *il = NULL;
+
     for (il = interface_list_head; il != NULL; il = il->next)
       if (strcmp(dev, il->interface) == 0)
         break;
 
-    uint32_t prev_status;
-
     if (il == NULL) {
       // We haven't encountered this interface yet, so add it to the linked list
       il = add_interface(dev, LINK_STATE_UNKNOWN, LINK_STATE_UNKNOWN);
@@ -486,16 +475,20 @@ static int connectivity_link_state(struct nlmsghdr *msg) {
       }
     }
 
+    uint32_t prev_status;
+
     prev_status = il->status;
     il->status =
         ((ifi->ifi_flags & IFF_RUNNING) ? LINK_STATE_UP : LINK_STATE_DOWN);
     il->timestamp = (long long unsigned int)CDTIME_T_TO_US(cdtime());
 
     // If the new status is different than the previous status,
-    // store the previous status and set sent to zero
+    // store the previous status and set sent to zero, and set the
+    // global flag to indicate there are statuses to dispatch
     if (il->status != prev_status) {
       il->prev_status = prev_status;
       il->sent = 0;
+      unsent_statuses = 1;
     }
 
     DEBUG("connectivity plugin (%llu): Interface %s status is now %s",
@@ -507,9 +500,9 @@ static int connectivity_link_state(struct nlmsghdr *msg) {
     break;
   }
 
-  pthread_mutex_unlock(&connectivity_lock);
+  pthread_mutex_unlock(&connectivity_data_lock);
 
-  return retval;
+  return 0;
 }
 
 static int msg_handler(struct nlmsghdr *msg) {
@@ -525,39 +518,32 @@ static int msg_handler(struct nlmsghdr *msg) {
     connectivity_link_state(msg);
     break;
   default:
-    ERROR("connectivity plugin: msg_handler: Unknown netlink nlmsg_type %d\n",
+    ERROR("connectivity plugin: msg_handler: Unknown netlink nlmsg_type %d",
           msg->nlmsg_type);
     break;
   }
   return 0;
 }
 
-// static int read_event(struct mnl_socket *nl,
-//                       int (*msg_handler)(struct nlmsghdr *)) {
 static int read_event(int nl, int (*msg_handler)(struct nlmsghdr *)) {
-  int status;
   int ret = 0;
-  char buf[4096];
-  struct nlmsghdr *h;
   int recv_flags = MSG_DONTWAIT;
 
-  // if (nl == NULL)
-  //   return ret;
-
   if (nl == -1)
     return ret;
 
   while (42) {
-    pthread_mutex_lock(&connectivity_lock);
+    pthread_mutex_lock(&connectivity_threads_lock);
 
     if (connectivity_netlink_thread_loop <= 0) {
-      pthread_mutex_unlock(&connectivity_lock);
+      pthread_mutex_unlock(&connectivity_threads_lock);
       return ret;
     }
 
-    pthread_mutex_unlock(&connectivity_lock);
+    pthread_mutex_unlock(&connectivity_threads_lock);
 
-    status = recv(nl, buf, sizeof(buf), recv_flags);
+    char buf[4096];
+    int status = recv(nl, buf, sizeof(buf), recv_flags);
 
     if (status < 0) {
 
@@ -566,19 +552,14 @@ static int read_event(int nl, int (*msg_handler)(struct nlmsghdr *)) {
       // any saved interface status changes.  Then continue, but
       // block and wait for new messages
       if (errno == EWOULDBLOCK || errno == EAGAIN) {
-        pthread_mutex_lock(&connectivity_lock);
         pthread_cond_signal(&connectivity_cond);
-        pthread_mutex_unlock(&connectivity_lock);
 
         recv_flags = 0;
         continue;
       }
 
       /* Anything else is an error */
-      // ERROR("connectivity plugin: read_event: Error mnl_socket_recvfrom:
-      // %d\n",
-      //       status);
-      ERROR("connectivity plugin: read_event: Error recv: %d\n", status);
+      ERROR("connectivity plugin: read_event: Error recv: %d", status);
       return status;
     }
 
@@ -588,19 +569,19 @@ static int read_event(int nl, int (*msg_handler)(struct nlmsghdr *)) {
     recv_flags = MSG_DONTWAIT;
 
     if (status == 0) {
-      DEBUG("connectivity plugin: read_event: EOF\n");
+      DEBUG("connectivity plugin: read_event: EOF");
     }
 
     /* We need to handle more than one message per 'recvmsg' */
-    for (h = (struct nlmsghdr *)buf; NLMSG_OK(h, (unsigned int)status);
-         h = NLMSG_NEXT(h, status)) {
+    for (struct nlmsghdr *h = (struct nlmsghdr *)buf;
+         NLMSG_OK(h, (unsigned int)status); h = NLMSG_NEXT(h, status)) {
       /* Finish reading */
       if (h->nlmsg_type == NLMSG_DONE)
         return ret;
 
       /* Message is some kind of error */
       if (h->nlmsg_type == NLMSG_ERROR) {
-        ERROR("connectivity plugin: read_event: Message is an error\n");
+        ERROR("connectivity plugin: read_event: Message is an error");
         return -1; // Error
       }
 
@@ -608,12 +589,12 @@ static int read_event(int nl, int (*msg_handler)(struct nlmsghdr *)) {
       if (msg_handler) {
         ret = (*msg_handler)(h);
         if (ret < 0) {
-          ERROR("connectivity plugin: read_event: Message handler error %d\n",
+          ERROR("connectivity plugin: read_event: Message handler error %d",
                 ret);
           return ret;
         }
       } else {
-        ERROR("connectivity plugin: read_event: Error NULL message handler\n");
+        ERROR("connectivity plugin: read_event: Error NULL message handler");
         return -1;
       }
     }
@@ -622,6 +603,7 @@ static int read_event(int nl, int (*msg_handler)(struct nlmsghdr *)) {
   return ret;
 }
 
+// NOTE: Caller MUST hold connectivity_data_lock when calling this function
 static void send_interface_status() {
   for (interface_list_t *il = interface_list_head; il != NULL;
        il = il->next) /* {{{ */
@@ -640,102 +622,75 @@ static void send_interface_status() {
       il->sent = 1;
     }
   } /* }}} for (il = interface_list_head; il != NULL; il = il->next) */
+
+  unsent_statuses = 0;
 }
 
-static int read_interface_status() /* {{{ */
+static void read_interface_status() /* {{{ */
 {
-  pthread_mutex_lock(&connectivity_lock);
-
-  // This first attempt is necessary because the netlink thread
-  // might have held the lock while this thread was blocked on
-  // the lock acquisition just above.  And while the netlink thread
-  // had the lock, it could have called pthread_cond_singal, which
-  // obviously wouldn't have woken this thread, since this thread
-  // was not yet waiting on the condition signal.  So we need to
-  // loop through the interfaces and check if any have changed
-  // status before we wait on the condition signal
-  send_interface_status();
+  pthread_mutex_lock(&connectivity_data_lock);
 
-  pthread_cond_wait(&connectivity_cond, &connectivity_lock);
+  if (!unsent_statuses)
+    pthread_cond_wait(&connectivity_cond, &connectivity_data_lock);
 
   send_interface_status();
 
-  pthread_mutex_unlock(&connectivity_lock);
-
-  return 0;
+  pthread_mutex_unlock(&connectivity_data_lock);
 } /* }}} int *read_interface_status */
 
 static void *connectivity_netlink_thread(void *arg) /* {{{ */
 {
-  pthread_mutex_lock(&connectivity_lock);
+  pthread_mutex_lock(&connectivity_threads_lock);
 
   while (connectivity_netlink_thread_loop > 0) {
-    int status;
+    pthread_mutex_unlock(&connectivity_threads_lock);
 
-    pthread_mutex_unlock(&connectivity_lock);
+    int status = read_event(nl_sock, msg_handler);
 
-    status = read_event(nl_sock, msg_handler);
-
-    pthread_mutex_lock(&connectivity_lock);
+    pthread_mutex_lock(&connectivity_threads_lock);
 
     if (status < 0) {
       connectivity_netlink_thread_error = 1;
       break;
     }
-
-    if (connectivity_netlink_thread_loop <= 0)
-      break;
   } /* while (connectivity_netlink_thread_loop > 0) */
 
-  pthread_mutex_unlock(&connectivity_lock);
+  pthread_mutex_unlock(&connectivity_threads_lock);
 
-  return ((void *)0);
+  return (void *)0;
 } /* }}} void *connectivity_netlink_thread */
 
 static void *connectivity_dequeue_thread(void *arg) /* {{{ */
 {
-  pthread_mutex_lock(&connectivity_lock);
+  pthread_mutex_lock(&connectivity_threads_lock);
 
   while (connectivity_dequeue_thread_loop > 0) {
-    int status;
-
-    pthread_mutex_unlock(&connectivity_lock);
-
-    status = read_interface_status();
-
-    pthread_mutex_lock(&connectivity_lock);
+    pthread_mutex_unlock(&connectivity_threads_lock);
 
-    if (status < 0) {
-      connectivity_dequeue_thread_error = 1;
-      break;
-    }
+    read_interface_status();
 
-    if (connectivity_dequeue_thread_loop <= 0)
-      break;
+    pthread_mutex_lock(&connectivity_threads_lock);
   } /* while (connectivity_dequeue_thread_loop > 0) */
 
-  pthread_mutex_unlock(&connectivity_lock);
+  pthread_mutex_unlock(&connectivity_threads_lock);
 
   return ((void *)0);
 } /* }}} void *connectivity_dequeue_thread */
 
 static int nl_connect() {
-  int rc;
-  struct sockaddr_nl sa_nl;
+  struct sockaddr_nl sa_nl = {
+      .nl_family = AF_NETLINK, .nl_groups = RTMGRP_LINK, .nl_pid = getpid(),
+  };
 
   nl_sock = socket(AF_NETLINK, SOCK_DGRAM, NETLINK_ROUTE);
   if (nl_sock == -1) {
-    ERROR("connectivity plugin: socket open failed: %d", errno);
+    ERROR("connectivity plugin: socket open failed: %s", STRERRNO);
     return -1;
   }
 
-  sa_nl.nl_family = AF_NETLINK;
-  sa_nl.nl_groups = RTMGRP_LINK;
-  sa_nl.nl_pid = getpid();
-
-  rc = bind(nl_sock, (struct sockaddr *)&sa_nl, sizeof(sa_nl));
+  int rc = bind(nl_sock, (struct sockaddr *)&sa_nl, sizeof(sa_nl));
   if (rc == -1) {
-    ERROR("connectivity plugin: socket bind failed: %d", errno);
+    ERROR("connectivity plugin: socket bind failed: %s", STRERRNO);
     close(nl_sock);
     return -1;
   }
@@ -745,23 +700,25 @@ static int nl_connect() {
 
 static int start_netlink_thread(void) /* {{{ */
 {
-  int status;
-
-  pthread_mutex_lock(&connectivity_lock);
+  pthread_mutex_lock(&connectivity_threads_lock);
 
   if (connectivity_netlink_thread_loop != 0) {
-    pthread_mutex_unlock(&connectivity_lock);
-    return (0);
+    pthread_mutex_unlock(&connectivity_threads_lock);
+    return 0;
   }
 
   connectivity_netlink_thread_loop = 1;
   connectivity_netlink_thread_error = 0;
 
+  int status;
+
   if (nl_sock == -1) {
     status = nl_connect();
 
-    if (status != 0)
+    if (status != 0) {
+      pthread_mutex_unlock(&connectivity_threads_lock);
       return status;
+    }
   }
 
   status = plugin_thread_create(&connectivity_netlink_thread_id,
@@ -770,61 +727,57 @@ static int start_netlink_thread(void) /* {{{ */
   if (status != 0) {
     connectivity_netlink_thread_loop = 0;
     ERROR("connectivity plugin: Starting thread failed.");
-    pthread_mutex_unlock(&connectivity_lock);
+    pthread_mutex_unlock(&connectivity_threads_lock);
 
     int status2 = close(nl_sock);
 
     if (status2 != 0) {
       ERROR("connectivity plugin: failed to close socket %d: %d (%s)", nl_sock,
-            status2, strerror(errno));
+            status2, STRERRNO);
     } else
       nl_sock = -1;
 
-    return (-1);
+    return -1;
   }
 
-  pthread_mutex_unlock(&connectivity_lock);
+  pthread_mutex_unlock(&connectivity_threads_lock);
 
   return status;
 }
 
 static int start_dequeue_thread(void) /* {{{ */
 {
-  int status;
-
-  pthread_mutex_lock(&connectivity_lock);
+  pthread_mutex_lock(&connectivity_threads_lock);
 
   if (connectivity_dequeue_thread_loop != 0) {
-    pthread_mutex_unlock(&connectivity_lock);
-    return (0);
+    pthread_mutex_unlock(&connectivity_threads_lock);
+    return 0;
   }
 
   connectivity_dequeue_thread_loop = 1;
-  connectivity_dequeue_thread_error = 0;
 
-  status = plugin_thread_create(&connectivity_dequeue_thread_id,
-                                /* attr = */ NULL, connectivity_dequeue_thread,
-                                /* arg = */ (void *)0, "connectivity");
+  int status =
+      plugin_thread_create(&connectivity_dequeue_thread_id,
+                           /* attr = */ NULL, connectivity_dequeue_thread,
+                           /* arg = */ (void *)0, "connectivity");
   if (status != 0) {
     connectivity_dequeue_thread_loop = 0;
     ERROR("connectivity plugin: Starting dequeue thread failed.");
-    pthread_mutex_unlock(&connectivity_lock);
-    return (-1);
+    pthread_mutex_unlock(&connectivity_threads_lock);
+    return -1;
   }
 
-  pthread_mutex_unlock(&connectivity_lock);
+  pthread_mutex_unlock(&connectivity_threads_lock);
 
   return status;
 } /* }}} int start_dequeue_thread */
 
 static int start_threads(void) /* {{{ */
 {
-  int status, status2;
-
-  status = start_netlink_thread();
-  status2 = start_dequeue_thread();
+  int status = start_netlink_thread();
+  int status2 = start_dequeue_thread();
 
-  if (status < 0)
+  if (status != 0)
     return status;
   else
     return status2;
@@ -832,28 +785,35 @@ static int start_threads(void) /* {{{ */
 
 static int stop_netlink_thread(int shutdown) /* {{{ */
 {
-  int status;
+  int socket_status;
 
   if (nl_sock != -1) {
-    status = close(nl_sock);
-    if (status != 0) {
+    socket_status = close(nl_sock);
+    if (socket_status != 0) {
       ERROR("connectivity plugin: failed to close socket %d: %d (%s)", nl_sock,
-            status, strerror(errno));
-      return (-1);
+            socket_status, STRERRNO);
     } else
       nl_sock = -1;
-  }
+  } else
+    socket_status = 0;
 
-  pthread_mutex_lock(&connectivity_lock);
+  pthread_mutex_lock(&connectivity_threads_lock);
 
   if (connectivity_netlink_thread_loop == 0) {
-    pthread_mutex_unlock(&connectivity_lock);
-    return (-1);
+    pthread_mutex_unlock(&connectivity_threads_lock);
+    // Thread has already been terminated, nothing more to attempt
+    return socket_status;
   }
 
+  // Set thread termination status
   connectivity_netlink_thread_loop = 0;
+  pthread_mutex_unlock(&connectivity_threads_lock);
+
+  // Let threads waiting on access to the interface list know to move
+  // on such that they'll see the thread's termination status
   pthread_cond_broadcast(&connectivity_cond);
-  pthread_mutex_unlock(&connectivity_lock);
+
+  int thread_status;
 
   if (shutdown == 1) {
     // Since the thread is blocking, calling pthread_join
@@ -869,47 +829,57 @@ static int stop_netlink_thread(int shutdown) /* {{{ */
 
     DEBUG("connectivity plugin: Canceling netlink thread for process shutdown");
 
-    status = pthread_cancel(connectivity_netlink_thread_id);
+    thread_status = pthread_cancel(connectivity_netlink_thread_id);
 
-    if (status != 0 && status != ESRCH) {
-      ERROR("connectivity plugin: Unable to cancel netlink thread: %d", status);
-      status = -1;
+    if (thread_status != 0 && thread_status != ESRCH) {
+      ERROR("connectivity plugin: Unable to cancel netlink thread: %d",
+            thread_status);
+      thread_status = -1;
     } else
-      status = 0;
+      thread_status = 0;
   } else {
-    status = pthread_join(connectivity_netlink_thread_id, /* return = */ NULL);
-    if (status != 0 && status != ESRCH) {
-      ERROR("connectivity plugin: Stopping netlink thread failed.");
-      status = -1;
+    thread_status =
+        pthread_join(connectivity_netlink_thread_id, /* return = */ NULL);
+    if (thread_status != 0 && thread_status != ESRCH) {
+      ERROR("connectivity plugin: Stopping netlink thread failed: %d",
+            thread_status);
+      thread_status = -1;
     } else
-      return 0;
+      thread_status = 0;
   }
 
-  pthread_mutex_lock(&connectivity_lock);
+  pthread_mutex_lock(&connectivity_threads_lock);
   memset(&connectivity_netlink_thread_id, 0,
          sizeof(connectivity_netlink_thread_id));
   connectivity_netlink_thread_error = 0;
-  pthread_mutex_unlock(&connectivity_lock);
+  pthread_mutex_unlock(&connectivity_threads_lock);
 
   DEBUG("connectivity plugin: Finished requesting stop of netlink thread");
 
-  return status;
+  if (socket_status != 0)
+    return socket_status;
+  else
+    return thread_status;
 }
 
 static int stop_dequeue_thread(int shutdown) /* {{{ */
 {
-  int status;
-
-  pthread_mutex_lock(&connectivity_lock);
+  pthread_mutex_lock(&connectivity_threads_lock);
 
   if (connectivity_dequeue_thread_loop == 0) {
-    pthread_mutex_unlock(&connectivity_lock);
-    return (-1);
+    pthread_mutex_unlock(&connectivity_threads_lock);
+    return -1;
   }
 
+  // Set thread termination status
   connectivity_dequeue_thread_loop = 0;
+  pthread_mutex_unlock(&connectivity_threads_lock);
+
+  // Let threads waiting on access to the interface list know to move
+  // on such that they'll see the threads termination status
   pthread_cond_broadcast(&connectivity_cond);
-  pthread_mutex_unlock(&connectivity_lock);
+
+  int status;
 
   if (shutdown == 1) {
     // Calling pthread_cancel here in
@@ -934,25 +904,22 @@ static int stop_dequeue_thread(int shutdown) /* {{{ */
       status = 0;
   }
 
-  pthread_mutex_lock(&connectivity_lock);
+  pthread_mutex_lock(&connectivity_threads_lock);
   memset(&connectivity_dequeue_thread_id, 0,
          sizeof(connectivity_dequeue_thread_id));
-  connectivity_dequeue_thread_error = 0;
-  pthread_mutex_unlock(&connectivity_lock);
+  pthread_mutex_unlock(&connectivity_threads_lock);
 
   DEBUG("connectivity plugin: Finished requesting stop of dequeue thread");
 
-  return (status);
+  return status;
 } /* }}} int stop_dequeue_thread */
 
 static int stop_threads(int shutdown) /* {{{ */
 {
-  int status, status2;
-
-  status = stop_netlink_thread(shutdown);
-  status2 = stop_dequeue_thread(shutdown);
+  int status = stop_netlink_thread(shutdown);
+  int status2 = stop_dequeue_thread(shutdown);
 
-  if (status < 0)
+  if (status != 0)
     return status;
   else
     return status2;
@@ -965,7 +932,7 @@ static int connectivity_init(void) /* {{{ */
            "be monitored");
   }
 
-  return (start_threads());
+  return start_threads();
 } /* }}} int connectivity_init */
 
 static int connectivity_config(const char *key, const char *value) /* {{{ */
@@ -983,37 +950,41 @@ static int connectivity_config(const char *key, const char *value) /* {{{ */
       invert = 0;
     ignorelist_set_invert(ignorelist, invert);
   } else {
-    return (-1);
+    return -1;
   }
 
-  return (0);
+  return 0;
 } /* }}} int connectivity_config */
 
 static void
 connectivity_dispatch_notification(const char *interface, const char *type,
                                    gauge_t value, gauge_t old_value,
                                    long long unsigned int timestamp) {
-  char *buf = NULL;
-  notification_t n = {
-      NOTIF_FAILURE, cdtime(), "", "", "connectivity", "", "", "", NULL};
 
-  if (value == LINK_STATE_UP)
-    n.severity = NOTIF_OKAY;
+  notification_t n = {(value == LINK_STATE_UP ? NOTIF_OKAY : NOTIF_FAILURE),
+                      cdtime(),
+                      "",
+                      "",
+                      "connectivity",
+                      "",
+                      "",
+                      "",
+                      NULL};
 
   sstrncpy(n.host, hostname_g, sizeof(n.host));
   sstrncpy(n.plugin_instance, interface, sizeof(n.plugin_instance));
   sstrncpy(n.type, "gauge", sizeof(n.type));
   sstrncpy(n.type_instance, "interface_status", sizeof(n.type_instance));
 
+  char *buf = NULL;
+
   gen_message_payload(value, old_value, interface, timestamp, &buf);
 
   notification_meta_t *m = calloc(1, sizeof(*m));
 
   if (m == NULL) {
-    char errbuf[1024];
     sfree(buf);
-    ERROR("connectivity plugin: unable to allocate metadata: %s",
-          sstrerror(errno, errbuf, sizeof(errbuf)));
+    ERROR("connectivity plugin: unable to allocate metadata: %s", STRERRNO);
     return;
   }
 
@@ -1031,18 +1002,18 @@ connectivity_dispatch_notification(const char *interface, const char *type,
   plugin_dispatch_notification(&n);
   plugin_notification_meta_free(n.meta);
 
-  // malloc'd in gen_message_payload
+  // strdup'd in gen_message_payload
   if (buf != NULL)
     sfree(buf);
 }
 
 static int connectivity_read(void) /* {{{ */
 {
-  pthread_mutex_lock(&connectivity_lock);
+  pthread_mutex_lock(&connectivity_threads_lock);
 
   if (connectivity_netlink_thread_error != 0) {
 
-    pthread_mutex_unlock(&connectivity_lock);
+    pthread_mutex_unlock(&connectivity_threads_lock);
 
     ERROR("connectivity plugin: The netlink thread had a problem. Restarting "
           "it.");
@@ -1058,37 +1029,21 @@ static int connectivity_read(void) /* {{{ */
 
     start_netlink_thread();
 
-    return (-1);
+    return -1;
   } /* if (connectivity_netlink_thread_error != 0) */
 
-  if (connectivity_dequeue_thread_error != 0) {
+  pthread_mutex_unlock(&connectivity_threads_lock);
 
-    pthread_mutex_unlock(&connectivity_lock);
-
-    ERROR("connectivity plugin: The dequeue thread had a problem. Restarting "
-          "it.");
-
-    stop_dequeue_thread(0);
-
-    start_dequeue_thread();
-
-    return (-1);
-  } /* if (connectivity_dequeue_thread_error != 0) */
-
-  pthread_mutex_unlock(&connectivity_lock);
-
-  return (0);
+  return 0;
 } /* }}} int connectivity_read */
 
 static int connectivity_shutdown(void) /* {{{ */
 {
-  interface_list_t *il;
-
   DEBUG("connectivity plugin: Shutting down thread.");
-  if (stop_threads(1) < 0)
-    return (-1);
 
-  il = interface_list_head;
+  int status = stop_threads(1);
+
+  interface_list_t *il = interface_list_head;
   while (il != NULL) {
     interface_list_t *il_next;
 
@@ -1102,7 +1057,7 @@ static int connectivity_shutdown(void) /* {{{ */
 
   ignorelist_free(ignorelist);
 
-  return (0);
+  return status;
 } /* }}} int connectivity_shutdown */
 
 void module_register(void) {