[ONOS-7363] Isolate replica info to a single version for isolation of primary-backup protocols

Change-Id: I780a949a2bc924e114d0e25fca1888d14c9924ad
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ReplicaInfoManager.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ReplicaInfoManager.java
index 0f47c08..dec602b 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ReplicaInfoManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ReplicaInfoManager.java
@@ -15,29 +15,31 @@
  */
 package org.onosproject.store.flow.impl;
 
-import com.google.common.collect.ImmutableList;
+import java.util.Objects;
+import java.util.function.Consumer;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
-import org.onosproject.cluster.NodeId;
-import org.onosproject.cluster.RoleInfo;
-import org.onosproject.event.EventDeliveryService;
-import org.onosproject.event.ListenerRegistry;
-import org.onosproject.mastership.MastershipEvent;
-import org.onosproject.mastership.MastershipListener;
-import org.onosproject.mastership.MastershipService;
+import org.onosproject.cluster.Leadership;
+import org.onosproject.core.VersionService;
+import org.onosproject.event.AbstractListenerManager;
+import org.onosproject.event.Change;
 import org.onosproject.net.DeviceId;
 import org.onosproject.store.flow.ReplicaInfo;
 import org.onosproject.store.flow.ReplicaInfoEvent;
 import org.onosproject.store.flow.ReplicaInfoEventListener;
 import org.onosproject.store.flow.ReplicaInfoService;
+import org.onosproject.store.service.CoordinationService;
+import org.onosproject.store.service.LeaderElector;
 import org.slf4j.Logger;
 
-import java.util.Collections;
-import java.util.List;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static org.onosproject.store.flow.ReplicaInfoEvent.Type.BACKUPS_CHANGED;
 import static org.onosproject.store.flow.ReplicaInfoEvent.Type.MASTER_CHANGED;
@@ -48,38 +50,66 @@
  */
 @Component(immediate = true)
 @Service
