X-Git-Url: https://git.octo.it/?a=blobdiff_plain;ds=sidebyside;f=src%2Fconnectivity.c;h=c470c9957c792fcdb7751712ff640f4597d455a9;hb=f4e7eb5fb59872c1a8b1e1f5314249489c314fb0;hp=e39bce17c94d04bbfd5e84df2136d46eff07b7b5;hpb=2aa26125065085bb47bbc7afdc9310abb6e2be21;p=collectd.git diff --git a/src/connectivity.c b/src/connectivity.c index e39bce17..c470c995 100644 --- a/src/connectivity.c +++ b/src/connectivity.c @@ -94,6 +94,7 @@ /* * Private data types */ + struct interface_list_s { char *interface; @@ -109,23 +110,37 @@ typedef struct interface_list_s interface_list_t; /* * Private variables */ + static ignorelist_t *ignorelist = NULL; static interface_list_t *interface_list_head = NULL; static int monitor_all_interfaces = 1; -static int connectivity_thread_loop = 0; -static int connectivity_thread_error = 0; -static pthread_t connectivity_thread_id; -static pthread_mutex_t connectivity_lock = PTHREAD_MUTEX_INITIALIZER; +static int connectivity_netlink_thread_loop = 0; +static int connectivity_netlink_thread_error = 0; +static pthread_t connectivity_netlink_thread_id; +static int connectivity_dequeue_thread_loop = 0; +static pthread_t connectivity_dequeue_thread_id; +static pthread_mutex_t connectivity_threads_lock = PTHREAD_MUTEX_INITIALIZER; +static pthread_mutex_t connectivity_data_lock = PTHREAD_MUTEX_INITIALIZER; static pthread_cond_t connectivity_cond = PTHREAD_COND_INITIALIZER; -static struct mnl_socket *sock; +static int nl_sock = -1; static int event_id = 0; +static int unsent_statuses = 0; -static const char *config_keys[] = {"Interface"}; +static const char *config_keys[] = {"Interface", "IgnoreSelected"}; static int config_keys_num = STATIC_ARRAY_SIZE(config_keys); /* + * Prototype + */ + +static void +connectivity_dispatch_notification(const char *interface, const char *type, + gauge_t value, gauge_t old_value, + long long unsigned int timestamp); + +/* * Private functions */ @@ -363,17 +378,14 @@ static int gen_message_payload(int state, int old_state, const char *interface, if (yajl_gen_get_buf(g, &buf2, &len) != yajl_gen_status_ok) goto err; - *buf = malloc(strlen((char *)buf2) + 1); + *buf = strdup((char *)buf2); if (*buf == NULL) { - char errbuf[1024]; - ERROR("connectivity plugin: malloc failed during gen_message_payload: %s", - sstrerror(errno, errbuf, sizeof(errbuf))); + ERROR("connectivity plugin: strdup failed during gen_message_payload: %s", + STRERRNO); goto err; } - sstrncpy(*buf, (char *)buf2, strlen((char *)buf2) + 1); - yajl_gen_free(g); return 0; @@ -386,23 +398,19 @@ err: static interface_list_t *add_interface(const char *interface, int status, int prev_status) { - interface_list_t *il; - char *interface2; + interface_list_t *il = calloc(1, sizeof(*il)); - il = malloc(sizeof(*il)); if (il == NULL) { - char errbuf[1024]; - ERROR("connectivity plugin: malloc failed during add_interface: %s", - sstrerror(errno, errbuf, sizeof(errbuf))); + ERROR("connectivity plugin: calloc failed during add_interface: %s", + STRERRNO); return NULL; } - interface2 = strdup(interface); + char *interface2 = strdup(interface); if (interface2 == NULL) { - char errbuf[1024]; sfree(il); ERROR("connectivity plugin: strdup failed during add_interface: %s", - sstrerror(errno, errbuf, sizeof(errbuf))); + STRERRNO); return NULL; } @@ -420,14 +428,10 @@ static interface_list_t *add_interface(const char *interface, int status, } static int connectivity_link_state(struct nlmsghdr *msg) { - int retval = 0; - struct ifinfomsg *ifi = mnl_nlmsg_get_payload(msg); - struct nlattr *attr; - const char *dev = NULL; - - pthread_mutex_lock(&connectivity_lock); + pthread_mutex_lock(&connectivity_data_lock); - interface_list_t *il = NULL; + struct nlattr *attr; + struct ifinfomsg *ifi = mnl_nlmsg_get_payload(msg); /* Scan attribute list for device name. */ mnl_attr_for_each(attr, msg, sizeof(*ifi)) { @@ -438,11 +442,11 @@ static int connectivity_link_state(struct nlmsghdr *msg) { ERROR("connectivity plugin: connectivity_link_state: IFLA_IFNAME " "mnl_attr_validate " "failed."); - pthread_mutex_unlock(&connectivity_lock); + pthread_mutex_unlock(&connectivity_data_lock); return MNL_CB_ERROR; } - dev = mnl_attr_get_str(attr); + const char *dev = mnl_attr_get_str(attr); // Check the list of interfaces we should monitor, if we've chosen // a subset. If we don't care about this one, abort. @@ -453,12 +457,12 @@ static int connectivity_link_state(struct nlmsghdr *msg) { break; } + interface_list_t *il = NULL; + for (il = interface_list_head; il != NULL; il = il->next) if (strcmp(dev, il->interface) == 0) break; - uint32_t prev_status; - if (il == NULL) { // We haven't encountered this interface yet, so add it to the linked list il = add_interface(dev, LINK_STATE_UNKNOWN, LINK_STATE_UNKNOWN); @@ -471,16 +475,20 @@ static int connectivity_link_state(struct nlmsghdr *msg) { } } + uint32_t prev_status; + prev_status = il->status; il->status = ((ifi->ifi_flags & IFF_RUNNING) ? LINK_STATE_UP : LINK_STATE_DOWN); il->timestamp = (long long unsigned int)CDTIME_T_TO_US(cdtime()); // If the new status is different than the previous status, - // store the previous status and set sent to zero + // store the previous status and set sent to zero, and set the + // global flag to indicate there are statuses to dispatch if (il->status != prev_status) { il->prev_status = prev_status; il->sent = 0; + unsent_statuses = 1; } DEBUG("connectivity plugin (%llu): Interface %s status is now %s", @@ -492,9 +500,9 @@ static int connectivity_link_state(struct nlmsghdr *msg) { break; } - pthread_mutex_unlock(&connectivity_lock); + pthread_mutex_unlock(&connectivity_data_lock); - return retval; + return 0; } static int msg_handler(struct nlmsghdr *msg) { @@ -510,166 +518,314 @@ static int msg_handler(struct nlmsghdr *msg) { connectivity_link_state(msg); break; default: - ERROR("connectivity plugin: msg_handler: Unknown netlink nlmsg_type %d\n", + ERROR("connectivity plugin: msg_handler: Unknown netlink nlmsg_type %d", msg->nlmsg_type); break; } return 0; } -static int read_event(struct mnl_socket *nl, - int (*msg_handler)(struct nlmsghdr *)) { - int status; +static int read_event(int nl, int (*msg_handler)(struct nlmsghdr *)) { int ret = 0; - char buf[4096]; - struct nlmsghdr *h; + int recv_flags = MSG_DONTWAIT; - if (nl == NULL) + if (nl == -1) return ret; - status = mnl_socket_recvfrom(nl, buf, sizeof(buf)); + while (42) { + pthread_mutex_lock(&connectivity_threads_lock); - if (status < 0) { - /* Socket non-blocking so bail out once we have read everything */ - if (errno == EWOULDBLOCK || errno == EAGAIN) + if (connectivity_netlink_thread_loop <= 0) { + pthread_mutex_unlock(&connectivity_threads_lock); return ret; + } - /* Anything else is an error */ - ERROR("connectivity plugin: read_event: Error mnl_socket_recvfrom: %d\n", - status); - return status; - } + pthread_mutex_unlock(&connectivity_threads_lock); - if (status == 0) { - DEBUG("connectivity plugin: read_event: EOF\n"); - } + char buf[4096]; + int status = recv(nl, buf, sizeof(buf), recv_flags); - /* We need to handle more than one message per 'recvmsg' */ - for (h = (struct nlmsghdr *)buf; NLMSG_OK(h, (unsigned int)status); - h = NLMSG_NEXT(h, status)) { - /* Finish reading */ - if (h->nlmsg_type == NLMSG_DONE) - return ret; + if (status < 0) { + + // If there were no more messages to drain from the socket, + // then signal the dequeue thread and allow it to dispatch + // any saved interface status changes. Then continue, but + // block and wait for new messages + if (errno == EWOULDBLOCK || errno == EAGAIN) { + pthread_cond_signal(&connectivity_cond); + + recv_flags = 0; + continue; + } + + if (errno == EINTR) + { + // Interrupt, so just return + return 0; + } - /* Message is some kind of error */ - if (h->nlmsg_type == NLMSG_ERROR) { - ERROR("connectivity plugin: read_event: Message is an error\n"); - return -1; // Error + /* Anything else is an error */ + ERROR("connectivity plugin: read_event: Error recv: %d", status); + return status; } - /* Call message handler */ - if (msg_handler) { - ret = (*msg_handler)(h); - if (ret < 0) { - ERROR("connectivity plugin: read_event: Message handler error %d\n", - ret); + // Message received successfully, so we'll stop blocking on the + // receive call for now (until we get a "would block" error, which + // will be handled above) + recv_flags = MSG_DONTWAIT; + + if (status == 0) { + DEBUG("connectivity plugin: read_event: EOF"); + } + + /* We need to handle more than one message per 'recvmsg' */ + for (struct nlmsghdr *h = (struct nlmsghdr *)buf; + NLMSG_OK(h, (unsigned int)status); h = NLMSG_NEXT(h, status)) { + /* Finish reading */ + if (h->nlmsg_type == NLMSG_DONE) return ret; + + /* Message is some kind of error */ + if (h->nlmsg_type == NLMSG_ERROR) { + ERROR("connectivity plugin: read_event: Message is an error"); + return -1; // Error + } + + /* Call message handler */ + if (msg_handler) { + ret = (*msg_handler)(h); + if (ret < 0) { + ERROR("connectivity plugin: read_event: Message handler error %d", + ret); + return ret; + } + } else { + ERROR("connectivity plugin: read_event: Error NULL message handler"); + return -1; } - } else { - ERROR("connectivity plugin: read_event: Error NULL message handler\n"); - return -1; } } return ret; } -static void *connectivity_thread(void *arg) /* {{{ */ +// NOTE: Caller MUST hold connectivity_data_lock when calling this function +static void send_interface_status() { + for (interface_list_t *il = interface_list_head; il != NULL; + il = il->next) /* {{{ */ + { + uint32_t status; + uint32_t prev_status; + uint32_t sent; + + status = il->status; + prev_status = il->prev_status; + sent = il->sent; + + if (status != prev_status && sent == 0) { + connectivity_dispatch_notification(il->interface, "gauge", status, + prev_status, il->timestamp); + il->sent = 1; + } + } /* }}} for (il = interface_list_head; il != NULL; il = il->next) */ + + unsent_statuses = 0; +} + +static void read_interface_status() /* {{{ */ { - pthread_mutex_lock(&connectivity_lock); + pthread_mutex_lock(&connectivity_data_lock); + + if (!unsent_statuses) + pthread_cond_wait(&connectivity_cond, &connectivity_data_lock); + + send_interface_status(); - while (connectivity_thread_loop > 0) { - int status; + pthread_mutex_unlock(&connectivity_data_lock); +} /* }}} int *read_interface_status */ + +static void *connectivity_netlink_thread(void *arg) /* {{{ */ +{ + pthread_mutex_lock(&connectivity_threads_lock); - pthread_mutex_unlock(&connectivity_lock); + while (connectivity_netlink_thread_loop > 0) { + pthread_mutex_unlock(&connectivity_threads_lock); - status = read_event(sock, msg_handler); + int status = read_event(nl_sock, msg_handler); - pthread_mutex_lock(&connectivity_lock); + pthread_mutex_lock(&connectivity_threads_lock); if (status < 0) { - connectivity_thread_error = 1; + connectivity_netlink_thread_error = 1; break; } + } /* while (connectivity_netlink_thread_loop > 0) */ - if (connectivity_thread_loop <= 0) - break; - } /* while (connectivity_thread_loop > 0) */ + pthread_mutex_unlock(&connectivity_threads_lock); + + return (void *)0; +} /* }}} void *connectivity_netlink_thread */ + +static void *connectivity_dequeue_thread(void *arg) /* {{{ */ +{ + pthread_mutex_lock(&connectivity_threads_lock); + + while (connectivity_dequeue_thread_loop > 0) { + pthread_mutex_unlock(&connectivity_threads_lock); - pthread_mutex_unlock(&connectivity_lock); + read_interface_status(); + + pthread_mutex_lock(&connectivity_threads_lock); + } /* while (connectivity_dequeue_thread_loop > 0) */ + + pthread_mutex_unlock(&connectivity_threads_lock); return ((void *)0); -} /* }}} void *connectivity_thread */ +} /* }}} void *connectivity_dequeue_thread */ -static int start_thread(void) /* {{{ */ -{ - int status; +static int nl_connect() { + struct sockaddr_nl sa_nl = { + .nl_family = AF_NETLINK, .nl_groups = RTMGRP_LINK, .nl_pid = getpid(), + }; + + nl_sock = socket(AF_NETLINK, SOCK_DGRAM, NETLINK_ROUTE); + if (nl_sock == -1) { + ERROR("connectivity plugin: socket open failed: %s", STRERRNO); + return -1; + } + + int rc = bind(nl_sock, (struct sockaddr *)&sa_nl, sizeof(sa_nl)); + if (rc == -1) { + ERROR("connectivity plugin: socket bind failed: %s", STRERRNO); + close(nl_sock); + return -1; + } + + return 0; +} - pthread_mutex_lock(&connectivity_lock); +static int start_netlink_thread(void) /* {{{ */ +{ + pthread_mutex_lock(&connectivity_threads_lock); - if (connectivity_thread_loop != 0) { - pthread_mutex_unlock(&connectivity_lock); - return (0); + if (connectivity_netlink_thread_loop != 0) { + pthread_mutex_unlock(&connectivity_threads_lock); + return 0; } - connectivity_thread_loop = 1; - connectivity_thread_error = 0; + connectivity_netlink_thread_loop = 1; + connectivity_netlink_thread_error = 0; - if (sock == NULL) { - sock = mnl_socket_open(NETLINK_ROUTE); - if (sock == NULL) { - ERROR( - "connectivity plugin: connectivity_thread: mnl_socket_open failed."); - pthread_mutex_unlock(&connectivity_lock); - return (-1); - } + int status; + + if (nl_sock == -1) { + status = nl_connect(); - if (mnl_socket_bind(sock, RTMGRP_LINK, MNL_SOCKET_AUTOPID) < 0) { - ERROR( - "connectivity plugin: connectivity_thread: mnl_socket_bind failed."); - pthread_mutex_unlock(&connectivity_lock); - return (1); + if (status != 0) { + pthread_mutex_unlock(&connectivity_threads_lock); + return status; } } - status = plugin_thread_create(&connectivity_thread_id, /* attr = */ NULL, - connectivity_thread, + status = plugin_thread_create(&connectivity_netlink_thread_id, + /* attr = */ NULL, connectivity_netlink_thread, /* arg = */ (void *)0, "connectivity"); if (status != 0) { - connectivity_thread_loop = 0; + connectivity_netlink_thread_loop = 0; ERROR("connectivity plugin: Starting thread failed."); - pthread_mutex_unlock(&connectivity_lock); - mnl_socket_close(sock); - return (-1); + pthread_mutex_unlock(&connectivity_threads_lock); + + int status2 = close(nl_sock); + + if (status2 != 0) { + ERROR("connectivity plugin: failed to close socket %d: %d (%s)", nl_sock, + status2, STRERRNO); + } else + nl_sock = -1; + + return -1; } - pthread_mutex_unlock(&connectivity_lock); - return (0); -} /* }}} int start_thread */ + pthread_mutex_unlock(&connectivity_threads_lock); + + return status; +} -static int stop_thread(int shutdown) /* {{{ */ +static int start_dequeue_thread(void) /* {{{ */ { - int status; + pthread_mutex_lock(&connectivity_threads_lock); + + if (connectivity_dequeue_thread_loop != 0) { + pthread_mutex_unlock(&connectivity_threads_lock); + return 0; + } + + connectivity_dequeue_thread_loop = 1; - if (sock != NULL) - mnl_socket_close(sock); + int status = + plugin_thread_create(&connectivity_dequeue_thread_id, + /* attr = */ NULL, connectivity_dequeue_thread, + /* arg = */ (void *)0, "connectivity"); + if (status != 0) { + connectivity_dequeue_thread_loop = 0; + ERROR("connectivity plugin: Starting dequeue thread failed."); + pthread_mutex_unlock(&connectivity_threads_lock); + return -1; + } + + pthread_mutex_unlock(&connectivity_threads_lock); + + return status; +} /* }}} int start_dequeue_thread */ + +static int start_threads(void) /* {{{ */ +{ + int status = start_netlink_thread(); + int status2 = start_dequeue_thread(); - pthread_mutex_lock(&connectivity_lock); + if (status != 0) + return status; + else + return status2; +} /* }}} int start_threads */ - if (connectivity_thread_loop == 0) { - pthread_mutex_unlock(&connectivity_lock); - return (-1); +static int stop_netlink_thread(int shutdown) /* {{{ */ +{ + int socket_status; + + if (nl_sock != -1) { + socket_status = close(nl_sock); + if (socket_status != 0) { + ERROR("connectivity plugin: failed to close socket %d: %d (%s)", nl_sock, + socket_status, STRERRNO); + } else + nl_sock = -1; + } else + socket_status = 0; + + pthread_mutex_lock(&connectivity_threads_lock); + + if (connectivity_netlink_thread_loop == 0) { + pthread_mutex_unlock(&connectivity_threads_lock); + // Thread has already been terminated, nothing more to attempt + return socket_status; } - connectivity_thread_loop = 0; + // Set thread termination status + connectivity_netlink_thread_loop = 0; + pthread_mutex_unlock(&connectivity_threads_lock); + + // Let threads waiting on access to the interface list know to move + // on such that they'll see the thread's termination status pthread_cond_broadcast(&connectivity_cond); - pthread_mutex_unlock(&connectivity_lock); + + int thread_status; if (shutdown == 1) { // Since the thread is blocking, calling pthread_join // doesn't actually succeed in stopping it. It will stick around // until a NETLINK message is received on the socket (at which - // it will realize that "connectivity_thread_loop" is 0 and will + // it will realize that "connectivity_netlink_thread_loop" is 0 and will // break out of the read loop and be allowed to die). This is // fine when the process isn't supposed to be exiting, but in // the case of a process shutdown, we don't want to have an @@ -677,31 +833,103 @@ static int stop_thread(int shutdown) /* {{{ */ // the case of a shutdown is just assures that the thread is // gone and that the process has been fully terminated. - DEBUG("connectivity plugin: Canceling thread for process shutdown"); + DEBUG("connectivity plugin: Canceling netlink thread for process shutdown"); - status = pthread_cancel(connectivity_thread_id); + thread_status = pthread_cancel(connectivity_netlink_thread_id); - if (status != 0) { - ERROR("connectivity plugin: Unable to cancel thread: %d", status); + if (thread_status != 0 && thread_status != ESRCH) { + ERROR("connectivity plugin: Unable to cancel netlink thread: %d", + thread_status); + thread_status = -1; + } else + thread_status = 0; + } else { + thread_status = + pthread_join(connectivity_netlink_thread_id, /* return = */ NULL); + if (thread_status != 0 && thread_status != ESRCH) { + ERROR("connectivity plugin: Stopping netlink thread failed: %d", + thread_status); + thread_status = -1; + } else + thread_status = 0; + } + + pthread_mutex_lock(&connectivity_threads_lock); + memset(&connectivity_netlink_thread_id, 0, + sizeof(connectivity_netlink_thread_id)); + connectivity_netlink_thread_error = 0; + pthread_mutex_unlock(&connectivity_threads_lock); + + DEBUG("connectivity plugin: Finished requesting stop of netlink thread"); + + if (socket_status != 0) + return socket_status; + else + return thread_status; +} + +static int stop_dequeue_thread(int shutdown) /* {{{ */ +{ + pthread_mutex_lock(&connectivity_threads_lock); + + if (connectivity_dequeue_thread_loop == 0) { + pthread_mutex_unlock(&connectivity_threads_lock); + return -1; + } + + // Set thread termination status + connectivity_dequeue_thread_loop = 0; + pthread_mutex_unlock(&connectivity_threads_lock); + + // Let threads waiting on access to the interface list know to move + // on such that they'll see the threads termination status + pthread_cond_broadcast(&connectivity_cond); + + int status; + + if (shutdown == 1) { + // Calling pthread_cancel here in + // the case of a shutdown just assures that the thread is + // gone and that the process has been fully terminated. + + DEBUG("connectivity plugin: Canceling dequeue thread for process shutdown"); + + status = pthread_cancel(connectivity_dequeue_thread_id); + + if (status != 0 && status != ESRCH) { + ERROR("connectivity plugin: Unable to cancel dequeue thread: %d", status); status = -1; - } + } else + status = 0; } else { - status = pthread_join(connectivity_thread_id, /* return = */ NULL); - if (status != 0) { - ERROR("connectivity plugin: Stopping thread failed."); + status = pthread_join(connectivity_dequeue_thread_id, /* return = */ NULL); + if (status != 0 && status != ESRCH) { + ERROR("connectivity plugin: Stopping dequeue thread failed."); status = -1; - } + } else + status = 0; } - pthread_mutex_lock(&connectivity_lock); - memset(&connectivity_thread_id, 0, sizeof(connectivity_thread_id)); - connectivity_thread_error = 0; - pthread_mutex_unlock(&connectivity_lock); + pthread_mutex_lock(&connectivity_threads_lock); + memset(&connectivity_dequeue_thread_id, 0, + sizeof(connectivity_dequeue_thread_id)); + pthread_mutex_unlock(&connectivity_threads_lock); - DEBUG("connectivity plugin: Finished requesting stop of thread"); + DEBUG("connectivity plugin: Finished requesting stop of dequeue thread"); - return (status); -} /* }}} int stop_thread */ + return status; +} /* }}} int stop_dequeue_thread */ + +static int stop_threads(int shutdown) /* {{{ */ +{ + int status = stop_netlink_thread(shutdown); + int status2 = stop_dequeue_thread(shutdown); + + if (status != 0) + return status; + else + return status2; +} /* }}} int stop_threads */ static int connectivity_init(void) /* {{{ */ { @@ -710,7 +938,7 @@ static int connectivity_init(void) /* {{{ */ "be monitored"); } - return (start_thread()); + return start_threads(); } /* }}} int connectivity_init */ static int connectivity_config(const char *key, const char *value) /* {{{ */ @@ -722,37 +950,47 @@ static int connectivity_config(const char *key, const char *value) /* {{{ */ if (strcasecmp(key, "Interface") == 0) { ignorelist_add(ignorelist, value); monitor_all_interfaces = 0; + } else if (strcasecmp(key, "IgnoreSelected") == 0) { + int invert = 1; + if (IS_TRUE(value)) + invert = 0; + ignorelist_set_invert(ignorelist, invert); } else { - return (-1); + return -1; } - return (0); + return 0; } /* }}} int connectivity_config */ -static void connectivity_dispatch_notification( - const char *interface, const char *type, /* {{{ */ - gauge_t value, gauge_t old_value, long long unsigned int timestamp) { - char *buf = NULL; - notification_t n = { - NOTIF_FAILURE, cdtime(), "", "", "connectivity", "", "", "", NULL}; - - if (value == LINK_STATE_UP) - n.severity = NOTIF_OKAY; +static void +connectivity_dispatch_notification(const char *interface, const char *type, + gauge_t value, gauge_t old_value, + long long unsigned int timestamp) { + + notification_t n = {(value == LINK_STATE_UP ? NOTIF_OKAY : NOTIF_FAILURE), + cdtime(), + "", + "", + "connectivity", + "", + "", + "", + NULL}; sstrncpy(n.host, hostname_g, sizeof(n.host)); sstrncpy(n.plugin_instance, interface, sizeof(n.plugin_instance)); sstrncpy(n.type, "gauge", sizeof(n.type)); sstrncpy(n.type_instance, "interface_status", sizeof(n.type_instance)); + char *buf = NULL; + gen_message_payload(value, old_value, interface, timestamp, &buf); notification_meta_t *m = calloc(1, sizeof(*m)); if (m == NULL) { - char errbuf[1024]; sfree(buf); - ERROR("connectivity plugin: unable to allocate metadata: %s", - sstrerror(errno, errbuf, sizeof(errbuf))); + ERROR("connectivity plugin: unable to allocate metadata: %s", STRERRNO); return; } @@ -770,18 +1008,23 @@ static void connectivity_dispatch_notification( plugin_dispatch_notification(&n); plugin_notification_meta_free(n.meta); - // malloc'd in gen_message_payload + // strdup'd in gen_message_payload if (buf != NULL) sfree(buf); } static int connectivity_read(void) /* {{{ */ { - if (connectivity_thread_error != 0) { - ERROR("connectivity plugin: The interface thread had a problem. Restarting " + pthread_mutex_lock(&connectivity_threads_lock); + + if (connectivity_netlink_thread_error != 0) { + + pthread_mutex_unlock(&connectivity_threads_lock); + + ERROR("connectivity plugin: The netlink thread had a problem. Restarting " "it."); - stop_thread(0); + stop_netlink_thread(0); for (interface_list_t *il = interface_list_head; il != NULL; il = il->next) { @@ -790,45 +1033,23 @@ static int connectivity_read(void) /* {{{ */ il->sent = 0; } - start_thread(); + start_netlink_thread(); - return (-1); - } /* if (connectivity_thread_error != 0) */ + return -1; + } /* if (connectivity_netlink_thread_error != 0) */ - for (interface_list_t *il = interface_list_head; il != NULL; - il = il->next) /* {{{ */ - { - uint32_t status; - uint32_t prev_status; - uint32_t sent; - - pthread_mutex_lock(&connectivity_lock); - - status = il->status; - prev_status = il->prev_status; - sent = il->sent; + pthread_mutex_unlock(&connectivity_threads_lock); - if (status != prev_status && sent == 0) { - connectivity_dispatch_notification(il->interface, "gauge", status, - prev_status, il->timestamp); - il->sent = 1; - } - - pthread_mutex_unlock(&connectivity_lock); - } /* }}} for (il = interface_list_head; il != NULL; il = il->next) */ - - return (0); + return 0; } /* }}} int connectivity_read */ static int connectivity_shutdown(void) /* {{{ */ { - interface_list_t *il; - DEBUG("connectivity plugin: Shutting down thread."); - if (stop_thread(1) < 0) - return (-1); - il = interface_list_head; + int status = stop_threads(1); + + interface_list_t *il = interface_list_head; while (il != NULL) { interface_list_t *il_next; @@ -842,7 +1063,7 @@ static int connectivity_shutdown(void) /* {{{ */ ignorelist_free(ignorelist); - return (0); + return status; } /* }}} int connectivity_shutdown */ void module_register(void) {