LeadershipStore updates:
 - Now tracking leader and candidates for a topic using a single map.
 - Using term numbers that are incremented by one every time a new leader is elected.
 - Introduced a separate LeadershipStore to conform to the manager-store pattern.

Change-Id: I1d03a6c5e8ff0e68ef0c1e3a6c2d425c4856e470
diff --git a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
index 44fbea0..a2a0081 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
@@ -16,7 +16,6 @@
 package org.onosproject.store.mastership.impl;
 
 import static org.onlab.util.Tools.groupedThreads;
-import static org.onlab.util.Tools.futureGetOrElse;
 import static org.onosproject.mastership.MastershipEvent.Type.BACKUPS_CHANGED;
 import static org.onosproject.mastership.MastershipEvent.Type.MASTER_CHANGED;
 import static org.slf4j.LoggerFactory.getLogger;
@@ -43,6 +42,7 @@
 import org.onlab.util.KryoNamespace;
 import org.onosproject.cluster.ClusterService;
 import org.onosproject.cluster.Leadership;
+import org.onosproject.cluster.LeadershipAdminService;
 import org.onosproject.cluster.LeadershipEvent;
 import org.onosproject.cluster.LeadershipEventListener;
 import org.onosproject.cluster.LeadershipService;
@@ -63,9 +63,9 @@
 import org.slf4j.Logger;
 
 import com.google.common.base.Objects;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
 
 /**
  * Implementation of the MastershipStore on top of Leadership Service.
@@ -82,18 +82,18 @@
     protected LeadershipService leadershipService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected LeadershipAdminService leadershipAdminService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected ClusterService clusterService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected ClusterCommunicationService clusterCommunicator;
 
     private NodeId localNodeId;
-    private final Set<DeviceId> connectedDevices = Sets.newHashSet();
 
     private static final MessageSubject ROLE_RELINQUISH_SUBJECT =
             new MessageSubject("mastership-store-device-role-relinquish");
-    private static final MessageSubject TRANSITION_FROM_MASTER_TO_STANDBY_SUBJECT =
-            new MessageSubject("mastership-store-device-mastership-relinquish");
 
     private static final Pattern DEVICE_MASTERSHIP_TOPIC_PATTERN =
             Pattern.compile("device:(.*)");
@@ -132,11 +132,6 @@
                 this::relinquishLocalRole,
                 SERIALIZER::encode,
                 messageHandlingExecutor);
-        clusterCommunicator.addSubscriber(TRANSITION_FROM_MASTER_TO_STANDBY_SUBJECT,
-                SERIALIZER::decode,
-                this::transitionFromMasterToStandby,
-                SERIALIZER::encode,
-                messageHandlingExecutor);
         localNodeId = clusterService.getLocalNode().id();
         leadershipService.addListener(leadershipEventListener);
 
@@ -146,7 +141,6 @@
     @Deactivate
     public void deactivate() {
         clusterCommunicator.removeSubscriber(ROLE_RELINQUISH_SUBJECT);
-        clusterCommunicator.removeSubscriber(TRANSITION_FROM_MASTER_TO_STANDBY_SUBJECT);
         messageHandlingExecutor.shutdown();
         transferExecutor.shutdown();
         leadershipService.removeListener(leadershipEventListener);
@@ -159,12 +153,9 @@
         checkArgument(deviceId != null, DEVICE_ID_NULL);
 
         String leadershipTopic = createDeviceMastershipTopic(deviceId);
-        connectedDevices.add(deviceId);
-        return leadershipService.runForLeadership(leadershipTopic)
-                .thenApply(leadership -> {
-                    return Objects.equal(localNodeId, leadership.leader())
-                            ? MastershipRole.MASTER : MastershipRole.STANDBY;
-                });
+        Leadership leadership = leadershipService.runForLeadership(leadershipTopic);
+        return CompletableFuture.completedFuture(localNodeId.equals(leadership.leaderNodeId())
+                ? MastershipRole.MASTER : MastershipRole.STANDBY);
     }
 
     @Override
@@ -173,20 +164,19 @@
         checkArgument(deviceId != null, DEVICE_ID_NULL);
 
         String leadershipTopic = createDeviceMastershipTopic(deviceId);
-        NodeId leader = leadershipService.getLeader(leadershipTopic);
-        if (Objects.equal(nodeId, leader)) {
-            return MastershipRole.MASTER;
-        }
-        return leadershipService.getCandidates(leadershipTopic).contains(nodeId) ?
-                MastershipRole.STANDBY : MastershipRole.NONE;
+        Leadership leadership = leadershipService.getLeadership(leadershipTopic);
+        NodeId leader = leadership == null ? null : leadership.leaderNodeId();
+        List<NodeId> candidates = leadership == null ?
+                ImmutableList.of() : ImmutableList.copyOf(leadership.candidates());
+        return Objects.equal(nodeId, leader) ?
+                MastershipRole.MASTER : candidates.contains(nodeId) ? MastershipRole.STANDBY : MastershipRole.NONE;
     }
 
     @Override
     public NodeId getMaster(DeviceId deviceId) {
         checkArgument(deviceId != null, DEVICE_ID_NULL);
 
-        String leadershipTopic = createDeviceMastershipTopic(deviceId);
-        return leadershipService.getLeader(leadershipTopic);
+        return leadershipService.getLeader(createDeviceMastershipTopic(deviceId));
     }
 
     @Override
@@ -194,9 +184,8 @@
         checkArgument(deviceId != null, DEVICE_ID_NULL);
 
         Map<NodeId, MastershipRole> roles = Maps.newHashMap();
-        clusterService
-            .getNodes()
-            .forEach((node) -> roles.put(node.id(), getRole(node.id(), deviceId)));
+        clusterService.getNodes()
+                      .forEach((node) -> roles.put(node.id(), getRole(node.id(), deviceId)));
 
         NodeId master = null;
         final List<NodeId> standbys = Lists.newLinkedList();
@@ -233,30 +222,10 @@
         checkArgument(nodeId != null, NODE_ID_NULL);
         checkArgument(deviceId != null, DEVICE_ID_NULL);
 
-        NodeId currentMaster = getMaster(deviceId);
-        if (nodeId.equals(currentMaster)) {
-            return CompletableFuture.completedFuture(null);
-        } else {
-            String leadershipTopic = createDeviceMastershipTopic(deviceId);
-            List<NodeId> candidates = leadershipService.getCandidates(leadershipTopic);
-            if (candidates.isEmpty()) {
-                return  CompletableFuture.completedFuture(null);
-            }
-            if (leadershipService.makeTopCandidate(leadershipTopic, nodeId)) {
-                CompletableFuture<MastershipEvent> result = new CompletableFuture<>();
-                // There is brief wait before we step down from mastership.
-                // This is to ensure any work that happens when standby preference
-                // order changes can complete. For example: flow entries need to be backed
-                // to the new top standby (ONOS-1883)
-                // FIXME: This potentially introduces a race-condition.
-                // Right now role changes are only forced via CLI.
-                transferExecutor.schedule(() -> {
-                    result.complete(transitionFromMasterToStandby(deviceId));
-                }, WAIT_BEFORE_MASTERSHIP_HANDOFF_MILLIS, TimeUnit.MILLISECONDS);
-                return result;
-            } else {
-                log.warn("Failed to promote {} to mastership for {}", nodeId, deviceId);
-            }
+        String leadershipTopic = createDeviceMastershipTopic(deviceId);
+        if (leadershipAdminService.promoteToTopOfCandidateList(leadershipTopic, nodeId)) {
+            transferExecutor.schedule(() -> leadershipAdminService.transferLeadership(leadershipTopic, nodeId),
+                    WAIT_BEFORE_MASTERSHIP_HANDOFF_MILLIS, TimeUnit.MILLISECONDS);
         }
         return CompletableFuture.completedFuture(null);
     }
@@ -267,7 +236,7 @@
 
         String leadershipTopic = createDeviceMastershipTopic(deviceId);
         Leadership leadership = leadershipService.getLeadership(leadershipTopic);
-        return leadership != null ? MastershipTerm.of(leadership.leader(), leadership.epoch()) : null;
+        return leadership != null ? MastershipTerm.of(leadership.leaderNodeId(), leadership.leader().term()) : null;
     }
 
     @Override
@@ -318,71 +287,44 @@
     private CompletableFuture<MastershipEvent> relinquishLocalRole(DeviceId deviceId) {
         checkArgument(deviceId != null, DEVICE_ID_NULL);
 
-        // Check if this node is can be managed by this node.
-        if (!connectedDevices.contains(deviceId)) {
+        String leadershipTopic = createDeviceMastershipTopic(deviceId);
+        if (!leadershipService.getCandidates(leadershipTopic).contains(localNodeId)) {
             return CompletableFuture.completedFuture(null);
         }
-
-        String leadershipTopic = createDeviceMastershipTopic(deviceId);
-        NodeId currentLeader = leadershipService.getLeader(leadershipTopic);
-
-        MastershipEvent.Type eventType = Objects.equal(currentLeader, localNodeId)
-            ? MastershipEvent.Type.MASTER_CHANGED
-            : MastershipEvent.Type.BACKUPS_CHANGED;
-
-        connectedDevices.remove(deviceId);
-        return leadershipService.withdraw(leadershipTopic)
-                                .thenApply(v -> new MastershipEvent(eventType, deviceId, getNodes(deviceId)));
-    }
-
-    private MastershipEvent transitionFromMasterToStandby(DeviceId deviceId) {
-        checkArgument(deviceId != null, DEVICE_ID_NULL);
-
-        NodeId currentMaster = getMaster(deviceId);
-        if (currentMaster == null) {
-            return null;
-        }
-
-        if (!currentMaster.equals(localNodeId)) {
-            log.info("Forwarding request to relinquish "
-                    + "mastership for device {} to {}", deviceId, currentMaster);
-            return futureGetOrElse(clusterCommunicator.sendAndReceive(
-                    deviceId,
-                    TRANSITION_FROM_MASTER_TO_STANDBY_SUBJECT,
-                    SERIALIZER::encode,
-                    SERIALIZER::decode,
-                    currentMaster), null);
-        }
-
-        return leadershipService.stepdown(createDeviceMastershipTopic(deviceId))
-                ? new MastershipEvent(MastershipEvent.Type.MASTER_CHANGED, deviceId, getNodes(deviceId)) : null;
+        MastershipEvent.Type eventType = localNodeId.equals(leadershipService.getLeader(leadershipTopic)) ?
+                MastershipEvent.Type.MASTER_CHANGED : MastershipEvent.Type.BACKUPS_CHANGED;
+        leadershipService.withdraw(leadershipTopic);
+        return CompletableFuture.completedFuture(new MastershipEvent(eventType, deviceId, getNodes(deviceId)));
     }
 
     @Override
     public void relinquishAllRole(NodeId nodeId) {
-        // Noop. LeadershipService already takes care of detecting and purging deadlocks.
+        // Noop. LeadershipService already takes care of detecting and purging stale locks.
     }
 
     private class InternalDeviceMastershipEventListener implements LeadershipEventListener {
+
+        @Override
+        public boolean isRelevant(LeadershipEvent event) {
+            Leadership leadership = event.subject();
+            return isDeviceMastershipTopic(leadership.topic());
+        }
+
         @Override
         public void event(LeadershipEvent event) {
             Leadership leadership = event.subject();
-            if (!isDeviceMastershipTopic(leadership.topic())) {
-                return;
-            }
             DeviceId deviceId = extractDeviceIdFromTopic(leadership.topic());
+            RoleInfo roleInfo = getNodes(deviceId);
             switch (event.type()) {
-            case LEADER_ELECTED:
-                notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, getNodes(deviceId)));
+            case LEADER_AND_CANDIDATES_CHANGED:
+                notifyDelegate(new MastershipEvent(BACKUPS_CHANGED, deviceId, roleInfo));
+                notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, roleInfo));
                 break;
-            case LEADER_REELECTED:
-                // There is no concept of leader re-election in the new distributed leadership manager.
-                throw new IllegalStateException("Unexpected event type");
-            case LEADER_BOOTED:
-                notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, getNodes(deviceId)));
+            case LEADER_CHANGED:
+                notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, roleInfo));
                 break;
             case CANDIDATES_CHANGED:
-                notifyDelegate(new MastershipEvent(BACKUPS_CHANGED, deviceId, getNodes(deviceId)));
+                notifyDelegate(new MastershipEvent(BACKUPS_CHANGED, deviceId, roleInfo));
                 break;
             default:
                 return;
@@ -407,5 +349,4 @@
         Matcher m = DEVICE_MASTERSHIP_TOPIC_PATTERN.matcher(topic);
         return m.matches();
     }
-
-}
+}
\ No newline at end of file