[SDFAB-359] Allow purging flows, groups and meters by device and application ID

Change-Id: I5e507d230789979ac997dbc99697fa0483363f70
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java
index c438885..a257cbd 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java
@@ -33,19 +33,21 @@
 import java.util.stream.Collectors;
 
 import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import org.onlab.util.KryoNamespace;
 import org.onlab.util.Tools;
 import org.onosproject.cluster.ClusterService;
 import org.onosproject.cluster.NodeId;
+import org.onosproject.core.ApplicationId;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.device.DeviceService;
 import org.onosproject.net.flow.FlowEntry;
 import org.onosproject.net.flow.FlowId;
 import org.onosproject.net.flow.FlowRule;
-import org.onosproject.net.flow.StoredFlowEntry;
 import org.onosproject.net.flow.FlowRuleStoreException;
+import org.onosproject.net.flow.StoredFlowEntry;
 import org.onosproject.store.LogicalTimestamp;
 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
 import org.onosproject.store.cluster.messaging.MessageSubject;
@@ -960,6 +962,51 @@
     }
 
     /**
+     * Purges the flows with the given application id from every bucket.
+     *
+     * @param appId the application id
+     * @return a future to be completed once flow rules with given application
+     * id have been purged on all buckets
+     */
+    public CompletableFuture<Void> purge(ApplicationId appId) {
+        DeviceReplicaInfo replica = lifecycleManager.getReplicaInfo();
+        if (!replica.isMaster(localNodeId)) {
+            return Tools.exceptionalFuture(new IllegalStateException());
+        }
+
+        final long term = replica.term();
+        List<CompletableFuture<Void>> pending = Lists.newArrayList();
+        if (activeTerm >= term) {
+            // The term is already active: purge each bucket right away.
+            flowBuckets.values().forEach(bucket -> {
+                CompletableFuture<Void> done = new CompletableFuture<>();
+                pending.add(done);
+                done.complete(apply((bkt, trm) -> {
+                    bkt.purge(appId, trm, clock);
+                    return null;
+                }, bucket, term));
+            });
+        } else {
+            // The master's term is not currently active (has not been
+            // synchronized with prior replicas): enqueue one purge per bucket,
+            // to be executed once the master has been synchronized.
+            log.debug("Enqueueing operations for device {}", deviceId);
+            flowBuckets.values().forEach(bucket -> {
+                CompletableFuture<Void> done = new CompletableFuture<>();
+                pending.add(done);
+                flowTasks.computeIfAbsent(bucket.bucketId().bucket(),
+                                          b -> new LinkedList<>())
+                        .add(() -> done.complete(apply((bkt, trm) -> {
+                            bkt.purge(appId, trm, clock);
+                            return null;
+                        }, bucket, term)));
+            });
+        }
+
+        return CompletableFuture.allOf(pending.toArray(new CompletableFuture[0]));
+    }
+
+    /**
      * Closes the device flow table.
      */
     public void close() {
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
index 0a96252..e4850d8 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
@@ -44,6 +44,7 @@
 import org.onosproject.cfg.ComponentConfigService;
 import org.onosproject.cluster.ClusterService;
 import org.onosproject.cluster.NodeId;
+import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreService;
 import org.onosproject.core.IdGenerator;
 import org.onosproject.event.AbstractListenerManager;
@@ -63,8 +64,8 @@
 import org.onosproject.net.flow.FlowRuleService;
 import org.onosproject.net.flow.FlowRuleStore;
 import org.onosproject.net.flow.FlowRuleStoreDelegate;
-import org.onosproject.net.flow.StoredFlowEntry;
 import org.onosproject.net.flow.FlowRuleStoreException;
+import org.onosproject.net.flow.StoredFlowEntry;
 import org.onosproject.net.flow.TableStatisticsEntry;
 import org.onosproject.net.flow.oldbatch.FlowRuleBatchEntry;
 import org.onosproject.net.flow.oldbatch.FlowRuleBatchEntry.FlowRuleOperation;
@@ -110,6 +111,7 @@
 import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.FLOW_TABLE_BACKUP;
 import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.GET_DEVICE_FLOW_COUNT;
 import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.GET_FLOW_ENTRY;
+import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.PURGE_FLOW_RULES;
 import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.REMOTE_APPLY_COMPLETED;
 import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.REMOVE_FLOW_ENTRY;
 import static org.slf4j.LoggerFactory.getLogger;
@@ -137,6 +139,7 @@
     private final Logger log = getLogger(getClass());
 
     private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000;
+    private static final long PURGE_TIMEOUT_MILLIS = 30000;
     private static final int GET_FLOW_ENTRIES_TIMEOUT = 30; //seconds
 
     /** Number of threads in the message handler pool. */
@@ -340,6 +343,11 @@
             serializer::encode, executor);
         clusterCommunicator.addSubscriber(
             REMOVE_FLOW_ENTRY, serializer::decode, this::removeFlowRuleInternal, serializer::encode, executor);
