ONOS-2026: Address polling issue in LeaderElection

Change-Id: Ib5c94d932de6b2c3419b07a97d6fe91d5c588538
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
index 2959880..df8dd2e 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
@@ -84,7 +84,8 @@
 
     private final Logger log = getLogger(getClass());
     private ExecutorService messageHandlingExecutor;
-    private ScheduledExecutorService retryLeaderLockExecutor;
+    private ScheduledExecutorService electionRunner; // periodically invokes electLeaders()
+    private ScheduledExecutorService lockExecutor; // runs per-topic elections and delayed retries
     private ScheduledExecutorService staleLeadershipPurgeExecutor;
     private ScheduledExecutorService leadershipStatusBroadcaster;
 
@@ -98,6 +99,7 @@
 
     private NodeId localNodeId;
     private Set<String> activeTopics = Sets.newConcurrentHashSet();
+    private Map<String, CompletableFuture<Leadership>> pendingFutures = Maps.newConcurrentMap(); // keyed by topic path
 
     private static final int ELECTION_JOIN_ATTEMPT_INTERVAL_SEC = 2;
     private static final int DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC = 2;
@@ -125,7 +127,9 @@
 
         messageHandlingExecutor = Executors.newSingleThreadExecutor(
                 groupedThreads("onos/store/leadership", "message-handler"));
-        retryLeaderLockExecutor = Executors.newScheduledThreadPool(
+        electionRunner = Executors.newSingleThreadScheduledExecutor(
+                groupedThreads("onos/store/leadership", "election-runner"));
+        lockExecutor = Executors.newScheduledThreadPool(
                 4, groupedThreads("onos/store/leadership", "election-thread-%d"));
         staleLeadershipPurgeExecutor = Executors.newSingleThreadScheduledExecutor(
                 groupedThreads("onos/store/leadership", "stale-leadership-evictor"));
@@ -139,6 +143,9 @@
 
         clusterService.addListener(clusterEventListener);
 
+        electionRunner.scheduleWithFixedDelay(
+                this::electLeaders, 0, DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC, TimeUnit.SECONDS);
+
         leadershipStatusBroadcaster.scheduleWithFixedDelay(
                 this::sendLeadershipStatus, 0, LEADERSHIP_STATUS_UPDATE_INTERVAL_SEC, TimeUnit.SECONDS);
 
@@ -160,8 +167,9 @@
         eventDispatcher.removeSink(LeadershipEvent.class);
         clusterCommunicator.removeSubscriber(LEADERSHIP_EVENT_MESSAGE_SUBJECT);
 
+        electionRunner.shutdown();
         messageHandlingExecutor.shutdown();
-        retryLeaderLockExecutor.shutdown();
+        lockExecutor.shutdown();
         staleLeadershipPurgeExecutor.shutdown();
         leadershipStatusBroadcaster.shutdown();
 
@@ -236,7 +244,12 @@
                             candidates.creationTime())));
             log.debug("In the leadership race for topic {} with candidates {}", path, candidates);
             activeTopics.add(path);
-            tryLeaderLock(path, future);
+            Leadership leadership = electLeader(path, candidates.value());
+            if (leadership == null) {
+                pendingFutures.put(path, future);
+            } else {
+                future.complete(leadership);
+            }
         } catch (ConsistentMapException e) {
             log.debug("Failed to enter topic leader race for {}. Retrying.", path, e);
             rerunForLeadership(path, future);
@@ -315,7 +328,6 @@
                                 localNodeId,
                                 leader.version(),
                                 leader.creationTime())));
-                    retryLock(path, new CompletableFuture<>());
                     return true;
                 }
             }
@@ -355,50 +367,55 @@
         return true;
     }
 
