Make OpticalPathProvisioner store connectivity data in the distributed store. (ONOS-4518)
Change-Id: I7f9ef02cab4aa1848c8926d2e88478e035076c99
diff --git a/apps/newoptical/src/main/java/org/onosproject/newoptical/OpticalPathProvisioner.java b/apps/newoptical/src/main/java/org/onosproject/newoptical/OpticalPathProvisioner.java
index f64e579..906fe33 100644
--- a/apps/newoptical/src/main/java/org/onosproject/newoptical/OpticalPathProvisioner.java
+++ b/apps/newoptical/src/main/java/org/onosproject/newoptical/OpticalPathProvisioner.java
@@ -17,7 +17,6 @@
import com.google.common.annotations.Beta;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Sets;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.felix.scr.annotations.Activate;
@@ -27,8 +26,8 @@
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.util.Bandwidth;
+import org.onlab.util.KryoNamespace;
import org.onosproject.cluster.ClusterService;
-import org.onosproject.cluster.NodeId;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.event.ListenerTracker;
@@ -66,20 +65,29 @@
import org.onosproject.net.topology.LinkWeight;
import org.onosproject.net.topology.PathService;
import org.onosproject.net.topology.TopologyEdge;
+import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.AtomicCounter;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.DistributedSet;
+import org.onosproject.store.service.MapEvent;
+import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.Versioned;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -99,6 +107,9 @@
protected static final Logger log = LoggerFactory.getLogger(OpticalPathProvisioner.class);
private static final String OPTICAL_CONNECTIVITY_ID_COUNTER = "optical-connectivity-id";
+ private static final String LINKPATH_MAP_NAME = "newoptical-linkpath";
+ private static final String CONNECTIVITY_MAP_NAME = "newoptical-connectivity";
+ private static final String CROSSCONNECTLINK_SET_NAME = "newoptical-crossconnectlink";
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected IntentService intentService;
@@ -137,14 +148,28 @@
private ListenerTracker listeners;
- private Map<PacketLinkRealizedByOptical, OpticalConnectivity> linkPathMap = new ConcurrentHashMap<>();
+ private InternalStoreListener storeListener = new InternalStoreListener();
- // TODO this should be stored to distributed store
- private Map<OpticalConnectivityId, OpticalConnectivity> connectivities = new ConcurrentHashMap<>();
+ private ConsistentMap<PacketLinkRealizedByOptical, OpticalConnectivity> linkPathMap;
- // TODO this should be stored to distributed store
+ private ConsistentMap<OpticalConnectivityId, OpticalConnectivity> connectivityMap;
+
// Map of cross connect link and installed path which uses the link
- private Set<Link> usedCrossConnectLinks = Sets.newConcurrentHashSet();
+ private DistributedSet<Link> usedCrossConnectLinkSet;
+
+ private static final KryoNamespace.Builder LINKPATH_SERIALIZER = KryoNamespace.newBuilder()
+ .register(KryoNamespaces.API)
+ .register(PacketLinkRealizedByOptical.class)
+ .register(OpticalConnectivityId.class)
+ .register(OpticalConnectivity.class);
+
+ private static final KryoNamespace.Builder CONNECTIVITY_SERIALIZER = KryoNamespace.newBuilder()
+ .register(KryoNamespaces.API)
+ .register(OpticalConnectivityId.class)
+ .register(OpticalConnectivity.class);
+
+ private static final KryoNamespace.Builder CROSSCONNECTLINKS_SERIALIZER = KryoNamespace.newBuilder()
+ .register(KryoNamespaces.API);
@Activate
protected void activate() {
@@ -153,17 +178,39 @@
idCounter = storageService.getAtomicCounter(OPTICAL_CONNECTIVITY_ID_COUNTER);
+ linkPathMap = storageService.<PacketLinkRealizedByOptical, OpticalConnectivity>consistentMapBuilder()
+ .withSerializer(Serializer.using(LINKPATH_SERIALIZER.build()))
+ .withName(LINKPATH_MAP_NAME)
+ .withApplicationId(appId)
+ .build();
+
+ connectivityMap = storageService.<OpticalConnectivityId, OpticalConnectivity>consistentMapBuilder()
+ .withSerializer(Serializer.using(CONNECTIVITY_SERIALIZER.build()))
+ .withName(CONNECTIVITY_MAP_NAME)
+ .withApplicationId(appId)
+ .build();
+
+ usedCrossConnectLinkSet = storageService.<Link>setBuilder()
+ .withSerializer(Serializer.using(CROSSCONNECTLINKS_SERIALIZER.build()))
+ .withName(CROSSCONNECTLINK_SET_NAME)
+ .withApplicationId(appId)
+ .build()
+ .asDistributedSet();
+
eventDispatcher.addSink(OpticalPathEvent.class, listenerRegistry);
listeners = new ListenerTracker();
listeners.addListener(linkService, new InternalLinkListener())
.addListener(intentService, new InternalIntentListener());
+ linkPathMap.addListener(storeListener);
+
log.info("Started");
}
@Deactivate
protected void deactivate() {
+ linkPathMap.removeListener(storeListener);
listeners.removeListeners();
eventDispatcher.removeSink(OpticalPathEvent.class);
@@ -205,22 +252,67 @@
checkNotNull(path);
log.info("setupPath({}, {}, {})", path, bandwidth, latency);
- // validate optical path
- List<Pair<ConnectPoint, ConnectPoint>> xcPointPairs = getCrossConnectPoints(path);
- if (!checkXcPoints(xcPointPairs)) {
- // Can't setup path if cross connect points are mismatched
- log.error("Failed to setup path because of mismatched cross connect points.");
- return null;
+ // map of cross connect points (optical port -> packet port)
+ Map<ConnectPoint, ConnectPoint> crossConnectPointMap = new HashMap<>();
+
+ // list of (src, dst) pair of optical ports between which optical path should be installed
+ List<Pair<ConnectPoint, ConnectPoint>> crossConnectPoints = new ArrayList<>();
+
+ // Scan path to find pairs of connect points between which optical intent is installed
+ // opticalSrcPort works as a flag parameter to show scanning status
+ ConnectPoint opticalSrcPort = null;
+ for (Link link : path.links()) {
+ if (!isCrossConnectLink(link)) {
+ continue;
+ }
+
+ if (opticalSrcPort != null) {
+ // opticalSrcPort!=null means src port was already found
+ // in this case link.src() is optical layer, and link.dst() is packet layer
+
+ // Check if types of src port and dst port matches
+ Device srcDevice = checkNotNull(deviceService.getDevice(opticalSrcPort.deviceId()),
+ "Unknown device ID");
+ Device dstDevice = checkNotNull(deviceService.getDevice(link.src().deviceId()),
+ "Unknown device ID");
+ if (srcDevice.type() != dstDevice.type()) {
+ log.error("Unsupported mix of cross connect points : {}, {}",
+ srcDevice.type(), dstDevice.type());
+ return null;
+ }
+
+ // Update cross connect points map
+ crossConnectPointMap.put(link.src(), link.dst());
+
+ // Add optical ports pair to list
+ crossConnectPoints.add(Pair.of(opticalSrcPort, link.src()));
+
+ // Reset flag parameter
+ opticalSrcPort = null;
+ } else {
+ // opticalSrcPort==null means src port was not found yet
+ // in this case link.src() is packet layer, and link.dst() is optical layer
+
+ // Update cross connect points map
+ crossConnectPointMap.put(link.dst(), link.src());
+ // Set opticalSrcPort to dst of link (optical port)
+ opticalSrcPort = link.dst();
+ }
}
- OpticalConnectivity connectivity = createConnectivity(path, bandwidth, latency);
+ // create intents from cross connect points
+ List<Intent> intents = createIntents(crossConnectPoints);
- // create intents from cross connect points and set connectivity information
- List<Intent> intents = createIntents(xcPointPairs, connectivity);
+ // create set of PacketLinkRealizedByOptical
+ Set<PacketLinkRealizedByOptical> packetLinks = createPacketLinkSet(crossConnectPoints,
+ intents, crossConnectPointMap);
+
+ // create OpticalConnectivity object and store information to distributed store
+ OpticalConnectivity connectivity = createConnectivity(path, bandwidth, latency, packetLinks);
// store cross connect port usage
path.links().stream().filter(this::isCrossConnectLink)
- .forEach(usedCrossConnectLinks::add);
+ .forEach(usedCrossConnectLinkSet::add);
// Submit the intents
for (Intent i : intents) {
@@ -231,12 +323,16 @@
return connectivity.id();
}
- private OpticalConnectivity createConnectivity(Path path, Bandwidth bandwidth, Duration latency) {
+ private OpticalConnectivity createConnectivity(Path path, Bandwidth bandwidth, Duration latency,
+ Set<PacketLinkRealizedByOptical> links) {
OpticalConnectivityId id = OpticalConnectivityId.of(idCounter.getAndIncrement());
- OpticalConnectivity connectivity = new OpticalConnectivity(id, path, bandwidth, latency);
+ OpticalConnectivity connectivity = new OpticalConnectivity(id, path.links(), bandwidth, latency,
+ links, Collections.emptySet());
+
+ links.forEach(l -> linkPathMap.put(l, connectivity));
// store connectivity information
- connectivities.put(connectivity.id(), connectivity);
+ connectivityMap.put(connectivity.id(), connectivity);
return connectivity;
}
@@ -244,7 +340,7 @@
@Override
public boolean removeConnectivity(OpticalConnectivityId id) {
log.info("removeConnectivity({})", id);
- OpticalConnectivity connectivity = connectivities.remove(id);
+ Versioned<OpticalConnectivity> connectivity = connectivityMap.remove(id);
if (connectivity == null) {
log.info("OpticalConnectivity with id {} not found.", id);
@@ -252,7 +348,7 @@
}
// TODO withdraw intent only if all of connectivities that use the optical path are withdrawn
- connectivity.getRealizingLinks().forEach(l -> {
+ connectivity.value().getRealizingLinks().forEach(l -> {
Intent intent = intentService.getIntent(l.realizingIntentKey());
intentService.withdraw(intent);
});
@@ -262,98 +358,34 @@
@Override
public Optional<List<Link>> getPath(OpticalConnectivityId id) {
- OpticalConnectivity connectivity = connectivities.get(id);
+ Versioned<OpticalConnectivity> connectivity = connectivityMap.get(id);
if (connectivity == null) {
log.info("OpticalConnectivity with id {} not found.", id);
return Optional.empty();
}
- return Optional.of(ImmutableList.copyOf(connectivity.links()));
- }
-
- /**
- * Returns list of (optical, packet) pairs of cross connection points of missing optical path sections.
- *
- * Scans the given multi-layer path and looks for sections that use cross connect links.
- * The ingress and egress points in the optical layer are combined to the packet layer ports, and
- * are returned in a list.
- *
- * @param path the multi-layer path
- * @return List of cross connect link's (packet port, optical port) pairs
- */
- private List<Pair<ConnectPoint, ConnectPoint>> getCrossConnectPoints(Path path) {
- List<Pair<ConnectPoint, ConnectPoint>> xcPointPairs = new LinkedList<>();
- boolean scanning = false;
-
- for (Link link : path.links()) {
- if (!isCrossConnectLink(link)) {
- continue;
- }
-
- if (scanning) {
- // link.src() is packet, link.dst() is optical
- xcPointPairs.add(Pair.of(checkNotNull(link.src()), checkNotNull(link.dst())));
- scanning = false;
- } else {
- // link.src() is optical, link.dst() is packet
- xcPointPairs.add(Pair.of(checkNotNull(link.dst()), checkNotNull(link.src())));
- scanning = true;
- }
- }
-
- return xcPointPairs;
- }
-
- /**
- * Checks if optical cross connect points are of same type.
- *
- * @param xcPointPairs list of cross connection points
- * @return true if cross connect point pairs are of same type, false otherwise
- */
- private boolean checkXcPoints(List<Pair<ConnectPoint, ConnectPoint>> xcPointPairs) {
- checkArgument(xcPointPairs.size() % 2 == 0);
-
- Iterator<Pair<ConnectPoint, ConnectPoint>> itr = xcPointPairs.iterator();
-
- while (itr.hasNext()) {
- // checkArgument at start ensures we'll always have pairs of connect points
- Pair<ConnectPoint, ConnectPoint> src = itr.next();
- Pair<ConnectPoint, ConnectPoint> dst = itr.next();
-
- Device.Type srcType = deviceService.getDevice(src.getKey().deviceId()).type();
- Device.Type dstType = deviceService.getDevice(dst.getKey().deviceId()).type();
-
- // Only support connections between identical port types
- if (srcType != dstType) {
- log.warn("Unsupported mix of cross connect points : {}, {}", srcType, dstType);
- return false;
- }
- }
-
- return true;
+ return Optional.of(ImmutableList.copyOf(connectivity.value().links()));
}
/**
* Scans the list of cross connection points and returns a list of optical connectivity intents.
- * During the process, store intent ID and its realizing link information to given connectivity object.
+ * Packet link information is not recorded here; it is built separately by createPacketLinkSet().
*
- * @param xcPointPairs list of cross connection points
+ * @param crossConnectPoints list of (src, dst) pair between which optical path will be set up
* @return list of optical connectivity intents
*/
- private List<Intent> createIntents(List<Pair<ConnectPoint, ConnectPoint>> xcPointPairs,
- OpticalConnectivity connectivity) {
- checkArgument(xcPointPairs.size() % 2 == 0);
-
+ private List<Intent> createIntents(List<Pair<ConnectPoint, ConnectPoint>> crossConnectPoints) {
List<Intent> intents = new LinkedList<>();
- Iterator<Pair<ConnectPoint, ConnectPoint>> itr = xcPointPairs.iterator();
+ Iterator<Pair<ConnectPoint, ConnectPoint>> itr = crossConnectPoints.iterator();
while (itr.hasNext()) {
// checkArgument at start ensures we'll always have pairs of connect points
- Pair<ConnectPoint, ConnectPoint> src = itr.next();
- Pair<ConnectPoint, ConnectPoint> dst = itr.next();
+ Pair<ConnectPoint, ConnectPoint> next = itr.next();
+ ConnectPoint src = next.getLeft();
+ ConnectPoint dst = next.getRight();
- Port srcPort = deviceService.getPort(src.getKey().deviceId(), src.getKey().port());
- Port dstPort = deviceService.getPort(dst.getKey().deviceId(), dst.getKey().port());
+ Port srcPort = deviceService.getPort(src.deviceId(), src.port());
+ Port dstPort = deviceService.getPort(dst.deviceId(), dst.port());
if (srcPort instanceof OduCltPort && dstPort instanceof OduCltPort) {
OduCltPort srcOCPort = (OduCltPort) srcPort;
@@ -365,16 +397,12 @@
// Create OTN circuit
OpticalCircuitIntent circuitIntent = OpticalCircuitIntent.builder()
.appId(appId)
- .src(src.getKey())
- .dst(dst.getKey())
+ .src(src)
+ .dst(dst)
.signalType(srcOCPort.signalType())
.bidirectional(true)
.build();
intents.add(circuitIntent);
- PacketLinkRealizedByOptical pLink = PacketLinkRealizedByOptical.create(src.getValue(), dst.getValue(),
- circuitIntent);
- connectivity.addRealizingLink(pLink);
- linkPathMap.put(pLink, connectivity);
} else if (srcPort instanceof OchPort && dstPort instanceof OchPort) {
OchPort srcOchPort = (OchPort) srcPort;
OchPort dstOchPort = (OchPort) dstPort;
@@ -385,16 +413,12 @@
// Create lightpath
OpticalConnectivityIntent opticalIntent = OpticalConnectivityIntent.builder()
.appId(appId)
- .src(src.getKey())
- .dst(dst.getKey())
+ .src(src)
+ .dst(dst)
.signalType(srcOchPort.signalType())
.bidirectional(true)
.build();
intents.add(opticalIntent);
- PacketLinkRealizedByOptical pLink = PacketLinkRealizedByOptical.create(src.getValue(), dst.getValue(),
- opticalIntent);
- connectivity.addRealizingLink(pLink);
- linkPathMap.put(pLink, connectivity);
} else {
log.warn("Unsupported cross connect point types {} {}", srcPort.type(), dstPort.type());
return Collections.emptyList();
@@ -404,6 +428,36 @@
return intents;
}
+ private Set<PacketLinkRealizedByOptical> createPacketLinkSet(List<Pair<ConnectPoint, ConnectPoint>> connectPoints,
+ List<Intent> intents,
+ Map<ConnectPoint, ConnectPoint> crossConnectPoints) {
+ checkArgument(connectPoints.size() == intents.size());
+
+ Set<PacketLinkRealizedByOptical> pLinks = new HashSet<>();
+
+ Iterator<Pair<ConnectPoint, ConnectPoint>> xcPointsItr = connectPoints.iterator();
+ Iterator<Intent> intentItr = intents.iterator();
+ while (xcPointsItr.hasNext()) {
+ Pair<ConnectPoint, ConnectPoint> xcPoints = xcPointsItr.next();
+ Intent intent = intentItr.next();
+
+ ConnectPoint packetSrc = checkNotNull(crossConnectPoints.get(xcPoints.getLeft()));
+ ConnectPoint packetDst = checkNotNull(crossConnectPoints.get(xcPoints.getRight()));
+
+ if (intent instanceof OpticalConnectivityIntent) {
+ pLinks.add(PacketLinkRealizedByOptical.create(packetSrc, packetDst,
+ (OpticalConnectivityIntent) intent));
+ } else if (intent instanceof OpticalCircuitIntent) {
+ pLinks.add(PacketLinkRealizedByOptical.create(packetSrc, packetDst,
+ (OpticalCircuitIntent) intent));
+ } else {
+ log.warn("Unexpected intent type: {}", intent.getClass());
+ }
+ }
+
+ return pLinks;
+ }
+
/**
* Verifies if given device type is in packet layer, i.e., ROADM, OTN or ROADM_OTN device.
*
@@ -444,17 +498,14 @@
/**
* Updates bandwidth resource of given connect point.
+ *
* @param cp Connect point
* @param bandwidth New bandwidth
*/
private void updatePortBandwidth(ConnectPoint cp, Bandwidth bandwidth) {
- NodeId localNode = clusterService.getLocalNode().id();
- NodeId sourceMaster = mastershipService.getMasterFor(cp.deviceId());
- if (localNode.equals(sourceMaster)) {
- log.debug("update Port {} Bandwidth {}", cp, bandwidth);
- BandwidthCapacity bwCapacity = networkConfigService.addConfig(cp, BandwidthCapacity.class);
- bwCapacity.capacity(bandwidth).apply();
- }
+ log.debug("update Port {} Bandwidth {}", cp, bandwidth);
+ BandwidthCapacity bwCapacity = networkConfigService.addConfig(cp, BandwidthCapacity.class);
+ bwCapacity.capacity(bandwidth).apply();
}
/**
@@ -487,15 +538,22 @@
* @param connectivity Optical connectivity
*/
private void releaseBandwidthUsage(OpticalConnectivity connectivity) {
- OpticalConnectivityId connectivityId = connectivity.id();
-
- log.debug("releasing bandwidth allocated to {}", connectivityId);
- if (!resourceService.release(connectivityId)) {
- log.warn("Failed to release bandwidth allocated to {}",
- connectivityId);
- // TODO any recovery?
+ if (connectivity.links().isEmpty()) {
+ return;
}
- log.debug("DONE releasing bandwidth for {}", connectivityId);
+
+ // release resource only if this node is the master for link head device
+ if (mastershipService.isLocalMaster(connectivity.links().get(0).src().deviceId())) {
+ OpticalConnectivityId connectivityId = connectivity.id();
+
+ log.debug("releasing bandwidth allocated to {}", connectivityId);
+ if (!resourceService.release(connectivityId)) {
+ log.warn("Failed to release bandwidth allocated to {}",
+ connectivityId);
+ // TODO any recovery?
+ }
+ log.debug("DONE releasing bandwidth for {}", connectivityId);
+ }
}
private class BandwidthLinkWeight implements LinkWeight {
@@ -515,7 +573,7 @@
}
// Avoid cross connect links with used ports
- if (isCrossConnectLink(l) && usedCrossConnectLinks.contains(l)) {
+ if (isCrossConnectLink(l) && usedCrossConnectLinkSet.contains(l)) {
return -1.0;
}
@@ -592,18 +650,16 @@
ConnectPoint packetSrc = e.getKey().src();
ConnectPoint packetDst = e.getKey().dst();
Bandwidth bw = e.getKey().bandwidth();
- // Updates bandwidth of packet ports
- updatePortBandwidth(packetSrc, bw);
- updatePortBandwidth(packetDst, bw);
- OpticalConnectivity connectivity = e.getValue();
- connectivity.setLinkEstablished(packetSrc, packetDst);
+ // reflect modification only if this node is the master of packetSrc's device
+ if (mastershipService.isLocalMaster(packetSrc.deviceId())) {
+ // Updates bandwidth of packet ports
+ updatePortBandwidth(packetSrc, bw);
+ updatePortBandwidth(packetDst, bw);
- if (e.getValue().isAllRealizingLinkEstablished()) {
- updateBandwidthUsage(connectivity);
-
- // Notifies listeners if all links are established
- post(new OpticalPathEvent(OpticalPathEvent.Type.PATH_INSTALLED, e.getValue().id()));
+ // Updates link status in distributed map
+ linkPathMap.computeIfPresent(e.getKey(), (link, connectivity) ->
+ e.getValue().value().setLinkEstablished(packetSrc, packetDst, true));
}
});
}
@@ -633,23 +689,23 @@
.forEach(e -> {
ConnectPoint packetSrc = e.getKey().src();
ConnectPoint packetDst = e.getKey().dst();
- // Updates bandwidth of packet ports
- updatePortBandwidth(packetSrc, bw);
- updatePortBandwidth(packetDst, bw);
- OpticalConnectivity connectivity = e.getValue();
- connectivity.setLinkRemoved(packetSrc, packetDst);
- // Notifies listeners if all links are gone
- if (e.getValue().isAllRealizingLinkNotEstablished()) {
- releaseBandwidthUsage(connectivity);
- post(new OpticalPathEvent(OpticalPathEvent.Type.PATH_REMOVED, e.getValue().id()));
+ // reflect modification only if this node is the master of packetSrc's device
+ if (mastershipService.isLocalMaster(packetSrc.deviceId())) {
+ // Updates bandwidth of packet ports
+ updatePortBandwidth(packetSrc, bw);
+ updatePortBandwidth(packetDst, bw);
+
+ // Updates link status in distributed map
+ linkPathMap.computeIfPresent(e.getKey(), (link, connectivity) ->
+ e.getValue().value().setLinkEstablished(packetSrc, packetDst, false));
}
});
}
private void removeXcLinkUsage(ConnectPoint cp) {
Optional<Link> link = linkService.getLinks(cp).stream()
- .filter(usedCrossConnectLinks::contains)
+ .filter(usedCrossConnectLinkSet::contains)
.findAny();
if (!link.isPresent()) {
@@ -657,7 +713,7 @@
return;
}
- usedCrossConnectLinks.remove(link.get());
+ usedCrossConnectLinkSet.remove(link.get());
}
}
@@ -669,22 +725,60 @@
switch (event.type()) {
case LINK_REMOVED:
Link link = event.subject();
+ // updates linkPathMap only if src device of link is local
+ if (!mastershipService.isLocalMaster(link.src().deviceId())) {
+ return;
+ }
+
+ // find all packet links that correspond to removed link
Set<PacketLinkRealizedByOptical> pLinks = linkPathMap.keySet().stream()
.filter(l -> l.isBetween(link.src(), link.dst()) || l.isBetween(link.dst(), link.src()))
.collect(Collectors.toSet());
pLinks.forEach(l -> {
- OpticalConnectivity c = linkPathMap.get(l);
- // Notifies listeners if all links are gone
- if (c.isAllRealizingLinkNotEstablished()) {
- post(new OpticalPathEvent(OpticalPathEvent.Type.PATH_REMOVED, c.id()));
- }
- linkPathMap.remove(l);
+ // remove found packet links from distributed store
+ linkPathMap.computeIfPresent(l, (plink, conn) -> {
+ // Notifies listeners if all packet links are gone
+ if (conn.isAllRealizingLinkNotEstablished()) {
+ post(new OpticalPathEvent(OpticalPathEvent.Type.PATH_REMOVED, conn.id()));
+ }
+ return null;
+ });
});
default:
break;
}
}
}
+
+ private class InternalStoreListener
+ implements MapEventListener<PacketLinkRealizedByOptical, OpticalConnectivity> {
+
+ @Override
+ public void event(MapEvent<PacketLinkRealizedByOptical, OpticalConnectivity> event) {
+ switch (event.type()) {
+ case UPDATE:
+ OpticalConnectivity oldConnectivity = event.oldValue().value();
+ OpticalConnectivity newConnectivity = event.newValue().value();
+
+ if (!oldConnectivity.isAllRealizingLinkEstablished() &&
+ newConnectivity.isAllRealizingLinkEstablished()) {
+ // Notifies listeners if all links are established
+ updateBandwidthUsage(newConnectivity);
+ post(new OpticalPathEvent(OpticalPathEvent.Type.PATH_INSTALLED, newConnectivity.id()));
+ } else if (!oldConnectivity.isAllRealizingLinkNotEstablished() &&
+ newConnectivity.isAllRealizingLinkNotEstablished()) {
+ // Notifies listeners if all links are gone
+ releaseBandwidthUsage(newConnectivity);
+ post(new OpticalPathEvent(OpticalPathEvent.Type.PATH_REMOVED, newConnectivity.id()));
+ }
+
+ break;
+ default:
+ break;
+ }
+ }
+
+ }
}