Make leadership election more robust to failures.
- Catch exceptions thrown by lock extension calls.
- Handle potential race conditions between joining and withdrawing from a leadership contest.
Change-Id: I429045b33f5972c459d5ed031fe8593438813e8d
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/LeadershipManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/LeadershipManager.java
index b4091b2..49fdb8d 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/LeadershipManager.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/LeadershipManager.java
@@ -1,7 +1,6 @@
package org.onlab.onos.store.cluster.impl;
import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Verify.verifyNotNull;
import static org.onlab.util.Tools.namedThreads;
import static org.slf4j.LoggerFactory.getLogger;
@@ -75,7 +74,7 @@
private final Map<String, Leadership> leaderBoard = Maps.newHashMap();
- private final Map<String, Lock> openContests = Maps.newHashMap();
+ private final Map<String, Lock> openContests = Maps.newConcurrentMap();
private final Set<LeadershipEventListener> listeners = Sets.newIdentityHashSet();
private ControllerNode localNode;
@@ -133,19 +132,21 @@
@Override
public void runForLeadership(String path) {
checkArgument(path != null);
+
if (openContests.containsKey(path)) {
log.info("Already in the leadership contest for {}", path);
return;
} else {
Lock lock = lockService.create(path);
openContests.put(path, lock);
- tryAcquireLeadership(path);
+ threadPool.schedule(new TryLeadership(lock), 0, TimeUnit.MILLISECONDS);
}
}
@Override
public void withdraw(String path) {
checkArgument(path != null);
+
Lock lock = openContests.remove(path);
if (lock != null && lock.isLocked()) {
@@ -171,13 +172,20 @@
private void notifyListeners(LeadershipEvent event) {
for (LeadershipEventListener listener : listeners) {
- listener.event(event);
+ try {
+ listener.event(event);
+ } catch (Exception e) {
+ log.error("Notifying listener failed with exception.", e);
+ }
}
}
private void tryAcquireLeadership(String path) {
Lock lock = openContests.get(path);
- verifyNotNull(lock, "Lock should not be null");
+ if (lock == null) {
+ // withdrew from race.
+ return;
+ }
lock.lockAsync(TERM_DURATION_MS).whenComplete((response, error) -> {
if (error == null) {
threadPool.schedule(
@@ -190,13 +198,8 @@
new Leadership(lock.path(), localNode, lock.epoch())));
return;
} else {
- log.warn("Failed to acquire lock for {}. Will retry in {} sec", path, WAIT_BEFORE_RETRY_MS, error);
- try {
- Thread.sleep(WAIT_BEFORE_RETRY_MS);
- tryAcquireLeadership(path);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
+ log.warn("Failed to acquire lock for {}. Will retry in {} ms", path, WAIT_BEFORE_RETRY_MS, error);
+ threadPool.schedule(new TryLeadership(lock), WAIT_BEFORE_RETRY_MS, TimeUnit.MILLISECONDS);
}
});
}
@@ -211,24 +214,52 @@
@Override
public void run() {
- if (lock.extendExpiration(TERM_DURATION_MS)) {
+ if (!openContests.containsKey(lock.path())) {
+ log.debug("Node withdrew from leadership race for {}. Cancelling reelection task.", lock.path());
+ return;
+ }
+
+ boolean lockExtended = false;
+ try {
+ lockExtended = lock.extendExpiration(TERM_DURATION_MS);
+ } catch (Exception e) {
+ log.warn("Attempt to extend lock failed with an exception.", e);
+ }
+
+ if (lockExtended) {
notifyListeners(
new LeadershipEvent(
LeadershipEvent.Type.LEADER_REELECTED,
new Leadership(lock.path(), localNode, lock.epoch())));
threadPool.schedule(this, TERM_DURATION_MS / 2, TimeUnit.MILLISECONDS);
} else {
+ // Check if this node already withdrew from the contest, in which case
+ // we don't need to notify here.
if (openContests.containsKey(lock.path())) {
notifyListeners(
new LeadershipEvent(
LeadershipEvent.Type.LEADER_BOOTED,
new Leadership(lock.path(), localNode, lock.epoch())));
- tryAcquireLeadership(lock.path());
+ // Retry leadership after a brief wait.
+ threadPool.schedule(new TryLeadership(lock), WAIT_BEFORE_RETRY_MS, TimeUnit.MILLISECONDS);
}
}
}
}
+ private class TryLeadership implements Runnable {
+ private final Lock lock;
+
+ public TryLeadership(Lock lock) {
+ this.lock = lock;
+ }
+
+ @Override
+ public void run() {
+ tryAcquireLeadership(lock.path());
+ }
+ }
+
private class PeerAdvertiser implements LeadershipEventListener {
@Override
public void event(LeadershipEvent event) {