Create local storage for the topic-to-candidates mapping. This also includes:

 - using Optional in Leadership, plus additional comments.
 - replacing synchronized blocks with MutableBoolean flags + Map.compute()

    part of: Device Mastership store on top of LeadershipService
    Reference: ONOS-76

Conflicts:
	core/api/src/main/java/org/onosproject/cluster/LeadershipService.java
	core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java

Change-Id: I7f090abb123cf23bb5126a935a6e72be00f3e3ce
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java
index 10bf6a4..1d918c8 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java
@@ -576,7 +576,7 @@
     }
 
     @Override
-    public Map<String, List<NodeId>> getCandidates() {
+    public Map<String, Leadership> getCandidates() {
         return null;
     }
 
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
index cf3700b..303129d 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
@@ -12,6 +12,7 @@
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
+import org.apache.commons.lang3.mutable.MutableBoolean;
 import org.onlab.util.KryoNamespace;
 import org.onosproject.cluster.ClusterService;
 import org.onosproject.cluster.ControllerNode;
@@ -88,6 +89,7 @@
     private AbstractListenerRegistry<LeadershipEvent, LeadershipEventListener>
         listenerRegistry;
     private final Map<String, Leadership> leaderBoard = Maps.newConcurrentMap();
+    private final Map<String, Leadership> candidateBoard = Maps.newConcurrentMap();
     private NodeId localNodeId;
 
     private Set<String> activeTopics = Sets.newConcurrentHashSet();
@@ -164,16 +166,14 @@
     }
 
     @Override
-    public Map<String, List<NodeId>> getCandidates() {
-        Map<String, List<NodeId>> candidates = Maps.newHashMap();
-        candidateMap.entrySet().forEach(el -> candidates.put(el.getKey(), el.getValue().value()));
-        return ImmutableMap.copyOf(candidates);
+    public Map<String, Leadership> getCandidates() {
+        return ImmutableMap.copyOf(candidateBoard);
     }
 
     @Override
     public List<NodeId> getCandidates(String path) {
-        Versioned<List<NodeId>> candidates = candidateMap.get(path);
-        return candidates == null ? ImmutableList.of() : ImmutableList.copyOf(candidates.value());
+        Leadership current = candidateBoard.get(path);
+        return current == null ? ImmutableList.of() : ImmutableList.copyOf(current.candidates());
     }
 
     @Override
@@ -207,13 +207,21 @@
                 List<NodeId> candidateList = Lists.newArrayList(candidates.value());
                 if (!candidateList.contains(localNodeId)) {
                     candidateList.add(localNodeId);
-                    if (!candidateMap.replace(path, candidates.version(), candidateList)) {
+                    if (candidateMap.replace(path, candidates.version(), candidateList)) {
+                        Versioned<List<NodeId>> newCandidates = candidateMap.get(path);
+                        notifyCandidateAdded(
+                                path, candidateList, newCandidates.version(), newCandidates.creationTime());
+                    } else {
                         rerunForLeadership(path);
                         return;
                     }
                 }
             } else {
-                if (!(candidateMap.putIfAbsent(path, ImmutableList.of(localNodeId)) == null)) {
+                List<NodeId> candidateList = ImmutableList.of(localNodeId);
+                if ((candidateMap.putIfAbsent(path, candidateList) == null)) {
+                    Versioned<List<NodeId>> newCandidates = candidateMap.get(path);
+                    notifyCandidateAdded(path, candidateList, newCandidates.version(), newCandidates.creationTime());
+                } else {
                     rerunForLeadership(path);
                     return;
                 }
@@ -247,10 +255,19 @@
             if (!candidateList.remove(localNodeId)) {
                 return;
             }
-            boolean success =  candidateList.isEmpty()
-                    ? candidateMap.remove(path, candidates.version())
-                    : candidateMap.replace(path, candidates.version(), candidateList);
-            if (!success) {
+            boolean success = false;
+            if (candidateList.isEmpty()) {
+                if (candidateMap.remove(path, candidates.version())) {
+                    success = true;
+                }
+            } else {
+                if (candidateMap.replace(path, candidates.version(), candidateList)) {
+                    success = true;
+                }
+            }
+            if (success) {
+                notifyCandidateRemoved(path, candidateList, candidates.version(), candidates.creationTime());
+            } else {
                 log.warn("Failed to withdraw from candidates list. Will retry");
                 retryWithdraw(path);
             }
@@ -321,21 +338,63 @@
         }
     }
 
+    private void notifyCandidateAdded(
+            String path, List<NodeId> candidates, long epoch, long electedTime) {
+        Leadership newInfo = new Leadership(path, candidates, epoch, electedTime);
+        final MutableBoolean updated = new MutableBoolean(false);
+        candidateBoard.compute(path, (k, current) -> {
+            if (current == null || current.epoch() < newInfo.epoch()) {
+                log.info("updating candidateboard with {}", newInfo);
+                updated.setTrue();
+                return newInfo;
+            }
+            return current;
+        });
+        // TODO: additions and removals both emit CANDIDATES_CHANGED; consider distinct event types
+        if (updated.booleanValue()) {
+            LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.CANDIDATES_CHANGED, newInfo);
+            notifyPeers(event);
+        }
+    }
+
+    private void notifyCandidateRemoved(
+            String path, List<NodeId> candidates, long epoch, long electedTime) {
+        Leadership newInfo = new Leadership(path, candidates, epoch, electedTime);
+        final MutableBoolean updated = new MutableBoolean(false);
+        candidateBoard.compute(path, (k, current) -> {
+            if (current != null && current.epoch() == newInfo.epoch()) {
+                log.info("updating candidateboard with {}", newInfo);
+                updated.setTrue();
+                if (candidates.isEmpty()) {
+                    return null;
+                } else {
+                    return newInfo;
+                }
+            }
+            return current;
+        });
+        // TODO: additions and removals both emit CANDIDATES_CHANGED; consider distinct event types
+        if (updated.booleanValue()) {
+            LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.CANDIDATES_CHANGED, newInfo);
+            notifyPeers(event);
+        }
+    }
+
     private void notifyNewLeader(String path, NodeId leader,
             List<NodeId> candidates, long epoch, long electedTime) {
         Leadership newLeadership = new Leadership(path, leader, candidates, epoch, electedTime);
-        boolean updatedLeader = false;
+        final MutableBoolean updatedLeader = new MutableBoolean(false);
         log.debug("candidates for new Leadership {}", candidates);
-        synchronized (leaderBoard) {
-            Leadership currentLeader = leaderBoard.get(path);
+        leaderBoard.compute(path, (k, currentLeader) -> {
             if (currentLeader == null || currentLeader.epoch() < epoch) {
                 log.debug("updating leaderboard with new {}", newLeadership);
-                leaderBoard.put(path, newLeadership);
-                updatedLeader = true;
+                updatedLeader.setTrue();
+                return newLeadership;
             }
-        }
+            return currentLeader;
+        });
 
-        if (updatedLeader) {
+        if (updatedLeader.booleanValue()) {
             LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.LEADER_ELECTED, newLeadership);
             notifyPeers(event);
         }
@@ -352,21 +411,18 @@
         Versioned<List<NodeId>> candidates = candidateMap.get(path);
         Leadership oldLeadership = new Leadership(
                 path, leader, candidates.value(), epoch, electedTime);
-        boolean updatedLeader = false;
-        synchronized (leaderBoard) {
-            Leadership currentLeader = leaderBoard.get(path);
+        final MutableBoolean updatedLeader = new MutableBoolean(false);
+        leaderBoard.compute(path, (k, currentLeader) -> {
             if (currentLeader != null && currentLeader.epoch() == oldLeadership.epoch()) {
-                leaderBoard.remove(path);
-                updatedLeader = true;
+                updatedLeader.setTrue();
+                return null;
             }
-        }
+            return currentLeader;
+        });
 
-        if (updatedLeader) {
+        if (updatedLeader.booleanValue()) {
             LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.LEADER_BOOTED, oldLeadership);
-            eventDispatcher.post(event);
-            clusterCommunicator.broadcast(event,
-                    LEADERSHIP_EVENT_MESSAGE_SUBJECT,
-                    SERIALIZER::encode);
+            notifyPeers(event);
         }
     }
 
