OVS link: Implement OVS link plugin
authorMytnyk, VolodymyrX <volodymyrx.mytnyk@intel.com>
Wed, 31 Aug 2016 16:55:10 +0000 (17:55 +0100)
committerMytnyk, VolodymyrX <volodymyrx.mytnyk@intel.com>
Mon, 26 Dec 2016 13:26:05 +0000 (13:26 +0000)
This plugin consists of two parts:
 - OVS link
    The implementation of the plugin itself, which uses
    OVS utils API to be able to monitor a link status of
    OVS connected interfaces and dispatch the values
    through collectd notification mechanism whenever
    the link state change occurs.

 - OVS utils
    This module implements the OVS DB communication routine
    specified by RFC7047. It includes:
      - Connecting/disconnecting to/from OVS DB (via TCP/UNIX);
      - Mechanism to subscribe to OVS DB table events like
        init/insert/modify/delete table rows;
      - API to send custom request and receive result;
      - Recovery connection mechanism with OVS DB;
      - Handling of ECHO request to verify the liveness
        of a database connection;
      - Helpers functions.

Change-Id: Icac392bd1bd40f7dd156bfd2fc4ff08d9725a22f
Signed-off-by: Mytnyk, VolodymyrX <volodymyrx.mytnyk@intel.com>
Makefile.am
README
configure.ac
src/collectd.conf.in
src/collectd.conf.pod
src/ovs_link.c [new file with mode: 0644]
src/utils_ovs.c [new file with mode: 0644]
src/utils_ovs.h [new file with mode: 0644]

index f6554a1..d4d7cc0 100644 (file)
@@ -1278,6 +1278,14 @@ oracle_la_LIBADD = $(BUILD_WITH_ORACLE_LIBS)
 oracle_la_LDFLAGS = $(PLUGIN_LDFLAGS)
 endif
 
+if BUILD_PLUGIN_OVS_LINK
+pkglib_LTLIBRARIES += ovs_link.la
+ovs_link_la_SOURCES = ovs_link.c utils_ovs.c
+ovs_link_la_LDFLAGS = $(PLUGIN_LDFLAGS) $(BUILD_WITH_LIBYAJL_LDFLAGS)
+ovs_link_la_CFLAGS = $(BUILD_WITH_LIBYAJL_CPPFLAGS)
+ovs_link_la_LIBADD = $(BUILD_WITH_LIBYAJL_LIBS)
+endif
+
 if BUILD_PLUGIN_PERL
 pkglib_LTLIBRARIES += perl.la
 perl_la_SOURCES = src/perl.c
diff --git a/README b/README
index 925f364..16640ab 100644 (file)
--- a/README
+++ b/README
@@ -288,6 +288,14 @@ Features
     - oracle
       Query data from an Oracle database.
 
+    - ovs_link
+      The plugin monitors the link status of OVS connected interfaces and
+      dispatches the values through collectd notification mechanism. It
+      requires YAJL library to be installed.
+      Detailed instructions for installing and setting up Open vSwitch, see
+      OVS documentation.
+      <http://openvswitch.org/support/dist-docs/INSTALL.md.html>
+
     - perl
       The perl plugin implements a Perl-interpreter into collectd. You can
       write your own plugins in Perl and return arbitrary values using this
@@ -926,7 +934,7 @@ Prerequisites
     <http://www.xmms.org/>
 
   * libyajl (optional)
