Ensure mastership elections are cached in flow rule store

Change-Id: Iff1b1d743a38310e76a5c87605480e9faa5eab2b
(cherry picked from commit d334841f7067def3bad1c618a27a75d28d6df004)
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedLeadershipStore.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedLeadershipStore.java
index 746142e..c3a3977 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedLeadershipStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedLeadershipStore.java
@@ -202,6 +202,7 @@
             leaderElector = storageService.leaderElectorBuilder()
                     .withName("onos-leadership-elections")
                     .withElectionTimeout(electionTimeoutMillis)
+                    .withRelaxedReadConsistency()
                     .build()
                     .asLeaderElector();
         }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ReplicaInfoManager.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ReplicaInfoManager.java
index dec602b..79c9147 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ReplicaInfoManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ReplicaInfoManager.java
@@ -15,31 +15,29 @@
  */
 package org.onosproject.store.flow.impl;
 
-import java.util.Objects;
-import java.util.function.Consumer;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import java.util.stream.Collectors;
-
+import com.google.common.collect.ImmutableList;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
-import org.onosproject.cluster.Leadership;
-import org.onosproject.core.VersionService;
-import org.onosproject.event.AbstractListenerManager;
-import org.onosproject.event.Change;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.cluster.RoleInfo;
+import org.onosproject.event.EventDeliveryService;
+import org.onosproject.event.ListenerRegistry;
+import org.onosproject.mastership.MastershipEvent;
+import org.onosproject.mastership.MastershipListener;
+import org.onosproject.mastership.MastershipService;
 import org.onosproject.net.DeviceId;
 import org.onosproject.store.flow.ReplicaInfo;
 import org.onosproject.store.flow.ReplicaInfoEvent;
 import org.onosproject.store.flow.ReplicaInfoEventListener;
 import org.onosproject.store.flow.ReplicaInfoService;
-import org.onosproject.store.service.CoordinationService;
-import org.onosproject.store.service.LeaderElector;
 import org.slf4j.Logger;
 
+import java.util.Collections;
+import java.util.List;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static org.onosproject.store.flow.ReplicaInfoEvent.Type.BACKUPS_CHANGED;
 import static org.onosproject.store.flow.ReplicaInfoEvent.Type.MASTER_CHANGED;
@@ -50,66 +48,38 @@
  */
 @Component(immediate = true)
 @Service
