[ONOS-6236] Adding retry mechanism when NETCONF device sends socket closed signal

Change-Id: Ie48e94f92ce745f3f65a352d80b7a74c2eceba04
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfControllerImpl.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfControllerImpl.java
index afe791c..59ed7d4 100644
--- a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfControllerImpl.java
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfControllerImpl.java
@@ -52,9 +52,12 @@
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 import static com.google.common.base.Strings.isNullOrEmpty;
 import static org.onlab.util.Tools.get;
+import static org.onlab.util.Tools.groupedThreads;
 
 /**
  * The implementation of NetconfController.
@@ -93,6 +96,10 @@
     protected Set<NetconfDeviceListener> netconfDeviceListeners = new CopyOnWriteArraySet<>();
     protected NetconfDeviceFactory deviceFactory = new DefaultNetconfDeviceFactory();
 
+    protected final ExecutorService executor =
+            Executors.newCachedThreadPool(groupedThreads("onos/netconfdevicecontroller",
+                                                           "connection-reopen-%d", log));
+
     @Activate
     public void activate(ComponentContext context) {
         cfgService.registerProperties(getClass());
@@ -145,7 +152,7 @@
         netconfReplyTimeout = newNetconfReplyTimeout;
         netconfConnectTimeout = newNetconfConnectTimeout;
         log.info("Settings: {} = {}, {} = {}",
-                PROP_NETCONF_REPLY_TIMEOUT, netconfReplyTimeout, PROP_NETCONF_CONNECT_TIMEOUT, netconfConnectTimeout);
+                 PROP_NETCONF_REPLY_TIMEOUT, netconfReplyTimeout, PROP_NETCONF_CONNECT_TIMEOUT, netconfConnectTimeout);
     }
 
     @Override
@@ -195,7 +202,7 @@
                     port = Integer.parseInt(info[2]);
                 } else {
                     ip = Arrays.asList(info).stream().filter(el -> !el.equals(info[0])
-                    && !el.equals(info[info.length - 1]))
+                            && !el.equals(info[info.length - 1]))
                             .reduce((t, u) -> t + ":" + u)
                             .get();
                     log.debug("ip v6 {}", ip);
@@ -204,26 +211,26 @@
             }
             try {
                 DeviceKey deviceKey = deviceKeyService.getDeviceKey(
-                                DeviceKeyId.deviceKeyId(deviceId.toString()));
+                        DeviceKeyId.deviceKeyId(deviceId.toString()));
                 NetconfDeviceInfo deviceInfo = null;
                 if (deviceKey.type() == DeviceKey.Type.USERNAME_PASSWORD) {
                     UsernamePassword usernamepasswd = deviceKey.asUsernamePassword();
 
                     deviceInfo = new NetconfDeviceInfo(usernamepasswd.username(),
-                            usernamepasswd.password(),
-                            IpAddress.valueOf(ip),
-                            port);
+                                                       usernamepasswd.password(),
+                                                       IpAddress.valueOf(ip),
+                                                       port);
 
                 } else if (deviceKey.type() == DeviceKey.Type.SSL_KEY) {
                     String username = deviceKey.annotations().value(AnnotationKeys.USERNAME);
                     String password = deviceKey.annotations().value(AnnotationKeys.PASSWORD);
-                    String sshkey =   deviceKey.annotations().value(AnnotationKeys.SSHKEY);
+                    String sshkey = deviceKey.annotations().value(AnnotationKeys.SSHKEY);
 
                     deviceInfo = new NetconfDeviceInfo(username,
-                            password,
-                            IpAddress.valueOf(ip),
-                            port,
-                            sshkey);
+                                                       password,
+                                                       IpAddress.valueOf(ip),
+                                                       port,
+                                                       sshkey);
                 } else {
                     log.error("Unknown device key for device {}", deviceId);
                 }
@@ -291,7 +298,6 @@
     }
 
 
-
     //Device factory for the specific NetconfDeviceImpl
     private class DefaultNetconfDeviceFactory implements NetconfDeviceFactory {
 
@@ -307,8 +313,23 @@
 
         @Override
         public void event(NetconfDeviceOutputEvent event) {
+            DeviceId did = event.getDeviceInfo().getDeviceId();
             if (event.type().equals(NetconfDeviceOutputEvent.Type.DEVICE_UNREGISTERED)) {
-                removeDevice(event.getDeviceInfo().getDeviceId());
+                removeDevice(did);
+            } else if (event.type().equals(NetconfDeviceOutputEvent.Type.SESSION_CLOSED)) {
+                log.info("Trying to reestablish connection with device {}", did);
+                executor.execute(() -> {
+                    try {
+                        netconfDeviceMap.get(did).getSession().checkAndReestablish();
+                        log.info("Connection with device {} was reestablished", did);
+                    } catch (NetconfException e) {
+                        log.error("The SSH connection with device {} couldn't be " +
+                                          "reestablished due to {}. " +
+                                          "Marking the device as unreachable", e.getMessage());
+                        log.debug("Complete exception: ", e);
+                        removeDevice(did);
+                    }
+                });
             }
         }
 
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfDeviceOutputEventListenerImpl.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfDeviceOutputEventListenerImpl.java
index dc6683f..63a95ba 100644
--- a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfDeviceOutputEventListenerImpl.java
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfDeviceOutputEventListenerImpl.java
@@ -48,11 +48,13 @@
                 break;
             case DEVICE_UNREGISTERED:
                 log.warn("Device {} has closed session", deviceInfo);
-                //TODO tell onos about closed session
                 break;
             case DEVICE_ERROR:
                 log.warn("Device {} has error: {}", deviceInfo, event.getMessagePayload());
                 break;
+            case SESSION_CLOSED:
+                log.warn("Device {} has closed Session: {}", deviceInfo, event.getMessagePayload());
+                break;
             default:
                 log.warn("Wrong event type {} ", event.type());
         }
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfSessionImpl.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfSessionImpl.java
index 72ae513..07afac8 100644
--- a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfSessionImpl.java
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfSessionImpl.java
@@ -19,6 +19,7 @@
 import com.google.common.annotations.Beta;
 import ch.ethz.ssh2.Connection;
 import ch.ethz.ssh2.Session;
+import ch.ethz.ssh2.channel.Channel;
 import com.google.common.base.Preconditions;
 import org.onosproject.netconf.TargetConfig;
 import org.onosproject.netconf.NetconfDeviceInfo;
@@ -44,7 +45,6 @@
 import java.util.regex.Pattern;
 
 
-
 /**
  * Implementation of a NETCONF session to talk to a device.
  */
