X-Git-Url: https://git.octo.it/?a=blobdiff_plain;f=src%2Fconnectivity.c;h=c470c9957c792fcdb7751712ff640f4597d455a9;hb=f614051d01f19146c9b2edce456c443c5fcdd884;hp=fea24fe0a85a734315d14d5adb34ec883d25e400;hpb=ba5440ec6dcdef1895366128f7652b7f08f81daf;p=collectd.git diff --git a/src/connectivity.c b/src/connectivity.c index fea24fe0..c470c995 100644 --- a/src/connectivity.c +++ b/src/connectivity.c @@ -30,6 +30,7 @@ #include "common.h" #include "plugin.h" #include "utils_complain.h" +#include "utils_ignorelist.h" #include #include @@ -56,6 +57,10 @@ #define MYPROTO NETLINK_ROUTE +#define LINK_STATE_DOWN 0 +#define LINK_STATE_UP 1 +#define LINK_STATE_UNKNOWN 2 + #define CONNECTIVITY_DOMAIN_FIELD "domain" #define CONNECTIVITY_DOMAIN_VALUE "stateChange" #define CONNECTIVITY_EVENT_ID_FIELD "eventId" @@ -89,7 +94,8 @@ /* * Private data types */ -struct interfacelist_s { + +struct interface_list_s { char *interface; uint32_t status; @@ -97,27 +103,44 @@ struct interfacelist_s { uint32_t sent; long long unsigned int timestamp; - struct interfacelist_s *next; + struct interface_list_s *next; }; -typedef struct interfacelist_s interfacelist_t; +typedef struct interface_list_s interface_list_t; /* * Private variables */ -static interfacelist_t *interfacelist_head = NULL; -static int connectivity_thread_loop = 0; -static int connectivity_thread_error = 0; -static pthread_t connectivity_thread_id; -static pthread_mutex_t connectivity_lock = PTHREAD_MUTEX_INITIALIZER; +static ignorelist_t *ignorelist = NULL; + +static interface_list_t *interface_list_head = NULL; +static int monitor_all_interfaces = 1; + +static int connectivity_netlink_thread_loop = 0; +static int connectivity_netlink_thread_error = 0; +static pthread_t connectivity_netlink_thread_id; +static int connectivity_dequeue_thread_loop = 0; +static pthread_t connectivity_dequeue_thread_id; +static pthread_mutex_t connectivity_threads_lock = PTHREAD_MUTEX_INITIALIZER; +static pthread_mutex_t connectivity_data_lock = PTHREAD_MUTEX_INITIALIZER; static pthread_cond_t connectivity_cond = PTHREAD_COND_INITIALIZER; -static struct mnl_socket *sock; +static int nl_sock = -1; static int event_id = 0; +static int unsent_statuses = 0; -static const char *config_keys[] = {"Interface"}; +static const char *config_keys[] = {"Interface", "IgnoreSelected"}; static int config_keys_num = STATIC_ARRAY_SIZE(config_keys); /* + * Prototype + */ + +static void +connectivity_dispatch_notification(const char *interface, const char *type, + gauge_t value, gauge_t old_value, + long long unsigned int timestamp); + +/* * Private functions */ @@ -125,6 +148,7 @@ static int gen_message_payload(int state, int old_state, const char *interface, long long unsigned int timestamp, char **buf) { const unsigned char *buf2; yajl_gen g; + char json_str[DATA_MAX_NAME_LEN]; #if !defined(HAVE_YAJL_V2) yajl_gen_config conf = {}; @@ -165,17 +189,13 @@ static int gen_message_payload(int state, int old_state, const char *interface, event_id = event_id + 1; int event_id_len = sizeof(char) * sizeof(int) * 4 + 1; - char *event_id_str = malloc(event_id_len); - snprintf(event_id_str, event_id_len, "%d", event_id); + memset(json_str, '\0', DATA_MAX_NAME_LEN); + snprintf(json_str, event_id_len, "%d", event_id); - if (yajl_gen_number(g, event_id_str, strlen(event_id_str)) != - yajl_gen_status_ok) { - sfree(event_id_str); + if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) { goto err; } - sfree(event_id_str); - // eventName if (yajl_gen_string(g, (u_char *)CONNECTIVITY_EVENT_NAME_FIELD, strlen(CONNECTIVITY_EVENT_NAME_FIELD)) != @@ -187,20 +207,16 @@ static int gen_message_payload(int state, int old_state, const char *interface, event_name_len = event_name_len + (state == 0 ? 4 : 2); // "down" or "up" event_name_len = event_name_len + 12; // "interface", 2 spaces and null-terminator - char *event_name_str = malloc(event_name_len); - memset(event_name_str, '\0', event_name_len); - snprintf(event_name_str, event_name_len, "interface %s %s", interface, + memset(json_str, '\0', DATA_MAX_NAME_LEN); + snprintf(json_str, event_name_len, "interface %s %s", interface, (state == 0 ? CONNECTIVITY_EVENT_NAME_DOWN_VALUE : CONNECTIVITY_EVENT_NAME_UP_VALUE)); - if (yajl_gen_string(g, (u_char *)event_name_str, strlen(event_name_str)) != + if (yajl_gen_string(g, (u_char *)json_str, strlen(json_str)) != yajl_gen_status_ok) { - sfree(event_name_str); goto err; } - sfree(event_name_str); - // lastEpochMicrosec if (yajl_gen_string(g, (u_char *)CONNECTIVITY_LAST_EPOCH_MICROSEC_FIELD, strlen(CONNECTIVITY_LAST_EPOCH_MICROSEC_FIELD)) != @@ -209,18 +225,14 @@ static int gen_message_payload(int state, int old_state, const char *interface, int last_epoch_microsec_len = sizeof(char) * sizeof(long long unsigned int) * 4 + 1; - char *last_epoch_microsec_str = malloc(last_epoch_microsec_len); - snprintf(last_epoch_microsec_str, last_epoch_microsec_len, "%llu", + memset(json_str, '\0', DATA_MAX_NAME_LEN); + snprintf(json_str, last_epoch_microsec_len, "%llu", (long long unsigned int)CDTIME_T_TO_US(cdtime())); - if (yajl_gen_number(g, last_epoch_microsec_str, - strlen(last_epoch_microsec_str)) != yajl_gen_status_ok) { - sfree(last_epoch_microsec_str); + if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) { goto err; } - sfree(last_epoch_microsec_str); - // priority if (yajl_gen_string(g, (u_char *)CONNECTIVITY_PRIORITY_FIELD, strlen(CONNECTIVITY_PRIORITY_FIELD)) != @@ -272,18 +284,14 @@ static int gen_message_payload(int state, int old_state, const char *interface, int start_epoch_microsec_len = sizeof(char) * sizeof(long long unsigned int) * 4 + 1; - char *start_epoch_microsec_str = malloc(start_epoch_microsec_len); - snprintf(start_epoch_microsec_str, start_epoch_microsec_len, "%llu", + memset(json_str, '\0', DATA_MAX_NAME_LEN); + snprintf(json_str, start_epoch_microsec_len, "%llu", (long long unsigned int)timestamp); - if (yajl_gen_number(g, start_epoch_microsec_str, - strlen(start_epoch_microsec_str)) != yajl_gen_status_ok) { - sfree(start_epoch_microsec_str); + if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) { goto err; } - sfree(start_epoch_microsec_str); - // version if (yajl_gen_string(g, (u_char *)CONNECTIVITY_VERSION_FIELD, strlen(CONNECTIVITY_VERSION_FIELD)) != yajl_gen_status_ok) @@ -370,9 +378,13 @@ static int gen_message_payload(int state, int old_state, const char *interface, if (yajl_gen_get_buf(g, &buf2, &len) != yajl_gen_status_ok) goto err; - *buf = malloc(strlen((char *)buf2) + 1); + *buf = strdup((char *)buf2); - sstrncpy(*buf, (char *)buf2, strlen((char *)buf2) + 1); + if (*buf == NULL) { + ERROR("connectivity plugin: strdup failed during gen_message_payload: %s", + STRERRNO); + goto err; + } yajl_gen_free(g); @@ -384,15 +396,42 @@ err: return -1; } -static int connectivity_link_state(struct nlmsghdr *msg) { - int retval = 0; - struct ifinfomsg *ifi = mnl_nlmsg_get_payload(msg); - struct nlattr *attr; - const char *dev = NULL; +static interface_list_t *add_interface(const char *interface, int status, + int prev_status) { + interface_list_t *il = calloc(1, sizeof(*il)); - pthread_mutex_lock(&connectivity_lock); + if (il == NULL) { + ERROR("connectivity plugin: calloc failed during add_interface: %s", + STRERRNO); + return NULL; + } - interfacelist_t *il; + char *interface2 = strdup(interface); + if (interface2 == NULL) { + sfree(il); + ERROR("connectivity plugin: strdup failed during add_interface: %s", + STRERRNO); + return NULL; + } + + il->interface = interface2; + il->status = status; + il->prev_status = prev_status; + il->timestamp = (long long unsigned int)CDTIME_T_TO_US(cdtime()); + il->sent = 0; + il->next = interface_list_head; + interface_list_head = il; + + DEBUG("connectivity plugin: added interface %s", interface2); + + return il; +} + +static int connectivity_link_state(struct nlmsghdr *msg) { + pthread_mutex_lock(&connectivity_data_lock); + + struct nlattr *attr; + struct ifinfomsg *ifi = mnl_nlmsg_get_payload(msg); /* Scan attribute list for device name. */ mnl_attr_for_each(attr, msg, sizeof(*ifi)) { @@ -403,226 +442,390 @@ static int connectivity_link_state(struct nlmsghdr *msg) { ERROR("connectivity plugin: connectivity_link_state: IFLA_IFNAME " "mnl_attr_validate " "failed."); - pthread_mutex_unlock(&connectivity_lock); + pthread_mutex_unlock(&connectivity_data_lock); return MNL_CB_ERROR; } - dev = mnl_attr_get_str(attr); + const char *dev = mnl_attr_get_str(attr); - for (il = interfacelist_head; il != NULL; il = il->next) + // Check the list of interfaces we should monitor, if we've chosen + // a subset. If we don't care about this one, abort. + if (ignorelist_match(ignorelist, dev) != 0) { + DEBUG("connectivity plugin: Ignoring link state change for unmonitored " + "interface: %s", + dev); + break; + } + + interface_list_t *il = NULL; + + for (il = interface_list_head; il != NULL; il = il->next) if (strcmp(dev, il->interface) == 0) break; if (il == NULL) { - DEBUG("connectivity plugin: Ignoring link state change for unmonitored " - "interface: %s", - dev); - } else { - uint32_t prev_status; - - prev_status = il->status; - il->status = ((ifi->ifi_flags & IFF_RUNNING) ? 1 : 0); - il->timestamp = (long long unsigned int)CDTIME_T_TO_US(cdtime()); - - // If the new status is different than the previous status, - // store the previous status and set sent to zero - if (il->status != prev_status) { - il->prev_status = prev_status; - il->sent = 0; + // We haven't encountered this interface yet, so add it to the linked list + il = add_interface(dev, LINK_STATE_UNKNOWN, LINK_STATE_UNKNOWN); + + if (il == NULL) { + ERROR("connectivity plugin: unable to add interface %s during " + "connectivity_link_state", + dev); + return MNL_CB_ERROR; } + } + + uint32_t prev_status; + + prev_status = il->status; + il->status = + ((ifi->ifi_flags & IFF_RUNNING) ? LINK_STATE_UP : LINK_STATE_DOWN); + il->timestamp = (long long unsigned int)CDTIME_T_TO_US(cdtime()); - DEBUG("connectivity plugin (%llu): Interface %s status is now %s", - il->timestamp, dev, - ((ifi->ifi_flags & IFF_RUNNING) ? "UP" : "DOWN")); + // If the new status is different than the previous status, + // store the previous status and set sent to zero, and set the + // global flag to indicate there are statuses to dispatch + if (il->status != prev_status) { + il->prev_status = prev_status; + il->sent = 0; + unsent_statuses = 1; } + DEBUG("connectivity plugin (%llu): Interface %s status is now %s", + il->timestamp, dev, ((ifi->ifi_flags & IFF_RUNNING) ? "UP" : "DOWN")); + // no need to loop again, we found the interface name attr // (otherwise the first if-statement in the loop would // have moved us on with 'continue') break; } - pthread_mutex_unlock(&connectivity_lock); + pthread_mutex_unlock(&connectivity_data_lock); - return retval; + return 0; } static int msg_handler(struct nlmsghdr *msg) { switch (msg->nlmsg_type) { case RTM_NEWADDR: - break; case RTM_DELADDR: - break; case RTM_NEWROUTE: - break; case RTM_DELROUTE: + case RTM_DELLINK: + // Not of interest in current version break; case RTM_NEWLINK: connectivity_link_state(msg); break; - case RTM_DELLINK: - break; default: - ERROR("connectivity plugin: msg_handler: Unknown netlink nlmsg_type %d\n", + ERROR("connectivity plugin: msg_handler: Unknown netlink nlmsg_type %d", msg->nlmsg_type); break; } return 0; } -static int read_event(struct mnl_socket *nl, - int (*msg_handler)(struct nlmsghdr *)) { - int status; +static int read_event(int nl, int (*msg_handler)(struct nlmsghdr *)) { int ret = 0; - char buf[4096]; - struct nlmsghdr *h; + int recv_flags = MSG_DONTWAIT; - if (nl == NULL) + if (nl == -1) return ret; - status = mnl_socket_recvfrom(nl, buf, sizeof(buf)); + while (42) { + pthread_mutex_lock(&connectivity_threads_lock); - if (status < 0) { - /* Socket non-blocking so bail out once we have read everything */ - if (errno == EWOULDBLOCK || errno == EAGAIN) + if (connectivity_netlink_thread_loop <= 0) { + pthread_mutex_unlock(&connectivity_threads_lock); return ret; + } - /* Anything else is an error */ - ERROR("connectivity plugin: read_event: Error mnl_socket_recvfrom: %d\n", - status); - return status; - } + pthread_mutex_unlock(&connectivity_threads_lock); - if (status == 0) { - DEBUG("connectivity plugin: read_event: EOF\n"); - } + char buf[4096]; + int status = recv(nl, buf, sizeof(buf), recv_flags); - /* We need to handle more than one message per 'recvmsg' */ - for (h = (struct nlmsghdr *)buf; NLMSG_OK(h, (unsigned int)status); - h = NLMSG_NEXT(h, status)) { - /* Finish reading */ - if (h->nlmsg_type == NLMSG_DONE) - return ret; + if (status < 0) { + + // If there were no more messages to drain from the socket, + // then signal the dequeue thread and allow it to dispatch + // any saved interface status changes. Then continue, but + // block and wait for new messages + if (errno == EWOULDBLOCK || errno == EAGAIN) { + pthread_cond_signal(&connectivity_cond); + + recv_flags = 0; + continue; + } - /* Message is some kind of error */ - if (h->nlmsg_type == NLMSG_ERROR) { - ERROR("connectivity plugin: read_event: Message is an error\n"); - return -1; // Error + if (errno == EINTR) + { + // Interrupt, so just return + return 0; + } + + /* Anything else is an error */ + ERROR("connectivity plugin: read_event: Error recv: %d", status); + return status; } - /* Call message handler */ - if (msg_handler) { - ret = (*msg_handler)(h); - if (ret < 0) { - ERROR("connectivity plugin: read_event: Message handler error %d\n", - ret); + // Message received successfully, so we'll stop blocking on the + // receive call for now (until we get a "would block" error, which + // will be handled above) + recv_flags = MSG_DONTWAIT; + + if (status == 0) { + DEBUG("connectivity plugin: read_event: EOF"); + } + + /* We need to handle more than one message per 'recvmsg' */ + for (struct nlmsghdr *h = (struct nlmsghdr *)buf; + NLMSG_OK(h, (unsigned int)status); h = NLMSG_NEXT(h, status)) { + /* Finish reading */ + if (h->nlmsg_type == NLMSG_DONE) return ret; + + /* Message is some kind of error */ + if (h->nlmsg_type == NLMSG_ERROR) { + ERROR("connectivity plugin: read_event: Message is an error"); + return -1; // Error + } + + /* Call message handler */ + if (msg_handler) { + ret = (*msg_handler)(h); + if (ret < 0) { + ERROR("connectivity plugin: read_event: Message handler error %d", + ret); + return ret; + } + } else { + ERROR("connectivity plugin: read_event: Error NULL message handler"); + return -1; } - } else { - ERROR("connectivity plugin: read_event: Error NULL message handler\n"); - return -1; } } return ret; } -static void *connectivity_thread(void *arg) /* {{{ */ +// NOTE: Caller MUST hold connectivity_data_lock when calling this function +static void send_interface_status() { + for (interface_list_t *il = interface_list_head; il != NULL; + il = il->next) /* {{{ */ + { + uint32_t status; + uint32_t prev_status; + uint32_t sent; + + status = il->status; + prev_status = il->prev_status; + sent = il->sent; + + if (status != prev_status && sent == 0) { + connectivity_dispatch_notification(il->interface, "gauge", status, + prev_status, il->timestamp); + il->sent = 1; + } + } /* }}} for (il = interface_list_head; il != NULL; il = il->next) */ + + unsent_statuses = 0; +} + +static void read_interface_status() /* {{{ */ { - pthread_mutex_lock(&connectivity_lock); + pthread_mutex_lock(&connectivity_data_lock); - while (connectivity_thread_loop > 0) { - int status; + if (!unsent_statuses) + pthread_cond_wait(&connectivity_cond, &connectivity_data_lock); - pthread_mutex_unlock(&connectivity_lock); + send_interface_status(); - status = read_event(sock, msg_handler); + pthread_mutex_unlock(&connectivity_data_lock); +} /* }}} int *read_interface_status */ - pthread_mutex_lock(&connectivity_lock); +static void *connectivity_netlink_thread(void *arg) /* {{{ */ +{ + pthread_mutex_lock(&connectivity_threads_lock); + + while (connectivity_netlink_thread_loop > 0) { + pthread_mutex_unlock(&connectivity_threads_lock); + + int status = read_event(nl_sock, msg_handler); + + pthread_mutex_lock(&connectivity_threads_lock); if (status < 0) { - connectivity_thread_error = 1; + connectivity_netlink_thread_error = 1; break; } + } /* while (connectivity_netlink_thread_loop > 0) */ - if (connectivity_thread_loop <= 0) - break; - } /* while (connectivity_thread_loop > 0) */ + pthread_mutex_unlock(&connectivity_threads_lock); + + return (void *)0; +} /* }}} void *connectivity_netlink_thread */ - pthread_mutex_unlock(&connectivity_lock); +static void *connectivity_dequeue_thread(void *arg) /* {{{ */ +{ + pthread_mutex_lock(&connectivity_threads_lock); + + while (connectivity_dequeue_thread_loop > 0) { + pthread_mutex_unlock(&connectivity_threads_lock); + + read_interface_status(); + + pthread_mutex_lock(&connectivity_threads_lock); + } /* while (connectivity_dequeue_thread_loop > 0) */ + + pthread_mutex_unlock(&connectivity_threads_lock); return ((void *)0); -} /* }}} void *connectivity_thread */ +} /* }}} void *connectivity_dequeue_thread */ -static int start_thread(void) /* {{{ */ -{ - int status; +static int nl_connect() { + struct sockaddr_nl sa_nl = { + .nl_family = AF_NETLINK, .nl_groups = RTMGRP_LINK, .nl_pid = getpid(), + }; - pthread_mutex_lock(&connectivity_lock); + nl_sock = socket(AF_NETLINK, SOCK_DGRAM, NETLINK_ROUTE); + if (nl_sock == -1) { + ERROR("connectivity plugin: socket open failed: %s", STRERRNO); + return -1; + } - if (connectivity_thread_loop != 0) { - pthread_mutex_unlock(&connectivity_lock); - return (0); + int rc = bind(nl_sock, (struct sockaddr *)&sa_nl, sizeof(sa_nl)); + if (rc == -1) { + ERROR("connectivity plugin: socket bind failed: %s", STRERRNO); + close(nl_sock); + return -1; } - connectivity_thread_loop = 1; - connectivity_thread_error = 0; + return 0; +} - if (sock == NULL) { - sock = mnl_socket_open(NETLINK_ROUTE); - if (sock == NULL) { - ERROR( - "connectivity plugin: connectivity_thread: mnl_socket_open failed."); - pthread_mutex_unlock(&connectivity_lock); - return (-1); - } +static int start_netlink_thread(void) /* {{{ */ +{ + pthread_mutex_lock(&connectivity_threads_lock); - if (mnl_socket_bind(sock, RTMGRP_LINK, MNL_SOCKET_AUTOPID) < 0) { - ERROR( - "connectivity plugin: connectivity_thread: mnl_socket_bind failed."); - pthread_mutex_unlock(&connectivity_lock); - return (1); + if (connectivity_netlink_thread_loop != 0) { + pthread_mutex_unlock(&connectivity_threads_lock); + return 0; + } + + connectivity_netlink_thread_loop = 1; + connectivity_netlink_thread_error = 0; + + int status; + + if (nl_sock == -1) { + status = nl_connect(); + + if (status != 0) { + pthread_mutex_unlock(&connectivity_threads_lock); + return status; } } - status = plugin_thread_create(&connectivity_thread_id, /* attr = */ NULL, - connectivity_thread, + status = plugin_thread_create(&connectivity_netlink_thread_id, + /* attr = */ NULL, connectivity_netlink_thread, /* arg = */ (void *)0, "connectivity"); if (status != 0) { - connectivity_thread_loop = 0; + connectivity_netlink_thread_loop = 0; ERROR("connectivity plugin: Starting thread failed."); - pthread_mutex_unlock(&connectivity_lock); - mnl_socket_close(sock); - return (-1); + pthread_mutex_unlock(&connectivity_threads_lock); + + int status2 = close(nl_sock); + + if (status2 != 0) { + ERROR("connectivity plugin: failed to close socket %d: %d (%s)", nl_sock, + status2, STRERRNO); + } else + nl_sock = -1; + + return -1; } - pthread_mutex_unlock(&connectivity_lock); - return (0); -} /* }}} int start_thread */ + pthread_mutex_unlock(&connectivity_threads_lock); -static int stop_thread(int shutdown) /* {{{ */ + return status; +} + +static int start_dequeue_thread(void) /* {{{ */ { - int status; + pthread_mutex_lock(&connectivity_threads_lock); - if (sock != NULL) - mnl_socket_close(sock); + if (connectivity_dequeue_thread_loop != 0) { + pthread_mutex_unlock(&connectivity_threads_lock); + return 0; + } - pthread_mutex_lock(&connectivity_lock); + connectivity_dequeue_thread_loop = 1; - if (connectivity_thread_loop == 0) { - pthread_mutex_unlock(&connectivity_lock); - return (-1); + int status = + plugin_thread_create(&connectivity_dequeue_thread_id, + /* attr = */ NULL, connectivity_dequeue_thread, + /* arg = */ (void *)0, "connectivity"); + if (status != 0) { + connectivity_dequeue_thread_loop = 0; + ERROR("connectivity plugin: Starting dequeue thread failed."); + pthread_mutex_unlock(&connectivity_threads_lock); + return -1; } - connectivity_thread_loop = 0; + pthread_mutex_unlock(&connectivity_threads_lock); + + return status; +} /* }}} int start_dequeue_thread */ + +static int start_threads(void) /* {{{ */ +{ + int status = start_netlink_thread(); + int status2 = start_dequeue_thread(); + + if (status != 0) + return status; + else + return status2; +} /* }}} int start_threads */ + +static int stop_netlink_thread(int shutdown) /* {{{ */ +{ + int socket_status; + + if (nl_sock != -1) { + socket_status = close(nl_sock); + if (socket_status != 0) { + ERROR("connectivity plugin: failed to close socket %d: %d (%s)", nl_sock, + socket_status, STRERRNO); + } else + nl_sock = -1; + } else + socket_status = 0; + + pthread_mutex_lock(&connectivity_threads_lock); + + if (connectivity_netlink_thread_loop == 0) { + pthread_mutex_unlock(&connectivity_threads_lock); + // Thread has already been terminated, nothing more to attempt + return socket_status; + } + + // Set thread termination status + connectivity_netlink_thread_loop = 0; + pthread_mutex_unlock(&connectivity_threads_lock); + + // Let threads waiting on access to the interface list know to move + // on such that they'll see the thread's termination status pthread_cond_broadcast(&connectivity_cond); - pthread_mutex_unlock(&connectivity_lock); + + int thread_status; if (shutdown == 1) { // Since the thread is blocking, calling pthread_join // doesn't actually succeed in stopping it. It will stick around // until a NETLINK message is received on the socket (at which - // it will realize that "connectivity_thread_loop" is 0 and will + // it will realize that "connectivity_netlink_thread_loop" is 0 and will // break out of the read loop and be allowed to die). This is // fine when the process isn't supposed to be exiting, but in // the case of a process shutdown, we don't want to have an @@ -630,107 +833,164 @@ static int stop_thread(int shutdown) /* {{{ */ // the case of a shutdown is just assures that the thread is // gone and that the process has been fully terminated. - DEBUG("connectivity plugin: Canceling thread for process shutdown"); + DEBUG("connectivity plugin: Canceling netlink thread for process shutdown"); - status = pthread_cancel(connectivity_thread_id); + thread_status = pthread_cancel(connectivity_netlink_thread_id); - if (status != 0) { - ERROR("connectivity plugin: Unable to cancel thread: %d", status); + if (thread_status != 0 && thread_status != ESRCH) { + ERROR("connectivity plugin: Unable to cancel netlink thread: %d", + thread_status); + thread_status = -1; + } else + thread_status = 0; + } else { + thread_status = + pthread_join(connectivity_netlink_thread_id, /* return = */ NULL); + if (thread_status != 0 && thread_status != ESRCH) { + ERROR("connectivity plugin: Stopping netlink thread failed: %d", + thread_status); + thread_status = -1; + } else + thread_status = 0; + } + + pthread_mutex_lock(&connectivity_threads_lock); + memset(&connectivity_netlink_thread_id, 0, + sizeof(connectivity_netlink_thread_id)); + connectivity_netlink_thread_error = 0; + pthread_mutex_unlock(&connectivity_threads_lock); + + DEBUG("connectivity plugin: Finished requesting stop of netlink thread"); + + if (socket_status != 0) + return socket_status; + else + return thread_status; +} + +static int stop_dequeue_thread(int shutdown) /* {{{ */ +{ + pthread_mutex_lock(&connectivity_threads_lock); + + if (connectivity_dequeue_thread_loop == 0) { + pthread_mutex_unlock(&connectivity_threads_lock); + return -1; + } + + // Set thread termination status + connectivity_dequeue_thread_loop = 0; + pthread_mutex_unlock(&connectivity_threads_lock); + + // Let threads waiting on access to the interface list know to move + // on such that they'll see the threads termination status + pthread_cond_broadcast(&connectivity_cond); + + int status; + + if (shutdown == 1) { + // Calling pthread_cancel here in + // the case of a shutdown just assures that the thread is + // gone and that the process has been fully terminated. + + DEBUG("connectivity plugin: Canceling dequeue thread for process shutdown"); + + status = pthread_cancel(connectivity_dequeue_thread_id); + + if (status != 0 && status != ESRCH) { + ERROR("connectivity plugin: Unable to cancel dequeue thread: %d", status); status = -1; - } + } else + status = 0; } else { - status = pthread_join(connectivity_thread_id, /* return = */ NULL); - if (status != 0) { - ERROR("connectivity plugin: Stopping thread failed."); + status = pthread_join(connectivity_dequeue_thread_id, /* return = */ NULL); + if (status != 0 && status != ESRCH) { + ERROR("connectivity plugin: Stopping dequeue thread failed."); status = -1; - } + } else + status = 0; } - pthread_mutex_lock(&connectivity_lock); - memset(&connectivity_thread_id, 0, sizeof(connectivity_thread_id)); - connectivity_thread_error = 0; - pthread_mutex_unlock(&connectivity_lock); + pthread_mutex_lock(&connectivity_threads_lock); + memset(&connectivity_dequeue_thread_id, 0, + sizeof(connectivity_dequeue_thread_id)); + pthread_mutex_unlock(&connectivity_threads_lock); - DEBUG("connectivity plugin: Finished requesting stop of thread"); + DEBUG("connectivity plugin: Finished requesting stop of dequeue thread"); - return (status); -} /* }}} int stop_thread */ + return status; +} /* }}} int stop_dequeue_thread */ + +static int stop_threads(int shutdown) /* {{{ */ +{ + int status = stop_netlink_thread(shutdown); + int status2 = stop_dequeue_thread(shutdown); + + if (status != 0) + return status; + else + return status2; +} /* }}} int stop_threads */ static int connectivity_init(void) /* {{{ */ { - if (interfacelist_head == NULL) { - NOTICE("connectivity plugin: No interfaces have been configured."); - return (-1); + if (monitor_all_interfaces) { + NOTICE("connectivity plugin: No interfaces have been selected, so all will " + "be monitored"); } - return (start_thread()); + return start_threads(); } /* }}} int connectivity_init */ static int connectivity_config(const char *key, const char *value) /* {{{ */ { - if (strcasecmp(key, "Interface") == 0) { - interfacelist_t *il; - char *interface; - - il = malloc(sizeof(*il)); - if (il == NULL) { - char errbuf[1024]; - ERROR("connectivity plugin: malloc failed during connectivity_config: %s", - sstrerror(errno, errbuf, sizeof(errbuf))); - return (1); - } - - interface = strdup(value); - if (interface == NULL) { - char errbuf[1024]; - sfree(il); - ERROR("connectivity plugin: strdup failed connectivity_config: %s", - sstrerror(errno, errbuf, sizeof(errbuf))); - return (1); - } - - il->interface = interface; - il->status = 2; // "unknown" - il->prev_status = 2; - il->timestamp = (long long unsigned int)CDTIME_T_TO_US(cdtime()); - il->sent = 0; - il->next = interfacelist_head; - interfacelist_head = il; + if (ignorelist == NULL) { + ignorelist = ignorelist_create(/* invert = */ 1); + } + if (strcasecmp(key, "Interface") == 0) { + ignorelist_add(ignorelist, value); + monitor_all_interfaces = 0; + } else if (strcasecmp(key, "IgnoreSelected") == 0) { + int invert = 1; + if (IS_TRUE(value)) + invert = 0; + ignorelist_set_invert(ignorelist, invert); } else { - return (-1); + return -1; } - return (0); + return 0; } /* }}} int connectivity_config */ -static void connectivity_dispatch_notification( - const char *interface, const char *type, /* {{{ */ - gauge_t value, gauge_t old_value, long long unsigned int timestamp) { - char *buf = NULL; - notification_t n = { - NOTIF_FAILURE, cdtime(), "", "", "connectivity", "", "", "", NULL}; - - if (value == 1) - n.severity = NOTIF_OKAY; - - char hostname[1024]; - gethostname(hostname, sizeof(hostname)); - - sstrncpy(n.host, hostname, sizeof(n.host)); +static void +connectivity_dispatch_notification(const char *interface, const char *type, + gauge_t value, gauge_t old_value, + long long unsigned int timestamp) { + + notification_t n = {(value == LINK_STATE_UP ? NOTIF_OKAY : NOTIF_FAILURE), + cdtime(), + "", + "", + "connectivity", + "", + "", + "", + NULL}; + + sstrncpy(n.host, hostname_g, sizeof(n.host)); sstrncpy(n.plugin_instance, interface, sizeof(n.plugin_instance)); sstrncpy(n.type, "gauge", sizeof(n.type)); sstrncpy(n.type_instance, "interface_status", sizeof(n.type_instance)); + char *buf = NULL; + gen_message_payload(value, old_value, interface, timestamp, &buf); notification_meta_t *m = calloc(1, sizeof(*m)); if (m == NULL) { - char errbuf[1024]; sfree(buf); - ERROR("connectivity plugin: unable to allocate metadata: %s", - sstrerror(errno, errbuf, sizeof(errbuf))); + ERROR("connectivity plugin: unable to allocate metadata: %s", STRERRNO); return; } @@ -748,66 +1008,50 @@ static void connectivity_dispatch_notification( plugin_dispatch_notification(&n); plugin_notification_meta_free(n.meta); - // malloc'd in gen_message_payload + // strdup'd in gen_message_payload if (buf != NULL) sfree(buf); } static int connectivity_read(void) /* {{{ */ { - if (connectivity_thread_error != 0) { - ERROR("connectivity plugin: The interface thread had a problem. Restarting " + pthread_mutex_lock(&connectivity_threads_lock); + + if (connectivity_netlink_thread_error != 0) { + + pthread_mutex_unlock(&connectivity_threads_lock); + + ERROR("connectivity plugin: The netlink thread had a problem. Restarting " "it."); - stop_thread(0); + stop_netlink_thread(0); - for (interfacelist_t *il = interfacelist_head; il != NULL; il = il->next) { - il->status = 2; // signifies "unknown" - il->prev_status = 2; + for (interface_list_t *il = interface_list_head; il != NULL; + il = il->next) { + il->status = LINK_STATE_UNKNOWN; + il->prev_status = LINK_STATE_UNKNOWN; il->sent = 0; } - start_thread(); + start_netlink_thread(); - return (-1); - } /* if (connectivity_thread_error != 0) */ + return -1; + } /* if (connectivity_netlink_thread_error != 0) */ - for (interfacelist_t *il = interfacelist_head; il != NULL; - il = il->next) /* {{{ */ - { - uint32_t status; - uint32_t prev_status; - uint32_t sent; - - pthread_mutex_lock(&connectivity_lock); + pthread_mutex_unlock(&connectivity_threads_lock); - status = il->status; - prev_status = il->prev_status; - sent = il->sent; - - if (status != prev_status && sent == 0) { - connectivity_dispatch_notification(il->interface, "gauge", status, - prev_status, il->timestamp); - il->sent = 1; - } - - pthread_mutex_unlock(&connectivity_lock); - } /* }}} for (il = interfacelist_head; il != NULL; il = il->next) */ - - return (0); + return 0; } /* }}} int connectivity_read */ static int connectivity_shutdown(void) /* {{{ */ { - interfacelist_t *il; - DEBUG("connectivity plugin: Shutting down thread."); - if (stop_thread(1) < 0) - return (-1); - il = interfacelist_head; + int status = stop_threads(1); + + interface_list_t *il = interface_list_head; while (il != NULL) { - interfacelist_t *il_next; + interface_list_t *il_next; il_next = il->next; @@ -817,7 +1061,9 @@ static int connectivity_shutdown(void) /* {{{ */ il = il_next; } - return (0); + ignorelist_free(ignorelist); + + return status; } /* }}} int connectivity_shutdown */ void module_register(void) {