X-Git-Url: https://git.octo.it/?p=collectd.git;a=blobdiff_plain;f=src%2Fconnectivity.c;h=45b65aab5b09c1c5ba35110bbff618f6144de153;hp=5a27fe1f0e07ade68ce14c8b855bb9e37f27907a;hb=48efd3deb4c9139fd060ff3d289896e9031bcc7c;hpb=20e8c46556ee1e2cb26a3d853c548f9da5031bf4 diff --git a/src/connectivity.c b/src/connectivity.c index 5a27fe1f..45b65aab 100644 --- a/src/connectivity.c +++ b/src/connectivity.c @@ -27,8 +27,9 @@ #include "collectd.h" -#include "common.h" #include "plugin.h" +#include "utils/common/common.h" +#include "utils/ignorelist/ignorelist.h" #include "utils_complain.h" #include @@ -45,52 +46,376 @@ #include #include +#include +#include +#if HAVE_YAJL_YAJL_VERSION_H +#include +#endif +#if defined(YAJL_MAJOR) && (YAJL_MAJOR > 1) +#define HAVE_YAJL_V2 1 +#endif + #define MYPROTO NETLINK_ROUTE +#define LINK_STATE_DOWN 0 +#define LINK_STATE_UP 1 +#define LINK_STATE_UNKNOWN 2 + +#define CONNECTIVITY_DOMAIN_FIELD "domain" +#define CONNECTIVITY_DOMAIN_VALUE "stateChange" +#define CONNECTIVITY_EVENT_ID_FIELD "eventId" +#define CONNECTIVITY_EVENT_NAME_FIELD "eventName" +#define CONNECTIVITY_EVENT_NAME_DOWN_VALUE "down" +#define CONNECTIVITY_EVENT_NAME_UP_VALUE "up" +#define CONNECTIVITY_LAST_EPOCH_MICROSEC_FIELD "lastEpochMicrosec" +#define CONNECTIVITY_PRIORITY_FIELD "priority" +#define CONNECTIVITY_PRIORITY_VALUE "high" +#define CONNECTIVITY_REPORTING_ENTITY_NAME_FIELD "reportingEntityName" +#define CONNECTIVITY_REPORTING_ENTITY_NAME_VALUE "collectd connectivity plugin" +#define CONNECTIVITY_SEQUENCE_FIELD "sequence" +#define CONNECTIVITY_SEQUENCE_VALUE "0" +#define CONNECTIVITY_SOURCE_NAME_FIELD "sourceName" +#define CONNECTIVITY_START_EPOCH_MICROSEC_FIELD "startEpochMicrosec" +#define CONNECTIVITY_VERSION_FIELD "version" +#define CONNECTIVITY_VERSION_VALUE "1.0" + +#define CONNECTIVITY_NEW_STATE_FIELD "newState" +#define CONNECTIVITY_NEW_STATE_FIELD_DOWN_VALUE "outOfService" +#define CONNECTIVITY_NEW_STATE_FIELD_UP_VALUE "inService" +#define CONNECTIVITY_OLD_STATE_FIELD "oldState" +#define CONNECTIVITY_OLD_STATE_FIELD_DOWN_VALUE "outOfService" +#define CONNECTIVITY_OLD_STATE_FIELD_UP_VALUE "inService" +#define CONNECTIVITY_STATE_CHANGE_FIELDS_FIELD "stateChangeFields" +#define CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_FIELD \ + "stateChangeFieldsVersion" +#define CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_VALUE "1.0" +#define CONNECTIVITY_STATE_INTERFACE_FIELD "stateInterface" + /* * Private data types */ -struct interfacelist_s { + +struct interface_list_s { char *interface; uint32_t status; uint32_t prev_status; uint32_t sent; - uint32_t sec; - uint32_t usec; + cdtime_t timestamp; - struct interfacelist_s *next; + struct interface_list_s *next; }; -typedef struct interfacelist_s interfacelist_t; +typedef struct interface_list_s interface_list_t; /* * Private variables */ -static interfacelist_t *interfacelist_head = NULL; -static int connectivity_thread_loop = 0; -static int connectivity_thread_error = 0; -static pthread_t connectivity_thread_id; -static pthread_mutex_t connectivity_lock = PTHREAD_MUTEX_INITIALIZER; +static ignorelist_t *ignorelist = NULL; + +static interface_list_t *interface_list_head = NULL; +static int monitor_all_interfaces = 1; + +static int connectivity_netlink_thread_loop = 0; +static int connectivity_netlink_thread_error = 0; +static pthread_t connectivity_netlink_thread_id; +static int connectivity_dequeue_thread_loop = 0; +static pthread_t connectivity_dequeue_thread_id; +static pthread_mutex_t connectivity_threads_lock = PTHREAD_MUTEX_INITIALIZER; +static pthread_mutex_t connectivity_data_lock = PTHREAD_MUTEX_INITIALIZER; static pthread_cond_t connectivity_cond = PTHREAD_COND_INITIALIZER; -static struct mnl_socket *sock; +static int nl_sock = -1; +static int event_id = 0; +static int statuses_to_send = 0; -static const char *config_keys[] = {"Interface"}; +static const char *config_keys[] = {"Interface", "IgnoreSelected"}; static int config_keys_num = STATIC_ARRAY_SIZE(config_keys); /* * Private functions */ -static int connectivity_link_state(struct nlmsghdr *msg) { - int retval = 0; - struct ifinfomsg *ifi = mnl_nlmsg_get_payload(msg); - struct nlattr *attr; - const char *dev = NULL; +static int gen_message_payload(int state, int old_state, const char *interface, + cdtime_t timestamp, char **buf) { + const unsigned char *buf2; + yajl_gen g; + char json_str[DATA_MAX_NAME_LEN]; + +#if !defined(HAVE_YAJL_V2) + yajl_gen_config conf = {0}; +#endif + +#if HAVE_YAJL_V2 + size_t len; + g = yajl_gen_alloc(NULL); + yajl_gen_config(g, yajl_gen_beautify, 0); +#else + unsigned int len; + g = yajl_gen_alloc(&conf, NULL); +#endif + + yajl_gen_clear(g); + + // *** BEGIN common event header *** + + if (yajl_gen_map_open(g) != yajl_gen_status_ok) + goto err; + + // domain + if (yajl_gen_string(g, (u_char *)CONNECTIVITY_DOMAIN_FIELD, + strlen(CONNECTIVITY_DOMAIN_FIELD)) != yajl_gen_status_ok) + goto err; + + if (yajl_gen_string(g, (u_char *)CONNECTIVITY_DOMAIN_VALUE, + strlen(CONNECTIVITY_DOMAIN_VALUE)) != yajl_gen_status_ok) + goto err; + + // eventId + if (yajl_gen_string(g, (u_char *)CONNECTIVITY_EVENT_ID_FIELD, + strlen(CONNECTIVITY_EVENT_ID_FIELD)) != + yajl_gen_status_ok) + goto err; + + event_id = event_id + 1; + if (snprintf(json_str, sizeof(json_str), "%d", event_id) < 0) { + goto err; + } + + if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) { + goto err; + } + + // eventName + if (yajl_gen_string(g, (u_char *)CONNECTIVITY_EVENT_NAME_FIELD, + strlen(CONNECTIVITY_EVENT_NAME_FIELD)) != + yajl_gen_status_ok) + goto err; + + if (snprintf(json_str, sizeof(json_str), "interface %s %s", interface, + (state == 0 ? CONNECTIVITY_EVENT_NAME_DOWN_VALUE + : CONNECTIVITY_EVENT_NAME_UP_VALUE)) < 0) { + goto err; + } + + if (yajl_gen_string(g, (u_char *)json_str, strlen(json_str)) != + yajl_gen_status_ok) { + goto err; + } + + // lastEpochMicrosec + if (yajl_gen_string(g, (u_char *)CONNECTIVITY_LAST_EPOCH_MICROSEC_FIELD, + strlen(CONNECTIVITY_LAST_EPOCH_MICROSEC_FIELD)) != + yajl_gen_status_ok) + goto err; + + if (snprintf(json_str, sizeof(json_str), "%" PRIu64, + CDTIME_T_TO_US(cdtime())) < 0) { + goto err; + } + + if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) { + goto err; + } + + // priority + if (yajl_gen_string(g, (u_char *)CONNECTIVITY_PRIORITY_FIELD, + strlen(CONNECTIVITY_PRIORITY_FIELD)) != + yajl_gen_status_ok) + goto err; + + if (yajl_gen_string(g, (u_char *)CONNECTIVITY_PRIORITY_VALUE, + strlen(CONNECTIVITY_PRIORITY_VALUE)) != + yajl_gen_status_ok) + goto err; + + // reportingEntityName + if (yajl_gen_string(g, (u_char *)CONNECTIVITY_REPORTING_ENTITY_NAME_FIELD, + strlen(CONNECTIVITY_REPORTING_ENTITY_NAME_FIELD)) != + yajl_gen_status_ok) + goto err; + + if (yajl_gen_string(g, (u_char *)CONNECTIVITY_REPORTING_ENTITY_NAME_VALUE, + strlen(CONNECTIVITY_REPORTING_ENTITY_NAME_VALUE)) != + yajl_gen_status_ok) + goto err; + + // sequence + if (yajl_gen_string(g, (u_char *)CONNECTIVITY_SEQUENCE_FIELD, + strlen(CONNECTIVITY_SEQUENCE_FIELD)) != + yajl_gen_status_ok) + goto err; + + if (yajl_gen_number(g, CONNECTIVITY_SEQUENCE_VALUE, + strlen(CONNECTIVITY_SEQUENCE_VALUE)) != + yajl_gen_status_ok) + goto err; + + // sourceName + if (yajl_gen_string(g, (u_char *)CONNECTIVITY_SOURCE_NAME_FIELD, + strlen(CONNECTIVITY_SOURCE_NAME_FIELD)) != + yajl_gen_status_ok) + goto err; + + if (yajl_gen_string(g, (u_char *)interface, strlen(interface)) != + yajl_gen_status_ok) + goto err; + + // startEpochMicrosec + if (yajl_gen_string(g, (u_char *)CONNECTIVITY_START_EPOCH_MICROSEC_FIELD, + strlen(CONNECTIVITY_START_EPOCH_MICROSEC_FIELD)) != + yajl_gen_status_ok) + goto err; + + if (snprintf(json_str, sizeof(json_str), "%" PRIu64, + CDTIME_T_TO_US(timestamp)) < 0) { + goto err; + } + + if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) { + goto err; + } + + // version + if (yajl_gen_string(g, (u_char *)CONNECTIVITY_VERSION_FIELD, + strlen(CONNECTIVITY_VERSION_FIELD)) != yajl_gen_status_ok) + goto err; + + if (yajl_gen_number(g, CONNECTIVITY_VERSION_VALUE, + strlen(CONNECTIVITY_VERSION_VALUE)) != yajl_gen_status_ok) + goto err; + + // *** END common event header *** + + // *** BEGIN state change fields *** + + if (yajl_gen_string(g, (u_char *)CONNECTIVITY_STATE_CHANGE_FIELDS_FIELD, + strlen(CONNECTIVITY_STATE_CHANGE_FIELDS_FIELD)) != + yajl_gen_status_ok) + goto err; + + if (yajl_gen_map_open(g) != yajl_gen_status_ok) + goto err; + + // newState + if (yajl_gen_string(g, (u_char *)CONNECTIVITY_NEW_STATE_FIELD, + strlen(CONNECTIVITY_NEW_STATE_FIELD)) != + yajl_gen_status_ok) + goto err; + + int new_state_len = + (state == 0 ? strlen(CONNECTIVITY_NEW_STATE_FIELD_DOWN_VALUE) + : strlen(CONNECTIVITY_NEW_STATE_FIELD_UP_VALUE)); + + if (yajl_gen_string(g, + (u_char *)(state == 0 + ? CONNECTIVITY_NEW_STATE_FIELD_DOWN_VALUE + : CONNECTIVITY_NEW_STATE_FIELD_UP_VALUE), + new_state_len) != yajl_gen_status_ok) + goto err; + + // oldState + if (yajl_gen_string(g, (u_char *)CONNECTIVITY_OLD_STATE_FIELD, + strlen(CONNECTIVITY_OLD_STATE_FIELD)) != + yajl_gen_status_ok) + goto err; + + int old_state_len = + (old_state == 0 ? strlen(CONNECTIVITY_OLD_STATE_FIELD_DOWN_VALUE) + : strlen(CONNECTIVITY_OLD_STATE_FIELD_UP_VALUE)); + + if (yajl_gen_string(g, + (u_char *)(old_state == 0 + ? CONNECTIVITY_OLD_STATE_FIELD_DOWN_VALUE + : CONNECTIVITY_OLD_STATE_FIELD_UP_VALUE), + old_state_len) != yajl_gen_status_ok) + goto err; + + // stateChangeFieldsVersion + if (yajl_gen_string(g, + (u_char *)CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_FIELD, + strlen(CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_FIELD)) != + yajl_gen_status_ok) + goto err; + + if (yajl_gen_number(g, CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_VALUE, + strlen(CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_VALUE)) != + yajl_gen_status_ok) + goto err; + + // stateInterface + if (yajl_gen_string(g, (u_char *)CONNECTIVITY_STATE_INTERFACE_FIELD, + strlen(CONNECTIVITY_STATE_INTERFACE_FIELD)) != + yajl_gen_status_ok) + goto err; + + if (yajl_gen_string(g, (u_char *)interface, strlen(interface)) != + yajl_gen_status_ok) + goto err; + + // close state change and header fields + if (yajl_gen_map_close(g) != yajl_gen_status_ok || + yajl_gen_map_close(g) != yajl_gen_status_ok) + goto err; + + // *** END state change fields *** + + if (yajl_gen_get_buf(g, &buf2, &len) != yajl_gen_status_ok) + goto err; + + *buf = strdup((char *)buf2); + + if (*buf == NULL) { + ERROR("connectivity plugin: strdup failed during gen_message_payload: %s", + STRERRNO); + goto err; + } + + yajl_gen_free(g); + + return 0; + +err: + yajl_gen_free(g); + ERROR("connectivity plugin: gen_message_payload failed to generate JSON"); + return -1; +} + +static interface_list_t *add_interface(const char *interface, int status, + int prev_status) { + interface_list_t *il = calloc(1, sizeof(*il)); + + if (il == NULL) { + ERROR("connectivity plugin: calloc failed during add_interface: %s", + STRERRNO); + return NULL; + } + + char *interface2 = strdup(interface); + if (interface2 == NULL) { + sfree(il); + ERROR("connectivity plugin: strdup failed during add_interface: %s", + STRERRNO); + return NULL; + } + + il->interface = interface2; + il->status = status; + il->prev_status = prev_status; + il->timestamp = cdtime(); + il->sent = 0; + il->next = interface_list_head; + interface_list_head = il; - pthread_mutex_lock(&connectivity_lock); + DEBUG("connectivity plugin: added interface %s", interface2); - interfacelist_t *il; + return il; +} + +static int connectivity_link_state(struct nlmsghdr *msg) { + pthread_mutex_lock(&connectivity_data_lock); + + struct nlattr *attr; + struct ifinfomsg *ifi = mnl_nlmsg_get_payload(msg); /* Scan attribute list for device name. */ mnl_attr_for_each(attr, msg, sizeof(*ifi)) { @@ -101,233 +426,423 @@ static int connectivity_link_state(struct nlmsghdr *msg) { ERROR("connectivity plugin: connectivity_link_state: IFLA_IFNAME " "mnl_attr_validate " "failed."); - pthread_mutex_unlock(&connectivity_lock); + pthread_mutex_unlock(&connectivity_data_lock); return MNL_CB_ERROR; } - dev = mnl_attr_get_str(attr); + const char *dev = mnl_attr_get_str(attr); - for (il = interfacelist_head; il != NULL; il = il->next) + // Check the list of interfaces we should monitor, if we've chosen + // a subset. If we don't care about this one, abort. + if (ignorelist_match(ignorelist, dev) != 0) { + DEBUG("connectivity plugin: Ignoring link state change for unmonitored " + "interface: %s", + dev); + break; + } + + interface_list_t *il = NULL; + + for (il = interface_list_head; il != NULL; il = il->next) if (strcmp(dev, il->interface) == 0) break; if (il == NULL) { - INFO("connectivity plugin: Ignoring link state change for unmonitored " - "interface: %s", - dev); - } else { - uint32_t prev_status; - struct timeval tv; - - gettimeofday(&tv, NULL); - - unsigned long long millisecondsSinceEpoch = - (unsigned long long)(tv.tv_sec) * 1000 + - (unsigned long long)(tv.tv_usec) / 1000; - - INFO("connectivity plugin (%llu): Interface %s status is now %s", - millisecondsSinceEpoch, dev, - ((ifi->ifi_flags & IFF_RUNNING) ? "UP" : "DOWN")); - prev_status = il->status; - il->status = ((ifi->ifi_flags & IFF_RUNNING) ? 1 : 0); - il->sec = tv.tv_sec; - il->usec = tv.tv_usec; - // If the new status is different than the previous status, - // store the previous status and set sent to zero - if (il->status != prev_status) { - il->prev_status = prev_status; - il->sent = 0; + // We haven't encountered this interface yet, so add it to the linked list + il = add_interface(dev, LINK_STATE_UNKNOWN, LINK_STATE_UNKNOWN); + + if (il == NULL) { + ERROR("connectivity plugin: unable to add interface %s during " + "connectivity_link_state", + dev); + return MNL_CB_ERROR; } } - // no need to loop again, we found the interface name + uint32_t prev_status = il->status; + il->status = + ((ifi->ifi_flags & IFF_RUNNING) ? LINK_STATE_UP : LINK_STATE_DOWN); + il->timestamp = cdtime(); + + // If the new status is different than the previous status, + // store the previous status and set sent to zero, and set the + // global flag to indicate there are statuses to dispatch + if (il->status != prev_status) { + il->prev_status = prev_status; + il->sent = 0; + statuses_to_send = 1; + } + + DEBUG("connectivity plugin (%llu): Interface %s status is now %s", + (unsigned long long)il->timestamp, dev, + ((ifi->ifi_flags & IFF_RUNNING) ? "UP" : "DOWN")); + + // no need to loop again, we found the interface name attr // (otherwise the first if-statement in the loop would // have moved us on with 'continue') break; } - pthread_mutex_unlock(&connectivity_lock); + pthread_mutex_unlock(&connectivity_data_lock); - return retval; + return 0; } static int msg_handler(struct nlmsghdr *msg) { - switch (msg->nlmsg_type) { - case RTM_NEWADDR: - break; - case RTM_DELADDR: - break; - case RTM_NEWROUTE: - break; - case RTM_DELROUTE: - break; - case RTM_NEWLINK: - connectivity_link_state(msg); - break; - case RTM_DELLINK: - break; - default: - ERROR("connectivity plugin: msg_handler: Unknown netlink nlmsg_type %d\n", - msg->nlmsg_type); - break; + // We are only interested in RTM_NEWLINK messages + if (msg->nlmsg_type != RTM_NEWLINK) { + return 0; } - return 0; + return connectivity_link_state(msg); } -static int read_event(struct mnl_socket *nl, - int (*msg_handler)(struct nlmsghdr *)) { - int status; +static int read_event(int (*msg_handler)(struct nlmsghdr *)) { int ret = 0; - char buf[4096]; - struct nlmsghdr *h; + int recv_flags = MSG_DONTWAIT; - if (nl == NULL) - return ret; + if (nl_sock == -1 || msg_handler == NULL) + return EINVAL; - status = mnl_socket_recvfrom(nl, buf, sizeof(buf)); + while (42) { + pthread_mutex_lock(&connectivity_threads_lock); - if (status < 0) { - /* Socket non-blocking so bail out once we have read everything */ - if (errno == EWOULDBLOCK || errno == EAGAIN) + if (connectivity_netlink_thread_loop <= 0) { + pthread_mutex_unlock(&connectivity_threads_lock); return ret; + } - /* Anything else is an error */ - ERROR("connectivity plugin: read_event: Error mnl_socket_recvfrom: %d\n", - status); - return status; - } + pthread_mutex_unlock(&connectivity_threads_lock); - if (status == 0) { - DEBUG("connectivity plugin: read_event: EOF\n"); - } + char buf[4096]; + int status = recv(nl_sock, buf, sizeof(buf), recv_flags); - /* We need to handle more than one message per 'recvmsg' */ - for (h = (struct nlmsghdr *)buf; NLMSG_OK(h, (unsigned int)status); - h = NLMSG_NEXT(h, status)) { - /* Finish reading */ - if (h->nlmsg_type == NLMSG_DONE) - return ret; + if (status < 0) { + + // If there were no more messages to drain from the socket, + // then signal the dequeue thread and allow it to dispatch + // any saved interface status changes. Then continue, but + // block and wait for new messages + if (errno == EWOULDBLOCK || errno == EAGAIN) { + pthread_cond_signal(&connectivity_cond); + + recv_flags = 0; + continue; + } + + if (errno == EINTR) { + // Interrupt, so just continue and try again + continue; + } + + /* Anything else is an error */ + ERROR("connectivity plugin: read_event: Error recv: %d", status); + return status; + } - /* Message is some kind of error */ - if (h->nlmsg_type == NLMSG_ERROR) { - ERROR("connectivity plugin: read_event: Message is an error - decode " - "TBD\n"); - return -1; // Error + // Message received successfully, so we'll stop blocking on the + // receive call for now (until we get a "would block" error, which + // will be handled above) + recv_flags = MSG_DONTWAIT; + + if (status == 0) { + DEBUG("connectivity plugin: read_event: EOF"); } - /* Call message handler */ - if (msg_handler) { - ret = (*msg_handler)(h); - if (ret < 0) { - ERROR("connectivity plugin: read_event: Message handler error %d\n", - ret); + /* We need to handle more than one message per 'recvmsg' */ + for (struct nlmsghdr *h = (struct nlmsghdr *)buf; + NLMSG_OK(h, (unsigned int)status); h = NLMSG_NEXT(h, status)) { + /* Finish reading */ + if (h->nlmsg_type == NLMSG_DONE) return ret; + + /* Message is some kind of error */ + if (h->nlmsg_type == NLMSG_ERROR) { + struct nlmsgerr *l_err = (struct nlmsgerr *)NLMSG_DATA(h); + ERROR("connectivity plugin: read_event: Message is an error: %d", + l_err->error); + return -1; // Error + } + + /* Call message handler */ + if (msg_handler) { + ret = (*msg_handler)(h); + if (ret < 0) { + ERROR("connectivity plugin: read_event: Message handler error %d", + ret); + return ret; + } + } else { + ERROR("connectivity plugin: read_event: Error NULL message handler"); + return -1; } - } else { - ERROR("connectivity plugin: read_event: Error NULL message handler\n"); - return -1; } } return ret; } -static void *connectivity_thread(void *arg) /* {{{ */ +static void connectivity_dispatch_notification(const char *interface, + gauge_t value, gauge_t old_value, + cdtime_t timestamp) { + + notification_t n = { + .severity = (value == LINK_STATE_UP ? NOTIF_OKAY : NOTIF_FAILURE), + .time = cdtime(), + .plugin = "connectivity", + .type = "gauge", + .type_instance = "interface_status", + }; + + sstrncpy(n.host, hostname_g, sizeof(n.host)); + sstrncpy(n.plugin_instance, interface, sizeof(n.plugin_instance)); + + char *buf = NULL; + + gen_message_payload(value, old_value, interface, timestamp, &buf); + + int status = plugin_notification_meta_add_string(&n, "ves", buf); + + if (status < 0) { + sfree(buf); + ERROR("connectivity plugin: unable to set notification VES metadata: %s", + STRERRNO); + return; + } + + DEBUG("connectivity plugin: notification VES metadata: %s", + n.meta->nm_value.nm_string); + + DEBUG("connectivity plugin: dispatching state %d for interface %s", + (int)value, interface); + + plugin_dispatch_notification(&n); + plugin_notification_meta_free(n.meta); + + // strdup'd in gen_message_payload + if (buf != NULL) + sfree(buf); +} + +// NOTE: Caller MUST hold connectivity_data_lock when calling this function +static void send_interface_status() { + for (interface_list_t *il = interface_list_head; il != NULL; + il = il->next) /* {{{ */ + { + uint32_t status = il->status; + uint32_t prev_status = il->prev_status; + uint32_t sent = il->sent; + + if (status != prev_status && sent == 0) { + connectivity_dispatch_notification(il->interface, status, prev_status, + il->timestamp); + il->sent = 1; + } + } /* }}} for (il = interface_list_head; il != NULL; il = il->next) */ + + statuses_to_send = 0; +} + +static void read_interface_status() /* {{{ */ { - pthread_mutex_lock(&connectivity_lock); + pthread_mutex_lock(&connectivity_data_lock); - while (connectivity_thread_loop > 0) { - int status; + // If we don't have any interface statuses to dispatch, + // then we wait until signalled + if (!statuses_to_send) + pthread_cond_wait(&connectivity_cond, &connectivity_data_lock); - pthread_mutex_unlock(&connectivity_lock); + send_interface_status(); - status = read_event(sock, msg_handler); + pthread_mutex_unlock(&connectivity_data_lock); +} /* }}} int *read_interface_status */ + +static void *connectivity_netlink_thread(void *arg) /* {{{ */ +{ + pthread_mutex_lock(&connectivity_threads_lock); - pthread_mutex_lock(&connectivity_lock); + while (connectivity_netlink_thread_loop > 0) { + pthread_mutex_unlock(&connectivity_threads_lock); + + int status = read_event(msg_handler); + + pthread_mutex_lock(&connectivity_threads_lock); if (status < 0) { - connectivity_thread_error = 1; + connectivity_netlink_thread_error = 1; break; } + } /* while (connectivity_netlink_thread_loop > 0) */ - if (connectivity_thread_loop <= 0) - break; - } /* while (connectivity_thread_loop > 0) */ + pthread_mutex_unlock(&connectivity_threads_lock); + + return (void *)0; +} /* }}} void *connectivity_netlink_thread */ + +static void *connectivity_dequeue_thread(void *arg) /* {{{ */ +{ + pthread_mutex_lock(&connectivity_threads_lock); + + while (connectivity_dequeue_thread_loop > 0) { + pthread_mutex_unlock(&connectivity_threads_lock); + + read_interface_status(); + + pthread_mutex_lock(&connectivity_threads_lock); + } /* while (connectivity_dequeue_thread_loop > 0) */ - pthread_mutex_unlock(&connectivity_lock); + pthread_mutex_unlock(&connectivity_threads_lock); return ((void *)0); -} /* }}} void *connectivity_thread */ +} /* }}} void *connectivity_dequeue_thread */ + +static int nl_connect() { + struct sockaddr_nl sa_nl = { + .nl_family = AF_NETLINK, + .nl_groups = RTMGRP_LINK, + .nl_pid = getpid(), + }; + + nl_sock = socket(AF_NETLINK, SOCK_DGRAM, NETLINK_ROUTE); + if (nl_sock == -1) { + ERROR("connectivity plugin: socket open failed: %s", STRERRNO); + return -1; + } -static int start_thread(void) /* {{{ */ -{ - int status; + int rc = bind(nl_sock, (struct sockaddr *)&sa_nl, sizeof(sa_nl)); + if (rc == -1) { + ERROR("connectivity plugin: socket bind failed: %s", STRERRNO); + close(nl_sock); + nl_sock = -1; + return -1; + } - pthread_mutex_lock(&connectivity_lock); + return 0; +} - if (connectivity_thread_loop != 0) { - pthread_mutex_unlock(&connectivity_lock); - return (0); +static int start_netlink_thread(void) /* {{{ */ +{ + pthread_mutex_lock(&connectivity_threads_lock); + + if (connectivity_netlink_thread_loop != 0) { + pthread_mutex_unlock(&connectivity_threads_lock); + return 0; } - connectivity_thread_loop = 1; - connectivity_thread_error = 0; + connectivity_netlink_thread_loop = 1; + connectivity_netlink_thread_error = 0; - if (sock == NULL) { - sock = mnl_socket_open(NETLINK_ROUTE); - if (sock == NULL) { - ERROR( - "connectivity plugin: connectivity_thread: mnl_socket_open failed."); - pthread_mutex_unlock(&connectivity_lock); - return (-1); - } + int status; - if (mnl_socket_bind(sock, RTMGRP_LINK, MNL_SOCKET_AUTOPID) < 0) { - ERROR( - "connectivity plugin: connectivity_thread: mnl_socket_bind failed."); - pthread_mutex_unlock(&connectivity_lock); - return (1); + if (nl_sock == -1) { + status = nl_connect(); + + if (status != 0) { + pthread_mutex_unlock(&connectivity_threads_lock); + return status; } } - status = plugin_thread_create(&connectivity_thread_id, /* attr = */ NULL, - connectivity_thread, + status = plugin_thread_create(&connectivity_netlink_thread_id, + /* attr = */ NULL, connectivity_netlink_thread, /* arg = */ (void *)0, "connectivity"); if (status != 0) { - connectivity_thread_loop = 0; + connectivity_netlink_thread_loop = 0; ERROR("connectivity plugin: Starting thread failed."); - pthread_mutex_unlock(&connectivity_lock); - mnl_socket_close(sock); - return (-1); + pthread_mutex_unlock(&connectivity_threads_lock); + + int status2 = close(nl_sock); + + if (status2 != 0) { + ERROR("connectivity plugin: failed to close socket %d: %d (%s)", nl_sock, + status2, STRERRNO); + } + + nl_sock = -1; + + return -1; } - pthread_mutex_unlock(&connectivity_lock); - return (0); -} /* }}} int start_thread */ + pthread_mutex_unlock(&connectivity_threads_lock); -static int stop_thread(int shutdown) /* {{{ */ + return status; +} + +static int start_dequeue_thread(void) /* {{{ */ { - int status; + pthread_mutex_lock(&connectivity_threads_lock); + + if (connectivity_dequeue_thread_loop != 0) { + pthread_mutex_unlock(&connectivity_threads_lock); + return 0; + } + + connectivity_dequeue_thread_loop = 1; + + int status = + plugin_thread_create(&connectivity_dequeue_thread_id, + /* attr = */ NULL, connectivity_dequeue_thread, + /* arg = */ (void *)0, "connectivity"); + if (status != 0) { + connectivity_dequeue_thread_loop = 0; + ERROR("connectivity plugin: Starting dequeue thread failed."); + pthread_mutex_unlock(&connectivity_threads_lock); + return -1; + } - if (sock != NULL) - mnl_socket_close(sock); + pthread_mutex_unlock(&connectivity_threads_lock); - pthread_mutex_lock(&connectivity_lock); + return status; +} /* }}} int start_dequeue_thread */ - if (connectivity_thread_loop == 0) { - pthread_mutex_unlock(&connectivity_lock); - return (-1); +static int start_threads(void) /* {{{ */ +{ + int status = start_netlink_thread(); + int status2 = start_dequeue_thread(); + + if (status != 0) + return status; + else + return status2; +} /* }}} int start_threads */ + +static int stop_netlink_thread(int shutdown) /* {{{ */ +{ + int socket_status; + + if (nl_sock != -1) { + socket_status = close(nl_sock); + if (socket_status != 0) { + ERROR("connectivity plugin: failed to close socket %d: %d (%s)", nl_sock, + socket_status, STRERRNO); + } + + nl_sock = -1; + } else + socket_status = 0; + + pthread_mutex_lock(&connectivity_threads_lock); + + if (connectivity_netlink_thread_loop == 0) { + pthread_mutex_unlock(&connectivity_threads_lock); + // Thread has already been terminated, nothing more to attempt + return socket_status; } - connectivity_thread_loop = 0; + // Set thread termination status + connectivity_netlink_thread_loop = 0; + pthread_mutex_unlock(&connectivity_threads_lock); + + // Let threads waiting on access to the interface list know to move + // on such that they'll see the thread's termination status pthread_cond_broadcast(&connectivity_cond); - pthread_mutex_unlock(&connectivity_lock); + + int thread_status; if (shutdown == 1) { // Since the thread is blocking, calling pthread_join // doesn't actually succeed in stopping it. It will stick around // until a NETLINK message is received on the socket (at which - // it will realize that "connectivity_thread_loop" is 0 and will + // it will realize that "connectivity_netlink_thread_loop" is 0 and will // break out of the read loop and be allowed to die). This is // fine when the process isn't supposed to be exiting, but in // the case of a process shutdown, we don't want to have an @@ -335,177 +850,165 @@ static int stop_thread(int shutdown) /* {{{ */ // the case of a shutdown is just assures that the thread is // gone and that the process has been fully terminated. - INFO("connectivity plugin: Canceling thread for process shutdown"); + DEBUG("connectivity plugin: Canceling netlink thread for process shutdown"); - status = pthread_cancel(connectivity_thread_id); + thread_status = pthread_cancel(connectivity_netlink_thread_id); - if (status != 0) { - ERROR("connectivity plugin: Unable to cancel thread: %d", status); - status = -1; - } + if (thread_status != 0 && thread_status != ESRCH) { + ERROR("connectivity plugin: Unable to cancel netlink thread: %d", + thread_status); + thread_status = -1; + } else + thread_status = 0; } else { - status = pthread_join(connectivity_thread_id, /* return = */ NULL); - if (status != 0) { - ERROR("connectivity plugin: Stopping thread failed."); - status = -1; - } + thread_status = + pthread_join(connectivity_netlink_thread_id, /* return = */ NULL); + if (thread_status != 0 && thread_status != ESRCH) { + ERROR("connectivity plugin: Stopping netlink thread failed: %d", + thread_status); + thread_status = -1; + } else + thread_status = 0; } - pthread_mutex_lock(&connectivity_lock); - memset(&connectivity_thread_id, 0, sizeof(connectivity_thread_id)); - connectivity_thread_error = 0; - pthread_mutex_unlock(&connectivity_lock); + pthread_mutex_lock(&connectivity_threads_lock); + memset(&connectivity_netlink_thread_id, 0, + sizeof(connectivity_netlink_thread_id)); + connectivity_netlink_thread_error = 0; + pthread_mutex_unlock(&connectivity_threads_lock); - INFO("connectivity plugin: Finished requesting stop of thread"); + DEBUG("connectivity plugin: Finished requesting stop of netlink thread"); - return (status); -} /* }}} int stop_thread */ + if (socket_status != 0) + return socket_status; + else + return thread_status; +} -static int connectivity_init(void) /* {{{ */ +static int stop_dequeue_thread() /* {{{ */ { - if (interfacelist_head == NULL) { - NOTICE("connectivity plugin: No interfaces have been configured."); - return (-1); + pthread_mutex_lock(&connectivity_threads_lock); + + if (connectivity_dequeue_thread_loop == 0) { + pthread_mutex_unlock(&connectivity_threads_lock); + return -1; } - return (start_thread()); -} /* }}} int connectivity_init */ + // Set thread termination status + connectivity_dequeue_thread_loop = 0; + pthread_mutex_unlock(&connectivity_threads_lock); -static int connectivity_config(const char *key, const char *value) /* {{{ */ -{ - if (strcasecmp(key, "Interface") == 0) { - interfacelist_t *il; - char *interface; + // Let threads waiting on access to the interface list know to move + // on such that they'll see the threads termination status + pthread_cond_broadcast(&connectivity_cond); - il = malloc(sizeof(*il)); - if (il == NULL) { - char errbuf[1024]; - ERROR("connectivity plugin: malloc failed during connectivity_config: %s", - sstrerror(errno, errbuf, sizeof(errbuf))); - return (1); - } + // Calling pthread_cancel here just assures that the thread is + // gone and that the process has been fully terminated. - interface = strdup(value); - if (interface == NULL) { - char errbuf[1024]; - sfree(il); - ERROR("connectivity plugin: strdup failed connectivity_config: %s", - sstrerror(errno, errbuf, sizeof(errbuf))); - return (1); - } + DEBUG("connectivity plugin: Canceling dequeue thread for process shutdown"); - il->interface = interface; - il->status = 2; // "unknown" - il->prev_status = 2; - il->sent = 0; - il->next = interfacelist_head; - interfacelist_head = il; + int status = pthread_cancel(connectivity_dequeue_thread_id); - } else { - return (-1); - } + if (status != 0 && status != ESRCH) { + ERROR("connectivity plugin: Unable to cancel dequeue thread: %d", status); + status = -1; + } else + status = 0; - return (0); -} /* }}} int connectivity_config */ + pthread_mutex_lock(&connectivity_threads_lock); + memset(&connectivity_dequeue_thread_id, 0, + sizeof(connectivity_dequeue_thread_id)); + pthread_mutex_unlock(&connectivity_threads_lock); -static void submit(const char *interface, const char *type, /* {{{ */ - gauge_t value, uint32_t sec, uint32_t usec) { - value_list_t vl = VALUE_LIST_INIT; - char hostname[1024]; - vl.values = &(value_t){.gauge = value}; - vl.values_len = 1; - sstrncpy(vl.plugin, "connectivity", sizeof(vl.plugin)); - sstrncpy(vl.type_instance, interface, sizeof(vl.type_instance)); - sstrncpy(vl.type, type, sizeof(vl.type)); - - // Create metadata to store JSON key-values - meta_data_t *meta = meta_data_create(); - - vl.meta = meta; - // For latency measurement - struct timeval tv; - gettimeofday(&tv, NULL); - gethostname(hostname, sizeof(hostname)); - char strSec[11]; - char struSec[11]; - snprintf(strSec, sizeof strSec, "%" PRIu32, sec); - snprintf(struSec, sizeof struSec, "%" PRIu32, usec); - if (value == 1) { - meta_data_add_string(meta, "condition", "interface_up"); - meta_data_add_string(meta, "entity", interface); - meta_data_add_string(meta, "source", hostname); - meta_data_add_string(meta, "sec", strSec); - meta_data_add_string(meta, "usec", struSec); - meta_data_add_string(meta, "dest", "interface_down"); - } else { - meta_data_add_string(meta, "condition", "interface_down"); - meta_data_add_string(meta, "entity", interface); - meta_data_add_string(meta, "source", hostname); - meta_data_add_string(meta, "sec", strSec); - meta_data_add_string(meta, "usec", struSec); - meta_data_add_string(meta, "dest", "interface_up"); + DEBUG("connectivity plugin: Finished requesting stop of dequeue thread"); + + return status; +} /* }}} int stop_dequeue_thread */ + +static int stop_threads() /* {{{ */ +{ + int status = stop_netlink_thread(1); + int status2 = stop_dequeue_thread(); + + if (status != 0) + return status; + else + return status2; +} /* }}} int stop_threads */ + +static int connectivity_init(void) /* {{{ */ +{ + if (monitor_all_interfaces) { + NOTICE("connectivity plugin: No interfaces have been selected, so all will " + "be monitored"); } - plugin_dispatch_values(&vl); -} /* }}} void interface_submit */ + return start_threads(); +} /* }}} int connectivity_init */ -static int connectivity_read(void) /* {{{ */ +static int connectivity_config(const char *key, const char *value) /* {{{ */ { - if (connectivity_thread_error != 0) { - ERROR("connectivity plugin: The interface thread had a problem. Restarting " - "it."); + if (ignorelist == NULL) { + ignorelist = ignorelist_create(/* invert = */ 1); - stop_thread(0); + if (ignorelist == NULL) + return -1; + } - for (interfacelist_t *il = interfacelist_head; il != NULL; il = il->next) { - il->status = 2; // signifies "unknown" - il->prev_status = 2; - il->sent = 0; - } + if (strcasecmp(key, "Interface") == 0) { + ignorelist_add(ignorelist, value); + monitor_all_interfaces = 0; + } else if (strcasecmp(key, "IgnoreSelected") == 0) { + int invert = 1; + if (IS_TRUE(value)) + invert = 0; + ignorelist_set_invert(ignorelist, invert); + } else { + return -1; + } - start_thread(); + return 0; +} /* }}} int connectivity_config */ - return (-1); - } /* if (connectivity_thread_error != 0) */ +static int connectivity_read(void) /* {{{ */ +{ + pthread_mutex_lock(&connectivity_threads_lock); - for (interfacelist_t *il = interfacelist_head; il != NULL; - il = il->next) /* {{{ */ - { - uint32_t status; - uint32_t prev_status; - uint32_t sent; + if (connectivity_netlink_thread_error != 0) { - /* Locking here works, because the structure of the linked list is only - * changed during configure and shutdown. */ - pthread_mutex_lock(&connectivity_lock); + pthread_mutex_unlock(&connectivity_threads_lock); - status = il->status; - prev_status = il->prev_status; - sent = il->sent; + ERROR("connectivity plugin: The netlink thread had a problem. Restarting " + "it."); - if (status != prev_status && sent == 0) { - submit(il->interface, "gauge", status, il->sec, il->usec); + stop_netlink_thread(0); - il->sent = 1; + for (interface_list_t *il = interface_list_head; il != NULL; + il = il->next) { + il->status = LINK_STATE_UNKNOWN; + il->prev_status = LINK_STATE_UNKNOWN; + il->sent = 0; } - pthread_mutex_unlock(&connectivity_lock); - } /* }}} for (il = interfacelist_head; il != NULL; il = il->next) */ + start_netlink_thread(); + + return -1; + } /* if (connectivity_netlink_thread_error != 0) */ - return (0); + pthread_mutex_unlock(&connectivity_threads_lock); + + return 0; } /* }}} int connectivity_read */ static int connectivity_shutdown(void) /* {{{ */ { - interfacelist_t *il; + DEBUG("connectivity plugin: Shutting down thread."); - INFO("connectivity plugin: Shutting down thread."); - if (stop_thread(1) < 0) - return (-1); + int status = stop_threads(); - il = interfacelist_head; + interface_list_t *il = interface_list_head; while (il != NULL) { - interfacelist_t *il_next; + interface_list_t *il_next; il_next = il->next; @@ -515,7 +1018,9 @@ static int connectivity_shutdown(void) /* {{{ */ il = il_next; } - return (0); + ignorelist_free(ignorelist); + + return status; } /* }}} int connectivity_shutdown */ void module_register(void) {