ONOS-6253 workaround for device not capable of :interleave-ing
Change-Id: Id64043dc6558cd677381bdd6133d8a7dc8c85869
diff --git a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfSessionImpl.java b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfSessionImpl.java
index fc8ac3e..c0fcce3 100644
--- a/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfSessionImpl.java
+++ b/protocols/netconf/ctl/src/main/java/org/onosproject/netconf/ctl/impl/NetconfSessionImpl.java
@@ -20,11 +20,15 @@
import ch.ethz.ssh2.Connection;
import ch.ethz.ssh2.Session;
import ch.ethz.ssh2.channel.Channel;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import org.onosproject.netconf.TargetConfig;
import org.onosproject.netconf.FilteringNetconfDeviceOutputEventListener;
import org.onosproject.netconf.NetconfDeviceInfo;
import org.onosproject.netconf.NetconfDeviceOutputEvent;
+import org.onosproject.netconf.NetconfDeviceOutputEvent.Type;
import org.onosproject.netconf.NetconfDeviceOutputEventListener;
import org.onosproject.netconf.NetconfException;
import org.onosproject.netconf.NetconfSession;
@@ -33,6 +37,7 @@
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
@@ -41,6 +46,7 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -100,7 +106,7 @@
private String sessionID;
private final AtomicInteger messageIdInteger = new AtomicInteger(0);
private Connection netconfConnection;
- private NetconfDeviceInfo deviceInfo;
+ protected final NetconfDeviceInfo deviceInfo;
private Session sshSession;
private boolean connectionActive;
private Iterable<String> onosCapabilities =
@@ -116,6 +122,11 @@
private boolean subscriptionConnected = false;
private String notificationFilterSchema = null;
+ private final Collection<NetconfDeviceOutputEventListener> primaryListeners =
+ new CopyOnWriteArrayList<>();
+ private final Collection<NetconfSession> children =
+ new CopyOnWriteArrayList<>();
+
public NetconfSessionImpl(NetconfDeviceInfo deviceInfo) throws NetconfException {
this.deviceInfo = deviceInfo;
@@ -192,10 +203,35 @@
@Beta
- private void startSubscriptionConnection(String filterSchema) throws NetconfException {
+ protected void startSubscriptionStream(String filterSchema) throws NetconfException {
+ boolean openNewSession = false;
if (!deviceCapabilities.contains(INTERLEAVE_CAPABILITY_STRING)) {
- throw new NetconfException("Device" + deviceInfo + "does not support interleave");
+ log.info("Device {} doesn't support interleave, creating child session", deviceInfo);
+ openNewSession = true;
+
+ } else if (subscriptionConnected &&
+ notificationFilterSchema != null &&
+ !Objects.equal(filterSchema, notificationFilterSchema)) {
+ // interleave supported and existing filter is NOT "no filtering"
+ // and was requested with different filtering schema
+ log.info("Cannot use existing session for subscription {} ({})",
+ deviceInfo, filterSchema);
+ openNewSession = true;
}
+
+ if (openNewSession) {
+ log.info("Creating notification session to {} with filter {}",
+ deviceInfo, filterSchema);
+ NetconfSession child = new NotificationSession(deviceInfo);
+
+ child.addDeviceOutputListener(new NotificationForwarder());
+
+ child.startSubscription(filterSchema);
+ children.add(child);
+ return;
+ }
+
+ // request to start interleaved notification session
String reply = sendRequest(createSubscriptionString(filterSchema));
if (!checkReply(reply)) {
throw new NetconfException("Subscription not successful with device "
@@ -207,7 +243,7 @@
@Override
public void startSubscription() throws NetconfException {
if (!subscriptionConnected) {
- startSubscriptionConnection(null);
+ startSubscriptionStream(null);
}
streamHandler.setEnableNotifications(true);
}
@@ -217,13 +253,13 @@
public void startSubscription(String filterSchema) throws NetconfException {
if (!subscriptionConnected) {
notificationFilterSchema = filterSchema;
- startSubscriptionConnection(filterSchema);
+ startSubscriptionStream(filterSchema);
}
streamHandler.setEnableNotifications(true);
}
@Beta
- private String createSubscriptionString(String filterSchema) {
+ protected String createSubscriptionString(String filterSchema) {
StringBuilder subscriptionbuffer = new StringBuilder();
subscriptionbuffer.append("<rpc xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n");
subscriptionbuffer.append(" <create-subscription\n");
@@ -652,10 +688,12 @@
@Override
public void addDeviceOutputListener(NetconfDeviceOutputEventListener listener) {
streamHandler.addDeviceEventListener(listener);
+ primaryListeners.add(listener);
}
@Override
public void removeDeviceOutputListener(NetconfDeviceOutputEventListener listener) {
+ primaryListeners.remove(listener);
streamHandler.removeDeviceEventListener(listener);
}
@@ -675,6 +713,57 @@
return false;
}
+ static class NotificationSession extends NetconfSessionImpl {
+
+ private String notificationFilter;
+
+ NotificationSession(NetconfDeviceInfo deviceInfo)
+ throws NetconfException {
+ super(deviceInfo);
+ }
+
+ @Override
+ protected void startSubscriptionStream(String filterSchema)
+ throws NetconfException {
+
+ notificationFilter = filterSchema;
+ requestSync(createSubscriptionString(filterSchema));
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(getClass())
+ .add("deviceInfo", deviceInfo)
+ .add("sessionID", getSessionId())
+ .add("notificationFilter", notificationFilter)
+ .toString();
+ }
+ }
+
+ /**
+ * Listener attached to child session for notification streaming.
+ *
+ * Forwards all notification event from child session to primary session
+ * listeners.
+ */
+ private final class NotificationForwarder
+ implements NetconfDeviceOutputEventListener {
+
+ @Override
+ public boolean isRelevant(NetconfDeviceOutputEvent event) {
+ return event.type() == Type.DEVICE_NOTIFICATION;
+ }
+
+ @Override
+ public void event(NetconfDeviceOutputEvent event) {
+ primaryListeners.forEach(lsnr -> {
+ if (lsnr.isRelevant(event)) {
+ lsnr.event(event);
+ }
+ });
+ }
+ }
+
public class NetconfSessionDelegateImpl implements NetconfSessionDelegate {
@Override