OVS link plugin: Add connection terminate callback
authorMytnyk, VolodymyrX <volodymyrx.mytnyk@intel.com>
Tue, 20 Sep 2016 12:36:08 +0000 (13:36 +0100)
committerMytnyk, VolodymyrX <volodymyrx.mytnyk@intel.com>
Mon, 26 Dec 2016 13:26:05 +0000 (13:26 +0000)
Extend OVS utils to notify OVS plugin about OVS DB
connection lost. If the connection is lost, OVS link plugin
will dispatch notification and print error message. OVS plugin
treats status of all interfaces as UNKNOWN.

Change-Id: I22fe3cb0740e0f4779a5c4f6b92e78f1ad9777a3
Signed-off-by: Mytnyk, VolodymyrX <volodymyrx.mytnyk@intel.com>
src/collectd.conf.pod
src/ovs_link.c
src/utils_ovs.c
src/utils_ovs.h

index 3c8af40..3d69dcf 100644 (file)
@@ -5505,8 +5505,8 @@ Default: empty (all interfaces on all bridges are monitored)
 
 =item B<SendNotification> I<true|false>
 
-If set to true, OVS interface link status notification is sent to collectd.
-Default value is false.
+If set to true, OVS link notifications (interface status and OVS DB connection
+terminate) are sent to collectd. Default value is false.
 
 =back
 
