tests for DistributedMastershipStore

Change-Id: Ic7daa333ac7d7947155b745daf08e4771f1189ef
diff --git a/core/net/src/main/java/org/onlab/onos/net/device/impl/DeviceManager.java b/core/net/src/main/java/org/onlab/onos/net/device/impl/DeviceManager.java
index b61afd2..bb221da 100644
--- a/core/net/src/main/java/org/onlab/onos/net/device/impl/DeviceManager.java
+++ b/core/net/src/main/java/org/onlab/onos/net/device/impl/DeviceManager.java
@@ -200,7 +200,7 @@
             // process.
             if (event != null) {
                 log.info("Device {} connected", deviceId);
-                mastershipService.requestRoleFor(deviceId);
+                //mastershipService.requestRoleFor(deviceId);
                 provider().roleChanged(event.subject(),
                         mastershipService.requestRoleFor(deviceId));
                 post(event);
diff --git a/core/store/hz/cluster/src/main/java/org/onlab/onos/store/cluster/impl/DistributedMastershipStore.java b/core/store/hz/cluster/src/main/java/org/onlab/onos/store/cluster/impl/DistributedMastershipStore.java
index 18e6e96..dc42773 100644
--- a/core/store/hz/cluster/src/main/java/org/onlab/onos/store/cluster/impl/DistributedMastershipStore.java
+++ b/core/store/hz/cluster/src/main/java/org/onlab/onos/store/cluster/impl/DistributedMastershipStore.java
@@ -4,7 +4,6 @@
 import static org.onlab.onos.cluster.MastershipEvent.Type.MASTER_CHANGED;
 
 import java.util.Map;
-import java.util.Objects;
 import java.util.Set;
 
 import org.apache.felix.scr.annotations.Activate;
@@ -28,6 +27,7 @@
 import com.google.common.base.Optional;
 import com.google.common.cache.LoadingCache;
 import com.google.common.collect.ImmutableSet;
+import com.hazelcast.core.ILock;
 import com.hazelcast.core.IMap;
 
 /**
@@ -39,8 +39,22 @@
 extends AbstractHazelcastStore<MastershipEvent, MastershipStoreDelegate>
 implements MastershipStore {
 
-    private IMap<byte[], byte[]> rawMasters;
-    private LoadingCache<DeviceId, Optional<NodeId>> masters;
+    //arbitrary lock name
+    private static final String LOCK = "lock";
+    //initial term value
+    private static final Integer INIT = 0;
+    //placeholder non-null value
+    private static final Byte NIL = 0x0;
+
+    //devices to masters
+    protected IMap<byte[], byte[]> rawMasters;
+    //devices to terms
+    protected IMap<byte[], Integer> rawTerms;
+    //collection of nodes. values are ignored, as it's used as a makeshift 'set'
+    protected IMap<byte[], Byte> backups;
+
+    //TODO - remove
+    //private LoadingCache<DeviceId, Optional<NodeId>> masters;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected ClusterService clusterService;
@@ -51,23 +65,18 @@
         super.activate();
 
         rawMasters = theInstance.getMap("masters");
-        OptionalCacheLoader<DeviceId, NodeId> nodeLoader
-        = new OptionalCacheLoader<>(kryoSerializationService, rawMasters);
-        masters = new AbsentInvalidatingLoadingCache<>(newBuilder().build(nodeLoader));
-        rawMasters.addEntryListener(new RemoteMasterShipEventHandler(masters), true);
+        rawTerms = theInstance.getMap("terms");
+        backups = theInstance.getMap("backups");
 
-        loadMasters();
+        //TODO: hook up maps to event notification
+        //OptionalCacheLoader<DeviceId, NodeId> nodeLoader
+        //= new OptionalCacheLoader<>(kryoSerializationService, rawMasters);
+        //masters = new AbsentInvalidatingLoadingCache<>(newBuilder().build(nodeLoader));
+        //rawMasters.addEntryListener(new RemoteMasterShipEventHandler(masters), true);
 
         log.info("Started");
     }
 
-    private void loadMasters() {
-        for (byte[] keyBytes : rawMasters.keySet()) {
-            final DeviceId id = deserialize(keyBytes);
-            masters.refresh(id);
-        }
-    }
-
     @Deactivate
     public void deactivate() {
         log.info("Stopped");
@@ -75,60 +84,178 @@
 
     @Override
     public MastershipEvent setMaster(NodeId nodeId, DeviceId deviceId) {
-        synchronized (this) {
-            NodeId currentMaster = getMaster(deviceId);
-            if (Objects.equals(currentMaster, nodeId)) {
-                return null;
-            }
+        byte [] did = serialize(deviceId);
+        byte [] nid = serialize(nodeId);
 
-            // FIXME: for now implementing semantics of setMaster
-            rawMasters.put(serialize(deviceId), serialize(nodeId));
-            masters.put(deviceId, Optional.of(nodeId));
-            return new MastershipEvent(MastershipEvent.Type.MASTER_CHANGED, deviceId, nodeId);
+        ILock lock = theInstance.getLock(LOCK);
+        lock.lock();
+        try {
+            MastershipRole role = getRole(nodeId, deviceId);
+            Integer term = rawTerms.get(did);
+            switch (role) {
+                case MASTER:
+                    return null;
+                case STANDBY:
+                    rawMasters.put(did, nid);
+                    rawTerms.put(did, ++term);
+                    backups.putIfAbsent(nid, NIL);
+                    break;
+                case NONE:
+                    rawMasters.put(did, nid);
+                    //new switch OR state transition after being orphaned
+                    if (term == null) {
+                        rawTerms.put(did, INIT);
+                    } else {
+                        rawTerms.put(did, ++term);
+                    }
+                    backups.put(nid, NIL);
+                    break;
+                default:
+                    log.warn("unknown Mastership Role {}", role);
+                    return null;
+            }
+            return new MastershipEvent(MASTER_CHANGED, deviceId, nodeId);
+        } finally {
+            lock.unlock();
         }
     }
 
     @Override
     public NodeId getMaster(DeviceId deviceId) {
-        return masters.getUnchecked(deviceId).orNull();
+        return deserialize(rawMasters.get(serialize(deviceId)));
     }
 
     @Override
     public Set<DeviceId> getDevices(NodeId nodeId) {
         ImmutableSet.Builder<DeviceId> builder = ImmutableSet.builder();
-        for (Map.Entry<DeviceId, Optional<NodeId>> entry : masters.asMap().entrySet()) {
-            if (nodeId.equals(entry.getValue().get())) {
-                builder.add(entry.getKey());
+
+        for (Map.Entry<byte[], byte[]> entry : rawMasters.entrySet()) {
+            if (nodeId.equals(deserialize(entry.getValue()))) {
+                builder.add((DeviceId) deserialize(entry.getKey()));
             }
         }
+
         return builder.build();
     }
 
     @Override
     public MastershipRole requestRole(DeviceId deviceId) {
-        // FIXME: for now we are 'selecting' as master whoever asks
-        setMaster(clusterService.getLocalNode().id(), deviceId);
-        return MastershipRole.MASTER;
+        // first to empty slot for device in master map is MASTER
+        // depending on how backups are organized, might need to trigger election
+        // so only controller doesn't set itself to backup for another device
+        byte [] did = serialize(deviceId);
+        NodeId local = clusterService.getLocalNode().id();
+        byte [] lnid = serialize(local);
+
+        ILock lock = theInstance.getLock(LOCK);
+        lock.lock();
+        try {
+            MastershipRole role = getRole(local, deviceId);
+            switch (role) {
+                case MASTER:
+                    break;
+                case STANDBY:
+                    backups.put(lnid, NIL);
+                    rawTerms.putIfAbsent(did, INIT);
+                    break;
+                case NONE:
+                    rawMasters.put(did, lnid);
+                    rawTerms.putIfAbsent(did, INIT);
+                    backups.put(lnid, NIL);
+                    role = MastershipRole.MASTER;
+                    break;
+                default:
+                    log.warn("unknown Mastership Role {}", role);
+            }
+            return role;
+        } finally {
+            lock.unlock();
+        }
     }
 
     @Override
     public MastershipRole getRole(NodeId nodeId, DeviceId deviceId) {
-        NodeId master = masters.getUnchecked(deviceId).orNull();
-        return nodeId.equals(master) ? MastershipRole.MASTER : MastershipRole.STANDBY;
+        byte[] did = serialize(deviceId);
+
+        NodeId current = deserialize(rawMasters.get(did));
+        MastershipRole role = null;
+
+        if (current == null) {
+            //IFF no controllers have claimed mastership over it
+            role = MastershipRole.NONE;
+        } else {
+            if (current.equals(nodeId)) {
+                role = MastershipRole.MASTER;
+            } else {
+                role = MastershipRole.STANDBY;
+            }
+        }
+
+        return role;
     }
 
     @Override
     public MastershipTerm getTermFor(DeviceId deviceId) {
-        // TODO Auto-generated method stub
-        return null;
+        byte[] did = serialize(deviceId);
+
+        if ((rawMasters.get(did) == null) ||
+                (rawTerms.get(did) == null)) {
+            return null;
+        }
+        return MastershipTerm.of(
+                (NodeId) deserialize(rawMasters.get(did)), rawTerms.get(did));
     }
 
     @Override
     public MastershipEvent unsetMaster(NodeId nodeId, DeviceId deviceId) {
-        // TODO Auto-generated method stub
+        byte [] did = serialize(deviceId);
+
+        ILock lock = theInstance.getLock(LOCK);
+        lock.lock();
+        try {
+            MastershipRole role = getRole(nodeId, deviceId);
+            switch (role) {
+                case MASTER:
+                    //hand off device to another
+                    NodeId backup = reelect(nodeId, deviceId);
+                    if (backup == null) {
+                        //goes back to NONE
+                        rawMasters.remove(did);
+                    } else {
+                        //goes to STANDBY for local, MASTER for someone else
+                        Integer term = rawTerms.get(did);
+                        rawMasters.put(did, serialize(backup));
+                        rawTerms.put(did, ++term);
+                        return new MastershipEvent(MASTER_CHANGED, deviceId, backup);
+                    }
+                case STANDBY:
+                case NONE:
+                    break;
+                default:
+                    log.warn("unknown Mastership Role {}", role);
+            }
+            return null;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    //helper for "re-electing" a new master for a given device
+    private NodeId reelect(NodeId current, DeviceId deviceId) {
+        for (byte [] node : backups.keySet()) {
+            NodeId nid = deserialize(node);
+            if (!current.equals(nid)) {
+                return nid;
+            }
+        }
         return null;
     }
 
+    //adds node to pool(s) of backup
+    private void backup(NodeId nodeId, DeviceId deviceId) {
+        //TODO might be useful to isolate out
+    }
+
     private class RemoteMasterShipEventHandler extends RemoteCacheEventHandler<DeviceId, NodeId> {
         public RemoteMasterShipEventHandler(LoadingCache<DeviceId, Optional<NodeId>> cache) {
             super(cache);
diff --git a/core/store/hz/cluster/src/test/java/org/onlab/onos/store/cluster/impl/DistributedMastershipStoreTest.java b/core/store/hz/cluster/src/test/java/org/onlab/onos/store/cluster/impl/DistributedMastershipStoreTest.java
new file mode 100644
index 0000000..81ddce0
--- /dev/null
+++ b/core/store/hz/cluster/src/test/java/org/onlab/onos/store/cluster/impl/DistributedMastershipStoreTest.java
@@ -0,0 +1,276 @@
+package org.onlab.onos.store.cluster.impl;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.onlab.onos.net.MastershipRole.*;
+
+import java.util.Set;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.onlab.onos.cluster.ClusterEventListener;
+import org.onlab.onos.cluster.ClusterService;
+import org.onlab.onos.cluster.ControllerNode;
+import org.onlab.onos.cluster.ControllerNode.State;
+import org.onlab.onos.cluster.DefaultControllerNode;
+import org.onlab.onos.cluster.MastershipEvent.Type;
+import org.onlab.onos.cluster.MastershipTerm;
+import org.onlab.onos.cluster.NodeId;
+import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.store.common.StoreManager;
+import org.onlab.onos.store.common.StoreService;
+import org.onlab.onos.store.common.TestStoreManager;
+import org.onlab.onos.store.serializers.KryoSerializationManager;
+import org.onlab.onos.store.serializers.KryoSerializationService;
+import org.onlab.packet.IpPrefix;
+
+import com.google.common.collect.Sets;
+import com.hazelcast.config.Config;
+import com.hazelcast.core.Hazelcast;
+
+/**
+ * Test of the Hazelcast-based distributed MastershipStore implementation.
+ */
+public class DistributedMastershipStoreTest {
+
+    private static final DeviceId DID1 = DeviceId.deviceId("of:01");
+    private static final DeviceId DID2 = DeviceId.deviceId("of:02");
+    private static final DeviceId DID3 = DeviceId.deviceId("of:03");
+    private static final DeviceId DID4 = DeviceId.deviceId("of:04");
+
+    private static final IpPrefix IP = IpPrefix.valueOf("127.0.0.1");
+
+    private static final NodeId N1 = new NodeId("node1");
+    private static final NodeId N2 = new NodeId("node2");
+
+    private static final ControllerNode CN1 = new DefaultControllerNode(N1, IP);
+    private static final ControllerNode CN2 = new DefaultControllerNode(N2, IP);
+
+    private DistributedMastershipStore dms;
+    private TestDistributedMastershipStore testStore;
+    private KryoSerializationManager serializationMgr;
+    private StoreManager storeMgr;
+
+    @BeforeClass
+    public static void setUpBeforeClass() throws Exception {
+    }
+
+    @AfterClass
+    public static void tearDownAfterClass() throws Exception {
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        // TODO should find a way to clean Hazelcast instance without shutdown.
+        Config config = TestStoreManager.getTestConfig();
+
+        storeMgr = new TestStoreManager(Hazelcast.newHazelcastInstance(config));
+        storeMgr.activate();
+
+        serializationMgr = new KryoSerializationManager();
+        serializationMgr.activate();
+
+        dms = new TestDistributedMastershipStore(storeMgr, serializationMgr);
+        dms.clusterService = new TestClusterService();
+        dms.activate();
+
+        testStore = (TestDistributedMastershipStore) dms;
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        dms.deactivate();
+
+        serializationMgr.deactivate();
+
+        storeMgr.deactivate();
+    }
+
+    @Test
+    public void getRole() {
+        assertEquals("wrong role:", NONE, dms.getRole(N1, DID1));
+        testStore.put(DID1, N1, true, true, true);
+        assertEquals("wrong role:", MASTER, dms.getRole(N1, DID1));
+        assertEquals("wrong role:", STANDBY, dms.getRole(N2, DID1));
+    }
+
+    @Test
+    public void getMaster() {
+        assertTrue("wrong store state:", dms.rawMasters.isEmpty());
+
+        testStore.put(DID1, N1, true, false, false);
+        assertEquals("wrong master:", N1, dms.getMaster(DID1));
+        assertNull("wrong master:", dms.getMaster(DID2));
+    }
+
+    @Test
+    public void getDevices() {
+        assertTrue("wrong store state:", dms.rawMasters.isEmpty());
+
+        testStore.put(DID1, N1, true, false, false);
+        testStore.put(DID2, N1, true, false, false);
+        testStore.put(DID3, N2, true, false, false);
+
+        assertEquals("wrong devices",
+                Sets.newHashSet(DID1, DID2), dms.getDevices(N1));
+    }
+
+    @Test
+    public void requestRoleAndTerm() {
+        //CN1 is "local"
+        testStore.setCurrent(CN1);
+
+        //if already MASTER, nothing should happen
+        testStore.put(DID2, N1, true, false, false);
+        assertEquals("wrong role for MASTER:", MASTER, dms.requestRole(DID2));
+        assertTrue("wrong state for store:",
+                dms.backups.isEmpty() & dms.rawTerms.isEmpty());
+
+        //populate maps with DID1, N1 thru NONE case
+        assertEquals("wrong role for NONE:", MASTER, dms.requestRole(DID1));
+        assertTrue("wrong state for store:",
+                !dms.backups.isEmpty() & !dms.rawTerms.isEmpty());
+        assertEquals("wrong term",
+                MastershipTerm.of(N1, 0), dms.getTermFor(DID1));
+
+        //CN2 now local. DID2 has N1 as MASTER so N2 is STANDBY
+        testStore.setCurrent(CN2);
+        assertEquals("wrong role for STANDBY:", STANDBY, dms.requestRole(DID2));
+        assertEquals("wrong number of entries:", 2, dms.rawTerms.size());
+
+        //change term and requestRole() again; should persist
+        testStore.increment(DID2);
+        assertEquals("wrong role for STANDBY:", STANDBY, dms.requestRole(DID2));
+        assertEquals("wrong term", MastershipTerm.of(N1, 1), dms.getTermFor(DID2));
+    }
+
+    @Test
+    public void setMaster() {
+        //populate maps with DID1, N1 as MASTER thru NONE case
+        testStore.setCurrent(CN1);
+        assertEquals("wrong role for NONE:", MASTER, dms.requestRole(DID1));
+        assertNull("wrong event:", dms.setMaster(N1, DID1));
+
+        //switch over to N2
+        assertEquals("wrong event:", Type.MASTER_CHANGED, dms.setMaster(N2, DID1).type());
+        assertEquals("wrong term", MastershipTerm.of(N2, 1), dms.getTermFor(DID1));
+
+        //orphan switch - should be rare case
+        assertEquals("wrong event:", Type.MASTER_CHANGED, dms.setMaster(N2, DID2).type());
+        assertEquals("wrong term", MastershipTerm.of(N2, 0), dms.getTermFor(DID2));
+        //disconnect and reconnect - sign of failing re-election or single-instance channel
+        testStore.reset(true, false, false);
+        dms.setMaster(N2, DID2);
+        assertEquals("wrong term", MastershipTerm.of(N2, 1), dms.getTermFor(DID2));
+    }
+
+    @Test
+    public void unsetMaster() {
+        //populate maps with DID1, N1 as MASTER thru NONE case
+        testStore.setCurrent(CN1);
+        assertEquals("wrong role for NONE:", MASTER, dms.requestRole(DID1));
+        //no backup, no new MASTER/event
+        assertNull("wrong event:", dms.unsetMaster(N1, DID1));
+        //add backup CN2, get it elected MASTER
+        dms.requestRole(DID1);
+        testStore.setCurrent(CN2);
+        dms.requestRole(DID1);
+        assertEquals("wrong event:", Type.MASTER_CHANGED, dms.unsetMaster(N1, DID1).type());
+        assertEquals("wrong master", N2, dms.getMaster(DID1));
+
+        //STANDBY - nothing here, either
+        assertNull("wrong event:", dms.unsetMaster(N1, DID1));
+        assertEquals("wrong role for node:", STANDBY, dms.getRole(N1, DID1));
+
+        //NONE - nothing happens
+        assertNull("wrong event:", dms.unsetMaster(N1, DID2));
+        assertEquals("wrong role for node:", NONE, dms.getRole(N1, DID2));
+    }
+
+    private class TestDistributedMastershipStore extends
+            DistributedMastershipStore {
+        public TestDistributedMastershipStore(StoreService storeService,
+                KryoSerializationService kryoSerializationService) {
+            this.storeService = storeService;
+            this.kryoSerializationService = kryoSerializationService;
+        }
+
+        //helper to populate master/backup structures
+        public void put(DeviceId dev, NodeId node,
+                boolean store, boolean backup, boolean term) {
+            if (store) {
+                dms.rawMasters.put(serialize(dev), serialize(node));
+            }
+            if (backup) {
+                dms.backups.put(serialize(node), (byte) 0);
+            }
+            if (term) {
+                dms.rawTerms.put(serialize(dev), 0);
+            }
+        }
+
+        //clears structures
+        public void reset(boolean store, boolean backup, boolean term) {
+            if (store) {
+                dms.rawMasters.clear();
+            }
+            if (backup) {
+                dms.backups.clear();
+            }
+            if (term) {
+                dms.rawTerms.clear();
+            }
+        }
+
+        //increment term for a device
+        public void increment(DeviceId dev) {
+            Integer t = dms.rawTerms.get(serialize(dev));
+            if (t != null) {
+                dms.rawTerms.put(serialize(dev), ++t);
+            }
+        }
+
+        //sets the "local" node
+        public void setCurrent(ControllerNode node) {
+            ((TestClusterService) clusterService).current = node;
+        }
+    }
+
+    private class TestClusterService implements ClusterService {
+
+        protected ControllerNode current;
+
+        @Override
+        public ControllerNode getLocalNode() {
+            return current;
+        }
+
+        @Override
+        public Set<ControllerNode> getNodes() {
+            return Sets.newHashSet(CN1, CN2);
+        }
+
+        @Override
+        public ControllerNode getNode(NodeId nodeId) {
+            return null;
+        }
+
+        @Override
+        public State getState(NodeId nodeId) {
+            return null;
+        }
+
+        @Override
+        public void addListener(ClusterEventListener listener) {
+        }
+
+        @Override
+        public void removeListener(ClusterEventListener listener) {
+        }
+
+    }
+}