Merge remote-tracking branch 'origin/collectd-5.8'
[collectd.git] / src / utils_ovs.c
index 983f249..0ee05e0 100644 (file)
@@ -3,14 +3,17 @@
  *
  * Copyright(c) 2016 Intel Corporation. All rights reserved.
  *
- * Permission is hereby granted, free of charge, to any person obtaining a copy of
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ *of
  * this software and associated documentation files (the "Software"), to deal in
  * the Software without restriction, including without limitation the rights to
  * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
- * of the Software, and to permit persons to whom the Software is furnished to do
+ * of the Software, and to permit persons to whom the Software is furnished to
+ *do
  * so, subject to the following conditions:
  *
- * The above copyright notice and this permission notice shall be included in all
+ * The above copyright notice and this permission notice shall be included in
+ *all
  * copies or substantial portions of the Software.
  *
  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  *
  * Authors:
  *   Volodymyr Mytnyk <volodymyrx.mytnyk@intel.com>
- *
+ **/
+
+/* clang-format off */
+/*
  *                         OVS DB API internal architecture diagram
  * +------------------------------------------------------------------------------+
  * |OVS plugin      |OVS utils                                                    |
  * +-------------------+----------------------------------------------+-----------+
  * |                                 TCP/UNIX socket                              |
  * +-------------------------------------------------------------------------------
- *
- **/
+ */
+/* clang-format on */
 
 /* collectd headers */
+#include "collectd.h"
+
 #include "common.h"
 
 /* private headers */
 #include "utils_ovs.h"
 
 /* system libraries */
-#include <semaphore.h>
+#if HAVE_NETDB_H
+#include <netdb.h>
+#endif
+#if HAVE_ARPA_INET_H
 #include <arpa/inet.h>
+#endif
+#if HAVE_POLL_H
 #include <poll.h>
+#endif
+#if HAVE_SYS_UN_H
 #include <sys/un.h>
+#endif
 
