[SDFAB-287] Remove and query only upf-related flow entries

In FabricUpfProgrammable, make sure to get and remove only flow entries generated
for the specific device ID and by the UpfProgrammable driver behaviour. Otherwise,
we might remove or get entries from different devices or insert by different driver
behaviour.
Also, add a way to check if a flow rule has been created by a UPF programmable behvaiour.

Change-Id: I7a66885154963fdba8e69f10e187560a1662ad33
(cherry picked from commit d5e3fcbf73f53470a8940630243f439edc1fa549)
diff --git a/pipelines/fabric/impl/src/main/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/FabricUpfProgrammable.java b/pipelines/fabric/impl/src/main/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/FabricUpfProgrammable.java
index 3c81482..7e5791f 100644
--- a/pipelines/fabric/impl/src/main/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/FabricUpfProgrammable.java
+++ b/pipelines/fabric/impl/src/main/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/FabricUpfProgrammable.java
@@ -58,6 +58,7 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
 
 import static org.onosproject.net.behaviour.upf.UpfProgrammableException.Type.UNSUPPORTED_OPERATION;
 import static org.onosproject.net.pi.model.PiCounterType.INDIRECT;
@@ -118,7 +119,7 @@
         fabricUpfStore = handler().get(FabricUpfStore.class);
         upfTranslator = new FabricUpfTranslator(fabricUpfStore);
         final CoreService coreService = handler().get(CoreService.class);
-        appId = coreService.getAppId(FabricPipeconfLoader.PIPELINE_APP_NAME);
+        appId = coreService.getAppId(FabricPipeconfLoader.PIPELINE_APP_NAME_UPF);
         if (appId == null) {
             log.warn("Application ID is null. Cannot initialize behaviour.");
             return false;
@@ -143,6 +144,12 @@
         return false;
     }
 
+    @Override
+    public boolean fromThisUpf(FlowRule flowRule) {
+        return flowRule.deviceId().equals(this.deviceId) &&
+                flowRule.appId() == appId.id();
+    }
+
     /**
      * Grab the capacities for the PDR and FAR tables from the pipeconf. Runs only once, on initialization.
      *
@@ -229,7 +236,13 @@
             return;
         }
         log.info("Clearing all UPF-related table entries.");
-        flowRuleService.removeFlowRulesById(appId);
+        // Getting flow entries by device ID and filtering by Application ID
+        // is more efficient than getting by Application ID and filtering for a
+        // device ID.
+        List<FlowEntry> flowEntriesToRemove = StreamSupport.stream(
+                flowRuleService.getFlowEntries(deviceId).spliterator(), false)
+                .filter(flowEntry -> flowEntry.appId() == appId.id()).collect(Collectors.toList());
+        flowRuleService.removeFlowRules(flowEntriesToRemove.toArray(new FlowRule[0]));
         fabricUpfStore.reset();
     }
 
@@ -239,7 +252,7 @@
             return;
         }
         log.info("Clearing all UPF interfaces.");
-        for (FlowRule entry : flowRuleService.getFlowEntriesById(appId)) {
+        for (FlowRule entry : flowRuleService.getFlowEntries(deviceId)) {
             if (upfTranslator.isFabricInterface(entry)) {
                 flowRuleService.removeFlowRules(entry);
             }
@@ -254,7 +267,7 @@
         log.info("Clearing all UE sessions.");
         int pdrsCleared = 0;
         int farsCleared = 0;
-        for (FlowRule entry : flowRuleService.getFlowEntriesById(appId)) {
+        for (FlowRule entry : flowRuleService.getFlowEntries(deviceId)) {
             if (upfTranslator.isFabricPdr(entry)) {
                 pdrsCleared++;
                 flowRuleService.removeFlowRules(entry);
@@ -454,7 +467,7 @@
          *   with correct and complete actions and parameters, but P4Runtime deletion requests
          *   will not have those.
          */
-        for (FlowEntry installedEntry : flowRuleService.getFlowEntriesById(appId)) {
+        for (FlowEntry installedEntry : flowRuleService.getFlowEntries(deviceId)) {
             if (installedEntry.selector().equals(entry.selector())) {
                 log.info("Found matching entry to remove, it has FlowID {}", installedEntry.id());
                 flowRuleService.removeFlowRules(installedEntry);
@@ -474,7 +487,7 @@
             return null;
         }
         ArrayList<PacketDetectionRule> pdrs = new ArrayList<>();
-        for (FlowRule flowRule : flowRuleService.getFlowEntriesById(appId)) {
+        for (FlowRule flowRule : flowRuleService.getFlowEntries(deviceId)) {
             if (upfTranslator.isFabricPdr(flowRule)) {
                 pdrs.add(upfTranslator.fabricEntryToPdr(flowRule));
             }
@@ -488,7 +501,7 @@
             return null;
         }
         ArrayList<ForwardingActionRule> fars = new ArrayList<>();
-        for (FlowRule flowRule : flowRuleService.getFlowEntriesById(appId)) {
+        for (FlowRule flowRule : flowRuleService.getFlowEntries(deviceId)) {
             if (upfTranslator.isFabricFar(flowRule)) {
                 fars.add(upfTranslator.fabricEntryToFar(flowRule));
             }
@@ -502,7 +515,7 @@
             return null;
         }
         ArrayList<UpfInterface> ifaces = new ArrayList<>();
-        for (FlowRule flowRule : flowRuleService.getFlowEntriesById(appId)) {
+        for (FlowRule flowRule : flowRuleService.getFlowEntries(deviceId)) {
             if (upfTranslator.isFabricInterface(flowRule)) {
                 ifaces.add(upfTranslator.fabricEntryToInterface(flowRule));
             }