Refresh candidate board from source on each election round + Disable east-west synchronization of candidate board
Change-Id: Ie796e0ff0bdd2da834f70f24e98725a309e97787
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
index fa9d7cb..fd8afb7 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
@@ -106,7 +106,6 @@
private static final int DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC = 2;
private static final int LEADERSHIP_STATUS_UPDATE_INTERVAL_SEC = 2;
private static final int DELAY_BETWEEN_STALE_LEADERSHIP_PURGE_ATTEMPTS_SEC = 2;
- private static final int LEADER_CANDIDATE_POS = 0;
private final AtomicBoolean staleLeadershipPurgeScheduled = new AtomicBoolean(false);
@@ -303,7 +302,7 @@
newCandidates.version(),
newCandidates.creationTime())));
} else {
- log.warn("Failed to withdraw from candidates list. Will retry");
+ log.warn("Failed to withdraw from candidates list for {}. Will retry", path);
retryWithdraw(path, future);
}
} catch (Exception e) {
@@ -403,10 +402,11 @@
try {
candidateMap.entrySet().forEach(entry -> {
String path = entry.getKey();
- List<NodeId> candidates = entry.getValue().value();
+ Versioned<List<NodeId>> candidates = entry.getValue();
+ // for active topics, check if this node can become a leader (if it isn't already)
if (activeTopics.contains(path)) {
lockExecutor.submit(() -> {
- Leadership leadership = electLeader(path, candidates);
+ Leadership leadership = electLeader(path, candidates.value());
if (leadership != null) {
CompletableFuture<Leadership> future = pendingFutures.remove(path);
if (future != null) {
@@ -415,6 +415,14 @@
}
});
}
+ // Raise a CANDIDATES_CHANGED event to force refresh local candidate board
+ // and also to update local listeners.
+ // Don't worry about duplicate events as they will be suppressed.
+ onLeadershipEvent(new LeadershipEvent(LeadershipEvent.Type.CANDIDATES_CHANGED,
+ new Leadership(path,
+ candidates.value(),
+ candidates.version(),
+ candidates.creationTime())));
});
} catch (Exception e) {
log.debug("Failure electing leaders", e);
@@ -579,12 +587,6 @@
SERIALIZER::encode);
}
});
- candidateBoard.forEach((path, leadership) -> {
- LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.CANDIDATES_CHANGED, leadership);
- clusterCommunicator.broadcast(event,
- LEADERSHIP_EVENT_MESSAGE_SUBJECT,
- SERIALIZER::encode);
- });
} catch (Exception e) {
log.debug("Failed to send leadership updates", e);
}