Ensure mastership elections are cached in flow rule store
Change-Id: Iff1b1d743a38310e76a5c87605480e9faa5eab2b
(cherry picked from commit d334841f7067def3bad1c618a27a75d28d6df004)
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedLeadershipStore.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedLeadershipStore.java
index 746142e..c3a3977 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedLeadershipStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedLeadershipStore.java
@@ -202,6 +202,7 @@
leaderElector = storageService.leaderElectorBuilder()
.withName("onos-leadership-elections")
.withElectionTimeout(electionTimeoutMillis)
+ .withRelaxedReadConsistency()
.build()
.asLeaderElector();
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ReplicaInfoManager.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ReplicaInfoManager.java
index dec602b..79c9147 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ReplicaInfoManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ReplicaInfoManager.java
@@ -15,31 +15,29 @@
*/
package org.onosproject.store.flow.impl;
-import java.util.Objects;
-import java.util.function.Consumer;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import java.util.stream.Collectors;
-
+import com.google.common.collect.ImmutableList;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
-import org.onosproject.cluster.Leadership;
-import org.onosproject.core.VersionService;
-import org.onosproject.event.AbstractListenerManager;
-import org.onosproject.event.Change;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.cluster.RoleInfo;
+import org.onosproject.event.EventDeliveryService;
+import org.onosproject.event.ListenerRegistry;
+import org.onosproject.mastership.MastershipEvent;
+import org.onosproject.mastership.MastershipListener;
+import org.onosproject.mastership.MastershipService;
import org.onosproject.net.DeviceId;
import org.onosproject.store.flow.ReplicaInfo;
import org.onosproject.store.flow.ReplicaInfoEvent;
import org.onosproject.store.flow.ReplicaInfoEventListener;
import org.onosproject.store.flow.ReplicaInfoService;
-import org.onosproject.store.service.CoordinationService;
-import org.onosproject.store.service.LeaderElector;
import org.slf4j.Logger;
+import java.util.Collections;
+import java.util.List;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.onosproject.store.flow.ReplicaInfoEvent.Type.BACKUPS_CHANGED;
import static org.onosproject.store.flow.ReplicaInfoEvent.Type.MASTER_CHANGED;
@@ -50,66 +48,38 @@
*/
@Component(immediate = true)
@Service
-public class ReplicaInfoManager
- extends AbstractListenerManager<ReplicaInfoEvent, ReplicaInfoEventListener>
- implements ReplicaInfoService {
-
- private static final Pattern DEVICE_MASTERSHIP_TOPIC_PATTERN = Pattern.compile("device:([^|]+)\\|[^|]+");
+public class ReplicaInfoManager implements ReplicaInfoService {
private final Logger log = getLogger(getClass());
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected CoordinationService coordinationService;
+ private final MastershipListener mastershipListener = new InternalMastershipListener();
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected VersionService versionService;
+ protected EventDeliveryService eventDispatcher;
- private final Consumer<Change<Leadership>> leadershipChangeListener = change -> {
- Leadership oldLeadership = change.oldValue();
- Leadership newLeadership = change.newValue();
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected MastershipService mastershipService;
- String topic = newLeadership.topic();
- if (!isDeviceMastershipTopic(topic)) {
- return;
- }
-
- DeviceId deviceId = extractDeviceIdFromTopic(topic);
- ReplicaInfo replicaInfo = buildFromLeadership(newLeadership);
-
- boolean leaderChanged = !Objects.equals(oldLeadership.leader(), newLeadership.leader());
- boolean candidatesChanged = !Objects.equals(oldLeadership.candidates(), newLeadership.candidates());
-
- if (leaderChanged) {
- post(new ReplicaInfoEvent(MASTER_CHANGED, deviceId, replicaInfo));
- }
- if (candidatesChanged) {
- post(new ReplicaInfoEvent(BACKUPS_CHANGED, deviceId, replicaInfo));
- }
- };
-
- private LeaderElector leaderElector;
+ protected final ListenerRegistry<ReplicaInfoEvent, ReplicaInfoEventListener>
+ listenerRegistry = new ListenerRegistry<>();
@Activate
public void activate() {
eventDispatcher.addSink(ReplicaInfoEvent.class, listenerRegistry);
- leaderElector = coordinationService.leaderElectorBuilder()
- .withName("onos-leadership-elections")
- .build()
- .asLeaderElector();
- leaderElector.addChangeListener(leadershipChangeListener);
+ mastershipService.addListener(mastershipListener);
log.info("Started");
}
@Deactivate
public void deactivate() {
eventDispatcher.removeSink(ReplicaInfoEvent.class);
- leaderElector.removeChangeListener(leadershipChangeListener);
+ mastershipService.removeListener(mastershipListener);
log.info("Stopped");
}
@Override
public ReplicaInfo getReplicaInfoFor(DeviceId deviceId) {
- return buildFromLeadership(leaderElector.getLeadership(createDeviceMastershipTopic(deviceId)));
+ return buildFromRoleInfo(mastershipService.getNodesFor(deviceId));
}
@Override
@@ -122,27 +92,32 @@
listenerRegistry.removeListener(checkNotNull(listener));
}
- String createDeviceMastershipTopic(DeviceId deviceId) {
- return String.format("device:%s|%s", deviceId.toString(), versionService.version());
+ private static ReplicaInfo buildFromRoleInfo(RoleInfo roles) {
+ List<NodeId> backups = roles.backups() == null ?
+ Collections.emptyList() : ImmutableList.copyOf(roles.backups());
+ return new ReplicaInfo(roles.master(), backups);
}
- DeviceId extractDeviceIdFromTopic(String topic) {
- Matcher m = DEVICE_MASTERSHIP_TOPIC_PATTERN.matcher(topic);
- if (m.matches()) {
- return DeviceId.deviceId(m.group(1));
- } else {
- throw new IllegalArgumentException("Invalid device mastership topic: " + topic);
+ final class InternalMastershipListener implements MastershipListener {
+
+ @Override
+ public void event(MastershipEvent event) {
+ final ReplicaInfo replicaInfo = buildFromRoleInfo(event.roleInfo());
+ switch (event.type()) {
+ case MASTER_CHANGED:
+ eventDispatcher.post(new ReplicaInfoEvent(MASTER_CHANGED,
+ event.subject(),
+ replicaInfo));
+ break;
+ case BACKUPS_CHANGED:
+ eventDispatcher.post(new ReplicaInfoEvent(BACKUPS_CHANGED,
+ event.subject(),
+ replicaInfo));
+ break;
+ default:
+ break;
+ }
}
}
- boolean isDeviceMastershipTopic(String topic) {
- Matcher m = DEVICE_MASTERSHIP_TOPIC_PATTERN.matcher(topic);
- return m.matches();
- }
-
- static ReplicaInfo buildFromLeadership(Leadership leadership) {
- return new ReplicaInfo(leadership.leaderNodeId(), leadership.candidates().stream()
- .filter(nodeId -> !Objects.equals(nodeId, leadership.leaderNodeId()))
- .collect(Collectors.toList()));
- }
}
diff --git a/core/store/dist/src/test/java/org/onosproject/store/flow/impl/ReplicaInfoManagerTest.java b/core/store/dist/src/test/java/org/onosproject/store/flow/impl/ReplicaInfoManagerTest.java
index b342fa9..9709643 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/flow/impl/ReplicaInfoManagerTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/flow/impl/ReplicaInfoManagerTest.java
@@ -15,197 +15,153 @@
*/
package org.onosproject.store.flow.impl;
-import java.util.Map;
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.function.Consumer;
-
-import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import org.onosproject.cluster.Leader;
-import org.onosproject.cluster.Leadership;
import org.onosproject.cluster.NodeId;
+import org.onosproject.cluster.RoleInfo;
import org.onosproject.common.event.impl.TestEventDispatcher;
-import org.onosproject.core.Version;
-import org.onosproject.event.Change;
+import org.onosproject.event.ListenerRegistry;
+import org.onosproject.mastership.MastershipEvent;
+import org.onosproject.mastership.MastershipEvent.Type;
+import org.onosproject.mastership.MastershipListener;
+import org.onosproject.mastership.MastershipService;
+import org.onosproject.mastership.MastershipServiceAdapter;
import org.onosproject.net.DeviceId;
+import org.onosproject.store.flow.ReplicaInfo;
import org.onosproject.store.flow.ReplicaInfoEvent;
-import org.onosproject.store.service.AsyncLeaderElector;
-import org.onosproject.store.service.CoordinationService;
-import org.onosproject.store.service.LeaderElector;
-import org.onosproject.store.service.LeaderElectorBuilder;
+import org.onosproject.store.flow.ReplicaInfoEventListener;
+import org.onosproject.store.flow.ReplicaInfoService;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.mock;
-import static org.easymock.EasyMock.replay;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class ReplicaInfoManagerTest {
+
private static final DeviceId DID1 = DeviceId.deviceId("of:1");
private static final DeviceId DID2 = DeviceId.deviceId("of:2");
private static final NodeId NID1 = new NodeId("foo");
- private static final NodeId NID2 = new NodeId("bar");
- private TestLeaderElector leaderElector;
- private ReplicaInfoManager manager;
+ private ReplicaInfoManager mgr;
+ private ReplicaInfoService service;
+
+ private ListenerRegistry<MastershipEvent, MastershipListener>
+ mastershipListenerRegistry;
+ private TestEventDispatcher eventDispatcher;
+
@Before
public void setUp() throws Exception {
- leaderElector = new TestLeaderElector();
- manager = new TestReplicaInfoManager();
- manager.versionService = () -> Version.version("1.0.0");
- CoordinationService coordinationService = mock(CoordinationService.class);
- AsyncLeaderElector leaderElector = mock(AsyncLeaderElector.class);
- expect(leaderElector.asLeaderElector()).andReturn(this.leaderElector).anyTimes();
- expect(coordinationService.leaderElectorBuilder()).andReturn(new LeaderElectorBuilder() {
- @Override
- public AsyncLeaderElector build() {
- return leaderElector;
- }
- }).anyTimes();
- replay(coordinationService, leaderElector);
- manager.coordinationService = coordinationService;
+ mastershipListenerRegistry = new ListenerRegistry<>();
- manager.activate();
+ mgr = new ReplicaInfoManager();
+ service = mgr;
+
+ eventDispatcher = new TestEventDispatcher();
+ mgr.eventDispatcher = eventDispatcher;
+ mgr.mastershipService = new TestMastershipService();
+
+ // register dummy mastership event source
+ mgr.eventDispatcher.addSink(MastershipEvent.class, mastershipListenerRegistry);
+
+ mgr.activate();
}
@After
public void tearDown() throws Exception {
- manager.deactivate();
+ mgr.deactivate();
}
@Test
- public void testMastershipTopics() throws Exception {
- assertEquals("device:of:1|1.0.0", manager.createDeviceMastershipTopic(DID1));
- assertEquals(DID1, manager.extractDeviceIdFromTopic("device:of:1|1.0.0"));
- assertTrue(manager.isDeviceMastershipTopic("device:of:1|1.0.0"));
- assertFalse(manager.isDeviceMastershipTopic("foo:bar|1.0.0"));
- assertFalse(manager.isDeviceMastershipTopic("foo:bar|baz"));
- assertFalse(manager.isDeviceMastershipTopic("foobarbaz|1.0.0"));
- assertFalse(manager.isDeviceMastershipTopic("foobarbaz"));
+ public void testGetReplicaInfoFor() {
+ ReplicaInfo info1 = service.getReplicaInfoFor(DID1);
+ assertEquals(Optional.of(NID1), info1.master());
+ // backups are always empty for now
+ assertEquals(Collections.emptyList(), info1.backups());
+
+ ReplicaInfo info2 = service.getReplicaInfoFor(DID2);
+ assertEquals("There's no master", Optional.empty(), info2.master());
+ // backups are always empty for now
+ assertEquals(Collections.emptyList(), info2.backups());
}
@Test
- public void testReplicaEvents() throws Exception {
- Queue<ReplicaInfoEvent> events = new ArrayBlockingQueue<>(2);
- manager.addListener(events::add);
+ public void testReplicaInfoEvent() throws InterruptedException {
+ final CountDownLatch latch = new CountDownLatch(1);
+ service.addListener(new MasterNodeCheck(latch, DID1, NID1));
- Leadership oldLeadership = new Leadership(
- manager.createDeviceMastershipTopic(DID1),
- new Leader(NID1, 1, 1),
- Lists.newArrayList(NID1));
- Leadership newLeadership = new Leadership(
- manager.createDeviceMastershipTopic(DID1),
- new Leader(NID2, 2, 1),
- Lists.newArrayList(NID2, NID1));
+ // fake MastershipEvent
+ eventDispatcher.post(new MastershipEvent(Type.MASTER_CHANGED, DID1,
+ new RoleInfo(NID1, new LinkedList<>())));
- leaderElector.leaderships.put(manager.createDeviceMastershipTopic(DID1), newLeadership);
- leaderElector.post(new Change<>(oldLeadership, newLeadership));
-
- ReplicaInfoEvent event = events.remove();
- assertEquals(ReplicaInfoEvent.Type.MASTER_CHANGED, event.type());
- assertEquals(NID2, event.replicaInfo().master().get());
- assertEquals(1, event.replicaInfo().backups().size());
-
- event = events.remove();
- assertEquals(ReplicaInfoEvent.Type.BACKUPS_CHANGED, event.type());
- assertEquals(NID2, event.replicaInfo().master().get());
- assertEquals(1, event.replicaInfo().backups().size());
-
- assertEquals(NID2, manager.getReplicaInfoFor(DID1).master().get());
- assertEquals(1, manager.getReplicaInfoFor(DID1).backups().size());
-
- oldLeadership = new Leadership(
- manager.createDeviceMastershipTopic(DID1),
- new Leader(NID1, 1, 1),
- Lists.newArrayList(NID1));
- newLeadership = new Leadership(
- manager.createDeviceMastershipTopic(DID1),
- new Leader(NID1, 1, 1),
- Lists.newArrayList(NID1, NID2));
-
- leaderElector.leaderships.put(manager.createDeviceMastershipTopic(DID1), newLeadership);
- leaderElector.post(new Change<>(oldLeadership, newLeadership));
-
- event = events.remove();
- assertEquals(ReplicaInfoEvent.Type.BACKUPS_CHANGED, event.type());
- assertEquals(NID1, event.replicaInfo().master().get());
- assertEquals(1, event.replicaInfo().backups().size());
-
- assertEquals(NID1, manager.getReplicaInfoFor(DID1).master().get());
- assertEquals(1, manager.getReplicaInfoFor(DID1).backups().size());
+ assertTrue(latch.await(1, TimeUnit.SECONDS));
}
- private class TestReplicaInfoManager extends ReplicaInfoManager {
- TestReplicaInfoManager() {
- eventDispatcher = new TestEventDispatcher();
+
+ private final class MasterNodeCheck implements ReplicaInfoEventListener {
+ private final CountDownLatch latch;
+ private Optional<NodeId> expectedMaster;
+ private DeviceId expectedDevice;
+
+
+ MasterNodeCheck(CountDownLatch latch, DeviceId did,
+ NodeId nid) {
+ this.latch = latch;
+ this.expectedMaster = Optional.ofNullable(nid);
+ this.expectedDevice = did;
+ }
+
+ @Override
+ public void event(ReplicaInfoEvent event) {
+ assertEquals(expectedDevice, event.subject());
+ assertEquals(expectedMaster, event.replicaInfo().master());
+ // backups are always empty for now
+ assertEquals(Collections.emptyList(), event.replicaInfo().backups());
+ latch.countDown();
}
}
- private class TestLeaderElector implements LeaderElector {
- private final Map<String, Leadership> leaderships = Maps.newConcurrentMap();
- private final Set<Consumer<Change<Leadership>>> listeners = Sets.newConcurrentHashSet();
- @Override
- public String name() {
- return null;
+ private final class TestMastershipService
+ extends MastershipServiceAdapter
+ implements MastershipService {
+
+ private Map<DeviceId, NodeId> masters;
+
+ TestMastershipService() {
+ masters = Maps.newHashMap();
+ masters.put(DID1, NID1);
+ // DID2 has no master
}
@Override
- public Leadership run(String topic, NodeId nodeId) {
- return null;
+ public NodeId getMasterFor(DeviceId deviceId) {
+ return masters.get(deviceId);
}
@Override
- public void withdraw(String topic) {
-
+ public RoleInfo getNodesFor(DeviceId deviceId) {
+ return new RoleInfo(masters.get(deviceId), Collections.emptyList());
}
@Override
- public boolean anoint(String topic, NodeId nodeId) {
- return false;
+ public void addListener(MastershipListener listener) {
+ mastershipListenerRegistry.addListener(listener);
}
@Override
- public boolean promote(String topic, NodeId nodeId) {
- return false;
- }
-
- @Override
- public void evict(NodeId nodeId) {
-
- }
-
- @Override
- public Leadership getLeadership(String topic) {
- return leaderships.get(topic);
- }
-
- @Override
- public Map<String, Leadership> getLeaderships() {
- return leaderships;
- }
-
- @Override
- public void addChangeListener(Consumer<Change<Leadership>> consumer) {
- listeners.add(consumer);
- }
-
- @Override
- public void removeChangeListener(Consumer<Change<Leadership>> consumer) {
- listeners.remove(consumer);
- }
-
- void post(Change<Leadership> change) {
- listeners.forEach(l -> l.accept(change));
+ public void removeListener(MastershipListener listener) {
+ mastershipListenerRegistry.removeListener(listener);
}
}
+
}