[ONOS-7363] Isolate replica info to a single version for isolation of primary-backup protocols
Change-Id: I780a949a2bc924e114d0e25fca1888d14c9924ad
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ReplicaInfoManager.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ReplicaInfoManager.java
index 0f47c08..dec602b 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ReplicaInfoManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ReplicaInfoManager.java
@@ -15,29 +15,31 @@
*/
package org.onosproject.store.flow.impl;
-import com.google.common.collect.ImmutableList;
+import java.util.Objects;
+import java.util.function.Consumer;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
-import org.onosproject.cluster.NodeId;
-import org.onosproject.cluster.RoleInfo;
-import org.onosproject.event.EventDeliveryService;
-import org.onosproject.event.ListenerRegistry;
-import org.onosproject.mastership.MastershipEvent;
-import org.onosproject.mastership.MastershipListener;
-import org.onosproject.mastership.MastershipService;
+import org.onosproject.cluster.Leadership;
+import org.onosproject.core.VersionService;
+import org.onosproject.event.AbstractListenerManager;
+import org.onosproject.event.Change;
import org.onosproject.net.DeviceId;
import org.onosproject.store.flow.ReplicaInfo;
import org.onosproject.store.flow.ReplicaInfoEvent;
import org.onosproject.store.flow.ReplicaInfoEventListener;
import org.onosproject.store.flow.ReplicaInfoService;
+import org.onosproject.store.service.CoordinationService;
+import org.onosproject.store.service.LeaderElector;
import org.slf4j.Logger;
-import java.util.Collections;
-import java.util.List;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.onosproject.store.flow.ReplicaInfoEvent.Type.BACKUPS_CHANGED;
import static org.onosproject.store.flow.ReplicaInfoEvent.Type.MASTER_CHANGED;
@@ -48,38 +50,66 @@
*/
@Component(immediate = true)
@Service
-public class ReplicaInfoManager implements ReplicaInfoService {
+public class ReplicaInfoManager
+ extends AbstractListenerManager<ReplicaInfoEvent, ReplicaInfoEventListener>
+ implements ReplicaInfoService {
+
+ private static final Pattern DEVICE_MASTERSHIP_TOPIC_PATTERN = Pattern.compile("device:([^|]+)\\|[^|]+");
private final Logger log = getLogger(getClass());
- private final MastershipListener mastershipListener = new InternalMastershipListener();
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected CoordinationService coordinationService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected EventDeliveryService eventDispatcher;
+ protected VersionService versionService;
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected MastershipService mastershipService;
+ private final Consumer<Change<Leadership>> leadershipChangeListener = change -> {
+ Leadership oldLeadership = change.oldValue();
+ Leadership newLeadership = change.newValue();
- protected final ListenerRegistry<ReplicaInfoEvent, ReplicaInfoEventListener>
- listenerRegistry = new ListenerRegistry<>();
+ String topic = newLeadership.topic();
+ if (!isDeviceMastershipTopic(topic)) {
+ return;
+ }
+
+ DeviceId deviceId = extractDeviceIdFromTopic(topic);
+ ReplicaInfo replicaInfo = buildFromLeadership(newLeadership);
+
+ boolean leaderChanged = !Objects.equals(oldLeadership.leader(), newLeadership.leader());
+ boolean candidatesChanged = !Objects.equals(oldLeadership.candidates(), newLeadership.candidates());
+
+ if (leaderChanged) {
+ post(new ReplicaInfoEvent(MASTER_CHANGED, deviceId, replicaInfo));
+ }
+ if (candidatesChanged) {
+ post(new ReplicaInfoEvent(BACKUPS_CHANGED, deviceId, replicaInfo));
+ }
+ };
+
+ private LeaderElector leaderElector;
@Activate
public void activate() {
eventDispatcher.addSink(ReplicaInfoEvent.class, listenerRegistry);
- mastershipService.addListener(mastershipListener);
+ leaderElector = coordinationService.leaderElectorBuilder()
+ .withName("onos-leadership-elections")
+ .build()
+ .asLeaderElector();
+ leaderElector.addChangeListener(leadershipChangeListener);
log.info("Started");
}
@Deactivate
public void deactivate() {
eventDispatcher.removeSink(ReplicaInfoEvent.class);
- mastershipService.removeListener(mastershipListener);
+ leaderElector.removeChangeListener(leadershipChangeListener);
log.info("Stopped");
}
@Override
public ReplicaInfo getReplicaInfoFor(DeviceId deviceId) {
- return buildFromRoleInfo(mastershipService.getNodesFor(deviceId));
+ return buildFromLeadership(leaderElector.getLeadership(createDeviceMastershipTopic(deviceId)));
}
@Override
@@ -92,32 +122,27 @@
listenerRegistry.removeListener(checkNotNull(listener));
}
- private static ReplicaInfo buildFromRoleInfo(RoleInfo roles) {
- List<NodeId> backups = roles.backups() == null ?
- Collections.emptyList() : ImmutableList.copyOf(roles.backups());
- return new ReplicaInfo(roles.master(), backups);
+ String createDeviceMastershipTopic(DeviceId deviceId) {
+ return String.format("device:%s|%s", deviceId.toString(), versionService.version());
}
- final class InternalMastershipListener implements MastershipListener {
-
- @Override
- public void event(MastershipEvent event) {
- final ReplicaInfo replicaInfo = buildFromRoleInfo(event.roleInfo());
- switch (event.type()) {
- case MASTER_CHANGED:
- eventDispatcher.post(new ReplicaInfoEvent(MASTER_CHANGED,
- event.subject(),
- replicaInfo));
- break;
- case BACKUPS_CHANGED:
- eventDispatcher.post(new ReplicaInfoEvent(BACKUPS_CHANGED,
- event.subject(),
- replicaInfo));
- break;
- default:
- break;
- }
+ DeviceId extractDeviceIdFromTopic(String topic) {
+ Matcher m = DEVICE_MASTERSHIP_TOPIC_PATTERN.matcher(topic);
+ if (m.matches()) {
+ return DeviceId.deviceId(m.group(1));
+ } else {
+ throw new IllegalArgumentException("Invalid device mastership topic: " + topic);
}
}
+ boolean isDeviceMastershipTopic(String topic) {
+ Matcher m = DEVICE_MASTERSHIP_TOPIC_PATTERN.matcher(topic);
+ return m.matches();
+ }
+
+ static ReplicaInfo buildFromLeadership(Leadership leadership) {
+ return new ReplicaInfo(leadership.leaderNodeId(), leadership.candidates().stream()
+ .filter(nodeId -> !Objects.equals(nodeId, leadership.leaderNodeId()))
+ .collect(Collectors.toList()));
+ }
}
diff --git a/core/store/dist/src/test/java/org/onosproject/store/flow/impl/ReplicaInfoManagerTest.java b/core/store/dist/src/test/java/org/onosproject/store/flow/impl/ReplicaInfoManagerTest.java
index dd822df..b342fa9 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/flow/impl/ReplicaInfoManagerTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/flow/impl/ReplicaInfoManagerTest.java
@@ -15,153 +15,197 @@
*/
package org.onosproject.store.flow.impl;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.function.Consumer;
+
+import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import org.onosproject.cluster.Leader;
+import org.onosproject.cluster.Leadership;
import org.onosproject.cluster.NodeId;
-import org.onosproject.cluster.RoleInfo;
import org.onosproject.common.event.impl.TestEventDispatcher;
-import org.onosproject.event.ListenerRegistry;
-import org.onosproject.mastership.MastershipEvent;
-import org.onosproject.mastership.MastershipEvent.Type;
-import org.onosproject.mastership.MastershipListener;
-import org.onosproject.mastership.MastershipService;
-import org.onosproject.mastership.MastershipServiceAdapter;
+import org.onosproject.core.Version;
+import org.onosproject.event.Change;
import org.onosproject.net.DeviceId;
-import org.onosproject.store.flow.ReplicaInfo;
import org.onosproject.store.flow.ReplicaInfoEvent;
-import org.onosproject.store.flow.ReplicaInfoEventListener;
-import org.onosproject.store.flow.ReplicaInfoService;
+import org.onosproject.store.service.AsyncLeaderElector;
+import org.onosproject.store.service.CoordinationService;
+import org.onosproject.store.service.LeaderElector;
+import org.onosproject.store.service.LeaderElectorBuilder;
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.mock;
+import static org.easymock.EasyMock.replay;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class ReplicaInfoManagerTest {
-
private static final DeviceId DID1 = DeviceId.deviceId("of:1");
private static final DeviceId DID2 = DeviceId.deviceId("of:2");
private static final NodeId NID1 = new NodeId("foo");
+ private static final NodeId NID2 = new NodeId("bar");
- private ReplicaInfoManager mgr;
- private ReplicaInfoService service;
-
- private ListenerRegistry<MastershipEvent, MastershipListener>
- mastershipListenerRegistry;
- private TestEventDispatcher eventDispatcher;
-
+ private TestLeaderElector leaderElector;
+ private ReplicaInfoManager manager;
@Before
public void setUp() throws Exception {
- mastershipListenerRegistry = new ListenerRegistry<>();
+ leaderElector = new TestLeaderElector();
+ manager = new TestReplicaInfoManager();
+ manager.versionService = () -> Version.version("1.0.0");
+ CoordinationService coordinationService = mock(CoordinationService.class);
+ AsyncLeaderElector leaderElector = mock(AsyncLeaderElector.class);
+ expect(leaderElector.asLeaderElector()).andReturn(this.leaderElector).anyTimes();
+ expect(coordinationService.leaderElectorBuilder()).andReturn(new LeaderElectorBuilder() {
+ @Override
+ public AsyncLeaderElector build() {
+ return leaderElector;
+ }
+ }).anyTimes();
+ replay(coordinationService, leaderElector);
+ manager.coordinationService = coordinationService;
- mgr = new ReplicaInfoManager();
- service = mgr;
-
- eventDispatcher = new TestEventDispatcher();
- mgr.eventDispatcher = eventDispatcher;
- mgr.mastershipService = new TestMastershipService();
-
- // register dummy mastership event source
- mgr.eventDispatcher.addSink(MastershipEvent.class, mastershipListenerRegistry);
-
- mgr.activate();
+ manager.activate();
}
@After
public void tearDown() throws Exception {
- mgr.deactivate();
+ manager.deactivate();
}
@Test
- public void testGetReplicaInfoFor() {
- ReplicaInfo info1 = service.getReplicaInfoFor(DID1);
- assertEquals(Optional.of(NID1), info1.master());
- // backups are always empty for now
- assertEquals(Collections.emptyList(), info1.backups());
-
- ReplicaInfo info2 = service.getReplicaInfoFor(DID2);
- assertEquals("There's no master", Optional.empty(), info2.master());
- // backups are always empty for now
- assertEquals(Collections.emptyList(), info2.backups());
+ public void testMastershipTopics() throws Exception {
+ assertEquals("device:of:1|1.0.0", manager.createDeviceMastershipTopic(DID1));
+ assertEquals(DID1, manager.extractDeviceIdFromTopic("device:of:1|1.0.0"));
+ assertTrue(manager.isDeviceMastershipTopic("device:of:1|1.0.0"));
+ assertFalse(manager.isDeviceMastershipTopic("foo:bar|1.0.0"));
+ assertFalse(manager.isDeviceMastershipTopic("foo:bar|baz"));
+ assertFalse(manager.isDeviceMastershipTopic("foobarbaz|1.0.0"));
+ assertFalse(manager.isDeviceMastershipTopic("foobarbaz"));
}
@Test
- public void testReplicaInfoEvent() throws InterruptedException {
- final CountDownLatch latch = new CountDownLatch(1);
- service.addListener(new MasterNodeCheck(latch, DID1, NID1));
+ public void testReplicaEvents() throws Exception {
+ Queue<ReplicaInfoEvent> events = new ArrayBlockingQueue<>(2);
+ manager.addListener(events::add);
- // fake MastershipEvent
- eventDispatcher.post(new MastershipEvent(Type.MASTER_CHANGED, DID1,
- new RoleInfo(NID1, new LinkedList<>())));
+ Leadership oldLeadership = new Leadership(
+ manager.createDeviceMastershipTopic(DID1),
+ new Leader(NID1, 1, 1),
+ Lists.newArrayList(NID1));
+ Leadership newLeadership = new Leadership(
+ manager.createDeviceMastershipTopic(DID1),
+ new Leader(NID2, 2, 1),
+ Lists.newArrayList(NID2, NID1));
- assertTrue(latch.await(1, TimeUnit.SECONDS));
+ leaderElector.leaderships.put(manager.createDeviceMastershipTopic(DID1), newLeadership);
+ leaderElector.post(new Change<>(oldLeadership, newLeadership));
+
+ ReplicaInfoEvent event = events.remove();
+ assertEquals(ReplicaInfoEvent.Type.MASTER_CHANGED, event.type());
+ assertEquals(NID2, event.replicaInfo().master().get());
+ assertEquals(1, event.replicaInfo().backups().size());
+
+ event = events.remove();
+ assertEquals(ReplicaInfoEvent.Type.BACKUPS_CHANGED, event.type());
+ assertEquals(NID2, event.replicaInfo().master().get());
+ assertEquals(1, event.replicaInfo().backups().size());
+
+ assertEquals(NID2, manager.getReplicaInfoFor(DID1).master().get());
+ assertEquals(1, manager.getReplicaInfoFor(DID1).backups().size());
+
+ oldLeadership = new Leadership(
+ manager.createDeviceMastershipTopic(DID1),
+ new Leader(NID1, 1, 1),
+ Lists.newArrayList(NID1));
+ newLeadership = new Leadership(
+ manager.createDeviceMastershipTopic(DID1),
+ new Leader(NID1, 1, 1),
+ Lists.newArrayList(NID1, NID2));
+
+ leaderElector.leaderships.put(manager.createDeviceMastershipTopic(DID1), newLeadership);
+ leaderElector.post(new Change<>(oldLeadership, newLeadership));
+
+ event = events.remove();
+ assertEquals(ReplicaInfoEvent.Type.BACKUPS_CHANGED, event.type());
+ assertEquals(NID1, event.replicaInfo().master().get());
+ assertEquals(1, event.replicaInfo().backups().size());
+
+ assertEquals(NID1, manager.getReplicaInfoFor(DID1).master().get());
+ assertEquals(1, manager.getReplicaInfoFor(DID1).backups().size());
}
-
- private final class MasterNodeCheck implements ReplicaInfoEventListener {
- private final CountDownLatch latch;
- private Optional<NodeId> expectedMaster;
- private DeviceId expectedDevice;
-
-
- MasterNodeCheck(CountDownLatch latch, DeviceId did,
- NodeId nid) {
- this.latch = latch;
- this.expectedMaster = Optional.ofNullable(nid);
- this.expectedDevice = did;
- }
-
- @Override
- public void event(ReplicaInfoEvent event) {
- assertEquals(expectedDevice, event.subject());
- assertEquals(expectedMaster, event.replicaInfo().master());
- // backups are always empty for now
- assertEquals(Collections.emptyList(), event.replicaInfo().backups());
- latch.countDown();
+ private class TestReplicaInfoManager extends ReplicaInfoManager {
+ TestReplicaInfoManager() {
+ eventDispatcher = new TestEventDispatcher();
}
}
+ private class TestLeaderElector implements LeaderElector {
+ private final Map<String, Leadership> leaderships = Maps.newConcurrentMap();
+ private final Set<Consumer<Change<Leadership>>> listeners = Sets.newConcurrentHashSet();
- private final class TestMastershipService
- extends MastershipServiceAdapter
- implements MastershipService {
-
- private Map<DeviceId, NodeId> masters;
-
- TestMastershipService() {
- masters = Maps.newHashMap();
- masters.put(DID1, NID1);
- // DID2 has no master
+ @Override
+ public String name() {
+ return null;
}
@Override
- public NodeId getMasterFor(DeviceId deviceId) {
- return masters.get(deviceId);
+ public Leadership run(String topic, NodeId nodeId) {
+ return null;
}
@Override
- public RoleInfo getNodesFor(DeviceId deviceId) {
- return new RoleInfo(masters.get(deviceId), Collections.emptyList());
+ public void withdraw(String topic) {
+
}
@Override
- public void addListener(MastershipListener listener) {
- mastershipListenerRegistry.addListener(listener);
+ public boolean anoint(String topic, NodeId nodeId) {
+ return false;
}
@Override
- public void removeListener(MastershipListener listener) {
- mastershipListenerRegistry.removeListener(listener);
+ public boolean promote(String topic, NodeId nodeId) {
+ return false;
+ }
+
+ @Override
+ public void evict(NodeId nodeId) {
+
+ }
+
+ @Override
+ public Leadership getLeadership(String topic) {
+ return leaderships.get(topic);
+ }
+
+ @Override
+ public Map<String, Leadership> getLeaderships() {
+ return leaderships;
+ }
+
+ @Override
+ public void addChangeListener(Consumer<Change<Leadership>> consumer) {
+ listeners.add(consumer);
+ }
+
+ @Override
+ public void removeChangeListener(Consumer<Change<Leadership>> consumer) {
+ listeners.remove(consumer);
+ }
+
+ void post(Change<Leadership> change) {
+ listeners.forEach(l -> l.accept(change));
}
}
-
}