ONOS-1883: Fix for lost flow rules on CLI directed mastership changes.
- Made all mastership role change operations asynchronous, which they are.
- In flowrule store we now check to see if any new backups need to be made when a device backup location (standby) changes
- In device mastership store we now wait briefly before we step down from mastership after promoting a new candidate as highest standby

Change-Id: Icb76cf4d0d23403053a3fd5a458a940b847da49f
diff --git a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
index 5bc55af..862b3cb 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
@@ -25,8 +25,11 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
@@ -98,11 +101,13 @@
             Pattern.compile("device:(.*)");
 
     private ExecutorService messageHandlingExecutor;
+    private ScheduledExecutorService transferExecutor;
     private final LeadershipEventListener leadershipEventListener =
             new InternalDeviceMastershipEventListener();
 
     private static final String NODE_ID_NULL = "Node ID cannot be null";
-    private static final String DEVICE_ID_NULL = "Device ID cannot be null";;
+    private static final String DEVICE_ID_NULL = "Device ID cannot be null";
+    private static final int WAIT_BEFORE_MASTERSHIP_HANDOFF_MILLIS = 3000;
 
     public static final StoreSerializer SERIALIZER = new KryoSerializer() {
         @Override
@@ -119,7 +124,11 @@
     @Activate
     public void activate() {
         messageHandlingExecutor =
-                Executors.newSingleThreadExecutor(groupedThreads("onos/store/device/mastership", "message-handler"));
+                Executors.newSingleThreadExecutor(
+                        groupedThreads("onos/store/device/mastership", "message-handler"));
+        transferExecutor =
+                Executors.newSingleThreadScheduledExecutor(
+                        groupedThreads("onos/store/device/mastership", "mastership-transfer-executor"));
         clusterCommunicator.<DeviceId, MastershipRole>addSubscriber(ROLE_QUERY_SUBJECT,
                 SERIALIZER::decode,
                 deviceId -> getRole(localNodeId, deviceId),
@@ -127,7 +136,7 @@
                 messageHandlingExecutor);
         clusterCommunicator.<DeviceId, MastershipEvent>addSubscriber(ROLE_RELINQUISH_SUBJECT,
                 SERIALIZER::decode,
-                deviceId -> relinquishRole(localNodeId, deviceId),
+                this::relinquishLocalRole,
                 SERIALIZER::encode,
                 messageHandlingExecutor);
         clusterCommunicator.addSubscriber(TRANSITION_FROM_MASTER_TO_STANDBY_SUBJECT,
@@ -147,6 +156,7 @@
         clusterCommunicator.removeSubscriber(ROLE_RELINQUISH_SUBJECT);
         clusterCommunicator.removeSubscriber(TRANSITION_FROM_MASTER_TO_STANDBY_SUBJECT);
         messageHandlingExecutor.shutdown();
+        transferExecutor.shutdown();
         leadershipService.removeListener(leadershipEventListener);
 
         log.info("Stoppped.");
@@ -246,26 +256,36 @@
     }
 
     @Override
-    public MastershipEvent setMaster(NodeId nodeId, DeviceId deviceId) {
+    public CompletableFuture<MastershipEvent> setMaster(NodeId nodeId, DeviceId deviceId) {
         checkArgument(nodeId != null, NODE_ID_NULL);
         checkArgument(deviceId != null, DEVICE_ID_NULL);
 
         NodeId currentMaster = getMaster(deviceId);
         if (nodeId.equals(currentMaster)) {
-            return null;
+            return CompletableFuture.completedFuture(null);
         } else {
             String leadershipTopic = createDeviceMastershipTopic(deviceId);
             List<NodeId> candidates = leadershipService.getCandidates(leadershipTopic);
             if (candidates.isEmpty()) {
-                return null;
+                return  CompletableFuture.completedFuture(null);
             }
             if (leadershipService.makeTopCandidate(leadershipTopic, nodeId)) {
-                return transitionFromMasterToStandby(deviceId);
+                CompletableFuture<MastershipEvent> result = new CompletableFuture<>();
+                // There is a brief wait before we step down from mastership.
+                // This is to ensure any work that happens when standby preference
+                // order changes can complete. For example: flow entries need to be backed
+                // to the new top standby (ONOS-1883)
+                // FIXME: This potentially introduces a race-condition.
+                // Right now role changes are only forced via CLI.
+                transferExecutor.schedule(() -> {
+                    result.complete(transitionFromMasterToStandby(deviceId));
+                }, WAIT_BEFORE_MASTERSHIP_HANDOFF_MILLIS, TimeUnit.MILLISECONDS);
+                return result;
             } else {
                 log.warn("Failed to promote {} to mastership for {}", nodeId, deviceId);
             }
         }
-        return null;
+        return CompletableFuture.completedFuture(null);
     }
 
     @Override
@@ -278,13 +298,13 @@
     }
 
     @Override
-    public MastershipEvent setStandby(NodeId nodeId, DeviceId deviceId) {
+    public CompletableFuture<MastershipEvent> setStandby(NodeId nodeId, DeviceId deviceId) {
         checkArgument(nodeId != null, NODE_ID_NULL);
         checkArgument(deviceId != null, DEVICE_ID_NULL);
 
         NodeId currentMaster = getMaster(deviceId);
         if (!nodeId.equals(currentMaster)) {
-            return null;
+            return CompletableFuture.completedFuture(null);
         }
 
         String leadershipTopic = createDeviceMastershipTopic(deviceId);
@@ -304,20 +324,25 @@
     }
 
     @Override
-    public MastershipEvent relinquishRole(NodeId nodeId, DeviceId deviceId) {
+    public CompletableFuture<MastershipEvent> relinquishRole(NodeId nodeId, DeviceId deviceId) {
         checkArgument(nodeId != null, NODE_ID_NULL);
         checkArgument(deviceId != null, DEVICE_ID_NULL);
 
         if (!nodeId.equals(localNodeId)) {
             log.debug("Forwarding request to relinquish "
                     + "role for device {} to {}", deviceId, nodeId);
-            return futureGetOrElse(clusterCommunicator.sendAndReceive(
+            return clusterCommunicator.sendAndReceive(
                     deviceId,
                     ROLE_RELINQUISH_SUBJECT,
                     SERIALIZER::encode,
                     SERIALIZER::decode,
-                    nodeId), null);
+                    nodeId);
         }
+        return CompletableFuture.completedFuture(relinquishLocalRole(deviceId));
+    }
+
+    private MastershipEvent relinquishLocalRole(DeviceId deviceId) {
+        checkArgument(deviceId != null, DEVICE_ID_NULL);
 
         // Check if this node is can be managed by this node.
         if (!connectedDevices.contains(deviceId)) {