LeadershipStore updates:
 - Now tracking leader and candidates for a topic using a single map.
 - Using term numbers that are incremented by one every time a new leader is elected.
 - Introduced a separate LeadershipStore to conform to the manager-store pattern.

Change-Id: I1d03a6c5e8ff0e68ef0c1e3a6c2d425c4856e470
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedLeadershipStore.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedLeadershipStore.java
new file mode 100644
index 0000000..c6647b7
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedLeadershipStore.java
@@ -0,0 +1,279 @@
+/*
+ * Copyright 2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.cluster.impl;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.Leader;
+import org.onosproject.cluster.Leadership;
+import org.onosproject.cluster.LeadershipEvent;
+import org.onosproject.cluster.LeadershipStore;
+import org.onosproject.cluster.LeadershipStoreDelegate;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.store.AbstractStore;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.Versioned;
+import org.slf4j.Logger;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Objects;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+/**
+ * Implementation of {@code LeadershipStore} backed by {@link ConsistentMap}.
+ */
+@Service
+@Component(immediate = true, enabled = true)
+public class DistributedLeadershipStore
+    extends AbstractStore<LeadershipEvent, LeadershipStoreDelegate>
+    implements LeadershipStore {
+
+    private static final Logger log = getLogger(DistributedLeadershipStore.class);
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ClusterService clusterService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected StorageService storageService;
+
+    protected NodeId localNodeId;
+    protected ConsistentMap<String, InternalLeadership> leadershipMap;
+    // Translates raw map updates into LeadershipEvents for the store delegate.
+    private final MapEventListener<String, InternalLeadership> leadershipChangeListener =
+            event -> {
+                Leadership oldValue = InternalLeadership.toLeadership(Versioned.valueOrNull(event.oldValue()));
+                Leadership newValue = InternalLeadership.toLeadership(Versioned.valueOrNull(event.newValue()));
+                if (newValue == null) {
+                    // No new value (e.g. the entry was removed): nothing to use as the event subject.
+                    return;
+                }
+                boolean leaderChanged =
+                        !Objects.equal(oldValue == null ? null : oldValue.leader(), newValue.leader());
+                boolean candidatesChanged =
+                        !Sets.symmetricDifference(Sets.newHashSet(oldValue == null ?
+                                                    ImmutableSet.<NodeId>of() : oldValue.candidates()),
+                                                  Sets.newHashSet(newValue.candidates())).isEmpty();
+                // Membership is compared as sets, so an update that only reorders the candidates
+                // sets neither flag; report it as CANDIDATES_CHANGED instead of a null event type.
+                LeadershipEvent.Type eventType = !leaderChanged ? LeadershipEvent.Type.CANDIDATES_CHANGED
+                        : candidatesChanged ? LeadershipEvent.Type.LEADER_AND_CANDIDATES_CHANGED
+                                            : LeadershipEvent.Type.LEADER_CHANGED;
+                notifyDelegate(new LeadershipEvent(eventType, newValue));
+            };
+
+    @Activate
+    public void activate() {
+        localNodeId = clusterService.getLocalNode().id();
+        Serializer serializer = Serializer.using(KryoNamespaces.API, InternalLeadership.class);
+        leadershipMap = storageService.<String, InternalLeadership>consistentMapBuilder()
+                .withName("onos-leadership")
+                .withPartitionsDisabled()
+                .withRelaxedReadConsistency()
+                .withSerializer(serializer).build();
+        leadershipMap.addListener(leadershipChangeListener);
+        log.info("Started");
+    }
+
+    @Deactivate
+    public void deactivate() {
+        leadershipMap.removeListener(leadershipChangeListener); // registered in activate()
+        log.info("Stopped");
+    }
+
+    @Override
+    public Leadership addRegistration(String topic) {
+        Versioned<InternalLeadership> updated = leadershipMap.computeIf(topic,
+                current -> current == null || !current.candidates().contains(localNodeId),
+                (key, current) -> {
+                    if (current != null && !current.candidates().isEmpty()) {
+                        // Others are registered already: join at the tail, keep leader and term.
+                        List<NodeId> candidates = new ArrayList<>(current.candidates());
+                        candidates.add(localNodeId);
+                        return new InternalLeadership(topic, current.leader(), current.term(),
+                                current.termStartTime(), candidates);
+                    }
+                    // Nobody is registered: the local node becomes leader and a new term starts.
+                    return new InternalLeadership(topic, localNodeId, current == null ? 1 : current.term() + 1,
+                            System.currentTimeMillis(), ImmutableList.of(localNodeId));
+                });
+        return InternalLeadership.toLeadership(Versioned.valueOrNull(updated));
+    }
+
+    @Override
+    public void removeRegistration(String topic) {
+        removeRegistration(topic, localNodeId); // always acts on behalf of the local node
+    }
+
+    private void removeRegistration(String topic, NodeId nodeId) {
+        leadershipMap.computeIf(topic,
+                current -> current != null && current.candidates().contains(nodeId),
+                (key, current) -> {
+                    List<NodeId> remaining = current.candidates().stream()
+                            .filter(id -> !nodeId.equals(id))
+                            .collect(Collectors.toList());
+                    NodeId nextLeader = current.leader();
+                    if (nodeId.equals(nextLeader)) {
+                        nextLeader = remaining.isEmpty() ? null : remaining.get(0);
+                    }
+                    // Term and start time advance only when a different, non-null leader takes over.
+                    boolean sameTerm = nextLeader == null || Objects.equal(nextLeader, current.leader());
+                    return new InternalLeadership(topic, nextLeader, sameTerm ? current.term() : current.term() + 1,
+                            sameTerm ? current.termStartTime() : System.currentTimeMillis(), remaining);
+                });
+    }
+
+    @Override
+    public void removeRegistration(NodeId nodeId) {
+        leadershipMap.entrySet().forEach(entry -> {
+            if (entry.getValue().value().candidates().contains(nodeId)) {
+                removeRegistration(entry.getKey(), nodeId);
+            }
+        });
+    }
+
+    @Override
+    public boolean moveLeadership(String topic, NodeId toNodeId) {
+        // Only a registered candidate that is not already the leader can take over.
+        Versioned<InternalLeadership> internalLeadership = leadershipMap.computeIf(topic,
+                v -> v != null &&
+                    v.candidates().contains(toNodeId) &&
+                    !Objects.equal(v.leader(), toNodeId),
+                (k, v) -> {
+                    // The new leader moves to the head; the others keep their relative order.
+                    List<NodeId> newCandidates = new ArrayList<>();
+                    newCandidates.add(toNodeId);
+                    v.candidates().stream().filter(id -> !toNodeId.equals(id)).forEach(newCandidates::add);
+                    // A change of leader always opens a new term.
+                    return new InternalLeadership(topic, toNodeId, v.term() + 1,
+                            System.currentTimeMillis(), newCandidates);
+                });
+        // There may be no mapping for the topic at all, in which case there is no value to
+        // inspect: report the transfer as failed instead of dereferencing null.
+        InternalLeadership result = Versioned.valueOrNull(internalLeadership);
+        return result != null && Objects.equal(toNodeId, result.leader());
+    }
+
+    @Override
+    public boolean makeTopCandidate(String topic, NodeId nodeId) {
+        Versioned<InternalLeadership> internalLeadership = leadershipMap.computeIf(topic,
+                v -> v != null &&
+                v.candidates().contains(nodeId) &&
+                !v.candidates().get(0).equals(nodeId),
+                (k, v) -> {
+                    List<NodeId> newCandidates = new ArrayList<>();
+                    newCandidates.add(nodeId);
+                    v.candidates().stream().filter(id -> !nodeId.equals(id)).forEach(newCandidates::add);
+                    // NOTE(review): leader and term are kept but the term start time is reset;
+                    // confirm that this is intended for a pure reordering.
+                    return new InternalLeadership(topic, v.leader(), v.term(),
+                            System.currentTimeMillis(), newCandidates);
+                });
+        // removeRegistration leaves an entry with an empty candidate list behind once the last
+        // node withdraws, so check for emptiness before reading the head of the list.
+        List<NodeId> candidates = internalLeadership == null ?
+                ImmutableList.<NodeId>of() : internalLeadership.value().candidates();
+        return !candidates.isEmpty() && nodeId.equals(candidates.get(0));
+    }
+
+    @Override
+    public Leadership getLeadership(String topic) {
+        return InternalLeadership.toLeadership(Versioned.valueOrNull(leadershipMap.get(topic))); // may be null
+    }
+
+    @Override
+    public Map<String, Leadership> getLeaderships() {
+        // Collect into a scratch map first, then hand out an immutable copy of it.
+        Map<String, Leadership> snapshot = Maps.newHashMap();
+        leadershipMap.entrySet()
+                     .forEach(entry -> snapshot.put(entry.getKey(), entry.getValue().value().asLeadership()));
+        return ImmutableMap.copyOf(snapshot);
+    }
+
+    /** Immutable snapshot of one topic's leader, term and candidate list, as stored in the map. */
+    private static class InternalLeadership {
+        private final String topic;
+        private final NodeId leader;
+        private final long term;
+        private final long termStartTime;
+        private final List<NodeId> candidates;
+
+        public InternalLeadership(String topic, NodeId leader, long term,
+                long termStartTime, List<NodeId> candidates) {
+            this.topic = topic;
+            this.leader = leader;
+            this.term = term;
+            this.termStartTime = termStartTime;
+            // Keep an immutable copy rather than the caller's list.
+            this.candidates = ImmutableList.copyOf(candidates);
+        }
+
+        public NodeId leader() {
+            return leader;
+        }
+
+        public long term() {
+            return term;
+        }
+
+        public long termStartTime() {
+            return termStartTime;
+        }
+
+        public List<NodeId> candidates() {
+            return candidates;
+        }
+
+        /** Converts to the public Leadership type; a missing leader is passed on as null. */
+        public Leadership asLeadership() {
+            Leader currentLeader = leader == null ? null : new Leader(leader, term, termStartTime);
+            return new Leadership(topic, currentLeader, candidates);
+        }
+
+        public static Leadership toLeadership(InternalLeadership internalLeadership) {
+            return internalLeadership == null ? null : internalLeadership.asLeadership();
+        }
+
+        @Override
+        public String toString() {
+            return MoreObjects.toStringHelper(getClass())
+                    .add("leader", leader)
+                    .add("term", term)
+                    .add("termStartTime", termStartTime)
+                    .add("candidates", candidates)
+                    .toString();
+        }
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/IntentPartitionManager.java b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/IntentPartitionManager.java
index bfb4754..d2c63f3 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/IntentPartitionManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/IntentPartitionManager.java
@@ -21,8 +21,6 @@
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
-import org.onosproject.cluster.ClusterEvent;
-import org.onosproject.cluster.ClusterEventListener;
 import org.onosproject.cluster.ClusterService;
 import org.onosproject.cluster.ControllerNode;
 import org.onosproject.cluster.Leadership;
@@ -76,7 +74,6 @@
 
     private ListenerRegistry<IntentPartitionEvent, IntentPartitionEventListener> listenerRegistry;
     private LeadershipEventListener leaderListener = new InternalLeadershipListener();
-    private ClusterEventListener clusterListener = new InternalClusterEventListener();
 
     private ScheduledExecutorService executor = Executors
             .newScheduledThreadPool(1);
@@ -84,7 +81,6 @@
     @Activate
     public void activate() {
         leadershipService.addListener(leaderListener);
-        clusterService.addListener(clusterListener);
 
         listenerRegistry = new ListenerRegistry<>();
         eventDispatcher.addSink(IntentPartitionEvent.class, listenerRegistry);
@@ -103,7 +99,6 @@
 
         eventDispatcher.removeSink(IntentPartitionEvent.class);
         leadershipService.removeListener(leaderListener);
-        clusterService.removeListener(clusterListener);
     }
 
     /**
@@ -180,7 +175,7 @@
 
         List<Leadership> myPartitions = leadershipService.getLeaderBoard().values()
                 .stream()
-                .filter(l -> clusterService.getLocalNode().id().equals(l.leader()))
+                .filter(l -> clusterService.getLocalNode().id().equals(l.leaderNodeId()))
                 .filter(l -> l.topic().startsWith(ELECTION_PREFIX))
                 .collect(Collectors.toList());
 
@@ -220,24 +215,16 @@
         public void event(LeadershipEvent event) {
             Leadership leadership = event.subject();
 
-            if (Objects.equals(leadership.leader(), clusterService.getLocalNode().id()) &&
+            if (Objects.equals(leadership.leaderNodeId(), clusterService.getLocalNode().id()) &&
                     leadership.topic().startsWith(ELECTION_PREFIX)) {
 
-                // See if we need to let some partitions go
-                scheduleRebalance(0);
-
                 eventDispatcher.post(new IntentPartitionEvent(IntentPartitionEvent.Type.LEADER_CHANGED,
                                                         leadership.topic()));
             }
-        }
-    }
 
-    private final class InternalClusterEventListener implements
-            ClusterEventListener {
-
-        @Override
-        public void event(ClusterEvent event) {
-            scheduleRebalance(0);
+            if (event.type() == LeadershipEvent.Type.CANDIDATES_CHANGED) {
+                scheduleRebalance(0);
+            }
         }
     }
 }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
index 44fbea0..a2a0081 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
@@ -16,7 +16,6 @@
 package org.onosproject.store.mastership.impl;
 
 import static org.onlab.util.Tools.groupedThreads;
-import static org.onlab.util.Tools.futureGetOrElse;
 import static org.onosproject.mastership.MastershipEvent.Type.BACKUPS_CHANGED;
 import static org.onosproject.mastership.MastershipEvent.Type.MASTER_CHANGED;
 import static org.slf4j.LoggerFactory.getLogger;
@@ -43,6 +42,7 @@
 import org.onlab.util.KryoNamespace;
 import org.onosproject.cluster.ClusterService;
 import org.onosproject.cluster.Leadership;
+import org.onosproject.cluster.LeadershipAdminService;
 import org.onosproject.cluster.LeadershipEvent;
 import org.onosproject.cluster.LeadershipEventListener;
 import org.onosproject.cluster.LeadershipService;
@@ -63,9 +63,9 @@
 import org.slf4j.Logger;
 
 import com.google.common.base.Objects;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
 
 /**
  * Implementation of the MastershipStore on top of Leadership Service.
@@ -82,18 +82,18 @@
     protected LeadershipService leadershipService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected LeadershipAdminService leadershipAdminService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected ClusterService clusterService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected ClusterCommunicationService clusterCommunicator;
 
     private NodeId localNodeId;
-    private final Set<DeviceId> connectedDevices = Sets.newHashSet();
 
     private static final MessageSubject ROLE_RELINQUISH_SUBJECT =
             new MessageSubject("mastership-store-device-role-relinquish");
-    private static final MessageSubject TRANSITION_FROM_MASTER_TO_STANDBY_SUBJECT =
-            new MessageSubject("mastership-store-device-mastership-relinquish");
 
     private static final Pattern DEVICE_MASTERSHIP_TOPIC_PATTERN =
             Pattern.compile("device:(.*)");
@@ -132,11 +132,6 @@
                 this::relinquishLocalRole,
                 SERIALIZER::encode,
                 messageHandlingExecutor);
-        clusterCommunicator.addSubscriber(TRANSITION_FROM_MASTER_TO_STANDBY_SUBJECT,
-                SERIALIZER::decode,
-                this::transitionFromMasterToStandby,
-                SERIALIZER::encode,
-                messageHandlingExecutor);
         localNodeId = clusterService.getLocalNode().id();
         leadershipService.addListener(leadershipEventListener);
 
@@ -146,7 +141,6 @@
     @Deactivate
     public void deactivate() {
         clusterCommunicator.removeSubscriber(ROLE_RELINQUISH_SUBJECT);
-        clusterCommunicator.removeSubscriber(TRANSITION_FROM_MASTER_TO_STANDBY_SUBJECT);
         messageHandlingExecutor.shutdown();
         transferExecutor.shutdown();
         leadershipService.removeListener(leadershipEventListener);
@@ -159,12 +153,9 @@
         checkArgument(deviceId != null, DEVICE_ID_NULL);
 
         String leadershipTopic = createDeviceMastershipTopic(deviceId);
-        connectedDevices.add(deviceId);
-        return leadershipService.runForLeadership(leadershipTopic)
-                .thenApply(leadership -> {
-                    return Objects.equal(localNodeId, leadership.leader())
-                            ? MastershipRole.MASTER : MastershipRole.STANDBY;
-                });
+        Leadership leadership = leadershipService.runForLeadership(leadershipTopic);
+        return CompletableFuture.completedFuture(localNodeId.equals(leadership.leaderNodeId())
+                ? MastershipRole.MASTER : MastershipRole.STANDBY);
     }
 
     @Override
@@ -173,20 +164,19 @@
         checkArgument(deviceId != null, DEVICE_ID_NULL);
 
         String leadershipTopic = createDeviceMastershipTopic(deviceId);
-        NodeId leader = leadershipService.getLeader(leadershipTopic);
-        if (Objects.equal(nodeId, leader)) {
-            return MastershipRole.MASTER;
-        }
-        return leadershipService.getCandidates(leadershipTopic).contains(nodeId) ?
-                MastershipRole.STANDBY : MastershipRole.NONE;
+        Leadership leadership = leadershipService.getLeadership(leadershipTopic);
+        NodeId leader = leadership == null ? null : leadership.leaderNodeId();
+        List<NodeId> candidates = leadership == null ?
+                ImmutableList.of() : ImmutableList.copyOf(leadership.candidates());
+        return Objects.equal(nodeId, leader) ?
+                MastershipRole.MASTER : candidates.contains(nodeId) ? MastershipRole.STANDBY : MastershipRole.NONE;
     }
 
     @Override
     public NodeId getMaster(DeviceId deviceId) {
         checkArgument(deviceId != null, DEVICE_ID_NULL);
 
-        String leadershipTopic = createDeviceMastershipTopic(deviceId);
-        return leadershipService.getLeader(leadershipTopic);
+        return leadershipService.getLeader(createDeviceMastershipTopic(deviceId));
     }
 
     @Override
@@ -194,9 +184,8 @@
         checkArgument(deviceId != null, DEVICE_ID_NULL);
 
         Map<NodeId, MastershipRole> roles = Maps.newHashMap();
-        clusterService
-            .getNodes()
-            .forEach((node) -> roles.put(node.id(), getRole(node.id(), deviceId)));
+        clusterService.getNodes()
+                      .forEach((node) -> roles.put(node.id(), getRole(node.id(), deviceId)));
 
         NodeId master = null;
         final List<NodeId> standbys = Lists.newLinkedList();
@@ -233,30 +222,10 @@
         checkArgument(nodeId != null, NODE_ID_NULL);
         checkArgument(deviceId != null, DEVICE_ID_NULL);
 
-        NodeId currentMaster = getMaster(deviceId);
-        if (nodeId.equals(currentMaster)) {
-            return CompletableFuture.completedFuture(null);
-        } else {
-            String leadershipTopic = createDeviceMastershipTopic(deviceId);
-            List<NodeId> candidates = leadershipService.getCandidates(leadershipTopic);
-            if (candidates.isEmpty()) {
-                return  CompletableFuture.completedFuture(null);
-            }
-            if (leadershipService.makeTopCandidate(leadershipTopic, nodeId)) {
-                CompletableFuture<MastershipEvent> result = new CompletableFuture<>();
-                // There is brief wait before we step down from mastership.
-                // This is to ensure any work that happens when standby preference
-                // order changes can complete. For example: flow entries need to be backed
-                // to the new top standby (ONOS-1883)
-                // FIXME: This potentially introduces a race-condition.
-                // Right now role changes are only forced via CLI.
-                transferExecutor.schedule(() -> {
-                    result.complete(transitionFromMasterToStandby(deviceId));
-                }, WAIT_BEFORE_MASTERSHIP_HANDOFF_MILLIS, TimeUnit.MILLISECONDS);
-                return result;
-            } else {
-                log.warn("Failed to promote {} to mastership for {}", nodeId, deviceId);
-            }
+        String leadershipTopic = createDeviceMastershipTopic(deviceId);
+        if (leadershipAdminService.promoteToTopOfCandidateList(leadershipTopic, nodeId)) {
+            transferExecutor.schedule(() -> leadershipAdminService.transferLeadership(leadershipTopic, nodeId),
+                    WAIT_BEFORE_MASTERSHIP_HANDOFF_MILLIS, TimeUnit.MILLISECONDS);
         }
         return CompletableFuture.completedFuture(null);
     }
@@ -267,7 +236,7 @@
 
         String leadershipTopic = createDeviceMastershipTopic(deviceId);
         Leadership leadership = leadershipService.getLeadership(leadershipTopic);
-        return leadership != null ? MastershipTerm.of(leadership.leader(), leadership.epoch()) : null;
+        return leadership != null ? MastershipTerm.of(leadership.leaderNodeId(), leadership.leader().term()) : null;
     }
 
     @Override
@@ -318,71 +287,44 @@
     private CompletableFuture<MastershipEvent> relinquishLocalRole(DeviceId deviceId) {
         checkArgument(deviceId != null, DEVICE_ID_NULL);
 
-        // Check if this node is can be managed by this node.
-        if (!connectedDevices.contains(deviceId)) {
+        String leadershipTopic = createDeviceMastershipTopic(deviceId);
+        if (!leadershipService.getCandidates(leadershipTopic).contains(localNodeId)) {
             return CompletableFuture.completedFuture(null);
         }
-
-        String leadershipTopic = createDeviceMastershipTopic(deviceId);
-        NodeId currentLeader = leadershipService.getLeader(leadershipTopic);
-
-        MastershipEvent.Type eventType = Objects.equal(currentLeader, localNodeId)
-            ? MastershipEvent.Type.MASTER_CHANGED
-            : MastershipEvent.Type.BACKUPS_CHANGED;
-
-        connectedDevices.remove(deviceId);
-        return leadershipService.withdraw(leadershipTopic)
-                                .thenApply(v -> new MastershipEvent(eventType, deviceId, getNodes(deviceId)));
-    }
-
-    private MastershipEvent transitionFromMasterToStandby(DeviceId deviceId) {
-        checkArgument(deviceId != null, DEVICE_ID_NULL);
-
-        NodeId currentMaster = getMaster(deviceId);
-        if (currentMaster == null) {
-            return null;
-        }
-
-        if (!currentMaster.equals(localNodeId)) {
-            log.info("Forwarding request to relinquish "
-                    + "mastership for device {} to {}", deviceId, currentMaster);
-            return futureGetOrElse(clusterCommunicator.sendAndReceive(
-                    deviceId,
-                    TRANSITION_FROM_MASTER_TO_STANDBY_SUBJECT,
-                    SERIALIZER::encode,
-                    SERIALIZER::decode,
-                    currentMaster), null);
-        }
-
-        return leadershipService.stepdown(createDeviceMastershipTopic(deviceId))
-                ? new MastershipEvent(MastershipEvent.Type.MASTER_CHANGED, deviceId, getNodes(deviceId)) : null;
+        MastershipEvent.Type eventType = localNodeId.equals(leadershipService.getLeader(leadershipTopic)) ?
+                MastershipEvent.Type.MASTER_CHANGED : MastershipEvent.Type.BACKUPS_CHANGED;
+        leadershipService.withdraw(leadershipTopic);
+        return CompletableFuture.completedFuture(new MastershipEvent(eventType, deviceId, getNodes(deviceId)));
     }
 
     @Override
     public void relinquishAllRole(NodeId nodeId) {
-        // Noop. LeadershipService already takes care of detecting and purging deadlocks.
+        // Noop. LeadershipService already takes care of detecting and purging stale locks.
     }
 
     private class InternalDeviceMastershipEventListener implements LeadershipEventListener {
+
+        @Override
+        public boolean isRelevant(LeadershipEvent event) {
+            Leadership leadership = event.subject();
+            return isDeviceMastershipTopic(leadership.topic());
+        }
+
         @Override
         public void event(LeadershipEvent event) {
             Leadership leadership = event.subject();
-            if (!isDeviceMastershipTopic(leadership.topic())) {
-                return;
-            }
             DeviceId deviceId = extractDeviceIdFromTopic(leadership.topic());
+            RoleInfo roleInfo = getNodes(deviceId);
             switch (event.type()) {
-            case LEADER_ELECTED:
-                notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, getNodes(deviceId)));
+            case LEADER_AND_CANDIDATES_CHANGED:
+                notifyDelegate(new MastershipEvent(BACKUPS_CHANGED, deviceId, roleInfo));
+                notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, roleInfo));
                 break;
-            case LEADER_REELECTED:
-                // There is no concept of leader re-election in the new distributed leadership manager.
-                throw new IllegalStateException("Unexpected event type");
-            case LEADER_BOOTED:
-                notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, getNodes(deviceId)));
+            case LEADER_CHANGED:
+                notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, roleInfo));
                 break;
             case CANDIDATES_CHANGED:
-                notifyDelegate(new MastershipEvent(BACKUPS_CHANGED, deviceId, getNodes(deviceId)));
+                notifyDelegate(new MastershipEvent(BACKUPS_CHANGED, deviceId, roleInfo));
                 break;
             default:
                 return;
@@ -407,5 +349,4 @@
         Matcher m = DEVICE_MASTERSHIP_TOPIC_PATTERN.matcher(topic);
         return m.matches();
     }
-
-}
+}
\ No newline at end of file