[ONOS-6371] (vNet) simple mastership store

Implement simple mastership store that trivial and in-memory
inventory for mastership.

Change-Id: I713d1379591cfdddf7803d68817040096bda8bdf
diff --git a/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/SimpleVirtualMastershipStore.java b/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/SimpleVirtualMastershipStore.java
index bd972b2..47b9c6f 100644
--- a/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/SimpleVirtualMastershipStore.java
+++ b/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/SimpleVirtualMastershipStore.java
@@ -16,10 +16,20 @@
 
 package org.onosproject.incubator.store.virtual.impl;
 
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
+import org.joda.time.DateTime;
+import org.onlab.packet.IpAddress;
+import org.onosproject.cluster.ClusterEventListener;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.ControllerNode;
+import org.onosproject.cluster.DefaultControllerNode;
 import org.onosproject.cluster.NodeId;
 import org.onosproject.cluster.RoleInfo;
 import org.onosproject.incubator.net.virtual.NetworkId;
@@ -31,9 +41,19 @@
 import org.onosproject.net.MastershipRole;
 import org.slf4j.Logger;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
 
+import static org.onosproject.mastership.MastershipEvent.Type.BACKUPS_CHANGED;
+import static org.onosproject.mastership.MastershipEvent.Type.MASTER_CHANGED;
 import static org.slf4j.LoggerFactory.getLogger;
 
 /**
@@ -48,8 +68,27 @@
 
     private final Logger log = getLogger(getClass());
 
+    private static final int NOTHING = 0;
+    private static final int INIT = 1;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ClusterService clusterService;
+
+    //devices mapped to their masters, to emulate multiple nodes
+    protected final Map<NetworkId, Map<DeviceId, NodeId>> masterMapByNetwork =
+            new HashMap<>();
+    //emulate backups with pile of nodes
+    protected final Map<NetworkId, Map<DeviceId, List<NodeId>>> backupsByNetwork =
+            new HashMap<>();
+    //terms
+    protected final Map<NetworkId, Map<DeviceId, AtomicInteger>> termMapByNetwork =
+            new HashMap<>();
+
     @Activate
     public void activate() {
+        if (clusterService == null) {
+            clusterService = createFakeClusterService();
+        }
         log.info("Started");
     }
 
@@ -59,55 +98,399 @@
     }
 
     @Override
-    public CompletableFuture<MastershipRole> requestRole(NetworkId networkId, DeviceId deviceId) {
-        return null;
+    public CompletableFuture<MastershipRole> requestRole(NetworkId networkId,
+                                                         DeviceId deviceId) {
+        //query+possible reelection
+        NodeId node = clusterService.getLocalNode().id();
+        MastershipRole role = getRole(networkId, node, deviceId);
+
+        Map<DeviceId, NodeId> masterMap = getMasterMap(networkId);
+
+        switch (role) {
+            case MASTER:
+                return CompletableFuture.completedFuture(MastershipRole.MASTER);
+            case STANDBY:
+                if (getMaster(networkId, deviceId) == null) {
+                    // no master => become master
+                    masterMap.put(deviceId, node);
+                    incrementTerm(networkId, deviceId);
+                    // remove from backup list
+                    removeFromBackups(networkId, deviceId, node);
+                    notifyDelegate(networkId, new MastershipEvent(MASTER_CHANGED, deviceId,
+                                                       getNodes(networkId, deviceId)));
+                    return CompletableFuture.completedFuture(MastershipRole.MASTER);
+                }
+                return CompletableFuture.completedFuture(MastershipRole.STANDBY);
+            case NONE:
+                if (getMaster(networkId, deviceId) == null) {
+                    // no master => become master
+                    masterMap.put(deviceId, node);
+                    incrementTerm(networkId, deviceId);
+                    notifyDelegate(networkId, new MastershipEvent(MASTER_CHANGED, deviceId,
+                                                       getNodes(networkId, deviceId)));
+                    return CompletableFuture.completedFuture(MastershipRole.MASTER);
+                }
+                // add to backup list
+                if (addToBackup(networkId, deviceId, node)) {
+                    notifyDelegate(networkId, new MastershipEvent(BACKUPS_CHANGED, deviceId,
+                                                       getNodes(networkId, deviceId)));
+                }
+                return CompletableFuture.completedFuture(MastershipRole.STANDBY);
+            default:
+                log.warn("unknown Mastership Role {}", role);
+        }
+        return CompletableFuture.completedFuture(role);
     }
 
     @Override
     public MastershipRole getRole(NetworkId networkId, NodeId nodeId, DeviceId deviceId) {
-        return null;
+        Map<DeviceId, NodeId> masterMap = getMasterMap(networkId);
+        Map<DeviceId, List<NodeId>> backups = getBackups(networkId);
+
+        //just query
+        NodeId current = masterMap.get(deviceId);
+        MastershipRole role;
+
+        if (current != null && current.equals(nodeId)) {
+            return MastershipRole.MASTER;
+        }
+
+        if (backups.getOrDefault(deviceId, Collections.emptyList()).contains(nodeId)) {
+            role = MastershipRole.STANDBY;
+        } else {
+            role = MastershipRole.NONE;
+        }
+        return role;
     }
 
     @Override
     public NodeId getMaster(NetworkId networkId, DeviceId deviceId) {
-        return null;
+        Map<DeviceId, NodeId> masterMap = getMasterMap(networkId);
+        return masterMap.get(deviceId);
     }
 
     @Override
     public RoleInfo getNodes(NetworkId networkId, DeviceId deviceId) {
-        return null;
+        Map<DeviceId, NodeId> masterMap = getMasterMap(networkId);
+        Map<DeviceId, List<NodeId>> backups = getBackups(networkId);
+
+        return new RoleInfo(masterMap.get(deviceId),
+                            backups.getOrDefault(deviceId, ImmutableList.of()));
     }
 
     @Override
     public Set<DeviceId> getDevices(NetworkId networkId, NodeId nodeId) {
-        return null;
+        Map<DeviceId, NodeId> masterMap = getMasterMap(networkId);
+
+        Set<DeviceId> ids = new HashSet<>();
+        for (Map.Entry<DeviceId, NodeId> d : masterMap.entrySet()) {
+            if (Objects.equals(d.getValue(), nodeId)) {
+                ids.add(d.getKey());
+            }
+        }
+        return ids;
     }
 
     @Override
-    public CompletableFuture<MastershipEvent> setMaster(NetworkId networkId,
+    public synchronized CompletableFuture<MastershipEvent> setMaster(NetworkId networkId,
                                                         NodeId nodeId, DeviceId deviceId) {
-        return null;
+        Map<DeviceId, NodeId> masterMap = getMasterMap(networkId);
+
+        MastershipRole role = getRole(networkId, nodeId, deviceId);
+        switch (role) {
+            case MASTER:
+                // no-op
+                return CompletableFuture.completedFuture(null);
+            case STANDBY:
+            case NONE:
+                NodeId prevMaster = masterMap.put(deviceId, nodeId);
+                incrementTerm(networkId, deviceId);
+                removeFromBackups(networkId, deviceId, nodeId);
+                addToBackup(networkId, deviceId, prevMaster);
+                break;
+            default:
+                log.warn("unknown Mastership Role {}", role);
+                return null;
+        }
+
+        return CompletableFuture.completedFuture(
+                new MastershipEvent(MASTER_CHANGED, deviceId, getNodes(networkId, deviceId)));
     }
 
     @Override
     public MastershipTerm getTermFor(NetworkId networkId, DeviceId deviceId) {
-        return null;
+        Map<DeviceId, NodeId> masterMap = getMasterMap(networkId);
+        Map<DeviceId, AtomicInteger> termMap = getTermMap(networkId);
+
+        if ((termMap.get(deviceId) == null)) {
+            return MastershipTerm.of(masterMap.get(deviceId), NOTHING);
+        }
+        return MastershipTerm.of(
+                masterMap.get(deviceId), termMap.get(deviceId).get());
     }
 
     @Override
     public CompletableFuture<MastershipEvent> setStandby(NetworkId networkId,
                                                          NodeId nodeId, DeviceId deviceId) {
+        Map<DeviceId, NodeId> masterMap = getMasterMap(networkId);
+
+        MastershipRole role = getRole(networkId, nodeId, deviceId);
+        switch (role) {
+            case MASTER:
+                NodeId backup = reelect(networkId, deviceId, nodeId);
+                if (backup == null) {
+                    // no master alternative
+                    masterMap.remove(deviceId);
+                    // TODO: Should there be new event type for no MASTER?
+                    return CompletableFuture.completedFuture(
+                            new MastershipEvent(MASTER_CHANGED, deviceId,
+                                                getNodes(networkId, deviceId)));
+                } else {
+                    NodeId prevMaster = masterMap.put(deviceId, backup);
+                    incrementTerm(networkId, deviceId);
+                    addToBackup(networkId, deviceId, prevMaster);
+                    return CompletableFuture.completedFuture(
+                            new MastershipEvent(MASTER_CHANGED, deviceId,
+                                                getNodes(networkId, deviceId)));
+                }
+
+            case STANDBY:
+            case NONE:
+                boolean modified = addToBackup(networkId, deviceId, nodeId);
+                if (modified) {
+                    return CompletableFuture.completedFuture(
+                            new MastershipEvent(BACKUPS_CHANGED, deviceId,
+                                                getNodes(networkId, deviceId)));
+                }
+                break;
+
+            default:
+                log.warn("unknown Mastership Role {}", role);
+        }
         return null;
     }
 
+
+    /**
+     * Dumbly selects next-available node that's not the current one.
+     * emulate leader election.
+     *
+     * @param networkId a virtual network identifier
+     * @param deviceId a virtual device identifier
+     * @param nodeId a nod identifier
+     * @return Next available node as a leader
+     */
+    private synchronized NodeId reelect(NetworkId networkId, DeviceId deviceId,
+                                        NodeId nodeId) {
+        Map<DeviceId, List<NodeId>> backups = getBackups(networkId);
+
+        List<NodeId> stbys = backups.getOrDefault(deviceId, Collections.emptyList());
+        NodeId backup = null;
+        for (NodeId n : stbys) {
+            if (!n.equals(nodeId)) {
+                backup = n;
+                break;
+            }
+        }
+        stbys.remove(backup);
+        return backup;
+    }
+
     @Override
