DistributedLeadershipManager tracks topic election candidates in addition to
leaders. Also adds a -c/--candidates option to the leaders CLI command that
lists the candidates for each topic.

part of: Device Mastership store on top of LeadershipService
Reference: ONOS-76

Conflicts:
	core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java

Change-Id: I587bb9e9ad16a9c8392969dde45001181053e5e6
diff --git a/cli/src/main/java/org/onosproject/cli/net/LeaderCommand.java b/cli/src/main/java/org/onosproject/cli/net/LeaderCommand.java
index a60da76..07b5b2b 100644
--- a/cli/src/main/java/org/onosproject/cli/net/LeaderCommand.java
+++ b/cli/src/main/java/org/onosproject/cli/net/LeaderCommand.java
@@ -17,12 +17,15 @@
 
 import java.util.Comparator;
 import java.util.Map;
+import java.util.List;
 
 import org.apache.karaf.shell.commands.Command;
+import org.apache.karaf.shell.commands.Option;
 import org.onlab.util.Tools;
 import org.onosproject.cli.AbstractShellCommand;
 import org.onosproject.cluster.Leadership;
 import org.onosproject.cluster.LeadershipService;
+import org.onosproject.cluster.NodeId;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -36,6 +39,12 @@
 public class LeaderCommand extends AbstractShellCommand {
 
     private static final String FMT = "%-20s | %-15s | %-6s | %-10s |";
+    private static final String FMT_C = "%-20s | %-15s | %-19s |";
+
+    @Option(name = "-c", aliases = "--candidates",
+            description = "List candidate Nodes for each topic's leadership race",
+            required = false, multiValued = false)
+    private boolean showCandidates = false;
 
     /**
      * Compares leaders, sorting by toString() output.
@@ -75,6 +84,28 @@
         print("--------------------------------------------------------------");
     }
 
+    private void displayCandidates(Map<String, Leadership> leaderBoard,
+            Map<String, List<NodeId>> candidates) {
+        print("--------------------------------------------------------------");
+        print(FMT_C, "Topic", "Leader", "Candidates");
+        print("--------------------------------------------------------------");
+        leaderBoard
+                .values()
+                .stream()
+                .sorted(leadershipComparator)
+                .forEach(l -> {
+                        List<NodeId> list = candidates.get(l.topic());
+                        print(FMT_C,
+                            l.topic(),
+                            l.leader(),
+                            list.remove(0).toString());
+                            // formatting hacks to get it into a table
+                            list.forEach(n -> print(FMT_C, " ", " ", n));
+                            print(FMT_C, " ", " ", " ");
+                        });
+        print("--------------------------------------------------------------");
+    }
+
     /**
      * Returns JSON node representing the leaders.
      *
@@ -91,6 +122,7 @@
                             mapper.createObjectNode()
                                 .put("topic", l.topic())
                                 .put("leader", l.leader().toString())
+                                .put("candidates", l.candidates().toString())
                                 .put("epoch", l.epoch())
                                 .put("electedTime", Tools.timeAgo(l.electedTime()))));
 
@@ -106,7 +138,12 @@
         if (outputJson()) {
             print("%s", json(leaderBoard));
         } else {
-            displayLeaders(leaderBoard);
+            if (showCandidates) {
+                Map<String, List<NodeId>> candidates = leaderService.getCandidates();
+                displayCandidates(leaderBoard, candidates);
+            } else {
+                displayLeaders(leaderBoard);
+            }
         }
     }
 }
diff --git a/core/api/src/main/java/org/onosproject/cluster/LeadershipEvent.java b/core/api/src/main/java/org/onosproject/cluster/LeadershipEvent.java
index ac0036e..c4f59be 100644
--- a/core/api/src/main/java/org/onosproject/cluster/LeadershipEvent.java
+++ b/core/api/src/main/java/org/onosproject/cluster/LeadershipEvent.java
@@ -49,11 +49,10 @@
         LEADER_BOOTED,
 
         /**
-         * Signifies that the list of candidates for leadership for a resource
-         * has changed. If the change in the backups list is accompanied by a
-         * change in the leader, the event is subsumed by the leadership change.
+         * Signifies that the list of candidates for leadership for a topic has
+         * changed.
          */
-        LEADER_CANDIDATES_CHANGED
+        CANDIDATES_CHANGED
     }
 
     /**
diff --git a/core/api/src/main/java/org/onosproject/cluster/LeadershipService.java b/core/api/src/main/java/org/onosproject/cluster/LeadershipService.java
index 65ec687..15a198c 100644
--- a/core/api/src/main/java/org/onosproject/cluster/LeadershipService.java
+++ b/core/api/src/main/java/org/onosproject/cluster/LeadershipService.java
@@ -17,6 +17,7 @@
 
 import java.util.Map;
 import java.util.Set;
+import java.util.List;
 
 /**
  * Service for leader election.
@@ -67,6 +68,19 @@
     Map<String, Leadership> getLeaderBoard();
 
     /**
+     * Returns the candidates for all known topics.
+     * @return A map of topics to lists of NodeIds.
+     */
+    Map<String, List<NodeId>> getCandidates();
+
+    /**
+     * Returns the candidates for a given topic.
+     * @param path topic
+     * @return A list of NodeIds, which may be empty.
+     */
+    List<NodeId> getCandidates(String path);
+
+    /**
      * Registers a event listener to be notified of leadership events.
      * @param listener listener that will asynchronously notified of leadership events.
      */
