ONOS-1883: Fix for lost flow rules on CLI directed mastership changes.
- Made all mastership role change operations asynchronous, which they are.
- In flowrule store we now check to see if any new backups need to be made when a device backup location (standby) changes
- In device mastership store we now wait briefly before we step down from mastership after promoting a new candidate as highest standby
Change-Id: Icb76cf4d0d23403053a3fd5a458a940b847da49f
diff --git a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
index 5bc55af..862b3cb 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
@@ -25,8 +25,11 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@@ -98,11 +101,13 @@
Pattern.compile("device:(.*)");
private ExecutorService messageHandlingExecutor;
+ private ScheduledExecutorService transferExecutor;
private final LeadershipEventListener leadershipEventListener =
new InternalDeviceMastershipEventListener();
private static final String NODE_ID_NULL = "Node ID cannot be null";
- private static final String DEVICE_ID_NULL = "Device ID cannot be null";;
+ private static final String DEVICE_ID_NULL = "Device ID cannot be null";
+ private static final int WAIT_BEFORE_MASTERSHIP_HANDOFF_MILLIS = 3000;
public static final StoreSerializer SERIALIZER = new KryoSerializer() {
@Override
@@ -119,7 +124,11 @@
@Activate
public void activate() {
messageHandlingExecutor =
- Executors.newSingleThreadExecutor(groupedThreads("onos/store/device/mastership", "message-handler"));
+ Executors.newSingleThreadExecutor(
+ groupedThreads("onos/store/device/mastership", "message-handler"));
+ transferExecutor =
+ Executors.newSingleThreadScheduledExecutor(
+ groupedThreads("onos/store/device/mastership", "mastership-transfer-executor"));
clusterCommunicator.<DeviceId, MastershipRole>addSubscriber(ROLE_QUERY_SUBJECT,
SERIALIZER::decode,
deviceId -> getRole(localNodeId, deviceId),
@@ -127,7 +136,7 @@
messageHandlingExecutor);
clusterCommunicator.<DeviceId, MastershipEvent>addSubscriber(ROLE_RELINQUISH_SUBJECT,
SERIALIZER::decode,
- deviceId -> relinquishRole(localNodeId, deviceId),
+ this::relinquishLocalRole,
SERIALIZER::encode,
messageHandlingExecutor);
clusterCommunicator.addSubscriber(TRANSITION_FROM_MASTER_TO_STANDBY_SUBJECT,
@@ -147,6 +156,7 @@
clusterCommunicator.removeSubscriber(ROLE_RELINQUISH_SUBJECT);
clusterCommunicator.removeSubscriber(TRANSITION_FROM_MASTER_TO_STANDBY_SUBJECT);
messageHandlingExecutor.shutdown();
+ transferExecutor.shutdown();
leadershipService.removeListener(leadershipEventListener);
log.info("Stoppped.");
@@ -246,26 +256,36 @@
}
@Override
- public MastershipEvent setMaster(NodeId nodeId, DeviceId deviceId) {
+ public CompletableFuture<MastershipEvent> setMaster(NodeId nodeId, DeviceId deviceId) {
checkArgument(nodeId != null, NODE_ID_NULL);
checkArgument(deviceId != null, DEVICE_ID_NULL);
NodeId currentMaster = getMaster(deviceId);
if (nodeId.equals(currentMaster)) {
- return null;
+ return CompletableFuture.completedFuture(null);
} else {
String leadershipTopic = createDeviceMastershipTopic(deviceId);
List<NodeId> candidates = leadershipService.getCandidates(leadershipTopic);
if (candidates.isEmpty()) {
- return null;
+ return CompletableFuture.completedFuture(null);
}
if (leadershipService.makeTopCandidate(leadershipTopic, nodeId)) {
- return transitionFromMasterToStandby(deviceId);
+ CompletableFuture<MastershipEvent> result = new CompletableFuture<>();
+ // There is brief wait before we step down from mastership.
+ // This is to ensure any work that happens when standby preference
+ // order changes can complete. For example: flow entries need to be backed
+ // to the new top standby (ONOS-1883)
+ // FIXME: This potentially introduces a race-condition.
+ // Right now role changes are only forced via CLI.
+ transferExecutor.schedule(() -> {
+ result.complete(transitionFromMasterToStandby(deviceId));
+ }, WAIT_BEFORE_MASTERSHIP_HANDOFF_MILLIS, TimeUnit.MILLISECONDS);
+ return result;
} else {
log.warn("Failed to promote {} to mastership for {}", nodeId, deviceId);
}
}
- return null;
+ return CompletableFuture.completedFuture(null);
}
@Override
@@ -278,13 +298,13 @@
}
@Override
- public MastershipEvent setStandby(NodeId nodeId, DeviceId deviceId) {
+ public CompletableFuture<MastershipEvent> setStandby(NodeId nodeId, DeviceId deviceId) {
checkArgument(nodeId != null, NODE_ID_NULL);
checkArgument(deviceId != null, DEVICE_ID_NULL);
NodeId currentMaster = getMaster(deviceId);
if (!nodeId.equals(currentMaster)) {
- return null;
+ return CompletableFuture.completedFuture(null);
}
String leadershipTopic = createDeviceMastershipTopic(deviceId);
@@ -304,20 +324,25 @@
}
@Override
- public MastershipEvent relinquishRole(NodeId nodeId, DeviceId deviceId) {
+ public CompletableFuture<MastershipEvent> relinquishRole(NodeId nodeId, DeviceId deviceId) {
checkArgument(nodeId != null, NODE_ID_NULL);
checkArgument(deviceId != null, DEVICE_ID_NULL);
if (!nodeId.equals(localNodeId)) {
log.debug("Forwarding request to relinquish "
+ "role for device {} to {}", deviceId, nodeId);
- return futureGetOrElse(clusterCommunicator.sendAndReceive(
+ return clusterCommunicator.sendAndReceive(
deviceId,
ROLE_RELINQUISH_SUBJECT,
SERIALIZER::encode,
SERIALIZER::decode,
- nodeId), null);
+ nodeId);
}
+ return CompletableFuture.completedFuture(relinquishLocalRole(deviceId));
+ }
+
+ private MastershipEvent relinquishLocalRole(DeviceId deviceId) {
+ checkArgument(deviceId != null, DEVICE_ID_NULL);
// Check if this node is can be managed by this node.
if (!connectedDevices.contains(deviceId)) {
diff --git a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/DistributedMastershipStore.java b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/DistributedMastershipStore.java
index b1e3ac2..d6a857c 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/DistributedMastershipStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/DistributedMastershipStore.java
@@ -25,6 +25,7 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
@@ -130,7 +131,7 @@
}
@Override
- public MastershipEvent setMaster(NodeId newMaster, DeviceId deviceId) {
+ public CompletableFuture<MastershipEvent> setMaster(NodeId newMaster, DeviceId deviceId) {
roleMap.lock(deviceId);
try {
@@ -147,7 +148,7 @@
log.warn("{} was in both MASTER and STANDBY for {}", newMaster, deviceId);
// trigger BACKUPS_CHANGED?
}
- return null;
+ return CompletableFuture.completedFuture(null);
case STANDBY:
case NONE:
final NodeId currentMaster = rv.get(MASTER);
@@ -163,10 +164,11 @@
rv.reassign(newMaster, STANDBY, NONE);
updateTerm(deviceId);
roleMap.put(deviceId, rv);
- return new MastershipEvent(MASTER_CHANGED, deviceId, rv.roleInfo());
+ return CompletableFuture.completedFuture(
+ new MastershipEvent(MASTER_CHANGED, deviceId, rv.roleInfo()));
default:
log.warn("unknown Mastership Role {}", currentRole);
- return null;
+ return CompletableFuture.completedFuture(null);
}
} finally {
roleMap.unlock(deviceId);
@@ -282,7 +284,7 @@
}
@Override
- public MastershipEvent setStandby(NodeId nodeId, DeviceId deviceId) {
+ public CompletableFuture<MastershipEvent> setStandby(NodeId nodeId, DeviceId deviceId) {
// if nodeId was MASTER, rotate STANDBY
// if nodeId was STANDBY no-op
// if nodeId was NONE, add to STANDBY
@@ -298,30 +300,33 @@
updateTerm(deviceId);
if (newMaster != null) {
roleMap.put(deviceId, rv);
- return new MastershipEvent(MASTER_CHANGED, deviceId, rv.roleInfo());
+ return CompletableFuture.completedFuture(
+ new MastershipEvent(MASTER_CHANGED, deviceId, rv.roleInfo()));
} else {
// no master candidate
roleMap.put(deviceId, rv);
// TBD: Should there be new event type for no MASTER?
- return new MastershipEvent(MASTER_CHANGED, deviceId, rv.roleInfo());
+ return CompletableFuture.completedFuture(
+ new MastershipEvent(MASTER_CHANGED, deviceId, rv.roleInfo()));
}
case STANDBY:
- return null;
+ return CompletableFuture.completedFuture(null);
case NONE:
rv.reassign(nodeId, NONE, STANDBY);
roleMap.put(deviceId, rv);
- return new MastershipEvent(BACKUPS_CHANGED, deviceId, rv.roleInfo());
+ return CompletableFuture.completedFuture(
+ new MastershipEvent(BACKUPS_CHANGED, deviceId, rv.roleInfo()));
default:
log.warn("unknown Mastership Role {}", currentRole);
}
- return null;
+ return CompletableFuture.completedFuture(null);
} finally {
roleMap.unlock(deviceId);
}
}
@Override
- public MastershipEvent relinquishRole(NodeId nodeId, DeviceId deviceId) {
+ public CompletableFuture<MastershipEvent> relinquishRole(NodeId nodeId, DeviceId deviceId) {
// relinquishRole is basically set to None
// If nodeId was master reelect next and remove nodeId
@@ -337,13 +342,14 @@
if (newMaster != null) {
updateTerm(deviceId);
roleMap.put(deviceId, rv);
- return new MastershipEvent(MASTER_CHANGED, deviceId, rv.roleInfo());
+ return CompletableFuture.completedFuture(
+ new MastershipEvent(MASTER_CHANGED, deviceId, rv.roleInfo()));
} else {
// No master candidate - no more backups, device is likely
// fully disconnected
roleMap.put(deviceId, rv);
// Should there be new event type?
- return null;
+ return CompletableFuture.completedFuture(null);
}
case STANDBY:
//fall through to reinforce relinquishment
@@ -351,13 +357,14 @@
boolean modified = rv.reassign(nodeId, STANDBY, NONE);
if (modified) {
roleMap.put(deviceId, rv);
- return new MastershipEvent(BACKUPS_CHANGED, deviceId, rv.roleInfo());
+ return CompletableFuture.completedFuture(
+ new MastershipEvent(BACKUPS_CHANGED, deviceId, rv.roleInfo()));
}
- return null;
+ return CompletableFuture.completedFuture(null);
default:
log.warn("unknown Mastership Role {}", currentRole);
}
- return null;
+ return CompletableFuture.completedFuture(null);
} finally {
roleMap.unlock(deviceId);
}
@@ -374,10 +381,11 @@
if (roleValue.contains(MASTER, nodeId) ||
roleValue.contains(STANDBY, nodeId)) {
- MastershipEvent event = relinquishRole(nodeId, deviceId);
- if (event != null) {
- events.add(event);
- }
+ relinquishRole(nodeId, deviceId).whenComplete((event, error) -> {
+ if (event != null) {
+ events.add(event);
+ }
+ });
}
}
notifyDelegate(events);