Avoids delete of dataplane state during mastership change

Leftover in the flow stats creates duplicate flow stats entry.
These entries were considered as flows not in the store and thus removed

Additionally adds further guards during the processing of the stats and
updates unit tests

Change-Id: Iba07996e1413c54374b7a4ce7efd21109b429eeb
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java
index f3206c3..ff89a6f 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java
@@ -40,10 +40,12 @@
 import org.onosproject.cluster.ClusterService;
 import org.onosproject.cluster.NodeId;
 import org.onosproject.net.DeviceId;
+import org.onosproject.net.device.DeviceService;
 import org.onosproject.net.flow.FlowEntry;
 import org.onosproject.net.flow.FlowId;
 import org.onosproject.net.flow.FlowRule;
 import org.onosproject.net.flow.StoredFlowEntry;
+import org.onosproject.net.flow.FlowRuleStoreException;
 import org.onosproject.store.LogicalTimestamp;
 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
 import org.onosproject.store.cluster.messaging.MessageSubject;
@@ -85,6 +87,7 @@
 
     private final DeviceId deviceId;
     private final ClusterCommunicationService clusterCommunicator;
+    private final DeviceService deviceService;
     private final LifecycleManager lifecycleManager;
     private final ScheduledExecutorService scheduler;
     private final Executor executor;
@@ -117,6 +120,7 @@
         ClusterService clusterService,
         ClusterCommunicationService clusterCommunicator,
         LifecycleManager lifecycleManager,
+        DeviceService deviceService,
         ScheduledExecutorService scheduler,
         Executor executor,
         long backupPeriod,
@@ -124,6 +128,7 @@
         this.deviceId = deviceId;
         this.clusterCommunicator = clusterCommunicator;
         this.lifecycleManager = lifecycleManager;
+        this.deviceService = deviceService;
         this.scheduler = scheduler;
         this.executor = executor;
         this.localNodeId = clusterService.getLocalNode().id();
@@ -247,6 +252,8 @@
                 SERIALIZER::decode,
                 replicaInfo.master(),
                 Duration.ofSeconds(GET_FLOW_ENTRIES_TIMEOUT));
+        } else if (deviceService.isAvailable(deviceId)) {
+            throw new FlowRuleStoreException("There is no master for available device " + deviceId);
         } else {
             return CompletableFuture.completedFuture(Collections.emptySet());
         }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
index b4e80a8..7949dc5 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
@@ -697,6 +697,7 @@
                 clusterService,
                 clusterCommunicator,
                 new InternalLifecycleManager(id),
+                deviceService,
                 backupScheduler,
                 new OrderedExecutor(backupExecutor),
                 backupPeriod,
@@ -734,6 +735,7 @@
                 clusterService,
                 clusterCommunicator,
                 new InternalLifecycleManager(deviceId),
+                deviceService,
                 backupScheduler,
                 new OrderedExecutor(backupExecutor),
                 backupPeriod,
diff --git a/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java b/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
index 23c1c9f..87355d6 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
@@ -145,6 +145,8 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected DriverService driverService;
 
+    private NodeId local;
+
     private ScheduledExecutorService executor;
     private Consumer<Status> statusChangeListener;
     // Per device group table with (device id + app cookie) as key
@@ -250,6 +252,8 @@
         groupTopic = getOrCreateGroupTopic(serializer);
         groupTopic.subscribe(this::processGroupMessage);
 
+        local = clusterService.getLocalNode().id();
+
         log.info("Started");
     }
 
@@ -1390,6 +1394,7 @@
                 Sets.newHashSet(getStoredGroups(deviceId));
         Set<Group> extraneousStoredEntries =
                 Sets.newHashSet(getExtraneousGroups(deviceId));
+        NodeId master;
 
         if (log.isTraceEnabled()) {
             log.trace("pushGroupMetrics: Displaying all ({}) southboundGroupEntries for device {}",
@@ -1409,7 +1414,14 @@
 
         garbageCollect(deviceId, southboundGroupEntries, storedGroupEntries);
 
+        // update stats
         for (Iterator<Group> it2 = southboundGroupEntries.iterator(); it2.hasNext();) {
+            // Mastership change can occur during this iteration
+            master = mastershipService.getMasterFor(deviceId);
+            if (!Objects.equals(local, master)) {
+                log.warn("Tried to update the group stats while the node was not the master");
+                return;
+            }
             Group group = it2.next();
             if (storedGroupEntries.remove(group)) {
                 // we both have the group, let's update some info then.
@@ -1420,6 +1432,8 @@
                 it2.remove();
             }
         }
+
+        // extraneous groups in the dataplane
         for (Group group : southboundGroupEntries) {
             if (getGroup(group.deviceId(), group.id()) != null) {
                 // There is a group existing with the same id
@@ -1432,6 +1446,12 @@
                                      + "not present in key based table");
                 }
             } else {
+                // Mastership change can occur during this iteration
+                master = mastershipService.getMasterFor(deviceId);
+                if (!Objects.equals(local, master)) {
+                    log.warn("Tried to process extraneous groups while the node was not the master");
+                    return;
+                }
                 // there are groups in the switch that aren't in the store
                 log.debug("Group AUDIT: extraneous group {} exists in data plane for device {}",
                           group.id(), deviceId);
@@ -1443,13 +1463,29 @@
                 }
             }
         }
+
+        // missing groups in the dataplane
         for (StoredGroupEntry group : storedGroupEntries) {
+            // Mastership change can occur during this iteration
+            master = mastershipService.getMasterFor(deviceId);
+            if (!Objects.equals(local, master)) {
+                log.warn("Tried to process missing groups while the node was not the master");
+                return;
+            }
             // there are groups in the store that aren't in the switch
             log.debug("Group AUDIT: group {} missing in data plane for device {}",
                       group.id(), deviceId);
             groupMissing(group);
         }
+
+        // extraneous groups in the store
         for (Group group : extraneousStoredEntries) {
+            // Mastership change can occur during this iteration
+            master = mastershipService.getMasterFor(deviceId);
+            if (!Objects.equals(local, master)) {
+                log.warn("Tried to process node extraneous groups while the node was not the master");
+                return;
+            }
             // there are groups in the extraneous store that
             // aren't in the switch
             log.debug("Group AUDIT: clearing extraneous group {} from store for device {}",
@@ -1481,8 +1517,15 @@
             return;
         }
 
+        NodeId master;
         Iterator<StoredGroupEntry> it = storedGroupEntries.iterator();
         while (it.hasNext()) {
+            // Mastership change can occur during this iteration
+            master = mastershipService.getMasterFor(deviceId);
+            if (!Objects.equals(local, master)) {
+                log.warn("Tried to run garbage collector while the node was not the master");
+                return;
+            }
             StoredGroupEntry group = it.next();
             if (group.state() != GroupState.PENDING_DELETE && checkGroupRefCount(group)) {
                 log.debug("Garbage collecting group {} on {}", group, deviceId);