ovs_events: Address PR comments
authorMytnyk, VolodymyrX <volodymyrx.mytnyk@intel.com>
Fri, 28 Oct 2016 10:18:17 +0000 (11:18 +0100)
committerMytnyk, VolodymyrX <volodymyrx.mytnyk@intel.com>
Mon, 26 Dec 2016 13:26:05 +0000 (13:26 +0000)
- Change configuration format to suggested one;
- Fix init/destroy API;
- Fix memory leaks;
- Code-clean-up.

Change-Id: I1ff94271b777c69f3d07a66f43dc10d034e71101
Signed-off-by: Mytnyk, VolodymyrX <volodymyrx.mytnyk@intel.com>
src/collectd.conf.in
src/collectd.conf.pod
src/ovs_events.c
src/utils_ovs.c
src/utils_ovs.h

index 883a079..e754fa8 100644 (file)
 #</Plugin>
 
 #<Plugin ovs_events>
-#  OvsDbAddress "127.0.0.1" "6640"
+#  Port "6640"
+#  Address "127.0.0.1"
+#  Socket "/var/run/openvswitch/db.sock"
 #  Interfaces "br0" "veth0"
 #  SendNotification false
 #</Plugin>
index 061c4ba..210e107 100644 (file)
@@ -5463,7 +5463,9 @@ notification.
 B<Synopsis:>
 
  <Plugin "ovs_events">
-   OvsDbAddress "127.0.0.1" "6640"
+   Port "6640"
+   Address "127.0.0.1"
+   Socket "/var/run/openvswitch/db.sock"
    Interfaces "br0" "veth0"
    SendNotification false
  </Plugin>
@@ -5472,31 +5474,26 @@ The plugin provides the following configuration options:
 
 =over 4
 
-=item B<OvsDbAddress> I<node> I<service>
+=item B<Address> I<node>
 
-The address of OVS DB server JSON-RPC interface used by the plugin.
-To enable the interface, OVS DB daemon should be running with '--remote=ptcp:'
-or '--remote=punix:' option. See L<ovsdb-server(1)> for more details. The
-address arguments must take one of the following forms:
+The address of OVS DB server JSON-RPC interface used by the plugin. To enable
+the interface, OVS DB daemon should be running with '--remote=ptcp:' option.
+See L<ovsdb-server(1)> for more details. The option may be either network
+hostname, IPv4 numbers-and-dots notation or IPv6 hexadecimal string format.
+Defaults to 'localhost'.
 
-=over 4
-
-=item I<node>
-
-The I<node> argument of the address can be either network hostname, IPv4
-numbers-and-dots notation or IPv6 hexadecimal string format. In case of Unix
-domain socket, the "I<unix:>file" format should be used, where I<file> is
-the full name of OVS DB Unix domain socket.
+=item B<Port> I<service>
 
-=item I<service>
+TCP-port to connect to. Either a service name or a port number may be given.
+Please note that numerical port numbers must be given as a string. Defaults
+to "6640".
 
-The I<service> argument of the address specifies the service name used to
-connect to OVS DB. See L<services(5)> for more details. This argument is
-skipped if Unix domain address is used.
-
-=back
+=item B<Socket> I<path>
 
-Default: C<"localhost" "6640">
+The UNIX domain socket path of OVS DB server JSON-RPC interface used by the
+plugin. To enable the interface, the OVS DB daemon should be running with
+'--remote=punix:' option. See L<ovsdb-server(1)> for more details. If this
+option is set, B<Address> and B<Port> options are ignored.
 
 =item B<Interfaces> [I<ifname> ...]
 
index ce5254d..1f63a06 100644 (file)
@@ -71,6 +71,7 @@ struct ovs_events_config_s {
   _Bool send_notification;                 /* sent notification to collectd? */
   char ovs_db_node[OVS_DB_ADDR_NODE_SIZE]; /* OVS DB node */
   char ovs_db_serv[OVS_DB_ADDR_SERVICE_SIZE]; /* OVS DB service */
+  char ovs_db_unix[OVS_DB_ADDR_UNIX_SIZE];    /* OVS DB unix socket path */
   ovs_events_iface_list_t *ifaces;            /* interface info */
 };
 typedef struct ovs_events_config_s ovs_events_config_t;
