Avoids delete of dataplane state during mastership change

Leftover in the flow stats creates duplicate flow stats entry.
These entries were considered as flows not in the store and thus removed

Additionally adds further guards during the processing of the stats and
updates unit tests

Change-Id: Iba07996e1413c54374b7a4ce7efd21109b429eeb
diff --git a/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java b/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java
index d8cdcbd..a5d535e 100644
--- a/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java
+++ b/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java
@@ -31,6 +31,8 @@
 import org.apache.felix.scr.annotations.Service;
 import org.onlab.util.Tools;
 import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.NodeId;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreService;
 import org.onosproject.core.IdGenerator;
@@ -72,6 +74,7 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -133,6 +136,8 @@
 
     private final Map<Long, FlowOperationsProcessor> pendingFlowOperations = new ConcurrentHashMap<>();
 
+    private NodeId local;
+
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected FlowRuleStore store;
 
@@ -151,6 +156,9 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected DriverService driverService;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ClusterService clusterService;
+
     @Activate
     public void activate(ComponentContext context) {
         store.setDelegate(delegate);
@@ -159,6 +167,7 @@
         cfgService.registerProperties(getClass());
         modified(context);
         idGenerator = coreService.getIdGenerator(FLOW_OP_TOPIC);
+        local = clusterService.getLocalNode().id();
         log.info("Started");
     }
 
@@ -464,7 +473,7 @@
             log.debug("Flow {} is on switch but not in store.", flowRule);
         }
 
