Fix NETCONF controller handling of session disconnect

Change-Id: I1f01f5b7e21e2e9c14358b4686077896ae4975e8
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfSessionMinaImpl.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfSessionMinaImpl.java
index 94a14e5..6938661 100644
--- a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfSessionMinaImpl.java
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfSessionMinaImpl.java
@@ -25,6 +25,7 @@
 import org.apache.sshd.client.future.ConnectFuture;
 import org.apache.sshd.client.future.OpenFuture;
 import org.apache.sshd.client.session.ClientSession;
+import org.apache.sshd.common.FactoryManager;
 import org.apache.sshd.server.keyprovider.SimpleGeneratorHostKeyProvider;
 import org.onosproject.netconf.DatastoreId;
 import org.onosproject.netconf.NetconfDeviceInfo;
@@ -126,7 +127,7 @@
     private final Set<String> deviceCapabilities = new LinkedHashSet<>();
     private NetconfStreamHandler streamHandler;
     private Map<Integer, CompletableFuture<String>> replies;
-    private List<String> errorReplies;
+    private List<String> errorReplies; // Not sure why we need this?
     private boolean subscriptionConnected = false;
     private String notificationFilterSchema = null;
 
@@ -158,6 +159,11 @@
 
     private void startClient() throws IOException {
         client = SshClient.setUpDefaultClient();
+        int replyTimeoutSec = NetconfControllerImpl.netconfIdleTimeout;
+        client.getProperties().putIfAbsent(FactoryManager.IDLE_TIMEOUT,
+                TimeUnit.SECONDS.toMillis(replyTimeoutSec));
+        client.getProperties().putIfAbsent(FactoryManager.NIO2_READ_TIMEOUT,
+                TimeUnit.SECONDS.toMillis(replyTimeoutSec + 15L));
         client.start();
         client.setKeyPairProvider(new SimpleGeneratorHostKeyProvider());
         startSession();
@@ -372,7 +378,7 @@
                 subscriptionConnected = false;
                 startSubscription(notificationFilterSchema);
             }
-        } catch (IOException e) {
+        } catch (IOException | IllegalStateException e) {
             log.error("Can't reopen connection for device {}", e.getMessage());
             throw new NetconfException("Cannot re-open the connection with device" + deviceInfo, e);
         }
@@ -420,9 +426,31 @@
         String rp;
         try {
             rp = futureReply.get(replyTimeout, TimeUnit.SECONDS);
-            replies.remove(messageId);
-        } catch (InterruptedException | ExecutionException | TimeoutException e) {
-            throw new NetconfException("No matching reply for request " + request, e);
+            replies.remove(messageId); // Why here???
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new NetconfException("Interrupted waiting for reply for request" + request, e);
+        } catch (TimeoutException e) {
+            throw new NetconfException("Timed out waiting for reply for request " + request, e);
+        } catch (ExecutionException e) {
+            log.warn("Closing session {} for {} due to unexpected Error", sessionID, deviceInfo, e);
+            try {
+                session.close();
+                channel.close(); //Closes the socket which should interrupt NetconfStreamThread
+                client.close();
+            } catch (IOException ioe) {
+                log.warn("Error closing session {} on {}", sessionID, deviceInfo, ioe);
+            }
+            NetconfDeviceOutputEvent event = new NetconfDeviceOutputEvent(
+                    NetconfDeviceOutputEvent.Type.SESSION_CLOSED,
+                    null, "Closed due to unexpected error " + e.getCause(),
+                    Optional.of(-1), deviceInfo);
+            publishEvent(event);
+            errorReplies.clear(); // move to cleanUp()?
+            cleanUp();
+
+            throw new NetconfException("Closing session " + sessionID + " for " + deviceInfo +
+                    " for request " + request, e);
         }
         log.debug("Result {} from request {} to device {}", rp, request, deviceInfo);
         return rp.trim();
@@ -763,6 +791,14 @@
         return false;
     }
 
+    protected void publishEvent(NetconfDeviceOutputEvent event) { // offers the event to every primary listener
+        primaryListeners.forEach(lsnr -> {
+            if (lsnr.isRelevant(event)) { // deliver only to listeners that report the event as relevant
+                lsnr.event(event);
+            }
+        });
+    }
+
     static class NotificationSession extends NetconfSessionMinaImpl {
 
         private String notificationFilter;
@@ -806,11 +842,7 @@
 
         @Override
         public void event(NetconfDeviceOutputEvent event) {
-            primaryListeners.forEach(lsnr -> {
-                if (lsnr.isRelevant(event)) {
-                    lsnr.event(event);
-                }
-            });
+            publishEvent(event);
         }
     }
 
@@ -828,7 +860,7 @@
                 return;
             }
             CompletableFuture<String> completedReply =
-                    replies.get(messageId.get());
+                    replies.get(messageId.get()); // remove(..)?
             if (completedReply != null) {
                 completedReply.complete(event.getMessagePayload());
             }