+        clusterCommunicator.<Pair<DeviceId, ApplicationId>, Boolean>addSubscriber(
+                PURGE_FLOW_RULES,
+                serializer::decode,
+                p -> flowTable.purgeFlowRules(p.getLeft(), p.getRight()),
+                serializer::encode, executor);
     }
 
     private void unregisterMessageHandlers() {
@@ -634,6 +642,35 @@
     }
 
     @Override
+    public boolean purgeFlowRules(DeviceId deviceId, ApplicationId appId) {
+        NodeId master = mastershipService.getMasterFor(deviceId);
+
+        if (Objects.equals(local, master)) {
+            // bypass and handle it locally
+            return flowTable.purgeFlowRules(deviceId, appId);
+        }
+
+        if (master == null) {
+            log.warn("Failed to purgeFlowRules: No master for {}", deviceId);
+            return false;
+        }
+
+        log.trace("Forwarding purgeFlowRules to {}, which is the master for device {}",
+                  master, deviceId);
+
+        // Wait as long as the master itself is allowed to spend purging its
+        // buckets; with the shorter regular store timeout this node could
+        // report a failure for a purge that still completes on the master.
+        return Tools.futureGetOrElse(
+                clusterCommunicator.sendAndReceive(
+                        Pair.of(deviceId, appId), PURGE_FLOW_RULES,
+                        serializer::encode, serializer::decode, master),
+                PURGE_TIMEOUT_MILLIS,
+                TimeUnit.MILLISECONDS,
+                false);
+    }
+
+    @Override
     public void purgeFlowRules() {
         flowTable.purgeFlowRules();
     }
@@ -874,6 +911,33 @@
         }
 
         /**
+         * Purges flow rules for the given device and application id.
+         *
+         * @param deviceId the device for which to purge flow rules
+         * @param appId the application id for which to purge flow rules
+         * @return true if the purge completed within the timeout, false otherwise
+         */
+        public boolean purgeFlowRules(DeviceId deviceId, ApplicationId appId) {
+            DeviceFlowTable flowTable = flowTables.get(deviceId);
+            if (flowTable == null) {
+                return false;
+            }
+            try {
+                // purge(appId) returns a CompletableFuture<Void>. Use a higher
+                // timeout than for normal operations because the purge is
+                // applied on every single flow table bucket.
+                flowTable.purge(appId).get(PURGE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+                return true;
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                log.warn("Interrupted while purging flows of {} on {}", appId, deviceId);
+            } catch (ExecutionException | TimeoutException e) {
+                log.warn("Failed to purge flows of {} on {}", appId, deviceId, e);
+            }
+            return false;
+        }
+
+        /**
          * Purges all flow rules from the table.
          */
         public void purgeFlowRules() {
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStoreMessageSubjects.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStoreMessageSubjects.java
index e5a7631..80f376e 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStoreMessageSubjects.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStoreMessageSubjects.java
@@ -40,4 +40,7 @@
 
     public static final MessageSubject FLOW_TABLE_BACKUP
         = new MessageSubject("peer-flow-table-backup");
+
+    public static final MessageSubject PURGE_FLOW_RULES
+            = new MessageSubject("peer-purge-flow-rules");
 }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/FlowBucket.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/FlowBucket.java
index ea1887d..020559f 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/FlowBucket.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/FlowBucket.java
@@ -21,6 +21,7 @@
 import java.util.stream.Collectors;
 
 import com.google.common.collect.Maps;
+import org.onosproject.core.ApplicationId;
 import org.onosproject.net.flow.DefaultFlowEntry;
 import org.onosproject.net.flow.FlowEntry;
 import org.onosproject.net.flow.FlowId;
@@ -271,6 +272,24 @@
     }
 
     /**
+     * Purges the entries with the given application ID and records the update.
+     *
+     * @param appId the application ID
+     * @param term  the term in which the purge occurred
+     * @param clock the logical clock
+     */
+    public void purge(ApplicationId appId, long term, LogicalClock clock) {
+        boolean[] entryRemoved = {false}; // holder the lambda below can write to
+        boolean mapRemoved = flowBucket.values().removeIf(flowEntryMap -> {
+            entryRemoved[0] |= flowEntryMap.values().removeIf(entry -> entry.appId() == appId.id());
+            return flowEntryMap.isEmpty();
+        });
+        // Record the update even when a per-flow map lost entries but was not emptied.
+        if (mapRemoved || entryRemoved[0]) {
+            recordUpdate(term, clock.getTimestamp());
+        }
+    }
+
+    /**
      * Clears the bucket.
      */
     public void clear() {
diff --git a/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java b/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
index 8fd32ab..8337e84 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
@@ -23,6 +23,7 @@
 import org.onosproject.cfg.ComponentConfigService;
 import org.onosproject.cluster.ClusterService;
 import org.onosproject.cluster.NodeId;
+import org.onosproject.core.ApplicationId;
 import org.onosproject.core.GroupId;
 import org.onosproject.mastership.MastershipService;
 import org.onosproject.net.DeviceId;
@@ -1093,6 +1094,18 @@
     }
 
     @Override
