Fix balance-masters functionality in the new LeadershipService based device mastership store

Change-Id: I9f64d514cee7d5a5383fd4c2fa30a8616c97785c
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
index 0b2b4cb..74dae01 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
@@ -34,6 +34,7 @@
 import org.onosproject.store.service.Versioned;
 import org.slf4j.Logger;
 
+import java.util.ArrayList;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Objects;
@@ -48,7 +49,6 @@
 import static com.google.common.base.Preconditions.checkArgument;
 import static org.onlab.util.Tools.groupedThreads;
 import static org.slf4j.LoggerFactory.getLogger;
-
 import static org.onosproject.cluster.ControllerNode.State.ACTIVE;
 import static org.onosproject.cluster.ControllerNode.State.INACTIVE;
 
@@ -210,7 +210,7 @@
                     candidateList.add(localNodeId);
                     if (candidateMap.replace(path, candidates.version(), candidateList)) {
                         Versioned<List<NodeId>> newCandidates = candidateMap.get(path);
-                        notifyCandidateAdded(
+                        notifyCandidateUpdated(
                                 path, candidateList, newCandidates.version(), newCandidates.creationTime());
                     } else {
                         rerunForLeadership(path);
@@ -221,7 +221,7 @@
                 List<NodeId> candidateList = ImmutableList.of(localNodeId);
                 if ((candidateMap.putIfAbsent(path, candidateList) == null)) {
                     Versioned<List<NodeId>> newCandidates = candidateMap.get(path);
-                    notifyCandidateAdded(path, candidateList, newCandidates.version(), newCandidates.creationTime());
+                    notifyCandidateUpdated(path, candidateList, newCandidates.version(), newCandidates.creationTime());
                 } else {
                     rerunForLeadership(path);
                     return;
@@ -270,6 +270,27 @@
     }
 
     @Override
+    public boolean stepdown(String path) {
+        if (!activeTopics.contains(path)) {
+            return false;
+        }
+
+        try {
+            Versioned<NodeId> leader = leaderMap.get(path);
+            if (leader != null && Objects.equals(leader.value(), localNodeId)) {
+                if (leaderMap.remove(path, leader.version())) {
+                    log.info("Gave up leadership for {}", path);
+                    notifyRemovedLeader(path, localNodeId, leader.version(), leader.creationTime());
+                    return true;
+                }
+            }
+        } catch (Exception e) {
+            log.warn("Error executing stepdown for {}", path, e);
+        }
+        return false;
+    }
+
+    @Override
     public void addListener(LeadershipEventListener listener) {
         listenerRegistry.addListener(listener);
     }
@@ -279,6 +300,28 @@
         listenerRegistry.removeListener(listener);
     }
 
+    @Override
+    public boolean makeTopCandidate(String path, NodeId nodeId) {
+        Versioned<List<NodeId>> candidates = candidateMap.get(path);
+        if (candidates == null || !candidates.value().contains(nodeId)) {
+            return false;
+        }
+        if (nodeId.equals(candidates.value().get(0))) {
+            return true;
+        }
+        List<NodeId> currentRoster = candidates.value();
+        List<NodeId> newRoster = new ArrayList<>(currentRoster.size());
+        newRoster.add(nodeId);
+        currentRoster.stream().filter(id -> !nodeId.equals(id)).forEach(newRoster::add);
+        boolean updated = candidateMap.replace(path, candidates.version(), newRoster);
+        if (updated) {
+            Versioned<List<NodeId>> newCandidates = candidateMap.get(path);
+            notifyCandidateUpdated(
+                path, newCandidates.value(), newCandidates.version(), newCandidates.creationTime());
+        }
+        return updated;
+    }
+
     private void tryLeaderLock(String path) {
         if (!activeTopics.contains(path)) {
             return;
@@ -334,7 +377,7 @@
         }
     }
 
-    private void notifyCandidateAdded(
+    private void notifyCandidateUpdated(
             String path, List<NodeId> candidates, long epoch, long electedTime) {
         Leadership newInfo = new Leadership(path, candidates, epoch, electedTime);
         final MutableBoolean updated = new MutableBoolean(false);