Merge branch 'collectd-5.8'
[collectd.git] / src / utils_ovs.c
1 /**
2  * collectd - src/utils_ovs.c
3  *
4  * Copyright(c) 2016 Intel Corporation. All rights reserved.
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining a copy
7  *of
8  * this software and associated documentation files (the "Software"), to deal in
9  * the Software without restriction, including without limitation the rights to
10  * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
11  * of the Software, and to permit persons to whom the Software is furnished to
12  *do
13  * so, subject to the following conditions:
14  *
15  * The above copyright notice and this permission notice shall be included in
16  *all
17  * copies or substantial portions of the Software.
18  *
19  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
20  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
21  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
22  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
23  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
24  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
25  * SOFTWARE.
26  *
27  * Authors:
28  *   Volodymyr Mytnyk <volodymyrx.mytnyk@intel.com>
29  **/
30
31 /* clang-format off */
32 /*
33  *                         OVS DB API internal architecture diagram
34  * +------------------------------------------------------------------------------+
35  * |OVS plugin      |OVS utils                                                    |
36  * |                |     +------------------------+                              |
37  * |                |     |      echo handler      |                JSON request/ |
38  * |                |  +--+ (ovs_db_table_echo_cb) +<---+---------+ update event/ |
39  * |                |  |  |                        |    |         | result        |
40  * |                |  |  +------------------------+    |         |               |
41  * |                |  |                                |    +----+---+--------+  |
42  * |  +----------+  |  |  +------------------------+    |    |        |        |  |
43  * |  |  update  |  |  |  |     update handler     |    |    |  YAJL  |  JSON  |  |
44  * |  | callback +<-------+(ovs_db_table_update_cp)+<---+    | parser | reader |  |
45  * |  +----------+  |  |  |                        |    |    |        |        |  |
46  * |                |  |  +------------------------+    |    +--------+---+----+  |
47  * |                |  |                                |                 ^       |
48  * |  +----------+  |  |  +------------------------+    |                 |       |
49  * |  |  result  |  |  |  |     result handler     |    |                 |       |
50  * |  | callback +<-------+   (ovs_db_result_cb)   +<---+        JSON raw |       |
51  * |  +----------+  |  |  |                        |               data   |       |
52  * |                |  |  +------------------------+                      |       |
53  * |                |  |                                                  |       |
54  * |                |  |    +------------------+             +------------+----+  |
55  * |  +----------+  |  |    |thread|           |             |thread|          |  |
56  * |  |   init   |  |  |    |                  |  reconnect  |                 |  |
57  * |  | callback +<---------+   EVENT WORKER   +<------------+   POLL WORKER   |  |
58  * |  +----------+  |  |    +------------------+             +--------+--------+  |
59  * |                |  |                                              ^           |
60  * +----------------+-------------------------------------------------------------+
61  *                     |                                              |
62  *                 JSON|echo reply                                 raw|data
63  *                     v                                              v
64  * +-------------------+----------------------------------------------+-----------+
65  * |                                 TCP/UNIX socket                              |
66  * +-------------------------------------------------------------------------------
67  */
68 /* clang-format on */
69
70 /* collectd headers */
71 #include "collectd.h"
72
73 #include "common.h"
74
75 /* private headers */
76 #include "utils_ovs.h"
77
78 /* system libraries */
79 #if HAVE_NETDB_H
80 #include <netdb.h>
81 #endif
82 #if HAVE_ARPA_INET_H
83 #include <arpa/inet.h>
84 #endif
85 #if HAVE_POLL_H
86 #include <poll.h>
87 #endif
88 #if HAVE_SYS_UN_H
89 #include <sys/un.h>
90 #endif
91
92 #include <semaphore.h>
93
94 #define OVS_ERROR(fmt, ...)                                                    \
95   do {                                                                         \
96     ERROR("ovs_utils: " fmt, ##__VA_ARGS__);                                   \
97   } while (0)
98 #define OVS_DEBUG(fmt, ...)                                                    \
99   do {                                                                         \
100     DEBUG("%s:%d:%s(): " fmt, __FILE__, __LINE__, __FUNCTION__,                \
101           ##__VA_ARGS__);                                                      \
102   } while (0)
103
104 #define OVS_DB_POLL_TIMEOUT 1           /* poll receive timeout (sec) */
105 #define OVS_DB_POLL_READ_BLOCK_SIZE 512 /* read block size (bytes) */
106 #define OVS_DB_DEFAULT_DB_NAME "Open_vSwitch"
107
108 #define OVS_DB_EVENT_TIMEOUT 5 /* event thread timeout (sec) */
109 #define OVS_DB_EVENT_TERMINATE 1
110 #define OVS_DB_EVENT_CONN_ESTABLISHED 2
111 #define OVS_DB_EVENT_CONN_TERMINATED 3
112
113 #define OVS_DB_POLL_STATE_RUNNING 1
114 #define OVS_DB_POLL_STATE_EXITING 2
115
116 #define OVS_DB_SEND_REQ_TIMEOUT 5 /* send request timeout (sec) */
117
118 #define OVS_YAJL_CALL(func, ...)                                               \
119   do {                                                                         \
120     yajl_gen_ret = yajl_gen_status_ok;                                         \
121     if ((yajl_gen_ret = func(__VA_ARGS__)) != yajl_gen_status_ok)              \
122       goto yajl_gen_failure;                                                   \
123   } while (0)
124 #define OVS_YAJL_ERROR_BUFFER_SIZE 1024
125 #define OVS_ERROR_BUFF_SIZE 512
126 #define OVS_UID_STR_SIZE 17 /* 64-bit HEX string len + '\0' */
127
128 /* JSON reader internal data */
129 struct ovs_json_reader_s {
130   char *buff_ptr;
131   size_t buff_size;
132   size_t buff_offset;
133   size_t json_offset;
134 };
135 typedef struct ovs_json_reader_s ovs_json_reader_t;
136
137 /* Result callback declaration */
138 struct ovs_result_cb_s {
139   sem_t sync;
140   ovs_db_result_cb_t call;
141 };
142 typedef struct ovs_result_cb_s ovs_result_cb_t;
143
144 /* Table callback declaration */
145 struct ovs_table_cb_s {
146   ovs_db_table_cb_t call;
147 };
148 typedef struct ovs_table_cb_s ovs_table_cb_t;
149
150 /* Callback declaration */
151 struct ovs_callback_s {
152   uint64_t uid;
153   union {
154     ovs_result_cb_t result;
155     ovs_table_cb_t table;
156   };
157   struct ovs_callback_s *next;
158   struct ovs_callback_s *prev;
159 };
160 typedef struct ovs_callback_s ovs_callback_t;
161
162 /* Event thread data declaration */
163 struct ovs_event_thread_s {
164   pthread_t tid;
165   pthread_mutex_t mutex;
166   pthread_cond_t cond;
167   int value;
168 };
169 typedef struct ovs_event_thread_s ovs_event_thread_t;
170
171 /* Poll thread data declaration */
172 struct ovs_poll_thread_s {
173   pthread_t tid;
174   pthread_mutex_t mutex;
175   int state;
176 };
177 typedef struct ovs_poll_thread_s ovs_poll_thread_t;
178
179 /* OVS DB internal data declaration */
180 struct ovs_db_s {
181   ovs_poll_thread_t poll_thread;
182   ovs_event_thread_t event_thread;
183   pthread_mutex_t mutex;
184   ovs_callback_t *remote_cb;
185   ovs_db_callback_t cb;
186   char service[OVS_DB_ADDR_SERVICE_SIZE];
187   char node[OVS_DB_ADDR_NODE_SIZE];
188   char unix_path[OVS_DB_ADDR_NODE_SIZE];
189   int sock;
190 };
191
192 /* Global variables */
193 static uint64_t ovs_uid = 0;
194 static pthread_mutex_t ovs_uid_mutex = PTHREAD_MUTEX_INITIALIZER;
195
196 /* Post an event to event thread.
197  * Possible events are:
198  *  OVS_DB_EVENT_TERMINATE
199  *  OVS_DB_EVENT_CONN_ESTABLISHED
200  *  OVS_DB_EVENT_CONN_TERMINATED
201  */
202 static void ovs_db_event_post(ovs_db_t *pdb, int event) {
203   pthread_mutex_lock(&pdb->event_thread.mutex);
204   pdb->event_thread.value = event;
205   pthread_mutex_unlock(&pdb->event_thread.mutex);
206   pthread_cond_signal(&pdb->event_thread.cond);
207 }
208
209 /* Check if POLL thread is still running. Returns
210  * 1 if running otherwise 0 is returned */
211 static _Bool ovs_db_poll_is_running(ovs_db_t *pdb) {
212   int state = 0;
213   pthread_mutex_lock(&pdb->poll_thread.mutex);
214   state = pdb->poll_thread.state;
215   pthread_mutex_unlock(&pdb->poll_thread.mutex);
216   return state == OVS_DB_POLL_STATE_RUNNING;
217 }
218
219 /* Generate unique identifier (UID). It is used by OVS DB API
220  * to set "id" field for any OVS DB JSON request. */
221 static uint64_t ovs_uid_generate() {
222   uint64_t new_uid;
223   pthread_mutex_lock(&ovs_uid_mutex);
224   new_uid = ++ovs_uid;
225   pthread_mutex_unlock(&ovs_uid_mutex);
226   return new_uid;
227 }
228
229 /*
230  * Callback API. These function are used to store
231  * registered callbacks in OVS DB API.
232  */
233
234 /* Add new callback into OVS DB object */
235 static void ovs_db_callback_add(ovs_db_t *pdb, ovs_callback_t *new_cb) {
236   pthread_mutex_lock(&pdb->mutex);
237   if (pdb->remote_cb)
238     pdb->remote_cb->prev = new_cb;
239   new_cb->next = pdb->remote_cb;
240   new_cb->prev = NULL;
241   pdb->remote_cb = new_cb;
242   pthread_mutex_unlock(&pdb->mutex);
243 }
244
245 /* Remove callback from OVS DB object */
246 static void ovs_db_callback_remove(ovs_db_t *pdb, ovs_callback_t *del_cb) {
247   pthread_mutex_lock(&pdb->mutex);
248   ovs_callback_t *pre_cb = del_cb->prev;
249   ovs_callback_t *next_cb = del_cb->next;
250
251   if (next_cb)
252     next_cb->prev = del_cb->prev;
253
254   if (pre_cb)
255     pre_cb->next = del_cb->next;
256   else
257     pdb->remote_cb = del_cb->next;
258
259   free(del_cb);
260   pthread_mutex_unlock(&pdb->mutex);
261 }
262
263 /* Remove all callbacks form OVS DB object */
264 static void ovs_db_callback_remove_all(ovs_db_t *pdb) {
265   pthread_mutex_lock(&pdb->mutex);
266   while (pdb->remote_cb != NULL) {
267     ovs_callback_t *del_cb = pdb->remote_cb;
268     pdb->remote_cb = del_cb->next;
269     sfree(del_cb);
270   }
271   pthread_mutex_unlock(&pdb->mutex);
272 }
273
274 /* Get/find callback in OVS DB object by UID. Returns pointer
275  * to requested callback otherwise NULL is returned.
276  *
277  * IMPORTANT NOTE:
278  *   The OVS DB mutex MUST be locked by the caller
279  *   to make sure that returned callback is still valid.
280  */
281 static ovs_callback_t *ovs_db_callback_get(ovs_db_t *pdb, uint64_t uid) {
282   for (ovs_callback_t *cb = pdb->remote_cb; cb != NULL; cb = cb->next)
283     if (cb->uid == uid)
284       return cb;
285   return NULL;
286 }
287
288 /* Send all requested data to the socket. Returns 0 if
289  * ALL request data has been sent otherwise negative value
290  * is returned */
291 static int ovs_db_data_send(const ovs_db_t *pdb, const char *data, size_t len) {
292   ssize_t nbytes = 0;
293   size_t rem = len;
294   size_t off = 0;
295
296   while (rem > 0) {
297     if ((nbytes = send(pdb->sock, data + off, rem, 0)) <= 0)
298       return -1;
299     rem -= (size_t)nbytes;
300     off += (size_t)nbytes;
301   }
302   return 0;
303 }
304
305 /*
306  * YAJL (Yet Another JSON Library) helper functions
307  * Documentation (https://lloyd.github.io/yajl/)
308  */
309
310 /* Add null-terminated string into YAJL generator handle (JSON object).
311  * Similar function to yajl_gen_string() but takes null-terminated string
312  * instead of string and its length.
313  *
314  * jgen   - YAJL generator handle allocated by yajl_gen_alloc()
315  * string - Null-terminated string
316  */
317 static yajl_gen_status ovs_yajl_gen_tstring(yajl_gen hander,
318                                             const char *string) {
319   return yajl_gen_string(hander, (const unsigned char *)string, strlen(string));
320 }
321
322 /* Add YAJL value into YAJL generator handle (JSON object)
323  *
324  * jgen - YAJL generator handle allocated by yajl_gen_alloc()
325  * jval - YAJL value usually returned by yajl_tree_get()
326  */
327 static yajl_gen_status ovs_yajl_gen_val(yajl_gen jgen, yajl_val jval) {
328   size_t array_len = 0;
329   yajl_val *jvalues = NULL;
330   yajl_val jobj_value = NULL;
331   const char *obj_key = NULL;
332   size_t obj_len = 0;
333   yajl_gen_status yajl_gen_ret = yajl_gen_status_ok;
334
335   if (jval == NULL)
336     return yajl_gen_generation_complete;
337
338   if (YAJL_IS_STRING(jval))
339     OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, YAJL_GET_STRING(jval));
340   else if (YAJL_IS_DOUBLE(jval))
341     OVS_YAJL_CALL(yajl_gen_double, jgen, YAJL_GET_DOUBLE(jval));
342   else if (YAJL_IS_INTEGER(jval))
343     OVS_YAJL_CALL(yajl_gen_double, jgen, YAJL_GET_INTEGER(jval));
344   else if (YAJL_IS_TRUE(jval))
345     OVS_YAJL_CALL(yajl_gen_bool, jgen, 1);
346   else if (YAJL_IS_FALSE(jval))
347     OVS_YAJL_CALL(yajl_gen_bool, jgen, 0);
348   else if (YAJL_IS_NULL(jval))
349     OVS_YAJL_CALL(yajl_gen_null, jgen);
350   else if (YAJL_IS_ARRAY(jval)) {
351     /* create new array and add all elements into the array */
352     array_len = YAJL_GET_ARRAY(jval)->len;
353     jvalues = YAJL_GET_ARRAY(jval)->values;
354     OVS_YAJL_CALL(yajl_gen_array_open, jgen);
355     for (size_t i = 0; i < array_len; i++)
356       OVS_YAJL_CALL(ovs_yajl_gen_val, jgen, jvalues[i]);
357     OVS_YAJL_CALL(yajl_gen_array_close, jgen);
358   } else if (YAJL_IS_OBJECT(jval)) {
359     /* create new object and add all elements into the object */
360     OVS_YAJL_CALL(yajl_gen_map_open, jgen);
361     obj_len = YAJL_GET_OBJECT(jval)->len;
362     for (size_t i = 0; i < obj_len; i++) {
363       obj_key = YAJL_GET_OBJECT(jval)->keys[i];
364       jobj_value = YAJL_GET_OBJECT(jval)->values[i];
365       OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, obj_key);
366       OVS_YAJL_CALL(ovs_yajl_gen_val, jgen, jobj_value);
367     }
368     OVS_YAJL_CALL(yajl_gen_map_close, jgen);
369   } else {
370     OVS_ERROR("%s() unsupported value type %d (skip)", __FUNCTION__,
371               (int)(jval)->type);
372     goto yajl_gen_failure;
373   }
374   return yajl_gen_status_ok;
375
376 yajl_gen_failure:
377   OVS_ERROR("%s() error to generate value", __FUNCTION__);
378   return yajl_gen_ret;
379 }
380
381 /* OVS DB echo request handler. When OVS DB sends
382  * "echo" request to the client, client should generate
383  * "echo" replay with the same content received in the
384  * request */
385 static int ovs_db_table_echo_cb(const ovs_db_t *pdb, yajl_val jnode) {
386   yajl_val jparams;
387   yajl_val jid;
388   yajl_gen jgen;
389   size_t resp_len = 0;
390   const char *resp = NULL;
391   const char *params_path[] = {"params", NULL};
392   const char *id_path[] = {"id", NULL};
393   yajl_gen_status yajl_gen_ret;
394
395   if ((jgen = yajl_gen_alloc(NULL)) == NULL)
396     return -1;
397
398   /* check & get request attributes */
399   if ((jparams = yajl_tree_get(jnode, params_path, yajl_t_array)) == NULL ||
400       ((jid = yajl_tree_get(jnode, id_path, yajl_t_any)) == NULL)) {
401     OVS_ERROR("parse echo request failed");
402     goto yajl_gen_failure;
403   }
404
405   /* generate JSON echo response */
406   OVS_YAJL_CALL(yajl_gen_map_open, jgen);
407
408   OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "result");
409   OVS_YAJL_CALL(ovs_yajl_gen_val, jgen, jparams);
410
411   OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "error");
412   OVS_YAJL_CALL(yajl_gen_null, jgen);
413
414   OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "id");
415   OVS_YAJL_CALL(ovs_yajl_gen_val, jgen, jid);
416
417   OVS_YAJL_CALL(yajl_gen_map_close, jgen);
418   OVS_YAJL_CALL(yajl_gen_get_buf, jgen, (const unsigned char **)&resp,
419                 &resp_len);
420
421   /* send the response */
422   OVS_DEBUG("response: %s", resp);
423   if (ovs_db_data_send(pdb, resp, resp_len) < 0) {
424     OVS_ERROR("send echo reply failed");
425     goto yajl_gen_failure;
426   }
427   /* clean up and return success */
428   yajl_gen_clear(jgen);
429   return 0;
430
431 yajl_gen_failure:
432   /* release memory */
433   yajl_gen_clear(jgen);
434   return -1;
435 }
436
437 /* Get OVS DB registered callback by YAJL val. The YAJL
438  * value should be YAJL string (UID). Returns NULL if
439  * callback hasn't been found. See also ovs_db_callback_get()
440  * description for addition info.
441  */
442 static ovs_callback_t *ovs_db_table_callback_get(ovs_db_t *pdb, yajl_val jid) {
443   char *endptr = NULL;
444   const char *suid = NULL;
445   uint64_t uid;
446
447   if (jid && YAJL_IS_STRING(jid)) {
448     suid = YAJL_GET_STRING(jid);
449     uid = (uint64_t)strtoul(suid, &endptr, 16);
450     if (*endptr == '\0' && uid)
451       return ovs_db_callback_get(pdb, uid);
452   }
453
454   return NULL;
455 }
456
457 /* OVS DB table update event handler.
458  * This callback is called by POLL thread if OVS DB
459  * table update callback is received from the DB
460  * server. Once registered callback found, it's called
461  * by this handler. */
462 static int ovs_db_table_update_cb(ovs_db_t *pdb, yajl_val jnode) {
463   ovs_callback_t *cb = NULL;
464   yajl_val jvalue;
465   yajl_val jparams;
466   yajl_val jtable_updates;
467   const char *params_path[] = {"params", NULL};
468   const char *id_path[] = {"id", NULL};
469
470   /* check & get request attributes */
471   if ((jparams = yajl_tree_get(jnode, params_path, yajl_t_array)) == NULL ||
472       (yajl_tree_get(jnode, id_path, yajl_t_null) == NULL)) {
473     OVS_ERROR("invalid OVS DB request received");
474     return -1;
475   }
476
477   /* check array length: [<json-value>, <table-updates>] */
478   if ((YAJL_GET_ARRAY(jparams) == NULL) ||
479       (YAJL_GET_ARRAY(jparams)->len != 2)) {
480     OVS_ERROR("invalid OVS DB request received");
481     return -1;
482   }
483
484   jvalue = YAJL_GET_ARRAY(jparams)->values[0];
485   jtable_updates = YAJL_GET_ARRAY(jparams)->values[1];
486   if ((!YAJL_IS_OBJECT(jtable_updates)) || (!YAJL_IS_STRING(jvalue))) {
487     OVS_ERROR("invalid OVS DB request id or table update received");
488     return -1;
489   }
490
491   /* find registered callback based on <json-value> */
492   pthread_mutex_lock(&pdb->mutex);
493   cb = ovs_db_table_callback_get(pdb, jvalue);
494   if (cb == NULL || cb->table.call == NULL) {
495     OVS_ERROR("No OVS DB table update callback found");
496     pthread_mutex_unlock(&pdb->mutex);
497     return -1;
498   }
499
500   /* call registered callback */
501   cb->table.call(jtable_updates);
502   pthread_mutex_unlock(&pdb->mutex);
503   return 0;
504 }
505
506 /* OVS DB result request handler.
507  * This callback is called by POLL thread if OVS DB
508  * result reply is received from the DB server.
509  * Once registered callback found, it's called
510  * by this handler. */
511 static int ovs_db_result_cb(ovs_db_t *pdb, yajl_val jnode) {
512   ovs_callback_t *cb = NULL;
513   yajl_val jresult;
514   yajl_val jerror;
515   yajl_val jid;
516   const char *result_path[] = {"result", NULL};
517   const char *error_path[] = {"error", NULL};
518   const char *id_path[] = {"id", NULL};
519
520   jresult = yajl_tree_get(jnode, result_path, yajl_t_any);
521   jerror = yajl_tree_get(jnode, error_path, yajl_t_any);
522   jid = yajl_tree_get(jnode, id_path, yajl_t_string);
523
524   /* check & get result attributes */
525   if (!jresult || !jerror || !jid)
526     return -1;
527
528   /* try to find registered callback */
529   pthread_mutex_lock(&pdb->mutex);
530   cb = ovs_db_table_callback_get(pdb, jid);
531   if (cb != NULL && cb->result.call != NULL) {
532     /* call registered callback */
533     cb->result.call(jresult, jerror);
534     /* unlock owner of the reply */
535     sem_post(&cb->result.sync);
536   }
537
538   pthread_mutex_unlock(&pdb->mutex);
539   return 0;
540 }
541
542 /* Handle JSON data (one request) and call
543  * appropriate event OVS DB handler. Currently,
544  * update callback 'ovs_db_table_update_cb' and
545  * result callback 'ovs_db_result_cb' is supported.
546  */
547 static int ovs_db_json_data_process(ovs_db_t *pdb, const char *data,
548                                     size_t len) {
549   const char *method = NULL;
550   char yajl_errbuf[OVS_YAJL_ERROR_BUFFER_SIZE];
551   const char *method_path[] = {"method", NULL};
552   const char *result_path[] = {"result", NULL};
553   char *sjson = NULL;
554   yajl_val jnode, jval;
555
556   /* duplicate the data to make null-terminated string
557    * required for yajl_tree_parse() */
558   if ((sjson = calloc(1, len + 1)) == NULL)
559     return -1;
560
561   sstrncpy(sjson, data, len + 1);
562   OVS_DEBUG("[len=%zu] %s", len, sjson);
563
564   /* parse json data */
565   jnode = yajl_tree_parse(sjson, yajl_errbuf, sizeof(yajl_errbuf));
566   if (jnode == NULL) {
567     OVS_ERROR("yajl_tree_parse() %s", yajl_errbuf);
568     sfree(sjson);
569     return -1;
570   }
571
572   /* get method name */
573   if ((jval = yajl_tree_get(jnode, method_path, yajl_t_string)) != NULL) {
574     if ((method = YAJL_GET_STRING(jval)) == NULL) {
575       yajl_tree_free(jnode);
576       sfree(sjson);
577       return -1;
578     }
579     if (strcmp("echo", method) == 0) {
580       /* echo request from the server */
581       if (ovs_db_table_echo_cb(pdb, jnode) < 0)
582         OVS_ERROR("handle echo request failed");
583     } else if (strcmp("update", method) == 0) {
584       /* update notification */
585       if (ovs_db_table_update_cb(pdb, jnode) < 0)
586         OVS_ERROR("handle update notification failed");
587     }
588   } else if ((jval = yajl_tree_get(jnode, result_path, yajl_t_any)) != NULL) {
589     /* result notification */
590     if (ovs_db_result_cb(pdb, jnode) < 0)
591       OVS_ERROR("handle result reply failed");
592   } else
593     OVS_ERROR("connot find method or result failed");
594
595   /* release memory */
596   yajl_tree_free(jnode);
597   sfree(sjson);
598   return 0;
599 }
600
601 /*
602  * JSON reader implementation.
603  *
604  * This module process raw JSON data (byte stream) and
605  * returns fully-fledged JSON data which can be processed
606  * (parsed) by YAJL later.
607  */
608
609 /* Allocate JSON reader instance */
610 static ovs_json_reader_t *ovs_json_reader_alloc() {
611   ovs_json_reader_t *jreader = NULL;
612
613   if ((jreader = calloc(sizeof(ovs_json_reader_t), 1)) == NULL)
614     return NULL;
615
616   return jreader;
617 }
618
619 /* Push raw data into into the JSON reader for processing */
620 static int ovs_json_reader_push_data(ovs_json_reader_t *jreader,
621                                      const char *data, size_t data_len) {
622   char *new_buff = NULL;
623   size_t available = jreader->buff_size - jreader->buff_offset;
624
625   /* check/update required memory space */
626   if (available < data_len) {
627     OVS_DEBUG("Reallocate buffer [size=%d, available=%d required=%d]",
628               (int)jreader->buff_size, (int)available, (int)data_len);
629
630     /* allocate new chunk of memory */
631     new_buff = realloc(jreader->buff_ptr, (jreader->buff_size + data_len));
632     if (new_buff == NULL)
633       return -1;
634
635     /* point to new allocated memory */
636     jreader->buff_ptr = new_buff;
637     jreader->buff_size += data_len;
638   }
639
640   /* store input data */
641   memcpy(jreader->buff_ptr + jreader->buff_offset, data, data_len);
642   jreader->buff_offset += data_len;
643   return 0;
644 }
645
646 /* Pop one fully-fledged JSON if already exists. Returns 0 if
647  * completed JSON already exists otherwise negative value is
648  * returned */
649 static int ovs_json_reader_pop(ovs_json_reader_t *jreader,
650                                const char **json_ptr, size_t *json_len_ptr) {
651   size_t nbraces = 0;
652   size_t json_len = 0;
653   char *json = NULL;
654
655   /* search open/close brace */
656   for (size_t i = jreader->json_offset; i < jreader->buff_offset; i++) {
657     if (jreader->buff_ptr[i] == '{') {
658       nbraces++;
659     } else if (jreader->buff_ptr[i] == '}')
660       if (nbraces)
661         if (!(--nbraces)) {
662           /* JSON data */
663           *json_ptr = jreader->buff_ptr + jreader->json_offset;
664           *json_len_ptr = json_len + 1;
665           jreader->json_offset = i + 1;
666           return 0;
667         }
668
669     /* increase JSON data length */
670     if (nbraces)
671       json_len++;
672   }
673
674   if (jreader->json_offset) {
675     if (jreader->json_offset < jreader->buff_offset) {
676       /* shift data to the beginning of the buffer
677        * and zero rest of the buffer data */
678       json = &jreader->buff_ptr[jreader->json_offset];
679       json_len = jreader->buff_offset - jreader->json_offset;
680       for (size_t i = 0; i < jreader->buff_size; i++)
681         jreader->buff_ptr[i] = ((i < json_len) ? (json[i]) : (0));
682       jreader->buff_offset = json_len;
683     } else
684       /* reset the buffer */
685       jreader->buff_offset = 0;
686
687     /* data is at the beginning of the buffer */
688     jreader->json_offset = 0;
689   }
690
691   return -1;
692 }
693
694 /* Reset JSON reader. It is useful when start processing
695  * new raw data. E.g.: in case of lost stream connection.
696  */
697 static void ovs_json_reader_reset(ovs_json_reader_t *jreader) {
698   if (jreader) {
699     jreader->buff_offset = 0;
700     jreader->json_offset = 0;
701   }
702 }
703
704 /* Release internal data allocated for JSON reader */
705 static void ovs_json_reader_free(ovs_json_reader_t *jreader) {
706   if (jreader) {
707     free(jreader->buff_ptr);
708     free(jreader);
709   }
710 }
711
712 /* Reconnect to OVS DB and call the OVS DB post connection init callback
713  * if connection has been established.
714  */
715 static void ovs_db_reconnect(ovs_db_t *pdb) {
716   const char *node_info = pdb->node;
717   struct addrinfo *result;
718
719   if (pdb->unix_path[0] != '\0') {
720     /* use UNIX socket instead of INET address */
721     node_info = pdb->unix_path;
722     result = calloc(1, sizeof(struct addrinfo));
723     struct sockaddr_un *sa_unix = calloc(1, sizeof(struct sockaddr_un));
724     if (result == NULL || sa_unix == NULL) {
725       sfree(result);
726       sfree(sa_unix);
727       return;
728     }
729     result->ai_family = AF_UNIX;
730     result->ai_socktype = SOCK_STREAM;
731     result->ai_addrlen = sizeof(*sa_unix);
732     result->ai_addr = (struct sockaddr *)sa_unix;
733     sa_unix->sun_family = result->ai_family;
734     sstrncpy(sa_unix->sun_path, pdb->unix_path, sizeof(sa_unix->sun_path));
735   } else {
736     /* inet socket address */
737     struct addrinfo hints;
738
739     /* setup criteria for selecting the socket address */
740     memset(&hints, 0, sizeof(hints));
741     hints.ai_family = AF_UNSPEC;
742     hints.ai_socktype = SOCK_STREAM;
743
744     /* get socket addresses */
745     int ret = getaddrinfo(pdb->node, pdb->service, &hints, &result);
746     if (ret != 0) {
747       OVS_ERROR("getaddrinfo(): %s", gai_strerror(ret));
748       return;
749     }
750   }
751   /* try to connect to the server */
752   for (struct addrinfo *rp = result; rp != NULL; rp = rp->ai_next) {
753     int sock = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
754     if (sock < 0) {
755       OVS_DEBUG("socket(): %s", STRERRNO);
756       continue;
757     }
758     if (connect(sock, rp->ai_addr, rp->ai_addrlen) < 0) {
759       close(sock);
760       OVS_DEBUG("connect(): %s [family=%d]", STRERRNO, rp->ai_family);
761     } else {
762       /* send notification to event thread */
763       ovs_db_event_post(pdb, OVS_DB_EVENT_CONN_ESTABLISHED);
764       pdb->sock = sock;
765       break;
766     }
767   }
768
769   if (pdb->sock < 0)
770     OVS_ERROR("connect to \"%s\" failed", node_info);
771
772   freeaddrinfo(result);
773 }
774
775 /* POLL worker thread.
776  * It listens on OVS DB connection for incoming
777  * requests/reply/events etc. Also, it reconnects to OVS DB
778  * if connection has been lost.
779  */
780 static void *ovs_poll_worker(void *arg) {
781   ovs_db_t *pdb = (ovs_db_t *)arg; /* pointer to OVS DB */
782   ovs_json_reader_t *jreader = NULL;
783   struct pollfd poll_fd = {
784       .fd = pdb->sock, .events = POLLIN | POLLPRI, .revents = 0,
785   };
786
787   /* create JSON reader instance */
788   if ((jreader = ovs_json_reader_alloc()) == NULL) {
789     OVS_ERROR("initialize json reader failed");
790     return NULL;
791   }
792
793   /* poll data */
794   while (ovs_db_poll_is_running(pdb)) {
795     poll_fd.fd = pdb->sock;
796     int poll_ret = poll(&poll_fd, 1, /* ms */ OVS_DB_POLL_TIMEOUT * 1000);
797     if (poll_ret < 0) {
798       OVS_ERROR("poll(): %s", STRERRNO);
799       break;
800     } else if (poll_ret == 0) {
801       OVS_DEBUG("poll(): timeout");
802       if (pdb->sock < 0)
803         /* invalid fd, so try to reconnect */
804         ovs_db_reconnect(pdb);
805       continue;
806     }
807     if (poll_fd.revents & POLLNVAL) {
808       /* invalid file descriptor, clean-up */
809       ovs_db_callback_remove_all(pdb);
810       ovs_json_reader_reset(jreader);
811       /* setting poll FD to -1 tells poll() call to ignore this FD.
812        * In that case poll() call will return timeout all the time */
813       pdb->sock = (-1);
814     } else if ((poll_fd.revents & POLLERR) || (poll_fd.revents & POLLHUP)) {
815       /* connection is broken */
816       close(poll_fd.fd);
817       ovs_db_event_post(pdb, OVS_DB_EVENT_CONN_TERMINATED);
818       OVS_ERROR("poll() peer closed its end of the channel");
819     } else if ((poll_fd.revents & POLLIN) || (poll_fd.revents & POLLPRI)) {
820       /* read incoming data */
821       char buff[OVS_DB_POLL_READ_BLOCK_SIZE];
822       ssize_t nbytes = recv(poll_fd.fd, buff, sizeof(buff), 0);
823       if (nbytes < 0) {
824         OVS_ERROR("recv(): %s", STRERRNO);
825         /* read error? Try to reconnect */
826         close(poll_fd.fd);
827         continue;
828       } else if (nbytes == 0) {
829         close(poll_fd.fd);
830         ovs_db_event_post(pdb, OVS_DB_EVENT_CONN_TERMINATED);
831         OVS_ERROR("recv() peer has performed an orderly shutdown");
832         continue;
833       }
834       /* read incoming data */
835       size_t json_len = 0;
836       const char *json = NULL;
837       OVS_DEBUG("recv(): received %zd bytes of data", nbytes);
838       ovs_json_reader_push_data(jreader, buff, nbytes);
839       while (!ovs_json_reader_pop(jreader, &json, &json_len))
840         /* process JSON data */
841         ovs_db_json_data_process(pdb, json, json_len);
842     }
843   }
844
845   OVS_DEBUG("poll thread has been completed");
846   ovs_json_reader_free(jreader);
847   return NULL;
848 }
849
850 /* EVENT worker thread.
851  * Perform task based on incoming events. This
852  * task can be done asynchronously which allows to
853  * handle OVS DB callback like 'init_cb'.
854  */
855 static void *ovs_event_worker(void *arg) {
856   ovs_db_t *pdb = (ovs_db_t *)arg;
857
858   while (pdb->event_thread.value != OVS_DB_EVENT_TERMINATE) {
859     /* wait for an event */
860     struct timespec ts;
861     clock_gettime(CLOCK_REALTIME, &ts);
862     ts.tv_sec += (OVS_DB_EVENT_TIMEOUT);
863     int ret = pthread_cond_timedwait(&pdb->event_thread.cond,
864                                      &pdb->event_thread.mutex, &ts);
865     if (!ret) {
866       /* handle the event */
867       OVS_DEBUG("handle event %d", pdb->event_thread.value);
868       switch (pdb->event_thread.value) {
869       case OVS_DB_EVENT_CONN_ESTABLISHED:
870         if (pdb->cb.post_conn_init)
871           pdb->cb.post_conn_init(pdb);
872         break;
873       case OVS_DB_EVENT_CONN_TERMINATED:
874         if (pdb->cb.post_conn_terminate)
875           pdb->cb.post_conn_terminate();
876         break;
877       default:
878         OVS_DEBUG("unknown event received");
879         break;
880       }
881     } else if (ret == ETIMEDOUT) {
882       /* wait timeout */
883       OVS_DEBUG("no event received (timeout)");
884       continue;
885     } else {
886       /* unexpected error */
887       OVS_ERROR("pthread_cond_timedwait() failed");
888       break;
889     }
890   }
891
892   OVS_DEBUG("event thread has been completed");
893   return NULL;
894 }
895
896 /* Initialize EVENT thread */
897 static int ovs_db_event_thread_init(ovs_db_t *pdb) {
898   pdb->event_thread.tid = (pthread_t){0};
899   /* init event thread condition variable */
900   if (pthread_cond_init(&pdb->event_thread.cond, NULL)) {
901     return -1;
902   }
903   /* init event thread mutex */
904   if (pthread_mutex_init(&pdb->event_thread.mutex, NULL)) {
905     pthread_cond_destroy(&pdb->event_thread.cond);
906     return -1;
907   }
908   /* Hold the event thread mutex. It ensures that no events
909    * will be lost while thread is still starting. Once event
910    * thread is started and ready to accept events, it will release
911    * the mutex */
912   if (pthread_mutex_lock(&pdb->event_thread.mutex)) {
913     pthread_mutex_destroy(&pdb->event_thread.mutex);
914     pthread_cond_destroy(&pdb->event_thread.cond);
915     return -1;
916   }
917   /* start event thread */
918   pthread_t tid;
919   if (plugin_thread_create(&tid, NULL, ovs_event_worker, pdb,
920                            "utils_ovs:event") != 0) {
921     pthread_mutex_unlock(&pdb->event_thread.mutex);
922     pthread_mutex_destroy(&pdb->event_thread.mutex);
923     pthread_cond_destroy(&pdb->event_thread.cond);
924     return -1;
925   }
926   pdb->event_thread.tid = tid;
927   return 0;
928 }
929
930 /* Destroy EVENT thread */
931 /* XXX: Must hold pdb->mutex when calling! */
932 static int ovs_db_event_thread_destroy(ovs_db_t *pdb) {
933   if (pthread_equal(pdb->event_thread.tid, (pthread_t){0})) {
934     /* already destroyed */
935     return 0;
936   }
937   ovs_db_event_post(pdb, OVS_DB_EVENT_TERMINATE);
938   if (pthread_join(pdb->event_thread.tid, NULL) != 0)
939     return -1;
940   /* Event thread always holds the thread mutex when
941    * performs some task (handles event) and releases it when
942    * while sleeping. Thus, if event thread exits, the mutex
943    * remains locked */
944   pthread_mutex_unlock(&pdb->event_thread.mutex);
945   pthread_mutex_destroy(&pdb->event_thread.mutex);
946   pthread_cond_destroy(&pdb->event_thread.cond);
947   pdb->event_thread.tid = (pthread_t){0};
948   return 0;
949 }
950
951 /* Initialize POLL thread */
952 static int ovs_db_poll_thread_init(ovs_db_t *pdb) {
953   pdb->poll_thread.tid = (pthread_t){0};
954   /* init event thread mutex */
955   if (pthread_mutex_init(&pdb->poll_thread.mutex, NULL)) {
956     return -1;
957   }
958   /* start poll thread */
959   pthread_t tid;
960   pdb->poll_thread.state = OVS_DB_POLL_STATE_RUNNING;
961   if (plugin_thread_create(&tid, NULL, ovs_poll_worker, pdb,
962                            "utils_ovs:poll") != 0) {
963     pthread_mutex_destroy(&pdb->poll_thread.mutex);
964     return -1;
965   }
966   pdb->poll_thread.tid = tid;
967   return 0;
968 }
969
970 /* Destroy POLL thread */
971 /* XXX: Must hold pdb->mutex when calling! */
972 static int ovs_db_poll_thread_destroy(ovs_db_t *pdb) {
973   if (pthread_equal(pdb->poll_thread.tid, (pthread_t){0})) {
974     /* already destroyed */
975     return 0;
976   }
977   /* change thread state */
978   pthread_mutex_lock(&pdb->poll_thread.mutex);
979   pdb->poll_thread.state = OVS_DB_POLL_STATE_EXITING;
980   pthread_mutex_unlock(&pdb->poll_thread.mutex);
981   /* join the thread */
982   if (pthread_join(pdb->poll_thread.tid, NULL) != 0)
983     return -1;
984   pthread_mutex_destroy(&pdb->poll_thread.mutex);
985   pdb->poll_thread.tid = (pthread_t){0};
986   return 0;
987 }
988
989 /*
990  * Public OVS DB API implementation
991  */
992
993 ovs_db_t *ovs_db_init(const char *node, const char *service,
994                       const char *unix_path, ovs_db_callback_t *cb) {
995   /* sanity check */
996   if (node == NULL || service == NULL || unix_path == NULL)
997     return NULL;
998
999   /* allocate db data & fill it */
1000   ovs_db_t *pdb = pdb = calloc(1, sizeof(*pdb));
1001   if (pdb == NULL)
1002     return NULL;
1003
1004   /* store the OVS DB address */
1005   sstrncpy(pdb->node, node, sizeof(pdb->node));
1006   sstrncpy(pdb->service, service, sizeof(pdb->service));
1007   sstrncpy(pdb->unix_path, unix_path, sizeof(pdb->unix_path));
1008
1009   /* setup OVS DB callbacks */
1010   if (cb)
1011     pdb->cb = *cb;
1012
1013   /* init OVS DB mutex attributes */
1014   pthread_mutexattr_t mutex_attr;
1015   if (pthread_mutexattr_init(&mutex_attr)) {
1016     OVS_ERROR("OVS DB mutex attribute init failed");
1017     sfree(pdb);
1018     return NULL;
1019   }
1020   /* set OVS DB mutex as recursive */
1021   if (pthread_mutexattr_settype(&mutex_attr, PTHREAD_MUTEX_RECURSIVE)) {
1022     OVS_ERROR("Failed to set OVS DB mutex as recursive");
1023     pthread_mutexattr_destroy(&mutex_attr);
1024     sfree(pdb);
1025     return NULL;
1026   }
1027   /* init OVS DB mutex */
1028   if (pthread_mutex_init(&pdb->mutex, &mutex_attr)) {
1029     OVS_ERROR("OVS DB mutex init failed");
1030     pthread_mutexattr_destroy(&mutex_attr);
1031     sfree(pdb);
1032     return NULL;
1033   }
1034   /* destroy mutex attributes */
1035   pthread_mutexattr_destroy(&mutex_attr);
1036
1037   /* init event thread */
1038   if (ovs_db_event_thread_init(pdb) < 0) {
1039     ovs_db_destroy(pdb);
1040     return NULL;
1041   }
1042
1043   /* init polling thread */
1044   pdb->sock = -1;
1045   if (ovs_db_poll_thread_init(pdb) < 0) {
1046     ovs_db_destroy(pdb);
1047     return NULL;
1048   }
1049   return pdb;
1050 }
1051
1052 int ovs_db_send_request(ovs_db_t *pdb, const char *method, const char *params,
1053                         ovs_db_result_cb_t cb) {
1054   int ret = 0;
1055   yajl_gen_status yajl_gen_ret;
1056   yajl_val jparams;
1057   yajl_gen jgen;
1058   ovs_callback_t *new_cb = NULL;
1059   uint64_t uid;
1060   char uid_buff[OVS_UID_STR_SIZE];
1061   const char *req = NULL;
1062   size_t req_len = 0;
1063   struct timespec ts;
1064
1065   /* sanity check */
1066   if (!pdb || !method || !params)
1067     return -1;
1068
1069   if ((jgen = yajl_gen_alloc(NULL)) == NULL)
1070     return -1;
1071
1072   /* try to parse params */
1073   if ((jparams = yajl_tree_parse(params, NULL, 0)) == NULL) {
1074     OVS_ERROR("params is not a JSON string");
1075     yajl_gen_clear(jgen);
1076     return -1;
1077   }
1078
1079   /* generate method field */
1080   OVS_YAJL_CALL(yajl_gen_map_open, jgen);
1081
1082   OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "method");
1083   OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, method);
1084
1085   /* generate params field */
1086   OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "params");
1087   OVS_YAJL_CALL(ovs_yajl_gen_val, jgen, jparams);
1088   yajl_tree_free(jparams);
1089
1090   /* generate id field */
1091   OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "id");
1092   uid = ovs_uid_generate();
1093   snprintf(uid_buff, sizeof(uid_buff), "%" PRIX64, uid);
1094   OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, uid_buff);
1095
1096   OVS_YAJL_CALL(yajl_gen_map_close, jgen);
1097
1098   if (cb) {
1099     /* register result callback */
1100     if ((new_cb = calloc(1, sizeof(ovs_callback_t))) == NULL)
1101       goto yajl_gen_failure;
1102
1103     /* add new callback to front */
1104     sem_init(&new_cb->result.sync, 0, 0);
1105     new_cb->result.call = cb;
1106     new_cb->uid = uid;
1107     ovs_db_callback_add(pdb, new_cb);
1108   }
1109
1110   /* send the request */
1111   OVS_YAJL_CALL(yajl_gen_get_buf, jgen, (const unsigned char **)&req, &req_len);
1112   OVS_DEBUG("%s", req);
1113   if (!ovs_db_data_send(pdb, req, req_len)) {
1114     if (cb) {
1115       /* wait for result */
1116       clock_gettime(CLOCK_REALTIME, &ts);
1117       ts.tv_sec += OVS_DB_SEND_REQ_TIMEOUT;
1118       if (sem_timedwait(&new_cb->result.sync, &ts) < 0) {
1119         OVS_ERROR("%s() no replay received within %d sec", __FUNCTION__,
1120                   OVS_DB_SEND_REQ_TIMEOUT);
1121         ret = (-1);
1122       }
1123     }
1124   } else {
1125     OVS_ERROR("ovs_db_data_send() failed");
1126     ret = (-1);
1127   }
1128
1129 yajl_gen_failure:
1130   if (new_cb) {
1131     /* destroy callback */
1132     sem_destroy(&new_cb->result.sync);
1133     ovs_db_callback_remove(pdb, new_cb);
1134   }
1135
1136   /* release memory */
1137   yajl_gen_clear(jgen);
1138   return (yajl_gen_ret != yajl_gen_status_ok) ? (-1) : ret;
1139 }
1140
1141 int ovs_db_table_cb_register(ovs_db_t *pdb, const char *tb_name,
1142                              const char **tb_column,
1143                              ovs_db_table_cb_t update_cb,
1144                              ovs_db_result_cb_t result_cb, unsigned int flags) {
1145   yajl_gen jgen;
1146   yajl_gen_status yajl_gen_ret;
1147   ovs_callback_t *new_cb = NULL;
1148   char uid_str[OVS_UID_STR_SIZE];
1149   char *params;
1150   size_t params_len;
1151   int ovs_db_ret = 0;
1152
1153   /* sanity check */
1154   if (pdb == NULL || tb_name == NULL || update_cb == NULL)
1155     return -1;
1156
1157   /* allocate new update callback */
1158   if ((new_cb = calloc(1, sizeof(ovs_callback_t))) == NULL)
1159     return -1;
1160
1161   /* init YAJL generator */
1162   if ((jgen = yajl_gen_alloc(NULL)) == NULL) {
1163     sfree(new_cb);
1164     return -1;
1165   }
1166
1167   /* add new callback to front */
1168   new_cb->table.call = update_cb;
1169   new_cb->uid = ovs_uid_generate();
1170   ovs_db_callback_add(pdb, new_cb);
1171
1172   /* make update notification request
1173    * [<db-name>, <json-value>, <monitor-requests>] */
1174   OVS_YAJL_CALL(yajl_gen_array_open, jgen);
1175   {
1176     OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, OVS_DB_DEFAULT_DB_NAME);
1177
1178     /* uid string <json-value> */
1179     snprintf(uid_str, sizeof(uid_str), "%" PRIX64, new_cb->uid);
1180     OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, uid_str);
1181
1182     /* <monitor-requests> */
1183     OVS_YAJL_CALL(yajl_gen_map_open, jgen);
1184     {
1185       OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, tb_name);
1186       OVS_YAJL_CALL(yajl_gen_array_open, jgen);
1187       {
1188         /* <monitor-request> */
1189         OVS_YAJL_CALL(yajl_gen_map_open, jgen);
1190         {
1191           if (tb_column) {
1192             /* columns within the table to be monitored */
1193             OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "columns");
1194             OVS_YAJL_CALL(yajl_gen_array_open, jgen);
1195             for (; *tb_column; tb_column++)
1196               OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, *tb_column);
1197             OVS_YAJL_CALL(yajl_gen_array_close, jgen);
1198           }
1199           /* specify select option */
1200           OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "select");
1201           {
1202             OVS_YAJL_CALL(yajl_gen_map_open, jgen);
1203             {
1204               OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "initial");
1205               OVS_YAJL_CALL(yajl_gen_bool, jgen,
1206                             flags & OVS_DB_TABLE_CB_FLAG_INITIAL);
1207               OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "insert");
1208               OVS_YAJL_CALL(yajl_gen_bool, jgen,
1209                             flags & OVS_DB_TABLE_CB_FLAG_INSERT);
1210               OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "delete");
1211               OVS_YAJL_CALL(yajl_gen_bool, jgen,
1212                             flags & OVS_DB_TABLE_CB_FLAG_DELETE);
1213               OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "modify");
1214               OVS_YAJL_CALL(yajl_gen_bool, jgen,
1215                             flags & OVS_DB_TABLE_CB_FLAG_MODIFY);
1216             }
1217             OVS_YAJL_CALL(yajl_gen_map_close, jgen);
1218           }
1219         }
1220         OVS_YAJL_CALL(yajl_gen_map_close, jgen);
1221       }
1222       OVS_YAJL_CALL(yajl_gen_array_close, jgen);
1223     }
1224     OVS_YAJL_CALL(yajl_gen_map_close, jgen);
1225   }
1226   OVS_YAJL_CALL(yajl_gen_array_close, jgen);
1227
1228   /* make a request to subscribe to given table */
1229   OVS_YAJL_CALL(yajl_gen_get_buf, jgen, (const unsigned char **)&params,
1230                 &params_len);
1231   if (ovs_db_send_request(pdb, "monitor", params, result_cb) < 0) {
1232     OVS_ERROR("Failed to subscribe to \"%s\" table", tb_name);
1233     ovs_db_ret = (-1);
1234   }
1235
1236 yajl_gen_failure:
1237   /* release memory */
1238   yajl_gen_clear(jgen);
1239   return ovs_db_ret;
1240 }
1241
1242 int ovs_db_destroy(ovs_db_t *pdb) {
1243   int ovs_db_ret = 0;
1244   int ret = 0;
1245
1246   /* sanity check */
1247   if (pdb == NULL)
1248     return -1;
1249
1250   /* try to lock the structure before releasing */
1251   if ((ret = pthread_mutex_lock(&pdb->mutex))) {
1252     OVS_ERROR("pthread_mutex_lock() DB mutex lock failed (%d)", ret);
1253     return -1;
1254   }
1255
1256   /* stop poll thread */
1257   if (ovs_db_event_thread_destroy(pdb) < 0) {
1258     OVS_ERROR("destroy poll thread failed");
1259     ovs_db_ret = -1;
1260   }
1261
1262   /* stop event thread */
1263   if (ovs_db_poll_thread_destroy(pdb) < 0) {
1264     OVS_ERROR("stop event thread failed");
1265     ovs_db_ret = -1;
1266   }
1267
1268   pthread_mutex_unlock(&pdb->mutex);
1269
1270   /* unsubscribe callbacks */
1271   ovs_db_callback_remove_all(pdb);
1272
1273   /* close connection */
1274   if (pdb->sock >= 0)
1275     close(pdb->sock);
1276
1277   /* release DB handler */
1278   pthread_mutex_destroy(&pdb->mutex);
1279   sfree(pdb);
1280   return ovs_db_ret;
1281 }
1282
1283 /*
1284  * Public OVS utils API implementation
1285  */
1286
1287 /* Get YAJL value by key from YAJL dictionary
1288  *
1289  * EXAMPLE:
1290  *  {
1291  *    "key_a" : <YAJL return value>
1292  *    "key_b" : <YAJL return value>
1293  *  }
1294  */
1295 yajl_val ovs_utils_get_value_by_key(yajl_val jval, const char *key) {
1296   const char *obj_key = NULL;
1297
1298   /* check params */
1299   if (!YAJL_IS_OBJECT(jval) || (key == NULL))
1300     return NULL;
1301
1302   /* find a value by key */
1303   for (size_t i = 0; i < YAJL_GET_OBJECT(jval)->len; i++) {
1304     obj_key = YAJL_GET_OBJECT(jval)->keys[i];
1305     if (strcmp(obj_key, key) == 0)
1306       return YAJL_GET_OBJECT(jval)->values[i];
1307   }
1308
1309   return NULL;
1310 }
1311
1312 /* Get OVS DB map value by given map key
1313  *
1314  * FROM RFC7047:
1315  *
1316  *   <pair>
1317  *     A 2-element JSON array that represents a pair within a database
1318  *     map.  The first element is an <atom> that represents the key, and
1319  *     the second element is an <atom> that represents the value.
1320  *
1321  *   <map>
1322  *     A 2-element JSON array that represents a database map value.  The
1323  *     first element of the array must be the string "map", and the
1324  *     second element must be an array of zero or more <pair>s giving the
1325  *     values in the map.  All of the <pair>s must have the same key and
1326  *     value types.
1327  *
1328  * EXAMPLE:
1329  *  [
1330  *    "map", [
1331  *             [ "key_a", <YAJL value>], [ "key_b", <YAJL value>], ...
1332  *           ]
1333  *  ]
1334  */
1335 yajl_val ovs_utils_get_map_value(yajl_val jval, const char *key) {
1336   size_t map_len = 0;
1337   size_t array_len = 0;
1338   yajl_val *map_values = NULL;
1339   yajl_val *array_values = NULL;
1340   const char *str_val = NULL;
1341
1342   /* check YAJL array */
1343   if (!YAJL_IS_ARRAY(jval) || (key == NULL))
1344     return NULL;
1345
1346   /* check a database map value (2-element, first one should be a string */
1347   array_len = YAJL_GET_ARRAY(jval)->len;
1348   array_values = YAJL_GET_ARRAY(jval)->values;
1349   if ((array_len != 2) || (!YAJL_IS_STRING(array_values[0])) ||
1350       (!YAJL_IS_ARRAY(array_values[1])))
1351     return NULL;
1352
1353   /* check first element of the array */
1354   str_val = YAJL_GET_STRING(array_values[0]);
1355   if (strcmp("map", str_val) != 0)
1356     return NULL;
1357
1358   /* try to find map value by map key */
1359   map_len = YAJL_GET_ARRAY(array_values[1])->len;
1360   map_values = YAJL_GET_ARRAY(array_values[1])->values;
1361   for (size_t i = 0; i < map_len; i++) {
1362     /* check YAJL array */
1363     if (!YAJL_IS_ARRAY(map_values[i]))
1364       break;
1365
1366     /* check a database pair value (2-element, first one represents a key
1367      * and it should be a string in our case */
1368     array_len = YAJL_GET_ARRAY(map_values[i])->len;
1369     array_values = YAJL_GET_ARRAY(map_values[i])->values;
1370     if ((array_len != 2) || (!YAJL_IS_STRING(array_values[0])))
1371       break;
1372
1373     /* return map value if given key equals map key */
1374     str_val = YAJL_GET_STRING(array_values[0]);
1375     if (strcmp(key, str_val) == 0)
1376       return array_values[1];
1377   }
1378   return NULL;
1379 }