Fix balance-masters functionality in the new LeadershipService based device mastership store

Change-Id: I9f64d514cee7d5a5383fd4c2fa30a8616c97785c
diff --git a/core/api/src/main/java/org/onosproject/cluster/LeadershipService.java b/core/api/src/main/java/org/onosproject/cluster/LeadershipService.java
index 44d84db..c228d32 100644
--- a/core/api/src/main/java/org/onosproject/cluster/LeadershipService.java
+++ b/core/api/src/main/java/org/onosproject/cluster/LeadershipService.java
@@ -67,6 +67,32 @@
     void withdraw(String path);
 
     /**
+     * If the local nodeId is the leader for specified topic, this method causes it to
+     * step down temporarily from leadership.
+     * <p>
+     * The node will continue to be in contention for leadership and can
+     * potentially become the leader again if and when it becomes the highest
+     * priority candidate
+     * <p>
+     * If the local nodeId is not the leader, this method will be a noop.
+     *
+     * @param path topic for which this controller node should give up leadership
+     * @return true if this node stepped down from leadership, false otherwise
+     */
+    boolean stepdown(String path);
+
+    /**
+     * Moves the specified nodeId to the top of the candidates list for the topic.
+     * <p>
+     * If the node is not a candidate for this topic, this method will be a noop.
+     *
+     * @param path leadership topic
+     * @param nodeId nodeId to make the top candidate
+     * @return true if nodeId is now the top candidate, false otherwise
+     */
+    boolean makeTopCandidate(String path, NodeId nodeId);
+
+    /**
      * Returns the current leader board.
      *
      * @return mapping from topic to leadership info.
diff --git a/core/api/src/test/java/org/onosproject/cluster/LeadershipServiceAdapter.java b/core/api/src/test/java/org/onosproject/cluster/LeadershipServiceAdapter.java
index 02742d4..abc92f5 100644
--- a/core/api/src/test/java/org/onosproject/cluster/LeadershipServiceAdapter.java
+++ b/core/api/src/test/java/org/onosproject/cluster/LeadershipServiceAdapter.java
@@ -73,4 +73,14 @@
     public List<NodeId> getCandidates(String path) {
         return null;
     }
-}
+
+    @Override
+    public boolean stepdown(String path) {
+        return false;
+    }
+
+    @Override
+    public boolean makeTopCandidate(String path, NodeId nodeId) {
+        return false;
+    }
+}
\ No newline at end of file
diff --git a/core/net/src/main/java/org/onosproject/cluster/impl/MastershipManager.java b/core/net/src/main/java/org/onosproject/cluster/impl/MastershipManager.java
index 85ebf9e..e34790d 100644
--- a/core/net/src/main/java/org/onosproject/cluster/impl/MastershipManager.java
+++ b/core/net/src/main/java/org/onosproject/cluster/impl/MastershipManager.java
@@ -24,8 +24,6 @@
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
 import org.onlab.metrics.MetricsService;
-import org.onosproject.cluster.ClusterEvent;
-import org.onosproject.cluster.ClusterEventListener;
 import org.onosproject.cluster.ClusterService;
 import org.onosproject.cluster.ControllerNode;
 import org.onosproject.cluster.NodeId;
@@ -52,8 +50,6 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
-
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.collect.Lists.newArrayList;
 import static org.onlab.metrics.MetricsUtil.startTimer;
@@ -91,7 +87,6 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected MetricsService metricsService;
 
-    private ClusterEventListener clusterListener = new InternalClusterEventListener();
     private Timer requestRoleTimer;
 
     @Activate
@@ -99,7 +94,6 @@
         requestRoleTimer = createTimer("Mastership", "requestRole", "responseTime");
 
         eventDispatcher.addSink(MastershipEvent.class, listenerRegistry);
-        clusterService.addListener(clusterListener);
         store.setDelegate(delegate);
         log.info("Started");
     }
@@ -107,7 +101,6 @@
     @Deactivate
     public void deactivate() {
         eventDispatcher.removeSink(MastershipEvent.class);
-        clusterService.removeListener(clusterListener);
         store.unsetDelegate(delegate);
         log.info("Stopped");
     }
@@ -282,52 +275,6 @@
         }
     }
 
-    //callback for reacting to cluster events
-    private class InternalClusterEventListener implements ClusterEventListener {
-
-        // A notion of a local maximum cluster size, used to tie-break.
-        // Think of a better way to do this.
-        private AtomicInteger clusterSize;
-
-        InternalClusterEventListener() {
-            clusterSize = new AtomicInteger(0);
-        }
-
-        @Override
-        public void event(ClusterEvent event) {
-            switch (event.type()) {
-                case INSTANCE_ADDED:
-                case INSTANCE_ACTIVATED:
-                    clusterSize.incrementAndGet();
-                    log.info("instance {} added/activated", event.subject());
-                    break;
-                case INSTANCE_REMOVED:
-                case INSTANCE_DEACTIVATED:
-                    ControllerNode node = event.subject();
-                    log.info("instance {} removed/deactivated", node);
-                    store.relinquishAllRole(node.id());
-
-                    clusterSize.decrementAndGet();
-                    break;
-                default:
-                    log.warn("unknown cluster event {}", event);
-            }
-        }
-
-        // Can be removed if we go with naive split-brain handling: only majority
-        // assigns mastership
-        private boolean isInMajority() {
-            if (clusterService.getNodes().size() > (clusterSize.intValue() / 2)) {
-                return true;
-            }
-
-             //FIXME: break tie for equal-sized clusters,
-
-            return false;
-        }
-
-    }
-
     public class InternalDelegate implements MastershipStoreDelegate {
 
         @Override
diff --git a/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java b/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java
index 58d0512..d975d8a 100644
--- a/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java
+++ b/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java
@@ -536,7 +536,7 @@
         if (myNextRole == NONE) {
             mastershipService.requestRoleFor(did);
             MastershipTerm term = termService.getMastershipTerm(did);
-            if (myNodeId.equals(term.master())) {
+            if (term != null && myNodeId.equals(term.master())) {
                 myNextRole = MASTER;
             } else {
                 myNextRole = STANDBY;
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java
index ddd0b9e..d5dc43e 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java
@@ -584,4 +584,14 @@
     public List<NodeId> getCandidates(String path) {
         return null;
     }
+
+    @Override
+    public boolean stepdown(String path) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean makeTopCandidate(String path, NodeId nodeId) {
+        throw new UnsupportedOperationException();
+    }
 }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
index 0b2b4cb..74dae01 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
@@ -34,6 +34,7 @@
 import org.onosproject.store.service.Versioned;
 import org.slf4j.Logger;
 
+import java.util.ArrayList;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Objects;
@@ -48,7 +49,6 @@
 import static com.google.common.base.Preconditions.checkArgument;
 import static org.onlab.util.Tools.groupedThreads;
 import static org.slf4j.LoggerFactory.getLogger;
-
 import static org.onosproject.cluster.ControllerNode.State.ACTIVE;
 import static org.onosproject.cluster.ControllerNode.State.INACTIVE;
 
@@ -210,7 +210,7 @@
                     candidateList.add(localNodeId);
                     if (candidateMap.replace(path, candidates.version(), candidateList)) {
                         Versioned<List<NodeId>> newCandidates = candidateMap.get(path);
-                        notifyCandidateAdded(
+                        notifyCandidateUpdated(
                                 path, candidateList, newCandidates.version(), newCandidates.creationTime());
                     } else {
                         rerunForLeadership(path);
@@ -221,7 +221,7 @@
                 List<NodeId> candidateList = ImmutableList.of(localNodeId);
                 if ((candidateMap.putIfAbsent(path, candidateList) == null)) {
                     Versioned<List<NodeId>> newCandidates = candidateMap.get(path);
-                    notifyCandidateAdded(path, candidateList, newCandidates.version(), newCandidates.creationTime());
+                    notifyCandidateUpdated(path, candidateList, newCandidates.version(), newCandidates.creationTime());
                 } else {
                     rerunForLeadership(path);
                     return;
@@ -270,6 +270,27 @@
     }
 
     @Override
+    public boolean stepdown(String path) {
+        if (!activeTopics.contains(path)) {
+            return false;
+        }
+
+        try {
+            Versioned<NodeId> leader = leaderMap.get(path);
+            if (leader != null && Objects.equals(leader.value(), localNodeId)) {
+                if (leaderMap.remove(path, leader.version())) {
+                    log.info("Gave up leadership for {}", path);
+                    notifyRemovedLeader(path, localNodeId, leader.version(), leader.creationTime());
+                    return true;
+                }
+            }
+        } catch (Exception e) {
+            log.warn("Error executing stepdown for {}", path, e);
+        }
+        return false;
+    }
+
+    @Override
     public void addListener(LeadershipEventListener listener) {
         listenerRegistry.addListener(listener);
     }
@@ -279,6 +300,28 @@
         listenerRegistry.removeListener(listener);
     }
 
+    @Override
+    public boolean makeTopCandidate(String path, NodeId nodeId) {
+        Versioned<List<NodeId>> candidates = candidateMap.get(path);
+        if (candidates == null || !candidates.value().contains(nodeId)) {
+            return false;
+        }
+        if (nodeId.equals(candidates.value().get(0))) {
+            return true;
+        }
+        List<NodeId> currentRoster = candidates.value();
+        List<NodeId> newRoster = new ArrayList<>(currentRoster.size());
+        newRoster.add(nodeId);
+        currentRoster.stream().filter(id -> !nodeId.equals(id)).forEach(newRoster::add);
+        boolean updated = candidateMap.replace(path, candidates.version(), newRoster);
+        if (updated) {
+            Versioned<List<NodeId>> newCandidates = candidateMap.get(path);
+            notifyCandidateUpdated(
+                path, newCandidates.value(), newCandidates.version(), newCandidates.creationTime());
+        }
+        return updated;
+    }
+
     private void tryLeaderLock(String path) {
         if (!activeTopics.contains(path)) {
             return;
@@ -334,7 +377,7 @@
         }
     }
 
-    private void notifyCandidateAdded(
+    private void notifyCandidateUpdated(
             String path, List<NodeId> candidates, long epoch, long electedTime) {
         Leadership newInfo = new Leadership(path, candidates, epoch, electedTime);
         final MutableBoolean updated = new MutableBoolean(false);
diff --git a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
index 5606c60..d7d2bc0 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
@@ -93,6 +93,8 @@
             new MessageSubject("mastership-store-device-role-query");
     private static final MessageSubject ROLE_RELINQUISH_SUBJECT =
             new MessageSubject("mastership-store-device-role-relinquish");
+    private static final MessageSubject MASTERSHIP_RELINQUISH_SUBJECT =
+            new MessageSubject("mastership-store-device-mastership-relinquish");
 
     private static final Pattern DEVICE_MASTERSHIP_TOPIC_PATTERN =
             Pattern.compile("device:(.*)");
@@ -111,6 +113,7 @@
                     .register(KryoNamespaces.API)
                     .register(MastershipRole.class)
                     .register(MastershipEvent.class)
+                    .register(MastershipEvent.Type.class)
                     .build();
         }
     };
@@ -125,6 +128,11 @@
         clusterCommunicator.addSubscriber(ROLE_RELINQUISH_SUBJECT,
                new RoleRelinquishHandler(),
                messageHandlingExecutor);
+        clusterCommunicator.addSubscriber(MASTERSHIP_RELINQUISH_SUBJECT,
+                SERIALIZER::decode,
+                this::relinquishMastership,
+                SERIALIZER::encode,
+                messageHandlingExecutor);
         localNodeId = clusterService.getLocalNode().id();
         leadershipService.addListener(leadershipEventListener);
 
@@ -135,6 +143,7 @@
     public void deactivate() {
         clusterCommunicator.removeSubscriber(ROLE_QUERY_SUBJECT);
         clusterCommunicator.removeSubscriber(ROLE_RELINQUISH_SUBJECT);
+        clusterCommunicator.removeSubscriber(MASTERSHIP_RELINQUISH_SUBJECT);
         messageHandlingExecutor.shutdown();
         leadershipService.removeListener(leadershipEventListener);
 
@@ -237,7 +246,22 @@
         checkArgument(nodeId != null, NODE_ID_NULL);
         checkArgument(deviceId != null, DEVICE_ID_NULL);
 
-        throw new UnsupportedOperationException("This operation is not supported in " + this.getClass().getName());
+        NodeId currentMaster = getMaster(deviceId);
+        if (nodeId.equals(currentMaster)) {
+            return null;
+        } else {
+            String leadershipTopic = createDeviceMastershipTopic(deviceId);
+            List<NodeId> candidates = leadershipService.getCandidates(leadershipTopic);
+            if (candidates.isEmpty()) {
+                return null;
+            }
+            if (leadershipService.makeTopCandidate(leadershipTopic, nodeId)) {
+                return relinquishMastership(deviceId);
+            } else {
+                log.warn("Failed to promote {} to mastership for {}", nodeId, deviceId);
+            }
+        }
+        return null;
     }
 
     @Override
@@ -254,7 +278,13 @@
         checkArgument(nodeId != null, NODE_ID_NULL);
         checkArgument(deviceId != null, DEVICE_ID_NULL);
 
-        throw new UnsupportedOperationException("This operation is not supported in " + this.getClass().getName());
+        NodeId currentMaster = getMaster(deviceId);
+        if (!nodeId.equals(currentMaster)) {
+            return null;
+        }
+        // FIXME: This can becomes the master again unless it
+        // is demoted to the end of candidates list.
+        return relinquishMastership(deviceId);
     }
 
     @Override
@@ -294,6 +324,37 @@
         return new MastershipEvent(eventType, deviceId, getNodes(deviceId));
     }
 
+    private MastershipEvent relinquishMastership(DeviceId deviceId) {
+        checkArgument(deviceId != null, DEVICE_ID_NULL);
+
+        NodeId currentMaster = getMaster(deviceId);
+        if (currentMaster == null) {
+            return null;
+        }
+
+        if (!currentMaster.equals(localNodeId)) {
+            log.info("Forwarding request to relinquish "
+                    + "mastership for device {} to {}", deviceId, currentMaster);
+            return futureGetOrElse(clusterCommunicator.sendAndReceive(
+                    deviceId,
+                    MASTERSHIP_RELINQUISH_SUBJECT,
+                    SERIALIZER::encode,
+                    SERIALIZER::decode,
+                    currentMaster), null);
+        }
+
+        String leadershipTopic = createDeviceMastershipTopic(deviceId);
+        Leadership currentLeadership = leadershipService.getLeadership(leadershipTopic);
+
+        MastershipEvent.Type eventType = null;
+        if (currentLeadership != null && currentLeadership.leader().equals(localNodeId)) {
+            eventType = MastershipEvent.Type.MASTER_CHANGED;
+        }
+
+        return leadershipService.stepdown(leadershipTopic)
+                ? new MastershipEvent(eventType, deviceId, getNodes(deviceId)) : null;
+    }
+
     private class RoleQueryHandler implements ClusterMessageHandler {
         @Override
         public void handle(ClusterMessage message) {
diff --git a/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleLeadershipManager.java b/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleLeadershipManager.java
index 5f7bb92..d00f616 100644
--- a/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleLeadershipManager.java
+++ b/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleLeadershipManager.java
@@ -119,4 +119,14 @@
     public List<NodeId> getCandidates(String path) {
         return null;
     }
+
+    @Override
+    public boolean stepdown(String path) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean makeTopCandidate(String path, NodeId nodeId) {
+        throw new UnsupportedOperationException();
+    }
 }