Add atomic mastership/term/backups method to MastershipService
Change-Id: I18c3aeaa5101c9ce08ff38fffd70eaec903a0f3e
diff --git a/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/ConsistentVirtualDeviceMastershipStore.java b/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/ConsistentVirtualDeviceMastershipStore.java
index 3d1c376..4176130 100644
--- a/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/ConsistentVirtualDeviceMastershipStore.java
+++ b/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/ConsistentVirtualDeviceMastershipStore.java
@@ -16,6 +16,7 @@
package org.onosproject.incubator.store.virtual.impl;
+import com.google.common.collect.ImmutableMap;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -34,6 +35,7 @@
import org.onosproject.incubator.net.virtual.NetworkId;
import org.onosproject.incubator.net.virtual.VirtualNetworkMastershipStore;
import org.onosproject.mastership.MastershipEvent;
+import org.onosproject.mastership.MastershipInfo;
import org.onosproject.mastership.MastershipStoreDelegate;
import org.onosproject.mastership.MastershipTerm;
import org.onosproject.net.DeviceId;
@@ -45,7 +47,7 @@
import org.slf4j.Logger;
import java.util.List;
-import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
@@ -64,8 +66,6 @@
import com.google.common.base.Objects;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
import static com.google.common.base.Preconditions.checkArgument;
@Component(immediate = true, enabled = false)
@@ -188,30 +188,33 @@
public RoleInfo getNodes(NetworkId networkId, DeviceId deviceId) {
checkArgument(networkId != null, NETWORK_ID_NULL);
checkArgument(deviceId != null, DEVICE_ID_NULL);
+        // NOTE(review): getLeadership() may return null when the topic has no election yet
+        // (confirm against LeadershipService); guard so such a device yields an empty RoleInfo, not an NPE.
+        Leadership leadership = leadershipService.getLeadership(createDeviceMastershipTopic(networkId, deviceId));
+        if (leadership == null) {
+            return new RoleInfo();
+        }
+        return new RoleInfo(leadership.leaderNodeId(), leadership.candidates());
+    }
- Map<NodeId, MastershipRole> roles = Maps.newHashMap();
- clusterService.getNodes()
- .forEach((node) -> roles.put(node.id(),
- getRole(networkId, node.id(), deviceId)));
-
- NodeId master = null;
- final List<NodeId> standbys = Lists.newLinkedList();
-
- List<NodeId> candidates = leadershipService
- .getCandidates(createDeviceMastershipTopic(networkId, deviceId));
-
- for (Map.Entry<NodeId, MastershipRole> entry : roles.entrySet()) {
- if (entry.getValue() == MastershipRole.MASTER) {
- master = entry.getKey();
- } else if (entry.getValue() == MastershipRole.STANDBY) {
- standbys.add(entry.getKey());
- }
- }
-
- List<NodeId> sortedStandbyList = candidates.stream()
- .filter(standbys::contains).collect(Collectors.toList());
-
- return new RoleInfo(master, sortedStandbyList);
+    /**
+     * Returns the mastership snapshot built from the device's current leadership.
+     *
+     * @param networkId virtual network identifier, must not be null
+     * @param deviceId device identifier, must not be null
+     * @return mastership info; an empty {@code MastershipInfo} when no leadership is known
+     * @throws IllegalArgumentException if either identifier is null
+     */
+    @Override
+    public MastershipInfo getMastership(NetworkId networkId, DeviceId deviceId) {
+        checkArgument(networkId != null, NETWORK_ID_NULL);
+        checkArgument(deviceId != null, DEVICE_ID_NULL);
+        Leadership leadership = leadershipService.getLeadership(createDeviceMastershipTopic(networkId, deviceId));
+        // NOTE(review): presumed null for a topic with no election yet - confirm; avoids an NPE in the builder.
+        if (leadership == null) {
+            return new MastershipInfo();
+        }
+        return buildMastershipFromLeadership(leadership);
}
@Override
@@ -322,9 +308,8 @@
MastershipEvent.Type eventType = localNodeId.equals(leadershipService.getLeader(leadershipTopic)) ?
MastershipEvent.Type.MASTER_CHANGED : MastershipEvent.Type.BACKUPS_CHANGED;
leadershipService.withdraw(leadershipTopic);
- return CompletableFuture.completedFuture(new MastershipEvent(eventType,
- deviceId,
- getNodes(networkId, deviceId)));
+ return CompletableFuture.completedFuture(
+ new MastershipEvent(eventType, deviceId, getMastership(networkId, deviceId)));
}
private CompletableFuture<MastershipEvent>
@@ -338,6 +323,24 @@
// Noop. LeadershipService already takes care of detecting and purging stale locks.
}
+    // Builds a MastershipInfo (term, master, per-node roles) from a single Leadership snapshot.
+    private MastershipInfo buildMastershipFromLeadership(Leadership leadership) {
+        NodeId master = leadership.leaderNodeId();
+        // ImmutableMap.Builder rejects duplicate keys at build(), so every node gets exactly one role.
+        ImmutableMap.Builder<NodeId, MastershipRole> builder = ImmutableMap.builder();
+        if (master != null) {
+            builder.put(master, MastershipRole.MASTER);
+        }
+        leadership.candidates().stream().filter(nodeId -> !nodeId.equals(master))
+                .forEach(nodeId -> builder.put(nodeId, MastershipRole.STANDBY));
+        clusterService.getNodes().stream()
+                .filter(node -> !node.id().equals(master) && !leadership.candidates().contains(node.id()))
+                .forEach(node -> builder.put(node.id(), MastershipRole.NONE));
+        return new MastershipInfo(leadership.leader() != null ? leadership.leader().term() : 0,
+                leadership.leader() != null ? Optional.of(leadership.leader().nodeId()) : Optional.empty(),
+                builder.build());
+    }
+
private class InternalDeviceMastershipEventListener
implements LeadershipEventListener {
@@ -357,28 +360,23 @@
NetworkId networkId = extractNetworkIdFromTopic(leadership.topic());
DeviceId deviceId = extractDeviceIdFromTopic(leadership.topic());
-
- RoleInfo roleInfo = event.type() != LeadershipEvent.Type.SERVICE_DISRUPTED ?
- getNodes(networkId, deviceId) : new RoleInfo();
+ MastershipInfo mastershipInfo = event.type() != LeadershipEvent.Type.SERVICE_DISRUPTED
+ ? buildMastershipFromLeadership(event.subject())
+ : new MastershipInfo();
switch (event.type()) {
case LEADER_AND_CANDIDATES_CHANGED:
- notifyDelegate(networkId, new MastershipEvent(BACKUPS_CHANGED,
- deviceId, roleInfo));
- notifyDelegate(networkId, new MastershipEvent(MASTER_CHANGED,
- deviceId, roleInfo));
+ notifyDelegate(networkId, new MastershipEvent(BACKUPS_CHANGED, deviceId, mastershipInfo));
+ notifyDelegate(networkId, new MastershipEvent(MASTER_CHANGED, deviceId, mastershipInfo));
break;
case LEADER_CHANGED:
- notifyDelegate(networkId, new MastershipEvent(MASTER_CHANGED,
- deviceId, roleInfo));
+ notifyDelegate(networkId, new MastershipEvent(MASTER_CHANGED, deviceId, mastershipInfo));
break;
case CANDIDATES_CHANGED:
- notifyDelegate(networkId, new MastershipEvent(BACKUPS_CHANGED,
- deviceId, roleInfo));
+ notifyDelegate(networkId, new MastershipEvent(BACKUPS_CHANGED, deviceId, mastershipInfo));
break;
case SERVICE_DISRUPTED:
- notifyDelegate(networkId, new MastershipEvent(SUSPENDED,
- deviceId, roleInfo));
+ notifyDelegate(networkId, new MastershipEvent(SUSPENDED, deviceId, mastershipInfo));
break;
case SERVICE_RESTORED:
// Do nothing, wait for updates from peers