tests for DistributedMastershipStore

Change-Id: Ic7daa333ac7d7947155b745daf08e4771f1189ef
diff --git a/core/store/hz/cluster/src/main/java/org/onlab/onos/store/cluster/impl/DistributedMastershipStore.java b/core/store/hz/cluster/src/main/java/org/onlab/onos/store/cluster/impl/DistributedMastershipStore.java
index 18e6e96..dc42773 100644
--- a/core/store/hz/cluster/src/main/java/org/onlab/onos/store/cluster/impl/DistributedMastershipStore.java
+++ b/core/store/hz/cluster/src/main/java/org/onlab/onos/store/cluster/impl/DistributedMastershipStore.java
@@ -4,7 +4,6 @@
 import static org.onlab.onos.cluster.MastershipEvent.Type.MASTER_CHANGED;
 
 import java.util.Map;
-import java.util.Objects;
 import java.util.Set;
 
 import org.apache.felix.scr.annotations.Activate;
@@ -28,6 +27,7 @@
 import com.google.common.base.Optional;
 import com.google.common.cache.LoadingCache;
 import com.google.common.collect.ImmutableSet;
+import com.hazelcast.core.ILock;
 import com.hazelcast.core.IMap;
 
 /**
@@ -39,8 +39,22 @@
 extends AbstractHazelcastStore<MastershipEvent, MastershipStoreDelegate>
 implements MastershipStore {
 
-    private IMap<byte[], byte[]> rawMasters;
-    private LoadingCache<DeviceId, Optional<NodeId>> masters;
+    //arbitrary lock name
+    private static final String LOCK = "lock";
+    //initial term value
+    private static final Integer INIT = 0;
+    //placeholder non-null value
+    private static final Byte NIL = 0x0;
+
+    //devices to masters
+    protected IMap<byte[], byte[]> rawMasters;
+    //devices to terms
+    protected IMap<byte[], Integer> rawTerms;
+    //collection of nodes. values are ignored, as it's used as a makeshift 'set'
+    protected IMap<byte[], Byte> backups;
+
+    //TODO - remove
+    //private LoadingCache<DeviceId, Optional<NodeId>> masters;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected ClusterService clusterService;
@@ -51,23 +65,18 @@
         super.activate();
 
         rawMasters = theInstance.getMap("masters");
-        OptionalCacheLoader<DeviceId, NodeId> nodeLoader
-        = new OptionalCacheLoader<>(kryoSerializationService, rawMasters);
-        masters = new AbsentInvalidatingLoadingCache<>(newBuilder().build(nodeLoader));
-        rawMasters.addEntryListener(new RemoteMasterShipEventHandler(masters), true);
+        rawTerms = theInstance.getMap("terms");
+        backups = theInstance.getMap("backups");
 
-        loadMasters();
+        //TODO: hook up maps to event notification
+        //OptionalCacheLoader<DeviceId, NodeId> nodeLoader
+        //= new OptionalCacheLoader<>(kryoSerializationService, rawMasters);
+        //masters = new AbsentInvalidatingLoadingCache<>(newBuilder().build(nodeLoader));
+        //rawMasters.addEntryListener(new RemoteMasterShipEventHandler(masters), true);
 
         log.info("Started");
     }
 
-    private void loadMasters() {
-        for (byte[] keyBytes : rawMasters.keySet()) {
-            final DeviceId id = deserialize(keyBytes);
-            masters.refresh(id);
-        }
-    }
-
     @Deactivate
     public void deactivate() {
         log.info("Stopped");
@@ -75,60 +84,178 @@
 
     @Override
     public MastershipEvent setMaster(NodeId nodeId, DeviceId deviceId) {
-        synchronized (this) {
-            NodeId currentMaster = getMaster(deviceId);
-            if (Objects.equals(currentMaster, nodeId)) {
-                return null;
-            }
+        byte [] did = serialize(deviceId);
+        byte [] nid = serialize(nodeId);
 
-            // FIXME: for now implementing semantics of setMaster
-            rawMasters.put(serialize(deviceId), serialize(nodeId));
-            masters.put(deviceId, Optional.of(nodeId));
-            return new MastershipEvent(MastershipEvent.Type.MASTER_CHANGED, deviceId, nodeId);
+        ILock lock = theInstance.getLock(LOCK);
+        lock.lock();
+        try {
+            MastershipRole role = getRole(nodeId, deviceId);
+            Integer term = rawTerms.get(did);
+            switch (role) {
+                case MASTER:
+                    return null;
+                case STANDBY:
+                    rawMasters.put(did, nid);
+                    rawTerms.put(did, ++term);
+                    backups.putIfAbsent(nid, NIL);
+                    break;
+                case NONE:
+                    rawMasters.put(did, nid);
+                    //new switch OR state transition after being orphaned
+                    if (term == null) {
+                        rawTerms.put(did, INIT);
+                    } else {
+                        rawTerms.put(did, ++term);
+                    }
+                    backups.put(nid, NIL);
+                    break;
+                default:
+                    log.warn("unknown Mastership Role {}", role);
+                    return null;
+            }
+            return new MastershipEvent(MASTER_CHANGED, deviceId, nodeId);
+        } finally {
+            lock.unlock();
         }
     }
 
     @Override
     public NodeId getMaster(DeviceId deviceId) {
-        return masters.getUnchecked(deviceId).orNull();
+        return deserialize(rawMasters.get(serialize(deviceId)));
     }
 
     @Override
     public Set<DeviceId> getDevices(NodeId nodeId) {
         ImmutableSet.Builder<DeviceId> builder = ImmutableSet.builder();
-        for (Map.Entry<DeviceId, Optional<NodeId>> entry : masters.asMap().entrySet()) {
-            if (nodeId.equals(entry.getValue().get())) {
-                builder.add(entry.getKey());
+
+        for (Map.Entry<byte[], byte[]> entry : rawMasters.entrySet()) {
+            if (nodeId.equals(deserialize(entry.getValue()))) {
+                builder.add((DeviceId) deserialize(entry.getKey()));
             }
         }
+
         return builder.build();
     }
 
     @Override
     public MastershipRole requestRole(DeviceId deviceId) {
-        // FIXME: for now we are 'selecting' as master whoever asks
-        setMaster(clusterService.getLocalNode().id(), deviceId);
-        return MastershipRole.MASTER;
+        // the first node to claim the empty slot for a device in the master map becomes MASTER.
+        // depending on how backups are organized, an election might need to be triggered
+        // so that the only controller doesn't set itself up as backup for another device
+        byte [] did = serialize(deviceId);
+        NodeId local = clusterService.getLocalNode().id();
+        byte [] lnid = serialize(local);
+
+        ILock lock = theInstance.getLock(LOCK);
+        lock.lock();
+        try {
+            MastershipRole role = getRole(local, deviceId);
+            switch (role) {
+                case MASTER:
+                    break;
+                case STANDBY:
+                    backups.put(lnid, NIL);
+                    rawTerms.putIfAbsent(did, INIT);
+                    break;
+                case NONE:
+                    rawMasters.put(did, lnid);
+                    rawTerms.putIfAbsent(did, INIT);
+                    backups.put(lnid, NIL);
+                    role = MastershipRole.MASTER;
+                    break;
+                default:
+                    log.warn("unknown Mastership Role {}", role);
+            }
+            return role;
+        } finally {
+            lock.unlock();
+        }
     }
 
     @Override
     public MastershipRole getRole(NodeId nodeId, DeviceId deviceId) {
-        NodeId master = masters.getUnchecked(deviceId).orNull();
-        return nodeId.equals(master) ? MastershipRole.MASTER : MastershipRole.STANDBY;
+        byte[] did = serialize(deviceId);
+
+        NodeId current = deserialize(rawMasters.get(did));
+        MastershipRole role = null;
+
+        if (current == null) {
+            //IFF no controllers have claimed mastership over it
+            role = MastershipRole.NONE;
+        } else {
+            if (current.equals(nodeId)) {
+                role = MastershipRole.MASTER;
+            } else {
+                role = MastershipRole.STANDBY;
+            }
+        }
+
+        return role;
     }
 
     @Override
     public MastershipTerm getTermFor(DeviceId deviceId) {
-        // TODO Auto-generated method stub
-        return null;
+        byte[] did = serialize(deviceId);
+        // no lock held: read each map once so the null checks and the result agree
+        byte[] master = rawMasters.get(did);
+        Integer term = rawTerms.get(did);
+        if (master == null || term == null) {
+            return null;
+        }
+        return MastershipTerm.of((NodeId) deserialize(master), term);
     }
 
     @Override
     public MastershipEvent unsetMaster(NodeId nodeId, DeviceId deviceId) {
-        // TODO Auto-generated method stub
+        byte [] did = serialize(deviceId);
+
+        ILock lock = theInstance.getLock(LOCK);
+        lock.lock();
+        try {
+            MastershipRole role = getRole(nodeId, deviceId);
+            switch (role) {
+                case MASTER:
+                    //hand off device to another
+                    NodeId backup = reelect(nodeId, deviceId);
+                    if (backup == null) {
+                        //goes back to NONE; falls through to the break below, so no event is returned
+                        rawMasters.remove(did);
+                    } else {
+                        //goes to STANDBY for local, MASTER for someone else
+                        Integer term = rawTerms.get(did);
+                        rawMasters.put(did, serialize(backup));
+                        rawTerms.put(did, ++term);
+                        return new MastershipEvent(MASTER_CHANGED, deviceId, backup);
+                    }
+                case STANDBY:
+                case NONE:
+                    break;
+                default:
+                    log.warn("unknown Mastership Role {}", role);
+            }
+            return null;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    //helper for "re-electing" a new master for a given device
+    private NodeId reelect(NodeId current, DeviceId deviceId) {
+        for (byte [] node : backups.keySet()) {
+            NodeId nid = deserialize(node);
+            if (!current.equals(nid)) {
+                return nid;
+            }
+        }
         return null;
     }
 
+    //adds node to pool(s) of backup
+    private void backup(NodeId nodeId, DeviceId deviceId) {
+        //TODO might be useful to isolate out
+    }
+
     private class RemoteMasterShipEventHandler extends RemoteCacheEventHandler<DeviceId, NodeId> {
         public RemoteMasterShipEventHandler(LoadingCache<DeviceId, Optional<NodeId>> cache) {
             super(cache);