Made OpticalPathProvisioner to store connectivity data in distributed store. (ONOS-4518)

Change-Id: I7f9ef02cab4aa1848c8926d2e88478e035076c99
diff --git a/apps/newoptical/src/test/java/org/onosproject/newoptical/OpticalConnectivityTest.java b/apps/newoptical/src/test/java/org/onosproject/newoptical/OpticalConnectivityTest.java
index 8d0bc87..7535f7c 100644
--- a/apps/newoptical/src/test/java/org/onosproject/newoptical/OpticalConnectivityTest.java
+++ b/apps/newoptical/src/test/java/org/onosproject/newoptical/OpticalConnectivityTest.java
@@ -16,6 +16,7 @@
 
 package org.onosproject.newoptical;
 
+import com.google.common.collect.ImmutableSet;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -39,7 +40,9 @@
 import org.onosproject.newoptical.api.OpticalConnectivityId;
 
 import java.time.Duration;
+import java.util.Collections;
 import java.util.List;
+import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -94,10 +97,9 @@
         Link link2 = createLink(cp22, cp31);
         List<Link> links = Stream.of(link1, link2).collect(Collectors.toList());
 
-        Path path = new MockPath(cp12, cp31, links);
-
         OpticalConnectivityId cid = OpticalConnectivityId.of(1L);
-        OpticalConnectivity oc = new OpticalConnectivity(cid, path, bandwidth, latency);
+        OpticalConnectivity oc = new OpticalConnectivity(cid, links, bandwidth, latency,
+                Collections.emptySet(), Collections.emptySet());
 
         assertNotNull(oc);
         assertEquals(oc.id(), cid);
@@ -133,8 +135,6 @@
         Link link6 = createLink(cp62, cp71);
         List<Link> links = Stream.of(link1, link2, link3, link4, link5, link6).collect(Collectors.toList());
 
