[SDFAB-359] Allow purging flows, groups and meters by device and application ID
Change-Id: I5e507d230789979ac997dbc99697fa0483363f70
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java
index c438885..a257cbd 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java
@@ -33,19 +33,21 @@
import java.util.stream.Collectors;
import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.onlab.util.KryoNamespace;
import org.onlab.util.Tools;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.NodeId;
+import org.onosproject.core.ApplicationId;
import org.onosproject.net.DeviceId;
import org.onosproject.net.device.DeviceService;
import org.onosproject.net.flow.FlowEntry;
import org.onosproject.net.flow.FlowId;
import org.onosproject.net.flow.FlowRule;
-import org.onosproject.net.flow.StoredFlowEntry;
import org.onosproject.net.flow.FlowRuleStoreException;
+import org.onosproject.net.flow.StoredFlowEntry;
import org.onosproject.store.LogicalTimestamp;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.MessageSubject;
@@ -960,6 +962,51 @@
}
/**
+ * Purges the flows with the given application id.
+ *
+ * @param appId the application id
+ * @return a future to be completed once flow rules with given application
+ * id have been purged on all buckets
+ */
+ public CompletableFuture<Void> purge(ApplicationId appId) {
+ DeviceReplicaInfo replicaInfo = lifecycleManager.getReplicaInfo();
+ if (!replicaInfo.isMaster(localNodeId)) {
+ return Tools.exceptionalFuture(new IllegalStateException());
+ }
+
+ // If the master's term is not currently active (has not been synchronized
+ // with prior replicas), enqueue the changes to be executed once the master
+ // has been synchronized.
+ final long term = replicaInfo.term();
+ List<CompletableFuture<Void>> completablePurges = Lists.newArrayList();
+ if (activeTerm < term) {
+ log.debug("Enqueueing operations for device {}", deviceId);
+ flowBuckets.values().forEach(
+ bucket -> {
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ completablePurges.add(future);
+ flowTasks.computeIfAbsent(bucket.bucketId().bucket(),
+ b -> new LinkedList<>())
+ .add(() -> future.complete(apply((bkt, trm) -> {
+ bkt.purge(appId, trm, clock);
+ return null;
+ }, bucket, term)));
+ });
+
+ } else {
+ flowBuckets.values().forEach(bucket -> {
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ completablePurges.add(future);
+ future.complete(apply((bkt, trm) -> {
+ bkt.purge(appId, trm, clock);
+ return null;
+ }, bucket, term));
+ });
+ }
+ return CompletableFuture.allOf(completablePurges.toArray(new CompletableFuture[0]));
+ }
+
+ /**
* Closes the device flow table.
*/
public void close() {
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
index 0a96252..e4850d8 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
@@ -44,6 +44,7 @@
import org.onosproject.cfg.ComponentConfigService;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.NodeId;
+import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.core.IdGenerator;
import org.onosproject.event.AbstractListenerManager;
@@ -63,8 +64,8 @@
import org.onosproject.net.flow.FlowRuleService;
import org.onosproject.net.flow.FlowRuleStore;
import org.onosproject.net.flow.FlowRuleStoreDelegate;
-import org.onosproject.net.flow.StoredFlowEntry;
import org.onosproject.net.flow.FlowRuleStoreException;
+import org.onosproject.net.flow.StoredFlowEntry;
import org.onosproject.net.flow.TableStatisticsEntry;
import org.onosproject.net.flow.oldbatch.FlowRuleBatchEntry;
import org.onosproject.net.flow.oldbatch.FlowRuleBatchEntry.FlowRuleOperation;
@@ -110,6 +111,7 @@
import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.FLOW_TABLE_BACKUP;
import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.GET_DEVICE_FLOW_COUNT;
import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.GET_FLOW_ENTRY;
+import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.PURGE_FLOW_RULES;
import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.REMOTE_APPLY_COMPLETED;
import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.REMOVE_FLOW_ENTRY;
import static org.slf4j.LoggerFactory.getLogger;
@@ -137,6 +139,7 @@
private final Logger log = getLogger(getClass());
private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000;
+ private static final long PURGE_TIMEOUT_MILLIS = 30000;
private static final int GET_FLOW_ENTRIES_TIMEOUT = 30; //seconds
/** Number of threads in the message handler pool. */
@@ -340,6 +343,11 @@
serializer::encode, executor);
clusterCommunicator.addSubscriber(
REMOVE_FLOW_ENTRY, serializer::decode, this::removeFlowRuleInternal, serializer::encode, executor);
+ clusterCommunicator.<Pair<DeviceId, ApplicationId>, Boolean>addSubscriber(
+ PURGE_FLOW_RULES,
+ serializer::decode,
+ p -> flowTable.purgeFlowRules(p.getLeft(), p.getRight()),
+ serializer::encode, executor);
}
private void unregisterMessageHandlers() {
@@ -634,6 +642,35 @@
}
@Override
+ public boolean purgeFlowRules(DeviceId deviceId, ApplicationId appId) {
+ NodeId master = mastershipService.getMasterFor(deviceId);
+
+ if (Objects.equals(local, master)) {
+ // bypass and handle it locally
+ return flowTable.purgeFlowRules(deviceId, appId);
+ }
+
+ if (master == null) {
+ log.warn("Failed to purgeFlowRules: No master for {}", deviceId);
+ return false;
+ }
+
+ log.trace("Forwarding purgeFlowRules to {}, which is the master for device {}",
+ master, deviceId);
+
+ return Tools.futureGetOrElse(
+ clusterCommunicator.sendAndReceive(
+ Pair.of(deviceId, appId),
+ PURGE_FLOW_RULES,
+ serializer::encode,
+ serializer::decode,
+ master),
+ FLOW_RULE_STORE_TIMEOUT_MILLIS,
+ TimeUnit.MILLISECONDS,
+ false);
+ }
+
+ @Override
public void purgeFlowRules() {
flowTable.purgeFlowRules();
}
@@ -874,6 +911,33 @@
}
/**
+ * Purges flow rules for the given device and application id.
+ *
+ * @param deviceId the device for which to purge flow rules
+ * @param appId the application id for with to purge flow rules
+ * @return true if purge is successful, false otherwise
+ */
+ public boolean purgeFlowRules(DeviceId deviceId, ApplicationId appId) {
+ DeviceFlowTable flowTable = flowTables.get(deviceId);
+ if (flowTable != null) {
+ // flowTable.purge() returns a CompletableFuture<Void>, we want
+ // to return true when the completable future returns correctly
+ // within the timeout, otherwise return false.
+ try {
+ // Use higher timeout, purge(appId) may require more time
+ // than normal operations because it's applying the purge
+ // operation on every single flow table bucket.
+ flowTable.purge(appId).get(PURGE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+ return true;
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } catch (ExecutionException | TimeoutException ignored) {
+ }
+ }
+ return false;
+ }
+
+ /**
* Purges all flow rules from the table.
*/
public void purgeFlowRules() {
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStoreMessageSubjects.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStoreMessageSubjects.java
index e5a7631..80f376e 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStoreMessageSubjects.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStoreMessageSubjects.java
@@ -40,4 +40,7 @@
public static final MessageSubject FLOW_TABLE_BACKUP
= new MessageSubject("peer-flow-table-backup");
+
+ public static final MessageSubject PURGE_FLOW_RULES
+ = new MessageSubject("peer-purge-flow-rules");
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/FlowBucket.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/FlowBucket.java
index ea1887d..020559f 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/FlowBucket.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/FlowBucket.java
@@ -21,6 +21,7 @@
import java.util.stream.Collectors;
import com.google.common.collect.Maps;
+import org.onosproject.core.ApplicationId;
import org.onosproject.net.flow.DefaultFlowEntry;
import org.onosproject.net.flow.FlowEntry;
import org.onosproject.net.flow.FlowId;
@@ -271,6 +272,24 @@
}
/**
+ * Purge the entries with the given application ID.
+ *
+ * @param appId the application ID
+ * @param term the term in which the purge occurred
+ * @param clock the logical clock
+ */
+ public void purge(ApplicationId appId, long term, LogicalClock clock) {
+ boolean anythingRemoved = flowBucket.values().removeIf(flowEntryMap -> {
+ flowEntryMap.values().removeIf(storedFlowEntry -> storedFlowEntry.appId() == appId.id());
+ return flowEntryMap.isEmpty();
+ });
+ if (anythingRemoved) {
+ recordUpdate(term, clock.getTimestamp());
+ }
+ }
+
+
+ /**
* Clears the bucket.
*/
public void clear() {
diff --git a/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java b/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
index 8fd32ab..8337e84 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
@@ -23,6 +23,7 @@
import org.onosproject.cfg.ComponentConfigService;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.NodeId;
+import org.onosproject.core.ApplicationId;
import org.onosproject.core.GroupId;
import org.onosproject.mastership.MastershipService;
import org.onosproject.net.DeviceId;
@@ -1093,6 +1094,18 @@
}
@Override
+ public void purgeGroupEntries(DeviceId deviceId, ApplicationId appId) {
+ Set<Entry<GroupStoreKeyMapKey, StoredGroupEntry>> entriesPendingRemove =
+ new HashSet<>();
+
+ getGroupStoreKeyMap().entrySet().stream()
+ .filter(entry -> entry.getKey().deviceId().equals(deviceId) && entry.getValue().appId().equals(appId))
+ .forEach(entriesPendingRemove::add);
+
+ purgeGroupEntries(entriesPendingRemove);
+ }
+
+ @Override
public void purgeGroupEntries() {
purgeGroupEntries(getGroupStoreKeyMap().entrySet());
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/meter/impl/DistributedMeterStore.java b/core/store/dist/src/main/java/org/onosproject/store/meter/impl/DistributedMeterStore.java
index 38f8748..b2034dc 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/meter/impl/DistributedMeterStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/meter/impl/DistributedMeterStore.java
@@ -22,6 +22,7 @@
import com.google.common.collect.Maps;
import org.apache.commons.lang.math.RandomUtils;
import org.onlab.util.KryoNamespace;
+import org.onosproject.core.ApplicationId;
import org.onosproject.net.DeviceId;
import org.onosproject.net.behaviour.MeterQuery;
import org.onosproject.net.driver.DriverHandler;
@@ -440,17 +441,25 @@
@Override
public void purgeMeter(DeviceId deviceId) {
- // Purge api (typically used when the device is offline)
List<Versioned<MeterData>> metersPendingRemove = meters.stream()
.filter(e -> Objects.equals(e.getKey().deviceId(), deviceId))
.map(Map.Entry::getValue)
.collect(Collectors.toList());
- // Remove definitely the meter
metersPendingRemove.forEach(versionedMeterKey
-> purgeMeter(versionedMeterKey.value().meter()));
}
@Override
+ public void purgeMeters(DeviceId deviceId, ApplicationId appId) {
+ List<Versioned<MeterData>> metersPendingRemove = meters.stream()
+ .filter(e -> Objects.equals(e.getKey().deviceId(), deviceId) &&
+ e.getValue().value().meter().appId().equals(appId))
+ .map(Map.Entry::getValue)
+ .collect(Collectors.toList());
+ metersPendingRemove.forEach(versionedMeterKey -> deleteMeterNow(versionedMeterKey.value().meter()));
+ }
+
+ @Override
public long getMaxMeters(MeterFeaturesKey key) {
// Leverage the meter features to know the max id
// Create a Meter Table key with FeaturesKey's device and global scope
diff --git a/core/store/dist/src/test/java/org/onosproject/store/flow/impl/ECFlowRuleStoreTest.java b/core/store/dist/src/test/java/org/onosproject/store/flow/impl/ECFlowRuleStoreTest.java
index 0dfc2a2..60261d6 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/flow/impl/ECFlowRuleStoreTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/flow/impl/ECFlowRuleStoreTest.java
@@ -50,7 +50,6 @@
import org.onosproject.store.service.TestStorageService;
import org.onlab.packet.Ip4Address;
-import java.util.Iterator;
import java.util.Optional;
import org.osgi.service.component.ComponentContext;
@@ -64,6 +63,7 @@
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.Assert.assertEquals;
import static org.onosproject.net.NetTestTools.APP_ID;
+import static org.onosproject.net.NetTestTools.APP_ID_2;
import static org.onosproject.net.NetTestTools.did;
/**
@@ -83,6 +83,7 @@
private static final IntentTestsMocks.MockTreatment TREATMENT =
new IntentTestsMocks.MockTreatment();
DeviceId deviceId = did("device1");
+ DeviceId deviceId2 = did("device2");
FlowRule flowRule =
DefaultFlowRule.builder()
.forDevice(deviceId)
@@ -101,6 +102,25 @@
.makeTemporary(44)
.fromApp(APP_ID)
.build();
+ FlowRule flowRule2 =
+ DefaultFlowRule.builder()
+ .forDevice(deviceId)
+ .withSelector(SELECTOR)
+ .withTreatment(TREATMENT)
+ .withPriority(44)
+ .makePermanent()
+ .fromApp(APP_ID_2)
+ .build();
+ FlowRule flowRule3 =
+ DefaultFlowRule.builder()
+ .forDevice(deviceId2)
+ .withSelector(SELECTOR)
+ .withTreatment(TREATMENT)
+ .withPriority(55)
+ .makePermanent()
+ .fromApp(APP_ID_2)
+ .build();
+
static class MasterOfAll extends MastershipServiceAdapter {
@Override
@@ -247,14 +267,7 @@
assertEquals("PENDING_ADD", flowEntry1.state().toString());
flowStoreImpl.addOrUpdateFlowRule(flowEntry);
- Iterable<FlowEntry> flows = flowStoreImpl.getFlowEntries(deviceId);
- int sum = 0;
- Iterator it = flows.iterator();
- while (it.hasNext()) {
- it.next();
- sum++;
- }
- assertThat(sum, is(1));
+ assertFlowsOnDevice(deviceId, 1);
FlowEntry flowEntry2 = flowStoreImpl.getFlowEntry(flowRule);
assertEquals("ADDED", flowEntry2.state().toString());
@@ -270,15 +283,7 @@
for (FlowEntry flow : flows1) {
flowStoreImpl.removeFlowRule(flow);
}
-
- Iterable<FlowEntry> flows2 = flowStoreImpl.getFlowEntries(deviceId);
- int sum = 0;
- Iterator it = flows2.iterator();
- while (it.hasNext()) {
- it.next();
- sum++;
- }
- assertThat(sum, is(0));
+ assertFlowsOnDevice(deviceId, 0);
}
/**
@@ -291,23 +296,41 @@
FlowEntry flowEntry1 = new DefaultFlowEntry(flowRule1);
flowStoreImpl.addOrUpdateFlowRule(flowEntry1);
- Iterable<FlowEntry> flows1 = flowStoreImpl.getFlowEntries(deviceId);
- int sum2 = 0;
- Iterator it1 = flows1.iterator();
- while (it1.hasNext()) {
- it1.next();
- sum2++;
- }
- assertThat(sum2, is(2));
+ assertFlowsOnDevice(deviceId, 2);
flowStoreImpl.purgeFlowRule(deviceId);
- Iterable<FlowEntry> flows3 = flowStoreImpl.getFlowEntries(deviceId);
- int sum3 = 0;
- Iterator it3 = flows3.iterator();
- while (it3.hasNext()) {
- it3.next();
- sum3++;
+ assertFlowsOnDevice(deviceId, 0);
+ }
+
+ /**
+ * Tests purge flow for a device and an application.
+ */
+ @Test
+ public void testPurgeFlowAppId() {
+ FlowEntry flowEntry1 = new DefaultFlowEntry(flowRule1);
+ flowStoreImpl.addOrUpdateFlowRule(flowEntry1);
+
+ FlowEntry flowEntry2 = new DefaultFlowEntry(flowRule2);
+ flowStoreImpl.addOrUpdateFlowRule(flowEntry2);
+
+ FlowEntry flowEntry3 = new DefaultFlowEntry(flowRule3);
+ flowStoreImpl.addOrUpdateFlowRule(flowEntry3);
+
+ assertFlowsOnDevice(deviceId, 2);
+ assertFlowsOnDevice(deviceId2, 1);
+
+ flowStoreImpl.purgeFlowRules(deviceId, APP_ID_2);
+
+ assertFlowsOnDevice(deviceId, 1);
+ assertFlowsOnDevice(deviceId2, 1);
+ }
+
+ private void assertFlowsOnDevice(DeviceId deviceId, int nFlows) {
+ Iterable<FlowEntry> flows1 = flowStoreImpl.getFlowEntries(deviceId);
+ int sum1 = 0;
+ for (FlowEntry flowEntry : flows1) {
+ sum1++;
}
- assertThat(sum3, is(0));
+ assertThat(sum1, is(nFlows));
}
}
diff --git a/core/store/dist/src/test/java/org/onosproject/store/group/impl/DistributedGroupStoreTest.java b/core/store/dist/src/test/java/org/onosproject/store/group/impl/DistributedGroupStoreTest.java
index 9e0dd9d..e3dc8a4 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/group/impl/DistributedGroupStoreTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/group/impl/DistributedGroupStoreTest.java
@@ -69,6 +69,7 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.onosproject.net.NetTestTools.APP_ID;
+import static org.onosproject.net.NetTestTools.APP_ID_2;
import static org.onosproject.net.NetTestTools.did;
import static org.onosproject.net.group.GroupDescription.Type.ALL;
import static org.onosproject.net.group.GroupDescription.Type.INDIRECT;
@@ -84,9 +85,13 @@
private final GroupId groupId1 = new GroupId(1);
private final GroupId groupId2 = new GroupId(2);
private final GroupId groupId3 = new GroupId(3);
+ private final GroupId groupId4 = new GroupId(4);
+ private final GroupId groupId5 = new GroupId(5);
private final GroupKey groupKey1 = new DefaultGroupKey("abc".getBytes());
private final GroupKey groupKey2 = new DefaultGroupKey("def".getBytes());
private final GroupKey groupKey3 = new DefaultGroupKey("ghi".getBytes());
+ private final GroupKey groupKey4 = new DefaultGroupKey("jkl".getBytes());
+ private final GroupKey groupKey5 = new DefaultGroupKey("mno".getBytes());
private final TrafficTreatment treatment = DefaultTrafficTreatment.emptyTreatment();
private final TrafficTreatment treatment2 = DefaultTrafficTreatment.builder()
@@ -118,6 +123,20 @@
groupKey3,
groupId3.id(),
APP_ID);
+ private final GroupDescription groupDescription4 = new DefaultGroupDescription(
+ deviceId2,
+ INDIRECT,
+ indirectGroupBuckets,
+ groupKey4,
+ groupId4.id(),
+ APP_ID_2);
+ private final GroupDescription groupDescription5 = new DefaultGroupDescription(
+ deviceId1,
+ INDIRECT,
+ indirectGroupBuckets,
+ groupKey5,
+ groupId5.id(),
+ APP_ID_2);
private DistributedGroupStore groupStoreImpl;
private GroupStore groupStore;
@@ -310,6 +329,30 @@
}
/**
+ * Tests removing all groups on the given device from a specific application.
+ */
+ @Test
+ public void testRemoveGroupOnDeviceFromApp() throws Exception {
+ groupStore.deviceInitialAuditCompleted(deviceId1, true);
+ assertThat(groupStore.deviceInitialAuditStatus(deviceId1), is(true));
+ groupStore.deviceInitialAuditCompleted(deviceId2, true);
+ assertThat(groupStore.deviceInitialAuditStatus(deviceId2), is(true));
+
+ // Make sure the pending list starts out empty
+ assertThat(auditPendingReqQueue.size(), is(0));
+
+ groupStore.storeGroupDescription(groupDescription3);
+ groupStore.storeGroupDescription(groupDescription4);
+ groupStore.storeGroupDescription(groupDescription5);
+ assertThat(groupStore.getGroupCount(deviceId1), is(1));
+ assertThat(groupStore.getGroupCount(deviceId2), is(2));
+
+ groupStore.purgeGroupEntries(deviceId2, APP_ID_2);
+ assertThat(groupStore.getGroupCount(deviceId1), is(1));
+ assertThat(groupStore.getGroupCount(deviceId2), is(1));
+ }
+
+ /**
* Tests adding and removing a group.
*/
@Test
diff --git a/core/store/dist/src/test/java/org/onosproject/store/meter/impl/DistributedMeterStoreTest.java b/core/store/dist/src/test/java/org/onosproject/store/meter/impl/DistributedMeterStoreTest.java
index 4cddf54..0527755 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/meter/impl/DistributedMeterStoreTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/meter/impl/DistributedMeterStoreTest.java
@@ -54,6 +54,7 @@
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.*;
import static org.onosproject.net.NetTestTools.APP_ID;
+import static org.onosproject.net.NetTestTools.APP_ID_2;
import static org.onosproject.net.NetTestTools.did;
/**
@@ -79,6 +80,7 @@
// Meter ids used during the tests
private MeterId mid1 = MeterId.meterId(1);
private MeterId mid2 = MeterId.meterId(2);
+ private MeterId mid3 = MeterId.meterId(3);
private MeterId mid10 = MeterId.meterId(10);
// Bands used during the tests
@@ -96,6 +98,22 @@
.withBands(Collections.singletonList(b1))
.build();
+ private Meter m2 = DefaultMeter.builder()
+ .forDevice(did1)
+ .fromApp(APP_ID_2)
+ .withCellId(mid2)
+ .withUnit(Meter.Unit.KB_PER_SEC)
+ .withBands(Collections.singletonList(b1))
+ .build();
+
+ private Meter m3 = DefaultMeter.builder()
+ .forDevice(did2)
+ .fromApp(APP_ID_2)
+ .withCellId(mid3)
+ .withUnit(Meter.Unit.KB_PER_SEC)
+ .withBands(Collections.singletonList(b1))
+ .build();
+
// Meter features used during the tests
private MeterFeatures mef1 = DefaultMeterFeatures.builder().forDevice(did1)
.withMaxMeters(3L)
@@ -417,6 +435,31 @@
}
/**
+ * Test purge meter given device and application.
+ */
+ @Test
+ public void testPurgeMeterDeviceAndApp() {
+ // Init the store
+ initMeterStore();
+ // add the meters
+ ((DefaultMeter) m1).setState(MeterState.PENDING_ADD);
+ ((DefaultMeter) m2).setState(MeterState.PENDING_ADD);
+ ((DefaultMeter) m3).setState(MeterState.PENDING_ADD);
+ meterStore.storeMeter(m1);
+ meterStore.storeMeter(m2);
+ meterStore.storeMeter(m3);
+ assertThat(3, is(meterStore.getAllMeters().size()));
+
+ meterStore.purgeMeters(did1, APP_ID_2);
+ // Verify delete
+ MeterKey keyTwo = MeterKey.key(did1, mid2);
+ assertThat(2, is(meterStore.getAllMeters().size()));
+ assertThat(1, is(meterStore.getAllMeters(did1).size()));
+ assertThat(1, is(meterStore.getAllMeters(did2).size()));
+ assertNull(meterStore.getMeter(keyTwo));
+ }
+
+ /**
* Test getMeters API immutability.
*/
@Test