Bug fix for NETCONF controller - disconnect

Change-Id: I1f01f5b7e21e2e9c14358b4686077896ae4975e8
diff --git a/drivers/microsemi/src/main/java/org/onosproject/drivers/microsemi/EA1000FlowRuleProgrammable.java b/drivers/microsemi/src/main/java/org/onosproject/drivers/microsemi/EA1000FlowRuleProgrammable.java
index 4df22ec..233e537 100644
--- a/drivers/microsemi/src/main/java/org/onosproject/drivers/microsemi/EA1000FlowRuleProgrammable.java
+++ b/drivers/microsemi/src/main/java/org/onosproject/drivers/microsemi/EA1000FlowRuleProgrammable.java
@@ -185,6 +185,7 @@
             } else {
                 log.error("Unexpected error on SA Filt getFlowEntries on {}",
                         handler().data().deviceId(), e);
+                return flowEntryCollection;
             }
         }
 
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfControllerImpl.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfControllerImpl.java
index 428c9d2..2785fa3 100644
--- a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfControllerImpl.java
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfControllerImpl.java
@@ -80,10 +80,16 @@
             label = "Time (in seconds) waiting for a NetConf reply")
     protected static int netconfReplyTimeout = DEFAULT_REPLY_TIMEOUT_SECONDS;
 
+    private static final String PROP_NETCONF_IDLE_TIMEOUT = "netconfIdleTimeout";
+    private static final int DEFAULT_IDLE_TIMEOUT_SECONDS = 300;
+    @Property(name = PROP_NETCONF_IDLE_TIMEOUT, intValue = DEFAULT_IDLE_TIMEOUT_SECONDS,
+            label = "Time (in seconds) SSH session will close if no traffic seen")
+    protected static int netconfIdleTimeout = DEFAULT_IDLE_TIMEOUT_SECONDS;
+
     private static final String SSH_LIBRARY = "sshLibrary";
     private static final String APACHE_MINA = "apache_mina";
     @Property(name = SSH_LIBRARY, value = APACHE_MINA,
-            label = "Ssh Llbrary instead of Apache Mina (i.e. ethz-ssh2")
+            label = "Ssh Library instead of apache_mina (i.e. ethz-ssh2")
     protected static String sshLibrary = APACHE_MINA;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
@@ -142,6 +148,7 @@
 
         int newNetconfReplyTimeout;
         int newNetconfConnectTimeout;
+        int newNetconfIdleTimeout;
         String newSshLibrary;
         try {
             String s = get(properties, PROP_NETCONF_REPLY_TIMEOUT);
@@ -152,6 +159,10 @@
             newNetconfConnectTimeout = isNullOrEmpty(s) ?
                     netconfConnectTimeout : Integer.parseInt(s.trim());
 
+            s = get(properties, PROP_NETCONF_IDLE_TIMEOUT);
+            newNetconfIdleTimeout = isNullOrEmpty(s) ?
+                    netconfIdleTimeout : Integer.parseInt(s.trim());
+
             newSshLibrary = get(properties, SSH_LIBRARY);
 
         } catch (NumberFormatException e) {
@@ -165,14 +176,19 @@
         } else if (newNetconfReplyTimeout <= 0) {
             log.warn("netconfReplyTimeout is invalid - 0 or less.");
             return;
+        } else if (newNetconfIdleTimeout <= 0) {
+            log.warn("netconfIdleTimeout is invalid - 0 or less.");
+            return;
         }
 
         netconfReplyTimeout = newNetconfReplyTimeout;
         netconfConnectTimeout = newNetconfConnectTimeout;