+    public void purgeGroupEntries(DeviceId deviceId, ApplicationId appId) {
+        Set<Entry<GroupStoreKeyMapKey, StoredGroupEntry>> entriesPendingRemove = new HashSet<>();
+        // Compare from the argument side so a stored group without an appId cannot NPE.
+        getGroupStoreKeyMap().entrySet().stream()
+                .filter(entry -> entry.getKey().deviceId().equals(deviceId))
+                .filter(entry -> appId != null && appId.equals(entry.getValue().appId()))
+                .forEach(entriesPendingRemove::add);
+
+        purgeGroupEntries(entriesPendingRemove);
+    }
+
+    @Override
     public void purgeGroupEntries() {
         purgeGroupEntries(getGroupStoreKeyMap().entrySet());
     }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/meter/impl/DistributedMeterStore.java b/core/store/dist/src/main/java/org/onosproject/store/meter/impl/DistributedMeterStore.java
index 38f8748..b2034dc 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/meter/impl/DistributedMeterStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/meter/impl/DistributedMeterStore.java
@@ -22,6 +22,7 @@
 import com.google.common.collect.Maps;
 import org.apache.commons.lang.math.RandomUtils;
 import org.onlab.util.KryoNamespace;
+import org.onosproject.core.ApplicationId;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.behaviour.MeterQuery;
 import org.onosproject.net.driver.DriverHandler;
@@ -440,17 +441,25 @@
 
     @Override
     public void purgeMeter(DeviceId deviceId) {
-        // Purge api (typically used when the device is offline)
         List<Versioned<MeterData>> metersPendingRemove = meters.stream()
                 .filter(e -> Objects.equals(e.getKey().deviceId(), deviceId))
                 .map(Map.Entry::getValue)
                 .collect(Collectors.toList());
-        // Remove definitely the meter
         metersPendingRemove.forEach(versionedMeterKey
                 -> purgeMeter(versionedMeterKey.value().meter()));
     }
 
     @Override
