Add atomic mastership/term/backups method to MastershipService
Change-Id: I18c3aeaa5101c9ce08ff38fffd70eaec903a0f3e
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/ReplicaInfo.java b/core/store/dist/src/main/java/org/onosproject/store/flow/ReplicaInfo.java
index 634582a..feb517c 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/ReplicaInfo.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/ReplicaInfo.java
@@ -29,21 +29,33 @@
*/
public final class ReplicaInfo {
+ private final long term;
private final Optional<NodeId> master;
private final List<NodeId> backups;
/**
* Creates a ReplicaInfo instance.
*
+ * @param term monotonically increasing unique mastership term
* @param master NodeId of the node where the master copy should be
* @param backups list of NodeId, where backup copies should be placed
*/
- public ReplicaInfo(NodeId master, List<NodeId> backups) {
+ public ReplicaInfo(long term, NodeId master, List<NodeId> backups) {
+ this.term = term;
+ // A null master is allowed; it is stored as an empty Optional.
this.master = Optional.ofNullable(master);
+ // backups must be non-null; the list is stored as passed, without copying.
this.backups = checkNotNull(backups);
}
/**
+ * Returns the mastership term recorded in this replica information.
+ *
+ * @return the stored mastership term
+ */
+ public long term() {
+ return term;
+ }
+
+ /**
* Returns the NodeId, if there is a Node where the master copy should be.
*
* @return NodeId, where the master copy should be placed
@@ -78,6 +90,7 @@
// for Serializer
private ReplicaInfo() {
+ // Placeholder term for this serializer-only constructor.
+ this.term = 0;
this.master = Optional.empty();
this.backups = Collections.emptyList();
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ReplicaInfoManager.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ReplicaInfoManager.java
index 79c9147..cfa998d 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ReplicaInfoManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ReplicaInfoManager.java
@@ -15,18 +15,16 @@
*/
package org.onosproject.store.flow.impl;
-import com.google.common.collect.ImmutableList;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
-import org.onosproject.cluster.NodeId;
-import org.onosproject.cluster.RoleInfo;
import org.onosproject.event.EventDeliveryService;
import org.onosproject.event.ListenerRegistry;
import org.onosproject.mastership.MastershipEvent;
+import org.onosproject.mastership.MastershipInfo;
import org.onosproject.mastership.MastershipListener;
import org.onosproject.mastership.MastershipService;
import org.onosproject.net.DeviceId;
@@ -36,8 +34,6 @@
import org.onosproject.store.flow.ReplicaInfoService;
import org.slf4j.Logger;
-import java.util.Collections;
-import java.util.List;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.onosproject.store.flow.ReplicaInfoEvent.Type.BACKUPS_CHANGED;
import static org.onosproject.store.flow.ReplicaInfoEvent.Type.MASTER_CHANGED;
@@ -79,7 +75,7 @@
@Override
public ReplicaInfo getReplicaInfoFor(DeviceId deviceId) {
- return buildFromRoleInfo(mastershipService.getNodesFor(deviceId));
+ // One getMastershipFor() result supplies the term, master and backups for the ReplicaInfo.
+ return buildFromRoleInfo(mastershipService.getMastershipFor(deviceId));
}
@Override
@@ -92,17 +88,15 @@
listenerRegistry.removeListener(checkNotNull(listener));
}
- private static ReplicaInfo buildFromRoleInfo(RoleInfo roles) {
- List<NodeId> backups = roles.backups() == null ?
- Collections.emptyList() : ImmutableList.copyOf(roles.backups());
- return new ReplicaInfo(roles.master(), backups);
+ // Translates a MastershipInfo into a ReplicaInfo carrying the same term, master and backups.
+ // NOTE: the method name still refers to RoleInfo although the input is now a MastershipInfo.
+ // Unlike the removed code, backups() is passed through without a null check or copy;
+ // the ReplicaInfo constructor rejects a null list via checkNotNull.
+ private static ReplicaInfo buildFromRoleInfo(MastershipInfo mastership) {
+ return new ReplicaInfo(mastership.term(), mastership.master().orElse(null), mastership.backups());
}
final class InternalMastershipListener implements MastershipListener {
@Override
public void event(MastershipEvent event) {
- final ReplicaInfo replicaInfo = buildFromRoleInfo(event.roleInfo());
+ final ReplicaInfo replicaInfo = buildFromRoleInfo(event.mastershipInfo());
switch (event.type()) {
case MASTER_CHANGED:
eventDispatcher.post(new ReplicaInfoEvent(MASTER_CHANGED,
diff --git a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
index 1b406e7..826665d 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
@@ -23,7 +23,8 @@
import static com.google.common.base.Preconditions.checkArgument;
import java.util.List;
-import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
@@ -34,6 +35,7 @@
import java.util.regex.Pattern;
import java.util.stream.Collectors;
+import com.google.common.collect.ImmutableMap;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -50,6 +52,7 @@
import org.onosproject.cluster.NodeId;
import org.onosproject.cluster.RoleInfo;
import org.onosproject.mastership.MastershipEvent;
+import org.onosproject.mastership.MastershipInfo;
import org.onosproject.mastership.MastershipStore;
import org.onosproject.mastership.MastershipStoreDelegate;
import org.onosproject.mastership.MastershipTerm;
@@ -62,10 +65,7 @@
import org.onosproject.store.service.Serializer;
import org.slf4j.Logger;
-import com.google.common.base.Objects;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
/**
* Implementation of the MastershipStore on top of Leadership Service.
@@ -158,7 +158,7 @@
NodeId leader = leadership == null ? null : leadership.leaderNodeId();
List<NodeId> candidates = leadership == null ?
ImmutableList.of() : ImmutableList.copyOf(leadership.candidates());
- MastershipRole role = Objects.equal(localNodeId, leader) ?
+ MastershipRole role = Objects.equals(localNodeId, leader) ?
MastershipRole.MASTER : candidates.contains(localNodeId) ? MastershipRole.STANDBY : MastershipRole.NONE;
return CompletableFuture.completedFuture(role);
}
@@ -173,7 +173,7 @@
NodeId leader = leadership == null ? null : leadership.leaderNodeId();
List<NodeId> candidates = leadership == null ?
ImmutableList.of() : ImmutableList.copyOf(leadership.candidates());
- return Objects.equal(nodeId, leader) ?
+ return Objects.equals(nodeId, leader) ?
MastershipRole.MASTER : candidates.contains(nodeId) ? MastershipRole.STANDBY : MastershipRole.NONE;
}
@@ -187,27 +187,15 @@
@Override
public RoleInfo getNodes(DeviceId deviceId) {
checkArgument(deviceId != null, DEVICE_ID_NULL);
+ // Derive the legacy RoleInfo from the MastershipInfo rather than from the raw
+ // Leadership: leadership.candidates() also contains the leader, so passing it
+ // straight through would list the master among its own backups, which the
+ // removed implementation never did.
+ // NOTE(review): relies on MastershipInfo.backups() listing STANDBY nodes only - confirm.
+ MastershipInfo mastership = getMastership(deviceId);
+ return new RoleInfo(mastership.master().orElse(null), mastership.backups());
+ }
- Map<NodeId, MastershipRole> roles = Maps.newHashMap();
- clusterService.getNodes()
- .forEach((node) -> roles.put(node.id(), getRole(node.id(), deviceId)));
-
- NodeId master = null;
- final List<NodeId> standbys = Lists.newLinkedList();
-
- List<NodeId> candidates = leadershipService.getCandidates(createDeviceMastershipTopic(deviceId));
-
- for (Map.Entry<NodeId, MastershipRole> entry : roles.entrySet()) {
- if (entry.getValue() == MastershipRole.MASTER) {
- master = entry.getKey();
- } else if (entry.getValue() == MastershipRole.STANDBY) {
- standbys.add(entry.getKey());
- }
- }
-
- List<NodeId> sortedStandbyList = candidates.stream().filter(standbys::contains).collect(Collectors.toList());
-
- return new RoleInfo(master, sortedStandbyList);
+ @Override
+ public MastershipInfo getMastership(DeviceId deviceId) {
+ checkArgument(deviceId != null, DEVICE_ID_NULL);
+ Leadership leadership = leadershipService.getLeadership(createDeviceMastershipTopic(deviceId));
+ // getLeadership() can return null (the role lookups in this class guard against it);
+ // report empty mastership instead of failing with a NullPointerException.
+ return leadership == null ? new MastershipInfo() : buildMastershipFromLeadership(leadership);
}
@Override
@@ -263,7 +251,7 @@
List<NodeId> candidates = leadershipService.getCandidates(leadershipTopic);
NodeId newMaster = candidates.stream()
- .filter(candidate -> !Objects.equal(nodeId, candidate))
+ .filter(candidate -> !Objects.equals(nodeId, candidate))
.findFirst()
.orElse(null);
log.info("Transitioning to role {} for {}. Next master: {}",
@@ -304,7 +292,7 @@
MastershipEvent.Type eventType = localNodeId.equals(leadershipService.getLeader(leadershipTopic)) ?
MastershipEvent.Type.MASTER_CHANGED : MastershipEvent.Type.BACKUPS_CHANGED;
leadershipService.withdraw(leadershipTopic);
- return CompletableFuture.completedFuture(new MastershipEvent(eventType, deviceId, getNodes(deviceId)));
+ return CompletableFuture.completedFuture(new MastershipEvent(eventType, deviceId, getMastership(deviceId)));
}
@Override
@@ -312,6 +300,27 @@
// Noop. LeadershipService already takes care of detecting and purging stale locks.
}
+ /**
+ * Builds a MastershipInfo from the given leadership.
+ * Roles are inserted in this order: the leader as MASTER (if any), the remaining
+ * candidates as STANDBY in candidate order, then every other cluster node as NONE.
+ *
+ * @param leadership leadership to convert; must not be null
+ * @return mastership info with the leader's term (0 when there is no leader),
+ * the optional master and the role of each known node
+ */
+ private MastershipInfo buildMastershipFromLeadership(Leadership leadership) {
+ NodeId leaderId = leadership.leaderNodeId();
+ List<NodeId> candidates = leadership.candidates();
+
+ ImmutableMap.Builder<NodeId, MastershipRole> roles = ImmutableMap.builder();
+ if (leaderId != null) {
+ roles.put(leaderId, MastershipRole.MASTER);
+ }
+ for (NodeId candidate : candidates) {
+ if (!Objects.equals(leaderId, candidate)) {
+ roles.put(candidate, MastershipRole.STANDBY);
+ }
+ }
+ // Cluster nodes that are neither the leader nor a candidate hold no role.
+ clusterService.getNodes().stream()
+ .map(node -> node.id())
+ .filter(id -> !Objects.equals(leaderId, id) && !candidates.contains(id))
+ .forEach(id -> roles.put(id, MastershipRole.NONE));
+
+ boolean hasLeader = leadership.leader() != null;
+ long term = hasLeader ? leadership.leader().term() : 0;
+ Optional<NodeId> master = hasLeader
+ ? Optional.of(leadership.leader().nodeId())
+ : Optional.empty();
+ return new MastershipInfo(term, master, roles.build());
+ }
+
private class InternalDeviceMastershipEventListener implements LeadershipEventListener {
@Override
@@ -328,27 +337,23 @@
private void handleEvent(LeadershipEvent event) {
Leadership leadership = event.subject();
DeviceId deviceId = extractDeviceIdFromTopic(leadership.topic());
- NodeId master = event.subject().leaderNodeId();
- List<NodeId> backups = event.subject().candidates()
- .stream()
- .filter(n -> !n.equals(master))
- .collect(Collectors.toList());
- RoleInfo roleInfo = event.type() != LeadershipEvent.Type.SERVICE_DISRUPTED
- ? new RoleInfo(master, backups)
- : new RoleInfo();
+ MastershipInfo mastershipInfo = event.type() != LeadershipEvent.Type.SERVICE_DISRUPTED
+ ? buildMastershipFromLeadership(event.subject())
+ : new MastershipInfo();
+
switch (event.type()) {
case LEADER_AND_CANDIDATES_CHANGED:
- notifyDelegate(new MastershipEvent(BACKUPS_CHANGED, deviceId, roleInfo));
- notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, roleInfo));
+ notifyDelegate(new MastershipEvent(BACKUPS_CHANGED, deviceId, mastershipInfo));
+ notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, mastershipInfo));
break;
case LEADER_CHANGED:
- notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, roleInfo));
+ notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, mastershipInfo));
break;
case CANDIDATES_CHANGED:
- notifyDelegate(new MastershipEvent(BACKUPS_CHANGED, deviceId, roleInfo));
+ notifyDelegate(new MastershipEvent(BACKUPS_CHANGED, deviceId, mastershipInfo));
break;
case SERVICE_DISRUPTED:
- notifyDelegate(new MastershipEvent(SUSPENDED, deviceId, roleInfo));
+ notifyDelegate(new MastershipEvent(SUSPENDED, deviceId, mastershipInfo));
break;
case SERVICE_RESTORED:
// Do nothing, wait for updates from peers
diff --git a/core/store/dist/src/test/java/org/onosproject/store/flow/impl/ReplicaInfoManagerTest.java b/core/store/dist/src/test/java/org/onosproject/store/flow/impl/ReplicaInfoManagerTest.java
index 9709643..992b77b 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/flow/impl/ReplicaInfoManagerTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/flow/impl/ReplicaInfoManagerTest.java
@@ -15,32 +15,33 @@
*/
package org.onosproject.store.flow.impl;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.onosproject.cluster.NodeId;
-import org.onosproject.cluster.RoleInfo;
import org.onosproject.common.event.impl.TestEventDispatcher;
import org.onosproject.event.ListenerRegistry;
import org.onosproject.mastership.MastershipEvent;
import org.onosproject.mastership.MastershipEvent.Type;
+import org.onosproject.mastership.MastershipInfo;
import org.onosproject.mastership.MastershipListener;
import org.onosproject.mastership.MastershipService;
import org.onosproject.mastership.MastershipServiceAdapter;
import org.onosproject.net.DeviceId;
+import org.onosproject.net.MastershipRole;
import org.onosproject.store.flow.ReplicaInfo;
import org.onosproject.store.flow.ReplicaInfoEvent;
import org.onosproject.store.flow.ReplicaInfoEventListener;
import org.onosproject.store.flow.ReplicaInfoService;
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -101,7 +102,7 @@
// fake MastershipEvent
eventDispatcher.post(new MastershipEvent(Type.MASTER_CHANGED, DID1,
- new RoleInfo(NID1, new LinkedList<>())));
+ new MastershipInfo(1, Optional.of(NID1), ImmutableMap.of(NID1, MastershipRole.MASTER))));
assertTrue(latch.await(1, TimeUnit.SECONDS));
}
@@ -149,8 +150,11 @@
}
@Override
- public RoleInfo getNodesFor(DeviceId deviceId) {
- return new RoleInfo(masters.get(deviceId), Collections.emptyList());
+ public MastershipInfo getMastershipFor(DeviceId deviceId) {
+ // Keep the role map consistent with the configured master: a device without a
+ // master gets no roles, and the MASTER entry is always the node from the masters map.
+ NodeId master = masters.get(deviceId);
+ return new MastershipInfo(
+ 1,
+ Optional.ofNullable(master),
+ master == null
+ ? ImmutableMap.<NodeId, MastershipRole>of()
+ : ImmutableMap.of(master, MastershipRole.MASTER));
}
@Override