Create local storage for topic candidates mapping. This also includes:

 - using Optional in Leadership, and some commenting.
 - using MutableBooleans + compute()

    part of: Device Mastership store on top of LeadershipService
    Reference: ONOS-76

Conflicts:
	core/api/src/main/java/org/onosproject/cluster/LeadershipService.java
	core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java

Change-Id: I7f090abb123cf23bb5126a935a6e72be00f3e3ce
diff --git a/cli/src/main/java/org/onosproject/cli/net/LeaderCommand.java b/cli/src/main/java/org/onosproject/cli/net/LeaderCommand.java
index 07b5b2b..bd5ba3a 100644
--- a/cli/src/main/java/org/onosproject/cli/net/LeaderCommand.java
+++ b/cli/src/main/java/org/onosproject/cli/net/LeaderCommand.java
@@ -85,7 +85,7 @@
     }
 
     private void displayCandidates(Map<String, Leadership> leaderBoard,
-            Map<String, List<NodeId>> candidates) {
+            Map<String, Leadership> candidates) {
         print("--------------------------------------------------------------");
         print(FMT_C, "Topic", "Leader", "Candidates");
         print("--------------------------------------------------------------");
@@ -94,13 +94,13 @@
                 .stream()
                 .sorted(leadershipComparator)
                 .forEach(l -> {
-                        List<NodeId> list = candidates.get(l.topic());
+                        List<NodeId> list = candidates.get(l.topic()).candidates();
                         print(FMT_C,
                             l.topic(),
                             l.leader(),
-                            list.remove(0).toString());
+                            list.get(0).toString());
                             // formatting hacks to get it into a table
-                            list.forEach(n -> print(FMT_C, " ", " ", n));
+                            list.subList(1, list.size()).forEach(n -> print(FMT_C, " ", " ", n));
                             print(FMT_C, " ", " ", " ");
                         });
         print("--------------------------------------------------------------");
