[SDFAB-359] Allow purging flows, groups and meters by device and application ID
Change-Id: I5e507d230789979ac997dbc99697fa0483363f70
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java
index c438885..a257cbd 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java
@@ -33,19 +33,21 @@
import java.util.stream.Collectors;
import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.onlab.util.KryoNamespace;
import org.onlab.util.Tools;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.NodeId;
+import org.onosproject.core.ApplicationId;
import org.onosproject.net.DeviceId;
import org.onosproject.net.device.DeviceService;
import org.onosproject.net.flow.FlowEntry;
import org.onosproject.net.flow.FlowId;
import org.onosproject.net.flow.FlowRule;
-import org.onosproject.net.flow.StoredFlowEntry;
import org.onosproject.net.flow.FlowRuleStoreException;
+import org.onosproject.net.flow.StoredFlowEntry;
import org.onosproject.store.LogicalTimestamp;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.MessageSubject;
@@ -960,6 +962,51 @@
}
/**
+ * Purges the flows with the given application id from all flow buckets of this table.
+ * <p>Purges are queued per bucket (in flowTasks) while the master's term is not yet active.
+ * @param appId the application id whose flows are purged
+ * @return a future completed once every bucket purge has been applied, or the result of
+ * {@code Tools.exceptionalFuture(new IllegalStateException())} if the local node is not master
+ */
+ public CompletableFuture<Void> purge(ApplicationId appId) {
+ DeviceReplicaInfo replicaInfo = lifecycleManager.getReplicaInfo();
+ if (!replicaInfo.isMaster(localNodeId)) {
+ return Tools.exceptionalFuture(new IllegalStateException());
+ }
+
+ // If the master's term is not currently active (activeTerm < term: it has not been synchronized
+ // with prior replicas), enqueue one purge task per bucket in flowTasks to be executed once the
+ // master has been synchronized; otherwise purge every bucket immediately.
+ final long term = replicaInfo.term();
+ List<CompletableFuture<Void>> completablePurges = Lists.newArrayList();
+ if (activeTerm < term) {
+ log.debug("Enqueueing operations for device {}", deviceId);
+ flowBuckets.values().forEach(
+ bucket -> {
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ completablePurges.add(future);
+ flowTasks.computeIfAbsent(bucket.bucketId().bucket(),
+ b -> new LinkedList<>())
+ .add(() -> future.complete(apply((bkt, trm) -> {
+ bkt.purge(appId, trm, clock);
+ return null;
+ }, bucket, term)));
+ });
+
+ } else {
+ flowBuckets.values().forEach(bucket -> {
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ completablePurges.add(future);
+ future.complete(apply((bkt, trm) -> {
+ bkt.purge(appId, trm, clock);
+ return null;
+ }, bucket, term));
+ });
+ }
+ return CompletableFuture.allOf(completablePurges.toArray(new CompletableFuture[0]));
+ }
+
+ /**
* Closes the device flow table.
*/
public void close() {