OVS events: Fix multiple interface config issue
[collectd.git] / src / utils_ovs.c
index 4c99eef..2a4bdf8 100644 (file)
  *
  * Authors:
  *   Volodymyr Mytnyk <volodymyrx.mytnyk@intel.com>
- *
+ **/
+
+/* clang-format off */
+/*
  *                         OVS DB API internal architecture diagram
  * +------------------------------------------------------------------------------+
  * |OVS plugin      |OVS utils                                                    |
  * +-------------------+----------------------------------------------+-----------+
  * |                                 TCP/UNIX socket                              |
  * +-------------------------------------------------------------------------------
- *
- **/
+ */
+/* clang-format on */
 
 /* collectd headers */
+#include "collectd.h"
+
 #include "common.h"
 
 /* private headers */
 #include "utils_ovs.h"
 
 /* system libraries */
-#include <semaphore.h>
+#if HAVE_NETDB_H
+#include <netdb.h>
+#endif
+#if HAVE_ARPA_INET_H
 #include <arpa/inet.h>
+#endif
+#if HAVE_POLL_H
 #include <poll.h>
+#endif
+#if HAVE_SYS_UN_H
 #include <sys/un.h>
+#endif
 
-#define OVS_ERROR(fmt, ...) do { \
-  ERROR("ovs_utils: "fmt, ## __VA_ARGS__); } while (0)
-#define OVS_DEBUG(fmt, ...) do { \
-  DEBUG("%s:%d:%s(): "fmt, __FILE__, __LINE__, __FUNCTION__, \
-        ## __VA_ARGS__); } while (0)
+#include <semaphore.h>
 
-#define OVS_DB_POLL_TIMEOUT          1  /* poll receive timeout (sec) */
-#define OVS_DB_POLL_READ_BLOCK_SIZE  5  /* read block size (bytes) */
-#define OVS_DB_DEFAULT_DB_NAME       "Open_vSwitch"
-#define OVS_DB_RECONNECT_TIMEOUT     1  /* reconnect timeout (sec) */
+#define OVS_ERROR(fmt, ...)                                                    \
+  do {                                                                         \
+    ERROR("ovs_utils: " fmt, ##__VA_ARGS__);                                   \
+  } while (0)
+#define OVS_DEBUG(fmt, ...)                                                    \
+  do {                                                                         \
+    DEBUG("%s:%d:%s(): " fmt, __FILE__, __LINE__, __FUNCTION__,                \
+          ##__VA_ARGS__);                                                      \
+  } while (0)
+
+#define OVS_DB_POLL_TIMEOUT 1           /* poll receive timeout (sec) */
+#define OVS_DB_POLL_READ_BLOCK_SIZE 512 /* read block size (bytes) */
+#define OVS_DB_DEFAULT_DB_NAME "Open_vSwitch"
+#define OVS_DB_RECONNECT_TIMEOUT 1 /* reconnect timeout (sec) */
 
-#define OVS_DB_EVENT_TIMEOUT         5  /* event thread timeout (sec) */
-#define OVS_DB_EVENT_TERMINATE       1
-#define OVS_DB_EVENT_CONNECTED       2
+#define OVS_DB_EVENT_TIMEOUT 5 /* event thread timeout (sec) */
+#define OVS_DB_EVENT_TERMINATE 1
+#define OVS_DB_EVENT_CONN_ESTABLISHED 2
+#define OVS_DB_EVENT_CONN_TERMINATED 3
 
-#define OVS_DB_POLL_STATE_RUNNING    1
-#define OVS_DB_POLL_STATE_EXITING    2
+#define OVS_DB_POLL_STATE_RUNNING 1
+#define OVS_DB_POLL_STATE_EXITING 2
 
-#define OVS_DB_SEND_REQ_TIMEOUT      5  /* send request timeout (sec) */
+#define OVS_DB_SEND_REQ_TIMEOUT 5 /* send request timeout (sec) */
 
-#define OVS_YAJL_CALL(func, ...) \
-  do { \
-    yajl_gen_ret = yajl_gen_status_ok; \
-    if ((yajl_gen_ret = func(__VA_ARGS__)) != yajl_gen_status_ok) \
-      goto yajl_gen_failure; \
+#define OVS_YAJL_CALL(func, ...)                                               \
+  do {                                                                         \
+    yajl_gen_ret = yajl_gen_status_ok;                                         \
+    if ((yajl_gen_ret = func(__VA_ARGS__)) != yajl_gen_status_ok)              \
+      goto yajl_gen_failure;                                                   \
   } while (0)
-#define OVS_YAJL_ERROR_BUFFER_SIZE       1024
-#define OVS_ERROR_BUFF_SIZE              512
-#define OVS_UID_STR_SIZE                 17     /* 64-bit HEX string len + '\0' */
+#define OVS_YAJL_ERROR_BUFFER_SIZE 1024
+#define OVS_ERROR_BUFF_SIZE 512
+#define OVS_UID_STR_SIZE 17 /* 64-bit HEX string len + '\0' */
 
 /* JSON reader internal data */
 struct ovs_json_reader_s {
@@ -137,19 +157,6 @@ struct ovs_callback_s {
 };
 typedef struct ovs_callback_s ovs_callback_t;
 
-/* Connection declaration */
-struct ovs_conn_s {
-  int sock;
-  int domain;
-  int type;
-  int addr_size;
-  union {
-    struct sockaddr_in s_inet;
-    struct sockaddr_un s_unix;
-  } addr;
-};
-typedef struct ovs_conn_s ovs_conn_t;
-
 /* Event thread data declaration */
 struct ovs_event_thread_s {
   pthread_t tid;
@@ -172,20 +179,20 @@ struct ovs_db_s {
   ovs_poll_thread_t poll_thread;
   ovs_event_thread_t event_thread;
   pthread_mutex_t mutex;
-  ovs_callback_t *cb;
-  ovs_conn_t conn;
-  ovs_db_init_cb_t init_cb;
+  ovs_callback_t *remote_cb;
+  ovs_db_callback_t cb;
+  char service[OVS_DB_ADDR_SERVICE_SIZE];
+  char node[OVS_DB_ADDR_NODE_SIZE];
+  int sock;
 };
-typedef struct ovs_db_s ovs_db_t;
 
 /* Post an event to event thread.
  * Possible events are:
  *  OVS_DB_EVENT_TERMINATE
- *  OVS_DB_EVENT_CONNECTED
+ *  OVS_DB_EVENT_CONN_ESTABLISHED
+ *  OVS_DB_EVENT_CONN_TERMINATED
  */
-static void
-ovs_db_event_post(ovs_db_t *pdb, int event)
-{
+static void ovs_db_event_post(ovs_db_t *pdb, int event) {
   pthread_mutex_lock(&pdb->event_thread.mutex);
   pdb->event_thread.value = event;
   pthread_mutex_unlock(&pdb->event_thread.mutex);
@@ -194,9 +201,7 @@ ovs_db_event_post(ovs_db_t *pdb, int event)
 
 /* Check if POLL thread is still running. Returns
  * 1 if running otherwise 0 is returned */
-static inline int
-ovs_db_poll_is_running(ovs_db_t *pdb)
-{
+static _Bool ovs_db_poll_is_running(ovs_db_t *pdb) {
   int state = 0;
   pthread_mutex_lock(&pdb->poll_thread.mutex);
   state = pdb->poll_thread.state;
@@ -205,9 +210,7 @@ ovs_db_poll_is_running(ovs_db_t *pdb)
 }
 
 /* Terminate POLL thread */
-static inline void
-ovs_db_poll_terminate(ovs_db_t *pdb)
-{
+static void ovs_db_poll_terminate(ovs_db_t *pdb) {
   pthread_mutex_lock(&pdb->poll_thread.mutex);
   pdb->poll_thread.state = OVS_DB_POLL_STATE_EXITING;
   pthread_mutex_unlock(&pdb->poll_thread.mutex);
@@ -215,9 +218,7 @@ ovs_db_poll_terminate(ovs_db_t *pdb)
 
 /* Generate unique identifier (UID). It is used by OVS DB API
  * to set "id" field for any OVS DB JSON request. */
-static uint64_t
-ovs_uid_generate()
-{
+static uint64_t ovs_uid_generate() {
   struct timespec ts;
   clock_gettime(CLOCK_MONOTONIC, &ts);
   return ((ts.tv_sec << 32) | (ts.tv_nsec & UINT32_MAX));
@@ -229,22 +230,18 @@ ovs_uid_generate()
  */
 
 /* Add new callback into OVS DB object */
-static void
-ovs_db_callback_add(ovs_db_t *pdb, ovs_callback_t *new_cb)
-{
+static void ovs_db_callback_add(ovs_db_t *pdb, ovs_callback_t *new_cb) {
   pthread_mutex_lock(&pdb->mutex);
-  if (pdb->cb)
-    pdb->cb->prev = new_cb;
-  new_cb->next = pdb->cb;
+  if (pdb->remote_cb)
+    pdb->remote_cb->prev = new_cb;
+  new_cb->next = pdb->remote_cb;
   new_cb->prev = NULL;
-  pdb->cb = new_cb;
+  pdb->remote_cb = new_cb;
   pthread_mutex_unlock(&pdb->mutex);
 }
 
 /* Remove callback from OVS DB object */
-static void
-ovs_db_callback_remove(ovs_db_t *pdb, ovs_callback_t *del_cb)
-{
+static void ovs_db_callback_remove(ovs_db_t *pdb, ovs_callback_t *del_cb) {
   ovs_callback_t *pre_cb = del_cb->prev;
   ovs_callback_t *next_cb = del_cb->next;
 
@@ -255,52 +252,48 @@ ovs_db_callback_remove(ovs_db_t *pdb, ovs_callback_t *del_cb)
   if (pre_cb)
     pre_cb->next = del_cb->next;
   else
-    pdb->cb = del_cb->next;
+    pdb->remote_cb = del_cb->next;
 
   free(del_cb);
   pthread_mutex_unlock(&pdb->mutex);
 }
 
 /* Remove all callbacks form OVS DB object */
-static void
-ovs_db_callback_remove_all(ovs_db_t *pdb)
-{
+static void ovs_db_callback_remove_all(ovs_db_t *pdb) {
   pthread_mutex_lock(&pdb->mutex);
-  for (ovs_callback_t *del_cb = pdb->cb; pdb->cb; del_cb = pdb->cb) {
-    pdb->cb = pdb->cb->next;
+  for (ovs_callback_t *del_cb = pdb->remote_cb; pdb->remote_cb;
+       del_cb = pdb->remote_cb) {
+    pdb->remote_cb = del_cb->next;
     free(del_cb);
   }
-  pdb->cb = NULL;
+  pdb->remote_cb = NULL;
   pthread_mutex_unlock(&pdb->mutex);
 }
 
 /* Get/find callback in OVS DB object by UID. Returns pointer
- * to requested callback otherwise NULL is returned */
-static ovs_callback_t *
-ovs_db_callback_get(ovs_db_t *pdb, uint64_t uid)
-{
-  pthread_mutex_lock(&pdb->mutex);
-  for (ovs_callback_t *cb = pdb->cb; cb != NULL; cb = cb->next)
-    if (cb->uid == uid) {
-      pthread_mutex_unlock(&pdb->mutex);
+ * to requested callback otherwise NULL is returned.
+ *
+ * IMPORTANT NOTE:
+ *   The OVS DB mutex should be locked by the caller
+ *   to make sure that returned callback is still valid.
+ */
+static ovs_callback_t *ovs_db_callback_get(ovs_db_t *pdb, uint64_t uid) {
+  for (ovs_callback_t *cb = pdb->remote_cb; cb != NULL; cb = cb->next)
+    if (cb->uid == uid)
       return cb;
-    }
-  pthread_mutex_unlock(&pdb->mutex);
   return NULL;
 }
 
 /* Send all requested data to the socket. Returns 0 if
  * ALL request data has been sent otherwise negative value
  * is returned */
-static int
-ovs_db_data_send(const ovs_db_t *pdb, const char *data, size_t len)
-{
+static int ovs_db_data_send(const ovs_db_t *pdb, const char *data, size_t len) {
   ssize_t nbytes = 0;
   size_t rem = len;
   size_t off = 0;
 
   while (rem > 0) {
-    if ((nbytes = send(pdb->conn.sock, data + off, rem, 0)) <= 0)
+    if ((nbytes = send(pdb->sock, data + off, rem, 0)) <= 0)
       return (-1);
     rem -= (size_t)nbytes;
     off += (size_t)nbytes;
@@ -308,73 +301,6 @@ ovs_db_data_send(const ovs_db_t *pdb, const char *data, size_t len)
   return (0);
 }
 
-/* Parse OVS server URL.
- * Format of the URL:
- *   "tcp:a.b.c.d:port" - define TCP connection (INET domain)
- *   "unix:file" - define UNIX socket file (UNIX domain)
- */
-static int
-ovs_db_url_parse(const char *surl, ovs_conn_t *conn)
-{
-  ovs_conn_t tmp_conn;
-  char *nexttok = NULL;
-  char *in_str = NULL;
-  char *saveptr;
-  int ret = 0;
-
-  /* sanity check */
-  if ((surl == NULL) || (strlen(surl) < 1))
-    return (-1);
-
-  /* parse domain */
-  tmp_conn = *conn;
-  in_str = sstrdup(surl);
-  if ((nexttok = strtok_r(in_str, ":", &saveptr)) != NULL) {
-    if (strcmp("tcp", nexttok) == 0) {
-      tmp_conn.domain = AF_INET;
-      tmp_conn.type = SOCK_STREAM;
-      tmp_conn.addr_size = sizeof(tmp_conn.addr.s_inet);
-    } else if (strcmp("unix", nexttok) == 0) {
-      tmp_conn.domain = AF_UNIX;
-      tmp_conn.type = SOCK_STREAM;
-      tmp_conn.addr_size = sizeof(tmp_conn.addr.s_unix);
-    } else
-      goto failure;
-  } else
-    goto failure;
-
-  /* parse url depending on domain */
-  if ((nexttok = strtok_r(NULL, ":", &saveptr)) != NULL) {
-    if (tmp_conn.domain == AF_UNIX) {
-      /* <UNIX-NAME> */
-      tmp_conn.addr.s_inet.sin_family = AF_UNIX;
-      sstrncpy(tmp_conn.addr.s_unix.sun_path, nexttok, strlen(nexttok) + 1);
-    } else {
-      /* <IP:PORT> */
-      tmp_conn.addr.s_inet.sin_family = AF_INET;
-      ret =
-        inet_pton(AF_INET, nexttok, (void *)&tmp_conn.addr.s_inet.sin_addr);
-      if (ret == 1) {
-        if ((nexttok = strtok_r(NULL, ":", &saveptr)) != NULL)
-          tmp_conn.addr.s_inet.sin_port = htons(atoi(nexttok));
-        else
-          goto failure;
-      } else
-        goto failure;
-    }
-  }
-
-  /* save result and return success */
-  *conn = tmp_conn;
-  sfree(in_str);
-  return (0);
-
-failure:
-  OVS_ERROR("%s() : invalid OVS DB URL provided");
-  sfree(in_str);
-  return (-1);
-}
-
 /*
  * YAJL (Yet Another JSON Library) helper functions
  * Documentation (https://lloyd.github.io/yajl/)
@@ -387,10 +313,9 @@ failure:
  * jgen   - YAJL generator handle allocated by yajl_gen_alloc()
  * string - Null-terminated string
  */
-static inline yajl_gen_status
-ovs_yajl_gen_tstring(yajl_gen hander, const char *string)
-{
-  return yajl_gen_string(hander, string, strlen(string));
+static yajl_gen_status ovs_yajl_gen_tstring(yajl_gen hander,
+                                            const char *string) {
+  return yajl_gen_string(hander, (const unsigned char *)string, strlen(string));
 }
 
 /* Add YAJL value into YAJL generator handle (JSON object)
@@ -398,9 +323,7 @@ ovs_yajl_gen_tstring(yajl_gen hander, const char *string)
  * jgen - YAJL generator handle allocated by yajl_gen_alloc()
  * jval - YAJL value usually returned by yajl_tree_get()
  */
-static yajl_gen_status
-ovs_yajl_gen_val(yajl_gen jgen, yajl_val jval)
-{
+static yajl_gen_status ovs_yajl_gen_val(yajl_gen jgen, yajl_val jval) {
   size_t array_len = 0;
   yajl_val *jvalues = NULL;
   yajl_val jobj_value = NULL;
@@ -455,9 +378,7 @@ yajl_gen_failure:
  * "echo" request to the client, client should generate
  * "echo" replay with the same content received in the
  * request */
-static int
-ovs_db_table_echo_cb(const ovs_db_t *pdb, yajl_val jnode)
-{
+static int ovs_db_table_echo_cb(const ovs_db_t *pdb, yajl_val jnode) {
   yajl_val jparams;
   yajl_val jid;
   yajl_gen jgen;
@@ -511,18 +432,17 @@ yajl_gen_failure:
 
 /* Get OVS DB registered callback by YAJL val. The YAJL
  * value should be YAJL string (UID). Returns NULL if
- * callback hasn't been found.
+ * callback hasn't been found. See also ovs_db_callback_get()
+ * description for addition info.
  */
-static ovs_callback_t *
-ovs_db_table_callback_get(ovs_db_t *pdb, yajl_val jid)
-{
+static ovs_callback_t *ovs_db_table_callback_get(ovs_db_t *pdb, yajl_val jid) {
   char *endptr = NULL;
   const char *suid = NULL;
   uint64_t uid;
 
   if (jid && YAJL_IS_STRING(jid)) {
     suid = YAJL_GET_STRING(jid);
-    uid = (uint64_t) strtoul(suid, &endptr, 16);
+    uid = (uint64_t)strtoul(suid, &endptr, 16);
     if (*endptr == '\0' && uid)
       return ovs_db_callback_get(pdb, uid);
   }
@@ -535,9 +455,7 @@ ovs_db_table_callback_get(ovs_db_t *pdb, yajl_val jid)
  * table update callback is received from the DB
  * server. Once registered callback found, it's called
  * by this handler. */
-static int
-ovs_db_table_update_cb(ovs_db_t *pdb, yajl_val jnode)
-{
+static int ovs_db_table_update_cb(ovs_db_t *pdb, yajl_val jnode) {
   ovs_callback_t *cb = NULL;
   yajl_val jvalue;
   yajl_val jparams;
@@ -563,12 +481,16 @@ ovs_db_table_update_cb(ovs_db_t *pdb, yajl_val jnode)
     goto ovs_failure;
 
   /* find registered callback based on <json-value> */
+  pthread_mutex_lock(&pdb->mutex);
   cb = ovs_db_table_callback_get(pdb, jvalue);
-  if (cb == NULL || cb->table.call == NULL)
+  if (cb == NULL || cb->table.call == NULL) {
+    pthread_mutex_unlock(&pdb->mutex);
     goto ovs_failure;
+  }
 
   /* call registered callback */
   cb->table.call(jtable_updates);
+  pthread_mutex_unlock(&pdb->mutex);
   return 0;
 
 ovs_failure:
@@ -581,9 +503,7 @@ ovs_failure:
  * result reply is received from the DB server.
  * Once registered callback found, it's called
  * by this handler. */
-static int
-ovs_db_result_cb(ovs_db_t *pdb, yajl_val jnode)
-{
+static int ovs_db_result_cb(ovs_db_t *pdb, yajl_val jnode) {
   ovs_callback_t *cb = NULL;
   yajl_val jresult;
   yajl_val jerror;
@@ -601,6 +521,7 @@ ovs_db_result_cb(ovs_db_t *pdb, yajl_val jnode)
     return (-1);
 
   /* try to find registered callback */
+  pthread_mutex_lock(&pdb->mutex);
   cb = ovs_db_table_callback_get(pdb, jid);
   if (cb != NULL && cb->result.call != NULL) {
     /* call registered callback */
@@ -609,6 +530,7 @@ ovs_db_result_cb(ovs_db_t *pdb, yajl_val jnode)
     sem_post(&cb->result.sync);
   }
 
+  pthread_mutex_unlock(&pdb->mutex);
   return (0);
 }
 
@@ -617,9 +539,8 @@ ovs_db_result_cb(ovs_db_t *pdb, yajl_val jnode)
  * update callback 'ovs_db_table_update_cb' and
  * result callback 'ovs_db_result_cb' is supported.
  */
-static int
-ovs_db_json_data_process(ovs_db_t *pdb, const char *data, size_t len)
-{
+static int ovs_db_json_data_process(ovs_db_t *pdb, const char *data,
+                                    size_t len) {
   const char *method = NULL;
   char yajl_errbuf[OVS_YAJL_ERROR_BUFFER_SIZE];
   const char *method_path[] = {"method", NULL};
@@ -629,20 +550,22 @@ ovs_db_json_data_process(ovs_db_t *pdb, const char *data, size_t len)
 
   /* duplicate the data to make null-terminated string
    * required for yajl_tree_parse() */
-  if ((sjson = strndup(data, len)) == NULL)
+  if ((sjson = malloc(len + 1)) == NULL)
     return (-1);
 
-  OVS_DEBUG("%s", sjson);
+  sstrncpy(sjson, data, len + 1);
+  OVS_DEBUG("[len=%zu] %s", len, sjson);
 
   /* parse json data */
   jnode = yajl_tree_parse(sjson, yajl_errbuf, sizeof(yajl_errbuf));
   if (jnode == NULL) {
     OVS_ERROR("yajl_tree_parse() %s", yajl_errbuf);
+    sfree(sjson);
     return (-1);
   }
 
   /* get method name */
-  if (jval = yajl_tree_get(jnode, method_path, yajl_t_string)) {
+  if ((jval = yajl_tree_get(jnode, method_path, yajl_t_string)) != NULL) {
     method = YAJL_GET_STRING(jval);
     if (strcmp("echo", method) == 0) {
       /* echo request from the server */
@@ -653,11 +576,12 @@ ovs_db_json_data_process(ovs_db_t *pdb, const char *data, size_t len)
       if (ovs_db_table_update_cb(pdb, jnode) < 0)
         OVS_ERROR("handle update notification failed");
     }
-  } else if (jval = yajl_tree_get(jnode, result_path, yajl_t_object)) {
+  } else if ((jval = yajl_tree_get(jnode, result_path, yajl_t_any)) != NULL) {
     /* result notification */
     if (ovs_db_result_cb(pdb, jnode) < 0)
       OVS_ERROR("handle result reply failed");
-  }
+  } else
+    OVS_ERROR("connot find method or result failed");
 
   /* release memory */
   yajl_tree_free(jnode);
@@ -674,9 +598,7 @@ ovs_db_json_data_process(ovs_db_t *pdb, const char *data, size_t len)
  */
 
 /* Allocate JSON reader instance */
-static inline ovs_json_reader_t *
-ovs_json_reader_alloc()
-{
+static ovs_json_reader_t *ovs_json_reader_alloc() {
   ovs_json_reader_t *jreader = NULL;
 
   if ((jreader = calloc(sizeof(ovs_json_reader_t), 1)) == NULL)
@@ -686,10 +608,8 @@ ovs_json_reader_alloc()
 }
 
 /* Push raw data into into the JSON reader for processing */
-static inline int
-ovs_json_reader_push_data(ovs_json_reader_t *jreader,
-                          const char *data, size_t data_len)
-{
+static int ovs_json_reader_push_data(ovs_json_reader_t *jreader,
+                                     const char *data, size_t data_len) {
   char *new_buff = NULL;
   size_t available = jreader->buff_size - jreader->buff_offset;
 
@@ -717,10 +637,8 @@ ovs_json_reader_push_data(ovs_json_reader_t *jreader,
 /* Pop one fully-fledged JSON if already exists. Returns 0 if
  * completed JSON already exists otherwise negative value is
  * returned */
-static inline int
-ovs_json_reader_pop(ovs_json_reader_t *jreader,
-                    const char **json_ptr, size_t *json_len_ptr)
-{
+static int ovs_json_reader_pop(ovs_json_reader_t *jreader,
+                               const char **json_ptr, size_t *json_len_ptr) {
   size_t nbraces = 0;
   size_t json_len = 0;
   char *json = NULL;
@@ -767,9 +685,7 @@ ovs_json_reader_pop(ovs_json_reader_t *jreader,
 /* Reset JSON reader. It is useful when start processing
  * new raw data. E.g.: in case of lost stream connection.
  */
-static inline void
-ovs_json_reader_reset(ovs_json_reader_t *jreader)
-{
+static void ovs_json_reader_reset(ovs_json_reader_t *jreader) {
   if (jreader) {
     jreader->buff_offset = 0;
     jreader->json_offset = 0;
@@ -777,45 +693,85 @@ ovs_json_reader_reset(ovs_json_reader_t *jreader)
 }
 
 /* Release internal data allocated for JSON reader */
-static inline void
-ovs_json_reader_free(ovs_json_reader_t *jreader)
-{
+static void ovs_json_reader_free(ovs_json_reader_t *jreader) {
   if (jreader) {
     free(jreader->buff_ptr);
     free(jreader);
   }
 }
 
-/* Reconnect to OVD DB and call init OVS DB callback
- * 'init_cb' if connection has been established.
+/* Reconnect to OVD DB and call the OVS DB post connection init callback
+ * if connection has been established.
  */
-static int
-ovs_db_reconnect(ovs_db_t *pdb)
-{
+static int ovs_db_reconnect(ovs_db_t *pdb) {
   char errbuff[OVS_ERROR_BUFF_SIZE];
+  const char unix_prefix[] = "unix:";
+  struct addrinfo *result, *rp;
+  _Bool is_connected = 0;
+  struct sockaddr_un saunix;
 
   /* remove all registered OVS DB table/result callbacks */
   ovs_db_callback_remove_all(pdb);
 
-  /* open new socket */
-  if ((pdb->conn.sock = socket(pdb->conn.domain, pdb->conn.type, 0)) < 0) {
-    sstrerror(errno, errbuff, sizeof(errbuff));
-    OVS_ERROR("socket(): %s", errbuff);
-    return (-1);
+  if (strncmp(pdb->node, unix_prefix, strlen(unix_prefix)) == 0) {
+    /* create unix socket address */
+    rp = calloc(1, sizeof(struct addrinfo));
+    struct sockaddr_un *sa_unix = calloc(1, sizeof(struct sockaddr_un));
+    if (rp == NULL || sa_unix == NULL) {
+      sfree(rp);
+      sfree(sa_unix);
+      return (1);
+    }
+    rp->ai_family = AF_UNIX;
+    rp->ai_socktype = SOCK_STREAM;
+    rp->ai_addrlen = sizeof(*sa_unix);
+    rp->ai_addr = (struct sockaddr *)sa_unix;
+    sa_unix->sun_family = rp->ai_family;
+    sstrncpy(sa_unix->sun_path, (pdb->node + strlen(unix_prefix)),
+             sizeof(sa_unix->sun_path));
+    result = rp;
+  } else {
+    /* intet socket address */
+    int ret = 0;
+    struct addrinfo hints;
+
+    /* setup criteria for selecting the socket address */
+    memset(&hints, 0, sizeof(hints));
+    hints.ai_family = AF_UNSPEC;
+    hints.ai_socktype = SOCK_STREAM;
+
+    /* get socket addresses */
+    if ((ret = getaddrinfo(pdb->node, pdb->service, &hints, &result)) != 0) {
+      OVS_ERROR("getaddrinfo(): %s", gai_strerror(ret));
+      return (1);
+    }
   }
-
-  /* try to connect to server */
-  if (connect(pdb->conn.sock, (struct sockaddr *)&pdb->conn.addr,
-              pdb->conn.addr_size) < 0) {
-    sstrerror(errno, errbuff, sizeof(errbuff));
-    OVS_ERROR("connect(): %s", errbuff);
-    close(pdb->conn.sock);
-    return (-1);
+  /* try to connect to the server */
+  for (rp = result; rp != NULL; rp = rp->ai_next) {
+    if ((pdb->sock = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol)) <
+        0) {
+      sstrerror(errno, errbuff, sizeof(errbuff));
+      OVS_DEBUG("socket(): %s", errbuff);
+      continue;
+    }
+    if (connect(pdb->sock, rp->ai_addr, rp->ai_addrlen) < 0) {
+      sstrerror(errno, errbuff, sizeof(errbuff));
+      OVS_DEBUG("connect(): %s [family=%d]", errbuff, rp->ai_family);
+      close(pdb->sock);
+    } else {
+      is_connected = 1;
+      break;
+    }
   }
 
   /* send notification to event thread */
-  ovs_db_event_post(pdb, OVS_DB_EVENT_CONNECTED);
-  return (0);
+  if (is_connected)
+    ovs_db_event_post(pdb, OVS_DB_EVENT_CONN_ESTABLISHED);
+  else
+    OVS_ERROR("connect to \"%s\" failed", pdb->node);
+
+  freeaddrinfo(result);
+  return !is_connected;
 }
 
 /* POLL worker thread.
@@ -823,10 +779,8 @@ ovs_db_reconnect(ovs_db_t *pdb)
  * requests/reply/events etc. Also, it reconnects to OVS DB
  * if connection has been lost.
  */
-static void *
-ovs_poll_worker(void *arg)
-{
-  ovs_db_t *pdb = (ovs_db_t *)arg;      /* pointer to OVS DB */
+static void *ovs_poll_worker(void *arg) {
+  ovs_db_t *pdb = (ovs_db_t *)arg; /* pointer to OVS DB */
   ovs_json_reader_t *jreader = NULL;
   const char *json;
   size_t json_len;
@@ -841,7 +795,7 @@ ovs_poll_worker(void *arg)
   }
 
   /* start polling data */
-  poll_fd.fd = pdb->conn.sock;
+  poll_fd.fd = pdb->sock;
   poll_fd.events = POLLIN | POLLPRI;
   poll_fd.revents = 0;
 
@@ -856,10 +810,11 @@ ovs_poll_worker(void *arg)
           usleep(OVS_DB_RECONNECT_TIMEOUT * 1000000);
         }
         ovs_json_reader_reset(jreader);
-        poll_fd.fd = pdb->conn.sock;
+        poll_fd.fd = pdb->sock;
       } else if ((poll_fd.revents & POLLERR) || (poll_fd.revents & POLLHUP)) {
         /* connection is broken */
         OVS_ERROR("poll() peer closed its end of the channel");
+        ovs_db_event_post(pdb, OVS_DB_EVENT_CONN_TERMINATED);
         close(poll_fd.fd);
       } else if ((poll_fd.revents & POLLIN) || (poll_fd.revents & POLLPRI)) {
         /* read incoming data */
@@ -872,12 +827,13 @@ ovs_poll_worker(void *arg)
             ovs_db_json_data_process(pdb, json, json_len);
         } else if (nbytes == 0) {
           OVS_ERROR("recv() peer has performed an orderly shutdown");
+          ovs_db_event_post(pdb, OVS_DB_EVENT_CONN_TERMINATED);
           close(poll_fd.fd);
         } else {
           OVS_ERROR("recv() receive data error");
           break;
         }
-      }                         /* poll() POLLIN & POLLPRI */
+      } /* poll() POLLIN & POLLPRI */
     } else if (poll_ret == 0)
       OVS_DEBUG("poll() timeout");
     else {
@@ -898,9 +854,7 @@ thread_exit:
  * task can be done asynchronously which allows to
  * handle OVD DB callback like 'init_cb'.
  */
-static void *
-ovs_event_worker(void *arg)
-{
+static void *ovs_event_worker(void *arg) {
   int ret = 0;
   ovs_db_t *pdb = (ovs_db_t *)arg;
   struct timespec ts;
@@ -914,9 +868,19 @@ ovs_event_worker(void *arg)
     if (!ret) {
       /* handle the event */
       OVS_DEBUG("handle event %d", pdb->event_thread.value);
-      if (pdb->event_thread.value == OVS_DB_EVENT_CONNECTED)
-        if (pdb->init_cb)
-          pdb->init_cb(pdb);
+      switch (pdb->event_thread.value) {
+      case OVS_DB_EVENT_CONN_ESTABLISHED:
+        if (pdb->cb.post_conn_init)
+          pdb->cb.post_conn_init(pdb);
+        break;
+      case OVS_DB_EVENT_CONN_TERMINATED:
+        if (pdb->cb.post_conn_terminate)
+          pdb->cb.post_conn_terminate();
+        break;
+      default:
+        OVS_DEBUG("unknown event received");
+        break;
+      }
     } else if (ret == ETIMEDOUT) {
       /* wait timeout */
       OVS_DEBUG("no event received (timeout)");
@@ -935,9 +899,7 @@ thread_exit:
 }
 
 /* Stop EVENT thread */
-static int
-ovs_db_event_thread_stop(ovs_db_t *pdb)
-{
+static int ovs_db_event_thread_stop(ovs_db_t *pdb) {
   ovs_db_event_post(pdb, OVS_DB_EVENT_TERMINATE);
   if (pthread_join(pdb->event_thread.tid, NULL) != 0)
     return (-1);
@@ -947,9 +909,7 @@ ovs_db_event_thread_stop(ovs_db_t *pdb)
 }
 
 /* Stop POLL thread */
-static int
-ovs_db_poll_thread_stop(ovs_db_t *pdb)
-{
+static int ovs_db_poll_thread_stop(ovs_db_t *pdb) {
   ovs_db_poll_terminate(pdb);
   if (pthread_join(pdb->poll_thread.tid, NULL) != 0)
     return (-1);
@@ -961,9 +921,8 @@ ovs_db_poll_thread_stop(ovs_db_t *pdb)
  * Public OVS DB API implementation
  */
 
-ovs_db_t *
-ovs_db_init(const char *surl, ovs_db_callback_t *cb)
-{
+ovs_db_t *ovs_db_init(const char *node, const char *service,
+                      ovs_db_callback_t *cb) {
   pthread_mutexattr_t mutex_attr;
   ovs_db_t *pdb = NULL;
 
@@ -971,20 +930,25 @@ ovs_db_init(const char *surl, ovs_db_callback_t *cb)
   if ((pdb = calloc(1, sizeof(*pdb))) == NULL)
     return (NULL);
 
-  /* convert string url to socket addr */
-  if (ovs_db_url_parse(surl, &pdb->conn) < 0)
-    goto failure;
+  /* node cannot be unset */
+  if (node == NULL || strlen(node) == 0)
+    return (NULL);
+
+  /* store the OVS DB address */
+  sstrncpy(pdb->node, node, sizeof(pdb->node));
+  if (service != NULL)
+    sstrncpy(pdb->service, service, sizeof(pdb->service));
 
   /* setup OVS DB callbacks */
   if (cb)
-    pdb->init_cb = cb->init_cb;
+    pdb->cb = *cb;
 
   /* prepare event thread */
   pthread_cond_init(&pdb->event_thread.cond, NULL);
   pthread_mutex_init(&pdb->event_thread.mutex, NULL);
   pthread_mutex_lock(&pdb->event_thread.mutex);
-  if (plugin_thread_create(&pdb->event_thread.tid, NULL,
-                           ovs_event_worker, pdb) != 0) {
+  if (plugin_thread_create(&pdb->event_thread.tid, NULL, ovs_event_worker,
+                           pdb) != 0) {
     OVS_ERROR("event worker start failed");
     goto failure;
   }
@@ -993,8 +957,8 @@ ovs_db_init(const char *surl, ovs_db_callback_t *cb)
   ovs_db_reconnect(pdb);
   pdb->poll_thread.state = OVS_DB_POLL_STATE_RUNNING;
   pthread_mutex_init(&pdb->poll_thread.mutex, NULL);
-  if (plugin_thread_create(&pdb->poll_thread.tid, NULL,
-                           ovs_poll_worker, pdb) != 0) {
+  if (plugin_thread_create(&pdb->poll_thread.tid, NULL, ovs_poll_worker, pdb) !=
+      0) {
     OVS_ERROR("pull worker start failed");
     goto failure;
   }
@@ -1011,9 +975,9 @@ ovs_db_init(const char *surl, ovs_db_callback_t *cb)
   return pdb;
 
 failure:
-  if (pdb->conn.sock)
+  if (pdb->sock)
     /* close connection */
-    close(pdb->conn.sock);
+    close(pdb->sock);
   if (pdb->event_thread.tid != 0)
     /* stop event thread */
     if (ovs_db_event_thread_stop(pdb) < 0)
@@ -1026,10 +990,8 @@ failure:
   return NULL;
 }
 
-int
-ovs_db_send_request(ovs_db_t *pdb, const char *method,
-                    const char *params, ovs_db_result_cb_t cb)
-{
+int ovs_db_send_request(ovs_db_t *pdb, const char *method, const char *params,
+                        ovs_db_result_cb_t cb) {
   int ret = 0;
   yajl_gen_status yajl_gen_ret;
   yajl_val jparams;
@@ -1087,8 +1049,7 @@ ovs_db_send_request(ovs_db_t *pdb, const char *method,
   }
 
   /* send the request */
-  OVS_YAJL_CALL(yajl_gen_get_buf, jgen, (const unsigned char **)&req,
-                &req_len);
+  OVS_YAJL_CALL(yajl_gen_get_buf, jgen, (const unsigned char **)&req, &req_len);
   OVS_DEBUG("%s", req);
   if (!ovs_db_data_send(pdb, req, req_len)) {
     if (cb) {
@@ -1118,11 +1079,10 @@ yajl_gen_failure:
   return (yajl_gen_ret != yajl_gen_status_ok) ? (-1) : ret;
 }
 
-int
-ovs_db_table_cb_register(ovs_db_t *pdb, const char *tb_name,
-                         const char **tb_column, ovs_db_table_cb_t update_cb,
-                         ovs_db_result_cb_t result_cb, unsigned int flags)
-{
+int ovs_db_table_cb_register(ovs_db_t *pdb, const char *tb_name,
+                             const char **tb_column,
+                             ovs_db_table_cb_t update_cb,
+                             ovs_db_result_cb_t result_cb, unsigned int flags) {
   yajl_gen jgen;
   yajl_gen_status yajl_gen_ret;
   ovs_callback_t *new_cb = NULL;
@@ -1217,9 +1177,7 @@ yajl_gen_failure:
   return ovs_db_ret;
 }
 
-int
-ovs_db_destroy(ovs_db_t *pdb)
-{
+int ovs_db_destroy(ovs_db_t *pdb) {
   int ovs_db_ret = 0;
   int ret = 0;
 
@@ -1228,7 +1186,7 @@ ovs_db_destroy(ovs_db_t *pdb)
     return (-1);
 
   /* try to lock the structure before releasing */
-  if (ret = pthread_mutex_lock(&pdb->mutex)) {
+  if ((ret = pthread_mutex_lock(&pdb->mutex))) {
     OVS_ERROR("pthread_mutex_lock() DB mutext lock failed (%d)", ret);
     return (-1);
   }
@@ -1249,8 +1207,8 @@ ovs_db_destroy(ovs_db_t *pdb)
   ovs_db_callback_remove_all(pdb);
 
   /* close connection */
-  if (pdb->conn.sock)
-    close(pdb->conn.sock);
+  if (pdb->sock)
+    close(pdb->sock);
 
   /* release DB handler */
   pthread_mutex_unlock(&pdb->mutex);
@@ -1264,13 +1222,11 @@ ovs_db_destroy(ovs_db_t *pdb)
  */
 
 /* Get YAJL value by key from YAJL dictionary */
-yajl_val
-ovs_utils_get_value_by_key(yajl_val jval, const char *key)
-{
+yajl_val ovs_utils_get_value_by_key(yajl_val jval, const char *key) {
   const char *obj_key = NULL;
 
   /* check params */
-  if (!YAJL_IS_OBJECT(jval) || !key)
+  if (!YAJL_IS_OBJECT(jval) || (key == NULL))
     return NULL;
 
   /* find a value by key */
@@ -1282,3 +1238,50 @@ ovs_utils_get_value_by_key(yajl_val jval, const char *key)
 
   return NULL;
 }
+
+/* Get OVS DB map value by given map key */
+yajl_val ovs_utils_get_map_value(yajl_val jval, const char *key) {
+  size_t map_len = 0;
+  size_t array_len = 0;
+  yajl_val *map_values = NULL;
+  yajl_val *array_values = NULL;
+  const char *str_val = NULL;
+
+  /* check YAJL array */
+  if (!YAJL_IS_ARRAY(jval) || (key == NULL))
+    return NULL;
+
+  /* check a database map value (2-element, first one should be a string */
+  array_len = YAJL_GET_ARRAY(jval)->len;
+  array_values = YAJL_GET_ARRAY(jval)->values;
+  if ((array_len != 2) || (!YAJL_IS_STRING(array_values[0])) ||
+      (!YAJL_IS_ARRAY(array_values[1])))
+    return NULL;
+
+  /* check first element of the array */
+  str_val = YAJL_GET_STRING(array_values[0]);
+  if (strcmp("map", str_val) != 0)
+    return NULL;
+
+  /* try to find map value by map key */
+  map_len = YAJL_GET_ARRAY(array_values[1])->len;
+  map_values = YAJL_GET_ARRAY(array_values[1])->values;
+  for (int i = 0; i < map_len; i++) {
+    /* check YAJL array */
+    if (!YAJL_IS_ARRAY(map_values[i]))
+      break;
+
+    /* check a database pair value (2-element, first one represents a key
+     * and it should be a string in our case */
+    array_len = YAJL_GET_ARRAY(map_values[i])->len;
+    array_values = YAJL_GET_ARRAY(map_values[i])->values;
+    if ((array_len != 2) || (!YAJL_IS_STRING(array_values[0])))
+      break;
+
+    /* return map value if given key equals map key */
+    str_val = YAJL_GET_STRING(array_values[0]);
+    if (strcmp(key, str_val) == 0)
+      return array_values[1];
+  }
+  return NULL;
+}