-        private void flowAdded(FlowEntry flowEntry) {
+        private boolean flowAdded(FlowEntry flowEntry) {
             checkNotNull(flowEntry, FLOW_RULE_NULL);
             checkValidity();
 
@@ -472,6 +481,7 @@
                 FlowRuleEvent event = store.addOrUpdateFlowRule(flowEntry);
                 if (event == null) {
                     log.debug("No flow store event generated.");
+                    return false;
                 } else {
                     log.trace("Flow {} {}", flowEntry, event.type());
                     post(event);
@@ -480,6 +490,7 @@
                 log.debug("Removing flow rules....");
                 removeFlowRules(flowEntry);
             }
+            return true;
         }
 
         private boolean checkRuleLiveness(FlowEntry swRule, FlowEntry storedRule) {
@@ -537,15 +548,32 @@
                                              boolean useMissingFlow) {
             Map<FlowEntry, FlowEntry> storedRules = Maps.newHashMap();
             store.getFlowEntries(deviceId).forEach(f -> storedRules.put(f, f));
+            NodeId master;
+            boolean done;
 
+            // Processing flow rules
             for (FlowEntry rule : flowEntries) {
                 try {
                     FlowEntry storedRule = storedRules.remove(rule);
                     if (storedRule != null) {
                         if (storedRule.exactMatch(rule)) {
                             // we both have the rule, let's update some info then.
-                            flowAdded(rule);
+                            done = flowAdded(rule);
+                            if (!done) {
+                                // Mastership change can occur during this iteration
+                                master = mastershipService.getMasterFor(deviceId);
+                                if (!Objects.equals(local, master)) {
+                                    log.warn("Tried to update the flow stats while the node was not the master");
+                                    return;
+                                }
+                            }
                         } else {
+                            // Mastership change can occur during this iteration
+                            master = mastershipService.getMasterFor(deviceId);
+                            if (!Objects.equals(local, master)) {
+                                log.warn("Tried to update the flows while the node was not the master");
+                                return;
+                            }
                             // the two rules are not an exact match - remove the
                             // switch's rule and install our rule
                             extraneousFlow(rule);
@@ -554,6 +582,12 @@
                     } else {
                         // the device has a rule the store does not have
                         if (!allowExtraneousRules) {
+                            // Mastership change can occur during this iteration
+                            master = mastershipService.getMasterFor(deviceId);
+                            if (!Objects.equals(local, master)) {
+                                log.warn("Tried to remove flows while the node was not the master");
+                                return;
+                            }
                             extraneousFlow(rule);
                         }
                     }
@@ -566,6 +600,12 @@
             // DO NOT reinstall
             if (useMissingFlow) {
                 for (FlowEntry rule : storedRules.keySet()) {
+                    // Mastership change can occur during this iteration
+                    master = mastershipService.getMasterFor(deviceId);
+                    if (!Objects.equals(local, master)) {
+                        log.warn("Tried to install missing rules while the node was not the master");
+                        return;
+                    }
                     try {
                         // there are rules in the store that aren't on the switch
                         log.debug("Adding the rule that is present in store but not on switch : {}", rule);
diff --git a/core/net/src/test/java/org/onosproject/net/flow/impl/FlowRuleManagerTest.java b/core/net/src/test/java/org/onosproject/net/flow/impl/FlowRuleManagerTest.java
index 3697641..7622409 100644
--- a/core/net/src/test/java/org/onosproject/net/flow/impl/FlowRuleManagerTest.java
+++ b/core/net/src/test/java/org/onosproject/net/flow/impl/FlowRuleManagerTest.java
@@ -25,7 +25,11 @@
 import org.junit.Before;
 import org.junit.Test;
 import org.onlab.junit.TestTools;
+import org.onlab.packet.Ip4Address;
 import org.onosproject.cfg.ComponentConfigAdapter;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.ControllerNode;
+import org.onosproject.cluster.NodeId;
 import org.onosproject.common.event.impl.TestEventDispatcher;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreServiceAdapter;
@@ -84,6 +88,7 @@
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
 
+import static org.easymock.EasyMock.*;
 import static org.junit.Assert.*;
 import static org.onosproject.net.NetTestTools.injectEventDispatcher;
 import static org.onosproject.net.flow.FlowRuleEvent.Type.*;
@@ -119,6 +124,7 @@
     private ApplicationId appId;
 
     private TestDriverManager driverService;
+    private static final String NODE_ID = "1";
 
     @Before
     public void setUp() {
@@ -131,6 +137,15 @@
         mgr.operationsService = MoreExecutors.newDirectExecutorService();
         mgr.deviceInstallers = MoreExecutors.newDirectExecutorService();
         mgr.cfgService = new ComponentConfigAdapter();
+
+        ClusterService mockClusterService = createMock(ClusterService.class);
+        NodeId nodeId = new NodeId(NODE_ID);
+        MockControllerNode mockControllerNode = new MockControllerNode(nodeId);
+        expect(mockClusterService.getLocalNode())
+                .andReturn(mockControllerNode).anyTimes();
+        replay(mockClusterService);
+        mgr.clusterService = mockClusterService;
+
         service = mgr;
         registry = mgr;
 
@@ -706,6 +721,11 @@
         public MastershipRole getLocalRole(DeviceId deviceId) {
             return MastershipRole.MASTER;
         }
+
+        @Override
+        public NodeId getMasterFor(DeviceId deviceId) {
+            return new NodeId(NODE_ID);
+        }
     }
 
     private class TestDriverManager extends DriverManager {
@@ -739,4 +759,27 @@
             return rules;
         }
     }
+
+    private static class MockControllerNode implements ControllerNode {
+        final NodeId id;
+
+        public MockControllerNode(NodeId id) {
+            this.id = id;
+        }
+
+        @Override
+        public NodeId id() {
+            return this.id;
+        }
+
+        @Override
+        public Ip4Address ip() {
+            return Ip4Address.valueOf("127.0.0.1");
+        }
+
+        @Override
+        public int tcpPort() {
+            return 0;
+        }
+    }
 }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java
index 1627053..3987df0 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java
@@ -39,6 +39,7 @@
 import org.onosproject.cluster.ClusterService;
 import org.onosproject.cluster.NodeId;
 import org.onosproject.net.DeviceId;
+import org.onosproject.net.device.DeviceService;
 import org.onosproject.net.flow.FlowEntry;
 import org.onosproject.net.flow.FlowId;
 import org.onosproject.net.flow.FlowRule;
@@ -83,6 +84,7 @@
 
     private final DeviceId deviceId;
     private final ClusterCommunicationService clusterCommunicator;
+    private final DeviceService deviceService;
     private final LifecycleManager lifecycleManager;
     private final ScheduledExecutorService scheduler;
     private final Executor executor;
@@ -115,6 +117,7 @@
         ClusterService clusterService,
         ClusterCommunicationService clusterCommunicator,
         LifecycleManager lifecycleManager,
+        DeviceService deviceService,
         ScheduledExecutorService scheduler,
         Executor executor,
         long backupPeriod,
@@ -122,6 +125,7 @@
         this.deviceId = deviceId;
         this.clusterCommunicator = clusterCommunicator;
         this.lifecycleManager = lifecycleManager;
+        this.deviceService = deviceService;
         this.scheduler = scheduler;
         this.executor = executor;
         this.localNodeId = clusterService.getLocalNode().id();
@@ -244,6 +248,8 @@
                 SERIALIZER::encode,
                 SERIALIZER::decode,
                 replicaInfo.master());
+        } else if (deviceService.isAvailable(deviceId)) {
+            throw new RuntimeException("There is no master for available device " + deviceId);
         } else {
             return CompletableFuture.completedFuture(Collections.emptySet());
         }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
index 884b74b..cb2e344 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
@@ -688,6 +688,7 @@
                 clusterService,
                 clusterCommunicator,
                 new InternalLifecycleManager(id),
+                deviceService,
                 backupScheduler,
                 new OrderedExecutor(backupExecutor),
                 backupPeriod,
@@ -725,6 +726,7 @@
                 clusterService,
                 clusterCommunicator,
                 new InternalLifecycleManager(deviceId),
+                deviceService,
                 backupScheduler,
                 new OrderedExecutor(backupExecutor),
                 backupPeriod,
diff --git a/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java b/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
index 6e8b126..cff63d1 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
@@ -137,6 +137,8 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected DriverService driverService;
 
+    private NodeId local;
+
     private ScheduledExecutorService executor;
     private Consumer<Status> statusChangeListener;
     // Per device group table with (device id + app cookie) as key
@@ -245,6 +247,8 @@
         groupTopic = getOrCreateGroupTopic(serializer);
         groupTopic.subscribe(this::processGroupMessage);
 
+        local = clusterService.getLocalNode().id();
+
         log.info("Started");
     }
 
@@ -1385,6 +1389,7 @@
                 Sets.newHashSet(getStoredGroups(deviceId));
         Set<Group> extraneousStoredEntries =
                 Sets.newHashSet(getExtraneousGroups(deviceId));
+        NodeId master;
 
         if (log.isTraceEnabled()) {
             log.trace("pushGroupMetrics: Displaying all ({}) southboundGroupEntries for device {}",
@@ -1404,7 +1409,14 @@
 
         garbageCollect(deviceId, southboundGroupEntries, storedGroupEntries);
 
+        // update stats
         for (Iterator<Group> it2 = southboundGroupEntries.iterator(); it2.hasNext();) {
+            // Mastership change can occur during this iteration
+            master = mastershipService.getMasterFor(deviceId);
+            if (!Objects.equals(local, master)) {
+                log.warn("Tried to update the group stats while the node was not the master");
+                return;
+            }
             Group group = it2.next();
             if (storedGroupEntries.remove(group)) {
                 // we both have the group, let's update some info then.
@@ -1415,6 +1427,8 @@
                 it2.remove();
             }
         }
+
+        // extraneous groups in the dataplane
         for (Group group : southboundGroupEntries) {
             if (getGroup(group.deviceId(), group.id()) != null) {
                 // There is a group existing with the same id
@@ -1427,6 +1441,12 @@
                                      + "not present in key based table");
                 }
             } else {
+                // Mastership change can occur during this iteration
+                master = mastershipService.getMasterFor(deviceId);
+                if (!Objects.equals(local, master)) {
+                    log.warn("Tried to process extraneous groups while the node was not the master");
+                    return;
+                }
                 // there are groups in the switch that aren't in the store
                 log.debug("Group AUDIT: extraneous group {} exists in data plane for device {}",
                           group.id(), deviceId);
@@ -1438,13 +1458,29 @@
                 }
             }
         }
+
+        // missing groups in the dataplane
         for (StoredGroupEntry group : storedGroupEntries) {
+            // Mastership change can occur during this iteration
+            master = mastershipService.getMasterFor(deviceId);
+            if (!Objects.equals(local, master)) {
+                log.warn("Tried to process missing groups while the node was not the master");
+                return;
+            }
             // there are groups in the store that aren't in the switch
             log.debug("Group AUDIT: group {} missing in data plane for device {}",
                       group.id(), deviceId);
             groupMissing(group);
         }
+
+        // extraneous groups in the store
         for (Group group : extraneousStoredEntries) {
+            // Mastership change can occur during this iteration
+            master = mastershipService.getMasterFor(deviceId);
+            if (!Objects.equals(local, master)) {
+                log.warn("Tried to process node extraneous groups while the node was not the master");
+                return;
+            }
             // there are groups in the extraneous store that
             // aren't in the switch
             log.debug("Group AUDIT: clearing extraneous group {} from store for device {}",
@@ -1476,8 +1512,15 @@
             return;
         }
 
+        NodeId master;
         Iterator<StoredGroupEntry> it = storedGroupEntries.iterator();
         while (it.hasNext()) {
+            // Mastership change can occur during this iteration
+            master = mastershipService.getMasterFor(deviceId);
+            if (!Objects.equals(local, master)) {
+                log.warn("Tried to run garbage collector while the node was not the master");
+                return;
+            }
             StoredGroupEntry group = it.next();
             if (group.state() != GroupState.PENDING_DELETE && checkGroupRefCount(group)) {
                 log.debug("Garbage collecting group {} on {}", group, deviceId);
diff --git a/core/store/dist/src/test/java/org/onosproject/store/group/impl/DistributedGroupStoreTest.java b/core/store/dist/src/test/java/org/onosproject/store/group/impl/DistributedGroupStoreTest.java
index 0a44359..83b8abc 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/group/impl/DistributedGroupStoreTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/group/impl/DistributedGroupStoreTest.java
@@ -22,7 +22,10 @@
 import org.junit.Before;
 import org.junit.Test;
 import org.onlab.junit.TestUtils;
+import org.onlab.packet.Ip4Address;
 import org.onosproject.cfg.ComponentConfigAdapter;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.ControllerNode;
 import org.onosproject.cluster.NodeId;
 import org.onosproject.core.GroupId;
 import org.onosproject.mastership.MastershipServiceAdapter;
@@ -54,6 +57,7 @@
 import java.util.LinkedList;
 import java.util.List;
 
+import static org.easymock.EasyMock.*;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.instanceOf;
@@ -125,7 +129,7 @@
 
         @Override
         public NodeId getMasterFor(DeviceId deviceId) {
-            return new NodeId("foo");
+            return new NodeId(NODE_ID);
         }
     }
 
@@ -141,6 +145,32 @@
         }
     }
 
+    private static class MockControllerNode implements ControllerNode {
+        final NodeId id;
+
+        public MockControllerNode(NodeId id) {
+            this.id = id;
+        }
+
+        @Override
+        public NodeId id() {
+            return this.id;
+        }
+
+        @Override
+        public Ip4Address ip() {
+            return Ip4Address.valueOf("127.0.0.1");
+        }
+
+        @Override
+        public int tcpPort() {
+            return 0;
+        }
+    }
+
+    private static final String NODE_ID = "foo";
+
+
     @Before
     public void setUp() throws Exception {
         groupStoreImpl = new DistributedGroupStore();
@@ -148,6 +178,15 @@
         groupStoreImpl.clusterCommunicator = new ClusterCommunicationServiceAdapter();
         groupStoreImpl.mastershipService = new MasterOfAll();
         groupStoreImpl.cfgService = new ComponentConfigAdapter();
+
+        ClusterService mockClusterService = createMock(ClusterService.class);
+        NodeId nodeId = new NodeId(NODE_ID);
+        MockControllerNode mockControllerNode = new MockControllerNode(nodeId);
+        expect(mockClusterService.getLocalNode())
+                .andReturn(mockControllerNode).anyTimes();
+        replay(mockClusterService);
+
+        groupStoreImpl.clusterService = mockClusterService;
         groupStoreImpl.activate(null);
         groupStore = groupStoreImpl;
         auditPendingReqQueue =
diff --git a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImpl.java b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImpl.java
index 9113e5c..3280ab5 100644
--- a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImpl.java
+++ b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImpl.java
@@ -709,11 +709,22 @@
                 }
                 log.info("Transitioned switch {} to MASTER", dpid);
                 activeMasterSwitches.put(dpid, sw);
+                // purge pending stats
+                log.info("Purged pending stats {}", dpid);
+                purgeStatsSwitch(dpid);
             } finally {
                 switchLock.unlock();
             }
         }
 
