OVS link: Implement OVS link plugin
[collectd.git] / src / utils_ovs.c
1 /**
2  * collectd - src/utils_ovs.c
3  *
4  * Copyright(c) 2016 Intel Corporation. All rights reserved.
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining a copy of
7  * this software and associated documentation files (the "Software"), to deal in
8  * the Software without restriction, including without limitation the rights to
9  * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
10  * of the Software, and to permit persons to whom the Software is furnished to do
11  * so, subject to the following conditions:
12  *
13  * The above copyright notice and this permission notice shall be included in all
14  * copies or substantial portions of the Software.
15  *
16  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22  * SOFTWARE.
23  *
24  * Authors:
25  *   Volodymyr Mytnyk <volodymyrx.mytnyk@intel.com>
26  *
27  *                         OVS DB API internal architecture diagram
28  * +------------------------------------------------------------------------------+
29  * |OVS plugin      |OVS utils                                                    |
30  * |                |     +------------------------+                              |
31  * |                |     |      echo handler      |                JSON request/ |
32  * |                |  +--+ (ovs_db_table_echo_cb) +<---+---------+ update event/ |
33  * |                |  |  |                        |    |         | result        |
34  * |                |  |  +------------------------+    |         |               |
35  * |                |  |                                |    +----+---+--------+  |
36  * |  +----------+  |  |  +------------------------+    |    |        |        |  |
37  * |  |  update  |  |  |  |     update handler     |    |    |  YAJL  |  JSON  |  |
38  * |  | callback +<-------+(ovs_db_table_update_cp)+<---+    | parser | reader |  |
39  * |  +----------+  |  |  |                        |    |    |        |        |  |
40  * |                |  |  +------------------------+    |    +--------+---+----+  |
41  * |                |  |                                |                 ^       |
42  * |  +----------+  |  |  +------------------------+    |                 |       |
43  * |  |  result  |  |  |  |     result handler     |    |                 |       |
44  * |  | callback +<-------+   (ovs_db_result_cb)   +<---+        JSON raw |       |
45  * |  +----------+  |  |  |                        |               data   |       |
46  * |                |  |  +------------------------+                      |       |
47  * |                |  |                                                  |       |
48  * |                |  |    +------------------+             +------------+----+  |
49  * |  +----------+  |  |    |thread|           |             |thread|          |  |
50  * |  |   init   |  |  |    |                  |  reconnect  |                 |  |
51  * |  | callback +<---------+   EVENT WORKER   +<------------+   POLL WORKER   |  |
52  * |  +----------+  |  |    +------------------+             +--------+--------+  |
53  * |                |  |                                              ^           |
54  * +----------------+-------------------------------------------------------------+
55  *                     |                                              |
56  *                 JSON|echo reply                                 raw|data
57  *                     v                                              v
58  * +-------------------+----------------------------------------------+-----------+
59  * |                                 TCP/UNIX socket                              |
60  * +-------------------------------------------------------------------------------
61  *
62  **/
63
64 /* collectd headers */
65 #include "common.h"
66
67 /* private headers */
68 #include "utils_ovs.h"
69
70 /* system libraries */
71 #include <semaphore.h>
72 #include <arpa/inet.h>
73 #include <poll.h>
74 #include <sys/un.h>
75
76 #define OVS_ERROR(fmt, ...) do { \
77   ERROR("ovs_utils: "fmt, ## __VA_ARGS__); } while (0)
78 #define OVS_DEBUG(fmt, ...) do { \
79   DEBUG("%s:%d:%s(): "fmt, __FILE__, __LINE__, __FUNCTION__, \
80         ## __VA_ARGS__); } while (0)
81
82 #define OVS_DB_POLL_TIMEOUT          1  /* poll receive timeout (sec) */
83 #define OVS_DB_POLL_READ_BLOCK_SIZE  5  /* read block size (bytes) */
84 #define OVS_DB_DEFAULT_DB_NAME       "Open_vSwitch"
85 #define OVS_DB_RECONNECT_TIMEOUT     1  /* reconnect timeout (sec) */
86
87 #define OVS_DB_EVENT_TIMEOUT         5  /* event thread timeout (sec) */
88 #define OVS_DB_EVENT_TERMINATE       1
89 #define OVS_DB_EVENT_CONNECTED       2
90
91 #define OVS_DB_POLL_STATE_RUNNING    1
92 #define OVS_DB_POLL_STATE_EXITING    2
93
94 #define OVS_DB_SEND_REQ_TIMEOUT      5  /* send request timeout (sec) */
95
96 #define OVS_YAJL_CALL(func, ...) \
97   do { \
98     yajl_gen_ret = yajl_gen_status_ok; \
99     if ((yajl_gen_ret = func(__VA_ARGS__)) != yajl_gen_status_ok) \
100       goto yajl_gen_failure; \
101   } while (0)
102 #define OVS_YAJL_ERROR_BUFFER_SIZE       1024
103 #define OVS_ERROR_BUFF_SIZE              512
104 #define OVS_UID_STR_SIZE                 17     /* 64-bit HEX string len + '\0' */
105
106 /* JSON reader internal data */
107 struct ovs_json_reader_s {
108   char *buff_ptr;
109   size_t buff_size;
110   size_t buff_offset;
111   size_t json_offset;
112 };
113 typedef struct ovs_json_reader_s ovs_json_reader_t;
114
115 /* Result callback declaration */
116 struct ovs_result_cb_s {
117   sem_t sync;
118   ovs_db_result_cb_t call;
119 };
120 typedef struct ovs_result_cb_s ovs_result_cb_t;
121
122 /* Table callback declaration */
123 struct ovs_table_cb_s {
124   ovs_db_table_cb_t call;
125 };
126 typedef struct ovs_table_cb_s ovs_table_cb_t;
127
128 /* Callback declaration */
129 struct ovs_callback_s {
130   uint64_t uid;
131   union {
132     ovs_result_cb_t result;
133     ovs_table_cb_t table;
134   };
135   struct ovs_callback_s *next;
136   struct ovs_callback_s *prev;
137 };
138 typedef struct ovs_callback_s ovs_callback_t;
139
140 /* Connection declaration */
141 struct ovs_conn_s {
142   int sock;
143   int domain;
144   int type;
145   int addr_size;
146   union {
147     struct sockaddr_in s_inet;
148     struct sockaddr_un s_unix;
149   } addr;
150 };
151 typedef struct ovs_conn_s ovs_conn_t;
152
153 /* Event thread data declaration */
154 struct ovs_event_thread_s {
155   pthread_t tid;
156   pthread_mutex_t mutex;
157   pthread_cond_t cond;
158   int value;
159 };
160 typedef struct ovs_event_thread_s ovs_event_thread_t;
161
162 /* Poll thread data declaration */
163 struct ovs_poll_thread_s {
164   pthread_t tid;
165   pthread_mutex_t mutex;
166   int state;
167 };
168 typedef struct ovs_poll_thread_s ovs_poll_thread_t;
169
170 /* OVS DB internal data declaration */
171 struct ovs_db_s {
172   ovs_poll_thread_t poll_thread;
173   ovs_event_thread_t event_thread;
174   pthread_mutex_t mutex;
175   ovs_callback_t *cb;
176   ovs_conn_t conn;
177   ovs_db_init_cb_t init_cb;
178 };
179 typedef struct ovs_db_s ovs_db_t;
180
181 /* Post an event to event thread.
182  * Possible events are:
183  *  OVS_DB_EVENT_TERMINATE
184  *  OVS_DB_EVENT_CONNECTED
185  */
186 static void
187 ovs_db_event_post(ovs_db_t *pdb, int event)
188 {
189   pthread_mutex_lock(&pdb->event_thread.mutex);
190   pdb->event_thread.value = event;
191   pthread_mutex_unlock(&pdb->event_thread.mutex);
192   pthread_cond_signal(&pdb->event_thread.cond);
193 }
194
195 /* Check if POLL thread is still running. Returns
196  * 1 if running otherwise 0 is returned */
197 static inline int
198 ovs_db_poll_is_running(ovs_db_t *pdb)
199 {
200   int state = 0;
201   pthread_mutex_lock(&pdb->poll_thread.mutex);
202   state = pdb->poll_thread.state;
203   pthread_mutex_unlock(&pdb->poll_thread.mutex);
204   return (state == OVS_DB_POLL_STATE_RUNNING);
205 }
206
207 /* Terminate POLL thread */
208 static inline void
209 ovs_db_poll_terminate(ovs_db_t *pdb)
210 {
211   pthread_mutex_lock(&pdb->poll_thread.mutex);
212   pdb->poll_thread.state = OVS_DB_POLL_STATE_EXITING;
213   pthread_mutex_unlock(&pdb->poll_thread.mutex);
214 }
215
216 /* Generate unique identifier (UID). It is used by OVS DB API
217  * to set "id" field for any OVS DB JSON request. */
218 static uint64_t
219 ovs_uid_generate()
220 {
221   struct timespec ts;
222   clock_gettime(CLOCK_MONOTONIC, &ts);
223   return ((ts.tv_sec << 32) | (ts.tv_nsec & UINT32_MAX));
224 }
225
226 /*
227  * Callback API. These function are used to store
228  * registered callbacks in OVS DB API.
229  */
230
231 /* Add new callback into OVS DB object */
232 static void
233 ovs_db_callback_add(ovs_db_t *pdb, ovs_callback_t *new_cb)
234 {
235   pthread_mutex_lock(&pdb->mutex);
236   if (pdb->cb)
237     pdb->cb->prev = new_cb;
238   new_cb->next = pdb->cb;
239   new_cb->prev = NULL;
240   pdb->cb = new_cb;
241   pthread_mutex_unlock(&pdb->mutex);
242 }
243
244 /* Remove callback from OVS DB object */
245 static void
246 ovs_db_callback_remove(ovs_db_t *pdb, ovs_callback_t *del_cb)
247 {
248   ovs_callback_t *pre_cb = del_cb->prev;
249   ovs_callback_t *next_cb = del_cb->next;
250
251   pthread_mutex_lock(&pdb->mutex);
252   if (next_cb)
253     next_cb->prev = del_cb->prev;
254
255   if (pre_cb)
256     pre_cb->next = del_cb->next;
257   else
258     pdb->cb = del_cb->next;
259
260   free(del_cb);
261   pthread_mutex_unlock(&pdb->mutex);
262 }
263
264 /* Remove all callbacks form OVS DB object */
265 static void
266 ovs_db_callback_remove_all(ovs_db_t *pdb)
267 {
268   pthread_mutex_lock(&pdb->mutex);
269   for (ovs_callback_t *del_cb = pdb->cb; pdb->cb; del_cb = pdb->cb) {
270     pdb->cb = pdb->cb->next;
271     free(del_cb);
272   }
273   pdb->cb = NULL;
274   pthread_mutex_unlock(&pdb->mutex);
275 }
276
277 /* Get/find callback in OVS DB object by UID. Returns pointer
278  * to requested callback otherwise NULL is returned */
279 static ovs_callback_t *
280 ovs_db_callback_get(ovs_db_t *pdb, uint64_t uid)
281 {
282   pthread_mutex_lock(&pdb->mutex);
283   for (ovs_callback_t *cb = pdb->cb; cb != NULL; cb = cb->next)
284     if (cb->uid == uid) {
285       pthread_mutex_unlock(&pdb->mutex);
286       return cb;
287     }
288   pthread_mutex_unlock(&pdb->mutex);
289   return NULL;
290 }
291
292 /* Send all requested data to the socket. Returns 0 if
293  * ALL request data has been sent otherwise negative value
294  * is returned */
295 static int
296 ovs_db_data_send(const ovs_db_t *pdb, const char *data, size_t len)
297 {
298   ssize_t nbytes = 0;
299   size_t rem = len;
300   size_t off = 0;
301
302   while (rem > 0) {
303     if ((nbytes = send(pdb->conn.sock, data + off, rem, 0)) <= 0)
304       return (-1);
305     rem -= (size_t)nbytes;
306     off += (size_t)nbytes;
307   }
308   return (0);
309 }
310
311 /* Parse OVS server URL.
312  * Format of the URL:
313  *   "tcp:a.b.c.d:port" - define TCP connection (INET domain)
314  *   "unix:file" - define UNIX socket file (UNIX domain)
315  */
316 static int
317 ovs_db_url_parse(const char *surl, ovs_conn_t *conn)
318 {
319   ovs_conn_t tmp_conn;
320   char *nexttok = NULL;
321   char *in_str = NULL;
322   char *saveptr;
323   int ret = 0;
324
325   /* sanity check */
326   if ((surl == NULL) || (strlen(surl) < 1))
327     return (-1);
328
329   /* parse domain */
330   tmp_conn = *conn;
331   in_str = sstrdup(surl);
332   if ((nexttok = strtok_r(in_str, ":", &saveptr)) != NULL) {
333     if (strcmp("tcp", nexttok) == 0) {
334       tmp_conn.domain = AF_INET;
335       tmp_conn.type = SOCK_STREAM;
336       tmp_conn.addr_size = sizeof(tmp_conn.addr.s_inet);
337     } else if (strcmp("unix", nexttok) == 0) {
338       tmp_conn.domain = AF_UNIX;
339       tmp_conn.type = SOCK_STREAM;
340       tmp_conn.addr_size = sizeof(tmp_conn.addr.s_unix);
341     } else
342       goto failure;
343   } else
344     goto failure;
345
346   /* parse url depending on domain */
347   if ((nexttok = strtok_r(NULL, ":", &saveptr)) != NULL) {
348     if (tmp_conn.domain == AF_UNIX) {
349       /* <UNIX-NAME> */
350       tmp_conn.addr.s_inet.sin_family = AF_UNIX;
351       sstrncpy(tmp_conn.addr.s_unix.sun_path, nexttok, strlen(nexttok) + 1);
352     } else {
353       /* <IP:PORT> */
354       tmp_conn.addr.s_inet.sin_family = AF_INET;
355       ret =
356         inet_pton(AF_INET, nexttok, (void *)&tmp_conn.addr.s_inet.sin_addr);
357       if (ret == 1) {
358         if ((nexttok = strtok_r(NULL, ":", &saveptr)) != NULL)
359           tmp_conn.addr.s_inet.sin_port = htons(atoi(nexttok));
360         else
361           goto failure;
362       } else
363         goto failure;
364     }
365   }
366
367   /* save result and return success */
368   *conn = tmp_conn;
369   sfree(in_str);
370   return (0);
371
372 failure:
373   OVS_ERROR("%s() : invalid OVS DB URL provided");
374   sfree(in_str);
375   return (-1);
376 }
377
378 /*
379  * YAJL (Yet Another JSON Library) helper functions
380  * Documentation (https://lloyd.github.io/yajl/)
381  */
382
383 /* Add null-terminated string into YAJL generator handle (JSON object).
384  * Similar function to yajl_gen_string() but takes null-terminated string
385  * instead of string and its length.
386  *
387  * jgen   - YAJL generator handle allocated by yajl_gen_alloc()
388  * string - Null-terminated string
389  */
390 static inline yajl_gen_status
391 ovs_yajl_gen_tstring(yajl_gen hander, const char *string)
392 {
393   return yajl_gen_string(hander, string, strlen(string));
394 }
395
396 /* Add YAJL value into YAJL generator handle (JSON object)
397  *
398  * jgen - YAJL generator handle allocated by yajl_gen_alloc()
399  * jval - YAJL value usually returned by yajl_tree_get()
400  */
401 static yajl_gen_status
402 ovs_yajl_gen_val(yajl_gen jgen, yajl_val jval)
403 {
404   size_t array_len = 0;
405   yajl_val *jvalues = NULL;
406   yajl_val jobj_value = NULL;
407   const char *obj_key = NULL;
408   size_t obj_len = 0;
409   yajl_gen_status yajl_gen_ret;
410
411   if (YAJL_IS_STRING(jval))
412     OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, YAJL_GET_STRING(jval));
413   else if (YAJL_IS_DOUBLE(jval))
414     OVS_YAJL_CALL(yajl_gen_double, jgen, YAJL_GET_DOUBLE(jval));
415   else if (YAJL_IS_INTEGER(jval))
416     OVS_YAJL_CALL(yajl_gen_double, jgen, YAJL_GET_INTEGER(jval));
417   else if (YAJL_IS_TRUE(jval))
418     OVS_YAJL_CALL(yajl_gen_bool, jgen, 1);
419   else if (YAJL_IS_FALSE(jval))
420     OVS_YAJL_CALL(yajl_gen_bool, jgen, 0);
421   else if (YAJL_IS_NULL(jval))
422     OVS_YAJL_CALL(yajl_gen_null, jgen);
423   else if (YAJL_IS_ARRAY(jval)) {
424     /* create new array and add all elements into the array */
425     array_len = YAJL_GET_ARRAY(jval)->len;
426     jvalues = YAJL_GET_ARRAY(jval)->values;
427     OVS_YAJL_CALL(yajl_gen_array_open, jgen);
428     for (int i = 0; i < array_len; i++)
429       OVS_YAJL_CALL(ovs_yajl_gen_val, jgen, jvalues[i]);
430     OVS_YAJL_CALL(yajl_gen_array_close, jgen);
431   } else if (YAJL_IS_OBJECT(jval)) {
432     /* create new object and add all elements into the object */
433     OVS_YAJL_CALL(yajl_gen_map_open, jgen);
434     obj_len = YAJL_GET_OBJECT(jval)->len;
435     for (int i = 0; i < obj_len; i++) {
436       obj_key = YAJL_GET_OBJECT(jval)->keys[i];
437       jobj_value = YAJL_GET_OBJECT(jval)->values[i];
438       OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, obj_key);
439       OVS_YAJL_CALL(ovs_yajl_gen_val, jgen, jobj_value);
440     }
441     OVS_YAJL_CALL(yajl_gen_map_close, jgen);
442   } else {
443     OVS_ERROR("%s() unsupported value type %d (skip)", __FUNCTION__,
444               (int)(jval)->type);
445     goto yajl_gen_failure;
446   }
447   return yajl_gen_status_ok;
448
449 yajl_gen_failure:
450   OVS_ERROR("%s() error to generate value", __FUNCTION__);
451   return yajl_gen_ret;
452 }
453
454 /* OVS DB echo request handler. When OVS DB sends
455  * "echo" request to the client, client should generate
456  * "echo" replay with the same content received in the
457  * request */
458 static int
459 ovs_db_table_echo_cb(const ovs_db_t *pdb, yajl_val jnode)
460 {
461   yajl_val jparams;
462   yajl_val jid;
463   yajl_gen jgen;
464   size_t resp_len = 0;
465   const char *resp = NULL;
466   const char *params_path[] = {"params", NULL};
467   const char *id_path[] = {"id", NULL};
468   yajl_gen_status yajl_gen_ret;
469
470   if ((jgen = yajl_gen_alloc(NULL)) == NULL)
471     return (-1);
472
473   /* check & get request attributes */
474   if ((jparams = yajl_tree_get(jnode, params_path, yajl_t_array)) == NULL ||
475       ((jid = yajl_tree_get(jnode, id_path, yajl_t_any)) == NULL)) {
476     OVS_ERROR("parse echo request failed");
477     goto yajl_gen_failure;
478   }
479
480   /* generate JSON echo response */
481   OVS_YAJL_CALL(yajl_gen_map_open, jgen);
482
483   OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "result");
484   OVS_YAJL_CALL(ovs_yajl_gen_val, jgen, jparams);
485
486   OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "error");
487   OVS_YAJL_CALL(yajl_gen_null, jgen);
488
489   OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "id");
490   OVS_YAJL_CALL(ovs_yajl_gen_val, jgen, jid);
491
492   OVS_YAJL_CALL(yajl_gen_map_close, jgen);
493   OVS_YAJL_CALL(yajl_gen_get_buf, jgen, (const unsigned char **)&resp,
494                 &resp_len);
495
496   /* send the response */
497   OVS_DEBUG("response: %s", resp);
498   if (ovs_db_data_send(pdb, resp, resp_len) < 0) {
499     OVS_ERROR("send echo reply failed");
500     goto yajl_gen_failure;
501   }
502   /* clean up and return success */
503   yajl_gen_clear(jgen);
504   return (0);
505
506 yajl_gen_failure:
507   /* release memory */
508   yajl_gen_clear(jgen);
509   return (-1);
510 }
511
512 /* Get OVS DB registered callback by YAJL val. The YAJL
513  * value should be YAJL string (UID). Returns NULL if
514  * callback hasn't been found.
515  */
516 static ovs_callback_t *
517 ovs_db_table_callback_get(ovs_db_t *pdb, yajl_val jid)
518 {
519   char *endptr = NULL;
520   const char *suid = NULL;
521   uint64_t uid;
522
523   if (jid && YAJL_IS_STRING(jid)) {
524     suid = YAJL_GET_STRING(jid);
525     uid = (uint64_t) strtoul(suid, &endptr, 16);
526     if (*endptr == '\0' && uid)
527       return ovs_db_callback_get(pdb, uid);
528   }
529
530   return NULL;
531 }
532
533 /* OVS DB table update event handler.
534  * This callback is called by POLL thread if OVS DB
535  * table update callback is received from the DB
536  * server. Once registered callback found, it's called
537  * by this handler. */
538 static int
539 ovs_db_table_update_cb(ovs_db_t *pdb, yajl_val jnode)
540 {
541   ovs_callback_t *cb = NULL;
542   yajl_val jvalue;
543   yajl_val jparams;
544   yajl_val jtable_updates;
545   yajl_val jtable_update;
546   size_t obj_len = 0;
547   const char *table_name = NULL;
548   const char *params_path[] = {"params", NULL};
549   const char *id_path[] = {"id", NULL};
550
551   /* check & get request attributes */
552   if ((jparams = yajl_tree_get(jnode, params_path, yajl_t_array)) == NULL ||
553       (yajl_tree_get(jnode, id_path, yajl_t_null) == NULL))
554     goto ovs_failure;
555
556   /* check array length: [<json-value>, <table-updates>] */
557   if (YAJL_GET_ARRAY(jparams)->len != 2)
558     goto ovs_failure;
559
560   jvalue = YAJL_GET_ARRAY(jparams)->values[0];
561   jtable_updates = YAJL_GET_ARRAY(jparams)->values[1];
562   if ((!YAJL_IS_OBJECT(jtable_updates)) || (!YAJL_IS_STRING(jvalue)))
563     goto ovs_failure;
564
565   /* find registered callback based on <json-value> */
566   cb = ovs_db_table_callback_get(pdb, jvalue);
567   if (cb == NULL || cb->table.call == NULL)
568     goto ovs_failure;
569
570   /* call registered callback */
571   cb->table.call(jtable_updates);
572   return 0;
573
574 ovs_failure:
575   OVS_ERROR("invalid OVS DB table update event");
576   return (-1);
577 }
578
579 /* OVS DB result request handler.
580  * This callback is called by POLL thread if OVS DB
581  * result reply is received from the DB server.
582  * Once registered callback found, it's called
583  * by this handler. */
584 static int
585 ovs_db_result_cb(ovs_db_t *pdb, yajl_val jnode)
586 {
587   ovs_callback_t *cb = NULL;
588   yajl_val jresult;
589   yajl_val jerror;
590   yajl_val jid;
591   const char *result_path[] = {"result", NULL};
592   const char *error_path[] = {"error", NULL};
593   const char *id_path[] = {"id", NULL};
594
595   jresult = yajl_tree_get(jnode, result_path, yajl_t_any);
596   jerror = yajl_tree_get(jnode, error_path, yajl_t_any);
597   jid = yajl_tree_get(jnode, id_path, yajl_t_string);
598
599   /* check & get result attributes */
600   if (!jresult || !jerror || !jid)
601     return (-1);
602
603   /* try to find registered callback */
604   cb = ovs_db_table_callback_get(pdb, jid);
605   if (cb != NULL && cb->result.call != NULL) {
606     /* call registered callback */
607     cb->result.call(jresult, jerror);
608     /* unlock owner of the reply */
609     sem_post(&cb->result.sync);
610   }
611
612   return (0);
613 }
614
615 /* Handle JSON data (one request) and call
616  * appropriate event OVS DB handler. Currently,
617  * update callback 'ovs_db_table_update_cb' and
618  * result callback 'ovs_db_result_cb' is supported.
619  */
620 static int
621 ovs_db_json_data_process(ovs_db_t *pdb, const char *data, size_t len)
622 {
623   const char *method = NULL;
624   char yajl_errbuf[OVS_YAJL_ERROR_BUFFER_SIZE];
625   const char *method_path[] = {"method", NULL};
626   const char *result_path[] = {"result", NULL};
627   char *sjson = NULL;
628   yajl_val jnode, jval;
629
630   /* duplicate the data to make null-terminated string
631    * required for yajl_tree_parse() */
632   if ((sjson = strndup(data, len)) == NULL)
633     return (-1);
634
635   OVS_DEBUG("%s", sjson);
636
637   /* parse json data */
638   jnode = yajl_tree_parse(sjson, yajl_errbuf, sizeof(yajl_errbuf));
639   if (jnode == NULL) {
640     OVS_ERROR("yajl_tree_parse() %s", yajl_errbuf);
641     return (-1);
642   }
643
644   /* get method name */
645   if (jval = yajl_tree_get(jnode, method_path, yajl_t_string)) {
646     method = YAJL_GET_STRING(jval);
647     if (strcmp("echo", method) == 0) {
648       /* echo request from the server */
649       if (ovs_db_table_echo_cb(pdb, jnode) < 0)
650         OVS_ERROR("handle echo request failed");
651     } else if (strcmp("update", method) == 0) {
652       /* update notification */
653       if (ovs_db_table_update_cb(pdb, jnode) < 0)
654         OVS_ERROR("handle update notification failed");
655     }
656   } else if (jval = yajl_tree_get(jnode, result_path, yajl_t_object)) {
657     /* result notification */
658     if (ovs_db_result_cb(pdb, jnode) < 0)
659       OVS_ERROR("handle result reply failed");
660   }
661
662   /* release memory */
663   yajl_tree_free(jnode);
664   sfree(sjson);
665   return (0);
666 }
667
668 /*
669  * JSON reader implementation.
670  *
671  * This module process raw JSON data (byte stream) and
672  * returns fully-fledged JSON data which can be processed
673  * (parsed) by YAJL later.
674  */
675
676 /* Allocate JSON reader instance */
677 static inline ovs_json_reader_t *
678 ovs_json_reader_alloc()
679 {
680   ovs_json_reader_t *jreader = NULL;
681
682   if ((jreader = calloc(sizeof(ovs_json_reader_t), 1)) == NULL)
683     return NULL;
684
685   return jreader;
686 }
687
688 /* Push raw data into into the JSON reader for processing */
689 static inline int
690 ovs_json_reader_push_data(ovs_json_reader_t *jreader,
691                           const char *data, size_t data_len)
692 {
693   char *new_buff = NULL;
694   size_t available = jreader->buff_size - jreader->buff_offset;
695
696   /* check/update required memory space */
697   if (available < data_len) {
698     OVS_DEBUG("Reallocate buffer [size=%d, available=%d required=%d]",
699               (int)jreader->buff_size, (int)available, (int)data_len);
700
701     /* allocate new chunk of memory */
702     new_buff = realloc(jreader->buff_ptr, (jreader->buff_size + data_len));
703     if (new_buff == NULL)
704       return (-1);
705
706     /* point to new allocated memory */
707     jreader->buff_ptr = new_buff;
708     jreader->buff_size += data_len;
709   }
710
711   /* store input data */
712   memcpy(jreader->buff_ptr + jreader->buff_offset, data, data_len);
713   jreader->buff_offset += data_len;
714   return (0);
715 }
716
717 /* Pop one fully-fledged JSON if already exists. Returns 0 if
718  * completed JSON already exists otherwise negative value is
719  * returned */
720 static inline int
721 ovs_json_reader_pop(ovs_json_reader_t *jreader,
722                     const char **json_ptr, size_t *json_len_ptr)
723 {
724   size_t nbraces = 0;
725   size_t json_len = 0;
726   char *json = NULL;
727
728   /* search open/close brace */
729   for (int i = jreader->json_offset; i < jreader->buff_offset; i++) {
730     if (jreader->buff_ptr[i] == '{') {
731       nbraces++;
732     } else if (jreader->buff_ptr[i] == '}')
733       if (nbraces)
734         if (!(--nbraces)) {
735           /* JSON data */
736           *json_ptr = jreader->buff_ptr + jreader->json_offset;
737           *json_len_ptr = json_len + 1;
738           jreader->json_offset = i + 1;
739           return (0);
740         }
741
742     /* increase JSON data length */
743     if (nbraces)
744       json_len++;
745   }
746
747   if (jreader->json_offset) {
748     if (jreader->json_offset < jreader->buff_offset) {
749       /* shift data to the beginning of the buffer
750        * and zero rest of the buffer data */
751       json = &jreader->buff_ptr[jreader->json_offset];
752       json_len = jreader->buff_offset - jreader->json_offset;
753       for (int i = 0; i < jreader->buff_size; i++)
754         jreader->buff_ptr[i] = ((i < json_len) ? (json[i]) : (0));
755       jreader->buff_offset = json_len;
756     } else
757       /* reset the buffer */
758       jreader->buff_offset = 0;
759
760     /* data is at the beginning of the buffer */
761     jreader->json_offset = 0;
762   }
763
764   return (-1);
765 }
766
767 /* Reset JSON reader. It is useful when start processing
768  * new raw data. E.g.: in case of lost stream connection.
769  */
770 static inline void
771 ovs_json_reader_reset(ovs_json_reader_t *jreader)
772 {
773   if (jreader) {
774     jreader->buff_offset = 0;
775     jreader->json_offset = 0;
776   }
777 }
778
779 /* Release internal data allocated for JSON reader */
780 static inline void
781 ovs_json_reader_free(ovs_json_reader_t *jreader)
782 {
783   if (jreader) {
784     free(jreader->buff_ptr);
785     free(jreader);
786   }
787 }
788
789 /* Reconnect to OVD DB and call init OVS DB callback
790  * 'init_cb' if connection has been established.
791  */
792 static int
793 ovs_db_reconnect(ovs_db_t *pdb)
794 {
795   char errbuff[OVS_ERROR_BUFF_SIZE];
796
797   /* remove all registered OVS DB table/result callbacks */
798   ovs_db_callback_remove_all(pdb);
799
800   /* open new socket */
801   if ((pdb->conn.sock = socket(pdb->conn.domain, pdb->conn.type, 0)) < 0) {
802     sstrerror(errno, errbuff, sizeof(errbuff));
803     OVS_ERROR("socket(): %s", errbuff);
804     return (-1);
805   }
806
807   /* try to connect to server */
808   if (connect(pdb->conn.sock, (struct sockaddr *)&pdb->conn.addr,
809               pdb->conn.addr_size) < 0) {
810     sstrerror(errno, errbuff, sizeof(errbuff));
811     OVS_ERROR("connect(): %s", errbuff);
812     close(pdb->conn.sock);
813     return (-1);
814   }
815
816   /* send notification to event thread */
817   ovs_db_event_post(pdb, OVS_DB_EVENT_CONNECTED);
818   return (0);
819 }
820
821 /* POLL worker thread.
822  * It listens on OVS DB connection for incoming
823  * requests/reply/events etc. Also, it reconnects to OVS DB
824  * if connection has been lost.
825  */
826 static void *
827 ovs_poll_worker(void *arg)
828 {
829   ovs_db_t *pdb = (ovs_db_t *)arg;      /* pointer to OVS DB */
830   ovs_json_reader_t *jreader = NULL;
831   const char *json;
832   size_t json_len;
833   ssize_t nbytes = 0;
834   char buff[OVS_DB_POLL_READ_BLOCK_SIZE];
835   struct pollfd poll_fd;
836   int poll_ret = 0;
837
838   if ((jreader = ovs_json_reader_alloc()) == NULL) {
839     OVS_ERROR("initialize json reader failed");
840     goto thread_exit;
841   }
842
843   /* start polling data */
844   poll_fd.fd = pdb->conn.sock;
845   poll_fd.events = POLLIN | POLLPRI;
846   poll_fd.revents = 0;
847
848   /* poll data */
849   while (ovs_db_poll_is_running(pdb)) {
850     poll_ret = poll(&poll_fd, 1, /* ms */ OVS_DB_POLL_TIMEOUT * 1000);
851     if (poll_ret > 0) {
852       if (poll_fd.revents & POLLNVAL) {
853         /* invalid file descriptor, reconnect */
854         if (ovs_db_reconnect(pdb) != 0) {
855           /* sleep awhile until next reconnect */
856           usleep(OVS_DB_RECONNECT_TIMEOUT * 1000000);
857         }
858         ovs_json_reader_reset(jreader);
859         poll_fd.fd = pdb->conn.sock;
860       } else if ((poll_fd.revents & POLLERR) || (poll_fd.revents & POLLHUP)) {
861         /* connection is broken */
862         OVS_ERROR("poll() peer closed its end of the channel");
863         close(poll_fd.fd);
864       } else if ((poll_fd.revents & POLLIN) || (poll_fd.revents & POLLPRI)) {
865         /* read incoming data */
866         nbytes = recv(poll_fd.fd, buff, OVS_DB_POLL_READ_BLOCK_SIZE, 0);
867         if (nbytes > 0) {
868           OVS_DEBUG("recv(): received %d bytes of data", (int)nbytes);
869           ovs_json_reader_push_data(jreader, buff, nbytes);
870           while (!ovs_json_reader_pop(jreader, &json, &json_len))
871             /* process JSON data */
872             ovs_db_json_data_process(pdb, json, json_len);
873         } else if (nbytes == 0) {
874           OVS_ERROR("recv() peer has performed an orderly shutdown");
875           close(poll_fd.fd);
876         } else {
877           OVS_ERROR("recv() receive data error");
878           break;
879         }
880       }                         /* poll() POLLIN & POLLPRI */
881     } else if (poll_ret == 0)
882       OVS_DEBUG("poll() timeout");
883     else {
884       OVS_ERROR("poll() error");
885       break;
886     }
887   }
888
889 thread_exit:
890   OVS_DEBUG("poll thread has been completed");
891   ovs_json_reader_free(jreader);
892   pthread_exit((void *)0);
893   return ((void *)0);
894 }
895
896 /* EVENT worker thread.
897  * Perform task based on incoming events. This
898  * task can be done asynchronously which allows to
899  * handle OVD DB callback like 'init_cb'.
900  */
901 static void *
902 ovs_event_worker(void *arg)
903 {
904   int ret = 0;
905   ovs_db_t *pdb = (ovs_db_t *)arg;
906   struct timespec ts;
907
908   while (pdb->event_thread.value != OVS_DB_EVENT_TERMINATE) {
909     /* wait for an event */
910     clock_gettime(CLOCK_REALTIME, &ts);
911     ts.tv_sec += (OVS_DB_EVENT_TIMEOUT);
912     ret = pthread_cond_timedwait(&pdb->event_thread.cond,
913                                  &pdb->event_thread.mutex, &ts);
914     if (!ret) {
915       /* handle the event */
916       OVS_DEBUG("handle event %d", pdb->event_thread.value);
917       if (pdb->event_thread.value == OVS_DB_EVENT_CONNECTED)
918         if (pdb->init_cb)
919           pdb->init_cb(pdb);
920     } else if (ret == ETIMEDOUT) {
921       /* wait timeout */
922       OVS_DEBUG("no event received (timeout)");
923       continue;
924     } else {
925       /* unexpected error */
926       OVS_ERROR("pthread_cond_timedwait() failed");
927       break;
928     }
929   }
930
931 thread_exit:
932   OVS_DEBUG("event thread has been completed");
933   pthread_exit((void *)0);
934   return ((void *)0);
935 }
936
937 /* Stop EVENT thread */
938 static int
939 ovs_db_event_thread_stop(ovs_db_t *pdb)
940 {
941   ovs_db_event_post(pdb, OVS_DB_EVENT_TERMINATE);
942   if (pthread_join(pdb->event_thread.tid, NULL) != 0)
943     return (-1);
944   pthread_mutex_unlock(&pdb->event_thread.mutex);
945   pthread_mutex_destroy(&pdb->event_thread.mutex);
946   return (0);
947 }
948
949 /* Stop POLL thread */
950 static int
951 ovs_db_poll_thread_stop(ovs_db_t *pdb)
952 {
953   ovs_db_poll_terminate(pdb);
954   if (pthread_join(pdb->poll_thread.tid, NULL) != 0)
955     return (-1);
956   pthread_mutex_destroy(&pdb->poll_thread.mutex);
957   return (0);
958 }
959
960 /*
961  * Public OVS DB API implementation
962  */
963
964 ovs_db_t *
965 ovs_db_init(const char *surl, ovs_db_callback_t *cb)
966 {
967   pthread_mutexattr_t mutex_attr;
968   ovs_db_t *pdb = NULL;
969
970   /* allocate db data & fill it */
971   if ((pdb = calloc(1, sizeof(*pdb))) == NULL)
972     return (NULL);
973
974   /* convert string url to socket addr */
975   if (ovs_db_url_parse(surl, &pdb->conn) < 0)
976     goto failure;
977
978   /* setup OVS DB callbacks */
979   if (cb)
980     pdb->init_cb = cb->init_cb;
981
982   /* prepare event thread */
983   pthread_cond_init(&pdb->event_thread.cond, NULL);
984   pthread_mutex_init(&pdb->event_thread.mutex, NULL);
985   pthread_mutex_lock(&pdb->event_thread.mutex);
986   if (plugin_thread_create(&pdb->event_thread.tid, NULL,
987                            ovs_event_worker, pdb) != 0) {
988     OVS_ERROR("event worker start failed");
989     goto failure;
990   }
991
992   /* prepare polling thread */
993   ovs_db_reconnect(pdb);
994   pdb->poll_thread.state = OVS_DB_POLL_STATE_RUNNING;
995   pthread_mutex_init(&pdb->poll_thread.mutex, NULL);
996   if (plugin_thread_create(&pdb->poll_thread.tid, NULL,
997                            ovs_poll_worker, pdb) != 0) {
998     OVS_ERROR("pull worker start failed");
999     goto failure;
1000   }
1001
1002   /* init OVS DB mutex */
1003   if (pthread_mutexattr_init(&mutex_attr) ||
1004       pthread_mutexattr_settype(&mutex_attr, PTHREAD_MUTEX_RECURSIVE) ||
1005       pthread_mutex_init(&pdb->mutex, &mutex_attr)) {
1006     OVS_ERROR("OVS DB mutex init failed");
1007     goto failure;
1008   }
1009
1010   /* return db to the caller */
1011   return pdb;
1012
1013 failure:
1014   if (pdb->conn.sock)
1015     /* close connection */
1016     close(pdb->conn.sock);
1017   if (pdb->event_thread.tid != 0)
1018     /* stop event thread */
1019     if (ovs_db_event_thread_stop(pdb) < 0)
1020       OVS_ERROR("stop event thread failed");
1021   if (pdb->poll_thread.tid != 0)
1022     /* stop poll thread */
1023     if (ovs_db_poll_thread_stop(pdb) < 0)
1024       OVS_ERROR("stop poll thread failed");
1025   sfree(pdb);
1026   return NULL;
1027 }
1028
1029 int
1030 ovs_db_send_request(ovs_db_t *pdb, const char *method,
1031                     const char *params, ovs_db_result_cb_t cb)
1032 {
1033   int ret = 0;
1034   yajl_gen_status yajl_gen_ret;
1035   yajl_val jparams;
1036   yajl_gen jgen;
1037   ovs_callback_t *new_cb = NULL;
1038   uint64_t uid;
1039   char uid_buff[OVS_UID_STR_SIZE];
1040   const char *req = NULL;
1041   size_t req_len = 0;
1042   struct timespec ts;
1043
1044   /* sanity check */
1045   if (!pdb || !method || !params)
1046     return (-1);
1047
1048   if ((jgen = yajl_gen_alloc(NULL)) == NULL)
1049     return (-1);
1050
1051   /* try to parse params */
1052   if ((jparams = yajl_tree_parse(params, NULL, 0)) == NULL) {
1053     OVS_ERROR("params is not a JSON string");
1054     yajl_gen_clear(jgen);
1055     return (-1);
1056   }
1057
1058   /* generate method field */
1059   OVS_YAJL_CALL(yajl_gen_map_open, jgen);
1060
1061   OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "method");
1062   OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, method);
1063
1064   /* generate params field */
1065   OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "params");
1066   OVS_YAJL_CALL(ovs_yajl_gen_val, jgen, jparams);
1067   yajl_tree_free(jparams);
1068
1069   /* generate id field */
1070   OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "id");
1071   uid = ovs_uid_generate();
1072   ssnprintf(uid_buff, sizeof(uid_buff), "%" PRIX64, uid);
1073   OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, uid_buff);
1074
1075   OVS_YAJL_CALL(yajl_gen_map_close, jgen);
1076
1077   if (cb) {
1078     /* register result callback */
1079     if ((new_cb = malloc(sizeof(ovs_callback_t))) == NULL)
1080       goto yajl_gen_failure;
1081
1082     /* add new callback to front */
1083     sem_init(&new_cb->result.sync, 0, 0);
1084     new_cb->result.call = cb;
1085     new_cb->uid = uid;
1086     ovs_db_callback_add(pdb, new_cb);
1087   }
1088
1089   /* send the request */
1090   OVS_YAJL_CALL(yajl_gen_get_buf, jgen, (const unsigned char **)&req,
1091                 &req_len);
1092   OVS_DEBUG("%s", req);
1093   if (!ovs_db_data_send(pdb, req, req_len)) {
1094     if (cb) {
1095       /* wait for result */
1096       clock_gettime(CLOCK_REALTIME, &ts);
1097       ts.tv_sec += OVS_DB_SEND_REQ_TIMEOUT;
1098       if (sem_timedwait(&new_cb->result.sync, &ts) < 0) {
1099         OVS_ERROR("%s() no replay received within %d sec", __FUNCTION__,
1100                   OVS_DB_SEND_REQ_TIMEOUT);
1101         ret = (-1);
1102       }
1103     }
1104   } else {
1105     OVS_ERROR("ovs_db_data_send() failed");
1106     ret = (-1);
1107   }
1108
1109 yajl_gen_failure:
1110   if (new_cb) {
1111     /* destroy callback */
1112     sem_destroy(&new_cb->result.sync);
1113     ovs_db_callback_remove(pdb, new_cb);
1114   }
1115
1116   /* release memory */
1117   yajl_gen_clear(jgen);
1118   return (yajl_gen_ret != yajl_gen_status_ok) ? (-1) : ret;
1119 }
1120
1121 int
1122 ovs_db_table_cb_register(ovs_db_t *pdb, const char *tb_name,
1123                          const char **tb_column, ovs_db_table_cb_t update_cb,
1124                          ovs_db_result_cb_t result_cb, unsigned int flags)
1125 {
1126   yajl_gen jgen;
1127   yajl_gen_status yajl_gen_ret;
1128   ovs_callback_t *new_cb = NULL;
1129   char uid_str[OVS_UID_STR_SIZE];
1130   char *params;
1131   size_t params_len;
1132   int ovs_db_ret = 0;
1133
1134   /* sanity check */
1135   if (pdb == NULL || tb_name == NULL || update_cb == NULL)
1136     return (-1);
1137
1138   if ((jgen = yajl_gen_alloc(NULL)) == NULL)
1139     return (-1);
1140
1141   /* register table update callback */
1142   if ((new_cb = malloc(sizeof(ovs_callback_t))) == NULL)
1143     return (-1);
1144
1145   /* add new callback to front */
1146   new_cb->table.call = update_cb;
1147   new_cb->uid = ovs_uid_generate();
1148   ovs_db_callback_add(pdb, new_cb);
1149
1150   /* make update notification request
1151    * [<db-name>, <json-value>, <monitor-requests>] */
1152   OVS_YAJL_CALL(yajl_gen_array_open, jgen);
1153   {
1154     OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, OVS_DB_DEFAULT_DB_NAME);
1155
1156     /* uid string <json-value> */
1157     ssnprintf(uid_str, sizeof(uid_str), "%" PRIX64, new_cb->uid);
1158     OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, uid_str);
1159
1160     /* <monitor-requests> */
1161     OVS_YAJL_CALL(yajl_gen_map_open, jgen);
1162     {
1163       OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, tb_name);
1164       OVS_YAJL_CALL(yajl_gen_array_open, jgen);
1165       {
1166         /* <monitor-request> */
1167         OVS_YAJL_CALL(yajl_gen_map_open, jgen);
1168         {
1169           if (tb_column) {
1170             /* columns within the table to be monitored */
1171             OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "columns");
1172             OVS_YAJL_CALL(yajl_gen_array_open, jgen);
1173             for (; *tb_column; tb_column++)
1174               OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, *tb_column);
1175             OVS_YAJL_CALL(yajl_gen_array_close, jgen);
1176           }
1177           /* specify select option */
1178           OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "select");
1179           {
1180             OVS_YAJL_CALL(yajl_gen_map_open, jgen);
1181             {
1182               OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "initial");
1183               OVS_YAJL_CALL(yajl_gen_bool, jgen,
1184                             flags & OVS_DB_TABLE_CB_FLAG_INITIAL);
1185               OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "insert");
1186               OVS_YAJL_CALL(yajl_gen_bool, jgen,
1187                             flags & OVS_DB_TABLE_CB_FLAG_INSERT);
1188               OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "delete");
1189               OVS_YAJL_CALL(yajl_gen_bool, jgen,
1190                             flags & OVS_DB_TABLE_CB_FLAG_DELETE);
1191               OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "modify");
1192               OVS_YAJL_CALL(yajl_gen_bool, jgen,
1193                             flags & OVS_DB_TABLE_CB_FLAG_MODIFY);
1194             }
1195             OVS_YAJL_CALL(yajl_gen_map_close, jgen);
1196           }
1197         }
1198         OVS_YAJL_CALL(yajl_gen_map_close, jgen);
1199       }
1200       OVS_YAJL_CALL(yajl_gen_array_close, jgen);
1201     }
1202     OVS_YAJL_CALL(yajl_gen_map_close, jgen);
1203   }
1204   OVS_YAJL_CALL(yajl_gen_array_close, jgen);
1205
1206   /* make a request to subscribe to given table */
1207   OVS_YAJL_CALL(yajl_gen_get_buf, jgen, (const unsigned char **)&params,
1208                 &params_len);
1209   if (ovs_db_send_request(pdb, "monitor", params, result_cb) < 0) {
1210     OVS_ERROR("Failed to subscribe to \"%s\" table", tb_name);
1211     ovs_db_ret = (-1);
1212   }
1213
1214 yajl_gen_failure:
1215   /* release memory */
1216   yajl_gen_clear(jgen);
1217   return ovs_db_ret;
1218 }
1219
1220 int
1221 ovs_db_destroy(ovs_db_t *pdb)
1222 {
1223   int ovs_db_ret = 0;
1224   int ret = 0;
1225
1226   /* sanity check */
1227   if (pdb == NULL)
1228     return (-1);
1229
1230   /* try to lock the structure before releasing */
1231   if (ret = pthread_mutex_lock(&pdb->mutex)) {
1232     OVS_ERROR("pthread_mutex_lock() DB mutext lock failed (%d)", ret);
1233     return (-1);
1234   }
1235
1236   /* stop poll thread */
1237   if (ovs_db_event_thread_stop(pdb) < 0) {
1238     OVS_ERROR("stop poll thread failed");
1239     ovs_db_ret = (-1);
1240   }
1241
1242   /* stop event thread */
1243   if (ovs_db_poll_thread_stop(pdb) < 0) {
1244     OVS_ERROR("stop event thread failed");
1245     ovs_db_ret = (-1);
1246   }
1247
1248   /* unsubscribe callbacks */
1249   ovs_db_callback_remove_all(pdb);
1250
1251   /* close connection */
1252   if (pdb->conn.sock)
1253     close(pdb->conn.sock);
1254
1255   /* release DB handler */
1256   pthread_mutex_unlock(&pdb->mutex);
1257   pthread_mutex_destroy(&pdb->mutex);
1258   sfree(pdb);
1259   return ovs_db_ret;
1260 }
1261
1262 /*
1263  * Public OVS utils API implementation
1264  */
1265
1266 /* Get YAJL value by key from YAJL dictionary */
1267 yajl_val
1268 ovs_utils_get_value_by_key(yajl_val jval, const char *key)
1269 {
1270   const char *obj_key = NULL;
1271
1272   /* check params */
1273   if (!YAJL_IS_OBJECT(jval) || !key)
1274     return NULL;
1275
1276   /* find a value by key */
1277   for (int i = 0; i < YAJL_GET_OBJECT(jval)->len; i++) {
1278     obj_key = YAJL_GET_OBJECT(jval)->keys[i];
1279     if (strcmp(obj_key, key) == 0)
1280       return YAJL_GET_OBJECT(jval)->values[i];
1281   }
1282
1283   return NULL;
1284 }