index e298ae9..1faa8ea 100644 (file)
@@ -58,6 +58,7 @@ typedef struct ovs_link_config_s ovs_link_config_t;
 /* OVS link context type */
 struct ovs_link_ctx_s {
   pthread_mutex_t mutex;        /* mutex to lock the context */
+  pthread_mutexattr_t mutex_attr;       /* context mutex attribute */
   ovs_db_t *ovs_db;             /* pointer to OVS DB instance */
   ovs_link_config_t config;     /* plugin config */
   ovs_link_interface_info_t *ifaces;    /* interface info */
@@ -245,7 +246,7 @@ ovs_link_link_status_submit(const char *link_name,
   values[0].gauge = (gauge_t) link_status;
   vl.time = cdtime();
   vl.values = values;
-  vl.values_len = sizeof(values) / sizeof(values[0]);
+  vl.values_len = STATIC_ARRAY_SIZE(values);
   sstrncpy(vl.host, hostname_g, sizeof(vl.host));
   sstrncpy(vl.plugin, OVS_LINK_PLUGIN, sizeof(vl.plugin));
   sstrncpy(vl.plugin_instance, link_name, sizeof(vl.plugin_instance));
@@ -254,6 +255,17 @@ ovs_link_link_status_submit(const char *link_name,
   plugin_dispatch_values(&vl);
 }
 
+/* Dispatch OVS DB terminate connection event to collectd */
+static void
+ovs_link_dispatch_terminate_notification(const char *msg)
+{
+  notification_t n = {NOTIF_FAILURE, cdtime(), "", "", OVS_LINK_PLUGIN,
+                      "", "", "", NULL};
+  ssnprintf(n.message, sizeof(n.message), msg);
+  sstrncpy(n.host, hostname_g, sizeof(n.host));
+  plugin_dispatch_notification(&n);
+}
+
 /* Process OVS DB update table event. It handles link status update event(s)
  * and dispatches the value(s) to collectd if interface name matches one of
  * interfaces specified in configuration file.
@@ -348,10 +360,10 @@ ovs_link_table_result_cb(yajl_val jresult, yajl_val jerror)
 }
 
 /* Setup OVS DB table callback. It subscribes to OVS DB 'Interface' table
- * to receive link status events.
+ * to receive link status event(s).
  */
 static void
-ovs_link_initialize(ovs_db_t *pdb)
+ovs_link_conn_initialize(ovs_db_t *pdb)
 {
   int ret = 0;
   const char tb_name[] = "Interface";
@@ -371,6 +383,22 @@ ovs_link_initialize(ovs_db_t *pdb)
   DEBUG(OVS_LINK_PLUGIN ": OVS DB has been initialized");
 }
 
+/* OVS DB terminate connection notification callback */
+static void
+ovs_link_conn_terminate()
+{
+  const char msg[] = "OVS DB connection has been lost";
+  if (ovs_link_ctx.config.send_notification)
+    ovs_link_dispatch_terminate_notification(msg);
+  WARNIG(OVS_LINK_PLUGIN ": %s", msg);
+  OVS_LINK_CTX_LOCK {
+    /* update link status to UNKNOWN */
+    for (ovs_link_interface_info_t *iface = ovs_link_ctx.ifaces; iface;
+         iface = iface->next)
+      ovs_link_link_status_update(iface->name, UNKNOWN);
+  }
+}
+
 /* Read OVS link status plugin callback */
 static int
 ovs_link_plugin_read(user_data_t *ud)
@@ -390,15 +418,27 @@ static int
 ovs_link_plugin_init(void)
 {
   ovs_db_t *ovs_db = NULL;
-  ovs_db_callback_t cb = {.init_cb = ovs_link_initialize};
+  ovs_db_callback_t cb = {.post_conn_init = ovs_link_conn_initialize,
+                          .post_conn_terminate = ovs_link_conn_terminate};
+
+  /* Initialize the context mutex */
+  if (pthread_mutexattr_init(&ovs_link_ctx.mutex_attr) != 0) {
+    ERROR(OVS_LINK_PLUGIN ": init context mutex attribute failed");
+    return (-1);
+  }
+  pthread_mutexattr_settype(&ovs_link_ctx.mutex_attr,
+                            PTHREAD_MUTEX_RECURSIVE);
+  if (pthread_mutex_init(&ovs_link_ctx.mutex, &ovs_link_ctx.mutex_attr) != 0) {
+    ERROR(OVS_LINK_PLUGIN ": init context mutex failed");
+    goto ovs_link_failure;
+  }
 
   /* set default OVS DB url */
   if (ovs_link_ctx.config.ovs_db_server_url == NULL)
     if ((ovs_link_ctx.config.ovs_db_server_url =
          strdup(OVS_LINK_DEFAULT_OVS_DB_SERVER_URL)) == NULL) {
       ERROR(OVS_LINK_PLUGIN ": fail to set default OVS DB URL");
-      ovs_link_config_free();
-      return (-1);
+      goto ovs_link_failure;
     }
   DEBUG(OVS_LINK_PLUGIN ": OVS DB url = %s",
         ovs_link_ctx.config.ovs_db_server_url);
@@ -407,8 +447,7 @@ ovs_link_plugin_init(void)
   ovs_db = ovs_db_init(ovs_link_ctx.config.ovs_db_server_url, &cb);
   if (ovs_db == NULL) {
     ERROR(OVS_LINK_PLUGIN ": fail to connect to OVS DB server");
-    ovs_link_config_free();
-    return (-1);
+    goto ovs_link_failure;
   }
 
   /* store OVS DB handler */
@@ -418,6 +457,15 @@ ovs_link_plugin_init(void)
 
   DEBUG(OVS_LINK_PLUGIN ": plugin has been initialized");
   return (0);
+
+ovs_link_failure:
+  ERROR(OVS_LINK_PLUGIN ": plugin initialize failed");
+  /* release allocated memory */
+  ovs_link_config_free();
+  /* destroy context mutex */
+  pthread_mutexattr_destroy(&ovs_link_ctx.mutex_attr);
+  pthread_mutex_destroy(&ovs_link_ctx.mutex);
+  return (-1);
 }
 
 /* Shutdown OVS plugin */
@@ -431,6 +479,10 @@ ovs_link_plugin_shutdown(void)
   if (ovs_db_destroy(ovs_link_ctx.ovs_db))
     ERROR(OVS_LINK_PLUGIN ": OVSDB object destroy failed");
 
+  /* destroy context mutex */
+  pthread_mutexattr_destroy(&ovs_link_ctx.mutex_attr);
+  pthread_mutex_destroy(&ovs_link_ctx.mutex);
+
   DEBUG(OVS_LINK_PLUGIN ": plugin has been destroyed");
   return (0);
 }
index 4c99eef..87d2ca1 100644 (file)
@@ -86,7 +86,8 @@
 
 #define OVS_DB_EVENT_TIMEOUT         5  /* event thread timeout (sec) */
 #define OVS_DB_EVENT_TERMINATE       1
-#define OVS_DB_EVENT_CONNECTED       2
+#define OVS_DB_EVENT_CONN_ESTABLISHED     2
+#define OVS_DB_EVENT_CONN_TERMINATED      3
 
 #define OVS_DB_POLL_STATE_RUNNING    1
 #define OVS_DB_POLL_STATE_EXITING    2
@@ -172,16 +173,17 @@ struct ovs_db_s {
   ovs_poll_thread_t poll_thread;
   ovs_event_thread_t event_thread;
   pthread_mutex_t mutex;
-  ovs_callback_t *cb;
+  ovs_callback_t *remote_cb;
+  ovs_db_callback_t cb;
   ovs_conn_t conn;
-  ovs_db_init_cb_t init_cb;
 };
 typedef struct ovs_db_s ovs_db_t;
 
 /* Post an event to event thread.
  * Possible events are:
  *  OVS_DB_EVENT_TERMINATE
- *  OVS_DB_EVENT_CONNECTED
+ *  OVS_DB_EVENT_CONN_ESTABLISHED
+ *  OVS_DB_EVENT_CONN_TERMINATED
  */
 static void
 ovs_db_event_post(ovs_db_t *pdb, int event)
@@ -233,11 +235,11 @@ static void
 ovs_db_callback_add(ovs_db_t *pdb, ovs_callback_t *new_cb)
 {
   pthread_mutex_lock(&pdb->mutex);
-  if (pdb->cb)
-    pdb->cb->prev = new_cb;
-  new_cb->next = pdb->cb;
+  if (pdb->remote_cb)
+    pdb->remote_cb->prev = new_cb;
+  new_cb->next = pdb->remote_cb;
   new_cb->prev = NULL;
-  pdb->cb = new_cb;
+  pdb->remote_cb = new_cb;
   pthread_mutex_unlock(&pdb->mutex);
 }
 
@@ -255,7 +257,7 @@ ovs_db_callback_remove(ovs_db_t *pdb, ovs_callback_t *del_cb)
   if (pre_cb)
     pre_cb->next = del_cb->next;
   else
-    pdb->cb = del_cb->next;
+    pdb->remote_cb = del_cb->next;
 
   free(del_cb);
   pthread_mutex_unlock(&pdb->mutex);
@@ -266,11 +268,12 @@ static void
 ovs_db_callback_remove_all(ovs_db_t *pdb)
 {
   pthread_mutex_lock(&pdb->mutex);
-  for (ovs_callback_t *del_cb = pdb->cb; pdb->cb; del_cb = pdb->cb) {
-    pdb->cb = pdb->cb->next;
+  for (ovs_callback_t *del_cb = pdb->remote_cb; pdb->remote_cb;
+       del_cb = pdb->remote_cb) {
+    pdb->remote_cb = pdb->remote_cb->next;
     free(del_cb);
   }
-  pdb->cb = NULL;
+  pdb->remote_cb = NULL;
   pthread_mutex_unlock(&pdb->mutex);
 }
 
@@ -280,7 +283,7 @@ static ovs_callback_t *
 ovs_db_callback_get(ovs_db_t *pdb, uint64_t uid)
 {
   pthread_mutex_lock(&pdb->mutex);
-  for (ovs_callback_t *cb = pdb->cb; cb != NULL; cb = cb->next)
+  for (ovs_callback_t *cb = pdb->remote_cb; cb != NULL; cb = cb->next)
     if (cb->uid == uid) {
       pthread_mutex_unlock(&pdb->mutex);
       return cb;
@@ -814,7 +817,7 @@ ovs_db_reconnect(ovs_db_t *pdb)
   }
 
   /* send notification to event thread */
-  ovs_db_event_post(pdb, OVS_DB_EVENT_CONNECTED);
+  ovs_db_event_post(pdb, OVS_DB_EVENT_CONN_ESTABLISHED);
   return (0);
 }
 
@@ -860,6 +863,7 @@ ovs_poll_worker(void *arg)
       } else if ((poll_fd.revents & POLLERR) || (poll_fd.revents & POLLHUP)) {
         /* connection is broken */
         OVS_ERROR("poll() peer closed its end of the channel");
+        ovs_db_event_post(pdb, OVS_DB_EVENT_CONN_TERMINATED);
         close(poll_fd.fd);
       } else if ((poll_fd.revents & POLLIN) || (poll_fd.revents & POLLPRI)) {
         /* read incoming data */
@@ -872,6 +876,7 @@ ovs_poll_worker(void *arg)
             ovs_db_json_data_process(pdb, json, json_len);
         } else if (nbytes == 0) {
           OVS_ERROR("recv() peer has performed an orderly shutdown");
+          ovs_db_event_post(pdb, OVS_DB_EVENT_CONN_TERMINATED);
           close(poll_fd.fd);
         } else {
           OVS_ERROR("recv() receive data error");
@@ -914,9 +919,19 @@ ovs_event_worker(void *arg)
     if (!ret) {
       /* handle the event */
       OVS_DEBUG("handle event %d", pdb->event_thread.value);
-      if (pdb->event_thread.value == OVS_DB_EVENT_CONNECTED)
-        if (pdb->init_cb)
-          pdb->init_cb(pdb);
+      switch (pdb->event_thread.value) {
+      case OVS_DB_EVENT_CONN_ESTABLISHED:
+        if (pdb->cb.post_conn_init)
+          pdb->cb.post_conn_init(pdb);
+        break;
+      case OVS_DB_EVENT_CONN_TERMINATED:
+        if (pdb->cb.post_conn_terminate)
+          pdb->cb.post_conn_terminate();
+        break;
+      default:
+        OVS_DEBUG("unknown event received");
+        break;
+      }
     } else if (ret == ETIMEDOUT) {
       /* wait timeout */
       OVS_DEBUG("no event received (timeout)");
@@ -977,7 +992,7 @@ ovs_db_init(const char *surl, ovs_db_callback_t *cb)
 
   /* setup OVS DB callbacks */
   if (cb)
-    pdb->init_cb = cb->init_cb;
+    pdb->cb = *cb;
 
   /* prepare event thread */
   pthread_cond_init(&pdb->event_thread.cond, NULL);
index 484260f..66868cd 100644 (file)
 typedef struct ovs_db_s ovs_db_t;
 
 /* OVS DB callback type declaration */
-typedef void (*ovs_db_init_cb_t) (ovs_db_t *pdb);
 typedef void (*ovs_db_table_cb_t) (yajl_val jupdates);
 typedef void (*ovs_db_result_cb_t) (yajl_val jresult, yajl_val jerror);
 
 /* OVS DB structures */
 struct ovs_db_callback_s {
-  ovs_db_init_cb_t init_cb;
+  /*
+   * This callback is called when OVS DB connection
+   * has been established and ready to use. Client
+   * can use this callback to configure OVS DB, e.g.
+   * to subscribe to table update notification or poll
+   * some OVS DB data. This field can be NULL.
+   */
+  void (*post_conn_init) (ovs_db_t *pdb);
+  /*
+   * This callback is called when OVD DB connection
+   * has been lost. This field can be NULL.
+   */
+  void (*post_conn_terminate) (void);
 };
 typedef struct ovs_db_callback_s ovs_db_callback_t;