Merge pull request #3329 from efuss/fix-3311
[collectd.git] / src / connectivity.c
index c470c99..45b65aa 100644 (file)
 
 #include "collectd.h"
 
-#include "common.h"
 #include "plugin.h"
+#include "utils/common/common.h"
+#include "utils/ignorelist/ignorelist.h"
 #include "utils_complain.h"
-#include "utils_ignorelist.h"
 
 #include <asm/types.h>
 #include <errno.h>
@@ -101,7 +101,7 @@ struct interface_list_s {
   uint32_t status;
   uint32_t prev_status;
   uint32_t sent;
-  long long unsigned int timestamp;
+  cdtime_t timestamp;
 
   struct interface_list_s *next;
 };
@@ -126,34 +126,23 @@ static pthread_mutex_t connectivity_data_lock = PTHREAD_MUTEX_INITIALIZER;
 static pthread_cond_t connectivity_cond = PTHREAD_COND_INITIALIZER;
 static int nl_sock = -1;
 static int event_id = 0;
-static int unsent_statuses = 0;
+static int statuses_to_send = 0;
 
 static const char *config_keys[] = {"Interface", "IgnoreSelected"};
 static int config_keys_num = STATIC_ARRAY_SIZE(config_keys);
 
 /*
- * Prototype
- */
-
-static void
-connectivity_dispatch_notification(const char *interface, const char *type,
-                                   gauge_t value, gauge_t old_value,
-                                   long long unsigned int timestamp);
-
-/*
  * Private functions
  */
 
 static int gen_message_payload(int state, int old_state, const char *interface,
-                               long long unsigned int timestamp, char **buf) {
+                               cdtime_t timestamp, char **buf) {
   const unsigned char *buf2;
   yajl_gen g;
   char json_str[DATA_MAX_NAME_LEN];
 
 #if !defined(HAVE_YAJL_V2)
-  yajl_gen_config conf = {};
-
-  conf.beautify = 0;
+  yajl_gen_config conf = {0};
 #endif
 
 #if HAVE_YAJL_V2
@@ -188,9 +177,9 @@ static int gen_message_payload(int state, int old_state, const char *interface,
     goto err;
 
   event_id = event_id + 1;
-  int event_id_len = sizeof(char) * sizeof(int) * 4 + 1;
-  memset(json_str, '\0', DATA_MAX_NAME_LEN);
-  snprintf(json_str, event_id_len, "%d", event_id);
+  if (snprintf(json_str, sizeof(json_str), "%d", event_id) < 0) {
+    goto err;
+  }
 
   if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) {
     goto err;
@@ -202,15 +191,11 @@ static int gen_message_payload(int state, int old_state, const char *interface,
       yajl_gen_status_ok)
     goto err;
 
-  int event_name_len = 0;
-  event_name_len = event_name_len + strlen(interface);    // interface name
-  event_name_len = event_name_len + (state == 0 ? 4 : 2); // "down" or "up"
-  event_name_len =
-      event_name_len + 12; // "interface", 2 spaces and null-terminator
-  memset(json_str, '\0', DATA_MAX_NAME_LEN);
-  snprintf(json_str, event_name_len, "interface %s %s", interface,
-           (state == 0 ? CONNECTIVITY_EVENT_NAME_DOWN_VALUE
-                       : CONNECTIVITY_EVENT_NAME_UP_VALUE));
+  if (snprintf(json_str, sizeof(json_str), "interface %s %s", interface,
+               (state == 0 ? CONNECTIVITY_EVENT_NAME_DOWN_VALUE
+                           : CONNECTIVITY_EVENT_NAME_UP_VALUE)) < 0) {
+    goto err;
+  }
 
   if (yajl_gen_string(g, (u_char *)json_str, strlen(json_str)) !=
       yajl_gen_status_ok) {
@@ -223,11 +208,10 @@ static int gen_message_payload(int state, int old_state, const char *interface,
       yajl_gen_status_ok)
     goto err;
 
-  int last_epoch_microsec_len =
-      sizeof(char) * sizeof(long long unsigned int) * 4 + 1;
-  memset(json_str, '\0', DATA_MAX_NAME_LEN);
-  snprintf(json_str, last_epoch_microsec_len, "%llu",
-           (long long unsigned int)CDTIME_T_TO_US(cdtime()));
+  if (snprintf(json_str, sizeof(json_str), "%" PRIu64,
+               CDTIME_T_TO_US(cdtime())) < 0) {
+    goto err;
+  }
 
   if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) {
     goto err;
@@ -282,11 +266,10 @@ static int gen_message_payload(int state, int old_state, const char *interface,
       yajl_gen_status_ok)
     goto err;
 
-  int start_epoch_microsec_len =
-      sizeof(char) * sizeof(long long unsigned int) * 4 + 1;
-  memset(json_str, '\0', DATA_MAX_NAME_LEN);
-  snprintf(json_str, start_epoch_microsec_len, "%llu",
-           (long long unsigned int)timestamp);
+  if (snprintf(json_str, sizeof(json_str), "%" PRIu64,
+               CDTIME_T_TO_US(timestamp)) < 0) {
+    goto err;
+  }
 
   if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) {
     goto err;
@@ -323,10 +306,11 @@ static int gen_message_payload(int state, int old_state, const char *interface,
       (state == 0 ? strlen(CONNECTIVITY_NEW_STATE_FIELD_DOWN_VALUE)
                   : strlen(CONNECTIVITY_NEW_STATE_FIELD_UP_VALUE));
 
-  if (yajl_gen_string(
-          g, (u_char *)(state == 0 ? CONNECTIVITY_NEW_STATE_FIELD_DOWN_VALUE
-                                   : CONNECTIVITY_NEW_STATE_FIELD_UP_VALUE),
-          new_state_len) != yajl_gen_status_ok)
+  if (yajl_gen_string(g,
+                      (u_char *)(state == 0
+                                     ? CONNECTIVITY_NEW_STATE_FIELD_DOWN_VALUE
+                                     : CONNECTIVITY_NEW_STATE_FIELD_UP_VALUE),
+                      new_state_len) != yajl_gen_status_ok)
     goto err;
 
   // oldState
@@ -339,10 +323,11 @@ static int gen_message_payload(int state, int old_state, const char *interface,
       (old_state == 0 ? strlen(CONNECTIVITY_OLD_STATE_FIELD_DOWN_VALUE)
                       : strlen(CONNECTIVITY_OLD_STATE_FIELD_UP_VALUE));
 
-  if (yajl_gen_string(
-          g, (u_char *)(old_state == 0 ? CONNECTIVITY_OLD_STATE_FIELD_DOWN_VALUE
-                                       : CONNECTIVITY_OLD_STATE_FIELD_UP_VALUE),
-          old_state_len) != yajl_gen_status_ok)
+  if (yajl_gen_string(g,
+                      (u_char *)(old_state == 0
+                                     ? CONNECTIVITY_OLD_STATE_FIELD_DOWN_VALUE
+                                     : CONNECTIVITY_OLD_STATE_FIELD_UP_VALUE),
+                      old_state_len) != yajl_gen_status_ok)
     goto err;
 
   // stateChangeFieldsVersion
@@ -367,14 +352,13 @@ static int gen_message_payload(int state, int old_state, const char *interface,
       yajl_gen_status_ok)
     goto err;
 
-  if (yajl_gen_map_close(g) != yajl_gen_status_ok)
+  // close state change and header fields
+  if (yajl_gen_map_close(g) != yajl_gen_status_ok ||
+      yajl_gen_map_close(g) != yajl_gen_status_ok)
     goto err;
 
   // *** END state change fields ***
 
-  if (yajl_gen_map_close(g) != yajl_gen_status_ok)
-    goto err;
-
   if (yajl_gen_get_buf(g, &buf2, &len) != yajl_gen_status_ok)
     goto err;
 
@@ -417,7 +401,7 @@ static interface_list_t *add_interface(const char *interface, int status,
   il->interface = interface2;
   il->status = status;
   il->prev_status = prev_status;
-  il->timestamp = (long long unsigned int)CDTIME_T_TO_US(cdtime());
+  il->timestamp = cdtime();
   il->sent = 0;
   il->next = interface_list_head;
   interface_list_head = il;
@@ -475,12 +459,10 @@ static int connectivity_link_state(struct nlmsghdr *msg) {
       }
     }
 
-    uint32_t prev_status;
-
-    prev_status = il->status;
+    uint32_t prev_status = il->status;
     il->status =
         ((ifi->ifi_flags & IFF_RUNNING) ? LINK_STATE_UP : LINK_STATE_DOWN);
-    il->timestamp = (long long unsigned int)CDTIME_T_TO_US(cdtime());
+    il->timestamp = cdtime();
 
     // If the new status is different than the previous status,
     // store the previous status and set sent to zero, and set the
@@ -488,11 +470,12 @@ static int connectivity_link_state(struct nlmsghdr *msg) {
     if (il->status != prev_status) {
       il->prev_status = prev_status;
       il->sent = 0;
-      unsent_statuses = 1;
+      statuses_to_send = 1;
     }
 
     DEBUG("connectivity plugin (%llu): Interface %s status is now %s",
-          il->timestamp, dev, ((ifi->ifi_flags & IFF_RUNNING) ? "UP" : "DOWN"));
+          (unsigned long long)il->timestamp, dev,
+          ((ifi->ifi_flags & IFF_RUNNING) ? "UP" : "DOWN"));
 
     // no need to loop again, we found the interface name attr
     // (otherwise the first if-statement in the loop would
@@ -506,31 +489,19 @@ static int connectivity_link_state(struct nlmsghdr *msg) {
 }
 
 static int msg_handler(struct nlmsghdr *msg) {
-  switch (msg->nlmsg_type) {
-  case RTM_NEWADDR:
-  case RTM_DELADDR:
-  case RTM_NEWROUTE:
-  case RTM_DELROUTE:
-  case RTM_DELLINK:
-    // Not of interest in current version
-    break;
-  case RTM_NEWLINK:
-    connectivity_link_state(msg);
-    break;
-  default:
-    ERROR("connectivity plugin: msg_handler: Unknown netlink nlmsg_type %d",
-          msg->nlmsg_type);
-    break;
+  // We are only interested in RTM_NEWLINK messages
+  if (msg->nlmsg_type != RTM_NEWLINK) {
+    return 0;
   }
-  return 0;
+  return connectivity_link_state(msg);
 }
 
-static int read_event(int nl, int (*msg_handler)(struct nlmsghdr *)) {
+static int read_event(int (*msg_handler)(struct nlmsghdr *)) {
   int ret = 0;
   int recv_flags = MSG_DONTWAIT;
 
-  if (nl == -1)
-    return ret;
+  if (nl_sock == -1 || msg_handler == NULL)
+    return EINVAL;
 
   while (42) {
     pthread_mutex_lock(&connectivity_threads_lock);
@@ -543,7 +514,7 @@ static int read_event(int nl, int (*msg_handler)(struct nlmsghdr *)) {
     pthread_mutex_unlock(&connectivity_threads_lock);
 
     char buf[4096];
-    int status = recv(nl, buf, sizeof(buf), recv_flags);
+    int status = recv(nl_sock, buf, sizeof(buf), recv_flags);
 
     if (status < 0) {
 
@@ -558,10 +529,9 @@ static int read_event(int nl, int (*msg_handler)(struct nlmsghdr *)) {
         continue;
       }
 
-      if (errno == EINTR)
-      {
-        // Interrupt, so just return
-        return 0;
+      if (errno == EINTR) {
+        // Interrupt, so just continue and try again
+        continue;
       }
 
       /* Anything else is an error */
@@ -587,7 +557,9 @@ static int read_event(int nl, int (*msg_handler)(struct nlmsghdr *)) {
 
       /* Message is some kind of error */
       if (h->nlmsg_type == NLMSG_ERROR) {
-        ERROR("connectivity plugin: read_event: Message is an error");
+        struct nlmsgerr *l_err = (struct nlmsgerr *)NLMSG_DATA(h);
+        ERROR("connectivity plugin: read_event: Message is an error: %d",
+              l_err->error);
         return -1; // Error
       }
 
@@ -609,34 +581,74 @@ static int read_event(int nl, int (*msg_handler)(struct nlmsghdr *)) {
   return ret;
 }
 
+static void connectivity_dispatch_notification(const char *interface,
+                                               gauge_t value, gauge_t old_value,
+                                               cdtime_t timestamp) {
+
+  notification_t n = {
+      .severity = (value == LINK_STATE_UP ? NOTIF_OKAY : NOTIF_FAILURE),
+      .time = cdtime(),
+      .plugin = "connectivity",
+      .type = "gauge",
+      .type_instance = "interface_status",
+  };
+
+  sstrncpy(n.host, hostname_g, sizeof(n.host));
+  sstrncpy(n.plugin_instance, interface, sizeof(n.plugin_instance));
+
+  char *buf = NULL;
+
+  gen_message_payload(value, old_value, interface, timestamp, &buf);
+
+  int status = plugin_notification_meta_add_string(&n, "ves", buf);
+
+  if (status < 0) {
+    sfree(buf);
+    ERROR("connectivity plugin: unable to set notification VES metadata: %s",
+          STRERRNO);
+    return;
+  }
+
+  DEBUG("connectivity plugin: notification VES metadata: %s",
+        n.meta->nm_value.nm_string);
+
+  DEBUG("connectivity plugin: dispatching state %d for interface %s",
+        (int)value, interface);
+
+  plugin_dispatch_notification(&n);
+  plugin_notification_meta_free(n.meta);
+
+  // strdup'd in gen_message_payload
+  if (buf != NULL)
+    sfree(buf);
+}
+
 // NOTE: Caller MUST hold connectivity_data_lock when calling this function
 static void send_interface_status() {
   for (interface_list_t *il = interface_list_head; il != NULL;
        il = il->next) /* {{{ */
   {
-    uint32_t status;
-    uint32_t prev_status;
-    uint32_t sent;
-
-    status = il->status;
-    prev_status = il->prev_status;
-    sent = il->sent;
+    uint32_t status = il->status;
+    uint32_t prev_status = il->prev_status;
+    uint32_t sent = il->sent;
 
     if (status != prev_status && sent == 0) {
-      connectivity_dispatch_notification(il->interface, "gauge", status,
-                                         prev_status, il->timestamp);
+      connectivity_dispatch_notification(il->interface, status, prev_status,
+                                         il->timestamp);
       il->sent = 1;
     }
   } /* }}} for (il = interface_list_head; il != NULL; il = il->next) */
 
-  unsent_statuses = 0;
+  statuses_to_send = 0;
 }
 
 static void read_interface_status() /* {{{ */
 {
   pthread_mutex_lock(&connectivity_data_lock);
 
-  if (!unsent_statuses)
+  // If we don't have any interface statuses to dispatch,
+  // then we wait until signalled
+  if (!statuses_to_send)
     pthread_cond_wait(&connectivity_cond, &connectivity_data_lock);
 
   send_interface_status();
@@ -651,7 +663,7 @@ static void *connectivity_netlink_thread(void *arg) /* {{{ */
   while (connectivity_netlink_thread_loop > 0) {
     pthread_mutex_unlock(&connectivity_threads_lock);
 
-    int status = read_event(nl_sock, msg_handler);
+    int status = read_event(msg_handler);
 
     pthread_mutex_lock(&connectivity_threads_lock);
 
@@ -685,7 +697,9 @@ static void *connectivity_dequeue_thread(void *arg) /* {{{ */
 
 static int nl_connect() {
   struct sockaddr_nl sa_nl = {
-      .nl_family = AF_NETLINK, .nl_groups = RTMGRP_LINK, .nl_pid = getpid(),
+      .nl_family = AF_NETLINK,
+      .nl_groups = RTMGRP_LINK,
+      .nl_pid = getpid(),
   };
 
   nl_sock = socket(AF_NETLINK, SOCK_DGRAM, NETLINK_ROUTE);
@@ -698,6 +712,7 @@ static int nl_connect() {
   if (rc == -1) {
     ERROR("connectivity plugin: socket bind failed: %s", STRERRNO);
     close(nl_sock);
+    nl_sock = -1;
     return -1;
   }
 
@@ -740,8 +755,9 @@ static int start_netlink_thread(void) /* {{{ */
     if (status2 != 0) {
       ERROR("connectivity plugin: failed to close socket %d: %d (%s)", nl_sock,
             status2, STRERRNO);
-    } else
-      nl_sock = -1;
+    }
+
+    nl_sock = -1;
 
     return -1;
   }
@@ -798,8 +814,9 @@ static int stop_netlink_thread(int shutdown) /* {{{ */
     if (socket_status != 0) {
       ERROR("connectivity plugin: failed to close socket %d: %d (%s)", nl_sock,
             socket_status, STRERRNO);
-    } else
-      nl_sock = -1;
+    }
+
+    nl_sock = -1;
   } else
     socket_status = 0;
 
@@ -868,7 +885,7 @@ static int stop_netlink_thread(int shutdown) /* {{{ */
     return thread_status;
 }
 
-static int stop_dequeue_thread(int shutdown) /* {{{ */
+static int stop_dequeue_thread() /* {{{ */
 {
   pthread_mutex_lock(&connectivity_threads_lock);
 
@@ -885,30 +902,18 @@ static int stop_dequeue_thread(int shutdown) /* {{{ */
   // on such that they'll see the threads termination status
   pthread_cond_broadcast(&connectivity_cond);
 
-  int status;
-
-  if (shutdown == 1) {
-    // Calling pthread_cancel here in
-    // the case of a shutdown just assures that the thread is
-    // gone and that the process has been fully terminated.
+  // Calling pthread_cancel here just assures that the thread is
+  // gone and that the process has been fully terminated.
 
-    DEBUG("connectivity plugin: Canceling dequeue thread for process shutdown");
+  DEBUG("connectivity plugin: Canceling dequeue thread for process shutdown");
 
-    status = pthread_cancel(connectivity_dequeue_thread_id);
+  int status = pthread_cancel(connectivity_dequeue_thread_id);
 
-    if (status != 0 && status != ESRCH) {
-      ERROR("connectivity plugin: Unable to cancel dequeue thread: %d", status);
-      status = -1;
-    } else
-      status = 0;
-  } else {
-    status = pthread_join(connectivity_dequeue_thread_id, /* return = */ NULL);
-    if (status != 0 && status != ESRCH) {
-      ERROR("connectivity plugin: Stopping dequeue thread failed.");
-      status = -1;
-    } else
-      status = 0;
-  }
+  if (status != 0 && status != ESRCH) {
+    ERROR("connectivity plugin: Unable to cancel dequeue thread: %d", status);
+    status = -1;
+  } else
+    status = 0;
 
   pthread_mutex_lock(&connectivity_threads_lock);
   memset(&connectivity_dequeue_thread_id, 0,
@@ -920,10 +925,10 @@ static int stop_dequeue_thread(int shutdown) /* {{{ */
   return status;
 } /* }}} int stop_dequeue_thread */
 
-static int stop_threads(int shutdown) /* {{{ */
+static int stop_threads() /* {{{ */
 {
-  int status = stop_netlink_thread(shutdown);
-  int status2 = stop_dequeue_thread(shutdown);
+  int status = stop_netlink_thread(1);
+  int status2 = stop_dequeue_thread();
 
   if (status != 0)
     return status;
@@ -945,6 +950,9 @@ static int connectivity_config(const char *key, const char *value) /* {{{ */
 {
   if (ignorelist == NULL) {
     ignorelist = ignorelist_create(/* invert = */ 1);
+
+    if (ignorelist == NULL)
+      return -1;
   }
 
   if (strcasecmp(key, "Interface") == 0) {
@@ -962,57 +970,6 @@ static int connectivity_config(const char *key, const char *value) /* {{{ */
   return 0;
 } /* }}} int connectivity_config */
 
-static void
-connectivity_dispatch_notification(const char *interface, const char *type,
-                                   gauge_t value, gauge_t old_value,
-                                   long long unsigned int timestamp) {
-
-  notification_t n = {(value == LINK_STATE_UP ? NOTIF_OKAY : NOTIF_FAILURE),
-                      cdtime(),
-                      "",
-                      "",
-                      "connectivity",
-                      "",
-                      "",
-                      "",
-                      NULL};
-
-  sstrncpy(n.host, hostname_g, sizeof(n.host));
-  sstrncpy(n.plugin_instance, interface, sizeof(n.plugin_instance));
-  sstrncpy(n.type, "gauge", sizeof(n.type));
-  sstrncpy(n.type_instance, "interface_status", sizeof(n.type_instance));
-
-  char *buf = NULL;
-
-  gen_message_payload(value, old_value, interface, timestamp, &buf);
-
-  notification_meta_t *m = calloc(1, sizeof(*m));
-
-  if (m == NULL) {
-    sfree(buf);
-    ERROR("connectivity plugin: unable to allocate metadata: %s", STRERRNO);
-    return;
-  }
-
-  sstrncpy(m->name, "ves", sizeof(m->name));
-  m->nm_value.nm_string = sstrdup(buf);
-  m->type = NM_TYPE_STRING;
-  n.meta = m;
-
-  DEBUG("connectivity plugin: notification message: %s",
-        n.meta->nm_value.nm_string);
-
-  DEBUG("connectivity plugin: dispatching state %d for interface %s",
-        (int)value, interface);
-
-  plugin_dispatch_notification(&n);
-  plugin_notification_meta_free(n.meta);
-
-  // strdup'd in gen_message_payload
-  if (buf != NULL)
-    sfree(buf);
-}
-
 static int connectivity_read(void) /* {{{ */
 {
   pthread_mutex_lock(&connectivity_threads_lock);
@@ -1047,7 +1004,7 @@ static int connectivity_shutdown(void) /* {{{ */
 {
   DEBUG("connectivity plugin: Shutting down thread.");
 
-  int status = stop_threads(1);
+  int status = stop_threads();
 
   interface_list_t *il = interface_list_head;
   while (il != NULL) {