[SDFAB-359] Allow purging flows, groups and meters by device and application ID

Change-Id: I5e507d230789979ac997dbc99697fa0483363f70
diff --git a/core/api/src/main/java/org/onosproject/net/flow/FlowRuleService.java b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleService.java
index 78aef84..2b29504 100644
--- a/core/api/src/main/java/org/onosproject/net/flow/FlowRuleService.java
+++ b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleService.java
@@ -115,6 +115,15 @@
     void purgeFlowRules(DeviceId deviceId);
 
     /**
+     * Purges all the flow rules on the specified device from the given application id.
+     * @param deviceId device identifier
+     * @param appId application identifier
+     */
+    default void purgeFlowRules(DeviceId deviceId, ApplicationId appId) {
+        throw new UnsupportedOperationException("purgeFlowRules not implemented!");
+    }
+
+    /**
      * Removes the specified flow rules from their respective devices. If the
      * device is not presently connected to the controller, these flow will
      * be removed once the device reconnects.
diff --git a/core/api/src/main/java/org/onosproject/net/flow/FlowRuleStore.java b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleStore.java
index 07493a8..1d9e60e 100644
--- a/core/api/src/main/java/org/onosproject/net/flow/FlowRuleStore.java
+++ b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleStore.java
@@ -15,6 +15,7 @@
  */
 package org.onosproject.net.flow;
 
+import org.onosproject.core.ApplicationId;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.flow.oldbatch.FlowRuleBatchEvent;
 import org.onosproject.net.flow.oldbatch.FlowRuleBatchOperation;
@@ -137,6 +138,15 @@
     default void purgeFlowRule(DeviceId deviceId) {}
 
     /**
+     * Removes all flow entries of given device and application ID from store.
+     *
+     * @param deviceId device id
+     * @param appId application id
+     * @return true if operation was successful, false otherwise.
+     */
+    boolean purgeFlowRules(DeviceId deviceId, ApplicationId appId);
+
+    /**
      * Removes all flow entries from store.
      */
     void purgeFlowRules();
diff --git a/core/api/src/main/java/org/onosproject/net/group/GroupService.java b/core/api/src/main/java/org/onosproject/net/group/GroupService.java
index 460ccb6..bf18a2e 100644
--- a/core/api/src/main/java/org/onosproject/net/group/GroupService.java
+++ b/core/api/src/main/java/org/onosproject/net/group/GroupService.java
@@ -133,6 +133,16 @@
     void purgeGroupEntries(DeviceId deviceId);
 
     /**
+     * Purges all the group entries on the specified device and application.
+     *
+     * @param deviceId device identifier
+     * @param appId application identifier
+     */
+    default void purgeGroupEntries(DeviceId deviceId, ApplicationId appId) {
+        throw new UnsupportedOperationException("purgeGroupEntries not implemented");
+    }
+
+    /**
      * Purges all group entries.
      */
     default void purgeGroupEntries() {}
diff --git a/core/api/src/main/java/org/onosproject/net/group/GroupStore.java b/core/api/src/main/java/org/onosproject/net/group/GroupStore.java
index 850b2ce..672bb4e 100644
--- a/core/api/src/main/java/org/onosproject/net/group/GroupStore.java
+++ b/core/api/src/main/java/org/onosproject/net/group/GroupStore.java
@@ -17,6 +17,7 @@
 
 import java.util.Collection;
 
+import org.onosproject.core.ApplicationId;
 import org.onosproject.core.GroupId;
 import org.onosproject.net.DeviceId;
 import org.onosproject.store.Store;
@@ -130,6 +131,14 @@
     void purgeGroupEntry(DeviceId deviceId);
 
     /**
+     * Removes all group entries of given device and given application from store.
+     *
+     * @param deviceId device id
+     * @param appId application id
+     */
+    void purgeGroupEntries(DeviceId deviceId, ApplicationId appId);
+
+    /**
      * Removes all group entries from store.
      */
     default void purgeGroupEntries() {}
diff --git a/core/api/src/main/java/org/onosproject/net/meter/MeterService.java b/core/api/src/main/java/org/onosproject/net/meter/MeterService.java
index cf99b5e..6339101 100644
--- a/core/api/src/main/java/org/onosproject/net/meter/MeterService.java
+++ b/core/api/src/main/java/org/onosproject/net/meter/MeterService.java
@@ -15,6 +15,7 @@
  */
 package org.onosproject.net.meter;
 
+import org.onosproject.core.ApplicationId;
 import org.onosproject.event.ListenerService;
 import org.onosproject.net.DeviceId;
 
@@ -93,8 +94,18 @@
      * Purges all the meters on the specified device.
      * @param deviceId device identifier
      */