diff --git a/core/api/src/test/java/org/onosproject/cluster/LeadershipServiceAdapter.java b/core/api/src/test/java/org/onosproject/cluster/LeadershipServiceAdapter.java
index 818edb9..02742d4 100644
--- a/core/api/src/test/java/org/onosproject/cluster/LeadershipServiceAdapter.java
+++ b/core/api/src/test/java/org/onosproject/cluster/LeadershipServiceAdapter.java
@@ -15,6 +15,7 @@
  */
 package org.onosproject.cluster;
 
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -62,4 +63,14 @@
     public void removeListener(LeadershipEventListener listener) {
 
     }
+
+    @Override
+    public Map<String, List<NodeId>> getCandidates() {
+        return null;
+    }
+
+    @Override
+    public List<NodeId> getCandidates(String path) {
+        return null;
+    }
 }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java
index d4b46ab..2f6a149 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java
@@ -46,6 +46,7 @@
 import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
@@ -573,4 +574,14 @@
                 eventDispatcher.post(leadershipEvent);
         }
     }
+
+    @Override
+    public Map<String, List<NodeId>> getCandidates() {
+        return null;
+    }
+
+    @Override
+    public List<NodeId> getCandidates(String path) {
+        return null;
+    }
 }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
index 39b4844..8bebff7 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
@@ -1,8 +1,11 @@
 package org.onosproject.store.consistent.impl;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -12,6 +15,7 @@
 import org.onlab.util.KryoNamespace;
 import org.onosproject.cluster.ClusterService;
 import org.onosproject.cluster.ControllerNode;
+import org.onosproject.cluster.ControllerNode.State;
 import org.onosproject.cluster.Leadership;
 import org.onosproject.cluster.LeadershipEvent;
 import org.onosproject.cluster.LeadershipEventListener;
@@ -24,8 +28,8 @@
 import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
 import org.onosproject.store.cluster.messaging.MessageSubject;
 import org.onosproject.store.serializers.KryoNamespaces;
-import org.onosproject.store.serializers.KryoSerializer;
 import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.ConsistentMapException;
 import org.onosproject.store.service.Serializer;
 import org.onosproject.store.service.StorageService;
 import org.onosproject.store.service.Versioned;
@@ -35,6 +39,7 @@
 import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.Set;
+import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -77,7 +82,9 @@
     private ScheduledExecutorService deadLockDetectionExecutor;
     private ScheduledExecutorService leadershipStatusBroadcaster;
 
-    private ConsistentMap<String, NodeId> lockMap;
+    private ConsistentMap<String, NodeId> leaderMap;
+    private ConsistentMap<String, List<NodeId>> candidateMap;
+
     private AbstractListenerRegistry<LeadershipEvent, LeadershipEventListener>
         listenerRegistry;
     private final Map<String, Leadership> leaderBoard = Maps.newConcurrentMap();
@@ -85,26 +92,26 @@
 
     private Set<String> activeTopics = Sets.newConcurrentHashSet();
 
+    private static final int ELECTION_JOIN_ATTEMPT_INTERVAL_SEC = 2;
     private static final int DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC = 2;
     private static final int DEADLOCK_DETECTION_INTERVAL_SEC = 2;
     private static final int LEADERSHIP_STATUS_UPDATE_INTERVAL_SEC = 2;
 
