[SDFAB-1006] Purge groups and flows failed

With the persistence mastership in place the stats are no longer
discarded because of the lack of a master. This change introduces
further checks based on the availability. Additionally, it purges
the pending groups map which was leaking memory.

Change-Id: I6b0d127bf9a247d7981871f02847f6e2f4074182
diff --git a/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java b/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java
index d593e45..9bc5abc 100644
--- a/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java
+++ b/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java
@@ -530,9 +530,13 @@
                 // Flow rule is still valid, let's try to update the stats
                 if (storedEntry.state() != FlowEntry.FlowEntryState.PENDING_REMOVE &&
                         checkRuleLiveness(flowEntry, storedEntry)) {
+                    if (!shouldHandle(flowEntry.deviceId())) {
+                        return false;
+                    }
                     FlowRuleEvent event = store.addOrUpdateFlowRule(flowEntry);
-                    /* Something went wrong or there is no master
-                       better check if it is the latter */
+                    // Something went wrong or there is no master or
+                    // the device is not available better check if it
+                    // is the latter cases
                     if (event == null) {
                         log.debug("No flow store event generated for addOrUpdate of {}", flowEntry);
                         return false;
@@ -545,8 +549,8 @@
                     removeFlowRules(flowEntry);
                 }
             } else {
-                /* It was already removed or there is no master
-                   better check if it is the latter */
+                // It was already removed or there is no master
+                // better check if it is the latter
                 return false;
             }
             return true;
@@ -603,6 +607,11 @@
             pushFlowMetricsInternal(deviceId, flowEntries, false);
         }
 