-        Path path = new MockPath(cp12, cp71, links);
-
         // Mocks 2 intents to create OduCtl connectivity
         OpticalConnectivityIntent connIntent1 = createConnectivityIntent(cp21, cp32);
         PacketLinkRealizedByOptical oduLink1 = PacketLinkRealizedByOptical.create(cp12, cp41,
@@ -144,29 +144,29 @@
         PacketLinkRealizedByOptical oduLink2 = PacketLinkRealizedByOptical.create(cp42, cp71,
                 connIntent2);
 
+        Set<PacketLinkRealizedByOptical> plinks = ImmutableSet.of(oduLink1, oduLink2);
+
         Bandwidth bandwidth = Bandwidth.bps(100);
         Duration latency = Duration.ofMillis(10);
 
         OpticalConnectivityId cid = OpticalConnectivityId.of(1L);
-        OpticalConnectivity oc = new OpticalConnectivity(cid, path, bandwidth, latency);
+        OpticalConnectivity oc1 = new OpticalConnectivity(cid, links, bandwidth, latency,
+                plinks, Collections.emptySet());
 
-        oc.addRealizingLink(oduLink1);
-        oc.addRealizingLink(oduLink2);
-
-        assertTrue(oc.isAllRealizingLinkNotEstablished());
-        assertFalse(oc.isAllRealizingLinkEstablished());
+        assertTrue(oc1.isAllRealizingLinkNotEstablished());
+        assertFalse(oc1.isAllRealizingLinkEstablished());
 
         // Sets link realized by connIntent1 to be established
-        oc.setLinkEstablished(cp12, cp41);
+        OpticalConnectivity oc2 = oc1.setLinkEstablished(cp12, cp41, true);
 
-        assertFalse(oc.isAllRealizingLinkNotEstablished());
-        assertFalse(oc.isAllRealizingLinkEstablished());
+        assertFalse(oc2.isAllRealizingLinkNotEstablished());
+        assertFalse(oc2.isAllRealizingLinkEstablished());
 
         // Sets link realized by connIntent2 to be established
-        oc.setLinkEstablished(cp42, cp71);
+        OpticalConnectivity oc3 = oc2.setLinkEstablished(cp42, cp71, true);
 
-        assertFalse(oc.isAllRealizingLinkNotEstablished());
-        assertTrue(oc.isAllRealizingLinkEstablished());
+        assertFalse(oc3.isAllRealizingLinkNotEstablished());
+        assertTrue(oc3.isAllRealizingLinkEstablished());
     }
 
     /**
@@ -196,8 +196,6 @@
         Link link6 = createLink(cp62, cp71);
         List<Link> links = Stream.of(link1, link2, link3, link4, link5, link6).collect(Collectors.toList());
 
-        Path path = new MockPath(cp12, cp71, links);
-
         // Mocks 2 intents to create Och connectivity
         OpticalCircuitIntent circuitIntent1 = createCircuitIntent(cp21, cp32);
         PacketLinkRealizedByOptical ochLink1 = PacketLinkRealizedByOptical.create(cp12, cp41,
@@ -207,29 +205,29 @@
         PacketLinkRealizedByOptical ochLink2 = PacketLinkRealizedByOptical.create(cp42, cp71,
                 circuitIntent2);
 
+        Set<PacketLinkRealizedByOptical> plinks = ImmutableSet.of(ochLink1, ochLink2);
+
         Bandwidth bandwidth = Bandwidth.bps(100);
         Duration latency = Duration.ofMillis(10);
 
         OpticalConnectivityId cid = OpticalConnectivityId.of(1L);
-        OpticalConnectivity oc = new OpticalConnectivity(cid, path, bandwidth, latency);
+        OpticalConnectivity oc1 = new OpticalConnectivity(cid, links, bandwidth, latency,
+                plinks, Collections.emptySet());
 
-        oc.addRealizingLink(ochLink1);
-        oc.addRealizingLink(ochLink2);
-
-        assertTrue(oc.isAllRealizingLinkNotEstablished());
-        assertFalse(oc.isAllRealizingLinkEstablished());
+        assertTrue(oc1.isAllRealizingLinkNotEstablished());
+        assertFalse(oc1.isAllRealizingLinkEstablished());
 
         // Sets link realized by circuitIntent1 to be established
-        oc.setLinkEstablished(cp12, cp41);
+        OpticalConnectivity oc2 = oc1.setLinkEstablished(cp12, cp41, true);
 
-        assertFalse(oc.isAllRealizingLinkNotEstablished());
-        assertFalse(oc.isAllRealizingLinkEstablished());
+        assertFalse(oc2.isAllRealizingLinkNotEstablished());
+        assertFalse(oc2.isAllRealizingLinkEstablished());
 
         // Sets link realized by circuitIntent2 to be established
-        oc.setLinkEstablished(cp42, cp71);
+        OpticalConnectivity oc3 = oc2.setLinkEstablished(cp42, cp71, true);
 
-        assertFalse(oc.isAllRealizingLinkNotEstablished());
-        assertTrue(oc.isAllRealizingLinkEstablished());
+        assertFalse(oc3.isAllRealizingLinkNotEstablished());
+        assertTrue(oc3.isAllRealizingLinkEstablished());
     }
 
     private ConnectPoint createConnectPoint(long devIdNum, long portIdNum) {
diff --git a/apps/newoptical/src/test/java/org/onosproject/newoptical/OpticalPathProvisionerTest.java b/apps/newoptical/src/test/java/org/onosproject/newoptical/OpticalPathProvisionerTest.java
index efbaf77..b7d26e3 100644
--- a/apps/newoptical/src/test/java/org/onosproject/newoptical/OpticalPathProvisionerTest.java
+++ b/apps/newoptical/src/test/java/org/onosproject/newoptical/OpticalPathProvisionerTest.java
@@ -21,9 +21,12 @@
 import org.junit.Before;
 import org.junit.Test;
 import org.onlab.packet.ChassisId;
+import org.onlab.packet.IpAddress;
 import org.onlab.util.Bandwidth;
 import org.onlab.util.Frequency;
 import org.onosproject.cluster.ClusterServiceAdapter;
+import org.onosproject.cluster.ControllerNode;
+import org.onosproject.cluster.NodeId;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreServiceAdapter;
 import org.onosproject.core.DefaultApplicationId;
@@ -44,12 +47,15 @@
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.ElementId;
 import org.onosproject.net.Link;
+import org.onosproject.net.MastershipRole;
 import org.onosproject.net.OchSignal;
 import org.onosproject.net.OduSignalType;
 import org.onosproject.net.Path;
 import org.onosproject.net.Port;
 import org.onosproject.net.PortNumber;
+import org.onosproject.net.config.Config;
 import org.onosproject.net.config.NetworkConfigServiceAdapter;
+import org.onosproject.net.config.basics.BandwidthCapacity;
 import org.onosproject.net.device.DeviceServiceAdapter;
 import org.onosproject.net.intent.Intent;
 import org.onosproject.net.intent.IntentEvent;
@@ -74,8 +80,20 @@
 import org.onosproject.newoptical.api.OpticalConnectivityId;
 import org.onosproject.newoptical.api.OpticalPathEvent;
 import org.onosproject.newoptical.api.OpticalPathListener;
+import org.onosproject.store.service.AsyncConsistentMap;
+import org.onosproject.store.service.AsyncDistributedSet;
 import org.onosproject.store.service.AtomicCounter;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.ConsistentMapAdapter;
+import org.onosproject.store.service.ConsistentMapBuilder;
+import org.onosproject.store.service.DistributedSet;
+import org.onosproject.store.service.DistributedSetAdapter;
+import org.onosproject.store.service.DistributedSetBuilder;
+import org.onosproject.store.service.MapEvent;
+import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.SetEventListener;
 import org.onosproject.store.service.StorageServiceAdapter;
+import org.onosproject.store.service.Versioned;
 
 import java.time.Duration;
 import java.util.ArrayList;
@@ -87,6 +105,8 @@
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.function.BiFunction;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -155,6 +175,8 @@
     protected TestLinkService linkService;
     protected TestPathService pathService;
     protected TestIntentService intentService;
+    protected TestMastershipService mastershipService;
+    protected TestClusterService clusterService;
     protected IdGenerator idGenerator;
 
     @Before
@@ -188,14 +210,24 @@
 
         this.pathService = new TestPathService();
         this.intentService = new TestIntentService();
+        this.mastershipService = new TestMastershipService();
+        this.clusterService = new TestClusterService();
+
+        mastershipService.setMastership(DEVICE1.id(), MastershipRole.MASTER);
+        mastershipService.setMastership(DEVICE2.id(), MastershipRole.MASTER);
+        mastershipService.setMastership(DEVICE3.id(), MastershipRole.MASTER);
+        mastershipService.setMastership(DEVICE4.id(), MastershipRole.MASTER);
+        mastershipService.setMastership(DEVICE5.id(), MastershipRole.MASTER);
+        mastershipService.setMastership(DEVICE6.id(), MastershipRole.MASTER);
+        mastershipService.setMastership(DEVICE7.id(), MastershipRole.MASTER);
 
         this.target = new OpticalPathProvisioner();
         target.coreService = new TestCoreService();
         target.intentService = this.intentService;
         target.pathService = this.pathService;
         target.linkService = this.linkService;
-        target.mastershipService = new TestMastershipService();
-        target.clusterService = new TestClusterService();
+        target.mastershipService = this.mastershipService;
+        target.clusterService = this.clusterService;
         target.storageService = new TestStorageService();
         target.deviceService = this.deviceService;
         target.networkConfigService = new TestNetworkConfigService();
@@ -310,15 +342,16 @@
     }
 
     /**
-     * Checks if PATH_INSTALLED event comes up after intent is installed.
+     * Checks if PATH_INSTALLED event comes up after intent whose master is this node is installed.
      */
     @Test
