ONOS-6253 workaround for device not capable of :interleave-ing

Change-Id: Id64043dc6558cd677381bdd6133d8a7dc8c85869
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfSessionImpl.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfSessionImpl.java
index fc8ac3e..c0fcce3 100644
--- a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfSessionImpl.java
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfSessionImpl.java
@@ -20,11 +20,15 @@
 import ch.ethz.ssh2.Connection;
 import ch.ethz.ssh2.Session;
 import ch.ethz.ssh2.channel.Channel;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
 import org.onosproject.netconf.TargetConfig;
 import org.onosproject.netconf.FilteringNetconfDeviceOutputEventListener;
 import org.onosproject.netconf.NetconfDeviceInfo;
 import org.onosproject.netconf.NetconfDeviceOutputEvent;
+import org.onosproject.netconf.NetconfDeviceOutputEvent.Type;
 import org.onosproject.netconf.NetconfDeviceOutputEventListener;
 import org.onosproject.netconf.NetconfException;
 import org.onosproject.netconf.NetconfSession;
@@ -33,6 +37,7 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedHashSet;
 import java.util.List;
@@ -41,6 +46,7 @@
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -100,7 +106,7 @@
     private String sessionID;
     private final AtomicInteger messageIdInteger = new AtomicInteger(0);
     private Connection netconfConnection;
-    private NetconfDeviceInfo deviceInfo;
+    protected final NetconfDeviceInfo deviceInfo;
     private Session sshSession;
     private boolean connectionActive;
     private Iterable<String> onosCapabilities =
@@ -116,6 +122,11 @@
     private boolean subscriptionConnected = false;
     private String notificationFilterSchema = null;
 
+    private final Collection<NetconfDeviceOutputEventListener> primaryListeners =
+            new CopyOnWriteArrayList<>();
+    private final Collection<NetconfSession> children =
+            new CopyOnWriteArrayList<>();
+
 
     public NetconfSessionImpl(NetconfDeviceInfo deviceInfo) throws NetconfException {
         this.deviceInfo = deviceInfo;
@@ -192,10 +203,35 @@
 
 
     @Beta
-    private void startSubscriptionConnection(String filterSchema) throws NetconfException {
+    protected void startSubscriptionStream(String filterSchema) throws NetconfException {
+        boolean openNewSession = false;
         if (!deviceCapabilities.contains(INTERLEAVE_CAPABILITY_STRING)) {
-            throw new NetconfException("Device" + deviceInfo + "does not support interleave");
+            log.info("Device {} doesn't support interleave, creating child session", deviceInfo);
+            openNewSession = true;
+
+        } else if (subscriptionConnected &&
+                   notificationFilterSchema != null &&
+                   !Objects.equal(filterSchema, notificationFilterSchema)) {
+            // interleave supported and existing filter is NOT "no filtering"
+            // and was requested with different filtering schema
+            log.info("Cannot use existing session for subscription {} ({})",
+                     deviceInfo, filterSchema);
+            openNewSession = true;
         }
+
+        if (openNewSession) {
+            log.info("Creating notification session to {} with filter {}",
+                     deviceInfo, filterSchema);
+            NetconfSession child = new NotificationSession(deviceInfo);
+
+            child.addDeviceOutputListener(new NotificationForwarder());
+
+            child.startSubscription(filterSchema);
+            children.add(child);
+            return;
+        }
+
+        // request to start interleaved notification session
         String reply = sendRequest(createSubscriptionString(filterSchema));
         if (!checkReply(reply)) {
             throw new NetconfException("Subscription not successful with device "
@@ -207,7 +243,7 @@
     @Override
     public void startSubscription() throws NetconfException {
         if (!subscriptionConnected) {
-            startSubscriptionConnection(null);
+            startSubscriptionStream(null);
         }
         streamHandler.setEnableNotifications(true);
     }
@@ -217,13 +253,13 @@
     public void startSubscription(String filterSchema) throws NetconfException {
         if (!subscriptionConnected) {
             notificationFilterSchema = filterSchema;
-            startSubscriptionConnection(filterSchema);
+            startSubscriptionStream(filterSchema);
         }
         streamHandler.setEnableNotifications(true);
     }
 
     @Beta
-    private String createSubscriptionString(String filterSchema) {
+    protected String createSubscriptionString(String filterSchema) {
         StringBuilder subscriptionbuffer = new StringBuilder();
         subscriptionbuffer.append("<rpc xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n");
         subscriptionbuffer.append("  <create-subscription\n");
@@ -652,10 +688,12 @@
     @Override
     public void addDeviceOutputListener(NetconfDeviceOutputEventListener listener) {
         streamHandler.addDeviceEventListener(listener);
+        primaryListeners.add(listener);
     }
 
     @Override
     public void removeDeviceOutputListener(NetconfDeviceOutputEventListener listener) {
+        primaryListeners.remove(listener);
         streamHandler.removeDeviceEventListener(listener);
     }
 
@@ -675,6 +713,57 @@
         return false;
     }
 
+    static class NotificationSession extends NetconfSessionImpl {
+
+        private String notificationFilter;
+
+        NotificationSession(NetconfDeviceInfo deviceInfo)
+                throws NetconfException {
+            super(deviceInfo);
+        }
+
+        @Override
+        protected void startSubscriptionStream(String filterSchema)
+                throws NetconfException {
+
+            notificationFilter = filterSchema;
+            requestSync(createSubscriptionString(filterSchema));
+        }
+
+        @Override
+        public String toString() {
+            return MoreObjects.toStringHelper(getClass())
+                    .add("deviceInfo", deviceInfo)
+                    .add("sessionID", getSessionId())
+                    .add("notificationFilter", notificationFilter)
+                    .toString();
+        }
+    }
+
+    /**
+     * Listener attached to child session for notification streaming.
+     *
+     * Forwards all notification event from child session to primary session
+     * listeners.
+     */
+    private final class NotificationForwarder
+            implements NetconfDeviceOutputEventListener {
+
+        @Override
+        public boolean isRelevant(NetconfDeviceOutputEvent event) {
+            return event.type() == Type.DEVICE_NOTIFICATION;
+        }
+
+        @Override
+        public void event(NetconfDeviceOutputEvent event) {
+            primaryListeners.forEach(lsnr -> {
+                if (lsnr.isRelevant(event)) {
+                    lsnr.event(event);
+                }
+            });
+        }
+    }
+
     public class NetconfSessionDelegateImpl implements NetconfSessionDelegate {
 
         @Override