[SDFAB-359] Allow purging flows, groups and meters by device and application ID

Change-Id: I5e507d230789979ac997dbc99697fa0483363f70
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java
index c438885..a257cbd 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java
@@ -33,19 +33,21 @@
 import java.util.stream.Collectors;
 
 import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import org.onlab.util.KryoNamespace;
 import org.onlab.util.Tools;
 import org.onosproject.cluster.ClusterService;
 import org.onosproject.cluster.NodeId;
+import org.onosproject.core.ApplicationId;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.device.DeviceService;
 import org.onosproject.net.flow.FlowEntry;
 import org.onosproject.net.flow.FlowId;
 import org.onosproject.net.flow.FlowRule;
-import org.onosproject.net.flow.StoredFlowEntry;
 import org.onosproject.net.flow.FlowRuleStoreException;
+import org.onosproject.net.flow.StoredFlowEntry;
 import org.onosproject.store.LogicalTimestamp;
 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
 import org.onosproject.store.cluster.messaging.MessageSubject;
@@ -960,6 +962,51 @@
     }
 
     /**
+     * Purges the flows with the given application id from every flow bucket.
+     *
+     * @param appId the application id whose flows are to be purged
+     * @return a future completed once every bucket has been purged, or the result of
+     * {@code Tools.exceptionalFuture(new IllegalStateException())} if the local node is not master
+     */
+    public CompletableFuture<Void> purge(ApplicationId appId) {
+        DeviceReplicaInfo replicaInfo = lifecycleManager.getReplicaInfo();
+        if (!replicaInfo.isMaster(localNodeId)) {
+            return Tools.exceptionalFuture(new IllegalStateException());
+        }
+
+        // If the master's term is not currently active (has not been synchronized
+        // with prior replicas), enqueue one purge task per bucket to be executed once
+        // the master has been synchronized; otherwise purge every bucket immediately.
+        final long term = replicaInfo.term();
+        List<CompletableFuture<Void>> completablePurges = Lists.newArrayList();
+        if (activeTerm < term) {
+            log.debug("Enqueueing operations for device {}", deviceId);
+            flowBuckets.values().forEach(
+                    bucket -> {
+                        CompletableFuture<Void> future = new CompletableFuture<>();
+                        completablePurges.add(future);
+                        flowTasks.computeIfAbsent(bucket.bucketId().bucket(),
+                                                  b -> new LinkedList<>()) // task list created on first use
+                                .add(() -> future.complete(apply((bkt, trm) -> {
+                                    bkt.purge(appId, trm, clock);
+                                    return null; // future is typed Void, so there is no value to pass on
+                                    }, bucket, term)));
+                    });
+
+        } else {
+            flowBuckets.values().forEach(bucket -> {
+                CompletableFuture<Void> future = new CompletableFuture<>();
+                completablePurges.add(future);
+                future.complete(apply((bkt, trm) -> {
+                    bkt.purge(appId, trm, clock);
+                    return null; // future is typed Void, so there is no value to pass on
+                    }, bucket, term));
+            });
+        }
+        return CompletableFuture.allOf(completablePurges.toArray(new CompletableFuture[0]));
+    }
+
+    /**
      * Closes the device flow table.
      */
     public void close() {