-public class ReplicaInfoManager implements ReplicaInfoService {
+public class ReplicaInfoManager
+        extends AbstractListenerManager<ReplicaInfoEvent, ReplicaInfoEventListener>
+        implements ReplicaInfoService {
+
+    private static final Pattern DEVICE_MASTERSHIP_TOPIC_PATTERN = Pattern.compile("device:([^|]+)\\|[^|]+");
 
     private final Logger log = getLogger(getClass());
 
-    private final MastershipListener mastershipListener = new InternalMastershipListener();
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected CoordinationService coordinationService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected EventDeliveryService eventDispatcher;
+    protected VersionService versionService;
 
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected MastershipService mastershipService;
+    private final Consumer<Change<Leadership>> leadershipChangeListener = change -> {
+        Leadership oldLeadership = change.oldValue();
+        Leadership newLeadership = change.newValue();
 
-    protected final ListenerRegistry<ReplicaInfoEvent, ReplicaInfoEventListener>
-        listenerRegistry = new ListenerRegistry<>();
+        String topic = newLeadership.topic();
+        if (!isDeviceMastershipTopic(topic)) {
+            return;
+        }
+
+        DeviceId deviceId = extractDeviceIdFromTopic(topic);
+        ReplicaInfo replicaInfo = buildFromLeadership(newLeadership);
+
+        boolean leaderChanged = !Objects.equals(oldLeadership.leader(), newLeadership.leader());
+        boolean candidatesChanged = !Objects.equals(oldLeadership.candidates(), newLeadership.candidates());
+
+        if (leaderChanged) {
+            post(new ReplicaInfoEvent(MASTER_CHANGED, deviceId, replicaInfo));
+        }
+        if (candidatesChanged) {
+            post(new ReplicaInfoEvent(BACKUPS_CHANGED, deviceId, replicaInfo));
+        }
+    };
+
+    private LeaderElector leaderElector;
 
     @Activate
     public void activate() {
         eventDispatcher.addSink(ReplicaInfoEvent.class, listenerRegistry);
-        mastershipService.addListener(mastershipListener);
+        leaderElector = coordinationService.leaderElectorBuilder()
+                .withName("onos-leadership-elections")
+                .build()
+                .asLeaderElector();
+        leaderElector.addChangeListener(leadershipChangeListener);
         log.info("Started");
     }
 
     @Deactivate
     public void deactivate() {
         eventDispatcher.removeSink(ReplicaInfoEvent.class);
-        mastershipService.removeListener(mastershipListener);
+        leaderElector.removeChangeListener(leadershipChangeListener);
         log.info("Stopped");
     }
 
     @Override
     public ReplicaInfo getReplicaInfoFor(DeviceId deviceId) {
-        return buildFromRoleInfo(mastershipService.getNodesFor(deviceId));
+        return buildFromLeadership(leaderElector.getLeadership(createDeviceMastershipTopic(deviceId)));
     }
 
     @Override
@@ -92,32 +122,27 @@
         listenerRegistry.removeListener(checkNotNull(listener));
     }
 
-    private static ReplicaInfo buildFromRoleInfo(RoleInfo roles) {
-        List<NodeId> backups = roles.backups() == null ?
-                Collections.emptyList() : ImmutableList.copyOf(roles.backups());
-        return new ReplicaInfo(roles.master(), backups);
+    String createDeviceMastershipTopic(DeviceId deviceId) {
+        return String.format("device:%s|%s", deviceId.toString(), versionService.version());
     }
 
-    final class InternalMastershipListener implements MastershipListener {
-
-        @Override
-        public void event(MastershipEvent event) {
-            final ReplicaInfo replicaInfo = buildFromRoleInfo(event.roleInfo());
-            switch (event.type()) {
-            case MASTER_CHANGED:
-                eventDispatcher.post(new ReplicaInfoEvent(MASTER_CHANGED,
-                                                          event.subject(),
-                                                          replicaInfo));
-                break;
-            case BACKUPS_CHANGED:
-                eventDispatcher.post(new ReplicaInfoEvent(BACKUPS_CHANGED,
-                                                          event.subject(),
-                                                          replicaInfo));
-                break;
-            default:
-                break;
-            }
+    DeviceId extractDeviceIdFromTopic(String topic) {
+        Matcher m = DEVICE_MASTERSHIP_TOPIC_PATTERN.matcher(topic);
+        if (m.matches()) {
+            return DeviceId.deviceId(m.group(1));
+        } else {
+            throw new IllegalArgumentException("Invalid device mastership topic: " + topic);
         }
     }
 
+    boolean isDeviceMastershipTopic(String topic) {
+        Matcher m = DEVICE_MASTERSHIP_TOPIC_PATTERN.matcher(topic);
+        return m.matches();
+    }
+
+    static ReplicaInfo buildFromLeadership(Leadership leadership) {
+        return new ReplicaInfo(leadership.leaderNodeId(), leadership.candidates().stream()
+                .filter(nodeId -> !Objects.equals(nodeId, leadership.leaderNodeId()))
+                .collect(Collectors.toList()));
+    }
 }
diff --git a/core/store/dist/src/test/java/org/onosproject/store/flow/impl/ReplicaInfoManagerTest.java b/core/store/dist/src/test/java/org/onosproject/store/flow/impl/ReplicaInfoManagerTest.java
index dd822df..b342fa9 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/flow/impl/ReplicaInfoManagerTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/flow/impl/ReplicaInfoManagerTest.java
@@ -15,153 +15,197 @@
  */
 package org.onosproject.store.flow.impl;
 
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.function.Consumer;
+
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.onosproject.cluster.Leader;
+import org.onosproject.cluster.Leadership;
 import org.onosproject.cluster.NodeId;
-import org.onosproject.cluster.RoleInfo;
 import org.onosproject.common.event.impl.TestEventDispatcher;
-import org.onosproject.event.ListenerRegistry;
-import org.onosproject.mastership.MastershipEvent;
-import org.onosproject.mastership.MastershipEvent.Type;
-import org.onosproject.mastership.MastershipListener;
-import org.onosproject.mastership.MastershipService;
-import org.onosproject.mastership.MastershipServiceAdapter;
+import org.onosproject.core.Version;
+import org.onosproject.event.Change;
 import org.onosproject.net.DeviceId;
-import org.onosproject.store.flow.ReplicaInfo;
 import org.onosproject.store.flow.ReplicaInfoEvent;
-import org.onosproject.store.flow.ReplicaInfoEventListener;
-import org.onosproject.store.flow.ReplicaInfoService;
+import org.onosproject.store.service.AsyncLeaderElector;
+import org.onosproject.store.service.CoordinationService;
+import org.onosproject.store.service.LeaderElector;
+import org.onosproject.store.service.LeaderElectorBuilder;
 
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.mock;
+import static org.easymock.EasyMock.replay;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 public class ReplicaInfoManagerTest {
 
-
     private static final DeviceId DID1 = DeviceId.deviceId("of:1");
     private static final DeviceId DID2 = DeviceId.deviceId("of:2");
     private static final NodeId NID1 = new NodeId("foo");
+    private static final NodeId NID2 = new NodeId("bar");
 
-    private ReplicaInfoManager mgr;
-    private ReplicaInfoService service;
-
-    private ListenerRegistry<MastershipEvent, MastershipListener>
-        mastershipListenerRegistry;
-    private TestEventDispatcher eventDispatcher;
-
+    private TestLeaderElector leaderElector;
+    private ReplicaInfoManager manager;
 
     @Before
     public void setUp() throws Exception {
-        mastershipListenerRegistry = new ListenerRegistry<>();
+        leaderElector = new TestLeaderElector();
+        manager = new TestReplicaInfoManager();
+        manager.versionService = () -> Version.version("1.0.0");
+        CoordinationService coordinationService = mock(CoordinationService.class);
+        AsyncLeaderElector leaderElector = mock(AsyncLeaderElector.class);
+        expect(leaderElector.asLeaderElector()).andReturn(this.leaderElector).anyTimes();
+        expect(coordinationService.leaderElectorBuilder()).andReturn(new LeaderElectorBuilder() {
+            @Override
+            public AsyncLeaderElector build() {
+                return leaderElector;
+            }
+        }).anyTimes();
+        replay(coordinationService, leaderElector);
+        manager.coordinationService = coordinationService;
 
-        mgr = new ReplicaInfoManager();
-        service = mgr;
-
-        eventDispatcher = new TestEventDispatcher();
-        mgr.eventDispatcher = eventDispatcher;
-        mgr.mastershipService = new TestMastershipService();
-
-        // register dummy mastership event source
-        mgr.eventDispatcher.addSink(MastershipEvent.class, mastershipListenerRegistry);
-
-        mgr.activate();
+        manager.activate();
     }
 
     @After
     public void tearDown() throws Exception {
-        mgr.deactivate();
+        manager.deactivate();
     }
 
     @Test
-    public void testGetReplicaInfoFor() {
-        ReplicaInfo info1 = service.getReplicaInfoFor(DID1);
-        assertEquals(Optional.of(NID1), info1.master());
-        // backups are always empty for now
-        assertEquals(Collections.emptyList(), info1.backups());
-
-        ReplicaInfo info2 = service.getReplicaInfoFor(DID2);
-        assertEquals("There's no master", Optional.empty(), info2.master());
-        // backups are always empty for now
-        assertEquals(Collections.emptyList(), info2.backups());
+    public void testMastershipTopics() throws Exception {
+        assertEquals("device:of:1|1.0.0", manager.createDeviceMastershipTopic(DID1));
+        assertEquals(DID1, manager.extractDeviceIdFromTopic("device:of:1|1.0.0"));
+        assertTrue(manager.isDeviceMastershipTopic("device:of:1|1.0.0"));
+        assertFalse(manager.isDeviceMastershipTopic("foo:bar|1.0.0"));
+        assertFalse(manager.isDeviceMastershipTopic("foo:bar|baz"));
+        assertFalse(manager.isDeviceMastershipTopic("foobarbaz|1.0.0"));
+        assertFalse(manager.isDeviceMastershipTopic("foobarbaz"));
     }
 
     @Test
-    public void testReplicaInfoEvent() throws InterruptedException {
-        final CountDownLatch latch = new CountDownLatch(1);
-        service.addListener(new MasterNodeCheck(latch, DID1, NID1));
+    public void testReplicaEvents() throws Exception {
+        Queue<ReplicaInfoEvent> events = new ArrayBlockingQueue<>(2);
+        manager.addListener(events::add);
 
-        // fake MastershipEvent
-        eventDispatcher.post(new MastershipEvent(Type.MASTER_CHANGED, DID1,
-                new RoleInfo(NID1, new LinkedList<>())));
+        Leadership oldLeadership = new Leadership(
+                manager.createDeviceMastershipTopic(DID1),
+                new Leader(NID1, 1, 1),
+                Lists.newArrayList(NID1));
+        Leadership newLeadership = new Leadership(
+                manager.createDeviceMastershipTopic(DID1),
+                new Leader(NID2, 2, 1),
+                Lists.newArrayList(NID2, NID1));
 
-        assertTrue(latch.await(1, TimeUnit.SECONDS));
+        leaderElector.leaderships.put(manager.createDeviceMastershipTopic(DID1), newLeadership);
+        leaderElector.post(new Change<>(oldLeadership, newLeadership));
+
+        ReplicaInfoEvent event = events.remove();
+        assertEquals(ReplicaInfoEvent.Type.MASTER_CHANGED, event.type());
+        assertEquals(NID2, event.replicaInfo().master().get());
+        assertEquals(1, event.replicaInfo().backups().size());
+
+        event = events.remove();
+        assertEquals(ReplicaInfoEvent.Type.BACKUPS_CHANGED, event.type());
+        assertEquals(NID2, event.replicaInfo().master().get());
+        assertEquals(1, event.replicaInfo().backups().size());
+
+        assertEquals(NID2, manager.getReplicaInfoFor(DID1).master().get());
+        assertEquals(1, manager.getReplicaInfoFor(DID1).backups().size());
+
+        oldLeadership = new Leadership(
+                manager.createDeviceMastershipTopic(DID1),
+                new Leader(NID1, 1, 1),
+                Lists.newArrayList(NID1));
+        newLeadership = new Leadership(
+                manager.createDeviceMastershipTopic(DID1),
+                new Leader(NID1, 1, 1),
+                Lists.newArrayList(NID1, NID2));
+
+        leaderElector.leaderships.put(manager.createDeviceMastershipTopic(DID1), newLeadership);
+        leaderElector.post(new Change<>(oldLeadership, newLeadership));
+
+        event = events.remove();
+        assertEquals(ReplicaInfoEvent.Type.BACKUPS_CHANGED, event.type());
+        assertEquals(NID1, event.replicaInfo().master().get());
+        assertEquals(1, event.replicaInfo().backups().size());
+
+        assertEquals(NID1, manager.getReplicaInfoFor(DID1).master().get());
+        assertEquals(1, manager.getReplicaInfoFor(DID1).backups().size());
     }
 
-
-    private final class MasterNodeCheck implements ReplicaInfoEventListener {
-        private final CountDownLatch latch;
-        private Optional<NodeId> expectedMaster;
-        private DeviceId expectedDevice;
-
-
-        MasterNodeCheck(CountDownLatch latch, DeviceId did,
-                        NodeId nid) {
-            this.latch = latch;
-            this.expectedMaster = Optional.ofNullable(nid);
-            this.expectedDevice = did;
-        }
-
-        @Override
-        public void event(ReplicaInfoEvent event) {
-            assertEquals(expectedDevice, event.subject());
-            assertEquals(expectedMaster, event.replicaInfo().master());
-            // backups are always empty for now
-            assertEquals(Collections.emptyList(), event.replicaInfo().backups());
-            latch.countDown();
+    private class TestReplicaInfoManager extends ReplicaInfoManager {
+        TestReplicaInfoManager() {
+            eventDispatcher = new TestEventDispatcher();
         }
     }
 
+    private class TestLeaderElector implements LeaderElector {
+        private final Map<String, Leadership> leaderships = Maps.newConcurrentMap();
+        private final Set<Consumer<Change<Leadership>>> listeners = Sets.newConcurrentHashSet();
 
-    private final class TestMastershipService
-            extends MastershipServiceAdapter
-            implements MastershipService {
-
-        private Map<DeviceId, NodeId> masters;
-
-        TestMastershipService() {
-            masters = Maps.newHashMap();
-            masters.put(DID1, NID1);
-            // DID2 has no master
+        @Override
+        public String name() {
+            return null;
         }
 
         @Override
-        public NodeId getMasterFor(DeviceId deviceId) {
-            return masters.get(deviceId);
+        public Leadership run(String topic, NodeId nodeId) {
+            return null;
         }
 
         @Override
-        public RoleInfo getNodesFor(DeviceId deviceId) {
-            return new RoleInfo(masters.get(deviceId), Collections.emptyList());
+        public void withdraw(String topic) {
+
         }
 
         @Override
-        public void addListener(MastershipListener listener) {
-            mastershipListenerRegistry.addListener(listener);
+        public boolean anoint(String topic, NodeId nodeId) {
+            return false;
         }
 
         @Override
-        public void removeListener(MastershipListener listener) {
-            mastershipListenerRegistry.removeListener(listener);
+        public boolean promote(String topic, NodeId nodeId) {
+            return false;
+        }
+
+        @Override
+        public void evict(NodeId nodeId) {
+
+        }
+
+        @Override
+        public Leadership getLeadership(String topic) {
+            return leaderships.get(topic);
+        }
+
+        @Override
+        public Map<String, Leadership> getLeaderships() {
+            return leaderships;
+        }
+
+        @Override
+        public void addChangeListener(Consumer<Change<Leadership>> consumer) {
+            listeners.add(consumer);
+        }
+
+        @Override
+        public void removeChangeListener(Consumer<Change<Leadership>> consumer) {
+            listeners.remove(consumer);
+        }
+
+        void post(Change<Leadership> change) {
+            listeners.forEach(l -> l.accept(change));
         }
     }
-
 }