ONOS-1883: Fix for lost flow rules on CLI directed mastership changes.
- Made all mastership role change operations asynchronous, which they are.
- In flowrule store we now check to see if any new backups need to be made when a device backup location (standby) changes
- In device mastership store we now wait briefly before we step down from mastership after promoting a new candidate as highest standby
Change-Id: Icb76cf4d0d23403053a3fd5a458a940b847da49f
diff --git a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
index 5bc55af..862b3cb 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
@@ -25,8 +25,11 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@@ -98,11 +101,13 @@
Pattern.compile("device:(.*)");
private ExecutorService messageHandlingExecutor;
+ private ScheduledExecutorService transferExecutor;
private final LeadershipEventListener leadershipEventListener =
new InternalDeviceMastershipEventListener();
private static final String NODE_ID_NULL = "Node ID cannot be null";
- private static final String DEVICE_ID_NULL = "Device ID cannot be null";;
+ private static final String DEVICE_ID_NULL = "Device ID cannot be null";
+ private static final int WAIT_BEFORE_MASTERSHIP_HANDOFF_MILLIS = 3000;
public static final StoreSerializer SERIALIZER = new KryoSerializer() {
@Override
@@ -119,7 +124,11 @@
@Activate
public void activate() {
messageHandlingExecutor =
- Executors.newSingleThreadExecutor(groupedThreads("onos/store/device/mastership", "message-handler"));
+ Executors.newSingleThreadExecutor(
+ groupedThreads("onos/store/device/mastership", "message-handler"));
+ transferExecutor =
+ Executors.newSingleThreadScheduledExecutor(
+ groupedThreads("onos/store/device/mastership", "mastership-transfer-executor"));
clusterCommunicator.<DeviceId, MastershipRole>addSubscriber(ROLE_QUERY_SUBJECT,
SERIALIZER::decode,
deviceId -> getRole(localNodeId, deviceId),
@@ -127,7 +136,7 @@
messageHandlingExecutor);
clusterCommunicator.<DeviceId, MastershipEvent>addSubscriber(ROLE_RELINQUISH_SUBJECT,
SERIALIZER::decode,
- deviceId -> relinquishRole(localNodeId, deviceId),
+ this::relinquishLocalRole,
SERIALIZER::encode,
messageHandlingExecutor);
clusterCommunicator.addSubscriber(TRANSITION_FROM_MASTER_TO_STANDBY_SUBJECT,
@@ -147,6 +156,7 @@
clusterCommunicator.removeSubscriber(ROLE_RELINQUISH_SUBJECT);
clusterCommunicator.removeSubscriber(TRANSITION_FROM_MASTER_TO_STANDBY_SUBJECT);
messageHandlingExecutor.shutdown();
+ transferExecutor.shutdown();
leadershipService.removeListener(leadershipEventListener);
log.info("Stoppped.");
@@ -246,26 +256,36 @@
}
@Override
- public MastershipEvent setMaster(NodeId nodeId, DeviceId deviceId) {
+ public CompletableFuture<MastershipEvent> setMaster(NodeId nodeId, DeviceId deviceId) {
checkArgument(nodeId != null, NODE_ID_NULL);
checkArgument(deviceId != null, DEVICE_ID_NULL);
NodeId currentMaster = getMaster(deviceId);
if (nodeId.equals(currentMaster)) {
- return null;
+ return CompletableFuture.completedFuture(null);
} else {
String leadershipTopic = createDeviceMastershipTopic(deviceId);
List<NodeId> candidates = leadershipService.getCandidates(leadershipTopic);
if (candidates.isEmpty()) {
- return null;
+ return CompletableFuture.completedFuture(null);
}
if (leadershipService.makeTopCandidate(leadershipTopic, nodeId)) {
- return transitionFromMasterToStandby(deviceId);
+ CompletableFuture<MastershipEvent> result = new CompletableFuture<>();
+ // There is a brief wait before we step down from mastership.
+ // This is to ensure any work that happens when standby preference
+ // order changes can complete. For example: flow entries need to be backed up
+ // to the new top standby (ONOS-1883)
+ // FIXME: This potentially introduces a race-condition.
+ // Right now role changes are only forced via CLI.
+ transferExecutor.schedule(() -> {
+ result.complete(transitionFromMasterToStandby(deviceId));
+ }, WAIT_BEFORE_MASTERSHIP_HANDOFF_MILLIS, TimeUnit.MILLISECONDS);
+ return result;
} else {
log.warn("Failed to promote {} to mastership for {}", nodeId, deviceId);
}
}
- return null;
+ return CompletableFuture.completedFuture(null);
}
@Override
@@ -278,13 +298,13 @@
}
@Override
- public MastershipEvent setStandby(NodeId nodeId, DeviceId deviceId) {
+ public CompletableFuture<MastershipEvent> setStandby(NodeId nodeId, DeviceId deviceId) {
checkArgument(nodeId != null, NODE_ID_NULL);
checkArgument(deviceId != null, DEVICE_ID_NULL);
NodeId currentMaster = getMaster(deviceId);
if (!nodeId.equals(currentMaster)) {
- return null;
+ return CompletableFuture.completedFuture(null);
}
String leadershipTopic = createDeviceMastershipTopic(deviceId);
@@ -304,20 +324,25 @@
}
@Override
- public MastershipEvent relinquishRole(NodeId nodeId, DeviceId deviceId) {
+ public CompletableFuture<MastershipEvent> relinquishRole(NodeId nodeId, DeviceId deviceId) {
checkArgument(nodeId != null, NODE_ID_NULL);
checkArgument(deviceId != null, DEVICE_ID_NULL);
if (!nodeId.equals(localNodeId)) {
log.debug("Forwarding request to relinquish "
+ "role for device {} to {}", deviceId, nodeId);
- return futureGetOrElse(clusterCommunicator.sendAndReceive(
+ return clusterCommunicator.sendAndReceive(
deviceId,
ROLE_RELINQUISH_SUBJECT,
SERIALIZER::encode,
SERIALIZER::decode,
- nodeId), null);
+ nodeId);
}
+ return CompletableFuture.completedFuture(relinquishLocalRole(deviceId));
+ }
+
+ private MastershipEvent relinquishLocalRole(DeviceId deviceId) {
+ checkArgument(deviceId != null, DEVICE_ID_NULL);
// Check if this node is can be managed by this node.
if (!connectedDevices.contains(deviceId)) {