+        netconfIdleTimeout = newNetconfIdleTimeout;
         sshLibrary = newSshLibrary;
         log.info("Settings: {} = {}, {} = {}, {} = {}",
                  PROP_NETCONF_REPLY_TIMEOUT, netconfReplyTimeout,
                  PROP_NETCONF_CONNECT_TIMEOUT, netconfConnectTimeout,
+                 PROP_NETCONF_IDLE_TIMEOUT, netconfIdleTimeout,
                  SSH_LIBRARY, sshLibrary);
     }
 
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfSessionImpl.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfSessionImpl.java
index 00fe4dc..8303868 100644
--- a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfSessionImpl.java
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfSessionImpl.java
@@ -385,8 +385,27 @@
         try {
             rp = futureReply.get(replyTimeout, TimeUnit.SECONDS);
             replies.remove(messageId);
-        } catch (InterruptedException | ExecutionException | TimeoutException e) {
-            throw new NetconfException("No matching reply for request " + request, e);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new NetconfException("Interrupted waiting for reply for request" + request, e);
+        } catch (TimeoutException e) {
+            throw new NetconfException("Timed out waiting for reply for request " + request, e);
+        } catch (ExecutionException e) {
+            log.warn("Closing session {} for {} due to unexpected Error", sessionID, deviceInfo, e);
+
+            netconfConnection.close(); //Closes the socket which should interrupt NetconfStreamThread
+            sshSession.close();
+
+            NetconfDeviceOutputEvent event = new NetconfDeviceOutputEvent(
+                    NetconfDeviceOutputEvent.Type.SESSION_CLOSED,
+                    null, "Closed due to unexpected error " + e.getCause(),
+                    Optional.of(-1), deviceInfo);
+            publishEvent(event);
+            replies.clear();
+            errorReplies.clear();
+
+            throw new NetconfException("Closing session " + sessionID + " for " + deviceInfo +
+                    " for request " + request, e);
         }
         log.debug("Result {} from request {} to device {}", rp, request, deviceInfo);
         return rp.trim();
@@ -724,6 +743,14 @@
         return false;
     }
 
+    protected void publishEvent(NetconfDeviceOutputEvent event) {
+        primaryListeners.forEach(lsnr -> {
+            if (lsnr.isRelevant(event)) {
+                lsnr.event(event);
+            }
+        });
+    }
+
     static class NotificationSession extends NetconfSessionImpl {
 
         private String notificationFilter;
@@ -767,11 +794,7 @@
 
         @Override
         public void event(NetconfDeviceOutputEvent event) {
-            primaryListeners.forEach(lsnr -> {
-                if (lsnr.isRelevant(event)) {
-                    lsnr.event(event);
-                }
-            });
+            publishEvent(event);
         }
     }
 
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfSessionMinaImpl.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfSessionMinaImpl.java
index 94a14e5..6938661 100644
--- a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfSessionMinaImpl.java
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfSessionMinaImpl.java
@@ -25,6 +25,7 @@
 import org.apache.sshd.client.future.ConnectFuture;
 import org.apache.sshd.client.future.OpenFuture;
 import org.apache.sshd.client.session.ClientSession;
+import org.apache.sshd.common.FactoryManager;
 import org.apache.sshd.server.keyprovider.SimpleGeneratorHostKeyProvider;
 import org.onosproject.netconf.DatastoreId;
 import org.onosproject.netconf.NetconfDeviceInfo;
@@ -126,7 +127,7 @@
     private final Set<String> deviceCapabilities = new LinkedHashSet<>();
     private NetconfStreamHandler streamHandler;
     private Map<Integer, CompletableFuture<String>> replies;
-    private List<String> errorReplies;
+    private List<String> errorReplies; // Not sure why we need this?
     private boolean subscriptionConnected = false;
     private String notificationFilterSchema = null;
 
@@ -158,6 +159,11 @@
 
     private void startClient() throws IOException {
         client = SshClient.setUpDefaultClient();
+        int replyTimeoutSec = NetconfControllerImpl.netconfIdleTimeout;
+        client.getProperties().putIfAbsent(FactoryManager.IDLE_TIMEOUT,
+                TimeUnit.SECONDS.toMillis(replyTimeoutSec));
+        client.getProperties().putIfAbsent(FactoryManager.NIO2_READ_TIMEOUT,
+                TimeUnit.SECONDS.toMillis(replyTimeoutSec + 15L));
         client.start();
         client.setKeyPairProvider(new SimpleGeneratorHostKeyProvider());
         startSession();
@@ -372,7 +378,7 @@
                 subscriptionConnected = false;
                 startSubscription(notificationFilterSchema);
             }
-        } catch (IOException e) {
+        } catch (IOException | IllegalStateException e) {
             log.error("Can't reopen connection for device {}", e.getMessage());
             throw new NetconfException("Cannot re-open the connection with device" + deviceInfo, e);
         }
