CORD-394 Purge group/flow store when device goes offline

Stage 1: (this commit)
Add a component config purgeOnDisconnection, which is false by default.
When set to true, GroupManager and FlowManager will purge groups/flows
associated with a device when the device goes offline.

Stage 2: (upcoming commit)
Enable these configs in SegmentRoutingManager
Clean up group related information in SegmentRountingManager

Change-Id: I46d047d690d4641e030f6cdd084ce16ac02d8919
diff --git a/core/api/src/main/java/org/onosproject/net/flow/FlowRuleStore.java b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleStore.java
index ae20f2a..0d62467 100644
--- a/core/api/src/main/java/org/onosproject/net/flow/FlowRuleStore.java
+++ b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleStore.java
@@ -107,6 +107,13 @@
     FlowRuleEvent pendingFlowRule(FlowEntry rule);
 
     /**
+     * Removes all flow entries of given device from store.
+     *
+     * @param deviceId device id
+     */
+    void purgeFlowRule(DeviceId deviceId);
+
+    /**
      * Updates the flow table statistics of the specified device using
      * the given statistics.
      *
diff --git a/core/api/src/main/java/org/onosproject/net/group/GroupStore.java b/core/api/src/main/java/org/onosproject/net/group/GroupStore.java
index 8b6df5d..dcb8360 100644
--- a/core/api/src/main/java/org/onosproject/net/group/GroupStore.java
+++ b/core/api/src/main/java/org/onosproject/net/group/GroupStore.java
@@ -118,6 +118,13 @@
     void removeGroupEntry(Group group);
 
     /**
+     * Removes all group entries of given device from store.
+     *
+     * @param deviceId device id
+     */
+    void purgeGroupEntry(DeviceId deviceId);
+
+    /**
      * A group entry that is present in switch but not in the store.
      *
      * @param group group entry
diff --git a/core/common/src/test/java/org/onosproject/store/trivial/SimpleFlowRuleStore.java b/core/common/src/test/java/org/onosproject/store/trivial/SimpleFlowRuleStore.java
index 33f1cc5..e80d175 100644
--- a/core/common/src/test/java/org/onosproject/store/trivial/SimpleFlowRuleStore.java
+++ b/core/common/src/test/java/org/onosproject/store/trivial/SimpleFlowRuleStore.java
@@ -274,6 +274,10 @@
         return null;
     }
 
+    public void purgeFlowRule(DeviceId deviceId) {
+        flowEntries.remove(deviceId);
+    }
+
     @Override
     public void storeBatch(
             FlowRuleBatchOperation operation) {
diff --git a/core/common/src/test/java/org/onosproject/store/trivial/SimpleGroupStore.java b/core/common/src/test/java/org/onosproject/store/trivial/SimpleGroupStore.java
index 230fa33..d3ab890 100644
--- a/core/common/src/test/java/org/onosproject/store/trivial/SimpleGroupStore.java
+++ b/core/common/src/test/java/org/onosproject/store/trivial/SimpleGroupStore.java
@@ -23,6 +23,8 @@
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -477,6 +479,19 @@
     }
 
     @Override
+    public void purgeGroupEntry(DeviceId deviceId) {
+        Set<Map.Entry<GroupId, StoredGroupEntry>> entryPendingRemove =
+                groupEntriesById.get(deviceId).entrySet();
+
+        groupEntriesById.remove(deviceId);
+        groupEntriesByKey.remove(deviceId);
+
+        entryPendingRemove.forEach(entry -> {
+            notifyDelegate(new GroupEvent(Type.GROUP_REMOVED, entry.getValue()));
+        });
+    }
+
+    @Override
     public void deviceInitialAuditCompleted(DeviceId deviceId,
                                             boolean completed) {
         synchronized (deviceAuditStatus) {
diff --git a/core/common/src/test/java/org/onosproject/store/trivial/SimpleGroupStoreTest.java b/core/common/src/test/java/org/onosproject/store/trivial/SimpleGroupStoreTest.java
index b10fca5..15eb9bc 100644
--- a/core/common/src/test/java/org/onosproject/store/trivial/SimpleGroupStoreTest.java
+++ b/core/common/src/test/java/org/onosproject/store/trivial/SimpleGroupStoreTest.java
@@ -16,6 +16,8 @@
 package org.onosproject.store.trivial;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.hamcrest.Matchers.is;
 import static org.onosproject.net.DeviceId.deviceId;
 
 import java.util.ArrayList;
@@ -199,6 +201,11 @@
 
         // Testing removeGroupEntry operation from southbound
         testRemoveGroupFromSB(currKey);
+
+        // Testing removing all groups on the given device
+        newKey = new DefaultGroupKey("group1".getBytes());
+        testStoreAndGetGroup(newKey);
+        testDeleteGroupOnDevice(newKey);
     }
 
     // Testing storeGroup operation
@@ -376,6 +383,13 @@
         simpleGroupStore.unsetDelegate(deleteGroupDescDelegate);
     }
 
+    // Testing deleteGroupDescription operation from northbound
+    private void testDeleteGroupOnDevice(GroupKey currKey) {
+        assertThat(simpleGroupStore.getGroupCount(D1), is(1));
+        simpleGroupStore.purgeGroupEntry(D1);
+        assertThat(simpleGroupStore.getGroupCount(D1), is(0));
+    }
+
     // Testing removeGroupEntry operation from southbound
     private void testRemoveGroupFromSB(GroupKey currKey) {
         Group existingGroup = simpleGroupStore.getGroup(D1, currKey);
diff --git a/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java b/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java
index 570383a..9162ff6 100644
--- a/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java
+++ b/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java
@@ -15,7 +15,6 @@
  */
 package org.onosproject.net.flow.impl;
 
