65e667950a04452e9234003ba2c6bba56a88b05f
[collectd.git] / src / utils_ovs.c
1 /**
2  * collectd - src/utils_ovs.c
3  *
4  * Copyright(c) 2016 Intel Corporation. All rights reserved.
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining a copy of
7  * this software and associated documentation files (the "Software"), to deal in
8  * the Software without restriction, including without limitation the rights to
9  * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
10  * of the Software, and to permit persons to whom the Software is furnished to do
11  * so, subject to the following conditions:
12  *
13  * The above copyright notice and this permission notice shall be included in all
14  * copies or substantial portions of the Software.
15  *
16  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22  * SOFTWARE.
23  *
24  * Authors:
25  *   Volodymyr Mytnyk <volodymyrx.mytnyk@intel.com>
26  **/
27
28 /* clang-format off */
29 /*
30  *                         OVS DB API internal architecture diagram
31  * +------------------------------------------------------------------------------+
32  * |OVS plugin      |OVS utils                                                    |
33  * |                |     +------------------------+                              |
34  * |                |     |      echo handler      |                JSON request/ |
35  * |                |  +--+ (ovs_db_table_echo_cb) +<---+---------+ update event/ |
36  * |                |  |  |                        |    |         | result        |
37  * |                |  |  +------------------------+    |         |               |
38  * |                |  |                                |    +----+---+--------+  |
39  * |  +----------+  |  |  +------------------------+    |    |        |        |  |
40  * |  |  update  |  |  |  |     update handler     |    |    |  YAJL  |  JSON  |  |
41  * |  | callback +<-------+(ovs_db_table_update_cp)+<---+    | parser | reader |  |
42  * |  +----------+  |  |  |                        |    |    |        |        |  |
43  * |                |  |  +------------------------+    |    +--------+---+----+  |
44  * |                |  |                                |                 ^       |
45  * |  +----------+  |  |  +------------------------+    |                 |       |
46  * |  |  result  |  |  |  |     result handler     |    |                 |       |
47  * |  | callback +<-------+   (ovs_db_result_cb)   +<---+        JSON raw |       |
48  * |  +----------+  |  |  |                        |               data   |       |
49  * |                |  |  +------------------------+                      |       |
50  * |                |  |                                                  |       |
51  * |                |  |    +------------------+             +------------+----+  |
52  * |  +----------+  |  |    |thread|           |             |thread|          |  |
53  * |  |   init   |  |  |    |                  |  reconnect  |                 |  |
54  * |  | callback +<---------+   EVENT WORKER   +<------------+   POLL WORKER   |  |
55  * |  +----------+  |  |    +------------------+             +--------+--------+  |
56  * |                |  |                                              ^           |
57  * +----------------+-------------------------------------------------------------+
58  *                     |                                              |
59  *                 JSON|echo reply                                 raw|data
60  *                     v                                              v
61  * +-------------------+----------------------------------------------+-----------+
62  * |                                 TCP/UNIX socket                              |
63  * +-------------------------------------------------------------------------------
64  */
65 /* clang-format on */
66
67 /* collectd headers */
68 #include "collectd.h"
69
70 #include "common.h"
71
72 /* private headers */
73 #include "utils_ovs.h"
74
75 /* system libraries */
76 #if HAVE_NETDB_H
77 #include <netdb.h>
78 #endif
79 #if HAVE_ARPA_INET_H
80 #include <arpa/inet.h>
81 #endif
82 #if HAVE_POLL_H
83 #include <poll.h>
84 #endif
85 #if HAVE_SYS_UN_H
86 #include <sys/un.h>
87 #endif
88
89 #include <semaphore.h>
90
91 #define OVS_ERROR(fmt, ...)                                                    \
92   do {                                                                         \
93     ERROR("ovs_utils: " fmt, ##__VA_ARGS__);                                   \
94   } while (0)
95 #define OVS_DEBUG(fmt, ...)                                                    \
96   do {                                                                         \
97     DEBUG("%s:%d:%s(): " fmt, __FILE__, __LINE__, __FUNCTION__,                \
98           ##__VA_ARGS__);                                                      \
99   } while (0)
100
101 #define OVS_DB_POLL_TIMEOUT 1           /* poll receive timeout (sec) */
102 #define OVS_DB_POLL_READ_BLOCK_SIZE 512 /* read block size (bytes) */
103 #define OVS_DB_DEFAULT_DB_NAME "Open_vSwitch"
104
105 #define OVS_DB_EVENT_TIMEOUT 5 /* event thread timeout (sec) */
106 #define OVS_DB_EVENT_TERMINATE 1
107 #define OVS_DB_EVENT_CONN_ESTABLISHED 2
108 #define OVS_DB_EVENT_CONN_TERMINATED 3
109
110 #define OVS_DB_POLL_STATE_RUNNING 1
111 #define OVS_DB_POLL_STATE_EXITING 2
112
113 #define OVS_DB_SEND_REQ_TIMEOUT 5 /* send request timeout (sec) */
114
115 #define OVS_YAJL_CALL(func, ...)                                               \
116   do {                                                                         \
117     yajl_gen_ret = yajl_gen_status_ok;                                         \
118     if ((yajl_gen_ret = func(__VA_ARGS__)) != yajl_gen_status_ok)              \
119       goto yajl_gen_failure;                                                   \
120   } while (0)
121 #define OVS_YAJL_ERROR_BUFFER_SIZE 1024
122 #define OVS_ERROR_BUFF_SIZE 512
123 #define OVS_UID_STR_SIZE 17 /* 64-bit HEX string len + '\0' */
124
125 /* JSON reader internal data */
126 struct ovs_json_reader_s {
127   char *buff_ptr;
128   size_t buff_size;
129   size_t buff_offset;
130   size_t json_offset;
131 };
132 typedef struct ovs_json_reader_s ovs_json_reader_t;
133
134 /* Result callback declaration */
135 struct ovs_result_cb_s {
136   sem_t sync;
137   ovs_db_result_cb_t call;
138 };
139 typedef struct ovs_result_cb_s ovs_result_cb_t;
140
141 /* Table callback declaration */
142 struct ovs_table_cb_s {
143   ovs_db_table_cb_t call;
144 };
145 typedef struct ovs_table_cb_s ovs_table_cb_t;
146
147 /* Callback declaration */
148 struct ovs_callback_s {
149   uint64_t uid;
150   union {
151     ovs_result_cb_t result;
152     ovs_table_cb_t table;
153   };
154   struct ovs_callback_s *next;
155   struct ovs_callback_s *prev;
156 };
157 typedef struct ovs_callback_s ovs_callback_t;
158
159 /* Event thread data declaration */
160 struct ovs_event_thread_s {
161   pthread_t tid;
162   pthread_mutex_t mutex;
163   pthread_cond_t cond;
164   int value;
165 };
166 typedef struct ovs_event_thread_s ovs_event_thread_t;
167
168 /* Poll thread data declaration */
169 struct ovs_poll_thread_s {
170   pthread_t tid;
171   pthread_mutex_t mutex;
172   int state;
173 };
174 typedef struct ovs_poll_thread_s ovs_poll_thread_t;
175
176 /* OVS DB internal data declaration */
177 struct ovs_db_s {
178   ovs_poll_thread_t poll_thread;
179   ovs_event_thread_t event_thread;
180   pthread_mutex_t mutex;
181   ovs_callback_t *remote_cb;
182   ovs_db_callback_t cb;
183   char service[OVS_DB_ADDR_SERVICE_SIZE];
184   char node[OVS_DB_ADDR_NODE_SIZE];
185   char unix_path[OVS_DB_ADDR_NODE_SIZE];
186   int sock;
187 };
188
189 /* Global variables */
190 static uint64_t ovs_uid = 0;
191 static pthread_mutex_t ovs_uid_mutex = PTHREAD_MUTEX_INITIALIZER;
192
193 /* Post an event to event thread.
194  * Possible events are:
195  *  OVS_DB_EVENT_TERMINATE
196  *  OVS_DB_EVENT_CONN_ESTABLISHED
197  *  OVS_DB_EVENT_CONN_TERMINATED
198  */
199 static void ovs_db_event_post(ovs_db_t *pdb, int event) {
200   pthread_mutex_lock(&pdb->event_thread.mutex);
201   pdb->event_thread.value = event;
202   pthread_mutex_unlock(&pdb->event_thread.mutex);
203   pthread_cond_signal(&pdb->event_thread.cond);
204 }
205
206 /* Check if POLL thread is still running. Returns
207  * 1 if running otherwise 0 is returned */
208 static _Bool ovs_db_poll_is_running(ovs_db_t *pdb) {
209   int state = 0;
210   pthread_mutex_lock(&pdb->poll_thread.mutex);
211   state = pdb->poll_thread.state;
212   pthread_mutex_unlock(&pdb->poll_thread.mutex);
213   return (state == OVS_DB_POLL_STATE_RUNNING);
214 }
215
216 /* Generate unique identifier (UID). It is used by OVS DB API
217  * to set "id" field for any OVS DB JSON request. */
218 static uint64_t ovs_uid_generate() {
219   uint64_t new_uid;
220   pthread_mutex_lock(&ovs_uid_mutex);
221   new_uid = ++ovs_uid;
222   pthread_mutex_unlock(&ovs_uid_mutex);
223   return new_uid;
224 }
225
226 /*
227  * Callback API. These function are used to store
228  * registered callbacks in OVS DB API.
229  */
230
231 /* Add new callback into OVS DB object */
232 static void ovs_db_callback_add(ovs_db_t *pdb, ovs_callback_t *new_cb) {
233   pthread_mutex_lock(&pdb->mutex);
234   if (pdb->remote_cb)
235     pdb->remote_cb->prev = new_cb;
236   new_cb->next = pdb->remote_cb;
237   new_cb->prev = NULL;
238   pdb->remote_cb = new_cb;
239   pthread_mutex_unlock(&pdb->mutex);
240 }
241
242 /* Remove callback from OVS DB object */
243 static void ovs_db_callback_remove(ovs_db_t *pdb, ovs_callback_t *del_cb) {
244   pthread_mutex_lock(&pdb->mutex);
245   ovs_callback_t *pre_cb = del_cb->prev;
246   ovs_callback_t *next_cb = del_cb->next;
247
248   if (next_cb)
249     next_cb->prev = del_cb->prev;
250
251   if (pre_cb)
252     pre_cb->next = del_cb->next;
253   else
254     pdb->remote_cb = del_cb->next;
255
256   free(del_cb);
257   pthread_mutex_unlock(&pdb->mutex);
258 }
259
260 /* Remove all callbacks form OVS DB object */
261 static void ovs_db_callback_remove_all(ovs_db_t *pdb) {
262   pthread_mutex_lock(&pdb->mutex);
263   for (ovs_callback_t *del_cb = pdb->remote_cb; pdb->remote_cb;
264        del_cb = pdb->remote_cb) {
265     pdb->remote_cb = del_cb->next;
266     free(del_cb);
267   }
268   pdb->remote_cb = NULL;
269   pthread_mutex_unlock(&pdb->mutex);
270 }
271
272 /* Get/find callback in OVS DB object by UID. Returns pointer
273  * to requested callback otherwise NULL is returned.
274  *
275  * IMPORTANT NOTE:
276  *   The OVS DB mutex MUST be locked by the caller
277  *   to make sure that returned callback is still valid.
278  */
279 static ovs_callback_t *ovs_db_callback_get(ovs_db_t *pdb, uint64_t uid) {
280   for (ovs_callback_t *cb = pdb->remote_cb; cb != NULL; cb = cb->next)
281     if (cb->uid == uid)
282       return cb;
283   return NULL;
284 }
285
286 /* Send all requested data to the socket. Returns 0 if
287  * ALL request data has been sent otherwise negative value
288  * is returned */
289 static int ovs_db_data_send(const ovs_db_t *pdb, const char *data, size_t len) {
290   ssize_t nbytes = 0;
291   size_t rem = len;
292   size_t off = 0;
293
294   while (rem > 0) {
295     if ((nbytes = send(pdb->sock, data + off, rem, 0)) <= 0)
296       return (-1);
297     rem -= (size_t)nbytes;
298     off += (size_t)nbytes;
299   }
300   return (0);
301 }
302
303 /*
304  * YAJL (Yet Another JSON Library) helper functions
305  * Documentation (https://lloyd.github.io/yajl/)
306  */
307
308 /* Add null-terminated string into YAJL generator handle (JSON object).
309  * Similar function to yajl_gen_string() but takes null-terminated string
310  * instead of string and its length.
311  *
312  * jgen   - YAJL generator handle allocated by yajl_gen_alloc()
313  * string - Null-terminated string
314  */
315 static yajl_gen_status ovs_yajl_gen_tstring(yajl_gen hander,
316                                             const char *string) {
317   return yajl_gen_string(hander, (const unsigned char *)string, strlen(string));
318 }
319
320 /* Add YAJL value into YAJL generator handle (JSON object)
321  *
322  * jgen - YAJL generator handle allocated by yajl_gen_alloc()
323  * jval - YAJL value usually returned by yajl_tree_get()
324  */
325 static yajl_gen_status ovs_yajl_gen_val(yajl_gen jgen, yajl_val jval) {
326   size_t array_len = 0;
327   yajl_val *jvalues = NULL;
328   yajl_val jobj_value = NULL;
329   const char *obj_key = NULL;
330   size_t obj_len = 0;
331   yajl_gen_status yajl_gen_ret = yajl_gen_status_ok;
332
333   if (jval == NULL)
334     return yajl_gen_generation_complete;
335
336   if (YAJL_IS_STRING(jval))
337     OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, YAJL_GET_STRING(jval));
338   else if (YAJL_IS_DOUBLE(jval))
339     OVS_YAJL_CALL(yajl_gen_double, jgen, YAJL_GET_DOUBLE(jval));
340   else if (YAJL_IS_INTEGER(jval))
341     OVS_YAJL_CALL(yajl_gen_double, jgen, YAJL_GET_INTEGER(jval));
342   else if (YAJL_IS_TRUE(jval))
343     OVS_YAJL_CALL(yajl_gen_bool, jgen, 1);
344   else if (YAJL_IS_FALSE(jval))
345     OVS_YAJL_CALL(yajl_gen_bool, jgen, 0);
346   else if (YAJL_IS_NULL(jval))
347     OVS_YAJL_CALL(yajl_gen_null, jgen);
348   else if (YAJL_IS_ARRAY(jval)) {
349     /* create new array and add all elements into the array */
350     array_len = YAJL_GET_ARRAY(jval)->len;
351     jvalues = YAJL_GET_ARRAY(jval)->values;
352     OVS_YAJL_CALL(yajl_gen_array_open, jgen);
353     for (size_t i = 0; i < array_len; i++)
354       OVS_YAJL_CALL(ovs_yajl_gen_val, jgen, jvalues[i]);
355     OVS_YAJL_CALL(yajl_gen_array_close, jgen);
356   } else if (YAJL_IS_OBJECT(jval)) {
357     /* create new object and add all elements into the object */
358     OVS_YAJL_CALL(yajl_gen_map_open, jgen);
359     obj_len = YAJL_GET_OBJECT(jval)->len;
360     for (size_t i = 0; i < obj_len; i++) {
361       obj_key = YAJL_GET_OBJECT(jval)->keys[i];
362       jobj_value = YAJL_GET_OBJECT(jval)->values[i];
363       OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, obj_key);
364       OVS_YAJL_CALL(ovs_yajl_gen_val, jgen, jobj_value);
365     }
366     OVS_YAJL_CALL(yajl_gen_map_close, jgen);
367   } else {
368     OVS_ERROR("%s() unsupported value type %d (skip)", __FUNCTION__,
369               (int)(jval)->type);
370     goto yajl_gen_failure;
371   }
372   return yajl_gen_status_ok;
373
374 yajl_gen_failure:
375   OVS_ERROR("%s() error to generate value", __FUNCTION__);
376   return yajl_gen_ret;
377 }
378
379 /* OVS DB echo request handler. When OVS DB sends
380  * "echo" request to the client, client should generate
381  * "echo" replay with the same content received in the
382  * request */
383 static int ovs_db_table_echo_cb(const ovs_db_t *pdb, yajl_val jnode) {
384   yajl_val jparams;
385   yajl_val jid;
386   yajl_gen jgen;
387   size_t resp_len = 0;
388   const char *resp = NULL;
389   const char *params_path[] = {"params", NULL};
390   const char *id_path[] = {"id", NULL};
391   yajl_gen_status yajl_gen_ret;
392
393   if ((jgen = yajl_gen_alloc(NULL)) == NULL)
394     return (-1);
395
396   /* check & get request attributes */
397   if ((jparams = yajl_tree_get(jnode, params_path, yajl_t_array)) == NULL ||
398       ((jid = yajl_tree_get(jnode, id_path, yajl_t_any)) == NULL)) {
399     OVS_ERROR("parse echo request failed");
400     goto yajl_gen_failure;
401   }
402
403   /* generate JSON echo response */
404   OVS_YAJL_CALL(yajl_gen_map_open, jgen);
405
406   OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "result");
407   OVS_YAJL_CALL(ovs_yajl_gen_val, jgen, jparams);
408
409   OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "error");
410   OVS_YAJL_CALL(yajl_gen_null, jgen);
411
412   OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "id");
413   OVS_YAJL_CALL(ovs_yajl_gen_val, jgen, jid);
414
415   OVS_YAJL_CALL(yajl_gen_map_close, jgen);
416   OVS_YAJL_CALL(yajl_gen_get_buf, jgen, (const unsigned char **)&resp,
417                 &resp_len);
418
419   /* send the response */
420   OVS_DEBUG("response: %s", resp);
421   if (ovs_db_data_send(pdb, resp, resp_len) < 0) {
422     OVS_ERROR("send echo reply failed");
423     goto yajl_gen_failure;
424   }
425   /* clean up and return success */
426   yajl_gen_clear(jgen);
427   return (0);
428
429 yajl_gen_failure:
430   /* release memory */
431   yajl_gen_clear(jgen);
432   return (-1);
433 }
434
435 /* Get OVS DB registered callback by YAJL val. The YAJL
436  * value should be YAJL string (UID). Returns NULL if
437  * callback hasn't been found. See also ovs_db_callback_get()
438  * description for addition info.
439  */
440 static ovs_callback_t *ovs_db_table_callback_get(ovs_db_t *pdb, yajl_val jid) {
441   char *endptr = NULL;
442   const char *suid = NULL;
443   uint64_t uid;
444
445   if (jid && YAJL_IS_STRING(jid)) {
446     suid = YAJL_GET_STRING(jid);
447     uid = (uint64_t)strtoul(suid, &endptr, 16);
448     if (*endptr == '\0' && uid)
449       return ovs_db_callback_get(pdb, uid);
450   }
451
452   return NULL;
453 }
454
455 /* OVS DB table update event handler.
456  * This callback is called by POLL thread if OVS DB
457  * table update callback is received from the DB
458  * server. Once registered callback found, it's called
459  * by this handler. */
460 static int ovs_db_table_update_cb(ovs_db_t *pdb, yajl_val jnode) {
461   ovs_callback_t *cb = NULL;
462   yajl_val jvalue;
463   yajl_val jparams;
464   yajl_val jtable_updates;
465   const char *params_path[] = {"params", NULL};
466   const char *id_path[] = {"id", NULL};
467
468   /* check & get request attributes */
469   if ((jparams = yajl_tree_get(jnode, params_path, yajl_t_array)) == NULL ||
470       (yajl_tree_get(jnode, id_path, yajl_t_null) == NULL)) {
471     OVS_ERROR("invalid OVS DB request received");
472     return (-1);
473   }
474
475   /* check array length: [<json-value>, <table-updates>] */
476   if ((YAJL_GET_ARRAY(jparams) == NULL) ||
477       (YAJL_GET_ARRAY(jparams)->len != 2)) {
478     OVS_ERROR("invalid OVS DB request received");
479     return (-1);
480   }
481
482   jvalue = YAJL_GET_ARRAY(jparams)->values[0];
483   jtable_updates = YAJL_GET_ARRAY(jparams)->values[1];
484   if ((!YAJL_IS_OBJECT(jtable_updates)) || (!YAJL_IS_STRING(jvalue))) {
485     OVS_ERROR("invalid OVS DB request id or table update received");
486     return (-1);
487   }
488
489   /* find registered callback based on <json-value> */
490   pthread_mutex_lock(&pdb->mutex);
491   cb = ovs_db_table_callback_get(pdb, jvalue);
492   if (cb == NULL || cb->table.call == NULL) {
493     OVS_ERROR("No OVS DB table update callback found");
494     pthread_mutex_unlock(&pdb->mutex);
495     return (-1);
496   }
497
498   /* call registered callback */
499   cb->table.call(jtable_updates);
500   pthread_mutex_unlock(&pdb->mutex);
501   return 0;
502 }
503
504 /* OVS DB result request handler.
505  * This callback is called by POLL thread if OVS DB
506  * result reply is received from the DB server.
507  * Once registered callback found, it's called
508  * by this handler. */
509 static int ovs_db_result_cb(ovs_db_t *pdb, yajl_val jnode) {
510   ovs_callback_t *cb = NULL;
511   yajl_val jresult;
512   yajl_val jerror;
513   yajl_val jid;
514   const char *result_path[] = {"result", NULL};
515   const char *error_path[] = {"error", NULL};
516   const char *id_path[] = {"id", NULL};
517
518   jresult = yajl_tree_get(jnode, result_path, yajl_t_any);
519   jerror = yajl_tree_get(jnode, error_path, yajl_t_any);
520   jid = yajl_tree_get(jnode, id_path, yajl_t_string);
521
522   /* check & get result attributes */
523   if (!jresult || !jerror || !jid)
524     return (-1);
525
526   /* try to find registered callback */
527   pthread_mutex_lock(&pdb->mutex);
528   cb = ovs_db_table_callback_get(pdb, jid);
529   if (cb != NULL && cb->result.call != NULL) {
530     /* call registered callback */
531     cb->result.call(jresult, jerror);
532     /* unlock owner of the reply */
533     sem_post(&cb->result.sync);
534   }
535
536   pthread_mutex_unlock(&pdb->mutex);
537   return (0);
538 }
539
540 /* Handle JSON data (one request) and call
541  * appropriate event OVS DB handler. Currently,
542  * update callback 'ovs_db_table_update_cb' and
543  * result callback 'ovs_db_result_cb' is supported.
544  */
545 static int ovs_db_json_data_process(ovs_db_t *pdb, const char *data,
546                                     size_t len) {
547   const char *method = NULL;
548   char yajl_errbuf[OVS_YAJL_ERROR_BUFFER_SIZE];
549   const char *method_path[] = {"method", NULL};
550   const char *result_path[] = {"result", NULL};
551   char *sjson = NULL;
552   yajl_val jnode, jval;
553
554   /* duplicate the data to make null-terminated string
555    * required for yajl_tree_parse() */
556   if ((sjson = calloc(1, len + 1)) == NULL)
557     return (-1);
558
559   sstrncpy(sjson, data, len + 1);
560   OVS_DEBUG("[len=%zu] %s", len, sjson);
561
562   /* parse json data */
563   jnode = yajl_tree_parse(sjson, yajl_errbuf, sizeof(yajl_errbuf));
564   if (jnode == NULL) {
565     OVS_ERROR("yajl_tree_parse() %s", yajl_errbuf);
566     sfree(sjson);
567     return (-1);
568   }
569
570   /* get method name */
571   if ((jval = yajl_tree_get(jnode, method_path, yajl_t_string)) != NULL) {
572     if ((method = YAJL_GET_STRING(jval)) == NULL) {
573       yajl_tree_free(jnode);
574       sfree(sjson);
575       return (-1);
576     }
577     if (strcmp("echo", method) == 0) {
578       /* echo request from the server */
579       if (ovs_db_table_echo_cb(pdb, jnode) < 0)
580         OVS_ERROR("handle echo request failed");
581     } else if (strcmp("update", method) == 0) {
582       /* update notification */
583       if (ovs_db_table_update_cb(pdb, jnode) < 0)
584         OVS_ERROR("handle update notification failed");
585     }
586   } else if ((jval = yajl_tree_get(jnode, result_path, yajl_t_any)) != NULL) {
587     /* result notification */
588     if (ovs_db_result_cb(pdb, jnode) < 0)
589       OVS_ERROR("handle result reply failed");
590   } else
591     OVS_ERROR("connot find method or result failed");
592
593   /* release memory */
594   yajl_tree_free(jnode);
595   sfree(sjson);
596   return (0);
597 }
598
599 /*
600  * JSON reader implementation.
601  *
602  * This module process raw JSON data (byte stream) and
603  * returns fully-fledged JSON data which can be processed
604  * (parsed) by YAJL later.
605  */
606
607 /* Allocate JSON reader instance */
608 static ovs_json_reader_t *ovs_json_reader_alloc() {
609   ovs_json_reader_t *jreader = NULL;
610
611   if ((jreader = calloc(sizeof(ovs_json_reader_t), 1)) == NULL)
612     return NULL;
613
614   return jreader;
615 }
616
617 /* Push raw data into into the JSON reader for processing */
618 static int ovs_json_reader_push_data(ovs_json_reader_t *jreader,
619                                      const char *data, size_t data_len) {
620   char *new_buff = NULL;
621   size_t available = jreader->buff_size - jreader->buff_offset;
622
623   /* check/update required memory space */
624   if (available < data_len) {
625     OVS_DEBUG("Reallocate buffer [size=%d, available=%d required=%d]",
626               (int)jreader->buff_size, (int)available, (int)data_len);
627
628     /* allocate new chunk of memory */
629     new_buff = realloc(jreader->buff_ptr, (jreader->buff_size + data_len));
630     if (new_buff == NULL)
631       return (-1);
632
633     /* point to new allocated memory */
634     jreader->buff_ptr = new_buff;
635     jreader->buff_size += data_len;
636   }
637
638   /* store input data */
639   memcpy(jreader->buff_ptr + jreader->buff_offset, data, data_len);
640   jreader->buff_offset += data_len;
641   return (0);
642 }
643
644 /* Pop one fully-fledged JSON if already exists. Returns 0 if
645  * completed JSON already exists otherwise negative value is
646  * returned */
647 static int ovs_json_reader_pop(ovs_json_reader_t *jreader,
648                                const char **json_ptr, size_t *json_len_ptr) {
649   size_t nbraces = 0;
650   size_t json_len = 0;
651   char *json = NULL;
652
653   /* search open/close brace */
654   for (size_t i = jreader->json_offset; i < jreader->buff_offset; i++) {
655     if (jreader->buff_ptr[i] == '{') {
656       nbraces++;
657     } else if (jreader->buff_ptr[i] == '}')
658       if (nbraces)
659         if (!(--nbraces)) {
660           /* JSON data */
661           *json_ptr = jreader->buff_ptr + jreader->json_offset;
662           *json_len_ptr = json_len + 1;
663           jreader->json_offset = i + 1;
664           return (0);
665         }
666
667     /* increase JSON data length */
668     if (nbraces)
669       json_len++;
670   }
671
672   if (jreader->json_offset) {
673     if (jreader->json_offset < jreader->buff_offset) {
674       /* shift data to the beginning of the buffer
675        * and zero rest of the buffer data */
676       json = &jreader->buff_ptr[jreader->json_offset];
677       json_len = jreader->buff_offset - jreader->json_offset;
678       for (size_t i = 0; i < jreader->buff_size; i++)
679         jreader->buff_ptr[i] = ((i < json_len) ? (json[i]) : (0));
680       jreader->buff_offset = json_len;
681     } else
682       /* reset the buffer */
683       jreader->buff_offset = 0;
684
685     /* data is at the beginning of the buffer */
686     jreader->json_offset = 0;
687   }
688
689   return (-1);
690 }
691
692 /* Reset JSON reader. It is useful when start processing
693  * new raw data. E.g.: in case of lost stream connection.
694  */
695 static void ovs_json_reader_reset(ovs_json_reader_t *jreader) {
696   if (jreader) {
697     jreader->buff_offset = 0;
698     jreader->json_offset = 0;
699   }
700 }
701
702 /* Release internal data allocated for JSON reader */
703 static void ovs_json_reader_free(ovs_json_reader_t *jreader) {
704   if (jreader) {
705     free(jreader->buff_ptr);
706     free(jreader);
707   }
708 }
709
710 /* Reconnect to OVS DB and call the OVS DB post connection init callback
711  * if connection has been established.
712  */
713 static void ovs_db_reconnect(ovs_db_t *pdb) {
714   const char *node_info = pdb->node;
715   struct addrinfo *result;
716
717   if (pdb->unix_path[0] != '\0') {
718     /* use UNIX socket instead of INET address */
719     node_info = pdb->unix_path;
720     result = calloc(1, sizeof(struct addrinfo));
721     struct sockaddr_un *sa_unix = calloc(1, sizeof(struct sockaddr_un));
722     if (result == NULL || sa_unix == NULL) {
723       sfree(result);
724       sfree(sa_unix);
725       return;
726     }
727     result->ai_family = AF_UNIX;
728     result->ai_socktype = SOCK_STREAM;
729     result->ai_addrlen = sizeof(*sa_unix);
730     result->ai_addr = (struct sockaddr *)sa_unix;
731     sa_unix->sun_family = result->ai_family;
732     sstrncpy(sa_unix->sun_path, pdb->unix_path, sizeof(sa_unix->sun_path));
733   } else {
734     /* inet socket address */
735     struct addrinfo hints;
736
737     /* setup criteria for selecting the socket address */
738     memset(&hints, 0, sizeof(hints));
739     hints.ai_family = AF_UNSPEC;
740     hints.ai_socktype = SOCK_STREAM;
741
742     /* get socket addresses */
743     int ret = getaddrinfo(pdb->node, pdb->service, &hints, &result);
744     if (ret != 0) {
745       OVS_ERROR("getaddrinfo(): %s", gai_strerror(ret));
746       return;
747     }
748   }
749   /* try to connect to the server */
750   for (struct addrinfo *rp = result; rp != NULL; rp = rp->ai_next) {
751     char errbuff[OVS_ERROR_BUFF_SIZE];
752     int sock = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
753     if (sock < 0) {
754       sstrerror(errno, errbuff, sizeof(errbuff));
755       OVS_DEBUG("socket(): %s", errbuff);
756       continue;
757     }
758     if (connect(sock, rp->ai_addr, rp->ai_addrlen) < 0) {
759       close(sock);
760       sstrerror(errno, errbuff, sizeof(errbuff));
761       OVS_DEBUG("connect(): %s [family=%d]", errbuff, rp->ai_family);
762     } else {
763       /* send notification to event thread */
764       ovs_db_event_post(pdb, OVS_DB_EVENT_CONN_ESTABLISHED);
765       pdb->sock = sock;
766       break;
767     }
768   }
769
770   if (pdb->sock < 0)
771     OVS_ERROR("connect to \"%s\" failed", node_info);
772
773   freeaddrinfo(result);
774 }
775
776 /* POLL worker thread.
777  * It listens on OVS DB connection for incoming
778  * requests/reply/events etc. Also, it reconnects to OVS DB
779  * if connection has been lost.
780  */
781 static void *ovs_poll_worker(void *arg) {
782   ovs_db_t *pdb = (ovs_db_t *)arg; /* pointer to OVS DB */
783   ovs_json_reader_t *jreader = NULL;
784   struct pollfd poll_fd = {
785       .fd = pdb->sock, .events = POLLIN | POLLPRI, .revents = 0,
786   };
787
788   /* create JSON reader instance */
789   if ((jreader = ovs_json_reader_alloc()) == NULL) {
790     OVS_ERROR("initialize json reader failed");
791     return (NULL);
792   }
793
794   /* poll data */
795   while (ovs_db_poll_is_running(pdb)) {
796     char errbuff[OVS_ERROR_BUFF_SIZE];
797     poll_fd.fd = pdb->sock;
798     int poll_ret = poll(&poll_fd, 1, /* ms */ OVS_DB_POLL_TIMEOUT * 1000);
799     if (poll_ret < 0) {
800       sstrerror(errno, errbuff, sizeof(errbuff));
801       OVS_ERROR("poll(): %s", errbuff);
802       break;
803     } else if (poll_ret == 0) {
804       OVS_DEBUG("poll(): timeout");
805       if (pdb->sock < 0)
806         /* invalid fd, so try to reconnect */
807         ovs_db_reconnect(pdb);
808       continue;
809     }
810     if (poll_fd.revents & POLLNVAL) {
811       /* invalid file descriptor, clean-up */
812       ovs_db_callback_remove_all(pdb);
813       ovs_json_reader_reset(jreader);
814       /* setting poll FD to -1 tells poll() call to ignore this FD.
815        * In that case poll() call will return timeout all the time */
816       pdb->sock = (-1);
817     } else if ((poll_fd.revents & POLLERR) || (poll_fd.revents & POLLHUP)) {
818       /* connection is broken */
819       close(poll_fd.fd);
820       ovs_db_event_post(pdb, OVS_DB_EVENT_CONN_TERMINATED);
821       OVS_ERROR("poll() peer closed its end of the channel");
822     } else if ((poll_fd.revents & POLLIN) || (poll_fd.revents & POLLPRI)) {
823       /* read incoming data */
824       char buff[OVS_DB_POLL_READ_BLOCK_SIZE];
825       ssize_t nbytes = recv(poll_fd.fd, buff, sizeof(buff), 0);
826       if (nbytes < 0) {
827         sstrerror(errno, errbuff, sizeof(errbuff));
828         OVS_ERROR("recv(): %s", errbuff);
829         /* read error? Try to reconnect */
830         close(poll_fd.fd);
831         continue;
832       } else if (nbytes == 0) {
833         close(poll_fd.fd);
834         ovs_db_event_post(pdb, OVS_DB_EVENT_CONN_TERMINATED);
835         OVS_ERROR("recv() peer has performed an orderly shutdown");
836         continue;
837       }
838       /* read incoming data */
839       size_t json_len = 0;
840       const char *json = NULL;
841       OVS_DEBUG("recv(): received %zd bytes of data", nbytes);
842       ovs_json_reader_push_data(jreader, buff, nbytes);
843       while (!ovs_json_reader_pop(jreader, &json, &json_len))
844         /* process JSON data */
845         ovs_db_json_data_process(pdb, json, json_len);
846     }
847   }
848
849   OVS_DEBUG("poll thread has been completed");
850   ovs_json_reader_free(jreader);
851   return (NULL);
852 }
853
854 /* EVENT worker thread.
855  * Perform task based on incoming events. This
856  * task can be done asynchronously which allows to
857  * handle OVS DB callback like 'init_cb'.
858  */
859 static void *ovs_event_worker(void *arg) {
860   ovs_db_t *pdb = (ovs_db_t *)arg;
861
862   while (pdb->event_thread.value != OVS_DB_EVENT_TERMINATE) {
863     /* wait for an event */
864     struct timespec ts;
865     clock_gettime(CLOCK_REALTIME, &ts);
866     ts.tv_sec += (OVS_DB_EVENT_TIMEOUT);
867     int ret = pthread_cond_timedwait(&pdb->event_thread.cond,
868                                      &pdb->event_thread.mutex, &ts);
869     if (!ret) {
870       /* handle the event */
871       OVS_DEBUG("handle event %d", pdb->event_thread.value);
872       switch (pdb->event_thread.value) {
873       case OVS_DB_EVENT_CONN_ESTABLISHED:
874         if (pdb->cb.post_conn_init)
875           pdb->cb.post_conn_init(pdb);
876         break;
877       case OVS_DB_EVENT_CONN_TERMINATED:
878         if (pdb->cb.post_conn_terminate)
879           pdb->cb.post_conn_terminate();
880         break;
881       default:
882         OVS_DEBUG("unknown event received");
883         break;
884       }
885     } else if (ret == ETIMEDOUT) {
886       /* wait timeout */
887       OVS_DEBUG("no event received (timeout)");
888       continue;
889     } else {
890       /* unexpected error */
891       OVS_ERROR("pthread_cond_timedwait() failed");
892       break;
893     }
894   }
895
896   OVS_DEBUG("event thread has been completed");
897   return (NULL);
898 }
899
900 /* Initialize EVENT thread */
901 static int ovs_db_event_thread_init(ovs_db_t *pdb) {
902   pdb->event_thread.tid = (pthread_t)-1;
903   /* init event thread condition variable */
904   if (pthread_cond_init(&pdb->event_thread.cond, NULL)) {
905     return (-1);
906   }
907   /* init event thread mutex */
908   if (pthread_mutex_init(&pdb->event_thread.mutex, NULL)) {
909     pthread_cond_destroy(&pdb->event_thread.cond);
910     return (-1);
911   }
912   /* Hold the event thread mutex. It ensures that no events
913    * will be lost while thread is still starting. Once event
914    * thread is started and ready to accept events, it will release
915    * the mutex */
916   if (pthread_mutex_lock(&pdb->event_thread.mutex)) {
917     pthread_mutex_destroy(&pdb->event_thread.mutex);
918     pthread_cond_destroy(&pdb->event_thread.cond);
919     return (-1);
920   }
921   /* start event thread */
922   pthread_t tid;
923   if (plugin_thread_create(&tid, NULL, ovs_event_worker, pdb,
924                            "utils_ovs:event") != 0) {
925     pthread_mutex_unlock(&pdb->event_thread.mutex);
926     pthread_mutex_destroy(&pdb->event_thread.mutex);
927     pthread_cond_destroy(&pdb->event_thread.cond);
928     return (-1);
929   }
930   pdb->event_thread.tid = tid;
931   return (0);
932 }
933
934 /* Destroy EVENT thread */
935 static int ovs_db_event_thread_destroy(ovs_db_t *pdb) {
936   if (pdb->event_thread.tid == (pthread_t)-1)
937     /* already destroyed */
938     return (0);
939   ovs_db_event_post(pdb, OVS_DB_EVENT_TERMINATE);
940   if (pthread_join(pdb->event_thread.tid, NULL) != 0)
941     return (-1);
942   /* Event thread always holds the thread mutex when
943    * performs some task (handles event) and releases it when
944    * while sleeping. Thus, if event thread exits, the mutex
945    * remains locked */
946   pthread_mutex_unlock(&pdb->event_thread.mutex);
947   pthread_mutex_destroy(&pdb->event_thread.mutex);
948   pthread_cond_destroy(&pdb->event_thread.cond);
949   pdb->event_thread.tid = (pthread_t)-1;
950   return (0);
951 }
952
953 /* Initialize POLL thread */
954 static int ovs_db_poll_thread_init(ovs_db_t *pdb) {
955   pdb->poll_thread.tid = (pthread_t)-1;
956   /* init event thread mutex */
957   if (pthread_mutex_init(&pdb->poll_thread.mutex, NULL)) {
958     return (-1);
959   }
960   /* start poll thread */
961   pthread_t tid;
962   pdb->poll_thread.state = OVS_DB_POLL_STATE_RUNNING;
963   if (plugin_thread_create(&tid, NULL, ovs_poll_worker, pdb,
964                            "utils_ovs:poll") != 0) {
965     pthread_mutex_destroy(&pdb->poll_thread.mutex);
966     return (-1);
967   }
968   pdb->poll_thread.tid = tid;
969   return (0);
970 }
971
972 /* Destroy POLL thread */
973 static int ovs_db_poll_thread_destroy(ovs_db_t *pdb) {
974   if (pdb->poll_thread.tid == (pthread_t)-1)
975     /* already destroyed */
976     return (0);
977   /* change thread state */
978   pthread_mutex_lock(&pdb->poll_thread.mutex);
979   pdb->poll_thread.state = OVS_DB_POLL_STATE_EXITING;
980   pthread_mutex_unlock(&pdb->poll_thread.mutex);
981   /* join the thread */
982   if (pthread_join(pdb->poll_thread.tid, NULL) != 0)
983     return (-1);
984   pthread_mutex_destroy(&pdb->poll_thread.mutex);
985   pdb->poll_thread.tid = (pthread_t)-1;
986   return (0);
987 }
988
989 /*
990  * Public OVS DB API implementation
991  */
992
993 ovs_db_t *ovs_db_init(const char *node, const char *service,
994                       const char *unix_path, ovs_db_callback_t *cb) {
995   /* sanity check */
996   if (node == NULL || service == NULL || unix_path == NULL)
997     return (NULL);
998
999   /* allocate db data & fill it */
1000   ovs_db_t *pdb = pdb = calloc(1, sizeof(*pdb));
1001   if (pdb == NULL)
1002     return (NULL);
1003
1004   /* store the OVS DB address */
1005   sstrncpy(pdb->node, node, sizeof(pdb->node));
1006   sstrncpy(pdb->service, service, sizeof(pdb->service));
1007   sstrncpy(pdb->unix_path, unix_path, sizeof(pdb->unix_path));
1008
1009   /* setup OVS DB callbacks */
1010   if (cb)
1011     pdb->cb = *cb;
1012
1013   /* init OVS DB mutex attributes */
1014   pthread_mutexattr_t mutex_attr;
1015   if (pthread_mutexattr_init(&mutex_attr)) {
1016     OVS_ERROR("OVS DB mutex attribute init failed");
1017     sfree(pdb);
1018     return (NULL);
1019   }
1020   /* set OVS DB mutex as recursive */
1021   if (pthread_mutexattr_settype(&mutex_attr, PTHREAD_MUTEX_RECURSIVE)) {
1022     OVS_ERROR("Failed to set OVS DB mutex as recursive");
1023     pthread_mutexattr_destroy(&mutex_attr);
1024     sfree(pdb);
1025     return (NULL);
1026   }
1027   /* init OVS DB mutex */
1028   if (pthread_mutex_init(&pdb->mutex, &mutex_attr)) {
1029     OVS_ERROR("OVS DB mutex init failed");
1030     pthread_mutexattr_destroy(&mutex_attr);
1031     sfree(pdb);
1032     return (NULL);
1033   }
1034   /* destroy mutex attributes */
1035   pthread_mutexattr_destroy(&mutex_attr);
1036
1037   /* init event thread */
1038   if (ovs_db_event_thread_init(pdb) < 0) {
1039     ovs_db_destroy(pdb);
1040     return (NULL);
1041   }
1042
1043   /* init polling thread */
1044   pdb->sock = -1;
1045   if (ovs_db_poll_thread_init(pdb) < 0) {
1046     ovs_db_destroy(pdb);
1047     return (NULL);
1048   }
1049   return pdb;
1050 }
1051
1052 int ovs_db_send_request(ovs_db_t *pdb, const char *method, const char *params,
1053                         ovs_db_result_cb_t cb) {
1054   int ret = 0;
1055   yajl_gen_status yajl_gen_ret;
1056   yajl_val jparams;
1057   yajl_gen jgen;
1058   ovs_callback_t *new_cb = NULL;
1059   uint64_t uid;
1060   char uid_buff[OVS_UID_STR_SIZE];
1061   const char *req = NULL;
1062   size_t req_len = 0;
1063   struct timespec ts;
1064
1065   /* sanity check */
1066   if (!pdb || !method || !params)
1067     return (-1);
1068
1069   if ((jgen = yajl_gen_alloc(NULL)) == NULL)
1070     return (-1);
1071
1072   /* try to parse params */
1073   if ((jparams = yajl_tree_parse(params, NULL, 0)) == NULL) {
1074     OVS_ERROR("params is not a JSON string");
1075     yajl_gen_clear(jgen);
1076     return (-1);
1077   }
1078
1079   /* generate method field */
1080   OVS_YAJL_CALL(yajl_gen_map_open, jgen);
1081
1082   OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "method");
1083   OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, method);
1084
1085   /* generate params field */
1086   OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "params");
1087   OVS_YAJL_CALL(ovs_yajl_gen_val, jgen, jparams);
1088   yajl_tree_free(jparams);
1089
1090   /* generate id field */
1091   OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "id");
1092   uid = ovs_uid_generate();
1093   ssnprintf(uid_buff, sizeof(uid_buff), "%" PRIX64, uid);
1094   OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, uid_buff);
1095
1096   OVS_YAJL_CALL(yajl_gen_map_close, jgen);
1097
1098   if (cb) {
1099     /* register result callback */
1100     if ((new_cb = calloc(1, sizeof(ovs_callback_t))) == NULL)
1101       goto yajl_gen_failure;
1102
1103     /* add new callback to front */
1104     sem_init(&new_cb->result.sync, 0, 0);
1105     new_cb->result.call = cb;
1106     new_cb->uid = uid;
1107     ovs_db_callback_add(pdb, new_cb);
1108   }
1109
1110   /* send the request */
1111   OVS_YAJL_CALL(yajl_gen_get_buf, jgen, (const unsigned char **)&req, &req_len);
1112   OVS_DEBUG("%s", req);
1113   if (!ovs_db_data_send(pdb, req, req_len)) {
1114     if (cb) {
1115       /* wait for result */
1116       clock_gettime(CLOCK_REALTIME, &ts);
1117       ts.tv_sec += OVS_DB_SEND_REQ_TIMEOUT;
1118       if (sem_timedwait(&new_cb->result.sync, &ts) < 0) {
1119         OVS_ERROR("%s() no replay received within %d sec", __FUNCTION__,
1120                   OVS_DB_SEND_REQ_TIMEOUT);
1121         ret = (-1);
1122       }
1123     }
1124   } else {
1125     OVS_ERROR("ovs_db_data_send() failed");
1126     ret = (-1);
1127   }
1128
1129 yajl_gen_failure:
1130   if (new_cb) {
1131     /* destroy callback */
1132     sem_destroy(&new_cb->result.sync);
1133     ovs_db_callback_remove(pdb, new_cb);
1134   }
1135
1136   /* release memory */
1137   yajl_gen_clear(jgen);
1138   return (yajl_gen_ret != yajl_gen_status_ok) ? (-1) : ret;
1139 }
1140
1141 int ovs_db_table_cb_register(ovs_db_t *pdb, const char *tb_name,
1142                              const char **tb_column,
1143                              ovs_db_table_cb_t update_cb,
1144                              ovs_db_result_cb_t result_cb, unsigned int flags) {
1145   yajl_gen jgen;
1146   yajl_gen_status yajl_gen_ret;
1147   ovs_callback_t *new_cb = NULL;
1148   char uid_str[OVS_UID_STR_SIZE];
1149   char *params;
1150   size_t params_len;
1151   int ovs_db_ret = 0;
1152
1153   /* sanity check */
1154   if (pdb == NULL || tb_name == NULL || update_cb == NULL)
1155     return (-1);
1156
1157   /* allocate new update callback */
1158   if ((new_cb = calloc(1, sizeof(ovs_callback_t))) == NULL)
1159     return (-1);
1160
1161   /* init YAJL generator */
1162   if ((jgen = yajl_gen_alloc(NULL)) == NULL) {
1163     sfree(new_cb);
1164     return (-1);
1165   }
1166
1167   /* add new callback to front */
1168   new_cb->table.call = update_cb;
1169   new_cb->uid = ovs_uid_generate();
1170   ovs_db_callback_add(pdb, new_cb);
1171
1172   /* make update notification request
1173    * [<db-name>, <json-value>, <monitor-requests>] */
1174   OVS_YAJL_CALL(yajl_gen_array_open, jgen);
1175   {
1176     OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, OVS_DB_DEFAULT_DB_NAME);
1177
1178     /* uid string <json-value> */
1179     ssnprintf(uid_str, sizeof(uid_str), "%" PRIX64, new_cb->uid);
1180     OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, uid_str);
1181
1182     /* <monitor-requests> */
1183     OVS_YAJL_CALL(yajl_gen_map_open, jgen);
1184     {
1185       OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, tb_name);
1186       OVS_YAJL_CALL(yajl_gen_array_open, jgen);
1187       {
1188         /* <monitor-request> */
1189         OVS_YAJL_CALL(yajl_gen_map_open, jgen);
1190         {
1191           if (tb_column) {
1192             /* columns within the table to be monitored */
1193             OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "columns");
1194             OVS_YAJL_CALL(yajl_gen_array_open, jgen);
1195             for (; *tb_column; tb_column++)
1196               OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, *tb_column);
1197             OVS_YAJL_CALL(yajl_gen_array_close, jgen);
1198           }
1199           /* specify select option */
1200           OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "select");
1201           {
1202             OVS_YAJL_CALL(yajl_gen_map_open, jgen);
1203             {
1204               OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "initial");
1205               OVS_YAJL_CALL(yajl_gen_bool, jgen,
1206                             flags & OVS_DB_TABLE_CB_FLAG_INITIAL);
1207               OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "insert");
1208               OVS_YAJL_CALL(yajl_gen_bool, jgen,
1209                             flags & OVS_DB_TABLE_CB_FLAG_INSERT);
1210               OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "delete");
1211               OVS_YAJL_CALL(yajl_gen_bool, jgen,
1212                             flags & OVS_DB_TABLE_CB_FLAG_DELETE);
1213               OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "modify");
1214               OVS_YAJL_CALL(yajl_gen_bool, jgen,
1215                             flags & OVS_DB_TABLE_CB_FLAG_MODIFY);
1216             }
1217             OVS_YAJL_CALL(yajl_gen_map_close, jgen);
1218           }
1219         }
1220         OVS_YAJL_CALL(yajl_gen_map_close, jgen);
1221       }
1222       OVS_YAJL_CALL(yajl_gen_array_close, jgen);
1223     }
1224     OVS_YAJL_CALL(yajl_gen_map_close, jgen);
1225   }
1226   OVS_YAJL_CALL(yajl_gen_array_close, jgen);
1227
1228   /* make a request to subscribe to given table */
1229   OVS_YAJL_CALL(yajl_gen_get_buf, jgen, (const unsigned char **)&params,
1230                 &params_len);
1231   if (ovs_db_send_request(pdb, "monitor", params, result_cb) < 0) {
1232     OVS_ERROR("Failed to subscribe to \"%s\" table", tb_name);
1233     ovs_db_ret = (-1);
1234   }
1235
1236 yajl_gen_failure:
1237   /* release memory */
1238   yajl_gen_clear(jgen);
1239   return ovs_db_ret;
1240 }
1241
1242 int ovs_db_destroy(ovs_db_t *pdb) {
1243   int ovs_db_ret = 0;
1244   int ret = 0;
1245
1246   /* sanity check */
1247   if (pdb == NULL)
1248     return (-1);
1249
1250   /* try to lock the structure before releasing */
1251   if ((ret = pthread_mutex_lock(&pdb->mutex))) {
1252     OVS_ERROR("pthread_mutex_lock() DB mutex lock failed (%d)", ret);
1253     return (-1);
1254   }
1255
1256   /* stop poll thread */
1257   if (ovs_db_event_thread_destroy(pdb) < 0) {
1258     OVS_ERROR("destroy poll thread failed");
1259     ovs_db_ret = (-1);
1260   }
1261
1262   /* stop event thread */
1263   if (ovs_db_poll_thread_destroy(pdb) < 0) {
1264     OVS_ERROR("stop event thread failed");
1265     ovs_db_ret = (-1);
1266   }
1267
1268   /* unsubscribe callbacks */
1269   ovs_db_callback_remove_all(pdb);
1270
1271   /* close connection */
1272   if (pdb->sock >= 0)
1273     close(pdb->sock);
1274
1275   /* release DB handler */
1276   pthread_mutex_unlock(&pdb->mutex);
1277   pthread_mutex_destroy(&pdb->mutex);
1278   sfree(pdb);
1279   return ovs_db_ret;
1280 }
1281
1282 /*
1283  * Public OVS utils API implementation
1284  */
1285
1286 /* Get YAJL value by key from YAJL dictionary
1287  *
1288  * EXAMPLE:
1289  *  {
1290  *    "key_a" : <YAJL return value>
1291  *    "key_b" : <YAJL return value>
1292  *  }
1293  */
1294 yajl_val ovs_utils_get_value_by_key(yajl_val jval, const char *key) {
1295   const char *obj_key = NULL;
1296
1297   /* check params */
1298   if (!YAJL_IS_OBJECT(jval) || (key == NULL))
1299     return NULL;
1300
1301   /* find a value by key */
1302   for (size_t i = 0; i < YAJL_GET_OBJECT(jval)->len; i++) {
1303     obj_key = YAJL_GET_OBJECT(jval)->keys[i];
1304     if (strcmp(obj_key, key) == 0)
1305       return YAJL_GET_OBJECT(jval)->values[i];
1306   }
1307
1308   return NULL;
1309 }
1310
1311 /* Get OVS DB map value by given map key
1312  *
1313  * FROM RFC7047:
1314  *
1315  *   <pair>
1316  *     A 2-element JSON array that represents a pair within a database
1317  *     map.  The first element is an <atom> that represents the key, and
1318  *     the second element is an <atom> that represents the value.
1319  *
1320  *   <map>
1321  *     A 2-element JSON array that represents a database map value.  The
1322  *     first element of the array must be the string "map", and the
1323  *     second element must be an array of zero or more <pair>s giving the
1324  *     values in the map.  All of the <pair>s must have the same key and
1325  *     value types.
1326  *
1327  * EXAMPLE:
1328  *  [
1329  *    "map", [
1330  *             [ "key_a", <YAJL value>], [ "key_b", <YAJL value>], ...
1331  *           ]
1332  *  ]
1333  */
1334 yajl_val ovs_utils_get_map_value(yajl_val jval, const char *key) {
1335   size_t map_len = 0;
1336   size_t array_len = 0;
1337   yajl_val *map_values = NULL;
1338   yajl_val *array_values = NULL;
1339   const char *str_val = NULL;
1340
1341   /* check YAJL array */
1342   if (!YAJL_IS_ARRAY(jval) || (key == NULL))
1343     return NULL;
1344
1345   /* check a database map value (2-element, first one should be a string */
1346   array_len = YAJL_GET_ARRAY(jval)->len;
1347   array_values = YAJL_GET_ARRAY(jval)->values;
1348   if ((array_len != 2) || (!YAJL_IS_STRING(array_values[0])) ||
1349       (!YAJL_IS_ARRAY(array_values[1])))
1350     return NULL;
1351
1352   /* check first element of the array */
1353   str_val = YAJL_GET_STRING(array_values[0]);
1354   if (strcmp("map", str_val) != 0)
1355     return NULL;
1356
1357   /* try to find map value by map key */
1358   map_len = YAJL_GET_ARRAY(array_values[1])->len;
1359   map_values = YAJL_GET_ARRAY(array_values[1])->values;
1360   for (size_t i = 0; i < map_len; i++) {
1361     /* check YAJL array */
1362     if (!YAJL_IS_ARRAY(map_values[i]))
1363       break;
1364
1365     /* check a database pair value (2-element, first one represents a key
1366      * and it should be a string in our case */
1367     array_len = YAJL_GET_ARRAY(map_values[i])->len;
1368     array_values = YAJL_GET_ARRAY(map_values[i])->values;
1369     if ((array_len != 2) || (!YAJL_IS_STRING(array_values[0])))
1370       break;
1371
1372     /* return map value if given key equals map key */
1373     str_val = YAJL_GET_STRING(array_values[0]);
1374     if (strcmp(key, str_val) == 0)
1375       return array_values[1];
1376   }
1377   return NULL;
1378 }