Couple of fixes:
1. Retry leadership lock after a successful stepdown
2. setStandby should adjust the candidates list to ensure another node steps up to become the master.
Change-Id: I8dc5da82c9b8b9e99d4118ec33a63037543927f0
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
index ed34928..69a1a8a 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
@@ -12,7 +12,6 @@
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
-import org.apache.commons.lang3.mutable.MutableBoolean;
import org.onlab.util.KryoNamespace;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.Leadership;
@@ -23,8 +22,6 @@
import org.onosproject.event.AbstractListenerRegistry;
import org.onosproject.event.EventDeliveryService;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
-import org.onosproject.store.cluster.messaging.ClusterMessage;
-import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.ConsistentMap;
@@ -44,6 +41,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkArgument;
@@ -128,7 +126,8 @@
groupedThreads("onos/store/leadership", "peer-updater"));
clusterCommunicator.addSubscriber(
LEADERSHIP_EVENT_MESSAGE_SUBJECT,
- new InternalLeadershipEventListener(),
+ SERIALIZER::decode,
+ this::onLeadershipEvent,
messageHandlingExecutor);
deadLockDetectionExecutor.scheduleWithFixedDelay(
@@ -139,7 +138,7 @@
listenerRegistry = new AbstractListenerRegistry<>();
eventDispatcher.addSink(LeadershipEvent.class, listenerRegistry);
- log.info("Started.");
+ log.info("Started");
}
@Deactivate
@@ -158,7 +157,7 @@
deadLockDetectionExecutor.shutdown();
leadershipStatusBroadcaster.shutdown();
- log.info("Stopped.");
+ log.info("Stopped");
}
@Override
@@ -210,8 +209,12 @@
candidateList.add(localNodeId);
if (candidateMap.replace(path, candidates.version(), candidateList)) {
Versioned<List<NodeId>> newCandidates = candidateMap.get(path);
- notifyCandidateUpdated(
- path, candidateList, newCandidates.version(), newCandidates.creationTime());
+ publish(new LeadershipEvent(
+ LeadershipEvent.Type.CANDIDATES_CHANGED,
+ new Leadership(path,
+ newCandidates.value(),
+ newCandidates.version(),
+ newCandidates.creationTime())));
} else {
rerunForLeadership(path);
return;
@@ -221,7 +224,12 @@
List<NodeId> candidateList = ImmutableList.of(localNodeId);
if ((candidateMap.putIfAbsent(path, candidateList) == null)) {
Versioned<List<NodeId>> newCandidates = candidateMap.get(path);
- notifyCandidateUpdated(path, candidateList, newCandidates.version(), newCandidates.creationTime());
+ publish(new LeadershipEvent(
+ LeadershipEvent.Type.CANDIDATES_CHANGED,
+ new Leadership(path,
+ newCandidates.value(),
+ newCandidates.version(),
+ newCandidates.creationTime())));
} else {
rerunForLeadership(path);
return;
@@ -245,7 +253,12 @@
if (leader != null && Objects.equals(leader.value(), localNodeId)) {
if (leaderMap.remove(path, leader.version())) {
log.info("Gave up leadership for {}", path);
- notifyRemovedLeader(path, localNodeId, leader.version(), leader.creationTime());
+ publish(new LeadershipEvent(
+ LeadershipEvent.Type.LEADER_BOOTED,
+ new Leadership(path,
+ localNodeId,
+ leader.version(),
+ leader.creationTime())));
}
}
// else we are not the current leader, can still be a candidate.
@@ -258,7 +271,12 @@
}
if (candidateMap.replace(path, candidates.version(), candidateList)) {
Versioned<List<NodeId>> newCandidates = candidateMap.get(path);
- notifyCandidateRemoved(path, candidates.version(), candidates.creationTime(), newCandidates);
+ publish(new LeadershipEvent(
+ LeadershipEvent.Type.CANDIDATES_CHANGED,
+ new Leadership(path,
+ newCandidates.value(),
+ newCandidates.version(),
+ newCandidates.creationTime())));
} else {
log.warn("Failed to withdraw from candidates list. Will retry");
retryWithdraw(path);
@@ -279,8 +297,14 @@
Versioned<NodeId> leader = leaderMap.get(path);
if (leader != null && Objects.equals(leader.value(), localNodeId)) {
if (leaderMap.remove(path, leader.version())) {
- log.info("Gave up leadership for {}", path);
- notifyRemovedLeader(path, localNodeId, leader.version(), leader.creationTime());
+ log.info("Stepped down from leadership for {}", path);
+ publish(new LeadershipEvent(
+ LeadershipEvent.Type.LEADER_BOOTED,
+ new Leadership(path,
+ localNodeId,
+ leader.version(),
+ leader.creationTime())));
+ retryLock(path);
return true;
}
}
@@ -306,30 +330,35 @@
if (candidates == null || !candidates.value().contains(nodeId)) {
return false;
}
- if (nodeId.equals(candidates.value().get(0))) {
+ List<NodeId> currentRoster = candidates.value();
+ if (nodeId.equals(currentRoster.get(LEADER_CANDIDATE_POS))) {
return true;
}
- List<NodeId> currentRoster = candidates.value();
List<NodeId> newRoster = new ArrayList<>(currentRoster.size());
newRoster.add(nodeId);
currentRoster.stream().filter(id -> !nodeId.equals(id)).forEach(newRoster::add);
boolean updated = candidateMap.replace(path, candidates.version(), newRoster);
if (updated) {
Versioned<List<NodeId>> newCandidates = candidateMap.get(path);
- notifyCandidateUpdated(
- path, newCandidates.value(), newCandidates.version(), newCandidates.creationTime());
+ publish(new LeadershipEvent(
+ LeadershipEvent.Type.CANDIDATES_CHANGED,
+ new Leadership(path,
+ newCandidates.value(),
+ newCandidates.version(),
+ newCandidates.creationTime())));
}
return updated;
}
private void tryLeaderLock(String path) {
- if (!activeTopics.contains(path)) {
+ if (!activeTopics.contains(path) || Objects.equals(localNodeId, getLeader(path))) {
return;
}
try {
Versioned<List<NodeId>> candidates = candidateMap.get(path);
if (candidates != null) {
- List<NodeId> activeNodes = candidates.value().stream()
+ List<NodeId> activeNodes = candidates.value()
+ .stream()
.filter(n -> clusterService.getState(n) == ACTIVE)
.collect(Collectors.toList());
if (localNodeId.equals(activeNodes.get(LEADER_CANDIDATE_POS))) {
@@ -353,8 +382,12 @@
if (localNodeId.equals(currentLeader.value())) {
log.info("Already has leadership for {}", path);
// FIXME: candidates can get out of sync.
- notifyNewLeader(
- path, localNodeId, candidates, currentLeader.version(), currentLeader.creationTime());
+ publish(new LeadershipEvent(
+ LeadershipEvent.Type.LEADER_ELECTED,
+ new Leadership(path,
+ localNodeId,
+ currentLeader.version(),
+ currentLeader.creationTime())));
} else {
// someone else has leadership. will retry after sometime.
retryLock(path);
@@ -365,7 +398,12 @@
// do a get again to get the version (epoch)
Versioned<NodeId> newLeader = leaderMap.get(path);
// FIXME: candidates can get out of sync
- notifyNewLeader(path, localNodeId, candidates, newLeader.version(), newLeader.creationTime());
+ publish(new LeadershipEvent(
+ LeadershipEvent.Type.LEADER_ELECTED,
+ new Leadership(path,
+ newLeader.value(),
+ newLeader.version(),
+ newLeader.creationTime())));
} else {
// someone beat us to it.
retryLock(path);
@@ -377,140 +415,51 @@
}
}
- private void notifyCandidateUpdated(
- String path, List<NodeId> candidates, long epoch, long electedTime) {
- Leadership newInfo = new Leadership(path, candidates, epoch, electedTime);
- final MutableBoolean updated = new MutableBoolean(false);
- candidateBoard.compute(path, (k, current) -> {
- if (current == null || current.epoch() < newInfo.epoch()) {
- log.debug("updating candidateboard with {}", newInfo);
- updated.setTrue();
- return newInfo;
- }
- return current;
- });
- // maybe rethink types of candidates events
- if (updated.booleanValue()) {
- LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.CANDIDATES_CHANGED, newInfo);
- notifyPeers(event);
+ private void publish(LeadershipEvent event) {
+ onLeadershipEvent(event);
+ clusterCommunicator.broadcast(event, LEADERSHIP_EVENT_MESSAGE_SUBJECT, SERIALIZER::encode);
+ }
+
+ private void onLeadershipEvent(LeadershipEvent leadershipEvent) {
+ log.trace("Leadership Event: time = {} type = {} event = {}",
+ leadershipEvent.time(), leadershipEvent.type(),
+ leadershipEvent);
+
+ Leadership leadershipUpdate = leadershipEvent.subject();
+ LeadershipEvent.Type eventType = leadershipEvent.type();
+ String topic = leadershipUpdate.topic();
+
+ AtomicBoolean updateAccepted = new AtomicBoolean(false);
+ if (eventType.equals(LeadershipEvent.Type.LEADER_ELECTED)) {
+ leaderBoard.compute(topic, (k, currentLeadership) -> {
+ if (currentLeadership == null || currentLeadership.epoch() < leadershipUpdate.epoch()) {
+ updateAccepted.set(true);
+ return leadershipUpdate;
+ }
+ return currentLeadership;
+ });
+ } else if (eventType.equals(LeadershipEvent.Type.LEADER_BOOTED)) {
+ leaderBoard.compute(topic, (k, currentLeadership) -> {
+ if (currentLeadership == null || currentLeadership.epoch() <= leadershipUpdate.epoch()) {
+ updateAccepted.set(true);
+ return null;
+ }
+ return currentLeadership;
+ });
+ } else if (eventType.equals(LeadershipEvent.Type.CANDIDATES_CHANGED)) {
+ candidateBoard.compute(topic, (k, currentInfo) -> {
+ if (currentInfo == null || currentInfo.epoch() < leadershipUpdate.epoch()) {
+ updateAccepted.set(true);
+ return leadershipUpdate;
+ }
+ return currentInfo;
+ });
+ } else {
+ throw new IllegalStateException("Unknown event type.");
}
- }
- private void notifyCandidateRemoved(
- String path, long oldEpoch, long oldTime, Versioned<List<NodeId>> candidates) {
- Leadership newInfo = (candidates == null)
- ? new Leadership(path, ImmutableList.of(), oldEpoch, oldTime)
- : new Leadership(path, candidates.value(), candidates.version(), candidates.creationTime());
- final MutableBoolean updated = new MutableBoolean(false);
-
- candidateBoard.compute(path, (k, current) -> {
- if (current != null && current.epoch() < newInfo.epoch()) {
- updated.setTrue();
- return newInfo;
- }
- return current;
- });
- // maybe rethink types of candidates events
- if (updated.booleanValue()) {
- log.debug("updated candidateboard with removal: {}", newInfo);
- LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.CANDIDATES_CHANGED, newInfo);
- notifyPeers(event);
- }
- }
-
- private void notifyNewLeader(String path, NodeId leader,
- List<NodeId> candidates, long epoch, long electedTime) {
- Leadership newLeadership = new Leadership(path, leader, candidates, epoch, electedTime);
- final MutableBoolean updatedLeader = new MutableBoolean(false);
- log.debug("candidates for new Leadership {}", candidates);
- leaderBoard.compute(path, (k, currentLeader) -> {
- if (currentLeader == null || currentLeader.epoch() < epoch) {
- log.debug("updating leaderboard with new {}", newLeadership);
- updatedLeader.setTrue();
- return newLeadership;
- }
- return currentLeader;
- });
-
- if (updatedLeader.booleanValue()) {
- LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.LEADER_ELECTED, newLeadership);
- notifyPeers(event);
- }
- }
-
- private void notifyPeers(LeadershipEvent event) {
- eventDispatcher.post(event);
- clusterCommunicator.broadcast(event,
- LEADERSHIP_EVENT_MESSAGE_SUBJECT,
- SERIALIZER::encode);
- }
-
- private void notifyRemovedLeader(String path, NodeId leader, long epoch, long electedTime) {
- Versioned<List<NodeId>> candidates = candidateMap.get(path);
- Leadership oldLeadership = new Leadership(
- path, leader, candidates.value(), epoch, electedTime);
- final MutableBoolean updatedLeader = new MutableBoolean(false);
- leaderBoard.compute(path, (k, currentLeader) -> {
- if (currentLeader != null && currentLeader.epoch() == oldLeadership.epoch()) {
- updatedLeader.setTrue();
- return null;
- }
- return currentLeader;
- });
-
- if (updatedLeader.booleanValue()) {
- LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.LEADER_BOOTED, oldLeadership);
- notifyPeers(event);
- }
- }
-
- private class InternalLeadershipEventListener implements ClusterMessageHandler {
-
- @Override
- public void handle(ClusterMessage message) {
- LeadershipEvent leadershipEvent =
- SERIALIZER.decode(message.payload());
-
- log.trace("Leadership Event: time = {} type = {} event = {}",
- leadershipEvent.time(), leadershipEvent.type(),
- leadershipEvent);
-
- Leadership leadershipUpdate = leadershipEvent.subject();
- LeadershipEvent.Type eventType = leadershipEvent.type();
- String topic = leadershipUpdate.topic();
-
- MutableBoolean updateAccepted = new MutableBoolean(false);
- if (eventType.equals(LeadershipEvent.Type.LEADER_ELECTED)) {
- leaderBoard.compute(topic, (k, currentLeadership) -> {
- if (currentLeadership == null || currentLeadership.epoch() < leadershipUpdate.epoch()) {
- updateAccepted.setTrue();
- return leadershipUpdate;
- }
- return currentLeadership;
- });
- } else if (eventType.equals(LeadershipEvent.Type.LEADER_BOOTED)) {
- leaderBoard.compute(topic, (k, currentLeadership) -> {
- if (currentLeadership == null || currentLeadership.epoch() == leadershipUpdate.epoch()) {
- updateAccepted.setTrue();
- return null;
- }
- return currentLeadership;
- });
- } else if (eventType.equals(LeadershipEvent.Type.CANDIDATES_CHANGED)) {
- candidateBoard.compute(topic, (k, currentInfo) -> {
- if (currentInfo == null || currentInfo.epoch() < leadershipUpdate.epoch()) {
- updateAccepted.setTrue();
- return leadershipUpdate;
- }
- return currentInfo;
- });
- } else {
- throw new IllegalStateException("Unknown event type.");
- }
-
- if (updateAccepted.booleanValue()) {
- eventDispatcher.post(leadershipEvent);
- }
+ if (updateAccepted.get()) {
+ eventDispatcher.post(leadershipEvent);
}
}
@@ -549,7 +498,9 @@
try {
if (leaderMap.remove(path, epoch)) {
log.info("Purged stale lock held by {} for {}", nodeId, path);
- notifyRemovedLeader(path, nodeId, epoch, creationTime);
+ publish(new LeadershipEvent(
+ LeadershipEvent.Type.LEADER_BOOTED,
+ new Leadership(path, nodeId, epoch, creationTime)));
}
} catch (Exception e) {
log.warn("Failed to purge stale lock held by {} for {}", nodeId, path, e);
diff --git a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
index 8cd6b59..0d85784 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
@@ -53,8 +53,6 @@
import org.onosproject.net.MastershipRole;
import org.onosproject.store.AbstractStore;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
-import org.onosproject.store.cluster.messaging.ClusterMessage;
-import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.KryoSerializer;
@@ -122,12 +120,16 @@
public void activate() {
messageHandlingExecutor =
Executors.newSingleThreadExecutor(groupedThreads("onos/store/device/mastership", "message-handler"));
- clusterCommunicator.addSubscriber(ROLE_QUERY_SUBJECT,
- new RoleQueryHandler(),
+ clusterCommunicator.<DeviceId, MastershipRole>addSubscriber(ROLE_QUERY_SUBJECT,
+ SERIALIZER::decode,
+ deviceId -> getRole(localNodeId, deviceId),
+ SERIALIZER::encode,
messageHandlingExecutor);
- clusterCommunicator.addSubscriber(ROLE_RELINQUISH_SUBJECT,
- new RoleRelinquishHandler(),
- messageHandlingExecutor);
+ clusterCommunicator.<DeviceId, MastershipEvent>addSubscriber(ROLE_RELINQUISH_SUBJECT,
+ SERIALIZER::decode,
+ deviceId -> relinquishRole(localNodeId, deviceId),
+ SERIALIZER::encode,
+ messageHandlingExecutor);
clusterCommunicator.addSubscriber(TRANSITION_FROM_MASTER_TO_STANDBY_SUBJECT,
SERIALIZER::decode,
this::transitionFromMasterToStandby,
@@ -211,8 +213,6 @@
Map<NodeId, MastershipRole> roles = Maps.newHashMap();
clusterService
.getNodes()
- .stream()
- .parallel()
.forEach((node) -> roles.put(node.id(), getRole(node.id(), deviceId)));
NodeId master = null;
@@ -282,9 +282,21 @@
if (!nodeId.equals(currentMaster)) {
return null;
}
- // FIXME: This can become the master again unless it
- // is first demoted to the end of candidates list.
- return transitionFromMasterToStandby(deviceId);
+
+ String leadershipTopic = createDeviceMastershipTopic(deviceId);
+ List<NodeId> candidates = leadershipService.getCandidates(leadershipTopic);
+
+ NodeId newMaster = candidates.stream()
+ .filter(candidate -> !Objects.equal(nodeId, candidate))
+ .findFirst()
+ .orElse(null);
+ log.info("Transitioning to role {} for {}. Next master: {}",
+ newMaster != null ? MastershipRole.STANDBY : MastershipRole.NONE, deviceId, newMaster);
+
+ if (newMaster != null) {
+ return setMaster(newMaster, deviceId);
+ }
+ return relinquishRole(nodeId, deviceId);
}
@Override
@@ -344,28 +356,11 @@
? new MastershipEvent(MastershipEvent.Type.MASTER_CHANGED, deviceId, getNodes(deviceId)) : null;
}
- private class RoleQueryHandler implements ClusterMessageHandler {
- @Override
- public void handle(ClusterMessage message) {
- DeviceId deviceId = SERIALIZER.decode(message.payload());
- message.respond(SERIALIZER.encode(getRole(localNodeId, deviceId)));
- }
- }
-
-
@Override
public void relinquishAllRole(NodeId nodeId) {
// Noop. LeadershipService already takes care of detecting and purging deadlocks.
}
- private class RoleRelinquishHandler implements ClusterMessageHandler {
- @Override
- public void handle(ClusterMessage message) {
- DeviceId deviceId = SERIALIZER.decode(message.payload());
- message.respond(SERIALIZER.encode(relinquishRole(localNodeId, deviceId)));
- }
- }
-
private class InternalDeviceMastershipEventListener implements LeadershipEventListener {
@Override
public void event(LeadershipEvent event) {