-import com.google.common.base.Strings;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
@@ -31,8 +30,9 @@
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
-import org.onlab.util.Tools;
 import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.net.device.DeviceEvent;
+import org.onosproject.net.device.DeviceListener;
 import org.onosproject.net.provider.AbstractListenerProviderRegistry;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreService;
@@ -75,6 +75,7 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Strings.isNullOrEmpty;
 import static org.onlab.util.Tools.groupedThreads;
 import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_ADD_REQUESTED;
 import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_REMOVE_REQUESTED;
@@ -101,9 +102,14 @@
             label = "Allow flow rules in switch not installed by ONOS")
     private boolean allowExtraneousRules = ALLOW_EXTRANEOUS_RULES;
 
+    @Property(name = "purgeOnDisconnection", boolValue = false,
+            label = "Purge entries associated with a device when the device goes offline")
+    private boolean purgeOnDisconnection = false;
+
     private final Logger log = getLogger(getClass());
 
     private final FlowRuleStoreDelegate delegate = new InternalStoreDelegate();
+    private final DeviceListener deviceListener = new InternalDeviceListener();
 
     protected ExecutorService deviceInstallers =
             Executors.newFixedThreadPool(32, groupedThreads("onos/flowservice", "device-installer-%d"));
@@ -130,13 +136,12 @@
 
     @Activate
     public void activate(ComponentContext context) {
-        cfgService.registerProperties(getClass());
-        idGenerator = coreService.getIdGenerator(FLOW_OP_TOPIC);
-
-        modified(context);
-
         store.setDelegate(delegate);
         eventDispatcher.addSink(FlowRuleEvent.class, listenerRegistry);
+        deviceService.addListener(deviceListener);
+        cfgService.registerProperties(getClass());
+        idGenerator = coreService.getIdGenerator(FLOW_OP_TOPIC);
+        modified(context);
         log.info("Started");
     }
 
