Move cluster management and mastership management into separate packages

Change-Id: If1e399648451842da6dccc644b75b81337b105f9
diff --git a/core/store/hz/cluster/src/main/java/org/onlab/onos/store/mastership/impl/DistributedMastershipStore.java b/core/store/hz/cluster/src/main/java/org/onlab/onos/store/mastership/impl/DistributedMastershipStore.java
new file mode 100644
index 0000000..e073b63
--- /dev/null
+++ b/core/store/hz/cluster/src/main/java/org/onlab/onos/store/mastership/impl/DistributedMastershipStore.java
@@ -0,0 +1,338 @@
+package org.onlab.onos.store.mastership.impl;
+
+import static org.onlab.onos.mastership.MastershipEvent.Type.MASTER_CHANGED;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.onos.cluster.ClusterService;
+import org.onlab.onos.cluster.NodeId;
+import org.onlab.onos.mastership.MastershipEvent;
+import org.onlab.onos.mastership.MastershipStore;
+import org.onlab.onos.mastership.MastershipStoreDelegate;
+import org.onlab.onos.mastership.MastershipTerm;
+import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.net.MastershipRole;
+import org.onlab.onos.store.common.AbstractHazelcastStore;
+
+import com.google.common.collect.ImmutableSet;
+import com.hazelcast.core.ILock;
+import com.hazelcast.core.IMap;
+import com.hazelcast.core.MultiMap;
+
+/**
+ * Distributed implementation of the mastership store. The store is
+ * responsible for the master selection process.
+ */
+@Component(immediate = true)
+@Service
+public class DistributedMastershipStore
+extends AbstractHazelcastStore<MastershipEvent, MastershipStoreDelegate>
+implements MastershipStore {
+
+    //arbitrary lock name; all mutating operations of this store share this one lock
+    private static final String LOCK = "lock";
+    //initial term/TTL value; stored when a device gets its first term entry
+    private static final Integer INIT = 0;
+
+    //devices to masters (serialized DeviceId -> serialized NodeId)
+    protected IMap<byte[], byte[]> masters;
+    //devices to terms (serialized DeviceId -> term number)
+    protected IMap<byte[], Integer> terms;
+
+    //re-election related, disjoint-set structures:
+    //device-nodes multiset of available nodes (Hazelcast name: "backups")
+    protected MultiMap<byte[], byte[]> standbys;
+    //device-nodes multiset for nodes that have given up on device
+    protected MultiMap<byte[], byte[]> unusable;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ClusterService clusterService;
+
+    @Override
+    @Activate
+    public void activate() {
+        super.activate();
+        //bind the Hazelcast structures; standbys lives under the name "backups"
+        unusable = theInstance.getMultiMap("unusable");
+        standbys = theInstance.getMultiMap("backups");
+        terms = theInstance.getMap("terms");
+        masters = theInstance.getMap("masters");
+
+        //'true' asks Hazelcast to include entry values in delivered events
+        masters.addEntryListener(new RemoteMasterShipEventHandler(), true);
+        log.info("Started");
+    }
+
+    @Deactivate
+    public void deactivate() {
+        log.info("Stopped"); //NOTE(review): the entry listener added in activate() is not removed - confirm intended
+    }
+
+    /**
+     * Reports the role the given node holds for the given device.
+     *
+     * @param nodeId   node being queried
+     * @param deviceId device being queried
+     * @return MASTER if the node is the recorded master; STANDBY if another
+     *         node is master or the node is in the standby pool; NONE otherwise
+     */
+    @Override
+    public MastershipRole getRole(NodeId nodeId, DeviceId deviceId) {
+        byte[] devKey = serialize(deviceId);
+        byte[] nodeKey = serialize(nodeId);
+
+        NodeId master = deserialize(masters.get(devKey));
+        if (master != null) {
+            //some node is master: either this one, or this one counts as standby
+            //regardless of which re-election pool it currently sits in
+            return master.equals(nodeId) ? MastershipRole.MASTER : MastershipRole.STANDBY;
+        }
+        //no master recorded: standby only if listed in the standby pool
+        return standbys.containsEntry(devKey, nodeKey)
+                ? MastershipRole.STANDBY : MastershipRole.NONE;
+    }
+
+    /**
+     * Makes the given node master of the given device.
+     *
+     * @return a MASTER_CHANGED event when mastership moved to the node, otherwise null
+     */
+    @Override
+    public MastershipEvent setMaster(NodeId nodeId, DeviceId deviceId) {
+        final byte[] devKey = serialize(deviceId);
+        final byte[] nodeKey = serialize(nodeId);
+        final ILock guard = theInstance.getLock(LOCK);
+        guard.lock();
+        try {
+            final MastershipRole role = getRole(nodeId, deviceId);
+            switch (role) {
+                case MASTER:
+                    //already master: only re-assert that it is not a standby
+                    evict(nodeKey, devKey);
+                    return null;
+                case STANDBY:
+                    //demote whoever holds mastership now, then take over below
+                    byte[] incumbent = masters.get(devKey);
+                    if (incumbent != null) {
+                        backup(incumbent, devKey);
+                    }
+                    //fall through
+                case NONE:
+                    masters.put(devKey, nodeKey);
+                    evict(nodeKey, devKey);
+                    updateTerm(devKey);
+                    return new MastershipEvent(MASTER_CHANGED, deviceId, nodeId);
+                default:
+                    log.warn("unknown Mastership Role {}", role);
+                    return null;
+            }
+        } finally {
+            guard.unlock();
+        }
+    }
+
+    @Override
+    public NodeId getMaster(DeviceId deviceId) {
+        return deserialize(masters.get(serialize(deviceId))); //presumably null when no master is stored - confirm
+    }
+
+    @Override
+    public Set<DeviceId> getDevices(NodeId nodeId) {
+        //scan the whole masters map and keep the devices mastered by nodeId
+        ImmutableSet.Builder<DeviceId> owned = ImmutableSet.builder();
+        for (Map.Entry<byte[], byte[]> mapping : masters.entrySet()) {
+            Object master = deserialize(mapping.getValue());
+            if (nodeId.equals(master)) {
+                owned.add((DeviceId) deserialize(mapping.getKey()));
+            }
+        }
+        return owned.build();
+    }
+
+    /**
+     * Establishes the local node's role for a device, claiming mastership when its role is NONE.
+     */
+    @Override
+    public MastershipRole requestRole(DeviceId deviceId) {
+        final NodeId self = clusterService.getLocalNode().id();
+        final byte[] devKey = serialize(deviceId);
+        final byte[] selfKey = serialize(self);
+        final ILock guard = theInstance.getLock(LOCK);
+        guard.lock();
+        try {
+            final MastershipRole found = getRole(self, deviceId);
+            switch (found) {
+                case MASTER:
+                    evict(selfKey, devKey);
+                    return found;
+                case STANDBY:
+                    backup(selfKey, devKey);
+                    terms.putIfAbsent(devKey, INIT); //make sure a term entry exists
+                    return found;
+                case NONE:
+                    masters.put(devKey, selfKey);
+                    evict(selfKey, devKey);
+                    updateTerm(devKey);
+                    return MastershipRole.MASTER; //just claimed above
+                default:
+                    log.warn("unknown Mastership Role {}", found);
+                    return found;
+            }
+        } finally {
+            guard.unlock();
+        }
+    }
+
+    @Override
+    public MastershipTerm getTermFor(DeviceId deviceId) {
+        //returns null unless both a master and a term are recorded. each map is read
+        //once: no lock is held, so a second get() could miss an entry removed meanwhile
+        byte[] did = serialize(deviceId);
+        byte[] master = masters.get(did);
+        Integer term = terms.get(did);
+        return ((master == null) || (term == null))
+                ? null : MastershipTerm.of((NodeId) deserialize(master), term);
+    }
+
+    /**
+     * Puts the node into the device's standby pool, re-electing a master first if the node was master.
+     */
+    @Override
+    public MastershipEvent setStandby(NodeId nodeId, DeviceId deviceId) {
+        final byte[] devKey = serialize(deviceId);
+        final byte[] nodeKey = serialize(nodeId);
+        final ILock guard = theInstance.getLock(LOCK);
+        guard.lock();
+        try {
+            MastershipEvent outcome = null;
+            final MastershipRole role = getRole(nodeId, deviceId);
+            switch (role) {
+                case MASTER:
+                    outcome = reelect(nodeId, deviceId);
+                    //fall through: the former master joins the standby pool too
+                case STANDBY:
+                case NONE:
+                    backup(nodeKey, devKey);
+                    break;
+                default:
+                    log.warn("unknown Mastership Role {}", role);
+            }
+            return outcome; //null unless re-election produced an event
+        } finally {
+            guard.unlock();
+        }
+    }
+
+    /**
+     * Marks the node unusable for the device, re-electing a master first if the node was master.
+     */
+    @Override
+    public MastershipEvent relinquishRole(NodeId nodeId, DeviceId deviceId) {
+        final byte[] devKey = serialize(deviceId);
+        final byte[] nodeKey = serialize(nodeId);
+        final ILock guard = theInstance.getLock(LOCK);
+        guard.lock();
+        try {
+            MastershipEvent outcome = null;
+            final MastershipRole role = getRole(nodeId, deviceId);
+            switch (role) {
+                case MASTER:
+                    outcome = reelect(nodeId, deviceId);
+                    //fall through: the former master is evicted like any other node
+                case STANDBY:
+                case NONE:
+                    evict(nodeKey, devKey);
+                    break;
+                default:
+                    log.warn("unknown Mastership Role {}", role);
+            }
+            return outcome; //null unless re-election produced an event
+        } finally {
+            guard.unlock();
+        }
+    }
+
+    //helper to elect a new master for a device from its standby pool.
+    //returns a MASTER_CHANGED event for the new master, or null if no standby
+    //other than 'current' exists.
+    private MastershipEvent reelect(NodeId current, DeviceId deviceId) {
+        byte [] did = serialize(deviceId);
+        byte [] nid = serialize(current);
+        //if this were a queue it'd be neater.
+        byte [] backup = null;
+        for (byte [] n : standbys.get(did)) {
+            if (!current.equals(deserialize(n))) {
+                backup = n;
+                break;
+            }
+        }
+
+        if (backup == null) {
+            //conditional remove: clears the entry only while 'current' is still the stored master
+            masters.remove(did, nid);
+            return null;
+        }
+        masters.put(did, backup);
+        evict(backup, did);
+        //updateTerm() copes with a missing term entry; a bare ++ on null would throw
+        updateTerm(did);
+        return new MastershipEvent(MASTER_CHANGED, deviceId, (NodeId) deserialize(backup));
+    }
+
+    //adds node to pool(s) of backups and moves them from unusable.
+    private void backup(byte [] nodeId, byte [] deviceId) {
+        repool(unusable, standbys, nodeId, deviceId);
+    }
+
+    //adds node to unusable and evicts it from backup pool.
+    private void evict(byte [] nodeId, byte [] deviceId) {
+        repool(standbys, unusable, nodeId, deviceId);
+    }
+
+    //ensures the (device, node) pair is present in 'to' and absent from 'from'.
+    private void repool(MultiMap<byte[], byte[]> from, MultiMap<byte[], byte[]> to, byte[] nodeId, byte[] deviceId) {
+        if (!to.containsEntry(deviceId, nodeId)) {
+            to.put(deviceId, nodeId);
+        }
+        if (from.containsEntry(deviceId, nodeId)) {
+            from.remove(deviceId, nodeId);
+        }
+    }
+
+    //adds or updates term information.
+    //a device without a recorded term starts at INIT; otherwise the stored
+    //term is advanced by one.
+    private void updateTerm(byte [] deviceId) {
+        Integer known = terms.get(deviceId);
+        //INIT is never null, so unboxing it in the conditional is safe
+        int next = (known == null) ? INIT : known + 1;
+        terms.put(deviceId, next);
+    }
+
+    private class RemoteMasterShipEventHandler extends RemoteEventHandler<DeviceId, NodeId> {
+        //presumably invoked when an entry is added to the masters map; reports it as MASTER_CHANGED
+        @Override
+        protected void onAdd(DeviceId deviceId, NodeId nodeId) {
+            notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, nodeId));
+        }
+
+        @Override
+        protected void onRemove(DeviceId deviceId, NodeId nodeId) {
+            //no event is raised when a masters entry is removed (notification left disabled)
+        }
+
+        @Override
+        protected void onUpdate(DeviceId deviceId, NodeId oldNodeId, NodeId nodeId) {
+            //only addition indicates a change in mastership
+            //NOTE(review): setMaster()/reelect() overwrite existing entries; presumably those arrive here - confirm
+        }
+    }
+
+}