Merge pull request #2618 from ajssmith/amqp1_dev1_branch
[collectd.git] / src / utils_ovs.c
1 /**
2  * collectd - src/utils_ovs.c
3  *
4  * Copyright(c) 2016 Intel Corporation. All rights reserved.
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining a copy
7  *of
8  * this software and associated documentation files (the "Software"), to deal in
9  * the Software without restriction, including without limitation the rights to
10  * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
11  * of the Software, and to permit persons to whom the Software is furnished to
12  *do
13  * so, subject to the following conditions:
14  *
15  * The above copyright notice and this permission notice shall be included in
16  *all
17  * copies or substantial portions of the Software.
18  *
19  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
20  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
21  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
22  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
23  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
24  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
25  * SOFTWARE.
26  *
27  * Authors:
28  *   Volodymyr Mytnyk <volodymyrx.mytnyk@intel.com>
29  **/
30
31 /* clang-format off */
32 /*
33  *                         OVS DB API internal architecture diagram
34  * +------------------------------------------------------------------------------+
35  * |OVS plugin      |OVS utils                                                    |
36  * |                |     +------------------------+                              |
37  * |                |     |      echo handler      |                JSON request/ |
38  * |                |  +--+ (ovs_db_table_echo_cb) +<---+---------+ update event/ |
39  * |                |  |  |                        |    |         | result        |
40  * |                |  |  +------------------------+    |         |               |
41  * |                |  |                                |    +----+---+--------+  |
42  * |  +----------+  |  |  +------------------------+    |    |        |        |  |
43  * |  |  update  |  |  |  |     update handler     |    |    |  YAJL  |  JSON  |  |
44  * |  | callback +<-------+(ovs_db_table_update_cp)+<---+    | parser | reader |  |
45  * |  +----------+  |  |  |                        |    |    |        |        |  |
46  * |                |  |  +------------------------+    |    +--------+---+----+  |
47  * |                |  |                                |                 ^       |
48  * |  +----------+  |  |  +------------------------+    |                 |       |
49  * |  |  result  |  |  |  |     result handler     |    |                 |       |
50  * |  | callback +<-------+   (ovs_db_result_cb)   +<---+        JSON raw |       |
51  * |  +----------+  |  |  |                        |               data   |       |
52  * |                |  |  +------------------------+                      |       |
53  * |                |  |                                                  |       |
54  * |                |  |    +------------------+             +------------+----+  |
55  * |  +----------+  |  |    |thread|           |             |thread|          |  |
56  * |  |   init   |  |  |    |                  |  reconnect  |                 |  |
57  * |  | callback +<---------+   EVENT WORKER   +<------------+   POLL WORKER   |  |
58  * |  +----------+  |  |    +------------------+             +--------+--------+  |
59  * |                |  |                                              ^           |
60  * +----------------+-------------------------------------------------------------+
61  *                     |                                              |
62  *                 JSON|echo reply                                 raw|data
63  *                     v                                              v
64  * +-------------------+----------------------------------------------+-----------+
65  * |                                 TCP/UNIX socket                              |
66  * +-------------------------------------------------------------------------------
67  */
68 /* clang-format on */
69
70 /* collectd headers */
71 #include "collectd.h"
72
73 #include "common.h"
74
75 /* private headers */
76 #include "utils_ovs.h"
77
78 /* system libraries */
79 #if HAVE_NETDB_H
80 #include <netdb.h>
81 #endif
82 #if HAVE_ARPA_INET_H
83 #include <arpa/inet.h>
84 #endif
85 #if HAVE_POLL_H
86 #include <poll.h>
87 #endif
88 #if HAVE_SYS_UN_H
89 #include <sys/un.h>
90 #endif
91
92 #include <semaphore.h>
93
94 #define OVS_ERROR(fmt, ...)                                                    \
95   do {                                                                         \
96     ERROR("ovs_utils: " fmt, ##__VA_ARGS__);                                   \
97   } while (0)
98 #define OVS_DEBUG(fmt, ...)                                                    \
99   do {                                                                         \
100     DEBUG("%s:%d:%s(): " fmt, __FILE__, __LINE__, __FUNCTION__,                \
101           ##__VA_ARGS__);                                                      \
102   } while (0)
103
104 #define OVS_DB_POLL_TIMEOUT 1           /* poll receive timeout (sec) */
105 #define OVS_DB_POLL_READ_BLOCK_SIZE 512 /* read block size (bytes) */
106 #define OVS_DB_DEFAULT_DB_NAME "Open_vSwitch"
107
108 #define OVS_DB_EVENT_NONE 0
109 #define OVS_DB_EVENT_TIMEOUT 5 /* event thread timeout (sec) */
110 #define OVS_DB_EVENT_TERMINATE 1
111 #define OVS_DB_EVENT_CONN_ESTABLISHED 2
112 #define OVS_DB_EVENT_CONN_TERMINATED 3
113
114 #define OVS_DB_POLL_STATE_RUNNING 1
115 #define OVS_DB_POLL_STATE_EXITING 2
116
117 #define OVS_DB_SEND_REQ_TIMEOUT 5 /* send request timeout (sec) */
118
119 #define OVS_YAJL_CALL(func, ...)                                               \
120   do {                                                                         \
121     yajl_gen_ret = yajl_gen_status_ok;                                         \
122     if ((yajl_gen_ret = func(__VA_ARGS__)) != yajl_gen_status_ok)              \
123       goto yajl_gen_failure;                                                   \
124   } while (0)
125 #define OVS_YAJL_ERROR_BUFFER_SIZE 1024
126 #define OVS_ERROR_BUFF_SIZE 512
127 #define OVS_UID_STR_SIZE 17 /* 64-bit HEX string len + '\0' */
128
129 /* JSON reader internal data */
130 struct ovs_json_reader_s {
131   char *buff_ptr;
132   size_t buff_size;
133   size_t buff_offset;
134   size_t json_offset;
135 };
136 typedef struct ovs_json_reader_s ovs_json_reader_t;
137
138 /* Result callback declaration */
139 struct ovs_result_cb_s {
140   sem_t sync;
141   ovs_db_result_cb_t call;
142 };
143 typedef struct ovs_result_cb_s ovs_result_cb_t;
144
145 /* Table callback declaration */
146 struct ovs_table_cb_s {
147   ovs_db_table_cb_t call;
148 };
149 typedef struct ovs_table_cb_s ovs_table_cb_t;
150
151 /* Callback declaration */
152 struct ovs_callback_s {
153   uint64_t uid;
154   union {
155     ovs_result_cb_t result;
156     ovs_table_cb_t table;
157   };
158   struct ovs_callback_s *next;
159   struct ovs_callback_s *prev;
160 };
161 typedef struct ovs_callback_s ovs_callback_t;
162
163 /* Event thread data declaration */
164 struct ovs_event_thread_s {
165   pthread_t tid;
166   pthread_mutex_t mutex;
167   pthread_cond_t cond;
168   int value;
169 };
170 typedef struct ovs_event_thread_s ovs_event_thread_t;
171
172 /* Poll thread data declaration */
173 struct ovs_poll_thread_s {
174   pthread_t tid;
175   pthread_mutex_t mutex;
176   int state;
177 };
178 typedef struct ovs_poll_thread_s ovs_poll_thread_t;
179
180 /* OVS DB internal data declaration */
181 struct ovs_db_s {
182   ovs_poll_thread_t poll_thread;
183   ovs_event_thread_t event_thread;
184   pthread_mutex_t mutex;
185   ovs_callback_t *remote_cb;
186   ovs_db_callback_t cb;
187   char service[OVS_DB_ADDR_SERVICE_SIZE];
188   char node[OVS_DB_ADDR_NODE_SIZE];
189   char unix_path[OVS_DB_ADDR_NODE_SIZE];
190   int sock;
191 };
192
193 /* Global variables */
194 static uint64_t ovs_uid;
195 static pthread_mutex_t ovs_uid_mutex = PTHREAD_MUTEX_INITIALIZER;
196
197 /* Post an event to event thread.
198  * Possible events are:
199  *  OVS_DB_EVENT_TERMINATE
200  *  OVS_DB_EVENT_CONN_ESTABLISHED
201  *  OVS_DB_EVENT_CONN_TERMINATED
202  */
203 static void ovs_db_event_post(ovs_db_t *pdb, int event) {
204   pthread_mutex_lock(&pdb->event_thread.mutex);
205   pdb->event_thread.value = event;
206   pthread_mutex_unlock(&pdb->event_thread.mutex);
207   pthread_cond_signal(&pdb->event_thread.cond);
208 }
209
210 /* Check if POLL thread is still running. Returns
211  * 1 if running otherwise 0 is returned */
212 static bool ovs_db_poll_is_running(ovs_db_t *pdb) {
213   int state = 0;
214   pthread_mutex_lock(&pdb->poll_thread.mutex);
215   state = pdb->poll_thread.state;
216   pthread_mutex_unlock(&pdb->poll_thread.mutex);
217   return state == OVS_DB_POLL_STATE_RUNNING;
218 }
219
220 /* Generate unique identifier (UID). It is used by OVS DB API
221  * to set "id" field for any OVS DB JSON request. */
222 static uint64_t ovs_uid_generate() {
223   uint64_t new_uid;
224   pthread_mutex_lock(&ovs_uid_mutex);
225   new_uid = ++ovs_uid;
226   pthread_mutex_unlock(&ovs_uid_mutex);
227   return new_uid;
228 }
229
230 /*
231  * Callback API. These function are used to store
232  * registered callbacks in OVS DB API.
233  */
234
235 /* Add new callback into OVS DB object */
236 static void ovs_db_callback_add(ovs_db_t *pdb, ovs_callback_t *new_cb) {
237   pthread_mutex_lock(&pdb->mutex);
238   if (pdb->remote_cb)
239     pdb->remote_cb->prev = new_cb;
240   new_cb->next = pdb->remote_cb;
241   new_cb->prev = NULL;
242   pdb->remote_cb = new_cb;
243   pthread_mutex_unlock(&pdb->mutex);
244 }
245
246 /* Remove callback from OVS DB object */
247 static void ovs_db_callback_remove(ovs_db_t *pdb, ovs_callback_t *del_cb) {
248   pthread_mutex_lock(&pdb->mutex);
249   ovs_callback_t *pre_cb = del_cb->prev;
250   ovs_callback_t *next_cb = del_cb->next;
251
252   if (next_cb)
253     next_cb->prev = del_cb->prev;
254
255   if (pre_cb)
256     pre_cb->next = del_cb->next;
257   else
258     pdb->remote_cb = del_cb->next;
259
260   free(del_cb);
261   pthread_mutex_unlock(&pdb->mutex);
262 }
263
264 /* Remove all callbacks form OVS DB object */
265 static void ovs_db_callback_remove_all(ovs_db_t *pdb) {
266   pthread_mutex_lock(&pdb->mutex);
267   while (pdb->remote_cb != NULL) {
268     ovs_callback_t *del_cb = pdb->remote_cb;
269     pdb->remote_cb = del_cb->next;
270     sfree(del_cb);
271   }
272   pthread_mutex_unlock(&pdb->mutex);
273 }
274
275 /* Get/find callback in OVS DB object by UID. Returns pointer
276  * to requested callback otherwise NULL is returned.
277  *
278  * IMPORTANT NOTE:
279  *   The OVS DB mutex MUST be locked by the caller
280  *   to make sure that returned callback is still valid.
281  */
282 static ovs_callback_t *ovs_db_callback_get(ovs_db_t *pdb, uint64_t uid) {
283   for (ovs_callback_t *cb = pdb->remote_cb; cb != NULL; cb = cb->next)
284     if (cb->uid == uid)
285       return cb;
286   return NULL;
287 }
288
289 /* Send all requested data to the socket. Returns 0 if
290  * ALL request data has been sent otherwise negative value
291  * is returned */
292 static int ovs_db_data_send(const ovs_db_t *pdb, const char *data, size_t len) {
293   ssize_t nbytes = 0;
294   size_t rem = len;
295   size_t off = 0;
296
297   while (rem > 0) {
298     if ((nbytes = send(pdb->sock, data + off, rem, 0)) <= 0)
299       return -1;
300     rem -= (size_t)nbytes;
301     off += (size_t)nbytes;
302   }
303   return 0;
304 }
305
306 /*
307  * YAJL (Yet Another JSON Library) helper functions
308  * Documentation (https://lloyd.github.io/yajl/)
309  */
310
311 /* Add null-terminated string into YAJL generator handle (JSON object).
312  * Similar function to yajl_gen_string() but takes null-terminated string
313  * instead of string and its length.
314  *
315  * jgen   - YAJL generator handle allocated by yajl_gen_alloc()
316  * string - Null-terminated string
317  */
318 static yajl_gen_status ovs_yajl_gen_tstring(yajl_gen hander,
319                                             const char *string) {
320   return yajl_gen_string(hander, (const unsigned char *)string, strlen(string));
321 }
322
323 /* Add YAJL value into YAJL generator handle (JSON object)
324  *
325  * jgen - YAJL generator handle allocated by yajl_gen_alloc()
326  * jval - YAJL value usually returned by yajl_tree_get()
327  */
328 static yajl_gen_status ovs_yajl_gen_val(yajl_gen jgen, yajl_val jval) {
329   size_t array_len = 0;
330   yajl_val *jvalues = NULL;
331   yajl_val jobj_value = NULL;
332   const char *obj_key = NULL;
333   size_t obj_len = 0;
334   yajl_gen_status yajl_gen_ret = yajl_gen_status_ok;
335
336   if (jval == NULL)
337     return yajl_gen_generation_complete;
338
339   if (YAJL_IS_STRING(jval))
340     OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, YAJL_GET_STRING(jval));
341   else if (YAJL_IS_DOUBLE(jval))
342     OVS_YAJL_CALL(yajl_gen_double, jgen, YAJL_GET_DOUBLE(jval));
343   else if (YAJL_IS_INTEGER(jval))
344     OVS_YAJL_CALL(yajl_gen_double, jgen, YAJL_GET_INTEGER(jval));
345   else if (YAJL_IS_TRUE(jval))
346     OVS_YAJL_CALL(yajl_gen_bool, jgen, 1);
347   else if (YAJL_IS_FALSE(jval))
348     OVS_YAJL_CALL(yajl_gen_bool, jgen, 0);
349   else if (YAJL_IS_NULL(jval))
350     OVS_YAJL_CALL(yajl_gen_null, jgen);
351   else if (YAJL_IS_ARRAY(jval)) {
352     /* create new array and add all elements into the array */
353     array_len = YAJL_GET_ARRAY(jval)->len;
354     jvalues = YAJL_GET_ARRAY(jval)->values;
355     OVS_YAJL_CALL(yajl_gen_array_open, jgen);
356     for (size_t i = 0; i < array_len; i++)
357       OVS_YAJL_CALL(ovs_yajl_gen_val, jgen, jvalues[i]);
358     OVS_YAJL_CALL(yajl_gen_array_close, jgen);
359   } else if (YAJL_IS_OBJECT(jval)) {
360     /* create new object and add all elements into the object */
361     OVS_YAJL_CALL(yajl_gen_map_open, jgen);
362     obj_len = YAJL_GET_OBJECT(jval)->len;
363     for (size_t i = 0; i < obj_len; i++) {
364       obj_key = YAJL_GET_OBJECT(jval)->keys[i];
365       jobj_value = YAJL_GET_OBJECT(jval)->values[i];
366       OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, obj_key);
367       OVS_YAJL_CALL(ovs_yajl_gen_val, jgen, jobj_value);
368     }
369     OVS_YAJL_CALL(yajl_gen_map_close, jgen);
370   } else {
371     OVS_ERROR("%s() unsupported value type %d (skip)", __FUNCTION__,
372               (int)(jval)->type);
373     goto yajl_gen_failure;
374   }
375   return yajl_gen_status_ok;
376
377 yajl_gen_failure:
378   OVS_ERROR("%s() error to generate value", __FUNCTION__);
379   return yajl_gen_ret;
380 }
381
382 /* OVS DB echo request handler. When OVS DB sends
383  * "echo" request to the client, client should generate
384  * "echo" replay with the same content received in the
385  * request */
386 static int ovs_db_table_echo_cb(const ovs_db_t *pdb, yajl_val jnode) {
387   yajl_val jparams;
388   yajl_val jid;
389   yajl_gen jgen;
390   size_t resp_len = 0;
391   const char *resp = NULL;
392   const char *params_path[] = {"params", NULL};
393   const char *id_path[] = {"id", NULL};
394   yajl_gen_status yajl_gen_ret;
395
396   if ((jgen = yajl_gen_alloc(NULL)) == NULL)
397     return -1;
398
399   /* check & get request attributes */
400   if ((jparams = yajl_tree_get(jnode, params_path, yajl_t_array)) == NULL ||
401       ((jid = yajl_tree_get(jnode, id_path, yajl_t_any)) == NULL)) {
402     OVS_ERROR("parse echo request failed");
403     goto yajl_gen_failure;
404   }
405
406   /* generate JSON echo response */
407   OVS_YAJL_CALL(yajl_gen_map_open, jgen);
408
409   OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "result");
410   OVS_YAJL_CALL(ovs_yajl_gen_val, jgen, jparams);
411
412   OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "error");
413   OVS_YAJL_CALL(yajl_gen_null, jgen);
414
415   OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "id");
416   OVS_YAJL_CALL(ovs_yajl_gen_val, jgen, jid);
417
418   OVS_YAJL_CALL(yajl_gen_map_close, jgen);
419   OVS_YAJL_CALL(yajl_gen_get_buf, jgen, (const unsigned char **)&resp,
420                 &resp_len);
421
422   /* send the response */
423   OVS_DEBUG("response: %s", resp);
424   if (ovs_db_data_send(pdb, resp, resp_len) < 0) {
425     OVS_ERROR("send echo reply failed");
426     goto yajl_gen_failure;
427   }
428   /* clean up and return success */
429   yajl_gen_clear(jgen);
430   return 0;
431
432 yajl_gen_failure:
433   /* release memory */
434   yajl_gen_clear(jgen);
435   return -1;
436 }
437
438 /* Get OVS DB registered callback by YAJL val. The YAJL
439  * value should be YAJL string (UID). Returns NULL if
440  * callback hasn't been found. See also ovs_db_callback_get()
441  * description for addition info.
442  */
443 static ovs_callback_t *ovs_db_table_callback_get(ovs_db_t *pdb, yajl_val jid) {
444   char *endptr = NULL;
445   const char *suid = NULL;
446   uint64_t uid;
447
448   if (jid && YAJL_IS_STRING(jid)) {
449     suid = YAJL_GET_STRING(jid);
450     uid = (uint64_t)strtoul(suid, &endptr, 16);
451     if (*endptr == '\0' && uid)
452       return ovs_db_callback_get(pdb, uid);
453   }
454
455   return NULL;
456 }
457
458 /* OVS DB table update event handler.
459  * This callback is called by POLL thread if OVS DB
460  * table update callback is received from the DB
461  * server. Once registered callback found, it's called
462  * by this handler. */
463 static int ovs_db_table_update_cb(ovs_db_t *pdb, yajl_val jnode) {
464   ovs_callback_t *cb = NULL;
465   yajl_val jvalue;
466   yajl_val jparams;
467   yajl_val jtable_updates;
468   const char *params_path[] = {"params", NULL};
469   const char *id_path[] = {"id", NULL};
470
471   /* check & get request attributes */
472   if ((jparams = yajl_tree_get(jnode, params_path, yajl_t_array)) == NULL ||
473       (yajl_tree_get(jnode, id_path, yajl_t_null) == NULL)) {
474     OVS_ERROR("invalid OVS DB request received");
475     return -1;
476   }
477
478   /* check array length: [<json-value>, <table-updates>] */
479   if ((YAJL_GET_ARRAY(jparams) == NULL) ||
480       (YAJL_GET_ARRAY(jparams)->len != 2)) {
481     OVS_ERROR("invalid OVS DB request received");
482     return -1;
483   }
484
485   jvalue = YAJL_GET_ARRAY(jparams)->values[0];
486   jtable_updates = YAJL_GET_ARRAY(jparams)->values[1];
487   if ((!YAJL_IS_OBJECT(jtable_updates)) || (!YAJL_IS_STRING(jvalue))) {
488     OVS_ERROR("invalid OVS DB request id or table update received");
489     return -1;
490   }
491
492   /* find registered callback based on <json-value> */
493   pthread_mutex_lock(&pdb->mutex);
494   cb = ovs_db_table_callback_get(pdb, jvalue);
495   if (cb == NULL || cb->table.call == NULL) {
496     OVS_ERROR("No OVS DB table update callback found");
497     pthread_mutex_unlock(&pdb->mutex);
498     return -1;
499   }
500
501   /* call registered callback */
502   cb->table.call(jtable_updates);
503   pthread_mutex_unlock(&pdb->mutex);
504   return 0;
505 }
506
507 /* OVS DB result request handler.
508  * This callback is called by POLL thread if OVS DB
509  * result reply is received from the DB server.
510  * Once registered callback found, it's called
511  * by this handler. */
512 static int ovs_db_result_cb(ovs_db_t *pdb, yajl_val jnode) {
513   ovs_callback_t *cb = NULL;
514   yajl_val jresult;
515   yajl_val jerror;
516   yajl_val jid;
517   const char *result_path[] = {"result", NULL};
518   const char *error_path[] = {"error", NULL};
519   const char *id_path[] = {"id", NULL};
520
521   jresult = yajl_tree_get(jnode, result_path, yajl_t_any);
522   jerror = yajl_tree_get(jnode, error_path, yajl_t_any);
523   jid = yajl_tree_get(jnode, id_path, yajl_t_string);
524
525   /* check & get result attributes */
526   if (!jresult || !jerror || !jid)
527     return -1;
528
529   /* try to find registered callback */
530   pthread_mutex_lock(&pdb->mutex);
531   cb = ovs_db_table_callback_get(pdb, jid);
532   if (cb != NULL && cb->result.call != NULL) {
533     /* call registered callback */
534     cb->result.call(jresult, jerror);
535     /* unlock owner of the reply */
536     sem_post(&cb->result.sync);
537   }
538
539   pthread_mutex_unlock(&pdb->mutex);
540   return 0;
541 }
542
543 /* Handle JSON data (one request) and call
544  * appropriate event OVS DB handler. Currently,
545  * update callback 'ovs_db_table_update_cb' and
546  * result callback 'ovs_db_result_cb' is supported.
547  */
548 static int ovs_db_json_data_process(ovs_db_t *pdb, const char *data,
549                                     size_t len) {
550   const char *method = NULL;
551   char yajl_errbuf[OVS_YAJL_ERROR_BUFFER_SIZE];
552   const char *method_path[] = {"method", NULL};
553   const char *result_path[] = {"result", NULL};
554   char *sjson = NULL;
555   yajl_val jnode, jval;
556
557   /* duplicate the data to make null-terminated string
558    * required for yajl_tree_parse() */
559   if ((sjson = calloc(1, len + 1)) == NULL)
560     return -1;
561
562   sstrncpy(sjson, data, len + 1);
563   OVS_DEBUG("[len=%" PRIsz "] %s", len, sjson);
564
565   /* parse json data */
566   jnode = yajl_tree_parse(sjson, yajl_errbuf, sizeof(yajl_errbuf));
567   if (jnode == NULL) {
568     OVS_ERROR("yajl_tree_parse() %s", yajl_errbuf);
569     sfree(sjson);
570     return -1;
571   }
572
573   /* get method name */
574   if ((jval = yajl_tree_get(jnode, method_path, yajl_t_string)) != NULL) {
575     if ((method = YAJL_GET_STRING(jval)) == NULL) {
576       yajl_tree_free(jnode);
577       sfree(sjson);
578       return -1;
579     }
580     if (strcmp("echo", method) == 0) {
581       /* echo request from the server */
582       if (ovs_db_table_echo_cb(pdb, jnode) < 0)
583         OVS_ERROR("handle echo request failed");
584     } else if (strcmp("update", method) == 0) {
585       /* update notification */
586       if (ovs_db_table_update_cb(pdb, jnode) < 0)
587         OVS_ERROR("handle update notification failed");
588     }
589   } else if ((jval = yajl_tree_get(jnode, result_path, yajl_t_any)) != NULL) {
590     /* result notification */
591     if (ovs_db_result_cb(pdb, jnode) < 0)
592       OVS_ERROR("handle result reply failed");
593   } else
594     OVS_ERROR("connot find method or result failed");
595
596   /* release memory */
597   yajl_tree_free(jnode);
598   sfree(sjson);
599   return 0;
600 }
601
602 /*
603  * JSON reader implementation.
604  *
605  * This module process raw JSON data (byte stream) and
606  * returns fully-fledged JSON data which can be processed
607  * (parsed) by YAJL later.
608  */
609
610 /* Allocate JSON reader instance */
611 static ovs_json_reader_t *ovs_json_reader_alloc() {
612   ovs_json_reader_t *jreader = NULL;
613
614   if ((jreader = calloc(sizeof(ovs_json_reader_t), 1)) == NULL)
615     return NULL;
616
617   return jreader;
618 }
619
620 /* Push raw data into into the JSON reader for processing */
621 static int ovs_json_reader_push_data(ovs_json_reader_t *jreader,
622                                      const char *data, size_t data_len) {
623   char *new_buff = NULL;
624   size_t available = jreader->buff_size - jreader->buff_offset;
625
626   /* check/update required memory space */
627   if (available < data_len) {
628     OVS_DEBUG("Reallocate buffer [size=%d, available=%d required=%d]",
629               (int)jreader->buff_size, (int)available, (int)data_len);
630
631     /* allocate new chunk of memory */
632     new_buff = realloc(jreader->buff_ptr, (jreader->buff_size + data_len));
633     if (new_buff == NULL)
634       return -1;
635
636     /* point to new allocated memory */
637     jreader->buff_ptr = new_buff;
638     jreader->buff_size += data_len;
639   }
640
641   /* store input data */
642   memcpy(jreader->buff_ptr + jreader->buff_offset, data, data_len);
643   jreader->buff_offset += data_len;
644   return 0;
645 }
646
647 /* Pop one fully-fledged JSON if already exists. Returns 0 if
648  * completed JSON already exists otherwise negative value is
649  * returned */
650 static int ovs_json_reader_pop(ovs_json_reader_t *jreader,
651                                const char **json_ptr, size_t *json_len_ptr) {
652   size_t nbraces = 0;
653   size_t json_len = 0;
654   char *json = NULL;
655
656   /* search open/close brace */
657   for (size_t i = jreader->json_offset; i < jreader->buff_offset; i++) {
658     if (jreader->buff_ptr[i] == '{') {
659       nbraces++;
660     } else if (jreader->buff_ptr[i] == '}')
661       if (nbraces)
662         if (!(--nbraces)) {
663           /* JSON data */
664           *json_ptr = jreader->buff_ptr + jreader->json_offset;
665           *json_len_ptr = json_len + 1;
666           jreader->json_offset = i + 1;
667           return 0;
668         }
669
670     /* increase JSON data length */
671     if (nbraces)
672       json_len++;
673   }
674
675   if (jreader->json_offset) {
676     if (jreader->json_offset < jreader->buff_offset) {
677       /* shift data to the beginning of the buffer
678        * and zero rest of the buffer data */
679       json = &jreader->buff_ptr[jreader->json_offset];
680       json_len = jreader->buff_offset - jreader->json_offset;
681       for (size_t i = 0; i < jreader->buff_size; i++)
682         jreader->buff_ptr[i] = ((i < json_len) ? (json[i]) : (0));
683       jreader->buff_offset = json_len;
684     } else
685       /* reset the buffer */
686       jreader->buff_offset = 0;
687
688     /* data is at the beginning of the buffer */
689     jreader->json_offset = 0;
690   }
691
692   return -1;
693 }
694
695 /* Reset JSON reader. It is useful when start processing
696  * new raw data. E.g.: in case of lost stream connection.
697  */
698 static void ovs_json_reader_reset(ovs_json_reader_t *jreader) {
699   if (jreader) {
700     jreader->buff_offset = 0;
701     jreader->json_offset = 0;
702   }
703 }
704
705 /* Release internal data allocated for JSON reader */
706 static void ovs_json_reader_free(ovs_json_reader_t *jreader) {
707   if (jreader) {
708     free(jreader->buff_ptr);
709     free(jreader);
710   }
711 }
712
713 /* Reconnect to OVS DB and call the OVS DB post connection init callback
714  * if connection has been established.
715  */
716 static void ovs_db_reconnect(ovs_db_t *pdb) {
717   const char *node_info = pdb->node;
718   struct addrinfo *result;
719
720   if (pdb->unix_path[0] != '\0') {
721     /* use UNIX socket instead of INET address */
722     node_info = pdb->unix_path;
723     result = calloc(1, sizeof(struct addrinfo));
724     struct sockaddr_un *sa_unix = calloc(1, sizeof(struct sockaddr_un));
725     if (result == NULL || sa_unix == NULL) {
726       sfree(result);
727       sfree(sa_unix);
728       return;
729     }
730     result->ai_family = AF_UNIX;
731     result->ai_socktype = SOCK_STREAM;
732     result->ai_addrlen = sizeof(*sa_unix);
733     result->ai_addr = (struct sockaddr *)sa_unix;
734     sa_unix->sun_family = result->ai_family;
735     sstrncpy(sa_unix->sun_path, pdb->unix_path, sizeof(sa_unix->sun_path));
736   } else {
737     /* inet socket address */
738     struct addrinfo hints;
739
740     /* setup criteria for selecting the socket address */
741     memset(&hints, 0, sizeof(hints));
742     hints.ai_family = AF_UNSPEC;
743     hints.ai_socktype = SOCK_STREAM;
744
745     /* get socket addresses */
746     int ret = getaddrinfo(pdb->node, pdb->service, &hints, &result);
747     if (ret != 0) {
748       OVS_ERROR("getaddrinfo(): %s", gai_strerror(ret));
749       return;
750     }
751   }
752   /* try to connect to the server */
753   for (struct addrinfo *rp = result; rp != NULL; rp = rp->ai_next) {
754     int sock = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
755     if (sock < 0) {
756       OVS_DEBUG("socket(): %s", STRERRNO);
757       continue;
758     }
759     if (connect(sock, rp->ai_addr, rp->ai_addrlen) < 0) {
760       close(sock);
761       OVS_DEBUG("connect(): %s [family=%d]", STRERRNO, rp->ai_family);
762     } else {
763       /* send notification to event thread */
764       pdb->sock = sock;
765       ovs_db_event_post(pdb, OVS_DB_EVENT_CONN_ESTABLISHED);
766       break;
767     }
768   }
769
770   if (pdb->sock < 0)
771     OVS_ERROR("connect to \"%s\" failed", node_info);
772
773   freeaddrinfo(result);
774 }
775
776 /* POLL worker thread.
777  * It listens on OVS DB connection for incoming
778  * requests/reply/events etc. Also, it reconnects to OVS DB
779  * if connection has been lost.
780  */
781 static void *ovs_poll_worker(void *arg) {
782   ovs_db_t *pdb = (ovs_db_t *)arg; /* pointer to OVS DB */
783   ovs_json_reader_t *jreader = NULL;
784   struct pollfd poll_fd = {
785       .fd = pdb->sock, .events = POLLIN | POLLPRI, .revents = 0,
786   };
787
788   /* create JSON reader instance */
789   if ((jreader = ovs_json_reader_alloc()) == NULL) {
790     OVS_ERROR("initialize json reader failed");
791     return NULL;
792   }
793
794   /* poll data */
795   while (ovs_db_poll_is_running(pdb)) {
796     poll_fd.fd = pdb->sock;
797     int poll_ret = poll(&poll_fd, 1, /* ms */ OVS_DB_POLL_TIMEOUT * 1000);
798     if (poll_ret < 0) {
799       OVS_ERROR("poll(): %s", STRERRNO);
800       break;
801     } else if (poll_ret == 0) {
802       OVS_DEBUG("poll(): timeout");
803       if (pdb->sock < 0)
804         /* invalid fd, so try to reconnect */
805         ovs_db_reconnect(pdb);
806       continue;
807     }
808     if (poll_fd.revents & POLLNVAL) {
809       /* invalid file descriptor, clean-up */
810       ovs_db_callback_remove_all(pdb);
811       ovs_json_reader_reset(jreader);
812       /* setting poll FD to -1 tells poll() call to ignore this FD.
813        * In that case poll() call will return timeout all the time */
814       pdb->sock = (-1);
815     } else if ((poll_fd.revents & POLLERR) || (poll_fd.revents & POLLHUP)) {
816       /* connection is broken */
817       close(poll_fd.fd);
818       ovs_db_event_post(pdb, OVS_DB_EVENT_CONN_TERMINATED);
819       OVS_ERROR("poll() peer closed its end of the channel");
820     } else if ((poll_fd.revents & POLLIN) || (poll_fd.revents & POLLPRI)) {
821       /* read incoming data */
822       char buff[OVS_DB_POLL_READ_BLOCK_SIZE];
823       ssize_t nbytes = recv(poll_fd.fd, buff, sizeof(buff), 0);
824       if (nbytes < 0) {
825         OVS_ERROR("recv(): %s", STRERRNO);
826         /* read error? Try to reconnect */
827         close(poll_fd.fd);
828         continue;
829       } else if (nbytes == 0) {
830         close(poll_fd.fd);
831         ovs_db_event_post(pdb, OVS_DB_EVENT_CONN_TERMINATED);
832         OVS_ERROR("recv() peer has performed an orderly shutdown");
833         continue;
834       }
835       /* read incoming data */
836       size_t json_len = 0;
837       const char *json = NULL;
838       OVS_DEBUG("recv(): received %zd bytes of data", nbytes);
839       ovs_json_reader_push_data(jreader, buff, nbytes);
840       while (!ovs_json_reader_pop(jreader, &json, &json_len))
841         /* process JSON data */
842         ovs_db_json_data_process(pdb, json, json_len);
843     }
844   }
845
846   OVS_DEBUG("poll thread has been completed");
847   ovs_json_reader_free(jreader);
848   return NULL;
849 }
850
851 /* EVENT worker thread.
852  * Perform task based on incoming events. This
853  * task can be done asynchronously which allows to
854  * handle OVS DB callback like 'init_cb'.
855  */
856 static void *ovs_event_worker(void *arg) {
857   ovs_db_t *pdb = (ovs_db_t *)arg;
858
859   while (pdb->event_thread.value != OVS_DB_EVENT_TERMINATE) {
860     /* wait for an event */
861     struct timespec ts;
862     clock_gettime(CLOCK_REALTIME, &ts);
863     ts.tv_sec += (OVS_DB_EVENT_TIMEOUT);
864     int ret = pthread_cond_timedwait(&pdb->event_thread.cond,
865                                      &pdb->event_thread.mutex, &ts);
866     if (!ret || ret == ETIMEDOUT) {
867       /* handle the event */
868       OVS_DEBUG("handle event %d", pdb->event_thread.value);
869       switch (pdb->event_thread.value) {
870       case OVS_DB_EVENT_CONN_ESTABLISHED:
871         if (pdb->cb.post_conn_init)
872           pdb->cb.post_conn_init(pdb);
873         /* reset event */
874         pdb->event_thread.value = OVS_DB_EVENT_NONE;
875         break;
876       case OVS_DB_EVENT_CONN_TERMINATED:
877         if (pdb->cb.post_conn_terminate)
878           pdb->cb.post_conn_terminate();
879         /* reset event */
880         pdb->event_thread.value = OVS_DB_EVENT_NONE;
881         break;
882       case OVS_DB_EVENT_NONE:
883         /* wait timeout */
884         OVS_DEBUG("no event received (timeout)");
885         break;
886       default:
887         OVS_DEBUG("unknown event received");
888         break;
889       }
890     } else {
891       /* unexpected error */
892       OVS_ERROR("pthread_cond_timedwait() failed");
893       break;
894     }
895   }
896
897   OVS_DEBUG("event thread has been completed");
898   return NULL;
899 }
900
901 /* Initialize EVENT thread */
902 static int ovs_db_event_thread_init(ovs_db_t *pdb) {
903   pdb->event_thread.tid = (pthread_t){0};
904   /* init event thread condition variable */
905   if (pthread_cond_init(&pdb->event_thread.cond, NULL)) {
906     return -1;
907   }
908   /* init event thread mutex */
909   if (pthread_mutex_init(&pdb->event_thread.mutex, NULL)) {
910     pthread_cond_destroy(&pdb->event_thread.cond);
911     return -1;
912   }
913   /* Hold the event thread mutex. It ensures that no events
914    * will be lost while thread is still starting. Once event
915    * thread is started and ready to accept events, it will release
916    * the mutex */
917   if (pthread_mutex_lock(&pdb->event_thread.mutex)) {
918     pthread_mutex_destroy(&pdb->event_thread.mutex);
919     pthread_cond_destroy(&pdb->event_thread.cond);
920     return -1;
921   }
922   /* start event thread */
923   pthread_t tid;
924   if (plugin_thread_create(&tid, NULL, ovs_event_worker, pdb,
925                            "utils_ovs:event") != 0) {
926     pthread_mutex_unlock(&pdb->event_thread.mutex);
927     pthread_mutex_destroy(&pdb->event_thread.mutex);
928     pthread_cond_destroy(&pdb->event_thread.cond);
929     return -1;
930   }
931   pdb->event_thread.tid = tid;
932   return 0;
933 }
934
935 /* Terminate EVENT thread */
936 static int ovs_db_event_thread_terminate(ovs_db_t *pdb) {
937   if (pthread_equal(pdb->event_thread.tid, (pthread_t){0})) {
938     /* already terminated */
939     return 0;
940   }
941   ovs_db_event_post(pdb, OVS_DB_EVENT_TERMINATE);
942   if (pthread_join(pdb->event_thread.tid, NULL) != 0)
943     return -1;
944   /* Event thread always holds the thread mutex when
945    * performs some task (handles event) and releases it when
946    * while sleeping. Thus, if event thread exits, the mutex
947    * remains locked */
948   pdb->event_thread.tid = (pthread_t){0};
949   pthread_mutex_unlock(&pdb->event_thread.mutex);
950   return 0;
951 }
952
953 /* Destroy EVENT thread private data */
954 static void ovs_db_event_thread_data_destroy(ovs_db_t *pdb) {
955   /* destroy mutex */
956   pthread_mutex_destroy(&pdb->event_thread.mutex);
957   pthread_cond_destroy(&pdb->event_thread.cond);
958 }
959
960 /* Initialize POLL thread */
961 static int ovs_db_poll_thread_init(ovs_db_t *pdb) {
962   pdb->poll_thread.tid = (pthread_t){0};
963   /* init event thread mutex */
964   if (pthread_mutex_init(&pdb->poll_thread.mutex, NULL)) {
965     return -1;
966   }
967   /* start poll thread */
968   pthread_t tid;
969   pdb->poll_thread.state = OVS_DB_POLL_STATE_RUNNING;
970   if (plugin_thread_create(&tid, NULL, ovs_poll_worker, pdb,
971                            "utils_ovs:poll") != 0) {
972     pthread_mutex_destroy(&pdb->poll_thread.mutex);
973     return -1;
974   }
975   pdb->poll_thread.tid = tid;
976   return 0;
977 }
978
979 /* Destroy POLL thread */
980 /* XXX: Must hold pdb->mutex when calling! */
981 static int ovs_db_poll_thread_destroy(ovs_db_t *pdb) {
982   if (pthread_equal(pdb->poll_thread.tid, (pthread_t){0})) {
983     /* already destroyed */
984     return 0;
985   }
986   /* change thread state */
987   pthread_mutex_lock(&pdb->poll_thread.mutex);
988   pdb->poll_thread.state = OVS_DB_POLL_STATE_EXITING;
989   pthread_mutex_unlock(&pdb->poll_thread.mutex);
990   /* join the thread */
991   if (pthread_join(pdb->poll_thread.tid, NULL) != 0)
992     return -1;
993   pthread_mutex_destroy(&pdb->poll_thread.mutex);
994   pdb->poll_thread.tid = (pthread_t){0};
995   return 0;
996 }
997
998 /*
999  * Public OVS DB API implementation
1000  */
1001
1002 ovs_db_t *ovs_db_init(const char *node, const char *service,
1003                       const char *unix_path, ovs_db_callback_t *cb) {
1004   int ret;
1005
1006   /* sanity check */
1007   if (node == NULL || service == NULL || unix_path == NULL)
1008     return NULL;
1009
1010   /* allocate db data & fill it */
1011   ovs_db_t *pdb = calloc(1, sizeof(*pdb));
1012   if (pdb == NULL)
1013     return NULL;
1014   pdb->sock = -1;
1015
1016   /* store the OVS DB address */
1017   sstrncpy(pdb->node, node, sizeof(pdb->node));
1018   sstrncpy(pdb->service, service, sizeof(pdb->service));
1019   sstrncpy(pdb->unix_path, unix_path, sizeof(pdb->unix_path));
1020
1021   /* setup OVS DB callbacks */
1022   if (cb)
1023     pdb->cb = *cb;
1024
1025   /* init OVS DB mutex attributes */
1026   pthread_mutexattr_t mutex_attr;
1027   if (pthread_mutexattr_init(&mutex_attr)) {
1028     OVS_ERROR("OVS DB mutex attribute init failed");
1029     sfree(pdb);
1030     return NULL;
1031   }
1032   /* set OVS DB mutex as recursive */
1033   if (pthread_mutexattr_settype(&mutex_attr, PTHREAD_MUTEX_RECURSIVE)) {
1034     OVS_ERROR("Failed to set OVS DB mutex as recursive");
1035     pthread_mutexattr_destroy(&mutex_attr);
1036     sfree(pdb);
1037     return NULL;
1038   }
1039   /* init OVS DB mutex */
1040   if (pthread_mutex_init(&pdb->mutex, &mutex_attr)) {
1041     OVS_ERROR("OVS DB mutex init failed");
1042     pthread_mutexattr_destroy(&mutex_attr);
1043     sfree(pdb);
1044     return NULL;
1045   }
1046   /* destroy mutex attributes */
1047   pthread_mutexattr_destroy(&mutex_attr);
1048
1049   /* init event thread */
1050   if (ovs_db_event_thread_init(pdb) < 0) {
1051     ret = ovs_db_destroy(pdb);
1052     if (ret > 0)
1053       goto failure;
1054   }
1055
1056   /* init polling thread */
1057   if (ovs_db_poll_thread_init(pdb) < 0) {
1058     ret = ovs_db_destroy(pdb);
1059     if (ret > 0) {
1060       ovs_db_event_thread_data_destroy(pdb);
1061       goto failure;
1062     }
1063   }
1064   return pdb;
1065
1066 failure:
1067   pthread_mutex_destroy(&pdb->mutex);
1068   sfree(pdb);
1069   return NULL;
1070 }
1071
1072 int ovs_db_send_request(ovs_db_t *pdb, const char *method, const char *params,
1073                         ovs_db_result_cb_t cb) {
1074   int ret = 0;
1075   yajl_gen_status yajl_gen_ret;
1076   yajl_val jparams;
1077   yajl_gen jgen;
1078   ovs_callback_t *new_cb = NULL;
1079   uint64_t uid;
1080   char uid_buff[OVS_UID_STR_SIZE];
1081   const char *req = NULL;
1082   size_t req_len = 0;
1083   struct timespec ts;
1084
1085   /* sanity check */
1086   if (!pdb || !method || !params)
1087     return -1;
1088
1089   if ((jgen = yajl_gen_alloc(NULL)) == NULL)
1090     return -1;
1091
1092   /* try to parse params */
1093   if ((jparams = yajl_tree_parse(params, NULL, 0)) == NULL) {
1094     OVS_ERROR("params is not a JSON string");
1095     yajl_gen_clear(jgen);
1096     return -1;
1097   }
1098
1099   /* generate method field */
1100   OVS_YAJL_CALL(yajl_gen_map_open, jgen);
1101
1102   OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "method");
1103   OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, method);
1104
1105   /* generate params field */
1106   OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "params");
1107   OVS_YAJL_CALL(ovs_yajl_gen_val, jgen, jparams);
1108   yajl_tree_free(jparams);
1109
1110   /* generate id field */
1111   OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "id");
1112   uid = ovs_uid_generate();
1113   snprintf(uid_buff, sizeof(uid_buff), "%" PRIX64, uid);
1114   OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, uid_buff);
1115
1116   OVS_YAJL_CALL(yajl_gen_map_close, jgen);
1117
1118   if (cb) {
1119     /* register result callback */
1120     if ((new_cb = calloc(1, sizeof(ovs_callback_t))) == NULL)
1121       goto yajl_gen_failure;
1122
1123     /* add new callback to front */
1124     sem_init(&new_cb->result.sync, 0, 0);
1125     new_cb->result.call = cb;
1126     new_cb->uid = uid;
1127     ovs_db_callback_add(pdb, new_cb);
1128   }
1129
1130   /* send the request */
1131   OVS_YAJL_CALL(yajl_gen_get_buf, jgen, (const unsigned char **)&req, &req_len);
1132   OVS_DEBUG("%s", req);
1133   if (!ovs_db_data_send(pdb, req, req_len)) {
1134     if (cb) {
1135       /* wait for result */
1136       clock_gettime(CLOCK_REALTIME, &ts);
1137       ts.tv_sec += OVS_DB_SEND_REQ_TIMEOUT;
1138       if (sem_timedwait(&new_cb->result.sync, &ts) < 0) {
1139         OVS_ERROR("%s() no replay received within %d sec", __FUNCTION__,
1140                   OVS_DB_SEND_REQ_TIMEOUT);
1141         ret = (-1);
1142       }
1143     }
1144   } else {
1145     OVS_ERROR("ovs_db_data_send() failed");
1146     ret = (-1);
1147   }
1148
1149 yajl_gen_failure:
1150   if (new_cb) {
1151     /* destroy callback */
1152     sem_destroy(&new_cb->result.sync);
1153     ovs_db_callback_remove(pdb, new_cb);
1154   }
1155
1156   /* release memory */
1157   yajl_gen_clear(jgen);
1158   return (yajl_gen_ret != yajl_gen_status_ok) ? (-1) : ret;
1159 }
1160
1161 int ovs_db_table_cb_register(ovs_db_t *pdb, const char *tb_name,
1162                              const char **tb_column,
1163                              ovs_db_table_cb_t update_cb,
1164                              ovs_db_result_cb_t result_cb, unsigned int flags) {
1165   yajl_gen jgen;
1166   yajl_gen_status yajl_gen_ret;
1167   ovs_callback_t *new_cb = NULL;
1168   char uid_str[OVS_UID_STR_SIZE];
1169   char *params;
1170   size_t params_len;
1171   int ovs_db_ret = 0;
1172
1173   /* sanity check */
1174   if (pdb == NULL || tb_name == NULL || update_cb == NULL)
1175     return -1;
1176
1177   /* allocate new update callback */
1178   if ((new_cb = calloc(1, sizeof(ovs_callback_t))) == NULL)
1179     return -1;
1180
1181   /* init YAJL generator */
1182   if ((jgen = yajl_gen_alloc(NULL)) == NULL) {
1183     sfree(new_cb);
1184     return -1;
1185   }
1186
1187   /* add new callback to front */
1188   new_cb->table.call = update_cb;
1189   new_cb->uid = ovs_uid_generate();
1190   ovs_db_callback_add(pdb, new_cb);
1191
1192   /* make update notification request
1193    * [<db-name>, <json-value>, <monitor-requests>] */
1194   OVS_YAJL_CALL(yajl_gen_array_open, jgen);
1195   {
1196     OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, OVS_DB_DEFAULT_DB_NAME);
1197
1198     /* uid string <json-value> */
1199     snprintf(uid_str, sizeof(uid_str), "%" PRIX64, new_cb->uid);
1200     OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, uid_str);
1201
1202     /* <monitor-requests> */
1203     OVS_YAJL_CALL(yajl_gen_map_open, jgen);
1204     {
1205       OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, tb_name);
1206       OVS_YAJL_CALL(yajl_gen_array_open, jgen);
1207       {
1208         /* <monitor-request> */
1209         OVS_YAJL_CALL(yajl_gen_map_open, jgen);
1210         {
1211           if (tb_column) {
1212             /* columns within the table to be monitored */
1213             OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "columns");
1214             OVS_YAJL_CALL(yajl_gen_array_open, jgen);
1215             for (; *tb_column; tb_column++)
1216               OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, *tb_column);
1217             OVS_YAJL_CALL(yajl_gen_array_close, jgen);
1218           }
1219           /* specify select option */
1220           OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "select");
1221           {
1222             OVS_YAJL_CALL(yajl_gen_map_open, jgen);
1223             {
1224               OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "initial");
1225               OVS_YAJL_CALL(yajl_gen_bool, jgen,
1226                             flags & OVS_DB_TABLE_CB_FLAG_INITIAL);
1227               OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "insert");
1228               OVS_YAJL_CALL(yajl_gen_bool, jgen,
1229                             flags & OVS_DB_TABLE_CB_FLAG_INSERT);
1230               OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "delete");
1231               OVS_YAJL_CALL(yajl_gen_bool, jgen,
1232                             flags & OVS_DB_TABLE_CB_FLAG_DELETE);
1233               OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "modify");
1234               OVS_YAJL_CALL(yajl_gen_bool, jgen,
1235                             flags & OVS_DB_TABLE_CB_FLAG_MODIFY);
1236             }
1237             OVS_YAJL_CALL(yajl_gen_map_close, jgen);
1238           }
1239         }
1240         OVS_YAJL_CALL(yajl_gen_map_close, jgen);
1241       }
1242       OVS_YAJL_CALL(yajl_gen_array_close, jgen);
1243     }
1244     OVS_YAJL_CALL(yajl_gen_map_close, jgen);
1245   }
1246   OVS_YAJL_CALL(yajl_gen_array_close, jgen);
1247
1248   /* make a request to subscribe to given table */
1249   OVS_YAJL_CALL(yajl_gen_get_buf, jgen, (const unsigned char **)&params,
1250                 &params_len);
1251   if (ovs_db_send_request(pdb, "monitor", params, result_cb) < 0) {
1252     OVS_ERROR("Failed to subscribe to \"%s\" table", tb_name);
1253     ovs_db_ret = (-1);
1254   }
1255
1256 yajl_gen_failure:
1257   /* release memory */
1258   yajl_gen_clear(jgen);
1259   return ovs_db_ret;
1260 }
1261
1262 int ovs_db_destroy(ovs_db_t *pdb) {
1263   int ovs_db_ret = 0;
1264   int ret = 0;
1265
1266   /* sanity check */
1267   if (pdb == NULL)
1268     return -1;
1269
1270   /* stop event thread */
1271   if (ovs_db_event_thread_terminate(pdb) < 0) {
1272     OVS_ERROR("stop event thread failed");
1273     ovs_db_ret = -1;
1274   }
1275
1276   /* try to lock the structure before releasing */
1277   if ((ret = pthread_mutex_lock(&pdb->mutex))) {
1278     OVS_ERROR("pthread_mutex_lock() DB mutex lock failed (%d)", ret);
1279     return ret;
1280   }
1281
1282   /* stop poll thread and destroy thread's private data */
1283   if (ovs_db_poll_thread_destroy(pdb) < 0) {
1284     OVS_ERROR("destroy poll thread failed");
1285     ovs_db_ret = -1;
1286   }
1287
1288   /* destroy event thread private data */
1289   ovs_db_event_thread_data_destroy(pdb);
1290
1291   pthread_mutex_unlock(&pdb->mutex);
1292
1293   /* unsubscribe callbacks */
1294   ovs_db_callback_remove_all(pdb);
1295
1296   /* close connection */
1297   if (pdb->sock >= 0)
1298     close(pdb->sock);
1299
1300   /* release DB handler */
1301   pthread_mutex_destroy(&pdb->mutex);
1302   sfree(pdb);
1303   return ovs_db_ret;
1304 }
1305
1306 /*
1307  * Public OVS utils API implementation
1308  */
1309
1310 /* Get YAJL value by key from YAJL dictionary
1311  *
1312  * EXAMPLE:
1313  *  {
1314  *    "key_a" : <YAJL return value>
1315  *    "key_b" : <YAJL return value>
1316  *  }
1317  */
1318 yajl_val ovs_utils_get_value_by_key(yajl_val jval, const char *key) {
1319   const char *obj_key = NULL;
1320
1321   /* check params */
1322   if (!YAJL_IS_OBJECT(jval) || (key == NULL))
1323     return NULL;
1324
1325   /* find a value by key */
1326   for (size_t i = 0; i < YAJL_GET_OBJECT(jval)->len; i++) {
1327     obj_key = YAJL_GET_OBJECT(jval)->keys[i];
1328     if (strcmp(obj_key, key) == 0)
1329       return YAJL_GET_OBJECT(jval)->values[i];
1330   }
1331
1332   return NULL;
1333 }
1334
1335 /* Get OVS DB map value by given map key
1336  *
1337  * FROM RFC7047:
1338  *
1339  *   <pair>
1340  *     A 2-element JSON array that represents a pair within a database
1341  *     map.  The first element is an <atom> that represents the key, and
1342  *     the second element is an <atom> that represents the value.
1343  *
1344  *   <map>
1345  *     A 2-element JSON array that represents a database map value.  The
1346  *     first element of the array must be the string "map", and the
1347  *     second element must be an array of zero or more <pair>s giving the
1348  *     values in the map.  All of the <pair>s must have the same key and
1349  *     value types.
1350  *
1351  * EXAMPLE:
1352  *  [
1353  *    "map", [
1354  *             [ "key_a", <YAJL value>], [ "key_b", <YAJL value>], ...
1355  *           ]
1356  *  ]
1357  */
1358 yajl_val ovs_utils_get_map_value(yajl_val jval, const char *key) {
1359   size_t map_len = 0;
1360   size_t array_len = 0;
1361   yajl_val *map_values = NULL;
1362   yajl_val *array_values = NULL;
1363   const char *str_val = NULL;
1364
1365   /* check YAJL array */
1366   if (!YAJL_IS_ARRAY(jval) || (key == NULL))
1367     return NULL;
1368
1369   /* check a database map value (2-element, first one should be a string */
1370   array_len = YAJL_GET_ARRAY(jval)->len;
1371   array_values = YAJL_GET_ARRAY(jval)->values;
1372   if ((array_len != 2) || (!YAJL_IS_STRING(array_values[0])) ||
1373       (!YAJL_IS_ARRAY(array_values[1])))
1374     return NULL;
1375
1376   /* check first element of the array */
1377   str_val = YAJL_GET_STRING(array_values[0]);
1378   if (str_val == NULL || strcmp("map", str_val) != 0)
1379     return NULL;
1380
1381   /* try to find map value by map key */
1382   if (YAJL_GET_ARRAY(array_values[1]) == NULL)
1383     return NULL;
1384
1385   map_len = YAJL_GET_ARRAY(array_values[1])->len;
1386   map_values = YAJL_GET_ARRAY(array_values[1])->values;
1387   for (size_t i = 0; i < map_len; i++) {
1388     /* check YAJL array */
1389     if (!YAJL_IS_ARRAY(map_values[i]) || YAJL_GET_ARRAY(map_values[i]) == NULL)
1390       break;
1391
1392     /* check a database pair value (2-element, first one represents a key
1393      * and it should be a string in our case */
1394     array_len = YAJL_GET_ARRAY(map_values[i])->len;
1395     array_values = YAJL_GET_ARRAY(map_values[i])->values;
1396     if ((array_len != 2) || (!YAJL_IS_STRING(array_values[0])))
1397       break;
1398
1399     /* return map value if given key equals map key */
1400     str_val = YAJL_GET_STRING(array_values[0]);
1401     if (str_val != NULL && strcmp(key, str_val) == 0)
1402       return array_values[1];
1403   }
1404   return NULL;
1405 }