-    private static final KryoSerializer SERIALIZER = new KryoSerializer() {
-        @Override
-        protected void setupKryoPool() {
-            serializerPool = KryoNamespace.newBuilder()
-                .register(KryoNamespaces.API)
-                .build()
-                .populate(1);
-        }
-    };
+    private static final int LEADER_CANDIDATE_POS = 0;
+
+    private static final Serializer SERIALIZER = Serializer.using(
+            new KryoNamespace.Builder().register(KryoNamespaces.API).build());
 
     @Activate
     public void activate() {
-        lockMap = storageService.<String, NodeId>consistentMapBuilder()
-                    .withName("onos-leader-locks")
-                    .withSerializer(Serializer.using(new KryoNamespace.Builder().register(KryoNamespaces.API).build()))
-                    .withPartitionsDisabled().build();
+        leaderMap = storageService.<String, NodeId>consistentMapBuilder()
+                .withName("onos-topic-leaders")
+                .withSerializer(SERIALIZER)
+                .withPartitionsDisabled().build();
+        candidateMap = storageService.<String, List<NodeId>>consistentMapBuilder()
+                .withName("onos-topic-candidates")
+                .withSerializer(SERIALIZER)
+                .withPartitionsDisabled().build();
 
         localNodeId = clusterService.getLocalNode().id();
 
@@ -157,6 +164,19 @@
     }
 
     @Override
