refactored MastershipStore to not use ILock
Change-Id: Ic254f6faddba3427d3380910ca90d3d65a29f40b
diff --git a/core/store/hz/cluster/src/main/java/org/onlab/onos/store/mastership/impl/DistributedMastershipStore.java b/core/store/hz/cluster/src/main/java/org/onlab/onos/store/mastership/impl/DistributedMastershipStore.java
index 1def9f9..bc32375 100644
--- a/core/store/hz/cluster/src/main/java/org/onlab/onos/store/mastership/impl/DistributedMastershipStore.java
+++ b/core/store/hz/cluster/src/main/java/org/onlab/onos/store/mastership/impl/DistributedMastershipStore.java
@@ -2,6 +2,7 @@
import static org.onlab.onos.mastership.MastershipEvent.Type.MASTER_CHANGED;
+import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -22,12 +23,15 @@
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.MastershipRole;
import org.onlab.onos.store.common.AbstractHazelcastStore;
+import org.onlab.onos.store.common.SMap;
+import org.onlab.onos.store.serializers.KryoSerializer;
import com.google.common.collect.ImmutableSet;
-import com.hazelcast.core.ILock;
import com.hazelcast.core.IMap;
import com.hazelcast.core.MultiMap;
+import static org.onlab.onos.net.MastershipRole.*;
+
/**
* Distributed implementation of the mastership store. The store is
* responsible for the master selection process.
@@ -38,36 +42,26 @@
extends AbstractHazelcastStore<MastershipEvent, MastershipStoreDelegate>
implements MastershipStore {
- //arbitrary lock name
- private static final String LOCK = "lock";
//initial term/TTL value
private static final Integer INIT = 0;
- //devices to masters
- protected IMap<byte[], byte[]> masters;
+ //device to node roles
+ protected SMap<DeviceId, RoleValue> roleMap;
//devices to terms
- protected IMap<byte[], Integer> terms;
-
- //re-election related, disjoint-set structures:
- //device-nodes multiset of available nodes
- protected MultiMap<byte[], byte[]> standbys;
- //device-nodes multiset for nodes that have given up on device
- protected MultiMap<byte[], byte[]> unusable;
+ protected SMap<DeviceId, Integer> terms;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
+ @SuppressWarnings({ "unchecked", "rawtypes" })
@Override
@Activate
public void activate() {
super.activate();
- masters = theInstance.getMap("masters");
- terms = theInstance.getMap("terms");
- standbys = theInstance.getMultiMap("backups");
- unusable = theInstance.getMultiMap("unusable");
-
- masters.addEntryListener(new RemoteMasterShipEventHandler(), true);
+ roleMap = new SMap(theInstance.getMap("nodeRoles"), new KryoSerializer());
+ terms = new SMap(theInstance.getMap("terms"), new KryoSerializer());
+ // roleMap.addEntryListener(new RemoteMasterShipEventHandler(), true);
log.info("Started");
}
@@ -79,12 +73,9 @@
@Override
public MastershipRole getRole(NodeId nodeId, DeviceId deviceId) {
- byte[] did = serialize(deviceId);
- byte[] nid = serialize(nodeId);
-
- NodeId current = deserialize(masters.get(did));
+ NodeId current = getNode(MASTER, deviceId);
if (current == null) {
- if (standbys.containsEntry(did, nid)) {
+ if (isRole(STANDBY, nodeId, deviceId)) {
//was previously standby, or set to standby from master
return MastershipRole.STANDBY;
} else {
@@ -103,69 +94,66 @@
@Override
public MastershipEvent setMaster(NodeId nodeId, DeviceId deviceId) {
- byte [] did = serialize(deviceId);
- byte [] nid = serialize(nodeId);
- ILock lock = theInstance.getLock(LOCK);
- lock.lock();
+ roleMap.lock(deviceId);
try {
MastershipRole role = getRole(nodeId, deviceId);
+ RoleValue rv = getRoleValue(deviceId);
switch (role) {
case MASTER:
//reinforce mastership
- evict(nid, did);
+ rv.reassign(nodeId, STANDBY, NONE);
return null;
case STANDBY:
- //make current master standby
- byte [] current = masters.get(did);
+ NodeId current = rv.get(MASTER);
if (current != null) {
- backup(current, did);
+ //demote the current master to standby, then install nodeId as master
+ rv.reassign(current, NONE, STANDBY);
+ rv.replace(current, nodeId, MASTER);
+ } else {
+ //no master before so just add.
+ rv.add(MASTER, nodeId);
}
- //assign specified node as new master
- masters.put(did, nid);
- evict(nid, did);
- updateTerm(did);
+ rv.reassign(nodeId, STANDBY, NONE);
+ updateTerm(deviceId);
return new MastershipEvent(MASTER_CHANGED, deviceId, nodeId);
case NONE:
- masters.put(did, nid);
- evict(nid, did);
- updateTerm(did);
+ rv.add(MASTER, nodeId);
+ rv.reassign(nodeId, STANDBY, NONE);
+ updateTerm(deviceId);
return new MastershipEvent(MASTER_CHANGED, deviceId, nodeId);
default:
log.warn("unknown Mastership Role {}", role);
return null;
}
} finally {
- lock.unlock();
+ roleMap.unlock(deviceId);
}
}
@Override
public NodeId getMaster(DeviceId deviceId) {
- return deserialize(masters.get(serialize(deviceId)));
+ return getNode(MASTER, deviceId);
}
@Override
public List<NodeId> getNodes(DeviceId deviceId) {
- byte [] did = serialize(deviceId);
List<NodeId> nodes = new LinkedList<>();
- //add current master to head - if there is one
- ILock lock = theInstance.getLock(LOCK);
- lock.lock();
+ //add current master to head - if there is one.
+ roleMap.lock(deviceId);
try {
- byte [] master = masters.get(did);
+ RoleValue rv = getRoleValue(deviceId);
+ NodeId master = rv.get(MASTER);
if (master != null) {
- nodes.add((NodeId) deserialize(master));
+ nodes.add(master);
}
-
- for (byte [] el : standbys.get(serialize(deviceId))) {
- nodes.add((NodeId) deserialize(el));
- }
- return nodes;
+ //We ignore NONE nodes.
+ nodes.addAll(rv.nodesOfRole(STANDBY));
+ return Collections.unmodifiableList(nodes);
} finally {
- lock.unlock();
+ roleMap.unlock(deviceId);
}
}
@@ -173,9 +161,9 @@
public Set<DeviceId> getDevices(NodeId nodeId) {
ImmutableSet.Builder<DeviceId> builder = ImmutableSet.builder();
- for (Map.Entry<byte[], byte[]> entry : masters.entrySet()) {
- if (nodeId.equals(deserialize(entry.getValue()))) {
- builder.add((DeviceId) deserialize(entry.getKey()));
+ for (Map.Entry<DeviceId, RoleValue> el : roleMap.entrySet()) {
+ if (nodeId.equals(el.getValue().get(MASTER))) {
+ builder.add(el.getKey());
}
}
@@ -185,26 +173,24 @@
@Override
public MastershipRole requestRole(DeviceId deviceId) {
NodeId local = clusterService.getLocalNode().id();
- byte [] did = serialize(deviceId);
- byte [] lnid = serialize(local);
- ILock lock = theInstance.getLock(LOCK);
- lock.lock();
+ roleMap.lock(deviceId);
try {
+ RoleValue rv = getRoleValue(deviceId);
MastershipRole role = getRole(local, deviceId);
switch (role) {
case MASTER:
- evict(lnid, did);
+ rv.reassign(local, STANDBY, NONE);
break;
case STANDBY:
- backup(lnid, did);
- terms.putIfAbsent(did, INIT);
+ rv.reassign(local, NONE, STANDBY);
+ terms.putIfAbsent(deviceId, INIT);
break;
case NONE:
//claim mastership
- masters.put(did, lnid);
- evict(lnid, did);
- updateTerm(did);
+ rv.add(MASTER, local);
+ rv.reassign(local, STANDBY, NONE);
+ updateTerm(deviceId);
role = MastershipRole.MASTER;
break;
default:
@@ -212,128 +198,128 @@
}
return role;
} finally {
- lock.unlock();
+ roleMap.unlock(deviceId);
}
}
@Override
public MastershipTerm getTermFor(DeviceId deviceId) {
- byte[] did = serialize(deviceId);
- if ((masters.get(did) == null) ||
- (terms.get(did) == null)) {
+ RoleValue rv = getRoleValue(deviceId);
+ if ((rv.get(MASTER) == null) || (terms.get(deviceId) == null)) {
return null;
}
- return MastershipTerm.of(
- (NodeId) deserialize(masters.get(did)), terms.get(did));
+ return MastershipTerm.of(rv.get(MASTER), terms.get(deviceId));
}
@Override
public MastershipEvent setStandby(NodeId nodeId, DeviceId deviceId) {
- byte [] did = serialize(deviceId);
- byte [] nid = serialize(nodeId);
MastershipEvent event = null;
- ILock lock = theInstance.getLock(LOCK);
- lock.lock();
+ roleMap.lock(deviceId);
try {
+ RoleValue rv = getRoleValue(deviceId);
MastershipRole role = getRole(nodeId, deviceId);
switch (role) {
case MASTER:
event = reelect(nodeId, deviceId);
- backup(nid, did);
- break;
+ //fall through to reinforce role
case STANDBY:
//fall through to reinforce role
case NONE:
- backup(nid, did);
+ rv.reassign(nodeId, NONE, STANDBY);
break;
default:
log.warn("unknown Mastership Role {}", role);
}
return event;
} finally {
- lock.unlock();
+ roleMap.unlock(deviceId);
}
}
@Override
public MastershipEvent relinquishRole(NodeId nodeId, DeviceId deviceId) {
- byte [] did = serialize(deviceId);
- byte [] nid = serialize(nodeId);
MastershipEvent event = null;
- ILock lock = theInstance.getLock(LOCK);
- lock.lock();
+ roleMap.lock(deviceId);
try {
+ RoleValue rv = getRoleValue(deviceId);
MastershipRole role = getRole(nodeId, deviceId);
switch (role) {
case MASTER:
event = reelect(nodeId, deviceId);
- evict(nid, did);
- break;
+ //fall through to reinforce relinquishment
case STANDBY:
//fall through to reinforce relinquishment
case NONE:
- evict(nid, did);
+ rv.reassign(nodeId, STANDBY, NONE);
break;
default:
log.warn("unknown Mastership Role {}", role);
}
return event;
} finally {
- lock.unlock();
+ roleMap.unlock(deviceId);
}
}
//helper to fetch a new master candidate for a given device.
private MastershipEvent reelect(NodeId current, DeviceId deviceId) {
- byte [] did = serialize(deviceId);
- byte [] nid = serialize(current);
+ RoleValue rv = roleMap.get(deviceId);
//if this is an queue it'd be neater.
- byte [] backup = null;
- for (byte [] n : standbys.get(serialize(deviceId))) {
- if (!current.equals(deserialize(n))) {
+ NodeId backup = null;
+ for (NodeId n : rv.nodesOfRole(STANDBY)) {
+ if (!current.equals(n)) {
backup = n;
break;
}
}
if (backup == null) {
- masters.remove(did, nid);
+ rv.remove(MASTER, current);
return null;
} else {
- masters.put(did, backup);
- evict(backup, did);
- Integer term = terms.get(did);
- terms.put(did, ++term);
+ rv.replace(current, backup, MASTER);
+ rv.reassign(backup, STANDBY, NONE);
+ Integer term = terms.get(deviceId);
+ terms.put(deviceId, ++term);
return new MastershipEvent(
- MASTER_CHANGED, deviceId, (NodeId) deserialize(backup));
+ MASTER_CHANGED, deviceId, backup);
}
}
- //adds node to pool(s) of backups and moves them from unusable.
- private void backup(byte [] nodeId, byte [] deviceId) {
- if (!standbys.containsEntry(deviceId, nodeId)) {
- standbys.put(deviceId, nodeId);
+ //return the RoleValue structure for a device, or create one
+ private RoleValue getRoleValue(DeviceId deviceId) {
+ RoleValue value = roleMap.get(deviceId);
+ if (value == null) {
+ value = new RoleValue();
+ roleMap.put(deviceId, value);
}
- if (unusable.containsEntry(deviceId, nodeId)) {
- unusable.remove(deviceId, nodeId);
- }
+ return value;
}
- //adds node to unusable and evicts it from backup pool.
- private void evict(byte [] nodeId, byte [] deviceId) {
- if (!unusable.containsEntry(deviceId, nodeId)) {
- unusable.put(deviceId, nodeId);
+ //get first applicable node out of store-unique structure.
+ private NodeId getNode(MastershipRole role, DeviceId deviceId) {
+ RoleValue value = roleMap.get(deviceId);
+ if (value != null) {
+ return value.get(role);
}
- if (standbys.containsEntry(deviceId, nodeId)) {
- standbys.remove(deviceId, nodeId);
+ return null;
+ }
+
+ //check if node is a certain role given a device
+ private boolean isRole(
+ MastershipRole role, NodeId nodeId, DeviceId deviceId) {
+ RoleValue value = roleMap.get(deviceId);
+ if (value != null) {
+ return value.contains(role, nodeId);
}
+ return false;
}
//adds or updates term information.
- private void updateTerm(byte [] deviceId) {
+ private void updateTerm(DeviceId deviceId) {
Integer term = terms.get(deviceId);
if (term == null) {
terms.put(deviceId, INIT);