ONOS-2068: Refresh Leadership periodically from global map.

Change-Id: I50cff6546d79a275f4c026a4f3b2efe5d2eefd58
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
index 85834a7..763deca 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
@@ -4,6 +4,7 @@
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
+import com.google.common.collect.MapDifference;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
@@ -89,7 +90,7 @@
     private ScheduledExecutorService electionRunner;
     private ScheduledExecutorService lockExecutor;
     private ScheduledExecutorService staleLeadershipPurgeExecutor;
-    private ScheduledExecutorService leadershipStatusBroadcaster;
+    private ScheduledExecutorService leadershipRefresher;
 
     private ConsistentMap<String, NodeId> leaderMap;
     private ConsistentMap<String, List<NodeId>> candidateMap;
@@ -106,7 +107,7 @@
     // The actual delay is randomly chosen between the interval [0, WAIT_BEFORE_RETRY_MILLIS)
     private static final int WAIT_BEFORE_RETRY_MILLIS = 150;
     private static final int DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC = 2;
-    private static final int LEADERSHIP_STATUS_UPDATE_INTERVAL_SEC = 2;
+    private static final int LEADERSHIP_REFRESH_INTERVAL_SEC = 2;
     private static final int DELAY_BETWEEN_STALE_LEADERSHIP_PURGE_ATTEMPTS_SEC = 2;
 
     private final AtomicBoolean staleLeadershipPurgeScheduled = new AtomicBoolean(false);
@@ -135,8 +136,8 @@
                 4, groupedThreads("onos/store/leadership", "election-thread-%d"));
         staleLeadershipPurgeExecutor = Executors.newSingleThreadScheduledExecutor(
                 groupedThreads("onos/store/leadership", "stale-leadership-evictor"));
-        leadershipStatusBroadcaster = Executors.newSingleThreadScheduledExecutor(
-                groupedThreads("onos/store/leadership", "peer-updater"));
+        leadershipRefresher = Executors.newSingleThreadScheduledExecutor(
+                groupedThreads("onos/store/leadership", "refresh-thread"));
         clusterCommunicator.addSubscriber(
                 LEADERSHIP_EVENT_MESSAGE_SUBJECT,
                 SERIALIZER::decode,
@@ -148,8 +149,8 @@
         electionRunner.scheduleWithFixedDelay(
                 this::electLeaders, 0, DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC, TimeUnit.SECONDS);
 
-        leadershipStatusBroadcaster.scheduleWithFixedDelay(
-                this::sendLeadershipStatus, 0, LEADERSHIP_STATUS_UPDATE_INTERVAL_SEC, TimeUnit.SECONDS);
+        leadershipRefresher.scheduleWithFixedDelay(
+                this::refreshLeaderBoard, 0, LEADERSHIP_REFRESH_INTERVAL_SEC, TimeUnit.SECONDS);
 
         listenerRegistry = new ListenerRegistry<>();
         eventDispatcher.addSink(LeadershipEvent.class, listenerRegistry);
@@ -173,7 +174,7 @@
         messageHandlingExecutor.shutdown();
         lockExecutor.shutdown();
         staleLeadershipPurgeExecutor.shutdown();
-        leadershipStatusBroadcaster.shutdown();
+        leadershipRefresher.shutdown();
 
         log.info("Stopped");
     }
@@ -458,6 +459,7 @@
             leaderBoard.compute(topic, (k, currentLeadership) -> {
                 if (currentLeadership == null || currentLeadership.epoch() <= leadershipUpdate.epoch()) {
                     updateAccepted.set(true);
+                    // FIXME: Removing entries from leaderboard is not safe and should be visited.
                     return null;
                 }
                 return currentLeadership;
@@ -579,18 +581,47 @@
         }
     }
 
-    private void sendLeadershipStatus() {
+    private void refreshLeaderBoard() {
         try {
-            leaderBoard.forEach((path, leadership) -> {
-                if (leadership.leader().equals(localNodeId)) {
-                    LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.LEADER_ELECTED, leadership);
-                    clusterCommunicator.broadcast(event,
-                            LEADERSHIP_EVENT_MESSAGE_SUBJECT,
-                            SERIALIZER::encode);
+            Map<String, Leadership> newLeaderBoard = Maps.newHashMap();
+            leaderMap.entrySet().forEach(entry -> {
+                String path = entry.getKey();
+                Versioned<NodeId> leader = entry.getValue();
+                Leadership leadership = new Leadership(path,
+                                                       leader.value(),
+                                                       leader.version(),
+                                                       leader.creationTime());
+                newLeaderBoard.put(path, leadership);
+            });
+
+            // first take snapshot of current leader board.
+            Map<String, Leadership> currentLeaderBoard = ImmutableMap.copyOf(leaderBoard);
+
+            MapDifference<String, Leadership> diff = Maps.difference(currentLeaderBoard, newLeaderBoard);
+
+            // evict stale leaders
+            diff.entriesOnlyOnLeft().forEach((path, leadership) -> {
+                log.debug("Evicting {} from leaderboard. It is no longer active leader.", leadership);
+                onLeadershipEvent(new LeadershipEvent(LeadershipEvent.Type.LEADER_BOOTED, leadership));
+            });
+
+            // add missing leaders
+            diff.entriesOnlyOnRight().forEach((path, leadership) -> {
+                log.debug("Adding {} to leaderboard. It is now the active leader.", leadership);
+                onLeadershipEvent(new LeadershipEvent(LeadershipEvent.Type.LEADER_ELECTED, leadership));
+            });
+
+            // add updated leaders
+            diff.entriesDiffering().forEach((path, difference) -> {
+                Leadership current = difference.leftValue();
+                Leadership updated = difference.rightValue();
+                if (current.epoch() < updated.epoch()) {
+                    log.debug("Updated {} in leaderboard.", updated);
+                    onLeadershipEvent(new LeadershipEvent(LeadershipEvent.Type.LEADER_ELECTED, updated));
                 }
             });
         } catch (Exception e) {
-            log.debug("Failed to send leadership updates", e);
+            log.debug("Failed to refresh leader board", e);
         }
     }