From a50d4b50b4e067e9f7b6b1c091dc0c6bdfc884a2 Mon Sep 17 00:00:00 2001 From: Andrew Bays Date: Thu, 18 Jan 2018 09:20:11 -0500 Subject: [PATCH] connectivity notifications --- Makefile.am | 5 +- configure.ac | 7 +- src/connectivity.c | 428 +++++++++++++++++++++++++++++++++++++++++++++-------- 3 files changed, 374 insertions(+), 66 deletions(-) diff --git a/Makefile.am b/Makefile.am index e9a38135..df4cccd7 100644 --- a/Makefile.am +++ b/Makefile.am @@ -641,8 +641,9 @@ if BUILD_PLUGIN_CONNECTIVITY pkglib_LTLIBRARIES += connectivity.la connectivity_la_SOURCES = src/connectivity.c connectivity_la_CFLAGS = $(AM_CFLAGS) $(BUILD_WITH_LIBMNL_CFLAGS) -connectivity_la_LDFLAGS = $(PLUGIN_LDFLAGS) -connectivity_la_LIBADD = $(BUILD_WITH_LIBMNL_LIBS) +connectivity_la_CPPFLAGS = $(AM_CPPFLAGS) $(BUILD_WITH_LIBYAJL_CPPFLAGS) +connectivity_la_LDFLAGS = $(PLUGIN_LDFLAGS) $(BUILD_WITH_LIBYAJL_LDFLAGS) +connectivity_la_LIBADD = $(BUILD_WITH_LIBYAJL_LIBS) $(BUILD_WITH_LIBMNL_LIBS) endif if BUILD_PLUGIN_CONNTRACK diff --git a/configure.ac b/configure.ac index e5b7ebab..d6644484 100644 --- a/configure.ac +++ b/configure.ac @@ -6111,6 +6111,7 @@ plugin_battery="no" plugin_bind="no" plugin_ceph="no" plugin_cgroups="no" +plugin_connectivity="no" plugin_conntrack="no" plugin_contextswitch="no" plugin_cpu="no" @@ -6223,6 +6224,10 @@ if test "x$ac_system" = "xLinux"; then if test "x$with_libyajl" = "xyes" && test "x$with_libyajl2" = "xyes"; then plugin_ovs_events="yes" plugin_ovs_stats="yes" + + if test "x$with_libmnl" = "xyes"; then + plugin_connectivity="yes" + fi fi fi @@ -6522,7 +6527,7 @@ AC_PLUGIN([bind], [$plugin_bind], [ISC Bind nameserver AC_PLUGIN([ceph], [$plugin_ceph], [Ceph daemon statistics]) AC_PLUGIN([cgroups], [$plugin_cgroups], [CGroups CPU usage accounting]) AC_PLUGIN([chrony], [yes], [Chrony statistics]) -AC_PLUGIN([connectivity], [$with_libmnl], [Network interface up/down statistics]) +AC_PLUGIN([connectivity], [$plugin_connectivity], [Network interface up/down events]) AC_PLUGIN([conntrack], [$plugin_conntrack], [nf_conntrack statistics]) AC_PLUGIN([contextswitch], [$plugin_contextswitch], [context switch statistics]) AC_PLUGIN([cpu], [$plugin_cpu], [CPU usage statistics]) diff --git a/src/connectivity.c b/src/connectivity.c index 5a27fe1f..fea24fe0 100644 --- a/src/connectivity.c +++ b/src/connectivity.c @@ -45,8 +45,47 @@ #include #include +#include +#include +#if HAVE_YAJL_YAJL_VERSION_H +#include +#endif +#if defined(YAJL_MAJOR) && (YAJL_MAJOR > 1) +#define HAVE_YAJL_V2 1 +#endif + #define MYPROTO NETLINK_ROUTE +#define CONNECTIVITY_DOMAIN_FIELD "domain" +#define CONNECTIVITY_DOMAIN_VALUE "stateChange" +#define CONNECTIVITY_EVENT_ID_FIELD "eventId" +#define CONNECTIVITY_EVENT_NAME_FIELD "eventName" +#define CONNECTIVITY_EVENT_NAME_DOWN_VALUE "down" +#define CONNECTIVITY_EVENT_NAME_UP_VALUE "up" +#define CONNECTIVITY_LAST_EPOCH_MICROSEC_FIELD "lastEpochMicrosec" +#define CONNECTIVITY_PRIORITY_FIELD "priority" +#define CONNECTIVITY_PRIORITY_VALUE "high" +#define CONNECTIVITY_REPORTING_ENTITY_NAME_FIELD "reportingEntityName" +#define CONNECTIVITY_REPORTING_ENTITY_NAME_VALUE "collectd connectivity plugin" +#define CONNECTIVITY_SEQUENCE_FIELD "sequence" +#define CONNECTIVITY_SEQUENCE_VALUE "0" +#define CONNECTIVITY_SOURCE_NAME_FIELD "sourceName" +#define CONNECTIVITY_START_EPOCH_MICROSEC_FIELD "startEpochMicrosec" +#define CONNECTIVITY_VERSION_FIELD "version" +#define CONNECTIVITY_VERSION_VALUE "1.0" + +#define CONNECTIVITY_NEW_STATE_FIELD "newState" +#define CONNECTIVITY_NEW_STATE_FIELD_DOWN_VALUE "outOfService" +#define CONNECTIVITY_NEW_STATE_FIELD_UP_VALUE "inService" +#define CONNECTIVITY_OLD_STATE_FIELD "oldState" +#define CONNECTIVITY_OLD_STATE_FIELD_DOWN_VALUE "outOfService" +#define CONNECTIVITY_OLD_STATE_FIELD_UP_VALUE "inService" +#define CONNECTIVITY_STATE_CHANGE_FIELDS_FIELD "stateChangeFields" +#define CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_FIELD \ + "stateChangeFieldsVersion" +#define CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_VALUE "1.0" +#define CONNECTIVITY_STATE_INTERFACE_FIELD "stateInterface" + /* * Private data types */ @@ -56,8 +95,7 @@ struct interfacelist_s { uint32_t status; uint32_t prev_status; uint32_t sent; - uint32_t sec; - uint32_t usec; + long long unsigned int timestamp; struct interfacelist_s *next; }; @@ -74,6 +112,7 @@ static pthread_t connectivity_thread_id; static pthread_mutex_t connectivity_lock = PTHREAD_MUTEX_INITIALIZER; static pthread_cond_t connectivity_cond = PTHREAD_COND_INITIALIZER; static struct mnl_socket *sock; +static int event_id = 0; static const char *config_keys[] = {"Interface"}; static int config_keys_num = STATIC_ARRAY_SIZE(config_keys); @@ -82,6 +121,269 @@ static int config_keys_num = STATIC_ARRAY_SIZE(config_keys); * Private functions */ +static int gen_message_payload(int state, int old_state, const char *interface, + long long unsigned int timestamp, char **buf) { + const unsigned char *buf2; + yajl_gen g; + +#if !defined(HAVE_YAJL_V2) + yajl_gen_config conf = {}; + + conf.beautify = 0; +#endif + +#if HAVE_YAJL_V2 + size_t len; + g = yajl_gen_alloc(NULL); + yajl_gen_config(g, yajl_gen_beautify, 0); +#else + unsigned int len; + g = yajl_gen_alloc(&conf, NULL); +#endif + + yajl_gen_clear(g); + + // *** BEGIN common event header *** + + if (yajl_gen_map_open(g) != yajl_gen_status_ok) + goto err; + + // domain + if (yajl_gen_string(g, (u_char *)CONNECTIVITY_DOMAIN_FIELD, + strlen(CONNECTIVITY_DOMAIN_FIELD)) != yajl_gen_status_ok) + goto err; + + if (yajl_gen_string(g, (u_char *)CONNECTIVITY_DOMAIN_VALUE, + strlen(CONNECTIVITY_DOMAIN_VALUE)) != yajl_gen_status_ok) + goto err; + + // eventId + if (yajl_gen_string(g, (u_char *)CONNECTIVITY_EVENT_ID_FIELD, + strlen(CONNECTIVITY_EVENT_ID_FIELD)) != + yajl_gen_status_ok) + goto err; + + event_id = event_id + 1; + int event_id_len = sizeof(char) * sizeof(int) * 4 + 1; + char *event_id_str = malloc(event_id_len); + snprintf(event_id_str, event_id_len, "%d", event_id); + + if (yajl_gen_number(g, event_id_str, strlen(event_id_str)) != + yajl_gen_status_ok) { + sfree(event_id_str); + goto err; + } + + sfree(event_id_str); + + // eventName + if (yajl_gen_string(g, (u_char *)CONNECTIVITY_EVENT_NAME_FIELD, + strlen(CONNECTIVITY_EVENT_NAME_FIELD)) != + yajl_gen_status_ok) + goto err; + + int event_name_len = 0; + event_name_len = event_name_len + strlen(interface); // interface name + event_name_len = event_name_len + (state == 0 ? 4 : 2); // "down" or "up" + event_name_len = + event_name_len + 12; // "interface", 2 spaces and null-terminator + char *event_name_str = malloc(event_name_len); + memset(event_name_str, '\0', event_name_len); + snprintf(event_name_str, event_name_len, "interface %s %s", interface, + (state == 0 ? CONNECTIVITY_EVENT_NAME_DOWN_VALUE + : CONNECTIVITY_EVENT_NAME_UP_VALUE)); + + if (yajl_gen_string(g, (u_char *)event_name_str, strlen(event_name_str)) != + yajl_gen_status_ok) { + sfree(event_name_str); + goto err; + } + + sfree(event_name_str); + + // lastEpochMicrosec + if (yajl_gen_string(g, (u_char *)CONNECTIVITY_LAST_EPOCH_MICROSEC_FIELD, + strlen(CONNECTIVITY_LAST_EPOCH_MICROSEC_FIELD)) != + yajl_gen_status_ok) + goto err; + + int last_epoch_microsec_len = + sizeof(char) * sizeof(long long unsigned int) * 4 + 1; + char *last_epoch_microsec_str = malloc(last_epoch_microsec_len); + snprintf(last_epoch_microsec_str, last_epoch_microsec_len, "%llu", + (long long unsigned int)CDTIME_T_TO_US(cdtime())); + + if (yajl_gen_number(g, last_epoch_microsec_str, + strlen(last_epoch_microsec_str)) != yajl_gen_status_ok) { + sfree(last_epoch_microsec_str); + goto err; + } + + sfree(last_epoch_microsec_str); + + // priority + if (yajl_gen_string(g, (u_char *)CONNECTIVITY_PRIORITY_FIELD, + strlen(CONNECTIVITY_PRIORITY_FIELD)) != + yajl_gen_status_ok) + goto err; + + if (yajl_gen_string(g, (u_char *)CONNECTIVITY_PRIORITY_VALUE, + strlen(CONNECTIVITY_PRIORITY_VALUE)) != + yajl_gen_status_ok) + goto err; + + // reportingEntityName + if (yajl_gen_string(g, (u_char *)CONNECTIVITY_REPORTING_ENTITY_NAME_FIELD, + strlen(CONNECTIVITY_REPORTING_ENTITY_NAME_FIELD)) != + yajl_gen_status_ok) + goto err; + + if (yajl_gen_string(g, (u_char *)CONNECTIVITY_REPORTING_ENTITY_NAME_VALUE, + strlen(CONNECTIVITY_REPORTING_ENTITY_NAME_VALUE)) != + yajl_gen_status_ok) + goto err; + + // sequence + if (yajl_gen_string(g, (u_char *)CONNECTIVITY_SEQUENCE_FIELD, + strlen(CONNECTIVITY_SEQUENCE_FIELD)) != + yajl_gen_status_ok) + goto err; + + if (yajl_gen_number(g, CONNECTIVITY_SEQUENCE_VALUE, + strlen(CONNECTIVITY_SEQUENCE_VALUE)) != + yajl_gen_status_ok) + goto err; + + // sourceName + if (yajl_gen_string(g, (u_char *)CONNECTIVITY_SOURCE_NAME_FIELD, + strlen(CONNECTIVITY_SOURCE_NAME_FIELD)) != + yajl_gen_status_ok) + goto err; + + if (yajl_gen_string(g, (u_char *)interface, strlen(interface)) != + yajl_gen_status_ok) + goto err; + + // startEpochMicrosec + if (yajl_gen_string(g, (u_char *)CONNECTIVITY_START_EPOCH_MICROSEC_FIELD, + strlen(CONNECTIVITY_START_EPOCH_MICROSEC_FIELD)) != + yajl_gen_status_ok) + goto err; + + int start_epoch_microsec_len = + sizeof(char) * sizeof(long long unsigned int) * 4 + 1; + char *start_epoch_microsec_str = malloc(start_epoch_microsec_len); + snprintf(start_epoch_microsec_str, start_epoch_microsec_len, "%llu", + (long long unsigned int)timestamp); + + if (yajl_gen_number(g, start_epoch_microsec_str, + strlen(start_epoch_microsec_str)) != yajl_gen_status_ok) { + sfree(start_epoch_microsec_str); + goto err; + } + + sfree(start_epoch_microsec_str); + + // version + if (yajl_gen_string(g, (u_char *)CONNECTIVITY_VERSION_FIELD, + strlen(CONNECTIVITY_VERSION_FIELD)) != yajl_gen_status_ok) + goto err; + + if (yajl_gen_number(g, CONNECTIVITY_VERSION_VALUE, + strlen(CONNECTIVITY_VERSION_VALUE)) != yajl_gen_status_ok) + goto err; + + // *** END common event header *** + + // *** BEGIN state change fields *** + + if (yajl_gen_string(g, (u_char *)CONNECTIVITY_STATE_CHANGE_FIELDS_FIELD, + strlen(CONNECTIVITY_STATE_CHANGE_FIELDS_FIELD)) != + yajl_gen_status_ok) + goto err; + + if (yajl_gen_map_open(g) != yajl_gen_status_ok) + goto err; + + // newState + if (yajl_gen_string(g, (u_char *)CONNECTIVITY_NEW_STATE_FIELD, + strlen(CONNECTIVITY_NEW_STATE_FIELD)) != + yajl_gen_status_ok) + goto err; + + int new_state_len = + (state == 0 ? strlen(CONNECTIVITY_NEW_STATE_FIELD_DOWN_VALUE) + : strlen(CONNECTIVITY_NEW_STATE_FIELD_UP_VALUE)); + + if (yajl_gen_string( + g, (u_char *)(state == 0 ? CONNECTIVITY_NEW_STATE_FIELD_DOWN_VALUE + : CONNECTIVITY_NEW_STATE_FIELD_UP_VALUE), + new_state_len) != yajl_gen_status_ok) + goto err; + + // oldState + if (yajl_gen_string(g, (u_char *)CONNECTIVITY_OLD_STATE_FIELD, + strlen(CONNECTIVITY_OLD_STATE_FIELD)) != + yajl_gen_status_ok) + goto err; + + int old_state_len = + (old_state == 0 ? strlen(CONNECTIVITY_OLD_STATE_FIELD_DOWN_VALUE) + : strlen(CONNECTIVITY_OLD_STATE_FIELD_UP_VALUE)); + + if (yajl_gen_string( + g, (u_char *)(old_state == 0 ? CONNECTIVITY_OLD_STATE_FIELD_DOWN_VALUE + : CONNECTIVITY_OLD_STATE_FIELD_UP_VALUE), + old_state_len) != yajl_gen_status_ok) + goto err; + + // stateChangeFieldsVersion + if (yajl_gen_string(g, + (u_char *)CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_FIELD, + strlen(CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_FIELD)) != + yajl_gen_status_ok) + goto err; + + if (yajl_gen_number(g, CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_VALUE, + strlen(CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_VALUE)) != + yajl_gen_status_ok) + goto err; + + // stateInterface + if (yajl_gen_string(g, (u_char *)CONNECTIVITY_STATE_INTERFACE_FIELD, + strlen(CONNECTIVITY_STATE_INTERFACE_FIELD)) != + yajl_gen_status_ok) + goto err; + + if (yajl_gen_string(g, (u_char *)interface, strlen(interface)) != + yajl_gen_status_ok) + goto err; + + if (yajl_gen_map_close(g) != yajl_gen_status_ok) + goto err; + + // *** END state change fields *** + + if (yajl_gen_map_close(g) != yajl_gen_status_ok) + goto err; + + if (yajl_gen_get_buf(g, &buf2, &len) != yajl_gen_status_ok) + goto err; + + *buf = malloc(strlen((char *)buf2) + 1); + + sstrncpy(*buf, (char *)buf2, strlen((char *)buf2) + 1); + + yajl_gen_free(g); + + return 0; + +err: + yajl_gen_free(g); + ERROR("connectivity plugin: gen_message_payload failed to generate JSON"); + return -1; +} + static int connectivity_link_state(struct nlmsghdr *msg) { int retval = 0; struct ifinfomsg *ifi = mnl_nlmsg_get_payload(msg); @@ -112,35 +414,29 @@ static int connectivity_link_state(struct nlmsghdr *msg) { break; if (il == NULL) { - INFO("connectivity plugin: Ignoring link state change for unmonitored " - "interface: %s", - dev); + DEBUG("connectivity plugin: Ignoring link state change for unmonitored " + "interface: %s", + dev); } else { uint32_t prev_status; - struct timeval tv; - gettimeofday(&tv, NULL); - - unsigned long long millisecondsSinceEpoch = - (unsigned long long)(tv.tv_sec) * 1000 + - (unsigned long long)(tv.tv_usec) / 1000; - - INFO("connectivity plugin (%llu): Interface %s status is now %s", - millisecondsSinceEpoch, dev, - ((ifi->ifi_flags & IFF_RUNNING) ? "UP" : "DOWN")); prev_status = il->status; il->status = ((ifi->ifi_flags & IFF_RUNNING) ? 1 : 0); - il->sec = tv.tv_sec; - il->usec = tv.tv_usec; + il->timestamp = (long long unsigned int)CDTIME_T_TO_US(cdtime()); + // If the new status is different than the previous status, // store the previous status and set sent to zero if (il->status != prev_status) { il->prev_status = prev_status; il->sent = 0; } + + DEBUG("connectivity plugin (%llu): Interface %s status is now %s", + il->timestamp, dev, + ((ifi->ifi_flags & IFF_RUNNING) ? "UP" : "DOWN")); } - // no need to loop again, we found the interface name + // no need to loop again, we found the interface name attr // (otherwise the first if-statement in the loop would // have moved us on with 'continue') break; @@ -210,8 +506,7 @@ static int read_event(struct mnl_socket *nl, /* Message is some kind of error */ if (h->nlmsg_type == NLMSG_ERROR) { - ERROR("connectivity plugin: read_event: Message is an error - decode " - "TBD\n"); + ERROR("connectivity plugin: read_event: Message is an error\n"); return -1; // Error } @@ -335,7 +630,7 @@ static int stop_thread(int shutdown) /* {{{ */ // the case of a shutdown is just assures that the thread is // gone and that the process has been fully terminated. - INFO("connectivity plugin: Canceling thread for process shutdown"); + DEBUG("connectivity plugin: Canceling thread for process shutdown"); status = pthread_cancel(connectivity_thread_id); @@ -356,7 +651,7 @@ static int stop_thread(int shutdown) /* {{{ */ connectivity_thread_error = 0; pthread_mutex_unlock(&connectivity_lock); - INFO("connectivity plugin: Finished requesting stop of thread"); + DEBUG("connectivity plugin: Finished requesting stop of thread"); return (status); } /* }}} int stop_thread */ @@ -397,6 +692,7 @@ static int connectivity_config(const char *key, const char *value) /* {{{ */ il->interface = interface; il->status = 2; // "unknown" il->prev_status = 2; + il->timestamp = (long long unsigned int)CDTIME_T_TO_US(cdtime()); il->sent = 0; il->next = interfacelist_head; interfacelist_head = il; @@ -408,46 +704,54 @@ static int connectivity_config(const char *key, const char *value) /* {{{ */ return (0); } /* }}} int connectivity_config */ -static void submit(const char *interface, const char *type, /* {{{ */ - gauge_t value, uint32_t sec, uint32_t usec) { - value_list_t vl = VALUE_LIST_INIT; +static void connectivity_dispatch_notification( + const char *interface, const char *type, /* {{{ */ + gauge_t value, gauge_t old_value, long long unsigned int timestamp) { + char *buf = NULL; + notification_t n = { + NOTIF_FAILURE, cdtime(), "", "", "connectivity", "", "", "", NULL}; + + if (value == 1) + n.severity = NOTIF_OKAY; + char hostname[1024]; - vl.values = &(value_t){.gauge = value}; - vl.values_len = 1; - sstrncpy(vl.plugin, "connectivity", sizeof(vl.plugin)); - sstrncpy(vl.type_instance, interface, sizeof(vl.type_instance)); - sstrncpy(vl.type, type, sizeof(vl.type)); - - // Create metadata to store JSON key-values - meta_data_t *meta = meta_data_create(); - - vl.meta = meta; - // For latency measurement - struct timeval tv; - gettimeofday(&tv, NULL); gethostname(hostname, sizeof(hostname)); - char strSec[11]; - char struSec[11]; - snprintf(strSec, sizeof strSec, "%" PRIu32, sec); - snprintf(struSec, sizeof struSec, "%" PRIu32, usec); - if (value == 1) { - meta_data_add_string(meta, "condition", "interface_up"); - meta_data_add_string(meta, "entity", interface); - meta_data_add_string(meta, "source", hostname); - meta_data_add_string(meta, "sec", strSec); - meta_data_add_string(meta, "usec", struSec); - meta_data_add_string(meta, "dest", "interface_down"); - } else { - meta_data_add_string(meta, "condition", "interface_down"); - meta_data_add_string(meta, "entity", interface); - meta_data_add_string(meta, "source", hostname); - meta_data_add_string(meta, "sec", strSec); - meta_data_add_string(meta, "usec", struSec); - meta_data_add_string(meta, "dest", "interface_up"); + + sstrncpy(n.host, hostname, sizeof(n.host)); + sstrncpy(n.plugin_instance, interface, sizeof(n.plugin_instance)); + sstrncpy(n.type, "gauge", sizeof(n.type)); + sstrncpy(n.type_instance, "interface_status", sizeof(n.type_instance)); + + gen_message_payload(value, old_value, interface, timestamp, &buf); + + notification_meta_t *m = calloc(1, sizeof(*m)); + + if (m == NULL) { + char errbuf[1024]; + sfree(buf); + ERROR("connectivity plugin: unable to allocate metadata: %s", + sstrerror(errno, errbuf, sizeof(errbuf))); + return; } - plugin_dispatch_values(&vl); -} /* }}} void interface_submit */ + sstrncpy(m->name, "ves", sizeof(m->name)); + m->nm_value.nm_string = sstrdup(buf); + m->type = NM_TYPE_STRING; + n.meta = m; + + DEBUG("connectivity plugin: notification message: %s", + n.meta->nm_value.nm_string); + + DEBUG("connectivity plugin: dispatching state %d for interface %s", + (int)value, interface); + + plugin_dispatch_notification(&n); + plugin_notification_meta_free(n.meta); + + // malloc'd in gen_message_payload + if (buf != NULL) + sfree(buf); +} static int connectivity_read(void) /* {{{ */ { @@ -475,8 +779,6 @@ static int connectivity_read(void) /* {{{ */ uint32_t prev_status; uint32_t sent; - /* Locking here works, because the structure of the linked list is only - * changed during configure and shutdown. */ pthread_mutex_lock(&connectivity_lock); status = il->status; @@ -484,8 +786,8 @@ static int connectivity_read(void) /* {{{ */ sent = il->sent; if (status != prev_status && sent == 0) { - submit(il->interface, "gauge", status, il->sec, il->usec); - + connectivity_dispatch_notification(il->interface, "gauge", status, + prev_status, il->timestamp); il->sent = 1; } @@ -499,7 +801,7 @@ static int connectivity_shutdown(void) /* {{{ */ { interfacelist_t *il; - INFO("connectivity plugin: Shutting down thread."); + DEBUG("connectivity plugin: Shutting down thread."); if (stop_thread(1) < 0) return (-1); -- 2.11.0