Made OpticalPathProvisioner to store connectivity data in distributed store. (ONOS-4518)

Change-Id: I7f9ef02cab4aa1848c8926d2e88478e035076c99
diff --git a/apps/newoptical/BUCK b/apps/newoptical/BUCK
index fd80f40..db15c64 100644
--- a/apps/newoptical/BUCK
+++ b/apps/newoptical/BUCK
@@ -1,5 +1,6 @@
 COMPILE_DEPS = [
   '//lib:CORE_DEPS',
+  '//core/store/serializers:onos-core-serializers',
   '//lib:org.apache.karaf.shell.console',
   '//cli:onos-cli',
 ]
diff --git a/apps/newoptical/pom.xml b/apps/newoptical/pom.xml
index af0f841..13821f7 100644
--- a/apps/newoptical/pom.xml
+++ b/apps/newoptical/pom.xml
@@ -89,6 +89,12 @@
         </dependency>
 
         <dependency>
+            <groupId>org.onosproject</groupId>
+            <artifactId>onos-core-serializers</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
             <groupId>com.google.guava</groupId>
             <artifactId>guava-testlib</artifactId>
             <scope>test</scope>
diff --git a/apps/newoptical/src/main/java/org/onosproject/newoptical/OpticalConnectivity.java b/apps/newoptical/src/main/java/org/onosproject/newoptical/OpticalConnectivity.java
index cd5a2cd..b67564c 100644
--- a/apps/newoptical/src/main/java/org/onosproject/newoptical/OpticalConnectivity.java
+++ b/apps/newoptical/src/main/java/org/onosproject/newoptical/OpticalConnectivity.java
@@ -22,14 +22,14 @@
 import org.onosproject.newoptical.api.OpticalConnectivityId;
 import org.onosproject.net.ConnectPoint;
 import org.onosproject.net.Link;
-import org.onosproject.net.Path;
 
 import java.time.Duration;
-import java.util.HashSet;
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
+import java.util.stream.Collectors;
 
-import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
 
 /**
  * Entity to store optical connectivity request and related information.
@@ -42,48 +42,48 @@
     private final Bandwidth requestBandwidth;
     private final Duration requestLatency;
 
-    // Bandwidth capacity of optical layer
-    private Bandwidth opticalCapacity;
+    /**
+     * Set of packet link that is not yet established.
+     * Packet links in this set are expected to be discovered after underlying (optical) path has been provisioned.
+     */
+    private final ImmutableSet<PacketLinkRealizedByOptical> unestablishedLinks;
 
-    private final Set<PacketLinkRealizedByOptical> realizingLinks = new HashSet<>();
+    /**
+     * Set of packet link that is already established.
+     */
+    private final ImmutableSet<PacketLinkRealizedByOptical> establishedLinks;
 