@@ -99,6 +99,7 @@
     private Map<Integer, CompletableFuture<String>> replies;
     private List<String> errorReplies;
     private boolean subscriptionConnected = false;
+    private String notificationFilterSchema = null;
 
 
     public NetconfSessionImpl(NetconfDeviceInfo deviceInfo) throws NetconfException {
@@ -131,7 +132,7 @@
                             deviceInfo.password().equals("") ? null : deviceInfo.password());
                 } else if (deviceInfo.getKey() != null) {
                     log.debug("Authenticating with key to device {} with username {}",
-                            deviceInfo.getDeviceId(), deviceInfo.name());
+                              deviceInfo.getDeviceId(), deviceInfo.name());
                     isAuthenticated = netconfConnection.authenticateWithPublicKey(
                             deviceInfo.name(), deviceInfo.getKey(),
                             deviceInfo.password().equals("") ? null : deviceInfo.password());
@@ -200,6 +201,7 @@
     @Override
     public void startSubscription(String filterSchema) throws NetconfException {
         if (!subscriptionConnected) {
+            notificationFilterSchema = filterSchema;
             startSubscriptionConnection(filterSchema);
         }
         streamHandler.setEnableNotifications(true);
@@ -255,18 +257,23 @@
 
     }
 
-    private void checkAndRestablishSession() throws NetconfException {
-        if (sshSession.getState() != 2) {
+    public void checkAndReestablish() throws NetconfException {
+        if (sshSession.getState() != Channel.STATE_OPEN) {
             try {
-                log.debug("The session with {} was reopened", deviceInfo.getDeviceId());
+                log.debug("Trying to reopen the Sesion with {}", deviceInfo.getDeviceId());
                 startSshSession();
-            } catch (IOException e) {
-                log.debug("The connection with {} was reopened", deviceInfo.getDeviceId());
+            } catch (IOException | IllegalStateException e) {
+                log.debug("Trying to reopen the Connection with {}", deviceInfo.getDeviceId());
                 try {
                     connectionActive = false;
                     replies.clear();
                     messageIdInteger.set(0);
                     startConnection();
+                    if (subscriptionConnected) {
+                        log.debug("Restarting subscription with {}", deviceInfo.getDeviceId());
+                        subscriptionConnected = false;
+                        startSubscription(notificationFilterSchema);
+                    }
                 } catch (IOException e2) {
                     log.error("No connection {} for device {}", netconfConnection, e.getMessage());
                     throw new NetconfException("Cannot re-open the connection with device" + deviceInfo, e);
@@ -296,7 +303,7 @@
     }
 
     private String sendRequest(String request) throws NetconfException {
-        checkAndRestablishSession();
+        checkAndReestablish();
         final int messageId = messageIdInteger.getAndIncrement();
         request = formatRequestMessageId(request, messageId);
         request = formatXmlHeader(request);
@@ -639,7 +646,7 @@
     public class NetconfSessionDelegateImpl implements NetconfSessionDelegate {
 
         @Override
-        public void notify(NetconfDeviceOutputEvent event)  {
+        public void notify(NetconfDeviceOutputEvent event) {
             Optional<Integer> messageId = event.getMessageID();
             log.debug("messageID {}, waiting replies messageIDs {}", messageId,
                       replies.keySet());
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfStreamThread.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfStreamThread.java
index 81c6f6e..e78f9fa 100644
--- a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfStreamThread.java
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/NetconfStreamThread.java
@@ -185,7 +185,7 @@
                         log.debug("Netconf device {}  sent error char in session," +
                                           " will need to be reopend", netconfDeviceInfo);
                         NetconfDeviceOutputEvent event = new NetconfDeviceOutputEvent(
-                                NetconfDeviceOutputEvent.Type.DEVICE_UNREGISTERED,
+                                NetconfDeviceOutputEvent.Type.SESSION_CLOSED,
                                 null, null, Optional.of(-1), netconfDeviceInfo);
                         netconfDeviceEventListeners.forEach(
                                 listener -> listener.event(event));