-#define OVS_ERROR(fmt, ...) do { \
-  ERROR("ovs_utils: "fmt, ## __VA_ARGS__); } while (0)
-#define OVS_DEBUG(fmt, ...) do { \
-  DEBUG("%s:%d:%s(): "fmt, __FILE__, __LINE__, __FUNCTION__, \
-        ## __VA_ARGS__); } while (0)
+#include <semaphore.h>
 
-#define OVS_DB_POLL_TIMEOUT          1  /* poll receive timeout (sec) */
-#define OVS_DB_POLL_READ_BLOCK_SIZE  512        /* read block size (bytes) */
-#define OVS_DB_DEFAULT_DB_NAME       "Open_vSwitch"
-#define OVS_DB_RECONNECT_TIMEOUT     1  /* reconnect timeout (sec) */
+#define OVS_ERROR(fmt, ...)                                                    \
+  do {                                                                         \
+    ERROR("ovs_utils: " fmt, ##__VA_ARGS__);                                   \
+  } while (0)
+#define OVS_DEBUG(fmt, ...)                                                    \
+  do {                                                                         \
+    DEBUG("%s:%d:%s(): " fmt, __FILE__, __LINE__, __FUNCTION__,                \
+          ##__VA_ARGS__);                                                      \
+  } while (0)
+
+#define OVS_DB_POLL_TIMEOUT 1           /* poll receive timeout (sec) */
+#define OVS_DB_POLL_READ_BLOCK_SIZE 512 /* read block size (bytes) */
+#define OVS_DB_DEFAULT_DB_NAME "Open_vSwitch"
 
-#define OVS_DB_EVENT_TIMEOUT         5  /* event thread timeout (sec) */
-#define OVS_DB_EVENT_TERMINATE       1
-#define OVS_DB_EVENT_CONN_ESTABLISHED     2
-#define OVS_DB_EVENT_CONN_TERMINATED      3
+#define OVS_DB_EVENT_NONE 0
+#define OVS_DB_EVENT_TIMEOUT 5 /* event thread timeout (sec) */
+#define OVS_DB_EVENT_TERMINATE 1
+#define OVS_DB_EVENT_CONN_ESTABLISHED 2
+#define OVS_DB_EVENT_CONN_TERMINATED 3
 
-#define OVS_DB_POLL_STATE_RUNNING    1
-#define OVS_DB_POLL_STATE_EXITING    2
+#define OVS_DB_POLL_STATE_RUNNING 1
+#define OVS_DB_POLL_STATE_EXITING 2
 
-#define OVS_DB_SEND_REQ_TIMEOUT      5  /* send request timeout (sec) */
+#define OVS_DB_SEND_REQ_TIMEOUT 5 /* send request timeout (sec) */
 
-#define OVS_YAJL_CALL(func, ...) \
-  do { \
-    yajl_gen_ret = yajl_gen_status_ok; \
-    if ((yajl_gen_ret = func(__VA_ARGS__)) != yajl_gen_status_ok) \
-      goto yajl_gen_failure; \
+#define OVS_YAJL_CALL(func, ...)                                               \
+  do {                                                                         \
+    yajl_gen_ret = yajl_gen_status_ok;                                         \
+    if ((yajl_gen_ret = func(__VA_ARGS__)) != yajl_gen_status_ok)              \
+      goto yajl_gen_failure;                                                   \
   } while (0)
-#define OVS_YAJL_ERROR_BUFFER_SIZE       1024
-#define OVS_ERROR_BUFF_SIZE              512
-#define OVS_UID_STR_SIZE                 17     /* 64-bit HEX string len + '\0' */
+#define OVS_YAJL_ERROR_BUFFER_SIZE 1024
+#define OVS_ERROR_BUFF_SIZE 512
+#define OVS_UID_STR_SIZE 17 /* 64-bit HEX string len + '\0' */
 
 /* JSON reader internal data */
 struct ovs_json_reader_s {
@@ -138,19 +160,6 @@ struct ovs_callback_s {
 };
 typedef struct ovs_callback_s ovs_callback_t;
 
-/* Connection declaration */
-struct ovs_conn_s {
-  int sock;
-  int domain;
-  int type;
-  int addr_size;
-  union {
-    struct sockaddr_in s_inet;
-    struct sockaddr_un s_unix;
-  } addr;
-};
-typedef struct ovs_conn_s ovs_conn_t;
-
 /* Event thread data declaration */
 struct ovs_event_thread_s {
   pthread_t tid;
@@ -175,9 +184,15 @@ struct ovs_db_s {
   pthread_mutex_t mutex;
   ovs_callback_t *remote_cb;
   ovs_db_callback_t cb;
-  ovs_conn_t conn;
+  char service[OVS_DB_ADDR_SERVICE_SIZE];
+  char node[OVS_DB_ADDR_NODE_SIZE];
+  char unix_path[OVS_DB_ADDR_NODE_SIZE];
+  int sock;
 };
-typedef struct ovs_db_s ovs_db_t;
+
+/* Global variables */
+static uint64_t ovs_uid = 0;
+static pthread_mutex_t ovs_uid_mutex = PTHREAD_MUTEX_INITIALIZER;
 
 /* Post an event to event thread.
  * Possible events are:
@@ -185,9 +200,7 @@ typedef struct ovs_db_s ovs_db_t;
  *  OVS_DB_EVENT_CONN_ESTABLISHED
  *  OVS_DB_EVENT_CONN_TERMINATED
  */
-static void
-ovs_db_event_post(ovs_db_t *pdb, int event)
-{
+static void ovs_db_event_post(ovs_db_t *pdb, int event) {
   pthread_mutex_lock(&pdb->event_thread.mutex);
   pdb->event_thread.value = event;
   pthread_mutex_unlock(&pdb->event_thread.mutex);
@@ -196,33 +209,22 @@ ovs_db_event_post(ovs_db_t *pdb, int event)
 
 /* Check if POLL thread is still running. Returns
  * 1 if running otherwise 0 is returned */
-static inline int
-ovs_db_poll_is_running(ovs_db_t *pdb)
-{
+static _Bool ovs_db_poll_is_running(ovs_db_t *pdb) {
   int state = 0;
   pthread_mutex_lock(&pdb->poll_thread.mutex);
   state = pdb->poll_thread.state;
   pthread_mutex_unlock(&pdb->poll_thread.mutex);
-  return (state == OVS_DB_POLL_STATE_RUNNING);
-}
-
-/* Terminate POLL thread */
-static inline void
-ovs_db_poll_terminate(ovs_db_t *pdb)
-{
-  pthread_mutex_lock(&pdb->poll_thread.mutex);
-  pdb->poll_thread.state = OVS_DB_POLL_STATE_EXITING;
-  pthread_mutex_unlock(&pdb->poll_thread.mutex);
+  return state == OVS_DB_POLL_STATE_RUNNING;
 }
 
 /* Generate unique identifier (UID). It is used by OVS DB API
  * to set "id" field for any OVS DB JSON request. */
-static uint64_t
-ovs_uid_generate()
-{
-  struct timespec ts;
-  clock_gettime(CLOCK_MONOTONIC, &ts);
-  return ((ts.tv_sec << 32) | (ts.tv_nsec & UINT32_MAX));
+static uint64_t ovs_uid_generate() {
+  uint64_t new_uid;
+  pthread_mutex_lock(&ovs_uid_mutex);
+  new_uid = ++ovs_uid;
+  pthread_mutex_unlock(&ovs_uid_mutex);
+  return new_uid;
 }
 
 /*
@@ -231,9 +233,7 @@ ovs_uid_generate()
  */
 
 /* Add new callback into OVS DB object */
-static void
-ovs_db_callback_add(ovs_db_t *pdb, ovs_callback_t *new_cb)
-{
+static void ovs_db_callback_add(ovs_db_t *pdb, ovs_callback_t *new_cb) {
   pthread_mutex_lock(&pdb->mutex);
   if (pdb->remote_cb)
     pdb->remote_cb->prev = new_cb;
@@ -244,13 +244,11 @@ ovs_db_callback_add(ovs_db_t *pdb, ovs_callback_t *new_cb)
 }
 
 /* Remove callback from OVS DB object */
-static void
-ovs_db_callback_remove(ovs_db_t *pdb, ovs_callback_t *del_cb)
-{
+static void ovs_db_callback_remove(ovs_db_t *pdb, ovs_callback_t *del_cb) {
+  pthread_mutex_lock(&pdb->mutex);
   ovs_callback_t *pre_cb = del_cb->prev;
   ovs_callback_t *next_cb = del_cb->next;
 
-  pthread_mutex_lock(&pdb->mutex);
   if (next_cb)
     next_cb->prev = del_cb->prev;
 
@@ -264,118 +262,45 @@ ovs_db_callback_remove(ovs_db_t *pdb, ovs_callback_t *del_cb)
 }
 
 /* Remove all callbacks form OVS DB object */
-static void
-ovs_db_callback_remove_all(ovs_db_t *pdb)
-{
+static void ovs_db_callback_remove_all(ovs_db_t *pdb) {
   pthread_mutex_lock(&pdb->mutex);
-  for (ovs_callback_t *del_cb = pdb->remote_cb; pdb->remote_cb;
-       del_cb = pdb->remote_cb) {
-    pdb->remote_cb = pdb->remote_cb->next;
-    free(del_cb);
+  while (pdb->remote_cb != NULL) {
+    ovs_callback_t *del_cb = pdb->remote_cb;
+    pdb->remote_cb = del_cb->next;
+    sfree(del_cb);
   }
-  pdb->remote_cb = NULL;
   pthread_mutex_unlock(&pdb->mutex);
 }
 
 /* Get/find callback in OVS DB object by UID. Returns pointer
- * to requested callback otherwise NULL is returned */
-static ovs_callback_t *
-ovs_db_callback_get(ovs_db_t *pdb, uint64_t uid)
-{
-  pthread_mutex_lock(&pdb->mutex);
+ * to requested callback otherwise NULL is returned.
+ *
+ * IMPORTANT NOTE:
+ *   The OVS DB mutex MUST be locked by the caller
+ *   to make sure that returned callback is still valid.
+ */
+static ovs_callback_t *ovs_db_callback_get(ovs_db_t *pdb, uint64_t uid) {
   for (ovs_callback_t *cb = pdb->remote_cb; cb != NULL; cb = cb->next)
-    if (cb->uid == uid) {
-      pthread_mutex_unlock(&pdb->mutex);
+    if (cb->uid == uid)
       return cb;
-    }
-  pthread_mutex_unlock(&pdb->mutex);
   return NULL;
 }
 
 /* Send all requested data to the socket. Returns 0 if
  * ALL request data has been sent otherwise negative value
  * is returned */
-static int
-ovs_db_data_send(const ovs_db_t *pdb, const char *data, size_t len)
-{
+static int ovs_db_data_send(const ovs_db_t *pdb, const char *data, size_t len) {
   ssize_t nbytes = 0;
   size_t rem = len;
   size_t off = 0;
 
   while (rem > 0) {
-    if ((nbytes = send(pdb->conn.sock, data + off, rem, 0)) <= 0)
-      return (-1);
+    if ((nbytes = send(pdb->sock, data + off, rem, 0)) <= 0)
+      return -1;
     rem -= (size_t)nbytes;
     off += (size_t)nbytes;
   }
-  return (0);
-}
-
-/* Parse OVS server URL.
- * Format of the URL:
- *   "tcp:a.b.c.d:port" - define TCP connection (INET domain)
- *   "unix:file" - define UNIX socket file (UNIX domain)
- */
-static int
-ovs_db_url_parse(const char *surl, ovs_conn_t *conn)
-{
-  ovs_conn_t tmp_conn;
-  char *nexttok = NULL;
-  char *in_str = NULL;
-  char *saveptr;
-  int ret = 0;
-
-  /* sanity check */
-  if ((surl == NULL) || (strlen(surl) < 1))
-    return (-1);
-
-  /* parse domain */
-  tmp_conn = *conn;
-  in_str = sstrdup(surl);
-  if ((nexttok = strtok_r(in_str, ":", &saveptr)) != NULL) {
-    if (strcmp("tcp", nexttok) == 0) {
-      tmp_conn.domain = AF_INET;
-      tmp_conn.type = SOCK_STREAM;
-      tmp_conn.addr_size = sizeof(tmp_conn.addr.s_inet);
-    } else if (strcmp("unix", nexttok) == 0) {
-      tmp_conn.domain = AF_UNIX;
-      tmp_conn.type = SOCK_STREAM;
-      tmp_conn.addr_size = sizeof(tmp_conn.addr.s_unix);
-    } else
-      goto failure;
-  } else
-    goto failure;
-
-  /* parse url depending on domain */
-  if ((nexttok = strtok_r(NULL, ":", &saveptr)) != NULL) {
-    if (tmp_conn.domain == AF_UNIX) {
-      /* <UNIX-NAME> */
-      tmp_conn.addr.s_inet.sin_family = AF_UNIX;
-      sstrncpy(tmp_conn.addr.s_unix.sun_path, nexttok, strlen(nexttok) + 1);
-    } else {
-      /* <IP:PORT> */
-      tmp_conn.addr.s_inet.sin_family = AF_INET;
-      ret =
-        inet_pton(AF_INET, nexttok, (void *)&tmp_conn.addr.s_inet.sin_addr);
-      if (ret == 1) {
-        if ((nexttok = strtok_r(NULL, ":", &saveptr)) != NULL)
-          tmp_conn.addr.s_inet.sin_port = htons(atoi(nexttok));
-        else
-          goto failure;
-      } else
-        goto failure;
-    }
-  }
-
-  /* save result and return success */
-  *conn = tmp_conn;
-  sfree(in_str);
-  return (0);
-
-failure:
-  OVS_ERROR("%s() : invalid OVS DB URL provided");
-  sfree(in_str);
-  return (-1);
+  return 0;
 }
 
 /*
@@ -390,10 +315,9 @@ failure:
  * jgen   - YAJL generator handle allocated by yajl_gen_alloc()
  * string - Null-terminated string
  */
-static inline yajl_gen_status
-ovs_yajl_gen_tstring(yajl_gen hander, const char *string)
-{
-  return yajl_gen_string(hander, string, strlen(string));
+static yajl_gen_status ovs_yajl_gen_tstring(yajl_gen hander,
+                                            const char *string) {
+  return yajl_gen_string(hander, (const unsigned char *)string, strlen(string));
 }
 
 /* Add YAJL value into YAJL generator handle (JSON object)
@@ -401,15 +325,16 @@ ovs_yajl_gen_tstring(yajl_gen hander, const char *string)
  * jgen - YAJL generator handle allocated by yajl_gen_alloc()
  * jval - YAJL value usually returned by yajl_tree_get()
  */
-static yajl_gen_status
-ovs_yajl_gen_val(yajl_gen jgen, yajl_val jval)
-{
+static yajl_gen_status ovs_yajl_gen_val(yajl_gen jgen, yajl_val jval) {
   size_t array_len = 0;
   yajl_val *jvalues = NULL;
   yajl_val jobj_value = NULL;
   const char *obj_key = NULL;
   size_t obj_len = 0;
-  yajl_gen_status yajl_gen_ret;
+  yajl_gen_status yajl_gen_ret = yajl_gen_status_ok;
+
+  if (jval == NULL)
+    return yajl_gen_generation_complete;
 
   if (YAJL_IS_STRING(jval))
     OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, YAJL_GET_STRING(jval));
@@ -428,14 +353,14 @@ ovs_yajl_gen_val(yajl_gen jgen, yajl_val jval)
     array_len = YAJL_GET_ARRAY(jval)->len;
     jvalues = YAJL_GET_ARRAY(jval)->values;
     OVS_YAJL_CALL(yajl_gen_array_open, jgen);
-    for (int i = 0; i < array_len; i++)
+    for (size_t i = 0; i < array_len; i++)
       OVS_YAJL_CALL(ovs_yajl_gen_val, jgen, jvalues[i]);
     OVS_YAJL_CALL(yajl_gen_array_close, jgen);
   } else if (YAJL_IS_OBJECT(jval)) {
     /* create new object and add all elements into the object */
     OVS_YAJL_CALL(yajl_gen_map_open, jgen);
     obj_len = YAJL_GET_OBJECT(jval)->len;
-    for (int i = 0; i < obj_len; i++) {
+    for (size_t i = 0; i < obj_len; i++) {
       obj_key = YAJL_GET_OBJECT(jval)->keys[i];
       jobj_value = YAJL_GET_OBJECT(jval)->values[i];
       OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, obj_key);
@@ -458,9 +383,7 @@ yajl_gen_failure:
  * "echo" request to the client, client should generate
  * "echo" replay with the same content received in the
  * request */
-static int
-ovs_db_table_echo_cb(const ovs_db_t *pdb, yajl_val jnode)
-{
+static int ovs_db_table_echo_cb(const ovs_db_t *pdb, yajl_val jnode) {
   yajl_val jparams;
   yajl_val jid;
   yajl_gen jgen;
@@ -471,7 +394,7 @@ ovs_db_table_echo_cb(const ovs_db_t *pdb, yajl_val jnode)
   yajl_gen_status yajl_gen_ret;
 
   if ((jgen = yajl_gen_alloc(NULL)) == NULL)
-    return (-1);
+    return -1;
 
   /* check & get request attributes */
   if ((jparams = yajl_tree_get(jnode, params_path, yajl_t_array)) == NULL ||
@@ -504,28 +427,27 @@ ovs_db_table_echo_cb(const ovs_db_t *pdb, yajl_val jnode)
   }
   /* clean up and return success */
   yajl_gen_clear(jgen);
-  return (0);
+  return 0;
 
 yajl_gen_failure:
   /* release memory */
   yajl_gen_clear(jgen);
-  return (-1);
+  return -1;
 }
 
 /* Get OVS DB registered callback by YAJL val. The YAJL
  * value should be YAJL string (UID). Returns NULL if
- * callback hasn't been found.
+ * callback hasn't been found. See also ovs_db_callback_get()
+ * description for addition info.
  */
-static ovs_callback_t *
-ovs_db_table_callback_get(ovs_db_t *pdb, yajl_val jid)
-{
+static ovs_callback_t *ovs_db_table_callback_get(ovs_db_t *pdb, yajl_val jid) {
   char *endptr = NULL;
   const char *suid = NULL;
   uint64_t uid;
 
   if (jid && YAJL_IS_STRING(jid)) {
     suid = YAJL_GET_STRING(jid);
-    uid = (uint64_t) strtoul(suid, &endptr, 16);
+    uid = (uint64_t)strtoul(suid, &endptr, 16);
     if (*endptr == '\0' && uid)
       return ovs_db_callback_get(pdb, uid);
   }
@@ -538,45 +460,48 @@ ovs_db_table_callback_get(ovs_db_t *pdb, yajl_val jid)
  * table update callback is received from the DB
  * server. Once registered callback found, it's called
  * by this handler. */
-static int
-ovs_db_table_update_cb(ovs_db_t *pdb, yajl_val jnode)
-{
+static int ovs_db_table_update_cb(ovs_db_t *pdb, yajl_val jnode) {
   ovs_callback_t *cb = NULL;
   yajl_val jvalue;
   yajl_val jparams;
   yajl_val jtable_updates;
-  yajl_val jtable_update;
-  size_t obj_len = 0;
-  const char *table_name = NULL;
   const char *params_path[] = {"params", NULL};
   const char *id_path[] = {"id", NULL};
 
   /* check & get request attributes */
   if ((jparams = yajl_tree_get(jnode, params_path, yajl_t_array)) == NULL ||
-      (yajl_tree_get(jnode, id_path, yajl_t_null) == NULL))
-    goto ovs_failure;
+      (yajl_tree_get(jnode, id_path, yajl_t_null) == NULL)) {
+    OVS_ERROR("invalid OVS DB request received");
+    return -1;
+  }
 
   /* check array length: [<json-value>, <table-updates>] */
-  if (YAJL_GET_ARRAY(jparams)->len != 2)
-    goto ovs_failure;
+  if ((YAJL_GET_ARRAY(jparams) == NULL) ||
+      (YAJL_GET_ARRAY(jparams)->len != 2)) {
+    OVS_ERROR("invalid OVS DB request received");
+    return -1;
+  }
 
   jvalue = YAJL_GET_ARRAY(jparams)->values[0];
   jtable_updates = YAJL_GET_ARRAY(jparams)->values[1];
-  if ((!YAJL_IS_OBJECT(jtable_updates)) || (!YAJL_IS_STRING(jvalue)))
-    goto ovs_failure;
+  if ((!YAJL_IS_OBJECT(jtable_updates)) || (!YAJL_IS_STRING(jvalue))) {
+    OVS_ERROR("invalid OVS DB request id or table update received");
+    return -1;
+  }
 
   /* find registered callback based on <json-value> */
+  pthread_mutex_lock(&pdb->mutex);
   cb = ovs_db_table_callback_get(pdb, jvalue);
-  if (cb == NULL || cb->table.call == NULL)
-    goto ovs_failure;
+  if (cb == NULL || cb->table.call == NULL) {
+    OVS_ERROR("No OVS DB table update callback found");
+    pthread_mutex_unlock(&pdb->mutex);
+    return -1;
+  }
 
   /* call registered callback */
   cb->table.call(jtable_updates);
+  pthread_mutex_unlock(&pdb->mutex);
   return 0;
-
-ovs_failure:
-  OVS_ERROR("invalid OVS DB table update event");
-  return (-1);
 }
 
 /* OVS DB result request handler.
@@ -584,9 +509,7 @@ ovs_failure:
  * result reply is received from the DB server.
  * Once registered callback found, it's called
  * by this handler. */
-static int
-ovs_db_result_cb(ovs_db_t *pdb, yajl_val jnode)
-{
+static int ovs_db_result_cb(ovs_db_t *pdb, yajl_val jnode) {
   ovs_callback_t *cb = NULL;
   yajl_val jresult;
   yajl_val jerror;
@@ -601,9 +524,10 @@ ovs_db_result_cb(ovs_db_t *pdb, yajl_val jnode)
 
   /* check & get result attributes */
   if (!jresult || !jerror || !jid)
-    return (-1);
+    return -1;
 
   /* try to find registered callback */
+  pthread_mutex_lock(&pdb->mutex);
   cb = ovs_db_table_callback_get(pdb, jid);
   if (cb != NULL && cb->result.call != NULL) {
     /* call registered callback */
@@ -612,7 +536,8 @@ ovs_db_result_cb(ovs_db_t *pdb, yajl_val jnode)
     sem_post(&cb->result.sync);
   }
 
-  return (0);
+  pthread_mutex_unlock(&pdb->mutex);
+  return 0;
 }
 
 /* Handle JSON data (one request) and call
@@ -620,9 +545,8 @@ ovs_db_result_cb(ovs_db_t *pdb, yajl_val jnode)
  * update callback 'ovs_db_table_update_cb' and
  * result callback 'ovs_db_result_cb' is supported.
  */
-static int
-ovs_db_json_data_process(ovs_db_t *pdb, const char *data, size_t len)
-{
+static int ovs_db_json_data_process(ovs_db_t *pdb, const char *data,
+                                    size_t len) {
   const char *method = NULL;
   char yajl_errbuf[OVS_YAJL_ERROR_BUFFER_SIZE];
   const char *method_path[] = {"method", NULL};
@@ -632,23 +556,27 @@ ovs_db_json_data_process(ovs_db_t *pdb, const char *data, size_t len)
 
   /* duplicate the data to make null-terminated string
    * required for yajl_tree_parse() */
-  if ((sjson = malloc(len + 1)) == NULL)
-    return (-1);
+  if ((sjson = calloc(1, len + 1)) == NULL)
+    return -1;
 
   sstrncpy(sjson, data, len + 1);
-  OVS_DEBUG("[len=%d] %s", len, sjson);
+  OVS_DEBUG("[len=%" PRIsz "] %s", len, sjson);
 
   /* parse json data */
   jnode = yajl_tree_parse(sjson, yajl_errbuf, sizeof(yajl_errbuf));
   if (jnode == NULL) {
     OVS_ERROR("yajl_tree_parse() %s", yajl_errbuf);
     sfree(sjson);
-    return (-1);
+    return -1;
   }
 
   /* get method name */
-  if (jval = yajl_tree_get(jnode, method_path, yajl_t_string)) {
-    method = YAJL_GET_STRING(jval);
+  if ((jval = yajl_tree_get(jnode, method_path, yajl_t_string)) != NULL) {
+    if ((method = YAJL_GET_STRING(jval)) == NULL) {
+      yajl_tree_free(jnode);
+      sfree(sjson);
+      return -1;
+    }
     if (strcmp("echo", method) == 0) {
       /* echo request from the server */
       if (ovs_db_table_echo_cb(pdb, jnode) < 0)
@@ -658,7 +586,7 @@ ovs_db_json_data_process(ovs_db_t *pdb, const char *data, size_t len)
       if (ovs_db_table_update_cb(pdb, jnode) < 0)
         OVS_ERROR("handle update notification failed");
     }
-  } else if (jval = yajl_tree_get(jnode, result_path, yajl_t_any)) {
+  } else if ((jval = yajl_tree_get(jnode, result_path, yajl_t_any)) != NULL) {
     /* result notification */
     if (ovs_db_result_cb(pdb, jnode) < 0)
       OVS_ERROR("handle result reply failed");
@@ -668,7 +596,7 @@ ovs_db_json_data_process(ovs_db_t *pdb, const char *data, size_t len)
   /* release memory */
   yajl_tree_free(jnode);
   sfree(sjson);
-  return (0);
+  return 0;
 }
 
 /*
@@ -680,9 +608,7 @@ ovs_db_json_data_process(ovs_db_t *pdb, const char *data, size_t len)
  */
 
 /* Allocate JSON reader instance */
-static inline ovs_json_reader_t *
-ovs_json_reader_alloc()
-{
+static ovs_json_reader_t *ovs_json_reader_alloc() {
   ovs_json_reader_t *jreader = NULL;
 
   if ((jreader = calloc(sizeof(ovs_json_reader_t), 1)) == NULL)
@@ -692,10 +618,8 @@ ovs_json_reader_alloc()
 }
 
 /* Push raw data into into the JSON reader for processing */
-static inline int
-ovs_json_reader_push_data(ovs_json_reader_t *jreader,
-                          const char *data, size_t data_len)
-{
+static int ovs_json_reader_push_data(ovs_json_reader_t *jreader,
+                                     const char *data, size_t data_len) {
   char *new_buff = NULL;
   size_t available = jreader->buff_size - jreader->buff_offset;
 
@@ -707,7 +631,7 @@ ovs_json_reader_push_data(ovs_json_reader_t *jreader,
     /* allocate new chunk of memory */
     new_buff = realloc(jreader->buff_ptr, (jreader->buff_size + data_len));
     if (new_buff == NULL)
-      return (-1);
+      return -1;
 
     /* point to new allocated memory */
     jreader->buff_ptr = new_buff;
@@ -717,22 +641,20 @@ ovs_json_reader_push_data(ovs_json_reader_t *jreader,
   /* store input data */
   memcpy(jreader->buff_ptr + jreader->buff_offset, data, data_len);
   jreader->buff_offset += data_len;
-  return (0);
+  return 0;
 }
 
 /* Pop one fully-fledged JSON if already exists. Returns 0 if
  * completed JSON already exists otherwise negative value is
  * returned */
-static inline int
-ovs_json_reader_pop(ovs_json_reader_t *jreader,
-                    const char **json_ptr, size_t *json_len_ptr)
-{
+static int ovs_json_reader_pop(ovs_json_reader_t *jreader,
+                               const char **json_ptr, size_t *json_len_ptr) {
   size_t nbraces = 0;
   size_t json_len = 0;
   char *json = NULL;
 
   /* search open/close brace */
-  for (int i = jreader->json_offset; i < jreader->buff_offset; i++) {
+  for (size_t i = jreader->json_offset; i < jreader->buff_offset; i++) {
     if (jreader->buff_ptr[i] == '{') {
       nbraces++;
     } else if (jreader->buff_ptr[i] == '}')
@@ -742,7 +664,7 @@ ovs_json_reader_pop(ovs_json_reader_t *jreader,
           *json_ptr = jreader->buff_ptr + jreader->json_offset;
           *json_len_ptr = json_len + 1;
           jreader->json_offset = i + 1;
-          return (0);
+          return 0;
         }
 
     /* increase JSON data length */
@@ -756,7 +678,7 @@ ovs_json_reader_pop(ovs_json_reader_t *jreader,
        * and zero rest of the buffer data */
       json = &jreader->buff_ptr[jreader->json_offset];
       json_len = jreader->buff_offset - jreader->json_offset;
-      for (int i = 0; i < jreader->buff_size; i++)
+      for (size_t i = 0; i < jreader->buff_size; i++)
         jreader->buff_ptr[i] = ((i < json_len) ? (json[i]) : (0));
       jreader->buff_offset = json_len;
     } else
@@ -767,15 +689,13 @@ ovs_json_reader_pop(ovs_json_reader_t *jreader,
     jreader->json_offset = 0;
   }
 
-  return (-1);
+  return -1;
 }
 
 /* Reset JSON reader. It is useful when start processing
  * new raw data. E.g.: in case of lost stream connection.
  */
-static inline void
-ovs_json_reader_reset(ovs_json_reader_t *jreader)
-{
+static void ovs_json_reader_reset(ovs_json_reader_t *jreader) {
   if (jreader) {
     jreader->buff_offset = 0;
     jreader->json_offset = 0;
@@ -783,45 +703,74 @@ ovs_json_reader_reset(ovs_json_reader_t *jreader)
 }
 
 /* Release internal data allocated for JSON reader */
-static inline void
-ovs_json_reader_free(ovs_json_reader_t *jreader)
-{
+static void ovs_json_reader_free(ovs_json_reader_t *jreader) {
   if (jreader) {
     free(jreader->buff_ptr);
     free(jreader);
   }
 }
 
-/* Reconnect to OVD DB and call init OVS DB callback
- * 'init_cb' if connection has been established.
+/* Reconnect to OVS DB and call the OVS DB post connection init callback
+ * if connection has been established.
  */
-static int
-ovs_db_reconnect(ovs_db_t *pdb)
-{
-  char errbuff[OVS_ERROR_BUFF_SIZE];
-
-  /* remove all registered OVS DB table/result callbacks */
-  ovs_db_callback_remove_all(pdb);
-
-  /* open new socket */
-  if ((pdb->conn.sock = socket(pdb->conn.domain, pdb->conn.type, 0)) < 0) {
-    sstrerror(errno, errbuff, sizeof(errbuff));
-    OVS_ERROR("socket(): %s", errbuff);
-    return (-1);
+static void ovs_db_reconnect(ovs_db_t *pdb) {
+  const char *node_info = pdb->node;
+  struct addrinfo *result;
+
+  if (pdb->unix_path[0] != '\0') {
+    /* use UNIX socket instead of INET address */
+    node_info = pdb->unix_path;
+    result = calloc(1, sizeof(struct addrinfo));
+    struct sockaddr_un *sa_unix = calloc(1, sizeof(struct sockaddr_un));
+    if (result == NULL || sa_unix == NULL) {
+      sfree(result);
+      sfree(sa_unix);
+      return;
+    }
+    result->ai_family = AF_UNIX;
+    result->ai_socktype = SOCK_STREAM;
+    result->ai_addrlen = sizeof(*sa_unix);
+    result->ai_addr = (struct sockaddr *)sa_unix;
+    sa_unix->sun_family = result->ai_family;
+    sstrncpy(sa_unix->sun_path, pdb->unix_path, sizeof(sa_unix->sun_path));
+  } else {
+    /* inet socket address */
+    struct addrinfo hints;
+
+    /* setup criteria for selecting the socket address */
+    memset(&hints, 0, sizeof(hints));
+    hints.ai_family = AF_UNSPEC;
+    hints.ai_socktype = SOCK_STREAM;
+
+    /* get socket addresses */
+    int ret = getaddrinfo(pdb->node, pdb->service, &hints, &result);
+    if (ret != 0) {
+      OVS_ERROR("getaddrinfo(): %s", gai_strerror(ret));
+      return;
+    }
   }
-
-  /* try to connect to server */
-  if (connect(pdb->conn.sock, (struct sockaddr *)&pdb->conn.addr,
-              pdb->conn.addr_size) < 0) {
-    sstrerror(errno, errbuff, sizeof(errbuff));
-    OVS_ERROR("connect(): %s", errbuff);
-    close(pdb->conn.sock);
-    return (-1);
+  /* try to connect to the server */
+  for (struct addrinfo *rp = result; rp != NULL; rp = rp->ai_next) {
+    int sock = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
+    if (sock < 0) {
+      OVS_DEBUG("socket(): %s", STRERRNO);
+      continue;
+    }
+    if (connect(sock, rp->ai_addr, rp->ai_addrlen) < 0) {
+      close(sock);
+      OVS_DEBUG("connect(): %s [family=%d]", STRERRNO, rp->ai_family);
+    } else {
+      /* send notification to event thread */
+      pdb->sock = sock;
+      ovs_db_event_post(pdb, OVS_DB_EVENT_CONN_ESTABLISHED);
+      break;
+    }
   }
 
-  /* send notification to event thread */
-  ovs_db_event_post(pdb, OVS_DB_EVENT_CONN_ESTABLISHED);
-  return (0);
+  if (pdb->sock < 0)
+    OVS_ERROR("connect to \"%s\" failed", node_info);
+
+  freeaddrinfo(result);
 }
 
 /* POLL worker thread.
@@ -829,116 +778,115 @@ ovs_db_reconnect(ovs_db_t *pdb)
  * requests/reply/events etc. Also, it reconnects to OVS DB
  * if connection has been lost.
  */
-static void *
-ovs_poll_worker(void *arg)
-{
-  ovs_db_t *pdb = (ovs_db_t *)arg;      /* pointer to OVS DB */
+static void *ovs_poll_worker(void *arg) {
+  ovs_db_t *pdb = (ovs_db_t *)arg; /* pointer to OVS DB */
   ovs_json_reader_t *jreader = NULL;
-  const char *json;
-  size_t json_len;
-  ssize_t nbytes = 0;
-  char buff[OVS_DB_POLL_READ_BLOCK_SIZE];
-  struct pollfd poll_fd;
-  int poll_ret = 0;
+  struct pollfd poll_fd = {
+      .fd = pdb->sock, .events = POLLIN | POLLPRI, .revents = 0,
+  };
 
+  /* create JSON reader instance */
   if ((jreader = ovs_json_reader_alloc()) == NULL) {
     OVS_ERROR("initialize json reader failed");
-    goto thread_exit;
+    return NULL;
   }
 
-  /* start polling data */
-  poll_fd.fd = pdb->conn.sock;
-  poll_fd.events = POLLIN | POLLPRI;
-  poll_fd.revents = 0;
-
   /* poll data */
   while (ovs_db_poll_is_running(pdb)) {
-    poll_ret = poll(&poll_fd, 1, /* ms */ OVS_DB_POLL_TIMEOUT * 1000);
-    if (poll_ret > 0) {
-      if (poll_fd.revents & POLLNVAL) {
-        /* invalid file descriptor, reconnect */
-        if (ovs_db_reconnect(pdb) != 0) {
-          /* sleep awhile until next reconnect */
-          usleep(OVS_DB_RECONNECT_TIMEOUT * 1000000);
-        }
-        ovs_json_reader_reset(jreader);
-        poll_fd.fd = pdb->conn.sock;
-      } else if ((poll_fd.revents & POLLERR) || (poll_fd.revents & POLLHUP)) {
-        /* connection is broken */
-        OVS_ERROR("poll() peer closed its end of the channel");
-        ovs_db_event_post(pdb, OVS_DB_EVENT_CONN_TERMINATED);
-        close(poll_fd.fd);
-      } else if ((poll_fd.revents & POLLIN) || (poll_fd.revents & POLLPRI)) {
-        /* read incoming data */
-        nbytes = recv(poll_fd.fd, buff, OVS_DB_POLL_READ_BLOCK_SIZE, 0);
-        if (nbytes > 0) {
-          OVS_DEBUG("recv(): received %d bytes of data", (int)nbytes);
-          ovs_json_reader_push_data(jreader, buff, nbytes);
-          while (!ovs_json_reader_pop(jreader, &json, &json_len))
-            /* process JSON data */
-            ovs_db_json_data_process(pdb, json, json_len);
-        } else if (nbytes == 0) {
-          OVS_ERROR("recv() peer has performed an orderly shutdown");
-          ovs_db_event_post(pdb, OVS_DB_EVENT_CONN_TERMINATED);
-          close(poll_fd.fd);
-        } else {
-          OVS_ERROR("recv() receive data error");
-          break;
-        }
-      }                         /* poll() POLLIN & POLLPRI */
-    } else if (poll_ret == 0)
-      OVS_DEBUG("poll() timeout");
-    else {
-      OVS_ERROR("poll() error");
+    poll_fd.fd = pdb->sock;
+    int poll_ret = poll(&poll_fd, 1, /* ms */ OVS_DB_POLL_TIMEOUT * 1000);
+    if (poll_ret < 0) {
+      OVS_ERROR("poll(): %s", STRERRNO);
       break;
+    } else if (poll_ret == 0) {
+      OVS_DEBUG("poll(): timeout");
+      if (pdb->sock < 0)
+        /* invalid fd, so try to reconnect */
+        ovs_db_reconnect(pdb);
+      continue;
+    }
+    if (poll_fd.revents & POLLNVAL) {
+      /* invalid file descriptor, clean-up */
+      ovs_db_callback_remove_all(pdb);
+      ovs_json_reader_reset(jreader);
+      /* setting poll FD to -1 tells poll() call to ignore this FD.
+       * In that case poll() call will return timeout all the time */
+      pdb->sock = (-1);
+    } else if ((poll_fd.revents & POLLERR) || (poll_fd.revents & POLLHUP)) {
+      /* connection is broken */
+      close(poll_fd.fd);
+      ovs_db_event_post(pdb, OVS_DB_EVENT_CONN_TERMINATED);
+      OVS_ERROR("poll() peer closed its end of the channel");
+    } else if ((poll_fd.revents & POLLIN) || (poll_fd.revents & POLLPRI)) {
+      /* read incoming data */
+      char buff[OVS_DB_POLL_READ_BLOCK_SIZE];
+      ssize_t nbytes = recv(poll_fd.fd, buff, sizeof(buff), 0);
+      if (nbytes < 0) {
+        OVS_ERROR("recv(): %s", STRERRNO);
+        /* read error? Try to reconnect */
+        close(poll_fd.fd);
+        continue;
+      } else if (nbytes == 0) {
+        close(poll_fd.fd);
+        ovs_db_event_post(pdb, OVS_DB_EVENT_CONN_TERMINATED);
+        OVS_ERROR("recv() peer has performed an orderly shutdown");
+        continue;
+      }
+      /* read incoming data */
+      size_t json_len = 0;
+      const char *json = NULL;
+      OVS_DEBUG("recv(): received %zd bytes of data", nbytes);
+      ovs_json_reader_push_data(jreader, buff, nbytes);
+      while (!ovs_json_reader_pop(jreader, &json, &json_len))
+        /* process JSON data */
+        ovs_db_json_data_process(pdb, json, json_len);
     }
   }
 
-thread_exit:
   OVS_DEBUG("poll thread has been completed");
   ovs_json_reader_free(jreader);
-  pthread_exit((void *)0);
-  return ((void *)0);
+  return NULL;
 }
 
 /* EVENT worker thread.
  * Perform task based on incoming events. This
  * task can be done asynchronously which allows to
- * handle OVD DB callback like 'init_cb'.
+ * handle OVS DB callback like 'init_cb'.
  */
-static void *
-ovs_event_worker(void *arg)
-{
-  int ret = 0;
+static void *ovs_event_worker(void *arg) {
   ovs_db_t *pdb = (ovs_db_t *)arg;
-  struct timespec ts;
 
   while (pdb->event_thread.value != OVS_DB_EVENT_TERMINATE) {
     /* wait for an event */
+    struct timespec ts;
     clock_gettime(CLOCK_REALTIME, &ts);
     ts.tv_sec += (OVS_DB_EVENT_TIMEOUT);
-    ret = pthread_cond_timedwait(&pdb->event_thread.cond,
-                                 &pdb->event_thread.mutex, &ts);
-    if (!ret) {
+    int ret = pthread_cond_timedwait(&pdb->event_thread.cond,
+                                     &pdb->event_thread.mutex, &ts);
+    if (!ret || ret == ETIMEDOUT) {
       /* handle the event */
       OVS_DEBUG("handle event %d", pdb->event_thread.value);
       switch (pdb->event_thread.value) {
       case OVS_DB_EVENT_CONN_ESTABLISHED:
         if (pdb->cb.post_conn_init)
           pdb->cb.post_conn_init(pdb);
+        /* reset event */
+        pdb->event_thread.value = OVS_DB_EVENT_NONE;
         break;
       case OVS_DB_EVENT_CONN_TERMINATED:
         if (pdb->cb.post_conn_terminate)
           pdb->cb.post_conn_terminate();
+        /* reset event */
+        pdb->event_thread.value = OVS_DB_EVENT_NONE;
+        break;
+      case OVS_DB_EVENT_NONE:
+        /* wait timeout */
+        OVS_DEBUG("no event received (timeout)");
         break;
       default:
         OVS_DEBUG("unknown event received");
         break;
       }
-    } else if (ret == ETIMEDOUT) {
-      /* wait timeout */
-      OVS_DEBUG("no event received (timeout)");
-      continue;
     } else {
       /* unexpected error */
       OVS_ERROR("pthread_cond_timedwait() failed");
@@ -946,108 +894,172 @@ ovs_event_worker(void *arg)
     }
   }
 
-thread_exit:
   OVS_DEBUG("event thread has been completed");
-  pthread_exit((void *)0);
-  return ((void *)0);
+  return NULL;
 }
 
-/* Stop EVENT thread */
-static int
-ovs_db_event_thread_stop(ovs_db_t *pdb)
-{
+/* Initialize EVENT thread */
+static int ovs_db_event_thread_init(ovs_db_t *pdb) {
+  pdb->event_thread.tid = (pthread_t){0};
+  /* init event thread condition variable */
+  if (pthread_cond_init(&pdb->event_thread.cond, NULL)) {
+    return -1;
+  }
+  /* init event thread mutex */
+  if (pthread_mutex_init(&pdb->event_thread.mutex, NULL)) {
+    pthread_cond_destroy(&pdb->event_thread.cond);
+    return -1;
+  }
+  /* Hold the event thread mutex. It ensures that no events
+   * will be lost while thread is still starting. Once event
+   * thread is started and ready to accept events, it will release
+   * the mutex */
+  if (pthread_mutex_lock(&pdb->event_thread.mutex)) {
+    pthread_mutex_destroy(&pdb->event_thread.mutex);
+    pthread_cond_destroy(&pdb->event_thread.cond);
+    return -1;
+  }
+  /* start event thread */
+  pthread_t tid;
+  if (plugin_thread_create(&tid, NULL, ovs_event_worker, pdb,
+                           "utils_ovs:event") != 0) {
+    pthread_mutex_unlock(&pdb->event_thread.mutex);
+    pthread_mutex_destroy(&pdb->event_thread.mutex);
+    pthread_cond_destroy(&pdb->event_thread.cond);
+    return -1;
+  }
+  pdb->event_thread.tid = tid;
+  return 0;
+}
+
+/* Terminate EVENT thread */
+static int ovs_db_event_thread_terminate(ovs_db_t *pdb) {
+  if (pthread_equal(pdb->event_thread.tid, (pthread_t){0})) {
+    /* already terminated */
+    return 0;
+  }
   ovs_db_event_post(pdb, OVS_DB_EVENT_TERMINATE);
   if (pthread_join(pdb->event_thread.tid, NULL) != 0)
-    return (-1);
+    return -1;
+  /* Event thread always holds the thread mutex when
+   * performs some task (handles event) and releases it when
+   * while sleeping. Thus, if event thread exits, the mutex
+   * remains locked */
+  pdb->event_thread.tid = (pthread_t){0};
   pthread_mutex_unlock(&pdb->event_thread.mutex);
+  return 0;
+}
+
+/* Destroy EVENT thread private data */
+static void ovs_db_event_thread_data_destroy(ovs_db_t *pdb) {
+  /* destroy mutex */
   pthread_mutex_destroy(&pdb->event_thread.mutex);
-  return (0);
+  pthread_cond_destroy(&pdb->event_thread.cond);
+}
+
+/* Initialize POLL thread */
+static int ovs_db_poll_thread_init(ovs_db_t *pdb) {
+  pdb->poll_thread.tid = (pthread_t){0};
+  /* init event thread mutex */
+  if (pthread_mutex_init(&pdb->poll_thread.mutex, NULL)) {
+    return -1;
+  }
+  /* start poll thread */
+  pthread_t tid;
+  pdb->poll_thread.state = OVS_DB_POLL_STATE_RUNNING;
+  if (plugin_thread_create(&tid, NULL, ovs_poll_worker, pdb,
+                           "utils_ovs:poll") != 0) {
+    pthread_mutex_destroy(&pdb->poll_thread.mutex);
+    return -1;
+  }
+  pdb->poll_thread.tid = tid;
+  return 0;
 }
 
-/* Stop POLL thread */
-static int
-ovs_db_poll_thread_stop(ovs_db_t *pdb)
-{
-  ovs_db_poll_terminate(pdb);
+/* Destroy POLL thread */
+/* XXX: Must hold pdb->mutex when calling! */
+static int ovs_db_poll_thread_destroy(ovs_db_t *pdb) {
+  if (pthread_equal(pdb->poll_thread.tid, (pthread_t){0})) {
+    /* already destroyed */
+    return 0;
+  }
+  /* change thread state */
+  pthread_mutex_lock(&pdb->poll_thread.mutex);
+  pdb->poll_thread.state = OVS_DB_POLL_STATE_EXITING;
+  pthread_mutex_unlock(&pdb->poll_thread.mutex);
+  /* join the thread */
   if (pthread_join(pdb->poll_thread.tid, NULL) != 0)
-    return (-1);
+    return -1;
   pthread_mutex_destroy(&pdb->poll_thread.mutex);
-  return (0);
+  pdb->poll_thread.tid = (pthread_t){0};
+  return 0;
 }
 
 /*
  * Public OVS DB API implementation
  */
 
-ovs_db_t *
-ovs_db_init(const char *surl, ovs_db_callback_t *cb)
-{
-  pthread_mutexattr_t mutex_attr;
-  ovs_db_t *pdb = NULL;
+ovs_db_t *ovs_db_init(const char *node, const char *service,
+                      const char *unix_path, ovs_db_callback_t *cb) {
+  /* sanity check */
+  if (node == NULL || service == NULL || unix_path == NULL)
+    return NULL;
 
   /* allocate db data & fill it */
-  if ((pdb = calloc(1, sizeof(*pdb))) == NULL)
-    return (NULL);
+  ovs_db_t *pdb = calloc(1, sizeof(*pdb));
+  if (pdb == NULL)
+    return NULL;
+  pdb->sock = -1;
 
-  /* convert string url to socket addr */
-  if (ovs_db_url_parse(surl, &pdb->conn) < 0)
-    goto failure;
+  /* store the OVS DB address */
+  sstrncpy(pdb->node, node, sizeof(pdb->node));
+  sstrncpy(pdb->service, service, sizeof(pdb->service));
+  sstrncpy(pdb->unix_path, unix_path, sizeof(pdb->unix_path));
 
   /* setup OVS DB callbacks */
   if (cb)
     pdb->cb = *cb;
 
-  /* prepare event thread */
-  pthread_cond_init(&pdb->event_thread.cond, NULL);
-  pthread_mutex_init(&pdb->event_thread.mutex, NULL);
-  pthread_mutex_lock(&pdb->event_thread.mutex);
-  if (plugin_thread_create(&pdb->event_thread.tid, NULL,
-                           ovs_event_worker, pdb) != 0) {
-    OVS_ERROR("event worker start failed");
-    goto failure;
+  /* init OVS DB mutex attributes */
+  pthread_mutexattr_t mutex_attr;
+  if (pthread_mutexattr_init(&mutex_attr)) {
+    OVS_ERROR("OVS DB mutex attribute init failed");
+    sfree(pdb);
+    return NULL;
   }
-
-  /* prepare polling thread */
-  ovs_db_reconnect(pdb);
-  pdb->poll_thread.state = OVS_DB_POLL_STATE_RUNNING;
-  pthread_mutex_init(&pdb->poll_thread.mutex, NULL);
-  if (plugin_thread_create(&pdb->poll_thread.tid, NULL,
-                           ovs_poll_worker, pdb) != 0) {
-    OVS_ERROR("pull worker start failed");
-    goto failure;
+  /* set OVS DB mutex as recursive */
+  if (pthread_mutexattr_settype(&mutex_attr, PTHREAD_MUTEX_RECURSIVE)) {
+    OVS_ERROR("Failed to set OVS DB mutex as recursive");
+    pthread_mutexattr_destroy(&mutex_attr);
+    sfree(pdb);
+    return NULL;
   }
-
   /* init OVS DB mutex */
-  if (pthread_mutexattr_init(&mutex_attr) ||
-      pthread_mutexattr_settype(&mutex_attr, PTHREAD_MUTEX_RECURSIVE) ||
-      pthread_mutex_init(&pdb->mutex, &mutex_attr)) {
+  if (pthread_mutex_init(&pdb->mutex, &mutex_attr)) {
     OVS_ERROR("OVS DB mutex init failed");
-    goto failure;
+    pthread_mutexattr_destroy(&mutex_attr);
+    sfree(pdb);
+    return NULL;
   }
+  /* destroy mutex attributes */
+  pthread_mutexattr_destroy(&mutex_attr);
 
-  /* return db to the caller */
-  return pdb;
+  /* init event thread */
+  if (ovs_db_event_thread_init(pdb) < 0) {
+    ovs_db_destroy(pdb);
+    return NULL;
+  }
 
-failure:
-  if (pdb->conn.sock)
-    /* close connection */
-    close(pdb->conn.sock);
-  if (pdb->event_thread.tid != 0)
-    /* stop event thread */
-    if (ovs_db_event_thread_stop(pdb) < 0)
-      OVS_ERROR("stop event thread failed");
-  if (pdb->poll_thread.tid != 0)
-    /* stop poll thread */
-    if (ovs_db_poll_thread_stop(pdb) < 0)
-      OVS_ERROR("stop poll thread failed");
-  sfree(pdb);
-  return NULL;
+  /* init polling thread */
+  if (ovs_db_poll_thread_init(pdb) < 0) {
+    ovs_db_destroy(pdb);
+    return NULL;
+  }
+  return pdb;
 }
 
-int
-ovs_db_send_request(ovs_db_t *pdb, const char *method,
-                    const char *params, ovs_db_result_cb_t cb)
-{
+int ovs_db_send_request(ovs_db_t *pdb, const char *method, const char *params,
+                        ovs_db_result_cb_t cb) {
   int ret = 0;
   yajl_gen_status yajl_gen_ret;
   yajl_val jparams;
@@ -1061,16 +1073,16 @@ ovs_db_send_request(ovs_db_t *pdb, const char *method,
 
   /* sanity check */
   if (!pdb || !method || !params)
-    return (-1);
+    return -1;
 
   if ((jgen = yajl_gen_alloc(NULL)) == NULL)
-    return (-1);
+    return -1;
 
   /* try to parse params */
   if ((jparams = yajl_tree_parse(params, NULL, 0)) == NULL) {
     OVS_ERROR("params is not a JSON string");
     yajl_gen_clear(jgen);
-    return (-1);
+    return -1;
   }
 
   /* generate method field */
@@ -1087,14 +1099,14 @@ ovs_db_send_request(ovs_db_t *pdb, const char *method,
   /* generate id field */
   OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "id");
   uid = ovs_uid_generate();
-  ssnprintf(uid_buff, sizeof(uid_buff), "%" PRIX64, uid);
+  snprintf(uid_buff, sizeof(uid_buff), "%" PRIX64, uid);
   OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, uid_buff);
 
   OVS_YAJL_CALL(yajl_gen_map_close, jgen);
 
   if (cb) {
     /* register result callback */
-    if ((new_cb = malloc(sizeof(ovs_callback_t))) == NULL)
+    if ((new_cb = calloc(1, sizeof(ovs_callback_t))) == NULL)
       goto yajl_gen_failure;
 
     /* add new callback to front */
@@ -1105,8 +1117,7 @@ ovs_db_send_request(ovs_db_t *pdb, const char *method,
   }
 
   /* send the request */
-  OVS_YAJL_CALL(yajl_gen_get_buf, jgen, (const unsigned char **)&req,
-                &req_len);
+  OVS_YAJL_CALL(yajl_gen_get_buf, jgen, (const unsigned char **)&req, &req_len);
   OVS_DEBUG("%s", req);
   if (!ovs_db_data_send(pdb, req, req_len)) {
     if (cb) {
@@ -1136,11 +1147,10 @@ yajl_gen_failure:
   return (yajl_gen_ret != yajl_gen_status_ok) ? (-1) : ret;
 }
 
-int
-ovs_db_table_cb_register(ovs_db_t *pdb, const char *tb_name,
-                         const char **tb_column, ovs_db_table_cb_t update_cb,
-                         ovs_db_result_cb_t result_cb, unsigned int flags)
-{
+int ovs_db_table_cb_register(ovs_db_t *pdb, const char *tb_name,
+                             const char **tb_column,
+                             ovs_db_table_cb_t update_cb,
+                             ovs_db_result_cb_t result_cb, unsigned int flags) {
   yajl_gen jgen;
   yajl_gen_status yajl_gen_ret;
   ovs_callback_t *new_cb = NULL;
@@ -1151,14 +1161,17 @@ ovs_db_table_cb_register(ovs_db_t *pdb, const char *tb_name,
 
   /* sanity check */
   if (pdb == NULL || tb_name == NULL || update_cb == NULL)
-    return (-1);
+    return -1;
 
-  if ((jgen = yajl_gen_alloc(NULL)) == NULL)
-    return (-1);
+  /* allocate new update callback */
+  if ((new_cb = calloc(1, sizeof(ovs_callback_t))) == NULL)
+    return -1;
 
-  /* register table update callback */
-  if ((new_cb = malloc(sizeof(ovs_callback_t))) == NULL)
-    return (-1);
+  /* init YAJL generator */
+  if ((jgen = yajl_gen_alloc(NULL)) == NULL) {
+    sfree(new_cb);
+    return -1;
+  }
 
   /* add new callback to front */
   new_cb->table.call = update_cb;
@@ -1172,7 +1185,7 @@ ovs_db_table_cb_register(ovs_db_t *pdb, const char *tb_name,
     OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, OVS_DB_DEFAULT_DB_NAME);
 
     /* uid string <json-value> */
-    ssnprintf(uid_str, sizeof(uid_str), "%" PRIX64, new_cb->uid);
+    snprintf(uid_str, sizeof(uid_str), "%" PRIX64, new_cb->uid);
     OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, uid_str);
 
     /* <monitor-requests> */
@@ -1235,43 +1248,45 @@ yajl_gen_failure:
   return ovs_db_ret;
 }
 
-int
-ovs_db_destroy(ovs_db_t *pdb)
-{
+int ovs_db_destroy(ovs_db_t *pdb) {
   int ovs_db_ret = 0;
   int ret = 0;
 
   /* sanity check */
   if (pdb == NULL)
-    return (-1);
+    return -1;
 
-  /* try to lock the structure before releasing */
-  if (ret = pthread_mutex_lock(&pdb->mutex)) {
-    OVS_ERROR("pthread_mutex_lock() DB mutext lock failed (%d)", ret);
-    return (-1);
+  /* stop event thread */
+  if (ovs_db_event_thread_terminate(pdb) < 0) {
+    OVS_ERROR("stop event thread failed");
+    ovs_db_ret = -1;
   }
 
-  /* stop poll thread */
-  if (ovs_db_event_thread_stop(pdb) < 0) {
-    OVS_ERROR("stop poll thread failed");
-    ovs_db_ret = (-1);
+  /* try to lock the structure before releasing */
+  if ((ret = pthread_mutex_lock(&pdb->mutex))) {
+    OVS_ERROR("pthread_mutex_lock() DB mutex lock failed (%d)", ret);
+    return -1;
   }
 
-  /* stop event thread */
-  if (ovs_db_poll_thread_stop(pdb) < 0) {
-    OVS_ERROR("stop event thread failed");
-    ovs_db_ret = (-1);
+  /* stop poll thread and destroy thread's private data */
+  if (ovs_db_poll_thread_destroy(pdb) < 0) {
+    OVS_ERROR("destroy poll thread failed");
+    ovs_db_ret = -1;
   }
 
+  /* destroy event thread private data */
+  ovs_db_event_thread_data_destroy(pdb);
+
+  pthread_mutex_unlock(&pdb->mutex);
+
   /* unsubscribe callbacks */
   ovs_db_callback_remove_all(pdb);
 
   /* close connection */
-  if (pdb->conn.sock)
-    close(pdb->conn.sock);
+  if (pdb->sock >= 0)
+    close(pdb->sock);
 
   /* release DB handler */
-  pthread_mutex_unlock(&pdb->mutex);
   pthread_mutex_destroy(&pdb->mutex);
   sfree(pdb);
   return ovs_db_ret;
@@ -1281,10 +1296,15 @@ ovs_db_destroy(ovs_db_t *pdb)
  * Public OVS utils API implementation
  */
 
-/* Get YAJL value by key from YAJL dictionary */
-yajl_val
-ovs_utils_get_value_by_key(yajl_val jval, const char *key)
-{
+/* Get YAJL value by key from YAJL dictionary
+ *
+ * EXAMPLE:
+ *  {
+ *    "key_a" : <YAJL return value>
+ *    "key_b" : <YAJL return value>
+ *  }
+ */
+yajl_val ovs_utils_get_value_by_key(yajl_val jval, const char *key) {
   const char *obj_key = NULL;
 
   /* check params */
@@ -1292,7 +1312,7 @@ ovs_utils_get_value_by_key(yajl_val jval, const char *key)
     return NULL;
 
   /* find a value by key */
-  for (int i = 0; i < YAJL_GET_OBJECT(jval)->len; i++) {
+  for (size_t i = 0; i < YAJL_GET_OBJECT(jval)->len; i++) {
     obj_key = YAJL_GET_OBJECT(jval)->keys[i];
     if (strcmp(obj_key, key) == 0)
       return YAJL_GET_OBJECT(jval)->values[i];
@@ -1301,14 +1321,35 @@ ovs_utils_get_value_by_key(yajl_val jval, const char *key)
   return NULL;
 }
 
-/* Get OVS DB map value by given map key */
-yajl_val
-ovs_utils_get_map_value(yajl_val jval, const char *key)
-{
+/* Get OVS DB map value by given map key
+ *
+ * FROM RFC7047:
+ *
+ *   <pair>
+ *     A 2-element JSON array that represents a pair within a database
+ *     map.  The first element is an <atom> that represents the key, and
+ *     the second element is an <atom> that represents the value.
+ *
+ *   <map>
+ *     A 2-element JSON array that represents a database map value.  The
+ *     first element of the array must be the string "map", and the
+ *     second element must be an array of zero or more <pair>s giving the
+ *     values in the map.  All of the <pair>s must have the same key and
+ *     value types.
+ *
+ * EXAMPLE:
+ *  [
+ *    "map", [
+ *             [ "key_a", <YAJL value>], [ "key_b", <YAJL value>], ...
+ *           ]
+ *  ]
+ */
+yajl_val ovs_utils_get_map_value(yajl_val jval, const char *key) {
   size_t map_len = 0;
   size_t array_len = 0;
   yajl_val *map_values = NULL;
   yajl_val *array_values = NULL;
+  const char *str_val = NULL;
 
   /* check YAJL array */
   if (!YAJL_IS_ARRAY(jval) || (key == NULL))
@@ -1322,13 +1363,14 @@ ovs_utils_get_map_value(yajl_val jval, const char *key)
     return NULL;
 
   /* check first element of the array */
-  if (strcmp("map", YAJL_GET_STRING(array_values[0])) != 0)
+  str_val = YAJL_GET_STRING(array_values[0]);
+  if (strcmp("map", str_val) != 0)
     return NULL;
 
   /* try to find map value by map key */
   map_len = YAJL_GET_ARRAY(array_values[1])->len;
   map_values = YAJL_GET_ARRAY(array_values[1])->values;
-  for (int i = 0; i < map_len; i++) {
+  for (size_t i = 0; i < map_len; i++) {
     /* check YAJL array */
     if (!YAJL_IS_ARRAY(map_values[i]))
       break;
@@ -1341,7 +1383,8 @@ ovs_utils_get_map_value(yajl_val jval, const char *key)
       break;
 
     /* return map value if given key equals map key */
-    if (strcmp(key, YAJL_GET_STRING(array_values[0])) == 0)
+    str_val = YAJL_GET_STRING(array_values[0]);
+    if (strcmp(key, str_val) == 0)
       return array_values[1];
   }
   return NULL;