LeadershipStore updates:
 - Now tracking leader and candidates for a topic using a single map.
 - Using term numbers that are incremented by one every time a new leader is elected.
 - Introduced a separate LeadershipStore to conform to the  manager-store pattern

Change-Id: I1d03a6c5e8ff0e68ef0c1e3a6c2d425c4856e470
diff --git a/apps/cip/src/main/java/org/onosproject/cip/ClusterIpManager.java b/apps/cip/src/main/java/org/onosproject/cip/ClusterIpManager.java
index a3cf7b3..a6f3d1c 100644
--- a/apps/cip/src/main/java/org/onosproject/cip/ClusterIpManager.java
+++ b/apps/cip/src/main/java/org/onosproject/cip/ClusterIpManager.java
@@ -16,6 +16,7 @@
 package org.onosproject.cip;
 
 import com.google.common.io.ByteStreams;
+
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -93,7 +94,7 @@
         cfgService.registerProperties(getClass());
 
         localId = clusterService.getLocalNode().id();
-        processLeadershipChange(leadershipService.getLeader(CLUSTER_IP));
+        processLeaderChange(leadershipService.getLeader(CLUSTER_IP));
 
         leadershipService.addListener(listener);
         leadershipService.runForLeadership(CLUSTER_IP);
@@ -137,10 +138,7 @@
         }
     }
 
-    private synchronized void processLeadershipChange(NodeId newLeader) {
-        if (newLeader == null) {
-            return;
-        }
+    private synchronized void processLeaderChange(NodeId newLeader) {
         boolean isLeader = Objects.equals(newLeader, localId);
         log.info("Processing leadership change; wasLeader={}, isLeader={}", wasLeader, isLeader);
         if (!wasLeader && isLeader) {
@@ -189,11 +187,15 @@
 
     // Listens for leadership changes.
     private class InternalLeadershipListener implements LeadershipEventListener {
+
+        @Override
+        public boolean isRelevant(LeadershipEvent event) {
+            return CLUSTER_IP.equals(event.subject().topic());
+        }
+
         @Override
         public void event(LeadershipEvent event) {
-            if (event.subject().topic().equals(CLUSTER_IP)) {
-                processLeadershipChange(event.subject().leader());
-            }
+             processLeaderChange(event.subject().leaderNodeId());
         }
     }
 
diff --git a/apps/mlb/src/main/java/org/onosproject/mlb/MastershipLoadBalancer.java b/apps/mlb/src/main/java/org/onosproject/mlb/MastershipLoadBalancer.java
index bcf4e2e..b2a33a9 100644
--- a/apps/mlb/src/main/java/org/onosproject/mlb/MastershipLoadBalancer.java
+++ b/apps/mlb/src/main/java/org/onosproject/mlb/MastershipLoadBalancer.java
@@ -19,6 +19,7 @@
 import com.google.common.util.concurrent.ListenableScheduledFuture;
 import com.google.common.util.concurrent.ListeningScheduledExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
+
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -105,10 +106,7 @@
         log.info("Stopped");
     }
 
-    private synchronized void processLeadershipChange(NodeId newLeader) {
-        if (newLeader == null) {
-            return;
-        }
+    private synchronized void processLeaderChange(NodeId newLeader) {
         boolean currLeader = newLeader.equals(localId);
         if (isLeader.getAndSet(currLeader) != currLeader) {
             if (currLeader) {
@@ -159,7 +157,7 @@
 
         @Override
         public void event(LeadershipEvent event) {
-            processLeadershipChange(event.subject().leader());
+            processLeaderChange(event.subject().leaderNodeId());
         }
     }
-}
\ No newline at end of file
+}
diff --git a/apps/routing/src/main/java/org/onosproject/routing/impl/IntentSynchronizer.java b/apps/routing/src/main/java/org/onosproject/routing/impl/IntentSynchronizer.java
index 22c7d43..1811049 100644
--- a/apps/routing/src/main/java/org/onosproject/routing/impl/IntentSynchronizer.java
+++ b/apps/routing/src/main/java/org/onosproject/routing/impl/IntentSynchronizer.java
@@ -26,6 +26,7 @@
 import org.onosproject.cluster.LeadershipEvent;
 import org.onosproject.cluster.LeadershipEventListener;
 import org.onosproject.cluster.LeadershipService;
+import org.onosproject.cluster.NodeId;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreService;
 import org.onosproject.net.intent.Intent;
@@ -43,7 +44,6 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 
@@ -74,6 +74,7 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected IntentService intentService;
 
+    private NodeId localNodeId;
     private ApplicationId appId;
 
     private final InternalLeadershipListener leadershipEventListener =
@@ -89,7 +90,7 @@
     @Activate
     public void activate() {
         intentsSynchronizerExecutor = createExecutor();
-
+        this.localNodeId = clusterService.getLocalNode().id();
         this.appId = coreService.registerApplication(APP_NAME);
 
         leadershipService.addListener(leadershipEventListener);
@@ -268,27 +269,22 @@
     private class InternalLeadershipListener implements LeadershipEventListener {
 
         @Override
-        public void event(LeadershipEvent event) {
-            if (!event.subject().topic().equals(appId.name())) {
-                // Not our topic: ignore
-                return;
-            }
-            if (!Objects.equals(event.subject().leader(),
-                    clusterService.getLocalNode().id())) {
-                // The event is not about this instance: ignore
-                return;
-            }
+        public boolean isRelevant(LeadershipEvent event) {
+            return event.subject().topic().equals(appId.name());
+        }
 
+        @Override
+        public void event(LeadershipEvent event) {
             switch (event.type()) {
-            case LEADER_ELECTED:
-                log.info("IntentSynchronizer gained leadership");
-                leaderChanged(true);
-                break;
-            case LEADER_BOOTED:
-                log.info("IntentSynchronizer lost leadership");
-                leaderChanged(false);
-                break;
-            case LEADER_REELECTED:
+            case LEADER_CHANGED:
+            case LEADER_AND_CANDIDATES_CHANGED:
+                if (localNodeId.equals(event.subject().leaderNodeId())) {
+                    log.info("IntentSynchronizer gained leadership");
+                    leaderChanged(true);
+                } else {
+                    log.info("IntentSynchronizer leader changed. New leader is {}", event.subject().leaderNodeId());
+                    leaderChanged(false);
+                }
             default:
                 break;
             }
diff --git a/apps/routing/src/test/java/org/onosproject/routing/impl/IntentSynchronizerTest.java b/apps/routing/src/test/java/org/onosproject/routing/impl/IntentSynchronizerTest.java
index d265323..351a099 100644
--- a/apps/routing/src/test/java/org/onosproject/routing/impl/IntentSynchronizerTest.java
+++ b/apps/routing/src/test/java/org/onosproject/routing/impl/IntentSynchronizerTest.java
@@ -17,6 +17,7 @@
 
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.MoreExecutors;
+
 import org.junit.Before;
 import org.junit.Test;
 import org.onlab.junit.TestUtils;
@@ -29,7 +30,11 @@
 import org.onlab.packet.MacAddress;
 import org.onlab.packet.VlanId;
 import org.onosproject.TestApplicationId;
+import org.onosproject.cluster.ClusterServiceAdapter;
+import org.onosproject.cluster.ControllerNode;
+import org.onosproject.cluster.DefaultControllerNode;
 import org.onosproject.cluster.LeadershipServiceAdapter;
+import org.onosproject.cluster.NodeId;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreServiceAdapter;
 import org.onosproject.incubator.net.intf.Interface;
@@ -94,6 +99,9 @@
     private static final ApplicationId APPID =
             TestApplicationId.create("intent-sync-test");
 
+    private static final ControllerNode LOCAL_NODE =
+            new DefaultControllerNode(new NodeId("foo"), IpAddress.valueOf("127.0.0.1"));
+
     @Before
     public void setUp() throws Exception {
         super.setUp();
@@ -105,6 +113,7 @@
         intentSynchronizer = new TestIntentSynchronizer();
 
         intentSynchronizer.coreService = new TestCoreService();
+        intentSynchronizer.clusterService = new TestClusterService();
         intentSynchronizer.leadershipService = new TestLeadershipService();
         intentSynchronizer.intentService = intentService;
 
@@ -441,6 +450,13 @@
         }
     }
 
+    private class TestClusterService extends ClusterServiceAdapter {
+        @Override
+        public ControllerNode getLocalNode() {
+            return LOCAL_NODE;
+        }
+    }
+
     private class TestLeadershipService extends LeadershipServiceAdapter {
 
     }
diff --git a/apps/test/election/src/main/java/org/onosproject/election/ElectionTest.java b/apps/test/election/src/main/java/org/onosproject/election/ElectionTest.java
index b2a7207..e0d3b27 100644
--- a/apps/test/election/src/main/java/org/onosproject/election/ElectionTest.java
+++ b/apps/test/election/src/main/java/org/onosproject/election/ElectionTest.java
@@ -24,12 +24,11 @@
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.onosproject.cluster.ClusterService;
 import org.onosproject.core.CoreService;
-import org.onosproject.cluster.ControllerNode;
 import org.onosproject.cluster.LeadershipEvent;
 import org.onosproject.cluster.LeadershipEventListener;
 import org.onosproject.cluster.LeadershipService;
+import org.onosproject.cluster.NodeId;
 import org.onosproject.core.ApplicationId;
-
 import org.slf4j.Logger;
 
 
@@ -56,7 +55,7 @@
     private LeadershipEventListener leadershipEventListener =
             new InnerLeadershipEventListener();
 
-    private ControllerNode localControllerNode;
+    private NodeId localNodeId;
 
 
     @Activate
@@ -65,7 +64,7 @@
 
         appId = coreService.registerApplication(ELECTION_APP);
 
-        localControllerNode = clusterService.getLocalNode();
+        localNodeId = clusterService.getLocalNode().id();
 
         leadershipService.addListener(leadershipEventListener);
     }
@@ -100,20 +99,10 @@
             log.debug("Leadership Event: time = {} type = {} event = {}",
                     event.time(), event.type(), event);
 
-            if (!event.subject().leader().equals(
-                    localControllerNode.id())) {
-                return;         // The event is not about this instance: ignore
-            }
-
             switch (event.type()) {
-                case LEADER_ELECTED:
-                    log.info("Election-test app leader elected");
-                    break;
-                case LEADER_BOOTED:
-                    log.info("Election-test app lost election");
-                    break;
-                case LEADER_REELECTED:
-                    log.debug("Election-test app was re-elected");
+                case LEADER_CHANGED:
+                case LEADER_AND_CANDIDATES_CHANGED:
+                    log.info("Election-test app leader changed. New leadership: {}", event.subject());
                     break;
                 default:
                     break;
diff --git a/cli/src/main/java/org/onosproject/cli/net/LeaderCommand.java b/cli/src/main/java/org/onosproject/cli/net/LeaderCommand.java
index da4ab13..6e2901d 100644
--- a/cli/src/main/java/org/onosproject/cli/net/LeaderCommand.java
+++ b/cli/src/main/java/org/onosproject/cli/net/LeaderCommand.java
@@ -40,7 +40,7 @@
         description = "Finds the leader for particular topic.")
 public class LeaderCommand extends AbstractShellCommand {
 
-    private static final String FMT = "%-30s | %-15s | %-6s | %-10s |";
+    private static final String FMT = "%-30s | %-15s | %-5s | %-10s |";
     private static final String FMT_C = "%-30s | %-15s | %-19s |";
     private boolean allTopics;
     private Pattern pattern;
@@ -57,19 +57,8 @@
     /**
      * Compares leaders, sorting by toString() output.
      */
-    private Comparator<Leadership> leadershipComparator =
-            (e1, e2) -> {
-                if (e1.leader() == null && e2.leader() == null) {
-                    return 0;
-                }
-                if (e1.leader() == null) {
-                    return 1;
-                }
-                if (e2.leader() == null) {
-                    return -1;
-                }
-                return e1.leader().toString().compareTo(e2.leader().toString());
-            };
+    private Comparator<Leadership> leadershipComparator = (l1, l2) ->
+        String.valueOf(l1.leaderNodeId()).compareTo(String.valueOf(l2.leaderNodeId()));
 
     /**
      * Displays text representing the leaders.
@@ -78,18 +67,19 @@
      */
     private void displayLeaders(Map<String, Leadership> leaderBoard) {
         print("------------------------------------------------------------------------");
-        print(FMT, "Topic", "Leader", "Epoch", "Elected");
+        print(FMT, "Topic", "Leader", "Term", "Elected");
         print("------------------------------------------------------------------------");
 
         leaderBoard.values()
                 .stream()
                 .filter(l -> allTopics || pattern.matcher(l.topic()).matches())
+                .filter(l -> l.leader() != null)
                 .sorted(leadershipComparator)
                 .forEach(l -> print(FMT,
                         l.topic(),
-                        l.leader(),
-                        l.epoch(),
-                        Tools.timeAgo(l.electedTime())));
+                        l.leaderNodeId(),
+                        l.leader().term(),
+                        Tools.timeAgo(l.leader().termStartTime())));
         print("------------------------------------------------------------------------");
     }
 
@@ -110,7 +100,7 @@
                         Leadership l = leaderBoard.get(es.getKey());
                         print(FMT_C,
                             es.getKey(),
-                            l == null ? "null" : l.leader(),
+                            String.valueOf(l.leaderNodeId()),
                             // formatting hacks to get it into a table
                             list.get(0).toString());
                             list.subList(1, list.size()).forEach(n -> print(FMT_C, " ", " ", n));
@@ -134,10 +124,10 @@
                         result.add(
                             mapper.createObjectNode()
                                 .put("topic", l.topic())
-                                .put("leader", l.leader().toString())
+                                .put("leader", String.valueOf(l.leaderNodeId()))
                                 .put("candidates", l.candidates().toString())
-                                .put("epoch", l.epoch())
-                                .put("electedTime", Tools.timeAgo(l.electedTime()))));
+                                .put("epoch", l.leader().term())
+                                .put("epochStartTime", Tools.timeAgo(l.leader().termStartTime()))));
 
         return result;
     }
