Merge pull request #2287 from BrandonArp/fix_am_1_11
[collectd.git] / src / utils_ovs.c
1 /**
2  * collectd - src/utils_ovs.c
3  *
4  * Copyright(c) 2016 Intel Corporation. All rights reserved.
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining a copy of
7  * this software and associated documentation files (the "Software"), to deal in
8  * the Software without restriction, including without limitation the rights to
9  * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
10  * of the Software, and to permit persons to whom the Software is furnished to do
11  * so, subject to the following conditions:
12  *
13  * The above copyright notice and this permission notice shall be included in all
14  * copies or substantial portions of the Software.
15  *
16  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22  * SOFTWARE.
23  *
24  * Authors:
25  *   Volodymyr Mytnyk <volodymyrx.mytnyk@intel.com>
26  **/
27
28 /* clang-format off */
29 /*
30  *                         OVS DB API internal architecture diagram
31  * +------------------------------------------------------------------------------+
32  * |OVS plugin      |OVS utils                                                    |
33  * |                |     +------------------------+                              |
34  * |                |     |      echo handler      |                JSON request/ |
35  * |                |  +--+ (ovs_db_table_echo_cb) +<---+---------+ update event/ |
36  * |                |  |  |                        |    |         | result        |
37  * |                |  |  +------------------------+    |         |               |
38  * |                |  |                                |    +----+---+--------+  |
39  * |  +----------+  |  |  +------------------------+    |    |        |        |  |
40  * |  |  update  |  |  |  |     update handler     |    |    |  YAJL  |  JSON  |  |
41  * |  | callback +<-------+(ovs_db_table_update_cp)+<---+    | parser | reader |  |
42  * |  +----------+  |  |  |                        |    |    |        |        |  |
43  * |                |  |  +------------------------+    |    +--------+---+----+  |
44  * |                |  |                                |                 ^       |
45  * |  +----------+  |  |  +------------------------+    |                 |       |
46  * |  |  result  |  |  |  |     result handler     |    |                 |       |
47  * |  | callback +<-------+   (ovs_db_result_cb)   +<---+        JSON raw |       |
48  * |  +----------+  |  |  |                        |               data   |       |
49  * |                |  |  +------------------------+                      |       |
50  * |                |  |                                                  |       |
51  * |                |  |    +------------------+             +------------+----+  |
52  * |  +----------+  |  |    |thread|           |             |thread|          |  |
53  * |  |   init   |  |  |    |                  |  reconnect  |                 |  |
54  * |  | callback +<---------+   EVENT WORKER   +<------------+   POLL WORKER   |  |
55  * |  +----------+  |  |    +------------------+             +--------+--------+  |
56  * |                |  |                                              ^           |
57  * +----------------+-------------------------------------------------------------+
58  *                     |                                              |
59  *                 JSON|echo reply                                 raw|data
60  *                     v                                              v
61  * +-------------------+----------------------------------------------+-----------+
62  * |                                 TCP/UNIX socket                              |
63  * +-------------------------------------------------------------------------------
64  */
65 /* clang-format on */
66
67 /* collectd headers */
68 #include "collectd.h"
69
70 #include "common.h"
71
72 /* private headers */
73 #include "utils_ovs.h"
74
75 /* system libraries */
76 #if HAVE_NETDB_H
77 #include <netdb.h>
78 #endif
79 #if HAVE_ARPA_INET_H
80 #include <arpa/inet.h>
81 #endif
82 #if HAVE_POLL_H
83 #include <poll.h>
84 #endif
85 #if HAVE_SYS_UN_H
86 #include <sys/un.h>
87 #endif
88
89 #include <semaphore.h>
90
91 #define OVS_ERROR(fmt, ...)                                                    \
92   do {                                                                         \
93     ERROR("ovs_utils: " fmt, ##__VA_ARGS__);                                   \
94   } while (0)
95 #define OVS_DEBUG(fmt, ...)                                                    \
96   do {                                                                         \
97     DEBUG("%s:%d:%s(): " fmt, __FILE__, __LINE__, __FUNCTION__,                \
98           ##__VA_ARGS__);                                                      \
99   } while (0)
100
101 #define OVS_DB_POLL_TIMEOUT 1           /* poll receive timeout (sec) */
102 #define OVS_DB_POLL_READ_BLOCK_SIZE 512 /* read block size (bytes) */
103 #define OVS_DB_DEFAULT_DB_NAME "Open_vSwitch"
104
105 #define OVS_DB_EVENT_TIMEOUT 5 /* event thread timeout (sec) */
106 #define OVS_DB_EVENT_TERMINATE 1
107 #define OVS_DB_EVENT_CONN_ESTABLISHED 2
108 #define OVS_DB_EVENT_CONN_TERMINATED 3
109
110 #define OVS_DB_POLL_STATE_RUNNING 1
111 #define OVS_DB_POLL_STATE_EXITING 2
112
113 #define OVS_DB_SEND_REQ_TIMEOUT 5 /* send request timeout (sec) */
114
115 #define OVS_YAJL_CALL(func, ...)                                               \
116   do {                                                                         \
117     yajl_gen_ret = yajl_gen_status_ok;                                         \
118     if ((yajl_gen_ret = func(__VA_ARGS__)) != yajl_gen_status_ok)              \
119       goto yajl_gen_failure;                                                   \
120   } while (0)
121 #define OVS_YAJL_ERROR_BUFFER_SIZE 1024
122 #define OVS_ERROR_BUFF_SIZE 512
123 #define OVS_UID_STR_SIZE 17 /* 64-bit HEX string len + '\0' */
124
125 /* JSON reader internal data */
126 struct ovs_json_reader_s {
127   char *buff_ptr;
128   size_t buff_size;
129   size_t buff_offset;
130   size_t json_offset;
131 };
132 typedef struct ovs_json_reader_s ovs_json_reader_t;
133
134 /* Result callback declaration */
135 struct ovs_result_cb_s {
136   sem_t sync;
137   ovs_db_result_cb_t call;
138 };
139 typedef struct ovs_result_cb_s ovs_result_cb_t;
140
141 /* Table callback declaration */
142 struct ovs_table_cb_s {
143   ovs_db_table_cb_t call;
144 };
145 typedef struct ovs_table_cb_s ovs_table_cb_t;
146
147 /* Callback declaration */
148 struct ovs_callback_s {
149   uint64_t uid;
150   union {
151     ovs_result_cb_t result;
152     ovs_table_cb_t table;
153   };
154   struct ovs_callback_s *next;
155   struct ovs_callback_s *prev;
156 };
157 typedef struct ovs_callback_s ovs_callback_t;
158
159 /* Event thread data declaration */
160 struct ovs_event_thread_s {
161   pthread_t tid;
162   pthread_mutex_t mutex;
163   pthread_cond_t cond;
164   int value;
165 };
166 typedef struct ovs_event_thread_s ovs_event_thread_t;
167
168 /* Poll thread data declaration */
169 struct ovs_poll_thread_s {
170   pthread_t tid;
171   pthread_mutex_t mutex;
172   int state;
173 };
174 typedef struct ovs_poll_thread_s ovs_poll_thread_t;
175
176 /* OVS DB internal data declaration */
177 struct ovs_db_s {
178   ovs_poll_thread_t poll_thread;
179   ovs_event_thread_t event_thread;
180   pthread_mutex_t mutex;
181   ovs_callback_t *remote_cb;
182   ovs_db_callback_t cb;
183   char service[OVS_DB_ADDR_SERVICE_SIZE];
184   char node[OVS_DB_ADDR_NODE_SIZE];
185   char unix_path[OVS_DB_ADDR_NODE_SIZE];
186   int sock;
187 };
188
189 /* Global variables */
190 static uint64_t ovs_uid = 0;
191 static pthread_mutex_t ovs_uid_mutex = PTHREAD_MUTEX_INITIALIZER;
192
193 /* Post an event to event thread.
194  * Possible events are:
195  *  OVS_DB_EVENT_TERMINATE
196  *  OVS_DB_EVENT_CONN_ESTABLISHED
197  *  OVS_DB_EVENT_CONN_TERMINATED
198  */
199 static void ovs_db_event_post(ovs_db_t *pdb, int event) {
200   pthread_mutex_lock(&pdb->event_thread.mutex);
201   pdb->event_thread.value = event;
202   pthread_mutex_unlock(&pdb->event_thread.mutex);
203   pthread_cond_signal(&pdb->event_thread.cond);
204 }
205
206 /* Check if POLL thread is still running. Returns
207  * 1 if running otherwise 0 is returned */
208 static _Bool ovs_db_poll_is_running(ovs_db_t *pdb) {
209   int state = 0;
210   pthread_mutex_lock(&pdb->poll_thread.mutex);
211   state = pdb->poll_thread.state;
212   pthread_mutex_unlock(&pdb->poll_thread.mutex);
213   return state == OVS_DB_POLL_STATE_RUNNING;
214 }
215
216 /* Generate unique identifier (UID). It is used by OVS DB API
217  * to set "id" field for any OVS DB JSON request. */
218 static uint64_t ovs_uid_generate() {
219   uint64_t new_uid;
220   pthread_mutex_lock(&ovs_uid_mutex);
221   new_uid = ++ovs_uid;
222   pthread_mutex_unlock(&ovs_uid_mutex);
223   return new_uid;
224 }
225
226 /*
227  * Callback API. These function are used to store
228  * registered callbacks in OVS DB API.
229  */
230
231 /* Add new callback into OVS DB object */
232 static void ovs_db_callback_add(ovs_db_t *pdb, ovs_callback_t *new_cb) {
233   pthread_mutex_lock(&pdb->mutex);
234   if (pdb->remote_cb)
235     pdb->remote_cb->prev = new_cb;
236   new_cb->next = pdb->remote_cb;
237   new_cb->prev = NULL;
238   pdb->remote_cb = new_cb;
239   pthread_mutex_unlock(&pdb->mutex);
240 }
241
242 /* Remove callback from OVS DB object */
243 static void ovs_db_callback_remove(ovs_db_t *pdb, ovs_callback_t *del_cb) {
244   pthread_mutex_lock(&pdb->mutex);
245   ovs_callback_t *pre_cb = del_cb->prev;
246   ovs_callback_t *next_cb = del_cb->next;
247
248   if (next_cb)
249     next_cb->prev = del_cb->prev;
250
251   if (pre_cb)
252     pre_cb->next = del_cb->next;
253   else
254     pdb->remote_cb = del_cb->next;
255
256   free(del_cb);
257   pthread_mutex_unlock(&pdb->mutex);
258 }
259
260 /* Remove all callbacks form OVS DB object */
261 static void ovs_db_callback_remove_all(ovs_db_t *pdb) {
262   pthread_mutex_lock(&pdb->mutex);
263   for (ovs_callback_t *del_cb = pdb->remote_cb; pdb->remote_cb;
264        del_cb = pdb->remote_cb) {
265     pdb->remote_cb = del_cb->next;
266     free(del_cb);
267   }
268   pdb->remote_cb = NULL;
269   pthread_mutex_unlock(&pdb->mutex);
270 }
271
272 /* Get/find callback in OVS DB object by UID. Returns pointer
273  * to requested callback otherwise NULL is returned.
274  *
275  * IMPORTANT NOTE:
276  *   The OVS DB mutex MUST be locked by the caller
277  *   to make sure that returned callback is still valid.
278  */
279 static ovs_callback_t *ovs_db_callback_get(ovs_db_t *pdb, uint64_t uid) {
280   for (ovs_callback_t *cb = pdb->remote_cb; cb != NULL; cb = cb->next)
281     if (cb->uid == uid)
282       return cb;
283   return NULL;
284 }
285
286 /* Send all requested data to the socket. Returns 0 if
287  * ALL request data has been sent otherwise negative value
288  * is returned */
289 static int ovs_db_data_send(const ovs_db_t *pdb, const char *data, size_t len) {
290   ssize_t nbytes = 0;
291   size_t rem = len;
292   size_t off = 0;
293
294   while (rem > 0) {
295     if ((nbytes = send(pdb->sock, data + off, rem, 0)) <= 0)
296       return -1;
297     rem -= (size_t)nbytes;
298     off += (size_t)nbytes;
299   }
300   return 0;
301 }
302
303 /*
304  * YAJL (Yet Another JSON Library) helper functions
305  * Documentation (https://lloyd.github.io/yajl/)
306  */
307
308 /* Add null-terminated string into YAJL generator handle (JSON object).
309  * Similar function to yajl_gen_string() but takes null-terminated string
310  * instead of string and its length.
311  *
312  * jgen   - YAJL generator handle allocated by yajl_gen_alloc()
313  * string - Null-terminated string
314  */
315 static yajl_gen_status ovs_yajl_gen_tstring(yajl_gen hander,
316                                             const char *string) {
317   return yajl_gen_string(hander, (const unsigned char *)string,
318                          strlen(string));
319 }
320
321 /* Add YAJL value into YAJL generator handle (JSON object)
322  *
323  * jgen - YAJL generator handle allocated by yajl_gen_alloc()
324  * jval - YAJL value usually returned by yajl_tree_get()
325  */
326 static yajl_gen_status ovs_yajl_gen_val(yajl_gen jgen, yajl_val jval) {
327   size_t array_len = 0;
328   yajl_val *jvalues = NULL;
329   yajl_val jobj_value = NULL;
330   const char *obj_key = NULL;
331   size_t obj_len = 0;
332   yajl_gen_status yajl_gen_ret = yajl_gen_status_ok;
333
334   if (jval == NULL)
335     return yajl_gen_generation_complete;
336
337   if (YAJL_IS_STRING(jval))
338     OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, YAJL_GET_STRING(jval));
339   else if (YAJL_IS_DOUBLE(jval))
340     OVS_YAJL_CALL(yajl_gen_double, jgen, YAJL_GET_DOUBLE(jval));
341   else if (YAJL_IS_INTEGER(jval))
342     OVS_YAJL_CALL(yajl_gen_double, jgen, YAJL_GET_INTEGER(jval));
343   else if (YAJL_IS_TRUE(jval))
344     OVS_YAJL_CALL(yajl_gen_bool, jgen, 1);
345   else if (YAJL_IS_FALSE(jval))
346     OVS_YAJL_CALL(yajl_gen_bool, jgen, 0);
347   else if (YAJL_IS_NULL(jval))
348     OVS_YAJL_CALL(yajl_gen_null, jgen);
349   else if (YAJL_IS_ARRAY(jval)) {
350     /* create new array and add all elements into the array */
351     array_len = YAJL_GET_ARRAY(jval)->len;
352     jvalues = YAJL_GET_ARRAY(jval)->values;
353     OVS_YAJL_CALL(yajl_gen_array_open, jgen);
354     for (size_t i = 0; i < array_len; i++)
355       OVS_YAJL_CALL(ovs_yajl_gen_val, jgen, jvalues[i]);
356     OVS_YAJL_CALL(yajl_gen_array_close, jgen);
357   } else if (YAJL_IS_OBJECT(jval)) {
358     /* create new object and add all elements into the object */
359     OVS_YAJL_CALL(yajl_gen_map_open, jgen);
360     obj_len = YAJL_GET_OBJECT(jval)->len;
361     for (size_t i = 0; i < obj_len; i++) {
362       obj_key = YAJL_GET_OBJECT(jval)->keys[i];
363       jobj_value = YAJL_GET_OBJECT(jval)->values[i];
364       OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, obj_key);
365       OVS_YAJL_CALL(ovs_yajl_gen_val, jgen, jobj_value);
366     }
367     OVS_YAJL_CALL(yajl_gen_map_close, jgen);
368   } else {
369     OVS_ERROR("%s() unsupported value type %d (skip)", __FUNCTION__,
370               (int)(jval)->type);
371     goto yajl_gen_failure;
372   }
373   return yajl_gen_status_ok;
374
375 yajl_gen_failure:
376   OVS_ERROR("%s() error to generate value", __FUNCTION__);
377   return yajl_gen_ret;
378 }
379
380 /* OVS DB echo request handler. When OVS DB sends
381  * "echo" request to the client, client should generate
382  * "echo" replay with the same content received in the
383  * request */
384 static int ovs_db_table_echo_cb(const ovs_db_t *pdb, yajl_val jnode) {
385   yajl_val jparams;
386   yajl_val jid;
387   yajl_gen jgen;
388   size_t resp_len = 0;
389   const char *resp = NULL;
390   const char *params_path[] = {"params", NULL};
391   const char *id_path[] = {"id", NULL};
392   yajl_gen_status yajl_gen_ret;
393
394   if ((jgen = yajl_gen_alloc(NULL)) == NULL)
395     return -1;
396
397   /* check & get request attributes */
398   if ((jparams = yajl_tree_get(jnode, params_path, yajl_t_array)) == NULL ||
399       ((jid = yajl_tree_get(jnode, id_path, yajl_t_any)) == NULL)) {
400     OVS_ERROR("parse echo request failed");
401     goto yajl_gen_failure;
402   }
403
404   /* generate JSON echo response */
405   OVS_YAJL_CALL(yajl_gen_map_open, jgen);
406
407   OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "result");
408   OVS_YAJL_CALL(ovs_yajl_gen_val, jgen, jparams);
409
410   OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "error");
411   OVS_YAJL_CALL(yajl_gen_null, jgen);
412
413   OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "id");
414   OVS_YAJL_CALL(ovs_yajl_gen_val, jgen, jid);
415
416   OVS_YAJL_CALL(yajl_gen_map_close, jgen);
417   OVS_YAJL_CALL(yajl_gen_get_buf, jgen, (const unsigned char **)&resp,
418                 &resp_len);
419
420   /* send the response */
421   OVS_DEBUG("response: %s", resp);
422   if (ovs_db_data_send(pdb, resp, resp_len) < 0) {
423     OVS_ERROR("send echo reply failed");
424     goto yajl_gen_failure;
425   }
426   /* clean up and return success */
427   yajl_gen_clear(jgen);
428   return 0;
429
430 yajl_gen_failure:
431   /* release memory */
432   yajl_gen_clear(jgen);
433   return -1;
434 }
435
436 /* Get OVS DB registered callback by YAJL val. The YAJL
437  * value should be YAJL string (UID). Returns NULL if
438  * callback hasn't been found. See also ovs_db_callback_get()
439  * description for addition info.
440  */
441 static ovs_callback_t *ovs_db_table_callback_get(ovs_db_t *pdb, yajl_val jid) {
442   char *endptr = NULL;
443   const char *suid = NULL;
444   uint64_t uid;
445
446   if (jid && YAJL_IS_STRING(jid)) {
447     suid = YAJL_GET_STRING(jid);
448     uid = (uint64_t)strtoul(suid, &endptr, 16);
449     if (*endptr == '\0' && uid)
450       return ovs_db_callback_get(pdb, uid);
451   }
452
453   return NULL;
454 }
455
456 /* OVS DB table update event handler.
457  * This callback is called by POLL thread if OVS DB
458  * table update callback is received from the DB
459  * server. Once registered callback found, it's called
460  * by this handler. */
461 static int ovs_db_table_update_cb(ovs_db_t *pdb, yajl_val jnode) {
462   ovs_callback_t *cb = NULL;
463   yajl_val jvalue;
464   yajl_val jparams;
465   yajl_val jtable_updates;
466   const char *params_path[] = {"params", NULL};
467   const char *id_path[] = {"id", NULL};
468
469   /* check & get request attributes */
470   if ((jparams = yajl_tree_get(jnode, params_path, yajl_t_array)) == NULL ||
471       (yajl_tree_get(jnode, id_path, yajl_t_null) == NULL)) {
472     OVS_ERROR("invalid OVS DB request received");
473     return -1;
474   }
475
476   /* check array length: [<json-value>, <table-updates>] */
477   if ((YAJL_GET_ARRAY(jparams) == NULL) ||
478       (YAJL_GET_ARRAY(jparams)->len != 2)) {
479     OVS_ERROR("invalid OVS DB request received");
480     return -1;
481   }
482
483   jvalue = YAJL_GET_ARRAY(jparams)->values[0];
484   jtable_updates = YAJL_GET_ARRAY(jparams)->values[1];
485   if ((!YAJL_IS_OBJECT(jtable_updates)) || (!YAJL_IS_STRING(jvalue))) {
486     OVS_ERROR("invalid OVS DB request id or table update received");
487     return -1;
488   }
489
490   /* find registered callback based on <json-value> */
491   pthread_mutex_lock(&pdb->mutex);
492   cb = ovs_db_table_callback_get(pdb, jvalue);
493   if (cb == NULL || cb->table.call == NULL) {
494     OVS_ERROR("No OVS DB table update callback found");
495     pthread_mutex_unlock(&pdb->mutex);
496     return -1;
497   }
498
499   /* call registered callback */
500   cb->table.call(jtable_updates);
501   pthread_mutex_unlock(&pdb->mutex);
502   return 0;
503 }
504
505 /* OVS DB result request handler.
506  * This callback is called by POLL thread if OVS DB
507  * result reply is received from the DB server.
508  * Once registered callback found, it's called
509  * by this handler. */
510 static int ovs_db_result_cb(ovs_db_t *pdb, yajl_val jnode) {
511   ovs_callback_t *cb = NULL;
512   yajl_val jresult;
513   yajl_val jerror;
514   yajl_val jid;
515   const char *result_path[] = {"result", NULL};
516   const char *error_path[] = {"error", NULL};
517   const char *id_path[] = {"id", NULL};
518
519   jresult = yajl_tree_get(jnode, result_path, yajl_t_any);
520   jerror = yajl_tree_get(jnode, error_path, yajl_t_any);
521   jid = yajl_tree_get(jnode, id_path, yajl_t_string);
522
523   /* check & get result attributes */
524   if (!jresult || !jerror || !jid)
525     return -1;
526
527   /* try to find registered callback */
528   pthread_mutex_lock(&pdb->mutex);
529   cb = ovs_db_table_callback_get(pdb, jid);
530   if (cb != NULL && cb->result.call != NULL) {
531     /* call registered callback */
532     cb->result.call(jresult, jerror);
533     /* unlock owner of the reply */
534     sem_post(&cb->result.sync);
535   }
536
537   pthread_mutex_unlock(&pdb->mutex);
538   return 0;
539 }
540
541 /* Handle JSON data (one request) and call
542  * appropriate event OVS DB handler. Currently,
543  * update callback 'ovs_db_table_update_cb' and
544  * result callback 'ovs_db_result_cb' is supported.
545  */
546 static int ovs_db_json_data_process(ovs_db_t *pdb, const char *data,
547                                     size_t len) {
548   const char *method = NULL;
549   char yajl_errbuf[OVS_YAJL_ERROR_BUFFER_SIZE];
550   const char *method_path[] = {"method", NULL};
551   const char *result_path[] = {"result", NULL};
552   char *sjson = NULL;
553   yajl_val jnode, jval;
554
555   /* duplicate the data to make null-terminated string
556    * required for yajl_tree_parse() */
557   if ((sjson = calloc(1, len + 1)) == NULL)
558     return -1;
559
560   sstrncpy(sjson, data, len + 1);
561   OVS_DEBUG("[len=%zu] %s", len, sjson);
562
563   /* parse json data */
564   jnode = yajl_tree_parse(sjson, yajl_errbuf, sizeof(yajl_errbuf));
565   if (jnode == NULL) {
566     OVS_ERROR("yajl_tree_parse() %s", yajl_errbuf);
567     sfree(sjson);
568     return -1;
569   }
570
571   /* get method name */
572   if ((jval = yajl_tree_get(jnode, method_path, yajl_t_string)) != NULL) {
573     if ((method = YAJL_GET_STRING(jval)) == NULL) {
574       yajl_tree_free(jnode);
575       sfree(sjson);
576       return -1;
577     }
578     if (strcmp("echo", method) == 0) {
579       /* echo request from the server */
580       if (ovs_db_table_echo_cb(pdb, jnode) < 0)
581         OVS_ERROR("handle echo request failed");
582     } else if (strcmp("update", method) == 0) {
583       /* update notification */
584       if (ovs_db_table_update_cb(pdb, jnode) < 0)
585         OVS_ERROR("handle update notification failed");
586     }
587   } else if ((jval = yajl_tree_get(jnode, result_path, yajl_t_any)) != NULL) {
588     /* result notification */
589     if (ovs_db_result_cb(pdb, jnode) < 0)
590       OVS_ERROR("handle result reply failed");
591   } else
592     OVS_ERROR("connot find method or result failed");
593
594   /* release memory */
595   yajl_tree_free(jnode);
596   sfree(sjson);
597   return 0;
598 }
599
600 /*
601  * JSON reader implementation.
602  *
603  * This module process raw JSON data (byte stream) and
604  * returns fully-fledged JSON data which can be processed
605  * (parsed) by YAJL later.
606  */
607
608 /* Allocate JSON reader instance */
609 static ovs_json_reader_t *ovs_json_reader_alloc() {
610   ovs_json_reader_t *jreader = NULL;
611
612   if ((jreader = calloc(sizeof(ovs_json_reader_t), 1)) == NULL)
613     return NULL;
614
615   return jreader;
616 }
617
618 /* Push raw data into into the JSON reader for processing */
619 static int ovs_json_reader_push_data(ovs_json_reader_t *jreader,
620                                      const char *data, size_t data_len) {
621   char *new_buff = NULL;
622   size_t available = jreader->buff_size - jreader->buff_offset;
623
624   /* check/update required memory space */
625   if (available < data_len) {
626     OVS_DEBUG("Reallocate buffer [size=%d, available=%d required=%d]",
627               (int)jreader->buff_size, (int)available, (int)data_len);
628
629     /* allocate new chunk of memory */
630     new_buff = realloc(jreader->buff_ptr, (jreader->buff_size + data_len));
631     if (new_buff == NULL)
632       return -1;
633
634     /* point to new allocated memory */
635     jreader->buff_ptr = new_buff;
636     jreader->buff_size += data_len;
637   }
638
639   /* store input data */
640   memcpy(jreader->buff_ptr + jreader->buff_offset, data, data_len);
641   jreader->buff_offset += data_len;
642   return 0;
643 }
644
645 /* Pop one fully-fledged JSON if already exists. Returns 0 if
646  * completed JSON already exists otherwise negative value is
647  * returned */
648 static int ovs_json_reader_pop(ovs_json_reader_t *jreader,
649                                const char **json_ptr, size_t *json_len_ptr) {
650   size_t nbraces = 0;
651   size_t json_len = 0;
652   char *json = NULL;
653
654   /* search open/close brace */
655   for (size_t i = jreader->json_offset; i < jreader->buff_offset; i++) {
656     if (jreader->buff_ptr[i] == '{') {
657       nbraces++;
658     } else if (jreader->buff_ptr[i] == '}')
659       if (nbraces)
660         if (!(--nbraces)) {
661           /* JSON data */
662           *json_ptr = jreader->buff_ptr + jreader->json_offset;
663           *json_len_ptr = json_len + 1;
664           jreader->json_offset = i + 1;
665           return 0;
666         }
667
668     /* increase JSON data length */
669     if (nbraces)
670       json_len++;
671   }
672
673   if (jreader->json_offset) {
674     if (jreader->json_offset < jreader->buff_offset) {
675       /* shift data to the beginning of the buffer
676        * and zero rest of the buffer data */
677       json = &jreader->buff_ptr[jreader->json_offset];
678       json_len = jreader->buff_offset - jreader->json_offset;
679       for (size_t i = 0; i < jreader->buff_size; i++)
680         jreader->buff_ptr[i] = ((i < json_len) ? (json[i]) : (0));
681       jreader->buff_offset = json_len;
682     } else
683       /* reset the buffer */
684       jreader->buff_offset = 0;
685
686     /* data is at the beginning of the buffer */
687     jreader->json_offset = 0;
688   }
689
690   return -1;
691 }
692
693 /* Reset JSON reader. It is useful when start processing
694  * new raw data. E.g.: in case of lost stream connection.
695  */
696 static void ovs_json_reader_reset(ovs_json_reader_t *jreader) {
697   if (jreader) {
698     jreader->buff_offset = 0;
699     jreader->json_offset = 0;
700   }
701 }
702
703 /* Release internal data allocated for JSON reader */
704 static void ovs_json_reader_free(ovs_json_reader_t *jreader) {
705   if (jreader) {
706     free(jreader->buff_ptr);
707     free(jreader);
708   }
709 }
710
711 /* Reconnect to OVS DB and call the OVS DB post connection init callback
712  * if connection has been established.
713  */
714 static void ovs_db_reconnect(ovs_db_t *pdb) {
715   const char *node_info = pdb->node;
716   struct addrinfo *result;
717
718   if (pdb->unix_path[0] != '\0') {
719     /* use UNIX socket instead of INET address */
720     node_info = pdb->unix_path;
721     result = calloc(1, sizeof(struct addrinfo));
722     struct sockaddr_un *sa_unix = calloc(1, sizeof(struct sockaddr_un));
723     if (result == NULL || sa_unix == NULL) {
724       sfree(result);
725       sfree(sa_unix);
726       return;
727     }
728     result->ai_family = AF_UNIX;
729     result->ai_socktype = SOCK_STREAM;
730     result->ai_addrlen = sizeof(*sa_unix);
731     result->ai_addr = (struct sockaddr *)sa_unix;
732     sa_unix->sun_family = result->ai_family;
733     sstrncpy(sa_unix->sun_path, pdb->unix_path, sizeof(sa_unix->sun_path));
734   } else {
735     /* inet socket address */
736     struct addrinfo hints;
737
738     /* setup criteria for selecting the socket address */
739     memset(&hints, 0, sizeof(hints));
740     hints.ai_family = AF_UNSPEC;
741     hints.ai_socktype = SOCK_STREAM;
742
743     /* get socket addresses */
744     int ret = getaddrinfo(pdb->node, pdb->service, &hints, &result);
745     if (ret != 0) {
746       OVS_ERROR("getaddrinfo(): %s", gai_strerror(ret));
747       return;
748     }
749   }
750   /* try to connect to the server */
751   for (struct addrinfo *rp = result; rp != NULL; rp = rp->ai_next) {
752     char errbuff[OVS_ERROR_BUFF_SIZE];
753     int sock = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
754     if (sock < 0) {
755       sstrerror(errno, errbuff, sizeof(errbuff));
756       OVS_DEBUG("socket(): %s", errbuff);
757       continue;
758     }
759     if (connect(sock, rp->ai_addr, rp->ai_addrlen) < 0) {
760       close(sock);
761       sstrerror(errno, errbuff, sizeof(errbuff));
762       OVS_DEBUG("connect(): %s [family=%d]", errbuff, rp->ai_family);
763     } else {
764       /* send notification to event thread */
765       ovs_db_event_post(pdb, OVS_DB_EVENT_CONN_ESTABLISHED);
766       pdb->sock = sock;
767       break;
768     }
769   }
770
771   if (pdb->sock < 0)
772     OVS_ERROR("connect to \"%s\" failed", node_info);
773
774   freeaddrinfo(result);
775 }
776
777 /* POLL worker thread.
778  * It listens on OVS DB connection for incoming
779  * requests/reply/events etc. Also, it reconnects to OVS DB
780  * if connection has been lost.
781  */
782 static void *ovs_poll_worker(void *arg) {
783   ovs_db_t *pdb = (ovs_db_t *)arg; /* pointer to OVS DB */
784   ovs_json_reader_t *jreader = NULL;
785   struct pollfd poll_fd = {
786       .fd = pdb->sock, .events = POLLIN | POLLPRI, .revents = 0,
787   };
788
789   /* create JSON reader instance */
790   if ((jreader = ovs_json_reader_alloc()) == NULL) {
791     OVS_ERROR("initialize json reader failed");
792     return NULL;
793   }
794
795   /* poll data */
796   while (ovs_db_poll_is_running(pdb)) {
797     char errbuff[OVS_ERROR_BUFF_SIZE];
798     poll_fd.fd = pdb->sock;
799     int poll_ret = poll(&poll_fd, 1, /* ms */ OVS_DB_POLL_TIMEOUT * 1000);
800     if (poll_ret < 0) {
801       sstrerror(errno, errbuff, sizeof(errbuff));
802       OVS_ERROR("poll(): %s", errbuff);
803       break;
804     } else if (poll_ret == 0) {
805       OVS_DEBUG("poll(): timeout");
806       if (pdb->sock < 0)
807         /* invalid fd, so try to reconnect */
808         ovs_db_reconnect(pdb);
809       continue;
810     }
811     if (poll_fd.revents & POLLNVAL) {
812       /* invalid file descriptor, clean-up */
813       ovs_db_callback_remove_all(pdb);
814       ovs_json_reader_reset(jreader);
815       /* setting poll FD to -1 tells poll() call to ignore this FD.
816        * In that case poll() call will return timeout all the time */
817       pdb->sock = (-1);
818     } else if ((poll_fd.revents & POLLERR) || (poll_fd.revents & POLLHUP)) {
819       /* connection is broken */
820       close(poll_fd.fd);
821       ovs_db_event_post(pdb, OVS_DB_EVENT_CONN_TERMINATED);
822       OVS_ERROR("poll() peer closed its end of the channel");
823     } else if ((poll_fd.revents & POLLIN) || (poll_fd.revents & POLLPRI)) {
824       /* read incoming data */
825       char buff[OVS_DB_POLL_READ_BLOCK_SIZE];
826       ssize_t nbytes = recv(poll_fd.fd, buff, sizeof(buff), 0);
827       if (nbytes < 0) {
828         sstrerror(errno, errbuff, sizeof(errbuff));
829         OVS_ERROR("recv(): %s", errbuff);
830         /* read error? Try to reconnect */
831         close(poll_fd.fd);
832         continue;
833       } else if (nbytes == 0) {
834         close(poll_fd.fd);
835         ovs_db_event_post(pdb, OVS_DB_EVENT_CONN_TERMINATED);
836         OVS_ERROR("recv() peer has performed an orderly shutdown");
837         continue;
838       }
839       /* read incoming data */
840       size_t json_len = 0;
841       const char *json = NULL;
842       OVS_DEBUG("recv(): received %zd bytes of data", nbytes);
843       ovs_json_reader_push_data(jreader, buff, nbytes);
844       while (!ovs_json_reader_pop(jreader, &json, &json_len))
845         /* process JSON data */
846         ovs_db_json_data_process(pdb, json, json_len);
847     }
848   }
849
850   OVS_DEBUG("poll thread has been completed");
851   ovs_json_reader_free(jreader);
852   return NULL;
853 }
854
855 /* EVENT worker thread.
856  * Perform task based on incoming events. This
857  * task can be done asynchronously which allows to
858  * handle OVS DB callback like 'init_cb'.
859  */
860 static void *ovs_event_worker(void *arg) {
861   ovs_db_t *pdb = (ovs_db_t *)arg;
862
863   while (pdb->event_thread.value != OVS_DB_EVENT_TERMINATE) {
864     /* wait for an event */
865     struct timespec ts;
866     clock_gettime(CLOCK_REALTIME, &ts);
867     ts.tv_sec += (OVS_DB_EVENT_TIMEOUT);
868     int ret = pthread_cond_timedwait(&pdb->event_thread.cond,
869                                      &pdb->event_thread.mutex, &ts);
870     if (!ret) {
871       /* handle the event */
872       OVS_DEBUG("handle event %d", pdb->event_thread.value);
873       switch (pdb->event_thread.value) {
874       case OVS_DB_EVENT_CONN_ESTABLISHED:
875         if (pdb->cb.post_conn_init)
876           pdb->cb.post_conn_init(pdb);
877         break;
878       case OVS_DB_EVENT_CONN_TERMINATED:
879         if (pdb->cb.post_conn_terminate)
880           pdb->cb.post_conn_terminate();
881         break;
882       default:
883         OVS_DEBUG("unknown event received");
884         break;
885       }
886     } else if (ret == ETIMEDOUT) {
887       /* wait timeout */
888       OVS_DEBUG("no event received (timeout)");
889       continue;
890     } else {
891       /* unexpected error */
892       OVS_ERROR("pthread_cond_timedwait() failed");
893       break;
894     }
895   }
896
897   OVS_DEBUG("event thread has been completed");
898   return NULL;
899 }
900
901 /* Initialize EVENT thread */
902 static int ovs_db_event_thread_init(ovs_db_t *pdb) {
903   pdb->event_thread.tid = (pthread_t)-1;
904   /* init event thread condition variable */
905   if (pthread_cond_init(&pdb->event_thread.cond, NULL)) {
906     return -1;
907   }
908   /* init event thread mutex */
909   if (pthread_mutex_init(&pdb->event_thread.mutex, NULL)) {
910     pthread_cond_destroy(&pdb->event_thread.cond);
911     return -1;
912   }
913   /* Hold the event thread mutex. It ensures that no events
914    * will be lost while thread is still starting. Once event
915    * thread is started and ready to accept events, it will release
916    * the mutex */
917   if (pthread_mutex_lock(&pdb->event_thread.mutex)) {
918     pthread_mutex_destroy(&pdb->event_thread.mutex);
919     pthread_cond_destroy(&pdb->event_thread.cond);
920     return -1;
921   }
922   /* start event thread */
923   pthread_t tid;
924   if (plugin_thread_create(&tid, NULL, ovs_event_worker, pdb,
925                            "utils_ovs:event") != 0) {
926     pthread_mutex_unlock(&pdb->event_thread.mutex);
927     pthread_mutex_destroy(&pdb->event_thread.mutex);
928     pthread_cond_destroy(&pdb->event_thread.cond);
929     return -1;
930   }
931   pdb->event_thread.tid = tid;
932   return 0;
933 }
934
935 /* Destroy EVENT thread */
936 static int ovs_db_event_thread_destroy(ovs_db_t *pdb) {
937   if (pdb->event_thread.tid == (pthread_t)-1)
938     /* already destroyed */
939     return 0;
940   ovs_db_event_post(pdb, OVS_DB_EVENT_TERMINATE);
941   if (pthread_join(pdb->event_thread.tid, NULL) != 0)
942     return -1;
943   /* Event thread always holds the thread mutex when
944    * performs some task (handles event) and releases it when
945    * while sleeping. Thus, if event thread exits, the mutex
946    * remains locked */
947   pthread_mutex_unlock(&pdb->event_thread.mutex);
948   pthread_mutex_destroy(&pdb->event_thread.mutex);
949   pthread_cond_destroy(&pdb->event_thread.cond);
950   pdb->event_thread.tid = (pthread_t)-1;
951   return 0;
952 }
953
954 /* Initialize POLL thread */
955 static int ovs_db_poll_thread_init(ovs_db_t *pdb) {
956   pdb->poll_thread.tid = (pthread_t)-1;
957   /* init event thread mutex */
958   if (pthread_mutex_init(&pdb->poll_thread.mutex, NULL)) {
959     return -1;
960   }
961   /* start poll thread */
962   pthread_t tid;
963   pdb->poll_thread.state = OVS_DB_POLL_STATE_RUNNING;
964   if (plugin_thread_create(&tid, NULL, ovs_poll_worker, pdb,
965                            "utils_ovs:poll") != 0) {
966     pthread_mutex_destroy(&pdb->poll_thread.mutex);
967     return -1;
968   }
969   pdb->poll_thread.tid = tid;
970   return 0;
971 }
972
973 /* Destroy POLL thread */
974 static int ovs_db_poll_thread_destroy(ovs_db_t *pdb) {
975   if (pdb->poll_thread.tid == (pthread_t)-1)
976     /* already destroyed */
977     return 0;
978   /* change thread state */
979   pthread_mutex_lock(&pdb->poll_thread.mutex);
980   pdb->poll_thread.state = OVS_DB_POLL_STATE_EXITING;
981   pthread_mutex_unlock(&pdb->poll_thread.mutex);
982   /* join the thread */
983   if (pthread_join(pdb->poll_thread.tid, NULL) != 0)
984     return -1;
985   pthread_mutex_destroy(&pdb->poll_thread.mutex);
986   pdb->poll_thread.tid = (pthread_t)-1;
987   return 0;
988 }
989
990 /*
991  * Public OVS DB API implementation
992  */
993
994 ovs_db_t *ovs_db_init(const char *node, const char *service,
995                       const char *unix_path, ovs_db_callback_t *cb) {
996   /* sanity check */
997   if (node == NULL || service == NULL || unix_path == NULL)
998     return NULL;
999
1000   /* allocate db data & fill it */
1001   ovs_db_t *pdb = pdb = calloc(1, sizeof(*pdb));
1002   if (pdb == NULL)
1003     return NULL;
1004
1005   /* store the OVS DB address */
1006   sstrncpy(pdb->node, node, sizeof(pdb->node));
1007   sstrncpy(pdb->service, service, sizeof(pdb->service));
1008   sstrncpy(pdb->unix_path, unix_path, sizeof(pdb->unix_path));
1009
1010   /* setup OVS DB callbacks */
1011   if (cb)
1012     pdb->cb = *cb;
1013
1014   /* init OVS DB mutex attributes */
1015   pthread_mutexattr_t mutex_attr;
1016   if (pthread_mutexattr_init(&mutex_attr)) {
1017     OVS_ERROR("OVS DB mutex attribute init failed");
1018     sfree(pdb);
1019     return NULL;
1020   }
1021   /* set OVS DB mutex as recursive */
1022   if (pthread_mutexattr_settype(&mutex_attr, PTHREAD_MUTEX_RECURSIVE)) {
1023     OVS_ERROR("Failed to set OVS DB mutex as recursive");
1024     pthread_mutexattr_destroy(&mutex_attr);
1025     sfree(pdb);
1026     return NULL;
1027   }
1028   /* init OVS DB mutex */
1029   if (pthread_mutex_init(&pdb->mutex, &mutex_attr)) {
1030     OVS_ERROR("OVS DB mutex init failed");
1031     pthread_mutexattr_destroy(&mutex_attr);
1032     sfree(pdb);
1033     return NULL;
1034   }
1035   /* destroy mutex attributes */
1036   pthread_mutexattr_destroy(&mutex_attr);
1037
1038   /* init event thread */
1039   if (ovs_db_event_thread_init(pdb) < 0) {
1040     ovs_db_destroy(pdb);
1041     return NULL;
1042   }
1043
1044   /* init polling thread */
1045   pdb->sock = -1;
1046   if (ovs_db_poll_thread_init(pdb) < 0) {
1047     ovs_db_destroy(pdb);
1048     return NULL;
1049   }
1050   return pdb;
1051 }
1052
1053 int ovs_db_send_request(ovs_db_t *pdb, const char *method, const char *params,
1054                         ovs_db_result_cb_t cb) {
1055   int ret = 0;
1056   yajl_gen_status yajl_gen_ret;
1057   yajl_val jparams;
1058   yajl_gen jgen;
1059   ovs_callback_t *new_cb = NULL;
1060   uint64_t uid;
1061   char uid_buff[OVS_UID_STR_SIZE];
1062   const char *req = NULL;
1063   size_t req_len = 0;
1064   struct timespec ts;
1065
1066   /* sanity check */
1067   if (!pdb || !method || !params)
1068     return -1;
1069
1070   if ((jgen = yajl_gen_alloc(NULL)) == NULL)
1071     return -1;
1072
1073   /* try to parse params */
1074   if ((jparams = yajl_tree_parse(params, NULL, 0)) == NULL) {
1075     OVS_ERROR("params is not a JSON string");
1076     yajl_gen_clear(jgen);
1077     return -1;
1078   }
1079
1080   /* generate method field */
1081   OVS_YAJL_CALL(yajl_gen_map_open, jgen);
1082
1083   OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "method");
1084   OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, method);
1085
1086   /* generate params field */
1087   OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "params");
1088   OVS_YAJL_CALL(ovs_yajl_gen_val, jgen, jparams);
1089   yajl_tree_free(jparams);
1090
1091   /* generate id field */
1092   OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "id");
1093   uid = ovs_uid_generate();
1094   ssnprintf(uid_buff, sizeof(uid_buff), "%" PRIX64, uid);
1095   OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, uid_buff);
1096
1097   OVS_YAJL_CALL(yajl_gen_map_close, jgen);
1098
1099   if (cb) {
1100     /* register result callback */
1101     if ((new_cb = calloc(1, sizeof(ovs_callback_t))) == NULL)
1102       goto yajl_gen_failure;
1103
1104     /* add new callback to front */
1105     sem_init(&new_cb->result.sync, 0, 0);
1106     new_cb->result.call = cb;
1107     new_cb->uid = uid;
1108     ovs_db_callback_add(pdb, new_cb);
1109   }
1110
1111   /* send the request */
1112   OVS_YAJL_CALL(yajl_gen_get_buf, jgen, (const unsigned char **)&req, &req_len);
1113   OVS_DEBUG("%s", req);
1114   if (!ovs_db_data_send(pdb, req, req_len)) {
1115     if (cb) {
1116       /* wait for result */
1117       clock_gettime(CLOCK_REALTIME, &ts);
1118       ts.tv_sec += OVS_DB_SEND_REQ_TIMEOUT;
1119       if (sem_timedwait(&new_cb->result.sync, &ts) < 0) {
1120         OVS_ERROR("%s() no replay received within %d sec", __FUNCTION__,
1121                   OVS_DB_SEND_REQ_TIMEOUT);
1122         ret = (-1);
1123       }
1124     }
1125   } else {
1126     OVS_ERROR("ovs_db_data_send() failed");
1127     ret = (-1);
1128   }
1129
1130 yajl_gen_failure:
1131   if (new_cb) {
1132     /* destroy callback */
1133     sem_destroy(&new_cb->result.sync);
1134     ovs_db_callback_remove(pdb, new_cb);
1135   }
1136
1137   /* release memory */
1138   yajl_gen_clear(jgen);
1139   return (yajl_gen_ret != yajl_gen_status_ok) ? (-1) : ret;
1140 }
1141
1142 int ovs_db_table_cb_register(ovs_db_t *pdb, const char *tb_name,
1143                              const char **tb_column,
1144                              ovs_db_table_cb_t update_cb,
1145                              ovs_db_result_cb_t result_cb, unsigned int flags) {
1146   yajl_gen jgen;
1147   yajl_gen_status yajl_gen_ret;
1148   ovs_callback_t *new_cb = NULL;
1149   char uid_str[OVS_UID_STR_SIZE];
1150   char *params;
1151   size_t params_len;
1152   int ovs_db_ret = 0;
1153
1154   /* sanity check */
1155   if (pdb == NULL || tb_name == NULL || update_cb == NULL)
1156     return -1;
1157
1158   /* allocate new update callback */
1159   if ((new_cb = calloc(1, sizeof(ovs_callback_t))) == NULL)
1160     return -1;
1161
1162   /* init YAJL generator */
1163   if ((jgen = yajl_gen_alloc(NULL)) == NULL) {
1164     sfree(new_cb);
1165     return -1;
1166   }
1167
1168   /* add new callback to front */
1169   new_cb->table.call = update_cb;
1170   new_cb->uid = ovs_uid_generate();
1171   ovs_db_callback_add(pdb, new_cb);
1172
1173   /* make update notification request
1174    * [<db-name>, <json-value>, <monitor-requests>] */
1175   OVS_YAJL_CALL(yajl_gen_array_open, jgen);
1176   {
1177     OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, OVS_DB_DEFAULT_DB_NAME);
1178
1179     /* uid string <json-value> */
1180     ssnprintf(uid_str, sizeof(uid_str), "%" PRIX64, new_cb->uid);
1181     OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, uid_str);
1182
1183     /* <monitor-requests> */
1184     OVS_YAJL_CALL(yajl_gen_map_open, jgen);
1185     {
1186       OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, tb_name);
1187       OVS_YAJL_CALL(yajl_gen_array_open, jgen);
1188       {
1189         /* <monitor-request> */
1190         OVS_YAJL_CALL(yajl_gen_map_open, jgen);
1191         {
1192           if (tb_column) {
1193             /* columns within the table to be monitored */
1194             OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "columns");
1195             OVS_YAJL_CALL(yajl_gen_array_open, jgen);
1196             for (; *tb_column; tb_column++)
1197               OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, *tb_column);
1198             OVS_YAJL_CALL(yajl_gen_array_close, jgen);
1199           }
1200           /* specify select option */
1201           OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "select");
1202           {
1203             OVS_YAJL_CALL(yajl_gen_map_open, jgen);
1204             {
1205               OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "initial");
1206               OVS_YAJL_CALL(yajl_gen_bool, jgen,
1207                             flags & OVS_DB_TABLE_CB_FLAG_INITIAL);
1208               OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "insert");
1209               OVS_YAJL_CALL(yajl_gen_bool, jgen,
1210                             flags & OVS_DB_TABLE_CB_FLAG_INSERT);
1211               OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "delete");
1212               OVS_YAJL_CALL(yajl_gen_bool, jgen,
1213                             flags & OVS_DB_TABLE_CB_FLAG_DELETE);
1214               OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "modify");
1215               OVS_YAJL_CALL(yajl_gen_bool, jgen,
1216                             flags & OVS_DB_TABLE_CB_FLAG_MODIFY);
1217             }
1218             OVS_YAJL_CALL(yajl_gen_map_close, jgen);
1219           }
1220         }
1221         OVS_YAJL_CALL(yajl_gen_map_close, jgen);
1222       }
1223       OVS_YAJL_CALL(yajl_gen_array_close, jgen);
1224     }
1225     OVS_YAJL_CALL(yajl_gen_map_close, jgen);
1226   }
1227   OVS_YAJL_CALL(yajl_gen_array_close, jgen);
1228
1229   /* make a request to subscribe to given table */
1230   OVS_YAJL_CALL(yajl_gen_get_buf, jgen, (const unsigned char **)&params,
1231                 &params_len);
1232   if (ovs_db_send_request(pdb, "monitor", params, result_cb) < 0) {
1233     OVS_ERROR("Failed to subscribe to \"%s\" table", tb_name);
1234     ovs_db_ret = (-1);
1235   }
1236
1237 yajl_gen_failure:
1238   /* release memory */
1239   yajl_gen_clear(jgen);
1240   return ovs_db_ret;
1241 }
1242
1243 int ovs_db_destroy(ovs_db_t *pdb) {
1244   int ovs_db_ret = 0;
1245   int ret = 0;
1246
1247   /* sanity check */
1248   if (pdb == NULL)
1249     return -1;
1250
1251   /* try to lock the structure before releasing */
1252   if ((ret = pthread_mutex_lock(&pdb->mutex))) {
1253     OVS_ERROR("pthread_mutex_lock() DB mutex lock failed (%d)", ret);
1254     return -1;
1255   }
1256
1257   /* stop poll thread */
1258   if (ovs_db_event_thread_destroy(pdb) < 0) {
1259     OVS_ERROR("destroy poll thread failed");
1260     ovs_db_ret = (-1);
1261   }
1262
1263   /* stop event thread */
1264   if (ovs_db_poll_thread_destroy(pdb) < 0) {
1265     OVS_ERROR("stop event thread failed");
1266     ovs_db_ret = (-1);
1267   }
1268
1269   /* unsubscribe callbacks */
1270   ovs_db_callback_remove_all(pdb);
1271
1272   /* close connection */
1273   if (pdb->sock >= 0)
1274     close(pdb->sock);
1275
1276   /* release DB handler */
1277   pthread_mutex_unlock(&pdb->mutex);
1278   pthread_mutex_destroy(&pdb->mutex);
1279   sfree(pdb);
1280   return ovs_db_ret;
1281 }
1282
1283 /*
1284  * Public OVS utils API implementation
1285  */
1286
1287 /* Get YAJL value by key from YAJL dictionary
1288  *
1289  * EXAMPLE:
1290  *  {
1291  *    "key_a" : <YAJL return value>
1292  *    "key_b" : <YAJL return value>
1293  *  }
1294  */
1295 yajl_val ovs_utils_get_value_by_key(yajl_val jval, const char *key) {
1296   const char *obj_key = NULL;
1297
1298   /* check params */
1299   if (!YAJL_IS_OBJECT(jval) || (key == NULL))
1300     return NULL;
1301
1302   /* find a value by key */
1303   for (size_t i = 0; i < YAJL_GET_OBJECT(jval)->len; i++) {
1304     obj_key = YAJL_GET_OBJECT(jval)->keys[i];
1305     if (strcmp(obj_key, key) == 0)
1306       return YAJL_GET_OBJECT(jval)->values[i];
1307   }
1308
1309   return NULL;
1310 }
1311
1312 /* Get OVS DB map value by given map key
1313  *
1314  * FROM RFC7047:
1315  *
1316  *   <pair>
1317  *     A 2-element JSON array that represents a pair within a database
1318  *     map.  The first element is an <atom> that represents the key, and
1319  *     the second element is an <atom> that represents the value.
1320  *
1321  *   <map>
1322  *     A 2-element JSON array that represents a database map value.  The
1323  *     first element of the array must be the string "map", and the
1324  *     second element must be an array of zero or more <pair>s giving the
1325  *     values in the map.  All of the <pair>s must have the same key and
1326  *     value types.
1327  *
1328  * EXAMPLE:
1329  *  [
1330  *    "map", [
1331  *             [ "key_a", <YAJL value>], [ "key_b", <YAJL value>], ...
1332  *           ]
1333  *  ]
1334  */
1335 yajl_val ovs_utils_get_map_value(yajl_val jval, const char *key) {
1336   size_t map_len = 0;
1337   size_t array_len = 0;
1338   yajl_val *map_values = NULL;
1339   yajl_val *array_values = NULL;
1340   const char *str_val = NULL;
1341
1342   /* check YAJL array */
1343   if (!YAJL_IS_ARRAY(jval) || (key == NULL))
1344     return NULL;
1345
1346   /* check a database map value (2-element, first one should be a string */
1347   array_len = YAJL_GET_ARRAY(jval)->len;
1348   array_values = YAJL_GET_ARRAY(jval)->values;
1349   if ((array_len != 2) || (!YAJL_IS_STRING(array_values[0])) ||
1350       (!YAJL_IS_ARRAY(array_values[1])))
1351     return NULL;
1352
1353   /* check first element of the array */
1354   str_val = YAJL_GET_STRING(array_values[0]);
1355   if (strcmp("map", str_val) != 0)
1356     return NULL;
1357
1358   /* try to find map value by map key */
1359   map_len = YAJL_GET_ARRAY(array_values[1])->len;
1360   map_values = YAJL_GET_ARRAY(array_values[1])->values;
1361   for (size_t i = 0; i < map_len; i++) {
1362     /* check YAJL array */
1363     if (!YAJL_IS_ARRAY(map_values[i]))
1364       break;
1365
1366     /* check a database pair value (2-element, first one represents a key
1367      * and it should be a string in our case */
1368     array_len = YAJL_GET_ARRAY(map_values[i])->len;
1369     array_values = YAJL_GET_ARRAY(map_values[i])->values;
1370     if ((array_len != 2) || (!YAJL_IS_STRING(array_values[0])))
1371       break;
1372
1373     /* return map value if given key equals map key */
1374     str_val = YAJL_GET_STRING(array_values[0]);
1375     if (strcmp(key, str_val) == 0)
1376       return array_values[1];
1377   }
1378   return NULL;
1379 }