CORD-394 Purge group/flow store when device goes offline

Stage 1: (this commit)
Add a component config purgeOnDisconnection, which is false by default.
When set to true, GroupManager and FlowManager will purge groups/flows
associated with a device when the device goes offline.

Stage 2: (upcoming commit)
Enable these configs in SegmentRoutingManager
Clean up group related information in SegmentRountingManager

Change-Id: I46d047d690d4641e030f6cdd084ce16ac02d8919
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/NewDistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/NewDistributedFlowRuleStore.java
index 662a7eb..a2b9995 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/NewDistributedFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/NewDistributedFlowRuleStore.java
@@ -611,6 +611,11 @@
     }
 
     @Override
+    public void purgeFlowRule(DeviceId deviceId) {
+        flowTable.purgeFlowRule(deviceId);
+    }
+
+    @Override
     public void batchOperationComplete(FlowRuleBatchEvent event) {
         //FIXME: need a per device pending response
         NodeId nodeId = pendingResponses.remove(event.subject().batchId());
@@ -827,6 +832,10 @@
             }
         }
 
+        public void purgeFlowRule(DeviceId deviceId) {
+            flowEntries.remove(deviceId);
+        }
+
         private NodeId getBackupNode(DeviceId deviceId) {
             List<NodeId> deviceStandbys = replicaInfoManager.getReplicaInfoFor(deviceId).backups();
             // pick the standby which is most likely to become next master
diff --git a/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java b/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
index 4f4c06f..1d7ded1 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
@@ -67,8 +67,10 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
@@ -845,6 +847,21 @@
     }
 
     @Override
+    public void purgeGroupEntry(DeviceId deviceId) {
+        Set<Entry<GroupStoreKeyMapKey, StoredGroupEntry>> entryPendingRemove =
+                new HashSet<>();
+
+        groupStoreEntriesByKey.entrySet().stream()
+                .filter(entry -> entry.getKey().deviceId().equals(deviceId))
+                .forEach(entryPendingRemove::add);
+
+        entryPendingRemove.forEach(entry -> {
+            groupStoreEntriesByKey.remove(entry.getKey());
+            notifyDelegate(new GroupEvent(Type.GROUP_REMOVED, entry.getValue()));
+        });
+    }
+
+    @Override
     public void deviceInitialAuditCompleted(DeviceId deviceId,
                                             boolean completed) {
         synchronized (deviceAuditStatus) {
diff --git a/core/store/dist/src/test/java/org/onosproject/store/group/impl/DistributedGroupStoreTest.java b/core/store/dist/src/test/java/org/onosproject/store/group/impl/DistributedGroupStoreTest.java
index dbb63b2..066fced 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/group/impl/DistributedGroupStoreTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/group/impl/DistributedGroupStoreTest.java
@@ -71,8 +71,10 @@
     DeviceId deviceId2 = did("dev2");
     GroupId groupId1 = new DefaultGroupId(1);
     GroupId groupId2 = new DefaultGroupId(2);
+    GroupId groupId3 = new DefaultGroupId(3);
     GroupKey groupKey1 = new DefaultGroupKey("abc".getBytes());
     GroupKey groupKey2 = new DefaultGroupKey("def".getBytes());
+    GroupKey groupKey3 = new DefaultGroupKey("ghi".getBytes());
 
     TrafficTreatment treatment =
             DefaultTrafficTreatment.emptyTreatment();
@@ -97,6 +99,13 @@
             groupKey2,
             groupId2.id(),
             APP_ID);
+    GroupDescription groupDescription3 = new DefaultGroupDescription(
+            deviceId2,
+            GroupDescription.Type.INDIRECT,
+            buckets,
+            groupKey3,
+            groupId3.id(),
+            APP_ID);
 
     DistributedGroupStore groupStoreImpl;
     GroupStore groupStore;
@@ -202,6 +211,30 @@
     }
 
     /**
+     * Tests removing all groups on the given device.
+     */
+    @Test
+    public void testRemoveGroupOnDevice() throws Exception {
+        groupStore.deviceInitialAuditCompleted(deviceId1, true);
+        assertThat(groupStore.deviceInitialAuditStatus(deviceId1), is(true));
+        groupStore.deviceInitialAuditCompleted(deviceId2, true);
+        assertThat(groupStore.deviceInitialAuditStatus(deviceId2), is(true));
+
+        // Make sure the pending list starts out empty
+        assertThat(auditPendingReqQueue.size(), is(0));
+
+        groupStore.storeGroupDescription(groupDescription1);
+        groupStore.storeGroupDescription(groupDescription2);
+        groupStore.storeGroupDescription(groupDescription3);
+        assertThat(groupStore.getGroupCount(deviceId1), is(1));
+        assertThat(groupStore.getGroupCount(deviceId2), is(2));
+
+        groupStore.purgeGroupEntry(deviceId2);
+        assertThat(groupStore.getGroupCount(deviceId1), is(1));
+        assertThat(groupStore.getGroupCount(deviceId2), is(0));
+    }
+
+    /**
      * Tests adding and removing a group.
      */
     @Test