-    private void tryLeaderLock(String path, CompletableFuture<Leadership> future) {
-        if (!activeTopics.contains(path) || Objects.equals(localNodeId, getLeader(path))) {
-            return;
-        }
-        try {
-            Versioned<List<NodeId>> candidates = candidateMap.get(path);
-            if (candidates != null) {
-                List<NodeId> activeNodes = candidates.value()
-                                  .stream()
-                                  .filter(n -> clusterService.getState(n) == ACTIVE)
-                                  .collect(Collectors.toList());
-                if (localNodeId.equals(activeNodes.get(LEADER_CANDIDATE_POS))) {
-                    leaderLockAttempt(path, candidates.value(), future);
-                } else {
-                    retryLock(path, future);
+    private Leadership electLeader(String path, List<NodeId> candidates) { // null when no leader is known yet
+        Leadership currentLeadership = getLeadership(path);
+        if (currentLeadership != null) {
+            return currentLeadership; // a leader is already known; nothing to elect
+        } else {
+            NodeId topCandidate = candidates // first candidate the cluster service reports ACTIVE, else null
+                        .stream()
+                        .filter(n -> clusterService.getState(n) == ACTIVE)
+                        .findFirst()
+                        .orElse(null);
+            try {
+                Versioned<NodeId> leader = localNodeId.equals(topCandidate) // top candidate claims; others just read
+                        ? leaderMap.computeIfAbsent(path, p -> localNodeId) : leaderMap.get(path);
+                if (leader != null) { // presumably null when leaderMap holds no entry for path - confirm
+                    Leadership newLeadership = new Leadership(path,
+                            leader.value(),
+                            leader.version(),
+                            leader.creationTime());
+                    publish(new LeadershipEvent( // announce the leader, which may be a node other than this one
+                            LeadershipEvent.Type.LEADER_ELECTED,
+                            newLeadership));
+                    return newLeadership;
                 }
-            } else {
-                throw new IllegalStateException("should not be here");
+            } catch (Exception e) { // best effort: log and fall through to return null
+                log.debug("Failed to elect leader for {}", path, e);
             }
-        } catch (Exception e) {
-            log.debug("Failed to fetch candidate information for {}", path, e);
-            retryLock(path, future);
         }
+        return null; // no leader could be determined on this attempt
     }
 
-    private void leaderLockAttempt(String path, List<NodeId> candidates, CompletableFuture<Leadership> future) {
+    private void electLeaders() { // one election pass over every entry in candidateMap
         try {
-            Versioned<NodeId> leader = leaderMap.computeIfAbsent(path, p -> localNodeId);
-            if (Objects.equals(leader.value(), localNodeId)) {
-                log.debug("Assumed leadership for {}", path);
-                Leadership leadership = new Leadership(path,
-                        leader.value(),
-                        leader.version(),
-                        leader.creationTime());
-                future.complete(leadership);
-                publish(new LeadershipEvent(
-                        LeadershipEvent.Type.LEADER_ELECTED,
-                        leadership));
-            } else {
-                retryLock(path, future);
-            }
+            candidateMap.entrySet().forEach(entry -> {
+                String path = entry.getKey();
+                List<NodeId> candidates = entry.getValue().value();
+                if (activeTopics.contains(path)) { // only topics this node has joined the race for
+                    lockExecutor.submit(() -> { // each topic is handled as its own lockExecutor task
+                        Leadership leadership = electLeader(path, candidates);
+                        if (leadership != null) {
+                            CompletableFuture<Leadership> future = pendingFutures.remove(path);
+                            if (future != null) {
+                                future.complete(leadership); // fulfil the future parked in pendingFutures
+                            }
+                        }
+                    });
+                }
+            });
         } catch (Exception e) {
-            log.debug("Attempt to acquire leadership lock for topic {} failed", path, e);
-            retryLock(path, future);
+            log.debug("Failure electing leaders", e); // log only; nothing is rethrown
         }
     }
 
@@ -451,21 +468,14 @@
     }
 
     private void rerunForLeadership(String path, CompletableFuture<Leadership> future) {
-        retryLeaderLockExecutor.schedule(
+        lockExecutor.schedule( // retries doRunForLeadership after ELECTION_JOIN_ATTEMPT_INTERVAL_SEC seconds
                 () -> doRunForLeadership(path, future),
                 ELECTION_JOIN_ATTEMPT_INTERVAL_SEC,
                 TimeUnit.SECONDS);
     }
 
-    private void retryLock(String path, CompletableFuture<Leadership> future) {
-        retryLeaderLockExecutor.schedule(
-                () -> tryLeaderLock(path, future),
-                DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC,
-                TimeUnit.SECONDS);
-    }
-
     private void retryWithdraw(String path, CompletableFuture<Void> future) {
-        retryLeaderLockExecutor.schedule(
+        lockExecutor.schedule(
                 () -> doWithdraw(path, future),
                 DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC,
                 TimeUnit.SECONDS);