diff --git a/core/api/src/main/java/org/onosproject/cluster/Leader.java b/core/api/src/main/java/org/onosproject/cluster/Leader.java
new file mode 100644
index 0000000..8e5528d
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/cluster/Leader.java
@@ -0,0 +1,96 @@
+/*
+ * Copyright 2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.cluster;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Objects;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkArgument;
+
+/**
+ * Topic leader.
+ * <p>
+ * Identified by the {@link NodeId node identifier} and a monotonically increasing term number.
+ * The term number is incremented by one every time a new node is elected as leader.
+ * Also available is the system clock time at the instant when this node was elected as leader.
+ * Keep in mind though that as with any system clock based time stamps this particular information
+ * susceptible to clock skew and should only be relied on for simple diagnostic purposes.
+ */
+public class Leader {
+    private final NodeId nodeId;
+    private final long term;
+    private final long termStartTime;
+
+    public Leader(NodeId nodeId, long term, long termStartTime) {
+        this.nodeId = checkNotNull(nodeId);
+        checkArgument(term >= 0, "term must be non-negative");
+        this.term = term;
+        checkArgument(termStartTime >= 0, "termStartTime must be non-negative");
+        this.termStartTime = termStartTime;
+    }
+
+    /**
+     * Returns the identifier for of leader.
+     * @return node identifier
+     */
+    public NodeId nodeId() {
+        return nodeId;
+    }
+
+    /**
+     * Returns the leader's term.
+     * @return leader term
+     */
+    public long term() {
+        return term;
+    }
+
+    /**
+     * Returns the system time when the current leadership term started.
+     * @return current leader term start time
+     */
+    public long termStartTime() {
+        return termStartTime;
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (this == other) {
+            return true;
+        }
+        if (other != null && other instanceof Leader) {
+            Leader that = (Leader) other;
+            return Objects.equal(this.nodeId, that.nodeId) &&
+                    this.term ==  that.term &&
+                    this.termStartTime == that.termStartTime;
+        }
+        return false;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hashCode(nodeId, term, termStartTime);
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(getClass())
+            .add("nodeId", nodeId)
+            .add("term", term)
+            .add("termStartTime", termStartTime)
+            .toString();
+    }
+}
diff --git a/core/api/src/main/java/org/onosproject/cluster/Leadership.java b/core/api/src/main/java/org/onosproject/cluster/Leadership.java
index 113e19c..b31c03f 100644
--- a/core/api/src/main/java/org/onosproject/cluster/Leadership.java
+++ b/core/api/src/main/java/org/onosproject/cluster/Leadership.java
@@ -17,63 +17,31 @@
 
 import java.util.Objects;
 import java.util.List;
-import java.util.Optional;
-
-import org.joda.time.DateTime;
 
 import com.google.common.base.MoreObjects;
 import com.google.common.collect.ImmutableList;
 
 /**
- * Abstract leadership concept. The information carried by this construct
- * include the topic of contention, the {@link NodeId}s of Nodes that could
- * become leader for the topic, the epoch when the term for a given leader
- * began, and the system time when the term began. Note:
- * <ul>
- * <li>The list of NodeIds may include the current leader at index 0, and the
- * rest in decreasing preference order.</li>
- * <li>The epoch is the logical age of a Leadership construct, and should be
- * used for comparing two Leaderships, but only of the same topic.</li>
- * <li>The leader may be null if its accuracy can't be guaranteed. This applies
- * to CANDIDATES_CHANGED events and candidate board contents.</li>
- * </ul>
+ * State of leadership for topic.
+ * <p>
+ * Provided by this construct is the current {@link Leader leader} and the list of
+ * {@link NodeId nodeId}s currently registered as candidates for election for the topic.
+ * Keep in mind that only registered candidates can become leaders.
  */
 public class Leadership {
 
     private final String topic;
-    private final Optional<NodeId> leader;
+    private final Leader leader;
     private final List<NodeId> candidates;
-    private final long epoch;
-    private final long electedTime;
 
-    public Leadership(String topic, NodeId leader, long epoch, long electedTime) {
+    public Leadership(String topic, Leader leader, List<NodeId> candidates) {
         this.topic = topic;
-        this.leader = Optional.of(leader);
-        this.candidates = ImmutableList.of(leader);
-        this.epoch = epoch;
-        this.electedTime = electedTime;
-    }
-
-    public Leadership(String topic, NodeId leader, List<NodeId> candidates,
-            long epoch, long electedTime) {
-        this.topic = topic;
-        this.leader = Optional.of(leader);
+        this.leader = leader;
         this.candidates = ImmutableList.copyOf(candidates);
-        this.epoch = epoch;
-        this.electedTime = electedTime;
-    }
-
-    public Leadership(String topic, List<NodeId> candidates,
-            long epoch, long electedTime) {
-        this.topic = topic;
-        this.leader = Optional.empty();
-        this.candidates = ImmutableList.copyOf(candidates);
-        this.epoch = epoch;
-        this.electedTime = electedTime;
     }
 
     /**
-     * The topic for which this leadership applies.
+     * Returns the leadership topic.
      *
      * @return leadership topic.
      */
@@ -82,13 +50,21 @@
     }
 
     /**
-     * The nodeId of leader for this topic.
+     * Returns the {@link NodeId nodeId} of the leader.
      *
-     * @return leader node.
+     * @return leader node identifier; will be null if there is no leader
      */
-    // This will return Optional<NodeId> in the future.
-    public NodeId leader() {
-        return leader.orElse(null);
+    public NodeId leaderNodeId() {
+        return leader == null ? null : leader.nodeId();
+    }
+
+    /**
+     * Returns the leader for this topic.
+     *
+     * @return leader; will be null if there is no leader for topic
+     */
+    public Leader leader() {
+        return leader;
     }
 
     /**
@@ -101,38 +77,9 @@
         return candidates;
     }
 
-    /**
-     * The epoch when the leadership was assumed.
-     * <p>
-     * Comparing epochs is only appropriate for leadership events for the same
-     * topic. The system guarantees that for any given topic the epoch for a new
-     * term is higher (not necessarily by 1) than the epoch for any previous
-     * term.
-     *
-     * @return leadership epoch
-     */
-    public long epoch() {
-        return epoch;
-    }
-
-    /**
-     * The system time when the term started.
-     * <p>
-     * The elected time is initially set on the node coordinating
-     * the leader election using its local system time. Due to possible
-     * clock skew, relying on this value for determining event ordering
-     * is discouraged. Epoch is more appropriate for determining
-     * event ordering.
-     *
-     * @return elected time.
-     */
-    public long electedTime() {
-        return electedTime;
-    }
-
     @Override
     public int hashCode() {
-        return Objects.hash(topic, leader, candidates, epoch, electedTime);
+        return Objects.hash(topic, leader, candidates);
     }
 
     @Override
@@ -144,9 +91,7 @@
             final Leadership other = (Leadership) obj;
             return Objects.equals(this.topic, other.topic) &&
                     Objects.equals(this.leader, other.leader) &&
-                    Objects.equals(this.candidates, other.candidates) &&
-                    Objects.equals(this.epoch, other.epoch) &&
-                    Objects.equals(this.electedTime, other.electedTime);
+                    Objects.equals(this.candidates, other.candidates);
         }
         return false;
     }
@@ -157,8 +102,6 @@
             .add("topic", topic)
             .add("leader", leader)
             .add("candidates", candidates)
-            .add("epoch", epoch)
-            .add("electedTime", new DateTime(electedTime))
             .toString();
     }
 }
