X-Git-Url: https://git.octo.it/?a=blobdiff_plain;f=src%2Futils_ovs.c;h=9bca398da34551447a1ed380bbd4cca87afa8fa6;hb=0b7cd83a5e6bac068ea83a88a5ddcfb07c09fbec;hp=92914ce6d2358749edee8a4f48287149c5672a4d;hpb=193d73e9e99caa0be861d20f1a6ae335bfab4c9e;p=collectd.git diff --git a/src/utils_ovs.c b/src/utils_ovs.c index 92914ce6..9bca398d 100644 --- a/src/utils_ovs.c +++ b/src/utils_ovs.c @@ -3,14 +3,17 @@ * * Copyright(c) 2016 Intel Corporation. All rights reserved. * - * Permission is hereby granted, free of charge, to any person obtaining a copy of + * Permission is hereby granted, free of charge, to any person obtaining a copy + *of * this software and associated documentation files (the "Software"), to deal in * the Software without restriction, including without limitation the rights to * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies - * of the Software, and to permit persons to whom the Software is furnished to do + * of the Software, and to permit persons to whom the Software is furnished to + *do * so, subject to the following conditions: * - * The above copyright notice and this permission notice shall be included in all + * The above copyright notice and this permission notice shall be included in + *all * copies or substantial portions of the Software. * * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR @@ -65,16 +68,28 @@ /* clang-format on */ /* collectd headers */ +#include "collectd.h" + #include "common.h" /* private headers */ #include "utils_ovs.h" /* system libraries */ +#if HAVE_NETDB_H +#include +#endif +#if HAVE_ARPA_INET_H #include +#endif +#if HAVE_POLL_H #include -#include +#endif +#if HAVE_SYS_UN_H #include +#endif + +#include #define OVS_ERROR(fmt, ...) \ do { \ @@ -89,7 +104,6 @@ #define OVS_DB_POLL_TIMEOUT 1 /* poll receive timeout (sec) */ #define OVS_DB_POLL_READ_BLOCK_SIZE 512 /* read block size (bytes) */ #define OVS_DB_DEFAULT_DB_NAME "Open_vSwitch" -#define OVS_DB_RECONNECT_TIMEOUT 1 /* reconnect timeout (sec) */ #define OVS_DB_EVENT_TIMEOUT 5 /* event thread timeout (sec) */ #define OVS_DB_EVENT_TERMINATE 1 @@ -145,19 +159,6 @@ struct ovs_callback_s { }; typedef struct ovs_callback_s ovs_callback_t; -/* Connection declaration */ -struct ovs_conn_s { - int sock; - int domain; - int type; - int addr_size; - union { - struct sockaddr_in s_inet; - struct sockaddr_un s_unix; - } addr; -}; -typedef struct ovs_conn_s ovs_conn_t; - /* Event thread data declaration */ struct ovs_event_thread_s { pthread_t tid; @@ -182,9 +183,15 @@ struct ovs_db_s { pthread_mutex_t mutex; ovs_callback_t *remote_cb; ovs_db_callback_t cb; - ovs_conn_t conn; + char service[OVS_DB_ADDR_SERVICE_SIZE]; + char node[OVS_DB_ADDR_NODE_SIZE]; + char unix_path[OVS_DB_ADDR_NODE_SIZE]; + int sock; }; -typedef struct ovs_db_s ovs_db_t; + +/* Global variables */ +static uint64_t ovs_uid = 0; +static pthread_mutex_t ovs_uid_mutex = PTHREAD_MUTEX_INITIALIZER; /* Post an event to event thread. * Possible events are: @@ -201,27 +208,22 @@ static void ovs_db_event_post(ovs_db_t *pdb, int event) { /* Check if POLL thread is still running. Returns * 1 if running otherwise 0 is returned */ -static inline int ovs_db_poll_is_running(ovs_db_t *pdb) { +static _Bool ovs_db_poll_is_running(ovs_db_t *pdb) { int state = 0; pthread_mutex_lock(&pdb->poll_thread.mutex); state = pdb->poll_thread.state; pthread_mutex_unlock(&pdb->poll_thread.mutex); - return (state == OVS_DB_POLL_STATE_RUNNING); -} - -/* Terminate POLL thread */ -static inline void ovs_db_poll_terminate(ovs_db_t *pdb) { - pthread_mutex_lock(&pdb->poll_thread.mutex); - pdb->poll_thread.state = OVS_DB_POLL_STATE_EXITING; - pthread_mutex_unlock(&pdb->poll_thread.mutex); + return state == OVS_DB_POLL_STATE_RUNNING; } /* Generate unique identifier (UID). It is used by OVS DB API * to set "id" field for any OVS DB JSON request. */ static uint64_t ovs_uid_generate() { - struct timespec ts; - clock_gettime(CLOCK_MONOTONIC, &ts); - return ((ts.tv_sec << 32) | (ts.tv_nsec & UINT32_MAX)); + uint64_t new_uid; + pthread_mutex_lock(&ovs_uid_mutex); + new_uid = ++ovs_uid; + pthread_mutex_unlock(&ovs_uid_mutex); + return new_uid; } /* @@ -242,10 +244,10 @@ static void ovs_db_callback_add(ovs_db_t *pdb, ovs_callback_t *new_cb) { /* Remove callback from OVS DB object */ static void ovs_db_callback_remove(ovs_db_t *pdb, ovs_callback_t *del_cb) { + pthread_mutex_lock(&pdb->mutex); ovs_callback_t *pre_cb = del_cb->prev; ovs_callback_t *next_cb = del_cb->next; - pthread_mutex_lock(&pdb->mutex); if (next_cb) next_cb->prev = del_cb->prev; @@ -263,7 +265,7 @@ static void ovs_db_callback_remove_all(ovs_db_t *pdb) { pthread_mutex_lock(&pdb->mutex); for (ovs_callback_t *del_cb = pdb->remote_cb; pdb->remote_cb; del_cb = pdb->remote_cb) { - pdb->remote_cb = pdb->remote_cb->next; + pdb->remote_cb = del_cb->next; free(del_cb); } pdb->remote_cb = NULL; @@ -271,15 +273,16 @@ static void ovs_db_callback_remove_all(ovs_db_t *pdb) { } /* Get/find callback in OVS DB object by UID. Returns pointer - * to requested callback otherwise NULL is returned */ + * to requested callback otherwise NULL is returned. + * + * IMPORTANT NOTE: + * The OVS DB mutex MUST be locked by the caller + * to make sure that returned callback is still valid. + */ static ovs_callback_t *ovs_db_callback_get(ovs_db_t *pdb, uint64_t uid) { - pthread_mutex_lock(&pdb->mutex); for (ovs_callback_t *cb = pdb->remote_cb; cb != NULL; cb = cb->next) - if (cb->uid == uid) { - pthread_mutex_unlock(&pdb->mutex); + if (cb->uid == uid) return cb; - } - pthread_mutex_unlock(&pdb->mutex); return NULL; } @@ -292,76 +295,12 @@ static int ovs_db_data_send(const ovs_db_t *pdb, const char *data, size_t len) { size_t off = 0; while (rem > 0) { - if ((nbytes = send(pdb->conn.sock, data + off, rem, 0)) <= 0) - return (-1); + if ((nbytes = send(pdb->sock, data + off, rem, 0)) <= 0) + return -1; rem -= (size_t)nbytes; off += (size_t)nbytes; } - return (0); -} - -/* Parse OVS server URL. - * Format of the URL: - * "tcp:a.b.c.d:port" - define TCP connection (INET domain) - * "unix:file" - define UNIX socket file (UNIX domain) - */ -static int ovs_db_url_parse(const char *surl, ovs_conn_t *conn) { - ovs_conn_t tmp_conn; - char *nexttok = NULL; - char *in_str = NULL; - char *saveptr; - int ret = 0; - - /* sanity check */ - if ((surl == NULL) || (strlen(surl) < 1)) - return (-1); - - /* parse domain */ - tmp_conn = *conn; - in_str = sstrdup(surl); - if ((nexttok = strtok_r(in_str, ":", &saveptr)) != NULL) { - if (strcmp("tcp", nexttok) == 0) { - tmp_conn.domain = AF_INET; - tmp_conn.type = SOCK_STREAM; - tmp_conn.addr_size = sizeof(tmp_conn.addr.s_inet); - } else if (strcmp("unix", nexttok) == 0) { - tmp_conn.domain = AF_UNIX; - tmp_conn.type = SOCK_STREAM; - tmp_conn.addr_size = sizeof(tmp_conn.addr.s_unix); - } else - goto failure; - } else - goto failure; - - /* parse url depending on domain */ - if ((nexttok = strtok_r(NULL, ":", &saveptr)) != NULL) { - if (tmp_conn.domain == AF_UNIX) { - /* */ - tmp_conn.addr.s_inet.sin_family = AF_UNIX; - sstrncpy(tmp_conn.addr.s_unix.sun_path, nexttok, strlen(nexttok) + 1); - } else { - /* */ - tmp_conn.addr.s_inet.sin_family = AF_INET; - ret = inet_pton(AF_INET, nexttok, (void *)&tmp_conn.addr.s_inet.sin_addr); - if (ret == 1) { - if ((nexttok = strtok_r(NULL, ":", &saveptr)) != NULL) - tmp_conn.addr.s_inet.sin_port = htons(atoi(nexttok)); - else - goto failure; - } else - goto failure; - } - } - - /* save result and return success */ - *conn = tmp_conn; - sfree(in_str); - return (0); - -failure: - OVS_ERROR("invalid OVS DB URL provided [url=%s]", surl); - sfree(in_str); - return (-1); + return 0; } /* @@ -376,9 +315,9 @@ failure: * jgen - YAJL generator handle allocated by yajl_gen_alloc() * string - Null-terminated string */ -static inline yajl_gen_status ovs_yajl_gen_tstring(yajl_gen hander, - const char *string) { - return yajl_gen_string(hander, string, strlen(string)); +static yajl_gen_status ovs_yajl_gen_tstring(yajl_gen hander, + const char *string) { + return yajl_gen_string(hander, (const unsigned char *)string, strlen(string)); } /* Add YAJL value into YAJL generator handle (JSON object) @@ -392,7 +331,10 @@ static yajl_gen_status ovs_yajl_gen_val(yajl_gen jgen, yajl_val jval) { yajl_val jobj_value = NULL; const char *obj_key = NULL; size_t obj_len = 0; - yajl_gen_status yajl_gen_ret; + yajl_gen_status yajl_gen_ret = yajl_gen_status_ok; + + if (jval == NULL) + return yajl_gen_generation_complete; if (YAJL_IS_STRING(jval)) OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, YAJL_GET_STRING(jval)); @@ -411,14 +353,14 @@ static yajl_gen_status ovs_yajl_gen_val(yajl_gen jgen, yajl_val jval) { array_len = YAJL_GET_ARRAY(jval)->len; jvalues = YAJL_GET_ARRAY(jval)->values; OVS_YAJL_CALL(yajl_gen_array_open, jgen); - for (int i = 0; i < array_len; i++) + for (size_t i = 0; i < array_len; i++) OVS_YAJL_CALL(ovs_yajl_gen_val, jgen, jvalues[i]); OVS_YAJL_CALL(yajl_gen_array_close, jgen); } else if (YAJL_IS_OBJECT(jval)) { /* create new object and add all elements into the object */ OVS_YAJL_CALL(yajl_gen_map_open, jgen); obj_len = YAJL_GET_OBJECT(jval)->len; - for (int i = 0; i < obj_len; i++) { + for (size_t i = 0; i < obj_len; i++) { obj_key = YAJL_GET_OBJECT(jval)->keys[i]; jobj_value = YAJL_GET_OBJECT(jval)->values[i]; OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, obj_key); @@ -452,7 +394,7 @@ static int ovs_db_table_echo_cb(const ovs_db_t *pdb, yajl_val jnode) { yajl_gen_status yajl_gen_ret; if ((jgen = yajl_gen_alloc(NULL)) == NULL) - return (-1); + return -1; /* check & get request attributes */ if ((jparams = yajl_tree_get(jnode, params_path, yajl_t_array)) == NULL || @@ -485,17 +427,18 @@ static int ovs_db_table_echo_cb(const ovs_db_t *pdb, yajl_val jnode) { } /* clean up and return success */ yajl_gen_clear(jgen); - return (0); + return 0; yajl_gen_failure: /* release memory */ yajl_gen_clear(jgen); - return (-1); + return -1; } /* Get OVS DB registered callback by YAJL val. The YAJL * value should be YAJL string (UID). Returns NULL if - * callback hasn't been found. + * callback hasn't been found. See also ovs_db_callback_get() + * description for addition info. */ static ovs_callback_t *ovs_db_table_callback_get(ovs_db_t *pdb, yajl_val jid) { char *endptr = NULL; @@ -522,38 +465,43 @@ static int ovs_db_table_update_cb(ovs_db_t *pdb, yajl_val jnode) { yajl_val jvalue; yajl_val jparams; yajl_val jtable_updates; - yajl_val jtable_update; - size_t obj_len = 0; - const char *table_name = NULL; const char *params_path[] = {"params", NULL}; const char *id_path[] = {"id", NULL}; /* check & get request attributes */ if ((jparams = yajl_tree_get(jnode, params_path, yajl_t_array)) == NULL || - (yajl_tree_get(jnode, id_path, yajl_t_null) == NULL)) - goto ovs_failure; + (yajl_tree_get(jnode, id_path, yajl_t_null) == NULL)) { + OVS_ERROR("invalid OVS DB request received"); + return -1; + } /* check array length: [, ] */ - if (YAJL_GET_ARRAY(jparams)->len != 2) - goto ovs_failure; + if ((YAJL_GET_ARRAY(jparams) == NULL) || + (YAJL_GET_ARRAY(jparams)->len != 2)) { + OVS_ERROR("invalid OVS DB request received"); + return -1; + } jvalue = YAJL_GET_ARRAY(jparams)->values[0]; jtable_updates = YAJL_GET_ARRAY(jparams)->values[1]; - if ((!YAJL_IS_OBJECT(jtable_updates)) || (!YAJL_IS_STRING(jvalue))) - goto ovs_failure; + if ((!YAJL_IS_OBJECT(jtable_updates)) || (!YAJL_IS_STRING(jvalue))) { + OVS_ERROR("invalid OVS DB request id or table update received"); + return -1; + } /* find registered callback based on */ + pthread_mutex_lock(&pdb->mutex); cb = ovs_db_table_callback_get(pdb, jvalue); - if (cb == NULL || cb->table.call == NULL) - goto ovs_failure; + if (cb == NULL || cb->table.call == NULL) { + OVS_ERROR("No OVS DB table update callback found"); + pthread_mutex_unlock(&pdb->mutex); + return -1; + } /* call registered callback */ cb->table.call(jtable_updates); + pthread_mutex_unlock(&pdb->mutex); return 0; - -ovs_failure: - OVS_ERROR("invalid OVS DB table update event"); - return (-1); } /* OVS DB result request handler. @@ -576,9 +524,10 @@ static int ovs_db_result_cb(ovs_db_t *pdb, yajl_val jnode) { /* check & get result attributes */ if (!jresult || !jerror || !jid) - return (-1); + return -1; /* try to find registered callback */ + pthread_mutex_lock(&pdb->mutex); cb = ovs_db_table_callback_get(pdb, jid); if (cb != NULL && cb->result.call != NULL) { /* call registered callback */ @@ -587,7 +536,8 @@ static int ovs_db_result_cb(ovs_db_t *pdb, yajl_val jnode) { sem_post(&cb->result.sync); } - return (0); + pthread_mutex_unlock(&pdb->mutex); + return 0; } /* Handle JSON data (one request) and call @@ -606,8 +556,8 @@ static int ovs_db_json_data_process(ovs_db_t *pdb, const char *data, /* duplicate the data to make null-terminated string * required for yajl_tree_parse() */ - if ((sjson = malloc(len + 1)) == NULL) - return (-1); + if ((sjson = calloc(1, len + 1)) == NULL) + return -1; sstrncpy(sjson, data, len + 1); OVS_DEBUG("[len=%zu] %s", len, sjson); @@ -617,12 +567,16 @@ static int ovs_db_json_data_process(ovs_db_t *pdb, const char *data, if (jnode == NULL) { OVS_ERROR("yajl_tree_parse() %s", yajl_errbuf); sfree(sjson); - return (-1); + return -1; } /* get method name */ - if (jval = yajl_tree_get(jnode, method_path, yajl_t_string)) { - method = YAJL_GET_STRING(jval); + if ((jval = yajl_tree_get(jnode, method_path, yajl_t_string)) != NULL) { + if ((method = YAJL_GET_STRING(jval)) == NULL) { + yajl_tree_free(jnode); + sfree(sjson); + return -1; + } if (strcmp("echo", method) == 0) { /* echo request from the server */ if (ovs_db_table_echo_cb(pdb, jnode) < 0) @@ -632,7 +586,7 @@ static int ovs_db_json_data_process(ovs_db_t *pdb, const char *data, if (ovs_db_table_update_cb(pdb, jnode) < 0) OVS_ERROR("handle update notification failed"); } - } else if (jval = yajl_tree_get(jnode, result_path, yajl_t_any)) { + } else if ((jval = yajl_tree_get(jnode, result_path, yajl_t_any)) != NULL) { /* result notification */ if (ovs_db_result_cb(pdb, jnode) < 0) OVS_ERROR("handle result reply failed"); @@ -642,7 +596,7 @@ static int ovs_db_json_data_process(ovs_db_t *pdb, const char *data, /* release memory */ yajl_tree_free(jnode); sfree(sjson); - return (0); + return 0; } /* @@ -654,7 +608,7 @@ static int ovs_db_json_data_process(ovs_db_t *pdb, const char *data, */ /* Allocate JSON reader instance */ -static inline ovs_json_reader_t *ovs_json_reader_alloc() { +static ovs_json_reader_t *ovs_json_reader_alloc() { ovs_json_reader_t *jreader = NULL; if ((jreader = calloc(sizeof(ovs_json_reader_t), 1)) == NULL) @@ -664,8 +618,8 @@ static inline ovs_json_reader_t *ovs_json_reader_alloc() { } /* Push raw data into into the JSON reader for processing */ -static inline int ovs_json_reader_push_data(ovs_json_reader_t *jreader, - const char *data, size_t data_len) { +static int ovs_json_reader_push_data(ovs_json_reader_t *jreader, + const char *data, size_t data_len) { char *new_buff = NULL; size_t available = jreader->buff_size - jreader->buff_offset; @@ -677,7 +631,7 @@ static inline int ovs_json_reader_push_data(ovs_json_reader_t *jreader, /* allocate new chunk of memory */ new_buff = realloc(jreader->buff_ptr, (jreader->buff_size + data_len)); if (new_buff == NULL) - return (-1); + return -1; /* point to new allocated memory */ jreader->buff_ptr = new_buff; @@ -687,21 +641,20 @@ static inline int ovs_json_reader_push_data(ovs_json_reader_t *jreader, /* store input data */ memcpy(jreader->buff_ptr + jreader->buff_offset, data, data_len); jreader->buff_offset += data_len; - return (0); + return 0; } /* Pop one fully-fledged JSON if already exists. Returns 0 if * completed JSON already exists otherwise negative value is * returned */ -static inline int ovs_json_reader_pop(ovs_json_reader_t *jreader, - const char **json_ptr, - size_t *json_len_ptr) { +static int ovs_json_reader_pop(ovs_json_reader_t *jreader, + const char **json_ptr, size_t *json_len_ptr) { size_t nbraces = 0; size_t json_len = 0; char *json = NULL; /* search open/close brace */ - for (int i = jreader->json_offset; i < jreader->buff_offset; i++) { + for (size_t i = jreader->json_offset; i < jreader->buff_offset; i++) { if (jreader->buff_ptr[i] == '{') { nbraces++; } else if (jreader->buff_ptr[i] == '}') @@ -711,7 +664,7 @@ static inline int ovs_json_reader_pop(ovs_json_reader_t *jreader, *json_ptr = jreader->buff_ptr + jreader->json_offset; *json_len_ptr = json_len + 1; jreader->json_offset = i + 1; - return (0); + return 0; } /* increase JSON data length */ @@ -725,7 +678,7 @@ static inline int ovs_json_reader_pop(ovs_json_reader_t *jreader, * and zero rest of the buffer data */ json = &jreader->buff_ptr[jreader->json_offset]; json_len = jreader->buff_offset - jreader->json_offset; - for (int i = 0; i < jreader->buff_size; i++) + for (size_t i = 0; i < jreader->buff_size; i++) jreader->buff_ptr[i] = ((i < json_len) ? (json[i]) : (0)); jreader->buff_offset = json_len; } else @@ -736,13 +689,13 @@ static inline int ovs_json_reader_pop(ovs_json_reader_t *jreader, jreader->json_offset = 0; } - return (-1); + return -1; } /* Reset JSON reader. It is useful when start processing * new raw data. E.g.: in case of lost stream connection. */ -static inline void ovs_json_reader_reset(ovs_json_reader_t *jreader) { +static void ovs_json_reader_reset(ovs_json_reader_t *jreader) { if (jreader) { jreader->buff_offset = 0; jreader->json_offset = 0; @@ -750,41 +703,74 @@ static inline void ovs_json_reader_reset(ovs_json_reader_t *jreader) { } /* Release internal data allocated for JSON reader */ -static inline void ovs_json_reader_free(ovs_json_reader_t *jreader) { +static void ovs_json_reader_free(ovs_json_reader_t *jreader) { if (jreader) { free(jreader->buff_ptr); free(jreader); } } -/* Reconnect to OVD DB and call init OVS DB callback - * 'init_cb' if connection has been established. +/* Reconnect to OVS DB and call the OVS DB post connection init callback + * if connection has been established. */ -static int ovs_db_reconnect(ovs_db_t *pdb) { - char errbuff[OVS_ERROR_BUFF_SIZE]; - - /* remove all registered OVS DB table/result callbacks */ - ovs_db_callback_remove_all(pdb); - - /* open new socket */ - if ((pdb->conn.sock = socket(pdb->conn.domain, pdb->conn.type, 0)) < 0) { - sstrerror(errno, errbuff, sizeof(errbuff)); - OVS_ERROR("socket(): %s", errbuff); - return (-1); +static void ovs_db_reconnect(ovs_db_t *pdb) { + const char *node_info = pdb->node; + struct addrinfo *result; + + if (pdb->unix_path[0] != '\0') { + /* use UNIX socket instead of INET address */ + node_info = pdb->unix_path; + result = calloc(1, sizeof(struct addrinfo)); + struct sockaddr_un *sa_unix = calloc(1, sizeof(struct sockaddr_un)); + if (result == NULL || sa_unix == NULL) { + sfree(result); + sfree(sa_unix); + return; + } + result->ai_family = AF_UNIX; + result->ai_socktype = SOCK_STREAM; + result->ai_addrlen = sizeof(*sa_unix); + result->ai_addr = (struct sockaddr *)sa_unix; + sa_unix->sun_family = result->ai_family; + sstrncpy(sa_unix->sun_path, pdb->unix_path, sizeof(sa_unix->sun_path)); + } else { + /* inet socket address */ + struct addrinfo hints; + + /* setup criteria for selecting the socket address */ + memset(&hints, 0, sizeof(hints)); + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + + /* get socket addresses */ + int ret = getaddrinfo(pdb->node, pdb->service, &hints, &result); + if (ret != 0) { + OVS_ERROR("getaddrinfo(): %s", gai_strerror(ret)); + return; + } } - - /* try to connect to server */ - if (connect(pdb->conn.sock, (struct sockaddr *)&pdb->conn.addr, - pdb->conn.addr_size) < 0) { - sstrerror(errno, errbuff, sizeof(errbuff)); - OVS_ERROR("connect(): %s", errbuff); - close(pdb->conn.sock); - return (-1); + /* try to connect to the server */ + for (struct addrinfo *rp = result; rp != NULL; rp = rp->ai_next) { + int sock = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol); + if (sock < 0) { + OVS_DEBUG("socket(): %s", STRERRNO); + continue; + } + if (connect(sock, rp->ai_addr, rp->ai_addrlen) < 0) { + close(sock); + OVS_DEBUG("connect(): %s [family=%d]", STRERRNO, rp->ai_family); + } else { + /* send notification to event thread */ + ovs_db_event_post(pdb, OVS_DB_EVENT_CONN_ESTABLISHED); + pdb->sock = sock; + break; + } } - /* send notification to event thread */ - ovs_db_event_post(pdb, OVS_DB_EVENT_CONN_ESTABLISHED); - return (0); + if (pdb->sock < 0) + OVS_ERROR("connect to \"%s\" failed", node_info); + + freeaddrinfo(result); } /* POLL worker thread. @@ -795,89 +781,88 @@ static int ovs_db_reconnect(ovs_db_t *pdb) { static void *ovs_poll_worker(void *arg) { ovs_db_t *pdb = (ovs_db_t *)arg; /* pointer to OVS DB */ ovs_json_reader_t *jreader = NULL; - const char *json; - size_t json_len; - ssize_t nbytes = 0; - char buff[OVS_DB_POLL_READ_BLOCK_SIZE]; - struct pollfd poll_fd; - int poll_ret = 0; + struct pollfd poll_fd = { + .fd = pdb->sock, .events = POLLIN | POLLPRI, .revents = 0, + }; + /* create JSON reader instance */ if ((jreader = ovs_json_reader_alloc()) == NULL) { OVS_ERROR("initialize json reader failed"); - goto thread_exit; + return NULL; } - /* start polling data */ - poll_fd.fd = pdb->conn.sock; - poll_fd.events = POLLIN | POLLPRI; - poll_fd.revents = 0; - /* poll data */ while (ovs_db_poll_is_running(pdb)) { - poll_ret = poll(&poll_fd, 1, /* ms */ OVS_DB_POLL_TIMEOUT * 1000); - if (poll_ret > 0) { - if (poll_fd.revents & POLLNVAL) { - /* invalid file descriptor, reconnect */ - if (ovs_db_reconnect(pdb) != 0) { - /* sleep awhile until next reconnect */ - usleep(OVS_DB_RECONNECT_TIMEOUT * 1000000); - } - ovs_json_reader_reset(jreader); - poll_fd.fd = pdb->conn.sock; - } else if ((poll_fd.revents & POLLERR) || (poll_fd.revents & POLLHUP)) { - /* connection is broken */ - OVS_ERROR("poll() peer closed its end of the channel"); - ovs_db_event_post(pdb, OVS_DB_EVENT_CONN_TERMINATED); - close(poll_fd.fd); - } else if ((poll_fd.revents & POLLIN) || (poll_fd.revents & POLLPRI)) { - /* read incoming data */ - nbytes = recv(poll_fd.fd, buff, OVS_DB_POLL_READ_BLOCK_SIZE, 0); - if (nbytes > 0) { - OVS_DEBUG("recv(): received %d bytes of data", (int)nbytes); - ovs_json_reader_push_data(jreader, buff, nbytes); - while (!ovs_json_reader_pop(jreader, &json, &json_len)) - /* process JSON data */ - ovs_db_json_data_process(pdb, json, json_len); - } else if (nbytes == 0) { - OVS_ERROR("recv() peer has performed an orderly shutdown"); - ovs_db_event_post(pdb, OVS_DB_EVENT_CONN_TERMINATED); - close(poll_fd.fd); - } else { - OVS_ERROR("recv() receive data error"); - break; - } - } /* poll() POLLIN & POLLPRI */ - } else if (poll_ret == 0) - OVS_DEBUG("poll() timeout"); - else { - OVS_ERROR("poll() error"); + poll_fd.fd = pdb->sock; + int poll_ret = poll(&poll_fd, 1, /* ms */ OVS_DB_POLL_TIMEOUT * 1000); + if (poll_ret < 0) { + OVS_ERROR("poll(): %s", STRERRNO); break; + } else if (poll_ret == 0) { + OVS_DEBUG("poll(): timeout"); + if (pdb->sock < 0) + /* invalid fd, so try to reconnect */ + ovs_db_reconnect(pdb); + continue; + } + if (poll_fd.revents & POLLNVAL) { + /* invalid file descriptor, clean-up */ + ovs_db_callback_remove_all(pdb); + ovs_json_reader_reset(jreader); + /* setting poll FD to -1 tells poll() call to ignore this FD. + * In that case poll() call will return timeout all the time */ + pdb->sock = (-1); + } else if ((poll_fd.revents & POLLERR) || (poll_fd.revents & POLLHUP)) { + /* connection is broken */ + close(poll_fd.fd); + ovs_db_event_post(pdb, OVS_DB_EVENT_CONN_TERMINATED); + OVS_ERROR("poll() peer closed its end of the channel"); + } else if ((poll_fd.revents & POLLIN) || (poll_fd.revents & POLLPRI)) { + /* read incoming data */ + char buff[OVS_DB_POLL_READ_BLOCK_SIZE]; + ssize_t nbytes = recv(poll_fd.fd, buff, sizeof(buff), 0); + if (nbytes < 0) { + OVS_ERROR("recv(): %s", STRERRNO); + /* read error? Try to reconnect */ + close(poll_fd.fd); + continue; + } else if (nbytes == 0) { + close(poll_fd.fd); + ovs_db_event_post(pdb, OVS_DB_EVENT_CONN_TERMINATED); + OVS_ERROR("recv() peer has performed an orderly shutdown"); + continue; + } + /* read incoming data */ + size_t json_len = 0; + const char *json = NULL; + OVS_DEBUG("recv(): received %zd bytes of data", nbytes); + ovs_json_reader_push_data(jreader, buff, nbytes); + while (!ovs_json_reader_pop(jreader, &json, &json_len)) + /* process JSON data */ + ovs_db_json_data_process(pdb, json, json_len); } } -thread_exit: OVS_DEBUG("poll thread has been completed"); ovs_json_reader_free(jreader); - pthread_exit((void *)0); - return ((void *)0); + return NULL; } /* EVENT worker thread. * Perform task based on incoming events. This * task can be done asynchronously which allows to - * handle OVD DB callback like 'init_cb'. + * handle OVS DB callback like 'init_cb'. */ static void *ovs_event_worker(void *arg) { - int ret = 0; ovs_db_t *pdb = (ovs_db_t *)arg; - struct timespec ts; while (pdb->event_thread.value != OVS_DB_EVENT_TERMINATE) { /* wait for an event */ + struct timespec ts; clock_gettime(CLOCK_REALTIME, &ts); ts.tv_sec += (OVS_DB_EVENT_TIMEOUT); - ret = pthread_cond_timedwait(&pdb->event_thread.cond, - &pdb->event_thread.mutex, &ts); + int ret = pthread_cond_timedwait(&pdb->event_thread.cond, + &pdb->event_thread.mutex, &ts); if (!ret) { /* handle the event */ OVS_DEBUG("handle event %d", pdb->event_thread.value); @@ -905,96 +890,160 @@ static void *ovs_event_worker(void *arg) { } } -thread_exit: OVS_DEBUG("event thread has been completed"); - pthread_exit((void *)0); - return ((void *)0); + return NULL; +} + +/* Initialize EVENT thread */ +static int ovs_db_event_thread_init(ovs_db_t *pdb) { + pdb->event_thread.tid = (pthread_t)-1; + /* init event thread condition variable */ + if (pthread_cond_init(&pdb->event_thread.cond, NULL)) { + return -1; + } + /* init event thread mutex */ + if (pthread_mutex_init(&pdb->event_thread.mutex, NULL)) { + pthread_cond_destroy(&pdb->event_thread.cond); + return -1; + } + /* Hold the event thread mutex. It ensures that no events + * will be lost while thread is still starting. Once event + * thread is started and ready to accept events, it will release + * the mutex */ + if (pthread_mutex_lock(&pdb->event_thread.mutex)) { + pthread_mutex_destroy(&pdb->event_thread.mutex); + pthread_cond_destroy(&pdb->event_thread.cond); + return -1; + } + /* start event thread */ + pthread_t tid; + if (plugin_thread_create(&tid, NULL, ovs_event_worker, pdb, + "utils_ovs:event") != 0) { + pthread_mutex_unlock(&pdb->event_thread.mutex); + pthread_mutex_destroy(&pdb->event_thread.mutex); + pthread_cond_destroy(&pdb->event_thread.cond); + return -1; + } + pdb->event_thread.tid = tid; + return 0; } -/* Stop EVENT thread */ -static int ovs_db_event_thread_stop(ovs_db_t *pdb) { +/* Destroy EVENT thread */ +static int ovs_db_event_thread_destroy(ovs_db_t *pdb) { + if (pdb->event_thread.tid == (pthread_t)-1) + /* already destroyed */ + return 0; ovs_db_event_post(pdb, OVS_DB_EVENT_TERMINATE); if (pthread_join(pdb->event_thread.tid, NULL) != 0) - return (-1); + return -1; + /* Event thread always holds the thread mutex when + * performs some task (handles event) and releases it when + * while sleeping. Thus, if event thread exits, the mutex + * remains locked */ pthread_mutex_unlock(&pdb->event_thread.mutex); pthread_mutex_destroy(&pdb->event_thread.mutex); - return (0); + pthread_cond_destroy(&pdb->event_thread.cond); + pdb->event_thread.tid = (pthread_t)-1; + return 0; +} + +/* Initialize POLL thread */ +static int ovs_db_poll_thread_init(ovs_db_t *pdb) { + pdb->poll_thread.tid = (pthread_t)-1; + /* init event thread mutex */ + if (pthread_mutex_init(&pdb->poll_thread.mutex, NULL)) { + return -1; + } + /* start poll thread */ + pthread_t tid; + pdb->poll_thread.state = OVS_DB_POLL_STATE_RUNNING; + if (plugin_thread_create(&tid, NULL, ovs_poll_worker, pdb, + "utils_ovs:poll") != 0) { + pthread_mutex_destroy(&pdb->poll_thread.mutex); + return -1; + } + pdb->poll_thread.tid = tid; + return 0; } -/* Stop POLL thread */ -static int ovs_db_poll_thread_stop(ovs_db_t *pdb) { - ovs_db_poll_terminate(pdb); +/* Destroy POLL thread */ +static int ovs_db_poll_thread_destroy(ovs_db_t *pdb) { + if (pdb->poll_thread.tid == (pthread_t)-1) + /* already destroyed */ + return 0; + /* change thread state */ + pthread_mutex_lock(&pdb->poll_thread.mutex); + pdb->poll_thread.state = OVS_DB_POLL_STATE_EXITING; + pthread_mutex_unlock(&pdb->poll_thread.mutex); + /* join the thread */ if (pthread_join(pdb->poll_thread.tid, NULL) != 0) - return (-1); + return -1; pthread_mutex_destroy(&pdb->poll_thread.mutex); - return (0); + pdb->poll_thread.tid = (pthread_t)-1; + return 0; } /* * Public OVS DB API implementation */ -ovs_db_t *ovs_db_init(const char *surl, ovs_db_callback_t *cb) { - pthread_mutexattr_t mutex_attr; - ovs_db_t *pdb = NULL; +ovs_db_t *ovs_db_init(const char *node, const char *service, + const char *unix_path, ovs_db_callback_t *cb) { + /* sanity check */ + if (node == NULL || service == NULL || unix_path == NULL) + return NULL; /* allocate db data & fill it */ - if ((pdb = calloc(1, sizeof(*pdb))) == NULL) - return (NULL); + ovs_db_t *pdb = pdb = calloc(1, sizeof(*pdb)); + if (pdb == NULL) + return NULL; - /* convert string url to socket addr */ - if (ovs_db_url_parse(surl, &pdb->conn) < 0) - goto failure; + /* store the OVS DB address */ + sstrncpy(pdb->node, node, sizeof(pdb->node)); + sstrncpy(pdb->service, service, sizeof(pdb->service)); + sstrncpy(pdb->unix_path, unix_path, sizeof(pdb->unix_path)); /* setup OVS DB callbacks */ if (cb) pdb->cb = *cb; - /* prepare event thread */ - pthread_cond_init(&pdb->event_thread.cond, NULL); - pthread_mutex_init(&pdb->event_thread.mutex, NULL); - pthread_mutex_lock(&pdb->event_thread.mutex); - if (plugin_thread_create(&pdb->event_thread.tid, NULL, ovs_event_worker, - pdb) != 0) { - OVS_ERROR("event worker start failed"); - goto failure; + /* init OVS DB mutex attributes */ + pthread_mutexattr_t mutex_attr; + if (pthread_mutexattr_init(&mutex_attr)) { + OVS_ERROR("OVS DB mutex attribute init failed"); + sfree(pdb); + return NULL; } - - /* prepare polling thread */ - ovs_db_reconnect(pdb); - pdb->poll_thread.state = OVS_DB_POLL_STATE_RUNNING; - pthread_mutex_init(&pdb->poll_thread.mutex, NULL); - if (plugin_thread_create(&pdb->poll_thread.tid, NULL, ovs_poll_worker, pdb) != - 0) { - OVS_ERROR("pull worker start failed"); - goto failure; + /* set OVS DB mutex as recursive */ + if (pthread_mutexattr_settype(&mutex_attr, PTHREAD_MUTEX_RECURSIVE)) { + OVS_ERROR("Failed to set OVS DB mutex as recursive"); + pthread_mutexattr_destroy(&mutex_attr); + sfree(pdb); + return NULL; } - /* init OVS DB mutex */ - if (pthread_mutexattr_init(&mutex_attr) || - pthread_mutexattr_settype(&mutex_attr, PTHREAD_MUTEX_RECURSIVE) || - pthread_mutex_init(&pdb->mutex, &mutex_attr)) { + if (pthread_mutex_init(&pdb->mutex, &mutex_attr)) { OVS_ERROR("OVS DB mutex init failed"); - goto failure; + pthread_mutexattr_destroy(&mutex_attr); + sfree(pdb); + return NULL; } + /* destroy mutex attributes */ + pthread_mutexattr_destroy(&mutex_attr); - /* return db to the caller */ - return pdb; + /* init event thread */ + if (ovs_db_event_thread_init(pdb) < 0) { + ovs_db_destroy(pdb); + return NULL; + } -failure: - if (pdb->conn.sock) - /* close connection */ - close(pdb->conn.sock); - if (pdb->event_thread.tid != 0) - /* stop event thread */ - if (ovs_db_event_thread_stop(pdb) < 0) - OVS_ERROR("stop event thread failed"); - if (pdb->poll_thread.tid != 0) - /* stop poll thread */ - if (ovs_db_poll_thread_stop(pdb) < 0) - OVS_ERROR("stop poll thread failed"); - sfree(pdb); - return NULL; + /* init polling thread */ + pdb->sock = -1; + if (ovs_db_poll_thread_init(pdb) < 0) { + ovs_db_destroy(pdb); + return NULL; + } + return pdb; } int ovs_db_send_request(ovs_db_t *pdb, const char *method, const char *params, @@ -1012,16 +1061,16 @@ int ovs_db_send_request(ovs_db_t *pdb, const char *method, const char *params, /* sanity check */ if (!pdb || !method || !params) - return (-1); + return -1; if ((jgen = yajl_gen_alloc(NULL)) == NULL) - return (-1); + return -1; /* try to parse params */ if ((jparams = yajl_tree_parse(params, NULL, 0)) == NULL) { OVS_ERROR("params is not a JSON string"); yajl_gen_clear(jgen); - return (-1); + return -1; } /* generate method field */ @@ -1038,14 +1087,14 @@ int ovs_db_send_request(ovs_db_t *pdb, const char *method, const char *params, /* generate id field */ OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, "id"); uid = ovs_uid_generate(); - ssnprintf(uid_buff, sizeof(uid_buff), "%" PRIX64, uid); + snprintf(uid_buff, sizeof(uid_buff), "%" PRIX64, uid); OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, uid_buff); OVS_YAJL_CALL(yajl_gen_map_close, jgen); if (cb) { /* register result callback */ - if ((new_cb = malloc(sizeof(ovs_callback_t))) == NULL) + if ((new_cb = calloc(1, sizeof(ovs_callback_t))) == NULL) goto yajl_gen_failure; /* add new callback to front */ @@ -1100,14 +1149,17 @@ int ovs_db_table_cb_register(ovs_db_t *pdb, const char *tb_name, /* sanity check */ if (pdb == NULL || tb_name == NULL || update_cb == NULL) - return (-1); + return -1; - if ((jgen = yajl_gen_alloc(NULL)) == NULL) - return (-1); + /* allocate new update callback */ + if ((new_cb = calloc(1, sizeof(ovs_callback_t))) == NULL) + return -1; - /* register table update callback */ - if ((new_cb = malloc(sizeof(ovs_callback_t))) == NULL) - return (-1); + /* init YAJL generator */ + if ((jgen = yajl_gen_alloc(NULL)) == NULL) { + sfree(new_cb); + return -1; + } /* add new callback to front */ new_cb->table.call = update_cb; @@ -1121,7 +1173,7 @@ int ovs_db_table_cb_register(ovs_db_t *pdb, const char *tb_name, OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, OVS_DB_DEFAULT_DB_NAME); /* uid string */ - ssnprintf(uid_str, sizeof(uid_str), "%" PRIX64, new_cb->uid); + snprintf(uid_str, sizeof(uid_str), "%" PRIX64, new_cb->uid); OVS_YAJL_CALL(ovs_yajl_gen_tstring, jgen, uid_str); /* */ @@ -1190,22 +1242,22 @@ int ovs_db_destroy(ovs_db_t *pdb) { /* sanity check */ if (pdb == NULL) - return (-1); + return -1; /* try to lock the structure before releasing */ - if (ret = pthread_mutex_lock(&pdb->mutex)) { - OVS_ERROR("pthread_mutex_lock() DB mutext lock failed (%d)", ret); - return (-1); + if ((ret = pthread_mutex_lock(&pdb->mutex))) { + OVS_ERROR("pthread_mutex_lock() DB mutex lock failed (%d)", ret); + return -1; } /* stop poll thread */ - if (ovs_db_event_thread_stop(pdb) < 0) { - OVS_ERROR("stop poll thread failed"); + if (ovs_db_event_thread_destroy(pdb) < 0) { + OVS_ERROR("destroy poll thread failed"); ovs_db_ret = (-1); } /* stop event thread */ - if (ovs_db_poll_thread_stop(pdb) < 0) { + if (ovs_db_poll_thread_destroy(pdb) < 0) { OVS_ERROR("stop event thread failed"); ovs_db_ret = (-1); } @@ -1214,8 +1266,8 @@ int ovs_db_destroy(ovs_db_t *pdb) { ovs_db_callback_remove_all(pdb); /* close connection */ - if (pdb->conn.sock) - close(pdb->conn.sock); + if (pdb->sock >= 0) + close(pdb->sock); /* release DB handler */ pthread_mutex_unlock(&pdb->mutex); @@ -1228,7 +1280,14 @@ int ovs_db_destroy(ovs_db_t *pdb) { * Public OVS utils API implementation */ -/* Get YAJL value by key from YAJL dictionary */ +/* Get YAJL value by key from YAJL dictionary + * + * EXAMPLE: + * { + * "key_a" : + * "key_b" : + * } + */ yajl_val ovs_utils_get_value_by_key(yajl_val jval, const char *key) { const char *obj_key = NULL; @@ -1237,7 +1296,7 @@ yajl_val ovs_utils_get_value_by_key(yajl_val jval, const char *key) { return NULL; /* find a value by key */ - for (int i = 0; i < YAJL_GET_OBJECT(jval)->len; i++) { + for (size_t i = 0; i < YAJL_GET_OBJECT(jval)->len; i++) { obj_key = YAJL_GET_OBJECT(jval)->keys[i]; if (strcmp(obj_key, key) == 0) return YAJL_GET_OBJECT(jval)->values[i]; @@ -1246,7 +1305,29 @@ yajl_val ovs_utils_get_value_by_key(yajl_val jval, const char *key) { return NULL; } -/* Get OVS DB map value by given map key */ +/* Get OVS DB map value by given map key + * + * FROM RFC7047: + * + * + * A 2-element JSON array that represents a pair within a database + * map. The first element is an that represents the key, and + * the second element is an that represents the value. + * + * + * A 2-element JSON array that represents a database map value. The + * first element of the array must be the string "map", and the + * second element must be an array of zero or more s giving the + * values in the map. All of the s must have the same key and + * value types. + * + * EXAMPLE: + * [ + * "map", [ + * [ "key_a", ], [ "key_b", ], ... + * ] + * ] + */ yajl_val ovs_utils_get_map_value(yajl_val jval, const char *key) { size_t map_len = 0; size_t array_len = 0; @@ -1273,7 +1354,7 @@ yajl_val ovs_utils_get_map_value(yajl_val jval, const char *key) { /* try to find map value by map key */ map_len = YAJL_GET_ARRAY(array_values[1])->len; map_values = YAJL_GET_ARRAY(array_values[1])->values; - for (int i = 0; i < map_len; i++) { + for (size_t i = 0; i < map_len; i++) { /* check YAJL array */ if (!YAJL_IS_ARRAY(map_values[i])) break;