-    default void purgeMeters(DeviceId deviceId){
+    default void purgeMeters(DeviceId deviceId) {
         //Default implementation does nothing
     }
 
+    /**
+     * Purges all the meters on the given device and for the given application.
+     *
+     * @param deviceId device identifier
+     * @param appId application identifier
+     */
+    default void purgeMeters(DeviceId deviceId, ApplicationId appId) {
+        throw new UnsupportedOperationException("purgeMeter not implemented");
+    }
+
 }
diff --git a/core/api/src/main/java/org/onosproject/net/meter/MeterStore.java b/core/api/src/main/java/org/onosproject/net/meter/MeterStore.java
index 2e632ed..811b418 100644
--- a/core/api/src/main/java/org/onosproject/net/meter/MeterStore.java
+++ b/core/api/src/main/java/org/onosproject/net/meter/MeterStore.java
@@ -15,6 +15,7 @@
  */
 package org.onosproject.net.meter;
 
+import org.onosproject.core.ApplicationId;
 import org.onosproject.net.DeviceId;
 import org.onosproject.store.Store;
 
@@ -196,9 +197,19 @@
 
     /**
      * Removes all meters of given device from store.
+     * This API is typically used when the device is offline.
      *
      * @param deviceId the device id
      */
     void purgeMeter(DeviceId deviceId);
 
+    /**
+     * Removes all meters of given device and for the given application from store.
+     * This API is typically used when the device is offline.
+     *
+     * @param deviceId the device id
+     * @param appId the application id
+     */
+    void purgeMeters(DeviceId deviceId, ApplicationId appId);
+
 }
diff --git a/core/api/src/test/java/org/onosproject/net/NetTestTools.java b/core/api/src/test/java/org/onosproject/net/NetTestTools.java
index dcc3e15..42f828f 100644
--- a/core/api/src/test/java/org/onosproject/net/NetTestTools.java
+++ b/core/api/src/test/java/org/onosproject/net/NetTestTools.java
@@ -57,6 +57,7 @@
 
     public static final ProviderId PID = new ProviderId("of", "foo");
     public static final ApplicationId APP_ID = new TestApplicationId("foo");
+    public static final ApplicationId APP_ID_2 = new TestApplicationId("foo2");
     public static final NodeId NODE_ID = new NodeId("node1");
 
     // Short-hand for producing a device id from a string
diff --git a/core/api/src/test/java/org/onosproject/net/flow/FlowRuleServiceAdapter.java b/core/api/src/test/java/org/onosproject/net/flow/FlowRuleServiceAdapter.java
index 21fd045..4f53b96 100644
--- a/core/api/src/test/java/org/onosproject/net/flow/FlowRuleServiceAdapter.java
+++ b/core/api/src/test/java/org/onosproject/net/flow/FlowRuleServiceAdapter.java
@@ -38,7 +38,11 @@
     }
 
     @Override
-    public void purgeFlowRules(DeviceId deviceId){
+    public void purgeFlowRules(DeviceId deviceId) {
+    }
+
+    @Override
+    public void purgeFlowRules(DeviceId deviceId, ApplicationId appId) {
     }
 
     @Override
diff --git a/core/common/src/test/java/org/onosproject/store/trivial/SimpleFlowRuleStore.java b/core/common/src/test/java/org/onosproject/store/trivial/SimpleFlowRuleStore.java
index 31b58fb..3b667a8 100644
--- a/core/common/src/test/java/org/onosproject/store/trivial/SimpleFlowRuleStore.java
+++ b/core/common/src/test/java/org/onosproject/store/trivial/SimpleFlowRuleStore.java
@@ -25,6 +25,7 @@
 import com.google.common.collect.Streams;
 import com.google.common.util.concurrent.SettableFuture;
 import org.onlab.util.Tools;
+import org.onosproject.core.ApplicationId;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.flow.CompletedBatchOperation;
 import org.onosproject.net.flow.DefaultFlowEntry;
@@ -301,6 +302,16 @@
     }
 
     @Override
+    public boolean purgeFlowRules(DeviceId deviceId, ApplicationId appId) {
+        flowEntries.get(deviceId).values()
+                .removeIf(storedFlowEntries -> {
+                    storedFlowEntries.removeIf(storedFlowEntry -> storedFlowEntry.appId() == appId.id());
+                    return storedFlowEntries.isEmpty();
+                });
+        return true;
+    }
+
+    @Override
     public void purgeFlowRules() {
         flowEntries.clear();
     }
diff --git a/core/common/src/test/java/org/onosproject/store/trivial/SimpleGroupStore.java b/core/common/src/test/java/org/onosproject/store/trivial/SimpleGroupStore.java
index bf8ba59..8ece648 100644
--- a/core/common/src/test/java/org/onosproject/store/trivial/SimpleGroupStore.java
+++ b/core/common/src/test/java/org/onosproject/store/trivial/SimpleGroupStore.java
@@ -17,6 +17,7 @@
 
 import com.google.common.collect.FluentIterable;
 import com.google.common.collect.Sets;