+        private boolean shouldHandle(DeviceId deviceId) {
+            NodeId master = mastershipService.getMasterFor(deviceId);
+            return Objects.equals(local, master) && deviceService.isAvailable(deviceId);
+        }
+
         private void pushFlowMetricsInternal(DeviceId deviceId, Iterable<FlowEntry> flowEntries,
                                              boolean useMissingFlow) {
             Map<FlowEntry, FlowEntry> storedRules = Maps.newHashMap();
@@ -620,17 +629,17 @@
                             done = handleExistingFlow(rule);
                             if (!done) {
                                 // Mastership change can occur during this iteration
-                                master = mastershipService.getMasterFor(deviceId);
-                                if (!Objects.equals(local, master)) {
-                                    log.warn("Tried to update the flow stats while the node was not the master");
+                                if (!shouldHandle(deviceId)) {
+                                    log.warn("Tried to update the flow stats while the node was not the master" +
+                                            " or the device {} was not available", deviceId);
                                     return;
                                 }
                             }
                         } else {
                             // Mastership change can occur during this iteration
-                            master = mastershipService.getMasterFor(deviceId);
-                            if (!Objects.equals(local, master)) {
-                                log.warn("Tried to update the flows while the node was not the master");
+                            if (!shouldHandle(deviceId)) {
+                                log.warn("Tried to update the flows while the node was not the master" +
+                                        " or the device {} was not available", deviceId);
                                 return;
                             }
                             // the two rules are not an exact match - remove the
@@ -642,9 +651,9 @@
                         // the device has a rule the store does not have
                         if (!allowExtraneousRules) {
                             // Mastership change can occur during this iteration
-                            master = mastershipService.getMasterFor(deviceId);
-                            if (!Objects.equals(local, master)) {
-                                log.warn("Tried to remove flows while the node was not the master");
+                            if (!shouldHandle(deviceId)) {
+                                log.warn("Tried to remove flows while the node was not the master" +
+                                        " or the device {} was not available", deviceId);
                                 return;
                             }
                             extraneousFlow(rule);
@@ -652,9 +661,9 @@
                             FlowRuleEvent flowRuleEvent = store.addOrUpdateFlowRule(rule);
                             if (flowRuleEvent == null) {
                                 // Mastership change can occur during this iteration
-                                master = mastershipService.getMasterFor(deviceId);
-                                if (!Objects.equals(local, master)) {
-                                    log.warn("Tried to import flows while the node was not the master");
+                                if (!shouldHandle(deviceId)) {
+                                    log.warn("Tried to import flows while the node was not the master" +
+                                            " or the device {} was not available", deviceId);
                                     return;
                                 }
                             }
@@ -670,9 +679,9 @@
             if (useMissingFlow) {
                 for (FlowEntry rule : storedRules.keySet()) {
                     // Mastership change can occur during this iteration
-                    master = mastershipService.getMasterFor(deviceId);
-                    if (!Objects.equals(local, master)) {
-                        log.warn("Tried to install missing rules while the node was not the master");
+                    if (!shouldHandle(deviceId)) {
+                        log.warn("Tried to install missing rules while the node was not the master" +
+                                " or the device {} was not available", deviceId);
                         return;
                     }
                     try {
diff --git a/core/net/src/test/java/org/onosproject/net/flow/impl/FlowRuleManagerTest.java b/core/net/src/test/java/org/onosproject/net/flow/impl/FlowRuleManagerTest.java
index 300b023..4be8220 100644
--- a/core/net/src/test/java/org/onosproject/net/flow/impl/FlowRuleManagerTest.java
+++ b/core/net/src/test/java/org/onosproject/net/flow/impl/FlowRuleManagerTest.java
@@ -543,6 +543,14 @@
         public Device getDevice(DeviceId deviceId) {
             return deviceId.equals(FOO_DID) ? FOO_DEV : DEV;
         }
+
+        @Override
+        public boolean isAvailable(DeviceId deviceId) {
+            if (deviceId.equals(DID) || deviceId.equals(FOO_DID)) {
+                return true;
+            }
+            return false;
+        }
     }
 
     private class TestProvider extends AbstractProvider implements FlowRuleProvider {
diff --git a/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java b/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
index d358a77..9032a23 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
@@ -1100,6 +1100,16 @@
                 .forEach(entriesPendingRemove::add);
 
         purgeGroupEntries(entriesPendingRemove);
+
+        // it is unlikely to happen but better clear also the
+        // pending groups: device disconnected before we got
+        // the first stats and the apps were able to push groups.
+        entriesPendingRemove.clear();
+        getPendingGroupKeyTable().entrySet().stream()
+                .filter(entry -> entry.getKey().deviceId().equals(deviceId))
+                .forEach(entriesPendingRemove::add);
+
+        purgeGroupEntries(entriesPendingRemove);
     }
 
     @Override
@@ -1112,11 +1122,25 @@
                 .forEach(entriesPendingRemove::add);
 
         purgeGroupEntries(entriesPendingRemove);
+
+        // it is unlikely to happen but better clear also the
+        // pending groups: device disconnected before we got
+        // the first stats and the apps were able to push groups.
+        entriesPendingRemove.clear();
+        getPendingGroupKeyTable().entrySet().stream()
+                .filter(entry -> entry.getKey().deviceId().equals(deviceId) && entry.getValue().appId().equals(appId))
+                .forEach(entriesPendingRemove::add);
+
+        purgeGroupEntries(entriesPendingRemove);
     }
 
     @Override
     public void purgeGroupEntries() {
         purgeGroupEntries(getGroupStoreKeyMap().entrySet());
+        // it is unlikely to happen but better clear also the
+        // pending groups: device disconnected before we got
+        // the first stats and the apps were able to push groups.
+        purgeGroupEntries(getPendingGroupKeyTable().entrySet());
     }
 
     @Override
@@ -1140,7 +1164,14 @@
                     log.debug("processing pending group add requests for device {}: {}",
                             deviceId, pendingIds);
                 }
+                NodeId master;
                 for (Group group : pendingGroupRequests) {
+                    // Mastership change can occur during this iteration
+                    if (!shouldHandle(deviceId)) {
+                        log.warn("Tried to process pending groups while the node was not the master" +
+                                " or the device {} was not available", deviceId);
+                        return;
+                    }
                     GroupDescription tmp = new DefaultGroupDescription(
                             group.deviceId(),
                             group.type(),
@@ -1470,6 +1501,11 @@
         }
     }
 
+    private boolean shouldHandle(DeviceId deviceId) {
+        NodeId master = mastershipService.getMasterFor(deviceId);
+        return Objects.equals(local, master) && deviceService.isAvailable(deviceId);
+    }
+
     @Override
     public void pushGroupMetrics(DeviceId deviceId,
                                  Collection<Group> groupEntries) {
@@ -1504,9 +1540,9 @@
         // update stats
         for (Iterator<Group> it2 = southboundGroupEntries.iterator(); it2.hasNext();) {
             // Mastership change can occur during this iteration
-            master = mastershipService.getMasterFor(deviceId);
-            if (!Objects.equals(local, master)) {
-                log.warn("Tried to update the group stats while the node was not the master");
+            if (!shouldHandle(deviceId)) {
+                log.warn("Tried to update the group stats while the node was not the master" +
+                        " or the device {} was not available", deviceId);
                 return;
             }
             Group group = it2.next();
@@ -1534,9 +1570,9 @@
                 }
             } else {
                 // Mastership change can occur during this iteration
-                master = mastershipService.getMasterFor(deviceId);
-                if (!Objects.equals(local, master)) {
-                    log.warn("Tried to process extraneous groups while the node was not the master");
+                if (!shouldHandle(deviceId)) {
+                    log.warn("Tried to process extraneous groups while the node was not the master" +
+                            " or the device {} was not available", deviceId);
                     return;
                 }
                 // there are groups in the switch that aren't in the store
@@ -1554,9 +1590,9 @@
         // missing groups in the dataplane
         for (StoredGroupEntry group : storedGroupEntries) {
             // Mastership change can occur during this iteration
-            master = mastershipService.getMasterFor(deviceId);
-            if (!Objects.equals(local, master)) {
-                log.warn("Tried to process missing groups while the node was not the master");
+            if (!shouldHandle(deviceId)) {
+                log.warn("Tried to process missing groups while the node was not the master" +
+                        " or the device {} was not available", deviceId);
                 return;
             }
             // there are groups in the store that aren't in the switch
@@ -1568,9 +1604,9 @@
         // extraneous groups in the store
         for (Group group : extraneousStoredEntries) {
             // Mastership change can occur during this iteration
-            master = mastershipService.getMasterFor(deviceId);
-            if (!Objects.equals(local, master)) {
-                log.warn("Tried to process node extraneous groups while the node was not the master");
+            if (!shouldHandle(deviceId)) {
+                log.warn("Tried to process node extraneous groups while the node was not the master" +
+                        " or the device {} was not available", deviceId);
                 return;
             }
             // there are groups in the extraneous store that