[SDFAB-359] Allow purging flows, groups and meters by device and application ID

Change-Id: I5e507d230789979ac997dbc99697fa0483363f70
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java
index c438885..a257cbd 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java
@@ -33,19 +33,21 @@
 import java.util.stream.Collectors;
 
 import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import org.onlab.util.KryoNamespace;
 import org.onlab.util.Tools;
 import org.onosproject.cluster.ClusterService;
 import org.onosproject.cluster.NodeId;
+import org.onosproject.core.ApplicationId;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.device.DeviceService;
 import org.onosproject.net.flow.FlowEntry;
 import org.onosproject.net.flow.FlowId;
 import org.onosproject.net.flow.FlowRule;
-import org.onosproject.net.flow.StoredFlowEntry;
 import org.onosproject.net.flow.FlowRuleStoreException;
+import org.onosproject.net.flow.StoredFlowEntry;
 import org.onosproject.store.LogicalTimestamp;
 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
 import org.onosproject.store.cluster.messaging.MessageSubject;
@@ -960,6 +962,51 @@
     }
 
     /**
+     * Purges the flows with the given application id.
+     *
+     * @param appId the application id
+     * @return a future to be completed once flow rules with given application
+     * id have been purged on all buckets
+     */
+    public CompletableFuture<Void> purge(ApplicationId appId) {
+        DeviceReplicaInfo info = lifecycleManager.getReplicaInfo();
+        if (!info.isMaster(localNodeId)) {
+            // Only the master replica may purge.
+            return Tools.exceptionalFuture(new IllegalStateException());
+        }
+
+        // While the master's term is not yet active (not synchronized with
+        // prior replicas) each bucket's purge is queued in flowTasks instead
+        // of being applied right away.
+        final long term = info.term();
+        final boolean deferred = activeTerm < term;
+        if (deferred) {
+            log.debug("Enqueueing operations for device {}", deviceId);
+        }
+
+        // One future per bucket; the returned future completes when all do.
+        List<CompletableFuture<Void>> pending = Lists.newArrayList();
+        flowBuckets.values().forEach(bucket -> {
+            CompletableFuture<Void> done = new CompletableFuture<>();
+            pending.add(done);
+            if (deferred) {
+                flowTasks.computeIfAbsent(bucket.bucketId().bucket(), b -> new LinkedList<>())
+                        .add(() -> done.complete(apply((bkt, trm) -> {
+                            bkt.purge(appId, trm, clock);
+                            return null;
+                        }, bucket, term)));
+            } else {
+                done.complete(apply((bkt, trm) -> {
+                    bkt.purge(appId, trm, clock);
+                    return null;
+                }, bucket, term));
+            }
+        });
+
+        return CompletableFuture.allOf(pending.toArray(new CompletableFuture[0]));
+    }
+
+    /**
      * Closes the device flow table.
      */
     public void close() {
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
index 0a96252..e4850d8 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStore.java
@@ -44,6 +44,7 @@
 import org.onosproject.cfg.ComponentConfigService;
 import org.onosproject.cluster.ClusterService;
 import org.onosproject.cluster.NodeId;
+import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreService;
 import org.onosproject.core.IdGenerator;
 import org.onosproject.event.AbstractListenerManager;
@@ -63,8 +64,8 @@
 import org.onosproject.net.flow.FlowRuleService;
 import org.onosproject.net.flow.FlowRuleStore;
 import org.onosproject.net.flow.FlowRuleStoreDelegate;
-import org.onosproject.net.flow.StoredFlowEntry;
 import org.onosproject.net.flow.FlowRuleStoreException;
+import org.onosproject.net.flow.StoredFlowEntry;
 import org.onosproject.net.flow.TableStatisticsEntry;
 import org.onosproject.net.flow.oldbatch.FlowRuleBatchEntry;
 import org.onosproject.net.flow.oldbatch.FlowRuleBatchEntry.FlowRuleOperation;
@@ -110,6 +111,7 @@
 import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.FLOW_TABLE_BACKUP;
 import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.GET_DEVICE_FLOW_COUNT;
 import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.GET_FLOW_ENTRY;
+import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.PURGE_FLOW_RULES;
 import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.REMOTE_APPLY_COMPLETED;
 import static org.onosproject.store.flow.impl.ECFlowRuleStoreMessageSubjects.REMOVE_FLOW_ENTRY;
 import static org.slf4j.LoggerFactory.getLogger;
@@ -137,6 +139,7 @@
     private final Logger log = getLogger(getClass());
 
     private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000;
+    private static final long PURGE_TIMEOUT_MILLIS = 30000;
     private static final int GET_FLOW_ENTRIES_TIMEOUT = 30; //seconds
 
     /** Number of threads in the message handler pool. */
@@ -340,6 +343,11 @@
             serializer::encode, executor);
         clusterCommunicator.addSubscriber(
             REMOVE_FLOW_ENTRY, serializer::decode, this::removeFlowRuleInternal, serializer::encode, executor);