diff --git a/core/api/src/main/java/org/onosproject/cluster/LeadershipAdminService.java b/core/api/src/main/java/org/onosproject/cluster/LeadershipAdminService.java
new file mode 100644
index 0000000..c58ca71
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/cluster/LeadershipAdminService.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.cluster;
+
+/**
+ * Interface for administratively manipulating leadership assignments.
+ */
+public interface LeadershipAdminService {
+
+    /**
+     * Attempts to assign leadership for a topic to a specified node.
+     * @param topic leadership topic
+     * @param nodeId identifier of the node to be made leader
+     * @return true is the transfer was successfully executed. This method returns {@code false}
+     * if {@code nodeId} is not one of the candidates for for the topic.
+     */
+    boolean transferLeadership(String topic, NodeId nodeId);
+
+    /**
+     * Make a node to be the next leader by promoting it to top of candidate list.
+     * @param topic leadership topic
+     * @param nodeId identifier of node to be next leader
+     * @return {@code true} if nodeId is now the top candidate. This method returns {@code false}
+     * if {@code nodeId} is not one of the candidates for for the topic.
+     */
+    boolean promoteToTopOfCandidateList(String topic, NodeId nodeId);
+
+    /**
+     * Removes all active leadership registrations for a given node.
+     * <p>
+     * This method will also evict the node from leaderships that it currently owns.
+     * @param nodeId node identifier
+     */
+    void unregister(NodeId nodeId);
+}
diff --git a/core/api/src/main/java/org/onosproject/cluster/LeadershipEvent.java b/core/api/src/main/java/org/onosproject/cluster/LeadershipEvent.java
index d1fe670..6fb310d 100644
--- a/core/api/src/main/java/org/onosproject/cluster/LeadershipEvent.java
+++ b/core/api/src/main/java/org/onosproject/cluster/LeadershipEvent.java
@@ -27,33 +27,25 @@
 public class LeadershipEvent extends AbstractEvent<LeadershipEvent.Type, Leadership> {
 
     /**
-     * Type of leadership-related events.
+     * Type of leadership events.
      */
     public enum Type {
         /**
-         * Signifies that the leader has been elected.
-         * The event subject is the new leader.
-         * This event does not guarantee accurate candidate information.
+         * Signifies a change in both the leader as well as change to the list of candidates. Keep in mind though that
+         * the first node entering the race will trigger this event as it will become a candidate and automatically get
+         * promoted to become leader.
          */
-        LEADER_ELECTED,
+        LEADER_AND_CANDIDATES_CHANGED,
 
         /**
-         * Signifies that the leader has been re-elected.
-         * The event subject is the leader.
-         * This event does not guarantee accurate candidate information.
+         * Signifies that the leader for a topic has changed.
          */
-        LEADER_REELECTED,
+        // TODO: We may not need this. We currently do not support a way for a current leader to step down
+        // while still reamining a candidate
+        LEADER_CHANGED,
 
         /**
-         * Signifies that the leader has been booted and lost leadership.
-         * The event subject is the former leader.
-         * This event does not guarantee accurate candidate information.
-         */
-        LEADER_BOOTED,
-
-        /**
-         * Signifies that the list of candidates for leadership for a topic has
-         * changed. This event does not guarantee accurate leader information.
+         * Signifies a change in the list of candidates for a topic.
          */
         CANDIDATES_CHANGED
     }
diff --git a/core/api/src/main/java/org/onosproject/cluster/LeadershipService.java b/core/api/src/main/java/org/onosproject/cluster/LeadershipService.java
index 7d1f607..b0f7a8d 100644
--- a/core/api/src/main/java/org/onosproject/cluster/LeadershipService.java
+++ b/core/api/src/main/java/org/onosproject/cluster/LeadershipService.java
@@ -17,87 +17,73 @@
 
 import org.onosproject.event.ListenerService;
 
+import com.google.common.base.Objects;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.CompletableFuture;
 
 /**
  * Service for leader election.
+ * <p>
  * Leadership contests are organized around topics. A instance can join the
  * leadership race for a topic or withdraw from a race it has previously joined.
+ * <p>
  * Listeners can be added to receive notifications asynchronously for various
  * leadership contests.
+ * <p>
+ * When a node gets elected as a leader for a topic, all nodes receive notifications
+ * indicating a change in leadership.
  */
 public interface LeadershipService
     extends ListenerService<LeadershipEvent, LeadershipEventListener> {
 
     /**
-     * Returns the current leader for the topic.
+     * Returns the {@link NodeId node identifier} that is the current leader for a topic.
      *
-     * @param path topic
-     * @return nodeId of the leader, null if so such topic exists.
+     * @param topic leadership topic
+     * @return node identifier of the current leader; {@code null} if there is no leader for the topic
      */
-    NodeId getLeader(String path);
+    default NodeId getLeader(String topic) {
+        Leadership leadership = getLeadership(topic);
+        return leadership == null ? null : leadership.leaderNodeId();
+    }
 
     /**
-     * Returns the current leadership info for the topic.
+     * Returns the current {@link Leadership leadership} for a topic.
      *
-     * @param path topic
-     * @return leadership info or null if so such topic exists.
+     * @param topic leadership topic
+     * @return leadership or {@code null} if no such topic exists
      */
-    Leadership getLeadership(String path);
+    Leadership getLeadership(String topic);
 
     /**
-     * Returns the set of topics owned by the specified node.
+     * Returns the set of topics owned by the specified {@link NodeId node}.
      *
-     * @param nodeId node Id.
+     * @param nodeId node identifier.
      * @return set of topics for which this node is the current leader.
      */
-    Set<String> ownedTopics(NodeId nodeId);
+    default Set<String> ownedTopics(NodeId nodeId) {
+        return Maps.filterValues(getLeaderBoard(), v -> Objects.equal(nodeId, v.leaderNodeId())).keySet();
+    }
 
     /**
-     * Joins the leadership contest.
+     * Enters a leadership contest.
      *
-     * @param path topic for which this controller node wishes to be a leader
+     * @param topic leadership topic
      * @return {@code Leadership} future
      */
-    CompletableFuture<Leadership> runForLeadership(String path);
+    Leadership runForLeadership(String topic);
 
     /**
      * Withdraws from a leadership contest.
      *
-     * @param path topic for which this controller node no longer wishes to be a leader
-     * @return future that is successfully completed when withdraw is done
+     * @param topic leadership topic
      */
-    CompletableFuture<Void> withdraw(String path);
-
-    /**
-     * If the local nodeId is the leader for specified topic, this method causes it to
-     * step down temporarily from leadership.
-     * <p>
-     * The node will continue to be in contention for leadership and can
-     * potentially become the leader again if and when it becomes the highest
-     * priority candidate
-     * <p>
-     * If the local nodeId is not the leader, this method will make no changes and
-     * simply return false.
-     *
-     * @param path topic for which this controller node should give up leadership
-     * @return true if this node stepped down from leadership, false otherwise
-     */
-    boolean stepdown(String path);
-
-    /**
-     * Moves the specified nodeId to the top of the candidates list for the topic.
-     * <p>
-     * If the node is not a candidate for this topic, this method will be a noop.
-     *
-     * @param path leadership topic
-     * @param nodeId nodeId to make the top candidate
-     * @return true if nodeId is now the top candidate, false otherwise
-     */
-    boolean makeTopCandidate(String path, NodeId nodeId);
+    void withdraw(String topic);
 
     /**
      * Returns the current leader board.
@@ -107,18 +93,22 @@
     Map<String, Leadership> getLeaderBoard();
 
     /**
-     * Returns the candidates for all known topics.
+     * Returns the candidate nodes for each topic.
      *
      * @return A mapping from topics to corresponding list of candidates.
      */
-    Map<String, List<NodeId>> getCandidates();
+    default Map<String, List<NodeId>> getCandidates() {
+        return ImmutableMap.copyOf(Maps.transformValues(getLeaderBoard(), v -> ImmutableList.copyOf(v.candidates())));
+    }
 
     /**
-     * Returns the candidates for a given topic.
+     * Returns the candidate nodes for a given topic.
      *
-     * @param path topic
-     * @return A lists of NodeIds, which may be empty.
+     * @param topic leadership topic
+     * @return A lists of {@link NodeId nodeIds}, which may be empty.
      */
-    List<NodeId> getCandidates(String path);
-
-}
+    default List<NodeId> getCandidates(String topic) {
+        Leadership leadership = getLeadership(topic);
+        return leadership == null ? ImmutableList.of() : ImmutableList.copyOf(leadership.candidates());
+    }
+}
\ No newline at end of file
diff --git a/core/api/src/main/java/org/onosproject/cluster/LeadershipStore.java b/core/api/src/main/java/org/onosproject/cluster/LeadershipStore.java
new file mode 100644
index 0000000..e3a0503
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/cluster/LeadershipStore.java
@@ -0,0 +1,82 @@
+/*
+ * Copyright 2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.cluster;
+
+import java.util.Map;
+import org.onosproject.store.Store;
+
+/**
+ * Store interface for managing {@link LeadershipService} state.
+ */
+public interface LeadershipStore extends Store<LeadershipEvent, LeadershipStoreDelegate> {
+
+    /**
+     * Adds registration for the local instance to be leader for topic.
+     *
+     * @param topic leadership topic
+     * @return Updated leadership after operation is completed
+     */
+    Leadership addRegistration(String topic);
+
+    /**
+     * Unregisters the local instance from leadership contest for topic.
+     *
+     * @param topic leadership topic
+     */
+    void removeRegistration(String topic);
+
+    /**
+     * Unregisters an instance from all leadership contests.
+     *
+     * @param nodeId node identifier
+     */
+    void removeRegistration(NodeId nodeId);
+
+    /**
+     * Updates state so that given node is leader for a topic.
+     *
+     * @param topic leadership topic
+     * @param toNodeId identifier of the desired leader
+     * @return {@code true} if the transfer succeeded; {@code false} otherwise.
+     * This method can return {@code false} if the node is not registered for the topic
+     */
+    boolean moveLeadership(String topic, NodeId toNodeId);
+
+    /**
+     * Attempts to make a node the top candidate.
+     *
+     * @param topic leadership topic
+     * @param nodeId node identifier
+     * @return {@code true} if the specified node is now the top candidate.
+     * This method will return {@code false} if the node is not registered for the topic
+     */
+    boolean makeTopCandidate(String topic, NodeId nodeId);
+
+    /**
+     * Returns the current leadership for topic.
+     *
+     * @param topic leadership topic
+     * @return current leadership
+     */
+    Leadership getLeadership(String topic);
+
+    /**
+     * Return current leadership for all topics.
+     *
+     * @return topic to leadership mapping
+     */
+    Map<String, Leadership> getLeaderships();
+}
\ No newline at end of file
diff --git a/core/api/src/main/java/org/onosproject/cluster/LeadershipStoreDelegate.java b/core/api/src/main/java/org/onosproject/cluster/LeadershipStoreDelegate.java
new file mode 100644
index 0000000..894fdc3
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/cluster/LeadershipStoreDelegate.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.cluster;
+
+import org.onosproject.store.StoreDelegate;
+
+/**
+ * {@link LeadershipStore} delegate abstraction.
+ */
+public interface LeadershipStoreDelegate extends StoreDelegate<LeadershipEvent> {
+}
diff --git a/core/api/src/test/java/org/onosproject/cluster/LeadershipEventTest.java b/core/api/src/test/java/org/onosproject/cluster/LeadershipEventTest.java
index be0321b..792d295 100644
--- a/core/api/src/test/java/org/onosproject/cluster/LeadershipEventTest.java
+++ b/core/api/src/test/java/org/onosproject/cluster/LeadershipEventTest.java
@@ -15,6 +15,8 @@
  */
 package org.onosproject.cluster;
 
+import java.util.Arrays;
+
 import org.junit.Test;
 
 import com.google.common.testing.EqualsTester;
