X-Git-Url: https://git.octo.it/?p=collectd.git;a=blobdiff_plain;f=src%2Fconnectivity.c;h=45b65aab5b09c1c5ba35110bbff618f6144de153;hp=ec2f7e38631dce51f20da2bf743a3db37812bbb1;hb=48efd3deb4c9139fd060ff3d289896e9031bcc7c;hpb=f3f3b71f2088037c9f36c52d0cecc2fd1c769531 diff --git a/src/connectivity.c b/src/connectivity.c index ec2f7e38..45b65aab 100644 --- a/src/connectivity.c +++ b/src/connectivity.c @@ -27,10 +27,10 @@ #include "collectd.h" -#include "common.h" #include "plugin.h" +#include "utils/common/common.h" +#include "utils/ignorelist/ignorelist.h" #include "utils_complain.h" -#include "utils_ignorelist.h" #include #include @@ -101,7 +101,7 @@ struct interface_list_s { uint32_t status; uint32_t prev_status; uint32_t sent; - long long unsigned int timestamp; + cdtime_t timestamp; struct interface_list_s *next; }; @@ -126,34 +126,23 @@ static pthread_mutex_t connectivity_data_lock = PTHREAD_MUTEX_INITIALIZER; static pthread_cond_t connectivity_cond = PTHREAD_COND_INITIALIZER; static int nl_sock = -1; static int event_id = 0; -static int unsent_statuses = 0; +static int statuses_to_send = 0; static const char *config_keys[] = {"Interface", "IgnoreSelected"}; static int config_keys_num = STATIC_ARRAY_SIZE(config_keys); /* - * Prototype - */ - -static void -connectivity_dispatch_notification(const char *interface, const char *type, - gauge_t value, gauge_t old_value, - long long unsigned int timestamp); - -/* * Private functions */ static int gen_message_payload(int state, int old_state, const char *interface, - long long unsigned int timestamp, char **buf) { + cdtime_t timestamp, char **buf) { const unsigned char *buf2; yajl_gen g; char json_str[DATA_MAX_NAME_LEN]; #if !defined(HAVE_YAJL_V2) - yajl_gen_config conf = {}; - - conf.beautify = 0; + yajl_gen_config conf = {0}; #endif #if HAVE_YAJL_V2 @@ -188,9 +177,9 @@ static int gen_message_payload(int state, int old_state, const char *interface, goto err; event_id = event_id + 1; - int event_id_len = sizeof(char) * sizeof(int) * 4 + 1; - memset(json_str, '\0', DATA_MAX_NAME_LEN); - snprintf(json_str, event_id_len, "%d", event_id); + if (snprintf(json_str, sizeof(json_str), "%d", event_id) < 0) { + goto err; + } if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) { goto err; @@ -202,15 +191,11 @@ static int gen_message_payload(int state, int old_state, const char *interface, yajl_gen_status_ok) goto err; - int event_name_len = 0; - event_name_len = event_name_len + strlen(interface); // interface name - event_name_len = event_name_len + (state == 0 ? 4 : 2); // "down" or "up" - event_name_len = - event_name_len + 12; // "interface", 2 spaces and null-terminator - memset(json_str, '\0', DATA_MAX_NAME_LEN); - snprintf(json_str, event_name_len, "interface %s %s", interface, - (state == 0 ? CONNECTIVITY_EVENT_NAME_DOWN_VALUE - : CONNECTIVITY_EVENT_NAME_UP_VALUE)); + if (snprintf(json_str, sizeof(json_str), "interface %s %s", interface, + (state == 0 ? CONNECTIVITY_EVENT_NAME_DOWN_VALUE + : CONNECTIVITY_EVENT_NAME_UP_VALUE)) < 0) { + goto err; + } if (yajl_gen_string(g, (u_char *)json_str, strlen(json_str)) != yajl_gen_status_ok) { @@ -223,11 +208,10 @@ static int gen_message_payload(int state, int old_state, const char *interface, yajl_gen_status_ok) goto err; - int last_epoch_microsec_len = - sizeof(char) * sizeof(long long unsigned int) * 4 + 1; - memset(json_str, '\0', DATA_MAX_NAME_LEN); - snprintf(json_str, last_epoch_microsec_len, "%llu", - (long long unsigned int)CDTIME_T_TO_US(cdtime())); + if (snprintf(json_str, sizeof(json_str), "%" PRIu64, + CDTIME_T_TO_US(cdtime())) < 0) { + goto err; + } if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) { goto err; @@ -282,11 +266,10 @@ static int gen_message_payload(int state, int old_state, const char *interface, yajl_gen_status_ok) goto err; - int start_epoch_microsec_len = - sizeof(char) * sizeof(long long unsigned int) * 4 + 1; - memset(json_str, '\0', DATA_MAX_NAME_LEN); - snprintf(json_str, start_epoch_microsec_len, "%llu", - (long long unsigned int)timestamp); + if (snprintf(json_str, sizeof(json_str), "%" PRIu64, + CDTIME_T_TO_US(timestamp)) < 0) { + goto err; + } if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) { goto err; @@ -323,10 +306,11 @@ static int gen_message_payload(int state, int old_state, const char *interface, (state == 0 ? strlen(CONNECTIVITY_NEW_STATE_FIELD_DOWN_VALUE) : strlen(CONNECTIVITY_NEW_STATE_FIELD_UP_VALUE)); - if (yajl_gen_string( - g, (u_char *)(state == 0 ? CONNECTIVITY_NEW_STATE_FIELD_DOWN_VALUE - : CONNECTIVITY_NEW_STATE_FIELD_UP_VALUE), - new_state_len) != yajl_gen_status_ok) + if (yajl_gen_string(g, + (u_char *)(state == 0 + ? CONNECTIVITY_NEW_STATE_FIELD_DOWN_VALUE + : CONNECTIVITY_NEW_STATE_FIELD_UP_VALUE), + new_state_len) != yajl_gen_status_ok) goto err; // oldState @@ -339,10 +323,11 @@ static int gen_message_payload(int state, int old_state, const char *interface, (old_state == 0 ? strlen(CONNECTIVITY_OLD_STATE_FIELD_DOWN_VALUE) : strlen(CONNECTIVITY_OLD_STATE_FIELD_UP_VALUE)); - if (yajl_gen_string( - g, (u_char *)(old_state == 0 ? CONNECTIVITY_OLD_STATE_FIELD_DOWN_VALUE - : CONNECTIVITY_OLD_STATE_FIELD_UP_VALUE), - old_state_len) != yajl_gen_status_ok) + if (yajl_gen_string(g, + (u_char *)(old_state == 0 + ? CONNECTIVITY_OLD_STATE_FIELD_DOWN_VALUE + : CONNECTIVITY_OLD_STATE_FIELD_UP_VALUE), + old_state_len) != yajl_gen_status_ok) goto err; // stateChangeFieldsVersion @@ -367,14 +352,13 @@ static int gen_message_payload(int state, int old_state, const char *interface, yajl_gen_status_ok) goto err; - if (yajl_gen_map_close(g) != yajl_gen_status_ok) + // close state change and header fields + if (yajl_gen_map_close(g) != yajl_gen_status_ok || + yajl_gen_map_close(g) != yajl_gen_status_ok) goto err; // *** END state change fields *** - if (yajl_gen_map_close(g) != yajl_gen_status_ok) - goto err; - if (yajl_gen_get_buf(g, &buf2, &len) != yajl_gen_status_ok) goto err; @@ -417,7 +401,7 @@ static interface_list_t *add_interface(const char *interface, int status, il->interface = interface2; il->status = status; il->prev_status = prev_status; - il->timestamp = (long long unsigned int)CDTIME_T_TO_US(cdtime()); + il->timestamp = cdtime(); il->sent = 0; il->next = interface_list_head; interface_list_head = il; @@ -475,12 +459,10 @@ static int connectivity_link_state(struct nlmsghdr *msg) { } } - uint32_t prev_status; - - prev_status = il->status; + uint32_t prev_status = il->status; il->status = ((ifi->ifi_flags & IFF_RUNNING) ? LINK_STATE_UP : LINK_STATE_DOWN); - il->timestamp = (long long unsigned int)CDTIME_T_TO_US(cdtime()); + il->timestamp = cdtime(); // If the new status is different than the previous status, // store the previous status and set sent to zero, and set the @@ -488,11 +470,12 @@ static int connectivity_link_state(struct nlmsghdr *msg) { if (il->status != prev_status) { il->prev_status = prev_status; il->sent = 0; - unsent_statuses = 1; + statuses_to_send = 1; } DEBUG("connectivity plugin (%llu): Interface %s status is now %s", - il->timestamp, dev, ((ifi->ifi_flags & IFF_RUNNING) ? "UP" : "DOWN")); + (unsigned long long)il->timestamp, dev, + ((ifi->ifi_flags & IFF_RUNNING) ? "UP" : "DOWN")); // no need to loop again, we found the interface name attr // (otherwise the first if-statement in the loop would @@ -506,31 +489,19 @@ static int connectivity_link_state(struct nlmsghdr *msg) { } static int msg_handler(struct nlmsghdr *msg) { - switch (msg->nlmsg_type) { - case RTM_NEWADDR: - case RTM_DELADDR: - case RTM_NEWROUTE: - case RTM_DELROUTE: - case RTM_DELLINK: - // Not of interest in current version - break; - case RTM_NEWLINK: - connectivity_link_state(msg); - break; - default: - ERROR("connectivity plugin: msg_handler: Unknown netlink nlmsg_type %d", - msg->nlmsg_type); - break; + // We are only interested in RTM_NEWLINK messages + if (msg->nlmsg_type != RTM_NEWLINK) { + return 0; } - return 0; + return connectivity_link_state(msg); } -static int read_event(int nl, int (*msg_handler)(struct nlmsghdr *)) { +static int read_event(int (*msg_handler)(struct nlmsghdr *)) { int ret = 0; int recv_flags = MSG_DONTWAIT; - if (nl == -1) - return ret; + if (nl_sock == -1 || msg_handler == NULL) + return EINVAL; while (42) { pthread_mutex_lock(&connectivity_threads_lock); @@ -543,7 +514,7 @@ static int read_event(int nl, int (*msg_handler)(struct nlmsghdr *)) { pthread_mutex_unlock(&connectivity_threads_lock); char buf[4096]; - int status = recv(nl, buf, sizeof(buf), recv_flags); + int status = recv(nl_sock, buf, sizeof(buf), recv_flags); if (status < 0) { @@ -558,6 +529,11 @@ static int read_event(int nl, int (*msg_handler)(struct nlmsghdr *)) { continue; } + if (errno == EINTR) { + // Interrupt, so just continue and try again + continue; + } + /* Anything else is an error */ ERROR("connectivity plugin: read_event: Error recv: %d", status); return status; @@ -581,7 +557,9 @@ static int read_event(int nl, int (*msg_handler)(struct nlmsghdr *)) { /* Message is some kind of error */ if (h->nlmsg_type == NLMSG_ERROR) { - ERROR("connectivity plugin: read_event: Message is an error"); + struct nlmsgerr *l_err = (struct nlmsgerr *)NLMSG_DATA(h); + ERROR("connectivity plugin: read_event: Message is an error: %d", + l_err->error); return -1; // Error } @@ -603,34 +581,74 @@ static int read_event(int nl, int (*msg_handler)(struct nlmsghdr *)) { return ret; } +static void connectivity_dispatch_notification(const char *interface, + gauge_t value, gauge_t old_value, + cdtime_t timestamp) { + + notification_t n = { + .severity = (value == LINK_STATE_UP ? NOTIF_OKAY : NOTIF_FAILURE), + .time = cdtime(), + .plugin = "connectivity", + .type = "gauge", + .type_instance = "interface_status", + }; + + sstrncpy(n.host, hostname_g, sizeof(n.host)); + sstrncpy(n.plugin_instance, interface, sizeof(n.plugin_instance)); + + char *buf = NULL; + + gen_message_payload(value, old_value, interface, timestamp, &buf); + + int status = plugin_notification_meta_add_string(&n, "ves", buf); + + if (status < 0) { + sfree(buf); + ERROR("connectivity plugin: unable to set notification VES metadata: %s", + STRERRNO); + return; + } + + DEBUG("connectivity plugin: notification VES metadata: %s", + n.meta->nm_value.nm_string); + + DEBUG("connectivity plugin: dispatching state %d for interface %s", + (int)value, interface); + + plugin_dispatch_notification(&n); + plugin_notification_meta_free(n.meta); + + // strdup'd in gen_message_payload + if (buf != NULL) + sfree(buf); +} + // NOTE: Caller MUST hold connectivity_data_lock when calling this function static void send_interface_status() { for (interface_list_t *il = interface_list_head; il != NULL; il = il->next) /* {{{ */ { - uint32_t status; - uint32_t prev_status; - uint32_t sent; - - status = il->status; - prev_status = il->prev_status; - sent = il->sent; + uint32_t status = il->status; + uint32_t prev_status = il->prev_status; + uint32_t sent = il->sent; if (status != prev_status && sent == 0) { - connectivity_dispatch_notification(il->interface, "gauge", status, - prev_status, il->timestamp); + connectivity_dispatch_notification(il->interface, status, prev_status, + il->timestamp); il->sent = 1; } } /* }}} for (il = interface_list_head; il != NULL; il = il->next) */ - unsent_statuses = 0; + statuses_to_send = 0; } static void read_interface_status() /* {{{ */ { pthread_mutex_lock(&connectivity_data_lock); - if (!unsent_statuses) + // If we don't have any interface statuses to dispatch, + // then we wait until signalled + if (!statuses_to_send) pthread_cond_wait(&connectivity_cond, &connectivity_data_lock); send_interface_status(); @@ -645,7 +663,7 @@ static void *connectivity_netlink_thread(void *arg) /* {{{ */ while (connectivity_netlink_thread_loop > 0) { pthread_mutex_unlock(&connectivity_threads_lock); - int status = read_event(nl_sock, msg_handler); + int status = read_event(msg_handler); pthread_mutex_lock(&connectivity_threads_lock); @@ -679,7 +697,9 @@ static void *connectivity_dequeue_thread(void *arg) /* {{{ */ static int nl_connect() { struct sockaddr_nl sa_nl = { - .nl_family = AF_NETLINK, .nl_groups = RTMGRP_LINK, .nl_pid = getpid(), + .nl_family = AF_NETLINK, + .nl_groups = RTMGRP_LINK, + .nl_pid = getpid(), }; nl_sock = socket(AF_NETLINK, SOCK_DGRAM, NETLINK_ROUTE); @@ -692,6 +712,7 @@ static int nl_connect() { if (rc == -1) { ERROR("connectivity plugin: socket bind failed: %s", STRERRNO); close(nl_sock); + nl_sock = -1; return -1; } @@ -734,8 +755,9 @@ static int start_netlink_thread(void) /* {{{ */ if (status2 != 0) { ERROR("connectivity plugin: failed to close socket %d: %d (%s)", nl_sock, status2, STRERRNO); - } else - nl_sock = -1; + } + + nl_sock = -1; return -1; } @@ -792,8 +814,9 @@ static int stop_netlink_thread(int shutdown) /* {{{ */ if (socket_status != 0) { ERROR("connectivity plugin: failed to close socket %d: %d (%s)", nl_sock, socket_status, STRERRNO); - } else - nl_sock = -1; + } + + nl_sock = -1; } else socket_status = 0; @@ -862,7 +885,7 @@ static int stop_netlink_thread(int shutdown) /* {{{ */ return thread_status; } -static int stop_dequeue_thread(int shutdown) /* {{{ */ +static int stop_dequeue_thread() /* {{{ */ { pthread_mutex_lock(&connectivity_threads_lock); @@ -879,30 +902,18 @@ static int stop_dequeue_thread(int shutdown) /* {{{ */ // on such that they'll see the threads termination status pthread_cond_broadcast(&connectivity_cond); - int status; + // Calling pthread_cancel here just assures that the thread is + // gone and that the process has been fully terminated. - if (shutdown == 1) { - // Calling pthread_cancel here in - // the case of a shutdown just assures that the thread is - // gone and that the process has been fully terminated. - - DEBUG("connectivity plugin: Canceling dequeue thread for process shutdown"); + DEBUG("connectivity plugin: Canceling dequeue thread for process shutdown"); - status = pthread_cancel(connectivity_dequeue_thread_id); + int status = pthread_cancel(connectivity_dequeue_thread_id); - if (status != 0 && status != ESRCH) { - ERROR("connectivity plugin: Unable to cancel dequeue thread: %d", status); - status = -1; - } else - status = 0; - } else { - status = pthread_join(connectivity_dequeue_thread_id, /* return = */ NULL); - if (status != 0 && status != ESRCH) { - ERROR("connectivity plugin: Stopping dequeue thread failed."); - status = -1; - } else - status = 0; - } + if (status != 0 && status != ESRCH) { + ERROR("connectivity plugin: Unable to cancel dequeue thread: %d", status); + status = -1; + } else + status = 0; pthread_mutex_lock(&connectivity_threads_lock); memset(&connectivity_dequeue_thread_id, 0, @@ -914,10 +925,10 @@ static int stop_dequeue_thread(int shutdown) /* {{{ */ return status; } /* }}} int stop_dequeue_thread */ -static int stop_threads(int shutdown) /* {{{ */ +static int stop_threads() /* {{{ */ { - int status = stop_netlink_thread(shutdown); - int status2 = stop_dequeue_thread(shutdown); + int status = stop_netlink_thread(1); + int status2 = stop_dequeue_thread(); if (status != 0) return status; @@ -939,6 +950,9 @@ static int connectivity_config(const char *key, const char *value) /* {{{ */ { if (ignorelist == NULL) { ignorelist = ignorelist_create(/* invert = */ 1); + + if (ignorelist == NULL) + return -1; } if (strcasecmp(key, "Interface") == 0) { @@ -956,57 +970,6 @@ static int connectivity_config(const char *key, const char *value) /* {{{ */ return 0; } /* }}} int connectivity_config */ -static void -connectivity_dispatch_notification(const char *interface, const char *type, - gauge_t value, gauge_t old_value, - long long unsigned int timestamp) { - - notification_t n = {(value == LINK_STATE_UP ? NOTIF_OKAY : NOTIF_FAILURE), - cdtime(), - "", - "", - "connectivity", - "", - "", - "", - NULL}; - - sstrncpy(n.host, hostname_g, sizeof(n.host)); - sstrncpy(n.plugin_instance, interface, sizeof(n.plugin_instance)); - sstrncpy(n.type, "gauge", sizeof(n.type)); - sstrncpy(n.type_instance, "interface_status", sizeof(n.type_instance)); - - char *buf = NULL; - - gen_message_payload(value, old_value, interface, timestamp, &buf); - - notification_meta_t *m = calloc(1, sizeof(*m)); - - if (m == NULL) { - sfree(buf); - ERROR("connectivity plugin: unable to allocate metadata: %s", STRERRNO); - return; - } - - sstrncpy(m->name, "ves", sizeof(m->name)); - m->nm_value.nm_string = sstrdup(buf); - m->type = NM_TYPE_STRING; - n.meta = m; - - DEBUG("connectivity plugin: notification message: %s", - n.meta->nm_value.nm_string); - - DEBUG("connectivity plugin: dispatching state %d for interface %s", - (int)value, interface); - - plugin_dispatch_notification(&n); - plugin_notification_meta_free(n.meta); - - // strdup'd in gen_message_payload - if (buf != NULL) - sfree(buf); -} - static int connectivity_read(void) /* {{{ */ { pthread_mutex_lock(&connectivity_threads_lock); @@ -1041,7 +1004,7 @@ static int connectivity_shutdown(void) /* {{{ */ { DEBUG("connectivity plugin: Shutting down thread."); - int status = stop_threads(1); + int status = stop_threads(); interface_list_t *il = interface_list_head; while (il != NULL) {