@@ -93,6 +94,7 @@ static ovs_events_ctx_t ovs_events_ctx = {
     .config = {.send_notification = 0,     /* do not send notification */
                .ovs_db_node = "localhost", /* use default OVS DB node */
                .ovs_db_serv = "6640",      /* use default OVS DB service */
+               .ovs_db_unix = "",          /* UNIX path empty by default */
                .ifaces = NULL},
     .ovs_db_select_params = NULL,
     .is_db_available = 0,
@@ -137,9 +139,8 @@ static int ovs_events_config_iface_exists(const char *ifname) {
 static char *ovs_events_get_select_params() {
   int ret = 0;
   size_t buff_size = 0;
-  size_t offset = 0;
-  char *buff = NULL;
-  char *new_buff = NULL;
+  size_t buff_off = 0;
+  char *opt_buff = NULL;
   const char params_fmt[] = "[\"Open_vSwitch\"%s]";
   const char option_fmt[] = ",{\"op\":\"select\",\"table\":\"Interface\","
                             "\"where\":[[\"name\",\"==\",\"%s\"]],"
@@ -150,40 +151,40 @@ static char *ovs_events_get_select_params() {
                              "\"external_ids\",\"name\",\"_uuid\"]}";
   /* setup OVS DB interface condition */
   for (ovs_events_iface_list_t *iface = ovs_events_ctx.config.ifaces; iface;
-       iface = iface->next, offset += ret) {
+       iface = iface->next, buff_off += ret) {
     /* allocate new buffer (format size + ifname len is good enough) */
     buff_size += (sizeof(option_fmt) + strlen(iface->name));
-    new_buff = realloc(buff, buff_size);
-    if (new_buff == NULL)
-      goto failure;
-    buff = new_buff;
-    ret = ssnprintf(buff + offset, buff_size, option_fmt, iface->name);
-    if (ret < 0)
-      goto failure;
+    char *new_buff = realloc(opt_buff, buff_size);
+    if (new_buff == NULL) {
+      sfree(opt_buff);
+      return NULL;
+    }
+    opt_buff = new_buff;
+    ret = ssnprintf(opt_buff + buff_off, buff_size - buff_off, option_fmt,
+                    iface->name);
+    if (ret < 0) {
+      sfree(opt_buff);
+      return NULL;
+    }
   }
   /* if no interfaces are configured, use default params */
-  if (buff == NULL) {
-    buff = strdup(default_opt);
-    offset = strlen(default_opt);
-  }
+  if (opt_buff == NULL)
+    opt_buff = strdup(default_opt);
 
   /* allocate memory for OVS DB select params */
-  buff_size = offset + sizeof(params_fmt);
-  new_buff = malloc(buff_size);
-  if (new_buff == NULL)
-    goto failure;
+  size_t params_size = sizeof(params_fmt) + strlen(opt_buff);
+  char *params_buff = malloc(params_size);
+  if (params_buff == NULL) {
+    sfree(opt_buff);
+    return NULL;
+  }
 
   /* create OVS DB select params */
-  if (ssnprintf(new_buff, buff_size, params_fmt, buff) < 0)
-    goto failure;
+  if (ssnprintf(params_buff, params_size, params_fmt, opt_buff) < 0)
+    sfree(params_buff);
 
-  sfree(buff);
-  return new_buff;
-
-failure:
-  sfree(new_buff);
-  sfree(buff);
-  return NULL;
+  sfree(opt_buff);
+  return params_buff;
 }
 
 /* Release memory allocated for configuration data */
@@ -206,31 +207,24 @@ static int ovs_events_plugin_config(oconfig_item_t *ci) {
   for (int i = 0; i < ci->children_num; i++) {
     oconfig_item_t *child = ci->children + i;
     if (strcasecmp("SendNotification", child->key) == 0) {
-      if (cf_util_get_boolean(child, &ovs_events_ctx.config.send_notification) <
-          0)
+      if (cf_util_get_boolean(child,
+                              &ovs_events_ctx.config.send_notification) != 0)
+        OVS_EVENTS_CONFIG_ERROR(child->key);
+    } else if (strcasecmp("Address", child->key) == 0) {
+      if (cf_util_get_string_buffer(
+              child, ovs_events_ctx.config.ovs_db_node,
+              sizeof(ovs_events_ctx.config.ovs_db_node)) != 0)
+        OVS_EVENTS_CONFIG_ERROR(child->key);
+    } else if (strcasecmp("Port", child->key) == 0) {
+      if (cf_util_get_string_buffer(
+              child, ovs_events_ctx.config.ovs_db_serv,
+              sizeof(ovs_events_ctx.config.ovs_db_serv)) != 0)
+        OVS_EVENTS_CONFIG_ERROR(child->key);
+    } else if (strcasecmp("Socket", child->key) == 0) {
+      if (cf_util_get_string_buffer(
+              child, ovs_events_ctx.config.ovs_db_unix,
+              sizeof(ovs_events_ctx.config.ovs_db_unix)) != 0)
         OVS_EVENTS_CONFIG_ERROR(child->key);
-    } else if (strcasecmp("OvsDbAddress", child->key) == 0) {
-      if (child->values_num < 1) {
-        ERROR(OVS_EVENTS_PLUGIN ": invalid OVS DB address specified");
-        goto failure;
-      }
-      /* check node type and get the value */
-      if (child->values[0].type != OCONFIG_TYPE_STRING) {
-        ERROR(OVS_EVENTS_PLUGIN ": OVS DB node is not a string");
-        goto failure;
-      }
-      sstrncpy(ovs_events_ctx.config.ovs_db_node, child->values[0].value.string,
-               sizeof(ovs_events_ctx.config.ovs_db_node));
-      /* get OVS DB address service name (optional) */
-      if (child->values_num > 1) {
-        if (child->values[1].type != OCONFIG_TYPE_STRING) {
-          ERROR(OVS_EVENTS_PLUGIN ": OVS DB service is not a string");
-          goto failure;
-        }
-        sstrncpy(ovs_events_ctx.config.ovs_db_serv,
-                 child->values[1].value.string,
-                 sizeof(ovs_events_ctx.config.ovs_db_serv));
-      }
     } else if (strcasecmp("Interfaces", child->key) == 0) {
       for (int j = 0; j < child->values_num; j++) {
         /* check value type */
@@ -481,9 +475,9 @@ static void ovs_events_table_update_cb(yajl_val jupdates) {
   }
 }
 
-/* OVD DB reply callback. It parses reply, receives
+/* OVS DB reply callback. It parses reply, receives
  * interface information and dispatches the info to
- * collecd
+ * collectd
  */
 static void ovs_events_poll_result_cb(yajl_val jresult, yajl_val jerror) {
   yajl_val *jvalues = NULL;
@@ -544,7 +538,7 @@ static void ovs_events_conn_initialize(ovs_db_t *pdb) {
     }
   }
   OVS_EVENTS_CTX_LOCK { ovs_events_ctx.is_db_available = 1; }
-  DEBUG(OVS_EVENTS_PLUGIN ": OVS DB has been initialized");
+  DEBUG(OVS_EVENTS_PLUGIN ": OVS DB connection has been initialized");
 }
 
 /* OVS DB terminate connection notification callback */
@@ -576,8 +570,9 @@ static int ovs_events_plugin_init(void) {
   ovs_db_callback_t cb = {.post_conn_init = ovs_events_conn_initialize,
                           .post_conn_terminate = ovs_events_conn_terminate};
 
-  DEBUG(OVS_EVENTS_PLUGIN ": OVS DB node = %s, service=%s",
-        ovs_events_ctx.config.ovs_db_node, ovs_events_ctx.config.ovs_db_serv);
+  DEBUG(OVS_EVENTS_PLUGIN ": OVS DB address=%s, service=%s, unix=%s",
+        ovs_events_ctx.config.ovs_db_node, ovs_events_ctx.config.ovs_db_serv,
+        ovs_events_ctx.config.ovs_db_unix);
 
   /* generate OVS DB select condition based on list on configured interfaces */
   ovs_events_ctx.ovs_db_select_params = ovs_events_get_select_params();
@@ -588,7 +583,8 @@ static int ovs_events_plugin_init(void) {
 
   /* initialize OVS DB */
   ovs_db = ovs_db_init(ovs_events_ctx.config.ovs_db_node,
-                       ovs_events_ctx.config.ovs_db_serv, &cb);
+                       ovs_events_ctx.config.ovs_db_serv,
+                       ovs_events_ctx.config.ovs_db_unix, &cb);
   if (ovs_db == NULL) {
     ERROR(OVS_EVENTS_PLUGIN ": fail to connect to OVS DB server");
     goto ovs_events_failure;
index 2a4bdf8..2b14849 100644 (file)
 #define OVS_DB_POLL_TIMEOUT 1           /* poll receive timeout (sec) */
 #define OVS_DB_POLL_READ_BLOCK_SIZE 512 /* read block size (bytes) */
 #define OVS_DB_DEFAULT_DB_NAME "Open_vSwitch"
-#define OVS_DB_RECONNECT_TIMEOUT 1 /* reconnect timeout (sec) */
 
 #define OVS_DB_EVENT_TIMEOUT 5 /* event thread timeout (sec) */
 #define OVS_DB_EVENT_TERMINATE 1
@@ -183,9 +182,14 @@ struct ovs_db_s {
   ovs_db_callback_t cb;
   char service[OVS_DB_ADDR_SERVICE_SIZE];
   char node[OVS_DB_ADDR_NODE_SIZE];
+  char unix_path[OVS_DB_ADDR_NODE_SIZE];
   int sock;
 };
 
+/* Global variables */
+static uint64_t ovs_uid = 0;
+static pthread_mutex_t ovs_uid_mutex = PTHREAD_MUTEX_INITIALIZER;
+
 /* Post an event to event thread.
  * Possible events are:
  *  OVS_DB_EVENT_TERMINATE
@@ -209,19 +213,14 @@ static _Bool ovs_db_poll_is_running(ovs_db_t *pdb) {
   return (state == OVS_DB_POLL_STATE_RUNNING);
 }
 
-/* Terminate POLL thread */
-static void ovs_db_poll_terminate(ovs_db_t *pdb) {
-  pthread_mutex_lock(&pdb->poll_thread.mutex);
-  pdb->poll_thread.state = OVS_DB_POLL_STATE_EXITING;
-  pthread_mutex_unlock(&pdb->poll_thread.mutex);
-}
-
 /* Generate unique identifier (UID). It is used by OVS DB API
  * to set "id" field for any OVS DB JSON request. */
 static uint64_t ovs_uid_generate() {
-  struct timespec ts;
-  clock_gettime(CLOCK_MONOTONIC, &ts);
-  return ((ts.tv_sec << 32) | (ts.tv_nsec & UINT32_MAX));
+  uint64_t new_uid;
+  pthread_mutex_lock(&ovs_uid_mutex);
+  new_uid = ++ovs_uid;
+  pthread_mutex_unlock(&ovs_uid_mutex);
+  return new_uid;
 }
 
 /*
@@ -242,10 +241,10 @@ static void ovs_db_callback_add(ovs_db_t *pdb, ovs_callback_t *new_cb) {
 
 /* Remove callback from OVS DB object */
 static void ovs_db_callback_remove(ovs_db_t *pdb, ovs_callback_t *del_cb) {
+  pthread_mutex_lock(&pdb->mutex);
   ovs_callback_t *pre_cb = del_cb->prev;
   ovs_callback_t *next_cb = del_cb->next;
 
-  pthread_mutex_lock(&pdb->mutex);
   if (next_cb)
     next_cb->prev = del_cb->prev;
 
@@ -274,7 +273,7 @@ static void ovs_db_callback_remove_all(ovs_db_t *pdb) {
  * to requested callback otherwise NULL is returned.
  *
  * IMPORTANT NOTE:
- *   The OVS DB mutex should be locked by the caller
+ *   The OVS DB mutex MUST be locked by the caller
  *   to make sure that returned callback is still valid.
  */
 static ovs_callback_t *ovs_db_callback_get(ovs_db_t *pdb, uint64_t uid) {
@@ -468,34 +467,38 @@ static int ovs_db_table_update_cb(ovs_db_t *pdb, yajl_val jnode) {
 
   /* check & get request attributes */
   if ((jparams = yajl_tree_get(jnode, params_path, yajl_t_array)) == NULL ||
-      (yajl_tree_get(jnode, id_path, yajl_t_null) == NULL))
-    goto ovs_failure;
+      (yajl_tree_get(jnode, id_path, yajl_t_null) == NULL)) {
+    OVS_ERROR("invalid OVS DB request received");
+    return (-1);
+  }
 
   /* check array length: [<json-value>, <table-updates>] */
-  if (YAJL_GET_ARRAY(jparams)->len != 2)
-    goto ovs_failure;
+  if ((YAJL_GET_ARRAY(jparams) == NULL) ||
+      (YAJL_GET_ARRAY(jparams)->len != 2)) {
+    OVS_ERROR("invalid OVS DB request received");
+    return (-1);
+  }
 
   jvalue = YAJL_GET_ARRAY(jparams)->values[0];
   jtable_updates = YAJL_GET_ARRAY(jparams)->values[1];
-  if ((!YAJL_IS_OBJECT(jtable_updates)) || (!YAJL_IS_STRING(jvalue)))
-    goto ovs_failure;
+  if ((!YAJL_IS_OBJECT(jtable_updates)) || (!YAJL_IS_STRING(jvalue))) {
+    OVS_ERROR("invalid OVS DB request id or table update received");
+    return (-1);
+  }
 
   /* find registered callback based on <json-value> */
   pthread_mutex_lock(&pdb->mutex);
   cb = ovs_db_table_callback_get(pdb, jvalue);
   if (cb == NULL || cb->table.call == NULL) {
+    OVS_ERROR("No OVS DB table update callback found");
     pthread_mutex_unlock(&pdb->mutex);
-    goto ovs_failure;
+    return (-1);
   }
 
   /* call registered callback */
   cb->table.call(jtable_updates);
   pthread_mutex_unlock(&pdb->mutex);
   return 0;
-
-ovs_failure:
-  OVS_ERROR("invalid OVS DB table update event");
-  return (-1);
 }
 
 /* OVS DB result request handler.
@@ -550,7 +553,7 @@ static int ovs_db_json_data_process(ovs_db_t *pdb, const char *data,
 
   /* duplicate the data to make null-terminated string
    * required for yajl_tree_parse() */
-  if ((sjson = malloc(len + 1)) == NULL)
+  if ((sjson = calloc(1, len + 1)) == NULL)
     return (-1);
 
   sstrncpy(sjson, data, len + 1);
@@ -700,39 +703,33 @@ static void ovs_json_reader_free(ovs_json_reader_t *jreader) {
   }
 }
 
-/* Reconnect to OVD DB and call the OVS DB post connection init callback
+/* Reconnect to OVS DB and call the OVS DB post connection init callback
  * if connection has been established.
  */
-static int ovs_db_reconnect(ovs_db_t *pdb) {
-  char errbuff[OVS_ERROR_BUFF_SIZE];
+static void ovs_db_reconnect(ovs_db_t *pdb) {
   const char unix_prefix[] = "unix:";
-  struct addrinfo *result, *rp;
-  _Bool is_connected = 0;
+  const char *node_info = pdb->node;
+  struct addrinfo *result;
   struct sockaddr_un saunix;
 
-  /* remove all registered OVS DB table/result callbacks */
-  ovs_db_callback_remove_all(pdb);
-
-  if (strncmp(pdb->node, unix_prefix, strlen(unix_prefix)) == 0) {
-    /* create unix socket address */
-    rp = calloc(1, sizeof(struct addrinfo));
+  if (pdb->unix_path[0] != '\0') {
+    /* use UNIX socket instead of INET address */
+    node_info = pdb->unix_path;
+    result = calloc(1, sizeof(struct addrinfo));
     struct sockaddr_un *sa_unix = calloc(1, sizeof(struct sockaddr_un));
-    if (rp == NULL || sa_unix == NULL) {
-      sfree(rp);
+    if (result == NULL || sa_unix == NULL) {
+      sfree(result);
       sfree(sa_unix);
-      return (1);
+      return;
     }
-    rp->ai_family = AF_UNIX;
-    rp->ai_socktype = SOCK_STREAM;
-    rp->ai_addrlen = sizeof(*sa_unix);
-    rp->ai_addr = (struct sockaddr *)sa_unix;
-    sa_unix->sun_family = rp->ai_family;
-    sstrncpy(sa_unix->sun_path, (pdb->node + strlen(unix_prefix)),
-             sizeof(sa_unix->sun_path));
-    result = rp;
+    result->ai_family = AF_UNIX;
+    result->ai_socktype = SOCK_STREAM;
+    result->ai_addrlen = sizeof(*sa_unix);
+    result->ai_addr = (struct sockaddr *)sa_unix;
+    sa_unix->sun_family = result->ai_family;
+    sstrncpy(sa_unix->sun_path, pdb->unix_path, sizeof(sa_unix->sun_path));
   } else {
-    /* intet socket address */
-    int ret = 0;
+    /* inet socket address */
     struct addrinfo hints;
 
     /* setup criteria for selecting the socket address */
@@ -741,37 +738,37 @@ static int ovs_db_reconnect(ovs_db_t *pdb) {
     hints.ai_socktype = SOCK_STREAM;
 
     /* get socket addresses */
-    if ((ret = getaddrinfo(pdb->node, pdb->service, &hints, &result)) != 0) {
+    int ret = getaddrinfo(pdb->node, pdb->service, &hints, &result);
+    if (ret != 0) {
       OVS_ERROR("getaddrinfo(): %s", gai_strerror(ret));
-      return (1);
+      return;
     }
   }
   /* try to connect to the server */
-  for (rp = result; rp != NULL; rp = rp->ai_next) {
-    if ((pdb->sock = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol)) <
-        0) {
+  for (struct addrinfo *rp = result; rp != NULL; rp = rp->ai_next) {
+    char errbuff[OVS_ERROR_BUFF_SIZE];
+    int sock = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
+    if (sock < 0) {
       sstrerror(errno, errbuff, sizeof(errbuff));
       OVS_DEBUG("socket(): %s", errbuff);
       continue;
     }
-    if (connect(pdb->sock, rp->ai_addr, rp->ai_addrlen) < 0) {
+    if (connect(sock, rp->ai_addr, rp->ai_addrlen) < 0) {
+      close(sock);
       sstrerror(errno, errbuff, sizeof(errbuff));
       OVS_DEBUG("connect(): %s [family=%d]", errbuff, rp->ai_family);
-      close(pdb->sock);
     } else {
-      is_connected = 1;
+      /* send notification to event thread */
+      ovs_db_event_post(pdb, OVS_DB_EVENT_CONN_ESTABLISHED);
+      pdb->sock = sock;
       break;
     }
   }
 
-  /* send notification to event thread */
-  if (is_connected)
-    ovs_db_event_post(pdb, OVS_DB_EVENT_CONN_ESTABLISHED);
-  else
-    OVS_ERROR("connect to \"%s\" failed", pdb->node);
+  if (pdb->sock < 0)
+    OVS_ERROR("connect to \"%s\" failed", node_info);
 
   freeaddrinfo(result);
-  return !is_connected;
 }
 
 /* POLL worker thread.
@@ -782,89 +779,91 @@ static int ovs_db_reconnect(ovs_db_t *pdb) {
 static void *ovs_poll_worker(void *arg) {
   ovs_db_t *pdb = (ovs_db_t *)arg; /* pointer to OVS DB */
   ovs_json_reader_t *jreader = NULL;
-  const char *json;
-  size_t json_len;
-  ssize_t nbytes = 0;
-  char buff[OVS_DB_POLL_READ_BLOCK_SIZE];
-  struct pollfd poll_fd;
-  int poll_ret = 0;
+  struct pollfd poll_fd = {
+      .fd = pdb->sock, .events = POLLIN | POLLPRI, .revents = 0,
+  };
 
+  /* create JSON reader instance */
   if ((jreader = ovs_json_reader_alloc()) == NULL) {
     OVS_ERROR("initialize json reader failed");
-    goto thread_exit;
+    return (NULL);
   }
 
-  /* start polling data */
-  poll_fd.fd = pdb->sock;
-  poll_fd.events = POLLIN | POLLPRI;
-  poll_fd.revents = 0;
-
   /* poll data */
   while (ovs_db_poll_is_running(pdb)) {
-    poll_ret = poll(&poll_fd, 1, /* ms */ OVS_DB_POLL_TIMEOUT * 1000);
-    if (poll_ret > 0) {
-      if (poll_fd.revents & POLLNVAL) {
-        /* invalid file descriptor, reconnect */
-        if (ovs_db_reconnect(pdb) != 0) {
-          /* sleep awhile until next reconnect */
-          usleep(OVS_DB_RECONNECT_TIMEOUT * 1000000);
-        }
-        ovs_json_reader_reset(jreader);
-        poll_fd.fd = pdb->sock;
-      } else if ((poll_fd.revents & POLLERR) || (poll_fd.revents & POLLHUP)) {
-        /* connection is broken */
-        OVS_ERROR("poll() peer closed its end of the channel");
-        ovs_db_event_post(pdb, OVS_DB_EVENT_CONN_TERMINATED);
-        close(poll_fd.fd);
-      } else if ((poll_fd.revents & POLLIN) || (poll_fd.revents & POLLPRI)) {
-        /* read incoming data */
-        nbytes = recv(poll_fd.fd, buff, OVS_DB_POLL_READ_BLOCK_SIZE, 0);
-        if (nbytes > 0) {
-          OVS_DEBUG("recv(): received %d bytes of data", (int)nbytes);
-          ovs_json_reader_push_data(jreader, buff, nbytes);
-          while (!ovs_json_reader_pop(jreader, &json, &json_len))
-            /* process JSON data */
-            ovs_db_json_data_process(pdb, json, json_len);
-        } else if (nbytes == 0) {
-          OVS_ERROR("recv() peer has performed an orderly shutdown");
-          ovs_db_event_post(pdb, OVS_DB_EVENT_CONN_TERMINATED);
-          close(poll_fd.fd);
-        } else {
-          OVS_ERROR("recv() receive data error");
-          break;
-        }
-      } /* poll() POLLIN & POLLPRI */
-    } else if (poll_ret == 0)
-      OVS_DEBUG("poll() timeout");
-    else {
-      OVS_ERROR("poll() error");
+    char errbuff[OVS_ERROR_BUFF_SIZE];
+    poll_fd.fd = pdb->sock;
+    int poll_ret = poll(&poll_fd, 1, /* ms */ OVS_DB_POLL_TIMEOUT * 1000);
+    if (poll_ret < 0) {
+      sstrerror(errno, errbuff, sizeof(errbuff));
+      OVS_ERROR("poll(): %s", errbuff);
       break;
+    } else if (poll_ret == 0) {
+      OVS_DEBUG("poll(): timeout");
+      if (pdb->sock < 0)
+        /* invalid fd, so try to reconnect */
+        ovs_db_reconnect(pdb);
+      continue;
+    }
+    if (poll_fd.revents & POLLNVAL) {
+      /* invalid file descriptor, clean-up */
+      ovs_db_callback_remove_all(pdb);
+      ovs_json_reader_reset(jreader);
+      /* setting poll FD to -1 tells poll() call to ignore this FD.
+       * In that case poll() call will return timeout all the time */
+      pdb->sock = (-1);
+    } else if ((poll_fd.revents & POLLERR) || (poll_fd.revents & POLLHUP)) {
+      /* connection is broken */
+      close(poll_fd.fd);
+      ovs_db_event_post(pdb, OVS_DB_EVENT_CONN_TERMINATED);
+      OVS_ERROR("poll() peer closed its end of the channel");
+    } else if ((poll_fd.revents & POLLIN) || (poll_fd.revents & POLLPRI)) {
+      /* read incoming data */
+      char buff[OVS_DB_POLL_READ_BLOCK_SIZE];
+      ssize_t nbytes = recv(poll_fd.fd, buff, sizeof(buff), 0);
+      if (nbytes < 0) {
+        sstrerror(errno, errbuff, sizeof(errbuff));
+        OVS_ERROR("recv(): %s", errbuff);
+        /* read error? Try to reconnect */
+        close(poll_fd.fd);
+        continue;
+      } else if (nbytes == 0) {
+        close(poll_fd.fd);
+        ovs_db_event_post(pdb, OVS_DB_EVENT_CONN_TERMINATED);
+        OVS_ERROR("recv() peer has performed an orderly shutdown");
+        continue;
+      }
+      /* read incoming data */
+      size_t json_len = 0;
+      const char *json = NULL;
+      OVS_DEBUG("recv(): received %zd bytes of data", nbytes);
+      ovs_json_reader_push_data(jreader, buff, nbytes);
+      while (!ovs_json_reader_pop(jreader, &json, &json_len))
+        /* process JSON data */
+        ovs_db_json_data_process(pdb, json, json_len);
     }
   }
 
-thread_exit:
   OVS_DEBUG("poll thread has been completed");
   ovs_json_reader_free(jreader);
-  pthread_exit((void *)0);
-  return ((void *)0);
+  return (NULL);
 }
 
 /* EVENT worker thread.
  * Perform task based on incoming events. This
  * task can be done asynchronously which allows to
- * handle OVD DB callback like 'init_cb'.
+ * handle OVS DB callback like 'init_cb'.
  */
 static void *ovs_event_worker(void *arg) {
-  int ret = 0;
   ovs_db_t *pdb = (ovs_db_t *)arg;
-  struct timespec ts;
 
   while (pdb->event_thread.value != OVS_DB_EVENT_TERMINATE) {
     /* wait for an event */
+    struct timespec ts;
     clock_gettime(CLOCK_REALTIME, &ts);
     ts.tv_sec += (OVS_DB_EVENT_TIMEOUT);
-    ret = pthread_cond_timedwait(&pdb->event_thread.cond,
-                                 &pdb->event_thread.mutex, &ts);
+    int ret = pthread_cond_timedwait(&pdb->event_thread.cond,
+                                     &pdb->event_thread.mutex, &ts);
     if (!ret) {
       /* handle the event */
       OVS_DEBUG("handle event %d", pdb->event_thread.value);
@@ -892,28 +891,94 @@ static void *ovs_event_worker(void *arg) {
     }
   }
 
-thread_exit:
   OVS_DEBUG("event thread has been completed");
-  pthread_exit((void *)0);
-  return ((void *)0);
+  return (NULL);
+}
+
+/* Initialize EVENT thread */
+static int ovs_db_event_thread_init(ovs_db_t *pdb) {
+  pdb->event_thread.tid = -1;
+  /* init event thread condition variable */
+  if (pthread_cond_init(&pdb->event_thread.cond, NULL)) {
+    return (-1);
+  }
+  /* init event thread mutex */
+  if (pthread_mutex_init(&pdb->event_thread.mutex, NULL)) {
+    pthread_cond_destroy(&pdb->event_thread.cond);
+    return (-1);
+  }
+  /* Hold the event thread mutex. It ensures that no events
+   * will be lost while thread is still starting. Once event
+   * thread is started and ready to accept events, it will release
+   * the mutex */
+  if (pthread_mutex_lock(&pdb->event_thread.mutex)) {
+    pthread_mutex_destroy(&pdb->event_thread.mutex);
+    pthread_cond_destroy(&pdb->event_thread.cond);
+    return (-1);
+  }
+  /* start event thread */
+  pthread_t tid;
+  if (plugin_thread_create(&tid, NULL, ovs_event_worker, pdb) != 0) {
+    pthread_mutex_unlock(&pdb->event_thread.mutex);
+    pthread_mutex_destroy(&pdb->event_thread.mutex);
+    pthread_cond_destroy(&pdb->event_thread.cond);
+    return (-1);
+  }
+  pdb->event_thread.tid = tid;
+  return (0);
 }
 
-/* Stop EVENT thread */
-static int ovs_db_event_thread_stop(ovs_db_t *pdb) {
+/* Destroy EVENT thread */
+static int ovs_db_event_thread_destroy(ovs_db_t *pdb) {
+  if (pdb->event_thread.tid < 0)
+    /* already destroyed */
+    return (0);
   ovs_db_event_post(pdb, OVS_DB_EVENT_TERMINATE);
   if (pthread_join(pdb->event_thread.tid, NULL) != 0)
     return (-1);
+  /* Event thread always holds the thread mutex when
+   * performs some task (handles event) and releases it when
+   * while sleeping. Thus, if event thread exits, the mutex
+   * remains locked */
   pthread_mutex_unlock(&pdb->event_thread.mutex);
   pthread_mutex_destroy(&pdb->event_thread.mutex);
+  pthread_cond_destroy(&pdb->event_thread.cond);
+  pdb->event_thread.tid = -1;
+  return (0);
+}
+
+/* Initialize POLL thread */
+static int ovs_db_poll_thread_init(ovs_db_t *pdb) {
+  pdb->poll_thread.tid = -1;
+  /* init event thread mutex */
+  if (pthread_mutex_init(&pdb->poll_thread.mutex, NULL)) {
+    return (-1);
+  }
+  /* start poll thread */
+  pthread_t tid;
+  pdb->poll_thread.state = OVS_DB_POLL_STATE_RUNNING;
+  if (plugin_thread_create(&tid, NULL, ovs_poll_worker, pdb) != 0) {
+    pthread_mutex_destroy(&pdb->poll_thread.mutex);
+    return (-1);
+  }
+  pdb->poll_thread.tid = tid;
   return (0);
 }
 
-/* Stop POLL thread */
-static int ovs_db_poll_thread_stop(ovs_db_t *pdb) {
-  ovs_db_poll_terminate(pdb);
+/* Destroy POLL thread */
+static int ovs_db_poll_thread_destroy(ovs_db_t *pdb) {
+  if (pdb->poll_thread.tid < 0)
+    /* already destroyed */
+    return (0);
+  /* change thread state */
+  pthread_mutex_lock(&pdb->poll_thread.mutex);
+  pdb->poll_thread.state = OVS_DB_POLL_STATE_EXITING;
+  pthread_mutex_unlock(&pdb->poll_thread.mutex);
+  /* join the thread */
   if (pthread_join(pdb->poll_thread.tid, NULL) != 0)
     return (-1);
   pthread_mutex_destroy(&pdb->poll_thread.mutex);
+  pdb->poll_thread.tid = -1;
   return (0);
 }
 
@@ -922,72 +987,62 @@ static int ovs_db_poll_thread_stop(ovs_db_t *pdb) {
  */
 
 ovs_db_t *ovs_db_init(const char *node, const char *service,
-                      ovs_db_callback_t *cb) {
-  pthread_mutexattr_t mutex_attr;
-  ovs_db_t *pdb = NULL;
-
-  /* allocate db data & fill it */
-  if ((pdb = calloc(1, sizeof(*pdb))) == NULL)
+                      const char *unix_path, ovs_db_callback_t *cb) {
+  /* sanity check */
+  if (node == NULL || service == NULL || unix_path == NULL)
     return (NULL);
 
-  /* node cannot be unset */
-  if (node == NULL || strlen(node) == 0)
+  /* allocate db data & fill it */
+  ovs_db_t *pdb = pdb = calloc(1, sizeof(*pdb));
+  if (pdb == NULL)
     return (NULL);
 
   /* store the OVS DB address */
   sstrncpy(pdb->node, node, sizeof(pdb->node));
-  if (service != NULL)
-    sstrncpy(pdb->service, service, sizeof(pdb->service));
+  sstrncpy(pdb->service, service, sizeof(pdb->service));
+  sstrncpy(pdb->unix_path, unix_path, sizeof(pdb->unix_path));
 
   /* setup OVS DB callbacks */
   if (cb)
     pdb->cb = *cb;
 
-  /* prepare event thread */
-  pthread_cond_init(&pdb->event_thread.cond, NULL);
-  pthread_mutex_init(&pdb->event_thread.mutex, NULL);
-  pthread_mutex_lock(&pdb->event_thread.mutex);
-  if (plugin_thread_create(&pdb->event_thread.tid, NULL, ovs_event_worker,
-                           pdb) != 0) {
-    OVS_ERROR("event worker start failed");
-    goto failure;
+  /* init OVS DB mutex attributes */
+  pthread_mutexattr_t mutex_attr;
+  if (pthread_mutexattr_init(&mutex_attr)) {
+    OVS_ERROR("OVS DB mutex attribute init failed");
+    sfree(pdb);
+    return (NULL);
   }
-
-  /* prepare polling thread */
-  ovs_db_reconnect(pdb);
-  pdb->poll_thread.state = OVS_DB_POLL_STATE_RUNNING;
-  pthread_mutex_init(&pdb->poll_thread.mutex, NULL);
-  if (plugin_thread_create(&pdb->poll_thread.tid, NULL, ovs_poll_worker, pdb) !=
-      0) {
-    OVS_ERROR("pull worker start failed");
-    goto failure;
+  /* set OVS DB mutex as recursive */
+  if (pthread_mutexattr_settype(&mutex_attr, PTHREAD_MUTEX_RECURSIVE)) {
+    OVS_ERROR("Failed to set OVS DB mutex as recursive");
+    pthread_mutexattr_destroy(&mutex_attr);
+    sfree(pdb);
+    return (NULL);
   }
-
   /* init OVS DB mutex */
-  if (pthread_mutexattr_init(&mutex_attr) ||
-      pthread_mutexattr_settype(&mutex_attr, PTHREAD_MUTEX_RECURSIVE) ||
-      pthread_mutex_init(&pdb->mutex, &mutex_attr)) {
+  if (pthread_mutex_init(&pdb->mutex, &mutex_attr)) {
     OVS_ERROR("OVS DB mutex init failed");
-    goto failure;
+    pthread_mutexattr_destroy(&mutex_attr);
+    sfree(pdb);
+    return (NULL);
   }
+  /* destroy mutex attributes */
+  pthread_mutexattr_destroy(&mutex_attr);
 
-  /* return db to the caller */
-  return pdb;
+  /* init event thread */
+  if (ovs_db_event_thread_init(pdb) < 0) {
+    ovs_db_destroy(pdb);
+    return (NULL);
+  }
 
-failure:
-  if (pdb->sock)
-    /* close connection */
-    close(pdb->sock);
-  if (pdb->event_thread.tid != 0)
-    /* stop event thread */
-    if (ovs_db_event_thread_stop(pdb) < 0)
-      OVS_ERROR("stop event thread failed");
-  if (pdb->poll_thread.tid != 0)
-    /* stop poll thread */
-    if (ovs_db_poll_thread_stop(pdb) < 0)
-      OVS_ERROR("stop poll thread failed");
-  sfree(pdb);
-  return NULL;
+  /* init polling thread */
+  pdb->sock = -1;
+  if (ovs_db_poll_thread_init(pdb) < 0) {
+    ovs_db_destroy(pdb);
+    return (NULL);
+  }
+  return pdb;
 }
 
 int ovs_db_send_request(ovs_db_t *pdb, const char *method, const char *params,
@@ -1038,7 +1093,7 @@ int ovs_db_send_request(ovs_db_t *pdb, const char *method, const char *params,
 
   if (cb) {
     /* register result callback */
-    if ((new_cb = malloc(sizeof(ovs_callback_t))) == NULL)
+    if ((new_cb = calloc(1, sizeof(ovs_callback_t))) == NULL)
       goto yajl_gen_failure;
 
     /* add new callback to front */
@@ -1095,12 +1150,15 @@ int ovs_db_table_cb_register(ovs_db_t *pdb, const char *tb_name,
   if (pdb == NULL || tb_name == NULL || update_cb == NULL)
     return (-1);
 
-  if ((jgen = yajl_gen_alloc(NULL)) == NULL)
+  /* allocate new update callback */
+  if ((new_cb = calloc(1, sizeof(ovs_callback_t))) == NULL)
     return (-1);
 
-  /* register table update callback */
-  if ((new_cb = malloc(sizeof(ovs_callback_t))) == NULL)
+  /* init YAJL generator */
+  if ((jgen = yajl_gen_alloc(NULL)) == NULL) {
+    sfree(new_cb);
     return (-1);
+  }
 
   /* add new callback to front */
   new_cb->table.call = update_cb;
@@ -1187,18 +1245,18 @@ int ovs_db_destroy(ovs_db_t *pdb) {
 
   /* try to lock the structure before releasing */
   if ((ret = pthread_mutex_lock(&pdb->mutex))) {
-    OVS_ERROR("pthread_mutex_lock() DB mutext lock failed (%d)", ret);
+    OVS_ERROR("pthread_mutex_lock() DB mutex lock failed (%d)", ret);
     return (-1);
   }
 
   /* stop poll thread */
-  if (ovs_db_event_thread_stop(pdb) < 0) {
-    OVS_ERROR("stop poll thread failed");
+  if (ovs_db_event_thread_destroy(pdb) < 0) {
+    OVS_ERROR("destroy poll thread failed");
     ovs_db_ret = (-1);
   }
 
   /* stop event thread */
-  if (ovs_db_poll_thread_stop(pdb) < 0) {
+  if (ovs_db_poll_thread_destroy(pdb) < 0) {
     OVS_ERROR("stop event thread failed");
     ovs_db_ret = (-1);
   }
@@ -1207,7 +1265,7 @@ int ovs_db_destroy(ovs_db_t *pdb) {
   ovs_db_callback_remove_all(pdb);
 
   /* close connection */
-  if (pdb->sock)
+  if (pdb->sock >= 0)
     close(pdb->sock);
 
   /* release DB handler */
@@ -1221,7 +1279,14 @@ int ovs_db_destroy(ovs_db_t *pdb) {
  * Public OVS utils API implementation
  */
 
-/* Get YAJL value by key from YAJL dictionary */
+/* Get YAJL value by key from YAJL dictionary
+ *
+ * EXAMPLE:
+ *  {
+ *    "key_a" : <YAJL return value>
+ *    "key_b" : <YAJL return value>
+ *  }
+ */
 yajl_val ovs_utils_get_value_by_key(yajl_val jval, const char *key) {
   const char *obj_key = NULL;
 
@@ -1239,7 +1304,29 @@ yajl_val ovs_utils_get_value_by_key(yajl_val jval, const char *key) {
   return NULL;
 }
 
-/* Get OVS DB map value by given map key */
+/* Get OVS DB map value by given map key
+ *
+ * FROM RFC7047:
+ *
+ *   <pair>
+ *     A 2-element JSON array that represents a pair within a database
+ *     map.  The first element is an <atom> that represents the key, and
+ *     the second element is an <atom> that represents the value.
+ *
+ *   <map>
+ *     A 2-element JSON array that represents a database map value.  The
+ *     first element of the array must be the string "map", and the
+ *     second element must be an array of zero or more <pair>s giving the
+ *     values in the map.  All of the <pair>s must have the same key and
+ *     value types.
+ *
+ * EXAMPLE:
+ *  [
+ *    "map", [
+ *             [ "key_a", <YAJL value>], [ "key_b", <YAJL value>], ...
+ *           ]
+ *  ]
+ */
 yajl_val ovs_utils_get_map_value(yajl_val jval, const char *key) {
   size_t map_len = 0;
   size_t array_len = 0;
index e90bda3..52c2f91 100644 (file)
@@ -86,7 +86,7 @@ struct ovs_db_callback_s {
    */
   void (*post_conn_init)(ovs_db_t *pdb);
   /*
-   * This callback is called when OVD DB connection
+   * This callback is called when OVS DB connection
    * has been lost. This field can be NULL.
    */
   void (*post_conn_terminate)(void);
@@ -96,6 +96,7 @@ typedef struct ovs_db_callback_s ovs_db_callback_t;
 /* OVS DB defines */
 #define OVS_DB_ADDR_NODE_SIZE 256
 #define OVS_DB_ADDR_SERVICE_SIZE 128
+#define OVS_DB_ADDR_UNIX_SIZE 108
 
 /* OVS DB prototypes */
 
@@ -110,13 +111,14 @@ typedef struct ovs_db_callback_s ovs_db_callback_t;
  * PARAMETERS
  *   `node'        OVS DB Address.
  *   `service'     OVS DB service name.
+ *   `unix'        OVS DB unix socket path.
  *   `cb'          OVS DB callbacks.
  *
  * RETURN VALUE
  *   New ovs_db_t object upon success or NULL if an error occurred.
  */
 ovs_db_t *ovs_db_init(const char *node, const char *service,
-                      ovs_db_callback_t *cb);
+                      const char *unix_path, ovs_db_callback_t *cb);
 
 /*
  * NAME