@@ -28,10 +30,11 @@
 public class LeadershipEventTest {
     private final NodeId node1 = new NodeId("1");
     private final NodeId node2 = new NodeId("2");
-    private final Leadership lead1 = new Leadership("topic1", node1, 1L, 2L);
-    private final Leadership lead2 = new Leadership("topic1", node2, 1L, 2L);
+    private final Leadership lead1 = new Leadership("topic1", new Leader(node1, 1L, 2L), Arrays.asList(node1));
+    private final Leadership lead2 = new Leadership("topic1", new Leader(node1, 1L, 2L), Arrays.asList(node1, node2));
+    private final Leadership lead3 = new Leadership("topic1", new Leader(node2, 1L, 2L), Arrays.asList(node2));
     private final LeadershipEvent event1 =
-            new LeadershipEvent(LeadershipEvent.Type.LEADER_ELECTED, lead1);
+            new LeadershipEvent(LeadershipEvent.Type.LEADER_CHANGED, lead1);
     private final long time = System.currentTimeMillis();
     private final LeadershipEvent event2 =
             new LeadershipEvent(LeadershipEvent.Type.CANDIDATES_CHANGED,
@@ -40,11 +43,9 @@
             new LeadershipEvent(LeadershipEvent.Type.CANDIDATES_CHANGED,
                     lead2, time);
     private final LeadershipEvent event3 =
-            new LeadershipEvent(LeadershipEvent.Type.LEADER_BOOTED, lead1);
+            new LeadershipEvent(LeadershipEvent.Type.LEADER_CHANGED, lead2);
     private final LeadershipEvent event4 =
-            new LeadershipEvent(LeadershipEvent.Type.LEADER_REELECTED, lead1);
-    private final LeadershipEvent event5 =
-            new LeadershipEvent(LeadershipEvent.Type.LEADER_REELECTED, lead2);
+            new LeadershipEvent(LeadershipEvent.Type.LEADER_AND_CANDIDATES_CHANGED, lead3);
 
     /**
      * Tests for proper operation of equals(), hashCode() and toString() methods.
@@ -56,7 +57,6 @@
                 .addEqualityGroup(event2, sameAsEvent2)
                 .addEqualityGroup(event3)
                 .addEqualityGroup(event4)
-                .addEqualityGroup(event5)
                 .testEquals();
     }
 
@@ -65,7 +65,7 @@
      */
     @Test
     public void checkConstruction() {
-        assertThat(event1.type(), is(LeadershipEvent.Type.LEADER_ELECTED));
+        assertThat(event1.type(), is(LeadershipEvent.Type.LEADER_CHANGED));
         assertThat(event1.subject(), is(lead1));
 
         assertThat(event2.time(), is(time));
diff --git a/core/api/src/test/java/org/onosproject/cluster/LeadershipServiceAdapter.java b/core/api/src/test/java/org/onosproject/cluster/LeadershipServiceAdapter.java
index e1d421d..72595e4 100644
--- a/core/api/src/test/java/org/onosproject/cluster/LeadershipServiceAdapter.java
+++ b/core/api/src/test/java/org/onosproject/cluster/LeadershipServiceAdapter.java
@@ -18,7 +18,6 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.CompletableFuture;
 
 /**
  * Test adapter for leadership service.
@@ -41,13 +40,12 @@
     }
 
     @Override
-    public CompletableFuture<Leadership> runForLeadership(String path) {
+    public Leadership runForLeadership(String path) {
         return null;
     }
 
     @Override
-    public CompletableFuture<Void> withdraw(String path) {
-        return null;
+    public void withdraw(String path) {
     }
 
     @Override
@@ -74,14 +72,4 @@
     public List<NodeId> getCandidates(String path) {
         return null;
     }
-
-    @Override
-    public boolean stepdown(String path) {
-        return false;
-    }
-
-    @Override
-    public boolean makeTopCandidate(String path, NodeId nodeId) {
-        return false;
-    }
 }
\ No newline at end of file
diff --git a/core/api/src/test/java/org/onosproject/cluster/LeadershipTest.java b/core/api/src/test/java/org/onosproject/cluster/LeadershipTest.java
index e2a8658..defc671 100644
--- a/core/api/src/test/java/org/onosproject/cluster/LeadershipTest.java
+++ b/core/api/src/test/java/org/onosproject/cluster/LeadershipTest.java
@@ -15,12 +15,13 @@
  */
 package org.onosproject.cluster;
 
+import java.util.Arrays;
+
 import org.junit.Test;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.testing.EqualsTester;
 
-import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertThat;
@@ -31,16 +32,14 @@
 public class LeadershipTest {
     private final NodeId node1 = new NodeId("1");
     private final NodeId node2 = new NodeId("2");
-    private final Leadership lead1 = new Leadership("topic1", node1, 1L, 2L);
-    private final Leadership sameAsLead1 = new Leadership("topic1", node1, 1L, 2L);
-    private final Leadership lead2 = new Leadership("topic2", node1, 1L, 2L);
-    private final Leadership lead3 = new Leadership("topic1", node1, 2L, 2L);
-    private final Leadership lead4 = new Leadership("topic1", node1, 3L, 2L);
-    private final Leadership lead5 = new Leadership("topic1", node1, 3L, 3L);
-    private final Leadership lead6 = new Leadership("topic1", node1,
-            ImmutableList.of(node2), 1L, 2L);
-    private final Leadership lead7 = new Leadership("topic1",
-            ImmutableList.of(node2), 1L, 2L);
+    private final Leadership lead1 = new Leadership("topic1", new Leader(node1, 1L, 2L), Arrays.asList(node1));
+    private final Leadership sameAsLead1 = new Leadership("topic1", new Leader(node1, 1L, 2L), Arrays.asList(node1));
+    private final Leadership lead2 = new Leadership("topic2", new Leader(node1, 1L, 2L), Arrays.asList(node1));
+    private final Leadership lead3 = new Leadership("topic1", new Leader(node1, 2L, 2L), Arrays.asList(node1));
+    private final Leadership lead4 = new Leadership("topic1", new Leader(node1, 3L, 2L), Arrays.asList(node1));
+    private final Leadership lead5 = new Leadership("topic1", new Leader(node1, 3L, 3L), Arrays.asList(node1));
+    private final Leadership lead6 = new Leadership("topic1", new Leader(node2, 1L, 2L), Arrays.asList(node2, node1));
+    private final Leadership lead7 = new Leadership("topic1", null, ImmutableList.of());
 
     /**
      * Tests for proper operation of equals(), hashCode() and toString() methods.
@@ -64,12 +63,10 @@
      */
     @Test
     public void checkConstruction() {
-        assertThat(lead6.electedTime(), is(2L));
-        assertThat(lead6.epoch(), is(1L));
-        assertThat(lead6.leader(), is(node1));
+        assertThat(lead6.leader(), is(new Leader(node2, 1L, 2L)));
         assertThat(lead6.topic(), is("topic1"));
-        assertThat(lead6.candidates(), hasSize(1));
-        assertThat(lead6.candidates(), contains(node2));
+        assertThat(lead6.candidates(), hasSize(2));
+        assertThat(lead6.candidates().get(1), is(node1));
+        assertThat(lead6.candidates().get(0), is(node2));
     }
-
 }
diff --git a/core/common/src/test/java/org/onosproject/store/trivial/SimpleLeadershipManager.java b/core/common/src/test/java/org/onosproject/store/trivial/SimpleLeadershipManager.java
index 194ffec..b540a1b 100644
--- a/core/common/src/test/java/org/onosproject/store/trivial/SimpleLeadershipManager.java
+++ b/core/common/src/test/java/org/onosproject/store/trivial/SimpleLeadershipManager.java
@@ -17,20 +17,22 @@
 
 import static com.google.common.base.Preconditions.checkArgument;
 
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.stream.Collectors;
 
+import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
 import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.Leader;
 import org.onosproject.cluster.Leadership;
 import org.onosproject.cluster.LeadershipEvent;
 import org.onosproject.cluster.LeadershipEvent.Type;
@@ -53,8 +55,15 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     private ClusterService clusterService;
 
+    private NodeId localNodeId;
+
     private Map<String, Boolean> elections = new ConcurrentHashMap<>();
 
+    @Activate
+    public void activate() {
+        localNodeId = clusterService.getLocalNode().id();
+    }
+
     @Override
     public NodeId getLeader(String path) {
         return elections.get(path) ? clusterService.getLocalNode().id() : null;
@@ -63,7 +72,8 @@
     @Override
     public Leadership getLeadership(String path) {
         checkArgument(path != null);
-        return elections.get(path) ? new Leadership(path, clusterService.getLocalNode().id(), 0, 0) : null;
+        return elections.get(path) ?
+                new Leadership(path, new Leader(localNodeId, 0, 0), Arrays.asList(localNodeId)) : null;
     }
 
     @Override
@@ -77,23 +87,22 @@
     }
 
     @Override
-    public CompletableFuture<Leadership> runForLeadership(String path) {
+    public Leadership runForLeadership(String path) {
         elections.put(path, true);
+        Leadership leadership = new Leadership(path, new Leader(localNodeId, 0, 0), Arrays.asList(localNodeId));
         for (LeadershipEventListener listener : listeners) {
-            listener.event(new LeadershipEvent(Type.LEADER_ELECTED,
-                    new Leadership(path, clusterService.getLocalNode().id(), 0, 0)));
+            listener.event(new LeadershipEvent(Type.LEADER_AND_CANDIDATES_CHANGED, leadership));
         }
-        return CompletableFuture.completedFuture(new Leadership(path, clusterService.getLocalNode().id(), 0, 0));
+        return leadership;
     }
 
     @Override
-    public CompletableFuture<Void> withdraw(String path) {
+    public void withdraw(String path) {
         elections.remove(path);
         for (LeadershipEventListener listener : listeners) {
-            listener.event(new LeadershipEvent(Type.LEADER_BOOTED,
-                    new Leadership(path, clusterService.getLocalNode().id(), 0, 0)));
+            listener.event(new LeadershipEvent(Type.LEADER_AND_CANDIDATES_CHANGED,
+                    new Leadership(path, null, Arrays.asList())));
         }
-        return CompletableFuture.completedFuture(null);
     }
 
     @Override
@@ -122,14 +131,4 @@
     public List<NodeId> getCandidates(String path) {
         return null;
     }
-
-    @Override
-    public boolean stepdown(String path) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public boolean makeTopCandidate(String path, NodeId nodeId) {
-        throw new UnsupportedOperationException();
-    }
 }
diff --git a/core/net/src/main/java/org/onosproject/cluster/impl/LeadershipManager.java b/core/net/src/main/java/org/onosproject/cluster/impl/LeadershipManager.java
new file mode 100644
index 0000000..e9a78d6
--- /dev/null
+++ b/core/net/src/main/java/org/onosproject/cluster/impl/LeadershipManager.java
@@ -0,0 +1,129 @@
+/*
+ * Copyright 2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.cluster.impl;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.util.Tools;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.ControllerNode;
+import org.onosproject.cluster.Leadership;
+import org.onosproject.cluster.LeadershipAdminService;
+import org.onosproject.cluster.LeadershipEvent;
+import org.onosproject.cluster.LeadershipEventListener;
+import org.onosproject.cluster.LeadershipService;
+import org.onosproject.cluster.LeadershipStore;
+import org.onosproject.cluster.LeadershipStoreDelegate;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.event.AbstractListenerManager;
+import org.slf4j.Logger;
+
+import com.google.common.collect.Maps;
+
+/**
+ * Implementation of {@link LeadershipService} and {@link LeadershipAdminService}.
+ */
+@Component(immediate = true)
+@Service
+public class LeadershipManager
+    extends AbstractListenerManager<LeadershipEvent, LeadershipEventListener>
+    implements LeadershipService, LeadershipAdminService {
+
+    private final Logger log = getLogger(getClass());
+
+    private LeadershipStoreDelegate delegate = this::post;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ClusterService clusterService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected LeadershipStore store;
+
+    private NodeId localNodeId;
+
+    private final ScheduledExecutorService deadlockDetector =
+            Executors.newSingleThreadScheduledExecutor(Tools.groupedThreads("onos/leadership", ""));
+
+    @Activate
+    public void activate() {
+        localNodeId = clusterService.getLocalNode().id();
+        store.setDelegate(delegate);
+        eventDispatcher.addSink(LeadershipEvent.class, listenerRegistry);
+        deadlockDetector.scheduleWithFixedDelay(() -> clusterService.getNodes()
+                .stream()
+                .map(ControllerNode::id)
+                .filter(id -> clusterService.getState(id) != ControllerNode.State.ACTIVE)
+                .forEach(this::unregister), 0, 2, TimeUnit.SECONDS);
+        log.info("Started");
+    }
+
+    @Deactivate
+    public void deactivate() {
+        deadlockDetector.shutdown();
+        Maps.filterValues(store.getLeaderships(), v -> v.candidates().contains(localNodeId))
+            .keySet()
+            .forEach(this::withdraw);
+        store.unsetDelegate(delegate);
+        eventDispatcher.removeSink(LeadershipEvent.class);
+        log.info("Stopped");
+    }
+
+    @Override
+    public Leadership getLeadership(String topic) {
+        return store.getLeadership(topic);
+    }
+
+    @Override
+    public Leadership runForLeadership(String topic) {
+        return store.addRegistration(topic);
+    }
+
+    @Override
+    public void withdraw(String topic) {
+        store.removeRegistration(topic);
+    }
+
+    @Override
+    public Map<String, Leadership> getLeaderBoard() {
+        return store.getLeaderships();
+    }
+
+    @Override
+    public boolean transferLeadership(String topic, NodeId to) {
+        return store.moveLeadership(topic, to);
+    }
+
+    @Override
+    public void unregister(NodeId nodeId) {
+        store.removeRegistration(nodeId);
+    }
+
+    @Override
+    public boolean promoteToTopOfCandidateList(String topic, NodeId nodeId) {
+        return store.makeTopCandidate(topic, nodeId);
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedLeadershipStore.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedLeadershipStore.java
new file mode 100644
index 0000000..c6647b7
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedLeadershipStore.java
@@ -0,0 +1,279 @@
+/*
+ * Copyright 2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.cluster.impl;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.Leader;
+import org.onosproject.cluster.Leadership;
+import org.onosproject.cluster.LeadershipEvent;
+import org.onosproject.cluster.LeadershipStore;
+import org.onosproject.cluster.LeadershipStoreDelegate;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.store.AbstractStore;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.Versioned;
+import org.slf4j.Logger;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Objects;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+/**
+ * Implementation of {@code LeadershipStore} backed by {@link ConsistentMap}.
+ */
+@Service
+@Component(immediate = true, enabled = true)
+public class DistributedLeadershipStore
+    extends AbstractStore<LeadershipEvent, LeadershipStoreDelegate>
+    implements LeadershipStore {
+
+    private static final Logger log = getLogger(DistributedLeadershipStore.class);
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ClusterService clusterService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected StorageService storageService;
+
+    protected NodeId localNodeId;
+    protected ConsistentMap<String, InternalLeadership> leadershipMap;
+    private final MapEventListener<String, InternalLeadership> leadershipChangeListener =
+            event -> {
+                Leadership oldValue = InternalLeadership.toLeadership(Versioned.valueOrNull(event.oldValue()));
+                Leadership newValue = InternalLeadership.toLeadership(Versioned.valueOrNull(event.newValue()));
+                boolean leaderChanged =
+                        !Objects.equal(oldValue == null ? null : oldValue.leader(), newValue.leader());
+                boolean candidatesChanged =
+                        !Sets.symmetricDifference(Sets.newHashSet(oldValue == null ?
+                                                    ImmutableSet.<NodeId>of() : oldValue.candidates()),
+                                                  Sets.newHashSet(newValue.candidates())).isEmpty();
+                LeadershipEvent.Type eventType = null;
+                if (leaderChanged && candidatesChanged) {
+                    eventType = LeadershipEvent.Type.LEADER_AND_CANDIDATES_CHANGED;
+                }
+                if (leaderChanged && !candidatesChanged) {
+                    eventType = LeadershipEvent.Type.LEADER_CHANGED;
+                }
+                if (!leaderChanged && candidatesChanged) {
+                    eventType = LeadershipEvent.Type.CANDIDATES_CHANGED;
+                }
+                notifyDelegate(new LeadershipEvent(eventType, newValue));
+            };
+
+    @Activate
+    public void activate() {
+        localNodeId = clusterService.getLocalNode().id();
+        leadershipMap = storageService.<String, InternalLeadership>consistentMapBuilder()
+                                      .withName("onos-leadership")
+                                      .withPartitionsDisabled()
+                                      .withRelaxedReadConsistency()
+                                      .withSerializer(Serializer.using(KryoNamespaces.API, InternalLeadership.class))
+                                      .build();
+        leadershipMap.addListener(leadershipChangeListener);
+        log.info("Started");
+    }
+
+    @Deactivate
+    public void deactivate() {
+        leadershipMap.removeListener(leadershipChangeListener);
+        log.info("Stopped");
+    }
+
+    @Override
+    public Leadership addRegistration(String topic) {
+        Versioned<InternalLeadership> internalLeadership = leadershipMap.computeIf(topic,
+                v -> v == null || !v.candidates().contains(localNodeId),
+                (k, v) -> {
+                    if (v == null || v.candidates().isEmpty()) {
+                        return new InternalLeadership(topic,
+                                localNodeId,
+                                v == null ? 1 : v.term() + 1,
+                                System.currentTimeMillis(),
+                                ImmutableList.of(localNodeId));
+                    }
+                    List<NodeId> newCandidates = new ArrayList<>(v.candidates());
+                    newCandidates.add(localNodeId);
+                    return new InternalLeadership(topic, v.leader(), v.term(), v.termStartTime(), newCandidates);
+                });
+        return InternalLeadership.toLeadership(Versioned.valueOrNull(internalLeadership));
+    }
+
+    @Override
+    public void removeRegistration(String topic) {
+        removeRegistration(topic, localNodeId);
+    }
+
+    private void removeRegistration(String topic, NodeId nodeId) {
+        leadershipMap.computeIf(topic,
+                v -> v != null && v.candidates().contains(nodeId),
+                (k, v) -> {
+                    List<NodeId> newCandidates = v.candidates()
+                            .stream()
+                            .filter(id -> !nodeId.equals(id))
+                            .collect(Collectors.toList());
+                    NodeId newLeader = nodeId.equals(v.leader()) ?
+                            newCandidates.size() > 0 ? newCandidates.get(0) : null : v.leader();
+                    long newTerm = newLeader == null || Objects.equal(newLeader, v.leader()) ?
+                            v.term() : v.term() + 1;
+                    long newTermStartTime = newLeader == null || Objects.equal(newLeader, v.leader()) ?
+                            v.termStartTime() : System.currentTimeMillis();
+                    return new InternalLeadership(topic, newLeader, newTerm, newTermStartTime, newCandidates);
+                });
+    }
+
+    @Override
+    public void removeRegistration(NodeId nodeId) {
+        leadershipMap.entrySet()
+                                  .stream()
+                                  .filter(e -> e.getValue().value().candidates().contains(nodeId))
+                                  .map(e -> e.getKey())
+                                  .forEach(topic -> this.removeRegistration(topic, nodeId));
+    }
+
+    @Override
+    public boolean moveLeadership(String topic, NodeId toNodeId) {
+        Versioned<InternalLeadership> internalLeadership = leadershipMap.computeIf(topic,
+                v -> v != null &&
+                    v.candidates().contains(toNodeId) &&
+                    !Objects.equal(v.leader(), toNodeId),
+                (k, v) -> {
+                    List<NodeId> newCandidates = new ArrayList<>();
+                    newCandidates.add(toNodeId);
+                    newCandidates.addAll(v.candidates()
+                            .stream()
+                            .filter(id -> !toNodeId.equals(id))
+                            .collect(Collectors.toList()));
+                    return new InternalLeadership(topic,
+                            toNodeId,
+                            v.term() + 1,
+                            System.currentTimeMillis(),
+                            newCandidates);
+                });
+        return Objects.equal(toNodeId, Versioned.valueOrNull(internalLeadership).leader());
+    }
+
+    @Override
+    public boolean makeTopCandidate(String topic, NodeId nodeId) {
+        Versioned<InternalLeadership> internalLeadership = leadershipMap.computeIf(topic,
+                v -> v != null &&
+                v.candidates().contains(nodeId) &&
+                !v.candidates().get(0).equals(nodeId),
+                (k, v) -> {
+                    List<NodeId> newCandidates = new ArrayList<>();
+                    newCandidates.add(nodeId);
+                    newCandidates.addAll(v.candidates()
+                            .stream()
+                            .filter(id -> !nodeId.equals(id))
+                            .collect(Collectors.toList()));
+                    return new InternalLeadership(topic,
+                            v.leader(),
+                            v.term(),
+                            System.currentTimeMillis(),
+                            newCandidates);
+                });
+        return internalLeadership != null && nodeId.equals(internalLeadership.value().candidates().get(0));
+    }
+
+    @Override
+    public Leadership getLeadership(String topic) {
+        return InternalLeadership.toLeadership(Versioned.valueOrNull(leadershipMap.get(topic)));
+    }
+
+    @Override
+    public Map<String, Leadership> getLeaderships() {
+        Map<String, Leadership> leaderships = Maps.newHashMap();
+        leadershipMap.entrySet().forEach(e -> {
+            leaderships.put(e.getKey(), e.getValue().value().asLeadership());
+        });
+        return ImmutableMap.copyOf(leaderships);
+    }
+
+    private static class InternalLeadership {
+        private final String topic;
+        private final NodeId leader;
+        private final long term;
+        private final long termStartTime;
+        private final List<NodeId> candidates;
+
+        public InternalLeadership(String topic,
+                NodeId leader,
+                long term,
+                long termStartTime,
+                List<NodeId> candidates) {
+            this.topic = topic;
+            this.leader = leader;
+            this.term = term;
+            this.termStartTime = termStartTime;
+            this.candidates = ImmutableList.copyOf(candidates);
+        }
+
+        public NodeId leader() {
+            return this.leader;
+        }
+
+        public long term() {
+            return term;
+        }
+
+        public long termStartTime() {
+            return termStartTime;
+        }
+
+        public List<NodeId> candidates() {
+            return candidates;
+        }
+
+        public Leadership asLeadership() {
+            return new Leadership(topic, leader == null ?
+                    null : new Leader(leader, term, termStartTime), candidates);
+        }
+
+        public static Leadership toLeadership(InternalLeadership internalLeadership) {
+            return internalLeadership == null ? null : internalLeadership.asLeadership();
+        }
+
+        @Override
+        public String toString() {
+            return MoreObjects.toStringHelper(getClass())
+                    .add("leader", leader)
+                    .add("term", term)
+                    .add("termStartTime", termStartTime)
+                    .add("candidates", candidates)
+                    .toString();
+        }
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/IntentPartitionManager.java b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/IntentPartitionManager.java
index bfb4754..d2c63f3 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/IntentPartitionManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/IntentPartitionManager.java
@@ -21,8 +21,6 @@
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
-import org.onosproject.cluster.ClusterEvent;
-import org.onosproject.cluster.ClusterEventListener;
 import org.onosproject.cluster.ClusterService;
 import org.onosproject.cluster.ControllerNode;
 import org.onosproject.cluster.Leadership;
@@ -76,7 +74,6 @@
 
     private ListenerRegistry<IntentPartitionEvent, IntentPartitionEventListener> listenerRegistry;
     private LeadershipEventListener leaderListener = new InternalLeadershipListener();
-    private ClusterEventListener clusterListener = new InternalClusterEventListener();
 
     private ScheduledExecutorService executor = Executors
             .newScheduledThreadPool(1);
@@ -84,7 +81,6 @@
     @Activate
     public void activate() {
         leadershipService.addListener(leaderListener);
-        clusterService.addListener(clusterListener);
 
         listenerRegistry = new ListenerRegistry<>();
         eventDispatcher.addSink(IntentPartitionEvent.class, listenerRegistry);
@@ -103,7 +99,6 @@
 
         eventDispatcher.removeSink(IntentPartitionEvent.class);
         leadershipService.removeListener(leaderListener);
-        clusterService.removeListener(clusterListener);
     }
 
     /**
@@ -180,7 +175,7 @@
 
         List<Leadership> myPartitions = leadershipService.getLeaderBoard().values()
                 .stream()
-                .filter(l -> clusterService.getLocalNode().id().equals(l.leader()))
+                .filter(l -> clusterService.getLocalNode().id().equals(l.leaderNodeId()))
                 .filter(l -> l.topic().startsWith(ELECTION_PREFIX))
                 .collect(Collectors.toList());
 
@@ -220,24 +215,16 @@
         public void event(LeadershipEvent event) {
             Leadership leadership = event.subject();
 
-            if (Objects.equals(leadership.leader(), clusterService.getLocalNode().id()) &&
+            if (Objects.equals(leadership.leaderNodeId(), clusterService.getLocalNode().id()) &&
                     leadership.topic().startsWith(ELECTION_PREFIX)) {
 
-                // See if we need to let some partitions go
-                scheduleRebalance(0);
-
                 eventDispatcher.post(new IntentPartitionEvent(IntentPartitionEvent.Type.LEADER_CHANGED,
                                                         leadership.topic()));
             }
-        }
-    }
 
-    private final class InternalClusterEventListener implements
-            ClusterEventListener {
-
-        @Override
-        public void event(ClusterEvent event) {
-            scheduleRebalance(0);
+            if (event.type() == LeadershipEvent.Type.CANDIDATES_CHANGED) {
+                scheduleRebalance(0);
+            }
         }
     }
 }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
index 44fbea0..a2a0081 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
@@ -16,7 +16,6 @@
 package org.onosproject.store.mastership.impl;
 
 import static org.onlab.util.Tools.groupedThreads;
-import static org.onlab.util.Tools.futureGetOrElse;
 import static org.onosproject.mastership.MastershipEvent.Type.BACKUPS_CHANGED;
 import static org.onosproject.mastership.MastershipEvent.Type.MASTER_CHANGED;
 import static org.slf4j.LoggerFactory.getLogger;
@@ -43,6 +42,7 @@
 import org.onlab.util.KryoNamespace;
 import org.onosproject.cluster.ClusterService;
 import org.onosproject.cluster.Leadership;
+import org.onosproject.cluster.LeadershipAdminService;
 import org.onosproject.cluster.LeadershipEvent;
 import org.onosproject.cluster.LeadershipEventListener;
 import org.onosproject.cluster.LeadershipService;
@@ -63,9 +63,9 @@
 import org.slf4j.Logger;
 
 import com.google.common.base.Objects;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
 
 /**
  * Implementation of the MastershipStore on top of Leadership Service.
@@ -82,18 +82,18 @@
     protected LeadershipService leadershipService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected LeadershipAdminService leadershipAdminService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected ClusterService clusterService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected ClusterCommunicationService clusterCommunicator;
 
     private NodeId localNodeId;
-    private final Set<DeviceId> connectedDevices = Sets.newHashSet();
 
     private static final MessageSubject ROLE_RELINQUISH_SUBJECT =
             new MessageSubject("mastership-store-device-role-relinquish");
-    private static final MessageSubject TRANSITION_FROM_MASTER_TO_STANDBY_SUBJECT =
-            new MessageSubject("mastership-store-device-mastership-relinquish");
 
     private static final Pattern DEVICE_MASTERSHIP_TOPIC_PATTERN =
             Pattern.compile("device:(.*)");
@@ -132,11 +132,6 @@
                 this::relinquishLocalRole,
                 SERIALIZER::encode,
                 messageHandlingExecutor);
-        clusterCommunicator.addSubscriber(TRANSITION_FROM_MASTER_TO_STANDBY_SUBJECT,
-                SERIALIZER::decode,
-                this::transitionFromMasterToStandby,
-                SERIALIZER::encode,
-                messageHandlingExecutor);
         localNodeId = clusterService.getLocalNode().id();
         leadershipService.addListener(leadershipEventListener);
 
@@ -146,7 +141,6 @@
     @Deactivate
     public void deactivate() {
         clusterCommunicator.removeSubscriber(ROLE_RELINQUISH_SUBJECT);
-        clusterCommunicator.removeSubscriber(TRANSITION_FROM_MASTER_TO_STANDBY_SUBJECT);
         messageHandlingExecutor.shutdown();
         transferExecutor.shutdown();
         leadershipService.removeListener(leadershipEventListener);
@@ -159,12 +153,9 @@
         checkArgument(deviceId != null, DEVICE_ID_NULL);
 
         String leadershipTopic = createDeviceMastershipTopic(deviceId);
-        connectedDevices.add(deviceId);
-        return leadershipService.runForLeadership(leadershipTopic)
-                .thenApply(leadership -> {
-                    return Objects.equal(localNodeId, leadership.leader())
-                            ? MastershipRole.MASTER : MastershipRole.STANDBY;
-                });
+        Leadership leadership = leadershipService.runForLeadership(leadershipTopic);
+        return CompletableFuture.completedFuture(localNodeId.equals(leadership.leaderNodeId())
+                ? MastershipRole.MASTER : MastershipRole.STANDBY);
     }
 
     @Override
@@ -173,20 +164,19 @@
         checkArgument(deviceId != null, DEVICE_ID_NULL);
 
         String leadershipTopic = createDeviceMastershipTopic(deviceId);
-        NodeId leader = leadershipService.getLeader(leadershipTopic);
-        if (Objects.equal(nodeId, leader)) {
-            return MastershipRole.MASTER;
-        }
-        return leadershipService.getCandidates(leadershipTopic).contains(nodeId) ?
-                MastershipRole.STANDBY : MastershipRole.NONE;
+        Leadership leadership = leadershipService.getLeadership(leadershipTopic);
+        NodeId leader = leadership == null ? null : leadership.leaderNodeId();
+        List<NodeId> candidates = leadership == null ?
+                ImmutableList.of() : ImmutableList.copyOf(leadership.candidates());
+        return Objects.equal(nodeId, leader) ?
+                MastershipRole.MASTER : candidates.contains(nodeId) ? MastershipRole.STANDBY : MastershipRole.NONE;
     }
 
     @Override
     public NodeId getMaster(DeviceId deviceId) {
         checkArgument(deviceId != null, DEVICE_ID_NULL);
 
-        String leadershipTopic = createDeviceMastershipTopic(deviceId);
-        return leadershipService.getLeader(leadershipTopic);
+        return leadershipService.getLeader(createDeviceMastershipTopic(deviceId));
     }
 
     @Override
@@ -194,9 +184,8 @@
         checkArgument(deviceId != null, DEVICE_ID_NULL);
 
         Map<NodeId, MastershipRole> roles = Maps.newHashMap();
-        clusterService
-            .getNodes()
-            .forEach((node) -> roles.put(node.id(), getRole(node.id(), deviceId)));
+        clusterService.getNodes()
+                      .forEach((node) -> roles.put(node.id(), getRole(node.id(), deviceId)));
 
         NodeId master = null;
         final List<NodeId> standbys = Lists.newLinkedList();
@@ -233,30 +222,10 @@
         checkArgument(nodeId != null, NODE_ID_NULL);
         checkArgument(deviceId != null, DEVICE_ID_NULL);
 
-        NodeId currentMaster = getMaster(deviceId);
-        if (nodeId.equals(currentMaster)) {
-            return CompletableFuture.completedFuture(null);
-        } else {
-            String leadershipTopic = createDeviceMastershipTopic(deviceId);
-            List<NodeId> candidates = leadershipService.getCandidates(leadershipTopic);
-            if (candidates.isEmpty()) {
-                return  CompletableFuture.completedFuture(null);
-            }
-            if (leadershipService.makeTopCandidate(leadershipTopic, nodeId)) {
-                CompletableFuture<MastershipEvent> result = new CompletableFuture<>();
-                // There is brief wait before we step down from mastership.
-                // This is to ensure any work that happens when standby preference
-                // order changes can complete. For example: flow entries need to be backed
-                // to the new top standby (ONOS-1883)
-                // FIXME: This potentially introduces a race-condition.
-                // Right now role changes are only forced via CLI.
-                transferExecutor.schedule(() -> {
-                    result.complete(transitionFromMasterToStandby(deviceId));
-                }, WAIT_BEFORE_MASTERSHIP_HANDOFF_MILLIS, TimeUnit.MILLISECONDS);
-                return result;
-            } else {
-                log.warn("Failed to promote {} to mastership for {}", nodeId, deviceId);
-            }
+        String leadershipTopic = createDeviceMastershipTopic(deviceId);
+        if (leadershipAdminService.promoteToTopOfCandidateList(leadershipTopic, nodeId)) {
+            transferExecutor.schedule(() -> leadershipAdminService.transferLeadership(leadershipTopic, nodeId),
+                    WAIT_BEFORE_MASTERSHIP_HANDOFF_MILLIS, TimeUnit.MILLISECONDS);
         }
         return CompletableFuture.completedFuture(null);
     }
@@ -267,7 +236,7 @@
 
         String leadershipTopic = createDeviceMastershipTopic(deviceId);
         Leadership leadership = leadershipService.getLeadership(leadershipTopic);
-        return leadership != null ? MastershipTerm.of(leadership.leader(), leadership.epoch()) : null;
+        return leadership != null ? MastershipTerm.of(leadership.leaderNodeId(), leadership.leader().term()) : null;
     }
 
     @Override
@@ -318,71 +287,44 @@
     private CompletableFuture<MastershipEvent> relinquishLocalRole(DeviceId deviceId) {
         checkArgument(deviceId != null, DEVICE_ID_NULL);
 
-        // Check if this node is can be managed by this node.
-        if (!connectedDevices.contains(deviceId)) {
+        String leadershipTopic = createDeviceMastershipTopic(deviceId);
+        if (!leadershipService.getCandidates(leadershipTopic).contains(localNodeId)) {
             return CompletableFuture.completedFuture(null);
         }
-
-        String leadershipTopic = createDeviceMastershipTopic(deviceId);
-        NodeId currentLeader = leadershipService.getLeader(leadershipTopic);
-
-        MastershipEvent.Type eventType = Objects.equal(currentLeader, localNodeId)
-            ? MastershipEvent.Type.MASTER_CHANGED
-            : MastershipEvent.Type.BACKUPS_CHANGED;
-
-        connectedDevices.remove(deviceId);
-        return leadershipService.withdraw(leadershipTopic)
-                                .thenApply(v -> new MastershipEvent(eventType, deviceId, getNodes(deviceId)));
-    }
-
-    private MastershipEvent transitionFromMasterToStandby(DeviceId deviceId) {
-        checkArgument(deviceId != null, DEVICE_ID_NULL);
-
-        NodeId currentMaster = getMaster(deviceId);
-        if (currentMaster == null) {
-            return null;
-        }
-
-        if (!currentMaster.equals(localNodeId)) {
-            log.info("Forwarding request to relinquish "
-                    + "mastership for device {} to {}", deviceId, currentMaster);
-            return futureGetOrElse(clusterCommunicator.sendAndReceive(
-                    deviceId,
-                    TRANSITION_FROM_MASTER_TO_STANDBY_SUBJECT,
-                    SERIALIZER::encode,
-                    SERIALIZER::decode,
-                    currentMaster), null);
-        }
-
-        return leadershipService.stepdown(createDeviceMastershipTopic(deviceId))
-                ? new MastershipEvent(MastershipEvent.Type.MASTER_CHANGED, deviceId, getNodes(deviceId)) : null;
+        MastershipEvent.Type eventType = localNodeId.equals(leadershipService.getLeader(leadershipTopic)) ?
+                MastershipEvent.Type.MASTER_CHANGED : MastershipEvent.Type.BACKUPS_CHANGED;
+        leadershipService.withdraw(leadershipTopic);
+        return CompletableFuture.completedFuture(new MastershipEvent(eventType, deviceId, getNodes(deviceId)));
     }
 
     @Override
     public void relinquishAllRole(NodeId nodeId) {
-        // Noop. LeadershipService already takes care of detecting and purging deadlocks.
+        // Noop. LeadershipService already takes care of detecting and purging stale locks.
     }
 
     private class InternalDeviceMastershipEventListener implements LeadershipEventListener {
+
+        @Override
+        public boolean isRelevant(LeadershipEvent event) {
+            Leadership leadership = event.subject();
+            return isDeviceMastershipTopic(leadership.topic());
+        }
+
         @Override
         public void event(LeadershipEvent event) {
             Leadership leadership = event.subject();
-            if (!isDeviceMastershipTopic(leadership.topic())) {
-                return;
-            }
             DeviceId deviceId = extractDeviceIdFromTopic(leadership.topic());
+            RoleInfo roleInfo = getNodes(deviceId);
             switch (event.type()) {
-            case LEADER_ELECTED:
-                notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, getNodes(deviceId)));
+            case LEADER_AND_CANDIDATES_CHANGED:
+                notifyDelegate(new MastershipEvent(BACKUPS_CHANGED, deviceId, roleInfo));
+                notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, roleInfo));
                 break;
-            case LEADER_REELECTED:
-                // There is no concept of leader re-election in the new distributed leadership manager.
-                throw new IllegalStateException("Unexpected event type");
-            case LEADER_BOOTED:
-                notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, getNodes(deviceId)));
+            case LEADER_CHANGED:
+                notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, roleInfo));
                 break;
             case CANDIDATES_CHANGED:
-                notifyDelegate(new MastershipEvent(BACKUPS_CHANGED, deviceId, getNodes(deviceId)));
+                notifyDelegate(new MastershipEvent(BACKUPS_CHANGED, deviceId, roleInfo));
                 break;
             default:
                 return;
@@ -407,5 +349,4 @@
         Matcher m = DEVICE_MASTERSHIP_TOPIC_PATTERN.matcher(topic);
         return m.matches();
     }
-
-}
+}
\ No newline at end of file
diff --git a/core/store/dist/src/test/java/org/onosproject/store/intent/impl/IntentPartitionManagerTest.java b/core/store/dist/src/test/java/org/onosproject/store/intent/impl/IntentPartitionManagerTest.java
index 5ba0c7c..0c92280 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/intent/impl/IntentPartitionManagerTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/intent/impl/IntentPartitionManagerTest.java
@@ -22,6 +22,7 @@
 import org.onosproject.cluster.ClusterServiceAdapter;
 import org.onosproject.cluster.ControllerNode;
 import org.onosproject.cluster.DefaultControllerNode;
+import org.onosproject.cluster.Leader;
 import org.onosproject.cluster.Leadership;
 import org.onosproject.cluster.LeadershipEvent;
 import org.onosproject.cluster.LeadershipEventListener;
@@ -31,13 +32,12 @@
 import org.onosproject.common.event.impl.TestEventDispatcher;
 import org.onosproject.net.intent.Key;
 
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-
 import static junit.framework.TestCase.assertFalse;
 import static org.easymock.EasyMock.anyObject;
 import static org.easymock.EasyMock.anyString;
@@ -55,9 +55,10 @@
 public class IntentPartitionManagerTest {
 
     private final LeadershipEvent event
-            = new LeadershipEvent(LeadershipEvent.Type.LEADER_ELECTED,
+            = new LeadershipEvent(LeadershipEvent.Type.CANDIDATES_CHANGED,
                                   new Leadership(ELECTION_PREFIX + "0",
-                                                 MY_NODE_ID, 0, 0));
+                                                 new Leader(MY_NODE_ID, 0, 0),
+                                                 Arrays.asList(MY_NODE_ID, OTHER_NODE_ID)));
 
     private static final NodeId MY_NODE_ID = new NodeId("local");
     private static final NodeId OTHER_NODE_ID = new NodeId("other");
@@ -78,7 +79,7 @@
         expectLastCall().andDelegateTo(new TestLeadershipService());
         for (int i = 0; i < IntentPartitionManager.NUM_PARTITIONS; i++) {
             expect(leadershipService.runForLeadership(ELECTION_PREFIX + i))
-                .andReturn(CompletableFuture.completedFuture(null))
+                .andReturn(null)
                 .times(1);
         }
 
@@ -105,7 +106,9 @@
             expect(leadershipService.getLeader(ELECTION_PREFIX + i))
                     .andReturn(MY_NODE_ID).anyTimes();
             leaderBoard.put(ELECTION_PREFIX + i,
-                            new Leadership(ELECTION_PREFIX + i, MY_NODE_ID, 0, 0));
+                            new Leadership(ELECTION_PREFIX + i,
+                                    new Leader(MY_NODE_ID, 0, 0),
+                                    Arrays.asList(MY_NODE_ID)));
         }
 
         for (int i = numMine; i < IntentPartitionManager.NUM_PARTITIONS; i++) {
@@ -113,7 +116,9 @@
                     .andReturn(OTHER_NODE_ID).anyTimes();
 
             leaderBoard.put(ELECTION_PREFIX + i,
-                            new Leadership(ELECTION_PREFIX + i, OTHER_NODE_ID, 0, 0));
+                            new Leadership(ELECTION_PREFIX + i,
+                                    new Leader(OTHER_NODE_ID, 0, 0),
+                                    Arrays.asList(OTHER_NODE_ID)));
         }
 
         expect(leadershipService.getLeaderBoard()).andReturn(leaderBoard).anyTimes();
@@ -131,7 +136,7 @@
 
         for (int i = 0; i < IntentPartitionManager.NUM_PARTITIONS; i++) {
             expect(leadershipService.runForLeadership(ELECTION_PREFIX + i))
-                .andReturn(CompletableFuture.completedFuture(null))
+                .andReturn(null)
                 .times(1);
         }
 
@@ -200,9 +205,8 @@
         // We have all the partitions so we'll need to relinquish some
         setUpLeadershipService(IntentPartitionManager.NUM_PARTITIONS);
 
-        expect(leadershipService.withdraw(anyString()))
-                                 .andReturn(CompletableFuture.completedFuture(null))
-                                 .times(7);
+        leadershipService.withdraw(anyString());
+        expectLastCall().times(7);
 
         replay(leadershipService);
 
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DistributedLeadershipManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DistributedLeadershipManager.java
deleted file mode 100644
index db55a85..0000000
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DistributedLeadershipManager.java
+++ /dev/null
@@ -1,614 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.primitives.impl;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.MapDifference;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
-import org.apache.commons.lang.math.RandomUtils;
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.apache.felix.scr.annotations.Service;
-import org.onosproject.cluster.ClusterEvent;
-import org.onosproject.cluster.ClusterEvent.Type;
-import org.onosproject.cluster.ClusterEventListener;
-import org.onosproject.cluster.ClusterService;
-import org.onosproject.cluster.Leadership;
-import org.onosproject.cluster.LeadershipEvent;
-import org.onosproject.cluster.LeadershipEventListener;
-import org.onosproject.cluster.LeadershipService;
-import org.onosproject.cluster.NodeId;
-import org.onosproject.event.ListenerRegistry;
-import org.onosproject.event.EventDeliveryService;
-import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
-import org.onosproject.store.serializers.KryoNamespaces;
-import org.onosproject.store.service.ConsistentMap;
-import org.onosproject.store.service.ConsistentMapException;
-import org.onosproject.store.service.MapEvent;
-import org.onosproject.store.service.Serializer;
-import org.onosproject.store.service.StorageService;
-import org.onosproject.store.service.Versioned;
-import org.slf4j.Logger;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Objects;
-import java.util.Set;
-import java.util.List;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.stream.Collectors;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static org.onlab.util.Tools.groupedThreads;
-import static org.slf4j.LoggerFactory.getLogger;
-import static org.onosproject.cluster.ControllerNode.State.ACTIVE;
-import static org.onosproject.cluster.ControllerNode.State.INACTIVE;
-
-/**
- * Distributed Lock Manager implemented on top of ConsistentMap.
- * <p>
- * This implementation makes use of ClusterService's failure
- * detection capabilities to detect and purge stale locks.
- * TODO: Ensure lock safety and liveness.
- */
-@Component(immediate = true, enabled = true)
-@Service
-public class DistributedLeadershipManager implements LeadershipService {
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected StorageService storageService;
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected ClusterService clusterService;
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected ClusterCommunicationService clusterCommunicator;
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected EventDeliveryService eventDispatcher;
-
-    private final Logger log = getLogger(getClass());
-
-    private ScheduledExecutorService electionRunner;
-    private ScheduledExecutorService lockExecutor;
-    private ScheduledExecutorService staleLeadershipPurgeExecutor;
-    private ScheduledExecutorService leadershipRefresher;
-
-    // leader for each topic
-    private ConsistentMap<String, NodeId> leaderMap;
-    // list of candidates (includes chosen leader) for each topic
-    private ConsistentMap<String, List<NodeId>> candidateMap;
-
-    private ListenerRegistry<LeadershipEvent, LeadershipEventListener> listenerRegistry;
-
-    // cached copy of leaderMap
-    // Note: Map value, Leadership, does not contain proper candidates info
-    private final Map<String, Leadership> leaderBoard = Maps.newConcurrentMap();
-    // cached copy of candidateMap
-    // Note: Map value, Leadership, does not contain proper leader info
-    private final Map<String, Leadership> candidateBoard = Maps.newConcurrentMap();
-
-    private final ClusterEventListener clusterEventListener = new InternalClusterEventListener();
-
-    private NodeId localNodeId;
-    private Set<String> activeTopics = Sets.newConcurrentHashSet();
-    private Map<String, CompletableFuture<Leadership>> pendingFutures = Maps.newConcurrentMap();
-
-    // The actual delay is randomly chosen from the interval [0, WAIT_BEFORE_RETRY_MILLIS)
-    private static final int WAIT_BEFORE_RETRY_MILLIS = 150;
-    private static final int DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC = 2;
-    private static final int LEADERSHIP_REFRESH_INTERVAL_SEC = 2;
-    private static final int DELAY_BETWEEN_STALE_LEADERSHIP_PURGE_ATTEMPTS_SEC = 2;
-
-    private final AtomicBoolean staleLeadershipPurgeScheduled = new AtomicBoolean(false);
-
-    private static final Serializer SERIALIZER = Serializer.using(KryoNamespaces.API);
-
-    @Activate
-    public void activate() {
-        leaderMap = storageService.<String, NodeId>consistentMapBuilder()
-                .withName("onos-topic-leaders")
-                .withSerializer(SERIALIZER)
-                .withPartitionsDisabled().build();
-        candidateMap = storageService.<String, List<NodeId>>consistentMapBuilder()
-                .withName("onos-topic-candidates")
-                .withSerializer(SERIALIZER)
-                .withPartitionsDisabled().build();
-
-        leaderMap.addListener(event -> {
-            log.debug("Received {}", event);
-            LeadershipEvent.Type leadershipEventType = null;
-            if (event.type() == MapEvent.Type.INSERT || event.type() == MapEvent.Type.UPDATE) {
-                leadershipEventType = LeadershipEvent.Type.LEADER_ELECTED;
-            } else if (event.type() == MapEvent.Type.REMOVE) {
-                leadershipEventType = LeadershipEvent.Type.LEADER_BOOTED;
-            }
-            onLeadershipEvent(new LeadershipEvent(
-                    leadershipEventType,
-                    new Leadership(event.key(),
-                            event.value().value(),
-                            event.value().version(),
-                            event.value().creationTime())));
-        });
-
-        candidateMap.addListener(event -> {
-            log.debug("Received {}", event);
-            if (event.type() != MapEvent.Type.INSERT && event.type() != MapEvent.Type.UPDATE) {
-                log.error("Entries must not be removed from candidate map");
-                return;
-            }
-            onLeadershipEvent(new LeadershipEvent(
-                    LeadershipEvent.Type.CANDIDATES_CHANGED,
-                    new Leadership(event.key(),
-                            event.value().value(),
-                            event.value().version(),
-                            event.value().creationTime())));
-        });
-
-        localNodeId = clusterService.getLocalNode().id();
-
-        electionRunner = Executors.newSingleThreadScheduledExecutor(
-                groupedThreads("onos/store/leadership", "election-runner"));
-        lockExecutor = Executors.newScheduledThreadPool(
-                4, groupedThreads("onos/store/leadership", "election-thread-%d"));
-        staleLeadershipPurgeExecutor = Executors.newSingleThreadScheduledExecutor(
-                groupedThreads("onos/store/leadership", "stale-leadership-evictor"));
-        leadershipRefresher = Executors.newSingleThreadScheduledExecutor(
-                groupedThreads("onos/store/leadership", "refresh-thread"));
-
-        clusterService.addListener(clusterEventListener);
-
-        electionRunner.scheduleWithFixedDelay(
-                this::electLeaders, 0, DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC, TimeUnit.SECONDS);
-
-        leadershipRefresher.scheduleWithFixedDelay(
-                this::refreshLeaderBoard, 0, LEADERSHIP_REFRESH_INTERVAL_SEC, TimeUnit.SECONDS);
-
-        listenerRegistry = new ListenerRegistry<>();
-        eventDispatcher.addSink(LeadershipEvent.class, listenerRegistry);
-
-        log.info("Started");
-    }
-
-    @Deactivate
-    public void deactivate() {
-        if (clusterService.getNodes().size() > 1) {
-            // FIXME: Determine why this takes ~50 seconds to shutdown on a single node!
-            leaderBoard.forEach((topic, leadership) -> {
-                if (localNodeId.equals(leadership.leader())) {
-                    withdraw(topic);
-                }
-            });
-        }
-
-        clusterService.removeListener(clusterEventListener);
-        eventDispatcher.removeSink(LeadershipEvent.class);
-
-        electionRunner.shutdown();
-        lockExecutor.shutdown();
-        staleLeadershipPurgeExecutor.shutdown();
-        leadershipRefresher.shutdown();
-
-        log.info("Stopped");
-    }
-
-    @Override
-    public Map<String, Leadership> getLeaderBoard() {
-        return ImmutableMap.copyOf(leaderBoard);
-    }
-
-    @Override
-    public Map<String, List<NodeId>> getCandidates() {
-        return Maps.toMap(candidateBoard.keySet(), this::getCandidates);
-    }
-
-    @Override
-    public List<NodeId> getCandidates(String path) {
-        Leadership current = candidateBoard.get(path);
-        return current == null ? ImmutableList.of() : ImmutableList.copyOf(current.candidates());
-    }
-
-    @Override
-    public NodeId getLeader(String path) {
-        Leadership leadership = leaderBoard.get(path);
-        return leadership != null ? leadership.leader() : null;
-    }
-
-    @Override
-    public Leadership getLeadership(String path) {
-        checkArgument(path != null);
-        return leaderBoard.get(path);
-    }
-
-    @Override
-    public Set<String> ownedTopics(NodeId nodeId) {
-        checkArgument(nodeId != null);
-        return leaderBoard.entrySet()
-                    .stream()
-                    .filter(entry -> nodeId.equals(entry.getValue().leader()))
-                    .map(Entry::getKey)
-                    .collect(Collectors.toSet());
-    }
-
-    @Override
-    public CompletableFuture<Leadership> runForLeadership(String path) {
-        log.debug("Running for leadership for topic: {}", path);
-        CompletableFuture<Leadership> resultFuture = new CompletableFuture<>();
-        doRunForLeadership(path, resultFuture);
-        return resultFuture;
-    }
-
-    private void doRunForLeadership(String path, CompletableFuture<Leadership> future) {
-        try {
-            Versioned<List<NodeId>> candidates = candidateMap.computeIf(path,
-                    currentList -> currentList == null || !currentList.contains(localNodeId),
-                    (topic, currentList) -> {
-                        if (currentList == null) {
-                            return ImmutableList.of(localNodeId);
-                        } else {
-                            List<NodeId> newList = Lists.newLinkedList();
-                            newList.addAll(currentList);
-                            newList.add(localNodeId);
-                            return newList;
-                        }
-                    });
-            log.debug("In the leadership race for topic {} with candidates {}", path, candidates);
-            activeTopics.add(path);
-            Leadership leadership = electLeader(path, candidates.value());
-            if (leadership == null) {
-                pendingFutures.put(path, future);
-            } else {
-                future.complete(leadership);
-            }
-        } catch (ConsistentMapException e) {
-            log.debug("Failed to enter topic leader race for {}. Retrying.", path, e);
-            rerunForLeadership(path, future);
-        }
-    }
-
-    @Override
-    public CompletableFuture<Void> withdraw(String path) {
-        activeTopics.remove(path);
-        CompletableFuture<Void> resultFuture = new CompletableFuture<>();
-        doWithdraw(path, resultFuture);
-        return resultFuture;
-    }
-
-
-    private void doWithdraw(String path, CompletableFuture<Void> future) {
-        if (activeTopics.contains(path)) {
-            future.completeExceptionally(new CancellationException(String.format("%s is now a active topic", path)));
-        }
-        try {
-            leaderMap.computeIf(path,
-                                localNodeId::equals,
-                                (topic, leader) -> null);
-            candidateMap.computeIf(path,
-                                   candidates -> candidates != null && candidates.contains(localNodeId),
-                                   (topic, candidates) -> candidates.stream()
-                                                                    .filter(nodeId -> !localNodeId.equals(nodeId))
-                                                                    .collect(Collectors.toList()));
-            future.complete(null);
-        } catch (Exception e) {
-            log.debug("Failed to verify (and clear) any lock this node might be holding for {}", path, e);
-            retryWithdraw(path, future);
-        }
-    }
-
-    @Override
-    public boolean stepdown(String path) {
-        if (!activeTopics.contains(path) || !Objects.equals(localNodeId, getLeader(path))) {
-            return false;
-        }
-
-        try {
-            return leaderMap.computeIf(path,
-                                       localNodeId::equals,
-                                       (topic, leader) -> null) == null;
-        } catch (Exception e) {
-            log.warn("Error executing stepdown for {}", path, e);
-        }
-        return false;
-    }
-
-    @Override
-    public void addListener(LeadershipEventListener listener) {
-        listenerRegistry.addListener(listener);
-    }
-
-    @Override
-    public void removeListener(LeadershipEventListener listener) {
-        listenerRegistry.removeListener(listener);
-    }
-
-    @Override
-    public boolean makeTopCandidate(String path, NodeId nodeId) {
-        Versioned<List<NodeId>> candidateList = candidateMap.computeIf(path,
-                candidates -> candidates != null &&
-                              candidates.contains(nodeId) &&
-                              !nodeId.equals(Iterables.getFirst(candidates, null)),
-                (topic, candidates) -> {
-                    List<NodeId> updatedCandidates = new ArrayList<>(candidates.size());
-                    updatedCandidates.add(nodeId);
-                    candidates.stream().filter(id -> !nodeId.equals(id)).forEach(updatedCandidates::add);
-                    return updatedCandidates;
-                });
-        List<NodeId> candidates = candidateList != null ? candidateList.value() : Collections.emptyList();
-        return candidates.size() > 0 && nodeId.equals(candidates.get(0));
-    }
-
-    private Leadership electLeader(String path, List<NodeId> candidates) {
-        Leadership currentLeadership = getLeadership(path);
-        if (currentLeadership != null) {
-            return currentLeadership;
-        } else {
-            NodeId topCandidate = candidates
-                        .stream()
-                        .filter(n -> clusterService.getState(n) == ACTIVE)
-                        .findFirst()
-                        .orElse(null);
-            try {
-                Versioned<NodeId> leader = localNodeId.equals(topCandidate)
-                        ? leaderMap.computeIfAbsent(path, p -> localNodeId) : leaderMap.get(path);
-                if (leader != null) {
-                    Leadership newLeadership = new Leadership(path,
-                            leader.value(),
-                            leader.version(),
-                            leader.creationTime());
-                    // Since reads only go through the local copy of leader board, we ought to update it
-                    // first before returning from this method.
-                    // This is to ensure a subsequent read will not read a stale value.
-                    onLeadershipEvent(new LeadershipEvent(LeadershipEvent.Type.LEADER_ELECTED, newLeadership));
-                    return newLeadership;
-                }
-            } catch (Exception e) {
-                log.debug("Failed to elect leader for {}", path, e);
-            }
-        }
-        return null;
-    }
-
-    private void electLeaders() {
-        try {
-            candidateMap.entrySet().forEach(entry -> {
-                String path = entry.getKey();
-                Versioned<List<NodeId>> candidates = entry.getValue();
-                // for active topics, check if this node can become a leader (if it isn't already)
-                if (activeTopics.contains(path)) {
-                    lockExecutor.submit(() -> {
-                        Leadership leadership = electLeader(path, candidates.value());
-                        if (leadership != null) {
-                            CompletableFuture<Leadership> future = pendingFutures.remove(path);
-                            if (future != null) {
-                                future.complete(leadership);
-                            }
-                        }
-                    });
-                }
-                // Raise a CANDIDATES_CHANGED event to force refresh local candidate board
-                // and also to update local listeners.
-                // Don't worry about duplicate events as they will be suppressed.
-                onLeadershipEvent(new LeadershipEvent(LeadershipEvent.Type.CANDIDATES_CHANGED,
-                                                      new Leadership(path,
-                                                                     candidates.value(),
-                                                                     candidates.version(),
-                                                                     candidates.creationTime())));
-            });
-        } catch (Exception e) {
-            log.debug("Failure electing leaders", e);
-        }
-    }
-
-    private void onLeadershipEvent(LeadershipEvent leadershipEvent) {
-        log.trace("Leadership Event: time = {} type = {} event = {}",
-                leadershipEvent.time(), leadershipEvent.type(),
-                leadershipEvent);
-
-        Leadership leadershipUpdate = leadershipEvent.subject();
-        LeadershipEvent.Type eventType = leadershipEvent.type();
-        String topic = leadershipUpdate.topic();
-
-        AtomicBoolean updateAccepted = new AtomicBoolean(false);
-        if (eventType.equals(LeadershipEvent.Type.LEADER_ELECTED)) {
-            leaderBoard.compute(topic, (k, currentLeadership) -> {
-                if (currentLeadership == null || currentLeadership.epoch() < leadershipUpdate.epoch()) {
-                    updateAccepted.set(true);
-                    return leadershipUpdate;
-                }
-                return currentLeadership;
-            });
-        } else if (eventType.equals(LeadershipEvent.Type.LEADER_BOOTED)) {
-            leaderBoard.compute(topic, (k, currentLeadership) -> {
-                if (currentLeadership == null || currentLeadership.epoch() <= leadershipUpdate.epoch()) {
-                    updateAccepted.set(true);
-                    // FIXME: Removing entries from leaderboard is not safe and should be visited.
-                    return null;
-                }
-                return currentLeadership;
-            });
-        } else if (eventType.equals(LeadershipEvent.Type.CANDIDATES_CHANGED)) {
-            candidateBoard.compute(topic, (k, currentInfo) -> {
-                if (currentInfo == null || currentInfo.epoch() < leadershipUpdate.epoch()) {
-                    updateAccepted.set(true);
-                    return leadershipUpdate;
-                }
-                return currentInfo;
-            });
-        } else {
-            throw new IllegalStateException("Unknown event type.");
-        }
-
-        if (updateAccepted.get()) {
-            eventDispatcher.post(leadershipEvent);
-        }
-    }
-
-    private void rerunForLeadership(String path, CompletableFuture<Leadership> future) {
-        lockExecutor.schedule(
-                () -> doRunForLeadership(path, future),
-                RandomUtils.nextInt(WAIT_BEFORE_RETRY_MILLIS),
-                TimeUnit.MILLISECONDS);
-    }
-
-    private void retryWithdraw(String path, CompletableFuture<Void> future) {
-        lockExecutor.schedule(
-                () -> doWithdraw(path, future),
-                RandomUtils.nextInt(WAIT_BEFORE_RETRY_MILLIS),
-                TimeUnit.MILLISECONDS);
-    }
-
-    private void scheduleStaleLeadershipPurge(int afterDelaySec) {
-        if (staleLeadershipPurgeScheduled.compareAndSet(false, true)) {
-            staleLeadershipPurgeExecutor.schedule(
-                    this::purgeStaleLeadership,
-                    afterDelaySec,
-                    TimeUnit.SECONDS);
-        }
-    }
-
-    /**
-     * Purges locks held by inactive nodes and evicts inactive nodes from candidacy.
-     */
-    private void purgeStaleLeadership() {
-        AtomicBoolean rerunPurge = new AtomicBoolean(false);
-        try {
-            staleLeadershipPurgeScheduled.set(false);
-            leaderMap.entrySet()
-                .stream()
-                .filter(e -> clusterService.getState(e.getValue().value()) == INACTIVE)
-                .forEach(entry -> {
-                    String path = entry.getKey();
-                    NodeId nodeId = entry.getValue().value();
-                    try {
-                        leaderMap.computeIf(path, nodeId::equals, (topic, leader) -> null);
-                    } catch (Exception e) {
-                        log.debug("Failed to purge stale lock held by {} for {}", nodeId, path, e);
-                        rerunPurge.set(true);
-                    }
-                });
-
-            candidateMap.entrySet()
-                .forEach(entry -> {
-                    String path = entry.getKey();
-                    Versioned<List<NodeId>> candidates = entry.getValue();
-                    List<NodeId> candidatesList = candidates != null
-                            ? candidates.value() : Collections.emptyList();
-                    List<NodeId> activeCandidatesList =
-                            candidatesList.stream()
-                                          .filter(n -> clusterService.getState(n) == ACTIVE)
-                                          .filter(n -> !localNodeId.equals(n) || activeTopics.contains(path))
-                                          .collect(Collectors.toList());
-                    if (activeCandidatesList.size() < candidatesList.size()) {
-                        Set<NodeId> removedCandidates =
-                                Sets.difference(Sets.newHashSet(candidatesList),
-                                                Sets.newHashSet(activeCandidatesList));
-                        try {
-                            candidateMap.computeIf(path,
-                                        c -> c.stream()
-                                              .filter(n -> clusterService.getState(n) == INACTIVE)
-                                              .count() > 0,
-                                        (topic, c) -> c.stream()
-                                                       .filter(n -> clusterService.getState(n) == ACTIVE)
-                                                       .filter(n -> !localNodeId.equals(n) ||
-                                                                   activeTopics.contains(path))
-                                                       .collect(Collectors.toList()));
-                        } catch (Exception e) {
-                            log.debug("Failed to evict inactive candidates {} from "
-                                    + "candidate list for {}", removedCandidates, path, e);
-                            rerunPurge.set(true);
-                        }
-                    }
-                });
-        } catch (Exception e) {
-            log.debug("Failure purging state leadership.", e);
-            rerunPurge.set(true);
-        }
-
-        if (rerunPurge.get()) {
-            log.debug("Rescheduling stale leadership purge due to errors encountered in previous run");
-            scheduleStaleLeadershipPurge(DELAY_BETWEEN_STALE_LEADERSHIP_PURGE_ATTEMPTS_SEC);
-        }
-    }
-
-    private void refreshLeaderBoard() {
-        try {
-            Map<String, Leadership> newLeaderBoard = Maps.newHashMap();
-            leaderMap.entrySet().forEach(entry -> {
-                String path = entry.getKey();
-                Versioned<NodeId> leader = entry.getValue();
-                Leadership leadership = new Leadership(path,
-                                                       leader.value(),
-                                                       leader.version(),
-                                                       leader.creationTime());
-                newLeaderBoard.put(path, leadership);
-            });
-
-            // first take snapshot of current leader board.
-            Map<String, Leadership> currentLeaderBoard = ImmutableMap.copyOf(leaderBoard);
-
-            MapDifference<String, Leadership> diff = Maps.difference(currentLeaderBoard, newLeaderBoard);
-
-            // evict stale leaders
-            diff.entriesOnlyOnLeft().forEach((path, leadership) -> {
-                log.debug("Evicting {} from leaderboard. It is no longer active leader.", leadership);
-                onLeadershipEvent(new LeadershipEvent(LeadershipEvent.Type.LEADER_BOOTED, leadership));
-            });
-
-            // add missing leaders
-            diff.entriesOnlyOnRight().forEach((path, leadership) -> {
-                log.debug("Adding {} to leaderboard. It is now the active leader.", leadership);
-                onLeadershipEvent(new LeadershipEvent(LeadershipEvent.Type.LEADER_ELECTED, leadership));
-            });
-
-            // add updated leaders
-            diff.entriesDiffering().forEach((path, difference) -> {
-                Leadership current = difference.leftValue();
-                Leadership updated = difference.rightValue();
-                if (current.epoch() < updated.epoch()) {
-                    log.debug("Updated {} in leaderboard.", updated);
-                    onLeadershipEvent(new LeadershipEvent(LeadershipEvent.Type.LEADER_ELECTED, updated));
-                }
-            });
-        } catch (Exception e) {
-            log.debug("Failed to refresh leader board", e);
-        }
-    }
-
-    private class InternalClusterEventListener implements ClusterEventListener {
-
-        @Override
-        public void event(ClusterEvent event) {
-            if (event.type() == Type.INSTANCE_DEACTIVATED || event.type() == Type.INSTANCE_REMOVED) {
-                scheduleStaleLeadershipPurge(0);
-            }
-        }
-    }
-}