Add atomic mastership/term/backups method to MastershipService
Change-Id: I18c3aeaa5101c9ce08ff38fffd70eaec903a0f3e
diff --git a/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/ConsistentVirtualDeviceMastershipStore.java b/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/ConsistentVirtualDeviceMastershipStore.java
index 3d1c376..4176130 100644
--- a/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/ConsistentVirtualDeviceMastershipStore.java
+++ b/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/ConsistentVirtualDeviceMastershipStore.java
@@ -16,6 +16,7 @@
package org.onosproject.incubator.store.virtual.impl;
+import com.google.common.collect.ImmutableMap;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -34,6 +35,7 @@
import org.onosproject.incubator.net.virtual.NetworkId;
import org.onosproject.incubator.net.virtual.VirtualNetworkMastershipStore;
import org.onosproject.mastership.MastershipEvent;
+import org.onosproject.mastership.MastershipInfo;
import org.onosproject.mastership.MastershipStoreDelegate;
import org.onosproject.mastership.MastershipTerm;
import org.onosproject.net.DeviceId;
@@ -45,7 +47,7 @@
import org.slf4j.Logger;
import java.util.List;
-import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
@@ -64,8 +66,6 @@
import com.google.common.base.Objects;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
import static com.google.common.base.Preconditions.checkArgument;
@Component(immediate = true, enabled = false)
@@ -188,30 +188,16 @@
public RoleInfo getNodes(NetworkId networkId, DeviceId deviceId) {
checkArgument(networkId != null, NETWORK_ID_NULL);
checkArgument(deviceId != null, DEVICE_ID_NULL);
+ // Resolve the leadership record for this (network, device) mastership topic.
+ Leadership leadership = leadershipService.getLeadership(createDeviceMastershipTopic(networkId, deviceId));
+ NodeId master = leadership.leaderNodeId();
+ // The candidate list keeps its preference order but must not repeat the master:
+ // the previous implementation reported only STANDBY nodes as backups, so the
+ // leader is filtered out here to preserve that contract.
+ List<NodeId> backups = leadership.candidates().stream()
+         .filter(nodeId -> !nodeId.equals(master))
+         .collect(Collectors.toList());
+ return new RoleInfo(master, backups);
+ }
- Map<NodeId, MastershipRole> roles = Maps.newHashMap();
- clusterService.getNodes()
- .forEach((node) -> roles.put(node.id(),
- getRole(networkId, node.id(), deviceId)));
-
- NodeId master = null;
- final List<NodeId> standbys = Lists.newLinkedList();
-
- List<NodeId> candidates = leadershipService
- .getCandidates(createDeviceMastershipTopic(networkId, deviceId));
-
- for (Map.Entry<NodeId, MastershipRole> entry : roles.entrySet()) {
- if (entry.getValue() == MastershipRole.MASTER) {
- master = entry.getKey();
- } else if (entry.getValue() == MastershipRole.STANDBY) {
- standbys.add(entry.getKey());
- }
- }
-
- List<NodeId> sortedStandbyList = candidates.stream()
- .filter(standbys::contains).collect(Collectors.toList());
-
- return new RoleInfo(master, sortedStandbyList);
+ /**
+  * Returns the mastership information for a device of a virtual network.
+  *
+  * Validates both identifiers, fetches the leadership for the device's
+  * mastership topic and converts it with buildMastershipFromLeadership.
+  *
+  * @param networkId virtual network identifier; must not be null
+  * @param deviceId device identifier; must not be null
+  * @return mastership info built from the current leadership of the topic
+  */
+ @Override
+ public MastershipInfo getMastership(NetworkId networkId, DeviceId deviceId) {
+ // Reject null identifiers before touching the leadership service.
+ checkArgument(networkId != null, NETWORK_ID_NULL);
+ checkArgument(deviceId != null, DEVICE_ID_NULL);
+ // The topic is derived from both the network and the device identifier.
+ Leadership leadership = leadershipService.getLeadership(createDeviceMastershipTopic(networkId, deviceId));
+ return buildMastershipFromLeadership(leadership);
}
@Override
@@ -322,9 +308,8 @@
MastershipEvent.Type eventType = localNodeId.equals(leadershipService.getLeader(leadershipTopic)) ?
MastershipEvent.Type.MASTER_CHANGED : MastershipEvent.Type.BACKUPS_CHANGED;
leadershipService.withdraw(leadershipTopic);
- return CompletableFuture.completedFuture(new MastershipEvent(eventType,
- deviceId,
- getNodes(networkId, deviceId)));
+ return CompletableFuture.completedFuture(
+ new MastershipEvent(eventType, deviceId, getMastership(networkId, deviceId)));
}
private CompletableFuture<MastershipEvent>
@@ -338,6 +323,24 @@
// Noop. LeadershipService already takes care of detecting and purging stale locks.
}
+ /**
+  * Converts a leadership record into a mastership info snapshot.
+  *
+  * The leader (if any) is reported as MASTER, every other candidate as
+  * STANDBY, and every remaining cluster node as NONE. Each node appears
+  * exactly once: ImmutableMap.Builder rejects duplicate keys at build time,
+  * so the leader is explicitly excluded from the STANDBY and NONE passes.
+  *
+  * @param leadership leadership of a device mastership topic
+  * @return mastership info carrying the term, the optional master and the role map
+  */
+ private MastershipInfo buildMastershipFromLeadership(Leadership leadership) {
+     NodeId master = leadership.leaderNodeId();
+     ImmutableMap.Builder<NodeId, MastershipRole> builder = ImmutableMap.builder();
+     if (master != null) {
+         builder.put(master, MastershipRole.MASTER);
+     }
+     // Candidates other than the leader are standbys; skipping the leader avoids
+     // registering the same node as both MASTER and STANDBY.
+     leadership.candidates().stream()
+             .filter(nodeId -> !nodeId.equals(master))
+             .forEach(nodeId -> builder.put(nodeId, MastershipRole.STANDBY));
+     // Everything else in the cluster has no role for this device.
+     clusterService.getNodes().stream()
+             .filter(node -> !node.id().equals(master))
+             .filter(node -> !leadership.candidates().contains(node.id()))
+             .forEach(node -> builder.put(node.id(), MastershipRole.NONE));
+
+     return new MastershipInfo(
+             // Term is 0 when there is no leader.
+             leadership.leader() != null ? leadership.leader().term() : 0,
+             leadership.leader() != null
+                     ? Optional.of(leadership.leader().nodeId())
+                     : Optional.empty(),
+             builder.build());
+ }
+
private class InternalDeviceMastershipEventListener
implements LeadershipEventListener {
@@ -357,28 +360,23 @@
NetworkId networkId = extractNetworkIdFromTopic(leadership.topic());
DeviceId deviceId = extractDeviceIdFromTopic(leadership.topic());
-
- RoleInfo roleInfo = event.type() != LeadershipEvent.Type.SERVICE_DISRUPTED ?
- getNodes(networkId, deviceId) : new RoleInfo();
+ MastershipInfo mastershipInfo = event.type() != LeadershipEvent.Type.SERVICE_DISRUPTED
+ ? buildMastershipFromLeadership(event.subject())
+ : new MastershipInfo();
switch (event.type()) {
case LEADER_AND_CANDIDATES_CHANGED:
- notifyDelegate(networkId, new MastershipEvent(BACKUPS_CHANGED,
- deviceId, roleInfo));
- notifyDelegate(networkId, new MastershipEvent(MASTER_CHANGED,
- deviceId, roleInfo));
+ notifyDelegate(networkId, new MastershipEvent(BACKUPS_CHANGED, deviceId, mastershipInfo));
+ notifyDelegate(networkId, new MastershipEvent(MASTER_CHANGED, deviceId, mastershipInfo));
break;
case LEADER_CHANGED:
- notifyDelegate(networkId, new MastershipEvent(MASTER_CHANGED,
- deviceId, roleInfo));
+ notifyDelegate(networkId, new MastershipEvent(MASTER_CHANGED, deviceId, mastershipInfo));
break;
case CANDIDATES_CHANGED:
- notifyDelegate(networkId, new MastershipEvent(BACKUPS_CHANGED,
- deviceId, roleInfo));
+ notifyDelegate(networkId, new MastershipEvent(BACKUPS_CHANGED, deviceId, mastershipInfo));
break;
case SERVICE_DISRUPTED:
- notifyDelegate(networkId, new MastershipEvent(SUSPENDED,
- deviceId, roleInfo));
+ notifyDelegate(networkId, new MastershipEvent(SUSPENDED, deviceId, mastershipInfo));
break;
case SERVICE_RESTORED:
// Do nothing, wait for updates from peers
diff --git a/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/SimpleVirtualMastershipStore.java b/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/SimpleVirtualMastershipStore.java
index 730ebee..346999b 100644
--- a/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/SimpleVirtualMastershipStore.java
+++ b/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/SimpleVirtualMastershipStore.java
@@ -17,6 +17,7 @@
package org.onosproject.incubator.store.virtual.impl;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
@@ -36,6 +37,7 @@
import org.onosproject.incubator.net.virtual.NetworkId;
import org.onosproject.incubator.net.virtual.VirtualNetworkMastershipStore;
import org.onosproject.mastership.MastershipEvent;
+import org.onosproject.mastership.MastershipInfo;
import org.onosproject.mastership.MastershipStoreDelegate;
import org.onosproject.mastership.MastershipTerm;
import org.onosproject.net.DeviceId;
@@ -50,6 +52,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
@@ -122,7 +125,7 @@
// remove from backup list
removeFromBackups(networkId, deviceId, node);
notifyDelegate(networkId, new MastershipEvent(MASTER_CHANGED, deviceId,
- getNodes(networkId, deviceId)));
+ getMastership(networkId, deviceId)));
return CompletableFuture.completedFuture(MastershipRole.MASTER);
}
return CompletableFuture.completedFuture(MastershipRole.STANDBY);
@@ -132,13 +135,13 @@
masterMap.put(deviceId, node);
incrementTerm(networkId, deviceId);
notifyDelegate(networkId, new MastershipEvent(MASTER_CHANGED, deviceId,
- getNodes(networkId, deviceId)));
+ getMastership(networkId, deviceId)));
return CompletableFuture.completedFuture(MastershipRole.MASTER);
}
// add to backup list
if (addToBackup(networkId, deviceId, node)) {
notifyDelegate(networkId, new MastershipEvent(BACKUPS_CHANGED, deviceId,
- getNodes(networkId, deviceId)));
+ getMastership(networkId, deviceId)));
}
return CompletableFuture.completedFuture(MastershipRole.STANDBY);
default:
@@ -184,6 +187,27 @@
}
@Override
+ public MastershipInfo getMastership(NetworkId networkId, DeviceId deviceId) {
+     // Builds a snapshot of term, master and per-node roles for one device of a
+     // virtual network: the master is MASTER, the device's backups are STANDBY,
+     // and all other cluster nodes are NONE.
+     Map<DeviceId, NodeId> masterMap = getMasterMap(networkId);
+     Map<DeviceId, AtomicInteger> termMap = getTermMap(networkId);
+     Map<DeviceId, List<NodeId>> backups = getBackups(networkId);
+     NodeId master = masterMap.get(deviceId);
+     // The backup map is keyed by device, so it must be looked up with the
+     // device identifier (not with the master node identifier).
+     List<NodeId> standbys = backups.getOrDefault(deviceId, Collections.emptyList());
+
+     ImmutableMap.Builder<NodeId, MastershipRole> roleBuilder = ImmutableMap.builder();
+     if (master != null) {
+         roleBuilder.put(master, MastershipRole.MASTER);
+     }
+     // ImmutableMap.Builder rejects duplicate keys, so each node must be added
+     // exactly once: skip the master here and skip master/standbys below.
+     standbys.stream()
+             .filter(nodeId -> !nodeId.equals(master))
+             .forEach(nodeId -> roleBuilder.put(nodeId, MastershipRole.STANDBY));
+     clusterService.getNodes().stream()
+             .filter(node -> !node.id().equals(master))
+             .filter(node -> !standbys.contains(node.id()))
+             .forEach(node -> roleBuilder.put(node.id(), MastershipRole.NONE));
+     return new MastershipInfo(
+             // Falls back to NOTHING when no term has been recorded for the device.
+             termMap.getOrDefault(deviceId, new AtomicInteger(NOTHING)).get(),
+             Optional.ofNullable(master),
+             roleBuilder.build());
+ }
+
+ @Override
public Set<DeviceId> getDevices(NetworkId networkId, NodeId nodeId) {
Map<DeviceId, NodeId> masterMap = getMasterMap(networkId);
@@ -219,7 +243,7 @@
}
return CompletableFuture.completedFuture(
- new MastershipEvent(MASTER_CHANGED, deviceId, getNodes(networkId, deviceId)));
+ new MastershipEvent(MASTER_CHANGED, deviceId, getMastership(networkId, deviceId)));
}
@Override
@@ -249,14 +273,14 @@
// TODO: Should there be new event type for no MASTER?
return CompletableFuture.completedFuture(
new MastershipEvent(MASTER_CHANGED, deviceId,
- getNodes(networkId, deviceId)));
+ getMastership(networkId, deviceId)));
} else {
NodeId prevMaster = masterMap.put(deviceId, backup);
incrementTerm(networkId, deviceId);
addToBackup(networkId, deviceId, prevMaster);
return CompletableFuture.completedFuture(
new MastershipEvent(MASTER_CHANGED, deviceId,
- getNodes(networkId, deviceId)));
+ getMastership(networkId, deviceId)));
}
case STANDBY:
@@ -265,7 +289,7 @@
if (modified) {
return CompletableFuture.completedFuture(
new MastershipEvent(BACKUPS_CHANGED, deviceId,
- getNodes(networkId, deviceId)));
+ getMastership(networkId, deviceId)));
}
break;
@@ -314,13 +338,13 @@
incrementTerm(networkId, deviceId);
return CompletableFuture.completedFuture(
new MastershipEvent(MASTER_CHANGED, deviceId,
- getNodes(networkId, deviceId)));
+ getMastership(networkId, deviceId)));
case STANDBY:
if (removeFromBackups(networkId, deviceId, nodeId)) {
return CompletableFuture.completedFuture(
new MastershipEvent(BACKUPS_CHANGED, deviceId,
- getNodes(networkId, deviceId)));
+ getMastership(networkId, deviceId)));
}
break;