[SDFAB-359] Allow purging flows, groups and meters by device and application ID
Change-Id: I5e507d230789979ac997dbc99697fa0483363f70
diff --git a/core/api/src/main/java/org/onosproject/net/flow/FlowRuleService.java b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleService.java
index 78aef84..2b29504 100644
--- a/core/api/src/main/java/org/onosproject/net/flow/FlowRuleService.java
+++ b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleService.java
@@ -115,6 +115,15 @@
void purgeFlowRules(DeviceId deviceId);
/**
+ * Purges all the flow rules on the specified device from the given application id.
+ * @param deviceId device identifier
+ * @param appId application identifier
+ */
+ default void purgeFlowRules(DeviceId deviceId, ApplicationId appId) {
+ throw new UnsupportedOperationException("purgeFlowRules not implemented!");
+ }
+
+ /**
* Removes the specified flow rules from their respective devices. If the
* device is not presently connected to the controller, these flow will
* be removed once the device reconnects.
diff --git a/core/api/src/main/java/org/onosproject/net/flow/FlowRuleStore.java b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleStore.java
index 07493a8..1d9e60e 100644
--- a/core/api/src/main/java/org/onosproject/net/flow/FlowRuleStore.java
+++ b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleStore.java
@@ -15,6 +15,7 @@
*/
package org.onosproject.net.flow;
+import org.onosproject.core.ApplicationId;
import org.onosproject.net.DeviceId;
import org.onosproject.net.flow.oldbatch.FlowRuleBatchEvent;
import org.onosproject.net.flow.oldbatch.FlowRuleBatchOperation;
@@ -137,6 +138,15 @@
default void purgeFlowRule(DeviceId deviceId) {}
/**
+ * Removes all flow entries of given device and application ID from store.
+ *
+ * @param deviceId device id
+ * @param appId application id
+ * @return true if operation was successful, false otherwise.
+ */
+ boolean purgeFlowRules(DeviceId deviceId, ApplicationId appId);
+
+ /**
* Removes all flow entries from store.
*/
void purgeFlowRules();
diff --git a/core/api/src/main/java/org/onosproject/net/group/GroupService.java b/core/api/src/main/java/org/onosproject/net/group/GroupService.java
index 460ccb6..bf18a2e 100644
--- a/core/api/src/main/java/org/onosproject/net/group/GroupService.java
+++ b/core/api/src/main/java/org/onosproject/net/group/GroupService.java
@@ -133,6 +133,16 @@
void purgeGroupEntries(DeviceId deviceId);
/**
+ * Purges all the group entries on the specified device and application.
+ *
+ * @param deviceId device identifier
+ * @param appId application identifier
+ */
+ default void purgeGroupEntries(DeviceId deviceId, ApplicationId appId) {
+ throw new UnsupportedOperationException("purgeGroupEntries not implemented");
+ }
+
+ /**
* Purges all group entries.
*/
default void purgeGroupEntries() {}
diff --git a/core/api/src/main/java/org/onosproject/net/group/GroupStore.java b/core/api/src/main/java/org/onosproject/net/group/GroupStore.java
index 850b2ce..672bb4e 100644
--- a/core/api/src/main/java/org/onosproject/net/group/GroupStore.java
+++ b/core/api/src/main/java/org/onosproject/net/group/GroupStore.java
@@ -17,6 +17,7 @@
import java.util.Collection;
+import org.onosproject.core.ApplicationId;
import org.onosproject.core.GroupId;
import org.onosproject.net.DeviceId;
import org.onosproject.store.Store;
@@ -130,6 +131,14 @@
void purgeGroupEntry(DeviceId deviceId);
/**
+ * Removes all group entries of given device and given application from store.
+ *
+ * @param deviceId device id
+ * @param appId application id
+ */
+ void purgeGroupEntries(DeviceId deviceId, ApplicationId appId);
+
+ /**
* Removes all group entries from store.
*/
default void purgeGroupEntries() {}
diff --git a/core/api/src/main/java/org/onosproject/net/meter/MeterService.java b/core/api/src/main/java/org/onosproject/net/meter/MeterService.java
index cf99b5e..6339101 100644
--- a/core/api/src/main/java/org/onosproject/net/meter/MeterService.java
+++ b/core/api/src/main/java/org/onosproject/net/meter/MeterService.java
@@ -15,6 +15,7 @@
*/
package org.onosproject.net.meter;
+import org.onosproject.core.ApplicationId;
import org.onosproject.event.ListenerService;
import org.onosproject.net.DeviceId;
@@ -93,8 +94,18 @@
* Purges all the meters on the specified device.
* @param deviceId device identifier
*/
- default void purgeMeters(DeviceId deviceId){
+ default void purgeMeters(DeviceId deviceId) {
//Default implementation does nothing
}
+ /**
+ * Purges all the meters on the given device and for the given application.
+ *
+ * @param deviceId device identifier
+ * @param appId application identifier
+ */
+ default void purgeMeters(DeviceId deviceId, ApplicationId appId) {
+ throw new UnsupportedOperationException("purgeMeter not implemented");
+ }
+
}
diff --git a/core/api/src/main/java/org/onosproject/net/meter/MeterStore.java b/core/api/src/main/java/org/onosproject/net/meter/MeterStore.java
index 2e632ed..811b418 100644
--- a/core/api/src/main/java/org/onosproject/net/meter/MeterStore.java
+++ b/core/api/src/main/java/org/onosproject/net/meter/MeterStore.java
@@ -15,6 +15,7 @@
*/
package org.onosproject.net.meter;
+import org.onosproject.core.ApplicationId;
import org.onosproject.net.DeviceId;
import org.onosproject.store.Store;
@@ -196,9 +197,19 @@
/**
* Removes all meters of given device from store.
+ * This API is typically used when the device is offline.
*
* @param deviceId the device id
*/
void purgeMeter(DeviceId deviceId);
+ /**
+ * Removes all meters of given device and for the given application from store.
+ * This API is typically used when the device is offline.
+ *
+ * @param deviceId the device id
+ * @param appId the application id
+ */
+ void purgeMeters(DeviceId deviceId, ApplicationId appId);
+
}
diff --git a/core/api/src/test/java/org/onosproject/net/NetTestTools.java b/core/api/src/test/java/org/onosproject/net/NetTestTools.java
index dcc3e15..42f828f 100644
--- a/core/api/src/test/java/org/onosproject/net/NetTestTools.java
+++ b/core/api/src/test/java/org/onosproject/net/NetTestTools.java
@@ -57,6 +57,7 @@
public static final ProviderId PID = new ProviderId("of", "foo");
public static final ApplicationId APP_ID = new TestApplicationId("foo");
+ public static final ApplicationId APP_ID_2 = new TestApplicationId("foo2");
public static final NodeId NODE_ID = new NodeId("node1");
// Short-hand for producing a device id from a string
diff --git a/core/api/src/test/java/org/onosproject/net/flow/FlowRuleServiceAdapter.java b/core/api/src/test/java/org/onosproject/net/flow/FlowRuleServiceAdapter.java
index 21fd045..4f53b96 100644
--- a/core/api/src/test/java/org/onosproject/net/flow/FlowRuleServiceAdapter.java
+++ b/core/api/src/test/java/org/onosproject/net/flow/FlowRuleServiceAdapter.java
@@ -38,7 +38,11 @@
}
@Override
- public void purgeFlowRules(DeviceId deviceId){
+ public void purgeFlowRules(DeviceId deviceId) {
+ }
+
+ @Override
+ public void purgeFlowRules(DeviceId deviceId, ApplicationId appId) {
}
@Override
diff --git a/core/common/src/test/java/org/onosproject/store/trivial/SimpleFlowRuleStore.java b/core/common/src/test/java/org/onosproject/store/trivial/SimpleFlowRuleStore.java
index 31b58fb..3b667a8 100644
--- a/core/common/src/test/java/org/onosproject/store/trivial/SimpleFlowRuleStore.java
+++ b/core/common/src/test/java/org/onosproject/store/trivial/SimpleFlowRuleStore.java
@@ -25,6 +25,7 @@
import com.google.common.collect.Streams;
import com.google.common.util.concurrent.SettableFuture;
import org.onlab.util.Tools;
+import org.onosproject.core.ApplicationId;
import org.onosproject.net.DeviceId;
import org.onosproject.net.flow.CompletedBatchOperation;
import org.onosproject.net.flow.DefaultFlowEntry;
@@ -301,6 +302,16 @@
}
@Override
+ public boolean purgeFlowRules(DeviceId deviceId, ApplicationId appId) {
+ flowEntries.get(deviceId).values()
+ .removeIf(storedFlowEntries -> {
+ storedFlowEntries.removeIf(storedFlowEntry -> storedFlowEntry.appId() == appId.id());
+ return storedFlowEntries.isEmpty();
+ });
+ return true;
+ }
+
+ @Override
public void purgeFlowRules() {
flowEntries.clear();
}
diff --git a/core/common/src/test/java/org/onosproject/store/trivial/SimpleGroupStore.java b/core/common/src/test/java/org/onosproject/store/trivial/SimpleGroupStore.java
index bf8ba59..8ece648 100644
--- a/core/common/src/test/java/org/onosproject/store/trivial/SimpleGroupStore.java
+++ b/core/common/src/test/java/org/onosproject/store/trivial/SimpleGroupStore.java
@@ -17,6 +17,7 @@
import com.google.common.collect.FluentIterable;
import com.google.common.collect.Sets;
+import org.onosproject.core.ApplicationId;
import org.onosproject.core.GroupId;
import org.onosproject.net.DeviceId;
import org.onosproject.net.group.DefaultGroup;
@@ -51,6 +52,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
import static org.slf4j.LoggerFactory.getLogger;
@@ -482,6 +484,31 @@
}
@Override
+ public void purgeGroupEntries(DeviceId deviceId, ApplicationId appId) {
+ List<StoredGroupEntry> entryPendingRemove =
+ groupEntriesById.get(deviceId).values().stream()
+ .filter(storedGroupEntry -> storedGroupEntry.appId().equals(appId))
+ .collect(Collectors.toList());
+
+ entryPendingRemove.forEach(storedGroupEntry -> {
+ groupEntriesById.computeIfPresent(deviceId, (k, value) -> {
+ value.remove(storedGroupEntry.id());
+ if (value.isEmpty()) {
+ return null;
+ }
+ return value;
+ });
+ groupEntriesByKey.computeIfPresent(deviceId, (k, value) -> {
+ value.remove(storedGroupEntry.appCookie());
+ if (value.isEmpty()) {
+ return null;
+ }
+ return value;
+ });
+ });
+ }
+
+ @Override
public void purgeGroupEntries() {
groupEntriesById.values().forEach(groupEntries -> {
groupEntries.entrySet().forEach(entry -> {
diff --git a/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java b/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java
index dd7f802..ac97636 100644
--- a/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java
+++ b/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java
@@ -320,6 +320,15 @@
}
@Override
+ public void purgeFlowRules(DeviceId deviceId, ApplicationId appId) {
+ checkPermission(FLOWRULE_WRITE);
+ checkNotNull(deviceId, DEVICE_ID_NULL);
+ checkNotNull(appId, "Application ID cannot be null!");
+
+ store.purgeFlowRules(deviceId, appId);
+ }
+
+ @Override
public void removeFlowRules(FlowRule... flowRules) {
checkPermission(FLOWRULE_WRITE);
diff --git a/core/net/src/main/java/org/onosproject/net/group/impl/GroupManager.java b/core/net/src/main/java/org/onosproject/net/group/impl/GroupManager.java
index e633a96..65ec5d3 100644
--- a/core/net/src/main/java/org/onosproject/net/group/impl/GroupManager.java
+++ b/core/net/src/main/java/org/onosproject/net/group/impl/GroupManager.java
@@ -305,6 +305,12 @@
}
@Override
+ public void purgeGroupEntries(DeviceId deviceId, ApplicationId appId) {
+ checkPermission(GROUP_WRITE);
+ store.purgeGroupEntries(deviceId, appId);
+ }
+
+ @Override
public void purgeGroupEntries() {
checkPermission(GROUP_WRITE);
store.purgeGroupEntries();
diff --git a/core/net/src/main/java/org/onosproject/net/meter/impl/MeterManager.java b/core/net/src/main/java/org/onosproject/net/meter/impl/MeterManager.java
index 03bb5ca..805009d 100644
--- a/core/net/src/main/java/org/onosproject/net/meter/impl/MeterManager.java
+++ b/core/net/src/main/java/org/onosproject/net/meter/impl/MeterManager.java
@@ -23,6 +23,7 @@
import org.onosproject.cfg.ComponentConfigService;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.NodeId;
+import org.onosproject.core.ApplicationId;
import org.onosproject.mastership.MastershipService;
import org.onosproject.net.DeviceId;
import org.onosproject.net.config.NetworkConfigRegistry;
@@ -331,6 +332,11 @@
store.purgeMeter(deviceId);
}
+ @Override
+ public void purgeMeters(DeviceId deviceId, ApplicationId appId) {
+ store.purgeMeters(deviceId, appId);
+ }
+
private class InternalMeterProviderService
extends AbstractProviderService<MeterProvider>
implements MeterProviderService {
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java
index c438885..a257cbd 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java
@@ -33,19 +33,21 @@
import java.util.stream.Collectors;
import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.onlab.util.KryoNamespace;
import org.onlab.util.Tools;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.NodeId;
+import org.onosproject.core.ApplicationId;
import org.onosproject.net.DeviceId;
import org.onosproject.net.device.DeviceService;
import org.onosproject.net.flow.FlowEntry;
import org.onosproject.net.flow.FlowId;
import org.onosproject.net.flow.FlowRule;
-import org.onosproject.net.flow.StoredFlowEntry;
import org.onosproject.net.flow.FlowRuleStoreException;
+import org.onosproject.net.flow.StoredFlowEntry;
import org.onosproject.store.LogicalTimestamp;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.MessageSubject;
@@ -960,6 +962,51 @@
}
/**
+ * Purges the flows with the given application id.
+ *
+ * @param appId the application id
+ * @return a future to be completed once flow rules with given application
+ * id have been purged on all buckets
+ */
+ public CompletableFuture<Void> purge(ApplicationId appId) {
+ DeviceReplicaInfo replicaInfo = lifecycleManager.getReplicaInfo();
+ if (!replicaInfo.isMaster(localNodeId)) {
+ return Tools.exceptionalFuture(new IllegalStateException());
+ }
+
+ // If the master's term is not currently active (has not been synchronized
+ // with prior replicas), enqueue the changes to be executed once the master
+ // has been synchronized.
+ final long term = replicaInfo.term();
+ List<CompletableFuture<Void>> completablePurges = Lists.newArrayList();
+ if (activeTerm < term) {
+ log.debug("Enqueueing operations for device {}", deviceId);
+ flowBuckets.values().forEach(
+ bucket -> {
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ completablePurges.add(future);
+ flowTasks.computeIfAbsent(bucket.bucketId().bucket(),
+ b -> new LinkedList<>())
+ .add(() -> future.complete(apply((bkt, trm) -> {
+ bkt.purge(appId, trm, clock);
+ return null;
+ }, bucket, term)));
+ });
+
+ } else {
+ flowBuckets.values().forEach(bucket -> {
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ completablePurges.add(future);
+ future.complete(apply((bkt, trm) -> {
+ bkt.purge(appId, trm, clock);
+ return null;
+ }, bucket, term));
+ });
+ }
+ return CompletableFuture.allOf(completablePurges.toArray(new CompletableFuture[0]));
+ }
+
+ /**
* Closes the device flow table.
*/
public void close() {
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
index 0a96252..e4850d8 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
@@ -44,6 +44,7 @@
import org.onosproject.cfg.ComponentConfigService;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.NodeId;
+import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.core.IdGenerator;
import org.onosproject.event.AbstractListenerManager;
@@ -63,8 +64,8 @@
import org.onosproject.net.flow.FlowRuleService;
import org.onosproject.net.flow.FlowRuleStore;
import org.onosproject.net.flow.FlowRuleStoreDelegate;
-import org.onosproject.net.flow.StoredFlowEntry;
import org.onosproject.net.flow.FlowRuleStoreException;
+import org.onosproject.net.flow.StoredFlowEntry;
import org.onosproject.net.flow.TableStatisticsEntry;
import org.onosproject.net.flow.oldbatch.FlowRuleBatchEntry;
import org.onosproject.net.flow.oldbatch.FlowRuleBatchEntry.FlowRuleOperation;
@@ -110,6 +111,7 @@
import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.FLOW_TABLE_BACKUP;
import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.GET_DEVICE_FLOW_COUNT;
import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.GET_FLOW_ENTRY;
+import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.PURGE_FLOW_RULES;
import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.REMOTE_APPLY_COMPLETED;
import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.REMOVE_FLOW_ENTRY;
import static org.slf4j.LoggerFactory.getLogger;
@@ -137,6 +139,7 @@
private final Logger log = getLogger(getClass());
private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000;
+ private static final long PURGE_TIMEOUT_MILLIS = 30000;
private static final int GET_FLOW_ENTRIES_TIMEOUT = 30; //seconds
/** Number of threads in the message handler pool. */
@@ -340,6 +343,11 @@
serializer::encode, executor);
clusterCommunicator.addSubscriber(
REMOVE_FLOW_ENTRY, serializer::decode, this::removeFlowRuleInternal, serializer::encode, executor);
+ clusterCommunicator.<Pair<DeviceId, ApplicationId>, Boolean>addSubscriber(
+ PURGE_FLOW_RULES,
+ serializer::decode,
+ p -> flowTable.purgeFlowRules(p.getLeft(), p.getRight()),
+ serializer::encode, executor);
}
private void unregisterMessageHandlers() {
@@ -634,6 +642,35 @@
}
@Override
+ public boolean purgeFlowRules(DeviceId deviceId, ApplicationId appId) {
+ NodeId master = mastershipService.getMasterFor(deviceId);
+
+ if (Objects.equals(local, master)) {
+ // bypass and handle it locally
+ return flowTable.purgeFlowRules(deviceId, appId);
+ }
+
+ if (master == null) {
+ log.warn("Failed to purgeFlowRules: No master for {}", deviceId);
+ return false;
+ }
+
+ log.trace("Forwarding purgeFlowRules to {}, which is the master for device {}",
+ master, deviceId);
+
+ return Tools.futureGetOrElse(
+ clusterCommunicator.sendAndReceive(
+ Pair.of(deviceId, appId),
+ PURGE_FLOW_RULES,
+ serializer::encode,
+ serializer::decode,
+ master),
+ FLOW_RULE_STORE_TIMEOUT_MILLIS,
+ TimeUnit.MILLISECONDS,
+ false);
+ }
+
+ @Override
public void purgeFlowRules() {
flowTable.purgeFlowRules();
}
@@ -874,6 +911,33 @@
}
/**
+ * Purges flow rules for the given device and application id.
+ *
+ * @param deviceId the device for which to purge flow rules
+ * @param appId the application id for with to purge flow rules
+ * @return true if purge is successful, false otherwise
+ */
+ public boolean purgeFlowRules(DeviceId deviceId, ApplicationId appId) {
+ DeviceFlowTable flowTable = flowTables.get(deviceId);
+ if (flowTable != null) {
+ // flowTable.purge() returns a CompletableFuture<Void>, we want
+ // to return true when the completable future returns correctly
+ // within the timeout, otherwise return false.
+ try {
+ // Use higher timeout, purge(appId) may require more time
+ // than normal operations because it's applying the purge
+ // operation on every single flow table bucket.
+ flowTable.purge(appId).get(PURGE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+ return true;
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } catch (ExecutionException | TimeoutException ignored) {
+ }
+ }
+ return false;
+ }
+
+ /**
* Purges all flow rules from the table.
*/
public void purgeFlowRules() {
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStoreMessageSubjects.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStoreMessageSubjects.java
index e5a7631..80f376e 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStoreMessageSubjects.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStoreMessageSubjects.java
@@ -40,4 +40,7 @@
public static final MessageSubject FLOW_TABLE_BACKUP
= new MessageSubject("peer-flow-table-backup");
+
+ public static final MessageSubject PURGE_FLOW_RULES
+ = new MessageSubject("peer-purge-flow-rules");
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/FlowBucket.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/FlowBucket.java
index ea1887d..020559f 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/FlowBucket.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/FlowBucket.java
@@ -21,6 +21,7 @@
import java.util.stream.Collectors;
import com.google.common.collect.Maps;
+import org.onosproject.core.ApplicationId;
import org.onosproject.net.flow.DefaultFlowEntry;
import org.onosproject.net.flow.FlowEntry;
import org.onosproject.net.flow.FlowId;
@@ -271,6 +272,24 @@
}
/**
+ * Purge the entries with the given application ID.
+ *
+ * @param appId the application ID
+ * @param term the term in which the purge occurred
+ * @param clock the logical clock
+ */
+ public void purge(ApplicationId appId, long term, LogicalClock clock) {
+ boolean anythingRemoved = flowBucket.values().removeIf(flowEntryMap -> {
+ flowEntryMap.values().removeIf(storedFlowEntry -> storedFlowEntry.appId() == appId.id());
+ return flowEntryMap.isEmpty();
+ });
+ if (anythingRemoved) {
+ recordUpdate(term, clock.getTimestamp());
+ }
+ }
+
+
+ /**
* Clears the bucket.
*/
public void clear() {
diff --git a/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java b/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
index 8fd32ab..8337e84 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
@@ -23,6 +23,7 @@
import org.onosproject.cfg.ComponentConfigService;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.NodeId;
+import org.onosproject.core.ApplicationId;
import org.onosproject.core.GroupId;
import org.onosproject.mastership.MastershipService;
import org.onosproject.net.DeviceId;
@@ -1093,6 +1094,18 @@
}
@Override
+ public void purgeGroupEntries(DeviceId deviceId, ApplicationId appId) {
+ Set<Entry<GroupStoreKeyMapKey, StoredGroupEntry>> entriesPendingRemove =
+ new HashSet<>();
+
+ getGroupStoreKeyMap().entrySet().stream()
+ .filter(entry -> entry.getKey().deviceId().equals(deviceId) && entry.getValue().appId().equals(appId))
+ .forEach(entriesPendingRemove::add);
+
+ purgeGroupEntries(entriesPendingRemove);
+ }
+
+ @Override
public void purgeGroupEntries() {
purgeGroupEntries(getGroupStoreKeyMap().entrySet());
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/meter/impl/DistributedMeterStore.java b/core/store/dist/src/main/java/org/onosproject/store/meter/impl/DistributedMeterStore.java
index 38f8748..b2034dc 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/meter/impl/DistributedMeterStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/meter/impl/DistributedMeterStore.java
@@ -22,6 +22,7 @@
import com.google.common.collect.Maps;
import org.apache.commons.lang.math.RandomUtils;
import org.onlab.util.KryoNamespace;
+import org.onosproject.core.ApplicationId;
import org.onosproject.net.DeviceId;
import org.onosproject.net.behaviour.MeterQuery;
import org.onosproject.net.driver.DriverHandler;
@@ -440,17 +441,25 @@
@Override
public void purgeMeter(DeviceId deviceId) {
- // Purge api (typically used when the device is offline)
List<Versioned<MeterData>> metersPendingRemove = meters.stream()
.filter(e -> Objects.equals(e.getKey().deviceId(), deviceId))
.map(Map.Entry::getValue)
.collect(Collectors.toList());
- // Remove definitely the meter
metersPendingRemove.forEach(versionedMeterKey
-> purgeMeter(versionedMeterKey.value().meter()));
}
@Override
+ public void purgeMeters(DeviceId deviceId, ApplicationId appId) {
+ List<Versioned<MeterData>> metersPendingRemove = meters.stream()
+ .filter(e -> Objects.equals(e.getKey().deviceId(), deviceId) &&
+ e.getValue().value().meter().appId().equals(appId))
+ .map(Map.Entry::getValue)
+ .collect(Collectors.toList());
+ metersPendingRemove.forEach(versionedMeterKey -> deleteMeterNow(versionedMeterKey.value().meter()));
+ }
+
+ @Override
public long getMaxMeters(MeterFeaturesKey key) {
// Leverage the meter features to know the max id
// Create a Meter Table key with FeaturesKey's device and global scope
diff --git a/core/store/dist/src/test/java/org/onosproject/store/flow/impl/ECFlowRuleStoreTest.java b/core/store/dist/src/test/java/org/onosproject/store/flow/impl/ECFlowRuleStoreTest.java
index 0dfc2a2..60261d6 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/flow/impl/ECFlowRuleStoreTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/flow/impl/ECFlowRuleStoreTest.java
@@ -50,7 +50,6 @@
import org.onosproject.store.service.TestStorageService;
import org.onlab.packet.Ip4Address;
-import java.util.Iterator;
import java.util.Optional;
import org.osgi.service.component.ComponentContext;
@@ -64,6 +63,7 @@
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.Assert.assertEquals;
import static org.onosproject.net.NetTestTools.APP_ID;
+import static org.onosproject.net.NetTestTools.APP_ID_2;
import static org.onosproject.net.NetTestTools.did;
/**
@@ -83,6 +83,7 @@
private static final IntentTestsMocks.MockTreatment TREATMENT =
new IntentTestsMocks.MockTreatment();
DeviceId deviceId = did("device1");
+ DeviceId deviceId2 = did("device2");
FlowRule flowRule =
DefaultFlowRule.builder()
.forDevice(deviceId)
@@ -101,6 +102,25 @@
.makeTemporary(44)
.fromApp(APP_ID)
.build();
+ FlowRule flowRule2 =
+ DefaultFlowRule.builder()
+ .forDevice(deviceId)
+ .withSelector(SELECTOR)
+ .withTreatment(TREATMENT)
+ .withPriority(44)
+ .makePermanent()
+ .fromApp(APP_ID_2)
+ .build();
+ FlowRule flowRule3 =
+ DefaultFlowRule.builder()
+ .forDevice(deviceId2)
+ .withSelector(SELECTOR)
+ .withTreatment(TREATMENT)
+ .withPriority(55)
+ .makePermanent()
+ .fromApp(APP_ID_2)
+ .build();
+
static class MasterOfAll extends MastershipServiceAdapter {
@Override
@@ -247,14 +267,7 @@
assertEquals("PENDING_ADD", flowEntry1.state().toString());
flowStoreImpl.addOrUpdateFlowRule(flowEntry);
- Iterable<FlowEntry> flows = flowStoreImpl.getFlowEntries(deviceId);
- int sum = 0;
- Iterator it = flows.iterator();
- while (it.hasNext()) {
- it.next();
- sum++;
- }
- assertThat(sum, is(1));
+ assertFlowsOnDevice(deviceId, 1);
FlowEntry flowEntry2 = flowStoreImpl.getFlowEntry(flowRule);
assertEquals("ADDED", flowEntry2.state().toString());
@@ -270,15 +283,7 @@
for (FlowEntry flow : flows1) {
flowStoreImpl.removeFlowRule(flow);
}
-
- Iterable<FlowEntry> flows2 = flowStoreImpl.getFlowEntries(deviceId);
- int sum = 0;
- Iterator it = flows2.iterator();
- while (it.hasNext()) {
- it.next();
- sum++;
- }
- assertThat(sum, is(0));
+ assertFlowsOnDevice(deviceId, 0);
}
/**
@@ -291,23 +296,41 @@
FlowEntry flowEntry1 = new DefaultFlowEntry(flowRule1);
flowStoreImpl.addOrUpdateFlowRule(flowEntry1);
- Iterable<FlowEntry> flows1 = flowStoreImpl.getFlowEntries(deviceId);
- int sum2 = 0;
- Iterator it1 = flows1.iterator();
- while (it1.hasNext()) {
- it1.next();
- sum2++;
- }
- assertThat(sum2, is(2));
+ assertFlowsOnDevice(deviceId, 2);
flowStoreImpl.purgeFlowRule(deviceId);
- Iterable<FlowEntry> flows3 = flowStoreImpl.getFlowEntries(deviceId);
- int sum3 = 0;
- Iterator it3 = flows3.iterator();
- while (it3.hasNext()) {
- it3.next();
- sum3++;
+ assertFlowsOnDevice(deviceId, 0);
+ }
+
+ /**
+ * Tests purge flow for a device and an application.
+ */
+ @Test
+ public void testPurgeFlowAppId() {
+ FlowEntry flowEntry1 = new DefaultFlowEntry(flowRule1);
+ flowStoreImpl.addOrUpdateFlowRule(flowEntry1);
+
+ FlowEntry flowEntry2 = new DefaultFlowEntry(flowRule2);
+ flowStoreImpl.addOrUpdateFlowRule(flowEntry2);
+
+ FlowEntry flowEntry3 = new DefaultFlowEntry(flowRule3);
+ flowStoreImpl.addOrUpdateFlowRule(flowEntry3);
+
+ assertFlowsOnDevice(deviceId, 2);
+ assertFlowsOnDevice(deviceId2, 1);
+
+ flowStoreImpl.purgeFlowRules(deviceId, APP_ID_2);
+
+ assertFlowsOnDevice(deviceId, 1);
+ assertFlowsOnDevice(deviceId2, 1);
+ }
+
+ private void assertFlowsOnDevice(DeviceId deviceId, int nFlows) {
+ Iterable<FlowEntry> flows1 = flowStoreImpl.getFlowEntries(deviceId);
+ int sum1 = 0;
+ for (FlowEntry flowEntry : flows1) {
+ sum1++;
}
- assertThat(sum3, is(0));
+ assertThat(sum1, is(nFlows));
}
}
diff --git a/core/store/dist/src/test/java/org/onosproject/store/group/impl/DistributedGroupStoreTest.java b/core/store/dist/src/test/java/org/onosproject/store/group/impl/DistributedGroupStoreTest.java
index 9e0dd9d..e3dc8a4 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/group/impl/DistributedGroupStoreTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/group/impl/DistributedGroupStoreTest.java
@@ -69,6 +69,7 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.onosproject.net.NetTestTools.APP_ID;
+import static org.onosproject.net.NetTestTools.APP_ID_2;
import static org.onosproject.net.NetTestTools.did;
import static org.onosproject.net.group.GroupDescription.Type.ALL;
import static org.onosproject.net.group.GroupDescription.Type.INDIRECT;
@@ -84,9 +85,13 @@
private final GroupId groupId1 = new GroupId(1);
private final GroupId groupId2 = new GroupId(2);
private final GroupId groupId3 = new GroupId(3);
+ private final GroupId groupId4 = new GroupId(4);
+ private final GroupId groupId5 = new GroupId(5);
private final GroupKey groupKey1 = new DefaultGroupKey("abc".getBytes());
private final GroupKey groupKey2 = new DefaultGroupKey("def".getBytes());
private final GroupKey groupKey3 = new DefaultGroupKey("ghi".getBytes());
+ private final GroupKey groupKey4 = new DefaultGroupKey("jkl".getBytes());
+ private final GroupKey groupKey5 = new DefaultGroupKey("mno".getBytes());
private final TrafficTreatment treatment = DefaultTrafficTreatment.emptyTreatment();
private final TrafficTreatment treatment2 = DefaultTrafficTreatment.builder()
@@ -118,6 +123,20 @@
groupKey3,
groupId3.id(),
APP_ID);
+ private final GroupDescription groupDescription4 = new DefaultGroupDescription(
+ deviceId2,
+ INDIRECT,
+ indirectGroupBuckets,
+ groupKey4,
+ groupId4.id(),
+ APP_ID_2);
+ private final GroupDescription groupDescription5 = new DefaultGroupDescription(
+ deviceId1,
+ INDIRECT,
+ indirectGroupBuckets,
+ groupKey5,
+ groupId5.id(),
+ APP_ID_2);
private DistributedGroupStore groupStoreImpl;
private GroupStore groupStore;
@@ -310,6 +329,30 @@
}
/**
+ * Tests removing all groups on the given device from a specific application.
+ */
+ @Test
+ public void testRemoveGroupOnDeviceFromApp() throws Exception {
+ groupStore.deviceInitialAuditCompleted(deviceId1, true);
+ assertThat(groupStore.deviceInitialAuditStatus(deviceId1), is(true));
+ groupStore.deviceInitialAuditCompleted(deviceId2, true);
+ assertThat(groupStore.deviceInitialAuditStatus(deviceId2), is(true));
+
+ // Make sure the pending list starts out empty
+ assertThat(auditPendingReqQueue.size(), is(0));
+
+ groupStore.storeGroupDescription(groupDescription3);
+ groupStore.storeGroupDescription(groupDescription4);
+ groupStore.storeGroupDescription(groupDescription5);
+ assertThat(groupStore.getGroupCount(deviceId1), is(1));
+ assertThat(groupStore.getGroupCount(deviceId2), is(2));
+
+ groupStore.purgeGroupEntries(deviceId2, APP_ID_2);
+ assertThat(groupStore.getGroupCount(deviceId1), is(1));
+ assertThat(groupStore.getGroupCount(deviceId2), is(1));
+ }
+
+ /**
* Tests adding and removing a group.
*/
@Test
diff --git a/core/store/dist/src/test/java/org/onosproject/store/meter/impl/DistributedMeterStoreTest.java b/core/store/dist/src/test/java/org/onosproject/store/meter/impl/DistributedMeterStoreTest.java
index 4cddf54..0527755 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/meter/impl/DistributedMeterStoreTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/meter/impl/DistributedMeterStoreTest.java
@@ -54,6 +54,7 @@
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.*;
import static org.onosproject.net.NetTestTools.APP_ID;
+import static org.onosproject.net.NetTestTools.APP_ID_2;
import static org.onosproject.net.NetTestTools.did;
/**
@@ -79,6 +80,7 @@
// Meter ids used during the tests
private MeterId mid1 = MeterId.meterId(1);
private MeterId mid2 = MeterId.meterId(2);
+ private MeterId mid3 = MeterId.meterId(3);
private MeterId mid10 = MeterId.meterId(10);
// Bands used during the tests
@@ -96,6 +98,22 @@
.withBands(Collections.singletonList(b1))
.build();
+ private Meter m2 = DefaultMeter.builder()
+ .forDevice(did1)
+ .fromApp(APP_ID_2)
+ .withCellId(mid2)
+ .withUnit(Meter.Unit.KB_PER_SEC)
+ .withBands(Collections.singletonList(b1))
+ .build();
+
+ private Meter m3 = DefaultMeter.builder()
+ .forDevice(did2)
+ .fromApp(APP_ID_2)
+ .withCellId(mid3)
+ .withUnit(Meter.Unit.KB_PER_SEC)
+ .withBands(Collections.singletonList(b1))
+ .build();
+
// Meter features used during the tests
private MeterFeatures mef1 = DefaultMeterFeatures.builder().forDevice(did1)
.withMaxMeters(3L)
@@ -417,6 +435,31 @@
}
/**
+ * Test purge meter given device and application.
+ */
+ @Test
+ public void testPurgeMeterDeviceAndApp() {
+ // Init the store
+ initMeterStore();
+ // add the meters
+ ((DefaultMeter) m1).setState(MeterState.PENDING_ADD);
+ ((DefaultMeter) m2).setState(MeterState.PENDING_ADD);
+ ((DefaultMeter) m3).setState(MeterState.PENDING_ADD);
+ meterStore.storeMeter(m1);
+ meterStore.storeMeter(m2);
+ meterStore.storeMeter(m3);
+ assertThat(3, is(meterStore.getAllMeters().size()));
+
+ meterStore.purgeMeters(did1, APP_ID_2);
+ // Verify delete
+ MeterKey keyTwo = MeterKey.key(did1, mid2);
+ assertThat(2, is(meterStore.getAllMeters().size()));
+ assertThat(1, is(meterStore.getAllMeters(did1).size()));
+ assertThat(1, is(meterStore.getAllMeters(did2).size()));
+ assertNull(meterStore.getMeter(keyTwo));
+ }
+
+ /**
* Test getMeters API immutability.
*/
@Test