@@ -152,18 +157,59 @@
 
     @Modified
     public void modified(ComponentContext context) {
-        if (context == null) {
-            return;
+        if (context != null) {
+            readComponentConfiguration(context);
         }
+    }
 
+    /**
+     * Extracts properties from the component configuration context.
+     *
+     * @param context the component context
+     */
+    private void readComponentConfiguration(ComponentContext context) {
         Dictionary<?, ?> properties = context.getProperties();
+        Boolean flag;
 
-        String s = Tools.get(properties, "allowExtraneousRules");
-        allowExtraneousRules = Strings.isNullOrEmpty(s) ? ALLOW_EXTRANEOUS_RULES : Boolean.valueOf(s);
-
-        if (allowExtraneousRules) {
-            log.info("Allowing flow rules not installed by ONOS");
+        flag = isPropertyEnabled(properties, "allowExtraneousRules");
+        if (flag == null) {
+            log.info("AllowExtraneousRules is not configured, " +
+                    "using current value of {}", allowExtraneousRules);
+        } else {
+            allowExtraneousRules = flag;
+            log.info("Configured. AllowExtraneousRules is {}",
+                    allowExtraneousRules ? "enabled" : "disabled");
         }
+
+        flag = isPropertyEnabled(properties, "purgeOnDisconnection");
+        if (flag == null) {
+            log.info("PurgeOnDisconnection is not configured, " +
+                    "using current value of {}", purgeOnDisconnection);
+        } else {
+            purgeOnDisconnection = flag;
+            log.info("Configured. PurgeOnDisconnection is {}",
+                    purgeOnDisconnection ? "enabled" : "disabled");
+        }
+    }
+
+    /**
+     * Check property name is defined and set to true.
+     *
+     * @param properties   properties to be looked up
+     * @param propertyName the name of the property to look up
+     * @return value when the propertyName is defined or return null
+     */
+    private static Boolean isPropertyEnabled(Dictionary<?, ?> properties,
+            String propertyName) {
+        Boolean value = null;
+        try {
+            String s = (String) properties.get(propertyName);
+            value = isNullOrEmpty(s) ? null : s.trim().equals("true");
+        } catch (ClassCastException e) {
+            // No propertyName defined.
+            value = null;
+        }
+        return value;
     }
 
     @Override
@@ -613,4 +659,23 @@
         checkPermission(FLOWRULE_READ);
         return store.getTableStatistics(deviceId);
     }
+
+    private class InternalDeviceListener implements DeviceListener {
+        @Override
+        public void event(DeviceEvent event) {
+            switch (event.type()) {
+                case DEVICE_REMOVED:
+                case DEVICE_AVAILABILITY_CHANGED:
+                    DeviceId deviceId = event.subject().id();
+                    if (!deviceService.isAvailable(deviceId)) {
+                        if (purgeOnDisconnection) {
+                            store.purgeFlowRule(deviceId);
+                        }
+                    }
+                    break;
+                default:
+                    break;
+            }
+        }
+    }
 }
diff --git a/core/net/src/main/java/org/onosproject/net/group/impl/GroupManager.java b/core/net/src/main/java/org/onosproject/net/group/impl/GroupManager.java
index 96e9b19..d6158b5 100644
--- a/core/net/src/main/java/org/onosproject/net/group/impl/GroupManager.java
+++ b/core/net/src/main/java/org/onosproject/net/group/impl/GroupManager.java
@@ -18,9 +18,12 @@
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Modified;
+import org.apache.felix.scr.annotations.Property;
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
+import org.onosproject.cfg.ComponentConfigService;
 import org.onosproject.net.provider.AbstractListenerProviderRegistry;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.net.DeviceId;
@@ -43,11 +46,14 @@
 import org.onosproject.net.group.GroupStore.UpdateType;
 import org.onosproject.net.group.GroupStoreDelegate;
 import org.onosproject.net.provider.AbstractProviderService;
+import org.osgi.service.component.ComponentContext;
 import org.slf4j.Logger;
 
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Dictionary;
 
+import static com.google.common.base.Strings.isNullOrEmpty;
 import static org.onosproject.security.AppGuard.checkPermission;
 import static org.slf4j.LoggerFactory.getLogger;
 import static org.onosproject.security.AppPermission.Type.*;
@@ -75,21 +81,78 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected DeviceService deviceService;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ComponentConfigService cfgService;
+
+    @Property(name = "purgeOnDisconnection", boolValue = false,
+            label = "Purge entries associated with a device when the device goes offline")
+    private boolean purgeOnDisconnection = false;
+
     @Activate
