Couple of fixes:
1. Retry leadership lock after a successful stepdown
2. setStandby should adjust the candidates list to ensure another node steps up to become the master.

Change-Id: I8dc5da82c9b8b9e99d4118ec33a63037543927f0
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
index ed34928..69a1a8a 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
@@ -12,7 +12,6 @@
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
-import org.apache.commons.lang3.mutable.MutableBoolean;
 import org.onlab.util.KryoNamespace;
 import org.onosproject.cluster.ClusterService;
 import org.onosproject.cluster.Leadership;
@@ -23,8 +22,6 @@
 import org.onosproject.event.AbstractListenerRegistry;
 import org.onosproject.event.EventDeliveryService;
 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
-import org.onosproject.store.cluster.messaging.ClusterMessage;
-import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
 import org.onosproject.store.cluster.messaging.MessageSubject;
 import org.onosproject.store.serializers.KryoNamespaces;
 import org.onosproject.store.service.ConsistentMap;
@@ -44,6 +41,7 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 
 import static com.google.common.base.Preconditions.checkArgument;
@@ -128,7 +126,8 @@
                 groupedThreads("onos/store/leadership", "peer-updater"));
         clusterCommunicator.addSubscriber(
                 LEADERSHIP_EVENT_MESSAGE_SUBJECT,
-                new InternalLeadershipEventListener(),
+                SERIALIZER::decode,
+                this::onLeadershipEvent,
                 messageHandlingExecutor);
 
         deadLockDetectionExecutor.scheduleWithFixedDelay(
@@ -139,7 +138,7 @@
         listenerRegistry = new AbstractListenerRegistry<>();
         eventDispatcher.addSink(LeadershipEvent.class, listenerRegistry);
 
-        log.info("Started.");
+        log.info("Started");
     }
 
     @Deactivate
@@ -158,7 +157,7 @@
         deadLockDetectionExecutor.shutdown();
         leadershipStatusBroadcaster.shutdown();
 
-        log.info("Stopped.");
+        log.info("Stopped");
     }
 
     @Override