@@ -420,9 +426,31 @@
         String rp;
         try {
             rp = futureReply.get(replyTimeout, TimeUnit.SECONDS);
-            replies.remove(messageId);
-        } catch (InterruptedException | ExecutionException | TimeoutException e) {
-            throw new NetconfException("No matching reply for request " + request, e);
+            replies.remove(messageId); // Why here???
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new NetconfException("Interrupted waiting for reply for request" + request, e);
+        } catch (TimeoutException e) {
+            throw new NetconfException("Timed out waiting for reply for request " + request, e);
+        } catch (ExecutionException e) {
+            log.warn("Closing session {} for {} due to unexpected Error", sessionID, deviceInfo, e);
+            try {
+                session.close();
+                channel.close(); //Closes the socket which should interrupt NetconfStreamThread
+                client.close();
+            } catch (IOException ioe) {
+                log.warn("Error closing session {} on {}", sessionID, deviceInfo, ioe);
+            }
+            NetconfDeviceOutputEvent event = new NetconfDeviceOutputEvent(
+                    NetconfDeviceOutputEvent.Type.SESSION_CLOSED,
+                    null, "Closed due to unexpected error " + e.getCause(),
+                    Optional.of(-1), deviceInfo);
+            publishEvent(event);
+            errorReplies.clear(); // move to cleanUp()?
+            cleanUp();
+
+            throw new NetconfException("Closing session " + sessionID + " for " + deviceInfo +
+                    " for request " + request, e);
         }
         log.debug("Result {} from request {} to device {}", rp, request, deviceInfo);
         return rp.trim();
@@ -763,6 +791,14 @@
         return false;
     }
 
+    protected void publishEvent(NetconfDeviceOutputEvent event) {
+        primaryListeners.forEach(lsnr -> {
+            if (lsnr.isRelevant(event)) {
+                lsnr.event(event);
+            }
+        });
+    }
+
     static class NotificationSession extends NetconfSessionMinaImpl {
 
         private String notificationFilter;
@@ -806,11 +842,7 @@
 
         @Override
         public void event(NetconfDeviceOutputEvent event) {
-            primaryListeners.forEach(lsnr -> {
-                if (lsnr.isRelevant(event)) {
-                    lsnr.event(event);
-                }
-            });
+            publishEvent(event);
         }
     }
 
@@ -828,7 +860,7 @@
                 return;
             }
             CompletableFuture<String> completedReply =
-                    replies.get(messageId.get());
+                    replies.get(messageId.get()); // remove(..)?
             if (completedReply != null) {
                 completedReply.complete(event.getMessagePayload());
             }
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfStreamThread.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfStreamThread.java
index 44206e3..6b231b3 100644
--- a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfStreamThread.java
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfStreamThread.java
@@ -16,7 +16,6 @@
 
 package org.onosproject.netconf.ctl.impl;
 
-import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import org.onosproject.netconf.NetconfDeviceInfo;
 import org.onosproject.netconf.NetconfDeviceOutputEvent;
@@ -30,7 +29,8 @@
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.OutputStream;
-import java.io.PrintWriter;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -55,7 +55,7 @@
     private static final String MESSAGE_ID = "message-id=";
     private static final Pattern MSGID_PATTERN = Pattern.compile(MESSAGE_ID + "\"(\\d+)\"");
 
-    private PrintWriter outputStream;
+    private OutputStreamWriter outputStream;
     private final InputStream err;
     private final InputStream in;
     private NetconfDeviceInfo netconfDeviceInfo;
@@ -72,7 +72,7 @@
                                Map<Integer, CompletableFuture<String>> replies) {
         this.in = in;
         this.err = err;
-        outputStream = new PrintWriter(out);
+        outputStream = new OutputStreamWriter(out, StandardCharsets.UTF_8);
         netconfDeviceInfo = deviceInfo;
         state = NetconfMessageState.NO_MATCHING_PATTERN;
         sessionDelegate = delegate;
@@ -94,8 +94,13 @@
         replies.put(messageId, cf);
 
         synchronized (outputStream) {
-            outputStream.print(request);
-            outputStream.flush();
+            try {
+                outputStream.write(request);
+                outputStream.flush();
+            } catch (IOException e) {
+                log.error("Writing to {} failed", netconfDeviceInfo, e);
+                cf.completeExceptionally(e);
+            }
         }
 
         return cf;
@@ -251,9 +256,11 @@
     protected static Optional<Integer> getMsgId(String reply) {
         Matcher matcher = MSGID_PATTERN.matcher(reply);
         if (matcher.find()) {
-            Integer messageId = Integer.parseInt(matcher.group(1));
-            Preconditions.checkNotNull(messageId, "Error in retrieving the message id");
-            return Optional.of(messageId);
+            try {
+                return Optional.of(Integer.valueOf(matcher.group(1)));
+            } catch (NumberFormatException e) {
+                log.warn("Failed to parse message-id from {}", matcher.group(), e);
+            }
         }
         if (reply.contains(HELLO)) {
             return Optional.of(-1);