tests for DistributedMastershipStore
Change-Id: Ic7daa333ac7d7947155b745daf08e4771f1189ef
diff --git a/core/net/src/main/java/org/onlab/onos/net/device/impl/DeviceManager.java b/core/net/src/main/java/org/onlab/onos/net/device/impl/DeviceManager.java
index b61afd2..bb221da 100644
--- a/core/net/src/main/java/org/onlab/onos/net/device/impl/DeviceManager.java
+++ b/core/net/src/main/java/org/onlab/onos/net/device/impl/DeviceManager.java
@@ -200,7 +200,7 @@
// process.
if (event != null) {
log.info("Device {} connected", deviceId);
- mastershipService.requestRoleFor(deviceId);
+ //mastershipService.requestRoleFor(deviceId);
provider().roleChanged(event.subject(),
mastershipService.requestRoleFor(deviceId));
post(event);
diff --git a/core/store/hz/cluster/src/main/java/org/onlab/onos/store/cluster/impl/DistributedMastershipStore.java b/core/store/hz/cluster/src/main/java/org/onlab/onos/store/cluster/impl/DistributedMastershipStore.java
index 18e6e96..dc42773 100644
--- a/core/store/hz/cluster/src/main/java/org/onlab/onos/store/cluster/impl/DistributedMastershipStore.java
+++ b/core/store/hz/cluster/src/main/java/org/onlab/onos/store/cluster/impl/DistributedMastershipStore.java
@@ -4,7 +4,6 @@
import static org.onlab.onos.cluster.MastershipEvent.Type.MASTER_CHANGED;
import java.util.Map;
-import java.util.Objects;
import java.util.Set;
import org.apache.felix.scr.annotations.Activate;
@@ -28,6 +27,7 @@
import com.google.common.base.Optional;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableSet;
+import com.hazelcast.core.ILock;
import com.hazelcast.core.IMap;
/**
@@ -39,8 +39,22 @@
extends AbstractHazelcastStore<MastershipEvent, MastershipStoreDelegate>
implements MastershipStore {
- private IMap<byte[], byte[]> rawMasters;
- private LoadingCache<DeviceId, Optional<NodeId>> masters;
+ //arbitrary lock name
+ private static final String LOCK = "lock";
+ //initial term value
+ private static final Integer INIT = 0;
+ //placeholder non-null value
+ private static final Byte NIL = 0x0;
+
+ //devices to masters
+ protected IMap<byte[], byte[]> rawMasters;
+ //devices to terms
+ protected IMap<byte[], Integer> rawTerms;
+ //collection of nodes. values are ignored, as it's used as a makeshift 'set'
+ protected IMap<byte[], Byte> backups;
+
+ //TODO - remove
+ //private LoadingCache<DeviceId, Optional<NodeId>> masters;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
@@ -51,23 +65,18 @@
super.activate();
rawMasters = theInstance.getMap("masters");
- OptionalCacheLoader<DeviceId, NodeId> nodeLoader
- = new OptionalCacheLoader<>(kryoSerializationService, rawMasters);
- masters = new AbsentInvalidatingLoadingCache<>(newBuilder().build(nodeLoader));
- rawMasters.addEntryListener(new RemoteMasterShipEventHandler(masters), true);
+ rawTerms = theInstance.getMap("terms");
+ backups = theInstance.getMap("backups");
- loadMasters();
+ //TODO: hook up maps to event notification
+ //OptionalCacheLoader<DeviceId, NodeId> nodeLoader
+ //= new OptionalCacheLoader<>(kryoSerializationService, rawMasters);
+ //masters = new AbsentInvalidatingLoadingCache<>(newBuilder().build(nodeLoader));
+ //rawMasters.addEntryListener(new RemoteMasterShipEventHandler(masters), true);
log.info("Started");
}
- private void loadMasters() {
- for (byte[] keyBytes : rawMasters.keySet()) {
- final DeviceId id = deserialize(keyBytes);
- masters.refresh(id);
- }
- }
-
@Deactivate
public void deactivate() {
log.info("Stopped");
@@ -75,60 +84,178 @@
@Override
public MastershipEvent setMaster(NodeId nodeId, DeviceId deviceId) {
- synchronized (this) {
- NodeId currentMaster = getMaster(deviceId);
- if (Objects.equals(currentMaster, nodeId)) {
- return null;
- }
+ byte [] did = serialize(deviceId);
+ byte [] nid = serialize(nodeId);
- // FIXME: for now implementing semantics of setMaster
- rawMasters.put(serialize(deviceId), serialize(nodeId));
- masters.put(deviceId, Optional.of(nodeId));
- return new MastershipEvent(MastershipEvent.Type.MASTER_CHANGED, deviceId, nodeId);
+ ILock lock = theInstance.getLock(LOCK);
+ lock.lock();
+ try {
+ MastershipRole role = getRole(nodeId, deviceId);
+ Integer term = rawTerms.get(did);
+ switch (role) {
+ case MASTER:
+ return null;
+ case STANDBY:
+ rawMasters.put(did, nid);
+ rawTerms.put(did, ++term);
+ backups.putIfAbsent(nid, NIL);
+ break;
+ case NONE:
+ rawMasters.put(did, nid);
+ //new switch OR state transition after being orphaned
+ if (term == null) {
+ rawTerms.put(did, INIT);
+ } else {
+ rawTerms.put(did, ++term);
+ }
+ backups.put(nid, NIL);
+ break;
+ default:
+ log.warn("unknown Mastership Role {}", role);
+ return null;
+ }
+ return new MastershipEvent(MASTER_CHANGED, deviceId, nodeId);
+ } finally {
+ lock.unlock();
}
}
@Override
public NodeId getMaster(DeviceId deviceId) {
- return masters.getUnchecked(deviceId).orNull();
+ return deserialize(rawMasters.get(serialize(deviceId)));
}
@Override
public Set<DeviceId> getDevices(NodeId nodeId) {
ImmutableSet.Builder<DeviceId> builder = ImmutableSet.builder();
- for (Map.Entry<DeviceId, Optional<NodeId>> entry : masters.asMap().entrySet()) {
- if (nodeId.equals(entry.getValue().get())) {
- builder.add(entry.getKey());
+
+ for (Map.Entry<byte[], byte[]> entry : rawMasters.entrySet()) {
+ if (nodeId.equals(deserialize(entry.getValue()))) {
+ builder.add((DeviceId) deserialize(entry.getKey()));
}
}
+
return builder.build();
}
@Override
public MastershipRole requestRole(DeviceId deviceId) {
- // FIXME: for now we are 'selecting' as master whoever asks
- setMaster(clusterService.getLocalNode().id(), deviceId);
- return MastershipRole.MASTER;
+ // first to empty slot for device in master map is MASTER
+ // depending on how backups are organized, might need to trigger election
+ // so only controller doesn't set itself to backup for another device
+ byte [] did = serialize(deviceId);
+ NodeId local = clusterService.getLocalNode().id();
+ byte [] lnid = serialize(local);
+
+ ILock lock = theInstance.getLock(LOCK);
+ lock.lock();
+ try {
+ MastershipRole role = getRole(local, deviceId);
+ switch (role) {
+ case MASTER:
+ break;
+ case STANDBY:
+ backups.put(lnid, NIL);
+ rawTerms.putIfAbsent(did, INIT);
+ break;
+ case NONE:
+ rawMasters.put(did, lnid);
+ rawTerms.putIfAbsent(did, INIT);
+ backups.put(lnid, NIL);
+ role = MastershipRole.MASTER;
+ break;
+ default:
+ log.warn("unknown Mastership Role {}", role);
+ }
+ return role;
+ } finally {
+ lock.unlock();
+ }
}
@Override
public MastershipRole getRole(NodeId nodeId, DeviceId deviceId) {
- NodeId master = masters.getUnchecked(deviceId).orNull();
- return nodeId.equals(master) ? MastershipRole.MASTER : MastershipRole.STANDBY;
+ byte[] did = serialize(deviceId);
+
+ NodeId current = deserialize(rawMasters.get(did));
+ MastershipRole role = null;
+
+ if (current == null) {
+ //IFF no controllers have claimed mastership over it
+ role = MastershipRole.NONE;
+ } else {
+ if (current.equals(nodeId)) {
+ role = MastershipRole.MASTER;
+ } else {
+ role = MastershipRole.STANDBY;
+ }
+ }
+
+ return role;
}
@Override
public MastershipTerm getTermFor(DeviceId deviceId) {
- // TODO Auto-generated method stub
- return null;
+ byte[] did = serialize(deviceId);
+
+ if ((rawMasters.get(did) == null) ||
+ (rawTerms.get(did) == null)) {
+ return null;
+ }
+ return MastershipTerm.of(
+ (NodeId) deserialize(rawMasters.get(did)), rawTerms.get(did));
}
@Override
public MastershipEvent unsetMaster(NodeId nodeId, DeviceId deviceId) {
- // TODO Auto-generated method stub
+ byte [] did = serialize(deviceId);
+
+ ILock lock = theInstance.getLock(LOCK);
+ lock.lock();
+ try {
+ MastershipRole role = getRole(nodeId, deviceId);
+ switch (role) {
+ case MASTER:
+ //hand off device to another
+ NodeId backup = reelect(nodeId, deviceId);
+ if (backup == null) {
+ //goes back to NONE
+ rawMasters.remove(did);
+ } else {
+ //goes to STANDBY for local, MASTER for someone else
+ Integer term = rawTerms.get(did);
+ rawMasters.put(did, serialize(backup));
+ rawTerms.put(did, ++term);
+ return new MastershipEvent(MASTER_CHANGED, deviceId, backup);
+ }
+ case STANDBY:
+ case NONE:
+ break;
+ default:
+ log.warn("unknown Mastership Role {}", role);
+ }
+ return null;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ //helper for "re-electing" a new master for a given device
+ private NodeId reelect(NodeId current, DeviceId deviceId) {
+ for (byte [] node : backups.keySet()) {
+ NodeId nid = deserialize(node);
+ if (!current.equals(nid)) {
+ return nid;
+ }
+ }
return null;
}
+ //adds node to pool(s) of backup
+ private void backup(NodeId nodeId, DeviceId deviceId) {
+ //TODO might be useful to isolate out
+ }
+
private class RemoteMasterShipEventHandler extends RemoteCacheEventHandler<DeviceId, NodeId> {
public RemoteMasterShipEventHandler(LoadingCache<DeviceId, Optional<NodeId>> cache) {
super(cache);
diff --git a/core/store/hz/cluster/src/test/java/org/onlab/onos/store/cluster/impl/DistributedMastershipStoreTest.java b/core/store/hz/cluster/src/test/java/org/onlab/onos/store/cluster/impl/DistributedMastershipStoreTest.java
new file mode 100644
index 0000000..81ddce0
--- /dev/null
+++ b/core/store/hz/cluster/src/test/java/org/onlab/onos/store/cluster/impl/DistributedMastershipStoreTest.java
@@ -0,0 +1,276 @@
+package org.onlab.onos.store.cluster.impl;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.onlab.onos.net.MastershipRole.*;
+
+import java.util.Set;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.onlab.onos.cluster.ClusterEventListener;
+import org.onlab.onos.cluster.ClusterService;
+import org.onlab.onos.cluster.ControllerNode;
+import org.onlab.onos.cluster.ControllerNode.State;
+import org.onlab.onos.cluster.DefaultControllerNode;
+import org.onlab.onos.cluster.MastershipEvent.Type;
+import org.onlab.onos.cluster.MastershipTerm;
+import org.onlab.onos.cluster.NodeId;
+import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.store.common.StoreManager;
+import org.onlab.onos.store.common.StoreService;
+import org.onlab.onos.store.common.TestStoreManager;
+import org.onlab.onos.store.serializers.KryoSerializationManager;
+import org.onlab.onos.store.serializers.KryoSerializationService;
+import org.onlab.packet.IpPrefix;
+
+import com.google.common.collect.Sets;
+import com.hazelcast.config.Config;
+import com.hazelcast.core.Hazelcast;
+
+/**
+ * Test of the Hazelcast-based distributed MastershipStore implementation.
+ */
+public class DistributedMastershipStoreTest {
+
+ private static final DeviceId DID1 = DeviceId.deviceId("of:01");
+ private static final DeviceId DID2 = DeviceId.deviceId("of:02");
+ private static final DeviceId DID3 = DeviceId.deviceId("of:03");
+ private static final DeviceId DID4 = DeviceId.deviceId("of:04");
+
+ private static final IpPrefix IP = IpPrefix.valueOf("127.0.0.1");
+
+ private static final NodeId N1 = new NodeId("node1");
+ private static final NodeId N2 = new NodeId("node2");
+
+ private static final ControllerNode CN1 = new DefaultControllerNode(N1, IP);
+ private static final ControllerNode CN2 = new DefaultControllerNode(N2, IP);
+
+ private DistributedMastershipStore dms;
+ private TestDistributedMastershipStore testStore;
+ private KryoSerializationManager serializationMgr;
+ private StoreManager storeMgr;
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ // TODO should find a way to clean Hazelcast instance without shutdown.
+ Config config = TestStoreManager.getTestConfig();
+
+ storeMgr = new TestStoreManager(Hazelcast.newHazelcastInstance(config));
+ storeMgr.activate();
+
+ serializationMgr = new KryoSerializationManager();
+ serializationMgr.activate();
+
+ dms = new TestDistributedMastershipStore(storeMgr, serializationMgr);
+ dms.clusterService = new TestClusterService();
+ dms.activate();
+
+ testStore = (TestDistributedMastershipStore) dms;
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ dms.deactivate();
+
+ serializationMgr.deactivate();
+
+ storeMgr.deactivate();
+ }
+
+ @Test
+ public void getRole() {
+ assertEquals("wrong role:", NONE, dms.getRole(N1, DID1));
+ testStore.put(DID1, N1, true, true, true);
+ assertEquals("wrong role:", MASTER, dms.getRole(N1, DID1));
+ assertEquals("wrong role:", STANDBY, dms.getRole(N2, DID1));
+ }
+
+ @Test
+ public void getMaster() {
+ assertTrue("wrong store state:", dms.rawMasters.isEmpty());
+
+ testStore.put(DID1, N1, true, false, false);
+ assertEquals("wrong master:", N1, dms.getMaster(DID1));
+ assertNull("wrong master:", dms.getMaster(DID2));
+ }
+
+ @Test
+ public void getDevices() {
+ assertTrue("wrong store state:", dms.rawMasters.isEmpty());
+
+ testStore.put(DID1, N1, true, false, false);
+ testStore.put(DID2, N1, true, false, false);
+ testStore.put(DID3, N2, true, false, false);
+
+ assertEquals("wrong devices",
+ Sets.newHashSet(DID1, DID2), dms.getDevices(N1));
+ }
+
+ @Test
+ public void requestRoleAndTerm() {
+ //CN1 is "local"
+ testStore.setCurrent(CN1);
+
+ //if already MASTER, nothing should happen
+ testStore.put(DID2, N1, true, false, false);
+ assertEquals("wrong role for MASTER:", MASTER, dms.requestRole(DID2));
+ assertTrue("wrong state for store:",
+ dms.backups.isEmpty() & dms.rawTerms.isEmpty());
+
+ //populate maps with DID1, N1 thru NONE case
+ assertEquals("wrong role for NONE:", MASTER, dms.requestRole(DID1));
+ assertTrue("wrong state for store:",
+ !dms.backups.isEmpty() & !dms.rawTerms.isEmpty());
+ assertEquals("wrong term",
+ MastershipTerm.of(N1, 0), dms.getTermFor(DID1));
+
+ //CN2 now local. DID2 has N1 as MASTER so N2 is STANDBY
+ testStore.setCurrent(CN2);
+ assertEquals("wrong role for STANDBY:", STANDBY, dms.requestRole(DID2));
+ assertEquals("wrong number of entries:", 2, dms.rawTerms.size());
+
+ //change term and requestRole() again; should persist
+ testStore.increment(DID2);
+ assertEquals("wrong role for STANDBY:", STANDBY, dms.requestRole(DID2));
+ assertEquals("wrong term", MastershipTerm.of(N1, 1), dms.getTermFor(DID2));
+ }
+
+ @Test
+ public void setMaster() {
+ //populate maps with DID1, N1 as MASTER thru NONE case
+ testStore.setCurrent(CN1);
+ assertEquals("wrong role for NONE:", MASTER, dms.requestRole(DID1));
+ assertNull("wrong event:", dms.setMaster(N1, DID1));
+
+ //switch over to N2
+ assertEquals("wrong event:", Type.MASTER_CHANGED, dms.setMaster(N2, DID1).type());
+ assertEquals("wrong term", MastershipTerm.of(N2, 1), dms.getTermFor(DID1));
+
+ //orphan switch - should be rare case
+ assertEquals("wrong event:", Type.MASTER_CHANGED, dms.setMaster(N2, DID2).type());
+ assertEquals("wrong term", MastershipTerm.of(N2, 0), dms.getTermFor(DID2));
+ //disconnect and reconnect - sign of failing re-election or single-instance channel
+ testStore.reset(true, false, false);
+ dms.setMaster(N2, DID2);
+ assertEquals("wrong term", MastershipTerm.of(N2, 1), dms.getTermFor(DID2));
+ }
+
+ @Test
+ public void unsetMaster() {
+ //populate maps with DID1, N1 as MASTER thru NONE case
+ testStore.setCurrent(CN1);
+ assertEquals("wrong role for NONE:", MASTER, dms.requestRole(DID1));
+ //no backup, no new MASTER/event
+ assertNull("wrong event:", dms.unsetMaster(N1, DID1));
+ //add backup CN2, get it elected MASTER
+ dms.requestRole(DID1);
+ testStore.setCurrent(CN2);
+ dms.requestRole(DID1);
+ assertEquals("wrong event:", Type.MASTER_CHANGED, dms.unsetMaster(N1, DID1).type());
+ assertEquals("wrong master", N2, dms.getMaster(DID1));
+
+ //STANDBY - nothing here, either
+ assertNull("wrong event:", dms.unsetMaster(N1, DID1));
+ assertEquals("wrong role for node:", STANDBY, dms.getRole(N1, DID1));
+
+ //NONE - nothing happens
+ assertNull("wrong event:", dms.unsetMaster(N1, DID2));
+ assertEquals("wrong role for node:", NONE, dms.getRole(N1, DID2));
+ }
+
+ private class TestDistributedMastershipStore extends
+ DistributedMastershipStore {
+ public TestDistributedMastershipStore(StoreService storeService,
+ KryoSerializationService kryoSerializationService) {
+ this.storeService = storeService;
+ this.kryoSerializationService = kryoSerializationService;
+ }
+
+ //helper to populate master/backup structures
+ public void put(DeviceId dev, NodeId node,
+ boolean store, boolean backup, boolean term) {
+ if (store) {
+ dms.rawMasters.put(serialize(dev), serialize(node));
+ }
+ if (backup) {
+ dms.backups.put(serialize(node), (byte) 0);
+ }
+ if (term) {
+ dms.rawTerms.put(serialize(dev), 0);
+ }
+ }
+
+ //clears structures
+ public void reset(boolean store, boolean backup, boolean term) {
+ if (store) {
+ dms.rawMasters.clear();
+ }
+ if (backup) {
+ dms.backups.clear();
+ }
+ if (term) {
+ dms.rawTerms.clear();
+ }
+ }
+
+ //increment term for a device
+ public void increment(DeviceId dev) {
+ Integer t = dms.rawTerms.get(serialize(dev));
+ if (t != null) {
+ dms.rawTerms.put(serialize(dev), ++t);
+ }
+ }
+
+ //sets the "local" node
+ public void setCurrent(ControllerNode node) {
+ ((TestClusterService) clusterService).current = node;
+ }
+ }
+
+ private class TestClusterService implements ClusterService {
+
+ protected ControllerNode current;
+
+ @Override
+ public ControllerNode getLocalNode() {
+ return current;
+ }
+
+ @Override
+ public Set<ControllerNode> getNodes() {
+ return Sets.newHashSet(CN1, CN2);
+ }
+
+ @Override
+ public ControllerNode getNode(NodeId nodeId) {
+ return null;
+ }
+
+ @Override
+ public State getState(NodeId nodeId) {
+ return null;
+ }
+
+ @Override
+ public void addListener(ClusterEventListener listener) {
+ }
+
+ @Override
+ public void removeListener(ClusterEventListener listener) {
+ }
+
+ }
+}