@@ -210,8 +209,12 @@
                     candidateList.add(localNodeId);
                     if (candidateMap.replace(path, candidates.version(), candidateList)) {
                         Versioned<List<NodeId>> newCandidates = candidateMap.get(path);
-                        notifyCandidateUpdated(
-                                path, candidateList, newCandidates.version(), newCandidates.creationTime());
+                        publish(new LeadershipEvent(
+                                LeadershipEvent.Type.CANDIDATES_CHANGED,
+                                new Leadership(path,
+                                    newCandidates.value(),
+                                    newCandidates.version(),
+                                    newCandidates.creationTime())));
                     } else {
                         rerunForLeadership(path);
                         return;
@@ -221,7 +224,12 @@
                 List<NodeId> candidateList = ImmutableList.of(localNodeId);
                 if ((candidateMap.putIfAbsent(path, candidateList) == null)) {
                     Versioned<List<NodeId>> newCandidates = candidateMap.get(path);
-                    notifyCandidateUpdated(path, candidateList, newCandidates.version(), newCandidates.creationTime());
+                    publish(new LeadershipEvent(
+                            LeadershipEvent.Type.CANDIDATES_CHANGED,
+                            new Leadership(path,
+                                newCandidates.value(),
+                                newCandidates.version(),
+                                newCandidates.creationTime())));
                 } else {
                     rerunForLeadership(path);
                     return;
@@ -245,7 +253,12 @@
             if (leader != null && Objects.equals(leader.value(), localNodeId)) {
                 if (leaderMap.remove(path, leader.version())) {
                     log.info("Gave up leadership for {}", path);
-                    notifyRemovedLeader(path, localNodeId, leader.version(), leader.creationTime());
+                    publish(new LeadershipEvent(
+                            LeadershipEvent.Type.LEADER_BOOTED,
+                            new Leadership(path,
+                                localNodeId,
+                                leader.version(),
+                                leader.creationTime())));
                 }
             }
             // else we are not the current leader, can still be a candidate.
@@ -258,7 +271,12 @@
             }
             if (candidateMap.replace(path, candidates.version(), candidateList)) {
                 Versioned<List<NodeId>> newCandidates = candidateMap.get(path);
-                notifyCandidateRemoved(path, candidates.version(), candidates.creationTime(), newCandidates);
+                publish(new LeadershipEvent(
+                                LeadershipEvent.Type.CANDIDATES_CHANGED,
+                                new Leadership(path,
+                                    newCandidates.value(),
+                                    newCandidates.version(),
+                                    newCandidates.creationTime())));
             } else {
                 log.warn("Failed to withdraw from candidates list. Will retry");
                 retryWithdraw(path);
@@ -279,8 +297,14 @@
             Versioned<NodeId> leader = leaderMap.get(path);
             if (leader != null && Objects.equals(leader.value(), localNodeId)) {
                 if (leaderMap.remove(path, leader.version())) {
-                    log.info("Gave up leadership for {}", path);
-                    notifyRemovedLeader(path, localNodeId, leader.version(), leader.creationTime());
+                    log.info("Stepped down from leadership for {}", path);
+                    publish(new LeadershipEvent(
+                            LeadershipEvent.Type.LEADER_BOOTED,
+                            new Leadership(path,
+                                localNodeId,
+                                leader.version(),
+                                leader.creationTime())));
+                    retryLock(path);
                     return true;
                 }
             }
@@ -306,30 +330,35 @@
         if (candidates == null || !candidates.value().contains(nodeId)) {
             return false;
         }
-        if (nodeId.equals(candidates.value().get(0))) {
+        List<NodeId> currentRoster = candidates.value();
+        if (nodeId.equals(currentRoster.get(LEADER_CANDIDATE_POS))) {
             return true;
         }
-        List<NodeId> currentRoster = candidates.value();
         List<NodeId> newRoster = new ArrayList<>(currentRoster.size());
         newRoster.add(nodeId);
         currentRoster.stream().filter(id -> !nodeId.equals(id)).forEach(newRoster::add);
         boolean updated = candidateMap.replace(path, candidates.version(), newRoster);
         if (updated) {
             Versioned<List<NodeId>> newCandidates = candidateMap.get(path);
-            notifyCandidateUpdated(
-                path, newCandidates.value(), newCandidates.version(), newCandidates.creationTime());
+            publish(new LeadershipEvent(
+                    LeadershipEvent.Type.CANDIDATES_CHANGED,
+                    new Leadership(path,
+                        newCandidates.value(),
+                        newCandidates.version(),
+                        newCandidates.creationTime())));
         }
         return updated;
     }
 
     private void tryLeaderLock(String path) {
-        if (!activeTopics.contains(path)) {
+        if (!activeTopics.contains(path) || Objects.equals(localNodeId, getLeader(path))) {
             return;
         }
         try {
             Versioned<List<NodeId>> candidates = candidateMap.get(path);
             if (candidates != null) {
-                List<NodeId> activeNodes = candidates.value().stream()
+                List<NodeId> activeNodes = candidates.value()
+                                  .stream()
                                   .filter(n -> clusterService.getState(n) == ACTIVE)
                                   .collect(Collectors.toList());
                 if (localNodeId.equals(activeNodes.get(LEADER_CANDIDATE_POS))) {
@@ -353,8 +382,12 @@
                 if (localNodeId.equals(currentLeader.value())) {
                     log.info("Already has leadership for {}", path);
                     // FIXME: candidates can get out of sync.
-                    notifyNewLeader(
-                            path, localNodeId, candidates, currentLeader.version(), currentLeader.creationTime());
+                    publish(new LeadershipEvent(
+                            LeadershipEvent.Type.LEADER_ELECTED,
+                            new Leadership(path,
+                                localNodeId,
+                                currentLeader.version(),
+                                currentLeader.creationTime())));
                 } else {
                     // someone else has leadership. will retry after sometime.
                     retryLock(path);
@@ -365,7 +398,12 @@
                     // do a get again to get the version (epoch)
                     Versioned<NodeId> newLeader = leaderMap.get(path);
                     // FIXME: candidates can get out of sync
-                    notifyNewLeader(path, localNodeId, candidates, newLeader.version(), newLeader.creationTime());
+                    publish(new LeadershipEvent(
+                            LeadershipEvent.Type.LEADER_ELECTED,
+                            new Leadership(path,
+                                newLeader.value(),
+                                newLeader.version(),
+                                newLeader.creationTime())));
                 } else {
                     // someone beat us to it.
                     retryLock(path);
@@ -377,140 +415,51 @@
         }
     }
 
-    private void notifyCandidateUpdated(
-            String path, List<NodeId> candidates, long epoch, long electedTime) {
-        Leadership newInfo = new Leadership(path, candidates, epoch, electedTime);
-        final MutableBoolean updated = new MutableBoolean(false);
-        candidateBoard.compute(path, (k, current) -> {
-            if (current == null || current.epoch() < newInfo.epoch()) {
-                log.debug("updating candidateboard with {}", newInfo);
-                updated.setTrue();
-                return newInfo;
-            }
-            return current;
-        });
-        // maybe rethink types of candidates events
-        if (updated.booleanValue()) {
-            LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.CANDIDATES_CHANGED, newInfo);
-            notifyPeers(event);
+    private void publish(LeadershipEvent event) {
+        onLeadershipEvent(event);
+        clusterCommunicator.broadcast(event, LEADERSHIP_EVENT_MESSAGE_SUBJECT, SERIALIZER::encode);
+    }
+
+    private void onLeadershipEvent(LeadershipEvent leadershipEvent) {
+        log.trace("Leadership Event: time = {} type = {} event = {}",
+                leadershipEvent.time(), leadershipEvent.type(),
+                leadershipEvent);
+
+        Leadership leadershipUpdate = leadershipEvent.subject();
+        LeadershipEvent.Type eventType = leadershipEvent.type();
+        String topic = leadershipUpdate.topic();
+
+        AtomicBoolean updateAccepted = new AtomicBoolean(false);
+        if (eventType.equals(LeadershipEvent.Type.LEADER_ELECTED)) {
+            leaderBoard.compute(topic, (k, currentLeadership) -> {
+                if (currentLeadership == null || currentLeadership.epoch() < leadershipUpdate.epoch()) {
+                    updateAccepted.set(true);
+                    return leadershipUpdate;
+                }
+                return currentLeadership;
+            });
+        } else if (eventType.equals(LeadershipEvent.Type.LEADER_BOOTED)) {
+            leaderBoard.compute(topic, (k, currentLeadership) -> {
+                if (currentLeadership == null || currentLeadership.epoch() <= leadershipUpdate.epoch()) {
+                    updateAccepted.set(true);
+                    return null;
+                }
+                return currentLeadership;
+            });
+        } else if (eventType.equals(LeadershipEvent.Type.CANDIDATES_CHANGED)) {
+            candidateBoard.compute(topic, (k, currentInfo) -> {
+                if (currentInfo == null || currentInfo.epoch() < leadershipUpdate.epoch()) {
+                    updateAccepted.set(true);
+                    return leadershipUpdate;
+                }
+                return currentInfo;
+            });
+        } else {
+            throw new IllegalStateException("Unknown event type.");
         }
-    }
 
-    private void notifyCandidateRemoved(
-            String path, long oldEpoch, long oldTime, Versioned<List<NodeId>> candidates) {
-        Leadership newInfo = (candidates == null)
-                ? new Leadership(path, ImmutableList.of(), oldEpoch, oldTime)
-                : new Leadership(path, candidates.value(), candidates.version(), candidates.creationTime());
-        final MutableBoolean updated = new MutableBoolean(false);
-
-        candidateBoard.compute(path, (k, current) -> {
-            if (current != null && current.epoch() < newInfo.epoch()) {
-                updated.setTrue();
-                return newInfo;
-            }
-            return current;
-        });
-        // maybe rethink types of candidates events
-        if (updated.booleanValue()) {
-            log.debug("updated candidateboard with removal: {}", newInfo);
-            LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.CANDIDATES_CHANGED, newInfo);
-            notifyPeers(event);
-        }
-    }
-
-    private void notifyNewLeader(String path, NodeId leader,
-            List<NodeId> candidates, long epoch, long electedTime) {
-        Leadership newLeadership = new Leadership(path, leader, candidates, epoch, electedTime);
-        final MutableBoolean updatedLeader = new MutableBoolean(false);
-        log.debug("candidates for new Leadership {}", candidates);
-        leaderBoard.compute(path, (k, currentLeader) -> {
-            if (currentLeader == null || currentLeader.epoch() < epoch) {
-                log.debug("updating leaderboard with new {}", newLeadership);
-                updatedLeader.setTrue();
-                return newLeadership;
-            }
-            return currentLeader;
-        });
-
-        if (updatedLeader.booleanValue()) {
-            LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.LEADER_ELECTED, newLeadership);
-            notifyPeers(event);
-        }
-    }
-
-    private void notifyPeers(LeadershipEvent event) {
-        eventDispatcher.post(event);
-        clusterCommunicator.broadcast(event,
-                LEADERSHIP_EVENT_MESSAGE_SUBJECT,
-                SERIALIZER::encode);
-    }
-
-    private void notifyRemovedLeader(String path, NodeId leader, long epoch, long electedTime) {
-        Versioned<List<NodeId>> candidates = candidateMap.get(path);
-        Leadership oldLeadership = new Leadership(
-                path, leader, candidates.value(), epoch, electedTime);
-        final MutableBoolean updatedLeader = new MutableBoolean(false);
-        leaderBoard.compute(path, (k, currentLeader) -> {
-            if (currentLeader != null && currentLeader.epoch() == oldLeadership.epoch()) {
-                updatedLeader.setTrue();
-                return null;
-            }
-            return currentLeader;
-        });
-
-        if (updatedLeader.booleanValue()) {
-            LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.LEADER_BOOTED, oldLeadership);
-            notifyPeers(event);
-        }
-    }
-
-    private class InternalLeadershipEventListener implements ClusterMessageHandler {
-
-        @Override
-        public void handle(ClusterMessage message) {
-            LeadershipEvent leadershipEvent =
-                    SERIALIZER.decode(message.payload());
-
-            log.trace("Leadership Event: time = {} type = {} event = {}",
-                    leadershipEvent.time(), leadershipEvent.type(),
-                    leadershipEvent);
-
-            Leadership leadershipUpdate = leadershipEvent.subject();
-            LeadershipEvent.Type eventType = leadershipEvent.type();
-            String topic = leadershipUpdate.topic();
-
-            MutableBoolean updateAccepted = new MutableBoolean(false);
-            if (eventType.equals(LeadershipEvent.Type.LEADER_ELECTED)) {
-                leaderBoard.compute(topic, (k, currentLeadership) -> {
-                    if (currentLeadership == null || currentLeadership.epoch() < leadershipUpdate.epoch()) {
-                        updateAccepted.setTrue();
-                        return leadershipUpdate;
-                    }
-                    return currentLeadership;
-                });
-            } else if (eventType.equals(LeadershipEvent.Type.LEADER_BOOTED)) {
-                leaderBoard.compute(topic, (k, currentLeadership) -> {
-                    if (currentLeadership == null || currentLeadership.epoch() == leadershipUpdate.epoch()) {
-                        updateAccepted.setTrue();
-                        return null;
-                    }
-                    return currentLeadership;
-                });
-            } else if (eventType.equals(LeadershipEvent.Type.CANDIDATES_CHANGED)) {
-                candidateBoard.compute(topic, (k, currentInfo) -> {
-                    if (currentInfo == null || currentInfo.epoch() < leadershipUpdate.epoch()) {
-                        updateAccepted.setTrue();
-                        return leadershipUpdate;
-                    }
-                    return currentInfo;
-                });
-            } else {
-                throw new IllegalStateException("Unknown event type.");
-            }
-
-            if (updateAccepted.booleanValue()) {
-                eventDispatcher.post(leadershipEvent);
-            }
+        if (updateAccepted.get()) {
+            eventDispatcher.post(leadershipEvent);
         }
     }
 
@@ -549,7 +498,9 @@
                     try {
                         if (leaderMap.remove(path, epoch)) {
                             log.info("Purged stale lock held by {} for {}", nodeId, path);
-                            notifyRemovedLeader(path, nodeId, epoch, creationTime);
+                            publish(new LeadershipEvent(
+                                    LeadershipEvent.Type.LEADER_BOOTED,
+                                    new Leadership(path, nodeId, epoch, creationTime)));
                         }
                     } catch (Exception e) {
                         log.warn("Failed to purge stale lock held by {} for {}", nodeId, path, e);
diff --git a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
index 8cd6b59..0d85784 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
@@ -53,8 +53,6 @@
 import org.onosproject.net.MastershipRole;
 import org.onosproject.store.AbstractStore;
 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
-import org.onosproject.store.cluster.messaging.ClusterMessage;
-import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
 import org.onosproject.store.cluster.messaging.MessageSubject;
 import org.onosproject.store.serializers.KryoNamespaces;
 import org.onosproject.store.serializers.KryoSerializer;
@@ -122,12 +120,16 @@
     public void activate() {
         messageHandlingExecutor =
                 Executors.newSingleThreadExecutor(groupedThreads("onos/store/device/mastership", "message-handler"));
-        clusterCommunicator.addSubscriber(ROLE_QUERY_SUBJECT,
-                new RoleQueryHandler(),
+        clusterCommunicator.<DeviceId, MastershipRole>addSubscriber(ROLE_QUERY_SUBJECT,
+                SERIALIZER::decode,
+                deviceId -> getRole(localNodeId, deviceId),
+                SERIALIZER::encode,
                 messageHandlingExecutor);
-        clusterCommunicator.addSubscriber(ROLE_RELINQUISH_SUBJECT,
-               new RoleRelinquishHandler(),
-               messageHandlingExecutor);
+        clusterCommunicator.<DeviceId, MastershipEvent>addSubscriber(ROLE_RELINQUISH_SUBJECT,
+                SERIALIZER::decode,
+                deviceId -> relinquishRole(localNodeId, deviceId),
+                SERIALIZER::encode,
+                messageHandlingExecutor);
         clusterCommunicator.addSubscriber(TRANSITION_FROM_MASTER_TO_STANDBY_SUBJECT,
                 SERIALIZER::decode,
                 this::transitionFromMasterToStandby,
@@ -211,8 +213,6 @@
         Map<NodeId, MastershipRole> roles = Maps.newHashMap();
         clusterService
             .getNodes()
-            .stream()
-            .parallel()
             .forEach((node) -> roles.put(node.id(), getRole(node.id(), deviceId)));
 
         NodeId master = null;
@@ -282,9 +282,21 @@
         if (!nodeId.equals(currentMaster)) {
             return null;
         }
-        // FIXME: This can become the master again unless it
-        // is first demoted to the end of candidates list.
-        return transitionFromMasterToStandby(deviceId);
+
+        String leadershipTopic = createDeviceMastershipTopic(deviceId);
+        List<NodeId> candidates = leadershipService.getCandidates(leadershipTopic);
+
+        NodeId newMaster = candidates.stream()
+                                     .filter(candidate -> !Objects.equal(nodeId, candidate))
+                                     .findFirst()
+                                     .orElse(null);
+        log.info("Transitioning to role {} for {}. Next master: {}",
+                newMaster != null ? MastershipRole.STANDBY : MastershipRole.NONE, deviceId, newMaster);
+
+        if (newMaster != null) {
+            return setMaster(newMaster, deviceId);
+        }
+        return relinquishRole(nodeId, deviceId);
     }
 
     @Override
@@ -344,28 +356,11 @@
                 ? new MastershipEvent(MastershipEvent.Type.MASTER_CHANGED, deviceId, getNodes(deviceId)) : null;
     }
 
-    private class RoleQueryHandler implements ClusterMessageHandler {
-        @Override
-        public void handle(ClusterMessage message) {
-            DeviceId deviceId = SERIALIZER.decode(message.payload());
-            message.respond(SERIALIZER.encode(getRole(localNodeId, deviceId)));
-        }
-    }
-
-
     @Override
     public void relinquishAllRole(NodeId nodeId) {
         // Noop. LeadershipService already takes care of detecting and purging deadlocks.
     }
 
-    private class RoleRelinquishHandler implements ClusterMessageHandler {
-        @Override
-        public void handle(ClusterMessage message) {
-            DeviceId deviceId = SERIALIZER.decode(message.payload());
-            message.respond(SERIALIZER.encode(relinquishRole(localNodeId, deviceId)));
-        }
-    }
-
     private class InternalDeviceMastershipEventListener implements LeadershipEventListener {
         @Override
         public void event(LeadershipEvent event) {