-    public void activate() {
+    public void activate(ComponentContext context) {
         store.setDelegate(delegate);
         eventDispatcher.addSink(GroupEvent.class, listenerRegistry);
         deviceService.addListener(deviceListener);
+        cfgService.registerProperties(getClass());
+        modified(context);
         log.info("Started");
     }
 
     @Deactivate
     public void deactivate() {
+        cfgService.unregisterProperties(getClass(), false);
         store.unsetDelegate(delegate);
         eventDispatcher.removeSink(GroupEvent.class);
         log.info("Stopped");
     }
 
+    @Modified
+    public void modified(ComponentContext context) {
+        if (context != null) {
+            readComponentConfiguration(context);
+        }
+    }
+
+    /**
+     * Extracts properties from the component configuration context.
+     *
+     * @param context the component context
+     */
+    private void readComponentConfiguration(ComponentContext context) {
+        Dictionary<?, ?> properties = context.getProperties();
+        Boolean flag;
+
+        flag = isPropertyEnabled(properties, "purgeOnDisconnection");
+        if (flag == null) {
+            log.info("PurgeOnDisconnection is not configured, " +
+                    "using current value of {}", purgeOnDisconnection);
+        } else {
+            purgeOnDisconnection = flag;
+            log.info("Configured. PurgeOnDisconnection is {}",
+                    purgeOnDisconnection ? "enabled" : "disabled");
+        }
+    }
+
+    /**
+     * Check property name is defined and set to true.
+     *
+     * @param properties   properties to be looked up
+     * @param propertyName the name of the property to look up
+     * @return value when the propertyName is defined or return null
+     */
+    private static Boolean isPropertyEnabled(Dictionary<?, ?> properties,
+            String propertyName) {
+        Boolean value = null;
+        try {
+            String s = (String) properties.get(propertyName);
+            value = isNullOrEmpty(s) ? null : s.trim().equals("true");
+        } catch (ClassCastException e) {
+            // No propertyName defined.
+            value = null;
+        }
+        return value;
+    }
+
     /**
      * Create a group in the specified device with the provided parameters.
      *
@@ -303,13 +366,17 @@
             switch (event.type()) {
                 case DEVICE_REMOVED:
                 case DEVICE_AVAILABILITY_CHANGED:
-                    if (!deviceService.isAvailable(event.subject().id())) {
+                    DeviceId deviceId = event.subject().id();
+                    if (!deviceService.isAvailable(deviceId)) {
                         log.debug("Device {} became un available; clearing initial audit status",
                                   event.type(), event.subject().id());
                         store.deviceInitialAuditCompleted(event.subject().id(), false);
+
+                        if (purgeOnDisconnection) {
+                            store.purgeGroupEntry(deviceId);
+                        }
                     }
                     break;
-
                 default:
                     break;
             }
diff --git a/core/net/src/test/java/org/onosproject/net/group/impl/GroupManagerTest.java b/core/net/src/test/java/org/onosproject/net/group/impl/GroupManagerTest.java
index 73ce393..27aaaf9 100644
--- a/core/net/src/test/java/org/onosproject/net/group/impl/GroupManagerTest.java
+++ b/core/net/src/test/java/org/onosproject/net/group/impl/GroupManagerTest.java
@@ -32,6 +32,7 @@
 import org.junit.Test;
 import org.onlab.packet.MacAddress;
 import org.onlab.packet.MplsLabel;
+import org.onosproject.cfg.ComponentConfigAdapter;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.DefaultApplicationId;
 import org.onosproject.core.DefaultGroupId;
@@ -89,11 +90,12 @@
         mgr = new GroupManager();
         groupService = mgr;
         mgr.deviceService = new DeviceManager();
+        mgr.cfgService = new ComponentConfigAdapter();
         mgr.store = new SimpleGroupStore();
         injectEventDispatcher(mgr, new TestEventDispatcher());
         providerRegistry = mgr;
 
-        mgr.activate();
+        mgr.activate(null);
         mgr.addListener(listener);
 
         internalProvider = new TestGroupProvider(PID);
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/NewDistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/NewDistributedFlowRuleStore.java
index 662a7eb..a2b9995 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/NewDistributedFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/NewDistributedFlowRuleStore.java
@@ -611,6 +611,11 @@
     }
 
     @Override
+    public void purgeFlowRule(DeviceId deviceId) {
+        flowTable.purgeFlowRule(deviceId);
+    }
+
+    @Override
     public void batchOperationComplete(FlowRuleBatchEvent event) {
         //FIXME: need a per device pending response
         NodeId nodeId = pendingResponses.remove(event.subject().batchId());
@@ -827,6 +832,10 @@
             }
         }
 
+        public void purgeFlowRule(DeviceId deviceId) {
+            flowEntries.remove(deviceId);
+        }
+
         private NodeId getBackupNode(DeviceId deviceId) {
             List<NodeId> deviceStandbys = replicaInfoManager.getReplicaInfoFor(deviceId).backups();
             // pick the standby which is most likely to become next master
diff --git a/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java b/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
index 4f4c06f..1d7ded1 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
@@ -67,8 +67,10 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
@@ -845,6 +847,21 @@
     }
 
     @Override
+    public void purgeGroupEntry(DeviceId deviceId) {
+        Set<Entry<GroupStoreKeyMapKey, StoredGroupEntry>> entryPendingRemove =
+                new HashSet<>();
+
+        groupStoreEntriesByKey.entrySet().stream()
+                .filter(entry -> entry.getKey().deviceId().equals(deviceId))
+                .forEach(entryPendingRemove::add);
+
+        entryPendingRemove.forEach(entry -> {
+            groupStoreEntriesByKey.remove(entry.getKey());
+            notifyDelegate(new GroupEvent(Type.GROUP_REMOVED, entry.getValue()));
+        });
+    }
+
+    @Override
     public void deviceInitialAuditCompleted(DeviceId deviceId,
                                             boolean completed) {
         synchronized (deviceAuditStatus) {
diff --git a/core/store/dist/src/test/java/org/onosproject/store/group/impl/DistributedGroupStoreTest.java b/core/store/dist/src/test/java/org/onosproject/store/group/impl/DistributedGroupStoreTest.java
index dbb63b2..066fced 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/group/impl/DistributedGroupStoreTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/group/impl/DistributedGroupStoreTest.java
@@ -71,8 +71,10 @@
     DeviceId deviceId2 = did("dev2");
     GroupId groupId1 = new DefaultGroupId(1);
     GroupId groupId2 = new DefaultGroupId(2);
+    GroupId groupId3 = new DefaultGroupId(3);
     GroupKey groupKey1 = new DefaultGroupKey("abc".getBytes());
     GroupKey groupKey2 = new DefaultGroupKey("def".getBytes());
+    GroupKey groupKey3 = new DefaultGroupKey("ghi".getBytes());
 
     TrafficTreatment treatment =
             DefaultTrafficTreatment.emptyTreatment();
@@ -97,6 +99,13 @@
             groupKey2,
             groupId2.id(),
             APP_ID);
+    GroupDescription groupDescription3 = new DefaultGroupDescription(
+            deviceId2,
+            GroupDescription.Type.INDIRECT,
+            buckets,
+            groupKey3,
+            groupId3.id(),
+            APP_ID);
 
     DistributedGroupStore groupStoreImpl;
     GroupStore groupStore;
@@ -202,6 +211,30 @@
     }
 
     /**
+     * Tests removing all groups on the given device.
+     */
+    @Test
+    public void testRemoveGroupOnDevice() throws Exception {
+        groupStore.deviceInitialAuditCompleted(deviceId1, true);
+        assertThat(groupStore.deviceInitialAuditStatus(deviceId1), is(true));
+        groupStore.deviceInitialAuditCompleted(deviceId2, true);
+        assertThat(groupStore.deviceInitialAuditStatus(deviceId2), is(true));
+
+        // Make sure the pending list starts out empty
+        assertThat(auditPendingReqQueue.size(), is(0));
+
+        groupStore.storeGroupDescription(groupDescription1);
+        groupStore.storeGroupDescription(groupDescription2);
+        groupStore.storeGroupDescription(groupDescription3);
+        assertThat(groupStore.getGroupCount(deviceId1), is(1));
+        assertThat(groupStore.getGroupCount(deviceId2), is(2));
+
+        groupStore.purgeGroupEntry(deviceId2);
+        assertThat(groupStore.getGroupCount(deviceId1), is(1));
+        assertThat(groupStore.getGroupCount(deviceId2), is(0));
+    }
+
+    /**
      * Tests adding and removing a group.
      */
     @Test