From: Mytnyk, VolodymyrX Date: Wed, 31 Aug 2016 16:55:10 +0000 (+0100) Subject: OVS link: Implement OVS link plugin X-Git-Tag: collectd-5.8.0~256^2~18 X-Git-Url: https://git.octo.it/?p=collectd.git;a=commitdiff_plain;h=7f0ab40f34e685128edca1ab489bc36dbbb7bd50 OVS link: Implement OVS link plugin This plugin consists of two parts: - OVS link The implementation of the plugin itself, which uses OVS utils API to be able to monitor a link status of OVS connected interfaces and dispatch the values through collectd notification mechanism whenever the link state change occurs. - OVS utils This module implements the OVS DB communication routine specified by RFC7047. It includes: - Connecting/disconnecting to/from OVS DB (via TCP/UNIX); - Mechanism to subscribe to OVS DB table events like init/insert/modify/delete table rows; - API to send custom request and receive result; - Recovery connection mechanism with OVS DB; - Handling of ECHO request to verify the liveness of a database connection; - Helpers functions. Change-Id: Icac392bd1bd40f7dd156bfd2fc4ff08d9725a22f Signed-off-by: Mytnyk, VolodymyrX --- diff --git a/Makefile.am b/Makefile.am index f6554a1a..d4d7cc02 100644 --- a/Makefile.am +++ b/Makefile.am @@ -1278,6 +1278,14 @@ oracle_la_LIBADD = $(BUILD_WITH_ORACLE_LIBS) oracle_la_LDFLAGS = $(PLUGIN_LDFLAGS) endif +if BUILD_PLUGIN_OVS_LINK +pkglib_LTLIBRARIES += ovs_link.la +ovs_link_la_SOURCES = ovs_link.c utils_ovs.c +ovs_link_la_LDFLAGS = $(PLUGIN_LDFLAGS) $(BUILD_WITH_LIBYAJL_LDFLAGS) +ovs_link_la_CFLAGS = $(BUILD_WITH_LIBYAJL_CPPFLAGS) +ovs_link_la_LIBADD = $(BUILD_WITH_LIBYAJL_LIBS) +endif + if BUILD_PLUGIN_PERL pkglib_LTLIBRARIES += perl.la perl_la_SOURCES = src/perl.c diff --git a/README b/README index 925f3645..16640ab1 100644 --- a/README +++ b/README @@ -288,6 +288,14 @@ Features - oracle Query data from an Oracle database. + - ovs_link + The plugin monitors the link status of OVS connected interfaces and + dispatches the values through collectd notification mechanism. It + requires YAJL library to be installed. + Detailed instructions for installing and setting up Open vSwitch, see + OVS documentation. + + - perl The perl plugin implements a Perl-interpreter into collectd. You can write your own plugins in Perl and return arbitrary values using this @@ -926,7 +934,7 @@ Prerequisites * libyajl (optional) - Parse JSON data. This is needed for the `ceph', `curl_json' and + Parse JSON data. This is needed for the `ceph', `curl_json', 'ovs_link' and `log_logstash' plugins. diff --git a/configure.ac b/configure.ac index 2d7f3e2a..92431415 100644 --- a/configure.ac +++ b/configure.ac @@ -6314,6 +6314,7 @@ AC_PLUGIN([onewire], [$with_libowcapi], [OneWire sensor stat AC_PLUGIN([openldap], [$with_libldap], [OpenLDAP statistics]) AC_PLUGIN([openvpn], [yes], [OpenVPN client statistics]) AC_PLUGIN([oracle], [$with_oracle], [Oracle plugin]) +AC_PLUGIN([ovs_link], [$with_libyajl], [OVS link status plugin]) AC_PLUGIN([perl], [$plugin_perl], [Embed a Perl interpreter]) AC_PLUGIN([pf], [$have_net_pfvar_h], [BSD packet filter (PF) statistics]) # FIXME: Check for libevent, too. @@ -6738,6 +6739,7 @@ AC_MSG_RESULT([ onewire . . . . . . . $enable_onewire]) AC_MSG_RESULT([ openldap . . . . . . $enable_openldap]) AC_MSG_RESULT([ openvpn . . . . . . . $enable_openvpn]) AC_MSG_RESULT([ oracle . . . . . . . $enable_oracle]) +AC_MSG_RESULT([ ovs_link . . . . . . $enable_ovs_link]) AC_MSG_RESULT([ perl . . . . . . . . $enable_perl]) AC_MSG_RESULT([ pf . . . . . . . . . $enable_pf]) AC_MSG_RESULT([ pinba . . . . . . . . $enable_pinba]) diff --git a/src/collectd.conf.in b/src/collectd.conf.in index 62b24560..4b938252 100644 --- a/src/collectd.conf.in +++ b/src/collectd.conf.in @@ -168,6 +168,7 @@ #@BUILD_PLUGIN_OPENLDAP_TRUE@LoadPlugin openldap #@BUILD_PLUGIN_OPENVPN_TRUE@LoadPlugin openvpn #@BUILD_PLUGIN_ORACLE_TRUE@LoadPlugin oracle +#@BUILD_PLUGIN_OVS_LINK_TRUE@LoadPlugin ovs_link #@BUILD_PLUGIN_PERL_TRUE@LoadPlugin perl #@BUILD_PLUGIN_PINBA_TRUE@LoadPlugin pinba #@BUILD_PLUGIN_PING_TRUE@LoadPlugin ping @@ -991,6 +992,11 @@ # # +# +# OvsDbServerUrl "tcp:127.0.0.1:6640" +# Interfaces "br0" "veth0" +# + # # IncludeDir "/my/include/path" # BaseName "Collectd::Plugins" diff --git a/src/collectd.conf.pod b/src/collectd.conf.pod index 2f29713a..e814af5b 100644 --- a/src/collectd.conf.pod +++ b/src/collectd.conf.pod @@ -5453,6 +5453,58 @@ refer to them from. =back +=head2 Plugin C + + +The I plugin monitors the link status of OVS connected interfaces and +dispatches the values through collectd notification mechanism whenever the link +state change occurs. This plugin uses OVSDB to get a link state change +notification. + +B + + + OvsDbServerUrl "tcp:127.0.0.1:6640" + Interfaces "br0" "veth0" + + +The plugin provides the following configuration options: + +=over 4 + +=item B I + +The URL is an address of OVS DB server JSON-RPC interface used by the plugin. +To enable the interface, OVS DB daemon should be running with '--remote=ptcp:' +or '--remote=punix:' option. See L for more details. The URL +must take one of the following forms: + +=over 4 + +=item BI:I + +Connect to the given tcp I on I, where I is IPv4 address +of OVS DB server which is listening on TCP I for incoming +JSON-RPC client connection. + +=item BI + +Connect to the unix domain server socket named I which is +used by OVS DB for incoming JSON-RPC client connection. + +=back + +Default: C + +=item B [I ...] + +List of interface names to be monitored by this plugin. If this option is missed +or it's empty then all OVS connected interfaces on all bridges are monitored. + +Default: empty (all interfaces on all bridges are monitored) + +=back + =head2 Plugin C This plugin embeds a Perl-interpreter into collectd and provides an interface diff --git a/src/ovs_link.c b/src/ovs_link.c new file mode 100644 index 00000000..b578a09f --- /dev/null +++ b/src/ovs_link.c @@ -0,0 +1,358 @@ +/** + * collectd - src/ovs_link.c + * + * Copyright(c) 2016 Intel Corporation. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies + * of the Software, and to permit persons to whom the Software is furnished to do + * so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * + * Authors: + * Volodymyr Mytnyk + **/ + +#include "common.h" /* auxiliary functions */ +#include "utils_ovs.h" /* OVS helpers */ + +#define OVS_LINK_PLUGIN "ovs_link" +#define OVS_LINK_DEFAULT_OVS_DB_SERVER_URL "tcp:127.0.0.1:6640" +#define CONFIG_LOCK for (int __i = config_lock(); __i != 0 ; \ + __i = config_unlock()) + +struct interface_s { + char *name; /* interface name */ + struct interface_s *next; /* next interface name */ +}; +typedef struct interface_s interface_t; + +struct ovs_link_config_s { + pthread_mutex_t mutex; /* mutex to lock the config structure */ + char *ovs_db_server_url; /* OVS DB server URL */ + ovs_db_t *ovs_db; /* pointer to OVS DB instance */ + interface_t *ifaces; /* interface names */ +}; +typedef struct ovs_link_config_s ovs_link_config_t; + +/* + * Private variables + */ +ovs_link_config_t config = {PTHREAD_MUTEX_INITIALIZER, NULL, NULL, NULL}; + +/* This function is used only by "CONFIG_LOCK" defined above. + * It always returns 1 when the config is locked. + */ +static inline int +config_lock() +{ + pthread_mutex_lock(&config.mutex); + return (1); +} + +/* This function is used only by "CONFIG_LOCK" defined above. + * It always returns 0 when config is unlocked. + */ +static inline int +config_unlock() +{ + pthread_mutex_unlock(&config.mutex); + return (0); +} + +/* Check if given interface name exists in configuration file. It + * returns 1 if exists otherwise 0. If no interfaces are configured, + * 1 is returned + */ +static int +ovs_link_config_iface_exists(const char *ifname) +{ + int rc = 0; + CONFIG_LOCK { + if (!(rc = (config.ifaces == NULL))) { + for (interface_t *iface = config.ifaces; iface; iface = iface->next) + if (rc = (strcmp(ifname, iface->name) == 0)) + break; + } + } + return rc; +} + +/* Release memory allocated for configuration data */ +static void +ovs_link_config_free() +{ + interface_t *del_iface = NULL; + CONFIG_LOCK { + sfree(config.ovs_db_server_url); + while (config.ifaces) { + del_iface = config.ifaces; + config.ifaces = config.ifaces->next; + free(del_iface->name); + free(del_iface); + } + } +} + +/* Parse plugin configuration file and store the config + * in allocated memory. Returns negative value in case of error. + */ +static int +ovs_link_plugin_config(oconfig_item_t *ci) +{ + interface_t *new_iface; + char *if_name; + char *ovs_db_url; + + for (int i = 0; i < ci->children_num; i++) { + oconfig_item_t *child = ci->children + i; + if (strcasecmp("OvsDbServerUrl", child->key) == 0) { + if (cf_util_get_string(child, &ovs_db_url) < 0) { + ERROR(OVS_LINK_PLUGIN ": parse '%s' option failed", child->key); + goto failure; + } else + config.ovs_db_server_url = ovs_db_url; + } else if (strcasecmp("Interfaces", child->key) == 0) { + for (int j = 0; j < child->values_num; j++) { + /* check value type */ + if (child->values[j].type != OCONFIG_TYPE_STRING) { + ERROR(OVS_LINK_PLUGIN + ": given interface name is not a string [idx=%d]", j); + goto failure; + } + + /* get value */ + if ((if_name = strdup(child->values[j].value.string)) == NULL) { + ERROR(OVS_LINK_PLUGIN " strdup() copy interface name fail"); + goto failure; + } + + if ((new_iface = malloc(sizeof(*new_iface))) == NULL) { + ERROR(OVS_LINK_PLUGIN ": malloc () copy interface name fail"); + goto failure; + } else { + /* store interface name */ + new_iface->name = if_name; + new_iface->next = config.ifaces; + CONFIG_LOCK { + config.ifaces = new_iface; + } + DEBUG(OVS_LINK_PLUGIN ": found monitored interface \"%s\"", + if_name); + } + } + } else { + ERROR(OVS_LINK_PLUGIN ": option '%s' is not allowed here", child->key); + goto failure; + } + } + return (0); + +failure: + ovs_link_config_free(); + return (-1); +} + +/* Dispatch OVS interface link status event to collectd */ +static int +ovs_link_dispatch_notification(const char *link_name, const char *link_state) +{ + notification_t n = {NOTIF_FAILURE, time(NULL), "", "", OVS_LINK_PLUGIN, + "", "", "", NULL}; + + /* fill the notification data */ + if (link_state != NULL) + n.severity = ((strcmp(link_state, "up") == 0) ? + NOTIF_OKAY : NOTIF_WARNING); + else + link_state = "UNKNOWN"; + + sstrncpy(n.host, hostname_g, sizeof(n.host)); + ssnprintf(n.message, sizeof(n.message), + "link state of \"%s\" interface has been changed to \"%s\"", + link_name, link_state); + + /* send the notification */ + return plugin_dispatch_notification(&n); +} + +/* Process OVS DB update table event. It handles link status update event(s) + * and dispatches the value(s) to collectd if interface name matches one of + * interfaces specified in configuration file. + */ +static void +ovs_link_table_update_cb(yajl_val jupdates) +{ + yajl_val jnew_val = NULL; + yajl_val jupdate = NULL; + yajl_val jrow_update = NULL; + yajl_val jlink_name = NULL; + yajl_val jlink_state = NULL; + const char *link_name = NULL; + + /* JSON "Interface" table update example: + * --------------------------------- + * {"Interface": + * { + * "9adf1db2-29ca-4140-ab22-ae347a4484de": + * { + * "new": + * { + * "name":"br0", + * "link_state":"up" + * }, + * "old": + * { + * "link_state":"down" + * } + * } + * } + * } + */ + if (!YAJL_IS_OBJECT(jupdates) || !(YAJL_GET_OBJECT(jupdates)->len > 0)) { + ERROR(OVS_LINK_PLUGIN ": unexpected OVS DB update event received"); + return; + } + /* verify if this is a table event */ + jupdate = YAJL_GET_OBJECT(jupdates)->values[0]; + if (!YAJL_IS_OBJECT(jupdate)) { + ERROR(OVS_LINK_PLUGIN ": unexpected table update event received"); + return; + } + /* go through all row updates */ + for (int row_index = 0; row_index < YAJL_GET_OBJECT(jupdate)->len; + ++row_index) { + jrow_update = YAJL_GET_OBJECT(jupdate)->values[row_index]; + + /* check row update */ + jnew_val = ovs_utils_get_value_by_key(jrow_update, "new"); + if (jnew_val == NULL) { + ERROR(OVS_LINK_PLUGIN ": unexpected row update received"); + return; + } + /* get link status update */ + jlink_name = ovs_utils_get_value_by_key(jnew_val, "name"); + jlink_state = ovs_utils_get_value_by_key(jnew_val, "link_state"); + if (jlink_name && jlink_state) { + link_name = YAJL_GET_STRING(jlink_name); + if (link_name && ovs_link_config_iface_exists(link_name)) { + /* dispatch notification */ + ovs_link_dispatch_notification(link_name, + YAJL_GET_STRING(jlink_state)); + } + } + } +} + +/* Process OVS DB result table callback. It handles init link status value + * and dispatches the value(s) to collectd. The logic to handle init status + * is same as 'ovs_link_table_update_cb'. + */ +static void +ovs_link_table_result_cb(yajl_val jresult, yajl_val jerror) +{ + (void)jerror; + /* jerror is not used as it is the same all the time + (rfc7047, "Monitor" section, return value) */ + ovs_link_table_update_cb(jresult); +} + +/* Setup OVS DB table callback. It subscribes to 'Interface' tables + * to receive link status events. + */ +static void +ovs_link_initialize(ovs_db_t *pdb) +{ + int ret = 0; + const char tb_name[] = "Interface"; + const char *columns[] = {"name", "link_state", NULL}; + + /* register the update callback */ + ret = ovs_db_table_cb_register(pdb, tb_name, columns, + ovs_link_table_update_cb, + ovs_link_table_result_cb, + OVS_DB_TABLE_CB_FLAG_MODIFY | + OVS_DB_TABLE_CB_FLAG_INITIAL); + if (ret < 0) { + ERROR(OVS_LINK_PLUGIN ": register OVS DB update callback failed"); + return; + } + + DEBUG(OVS_LINK_PLUGIN ": OVS DB has been initialized"); +} + +/* Set default config values (update config) if some of them aren't + * specified in configuration file + */ +static inline int +ovs_link_config_set_default() +{ + if (!config.ovs_db_server_url) + config.ovs_db_server_url = strdup(OVS_LINK_DEFAULT_OVS_DB_SERVER_URL); + return (config.ovs_db_server_url == NULL); +} + +/* Initialize OVS plugin */ +static int +ovs_link_plugin_init(void) +{ + ovs_db_t *ovs_db = NULL; + ovs_db_callback_t cb = {.init_cb = ovs_link_initialize}; + + if (ovs_link_config_set_default()) { + ERROR(OVS_LINK_PLUGIN ": fail to make configuration"); + ovs_link_config_free(); + return (-1); + } + + /* initialize OVS DB */ + if ((ovs_db = ovs_db_init(config.ovs_db_server_url, &cb)) == NULL) { + ERROR(OVS_LINK_PLUGIN ": fail to connect to OVS DB server"); + ovs_link_config_free(); + return (-1); + } + + /* store OVSDB handler */ + CONFIG_LOCK { + config.ovs_db = ovs_db; + } + + DEBUG(OVS_LINK_PLUGIN ": plugin has been initialized"); + return (0); +} + +/* Shutdown OVS plugin */ +static int +ovs_link_plugin_shutdown(void) +{ + /* release memory allocated for config */ + ovs_link_config_free(); + + /* destroy OVS DB */ + if (ovs_db_destroy(config.ovs_db)) + ERROR(OVS_LINK_PLUGIN ": OVSDB object destroy failed"); + + DEBUG(OVS_LINK_PLUGIN ": plugin has been destroyed"); + return (0); +} + +/* Register OVS plugin callbacks */ +void +module_register(void) +{ + plugin_register_complex_config(OVS_LINK_PLUGIN, ovs_link_plugin_config); + plugin_register_init(OVS_LINK_PLUGIN, ovs_link_plugin_init); + plugin_register_shutdown(OVS_LINK_PLUGIN, ovs_link_plugin_shutdown); +} diff --git a/src/utils_ovs.c b/src/utils_ovs.c new file mode 100644 index 00000000..4c99eef2 --- /dev/null +++ b/src/utils_ovs.c @@ -0,0 +1,1284 @@ +/** + * collectd - src/utils_ovs.c + * + * Copyright(c) 2016 Intel Corporation. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies + * of the Software, and to permit persons to whom the Software is furnished to do + * so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * + * Authors: + * Volodymyr Mytnyk + * + * OVS DB API internal architecture diagram + * +------------------------------------------------------------------------------+ + * |OVS plugin |OVS utils | + * | | +------------------------+ | + * | | | echo handler | JSON request/ | + * | | +--+ (ovs_db_table_echo_cb) +<---+---------+ update event/ | + * | | | | | | | result | + * | | | +------------------------+ | | | + * | | | | +----+---+--------+ | + * | +----------+ | | +------------------------+ | | | | | + * | | update | | | | update handler | | | YAJL | JSON | | + * | | callback +<-------+(ovs_db_table_update_cp)+<---+ | parser | reader | | + * | +----------+ | | | | | | | | | + * | | | +------------------------+ | +--------+---+----+ | + * | | | | ^ | + * | +----------+ | | +------------------------+ | | | + * | | result | | | | result handler | | | | + * | | callback +<-------+ (ovs_db_result_cb) +<---+ JSON raw | | + * | +----------+ | | | | data | | + * | | | +------------------------+ | | + * | | | | | + * | | | +------------------+ +------------+----+ | + * | +----------+ | | |thread| | |thread| | | + * | | init | | | | | reconnect | | | + * | | callback +<---------+ EVENT WORKER +<------------+ POLL WORKER | | + * | +----------+ | | +------------------+ +--------+--------+ | + * | | | ^ | + * +----------------+-------------------------------------------------------------+ + * | | + * JSON|echo reply raw|data + * v v + * +-------------------+----------------------------------------------+-----------+ + * | TCP/UNIX socket | + * +------------------------------------------------------------------------------- + * + **/ + +/* collectd headers */ +#include "common.h" + +/* private headers */ +#include "utils_ovs.h" + +/* system libraries */ +#include +#include +#include +#include + +#define OVS_ERROR(fmt, ...) do { \ + ERROR("ovs_utils: "fmt, ## __VA_ARGS__); } while (0) +#define OVS_DEBUG(fmt, ...) do { \ + DEBUG("%s:%d:%s(): "fmt, __FILE__, __LINE__, __FUNCTION__, \ + ## __VA_ARGS__); } while (0) + +#define OVS_DB_POLL_TIMEOUT 1 /* poll receive timeout (sec) */ +#define OVS_DB_POLL_READ_BLOCK_SIZE 5 /* read block size (bytes) */ +#define OVS_DB_DEFAULT_DB_NAME "Open_vSwitch" +#define OVS_DB_RECONNECT_TIMEOUT 1 /* reconnect timeout (sec) */ + +#define OVS_DB_EVENT_TIMEOUT 5 /* event thread timeout (sec) */ +#define OVS_DB_EVENT_TERMINATE 1 +#define OVS_DB_EVENT_CONNECTED 2 + +#define OVS_DB_POLL_STATE_RUNNING 1 +#define OVS_DB_POLL_STATE_EXITING 2 + +#define OVS_DB_SEND_REQ_TIMEOUT 5 /* send request timeout (sec) */ + +#define OVS_YAJL_CALL(func, ...) \ + do { \ + yajl_gen_ret = yajl_gen_status_ok; \ + if ((yajl_gen_ret = func(__VA_ARGS__)) != yajl_gen_status_ok) \ + goto yajl_gen_failure; \ + } while (0) +#define OVS_YAJL_ERROR_BUFFER_SIZE 1024 +#define OVS_ERROR_BUFF_SIZE 512 +#define OVS_UID_STR_SIZE 17 /* 64-bit HEX string len + '\0' */ + +/* JSON reader internal data */ +struct ovs_json_reader_s { + char *buff_ptr; + size_t buff_size; + size_t buff_offset; + size_t json_offset; +}; +typedef struct ovs_json_reader_s ovs_json_reader_t; + +/* Result callback declaration */ +struct ovs_result_cb_s { + sem_t sync; + ovs_db_result_cb_t call; +}; +typedef struct ovs_result_cb_s ovs_result_cb_t; + +/* Table callback declaration */ +struct ovs_table_cb_s { + ovs_db_table_cb_t call; +}; +typedef struct ovs_table_cb_s ovs_table_cb_t; + +/* Callback declaration */ +struct ovs_callback_s { + uint64_t uid; + union { + ovs_result_cb_t result; + ovs_table_cb_t table; + }; + struct ovs_callback_s *next; + struct ovs_callback_s *prev; +}; +typedef struct ovs_callback_s ovs_callback_t; + +/* Connection declaration */ +struct ovs_conn_s { + int sock; + int domain; + int type; + int addr_size; + union { + struct sockaddr_in s_inet; + struct sockaddr_un s_unix; + } addr; +}; +typedef struct ovs_conn_s ovs_conn_t; + +/* Event thread data declaration */ +struct ovs_event_thread_s { + pthread_t tid; + pthread_mutex_t mutex; + pthread_cond_t cond; + int value; +}; +typedef struct ovs_event_thread_s ovs_event_thread_t; + +/* Poll thread data declaration */ +struct ovs_poll_thread_s { + pthread_t tid; + pthread_mutex_t mutex; + int state; +}; +typedef struct ovs_poll_thread_s ovs_poll_thread_t; + +/* OVS DB internal data declaration */ +struct ovs_db_s { + ovs_poll_thread_t poll_thread; + ovs_event_thread_t event_thread; + pthread_mutex_t mutex; + ovs_callback_t *cb; + ovs_conn_t conn; + ovs_db_init_cb_t init_cb; +}; +typedef struct ovs_db_s ovs_db_t; + +/* Post an event to event thread. + * Possible events are: + * OVS_DB_EVENT_TERMINATE + * OVS_DB_EVENT_CONNECTED + */ +static void +ovs_db_event_post(ovs_db_t *pdb, int event) +{ + pthread_mutex_lock(&pdb->event_thread.mutex); + pdb->event_thread.value = event; + pthread_mutex_unlock(&pdb->event_thread.mutex); + pthread_cond_signal(&pdb->event_thread.cond); +} + +/* Check if POLL thread is still running. Returns + * 1 if running otherwise 0 is returned */ +static inline int +ovs_db_poll_is_running(ovs_db_t *pdb) +{ + int state = 0; + pthread_mutex_lock(&pdb->poll_thread.mutex); + state = pdb->poll_thread.state; + pthread_mutex_unlock(&pdb->poll_thread.mutex); + return (state == OVS_DB_POLL_STATE_RUNNING); +} + +/* Terminate POLL thread */ +static inline void +ovs_db_poll_terminate(ovs_db_t *pdb) +{ + pthread_mutex_lock(&pdb->poll_thread.mutex); + pdb->poll_thread.state = OVS_DB_POLL_STATE_EXITING; + pthread_mutex_unlock(&pdb->poll_thread.mutex); +} + +/* Generate unique identifier (UID). It is used by OVS DB API + * to set "id" field for any OVS DB JSON request. */ +static uint64_t +ovs_uid_generate() +{ + struct timespec ts; + clock_gettime(CLOCK_MONOTONIC, &ts); + return ((ts.tv_sec << 32) | (ts.tv_nsec & UINT32_MAX)); +} + +/* + * Callback API. These function are used to store + * registered callbacks in OVS DB API. + */ + +/* Add new callback into OVS DB object */ +static void +ovs_db_callback_add(ovs_db_t *pdb, ovs_callback_t *new_cb) +{ + pthread_mutex_lock(&pdb->mutex); + if (pdb->cb) + pdb->cb->prev = new_cb; + new_cb->next = pdb->cb; + new_cb->prev = NULL; + pdb->cb = new_cb; + pthread_mutex_unlock(&pdb->mutex); +} + +/* Remove callback from OVS DB object */ +static void +ovs_db_callback_remove(ovs_db_t *pdb, ovs_callback_t *del_cb) +{ + ovs_callback_t *pre_cb = del_cb->prev; + ovs_callback_t *next_cb = del_cb->next; + + pthread_mutex_lock(&pdb->mutex); + if (next_cb) + next_cb->prev = del_cb->prev; + + if (pre_cb) + pre_cb->next = del_cb->next; + else + pdb->cb = del_cb->next; + + free(del_cb); + pthread_mutex_unlock(&pdb->mutex); +} + +/* Remove all callbacks form OVS DB object */ +static void +ovs_db_callback_remove_all(ovs_db_t *pdb) +{ + pthread_mutex_lock(&pdb->mutex); + for (ovs_callback_t *del_cb = pdb->cb; pdb->cb; del_cb = pdb->cb) { + pdb->cb = pdb->cb->next; + free(del_cb); + } + pdb->cb = NULL; + pthread_mutex_unlock(&pdb->mutex); +} + +/* Get/find callback in OVS DB object by UID. Returns pointer + * to requested callback otherwise NULL is returned */ +static ovs_callback_t * +ovs_db_callback_get(ovs_db_t *pdb, uint64_t uid) +{ + pthread_mutex_lock(&pdb->mutex); + for (ovs_callback_t *cb = pdb->cb; cb != NULL; cb = cb->next) + if (cb->uid == uid) { + pthread_mutex_unlock(&pdb->mutex); + return cb; + } + pthread_mutex_unlock(&pdb->mutex); + return NULL; +} + +/* Send all requested data to the socket. Returns 0 if + * ALL request data has been sent otherwise negative value + * is returned */ +static int +ovs_db_data_send(const ovs_db_t *pdb, const char *data, size_t len) +{ + ssize_t nbytes = 0; + size_t rem = len; + size_t off = 0; + + while (rem > 0) { + if ((nbytes = send(pdb->conn.sock, data + off, rem, 0)) <= 0) + return (-1); + rem -= (size_t)nbytes; + off += (size_t)nbytes; + } + return (0); +} + +/* Parse OVS server URL. + * Format of the URL: + * "tcp:a.b.c.d:port" - define TCP connection (INET domain) + * "unix:file" - define UNIX socket file (UNIX domain) + */ +static int +ovs_db_url_parse(const char *surl, ovs_conn_t *conn) +{ + ovs_conn_t tmp_conn; + char *nexttok = NULL; + char *in_str = NULL; + char *saveptr; + int ret = 0; + + /* sanity check */ + if ((surl == NULL) || (strlen(surl) < 1)) + return (-1); + + /* parse domain */ + tmp_conn = *conn; + in_str = sstrdup(surl); + if ((nexttok = strtok_r(in_str, ":", &saveptr)) != NULL) { + if (strcmp("tcp", nexttok) == 0) { + tmp_conn.domain = AF_INET; + tmp_conn.type = SOCK_STREAM; + tmp_conn.addr_size = sizeof(tmp_conn.addr.s_inet); + } else if (strcmp("unix", nexttok) == 0) { + tmp_conn.domain = AF_UNIX; + tmp_conn.type = SOCK_STREAM; + tmp_conn.addr_size = sizeof(tmp_conn.addr.s_unix); + } else + goto failure; + } else + goto failure; + + /* parse url depending on domain */ + if ((nexttok = strtok_r(NULL, ":", &saveptr)) != NULL) { + if (tmp_conn.domain == AF_UNIX) { + /* */ + tmp_conn.addr.s_inet.sin_family = AF_UNIX; + sstrncpy(tmp_conn.addr.s_unix.sun_path, nexttok, strlen(nexttok) + 1); + } else { + /* */ + tmp_conn.addr.s_inet.sin_family = AF_INET; + ret = + inet_pton(AF_INET, nexttok, (void *)&tmp_conn.addr.s_inet.sin_addr); + if (ret == 1) { + if ((nexttok = strtok_r(NULL, ":", &saveptr)) != NULL) + tmp_conn.addr.s_inet.sin_port = htons(atoi(nexttok)); + else + goto failure; + } else + goto failure; + } + } + + /* save result and return success */ + *conn = tmp_conn; + sfree(in_str); + return (0); + +failure: + OVS_ERROR("%s() : invalid OVS DB URL provided"); + sfree(in_str); + return (-1); +} + +/* + * YAJL (Yet Another JSON Library) helper functions + * Documentation (https://lloyd.github.io/yajl/) + */ + +/* Add null-terminated string into YAJL generator handle (JSON object). + * Similar function to yajl_gen_string() but takes null-terminated string + * instead of string and its length. + * + * jgen - YAJL generator handle allocated by yajl_gen_alloc() + * string - Null-terminated string + */ +static inline yajl_gen_status +ovs_yajl_gen_tstring(yajl_gen hander, const char *string) +{ + return yajl_gen_string(hander, string, strlen(string)); +} + +/* Add YAJL value into YAJL generator handle (JSON object) + * + * jgen - YAJL generator handle allocated by yajl_gen_alloc() + * jval - YAJL value usually returned by yajl_tree_get() + */ +static yajl_gen_status +ovs_yajl_gen_val(yajl_gen jgen, yajl_val jval) +{ + size_t array_len = 0; + yajl_val *jvalues = NULL; + yajl_val jobj_value = NULL; + const char *obj_key = NULL; + size_t obj_len = 0; + yajl_gen_status yajl_gen_ret; + + if (YAJL_IS_STRING(jval)) + OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, YAJL_GET_STRING(jval)); + else if (YAJL_IS_DOUBLE(jval)) + OVS_YAJL_CALL(yajl_gen_double, jgen, YAJL_GET_DOUBLE(jval)); + else if (YAJL_IS_INTEGER(jval)) + OVS_YAJL_CALL(yajl_gen_double, jgen, YAJL_GET_INTEGER(jval)); + else if (YAJL_IS_TRUE(jval)) + OVS_YAJL_CALL(yajl_gen_bool, jgen, 1); + else if (YAJL_IS_FALSE(jval)) + OVS_YAJL_CALL(yajl_gen_bool, jgen, 0); + else if (YAJL_IS_NULL(jval)) + OVS_YAJL_CALL(yajl_gen_null, jgen); + else if (YAJL_IS_ARRAY(jval)) { + /* create new array and add all elements into the array */ + array_len = YAJL_GET_ARRAY(jval)->len; + jvalues = YAJL_GET_ARRAY(jval)->values; + OVS_YAJL_CALL(yajl_gen_array_open, jgen); + for (int i = 0; i < array_len; i++) + OVS_YAJL_CALL(ovs_yajl_gen_val, jgen, jvalues[i]); + OVS_YAJL_CALL(yajl_gen_array_close, jgen); + } else if (YAJL_IS_OBJECT(jval)) { + /* create new object and add all elements into the object */ + OVS_YAJL_CALL(yajl_gen_map_open, jgen); + obj_len = YAJL_GET_OBJECT(jval)->len; + for (int i = 0; i < obj_len; i++) { + obj_key = YAJL_GET_OBJECT(jval)->keys[i]; + jobj_value = YAJL_GET_OBJECT(jval)->values[i]; + OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, obj_key); + OVS_YAJL_CALL(ovs_yajl_gen_val, jgen, jobj_value); + } + OVS_YAJL_CALL(yajl_gen_map_close, jgen); + } else { + OVS_ERROR("%s() unsupported value type %d (skip)", __FUNCTION__, + (int)(jval)->type); + goto yajl_gen_failure; + } + return yajl_gen_status_ok; + +yajl_gen_failure: + OVS_ERROR("%s() error to generate value", __FUNCTION__); + return yajl_gen_ret; +} + +/* OVS DB echo request handler. When OVS DB sends + * "echo" request to the client, client should generate + * "echo" replay with the same content received in the + * request */ +static int +ovs_db_table_echo_cb(const ovs_db_t *pdb, yajl_val jnode) +{ + yajl_val jparams; + yajl_val jid; + yajl_gen jgen; + size_t resp_len = 0; + const char *resp = NULL; + const char *params_path[] = {"params", NULL}; + const char *id_path[] = {"id", NULL}; + yajl_gen_status yajl_gen_ret; + + if ((jgen = yajl_gen_alloc(NULL)) == NULL) + return (-1); + + /* check & get request attributes */ + if ((jparams = yajl_tree_get(jnode, params_path, yajl_t_array)) == NULL || + ((jid = yajl_tree_get(jnode, id_path, yajl_t_any)) == NULL)) { + OVS_ERROR("parse echo request failed"); + goto yajl_gen_failure; + } + + /* generate JSON echo response */ + OVS_YAJL_CALL(yajl_gen_map_open, jgen); + + OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "result"); + OVS_YAJL_CALL(ovs_yajl_gen_val, jgen, jparams); + + OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "error"); + OVS_YAJL_CALL(yajl_gen_null, jgen); + + OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "id"); + OVS_YAJL_CALL(ovs_yajl_gen_val, jgen, jid); + + OVS_YAJL_CALL(yajl_gen_map_close, jgen); + OVS_YAJL_CALL(yajl_gen_get_buf, jgen, (const unsigned char **)&resp, + &resp_len); + + /* send the response */ + OVS_DEBUG("response: %s", resp); + if (ovs_db_data_send(pdb, resp, resp_len) < 0) { + OVS_ERROR("send echo reply failed"); + goto yajl_gen_failure; + } + /* clean up and return success */ + yajl_gen_clear(jgen); + return (0); + +yajl_gen_failure: + /* release memory */ + yajl_gen_clear(jgen); + return (-1); +} + +/* Get OVS DB registered callback by YAJL val. The YAJL + * value should be YAJL string (UID). Returns NULL if + * callback hasn't been found. + */ +static ovs_callback_t * +ovs_db_table_callback_get(ovs_db_t *pdb, yajl_val jid) +{ + char *endptr = NULL; + const char *suid = NULL; + uint64_t uid; + + if (jid && YAJL_IS_STRING(jid)) { + suid = YAJL_GET_STRING(jid); + uid = (uint64_t) strtoul(suid, &endptr, 16); + if (*endptr == '\0' && uid) + return ovs_db_callback_get(pdb, uid); + } + + return NULL; +} + +/* OVS DB table update event handler. + * This callback is called by POLL thread if OVS DB + * table update callback is received from the DB + * server. Once registered callback found, it's called + * by this handler. */ +static int +ovs_db_table_update_cb(ovs_db_t *pdb, yajl_val jnode) +{ + ovs_callback_t *cb = NULL; + yajl_val jvalue; + yajl_val jparams; + yajl_val jtable_updates; + yajl_val jtable_update; + size_t obj_len = 0; + const char *table_name = NULL; + const char *params_path[] = {"params", NULL}; + const char *id_path[] = {"id", NULL}; + + /* check & get request attributes */ + if ((jparams = yajl_tree_get(jnode, params_path, yajl_t_array)) == NULL || + (yajl_tree_get(jnode, id_path, yajl_t_null) == NULL)) + goto ovs_failure; + + /* check array length: [, ] */ + if (YAJL_GET_ARRAY(jparams)->len != 2) + goto ovs_failure; + + jvalue = YAJL_GET_ARRAY(jparams)->values[0]; + jtable_updates = YAJL_GET_ARRAY(jparams)->values[1]; + if ((!YAJL_IS_OBJECT(jtable_updates)) || (!YAJL_IS_STRING(jvalue))) + goto ovs_failure; + + /* find registered callback based on */ + cb = ovs_db_table_callback_get(pdb, jvalue); + if (cb == NULL || cb->table.call == NULL) + goto ovs_failure; + + /* call registered callback */ + cb->table.call(jtable_updates); + return 0; + +ovs_failure: + OVS_ERROR("invalid OVS DB table update event"); + return (-1); +} + +/* OVS DB result request handler. + * This callback is called by POLL thread if OVS DB + * result reply is received from the DB server. + * Once registered callback found, it's called + * by this handler. */ +static int +ovs_db_result_cb(ovs_db_t *pdb, yajl_val jnode) +{ + ovs_callback_t *cb = NULL; + yajl_val jresult; + yajl_val jerror; + yajl_val jid; + const char *result_path[] = {"result", NULL}; + const char *error_path[] = {"error", NULL}; + const char *id_path[] = {"id", NULL}; + + jresult = yajl_tree_get(jnode, result_path, yajl_t_any); + jerror = yajl_tree_get(jnode, error_path, yajl_t_any); + jid = yajl_tree_get(jnode, id_path, yajl_t_string); + + /* check & get result attributes */ + if (!jresult || !jerror || !jid) + return (-1); + + /* try to find registered callback */ + cb = ovs_db_table_callback_get(pdb, jid); + if (cb != NULL && cb->result.call != NULL) { + /* call registered callback */ + cb->result.call(jresult, jerror); + /* unlock owner of the reply */ + sem_post(&cb->result.sync); + } + + return (0); +} + +/* Handle JSON data (one request) and call + * appropriate event OVS DB handler. Currently, + * update callback 'ovs_db_table_update_cb' and + * result callback 'ovs_db_result_cb' is supported. + */ +static int +ovs_db_json_data_process(ovs_db_t *pdb, const char *data, size_t len) +{ + const char *method = NULL; + char yajl_errbuf[OVS_YAJL_ERROR_BUFFER_SIZE]; + const char *method_path[] = {"method", NULL}; + const char *result_path[] = {"result", NULL}; + char *sjson = NULL; + yajl_val jnode, jval; + + /* duplicate the data to make null-terminated string + * required for yajl_tree_parse() */ + if ((sjson = strndup(data, len)) == NULL) + return (-1); + + OVS_DEBUG("%s", sjson); + + /* parse json data */ + jnode = yajl_tree_parse(sjson, yajl_errbuf, sizeof(yajl_errbuf)); + if (jnode == NULL) { + OVS_ERROR("yajl_tree_parse() %s", yajl_errbuf); + return (-1); + } + + /* get method name */ + if (jval = yajl_tree_get(jnode, method_path, yajl_t_string)) { + method = YAJL_GET_STRING(jval); + if (strcmp("echo", method) == 0) { + /* echo request from the server */ + if (ovs_db_table_echo_cb(pdb, jnode) < 0) + OVS_ERROR("handle echo request failed"); + } else if (strcmp("update", method) == 0) { + /* update notification */ + if (ovs_db_table_update_cb(pdb, jnode) < 0) + OVS_ERROR("handle update notification failed"); + } + } else if (jval = yajl_tree_get(jnode, result_path, yajl_t_object)) { + /* result notification */ + if (ovs_db_result_cb(pdb, jnode) < 0) + OVS_ERROR("handle result reply failed"); + } + + /* release memory */ + yajl_tree_free(jnode); + sfree(sjson); + return (0); +} + +/* + * JSON reader implementation. + * + * This module process raw JSON data (byte stream) and + * returns fully-fledged JSON data which can be processed + * (parsed) by YAJL later. + */ + +/* Allocate JSON reader instance */ +static inline ovs_json_reader_t * +ovs_json_reader_alloc() +{ + ovs_json_reader_t *jreader = NULL; + + if ((jreader = calloc(sizeof(ovs_json_reader_t), 1)) == NULL) + return NULL; + + return jreader; +} + +/* Push raw data into into the JSON reader for processing */ +static inline int +ovs_json_reader_push_data(ovs_json_reader_t *jreader, + const char *data, size_t data_len) +{ + char *new_buff = NULL; + size_t available = jreader->buff_size - jreader->buff_offset; + + /* check/update required memory space */ + if (available < data_len) { + OVS_DEBUG("Reallocate buffer [size=%d, available=%d required=%d]", + (int)jreader->buff_size, (int)available, (int)data_len); + + /* allocate new chunk of memory */ + new_buff = realloc(jreader->buff_ptr, (jreader->buff_size + data_len)); + if (new_buff == NULL) + return (-1); + + /* point to new allocated memory */ + jreader->buff_ptr = new_buff; + jreader->buff_size += data_len; + } + + /* store input data */ + memcpy(jreader->buff_ptr + jreader->buff_offset, data, data_len); + jreader->buff_offset += data_len; + return (0); +} + +/* Pop one fully-fledged JSON if already exists. Returns 0 if + * completed JSON already exists otherwise negative value is + * returned */ +static inline int +ovs_json_reader_pop(ovs_json_reader_t *jreader, + const char **json_ptr, size_t *json_len_ptr) +{ + size_t nbraces = 0; + size_t json_len = 0; + char *json = NULL; + + /* search open/close brace */ + for (int i = jreader->json_offset; i < jreader->buff_offset; i++) { + if (jreader->buff_ptr[i] == '{') { + nbraces++; + } else if (jreader->buff_ptr[i] == '}') + if (nbraces) + if (!(--nbraces)) { + /* JSON data */ + *json_ptr = jreader->buff_ptr + jreader->json_offset; + *json_len_ptr = json_len + 1; + jreader->json_offset = i + 1; + return (0); + } + + /* increase JSON data length */ + if (nbraces) + json_len++; + } + + if (jreader->json_offset) { + if (jreader->json_offset < jreader->buff_offset) { + /* shift data to the beginning of the buffer + * and zero rest of the buffer data */ + json = &jreader->buff_ptr[jreader->json_offset]; + json_len = jreader->buff_offset - jreader->json_offset; + for (int i = 0; i < jreader->buff_size; i++) + jreader->buff_ptr[i] = ((i < json_len) ? (json[i]) : (0)); + jreader->buff_offset = json_len; + } else + /* reset the buffer */ + jreader->buff_offset = 0; + + /* data is at the beginning of the buffer */ + jreader->json_offset = 0; + } + + return (-1); +} + +/* Reset JSON reader. It is useful when start processing + * new raw data. E.g.: in case of lost stream connection. + */ +static inline void +ovs_json_reader_reset(ovs_json_reader_t *jreader) +{ + if (jreader) { + jreader->buff_offset = 0; + jreader->json_offset = 0; + } +} + +/* Release internal data allocated for JSON reader */ +static inline void +ovs_json_reader_free(ovs_json_reader_t *jreader) +{ + if (jreader) { + free(jreader->buff_ptr); + free(jreader); + } +} + +/* Reconnect to OVD DB and call init OVS DB callback + * 'init_cb' if connection has been established. + */ +static int +ovs_db_reconnect(ovs_db_t *pdb) +{ + char errbuff[OVS_ERROR_BUFF_SIZE]; + + /* remove all registered OVS DB table/result callbacks */ + ovs_db_callback_remove_all(pdb); + + /* open new socket */ + if ((pdb->conn.sock = socket(pdb->conn.domain, pdb->conn.type, 0)) < 0) { + sstrerror(errno, errbuff, sizeof(errbuff)); + OVS_ERROR("socket(): %s", errbuff); + return (-1); + } + + /* try to connect to server */ + if (connect(pdb->conn.sock, (struct sockaddr *)&pdb->conn.addr, + pdb->conn.addr_size) < 0) { + sstrerror(errno, errbuff, sizeof(errbuff)); + OVS_ERROR("connect(): %s", errbuff); + close(pdb->conn.sock); + return (-1); + } + + /* send notification to event thread */ + ovs_db_event_post(pdb, OVS_DB_EVENT_CONNECTED); + return (0); +} + +/* POLL worker thread. + * It listens on OVS DB connection for incoming + * requests/reply/events etc. Also, it reconnects to OVS DB + * if connection has been lost. + */ +static void * +ovs_poll_worker(void *arg) +{ + ovs_db_t *pdb = (ovs_db_t *)arg; /* pointer to OVS DB */ + ovs_json_reader_t *jreader = NULL; + const char *json; + size_t json_len; + ssize_t nbytes = 0; + char buff[OVS_DB_POLL_READ_BLOCK_SIZE]; + struct pollfd poll_fd; + int poll_ret = 0; + + if ((jreader = ovs_json_reader_alloc()) == NULL) { + OVS_ERROR("initialize json reader failed"); + goto thread_exit; + } + + /* start polling data */ + poll_fd.fd = pdb->conn.sock; + poll_fd.events = POLLIN | POLLPRI; + poll_fd.revents = 0; + + /* poll data */ + while (ovs_db_poll_is_running(pdb)) { + poll_ret = poll(&poll_fd, 1, /* ms */ OVS_DB_POLL_TIMEOUT * 1000); + if (poll_ret > 0) { + if (poll_fd.revents & POLLNVAL) { + /* invalid file descriptor, reconnect */ + if (ovs_db_reconnect(pdb) != 0) { + /* sleep awhile until next reconnect */ + usleep(OVS_DB_RECONNECT_TIMEOUT * 1000000); + } + ovs_json_reader_reset(jreader); + poll_fd.fd = pdb->conn.sock; + } else if ((poll_fd.revents & POLLERR) || (poll_fd.revents & POLLHUP)) { + /* connection is broken */ + OVS_ERROR("poll() peer closed its end of the channel"); + close(poll_fd.fd); + } else if ((poll_fd.revents & POLLIN) || (poll_fd.revents & POLLPRI)) { + /* read incoming data */ + nbytes = recv(poll_fd.fd, buff, OVS_DB_POLL_READ_BLOCK_SIZE, 0); + if (nbytes > 0) { + OVS_DEBUG("recv(): received %d bytes of data", (int)nbytes); + ovs_json_reader_push_data(jreader, buff, nbytes); + while (!ovs_json_reader_pop(jreader, &json, &json_len)) + /* process JSON data */ + ovs_db_json_data_process(pdb, json, json_len); + } else if (nbytes == 0) { + OVS_ERROR("recv() peer has performed an orderly shutdown"); + close(poll_fd.fd); + } else { + OVS_ERROR("recv() receive data error"); + break; + } + } /* poll() POLLIN & POLLPRI */ + } else if (poll_ret == 0) + OVS_DEBUG("poll() timeout"); + else { + OVS_ERROR("poll() error"); + break; + } + } + +thread_exit: + OVS_DEBUG("poll thread has been completed"); + ovs_json_reader_free(jreader); + pthread_exit((void *)0); + return ((void *)0); +} + +/* EVENT worker thread. + * Perform task based on incoming events. This + * task can be done asynchronously which allows to + * handle OVD DB callback like 'init_cb'. + */ +static void * +ovs_event_worker(void *arg) +{ + int ret = 0; + ovs_db_t *pdb = (ovs_db_t *)arg; + struct timespec ts; + + while (pdb->event_thread.value != OVS_DB_EVENT_TERMINATE) { + /* wait for an event */ + clock_gettime(CLOCK_REALTIME, &ts); + ts.tv_sec += (OVS_DB_EVENT_TIMEOUT); + ret = pthread_cond_timedwait(&pdb->event_thread.cond, + &pdb->event_thread.mutex, &ts); + if (!ret) { + /* handle the event */ + OVS_DEBUG("handle event %d", pdb->event_thread.value); + if (pdb->event_thread.value == OVS_DB_EVENT_CONNECTED) + if (pdb->init_cb) + pdb->init_cb(pdb); + } else if (ret == ETIMEDOUT) { + /* wait timeout */ + OVS_DEBUG("no event received (timeout)"); + continue; + } else { + /* unexpected error */ + OVS_ERROR("pthread_cond_timedwait() failed"); + break; + } + } + +thread_exit: + OVS_DEBUG("event thread has been completed"); + pthread_exit((void *)0); + return ((void *)0); +} + +/* Stop EVENT thread */ +static int +ovs_db_event_thread_stop(ovs_db_t *pdb) +{ + ovs_db_event_post(pdb, OVS_DB_EVENT_TERMINATE); + if (pthread_join(pdb->event_thread.tid, NULL) != 0) + return (-1); + pthread_mutex_unlock(&pdb->event_thread.mutex); + pthread_mutex_destroy(&pdb->event_thread.mutex); + return (0); +} + +/* Stop POLL thread */ +static int +ovs_db_poll_thread_stop(ovs_db_t *pdb) +{ + ovs_db_poll_terminate(pdb); + if (pthread_join(pdb->poll_thread.tid, NULL) != 0) + return (-1); + pthread_mutex_destroy(&pdb->poll_thread.mutex); + return (0); +} + +/* + * Public OVS DB API implementation + */ + +ovs_db_t * +ovs_db_init(const char *surl, ovs_db_callback_t *cb) +{ + pthread_mutexattr_t mutex_attr; + ovs_db_t *pdb = NULL; + + /* allocate db data & fill it */ + if ((pdb = calloc(1, sizeof(*pdb))) == NULL) + return (NULL); + + /* convert string url to socket addr */ + if (ovs_db_url_parse(surl, &pdb->conn) < 0) + goto failure; + + /* setup OVS DB callbacks */ + if (cb) + pdb->init_cb = cb->init_cb; + + /* prepare event thread */ + pthread_cond_init(&pdb->event_thread.cond, NULL); + pthread_mutex_init(&pdb->event_thread.mutex, NULL); + pthread_mutex_lock(&pdb->event_thread.mutex); + if (plugin_thread_create(&pdb->event_thread.tid, NULL, + ovs_event_worker, pdb) != 0) { + OVS_ERROR("event worker start failed"); + goto failure; + } + + /* prepare polling thread */ + ovs_db_reconnect(pdb); + pdb->poll_thread.state = OVS_DB_POLL_STATE_RUNNING; + pthread_mutex_init(&pdb->poll_thread.mutex, NULL); + if (plugin_thread_create(&pdb->poll_thread.tid, NULL, + ovs_poll_worker, pdb) != 0) { + OVS_ERROR("pull worker start failed"); + goto failure; + } + + /* init OVS DB mutex */ + if (pthread_mutexattr_init(&mutex_attr) || + pthread_mutexattr_settype(&mutex_attr, PTHREAD_MUTEX_RECURSIVE) || + pthread_mutex_init(&pdb->mutex, &mutex_attr)) { + OVS_ERROR("OVS DB mutex init failed"); + goto failure; + } + + /* return db to the caller */ + return pdb; + +failure: + if (pdb->conn.sock) + /* close connection */ + close(pdb->conn.sock); + if (pdb->event_thread.tid != 0) + /* stop event thread */ + if (ovs_db_event_thread_stop(pdb) < 0) + OVS_ERROR("stop event thread failed"); + if (pdb->poll_thread.tid != 0) + /* stop poll thread */ + if (ovs_db_poll_thread_stop(pdb) < 0) + OVS_ERROR("stop poll thread failed"); + sfree(pdb); + return NULL; +} + +int +ovs_db_send_request(ovs_db_t *pdb, const char *method, + const char *params, ovs_db_result_cb_t cb) +{ + int ret = 0; + yajl_gen_status yajl_gen_ret; + yajl_val jparams; + yajl_gen jgen; + ovs_callback_t *new_cb = NULL; + uint64_t uid; + char uid_buff[OVS_UID_STR_SIZE]; + const char *req = NULL; + size_t req_len = 0; + struct timespec ts; + + /* sanity check */ + if (!pdb || !method || !params) + return (-1); + + if ((jgen = yajl_gen_alloc(NULL)) == NULL) + return (-1); + + /* try to parse params */ + if ((jparams = yajl_tree_parse(params, NULL, 0)) == NULL) { + OVS_ERROR("params is not a JSON string"); + yajl_gen_clear(jgen); + return (-1); + } + + /* generate method field */ + OVS_YAJL_CALL(yajl_gen_map_open, jgen); + + OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "method"); + OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, method); + + /* generate params field */ + OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "params"); + OVS_YAJL_CALL(ovs_yajl_gen_val, jgen, jparams); + yajl_tree_free(jparams); + + /* generate id field */ + OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "id"); + uid = ovs_uid_generate(); + ssnprintf(uid_buff, sizeof(uid_buff), "%" PRIX64, uid); + OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, uid_buff); + + OVS_YAJL_CALL(yajl_gen_map_close, jgen); + + if (cb) { + /* register result callback */ + if ((new_cb = malloc(sizeof(ovs_callback_t))) == NULL) + goto yajl_gen_failure; + + /* add new callback to front */ + sem_init(&new_cb->result.sync, 0, 0); + new_cb->result.call = cb; + new_cb->uid = uid; + ovs_db_callback_add(pdb, new_cb); + } + + /* send the request */ + OVS_YAJL_CALL(yajl_gen_get_buf, jgen, (const unsigned char **)&req, + &req_len); + OVS_DEBUG("%s", req); + if (!ovs_db_data_send(pdb, req, req_len)) { + if (cb) { + /* wait for result */ + clock_gettime(CLOCK_REALTIME, &ts); + ts.tv_sec += OVS_DB_SEND_REQ_TIMEOUT; + if (sem_timedwait(&new_cb->result.sync, &ts) < 0) { + OVS_ERROR("%s() no replay received within %d sec", __FUNCTION__, + OVS_DB_SEND_REQ_TIMEOUT); + ret = (-1); + } + } + } else { + OVS_ERROR("ovs_db_data_send() failed"); + ret = (-1); + } + +yajl_gen_failure: + if (new_cb) { + /* destroy callback */ + sem_destroy(&new_cb->result.sync); + ovs_db_callback_remove(pdb, new_cb); + } + + /* release memory */ + yajl_gen_clear(jgen); + return (yajl_gen_ret != yajl_gen_status_ok) ? (-1) : ret; +} + +int +ovs_db_table_cb_register(ovs_db_t *pdb, const char *tb_name, + const char **tb_column, ovs_db_table_cb_t update_cb, + ovs_db_result_cb_t result_cb, unsigned int flags) +{ + yajl_gen jgen; + yajl_gen_status yajl_gen_ret; + ovs_callback_t *new_cb = NULL; + char uid_str[OVS_UID_STR_SIZE]; + char *params; + size_t params_len; + int ovs_db_ret = 0; + + /* sanity check */ + if (pdb == NULL || tb_name == NULL || update_cb == NULL) + return (-1); + + if ((jgen = yajl_gen_alloc(NULL)) == NULL) + return (-1); + + /* register table update callback */ + if ((new_cb = malloc(sizeof(ovs_callback_t))) == NULL) + return (-1); + + /* add new callback to front */ + new_cb->table.call = update_cb; + new_cb->uid = ovs_uid_generate(); + ovs_db_callback_add(pdb, new_cb); + + /* make update notification request + * [, , ] */ + OVS_YAJL_CALL(yajl_gen_array_open, jgen); + { + OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, OVS_DB_DEFAULT_DB_NAME); + + /* uid string */ + ssnprintf(uid_str, sizeof(uid_str), "%" PRIX64, new_cb->uid); + OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, uid_str); + + /* */ + OVS_YAJL_CALL(yajl_gen_map_open, jgen); + { + OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, tb_name); + OVS_YAJL_CALL(yajl_gen_array_open, jgen); + { + /* */ + OVS_YAJL_CALL(yajl_gen_map_open, jgen); + { + if (tb_column) { + /* columns within the table to be monitored */ + OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "columns"); + OVS_YAJL_CALL(yajl_gen_array_open, jgen); + for (; *tb_column; tb_column++) + OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, *tb_column); + OVS_YAJL_CALL(yajl_gen_array_close, jgen); + } + /* specify select option */ + OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "select"); + { + OVS_YAJL_CALL(yajl_gen_map_open, jgen); + { + OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "initial"); + OVS_YAJL_CALL(yajl_gen_bool, jgen, + flags & OVS_DB_TABLE_CB_FLAG_INITIAL); + OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "insert"); + OVS_YAJL_CALL(yajl_gen_bool, jgen, + flags & OVS_DB_TABLE_CB_FLAG_INSERT); + OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "delete"); + OVS_YAJL_CALL(yajl_gen_bool, jgen, + flags & OVS_DB_TABLE_CB_FLAG_DELETE); + OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "modify"); + OVS_YAJL_CALL(yajl_gen_bool, jgen, + flags & OVS_DB_TABLE_CB_FLAG_MODIFY); + } + OVS_YAJL_CALL(yajl_gen_map_close, jgen); + } + } + OVS_YAJL_CALL(yajl_gen_map_close, jgen); + } + OVS_YAJL_CALL(yajl_gen_array_close, jgen); + } + OVS_YAJL_CALL(yajl_gen_map_close, jgen); + } + OVS_YAJL_CALL(yajl_gen_array_close, jgen); + + /* make a request to subscribe to given table */ + OVS_YAJL_CALL(yajl_gen_get_buf, jgen, (const unsigned char **)¶ms, + ¶ms_len); + if (ovs_db_send_request(pdb, "monitor", params, result_cb) < 0) { + OVS_ERROR("Failed to subscribe to \"%s\" table", tb_name); + ovs_db_ret = (-1); + } + +yajl_gen_failure: + /* release memory */ + yajl_gen_clear(jgen); + return ovs_db_ret; +} + +int +ovs_db_destroy(ovs_db_t *pdb) +{ + int ovs_db_ret = 0; + int ret = 0; + + /* sanity check */ + if (pdb == NULL) + return (-1); + + /* try to lock the structure before releasing */ + if (ret = pthread_mutex_lock(&pdb->mutex)) { + OVS_ERROR("pthread_mutex_lock() DB mutext lock failed (%d)", ret); + return (-1); + } + + /* stop poll thread */ + if (ovs_db_event_thread_stop(pdb) < 0) { + OVS_ERROR("stop poll thread failed"); + ovs_db_ret = (-1); + } + + /* stop event thread */ + if (ovs_db_poll_thread_stop(pdb) < 0) { + OVS_ERROR("stop event thread failed"); + ovs_db_ret = (-1); + } + + /* unsubscribe callbacks */ + ovs_db_callback_remove_all(pdb); + + /* close connection */ + if (pdb->conn.sock) + close(pdb->conn.sock); + + /* release DB handler */ + pthread_mutex_unlock(&pdb->mutex); + pthread_mutex_destroy(&pdb->mutex); + sfree(pdb); + return ovs_db_ret; +} + +/* + * Public OVS utils API implementation + */ + +/* Get YAJL value by key from YAJL dictionary */ +yajl_val +ovs_utils_get_value_by_key(yajl_val jval, const char *key) +{ + const char *obj_key = NULL; + + /* check params */ + if (!YAJL_IS_OBJECT(jval) || !key) + return NULL; + + /* find a value by key */ + for (int i = 0; i < YAJL_GET_OBJECT(jval)->len; i++) { + obj_key = YAJL_GET_OBJECT(jval)->keys[i]; + if (strcmp(obj_key, key) == 0) + return YAJL_GET_OBJECT(jval)->values[i]; + } + + return NULL; +} diff --git a/src/utils_ovs.h b/src/utils_ovs.h new file mode 100644 index 00000000..484260f6 --- /dev/null +++ b/src/utils_ovs.h @@ -0,0 +1,202 @@ +/** + * collectd - src/utils_ovs.h + * + * Copyright(c) 2016 Intel Corporation. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies + * of the Software, and to permit persons to whom the Software is furnished to do + * so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * + * Authors: + * Volodymyr Mytnyk + * + * Description: + * The OVS util module provides the following features: + * - Implements the OVS DB communication transport specified + * by RFC7047: + * * Connect/disconnect to OVS DB; + * * Recovery mechanism in case of OVS DB connection lost; + * * Subscription mechanism to OVS DB table update events + * (insert/modify/delete); + * * Send custom JSON request to OVS DB (poll table data, etc.) + * * Handling of echo request from OVS DB server to verify the + * liveness of the connection. + * - Provides YAJL helpers functions. + * + * OVS DB API User Guide: + * All OVS DB function/structure names begins from 'ovs_db_*' prefix. To + * start using OVS DB API, client (plugin) should initialize the OVS DB + * object (`ovs_db_t') by calling `ovs_db_init' function. It initializes + * internal data and creates two main workers (threads). The result of the + * function is a pointer to new OVS DB object which can be used by other + * OVS DB API later and must be released by `ovs_db_destroy' function if + * the object isn't needed anymore. + * Once OVS DB API is initialized, the `init_cb' callback is called if + * the connection to OVS DB has been established. This callback is called + * every time the OVS DB is reconnected. So, if the client registers table + * update event callbacks or does any other OVS DB setup that can be lost + * after OVS DB reconnecting, it should be done in `init_cb' callback. + * The `ovs_db_table_cb_register` function is used to register OVS DB + * table update event callback and receive the table update notification + * when requested event occurs (registered callback is called). See + * function API for more info. + * To send custom JSON-RPC request to OVS DB, the `ovs_db_send_request' + * function is used. Please note, that connection to OVS DB should be + * established otherwise the function will return error. + * To verify the liveness of established connection, the OVS DB server + * sends echo request to the client with a given interval. The OVS utils + * takes care about this request and handles it properly. + **/ + +#ifndef UTILS_OVS_H +#define UTILS_OVS_H + +#include +#include + +/* Forward declaration */ +typedef struct ovs_db_s ovs_db_t; + +/* OVS DB callback type declaration */ +typedef void (*ovs_db_init_cb_t) (ovs_db_t *pdb); +typedef void (*ovs_db_table_cb_t) (yajl_val jupdates); +typedef void (*ovs_db_result_cb_t) (yajl_val jresult, yajl_val jerror); + +/* OVS DB structures */ +struct ovs_db_callback_s { + ovs_db_init_cb_t init_cb; +}; +typedef struct ovs_db_callback_s ovs_db_callback_t; + +/* OVS DB prototypes */ + +/* + * NAME + * ovs_db_init + * + * DESCRIPTION + * Initialize OVS DB internal data. The `ovs_db_destroy' function + * shall destroy the returned object. + * + * PARAMETERS + * `surl' OVS DB communication URL. + * `cb' OVS DB callbacks. + * + * RETURN VALUE + * New ovs_db_t object upon success or NULL if an error occurred. + */ +ovs_db_t *ovs_db_init(const char *surl, ovs_db_callback_t *cb); + +/* + * NAME + * ovs_db_destroy + * + * DESCRIPTION + * Destroy OVS DB object referenced by `pdb'. + * + * PARAMETERS + * `pdb' Pointer to OVS DB object. + * + * RETURN VALUE + * Zero upon success or non-zero if an error occurred. + */ +int ovs_db_destroy(ovs_db_t *pdb); + +/* + * NAME + * ovs_db_send_request + * + * DESCRIPTION + * Send JSON request to OVS DB server. + * + * PARAMETERS + * `pdb' Pointer to OVS DB object. + * `method' Request method name. + * `params' Method params to be sent (JSON value as a string). + * `cb' Result callback of the request. If NULL, the request + * is sent asynchronously. + * + * RETURN VALUE + * Zero upon success or non-zero if an error occurred. + */ +int ovs_db_send_request(ovs_db_t *pdb, const char *method, + const char *params, ovs_db_result_cb_t cb); + +/* callback types */ +#define OVS_DB_TABLE_CB_FLAG_INITIAL 0x01U +#define OVS_DB_TABLE_CB_FLAG_INSERT 0x02U +#define OVS_DB_TABLE_CB_FLAG_DELETE 0x04U +#define OVS_DB_TABLE_CB_FLAG_MODIFY 0x08U +#define OVS_DB_TABLE_CB_FLAG_ALL 0x0FU + +/* + * NAME + * ovs_db_table_cb_register + * + * DESCRIPTION + * Subscribe a callback on OVS DB table event. It allows to + * receive notifications (`update_cb' callback is called) of + * changes to requested table. + * + * PARAMETERS + * `pdb' Pointer to OVS DB object. + * `tb_name' OVS DB Table name to be monitored. + * `tb_column' OVS DB Table columns to be monitored. Last + * element in the array should be NULL. + * `update_cb' Callback function that is called when + * requested table columns are changed. + * `cb' Result callback of the request. If NULL, the call + * becomes asynchronous. + * Useful, if OVS_DB_TABLE_CB_FLAG_INITIAL is set. + * `flags' Bit mask of: + * OVS_DB_TABLE_CB_FLAG_INITIAL Receive initial values in + * result callback. + * OVS_DB_TABLE_CB_FLAG_INSERT Receive table insert events. + * OVS_DB_TABLE_CB_FLAG_DELETE Receive table remove events. + * OVS_DB_TABLE_CB_FLAG_MODIFY Receive table update events. + * OVS_DB_TABLE_CB_FLAG_ALL Receive all events. + * + * RETURN VALUE + * Zero upon success or non-zero if an error occurred. + */ +int ovs_db_table_cb_register(ovs_db_t *pdb, const char *tb_name, + const char **tb_column, + ovs_db_table_cb_t update_cb, + ovs_db_result_cb_t result_cb, + unsigned int flags); + +/* + * OVS utils API + */ + +/* + * NAME + * ovs_utils_get_value_by_key + * + * DESCRIPTION + * Get YAJL value by object name. + * + * PARAMETERS + * `jval' YAJL object value. + * `key' Object key name. + * + * RETURN VALUE + * YAJL value upon success or NULL if key not found. + */ +yajl_val ovs_utils_get_value_by_key(yajl_val jval, const char *key); + +#endif