ONOS-1883: Fix for lost flow rules on CLI directed mastership changes.
- Made all mastership role change operations asynchronous in the API, since they are inherently asynchronous.
- In flowrule store we now check to see if any new backups need to be made when a device backup location (standby) changes
- In device mastership store we now wait briefly before we step down from mastership after promoting a new candidate as highest standby

Change-Id: Icb76cf4d0d23403053a3fd5a458a940b847da49f
diff --git a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
index 5bc55af..862b3cb 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
@@ -25,8 +25,11 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
@@ -98,11 +101,13 @@
             Pattern.compile("device:(.*)");
 
     private ExecutorService messageHandlingExecutor;
+    private ScheduledExecutorService transferExecutor;
     private final LeadershipEventListener leadershipEventListener =
             new InternalDeviceMastershipEventListener();
 
     private static final String NODE_ID_NULL = "Node ID cannot be null";
-    private static final String DEVICE_ID_NULL = "Device ID cannot be null";;
+    private static final String DEVICE_ID_NULL = "Device ID cannot be null";
+    private static final int WAIT_BEFORE_MASTERSHIP_HANDOFF_MILLIS = 3000;
 
     public static final StoreSerializer SERIALIZER = new KryoSerializer() {
         @Override
@@ -119,7 +124,11 @@
     @Activate
     public void activate() {
         messageHandlingExecutor =
-                Executors.newSingleThreadExecutor(groupedThreads("onos/store/device/mastership", "message-handler"));
+                Executors.newSingleThreadExecutor(
+                        groupedThreads("onos/store/device/mastership", "message-handler"));
+        transferExecutor =
+                Executors.newSingleThreadScheduledExecutor(
+                        groupedThreads("onos/store/device/mastership", "mastership-transfer-executor"));
         clusterCommunicator.<DeviceId, MastershipRole>addSubscriber(ROLE_QUERY_SUBJECT,
                 SERIALIZER::decode,
                 deviceId -> getRole(localNodeId, deviceId),
@@ -127,7 +136,7 @@
                 messageHandlingExecutor);
         clusterCommunicator.<DeviceId, MastershipEvent>addSubscriber(ROLE_RELINQUISH_SUBJECT,
                 SERIALIZER::decode,
-                deviceId -> relinquishRole(localNodeId, deviceId),
+                this::relinquishLocalRole,
                 SERIALIZER::encode,
                 messageHandlingExecutor);
         clusterCommunicator.addSubscriber(TRANSITION_FROM_MASTER_TO_STANDBY_SUBJECT,
@@ -147,6 +156,7 @@
         clusterCommunicator.removeSubscriber(ROLE_RELINQUISH_SUBJECT);
         clusterCommunicator.removeSubscriber(TRANSITION_FROM_MASTER_TO_STANDBY_SUBJECT);
         messageHandlingExecutor.shutdown();
+        transferExecutor.shutdown();
         leadershipService.removeListener(leadershipEventListener);
 
         log.info("Stoppped.");
@@ -246,26 +256,36 @@
     }
 
     @Override
-    public MastershipEvent setMaster(NodeId nodeId, DeviceId deviceId) {
+    public CompletableFuture<MastershipEvent> setMaster(NodeId nodeId, DeviceId deviceId) {
         checkArgument(nodeId != null, NODE_ID_NULL);
         checkArgument(deviceId != null, DEVICE_ID_NULL);
 
         NodeId currentMaster = getMaster(deviceId);
         if (nodeId.equals(currentMaster)) {
-            return null;
+            return CompletableFuture.completedFuture(null);
         } else {
             String leadershipTopic = createDeviceMastershipTopic(deviceId);
             List<NodeId> candidates = leadershipService.getCandidates(leadershipTopic);
             if (candidates.isEmpty()) {
-                return null;
+                return  CompletableFuture.completedFuture(null);
             }
             if (leadershipService.makeTopCandidate(leadershipTopic, nodeId)) {
-                return transitionFromMasterToStandby(deviceId);
+                CompletableFuture<MastershipEvent> result = new CompletableFuture<>();
+                // There is brief wait before we step down from mastership.
+                // This is to ensure any work that happens when standby preference
+                // order changes can complete. For example: flow entries need to be backed
+                // to the new top standby (ONOS-1883)
+                // FIXME: This potentially introduces a race-condition.
+                // Right now role changes are only forced via CLI.
+                transferExecutor.schedule(() -> {
+                    result.complete(transitionFromMasterToStandby(deviceId));
+                }, WAIT_BEFORE_MASTERSHIP_HANDOFF_MILLIS, TimeUnit.MILLISECONDS);
+                return result;
             } else {
                 log.warn("Failed to promote {} to mastership for {}", nodeId, deviceId);
             }
         }
-        return null;
+        return CompletableFuture.completedFuture(null);
     }
 
     @Override
@@ -278,13 +298,13 @@
     }
 
     @Override
-    public MastershipEvent setStandby(NodeId nodeId, DeviceId deviceId) {
+    public CompletableFuture<MastershipEvent> setStandby(NodeId nodeId, DeviceId deviceId) {
         checkArgument(nodeId != null, NODE_ID_NULL);
         checkArgument(deviceId != null, DEVICE_ID_NULL);
 
         NodeId currentMaster = getMaster(deviceId);
         if (!nodeId.equals(currentMaster)) {
-            return null;
+            return CompletableFuture.completedFuture(null);
         }
 
         String leadershipTopic = createDeviceMastershipTopic(deviceId);
@@ -304,20 +324,25 @@
     }
 
     @Override
-    public MastershipEvent relinquishRole(NodeId nodeId, DeviceId deviceId) {
+    public CompletableFuture<MastershipEvent> relinquishRole(NodeId nodeId, DeviceId deviceId) {
         checkArgument(nodeId != null, NODE_ID_NULL);
         checkArgument(deviceId != null, DEVICE_ID_NULL);
 
         if (!nodeId.equals(localNodeId)) {
             log.debug("Forwarding request to relinquish "
                     + "role for device {} to {}", deviceId, nodeId);
-            return futureGetOrElse(clusterCommunicator.sendAndReceive(
+            return clusterCommunicator.sendAndReceive(
                     deviceId,
                     ROLE_RELINQUISH_SUBJECT,
                     SERIALIZER::encode,
                     SERIALIZER::decode,
-                    nodeId), null);
+                    nodeId);
         }
+        return CompletableFuture.completedFuture(relinquishLocalRole(deviceId));
+    }
+
+    private MastershipEvent relinquishLocalRole(DeviceId deviceId) {
+        checkArgument(deviceId != null, DEVICE_ID_NULL);
 
         // Check if this node is can be managed by this node.
         if (!connectedDevices.contains(deviceId)) {
diff --git a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/DistributedMastershipStore.java b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/DistributedMastershipStore.java
index b1e3ac2..d6a857c 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/DistributedMastershipStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/DistributedMastershipStore.java
@@ -25,6 +25,7 @@
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
@@ -130,7 +131,7 @@
     }
 
     @Override
-    public MastershipEvent setMaster(NodeId newMaster, DeviceId deviceId) {
+    public CompletableFuture<MastershipEvent> setMaster(NodeId newMaster, DeviceId deviceId) {
 
         roleMap.lock(deviceId);
         try {
@@ -147,7 +148,7 @@
                         log.warn("{} was in both MASTER and STANDBY for {}", newMaster, deviceId);
                         // trigger BACKUPS_CHANGED?
                     }
-                    return null;
+                    return CompletableFuture.completedFuture(null);
                 case STANDBY:
                 case NONE:
                     final NodeId currentMaster = rv.get(MASTER);
@@ -163,10 +164,11 @@
                     rv.reassign(newMaster, STANDBY, NONE);
                     updateTerm(deviceId);
                     roleMap.put(deviceId, rv);
-                    return new MastershipEvent(MASTER_CHANGED, deviceId, rv.roleInfo());
+                    return CompletableFuture.completedFuture(
+                            new MastershipEvent(MASTER_CHANGED, deviceId, rv.roleInfo()));
                 default:
                     log.warn("unknown Mastership Role {}", currentRole);
-                    return null;
+                    return CompletableFuture.completedFuture(null);
             }
         } finally {
             roleMap.unlock(deviceId);
@@ -282,7 +284,7 @@
     }
 
     @Override
-    public MastershipEvent setStandby(NodeId nodeId, DeviceId deviceId) {
+    public CompletableFuture<MastershipEvent> setStandby(NodeId nodeId, DeviceId deviceId) {
         // if nodeId was MASTER, rotate STANDBY
         // if nodeId was STANDBY no-op
         // if nodeId was NONE, add to STANDBY
@@ -298,30 +300,33 @@
                     updateTerm(deviceId);
                     if (newMaster != null) {
                         roleMap.put(deviceId, rv);
-                        return new MastershipEvent(MASTER_CHANGED, deviceId, rv.roleInfo());
+                        return CompletableFuture.completedFuture(
+                                new MastershipEvent(MASTER_CHANGED, deviceId, rv.roleInfo()));
                     } else {
                         // no master candidate
                         roleMap.put(deviceId, rv);
                         // TBD: Should there be new event type for no MASTER?
-                        return new MastershipEvent(MASTER_CHANGED, deviceId, rv.roleInfo());
+                        return CompletableFuture.completedFuture(
+                                new MastershipEvent(MASTER_CHANGED, deviceId, rv.roleInfo()));
                     }
                 case STANDBY:
-                    return null;
+                    return CompletableFuture.completedFuture(null);
                 case NONE:
                     rv.reassign(nodeId, NONE, STANDBY);
                     roleMap.put(deviceId, rv);
-                    return new MastershipEvent(BACKUPS_CHANGED, deviceId, rv.roleInfo());
+                    return CompletableFuture.completedFuture(
+                            new MastershipEvent(BACKUPS_CHANGED, deviceId, rv.roleInfo()));
                 default:
                     log.warn("unknown Mastership Role {}", currentRole);
             }
-            return null;
+            return CompletableFuture.completedFuture(null);
         } finally {
             roleMap.unlock(deviceId);
         }
     }
 
     @Override
-    public MastershipEvent relinquishRole(NodeId nodeId, DeviceId deviceId) {
+    public CompletableFuture<MastershipEvent> relinquishRole(NodeId nodeId, DeviceId deviceId) {
         // relinquishRole is basically set to None
 
         // If nodeId was master reelect next and remove nodeId
@@ -337,13 +342,14 @@
                     if (newMaster != null) {
                         updateTerm(deviceId);
                         roleMap.put(deviceId, rv);
-                        return new MastershipEvent(MASTER_CHANGED, deviceId, rv.roleInfo());
+                        return CompletableFuture.completedFuture(
+                                new MastershipEvent(MASTER_CHANGED, deviceId, rv.roleInfo()));
                     } else {
                         // No master candidate - no more backups, device is likely
                         // fully disconnected
                         roleMap.put(deviceId, rv);
                         // Should there be new event type?
-                        return null;
+                        return CompletableFuture.completedFuture(null);
                     }
                 case STANDBY:
                     //fall through to reinforce relinquishment
@@ -351,13 +357,14 @@
                     boolean modified = rv.reassign(nodeId, STANDBY, NONE);
                     if (modified) {
                         roleMap.put(deviceId, rv);
-                        return new MastershipEvent(BACKUPS_CHANGED, deviceId, rv.roleInfo());
+                        return CompletableFuture.completedFuture(
+                                new MastershipEvent(BACKUPS_CHANGED, deviceId, rv.roleInfo()));
                     }
-                    return null;
+                    return CompletableFuture.completedFuture(null);
                 default:
                 log.warn("unknown Mastership Role {}", currentRole);
             }
-            return null;
+            return CompletableFuture.completedFuture(null);
         } finally {
             roleMap.unlock(deviceId);
         }
@@ -374,10 +381,11 @@
             if (roleValue.contains(MASTER, nodeId) ||
                 roleValue.contains(STANDBY, nodeId)) {
 
-                MastershipEvent event = relinquishRole(nodeId, deviceId);
-                if (event != null) {
-                    events.add(event);
-                }
+                relinquishRole(nodeId, deviceId).whenComplete((event, error) -> {
+                    if (event != null) {
+                        events.add(event);
+                    }
+                });
             }
         }
         notifyDelegate(events);