+        private void purgeStatsSwitch(Dpid dpid) {
+            fullFlowStats.removeAll(dpid);
+            fullFlowLightweightStats.removeAll(dpid);
+            fullTableStats.removeAll(dpid);
+            fullGroupStats.removeAll(dpid);
+            fullGroupDescStats.removeAll(dpid);
+            fullQueueStats.removeAll(dpid);
+        }
 
         @Override
         public void transitionToEqualSwitch(Dpid dpid) {
diff --git a/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java b/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java
index 07be6c9..19545b2 100644
--- a/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java
+++ b/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java
@@ -682,9 +682,9 @@
             NewAdaptiveFlowStatsCollector afsc = afsCollectors.get(dpid);
 
             if (adaptiveFlowSampling && afsc != null)  {
-                List<FlowEntry> flowEntries = replies.getEntries().stream()
+                Set<FlowEntry> flowEntries = replies.getEntries().stream()
                         .map(entry -> new FlowEntryBuilder(did, entry, handler).withSetAfsc(afsc).build())
-                        .collect(Collectors.toList());
+                        .collect(Collectors.toSet());
 
                 // Check that OFFlowStatsReply Xid is same with the one of OFFlowStatsRequest?
                 if (afsc.getFlowMissingXid() != NewAdaptiveFlowStatsCollector.NO_FLOW_MISSING_XID) {
@@ -703,9 +703,9 @@
                     providerService.pushFlowMetricsWithoutFlowMissing(did, flowEntries);
                 }
             } else {
-                List<FlowEntry> flowEntries = replies.getEntries().stream()
+                Set<FlowEntry> flowEntries = replies.getEntries().stream()
                         .map(entry -> new FlowEntryBuilder(did, entry, handler).build())
-                        .collect(Collectors.toList());
+                        .collect(Collectors.toSet());
 
                 // call existing entire flow stats update with flowMissing synchronization
                 providerService.pushFlowMetrics(did, flowEntries);
@@ -718,6 +718,7 @@
             List<TableStatisticsEntry> tableStatsEntries = replies.getEntries().stream()
                     .map(entry -> buildTableStatistics(did, entry))
                     .filter(Objects::nonNull)
+                    .distinct()
                     .collect(Collectors.toList());
             providerService.pushTableStatistics(did, tableStatsEntries);
         }
@@ -727,9 +728,9 @@
             DeviceId did = DeviceId.deviceId(Dpid.uri(dpid));
             NewAdaptiveFlowStatsCollector afsc = afsCollectors.get(dpid);
             if (adaptiveFlowSampling && afsc != null)  {
-                List<FlowEntry> flowEntries = replies.getEntries().stream()
+                Set<FlowEntry> flowEntries = replies.getEntries().stream()
                         .map(entry -> new FlowEntryBuilder(did, entry, driverService).withSetAfsc(afsc).build())
-                        .collect(Collectors.toList());
+                        .collect(Collectors.toSet());
 
                 // Check that OFFlowStatsReply Xid is same with the one of OFFlowStatsRequest?
                 if (afsc.getFlowMissingXid() != NewAdaptiveFlowStatsCollector.NO_FLOW_MISSING_XID) {
@@ -748,9 +749,9 @@
                     providerService.pushFlowMetricsWithoutFlowMissing(did, flowEntries);
                 }
             } else {
-                List<FlowEntry> flowEntries = replies.getEntries().stream()
+                Set<FlowEntry> flowEntries = replies.getEntries().stream()
                         .map(entry -> new FlowEntryBuilder(did, entry, driverService).build())
-                        .collect(Collectors.toList());
+                        .collect(Collectors.toSet());
                 // call existing entire flow stats update with flowMissing synchronization
                 providerService.pushFlowMetrics(did, flowEntries);
             }