ONOS-1965: Evict inactive nodes from candidates map + Rely on cluster events to trigger stale lock purge
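
Replace the periodic dead-lock detection task with an event-driven
purge: INSTANCE_DEACTIVATED and INSTANCE_REMOVED cluster events now
trigger a stale leadership purge, which releases locks held by
inactive nodes and evicts inactive nodes from the candidate map. If
any step of the purge fails, it is rescheduled after a fixed delay.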
Change-Id: Ib7cfea397f98d6271beb78e3b88041bb84550506
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
index 434d0e2..5976325 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
@@ -13,6 +13,9 @@
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.util.KryoNamespace;
+import org.onosproject.cluster.ClusterEvent;
+import org.onosproject.cluster.ClusterEvent.Type;
+import org.onosproject.cluster.ClusterEventListener;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.Leadership;
import org.onosproject.cluster.LeadershipEvent;
@@ -32,6 +35,7 @@
import org.slf4j.Logger;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
@@ -55,7 +59,7 @@
/**
* Distributed Lock Manager implemented on top of ConsistentMap.
* <p>
- * This implementation makes use of cluster manager's failure
+ * This implementation makes use of ClusterService's failure
* detection capabilities to detect and purge stale locks.
* TODO: Ensure lock safety and liveness.
*/
@@ -81,27 +85,28 @@
private final Logger log = getLogger(getClass());
private ExecutorService messageHandlingExecutor;
private ScheduledExecutorService retryLeaderLockExecutor;
- private ScheduledExecutorService deadLockDetectionExecutor;
+ private ScheduledExecutorService staleLeadershipPurgeExecutor;
private ScheduledExecutorService leadershipStatusBroadcaster;
private ConsistentMap<String, NodeId> leaderMap;
private ConsistentMap<String, List<NodeId>> candidateMap;
- private ListenerRegistry<LeadershipEvent, LeadershipEventListener>
- listenerRegistry;
+ private ListenerRegistry<LeadershipEvent, LeadershipEventListener> listenerRegistry;
private final Map<String, Leadership> leaderBoard = Maps.newConcurrentMap();
private final Map<String, Leadership> candidateBoard = Maps.newConcurrentMap();
- private NodeId localNodeId;
+ private final ClusterEventListener clusterEventListener = new InternalClusterEventListener();
+ private NodeId localNodeId;
private Set<String> activeTopics = Sets.newConcurrentHashSet();
private static final int ELECTION_JOIN_ATTEMPT_INTERVAL_SEC = 2;
private static final int DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC = 2;
- private static final int DEADLOCK_DETECTION_INTERVAL_SEC = 2;
private static final int LEADERSHIP_STATUS_UPDATE_INTERVAL_SEC = 2;
-
+ private static final int DELAY_BETWEEN_STALE_LEADERSHIP_PURGE_ATTEMPTS_SEC = 2;
private static final int LEADER_CANDIDATE_POS = 0;
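+ // Ensures at most one stale leadership purge task is pending at a time.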
+ private final AtomicBoolean staleLeadershipPurgeScheduled = new AtomicBoolean(false);
+
private static final Serializer SERIALIZER = Serializer.using(
new KryoNamespace.Builder().register(KryoNamespaces.API).build());
@@ -122,8 +127,8 @@
groupedThreads("onos/store/leadership", "message-handler"));
retryLeaderLockExecutor = Executors.newScheduledThreadPool(
4, groupedThreads("onos/store/leadership", "election-thread-%d"));
- deadLockDetectionExecutor = Executors.newSingleThreadScheduledExecutor(
- groupedThreads("onos/store/leadership", "dead-lock-detector"));
+ staleLeadershipPurgeExecutor = Executors.newSingleThreadScheduledExecutor(
+ groupedThreads("onos/store/leadership", "stale-leadership-evictor"));
leadershipStatusBroadcaster = Executors.newSingleThreadScheduledExecutor(
groupedThreads("onos/store/leadership", "peer-updater"));
clusterCommunicator.addSubscriber(
@@ -132,8 +137,8 @@
this::onLeadershipEvent,
messageHandlingExecutor);
- deadLockDetectionExecutor.scheduleWithFixedDelay(
- this::purgeStaleLocks, 0, DEADLOCK_DETECTION_INTERVAL_SEC, TimeUnit.SECONDS);
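+ // Stale leadership is purged in response to cluster events (see
+ // InternalClusterEventListener) rather than on a fixed schedule.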
+ clusterService.addListener(clusterEventListener);
+
leadershipStatusBroadcaster.scheduleWithFixedDelay(
this::sendLeadershipStatus, 0, LEADERSHIP_STATUS_UPDATE_INTERVAL_SEC, TimeUnit.SECONDS);
@@ -151,12 +156,13 @@
}
});
+ clusterService.removeListener(clusterEventListener);
eventDispatcher.removeSink(LeadershipEvent.class);
clusterCommunicator.removeSubscriber(LEADERSHIP_EVENT_MESSAGE_SUBJECT);
messageHandlingExecutor.shutdown();
retryLeaderLockExecutor.shutdown();
- deadLockDetectionExecutor.shutdown();
+ staleLeadershipPurgeExecutor.shutdown();
leadershipStatusBroadcaster.shutdown();
log.info("Stopped");
@@ -508,12 +514,25 @@
TimeUnit.SECONDS);
}
- private void purgeStaleLocks() {
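+ /**
+ * Schedules a stale leadership purge unless one is already pending.
+ *
+ * @param afterDelaySec delay in seconds before the purge task runs
+ */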
+ private void scheduleStaleLeadershipPurge(int afterDelaySec) {
+ if (staleLeadershipPurgeScheduled.compareAndSet(false, true)) {
+ staleLeadershipPurgeExecutor.schedule(
+ this::purgeStaleLeadership,
+ afterDelaySec,
+ TimeUnit.SECONDS);
+ }
+ }
+
+ /**
+ * Purges locks held by inactive nodes and evicts inactive nodes from candidacy.
+ */
+ private void purgeStaleLeadership() {
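+ // Set to true when any purge step fails, triggering a reschedule below.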
+ AtomicBoolean rerunPurge = new AtomicBoolean(false);
try {
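+ // Clear the flag first so purge requests arriving mid-run schedule a fresh pass.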
+ staleLeadershipPurgeScheduled.set(false);
leaderMap.entrySet()
.stream()
.filter(e -> clusterService.getState(e.getValue().value()) == INACTIVE)
- .filter(e -> activeTopics.contains(e.getKey()))
.forEach(entry -> {
String path = entry.getKey();
NodeId nodeId = entry.getValue().value();
@@ -528,10 +547,52 @@
}
} catch (Exception e) {
log.warn("Failed to purge stale lock held by {} for {}", nodeId, path, e);
+ rerunPurge.set(true);
+ }
+ });
+
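+ // Evict inactive nodes from each topic's candidate list; the local
+ // node is retained only for topics it is still actively contesting.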
+ candidateMap.entrySet()
+ .forEach(entry -> {
+ String path = entry.getKey();
+ Versioned<List<NodeId>> candidates = entry.getValue();
+ List<NodeId> candidatesList = candidates != null
+ ? candidates.value() : Collections.emptyList();
+ List<NodeId> activeCandidatesList =
+ candidatesList.stream()
+ .filter(n -> clusterService.getState(n) == ACTIVE)
+ .filter(n -> !localNodeId.equals(n) || activeTopics.contains(path))
+ .collect(Collectors.toList());
+ if (activeCandidatesList.size() < candidatesList.size()) {
+ Set<NodeId> removedCandidates =
+ Sets.difference(Sets.newHashSet(candidatesList),
+ Sets.newHashSet(activeCandidatesList));
+ try {
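+ // Optimistic replace: succeeds only if the entry version is unchanged.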
+ if (candidateMap.replace(path, entry.getValue().version(), activeCandidatesList)) {
+ log.info("Evicted inactive candidates {} from "
+ + "candidate list for {}", removedCandidates, path);
+ Versioned<List<NodeId>> updatedCandidates = candidateMap.get(path);
+ publish(new LeadershipEvent(
+ LeadershipEvent.Type.CANDIDATES_CHANGED,
+ new Leadership(path,
+ updatedCandidates.value(),
+ updatedCandidates.version(),
+ updatedCandidates.creationTime())));
+ }
+ } catch (Exception e) {
+ log.warn("Failed to evict inactive candidates {} from "
+ + "candidate list for {}", removedCandidates, path, e);
+ rerunPurge.set(true);
+ }
}
});
} catch (Exception e) {
- log.debug("Failed cleaning up stale locks", e);
+ log.warn("Failure purging state leadership.", e);
+ rerunPurge.set(true);
+ }
+
+ if (rerunPurge.get()) {
+ log.info("Rescheduling stale leadership purge due to errors encountered in previous run");
+ scheduleStaleLeadershipPurge(DELAY_BETWEEN_STALE_LEADERSHIP_PURGE_ATTEMPTS_SEC);
}
}
@@ -555,4 +616,14 @@
log.debug("Failed to send leadership updates", e);
}
}
+
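+ /**
+ * Triggers an immediate stale leadership purge when a cluster
+ * instance is deactivated or removed.
+ */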
+ private class InternalClusterEventListener implements ClusterEventListener {
+
+ @Override
+ public void event(ClusterEvent event) {
+ if (event.type() == Type.INSTANCE_DEACTIVATED || event.type() == Type.INSTANCE_REMOVED) {
+ scheduleStaleLeadershipPurge(0);
+ }
+ }
+ }
}