-    public CompletableFuture<MastershipEvent> relinquishRole(NetworkId networkId,
-                                                             NodeId nodeId, DeviceId deviceId) {
-        return null;
+    public synchronized CompletableFuture<MastershipEvent>
+    relinquishRole(NetworkId networkId, NodeId nodeId, DeviceId deviceId) {
+    Map<DeviceId, NodeId> masterMap = getMasterMap(networkId);
+
+        MastershipRole role = getRole(networkId, nodeId, deviceId);
+        switch (role) {
+            case MASTER:
+                NodeId backup = reelect(networkId, deviceId, nodeId);
+                masterMap.put(deviceId, backup);
+                incrementTerm(networkId, deviceId);
+                return CompletableFuture.completedFuture(
+                        new MastershipEvent(MASTER_CHANGED, deviceId,
+                                            getNodes(networkId, deviceId)));
+
+            case STANDBY:
+                if (removeFromBackups(networkId, deviceId, nodeId)) {
+                    return CompletableFuture.completedFuture(
+                            new MastershipEvent(BACKUPS_CHANGED, deviceId,
+                                                getNodes(networkId, deviceId)));
+                }
+                break;
+
+            case NONE:
+                break;
+
+            default:
+                log.warn("unknown Mastership Role {}", role);
+        }
+        return CompletableFuture.completedFuture(null);
     }
 
     @Override
     public void relinquishAllRole(NetworkId networkId, NodeId nodeId) {
+        Map<DeviceId, NodeId> masterMap = getMasterMap(networkId);
+        Map<DeviceId, List<NodeId>> backups = getBackups(networkId);
 
+        List<CompletableFuture<MastershipEvent>> eventFutures = new ArrayList<>();
+        Set<DeviceId> toRelinquish = new HashSet<>();
+
+        masterMap.entrySet().stream()
+                .filter(entry -> nodeId.equals(entry.getValue()))
+                .forEach(entry -> toRelinquish.add(entry.getKey()));
+
+        backups.entrySet().stream()
+                .filter(entry -> entry.getValue().contains(nodeId))
+                .forEach(entry -> toRelinquish.add(entry.getKey()));
+
+        toRelinquish.forEach(deviceId -> eventFutures.add(
+                relinquishRole(networkId, nodeId, deviceId)));
+
+        eventFutures.forEach(future -> {
+            future.whenComplete((event, error) -> notifyDelegate(networkId, event));
+        });
+    }
+
+    /**
+     * Increase the term for a device, and store it.
+     *
+     * @param networkId a virtual network identifier
+     * @param deviceId a virtual device identifier
+     */
+    private synchronized void incrementTerm(NetworkId networkId, DeviceId deviceId) {
+        Map<DeviceId, AtomicInteger> termMap = getTermMap(networkId);
+
+        AtomicInteger term = termMap.getOrDefault(deviceId, new AtomicInteger(NOTHING));
+        term.incrementAndGet();
+        termMap.put(deviceId, term);
+    }
+
+    /**
+     * Remove backup node for a device.
+     *
+     * @param networkId a virtual network identifier
+     * @param deviceId a virtual device identifier
+     * @param nodeId a node identifier
+     * @return True if success
+     */
+    private synchronized boolean removeFromBackups(NetworkId networkId,
+                                                   DeviceId deviceId, NodeId nodeId) {
+        Map<DeviceId, List<NodeId>> backups = getBackups(networkId);
+
+        List<NodeId> stbys = backups.getOrDefault(deviceId, new ArrayList<>());
+        boolean modified = stbys.remove(nodeId);
+        backups.put(deviceId, stbys);
+        return modified;
+    }
+
+    /**
+     * add to backup if not there already, silently ignores null node.
+     *
+     * @param networkId a virtual network identifier
+     * @param deviceId a virtual device identifier
+     * @param nodeId a node identifier
+     * @return True if success
+     */
+    private synchronized boolean addToBackup(NetworkId networkId,
+                                             DeviceId deviceId, NodeId nodeId) {
+        Map<DeviceId, List<NodeId>> backups = getBackups(networkId);
+
+        boolean modified = false;
+        List<NodeId> stbys = backups.getOrDefault(deviceId, new ArrayList<>());
+        if (nodeId != null && !stbys.contains(nodeId)) {
+            stbys.add(nodeId);
+            backups.put(deviceId, stbys);
+            modified = true;
+        }
+        return modified;
+    }
+
+    /**
+     * Returns deviceId-master map for a specified virtual network.
+     *
+     * @param networkId a virtual network identifier
+     * @return DeviceId-master map of a given virtual network.
+     */
+    private Map<DeviceId, NodeId> getMasterMap(NetworkId networkId) {
+        return masterMapByNetwork.computeIfAbsent(networkId, k -> new HashMap<>());
+    }
+
+    /**
+     * Returns deviceId-backups map for a specified virtual network.
+     *
+     * @param networkId a virtual network identifier
+     * @return DeviceId-backups map of a given virtual network.
+     */
+    private Map<DeviceId, List<NodeId>> getBackups(NetworkId networkId) {
+        return backupsByNetwork.computeIfAbsent(networkId, k -> new HashMap<>());
+    }
+
+    /**
+     * Returns deviceId-terms map for a specified virtual network.
+     *
+     * @param networkId a virtual network identifier
+     * @return DeviceId-terms map of a given virtual network.
+     */
+    private Map<DeviceId, AtomicInteger> getTermMap(NetworkId networkId) {
+        return termMapByNetwork.computeIfAbsent(networkId, k -> new HashMap<>());
+    }
+
+    /**
+     * Returns a fake cluster service for a test purpose only.
+     *
+     * @return a fake cluster service
+     */
+    private ClusterService createFakeClusterService() {
+        // just for ease of unit test
+        final ControllerNode instance =
+                new DefaultControllerNode(new NodeId("local"),
+                                          IpAddress.valueOf("127.0.0.1"));
+
+        ClusterService faceClusterService = new ClusterService() {
+
+            private final DateTime creationTime = DateTime.now();
+
+            @Override
+            public ControllerNode getLocalNode() {
+                return instance;
+            }
+
+            @Override
+            public Set<ControllerNode> getNodes() {
+                return ImmutableSet.of(instance);
+            }
+
+            @Override
+            public ControllerNode getNode(NodeId nodeId) {
+                if (instance.id().equals(nodeId)) {
+                    return instance;
+                }
+                return null;
+            }
+
+            @Override
+            public ControllerNode.State getState(NodeId nodeId) {
+                if (instance.id().equals(nodeId)) {
+                    return ControllerNode.State.ACTIVE;
+                } else {
+                    return ControllerNode.State.INACTIVE;
+                }
+            }
+
+            @Override
+            public DateTime getLastUpdated(NodeId nodeId) {
+                return creationTime;
+            }
+
+            @Override
+            public void addListener(ClusterEventListener listener) {
+            }
+
+            @Override
+            public void removeListener(ClusterEventListener listener) {
+            }
+        };
+        return faceClusterService;
     }
 }