+    public void purgeMeters(DeviceId deviceId, ApplicationId appId) {
+        List<Versioned<MeterData>> metersPendingRemove = meters.stream()
+                .filter(e -> Objects.equals(e.getKey().deviceId(), deviceId) &&
+                        Objects.equals(e.getValue().value().meter().appId(), appId))
+                .map(Map.Entry::getValue)
+                .collect(Collectors.toList());
+        metersPendingRemove.forEach(meterData -> deleteMeterNow(meterData.value().meter()));
+    }
+
+    @Override
     public long getMaxMeters(MeterFeaturesKey key) {
         // Leverage the meter features to know the max id
         // Create a Meter Table key with FeaturesKey's device and global scope
diff --git a/core/store/dist/src/test/java/org/onosproject/store/flow/impl/ECFlowRuleStoreTest.java b/core/store/dist/src/test/java/org/onosproject/store/flow/impl/ECFlowRuleStoreTest.java
index 0dfc2a2..60261d6 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/flow/impl/ECFlowRuleStoreTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/flow/impl/ECFlowRuleStoreTest.java
@@ -50,7 +50,6 @@
 import org.onosproject.store.service.TestStorageService;
 
 import org.onlab.packet.Ip4Address;
-import java.util.Iterator;
 import java.util.Optional;
 
 import org.osgi.service.component.ComponentContext;
@@ -64,6 +63,7 @@
 import static org.hamcrest.Matchers.notNullValue;
 import static org.junit.Assert.assertEquals;
 import static org.onosproject.net.NetTestTools.APP_ID;
+import static org.onosproject.net.NetTestTools.APP_ID_2;
 import static org.onosproject.net.NetTestTools.did;
 
 /**
@@ -83,6 +83,7 @@
     private static final IntentTestsMocks.MockTreatment TREATMENT =
             new IntentTestsMocks.MockTreatment();
     DeviceId deviceId = did("device1");
+    DeviceId deviceId2 = did("device2");
     FlowRule flowRule =
             DefaultFlowRule.builder()
                     .forDevice(deviceId)
@@ -101,6 +102,25 @@
                     .makeTemporary(44)
                     .fromApp(APP_ID)
                     .build();
+    FlowRule flowRule2 =
+            DefaultFlowRule.builder()
+                    .forDevice(deviceId)
+                    .withSelector(SELECTOR)
+                    .withTreatment(TREATMENT)
+                    .withPriority(44)
+                    .makePermanent()
+                    .fromApp(APP_ID_2)
+                    .build();
+    FlowRule flowRule3 =
+            DefaultFlowRule.builder()
+                    .forDevice(deviceId2)
+                    .withSelector(SELECTOR)
+                    .withTreatment(TREATMENT)
+                    .withPriority(55)
+                    .makePermanent()
+                    .fromApp(APP_ID_2)
+                    .build();
+
 
     static class MasterOfAll extends MastershipServiceAdapter {
         @Override
@@ -247,14 +267,7 @@
         assertEquals("PENDING_ADD", flowEntry1.state().toString());
 
         flowStoreImpl.addOrUpdateFlowRule(flowEntry);
-        Iterable<FlowEntry> flows = flowStoreImpl.getFlowEntries(deviceId);
-        int sum = 0;
-        Iterator it = flows.iterator();
-        while (it.hasNext()) {
-            it.next();
-            sum++;
-        }
-        assertThat(sum, is(1));
+        assertFlowsOnDevice(deviceId, 1);
 
         FlowEntry flowEntry2 = flowStoreImpl.getFlowEntry(flowRule);
         assertEquals("ADDED", flowEntry2.state().toString());
@@ -270,15 +283,7 @@
         for (FlowEntry flow : flows1) {
             flowStoreImpl.removeFlowRule(flow);
         }
-
-        Iterable<FlowEntry> flows2 = flowStoreImpl.getFlowEntries(deviceId);
-        int sum = 0;
-        Iterator it = flows2.iterator();
-        while (it.hasNext()) {
-            it.next();
-            sum++;
-        }
-        assertThat(sum, is(0));
+        assertFlowsOnDevice(deviceId, 0);
     }
 
     /**
@@ -291,23 +296,41 @@
 
         FlowEntry flowEntry1 = new DefaultFlowEntry(flowRule1);
         flowStoreImpl.addOrUpdateFlowRule(flowEntry1);
-        Iterable<FlowEntry> flows1 = flowStoreImpl.getFlowEntries(deviceId);
-        int sum2 = 0;
-        Iterator it1 = flows1.iterator();
-        while (it1.hasNext()) {
-            it1.next();
-            sum2++;
-        }
-        assertThat(sum2, is(2));
+        assertFlowsOnDevice(deviceId, 2);
         flowStoreImpl.purgeFlowRule(deviceId);
 
-        Iterable<FlowEntry> flows3 = flowStoreImpl.getFlowEntries(deviceId);
-        int sum3 = 0;
-        Iterator it3 = flows3.iterator();
-        while (it3.hasNext()) {
-            it3.next();
-            sum3++;
+        assertFlowsOnDevice(deviceId, 0);
+    }
+
+    /**
+     * Tests that purging by device and application only removes the flows
+     * of that application on that device.
+     */
+    @Test
+    public void testPurgeFlowAppId() {
+        // flowRule2 and flowRule3 both come from APP_ID_2, on deviceId and
+        // deviceId2 respectively; flowRule1 is expected to survive the purge
+        flowStoreImpl.addOrUpdateFlowRule(new DefaultFlowEntry(flowRule1));
+        flowStoreImpl.addOrUpdateFlowRule(new DefaultFlowEntry(flowRule2));
+        flowStoreImpl.addOrUpdateFlowRule(new DefaultFlowEntry(flowRule3));
+
+        assertFlowsOnDevice(deviceId, 2);
+        assertFlowsOnDevice(deviceId2, 1);
+
+        flowStoreImpl.purgeFlowRules(deviceId, APP_ID_2);
+
+        // One flow fewer on the purged device, nothing changed on the
+        // other device even though it holds a flow of the same application
+        assertFlowsOnDevice(deviceId, 1);
+        assertFlowsOnDevice(deviceId2, 1);
+    }
+
+    private void assertFlowsOnDevice(DeviceId deviceId, int nFlows) {
+        // Walk the store's iterable for the device and count its entries
+        int count = 0;
+        for (FlowEntry ignored : flowStoreImpl.getFlowEntries(deviceId)) {
+            count++;
         }
-        assertThat(sum3, is(0));
+        assertThat(count, is(nFlows));
     }
 }