-    Parse JSON data. This is needed for the `ceph', `curl_json' and
+    Parse JSON data. This is needed for the `ceph', `curl_json', 'ovs_link' and
     `log_logstash' plugins.
     <http://github.com/lloyd/yajl>
 
index 2d7f3e2..9243141 100644 (file)
@@ -6314,6 +6314,7 @@ AC_PLUGIN([onewire],             [$with_libowcapi],         [OneWire sensor stat
 AC_PLUGIN([openldap],            [$with_libldap],           [OpenLDAP statistics])
 AC_PLUGIN([openvpn],             [yes],                     [OpenVPN client statistics])
 AC_PLUGIN([oracle],              [$with_oracle],            [Oracle plugin])
+AC_PLUGIN([ovs_link],            [$with_libyajl],           [OVS link status plugin])
 AC_PLUGIN([perl],                [$plugin_perl],            [Embed a Perl interpreter])
 AC_PLUGIN([pf],                  [$have_net_pfvar_h],       [BSD packet filter (PF) statistics])
 # FIXME: Check for libevent, too.
@@ -6738,6 +6739,7 @@ AC_MSG_RESULT([    onewire . . . . . . . $enable_onewire])
 AC_MSG_RESULT([    openldap  . . . . . . $enable_openldap])
 AC_MSG_RESULT([    openvpn . . . . . . . $enable_openvpn])
 AC_MSG_RESULT([    oracle  . . . . . . . $enable_oracle])
+AC_MSG_RESULT([    ovs_link  . . . . . . $enable_ovs_link])
 AC_MSG_RESULT([    perl  . . . . . . . . $enable_perl])
 AC_MSG_RESULT([    pf  . . . . . . . . . $enable_pf])
 AC_MSG_RESULT([    pinba . . . . . . . . $enable_pinba])
index 62b2456..4b93825 100644 (file)
 #@BUILD_PLUGIN_OPENLDAP_TRUE@LoadPlugin openldap
 #@BUILD_PLUGIN_OPENVPN_TRUE@LoadPlugin openvpn
 #@BUILD_PLUGIN_ORACLE_TRUE@LoadPlugin oracle
+#@BUILD_PLUGIN_OVS_LINK_TRUE@LoadPlugin ovs_link
 #@BUILD_PLUGIN_PERL_TRUE@LoadPlugin perl
 #@BUILD_PLUGIN_PINBA_TRUE@LoadPlugin pinba
 #@BUILD_PLUGIN_PING_TRUE@LoadPlugin ping
 #  </Database>
 #</Plugin>
 
+#<Plugin ovs_link>
+#  OvsDbServerUrl "tcp:127.0.0.1:6640"
+#  Interfaces "br0" "veth0"
+#</Plugin>
+
 #<Plugin perl>
 #      IncludeDir "/my/include/path"
 #      BaseName "Collectd::Plugins"
index 2f29713..e814af5 100644 (file)
@@ -5453,6 +5453,58 @@ refer to them from.
 
 =back
 
+=head2 Plugin C<ovs_link>
+
+
+The I<ovs_link> plugin monitors the link status of OVS connected interfaces and
+dispatches the values through collectd notification mechanism whenever the link
+state change occurs. This plugin uses OVSDB to get a link state change
+notification.
+
+B<Synopsis:>
+
+ <Plugin "ovs_link">
+   OvsDbServerUrl "tcp:127.0.0.1:6640"
+   Interfaces "br0" "veth0"
+ </Plugin>
+
+The plugin provides the following configuration options:
+
+=over 4
+
+=item B<OvsDbServerUrl> I<server>
+
+The URL is an address of OVS DB server JSON-RPC interface used by the plugin.
+To enable the interface, OVS DB daemon should be running with '--remote=ptcp:'
+or '--remote=punix:' option. See L<ovsdb-server(1)> for more details. The URL
+must take one of the following forms:
+
+=over 4
+
+=item B<tcp:>I<ip>:I<port>
+
+Connect to the given tcp I<port> on I<ip>, where I<ip> is IPv4 address
+of OVS DB server which is listening on TCP I<port> for incoming
+JSON-RPC client connection.
+
+=item B<unix:>I<file>
+
+Connect to the unix domain server socket named I<file> which is
+used by OVS DB for incoming JSON-RPC client connection.
+
+=back
+
+Default: C<tcp:127.0.0.1:6640>
+
+=item B<Interfaces> [I<ifname> ...]
+
+List of interface names to be monitored by this plugin. If this option is missed
+or it's empty then all OVS connected interfaces on all bridges are monitored.
+
+Default: empty (all interfaces on all bridges are monitored)
+
+=back
+
 =head2 Plugin C<perl>
 
 This plugin embeds a Perl-interpreter into collectd and provides an interface
diff --git a/src/ovs_link.c b/src/ovs_link.c
new file mode 100644 (file)
index 0000000..b578a09
--- /dev/null
@@ -0,0 +1,358 @@
+/**
+ * collectd - src/ovs_link.c
+ *
+ * Copyright(c) 2016 Intel Corporation. All rights reserved.
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy of
+ * this software and associated documentation files (the "Software"), to deal in
+ * the Software without restriction, including without limitation the rights to
+ * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
+ * of the Software, and to permit persons to whom the Software is furnished to do
+ * so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ *
+ * Authors:
+ *   Volodymyr Mytnyk <volodymyrx.mytnyk@intel.com>
+ **/
+
+#include "common.h"             /* auxiliary functions */
+#include "utils_ovs.h"          /* OVS helpers */
+
+#define OVS_LINK_PLUGIN "ovs_link"
+#define OVS_LINK_DEFAULT_OVS_DB_SERVER_URL "tcp:127.0.0.1:6640"
+#define CONFIG_LOCK for (int __i = config_lock(); __i != 0 ; \
+                         __i = config_unlock())
+
+struct interface_s {
+  char *name;                   /* interface name */
+  struct interface_s *next;     /* next interface name */
+};
+typedef struct interface_s interface_t;
+
+struct ovs_link_config_s {
+  pthread_mutex_t mutex;        /* mutex to lock the config structure */
+  char *ovs_db_server_url;      /* OVS DB server URL */
+  ovs_db_t *ovs_db;             /* pointer to OVS DB instance */
+  interface_t *ifaces;          /* interface names */
+};
+typedef struct ovs_link_config_s ovs_link_config_t;
+
+/*
+ * Private variables
+ */
+ovs_link_config_t config = {PTHREAD_MUTEX_INITIALIZER, NULL, NULL, NULL};
+
+/* This function is used only by "CONFIG_LOCK" defined above.
+ * It always returns 1 when the config is locked.
+ */
+static inline int
+config_lock()
+{
+  pthread_mutex_lock(&config.mutex);
+  return (1);
+}
+
+/* This function is used only by "CONFIG_LOCK" defined above.
+ * It always returns 0 when config is unlocked.
+ */
+static inline int
+config_unlock()
+{
+  pthread_mutex_unlock(&config.mutex);
+  return (0);
+}
+
+/* Check if given interface name exists in configuration file. It
+ * returns 1 if exists otherwise 0. If no interfaces are configured,
+ * 1 is returned
+ */
+static int
+ovs_link_config_iface_exists(const char *ifname)
+{
+  int rc = 0;
+  CONFIG_LOCK {
+    if (!(rc = (config.ifaces == NULL))) {
+      for (interface_t *iface = config.ifaces; iface; iface = iface->next)
+        if (rc = (strcmp(ifname, iface->name) == 0))
+          break;
+    }
+  }
+  return rc;
+}
+
+/* Release memory allocated for configuration data */
+static void
+ovs_link_config_free()
+{
+  interface_t *del_iface = NULL;
+  CONFIG_LOCK {
+    sfree(config.ovs_db_server_url);
+    while (config.ifaces) {
+      del_iface = config.ifaces;
+      config.ifaces = config.ifaces->next;
+      free(del_iface->name);
+      free(del_iface);
+    }
+  }
+}
+
+/* Parse plugin configuration file and store the config
+ * in allocated memory. Returns negative value in case of error.
+ */
+static int
+ovs_link_plugin_config(oconfig_item_t *ci)
+{
+  interface_t *new_iface;
+  char *if_name;
+  char *ovs_db_url;
+
+  for (int i = 0; i < ci->children_num; i++) {
+    oconfig_item_t *child = ci->children + i;
+    if (strcasecmp("OvsDbServerUrl", child->key) == 0) {
+      if (cf_util_get_string(child, &ovs_db_url) < 0) {
+        ERROR(OVS_LINK_PLUGIN ": parse '%s' option failed", child->key);
+        goto failure;
+      } else
+        config.ovs_db_server_url = ovs_db_url;
+    } else if (strcasecmp("Interfaces", child->key) == 0) {
+      for (int j = 0; j < child->values_num; j++) {
+        /* check value type */
+        if (child->values[j].type != OCONFIG_TYPE_STRING) {
+          ERROR(OVS_LINK_PLUGIN
+                ": given interface name is not a string [idx=%d]", j);
+          goto failure;
+        }
+
+        /* get value */
+        if ((if_name = strdup(child->values[j].value.string)) == NULL) {
+          ERROR(OVS_LINK_PLUGIN " strdup() copy interface name fail");
+          goto failure;
+        }
+
+        if ((new_iface = malloc(sizeof(*new_iface))) == NULL) {
+          ERROR(OVS_LINK_PLUGIN ": malloc () copy interface name fail");
+          goto failure;
+        } else {
+          /* store interface name */
+          new_iface->name = if_name;
+          new_iface->next = config.ifaces;
+          CONFIG_LOCK {
+            config.ifaces = new_iface;
+          }
+          DEBUG(OVS_LINK_PLUGIN ": found monitored interface \"%s\"",
+                if_name);
+        }
+      }
+    } else {
+      ERROR(OVS_LINK_PLUGIN ": option '%s' is not allowed here", child->key);
+      goto failure;
+    }
+  }
+  return (0);
+
+failure:
+  ovs_link_config_free();
+  return (-1);
+}
+
+/* Dispatch OVS interface link status event to collectd */
+static int
+ovs_link_dispatch_notification(const char *link_name, const char *link_state)
+{
+  notification_t n = {NOTIF_FAILURE, time(NULL), "", "", OVS_LINK_PLUGIN,
+                      "", "", "", NULL};
+
+  /* fill the notification data */
+  if (link_state != NULL)
+    n.severity = ((strcmp(link_state, "up") == 0) ?
+                  NOTIF_OKAY : NOTIF_WARNING);
+  else
+    link_state = "UNKNOWN";
+
+  sstrncpy(n.host, hostname_g, sizeof(n.host));
+  ssnprintf(n.message, sizeof(n.message),
+            "link state of \"%s\" interface has been changed to \"%s\"",
+            link_name, link_state);
+
+  /* send the notification */
+  return plugin_dispatch_notification(&n);
+}
+
+/* Process OVS DB update table event. It handles link status update event(s)
+ * and dispatches the value(s) to collectd if interface name matches one of
+ * interfaces specified in configuration file.
+ */
+static void
+ovs_link_table_update_cb(yajl_val jupdates)
+{
+  yajl_val jnew_val = NULL;
+  yajl_val jupdate = NULL;
+  yajl_val jrow_update = NULL;
+  yajl_val jlink_name = NULL;
+  yajl_val jlink_state = NULL;
+  const char *link_name = NULL;
+
+  /* JSON "Interface" table update example:
+   * ---------------------------------
+   * {"Interface":
+   *  {
+   *   "9adf1db2-29ca-4140-ab22-ae347a4484de":
+   *    {
+   *     "new":
+   *      {
+   *       "name":"br0",
+   *       "link_state":"up"
+   *      },
+   *     "old":
+   *      {
+   *       "link_state":"down"
+   *      }
+   *    }
+   *  }
+   * }
+   */
+  if (!YAJL_IS_OBJECT(jupdates) || !(YAJL_GET_OBJECT(jupdates)->len > 0)) {
+    ERROR(OVS_LINK_PLUGIN ": unexpected OVS DB update event received");
+    return;
+  }
+  /* verify if this is a table event */
+  jupdate = YAJL_GET_OBJECT(jupdates)->values[0];
+  if (!YAJL_IS_OBJECT(jupdate)) {
+    ERROR(OVS_LINK_PLUGIN ": unexpected table update event received");
+    return;
+  }
+  /* go through all row updates  */
+  for (int row_index = 0; row_index < YAJL_GET_OBJECT(jupdate)->len;
+       ++row_index) {
+    jrow_update = YAJL_GET_OBJECT(jupdate)->values[row_index];
+
+    /* check row update */
+    jnew_val = ovs_utils_get_value_by_key(jrow_update, "new");
+    if (jnew_val == NULL) {
+      ERROR(OVS_LINK_PLUGIN ": unexpected row update received");
+      return;
+    }
+    /* get link status update */
+    jlink_name = ovs_utils_get_value_by_key(jnew_val, "name");
+    jlink_state = ovs_utils_get_value_by_key(jnew_val, "link_state");
+    if (jlink_name && jlink_state) {
+      link_name = YAJL_GET_STRING(jlink_name);
+      if (link_name && ovs_link_config_iface_exists(link_name)) {
+        /* dispatch notification */
+        ovs_link_dispatch_notification(link_name,
+                                       YAJL_GET_STRING(jlink_state));
+      }
+    }
+  }
+}
+
+/* Process OVS DB result table callback. It handles init link status value
+ * and dispatches the value(s) to collectd. The logic to handle init status
+ * is same as 'ovs_link_table_update_cb'.
+ */
+static void
+ovs_link_table_result_cb(yajl_val jresult, yajl_val jerror)
+{
+  (void)jerror;
+  /* jerror is not used as it is the same all the time
+     (rfc7047, "Monitor" section, return value) */
+  ovs_link_table_update_cb(jresult);
+}
+
+/* Setup OVS DB table callback. It subscribes to 'Interface' tables
+ * to receive link status events.
+ */
+static void
+ovs_link_initialize(ovs_db_t *pdb)
+{
+  int ret = 0;
+  const char tb_name[] = "Interface";
+  const char *columns[] = {"name", "link_state", NULL};
+
+  /* register the update callback */
+  ret = ovs_db_table_cb_register(pdb, tb_name, columns,
+                                 ovs_link_table_update_cb,
+                                 ovs_link_table_result_cb,
+                                 OVS_DB_TABLE_CB_FLAG_MODIFY |
+                                 OVS_DB_TABLE_CB_FLAG_INITIAL);
+  if (ret < 0) {
+    ERROR(OVS_LINK_PLUGIN ": register OVS DB update callback failed");
+    return;
+  }
+
+  DEBUG(OVS_LINK_PLUGIN ": OVS DB has been initialized");
+}
+
+/* Set default config values (update config) if some of them aren't
+ * specified in configuration file
+ */
+static inline int
+ovs_link_config_set_default()
+{
+  if (!config.ovs_db_server_url)
+    config.ovs_db_server_url = strdup(OVS_LINK_DEFAULT_OVS_DB_SERVER_URL);
+  return (config.ovs_db_server_url == NULL);
+}
+
+/* Initialize OVS plugin */
+static int
+ovs_link_plugin_init(void)
+{
+  ovs_db_t *ovs_db = NULL;
+  ovs_db_callback_t cb = {.init_cb = ovs_link_initialize};
+
+  if (ovs_link_config_set_default()) {
+    ERROR(OVS_LINK_PLUGIN ": fail to make configuration");
+    ovs_link_config_free();
+    return (-1);
+  }
+
+  /* initialize OVS DB */
+  if ((ovs_db = ovs_db_init(config.ovs_db_server_url, &cb)) == NULL) {
+    ERROR(OVS_LINK_PLUGIN ": fail to connect to OVS DB server");
+    ovs_link_config_free();
+    return (-1);
+  }
+
+  /* store OVSDB handler */
+  CONFIG_LOCK {
+    config.ovs_db = ovs_db;
+  }
+
+  DEBUG(OVS_LINK_PLUGIN ": plugin has been initialized");
+  return (0);
+}
+
+/* Shutdown OVS plugin */
+static int
+ovs_link_plugin_shutdown(void)
+{
+  /* release memory allocated for config */
+  ovs_link_config_free();
+
+  /* destroy OVS DB */
+  if (ovs_db_destroy(config.ovs_db))
+    ERROR(OVS_LINK_PLUGIN ": OVSDB object destroy failed");
+
+  DEBUG(OVS_LINK_PLUGIN ": plugin has been destroyed");
+  return (0);
+}
+
+/* Register OVS plugin callbacks */
+void
+module_register(void)
+{
+  plugin_register_complex_config(OVS_LINK_PLUGIN, ovs_link_plugin_config);
+  plugin_register_init(OVS_LINK_PLUGIN, ovs_link_plugin_init);
+  plugin_register_shutdown(OVS_LINK_PLUGIN, ovs_link_plugin_shutdown);
+}
diff --git a/src/utils_ovs.c b/src/utils_ovs.c
new file mode 100644 (file)
index 0000000..4c99eef
--- /dev/null
@@ -0,0 +1,1284 @@
+/**
+ * collectd - src/utils_ovs.c
+ *
+ * Copyright(c) 2016 Intel Corporation. All rights reserved.
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy of
+ * this software and associated documentation files (the "Software"), to deal in
+ * the Software without restriction, including without limitation the rights to
+ * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
+ * of the Software, and to permit persons to whom the Software is furnished to do
+ * so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ *
+ * Authors:
+ *   Volodymyr Mytnyk <volodymyrx.mytnyk@intel.com>
+ *
+ *                         OVS DB API internal architecture diagram
+ * +------------------------------------------------------------------------------+
+ * |OVS plugin      |OVS utils                                                    |
+ * |                |     +------------------------+                              |
+ * |                |     |      echo handler      |                JSON request/ |
+ * |                |  +--+ (ovs_db_table_echo_cb) +<---+---------+ update event/ |
+ * |                |  |  |                        |    |         | result        |
+ * |                |  |  +------------------------+    |         |               |
+ * |                |  |                                |    +----+---+--------+  |
+ * |  +----------+  |  |  +------------------------+    |    |        |        |  |
+ * |  |  update  |  |  |  |     update handler     |    |    |  YAJL  |  JSON  |  |
+ * |  | callback +<-------+(ovs_db_table_update_cp)+<---+    | parser | reader |  |
+ * |  +----------+  |  |  |                        |    |    |        |        |  |
+ * |                |  |  +------------------------+    |    +--------+---+----+  |
+ * |                |  |                                |                 ^       |
+ * |  +----------+  |  |  +------------------------+    |                 |       |
+ * |  |  result  |  |  |  |     result handler     |    |                 |       |
+ * |  | callback +<-------+   (ovs_db_result_cb)   +<---+        JSON raw |       |
+ * |  +----------+  |  |  |                        |               data   |       |
+ * |                |  |  +------------------------+                      |       |
+ * |                |  |                                                  |       |
+ * |                |  |    +------------------+             +------------+----+  |
+ * |  +----------+  |  |    |thread|           |             |thread|          |  |
+ * |  |   init   |  |  |    |                  |  reconnect  |                 |  |
+ * |  | callback +<---------+   EVENT WORKER   +<------------+   POLL WORKER   |  |
+ * |  +----------+  |  |    +------------------+             +--------+--------+  |
+ * |                |  |                                              ^           |
+ * +----------------+-------------------------------------------------------------+
+ *                     |                                              |
+ *                 JSON|echo reply                                 raw|data
+ *                     v                                              v
+ * +-------------------+----------------------------------------------+-----------+
+ * |                                 TCP/UNIX socket                              |
+ * +-------------------------------------------------------------------------------
+ *
+ **/
+
+/* collectd headers */
+#include "common.h"
+
+/* private headers */
+#include "utils_ovs.h"
+
+/* system libraries */
+#include <semaphore.h>
+#include <arpa/inet.h>
+#include <poll.h>
+#include <sys/un.h>
+
+#define OVS_ERROR(fmt, ...) do { \
+  ERROR("ovs_utils: "fmt, ## __VA_ARGS__); } while (0)
+#define OVS_DEBUG(fmt, ...) do { \
+  DEBUG("%s:%d:%s(): "fmt, __FILE__, __LINE__, __FUNCTION__, \
+        ## __VA_ARGS__); } while (0)
+
+#define OVS_DB_POLL_TIMEOUT          1  /* poll receive timeout (sec) */
+#define OVS_DB_POLL_READ_BLOCK_SIZE  5  /* read block size (bytes) */
+#define OVS_DB_DEFAULT_DB_NAME       "Open_vSwitch"
+#define OVS_DB_RECONNECT_TIMEOUT     1  /* reconnect timeout (sec) */
+
+#define OVS_DB_EVENT_TIMEOUT         5  /* event thread timeout (sec) */
+#define OVS_DB_EVENT_TERMINATE       1
+#define OVS_DB_EVENT_CONNECTED       2
+
+#define OVS_DB_POLL_STATE_RUNNING    1
+#define OVS_DB_POLL_STATE_EXITING    2
+
+#define OVS_DB_SEND_REQ_TIMEOUT      5  /* send request timeout (sec) */
+
+#define OVS_YAJL_CALL(func, ...) \
+  do { \
+    yajl_gen_ret = yajl_gen_status_ok; \
+    if ((yajl_gen_ret = func(__VA_ARGS__)) != yajl_gen_status_ok) \
+      goto yajl_gen_failure; \
+  } while (0)
+#define OVS_YAJL_ERROR_BUFFER_SIZE       1024
+#define OVS_ERROR_BUFF_SIZE              512
+#define OVS_UID_STR_SIZE                 17     /* 64-bit HEX string len + '\0' */
+
+/* JSON reader internal data */
+struct ovs_json_reader_s {
+  char *buff_ptr;
+  size_t buff_size;
+  size_t buff_offset;
+  size_t json_offset;
+};
+typedef struct ovs_json_reader_s ovs_json_reader_t;
+
+/* Result callback declaration */
+struct ovs_result_cb_s {
+  sem_t sync;
+  ovs_db_result_cb_t call;
+};
+typedef struct ovs_result_cb_s ovs_result_cb_t;
+
+/* Table callback declaration */
+struct ovs_table_cb_s {
+  ovs_db_table_cb_t call;
+};
+typedef struct ovs_table_cb_s ovs_table_cb_t;
+
+/* Callback declaration */
+struct ovs_callback_s {
+  uint64_t uid;
+  union {
+    ovs_result_cb_t result;
+    ovs_table_cb_t table;
+  };
+  struct ovs_callback_s *next;
+  struct ovs_callback_s *prev;
+};
+typedef struct ovs_callback_s ovs_callback_t;
+
+/* Connection declaration */
+struct ovs_conn_s {
+  int sock;
+  int domain;
+  int type;
+  int addr_size;
+  union {
+    struct sockaddr_in s_inet;
+    struct sockaddr_un s_unix;
+  } addr;
+};
+typedef struct ovs_conn_s ovs_conn_t;
+
+/* Event thread data declaration */
+struct ovs_event_thread_s {
+  pthread_t tid;
+  pthread_mutex_t mutex;
+  pthread_cond_t cond;
+  int value;
+};
+typedef struct ovs_event_thread_s ovs_event_thread_t;
+
+/* Poll thread data declaration */
+struct ovs_poll_thread_s {
+  pthread_t tid;
+  pthread_mutex_t mutex;
+  int state;
+};
+typedef struct ovs_poll_thread_s ovs_poll_thread_t;
+
+/* OVS DB internal data declaration */
+struct ovs_db_s {
+  ovs_poll_thread_t poll_thread;
+  ovs_event_thread_t event_thread;
+  pthread_mutex_t mutex;
+  ovs_callback_t *cb;
+  ovs_conn_t conn;
+  ovs_db_init_cb_t init_cb;
+};
+typedef struct ovs_db_s ovs_db_t;
+
+/* Post an event to event thread.
+ * Possible events are:
+ *  OVS_DB_EVENT_TERMINATE
+ *  OVS_DB_EVENT_CONNECTED
+ */
+static void
+ovs_db_event_post(ovs_db_t *pdb, int event)
+{
+  pthread_mutex_lock(&pdb->event_thread.mutex);
+  pdb->event_thread.value = event;
+  pthread_mutex_unlock(&pdb->event_thread.mutex);
+  pthread_cond_signal(&pdb->event_thread.cond);
+}
+
+/* Check if POLL thread is still running. Returns
+ * 1 if running otherwise 0 is returned */
+static inline int
+ovs_db_poll_is_running(ovs_db_t *pdb)
+{
+  int state = 0;
+  pthread_mutex_lock(&pdb->poll_thread.mutex);
+  state = pdb->poll_thread.state;
+  pthread_mutex_unlock(&pdb->poll_thread.mutex);
+  return (state == OVS_DB_POLL_STATE_RUNNING);
+}
+
+/* Terminate POLL thread */
+static inline void
+ovs_db_poll_terminate(ovs_db_t *pdb)
+{
+  pthread_mutex_lock(&pdb->poll_thread.mutex);
+  pdb->poll_thread.state = OVS_DB_POLL_STATE_EXITING;
+  pthread_mutex_unlock(&pdb->poll_thread.mutex);
+}
+
+/* Generate unique identifier (UID). It is used by OVS DB API
+ * to set "id" field for any OVS DB JSON request. */
+static uint64_t
+ovs_uid_generate()
+{
+  struct timespec ts;
+  clock_gettime(CLOCK_MONOTONIC, &ts);
+  return ((ts.tv_sec << 32) | (ts.tv_nsec & UINT32_MAX));
+}
+
+/*
+ * Callback API. These function are used to store
+ * registered callbacks in OVS DB API.
+ */
+
+/* Add new callback into OVS DB object */
+static void
+ovs_db_callback_add(ovs_db_t *pdb, ovs_callback_t *new_cb)
+{
+  pthread_mutex_lock(&pdb->mutex);
+  if (pdb->cb)
+    pdb->cb->prev = new_cb;
+  new_cb->next = pdb->cb;
+  new_cb->prev = NULL;
+  pdb->cb = new_cb;
+  pthread_mutex_unlock(&pdb->mutex);
+}
+
+/* Remove callback from OVS DB object */
+static void
+ovs_db_callback_remove(ovs_db_t *pdb, ovs_callback_t *del_cb)
+{
+  ovs_callback_t *pre_cb = del_cb->prev;
+  ovs_callback_t *next_cb = del_cb->next;
+
+  pthread_mutex_lock(&pdb->mutex);
+  if (next_cb)
+    next_cb->prev = del_cb->prev;
+
+  if (pre_cb)
+    pre_cb->next = del_cb->next;
+  else
+    pdb->cb = del_cb->next;
+
+  free(del_cb);
+  pthread_mutex_unlock(&pdb->mutex);
+}
+
+/* Remove all callbacks form OVS DB object */
+static void
+ovs_db_callback_remove_all(ovs_db_t *pdb)
+{
+  pthread_mutex_lock(&pdb->mutex);
+  for (ovs_callback_t *del_cb = pdb->cb; pdb->cb; del_cb = pdb->cb) {
+    pdb->cb = pdb->cb->next;
+    free(del_cb);
+  }
+  pdb->cb = NULL;
+  pthread_mutex_unlock(&pdb->mutex);
+}
+
+/* Get/find callback in OVS DB object by UID. Returns pointer
+ * to requested callback otherwise NULL is returned */
+static ovs_callback_t *
+ovs_db_callback_get(ovs_db_t *pdb, uint64_t uid)
+{
+  pthread_mutex_lock(&pdb->mutex);
+  for (ovs_callback_t *cb = pdb->cb; cb != NULL; cb = cb->next)
+    if (cb->uid == uid) {
+      pthread_mutex_unlock(&pdb->mutex);
+      return cb;
+    }
+  pthread_mutex_unlock(&pdb->mutex);
+  return NULL;
+}
+
+/* Send all requested data to the socket. Returns 0 if
+ * ALL request data has been sent otherwise negative value
+ * is returned */
+static int
+ovs_db_data_send(const ovs_db_t *pdb, const char *data, size_t len)
+{
+  ssize_t nbytes = 0;
+  size_t rem = len;
+  size_t off = 0;
+
+  while (rem > 0) {
+    if ((nbytes = send(pdb->conn.sock, data + off, rem, 0)) <= 0)
+      return (-1);
+    rem -= (size_t)nbytes;
+    off += (size_t)nbytes;
+  }
+  return (0);
+}
+
+/* Parse OVS server URL.
+ * Format of the URL:
+ *   "tcp:a.b.c.d:port" - define TCP connection (INET domain)
+ *   "unix:file" - define UNIX socket file (UNIX domain)
+ */
+static int
+ovs_db_url_parse(const char *surl, ovs_conn_t *conn)
+{
+  ovs_conn_t tmp_conn;
+  char *nexttok = NULL;
+  char *in_str = NULL;
+  char *saveptr;
+  int ret = 0;
+
+  /* sanity check */
+  if ((surl == NULL) || (strlen(surl) < 1))
+    return (-1);
+
+  /* parse domain */
+  tmp_conn = *conn;
+  in_str = sstrdup(surl);
+  if ((nexttok = strtok_r(in_str, ":", &saveptr)) != NULL) {
+    if (strcmp("tcp", nexttok) == 0) {
+      tmp_conn.domain = AF_INET;
+      tmp_conn.type = SOCK_STREAM;
+      tmp_conn.addr_size = sizeof(tmp_conn.addr.s_inet);
+    } else if (strcmp("unix", nexttok) == 0) {
+      tmp_conn.domain = AF_UNIX;
+      tmp_conn.type = SOCK_STREAM;
+      tmp_conn.addr_size = sizeof(tmp_conn.addr.s_unix);
+    } else
+      goto failure;
+  } else
+    goto failure;
+
+  /* parse url depending on domain */
+  if ((nexttok = strtok_r(NULL, ":", &saveptr)) != NULL) {
+    if (tmp_conn.domain == AF_UNIX) {
+      /* <UNIX-NAME> */
+      tmp_conn.addr.s_inet.sin_family = AF_UNIX;
+      sstrncpy(tmp_conn.addr.s_unix.sun_path, nexttok, strlen(nexttok) + 1);
+    } else {
+      /* <IP:PORT> */
+      tmp_conn.addr.s_inet.sin_family = AF_INET;
+      ret =
+        inet_pton(AF_INET, nexttok, (void *)&tmp_conn.addr.s_inet.sin_addr);
+      if (ret == 1) {
+        if ((nexttok = strtok_r(NULL, ":", &saveptr)) != NULL)
+          tmp_conn.addr.s_inet.sin_port = htons(atoi(nexttok));
+        else
+          goto failure;
+      } else
+        goto failure;
+    }
+  }
+
+  /* save result and return success */
+  *conn = tmp_conn;
+  sfree(in_str);
+  return (0);
+
+failure:
+  OVS_ERROR("%s() : invalid OVS DB URL provided");
+  sfree(in_str);
+  return (-1);
+}
+
+/*
+ * YAJL (Yet Another JSON Library) helper functions
+ * Documentation (https://lloyd.github.io/yajl/)
+ */
+
+/* Add null-terminated string into YAJL generator handle (JSON object).
+ * Similar function to yajl_gen_string() but takes null-terminated string
+ * instead of string and its length.
+ *
+ * jgen   - YAJL generator handle allocated by yajl_gen_alloc()
+ * string - Null-terminated string
+ */
+static inline yajl_gen_status
+ovs_yajl_gen_tstring(yajl_gen hander, const char *string)
+{
+  return yajl_gen_string(hander, string, strlen(string));
+}
+
+/* Add YAJL value into YAJL generator handle (JSON object)
+ *
+ * jgen - YAJL generator handle allocated by yajl_gen_alloc()
+ * jval - YAJL value usually returned by yajl_tree_get()
+ */
+static yajl_gen_status
+ovs_yajl_gen_val(yajl_gen jgen, yajl_val jval)
+{
+  size_t array_len = 0;
+  yajl_val *jvalues = NULL;
+  yajl_val jobj_value = NULL;
+  const char *obj_key = NULL;
+  size_t obj_len = 0;
+  yajl_gen_status yajl_gen_ret;
+
+  if (YAJL_IS_STRING(jval))
+    OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, YAJL_GET_STRING(jval));
+  else if (YAJL_IS_DOUBLE(jval))
+    OVS_YAJL_CALL(yajl_gen_double, jgen, YAJL_GET_DOUBLE(jval));
+  else if (YAJL_IS_INTEGER(jval))
+    OVS_YAJL_CALL(yajl_gen_double, jgen, YAJL_GET_INTEGER(jval));
+  else if (YAJL_IS_TRUE(jval))
+    OVS_YAJL_CALL(yajl_gen_bool, jgen, 1);
+  else if (YAJL_IS_FALSE(jval))
+    OVS_YAJL_CALL(yajl_gen_bool, jgen, 0);
+  else if (YAJL_IS_NULL(jval))
+    OVS_YAJL_CALL(yajl_gen_null, jgen);
+  else if (YAJL_IS_ARRAY(jval)) {
+    /* create new array and add all elements into the array */
+    array_len = YAJL_GET_ARRAY(jval)->len;
+    jvalues = YAJL_GET_ARRAY(jval)->values;
+    OVS_YAJL_CALL(yajl_gen_array_open, jgen);
+    for (int i = 0; i < array_len; i++)
+      OVS_YAJL_CALL(ovs_yajl_gen_val, jgen, jvalues[i]);
+    OVS_YAJL_CALL(yajl_gen_array_close, jgen);
+  } else if (YAJL_IS_OBJECT(jval)) {
+    /* create new object and add all elements into the object */
+    OVS_YAJL_CALL(yajl_gen_map_open, jgen);
+    obj_len = YAJL_GET_OBJECT(jval)->len;
+    for (int i = 0; i < obj_len; i++) {
+      obj_key = YAJL_GET_OBJECT(jval)->keys[i];
+      jobj_value = YAJL_GET_OBJECT(jval)->values[i];
+      OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, obj_key);
+      OVS_YAJL_CALL(ovs_yajl_gen_val, jgen, jobj_value);
+    }
+    OVS_YAJL_CALL(yajl_gen_map_close, jgen);
+  } else {
+    OVS_ERROR("%s() unsupported value type %d (skip)", __FUNCTION__,
+              (int)(jval)->type);
+    goto yajl_gen_failure;
+  }
+  return yajl_gen_status_ok;
+
+yajl_gen_failure:
+  OVS_ERROR("%s() error to generate value", __FUNCTION__);
+  return yajl_gen_ret;
+}
+
+/* OVS DB echo request handler. When OVS DB sends
+ * "echo" request to the client, client should generate
+ * "echo" replay with the same content received in the
+ * request */
+static int
+ovs_db_table_echo_cb(const ovs_db_t *pdb, yajl_val jnode)
+{
+  yajl_val jparams;
+  yajl_val jid;
+  yajl_gen jgen;
+  size_t resp_len = 0;
+  const char *resp = NULL;
+  const char *params_path[] = {"params", NULL};
+  const char *id_path[] = {"id", NULL};
+  yajl_gen_status yajl_gen_ret;
+
+  if ((jgen = yajl_gen_alloc(NULL)) == NULL)
+    return (-1);
+
+  /* check & get request attributes */
+  if ((jparams = yajl_tree_get(jnode, params_path, yajl_t_array)) == NULL ||
+      ((jid = yajl_tree_get(jnode, id_path, yajl_t_any)) == NULL)) {
+    OVS_ERROR("parse echo request failed");
+    goto yajl_gen_failure;
+  }
+
+  /* generate JSON echo response */
+  OVS_YAJL_CALL(yajl_gen_map_open, jgen);
+
+  OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "result");
+  OVS_YAJL_CALL(ovs_yajl_gen_val, jgen, jparams);
+
+  OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "error");
+  OVS_YAJL_CALL(yajl_gen_null, jgen);
+
+  OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "id");
+  OVS_YAJL_CALL(ovs_yajl_gen_val, jgen, jid);
+
+  OVS_YAJL_CALL(yajl_gen_map_close, jgen);
+  OVS_YAJL_CALL(yajl_gen_get_buf, jgen, (const unsigned char **)&resp,
+                &resp_len);
+
+  /* send the response */
+  OVS_DEBUG("response: %s", resp);
+  if (ovs_db_data_send(pdb, resp, resp_len) < 0) {
+    OVS_ERROR("send echo reply failed");
+    goto yajl_gen_failure;
+  }
+  /* clean up and return success */
+  yajl_gen_clear(jgen);
+  return (0);
+
+yajl_gen_failure:
+  /* release memory */
+  yajl_gen_clear(jgen);
+  return (-1);
+}
+
+/* Get OVS DB registered callback by YAJL val. The YAJL
+ * value should be YAJL string (UID). Returns NULL if
+ * callback hasn't been found.
+ */
+static ovs_callback_t *
+ovs_db_table_callback_get(ovs_db_t *pdb, yajl_val jid)
+{
+  char *endptr = NULL;
+  const char *suid = NULL;
+  uint64_t uid;
+
+  if (jid && YAJL_IS_STRING(jid)) {
+    suid = YAJL_GET_STRING(jid);
+    uid = (uint64_t) strtoul(suid, &endptr, 16);
+    if (*endptr == '\0' && uid)
+      return ovs_db_callback_get(pdb, uid);
+  }
+
+  return NULL;
+}
+
+/* OVS DB table update event handler.
+ * This callback is called by POLL thread if OVS DB
+ * table update callback is received from the DB
+ * server. Once registered callback found, it's called
+ * by this handler. */
+static int
+ovs_db_table_update_cb(ovs_db_t *pdb, yajl_val jnode)
+{
+  ovs_callback_t *cb = NULL;
+  yajl_val jvalue;
+  yajl_val jparams;
+  yajl_val jtable_updates;
+  yajl_val jtable_update;
+  size_t obj_len = 0;
+  const char *table_name = NULL;
+  const char *params_path[] = {"params", NULL};
+  const char *id_path[] = {"id", NULL};
+
+  /* check & get request attributes */
+  if ((jparams = yajl_tree_get(jnode, params_path, yajl_t_array)) == NULL ||
+      (yajl_tree_get(jnode, id_path, yajl_t_null) == NULL))
+    goto ovs_failure;
+
+  /* check array length: [<json-value>, <table-updates>] */
+  if (YAJL_GET_ARRAY(jparams)->len != 2)
+    goto ovs_failure;
+
+  jvalue = YAJL_GET_ARRAY(jparams)->values[0];
+  jtable_updates = YAJL_GET_ARRAY(jparams)->values[1];
+  if ((!YAJL_IS_OBJECT(jtable_updates)) || (!YAJL_IS_STRING(jvalue)))
+    goto ovs_failure;
+
+  /* find registered callback based on <json-value> */
+  cb = ovs_db_table_callback_get(pdb, jvalue);
+  if (cb == NULL || cb->table.call == NULL)
+    goto ovs_failure;
+
+  /* call registered callback */
+  cb->table.call(jtable_updates);
+  return 0;
+
+ovs_failure:
+  OVS_ERROR("invalid OVS DB table update event");
+  return (-1);
+}
+
+/* OVS DB result request handler.
+ * This callback is called by POLL thread if OVS DB
+ * result reply is received from the DB server.
+ * Once registered callback found, it's called
+ * by this handler. */
+static int
+ovs_db_result_cb(ovs_db_t *pdb, yajl_val jnode)
+{
+  ovs_callback_t *cb = NULL;
+  yajl_val jresult;
+  yajl_val jerror;
+  yajl_val jid;
+  const char *result_path[] = {"result", NULL};
+  const char *error_path[] = {"error", NULL};
+  const char *id_path[] = {"id", NULL};
+
+  jresult = yajl_tree_get(jnode, result_path, yajl_t_any);
+  jerror = yajl_tree_get(jnode, error_path, yajl_t_any);
+  jid = yajl_tree_get(jnode, id_path, yajl_t_string);
+
+  /* check & get result attributes */
+  if (!jresult || !jerror || !jid)
+    return (-1);
+
+  /* try to find registered callback */
+  cb = ovs_db_table_callback_get(pdb, jid);
+  if (cb != NULL && cb->result.call != NULL) {
+    /* call registered callback */
+    cb->result.call(jresult, jerror);
+    /* unlock owner of the reply */
+    sem_post(&cb->result.sync);
+  }
+
+  return (0);
+}
+
+/* Handle JSON data (one request) and call
+ * appropriate event OVS DB handler. Currently,
+ * update callback 'ovs_db_table_update_cb' and
+ * result callback 'ovs_db_result_cb' is supported.
+ */
+static int
+ovs_db_json_data_process(ovs_db_t *pdb, const char *data, size_t len)
+{
+  const char *method = NULL;
+  char yajl_errbuf[OVS_YAJL_ERROR_BUFFER_SIZE];
+  const char *method_path[] = {"method", NULL};
+  const char *result_path[] = {"result", NULL};
+  char *sjson = NULL;
+  yajl_val jnode, jval;
+
+  /* duplicate the data to make null-terminated string
+   * required for yajl_tree_parse() */
+  if ((sjson = strndup(data, len)) == NULL)
+    return (-1);
+
+  OVS_DEBUG("%s", sjson);
+
+  /* parse json data */
+  jnode = yajl_tree_parse(sjson, yajl_errbuf, sizeof(yajl_errbuf));
+  if (jnode == NULL) {
+    OVS_ERROR("yajl_tree_parse() %s", yajl_errbuf);
+    return (-1);
+  }
+
+  /* get method name */
+  if (jval = yajl_tree_get(jnode, method_path, yajl_t_string)) {
+    method = YAJL_GET_STRING(jval);
+    if (strcmp("echo", method) == 0) {
+      /* echo request from the server */
+      if (ovs_db_table_echo_cb(pdb, jnode) < 0)
+        OVS_ERROR("handle echo request failed");
+    } else if (strcmp("update", method) == 0) {
+      /* update notification */
+      if (ovs_db_table_update_cb(pdb, jnode) < 0)
+        OVS_ERROR("handle update notification failed");
+    }
+  } else if (jval = yajl_tree_get(jnode, result_path, yajl_t_object)) {
+    /* result notification */
+    if (ovs_db_result_cb(pdb, jnode) < 0)
+      OVS_ERROR("handle result reply failed");
+  }
+
+  /* release memory */
+  yajl_tree_free(jnode);
+  sfree(sjson);
+  return (0);
+}
+
+/*
+ * JSON reader implementation.
+ *
+ * This module process raw JSON data (byte stream) and
+ * returns fully-fledged JSON data which can be processed
+ * (parsed) by YAJL later.
+ */
+
+/* Allocate JSON reader instance */
+static inline ovs_json_reader_t *
+ovs_json_reader_alloc()
+{
+  ovs_json_reader_t *jreader = NULL;
+
+  if ((jreader = calloc(sizeof(ovs_json_reader_t), 1)) == NULL)
+    return NULL;
+
+  return jreader;
+}
+
+/* Push raw data into into the JSON reader for processing */
+static inline int
+ovs_json_reader_push_data(ovs_json_reader_t *jreader,
+                          const char *data, size_t data_len)
+{
+  char *new_buff = NULL;
+  size_t available = jreader->buff_size - jreader->buff_offset;
+
+  /* check/update required memory space */
+  if (available < data_len) {
+    OVS_DEBUG("Reallocate buffer [size=%d, available=%d required=%d]",
+              (int)jreader->buff_size, (int)available, (int)data_len);
+
+    /* allocate new chunk of memory */
+    new_buff = realloc(jreader->buff_ptr, (jreader->buff_size + data_len));
+    if (new_buff == NULL)
+      return (-1);
+
+    /* point to new allocated memory */
+    jreader->buff_ptr = new_buff;
+    jreader->buff_size += data_len;
+  }
+
+  /* store input data */
+  memcpy(jreader->buff_ptr + jreader->buff_offset, data, data_len);
+  jreader->buff_offset += data_len;
+  return (0);
+}
+
+/* Pop one fully-fledged JSON if already exists. Returns 0 if
+ * completed JSON already exists otherwise negative value is
+ * returned */
+static inline int
+ovs_json_reader_pop(ovs_json_reader_t *jreader,
+                    const char **json_ptr, size_t *json_len_ptr)
+{
+  size_t nbraces = 0;
+  size_t json_len = 0;
+  char *json = NULL;
+
+  /* search open/close brace */
+  for (int i = jreader->json_offset; i < jreader->buff_offset; i++) {
+    if (jreader->buff_ptr[i] == '{') {
+      nbraces++;
+    } else if (jreader->buff_ptr[i] == '}')
+      if (nbraces)
+        if (!(--nbraces)) {
+          /* JSON data */
+          *json_ptr = jreader->buff_ptr + jreader->json_offset;
+          *json_len_ptr = json_len + 1;
+          jreader->json_offset = i + 1;
+          return (0);
+        }
+
+    /* increase JSON data length */
+    if (nbraces)
+      json_len++;
+  }
+
+  if (jreader->json_offset) {
+    if (jreader->json_offset < jreader->buff_offset) {
+      /* shift data to the beginning of the buffer
+       * and zero rest of the buffer data */
+      json = &jreader->buff_ptr[jreader->json_offset];
+      json_len = jreader->buff_offset - jreader->json_offset;
+      for (int i = 0; i < jreader->buff_size; i++)
+        jreader->buff_ptr[i] = ((i < json_len) ? (json[i]) : (0));
+      jreader->buff_offset = json_len;
+    } else
+      /* reset the buffer */
+      jreader->buff_offset = 0;
+
+    /* data is at the beginning of the buffer */
+    jreader->json_offset = 0;
+  }
+
+  return (-1);
+}
+
+/* Reset JSON reader. It is useful when start processing
+ * new raw data. E.g.: in case of lost stream connection.
+ */
+static inline void
+ovs_json_reader_reset(ovs_json_reader_t *jreader)
+{
+  if (jreader) {
+    jreader->buff_offset = 0;
+    jreader->json_offset = 0;
+  }
+}
+
+/* Release internal data allocated for JSON reader */
+static inline void
+ovs_json_reader_free(ovs_json_reader_t *jreader)
+{
+  if (jreader) {
+    free(jreader->buff_ptr);
+    free(jreader);
+  }
+}
+
+/* Reconnect to OVD DB and call init OVS DB callback
+ * 'init_cb' if connection has been established.
+ */
+static int
+ovs_db_reconnect(ovs_db_t *pdb)
+{
+  char errbuff[OVS_ERROR_BUFF_SIZE];
+
+  /* remove all registered OVS DB table/result callbacks */
+  ovs_db_callback_remove_all(pdb);
+
+  /* open new socket */
+  if ((pdb->conn.sock = socket(pdb->conn.domain, pdb->conn.type, 0)) < 0) {
+    sstrerror(errno, errbuff, sizeof(errbuff));
+    OVS_ERROR("socket(): %s", errbuff);
+    return (-1);
+  }
+
+  /* try to connect to server */
+  if (connect(pdb->conn.sock, (struct sockaddr *)&pdb->conn.addr,
+              pdb->conn.addr_size) < 0) {
+    sstrerror(errno, errbuff, sizeof(errbuff));
+    OVS_ERROR("connect(): %s", errbuff);
+    close(pdb->conn.sock);
+    return (-1);
+  }
+
+  /* send notification to event thread */
+  ovs_db_event_post(pdb, OVS_DB_EVENT_CONNECTED);
+  return (0);
+}
+
+/* POLL worker thread.
+ * It listens on OVS DB connection for incoming
+ * requests/reply/events etc. Also, it reconnects to OVS DB
+ * if connection has been lost.
+ */
+static void *
+ovs_poll_worker(void *arg)
+{
+  ovs_db_t *pdb = (ovs_db_t *)arg;      /* pointer to OVS DB */
+  ovs_json_reader_t *jreader = NULL;
+  const char *json;
+  size_t json_len;
+  ssize_t nbytes = 0;
+  char buff[OVS_DB_POLL_READ_BLOCK_SIZE];
+  struct pollfd poll_fd;
+  int poll_ret = 0;
+
+  if ((jreader = ovs_json_reader_alloc()) == NULL) {
+    OVS_ERROR("initialize json reader failed");
+    goto thread_exit;
+  }
+
+  /* start polling data */
+  poll_fd.fd = pdb->conn.sock;
+  poll_fd.events = POLLIN | POLLPRI;
+  poll_fd.revents = 0;
+
+  /* poll data */
+  while (ovs_db_poll_is_running(pdb)) {
+    poll_ret = poll(&poll_fd, 1, /* ms */ OVS_DB_POLL_TIMEOUT * 1000);
+    if (poll_ret > 0) {
+      if (poll_fd.revents & POLLNVAL) {
+        /* invalid file descriptor, reconnect */
+        if (ovs_db_reconnect(pdb) != 0) {
+          /* sleep awhile until next reconnect */
+          usleep(OVS_DB_RECONNECT_TIMEOUT * 1000000);
+        }
+        ovs_json_reader_reset(jreader);
+        poll_fd.fd = pdb->conn.sock;
+      } else if ((poll_fd.revents & POLLERR) || (poll_fd.revents & POLLHUP)) {
+        /* connection is broken */
+        OVS_ERROR("poll() peer closed its end of the channel");
+        close(poll_fd.fd);
+      } else if ((poll_fd.revents & POLLIN) || (poll_fd.revents & POLLPRI)) {
+        /* read incoming data */
+        nbytes = recv(poll_fd.fd, buff, OVS_DB_POLL_READ_BLOCK_SIZE, 0);
+        if (nbytes > 0) {
+          OVS_DEBUG("recv(): received %d bytes of data", (int)nbytes);
+          ovs_json_reader_push_data(jreader, buff, nbytes);
+          while (!ovs_json_reader_pop(jreader, &json, &json_len))
+            /* process JSON data */
+            ovs_db_json_data_process(pdb, json, json_len);
+        } else if (nbytes == 0) {
+          OVS_ERROR("recv() peer has performed an orderly shutdown");
+          close(poll_fd.fd);
+        } else {
+          OVS_ERROR("recv() receive data error");
+          break;
+        }
+      }                         /* poll() POLLIN & POLLPRI */
+    } else if (poll_ret == 0)
+      OVS_DEBUG("poll() timeout");
+    else {
+      OVS_ERROR("poll() error");
+      break;
+    }
+  }
+
+thread_exit:
+  OVS_DEBUG("poll thread has been completed");
+  ovs_json_reader_free(jreader);
+  pthread_exit((void *)0);
+  return ((void *)0);
+}
+
+/* EVENT worker thread.
+ * Perform task based on incoming events. This
+ * task can be done asynchronously which allows to
+ * handle OVD DB callback like 'init_cb'.
+ */
+static void *
+ovs_event_worker(void *arg)
+{
+  int ret = 0;
+  ovs_db_t *pdb = (ovs_db_t *)arg;
+  struct timespec ts;
+
+  while (pdb->event_thread.value != OVS_DB_EVENT_TERMINATE) {
+    /* wait for an event */
+    clock_gettime(CLOCK_REALTIME, &ts);
+    ts.tv_sec += (OVS_DB_EVENT_TIMEOUT);
+    ret = pthread_cond_timedwait(&pdb->event_thread.cond,
+                                 &pdb->event_thread.mutex, &ts);
+    if (!ret) {
+      /* handle the event */
+      OVS_DEBUG("handle event %d", pdb->event_thread.value);
+      if (pdb->event_thread.value == OVS_DB_EVENT_CONNECTED)
+        if (pdb->init_cb)
+          pdb->init_cb(pdb);
+    } else if (ret == ETIMEDOUT) {
+      /* wait timeout */
+      OVS_DEBUG("no event received (timeout)");
+      continue;
+    } else {
+      /* unexpected error */
+      OVS_ERROR("pthread_cond_timedwait() failed");
+      break;
+    }
+  }
+
+thread_exit:
+  OVS_DEBUG("event thread has been completed");
+  pthread_exit((void *)0);
+  return ((void *)0);
+}
+
+/* Stop EVENT thread */
+static int
+ovs_db_event_thread_stop(ovs_db_t *pdb)
+{
+  ovs_db_event_post(pdb, OVS_DB_EVENT_TERMINATE);
+  if (pthread_join(pdb->event_thread.tid, NULL) != 0)
+    return (-1);
+  pthread_mutex_unlock(&pdb->event_thread.mutex);
+  pthread_mutex_destroy(&pdb->event_thread.mutex);
+  return (0);
+}
+
+/* Stop POLL thread */
+static int
+ovs_db_poll_thread_stop(ovs_db_t *pdb)
+{
+  ovs_db_poll_terminate(pdb);
+  if (pthread_join(pdb->poll_thread.tid, NULL) != 0)
+    return (-1);
+  pthread_mutex_destroy(&pdb->poll_thread.mutex);
+  return (0);
+}
+
+/*
+ * Public OVS DB API implementation
+ */
+
+ovs_db_t *
+ovs_db_init(const char *surl, ovs_db_callback_t *cb)
+{
+  pthread_mutexattr_t mutex_attr;
+  ovs_db_t *pdb = NULL;
+
+  /* allocate db data & fill it */
+  if ((pdb = calloc(1, sizeof(*pdb))) == NULL)
+    return (NULL);
+
+  /* convert string url to socket addr */
+  if (ovs_db_url_parse(surl, &pdb->conn) < 0)
+    goto failure;
+
+  /* setup OVS DB callbacks */
+  if (cb)
+    pdb->init_cb = cb->init_cb;
+
+  /* prepare event thread */
+  pthread_cond_init(&pdb->event_thread.cond, NULL);
+  pthread_mutex_init(&pdb->event_thread.mutex, NULL);
+  pthread_mutex_lock(&pdb->event_thread.mutex);
+  if (plugin_thread_create(&pdb->event_thread.tid, NULL,
+                           ovs_event_worker, pdb) != 0) {
+    OVS_ERROR("event worker start failed");
+    goto failure;
+  }
+
+  /* prepare polling thread */
+  ovs_db_reconnect(pdb);
+  pdb->poll_thread.state = OVS_DB_POLL_STATE_RUNNING;
+  pthread_mutex_init(&pdb->poll_thread.mutex, NULL);
+  if (plugin_thread_create(&pdb->poll_thread.tid, NULL,
+                           ovs_poll_worker, pdb) != 0) {
+    OVS_ERROR("pull worker start failed");
+    goto failure;
+  }
+
+  /* init OVS DB mutex */
+  if (pthread_mutexattr_init(&mutex_attr) ||
+      pthread_mutexattr_settype(&mutex_attr, PTHREAD_MUTEX_RECURSIVE) ||
+      pthread_mutex_init(&pdb->mutex, &mutex_attr)) {
+    OVS_ERROR("OVS DB mutex init failed");
+    goto failure;
+  }
+
+  /* return db to the caller */
+  return pdb;
+
+failure:
+  if (pdb->conn.sock)
+    /* close connection */
+    close(pdb->conn.sock);
+  if (pdb->event_thread.tid != 0)
+    /* stop event thread */
+    if (ovs_db_event_thread_stop(pdb) < 0)
+      OVS_ERROR("stop event thread failed");
+  if (pdb->poll_thread.tid != 0)
+    /* stop poll thread */
+    if (ovs_db_poll_thread_stop(pdb) < 0)
+      OVS_ERROR("stop poll thread failed");
+  sfree(pdb);
+  return NULL;
+}
+
+int
+ovs_db_send_request(ovs_db_t *pdb, const char *method,
+                    const char *params, ovs_db_result_cb_t cb)
+{
+  int ret = 0;
+  yajl_gen_status yajl_gen_ret;
+  yajl_val jparams;
+  yajl_gen jgen;
+  ovs_callback_t *new_cb = NULL;
+  uint64_t uid;
+  char uid_buff[OVS_UID_STR_SIZE];
+  const char *req = NULL;
+  size_t req_len = 0;
+  struct timespec ts;
+
+  /* sanity check */
+  if (!pdb || !method || !params)
+    return (-1);
+
+  if ((jgen = yajl_gen_alloc(NULL)) == NULL)
+    return (-1);
+
+  /* try to parse params */
+  if ((jparams = yajl_tree_parse(params, NULL, 0)) == NULL) {
+    OVS_ERROR("params is not a JSON string");
+    yajl_gen_clear(jgen);
+    return (-1);
+  }
+
+  /* generate method field */
+  OVS_YAJL_CALL(yajl_gen_map_open, jgen);
+
+  OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "method");
+  OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, method);
+
+  /* generate params field */
+  OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "params");
+  OVS_YAJL_CALL(ovs_yajl_gen_val, jgen, jparams);
+  yajl_tree_free(jparams);
+
+  /* generate id field */
+  OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "id");
+  uid = ovs_uid_generate();
+  ssnprintf(uid_buff, sizeof(uid_buff), "%" PRIX64, uid);
+  OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, uid_buff);
+
+  OVS_YAJL_CALL(yajl_gen_map_close, jgen);
+
+  if (cb) {
+    /* register result callback */
+    if ((new_cb = malloc(sizeof(ovs_callback_t))) == NULL)
+      goto yajl_gen_failure;
+
+    /* add new callback to front */
+    sem_init(&new_cb->result.sync, 0, 0);
+    new_cb->result.call = cb;
+    new_cb->uid = uid;
+    ovs_db_callback_add(pdb, new_cb);
+  }
+
+  /* send the request */
+  OVS_YAJL_CALL(yajl_gen_get_buf, jgen, (const unsigned char **)&req,
+                &req_len);
+  OVS_DEBUG("%s", req);
+  if (!ovs_db_data_send(pdb, req, req_len)) {
+    if (cb) {
+      /* wait for result */
+      clock_gettime(CLOCK_REALTIME, &ts);
+      ts.tv_sec += OVS_DB_SEND_REQ_TIMEOUT;
+      if (sem_timedwait(&new_cb->result.sync, &ts) < 0) {
+        OVS_ERROR("%s() no replay received within %d sec", __FUNCTION__,
+                  OVS_DB_SEND_REQ_TIMEOUT);
+        ret = (-1);
+      }
+    }
+  } else {
+    OVS_ERROR("ovs_db_data_send() failed");
+    ret = (-1);
+  }
+
+yajl_gen_failure:
+  if (new_cb) {
+    /* destroy callback */
+    sem_destroy(&new_cb->result.sync);
+    ovs_db_callback_remove(pdb, new_cb);
+  }
+
+  /* release memory */
+  yajl_gen_clear(jgen);
+  return (yajl_gen_ret != yajl_gen_status_ok) ? (-1) : ret;
+}
+
+int
+ovs_db_table_cb_register(ovs_db_t *pdb, const char *tb_name,
+                         const char **tb_column, ovs_db_table_cb_t update_cb,
+                         ovs_db_result_cb_t result_cb, unsigned int flags)
+{
+  yajl_gen jgen;
+  yajl_gen_status yajl_gen_ret;
+  ovs_callback_t *new_cb = NULL;
+  char uid_str[OVS_UID_STR_SIZE];
+  char *params;
+  size_t params_len;
+  int ovs_db_ret = 0;
+
+  /* sanity check */
+  if (pdb == NULL || tb_name == NULL || update_cb == NULL)
+    return (-1);
+
+  if ((jgen = yajl_gen_alloc(NULL)) == NULL)
+    return (-1);
+
+  /* register table update callback */
+  if ((new_cb = malloc(sizeof(ovs_callback_t))) == NULL)
+    return (-1);
+
+  /* add new callback to front */
+  new_cb->table.call = update_cb;
+  new_cb->uid = ovs_uid_generate();
+  ovs_db_callback_add(pdb, new_cb);
+
+  /* make update notification request
+   * [<db-name>, <json-value>, <monitor-requests>] */
+  OVS_YAJL_CALL(yajl_gen_array_open, jgen);
+  {
+    OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, OVS_DB_DEFAULT_DB_NAME);
+
+    /* uid string <json-value> */
+    ssnprintf(uid_str, sizeof(uid_str), "%" PRIX64, new_cb->uid);
+    OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, uid_str);
+
+    /* <monitor-requests> */
+    OVS_YAJL_CALL(yajl_gen_map_open, jgen);
+    {
+      OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, tb_name);
+      OVS_YAJL_CALL(yajl_gen_array_open, jgen);
+      {
+        /* <monitor-request> */
+        OVS_YAJL_CALL(yajl_gen_map_open, jgen);
+        {
+          if (tb_column) {
+            /* columns within the table to be monitored */
+            OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "columns");
+            OVS_YAJL_CALL(yajl_gen_array_open, jgen);
+            for (; *tb_column; tb_column++)
+              OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, *tb_column);
+            OVS_YAJL_CALL(yajl_gen_array_close, jgen);
+          }
+          /* specify select option */
+          OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "select");
+          {
+            OVS_YAJL_CALL(yajl_gen_map_open, jgen);
+            {
+              OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "initial");
+              OVS_YAJL_CALL(yajl_gen_bool, jgen,
+                            flags & OVS_DB_TABLE_CB_FLAG_INITIAL);
+              OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "insert");
+              OVS_YAJL_CALL(yajl_gen_bool, jgen,
+                            flags & OVS_DB_TABLE_CB_FLAG_INSERT);
+              OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "delete");
+              OVS_YAJL_CALL(yajl_gen_bool, jgen,
+                            flags & OVS_DB_TABLE_CB_FLAG_DELETE);
+              OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "modify");
+              OVS_YAJL_CALL(yajl_gen_bool, jgen,
+                            flags & OVS_DB_TABLE_CB_FLAG_MODIFY);
+            }
+            OVS_YAJL_CALL(yajl_gen_map_close, jgen);
+          }
+        }
+        OVS_YAJL_CALL(yajl_gen_map_close, jgen);
+      }
+      OVS_YAJL_CALL(yajl_gen_array_close, jgen);
+    }
+    OVS_YAJL_CALL(yajl_gen_map_close, jgen);
+  }
+  OVS_YAJL_CALL(yajl_gen_array_close, jgen);
+
+  /* make a request to subscribe to given table */
+  OVS_YAJL_CALL(yajl_gen_get_buf, jgen, (const unsigned char **)&params,
+                &params_len);
+  if (ovs_db_send_request(pdb, "monitor", params, result_cb) < 0) {
+    OVS_ERROR("Failed to subscribe to \"%s\" table", tb_name);
+    ovs_db_ret = (-1);
+  }
+
+yajl_gen_failure:
+  /* release memory */
+  yajl_gen_clear(jgen);
+  return ovs_db_ret;
+}
+
+int
+ovs_db_destroy(ovs_db_t *pdb)
+{
+  int ovs_db_ret = 0;
+  int ret = 0;
+
+  /* sanity check */
+  if (pdb == NULL)
+    return (-1);
+
+  /* try to lock the structure before releasing */
+  if (ret = pthread_mutex_lock(&pdb->mutex)) {
+    OVS_ERROR("pthread_mutex_lock() DB mutext lock failed (%d)", ret);
+    return (-1);
+  }
+
+  /* stop poll thread */
+  if (ovs_db_event_thread_stop(pdb) < 0) {
+    OVS_ERROR("stop poll thread failed");
+    ovs_db_ret = (-1);
+  }
+
+  /* stop event thread */
+  if (ovs_db_poll_thread_stop(pdb) < 0) {
+    OVS_ERROR("stop event thread failed");
+    ovs_db_ret = (-1);
+  }
+
+  /* unsubscribe callbacks */
+  ovs_db_callback_remove_all(pdb);
+
+  /* close connection */
+  if (pdb->conn.sock)
+    close(pdb->conn.sock);
+
+  /* release DB handler */
+  pthread_mutex_unlock(&pdb->mutex);
+  pthread_mutex_destroy(&pdb->mutex);
+  sfree(pdb);
+  return ovs_db_ret;
+}
+
+/*
+ * Public OVS utils API implementation
+ */
+
+/* Get YAJL value by key from YAJL dictionary */
+yajl_val
+ovs_utils_get_value_by_key(yajl_val jval, const char *key)
+{
+  const char *obj_key = NULL;
+
+  /* check params */
+  if (!YAJL_IS_OBJECT(jval) || !key)
+    return NULL;
+
+  /* find a value by key */
+  for (int i = 0; i < YAJL_GET_OBJECT(jval)->len; i++) {
+    obj_key = YAJL_GET_OBJECT(jval)->keys[i];
+    if (strcmp(obj_key, key) == 0)
+      return YAJL_GET_OBJECT(jval)->values[i];
+  }
+
+  return NULL;
+}
diff --git a/src/utils_ovs.h b/src/utils_ovs.h
new file mode 100644 (file)
index 0000000..484260f
--- /dev/null
@@ -0,0 +1,202 @@
+/**
+ * collectd - src/utils_ovs.h
+ *
+ * Copyright(c) 2016 Intel Corporation. All rights reserved.
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy of
+ * this software and associated documentation files (the "Software"), to deal in
+ * the Software without restriction, including without limitation the rights to
+ * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
+ * of the Software, and to permit persons to whom the Software is furnished to do
+ * so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ *
+ * Authors:
+ *   Volodymyr Mytnyk <volodymyrx.mytnyk@intel.com>
+ *
+ * Description:
+ *  The OVS util module provides the following features:
+ *   - Implements the OVS DB communication transport specified
+ *     by RFC7047:
+ *     * Connect/disconnect to OVS DB;
+ *     * Recovery mechanism in case of OVS DB connection lost;
+ *     * Subscription mechanism to OVS DB table update events
+ *       (insert/modify/delete);
+ *     * Send custom JSON request to OVS DB (poll table data, etc.)
+ *     * Handling of echo request from OVS DB server to verify the
+ *       liveness of the connection.
+ *   - Provides YAJL helpers functions.
+ *
+ *  OVS DB API User Guide:
+ *    All OVS DB function/structure names begins from 'ovs_db_*' prefix. To
+ *   start using OVS DB API, client (plugin) should initialize the OVS DB
+ *   object (`ovs_db_t') by calling `ovs_db_init' function. It initializes
+ *   internal data and creates two main workers (threads). The result of the
+ *   function is a pointer to new OVS DB object which can be used by other
+ *   OVS DB API later and must be released by `ovs_db_destroy' function if
+ *   the object isn't needed anymore.
+ *    Once OVS DB API is initialized, the `init_cb' callback is called if
+ *   the connection to OVS DB has been established. This callback is called
+ *   every time the OVS DB is reconnected. So, if the client registers table
+ *   update event callbacks or does any other OVS DB setup that can be lost
+ *   after OVS DB reconnecting, it should be done in `init_cb' callback.
+ *    The `ovs_db_table_cb_register` function is used to register OVS DB
+ *   table update event callback and receive the table update notification
+ *   when requested event occurs (registered callback is called). See
+ *   function API for more info.
+ *    To send custom JSON-RPC request to OVS DB, the `ovs_db_send_request'
+ *   function is used. Please note, that connection to OVS DB should be
+ *   established otherwise the function will return error.
+ *    To verify the liveness of established connection, the OVS DB server
+ *   sends echo request to the client with a given interval. The OVS utils
+ *   takes care about this request and handles it properly.
+ **/
+
+#ifndef UTILS_OVS_H
+#define UTILS_OVS_H
+
+#include <yajl/yajl_tree.h>
+#include <yajl/yajl_gen.h>
+
+/* Forward declaration */
+typedef struct ovs_db_s ovs_db_t;
+
+/* OVS DB callback type declaration */
+typedef void (*ovs_db_init_cb_t) (ovs_db_t *pdb);
+typedef void (*ovs_db_table_cb_t) (yajl_val jupdates);
+typedef void (*ovs_db_result_cb_t) (yajl_val jresult, yajl_val jerror);
+
+/* OVS DB structures */
+struct ovs_db_callback_s {
+  ovs_db_init_cb_t init_cb;
+};
+typedef struct ovs_db_callback_s ovs_db_callback_t;
+
+/* OVS DB prototypes */
+
+/*
+ * NAME
+ *   ovs_db_init
+ *
+ * DESCRIPTION
+ *   Initialize OVS DB internal data. The `ovs_db_destroy' function
+ *   shall destroy the returned object.
+ *
+ * PARAMETERS
+ *   `surl'        OVS DB communication URL.
+ *   `cb'          OVS DB callbacks.
+ *
+ * RETURN VALUE
+ *   New ovs_db_t object upon success or NULL if an error occurred.
+ */
+ovs_db_t *ovs_db_init(const char *surl, ovs_db_callback_t *cb);
+
+/*
+ * NAME
+ *   ovs_db_destroy
+ *
+ * DESCRIPTION
+ *   Destroy OVS DB object referenced by `pdb'.
+ *
+ * PARAMETERS
+ *   `pdb'         Pointer to OVS DB object.
+ *
+ * RETURN VALUE
+ *   Zero upon success or non-zero if an error occurred.
+ */
+int ovs_db_destroy(ovs_db_t *pdb);
+
+/*
+ * NAME
+ *   ovs_db_send_request
+ *
+ * DESCRIPTION
+ *   Send JSON request to OVS DB server.
+ *
+ * PARAMETERS
+ *   `pdb'         Pointer to OVS DB object.
+ *   `method'      Request method name.
+ *   `params'      Method params to be sent (JSON value as a string).
+ *   `cb'          Result callback of the request. If NULL, the request
+ *                 is sent asynchronously.
+ *
+ * RETURN VALUE
+ *   Zero upon success or non-zero if an error occurred.
+ */
+int ovs_db_send_request(ovs_db_t *pdb, const char *method,
+                        const char *params, ovs_db_result_cb_t cb);
+
+/* callback types */
+#define OVS_DB_TABLE_CB_FLAG_INITIAL 0x01U
+#define OVS_DB_TABLE_CB_FLAG_INSERT  0x02U
+#define OVS_DB_TABLE_CB_FLAG_DELETE  0x04U
+#define OVS_DB_TABLE_CB_FLAG_MODIFY  0x08U
+#define OVS_DB_TABLE_CB_FLAG_ALL     0x0FU
+
+/*
+ * NAME
+ *   ovs_db_table_cb_register
+ *
+ * DESCRIPTION
+ *   Subscribe a callback on OVS DB table event. It allows to
+ *   receive notifications (`update_cb' callback is called) of
+ *   changes to requested table.
+ *
+ * PARAMETERS
+ *   `pdb'         Pointer to OVS DB object.
+ *   `tb_name'     OVS DB Table name to be monitored.
+ *   `tb_column'   OVS DB Table columns to be monitored. Last
+ *                 element in the array should be NULL.
+ *   `update_cb'   Callback function that is called when
+ *                 requested table columns are changed.
+ *   `cb'          Result callback of the request. If NULL, the call
+ *                 becomes asynchronous.
+ *                 Useful, if OVS_DB_TABLE_CB_FLAG_INITIAL is set.
+ *   `flags'       Bit mask of:
+ *                   OVS_DB_TABLE_CB_FLAG_INITIAL Receive initial values in
+ *                                               result callback.
+ *                   OVS_DB_TABLE_CB_FLAG_INSERT  Receive table insert events.
+ *                   OVS_DB_TABLE_CB_FLAG_DELETE  Receive table remove events.
+ *                   OVS_DB_TABLE_CB_FLAG_MODIFY  Receive table update events.
+ *                   OVS_DB_TABLE_CB_FLAG_ALL     Receive all events.
+ *
+ * RETURN VALUE
+ *   Zero upon success or non-zero if an error occurred.
+ */
+int ovs_db_table_cb_register(ovs_db_t *pdb, const char *tb_name,
+                             const char **tb_column,
+                             ovs_db_table_cb_t update_cb,
+                             ovs_db_result_cb_t result_cb,
+                             unsigned int flags);
+
+/*
+ * OVS utils API
+ */
+
+/*
+ * NAME
+ *   ovs_utils_get_value_by_key
+ *
+ * DESCRIPTION
+ *   Get YAJL value by object name.
+ *
+ * PARAMETERS
+ *   `jval'        YAJL object value.
+ *   `key'         Object key name.
+ *
+ * RETURN VALUE
+ *   YAJL value upon success or NULL if key not found.
+ */
+yajl_val ovs_utils_get_value_by_key(yajl_val jval, const char *key);
+
+#endif