+        clusterCommunicator.<Pair<DeviceId, ApplicationId>, Boolean>addSubscriber(
+                PURGE_FLOW_RULES,
+                serializer::decode,
+                p -> flowTable.purgeFlowRules(p.getLeft(), p.getRight()),
+                serializer::encode, executor);
     }
 
     private void unregisterMessageHandlers() {
@@ -634,6 +642,35 @@
     }
 
     @Override
+    public boolean purgeFlowRules(DeviceId deviceId, ApplicationId appId) {
+        NodeId master = mastershipService.getMasterFor(deviceId);
+
+        if (Objects.equals(local, master)) {
+            // bypass and handle it locally
+            return flowTable.purgeFlowRules(deviceId, appId);
+        }
+
+        if (master == null) {
+            log.warn("Failed to purgeFlowRules: No master for {}", deviceId);
+            return false;
+        }
+
+        log.trace("Forwarding purgeFlowRules to {}, which is the master for device {}",
+                  master, deviceId);
+
+        // The master's handler blocks for up to PURGE_TIMEOUT_MILLIS, so wait at
+        // least that long; the shorter store timeout would report a failure
+        // while the purge may still be completing.
+        return Tools.futureGetOrElse(
+                clusterCommunicator.sendAndReceive(
+                        Pair.of(deviceId, appId), PURGE_FLOW_RULES,
+                        serializer::encode, serializer::decode, master),
+                PURGE_TIMEOUT_MILLIS,
+                TimeUnit.MILLISECONDS,
+                false);
+    }
+
+    @Override
     public void purgeFlowRules() {
         flowTable.purgeFlowRules();
     }
@@ -874,6 +911,33 @@
         }
 
         /**
+         * Purges flow rules for the given device and application id.
+         *
+         * @param deviceId the device for which to purge flow rules
+         * @param appId the application id for which to purge flow rules
+         * @return true if purge is successful, false otherwise
+         */
+        public boolean purgeFlowRules(DeviceId deviceId, ApplicationId appId) {
+            DeviceFlowTable deviceTable = flowTables.get(deviceId);
+            if (deviceTable == null) {
+                // No flow table is kept for this device.
+                return false;
+            }
+            try {
+                // purge(appId) is applied on every bucket of the device table,
+                // hence the dedicated (longer) timeout for this blocking wait.
+                deviceTable.purge(appId).get(PURGE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+                return true;
+            } catch (InterruptedException e) {
+                // Keep the interrupt visible to the caller, then report failure.
+                Thread.currentThread().interrupt();
+            } catch (ExecutionException | TimeoutException ignored) {
+                // A failed or timed-out purge is reported as false below.
+            }
+            return false;
+        }
+
+        /**
          * Purges all flow rules from the table.
          */
         public void purgeFlowRules() {
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStoreMessageSubjects.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStoreMessageSubjects.java
index e5a7631..80f376e 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStoreMessageSubjects.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/ECFlowRuleStoreMessageSubjects.java
@@ -40,4 +40,7 @@
 
     public static final MessageSubject FLOW_TABLE_BACKUP
         = new MessageSubject("peer-flow-table-backup");
+
+    public static final MessageSubject PURGE_FLOW_RULES
+            = new MessageSubject("peer-purge-flow-rules");
 }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/FlowBucket.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/FlowBucket.java
index ea1887d..020559f 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/FlowBucket.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/FlowBucket.java
@@ -21,6 +21,7 @@
 import java.util.stream.Collectors;
 
 import com.google.common.collect.Maps;
+import org.onosproject.core.ApplicationId;
 import org.onosproject.net.flow.DefaultFlowEntry;
 import org.onosproject.net.flow.FlowEntry;
 import org.onosproject.net.flow.FlowId;
@@ -271,6 +272,24 @@
     }
 
     /**
+     * Purge the entries with the given application ID.
+     *
+     * @param appId the application ID
+     * @param term  the term in which the purge occurred
+     * @param clock the logical clock
+     */
+    public void purge(ApplicationId appId, long term, LogicalClock clock) {
+        boolean anythingRemoved = flowBucket.values().removeIf(flowEntryMap -> {
+            flowEntryMap.values().removeIf(storedFlowEntry -> storedFlowEntry.appId() == appId.id());
+            return flowEntryMap.isEmpty();
+        });
+        if (anythingRemoved) {
+            recordUpdate(term, clock.getTimestamp());
+        }
+    }
+
+
+    /**
      * Clears the bucket.
      */
     public void clear() {
diff --git a/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java b/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
index 8fd32ab..8337e84 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
@@ -23,6 +23,7 @@
 import org.onosproject.cfg.ComponentConfigService;
 import org.onosproject.cluster.ClusterService;
 import org.onosproject.cluster.NodeId;
+import org.onosproject.core.ApplicationId;
 import org.onosproject.core.GroupId;
 import org.onosproject.mastership.MastershipService;
 import org.onosproject.net.DeviceId;
@@ -1093,6 +1094,18 @@
     }
 
     @Override
