Updated DistributedLeadershipManager to use ConsistentMap notifications
Change-Id: Ice4e9b295f4216fee13144ec631904f34bdf7b2b
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
index 763deca..439852f 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
@@ -15,7 +15,6 @@
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
-import org.onlab.util.KryoNamespace;
import org.onosproject.cluster.ClusterEvent;
import org.onosproject.cluster.ClusterEvent.Type;
import org.onosproject.cluster.ClusterEventListener;
@@ -28,10 +27,10 @@
import org.onosproject.event.ListenerRegistry;
import org.onosproject.event.EventDeliveryService;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
-import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.ConsistentMapException;
+import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageService;
import org.onosproject.store.service.Versioned;
@@ -46,7 +45,6 @@
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -82,11 +80,7 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected EventDeliveryService eventDispatcher;
- private static final MessageSubject LEADERSHIP_EVENT_MESSAGE_SUBJECT =
- new MessageSubject("distributed-leadership-manager-events");
-
private final Logger log = getLogger(getClass());
- private ExecutorService messageHandlingExecutor;
private ScheduledExecutorService electionRunner;
private ScheduledExecutorService lockExecutor;
private ScheduledExecutorService staleLeadershipPurgeExecutor;
@@ -104,7 +98,7 @@
private Set<String> activeTopics = Sets.newConcurrentHashSet();
private Map<String, CompletableFuture<Leadership>> pendingFutures = Maps.newConcurrentMap();
- // The actual delay is randomly chosen between the interval [0, WAIT_BEFORE_RETRY_MILLIS)
+ // The actual delay is randomly chosen from the interval [0, WAIT_BEFORE_RETRY_MILLIS)
private static final int WAIT_BEFORE_RETRY_MILLIS = 150;
private static final int DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC = 2;
private static final int LEADERSHIP_REFRESH_INTERVAL_SEC = 2;
@@ -112,8 +106,7 @@
private final AtomicBoolean staleLeadershipPurgeScheduled = new AtomicBoolean(false);
- private static final Serializer SERIALIZER = Serializer.using(
- new KryoNamespace.Builder().register(KryoNamespaces.API).build());
+ private static final Serializer SERIALIZER = Serializer.using(KryoNamespaces.API);
@Activate
public void activate() {
@@ -126,10 +119,38 @@
.withSerializer(SERIALIZER)
.withPartitionsDisabled().build();
+ leaderMap.addListener(event -> {
+ log.debug("Received {}", event);
+ LeadershipEvent.Type leadershipEventType = null;
+ if (event.type() == MapEvent.Type.INSERT || event.type() == MapEvent.Type.UPDATE) {
+ leadershipEventType = LeadershipEvent.Type.LEADER_ELECTED;
+ } else if (event.type() == MapEvent.Type.REMOVE) {
+ leadershipEventType = LeadershipEvent.Type.LEADER_BOOTED;
+ }
+ onLeadershipEvent(new LeadershipEvent(
+ leadershipEventType,
+ new Leadership(event.key(),
+ event.value().value(),
+ event.value().version(),
+ event.value().creationTime())));
+ });
+
+ candidateMap.addListener(event -> {
+ log.debug("Received {}", event);
+ if (event.type() != MapEvent.Type.INSERT && event.type() != MapEvent.Type.UPDATE) {
+ log.error("Entries must not be removed from candidate map");
+ return;
+ }
+ onLeadershipEvent(new LeadershipEvent(
+ LeadershipEvent.Type.CANDIDATES_CHANGED,
+ new Leadership(event.key(),
+ event.value().value(),
+ event.value().version(),
+ event.value().creationTime())));
+ });
+
localNodeId = clusterService.getLocalNode().id();
- messageHandlingExecutor = Executors.newSingleThreadExecutor(
- groupedThreads("onos/store/leadership", "message-handler"));
electionRunner = Executors.newSingleThreadScheduledExecutor(
groupedThreads("onos/store/leadership", "election-runner"));
lockExecutor = Executors.newScheduledThreadPool(
@@ -138,11 +159,6 @@
groupedThreads("onos/store/leadership", "stale-leadership-evictor"));
leadershipRefresher = Executors.newSingleThreadScheduledExecutor(
groupedThreads("onos/store/leadership", "refresh-thread"));
- clusterCommunicator.addSubscriber(
- LEADERSHIP_EVENT_MESSAGE_SUBJECT,
- SERIALIZER::decode,
- this::onLeadershipEvent,
- messageHandlingExecutor);
clusterService.addListener(clusterEventListener);
@@ -168,10 +184,8 @@
clusterService.removeListener(clusterEventListener);
eventDispatcher.removeSink(LeadershipEvent.class);
- clusterCommunicator.removeSubscriber(LEADERSHIP_EVENT_MESSAGE_SUBJECT);
electionRunner.shutdown();
- messageHandlingExecutor.shutdown();
lockExecutor.shutdown();
staleLeadershipPurgeExecutor.shutdown();
leadershipRefresher.shutdown();
@@ -239,12 +253,6 @@
return newList;
}
});
- publish(new LeadershipEvent(
- LeadershipEvent.Type.CANDIDATES_CHANGED,
- new Leadership(path,
- candidates.value(),
- candidates.version(),
- candidates.creationTime())));
log.debug("In the leadership race for topic {} with candidates {}", path, candidates);
activeTopics.add(path);
Leadership leadership = electLeader(path, candidates.value());
@@ -273,41 +281,16 @@
future.completeExceptionally(new CancellationException(String.format("%s is now a active topic", path)));
}
try {
- Versioned<NodeId> leader = leaderMap.get(path);
- if (leader != null && Objects.equals(leader.value(), localNodeId)) {
- if (leaderMap.remove(path, leader.version())) {
- log.debug("Gave up leadership for {}", path);
- future.complete(null);
- publish(new LeadershipEvent(
- LeadershipEvent.Type.LEADER_BOOTED,
- new Leadership(path,
- localNodeId,
- leader.version(),
- leader.creationTime())));
- }
- }
- // else we are not the current leader, can still be a candidate.
- Versioned<List<NodeId>> candidates = candidateMap.get(path);
- List<NodeId> candidateList = candidates != null
- ? Lists.newArrayList(candidates.value())
- : Lists.newArrayList();
- if (!candidateList.remove(localNodeId)) {
- future.complete(null);
- return;
- }
- if (candidateMap.replace(path, candidates.version(), candidateList)) {
- Versioned<List<NodeId>> newCandidates = candidateMap.get(path);
- future.complete(null);
- publish(new LeadershipEvent(
- LeadershipEvent.Type.CANDIDATES_CHANGED,
- new Leadership(path,
- newCandidates.value(),
- newCandidates.version(),
- newCandidates.creationTime())));
- } else {
- log.debug("Failed to withdraw from candidates list for {}. Will retry", path);
- retryWithdraw(path, future);
- }
+ leaderMap.computeIf(path,
+ localNodeId::equals,
+ (topic, leader) -> null);
+ candidateMap.computeIf(path,
+ candidates -> candidates != null && candidates.contains(localNodeId),
+ (topic, candidates) -> candidates.stream()
+ .filter(nodeId -> !localNodeId.equals(nodeId))
+ .collect(Collectors.toList()));
+ // Both map updates returned without throwing: signal success so the withdraw future does not stay pending.
+ future.complete(null);
} catch (Exception e) {
log.debug("Failed to verify (and clear) any lock this node might be holding for {}", path, e);
retryWithdraw(path, future);
@@ -321,19 +302,9 @@
}
try {
- Versioned<NodeId> leader = leaderMap.get(path);
- if (leader != null && Objects.equals(leader.value(), localNodeId)) {
- if (leaderMap.remove(path, leader.version())) {
- log.debug("Stepped down from leadership for {}", path);
- publish(new LeadershipEvent(
- LeadershipEvent.Type.LEADER_BOOTED,
- new Leadership(path,
- localNodeId,
- leader.version(),
- leader.creationTime())));
- return true;
- }
- }
+ return leaderMap.computeIf(path,
+ localNodeId::equals,
+ (topic, leader) -> null) == null;
} catch (Exception e) {
log.warn("Error executing stepdown for {}", path, e);
}
@@ -352,7 +323,7 @@
@Override
public boolean makeTopCandidate(String path, NodeId nodeId) {
- Versioned<List<NodeId>> newCandidates = candidateMap.computeIf(path,
+ Versioned<List<NodeId>> candidateList = candidateMap.computeIf(path,
candidates -> candidates != null &&
candidates.contains(nodeId) &&
!nodeId.equals(Iterables.getFirst(candidates, null)),
@@ -362,13 +333,8 @@
candidates.stream().filter(id -> !nodeId.equals(id)).forEach(updatedCandidates::add);
return updatedCandidates;
});
- publish(new LeadershipEvent(
- LeadershipEvent.Type.CANDIDATES_CHANGED,
- new Leadership(path,
- newCandidates.value(),
- newCandidates.version(),
- newCandidates.creationTime())));
- return true;
+ List<NodeId> candidates = candidateList != null ? candidateList.value() : Collections.emptyList();
+ return candidates.size() > 0 && nodeId.equals(candidates.get(0));
}
private Leadership electLeader(String path, List<NodeId> candidates) {
@@ -389,9 +355,6 @@
leader.value(),
leader.version(),
leader.creationTime());
- publish(new LeadershipEvent(
- LeadershipEvent.Type.LEADER_ELECTED,
- newLeadership));
return newLeadership;
}
} catch (Exception e) {
@@ -432,11 +395,6 @@
}
}
- private void publish(LeadershipEvent event) {
- onLeadershipEvent(event);
- clusterCommunicator.broadcast(event, LEADERSHIP_EVENT_MESSAGE_SUBJECT, SERIALIZER::encode);
- }
-
private void onLeadershipEvent(LeadershipEvent leadershipEvent) {
log.trace("Leadership Event: time = {} type = {} event = {}",
leadershipEvent.time(), leadershipEvent.type(),
@@ -517,15 +475,8 @@
.forEach(entry -> {
String path = entry.getKey();
NodeId nodeId = entry.getValue().value();
- long epoch = entry.getValue().version();
- long creationTime = entry.getValue().creationTime();
try {
- if (leaderMap.remove(path, epoch)) {
- log.debug("Purged stale lock held by {} for {}", nodeId, path);
- publish(new LeadershipEvent(
- LeadershipEvent.Type.LEADER_BOOTED,
- new Leadership(path, nodeId, epoch, creationTime)));
- }
+ leaderMap.computeIf(path, nodeId::equals, (topic, leader) -> null);
} catch (Exception e) {
log.debug("Failed to purge stale lock held by {} for {}", nodeId, path, e);
rerunPurge.set(true);
@@ -548,21 +499,15 @@
Sets.difference(Sets.newHashSet(candidatesList),
Sets.newHashSet(activeCandidatesList));
try {
- if (candidateMap.replace(path, entry.getValue().version(), activeCandidatesList)) {
- log.info("Evicted inactive candidates {} from "
- + "candidate list for {}", removedCandidates, path);
- Versioned<List<NodeId>> updatedCandidates = candidateMap.get(path);
- publish(new LeadershipEvent(
- LeadershipEvent.Type.CANDIDATES_CHANGED,
- new Leadership(path,
- updatedCandidates.value(),
- updatedCandidates.version(),
- updatedCandidates.creationTime())));
- } else {
- // Conflicting update detected. Rerun purge to make sure
- // inactive candidates are evicted.
- rerunPurge.set(true);
- }
+ candidateMap.computeIf(path,
+ c -> c.stream()
+ .filter(n -> clusterService.getState(n) == INACTIVE)
+ .count() > 0,
+ (topic, c) -> c.stream()
+ .filter(n -> clusterService.getState(n) == ACTIVE)
+ .filter(n -> !localNodeId.equals(n) ||
+ activeTopics.contains(path))
+ .collect(Collectors.toList()));
} catch (Exception e) {
log.debug("Failed to evict inactive candidates {} from "
+ "candidate list for {}", removedCandidates, path, e);