diff --git a/core/store/dist/src/test/java/org/onosproject/store/group/impl/DistributedGroupStoreTest.java b/core/store/dist/src/test/java/org/onosproject/store/group/impl/DistributedGroupStoreTest.java
index 9e0dd9d..e3dc8a4 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/group/impl/DistributedGroupStoreTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/group/impl/DistributedGroupStoreTest.java
@@ -69,6 +69,7 @@
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.onosproject.net.NetTestTools.APP_ID;
+import static org.onosproject.net.NetTestTools.APP_ID_2;
 import static org.onosproject.net.NetTestTools.did;
 import static org.onosproject.net.group.GroupDescription.Type.ALL;
 import static org.onosproject.net.group.GroupDescription.Type.INDIRECT;
@@ -84,9 +85,13 @@
     private final GroupId groupId1 = new GroupId(1);
     private final GroupId groupId2 = new GroupId(2);
     private final GroupId groupId3 = new GroupId(3);
+    private final GroupId groupId4 = new GroupId(4);
+    private final GroupId groupId5 = new GroupId(5);
     private final GroupKey groupKey1 = new DefaultGroupKey("abc".getBytes());
     private final GroupKey groupKey2 = new DefaultGroupKey("def".getBytes());
     private final GroupKey groupKey3 = new DefaultGroupKey("ghi".getBytes());
+    private final GroupKey groupKey4 = new DefaultGroupKey("jkl".getBytes());
+    private final GroupKey groupKey5 = new DefaultGroupKey("mno".getBytes());
 
     private final TrafficTreatment treatment = DefaultTrafficTreatment.emptyTreatment();
     private final TrafficTreatment treatment2 = DefaultTrafficTreatment.builder()
@@ -118,6 +123,20 @@
             groupKey3,
             groupId3.id(),
             APP_ID);
+    private final GroupDescription groupDescription4 = new DefaultGroupDescription(
+            deviceId2,
+            INDIRECT,
+            indirectGroupBuckets,
+            groupKey4,
+            groupId4.id(),
+            APP_ID_2);
+    private final GroupDescription groupDescription5 = new DefaultGroupDescription(
+            deviceId1,
+            INDIRECT,
+            indirectGroupBuckets,
+            groupKey5,
+            groupId5.id(),
+            APP_ID_2);
 
     private DistributedGroupStore groupStoreImpl;
     private GroupStore groupStore;
