Avoid deleting dataplane state during mastership change

Leftovers in the flow stats create duplicate flow stats entries.
These entries were considered as flows not in the store and thus removed.

Additionally, adds further guards during the processing of the stats and
updates the unit tests.

Change-Id: Iba07996e1413c54374b7a4ce7efd21109b429eeb
diff --git a/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java b/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
index 23c1c9f..87355d6 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
@@ -145,6 +145,8 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected DriverService driverService;
 
+    private NodeId local;
+
     private ScheduledExecutorService executor;
     private Consumer<Status> statusChangeListener;
     // Per device group table with (device id + app cookie) as key
@@ -250,6 +252,8 @@
         groupTopic = getOrCreateGroupTopic(serializer);
         groupTopic.subscribe(this::processGroupMessage);
 
+        local = clusterService.getLocalNode().id();
+
         log.info("Started");
     }
 
@@ -1390,6 +1394,7 @@
                 Sets.newHashSet(getStoredGroups(deviceId));
         Set<Group> extraneousStoredEntries =
                 Sets.newHashSet(getExtraneousGroups(deviceId));
+        NodeId master;
 
         if (log.isTraceEnabled()) {
             log.trace("pushGroupMetrics: Displaying all ({}) southboundGroupEntries for device {}",
@@ -1409,7 +1414,14 @@
 
         garbageCollect(deviceId, southboundGroupEntries, storedGroupEntries);
 
+        // update stats
         for (Iterator<Group> it2 = southboundGroupEntries.iterator(); it2.hasNext();) {
+            // Mastership change can occur during this iteration
+            master = mastershipService.getMasterFor(deviceId);
+            if (!Objects.equals(local, master)) {
+                log.warn("Tried to update the group stats while the node was not the master");
+                return;
+            }
             Group group = it2.next();
             if (storedGroupEntries.remove(group)) {
                 // we both have the group, let's update some info then.
@@ -1420,6 +1432,8 @@
                 it2.remove();
             }
         }
+
+        // extraneous groups in the dataplane
         for (Group group : southboundGroupEntries) {
             if (getGroup(group.deviceId(), group.id()) != null) {
                 // There is a group existing with the same id
@@ -1432,6 +1446,12 @@
                                      + "not present in key based table");
                 }
             } else {
+                // Mastership change can occur during this iteration
+                master = mastershipService.getMasterFor(deviceId);
+                if (!Objects.equals(local, master)) {
+                    log.warn("Tried to process extraneous groups while the node was not the master");
+                    return;
+                }
                 // there are groups in the switch that aren't in the store
                 log.debug("Group AUDIT: extraneous group {} exists in data plane for device {}",
                           group.id(), deviceId);
@@ -1443,13 +1463,29 @@
                 }
             }
         }
+
+        // missing groups in the dataplane
         for (StoredGroupEntry group : storedGroupEntries) {
+            // Mastership change can occur during this iteration
+            master = mastershipService.getMasterFor(deviceId);
+            if (!Objects.equals(local, master)) {
+                log.warn("Tried to process missing groups while the node was not the master");
+                return;
+            }
             // there are groups in the store that aren't in the switch
             log.debug("Group AUDIT: group {} missing in data plane for device {}",
                       group.id(), deviceId);
             groupMissing(group);
         }
+
+        // extraneous groups in the store
         for (Group group : extraneousStoredEntries) {
+            // Mastership change can occur during this iteration
+            master = mastershipService.getMasterFor(deviceId);
+            if (!Objects.equals(local, master)) {
+                log.warn("Tried to process node extraneous groups while the node was not the master");
+                return;
+            }
             // there are groups in the extraneous store that
             // aren't in the switch
             log.debug("Group AUDIT: clearing extraneous group {} from store for device {}",
@@ -1481,8 +1517,15 @@
             return;
         }
 
+        NodeId master;
         Iterator<StoredGroupEntry> it = storedGroupEntries.iterator();
         while (it.hasNext()) {
+            // Mastership change can occur during this iteration
+            master = mastershipService.getMasterFor(deviceId);
+            if (!Objects.equals(local, master)) {
+                log.warn("Tried to run garbage collector while the node was not the master");
+                return;
+            }
             StoredGroupEntry group = it.next();
             if (group.state() != GroupState.PENDING_DELETE && checkGroupRefCount(group)) {
                 log.debug("Garbage collecting group {} on {}", group, deviceId);