refactored MastershipStore to not use ILock

Change-Id: Ic254f6faddba3427d3380910ca90d3d65a29f40b
diff --git a/core/store/hz/cluster/src/main/java/org/onlab/onos/store/mastership/impl/DistributedMastershipStore.java b/core/store/hz/cluster/src/main/java/org/onlab/onos/store/mastership/impl/DistributedMastershipStore.java
index 1def9f9..bc32375 100644
--- a/core/store/hz/cluster/src/main/java/org/onlab/onos/store/mastership/impl/DistributedMastershipStore.java
+++ b/core/store/hz/cluster/src/main/java/org/onlab/onos/store/mastership/impl/DistributedMastershipStore.java
@@ -2,6 +2,7 @@
 
 import static org.onlab.onos.mastership.MastershipEvent.Type.MASTER_CHANGED;
 
+import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -22,12 +23,15 @@
 import org.onlab.onos.net.DeviceId;
 import org.onlab.onos.net.MastershipRole;
 import org.onlab.onos.store.common.AbstractHazelcastStore;
+import org.onlab.onos.store.common.SMap;
+import org.onlab.onos.store.serializers.KryoSerializer;
 
 import com.google.common.collect.ImmutableSet;
-import com.hazelcast.core.ILock;
 import com.hazelcast.core.IMap;
 import com.hazelcast.core.MultiMap;
 
