ovs_events plugin: Filter through clang-format.
[collectd.git] / src / utils_ovs.c
1 /**
2  * collectd - src/utils_ovs.c
3  *
4  * Copyright(c) 2016 Intel Corporation. All rights reserved.
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining a copy of
7  * this software and associated documentation files (the "Software"), to deal in
8  * the Software without restriction, including without limitation the rights to
9  * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
10  * of the Software, and to permit persons to whom the Software is furnished to do
11  * so, subject to the following conditions:
12  *
13  * The above copyright notice and this permission notice shall be included in all
14  * copies or substantial portions of the Software.
15  *
16  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22  * SOFTWARE.
23  *
24  * Authors:
25  *   Volodymyr Mytnyk <volodymyrx.mytnyk@intel.com>
26  **/
27
28 /* clang-format off */
29 /*
30  *                         OVS DB API internal architecture diagram
31  * +------------------------------------------------------------------------------+
32  * |OVS plugin      |OVS utils                                                    |
33  * |                |     +------------------------+                              |
34  * |                |     |      echo handler      |                JSON request/ |
35  * |                |  +--+ (ovs_db_table_echo_cb) +<---+---------+ update event/ |
36  * |                |  |  |                        |    |         | result        |
37  * |                |  |  +------------------------+    |         |               |
38  * |                |  |                                |    +----+---+--------+  |
39  * |  +----------+  |  |  +------------------------+    |    |        |        |  |
40  * |  |  update  |  |  |  |     update handler     |    |    |  YAJL  |  JSON  |  |
41  * |  | callback +<-------+(ovs_db_table_update_cp)+<---+    | parser | reader |  |
42  * |  +----------+  |  |  |                        |    |    |        |        |  |
43  * |                |  |  +------------------------+    |    +--------+---+----+  |
44  * |                |  |                                |                 ^       |
45  * |  +----------+  |  |  +------------------------+    |                 |       |
46  * |  |  result  |  |  |  |     result handler     |    |                 |       |
47  * |  | callback +<-------+   (ovs_db_result_cb)   +<---+        JSON raw |       |
48  * |  +----------+  |  |  |                        |               data   |       |
49  * |                |  |  +------------------------+                      |       |
50  * |                |  |                                                  |       |
51  * |                |  |    +------------------+             +------------+----+  |
52  * |  +----------+  |  |    |thread|           |             |thread|          |  |
53  * |  |   init   |  |  |    |                  |  reconnect  |                 |  |
54  * |  | callback +<---------+   EVENT WORKER   +<------------+   POLL WORKER   |  |
55  * |  +----------+  |  |    +------------------+             +--------+--------+  |
56  * |                |  |                                              ^           |
57  * +----------------+-------------------------------------------------------------+
58  *                     |                                              |
59  *                 JSON|echo reply                                 raw|data
60  *                     v                                              v
61  * +-------------------+----------------------------------------------+-----------+
62  * |                                 TCP/UNIX socket                              |
63  * +-------------------------------------------------------------------------------
64  */
65 /* clang-format on */
66
67 /* collectd headers */
68 #include "common.h"
69
70 /* private headers */
71 #include "utils_ovs.h"
72
73 /* system libraries */
74 #include <arpa/inet.h>
75 #include <poll.h>
76 #include <semaphore.h>
77 #include <sys/un.h>
78
79 #define OVS_ERROR(fmt, ...)                                                    \
80   do {                                                                         \
81     ERROR("ovs_utils: " fmt, ##__VA_ARGS__);                                   \
82   } while (0)
83 #define OVS_DEBUG(fmt, ...)                                                    \
84   do {                                                                         \
85     DEBUG("%s:%d:%s(): " fmt, __FILE__, __LINE__, __FUNCTION__,                \
86           ##__VA_ARGS__);                                                      \
87   } while (0)
88
89 #define OVS_DB_POLL_TIMEOUT 1           /* poll receive timeout (sec) */
90 #define OVS_DB_POLL_READ_BLOCK_SIZE 512 /* read block size (bytes) */
91 #define OVS_DB_DEFAULT_DB_NAME "Open_vSwitch"
92 #define OVS_DB_RECONNECT_TIMEOUT 1 /* reconnect timeout (sec) */
93
94 #define OVS_DB_EVENT_TIMEOUT 5 /* event thread timeout (sec) */
95 #define OVS_DB_EVENT_TERMINATE 1
96 #define OVS_DB_EVENT_CONN_ESTABLISHED 2
97 #define OVS_DB_EVENT_CONN_TERMINATED 3
98
99 #define OVS_DB_POLL_STATE_RUNNING 1
100 #define OVS_DB_POLL_STATE_EXITING 2
101
102 #define OVS_DB_SEND_REQ_TIMEOUT 5 /* send request timeout (sec) */
103
104 #define OVS_YAJL_CALL(func, ...)                                               \
105   do {                                                                         \
106     yajl_gen_ret = yajl_gen_status_ok;                                         \
107     if ((yajl_gen_ret = func(__VA_ARGS__)) != yajl_gen_status_ok)              \
108       goto yajl_gen_failure;                                                   \
109   } while (0)
110 #define OVS_YAJL_ERROR_BUFFER_SIZE 1024
111 #define OVS_ERROR_BUFF_SIZE 512
112 #define OVS_UID_STR_SIZE 17 /* 64-bit HEX string len + '\0' */
113
114 /* JSON reader internal data */
115 struct ovs_json_reader_s {
116   char *buff_ptr;
117   size_t buff_size;
118   size_t buff_offset;
119   size_t json_offset;
120 };
121 typedef struct ovs_json_reader_s ovs_json_reader_t;
122
123 /* Result callback declaration */
124 struct ovs_result_cb_s {
125   sem_t sync;
126   ovs_db_result_cb_t call;
127 };
128 typedef struct ovs_result_cb_s ovs_result_cb_t;
129
130 /* Table callback declaration */
131 struct ovs_table_cb_s {
132   ovs_db_table_cb_t call;
133 };
134 typedef struct ovs_table_cb_s ovs_table_cb_t;
135
136 /* Callback declaration */
137 struct ovs_callback_s {
138   uint64_t uid;
139   union {
140     ovs_result_cb_t result;
141     ovs_table_cb_t table;
142   };
143   struct ovs_callback_s *next;
144   struct ovs_callback_s *prev;
145 };
146 typedef struct ovs_callback_s ovs_callback_t;
147
148 /* Connection declaration */
149 struct ovs_conn_s {
150   int sock;
151   int domain;
152   int type;
153   int addr_size;
154   union {
155     struct sockaddr_in s_inet;
156     struct sockaddr_un s_unix;
157   } addr;
158 };
159 typedef struct ovs_conn_s ovs_conn_t;
160
161 /* Event thread data declaration */
162 struct ovs_event_thread_s {
163   pthread_t tid;
164   pthread_mutex_t mutex;
165   pthread_cond_t cond;
166   int value;
167 };
168 typedef struct ovs_event_thread_s ovs_event_thread_t;
169
170 /* Poll thread data declaration */
171 struct ovs_poll_thread_s {
172   pthread_t tid;
173   pthread_mutex_t mutex;
174   int state;
175 };
176 typedef struct ovs_poll_thread_s ovs_poll_thread_t;
177
178 /* OVS DB internal data declaration */
179 struct ovs_db_s {
180   ovs_poll_thread_t poll_thread;
181   ovs_event_thread_t event_thread;
182   pthread_mutex_t mutex;
183   ovs_callback_t *remote_cb;
184   ovs_db_callback_t cb;
185   ovs_conn_t conn;
186 };
187 typedef struct ovs_db_s ovs_db_t;
188
189 /* Post an event to event thread.
190  * Possible events are:
191  *  OVS_DB_EVENT_TERMINATE
192  *  OVS_DB_EVENT_CONN_ESTABLISHED
193  *  OVS_DB_EVENT_CONN_TERMINATED
194  */
195 static void ovs_db_event_post(ovs_db_t *pdb, int event) {
196   pthread_mutex_lock(&pdb->event_thread.mutex);
197   pdb->event_thread.value = event;
198   pthread_mutex_unlock(&pdb->event_thread.mutex);
199   pthread_cond_signal(&pdb->event_thread.cond);
200 }
201
202 /* Check if POLL thread is still running. Returns
203  * 1 if running otherwise 0 is returned */
204 static inline int ovs_db_poll_is_running(ovs_db_t *pdb) {
205   int state = 0;
206   pthread_mutex_lock(&pdb->poll_thread.mutex);
207   state = pdb->poll_thread.state;
208   pthread_mutex_unlock(&pdb->poll_thread.mutex);
209   return (state == OVS_DB_POLL_STATE_RUNNING);
210 }
211
212 /* Terminate POLL thread */
213 static inline void ovs_db_poll_terminate(ovs_db_t *pdb) {
214   pthread_mutex_lock(&pdb->poll_thread.mutex);
215   pdb->poll_thread.state = OVS_DB_POLL_STATE_EXITING;
216   pthread_mutex_unlock(&pdb->poll_thread.mutex);
217 }
218
219 /* Generate unique identifier (UID). It is used by OVS DB API
220  * to set "id" field for any OVS DB JSON request. */
221 static uint64_t ovs_uid_generate() {
222   struct timespec ts;
223   clock_gettime(CLOCK_MONOTONIC, &ts);
224   return ((ts.tv_sec << 32) | (ts.tv_nsec & UINT32_MAX));
225 }
226
227 /*
228  * Callback API. These function are used to store
229  * registered callbacks in OVS DB API.
230  */
231
232 /* Add new callback into OVS DB object */
233 static void ovs_db_callback_add(ovs_db_t *pdb, ovs_callback_t *new_cb) {
234   pthread_mutex_lock(&pdb->mutex);
235   if (pdb->remote_cb)
236     pdb->remote_cb->prev = new_cb;
237   new_cb->next = pdb->remote_cb;
238   new_cb->prev = NULL;
239   pdb->remote_cb = new_cb;
240   pthread_mutex_unlock(&pdb->mutex);
241 }
242
243 /* Remove callback from OVS DB object */
244 static void ovs_db_callback_remove(ovs_db_t *pdb, ovs_callback_t *del_cb) {
245   ovs_callback_t *pre_cb = del_cb->prev;
246   ovs_callback_t *next_cb = del_cb->next;
247
248   pthread_mutex_lock(&pdb->mutex);
249   if (next_cb)
250     next_cb->prev = del_cb->prev;
251
252   if (pre_cb)
253     pre_cb->next = del_cb->next;
254   else
255     pdb->remote_cb = del_cb->next;
256
257   free(del_cb);
258   pthread_mutex_unlock(&pdb->mutex);
259 }
260
261 /* Remove all callbacks form OVS DB object */
262 static void ovs_db_callback_remove_all(ovs_db_t *pdb) {
263   pthread_mutex_lock(&pdb->mutex);
264   for (ovs_callback_t *del_cb = pdb->remote_cb; pdb->remote_cb;
265        del_cb = pdb->remote_cb) {
266     pdb->remote_cb = pdb->remote_cb->next;
267     free(del_cb);
268   }
269   pdb->remote_cb = NULL;
270   pthread_mutex_unlock(&pdb->mutex);
271 }
272
273 /* Get/find callback in OVS DB object by UID. Returns pointer
274  * to requested callback otherwise NULL is returned */
275 static ovs_callback_t *ovs_db_callback_get(ovs_db_t *pdb, uint64_t uid) {
276   pthread_mutex_lock(&pdb->mutex);
277   for (ovs_callback_t *cb = pdb->remote_cb; cb != NULL; cb = cb->next)
278     if (cb->uid == uid) {
279       pthread_mutex_unlock(&pdb->mutex);
280       return cb;
281     }
282   pthread_mutex_unlock(&pdb->mutex);
283   return NULL;
284 }
285
286 /* Send all requested data to the socket. Returns 0 if
287  * ALL request data has been sent otherwise negative value
288  * is returned */
289 static int ovs_db_data_send(const ovs_db_t *pdb, const char *data, size_t len) {
290   ssize_t nbytes = 0;
291   size_t rem = len;
292   size_t off = 0;
293
294   while (rem > 0) {
295     if ((nbytes = send(pdb->conn.sock, data + off, rem, 0)) <= 0)
296       return (-1);
297     rem -= (size_t)nbytes;
298     off += (size_t)nbytes;
299   }
300   return (0);
301 }
302
303 /* Parse OVS server URL.
304  * Format of the URL:
305  *   "tcp:a.b.c.d:port" - define TCP connection (INET domain)
306  *   "unix:file" - define UNIX socket file (UNIX domain)
307  */
308 static int ovs_db_url_parse(const char *surl, ovs_conn_t *conn) {
309   ovs_conn_t tmp_conn;
310   char *nexttok = NULL;
311   char *in_str = NULL;
312   char *saveptr;
313   int ret = 0;
314
315   /* sanity check */
316   if ((surl == NULL) || (strlen(surl) < 1))
317     return (-1);
318
319   /* parse domain */
320   tmp_conn = *conn;
321   in_str = sstrdup(surl);
322   if ((nexttok = strtok_r(in_str, ":", &saveptr)) != NULL) {
323     if (strcmp("tcp", nexttok) == 0) {
324       tmp_conn.domain = AF_INET;
325       tmp_conn.type = SOCK_STREAM;
326       tmp_conn.addr_size = sizeof(tmp_conn.addr.s_inet);
327     } else if (strcmp("unix", nexttok) == 0) {
328       tmp_conn.domain = AF_UNIX;
329       tmp_conn.type = SOCK_STREAM;
330       tmp_conn.addr_size = sizeof(tmp_conn.addr.s_unix);
331     } else
332       goto failure;
333   } else
334     goto failure;
335
336   /* parse url depending on domain */
337   if ((nexttok = strtok_r(NULL, ":", &saveptr)) != NULL) {
338     if (tmp_conn.domain == AF_UNIX) {
339       /* <UNIX-NAME> */
340       tmp_conn.addr.s_inet.sin_family = AF_UNIX;
341       sstrncpy(tmp_conn.addr.s_unix.sun_path, nexttok, strlen(nexttok) + 1);
342     } else {
343       /* <IP:PORT> */
344       tmp_conn.addr.s_inet.sin_family = AF_INET;
345       ret = inet_pton(AF_INET, nexttok, (void *)&tmp_conn.addr.s_inet.sin_addr);
346       if (ret == 1) {
347         if ((nexttok = strtok_r(NULL, ":", &saveptr)) != NULL)
348           tmp_conn.addr.s_inet.sin_port = htons(atoi(nexttok));
349         else
350           goto failure;
351       } else
352         goto failure;
353     }
354   }
355
356   /* save result and return success */
357   *conn = tmp_conn;
358   sfree(in_str);
359   return (0);
360
361 failure:
362   OVS_ERROR("invalid OVS DB URL provided [url=%s]", surl);
363   sfree(in_str);
364   return (-1);
365 }
366
367 /*
368  * YAJL (Yet Another JSON Library) helper functions
369  * Documentation (https://lloyd.github.io/yajl/)
370  */
371
372 /* Add null-terminated string into YAJL generator handle (JSON object).
373  * Similar function to yajl_gen_string() but takes null-terminated string
374  * instead of string and its length.
375  *
376  * jgen   - YAJL generator handle allocated by yajl_gen_alloc()
377  * string - Null-terminated string
378  */
379 static inline yajl_gen_status ovs_yajl_gen_tstring(yajl_gen hander,
380                                                    const char *string) {
381   return yajl_gen_string(hander, string, strlen(string));
382 }
383
384 /* Add YAJL value into YAJL generator handle (JSON object)
385  *
386  * jgen - YAJL generator handle allocated by yajl_gen_alloc()
387  * jval - YAJL value usually returned by yajl_tree_get()
388  */
389 static yajl_gen_status ovs_yajl_gen_val(yajl_gen jgen, yajl_val jval) {
390   size_t array_len = 0;
391   yajl_val *jvalues = NULL;
392   yajl_val jobj_value = NULL;
393   const char *obj_key = NULL;
394   size_t obj_len = 0;
395   yajl_gen_status yajl_gen_ret;
396
397   if (YAJL_IS_STRING(jval))
398     OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, YAJL_GET_STRING(jval));
399   else if (YAJL_IS_DOUBLE(jval))
400     OVS_YAJL_CALL(yajl_gen_double, jgen, YAJL_GET_DOUBLE(jval));
401   else if (YAJL_IS_INTEGER(jval))
402     OVS_YAJL_CALL(yajl_gen_double, jgen, YAJL_GET_INTEGER(jval));
403   else if (YAJL_IS_TRUE(jval))
404     OVS_YAJL_CALL(yajl_gen_bool, jgen, 1);
405   else if (YAJL_IS_FALSE(jval))
406     OVS_YAJL_CALL(yajl_gen_bool, jgen, 0);
407   else if (YAJL_IS_NULL(jval))
408     OVS_YAJL_CALL(yajl_gen_null, jgen);
409   else if (YAJL_IS_ARRAY(jval)) {
410     /* create new array and add all elements into the array */
411     array_len = YAJL_GET_ARRAY(jval)->len;
412     jvalues = YAJL_GET_ARRAY(jval)->values;
413     OVS_YAJL_CALL(yajl_gen_array_open, jgen);
414     for (int i = 0; i < array_len; i++)
415       OVS_YAJL_CALL(ovs_yajl_gen_val, jgen, jvalues[i]);
416     OVS_YAJL_CALL(yajl_gen_array_close, jgen);
417   } else if (YAJL_IS_OBJECT(jval)) {
418     /* create new object and add all elements into the object */
419     OVS_YAJL_CALL(yajl_gen_map_open, jgen);
420     obj_len = YAJL_GET_OBJECT(jval)->len;
421     for (int i = 0; i < obj_len; i++) {
422       obj_key = YAJL_GET_OBJECT(jval)->keys[i];
423       jobj_value = YAJL_GET_OBJECT(jval)->values[i];
424       OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, obj_key);
425       OVS_YAJL_CALL(ovs_yajl_gen_val, jgen, jobj_value);
426     }
427     OVS_YAJL_CALL(yajl_gen_map_close, jgen);
428   } else {
429     OVS_ERROR("%s() unsupported value type %d (skip)", __FUNCTION__,
430               (int)(jval)->type);
431     goto yajl_gen_failure;
432   }
433   return yajl_gen_status_ok;
434
435 yajl_gen_failure:
436   OVS_ERROR("%s() error to generate value", __FUNCTION__);
437   return yajl_gen_ret;
438 }
439
440 /* OVS DB echo request handler. When OVS DB sends
441  * "echo" request to the client, client should generate
442  * "echo" replay with the same content received in the
443  * request */
444 static int ovs_db_table_echo_cb(const ovs_db_t *pdb, yajl_val jnode) {
445   yajl_val jparams;
446   yajl_val jid;
447   yajl_gen jgen;
448   size_t resp_len = 0;
449   const char *resp = NULL;
450   const char *params_path[] = {"params", NULL};
451   const char *id_path[] = {"id", NULL};
452   yajl_gen_status yajl_gen_ret;
453
454   if ((jgen = yajl_gen_alloc(NULL)) == NULL)
455     return (-1);
456
457   /* check & get request attributes */
458   if ((jparams = yajl_tree_get(jnode, params_path, yajl_t_array)) == NULL ||
459       ((jid = yajl_tree_get(jnode, id_path, yajl_t_any)) == NULL)) {
460     OVS_ERROR("parse echo request failed");
461     goto yajl_gen_failure;
462   }
463
464   /* generate JSON echo response */
465   OVS_YAJL_CALL(yajl_gen_map_open, jgen);
466
467   OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "result");
468   OVS_YAJL_CALL(ovs_yajl_gen_val, jgen, jparams);
469
470   OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "error");
471   OVS_YAJL_CALL(yajl_gen_null, jgen);
472
473   OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "id");
474   OVS_YAJL_CALL(ovs_yajl_gen_val, jgen, jid);
475
476   OVS_YAJL_CALL(yajl_gen_map_close, jgen);
477   OVS_YAJL_CALL(yajl_gen_get_buf, jgen, (const unsigned char **)&resp,
478                 &resp_len);
479
480   /* send the response */
481   OVS_DEBUG("response: %s", resp);
482   if (ovs_db_data_send(pdb, resp, resp_len) < 0) {
483     OVS_ERROR("send echo reply failed");
484     goto yajl_gen_failure;
485   }
486   /* clean up and return success */
487   yajl_gen_clear(jgen);
488   return (0);
489
490 yajl_gen_failure:
491   /* release memory */
492   yajl_gen_clear(jgen);
493   return (-1);
494 }
495
496 /* Get OVS DB registered callback by YAJL val. The YAJL
497  * value should be YAJL string (UID). Returns NULL if
498  * callback hasn't been found.
499  */
500 static ovs_callback_t *ovs_db_table_callback_get(ovs_db_t *pdb, yajl_val jid) {
501   char *endptr = NULL;
502   const char *suid = NULL;
503   uint64_t uid;
504
505   if (jid && YAJL_IS_STRING(jid)) {
506     suid = YAJL_GET_STRING(jid);
507     uid = (uint64_t)strtoul(suid, &endptr, 16);
508     if (*endptr == '\0' && uid)
509       return ovs_db_callback_get(pdb, uid);
510   }
511
512   return NULL;
513 }
514
515 /* OVS DB table update event handler.
516  * This callback is called by POLL thread if OVS DB
517  * table update callback is received from the DB
518  * server. Once registered callback found, it's called
519  * by this handler. */
520 static int ovs_db_table_update_cb(ovs_db_t *pdb, yajl_val jnode) {
521   ovs_callback_t *cb = NULL;
522   yajl_val jvalue;
523   yajl_val jparams;
524   yajl_val jtable_updates;
525   yajl_val jtable_update;
526   size_t obj_len = 0;
527   const char *table_name = NULL;
528   const char *params_path[] = {"params", NULL};
529   const char *id_path[] = {"id", NULL};
530
531   /* check & get request attributes */
532   if ((jparams = yajl_tree_get(jnode, params_path, yajl_t_array)) == NULL ||
533       (yajl_tree_get(jnode, id_path, yajl_t_null) == NULL))
534     goto ovs_failure;
535
536   /* check array length: [<json-value>, <table-updates>] */
537   if (YAJL_GET_ARRAY(jparams)->len != 2)
538     goto ovs_failure;
539
540   jvalue = YAJL_GET_ARRAY(jparams)->values[0];
541   jtable_updates = YAJL_GET_ARRAY(jparams)->values[1];
542   if ((!YAJL_IS_OBJECT(jtable_updates)) || (!YAJL_IS_STRING(jvalue)))
543     goto ovs_failure;
544
545   /* find registered callback based on <json-value> */
546   cb = ovs_db_table_callback_get(pdb, jvalue);
547   if (cb == NULL || cb->table.call == NULL)
548     goto ovs_failure;
549
550   /* call registered callback */
551   cb->table.call(jtable_updates);
552   return 0;
553
554 ovs_failure:
555   OVS_ERROR("invalid OVS DB table update event");
556   return (-1);
557 }
558
559 /* OVS DB result request handler.
560  * This callback is called by POLL thread if OVS DB
561  * result reply is received from the DB server.
562  * Once registered callback found, it's called
563  * by this handler. */
564 static int ovs_db_result_cb(ovs_db_t *pdb, yajl_val jnode) {
565   ovs_callback_t *cb = NULL;
566   yajl_val jresult;
567   yajl_val jerror;
568   yajl_val jid;
569   const char *result_path[] = {"result", NULL};
570   const char *error_path[] = {"error", NULL};
571   const char *id_path[] = {"id", NULL};
572
573   jresult = yajl_tree_get(jnode, result_path, yajl_t_any);
574   jerror = yajl_tree_get(jnode, error_path, yajl_t_any);
575   jid = yajl_tree_get(jnode, id_path, yajl_t_string);
576
577   /* check & get result attributes */
578   if (!jresult || !jerror || !jid)
579     return (-1);
580
581   /* try to find registered callback */
582   cb = ovs_db_table_callback_get(pdb, jid);
583   if (cb != NULL && cb->result.call != NULL) {
584     /* call registered callback */
585     cb->result.call(jresult, jerror);
586     /* unlock owner of the reply */
587     sem_post(&cb->result.sync);
588   }
589
590   return (0);
591 }
592
593 /* Handle JSON data (one request) and call
594  * appropriate event OVS DB handler. Currently,
595  * update callback 'ovs_db_table_update_cb' and
596  * result callback 'ovs_db_result_cb' is supported.
597  */
598 static int ovs_db_json_data_process(ovs_db_t *pdb, const char *data,
599                                     size_t len) {
600   const char *method = NULL;
601   char yajl_errbuf[OVS_YAJL_ERROR_BUFFER_SIZE];
602   const char *method_path[] = {"method", NULL};
603   const char *result_path[] = {"result", NULL};
604   char *sjson = NULL;
605   yajl_val jnode, jval;
606
607   /* duplicate the data to make null-terminated string
608    * required for yajl_tree_parse() */
609   if ((sjson = malloc(len + 1)) == NULL)
610     return (-1);
611
612   sstrncpy(sjson, data, len + 1);
613   OVS_DEBUG("[len=%zu] %s", len, sjson);
614
615   /* parse json data */
616   jnode = yajl_tree_parse(sjson, yajl_errbuf, sizeof(yajl_errbuf));
617   if (jnode == NULL) {
618     OVS_ERROR("yajl_tree_parse() %s", yajl_errbuf);
619     sfree(sjson);
620     return (-1);
621   }
622
623   /* get method name */
624   if (jval = yajl_tree_get(jnode, method_path, yajl_t_string)) {
625     method = YAJL_GET_STRING(jval);
626     if (strcmp("echo", method) == 0) {
627       /* echo request from the server */
628       if (ovs_db_table_echo_cb(pdb, jnode) < 0)
629         OVS_ERROR("handle echo request failed");
630     } else if (strcmp("update", method) == 0) {
631       /* update notification */
632       if (ovs_db_table_update_cb(pdb, jnode) < 0)
633         OVS_ERROR("handle update notification failed");
634     }
635   } else if (jval = yajl_tree_get(jnode, result_path, yajl_t_any)) {
636     /* result notification */
637     if (ovs_db_result_cb(pdb, jnode) < 0)
638       OVS_ERROR("handle result reply failed");
639   } else
640     OVS_ERROR("connot find method or result failed");
641
642   /* release memory */
643   yajl_tree_free(jnode);
644   sfree(sjson);
645   return (0);
646 }
647
648 /*
649  * JSON reader implementation.
650  *
651  * This module process raw JSON data (byte stream) and
652  * returns fully-fledged JSON data which can be processed
653  * (parsed) by YAJL later.
654  */
655
656 /* Allocate JSON reader instance */
657 static inline ovs_json_reader_t *ovs_json_reader_alloc() {
658   ovs_json_reader_t *jreader = NULL;
659
660   if ((jreader = calloc(sizeof(ovs_json_reader_t), 1)) == NULL)
661     return NULL;
662
663   return jreader;
664 }
665
666 /* Push raw data into into the JSON reader for processing */
667 static inline int ovs_json_reader_push_data(ovs_json_reader_t *jreader,
668                                             const char *data, size_t data_len) {
669   char *new_buff = NULL;
670   size_t available = jreader->buff_size - jreader->buff_offset;
671
672   /* check/update required memory space */
673   if (available < data_len) {
674     OVS_DEBUG("Reallocate buffer [size=%d, available=%d required=%d]",
675               (int)jreader->buff_size, (int)available, (int)data_len);
676
677     /* allocate new chunk of memory */
678     new_buff = realloc(jreader->buff_ptr, (jreader->buff_size + data_len));
679     if (new_buff == NULL)
680       return (-1);
681
682     /* point to new allocated memory */
683     jreader->buff_ptr = new_buff;
684     jreader->buff_size += data_len;
685   }
686
687   /* store input data */
688   memcpy(jreader->buff_ptr + jreader->buff_offset, data, data_len);
689   jreader->buff_offset += data_len;
690   return (0);
691 }
692
693 /* Pop one fully-fledged JSON if already exists. Returns 0 if
694  * completed JSON already exists otherwise negative value is
695  * returned */
696 static inline int ovs_json_reader_pop(ovs_json_reader_t *jreader,
697                                       const char **json_ptr,
698                                       size_t *json_len_ptr) {
699   size_t nbraces = 0;
700   size_t json_len = 0;
701   char *json = NULL;
702
703   /* search open/close brace */
704   for (int i = jreader->json_offset; i < jreader->buff_offset; i++) {
705     if (jreader->buff_ptr[i] == '{') {
706       nbraces++;
707     } else if (jreader->buff_ptr[i] == '}')
708       if (nbraces)
709         if (!(--nbraces)) {
710           /* JSON data */
711           *json_ptr = jreader->buff_ptr + jreader->json_offset;
712           *json_len_ptr = json_len + 1;
713           jreader->json_offset = i + 1;
714           return (0);
715         }
716
717     /* increase JSON data length */
718     if (nbraces)
719       json_len++;
720   }
721
722   if (jreader->json_offset) {
723     if (jreader->json_offset < jreader->buff_offset) {
724       /* shift data to the beginning of the buffer
725        * and zero rest of the buffer data */
726       json = &jreader->buff_ptr[jreader->json_offset];
727       json_len = jreader->buff_offset - jreader->json_offset;
728       for (int i = 0; i < jreader->buff_size; i++)
729         jreader->buff_ptr[i] = ((i < json_len) ? (json[i]) : (0));
730       jreader->buff_offset = json_len;
731     } else
732       /* reset the buffer */
733       jreader->buff_offset = 0;
734
735     /* data is at the beginning of the buffer */
736     jreader->json_offset = 0;
737   }
738
739   return (-1);
740 }
741
742 /* Reset JSON reader. It is useful when start processing
743  * new raw data. E.g.: in case of lost stream connection.
744  */
745 static inline void ovs_json_reader_reset(ovs_json_reader_t *jreader) {
746   if (jreader) {
747     jreader->buff_offset = 0;
748     jreader->json_offset = 0;
749   }
750 }
751
752 /* Release internal data allocated for JSON reader */
753 static inline void ovs_json_reader_free(ovs_json_reader_t *jreader) {
754   if (jreader) {
755     free(jreader->buff_ptr);
756     free(jreader);
757   }
758 }
759
760 /* Reconnect to OVD DB and call init OVS DB callback
761  * 'init_cb' if connection has been established.
762  */
763 static int ovs_db_reconnect(ovs_db_t *pdb) {
764   char errbuff[OVS_ERROR_BUFF_SIZE];
765
766   /* remove all registered OVS DB table/result callbacks */
767   ovs_db_callback_remove_all(pdb);
768
769   /* open new socket */
770   if ((pdb->conn.sock = socket(pdb->conn.domain, pdb->conn.type, 0)) < 0) {
771     sstrerror(errno, errbuff, sizeof(errbuff));
772     OVS_ERROR("socket(): %s", errbuff);
773     return (-1);
774   }
775
776   /* try to connect to server */
777   if (connect(pdb->conn.sock, (struct sockaddr *)&pdb->conn.addr,
778               pdb->conn.addr_size) < 0) {
779     sstrerror(errno, errbuff, sizeof(errbuff));
780     OVS_ERROR("connect(): %s", errbuff);
781     close(pdb->conn.sock);
782     return (-1);
783   }
784
785   /* send notification to event thread */
786   ovs_db_event_post(pdb, OVS_DB_EVENT_CONN_ESTABLISHED);
787   return (0);
788 }
789
790 /* POLL worker thread.
791  * It listens on OVS DB connection for incoming
792  * requests/reply/events etc. Also, it reconnects to OVS DB
793  * if connection has been lost.
794  */
795 static void *ovs_poll_worker(void *arg) {
796   ovs_db_t *pdb = (ovs_db_t *)arg; /* pointer to OVS DB */
797   ovs_json_reader_t *jreader = NULL;
798   const char *json;
799   size_t json_len;
800   ssize_t nbytes = 0;
801   char buff[OVS_DB_POLL_READ_BLOCK_SIZE];
802   struct pollfd poll_fd;
803   int poll_ret = 0;
804
805   if ((jreader = ovs_json_reader_alloc()) == NULL) {
806     OVS_ERROR("initialize json reader failed");
807     goto thread_exit;
808   }
809
810   /* start polling data */
811   poll_fd.fd = pdb->conn.sock;
812   poll_fd.events = POLLIN | POLLPRI;
813   poll_fd.revents = 0;
814
815   /* poll data */
816   while (ovs_db_poll_is_running(pdb)) {
817     poll_ret = poll(&poll_fd, 1, /* ms */ OVS_DB_POLL_TIMEOUT * 1000);
818     if (poll_ret > 0) {
819       if (poll_fd.revents & POLLNVAL) {
820         /* invalid file descriptor, reconnect */
821         if (ovs_db_reconnect(pdb) != 0) {
822           /* sleep awhile until next reconnect */
823           usleep(OVS_DB_RECONNECT_TIMEOUT * 1000000);
824         }
825         ovs_json_reader_reset(jreader);
826         poll_fd.fd = pdb->conn.sock;
827       } else if ((poll_fd.revents & POLLERR) || (poll_fd.revents & POLLHUP)) {
828         /* connection is broken */
829         OVS_ERROR("poll() peer closed its end of the channel");
830         ovs_db_event_post(pdb, OVS_DB_EVENT_CONN_TERMINATED);
831         close(poll_fd.fd);
832       } else if ((poll_fd.revents & POLLIN) || (poll_fd.revents & POLLPRI)) {
833         /* read incoming data */
834         nbytes = recv(poll_fd.fd, buff, OVS_DB_POLL_READ_BLOCK_SIZE, 0);
835         if (nbytes > 0) {
836           OVS_DEBUG("recv(): received %d bytes of data", (int)nbytes);
837           ovs_json_reader_push_data(jreader, buff, nbytes);
838           while (!ovs_json_reader_pop(jreader, &json, &json_len))
839             /* process JSON data */
840             ovs_db_json_data_process(pdb, json, json_len);
841         } else if (nbytes == 0) {
842           OVS_ERROR("recv() peer has performed an orderly shutdown");
843           ovs_db_event_post(pdb, OVS_DB_EVENT_CONN_TERMINATED);
844           close(poll_fd.fd);
845         } else {
846           OVS_ERROR("recv() receive data error");
847           break;
848         }
849       } /* poll() POLLIN & POLLPRI */
850     } else if (poll_ret == 0)
851       OVS_DEBUG("poll() timeout");
852     else {
853       OVS_ERROR("poll() error");
854       break;
855     }
856   }
857
858 thread_exit:
859   OVS_DEBUG("poll thread has been completed");
860   ovs_json_reader_free(jreader);
861   pthread_exit((void *)0);
862   return ((void *)0);
863 }
864
865 /* EVENT worker thread.
866  * Perform task based on incoming events. This
867  * task can be done asynchronously which allows to
868  * handle OVD DB callback like 'init_cb'.
869  */
870 static void *ovs_event_worker(void *arg) {
871   int ret = 0;
872   ovs_db_t *pdb = (ovs_db_t *)arg;
873   struct timespec ts;
874
875   while (pdb->event_thread.value != OVS_DB_EVENT_TERMINATE) {
876     /* wait for an event */
877     clock_gettime(CLOCK_REALTIME, &ts);
878     ts.tv_sec += (OVS_DB_EVENT_TIMEOUT);
879     ret = pthread_cond_timedwait(&pdb->event_thread.cond,
880                                  &pdb->event_thread.mutex, &ts);
881     if (!ret) {
882       /* handle the event */
883       OVS_DEBUG("handle event %d", pdb->event_thread.value);
884       switch (pdb->event_thread.value) {
885       case OVS_DB_EVENT_CONN_ESTABLISHED:
886         if (pdb->cb.post_conn_init)
887           pdb->cb.post_conn_init(pdb);
888         break;
889       case OVS_DB_EVENT_CONN_TERMINATED:
890         if (pdb->cb.post_conn_terminate)
891           pdb->cb.post_conn_terminate();
892         break;
893       default:
894         OVS_DEBUG("unknown event received");
895         break;
896       }
897     } else if (ret == ETIMEDOUT) {
898       /* wait timeout */
899       OVS_DEBUG("no event received (timeout)");
900       continue;
901     } else {
902       /* unexpected error */
903       OVS_ERROR("pthread_cond_timedwait() failed");
904       break;
905     }
906   }
907
908 thread_exit:
909   OVS_DEBUG("event thread has been completed");
910   pthread_exit((void *)0);
911   return ((void *)0);
912 }
913
914 /* Stop EVENT thread */
915 static int ovs_db_event_thread_stop(ovs_db_t *pdb) {
916   ovs_db_event_post(pdb, OVS_DB_EVENT_TERMINATE);
917   if (pthread_join(pdb->event_thread.tid, NULL) != 0)
918     return (-1);
919   pthread_mutex_unlock(&pdb->event_thread.mutex);
920   pthread_mutex_destroy(&pdb->event_thread.mutex);
921   return (0);
922 }
923
924 /* Stop POLL thread */
925 static int ovs_db_poll_thread_stop(ovs_db_t *pdb) {
926   ovs_db_poll_terminate(pdb);
927   if (pthread_join(pdb->poll_thread.tid, NULL) != 0)
928     return (-1);
929   pthread_mutex_destroy(&pdb->poll_thread.mutex);
930   return (0);
931 }
932
933 /*
934  * Public OVS DB API implementation
935  */
936
937 ovs_db_t *ovs_db_init(const char *surl, ovs_db_callback_t *cb) {
938   pthread_mutexattr_t mutex_attr;
939   ovs_db_t *pdb = NULL;
940
941   /* allocate db data & fill it */
942   if ((pdb = calloc(1, sizeof(*pdb))) == NULL)
943     return (NULL);
944
945   /* convert string url to socket addr */
946   if (ovs_db_url_parse(surl, &pdb->conn) < 0)
947     goto failure;
948
949   /* setup OVS DB callbacks */
950   if (cb)
951     pdb->cb = *cb;
952
953   /* prepare event thread */
954   pthread_cond_init(&pdb->event_thread.cond, NULL);
955   pthread_mutex_init(&pdb->event_thread.mutex, NULL);
956   pthread_mutex_lock(&pdb->event_thread.mutex);
957   if (plugin_thread_create(&pdb->event_thread.tid, NULL, ovs_event_worker,
958                            pdb) != 0) {
959     OVS_ERROR("event worker start failed");
960     goto failure;
961   }
962
963   /* prepare polling thread */
964   ovs_db_reconnect(pdb);
965   pdb->poll_thread.state = OVS_DB_POLL_STATE_RUNNING;
966   pthread_mutex_init(&pdb->poll_thread.mutex, NULL);
967   if (plugin_thread_create(&pdb->poll_thread.tid, NULL, ovs_poll_worker, pdb) !=
968       0) {
969     OVS_ERROR("pull worker start failed");
970     goto failure;
971   }
972
973   /* init OVS DB mutex */
974   if (pthread_mutexattr_init(&mutex_attr) ||
975       pthread_mutexattr_settype(&mutex_attr, PTHREAD_MUTEX_RECURSIVE) ||
976       pthread_mutex_init(&pdb->mutex, &mutex_attr)) {
977     OVS_ERROR("OVS DB mutex init failed");
978     goto failure;
979   }
980
981   /* return db to the caller */
982   return pdb;
983
984 failure:
985   if (pdb->conn.sock)
986     /* close connection */
987     close(pdb->conn.sock);
988   if (pdb->event_thread.tid != 0)
989     /* stop event thread */
990     if (ovs_db_event_thread_stop(pdb) < 0)
991       OVS_ERROR("stop event thread failed");
992   if (pdb->poll_thread.tid != 0)
993     /* stop poll thread */
994     if (ovs_db_poll_thread_stop(pdb) < 0)
995       OVS_ERROR("stop poll thread failed");
996   sfree(pdb);
997   return NULL;
998 }
999
1000 int ovs_db_send_request(ovs_db_t *pdb, const char *method, const char *params,
1001                         ovs_db_result_cb_t cb) {
1002   int ret = 0;
1003   yajl_gen_status yajl_gen_ret;
1004   yajl_val jparams;
1005   yajl_gen jgen;
1006   ovs_callback_t *new_cb = NULL;
1007   uint64_t uid;
1008   char uid_buff[OVS_UID_STR_SIZE];
1009   const char *req = NULL;
1010   size_t req_len = 0;
1011   struct timespec ts;
1012
1013   /* sanity check */
1014   if (!pdb || !method || !params)
1015     return (-1);
1016
1017   if ((jgen = yajl_gen_alloc(NULL)) == NULL)
1018     return (-1);
1019
1020   /* try to parse params */
1021   if ((jparams = yajl_tree_parse(params, NULL, 0)) == NULL) {
1022     OVS_ERROR("params is not a JSON string");
1023     yajl_gen_clear(jgen);
1024     return (-1);
1025   }
1026
1027   /* generate method field */
1028   OVS_YAJL_CALL(yajl_gen_map_open, jgen);
1029
1030   OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "method");
1031   OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, method);
1032
1033   /* generate params field */
1034   OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "params");
1035   OVS_YAJL_CALL(ovs_yajl_gen_val, jgen, jparams);
1036   yajl_tree_free(jparams);
1037
1038   /* generate id field */
1039   OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "id");
1040   uid = ovs_uid_generate();
1041   ssnprintf(uid_buff, sizeof(uid_buff), "%" PRIX64, uid);
1042   OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, uid_buff);
1043
1044   OVS_YAJL_CALL(yajl_gen_map_close, jgen);
1045
1046   if (cb) {
1047     /* register result callback */
1048     if ((new_cb = malloc(sizeof(ovs_callback_t))) == NULL)
1049       goto yajl_gen_failure;
1050
1051     /* add new callback to front */
1052     sem_init(&new_cb->result.sync, 0, 0);
1053     new_cb->result.call = cb;
1054     new_cb->uid = uid;
1055     ovs_db_callback_add(pdb, new_cb);
1056   }
1057
1058   /* send the request */
1059   OVS_YAJL_CALL(yajl_gen_get_buf, jgen, (const unsigned char **)&req, &req_len);
1060   OVS_DEBUG("%s", req);
1061   if (!ovs_db_data_send(pdb, req, req_len)) {
1062     if (cb) {
1063       /* wait for result */
1064       clock_gettime(CLOCK_REALTIME, &ts);
1065       ts.tv_sec += OVS_DB_SEND_REQ_TIMEOUT;
1066       if (sem_timedwait(&new_cb->result.sync, &ts) < 0) {
1067         OVS_ERROR("%s() no replay received within %d sec", __FUNCTION__,
1068                   OVS_DB_SEND_REQ_TIMEOUT);
1069         ret = (-1);
1070       }
1071     }
1072   } else {
1073     OVS_ERROR("ovs_db_data_send() failed");
1074     ret = (-1);
1075   }
1076
1077 yajl_gen_failure:
1078   if (new_cb) {
1079     /* destroy callback */
1080     sem_destroy(&new_cb->result.sync);
1081     ovs_db_callback_remove(pdb, new_cb);
1082   }
1083
1084   /* release memory */
1085   yajl_gen_clear(jgen);
1086   return (yajl_gen_ret != yajl_gen_status_ok) ? (-1) : ret;
1087 }
1088
1089 int ovs_db_table_cb_register(ovs_db_t *pdb, const char *tb_name,
1090                              const char **tb_column,
1091                              ovs_db_table_cb_t update_cb,
1092                              ovs_db_result_cb_t result_cb, unsigned int flags) {
1093   yajl_gen jgen;
1094   yajl_gen_status yajl_gen_ret;
1095   ovs_callback_t *new_cb = NULL;
1096   char uid_str[OVS_UID_STR_SIZE];
1097   char *params;
1098   size_t params_len;
1099   int ovs_db_ret = 0;
1100
1101   /* sanity check */
1102   if (pdb == NULL || tb_name == NULL || update_cb == NULL)
1103     return (-1);
1104
1105   if ((jgen = yajl_gen_alloc(NULL)) == NULL)
1106     return (-1);
1107
1108   /* register table update callback */
1109   if ((new_cb = malloc(sizeof(ovs_callback_t))) == NULL)
1110     return (-1);
1111
1112   /* add new callback to front */
1113   new_cb->table.call = update_cb;
1114   new_cb->uid = ovs_uid_generate();
1115   ovs_db_callback_add(pdb, new_cb);
1116
1117   /* make update notification request
1118    * [<db-name>, <json-value>, <monitor-requests>] */
1119   OVS_YAJL_CALL(yajl_gen_array_open, jgen);
1120   {
1121     OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, OVS_DB_DEFAULT_DB_NAME);
1122
1123     /* uid string <json-value> */
1124     ssnprintf(uid_str, sizeof(uid_str), "%" PRIX64, new_cb->uid);
1125     OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, uid_str);
1126
1127     /* <monitor-requests> */
1128     OVS_YAJL_CALL(yajl_gen_map_open, jgen);
1129     {
1130       OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, tb_name);
1131       OVS_YAJL_CALL(yajl_gen_array_open, jgen);
1132       {
1133         /* <monitor-request> */
1134         OVS_YAJL_CALL(yajl_gen_map_open, jgen);
1135         {
1136           if (tb_column) {
1137             /* columns within the table to be monitored */
1138             OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "columns");
1139             OVS_YAJL_CALL(yajl_gen_array_open, jgen);
1140             for (; *tb_column; tb_column++)
1141               OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, *tb_column);
1142             OVS_YAJL_CALL(yajl_gen_array_close, jgen);
1143           }
1144           /* specify select option */
1145           OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "select");
1146           {
1147             OVS_YAJL_CALL(yajl_gen_map_open, jgen);
1148             {
1149               OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "initial");
1150               OVS_YAJL_CALL(yajl_gen_bool, jgen,
1151                             flags & OVS_DB_TABLE_CB_FLAG_INITIAL);
1152               OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "insert");
1153               OVS_YAJL_CALL(yajl_gen_bool, jgen,
1154                             flags & OVS_DB_TABLE_CB_FLAG_INSERT);
1155               OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "delete");
1156               OVS_YAJL_CALL(yajl_gen_bool, jgen,
1157                             flags & OVS_DB_TABLE_CB_FLAG_DELETE);
1158               OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "modify");
1159               OVS_YAJL_CALL(yajl_gen_bool, jgen,
1160                             flags & OVS_DB_TABLE_CB_FLAG_MODIFY);
1161             }
1162             OVS_YAJL_CALL(yajl_gen_map_close, jgen);
1163           }
1164         }
1165         OVS_YAJL_CALL(yajl_gen_map_close, jgen);
1166       }
1167       OVS_YAJL_CALL(yajl_gen_array_close, jgen);
1168     }
1169     OVS_YAJL_CALL(yajl_gen_map_close, jgen);
1170   }
1171   OVS_YAJL_CALL(yajl_gen_array_close, jgen);
1172
1173   /* make a request to subscribe to given table */
1174   OVS_YAJL_CALL(yajl_gen_get_buf, jgen, (const unsigned char **)&params,
1175                 &params_len);
1176   if (ovs_db_send_request(pdb, "monitor", params, result_cb) < 0) {
1177     OVS_ERROR("Failed to subscribe to \"%s\" table", tb_name);
1178     ovs_db_ret = (-1);
1179   }
1180
1181 yajl_gen_failure:
1182   /* release memory */
1183   yajl_gen_clear(jgen);
1184   return ovs_db_ret;
1185 }
1186
1187 int ovs_db_destroy(ovs_db_t *pdb) {
1188   int ovs_db_ret = 0;
1189   int ret = 0;
1190
1191   /* sanity check */
1192   if (pdb == NULL)
1193     return (-1);
1194
1195   /* try to lock the structure before releasing */
1196   if (ret = pthread_mutex_lock(&pdb->mutex)) {
1197     OVS_ERROR("pthread_mutex_lock() DB mutext lock failed (%d)", ret);
1198     return (-1);
1199   }
1200
1201   /* stop poll thread */
1202   if (ovs_db_event_thread_stop(pdb) < 0) {
1203     OVS_ERROR("stop poll thread failed");
1204     ovs_db_ret = (-1);
1205   }
1206
1207   /* stop event thread */
1208   if (ovs_db_poll_thread_stop(pdb) < 0) {
1209     OVS_ERROR("stop event thread failed");
1210     ovs_db_ret = (-1);
1211   }
1212
1213   /* unsubscribe callbacks */
1214   ovs_db_callback_remove_all(pdb);
1215
1216   /* close connection */
1217   if (pdb->conn.sock)
1218     close(pdb->conn.sock);
1219
1220   /* release DB handler */
1221   pthread_mutex_unlock(&pdb->mutex);
1222   pthread_mutex_destroy(&pdb->mutex);
1223   sfree(pdb);
1224   return ovs_db_ret;
1225 }
1226
1227 /*
1228  * Public OVS utils API implementation
1229  */
1230
1231 /* Get YAJL value by key from YAJL dictionary */
1232 yajl_val ovs_utils_get_value_by_key(yajl_val jval, const char *key) {
1233   const char *obj_key = NULL;
1234
1235   /* check params */
1236   if (!YAJL_IS_OBJECT(jval) || (key == NULL))
1237     return NULL;
1238
1239   /* find a value by key */
1240   for (int i = 0; i < YAJL_GET_OBJECT(jval)->len; i++) {
1241     obj_key = YAJL_GET_OBJECT(jval)->keys[i];
1242     if (strcmp(obj_key, key) == 0)
1243       return YAJL_GET_OBJECT(jval)->values[i];
1244   }
1245
1246   return NULL;
1247 }
1248
1249 /* Get OVS DB map value by given map key */
1250 yajl_val ovs_utils_get_map_value(yajl_val jval, const char *key) {
1251   size_t map_len = 0;
1252   size_t array_len = 0;
1253   yajl_val *map_values = NULL;
1254   yajl_val *array_values = NULL;
1255   const char *str_val = NULL;
1256
1257   /* check YAJL array */
1258   if (!YAJL_IS_ARRAY(jval) || (key == NULL))
1259     return NULL;
1260
1261   /* check a database map value (2-element, first one should be a string */
1262   array_len = YAJL_GET_ARRAY(jval)->len;
1263   array_values = YAJL_GET_ARRAY(jval)->values;
1264   if ((array_len != 2) || (!YAJL_IS_STRING(array_values[0])) ||
1265       (!YAJL_IS_ARRAY(array_values[1])))
1266     return NULL;
1267
1268   /* check first element of the array */
1269   str_val = YAJL_GET_STRING(array_values[0]);
1270   if (strcmp("map", str_val) != 0)
1271     return NULL;
1272
1273   /* try to find map value by map key */
1274   map_len = YAJL_GET_ARRAY(array_values[1])->len;
1275   map_values = YAJL_GET_ARRAY(array_values[1])->values;
1276   for (int i = 0; i < map_len; i++) {
1277     /* check YAJL array */
1278     if (!YAJL_IS_ARRAY(map_values[i]))
1279       break;
1280
1281     /* check a database pair value (2-element, first one represents a key
1282      * and it should be a string in our case */
1283     array_len = YAJL_GET_ARRAY(map_values[i])->len;
1284     array_values = YAJL_GET_ARRAY(map_values[i])->values;
1285     if ((array_len != 2) || (!YAJL_IS_STRING(array_values[0])))
1286       break;
1287
1288     /* return map value if given key equals map key */
1289     str_val = YAJL_GET_STRING(array_values[0]);
1290     if (strcmp(key, str_val) == 0)
1291       return array_values[1];
1292   }
1293   return NULL;
1294 }