ovs_events: Add external ids as metadata
[collectd.git] / src / utils_ovs.c
1 /**
2  * collectd - src/utils_ovs.c
3  *
4  * Copyright(c) 2016 Intel Corporation. All rights reserved.
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining a copy of
7  * this software and associated documentation files (the "Software"), to deal in
8  * the Software without restriction, including without limitation the rights to
9  * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
10  * of the Software, and to permit persons to whom the Software is furnished to do
11  * so, subject to the following conditions:
12  *
13  * The above copyright notice and this permission notice shall be included in all
14  * copies or substantial portions of the Software.
15  *
16  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22  * SOFTWARE.
23  *
24  * Authors:
25  *   Volodymyr Mytnyk <volodymyrx.mytnyk@intel.com>
26  *
27  *                         OVS DB API internal architecture diagram
28  * +------------------------------------------------------------------------------+
29  * |OVS plugin      |OVS utils                                                    |
30  * |                |     +------------------------+                              |
31  * |                |     |      echo handler      |                JSON request/ |
32  * |                |  +--+ (ovs_db_table_echo_cb) +<---+---------+ update event/ |
33  * |                |  |  |                        |    |         | result        |
34  * |                |  |  +------------------------+    |         |               |
35  * |                |  |                                |    +----+---+--------+  |
36  * |  +----------+  |  |  +------------------------+    |    |        |        |  |
37  * |  |  update  |  |  |  |     update handler     |    |    |  YAJL  |  JSON  |  |
38  * |  | callback +<-------+(ovs_db_table_update_cp)+<---+    | parser | reader |  |
39  * |  +----------+  |  |  |                        |    |    |        |        |  |
40  * |                |  |  +------------------------+    |    +--------+---+----+  |
41  * |                |  |                                |                 ^       |
42  * |  +----------+  |  |  +------------------------+    |                 |       |
43  * |  |  result  |  |  |  |     result handler     |    |                 |       |
44  * |  | callback +<-------+   (ovs_db_result_cb)   +<---+        JSON raw |       |
45  * |  +----------+  |  |  |                        |               data   |       |
46  * |                |  |  +------------------------+                      |       |
47  * |                |  |                                                  |       |
48  * |                |  |    +------------------+             +------------+----+  |
49  * |  +----------+  |  |    |thread|           |             |thread|          |  |
50  * |  |   init   |  |  |    |                  |  reconnect  |                 |  |
51  * |  | callback +<---------+   EVENT WORKER   +<------------+   POLL WORKER   |  |
52  * |  +----------+  |  |    +------------------+             +--------+--------+  |
53  * |                |  |                                              ^           |
54  * +----------------+-------------------------------------------------------------+
55  *                     |                                              |
56  *                 JSON|echo reply                                 raw|data
57  *                     v                                              v
58  * +-------------------+----------------------------------------------+-----------+
59  * |                                 TCP/UNIX socket                              |
60  * +-------------------------------------------------------------------------------
61  *
62  **/
63
64 /* collectd headers */
65 #include "common.h"
66
67 /* private headers */
68 #include "utils_ovs.h"
69
70 /* system libraries */
71 #include <semaphore.h>
72 #include <arpa/inet.h>
73 #include <poll.h>
74 #include <sys/un.h>
75
76 #define OVS_ERROR(fmt, ...) do { \
77   ERROR("ovs_utils: "fmt, ## __VA_ARGS__); } while (0)
78 #define OVS_DEBUG(fmt, ...) do { \
79   DEBUG("%s:%d:%s(): "fmt, __FILE__, __LINE__, __FUNCTION__, \
80         ## __VA_ARGS__); } while (0)
81
82 #define OVS_DB_POLL_TIMEOUT          1  /* poll receive timeout (sec) */
83 #define OVS_DB_POLL_READ_BLOCK_SIZE  512        /* read block size (bytes) */
84 #define OVS_DB_DEFAULT_DB_NAME       "Open_vSwitch"
85 #define OVS_DB_RECONNECT_TIMEOUT     1  /* reconnect timeout (sec) */
86
87 #define OVS_DB_EVENT_TIMEOUT         5  /* event thread timeout (sec) */
88 #define OVS_DB_EVENT_TERMINATE       1
89 #define OVS_DB_EVENT_CONN_ESTABLISHED     2
90 #define OVS_DB_EVENT_CONN_TERMINATED      3
91
92 #define OVS_DB_POLL_STATE_RUNNING    1
93 #define OVS_DB_POLL_STATE_EXITING    2
94
95 #define OVS_DB_SEND_REQ_TIMEOUT      5  /* send request timeout (sec) */
96
97 #define OVS_YAJL_CALL(func, ...) \
98   do { \
99     yajl_gen_ret = yajl_gen_status_ok; \
100     if ((yajl_gen_ret = func(__VA_ARGS__)) != yajl_gen_status_ok) \
101       goto yajl_gen_failure; \
102   } while (0)
103 #define OVS_YAJL_ERROR_BUFFER_SIZE       1024
104 #define OVS_ERROR_BUFF_SIZE              512
105 #define OVS_UID_STR_SIZE                 17     /* 64-bit HEX string len + '\0' */
106
107 /* JSON reader internal data */
108 struct ovs_json_reader_s {
109   char *buff_ptr;
110   size_t buff_size;
111   size_t buff_offset;
112   size_t json_offset;
113 };
114 typedef struct ovs_json_reader_s ovs_json_reader_t;
115
116 /* Result callback declaration */
117 struct ovs_result_cb_s {
118   sem_t sync;
119   ovs_db_result_cb_t call;
120 };
121 typedef struct ovs_result_cb_s ovs_result_cb_t;
122
123 /* Table callback declaration */
124 struct ovs_table_cb_s {
125   ovs_db_table_cb_t call;
126 };
127 typedef struct ovs_table_cb_s ovs_table_cb_t;
128
129 /* Callback declaration */
130 struct ovs_callback_s {
131   uint64_t uid;
132   union {
133     ovs_result_cb_t result;
134     ovs_table_cb_t table;
135   };
136   struct ovs_callback_s *next;
137   struct ovs_callback_s *prev;
138 };
139 typedef struct ovs_callback_s ovs_callback_t;
140
141 /* Connection declaration */
142 struct ovs_conn_s {
143   int sock;
144   int domain;
145   int type;
146   int addr_size;
147   union {
148     struct sockaddr_in s_inet;
149     struct sockaddr_un s_unix;
150   } addr;
151 };
152 typedef struct ovs_conn_s ovs_conn_t;
153
154 /* Event thread data declaration */
155 struct ovs_event_thread_s {
156   pthread_t tid;
157   pthread_mutex_t mutex;
158   pthread_cond_t cond;
159   int value;
160 };
161 typedef struct ovs_event_thread_s ovs_event_thread_t;
162
163 /* Poll thread data declaration */
164 struct ovs_poll_thread_s {
165   pthread_t tid;
166   pthread_mutex_t mutex;
167   int state;
168 };
169 typedef struct ovs_poll_thread_s ovs_poll_thread_t;
170
171 /* OVS DB internal data declaration */
172 struct ovs_db_s {
173   ovs_poll_thread_t poll_thread;
174   ovs_event_thread_t event_thread;
175   pthread_mutex_t mutex;
176   ovs_callback_t *remote_cb;
177   ovs_db_callback_t cb;
178   ovs_conn_t conn;
179 };
180 typedef struct ovs_db_s ovs_db_t;
181
182 /* Post an event to event thread.
183  * Possible events are:
184  *  OVS_DB_EVENT_TERMINATE
185  *  OVS_DB_EVENT_CONN_ESTABLISHED
186  *  OVS_DB_EVENT_CONN_TERMINATED
187  */
188 static void
189 ovs_db_event_post(ovs_db_t *pdb, int event)
190 {
191   pthread_mutex_lock(&pdb->event_thread.mutex);
192   pdb->event_thread.value = event;
193   pthread_mutex_unlock(&pdb->event_thread.mutex);
194   pthread_cond_signal(&pdb->event_thread.cond);
195 }
196
197 /* Check if POLL thread is still running. Returns
198  * 1 if running otherwise 0 is returned */
199 static inline int
200 ovs_db_poll_is_running(ovs_db_t *pdb)
201 {
202   int state = 0;
203   pthread_mutex_lock(&pdb->poll_thread.mutex);
204   state = pdb->poll_thread.state;
205   pthread_mutex_unlock(&pdb->poll_thread.mutex);
206   return (state == OVS_DB_POLL_STATE_RUNNING);
207 }
208
209 /* Terminate POLL thread */
210 static inline void
211 ovs_db_poll_terminate(ovs_db_t *pdb)
212 {
213   pthread_mutex_lock(&pdb->poll_thread.mutex);
214   pdb->poll_thread.state = OVS_DB_POLL_STATE_EXITING;
215   pthread_mutex_unlock(&pdb->poll_thread.mutex);
216 }
217
218 /* Generate unique identifier (UID). It is used by OVS DB API
219  * to set "id" field for any OVS DB JSON request. */
220 static uint64_t
221 ovs_uid_generate()
222 {
223   struct timespec ts;
224   clock_gettime(CLOCK_MONOTONIC, &ts);
225   return ((ts.tv_sec << 32) | (ts.tv_nsec & UINT32_MAX));
226 }
227
228 /*
229  * Callback API. These function are used to store
230  * registered callbacks in OVS DB API.
231  */
232
233 /* Add new callback into OVS DB object */
234 static void
235 ovs_db_callback_add(ovs_db_t *pdb, ovs_callback_t *new_cb)
236 {
237   pthread_mutex_lock(&pdb->mutex);
238   if (pdb->remote_cb)
239     pdb->remote_cb->prev = new_cb;
240   new_cb->next = pdb->remote_cb;
241   new_cb->prev = NULL;
242   pdb->remote_cb = new_cb;
243   pthread_mutex_unlock(&pdb->mutex);
244 }
245
246 /* Remove callback from OVS DB object */
247 static void
248 ovs_db_callback_remove(ovs_db_t *pdb, ovs_callback_t *del_cb)
249 {
250   ovs_callback_t *pre_cb = del_cb->prev;
251   ovs_callback_t *next_cb = del_cb->next;
252
253   pthread_mutex_lock(&pdb->mutex);
254   if (next_cb)
255     next_cb->prev = del_cb->prev;
256
257   if (pre_cb)
258     pre_cb->next = del_cb->next;
259   else
260     pdb->remote_cb = del_cb->next;
261
262   free(del_cb);
263   pthread_mutex_unlock(&pdb->mutex);
264 }
265
266 /* Remove all callbacks form OVS DB object */
267 static void
268 ovs_db_callback_remove_all(ovs_db_t *pdb)
269 {
270   pthread_mutex_lock(&pdb->mutex);
271   for (ovs_callback_t *del_cb = pdb->remote_cb; pdb->remote_cb;
272        del_cb = pdb->remote_cb) {
273     pdb->remote_cb = pdb->remote_cb->next;
274     free(del_cb);
275   }
276   pdb->remote_cb = NULL;
277   pthread_mutex_unlock(&pdb->mutex);
278 }
279
280 /* Get/find callback in OVS DB object by UID. Returns pointer
281  * to requested callback otherwise NULL is returned */
282 static ovs_callback_t *
283 ovs_db_callback_get(ovs_db_t *pdb, uint64_t uid)
284 {
285   pthread_mutex_lock(&pdb->mutex);
286   for (ovs_callback_t *cb = pdb->remote_cb; cb != NULL; cb = cb->next)
287     if (cb->uid == uid) {
288       pthread_mutex_unlock(&pdb->mutex);
289       return cb;
290     }
291   pthread_mutex_unlock(&pdb->mutex);
292   return NULL;
293 }
294
295 /* Send all requested data to the socket. Returns 0 if
296  * ALL request data has been sent otherwise negative value
297  * is returned */
298 static int
299 ovs_db_data_send(const ovs_db_t *pdb, const char *data, size_t len)
300 {
301   ssize_t nbytes = 0;
302   size_t rem = len;
303   size_t off = 0;
304
305   while (rem > 0) {
306     if ((nbytes = send(pdb->conn.sock, data + off, rem, 0)) <= 0)
307       return (-1);
308     rem -= (size_t)nbytes;
309     off += (size_t)nbytes;
310   }
311   return (0);
312 }
313
314 /* Parse OVS server URL.
315  * Format of the URL:
316  *   "tcp:a.b.c.d:port" - define TCP connection (INET domain)
317  *   "unix:file" - define UNIX socket file (UNIX domain)
318  */
319 static int
320 ovs_db_url_parse(const char *surl, ovs_conn_t *conn)
321 {
322   ovs_conn_t tmp_conn;
323   char *nexttok = NULL;
324   char *in_str = NULL;
325   char *saveptr;
326   int ret = 0;
327
328   /* sanity check */
329   if ((surl == NULL) || (strlen(surl) < 1))
330     return (-1);
331
332   /* parse domain */
333   tmp_conn = *conn;
334   in_str = sstrdup(surl);
335   if ((nexttok = strtok_r(in_str, ":", &saveptr)) != NULL) {
336     if (strcmp("tcp", nexttok) == 0) {
337       tmp_conn.domain = AF_INET;
338       tmp_conn.type = SOCK_STREAM;
339       tmp_conn.addr_size = sizeof(tmp_conn.addr.s_inet);
340     } else if (strcmp("unix", nexttok) == 0) {
341       tmp_conn.domain = AF_UNIX;
342       tmp_conn.type = SOCK_STREAM;
343       tmp_conn.addr_size = sizeof(tmp_conn.addr.s_unix);
344     } else
345       goto failure;
346   } else
347     goto failure;
348
349   /* parse url depending on domain */
350   if ((nexttok = strtok_r(NULL, ":", &saveptr)) != NULL) {
351     if (tmp_conn.domain == AF_UNIX) {
352       /* <UNIX-NAME> */
353       tmp_conn.addr.s_inet.sin_family = AF_UNIX;
354       sstrncpy(tmp_conn.addr.s_unix.sun_path, nexttok, strlen(nexttok) + 1);
355     } else {
356       /* <IP:PORT> */
357       tmp_conn.addr.s_inet.sin_family = AF_INET;
358       ret =
359         inet_pton(AF_INET, nexttok, (void *)&tmp_conn.addr.s_inet.sin_addr);
360       if (ret == 1) {
361         if ((nexttok = strtok_r(NULL, ":", &saveptr)) != NULL)
362           tmp_conn.addr.s_inet.sin_port = htons(atoi(nexttok));
363         else
364           goto failure;
365       } else
366         goto failure;
367     }
368   }
369
370   /* save result and return success */
371   *conn = tmp_conn;
372   sfree(in_str);
373   return (0);
374
375 failure:
376   OVS_ERROR("%s() : invalid OVS DB URL provided");
377   sfree(in_str);
378   return (-1);
379 }
380
381 /*
382  * YAJL (Yet Another JSON Library) helper functions
383  * Documentation (https://lloyd.github.io/yajl/)
384  */
385
386 /* Add null-terminated string into YAJL generator handle (JSON object).
387  * Similar function to yajl_gen_string() but takes null-terminated string
388  * instead of string and its length.
389  *
390  * jgen   - YAJL generator handle allocated by yajl_gen_alloc()
391  * string - Null-terminated string
392  */
393 static inline yajl_gen_status
394 ovs_yajl_gen_tstring(yajl_gen hander, const char *string)
395 {
396   return yajl_gen_string(hander, string, strlen(string));
397 }
398
399 /* Add YAJL value into YAJL generator handle (JSON object)
400  *
401  * jgen - YAJL generator handle allocated by yajl_gen_alloc()
402  * jval - YAJL value usually returned by yajl_tree_get()
403  */
404 static yajl_gen_status
405 ovs_yajl_gen_val(yajl_gen jgen, yajl_val jval)
406 {
407   size_t array_len = 0;
408   yajl_val *jvalues = NULL;
409   yajl_val jobj_value = NULL;
410   const char *obj_key = NULL;
411   size_t obj_len = 0;
412   yajl_gen_status yajl_gen_ret;
413
414   if (YAJL_IS_STRING(jval))
415     OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, YAJL_GET_STRING(jval));
416   else if (YAJL_IS_DOUBLE(jval))
417     OVS_YAJL_CALL(yajl_gen_double, jgen, YAJL_GET_DOUBLE(jval));
418   else if (YAJL_IS_INTEGER(jval))
419     OVS_YAJL_CALL(yajl_gen_double, jgen, YAJL_GET_INTEGER(jval));
420   else if (YAJL_IS_TRUE(jval))
421     OVS_YAJL_CALL(yajl_gen_bool, jgen, 1);
422   else if (YAJL_IS_FALSE(jval))
423     OVS_YAJL_CALL(yajl_gen_bool, jgen, 0);
424   else if (YAJL_IS_NULL(jval))
425     OVS_YAJL_CALL(yajl_gen_null, jgen);
426   else if (YAJL_IS_ARRAY(jval)) {
427     /* create new array and add all elements into the array */
428     array_len = YAJL_GET_ARRAY(jval)->len;
429     jvalues = YAJL_GET_ARRAY(jval)->values;
430     OVS_YAJL_CALL(yajl_gen_array_open, jgen);
431     for (int i = 0; i < array_len; i++)
432       OVS_YAJL_CALL(ovs_yajl_gen_val, jgen, jvalues[i]);
433     OVS_YAJL_CALL(yajl_gen_array_close, jgen);
434   } else if (YAJL_IS_OBJECT(jval)) {
435     /* create new object and add all elements into the object */
436     OVS_YAJL_CALL(yajl_gen_map_open, jgen);
437     obj_len = YAJL_GET_OBJECT(jval)->len;
438     for (int i = 0; i < obj_len; i++) {
439       obj_key = YAJL_GET_OBJECT(jval)->keys[i];
440       jobj_value = YAJL_GET_OBJECT(jval)->values[i];
441       OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, obj_key);
442       OVS_YAJL_CALL(ovs_yajl_gen_val, jgen, jobj_value);
443     }
444     OVS_YAJL_CALL(yajl_gen_map_close, jgen);
445   } else {
446     OVS_ERROR("%s() unsupported value type %d (skip)", __FUNCTION__,
447               (int)(jval)->type);
448     goto yajl_gen_failure;
449   }
450   return yajl_gen_status_ok;
451
452 yajl_gen_failure:
453   OVS_ERROR("%s() error to generate value", __FUNCTION__);
454   return yajl_gen_ret;
455 }
456
457 /* OVS DB echo request handler. When OVS DB sends
458  * "echo" request to the client, client should generate
459  * "echo" replay with the same content received in the
460  * request */
461 static int
462 ovs_db_table_echo_cb(const ovs_db_t *pdb, yajl_val jnode)
463 {
464   yajl_val jparams;
465   yajl_val jid;
466   yajl_gen jgen;
467   size_t resp_len = 0;
468   const char *resp = NULL;
469   const char *params_path[] = {"params", NULL};
470   const char *id_path[] = {"id", NULL};
471   yajl_gen_status yajl_gen_ret;
472
473   if ((jgen = yajl_gen_alloc(NULL)) == NULL)
474     return (-1);
475
476   /* check & get request attributes */
477   if ((jparams = yajl_tree_get(jnode, params_path, yajl_t_array)) == NULL ||
478       ((jid = yajl_tree_get(jnode, id_path, yajl_t_any)) == NULL)) {
479     OVS_ERROR("parse echo request failed");
480     goto yajl_gen_failure;
481   }
482
483   /* generate JSON echo response */
484   OVS_YAJL_CALL(yajl_gen_map_open, jgen);
485
486   OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "result");
487   OVS_YAJL_CALL(ovs_yajl_gen_val, jgen, jparams);
488
489   OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "error");
490   OVS_YAJL_CALL(yajl_gen_null, jgen);
491
492   OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "id");
493   OVS_YAJL_CALL(ovs_yajl_gen_val, jgen, jid);
494
495   OVS_YAJL_CALL(yajl_gen_map_close, jgen);
496   OVS_YAJL_CALL(yajl_gen_get_buf, jgen, (const unsigned char **)&resp,
497                 &resp_len);
498
499   /* send the response */
500   OVS_DEBUG("response: %s", resp);
501   if (ovs_db_data_send(pdb, resp, resp_len) < 0) {
502     OVS_ERROR("send echo reply failed");
503     goto yajl_gen_failure;
504   }
505   /* clean up and return success */
506   yajl_gen_clear(jgen);
507   return (0);
508
509 yajl_gen_failure:
510   /* release memory */
511   yajl_gen_clear(jgen);
512   return (-1);
513 }
514
515 /* Get OVS DB registered callback by YAJL val. The YAJL
516  * value should be YAJL string (UID). Returns NULL if
517  * callback hasn't been found.
518  */
519 static ovs_callback_t *
520 ovs_db_table_callback_get(ovs_db_t *pdb, yajl_val jid)
521 {
522   char *endptr = NULL;
523   const char *suid = NULL;
524   uint64_t uid;
525
526   if (jid && YAJL_IS_STRING(jid)) {
527     suid = YAJL_GET_STRING(jid);
528     uid = (uint64_t) strtoul(suid, &endptr, 16);
529     if (*endptr == '\0' && uid)
530       return ovs_db_callback_get(pdb, uid);
531   }
532
533   return NULL;
534 }
535
536 /* OVS DB table update event handler.
537  * This callback is called by POLL thread if OVS DB
538  * table update callback is received from the DB
539  * server. Once registered callback found, it's called
540  * by this handler. */
541 static int
542 ovs_db_table_update_cb(ovs_db_t *pdb, yajl_val jnode)
543 {
544   ovs_callback_t *cb = NULL;
545   yajl_val jvalue;
546   yajl_val jparams;
547   yajl_val jtable_updates;
548   yajl_val jtable_update;
549   size_t obj_len = 0;
550   const char *table_name = NULL;
551   const char *params_path[] = {"params", NULL};
552   const char *id_path[] = {"id", NULL};
553
554   /* check & get request attributes */
555   if ((jparams = yajl_tree_get(jnode, params_path, yajl_t_array)) == NULL ||
556       (yajl_tree_get(jnode, id_path, yajl_t_null) == NULL))
557     goto ovs_failure;
558
559   /* check array length: [<json-value>, <table-updates>] */
560   if (YAJL_GET_ARRAY(jparams)->len != 2)
561     goto ovs_failure;
562
563   jvalue = YAJL_GET_ARRAY(jparams)->values[0];
564   jtable_updates = YAJL_GET_ARRAY(jparams)->values[1];
565   if ((!YAJL_IS_OBJECT(jtable_updates)) || (!YAJL_IS_STRING(jvalue)))
566     goto ovs_failure;
567
568   /* find registered callback based on <json-value> */
569   cb = ovs_db_table_callback_get(pdb, jvalue);
570   if (cb == NULL || cb->table.call == NULL)
571     goto ovs_failure;
572
573   /* call registered callback */
574   cb->table.call(jtable_updates);
575   return 0;
576
577 ovs_failure:
578   OVS_ERROR("invalid OVS DB table update event");
579   return (-1);
580 }
581
582 /* OVS DB result request handler.
583  * This callback is called by POLL thread if OVS DB
584  * result reply is received from the DB server.
585  * Once registered callback found, it's called
586  * by this handler. */
587 static int
588 ovs_db_result_cb(ovs_db_t *pdb, yajl_val jnode)
589 {
590   ovs_callback_t *cb = NULL;
591   yajl_val jresult;
592   yajl_val jerror;
593   yajl_val jid;
594   const char *result_path[] = {"result", NULL};
595   const char *error_path[] = {"error", NULL};
596   const char *id_path[] = {"id", NULL};
597
598   jresult = yajl_tree_get(jnode, result_path, yajl_t_any);
599   jerror = yajl_tree_get(jnode, error_path, yajl_t_any);
600   jid = yajl_tree_get(jnode, id_path, yajl_t_string);
601
602   /* check & get result attributes */
603   if (!jresult || !jerror || !jid)
604     return (-1);
605
606   /* try to find registered callback */
607   cb = ovs_db_table_callback_get(pdb, jid);
608   if (cb != NULL && cb->result.call != NULL) {
609     /* call registered callback */
610     cb->result.call(jresult, jerror);
611     /* unlock owner of the reply */
612     sem_post(&cb->result.sync);
613   }
614
615   return (0);
616 }
617
618 /* Handle JSON data (one request) and call
619  * appropriate event OVS DB handler. Currently,
620  * update callback 'ovs_db_table_update_cb' and
621  * result callback 'ovs_db_result_cb' is supported.
622  */
623 static int
624 ovs_db_json_data_process(ovs_db_t *pdb, const char *data, size_t len)
625 {
626   const char *method = NULL;
627   char yajl_errbuf[OVS_YAJL_ERROR_BUFFER_SIZE];
628   const char *method_path[] = {"method", NULL};
629   const char *result_path[] = {"result", NULL};
630   char *sjson = NULL;
631   yajl_val jnode, jval;
632
633   /* duplicate the data to make null-terminated string
634    * required for yajl_tree_parse() */
635   if ((sjson = malloc(len + 1)) == NULL)
636     return (-1);
637
638   sstrncpy(sjson, data, len + 1);
639   OVS_DEBUG("[len=%d] %s", len, sjson);
640
641   /* parse json data */
642   jnode = yajl_tree_parse(sjson, yajl_errbuf, sizeof(yajl_errbuf));
643   if (jnode == NULL) {
644     OVS_ERROR("yajl_tree_parse() %s", yajl_errbuf);
645     sfree(sjson);
646     return (-1);
647   }
648
649   /* get method name */
650   if (jval = yajl_tree_get(jnode, method_path, yajl_t_string)) {
651     method = YAJL_GET_STRING(jval);
652     if (strcmp("echo", method) == 0) {
653       /* echo request from the server */
654       if (ovs_db_table_echo_cb(pdb, jnode) < 0)
655         OVS_ERROR("handle echo request failed");
656     } else if (strcmp("update", method) == 0) {
657       /* update notification */
658       if (ovs_db_table_update_cb(pdb, jnode) < 0)
659         OVS_ERROR("handle update notification failed");
660     }
661   } else if (jval = yajl_tree_get(jnode, result_path, yajl_t_any)) {
662     /* result notification */
663     if (ovs_db_result_cb(pdb, jnode) < 0)
664       OVS_ERROR("handle result reply failed");
665   } else
666     OVS_ERROR("connot find method or result failed");
667
668   /* release memory */
669   yajl_tree_free(jnode);
670   sfree(sjson);
671   return (0);
672 }
673
674 /*
675  * JSON reader implementation.
676  *
677  * This module process raw JSON data (byte stream) and
678  * returns fully-fledged JSON data which can be processed
679  * (parsed) by YAJL later.
680  */
681
682 /* Allocate JSON reader instance */
683 static inline ovs_json_reader_t *
684 ovs_json_reader_alloc()
685 {
686   ovs_json_reader_t *jreader = NULL;
687
688   if ((jreader = calloc(sizeof(ovs_json_reader_t), 1)) == NULL)
689     return NULL;
690
691   return jreader;
692 }
693
694 /* Push raw data into into the JSON reader for processing */
695 static inline int
696 ovs_json_reader_push_data(ovs_json_reader_t *jreader,
697                           const char *data, size_t data_len)
698 {
699   char *new_buff = NULL;
700   size_t available = jreader->buff_size - jreader->buff_offset;
701
702   /* check/update required memory space */
703   if (available < data_len) {
704     OVS_DEBUG("Reallocate buffer [size=%d, available=%d required=%d]",
705               (int)jreader->buff_size, (int)available, (int)data_len);
706
707     /* allocate new chunk of memory */
708     new_buff = realloc(jreader->buff_ptr, (jreader->buff_size + data_len));
709     if (new_buff == NULL)
710       return (-1);
711
712     /* point to new allocated memory */
713     jreader->buff_ptr = new_buff;
714     jreader->buff_size += data_len;
715   }
716
717   /* store input data */
718   memcpy(jreader->buff_ptr + jreader->buff_offset, data, data_len);
719   jreader->buff_offset += data_len;
720   return (0);
721 }
722
723 /* Pop one fully-fledged JSON if already exists. Returns 0 if
724  * completed JSON already exists otherwise negative value is
725  * returned */
726 static inline int
727 ovs_json_reader_pop(ovs_json_reader_t *jreader,
728                     const char **json_ptr, size_t *json_len_ptr)
729 {
730   size_t nbraces = 0;
731   size_t json_len = 0;
732   char *json = NULL;
733
734   /* search open/close brace */
735   for (int i = jreader->json_offset; i < jreader->buff_offset; i++) {
736     if (jreader->buff_ptr[i] == '{') {
737       nbraces++;
738     } else if (jreader->buff_ptr[i] == '}')
739       if (nbraces)
740         if (!(--nbraces)) {
741           /* JSON data */
742           *json_ptr = jreader->buff_ptr + jreader->json_offset;
743           *json_len_ptr = json_len + 1;
744           jreader->json_offset = i + 1;
745           return (0);
746         }
747
748     /* increase JSON data length */
749     if (nbraces)
750       json_len++;
751   }
752
753   if (jreader->json_offset) {
754     if (jreader->json_offset < jreader->buff_offset) {
755       /* shift data to the beginning of the buffer
756        * and zero rest of the buffer data */
757       json = &jreader->buff_ptr[jreader->json_offset];
758       json_len = jreader->buff_offset - jreader->json_offset;
759       for (int i = 0; i < jreader->buff_size; i++)
760         jreader->buff_ptr[i] = ((i < json_len) ? (json[i]) : (0));
761       jreader->buff_offset = json_len;
762     } else
763       /* reset the buffer */
764       jreader->buff_offset = 0;
765
766     /* data is at the beginning of the buffer */
767     jreader->json_offset = 0;
768   }
769
770   return (-1);
771 }
772
773 /* Reset JSON reader. It is useful when start processing
774  * new raw data. E.g.: in case of lost stream connection.
775  */
776 static inline void
777 ovs_json_reader_reset(ovs_json_reader_t *jreader)
778 {
779   if (jreader) {
780     jreader->buff_offset = 0;
781     jreader->json_offset = 0;
782   }
783 }
784
785 /* Release internal data allocated for JSON reader */
786 static inline void
787 ovs_json_reader_free(ovs_json_reader_t *jreader)
788 {
789   if (jreader) {
790     free(jreader->buff_ptr);
791     free(jreader);
792   }
793 }
794
795 /* Reconnect to OVD DB and call init OVS DB callback
796  * 'init_cb' if connection has been established.
797  */
798 static int
799 ovs_db_reconnect(ovs_db_t *pdb)
800 {
801   char errbuff[OVS_ERROR_BUFF_SIZE];
802
803   /* remove all registered OVS DB table/result callbacks */
804   ovs_db_callback_remove_all(pdb);
805
806   /* open new socket */
807   if ((pdb->conn.sock = socket(pdb->conn.domain, pdb->conn.type, 0)) < 0) {
808     sstrerror(errno, errbuff, sizeof(errbuff));
809     OVS_ERROR("socket(): %s", errbuff);
810     return (-1);
811   }
812
813   /* try to connect to server */
814   if (connect(pdb->conn.sock, (struct sockaddr *)&pdb->conn.addr,
815               pdb->conn.addr_size) < 0) {
816     sstrerror(errno, errbuff, sizeof(errbuff));
817     OVS_ERROR("connect(): %s", errbuff);
818     close(pdb->conn.sock);
819     return (-1);
820   }
821
822   /* send notification to event thread */
823   ovs_db_event_post(pdb, OVS_DB_EVENT_CONN_ESTABLISHED);
824   return (0);
825 }
826
827 /* POLL worker thread.
828  * It listens on OVS DB connection for incoming
829  * requests/reply/events etc. Also, it reconnects to OVS DB
830  * if connection has been lost.
831  */
832 static void *
833 ovs_poll_worker(void *arg)
834 {
835   ovs_db_t *pdb = (ovs_db_t *)arg;      /* pointer to OVS DB */
836   ovs_json_reader_t *jreader = NULL;
837   const char *json;
838   size_t json_len;
839   ssize_t nbytes = 0;
840   char buff[OVS_DB_POLL_READ_BLOCK_SIZE];
841   struct pollfd poll_fd;
842   int poll_ret = 0;
843
844   if ((jreader = ovs_json_reader_alloc()) == NULL) {
845     OVS_ERROR("initialize json reader failed");
846     goto thread_exit;
847   }
848
849   /* start polling data */
850   poll_fd.fd = pdb->conn.sock;
851   poll_fd.events = POLLIN | POLLPRI;
852   poll_fd.revents = 0;
853
854   /* poll data */
855   while (ovs_db_poll_is_running(pdb)) {
856     poll_ret = poll(&poll_fd, 1, /* ms */ OVS_DB_POLL_TIMEOUT * 1000);
857     if (poll_ret > 0) {
858       if (poll_fd.revents & POLLNVAL) {
859         /* invalid file descriptor, reconnect */
860         if (ovs_db_reconnect(pdb) != 0) {
861           /* sleep awhile until next reconnect */
862           usleep(OVS_DB_RECONNECT_TIMEOUT * 1000000);
863         }
864         ovs_json_reader_reset(jreader);
865         poll_fd.fd = pdb->conn.sock;
866       } else if ((poll_fd.revents & POLLERR) || (poll_fd.revents & POLLHUP)) {
867         /* connection is broken */
868         OVS_ERROR("poll() peer closed its end of the channel");
869         ovs_db_event_post(pdb, OVS_DB_EVENT_CONN_TERMINATED);
870         close(poll_fd.fd);
871       } else if ((poll_fd.revents & POLLIN) || (poll_fd.revents & POLLPRI)) {
872         /* read incoming data */
873         nbytes = recv(poll_fd.fd, buff, OVS_DB_POLL_READ_BLOCK_SIZE, 0);
874         if (nbytes > 0) {
875           OVS_DEBUG("recv(): received %d bytes of data", (int)nbytes);
876           ovs_json_reader_push_data(jreader, buff, nbytes);
877           while (!ovs_json_reader_pop(jreader, &json, &json_len))
878             /* process JSON data */
879             ovs_db_json_data_process(pdb, json, json_len);
880         } else if (nbytes == 0) {
881           OVS_ERROR("recv() peer has performed an orderly shutdown");
882           ovs_db_event_post(pdb, OVS_DB_EVENT_CONN_TERMINATED);
883           close(poll_fd.fd);
884         } else {
885           OVS_ERROR("recv() receive data error");
886           break;
887         }
888       }                         /* poll() POLLIN & POLLPRI */
889     } else if (poll_ret == 0)
890       OVS_DEBUG("poll() timeout");
891     else {
892       OVS_ERROR("poll() error");
893       break;
894     }
895   }
896
897 thread_exit:
898   OVS_DEBUG("poll thread has been completed");
899   ovs_json_reader_free(jreader);
900   pthread_exit((void *)0);
901   return ((void *)0);
902 }
903
904 /* EVENT worker thread.
905  * Perform task based on incoming events. This
906  * task can be done asynchronously which allows to
907  * handle OVD DB callback like 'init_cb'.
908  */
909 static void *
910 ovs_event_worker(void *arg)
911 {
912   int ret = 0;
913   ovs_db_t *pdb = (ovs_db_t *)arg;
914   struct timespec ts;
915
916   while (pdb->event_thread.value != OVS_DB_EVENT_TERMINATE) {
917     /* wait for an event */
918     clock_gettime(CLOCK_REALTIME, &ts);
919     ts.tv_sec += (OVS_DB_EVENT_TIMEOUT);
920     ret = pthread_cond_timedwait(&pdb->event_thread.cond,
921                                  &pdb->event_thread.mutex, &ts);
922     if (!ret) {
923       /* handle the event */
924       OVS_DEBUG("handle event %d", pdb->event_thread.value);
925       switch (pdb->event_thread.value) {
926       case OVS_DB_EVENT_CONN_ESTABLISHED:
927         if (pdb->cb.post_conn_init)
928           pdb->cb.post_conn_init(pdb);
929         break;
930       case OVS_DB_EVENT_CONN_TERMINATED:
931         if (pdb->cb.post_conn_terminate)
932           pdb->cb.post_conn_terminate();
933         break;
934       default:
935         OVS_DEBUG("unknown event received");
936         break;
937       }
938     } else if (ret == ETIMEDOUT) {
939       /* wait timeout */
940       OVS_DEBUG("no event received (timeout)");
941       continue;
942     } else {
943       /* unexpected error */
944       OVS_ERROR("pthread_cond_timedwait() failed");
945       break;
946     }
947   }
948
949 thread_exit:
950   OVS_DEBUG("event thread has been completed");
951   pthread_exit((void *)0);
952   return ((void *)0);
953 }
954
955 /* Stop EVENT thread */
956 static int
957 ovs_db_event_thread_stop(ovs_db_t *pdb)
958 {
959   ovs_db_event_post(pdb, OVS_DB_EVENT_TERMINATE);
960   if (pthread_join(pdb->event_thread.tid, NULL) != 0)
961     return (-1);
962   pthread_mutex_unlock(&pdb->event_thread.mutex);
963   pthread_mutex_destroy(&pdb->event_thread.mutex);
964   return (0);
965 }
966
967 /* Stop POLL thread */
968 static int
969 ovs_db_poll_thread_stop(ovs_db_t *pdb)
970 {
971   ovs_db_poll_terminate(pdb);
972   if (pthread_join(pdb->poll_thread.tid, NULL) != 0)
973     return (-1);
974   pthread_mutex_destroy(&pdb->poll_thread.mutex);
975   return (0);
976 }
977
978 /*
979  * Public OVS DB API implementation
980  */
981
982 ovs_db_t *
983 ovs_db_init(const char *surl, ovs_db_callback_t *cb)
984 {
985   pthread_mutexattr_t mutex_attr;
986   ovs_db_t *pdb = NULL;
987
988   /* allocate db data & fill it */
989   if ((pdb = calloc(1, sizeof(*pdb))) == NULL)
990     return (NULL);
991
992   /* convert string url to socket addr */
993   if (ovs_db_url_parse(surl, &pdb->conn) < 0)
994     goto failure;
995
996   /* setup OVS DB callbacks */
997   if (cb)
998     pdb->cb = *cb;
999
1000   /* prepare event thread */
1001   pthread_cond_init(&pdb->event_thread.cond, NULL);
1002   pthread_mutex_init(&pdb->event_thread.mutex, NULL);
1003   pthread_mutex_lock(&pdb->event_thread.mutex);
1004   if (plugin_thread_create(&pdb->event_thread.tid, NULL,
1005                            ovs_event_worker, pdb) != 0) {
1006     OVS_ERROR("event worker start failed");
1007     goto failure;
1008   }
1009
1010   /* prepare polling thread */
1011   ovs_db_reconnect(pdb);
1012   pdb->poll_thread.state = OVS_DB_POLL_STATE_RUNNING;
1013   pthread_mutex_init(&pdb->poll_thread.mutex, NULL);
1014   if (plugin_thread_create(&pdb->poll_thread.tid, NULL,
1015                            ovs_poll_worker, pdb) != 0) {
1016     OVS_ERROR("pull worker start failed");
1017     goto failure;
1018   }
1019
1020   /* init OVS DB mutex */
1021   if (pthread_mutexattr_init(&mutex_attr) ||
1022       pthread_mutexattr_settype(&mutex_attr, PTHREAD_MUTEX_RECURSIVE) ||
1023       pthread_mutex_init(&pdb->mutex, &mutex_attr)) {
1024     OVS_ERROR("OVS DB mutex init failed");
1025     goto failure;
1026   }
1027
1028   /* return db to the caller */
1029   return pdb;
1030
1031 failure:
1032   if (pdb->conn.sock)
1033     /* close connection */
1034     close(pdb->conn.sock);
1035   if (pdb->event_thread.tid != 0)
1036     /* stop event thread */
1037     if (ovs_db_event_thread_stop(pdb) < 0)
1038       OVS_ERROR("stop event thread failed");
1039   if (pdb->poll_thread.tid != 0)
1040     /* stop poll thread */
1041     if (ovs_db_poll_thread_stop(pdb) < 0)
1042       OVS_ERROR("stop poll thread failed");
1043   sfree(pdb);
1044   return NULL;
1045 }
1046
1047 int
1048 ovs_db_send_request(ovs_db_t *pdb, const char *method,
1049                     const char *params, ovs_db_result_cb_t cb)
1050 {
1051   int ret = 0;
1052   yajl_gen_status yajl_gen_ret;
1053   yajl_val jparams;
1054   yajl_gen jgen;
1055   ovs_callback_t *new_cb = NULL;
1056   uint64_t uid;
1057   char uid_buff[OVS_UID_STR_SIZE];
1058   const char *req = NULL;
1059   size_t req_len = 0;
1060   struct timespec ts;
1061
1062   /* sanity check */
1063   if (!pdb || !method || !params)
1064     return (-1);
1065
1066   if ((jgen = yajl_gen_alloc(NULL)) == NULL)
1067     return (-1);
1068
1069   /* try to parse params */
1070   if ((jparams = yajl_tree_parse(params, NULL, 0)) == NULL) {
1071     OVS_ERROR("params is not a JSON string");
1072     yajl_gen_clear(jgen);
1073     return (-1);
1074   }
1075
1076   /* generate method field */
1077   OVS_YAJL_CALL(yajl_gen_map_open, jgen);
1078
1079   OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "method");
1080   OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, method);
1081
1082   /* generate params field */
1083   OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "params");
1084   OVS_YAJL_CALL(ovs_yajl_gen_val, jgen, jparams);
1085   yajl_tree_free(jparams);
1086
1087   /* generate id field */
1088   OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "id");
1089   uid = ovs_uid_generate();
1090   ssnprintf(uid_buff, sizeof(uid_buff), "%" PRIX64, uid);
1091   OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, uid_buff);
1092
1093   OVS_YAJL_CALL(yajl_gen_map_close, jgen);
1094
1095   if (cb) {
1096     /* register result callback */
1097     if ((new_cb = malloc(sizeof(ovs_callback_t))) == NULL)
1098       goto yajl_gen_failure;
1099
1100     /* add new callback to front */
1101     sem_init(&new_cb->result.sync, 0, 0);
1102     new_cb->result.call = cb;
1103     new_cb->uid = uid;
1104     ovs_db_callback_add(pdb, new_cb);
1105   }
1106
1107   /* send the request */
1108   OVS_YAJL_CALL(yajl_gen_get_buf, jgen, (const unsigned char **)&req,
1109                 &req_len);
1110   OVS_DEBUG("%s", req);
1111   if (!ovs_db_data_send(pdb, req, req_len)) {
1112     if (cb) {
1113       /* wait for result */
1114       clock_gettime(CLOCK_REALTIME, &ts);
1115       ts.tv_sec += OVS_DB_SEND_REQ_TIMEOUT;
1116       if (sem_timedwait(&new_cb->result.sync, &ts) < 0) {
1117         OVS_ERROR("%s() no replay received within %d sec", __FUNCTION__,
1118                   OVS_DB_SEND_REQ_TIMEOUT);
1119         ret = (-1);
1120       }
1121     }
1122   } else {
1123     OVS_ERROR("ovs_db_data_send() failed");
1124     ret = (-1);
1125   }
1126
1127 yajl_gen_failure:
1128   if (new_cb) {
1129     /* destroy callback */
1130     sem_destroy(&new_cb->result.sync);
1131     ovs_db_callback_remove(pdb, new_cb);
1132   }
1133
1134   /* release memory */
1135   yajl_gen_clear(jgen);
1136   return (yajl_gen_ret != yajl_gen_status_ok) ? (-1) : ret;
1137 }
1138
1139 int
1140 ovs_db_table_cb_register(ovs_db_t *pdb, const char *tb_name,
1141                          const char **tb_column, ovs_db_table_cb_t update_cb,
1142                          ovs_db_result_cb_t result_cb, unsigned int flags)
1143 {
1144   yajl_gen jgen;
1145   yajl_gen_status yajl_gen_ret;
1146   ovs_callback_t *new_cb = NULL;
1147   char uid_str[OVS_UID_STR_SIZE];
1148   char *params;
1149   size_t params_len;
1150   int ovs_db_ret = 0;
1151
1152   /* sanity check */
1153   if (pdb == NULL || tb_name == NULL || update_cb == NULL)
1154     return (-1);
1155
1156   if ((jgen = yajl_gen_alloc(NULL)) == NULL)
1157     return (-1);
1158
1159   /* register table update callback */
1160   if ((new_cb = malloc(sizeof(ovs_callback_t))) == NULL)
1161     return (-1);
1162
1163   /* add new callback to front */
1164   new_cb->table.call = update_cb;
1165   new_cb->uid = ovs_uid_generate();
1166   ovs_db_callback_add(pdb, new_cb);
1167
1168   /* make update notification request
1169    * [<db-name>, <json-value>, <monitor-requests>] */
1170   OVS_YAJL_CALL(yajl_gen_array_open, jgen);
1171   {
1172     OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, OVS_DB_DEFAULT_DB_NAME);
1173
1174     /* uid string <json-value> */
1175     ssnprintf(uid_str, sizeof(uid_str), "%" PRIX64, new_cb->uid);
1176     OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, uid_str);
1177
1178     /* <monitor-requests> */
1179     OVS_YAJL_CALL(yajl_gen_map_open, jgen);
1180     {
1181       OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, tb_name);
1182       OVS_YAJL_CALL(yajl_gen_array_open, jgen);
1183       {
1184         /* <monitor-request> */
1185         OVS_YAJL_CALL(yajl_gen_map_open, jgen);
1186         {
1187           if (tb_column) {
1188             /* columns within the table to be monitored */
1189             OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "columns");
1190             OVS_YAJL_CALL(yajl_gen_array_open, jgen);
1191             for (; *tb_column; tb_column++)
1192               OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, *tb_column);
1193             OVS_YAJL_CALL(yajl_gen_array_close, jgen);
1194           }
1195           /* specify select option */
1196           OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "select");
1197           {
1198             OVS_YAJL_CALL(yajl_gen_map_open, jgen);
1199             {
1200               OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "initial");
1201               OVS_YAJL_CALL(yajl_gen_bool, jgen,
1202                             flags & OVS_DB_TABLE_CB_FLAG_INITIAL);
1203               OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "insert");
1204               OVS_YAJL_CALL(yajl_gen_bool, jgen,
1205                             flags & OVS_DB_TABLE_CB_FLAG_INSERT);
1206               OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "delete");
1207               OVS_YAJL_CALL(yajl_gen_bool, jgen,
1208                             flags & OVS_DB_TABLE_CB_FLAG_DELETE);
1209               OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "modify");
1210               OVS_YAJL_CALL(yajl_gen_bool, jgen,
1211                             flags & OVS_DB_TABLE_CB_FLAG_MODIFY);
1212             }
1213             OVS_YAJL_CALL(yajl_gen_map_close, jgen);
1214           }
1215         }
1216         OVS_YAJL_CALL(yajl_gen_map_close, jgen);
1217       }
1218       OVS_YAJL_CALL(yajl_gen_array_close, jgen);
1219     }
1220     OVS_YAJL_CALL(yajl_gen_map_close, jgen);
1221   }
1222   OVS_YAJL_CALL(yajl_gen_array_close, jgen);
1223
1224   /* make a request to subscribe to given table */
1225   OVS_YAJL_CALL(yajl_gen_get_buf, jgen, (const unsigned char **)&params,
1226                 &params_len);
1227   if (ovs_db_send_request(pdb, "monitor", params, result_cb) < 0) {
1228     OVS_ERROR("Failed to subscribe to \"%s\" table", tb_name);
1229     ovs_db_ret = (-1);
1230   }
1231
1232 yajl_gen_failure:
1233   /* release memory */
1234   yajl_gen_clear(jgen);
1235   return ovs_db_ret;
1236 }
1237
1238 int
1239 ovs_db_destroy(ovs_db_t *pdb)
1240 {
1241   int ovs_db_ret = 0;
1242   int ret = 0;
1243
1244   /* sanity check */
1245   if (pdb == NULL)
1246     return (-1);
1247
1248   /* try to lock the structure before releasing */
1249   if (ret = pthread_mutex_lock(&pdb->mutex)) {
1250     OVS_ERROR("pthread_mutex_lock() DB mutext lock failed (%d)", ret);
1251     return (-1);
1252   }
1253
1254   /* stop poll thread */
1255   if (ovs_db_event_thread_stop(pdb) < 0) {
1256     OVS_ERROR("stop poll thread failed");
1257     ovs_db_ret = (-1);
1258   }
1259
1260   /* stop event thread */
1261   if (ovs_db_poll_thread_stop(pdb) < 0) {
1262     OVS_ERROR("stop event thread failed");
1263     ovs_db_ret = (-1);
1264   }
1265
1266   /* unsubscribe callbacks */
1267   ovs_db_callback_remove_all(pdb);
1268
1269   /* close connection */
1270   if (pdb->conn.sock)
1271     close(pdb->conn.sock);
1272
1273   /* release DB handler */
1274   pthread_mutex_unlock(&pdb->mutex);
1275   pthread_mutex_destroy(&pdb->mutex);
1276   sfree(pdb);
1277   return ovs_db_ret;
1278 }
1279
1280 /*
1281  * Public OVS utils API implementation
1282  */
1283
1284 /* Get YAJL value by key from YAJL dictionary */
1285 yajl_val
1286 ovs_utils_get_value_by_key(yajl_val jval, const char *key)
1287 {
1288   const char *obj_key = NULL;
1289
1290   /* check params */
1291   if (!YAJL_IS_OBJECT(jval) || (key == NULL))
1292     return NULL;
1293
1294   /* find a value by key */
1295   for (int i = 0; i < YAJL_GET_OBJECT(jval)->len; i++) {
1296     obj_key = YAJL_GET_OBJECT(jval)->keys[i];
1297     if (strcmp(obj_key, key) == 0)
1298       return YAJL_GET_OBJECT(jval)->values[i];
1299   }
1300
1301   return NULL;
1302 }
1303
1304 /* Get OVS DB map value by given map key */
1305 yajl_val
1306 ovs_utils_get_map_value(yajl_val jval, const char *key)
1307 {
1308   size_t map_len = 0;
1309   size_t array_len = 0;
1310   yajl_val *map_values = NULL;
1311   yajl_val *array_values = NULL;
1312
1313   /* check YAJL array */
1314   if (!YAJL_IS_ARRAY(jval) || (key == NULL))
1315     return NULL;
1316
1317   /* check a database map value (2-element, first one should be a string */
1318   array_len = YAJL_GET_ARRAY(jval)->len;
1319   array_values = YAJL_GET_ARRAY(jval)->values;
1320   if ((array_len != 2) || (!YAJL_IS_STRING(array_values[0])) ||
1321       (!YAJL_IS_ARRAY(array_values[1])))
1322     return NULL;
1323
1324   /* check first element of the array */
1325   if (strcmp("map", YAJL_GET_STRING(array_values[0])) != 0)
1326     return NULL;
1327
1328   /* try to find map value by map key */
1329   map_len = YAJL_GET_ARRAY(array_values[1])->len;
1330   map_values = YAJL_GET_ARRAY(array_values[1])->values;
1331   for (int i = 0; i < map_len; i++) {
1332     /* check YAJL array */
1333     if (!YAJL_IS_ARRAY(map_values[i]))
1334       break;
1335
1336     /* check a database pair value (2-element, first one represents a key
1337      * and it should be a string in our case */
1338     array_len = YAJL_GET_ARRAY(map_values[i])->len;
1339     array_values = YAJL_GET_ARRAY(map_values[i])->values;
1340     if ((array_len != 2) || (!YAJL_IS_STRING(array_values[0])))
1341       break;
1342
1343     /* return map value if given key equals map key */
1344     if (strcmp(key, YAJL_GET_STRING(array_values[0])) == 0)
1345       return array_values[1];
1346   }
1347   return NULL;
1348 }