+import org.onosproject.core.ApplicationId;
 import org.onosproject.core.GroupId;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.group.DefaultGroup;
@@ -51,6 +52,7 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 
 import static org.slf4j.LoggerFactory.getLogger;
 
@@ -482,6 +484,31 @@
     }
 
     @Override
+    public void purgeGroupEntries(DeviceId deviceId, ApplicationId appId) {
+        List<StoredGroupEntry> entryPendingRemove =
+                groupEntriesById.get(deviceId).values().stream()
+                        .filter(storedGroupEntry -> storedGroupEntry.appId().equals(appId))
+                        .collect(Collectors.toList());
+
+        entryPendingRemove.forEach(storedGroupEntry -> {
+            groupEntriesById.computeIfPresent(deviceId, (k, value) -> {
+                value.remove(storedGroupEntry.id());
+                if (value.isEmpty()) {
+                    return null;
+                }
+                return value;
+            });
+            groupEntriesByKey.computeIfPresent(deviceId, (k, value) -> {
+                value.remove(storedGroupEntry.appCookie());
+                if (value.isEmpty()) {
+                    return null;
+                }
+                return value;
+            });
+        });
+    }
+
+    @Override
     public void purgeGroupEntries() {
         groupEntriesById.values().forEach(groupEntries -> {
             groupEntries.entrySet().forEach(entry -> {
diff --git a/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java b/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java
index dd7f802..ac97636 100644
--- a/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java
+++ b/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java
@@ -320,6 +320,15 @@
     }
 
     @Override
+    public void purgeFlowRules(DeviceId deviceId, ApplicationId appId) {
+        checkPermission(FLOWRULE_WRITE);
+        checkNotNull(deviceId, DEVICE_ID_NULL);
+        checkNotNull(appId, "Application ID cannot be null!");
+
+        store.purgeFlowRules(deviceId, appId);
+    }
+
+    @Override
     public void removeFlowRules(FlowRule... flowRules) {
         checkPermission(FLOWRULE_WRITE);
 
diff --git a/core/net/src/main/java/org/onosproject/net/group/impl/GroupManager.java b/core/net/src/main/java/org/onosproject/net/group/impl/GroupManager.java
index e633a96..65ec5d3 100644
--- a/core/net/src/main/java/org/onosproject/net/group/impl/GroupManager.java
+++ b/core/net/src/main/java/org/onosproject/net/group/impl/GroupManager.java
@@ -305,6 +305,12 @@
     }
 
     @Override
+    public void purgeGroupEntries(DeviceId deviceId, ApplicationId appId) {
+        checkPermission(GROUP_WRITE);
+        store.purgeGroupEntries(deviceId, appId);
+    }
+
+    @Override
     public void purgeGroupEntries() {
         checkPermission(GROUP_WRITE);
         store.purgeGroupEntries();
diff --git a/core/net/src/main/java/org/onosproject/net/meter/impl/MeterManager.java b/core/net/src/main/java/org/onosproject/net/meter/impl/MeterManager.java
index 03bb5ca..805009d 100644
--- a/core/net/src/main/java/org/onosproject/net/meter/impl/MeterManager.java
+++ b/core/net/src/main/java/org/onosproject/net/meter/impl/MeterManager.java
@@ -23,6 +23,7 @@
 import org.onosproject.cfg.ComponentConfigService;
 import org.onosproject.cluster.ClusterService;
 import org.onosproject.cluster.NodeId;
+import org.onosproject.core.ApplicationId;
 import org.onosproject.mastership.MastershipService;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.config.NetworkConfigRegistry;
@@ -331,6 +332,11 @@
         store.purgeMeter(deviceId);
     }
 
+    @Override
+    public void purgeMeters(DeviceId deviceId, ApplicationId appId) {
+        store.purgeMeters(deviceId, appId);
+    }
+
     private class InternalMeterProviderService
             extends AbstractProviderService<MeterProvider>
             implements MeterProviderService {
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java
index c438885..a257cbd 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java
@@ -33,19 +33,21 @@
 import java.util.stream.Collectors;
 
 import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import org.onlab.util.KryoNamespace;
 import org.onlab.util.Tools;
 import org.onosproject.cluster.ClusterService;
 import org.onosproject.cluster.NodeId;
+import org.onosproject.core.ApplicationId;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.device.DeviceService;
 import org.onosproject.net.flow.FlowEntry;
 import org.onosproject.net.flow.FlowId;
 import org.onosproject.net.flow.FlowRule;
-import org.onosproject.net.flow.StoredFlowEntry;
 import org.onosproject.net.flow.FlowRuleStoreException;
+import org.onosproject.net.flow.StoredFlowEntry;
 import org.onosproject.store.LogicalTimestamp;
 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
 import org.onosproject.store.cluster.messaging.MessageSubject;
@@ -960,6 +962,51 @@
     }
 
     /**
+     * Purges the flows with the given application id.
+     *
+     * @param appId the application id
+     * @return a future to be completed once flow rules with given application
+     * id have been purged on all buckets
+     */
+    public CompletableFuture<Void> purge(ApplicationId appId) {
+        DeviceReplicaInfo replicaInfo = lifecycleManager.getReplicaInfo();
+        if (!replicaInfo.isMaster(localNodeId)) {
+            return Tools.exceptionalFuture(new IllegalStateException());
+        }
+
+        // If the master's term is not currently active (has not been synchronized
+        // with prior replicas), enqueue the changes to be executed once the master
+        // has been synchronized.
+        final long term = replicaInfo.term();
+        List<CompletableFuture<Void>> completablePurges = Lists.newArrayList();
+        if (activeTerm < term) {
+            log.debug("Enqueueing operations for device {}", deviceId);
+            flowBuckets.values().forEach(
+                    bucket -> {
+                        CompletableFuture<Void> future = new CompletableFuture<>();
+                        completablePurges.add(future);
+                        flowTasks.computeIfAbsent(bucket.bucketId().bucket(),
+                                                  b -> new LinkedList<>())
+                                .add(() -> future.complete(apply((bkt, trm) -> {
+                                    bkt.purge(appId, trm, clock);
+                                    return null;
+                                    }, bucket, term)));
+                    });
+
+        } else {
+            flowBuckets.values().forEach(bucket -> {
+                CompletableFuture<Void> future = new CompletableFuture<>();
+                completablePurges.add(future);
+                future.complete(apply((bkt, trm) -> {
+                    bkt.purge(appId, trm, clock);
+                    return null;
+                    }, bucket, term));
+            });
+        }
+        return CompletableFuture.allOf(completablePurges.toArray(new CompletableFuture[0]));
+    }
+
+    /**
      * Closes the device flow table.
      */
     public void close() {
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
index 0a96252..e4850d8 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
@@ -44,6 +44,7 @@
 import org.onosproject.cfg.ComponentConfigService;
 import org.onosproject.cluster.ClusterService;
 import org.onosproject.cluster.NodeId;
+import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreService;
 import org.onosproject.core.IdGenerator;
 import org.onosproject.event.AbstractListenerManager;
@@ -63,8 +64,8 @@
 import org.onosproject.net.flow.FlowRuleService;
 import org.onosproject.net.flow.FlowRuleStore;
 import org.onosproject.net.flow.FlowRuleStoreDelegate;
-import org.onosproject.net.flow.StoredFlowEntry;
 import org.onosproject.net.flow.FlowRuleStoreException;
+import org.onosproject.net.flow.StoredFlowEntry;
 import org.onosproject.net.flow.TableStatisticsEntry;
 import org.onosproject.net.flow.oldbatch.FlowRuleBatchEntry;
 import org.onosproject.net.flow.oldbatch.FlowRuleBatchEntry.FlowRuleOperation;
@@ -110,6 +111,7 @@
 import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.FLOW_TABLE_BACKUP;
 import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.GET_DEVICE_FLOW_COUNT;
 import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.GET_FLOW_ENTRY;
+import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.PURGE_FLOW_RULES;
 import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.REMOTE_APPLY_COMPLETED;
 import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.REMOVE_FLOW_ENTRY;
 import static org.slf4j.LoggerFactory.getLogger;
@@ -137,6 +139,7 @@
     private final Logger log = getLogger(getClass());
 
     private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000;
+    private static final long PURGE_TIMEOUT_MILLIS = 30000;
     private static final int GET_FLOW_ENTRIES_TIMEOUT = 30; //seconds
 
     /** Number of threads in the message handler pool. */
@@ -340,6 +343,11 @@
             serializer::encode, executor);
         clusterCommunicator.addSubscriber(
             REMOVE_FLOW_ENTRY, serializer::decode, this::removeFlowRuleInternal, serializer::encode, executor);
+        clusterCommunicator.<Pair<DeviceId, ApplicationId>, Boolean>addSubscriber(
+                PURGE_FLOW_RULES,
+                serializer::decode,
+                p -> flowTable.purgeFlowRules(p.getLeft(), p.getRight()),
+                serializer::encode, executor);
     }
 
     private void unregisterMessageHandlers() {
@@ -634,6 +642,35 @@
     }
 
     @Override
+    public boolean purgeFlowRules(DeviceId deviceId, ApplicationId appId) {
+        NodeId master = mastershipService.getMasterFor(deviceId);
+
+        if (Objects.equals(local, master)) {
+            // bypass and handle it locally
+            return flowTable.purgeFlowRules(deviceId, appId);
+        }
+
+        if (master == null) {
+            log.warn("Failed to purgeFlowRules: No master for {}", deviceId);
+            return false;
+        }
+
+        log.trace("Forwarding purgeFlowRules to {}, which is the master for device {}",
+                  master, deviceId);
+
+        return Tools.futureGetOrElse(
+                clusterCommunicator.sendAndReceive(
+                        Pair.of(deviceId, appId),
+                        PURGE_FLOW_RULES,
+                        serializer::encode,
+                        serializer::decode,
+                        master),
+                FLOW_RULE_STORE_TIMEOUT_MILLIS,
+                TimeUnit.MILLISECONDS,
+                false);
+    }
+
+    @Override
     public void purgeFlowRules() {
         flowTable.purgeFlowRules();
     }
@@ -874,6 +911,33 @@
         }
 
         /**
+         * Purges flow rules for the given device and application id.
+         *
+         * @param deviceId the device for which to purge flow rules
+         * @param appId the application id for with to purge flow rules
+         * @return true if purge is successful, false otherwise
+         */
+        public boolean purgeFlowRules(DeviceId deviceId, ApplicationId appId) {
+            DeviceFlowTable flowTable = flowTables.get(deviceId);
+            if (flowTable != null) {
+                // flowTable.purge() returns a CompletableFuture<Void>, we want
+                // to return true when the completable future returns correctly
+                // within the timeout, otherwise return false.
+                try {
+                    // Use higher timeout, purge(appId) may require more time
+                    // than normal operations because it's applying the purge
+                    // operation on every single flow table bucket.
+                    flowTable.purge(appId).get(PURGE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+                    return true;
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                } catch (ExecutionException | TimeoutException ignored) {
+                }
+            }
+            return false;
+        }
+
+        /**
          * Purges all flow rules from the table.
          */
         public void purgeFlowRules() {
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStoreMessageSubjects.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStoreMessageSubjects.java
index e5a7631..80f376e 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStoreMessageSubjects.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStoreMessageSubjects.java
@@ -40,4 +40,7 @@
 
     public static final MessageSubject FLOW_TABLE_BACKUP
         = new MessageSubject("peer-flow-table-backup");
+
+    public static final MessageSubject PURGE_FLOW_RULES
+            = new MessageSubject("peer-purge-flow-rules");
 }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/FlowBucket.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/FlowBucket.java
index ea1887d..020559f 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/FlowBucket.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/FlowBucket.java
@@ -21,6 +21,7 @@
 import java.util.stream.Collectors;
 
 import com.google.common.collect.Maps;
+import org.onosproject.core.ApplicationId;
 import org.onosproject.net.flow.DefaultFlowEntry;
 import org.onosproject.net.flow.FlowEntry;
 import org.onosproject.net.flow.FlowId;
@@ -271,6 +272,24 @@
     }
 
     /**
+     * Purge the entries with the given application ID.
+     *
+     * @param appId the application ID
+     * @param term  the term in which the purge occurred
+     * @param clock the logical clock
+     */
+    public void purge(ApplicationId appId, long term, LogicalClock clock) {
+        boolean anythingRemoved = flowBucket.values().removeIf(flowEntryMap -> {
+            flowEntryMap.values().removeIf(storedFlowEntry -> storedFlowEntry.appId() == appId.id());
+            return flowEntryMap.isEmpty();
+        });
+        if (anythingRemoved) {
+            recordUpdate(term, clock.getTimestamp());
+        }
+    }
+
+
+    /**
      * Clears the bucket.
      */
     public void clear() {
diff --git a/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java b/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
index 8fd32ab..8337e84 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
@@ -23,6 +23,7 @@
 import org.onosproject.cfg.ComponentConfigService;
 import org.onosproject.cluster.ClusterService;
 import org.onosproject.cluster.NodeId;
+import org.onosproject.core.ApplicationId;
 import org.onosproject.core.GroupId;
 import org.onosproject.mastership.MastershipService;
 import org.onosproject.net.DeviceId;
@@ -1093,6 +1094,18 @@
     }
 
     @Override
+    public void purgeGroupEntries(DeviceId deviceId, ApplicationId appId) {
+        Set<Entry<GroupStoreKeyMapKey, StoredGroupEntry>> entriesPendingRemove =
+                new HashSet<>();
+
+        getGroupStoreKeyMap().entrySet().stream()
+                .filter(entry -> entry.getKey().deviceId().equals(deviceId) && entry.getValue().appId().equals(appId))
+                .forEach(entriesPendingRemove::add);
+
+        purgeGroupEntries(entriesPendingRemove);
+    }
+
+    @Override
     public void purgeGroupEntries() {
         purgeGroupEntries(getGroupStoreKeyMap().entrySet());
     }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/meter/impl/DistributedMeterStore.java b/core/store/dist/src/main/java/org/onosproject/store/meter/impl/DistributedMeterStore.java
index 38f8748..b2034dc 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/meter/impl/DistributedMeterStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/meter/impl/DistributedMeterStore.java
@@ -22,6 +22,7 @@
 import com.google.common.collect.Maps;
 import org.apache.commons.lang.math.RandomUtils;
 import org.onlab.util.KryoNamespace;
+import org.onosproject.core.ApplicationId;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.behaviour.MeterQuery;
 import org.onosproject.net.driver.DriverHandler;
@@ -440,17 +441,25 @@
 
     @Override
     public void purgeMeter(DeviceId deviceId) {
-        // Purge api (typically used when the device is offline)
         List<Versioned<MeterData>> metersPendingRemove = meters.stream()
                 .filter(e -> Objects.equals(e.getKey().deviceId(), deviceId))
                 .map(Map.Entry::getValue)
                 .collect(Collectors.toList());
-        // Remove definitely the meter
         metersPendingRemove.forEach(versionedMeterKey
                 -> purgeMeter(versionedMeterKey.value().meter()));
     }
 
     @Override
+    public void purgeMeters(DeviceId deviceId, ApplicationId appId) {
+        List<Versioned<MeterData>> metersPendingRemove = meters.stream()
+                .filter(e -> Objects.equals(e.getKey().deviceId(), deviceId) &&
+                        e.getValue().value().meter().appId().equals(appId))
+                .map(Map.Entry::getValue)
+                .collect(Collectors.toList());
+        metersPendingRemove.forEach(versionedMeterKey -> deleteMeterNow(versionedMeterKey.value().meter()));
+    }
+
+    @Override
     public long getMaxMeters(MeterFeaturesKey key) {
         // Leverage the meter features to know the max id
         // Create a Meter Table key with FeaturesKey's device and global scope
diff --git a/core/store/dist/src/test/java/org/onosproject/store/flow/impl/ECFlowRuleStoreTest.java b/core/store/dist/src/test/java/org/onosproject/store/flow/impl/ECFlowRuleStoreTest.java
index 0dfc2a2..60261d6 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/flow/impl/ECFlowRuleStoreTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/flow/impl/ECFlowRuleStoreTest.java
@@ -50,7 +50,6 @@
 import org.onosproject.store.service.TestStorageService;
 
 import org.onlab.packet.Ip4Address;
-import java.util.Iterator;
 import java.util.Optional;
 
 import org.osgi.service.component.ComponentContext;
@@ -64,6 +63,7 @@
 import static org.hamcrest.Matchers.notNullValue;
 import static org.junit.Assert.assertEquals;
 import static org.onosproject.net.NetTestTools.APP_ID;
+import static org.onosproject.net.NetTestTools.APP_ID_2;
 import static org.onosproject.net.NetTestTools.did;
 
 /**
@@ -83,6 +83,7 @@
     private static final IntentTestsMocks.MockTreatment TREATMENT =
             new IntentTestsMocks.MockTreatment();
     DeviceId deviceId = did("device1");
+    DeviceId deviceId2 = did("device2");
     FlowRule flowRule =
             DefaultFlowRule.builder()
                     .forDevice(deviceId)
@@ -101,6 +102,25 @@
                     .makeTemporary(44)
                     .fromApp(APP_ID)
                     .build();
+    FlowRule flowRule2 =
+            DefaultFlowRule.builder()
+                    .forDevice(deviceId)
+                    .withSelector(SELECTOR)
+                    .withTreatment(TREATMENT)
+                    .withPriority(44)
+                    .makePermanent()
+                    .fromApp(APP_ID_2)
+                    .build();
+    FlowRule flowRule3 =
+            DefaultFlowRule.builder()
+                    .forDevice(deviceId2)
+                    .withSelector(SELECTOR)
+                    .withTreatment(TREATMENT)
+                    .withPriority(55)
+                    .makePermanent()
+                    .fromApp(APP_ID_2)
+                    .build();
+
 
     static class MasterOfAll extends MastershipServiceAdapter {
         @Override
@@ -247,14 +267,7 @@
         assertEquals("PENDING_ADD", flowEntry1.state().toString());
 
         flowStoreImpl.addOrUpdateFlowRule(flowEntry);
-        Iterable<FlowEntry> flows = flowStoreImpl.getFlowEntries(deviceId);
-        int sum = 0;
-        Iterator it = flows.iterator();
-        while (it.hasNext()) {
-            it.next();
-            sum++;
-        }
-        assertThat(sum, is(1));
+        assertFlowsOnDevice(deviceId, 1);
 
         FlowEntry flowEntry2 = flowStoreImpl.getFlowEntry(flowRule);
         assertEquals("ADDED", flowEntry2.state().toString());
@@ -270,15 +283,7 @@
         for (FlowEntry flow : flows1) {
             flowStoreImpl.removeFlowRule(flow);
         }
-
-        Iterable<FlowEntry> flows2 = flowStoreImpl.getFlowEntries(deviceId);
-        int sum = 0;
-        Iterator it = flows2.iterator();
-        while (it.hasNext()) {
-            it.next();
-            sum++;
-        }
-        assertThat(sum, is(0));
+        assertFlowsOnDevice(deviceId, 0);
     }
 
     /**
@@ -291,23 +296,41 @@
 
         FlowEntry flowEntry1 = new DefaultFlowEntry(flowRule1);
         flowStoreImpl.addOrUpdateFlowRule(flowEntry1);
-        Iterable<FlowEntry> flows1 = flowStoreImpl.getFlowEntries(deviceId);
-        int sum2 = 0;
-        Iterator it1 = flows1.iterator();
-        while (it1.hasNext()) {
-            it1.next();
-            sum2++;
-        }
-        assertThat(sum2, is(2));
+        assertFlowsOnDevice(deviceId, 2);
         flowStoreImpl.purgeFlowRule(deviceId);
 
-        Iterable<FlowEntry> flows3 = flowStoreImpl.getFlowEntries(deviceId);
-        int sum3 = 0;
-        Iterator it3 = flows3.iterator();
-        while (it3.hasNext()) {
-            it3.next();
-            sum3++;
+        assertFlowsOnDevice(deviceId, 0);
+    }
+
+    /**
+     * Tests purge flow for a device and an application.
+     */
+    @Test
+    public void testPurgeFlowAppId() {
+        FlowEntry flowEntry1 = new DefaultFlowEntry(flowRule1);
+        flowStoreImpl.addOrUpdateFlowRule(flowEntry1);
+
+        FlowEntry flowEntry2 = new DefaultFlowEntry(flowRule2);
+        flowStoreImpl.addOrUpdateFlowRule(flowEntry2);
+
+        FlowEntry flowEntry3 = new DefaultFlowEntry(flowRule3);
+        flowStoreImpl.addOrUpdateFlowRule(flowEntry3);
+
+        assertFlowsOnDevice(deviceId, 2);
+        assertFlowsOnDevice(deviceId2, 1);
+
+        flowStoreImpl.purgeFlowRules(deviceId, APP_ID_2);
+
+        assertFlowsOnDevice(deviceId, 1);
+        assertFlowsOnDevice(deviceId2, 1);
+    }
+
+    private void assertFlowsOnDevice(DeviceId deviceId, int nFlows) {
+        Iterable<FlowEntry> flows1 = flowStoreImpl.getFlowEntries(deviceId);
+        int sum1 = 0;
+        for (FlowEntry flowEntry : flows1) {
+            sum1++;
         }
-        assertThat(sum3, is(0));
+        assertThat(sum1, is(nFlows));
     }
 }
diff --git a/core/store/dist/src/test/java/org/onosproject/store/group/impl/DistributedGroupStoreTest.java b/core/store/dist/src/test/java/org/onosproject/store/group/impl/DistributedGroupStoreTest.java
index 9e0dd9d..e3dc8a4 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/group/impl/DistributedGroupStoreTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/group/impl/DistributedGroupStoreTest.java
@@ -69,6 +69,7 @@
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.onosproject.net.NetTestTools.APP_ID;
+import static org.onosproject.net.NetTestTools.APP_ID_2;
 import static org.onosproject.net.NetTestTools.did;
 import static org.onosproject.net.group.GroupDescription.Type.ALL;
 import static org.onosproject.net.group.GroupDescription.Type.INDIRECT;
@@ -84,9 +85,13 @@
     private final GroupId groupId1 = new GroupId(1);
     private final GroupId groupId2 = new GroupId(2);
     private final GroupId groupId3 = new GroupId(3);
+    private final GroupId groupId4 = new GroupId(4);
+    private final GroupId groupId5 = new GroupId(5);
     private final GroupKey groupKey1 = new DefaultGroupKey("abc".getBytes());
     private final GroupKey groupKey2 = new DefaultGroupKey("def".getBytes());
     private final GroupKey groupKey3 = new DefaultGroupKey("ghi".getBytes());
+    private final GroupKey groupKey4 = new DefaultGroupKey("jkl".getBytes());
+    private final GroupKey groupKey5 = new DefaultGroupKey("mno".getBytes());
 
     private final TrafficTreatment treatment = DefaultTrafficTreatment.emptyTreatment();
     private final TrafficTreatment treatment2 = DefaultTrafficTreatment.builder()
@@ -118,6 +123,20 @@
             groupKey3,
             groupId3.id(),
             APP_ID);
+    private final GroupDescription groupDescription4 = new DefaultGroupDescription(
+            deviceId2,
+            INDIRECT,
+            indirectGroupBuckets,
+            groupKey4,
+            groupId4.id(),
+            APP_ID_2);
+    private final GroupDescription groupDescription5 = new DefaultGroupDescription(
+            deviceId1,
+            INDIRECT,
+            indirectGroupBuckets,
+            groupKey5,
+            groupId5.id(),
+            APP_ID_2);
 
     private DistributedGroupStore groupStoreImpl;
     private GroupStore groupStore;
@@ -310,6 +329,30 @@
     }
 
     /**
+     * Tests removing all groups on the given device from a specific application.
+     */
+    @Test
+    public void testRemoveGroupOnDeviceFromApp() throws Exception {
+        groupStore.deviceInitialAuditCompleted(deviceId1, true);
+        assertThat(groupStore.deviceInitialAuditStatus(deviceId1), is(true));
+        groupStore.deviceInitialAuditCompleted(deviceId2, true);
+        assertThat(groupStore.deviceInitialAuditStatus(deviceId2), is(true));
+
+        // Make sure the pending list starts out empty
+        assertThat(auditPendingReqQueue.size(), is(0));
+
+        groupStore.storeGroupDescription(groupDescription3);
+        groupStore.storeGroupDescription(groupDescription4);
+        groupStore.storeGroupDescription(groupDescription5);
+        assertThat(groupStore.getGroupCount(deviceId1), is(1));
+        assertThat(groupStore.getGroupCount(deviceId2), is(2));
+
+        groupStore.purgeGroupEntries(deviceId2, APP_ID_2);
+        assertThat(groupStore.getGroupCount(deviceId1), is(1));
+        assertThat(groupStore.getGroupCount(deviceId2), is(1));
+    }
+
+    /**
      * Tests adding and removing a group.
      */
     @Test
