Make OpticalPathProvisioner store connectivity data in the distributed store. (ONOS-4518)
Change-Id: I7f9ef02cab4aa1848c8926d2e88478e035076c99
diff --git a/apps/newoptical/BUCK b/apps/newoptical/BUCK
index fd80f40..db15c64 100644
--- a/apps/newoptical/BUCK
+++ b/apps/newoptical/BUCK
@@ -1,5 +1,6 @@
COMPILE_DEPS = [
'//lib:CORE_DEPS',
+ '//core/store/serializers:onos-core-serializers',
'//lib:org.apache.karaf.shell.console',
'//cli:onos-cli',
]
diff --git a/apps/newoptical/pom.xml b/apps/newoptical/pom.xml
index af0f841..13821f7 100644
--- a/apps/newoptical/pom.xml
+++ b/apps/newoptical/pom.xml
@@ -89,6 +89,12 @@
</dependency>
<dependency>
+ <groupId>org.onosproject</groupId>
+ <artifactId>onos-core-serializers</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava-testlib</artifactId>
<scope>test</scope>
diff --git a/apps/newoptical/src/main/java/org/onosproject/newoptical/OpticalConnectivity.java b/apps/newoptical/src/main/java/org/onosproject/newoptical/OpticalConnectivity.java
index cd5a2cd..b67564c 100644
--- a/apps/newoptical/src/main/java/org/onosproject/newoptical/OpticalConnectivity.java
+++ b/apps/newoptical/src/main/java/org/onosproject/newoptical/OpticalConnectivity.java
@@ -22,14 +22,14 @@
import org.onosproject.newoptical.api.OpticalConnectivityId;
import org.onosproject.net.ConnectPoint;
import org.onosproject.net.Link;
-import org.onosproject.net.Path;
import java.time.Duration;
-import java.util.HashSet;
import java.util.List;
+import java.util.Optional;
import java.util.Set;
+import java.util.stream.Collectors;
-import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
/**
* Entity to store optical connectivity request and related information.
@@ -42,48 +42,48 @@
private final Bandwidth requestBandwidth;
private final Duration requestLatency;
- // Bandwidth capacity of optical layer
- private Bandwidth opticalCapacity;
+ /**
+ * Set of packet links that are not yet established.
+ * Packet links in this set are expected to be discovered after the underlying (optical) path has been provisioned.
+ */
+ private final ImmutableSet<PacketLinkRealizedByOptical> unestablishedLinks;
- private final Set<PacketLinkRealizedByOptical> realizingLinks = new HashSet<>();
+ /**
+ * Set of packet links that are already established.
+ */
+ private final ImmutableSet<PacketLinkRealizedByOptical> establishedLinks;
- private State state = State.CREATED;
-
- public enum State {
- CREATED,
- INSTALLING,
- INSTALLED,
- WITHDRAWING,
- WITHDRAWN,
- FAILED
- }
-
- public OpticalConnectivity(OpticalConnectivityId id, Path path, Bandwidth requestBandwidth,
- Duration requestLatency) {
+ public OpticalConnectivity(OpticalConnectivityId id,
+ List<Link> links,
+ Bandwidth requestBandwidth,
+ Duration requestLatency,
+ Set<PacketLinkRealizedByOptical> unestablishedLinks,
+ Set<PacketLinkRealizedByOptical> establishedLinks) {
this.id = id;
- this.links = ImmutableList.copyOf(path.links());
+ this.links = ImmutableList.copyOf(links);
this.requestBandwidth = requestBandwidth;
this.requestLatency = requestLatency;
+ this.unestablishedLinks = ImmutableSet.copyOf(unestablishedLinks);
+ this.establishedLinks = ImmutableSet.copyOf(establishedLinks);
}
- public void setLinkEstablished(ConnectPoint src, ConnectPoint dst) {
- realizingLinks.stream().filter(l -> l.isBetween(src, dst))
- .findAny()
- .ifPresent(l -> l.setEstablished(true));
- }
-
- public void setLinkRemoved(ConnectPoint src, ConnectPoint dst) {
- realizingLinks.stream().filter(l -> l.isBetween(src, dst))
- .findAny()
- .ifPresent(l -> l.setEstablished(false));
+ private OpticalConnectivity(OpticalConnectivity connectivity) {
+ this.id = connectivity.id;
+ this.links = ImmutableList.copyOf(connectivity.links);
+ this.requestBandwidth = connectivity.requestBandwidth;
+ this.requestLatency = connectivity.requestLatency;
+ this.unestablishedLinks = ImmutableSet.copyOf(connectivity.unestablishedLinks);
+ this.establishedLinks = ImmutableSet.copyOf(connectivity.establishedLinks);
}
public boolean isAllRealizingLinkEstablished() {
- return realizingLinks.stream().allMatch(PacketLinkRealizedByOptical::isEstablished);
+ // Check if all links are established
+ return unestablishedLinks.isEmpty();
}
public boolean isAllRealizingLinkNotEstablished() {
- return !realizingLinks.stream().anyMatch(PacketLinkRealizedByOptical::isEstablished);
+ // Check if no link is established
+ return establishedLinks.isEmpty();
}
public OpticalConnectivityId id() {
@@ -102,59 +102,64 @@
return requestLatency;
}
- public State state() {
- return state;
+ public Set<PacketLinkRealizedByOptical> getEstablishedLinks() {
+ return establishedLinks;
}
- public boolean state(State state) {
- boolean valid = true;
- // reject invalid state transition
- switch (this.state) {
- case CREATED:
- valid = (state == State.INSTALLING || state == State.FAILED);
- break;
- case INSTALLING:
- valid = (state == State.INSTALLED || state == State.FAILED);
- break;
- case INSTALLED:
- valid = (state == State.WITHDRAWING || state == State.FAILED);
- break;
- case WITHDRAWING:
- valid = (state == State.WITHDRAWN || state == State.FAILED);
- break;
- case FAILED:
- valid = (state == State.INSTALLING || state == State.WITHDRAWING || state == State.FAILED);
- break;
- default:
- break;
+ public Set<PacketLinkRealizedByOptical> getUnestablishedLinks() {
+ return unestablishedLinks;
+ }
+
+ public OpticalConnectivity setLinkEstablished(ConnectPoint src,
+ ConnectPoint dst,
+ boolean established) {
+ Set<PacketLinkRealizedByOptical> newEstablishedLinks;
+ Set<PacketLinkRealizedByOptical> newUnestablishedLinks;
+
+ if (established) {
+ // move PacketLink from unestablished set to established set
+ Optional<PacketLinkRealizedByOptical> link = this.unestablishedLinks.stream()
+ .filter(l -> l.isBetween(src, dst)).findAny();
+ checkState(link.isPresent());
+
+ newUnestablishedLinks = this.unestablishedLinks.stream()
+ .filter(l -> !l.isBetween(src, dst))
+ .collect(Collectors.toSet());
+ newEstablishedLinks = ImmutableSet.<PacketLinkRealizedByOptical>builder()
+ .addAll(this.establishedLinks)
+ .add(link.get())
+ .build();
+ } else {
+ // move PacketLink from established set to unestablished set
+ Optional<PacketLinkRealizedByOptical> link = this.establishedLinks.stream()
+ .filter(l -> l.isBetween(src, dst)).findAny();
+ checkState(link.isPresent());
+
+ newEstablishedLinks = this.establishedLinks.stream()
+ .filter(l -> !l.isBetween(src, dst))
+ .collect(Collectors.toSet());
+ newUnestablishedLinks = ImmutableSet.<PacketLinkRealizedByOptical>builder()
+ .addAll(this.unestablishedLinks)
+ .add(link.get())
+ .build();
}
- if (valid) {
- this.state = state;
- }
-
- return valid;
- }
-
- public Bandwidth getOpticalCapacity() {
- return opticalCapacity;
- }
-
- public void setOpticalCapacity(Bandwidth opticalCapacity) {
- this.opticalCapacity = opticalCapacity;
- }
-
- public void addRealizingLink(PacketLinkRealizedByOptical link) {
- checkNotNull(link);
- realizingLinks.add(link);
- }
-
- public void removeRealizingLink(PacketLinkRealizedByOptical link) {
- checkNotNull(link);
- realizingLinks.remove(link);
+ return new OpticalConnectivity(this.id,
+ this.links,
+ this.requestBandwidth,
+ this.requestLatency,
+ newUnestablishedLinks,
+ newEstablishedLinks);
}
public Set<PacketLinkRealizedByOptical> getRealizingLinks() {
- return ImmutableSet.copyOf(realizingLinks);
+ return ImmutableSet.<PacketLinkRealizedByOptical>builder()
+ .addAll(unestablishedLinks)
+ .addAll(establishedLinks)
+ .build();
+ }
+
+ public static OpticalConnectivity copyOf(OpticalConnectivity connectivity) {
+ return new OpticalConnectivity(connectivity);
}
}
diff --git a/apps/newoptical/src/main/java/org/onosproject/newoptical/OpticalPathProvisioner.java b/apps/newoptical/src/main/java/org/onosproject/newoptical/OpticalPathProvisioner.java
index f64e579..906fe33 100644
--- a/apps/newoptical/src/main/java/org/onosproject/newoptical/OpticalPathProvisioner.java
+++ b/apps/newoptical/src/main/java/org/onosproject/newoptical/OpticalPathProvisioner.java
@@ -17,7 +17,6 @@
import com.google.common.annotations.Beta;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Sets;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.felix.scr.annotations.Activate;
@@ -27,8 +26,8 @@
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.util.Bandwidth;
+import org.onlab.util.KryoNamespace;
import org.onosproject.cluster.ClusterService;
-import org.onosproject.cluster.NodeId;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.event.ListenerTracker;
@@ -66,20 +65,29 @@
import org.onosproject.net.topology.LinkWeight;
import org.onosproject.net.topology.PathService;
import org.onosproject.net.topology.TopologyEdge;
+import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.AtomicCounter;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.DistributedSet;
+import org.onosproject.store.service.MapEvent;
+import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.Versioned;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -99,6 +107,9 @@
protected static final Logger log = LoggerFactory.getLogger(OpticalPathProvisioner.class);
private static final String OPTICAL_CONNECTIVITY_ID_COUNTER = "optical-connectivity-id";
+ private static final String LINKPATH_MAP_NAME = "newoptical-linkpath";
+ private static final String CONNECTIVITY_MAP_NAME = "newoptical-connectivity";
+ private static final String CROSSCONNECTLINK_SET_NAME = "newoptical-crossconnectlink";
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected IntentService intentService;
@@ -137,14 +148,28 @@
private ListenerTracker listeners;
- private Map<PacketLinkRealizedByOptical, OpticalConnectivity> linkPathMap = new ConcurrentHashMap<>();
+ private InternalStoreListener storeListener = new InternalStoreListener();
- // TODO this should be stored to distributed store
- private Map<OpticalConnectivityId, OpticalConnectivity> connectivities = new ConcurrentHashMap<>();
+ private ConsistentMap<PacketLinkRealizedByOptical, OpticalConnectivity> linkPathMap;
- // TODO this should be stored to distributed store
+ private ConsistentMap<OpticalConnectivityId, OpticalConnectivity> connectivityMap;
+
// Map of cross connect link and installed path which uses the link
- private Set<Link> usedCrossConnectLinks = Sets.newConcurrentHashSet();
+ private DistributedSet<Link> usedCrossConnectLinkSet;
+
+ private static final KryoNamespace.Builder LINKPATH_SERIALIZER = KryoNamespace.newBuilder()
+ .register(KryoNamespaces.API)
+ .register(PacketLinkRealizedByOptical.class)
+ .register(OpticalConnectivityId.class)
+ .register(OpticalConnectivity.class);
+
+ private static final KryoNamespace.Builder CONNECTIVITY_SERIALIZER = KryoNamespace.newBuilder()
+ .register(KryoNamespaces.API)
+ .register(OpticalConnectivityId.class)
+ .register(OpticalConnectivity.class);
+
+ private static final KryoNamespace.Builder CROSSCONNECTLINKS_SERIALIZER = KryoNamespace.newBuilder()
+ .register(KryoNamespaces.API);
@Activate
protected void activate() {
@@ -153,17 +178,39 @@
idCounter = storageService.getAtomicCounter(OPTICAL_CONNECTIVITY_ID_COUNTER);
+ linkPathMap = storageService.<PacketLinkRealizedByOptical, OpticalConnectivity>consistentMapBuilder()
+ .withSerializer(Serializer.using(LINKPATH_SERIALIZER.build()))
+ .withName(LINKPATH_MAP_NAME)
+ .withApplicationId(appId)
+ .build();
+
+ connectivityMap = storageService.<OpticalConnectivityId, OpticalConnectivity>consistentMapBuilder()
+ .withSerializer(Serializer.using(CONNECTIVITY_SERIALIZER.build()))
+ .withName(CONNECTIVITY_MAP_NAME)
+ .withApplicationId(appId)
+ .build();
+
+ usedCrossConnectLinkSet = storageService.<Link>setBuilder()
+ .withSerializer(Serializer.using(CROSSCONNECTLINKS_SERIALIZER.build()))
+ .withName(CROSSCONNECTLINK_SET_NAME)
+ .withApplicationId(appId)
+ .build()
+ .asDistributedSet();
+
eventDispatcher.addSink(OpticalPathEvent.class, listenerRegistry);
listeners = new ListenerTracker();
listeners.addListener(linkService, new InternalLinkListener())
.addListener(intentService, new InternalIntentListener());
+ linkPathMap.addListener(storeListener);
+
log.info("Started");
}
@Deactivate
protected void deactivate() {
+ linkPathMap.removeListener(storeListener);
listeners.removeListeners();
eventDispatcher.removeSink(OpticalPathEvent.class);
@@ -205,22 +252,67 @@
checkNotNull(path);
log.info("setupPath({}, {}, {})", path, bandwidth, latency);
- // validate optical path
- List<Pair<ConnectPoint, ConnectPoint>> xcPointPairs = getCrossConnectPoints(path);
- if (!checkXcPoints(xcPointPairs)) {
- // Can't setup path if cross connect points are mismatched
- log.error("Failed to setup path because of mismatched cross connect points.");
- return null;
+ // map of cross connect points (optical port -> packet port)
+ Map<ConnectPoint, ConnectPoint> crossConnectPointMap = new HashMap<>();
+
+ // list of (src, dst) pair of optical ports between which optical path should be installed
+ List<Pair<ConnectPoint, ConnectPoint>> crossConnectPoints = new ArrayList<>();
+
+ // Scan path to find pairs of connect points between which optical intent is installed
+ // opticalSrcPort works as a flag parameter to show scanning status
+ ConnectPoint opticalSrcPort = null;
+ for (Link link : path.links()) {
+ if (!isCrossConnectLink(link)) {
+ continue;
+ }
+
+ if (opticalSrcPort != null) {
+ // opticalSrcPort!=null means src port was already found
+ // in this case link.src() is optical layer, and link.dst() is packet layer
+
+ // Check if types of src port and dst port matches
+ Device srcDevice = checkNotNull(deviceService.getDevice(opticalSrcPort.deviceId()),
+ "Unknown device ID");
+ Device dstDevice = checkNotNull(deviceService.getDevice(link.src().deviceId()),
+ "Unknown device ID");
+ if (srcDevice.type() != dstDevice.type()) {
+ log.error("Unsupported mix of cross connect points : {}, {}",
+ srcDevice.type(), dstDevice.type());
+ return null;
+ }
+
+ // Update cross connect points map
+ crossConnectPointMap.put(link.src(), link.dst());
+
+ // Add optical ports pair to list
+ crossConnectPoints.add(Pair.of(opticalSrcPort, link.src()));
+
+ // Reset flag parameter
+ opticalSrcPort = null;
+ } else {
+ // opticalSrcPort==null means src port was not found yet
+ // in this case link.src() is packet layer, and link.dst() is optical layer
+
+ // Update cross connect points map
+ crossConnectPointMap.put(link.dst(), link.src());
+ // Set opticalSrcPort to dst of link (optical port)
+ opticalSrcPort = link.dst();
+ }
}
- OpticalConnectivity connectivity = createConnectivity(path, bandwidth, latency);
+ // create intents from cross connect points
+ List<Intent> intents = createIntents(crossConnectPoints);
- // create intents from cross connect points and set connectivity information
- List<Intent> intents = createIntents(xcPointPairs, connectivity);
+ // create set of PacketLinkRealizedByOptical
+ Set<PacketLinkRealizedByOptical> packetLinks = createPacketLinkSet(crossConnectPoints,
+ intents, crossConnectPointMap);
+
+ // create OpticalConnectivity object and store information to distributed store
+ OpticalConnectivity connectivity = createConnectivity(path, bandwidth, latency, packetLinks);
// store cross connect port usage
path.links().stream().filter(this::isCrossConnectLink)
- .forEach(usedCrossConnectLinks::add);
+ .forEach(usedCrossConnectLinkSet::add);
// Submit the intents
for (Intent i : intents) {
@@ -231,12 +323,16 @@
return connectivity.id();
}
- private OpticalConnectivity createConnectivity(Path path, Bandwidth bandwidth, Duration latency) {
+ private OpticalConnectivity createConnectivity(Path path, Bandwidth bandwidth, Duration latency,
+ Set<PacketLinkRealizedByOptical> links) {
OpticalConnectivityId id = OpticalConnectivityId.of(idCounter.getAndIncrement());
- OpticalConnectivity connectivity = new OpticalConnectivity(id, path, bandwidth, latency);
+ OpticalConnectivity connectivity = new OpticalConnectivity(id, path.links(), bandwidth, latency,
+ links, Collections.emptySet());
+
+ links.forEach(l -> linkPathMap.put(l, connectivity));
// store connectivity information
- connectivities.put(connectivity.id(), connectivity);
+ connectivityMap.put(connectivity.id(), connectivity);
return connectivity;
}
@@ -244,7 +340,7 @@
@Override
public boolean removeConnectivity(OpticalConnectivityId id) {
log.info("removeConnectivity({})", id);
- OpticalConnectivity connectivity = connectivities.remove(id);
+ Versioned<OpticalConnectivity> connectivity = connectivityMap.remove(id);
if (connectivity == null) {
log.info("OpticalConnectivity with id {} not found.", id);
@@ -252,7 +348,7 @@
}
// TODO withdraw intent only if all of connectivities that use the optical path are withdrawn
- connectivity.getRealizingLinks().forEach(l -> {
+ connectivity.value().getRealizingLinks().forEach(l -> {
Intent intent = intentService.getIntent(l.realizingIntentKey());
intentService.withdraw(intent);
});
@@ -262,98 +358,34 @@
@Override
public Optional<List<Link>> getPath(OpticalConnectivityId id) {
- OpticalConnectivity connectivity = connectivities.get(id);
+ Versioned<OpticalConnectivity> connectivity = connectivityMap.get(id);
if (connectivity == null) {
log.info("OpticalConnectivity with id {} not found.", id);
return Optional.empty();
}
- return Optional.of(ImmutableList.copyOf(connectivity.links()));
- }
-
- /**
- * Returns list of (optical, packet) pairs of cross connection points of missing optical path sections.
- *
- * Scans the given multi-layer path and looks for sections that use cross connect links.
- * The ingress and egress points in the optical layer are combined to the packet layer ports, and
- * are returned in a list.
- *
- * @param path the multi-layer path
- * @return List of cross connect link's (packet port, optical port) pairs
- */
- private List<Pair<ConnectPoint, ConnectPoint>> getCrossConnectPoints(Path path) {
- List<Pair<ConnectPoint, ConnectPoint>> xcPointPairs = new LinkedList<>();
- boolean scanning = false;
-
- for (Link link : path.links()) {
- if (!isCrossConnectLink(link)) {
- continue;
- }
-
- if (scanning) {
- // link.src() is packet, link.dst() is optical
- xcPointPairs.add(Pair.of(checkNotNull(link.src()), checkNotNull(link.dst())));
- scanning = false;
- } else {
- // link.src() is optical, link.dst() is packet
- xcPointPairs.add(Pair.of(checkNotNull(link.dst()), checkNotNull(link.src())));
- scanning = true;
- }
- }
-
- return xcPointPairs;
- }
-
- /**
- * Checks if optical cross connect points are of same type.
- *
- * @param xcPointPairs list of cross connection points
- * @return true if cross connect point pairs are of same type, false otherwise
- */
- private boolean checkXcPoints(List<Pair<ConnectPoint, ConnectPoint>> xcPointPairs) {
- checkArgument(xcPointPairs.size() % 2 == 0);
-
- Iterator<Pair<ConnectPoint, ConnectPoint>> itr = xcPointPairs.iterator();
-
- while (itr.hasNext()) {
- // checkArgument at start ensures we'll always have pairs of connect points
- Pair<ConnectPoint, ConnectPoint> src = itr.next();
- Pair<ConnectPoint, ConnectPoint> dst = itr.next();
-
- Device.Type srcType = deviceService.getDevice(src.getKey().deviceId()).type();
- Device.Type dstType = deviceService.getDevice(dst.getKey().deviceId()).type();
-
- // Only support connections between identical port types
- if (srcType != dstType) {
- log.warn("Unsupported mix of cross connect points : {}, {}", srcType, dstType);
- return false;
- }
- }
-
- return true;
+ return Optional.of(ImmutableList.copyOf(connectivity.value().links()));
}
/**
* Scans the list of cross connection points and returns a list of optical connectivity intents.
- * During the process, store intent ID and its realizing link information to given connectivity object.
+ * This method only builds the intents; it does not record any packet link information.
*
- * @param xcPointPairs list of cross connection points
+ * @param crossConnectPoints list of (src, dst) pairs between which optical paths will be set up
* @return list of optical connectivity intents
*/
- private List<Intent> createIntents(List<Pair<ConnectPoint, ConnectPoint>> xcPointPairs,
- OpticalConnectivity connectivity) {
- checkArgument(xcPointPairs.size() % 2 == 0);
-
+ private List<Intent> createIntents(List<Pair<ConnectPoint, ConnectPoint>> crossConnectPoints) {
List<Intent> intents = new LinkedList<>();
- Iterator<Pair<ConnectPoint, ConnectPoint>> itr = xcPointPairs.iterator();
+ Iterator<Pair<ConnectPoint, ConnectPoint>> itr = crossConnectPoints.iterator();
while (itr.hasNext()) {
// checkArgument at start ensures we'll always have pairs of connect points
- Pair<ConnectPoint, ConnectPoint> src = itr.next();
- Pair<ConnectPoint, ConnectPoint> dst = itr.next();
+ Pair<ConnectPoint, ConnectPoint> next = itr.next();
+ ConnectPoint src = next.getLeft();
+ ConnectPoint dst = next.getRight();
- Port srcPort = deviceService.getPort(src.getKey().deviceId(), src.getKey().port());
- Port dstPort = deviceService.getPort(dst.getKey().deviceId(), dst.getKey().port());
+ Port srcPort = deviceService.getPort(src.deviceId(), src.port());
+ Port dstPort = deviceService.getPort(dst.deviceId(), dst.port());
if (srcPort instanceof OduCltPort && dstPort instanceof OduCltPort) {
OduCltPort srcOCPort = (OduCltPort) srcPort;
@@ -365,16 +397,12 @@
// Create OTN circuit
OpticalCircuitIntent circuitIntent = OpticalCircuitIntent.builder()
.appId(appId)
- .src(src.getKey())
- .dst(dst.getKey())
+ .src(src)
+ .dst(dst)
.signalType(srcOCPort.signalType())
.bidirectional(true)
.build();
intents.add(circuitIntent);
- PacketLinkRealizedByOptical pLink = PacketLinkRealizedByOptical.create(src.getValue(), dst.getValue(),
- circuitIntent);
- connectivity.addRealizingLink(pLink);
- linkPathMap.put(pLink, connectivity);
} else if (srcPort instanceof OchPort && dstPort instanceof OchPort) {
OchPort srcOchPort = (OchPort) srcPort;
OchPort dstOchPort = (OchPort) dstPort;
@@ -385,16 +413,12 @@
// Create lightpath
OpticalConnectivityIntent opticalIntent = OpticalConnectivityIntent.builder()
.appId(appId)
- .src(src.getKey())
- .dst(dst.getKey())
+ .src(src)
+ .dst(dst)
.signalType(srcOchPort.signalType())
.bidirectional(true)
.build();
intents.add(opticalIntent);
- PacketLinkRealizedByOptical pLink = PacketLinkRealizedByOptical.create(src.getValue(), dst.getValue(),
- opticalIntent);
- connectivity.addRealizingLink(pLink);
- linkPathMap.put(pLink, connectivity);
} else {
log.warn("Unsupported cross connect point types {} {}", srcPort.type(), dstPort.type());
return Collections.emptyList();
@@ -404,6 +428,36 @@
return intents;
}
+ private Set<PacketLinkRealizedByOptical> createPacketLinkSet(List<Pair<ConnectPoint, ConnectPoint>> connectPoints,
+ List<Intent> intents,
+ Map<ConnectPoint, ConnectPoint> crossConnectPoints) {
+ checkArgument(connectPoints.size() == intents.size());
+
+ Set<PacketLinkRealizedByOptical> pLinks = new HashSet<>();
+
+ Iterator<Pair<ConnectPoint, ConnectPoint>> xcPointsItr = connectPoints.iterator();
+ Iterator<Intent> intentItr = intents.iterator();
+ while (xcPointsItr.hasNext()) {
+ Pair<ConnectPoint, ConnectPoint> xcPoints = xcPointsItr.next();
+ Intent intent = intentItr.next();
+
+ ConnectPoint packetSrc = checkNotNull(crossConnectPoints.get(xcPoints.getLeft()));
+ ConnectPoint packetDst = checkNotNull(crossConnectPoints.get(xcPoints.getRight()));
+
+ if (intent instanceof OpticalConnectivityIntent) {
+ pLinks.add(PacketLinkRealizedByOptical.create(packetSrc, packetDst,
+ (OpticalConnectivityIntent) intent));
+ } else if (intent instanceof OpticalCircuitIntent) {
+ pLinks.add(PacketLinkRealizedByOptical.create(packetSrc, packetDst,
+ (OpticalCircuitIntent) intent));
+ } else {
+ log.warn("Unexpected intent type: {}", intent.getClass());
+ }
+ }
+
+ return pLinks;
+ }
+
/**
* Verifies if given device type is in packet layer, i.e., ROADM, OTN or ROADM_OTN device.
*
@@ -444,17 +498,14 @@
/**
* Updates bandwidth resource of given connect point.
+ *
* @param cp Connect point
* @param bandwidth New bandwidth
*/
private void updatePortBandwidth(ConnectPoint cp, Bandwidth bandwidth) {
- NodeId localNode = clusterService.getLocalNode().id();
- NodeId sourceMaster = mastershipService.getMasterFor(cp.deviceId());
- if (localNode.equals(sourceMaster)) {
- log.debug("update Port {} Bandwidth {}", cp, bandwidth);
- BandwidthCapacity bwCapacity = networkConfigService.addConfig(cp, BandwidthCapacity.class);
- bwCapacity.capacity(bandwidth).apply();
- }
+ log.debug("update Port {} Bandwidth {}", cp, bandwidth);
+ BandwidthCapacity bwCapacity = networkConfigService.addConfig(cp, BandwidthCapacity.class);
+ bwCapacity.capacity(bandwidth).apply();
}
/**
@@ -487,15 +538,22 @@
* @param connectivity Optical connectivity
*/
private void releaseBandwidthUsage(OpticalConnectivity connectivity) {
- OpticalConnectivityId connectivityId = connectivity.id();
-
- log.debug("releasing bandwidth allocated to {}", connectivityId);
- if (!resourceService.release(connectivityId)) {
- log.warn("Failed to release bandwidth allocated to {}",
- connectivityId);
- // TODO any recovery?
+ if (connectivity.links().isEmpty()) {
+ return;
}
- log.debug("DONE releasing bandwidth for {}", connectivityId);
+
+ // release resource only if this node is the master for link head device
+ if (mastershipService.isLocalMaster(connectivity.links().get(0).src().deviceId())) {
+ OpticalConnectivityId connectivityId = connectivity.id();
+
+ log.debug("releasing bandwidth allocated to {}", connectivityId);
+ if (!resourceService.release(connectivityId)) {
+ log.warn("Failed to release bandwidth allocated to {}",
+ connectivityId);
+ // TODO any recovery?
+ }
+ log.debug("DONE releasing bandwidth for {}", connectivityId);
+ }
}
private class BandwidthLinkWeight implements LinkWeight {
@@ -515,7 +573,7 @@
}
// Avoid cross connect links with used ports
- if (isCrossConnectLink(l) && usedCrossConnectLinks.contains(l)) {
+ if (isCrossConnectLink(l) && usedCrossConnectLinkSet.contains(l)) {
return -1.0;
}
@@ -592,18 +650,16 @@
ConnectPoint packetSrc = e.getKey().src();
ConnectPoint packetDst = e.getKey().dst();
Bandwidth bw = e.getKey().bandwidth();
- // Updates bandwidth of packet ports
- updatePortBandwidth(packetSrc, bw);
- updatePortBandwidth(packetDst, bw);
- OpticalConnectivity connectivity = e.getValue();
- connectivity.setLinkEstablished(packetSrc, packetDst);
+ // reflect modification only if this node is the master of packetSrc's device
+ if (mastershipService.isLocalMaster(packetSrc.deviceId())) {
+ // Updates bandwidth of packet ports
+ updatePortBandwidth(packetSrc, bw);
+ updatePortBandwidth(packetDst, bw);
- if (e.getValue().isAllRealizingLinkEstablished()) {
- updateBandwidthUsage(connectivity);
-
- // Notifies listeners if all links are established
- post(new OpticalPathEvent(OpticalPathEvent.Type.PATH_INSTALLED, e.getValue().id()));
+ // Updates link status in distributed map
+ linkPathMap.computeIfPresent(e.getKey(), (link, connectivity) ->
+ e.getValue().value().setLinkEstablished(packetSrc, packetDst, true));
}
});
}
@@ -633,23 +689,23 @@
.forEach(e -> {
ConnectPoint packetSrc = e.getKey().src();
ConnectPoint packetDst = e.getKey().dst();
- // Updates bandwidth of packet ports
- updatePortBandwidth(packetSrc, bw);
- updatePortBandwidth(packetDst, bw);
- OpticalConnectivity connectivity = e.getValue();
- connectivity.setLinkRemoved(packetSrc, packetDst);
- // Notifies listeners if all links are gone
- if (e.getValue().isAllRealizingLinkNotEstablished()) {
- releaseBandwidthUsage(connectivity);
- post(new OpticalPathEvent(OpticalPathEvent.Type.PATH_REMOVED, e.getValue().id()));
+ // reflect modification only if this node is the master of packetSrc's device
+ if (mastershipService.isLocalMaster(packetSrc.deviceId())) {
+ // Updates bandwidth of packet ports
+ updatePortBandwidth(packetSrc, bw);
+ updatePortBandwidth(packetDst, bw);
+
+ // Updates link status in distributed map
+ linkPathMap.computeIfPresent(e.getKey(), (link, connectivity) ->
+ e.getValue().value().setLinkEstablished(packetSrc, packetDst, false));
}
});
}
private void removeXcLinkUsage(ConnectPoint cp) {
Optional<Link> link = linkService.getLinks(cp).stream()
- .filter(usedCrossConnectLinks::contains)
+ .filter(usedCrossConnectLinkSet::contains)
.findAny();
if (!link.isPresent()) {
@@ -657,7 +713,7 @@
return;
}
- usedCrossConnectLinks.remove(link.get());
+ usedCrossConnectLinkSet.remove(link.get());
}
}
@@ -669,22 +725,60 @@
switch (event.type()) {
case LINK_REMOVED:
Link link = event.subject();
+ // updates linkPathMap only if this node is the master of the link's src device
+ if (!mastershipService.isLocalMaster(link.src().deviceId())) {
+ return;
+ }
+
+ // find all packet links that correspond to removed link
Set<PacketLinkRealizedByOptical> pLinks = linkPathMap.keySet().stream()
.filter(l -> l.isBetween(link.src(), link.dst()) || l.isBetween(link.dst(), link.src()))
.collect(Collectors.toSet());
pLinks.forEach(l -> {
- OpticalConnectivity c = linkPathMap.get(l);
- // Notifies listeners if all links are gone
- if (c.isAllRealizingLinkNotEstablished()) {
- post(new OpticalPathEvent(OpticalPathEvent.Type.PATH_REMOVED, c.id()));
- }
- linkPathMap.remove(l);
+ // remove found packet links from distributed store
+ linkPathMap.computeIfPresent(l, (plink, conn) -> {
+ // Notifies listeners if all packet links are gone
+ if (conn.isAllRealizingLinkNotEstablished()) {
+ post(new OpticalPathEvent(OpticalPathEvent.Type.PATH_REMOVED, conn.id()));
+ }
+ return null;
+ });
});
default:
break;
}
}
}
+
+ private class InternalStoreListener
+ implements MapEventListener<PacketLinkRealizedByOptical, OpticalConnectivity> {
+
+ @Override
+ public void event(MapEvent<PacketLinkRealizedByOptical, OpticalConnectivity> event) {
+ switch (event.type()) {
+ case UPDATE:
+ OpticalConnectivity oldConnectivity = event.oldValue().value();
+ OpticalConnectivity newConnectivity = event.newValue().value();
+
+ if (!oldConnectivity.isAllRealizingLinkEstablished() &&
+ newConnectivity.isAllRealizingLinkEstablished()) {
+ // Notifies listeners if all links are established
+ updateBandwidthUsage(newConnectivity);
+ post(new OpticalPathEvent(OpticalPathEvent.Type.PATH_INSTALLED, newConnectivity.id()));
+ } else if (!oldConnectivity.isAllRealizingLinkNotEstablished() &&
+ newConnectivity.isAllRealizingLinkNotEstablished()) {
+ // Notifies listeners if all links are gone
+ releaseBandwidthUsage(newConnectivity);
+ post(new OpticalPathEvent(OpticalPathEvent.Type.PATH_REMOVED, newConnectivity.id()));
+ }
+
+ break;
+ default:
+ break;
+ }
+ }
+
+ }
}
diff --git a/apps/newoptical/src/main/java/org/onosproject/newoptical/PacketLinkRealizedByOptical.java b/apps/newoptical/src/main/java/org/onosproject/newoptical/PacketLinkRealizedByOptical.java
index 2e969c2..52728fa 100644
--- a/apps/newoptical/src/main/java/org/onosproject/newoptical/PacketLinkRealizedByOptical.java
+++ b/apps/newoptical/src/main/java/org/onosproject/newoptical/PacketLinkRealizedByOptical.java
@@ -33,9 +33,6 @@
private final Bandwidth bandwidth;
// TODO should be list of Intent Key?
private final Key realizingIntentKey;
- // established=false represents that this (packet) link is expected to be
- // discovered after underlying (optical) path has been provisioned.
- private boolean established;
/**
* Creates instance with specified parameters.
@@ -51,7 +48,6 @@
this.dst = dst;
this.realizingIntentKey = realizingIntentKey;
this.bandwidth = bandwidth;
- this.established = false;
}
/**
@@ -129,24 +125,6 @@
}
/**
- * Returns whether packet link is realized or not.
- *
- * @return true if packet link is realized. false if not.
- */
- public boolean isEstablished() {
- return established;
- }
-
- /**
- * Sets packet link to be established.
- *
- * @param established status of packet link
- */
- public void setEstablished(boolean established) {
- this.established = established;
- }
-
- /**
* Check if packet link is between specified two connect points.
*
* @param src source connect point
@@ -157,4 +135,36 @@
return (this.src.equals(src) && this.dst.equals(dst));
}
+    // Two packet links are equal when they are of the same class and agree on
+    // source, destination, bandwidth and realizing intent key.
+    @Override
+    public boolean equals(Object o) {
+        if (o == this) {
+            return true;
+        }
+        if (o == null || o.getClass() != getClass()) {
+            return false;
+        }
+
+        PacketLinkRealizedByOptical other = (PacketLinkRealizedByOptical) o;
+
+        // Fields are compared in declaration order; evaluation stops at the first mismatch.
+        return src.equals(other.src) &&
+                dst.equals(other.dst) &&
+                bandwidth.equals(other.bandwidth) &&
+                realizingIntentKey.equals(other.realizingIntentKey);
+    }
+
+    // Combines the hashes of the same four fields that equals() compares,
+    // folding them left to right with the multiplier 31.
+    @Override
+    public int hashCode() {
+        return 31 * (31 * (31 * src.hashCode()
+                + dst.hashCode())
+                + bandwidth.hashCode())
+                + realizingIntentKey.hashCode();
+    }
}
diff --git a/apps/newoptical/src/main/java/org/onosproject/newoptical/api/OpticalPathService.java b/apps/newoptical/src/main/java/org/onosproject/newoptical/api/OpticalPathService.java
index 9a31127..b3db4fe 100644
--- a/apps/newoptical/src/main/java/org/onosproject/newoptical/api/OpticalPathService.java
+++ b/apps/newoptical/src/main/java/org/onosproject/newoptical/api/OpticalPathService.java
@@ -33,7 +33,7 @@
public interface OpticalPathService extends ListenerService<OpticalPathEvent, OpticalPathListener> {
/**
- * Calculates optical path between connect points and sets up connectivity.
+ * Calculates multi-layer path between connect points and sets up connectivity.
*
* @param ingress ingress port
* @param egress egress port
@@ -45,9 +45,9 @@
Bandwidth bandwidth, Duration latency);
/**
- * Sets up connectivity along given optical path.
+ * Sets up connectivity along given multi-layer path including cross-connect links.
*
- * @param path path along which connectivity will be set up
+ * @param path multi-layer path along which connectivity will be set up
* @param bandwidth required bandwidth. No bandwidth is assured if null.
* @param latency required latency. No latency is assured if null.
* @return true if successful. false otherwise.
diff --git a/apps/newoptical/src/test/java/org/onosproject/newoptical/OpticalConnectivityTest.java b/apps/newoptical/src/test/java/org/onosproject/newoptical/OpticalConnectivityTest.java
index 8d0bc87..7535f7c 100644
--- a/apps/newoptical/src/test/java/org/onosproject/newoptical/OpticalConnectivityTest.java
+++ b/apps/newoptical/src/test/java/org/onosproject/newoptical/OpticalConnectivityTest.java
@@ -16,6 +16,7 @@
package org.onosproject.newoptical;
+import com.google.common.collect.ImmutableSet;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -39,7 +40,9 @@
import org.onosproject.newoptical.api.OpticalConnectivityId;
import java.time.Duration;
+import java.util.Collections;
import java.util.List;
+import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -94,10 +97,9 @@
Link link2 = createLink(cp22, cp31);
List<Link> links = Stream.of(link1, link2).collect(Collectors.toList());
- Path path = new MockPath(cp12, cp31, links);
-
OpticalConnectivityId cid = OpticalConnectivityId.of(1L);
- OpticalConnectivity oc = new OpticalConnectivity(cid, path, bandwidth, latency);
+ OpticalConnectivity oc = new OpticalConnectivity(cid, links, bandwidth, latency,
+ Collections.emptySet(), Collections.emptySet());
assertNotNull(oc);
assertEquals(oc.id(), cid);
@@ -133,8 +135,6 @@
Link link6 = createLink(cp62, cp71);
List<Link> links = Stream.of(link1, link2, link3, link4, link5, link6).collect(Collectors.toList());
- Path path = new MockPath(cp12, cp71, links);
-
// Mocks 2 intents to create OduCtl connectivity
OpticalConnectivityIntent connIntent1 = createConnectivityIntent(cp21, cp32);
PacketLinkRealizedByOptical oduLink1 = PacketLinkRealizedByOptical.create(cp12, cp41,
@@ -144,29 +144,29 @@
PacketLinkRealizedByOptical oduLink2 = PacketLinkRealizedByOptical.create(cp42, cp71,
connIntent2);
+ Set<PacketLinkRealizedByOptical> plinks = ImmutableSet.of(oduLink1, oduLink2);
+
Bandwidth bandwidth = Bandwidth.bps(100);
Duration latency = Duration.ofMillis(10);
OpticalConnectivityId cid = OpticalConnectivityId.of(1L);
- OpticalConnectivity oc = new OpticalConnectivity(cid, path, bandwidth, latency);
+ OpticalConnectivity oc1 = new OpticalConnectivity(cid, links, bandwidth, latency,
+ plinks, Collections.emptySet());
- oc.addRealizingLink(oduLink1);
- oc.addRealizingLink(oduLink2);
-
- assertTrue(oc.isAllRealizingLinkNotEstablished());
- assertFalse(oc.isAllRealizingLinkEstablished());
+ assertTrue(oc1.isAllRealizingLinkNotEstablished());
+ assertFalse(oc1.isAllRealizingLinkEstablished());
// Sets link realized by connIntent1 to be established
- oc.setLinkEstablished(cp12, cp41);
+ OpticalConnectivity oc2 = oc1.setLinkEstablished(cp12, cp41, true);
- assertFalse(oc.isAllRealizingLinkNotEstablished());
- assertFalse(oc.isAllRealizingLinkEstablished());
+ assertFalse(oc2.isAllRealizingLinkNotEstablished());
+ assertFalse(oc2.isAllRealizingLinkEstablished());
// Sets link realized by connIntent2 to be established
- oc.setLinkEstablished(cp42, cp71);
+ OpticalConnectivity oc3 = oc2.setLinkEstablished(cp42, cp71, true);
- assertFalse(oc.isAllRealizingLinkNotEstablished());
- assertTrue(oc.isAllRealizingLinkEstablished());
+ assertFalse(oc3.isAllRealizingLinkNotEstablished());
+ assertTrue(oc3.isAllRealizingLinkEstablished());
}
/**
@@ -196,8 +196,6 @@
Link link6 = createLink(cp62, cp71);
List<Link> links = Stream.of(link1, link2, link3, link4, link5, link6).collect(Collectors.toList());
- Path path = new MockPath(cp12, cp71, links);
-
// Mocks 2 intents to create Och connectivity
OpticalCircuitIntent circuitIntent1 = createCircuitIntent(cp21, cp32);
PacketLinkRealizedByOptical ochLink1 = PacketLinkRealizedByOptical.create(cp12, cp41,
@@ -207,29 +205,29 @@
PacketLinkRealizedByOptical ochLink2 = PacketLinkRealizedByOptical.create(cp42, cp71,
circuitIntent2);
+ Set<PacketLinkRealizedByOptical> plinks = ImmutableSet.of(ochLink1, ochLink2);
+
Bandwidth bandwidth = Bandwidth.bps(100);
Duration latency = Duration.ofMillis(10);
OpticalConnectivityId cid = OpticalConnectivityId.of(1L);
- OpticalConnectivity oc = new OpticalConnectivity(cid, path, bandwidth, latency);
+ OpticalConnectivity oc1 = new OpticalConnectivity(cid, links, bandwidth, latency,
+ plinks, Collections.emptySet());
- oc.addRealizingLink(ochLink1);
- oc.addRealizingLink(ochLink2);
-
- assertTrue(oc.isAllRealizingLinkNotEstablished());
- assertFalse(oc.isAllRealizingLinkEstablished());
+ assertTrue(oc1.isAllRealizingLinkNotEstablished());
+ assertFalse(oc1.isAllRealizingLinkEstablished());
// Sets link realized by circuitIntent1 to be established
- oc.setLinkEstablished(cp12, cp41);
+ OpticalConnectivity oc2 = oc1.setLinkEstablished(cp12, cp41, true);
- assertFalse(oc.isAllRealizingLinkNotEstablished());
- assertFalse(oc.isAllRealizingLinkEstablished());
+ assertFalse(oc2.isAllRealizingLinkNotEstablished());
+ assertFalse(oc2.isAllRealizingLinkEstablished());
// Sets link realized by circuitIntent2 to be established
- oc.setLinkEstablished(cp42, cp71);
+ OpticalConnectivity oc3 = oc2.setLinkEstablished(cp42, cp71, true);
- assertFalse(oc.isAllRealizingLinkNotEstablished());
- assertTrue(oc.isAllRealizingLinkEstablished());
+ assertFalse(oc3.isAllRealizingLinkNotEstablished());
+ assertTrue(oc3.isAllRealizingLinkEstablished());
}
private ConnectPoint createConnectPoint(long devIdNum, long portIdNum) {
diff --git a/apps/newoptical/src/test/java/org/onosproject/newoptical/OpticalPathProvisionerTest.java b/apps/newoptical/src/test/java/org/onosproject/newoptical/OpticalPathProvisionerTest.java
index efbaf77..b7d26e3 100644
--- a/apps/newoptical/src/test/java/org/onosproject/newoptical/OpticalPathProvisionerTest.java
+++ b/apps/newoptical/src/test/java/org/onosproject/newoptical/OpticalPathProvisionerTest.java
@@ -21,9 +21,12 @@
import org.junit.Before;
import org.junit.Test;
import org.onlab.packet.ChassisId;
+import org.onlab.packet.IpAddress;
import org.onlab.util.Bandwidth;
import org.onlab.util.Frequency;
import org.onosproject.cluster.ClusterServiceAdapter;
+import org.onosproject.cluster.ControllerNode;
+import org.onosproject.cluster.NodeId;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreServiceAdapter;
import org.onosproject.core.DefaultApplicationId;
@@ -44,12 +47,15 @@
import org.onosproject.net.DeviceId;
import org.onosproject.net.ElementId;
import org.onosproject.net.Link;
+import org.onosproject.net.MastershipRole;
import org.onosproject.net.OchSignal;
import org.onosproject.net.OduSignalType;
import org.onosproject.net.Path;
import org.onosproject.net.Port;
import org.onosproject.net.PortNumber;
+import org.onosproject.net.config.Config;
import org.onosproject.net.config.NetworkConfigServiceAdapter;
+import org.onosproject.net.config.basics.BandwidthCapacity;
import org.onosproject.net.device.DeviceServiceAdapter;
import org.onosproject.net.intent.Intent;
import org.onosproject.net.intent.IntentEvent;
@@ -74,8 +80,20 @@
import org.onosproject.newoptical.api.OpticalConnectivityId;
import org.onosproject.newoptical.api.OpticalPathEvent;
import org.onosproject.newoptical.api.OpticalPathListener;
+import org.onosproject.store.service.AsyncConsistentMap;
+import org.onosproject.store.service.AsyncDistributedSet;
import org.onosproject.store.service.AtomicCounter;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.ConsistentMapAdapter;
+import org.onosproject.store.service.ConsistentMapBuilder;
+import org.onosproject.store.service.DistributedSet;
+import org.onosproject.store.service.DistributedSetAdapter;
+import org.onosproject.store.service.DistributedSetBuilder;
+import org.onosproject.store.service.MapEvent;
+import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.SetEventListener;
import org.onosproject.store.service.StorageServiceAdapter;
+import org.onosproject.store.service.Versioned;
import java.time.Duration;
import java.util.ArrayList;
@@ -87,6 +105,8 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -155,6 +175,8 @@
protected TestLinkService linkService;
protected TestPathService pathService;
protected TestIntentService intentService;
+ protected TestMastershipService mastershipService;
+ protected TestClusterService clusterService;
protected IdGenerator idGenerator;
@Before
@@ -188,14 +210,24 @@
this.pathService = new TestPathService();
this.intentService = new TestIntentService();
+ this.mastershipService = new TestMastershipService();
+ this.clusterService = new TestClusterService();
+
+ mastershipService.setMastership(DEVICE1.id(), MastershipRole.MASTER);
+ mastershipService.setMastership(DEVICE2.id(), MastershipRole.MASTER);
+ mastershipService.setMastership(DEVICE3.id(), MastershipRole.MASTER);
+ mastershipService.setMastership(DEVICE4.id(), MastershipRole.MASTER);
+ mastershipService.setMastership(DEVICE5.id(), MastershipRole.MASTER);
+ mastershipService.setMastership(DEVICE6.id(), MastershipRole.MASTER);
+ mastershipService.setMastership(DEVICE7.id(), MastershipRole.MASTER);
this.target = new OpticalPathProvisioner();
target.coreService = new TestCoreService();
target.intentService = this.intentService;
target.pathService = this.pathService;
target.linkService = this.linkService;
- target.mastershipService = new TestMastershipService();
- target.clusterService = new TestClusterService();
+ target.mastershipService = this.mastershipService;
+ target.clusterService = this.clusterService;
target.storageService = new TestStorageService();
target.deviceService = this.deviceService;
target.networkConfigService = new TestNetworkConfigService();
@@ -310,15 +342,16 @@
}
/**
- * Checks if PATH_INSTALLED event comes up after intent is installed.
+ * Checks if PATH_INSTALLED event comes up after intent whose master is this node is installed.
*/
@Test
- public void testInstalledEvent() {
+ public void testInstalledEventLocal() {
Bandwidth bandwidth = Bandwidth.bps(100);
Duration latency = Duration.ofMillis(10);
OpticalConnectivityId cid = target.setupConnectivity(CP12, CP71, bandwidth, latency);
+ // notify all intents are installed
intentService.notifyInstalled();
assertEquals(1, listener.events.size());
@@ -327,26 +360,77 @@
}
/**
- * Checks if PATH_REMOVED event comes up after packet link is removed.
+ * Checks that no PATH_INSTALLED event comes up on this node when the master of the intent is a remote node.
*/
@Test
- public void testRemovedEvent() {
+ public void testInstalledEventRemote() {
+ // set the master for ingress device of intent to remote node
+ mastershipService.setMastership(DEVICE2.id(), MastershipRole.NONE);
+
Bandwidth bandwidth = Bandwidth.bps(100);
Duration latency = Duration.ofMillis(10);
OpticalConnectivityId cid = target.setupConnectivity(CP12, CP71, bandwidth, latency);
+ // notify all intents are installed
+ intentService.notifyInstalled();
+
+ // remote nodes must not receive event before distributed map is updated
+ assertEquals(0, listener.events.size());
+ }
+
+ /**
+ * Checks if PATH_REMOVED event comes up after packet link is removed.
+ */
+ @Test
+ public void testRemovedEventLocal() {
+ Bandwidth bandwidth = Bandwidth.bps(100);
+ Duration latency = Duration.ofMillis(10);
+
+ OpticalConnectivityId cid = target.setupConnectivity(CP12, CP71, bandwidth, latency);
+
+ // notify all intents are installed
intentService.notifyInstalled();
target.removeConnectivity(cid);
+ // notify all intents are withdrawn
intentService.notifyWithdrawn();
+ // must have received "INSTALLED" and "REMOVED" events
assertEquals(2, listener.events.size());
+ assertEquals(OpticalPathEvent.Type.PATH_INSTALLED, listener.events.get(0).type());
+ assertEquals(cid, listener.events.get(0).subject());
assertEquals(OpticalPathEvent.Type.PATH_REMOVED, listener.events.get(1).type());
assertEquals(cid, listener.events.get(1).subject());
}
+
+ /**
+ * Checks that neither PATH_INSTALLED nor PATH_REMOVED event comes up on this node
+ * when the ingress device of the intent is not mastered by this node.
+ */
+ @Test
+ public void testRemovedEventRemote() {
+ // set the master for ingress device of intent to remote node
+ mastershipService.setMastership(DEVICE2.id(), MastershipRole.NONE);
+
+ Bandwidth bandwidth = Bandwidth.bps(100);
+ Duration latency = Duration.ofMillis(10);
+
+ OpticalConnectivityId cid = target.setupConnectivity(CP12, CP71, bandwidth, latency);
+
+ // notify all intents are installed
+ intentService.notifyInstalled();
+
+ target.removeConnectivity(cid);
+
+ // notify all intents are withdrawn
+ intentService.notifyWithdrawn();
+
+ // remote nodes must not receive event before distributed map is updated
+ assertEquals(0, listener.events.size());
+ }
+
private static ConnectPoint createConnectPoint(long devIdNum, long portIdNum) {
return new ConnectPoint(
deviceIdOf(devIdNum),
@@ -495,18 +579,192 @@
}
private static class TestMastershipService extends MastershipServiceAdapter {
+ // Mastership stub: tests register the local role per device, and getLocalRole() reads it back.
+ private Map<DeviceId, MastershipRole> mastershipMap = new HashMap<>();
+
+ // Registers (or overwrites) the local role reported for the given device.
+ public void setMastership(DeviceId deviceId, MastershipRole role) {
+ mastershipMap.put(deviceId, role);
+ }
+
+ // Forgets all registered roles.
+ public void clear() {
+ mastershipMap.clear();
+ }
+
+ // Returns the registered role; null for a device that was never registered.
+ @Override
+ public MastershipRole getLocalRole(DeviceId deviceId) {
+ return mastershipMap.get(deviceId);
+ }
}
private static class TestClusterService extends ClusterServiceAdapter {
+ // Identifier reported for the local node; stays null until setLocalNode() is called.
+ private NodeId nodeId;
+ // Sets the identifier of the local node from its string form.
+ public void setLocalNode(String nodeIdStr) {
+ nodeId = NodeId.nodeId(nodeIdStr);
+ }
+
+ // Returns a minimal controller node: only id() carries data, ip() is null and tcpPort() is 0.
+ @Override
+ public ControllerNode getLocalNode() {
+ return new ControllerNode() {
+ @Override
+ public NodeId id() {
+ // read at call time, so a later setLocalNode() is visible through this node
+ return nodeId;
+ }
+
+ @Override
+ public IpAddress ip() {
+ return null;
+ }
+
+ @Override
+ public int tcpPort() {
+ return 0;
+ }
+ };
+ }
}
private static class TestStorageService extends StorageServiceAdapter {
+
+        // Supplies a builder whose build() yields a fresh in-memory TestConsistentMap.
+        // The asynchronous variant is not provided by this stub and yields null.
+        @Override
+        public <K, V> ConsistentMapBuilder<K, V> consistentMapBuilder() {
+            return new ConsistentMapBuilder<K, V>() {
+                @Override
+                public ConsistentMap<K, V> build() {
+                    return new TestConsistentMap<>();
+                }
+
+                @Override
+                public AsyncConsistentMap<K, V> buildAsyncMap() {
+                    return null;
+                }
+            };
+        }
+
+        // Supplies a builder for an async set adapter whose synchronous view
+        // (asDistributedSet) is a fresh in-memory TestDistributedSet on every call.
+        @Override
+        public <E> DistributedSetBuilder<E> setBuilder() {
+            return new DistributedSetBuilder<E>() {
+                @Override
+                public AsyncDistributedSet<E> build() {
+                    return new DistributedSetAdapter<E>() {
+                        @Override
+                        public DistributedSet<E> asDistributedSet() {
+                            return new TestDistributedSet<>();
+                        }
+                    };
+                }
+            };
+        }
+
@Override
public AtomicCounter getAtomicCounter(String name) {
+ // The name is not used: every call hands out a new mock counter.
return new MockAtomicCounter();
}
+
+        /**
+         * Mock ConsistentMap that behaves as a HashMap.
+         * <p>
+         * Values are wrapped in {@link Versioned}; the version is 0 for a new key and grows by
+         * one each time the key is overwritten. put, remove and computeIfPresent report their
+         * change to every registered listener through the executor given at registration.
+         */
+        class TestConsistentMap<K, V> extends ConsistentMapAdapter<K, V> {
+            private final Map<K, Versioned<V>> map = new HashMap<>();
+            private final Map<MapEventListener<K, V>, Executor> listeners = new HashMap<>();
+
+            /**
+             * Delivers the event to all registered listeners, each on its own executor.
+             *
+             * @param event event to deliver
+             */
+            public void notifyListeners(MapEvent<K, V> event) {
+                listeners.forEach((c, e) -> e.execute(() -> c.event(event)));
+            }
+
+            @Override
+            public int size() {
+                return map.size();
+            }
+
+            /**
+             * Stores the value under the key and notifies listeners.
+             *
+             * @return the previous versioned value, or null if the key was not mapped
+             */
+            @Override
+            public Versioned<V> put(K key, V value) {
+                Versioned<V> oldValue = map.get(key);
+                Versioned<V> newValue = new Versioned<>(value, oldValue == null ? 0 : oldValue.version() + 1);
+                map.put(key, newValue);
+                notifyListeners(new MapEvent<>(name(), key, newValue, oldValue));
+                // put hands back the previous mapping, not the one just written
+                return oldValue;
+            }
+
+            @Override
+            public Versioned<V> get(K key) {
+                return map.get(key);
+            }
+
+            /**
+             * Removes the mapping of the key and notifies listeners if there was one.
+             *
+             * @return the removed versioned value, or null if the key was not mapped
+             */
+            @Override
+            public Versioned<V> remove(K key) {
+                Versioned<V> oldValue = map.remove(key);
+                if (oldValue != null) {
+                    // same (new value, old value) argument order as in put(): nothing is current
+                    // any more, the removed value is the previous one
+                    notifyListeners(new MapEvent<>(name(), key, null, oldValue));
+                }
+                return oldValue;
+            }
+
+            /**
+             * Recomputes the value of a key that is present.
+             * <p>
+             * An absent key is left alone and the remapping function is not called. A null result
+             * of the remapping function removes the mapping instead of storing a null value.
+             *
+             * @return the new versioned value, or null if the key was absent or has been removed
+             */
+            @Override
+            public Versioned<V> computeIfPresent(K key,
+                                                 BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
+                Versioned<V> oldValue = map.get(key);
+                if (oldValue == null) {
+                    return null;
+                }
+
+                V computed = remappingFunction.apply(key, oldValue.value());
+                if (computed == null) {
+                    map.remove(key);
+                    notifyListeners(new MapEvent<>(name(), key, null, oldValue));
+                    return null;
+                }
+
+                Versioned<V> newValue = new Versioned<>(computed, oldValue.version() + 1);
+                map.put(key, newValue);
+                notifyListeners(new MapEvent<>(name(), key, newValue, oldValue));
+                return newValue;
+            }
+
+
+            @Override
+            public Set<Map.Entry<K, Versioned<V>>> entrySet() {
+                return map.entrySet();
+            }
+
+            @Override
+            public Set<K> keySet() {
+                return map.keySet();
+            }
+
+            @Override
+            public Collection<Versioned<V>> values() {
+                return map.values();
+            }
+
+            // Drops all entries; listeners are not notified.
+            @Override
+            public void clear() {
+                map.clear();
+            }
+
+            @Override
+            public void addListener(MapEventListener<K, V> listener, Executor executor) {
+                listeners.put(listener, executor);
+            }
+
+            @Override
+            public void removeListener(MapEventListener<K, V> listener) {
+                listeners.remove(listener);
+            }
+        }
+
+ // Mock DistributedSet that behaves as a HashSet.
+ // All collection operations come from HashSet; the members below are inert stubs.
+ class TestDistributedSet<E> extends HashSet<E> implements DistributedSet<E> {
+
+ // Listeners are not recorded, so they are never called back.
+ @Override
+ public void addListener(SetEventListener<E> listener) {
+ }
+
+ // Nothing to unregister, see addListener().
+ @Override
+ public void removeListener(SetEventListener<E> listener) {
+ }
+
+ // This mock has no name.
+ @Override
+ public String name() {
+ return null;
+ }
+
+ // This mock reports no primitive type.
+ @Override
+ public Type primitiveType() {
+ return null;
+ }
+ }
+
}
private static class TestDeviceService extends DeviceServiceAdapter {
@@ -525,6 +783,25 @@
}
private static class TestNetworkConfigService extends NetworkConfigServiceAdapter {
+        // Hands out an inert BandwidthCapacity when that config class is requested;
+        // any other config class yields null.
+        @Override
+        @SuppressWarnings("unchecked")
+        public <S, C extends Config<S>> C addConfig(S subject, Class<C> configClass) {
+            if (!BandwidthCapacity.class.equals(configClass)) {
+                return null;
+            }
+
+            BandwidthCapacity inertCapacity = new BandwidthCapacity() {
+                @Override
+                public BandwidthCapacity capacity(Bandwidth bandwidth) {
+                    // the value is dropped on purpose
+                    return this;
+                }
+
+                @Override
+                public void apply() {
+                    // nothing to apply
+                }
+            };
+            return (C) inertCapacity;
+        }
}