*
* Authors:
* Red Hat NFVPE
- * Andrew Bays <abays at redhat.com>
+ * Andrew Bays <abays at redhat.com>
* Aneesh Puttur <aputtur at redhat.com>
**/
#include "common.h"
#include "plugin.h"
#include "utils_complain.h"
+#include "utils_ignorelist.h"
#include <asm/types.h>
#include <errno.h>
/*
* Private data types
*/
-struct interfacelist_s {
+
+struct interface_list_s {
char *interface;
uint32_t status;
uint32_t prev_status;
uint32_t sent;
- long long unsigned int timestamp;
+ cdtime_t timestamp;
- struct interfacelist_s *next;
+ struct interface_list_s *next;
};
-typedef struct interfacelist_s interfacelist_t;
+typedef struct interface_list_s interface_list_t;
/*
* Private variables
*/
-static interfacelist_t *interfacelist_head = NULL;
-static int connectivity_thread_loop = 0;
-static int connectivity_thread_error = 0;
-static pthread_t connectivity_thread_id;
-static pthread_mutex_t connectivity_lock = PTHREAD_MUTEX_INITIALIZER;
+static ignorelist_t *ignorelist = NULL;
+
+static interface_list_t *interface_list_head = NULL;
+static int monitor_all_interfaces = 1;
+
+static int connectivity_netlink_thread_loop = 0;
+static int connectivity_netlink_thread_error = 0;
+static pthread_t connectivity_netlink_thread_id;
+static int connectivity_dequeue_thread_loop = 0;
+static pthread_t connectivity_dequeue_thread_id;
+static pthread_mutex_t connectivity_threads_lock = PTHREAD_MUTEX_INITIALIZER;
+static pthread_mutex_t connectivity_data_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t connectivity_cond = PTHREAD_COND_INITIALIZER;
-static struct mnl_socket *sock;
+static int nl_sock = -1;
static int event_id = 0;
+static int statuses_to_send = 0;
-static const char *config_keys[] = {"Interface"};
+static const char *config_keys[] = {"Interface", "IgnoreSelected"};
static int config_keys_num = STATIC_ARRAY_SIZE(config_keys);
/*
*/
static int gen_message_payload(int state, int old_state, const char *interface,
- long long unsigned int timestamp, char **buf) {
+ cdtime_t timestamp, char **buf) {
const unsigned char *buf2;
yajl_gen g;
+ char json_str[DATA_MAX_NAME_LEN];
#if !defined(HAVE_YAJL_V2)
- yajl_gen_config conf = {};
-
- conf.beautify = 0;
+ yajl_gen_config conf = {0};
#endif
#if HAVE_YAJL_V2
goto err;
event_id = event_id + 1;
- int event_id_len = sizeof(char) * sizeof(int) * 4 + 1;
- char *event_id_str = malloc(event_id_len);
- snprintf(event_id_str, event_id_len, "%d", event_id);
-
- if (yajl_gen_number(g, event_id_str, strlen(event_id_str)) !=
- yajl_gen_status_ok) {
- sfree(event_id_str);
+ if (snprintf(json_str, sizeof(json_str), "%d", event_id) < 0) {
goto err;
}
- sfree(event_id_str);
+ if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) {
+ goto err;
+ }
// eventName
if (yajl_gen_string(g, (u_char *)CONNECTIVITY_EVENT_NAME_FIELD,
yajl_gen_status_ok)
goto err;
- int event_name_len = 0;
- event_name_len = event_name_len + strlen(interface); // interface name
- event_name_len = event_name_len + (state == 0 ? 4 : 2); // "down" or "up"
- event_name_len =
- event_name_len + 12; // "interface", 2 spaces and null-terminator
- char *event_name_str = malloc(event_name_len);
- memset(event_name_str, '\0', event_name_len);
- snprintf(event_name_str, event_name_len, "interface %s %s", interface,
- (state == 0 ? CONNECTIVITY_EVENT_NAME_DOWN_VALUE
- : CONNECTIVITY_EVENT_NAME_UP_VALUE));
+ if (snprintf(json_str, sizeof(json_str), "interface %s %s", interface,
+ (state == 0 ? CONNECTIVITY_EVENT_NAME_DOWN_VALUE
+ : CONNECTIVITY_EVENT_NAME_UP_VALUE)) < 0) {
+ goto err;
+ }
- if (yajl_gen_string(g, (u_char *)event_name_str, strlen(event_name_str)) !=
+ if (yajl_gen_string(g, (u_char *)json_str, strlen(json_str)) !=
yajl_gen_status_ok) {
- sfree(event_name_str);
goto err;
}
- sfree(event_name_str);
-
// lastEpochMicrosec
if (yajl_gen_string(g, (u_char *)CONNECTIVITY_LAST_EPOCH_MICROSEC_FIELD,
strlen(CONNECTIVITY_LAST_EPOCH_MICROSEC_FIELD)) !=
yajl_gen_status_ok)
goto err;
- int last_epoch_microsec_len =
- sizeof(char) * sizeof(long long unsigned int) * 4 + 1;
- char *last_epoch_microsec_str = malloc(last_epoch_microsec_len);
- snprintf(last_epoch_microsec_str, last_epoch_microsec_len, "%llu",
- (long long unsigned int)CDTIME_T_TO_US(cdtime()));
-
- if (yajl_gen_number(g, last_epoch_microsec_str,
- strlen(last_epoch_microsec_str)) != yajl_gen_status_ok) {
- sfree(last_epoch_microsec_str);
+ if (snprintf(json_str, sizeof(json_str), "%" PRIu64,
+ CDTIME_T_TO_US(cdtime())) < 0) {
goto err;
}
- sfree(last_epoch_microsec_str);
+ if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) {
+ goto err;
+ }
// priority
if (yajl_gen_string(g, (u_char *)CONNECTIVITY_PRIORITY_FIELD,
yajl_gen_status_ok)
goto err;
- int start_epoch_microsec_len =
- sizeof(char) * sizeof(long long unsigned int) * 4 + 1;
- char *start_epoch_microsec_str = malloc(start_epoch_microsec_len);
- snprintf(start_epoch_microsec_str, start_epoch_microsec_len, "%llu",
- (long long unsigned int)timestamp);
-
- if (yajl_gen_number(g, start_epoch_microsec_str,
- strlen(start_epoch_microsec_str)) != yajl_gen_status_ok) {
- sfree(start_epoch_microsec_str);
+ if (snprintf(json_str, sizeof(json_str), "%" PRIu64,
+ CDTIME_T_TO_US(timestamp)) < 0) {
goto err;
}
- sfree(start_epoch_microsec_str);
+ if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) {
+ goto err;
+ }
// version
if (yajl_gen_string(g, (u_char *)CONNECTIVITY_VERSION_FIELD,
yajl_gen_status_ok)
goto err;
- if (yajl_gen_map_close(g) != yajl_gen_status_ok)
+ // close state change and header fields
+ if (yajl_gen_map_close(g) != yajl_gen_status_ok ||
+ yajl_gen_map_close(g) != yajl_gen_status_ok)
goto err;
// *** END state change fields ***
- if (yajl_gen_map_close(g) != yajl_gen_status_ok)
- goto err;
-
if (yajl_gen_get_buf(g, &buf2, &len) != yajl_gen_status_ok)
goto err;
- *buf = malloc(strlen((char *)buf2) + 1);
+ *buf = strdup((char *)buf2);
- sstrncpy(*buf, (char *)buf2, strlen((char *)buf2) + 1);
+ if (*buf == NULL) {
+ ERROR("connectivity plugin: strdup failed during gen_message_payload: %s",
+ STRERRNO);
+ goto err;
+ }
yajl_gen_free(g);
return -1;
}
-static int connectivity_link_state(struct nlmsghdr *msg) {
- int retval = 0;
- struct ifinfomsg *ifi = mnl_nlmsg_get_payload(msg);
- struct nlattr *attr;
- const char *dev = NULL;
+static interface_list_t *add_interface(const char *interface, int status,
+ int prev_status) {
+ interface_list_t *il = calloc(1, sizeof(*il));
+
+ if (il == NULL) {
+ ERROR("connectivity plugin: calloc failed during add_interface: %s",
+ STRERRNO);
+ return NULL;
+ }
+
+ char *interface2 = strdup(interface);
+ if (interface2 == NULL) {
+ sfree(il);
+ ERROR("connectivity plugin: strdup failed during add_interface: %s",
+ STRERRNO);
+ return NULL;
+ }
- pthread_mutex_lock(&connectivity_lock);
+ il->interface = interface2;
+ il->status = status;
+ il->prev_status = prev_status;
+ il->timestamp = cdtime();
+ il->sent = 0;
+ il->next = interface_list_head;
+ interface_list_head = il;
- interfacelist_t *il;
+ DEBUG("connectivity plugin: added interface %s", interface2);
+
+ return il;
+}
+
+static int connectivity_link_state(struct nlmsghdr *msg) {
+ pthread_mutex_lock(&connectivity_data_lock);
+
+ struct nlattr *attr;
+ struct ifinfomsg *ifi = mnl_nlmsg_get_payload(msg);
/* Scan attribute list for device name. */
mnl_attr_for_each(attr, msg, sizeof(*ifi)) {
ERROR("connectivity plugin: connectivity_link_state: IFLA_IFNAME "
"mnl_attr_validate "
"failed.");
- pthread_mutex_unlock(&connectivity_lock);
+ pthread_mutex_unlock(&connectivity_data_lock);
return MNL_CB_ERROR;
}
- dev = mnl_attr_get_str(attr);
+ const char *dev = mnl_attr_get_str(attr);
+
+ // Check the list of interfaces we should monitor, if we've chosen
+ // a subset. If we don't care about this one, abort.
+ if (ignorelist_match(ignorelist, dev) != 0) {
+ DEBUG("connectivity plugin: Ignoring link state change for unmonitored "
+ "interface: %s",
+ dev);
+ break;
+ }
+
+ interface_list_t *il = NULL;
- for (il = interfacelist_head; il != NULL; il = il->next)
+ for (il = interface_list_head; il != NULL; il = il->next)
if (strcmp(dev, il->interface) == 0)
break;
if (il == NULL) {
- DEBUG("connectivity plugin: Ignoring link state change for unmonitored "
- "interface: %s",
- dev);
- } else {
- uint32_t prev_status;
-
- prev_status = il->status;
- il->status =
- ((ifi->ifi_flags & IFF_RUNNING) ? LINK_STATE_UP : LINK_STATE_DOWN);
- il->timestamp = (long long unsigned int)CDTIME_T_TO_US(cdtime());
-
- // If the new status is different than the previous status,
- // store the previous status and set sent to zero
- if (il->status != prev_status) {
- il->prev_status = prev_status;
- il->sent = 0;
+ // We haven't encountered this interface yet, so add it to the linked list
+ il = add_interface(dev, LINK_STATE_UNKNOWN, LINK_STATE_UNKNOWN);
+
+ if (il == NULL) {
+ ERROR("connectivity plugin: unable to add interface %s during "
+ "connectivity_link_state",
+ dev);
+ return MNL_CB_ERROR;
}
+ }
- DEBUG("connectivity plugin (%llu): Interface %s status is now %s",
- il->timestamp, dev,
- ((ifi->ifi_flags & IFF_RUNNING) ? "UP" : "DOWN"));
+ uint32_t prev_status = il->status;
+ il->status =
+ ((ifi->ifi_flags & IFF_RUNNING) ? LINK_STATE_UP : LINK_STATE_DOWN);
+ il->timestamp = cdtime();
+
+ // If the new status is different than the previous status,
+ // store the previous status and set sent to zero, and set the
+ // global flag to indicate there are statuses to dispatch
+ if (il->status != prev_status) {
+ il->prev_status = prev_status;
+ il->sent = 0;
+ statuses_to_send = 1;
}
+ DEBUG("connectivity plugin (%llu): Interface %s status is now %s",
+ il->timestamp, dev, ((ifi->ifi_flags & IFF_RUNNING) ? "UP" : "DOWN"));
+
// no need to loop again, we found the interface name attr
// (otherwise the first if-statement in the loop would
// have moved us on with 'continue')
break;
}
- pthread_mutex_unlock(&connectivity_lock);
+ pthread_mutex_unlock(&connectivity_data_lock);
- return retval;
+ return 0;
}
static int msg_handler(struct nlmsghdr *msg) {
- switch (msg->nlmsg_type) {
- case RTM_NEWADDR:
- case RTM_DELADDR:
- case RTM_NEWROUTE:
- case RTM_DELROUTE:
- case RTM_DELLINK:
- // Not of interest in current version
- break;
- case RTM_NEWLINK:
- connectivity_link_state(msg);
- break;
- default:
- ERROR("connectivity plugin: msg_handler: Unknown netlink nlmsg_type %d\n",
- msg->nlmsg_type);
- break;
+ // We are only interested in RTM_NEWLINK messages
+ if (msg->nlmsg_type != RTM_NEWLINK) {
+ return 0;
}
- return 0;
+ return connectivity_link_state(msg);
}
-static int read_event(struct mnl_socket *nl,
- int (*msg_handler)(struct nlmsghdr *)) {
- int status;
+static int read_event(int (*msg_handler)(struct nlmsghdr *)) {
int ret = 0;
- char buf[4096];
- struct nlmsghdr *h;
+ int recv_flags = MSG_DONTWAIT;
- if (nl == NULL)
- return ret;
+ if (nl_sock == -1 || msg_handler == NULL)
+ return EINVAL;
- status = mnl_socket_recvfrom(nl, buf, sizeof(buf));
+ while (42) {
+ pthread_mutex_lock(&connectivity_threads_lock);
- if (status < 0) {
- /* Socket non-blocking so bail out once we have read everything */
- if (errno == EWOULDBLOCK || errno == EAGAIN)
+ if (connectivity_netlink_thread_loop <= 0) {
+ pthread_mutex_unlock(&connectivity_threads_lock);
return ret;
+ }
- /* Anything else is an error */
- ERROR("connectivity plugin: read_event: Error mnl_socket_recvfrom: %d\n",
- status);
- return status;
- }
+ pthread_mutex_unlock(&connectivity_threads_lock);
- if (status == 0) {
- DEBUG("connectivity plugin: read_event: EOF\n");
- }
+ char buf[4096];
+ int status = recv(nl_sock, buf, sizeof(buf), recv_flags);
- /* We need to handle more than one message per 'recvmsg' */
- for (h = (struct nlmsghdr *)buf; NLMSG_OK(h, (unsigned int)status);
- h = NLMSG_NEXT(h, status)) {
- /* Finish reading */
- if (h->nlmsg_type == NLMSG_DONE)
- return ret;
+ if (status < 0) {
+
+ // If there were no more messages to drain from the socket,
+ // then signal the dequeue thread and allow it to dispatch
+ // any saved interface status changes. Then continue, but
+ // block and wait for new messages
+ if (errno == EWOULDBLOCK || errno == EAGAIN) {
+ pthread_cond_signal(&connectivity_cond);
+
+ recv_flags = 0;
+ continue;
+ }
+
+ if (errno == EINTR) {
+ // Interrupt, so just continue and try again
+ continue;
+ }
+
+ /* Anything else is an error */
+ ERROR("connectivity plugin: read_event: Error recv: %d", status);
+ return status;
+ }
+
+ // Message received successfully, so we'll stop blocking on the
+ // receive call for now (until we get a "would block" error, which
+ // will be handled above)
+ recv_flags = MSG_DONTWAIT;
- /* Message is some kind of error */
- if (h->nlmsg_type == NLMSG_ERROR) {
- ERROR("connectivity plugin: read_event: Message is an error\n");
- return -1; // Error
+ if (status == 0) {
+ DEBUG("connectivity plugin: read_event: EOF");
}
- /* Call message handler */
- if (msg_handler) {
- ret = (*msg_handler)(h);
- if (ret < 0) {
- ERROR("connectivity plugin: read_event: Message handler error %d\n",
- ret);
+ /* We need to handle more than one message per 'recvmsg' */
+ for (struct nlmsghdr *h = (struct nlmsghdr *)buf;
+ NLMSG_OK(h, (unsigned int)status); h = NLMSG_NEXT(h, status)) {
+ /* Finish reading */
+ if (h->nlmsg_type == NLMSG_DONE)
return ret;
+
+ /* Message is some kind of error */
+ if (h->nlmsg_type == NLMSG_ERROR) {
+ struct nlmsgerr *l_err = (struct nlmsgerr *)NLMSG_DATA(h);
+ ERROR("connectivity plugin: read_event: Message is an error: %d",
+ l_err->error);
+ return -1; // Error
+ }
+
+ /* Call message handler */
+ if (msg_handler) {
+ ret = (*msg_handler)(h);
+ if (ret < 0) {
+ ERROR("connectivity plugin: read_event: Message handler error %d",
+ ret);
+ return ret;
+ }
+ } else {
+ ERROR("connectivity plugin: read_event: Error NULL message handler");
+ return -1;
}
- } else {
- ERROR("connectivity plugin: read_event: Error NULL message handler\n");
- return -1;
}
}
return ret;
}
-static void *connectivity_thread(void *arg) /* {{{ */
+static void connectivity_dispatch_notification(const char *interface,
+ gauge_t value, gauge_t old_value,
+ cdtime_t timestamp) {
+
+ notification_t n = {
+ .severity = (value == LINK_STATE_UP ? NOTIF_OKAY : NOTIF_FAILURE),
+ .time = cdtime(),
+ .plugin = "connectivity",
+ .type = "gauge",
+ .type_instance = "interface_status",
+ };
+
+ sstrncpy(n.host, hostname_g, sizeof(n.host));
+ sstrncpy(n.plugin_instance, interface, sizeof(n.plugin_instance));
+
+ char *buf = NULL;
+
+ gen_message_payload(value, old_value, interface, timestamp, &buf);
+
+ int status = plugin_notification_meta_add_string(&n, "ves", buf);
+
+ if (status < 0) {
+ sfree(buf);
+ ERROR("connectivity plugin: unable to set notification VES metadata: %s",
+ STRERRNO);
+ return;
+ }
+
+ DEBUG("connectivity plugin: notification VES metadata: %s",
+ n.meta->nm_value.nm_string);
+
+ DEBUG("connectivity plugin: dispatching state %d for interface %s",
+ (int)value, interface);
+
+ plugin_dispatch_notification(&n);
+ plugin_notification_meta_free(n.meta);
+
+ // strdup'd in gen_message_payload
+ if (buf != NULL)
+ sfree(buf);
+}
+
+// NOTE: Caller MUST hold connectivity_data_lock when calling this function
+static void send_interface_status() {
+ for (interface_list_t *il = interface_list_head; il != NULL;
+ il = il->next) /* {{{ */
+ {
+ uint32_t status = il->status;
+ uint32_t prev_status = il->prev_status;
+ uint32_t sent = il->sent;
+
+ if (status != prev_status && sent == 0) {
+ connectivity_dispatch_notification(il->interface, status, prev_status,
+ il->timestamp);
+ il->sent = 1;
+ }
+ } /* }}} for (il = interface_list_head; il != NULL; il = il->next) */
+
+ statuses_to_send = 0;
+}
+
+static void read_interface_status() /* {{{ */
{
- pthread_mutex_lock(&connectivity_lock);
+ pthread_mutex_lock(&connectivity_data_lock);
- while (connectivity_thread_loop > 0) {
- int status;
+ // If we don't have any interface statuses to dispatch,
+ // then we wait until signalled
+ if (!statuses_to_send)
+ pthread_cond_wait(&connectivity_cond, &connectivity_data_lock);
- pthread_mutex_unlock(&connectivity_lock);
+ send_interface_status();
+
+ pthread_mutex_unlock(&connectivity_data_lock);
+} /* }}} int *read_interface_status */
+
+static void *connectivity_netlink_thread(void *arg) /* {{{ */
+{
+ pthread_mutex_lock(&connectivity_threads_lock);
- status = read_event(sock, msg_handler);
+ while (connectivity_netlink_thread_loop > 0) {
+ pthread_mutex_unlock(&connectivity_threads_lock);
- pthread_mutex_lock(&connectivity_lock);
+ int status = read_event(msg_handler);
+
+ pthread_mutex_lock(&connectivity_threads_lock);
if (status < 0) {
- connectivity_thread_error = 1;
+ connectivity_netlink_thread_error = 1;
break;
}
+ } /* while (connectivity_netlink_thread_loop > 0) */
- if (connectivity_thread_loop <= 0)
- break;
- } /* while (connectivity_thread_loop > 0) */
+ pthread_mutex_unlock(&connectivity_threads_lock);
+
+ return (void *)0;
+} /* }}} void *connectivity_netlink_thread */
+
+static void *connectivity_dequeue_thread(void *arg) /* {{{ */
+{
+ pthread_mutex_lock(&connectivity_threads_lock);
+
+ while (connectivity_dequeue_thread_loop > 0) {
+ pthread_mutex_unlock(&connectivity_threads_lock);
+
+ read_interface_status();
- pthread_mutex_unlock(&connectivity_lock);
+ pthread_mutex_lock(&connectivity_threads_lock);
+ } /* while (connectivity_dequeue_thread_loop > 0) */
+
+ pthread_mutex_unlock(&connectivity_threads_lock);
return ((void *)0);
-} /* }}} void *connectivity_thread */
+} /* }}} void *connectivity_dequeue_thread */
-static int start_thread(void) /* {{{ */
-{
- int status;
+static int nl_connect() {
+ struct sockaddr_nl sa_nl = {
+ .nl_family = AF_NETLINK, .nl_groups = RTMGRP_LINK, .nl_pid = getpid(),
+ };
- pthread_mutex_lock(&connectivity_lock);
+ nl_sock = socket(AF_NETLINK, SOCK_DGRAM, NETLINK_ROUTE);
+ if (nl_sock == -1) {
+ ERROR("connectivity plugin: socket open failed: %s", STRERRNO);
+ return -1;
+ }
- if (connectivity_thread_loop != 0) {
- pthread_mutex_unlock(&connectivity_lock);
- return (0);
+ int rc = bind(nl_sock, (struct sockaddr *)&sa_nl, sizeof(sa_nl));
+ if (rc == -1) {
+ ERROR("connectivity plugin: socket bind failed: %s", STRERRNO);
+ close(nl_sock);
+ nl_sock = -1;
+ return -1;
}
- connectivity_thread_loop = 1;
- connectivity_thread_error = 0;
+ return 0;
+}
- if (sock == NULL) {
- sock = mnl_socket_open(NETLINK_ROUTE);
- if (sock == NULL) {
- ERROR(
- "connectivity plugin: connectivity_thread: mnl_socket_open failed.");
- pthread_mutex_unlock(&connectivity_lock);
- return (-1);
- }
+static int start_netlink_thread(void) /* {{{ */
+{
+ pthread_mutex_lock(&connectivity_threads_lock);
+
+ if (connectivity_netlink_thread_loop != 0) {
+ pthread_mutex_unlock(&connectivity_threads_lock);
+ return 0;
+ }
+
+ connectivity_netlink_thread_loop = 1;
+ connectivity_netlink_thread_error = 0;
+
+ int status;
+
+ if (nl_sock == -1) {
+ status = nl_connect();
- if (mnl_socket_bind(sock, RTMGRP_LINK, MNL_SOCKET_AUTOPID) < 0) {
- ERROR(
- "connectivity plugin: connectivity_thread: mnl_socket_bind failed.");
- pthread_mutex_unlock(&connectivity_lock);
- return (1);
+ if (status != 0) {
+ pthread_mutex_unlock(&connectivity_threads_lock);
+ return status;
}
}
- status = plugin_thread_create(&connectivity_thread_id, /* attr = */ NULL,
- connectivity_thread,
+ status = plugin_thread_create(&connectivity_netlink_thread_id,
+ /* attr = */ NULL, connectivity_netlink_thread,
/* arg = */ (void *)0, "connectivity");
if (status != 0) {
- connectivity_thread_loop = 0;
+ connectivity_netlink_thread_loop = 0;
ERROR("connectivity plugin: Starting thread failed.");
- pthread_mutex_unlock(&connectivity_lock);
- mnl_socket_close(sock);
- return (-1);
+ pthread_mutex_unlock(&connectivity_threads_lock);
+
+ int status2 = close(nl_sock);
+
+ if (status2 != 0) {
+ ERROR("connectivity plugin: failed to close socket %d: %d (%s)", nl_sock,
+ status2, STRERRNO);
+ }
+
+ nl_sock = -1;
+
+ return -1;
+ }
+
+ pthread_mutex_unlock(&connectivity_threads_lock);
+
+ return status;
+}
+
+static int start_dequeue_thread(void) /* {{{ */
+{
+ pthread_mutex_lock(&connectivity_threads_lock);
+
+ if (connectivity_dequeue_thread_loop != 0) {
+ pthread_mutex_unlock(&connectivity_threads_lock);
+ return 0;
+ }
+
+ connectivity_dequeue_thread_loop = 1;
+
+ int status =
+ plugin_thread_create(&connectivity_dequeue_thread_id,
+ /* attr = */ NULL, connectivity_dequeue_thread,
+ /* arg = */ (void *)0, "connectivity");
+ if (status != 0) {
+ connectivity_dequeue_thread_loop = 0;
+ ERROR("connectivity plugin: Starting dequeue thread failed.");
+ pthread_mutex_unlock(&connectivity_threads_lock);
+ return -1;
}
- pthread_mutex_unlock(&connectivity_lock);
- return (0);
-} /* }}} int start_thread */
+ pthread_mutex_unlock(&connectivity_threads_lock);
+
+ return status;
+} /* }}} int start_dequeue_thread */
-static int stop_thread(int shutdown) /* {{{ */
+static int start_threads(void) /* {{{ */
{
- int status;
+ int status = start_netlink_thread();
+ int status2 = start_dequeue_thread();
+
+ if (status != 0)
+ return status;
+ else
+ return status2;
+} /* }}} int start_threads */
- if (sock != NULL)
- mnl_socket_close(sock);
+static int stop_netlink_thread(int shutdown) /* {{{ */
+{
+ int socket_status;
+
+ if (nl_sock != -1) {
+ socket_status = close(nl_sock);
+ if (socket_status != 0) {
+ ERROR("connectivity plugin: failed to close socket %d: %d (%s)", nl_sock,
+ socket_status, STRERRNO);
+ }
+
+ nl_sock = -1;
+ } else
+ socket_status = 0;
- pthread_mutex_lock(&connectivity_lock);
+ pthread_mutex_lock(&connectivity_threads_lock);
- if (connectivity_thread_loop == 0) {
- pthread_mutex_unlock(&connectivity_lock);
- return (-1);
+ if (connectivity_netlink_thread_loop == 0) {
+ pthread_mutex_unlock(&connectivity_threads_lock);
+ // Thread has already been terminated, nothing more to attempt
+ return socket_status;
}
- connectivity_thread_loop = 0;
+ // Set thread termination status
+ connectivity_netlink_thread_loop = 0;
+ pthread_mutex_unlock(&connectivity_threads_lock);
+
+ // Let threads waiting on access to the interface list know to move
+ // on such that they'll see the thread's termination status
pthread_cond_broadcast(&connectivity_cond);
- pthread_mutex_unlock(&connectivity_lock);
+
+ int thread_status;
if (shutdown == 1) {
// Since the thread is blocking, calling pthread_join
// doesn't actually succeed in stopping it. It will stick around
// until a NETLINK message is received on the socket (at which
- // it will realize that "connectivity_thread_loop" is 0 and will
+ // it will realize that "connectivity_netlink_thread_loop" is 0 and will
// break out of the read loop and be allowed to die). This is
// fine when the process isn't supposed to be exiting, but in
// the case of a process shutdown, we don't want to have an
// the case of a shutdown is just assures that the thread is
// gone and that the process has been fully terminated.
- DEBUG("connectivity plugin: Canceling thread for process shutdown");
+ DEBUG("connectivity plugin: Canceling netlink thread for process shutdown");
- status = pthread_cancel(connectivity_thread_id);
+ thread_status = pthread_cancel(connectivity_netlink_thread_id);
- if (status != 0) {
- ERROR("connectivity plugin: Unable to cancel thread: %d", status);
- status = -1;
- }
+ if (thread_status != 0 && thread_status != ESRCH) {
+ ERROR("connectivity plugin: Unable to cancel netlink thread: %d",
+ thread_status);
+ thread_status = -1;
+ } else
+ thread_status = 0;
} else {
- status = pthread_join(connectivity_thread_id, /* return = */ NULL);
- if (status != 0) {
- ERROR("connectivity plugin: Stopping thread failed.");
- status = -1;
- }
+ thread_status =
+ pthread_join(connectivity_netlink_thread_id, /* return = */ NULL);
+ if (thread_status != 0 && thread_status != ESRCH) {
+ ERROR("connectivity plugin: Stopping netlink thread failed: %d",
+ thread_status);
+ thread_status = -1;
+ } else
+ thread_status = 0;
}
- pthread_mutex_lock(&connectivity_lock);
- memset(&connectivity_thread_id, 0, sizeof(connectivity_thread_id));
- connectivity_thread_error = 0;
- pthread_mutex_unlock(&connectivity_lock);
+ pthread_mutex_lock(&connectivity_threads_lock);
+ memset(&connectivity_netlink_thread_id, 0,
+ sizeof(connectivity_netlink_thread_id));
+ connectivity_netlink_thread_error = 0;
+ pthread_mutex_unlock(&connectivity_threads_lock);
- DEBUG("connectivity plugin: Finished requesting stop of thread");
+ DEBUG("connectivity plugin: Finished requesting stop of netlink thread");
- return (status);
-} /* }}} int stop_thread */
+ if (socket_status != 0)
+ return socket_status;
+ else
+ return thread_status;
+}
-static int connectivity_init(void) /* {{{ */
+static int stop_dequeue_thread() /* {{{ */
{
- if (interfacelist_head == NULL) {
- NOTICE("connectivity plugin: No interfaces have been configured.");
- return (-1);
- }
-
- return (start_thread());
-} /* }}} int connectivity_init */
+ pthread_mutex_lock(&connectivity_threads_lock);
-static int connectivity_config(const char *key, const char *value) /* {{{ */
-{
- if (strcasecmp(key, "Interface") == 0) {
- interfacelist_t *il;
- char *interface;
+ if (connectivity_dequeue_thread_loop == 0) {
+ pthread_mutex_unlock(&connectivity_threads_lock);
+ return -1;
+ }
- il = malloc(sizeof(*il));
- if (il == NULL) {
- char errbuf[1024];
- ERROR("connectivity plugin: malloc failed during connectivity_config: %s",
- sstrerror(errno, errbuf, sizeof(errbuf)));
- return (1);
- }
+ // Set thread termination status
+ connectivity_dequeue_thread_loop = 0;
+ pthread_mutex_unlock(&connectivity_threads_lock);
- interface = strdup(value);
- if (interface == NULL) {
- char errbuf[1024];
- sfree(il);
- ERROR("connectivity plugin: strdup failed connectivity_config: %s",
- sstrerror(errno, errbuf, sizeof(errbuf)));
- return (1);
- }
+ // Let threads waiting on access to the interface list know to move
+ // on such that they'll see the threads termination status
+ pthread_cond_broadcast(&connectivity_cond);
- il->interface = interface;
- il->status = LINK_STATE_UNKNOWN;
- il->prev_status = LINK_STATE_UNKNOWN;
- il->timestamp = (long long unsigned int)CDTIME_T_TO_US(cdtime());
- il->sent = 0;
- il->next = interfacelist_head;
- interfacelist_head = il;
+ // Calling pthread_cancel here just assures that the thread is
+ // gone and that the process has been fully terminated.
- } else {
- return (-1);
- }
+ DEBUG("connectivity plugin: Canceling dequeue thread for process shutdown");
- return (0);
-} /* }}} int connectivity_config */
+ int status = pthread_cancel(connectivity_dequeue_thread_id);
-static void connectivity_dispatch_notification(
- const char *interface, const char *type, /* {{{ */
- gauge_t value, gauge_t old_value, long long unsigned int timestamp) {
- char *buf = NULL;
- notification_t n = {
- NOTIF_FAILURE, cdtime(), "", "", "connectivity", "", "", "", NULL};
+ if (status != 0 && status != ESRCH) {
+ ERROR("connectivity plugin: Unable to cancel dequeue thread: %d", status);
+ status = -1;
+ } else
+ status = 0;
- if (value == LINK_STATE_UP)
- n.severity = NOTIF_OKAY;
+ pthread_mutex_lock(&connectivity_threads_lock);
+ memset(&connectivity_dequeue_thread_id, 0,
+ sizeof(connectivity_dequeue_thread_id));
+ pthread_mutex_unlock(&connectivity_threads_lock);
- char hostname[1024];
- gethostname(hostname, sizeof(hostname));
+ DEBUG("connectivity plugin: Finished requesting stop of dequeue thread");
- sstrncpy(n.host, hostname, sizeof(n.host));
- sstrncpy(n.plugin_instance, interface, sizeof(n.plugin_instance));
- sstrncpy(n.type, "gauge", sizeof(n.type));
- sstrncpy(n.type_instance, "interface_status", sizeof(n.type_instance));
+ return status;
+} /* }}} int stop_dequeue_thread */
- gen_message_payload(value, old_value, interface, timestamp, &buf);
+static int stop_threads() /* {{{ */
+{
+ int status = stop_netlink_thread(1);
+ int status2 = stop_dequeue_thread();
- notification_meta_t *m = calloc(1, sizeof(*m));
+ if (status != 0)
+ return status;
+ else
+ return status2;
+} /* }}} int stop_threads */
- if (m == NULL) {
- char errbuf[1024];
- sfree(buf);
- ERROR("connectivity plugin: unable to allocate metadata: %s",
- sstrerror(errno, errbuf, sizeof(errbuf)));
- return;
+static int connectivity_init(void) /* {{{ */
+{
+ if (monitor_all_interfaces) {
+ NOTICE("connectivity plugin: No interfaces have been selected, so all will "
+ "be monitored");
}
- sstrncpy(m->name, "ves", sizeof(m->name));
- m->nm_value.nm_string = sstrdup(buf);
- m->type = NM_TYPE_STRING;
- n.meta = m;
+ return start_threads();
+} /* }}} int connectivity_init */
- DEBUG("connectivity plugin: notification message: %s",
- n.meta->nm_value.nm_string);
+static int connectivity_config(const char *key, const char *value) /* {{{ */
+{
+ if (ignorelist == NULL) {
+ ignorelist = ignorelist_create(/* invert = */ 1);
- DEBUG("connectivity plugin: dispatching state %d for interface %s",
- (int)value, interface);
+ if (ignorelist == NULL)
+ return -1;
+ }
- plugin_dispatch_notification(&n);
- plugin_notification_meta_free(n.meta);
+ if (strcasecmp(key, "Interface") == 0) {
+ ignorelist_add(ignorelist, value);
+ monitor_all_interfaces = 0;
+ } else if (strcasecmp(key, "IgnoreSelected") == 0) {
+ int invert = 1;
+ if (IS_TRUE(value))
+ invert = 0;
+ ignorelist_set_invert(ignorelist, invert);
+ } else {
+ return -1;
+ }
- // malloc'd in gen_message_payload
- if (buf != NULL)
- sfree(buf);
-}
+ return 0;
+} /* }}} int connectivity_config */
static int connectivity_read(void) /* {{{ */
{
- if (connectivity_thread_error != 0) {
- ERROR("connectivity plugin: The interface thread had a problem. Restarting "
+ pthread_mutex_lock(&connectivity_threads_lock);
+
+ if (connectivity_netlink_thread_error != 0) {
+
+ pthread_mutex_unlock(&connectivity_threads_lock);
+
+ ERROR("connectivity plugin: The netlink thread had a problem. Restarting "
"it.");
- stop_thread(0);
+ stop_netlink_thread(0);
- for (interfacelist_t *il = interfacelist_head; il != NULL; il = il->next) {
+ for (interface_list_t *il = interface_list_head; il != NULL;
+ il = il->next) {
il->status = LINK_STATE_UNKNOWN;
il->prev_status = LINK_STATE_UNKNOWN;
il->sent = 0;
}
- start_thread();
-
- return (-1);
- } /* if (connectivity_thread_error != 0) */
-
- for (interfacelist_t *il = interfacelist_head; il != NULL;
- il = il->next) /* {{{ */
- {
- uint32_t status;
- uint32_t prev_status;
- uint32_t sent;
-
- pthread_mutex_lock(&connectivity_lock);
-
- status = il->status;
- prev_status = il->prev_status;
- sent = il->sent;
+ start_netlink_thread();
- if (status != prev_status && sent == 0) {
- connectivity_dispatch_notification(il->interface, "gauge", status,
- prev_status, il->timestamp);
- il->sent = 1;
- }
+ return -1;
+ } /* if (connectivity_netlink_thread_error != 0) */
- pthread_mutex_unlock(&connectivity_lock);
- } /* }}} for (il = interfacelist_head; il != NULL; il = il->next) */
+ pthread_mutex_unlock(&connectivity_threads_lock);
- return (0);
+ return 0;
} /* }}} int connectivity_read */
static int connectivity_shutdown(void) /* {{{ */
{
- interfacelist_t *il;
-
DEBUG("connectivity plugin: Shutting down thread.");
- if (stop_thread(1) < 0)
- return (-1);
- il = interfacelist_head;
+ int status = stop_threads();
+
+ interface_list_t *il = interface_list_head;
while (il != NULL) {
- interfacelist_t *il_next;
+ interface_list_t *il_next;
il_next = il->next;
il = il_next;
}
- return (0);
+ ignorelist_free(ignorelist);
+
+ return status;
} /* }}} int connectivity_shutdown */
void module_register(void) {