LeadershipStore updates:
- Now tracking leader and candidates for a topic using a single map.
- Using term numbers that are incremented by one every time a new leader is elected.
- Introduced a separate LeadershipStore to conform to the manager-store pattern.
Change-Id: I1d03a6c5e8ff0e68ef0c1e3a6c2d425c4856e470
diff --git a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
index 44fbea0..a2a0081 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
@@ -16,7 +16,6 @@
package org.onosproject.store.mastership.impl;
import static org.onlab.util.Tools.groupedThreads;
-import static org.onlab.util.Tools.futureGetOrElse;
import static org.onosproject.mastership.MastershipEvent.Type.BACKUPS_CHANGED;
import static org.onosproject.mastership.MastershipEvent.Type.MASTER_CHANGED;
import static org.slf4j.LoggerFactory.getLogger;
@@ -43,6 +42,7 @@
import org.onlab.util.KryoNamespace;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.Leadership;
+import org.onosproject.cluster.LeadershipAdminService;
import org.onosproject.cluster.LeadershipEvent;
import org.onosproject.cluster.LeadershipEventListener;
import org.onosproject.cluster.LeadershipService;
@@ -63,9 +63,9 @@
import org.slf4j.Logger;
import com.google.common.base.Objects;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
/**
* Implementation of the MastershipStore on top of Leadership Service.
@@ -82,18 +82,18 @@
protected LeadershipService leadershipService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected LeadershipAdminService leadershipAdminService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterCommunicationService clusterCommunicator;
private NodeId localNodeId;
- private final Set<DeviceId> connectedDevices = Sets.newHashSet();
private static final MessageSubject ROLE_RELINQUISH_SUBJECT =
new MessageSubject("mastership-store-device-role-relinquish");
- private static final MessageSubject TRANSITION_FROM_MASTER_TO_STANDBY_SUBJECT =
- new MessageSubject("mastership-store-device-mastership-relinquish");
private static final Pattern DEVICE_MASTERSHIP_TOPIC_PATTERN =
Pattern.compile("device:(.*)");
@@ -132,11 +132,6 @@
this::relinquishLocalRole,
SERIALIZER::encode,
messageHandlingExecutor);
- clusterCommunicator.addSubscriber(TRANSITION_FROM_MASTER_TO_STANDBY_SUBJECT,
- SERIALIZER::decode,
- this::transitionFromMasterToStandby,
- SERIALIZER::encode,
- messageHandlingExecutor);
localNodeId = clusterService.getLocalNode().id();
leadershipService.addListener(leadershipEventListener);
@@ -146,7 +141,6 @@
@Deactivate
public void deactivate() {
clusterCommunicator.removeSubscriber(ROLE_RELINQUISH_SUBJECT);
- clusterCommunicator.removeSubscriber(TRANSITION_FROM_MASTER_TO_STANDBY_SUBJECT);
messageHandlingExecutor.shutdown();
transferExecutor.shutdown();
leadershipService.removeListener(leadershipEventListener);
@@ -159,12 +153,9 @@
checkArgument(deviceId != null, DEVICE_ID_NULL);
String leadershipTopic = createDeviceMastershipTopic(deviceId);
- connectedDevices.add(deviceId);
- return leadershipService.runForLeadership(leadershipTopic)
- .thenApply(leadership -> {
- return Objects.equal(localNodeId, leadership.leader())
- ? MastershipRole.MASTER : MastershipRole.STANDBY;
- });
+ Leadership leadership = leadershipService.runForLeadership(leadershipTopic);
+ return CompletableFuture.completedFuture(localNodeId.equals(leadership.leaderNodeId())
+ ? MastershipRole.MASTER : MastershipRole.STANDBY);
}
@Override
@@ -173,20 +164,19 @@
checkArgument(deviceId != null, DEVICE_ID_NULL);
String leadershipTopic = createDeviceMastershipTopic(deviceId);
- NodeId leader = leadershipService.getLeader(leadershipTopic);
- if (Objects.equal(nodeId, leader)) {
- return MastershipRole.MASTER;
- }
- return leadershipService.getCandidates(leadershipTopic).contains(nodeId) ?
- MastershipRole.STANDBY : MastershipRole.NONE;
+ Leadership leadership = leadershipService.getLeadership(leadershipTopic);
+ NodeId leader = leadership == null ? null : leadership.leaderNodeId();
+ List<NodeId> candidates = leadership == null ?
+ ImmutableList.of() : ImmutableList.copyOf(leadership.candidates());
+ return Objects.equal(nodeId, leader) ?
+ MastershipRole.MASTER : candidates.contains(nodeId) ? MastershipRole.STANDBY : MastershipRole.NONE;
}
@Override
public NodeId getMaster(DeviceId deviceId) {
checkArgument(deviceId != null, DEVICE_ID_NULL);
- String leadershipTopic = createDeviceMastershipTopic(deviceId);
- return leadershipService.getLeader(leadershipTopic);
+ return leadershipService.getLeader(createDeviceMastershipTopic(deviceId));
}
@Override
@@ -194,9 +184,8 @@
checkArgument(deviceId != null, DEVICE_ID_NULL);
Map<NodeId, MastershipRole> roles = Maps.newHashMap();
- clusterService
- .getNodes()
- .forEach((node) -> roles.put(node.id(), getRole(node.id(), deviceId)));
+ clusterService.getNodes()
+ .forEach((node) -> roles.put(node.id(), getRole(node.id(), deviceId)));
NodeId master = null;
final List<NodeId> standbys = Lists.newLinkedList();
@@ -233,30 +222,10 @@
checkArgument(nodeId != null, NODE_ID_NULL);
checkArgument(deviceId != null, DEVICE_ID_NULL);
- NodeId currentMaster = getMaster(deviceId);
- if (nodeId.equals(currentMaster)) {
- return CompletableFuture.completedFuture(null);
- } else {
- String leadershipTopic = createDeviceMastershipTopic(deviceId);
- List<NodeId> candidates = leadershipService.getCandidates(leadershipTopic);
- if (candidates.isEmpty()) {
- return CompletableFuture.completedFuture(null);
- }
- if (leadershipService.makeTopCandidate(leadershipTopic, nodeId)) {
- CompletableFuture<MastershipEvent> result = new CompletableFuture<>();
- // There is brief wait before we step down from mastership.
- // This is to ensure any work that happens when standby preference
- // order changes can complete. For example: flow entries need to be backed
- // to the new top standby (ONOS-1883)
- // FIXME: This potentially introduces a race-condition.
- // Right now role changes are only forced via CLI.
- transferExecutor.schedule(() -> {
- result.complete(transitionFromMasterToStandby(deviceId));
- }, WAIT_BEFORE_MASTERSHIP_HANDOFF_MILLIS, TimeUnit.MILLISECONDS);
- return result;
- } else {
- log.warn("Failed to promote {} to mastership for {}", nodeId, deviceId);
- }
+ String leadershipTopic = createDeviceMastershipTopic(deviceId);
+ if (leadershipAdminService.promoteToTopOfCandidateList(leadershipTopic, nodeId)) {
+ transferExecutor.schedule(() -> leadershipAdminService.transferLeadership(leadershipTopic, nodeId),
+ WAIT_BEFORE_MASTERSHIP_HANDOFF_MILLIS, TimeUnit.MILLISECONDS);
}
return CompletableFuture.completedFuture(null);
}
@@ -267,7 +236,7 @@
String leadershipTopic = createDeviceMastershipTopic(deviceId);
Leadership leadership = leadershipService.getLeadership(leadershipTopic);
- return leadership != null ? MastershipTerm.of(leadership.leader(), leadership.epoch()) : null;
+ return leadership != null ? MastershipTerm.of(leadership.leaderNodeId(), leadership.leader().term()) : null;
}
@Override
@@ -318,71 +287,44 @@
private CompletableFuture<MastershipEvent> relinquishLocalRole(DeviceId deviceId) {
checkArgument(deviceId != null, DEVICE_ID_NULL);
- // Check if this node is can be managed by this node.
- if (!connectedDevices.contains(deviceId)) {
+ String leadershipTopic = createDeviceMastershipTopic(deviceId);
+ if (!leadershipService.getCandidates(leadershipTopic).contains(localNodeId)) {
return CompletableFuture.completedFuture(null);
}
-
- String leadershipTopic = createDeviceMastershipTopic(deviceId);
- NodeId currentLeader = leadershipService.getLeader(leadershipTopic);
-
- MastershipEvent.Type eventType = Objects.equal(currentLeader, localNodeId)
- ? MastershipEvent.Type.MASTER_CHANGED
- : MastershipEvent.Type.BACKUPS_CHANGED;
-
- connectedDevices.remove(deviceId);
- return leadershipService.withdraw(leadershipTopic)
- .thenApply(v -> new MastershipEvent(eventType, deviceId, getNodes(deviceId)));
- }
-
- private MastershipEvent transitionFromMasterToStandby(DeviceId deviceId) {
- checkArgument(deviceId != null, DEVICE_ID_NULL);
-
- NodeId currentMaster = getMaster(deviceId);
- if (currentMaster == null) {
- return null;
- }
-
- if (!currentMaster.equals(localNodeId)) {
- log.info("Forwarding request to relinquish "
- + "mastership for device {} to {}", deviceId, currentMaster);
- return futureGetOrElse(clusterCommunicator.sendAndReceive(
- deviceId,
- TRANSITION_FROM_MASTER_TO_STANDBY_SUBJECT,
- SERIALIZER::encode,
- SERIALIZER::decode,
- currentMaster), null);
- }
-
- return leadershipService.stepdown(createDeviceMastershipTopic(deviceId))
- ? new MastershipEvent(MastershipEvent.Type.MASTER_CHANGED, deviceId, getNodes(deviceId)) : null;
+ MastershipEvent.Type eventType = localNodeId.equals(leadershipService.getLeader(leadershipTopic)) ?
+ MastershipEvent.Type.MASTER_CHANGED : MastershipEvent.Type.BACKUPS_CHANGED;
+ leadershipService.withdraw(leadershipTopic);
+ return CompletableFuture.completedFuture(new MastershipEvent(eventType, deviceId, getNodes(deviceId)));
}
@Override
public void relinquishAllRole(NodeId nodeId) {
- // Noop. LeadershipService already takes care of detecting and purging deadlocks.
+ // Noop. LeadershipService already takes care of detecting and purging stale locks.
}
private class InternalDeviceMastershipEventListener implements LeadershipEventListener {
+
+ @Override
+ public boolean isRelevant(LeadershipEvent event) {
+ Leadership leadership = event.subject();
+ return isDeviceMastershipTopic(leadership.topic());
+ }
+
@Override
public void event(LeadershipEvent event) {
Leadership leadership = event.subject();
- if (!isDeviceMastershipTopic(leadership.topic())) {
- return;
- }
DeviceId deviceId = extractDeviceIdFromTopic(leadership.topic());
+ RoleInfo roleInfo = getNodes(deviceId);
switch (event.type()) {
- case LEADER_ELECTED:
- notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, getNodes(deviceId)));
+ case LEADER_AND_CANDIDATES_CHANGED:
+ notifyDelegate(new MastershipEvent(BACKUPS_CHANGED, deviceId, roleInfo));
+ notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, roleInfo));
break;
- case LEADER_REELECTED:
- // There is no concept of leader re-election in the new distributed leadership manager.
- throw new IllegalStateException("Unexpected event type");
- case LEADER_BOOTED:
- notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, getNodes(deviceId)));
+ case LEADER_CHANGED:
+ notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, roleInfo));
break;
case CANDIDATES_CHANGED:
- notifyDelegate(new MastershipEvent(BACKUPS_CHANGED, deviceId, getNodes(deviceId)));
+ notifyDelegate(new MastershipEvent(BACKUPS_CHANGED, deviceId, roleInfo));
break;
default:
return;
@@ -407,5 +349,4 @@
Matcher m = DEVICE_MASTERSHIP_TOPIC_PATTERN.matcher(topic);
return m.matches();
}
-
-}
+}
\ No newline at end of file