@@ -310,6 +329,30 @@
     }
 
     /**
+     * Tests removing all groups on the given device from a specific application.
+     */
+    @Test
+    public void testRemoveGroupOnDeviceFromApp() throws Exception {
+        groupStore.deviceInitialAuditCompleted(deviceId1, true);
+        assertThat(groupStore.deviceInitialAuditStatus(deviceId1), is(true));
+        groupStore.deviceInitialAuditCompleted(deviceId2, true);
+        assertThat(groupStore.deviceInitialAuditStatus(deviceId2), is(true));
+
+        // Make sure the pending list starts out empty
+        assertThat(auditPendingReqQueue.size(), is(0));
+        // groupDescription4 (deviceId2) and groupDescription5 (deviceId1) use APP_ID_2
+        groupStore.storeGroupDescription(groupDescription3);
+        groupStore.storeGroupDescription(groupDescription4);
+        groupStore.storeGroupDescription(groupDescription5);
+        assertThat(groupStore.getGroupCount(deviceId1), is(1));
+        assertThat(groupStore.getGroupCount(deviceId2), is(2));
+        // Only the APP_ID_2 group on deviceId2 is expected to be removed
+        groupStore.purgeGroupEntries(deviceId2, APP_ID_2);
+        assertThat(groupStore.getGroupCount(deviceId1), is(1));
+        assertThat(groupStore.getGroupCount(deviceId2), is(1));
+    }
+
+    /**
      * Tests adding and removing a group.
      */
     @Test
diff --git a/core/store/dist/src/test/java/org/onosproject/store/meter/impl/DistributedMeterStoreTest.java b/core/store/dist/src/test/java/org/onosproject/store/meter/impl/DistributedMeterStoreTest.java
index 4cddf54..0527755 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/meter/impl/DistributedMeterStoreTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/meter/impl/DistributedMeterStoreTest.java
@@ -54,6 +54,7 @@
 import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.*;
 import static org.onosproject.net.NetTestTools.APP_ID;
+import static org.onosproject.net.NetTestTools.APP_ID_2;
 import static org.onosproject.net.NetTestTools.did;
 
 /**
@@ -79,6 +80,7 @@
     // Meter ids used during the tests
     private MeterId mid1 = MeterId.meterId(1);
     private MeterId mid2 = MeterId.meterId(2);
+    private MeterId mid3 = MeterId.meterId(3);
     private MeterId mid10 = MeterId.meterId(10);
 
     // Bands used during the tests
@@ -96,6 +98,22 @@
             .withBands(Collections.singletonList(b1))
             .build();
 
+    private Meter m2 = DefaultMeter.builder()
+            .forDevice(did1)
+            .fromApp(APP_ID_2)
+            .withCellId(mid2)
+            .withUnit(Meter.Unit.KB_PER_SEC)
+            .withBands(Collections.singletonList(b1))
+            .build();
+
+    private Meter m3 = DefaultMeter.builder()
+            .forDevice(did2)
+            .fromApp(APP_ID_2)
+            .withCellId(mid3)
+            .withUnit(Meter.Unit.KB_PER_SEC)
+            .withBands(Collections.singletonList(b1))
+            .build();
+
     // Meter features used during the tests
     private MeterFeatures mef1 = DefaultMeterFeatures.builder().forDevice(did1)
             .withMaxMeters(3L)
@@ -417,6 +435,31 @@
     }
 
     /**
+     * Test that purging by device and application removes only the matching meters.
+     */
+    @Test
+    public void testPurgeMeterDeviceAndApp() {
+        // Init the store
+        initMeterStore();
+        // Mark the three meters as pending add and store them
+        ((DefaultMeter) m1).setState(MeterState.PENDING_ADD);
+        ((DefaultMeter) m2).setState(MeterState.PENDING_ADD);
+        ((DefaultMeter) m3).setState(MeterState.PENDING_ADD);
+        meterStore.storeMeter(m1);
+        meterStore.storeMeter(m2);
+        meterStore.storeMeter(m3);
+        assertThat(3, is(meterStore.getAllMeters().size()));
+
+        meterStore.purgeMeters(did1, APP_ID_2);
+        // Only m2 (did1, APP_ID_2) should be gone; m3 uses APP_ID_2 but is on did2
+        MeterKey keyTwo = MeterKey.key(did1, mid2);
+        assertThat(2, is(meterStore.getAllMeters().size()));
+        assertThat(1, is(meterStore.getAllMeters(did1).size()));
+        assertThat(1, is(meterStore.getAllMeters(did2).size()));
+        assertNull(meterStore.getMeter(keyTwo));
+    }
+
+    /**
      * Test getMeters API immutability.
      */
     @Test