Make OpticalPathProvisioner store connectivity data in the distributed store. (ONOS-4518)

Change-Id: I7f9ef02cab4aa1848c8926d2e88478e035076c99
diff --git a/apps/newoptical/src/main/java/org/onosproject/newoptical/OpticalPathProvisioner.java b/apps/newoptical/src/main/java/org/onosproject/newoptical/OpticalPathProvisioner.java
index f64e579..906fe33 100644
--- a/apps/newoptical/src/main/java/org/onosproject/newoptical/OpticalPathProvisioner.java
+++ b/apps/newoptical/src/main/java/org/onosproject/newoptical/OpticalPathProvisioner.java
@@ -17,7 +17,6 @@
 
 import com.google.common.annotations.Beta;
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Sets;
 
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.felix.scr.annotations.Activate;
@@ -27,8 +26,8 @@
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
 import org.onlab.util.Bandwidth;
+import org.onlab.util.KryoNamespace;
 import org.onosproject.cluster.ClusterService;
-import org.onosproject.cluster.NodeId;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreService;
 import org.onosproject.event.ListenerTracker;
@@ -66,20 +65,29 @@
 import org.onosproject.net.topology.LinkWeight;
 import org.onosproject.net.topology.PathService;
 import org.onosproject.net.topology.TopologyEdge;
+import org.onosproject.store.serializers.KryoNamespaces;
 import org.onosproject.store.service.AtomicCounter;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.DistributedSet;
+import org.onosproject.store.service.MapEvent;
+import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.Serializer;
 import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.Versioned;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.time.Duration;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -99,6 +107,9 @@
     protected static final Logger log = LoggerFactory.getLogger(OpticalPathProvisioner.class);
 
     private static final String OPTICAL_CONNECTIVITY_ID_COUNTER = "optical-connectivity-id";
+    private static final String LINKPATH_MAP_NAME = "newoptical-linkpath";
+    private static final String CONNECTIVITY_MAP_NAME = "newoptical-connectivity";
+    private static final String CROSSCONNECTLINK_SET_NAME = "newoptical-crossconnectlink";
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected IntentService intentService;
@@ -137,14 +148,28 @@
 
     private ListenerTracker listeners;
 
-    private Map<PacketLinkRealizedByOptical, OpticalConnectivity> linkPathMap = new ConcurrentHashMap<>();
+    private InternalStoreListener storeListener = new InternalStoreListener();
 
-    // TODO this should be stored to distributed store
-    private Map<OpticalConnectivityId, OpticalConnectivity> connectivities = new ConcurrentHashMap<>();
+    private ConsistentMap<PacketLinkRealizedByOptical, OpticalConnectivity> linkPathMap;
 
-    // TODO this should be stored to distributed store
+    private ConsistentMap<OpticalConnectivityId, OpticalConnectivity> connectivityMap;
+
     // Map of cross connect link and installed path which uses the link