+    public void purgeGroupEntries(DeviceId deviceId, ApplicationId appId) {
+        // Gather the matching entries first, then purge them in a single call.
+        Set<Entry<GroupStoreKeyMapKey, StoredGroupEntry>> matching = new HashSet<>();
+        for (Entry<GroupStoreKeyMapKey, StoredGroupEntry> entry : getGroupStoreKeyMap().entrySet()) {
+            if (entry.getKey().deviceId().equals(deviceId) && entry.getValue().appId().equals(appId)) {
+                matching.add(entry);
+            }
+        }
+        purgeGroupEntries(matching);
+    }
+
+    @Override
     public void purgeGroupEntries() {
         purgeGroupEntries(getGroupStoreKeyMap().entrySet());
     }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/meter/impl/DistributedMeterStore.java b/core/store/dist/src/main/java/org/onosproject/store/meter/impl/DistributedMeterStore.java
index 38f8748..b2034dc 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/meter/impl/DistributedMeterStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/meter/impl/DistributedMeterStore.java
@@ -22,6 +22,7 @@
 import com.google.common.collect.Maps;
 import org.apache.commons.lang.math.RandomUtils;
 import org.onlab.util.KryoNamespace;
+import org.onosproject.core.ApplicationId;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.behaviour.MeterQuery;
 import org.onosproject.net.driver.DriverHandler;
@@ -440,17 +441,25 @@
 
     @Override
     public void purgeMeter(DeviceId deviceId) {
-        // Purge api (typically used when the device is offline)
         List<Versioned<MeterData>> metersPendingRemove = meters.stream()
                 .filter(e -> Objects.equals(e.getKey().deviceId(), deviceId))
                 .map(Map.Entry::getValue)
                 .collect(Collectors.toList());
-        // Remove definitely the meter
         metersPendingRemove.forEach(versionedMeterKey
                 -> purgeMeter(versionedMeterKey.value().meter()));
     }
 
     @Override
+    public void purgeMeters(DeviceId deviceId, ApplicationId appId) {
+        List<Versioned<MeterData>> metersPendingRemove = meters.stream()
+                .filter(e -> Objects.equals(e.getKey().deviceId(), deviceId) &&
+                        e.getValue().value().meter().appId().equals(appId))
+                .map(Map.Entry::getValue)
+                .collect(Collectors.toList());
+        metersPendingRemove.forEach(versionedMeterKey -> deleteMeterNow(versionedMeterKey.value().meter()));
+    }
+
+    @Override
     public long getMaxMeters(MeterFeaturesKey key) {
         // Leverage the meter features to know the max id
         // Create a Meter Table key with FeaturesKey's device and global scope