Avoids delete of dataplane state during mastership change

Leftover in the flow stats creates duplicate flow stats entry.
These entries were considered as flows not in the store and thus removed

Additionally adds further guards during the processing of the stats and
updates unit tests

Change-Id: I242009f560660fb572a633f153edf9798256bfd0
diff --git a/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java b/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java
index bdc48ac..d601d8d 100644
--- a/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java
+++ b/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java
@@ -23,6 +23,8 @@
 import com.google.common.collect.Sets;
 import org.onlab.util.Tools;
 import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.NodeId;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreService;
 import org.onosproject.core.IdGenerator;
@@ -70,6 +72,7 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -148,6 +151,8 @@
 
     private final Map<Long, FlowOperationsProcessor> pendingFlowOperations = new ConcurrentHashMap<>();
 
+    private NodeId local;
+
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected FlowRuleStore store;
 
@@ -166,6 +171,9 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected DriverService driverService;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected ClusterService clusterService;
+
     @Activate
     public void activate(ComponentContext context) {
         store.setDelegate(delegate);
@@ -174,6 +182,7 @@
         cfgService.registerProperties(getClass());
         modified(context);
         idGenerator = coreService.getIdGenerator(FLOW_OP_TOPIC);
+        local = clusterService.getLocalNode().id();
         log.info("Started");
     }
 
@@ -492,7 +501,7 @@
             log.debug("Flow {} is on switch but not in store.", flowRule);
         }
 
-        private void flowAdded(FlowEntry flowEntry) {
+        private boolean flowAdded(FlowEntry flowEntry) {
             checkNotNull(flowEntry, FLOW_RULE_NULL);
             checkValidity();
 
@@ -500,6 +509,7 @@
                 FlowRuleEvent event = store.addOrUpdateFlowRule(flowEntry);
                 if (event == null) {
                     log.debug("No flow store event generated.");
+                    return false;
                 } else {
                     log.trace("Flow {} {}", flowEntry, event.type());
                     post(event);
@@ -508,6 +518,7 @@
                 log.debug("Removing flow rules....");
                 removeFlowRules(flowEntry);
             }
+            return true;
         }
 
         private boolean checkRuleLiveness(FlowEntry swRule, FlowEntry storedRule) {
@@ -565,15 +576,32 @@
                                              boolean useMissingFlow) {
             Map<FlowEntry, FlowEntry> storedRules = Maps.newHashMap();
             store.getFlowEntries(deviceId).forEach(f -> storedRules.put(f, f));
+            NodeId master;
+            boolean done;
 
+            // Processing flow rules
             for (FlowEntry rule : flowEntries) {
                 try {
                     FlowEntry storedRule = storedRules.remove(rule);
                     if (storedRule != null) {
                         if (storedRule.exactMatch(rule)) {
                             // we both have the rule, let's update some info then.
-                            flowAdded(rule);
+                            done = flowAdded(rule);
+                            if (!done) {
+                                // Mastership change can occur during this iteration
+                                master = mastershipService.getMasterFor(deviceId);
+                                if (!Objects.equals(local, master)) {
+                                    log.warn("Tried to update the flow stats while the node was not the master");
+                                    return;
+                                }
+                            }
                         } else {
+                            // Mastership change can occur during this iteration
+                            master = mastershipService.getMasterFor(deviceId);
+                            if (!Objects.equals(local, master)) {
+                                log.warn("Tried to update the flows while the node was not the master");
+                                return;
+                            }
                             // the two rules are not an exact match - remove the
                             // switch's rule and install our rule
                             extraneousFlow(rule);
@@ -582,9 +610,23 @@
                     } else {
                         // the device has a rule the store does not have
                         if (!allowExtraneousRules) {
+                            // Mastership change can occur during this iteration
+                            master = mastershipService.getMasterFor(deviceId);
+                            if (!Objects.equals(local, master)) {
+                                log.warn("Tried to remove flows while the node was not the master");
+                                return;
+                            }
                             extraneousFlow(rule);
                         } else if (importExtraneousRules) { // Stores the rule, if so is indicated
-                            store.addOrUpdateFlowRule(rule);
+                            FlowRuleEvent flowRuleEvent = store.addOrUpdateFlowRule(rule);
+                            if (flowRuleEvent == null) {
+                                // Mastership change can occur during this iteration
+                                master = mastershipService.getMasterFor(deviceId);
+                                if (!Objects.equals(local, master)) {
+                                    log.warn("Tried to import flows while the node was not the master");
+                                    return;
+                                }
+                            }
                         }
                     }
                 } catch (Exception e) {
@@ -596,6 +638,12 @@
             // DO NOT reinstall
             if (useMissingFlow) {
                 for (FlowEntry rule : storedRules.keySet()) {
+                    // Mastership change can occur during this iteration
+                    master = mastershipService.getMasterFor(deviceId);
+                    if (!Objects.equals(local, master)) {
+                        log.warn("Tried to install missing rules while the node was not the master");
+                        return;
+                    }
                     try {
                         // there are rules in the store that aren't on the switch
                         log.debug("Adding the rule that is present in store but not on switch : {}", rule);