OVS events: Address all PR comments
[collectd.git] / src / utils_ovs.c
1 /**
2  * collectd - src/utils_ovs.c
3  *
4  * Copyright(c) 2016 Intel Corporation. All rights reserved.
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining a copy of
7  * this software and associated documentation files (the "Software"), to deal in
8  * the Software without restriction, including without limitation the rights to
9  * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
10  * of the Software, and to permit persons to whom the Software is furnished to do
11  * so, subject to the following conditions:
12  *
13  * The above copyright notice and this permission notice shall be included in all
14  * copies or substantial portions of the Software.
15  *
16  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22  * SOFTWARE.
23  *
24  * Authors:
25  *   Volodymyr Mytnyk <volodymyrx.mytnyk@intel.com>
26  **/
27
28 /* clang-format off */
29 /*
30  *                         OVS DB API internal architecture diagram
31  * +------------------------------------------------------------------------------+
32  * |OVS plugin      |OVS utils                                                    |
33  * |                |     +------------------------+                              |
34  * |                |     |      echo handler      |                JSON request/ |
35  * |                |  +--+ (ovs_db_table_echo_cb) +<---+---------+ update event/ |
36  * |                |  |  |                        |    |         | result        |
37  * |                |  |  +------------------------+    |         |               |
38  * |                |  |                                |    +----+---+--------+  |
39  * |  +----------+  |  |  +------------------------+    |    |        |        |  |
40  * |  |  update  |  |  |  |     update handler     |    |    |  YAJL  |  JSON  |  |
41  * |  | callback +<-------+(ovs_db_table_update_cp)+<---+    | parser | reader |  |
42  * |  +----------+  |  |  |                        |    |    |        |        |  |
43  * |                |  |  +------------------------+    |    +--------+---+----+  |
44  * |                |  |                                |                 ^       |
45  * |  +----------+  |  |  +------------------------+    |                 |       |
46  * |  |  result  |  |  |  |     result handler     |    |                 |       |
47  * |  | callback +<-------+   (ovs_db_result_cb)   +<---+        JSON raw |       |
48  * |  +----------+  |  |  |                        |               data   |       |
49  * |                |  |  +------------------------+                      |       |
50  * |                |  |                                                  |       |
51  * |                |  |    +------------------+             +------------+----+  |
52  * |  +----------+  |  |    |thread|           |             |thread|          |  |
53  * |  |   init   |  |  |    |                  |  reconnect  |                 |  |
54  * |  | callback +<---------+   EVENT WORKER   +<------------+   POLL WORKER   |  |
55  * |  +----------+  |  |    +------------------+             +--------+--------+  |
56  * |                |  |                                              ^           |
57  * +----------------+-------------------------------------------------------------+
58  *                     |                                              |
59  *                 JSON|echo reply                                 raw|data
60  *                     v                                              v
61  * +-------------------+----------------------------------------------+-----------+
62  * |                                 TCP/UNIX socket                              |
63  * +-------------------------------------------------------------------------------
64  */
65 /* clang-format on */
66
67 /* collectd headers */
68 #include "collectd.h"
69
70 #include "common.h"
71
72 /* private headers */
73 #include "utils_ovs.h"
74
75 /* system libraries */
76 #if HAVE_NETDB_H
77 #include <netdb.h>
78 #endif
79 #if HAVE_ARPA_INET_H
80 #include <arpa/inet.h>
81 #endif
82 #if HAVE_POLL_H
83 #include <poll.h>
84 #endif
85 #if HAVE_SYS_UN_H
86 #include <sys/un.h>
87 #endif
88
89 #include <semaphore.h>
90
91 #define OVS_ERROR(fmt, ...)                                                    \
92   do {                                                                         \
93     ERROR("ovs_utils: " fmt, ##__VA_ARGS__);                                   \
94   } while (0)
95 #define OVS_DEBUG(fmt, ...)                                                    \
96   do {                                                                         \
97     DEBUG("%s:%d:%s(): " fmt, __FILE__, __LINE__, __FUNCTION__,                \
98           ##__VA_ARGS__);                                                      \
99   } while (0)
100
101 #define OVS_DB_POLL_TIMEOUT 1           /* poll receive timeout (sec) */
102 #define OVS_DB_POLL_READ_BLOCK_SIZE 512 /* read block size (bytes) */
103 #define OVS_DB_DEFAULT_DB_NAME "Open_vSwitch"
104 #define OVS_DB_RECONNECT_TIMEOUT 1 /* reconnect timeout (sec) */
105
106 #define OVS_DB_EVENT_TIMEOUT 5 /* event thread timeout (sec) */
107 #define OVS_DB_EVENT_TERMINATE 1
108 #define OVS_DB_EVENT_CONN_ESTABLISHED 2
109 #define OVS_DB_EVENT_CONN_TERMINATED 3
110
111 #define OVS_DB_POLL_STATE_RUNNING 1
112 #define OVS_DB_POLL_STATE_EXITING 2
113
114 #define OVS_DB_SEND_REQ_TIMEOUT 5 /* send request timeout (sec) */
115
116 #define OVS_YAJL_CALL(func, ...)                                               \
117   do {                                                                         \
118     yajl_gen_ret = yajl_gen_status_ok;                                         \
119     if ((yajl_gen_ret = func(__VA_ARGS__)) != yajl_gen_status_ok)              \
120       goto yajl_gen_failure;                                                   \
121   } while (0)
122 #define OVS_YAJL_ERROR_BUFFER_SIZE 1024
123 #define OVS_ERROR_BUFF_SIZE 512
124 #define OVS_UID_STR_SIZE 17 /* 64-bit HEX string len + '\0' */
125
126 /* JSON reader internal data */
127 struct ovs_json_reader_s {
128   char *buff_ptr;
129   size_t buff_size;
130   size_t buff_offset;
131   size_t json_offset;
132 };
133 typedef struct ovs_json_reader_s ovs_json_reader_t;
134
135 /* Result callback declaration */
136 struct ovs_result_cb_s {
137   sem_t sync;
138   ovs_db_result_cb_t call;
139 };
140 typedef struct ovs_result_cb_s ovs_result_cb_t;
141
142 /* Table callback declaration */
143 struct ovs_table_cb_s {
144   ovs_db_table_cb_t call;
145 };
146 typedef struct ovs_table_cb_s ovs_table_cb_t;
147
148 /* Callback declaration */
149 struct ovs_callback_s {
150   uint64_t uid;
151   union {
152     ovs_result_cb_t result;
153     ovs_table_cb_t table;
154   };
155   struct ovs_callback_s *next;
156   struct ovs_callback_s *prev;
157 };
158 typedef struct ovs_callback_s ovs_callback_t;
159
160 /* Event thread data declaration */
161 struct ovs_event_thread_s {
162   pthread_t tid;
163   pthread_mutex_t mutex;
164   pthread_cond_t cond;
165   int value;
166 };
167 typedef struct ovs_event_thread_s ovs_event_thread_t;
168
169 /* Poll thread data declaration */
170 struct ovs_poll_thread_s {
171   pthread_t tid;
172   pthread_mutex_t mutex;
173   int state;
174 };
175 typedef struct ovs_poll_thread_s ovs_poll_thread_t;
176
177 /* OVS DB internal data declaration */
178 struct ovs_db_s {
179   ovs_poll_thread_t poll_thread;
180   ovs_event_thread_t event_thread;
181   pthread_mutex_t mutex;
182   ovs_callback_t *remote_cb;
183   ovs_db_callback_t cb;
184   char service[OVS_DB_ADDR_SERVICE_SIZE];
185   char node[OVS_DB_ADDR_NODE_SIZE];
186   int sock;
187 };
188
189 /* Post an event to event thread.
190  * Possible events are:
191  *  OVS_DB_EVENT_TERMINATE
192  *  OVS_DB_EVENT_CONN_ESTABLISHED
193  *  OVS_DB_EVENT_CONN_TERMINATED
194  */
195 static void ovs_db_event_post(ovs_db_t *pdb, int event) {
196   pthread_mutex_lock(&pdb->event_thread.mutex);
197   pdb->event_thread.value = event;
198   pthread_mutex_unlock(&pdb->event_thread.mutex);
199   pthread_cond_signal(&pdb->event_thread.cond);
200 }
201
202 /* Check if POLL thread is still running. Returns
203  * 1 if running otherwise 0 is returned */
204 static _Bool ovs_db_poll_is_running(ovs_db_t *pdb) {
205   int state = 0;
206   pthread_mutex_lock(&pdb->poll_thread.mutex);
207   state = pdb->poll_thread.state;
208   pthread_mutex_unlock(&pdb->poll_thread.mutex);
209   return (state == OVS_DB_POLL_STATE_RUNNING);
210 }
211
212 /* Terminate POLL thread */
213 static void ovs_db_poll_terminate(ovs_db_t *pdb) {
214   pthread_mutex_lock(&pdb->poll_thread.mutex);
215   pdb->poll_thread.state = OVS_DB_POLL_STATE_EXITING;
216   pthread_mutex_unlock(&pdb->poll_thread.mutex);
217 }
218
219 /* Generate unique identifier (UID). It is used by OVS DB API
220  * to set "id" field for any OVS DB JSON request. */
221 static uint64_t ovs_uid_generate() {
222   struct timespec ts;
223   clock_gettime(CLOCK_MONOTONIC, &ts);
224   return ((ts.tv_sec << 32) | (ts.tv_nsec & UINT32_MAX));
225 }
226
227 /*
228  * Callback API. These function are used to store
229  * registered callbacks in OVS DB API.
230  */
231
232 /* Add new callback into OVS DB object */
233 static void ovs_db_callback_add(ovs_db_t *pdb, ovs_callback_t *new_cb) {
234   pthread_mutex_lock(&pdb->mutex);
235   if (pdb->remote_cb)
236     pdb->remote_cb->prev = new_cb;
237   new_cb->next = pdb->remote_cb;
238   new_cb->prev = NULL;
239   pdb->remote_cb = new_cb;
240   pthread_mutex_unlock(&pdb->mutex);
241 }
242
243 /* Remove callback from OVS DB object */
244 static void ovs_db_callback_remove(ovs_db_t *pdb, ovs_callback_t *del_cb) {
245   ovs_callback_t *pre_cb = del_cb->prev;
246   ovs_callback_t *next_cb = del_cb->next;
247
248   pthread_mutex_lock(&pdb->mutex);
249   if (next_cb)
250     next_cb->prev = del_cb->prev;
251
252   if (pre_cb)
253     pre_cb->next = del_cb->next;
254   else
255     pdb->remote_cb = del_cb->next;
256
257   free(del_cb);
258   pthread_mutex_unlock(&pdb->mutex);
259 }
260
261 /* Remove all callbacks form OVS DB object */
262 static void ovs_db_callback_remove_all(ovs_db_t *pdb) {
263   pthread_mutex_lock(&pdb->mutex);
264   for (ovs_callback_t *del_cb = pdb->remote_cb; pdb->remote_cb;
265        del_cb = pdb->remote_cb) {
266     pdb->remote_cb = del_cb->next;
267     free(del_cb);
268   }
269   pdb->remote_cb = NULL;
270   pthread_mutex_unlock(&pdb->mutex);
271 }
272
273 /* Get/find callback in OVS DB object by UID. Returns pointer
274  * to requested callback otherwise NULL is returned.
275  *
276  * IMPORTANT NOTE:
277  *   The OVS DB mutex should be locked by the caller
278  *   to make sure that returned callback is still valid.
279  */
280 static ovs_callback_t *ovs_db_callback_get(ovs_db_t *pdb, uint64_t uid) {
281   for (ovs_callback_t *cb = pdb->remote_cb; cb != NULL; cb = cb->next)
282     if (cb->uid == uid)
283       return cb;
284   return NULL;
285 }
286
287 /* Send all requested data to the socket. Returns 0 if
288  * ALL request data has been sent otherwise negative value
289  * is returned */
290 static int ovs_db_data_send(const ovs_db_t *pdb, const char *data, size_t len) {
291   ssize_t nbytes = 0;
292   size_t rem = len;
293   size_t off = 0;
294
295   while (rem > 0) {
296     if ((nbytes = send(pdb->sock, data + off, rem, 0)) <= 0)
297       return (-1);
298     rem -= (size_t)nbytes;
299     off += (size_t)nbytes;
300   }
301   return (0);
302 }
303
304 /*
305  * YAJL (Yet Another JSON Library) helper functions
306  * Documentation (https://lloyd.github.io/yajl/)
307  */
308
309 /* Add null-terminated string into YAJL generator handle (JSON object).
310  * Similar function to yajl_gen_string() but takes null-terminated string
311  * instead of string and its length.
312  *
313  * jgen   - YAJL generator handle allocated by yajl_gen_alloc()
314  * string - Null-terminated string
315  */
316 static yajl_gen_status ovs_yajl_gen_tstring(yajl_gen hander,
317                                             const char *string) {
318   return yajl_gen_string(hander, (const unsigned char *)string, strlen(string));
319 }
320
321 /* Add YAJL value into YAJL generator handle (JSON object)
322  *
323  * jgen - YAJL generator handle allocated by yajl_gen_alloc()
324  * jval - YAJL value usually returned by yajl_tree_get()
325  */
326 static yajl_gen_status ovs_yajl_gen_val(yajl_gen jgen, yajl_val jval) {
327   size_t array_len = 0;
328   yajl_val *jvalues = NULL;
329   yajl_val jobj_value = NULL;
330   const char *obj_key = NULL;
331   size_t obj_len = 0;
332   yajl_gen_status yajl_gen_ret;
333
334   if (YAJL_IS_STRING(jval))
335     OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, YAJL_GET_STRING(jval));
336   else if (YAJL_IS_DOUBLE(jval))
337     OVS_YAJL_CALL(yajl_gen_double, jgen, YAJL_GET_DOUBLE(jval));
338   else if (YAJL_IS_INTEGER(jval))
339     OVS_YAJL_CALL(yajl_gen_double, jgen, YAJL_GET_INTEGER(jval));
340   else if (YAJL_IS_TRUE(jval))
341     OVS_YAJL_CALL(yajl_gen_bool, jgen, 1);
342   else if (YAJL_IS_FALSE(jval))
343     OVS_YAJL_CALL(yajl_gen_bool, jgen, 0);
344   else if (YAJL_IS_NULL(jval))
345     OVS_YAJL_CALL(yajl_gen_null, jgen);
346   else if (YAJL_IS_ARRAY(jval)) {
347     /* create new array and add all elements into the array */
348     array_len = YAJL_GET_ARRAY(jval)->len;
349     jvalues = YAJL_GET_ARRAY(jval)->values;
350     OVS_YAJL_CALL(yajl_gen_array_open, jgen);
351     for (int i = 0; i < array_len; i++)
352       OVS_YAJL_CALL(ovs_yajl_gen_val, jgen, jvalues[i]);
353     OVS_YAJL_CALL(yajl_gen_array_close, jgen);
354   } else if (YAJL_IS_OBJECT(jval)) {
355     /* create new object and add all elements into the object */
356     OVS_YAJL_CALL(yajl_gen_map_open, jgen);
357     obj_len = YAJL_GET_OBJECT(jval)->len;
358     for (int i = 0; i < obj_len; i++) {
359       obj_key = YAJL_GET_OBJECT(jval)->keys[i];
360       jobj_value = YAJL_GET_OBJECT(jval)->values[i];
361       OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, obj_key);
362       OVS_YAJL_CALL(ovs_yajl_gen_val, jgen, jobj_value);
363     }
364     OVS_YAJL_CALL(yajl_gen_map_close, jgen);
365   } else {
366     OVS_ERROR("%s() unsupported value type %d (skip)", __FUNCTION__,
367               (int)(jval)->type);
368     goto yajl_gen_failure;
369   }
370   return yajl_gen_status_ok;
371
372 yajl_gen_failure:
373   OVS_ERROR("%s() error to generate value", __FUNCTION__);
374   return yajl_gen_ret;
375 }
376
377 /* OVS DB echo request handler. When OVS DB sends
378  * "echo" request to the client, client should generate
379  * "echo" replay with the same content received in the
380  * request */
381 static int ovs_db_table_echo_cb(const ovs_db_t *pdb, yajl_val jnode) {
382   yajl_val jparams;
383   yajl_val jid;
384   yajl_gen jgen;
385   size_t resp_len = 0;
386   const char *resp = NULL;
387   const char *params_path[] = {"params", NULL};
388   const char *id_path[] = {"id", NULL};
389   yajl_gen_status yajl_gen_ret;
390
391   if ((jgen = yajl_gen_alloc(NULL)) == NULL)
392     return (-1);
393
394   /* check & get request attributes */
395   if ((jparams = yajl_tree_get(jnode, params_path, yajl_t_array)) == NULL ||
396       ((jid = yajl_tree_get(jnode, id_path, yajl_t_any)) == NULL)) {
397     OVS_ERROR("parse echo request failed");
398     goto yajl_gen_failure;
399   }
400
401   /* generate JSON echo response */
402   OVS_YAJL_CALL(yajl_gen_map_open, jgen);
403
404   OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "result");
405   OVS_YAJL_CALL(ovs_yajl_gen_val, jgen, jparams);
406
407   OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "error");
408   OVS_YAJL_CALL(yajl_gen_null, jgen);
409
410   OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "id");
411   OVS_YAJL_CALL(ovs_yajl_gen_val, jgen, jid);
412
413   OVS_YAJL_CALL(yajl_gen_map_close, jgen);
414   OVS_YAJL_CALL(yajl_gen_get_buf, jgen, (const unsigned char **)&resp,
415                 &resp_len);
416
417   /* send the response */
418   OVS_DEBUG("response: %s", resp);
419   if (ovs_db_data_send(pdb, resp, resp_len) < 0) {
420     OVS_ERROR("send echo reply failed");
421     goto yajl_gen_failure;
422   }
423   /* clean up and return success */
424   yajl_gen_clear(jgen);
425   return (0);
426
427 yajl_gen_failure:
428   /* release memory */
429   yajl_gen_clear(jgen);
430   return (-1);
431 }
432
433 /* Get OVS DB registered callback by YAJL val. The YAJL
434  * value should be YAJL string (UID). Returns NULL if
435  * callback hasn't been found. See also ovs_db_callback_get()
436  * description for addition info.
437  */
438 static ovs_callback_t *ovs_db_table_callback_get(ovs_db_t *pdb, yajl_val jid) {
439   char *endptr = NULL;
440   const char *suid = NULL;
441   uint64_t uid;
442
443   if (jid && YAJL_IS_STRING(jid)) {
444     suid = YAJL_GET_STRING(jid);
445     uid = (uint64_t)strtoul(suid, &endptr, 16);
446     if (*endptr == '\0' && uid)
447       return ovs_db_callback_get(pdb, uid);
448   }
449
450   return NULL;
451 }
452
453 /* OVS DB table update event handler.
454  * This callback is called by POLL thread if OVS DB
455  * table update callback is received from the DB
456  * server. Once registered callback found, it's called
457  * by this handler. */
458 static int ovs_db_table_update_cb(ovs_db_t *pdb, yajl_val jnode) {
459   ovs_callback_t *cb = NULL;
460   yajl_val jvalue;
461   yajl_val jparams;
462   yajl_val jtable_updates;
463   yajl_val jtable_update;
464   size_t obj_len = 0;
465   const char *table_name = NULL;
466   const char *params_path[] = {"params", NULL};
467   const char *id_path[] = {"id", NULL};
468
469   /* check & get request attributes */
470   if ((jparams = yajl_tree_get(jnode, params_path, yajl_t_array)) == NULL ||
471       (yajl_tree_get(jnode, id_path, yajl_t_null) == NULL))
472     goto ovs_failure;
473
474   /* check array length: [<json-value>, <table-updates>] */
475   if (YAJL_GET_ARRAY(jparams)->len != 2)
476     goto ovs_failure;
477
478   jvalue = YAJL_GET_ARRAY(jparams)->values[0];
479   jtable_updates = YAJL_GET_ARRAY(jparams)->values[1];
480   if ((!YAJL_IS_OBJECT(jtable_updates)) || (!YAJL_IS_STRING(jvalue)))
481     goto ovs_failure;
482
483   /* find registered callback based on <json-value> */
484   pthread_mutex_lock(&pdb->mutex);
485   cb = ovs_db_table_callback_get(pdb, jvalue);
486   if (cb == NULL || cb->table.call == NULL) {
487     pthread_mutex_unlock(&pdb->mutex);
488     goto ovs_failure;
489   }
490
491   /* call registered callback */
492   cb->table.call(jtable_updates);
493   pthread_mutex_unlock(&pdb->mutex);
494   return 0;
495
496 ovs_failure:
497   OVS_ERROR("invalid OVS DB table update event");
498   return (-1);
499 }
500
501 /* OVS DB result request handler.
502  * This callback is called by POLL thread if OVS DB
503  * result reply is received from the DB server.
504  * Once registered callback found, it's called
505  * by this handler. */
506 static int ovs_db_result_cb(ovs_db_t *pdb, yajl_val jnode) {
507   ovs_callback_t *cb = NULL;
508   yajl_val jresult;
509   yajl_val jerror;
510   yajl_val jid;
511   const char *result_path[] = {"result", NULL};
512   const char *error_path[] = {"error", NULL};
513   const char *id_path[] = {"id", NULL};
514
515   jresult = yajl_tree_get(jnode, result_path, yajl_t_any);
516   jerror = yajl_tree_get(jnode, error_path, yajl_t_any);
517   jid = yajl_tree_get(jnode, id_path, yajl_t_string);
518
519   /* check & get result attributes */
520   if (!jresult || !jerror || !jid)
521     return (-1);
522
523   /* try to find registered callback */
524   pthread_mutex_lock(&pdb->mutex);
525   cb = ovs_db_table_callback_get(pdb, jid);
526   if (cb != NULL && cb->result.call != NULL) {
527     /* call registered callback */
528     cb->result.call(jresult, jerror);
529     /* unlock owner of the reply */
530     sem_post(&cb->result.sync);
531   }
532
533   pthread_mutex_unlock(&pdb->mutex);
534   return (0);
535 }
536
537 /* Handle JSON data (one request) and call
538  * appropriate event OVS DB handler. Currently,
539  * update callback 'ovs_db_table_update_cb' and
540  * result callback 'ovs_db_result_cb' is supported.
541  */
542 static int ovs_db_json_data_process(ovs_db_t *pdb, const char *data,
543                                     size_t len) {
544   const char *method = NULL;
545   char yajl_errbuf[OVS_YAJL_ERROR_BUFFER_SIZE];
546   const char *method_path[] = {"method", NULL};
547   const char *result_path[] = {"result", NULL};
548   char *sjson = NULL;
549   yajl_val jnode, jval;
550
551   /* duplicate the data to make null-terminated string
552    * required for yajl_tree_parse() */
553   if ((sjson = malloc(len + 1)) == NULL)
554     return (-1);
555
556   sstrncpy(sjson, data, len + 1);
557   OVS_DEBUG("[len=%zu] %s", len, sjson);
558
559   /* parse json data */
560   jnode = yajl_tree_parse(sjson, yajl_errbuf, sizeof(yajl_errbuf));
561   if (jnode == NULL) {
562     OVS_ERROR("yajl_tree_parse() %s", yajl_errbuf);
563     sfree(sjson);
564     return (-1);
565   }
566
567   /* get method name */
568   if ((jval = yajl_tree_get(jnode, method_path, yajl_t_string)) != NULL) {
569     method = YAJL_GET_STRING(jval);
570     if (strcmp("echo", method) == 0) {
571       /* echo request from the server */
572       if (ovs_db_table_echo_cb(pdb, jnode) < 0)
573         OVS_ERROR("handle echo request failed");
574     } else if (strcmp("update", method) == 0) {
575       /* update notification */
576       if (ovs_db_table_update_cb(pdb, jnode) < 0)
577         OVS_ERROR("handle update notification failed");
578     }
579   } else if ((jval = yajl_tree_get(jnode, result_path, yajl_t_any)) != NULL) {
580     /* result notification */
581     if (ovs_db_result_cb(pdb, jnode) < 0)
582       OVS_ERROR("handle result reply failed");
583   } else
584     OVS_ERROR("connot find method or result failed");
585
586   /* release memory */
587   yajl_tree_free(jnode);
588   sfree(sjson);
589   return (0);
590 }
591
592 /*
593  * JSON reader implementation.
594  *
595  * This module process raw JSON data (byte stream) and
596  * returns fully-fledged JSON data which can be processed
597  * (parsed) by YAJL later.
598  */
599
600 /* Allocate JSON reader instance */
601 static ovs_json_reader_t *ovs_json_reader_alloc() {
602   ovs_json_reader_t *jreader = NULL;
603
604   if ((jreader = calloc(sizeof(ovs_json_reader_t), 1)) == NULL)
605     return NULL;
606
607   return jreader;
608 }
609
610 /* Push raw data into into the JSON reader for processing */
611 static int ovs_json_reader_push_data(ovs_json_reader_t *jreader,
612                                      const char *data, size_t data_len) {
613   char *new_buff = NULL;
614   size_t available = jreader->buff_size - jreader->buff_offset;
615
616   /* check/update required memory space */
617   if (available < data_len) {
618     OVS_DEBUG("Reallocate buffer [size=%d, available=%d required=%d]",
619               (int)jreader->buff_size, (int)available, (int)data_len);
620
621     /* allocate new chunk of memory */
622     new_buff = realloc(jreader->buff_ptr, (jreader->buff_size + data_len));
623     if (new_buff == NULL)
624       return (-1);
625
626     /* point to new allocated memory */
627     jreader->buff_ptr = new_buff;
628     jreader->buff_size += data_len;
629   }
630
631   /* store input data */
632   memcpy(jreader->buff_ptr + jreader->buff_offset, data, data_len);
633   jreader->buff_offset += data_len;
634   return (0);
635 }
636
637 /* Pop one fully-fledged JSON if already exists. Returns 0 if
638  * completed JSON already exists otherwise negative value is
639  * returned */
640 static int ovs_json_reader_pop(ovs_json_reader_t *jreader,
641                                const char **json_ptr, size_t *json_len_ptr) {
642   size_t nbraces = 0;
643   size_t json_len = 0;
644   char *json = NULL;
645
646   /* search open/close brace */
647   for (int i = jreader->json_offset; i < jreader->buff_offset; i++) {
648     if (jreader->buff_ptr[i] == '{') {
649       nbraces++;
650     } else if (jreader->buff_ptr[i] == '}')
651       if (nbraces)
652         if (!(--nbraces)) {
653           /* JSON data */
654           *json_ptr = jreader->buff_ptr + jreader->json_offset;
655           *json_len_ptr = json_len + 1;
656           jreader->json_offset = i + 1;
657           return (0);
658         }
659
660     /* increase JSON data length */
661     if (nbraces)
662       json_len++;
663   }
664
665   if (jreader->json_offset) {
666     if (jreader->json_offset < jreader->buff_offset) {
667       /* shift data to the beginning of the buffer
668        * and zero rest of the buffer data */
669       json = &jreader->buff_ptr[jreader->json_offset];
670       json_len = jreader->buff_offset - jreader->json_offset;
671       for (int i = 0; i < jreader->buff_size; i++)
672         jreader->buff_ptr[i] = ((i < json_len) ? (json[i]) : (0));
673       jreader->buff_offset = json_len;
674     } else
675       /* reset the buffer */
676       jreader->buff_offset = 0;
677
678     /* data is at the beginning of the buffer */
679     jreader->json_offset = 0;
680   }
681
682   return (-1);
683 }
684
685 /* Reset JSON reader. It is useful when start processing
686  * new raw data. E.g.: in case of lost stream connection.
687  */
688 static void ovs_json_reader_reset(ovs_json_reader_t *jreader) {
689   if (jreader) {
690     jreader->buff_offset = 0;
691     jreader->json_offset = 0;
692   }
693 }
694
695 /* Release internal data allocated for JSON reader */
696 static void ovs_json_reader_free(ovs_json_reader_t *jreader) {
697   if (jreader) {
698     free(jreader->buff_ptr);
699     free(jreader);
700   }
701 }
702
703 /* Reconnect to OVD DB and call the OVS DB post connection init callback
704  * if connection has been established.
705  */
706 static int ovs_db_reconnect(ovs_db_t *pdb) {
707   char errbuff[OVS_ERROR_BUFF_SIZE];
708   const char unix_prefix[] = "unix:";
709   struct addrinfo *result, *rp;
710   _Bool is_connected = 0;
711   struct sockaddr_un saunix;
712
713   /* remove all registered OVS DB table/result callbacks */
714   ovs_db_callback_remove_all(pdb);
715
716   if (strncmp(pdb->node, unix_prefix, strlen(unix_prefix)) == 0) {
717     /* create unix socket address */
718     rp = calloc(1, sizeof(struct addrinfo));
719     struct sockaddr_un *sa_unix = calloc(1, sizeof(struct sockaddr_un));
720     if (rp == NULL || sa_unix == NULL) {
721       sfree(rp);
722       sfree(sa_unix);
723       return (1);
724     }
725     rp->ai_family = AF_UNIX;
726     rp->ai_socktype = SOCK_STREAM;
727     rp->ai_addrlen = sizeof(*sa_unix);
728     rp->ai_addr = (struct sockaddr *)sa_unix;
729     sa_unix->sun_family = rp->ai_family;
730     sstrncpy(sa_unix->sun_path, (pdb->node + strlen(unix_prefix)),
731              sizeof(sa_unix->sun_path));
732     result = rp;
733   } else {
734     /* intet socket address */
735     int ret = 0;
736     struct addrinfo hints;
737
738     /* setup criteria for selecting the socket address */
739     memset(&hints, 0, sizeof(hints));
740     hints.ai_family = AF_UNSPEC;
741     hints.ai_socktype = SOCK_STREAM;
742
743     /* get socket addresses */
744     if ((ret = getaddrinfo(pdb->node, pdb->service, &hints, &result)) != 0) {
745       OVS_ERROR("getaddrinfo(): %s", gai_strerror(ret));
746       return (1);
747     }
748   }
749   /* try to connect to the server */
750   for (rp = result; rp != NULL; rp = rp->ai_next) {
751     if ((pdb->sock = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol)) <
752         0) {
753       sstrerror(errno, errbuff, sizeof(errbuff));
754       OVS_DEBUG("socket(): %s", errbuff);
755       continue;
756     }
757     if (connect(pdb->sock, rp->ai_addr, rp->ai_addrlen) < 0) {
758       sstrerror(errno, errbuff, sizeof(errbuff));
759       OVS_DEBUG("connect(): %s [family=%d]", errbuff, rp->ai_family);
760       close(pdb->sock);
761     } else {
762       is_connected = 1;
763       break;
764     }
765   }
766
767   /* send notification to event thread */
768   if (is_connected)
769     ovs_db_event_post(pdb, OVS_DB_EVENT_CONN_ESTABLISHED);
770   else
771     OVS_ERROR("connect to \"%s\" failed", pdb->node);
772
773   freeaddrinfo(result);
774   return !is_connected;
775 }
776
777 /* POLL worker thread.
778  * It listens on OVS DB connection for incoming
779  * requests/reply/events etc. Also, it reconnects to OVS DB
780  * if connection has been lost.
781  */
782 static void *ovs_poll_worker(void *arg) {
783   ovs_db_t *pdb = (ovs_db_t *)arg; /* pointer to OVS DB */
784   ovs_json_reader_t *jreader = NULL;
785   const char *json;
786   size_t json_len;
787   ssize_t nbytes = 0;
788   char buff[OVS_DB_POLL_READ_BLOCK_SIZE];
789   struct pollfd poll_fd;
790   int poll_ret = 0;
791
792   if ((jreader = ovs_json_reader_alloc()) == NULL) {
793     OVS_ERROR("initialize json reader failed");
794     goto thread_exit;
795   }
796
797   /* start polling data */
798   poll_fd.fd = pdb->sock;
799   poll_fd.events = POLLIN | POLLPRI;
800   poll_fd.revents = 0;
801
802   /* poll data */
803   while (ovs_db_poll_is_running(pdb)) {
804     poll_ret = poll(&poll_fd, 1, /* ms */ OVS_DB_POLL_TIMEOUT * 1000);
805     if (poll_ret > 0) {
806       if (poll_fd.revents & POLLNVAL) {
807         /* invalid file descriptor, reconnect */
808         if (ovs_db_reconnect(pdb) != 0) {
809           /* sleep awhile until next reconnect */
810           usleep(OVS_DB_RECONNECT_TIMEOUT * 1000000);
811         }
812         ovs_json_reader_reset(jreader);
813         poll_fd.fd = pdb->sock;
814       } else if ((poll_fd.revents & POLLERR) || (poll_fd.revents & POLLHUP)) {
815         /* connection is broken */
816         OVS_ERROR("poll() peer closed its end of the channel");
817         ovs_db_event_post(pdb, OVS_DB_EVENT_CONN_TERMINATED);
818         close(poll_fd.fd);
819       } else if ((poll_fd.revents & POLLIN) || (poll_fd.revents & POLLPRI)) {
820         /* read incoming data */
821         nbytes = recv(poll_fd.fd, buff, OVS_DB_POLL_READ_BLOCK_SIZE, 0);
822         if (nbytes > 0) {
823           OVS_DEBUG("recv(): received %d bytes of data", (int)nbytes);
824           ovs_json_reader_push_data(jreader, buff, nbytes);
825           while (!ovs_json_reader_pop(jreader, &json, &json_len))
826             /* process JSON data */
827             ovs_db_json_data_process(pdb, json, json_len);
828         } else if (nbytes == 0) {
829           OVS_ERROR("recv() peer has performed an orderly shutdown");
830           ovs_db_event_post(pdb, OVS_DB_EVENT_CONN_TERMINATED);
831           close(poll_fd.fd);
832         } else {
833           OVS_ERROR("recv() receive data error");
834           break;
835         }
836       } /* poll() POLLIN & POLLPRI */
837     } else if (poll_ret == 0)
838       OVS_DEBUG("poll() timeout");
839     else {
840       OVS_ERROR("poll() error");
841       break;
842     }
843   }
844
845 thread_exit:
846   OVS_DEBUG("poll thread has been completed");
847   ovs_json_reader_free(jreader);
848   pthread_exit((void *)0);
849   return ((void *)0);
850 }
851
852 /* EVENT worker thread.
853  * Perform task based on incoming events. This
854  * task can be done asynchronously which allows to
855  * handle OVD DB callback like 'init_cb'.
856  */
857 static void *ovs_event_worker(void *arg) {
858   int ret = 0;
859   ovs_db_t *pdb = (ovs_db_t *)arg;
860   struct timespec ts;
861
862   while (pdb->event_thread.value != OVS_DB_EVENT_TERMINATE) {
863     /* wait for an event */
864     clock_gettime(CLOCK_REALTIME, &ts);
865     ts.tv_sec += (OVS_DB_EVENT_TIMEOUT);
866     ret = pthread_cond_timedwait(&pdb->event_thread.cond,
867                                  &pdb->event_thread.mutex, &ts);
868     if (!ret) {
869       /* handle the event */
870       OVS_DEBUG("handle event %d", pdb->event_thread.value);
871       switch (pdb->event_thread.value) {
872       case OVS_DB_EVENT_CONN_ESTABLISHED:
873         if (pdb->cb.post_conn_init)
874           pdb->cb.post_conn_init(pdb);
875         break;
876       case OVS_DB_EVENT_CONN_TERMINATED:
877         if (pdb->cb.post_conn_terminate)
878           pdb->cb.post_conn_terminate();
879         break;
880       default:
881         OVS_DEBUG("unknown event received");
882         break;
883       }
884     } else if (ret == ETIMEDOUT) {
885       /* wait timeout */
886       OVS_DEBUG("no event received (timeout)");
887       continue;
888     } else {
889       /* unexpected error */
890       OVS_ERROR("pthread_cond_timedwait() failed");
891       break;
892     }
893   }
894
895 thread_exit:
896   OVS_DEBUG("event thread has been completed");
897   pthread_exit((void *)0);
898   return ((void *)0);
899 }
900
901 /* Stop EVENT thread */
902 static int ovs_db_event_thread_stop(ovs_db_t *pdb) {
903   ovs_db_event_post(pdb, OVS_DB_EVENT_TERMINATE);
904   if (pthread_join(pdb->event_thread.tid, NULL) != 0)
905     return (-1);
906   pthread_mutex_unlock(&pdb->event_thread.mutex);
907   pthread_mutex_destroy(&pdb->event_thread.mutex);
908   return (0);
909 }
910
911 /* Stop POLL thread */
912 static int ovs_db_poll_thread_stop(ovs_db_t *pdb) {
913   ovs_db_poll_terminate(pdb);
914   if (pthread_join(pdb->poll_thread.tid, NULL) != 0)
915     return (-1);
916   pthread_mutex_destroy(&pdb->poll_thread.mutex);
917   return (0);
918 }
919
920 /*
921  * Public OVS DB API implementation
922  */
923
924 ovs_db_t *ovs_db_init(const char *node, const char *service,
925                       ovs_db_callback_t *cb) {
926   pthread_mutexattr_t mutex_attr;
927   ovs_db_t *pdb = NULL;
928
929   /* allocate db data & fill it */
930   if ((pdb = calloc(1, sizeof(*pdb))) == NULL)
931     return (NULL);
932
933   /* node cannot be unset */
934   if (node == NULL || strlen(node) == 0)
935     return (NULL);
936
937   /* store the OVS DB address */
938   sstrncpy(pdb->node, node, sizeof(pdb->node));
939   if (service != NULL)
940     sstrncpy(pdb->service, service, sizeof(pdb->service));
941
942   /* setup OVS DB callbacks */
943   if (cb)
944     pdb->cb = *cb;
945
946   /* prepare event thread */
947   pthread_cond_init(&pdb->event_thread.cond, NULL);
948   pthread_mutex_init(&pdb->event_thread.mutex, NULL);
949   pthread_mutex_lock(&pdb->event_thread.mutex);
950   if (plugin_thread_create(&pdb->event_thread.tid, NULL, ovs_event_worker,
951                            pdb) != 0) {
952     OVS_ERROR("event worker start failed");
953     goto failure;
954   }
955
956   /* prepare polling thread */
957   ovs_db_reconnect(pdb);
958   pdb->poll_thread.state = OVS_DB_POLL_STATE_RUNNING;
959   pthread_mutex_init(&pdb->poll_thread.mutex, NULL);
960   if (plugin_thread_create(&pdb->poll_thread.tid, NULL, ovs_poll_worker, pdb) !=
961       0) {
962     OVS_ERROR("pull worker start failed");
963     goto failure;
964   }
965
966   /* init OVS DB mutex */
967   if (pthread_mutexattr_init(&mutex_attr) ||
968       pthread_mutexattr_settype(&mutex_attr, PTHREAD_MUTEX_RECURSIVE) ||
969       pthread_mutex_init(&pdb->mutex, &mutex_attr)) {
970     OVS_ERROR("OVS DB mutex init failed");
971     goto failure;
972   }
973
974   /* return db to the caller */
975   return pdb;
976
977 failure:
978   if (pdb->sock)
979     /* close connection */
980     close(pdb->sock);
981   if (pdb->event_thread.tid != 0)
982     /* stop event thread */
983     if (ovs_db_event_thread_stop(pdb) < 0)
984       OVS_ERROR("stop event thread failed");
985   if (pdb->poll_thread.tid != 0)
986     /* stop poll thread */
987     if (ovs_db_poll_thread_stop(pdb) < 0)
988       OVS_ERROR("stop poll thread failed");
989   sfree(pdb);
990   return NULL;
991 }
992
993 int ovs_db_send_request(ovs_db_t *pdb, const char *method, const char *params,
994                         ovs_db_result_cb_t cb) {
995   int ret = 0;
996   yajl_gen_status yajl_gen_ret;
997   yajl_val jparams;
998   yajl_gen jgen;
999   ovs_callback_t *new_cb = NULL;
1000   uint64_t uid;
1001   char uid_buff[OVS_UID_STR_SIZE];
1002   const char *req = NULL;
1003   size_t req_len = 0;
1004   struct timespec ts;
1005
1006   /* sanity check */
1007   if (!pdb || !method || !params)
1008     return (-1);
1009
1010   if ((jgen = yajl_gen_alloc(NULL)) == NULL)
1011     return (-1);
1012
1013   /* try to parse params */
1014   if ((jparams = yajl_tree_parse(params, NULL, 0)) == NULL) {
1015     OVS_ERROR("params is not a JSON string");
1016     yajl_gen_clear(jgen);
1017     return (-1);
1018   }
1019
1020   /* generate method field */
1021   OVS_YAJL_CALL(yajl_gen_map_open, jgen);
1022
1023   OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "method");
1024   OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, method);
1025
1026   /* generate params field */
1027   OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "params");
1028   OVS_YAJL_CALL(ovs_yajl_gen_val, jgen, jparams);
1029   yajl_tree_free(jparams);
1030
1031   /* generate id field */
1032   OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "id");
1033   uid = ovs_uid_generate();
1034   ssnprintf(uid_buff, sizeof(uid_buff), "%" PRIX64, uid);
1035   OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, uid_buff);
1036
1037   OVS_YAJL_CALL(yajl_gen_map_close, jgen);
1038
1039   if (cb) {
1040     /* register result callback */
1041     if ((new_cb = malloc(sizeof(ovs_callback_t))) == NULL)
1042       goto yajl_gen_failure;
1043
1044     /* add new callback to front */
1045     sem_init(&new_cb->result.sync, 0, 0);
1046     new_cb->result.call = cb;
1047     new_cb->uid = uid;
1048     ovs_db_callback_add(pdb, new_cb);
1049   }
1050
1051   /* send the request */
1052   OVS_YAJL_CALL(yajl_gen_get_buf, jgen, (const unsigned char **)&req, &req_len);
1053   OVS_DEBUG("%s", req);
1054   if (!ovs_db_data_send(pdb, req, req_len)) {
1055     if (cb) {
1056       /* wait for result */
1057       clock_gettime(CLOCK_REALTIME, &ts);
1058       ts.tv_sec += OVS_DB_SEND_REQ_TIMEOUT;
1059       if (sem_timedwait(&new_cb->result.sync, &ts) < 0) {
1060         OVS_ERROR("%s() no replay received within %d sec", __FUNCTION__,
1061                   OVS_DB_SEND_REQ_TIMEOUT);
1062         ret = (-1);
1063       }
1064     }
1065   } else {
1066     OVS_ERROR("ovs_db_data_send() failed");
1067     ret = (-1);
1068   }
1069
1070 yajl_gen_failure:
1071   if (new_cb) {
1072     /* destroy callback */
1073     sem_destroy(&new_cb->result.sync);
1074     ovs_db_callback_remove(pdb, new_cb);
1075   }
1076
1077   /* release memory */
1078   yajl_gen_clear(jgen);
1079   return (yajl_gen_ret != yajl_gen_status_ok) ? (-1) : ret;
1080 }
1081
1082 int ovs_db_table_cb_register(ovs_db_t *pdb, const char *tb_name,
1083                              const char **tb_column,
1084                              ovs_db_table_cb_t update_cb,
1085                              ovs_db_result_cb_t result_cb, unsigned int flags) {
1086   yajl_gen jgen;
1087   yajl_gen_status yajl_gen_ret;
1088   ovs_callback_t *new_cb = NULL;
1089   char uid_str[OVS_UID_STR_SIZE];
1090   char *params;
1091   size_t params_len;
1092   int ovs_db_ret = 0;
1093
1094   /* sanity check */
1095   if (pdb == NULL || tb_name == NULL || update_cb == NULL)
1096     return (-1);
1097
1098   if ((jgen = yajl_gen_alloc(NULL)) == NULL)
1099     return (-1);
1100
1101   /* register table update callback */
1102   if ((new_cb = malloc(sizeof(ovs_callback_t))) == NULL)
1103     return (-1);
1104
1105   /* add new callback to front */
1106   new_cb->table.call = update_cb;
1107   new_cb->uid = ovs_uid_generate();
1108   ovs_db_callback_add(pdb, new_cb);
1109
1110   /* make update notification request
1111    * [<db-name>, <json-value>, <monitor-requests>] */
1112   OVS_YAJL_CALL(yajl_gen_array_open, jgen);
1113   {
1114     OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, OVS_DB_DEFAULT_DB_NAME);
1115
1116     /* uid string <json-value> */
1117     ssnprintf(uid_str, sizeof(uid_str), "%" PRIX64, new_cb->uid);
1118     OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, uid_str);
1119
1120     /* <monitor-requests> */
1121     OVS_YAJL_CALL(yajl_gen_map_open, jgen);
1122     {
1123       OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, tb_name);
1124       OVS_YAJL_CALL(yajl_gen_array_open, jgen);
1125       {
1126         /* <monitor-request> */
1127         OVS_YAJL_CALL(yajl_gen_map_open, jgen);
1128         {
1129           if (tb_column) {
1130             /* columns within the table to be monitored */
1131             OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "columns");
1132             OVS_YAJL_CALL(yajl_gen_array_open, jgen);
1133             for (; *tb_column; tb_column++)
1134               OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, *tb_column);
1135             OVS_YAJL_CALL(yajl_gen_array_close, jgen);
1136           }
1137           /* specify select option */
1138           OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "select");
1139           {
1140             OVS_YAJL_CALL(yajl_gen_map_open, jgen);
1141             {
1142               OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "initial");
1143               OVS_YAJL_CALL(yajl_gen_bool, jgen,
1144                             flags & OVS_DB_TABLE_CB_FLAG_INITIAL);
1145               OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "insert");
1146               OVS_YAJL_CALL(yajl_gen_bool, jgen,
1147                             flags & OVS_DB_TABLE_CB_FLAG_INSERT);
1148               OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "delete");
1149               OVS_YAJL_CALL(yajl_gen_bool, jgen,
1150                             flags & OVS_DB_TABLE_CB_FLAG_DELETE);
1151               OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "modify");
1152               OVS_YAJL_CALL(yajl_gen_bool, jgen,
1153                             flags & OVS_DB_TABLE_CB_FLAG_MODIFY);
1154             }
1155             OVS_YAJL_CALL(yajl_gen_map_close, jgen);
1156           }
1157         }
1158         OVS_YAJL_CALL(yajl_gen_map_close, jgen);
1159       }
1160       OVS_YAJL_CALL(yajl_gen_array_close, jgen);
1161     }
1162     OVS_YAJL_CALL(yajl_gen_map_close, jgen);
1163   }
1164   OVS_YAJL_CALL(yajl_gen_array_close, jgen);
1165
1166   /* make a request to subscribe to given table */
1167   OVS_YAJL_CALL(yajl_gen_get_buf, jgen, (const unsigned char **)&params,
1168                 &params_len);
1169   if (ovs_db_send_request(pdb, "monitor", params, result_cb) < 0) {
1170     OVS_ERROR("Failed to subscribe to \"%s\" table", tb_name);
1171     ovs_db_ret = (-1);
1172   }
1173
1174 yajl_gen_failure:
1175   /* release memory */
1176   yajl_gen_clear(jgen);
1177   return ovs_db_ret;
1178 }
1179
1180 int ovs_db_destroy(ovs_db_t *pdb) {
1181   int ovs_db_ret = 0;
1182   int ret = 0;
1183
1184   /* sanity check */
1185   if (pdb == NULL)
1186     return (-1);
1187
1188   /* try to lock the structure before releasing */
1189   if ((ret = pthread_mutex_lock(&pdb->mutex))) {
1190     OVS_ERROR("pthread_mutex_lock() DB mutext lock failed (%d)", ret);
1191     return (-1);
1192   }
1193
1194   /* stop poll thread */
1195   if (ovs_db_event_thread_stop(pdb) < 0) {
1196     OVS_ERROR("stop poll thread failed");
1197     ovs_db_ret = (-1);
1198   }
1199
1200   /* stop event thread */
1201   if (ovs_db_poll_thread_stop(pdb) < 0) {
1202     OVS_ERROR("stop event thread failed");
1203     ovs_db_ret = (-1);
1204   }
1205
1206   /* unsubscribe callbacks */
1207   ovs_db_callback_remove_all(pdb);
1208
1209   /* close connection */
1210   if (pdb->sock)
1211     close(pdb->sock);
1212
1213   /* release DB handler */
1214   pthread_mutex_unlock(&pdb->mutex);
1215   pthread_mutex_destroy(&pdb->mutex);
1216   sfree(pdb);
1217   return ovs_db_ret;
1218 }
1219
1220 /*
1221  * Public OVS utils API implementation
1222  */
1223
1224 /* Get YAJL value by key from YAJL dictionary */
1225 yajl_val ovs_utils_get_value_by_key(yajl_val jval, const char *key) {
1226   const char *obj_key = NULL;
1227
1228   /* check params */
1229   if (!YAJL_IS_OBJECT(jval) || (key == NULL))
1230     return NULL;
1231
1232   /* find a value by key */
1233   for (int i = 0; i < YAJL_GET_OBJECT(jval)->len; i++) {
1234     obj_key = YAJL_GET_OBJECT(jval)->keys[i];
1235     if (strcmp(obj_key, key) == 0)
1236       return YAJL_GET_OBJECT(jval)->values[i];
1237   }
1238
1239   return NULL;
1240 }
1241
1242 /* Get OVS DB map value by given map key */
1243 yajl_val ovs_utils_get_map_value(yajl_val jval, const char *key) {
1244   size_t map_len = 0;
1245   size_t array_len = 0;
1246   yajl_val *map_values = NULL;
1247   yajl_val *array_values = NULL;
1248   const char *str_val = NULL;
1249
1250   /* check YAJL array */
1251   if (!YAJL_IS_ARRAY(jval) || (key == NULL))
1252     return NULL;
1253
1254   /* check a database map value (2-element, first one should be a string */
1255   array_len = YAJL_GET_ARRAY(jval)->len;
1256   array_values = YAJL_GET_ARRAY(jval)->values;
1257   if ((array_len != 2) || (!YAJL_IS_STRING(array_values[0])) ||
1258       (!YAJL_IS_ARRAY(array_values[1])))
1259     return NULL;
1260
1261   /* check first element of the array */
1262   str_val = YAJL_GET_STRING(array_values[0]);
1263   if (strcmp("map", str_val) != 0)
1264     return NULL;
1265
1266   /* try to find map value by map key */
1267   map_len = YAJL_GET_ARRAY(array_values[1])->len;
1268   map_values = YAJL_GET_ARRAY(array_values[1])->values;
1269   for (int i = 0; i < map_len; i++) {
1270     /* check YAJL array */
1271     if (!YAJL_IS_ARRAY(map_values[i]))
1272       break;
1273
1274     /* check a database pair value (2-element, first one represents a key
1275      * and it should be a string in our case */
1276     array_len = YAJL_GET_ARRAY(map_values[i])->len;
1277     array_values = YAJL_GET_ARRAY(map_values[i])->values;
1278     if ((array_len != 2) || (!YAJL_IS_STRING(array_values[0])))
1279       break;
1280
1281     /* return map value if given key equals map key */
1282     str_val = YAJL_GET_STRING(array_values[0]);
1283     if (strcmp(key, str_val) == 0)
1284       return array_values[1];
1285   }
1286   return NULL;
1287 }