Moved Hazelcast-based Mastership+Cluster stores to the onos-core-dist bundle
Change-Id: I304f916f3a400eaf050a5351825634349790e1bf
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/mastership/impl/DistributedMastershipStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/mastership/impl/DistributedMastershipStore.java
new file mode 100644
index 0000000..ecc8982
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/mastership/impl/DistributedMastershipStore.java
@@ -0,0 +1,372 @@
+package org.onlab.onos.store.mastership.impl;
+
+import static org.onlab.onos.mastership.MastershipEvent.Type.MASTER_CHANGED;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.onos.cluster.ClusterService;
+import org.onlab.onos.cluster.NodeId;
+import org.onlab.onos.cluster.RoleInfo;
+import org.onlab.onos.mastership.MastershipEvent;
+import org.onlab.onos.mastership.MastershipStore;
+import org.onlab.onos.mastership.MastershipStoreDelegate;
+import org.onlab.onos.mastership.MastershipTerm;
+import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.net.MastershipRole;
+import org.onlab.onos.store.hz.AbstractHazelcastStore;
+import org.onlab.onos.store.hz.SMap;
+import org.onlab.onos.store.serializers.KryoNamespaces;
+import org.onlab.onos.store.serializers.KryoSerializer;
+import org.onlab.util.KryoNamespace;
+
+import com.google.common.collect.ImmutableSet;
+import com.hazelcast.core.EntryEvent;
+import com.hazelcast.core.EntryListener;
+import com.hazelcast.core.IAtomicLong;
+import com.hazelcast.core.MapEvent;
+
+import static org.onlab.onos.net.MastershipRole.*;
+
+/**
+ * Distributed implementation of the mastership store. The store is
+ * responsible for the master selection process.
+ */
+@Component(immediate = true)
+@Service
+public class DistributedMastershipStore
+extends AbstractHazelcastStore<MastershipEvent, MastershipStoreDelegate>
+implements MastershipStore {
+
+ // Initial term number; stored the first time a term is recorded for a device.
+ private static final Integer INIT = 0;
+
+ // Role assignments per device; backed by the Hazelcast map "nodeRoles".
+ protected SMap<DeviceId, RoleValue> roleMap;
+ // Term counter per device; backed by the Hazelcast map "terms".
+ protected SMap<DeviceId, Integer> terms;
+ // Last-known cluster size, for tie-breaking on partition. NOTE(review): never read in this class.
+ protected IAtomicLong clusterSize;
+
+ // Cluster service; requestRole() uses it to obtain the local node's id.
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ClusterService clusterService;
+
+ @Override
+ @Activate
+ public void activate() {
+     super.activate();
+
+     // The serializer registers the API namespace plus the RoleValue type stored in roleMap.
+     serializer = new KryoSerializer() {
+         @Override
+         protected void setupKryoPool() {
+             serializerPool = KryoNamespace.newBuilder()
+                     .register(KryoNamespaces.API)
+                     .register(RoleValue.class, new RoleValueSerializer())
+                     .build().populate(1);
+         }
+     };
+
+     // Updates to the role map are relayed to the store delegate by this handler.
+     final RemoteMasterShipEventHandler roleListener = new RemoteMasterShipEventHandler();
+     roleMap = new SMap<>(theInstance.<byte[], byte[]>getMap("nodeRoles"), serializer);
+     roleMap.addEntryListener(roleListener, true);
+     terms = new SMap<>(theInstance.<byte[], byte[]>getMap("terms"), serializer);
+     clusterSize = theInstance.getAtomicLong("clustersize");
+     log.info("Started");
+ }
+
+ @Deactivate
+ public void deactivate() {
+ log.info("Stopped"); // NOTE(review): the entry listener added in activate() is not removed here
+ }
+
+ /** Reports the role recorded for nodeId on deviceId: MASTER, STANDBY, or NONE if neither. */
+ @Override
+ public MastershipRole getRole(NodeId nodeId, DeviceId deviceId) {
+     // getRoleValue() also installs an empty entry if the device has none yet.
+     final RoleValue assignments = getRoleValue(deviceId);
+     // MASTER is tested before STANDBY, so it wins if both were to list the node.
+     if (assignments.contains(MASTER, nodeId)) {
+         return MASTER;
+     }
+     return assignments.contains(STANDBY, nodeId) ? STANDBY : NONE;
+ }
+
+ /** Makes nodeId master of deviceId; returns a MASTER_CHANGED event, or null if it already was master. */
+ @Override
+ public MastershipEvent setMaster(NodeId nodeId, DeviceId deviceId) {
+     roleMap.lock(deviceId);
+     try {
+         RoleValue rv = getRoleValue(deviceId);
+         // Read the role only once the lock is held, so it cannot go stale before the update.
+         MastershipRole role = getRole(nodeId, deviceId);
+         switch (role) {
+         case MASTER: // already master: only reinforce the assignment
+             rv.reassign(nodeId, STANDBY, NONE);
+             roleMap.put(deviceId, rv);
+             return null;
+         case STANDBY:
+         case NONE:
+             NodeId current = rv.get(MASTER);
+             if (current != null) {
+                 //backup and replace current master
+                 rv.reassign(current, NONE, STANDBY);
+                 rv.replace(current, nodeId, MASTER);
+             } else {
+                 //no master before so just add.
+                 rv.add(MASTER, nodeId);
+             }
+             rv.reassign(nodeId, STANDBY, NONE);
+             roleMap.put(deviceId, rv);
+             updateTerm(deviceId);
+             return new MastershipEvent(MASTER_CHANGED, deviceId, rv.roleInfo());
+         default:
+             log.warn("unknown Mastership Role {}", role);
+             return null;
+         }
+     } finally {
+         roleMap.unlock(deviceId);
+     }
+ }
+
+ /** Returns the node recorded as master of the device, or null if there is none. */
+ @Override
+ public NodeId getMaster(DeviceId deviceId) {
+     final RoleValue stored = roleMap.get(deviceId);
+     return (stored == null) ? null : stored.get(MASTER);
+ }
+ /** Returns the role info built from the device's RoleValue, read while holding the device lock. */
+ @Override
+ public RoleInfo getNodes(DeviceId deviceId) {
+     roleMap.lock(deviceId);
+     try {
+         return getRoleValue(deviceId).roleInfo();
+     } finally {
+         roleMap.unlock(deviceId);
+     }
+ }
+
+ /** Collects the devices for which nodeId is currently recorded as master. */
+ @Override
+ public Set<DeviceId> getDevices(NodeId nodeId) {
+     final ImmutableSet.Builder<DeviceId> mastered = ImmutableSet.builder();
+     for (Map.Entry<DeviceId, RoleValue> entry : roleMap.entrySet()) {
+         final NodeId master = entry.getValue().get(MASTER);
+         if (nodeId.equals(master)) {
+             mastered.add(entry.getKey());
+         }
+     }
+     return mastered.build();
+ }
+
+ /**
+  * Requests a role on the given device for the local node.
+  * The local node becomes MASTER if the device has no master, otherwise STANDBY;
+  * a role it already holds is re-applied to the stored assignment.
+  * @return the role the local node holds after the request
+  */
+ @Override
+ public MastershipRole requestRole(DeviceId deviceId) {
+     final NodeId self = clusterService.getLocalNode().id();
+     roleMap.lock(deviceId);
+     try {
+         final RoleValue roles = getRoleValue(deviceId);
+         MastershipRole granted = getRole(self, deviceId);
+         if (granted == MASTER) {
+             roles.reassign(self, STANDBY, NONE);
+             terms.putIfAbsent(deviceId, INIT);
+             roleMap.put(deviceId, roles);
+         } else if (granted == STANDBY) {
+             roles.reassign(self, NONE, STANDBY);
+             roleMap.put(deviceId, roles);
+             terms.putIfAbsent(deviceId, INIT);
+         } else if (granted == NONE) {
+             // With no master recorded the local node claims mastership; otherwise it is added as standby.
+             if (roles.get(MASTER) == null) {
+                 roles.add(MASTER, self);
+                 roles.reassign(self, STANDBY, NONE);
+                 updateTerm(deviceId);
+                 granted = MASTER;
+             } else {
+                 roles.add(STANDBY, self);
+                 roles.reassign(self, NONE, STANDBY);
+                 granted = STANDBY;
+             }
+             roleMap.put(deviceId, roles);
+         } else {
+             log.warn("unknown Mastership Role {}", granted);
+         }
+         return granted;
+     } finally {
+         roleMap.unlock(deviceId);
+     }
+ }
+
+ /** Returns the master/term pair for the device, or null if either is not yet known. */
+ @Override
+ public MastershipTerm getTermFor(DeviceId deviceId) {
+     // Read each value once so the null checks and the result use the same snapshot.
+     final NodeId master = getRoleValue(deviceId).get(MASTER);
+     final Integer term = terms.get(deviceId);
+     return (master == null || term == null) ? null : MastershipTerm.of(master, term);
+ }
+
+ /**
+  * Puts nodeId in the standby role for deviceId, handing mastership over first
+  * if nodeId is currently the master.
+  *
+  * @return the event produced by the hand-over, or null if none was produced
+  */
+ @Override
+ public MastershipEvent setStandby(NodeId nodeId, DeviceId deviceId) {
+     roleMap.lock(deviceId);
+     try {
+         final RoleValue roles = getRoleValue(deviceId);
+         final MastershipRole held = getRole(nodeId, deviceId);
+         // Defensive: getRole() only returns the three known roles.
+         if (held != MASTER && held != STANDBY && held != NONE) {
+             log.warn("unknown Mastership Role {}", held);
+             return null;
+         }
+         // Only a current master triggers a re-election; every known role is then reinforced.
+         final MastershipEvent handover = (held == MASTER) ? reelect(nodeId, deviceId, roles) : null;
+         roles.reassign(nodeId, NONE, STANDBY);
+         roleMap.put(deviceId, roles);
+         return handover;
+     } finally {
+         roleMap.unlock(deviceId);
+     }
+ }
+
+ /**
+  * Moves nodeId to the NONE role for deviceId, handing mastership over first
+  * if nodeId is currently the master.
+  *
+  * @return the event produced by the hand-over, or null if none was produced
+  */
+ @Override
+ public MastershipEvent relinquishRole(NodeId nodeId, DeviceId deviceId) {
+     roleMap.lock(deviceId);
+     try {
+         final RoleValue roles = getRoleValue(deviceId);
+         final MastershipRole held = getRole(nodeId, deviceId);
+         // Defensive: getRole() only returns the three known roles.
+         if (held != MASTER && held != STANDBY && held != NONE) {
+             log.warn("unknown Mastership Role {}", held);
+             return null;
+         }
+         // Only a current master triggers a re-election; the relinquishment is then reinforced.
+         final MastershipEvent handover = (held == MASTER) ? reelect(nodeId, deviceId, roles) : null;
+         roles.reassign(nodeId, STANDBY, NONE);
+         roleMap.put(deviceId, roles);
+         return handover;
+     } finally {
+         roleMap.unlock(deviceId);
+     }
+ }
+
+ /**
+  * Hands mastership of deviceId from current to the first other standby node, if any.
+  * @return a MASTER_CHANGED event when a backup took over, or null when no backup exists
+  */
+ private MastershipEvent reelect(
+         NodeId current, DeviceId deviceId, RoleValue rv) {
+     //if this were a queue it'd be neater.
+     NodeId backup = null;
+     for (NodeId n : rv.nodesOfRole(STANDBY)) {
+         if (!current.equals(n)) {
+             backup = n;
+             break;
+         }
+     }
+     if (backup == null) {
+         log.info("{} giving up and going to NONE for {}", current, deviceId);
+         rv.remove(MASTER, current);
+         roleMap.put(deviceId, rv);
+         return null;
+     }
+     log.info("{} trying to pass mastership for {} to {}", current, deviceId, backup);
+     rv.replace(current, backup, MASTER);
+     rv.reassign(backup, STANDBY, NONE);
+     roleMap.put(deviceId, rv);
+     // updateTerm() locks the entry and tolerates a missing term, unlike a bare get/++/put.
+     updateTerm(deviceId);
+     return new MastershipEvent(MASTER_CHANGED, deviceId, rv.roleInfo());
+ }
+
+ //return the RoleValue structure for a device, or create one
+ private RoleValue getRoleValue(DeviceId deviceId) {
+ RoleValue value = roleMap.get(deviceId);
+ if (value == null) {
+ value = new RoleValue();
+ RoleValue concurrentlyAdded = roleMap.putIfAbsent(deviceId, value);
+ if (concurrentlyAdded != null) {
+ return concurrentlyAdded;
+ }
+ }
+ return value;
+ }
+
+ //get first applicable node out of store-unique structure.
+ private NodeId getNode(MastershipRole role, DeviceId deviceId) {
+ RoleValue value = roleMap.get(deviceId);
+ if (value != null) {
+ return value.get(role);
+ }
+ return null;
+ }
+
+ //adds or updates term information.
+ private void updateTerm(DeviceId deviceId) {
+ terms.lock(deviceId);
+ try {
+ Integer term = terms.get(deviceId);
+ if (term == null) {
+ terms.put(deviceId, INIT);
+ } else {
+ terms.put(deviceId, ++term);
+ }
+ } finally {
+ terms.unlock(deviceId);
+ }
+ }
+
+ // Listener on roleMap; reports every entry update to the store delegate as MASTER_CHANGED.
+ private class RemoteMasterShipEventHandler implements EntryListener<DeviceId, RoleValue> {
+     @Override
+     public void entryUpdated(EntryEvent<DeviceId, RoleValue> event) {
+         final DeviceId device = event.getKey();
+         notifyDelegate(new MastershipEvent(MASTER_CHANGED, device, event.getValue().roleInfo()));
+     }
+
+     // The remaining callbacks do nothing.
+     @Override
+     public void entryAdded(EntryEvent<DeviceId, RoleValue> event) {
+     }
+
+     @Override
+     public void entryRemoved(EntryEvent<DeviceId, RoleValue> event) {
+     }
+
+     @Override
+     public void entryEvicted(EntryEvent<DeviceId, RoleValue> event) {
+     }
+
+     @Override
+     public void mapEvicted(MapEvent event) {
+     }
+
+     @Override
+     public void mapCleared(MapEvent event) {
+     }
+ }
+
+}