-public class ReplicaInfoManager
-        extends AbstractListenerManager<ReplicaInfoEvent, ReplicaInfoEventListener>
-        implements ReplicaInfoService {
-
-    private static final Pattern DEVICE_MASTERSHIP_TOPIC_PATTERN = Pattern.compile("device:([^|]+)\\|[^|]+");
+public class ReplicaInfoManager implements ReplicaInfoService {
 
     private final Logger log = getLogger(getClass());
 
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected CoordinationService coordinationService;
+    private final MastershipListener mastershipListener = new InternalMastershipListener();
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected VersionService versionService;
+    protected EventDeliveryService eventDispatcher;
 
-    private final Consumer<Change<Leadership>> leadershipChangeListener = change -> {
-        Leadership oldLeadership = change.oldValue();
-        Leadership newLeadership = change.newValue();
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected MastershipService mastershipService;
 
-        String topic = newLeadership.topic();
-        if (!isDeviceMastershipTopic(topic)) {
-            return;
-        }
-
-        DeviceId deviceId = extractDeviceIdFromTopic(topic);
-        ReplicaInfo replicaInfo = buildFromLeadership(newLeadership);
-
-        boolean leaderChanged = !Objects.equals(oldLeadership.leader(), newLeadership.leader());
-        boolean candidatesChanged = !Objects.equals(oldLeadership.candidates(), newLeadership.candidates());
-
-        if (leaderChanged) {
-            post(new ReplicaInfoEvent(MASTER_CHANGED, deviceId, replicaInfo));
-        }
-        if (candidatesChanged) {
-            post(new ReplicaInfoEvent(BACKUPS_CHANGED, deviceId, replicaInfo));
-        }
-    };
-
-    private LeaderElector leaderElector;
+    protected final ListenerRegistry<ReplicaInfoEvent, ReplicaInfoEventListener>
+        listenerRegistry = new ListenerRegistry<>();
 
     @Activate
     public void activate() {
         eventDispatcher.addSink(ReplicaInfoEvent.class, listenerRegistry);
-        leaderElector = coordinationService.leaderElectorBuilder()
-                .withName("onos-leadership-elections")
-                .build()
-                .asLeaderElector();
-        leaderElector.addChangeListener(leadershipChangeListener);
+        mastershipService.addListener(mastershipListener);
         log.info("Started");
     }
 
     @Deactivate
     public void deactivate() {
         eventDispatcher.removeSink(ReplicaInfoEvent.class);
-        leaderElector.removeChangeListener(leadershipChangeListener);
+        mastershipService.removeListener(mastershipListener);
         log.info("Stopped");
     }
 
     @Override
     public ReplicaInfo getReplicaInfoFor(DeviceId deviceId) {
-        return buildFromLeadership(leaderElector.getLeadership(createDeviceMastershipTopic(deviceId)));
+        return buildFromRoleInfo(mastershipService.getNodesFor(deviceId));
     }
 
     @Override
@@ -122,27 +92,32 @@
         listenerRegistry.removeListener(checkNotNull(listener));
     }
 
-    String createDeviceMastershipTopic(DeviceId deviceId) {
-        return String.format("device:%s|%s", deviceId.toString(), versionService.version());
+    private static ReplicaInfo buildFromRoleInfo(RoleInfo roles) {
+        List<NodeId> backups = roles.backups() == null ?
+            Collections.emptyList() : ImmutableList.copyOf(roles.backups());
+        return new ReplicaInfo(roles.master(), backups);
     }
 
-    DeviceId extractDeviceIdFromTopic(String topic) {
-        Matcher m = DEVICE_MASTERSHIP_TOPIC_PATTERN.matcher(topic);
-        if (m.matches()) {
-            return DeviceId.deviceId(m.group(1));
-        } else {
-            throw new IllegalArgumentException("Invalid device mastership topic: " + topic);
+    final class InternalMastershipListener implements MastershipListener {
+
+        @Override
+        public void event(MastershipEvent event) {
+            final ReplicaInfo replicaInfo = buildFromRoleInfo(event.roleInfo());
+            switch (event.type()) {
+                case MASTER_CHANGED:
+                    eventDispatcher.post(new ReplicaInfoEvent(MASTER_CHANGED,
+                        event.subject(),
+                        replicaInfo));
+                    break;
+                case BACKUPS_CHANGED:
+                    eventDispatcher.post(new ReplicaInfoEvent(BACKUPS_CHANGED,
+                        event.subject(),
+                        replicaInfo));
+                    break;
+                default:
+                    break;
+            }
         }
     }
 
-    boolean isDeviceMastershipTopic(String topic) {
-        Matcher m = DEVICE_MASTERSHIP_TOPIC_PATTERN.matcher(topic);
-        return m.matches();
-    }
-
-    static ReplicaInfo buildFromLeadership(Leadership leadership) {
-        return new ReplicaInfo(leadership.leaderNodeId(), leadership.candidates().stream()
-                .filter(nodeId -> !Objects.equals(nodeId, leadership.leaderNodeId()))
-                .collect(Collectors.toList()));
-    }
 }
diff --git a/core/store/dist/src/test/java/org/onosproject/store/flow/impl/ReplicaInfoManagerTest.java b/core/store/dist/src/test/java/org/onosproject/store/flow/impl/ReplicaInfoManagerTest.java
index b342fa9..9709643 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/flow/impl/ReplicaInfoManagerTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/flow/impl/ReplicaInfoManagerTest.java
@@ -15,197 +15,153 @@
  */
 package org.onosproject.store.flow.impl;
 
-import java.util.Map;
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.function.Consumer;
-
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
-import org.onosproject.cluster.Leader;
-import org.onosproject.cluster.Leadership;
 import org.onosproject.cluster.NodeId;
+import org.onosproject.cluster.RoleInfo;
 import org.onosproject.common.event.impl.TestEventDispatcher;
-import org.onosproject.core.Version;
-import org.onosproject.event.Change;
+import org.onosproject.event.ListenerRegistry;
+import org.onosproject.mastership.MastershipEvent;
+import org.onosproject.mastership.MastershipEvent.Type;
+import org.onosproject.mastership.MastershipListener;
+import org.onosproject.mastership.MastershipService;
+import org.onosproject.mastership.MastershipServiceAdapter;
 import org.onosproject.net.DeviceId;
+import org.onosproject.store.flow.ReplicaInfo;
 import org.onosproject.store.flow.ReplicaInfoEvent;
-import org.onosproject.store.service.AsyncLeaderElector;
-import org.onosproject.store.service.CoordinationService;
-import org.onosproject.store.service.LeaderElector;
-import org.onosproject.store.service.LeaderElectorBuilder;
+import org.onosproject.store.flow.ReplicaInfoEventListener;
+import org.onosproject.store.flow.ReplicaInfoService;
 
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.mock;
-import static org.easymock.EasyMock.replay;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 public class ReplicaInfoManagerTest {
 
+
     private static final DeviceId DID1 = DeviceId.deviceId("of:1");
     private static final DeviceId DID2 = DeviceId.deviceId("of:2");
     private static final NodeId NID1 = new NodeId("foo");
-    private static final NodeId NID2 = new NodeId("bar");
 
-    private TestLeaderElector leaderElector;
-    private ReplicaInfoManager manager;
+    private ReplicaInfoManager mgr;
+    private ReplicaInfoService service;
+
+    private ListenerRegistry<MastershipEvent, MastershipListener>
+        mastershipListenerRegistry;
+    private TestEventDispatcher eventDispatcher;
+
 
     @Before
     public void setUp() throws Exception {
-        leaderElector = new TestLeaderElector();
-        manager = new TestReplicaInfoManager();
-        manager.versionService = () -> Version.version("1.0.0");
-        CoordinationService coordinationService = mock(CoordinationService.class);
-        AsyncLeaderElector leaderElector = mock(AsyncLeaderElector.class);
-        expect(leaderElector.asLeaderElector()).andReturn(this.leaderElector).anyTimes();
-        expect(coordinationService.leaderElectorBuilder()).andReturn(new LeaderElectorBuilder() {
-            @Override
-            public AsyncLeaderElector build() {
-                return leaderElector;
-            }
-        }).anyTimes();
-        replay(coordinationService, leaderElector);
-        manager.coordinationService = coordinationService;
+        mastershipListenerRegistry = new ListenerRegistry<>();
 
-        manager.activate();
+        mgr = new ReplicaInfoManager();
+        service = mgr;
+
+        eventDispatcher = new TestEventDispatcher();
+        mgr.eventDispatcher = eventDispatcher;
+        mgr.mastershipService = new TestMastershipService();
+
+        // register dummy mastership event source
+        mgr.eventDispatcher.addSink(MastershipEvent.class, mastershipListenerRegistry);
+
+        mgr.activate();
     }
 
     @After
     public void tearDown() throws Exception {
-        manager.deactivate();
+        mgr.deactivate();
     }
 
     @Test
-    public void testMastershipTopics() throws Exception {
-        assertEquals("device:of:1|1.0.0", manager.createDeviceMastershipTopic(DID1));
-        assertEquals(DID1, manager.extractDeviceIdFromTopic("device:of:1|1.0.0"));
-        assertTrue(manager.isDeviceMastershipTopic("device:of:1|1.0.0"));
-        assertFalse(manager.isDeviceMastershipTopic("foo:bar|1.0.0"));
-        assertFalse(manager.isDeviceMastershipTopic("foo:bar|baz"));
-        assertFalse(manager.isDeviceMastershipTopic("foobarbaz|1.0.0"));
-        assertFalse(manager.isDeviceMastershipTopic("foobarbaz"));
+    public void testGetReplicaInfoFor() {
+        ReplicaInfo info1 = service.getReplicaInfoFor(DID1);
+        assertEquals(Optional.of(NID1), info1.master());
+        // backups are always empty for now
+        assertEquals(Collections.emptyList(), info1.backups());
+
+        ReplicaInfo info2 = service.getReplicaInfoFor(DID2);
+        assertEquals("There's no master", Optional.empty(), info2.master());
+        // backups are always empty for now
+        assertEquals(Collections.emptyList(), info2.backups());
     }
 
     @Test
-    public void testReplicaEvents() throws Exception {
-        Queue<ReplicaInfoEvent> events = new ArrayBlockingQueue<>(2);
-        manager.addListener(events::add);
+    public void testReplicaInfoEvent() throws InterruptedException {
+        final CountDownLatch latch = new CountDownLatch(1);
+        service.addListener(new MasterNodeCheck(latch, DID1, NID1));
 
-        Leadership oldLeadership = new Leadership(
-                manager.createDeviceMastershipTopic(DID1),
-                new Leader(NID1, 1, 1),
-                Lists.newArrayList(NID1));
-        Leadership newLeadership = new Leadership(
-                manager.createDeviceMastershipTopic(DID1),
-                new Leader(NID2, 2, 1),
-                Lists.newArrayList(NID2, NID1));
+        // fake MastershipEvent
+        eventDispatcher.post(new MastershipEvent(Type.MASTER_CHANGED, DID1,
+            new RoleInfo(NID1, new LinkedList<>())));
 
-        leaderElector.leaderships.put(manager.createDeviceMastershipTopic(DID1), newLeadership);
-        leaderElector.post(new Change<>(oldLeadership, newLeadership));
-
-        ReplicaInfoEvent event = events.remove();
-        assertEquals(ReplicaInfoEvent.Type.MASTER_CHANGED, event.type());
-        assertEquals(NID2, event.replicaInfo().master().get());
-        assertEquals(1, event.replicaInfo().backups().size());
-
-        event = events.remove();
-        assertEquals(ReplicaInfoEvent.Type.BACKUPS_CHANGED, event.type());
-        assertEquals(NID2, event.replicaInfo().master().get());
-        assertEquals(1, event.replicaInfo().backups().size());
-
-        assertEquals(NID2, manager.getReplicaInfoFor(DID1).master().get());
-        assertEquals(1, manager.getReplicaInfoFor(DID1).backups().size());
-
-        oldLeadership = new Leadership(
-                manager.createDeviceMastershipTopic(DID1),
-                new Leader(NID1, 1, 1),
-                Lists.newArrayList(NID1));
-        newLeadership = new Leadership(
-                manager.createDeviceMastershipTopic(DID1),
-                new Leader(NID1, 1, 1),
-                Lists.newArrayList(NID1, NID2));
-
-        leaderElector.leaderships.put(manager.createDeviceMastershipTopic(DID1), newLeadership);
-        leaderElector.post(new Change<>(oldLeadership, newLeadership));
-
-        event = events.remove();
-        assertEquals(ReplicaInfoEvent.Type.BACKUPS_CHANGED, event.type());
-        assertEquals(NID1, event.replicaInfo().master().get());
-        assertEquals(1, event.replicaInfo().backups().size());
-
-        assertEquals(NID1, manager.getReplicaInfoFor(DID1).master().get());
-        assertEquals(1, manager.getReplicaInfoFor(DID1).backups().size());
+        assertTrue(latch.await(1, TimeUnit.SECONDS));
     }
 
-    private class TestReplicaInfoManager extends ReplicaInfoManager {
-        TestReplicaInfoManager() {
-            eventDispatcher = new TestEventDispatcher();
+
+    private final class MasterNodeCheck implements ReplicaInfoEventListener {
+        private final CountDownLatch latch;
+        private Optional<NodeId> expectedMaster;
+        private DeviceId expectedDevice;
+
+
+        MasterNodeCheck(CountDownLatch latch, DeviceId did,
+            NodeId nid) {
+            this.latch = latch;
+            this.expectedMaster = Optional.ofNullable(nid);
+            this.expectedDevice = did;
+        }
+
+        @Override
+        public void event(ReplicaInfoEvent event) {
+            assertEquals(expectedDevice, event.subject());
+            assertEquals(expectedMaster, event.replicaInfo().master());
+            // backups are always empty for now
+            assertEquals(Collections.emptyList(), event.replicaInfo().backups());
+            latch.countDown();
         }
     }
 
-    private class TestLeaderElector implements LeaderElector {
-        private final Map<String, Leadership> leaderships = Maps.newConcurrentMap();
-        private final Set<Consumer<Change<Leadership>>> listeners = Sets.newConcurrentHashSet();
 
-        @Override
-        public String name() {
-            return null;
+    private final class TestMastershipService
+        extends MastershipServiceAdapter
+        implements MastershipService {
+
+        private Map<DeviceId, NodeId> masters;
+
+        TestMastershipService() {
+            masters = Maps.newHashMap();
+            masters.put(DID1, NID1);
+            // DID2 has no master
         }
 
         @Override
-        public Leadership run(String topic, NodeId nodeId) {
-            return null;
+        public NodeId getMasterFor(DeviceId deviceId) {
+            return masters.get(deviceId);
         }
 
         @Override
-        public void withdraw(String topic) {
-
+        public RoleInfo getNodesFor(DeviceId deviceId) {
+            return new RoleInfo(masters.get(deviceId), Collections.emptyList());
         }
 
         @Override
-        public boolean anoint(String topic, NodeId nodeId) {
-            return false;
+        public void addListener(MastershipListener listener) {
+            mastershipListenerRegistry.addListener(listener);
         }
 
         @Override
-        public boolean promote(String topic, NodeId nodeId) {
-            return false;
-        }
-
-        @Override
-        public void evict(NodeId nodeId) {
-
-        }
-
-        @Override
-        public Leadership getLeadership(String topic) {
-            return leaderships.get(topic);
-        }
-
-        @Override
-        public Map<String, Leadership> getLeaderships() {
-            return leaderships;
-        }
-
-        @Override
-        public void addChangeListener(Consumer<Change<Leadership>> consumer) {
-            listeners.add(consumer);
-        }
-
-        @Override
-        public void removeChangeListener(Consumer<Change<Leadership>> consumer) {
-            listeners.remove(consumer);
-        }
-
-        void post(Change<Leadership> change) {
-            listeners.forEach(l -> l.accept(change));
+        public void removeListener(MastershipListener listener) {
+            mastershipListenerRegistry.removeListener(listener);
         }
     }
+
 }