[SDFAB-287] Remove and query only upf-related flow entries

In FabricUpfProgrammable, make sure to get and remove only flow entries generated
for the specific device ID and by the UpfProgrammable driver behaviour. Otherwise,
we might remove or get entries from different devices or insert by different driver
behaviour.
Also, add a way to check if a flow rule has been created by a UPF programmable behvaiour.

Change-Id: I7a66885154963fdba8e69f10e187560a1662ad33
(cherry picked from commit d5e3fcbf73f53470a8940630243f439edc1fa549)
diff --git a/core/api/src/main/java/org/onosproject/net/behaviour/upf/UpfProgrammable.java b/core/api/src/main/java/org/onosproject/net/behaviour/upf/UpfProgrammable.java
index 5c98f09..159adb0 100644
--- a/core/api/src/main/java/org/onosproject/net/behaviour/upf/UpfProgrammable.java
+++ b/core/api/src/main/java/org/onosproject/net/behaviour/upf/UpfProgrammable.java
@@ -18,6 +18,7 @@
 
 import com.google.common.annotations.Beta;
 import org.onosproject.net.driver.HandlerBehaviour;
+import org.onosproject.net.flow.FlowRule;
 
 /**
  * Provides means to update the device forwarding state to implement a 3GPP
@@ -34,4 +35,13 @@
      * @return True if initialized, false otherwise.
      */
     boolean init();
+
+
+    /**
+     * Checks if the given flow rule has been generated by this UPF behaviour.
+     *
+     * @param flowRule the flow rule to check
+     * @return True if the given flow rule has been created by this UPF behaviour, False otherwise.
+     */
+    boolean fromThisUpf(FlowRule flowRule);
 }