+import static org.onlab.onos.net.MastershipRole.*;
+
 /**
  * Distributed implementation of the mastership store. The store is
  * responsible for the master selection process.
@@ -38,36 +42,26 @@
 extends AbstractHazelcastStore<MastershipEvent, MastershipStoreDelegate>
 implements MastershipStore {
 
-    //arbitrary lock name
-    private static final String LOCK = "lock";
     //initial term/TTL value
     private static final Integer INIT = 0;
 
-    //devices to masters
-    protected IMap<byte[], byte[]> masters;
+    //device to node roles
+    protected SMap<DeviceId, RoleValue> roleMap;
     //devices to terms
-    protected IMap<byte[], Integer> terms;
-
-    //re-election related, disjoint-set structures:
-    //device-nodes multiset of available nodes
-    protected MultiMap<byte[], byte[]> standbys;
-    //device-nodes multiset for nodes that have given up on device
-    protected MultiMap<byte[], byte[]> unusable;
+    protected SMap<DeviceId, Integer> terms;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected ClusterService clusterService;
 
+    @SuppressWarnings({ "unchecked", "rawtypes" })
     @Override
     @Activate
     public void activate() {
         super.activate();
 
-        masters = theInstance.getMap("masters");
-        terms = theInstance.getMap("terms");
-        standbys = theInstance.getMultiMap("backups");
-        unusable = theInstance.getMultiMap("unusable");
-
-        masters.addEntryListener(new RemoteMasterShipEventHandler(), true);
+        roleMap = new SMap(theInstance.getMap("nodeRoles"), new KryoSerializer());
+        terms = new SMap(theInstance.getMap("terms"), new KryoSerializer());
+       // roleMap.addEntryListener(new RemoteMasterShipEventHandler(), true);
 
         log.info("Started");
     }
@@ -79,12 +73,9 @@
 
     @Override
     public MastershipRole getRole(NodeId nodeId, DeviceId deviceId) {
-        byte[] did = serialize(deviceId);
-        byte[] nid = serialize(nodeId);
-
-        NodeId current = deserialize(masters.get(did));
+        NodeId current = getNode(MASTER, deviceId);
         if (current == null) {
-            if (standbys.containsEntry(did, nid)) {
+            if (isRole(STANDBY, nodeId, deviceId)) {
                 //was previously standby, or set to standby from master
                 return MastershipRole.STANDBY;
             } else {
@@ -103,69 +94,66 @@
 
     @Override
     public MastershipEvent setMaster(NodeId nodeId, DeviceId deviceId) {
-        byte [] did = serialize(deviceId);
-        byte [] nid = serialize(nodeId);
 
-        ILock lock = theInstance.getLock(LOCK);
-        lock.lock();
+        roleMap.lock(deviceId);
         try {
             MastershipRole role = getRole(nodeId, deviceId);
+            RoleValue rv = getRoleValue(deviceId);
             switch (role) {
                 case MASTER:
                     //reinforce mastership
-                    evict(nid, did);
+                    rv.reassign(nodeId, STANDBY, NONE);
                     return null;
                 case STANDBY:
-                    //make current master standby
-                    byte [] current = masters.get(did);
+                    NodeId current = rv.get(MASTER);
                     if (current != null) {
-                        backup(current, did);
+                        //backup and replace current master
+                        rv.reassign(current, NONE, STANDBY);
+                        rv.replace(current, nodeId, MASTER);
+                    } else {
+                        //no master before so just add.
+                        rv.add(MASTER, nodeId);
                     }
-                    //assign specified node as new master
-                    masters.put(did, nid);
-                    evict(nid, did);
-                    updateTerm(did);
+                    rv.reassign(nodeId, STANDBY, NONE);
+                    updateTerm(deviceId);
                     return new MastershipEvent(MASTER_CHANGED, deviceId, nodeId);
                 case NONE:
-                    masters.put(did, nid);
-                    evict(nid, did);
-                    updateTerm(did);
+                    rv.add(MASTER, nodeId);
+                    rv.reassign(nodeId, STANDBY, NONE);
+                    updateTerm(deviceId);
                     return new MastershipEvent(MASTER_CHANGED, deviceId, nodeId);
                 default:
                     log.warn("unknown Mastership Role {}", role);
                     return null;
             }
         } finally {
-            lock.unlock();
+            roleMap.unlock(deviceId);
         }
     }
 
     @Override
     public NodeId getMaster(DeviceId deviceId) {
-        return deserialize(masters.get(serialize(deviceId)));
+        return getNode(MASTER, deviceId);
     }
 
 
     @Override
     public List<NodeId> getNodes(DeviceId deviceId) {
-        byte [] did = serialize(deviceId);
         List<NodeId> nodes = new LinkedList<>();
 
-        //add current master to head - if there is one
-        ILock lock = theInstance.getLock(LOCK);
-        lock.lock();
+        //add current master to head - if there is one.
+        roleMap.lock(deviceId);
         try {
-            byte [] master = masters.get(did);
+            RoleValue rv = getRoleValue(deviceId);
+            NodeId master = rv.get(MASTER);
             if (master != null) {
-                nodes.add((NodeId) deserialize(master));
+                nodes.add(master);
             }
-
-            for (byte [] el : standbys.get(serialize(deviceId))) {
-                nodes.add((NodeId) deserialize(el));
-            }
-            return nodes;
+            //We ignore NONE nodes.
+            nodes.addAll(rv.nodesOfRole(STANDBY));
+            return Collections.unmodifiableList(nodes);
         } finally {
-            lock.unlock();
+            roleMap.unlock(deviceId);
         }
     }
 
@@ -173,9 +161,9 @@
     public Set<DeviceId> getDevices(NodeId nodeId) {
         ImmutableSet.Builder<DeviceId> builder = ImmutableSet.builder();
 
-        for (Map.Entry<byte[], byte[]> entry : masters.entrySet()) {
-            if (nodeId.equals(deserialize(entry.getValue()))) {
-                builder.add((DeviceId) deserialize(entry.getKey()));
+        for (Map.Entry<DeviceId, RoleValue> el : roleMap.entrySet()) {
+            if (nodeId.equals(el.getValue().get(MASTER))) {
+                builder.add(el.getKey());
             }
         }
 
@@ -185,26 +173,24 @@
     @Override
     public MastershipRole requestRole(DeviceId deviceId) {
         NodeId local = clusterService.getLocalNode().id();
-        byte [] did = serialize(deviceId);
-        byte [] lnid = serialize(local);
 
-        ILock lock = theInstance.getLock(LOCK);
-        lock.lock();
+        roleMap.lock(deviceId);
         try {
+            RoleValue rv = getRoleValue(deviceId);
             MastershipRole role = getRole(local, deviceId);
             switch (role) {
                 case MASTER:
-                    evict(lnid, did);
+                    rv.reassign(local, STANDBY, NONE);
                     break;
                 case STANDBY:
-                    backup(lnid, did);
-                    terms.putIfAbsent(did, INIT);
+                    rv.reassign(local, NONE, STANDBY);
+                    terms.putIfAbsent(deviceId, INIT);
                     break;
                 case NONE:
                     //claim mastership
-                    masters.put(did, lnid);
-                    evict(lnid, did);
-                    updateTerm(did);
+                    rv.add(MASTER, local);
+                    rv.reassign(local, STANDBY, NONE);
+                    updateTerm(deviceId);
                     role = MastershipRole.MASTER;
                     break;
                 default:
@@ -212,128 +198,128 @@
             }
             return role;
         } finally {
-            lock.unlock();
+            roleMap.unlock(deviceId);
         }
     }
 
     @Override
     public MastershipTerm getTermFor(DeviceId deviceId) {
-        byte[] did = serialize(deviceId);
-        if ((masters.get(did) == null) ||
-                (terms.get(did) == null)) {
+        RoleValue rv = getRoleValue(deviceId);
+        if ((rv.get(MASTER) == null) || (terms.get(deviceId) == null)) {
             return null;
         }
-        return MastershipTerm.of(
-                (NodeId) deserialize(masters.get(did)), terms.get(did));
+        return MastershipTerm.of(rv.get(MASTER), terms.get(deviceId));
     }
 
     @Override
     public MastershipEvent setStandby(NodeId nodeId, DeviceId deviceId) {
-        byte [] did = serialize(deviceId);
-        byte [] nid = serialize(nodeId);
         MastershipEvent event = null;
 
-        ILock lock = theInstance.getLock(LOCK);
-        lock.lock();
+        roleMap.lock(deviceId);
         try {
+            RoleValue rv = getRoleValue(deviceId);
             MastershipRole role = getRole(nodeId, deviceId);
             switch (role) {
                 case MASTER:
                     event = reelect(nodeId, deviceId);
-                    backup(nid, did);
-                    break;
+                    //fall through to reinforce role
                 case STANDBY:
                     //fall through to reinforce role
                 case NONE:
-                    backup(nid, did);
+                    rv.reassign(nodeId, NONE, STANDBY);
                     break;
                 default:
                     log.warn("unknown Mastership Role {}", role);
             }
             return event;
         } finally {
-            lock.unlock();
+            roleMap.unlock(deviceId);
         }
     }
 
     @Override
     public MastershipEvent relinquishRole(NodeId nodeId, DeviceId deviceId) {
-        byte [] did = serialize(deviceId);
-        byte [] nid = serialize(nodeId);
         MastershipEvent event = null;
 
-        ILock lock = theInstance.getLock(LOCK);
-        lock.lock();
+        roleMap.lock(deviceId);
         try {
+            RoleValue rv = getRoleValue(deviceId);
             MastershipRole role = getRole(nodeId, deviceId);
             switch (role) {
                 case MASTER:
                     event = reelect(nodeId, deviceId);
-                    evict(nid, did);
-                    break;
+                    //fall through to reinforce relinquishment
                 case STANDBY:
                     //fall through to reinforce relinquishment
                 case NONE:
-                    evict(nid, did);
+                    rv.reassign(nodeId, STANDBY, NONE);
                     break;
                 default:
                 log.warn("unknown Mastership Role {}", role);
             }
             return event;
         } finally {
-            lock.unlock();
+            roleMap.unlock(deviceId);
         }
     }
 
     //helper to fetch a new master candidate for a given device.
     private MastershipEvent reelect(NodeId current, DeviceId deviceId) {
-        byte [] did = serialize(deviceId);
-        byte [] nid = serialize(current);
+        RoleValue rv = roleMap.get(deviceId);
 
         //if this is an queue it'd be neater.
-        byte [] backup = null;
-        for (byte [] n : standbys.get(serialize(deviceId))) {
-            if (!current.equals(deserialize(n))) {
+        NodeId backup = null;
+        for (NodeId n : rv.nodesOfRole(STANDBY)) {
+            if (!current.equals(n)) {
                 backup = n;
                 break;
             }
         }
 
         if (backup == null) {
-            masters.remove(did, nid);
+            rv.remove(MASTER, current);
             return null;
         } else {
-            masters.put(did, backup);
-            evict(backup, did);
-            Integer term = terms.get(did);
-            terms.put(did, ++term);
+            rv.replace(current, backup, MASTER);
+            rv.reassign(backup, STANDBY, NONE);
+            Integer term = terms.get(deviceId);
+            terms.put(deviceId, ++term);
             return new MastershipEvent(
-                    MASTER_CHANGED, deviceId, (NodeId) deserialize(backup));
+                    MASTER_CHANGED, deviceId, backup);
         }
     }
 
-    //adds node to pool(s) of backups and moves them from unusable.
-    private void backup(byte [] nodeId, byte [] deviceId) {
-        if (!standbys.containsEntry(deviceId, nodeId)) {
-            standbys.put(deviceId, nodeId);
+    //return the RoleValue structure for a device, or create one
+    private RoleValue getRoleValue(DeviceId deviceId) {
+        RoleValue value = roleMap.get(deviceId);
+        if (value == null) {
+            value = new RoleValue();
+            roleMap.put(deviceId, value);
         }
-        if (unusable.containsEntry(deviceId, nodeId)) {
-            unusable.remove(deviceId, nodeId);
-        }
+        return value;
     }
 
-    //adds node to unusable and evicts it from backup pool.
-    private void evict(byte [] nodeId, byte [] deviceId) {
-        if (!unusable.containsEntry(deviceId, nodeId)) {
-            unusable.put(deviceId, nodeId);
+    //get first applicable node out of store-unique structure.
+    private NodeId getNode(MastershipRole role, DeviceId deviceId) {
+        RoleValue value = roleMap.get(deviceId);
+        if (value != null) {
+            return value.get(role);
         }
-        if (standbys.containsEntry(deviceId, nodeId)) {
-            standbys.remove(deviceId, nodeId);
+        return null;
+    }
+
+    //check if node is a certain role given a device
+    private boolean isRole(
+            MastershipRole role, NodeId nodeId, DeviceId deviceId) {
+        RoleValue value = roleMap.get(deviceId);
+        if (value != null) {
+            return value.contains(role, nodeId);
         }
+        return false;
     }
 
     //adds or updates term information.
-    private void updateTerm(byte [] deviceId) {
+    private void updateTerm(DeviceId deviceId) {
         Integer term = terms.get(deviceId);
         if (term == null) {
             terms.put(deviceId, INIT);