fixes for RoleValue serialization
Change-Id: Ie51d0e16a0623061790523920f6a22aa18e74517
diff --git a/core/store/hz/cluster/src/main/java/org/onlab/onos/store/mastership/impl/DistributedMastershipStore.java b/core/store/hz/cluster/src/main/java/org/onlab/onos/store/mastership/impl/DistributedMastershipStore.java
index bc32375..d0eae2d 100644
--- a/core/store/hz/cluster/src/main/java/org/onlab/onos/store/mastership/impl/DistributedMastershipStore.java
+++ b/core/store/hz/cluster/src/main/java/org/onlab/onos/store/mastership/impl/DistributedMastershipStore.java
@@ -24,11 +24,12 @@
import org.onlab.onos.net.MastershipRole;
import org.onlab.onos.store.common.AbstractHazelcastStore;
import org.onlab.onos.store.common.SMap;
+import org.onlab.onos.store.serializers.KryoPoolUtil;
import org.onlab.onos.store.serializers.KryoSerializer;
+import org.onlab.util.KryoPool;
import com.google.common.collect.ImmutableSet;
-import com.hazelcast.core.IMap;
-import com.hazelcast.core.MultiMap;
+import com.hazelcast.core.IAtomicLong;
import static org.onlab.onos.net.MastershipRole.*;
@@ -49,6 +50,9 @@
protected SMap<DeviceId, RoleValue> roleMap;
//devices to terms
protected SMap<DeviceId, Integer> terms;
+ //last-known cluster size, used for tie-breaking when partitioning occurs
+ protected IAtomicLong clusterSize;
+
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
@@ -59,8 +63,21 @@
public void activate() {
super.activate();
- roleMap = new SMap(theInstance.getMap("nodeRoles"), new KryoSerializer());
- terms = new SMap(theInstance.getMap("terms"), new KryoSerializer());
+ this.serializer = new KryoSerializer() {
+ @Override
+ protected void setupKryoPool() {
+ serializerPool = KryoPool.newBuilder()
+ .register(KryoPoolUtil.API)
+
+ .register(RoleValue.class, new RoleValueSerializer())
+ .build()
+ .populate(1);
+ }
+ };
+
+ roleMap = new SMap(theInstance.getMap("nodeRoles"), this.serializer);
+ terms = new SMap(theInstance.getMap("terms"), this.serializer);
+ clusterSize = theInstance.getAtomicLong("clustersize");
// roleMap.addEntryListener(new RemoteMasterShipEventHandler(), true);
log.info("Started");
@@ -103,6 +120,7 @@
case MASTER:
//reinforce mastership
rv.reassign(nodeId, STANDBY, NONE);
+ roleMap.put(deviceId, rv);
return null;
case STANDBY:
NodeId current = rv.get(MASTER);
@@ -115,11 +133,13 @@
rv.add(MASTER, nodeId);
}
rv.reassign(nodeId, STANDBY, NONE);
+ roleMap.put(deviceId, rv);
updateTerm(deviceId);
return new MastershipEvent(MASTER_CHANGED, deviceId, nodeId);
case NONE:
rv.add(MASTER, nodeId);
rv.reassign(nodeId, STANDBY, NONE);
+ roleMap.put(deviceId, rv);
updateTerm(deviceId);
return new MastershipEvent(MASTER_CHANGED, deviceId, nodeId);
default:
@@ -133,7 +153,7 @@
@Override
public NodeId getMaster(DeviceId deviceId) {
- return getMaster(deviceId);
+ return getNode(MASTER, deviceId);
}
@@ -181,15 +201,18 @@
switch (role) {
case MASTER:
rv.reassign(local, STANDBY, NONE);
+ roleMap.put(deviceId, rv);
break;
case STANDBY:
rv.reassign(local, NONE, STANDBY);
+ roleMap.put(deviceId, rv);
terms.putIfAbsent(deviceId, INIT);
break;
case NONE:
//claim mastership
rv.add(MASTER, local);
rv.reassign(local, STANDBY, NONE);
+ roleMap.put(deviceId, rv);
updateTerm(deviceId);
role = MastershipRole.MASTER;
break;
@@ -221,12 +244,13 @@
MastershipRole role = getRole(nodeId, deviceId);
switch (role) {
case MASTER:
- event = reelect(nodeId, deviceId);
+ event = reelect(nodeId, deviceId, rv);
//fall through to reinforce role
case STANDBY:
//fall through to reinforce role
case NONE:
rv.reassign(nodeId, NONE, STANDBY);
+ roleMap.put(deviceId, rv);
break;
default:
log.warn("unknown Mastership Role {}", role);
@@ -247,12 +271,13 @@
MastershipRole role = getRole(nodeId, deviceId);
switch (role) {
case MASTER:
- event = reelect(nodeId, deviceId);
+ event = reelect(nodeId, deviceId, rv);
//fall through to reinforce relinquishment
case STANDBY:
//fall through to reinforce relinquishment
case NONE:
rv.reassign(nodeId, STANDBY, NONE);
+ roleMap.put(deviceId, rv);
break;
default:
log.warn("unknown Mastership Role {}", role);
@@ -264,8 +289,7 @@
}
//helper to fetch a new master candidate for a given device.
- private MastershipEvent reelect(NodeId current, DeviceId deviceId) {
- RoleValue rv = roleMap.get(deviceId);
+ private MastershipEvent reelect(NodeId current, DeviceId deviceId, RoleValue rv) {
//if this is an queue it'd be neater.
NodeId backup = null;
@@ -278,10 +302,12 @@
if (backup == null) {
rv.remove(MASTER, current);
+ roleMap.put(deviceId, rv);
return null;
} else {
rv.replace(current, backup, MASTER);
rv.reassign(backup, STANDBY, NONE);
+ roleMap.put(deviceId, rv);
Integer term = terms.get(deviceId);
terms.put(deviceId, ++term);
return new MastershipEvent(