diff --git a/pipelines/fabric/impl/src/main/java/org/onosproject/pipelines/fabric/impl/FabricPipeconfLoader.java b/pipelines/fabric/impl/src/main/java/org/onosproject/pipelines/fabric/impl/FabricPipeconfLoader.java
index 1b6c2ce..97dace9 100644
--- a/pipelines/fabric/impl/src/main/java/org/onosproject/pipelines/fabric/impl/FabricPipeconfLoader.java
+++ b/pipelines/fabric/impl/src/main/java/org/onosproject/pipelines/fabric/impl/FabricPipeconfLoader.java
@@ -56,6 +56,7 @@
 public final class FabricPipeconfLoader {
 
     public static final String PIPELINE_APP_NAME = "org.onosproject.pipelines.fabric";
+    public static final String PIPELINE_APP_NAME_UPF = "org.onosproject.pipelines.fabric.upf";
 
     private static Logger log = getLogger(FabricPipeconfLoader.class);
 
@@ -85,6 +86,7 @@
     @Activate
     public void activate() {
         coreService.registerApplication(PIPELINE_APP_NAME);
+        coreService.registerApplication(PIPELINE_APP_NAME_UPF);
         // Registers all pipeconf at component activation.
         pipeconfs = buildAllPipeconfs();
         pipeconfs.forEach(piPipeconfService::register);
diff --git a/pipelines/fabric/impl/src/main/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/FabricUpfProgrammable.java b/pipelines/fabric/impl/src/main/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/FabricUpfProgrammable.java
index 3c81482..7e5791f 100644
--- a/pipelines/fabric/impl/src/main/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/FabricUpfProgrammable.java
+++ b/pipelines/fabric/impl/src/main/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/FabricUpfProgrammable.java
@@ -58,6 +58,7 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
 
 import static org.onosproject.net.behaviour.upf.UpfProgrammableException.Type.UNSUPPORTED_OPERATION;
 import static org.onosproject.net.pi.model.PiCounterType.INDIRECT;
@@ -118,7 +119,7 @@
         fabricUpfStore = handler().get(FabricUpfStore.class);
         upfTranslator = new FabricUpfTranslator(fabricUpfStore);
         final CoreService coreService = handler().get(CoreService.class);
-        appId = coreService.getAppId(FabricPipeconfLoader.PIPELINE_APP_NAME);
+        appId = coreService.getAppId(FabricPipeconfLoader.PIPELINE_APP_NAME_UPF);
         if (appId == null) {
             log.warn("Application ID is null. Cannot initialize behaviour.");
             return false;
@@ -143,6 +144,12 @@
         return false;
     }
 
+    @Override
+    public boolean fromThisUpf(FlowRule flowRule) {
+        return flowRule.deviceId().equals(this.deviceId) &&
+                flowRule.appId() == appId.id();
+    }
+
     /**
      * Grab the capacities for the PDR and FAR tables from the pipeconf. Runs only once, on initialization.
      *
@@ -229,7 +236,13 @@
             return;
         }
         log.info("Clearing all UPF-related table entries.");
-        flowRuleService.removeFlowRulesById(appId);
+        // Getting flow entries by device ID and filtering by Application ID
+        // is more efficient than getting by Application ID and filtering for a
+        // device ID.
+        List<FlowEntry> flowEntriesToRemove = StreamSupport.stream(
+                flowRuleService.getFlowEntries(deviceId).spliterator(), false)
+                .filter(flowEntry -> flowEntry.appId() == appId.id()).collect(Collectors.toList());
+        flowRuleService.removeFlowRules(flowEntriesToRemove.toArray(new FlowRule[0]));
         fabricUpfStore.reset();
     }
 
@@ -239,7 +252,7 @@
             return;
         }
         log.info("Clearing all UPF interfaces.");
-        for (FlowRule entry : flowRuleService.getFlowEntriesById(appId)) {
+        for (FlowRule entry : flowRuleService.getFlowEntries(deviceId)) {
             if (upfTranslator.isFabricInterface(entry)) {
                 flowRuleService.removeFlowRules(entry);
             }
@@ -254,7 +267,7 @@
         log.info("Clearing all UE sessions.");
         int pdrsCleared = 0;
         int farsCleared = 0;
-        for (FlowRule entry : flowRuleService.getFlowEntriesById(appId)) {
+        for (FlowRule entry : flowRuleService.getFlowEntries(deviceId)) {
             if (upfTranslator.isFabricPdr(entry)) {
                 pdrsCleared++;
                 flowRuleService.removeFlowRules(entry);
@@ -454,7 +467,7 @@
          *   with correct and complete actions and parameters, but P4Runtime deletion requests
          *   will not have those.
          */
-        for (FlowEntry installedEntry : flowRuleService.getFlowEntriesById(appId)) {
+        for (FlowEntry installedEntry : flowRuleService.getFlowEntries(deviceId)) {
             if (installedEntry.selector().equals(entry.selector())) {
                 log.info("Found matching entry to remove, it has FlowID {}", installedEntry.id());
                 flowRuleService.removeFlowRules(installedEntry);
@@ -474,7 +487,7 @@
             return null;
         }
         ArrayList<PacketDetectionRule> pdrs = new ArrayList<>();
-        for (FlowRule flowRule : flowRuleService.getFlowEntriesById(appId)) {
+        for (FlowRule flowRule : flowRuleService.getFlowEntries(deviceId)) {
             if (upfTranslator.isFabricPdr(flowRule)) {
                 pdrs.add(upfTranslator.fabricEntryToPdr(flowRule));
             }
@@ -488,7 +501,7 @@
             return null;
         }
         ArrayList<ForwardingActionRule> fars = new ArrayList<>();
-        for (FlowRule flowRule : flowRuleService.getFlowEntriesById(appId)) {
+        for (FlowRule flowRule : flowRuleService.getFlowEntries(deviceId)) {
             if (upfTranslator.isFabricFar(flowRule)) {
                 fars.add(upfTranslator.fabricEntryToFar(flowRule));
             }
@@ -502,7 +515,7 @@
             return null;
         }
         ArrayList<UpfInterface> ifaces = new ArrayList<>();
-        for (FlowRule flowRule : flowRuleService.getFlowEntriesById(appId)) {
+        for (FlowRule flowRule : flowRuleService.getFlowEntries(deviceId)) {
             if (upfTranslator.isFabricInterface(flowRule)) {
                 ifaces.add(upfTranslator.fabricEntryToInterface(flowRule));
             }
diff --git a/pipelines/fabric/impl/src/main/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/FabricUpfTranslator.java b/pipelines/fabric/impl/src/main/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/FabricUpfTranslator.java
index d270371..c8cce84 100644
--- a/pipelines/fabric/impl/src/main/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/FabricUpfTranslator.java
+++ b/pipelines/fabric/impl/src/main/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/FabricUpfTranslator.java
@@ -410,32 +410,4 @@
                 .withPriority(priority)
                 .build();
     }
-
-//    public FlowRule buildGtpuWithPscEncapRule(DeviceId deviceId, ApplicationId appId, int qfi) {
-//        PiAction action = PiAction.builder()
-//                .withId(FABRIC_EGRESS_SPGW_GTPU_WITH_PSC)
-//                .withParameter(new PiActionParam(QFI, qfi))
-//                .build();
-//        // Default entry, no selector.
-//        return DefaultFlowRule.builder()
-//                .forDevice(deviceId).fromApp(appId).makePermanent()
-//                .forTable(FABRIC_EGRESS_SPGW_GTPU_ENCAP)
-//                .withTreatment(DefaultTrafficTreatment.builder().piTableAction(action).build())
-//                .withPriority(0)
-//                .build();
-//    }
-//
-//    public FlowRule buildGtpuOnlyEncapRule(DeviceId deviceId, ApplicationId appId) {
-//        PiAction action = PiAction.builder()
-//                .withId(FABRIC_EGRESS_SPGW_GTPU_ONLY)
-//                .build();
-//        // Default entry, no selector.
-//        return DefaultFlowRule.builder()
-//                .forDevice(deviceId).fromApp(appId).makePermanent()
-//                .forTable(FABRIC_EGRESS_SPGW_GTPU_ENCAP)
-//                .withTreatment(DefaultTrafficTreatment.builder().piTableAction(action).build())
-//                .withPriority(0)
-//                .build();
-//    }
-
 }