*
* Copyright(c) 2016 Intel Corporation. All rights reserved.
*
- * Permission is hereby granted, free of charge, to any person obtaining a copy of
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ *of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
- * of the Software, and to permit persons to whom the Software is furnished to do
+ * of the Software, and to permit persons to whom the Software is furnished to
+ *do
* so, subject to the following conditions:
*
- * The above copyright notice and this permission notice shall be included in all
+ * The above copyright notice and this permission notice shall be included in
+ *all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
*
* Authors:
* Volodymyr Mytnyk <volodymyrx.mytnyk@intel.com>
- *
+ **/
+
+/* clang-format off */
+/*
* OVS DB API internal architecture diagram
* +------------------------------------------------------------------------------+
* |OVS plugin |OVS utils |
* +-------------------+----------------------------------------------+-----------+
* | TCP/UNIX socket |
* +-------------------------------------------------------------------------------
- *
- **/
+ */
+/* clang-format on */
/* collectd headers */
+#include "collectd.h"
+
#include "common.h"
/* private headers */
#include "utils_ovs.h"
/* system libraries */
-#include <semaphore.h>
+#if HAVE_NETDB_H
+#include <netdb.h>
+#endif
+#if HAVE_ARPA_INET_H
#include <arpa/inet.h>
+#endif
+#if HAVE_POLL_H
#include <poll.h>
+#endif
+#if HAVE_SYS_UN_H
#include <sys/un.h>
+#endif
-#define OVS_ERROR(fmt, ...) do { \
- ERROR("ovs_utils: "fmt, ## __VA_ARGS__); } while (0)
-#define OVS_DEBUG(fmt, ...) do { \
- DEBUG("%s:%d:%s(): "fmt, __FILE__, __LINE__, __FUNCTION__, \
- ## __VA_ARGS__); } while (0)
+#include <semaphore.h>
-#define OVS_DB_POLL_TIMEOUT 1 /* poll receive timeout (sec) */
-#define OVS_DB_POLL_READ_BLOCK_SIZE 5 /* read block size (bytes) */
-#define OVS_DB_DEFAULT_DB_NAME "Open_vSwitch"
-#define OVS_DB_RECONNECT_TIMEOUT 1 /* reconnect timeout (sec) */
+#define OVS_ERROR(fmt, ...) \
+ do { \
+ ERROR("ovs_utils: " fmt, ##__VA_ARGS__); \
+ } while (0)
+#define OVS_DEBUG(fmt, ...) \
+ do { \
+ DEBUG("%s:%d:%s(): " fmt, __FILE__, __LINE__, __FUNCTION__, \
+ ##__VA_ARGS__); \
+ } while (0)
+
+#define OVS_DB_POLL_TIMEOUT 1 /* poll receive timeout (sec) */
+#define OVS_DB_POLL_READ_BLOCK_SIZE 512 /* read block size (bytes) */
+#define OVS_DB_DEFAULT_DB_NAME "Open_vSwitch"
-#define OVS_DB_EVENT_TIMEOUT 5 /* event thread timeout (sec) */
-#define OVS_DB_EVENT_TERMINATE 1
-#define OVS_DB_EVENT_CONNECTED 2
+#define OVS_DB_EVENT_TIMEOUT 5 /* event thread timeout (sec) */
+#define OVS_DB_EVENT_TERMINATE 1
+#define OVS_DB_EVENT_CONN_ESTABLISHED 2
+#define OVS_DB_EVENT_CONN_TERMINATED 3
-#define OVS_DB_POLL_STATE_RUNNING 1
-#define OVS_DB_POLL_STATE_EXITING 2
+#define OVS_DB_POLL_STATE_RUNNING 1
+#define OVS_DB_POLL_STATE_EXITING 2
-#define OVS_DB_SEND_REQ_TIMEOUT 5 /* send request timeout (sec) */
+#define OVS_DB_SEND_REQ_TIMEOUT 5 /* send request timeout (sec) */
-#define OVS_YAJL_CALL(func, ...) \
- do { \
- yajl_gen_ret = yajl_gen_status_ok; \
- if ((yajl_gen_ret = func(__VA_ARGS__)) != yajl_gen_status_ok) \
- goto yajl_gen_failure; \
+#define OVS_YAJL_CALL(func, ...) \
+ do { \
+ yajl_gen_ret = yajl_gen_status_ok; \
+ if ((yajl_gen_ret = func(__VA_ARGS__)) != yajl_gen_status_ok) \
+ goto yajl_gen_failure; \
} while (0)
-#define OVS_YAJL_ERROR_BUFFER_SIZE 1024
-#define OVS_ERROR_BUFF_SIZE 512
-#define OVS_UID_STR_SIZE 17 /* 64-bit HEX string len + '\0' */
+#define OVS_YAJL_ERROR_BUFFER_SIZE 1024
+#define OVS_ERROR_BUFF_SIZE 512
+#define OVS_UID_STR_SIZE 17 /* 64-bit HEX string len + '\0' */
/* JSON reader internal data */
struct ovs_json_reader_s {
};
typedef struct ovs_callback_s ovs_callback_t;
-/* Connection declaration */
-struct ovs_conn_s {
- int sock;
- int domain;
- int type;
- int addr_size;
- union {
- struct sockaddr_in s_inet;
- struct sockaddr_un s_unix;
- } addr;
-};
-typedef struct ovs_conn_s ovs_conn_t;
-
/* Event thread data declaration */
struct ovs_event_thread_s {
pthread_t tid;
ovs_poll_thread_t poll_thread;
ovs_event_thread_t event_thread;
pthread_mutex_t mutex;
- ovs_callback_t *cb;
- ovs_conn_t conn;
- ovs_db_init_cb_t init_cb;
+ ovs_callback_t *remote_cb;
+ ovs_db_callback_t cb;
+ char service[OVS_DB_ADDR_SERVICE_SIZE];
+ char node[OVS_DB_ADDR_NODE_SIZE];
+ char unix_path[OVS_DB_ADDR_NODE_SIZE];
+ int sock;
};
-typedef struct ovs_db_s ovs_db_t;
+
+/* Global variables */
+static uint64_t ovs_uid = 0;
+static pthread_mutex_t ovs_uid_mutex = PTHREAD_MUTEX_INITIALIZER;
/* Post an event to event thread.
* Possible events are:
* OVS_DB_EVENT_TERMINATE
- * OVS_DB_EVENT_CONNECTED
+ * OVS_DB_EVENT_CONN_ESTABLISHED
+ * OVS_DB_EVENT_CONN_TERMINATED
*/
-static void
-ovs_db_event_post(ovs_db_t *pdb, int event)
-{
+static void ovs_db_event_post(ovs_db_t *pdb, int event) {
pthread_mutex_lock(&pdb->event_thread.mutex);
pdb->event_thread.value = event;
pthread_mutex_unlock(&pdb->event_thread.mutex);
/* Check if POLL thread is still running. Returns
* 1 if running otherwise 0 is returned */
-static inline int
-ovs_db_poll_is_running(ovs_db_t *pdb)
-{
+static _Bool ovs_db_poll_is_running(ovs_db_t *pdb) {
int state = 0;
pthread_mutex_lock(&pdb->poll_thread.mutex);
state = pdb->poll_thread.state;
pthread_mutex_unlock(&pdb->poll_thread.mutex);
- return (state == OVS_DB_POLL_STATE_RUNNING);
-}
-
-/* Terminate POLL thread */
-static inline void
-ovs_db_poll_terminate(ovs_db_t *pdb)
-{
- pthread_mutex_lock(&pdb->poll_thread.mutex);
- pdb->poll_thread.state = OVS_DB_POLL_STATE_EXITING;
- pthread_mutex_unlock(&pdb->poll_thread.mutex);
+ return state == OVS_DB_POLL_STATE_RUNNING;
}
/* Generate unique identifier (UID). It is used by OVS DB API
* to set "id" field for any OVS DB JSON request. */
-static uint64_t
-ovs_uid_generate()
-{
- struct timespec ts;
- clock_gettime(CLOCK_MONOTONIC, &ts);
- return ((ts.tv_sec << 32) | (ts.tv_nsec & UINT32_MAX));
+static uint64_t ovs_uid_generate() {
+ uint64_t new_uid;
+ pthread_mutex_lock(&ovs_uid_mutex);
+ new_uid = ++ovs_uid;
+ pthread_mutex_unlock(&ovs_uid_mutex);
+ return new_uid;
}
/*
*/
/* Add new callback into OVS DB object */
-static void
-ovs_db_callback_add(ovs_db_t *pdb, ovs_callback_t *new_cb)
-{
+static void ovs_db_callback_add(ovs_db_t *pdb, ovs_callback_t *new_cb) {
pthread_mutex_lock(&pdb->mutex);
- if (pdb->cb)
- pdb->cb->prev = new_cb;
- new_cb->next = pdb->cb;
+ if (pdb->remote_cb)
+ pdb->remote_cb->prev = new_cb;
+ new_cb->next = pdb->remote_cb;
new_cb->prev = NULL;
- pdb->cb = new_cb;
+ pdb->remote_cb = new_cb;
pthread_mutex_unlock(&pdb->mutex);
}
/* Remove callback from OVS DB object */
-static void
-ovs_db_callback_remove(ovs_db_t *pdb, ovs_callback_t *del_cb)
-{
+static void ovs_db_callback_remove(ovs_db_t *pdb, ovs_callback_t *del_cb) {
+ pthread_mutex_lock(&pdb->mutex);
ovs_callback_t *pre_cb = del_cb->prev;
ovs_callback_t *next_cb = del_cb->next;
- pthread_mutex_lock(&pdb->mutex);
if (next_cb)
next_cb->prev = del_cb->prev;
if (pre_cb)
pre_cb->next = del_cb->next;
else
- pdb->cb = del_cb->next;
+ pdb->remote_cb = del_cb->next;
free(del_cb);
pthread_mutex_unlock(&pdb->mutex);
}
/* Remove all callbacks form OVS DB object */
-static void
-ovs_db_callback_remove_all(ovs_db_t *pdb)
-{
+static void ovs_db_callback_remove_all(ovs_db_t *pdb) {
pthread_mutex_lock(&pdb->mutex);
- for (ovs_callback_t *del_cb = pdb->cb; pdb->cb; del_cb = pdb->cb) {
- pdb->cb = pdb->cb->next;
+ for (ovs_callback_t *del_cb = pdb->remote_cb; pdb->remote_cb;
+ del_cb = pdb->remote_cb) {
+ pdb->remote_cb = del_cb->next;
free(del_cb);
}
- pdb->cb = NULL;
+ pdb->remote_cb = NULL;
pthread_mutex_unlock(&pdb->mutex);
}
/* Get/find callback in OVS DB object by UID. Returns pointer
- * to requested callback otherwise NULL is returned */
-static ovs_callback_t *
-ovs_db_callback_get(ovs_db_t *pdb, uint64_t uid)
-{
- pthread_mutex_lock(&pdb->mutex);
- for (ovs_callback_t *cb = pdb->cb; cb != NULL; cb = cb->next)
- if (cb->uid == uid) {
- pthread_mutex_unlock(&pdb->mutex);
+ * to requested callback otherwise NULL is returned.
+ *
+ * IMPORTANT NOTE:
+ * The OVS DB mutex MUST be locked by the caller
+ * to make sure that returned callback is still valid.
+ */
+static ovs_callback_t *ovs_db_callback_get(ovs_db_t *pdb, uint64_t uid) {
+ for (ovs_callback_t *cb = pdb->remote_cb; cb != NULL; cb = cb->next)
+ if (cb->uid == uid)
return cb;
- }
- pthread_mutex_unlock(&pdb->mutex);
return NULL;
}
/* Send all requested data to the socket. Returns 0 if
* ALL request data has been sent otherwise negative value
* is returned */
-static int
-ovs_db_data_send(const ovs_db_t *pdb, const char *data, size_t len)
-{
+static int ovs_db_data_send(const ovs_db_t *pdb, const char *data, size_t len) {
ssize_t nbytes = 0;
size_t rem = len;
size_t off = 0;
while (rem > 0) {
- if ((nbytes = send(pdb->conn.sock, data + off, rem, 0)) <= 0)
- return (-1);
+ if ((nbytes = send(pdb->sock, data + off, rem, 0)) <= 0)
+ return -1;
rem -= (size_t)nbytes;
off += (size_t)nbytes;
}
- return (0);
-}
-
-/* Parse OVS server URL.
- * Format of the URL:
- * "tcp:a.b.c.d:port" - define TCP connection (INET domain)
- * "unix:file" - define UNIX socket file (UNIX domain)
- */
-static int
-ovs_db_url_parse(const char *surl, ovs_conn_t *conn)
-{
- ovs_conn_t tmp_conn;
- char *nexttok = NULL;
- char *in_str = NULL;
- char *saveptr;
- int ret = 0;
-
- /* sanity check */
- if ((surl == NULL) || (strlen(surl) < 1))
- return (-1);
-
- /* parse domain */
- tmp_conn = *conn;
- in_str = sstrdup(surl);
- if ((nexttok = strtok_r(in_str, ":", &saveptr)) != NULL) {
- if (strcmp("tcp", nexttok) == 0) {
- tmp_conn.domain = AF_INET;
- tmp_conn.type = SOCK_STREAM;
- tmp_conn.addr_size = sizeof(tmp_conn.addr.s_inet);
- } else if (strcmp("unix", nexttok) == 0) {
- tmp_conn.domain = AF_UNIX;
- tmp_conn.type = SOCK_STREAM;
- tmp_conn.addr_size = sizeof(tmp_conn.addr.s_unix);
- } else
- goto failure;
- } else
- goto failure;
-
- /* parse url depending on domain */
- if ((nexttok = strtok_r(NULL, ":", &saveptr)) != NULL) {
- if (tmp_conn.domain == AF_UNIX) {
- /* <UNIX-NAME> */
- tmp_conn.addr.s_inet.sin_family = AF_UNIX;
- sstrncpy(tmp_conn.addr.s_unix.sun_path, nexttok, strlen(nexttok) + 1);
- } else {
- /* <IP:PORT> */
- tmp_conn.addr.s_inet.sin_family = AF_INET;
- ret =
- inet_pton(AF_INET, nexttok, (void *)&tmp_conn.addr.s_inet.sin_addr);
- if (ret == 1) {
- if ((nexttok = strtok_r(NULL, ":", &saveptr)) != NULL)
- tmp_conn.addr.s_inet.sin_port = htons(atoi(nexttok));
- else
- goto failure;
- } else
- goto failure;
- }
- }
-
- /* save result and return success */
- *conn = tmp_conn;
- sfree(in_str);
- return (0);
-
-failure:
- OVS_ERROR("%s() : invalid OVS DB URL provided");
- sfree(in_str);
- return (-1);
+ return 0;
}
/*
* jgen - YAJL generator handle allocated by yajl_gen_alloc()
* string - Null-terminated string
*/
-static inline yajl_gen_status
-ovs_yajl_gen_tstring(yajl_gen hander, const char *string)
-{
- return yajl_gen_string(hander, string, strlen(string));
+static yajl_gen_status ovs_yajl_gen_tstring(yajl_gen hander,
+ const char *string) {
+ return yajl_gen_string(hander, (const unsigned char *)string, strlen(string));
}
/* Add YAJL value into YAJL generator handle (JSON object)
* jgen - YAJL generator handle allocated by yajl_gen_alloc()
* jval - YAJL value usually returned by yajl_tree_get()
*/
-static yajl_gen_status
-ovs_yajl_gen_val(yajl_gen jgen, yajl_val jval)
-{
+static yajl_gen_status ovs_yajl_gen_val(yajl_gen jgen, yajl_val jval) {
size_t array_len = 0;
yajl_val *jvalues = NULL;
yajl_val jobj_value = NULL;
const char *obj_key = NULL;
size_t obj_len = 0;
- yajl_gen_status yajl_gen_ret;
+ yajl_gen_status yajl_gen_ret = yajl_gen_status_ok;
+
+ if (jval == NULL)
+ return yajl_gen_generation_complete;
if (YAJL_IS_STRING(jval))
OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, YAJL_GET_STRING(jval));
array_len = YAJL_GET_ARRAY(jval)->len;
jvalues = YAJL_GET_ARRAY(jval)->values;
OVS_YAJL_CALL(yajl_gen_array_open, jgen);
- for (int i = 0; i < array_len; i++)
+ for (size_t i = 0; i < array_len; i++)
OVS_YAJL_CALL(ovs_yajl_gen_val, jgen, jvalues[i]);
OVS_YAJL_CALL(yajl_gen_array_close, jgen);
} else if (YAJL_IS_OBJECT(jval)) {
/* create new object and add all elements into the object */
OVS_YAJL_CALL(yajl_gen_map_open, jgen);
obj_len = YAJL_GET_OBJECT(jval)->len;
- for (int i = 0; i < obj_len; i++) {
+ for (size_t i = 0; i < obj_len; i++) {
obj_key = YAJL_GET_OBJECT(jval)->keys[i];
jobj_value = YAJL_GET_OBJECT(jval)->values[i];
OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, obj_key);
* "echo" request to the client, client should generate
* "echo" replay with the same content received in the
* request */
-static int
-ovs_db_table_echo_cb(const ovs_db_t *pdb, yajl_val jnode)
-{
+static int ovs_db_table_echo_cb(const ovs_db_t *pdb, yajl_val jnode) {
yajl_val jparams;
yajl_val jid;
yajl_gen jgen;
yajl_gen_status yajl_gen_ret;
if ((jgen = yajl_gen_alloc(NULL)) == NULL)
- return (-1);
+ return -1;
/* check & get request attributes */
if ((jparams = yajl_tree_get(jnode, params_path, yajl_t_array)) == NULL ||
}
/* clean up and return success */
yajl_gen_clear(jgen);
- return (0);
+ return 0;
yajl_gen_failure:
/* release memory */
yajl_gen_clear(jgen);
- return (-1);
+ return -1;
}
/* Get OVS DB registered callback by YAJL val. The YAJL
* value should be YAJL string (UID). Returns NULL if
- * callback hasn't been found.
+ * callback hasn't been found. See also ovs_db_callback_get()
+ * description for addition info.
*/
-static ovs_callback_t *
-ovs_db_table_callback_get(ovs_db_t *pdb, yajl_val jid)
-{
+static ovs_callback_t *ovs_db_table_callback_get(ovs_db_t *pdb, yajl_val jid) {
char *endptr = NULL;
const char *suid = NULL;
uint64_t uid;
if (jid && YAJL_IS_STRING(jid)) {
suid = YAJL_GET_STRING(jid);
- uid = (uint64_t) strtoul(suid, &endptr, 16);
+ uid = (uint64_t)strtoul(suid, &endptr, 16);
if (*endptr == '\0' && uid)
return ovs_db_callback_get(pdb, uid);
}
* table update callback is received from the DB
* server. Once registered callback found, it's called
* by this handler. */
-static int
-ovs_db_table_update_cb(ovs_db_t *pdb, yajl_val jnode)
-{
+static int ovs_db_table_update_cb(ovs_db_t *pdb, yajl_val jnode) {
ovs_callback_t *cb = NULL;
yajl_val jvalue;
yajl_val jparams;
yajl_val jtable_updates;
- yajl_val jtable_update;
- size_t obj_len = 0;
- const char *table_name = NULL;
const char *params_path[] = {"params", NULL};
const char *id_path[] = {"id", NULL};
/* check & get request attributes */
if ((jparams = yajl_tree_get(jnode, params_path, yajl_t_array)) == NULL ||
- (yajl_tree_get(jnode, id_path, yajl_t_null) == NULL))
- goto ovs_failure;
+ (yajl_tree_get(jnode, id_path, yajl_t_null) == NULL)) {
+ OVS_ERROR("invalid OVS DB request received");
+ return -1;
+ }
/* check array length: [<json-value>, <table-updates>] */
- if (YAJL_GET_ARRAY(jparams)->len != 2)
- goto ovs_failure;
+ if ((YAJL_GET_ARRAY(jparams) == NULL) ||
+ (YAJL_GET_ARRAY(jparams)->len != 2)) {
+ OVS_ERROR("invalid OVS DB request received");
+ return -1;
+ }
jvalue = YAJL_GET_ARRAY(jparams)->values[0];
jtable_updates = YAJL_GET_ARRAY(jparams)->values[1];
- if ((!YAJL_IS_OBJECT(jtable_updates)) || (!YAJL_IS_STRING(jvalue)))
- goto ovs_failure;
+ if ((!YAJL_IS_OBJECT(jtable_updates)) || (!YAJL_IS_STRING(jvalue))) {
+ OVS_ERROR("invalid OVS DB request id or table update received");
+ return -1;
+ }
/* find registered callback based on <json-value> */
+ pthread_mutex_lock(&pdb->mutex);
cb = ovs_db_table_callback_get(pdb, jvalue);
- if (cb == NULL || cb->table.call == NULL)
- goto ovs_failure;
+ if (cb == NULL || cb->table.call == NULL) {
+ OVS_ERROR("No OVS DB table update callback found");
+ pthread_mutex_unlock(&pdb->mutex);
+ return -1;
+ }
/* call registered callback */
cb->table.call(jtable_updates);
+ pthread_mutex_unlock(&pdb->mutex);
return 0;
-
-ovs_failure:
- OVS_ERROR("invalid OVS DB table update event");
- return (-1);
}
/* OVS DB result request handler.
* result reply is received from the DB server.
* Once registered callback found, it's called
* by this handler. */
-static int
-ovs_db_result_cb(ovs_db_t *pdb, yajl_val jnode)
-{
+static int ovs_db_result_cb(ovs_db_t *pdb, yajl_val jnode) {
ovs_callback_t *cb = NULL;
yajl_val jresult;
yajl_val jerror;
/* check & get result attributes */
if (!jresult || !jerror || !jid)
- return (-1);
+ return -1;
/* try to find registered callback */
+ pthread_mutex_lock(&pdb->mutex);
cb = ovs_db_table_callback_get(pdb, jid);
if (cb != NULL && cb->result.call != NULL) {
/* call registered callback */
sem_post(&cb->result.sync);
}
- return (0);
+ pthread_mutex_unlock(&pdb->mutex);
+ return 0;
}
/* Handle JSON data (one request) and call
* update callback 'ovs_db_table_update_cb' and
* result callback 'ovs_db_result_cb' is supported.
*/
-static int
-ovs_db_json_data_process(ovs_db_t *pdb, const char *data, size_t len)
-{
+static int ovs_db_json_data_process(ovs_db_t *pdb, const char *data,
+ size_t len) {
const char *method = NULL;
char yajl_errbuf[OVS_YAJL_ERROR_BUFFER_SIZE];
const char *method_path[] = {"method", NULL};
/* duplicate the data to make null-terminated string
* required for yajl_tree_parse() */
- if ((sjson = strndup(data, len)) == NULL)
- return (-1);
+ if ((sjson = calloc(1, len + 1)) == NULL)
+ return -1;
- OVS_DEBUG("%s", sjson);
+ sstrncpy(sjson, data, len + 1);
+ OVS_DEBUG("[len=%zu] %s", len, sjson);
/* parse json data */
jnode = yajl_tree_parse(sjson, yajl_errbuf, sizeof(yajl_errbuf));
if (jnode == NULL) {
OVS_ERROR("yajl_tree_parse() %s", yajl_errbuf);
- return (-1);
+ sfree(sjson);
+ return -1;
}
/* get method name */
- if (jval = yajl_tree_get(jnode, method_path, yajl_t_string)) {
- method = YAJL_GET_STRING(jval);
+ if ((jval = yajl_tree_get(jnode, method_path, yajl_t_string)) != NULL) {
+ if ((method = YAJL_GET_STRING(jval)) == NULL) {
+ yajl_tree_free(jnode);
+ sfree(sjson);
+ return -1;
+ }
if (strcmp("echo", method) == 0) {
/* echo request from the server */
if (ovs_db_table_echo_cb(pdb, jnode) < 0)
if (ovs_db_table_update_cb(pdb, jnode) < 0)
OVS_ERROR("handle update notification failed");
}
- } else if (jval = yajl_tree_get(jnode, result_path, yajl_t_object)) {
+ } else if ((jval = yajl_tree_get(jnode, result_path, yajl_t_any)) != NULL) {
/* result notification */
if (ovs_db_result_cb(pdb, jnode) < 0)
OVS_ERROR("handle result reply failed");
- }
+ } else
+ OVS_ERROR("connot find method or result failed");
/* release memory */
yajl_tree_free(jnode);
sfree(sjson);
- return (0);
+ return 0;
}
/*
*/
/* Allocate JSON reader instance */
-static inline ovs_json_reader_t *
-ovs_json_reader_alloc()
-{
+static ovs_json_reader_t *ovs_json_reader_alloc() {
ovs_json_reader_t *jreader = NULL;
if ((jreader = calloc(sizeof(ovs_json_reader_t), 1)) == NULL)
}
/* Push raw data into into the JSON reader for processing */
-static inline int
-ovs_json_reader_push_data(ovs_json_reader_t *jreader,
- const char *data, size_t data_len)
-{
+static int ovs_json_reader_push_data(ovs_json_reader_t *jreader,
+ const char *data, size_t data_len) {
char *new_buff = NULL;
size_t available = jreader->buff_size - jreader->buff_offset;
/* allocate new chunk of memory */
new_buff = realloc(jreader->buff_ptr, (jreader->buff_size + data_len));
if (new_buff == NULL)
- return (-1);
+ return -1;
/* point to new allocated memory */
jreader->buff_ptr = new_buff;
/* store input data */
memcpy(jreader->buff_ptr + jreader->buff_offset, data, data_len);
jreader->buff_offset += data_len;
- return (0);
+ return 0;
}
/* Pop one fully-fledged JSON if already exists. Returns 0 if
* completed JSON already exists otherwise negative value is
* returned */
-static inline int
-ovs_json_reader_pop(ovs_json_reader_t *jreader,
- const char **json_ptr, size_t *json_len_ptr)
-{
+static int ovs_json_reader_pop(ovs_json_reader_t *jreader,
+ const char **json_ptr, size_t *json_len_ptr) {
size_t nbraces = 0;
size_t json_len = 0;
char *json = NULL;
/* search open/close brace */
- for (int i = jreader->json_offset; i < jreader->buff_offset; i++) {
+ for (size_t i = jreader->json_offset; i < jreader->buff_offset; i++) {
if (jreader->buff_ptr[i] == '{') {
nbraces++;
} else if (jreader->buff_ptr[i] == '}')
*json_ptr = jreader->buff_ptr + jreader->json_offset;
*json_len_ptr = json_len + 1;
jreader->json_offset = i + 1;
- return (0);
+ return 0;
}
/* increase JSON data length */
* and zero rest of the buffer data */
json = &jreader->buff_ptr[jreader->json_offset];
json_len = jreader->buff_offset - jreader->json_offset;
- for (int i = 0; i < jreader->buff_size; i++)
+ for (size_t i = 0; i < jreader->buff_size; i++)
jreader->buff_ptr[i] = ((i < json_len) ? (json[i]) : (0));
jreader->buff_offset = json_len;
} else
jreader->json_offset = 0;
}
- return (-1);
+ return -1;
}
/* Reset JSON reader. It is useful when start processing
* new raw data. E.g.: in case of lost stream connection.
*/
-static inline void
-ovs_json_reader_reset(ovs_json_reader_t *jreader)
-{
+static void ovs_json_reader_reset(ovs_json_reader_t *jreader) {
if (jreader) {
jreader->buff_offset = 0;
jreader->json_offset = 0;
}
/* Release internal data allocated for JSON reader */
-static inline void
-ovs_json_reader_free(ovs_json_reader_t *jreader)
-{
+static void ovs_json_reader_free(ovs_json_reader_t *jreader) {
if (jreader) {
free(jreader->buff_ptr);
free(jreader);
}
}
-/* Reconnect to OVD DB and call init OVS DB callback
- * 'init_cb' if connection has been established.
+/* Reconnect to OVS DB and call the OVS DB post connection init callback
+ * if connection has been established.
*/
-static int
-ovs_db_reconnect(ovs_db_t *pdb)
-{
- char errbuff[OVS_ERROR_BUFF_SIZE];
-
- /* remove all registered OVS DB table/result callbacks */
- ovs_db_callback_remove_all(pdb);
-
- /* open new socket */
- if ((pdb->conn.sock = socket(pdb->conn.domain, pdb->conn.type, 0)) < 0) {
- sstrerror(errno, errbuff, sizeof(errbuff));
- OVS_ERROR("socket(): %s", errbuff);
- return (-1);
+static void ovs_db_reconnect(ovs_db_t *pdb) {
+ const char *node_info = pdb->node;
+ struct addrinfo *result;
+
+ if (pdb->unix_path[0] != '\0') {
+ /* use UNIX socket instead of INET address */
+ node_info = pdb->unix_path;
+ result = calloc(1, sizeof(struct addrinfo));
+ struct sockaddr_un *sa_unix = calloc(1, sizeof(struct sockaddr_un));
+ if (result == NULL || sa_unix == NULL) {
+ sfree(result);
+ sfree(sa_unix);
+ return;
+ }
+ result->ai_family = AF_UNIX;
+ result->ai_socktype = SOCK_STREAM;
+ result->ai_addrlen = sizeof(*sa_unix);
+ result->ai_addr = (struct sockaddr *)sa_unix;
+ sa_unix->sun_family = result->ai_family;
+ sstrncpy(sa_unix->sun_path, pdb->unix_path, sizeof(sa_unix->sun_path));
+ } else {
+ /* inet socket address */
+ struct addrinfo hints;
+
+ /* setup criteria for selecting the socket address */
+ memset(&hints, 0, sizeof(hints));
+ hints.ai_family = AF_UNSPEC;
+ hints.ai_socktype = SOCK_STREAM;
+
+ /* get socket addresses */
+ int ret = getaddrinfo(pdb->node, pdb->service, &hints, &result);
+ if (ret != 0) {
+ OVS_ERROR("getaddrinfo(): %s", gai_strerror(ret));
+ return;
+ }
}
-
- /* try to connect to server */
- if (connect(pdb->conn.sock, (struct sockaddr *)&pdb->conn.addr,
- pdb->conn.addr_size) < 0) {
- sstrerror(errno, errbuff, sizeof(errbuff));
- OVS_ERROR("connect(): %s", errbuff);
- close(pdb->conn.sock);
- return (-1);
+ /* try to connect to the server */
+ for (struct addrinfo *rp = result; rp != NULL; rp = rp->ai_next) {
+ char errbuff[OVS_ERROR_BUFF_SIZE];
+ int sock = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
+ if (sock < 0) {
+ sstrerror(errno, errbuff, sizeof(errbuff));
+ OVS_DEBUG("socket(): %s", errbuff);
+ continue;
+ }
+ if (connect(sock, rp->ai_addr, rp->ai_addrlen) < 0) {
+ close(sock);
+ sstrerror(errno, errbuff, sizeof(errbuff));
+ OVS_DEBUG("connect(): %s [family=%d]", errbuff, rp->ai_family);
+ } else {
+ /* send notification to event thread */
+ ovs_db_event_post(pdb, OVS_DB_EVENT_CONN_ESTABLISHED);
+ pdb->sock = sock;
+ break;
+ }
}
- /* send notification to event thread */
- ovs_db_event_post(pdb, OVS_DB_EVENT_CONNECTED);
- return (0);
+ if (pdb->sock < 0)
+ OVS_ERROR("connect to \"%s\" failed", node_info);
+
+ freeaddrinfo(result);
}
/* POLL worker thread.
* requests/reply/events etc. Also, it reconnects to OVS DB
* if connection has been lost.
*/
-static void *
-ovs_poll_worker(void *arg)
-{
- ovs_db_t *pdb = (ovs_db_t *)arg; /* pointer to OVS DB */
+static void *ovs_poll_worker(void *arg) {
+ ovs_db_t *pdb = (ovs_db_t *)arg; /* pointer to OVS DB */
ovs_json_reader_t *jreader = NULL;
- const char *json;
- size_t json_len;
- ssize_t nbytes = 0;
- char buff[OVS_DB_POLL_READ_BLOCK_SIZE];
- struct pollfd poll_fd;
- int poll_ret = 0;
+ struct pollfd poll_fd = {
+ .fd = pdb->sock, .events = POLLIN | POLLPRI, .revents = 0,
+ };
+ /* create JSON reader instance */
if ((jreader = ovs_json_reader_alloc()) == NULL) {
OVS_ERROR("initialize json reader failed");
- goto thread_exit;
+ return NULL;
}
- /* start polling data */
- poll_fd.fd = pdb->conn.sock;
- poll_fd.events = POLLIN | POLLPRI;
- poll_fd.revents = 0;
-
/* poll data */
while (ovs_db_poll_is_running(pdb)) {
- poll_ret = poll(&poll_fd, 1, /* ms */ OVS_DB_POLL_TIMEOUT * 1000);
- if (poll_ret > 0) {
- if (poll_fd.revents & POLLNVAL) {
- /* invalid file descriptor, reconnect */
- if (ovs_db_reconnect(pdb) != 0) {
- /* sleep awhile until next reconnect */
- usleep(OVS_DB_RECONNECT_TIMEOUT * 1000000);
- }
- ovs_json_reader_reset(jreader);
- poll_fd.fd = pdb->conn.sock;
- } else if ((poll_fd.revents & POLLERR) || (poll_fd.revents & POLLHUP)) {
- /* connection is broken */
- OVS_ERROR("poll() peer closed its end of the channel");
- close(poll_fd.fd);
- } else if ((poll_fd.revents & POLLIN) || (poll_fd.revents & POLLPRI)) {
- /* read incoming data */
- nbytes = recv(poll_fd.fd, buff, OVS_DB_POLL_READ_BLOCK_SIZE, 0);
- if (nbytes > 0) {
- OVS_DEBUG("recv(): received %d bytes of data", (int)nbytes);
- ovs_json_reader_push_data(jreader, buff, nbytes);
- while (!ovs_json_reader_pop(jreader, &json, &json_len))
- /* process JSON data */
- ovs_db_json_data_process(pdb, json, json_len);
- } else if (nbytes == 0) {
- OVS_ERROR("recv() peer has performed an orderly shutdown");
- close(poll_fd.fd);
- } else {
- OVS_ERROR("recv() receive data error");
- break;
- }
- } /* poll() POLLIN & POLLPRI */
- } else if (poll_ret == 0)
- OVS_DEBUG("poll() timeout");
- else {
- OVS_ERROR("poll() error");
+ char errbuff[OVS_ERROR_BUFF_SIZE];
+ poll_fd.fd = pdb->sock;
+ int poll_ret = poll(&poll_fd, 1, /* ms */ OVS_DB_POLL_TIMEOUT * 1000);
+ if (poll_ret < 0) {
+ sstrerror(errno, errbuff, sizeof(errbuff));
+ OVS_ERROR("poll(): %s", errbuff);
break;
+ } else if (poll_ret == 0) {
+ OVS_DEBUG("poll(): timeout");
+ if (pdb->sock < 0)
+ /* invalid fd, so try to reconnect */
+ ovs_db_reconnect(pdb);
+ continue;
+ }
+ if (poll_fd.revents & POLLNVAL) {
+ /* invalid file descriptor, clean-up */
+ ovs_db_callback_remove_all(pdb);
+ ovs_json_reader_reset(jreader);
+ /* setting poll FD to -1 tells poll() call to ignore this FD.
+ * In that case poll() call will return timeout all the time */
+ pdb->sock = (-1);
+ } else if ((poll_fd.revents & POLLERR) || (poll_fd.revents & POLLHUP)) {
+ /* connection is broken */
+ close(poll_fd.fd);
+ ovs_db_event_post(pdb, OVS_DB_EVENT_CONN_TERMINATED);
+ OVS_ERROR("poll() peer closed its end of the channel");
+ } else if ((poll_fd.revents & POLLIN) || (poll_fd.revents & POLLPRI)) {
+ /* read incoming data */
+ char buff[OVS_DB_POLL_READ_BLOCK_SIZE];
+ ssize_t nbytes = recv(poll_fd.fd, buff, sizeof(buff), 0);
+ if (nbytes < 0) {
+ sstrerror(errno, errbuff, sizeof(errbuff));
+ OVS_ERROR("recv(): %s", errbuff);
+ /* read error? Try to reconnect */
+ close(poll_fd.fd);
+ continue;
+ } else if (nbytes == 0) {
+ close(poll_fd.fd);
+ ovs_db_event_post(pdb, OVS_DB_EVENT_CONN_TERMINATED);
+ OVS_ERROR("recv() peer has performed an orderly shutdown");
+ continue;
+ }
+ /* read incoming data */
+ size_t json_len = 0;
+ const char *json = NULL;
+ OVS_DEBUG("recv(): received %zd bytes of data", nbytes);
+ ovs_json_reader_push_data(jreader, buff, nbytes);
+ while (!ovs_json_reader_pop(jreader, &json, &json_len))
+ /* process JSON data */
+ ovs_db_json_data_process(pdb, json, json_len);
}
}
-thread_exit:
OVS_DEBUG("poll thread has been completed");
ovs_json_reader_free(jreader);
- pthread_exit((void *)0);
- return ((void *)0);
+ return NULL;
}
/* EVENT worker thread.
* Perform task based on incoming events. This
* task can be done asynchronously which allows to
- * handle OVD DB callback like 'init_cb'.
+ * handle OVS DB callback like 'init_cb'.
*/
-static void *
-ovs_event_worker(void *arg)
-{
- int ret = 0;
+static void *ovs_event_worker(void *arg) {
ovs_db_t *pdb = (ovs_db_t *)arg;
- struct timespec ts;
while (pdb->event_thread.value != OVS_DB_EVENT_TERMINATE) {
/* wait for an event */
+ struct timespec ts;
clock_gettime(CLOCK_REALTIME, &ts);
ts.tv_sec += (OVS_DB_EVENT_TIMEOUT);
- ret = pthread_cond_timedwait(&pdb->event_thread.cond,
- &pdb->event_thread.mutex, &ts);
+ int ret = pthread_cond_timedwait(&pdb->event_thread.cond,
+ &pdb->event_thread.mutex, &ts);
if (!ret) {
/* handle the event */
OVS_DEBUG("handle event %d", pdb->event_thread.value);
- if (pdb->event_thread.value == OVS_DB_EVENT_CONNECTED)
- if (pdb->init_cb)
- pdb->init_cb(pdb);
+ switch (pdb->event_thread.value) {
+ case OVS_DB_EVENT_CONN_ESTABLISHED:
+ if (pdb->cb.post_conn_init)
+ pdb->cb.post_conn_init(pdb);
+ break;
+ case OVS_DB_EVENT_CONN_TERMINATED:
+ if (pdb->cb.post_conn_terminate)
+ pdb->cb.post_conn_terminate();
+ break;
+ default:
+ OVS_DEBUG("unknown event received");
+ break;
+ }
} else if (ret == ETIMEDOUT) {
/* wait timeout */
OVS_DEBUG("no event received (timeout)");
}
}
-thread_exit:
OVS_DEBUG("event thread has been completed");
- pthread_exit((void *)0);
- return ((void *)0);
+ return NULL;
+}
+
+/* Initialize EVENT thread */
+static int ovs_db_event_thread_init(ovs_db_t *pdb) {
+ pdb->event_thread.tid = (pthread_t)-1;
+ /* init event thread condition variable */
+ if (pthread_cond_init(&pdb->event_thread.cond, NULL)) {
+ return -1;
+ }
+ /* init event thread mutex */
+ if (pthread_mutex_init(&pdb->event_thread.mutex, NULL)) {
+ pthread_cond_destroy(&pdb->event_thread.cond);
+ return -1;
+ }
+ /* Hold the event thread mutex. It ensures that no events
+ * will be lost while thread is still starting. Once event
+ * thread is started and ready to accept events, it will release
+ * the mutex */
+ if (pthread_mutex_lock(&pdb->event_thread.mutex)) {
+ pthread_mutex_destroy(&pdb->event_thread.mutex);
+ pthread_cond_destroy(&pdb->event_thread.cond);
+ return -1;
+ }
+ /* start event thread */
+ pthread_t tid;
+ if (plugin_thread_create(&tid, NULL, ovs_event_worker, pdb,
+ "utils_ovs:event") != 0) {
+ pthread_mutex_unlock(&pdb->event_thread.mutex);
+ pthread_mutex_destroy(&pdb->event_thread.mutex);
+ pthread_cond_destroy(&pdb->event_thread.cond);
+ return -1;
+ }
+ pdb->event_thread.tid = tid;
+ return 0;
}
-/* Stop EVENT thread */
-static int
-ovs_db_event_thread_stop(ovs_db_t *pdb)
-{
+/* Destroy EVENT thread */
+static int ovs_db_event_thread_destroy(ovs_db_t *pdb) {
+ if (pdb->event_thread.tid == (pthread_t)-1)
+ /* already destroyed */
+ return 0;
ovs_db_event_post(pdb, OVS_DB_EVENT_TERMINATE);
if (pthread_join(pdb->event_thread.tid, NULL) != 0)
- return (-1);
+ return -1;
+ /* Event thread always holds the thread mutex when
+ * performs some task (handles event) and releases it when
+ * while sleeping. Thus, if event thread exits, the mutex
+ * remains locked */
pthread_mutex_unlock(&pdb->event_thread.mutex);
pthread_mutex_destroy(&pdb->event_thread.mutex);
- return (0);
+ pthread_cond_destroy(&pdb->event_thread.cond);
+ pdb->event_thread.tid = (pthread_t)-1;
+ return 0;
+}
+
+/* Initialize POLL thread */
+static int ovs_db_poll_thread_init(ovs_db_t *pdb) {
+ pdb->poll_thread.tid = (pthread_t)-1;
+ /* init event thread mutex */
+ if (pthread_mutex_init(&pdb->poll_thread.mutex, NULL)) {
+ return -1;
+ }
+ /* start poll thread */
+ pthread_t tid;
+ pdb->poll_thread.state = OVS_DB_POLL_STATE_RUNNING;
+ if (plugin_thread_create(&tid, NULL, ovs_poll_worker, pdb,
+ "utils_ovs:poll") != 0) {
+ pthread_mutex_destroy(&pdb->poll_thread.mutex);
+ return -1;
+ }
+ pdb->poll_thread.tid = tid;
+ return 0;
}
-/* Stop POLL thread */
-static int
-ovs_db_poll_thread_stop(ovs_db_t *pdb)
-{
- ovs_db_poll_terminate(pdb);
+/* Destroy POLL thread */
+static int ovs_db_poll_thread_destroy(ovs_db_t *pdb) {
+ if (pdb->poll_thread.tid == (pthread_t)-1)
+ /* already destroyed */
+ return 0;
+ /* change thread state */
+ pthread_mutex_lock(&pdb->poll_thread.mutex);
+ pdb->poll_thread.state = OVS_DB_POLL_STATE_EXITING;
+ pthread_mutex_unlock(&pdb->poll_thread.mutex);
+ /* join the thread */
if (pthread_join(pdb->poll_thread.tid, NULL) != 0)
- return (-1);
+ return -1;
pthread_mutex_destroy(&pdb->poll_thread.mutex);
- return (0);
+ pdb->poll_thread.tid = (pthread_t)-1;
+ return 0;
}
/*
* Public OVS DB API implementation
*/
-ovs_db_t *
-ovs_db_init(const char *surl, ovs_db_callback_t *cb)
-{
- pthread_mutexattr_t mutex_attr;
- ovs_db_t *pdb = NULL;
+ovs_db_t *ovs_db_init(const char *node, const char *service,
+ const char *unix_path, ovs_db_callback_t *cb) {
+ /* sanity check */
+ if (node == NULL || service == NULL || unix_path == NULL)
+ return NULL;
/* allocate db data & fill it */
- if ((pdb = calloc(1, sizeof(*pdb))) == NULL)
- return (NULL);
+ ovs_db_t *pdb = pdb = calloc(1, sizeof(*pdb));
+ if (pdb == NULL)
+ return NULL;
- /* convert string url to socket addr */
- if (ovs_db_url_parse(surl, &pdb->conn) < 0)
- goto failure;
+ /* store the OVS DB address */
+ sstrncpy(pdb->node, node, sizeof(pdb->node));
+ sstrncpy(pdb->service, service, sizeof(pdb->service));
+ sstrncpy(pdb->unix_path, unix_path, sizeof(pdb->unix_path));
/* setup OVS DB callbacks */
if (cb)
- pdb->init_cb = cb->init_cb;
+ pdb->cb = *cb;
- /* prepare event thread */
- pthread_cond_init(&pdb->event_thread.cond, NULL);
- pthread_mutex_init(&pdb->event_thread.mutex, NULL);
- pthread_mutex_lock(&pdb->event_thread.mutex);
- if (plugin_thread_create(&pdb->event_thread.tid, NULL,
- ovs_event_worker, pdb) != 0) {
- OVS_ERROR("event worker start failed");
- goto failure;
+ /* init OVS DB mutex attributes */
+ pthread_mutexattr_t mutex_attr;
+ if (pthread_mutexattr_init(&mutex_attr)) {
+ OVS_ERROR("OVS DB mutex attribute init failed");
+ sfree(pdb);
+ return NULL;
}
-
- /* prepare polling thread */
- ovs_db_reconnect(pdb);
- pdb->poll_thread.state = OVS_DB_POLL_STATE_RUNNING;
- pthread_mutex_init(&pdb->poll_thread.mutex, NULL);
- if (plugin_thread_create(&pdb->poll_thread.tid, NULL,
- ovs_poll_worker, pdb) != 0) {
- OVS_ERROR("pull worker start failed");
- goto failure;
+ /* set OVS DB mutex as recursive */
+ if (pthread_mutexattr_settype(&mutex_attr, PTHREAD_MUTEX_RECURSIVE)) {
+ OVS_ERROR("Failed to set OVS DB mutex as recursive");
+ pthread_mutexattr_destroy(&mutex_attr);
+ sfree(pdb);
+ return NULL;
}
-
/* init OVS DB mutex */
- if (pthread_mutexattr_init(&mutex_attr) ||
- pthread_mutexattr_settype(&mutex_attr, PTHREAD_MUTEX_RECURSIVE) ||
- pthread_mutex_init(&pdb->mutex, &mutex_attr)) {
+ if (pthread_mutex_init(&pdb->mutex, &mutex_attr)) {
OVS_ERROR("OVS DB mutex init failed");
- goto failure;
+ pthread_mutexattr_destroy(&mutex_attr);
+ sfree(pdb);
+ return NULL;
}
+ /* destroy mutex attributes */
+ pthread_mutexattr_destroy(&mutex_attr);
- /* return db to the caller */
- return pdb;
+ /* init event thread */
+ if (ovs_db_event_thread_init(pdb) < 0) {
+ ovs_db_destroy(pdb);
+ return NULL;
+ }
-failure:
- if (pdb->conn.sock)
- /* close connection */
- close(pdb->conn.sock);
- if (pdb->event_thread.tid != 0)
- /* stop event thread */
- if (ovs_db_event_thread_stop(pdb) < 0)
- OVS_ERROR("stop event thread failed");
- if (pdb->poll_thread.tid != 0)
- /* stop poll thread */
- if (ovs_db_poll_thread_stop(pdb) < 0)
- OVS_ERROR("stop poll thread failed");
- sfree(pdb);
- return NULL;
+ /* init polling thread */
+ pdb->sock = -1;
+ if (ovs_db_poll_thread_init(pdb) < 0) {
+ ovs_db_destroy(pdb);
+ return NULL;
+ }
+ return pdb;
}
-int
-ovs_db_send_request(ovs_db_t *pdb, const char *method,
- const char *params, ovs_db_result_cb_t cb)
-{
+int ovs_db_send_request(ovs_db_t *pdb, const char *method, const char *params,
+ ovs_db_result_cb_t cb) {
int ret = 0;
yajl_gen_status yajl_gen_ret;
yajl_val jparams;
/* sanity check */
if (!pdb || !method || !params)
- return (-1);
+ return -1;
if ((jgen = yajl_gen_alloc(NULL)) == NULL)
- return (-1);
+ return -1;
/* try to parse params */
if ((jparams = yajl_tree_parse(params, NULL, 0)) == NULL) {
OVS_ERROR("params is not a JSON string");
yajl_gen_clear(jgen);
- return (-1);
+ return -1;
}
/* generate method field */
/* generate id field */
OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "id");
uid = ovs_uid_generate();
- ssnprintf(uid_buff, sizeof(uid_buff), "%" PRIX64, uid);
+ snprintf(uid_buff, sizeof(uid_buff), "%" PRIX64, uid);
OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, uid_buff);
OVS_YAJL_CALL(yajl_gen_map_close, jgen);
if (cb) {
/* register result callback */
- if ((new_cb = malloc(sizeof(ovs_callback_t))) == NULL)
+ if ((new_cb = calloc(1, sizeof(ovs_callback_t))) == NULL)
goto yajl_gen_failure;
/* add new callback to front */
}
/* send the request */
- OVS_YAJL_CALL(yajl_gen_get_buf, jgen, (const unsigned char **)&req,
- &req_len);
+ OVS_YAJL_CALL(yajl_gen_get_buf, jgen, (const unsigned char **)&req, &req_len);
OVS_DEBUG("%s", req);
if (!ovs_db_data_send(pdb, req, req_len)) {
if (cb) {
return (yajl_gen_ret != yajl_gen_status_ok) ? (-1) : ret;
}
-int
-ovs_db_table_cb_register(ovs_db_t *pdb, const char *tb_name,
- const char **tb_column, ovs_db_table_cb_t update_cb,
- ovs_db_result_cb_t result_cb, unsigned int flags)
-{
+int ovs_db_table_cb_register(ovs_db_t *pdb, const char *tb_name,
+ const char **tb_column,
+ ovs_db_table_cb_t update_cb,
+ ovs_db_result_cb_t result_cb, unsigned int flags) {
yajl_gen jgen;
yajl_gen_status yajl_gen_ret;
ovs_callback_t *new_cb = NULL;
/* sanity check */
if (pdb == NULL || tb_name == NULL || update_cb == NULL)
- return (-1);
+ return -1;
- if ((jgen = yajl_gen_alloc(NULL)) == NULL)
- return (-1);
+ /* allocate new update callback */
+ if ((new_cb = calloc(1, sizeof(ovs_callback_t))) == NULL)
+ return -1;
- /* register table update callback */
- if ((new_cb = malloc(sizeof(ovs_callback_t))) == NULL)
- return (-1);
+ /* init YAJL generator */
+ if ((jgen = yajl_gen_alloc(NULL)) == NULL) {
+ sfree(new_cb);
+ return -1;
+ }
/* add new callback to front */
new_cb->table.call = update_cb;
OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, OVS_DB_DEFAULT_DB_NAME);
/* uid string <json-value> */
- ssnprintf(uid_str, sizeof(uid_str), "%" PRIX64, new_cb->uid);
+ snprintf(uid_str, sizeof(uid_str), "%" PRIX64, new_cb->uid);
OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, uid_str);
/* <monitor-requests> */
return ovs_db_ret;
}
-int
-ovs_db_destroy(ovs_db_t *pdb)
-{
+int ovs_db_destroy(ovs_db_t *pdb) {
int ovs_db_ret = 0;
int ret = 0;
/* sanity check */
if (pdb == NULL)
- return (-1);
+ return -1;
/* try to lock the structure before releasing */
- if (ret = pthread_mutex_lock(&pdb->mutex)) {
- OVS_ERROR("pthread_mutex_lock() DB mutext lock failed (%d)", ret);
- return (-1);
+ if ((ret = pthread_mutex_lock(&pdb->mutex))) {
+ OVS_ERROR("pthread_mutex_lock() DB mutex lock failed (%d)", ret);
+ return -1;
}
/* stop poll thread */
- if (ovs_db_event_thread_stop(pdb) < 0) {
- OVS_ERROR("stop poll thread failed");
+ if (ovs_db_event_thread_destroy(pdb) < 0) {
+ OVS_ERROR("destroy poll thread failed");
ovs_db_ret = (-1);
}
/* stop event thread */
- if (ovs_db_poll_thread_stop(pdb) < 0) {
+ if (ovs_db_poll_thread_destroy(pdb) < 0) {
OVS_ERROR("stop event thread failed");
ovs_db_ret = (-1);
}
ovs_db_callback_remove_all(pdb);
/* close connection */
- if (pdb->conn.sock)
- close(pdb->conn.sock);
+ if (pdb->sock >= 0)
+ close(pdb->sock);
/* release DB handler */
pthread_mutex_unlock(&pdb->mutex);
* Public OVS utils API implementation
*/
-/* Get YAJL value by key from YAJL dictionary */
-yajl_val
-ovs_utils_get_value_by_key(yajl_val jval, const char *key)
-{
+/* Get YAJL value by key from YAJL dictionary
+ *
+ * EXAMPLE:
+ * {
+ * "key_a" : <YAJL return value>
+ * "key_b" : <YAJL return value>
+ * }
+ */
+yajl_val ovs_utils_get_value_by_key(yajl_val jval, const char *key) {
const char *obj_key = NULL;
/* check params */
- if (!YAJL_IS_OBJECT(jval) || !key)
+ if (!YAJL_IS_OBJECT(jval) || (key == NULL))
return NULL;
/* find a value by key */
- for (int i = 0; i < YAJL_GET_OBJECT(jval)->len; i++) {
+ for (size_t i = 0; i < YAJL_GET_OBJECT(jval)->len; i++) {
obj_key = YAJL_GET_OBJECT(jval)->keys[i];
if (strcmp(obj_key, key) == 0)
return YAJL_GET_OBJECT(jval)->values[i];
return NULL;
}
+
+/* Get OVS DB map value by given map key
+ *
+ * FROM RFC7047:
+ *
+ * <pair>
+ * A 2-element JSON array that represents a pair within a database
+ * map. The first element is an <atom> that represents the key, and
+ * the second element is an <atom> that represents the value.
+ *
+ * <map>
+ * A 2-element JSON array that represents a database map value. The
+ * first element of the array must be the string "map", and the
+ * second element must be an array of zero or more <pair>s giving the
+ * values in the map. All of the <pair>s must have the same key and
+ * value types.
+ *
+ * EXAMPLE:
+ * [
+ * "map", [
+ * [ "key_a", <YAJL value>], [ "key_b", <YAJL value>], ...
+ * ]
+ * ]
+ */
+yajl_val ovs_utils_get_map_value(yajl_val jval, const char *key) {
+ size_t map_len = 0;
+ size_t array_len = 0;
+ yajl_val *map_values = NULL;
+ yajl_val *array_values = NULL;
+ const char *str_val = NULL;
+
+ /* check YAJL array */
+ if (!YAJL_IS_ARRAY(jval) || (key == NULL))
+ return NULL;
+
+ /* check a database map value (2-element, first one should be a string */
+ array_len = YAJL_GET_ARRAY(jval)->len;
+ array_values = YAJL_GET_ARRAY(jval)->values;
+ if ((array_len != 2) || (!YAJL_IS_STRING(array_values[0])) ||
+ (!YAJL_IS_ARRAY(array_values[1])))
+ return NULL;
+
+ /* check first element of the array */
+ str_val = YAJL_GET_STRING(array_values[0]);
+ if (strcmp("map", str_val) != 0)
+ return NULL;
+
+ /* try to find map value by map key */
+ map_len = YAJL_GET_ARRAY(array_values[1])->len;
+ map_values = YAJL_GET_ARRAY(array_values[1])->values;
+ for (size_t i = 0; i < map_len; i++) {
+ /* check YAJL array */
+ if (!YAJL_IS_ARRAY(map_values[i]))
+ break;
+
+ /* check a database pair value (2-element, first one represents a key
+ * and it should be a string in our case */
+ array_len = YAJL_GET_ARRAY(map_values[i])->len;
+ array_values = YAJL_GET_ARRAY(map_values[i])->values;
+ if ((array_len != 2) || (!YAJL_IS_STRING(array_values[0])))
+ break;
+
+ /* return map value if given key equals map key */
+ str_val = YAJL_GET_STRING(array_values[0]);
+ if (strcmp(key, str_val) == 0)
+ return array_values[1];
+ }
+ return NULL;
+}