ONOS-1883: Fix for lost flow rules on CLI directed mastership changes.
- Made all mastership role change operations asynchronous, which they are.
- In the flow rule store we now check whether any new backups need to be made when a device's backup location (standby) changes.
- In the device mastership store we now wait briefly before stepping down from mastership after promoting a new candidate to highest standby.

Change-Id: Icb76cf4d0d23403053a3fd5a458a940b847da49f
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/NewDistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/NewDistributedFlowRuleStore.java
index e042982..8bfab64 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/NewDistributedFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/NewDistributedFlowRuleStore.java
@@ -62,6 +62,8 @@
 import org.onosproject.store.cluster.messaging.ClusterMessage;
 import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
 import org.onosproject.store.flow.ReplicaInfo;
+import org.onosproject.store.flow.ReplicaInfoEvent;
+import org.onosproject.store.flow.ReplicaInfoEventListener;
 import org.onosproject.store.flow.ReplicaInfoService;
 import org.onosproject.store.serializers.KryoSerializer;
 import org.onosproject.store.serializers.StoreSerializer;
@@ -181,6 +183,7 @@
         registerMessageHandlers(messageHandlingExecutor);
 
         if (backupEnabled) {
+            replicaInfoManager.addListener(flowTable);
             backupTask = backupSenderExecutor.scheduleWithFixedDelay(
                     flowTable::backup,
                     0,
@@ -193,6 +196,10 @@
 
     @Deactivate
     public void deactivate(ComponentContext context) {
+        if (backupEnabled) {
+            replicaInfoManager.removeListener(flowTable);
+            backupTask.cancel(true);
+        }
         configService.unregisterProperties(getClass(), false);
         unregisterMessageHandlers();
         messageHandlingExecutor.shutdownNow();
@@ -232,9 +239,14 @@
         boolean restartBackupTask = false;
         if (newBackupEnabled != backupEnabled) {
             backupEnabled = newBackupEnabled;
-            if (!backupEnabled && backupTask != null) {
-                backupTask.cancel(false);
-                backupTask = null;
+            if (!backupEnabled) {
+                replicaInfoManager.removeListener(flowTable);
+                if (backupTask != null) {
+                    backupTask.cancel(false);
+                    backupTask = null;
+                }
+            } else {
+                replicaInfoManager.addListener(flowTable);
             }
             restartBackupTask = backupEnabled;
         }
@@ -590,7 +602,7 @@
         }
     }
 
-    private class InternalFlowTable {
+    private class InternalFlowTable implements ReplicaInfoEventListener {
 
         private final ConcurrentMap<DeviceId, ConcurrentMap<FlowId, Set<StoredFlowEntry>>>
                 flowEntries = new ConcurrentHashMap<>();
@@ -603,6 +615,43 @@
             return NewConcurrentHashMap.<FlowId, Set<StoredFlowEntry>>ifNeeded();
         }
 
+        @Override
+        public void event(ReplicaInfoEvent event) {
+            if (event.type() == ReplicaInfoEvent.Type.BACKUPS_CHANGED) {
+                DeviceId deviceId = event.subject();
+                if (!Objects.equal(local, replicaInfoManager.getReplicaInfoFor(deviceId).master())) {
+                    // ignore since this event is for a device that this node does not manage.
+                    return;
+                }
+                NodeId latestBackupNode = getBackupNode(deviceId);
+                NodeId existingBackupNode = lastBackupNodes.get(deviceId);
+                if (Objects.equal(latestBackupNode, existingBackupNode)) {
+                    // ignore since backup location hasn't changed.
+                    return;
+                }
+                backupFlowEntries(latestBackupNode, Sets.newHashSet(deviceId));
+            }
+        }
+
+        private void backupFlowEntries(NodeId nodeId, Set<DeviceId> deviceIds) {
+            log.debug("Sending flowEntries for devices {} to {} as backup.", deviceIds, nodeId);
+            Map<DeviceId, ConcurrentMap<FlowId, Set<StoredFlowEntry>>> deviceFlowEntries =
+                    Maps.newConcurrentMap();
+            flowEntries.forEach((key, value) -> {
+                if (deviceIds.contains(key)) {
+                    deviceFlowEntries.put(key, value);
+                }
+            });
+            clusterCommunicator.unicast(deviceFlowEntries,
+                    FLOW_TABLE_BACKUP,
+                    SERIALIZER::encode,
+                    nodeId);
+            deviceIds.forEach(id -> {
+                lastBackupTimes.put(id, System.currentTimeMillis());
+                lastBackupNodes.put(id, nodeId);
+            });
+        }
+
         /**
          * Returns the flow table for specified device.
          *
@@ -662,7 +711,6 @@
             if (!backupEnabled) {
                 return;
             }
-            //TODO: Force backup when backups change.
             try {
                 // determine the set of devices that we need to backup during this run.
                 Set<DeviceId> devicesToBackup = mastershipService.getDevicesOf(local)
@@ -686,35 +734,15 @@
                                              .add(deviceId);
                     }
                 });
-
                 // send the device flow entries to their respective backup nodes
-                devicesToBackupByNode.forEach((nodeId, deviceIds) -> {
-                    Map<DeviceId, ConcurrentMap<FlowId, Set<StoredFlowEntry>>> deviceFlowEntries =
-                            Maps.newConcurrentMap();
-                    flowEntries.forEach((key, value) -> {
-                        if (deviceIds.contains(key)) {
-                            deviceFlowEntries.put(key, value);
-                        }
-                    });
-                    clusterCommunicator.unicast(deviceFlowEntries,
-                            FLOW_TABLE_BACKUP,
-                            SERIALIZER::encode,
-                            nodeId);
-                });
-
-                // update state for use in subsequent run.
-                devicesToBackupByNode.forEach((node, devices) -> {
-                    devices.forEach(id -> {
-                        lastBackupTimes.put(id, System.currentTimeMillis());
-                        lastBackupNodes.put(id, node);
-                    });
-                });
+                devicesToBackupByNode.forEach(this::backupFlowEntries);
             } catch (Exception e) {
                 log.error("Backup failed.", e);
             }
         }
 
         private void onBackupReceipt(Map<DeviceId, Map<FlowId, Set<StoredFlowEntry>>> flowTables) {
+            log.debug("Received flows for {} to backup", flowTables.keySet());
             Set<DeviceId> managedDevices = mastershipService.getDevicesOf(local);
             // Only process those devices are that not managed by the local node.
             Maps.filterKeys(flowTables, deviceId -> !managedDevices.contains(deviceId))