A couple of fixes:
1. Retry the leadership lock after a successful stepdown.
2. Make setStandby adjust the candidates list so that another node steps up to become the master.
Change-Id: I8dc5da82c9b8b9e99d4118ec33a63037543927f0
diff --git a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
index 8cd6b59..0d85784 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
@@ -53,8 +53,6 @@
import org.onosproject.net.MastershipRole;
import org.onosproject.store.AbstractStore;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
-import org.onosproject.store.cluster.messaging.ClusterMessage;
-import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.KryoSerializer;
@@ -122,12 +120,16 @@
public void activate() {
messageHandlingExecutor =
Executors.newSingleThreadExecutor(groupedThreads("onos/store/device/mastership", "message-handler"));
- clusterCommunicator.addSubscriber(ROLE_QUERY_SUBJECT,
- new RoleQueryHandler(),
+ clusterCommunicator.<DeviceId, MastershipRole>addSubscriber(ROLE_QUERY_SUBJECT,
+ SERIALIZER::decode,
+ deviceId -> getRole(localNodeId, deviceId),
+ SERIALIZER::encode,
messageHandlingExecutor);
- clusterCommunicator.addSubscriber(ROLE_RELINQUISH_SUBJECT,
- new RoleRelinquishHandler(),
- messageHandlingExecutor);
+ clusterCommunicator.<DeviceId, MastershipEvent>addSubscriber(ROLE_RELINQUISH_SUBJECT,
+ SERIALIZER::decode,
+ deviceId -> relinquishRole(localNodeId, deviceId),
+ SERIALIZER::encode,
+ messageHandlingExecutor);
clusterCommunicator.addSubscriber(TRANSITION_FROM_MASTER_TO_STANDBY_SUBJECT,
SERIALIZER::decode,
this::transitionFromMasterToStandby,
@@ -211,8 +213,6 @@
Map<NodeId, MastershipRole> roles = Maps.newHashMap();
clusterService
.getNodes()
- .stream()
- .parallel()
.forEach((node) -> roles.put(node.id(), getRole(node.id(), deviceId)));
NodeId master = null;
@@ -282,9 +282,21 @@
if (!nodeId.equals(currentMaster)) {
return null;
}
- // FIXME: This can become the master again unless it
- // is first demoted to the end of candidates list.
- return transitionFromMasterToStandby(deviceId);
+
+ String leadershipTopic = createDeviceMastershipTopic(deviceId);
+ List<NodeId> candidates = leadershipService.getCandidates(leadershipTopic);
+
+ NodeId newMaster = candidates.stream()
+ .filter(candidate -> !Objects.equal(nodeId, candidate))
+ .findFirst()
+ .orElse(null);
+ log.info("Transitioning to role {} for {}. Next master: {}",
+ newMaster != null ? MastershipRole.STANDBY : MastershipRole.NONE, deviceId, newMaster);
+
+ if (newMaster != null) {
+ return setMaster(newMaster, deviceId);
+ }
+ return relinquishRole(nodeId, deviceId);
}
@Override
@@ -344,28 +356,11 @@
? new MastershipEvent(MastershipEvent.Type.MASTER_CHANGED, deviceId, getNodes(deviceId)) : null;
}
- private class RoleQueryHandler implements ClusterMessageHandler {
- @Override
- public void handle(ClusterMessage message) {
- DeviceId deviceId = SERIALIZER.decode(message.payload());
- message.respond(SERIALIZER.encode(getRole(localNodeId, deviceId)));
- }
- }
-
-
@Override
public void relinquishAllRole(NodeId nodeId) {
// Noop. LeadershipService already takes care of detecting and purging deadlocks.
}
- private class RoleRelinquishHandler implements ClusterMessageHandler {
- @Override
- public void handle(ClusterMessage message) {
- DeviceId deviceId = SERIALIZER.decode(message.payload());
- message.respond(SERIALIZER.encode(relinquishRole(localNodeId, deviceId)));
- }
- }
-
private class InternalDeviceMastershipEventListener implements LeadershipEventListener {
@Override
public void event(LeadershipEvent event) {