LeadershipStore updates:
- Now tracking leader and candidates for a topic using a single map.
- Using term numbers that are incremented by one every time a new leader is elected.
- Introduced a separate LeadershipStore to conform to the manager-store pattern.
Change-Id: I1d03a6c5e8ff0e68ef0c1e3a6c2d425c4856e470
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedLeadershipStore.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedLeadershipStore.java
new file mode 100644
index 0000000..c6647b7
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedLeadershipStore.java
@@ -0,0 +1,279 @@
+/*
+ * Copyright 2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.cluster.impl;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.Leader;
+import org.onosproject.cluster.Leadership;
+import org.onosproject.cluster.LeadershipEvent;
+import org.onosproject.cluster.LeadershipStore;
+import org.onosproject.cluster.LeadershipStoreDelegate;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.store.AbstractStore;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.Versioned;
+import org.slf4j.Logger;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Objects;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+/**
+ * Implementation of {@code LeadershipStore} backed by {@link ConsistentMap}.
+ */
+@Service
+@Component(immediate = true, enabled = true)
+public class DistributedLeadershipStore
+ extends AbstractStore<LeadershipEvent, LeadershipStoreDelegate>
+ implements LeadershipStore {
+
+ private static final Logger log = getLogger(DistributedLeadershipStore.class);
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ClusterService clusterService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected StorageService storageService;
+
+ protected NodeId localNodeId;
+ protected ConsistentMap<String, InternalLeadership> leadershipMap;
+ /**
+  * Converts updates of the backing map into {@link LeadershipEvent}s and hands them to the delegate.
+  * The event type states whether the leader, the candidate list, or both differ between the old and
+  * the new value. Nothing is posted when the update carries no new value or when neither part changed.
+  */
+ private final MapEventListener<String, InternalLeadership> leadershipChangeListener =
+     event -> {
+         Leadership oldValue = InternalLeadership.toLeadership(Versioned.valueOrNull(event.oldValue()));
+         Leadership newValue = InternalLeadership.toLeadership(Versioned.valueOrNull(event.newValue()));
+         if (newValue == null) {
+             // Without a new value there is no leadership that could serve as the event subject.
+             return;
+         }
+         boolean leaderChanged =
+                 !Objects.equal(oldValue == null ? null : oldValue.leader(), newValue.leader());
+         List<NodeId> oldCandidates = oldValue == null ?
+                 ImmutableList.<NodeId>of() : oldValue.candidates();
+         // Ordered comparison: moving a node to the top of the list is a candidate change as well.
+         boolean candidatesChanged = !Objects.equal(oldCandidates, newValue.candidates());
+         if (!leaderChanged && !candidatesChanged) {
+             // Never post an event that has no type.
+             return;
+         }
+         LeadershipEvent.Type eventType;
+         if (leaderChanged && candidatesChanged) {
+             eventType = LeadershipEvent.Type.LEADER_AND_CANDIDATES_CHANGED;
+         } else if (leaderChanged) {
+             eventType = LeadershipEvent.Type.LEADER_CHANGED;
+         } else {
+             eventType = LeadershipEvent.Type.CANDIDATES_CHANGED;
+         }
+         notifyDelegate(new LeadershipEvent(eventType, newValue));
+     };
+
+ /**
+  * Caches the local node id, builds the {@code onos-leadership} consistent map and
+  * registers the change listener on it.
+  */
+ @Activate
+ public void activate() {
+     localNodeId = clusterService.getLocalNode().id();
+     Serializer leadershipSerializer = Serializer.using(KryoNamespaces.API, InternalLeadership.class);
+     leadershipMap = storageService.<String, InternalLeadership>consistentMapBuilder()
+             .withName("onos-leadership")
+             .withPartitionsDisabled()
+             .withRelaxedReadConsistency()
+             .withSerializer(leadershipSerializer)
+             .build();
+     leadershipMap.addListener(leadershipChangeListener);
+     log.info("Started");
+ }
+
+ /**
+  * Unregisters the change listener from the leadership map.
+  */
+ @Deactivate
+ public void deactivate() {
+ leadershipMap.removeListener(leadershipChangeListener);
+ log.info("Stopped");
+ }
+
+ /**
+  * Registers the local node as a candidate for the given topic.
+  * If the topic has no entry yet, or its candidate list is empty, the local node becomes leader of a
+  * new term; otherwise it is appended to the end of the existing candidate list. The map is left
+  * untouched when the local node is already a candidate.
+  *
+  * @param topic leadership topic
+  * @return leadership for the topic as returned by the map update
+  */
+ @Override
+ public Leadership addRegistration(String topic) {
+     Versioned<InternalLeadership> updated = leadershipMap.computeIf(topic,
+             current -> current == null || !current.candidates().contains(localNodeId),
+             (key, current) -> {
+                 boolean hasCandidates = current != null && !current.candidates().isEmpty();
+                 if (hasCandidates) {
+                     // Leader and term stay as they are; the local node queues up at the end.
+                     List<NodeId> extended = new ArrayList<>(current.candidates());
+                     extended.add(localNodeId);
+                     return new InternalLeadership(topic,
+                                                   current.leader(),
+                                                   current.term(),
+                                                   current.termStartTime(),
+                                                   extended);
+                 }
+                 // First term is 1; later terms continue from the stored term.
+                 long nextTerm = current == null ? 1 : current.term() + 1;
+                 return new InternalLeadership(topic,
+                                               localNodeId,
+                                               nextTerm,
+                                               System.currentTimeMillis(),
+                                               ImmutableList.of(localNodeId));
+             });
+     return InternalLeadership.toLeadership(Versioned.valueOrNull(updated));
+ }
+
+ /**
+  * Withdraws the local node's registration for the given topic.
+  *
+  * @param topic leadership topic
+  */
+ @Override
+ public void removeRegistration(String topic) {
+ removeRegistration(topic, localNodeId);
+ }
+
+ /**
+  * Removes {@code nodeId} from the candidates of {@code topic}, if it is registered there.
+  * When the removed node was the leader, the first remaining candidate takes over with an
+  * incremented term and a fresh term start time; with no candidate left the leader becomes
+  * {@code null} and term data is kept unchanged.
+  *
+  * @param topic leadership topic
+  * @param nodeId node to remove
+  */
+ private void removeRegistration(String topic, NodeId nodeId) {
+     leadershipMap.computeIf(topic,
+             current -> current != null && current.candidates().contains(nodeId),
+             (key, current) -> {
+                 List<NodeId> remaining = current.candidates()
+                         .stream()
+                         .filter(id -> !nodeId.equals(id))
+                         .collect(Collectors.toList());
+                 NodeId successor;
+                 if (!nodeId.equals(current.leader())) {
+                     successor = current.leader();
+                 } else if (remaining.size() > 0) {
+                     successor = remaining.get(0);
+                 } else {
+                     successor = null;
+                 }
+                 // A new term starts only when an actual, different leader takes over.
+                 boolean termContinues = successor == null || Objects.equal(successor, current.leader());
+                 long term = termContinues ? current.term() : current.term() + 1;
+                 long termStart = termContinues ? current.termStartTime() : System.currentTimeMillis();
+                 return new InternalLeadership(topic, successor, term, termStart, remaining);
+             });
+ }
+
+ /**
+  * Removes the given node from the candidate list of every topic in which it is registered.
+  *
+  * @param nodeId node whose registrations are removed
+  */
+ @Override
+ public void removeRegistration(NodeId nodeId) {
+     leadershipMap.entrySet()
+             .stream()
+             .filter(entry -> entry.getValue().value().candidates().contains(nodeId))
+             .map(Map.Entry::getKey)
+             .forEach(registeredTopic -> removeRegistration(registeredTopic, nodeId));
+ }
+
+ /**
+  * Makes {@code toNodeId} the leader of the topic, provided it is a registered candidate and not
+  * the leader already. On a change the node is moved to the head of the candidate list, the term
+  * is incremented and the term start time is reset.
+  *
+  * @param topic leadership topic
+  * @param toNodeId node that should become leader
+  * @return {@code true} if {@code toNodeId} is the leader after the call; {@code false} otherwise,
+  *         including when no leadership is available for the topic
+  */
+ @Override
+ public boolean moveLeadership(String topic, NodeId toNodeId) {
+     Versioned<InternalLeadership> internalLeadership = leadershipMap.computeIf(topic,
+             v -> v != null &&
+                     v.candidates().contains(toNodeId) &&
+                     !Objects.equal(v.leader(), toNodeId),
+             (k, v) -> {
+                 List<NodeId> newCandidates = new ArrayList<>();
+                 newCandidates.add(toNodeId);
+                 newCandidates.addAll(v.candidates()
+                         .stream()
+                         .filter(id -> !toNodeId.equals(id))
+                         .collect(Collectors.toList()));
+                 return new InternalLeadership(topic,
+                                               toNodeId,
+                                               v.term() + 1,
+                                               System.currentTimeMillis(),
+                                               newCandidates);
+             });
+     // The update may yield nothing (e.g. unknown topic); report failure instead of dereferencing null.
+     InternalLeadership current = Versioned.valueOrNull(internalLeadership);
+     return current != null && Objects.equal(toNodeId, current.leader());
+ }
+
+ /**
+  * Moves {@code nodeId} to the head of the candidate list of the topic, provided it is a registered
+  * candidate and not at the head already. Leader, term and term start time are left untouched.
+  *
+  * @param topic leadership topic
+  * @param nodeId node to promote
+  * @return {@code true} if {@code nodeId} is the first candidate after the call; {@code false}
+  *         otherwise, including when the topic has no entry or no candidates
+  */
+ @Override
+ public boolean makeTopCandidate(String topic, NodeId nodeId) {
+     Versioned<InternalLeadership> internalLeadership = leadershipMap.computeIf(topic,
+             v -> v != null &&
+                     v.candidates().contains(nodeId) &&
+                     !v.candidates().get(0).equals(nodeId),
+             (k, v) -> {
+                 List<NodeId> newCandidates = new ArrayList<>();
+                 newCandidates.add(nodeId);
+                 newCandidates.addAll(v.candidates()
+                         .stream()
+                         .filter(id -> !nodeId.equals(id))
+                         .collect(Collectors.toList()));
+                 // Only the order changes: the term continues, so its start time must be preserved,
+                 // as removeRegistration does when the leader stays the same.
+                 return new InternalLeadership(topic,
+                                               v.leader(),
+                                               v.term(),
+                                               v.termStartTime(),
+                                               newCandidates);
+             });
+     InternalLeadership current = Versioned.valueOrNull(internalLeadership);
+     // An entry may exist with an empty candidate list once every candidate has withdrawn.
+     return current != null &&
+             !current.candidates().isEmpty() &&
+             nodeId.equals(current.candidates().get(0));
+ }
+
+ /**
+  * Looks up the leadership stored for the given topic.
+  *
+  * @param topic leadership topic
+  * @return leadership built from the stored value, or {@code null} when
+  *         {@code Versioned.valueOrNull} yields no value for the topic
+  */
+ @Override
+ public Leadership getLeadership(String topic) {
+ return InternalLeadership.toLeadership(Versioned.valueOrNull(leadershipMap.get(topic)));
+ }
+
+ /**
+  * Returns the leaderships of all topics present in the leadership map.
+  *
+  * @return immutable mapping from topic to leadership
+  */
+ @Override
+ public Map<String, Leadership> getLeaderships() {
+     Map<String, Leadership> byTopic = Maps.newHashMap();
+     for (Map.Entry<String, Versioned<InternalLeadership>> entry : leadershipMap.entrySet()) {
+         byTopic.put(entry.getKey(), entry.getValue().value().asLeadership());
+     }
+     return ImmutableMap.copyOf(byTopic);
+ }
+
+ /**
+  * Value stored per topic in the leadership map: the topic, its leader (may be {@code null}),
+  * the term with its start time, and the ordered list of candidates. All fields are final and
+  * the candidate list is copied into an {@link ImmutableList} on construction.
+  */
+ private static class InternalLeadership {
+     private final String topic;
+     private final NodeId leader;
+     private final long term;
+     private final long termStartTime;
+     private final List<NodeId> candidates;
+
+     /**
+      * Creates a new leadership value.
+      *
+      * @param topic leadership topic
+      * @param leader current leader; {@code null} if there is none
+      * @param term term number
+      * @param termStartTime term start time; callers pass {@code System.currentTimeMillis()} values
+      * @param candidates ordered candidates; copied defensively
+      */
+     public InternalLeadership(String topic,
+                               NodeId leader,
+                               long term,
+                               long termStartTime,
+                               List<NodeId> candidates) {
+         this.topic = topic;
+         this.leader = leader;
+         this.term = term;
+         this.termStartTime = termStartTime;
+         this.candidates = ImmutableList.copyOf(candidates);
+     }
+
+     /** Returns the leader node id, or {@code null} if there is no leader. */
+     public NodeId leader() {
+         return leader;
+     }
+
+     /** Returns the term number. */
+     public long term() {
+         return this.term;
+     }
+
+     /** Returns the start time of the term. */
+     public long termStartTime() {
+         return this.termStartTime;
+     }
+
+     /** Returns the immutable, ordered candidate list. */
+     public List<NodeId> candidates() {
+         return this.candidates;
+     }
+
+     /**
+      * Builds the public {@link Leadership} view of this value.
+      *
+      * @return leadership whose {@link Leader} is {@code null} when there is no leader
+      */
+     public Leadership asLeadership() {
+         Leader currentLeader = null;
+         if (leader != null) {
+             currentLeader = new Leader(leader, term, termStartTime);
+         }
+         return new Leadership(topic, currentLeader, candidates);
+     }
+
+     /**
+      * Null-safe variant of {@link #asLeadership()}.
+      *
+      * @param internalLeadership value to convert; may be {@code null}
+      * @return converted leadership, or {@code null} for {@code null} input
+      */
+     public static Leadership toLeadership(InternalLeadership internalLeadership) {
+         if (internalLeadership == null) {
+             return null;
+         }
+         return internalLeadership.asLeadership();
+     }
+
+     @Override
+     public String toString() {
+         MoreObjects.ToStringHelper helper = MoreObjects.toStringHelper(getClass());
+         helper.add("leader", leader);
+         helper.add("term", term);
+         helper.add("termStartTime", termStartTime);
+         helper.add("candidates", candidates);
+         return helper.toString();
+     }
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/IntentPartitionManager.java b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/IntentPartitionManager.java
index bfb4754..d2c63f3 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/IntentPartitionManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/IntentPartitionManager.java
@@ -21,8 +21,6 @@
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
-import org.onosproject.cluster.ClusterEvent;
-import org.onosproject.cluster.ClusterEventListener;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.Leadership;
@@ -76,7 +74,6 @@
private ListenerRegistry<IntentPartitionEvent, IntentPartitionEventListener> listenerRegistry;
private LeadershipEventListener leaderListener = new InternalLeadershipListener();
- private ClusterEventListener clusterListener = new InternalClusterEventListener();
private ScheduledExecutorService executor = Executors
.newScheduledThreadPool(1);
@@ -84,7 +81,6 @@
@Activate
public void activate() {
leadershipService.addListener(leaderListener);
- clusterService.addListener(clusterListener);
listenerRegistry = new ListenerRegistry<>();
eventDispatcher.addSink(IntentPartitionEvent.class, listenerRegistry);
@@ -103,7 +99,6 @@
eventDispatcher.removeSink(IntentPartitionEvent.class);
leadershipService.removeListener(leaderListener);
- clusterService.removeListener(clusterListener);
}
/**
@@ -180,7 +175,7 @@
List<Leadership> myPartitions = leadershipService.getLeaderBoard().values()
.stream()
- .filter(l -> clusterService.getLocalNode().id().equals(l.leader()))
+ .filter(l -> clusterService.getLocalNode().id().equals(l.leaderNodeId()))
.filter(l -> l.topic().startsWith(ELECTION_PREFIX))
.collect(Collectors.toList());
@@ -220,24 +215,16 @@
public void event(LeadershipEvent event) {
Leadership leadership = event.subject();
- if (Objects.equals(leadership.leader(), clusterService.getLocalNode().id()) &&
+ if (Objects.equals(leadership.leaderNodeId(), clusterService.getLocalNode().id()) &&
leadership.topic().startsWith(ELECTION_PREFIX)) {
- // See if we need to let some partitions go
- scheduleRebalance(0);
-
eventDispatcher.post(new IntentPartitionEvent(IntentPartitionEvent.Type.LEADER_CHANGED,
leadership.topic()));
}
- }
- }
- private final class InternalClusterEventListener implements
- ClusterEventListener {
-
- @Override
- public void event(ClusterEvent event) {
- scheduleRebalance(0);
+ if (event.type() == LeadershipEvent.Type.CANDIDATES_CHANGED) {
+ scheduleRebalance(0);
+ }
}
}
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
index 44fbea0..a2a0081 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
@@ -16,7 +16,6 @@
package org.onosproject.store.mastership.impl;
import static org.onlab.util.Tools.groupedThreads;
-import static org.onlab.util.Tools.futureGetOrElse;
import static org.onosproject.mastership.MastershipEvent.Type.BACKUPS_CHANGED;
import static org.onosproject.mastership.MastershipEvent.Type.MASTER_CHANGED;
import static org.slf4j.LoggerFactory.getLogger;
@@ -43,6 +42,7 @@
import org.onlab.util.KryoNamespace;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.Leadership;
+import org.onosproject.cluster.LeadershipAdminService;
import org.onosproject.cluster.LeadershipEvent;
import org.onosproject.cluster.LeadershipEventListener;
import org.onosproject.cluster.LeadershipService;
@@ -63,9 +63,9 @@
import org.slf4j.Logger;
import com.google.common.base.Objects;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
/**
* Implementation of the MastershipStore on top of Leadership Service.
@@ -82,18 +82,18 @@
protected LeadershipService leadershipService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected LeadershipAdminService leadershipAdminService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterCommunicationService clusterCommunicator;
private NodeId localNodeId;
- private final Set<DeviceId> connectedDevices = Sets.newHashSet();
private static final MessageSubject ROLE_RELINQUISH_SUBJECT =
new MessageSubject("mastership-store-device-role-relinquish");
- private static final MessageSubject TRANSITION_FROM_MASTER_TO_STANDBY_SUBJECT =
- new MessageSubject("mastership-store-device-mastership-relinquish");
private static final Pattern DEVICE_MASTERSHIP_TOPIC_PATTERN =
Pattern.compile("device:(.*)");
@@ -132,11 +132,6 @@
this::relinquishLocalRole,
SERIALIZER::encode,
messageHandlingExecutor);
- clusterCommunicator.addSubscriber(TRANSITION_FROM_MASTER_TO_STANDBY_SUBJECT,
- SERIALIZER::decode,
- this::transitionFromMasterToStandby,
- SERIALIZER::encode,
- messageHandlingExecutor);
localNodeId = clusterService.getLocalNode().id();
leadershipService.addListener(leadershipEventListener);
@@ -146,7 +141,6 @@
@Deactivate
public void deactivate() {
clusterCommunicator.removeSubscriber(ROLE_RELINQUISH_SUBJECT);
- clusterCommunicator.removeSubscriber(TRANSITION_FROM_MASTER_TO_STANDBY_SUBJECT);
messageHandlingExecutor.shutdown();
transferExecutor.shutdown();
leadershipService.removeListener(leadershipEventListener);
@@ -159,12 +153,9 @@
checkArgument(deviceId != null, DEVICE_ID_NULL);
String leadershipTopic = createDeviceMastershipTopic(deviceId);
- connectedDevices.add(deviceId);
- return leadershipService.runForLeadership(leadershipTopic)
- .thenApply(leadership -> {
- return Objects.equal(localNodeId, leadership.leader())
- ? MastershipRole.MASTER : MastershipRole.STANDBY;
- });
+ Leadership leadership = leadershipService.runForLeadership(leadershipTopic);
+ return CompletableFuture.completedFuture(localNodeId.equals(leadership.leaderNodeId())
+ ? MastershipRole.MASTER : MastershipRole.STANDBY);
}
@Override
@@ -173,20 +164,19 @@
checkArgument(deviceId != null, DEVICE_ID_NULL);
String leadershipTopic = createDeviceMastershipTopic(deviceId);
- NodeId leader = leadershipService.getLeader(leadershipTopic);
- if (Objects.equal(nodeId, leader)) {
- return MastershipRole.MASTER;
- }
- return leadershipService.getCandidates(leadershipTopic).contains(nodeId) ?
- MastershipRole.STANDBY : MastershipRole.NONE;
+ Leadership leadership = leadershipService.getLeadership(leadershipTopic);
+ NodeId leader = leadership == null ? null : leadership.leaderNodeId();
+ List<NodeId> candidates = leadership == null ?
+ ImmutableList.of() : ImmutableList.copyOf(leadership.candidates());
+ return Objects.equal(nodeId, leader) ?
+ MastershipRole.MASTER : candidates.contains(nodeId) ? MastershipRole.STANDBY : MastershipRole.NONE;
}
@Override
public NodeId getMaster(DeviceId deviceId) {
checkArgument(deviceId != null, DEVICE_ID_NULL);
- String leadershipTopic = createDeviceMastershipTopic(deviceId);
- return leadershipService.getLeader(leadershipTopic);
+ return leadershipService.getLeader(createDeviceMastershipTopic(deviceId));
}
@Override
@@ -194,9 +184,8 @@
checkArgument(deviceId != null, DEVICE_ID_NULL);
Map<NodeId, MastershipRole> roles = Maps.newHashMap();
- clusterService
- .getNodes()
- .forEach((node) -> roles.put(node.id(), getRole(node.id(), deviceId)));
+ clusterService.getNodes()
+ .forEach((node) -> roles.put(node.id(), getRole(node.id(), deviceId)));
NodeId master = null;
final List<NodeId> standbys = Lists.newLinkedList();
@@ -233,30 +222,10 @@
checkArgument(nodeId != null, NODE_ID_NULL);
checkArgument(deviceId != null, DEVICE_ID_NULL);
- NodeId currentMaster = getMaster(deviceId);
- if (nodeId.equals(currentMaster)) {
- return CompletableFuture.completedFuture(null);
- } else {
- String leadershipTopic = createDeviceMastershipTopic(deviceId);
- List<NodeId> candidates = leadershipService.getCandidates(leadershipTopic);
- if (candidates.isEmpty()) {
- return CompletableFuture.completedFuture(null);
- }
- if (leadershipService.makeTopCandidate(leadershipTopic, nodeId)) {
- CompletableFuture<MastershipEvent> result = new CompletableFuture<>();
- // There is brief wait before we step down from mastership.
- // This is to ensure any work that happens when standby preference
- // order changes can complete. For example: flow entries need to be backed
- // to the new top standby (ONOS-1883)
- // FIXME: This potentially introduces a race-condition.
- // Right now role changes are only forced via CLI.
- transferExecutor.schedule(() -> {
- result.complete(transitionFromMasterToStandby(deviceId));
- }, WAIT_BEFORE_MASTERSHIP_HANDOFF_MILLIS, TimeUnit.MILLISECONDS);
- return result;
- } else {
- log.warn("Failed to promote {} to mastership for {}", nodeId, deviceId);
- }
+ String leadershipTopic = createDeviceMastershipTopic(deviceId);
+ if (leadershipAdminService.promoteToTopOfCandidateList(leadershipTopic, nodeId)) {
+ transferExecutor.schedule(() -> leadershipAdminService.transferLeadership(leadershipTopic, nodeId),
+ WAIT_BEFORE_MASTERSHIP_HANDOFF_MILLIS, TimeUnit.MILLISECONDS);
}
return CompletableFuture.completedFuture(null);
}
@@ -267,7 +236,7 @@
String leadershipTopic = createDeviceMastershipTopic(deviceId);
Leadership leadership = leadershipService.getLeadership(leadershipTopic);
- return leadership != null ? MastershipTerm.of(leadership.leader(), leadership.epoch()) : null;
+ return leadership != null ? MastershipTerm.of(leadership.leaderNodeId(), leadership.leader().term()) : null;
}
@Override
@@ -318,71 +287,44 @@
private CompletableFuture<MastershipEvent> relinquishLocalRole(DeviceId deviceId) {
checkArgument(deviceId != null, DEVICE_ID_NULL);
- // Check if this node is can be managed by this node.
- if (!connectedDevices.contains(deviceId)) {
+ String leadershipTopic = createDeviceMastershipTopic(deviceId);
+ if (!leadershipService.getCandidates(leadershipTopic).contains(localNodeId)) {
return CompletableFuture.completedFuture(null);
}
-
- String leadershipTopic = createDeviceMastershipTopic(deviceId);
- NodeId currentLeader = leadershipService.getLeader(leadershipTopic);
-
- MastershipEvent.Type eventType = Objects.equal(currentLeader, localNodeId)
- ? MastershipEvent.Type.MASTER_CHANGED
- : MastershipEvent.Type.BACKUPS_CHANGED;
-
- connectedDevices.remove(deviceId);
- return leadershipService.withdraw(leadershipTopic)
- .thenApply(v -> new MastershipEvent(eventType, deviceId, getNodes(deviceId)));
- }
-
- private MastershipEvent transitionFromMasterToStandby(DeviceId deviceId) {
- checkArgument(deviceId != null, DEVICE_ID_NULL);
-
- NodeId currentMaster = getMaster(deviceId);
- if (currentMaster == null) {
- return null;
- }
-
- if (!currentMaster.equals(localNodeId)) {
- log.info("Forwarding request to relinquish "
- + "mastership for device {} to {}", deviceId, currentMaster);
- return futureGetOrElse(clusterCommunicator.sendAndReceive(
- deviceId,
- TRANSITION_FROM_MASTER_TO_STANDBY_SUBJECT,
- SERIALIZER::encode,
- SERIALIZER::decode,
- currentMaster), null);
- }
-
- return leadershipService.stepdown(createDeviceMastershipTopic(deviceId))
- ? new MastershipEvent(MastershipEvent.Type.MASTER_CHANGED, deviceId, getNodes(deviceId)) : null;
+ MastershipEvent.Type eventType = localNodeId.equals(leadershipService.getLeader(leadershipTopic)) ?
+ MastershipEvent.Type.MASTER_CHANGED : MastershipEvent.Type.BACKUPS_CHANGED;
+ leadershipService.withdraw(leadershipTopic);
+ return CompletableFuture.completedFuture(new MastershipEvent(eventType, deviceId, getNodes(deviceId)));
}
@Override
public void relinquishAllRole(NodeId nodeId) {
- // Noop. LeadershipService already takes care of detecting and purging deadlocks.
+ // Noop. LeadershipService already takes care of detecting and purging stale locks.
}
private class InternalDeviceMastershipEventListener implements LeadershipEventListener {
+
+ @Override
+ public boolean isRelevant(LeadershipEvent event) {
+ Leadership leadership = event.subject();
+ return isDeviceMastershipTopic(leadership.topic());
+ }
+
@Override
public void event(LeadershipEvent event) {
Leadership leadership = event.subject();
- if (!isDeviceMastershipTopic(leadership.topic())) {
- return;
- }
DeviceId deviceId = extractDeviceIdFromTopic(leadership.topic());
+ RoleInfo roleInfo = getNodes(deviceId);
switch (event.type()) {
- case LEADER_ELECTED:
- notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, getNodes(deviceId)));
+ case LEADER_AND_CANDIDATES_CHANGED:
+ notifyDelegate(new MastershipEvent(BACKUPS_CHANGED, deviceId, roleInfo));
+ notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, roleInfo));
break;
- case LEADER_REELECTED:
- // There is no concept of leader re-election in the new distributed leadership manager.
- throw new IllegalStateException("Unexpected event type");
- case LEADER_BOOTED:
- notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, getNodes(deviceId)));
+ case LEADER_CHANGED:
+ notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, roleInfo));
break;
case CANDIDATES_CHANGED:
- notifyDelegate(new MastershipEvent(BACKUPS_CHANGED, deviceId, getNodes(deviceId)));
+ notifyDelegate(new MastershipEvent(BACKUPS_CHANGED, deviceId, roleInfo));
break;
default:
return;
@@ -407,5 +349,4 @@
Matcher m = DEVICE_MASTERSHIP_TOPIC_PATTERN.matcher(topic);
return m.matches();
}
-
-}
+}
\ No newline at end of file