X-Git-Url: https://git.octo.it/?p=collectd.git;a=blobdiff_plain;f=src%2Fconnectivity.c;h=45b65aab5b09c1c5ba35110bbff618f6144de153;hp=f6356f03987275ffb888589b112d5ee19c18cfbc;hb=48efd3deb4c9139fd060ff3d289896e9031bcc7c;hpb=7023393d66c024facbcc5f6603e52bc86a6a930e diff --git a/src/connectivity.c b/src/connectivity.c index f6356f03..45b65aab 100644 --- a/src/connectivity.c +++ b/src/connectivity.c @@ -27,10 +27,10 @@ #include "collectd.h" -#include "common.h" #include "plugin.h" +#include "utils/common/common.h" +#include "utils/ignorelist/ignorelist.h" #include "utils_complain.h" -#include "utils_ignorelist.h" #include #include @@ -101,7 +101,7 @@ struct interface_list_s { uint32_t status; uint32_t prev_status; uint32_t sent; - long long unsigned int timestamp; + cdtime_t timestamp; struct interface_list_s *next; }; @@ -120,40 +120,29 @@ static int connectivity_netlink_thread_loop = 0; static int connectivity_netlink_thread_error = 0; static pthread_t connectivity_netlink_thread_id; static int connectivity_dequeue_thread_loop = 0; -static int connectivity_dequeue_thread_error = 0; static pthread_t connectivity_dequeue_thread_id; -static pthread_mutex_t connectivity_lock = PTHREAD_MUTEX_INITIALIZER; +static pthread_mutex_t connectivity_threads_lock = PTHREAD_MUTEX_INITIALIZER; +static pthread_mutex_t connectivity_data_lock = PTHREAD_MUTEX_INITIALIZER; static pthread_cond_t connectivity_cond = PTHREAD_COND_INITIALIZER; -// static struct mnl_socket *sock; static int nl_sock = -1; static int event_id = 0; +static int statuses_to_send = 0; static const char *config_keys[] = {"Interface", "IgnoreSelected"}; static int config_keys_num = STATIC_ARRAY_SIZE(config_keys); /* - * Prototype - */ - -static void -connectivity_dispatch_notification(const char *interface, const char *type, - gauge_t value, gauge_t old_value, - long long unsigned int timestamp); - -/* * Private functions */ static int gen_message_payload(int state, int old_state, const char *interface, - long long unsigned int timestamp, char **buf) { + cdtime_t timestamp, char **buf) { const unsigned char *buf2; yajl_gen g; char json_str[DATA_MAX_NAME_LEN]; #if !defined(HAVE_YAJL_V2) - yajl_gen_config conf = {}; - - conf.beautify = 0; + yajl_gen_config conf = {0}; #endif #if HAVE_YAJL_V2 @@ -188,9 +177,9 @@ static int gen_message_payload(int state, int old_state, const char *interface, goto err; event_id = event_id + 1; - int event_id_len = sizeof(char) * sizeof(int) * 4 + 1; - memset(json_str, '\0', DATA_MAX_NAME_LEN); - snprintf(json_str, event_id_len, "%d", event_id); + if (snprintf(json_str, sizeof(json_str), "%d", event_id) < 0) { + goto err; + } if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) { goto err; @@ -202,15 +191,11 @@ static int gen_message_payload(int state, int old_state, const char *interface, yajl_gen_status_ok) goto err; - int event_name_len = 0; - event_name_len = event_name_len + strlen(interface); // interface name - event_name_len = event_name_len + (state == 0 ? 4 : 2); // "down" or "up" - event_name_len = - event_name_len + 12; // "interface", 2 spaces and null-terminator - memset(json_str, '\0', DATA_MAX_NAME_LEN); - snprintf(json_str, event_name_len, "interface %s %s", interface, - (state == 0 ? CONNECTIVITY_EVENT_NAME_DOWN_VALUE - : CONNECTIVITY_EVENT_NAME_UP_VALUE)); + if (snprintf(json_str, sizeof(json_str), "interface %s %s", interface, + (state == 0 ? CONNECTIVITY_EVENT_NAME_DOWN_VALUE + : CONNECTIVITY_EVENT_NAME_UP_VALUE)) < 0) { + goto err; + } if (yajl_gen_string(g, (u_char *)json_str, strlen(json_str)) != yajl_gen_status_ok) { @@ -223,11 +208,10 @@ static int gen_message_payload(int state, int old_state, const char *interface, yajl_gen_status_ok) goto err; - int last_epoch_microsec_len = - sizeof(char) * sizeof(long long unsigned int) * 4 + 1; - memset(json_str, '\0', DATA_MAX_NAME_LEN); - snprintf(json_str, last_epoch_microsec_len, "%llu", - (long long unsigned int)CDTIME_T_TO_US(cdtime())); + if (snprintf(json_str, sizeof(json_str), "%" PRIu64, + CDTIME_T_TO_US(cdtime())) < 0) { + goto err; + } if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) { goto err; @@ -282,11 +266,10 @@ static int gen_message_payload(int state, int old_state, const char *interface, yajl_gen_status_ok) goto err; - int start_epoch_microsec_len = - sizeof(char) * sizeof(long long unsigned int) * 4 + 1; - memset(json_str, '\0', DATA_MAX_NAME_LEN); - snprintf(json_str, start_epoch_microsec_len, "%llu", - (long long unsigned int)timestamp); + if (snprintf(json_str, sizeof(json_str), "%" PRIu64, + CDTIME_T_TO_US(timestamp)) < 0) { + goto err; + } if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) { goto err; @@ -323,10 +306,11 @@ static int gen_message_payload(int state, int old_state, const char *interface, (state == 0 ? strlen(CONNECTIVITY_NEW_STATE_FIELD_DOWN_VALUE) : strlen(CONNECTIVITY_NEW_STATE_FIELD_UP_VALUE)); - if (yajl_gen_string( - g, (u_char *)(state == 0 ? CONNECTIVITY_NEW_STATE_FIELD_DOWN_VALUE - : CONNECTIVITY_NEW_STATE_FIELD_UP_VALUE), - new_state_len) != yajl_gen_status_ok) + if (yajl_gen_string(g, + (u_char *)(state == 0 + ? CONNECTIVITY_NEW_STATE_FIELD_DOWN_VALUE + : CONNECTIVITY_NEW_STATE_FIELD_UP_VALUE), + new_state_len) != yajl_gen_status_ok) goto err; // oldState @@ -339,10 +323,11 @@ static int gen_message_payload(int state, int old_state, const char *interface, (old_state == 0 ? strlen(CONNECTIVITY_OLD_STATE_FIELD_DOWN_VALUE) : strlen(CONNECTIVITY_OLD_STATE_FIELD_UP_VALUE)); - if (yajl_gen_string( - g, (u_char *)(old_state == 0 ? CONNECTIVITY_OLD_STATE_FIELD_DOWN_VALUE - : CONNECTIVITY_OLD_STATE_FIELD_UP_VALUE), - old_state_len) != yajl_gen_status_ok) + if (yajl_gen_string(g, + (u_char *)(old_state == 0 + ? CONNECTIVITY_OLD_STATE_FIELD_DOWN_VALUE + : CONNECTIVITY_OLD_STATE_FIELD_UP_VALUE), + old_state_len) != yajl_gen_status_ok) goto err; // stateChangeFieldsVersion @@ -367,28 +352,24 @@ static int gen_message_payload(int state, int old_state, const char *interface, yajl_gen_status_ok) goto err; - if (yajl_gen_map_close(g) != yajl_gen_status_ok) + // close state change and header fields + if (yajl_gen_map_close(g) != yajl_gen_status_ok || + yajl_gen_map_close(g) != yajl_gen_status_ok) goto err; // *** END state change fields *** - if (yajl_gen_map_close(g) != yajl_gen_status_ok) - goto err; - if (yajl_gen_get_buf(g, &buf2, &len) != yajl_gen_status_ok) goto err; - *buf = malloc(strlen((char *)buf2) + 1); + *buf = strdup((char *)buf2); if (*buf == NULL) { - char errbuf[1024]; - ERROR("connectivity plugin: malloc failed during gen_message_payload: %s", - sstrerror(errno, errbuf, sizeof(errbuf))); + ERROR("connectivity plugin: strdup failed during gen_message_payload: %s", + STRERRNO); goto err; } - sstrncpy(*buf, (char *)buf2, strlen((char *)buf2) + 1); - yajl_gen_free(g); return 0; @@ -401,30 +382,26 @@ err: static interface_list_t *add_interface(const char *interface, int status, int prev_status) { - interface_list_t *il; - char *interface2; + interface_list_t *il = calloc(1, sizeof(*il)); - il = malloc(sizeof(*il)); if (il == NULL) { - char errbuf[1024]; - ERROR("connectivity plugin: malloc failed during add_interface: %s", - sstrerror(errno, errbuf, sizeof(errbuf))); + ERROR("connectivity plugin: calloc failed during add_interface: %s", + STRERRNO); return NULL; } - interface2 = strdup(interface); + char *interface2 = strdup(interface); if (interface2 == NULL) { - char errbuf[1024]; sfree(il); ERROR("connectivity plugin: strdup failed during add_interface: %s", - sstrerror(errno, errbuf, sizeof(errbuf))); + STRERRNO); return NULL; } il->interface = interface2; il->status = status; il->prev_status = prev_status; - il->timestamp = (long long unsigned int)CDTIME_T_TO_US(cdtime()); + il->timestamp = cdtime(); il->sent = 0; il->next = interface_list_head; interface_list_head = il; @@ -435,14 +412,10 @@ static interface_list_t *add_interface(const char *interface, int status, } static int connectivity_link_state(struct nlmsghdr *msg) { - int retval = 0; - struct ifinfomsg *ifi = mnl_nlmsg_get_payload(msg); - struct nlattr *attr; - const char *dev = NULL; + pthread_mutex_lock(&connectivity_data_lock); - pthread_mutex_lock(&connectivity_lock); - - interface_list_t *il = NULL; + struct nlattr *attr; + struct ifinfomsg *ifi = mnl_nlmsg_get_payload(msg); /* Scan attribute list for device name. */ mnl_attr_for_each(attr, msg, sizeof(*ifi)) { @@ -453,11 +426,11 @@ static int connectivity_link_state(struct nlmsghdr *msg) { ERROR("connectivity plugin: connectivity_link_state: IFLA_IFNAME " "mnl_attr_validate " "failed."); - pthread_mutex_unlock(&connectivity_lock); + pthread_mutex_unlock(&connectivity_data_lock); return MNL_CB_ERROR; } - dev = mnl_attr_get_str(attr); + const char *dev = mnl_attr_get_str(attr); // Check the list of interfaces we should monitor, if we've chosen // a subset. If we don't care about this one, abort. @@ -468,12 +441,12 @@ static int connectivity_link_state(struct nlmsghdr *msg) { break; } + interface_list_t *il = NULL; + for (il = interface_list_head; il != NULL; il = il->next) if (strcmp(dev, il->interface) == 0) break; - uint32_t prev_status; - if (il == NULL) { // We haven't encountered this interface yet, so add it to the linked list il = add_interface(dev, LINK_STATE_UNKNOWN, LINK_STATE_UNKNOWN); @@ -486,20 +459,23 @@ static int connectivity_link_state(struct nlmsghdr *msg) { } } - prev_status = il->status; + uint32_t prev_status = il->status; il->status = ((ifi->ifi_flags & IFF_RUNNING) ? LINK_STATE_UP : LINK_STATE_DOWN); - il->timestamp = (long long unsigned int)CDTIME_T_TO_US(cdtime()); + il->timestamp = cdtime(); // If the new status is different than the previous status, - // store the previous status and set sent to zero + // store the previous status and set sent to zero, and set the + // global flag to indicate there are statuses to dispatch if (il->status != prev_status) { il->prev_status = prev_status; il->sent = 0; + statuses_to_send = 1; } DEBUG("connectivity plugin (%llu): Interface %s status is now %s", - il->timestamp, dev, ((ifi->ifi_flags & IFF_RUNNING) ? "UP" : "DOWN")); + (unsigned long long)il->timestamp, dev, + ((ifi->ifi_flags & IFF_RUNNING) ? "UP" : "DOWN")); // no need to loop again, we found the interface name attr // (otherwise the first if-statement in the loop would @@ -507,57 +483,38 @@ static int connectivity_link_state(struct nlmsghdr *msg) { break; } - pthread_mutex_unlock(&connectivity_lock); + pthread_mutex_unlock(&connectivity_data_lock); - return retval; + return 0; } static int msg_handler(struct nlmsghdr *msg) { - switch (msg->nlmsg_type) { - case RTM_NEWADDR: - case RTM_DELADDR: - case RTM_NEWROUTE: - case RTM_DELROUTE: - case RTM_DELLINK: - // Not of interest in current version - break; - case RTM_NEWLINK: - connectivity_link_state(msg); - break; - default: - ERROR("connectivity plugin: msg_handler: Unknown netlink nlmsg_type %d\n", - msg->nlmsg_type); - break; + // We are only interested in RTM_NEWLINK messages + if (msg->nlmsg_type != RTM_NEWLINK) { + return 0; } - return 0; + return connectivity_link_state(msg); } -// static int read_event(struct mnl_socket *nl, -// int (*msg_handler)(struct nlmsghdr *)) { -static int read_event(int nl, int (*msg_handler)(struct nlmsghdr *)) { - int status; +static int read_event(int (*msg_handler)(struct nlmsghdr *)) { int ret = 0; - char buf[4096]; - struct nlmsghdr *h; int recv_flags = MSG_DONTWAIT; - // if (nl == NULL) - // return ret; - - if (nl == -1) - return ret; + if (nl_sock == -1 || msg_handler == NULL) + return EINVAL; while (42) { - pthread_mutex_lock(&connectivity_lock); + pthread_mutex_lock(&connectivity_threads_lock); if (connectivity_netlink_thread_loop <= 0) { - pthread_mutex_unlock(&connectivity_lock); + pthread_mutex_unlock(&connectivity_threads_lock); return ret; } - pthread_mutex_unlock(&connectivity_lock); + pthread_mutex_unlock(&connectivity_threads_lock); - status = recv(nl, buf, sizeof(buf), recv_flags); + char buf[4096]; + int status = recv(nl_sock, buf, sizeof(buf), recv_flags); if (status < 0) { @@ -566,19 +523,19 @@ static int read_event(int nl, int (*msg_handler)(struct nlmsghdr *)) { // any saved interface status changes. Then continue, but // block and wait for new messages if (errno == EWOULDBLOCK || errno == EAGAIN) { - pthread_mutex_lock(&connectivity_lock); pthread_cond_signal(&connectivity_cond); - pthread_mutex_unlock(&connectivity_lock); recv_flags = 0; continue; } + if (errno == EINTR) { + // Interrupt, so just continue and try again + continue; + } + /* Anything else is an error */ - // ERROR("connectivity plugin: read_event: Error mnl_socket_recvfrom: - // %d\n", - // status); - ERROR("connectivity plugin: read_event: Error recv: %d\n", status); + ERROR("connectivity plugin: read_event: Error recv: %d", status); return status; } @@ -588,19 +545,21 @@ static int read_event(int nl, int (*msg_handler)(struct nlmsghdr *)) { recv_flags = MSG_DONTWAIT; if (status == 0) { - DEBUG("connectivity plugin: read_event: EOF\n"); + DEBUG("connectivity plugin: read_event: EOF"); } /* We need to handle more than one message per 'recvmsg' */ - for (h = (struct nlmsghdr *)buf; NLMSG_OK(h, (unsigned int)status); - h = NLMSG_NEXT(h, status)) { + for (struct nlmsghdr *h = (struct nlmsghdr *)buf; + NLMSG_OK(h, (unsigned int)status); h = NLMSG_NEXT(h, status)) { /* Finish reading */ if (h->nlmsg_type == NLMSG_DONE) return ret; /* Message is some kind of error */ if (h->nlmsg_type == NLMSG_ERROR) { - ERROR("connectivity plugin: read_event: Message is an error\n"); + struct nlmsgerr *l_err = (struct nlmsgerr *)NLMSG_DATA(h); + ERROR("connectivity plugin: read_event: Message is an error: %d", + l_err->error); return -1; // Error } @@ -608,12 +567,12 @@ static int read_event(int nl, int (*msg_handler)(struct nlmsghdr *)) { if (msg_handler) { ret = (*msg_handler)(h); if (ret < 0) { - ERROR("connectivity plugin: read_event: Message handler error %d\n", + ERROR("connectivity plugin: read_event: Message handler error %d", ret); return ret; } } else { - ERROR("connectivity plugin: read_event: Error NULL message handler\n"); + ERROR("connectivity plugin: read_event: Error NULL message handler"); return -1; } } @@ -622,121 +581,138 @@ static int read_event(int nl, int (*msg_handler)(struct nlmsghdr *)) { return ret; } +static void connectivity_dispatch_notification(const char *interface, + gauge_t value, gauge_t old_value, + cdtime_t timestamp) { + + notification_t n = { + .severity = (value == LINK_STATE_UP ? NOTIF_OKAY : NOTIF_FAILURE), + .time = cdtime(), + .plugin = "connectivity", + .type = "gauge", + .type_instance = "interface_status", + }; + + sstrncpy(n.host, hostname_g, sizeof(n.host)); + sstrncpy(n.plugin_instance, interface, sizeof(n.plugin_instance)); + + char *buf = NULL; + + gen_message_payload(value, old_value, interface, timestamp, &buf); + + int status = plugin_notification_meta_add_string(&n, "ves", buf); + + if (status < 0) { + sfree(buf); + ERROR("connectivity plugin: unable to set notification VES metadata: %s", + STRERRNO); + return; + } + + DEBUG("connectivity plugin: notification VES metadata: %s", + n.meta->nm_value.nm_string); + + DEBUG("connectivity plugin: dispatching state %d for interface %s", + (int)value, interface); + + plugin_dispatch_notification(&n); + plugin_notification_meta_free(n.meta); + + // strdup'd in gen_message_payload + if (buf != NULL) + sfree(buf); +} + +// NOTE: Caller MUST hold connectivity_data_lock when calling this function static void send_interface_status() { for (interface_list_t *il = interface_list_head; il != NULL; il = il->next) /* {{{ */ { - uint32_t status; - uint32_t prev_status; - uint32_t sent; - - status = il->status; - prev_status = il->prev_status; - sent = il->sent; + uint32_t status = il->status; + uint32_t prev_status = il->prev_status; + uint32_t sent = il->sent; if (status != prev_status && sent == 0) { - connectivity_dispatch_notification(il->interface, "gauge", status, - prev_status, il->timestamp); + connectivity_dispatch_notification(il->interface, status, prev_status, + il->timestamp); il->sent = 1; } } /* }}} for (il = interface_list_head; il != NULL; il = il->next) */ + + statuses_to_send = 0; } -static int read_interface_status() /* {{{ */ +static void read_interface_status() /* {{{ */ { - pthread_mutex_lock(&connectivity_lock); - - // This first attempt is necessary because the netlink thread - // might have held the lock while this thread was blocked on - // the lock acquisition just above. And while the netlink thread - // had the lock, it could have called pthread_cond_singal, which - // obviously wouldn't have woken this thread, since this thread - // was not yet waiting on the condition signal. So we need to - // loop through the interfaces and check if any have changed - // status before we wait on the condition signal - send_interface_status(); + pthread_mutex_lock(&connectivity_data_lock); - pthread_cond_wait(&connectivity_cond, &connectivity_lock); + // If we don't have any interface statuses to dispatch, + // then we wait until signalled + if (!statuses_to_send) + pthread_cond_wait(&connectivity_cond, &connectivity_data_lock); send_interface_status(); - pthread_mutex_unlock(&connectivity_lock); - - return 0; + pthread_mutex_unlock(&connectivity_data_lock); } /* }}} int *read_interface_status */ static void *connectivity_netlink_thread(void *arg) /* {{{ */ { - pthread_mutex_lock(&connectivity_lock); + pthread_mutex_lock(&connectivity_threads_lock); while (connectivity_netlink_thread_loop > 0) { - int status; - - pthread_mutex_unlock(&connectivity_lock); + pthread_mutex_unlock(&connectivity_threads_lock); - status = read_event(nl_sock, msg_handler); + int status = read_event(msg_handler); - pthread_mutex_lock(&connectivity_lock); + pthread_mutex_lock(&connectivity_threads_lock); if (status < 0) { connectivity_netlink_thread_error = 1; break; } - - if (connectivity_netlink_thread_loop <= 0) - break; } /* while (connectivity_netlink_thread_loop > 0) */ - pthread_mutex_unlock(&connectivity_lock); + pthread_mutex_unlock(&connectivity_threads_lock); - return ((void *)0); + return (void *)0; } /* }}} void *connectivity_netlink_thread */ static void *connectivity_dequeue_thread(void *arg) /* {{{ */ { - pthread_mutex_lock(&connectivity_lock); + pthread_mutex_lock(&connectivity_threads_lock); while (connectivity_dequeue_thread_loop > 0) { - int status; - - pthread_mutex_unlock(&connectivity_lock); - - status = read_interface_status(); + pthread_mutex_unlock(&connectivity_threads_lock); - pthread_mutex_lock(&connectivity_lock); + read_interface_status(); - if (status < 0) { - connectivity_dequeue_thread_error = 1; - break; - } - - if (connectivity_dequeue_thread_loop <= 0) - break; + pthread_mutex_lock(&connectivity_threads_lock); } /* while (connectivity_dequeue_thread_loop > 0) */ - pthread_mutex_unlock(&connectivity_lock); + pthread_mutex_unlock(&connectivity_threads_lock); return ((void *)0); } /* }}} void *connectivity_dequeue_thread */ static int nl_connect() { - int rc; - struct sockaddr_nl sa_nl; + struct sockaddr_nl sa_nl = { + .nl_family = AF_NETLINK, + .nl_groups = RTMGRP_LINK, + .nl_pid = getpid(), + }; nl_sock = socket(AF_NETLINK, SOCK_DGRAM, NETLINK_ROUTE); if (nl_sock == -1) { - ERROR("connectivity plugin: socket open failed: %d", errno); + ERROR("connectivity plugin: socket open failed: %s", STRERRNO); return -1; } - sa_nl.nl_family = AF_NETLINK; - sa_nl.nl_groups = RTMGRP_LINK; - sa_nl.nl_pid = getpid(); - - rc = bind(nl_sock, (struct sockaddr *)&sa_nl, sizeof(sa_nl)); + int rc = bind(nl_sock, (struct sockaddr *)&sa_nl, sizeof(sa_nl)); if (rc == -1) { - ERROR("connectivity plugin: socket bind failed: %d", errno); + ERROR("connectivity plugin: socket bind failed: %s", STRERRNO); close(nl_sock); + nl_sock = -1; return -1; } @@ -745,23 +721,25 @@ static int nl_connect() { static int start_netlink_thread(void) /* {{{ */ { - int status; - - pthread_mutex_lock(&connectivity_lock); + pthread_mutex_lock(&connectivity_threads_lock); if (connectivity_netlink_thread_loop != 0) { - pthread_mutex_unlock(&connectivity_lock); - return (0); + pthread_mutex_unlock(&connectivity_threads_lock); + return 0; } connectivity_netlink_thread_loop = 1; connectivity_netlink_thread_error = 0; + int status; + if (nl_sock == -1) { status = nl_connect(); - if (status != 0) + if (status != 0) { + pthread_mutex_unlock(&connectivity_threads_lock); return status; + } } status = plugin_thread_create(&connectivity_netlink_thread_id, @@ -770,61 +748,58 @@ static int start_netlink_thread(void) /* {{{ */ if (status != 0) { connectivity_netlink_thread_loop = 0; ERROR("connectivity plugin: Starting thread failed."); - pthread_mutex_unlock(&connectivity_lock); + pthread_mutex_unlock(&connectivity_threads_lock); int status2 = close(nl_sock); if (status2 != 0) { ERROR("connectivity plugin: failed to close socket %d: %d (%s)", nl_sock, - status2, strerror(errno)); - } else - nl_sock = -1; + status2, STRERRNO); + } - return (-1); + nl_sock = -1; + + return -1; } - pthread_mutex_unlock(&connectivity_lock); + pthread_mutex_unlock(&connectivity_threads_lock); return status; } static int start_dequeue_thread(void) /* {{{ */ { - int status; - - pthread_mutex_lock(&connectivity_lock); + pthread_mutex_lock(&connectivity_threads_lock); if (connectivity_dequeue_thread_loop != 0) { - pthread_mutex_unlock(&connectivity_lock); - return (0); + pthread_mutex_unlock(&connectivity_threads_lock); + return 0; } connectivity_dequeue_thread_loop = 1; - connectivity_dequeue_thread_error = 0; - status = plugin_thread_create(&connectivity_dequeue_thread_id, - /* attr = */ NULL, connectivity_dequeue_thread, - /* arg = */ (void *)0, "connectivity"); + int status = + plugin_thread_create(&connectivity_dequeue_thread_id, + /* attr = */ NULL, connectivity_dequeue_thread, + /* arg = */ (void *)0, "connectivity"); if (status != 0) { connectivity_dequeue_thread_loop = 0; ERROR("connectivity plugin: Starting dequeue thread failed."); - pthread_mutex_unlock(&connectivity_lock); - return (-1); + pthread_mutex_unlock(&connectivity_threads_lock); + return -1; } - pthread_mutex_unlock(&connectivity_lock); + pthread_mutex_unlock(&connectivity_threads_lock); return status; } /* }}} int start_dequeue_thread */ static int start_threads(void) /* {{{ */ { - int status, status2; - - status = start_netlink_thread(); - status2 = start_dequeue_thread(); + int status = start_netlink_thread(); + int status2 = start_dequeue_thread(); - if (status < 0) + if (status != 0) return status; else return status2; @@ -832,28 +807,36 @@ static int start_threads(void) /* {{{ */ static int stop_netlink_thread(int shutdown) /* {{{ */ { - int status; + int socket_status; if (nl_sock != -1) { - status = close(nl_sock); - if (status != 0) { + socket_status = close(nl_sock); + if (socket_status != 0) { ERROR("connectivity plugin: failed to close socket %d: %d (%s)", nl_sock, - status, strerror(errno)); - return (-1); - } else - nl_sock = -1; - } + socket_status, STRERRNO); + } - pthread_mutex_lock(&connectivity_lock); + nl_sock = -1; + } else + socket_status = 0; + + pthread_mutex_lock(&connectivity_threads_lock); if (connectivity_netlink_thread_loop == 0) { - pthread_mutex_unlock(&connectivity_lock); - return (-1); + pthread_mutex_unlock(&connectivity_threads_lock); + // Thread has already been terminated, nothing more to attempt + return socket_status; } + // Set thread termination status connectivity_netlink_thread_loop = 0; + pthread_mutex_unlock(&connectivity_threads_lock); + + // Let threads waiting on access to the interface list know to move + // on such that they'll see the thread's termination status pthread_cond_broadcast(&connectivity_cond); - pthread_mutex_unlock(&connectivity_lock); + + int thread_status; if (shutdown == 1) { // Since the thread is blocking, calling pthread_join @@ -869,90 +852,85 @@ static int stop_netlink_thread(int shutdown) /* {{{ */ DEBUG("connectivity plugin: Canceling netlink thread for process shutdown"); - status = pthread_cancel(connectivity_netlink_thread_id); + thread_status = pthread_cancel(connectivity_netlink_thread_id); - if (status != 0 && status != ESRCH) { - ERROR("connectivity plugin: Unable to cancel netlink thread: %d", status); - status = -1; + if (thread_status != 0 && thread_status != ESRCH) { + ERROR("connectivity plugin: Unable to cancel netlink thread: %d", + thread_status); + thread_status = -1; } else - status = 0; + thread_status = 0; } else { - status = pthread_join(connectivity_netlink_thread_id, /* return = */ NULL); - if (status != 0 && status != ESRCH) { - ERROR("connectivity plugin: Stopping netlink thread failed."); - status = -1; + thread_status = + pthread_join(connectivity_netlink_thread_id, /* return = */ NULL); + if (thread_status != 0 && thread_status != ESRCH) { + ERROR("connectivity plugin: Stopping netlink thread failed: %d", + thread_status); + thread_status = -1; } else - return 0; + thread_status = 0; } - pthread_mutex_lock(&connectivity_lock); + pthread_mutex_lock(&connectivity_threads_lock); memset(&connectivity_netlink_thread_id, 0, sizeof(connectivity_netlink_thread_id)); connectivity_netlink_thread_error = 0; - pthread_mutex_unlock(&connectivity_lock); + pthread_mutex_unlock(&connectivity_threads_lock); DEBUG("connectivity plugin: Finished requesting stop of netlink thread"); - return status; + if (socket_status != 0) + return socket_status; + else + return thread_status; } -static int stop_dequeue_thread(int shutdown) /* {{{ */ +static int stop_dequeue_thread() /* {{{ */ { - int status; - - pthread_mutex_lock(&connectivity_lock); + pthread_mutex_lock(&connectivity_threads_lock); if (connectivity_dequeue_thread_loop == 0) { - pthread_mutex_unlock(&connectivity_lock); - return (-1); + pthread_mutex_unlock(&connectivity_threads_lock); + return -1; } + // Set thread termination status connectivity_dequeue_thread_loop = 0; + pthread_mutex_unlock(&connectivity_threads_lock); + + // Let threads waiting on access to the interface list know to move + // on such that they'll see the threads termination status pthread_cond_broadcast(&connectivity_cond); - pthread_mutex_unlock(&connectivity_lock); - if (shutdown == 1) { - // Calling pthread_cancel here in - // the case of a shutdown just assures that the thread is - // gone and that the process has been fully terminated. + // Calling pthread_cancel here just assures that the thread is + // gone and that the process has been fully terminated. - DEBUG("connectivity plugin: Canceling dequeue thread for process shutdown"); + DEBUG("connectivity plugin: Canceling dequeue thread for process shutdown"); - status = pthread_cancel(connectivity_dequeue_thread_id); + int status = pthread_cancel(connectivity_dequeue_thread_id); - if (status != 0 && status != ESRCH) { - ERROR("connectivity plugin: Unable to cancel dequeue thread: %d", status); - status = -1; - } else - status = 0; - } else { - status = pthread_join(connectivity_dequeue_thread_id, /* return = */ NULL); - if (status != 0 && status != ESRCH) { - ERROR("connectivity plugin: Stopping dequeue thread failed."); - status = -1; - } else - status = 0; - } + if (status != 0 && status != ESRCH) { + ERROR("connectivity plugin: Unable to cancel dequeue thread: %d", status); + status = -1; + } else + status = 0; - pthread_mutex_lock(&connectivity_lock); + pthread_mutex_lock(&connectivity_threads_lock); memset(&connectivity_dequeue_thread_id, 0, sizeof(connectivity_dequeue_thread_id)); - connectivity_dequeue_thread_error = 0; - pthread_mutex_unlock(&connectivity_lock); + pthread_mutex_unlock(&connectivity_threads_lock); DEBUG("connectivity plugin: Finished requesting stop of dequeue thread"); - return (status); + return status; } /* }}} int stop_dequeue_thread */ -static int stop_threads(int shutdown) /* {{{ */ +static int stop_threads() /* {{{ */ { - int status, status2; - - status = stop_netlink_thread(shutdown); - status2 = stop_dequeue_thread(shutdown); + int status = stop_netlink_thread(1); + int status2 = stop_dequeue_thread(); - if (status < 0) + if (status != 0) return status; else return status2; @@ -965,13 +943,16 @@ static int connectivity_init(void) /* {{{ */ "be monitored"); } - return (start_threads()); + return start_threads(); } /* }}} int connectivity_init */ static int connectivity_config(const char *key, const char *value) /* {{{ */ { if (ignorelist == NULL) { ignorelist = ignorelist_create(/* invert = */ 1); + + if (ignorelist == NULL) + return -1; } if (strcasecmp(key, "Interface") == 0) { @@ -983,66 +964,19 @@ static int connectivity_config(const char *key, const char *value) /* {{{ */ invert = 0; ignorelist_set_invert(ignorelist, invert); } else { - return (-1); + return -1; } - return (0); + return 0; } /* }}} int connectivity_config */ -static void -connectivity_dispatch_notification(const char *interface, const char *type, - gauge_t value, gauge_t old_value, - long long unsigned int timestamp) { - char *buf = NULL; - notification_t n = { - NOTIF_FAILURE, cdtime(), "", "", "connectivity", "", "", "", NULL}; - - if (value == LINK_STATE_UP) - n.severity = NOTIF_OKAY; - - sstrncpy(n.host, hostname_g, sizeof(n.host)); - sstrncpy(n.plugin_instance, interface, sizeof(n.plugin_instance)); - sstrncpy(n.type, "gauge", sizeof(n.type)); - sstrncpy(n.type_instance, "interface_status", sizeof(n.type_instance)); - - gen_message_payload(value, old_value, interface, timestamp, &buf); - - notification_meta_t *m = calloc(1, sizeof(*m)); - - if (m == NULL) { - char errbuf[1024]; - sfree(buf); - ERROR("connectivity plugin: unable to allocate metadata: %s", - sstrerror(errno, errbuf, sizeof(errbuf))); - return; - } - - sstrncpy(m->name, "ves", sizeof(m->name)); - m->nm_value.nm_string = sstrdup(buf); - m->type = NM_TYPE_STRING; - n.meta = m; - - DEBUG("connectivity plugin: notification message: %s", - n.meta->nm_value.nm_string); - - DEBUG("connectivity plugin: dispatching state %d for interface %s", - (int)value, interface); - - plugin_dispatch_notification(&n); - plugin_notification_meta_free(n.meta); - - // malloc'd in gen_message_payload - if (buf != NULL) - sfree(buf); -} - static int connectivity_read(void) /* {{{ */ { - pthread_mutex_lock(&connectivity_lock); + pthread_mutex_lock(&connectivity_threads_lock); if (connectivity_netlink_thread_error != 0) { - pthread_mutex_unlock(&connectivity_lock); + pthread_mutex_unlock(&connectivity_threads_lock); ERROR("connectivity plugin: The netlink thread had a problem. Restarting " "it."); @@ -1058,37 +992,21 @@ static int connectivity_read(void) /* {{{ */ start_netlink_thread(); - return (-1); + return -1; } /* if (connectivity_netlink_thread_error != 0) */ - if (connectivity_dequeue_thread_error != 0) { + pthread_mutex_unlock(&connectivity_threads_lock); - pthread_mutex_unlock(&connectivity_lock); - - ERROR("connectivity plugin: The dequeue thread had a problem. Restarting " - "it."); - - stop_dequeue_thread(0); - - start_dequeue_thread(); - - return (-1); - } /* if (connectivity_dequeue_thread_error != 0) */ - - pthread_mutex_unlock(&connectivity_lock); - - return (0); + return 0; } /* }}} int connectivity_read */ static int connectivity_shutdown(void) /* {{{ */ { - interface_list_t *il; - DEBUG("connectivity plugin: Shutting down thread."); - if (stop_threads(1) < 0) - return (-1); - il = interface_list_head; + int status = stop_threads(); + + interface_list_t *il = interface_list_head; while (il != NULL) { interface_list_t *il_next; @@ -1102,7 +1020,7 @@ static int connectivity_shutdown(void) /* {{{ */ ignorelist_free(ignorelist); - return (0); + return status; } /* }}} int connectivity_shutdown */ void module_register(void) {