X-Git-Url: https://git.octo.it/?p=collectd.git;a=blobdiff_plain;f=src%2Fconnectivity.c;h=45b65aab5b09c1c5ba35110bbff618f6144de153;hp=a91b57df7e3021b3d912a9eb48bea521cf37cd70;hb=48efd3deb4c9139fd060ff3d289896e9031bcc7c;hpb=61a29253f72ffe706f8d4227d2737ce984f0110d diff --git a/src/connectivity.c b/src/connectivity.c index a91b57df..45b65aab 100644 --- a/src/connectivity.c +++ b/src/connectivity.c @@ -27,8 +27,9 @@ #include "collectd.h" -#include "common.h" #include "plugin.h" +#include "utils/common/common.h" +#include "utils/ignorelist/ignorelist.h" #include "utils_complain.h" #include @@ -93,13 +94,14 @@ /* * Private data types */ + struct interface_list_s { char *interface; uint32_t status; uint32_t prev_status; uint32_t sent; - long long unsigned int timestamp; + cdtime_t timestamp; struct interface_list_s *next; }; @@ -108,18 +110,25 @@ typedef struct interface_list_s interface_list_t; /* * Private variables */ -static interface_list_t *interface_list_head = NULL; -static int monitor_all_interfaces = 0; -static int connectivity_thread_loop = 0; -static int connectivity_thread_error = 0; -static pthread_t connectivity_thread_id; -static pthread_mutex_t connectivity_lock = PTHREAD_MUTEX_INITIALIZER; +static ignorelist_t *ignorelist = NULL; + +static interface_list_t *interface_list_head = NULL; +static int monitor_all_interfaces = 1; + +static int connectivity_netlink_thread_loop = 0; +static int connectivity_netlink_thread_error = 0; +static pthread_t connectivity_netlink_thread_id; +static int connectivity_dequeue_thread_loop = 0; +static pthread_t connectivity_dequeue_thread_id; +static pthread_mutex_t connectivity_threads_lock = PTHREAD_MUTEX_INITIALIZER; +static pthread_mutex_t connectivity_data_lock = PTHREAD_MUTEX_INITIALIZER; static pthread_cond_t connectivity_cond = PTHREAD_COND_INITIALIZER; -static struct mnl_socket *sock; +static int nl_sock = -1; static int event_id = 0; +static int statuses_to_send = 0; -static const char *config_keys[] = {"Interface"}; +static const char *config_keys[] = {"Interface", "IgnoreSelected"}; static int config_keys_num = STATIC_ARRAY_SIZE(config_keys); /* @@ -127,15 +136,13 @@ static int config_keys_num = STATIC_ARRAY_SIZE(config_keys); */ static int gen_message_payload(int state, int old_state, const char *interface, - long long unsigned int timestamp, char **buf) { + cdtime_t timestamp, char **buf) { const unsigned char *buf2; yajl_gen g; char json_str[DATA_MAX_NAME_LEN]; #if !defined(HAVE_YAJL_V2) - yajl_gen_config conf = {}; - - conf.beautify = 0; + yajl_gen_config conf = {0}; #endif #if HAVE_YAJL_V2 @@ -170,9 +177,9 @@ static int gen_message_payload(int state, int old_state, const char *interface, goto err; event_id = event_id + 1; - int event_id_len = sizeof(char) * sizeof(int) * 4 + 1; - memset(json_str, '\0', DATA_MAX_NAME_LEN); - snprintf(json_str, event_id_len, "%d", event_id); + if (snprintf(json_str, sizeof(json_str), "%d", event_id) < 0) { + goto err; + } if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) { goto err; @@ -184,15 +191,11 @@ static int gen_message_payload(int state, int old_state, const char *interface, yajl_gen_status_ok) goto err; - int event_name_len = 0; - event_name_len = event_name_len + strlen(interface); // interface name - event_name_len = event_name_len + (state == 0 ? 4 : 2); // "down" or "up" - event_name_len = - event_name_len + 12; // "interface", 2 spaces and null-terminator - memset(json_str, '\0', DATA_MAX_NAME_LEN); - snprintf(json_str, event_name_len, "interface %s %s", interface, - (state == 0 ? CONNECTIVITY_EVENT_NAME_DOWN_VALUE - : CONNECTIVITY_EVENT_NAME_UP_VALUE)); + if (snprintf(json_str, sizeof(json_str), "interface %s %s", interface, + (state == 0 ? CONNECTIVITY_EVENT_NAME_DOWN_VALUE + : CONNECTIVITY_EVENT_NAME_UP_VALUE)) < 0) { + goto err; + } if (yajl_gen_string(g, (u_char *)json_str, strlen(json_str)) != yajl_gen_status_ok) { @@ -205,11 +208,10 @@ static int gen_message_payload(int state, int old_state, const char *interface, yajl_gen_status_ok) goto err; - int last_epoch_microsec_len = - sizeof(char) * sizeof(long long unsigned int) * 4 + 1; - memset(json_str, '\0', DATA_MAX_NAME_LEN); - snprintf(json_str, last_epoch_microsec_len, "%llu", - (long long unsigned int)CDTIME_T_TO_US(cdtime())); + if (snprintf(json_str, sizeof(json_str), "%" PRIu64, + CDTIME_T_TO_US(cdtime())) < 0) { + goto err; + } if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) { goto err; @@ -264,11 +266,10 @@ static int gen_message_payload(int state, int old_state, const char *interface, yajl_gen_status_ok) goto err; - int start_epoch_microsec_len = - sizeof(char) * sizeof(long long unsigned int) * 4 + 1; - memset(json_str, '\0', DATA_MAX_NAME_LEN); - snprintf(json_str, start_epoch_microsec_len, "%llu", - (long long unsigned int)timestamp); + if (snprintf(json_str, sizeof(json_str), "%" PRIu64, + CDTIME_T_TO_US(timestamp)) < 0) { + goto err; + } if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) { goto err; @@ -305,10 +306,11 @@ static int gen_message_payload(int state, int old_state, const char *interface, (state == 0 ? strlen(CONNECTIVITY_NEW_STATE_FIELD_DOWN_VALUE) : strlen(CONNECTIVITY_NEW_STATE_FIELD_UP_VALUE)); - if (yajl_gen_string( - g, (u_char *)(state == 0 ? CONNECTIVITY_NEW_STATE_FIELD_DOWN_VALUE - : CONNECTIVITY_NEW_STATE_FIELD_UP_VALUE), - new_state_len) != yajl_gen_status_ok) + if (yajl_gen_string(g, + (u_char *)(state == 0 + ? CONNECTIVITY_NEW_STATE_FIELD_DOWN_VALUE + : CONNECTIVITY_NEW_STATE_FIELD_UP_VALUE), + new_state_len) != yajl_gen_status_ok) goto err; // oldState @@ -321,10 +323,11 @@ static int gen_message_payload(int state, int old_state, const char *interface, (old_state == 0 ? strlen(CONNECTIVITY_OLD_STATE_FIELD_DOWN_VALUE) : strlen(CONNECTIVITY_OLD_STATE_FIELD_UP_VALUE)); - if (yajl_gen_string( - g, (u_char *)(old_state == 0 ? CONNECTIVITY_OLD_STATE_FIELD_DOWN_VALUE - : CONNECTIVITY_OLD_STATE_FIELD_UP_VALUE), - old_state_len) != yajl_gen_status_ok) + if (yajl_gen_string(g, + (u_char *)(old_state == 0 + ? CONNECTIVITY_OLD_STATE_FIELD_DOWN_VALUE + : CONNECTIVITY_OLD_STATE_FIELD_UP_VALUE), + old_state_len) != yajl_gen_status_ok) goto err; // stateChangeFieldsVersion @@ -349,20 +352,23 @@ static int gen_message_payload(int state, int old_state, const char *interface, yajl_gen_status_ok) goto err; - if (yajl_gen_map_close(g) != yajl_gen_status_ok) + // close state change and header fields + if (yajl_gen_map_close(g) != yajl_gen_status_ok || + yajl_gen_map_close(g) != yajl_gen_status_ok) goto err; // *** END state change fields *** - if (yajl_gen_map_close(g) != yajl_gen_status_ok) - goto err; - if (yajl_gen_get_buf(g, &buf2, &len) != yajl_gen_status_ok) goto err; - *buf = malloc(strlen((char *)buf2) + 1); + *buf = strdup((char *)buf2); - sstrncpy(*buf, (char *)buf2, strlen((char *)buf2) + 1); + if (*buf == NULL) { + ERROR("connectivity plugin: strdup failed during gen_message_payload: %s", + STRERRNO); + goto err; + } yajl_gen_free(g); @@ -376,30 +382,26 @@ err: static interface_list_t *add_interface(const char *interface, int status, int prev_status) { - interface_list_t *il; - char *interface2; + interface_list_t *il = calloc(1, sizeof(*il)); - il = malloc(sizeof(*il)); if (il == NULL) { - char errbuf[1024]; - ERROR("connectivity plugin: malloc failed during add_interface: %s", - sstrerror(errno, errbuf, sizeof(errbuf))); + ERROR("connectivity plugin: calloc failed during add_interface: %s", + STRERRNO); return NULL; } - interface2 = strdup(interface); + char *interface2 = strdup(interface); if (interface2 == NULL) { - char errbuf[1024]; sfree(il); ERROR("connectivity plugin: strdup failed during add_interface: %s", - sstrerror(errno, errbuf, sizeof(errbuf))); + STRERRNO); return NULL; } il->interface = interface2; il->status = status; il->prev_status = prev_status; - il->timestamp = (long long unsigned int)CDTIME_T_TO_US(cdtime()); + il->timestamp = cdtime(); il->sent = 0; il->next = interface_list_head; interface_list_head = il; @@ -410,14 +412,10 @@ static interface_list_t *add_interface(const char *interface, int status, } static int connectivity_link_state(struct nlmsghdr *msg) { - int retval = 0; - struct ifinfomsg *ifi = mnl_nlmsg_get_payload(msg); - struct nlattr *attr; - const char *dev = NULL; + pthread_mutex_lock(&connectivity_data_lock); - pthread_mutex_lock(&connectivity_lock); - - interface_list_t *il; + struct nlattr *attr; + struct ifinfomsg *ifi = mnl_nlmsg_get_payload(msg); /* Scan attribute list for device name. */ mnl_attr_for_each(attr, msg, sizeof(*ifi)) { @@ -428,237 +426,423 @@ static int connectivity_link_state(struct nlmsghdr *msg) { ERROR("connectivity plugin: connectivity_link_state: IFLA_IFNAME " "mnl_attr_validate " "failed."); - pthread_mutex_unlock(&connectivity_lock); + pthread_mutex_unlock(&connectivity_data_lock); return MNL_CB_ERROR; } - dev = mnl_attr_get_str(attr); + const char *dev = mnl_attr_get_str(attr); + + // Check the list of interfaces we should monitor, if we've chosen + // a subset. If we don't care about this one, abort. + if (ignorelist_match(ignorelist, dev) != 0) { + DEBUG("connectivity plugin: Ignoring link state change for unmonitored " + "interface: %s", + dev); + break; + } + + interface_list_t *il = NULL; for (il = interface_list_head; il != NULL; il = il->next) if (strcmp(dev, il->interface) == 0) break; - if (il == NULL && monitor_all_interfaces == 0) { - DEBUG("connectivity plugin: Ignoring link state change for unmonitored " - "interface: %s", - dev); - } else { - uint32_t prev_status; + if (il == NULL) { + // We haven't encountered this interface yet, so add it to the linked list + il = add_interface(dev, LINK_STATE_UNKNOWN, LINK_STATE_UNKNOWN); if (il == NULL) { - // We're monitoring all interfaces and we haven't encountered this one - // yet, so add it to the linked list - il = add_interface(dev, LINK_STATE_UNKNOWN, LINK_STATE_UNKNOWN); - - if (il == NULL) { - ERROR("connectivity plugin: unable to add interface %s during " - "connectivity_link_state", - dev); - return MNL_CB_ERROR; - } + ERROR("connectivity plugin: unable to add interface %s during " + "connectivity_link_state", + dev); + return MNL_CB_ERROR; } + } - prev_status = il->status; - il->status = - ((ifi->ifi_flags & IFF_RUNNING) ? LINK_STATE_UP : LINK_STATE_DOWN); - il->timestamp = (long long unsigned int)CDTIME_T_TO_US(cdtime()); - - // If the new status is different than the previous status, - // store the previous status and set sent to zero - if (il->status != prev_status) { - il->prev_status = prev_status; - il->sent = 0; - } + uint32_t prev_status = il->status; + il->status = + ((ifi->ifi_flags & IFF_RUNNING) ? LINK_STATE_UP : LINK_STATE_DOWN); + il->timestamp = cdtime(); - DEBUG("connectivity plugin (%llu): Interface %s status is now %s", - il->timestamp, dev, - ((ifi->ifi_flags & IFF_RUNNING) ? "UP" : "DOWN")); + // If the new status is different than the previous status, + // store the previous status and set sent to zero, and set the + // global flag to indicate there are statuses to dispatch + if (il->status != prev_status) { + il->prev_status = prev_status; + il->sent = 0; + statuses_to_send = 1; } + DEBUG("connectivity plugin (%llu): Interface %s status is now %s", + (unsigned long long)il->timestamp, dev, + ((ifi->ifi_flags & IFF_RUNNING) ? "UP" : "DOWN")); + // no need to loop again, we found the interface name attr // (otherwise the first if-statement in the loop would // have moved us on with 'continue') break; } - pthread_mutex_unlock(&connectivity_lock); + pthread_mutex_unlock(&connectivity_data_lock); - return retval; + return 0; } static int msg_handler(struct nlmsghdr *msg) { - switch (msg->nlmsg_type) { - case RTM_NEWADDR: - case RTM_DELADDR: - case RTM_NEWROUTE: - case RTM_DELROUTE: - case RTM_DELLINK: - // Not of interest in current version - break; - case RTM_NEWLINK: - connectivity_link_state(msg); - break; - default: - ERROR("connectivity plugin: msg_handler: Unknown netlink nlmsg_type %d\n", - msg->nlmsg_type); - break; + // We are only interested in RTM_NEWLINK messages + if (msg->nlmsg_type != RTM_NEWLINK) { + return 0; } - return 0; + return connectivity_link_state(msg); } -static int read_event(struct mnl_socket *nl, - int (*msg_handler)(struct nlmsghdr *)) { - int status; +static int read_event(int (*msg_handler)(struct nlmsghdr *)) { int ret = 0; - char buf[4096]; - struct nlmsghdr *h; + int recv_flags = MSG_DONTWAIT; - if (nl == NULL) - return ret; + if (nl_sock == -1 || msg_handler == NULL) + return EINVAL; - status = mnl_socket_recvfrom(nl, buf, sizeof(buf)); + while (42) { + pthread_mutex_lock(&connectivity_threads_lock); - if (status < 0) { - /* Socket non-blocking so bail out once we have read everything */ - if (errno == EWOULDBLOCK || errno == EAGAIN) + if (connectivity_netlink_thread_loop <= 0) { + pthread_mutex_unlock(&connectivity_threads_lock); return ret; + } - /* Anything else is an error */ - ERROR("connectivity plugin: read_event: Error mnl_socket_recvfrom: %d\n", - status); - return status; - } + pthread_mutex_unlock(&connectivity_threads_lock); - if (status == 0) { - DEBUG("connectivity plugin: read_event: EOF\n"); - } + char buf[4096]; + int status = recv(nl_sock, buf, sizeof(buf), recv_flags); - /* We need to handle more than one message per 'recvmsg' */ - for (h = (struct nlmsghdr *)buf; NLMSG_OK(h, (unsigned int)status); - h = NLMSG_NEXT(h, status)) { - /* Finish reading */ - if (h->nlmsg_type == NLMSG_DONE) - return ret; + if (status < 0) { + + // If there were no more messages to drain from the socket, + // then signal the dequeue thread and allow it to dispatch + // any saved interface status changes. Then continue, but + // block and wait for new messages + if (errno == EWOULDBLOCK || errno == EAGAIN) { + pthread_cond_signal(&connectivity_cond); - /* Message is some kind of error */ - if (h->nlmsg_type == NLMSG_ERROR) { - ERROR("connectivity plugin: read_event: Message is an error\n"); - return -1; // Error + recv_flags = 0; + continue; + } + + if (errno == EINTR) { + // Interrupt, so just continue and try again + continue; + } + + /* Anything else is an error */ + ERROR("connectivity plugin: read_event: Error recv: %d", status); + return status; } - /* Call message handler */ - if (msg_handler) { - ret = (*msg_handler)(h); - if (ret < 0) { - ERROR("connectivity plugin: read_event: Message handler error %d\n", - ret); + // Message received successfully, so we'll stop blocking on the + // receive call for now (until we get a "would block" error, which + // will be handled above) + recv_flags = MSG_DONTWAIT; + + if (status == 0) { + DEBUG("connectivity plugin: read_event: EOF"); + } + + /* We need to handle more than one message per 'recvmsg' */ + for (struct nlmsghdr *h = (struct nlmsghdr *)buf; + NLMSG_OK(h, (unsigned int)status); h = NLMSG_NEXT(h, status)) { + /* Finish reading */ + if (h->nlmsg_type == NLMSG_DONE) return ret; + + /* Message is some kind of error */ + if (h->nlmsg_type == NLMSG_ERROR) { + struct nlmsgerr *l_err = (struct nlmsgerr *)NLMSG_DATA(h); + ERROR("connectivity plugin: read_event: Message is an error: %d", + l_err->error); + return -1; // Error + } + + /* Call message handler */ + if (msg_handler) { + ret = (*msg_handler)(h); + if (ret < 0) { + ERROR("connectivity plugin: read_event: Message handler error %d", + ret); + return ret; + } + } else { + ERROR("connectivity plugin: read_event: Error NULL message handler"); + return -1; } - } else { - ERROR("connectivity plugin: read_event: Error NULL message handler\n"); - return -1; } } return ret; } -static void *connectivity_thread(void *arg) /* {{{ */ +static void connectivity_dispatch_notification(const char *interface, + gauge_t value, gauge_t old_value, + cdtime_t timestamp) { + + notification_t n = { + .severity = (value == LINK_STATE_UP ? NOTIF_OKAY : NOTIF_FAILURE), + .time = cdtime(), + .plugin = "connectivity", + .type = "gauge", + .type_instance = "interface_status", + }; + + sstrncpy(n.host, hostname_g, sizeof(n.host)); + sstrncpy(n.plugin_instance, interface, sizeof(n.plugin_instance)); + + char *buf = NULL; + + gen_message_payload(value, old_value, interface, timestamp, &buf); + + int status = plugin_notification_meta_add_string(&n, "ves", buf); + + if (status < 0) { + sfree(buf); + ERROR("connectivity plugin: unable to set notification VES metadata: %s", + STRERRNO); + return; + } + + DEBUG("connectivity plugin: notification VES metadata: %s", + n.meta->nm_value.nm_string); + + DEBUG("connectivity plugin: dispatching state %d for interface %s", + (int)value, interface); + + plugin_dispatch_notification(&n); + plugin_notification_meta_free(n.meta); + + // strdup'd in gen_message_payload + if (buf != NULL) + sfree(buf); +} + +// NOTE: Caller MUST hold connectivity_data_lock when calling this function +static void send_interface_status() { + for (interface_list_t *il = interface_list_head; il != NULL; + il = il->next) /* {{{ */ + { + uint32_t status = il->status; + uint32_t prev_status = il->prev_status; + uint32_t sent = il->sent; + + if (status != prev_status && sent == 0) { + connectivity_dispatch_notification(il->interface, status, prev_status, + il->timestamp); + il->sent = 1; + } + } /* }}} for (il = interface_list_head; il != NULL; il = il->next) */ + + statuses_to_send = 0; +} + +static void read_interface_status() /* {{{ */ { - pthread_mutex_lock(&connectivity_lock); + pthread_mutex_lock(&connectivity_data_lock); - while (connectivity_thread_loop > 0) { - int status; + // If we don't have any interface statuses to dispatch, + // then we wait until signalled + if (!statuses_to_send) + pthread_cond_wait(&connectivity_cond, &connectivity_data_lock); - pthread_mutex_unlock(&connectivity_lock); + send_interface_status(); - status = read_event(sock, msg_handler); + pthread_mutex_unlock(&connectivity_data_lock); +} /* }}} int *read_interface_status */ - pthread_mutex_lock(&connectivity_lock); +static void *connectivity_netlink_thread(void *arg) /* {{{ */ +{ + pthread_mutex_lock(&connectivity_threads_lock); + + while (connectivity_netlink_thread_loop > 0) { + pthread_mutex_unlock(&connectivity_threads_lock); + + int status = read_event(msg_handler); + + pthread_mutex_lock(&connectivity_threads_lock); if (status < 0) { - connectivity_thread_error = 1; + connectivity_netlink_thread_error = 1; break; } + } /* while (connectivity_netlink_thread_loop > 0) */ - if (connectivity_thread_loop <= 0) - break; - } /* while (connectivity_thread_loop > 0) */ + pthread_mutex_unlock(&connectivity_threads_lock); + + return (void *)0; +} /* }}} void *connectivity_netlink_thread */ + +static void *connectivity_dequeue_thread(void *arg) /* {{{ */ +{ + pthread_mutex_lock(&connectivity_threads_lock); + + while (connectivity_dequeue_thread_loop > 0) { + pthread_mutex_unlock(&connectivity_threads_lock); + + read_interface_status(); + + pthread_mutex_lock(&connectivity_threads_lock); + } /* while (connectivity_dequeue_thread_loop > 0) */ - pthread_mutex_unlock(&connectivity_lock); + pthread_mutex_unlock(&connectivity_threads_lock); return ((void *)0); -} /* }}} void *connectivity_thread */ +} /* }}} void *connectivity_dequeue_thread */ + +static int nl_connect() { + struct sockaddr_nl sa_nl = { + .nl_family = AF_NETLINK, + .nl_groups = RTMGRP_LINK, + .nl_pid = getpid(), + }; + + nl_sock = socket(AF_NETLINK, SOCK_DGRAM, NETLINK_ROUTE); + if (nl_sock == -1) { + ERROR("connectivity plugin: socket open failed: %s", STRERRNO); + return -1; + } -static int start_thread(void) /* {{{ */ -{ - int status; + int rc = bind(nl_sock, (struct sockaddr *)&sa_nl, sizeof(sa_nl)); + if (rc == -1) { + ERROR("connectivity plugin: socket bind failed: %s", STRERRNO); + close(nl_sock); + nl_sock = -1; + return -1; + } - pthread_mutex_lock(&connectivity_lock); + return 0; +} - if (connectivity_thread_loop != 0) { - pthread_mutex_unlock(&connectivity_lock); - return (0); +static int start_netlink_thread(void) /* {{{ */ +{ + pthread_mutex_lock(&connectivity_threads_lock); + + if (connectivity_netlink_thread_loop != 0) { + pthread_mutex_unlock(&connectivity_threads_lock); + return 0; } - connectivity_thread_loop = 1; - connectivity_thread_error = 0; + connectivity_netlink_thread_loop = 1; + connectivity_netlink_thread_error = 0; - if (sock == NULL) { - sock = mnl_socket_open(NETLINK_ROUTE); - if (sock == NULL) { - ERROR( - "connectivity plugin: connectivity_thread: mnl_socket_open failed."); - pthread_mutex_unlock(&connectivity_lock); - return (-1); - } + int status; - if (mnl_socket_bind(sock, RTMGRP_LINK, MNL_SOCKET_AUTOPID) < 0) { - ERROR( - "connectivity plugin: connectivity_thread: mnl_socket_bind failed."); - pthread_mutex_unlock(&connectivity_lock); - return (1); + if (nl_sock == -1) { + status = nl_connect(); + + if (status != 0) { + pthread_mutex_unlock(&connectivity_threads_lock); + return status; } } - status = plugin_thread_create(&connectivity_thread_id, /* attr = */ NULL, - connectivity_thread, + status = plugin_thread_create(&connectivity_netlink_thread_id, + /* attr = */ NULL, connectivity_netlink_thread, /* arg = */ (void *)0, "connectivity"); if (status != 0) { - connectivity_thread_loop = 0; + connectivity_netlink_thread_loop = 0; ERROR("connectivity plugin: Starting thread failed."); - pthread_mutex_unlock(&connectivity_lock); - mnl_socket_close(sock); - return (-1); + pthread_mutex_unlock(&connectivity_threads_lock); + + int status2 = close(nl_sock); + + if (status2 != 0) { + ERROR("connectivity plugin: failed to close socket %d: %d (%s)", nl_sock, + status2, STRERRNO); + } + + nl_sock = -1; + + return -1; } - pthread_mutex_unlock(&connectivity_lock); - return (0); -} /* }}} int start_thread */ + pthread_mutex_unlock(&connectivity_threads_lock); -static int stop_thread(int shutdown) /* {{{ */ + return status; +} + +static int start_dequeue_thread(void) /* {{{ */ { - int status; + pthread_mutex_lock(&connectivity_threads_lock); + + if (connectivity_dequeue_thread_loop != 0) { + pthread_mutex_unlock(&connectivity_threads_lock); + return 0; + } + + connectivity_dequeue_thread_loop = 1; + + int status = + plugin_thread_create(&connectivity_dequeue_thread_id, + /* attr = */ NULL, connectivity_dequeue_thread, + /* arg = */ (void *)0, "connectivity"); + if (status != 0) { + connectivity_dequeue_thread_loop = 0; + ERROR("connectivity plugin: Starting dequeue thread failed."); + pthread_mutex_unlock(&connectivity_threads_lock); + return -1; + } + + pthread_mutex_unlock(&connectivity_threads_lock); + + return status; +} /* }}} int start_dequeue_thread */ + +static int start_threads(void) /* {{{ */ +{ + int status = start_netlink_thread(); + int status2 = start_dequeue_thread(); + + if (status != 0) + return status; + else + return status2; +} /* }}} int start_threads */ + +static int stop_netlink_thread(int shutdown) /* {{{ */ +{ + int socket_status; - if (sock != NULL) - mnl_socket_close(sock); + if (nl_sock != -1) { + socket_status = close(nl_sock); + if (socket_status != 0) { + ERROR("connectivity plugin: failed to close socket %d: %d (%s)", nl_sock, + socket_status, STRERRNO); + } - pthread_mutex_lock(&connectivity_lock); + nl_sock = -1; + } else + socket_status = 0; - if (connectivity_thread_loop == 0) { - pthread_mutex_unlock(&connectivity_lock); - return (-1); + pthread_mutex_lock(&connectivity_threads_lock); + + if (connectivity_netlink_thread_loop == 0) { + pthread_mutex_unlock(&connectivity_threads_lock); + // Thread has already been terminated, nothing more to attempt + return socket_status; } - connectivity_thread_loop = 0; + // Set thread termination status + connectivity_netlink_thread_loop = 0; + pthread_mutex_unlock(&connectivity_threads_lock); + + // Let threads waiting on access to the interface list know to move + // on such that they'll see the thread's termination status pthread_cond_broadcast(&connectivity_cond); - pthread_mutex_unlock(&connectivity_lock); + + int thread_status; if (shutdown == 1) { // Since the thread is blocking, calling pthread_join // doesn't actually succeed in stopping it. It will stick around // until a NETLINK message is received on the socket (at which - // it will realize that "connectivity_thread_loop" is 0 and will + // it will realize that "connectivity_netlink_thread_loop" is 0 and will // break out of the read loop and be allowed to die). This is // fine when the process isn't supposed to be exiting, but in // the case of a process shutdown, we don't want to have an @@ -666,117 +850,138 @@ static int stop_thread(int shutdown) /* {{{ */ // the case of a shutdown is just assures that the thread is // gone and that the process has been fully terminated. - DEBUG("connectivity plugin: Canceling thread for process shutdown"); + DEBUG("connectivity plugin: Canceling netlink thread for process shutdown"); - status = pthread_cancel(connectivity_thread_id); + thread_status = pthread_cancel(connectivity_netlink_thread_id); - if (status != 0) { - ERROR("connectivity plugin: Unable to cancel thread: %d", status); - status = -1; - } + if (thread_status != 0 && thread_status != ESRCH) { + ERROR("connectivity plugin: Unable to cancel netlink thread: %d", + thread_status); + thread_status = -1; + } else + thread_status = 0; } else { - status = pthread_join(connectivity_thread_id, /* return = */ NULL); - if (status != 0) { - ERROR("connectivity plugin: Stopping thread failed."); - status = -1; - } + thread_status = + pthread_join(connectivity_netlink_thread_id, /* return = */ NULL); + if (thread_status != 0 && thread_status != ESRCH) { + ERROR("connectivity plugin: Stopping netlink thread failed: %d", + thread_status); + thread_status = -1; + } else + thread_status = 0; } - pthread_mutex_lock(&connectivity_lock); - memset(&connectivity_thread_id, 0, sizeof(connectivity_thread_id)); - connectivity_thread_error = 0; - pthread_mutex_unlock(&connectivity_lock); + pthread_mutex_lock(&connectivity_threads_lock); + memset(&connectivity_netlink_thread_id, 0, + sizeof(connectivity_netlink_thread_id)); + connectivity_netlink_thread_error = 0; + pthread_mutex_unlock(&connectivity_threads_lock); - DEBUG("connectivity plugin: Finished requesting stop of thread"); + DEBUG("connectivity plugin: Finished requesting stop of netlink thread"); - return (status); -} /* }}} int stop_thread */ + if (socket_status != 0) + return socket_status; + else + return thread_status; +} -static int connectivity_init(void) /* {{{ */ +static int stop_dequeue_thread() /* {{{ */ { - if (interface_list_head == NULL) { - NOTICE("connectivity plugin: No interfaces have been selected, so all will " - "be monitored"); - monitor_all_interfaces = 1; + pthread_mutex_lock(&connectivity_threads_lock); + + if (connectivity_dequeue_thread_loop == 0) { + pthread_mutex_unlock(&connectivity_threads_lock); + return -1; } - return (start_thread()); -} /* }}} int connectivity_init */ + // Set thread termination status + connectivity_dequeue_thread_loop = 0; + pthread_mutex_unlock(&connectivity_threads_lock); -static int connectivity_config(const char *key, const char *value) /* {{{ */ -{ - if (strcasecmp(key, "Interface") == 0) { - interface_list_t *il = - add_interface(value, LINK_STATE_UNKNOWN, LINK_STATE_UNKNOWN); - if (il == NULL) { - ERROR("connectivity plugin: unable to add interface %s during " - "connectivity_init", - value); - return (-1); - } - } else { - return (-1); - } + // Let threads waiting on access to the interface list know to move + // on such that they'll see the threads termination status + pthread_cond_broadcast(&connectivity_cond); - return (0); -} /* }}} int connectivity_config */ + // Calling pthread_cancel here just assures that the thread is + // gone and that the process has been fully terminated. -static void connectivity_dispatch_notification( - const char *interface, const char *type, /* {{{ */ - gauge_t value, gauge_t old_value, long long unsigned int timestamp) { - char *buf = NULL; - notification_t n = { - NOTIF_FAILURE, cdtime(), "", "", "connectivity", "", "", "", NULL}; + DEBUG("connectivity plugin: Canceling dequeue thread for process shutdown"); - if (value == LINK_STATE_UP) - n.severity = NOTIF_OKAY; + int status = pthread_cancel(connectivity_dequeue_thread_id); - char hostname[1024]; - gethostname(hostname, sizeof(hostname)); + if (status != 0 && status != ESRCH) { + ERROR("connectivity plugin: Unable to cancel dequeue thread: %d", status); + status = -1; + } else + status = 0; - sstrncpy(n.host, hostname, sizeof(n.host)); - sstrncpy(n.plugin_instance, interface, sizeof(n.plugin_instance)); - sstrncpy(n.type, "gauge", sizeof(n.type)); - sstrncpy(n.type_instance, "interface_status", sizeof(n.type_instance)); + pthread_mutex_lock(&connectivity_threads_lock); + memset(&connectivity_dequeue_thread_id, 0, + sizeof(connectivity_dequeue_thread_id)); + pthread_mutex_unlock(&connectivity_threads_lock); - gen_message_payload(value, old_value, interface, timestamp, &buf); + DEBUG("connectivity plugin: Finished requesting stop of dequeue thread"); - notification_meta_t *m = calloc(1, sizeof(*m)); + return status; +} /* }}} int stop_dequeue_thread */ - if (m == NULL) { - char errbuf[1024]; - sfree(buf); - ERROR("connectivity plugin: unable to allocate metadata: %s", - sstrerror(errno, errbuf, sizeof(errbuf))); - return; +static int stop_threads() /* {{{ */ +{ + int status = stop_netlink_thread(1); + int status2 = stop_dequeue_thread(); + + if (status != 0) + return status; + else + return status2; +} /* }}} int stop_threads */ + +static int connectivity_init(void) /* {{{ */ +{ + if (monitor_all_interfaces) { + NOTICE("connectivity plugin: No interfaces have been selected, so all will " + "be monitored"); } - sstrncpy(m->name, "ves", sizeof(m->name)); - m->nm_value.nm_string = sstrdup(buf); - m->type = NM_TYPE_STRING; - n.meta = m; + return start_threads(); +} /* }}} int connectivity_init */ - DEBUG("connectivity plugin: notification message: %s", - n.meta->nm_value.nm_string); +static int connectivity_config(const char *key, const char *value) /* {{{ */ +{ + if (ignorelist == NULL) { + ignorelist = ignorelist_create(/* invert = */ 1); - DEBUG("connectivity plugin: dispatching state %d for interface %s", - (int)value, interface); + if (ignorelist == NULL) + return -1; + } - plugin_dispatch_notification(&n); - plugin_notification_meta_free(n.meta); + if (strcasecmp(key, "Interface") == 0) { + ignorelist_add(ignorelist, value); + monitor_all_interfaces = 0; + } else if (strcasecmp(key, "IgnoreSelected") == 0) { + int invert = 1; + if (IS_TRUE(value)) + invert = 0; + ignorelist_set_invert(ignorelist, invert); + } else { + return -1; + } - // malloc'd in gen_message_payload - if (buf != NULL) - sfree(buf); -} + return 0; +} /* }}} int connectivity_config */ static int connectivity_read(void) /* {{{ */ { - if (connectivity_thread_error != 0) { - ERROR("connectivity plugin: The interface thread had a problem. Restarting " + pthread_mutex_lock(&connectivity_threads_lock); + + if (connectivity_netlink_thread_error != 0) { + + pthread_mutex_unlock(&connectivity_threads_lock); + + ERROR("connectivity plugin: The netlink thread had a problem. Restarting " "it."); - stop_thread(0); + stop_netlink_thread(0); for (interface_list_t *il = interface_list_head; il != NULL; il = il->next) { @@ -785,45 +990,23 @@ static int connectivity_read(void) /* {{{ */ il->sent = 0; } - start_thread(); + start_netlink_thread(); - return (-1); - } /* if (connectivity_thread_error != 0) */ + return -1; + } /* if (connectivity_netlink_thread_error != 0) */ - for (interface_list_t *il = interface_list_head; il != NULL; - il = il->next) /* {{{ */ - { - uint32_t status; - uint32_t prev_status; - uint32_t sent; - - pthread_mutex_lock(&connectivity_lock); - - status = il->status; - prev_status = il->prev_status; - sent = il->sent; - - if (status != prev_status && sent == 0) { - connectivity_dispatch_notification(il->interface, "gauge", status, - prev_status, il->timestamp); - il->sent = 1; - } + pthread_mutex_unlock(&connectivity_threads_lock); - pthread_mutex_unlock(&connectivity_lock); - } /* }}} for (il = interface_list_head; il != NULL; il = il->next) */ - - return (0); + return 0; } /* }}} int connectivity_read */ static int connectivity_shutdown(void) /* {{{ */ { - interface_list_t *il; - DEBUG("connectivity plugin: Shutting down thread."); - if (stop_thread(1) < 0) - return (-1); - il = interface_list_head; + int status = stop_threads(); + + interface_list_t *il = interface_list_head; while (il != NULL) { interface_list_t *il_next; @@ -835,7 +1018,9 @@ static int connectivity_shutdown(void) /* {{{ */ il = il_next; } - return (0); + ignorelist_free(ignorelist); + + return status; } /* }}} int connectivity_shutdown */ void module_register(void) {