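ONOS-2026: Address polling issue in LeaderElection

Replace the per-topic, self-rescheduling lock retries (tryLeaderLock /
retryLock) with a single election runner that, at a fixed delay, attempts
election for every active topic and completes any pending future for a
topic once its leadership is known.
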
Change-Id: Ib5c94d932de6b2c3419b07a97d6fe91d5c588538
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
index 2959880..df8dd2e 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
@@ -84,7 +84,8 @@
private final Logger log = getLogger(getClass());
private ExecutorService messageHandlingExecutor;
- private ScheduledExecutorService retryLeaderLockExecutor;
+ private ScheduledExecutorService electionRunner;
+ private ScheduledExecutorService lockExecutor;
private ScheduledExecutorService staleLeadershipPurgeExecutor;
private ScheduledExecutorService leadershipStatusBroadcaster;
@@ -98,6 +99,7 @@
private NodeId localNodeId;
private Set<String> activeTopics = Sets.newConcurrentHashSet();
+ private Map<String, CompletableFuture<Leadership>> pendingFutures = Maps.newConcurrentMap();
private static final int ELECTION_JOIN_ATTEMPT_INTERVAL_SEC = 2;
private static final int DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC = 2;
@@ -125,7 +127,9 @@
messageHandlingExecutor = Executors.newSingleThreadExecutor(
groupedThreads("onos/store/leadership", "message-handler"));
- retryLeaderLockExecutor = Executors.newScheduledThreadPool(
+ electionRunner = Executors.newSingleThreadScheduledExecutor(
+ groupedThreads("onos/store/leadership", "election-runner"));
+ lockExecutor = Executors.newScheduledThreadPool(
4, groupedThreads("onos/store/leadership", "election-thread-%d"));
staleLeadershipPurgeExecutor = Executors.newSingleThreadScheduledExecutor(
groupedThreads("onos/store/leadership", "stale-leadership-evictor"));
@@ -139,6 +143,9 @@
clusterService.addListener(clusterEventListener);
+ electionRunner.scheduleWithFixedDelay(
+ this::electLeaders, 0, DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC, TimeUnit.SECONDS);
+
leadershipStatusBroadcaster.scheduleWithFixedDelay(
this::sendLeadershipStatus, 0, LEADERSHIP_STATUS_UPDATE_INTERVAL_SEC, TimeUnit.SECONDS);
@@ -160,8 +167,9 @@
eventDispatcher.removeSink(LeadershipEvent.class);
clusterCommunicator.removeSubscriber(LEADERSHIP_EVENT_MESSAGE_SUBJECT);
+ electionRunner.shutdown();
messageHandlingExecutor.shutdown();
- retryLeaderLockExecutor.shutdown();
+ lockExecutor.shutdown();
staleLeadershipPurgeExecutor.shutdown();
leadershipStatusBroadcaster.shutdown();
@@ -236,7 +244,12 @@
candidates.creationTime())));
log.debug("In the leadership race for topic {} with candidates {}", path, candidates);
activeTopics.add(path);
- tryLeaderLock(path, future);
+ Leadership leadership = electLeader(path, candidates.value());
+ if (leadership == null) {
+ pendingFutures.put(path, future);
+ } else {
+ future.complete(leadership);
+ }
} catch (ConsistentMapException e) {
log.debug("Failed to enter topic leader race for {}. Retrying.", path, e);
rerunForLeadership(path, future);
@@ -315,7 +328,6 @@
localNodeId,
leader.version(),
leader.creationTime())));
- retryLock(path, new CompletableFuture<>());
return true;
}
}
@@ -355,50 +367,55 @@
return true;
}
- private void tryLeaderLock(String path, CompletableFuture<Leadership> future) {
- if (!activeTopics.contains(path) || Objects.equals(localNodeId, getLeader(path))) {
- return;
- }
- try {
- Versioned<List<NodeId>> candidates = candidateMap.get(path);
- if (candidates != null) {
- List<NodeId> activeNodes = candidates.value()
- .stream()
- .filter(n -> clusterService.getState(n) == ACTIVE)
- .collect(Collectors.toList());
- if (localNodeId.equals(activeNodes.get(LEADER_CANDIDATE_POS))) {
- leaderLockAttempt(path, candidates.value(), future);
- } else {
- retryLock(path, future);
+ private Leadership electLeader(String path, List<NodeId> candidates) {
+ Leadership currentLeadership = getLeadership(path);
+ if (currentLeadership != null) {
+ return currentLeadership;
+ } else {
+ NodeId topCandidate = candidates
+ .stream()
+ .filter(n -> clusterService.getState(n) == ACTIVE)
+ .findFirst()
+ .orElse(null);
+ try {
+ Versioned<NodeId> leader = localNodeId.equals(topCandidate)
+ ? leaderMap.computeIfAbsent(path, p -> localNodeId) : leaderMap.get(path);
+ if (leader != null) {
+ Leadership newLeadership = new Leadership(path,
+ leader.value(),
+ leader.version(),
+ leader.creationTime());
+ publish(new LeadershipEvent(
+ LeadershipEvent.Type.LEADER_ELECTED,
+ newLeadership));
+ return newLeadership;
}
- } else {
- throw new IllegalStateException("should not be here");
+ } catch (Exception e) {
+ log.debug("Failed to elect leader for {}", path, e);
}
- } catch (Exception e) {
- log.debug("Failed to fetch candidate information for {}", path, e);
- retryLock(path, future);
}
+ return null;
}
- private void leaderLockAttempt(String path, List<NodeId> candidates, CompletableFuture<Leadership> future) {
+ private void electLeaders() {
try {
- Versioned<NodeId> leader = leaderMap.computeIfAbsent(path, p -> localNodeId);
- if (Objects.equals(leader.value(), localNodeId)) {
- log.debug("Assumed leadership for {}", path);
- Leadership leadership = new Leadership(path,
- leader.value(),
- leader.version(),
- leader.creationTime());
- future.complete(leadership);
- publish(new LeadershipEvent(
- LeadershipEvent.Type.LEADER_ELECTED,
- leadership));
- } else {
- retryLock(path, future);
- }
+ candidateMap.entrySet().forEach(entry -> {
+ String path = entry.getKey();
+ List<NodeId> candidates = entry.getValue().value();
+ if (activeTopics.contains(path)) {
+ lockExecutor.submit(() -> {
+ Leadership leadership = electLeader(path, candidates);
+ if (leadership != null) {
+ CompletableFuture<Leadership> future = pendingFutures.remove(path);
+ if (future != null) {
+ future.complete(leadership);
+ }
+ }
+ });
+ }
+ });
} catch (Exception e) {
- log.debug("Attempt to acquire leadership lock for topic {} failed", path, e);
- retryLock(path, future);
+ log.debug("Failure electing leaders", e);
}
}
@@ -451,21 +468,14 @@
}
private void rerunForLeadership(String path, CompletableFuture<Leadership> future) {
- retryLeaderLockExecutor.schedule(
+ lockExecutor.schedule(
() -> doRunForLeadership(path, future),
ELECTION_JOIN_ATTEMPT_INTERVAL_SEC,
TimeUnit.SECONDS);
}
- private void retryLock(String path, CompletableFuture<Leadership> future) {
- retryLeaderLockExecutor.schedule(
- () -> tryLeaderLock(path, future),
- DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC,
- TimeUnit.SECONDS);
- }
-
private void retryWithdraw(String path, CompletableFuture<Void> future) {
- retryLeaderLockExecutor.schedule(
+ lockExecutor.schedule(
() -> doWithdraw(path, future),
DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC,
TimeUnit.SECONDS);