ONOS-1883: Fix for lost flow rules on CLI directed mastership changes.
- Made all mastership role change operations asynchronous, which they are.
- In flowrule store we now check to see if any new backups need to be made when a device backup location (standby) changes
- In device mastership store we now wait briefly before we step down from mastership after promoting a new candidate as highest standy
Change-Id: Icb76cf4d0d23403053a3fd5a458a940b847da49f
diff --git a/core/api/src/main/java/org/onosproject/mastership/MastershipStore.java b/core/api/src/main/java/org/onosproject/mastership/MastershipStore.java
index 35d7c4e..6b64705 100644
--- a/core/api/src/main/java/org/onosproject/mastership/MastershipStore.java
+++ b/core/api/src/main/java/org/onosproject/mastership/MastershipStore.java
@@ -16,6 +16,7 @@
package org.onosproject.mastership;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import org.onosproject.cluster.NodeId;
import org.onosproject.cluster.RoleInfo;
@@ -80,7 +81,7 @@
* @param deviceId device identifier
* @return a mastership event
*/
- MastershipEvent setMaster(NodeId nodeId, DeviceId deviceId);
+ CompletableFuture<MastershipEvent> setMaster(NodeId nodeId, DeviceId deviceId);
/**
* Returns the current master and number of past mastership hand-offs
@@ -100,7 +101,7 @@
* @param deviceId device to revoke mastership role for
* @return a mastership event
*/
- MastershipEvent setStandby(NodeId nodeId, DeviceId deviceId);
+ CompletableFuture<MastershipEvent> setStandby(NodeId nodeId, DeviceId deviceId);
/**
* Allows a controller instance to give up its current role for a device.
@@ -111,7 +112,7 @@
* @param deviceId device to revoke mastership role for
* @return a mastership event
*/
- MastershipEvent relinquishRole(NodeId nodeId, DeviceId deviceId);
+ CompletableFuture<MastershipEvent> relinquishRole(NodeId nodeId, DeviceId deviceId);
/**
* Removes all the roles for the specified controller instance.
diff --git a/core/net/src/main/java/org/onosproject/cluster/impl/MastershipManager.java b/core/net/src/main/java/org/onosproject/cluster/impl/MastershipManager.java
index e34790d..cb2fd18 100644
--- a/core/net/src/main/java/org/onosproject/cluster/impl/MastershipManager.java
+++ b/core/net/src/main/java/org/onosproject/cluster/impl/MastershipManager.java
@@ -17,6 +17,7 @@
import com.codahale.metrics.Timer;
import com.codahale.metrics.Timer.Context;
+
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -50,6 +51,8 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.collect.Lists.newArrayList;
import static org.onlab.metrics.MetricsUtil.startTimer;
@@ -111,26 +114,28 @@
checkNotNull(deviceId, DEVICE_ID_NULL);
checkNotNull(role, ROLE_NULL);
- MastershipEvent event = null;
+ CompletableFuture<MastershipEvent> eventFuture = null;
switch (role) {
case MASTER:
- event = store.setMaster(nodeId, deviceId);
+ eventFuture = store.setMaster(nodeId, deviceId);
break;
case STANDBY:
- event = store.setStandby(nodeId, deviceId);
+ eventFuture = store.setStandby(nodeId, deviceId);
break;
case NONE:
- event = store.relinquishRole(nodeId, deviceId);
+ eventFuture = store.relinquishRole(nodeId, deviceId);
break;
default:
log.info("Unknown role; ignoring");
return;
}
- if (event != null) {
- post(event);
- }
+ eventFuture.whenComplete((event, error) -> {
+ if (event != null) {
+ post(event);
+ }
+ });
}
@Override
@@ -141,12 +146,12 @@
@Override
public void relinquishMastership(DeviceId deviceId) {
- MastershipEvent event = null;
- event = store.relinquishRole(
- clusterService.getLocalNode().id(), deviceId);
- if (event != null) {
- post(event);
- }
+ store.relinquishRole(clusterService.getLocalNode().id(), deviceId)
+ .whenComplete((event, error) -> {
+ if (event != null) {
+ post(event);
+ }
+ });
}
@Override
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/NewDistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/NewDistributedFlowRuleStore.java
index e042982..8bfab64 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/NewDistributedFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/NewDistributedFlowRuleStore.java
@@ -62,6 +62,8 @@
import org.onosproject.store.cluster.messaging.ClusterMessage;
import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.flow.ReplicaInfo;
+import org.onosproject.store.flow.ReplicaInfoEvent;
+import org.onosproject.store.flow.ReplicaInfoEventListener;
import org.onosproject.store.flow.ReplicaInfoService;
import org.onosproject.store.serializers.KryoSerializer;
import org.onosproject.store.serializers.StoreSerializer;
@@ -181,6 +183,7 @@
registerMessageHandlers(messageHandlingExecutor);
if (backupEnabled) {
+ replicaInfoManager.addListener(flowTable);
backupTask = backupSenderExecutor.scheduleWithFixedDelay(
flowTable::backup,
0,
@@ -193,6 +196,10 @@
@Deactivate
public void deactivate(ComponentContext context) {
+ if (backupEnabled) {
+ replicaInfoManager.removeListener(flowTable);
+ backupTask.cancel(true);
+ }
configService.unregisterProperties(getClass(), false);
unregisterMessageHandlers();
messageHandlingExecutor.shutdownNow();
@@ -232,9 +239,14 @@
boolean restartBackupTask = false;
if (newBackupEnabled != backupEnabled) {
backupEnabled = newBackupEnabled;
- if (!backupEnabled && backupTask != null) {
- backupTask.cancel(false);
- backupTask = null;
+ if (!backupEnabled) {
+ replicaInfoManager.removeListener(flowTable);
+ if (backupTask != null) {
+ backupTask.cancel(false);
+ backupTask = null;
+ }
+ } else {
+ replicaInfoManager.addListener(flowTable);
}
restartBackupTask = backupEnabled;
}
@@ -590,7 +602,7 @@
}
}
- private class InternalFlowTable {
+ private class InternalFlowTable implements ReplicaInfoEventListener {
private final ConcurrentMap<DeviceId, ConcurrentMap<FlowId, Set<StoredFlowEntry>>>
flowEntries = new ConcurrentHashMap<>();
@@ -603,6 +615,43 @@
return NewConcurrentHashMap.<FlowId, Set<StoredFlowEntry>>ifNeeded();
}
+ @Override
+ public void event(ReplicaInfoEvent event) {
+ if (event.type() == ReplicaInfoEvent.Type.BACKUPS_CHANGED) {
+ DeviceId deviceId = event.subject();
+ if (!Objects.equal(local, replicaInfoManager.getReplicaInfoFor(deviceId).master())) {
+ // ignore since this event is for a device this node does not manage.
+ return;
+ }
+ NodeId latestBackupNode = getBackupNode(deviceId);
+ NodeId existingBackupNode = lastBackupNodes.get(deviceId);
+ if (Objects.equal(latestBackupNode, existingBackupNode)) {
+ // ignore since backup location hasn't changed.
+ return;
+ }
+ backupFlowEntries(latestBackupNode, Sets.newHashSet(deviceId));
+ }
+ }
+
+ private void backupFlowEntries(NodeId nodeId, Set<DeviceId> deviceIds) {
+ log.debug("Sending flowEntries for devices {} to {} as backup.", deviceIds, nodeId);
+ Map<DeviceId, ConcurrentMap<FlowId, Set<StoredFlowEntry>>> deviceFlowEntries =
+ Maps.newConcurrentMap();
+ flowEntries.forEach((key, value) -> {
+ if (deviceIds.contains(key)) {
+ deviceFlowEntries.put(key, value);
+ }
+ });
+ clusterCommunicator.unicast(deviceFlowEntries,
+ FLOW_TABLE_BACKUP,
+ SERIALIZER::encode,
+ nodeId);
+ deviceIds.forEach(id -> {
+ lastBackupTimes.put(id, System.currentTimeMillis());
+ lastBackupNodes.put(id, nodeId);
+ });
+ }
+
/**
* Returns the flow table for specified device.
*
@@ -662,7 +711,6 @@
if (!backupEnabled) {
return;
}
- //TODO: Force backup when backups change.
try {
// determine the set of devices that we need to backup during this run.
Set<DeviceId> devicesToBackup = mastershipService.getDevicesOf(local)
@@ -686,35 +734,15 @@
.add(deviceId);
}
});
-
// send the device flow entries to their respective backup nodes
- devicesToBackupByNode.forEach((nodeId, deviceIds) -> {
- Map<DeviceId, ConcurrentMap<FlowId, Set<StoredFlowEntry>>> deviceFlowEntries =
- Maps.newConcurrentMap();
- flowEntries.forEach((key, value) -> {
- if (deviceIds.contains(key)) {
- deviceFlowEntries.put(key, value);
- }
- });
- clusterCommunicator.unicast(deviceFlowEntries,
- FLOW_TABLE_BACKUP,
- SERIALIZER::encode,
- nodeId);
- });
-
- // update state for use in subsequent run.
- devicesToBackupByNode.forEach((node, devices) -> {
- devices.forEach(id -> {
- lastBackupTimes.put(id, System.currentTimeMillis());
- lastBackupNodes.put(id, node);
- });
- });
+ devicesToBackupByNode.forEach(this::backupFlowEntries);
} catch (Exception e) {
log.error("Backup failed.", e);
}
}
private void onBackupReceipt(Map<DeviceId, Map<FlowId, Set<StoredFlowEntry>>> flowTables) {
+ log.debug("Received flows for {} to backup", flowTables.keySet());
Set<DeviceId> managedDevices = mastershipService.getDevicesOf(local);
// Only process those devices are that not managed by the local node.
Maps.filterKeys(flowTables, deviceId -> !managedDevices.contains(deviceId))
diff --git a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
index 5bc55af..862b3cb 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
@@ -25,8 +25,11 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@@ -98,11 +101,13 @@
Pattern.compile("device:(.*)");
private ExecutorService messageHandlingExecutor;
+ private ScheduledExecutorService transferExecutor;
private final LeadershipEventListener leadershipEventListener =
new InternalDeviceMastershipEventListener();
private static final String NODE_ID_NULL = "Node ID cannot be null";
- private static final String DEVICE_ID_NULL = "Device ID cannot be null";;
+ private static final String DEVICE_ID_NULL = "Device ID cannot be null";
+ private static final int WAIT_BEFORE_MASTERSHIP_HANDOFF_MILLIS = 3000;
public static final StoreSerializer SERIALIZER = new KryoSerializer() {
@Override
@@ -119,7 +124,11 @@
@Activate
public void activate() {
messageHandlingExecutor =
- Executors.newSingleThreadExecutor(groupedThreads("onos/store/device/mastership", "message-handler"));
+ Executors.newSingleThreadExecutor(
+ groupedThreads("onos/store/device/mastership", "message-handler"));
+ transferExecutor =
+ Executors.newSingleThreadScheduledExecutor(
+ groupedThreads("onos/store/device/mastership", "mastership-transfer-executor"));
clusterCommunicator.<DeviceId, MastershipRole>addSubscriber(ROLE_QUERY_SUBJECT,
SERIALIZER::decode,
deviceId -> getRole(localNodeId, deviceId),
@@ -127,7 +136,7 @@
messageHandlingExecutor);
clusterCommunicator.<DeviceId, MastershipEvent>addSubscriber(ROLE_RELINQUISH_SUBJECT,
SERIALIZER::decode,
- deviceId -> relinquishRole(localNodeId, deviceId),
+ this::relinquishLocalRole,
SERIALIZER::encode,
messageHandlingExecutor);
clusterCommunicator.addSubscriber(TRANSITION_FROM_MASTER_TO_STANDBY_SUBJECT,
@@ -147,6 +156,7 @@
clusterCommunicator.removeSubscriber(ROLE_RELINQUISH_SUBJECT);
clusterCommunicator.removeSubscriber(TRANSITION_FROM_MASTER_TO_STANDBY_SUBJECT);
messageHandlingExecutor.shutdown();
+ transferExecutor.shutdown();
leadershipService.removeListener(leadershipEventListener);
log.info("Stoppped.");
@@ -246,26 +256,36 @@
}
@Override
- public MastershipEvent setMaster(NodeId nodeId, DeviceId deviceId) {
+ public CompletableFuture<MastershipEvent> setMaster(NodeId nodeId, DeviceId deviceId) {
checkArgument(nodeId != null, NODE_ID_NULL);
checkArgument(deviceId != null, DEVICE_ID_NULL);
NodeId currentMaster = getMaster(deviceId);
if (nodeId.equals(currentMaster)) {
- return null;
+ return CompletableFuture.completedFuture(null);
} else {
String leadershipTopic = createDeviceMastershipTopic(deviceId);
List<NodeId> candidates = leadershipService.getCandidates(leadershipTopic);
if (candidates.isEmpty()) {
- return null;
+ return CompletableFuture.completedFuture(null);
}
if (leadershipService.makeTopCandidate(leadershipTopic, nodeId)) {
- return transitionFromMasterToStandby(deviceId);
+ CompletableFuture<MastershipEvent> result = new CompletableFuture<>();
+ // There is brief wait before we step down from mastership.
+ // This is to ensure any work that happens when standby preference
+ // order changes can complete. For example: flow entries need to be backed
+ // to the new top standby (ONOS-1883)
+ // FIXME: This potentially introduces a race-condition.
+ // Right now role changes are only forced via CLI.
+ transferExecutor.schedule(() -> {
+ result.complete(transitionFromMasterToStandby(deviceId));
+ }, WAIT_BEFORE_MASTERSHIP_HANDOFF_MILLIS, TimeUnit.MILLISECONDS);
+ return result;
} else {
log.warn("Failed to promote {} to mastership for {}", nodeId, deviceId);
}
}
- return null;
+ return CompletableFuture.completedFuture(null);
}
@Override
@@ -278,13 +298,13 @@
}
@Override
- public MastershipEvent setStandby(NodeId nodeId, DeviceId deviceId) {
+ public CompletableFuture<MastershipEvent> setStandby(NodeId nodeId, DeviceId deviceId) {
checkArgument(nodeId != null, NODE_ID_NULL);
checkArgument(deviceId != null, DEVICE_ID_NULL);
NodeId currentMaster = getMaster(deviceId);
if (!nodeId.equals(currentMaster)) {
- return null;
+ return CompletableFuture.completedFuture(null);
}
String leadershipTopic = createDeviceMastershipTopic(deviceId);
@@ -304,20 +324,25 @@
}
@Override
- public MastershipEvent relinquishRole(NodeId nodeId, DeviceId deviceId) {
+ public CompletableFuture<MastershipEvent> relinquishRole(NodeId nodeId, DeviceId deviceId) {
checkArgument(nodeId != null, NODE_ID_NULL);
checkArgument(deviceId != null, DEVICE_ID_NULL);
if (!nodeId.equals(localNodeId)) {
log.debug("Forwarding request to relinquish "
+ "role for device {} to {}", deviceId, nodeId);
- return futureGetOrElse(clusterCommunicator.sendAndReceive(
+ return clusterCommunicator.sendAndReceive(
deviceId,
ROLE_RELINQUISH_SUBJECT,
SERIALIZER::encode,
SERIALIZER::decode,
- nodeId), null);
+ nodeId);
}
+ return CompletableFuture.completedFuture(relinquishLocalRole(deviceId));
+ }
+
+ private MastershipEvent relinquishLocalRole(DeviceId deviceId) {
+ checkArgument(deviceId != null, DEVICE_ID_NULL);
// Check if this node is can be managed by this node.
if (!connectedDevices.contains(deviceId)) {
diff --git a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/DistributedMastershipStore.java b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/DistributedMastershipStore.java
index b1e3ac2..d6a857c 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/DistributedMastershipStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/DistributedMastershipStore.java
@@ -25,6 +25,7 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
@@ -130,7 +131,7 @@
}
@Override
- public MastershipEvent setMaster(NodeId newMaster, DeviceId deviceId) {
+ public CompletableFuture<MastershipEvent> setMaster(NodeId newMaster, DeviceId deviceId) {
roleMap.lock(deviceId);
try {
@@ -147,7 +148,7 @@
log.warn("{} was in both MASTER and STANDBY for {}", newMaster, deviceId);
// trigger BACKUPS_CHANGED?
}
- return null;
+ return CompletableFuture.completedFuture(null);
case STANDBY:
case NONE:
final NodeId currentMaster = rv.get(MASTER);
@@ -163,10 +164,11 @@
rv.reassign(newMaster, STANDBY, NONE);
updateTerm(deviceId);
roleMap.put(deviceId, rv);
- return new MastershipEvent(MASTER_CHANGED, deviceId, rv.roleInfo());
+ return CompletableFuture.completedFuture(
+ new MastershipEvent(MASTER_CHANGED, deviceId, rv.roleInfo()));
default:
log.warn("unknown Mastership Role {}", currentRole);
- return null;
+ return CompletableFuture.completedFuture(null);
}
} finally {
roleMap.unlock(deviceId);
@@ -282,7 +284,7 @@
}
@Override
- public MastershipEvent setStandby(NodeId nodeId, DeviceId deviceId) {
+ public CompletableFuture<MastershipEvent> setStandby(NodeId nodeId, DeviceId deviceId) {
// if nodeId was MASTER, rotate STANDBY
// if nodeId was STANDBY no-op
// if nodeId was NONE, add to STANDBY
@@ -298,30 +300,33 @@
updateTerm(deviceId);
if (newMaster != null) {
roleMap.put(deviceId, rv);
- return new MastershipEvent(MASTER_CHANGED, deviceId, rv.roleInfo());
+ return CompletableFuture.completedFuture(
+ new MastershipEvent(MASTER_CHANGED, deviceId, rv.roleInfo()));
} else {
// no master candidate
roleMap.put(deviceId, rv);
// TBD: Should there be new event type for no MASTER?
- return new MastershipEvent(MASTER_CHANGED, deviceId, rv.roleInfo());
+ return CompletableFuture.completedFuture(
+ new MastershipEvent(MASTER_CHANGED, deviceId, rv.roleInfo()));
}
case STANDBY:
- return null;
+ return CompletableFuture.completedFuture(null);
case NONE:
rv.reassign(nodeId, NONE, STANDBY);
roleMap.put(deviceId, rv);
- return new MastershipEvent(BACKUPS_CHANGED, deviceId, rv.roleInfo());
+ return CompletableFuture.completedFuture(
+ new MastershipEvent(BACKUPS_CHANGED, deviceId, rv.roleInfo()));
default:
log.warn("unknown Mastership Role {}", currentRole);
}
- return null;
+ return CompletableFuture.completedFuture(null);
} finally {
roleMap.unlock(deviceId);
}
}
@Override
- public MastershipEvent relinquishRole(NodeId nodeId, DeviceId deviceId) {
+ public CompletableFuture<MastershipEvent> relinquishRole(NodeId nodeId, DeviceId deviceId) {
// relinquishRole is basically set to None
// If nodeId was master reelect next and remove nodeId
@@ -337,13 +342,14 @@
if (newMaster != null) {
updateTerm(deviceId);
roleMap.put(deviceId, rv);
- return new MastershipEvent(MASTER_CHANGED, deviceId, rv.roleInfo());
+ return CompletableFuture.completedFuture(
+ new MastershipEvent(MASTER_CHANGED, deviceId, rv.roleInfo()));
} else {
// No master candidate - no more backups, device is likely
// fully disconnected
roleMap.put(deviceId, rv);
// Should there be new event type?
- return null;
+ return CompletableFuture.completedFuture(null);
}
case STANDBY:
//fall through to reinforce relinquishment
@@ -351,13 +357,14 @@
boolean modified = rv.reassign(nodeId, STANDBY, NONE);
if (modified) {
roleMap.put(deviceId, rv);
- return new MastershipEvent(BACKUPS_CHANGED, deviceId, rv.roleInfo());
+ return CompletableFuture.completedFuture(
+ new MastershipEvent(BACKUPS_CHANGED, deviceId, rv.roleInfo()));
}
- return null;
+ return CompletableFuture.completedFuture(null);
default:
log.warn("unknown Mastership Role {}", currentRole);
}
- return null;
+ return CompletableFuture.completedFuture(null);
} finally {
roleMap.unlock(deviceId);
}
@@ -374,10 +381,11 @@
if (roleValue.contains(MASTER, nodeId) ||
roleValue.contains(STANDBY, nodeId)) {
- MastershipEvent event = relinquishRole(nodeId, deviceId);
- if (event != null) {
- events.add(event);
- }
+ relinquishRole(nodeId, deviceId).whenComplete((event, error) -> {
+ if (event != null) {
+ events.add(event);
+ }
+ });
}
}
notifyDelegate(events);
diff --git a/core/store/dist/src/test/java/org/onosproject/store/mastership/impl/DistributedMastershipStoreTest.java b/core/store/dist/src/test/java/org/onosproject/store/mastership/impl/DistributedMastershipStoreTest.java
index 7d99259..56879e8 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/mastership/impl/DistributedMastershipStoreTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/mastership/impl/DistributedMastershipStoreTest.java
@@ -44,6 +44,7 @@
import org.onosproject.store.serializers.KryoSerializer;
import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.Futures;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
@@ -168,15 +169,15 @@
//populate maps with DID1, N1 as MASTER thru NONE case
testStore.setCurrent(CN1);
assertEquals("wrong role for NONE:", MASTER, dms.requestRole(DID1));
- assertNull("wrong event:", dms.setMaster(N1, DID1));
+ assertNull("wrong event:", Futures.getUnchecked(dms.setMaster(N1, DID1)));
//switch over to N2
- assertEquals("wrong event:", Type.MASTER_CHANGED, dms.setMaster(N2, DID1).type());
+ assertEquals("wrong event:", Type.MASTER_CHANGED, Futures.getUnchecked(dms.setMaster(N2, DID1)).type());
System.out.println(dms.getTermFor(DID1).master() + ":" + dms.getTermFor(DID1).termNumber());
assertEquals("wrong term", MastershipTerm.of(N2, 2), dms.getTermFor(DID1));
//orphan switch - should be rare case
- assertEquals("wrong event:", Type.MASTER_CHANGED, dms.setMaster(N2, DID2).type());
+ assertEquals("wrong event:", Type.MASTER_CHANGED, Futures.getUnchecked(dms.setMaster(N2, DID2)).type());
assertEquals("wrong term", MastershipTerm.of(N2, 1), dms.getTermFor(DID2));
//disconnect and reconnect - sign of failing re-election or single-instance channel
dms.roleMap.clear();
@@ -190,18 +191,18 @@
testStore.setCurrent(CN1);
assertEquals("wrong role for NONE:", MASTER, dms.requestRole(DID1));
//no backup, no new MASTER/event
- assertNull("wrong event:", dms.relinquishRole(N1, DID1));
+ assertNull("wrong event:", Futures.getUnchecked(dms.relinquishRole(N1, DID1)));
dms.requestRole(DID1);
//add backup CN2, get it elected MASTER by relinquishing
testStore.setCurrent(CN2);
assertEquals("wrong role for NONE:", STANDBY, dms.requestRole(DID1));
- assertEquals("wrong event:", Type.MASTER_CHANGED, dms.relinquishRole(N1, DID1).type());
+ assertEquals("wrong event:", Type.MASTER_CHANGED, Futures.getUnchecked(dms.relinquishRole(N1, DID1)).type());
assertEquals("wrong master", N2, dms.getMaster(DID1));
//all nodes "give up" on device, which goes back to NONE.
- assertNull("wrong event:", dms.relinquishRole(N2, DID1));
+ assertNull("wrong event:", Futures.getUnchecked(dms.relinquishRole(N2, DID1)));
assertEquals("wrong role for node:", NONE, dms.getRole(N2, DID1));
assertEquals("wrong number of retired nodes", 2,
@@ -215,11 +216,11 @@
dms.roleMap.get(DID1).nodesOfRole(STANDBY).size());
//If STANDBY, should drop to NONE
- assertEquals("wrong event:", Type.BACKUPS_CHANGED, dms.relinquishRole(N1, DID1).type());
+ assertEquals("wrong event:", Type.BACKUPS_CHANGED, Futures.getUnchecked(dms.relinquishRole(N1, DID1)).type());
assertEquals("wrong role for node:", NONE, dms.getRole(N1, DID1));
//NONE - nothing happens
- assertEquals("wrong event:", Type.BACKUPS_CHANGED, dms.relinquishRole(N1, DID2).type());
+ assertEquals("wrong event:", Type.BACKUPS_CHANGED, Futures.getUnchecked(dms.relinquishRole(N1, DID2)).type());
assertEquals("wrong role for node:", NONE, dms.getRole(N1, DID2));
}
diff --git a/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleMastershipStore.java b/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleMastershipStore.java
index 52e09dd..39bc893 100644
--- a/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleMastershipStore.java
+++ b/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleMastershipStore.java
@@ -27,6 +27,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.felix.scr.annotations.Activate;
@@ -143,13 +144,13 @@
}
@Override
- public synchronized MastershipEvent setMaster(NodeId nodeId, DeviceId deviceId) {
+ public synchronized CompletableFuture<MastershipEvent> setMaster(NodeId nodeId, DeviceId deviceId) {
MastershipRole role = getRole(nodeId, deviceId);
switch (role) {
case MASTER:
// no-op
- return null;
+ return CompletableFuture.completedFuture(null);
case STANDBY:
case NONE:
NodeId prevMaster = masterMap.put(deviceId, nodeId);
@@ -162,8 +163,8 @@
return null;
}
- return new MastershipEvent(MASTER_CHANGED, deviceId,
- getNodes(deviceId));
+ return CompletableFuture.completedFuture(
+ new MastershipEvent(MASTER_CHANGED, deviceId, getNodes(deviceId)));
}
@Override
@@ -285,7 +286,7 @@
}
@Override
- public synchronized MastershipEvent setStandby(NodeId nodeId, DeviceId deviceId) {
+ public synchronized CompletableFuture<MastershipEvent> setStandby(NodeId nodeId, DeviceId deviceId) {
MastershipRole role = getRole(nodeId, deviceId);
switch (role) {
case MASTER:
@@ -294,22 +295,22 @@
// no master alternative
masterMap.remove(deviceId);
// TODO: Should there be new event type for no MASTER?
- return new MastershipEvent(MASTER_CHANGED, deviceId,
- getNodes(deviceId));
+ return CompletableFuture.completedFuture(
+ new MastershipEvent(MASTER_CHANGED, deviceId, getNodes(deviceId)));
} else {
NodeId prevMaster = masterMap.put(deviceId, backup);
incrementTerm(deviceId);
addToBackup(deviceId, prevMaster);
- return new MastershipEvent(MASTER_CHANGED, deviceId,
- getNodes(deviceId));
+ return CompletableFuture.completedFuture(
+ new MastershipEvent(MASTER_CHANGED, deviceId, getNodes(deviceId)));
}
case STANDBY:
case NONE:
boolean modified = addToBackup(deviceId, nodeId);
if (modified) {
- return new MastershipEvent(BACKUPS_CHANGED, deviceId,
- getNodes(deviceId));
+ return CompletableFuture.completedFuture(
+ new MastershipEvent(BACKUPS_CHANGED, deviceId, getNodes(deviceId)));
}
break;
@@ -335,20 +336,20 @@
}
@Override
- public synchronized MastershipEvent relinquishRole(NodeId nodeId, DeviceId deviceId) {
+ public synchronized CompletableFuture<MastershipEvent> relinquishRole(NodeId nodeId, DeviceId deviceId) {
MastershipRole role = getRole(nodeId, deviceId);
switch (role) {
case MASTER:
NodeId backup = reelect(deviceId, nodeId);
masterMap.put(deviceId, backup);
incrementTerm(deviceId);
- return new MastershipEvent(MASTER_CHANGED, deviceId,
- getNodes(deviceId));
+ return CompletableFuture.completedFuture(
+ new MastershipEvent(MASTER_CHANGED, deviceId, getNodes(deviceId)));
case STANDBY:
if (removeFromBackups(deviceId, nodeId)) {
- return new MastershipEvent(BACKUPS_CHANGED, deviceId,
- getNodes(deviceId));
+ return CompletableFuture.completedFuture(
+ new MastershipEvent(BACKUPS_CHANGED, deviceId, getNodes(deviceId)));
}
break;
@@ -358,12 +359,12 @@
default:
log.warn("unknown Mastership Role {}", role);
}
- return null;
+ return CompletableFuture.completedFuture(null);
}
@Override
public synchronized void relinquishAllRole(NodeId nodeId) {
- List<MastershipEvent> events = new ArrayList<>();
+ List<CompletableFuture<MastershipEvent>> eventFutures = new ArrayList<>();
Set<DeviceId> toRelinquish = new HashSet<>();
masterMap.entrySet().stream()
@@ -375,12 +376,13 @@
.forEach(entry -> toRelinquish.add(entry.getKey()));
toRelinquish.forEach(deviceId -> {
- MastershipEvent event = relinquishRole(nodeId, deviceId);
- if (event != null) {
- events.add(event);
- }
+ eventFutures.add(relinquishRole(nodeId, deviceId));
});
- notifyDelegate(events);
+ eventFutures.forEach(future -> {
+ future.whenComplete((event, error) -> {
+ notifyDelegate(event);
+ });
+ });
}
}
diff --git a/core/store/trivial/src/test/java/org/onosproject/store/trivial/impl/SimpleMastershipStoreTest.java b/core/store/trivial/src/test/java/org/onosproject/store/trivial/impl/SimpleMastershipStoreTest.java
index f312bbc..5b44515 100644
--- a/core/store/trivial/src/test/java/org/onosproject/store/trivial/impl/SimpleMastershipStoreTest.java
+++ b/core/store/trivial/src/test/java/org/onosproject/store/trivial/impl/SimpleMastershipStoreTest.java
@@ -29,6 +29,7 @@
import org.onosproject.net.DeviceId;
import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.Futures;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
@@ -92,15 +93,15 @@
@Test
public void setMaster() {
put(DID1, N1, false, false);
- assertEquals("wrong event", MASTER_CHANGED, sms.setMaster(N1, DID1).type());
+ assertEquals("wrong event", MASTER_CHANGED, Futures.getUnchecked(sms.setMaster(N1, DID1)).type());
assertEquals("wrong role", MASTER, sms.getRole(N1, DID1));
//set node that's already master - should be ignored
- assertNull("wrong event", sms.setMaster(N1, DID1));
+ assertNull("wrong event", Futures.getUnchecked(sms.setMaster(N1, DID1)));
//set STANDBY to MASTER
put(DID2, N1, false, true);
assertEquals("wrong role", STANDBY, sms.getRole(N1, DID2));
- assertEquals("wrong event", MASTER_CHANGED, sms.setMaster(N1, DID2).type());
+ assertEquals("wrong event", MASTER_CHANGED, Futures.getUnchecked(sms.setMaster(N1, DID2)).type());
assertEquals("wrong role", MASTER, sms.getRole(N1, DID2));
}
@@ -156,7 +157,7 @@
//no backup, MASTER
put(DID1, N1, true, false);
- assertNull("expect no MASTER event", sms.setStandby(N1, DID1).roleInfo().master());
+ assertNull("expect no MASTER event", Futures.getUnchecked(sms.setStandby(N1, DID1)).roleInfo().master());
assertNull("wrong node", sms.masterMap.get(DID1));
//backup, switch
@@ -164,7 +165,7 @@
put(DID1, N1, true, true);
put(DID1, N2, false, true);
put(DID2, N2, true, true);
- MastershipEvent event = sms.setStandby(N1, DID1);
+ MastershipEvent event = Futures.getUnchecked(sms.setStandby(N1, DID1));
assertEquals("wrong event", MASTER_CHANGED, event.type());
assertEquals("wrong master", N2, event.roleInfo().master());
}