-    private Set<Link> usedCrossConnectLinks = Sets.newConcurrentHashSet();
+    private DistributedSet<Link> usedCrossConnectLinkSet;
+
+    private static final KryoNamespace.Builder LINKPATH_SERIALIZER = KryoNamespace.newBuilder()
+            .register(KryoNamespaces.API)
+            .register(PacketLinkRealizedByOptical.class)
+            .register(OpticalConnectivityId.class)
+            .register(OpticalConnectivity.class);
+
+    private static final KryoNamespace.Builder CONNECTIVITY_SERIALIZER = KryoNamespace.newBuilder()
+            .register(KryoNamespaces.API)
+            .register(OpticalConnectivityId.class)
+            .register(OpticalConnectivity.class);
+
+    private static final KryoNamespace.Builder CROSSCONNECTLINKS_SERIALIZER = KryoNamespace.newBuilder()
+            .register(KryoNamespaces.API);
 
     @Activate
     protected void activate() {
@@ -153,17 +178,39 @@
 
         idCounter = storageService.getAtomicCounter(OPTICAL_CONNECTIVITY_ID_COUNTER);
 
+        linkPathMap = storageService.<PacketLinkRealizedByOptical, OpticalConnectivity>consistentMapBuilder()
+                .withSerializer(Serializer.using(LINKPATH_SERIALIZER.build()))
+                .withName(LINKPATH_MAP_NAME)
+                .withApplicationId(appId)
+                .build();
+
+        connectivityMap = storageService.<OpticalConnectivityId, OpticalConnectivity>consistentMapBuilder()
+                .withSerializer(Serializer.using(CONNECTIVITY_SERIALIZER.build()))
+                .withName(CONNECTIVITY_MAP_NAME)
+                .withApplicationId(appId)
+                .build();
+
+        usedCrossConnectLinkSet = storageService.<Link>setBuilder()
+                .withSerializer(Serializer.using(CROSSCONNECTLINKS_SERIALIZER.build()))
+                .withName(CROSSCONNECTLINK_SET_NAME)
+                .withApplicationId(appId)
+                .build()
+                .asDistributedSet();
+
         eventDispatcher.addSink(OpticalPathEvent.class, listenerRegistry);
 
         listeners = new ListenerTracker();
         listeners.addListener(linkService, new InternalLinkListener())
                 .addListener(intentService, new InternalIntentListener());
 
+        linkPathMap.addListener(storeListener);
+
         log.info("Started");
     }
 
     @Deactivate
     protected void deactivate() {
+        linkPathMap.removeListener(storeListener);
         listeners.removeListeners();
         eventDispatcher.removeSink(OpticalPathEvent.class);
 
@@ -205,22 +252,67 @@
         checkNotNull(path);
         log.info("setupPath({}, {}, {})", path, bandwidth, latency);
 
-        // validate optical path
-        List<Pair<ConnectPoint, ConnectPoint>> xcPointPairs = getCrossConnectPoints(path);
-        if (!checkXcPoints(xcPointPairs)) {
-            // Can't setup path if cross connect points are mismatched
-            log.error("Failed to setup path because of mismatched cross connect points.");
-            return null;
+        // map of cross connect points (optical port -> packet port)
+        Map<ConnectPoint, ConnectPoint> crossConnectPointMap = new HashMap<>();
+
+        // list of (src, dst) pair of optical ports between which optical path should be installed
+        List<Pair<ConnectPoint, ConnectPoint>> crossConnectPoints = new ArrayList<>();
+
+        // Scan path to find pairs of connect points between which optical intent is installed
+        // opticalSrcPort works as a flag parameter to show scanning status
+        ConnectPoint opticalSrcPort = null;
+        for (Link link : path.links()) {
+            if (!isCrossConnectLink(link)) {
+                continue;
+            }
+
+            if (opticalSrcPort != null) {
+                // opticalSrcPort!=null means src port was already found
+                // in this case link.src() is optical layer, and link.dst() is packet layer
+
+                // Check if types of src port and dst port matches
+                Device srcDevice = checkNotNull(deviceService.getDevice(opticalSrcPort.deviceId()),
+                        "Unknown device ID");
+                Device dstDevice = checkNotNull(deviceService.getDevice(link.src().deviceId()),
+                        "Unknown device ID");
+                if (srcDevice.type() != dstDevice.type()) {
+                    log.error("Unsupported mix of cross connect points : {}, {}",
+                            srcDevice.type(), dstDevice.type());
+                    return null;
+                }
+
+                // Update cross connect points map
+                crossConnectPointMap.put(link.src(), link.dst());
+
+                // Add optical ports pair to list
+                crossConnectPoints.add(Pair.of(opticalSrcPort, link.src()));
+
+                // Reset flag parameter
+                opticalSrcPort = null;
+            } else {
+                // opticalSrcPort==null means src port was not found yet
+                // in this case link.src() is packet layer, and link.dst() is optical layer
+
+                // Update cross connect points map
+                crossConnectPointMap.put(link.dst(), link.src());
+                // Remember link.dst() (optical port) as the src of the optical path
+                opticalSrcPort = link.dst();
+            }
         }
 
-        OpticalConnectivity connectivity = createConnectivity(path, bandwidth, latency);
+        // create intents from cross connect points
+        List<Intent> intents = createIntents(crossConnectPoints);
 
-        // create intents from cross connect points and set connectivity information
-        List<Intent> intents = createIntents(xcPointPairs, connectivity);
+        // create set of PacketLinkRealizedByOptical
+        Set<PacketLinkRealizedByOptical> packetLinks = createPacketLinkSet(crossConnectPoints,
+                intents, crossConnectPointMap);
+
+        // create OpticalConnectivity object and store information to distributed store
+        OpticalConnectivity connectivity = createConnectivity(path, bandwidth, latency, packetLinks);
 
         // store cross connect port usage
         path.links().stream().filter(this::isCrossConnectLink)
-                .forEach(usedCrossConnectLinks::add);
+                .forEach(usedCrossConnectLinkSet::add);
 
         // Submit the intents
         for (Intent i : intents) {
@@ -231,12 +323,16 @@
         return connectivity.id();
     }
 
-    private OpticalConnectivity createConnectivity(Path path, Bandwidth bandwidth, Duration latency) {
+    private OpticalConnectivity createConnectivity(Path path, Bandwidth bandwidth, Duration latency,
+                                                   Set<PacketLinkRealizedByOptical> links) {
         OpticalConnectivityId id = OpticalConnectivityId.of(idCounter.getAndIncrement());
-        OpticalConnectivity connectivity = new OpticalConnectivity(id, path, bandwidth, latency);
+        OpticalConnectivity connectivity = new OpticalConnectivity(id, path.links(), bandwidth, latency,
+                links, Collections.emptySet());
+
+        links.forEach(l -> linkPathMap.put(l, connectivity));
 
         // store connectivity information
-        connectivities.put(connectivity.id(), connectivity);
+        connectivityMap.put(connectivity.id(), connectivity);
 
         return connectivity;
     }
@@ -244,7 +340,7 @@
     @Override
     public boolean removeConnectivity(OpticalConnectivityId id) {
         log.info("removeConnectivity({})", id);
-        OpticalConnectivity connectivity = connectivities.remove(id);
+        Versioned<OpticalConnectivity> connectivity = connectivityMap.remove(id);
 
         if (connectivity == null) {
             log.info("OpticalConnectivity with id {} not found.", id);
@@ -252,7 +348,7 @@
         }
 
         // TODO withdraw intent only if all of connectivities that use the optical path are withdrawn
-        connectivity.getRealizingLinks().forEach(l -> {
+        connectivity.value().getRealizingLinks().forEach(l -> {
             Intent intent = intentService.getIntent(l.realizingIntentKey());
             intentService.withdraw(intent);
         });
@@ -262,98 +358,34 @@
 
     @Override
     public Optional<List<Link>> getPath(OpticalConnectivityId id) {
-        OpticalConnectivity connectivity = connectivities.get(id);
+        Versioned<OpticalConnectivity> connectivity = connectivityMap.get(id);
         if (connectivity == null) {
             log.info("OpticalConnectivity with id {} not found.", id);
             return Optional.empty();
         }
 
-        return Optional.of(ImmutableList.copyOf(connectivity.links()));
-    }
-
-    /**
-     * Returns list of (optical, packet) pairs of cross connection points of missing optical path sections.
-     *
-     * Scans the given multi-layer path and looks for sections that use cross connect links.
-     * The ingress and egress points in the optical layer are combined to the packet layer ports, and
-     * are returned in a list.
-     *
-     * @param path the multi-layer path
-     * @return List of cross connect link's (packet port, optical port) pairs
-     */
-    private List<Pair<ConnectPoint, ConnectPoint>> getCrossConnectPoints(Path path) {
-        List<Pair<ConnectPoint, ConnectPoint>> xcPointPairs = new LinkedList<>();
-        boolean scanning = false;
-
-        for (Link link : path.links()) {
-            if (!isCrossConnectLink(link)) {
-                continue;
-            }
-
-            if (scanning) {
-                // link.src() is packet, link.dst() is optical
-                xcPointPairs.add(Pair.of(checkNotNull(link.src()), checkNotNull(link.dst())));
-                scanning = false;
-            } else {
-                // link.src() is optical, link.dst() is packet
-                xcPointPairs.add(Pair.of(checkNotNull(link.dst()), checkNotNull(link.src())));
-                scanning = true;
-            }
-        }
-
-        return xcPointPairs;
-    }
-
-    /**
-     * Checks if optical cross connect points are of same type.
-     *
-     * @param xcPointPairs list of cross connection points
-     * @return true if cross connect point pairs are of same type, false otherwise
-     */
-    private boolean checkXcPoints(List<Pair<ConnectPoint, ConnectPoint>> xcPointPairs) {
-        checkArgument(xcPointPairs.size() % 2 == 0);
-
-        Iterator<Pair<ConnectPoint, ConnectPoint>> itr = xcPointPairs.iterator();
-
-        while (itr.hasNext()) {
-            // checkArgument at start ensures we'll always have pairs of connect points
-            Pair<ConnectPoint, ConnectPoint> src = itr.next();
-            Pair<ConnectPoint, ConnectPoint> dst = itr.next();
-
-            Device.Type srcType = deviceService.getDevice(src.getKey().deviceId()).type();
-            Device.Type dstType = deviceService.getDevice(dst.getKey().deviceId()).type();
-
-            // Only support connections between identical port types
-            if (srcType != dstType) {
-                log.warn("Unsupported mix of cross connect points : {}, {}", srcType, dstType);
-                return false;
-            }
-        }
-
-        return true;
+        return Optional.of(ImmutableList.copyOf(connectivity.value().links()));
     }
 
     /**
      * Scans the list of cross connection points and returns a list of optical connectivity intents.
-     * During the process, store intent ID and its realizing link information to given connectivity object.
+     * Packet link information is not recorded here; createPacketLinkSet() derives it from the result.
      *
-     * @param xcPointPairs list of cross connection points
+     * @param crossConnectPoints list of (src, dst) pair between which optical path will be set up
      * @return list of optical connectivity intents
      */
-    private List<Intent> createIntents(List<Pair<ConnectPoint, ConnectPoint>> xcPointPairs,
-                                       OpticalConnectivity connectivity) {
-        checkArgument(xcPointPairs.size() % 2 == 0);
-
+    private List<Intent> createIntents(List<Pair<ConnectPoint, ConnectPoint>> crossConnectPoints) {
         List<Intent> intents = new LinkedList<>();
-        Iterator<Pair<ConnectPoint, ConnectPoint>> itr = xcPointPairs.iterator();
+        Iterator<Pair<ConnectPoint, ConnectPoint>> itr = crossConnectPoints.iterator();
 
         while (itr.hasNext()) {
             // checkArgument at start ensures we'll always have pairs of connect points
-            Pair<ConnectPoint, ConnectPoint> src = itr.next();
-            Pair<ConnectPoint, ConnectPoint> dst = itr.next();
+            Pair<ConnectPoint, ConnectPoint> next = itr.next();
+            ConnectPoint src = next.getLeft();
+            ConnectPoint dst = next.getRight();
 
-            Port srcPort = deviceService.getPort(src.getKey().deviceId(), src.getKey().port());
-            Port dstPort = deviceService.getPort(dst.getKey().deviceId(), dst.getKey().port());
+            Port srcPort = deviceService.getPort(src.deviceId(), src.port());
+            Port dstPort = deviceService.getPort(dst.deviceId(), dst.port());
 
             if (srcPort instanceof OduCltPort && dstPort instanceof OduCltPort) {
                 OduCltPort srcOCPort = (OduCltPort) srcPort;
@@ -365,16 +397,12 @@
                 // Create OTN circuit
                 OpticalCircuitIntent circuitIntent = OpticalCircuitIntent.builder()
                         .appId(appId)
-                        .src(src.getKey())
-                        .dst(dst.getKey())
+                        .src(src)
+                        .dst(dst)
                         .signalType(srcOCPort.signalType())
                         .bidirectional(true)
                         .build();
                 intents.add(circuitIntent);
-                PacketLinkRealizedByOptical pLink = PacketLinkRealizedByOptical.create(src.getValue(), dst.getValue(),
-                        circuitIntent);
-                connectivity.addRealizingLink(pLink);
-                linkPathMap.put(pLink, connectivity);
             } else if (srcPort instanceof OchPort && dstPort instanceof OchPort) {
                 OchPort srcOchPort = (OchPort) srcPort;
                 OchPort dstOchPort = (OchPort) dstPort;
@@ -385,16 +413,12 @@
                 // Create lightpath
                 OpticalConnectivityIntent opticalIntent = OpticalConnectivityIntent.builder()
                         .appId(appId)
-                        .src(src.getKey())
-                        .dst(dst.getKey())
+                        .src(src)
+                        .dst(dst)
                         .signalType(srcOchPort.signalType())
                         .bidirectional(true)
                         .build();
                 intents.add(opticalIntent);
-                PacketLinkRealizedByOptical pLink = PacketLinkRealizedByOptical.create(src.getValue(), dst.getValue(),
-                        opticalIntent);
-                connectivity.addRealizingLink(pLink);
-                linkPathMap.put(pLink, connectivity);
             } else {
                 log.warn("Unsupported cross connect point types {} {}", srcPort.type(), dstPort.type());
                 return Collections.emptyList();
@@ -404,6 +428,36 @@
         return intents;
     }
 
+    private Set<PacketLinkRealizedByOptical> createPacketLinkSet(List<Pair<ConnectPoint, ConnectPoint>> connectPoints,
+                                                                 List<Intent> intents,
+                                                                 Map<ConnectPoint, ConnectPoint> crossConnectPoints) {
+        checkArgument(connectPoints.size() == intents.size());
+
+        Set<PacketLinkRealizedByOptical> pLinks = new HashSet<>();
+
+        Iterator<Pair<ConnectPoint, ConnectPoint>> xcPointsItr = connectPoints.iterator();
+        Iterator<Intent> intentItr = intents.iterator();
+        while (xcPointsItr.hasNext()) {
+            Pair<ConnectPoint, ConnectPoint> xcPoints = xcPointsItr.next();
+            Intent intent = intentItr.next();
+
+            ConnectPoint packetSrc = checkNotNull(crossConnectPoints.get(xcPoints.getLeft()));
+            ConnectPoint packetDst = checkNotNull(crossConnectPoints.get(xcPoints.getRight()));
+
+            if (intent instanceof OpticalConnectivityIntent) {
+                pLinks.add(PacketLinkRealizedByOptical.create(packetSrc, packetDst,
+                        (OpticalConnectivityIntent) intent));
+            } else if (intent instanceof OpticalCircuitIntent) {
+                pLinks.add(PacketLinkRealizedByOptical.create(packetSrc, packetDst,
+                        (OpticalCircuitIntent) intent));
+            } else {
+                log.warn("Unexpected intent type: {}", intent.getClass());
+            }
+        }
+
+        return pLinks;
+    }
+
     /**
      * Verifies if given device type is in packet layer, i.e., ROADM, OTN or ROADM_OTN device.
      *
@@ -444,17 +498,14 @@
 
     /**
      * Updates bandwidth resource of given connect point.
+     *
      * @param cp Connect point
      * @param bandwidth New bandwidth
      */
     private void updatePortBandwidth(ConnectPoint cp, Bandwidth bandwidth) {
-        NodeId localNode = clusterService.getLocalNode().id();
-        NodeId sourceMaster = mastershipService.getMasterFor(cp.deviceId());
-        if (localNode.equals(sourceMaster)) {
-            log.debug("update Port {} Bandwidth {}", cp, bandwidth);
-            BandwidthCapacity bwCapacity = networkConfigService.addConfig(cp, BandwidthCapacity.class);
-            bwCapacity.capacity(bandwidth).apply();
-        }
+        log.debug("update Port {} Bandwidth {}", cp, bandwidth);
+        BandwidthCapacity bwCapacity = networkConfigService.addConfig(cp, BandwidthCapacity.class);
+        bwCapacity.capacity(bandwidth).apply();
     }
 
     /**
@@ -487,15 +538,22 @@
      * @param connectivity Optical connectivity
      */
     private void releaseBandwidthUsage(OpticalConnectivity connectivity) {
-        OpticalConnectivityId connectivityId = connectivity.id();
-
-        log.debug("releasing bandwidth allocated to {}", connectivityId);
-        if (!resourceService.release(connectivityId)) {
-            log.warn("Failed to release bandwidth allocated to {}",
-                    connectivityId);
-            // TODO any recovery?
+        if (connectivity.links().isEmpty()) {
+            return;
         }
-        log.debug("DONE releasing bandwidth for {}", connectivityId);
+
+        // release resource only if this node is the master of the first link's source device
+        if (mastershipService.isLocalMaster(connectivity.links().get(0).src().deviceId())) {
+            OpticalConnectivityId connectivityId = connectivity.id();
+
+            log.debug("releasing bandwidth allocated to {}", connectivityId);
+            if (!resourceService.release(connectivityId)) {
+                log.warn("Failed to release bandwidth allocated to {}",
+                        connectivityId);
+                // TODO any recovery?
+            }
+            log.debug("DONE releasing bandwidth for {}", connectivityId);
+        }
     }
 
     private class BandwidthLinkWeight implements LinkWeight {
@@ -515,7 +573,7 @@
             }
 
             // Avoid cross connect links with used ports
-            if (isCrossConnectLink(l) && usedCrossConnectLinks.contains(l)) {
+            if (isCrossConnectLink(l) && usedCrossConnectLinkSet.contains(l)) {
                 return -1.0;
             }
 
@@ -592,18 +650,16 @@
                         ConnectPoint packetSrc = e.getKey().src();
                         ConnectPoint packetDst = e.getKey().dst();
                         Bandwidth bw = e.getKey().bandwidth();
-                        // Updates bandwidth of packet ports
-                        updatePortBandwidth(packetSrc, bw);
-                        updatePortBandwidth(packetDst, bw);
 
-                        OpticalConnectivity connectivity = e.getValue();
-                        connectivity.setLinkEstablished(packetSrc, packetDst);
+                        // reflect modification only if this node is master of packetSrc's device
+                        if (mastershipService.isLocalMaster(packetSrc.deviceId())) {
+                            // Updates bandwidth of packet ports
+                            updatePortBandwidth(packetSrc, bw);
+                            updatePortBandwidth(packetDst, bw);
 
-                        if (e.getValue().isAllRealizingLinkEstablished()) {
-                            updateBandwidthUsage(connectivity);
-
-                            // Notifies listeners if all links are established
-                            post(new OpticalPathEvent(OpticalPathEvent.Type.PATH_INSTALLED, e.getValue().id()));
+                            // Updates link status in distributed map
+                            linkPathMap.computeIfPresent(e.getKey(), (link, connectivity) ->
+                                    e.getValue().value().setLinkEstablished(packetSrc, packetDst, true));
                         }
                     });
         }
@@ -633,23 +689,23 @@
                     .forEach(e -> {
                         ConnectPoint packetSrc = e.getKey().src();
                         ConnectPoint packetDst = e.getKey().dst();
-                        // Updates bandwidth of packet ports
-                        updatePortBandwidth(packetSrc, bw);
-                        updatePortBandwidth(packetDst, bw);
-                        OpticalConnectivity connectivity = e.getValue();
-                        connectivity.setLinkRemoved(packetSrc, packetDst);
 
-                        // Notifies listeners if all links are gone
-                        if (e.getValue().isAllRealizingLinkNotEstablished()) {
-                            releaseBandwidthUsage(connectivity);
-                            post(new OpticalPathEvent(OpticalPathEvent.Type.PATH_REMOVED, e.getValue().id()));
+                        // reflect modification only if this node is master of packetSrc's device
+                        if (mastershipService.isLocalMaster(packetSrc.deviceId())) {
+                            // Updates bandwidth of packet ports
+                            updatePortBandwidth(packetSrc, bw);
+                            updatePortBandwidth(packetDst, bw);
+
+                            // Updates link status in distributed map
+                            linkPathMap.computeIfPresent(e.getKey(), (link, connectivity) ->
+                                    e.getValue().value().setLinkEstablished(packetSrc, packetDst, false));
                         }
                     });
         }
 
         private void removeXcLinkUsage(ConnectPoint cp) {
             Optional<Link> link = linkService.getLinks(cp).stream()
-                    .filter(usedCrossConnectLinks::contains)
+                    .filter(usedCrossConnectLinkSet::contains)
                     .findAny();
 
             if (!link.isPresent()) {
@@ -657,7 +713,7 @@
                 return;
             }
 
-            usedCrossConnectLinks.remove(link.get());
+            usedCrossConnectLinkSet.remove(link.get());
         }
     }
 
@@ -669,22 +725,60 @@
             switch (event.type()) {
                 case LINK_REMOVED:
                     Link link = event.subject();
+                    // updates linkPathMap only if src device of link is local
+                    if (!mastershipService.isLocalMaster(link.src().deviceId())) {
+                        return;
+                    }
+
+                    // find all packet links that correspond to removed link
                     Set<PacketLinkRealizedByOptical> pLinks = linkPathMap.keySet().stream()
                             .filter(l -> l.isBetween(link.src(), link.dst()) || l.isBetween(link.dst(), link.src()))
                             .collect(Collectors.toSet());
 
                     pLinks.forEach(l -> {
-                        OpticalConnectivity c = linkPathMap.get(l);
-                        // Notifies listeners if all links are gone
-                        if (c.isAllRealizingLinkNotEstablished()) {
-                            post(new OpticalPathEvent(OpticalPathEvent.Type.PATH_REMOVED, c.id()));
-                        }
-                        linkPathMap.remove(l);
+                        // remove found packet links from distributed store
+                        linkPathMap.computeIfPresent(l, (plink, conn) -> {
+                            // Notifies listeners if all packet links are gone
+                            if (conn.isAllRealizingLinkNotEstablished()) {
+                                post(new OpticalPathEvent(OpticalPathEvent.Type.PATH_REMOVED, conn.id()));
+                            }
+                            return null;
+                        });
                     });
                 default:
                     break;
             }
         }
     }
+
+    private class InternalStoreListener
+            implements MapEventListener<PacketLinkRealizedByOptical, OpticalConnectivity> {
+
+        @Override
+        public void event(MapEvent<PacketLinkRealizedByOptical, OpticalConnectivity> event) {
+            switch (event.type()) {
+                case UPDATE:
+                    OpticalConnectivity oldConnectivity = event.oldValue().value();
+                    OpticalConnectivity newConnectivity = event.newValue().value();
+
+                    if (!oldConnectivity.isAllRealizingLinkEstablished() &&
+                            newConnectivity.isAllRealizingLinkEstablished()) {
+                        // Notifies listeners if all links are established
+                        updateBandwidthUsage(newConnectivity);
+                        post(new OpticalPathEvent(OpticalPathEvent.Type.PATH_INSTALLED, newConnectivity.id()));
+                    } else if (!oldConnectivity.isAllRealizingLinkNotEstablished() &&
+                            newConnectivity.isAllRealizingLinkNotEstablished()) {
+                        // Notifies listeners if all links are gone
+                        releaseBandwidthUsage(newConnectivity);
+                        post(new OpticalPathEvent(OpticalPathEvent.Type.PATH_REMOVED, newConnectivity.id()));
+                    }
+
+                    break;
+                default:
+                    break;
+            }
+        }
+
+    }
 }