diff --git a/core/store/dist/src/test/java/org/onosproject/store/meter/impl/DistributedMeterStoreTest.java b/core/store/dist/src/test/java/org/onosproject/store/meter/impl/DistributedMeterStoreTest.java
index 4cddf54..0527755 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/meter/impl/DistributedMeterStoreTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/meter/impl/DistributedMeterStoreTest.java
@@ -54,6 +54,7 @@
 import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.*;
 import static org.onosproject.net.NetTestTools.APP_ID;
+import static org.onosproject.net.NetTestTools.APP_ID_2;
 import static org.onosproject.net.NetTestTools.did;
 
 /**
@@ -79,6 +80,7 @@
     // Meter ids used during the tests
     private MeterId mid1 = MeterId.meterId(1);
     private MeterId mid2 = MeterId.meterId(2);
+    private MeterId mid3 = MeterId.meterId(3);
     private MeterId mid10 = MeterId.meterId(10);
 
     // Bands used during the tests
@@ -96,6 +98,22 @@
             .withBands(Collections.singletonList(b1))
             .build();
 
+    private Meter m2 = DefaultMeter.builder()
+            .forDevice(did1)
+            .fromApp(APP_ID_2)
+            .withCellId(mid2)
+            .withUnit(Meter.Unit.KB_PER_SEC)
+            .withBands(Collections.singletonList(b1))
+            .build();
+
+    private Meter m3 = DefaultMeter.builder()
+            .forDevice(did2)
+            .fromApp(APP_ID_2)
+            .withCellId(mid3)
+            .withUnit(Meter.Unit.KB_PER_SEC)
+            .withBands(Collections.singletonList(b1))
+            .build();
+
     // Meter features used during the tests
     private MeterFeatures mef1 = DefaultMeterFeatures.builder().forDevice(did1)
             .withMaxMeters(3L)
@@ -417,6 +435,31 @@
     }
 
     /**
+     * Test purge meter given device and application.
+     */
+    @Test
+    public void testPurgeMeterDeviceAndApp() {
+        // Init the store
+        initMeterStore();
+        // add the meters
+        ((DefaultMeter) m1).setState(MeterState.PENDING_ADD);
+        ((DefaultMeter) m2).setState(MeterState.PENDING_ADD);
+        ((DefaultMeter) m3).setState(MeterState.PENDING_ADD);
+        meterStore.storeMeter(m1);
+        meterStore.storeMeter(m2);
+        meterStore.storeMeter(m3);
+        assertThat(3, is(meterStore.getAllMeters().size()));
+
+        meterStore.purgeMeters(did1, APP_ID_2);
+        // Verify delete
+        MeterKey keyTwo = MeterKey.key(did1, mid2);
+        assertThat(2, is(meterStore.getAllMeters().size()));
+        assertThat(1, is(meterStore.getAllMeters(did1).size()));
+        assertThat(1, is(meterStore.getAllMeters(did2).size()));
+        assertNull(meterStore.getMeter(keyTwo));
+    }
+
+    /**
      * Test getMeters API immutability.
      */
     @Test