@@ -139,7 +139,7 @@
             print("%s", json(leaderBoard));
         } else {
             if (showCandidates) {
-                Map<String, List<NodeId>> candidates = leaderService.getCandidates();
+                Map<String, Leadership> candidates = leaderService.getCandidates();
                 displayCandidates(leaderBoard, candidates);
             } else {
                 displayLeaders(leaderBoard);
diff --git a/core/api/src/main/java/org/onosproject/cluster/Leadership.java b/core/api/src/main/java/org/onosproject/cluster/Leadership.java
index a0c5e70..175de2d 100644
--- a/core/api/src/main/java/org/onosproject/cluster/Leadership.java
+++ b/core/api/src/main/java/org/onosproject/cluster/Leadership.java
@@ -17,6 +17,7 @@
 
 import java.util.Objects;
 import java.util.List;
+import java.util.Optional;
 
 import org.joda.time.DateTime;
 
@@ -33,19 +34,21 @@
  * rest in decreasing preference order.</li>
  * <li>The epoch is the logical age of a Leadership construct, and should be
  * used for comparing two Leaderships, but only of the same topic.</li>
+ * <li>The leader may be null if its accuracy can't be guaranteed. This applies
+ * to CANDIDATES_CHANGED events and candidate board contents.</li>
  * </ul>
  */
 public class Leadership {
 
     private final String topic;
-    private final NodeId leader;
+    private final Optional<NodeId> leader;
     private final List<NodeId> candidates;
     private final long epoch;
     private final long electedTime;
 
     public Leadership(String topic, NodeId leader, long epoch, long electedTime) {
         this.topic = topic;
-        this.leader = leader;
+        this.leader = Optional.of(leader);
         this.candidates = ImmutableList.of(leader);
         this.epoch = epoch;
         this.electedTime = electedTime;
@@ -54,7 +57,16 @@
     public Leadership(String topic, NodeId leader, List<NodeId> candidates,
             long epoch, long electedTime) {
         this.topic = topic;
-        this.leader = leader;
+        this.leader = Optional.of(leader);
+        this.candidates = ImmutableList.copyOf(candidates);
+        this.epoch = epoch;
+        this.electedTime = electedTime;
+    }
+
+    public Leadership(String topic, List<NodeId> candidates,
+            long epoch, long electedTime) {
+        this.topic = topic;
+        this.leader = Optional.empty();
         this.candidates = ImmutableList.copyOf(candidates);
         this.epoch = epoch;
         this.electedTime = electedTime;
@@ -74,8 +86,9 @@
      *
      * @return leader node.
      */
+    // This will return Optional<NodeId> in the future.
     public NodeId leader() {
-        return leader;
+        return leader.orElse(null);
     }
 
     /**
diff --git a/core/api/src/main/java/org/onosproject/cluster/LeadershipEvent.java b/core/api/src/main/java/org/onosproject/cluster/LeadershipEvent.java
index c4f59be..3456e22 100644
--- a/core/api/src/main/java/org/onosproject/cluster/LeadershipEvent.java
+++ b/core/api/src/main/java/org/onosproject/cluster/LeadershipEvent.java
@@ -43,14 +43,14 @@
         LEADER_REELECTED,
 
         /**
-         * Signifies that the leader has been booted and lost leadership. The event subject is the
-         * former leader.
+         * Signifies that the leader has been booted and lost leadership. The
+         * event subject is the former leader.
          */
         LEADER_BOOTED,
 
         /**
          * Signifies that the list of candidates for leadership for a topic has
-         * changed.
+         * changed. This event does not guarantee accurate leader information.
          */
         CANDIDATES_CHANGED
     }
diff --git a/core/api/src/main/java/org/onosproject/cluster/LeadershipService.java b/core/api/src/main/java/org/onosproject/cluster/LeadershipService.java
index 1acdc61..bc490d9 100644
--- a/core/api/src/main/java/org/onosproject/cluster/LeadershipService.java
+++ b/core/api/src/main/java/org/onosproject/cluster/LeadershipService.java
@@ -76,9 +76,9 @@
     /**
      * Returns the candidates for all known topics.
      *
-     * @return A map of topics to lists of NodeIds.
+     * @return A mapping from topics to up-to-date candidate info.
      */
-    Map<String, List<NodeId>> getCandidates();
+    Map<String, Leadership> getCandidates();
 
     /**
      * Returns the candidates for a given topic.
diff --git a/core/api/src/test/java/org/onosproject/cluster/LeadershipServiceAdapter.java b/core/api/src/test/java/org/onosproject/cluster/LeadershipServiceAdapter.java
index 02742d4..3c503bb 100644
--- a/core/api/src/test/java/org/onosproject/cluster/LeadershipServiceAdapter.java
+++ b/core/api/src/test/java/org/onosproject/cluster/LeadershipServiceAdapter.java
@@ -65,7 +65,7 @@
     }
 
     @Override
-    public Map<String, List<NodeId>> getCandidates() {
+    public Map<String, Leadership> getCandidates() {
         return null;
     }
 
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java
index 10bf6a4..1d918c8 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java
@@ -576,7 +576,7 @@
     }
 
     @Override
-    public Map<String, List<NodeId>> getCandidates() {
+    public Map<String, Leadership> getCandidates() {
         return null;
     }
 
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
index cf3700b..303129d 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
@@ -12,6 +12,7 @@
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
+import org.apache.commons.lang3.mutable.MutableBoolean;
 import org.onlab.util.KryoNamespace;
 import org.onosproject.cluster.ClusterService;
 import org.onosproject.cluster.ControllerNode;
@@ -88,6 +89,7 @@
     private AbstractListenerRegistry<LeadershipEvent, LeadershipEventListener>
         listenerRegistry;
     private final Map<String, Leadership> leaderBoard = Maps.newConcurrentMap();
+    private final Map<String, Leadership> candidateBoard = Maps.newConcurrentMap();
     private NodeId localNodeId;
 
     private Set<String> activeTopics = Sets.newConcurrentHashSet();
@@ -164,16 +166,14 @@
     }
 
     @Override
-    public Map<String, List<NodeId>> getCandidates() {
-        Map<String, List<NodeId>> candidates = Maps.newHashMap();
-        candidateMap.entrySet().forEach(el -> candidates.put(el.getKey(), el.getValue().value()));
-        return ImmutableMap.copyOf(candidates);
+    public Map<String, Leadership> getCandidates() {
+        return ImmutableMap.copyOf(candidateBoard);
     }
 
     @Override
     public List<NodeId> getCandidates(String path) {
-        Versioned<List<NodeId>> candidates = candidateMap.get(path);
-        return candidates == null ? ImmutableList.of() : ImmutableList.copyOf(candidates.value());
+        Leadership current = candidateBoard.get(path);
+        return current == null ? ImmutableList.of() : ImmutableList.copyOf(current.candidates());
     }
 
     @Override
@@ -207,13 +207,21 @@
                 List<NodeId> candidateList = Lists.newArrayList(candidates.value());
                 if (!candidateList.contains(localNodeId)) {
                     candidateList.add(localNodeId);
-                    if (!candidateMap.replace(path, candidates.version(), candidateList)) {
+                    if (candidateMap.replace(path, candidates.version(), candidateList)) {
+                        Versioned<List<NodeId>> newCandidates = candidateMap.get(path);
+                        notifyCandidateAdded(
+                                path, candidateList, newCandidates.version(), newCandidates.creationTime());
+                    } else {
                         rerunForLeadership(path);
                         return;
                     }
                 }
             } else {
-                if (!(candidateMap.putIfAbsent(path, ImmutableList.of(localNodeId)) == null)) {
+                List<NodeId> candidateList = ImmutableList.of(localNodeId);
+                if ((candidateMap.putIfAbsent(path, candidateList) == null)) {
+                    Versioned<List<NodeId>> newCandidates = candidateMap.get(path);
+                    notifyCandidateAdded(path, candidateList, newCandidates.version(), newCandidates.creationTime());
+                } else {
                     rerunForLeadership(path);
                     return;
                 }
@@ -247,10 +255,19 @@
             if (!candidateList.remove(localNodeId)) {
                 return;
             }
-            boolean success =  candidateList.isEmpty()
-                    ? candidateMap.remove(path, candidates.version())
-                    : candidateMap.replace(path, candidates.version(), candidateList);
-            if (!success) {
+            boolean success = false;
+            if (candidateList.isEmpty()) {
+                if (candidateMap.remove(path, candidates.version())) {
+                    success = true;
+                }
+            } else {
+                if (candidateMap.replace(path, candidates.version(), candidateList)) {
+                    success = true;
+                }
+            }
+            if (success) {
+                notifyCandidateRemoved(path, candidateList, candidates.version(), candidates.creationTime());
+            } else {
                 log.warn("Failed to withdraw from candidates list. Will retry");
                 retryWithdraw(path);
             }
@@ -321,21 +338,63 @@
         }
     }
 
+    private void notifyCandidateAdded(
+            String path, List<NodeId> candidates, long epoch, long electedTime) {
+        Leadership newInfo = new Leadership(path, candidates, epoch, electedTime);
+        final MutableBoolean updated = new MutableBoolean(false);
+        candidateBoard.compute(path, (k, current) -> {
+            if (current == null || current.epoch() < newInfo.epoch()) {
+                log.info("updating candidateboard with {}", newInfo);
+                updated.setTrue();
+                return newInfo;
+            }
+            return current;
+        });
+        // maybe rethink types of candidates events
+        if (updated.booleanValue()) {
+            LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.CANDIDATES_CHANGED, newInfo);
+            notifyPeers(event);
+        }
+    }
+
+    private void notifyCandidateRemoved(
+            String path, List<NodeId> candidates, long epoch, long electedTime) {
+        Leadership newInfo = new Leadership(path, candidates, epoch, electedTime);
+        final MutableBoolean updated = new MutableBoolean(false);
+        candidateBoard.compute(path, (k, current) -> {
+            if (current != null && current.epoch() == newInfo.epoch()) {
+                log.info("updating candidateboard with {}", newInfo);
+                updated.setTrue();
+                if (candidates.isEmpty()) {
+                    return null;
+                } else {
+                    return newInfo;
+                }
+            }
+            return current;
+        });
+        // maybe rethink types of candidates events
+        if (updated.booleanValue()) {
+            LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.CANDIDATES_CHANGED, newInfo);
+            notifyPeers(event);
+        }
+    }
+
     private void notifyNewLeader(String path, NodeId leader,
             List<NodeId> candidates, long epoch, long electedTime) {
         Leadership newLeadership = new Leadership(path, leader, candidates, epoch, electedTime);
-        boolean updatedLeader = false;
+        final MutableBoolean updatedLeader = new MutableBoolean(false);
         log.debug("candidates for new Leadership {}", candidates);
-        synchronized (leaderBoard) {
-            Leadership currentLeader = leaderBoard.get(path);
+        leaderBoard.compute(path, (k, currentLeader) -> {
             if (currentLeader == null || currentLeader.epoch() < epoch) {
                 log.debug("updating leaderboard with new {}", newLeadership);
-                leaderBoard.put(path, newLeadership);
-                updatedLeader = true;
+                updatedLeader.setTrue();
+                return newLeadership;
             }
-        }
+            return currentLeader;
+        });
 
-        if (updatedLeader) {
+        if (updatedLeader.booleanValue()) {
             LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.LEADER_ELECTED, newLeadership);
             notifyPeers(event);
         }
@@ -352,21 +411,18 @@
         Versioned<List<NodeId>> candidates = candidateMap.get(path);
         Leadership oldLeadership = new Leadership(
                 path, leader, candidates.value(), epoch, electedTime);
-        boolean updatedLeader = false;
-        synchronized (leaderBoard) {
-            Leadership currentLeader = leaderBoard.get(path);
+        final MutableBoolean updatedLeader = new MutableBoolean(false);
+        leaderBoard.compute(path, (k, currentLeader) -> {
             if (currentLeader != null && currentLeader.epoch() == oldLeadership.epoch()) {
-                leaderBoard.remove(path);
-                updatedLeader = true;
+                updatedLeader.setTrue();
+                return null;
             }
-        }
+            return currentLeader;
+        });
 
-        if (updatedLeader) {
+        if (updatedLeader.booleanValue()) {
             LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.LEADER_BOOTED, oldLeadership);
-            eventDispatcher.post(event);
-            clusterCommunicator.broadcast(event,
-                    LEADERSHIP_EVENT_MESSAGE_SUBJECT,
-                    SERIALIZER::encode);
+            notifyPeers(event);
         }
     }
 
@@ -385,31 +441,37 @@
             LeadershipEvent.Type eventType = leadershipEvent.type();
             String topic = leadershipUpdate.topic();
 
-            boolean updateAccepted = false;
-
-            synchronized (leaderBoard) {
-                Leadership currentLeadership = leaderBoard.get(topic);
-                if (eventType.equals(LeadershipEvent.Type.LEADER_ELECTED)) {
+            MutableBoolean updateAccepted = new MutableBoolean(false);
+            if (eventType.equals(LeadershipEvent.Type.LEADER_ELECTED)) {
+                leaderBoard.compute(topic, (k, currentLeadership) -> {
                     if (currentLeadership == null || currentLeadership.epoch() < leadershipUpdate.epoch()) {
-                        leaderBoard.put(topic, leadershipUpdate);
-                        updateAccepted = true;
+                        updateAccepted.setTrue();
+                        return leadershipUpdate;
                     }
-                } else if (eventType.equals(LeadershipEvent.Type.LEADER_BOOTED)) {
-                    if (currentLeadership != null && currentLeadership.epoch() == leadershipUpdate.epoch()) {
-                        leaderBoard.remove(topic);
-                        updateAccepted = true;
+                    return currentLeadership;
+                });
+            } else if (eventType.equals(LeadershipEvent.Type.LEADER_BOOTED)) {
+                leaderBoard.compute(topic, (k, currentLeadership) -> {
+                    if (currentLeadership == null || currentLeadership.epoch() < leadershipUpdate.epoch()) {
+                        updateAccepted.setTrue();
+                        return null;
                     }
-                } else if (eventType.equals(LeadershipEvent.Type.CANDIDATES_CHANGED)) {
-                    if (currentLeadership != null && currentLeadership.epoch() == leadershipUpdate.epoch()) {
-                        leaderBoard.replace(topic, leadershipUpdate);
-                        updateAccepted = true;
+                    return currentLeadership;
+                });
+            } else if (eventType.equals(LeadershipEvent.Type.CANDIDATES_CHANGED)) {
+                candidateBoard.compute(topic, (k, currentInfo) -> {
+                    if (currentInfo == null || currentInfo.epoch() <= leadershipUpdate.epoch()) {
+                        updateAccepted.setTrue();
+                        return leadershipUpdate;
                     }
-                } else {
-                    throw new IllegalStateException("Unknown event type.");
-                }
-                if (updateAccepted) {
-                    eventDispatcher.post(leadershipEvent);
-                }
+                    return currentInfo;
+                });
+            } else {
+                throw new IllegalStateException("Unknown event type.");
+            }
+
+            if (updateAccepted.booleanValue()) {
+                eventDispatcher.post(leadershipEvent);
             }
         }
     }
@@ -470,6 +532,12 @@
                             SERIALIZER::encode);
                 }
             });
+            candidateBoard.forEach((path, leadership) -> {
+                LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.CANDIDATES_CHANGED, leadership);
+                clusterCommunicator.broadcast(event,
+                        LEADERSHIP_EVENT_MESSAGE_SUBJECT,
+                        SERIALIZER::encode);
+            });
         } catch (Exception e) {
             log.debug("Failed to send leadership updates", e);
         }
diff --git a/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleLeadershipManager.java b/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleLeadershipManager.java
index 0cf0625..97e9f24 100644
--- a/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleLeadershipManager.java
+++ b/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleLeadershipManager.java
@@ -111,7 +111,7 @@
     }
 
     @Override
-    public Map<String, List<NodeId>> getCandidates() {
+    public Map<String, Leadership> getCandidates() {
         return null;
     }