-    private State state = State.CREATED;
-
-    public enum State {
-        CREATED,
-        INSTALLING,
-        INSTALLED,
-        WITHDRAWING,
-        WITHDRAWN,
-        FAILED
-    }
-
-    public OpticalConnectivity(OpticalConnectivityId id, Path path, Bandwidth requestBandwidth,
-                               Duration requestLatency) {
+    public OpticalConnectivity(OpticalConnectivityId id,
+                               List<Link> links,
+                               Bandwidth requestBandwidth,
+                               Duration requestLatency,
+                               Set<PacketLinkRealizedByOptical> unestablishedLinks,
+                               Set<PacketLinkRealizedByOptical> establishedLinks) {
         this.id = id;
-        this.links = ImmutableList.copyOf(path.links());
+        this.links = ImmutableList.copyOf(links);
         this.requestBandwidth = requestBandwidth;
         this.requestLatency = requestLatency;
+        this.unestablishedLinks = ImmutableSet.copyOf(unestablishedLinks);
+        this.establishedLinks = ImmutableSet.copyOf(establishedLinks);
     }
 
-    public void setLinkEstablished(ConnectPoint src, ConnectPoint dst) {
-        realizingLinks.stream().filter(l -> l.isBetween(src, dst))
-                .findAny()
-                .ifPresent(l -> l.setEstablished(true));
-    }
-
-    public void setLinkRemoved(ConnectPoint src, ConnectPoint dst) {
-        realizingLinks.stream().filter(l -> l.isBetween(src, dst))
-                .findAny()
-                .ifPresent(l -> l.setEstablished(false));
+    private OpticalConnectivity(OpticalConnectivity connectivity) {
+        this.id = connectivity.id;
+        this.links = ImmutableList.copyOf(connectivity.links);
+        this.requestBandwidth = connectivity.requestBandwidth;
+        this.requestLatency = connectivity.requestLatency;
+        this.unestablishedLinks = ImmutableSet.copyOf(connectivity.unestablishedLinks);
+        this.establishedLinks = ImmutableSet.copyOf(connectivity.establishedLinks);
     }
 
     public boolean isAllRealizingLinkEstablished() {
-        return realizingLinks.stream().allMatch(PacketLinkRealizedByOptical::isEstablished);
+        // Check if all links are established
+        return unestablishedLinks.isEmpty();
     }
 
     public boolean isAllRealizingLinkNotEstablished() {
-        return !realizingLinks.stream().anyMatch(PacketLinkRealizedByOptical::isEstablished);
+        // Check if any link is not established
+        return establishedLinks.isEmpty();
     }
 
     public OpticalConnectivityId id() {
@@ -102,59 +102,64 @@
         return requestLatency;
     }
 
-    public State state() {
-        return state;
+    public Set<PacketLinkRealizedByOptical> getEstablishedLinks() {
+        return establishedLinks;
     }
 
-    public boolean state(State state) {
-        boolean valid = true;
-        // reject invalid state transition
-        switch (this.state) {
-            case CREATED:
-                valid = (state == State.INSTALLING || state == State.FAILED);
-                break;
-            case INSTALLING:
-                valid = (state == State.INSTALLED || state == State.FAILED);
-                break;
-            case INSTALLED:
-                valid = (state == State.WITHDRAWING || state == State.FAILED);
-                break;
-            case WITHDRAWING:
-                valid = (state == State.WITHDRAWN || state == State.FAILED);
-                break;
-            case FAILED:
-                valid = (state == State.INSTALLING || state == State.WITHDRAWING || state == State.FAILED);
-                break;
-            default:
-                break;
+    public Set<PacketLinkRealizedByOptical> getUnestablishedLinks() {
+        return unestablishedLinks;
+    }
+
+    public OpticalConnectivity setLinkEstablished(ConnectPoint src,
+                                                  ConnectPoint dst,
+                                                  boolean established) {
+        Set<PacketLinkRealizedByOptical> newEstablishedLinks;
+        Set<PacketLinkRealizedByOptical> newUnestablishedLinks;
+
+        if (established) {
+            // move PacketLink from unestablished set to established set
+            Optional<PacketLinkRealizedByOptical> link = this.unestablishedLinks.stream()
+                    .filter(l -> l.isBetween(src, dst)).findAny();
+            checkState(link.isPresent());
+
+            newUnestablishedLinks = this.unestablishedLinks.stream()
+                    .filter(l -> !l.isBetween(src, dst))
+                    .collect(Collectors.toSet());
+            newEstablishedLinks = ImmutableSet.<PacketLinkRealizedByOptical>builder()
+                    .addAll(this.establishedLinks)
+                    .add(link.get())
+                    .build();
+        } else {
+            // move PacketLink from established set to unestablished set
+            Optional<PacketLinkRealizedByOptical> link = this.establishedLinks.stream()
+                    .filter(l -> l.isBetween(src, dst)).findAny();
+            checkState(link.isPresent());
+
+            newEstablishedLinks = this.establishedLinks.stream()
+                    .filter(l -> !l.isBetween(src, dst))
+                    .collect(Collectors.toSet());
+            newUnestablishedLinks = ImmutableSet.<PacketLinkRealizedByOptical>builder()
+                    .addAll(this.unestablishedLinks)
+                    .add(link.get())
+                    .build();
         }
 
-        if (valid) {
-            this.state = state;
-        }
-
-        return valid;
-    }
-
-    public Bandwidth getOpticalCapacity() {
-        return opticalCapacity;
-    }
-
-    public void setOpticalCapacity(Bandwidth opticalCapacity) {
-        this.opticalCapacity = opticalCapacity;
-    }
-
-    public void addRealizingLink(PacketLinkRealizedByOptical link) {
-        checkNotNull(link);
-        realizingLinks.add(link);
-    }
-
-    public void removeRealizingLink(PacketLinkRealizedByOptical link) {
-        checkNotNull(link);
-        realizingLinks.remove(link);
+        return new OpticalConnectivity(this.id,
+                this.links,
+                this.requestBandwidth,
+                this.requestLatency,
+                newUnestablishedLinks,
+                newEstablishedLinks);
     }
 
     public Set<PacketLinkRealizedByOptical> getRealizingLinks() {
-        return ImmutableSet.copyOf(realizingLinks);
+        return ImmutableSet.<PacketLinkRealizedByOptical>builder()
+                .addAll(unestablishedLinks)
+                .addAll(establishedLinks)
+                .build();
+    }
+
+    public static OpticalConnectivity copyOf(OpticalConnectivity connectivity) {
+        return new OpticalConnectivity(connectivity);
     }
 }
diff --git a/apps/newoptical/src/main/java/org/onosproject/newoptical/OpticalPathProvisioner.java b/apps/newoptical/src/main/java/org/onosproject/newoptical/OpticalPathProvisioner.java
index f64e579..906fe33 100644
--- a/apps/newoptical/src/main/java/org/onosproject/newoptical/OpticalPathProvisioner.java
+++ b/apps/newoptical/src/main/java/org/onosproject/newoptical/OpticalPathProvisioner.java
@@ -17,7 +17,6 @@
 
 import com.google.common.annotations.Beta;
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Sets;
 
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.felix.scr.annotations.Activate;
@@ -27,8 +26,8 @@
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
 import org.onlab.util.Bandwidth;
+import org.onlab.util.KryoNamespace;
 import org.onosproject.cluster.ClusterService;
-import org.onosproject.cluster.NodeId;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreService;
 import org.onosproject.event.ListenerTracker;
@@ -66,20 +65,29 @@
 import org.onosproject.net.topology.LinkWeight;
 import org.onosproject.net.topology.PathService;
 import org.onosproject.net.topology.TopologyEdge;
+import org.onosproject.store.serializers.KryoNamespaces;
 import org.onosproject.store.service.AtomicCounter;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.DistributedSet;
+import org.onosproject.store.service.MapEvent;
+import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.Serializer;
 import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.Versioned;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.time.Duration;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -99,6 +107,9 @@
     protected static final Logger log = LoggerFactory.getLogger(OpticalPathProvisioner.class);
 
     private static final String OPTICAL_CONNECTIVITY_ID_COUNTER = "optical-connectivity-id";
+    private static final String LINKPATH_MAP_NAME = "newoptical-linkpath";
+    private static final String CONNECTIVITY_MAP_NAME = "newoptical-connectivity";
+    private static final String CROSSCONNECTLINK_SET_NAME = "newoptical-crossconnectlink";
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected IntentService intentService;
@@ -137,14 +148,28 @@
 
     private ListenerTracker listeners;
 
-    private Map<PacketLinkRealizedByOptical, OpticalConnectivity> linkPathMap = new ConcurrentHashMap<>();
+    private InternalStoreListener storeListener = new InternalStoreListener();
 
-    // TODO this should be stored to distributed store
-    private Map<OpticalConnectivityId, OpticalConnectivity> connectivities = new ConcurrentHashMap<>();
+    private ConsistentMap<PacketLinkRealizedByOptical, OpticalConnectivity> linkPathMap;
 
-    // TODO this should be stored to distributed store
+    private ConsistentMap<OpticalConnectivityId, OpticalConnectivity> connectivityMap;
+
     // Map of cross connect link and installed path which uses the link
-    private Set<Link> usedCrossConnectLinks = Sets.newConcurrentHashSet();
+    private DistributedSet<Link> usedCrossConnectLinkSet;
+
+    private static final KryoNamespace.Builder LINKPATH_SERIALIZER = KryoNamespace.newBuilder()
+            .register(KryoNamespaces.API)
+            .register(PacketLinkRealizedByOptical.class)
+            .register(OpticalConnectivityId.class)
+            .register(OpticalConnectivity.class);
+
+    private static final KryoNamespace.Builder CONNECTIVITY_SERIALIZER = KryoNamespace.newBuilder()
+            .register(KryoNamespaces.API)
+            .register(OpticalConnectivityId.class)
+            .register(OpticalConnectivity.class);
+
+    private static final KryoNamespace.Builder CROSSCONNECTLINKS_SERIALIZER = KryoNamespace.newBuilder()
+            .register(KryoNamespaces.API);
 
     @Activate
     protected void activate() {
@@ -153,17 +178,39 @@
 
         idCounter = storageService.getAtomicCounter(OPTICAL_CONNECTIVITY_ID_COUNTER);
 
+        linkPathMap = storageService.<PacketLinkRealizedByOptical, OpticalConnectivity>consistentMapBuilder()
+                .withSerializer(Serializer.using(LINKPATH_SERIALIZER.build()))
+                .withName(LINKPATH_MAP_NAME)
+                .withApplicationId(appId)
+                .build();
+
+        connectivityMap = storageService.<OpticalConnectivityId, OpticalConnectivity>consistentMapBuilder()
+                .withSerializer(Serializer.using(CONNECTIVITY_SERIALIZER.build()))
+                .withName(CONNECTIVITY_MAP_NAME)
+                .withApplicationId(appId)
+                .build();
+
+        usedCrossConnectLinkSet = storageService.<Link>setBuilder()
+                .withSerializer(Serializer.using(CROSSCONNECTLINKS_SERIALIZER.build()))
+                .withName(CROSSCONNECTLINK_SET_NAME)
+                .withApplicationId(appId)
+                .build()
+                .asDistributedSet();
+
         eventDispatcher.addSink(OpticalPathEvent.class, listenerRegistry);
 
         listeners = new ListenerTracker();
         listeners.addListener(linkService, new InternalLinkListener())
                 .addListener(intentService, new InternalIntentListener());
 
+        linkPathMap.addListener(storeListener);
+
         log.info("Started");
     }
 
     @Deactivate
     protected void deactivate() {
+        linkPathMap.removeListener(storeListener);
         listeners.removeListeners();
         eventDispatcher.removeSink(OpticalPathEvent.class);
 
@@ -205,22 +252,67 @@
         checkNotNull(path);
         log.info("setupPath({}, {}, {})", path, bandwidth, latency);
 
-        // validate optical path
-        List<Pair<ConnectPoint, ConnectPoint>> xcPointPairs = getCrossConnectPoints(path);
-        if (!checkXcPoints(xcPointPairs)) {
-            // Can't setup path if cross connect points are mismatched
-            log.error("Failed to setup path because of mismatched cross connect points.");
-            return null;
+        // map of cross connect points (optical port -> packet port)
+        Map<ConnectPoint, ConnectPoint> crossConnectPointMap = new HashMap<>();
+
+        // list of (src, dst) pair of optical ports between which optical path should be installed
+        List<Pair<ConnectPoint, ConnectPoint>> crossConnectPoints = new ArrayList<>();
+
+        // Scan path to find pairs of connect points between which optical intent is installed
+        // opticalSrcPort works as a flag parameter to show scanning status
+        ConnectPoint opticalSrcPort = null;
+        for (Link link : path.links()) {
+            if (!isCrossConnectLink(link)) {
+                continue;
+            }
+
+            if (opticalSrcPort != null) {
+                // opticalSrcPort!=null means src port was already found
+                // in this case link.src() is optical layer, and link.dst() is packet layer
+
+                // Check if types of src port and dst port matches
+                Device srcDevice = checkNotNull(deviceService.getDevice(opticalSrcPort.deviceId()),
+                        "Unknown device ID");
+                Device dstDevice = checkNotNull(deviceService.getDevice(link.src().deviceId()),
+                        "Unknown device ID");
+                if (srcDevice.type() != dstDevice.type()) {
+                    log.error("Unsupported mix of cross connect points : {}, {}",
+                            srcDevice.type(), dstDevice.type());
+                    return null;
+                }
+
+                // Update cross connect points map
+                crossConnectPointMap.put(link.src(), link.dst());
+
+                // Add optical ports pair to list
+                crossConnectPoints.add(Pair.of(opticalSrcPort, link.src()));
+
+                // Reset flag parameter
+                opticalSrcPort = null;
+            } else {
+                // opticalSrcPort==null means src port was not found yet
+                // in this case link.src() is packet layer, and link.dst() is optical layer
+
+                // Update cross connect points map
+                crossConnectPointMap.put(link.dst(), link.src());
+                // Set opticalSrcPort to src of link (optical port)
+                opticalSrcPort = link.dst();
+            }
         }
 
-        OpticalConnectivity connectivity = createConnectivity(path, bandwidth, latency);
+        // create intents from cross connect points
+        List<Intent> intents = createIntents(crossConnectPoints);
 
-        // create intents from cross connect points and set connectivity information
-        List<Intent> intents = createIntents(xcPointPairs, connectivity);
+        // create set of PacketLinkRealizedByOptical
+        Set<PacketLinkRealizedByOptical> packetLinks = createPacketLinkSet(crossConnectPoints,
+                intents, crossConnectPointMap);
+
+        // create OpticalConnectivity object and store information to distributed store
+        OpticalConnectivity connectivity = createConnectivity(path, bandwidth, latency, packetLinks);
 
         // store cross connect port usage
         path.links().stream().filter(this::isCrossConnectLink)
-                .forEach(usedCrossConnectLinks::add);
+                .forEach(usedCrossConnectLinkSet::add);
 
         // Submit the intents
         for (Intent i : intents) {
@@ -231,12 +323,16 @@
         return connectivity.id();
     }
 
-    private OpticalConnectivity createConnectivity(Path path, Bandwidth bandwidth, Duration latency) {
+    private OpticalConnectivity createConnectivity(Path path, Bandwidth bandwidth, Duration latency,
+                                                   Set<PacketLinkRealizedByOptical> links) {
         OpticalConnectivityId id = OpticalConnectivityId.of(idCounter.getAndIncrement());
-        OpticalConnectivity connectivity = new OpticalConnectivity(id, path, bandwidth, latency);
+        OpticalConnectivity connectivity = new OpticalConnectivity(id, path.links(), bandwidth, latency,
+                links, Collections.emptySet());
+
+        links.forEach(l -> linkPathMap.put(l, connectivity));
 
         // store connectivity information
-        connectivities.put(connectivity.id(), connectivity);
+        connectivityMap.put(connectivity.id(), connectivity);
 
         return connectivity;
     }
@@ -244,7 +340,7 @@
     @Override
     public boolean removeConnectivity(OpticalConnectivityId id) {
         log.info("removeConnectivity({})", id);
-        OpticalConnectivity connectivity = connectivities.remove(id);
+        Versioned<OpticalConnectivity> connectivity = connectivityMap.remove(id);
 
         if (connectivity == null) {
             log.info("OpticalConnectivity with id {} not found.", id);
@@ -252,7 +348,7 @@
         }
 
         // TODO withdraw intent only if all of connectivities that use the optical path are withdrawn
-        connectivity.getRealizingLinks().forEach(l -> {
+        connectivity.value().getRealizingLinks().forEach(l -> {
             Intent intent = intentService.getIntent(l.realizingIntentKey());
             intentService.withdraw(intent);
         });
@@ -262,98 +358,34 @@
 
     @Override
     public Optional<List<Link>> getPath(OpticalConnectivityId id) {
-        OpticalConnectivity connectivity = connectivities.get(id);
+        Versioned<OpticalConnectivity> connectivity = connectivityMap.get(id);
         if (connectivity == null) {
             log.info("OpticalConnectivity with id {} not found.", id);
             return Optional.empty();
         }
 
-        return Optional.of(ImmutableList.copyOf(connectivity.links()));
-    }
-
-    /**
-     * Returns list of (optical, packet) pairs of cross connection points of missing optical path sections.
-     *
-     * Scans the given multi-layer path and looks for sections that use cross connect links.
-     * The ingress and egress points in the optical layer are combined to the packet layer ports, and
-     * are returned in a list.
-     *
-     * @param path the multi-layer path
-     * @return List of cross connect link's (packet port, optical port) pairs
-     */
-    private List<Pair<ConnectPoint, ConnectPoint>> getCrossConnectPoints(Path path) {
-        List<Pair<ConnectPoint, ConnectPoint>> xcPointPairs = new LinkedList<>();
-        boolean scanning = false;
-
-        for (Link link : path.links()) {
-            if (!isCrossConnectLink(link)) {
-                continue;
-            }
-
-            if (scanning) {
-                // link.src() is packet, link.dst() is optical
-                xcPointPairs.add(Pair.of(checkNotNull(link.src()), checkNotNull(link.dst())));
-                scanning = false;
-            } else {
-                // link.src() is optical, link.dst() is packet
-                xcPointPairs.add(Pair.of(checkNotNull(link.dst()), checkNotNull(link.src())));
-                scanning = true;
-            }
-        }
-
-        return xcPointPairs;
-    }
-
-    /**
-     * Checks if optical cross connect points are of same type.
-     *
-     * @param xcPointPairs list of cross connection points
-     * @return true if cross connect point pairs are of same type, false otherwise
-     */
-    private boolean checkXcPoints(List<Pair<ConnectPoint, ConnectPoint>> xcPointPairs) {
-        checkArgument(xcPointPairs.size() % 2 == 0);
-
-        Iterator<Pair<ConnectPoint, ConnectPoint>> itr = xcPointPairs.iterator();
-
-        while (itr.hasNext()) {
-            // checkArgument at start ensures we'll always have pairs of connect points
-            Pair<ConnectPoint, ConnectPoint> src = itr.next();
-            Pair<ConnectPoint, ConnectPoint> dst = itr.next();
-
-            Device.Type srcType = deviceService.getDevice(src.getKey().deviceId()).type();
-            Device.Type dstType = deviceService.getDevice(dst.getKey().deviceId()).type();
-
-            // Only support connections between identical port types
-            if (srcType != dstType) {
-                log.warn("Unsupported mix of cross connect points : {}, {}", srcType, dstType);
-                return false;
-            }
-        }
-
-        return true;
+        return Optional.of(ImmutableList.copyOf(connectivity.value().links()));
     }
 
     /**
      * Scans the list of cross connection points and returns a list of optical connectivity intents.
-     * During the process, store intent ID and its realizing link information to given connectivity object.
+     * During the process, save information about packet links to given set.
      *
-     * @param xcPointPairs list of cross connection points
+     * @param crossConnectPoints list of (src, dst) pair between which optical path will be set up
      * @return list of optical connectivity intents
      */
-    private List<Intent> createIntents(List<Pair<ConnectPoint, ConnectPoint>> xcPointPairs,
-                                       OpticalConnectivity connectivity) {
-        checkArgument(xcPointPairs.size() % 2 == 0);
-
+    private List<Intent> createIntents(List<Pair<ConnectPoint, ConnectPoint>> crossConnectPoints) {
         List<Intent> intents = new LinkedList<>();
-        Iterator<Pair<ConnectPoint, ConnectPoint>> itr = xcPointPairs.iterator();
+        Iterator<Pair<ConnectPoint, ConnectPoint>> itr = crossConnectPoints.iterator();
 
         while (itr.hasNext()) {
             // checkArgument at start ensures we'll always have pairs of connect points
-            Pair<ConnectPoint, ConnectPoint> src = itr.next();
-            Pair<ConnectPoint, ConnectPoint> dst = itr.next();
+            Pair<ConnectPoint, ConnectPoint> next = itr.next();
+            ConnectPoint src = next.getLeft();
+            ConnectPoint dst = next.getRight();
 
-            Port srcPort = deviceService.getPort(src.getKey().deviceId(), src.getKey().port());
-            Port dstPort = deviceService.getPort(dst.getKey().deviceId(), dst.getKey().port());
+            Port srcPort = deviceService.getPort(src.deviceId(), src.port());
+            Port dstPort = deviceService.getPort(dst.deviceId(), dst.port());
 
             if (srcPort instanceof OduCltPort && dstPort instanceof OduCltPort) {
                 OduCltPort srcOCPort = (OduCltPort) srcPort;
@@ -365,16 +397,12 @@
                 // Create OTN circuit
                 OpticalCircuitIntent circuitIntent = OpticalCircuitIntent.builder()
                         .appId(appId)
-                        .src(src.getKey())
-                        .dst(dst.getKey())
+                        .src(src)
+                        .dst(dst)
                         .signalType(srcOCPort.signalType())
                         .bidirectional(true)
                         .build();
                 intents.add(circuitIntent);
-                PacketLinkRealizedByOptical pLink = PacketLinkRealizedByOptical.create(src.getValue(), dst.getValue(),
-                        circuitIntent);
-                connectivity.addRealizingLink(pLink);
-                linkPathMap.put(pLink, connectivity);
             } else if (srcPort instanceof OchPort && dstPort instanceof OchPort) {
                 OchPort srcOchPort = (OchPort) srcPort;
                 OchPort dstOchPort = (OchPort) dstPort;
@@ -385,16 +413,12 @@
                 // Create lightpath
                 OpticalConnectivityIntent opticalIntent = OpticalConnectivityIntent.builder()
                         .appId(appId)
-                        .src(src.getKey())
-                        .dst(dst.getKey())
+                        .src(src)
+                        .dst(dst)
                         .signalType(srcOchPort.signalType())
                         .bidirectional(true)
                         .build();
                 intents.add(opticalIntent);
-                PacketLinkRealizedByOptical pLink = PacketLinkRealizedByOptical.create(src.getValue(), dst.getValue(),
-                        opticalIntent);
-                connectivity.addRealizingLink(pLink);
-                linkPathMap.put(pLink, connectivity);
             } else {
                 log.warn("Unsupported cross connect point types {} {}", srcPort.type(), dstPort.type());
                 return Collections.emptyList();
@@ -404,6 +428,36 @@
         return intents;
     }
 
+    private Set<PacketLinkRealizedByOptical> createPacketLinkSet(List<Pair<ConnectPoint, ConnectPoint>> connectPoints,
+                                                                 List<Intent> intents,
+                                                                 Map<ConnectPoint, ConnectPoint> crossConnectPoints) {
+        checkArgument(connectPoints.size() == intents.size());
+
+        Set<PacketLinkRealizedByOptical> pLinks = new HashSet<>();
+
+        Iterator<Pair<ConnectPoint, ConnectPoint>> xcPointsItr = connectPoints.iterator();
+        Iterator<Intent> intentItr = intents.iterator();
+        while (xcPointsItr.hasNext()) {
+            Pair<ConnectPoint, ConnectPoint> xcPoints = xcPointsItr.next();
+            Intent intent = intentItr.next();
+
+            ConnectPoint packetSrc = checkNotNull(crossConnectPoints.get(xcPoints.getLeft()));
+            ConnectPoint packetDst = checkNotNull(crossConnectPoints.get(xcPoints.getRight()));
+
+            if (intent instanceof OpticalConnectivityIntent) {
+                pLinks.add(PacketLinkRealizedByOptical.create(packetSrc, packetDst,
+                        (OpticalConnectivityIntent) intent));
+            } else if (intent instanceof OpticalCircuitIntent) {
+                pLinks.add(PacketLinkRealizedByOptical.create(packetSrc, packetDst,
+                        (OpticalCircuitIntent) intent));
+            } else {
+                log.warn("Unexpected intent type: {}", intent.getClass());
+            }
+        }
+
+        return pLinks;
+    }
+
     /**
      * Verifies if given device type is in packet layer, i.e., ROADM, OTN or ROADM_OTN device.
      *
@@ -444,17 +498,14 @@
 
     /**
      * Updates bandwidth resource of given connect point.
+     *
      * @param cp Connect point
      * @param bandwidth New bandwidth
      */
     private void updatePortBandwidth(ConnectPoint cp, Bandwidth bandwidth) {
-        NodeId localNode = clusterService.getLocalNode().id();
-        NodeId sourceMaster = mastershipService.getMasterFor(cp.deviceId());
-        if (localNode.equals(sourceMaster)) {
-            log.debug("update Port {} Bandwidth {}", cp, bandwidth);
-            BandwidthCapacity bwCapacity = networkConfigService.addConfig(cp, BandwidthCapacity.class);
-            bwCapacity.capacity(bandwidth).apply();
-        }
+        log.debug("update Port {} Bandwidth {}", cp, bandwidth);
+        BandwidthCapacity bwCapacity = networkConfigService.addConfig(cp, BandwidthCapacity.class);
+        bwCapacity.capacity(bandwidth).apply();
     }
 
     /**
@@ -487,15 +538,22 @@
      * @param connectivity Optical connectivity
      */
     private void releaseBandwidthUsage(OpticalConnectivity connectivity) {
-        OpticalConnectivityId connectivityId = connectivity.id();
-
-        log.debug("releasing bandwidth allocated to {}", connectivityId);
-        if (!resourceService.release(connectivityId)) {
-            log.warn("Failed to release bandwidth allocated to {}",
-                    connectivityId);
-            // TODO any recovery?
+        if (connectivity.links().isEmpty()) {
+            return;
         }
-        log.debug("DONE releasing bandwidth for {}", connectivityId);
+
+        // release resource only if this node is the master for link head device
+        if (mastershipService.isLocalMaster(connectivity.links().get(0).src().deviceId())) {
+            OpticalConnectivityId connectivityId = connectivity.id();
+
+            log.debug("releasing bandwidth allocated to {}", connectivityId);
+            if (!resourceService.release(connectivityId)) {
+                log.warn("Failed to release bandwidth allocated to {}",
+                        connectivityId);
+                // TODO any recovery?
+            }
+            log.debug("DONE releasing bandwidth for {}", connectivityId);
+        }
     }
 
     private class BandwidthLinkWeight implements LinkWeight {
@@ -515,7 +573,7 @@
             }
 
             // Avoid cross connect links with used ports
-            if (isCrossConnectLink(l) && usedCrossConnectLinks.contains(l)) {
+            if (isCrossConnectLink(l) && usedCrossConnectLinkSet.contains(l)) {
                 return -1.0;
             }
 
@@ -592,18 +650,16 @@
                         ConnectPoint packetSrc = e.getKey().src();
                         ConnectPoint packetDst = e.getKey().dst();
                         Bandwidth bw = e.getKey().bandwidth();
-                        // Updates bandwidth of packet ports
-                        updatePortBandwidth(packetSrc, bw);
-                        updatePortBandwidth(packetDst, bw);
 
-                        OpticalConnectivity connectivity = e.getValue();
-                        connectivity.setLinkEstablished(packetSrc, packetDst);
+                        // reflect modification only if packetSrc is local_
+                        if (mastershipService.isLocalMaster(packetSrc.deviceId())) {
+                            // Updates bandwidth of packet ports
+                            updatePortBandwidth(packetSrc, bw);
+                            updatePortBandwidth(packetDst, bw);
 
-                        if (e.getValue().isAllRealizingLinkEstablished()) {
-                            updateBandwidthUsage(connectivity);
-
-                            // Notifies listeners if all links are established
-                            post(new OpticalPathEvent(OpticalPathEvent.Type.PATH_INSTALLED, e.getValue().id()));
+                            // Updates link status in distributed map
+                            linkPathMap.computeIfPresent(e.getKey(), (link, connectivity) ->
+                                    e.getValue().value().setLinkEstablished(packetSrc, packetDst, true));
                         }
                     });
         }
@@ -633,23 +689,23 @@
                     .forEach(e -> {
                         ConnectPoint packetSrc = e.getKey().src();
                         ConnectPoint packetDst = e.getKey().dst();
-                        // Updates bandwidth of packet ports
-                        updatePortBandwidth(packetSrc, bw);
-                        updatePortBandwidth(packetDst, bw);
-                        OpticalConnectivity connectivity = e.getValue();
-                        connectivity.setLinkRemoved(packetSrc, packetDst);
 
-                        // Notifies listeners if all links are gone
-                        if (e.getValue().isAllRealizingLinkNotEstablished()) {
-                            releaseBandwidthUsage(connectivity);
-                            post(new OpticalPathEvent(OpticalPathEvent.Type.PATH_REMOVED, e.getValue().id()));
+                        // reflect modification only if packetSrc is local_
+                        if (mastershipService.isLocalMaster(packetSrc.deviceId())) {
+                            // Updates bandwidth of packet ports
+                            updatePortBandwidth(packetSrc, bw);
+                            updatePortBandwidth(packetDst, bw);
+
+                            // Updates link status in distributed map
+                            linkPathMap.computeIfPresent(e.getKey(), (link, connectivity) ->
+                                    e.getValue().value().setLinkEstablished(packetSrc, packetDst, false));
                         }
                     });
         }
 
         private void removeXcLinkUsage(ConnectPoint cp) {
             Optional<Link> link = linkService.getLinks(cp).stream()
-                    .filter(usedCrossConnectLinks::contains)
+                    .filter(usedCrossConnectLinkSet::contains)
                     .findAny();
 
             if (!link.isPresent()) {
@@ -657,7 +713,7 @@
                 return;
             }
 
-            usedCrossConnectLinks.remove(link.get());
+            usedCrossConnectLinkSet.remove(link.get());
         }
     }
 
@@ -669,22 +725,60 @@
             switch (event.type()) {
                 case LINK_REMOVED:
                     Link link = event.subject();
+                    // updates linkPathMap only if src device of link is local
+                    if (!mastershipService.isLocalMaster(link.src().deviceId())) {
+                        return;
+                    }
+
+                    // find all packet links that correspond to removed link
                     Set<PacketLinkRealizedByOptical> pLinks = linkPathMap.keySet().stream()
                             .filter(l -> l.isBetween(link.src(), link.dst()) || l.isBetween(link.dst(), link.src()))
                             .collect(Collectors.toSet());
 
                     pLinks.forEach(l -> {
-                        OpticalConnectivity c = linkPathMap.get(l);
-                        // Notifies listeners if all links are gone
-                        if (c.isAllRealizingLinkNotEstablished()) {
-                            post(new OpticalPathEvent(OpticalPathEvent.Type.PATH_REMOVED, c.id()));
-                        }
-                        linkPathMap.remove(l);
+                        // remove found packet links from distributed store
+                        linkPathMap.computeIfPresent(l, (plink, conn) -> {
+                            // Notifies listeners if all packet links are gone
+                            if (conn.isAllRealizingLinkNotEstablished()) {
+                                post(new OpticalPathEvent(OpticalPathEvent.Type.PATH_REMOVED, conn.id()));
+                            }
+                            return null;
+                        });
                     });
                 default:
                     break;
             }
         }
     }
+
+    private class InternalStoreListener
+            implements MapEventListener<PacketLinkRealizedByOptical, OpticalConnectivity> {
+
+        @Override
+        public void event(MapEvent<PacketLinkRealizedByOptical, OpticalConnectivity> event) {
+            switch (event.type()) {
+                case UPDATE:
+                    OpticalConnectivity oldConnectivity = event.oldValue().value();
+                    OpticalConnectivity newConnectivity = event.newValue().value();
+
+                    if (!oldConnectivity.isAllRealizingLinkEstablished() &&
+                            newConnectivity.isAllRealizingLinkEstablished()) {
+                        // Notifies listeners if all links are established
+                        updateBandwidthUsage(newConnectivity);
+                        post(new OpticalPathEvent(OpticalPathEvent.Type.PATH_INSTALLED, newConnectivity.id()));
+                    } else if (!oldConnectivity.isAllRealizingLinkNotEstablished() &&
+                            newConnectivity.isAllRealizingLinkNotEstablished()) {
+                        // Notifies listeners if all links are gone
+                        releaseBandwidthUsage(newConnectivity);
+                        post(new OpticalPathEvent(OpticalPathEvent.Type.PATH_REMOVED, newConnectivity.id()));
+                    }
+
+                    break;
+                default:
+                    break;
+            }
+        }
+
+    }
 }
 
diff --git a/apps/newoptical/src/main/java/org/onosproject/newoptical/PacketLinkRealizedByOptical.java b/apps/newoptical/src/main/java/org/onosproject/newoptical/PacketLinkRealizedByOptical.java
index 2e969c2..52728fa 100644
--- a/apps/newoptical/src/main/java/org/onosproject/newoptical/PacketLinkRealizedByOptical.java
+++ b/apps/newoptical/src/main/java/org/onosproject/newoptical/PacketLinkRealizedByOptical.java
@@ -33,9 +33,6 @@
     private final Bandwidth bandwidth;
     // TODO should be list of Intent Key?
     private final Key realizingIntentKey;
-    // established=false represents that this (packet) link is expected to be
-    // discovered after underlying (optical) path has been provisioned.
-    private boolean established;
 
     /**
      * Creates instance with specified parameters.
@@ -51,7 +48,6 @@
         this.dst = dst;
         this.realizingIntentKey = realizingIntentKey;
         this.bandwidth = bandwidth;
-        this.established = false;
     }
 
     /**
@@ -129,24 +125,6 @@
     }
 
     /**
-     * Returns whether packet link is realized or not.
-     *
-     * @return true if packet link is realized.  false if not.
-     */
-    public boolean isEstablished() {
-        return established;
-    }
-
-    /**
-     * Sets packet link to be established.
-     *
-     * @param established status of packet link
-     */
-    public void setEstablished(boolean established) {
-        this.established = established;
-    }
-
-    /**
      * Check if packet link is between specified two connect points.
      *
      * @param src source connect point
@@ -157,4 +135,36 @@
         return (this.src.equals(src) && this.dst.equals(dst));
     }
 
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        PacketLinkRealizedByOptical that = (PacketLinkRealizedByOptical) o;
+
+        if (!src.equals(that.src)) {
+            return false;
+        }
+        if (!dst.equals(that.dst)) {
+            return false;
+        }
+        if (!bandwidth.equals(that.bandwidth)) {
+            return false;
+        }
+        return realizingIntentKey.equals(that.realizingIntentKey);
+
+    }
+
+    @Override
+    public int hashCode() {
+        int result = src.hashCode();
+        result = 31 * result + dst.hashCode();
+        result = 31 * result + bandwidth.hashCode();
+        result = 31 * result + realizingIntentKey.hashCode();
+        return result;
+    }
 }
diff --git a/apps/newoptical/src/main/java/org/onosproject/newoptical/api/OpticalPathService.java b/apps/newoptical/src/main/java/org/onosproject/newoptical/api/OpticalPathService.java
index 9a31127..b3db4fe 100644
--- a/apps/newoptical/src/main/java/org/onosproject/newoptical/api/OpticalPathService.java
+++ b/apps/newoptical/src/main/java/org/onosproject/newoptical/api/OpticalPathService.java
@@ -33,7 +33,7 @@
 public interface OpticalPathService extends ListenerService<OpticalPathEvent, OpticalPathListener> {
 
     /**
-     * Calculates optical path between connect points and sets up connectivity.
+     * Calculates multi-layer path between connect points and sets up connectivity.
      *
      * @param ingress   ingress port
      * @param egress    egress port
@@ -45,9 +45,9 @@
                                             Bandwidth bandwidth, Duration latency);
 
     /**
-     * Sets up connectivity along given optical path.
+     * Sets up connectivity along given multi-layer path including cross-connect links.
      *
-     * @param path      path along which connectivity will be set up
+     * @param path      multi-layer path along which connectivity will be set up
      * @param bandwidth required bandwidth. No bandwidth is assured if null.
      * @param latency   required latency. No latency is assured if null.
      * @return true if successful. false otherwise.
diff --git a/apps/newoptical/src/test/java/org/onosproject/newoptical/OpticalConnectivityTest.java b/apps/newoptical/src/test/java/org/onosproject/newoptical/OpticalConnectivityTest.java
index 8d0bc87..7535f7c 100644
--- a/apps/newoptical/src/test/java/org/onosproject/newoptical/OpticalConnectivityTest.java
+++ b/apps/newoptical/src/test/java/org/onosproject/newoptical/OpticalConnectivityTest.java
@@ -16,6 +16,7 @@
 
 package org.onosproject.newoptical;
 
+import com.google.common.collect.ImmutableSet;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -39,7 +40,9 @@
 import org.onosproject.newoptical.api.OpticalConnectivityId;
 
 import java.time.Duration;
+import java.util.Collections;
 import java.util.List;
+import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -94,10 +97,9 @@
         Link link2 = createLink(cp22, cp31);
         List<Link> links = Stream.of(link1, link2).collect(Collectors.toList());
 
-        Path path = new MockPath(cp12, cp31, links);
-
         OpticalConnectivityId cid = OpticalConnectivityId.of(1L);
-        OpticalConnectivity oc = new OpticalConnectivity(cid, path, bandwidth, latency);
+        OpticalConnectivity oc = new OpticalConnectivity(cid, links, bandwidth, latency,
+                Collections.emptySet(), Collections.emptySet());
 
         assertNotNull(oc);
         assertEquals(oc.id(), cid);
@@ -133,8 +135,6 @@
         Link link6 = createLink(cp62, cp71);
         List<Link> links = Stream.of(link1, link2, link3, link4, link5, link6).collect(Collectors.toList());
 
-        Path path = new MockPath(cp12, cp71, links);
-
         // Mocks 2 intents to create OduCtl connectivity
         OpticalConnectivityIntent connIntent1 = createConnectivityIntent(cp21, cp32);
         PacketLinkRealizedByOptical oduLink1 = PacketLinkRealizedByOptical.create(cp12, cp41,
@@ -144,29 +144,29 @@
         PacketLinkRealizedByOptical oduLink2 = PacketLinkRealizedByOptical.create(cp42, cp71,
                 connIntent2);
 
+        Set<PacketLinkRealizedByOptical> plinks = ImmutableSet.of(oduLink1, oduLink2);
+
         Bandwidth bandwidth = Bandwidth.bps(100);
         Duration latency = Duration.ofMillis(10);
 
         OpticalConnectivityId cid = OpticalConnectivityId.of(1L);
-        OpticalConnectivity oc = new OpticalConnectivity(cid, path, bandwidth, latency);
+        OpticalConnectivity oc1 = new OpticalConnectivity(cid, links, bandwidth, latency,
+                plinks, Collections.emptySet());
 
-        oc.addRealizingLink(oduLink1);
-        oc.addRealizingLink(oduLink2);
-
-        assertTrue(oc.isAllRealizingLinkNotEstablished());
-        assertFalse(oc.isAllRealizingLinkEstablished());
+        assertTrue(oc1.isAllRealizingLinkNotEstablished());
+        assertFalse(oc1.isAllRealizingLinkEstablished());
 
         // Sets link realized by connIntent1 to be established
-        oc.setLinkEstablished(cp12, cp41);
+        OpticalConnectivity oc2 = oc1.setLinkEstablished(cp12, cp41, true);
 
-        assertFalse(oc.isAllRealizingLinkNotEstablished());
-        assertFalse(oc.isAllRealizingLinkEstablished());
+        assertFalse(oc2.isAllRealizingLinkNotEstablished());
+        assertFalse(oc2.isAllRealizingLinkEstablished());
 
         // Sets link realized by connIntent2 to be established
-        oc.setLinkEstablished(cp42, cp71);
+        OpticalConnectivity oc3 = oc2.setLinkEstablished(cp42, cp71, true);
 
-        assertFalse(oc.isAllRealizingLinkNotEstablished());
-        assertTrue(oc.isAllRealizingLinkEstablished());
+        assertFalse(oc3.isAllRealizingLinkNotEstablished());
+        assertTrue(oc3.isAllRealizingLinkEstablished());
     }
 
     /**
@@ -196,8 +196,6 @@
         Link link6 = createLink(cp62, cp71);
         List<Link> links = Stream.of(link1, link2, link3, link4, link5, link6).collect(Collectors.toList());
 
-        Path path = new MockPath(cp12, cp71, links);
-
         // Mocks 2 intents to create Och connectivity
         OpticalCircuitIntent circuitIntent1 = createCircuitIntent(cp21, cp32);
         PacketLinkRealizedByOptical ochLink1 = PacketLinkRealizedByOptical.create(cp12, cp41,
@@ -207,29 +205,29 @@
         PacketLinkRealizedByOptical ochLink2 = PacketLinkRealizedByOptical.create(cp42, cp71,
                 circuitIntent2);
 
+        Set<PacketLinkRealizedByOptical> plinks = ImmutableSet.of(ochLink1, ochLink2);
+
         Bandwidth bandwidth = Bandwidth.bps(100);
         Duration latency = Duration.ofMillis(10);
 
         OpticalConnectivityId cid = OpticalConnectivityId.of(1L);
-        OpticalConnectivity oc = new OpticalConnectivity(cid, path, bandwidth, latency);
+        OpticalConnectivity oc1 = new OpticalConnectivity(cid, links, bandwidth, latency,
+                plinks, Collections.emptySet());
 
-        oc.addRealizingLink(ochLink1);
-        oc.addRealizingLink(ochLink2);
-
-        assertTrue(oc.isAllRealizingLinkNotEstablished());
-        assertFalse(oc.isAllRealizingLinkEstablished());
+        assertTrue(oc1.isAllRealizingLinkNotEstablished());
+        assertFalse(oc1.isAllRealizingLinkEstablished());
 
         // Sets link realized by circuitIntent1 to be established
-        oc.setLinkEstablished(cp12, cp41);
+        OpticalConnectivity oc2 = oc1.setLinkEstablished(cp12, cp41, true);
 
-        assertFalse(oc.isAllRealizingLinkNotEstablished());
-        assertFalse(oc.isAllRealizingLinkEstablished());
+        assertFalse(oc2.isAllRealizingLinkNotEstablished());
+        assertFalse(oc2.isAllRealizingLinkEstablished());
 
         // Sets link realized by circuitIntent2 to be established
-        oc.setLinkEstablished(cp42, cp71);
+        OpticalConnectivity oc3 = oc2.setLinkEstablished(cp42, cp71, true);
 
-        assertFalse(oc.isAllRealizingLinkNotEstablished());
-        assertTrue(oc.isAllRealizingLinkEstablished());
+        assertFalse(oc3.isAllRealizingLinkNotEstablished());
+        assertTrue(oc3.isAllRealizingLinkEstablished());
     }
 
     private ConnectPoint createConnectPoint(long devIdNum, long portIdNum) {
diff --git a/apps/newoptical/src/test/java/org/onosproject/newoptical/OpticalPathProvisionerTest.java b/apps/newoptical/src/test/java/org/onosproject/newoptical/OpticalPathProvisionerTest.java
index efbaf77..b7d26e3 100644
--- a/apps/newoptical/src/test/java/org/onosproject/newoptical/OpticalPathProvisionerTest.java
+++ b/apps/newoptical/src/test/java/org/onosproject/newoptical/OpticalPathProvisionerTest.java
@@ -21,9 +21,12 @@
 import org.junit.Before;
 import org.junit.Test;
 import org.onlab.packet.ChassisId;
+import org.onlab.packet.IpAddress;
 import org.onlab.util.Bandwidth;
 import org.onlab.util.Frequency;
 import org.onosproject.cluster.ClusterServiceAdapter;
+import org.onosproject.cluster.ControllerNode;
+import org.onosproject.cluster.NodeId;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreServiceAdapter;
 import org.onosproject.core.DefaultApplicationId;
@@ -44,12 +47,15 @@
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.ElementId;
 import org.onosproject.net.Link;
+import org.onosproject.net.MastershipRole;
 import org.onosproject.net.OchSignal;
 import org.onosproject.net.OduSignalType;
 import org.onosproject.net.Path;
 import org.onosproject.net.Port;
 import org.onosproject.net.PortNumber;
+import org.onosproject.net.config.Config;
 import org.onosproject.net.config.NetworkConfigServiceAdapter;
+import org.onosproject.net.config.basics.BandwidthCapacity;
 import org.onosproject.net.device.DeviceServiceAdapter;
 import org.onosproject.net.intent.Intent;
 import org.onosproject.net.intent.IntentEvent;
@@ -74,8 +80,20 @@
 import org.onosproject.newoptical.api.OpticalConnectivityId;
 import org.onosproject.newoptical.api.OpticalPathEvent;
 import org.onosproject.newoptical.api.OpticalPathListener;
+import org.onosproject.store.service.AsyncConsistentMap;
+import org.onosproject.store.service.AsyncDistributedSet;
 import org.onosproject.store.service.AtomicCounter;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.ConsistentMapAdapter;
+import org.onosproject.store.service.ConsistentMapBuilder;
+import org.onosproject.store.service.DistributedSet;
+import org.onosproject.store.service.DistributedSetAdapter;
+import org.onosproject.store.service.DistributedSetBuilder;
+import org.onosproject.store.service.MapEvent;
+import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.SetEventListener;
 import org.onosproject.store.service.StorageServiceAdapter;
+import org.onosproject.store.service.Versioned;
 
 import java.time.Duration;
 import java.util.ArrayList;
@@ -87,6 +105,8 @@
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.function.BiFunction;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -155,6 +175,8 @@
     protected TestLinkService linkService;
     protected TestPathService pathService;
     protected TestIntentService intentService;
+    protected TestMastershipService mastershipService;
+    protected TestClusterService clusterService;
     protected IdGenerator idGenerator;
 
     @Before
@@ -188,14 +210,24 @@
 
         this.pathService = new TestPathService();
         this.intentService = new TestIntentService();
+        this.mastershipService = new TestMastershipService();
+        this.clusterService = new TestClusterService();
+
+        mastershipService.setMastership(DEVICE1.id(), MastershipRole.MASTER);
+        mastershipService.setMastership(DEVICE2.id(), MastershipRole.MASTER);
+        mastershipService.setMastership(DEVICE3.id(), MastershipRole.MASTER);
+        mastershipService.setMastership(DEVICE4.id(), MastershipRole.MASTER);
+        mastershipService.setMastership(DEVICE5.id(), MastershipRole.MASTER);
+        mastershipService.setMastership(DEVICE6.id(), MastershipRole.MASTER);
+        mastershipService.setMastership(DEVICE7.id(), MastershipRole.MASTER);
 
         this.target = new OpticalPathProvisioner();
         target.coreService = new TestCoreService();
         target.intentService = this.intentService;
         target.pathService = this.pathService;
         target.linkService = this.linkService;
-        target.mastershipService = new TestMastershipService();
-        target.clusterService = new TestClusterService();
+        target.mastershipService = this.mastershipService;
+        target.clusterService = this.clusterService;
         target.storageService = new TestStorageService();
         target.deviceService = this.deviceService;
         target.networkConfigService = new TestNetworkConfigService();
@@ -310,15 +342,16 @@
     }
 
     /**
-     * Checks if PATH_INSTALLED event comes up after intent is installed.
+     * Checks if PATH_INSTALLED event comes up after intent whose master is this node is installed.
      */
     @Test
-    public void testInstalledEvent() {
+    public void testInstalledEventLocal() {
         Bandwidth bandwidth = Bandwidth.bps(100);
         Duration latency = Duration.ofMillis(10);
 
         OpticalConnectivityId cid = target.setupConnectivity(CP12, CP71, bandwidth, latency);
 
+        // notify all intents are installed
         intentService.notifyInstalled();
 
         assertEquals(1, listener.events.size());
@@ -327,26 +360,77 @@
     }
 
     /**
-     * Checks if PATH_REMOVED event comes up after packet link is removed.
+     * Checks if PATH_INSTALLED event comes up after intent whose master is remote node is installed.
      */
     @Test
-    public void testRemovedEvent() {
+    public void testInstalledEventRemote() {
+        // set the master for ingress device of intent to remote node
+        mastershipService.setMastership(DEVICE2.id(), MastershipRole.NONE);
+
         Bandwidth bandwidth = Bandwidth.bps(100);
         Duration latency = Duration.ofMillis(10);
 
         OpticalConnectivityId cid = target.setupConnectivity(CP12, CP71, bandwidth, latency);
 
+        // notify all intents are installed
+        intentService.notifyInstalled();
+
+        // remote nodes must not receive event before distributed map is updated
+        assertEquals(0, listener.events.size());
+    }
+
+    /**
+     * Checks if PATH_REMOVED event comes up after packet link is removed.
+     */
+    @Test
+    public void testRemovedEventLocal() {
+        Bandwidth bandwidth = Bandwidth.bps(100);
+        Duration latency = Duration.ofMillis(10);
+
+        OpticalConnectivityId cid = target.setupConnectivity(CP12, CP71, bandwidth, latency);
+
+        // notify all intents are installed
         intentService.notifyInstalled();
 
         target.removeConnectivity(cid);
 
+        // notify all intents are withdrawn
         intentService.notifyWithdrawn();
 
+        // must have received "INSTALLED" and "REMOVED" events
         assertEquals(2, listener.events.size());
+        assertEquals(OpticalPathEvent.Type.PATH_INSTALLED, listener.events.get(0).type());
+        assertEquals(cid, listener.events.get(0).subject());
         assertEquals(OpticalPathEvent.Type.PATH_REMOVED, listener.events.get(1).type());
         assertEquals(cid, listener.events.get(1).subject());
     }
 
+
+    /**
+     * Checks if PATH_REMOVED event comes up after packet link is removed.
+     */
+    @Test
+    public void testRemovedEventRemote() {
+        // set the master for ingress device of intent to remote node
+        mastershipService.setMastership(DEVICE2.id(), MastershipRole.NONE);
+
+        Bandwidth bandwidth = Bandwidth.bps(100);
+        Duration latency = Duration.ofMillis(10);
+
+        OpticalConnectivityId cid = target.setupConnectivity(CP12, CP71, bandwidth, latency);
+
+        // notify all intents are installed
+        intentService.notifyInstalled();
+
+        target.removeConnectivity(cid);
+
+        // notify all intents are withdrawn
+        intentService.notifyWithdrawn();
+
+        // remote nodes must not receive event before distributed map is updated
+        assertEquals(0, listener.events.size());
+    }
+
     private static ConnectPoint createConnectPoint(long devIdNum, long portIdNum) {
         return new ConnectPoint(
                 deviceIdOf(devIdNum),
@@ -495,18 +579,192 @@
     }
 
     private static class TestMastershipService extends MastershipServiceAdapter {
+        private Map<DeviceId, MastershipRole> mastershipMap = new HashMap<>();
+
+        public void setMastership(DeviceId deviceId, MastershipRole role) {
+            mastershipMap.put(deviceId, role);
+        }
+
+        public void clear() {
+            mastershipMap.clear();
+        }
+
+        @Override
+        public MastershipRole getLocalRole(DeviceId deviceId) {
+            return mastershipMap.get(deviceId);
+        }
 
     }
 
     private static class TestClusterService extends ClusterServiceAdapter {
+        private NodeId nodeId;
 
+        public void setLocalNode(String nodeIdStr) {
+            nodeId = NodeId.nodeId(nodeIdStr);
+        }
+
+        @Override
+        public ControllerNode getLocalNode() {
+            return new ControllerNode() {
+                @Override
+                public NodeId id() {
+                    return nodeId;
+                }
+
+                @Override
+                public IpAddress ip() {
+                    return null;
+                }
+
+                @Override
+                public int tcpPort() {
+                    return 0;
+                }
+            };
+        }
     }
 
     private static class TestStorageService extends StorageServiceAdapter {
+
+        @Override
+        public <K, V> ConsistentMapBuilder<K, V> consistentMapBuilder() {
+            ConsistentMapBuilder<K, V> builder = new ConsistentMapBuilder<K, V>() {
+                @Override
+                public AsyncConsistentMap<K, V> buildAsyncMap() {
+                    return null;
+                }
+
+                @Override
+                public ConsistentMap<K, V> build() {
+                    return new TestConsistentMap<K, V>();
+                }
+            };
+
+            return builder;
+        }
+
+        @Override
+        public <E> DistributedSetBuilder<E> setBuilder() {
+            DistributedSetBuilder<E> builder = new DistributedSetBuilder<E>() {
+                @Override
+                public AsyncDistributedSet<E> build() {
+                    return new DistributedSetAdapter<E>() {
+                        @Override
+                        public DistributedSet<E> asDistributedSet() {
+                            return new TestDistributedSet<E>();
+                        }
+                    };
+                }
+            };
+
+            return builder;
+        }
+
         @Override
         public AtomicCounter getAtomicCounter(String name) {
             return new MockAtomicCounter();
         }
+
+        // Mock ConsistentMap that behaves as a HashMap
+        class TestConsistentMap<K, V> extends ConsistentMapAdapter<K, V> {
+            private Map<K, Versioned<V>> map = new HashMap<>();
+            private Map<MapEventListener<K, V>, Executor> listeners = new HashMap<>();
+
+            public void notifyListeners(MapEvent<K, V> event) {
+                listeners.forEach((c, e) -> e.execute(() -> c.event(event)));
+            }
+
+            @Override
+            public int size() {
+                return map.size();
+            }
+
+            @Override
+            public Versioned<V> put(K key, V value) {
+                Versioned<V> oldValue = map.get(key);
+                Versioned<V> newValue = new Versioned<>(value, oldValue == null ? 0 : oldValue.version() + 1);
+                map.put(key, newValue);
+                notifyListeners(new MapEvent<>(name(), key, newValue, oldValue));
+                return newValue;
+            }
+
+            @Override
+            public Versioned<V> get(K key) {
+                return map.get(key);
+            }
+
+            @Override
+            public Versioned<V> remove(K key) {
+                Versioned<V> oldValue = map.remove(key);
+                notifyListeners(new MapEvent<>(name(), key, oldValue, null));
+                return oldValue;
+            }
+
+            @Override
+            public Versioned<V> computeIfPresent(K key,
+                                                 BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
+                Versioned<V> oldValue = map.get(key);
+                Versioned<V> newValue = new Versioned<>(remappingFunction.apply(key, oldValue.value()),
+                        oldValue == null ? 0 : oldValue.version() + 1);
+                map.put(key, newValue);
+                notifyListeners(new MapEvent<>(name(), key, newValue, oldValue));
+                return newValue;
+            }
+
+
+            @Override
+            public Set<Map.Entry<K, Versioned<V>>> entrySet() {
+                return map.entrySet();
+            }
+
+            @Override
+            public Set<K> keySet() {
+                return map.keySet();
+            }
+
+            @Override
+            public Collection<Versioned<V>> values() {
+                return map.values();
+            }
+
+            @Override
+            public void clear() {
+                map.clear();
+            }
+
+            @Override
+            public void addListener(MapEventListener<K, V> listener, Executor executor) {
+                listeners.put(listener, executor);
+            }
+
+            @Override
+            public void removeListener(MapEventListener<K, V> listener) {
+                listeners.remove(listener);
+            }
+        }
+
+        // Mock DistributedSet that behaves as a HashSet
+        class TestDistributedSet<E> extends HashSet<E> implements DistributedSet<E> {
+
+            @Override
+            public void addListener(SetEventListener<E> listener) {
+            }
+
+            @Override
+            public void removeListener(SetEventListener<E> listener) {
+            }
+
+            @Override
+            public String name() {
+                return null;
+            }
+
+            @Override
+            public Type primitiveType() {
+                return null;
+            }
+        }
+
     }
 
     private static class TestDeviceService extends DeviceServiceAdapter {
@@ -525,6 +783,25 @@
     }
 
     private static class TestNetworkConfigService extends NetworkConfigServiceAdapter {
+        @Override
+        @SuppressWarnings("unchecked")
+        public <S, C extends Config<S>> C addConfig(S subject, Class<C> configClass) {
+            if (BandwidthCapacity.class.equals(configClass)) {
+                return (C) new BandwidthCapacity() {
+                    @Override
+                    public void apply() {
+                        // do nothing
+                    }
+
+                    @Override
+                    public BandwidthCapacity capacity(Bandwidth bandwidth) {
+                        // do nothing
+                        return this;
+                    }
+                };
+            }
+            return null;
+        }
 
     }