ONOS-7188:Fix TCP Connection can not disconnect problem

Change-Id: Ibcdec4193b8facde3039eb1a28b55b1aeb89becf
diff --git a/protocols/ovsdb/ctl/src/main/java/org/onosproject/ovsdb/controller/impl/OvsdbControllerImpl.java b/protocols/ovsdb/ctl/src/main/java/org/onosproject/ovsdb/controller/impl/OvsdbControllerImpl.java
index 1b7ca82..0349f84 100644
--- a/protocols/ovsdb/ctl/src/main/java/org/onosproject/ovsdb/controller/impl/OvsdbControllerImpl.java
+++ b/protocols/ovsdb/ctl/src/main/java/org/onosproject/ovsdb/controller/impl/OvsdbControllerImpl.java
@@ -67,6 +67,8 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.function.Consumer;
 
 import static com.google.common.base.Preconditions.checkNotNull;
@@ -80,23 +82,18 @@
 
     public static final Logger log = LoggerFactory
             .getLogger(OvsdbControllerImpl.class);
-
+    private static final long DEFAULT_OVSDB_RPC_TIMEOUT = 3000;
+    private final Controller controller = new Controller();
     protected ConcurrentHashMap<OvsdbNodeId, OvsdbClientService> ovsdbClients =
             new ConcurrentHashMap<OvsdbNodeId, OvsdbClientService>();
-
     protected OvsdbAgent agent = new InternalOvsdbNodeAgent();
     protected InternalMonitorCallBack updateCallback = new InternalMonitorCallBack();
-
     protected Set<OvsdbNodeListener> ovsdbNodeListener = new CopyOnWriteArraySet<>();
     protected Set<OvsdbEventListener> ovsdbEventListener = new CopyOnWriteArraySet<>();
-
     protected ConcurrentHashMap<String, OvsdbClientService> requestNotification =
             new ConcurrentHashMap<String, OvsdbClientService>();
-
     protected ConcurrentHashMap<String, String> requestDbName = new ConcurrentHashMap<String, String>();
 
-    private final Controller controller = new Controller();
-
     @Activate
     public void activate(ComponentContext context) {
         controller.start(agent, updateCallback);
@@ -154,64 +151,6 @@
     }
 
     /**
-     * Implementation of an Ovsdb Agent which is responsible for keeping track
-     * of connected node and the state in which they are.
-     */
-    private class InternalOvsdbNodeAgent implements OvsdbAgent {
-        @Override
-        public void addConnectedNode(OvsdbNodeId nodeId,
-                                     OvsdbClientService ovsdbClient) {
-
-            if (ovsdbClients.get(nodeId) != null) {
-                return;
-            } else {
-                ovsdbClients.put(nodeId, ovsdbClient);
-
-                try {
-                    List<String> dbNames = ovsdbClient.listDbs().get();
-                    for (String dbName : dbNames) {
-                        DatabaseSchema dbSchema;
-                        dbSchema = ovsdbClient.getOvsdbSchema(dbName).get();
-
-                        log.debug("Begin to monitor tables");
-                        String id = java.util.UUID.randomUUID().toString();
-                        TableUpdates updates = ovsdbClient
-                                .monitorTables(dbName, id).get();
-
-                        requestDbName.put(id, dbName);
-                        requestNotification.put(id, ovsdbClient);
-
-                        if (updates != null) {
-                            processTableUpdates(ovsdbClient, updates,
-                                                dbSchema.name());
-                        }
-                    }
-                } catch (InterruptedException e) {
-                    log.warn("Interrupted while waiting to get message from ovsdb");
-                    Thread.currentThread().interrupt();
-                } catch (ExecutionException e) {
-                    log.error("Exception thrown while to get message from ovsdb");
-                }
-
-                log.debug("Add node to north");
-                for (OvsdbNodeListener l : ovsdbNodeListener) {
-                    l.nodeAdded(nodeId);
-                }
-                return;
-            }
-        }
-
-        @Override
-        public void removeConnectedNode(OvsdbNodeId nodeId) {
-            ovsdbClients.remove(nodeId);
-            log.debug("Node connection is removed");
-            for (OvsdbNodeListener l : ovsdbNodeListener) {
-                l.nodeRemoved(nodeId);
-            }
-        }
-    }
-
-    /**
      * Processes table updates.
      *
      * @param clientService OvsdbClientService instance
@@ -387,6 +326,73 @@
     }
 
     /**
+     * Implementation of an Ovsdb Agent which is responsible for keeping track
+     * of connected node and the state in which they are.
+     */
+    private class InternalOvsdbNodeAgent implements OvsdbAgent {
+        @Override
+        public void addConnectedNode(OvsdbNodeId nodeId,
+                                     OvsdbClientService ovsdbClient) {
+
+            if (ovsdbClients.get(nodeId) != null) {
+                ovsdbClient.disconnect();
+                return;
+            } else {
+
+                try {
+                    List<String> dbNames = ovsdbClient.listDbs().get(DEFAULT_OVSDB_RPC_TIMEOUT, TimeUnit.MILLISECONDS);
+                    for (String dbName : dbNames) {
+                        DatabaseSchema dbSchema;
+                        dbSchema = ovsdbClient.getOvsdbSchema(dbName)
+                                .get(DEFAULT_OVSDB_RPC_TIMEOUT, TimeUnit.MILLISECONDS);
+
+                        log.debug("Begin to monitor tables");
+                        String id = java.util.UUID.randomUUID().toString();
+                        TableUpdates updates = ovsdbClient
+                                .monitorTables(dbName, id).get(DEFAULT_OVSDB_RPC_TIMEOUT, TimeUnit.MILLISECONDS);
+
+                        requestDbName.put(id, dbName);
+                        requestNotification.put(id, ovsdbClient);
+
+                        if (updates != null) {
+                            processTableUpdates(ovsdbClient, updates,
+                                                dbSchema.name());
+                        }
+                    }
+                } catch (InterruptedException e) {
+                    log.warn("Interrupted while waiting to get message from ovsdb");
+                    Thread.currentThread().interrupt();
+                    return;
+                } catch (ExecutionException e) {
+                    log.error("Exception thrown while to get message from ovsdb");
+                    ovsdbClient.disconnect();
+                    return;
+                } catch (TimeoutException e) {
+                    log.error("TimeoutException thrown while to get message from ovsdb");
+                    ovsdbClient.disconnect();
+                    return;
+                }
+                ovsdbClients.put(nodeId, ovsdbClient);
+
+                log.debug("Add node to north");
+                for (OvsdbNodeListener l : ovsdbNodeListener) {
+                    l.nodeAdded(nodeId);
+                }
+                return;
+            }
+        }
+
+        @Override
+        public void removeConnectedNode(OvsdbNodeId nodeId) {
+            ovsdbClients.remove(nodeId);
+            log.debug("Node connection is removed");
+            for (OvsdbNodeListener l : ovsdbNodeListener) {
+                l.nodeRemoved(nodeId);
+            }
+        }
+    }
+
+    /**
      * Implementation of an Callback which is responsible for receiving request
      * infomation from ovsdb.
      */