-    public void testInstalledEvent() {
+    public void testInstalledEventLocal() {
         Bandwidth bandwidth = Bandwidth.bps(100);
         Duration latency = Duration.ofMillis(10);
 
         OpticalConnectivityId cid = target.setupConnectivity(CP12, CP71, bandwidth, latency);
 
+        // notify all intents are installed
         intentService.notifyInstalled();
 
         assertEquals(1, listener.events.size());
@@ -327,26 +360,77 @@
     }
 
     /**
-     * Checks if PATH_REMOVED event comes up after packet link is removed.
+     * Checks if PATH_INSTALLED event comes up after intent whose master is remote node is installed.
      */
     @Test
-    public void testRemovedEvent() {
+    public void testInstalledEventRemote() {
+        // set the master for ingress device of intent to remote node
+        mastershipService.setMastership(DEVICE2.id(), MastershipRole.NONE);
+
         Bandwidth bandwidth = Bandwidth.bps(100);
         Duration latency = Duration.ofMillis(10);
 
         OpticalConnectivityId cid = target.setupConnectivity(CP12, CP71, bandwidth, latency);
 
+        // notify all intents are installed
+        intentService.notifyInstalled();
+
+        // remote nodes must not receive event before distributed map is updated
+        assertEquals(0, listener.events.size());
+    }
+
+    /**
+     * Checks if PATH_REMOVED event comes up after packet link is removed.
+     */
+    @Test
+    public void testRemovedEventLocal() {
+        Bandwidth bandwidth = Bandwidth.bps(100);
+        Duration latency = Duration.ofMillis(10);
+
+        OpticalConnectivityId cid = target.setupConnectivity(CP12, CP71, bandwidth, latency);
+
+        // notify all intents are installed
         intentService.notifyInstalled();
 
         target.removeConnectivity(cid);
 
+        // notify all intents are withdrawn
         intentService.notifyWithdrawn();
 
+        // must have received "INSTALLED" and "REMOVED" events
         assertEquals(2, listener.events.size());
+        assertEquals(OpticalPathEvent.Type.PATH_INSTALLED, listener.events.get(0).type());
+        assertEquals(cid, listener.events.get(0).subject());
         assertEquals(OpticalPathEvent.Type.PATH_REMOVED, listener.events.get(1).type());
         assertEquals(cid, listener.events.get(1).subject());
     }
 
+
+    /**
+     * Checks if PATH_REMOVED event comes up after packet link is removed.
+     */
+    @Test
+    public void testRemovedEventRemote() {
+        // set the master for ingress device of intent to remote node
+        mastershipService.setMastership(DEVICE2.id(), MastershipRole.NONE);
+
+        Bandwidth bandwidth = Bandwidth.bps(100);
+        Duration latency = Duration.ofMillis(10);
+
+        OpticalConnectivityId cid = target.setupConnectivity(CP12, CP71, bandwidth, latency);
+
+        // notify all intents are installed
+        intentService.notifyInstalled();
+
+        target.removeConnectivity(cid);
+
+        // notify all intents are withdrawn
+        intentService.notifyWithdrawn();
+
+        // remote nodes must not receive event before distributed map is updated
+        assertEquals(0, listener.events.size());
+    }
+
     private static ConnectPoint createConnectPoint(long devIdNum, long portIdNum) {
         return new ConnectPoint(
                 deviceIdOf(devIdNum),
@@ -495,18 +579,192 @@
     }
 
     private static class TestMastershipService extends MastershipServiceAdapter {
+        private Map<DeviceId, MastershipRole> mastershipMap = new HashMap<>();
+
+        public void setMastership(DeviceId deviceId, MastershipRole role) {
+            mastershipMap.put(deviceId, role);
+        }
+
+        public void clear() {
+            mastershipMap.clear();
+        }
+
+        @Override
+        public MastershipRole getLocalRole(DeviceId deviceId) {
+            return mastershipMap.get(deviceId);
+        }
 
     }
 
     private static class TestClusterService extends ClusterServiceAdapter {
+        private NodeId nodeId;
 
+        public void setLocalNode(String nodeIdStr) {
+            nodeId = NodeId.nodeId(nodeIdStr);
+        }
+
+        @Override
+        public ControllerNode getLocalNode() {
+            return new ControllerNode() {
+                @Override
+                public NodeId id() {
+                    return nodeId;
+                }
+
+                @Override
+                public IpAddress ip() {
+                    return null;
+                }
+
+                @Override
+                public int tcpPort() {
+                    return 0;
+                }
+            };
+        }
     }
 
     private static class TestStorageService extends StorageServiceAdapter {
+
+        @Override
+        public <K, V> ConsistentMapBuilder<K, V> consistentMapBuilder() {
+            ConsistentMapBuilder<K, V> builder = new ConsistentMapBuilder<K, V>() {
+                @Override
+                public AsyncConsistentMap<K, V> buildAsyncMap() {
+                    return null;
+                }
+
+                @Override
+                public ConsistentMap<K, V> build() {
+                    return new TestConsistentMap<K, V>();
+                }
+            };
+
+            return builder;
+        }
+
+        @Override
+        public <E> DistributedSetBuilder<E> setBuilder() {
+            DistributedSetBuilder<E> builder = new DistributedSetBuilder<E>() {
+                @Override
+                public AsyncDistributedSet<E> build() {
+                    return new DistributedSetAdapter<E>() {
+                        @Override
+                        public DistributedSet<E> asDistributedSet() {
+                            return new TestDistributedSet<E>();
+                        }
+                    };
+                }
+            };
+
+            return builder;
+        }
+
         @Override
         public AtomicCounter getAtomicCounter(String name) {
             return new MockAtomicCounter();
         }
+
+        // Mock ConsistentMap that behaves as a HashMap
+        class TestConsistentMap<K, V> extends ConsistentMapAdapter<K, V> {
+            private Map<K, Versioned<V>> map = new HashMap<>();
+            private Map<MapEventListener<K, V>, Executor> listeners = new HashMap<>();
+
+            public void notifyListeners(MapEvent<K, V> event) {
+                listeners.forEach((c, e) -> e.execute(() -> c.event(event)));
+            }
+
+            @Override
+            public int size() {
+                return map.size();
+            }
+
+            @Override
+            public Versioned<V> put(K key, V value) {
+                Versioned<V> oldValue = map.get(key);
+                Versioned<V> newValue = new Versioned<>(value, oldValue == null ? 0 : oldValue.version() + 1);
+                map.put(key, newValue);
+                notifyListeners(new MapEvent<>(name(), key, newValue, oldValue));
+                return newValue;
+            }
+
+            @Override
+            public Versioned<V> get(K key) {
+                return map.get(key);
+            }
+
+            @Override
+            public Versioned<V> remove(K key) {
+                Versioned<V> oldValue = map.remove(key);
+                notifyListeners(new MapEvent<>(name(), key, oldValue, null));
+                return oldValue;
+            }
+
+            @Override
+            public Versioned<V> computeIfPresent(K key,
+                                                 BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
+                Versioned<V> oldValue = map.get(key);
+                Versioned<V> newValue = new Versioned<>(remappingFunction.apply(key, oldValue.value()),
+                        oldValue == null ? 0 : oldValue.version() + 1);
+                map.put(key, newValue);
+                notifyListeners(new MapEvent<>(name(), key, newValue, oldValue));
+                return newValue;
+            }
+
+
+            @Override
+            public Set<Map.Entry<K, Versioned<V>>> entrySet() {
+                return map.entrySet();
+            }
+
+            @Override
+            public Set<K> keySet() {
+                return map.keySet();
+            }
+
+            @Override
+            public Collection<Versioned<V>> values() {
+                return map.values();
+            }
+
+            @Override
+            public void clear() {
+                map.clear();
+            }
+
+            @Override
+            public void addListener(MapEventListener<K, V> listener, Executor executor) {
+                listeners.put(listener, executor);
+            }
+
+            @Override
+            public void removeListener(MapEventListener<K, V> listener) {
+                listeners.remove(listener);
+            }
+        }
+
+        // Mock DistributedSet that behaves as a HashSet
+        class TestDistributedSet<E> extends HashSet<E> implements DistributedSet<E> {
+
+            @Override
+            public void addListener(SetEventListener<E> listener) {
+            }
+
+            @Override
+            public void removeListener(SetEventListener<E> listener) {
+            }
+
+            @Override
+            public String name() {
+                return null;
+            }
+
+            @Override
+            public Type primitiveType() {
+                return null;
+            }
+        }
+
     }
 
     private static class TestDeviceService extends DeviceServiceAdapter {
@@ -525,6 +783,25 @@
     }
 
     private static class TestNetworkConfigService extends NetworkConfigServiceAdapter {
+        @Override
+        @SuppressWarnings("unchecked")
+        public <S, C extends Config<S>> C addConfig(S subject, Class<C> configClass) {
+            if (BandwidthCapacity.class.equals(configClass)) {
+                return (C) new BandwidthCapacity() {
+                    @Override
+                    public void apply() {
+                        // do nothing
+                    }
+
+                    @Override
+                    public BandwidthCapacity capacity(Bandwidth bandwidth) {
+                        // do nothing
+                        return this;
+                    }
+                };
+            }
+            return null;
+        }
 
     }