+    public Map<String, List<NodeId>> getCandidates() {
+        Map<String, List<NodeId>> candidates = Maps.newHashMap();
+        candidateMap.entrySet().forEach(el -> candidates.put(el.getKey(), el.getValue().value()));
+        return ImmutableMap.copyOf(candidates);
+    }
+
+    @Override
+    public List<NodeId> getCandidates(String path) {
+        Versioned<List<NodeId>> candidates = candidateMap.get(path);
+        return candidates == null ? ImmutableList.of() : ImmutableList.copyOf(candidates.value());
+    }
+
+    @Override
     public NodeId getLeader(String path) {
         Leadership leadership = leaderBoard.get(path);
         return leadership != null ? leadership.leader() : null;
@@ -181,24 +201,62 @@
     @Override
     public void runForLeadership(String path) {
         log.debug("Running for leadership for topic: {}", path);
-        activeTopics.add(path);
-        tryLeaderLock(path);
+        try {
+            Versioned<List<NodeId>> candidates = candidateMap.get(path);
+            if (candidates != null) {
+                List<NodeId> candidateList = Lists.newArrayList(candidates.value());
+                if (!candidateList.contains(localNodeId)) {
+                    candidateList.add(localNodeId);
+                    if (!candidateMap.replace(path, candidates.version(), candidateList)) {
+                        rerunForLeadership(path);
+                        return;
+                    }
+                }
+            } else {
+                if (!(candidateMap.putIfAbsent(path, ImmutableList.of(localNodeId)) == null)) {
+                    rerunForLeadership(path);
+                    return;
+                }
+            }
+            log.debug("In the leadership race for topic {} with candidates {}", path, candidates);
+            activeTopics.add(path);
+            tryLeaderLock(path);
+        } catch (ConsistentMapException e) {
+            log.debug("Failed to enter topic leader race for {}. Retrying.", path, e);
+            rerunForLeadership(path);
+        }
     }
 
     @Override
     public void withdraw(String path) {
         activeTopics.remove(path);
+
         try {
-            Versioned<NodeId> leader = lockMap.get(path);
-            if (Objects.equals(leader.value(), localNodeId)) {
-                if (lockMap.remove(path, leader.version())) {
+            Versioned<NodeId> leader = leaderMap.get(path);
+            if (leader != null && Objects.equals(leader.value(), localNodeId)) {
+                if (leaderMap.remove(path, leader.version())) {
                     log.info("Gave up leadership for {}", path);
                     notifyRemovedLeader(path, localNodeId, leader.version(), leader.creationTime());
                 }
             }
-            // else we are not the current owner.
+            // else we are not the current leader, can still be a candidate.
+            Versioned<List<NodeId>> candidates = candidateMap.get(path);
+            List<NodeId> candidateList = candidates != null
+                    ? Lists.newArrayList(candidates.value())
+                    : Lists.newArrayList();
+            if (!candidateList.remove(localNodeId)) {
+                return;
+            }
+            boolean success =  candidateList.isEmpty()
+                    ? candidateMap.remove(path, candidates.version())
+                    : candidateMap.replace(path, candidates.version(), candidateList);
+            if (!success) {
+                log.warn("Failed to withdraw from candidates list. Will retry");
+                retryWithdraw(path);
+            }
         } catch (Exception e) {
             log.debug("Failed to verify (and clear) any lock this node might be holding for {}", path, e);
+            retryWithdraw(path);
         }
     }
 
@@ -216,39 +274,62 @@
         if (!activeTopics.contains(path)) {
             return;
         }
+
+        Versioned<List<NodeId>> candidates = candidateMap.get(path);
+        if (candidates != null) {
+            List<NodeId> activeNodes = candidates.value().stream()
+                              .filter(n -> clusterService.getState(n) == State.ACTIVE)
+                              .collect(Collectors.toList());
+            if (localNodeId.equals(activeNodes.get(LEADER_CANDIDATE_POS))) {
+                leaderLockAttempt(path, candidates.value());
+            } else {
+                retryLock(path);
+            }
+        } else {
+            throw new IllegalStateException("should not be here");
+        }
+    }
+
+    private void leaderLockAttempt(String path, List<NodeId> candidates) {
         try {
-            Versioned<NodeId> currentLeader = lockMap.get(path);
+            Versioned<NodeId> currentLeader = leaderMap.get(path);
             if (currentLeader != null) {
                 if (localNodeId.equals(currentLeader.value())) {
                     log.info("Already has leadership for {}", path);
-                    notifyNewLeader(path, localNodeId, currentLeader.version(), currentLeader.creationTime());
+                    // FIXME: candidates can get out of sync.
+                    notifyNewLeader(
+                            path, localNodeId, candidates, currentLeader.version(), currentLeader.creationTime());
                 } else {
                     // someone else has leadership. will retry after sometime.
-                    retry(path);
+                    retryLock(path);
                 }
             } else {
-                if (lockMap.putIfAbsent(path, localNodeId) == null) {
+                if (leaderMap.putIfAbsent(path, localNodeId) == null) {
                     log.info("Assumed leadership for {}", path);
                     // do a get again to get the version (epoch)
-                    Versioned<NodeId> newLeader = lockMap.get(path);
-                    notifyNewLeader(path, localNodeId, newLeader.version(), newLeader.creationTime());
+                    Versioned<NodeId> newLeader = leaderMap.get(path);
+                    // FIXME: candidates can get out of sync
+                    notifyNewLeader(path, localNodeId, candidates, newLeader.version(), newLeader.creationTime());
                 } else {
                     // someone beat us to it.
-                    retry(path);
+                    retryLock(path);
                 }
             }
         } catch (Exception e) {
             log.debug("Attempt to acquire leadership lock for topic {} failed", path, e);
-            retry(path);
+            retryLock(path);
         }
     }
 
-    private void notifyNewLeader(String path, NodeId leader, long epoch, long electedTime) {
-        Leadership newLeadership = new Leadership(path, leader, epoch, electedTime);
+    private void notifyNewLeader(String path, NodeId leader,
+            List<NodeId> candidates, long epoch, long electedTime) {
+        Leadership newLeadership = new Leadership(path, leader, candidates, epoch, electedTime);
         boolean updatedLeader = false;
+        log.debug("candidates for new Leadership {}", candidates);
         synchronized (leaderBoard) {
             Leadership currentLeader = leaderBoard.get(path);
             if (currentLeader == null || currentLeader.epoch() < epoch) {
+                log.debug("updating leaderboard with new {}", newLeadership);
                 leaderBoard.put(path, newLeadership);
                 updatedLeader = true;
             }
@@ -256,17 +337,23 @@
 
         if (updatedLeader) {
             LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.LEADER_ELECTED, newLeadership);
-            eventDispatcher.post(event);
-            clusterCommunicator.broadcast(
-                    new ClusterMessage(
-                            clusterService.getLocalNode().id(),
-                            LEADERSHIP_EVENT_MESSAGE_SUBJECT,
-                            SERIALIZER.encode(event)));
+            notifyPeers(event);
         }
     }
 
+    private void notifyPeers(LeadershipEvent event) {
+        eventDispatcher.post(event);
+        clusterCommunicator.broadcast(
+                new ClusterMessage(
+                        clusterService.getLocalNode().id(),
+                        LEADERSHIP_EVENT_MESSAGE_SUBJECT,
+                        SERIALIZER.encode(event)));
+    }
+
     private void notifyRemovedLeader(String path, NodeId leader, long epoch, long electedTime) {
-        Leadership oldLeadership = new Leadership(path, leader, epoch, electedTime);
+        Versioned<List<NodeId>> candidates = candidateMap.get(path);
+        Leadership oldLeadership = new Leadership(
+                path, leader, candidates.value(), epoch, electedTime);
         boolean updatedLeader = false;
         synchronized (leaderBoard) {
             Leadership currentLeader = leaderBoard.get(path);
@@ -316,6 +403,11 @@
                         leaderBoard.remove(topic);
                         updateAccepted = true;
                     }
+                } else if (eventType.equals(LeadershipEvent.Type.CANDIDATES_CHANGED)) {
+                    if (currentLeadership != null && currentLeadership.epoch() == leadershipUpdate.epoch()) {
+                        leaderBoard.replace(topic, leadershipUpdate);
+                        updateAccepted = true;
+                    }
                 } else {
                     throw new IllegalStateException("Unknown event type.");
                 }
@@ -326,44 +418,47 @@
         }
     }
 
-    private void retry(String path) {
+    private void rerunForLeadership(String path) {
+        retryLeaderLockExecutor.schedule(
+                () -> runForLeadership(path),
+                ELECTION_JOIN_ATTEMPT_INTERVAL_SEC,
+                TimeUnit.SECONDS);
+    }
+
+    private void retryLock(String path) {
         retryLeaderLockExecutor.schedule(
                 () -> tryLeaderLock(path),
                 DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC,
                 TimeUnit.SECONDS);
     }
 
+    private void retryWithdraw(String path) {
+        retryLeaderLockExecutor.schedule(
+                () -> withdraw(path),
+                DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC,
+                TimeUnit.SECONDS);
+    }
+
     private void purgeStaleLocks() {
         try {
-            Set<Entry<String, Versioned<NodeId>>> entries = lockMap.entrySet();
-            entries.forEach(entry -> {
-                String path = entry.getKey();
-                NodeId nodeId = entry.getValue().value();
-                long epoch = entry.getValue().version();
-                long creationTime = entry.getValue().creationTime();
-                if (clusterService.getState(nodeId) == ControllerNode.State.INACTIVE) {
-                    log.info("Lock for {} is held by {} which is currently inactive", path, nodeId);
+            leaderMap.entrySet()
+                .stream()
+                .filter(e -> clusterService.getState(e.getValue().value()) == ControllerNode.State.INACTIVE)
+                .filter(e -> localNodeId.equals(e.getValue().value()) && !activeTopics.contains(e.getKey()))
+                .forEach(entry -> {
+                    String path = entry.getKey();
+                    NodeId nodeId = entry.getValue().value();
+                    long epoch = entry.getValue().version();
+                    long creationTime = entry.getValue().creationTime();
                     try {
-                        if (lockMap.remove(path, epoch)) {
+                        if (leaderMap.remove(path, epoch)) {
                             log.info("Purged stale lock held by {} for {}", nodeId, path);
                             notifyRemovedLeader(path, nodeId, epoch, creationTime);
                         }
                     } catch (Exception e) {
                         log.warn("Failed to purge stale lock held by {} for {}", nodeId, path, e);
                     }
-                }
-                if (localNodeId.equals(nodeId) && !activeTopics.contains(path)) {
-                    log.debug("Lock for {} is held by {} when it not running for leadership.", path, nodeId);
-                    try {
-                        if (lockMap.remove(path, epoch)) {
-                            log.info("Purged stale lock held by {} for {}", nodeId, path);
-                            notifyRemovedLeader(path, nodeId, epoch, creationTime);
-                        }
-                    } catch (Exception e) {
-                        log.warn("Failed to purge stale lock held by {} for {}", nodeId, path, e);
-                    }
-                }
-            });
+                });
         } catch (Exception e) {
             log.debug("Failed cleaning up stale locks", e);
         }
diff --git a/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleLeadershipManager.java b/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleLeadershipManager.java
index 35654f2..0cf0625 100644
--- a/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleLeadershipManager.java
+++ b/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleLeadershipManager.java
@@ -17,6 +17,7 @@
 
 import static com.google.common.base.Preconditions.checkArgument;
 
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
@@ -108,4 +109,14 @@
     public void removeListener(LeadershipEventListener listener) {
         listeners.remove(listener);
     }
+
+    @Override
+    public Map<String, List<NodeId>> getCandidates() {
+        return null;
+    }
+
+    @Override
+    public List<NodeId> getCandidates(String path) {
+        return null;
+    }
 }