@@ -385,31 +441,37 @@
             LeadershipEvent.Type eventType = leadershipEvent.type();
             String topic = leadershipUpdate.topic();
 
-            boolean updateAccepted = false;
-
-            synchronized (leaderBoard) {
-                Leadership currentLeadership = leaderBoard.get(topic);
-                if (eventType.equals(LeadershipEvent.Type.LEADER_ELECTED)) {
+            MutableBoolean updateAccepted = new MutableBoolean(false);
+            if (eventType.equals(LeadershipEvent.Type.LEADER_ELECTED)) {
+                leaderBoard.compute(topic, (k, currentLeadership) -> {
                     if (currentLeadership == null || currentLeadership.epoch() < leadershipUpdate.epoch()) {
-                        leaderBoard.put(topic, leadershipUpdate);
-                        updateAccepted = true;
+                        updateAccepted.setTrue();
+                        return leadershipUpdate;
                     }
-                } else if (eventType.equals(LeadershipEvent.Type.LEADER_BOOTED)) {
-                    if (currentLeadership != null && currentLeadership.epoch() == leadershipUpdate.epoch()) {
-                        leaderBoard.remove(topic);
-                        updateAccepted = true;
+                    return currentLeadership;
+                });
+            } else if (eventType.equals(LeadershipEvent.Type.LEADER_BOOTED)) {
+                leaderBoard.compute(topic, (k, currentLeadership) -> {
+                    if (currentLeadership == null || currentLeadership.epoch() < leadershipUpdate.epoch()) {
+                        updateAccepted.setTrue();
+                        return null;
                     }
-                } else if (eventType.equals(LeadershipEvent.Type.CANDIDATES_CHANGED)) {
-                    if (currentLeadership != null && currentLeadership.epoch() == leadershipUpdate.epoch()) {
-                        leaderBoard.replace(topic, leadershipUpdate);
-                        updateAccepted = true;
+                    return currentLeadership;
+                });
+            } else if (eventType.equals(LeadershipEvent.Type.CANDIDATES_CHANGED)) {
+                candidateBoard.compute(topic, (k, currentInfo) -> {
+                    if (currentInfo == null || currentInfo.epoch() <= leadershipUpdate.epoch()) {
+                        updateAccepted.setTrue();
+                        return leadershipUpdate;
                     }
-                } else {
-                    throw new IllegalStateException("Unknown event type.");
-                }
-                if (updateAccepted) {
-                    eventDispatcher.post(leadershipEvent);
-                }
+                    return currentInfo;
+                });
+            } else {
+                throw new IllegalStateException("Unknown event type.");
+            }
+
+            if (updateAccepted.booleanValue()) {
+                eventDispatcher.post(leadershipEvent);
             }
         }
     }
@@ -470,6 +532,12 @@
                             SERIALIZER::encode);
                 }
             });
+            candidateBoard.forEach((path, leadership) -> {
+                LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.CANDIDATES_CHANGED, leadership);
+                clusterCommunicator.broadcast(event,
+                        LEADERSHIP_EVENT_MESSAGE_SUBJECT,
+                        SERIALIZER::encode);
+            });
         } catch (Exception e) {
             log.debug("Failed to send leadership updates", e);
         }