tests for DistributedMastershipStore
Change-Id: Ic7daa333ac7d7947155b745daf08e4771f1189ef
diff --git a/core/store/hz/cluster/src/main/java/org/onlab/onos/store/cluster/impl/DistributedMastershipStore.java b/core/store/hz/cluster/src/main/java/org/onlab/onos/store/cluster/impl/DistributedMastershipStore.java
index 18e6e96..dc42773 100644
--- a/core/store/hz/cluster/src/main/java/org/onlab/onos/store/cluster/impl/DistributedMastershipStore.java
+++ b/core/store/hz/cluster/src/main/java/org/onlab/onos/store/cluster/impl/DistributedMastershipStore.java
@@ -4,7 +4,6 @@
import static org.onlab.onos.cluster.MastershipEvent.Type.MASTER_CHANGED;
import java.util.Map;
-import java.util.Objects;
import java.util.Set;
import org.apache.felix.scr.annotations.Activate;
@@ -28,6 +27,7 @@
import com.google.common.base.Optional;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableSet;
+import com.hazelcast.core.ILock;
import com.hazelcast.core.IMap;
/**
@@ -39,8 +39,22 @@
extends AbstractHazelcastStore<MastershipEvent, MastershipStoreDelegate>
implements MastershipStore {
- private IMap<byte[], byte[]> rawMasters;
- private LoadingCache<DeviceId, Optional<NodeId>> masters;
+ //arbitrary lock name
+ private static final String LOCK = "lock";
+ //initial term value
+ private static final Integer INIT = 0;
+ //placeholder non-null value
+ private static final Byte NIL = 0x0;
+
+ //devices to masters
+ protected IMap<byte[], byte[]> rawMasters;
+ //devices to terms
+ protected IMap<byte[], Integer> rawTerms;
+ //collection of nodes. values are ignored, as it's used as a makeshift 'set'
+ protected IMap<byte[], Byte> backups;
+
+ //TODO - remove
+ //private LoadingCache<DeviceId, Optional<NodeId>> masters;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
@@ -51,23 +65,18 @@
super.activate();
rawMasters = theInstance.getMap("masters");
- OptionalCacheLoader<DeviceId, NodeId> nodeLoader
- = new OptionalCacheLoader<>(kryoSerializationService, rawMasters);
- masters = new AbsentInvalidatingLoadingCache<>(newBuilder().build(nodeLoader));
- rawMasters.addEntryListener(new RemoteMasterShipEventHandler(masters), true);
+ rawTerms = theInstance.getMap("terms");
+ backups = theInstance.getMap("backups");
- loadMasters();
+ //TODO: hook up maps to event notification
+ //OptionalCacheLoader<DeviceId, NodeId> nodeLoader
+ //= new OptionalCacheLoader<>(kryoSerializationService, rawMasters);
+ //masters = new AbsentInvalidatingLoadingCache<>(newBuilder().build(nodeLoader));
+ //rawMasters.addEntryListener(new RemoteMasterShipEventHandler(masters), true);
log.info("Started");
}
- private void loadMasters() {
- for (byte[] keyBytes : rawMasters.keySet()) {
- final DeviceId id = deserialize(keyBytes);
- masters.refresh(id);
- }
- }
-
@Deactivate
public void deactivate() {
log.info("Stopped");
@@ -75,60 +84,178 @@
@Override
public MastershipEvent setMaster(NodeId nodeId, DeviceId deviceId) {
- synchronized (this) {
- NodeId currentMaster = getMaster(deviceId);
- if (Objects.equals(currentMaster, nodeId)) {
- return null;
- }
+ byte [] did = serialize(deviceId);
+ byte [] nid = serialize(nodeId);
- // FIXME: for now implementing semantics of setMaster
- rawMasters.put(serialize(deviceId), serialize(nodeId));
- masters.put(deviceId, Optional.of(nodeId));
- return new MastershipEvent(MastershipEvent.Type.MASTER_CHANGED, deviceId, nodeId);
+ ILock lock = theInstance.getLock(LOCK);
+ lock.lock();
+ try {
+ MastershipRole role = getRole(nodeId, deviceId);
+ Integer term = rawTerms.get(did);
+ switch (role) {
+ case MASTER:
+ return null;
+ case STANDBY:
+ rawMasters.put(did, nid);
+ rawTerms.put(did, ++term);
+ backups.putIfAbsent(nid, NIL);
+ break;
+ case NONE:
+ rawMasters.put(did, nid);
+                    //newly discovered switch, OR a state transition after the device was orphaned
+ if (term == null) {
+ rawTerms.put(did, INIT);
+ } else {
+ rawTerms.put(did, ++term);
+ }
+ backups.put(nid, NIL);
+ break;
+ default:
+ log.warn("unknown Mastership Role {}", role);
+ return null;
+ }
+ return new MastershipEvent(MASTER_CHANGED, deviceId, nodeId);
+ } finally {
+ lock.unlock();
}
}
@Override
public NodeId getMaster(DeviceId deviceId) {
- return masters.getUnchecked(deviceId).orNull();
+ return deserialize(rawMasters.get(serialize(deviceId)));
}
@Override
public Set<DeviceId> getDevices(NodeId nodeId) {
ImmutableSet.Builder<DeviceId> builder = ImmutableSet.builder();
- for (Map.Entry<DeviceId, Optional<NodeId>> entry : masters.asMap().entrySet()) {
- if (nodeId.equals(entry.getValue().get())) {
- builder.add(entry.getKey());
+
+ for (Map.Entry<byte[], byte[]> entry : rawMasters.entrySet()) {
+ if (nodeId.equals(deserialize(entry.getValue()))) {
+ builder.add((DeviceId) deserialize(entry.getKey()));
}
}
+
return builder.build();
}
@Override
public MastershipRole requestRole(DeviceId deviceId) {
- // FIXME: for now we are 'selecting' as master whoever asks
- setMaster(clusterService.getLocalNode().id(), deviceId);
- return MastershipRole.MASTER;
+        // the first node to claim the empty slot for a device in the master map becomes MASTER
+        // depending on how backups are organized, we might need to trigger an election
+        // so that a controller doesn't set itself as backup for another device
+ byte [] did = serialize(deviceId);
+ NodeId local = clusterService.getLocalNode().id();
+ byte [] lnid = serialize(local);
+
+ ILock lock = theInstance.getLock(LOCK);
+ lock.lock();
+ try {
+ MastershipRole role = getRole(local, deviceId);
+ switch (role) {
+ case MASTER:
+ break;
+ case STANDBY:
+ backups.put(lnid, NIL);
+ rawTerms.putIfAbsent(did, INIT);
+ break;
+ case NONE:
+ rawMasters.put(did, lnid);
+ rawTerms.putIfAbsent(did, INIT);
+ backups.put(lnid, NIL);
+ role = MastershipRole.MASTER;
+ break;
+ default:
+ log.warn("unknown Mastership Role {}", role);
+ }
+ return role;
+ } finally {
+ lock.unlock();
+ }
}
@Override
public MastershipRole getRole(NodeId nodeId, DeviceId deviceId) {
- NodeId master = masters.getUnchecked(deviceId).orNull();
- return nodeId.equals(master) ? MastershipRole.MASTER : MastershipRole.STANDBY;
+ byte[] did = serialize(deviceId);
+
+ NodeId current = deserialize(rawMasters.get(did));
+ MastershipRole role = null;
+
+ if (current == null) {
+            //i.e., no controller has claimed mastership of this device yet
+ role = MastershipRole.NONE;
+ } else {
+ if (current.equals(nodeId)) {
+ role = MastershipRole.MASTER;
+ } else {
+ role = MastershipRole.STANDBY;
+ }
+ }
+
+ return role;
}
@Override
public MastershipTerm getTermFor(DeviceId deviceId) {
- // TODO Auto-generated method stub
- return null;
+ byte[] did = serialize(deviceId);
+
+ if ((rawMasters.get(did) == null) ||
+ (rawTerms.get(did) == null)) {
+ return null;
+ }
+ return MastershipTerm.of(
+ (NodeId) deserialize(rawMasters.get(did)), rawTerms.get(did));
}
@Override
public MastershipEvent unsetMaster(NodeId nodeId, DeviceId deviceId) {
- // TODO Auto-generated method stub
+ byte [] did = serialize(deviceId);
+
+ ILock lock = theInstance.getLock(LOCK);
+ lock.lock();
+ try {
+ MastershipRole role = getRole(nodeId, deviceId);
+ switch (role) {
+ case MASTER:
+ //hand off device to another
+ NodeId backup = reelect(nodeId, deviceId);
+ if (backup == null) {
+ //goes back to NONE
+ rawMasters.remove(did);
+ } else {
+ //goes to STANDBY for local, MASTER for someone else
+ Integer term = rawTerms.get(did);
+ rawMasters.put(did, serialize(backup));
+ rawTerms.put(did, ++term);
+ return new MastershipEvent(MASTER_CHANGED, deviceId, backup);
+ }
+ case STANDBY:
+ case NONE:
+ break;
+ default:
+ log.warn("unknown Mastership Role {}", role);
+ }
+ return null;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ //helper for "re-electing" a new master for a given device
+ private NodeId reelect(NodeId current, DeviceId deviceId) {
+ for (byte [] node : backups.keySet()) {
+ NodeId nid = deserialize(node);
+ if (!current.equals(nid)) {
+ return nid;
+ }
+ }
return null;
}
+    //adds node to pool(s) of backups
+ private void backup(NodeId nodeId, DeviceId deviceId) {
+ //TODO might be useful to isolate out
+ }
+
private class RemoteMasterShipEventHandler extends RemoteCacheEventHandler<DeviceId, NodeId> {
public RemoteMasterShipEventHandler(LoadingCache<DeviceId, Optional<NodeId>> cache) {
super(cache);