[SDFAB-189] UpfProgrammable implementation for fabric v1model

Change-Id: I4ea7980830d761a0da8a78943c08229c2da9410d
(cherry picked from commit 8d630f1091c63ff6e7b4ea31669344c5274773cc)
diff --git a/pipelines/fabric/impl/BUILD b/pipelines/fabric/impl/BUILD
index e5c214b..17748ec 100644
--- a/pipelines/fabric/impl/BUILD
+++ b/pipelines/fabric/impl/BUILD
@@ -8,7 +8,12 @@
     "//drivers/p4runtime:onos-drivers-p4runtime",
 ]
 
+TEST_DEPS = TEST_ADAPTERS + JACKSON + [
+    "//protocols/p4runtime/api:onos-protocols-p4runtime-api",
+    "@io_grpc_grpc_java//api",
+]
+
 osgi_jar_with_tests(
-    test_deps = TEST_ADAPTERS,
+    test_deps = TEST_DEPS,
     deps = COMPILE_DEPS,
 )
diff --git a/pipelines/fabric/impl/src/main/java/org/onosproject/pipelines/fabric/impl/FabricPipeconfManager.java b/pipelines/fabric/impl/src/main/java/org/onosproject/pipelines/fabric/impl/FabricPipeconfManager.java
index d8af395..7d1ee4e 100644
--- a/pipelines/fabric/impl/src/main/java/org/onosproject/pipelines/fabric/impl/FabricPipeconfManager.java
+++ b/pipelines/fabric/impl/src/main/java/org/onosproject/pipelines/fabric/impl/FabricPipeconfManager.java
@@ -19,6 +19,8 @@
 import org.onosproject.net.behaviour.inbandtelemetry.IntProgrammable;
 
 import org.onosproject.net.behaviour.BngProgrammable;
+import org.onosproject.net.behaviour.upf.UpfProgrammable;
+import org.onosproject.pipelines.fabric.impl.behaviour.upf.FabricUpfProgrammable;
 import org.onosproject.net.behaviour.Pipeliner;
 import org.onosproject.net.pi.model.DefaultPiPipeconf;
 import org.onosproject.net.pi.model.PiPipeconf;
@@ -51,6 +53,7 @@
     private static final String INT_PROFILE_SUFFIX = "-int";
     private static final String FULL_PROFILE_SUFFIX = "-full";
     private static final String BNG_PROFILE_SUFFIX = "-bng";
+    private static final String UPF_PROFILE_SUFFIX = "-spgw";
 
     private static Logger log = getLogger(FabricPipeconfLoader.class);
 
@@ -98,6 +101,11 @@
         if (profileName.endsWith(BNG_PROFILE_SUFFIX)) {
             pipeconfBuilder.addBehaviour(BngProgrammable.class, FabricBngProgrammable.class);
         }
+        // Add UpfProgrammable behavior for UPF-enabled pipelines.
+        if (profileName.contains(UPF_PROFILE_SUFFIX) ||
+                profileName.endsWith(FULL_PROFILE_SUFFIX)) {
+            pipeconfBuilder.addBehaviour(UpfProgrammable.class, FabricUpfProgrammable.class);
+        }
         return pipeconfBuilder.build();
     }
 
diff --git a/pipelines/fabric/impl/src/main/java/org/onosproject/pipelines/fabric/impl/behaviour/FabricCapabilities.java b/pipelines/fabric/impl/src/main/java/org/onosproject/pipelines/fabric/impl/behaviour/FabricCapabilities.java
index 2eb947d..fbb709a 100644
--- a/pipelines/fabric/impl/src/main/java/org/onosproject/pipelines/fabric/impl/behaviour/FabricCapabilities.java
+++ b/pipelines/fabric/impl/src/main/java/org/onosproject/pipelines/fabric/impl/behaviour/FabricCapabilities.java
@@ -28,6 +28,7 @@
 
 import static com.google.common.base.Preconditions.checkNotNull;
 import static org.onosproject.net.pi.model.PiPipeconf.ExtensionType.CPU_PORT_TXT;
+import static org.onosproject.pipelines.fabric.FabricConstants.FABRIC_INGRESS_SPGW_DOWNLINK_PDRS;
 import static org.slf4j.LoggerFactory.getLogger;
 
 /**
@@ -89,6 +90,17 @@
     }
 
     /**
+     * Returns true if the pipeconf supports UPF capabilities, false otherwise.
+     *
+     * @return boolean
+     */
+    public boolean supportUpf() {
+        return pipeconf.pipelineModel()
+                .table(FABRIC_INGRESS_SPGW_DOWNLINK_PDRS)
+                .isPresent();
+    }
+
+    /**
      * Returns true if the pipeconf supports BNG user plane capabilities, false
      * otherwise.
      *
diff --git a/pipelines/fabric/impl/src/main/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/DistributedFabricUpfStore.java b/pipelines/fabric/impl/src/main/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/DistributedFabricUpfStore.java
new file mode 100644
index 0000000..1f40f3b
--- /dev/null
+++ b/pipelines/fabric/impl/src/main/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/DistributedFabricUpfStore.java
@@ -0,0 +1,248 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.pipelines.fabric.impl.behaviour.upf;
+
+import com.google.common.collect.BiMap;
+import com.google.common.collect.ImmutableBiMap;
+import com.google.common.collect.Maps;
+import org.onlab.packet.Ip4Address;
+import org.onlab.util.ImmutableByteSequence;
+import org.onlab.util.KryoNamespace;
+import org.onosproject.net.behaviour.upf.PacketDetectionRule;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.DistributedSet;
+import org.onosproject.store.service.MapEvent;
+import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Distributed implementation of FabricUpfStore.
+ */
+// FIXME: this store is generic and not tied to a single device, should we have a store based on deviceId?
+@Component(immediate = true, service = FabricUpfStore.class)
+public final class DistributedFabricUpfStore implements FabricUpfStore {
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected StorageService storageService;
+
+    protected static final String FAR_ID_MAP_NAME = "fabric-upf-far-id";
+    protected static final String BUFFER_FAR_ID_SET_NAME = "fabric-upf-buffer-far-id";
+    protected static final String FAR_ID_UE_MAP_NAME = "fabric-upf-far-id-ue";
+    protected static final KryoNamespace.Builder SERIALIZER = KryoNamespace.newBuilder()
+            .register(KryoNamespaces.API)
+            .register(UpfRuleIdentifier.class);
+
+    // TODO: check queue IDs for BMv2, is priority inverted?
+    // Mapping between scheduling priority ranges with BMv2 priority queues
+    private static final BiMap<Integer, Integer> SCHEDULING_PRIORITY_MAP
+            = new ImmutableBiMap.Builder<Integer, Integer>()
+            // Highest scheduling priority for 3GPP is 1 and highest BMv2 queue priority is 7
+            .put(1, 5)
+            .put(6, 4)
+            .put(7, 3)
+            .put(8, 2)
+            .put(9, 1)
+            .build();
+
+    // Distributed local FAR ID to global FAR ID mapping
+    protected ConsistentMap<UpfRuleIdentifier, Integer> farIdMap;
+    private MapEventListener<UpfRuleIdentifier, Integer> farIdMapListener;
+    // Local, reversed copy of farIdMapper for better reverse lookup performance
+    protected Map<Integer, UpfRuleIdentifier> reverseFarIdMap;
+    private int nextGlobalFarId = 1;
+
+    protected DistributedSet<UpfRuleIdentifier> bufferFarIds;
+    protected ConsistentMap<UpfRuleIdentifier, Set<Ip4Address>> farIdToUeAddrs;
+
+    @Activate
+    protected void activate() {
+        // Allow unit test to inject farIdMap here.
+        if (storageService != null) {
+            this.farIdMap = storageService.<UpfRuleIdentifier, Integer>consistentMapBuilder()
+                    .withName(FAR_ID_MAP_NAME)
+                    .withRelaxedReadConsistency()
+                    .withSerializer(Serializer.using(SERIALIZER.build()))
+                    .build();
+            this.bufferFarIds = storageService.<UpfRuleIdentifier>setBuilder()
+                    .withName(BUFFER_FAR_ID_SET_NAME)
+                    .withRelaxedReadConsistency()
+                    .withSerializer(Serializer.using(SERIALIZER.build()))
+                    .build().asDistributedSet();
+            this.farIdToUeAddrs = storageService.<UpfRuleIdentifier, Set<Ip4Address>>consistentMapBuilder()
+                    .withName(FAR_ID_UE_MAP_NAME)
+                    .withRelaxedReadConsistency()
+                    .withSerializer(Serializer.using(SERIALIZER.build()))
+                    .build();
+
+        }
+        farIdMapListener = new FarIdMapListener();
+        farIdMap.addListener(farIdMapListener);
+
+        reverseFarIdMap = Maps.newHashMap();
+        farIdMap.entrySet().forEach(entry -> reverseFarIdMap.put(entry.getValue().value(), entry.getKey()));
+
+        log.info("Started");
+    }
+
+    @Deactivate
+    protected void deactivate() {
+        farIdMap.removeListener(farIdMapListener);
+        farIdMap.destroy();
+        reverseFarIdMap.clear();
+
+        log.info("Stopped");
+    }
+
+    @Override
+    public void reset() {
+        farIdMap.clear();
+        reverseFarIdMap.clear();
+        bufferFarIds.clear();
+        farIdToUeAddrs.clear();
+        nextGlobalFarId = 0;
+    }
+
+    @Override
+    public Map<UpfRuleIdentifier, Integer> getFarIdMap() {
+        return Map.copyOf(farIdMap.asJavaMap());
+    }
+
+    @Override
+    public int globalFarIdOf(UpfRuleIdentifier farIdPair) {
+        int globalFarId = farIdMap.compute(farIdPair,
+                (k, existingId) -> {
+                    return Objects.requireNonNullElseGet(existingId, () -> nextGlobalFarId++);
+                }).value();
+        log.info("{} translated to GlobalFarId={}", farIdPair, globalFarId);
+        return globalFarId;
+    }
+
+    @Override
+    public int globalFarIdOf(ImmutableByteSequence pfcpSessionId, int sessionLocalFarId) {
+        UpfRuleIdentifier farId = new UpfRuleIdentifier(pfcpSessionId, sessionLocalFarId);
+        return globalFarIdOf(farId);
+
+    }
+
+    @Override
+    public String queueIdOf(int schedulingPriority) {
+        return (SCHEDULING_PRIORITY_MAP.get(schedulingPriority)).toString();
+    }
+
+    @Override
+    public String schedulingPriorityOf(int queueId) {
+        return (SCHEDULING_PRIORITY_MAP.inverse().get(queueId)).toString();
+    }
+
+    @Override
+    public UpfRuleIdentifier localFarIdOf(int globalFarId) {
+        return reverseFarIdMap.get(globalFarId);
+    }
+
+    public void learnFarIdToUeAddrs(PacketDetectionRule pdr) {
+        UpfRuleIdentifier ruleId = UpfRuleIdentifier.of(pdr.sessionId(), pdr.farId());
+        farIdToUeAddrs.compute(ruleId, (k, set) -> {
+            if (set == null) {
+                set = new HashSet<>();
+            }
+            set.add(pdr.ueAddress());
+            return set;
+        });
+    }
+
+    @Override
+    public boolean isFarIdBuffering(UpfRuleIdentifier farId) {
+        checkNotNull(farId);
+        return bufferFarIds.contains(farId);
+    }
+
+    @Override
+    public void learBufferingFarId(UpfRuleIdentifier farId) {
+        checkNotNull(farId);
+        bufferFarIds.add(farId);
+    }
+
+    @Override
+    public void forgetBufferingFarId(UpfRuleIdentifier farId) {
+        checkNotNull(farId);
+        bufferFarIds.remove(farId);
+    }
+
+    @Override
+    public void forgetUeAddr(Ip4Address ueAddr) {
+        farIdToUeAddrs.keySet().forEach(
+                farId -> farIdToUeAddrs.computeIfPresent(farId, (farIdz, ueAddrs) -> {
+                    ueAddrs.remove(ueAddr);
+                    return ueAddrs;
+                }));
+    }
+
+    @Override
+    public Set<Ip4Address> ueAddrsOfFarId(UpfRuleIdentifier farId) {
+        return farIdToUeAddrs.getOrDefault(farId, Set.of()).value();
+    }
+
+    @Override
+    public Set<UpfRuleIdentifier> getBufferFarIds() {
+        return Set.copyOf(bufferFarIds);
+    }
+
+    @Override
+    public Map<UpfRuleIdentifier, Set<Ip4Address>> getFarIdToUeAddrs() {
+        return Map.copyOf(farIdToUeAddrs.asJavaMap());
+    }
+
+    // NOTE: FarIdMapListener is run on the same thread intentionally in order to ensure that
+    //       reverseFarIdMap update always finishes right after farIdMap is updated
+    private class FarIdMapListener implements MapEventListener<UpfRuleIdentifier, Integer> {
+        @Override
+        public void event(MapEvent<UpfRuleIdentifier, Integer> event) {
+            switch (event.type()) {
+                case INSERT:
+                    reverseFarIdMap.put(event.newValue().value(), event.key());
+                    break;
+                case UPDATE:
+                    reverseFarIdMap.remove(event.oldValue().value());
+                    reverseFarIdMap.put(event.newValue().value(), event.key());
+                    break;
+                case REMOVE:
+                    reverseFarIdMap.remove(event.oldValue().value());
+                    break;
+                default:
+                    break;
+            }
+        }
+    }
+}
diff --git a/pipelines/fabric/impl/src/main/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/FabricUpfProgrammable.java b/pipelines/fabric/impl/src/main/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/FabricUpfProgrammable.java
new file mode 100644
index 0000000..77ad732
--- /dev/null
+++ b/pipelines/fabric/impl/src/main/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/FabricUpfProgrammable.java
@@ -0,0 +1,668 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.pipelines.fabric.impl.behaviour.upf;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+import org.onlab.packet.Ip4Address;
+import org.onlab.packet.Ip4Prefix;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.drivers.p4runtime.AbstractP4RuntimeHandlerBehaviour;
+import org.onosproject.net.PortNumber;
+import org.onosproject.net.behaviour.upf.ForwardingActionRule;
+import org.onosproject.net.behaviour.upf.GtpTunnel;
+import org.onosproject.net.behaviour.upf.PacketDetectionRule;
+import org.onosproject.net.behaviour.upf.PdrStats;
+import org.onosproject.net.behaviour.upf.UpfInterface;
+import org.onosproject.net.behaviour.upf.UpfProgrammable;
+import org.onosproject.net.behaviour.upf.UpfProgrammableException;
+import org.onosproject.net.flow.DefaultFlowRule;
+import org.onosproject.net.flow.DefaultTrafficSelector;
+import org.onosproject.net.flow.DefaultTrafficTreatment;
+import org.onosproject.net.flow.FlowEntry;
+import org.onosproject.net.flow.FlowRule;
+import org.onosproject.net.flow.FlowRuleService;
+import org.onosproject.net.flow.criteria.PiCriterion;
+import org.onosproject.net.packet.DefaultOutboundPacket;
+import org.onosproject.net.packet.OutboundPacket;
+import org.onosproject.net.packet.PacketService;
+import org.onosproject.net.pi.model.PiCounterId;
+import org.onosproject.net.pi.model.PiCounterModel;
+import org.onosproject.net.pi.model.PiTableId;
+import org.onosproject.net.pi.model.PiTableModel;
+import org.onosproject.net.pi.runtime.PiCounterCell;
+import org.onosproject.net.pi.runtime.PiCounterCellHandle;
+import org.onosproject.net.pi.runtime.PiCounterCellId;
+import org.onosproject.pipelines.fabric.impl.FabricPipeconfLoader;
+import org.onosproject.pipelines.fabric.impl.behaviour.FabricCapabilities;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.onosproject.net.behaviour.upf.UpfProgrammableException.Type.UNSUPPORTED_OPERATION;
+import static org.onosproject.net.pi.model.PiCounterType.INDIRECT;
+import static org.onosproject.pipelines.fabric.FabricConstants.FABRIC_EGRESS_SPGW_PDR_COUNTER;
+import static org.onosproject.pipelines.fabric.FabricConstants.FABRIC_INGRESS_SPGW_DOWNLINK_PDRS;
+import static org.onosproject.pipelines.fabric.FabricConstants.FABRIC_INGRESS_SPGW_FARS;
+import static org.onosproject.pipelines.fabric.FabricConstants.FABRIC_INGRESS_SPGW_INTERFACES;
+import static org.onosproject.pipelines.fabric.FabricConstants.FABRIC_INGRESS_SPGW_PDR_COUNTER;
+import static org.onosproject.pipelines.fabric.FabricConstants.FABRIC_INGRESS_SPGW_UPLINK_PDRS;
+import static org.onosproject.pipelines.fabric.FabricConstants.HDR_FAR_ID;
+import static org.onosproject.pipelines.fabric.FabricConstants.HDR_GTPU_IS_VALID;
+import static org.onosproject.pipelines.fabric.FabricConstants.HDR_IPV4_DST_ADDR;
+import static org.onosproject.pipelines.fabric.FabricConstants.HDR_TEID;
+import static org.onosproject.pipelines.fabric.FabricConstants.HDR_TUNNEL_IPV4_DST;
+import static org.onosproject.pipelines.fabric.FabricConstants.HDR_UE_ADDR;
+
+
+/**
+ * Implementation of a UPF programmable device behavior.
+ */
+public class FabricUpfProgrammable extends AbstractP4RuntimeHandlerBehaviour
+        implements UpfProgrammable {
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+    private static final int DEFAULT_PRIORITY = 128;
+    private static final long DEFAULT_P4_DEVICE_ID = 1;
+
+    protected FlowRuleService flowRuleService;
+    protected PacketService packetService;
+    protected FabricUpfStore fabricUpfStore;
+    protected FabricUpfTranslator upfTranslator;
+
+    private long farTableSize;
+    private long encappedPdrTableSize;
+    private long unencappedPdrTableSize;
+    private long pdrCounterSize;
+
+    private ApplicationId appId;
+
+    // FIXME: remove, buffer drain should be triggered by Up4Service
+    private BufferDrainer bufferDrainer;
+
+    // FIXME: dbuf tunnel should be managed by Up4Service
+    //  Up4Service should be responsible of setting up such tunnel, then transforming FARs for this
+    //  device accordingly. When the tunnel endpoint change, it should be up to Up4Service to update
+    //  the FAR on the device.
+    private GtpTunnel dbufTunnel;
+
+    @Override
+    protected boolean setupBehaviour(String opName) {
+        if (!super.setupBehaviour(opName)) {
+            return false;
+        }
+        flowRuleService = handler().get(FlowRuleService.class);
+        packetService = handler().get(PacketService.class);
+        fabricUpfStore = handler().get(FabricUpfStore.class);
+        upfTranslator = new FabricUpfTranslator(fabricUpfStore);
+        final CoreService coreService = handler().get(CoreService.class);
+        appId = coreService.getAppId(FabricPipeconfLoader.PIPELINE_APP_NAME);
+        if (appId == null) {
+            log.warn("Application ID is null. Cannot initialize behaviour.");
+            return false;
+        }
+
+        var capabilities = new FabricCapabilities(pipeconf);
+        if (!capabilities.supportUpf()) {
+            log.warn("Pipeconf {} on {} does not support UPF capabilities, " +
+                             "cannot perform {}",
+                     pipeconf.id(), deviceId, opName);
+            return false;
+        }
+        return true;
+    }
+
+    @Override
+    public boolean init() {
+        if (setupBehaviour("init()")) {
+            if (!computeHardwareResourceSizes()) {
+                // error message will be printed by computeHardwareResourceSizes()
+                return false;
+            }
+            log.info("UpfProgrammable initialized for appId {} and deviceId {}", appId, deviceId);
+            return true;
+        }
+        return false;
+    }
+
+    /**
+     * Grab the capacities for the PDR and FAR tables from the pipeconf. Runs only once, on initialization.
+     *
+     * @return true if resource is fetched successfully, false otherwise.
+     * @throws IllegalStateException when FAR or PDR table can't be found in the pipeline model.
+     */
+    private boolean computeHardwareResourceSizes() {
+        long farTableSize = 0;
+        long encappedPdrTableSize = 0;
+        long unencappedPdrTableSize = 0;
+
+        // Get table sizes of interest
+        for (PiTableModel piTable : pipeconf.pipelineModel().tables()) {
+            if (piTable.id().equals(FABRIC_INGRESS_SPGW_UPLINK_PDRS)) {
+                encappedPdrTableSize = piTable.maxSize();
+            } else if (piTable.id().equals(FABRIC_INGRESS_SPGW_DOWNLINK_PDRS)) {
+                unencappedPdrTableSize = piTable.maxSize();
+            } else if (piTable.id().equals(FABRIC_INGRESS_SPGW_FARS)) {
+                farTableSize = piTable.maxSize();
+            }
+        }
+        if (encappedPdrTableSize == 0) {
+            throw new IllegalStateException("Unable to find uplink PDR table in pipeline model.");
+        }
+        if (unencappedPdrTableSize == 0) {
+            throw new IllegalStateException("Unable to find downlink PDR table in pipeline model.");
+        }
+        if (encappedPdrTableSize != unencappedPdrTableSize) {
+            log.warn("The uplink and downlink PDR tables don't have equal sizes! Using the minimum of the two.");
+        }
+        if (farTableSize == 0) {
+            throw new IllegalStateException("Unable to find FAR table in pipeline model.");
+        }
+        // Get counter sizes of interest
+        long ingressCounterSize = 0;
+        long egressCounterSize = 0;
+        for (PiCounterModel piCounter : pipeconf.pipelineModel().counters()) {
+            if (piCounter.id().equals(FABRIC_INGRESS_SPGW_PDR_COUNTER)) {
+                ingressCounterSize = piCounter.size();
+            } else if (piCounter.id().equals(FABRIC_EGRESS_SPGW_PDR_COUNTER)) {
+                egressCounterSize = piCounter.size();
+            }
+        }
+        if (ingressCounterSize != egressCounterSize) {
+            log.warn("PDR ingress and egress counter sizes are not equal! Using the minimum of the two.");
+        }
+        this.farTableSize = farTableSize;
+        this.encappedPdrTableSize = encappedPdrTableSize;
+        this.unencappedPdrTableSize = unencappedPdrTableSize;
+        this.pdrCounterSize = Math.min(ingressCounterSize, egressCounterSize);
+        return true;
+    }
+
+    @Override
+    public void setBufferDrainer(BufferDrainer drainer) {
+        if (!setupBehaviour("setBufferDrainer()")) {
+            return;
+        }
+        this.bufferDrainer = drainer;
+    }
+
+    @Override
+    public void unsetBufferDrainer() {
+        if (!setupBehaviour("unsetBufferDrainer()")) {
+            return;
+        }
+        this.bufferDrainer = null;
+    }
+
+    @Override
+    public void enablePscEncap(int defaultQfi) throws UpfProgrammableException {
+        throw new UpfProgrammableException("PSC encap is not supported in fabric-v1model",
+                                           UNSUPPORTED_OPERATION);
+    }
+
+    @Override
+    public void disablePscEncap() throws UpfProgrammableException {
+        throw new UpfProgrammableException("PSC encap is not supported in fabric-v1model",
+                                           UNSUPPORTED_OPERATION);
+    }
+
+    @Override
+    public void sendPacketOut(ByteBuffer data) {
+        if (!setupBehaviour("sendPacketOut()")) {
+            return;
+        }
+        final OutboundPacket pkt = new DefaultOutboundPacket(
+                deviceId,
+                // Use TABLE logical port to have pkt routed via pipeline tables.
+                DefaultTrafficTreatment.builder()
+                        .setOutput(PortNumber.TABLE)
+                        .build(),
+                data);
+        packetService.emit(pkt);
+    }
+
+    @Override
+    public void setDbufTunnel(Ip4Address switchAddr, Ip4Address dbufAddr) {
+        if (!setupBehaviour("setDbufTunnel()")) {
+            return;
+        }
+        this.dbufTunnel = GtpTunnel.builder()
+                .setSrc(switchAddr)
+                .setDst(dbufAddr)
+                .setSrcPort((short) 2152)
+                .setTeid(0)
+                .build();
+    }
+
+    @Override
+    public void unsetDbufTunnel() {
+        if (!setupBehaviour("unsetDbufTunnel()")) {
+            return;
+        }
+        this.dbufTunnel = null;
+    }
+
+    /**
+     * Convert the given buffering FAR to a FAR that tunnels the packet to dbuf.
+     *
+     * @param far the FAR to convert
+     * @return the converted FAR
+     */
+    private ForwardingActionRule convertToDbufFar(ForwardingActionRule far) {
+        if (!far.buffers()) {
+            throw new IllegalArgumentException("Converting a non-buffering FAR to a dbuf FAR! This shouldn't happen.");
+        }
+        return ForwardingActionRule.builder()
+                .setFarId(far.farId())
+                .withSessionId(far.sessionId())
+                .setNotifyFlag(far.notifies())
+                .setBufferFlag(true)
+                .setTunnel(dbufTunnel)
+                .build();
+    }
+
+    @Override
+    public void cleanUp() {
+        if (!setupBehaviour("cleanUp()")) {
+            return;
+        }
+        log.info("Clearing all UPF-related table entries.");
+        flowRuleService.removeFlowRulesById(appId);
+        fabricUpfStore.reset();
+    }
+
+    @Override
+    public void clearInterfaces() {
+        if (!setupBehaviour("clearInterfaces()")) {
+            return;
+        }
+        log.info("Clearing all UPF interfaces.");
+        for (FlowRule entry : flowRuleService.getFlowEntriesById(appId)) {
+            if (upfTranslator.isFabricInterface(entry)) {
+                flowRuleService.removeFlowRules(entry);
+            }
+        }
+    }
+
+    @Override
+    public void clearFlows() {
+        if (!setupBehaviour("clearFlows()")) {
+            return;
+        }
+        log.info("Clearing all UE sessions.");
+        int pdrsCleared = 0;
+        int farsCleared = 0;
+        for (FlowRule entry : flowRuleService.getFlowEntriesById(appId)) {
+            if (upfTranslator.isFabricPdr(entry)) {
+                pdrsCleared++;
+                flowRuleService.removeFlowRules(entry);
+            } else if (upfTranslator.isFabricFar(entry)) {
+                farsCleared++;
+                flowRuleService.removeFlowRules(entry);
+            }
+        }
+        log.info("Cleared {} PDRs and {} FARS.", pdrsCleared, farsCleared);
+    }
+
+
+    @Override
+    public Collection<PdrStats> readAllCounters(long maxCounterId) {
+        if (!setupBehaviour("readAllCounters()")) {
+            return null;
+        }
+
+        long counterSize = pdrCounterSize();
+        if (maxCounterId != -1) {
+            counterSize = Math.min(maxCounterId, counterSize);
+        }
+
+        // Prepare PdrStats object builders, one for each counter ID currently in use
+        Map<Integer, PdrStats.Builder> pdrStatBuilders = Maps.newHashMap();
+        for (int cellId = 0; cellId < counterSize; cellId++) {
+            pdrStatBuilders.put(cellId, PdrStats.builder().withCellId(cellId));
+        }
+
+        // Generate the counter cell IDs.
+        Set<PiCounterId> counterIds = ImmutableSet.of(
+                FABRIC_INGRESS_SPGW_PDR_COUNTER,
+                FABRIC_EGRESS_SPGW_PDR_COUNTER
+        );
+
+        // Query the device.
+        Collection<PiCounterCell> counterEntryResponse = client.read(
+                DEFAULT_P4_DEVICE_ID, pipeconf)
+                .counterCells(counterIds)
+                .submitSync()
+                .all(PiCounterCell.class);
+
+        // Process response.
+        counterEntryResponse.forEach(counterCell -> {
+            if (counterCell.cellId().counterType() != INDIRECT) {
+                log.warn("Invalid counter data type {}, skipping", counterCell.cellId().counterType());
+                return;
+            }
+            if (!pdrStatBuilders.containsKey((int) counterCell.cellId().index())) {
+                // Most likely Up4config.maxUes() is set to a value smaller than what the switch
+                // pipeline can hold.
+                log.debug("Unrecognized index {} when reading all counters, " +
+                                  "that's expected if we are manually limiting maxUes", counterCell);
+                return;
+            }
+            PdrStats.Builder statsBuilder = pdrStatBuilders.get((int) counterCell.cellId().index());
+            if (counterCell.cellId().counterId().equals(FABRIC_INGRESS_SPGW_PDR_COUNTER)) {
+                statsBuilder.setIngress(counterCell.data().packets(),
+                                        counterCell.data().bytes());
+            } else if (counterCell.cellId().counterId().equals(FABRIC_EGRESS_SPGW_PDR_COUNTER)) {
+                statsBuilder.setEgress(counterCell.data().packets(),
+                                       counterCell.data().bytes());
+            } else {
+                log.warn("Unrecognized counter ID {}, skipping", counterCell);
+            }
+        });
+
+        return pdrStatBuilders
+                .values()
+                .stream()
+                .map(PdrStats.Builder::build)
+                .collect(Collectors.toList());
+    }
+
+    @Override
+    public long pdrCounterSize() {
+        if (!setupBehaviour("pdrCounterSize()")) {
+            return -1;
+        }
+        computeHardwareResourceSizes();
+        return pdrCounterSize;
+    }
+
+    @Override
+    public long farTableSize() {
+        if (!setupBehaviour("farTableSize()")) {
+            return -1;
+        }
+        computeHardwareResourceSizes();
+        return farTableSize;
+    }
+
+    @Override
+    public long pdrTableSize() {
+        if (!setupBehaviour("pdrTableSize()")) {
+            return -1;
+        }
+        computeHardwareResourceSizes();
+        return Math.min(encappedPdrTableSize, unencappedPdrTableSize) * 2;
+    }
+
+    @Override
+    public PdrStats readCounter(int cellId) throws UpfProgrammableException {
+        if (!setupBehaviour("readCounter()")) {
+            return null;
+        }
+        if (cellId >= pdrCounterSize() || cellId < 0) {
+            throw new UpfProgrammableException("Requested PDR counter cell index is out of bounds.",
+                                               UpfProgrammableException.Type.COUNTER_INDEX_OUT_OF_RANGE);
+        }
+        PdrStats.Builder stats = PdrStats.builder().withCellId(cellId);
+
+        // Make list of cell handles we want to read.
+        List<PiCounterCellHandle> counterCellHandles = List.of(
+                PiCounterCellHandle.of(deviceId,
+                                       PiCounterCellId.ofIndirect(FABRIC_INGRESS_SPGW_PDR_COUNTER, cellId)),
+                PiCounterCellHandle.of(deviceId,
+                                       PiCounterCellId.ofIndirect(FABRIC_EGRESS_SPGW_PDR_COUNTER, cellId)));
+
+        // Query the device.
+        Collection<PiCounterCell> counterEntryResponse = client.read(
+                DEFAULT_P4_DEVICE_ID, pipeconf)
+                .handles(counterCellHandles).submitSync()
+                .all(PiCounterCell.class);
+
+        // Process response.
+        counterEntryResponse.forEach(counterCell -> {
+            if (counterCell.cellId().counterType() != INDIRECT) {
+                log.warn("Invalid counter data type {}, skipping", counterCell.cellId().counterType());
+                return;
+            }
+            if (cellId != counterCell.cellId().index()) {
+                log.warn("Unrecognized counter index {}, skipping", counterCell);
+                return;
+            }
+            if (counterCell.cellId().counterId().equals(FABRIC_INGRESS_SPGW_PDR_COUNTER)) {
+                stats.setIngress(counterCell.data().packets(), counterCell.data().bytes());
+            } else if (counterCell.cellId().counterId().equals(FABRIC_EGRESS_SPGW_PDR_COUNTER)) {
+                stats.setEgress(counterCell.data().packets(), counterCell.data().bytes());
+            } else {
+                log.warn("Unrecognized counter ID {}, skipping", counterCell);
+            }
+        });
+        return stats.build();
+    }
+
+
+    @Override
+    public void addPdr(PacketDetectionRule pdr) throws UpfProgrammableException {
+        if (!setupBehaviour("addPdr()")) {
+            return;
+        }
+        if (pdr.counterId() >= pdrCounterSize() || pdr.counterId() < 0) {
+            throw new UpfProgrammableException("Counter cell index referenced by PDR is out of bounds.",
+                                               UpfProgrammableException.Type.COUNTER_INDEX_OUT_OF_RANGE);
+        }
+        FlowRule fabricPdr = upfTranslator.pdrToFabricEntry(pdr, deviceId, appId, DEFAULT_PRIORITY);
+        log.info("Installing {}", pdr.toString());
+        flowRuleService.applyFlowRules(fabricPdr);
+        log.debug("PDR added with flowID {}", fabricPdr.id().value());
+
+        // If the flow rule was applied and the PDR is downlink, add the PDR to the farID->PDR mapping
+        if (pdr.matchesUnencapped()) {
+            fabricUpfStore.learnFarIdToUeAddrs(pdr);
+        }
+    }
+
+
+    @Override
+    public void addFar(ForwardingActionRule far) throws UpfProgrammableException {
+        if (!setupBehaviour("addFar()")) {
+            return;
+        }
+        UpfRuleIdentifier ruleId = UpfRuleIdentifier.of(far.sessionId(), far.farId());
+        if (far.buffers()) {
+            // If the far has the buffer flag, modify its tunnel so it directs to dbuf
+            far = convertToDbufFar(far);
+            fabricUpfStore.learBufferingFarId(ruleId);
+        }
+        FlowRule fabricFar = upfTranslator.farToFabricEntry(far, deviceId, appId, DEFAULT_PRIORITY);
+        log.info("Installing {}", far.toString());
+        flowRuleService.applyFlowRules(fabricFar);
+        log.debug("FAR added with flowID {}", fabricFar.id().value());
+        if (!far.buffers() && fabricUpfStore.isFarIdBuffering(ruleId)) {
+            // If this FAR does not buffer but used to, then drain the buffer for every UE address
+            // that hits this FAR.
+            fabricUpfStore.forgetBufferingFarId(ruleId);
+            for (var ueAddr : fabricUpfStore.ueAddrsOfFarId(ruleId)) {
+                if (bufferDrainer == null) {
+                    log.warn("Unable to drain downlink buffer for UE {}, bufferDrainer is null", ueAddr);
+                } else {
+                    bufferDrainer.drain(ueAddr);
+                }
+            }
+        }
+    }
+
+    @Override
+    public void addInterface(UpfInterface upfInterface) throws UpfProgrammableException {
+        if (!setupBehaviour("addInterface()")) {
+            return;
+        }
+        FlowRule flowRule = upfTranslator.interfaceToFabricEntry(upfInterface, deviceId, appId, DEFAULT_PRIORITY);
+        log.info("Installing {}", upfInterface);
+        flowRuleService.applyFlowRules(flowRule);
+        log.debug("Interface added with flowID {}", flowRule.id().value());
+        // By default we enable UE-to-UE communication on the UE subnet identified by the CORE interface.
+        // TODO: allow enabling/disabling UE-to-UE via netcfg or other API.
+        log.warn("UE-to-UE traffic is not supported in fabric-v1model");
+    }
+
+    private boolean removeEntry(PiCriterion match, PiTableId tableId, boolean failSilent)
+            throws UpfProgrammableException {
+        if (!setupBehaviour("removeEntry()")) {
+            return false;
+        }
+        FlowRule entry = DefaultFlowRule.builder()
+                .forDevice(deviceId).fromApp(appId).makePermanent()
+                .forTable(tableId)
+                .withSelector(DefaultTrafficSelector.builder().matchPi(match).build())
+                .withPriority(DEFAULT_PRIORITY)
+                .build();
+
+        /*
+         *  FIXME: Stupid stupid slow hack, needed because removeFlowRules expects FlowRule objects
+         *   with correct and complete actions and parameters, but P4Runtime deletion requests
+         *   will not have those.
+         */
+        for (FlowEntry installedEntry : flowRuleService.getFlowEntriesById(appId)) {
+            if (installedEntry.selector().equals(entry.selector())) {
+                log.info("Found matching entry to remove, it has FlowID {}", installedEntry.id());
+                flowRuleService.removeFlowRules(installedEntry);
+                return true;
+            }
+        }
+        if (!failSilent) {
+            throw new UpfProgrammableException("Match criterion " + match.toString() +
+                                                       " not found in table " + tableId.toString());
+        }
+        return false;
+    }
+
+    @Override
+    public Collection<PacketDetectionRule> getPdrs() throws UpfProgrammableException {
+        if (!setupBehaviour("getPdrs()")) {
+            return null;
+        }
+        ArrayList<PacketDetectionRule> pdrs = new ArrayList<>();
+        for (FlowRule flowRule : flowRuleService.getFlowEntriesById(appId)) {
+            if (upfTranslator.isFabricPdr(flowRule)) {
+                pdrs.add(upfTranslator.fabricEntryToPdr(flowRule));
+            }
+        }
+        return pdrs;
+    }
+
+    @Override
+    public Collection<ForwardingActionRule> getFars() throws UpfProgrammableException {
+        if (!setupBehaviour("getFars()")) {
+            return null;
+        }
+        ArrayList<ForwardingActionRule> fars = new ArrayList<>();
+        for (FlowRule flowRule : flowRuleService.getFlowEntriesById(appId)) {
+            if (upfTranslator.isFabricFar(flowRule)) {
+                fars.add(upfTranslator.fabricEntryToFar(flowRule));
+            }
+        }
+        return fars;
+    }
+
+    @Override
+    public Collection<UpfInterface> getInterfaces() throws UpfProgrammableException {
+        if (!setupBehaviour("getInterfaces()")) {
+            return null;
+        }
+        ArrayList<UpfInterface> ifaces = new ArrayList<>();
+        for (FlowRule flowRule : flowRuleService.getFlowEntriesById(appId)) {
+            if (upfTranslator.isFabricInterface(flowRule)) {
+                ifaces.add(upfTranslator.fabricEntryToInterface(flowRule));
+            }
+        }
+        return ifaces;
+    }
+
+    @Override
+    public void removePdr(PacketDetectionRule pdr) throws UpfProgrammableException {
+        if (!setupBehaviour("removePdr()")) {
+            return;
+        }
+        final PiCriterion match;
+        final PiTableId tableId;
+        if (pdr.matchesEncapped()) {
+            match = PiCriterion.builder()
+                    .matchExact(HDR_TEID, pdr.teid().asArray())
+                    .matchExact(HDR_TUNNEL_IPV4_DST, pdr.tunnelDest().toInt())
+                    .build();
+            tableId = FABRIC_INGRESS_SPGW_UPLINK_PDRS;
+        } else {
+            match = PiCriterion.builder()
+                    .matchExact(HDR_UE_ADDR, pdr.ueAddress().toInt())
+                    .build();
+            tableId = FABRIC_INGRESS_SPGW_DOWNLINK_PDRS;
+        }
+        log.info("Removing {}", pdr.toString());
+        removeEntry(match, tableId, false);
+
+        // Remove the PDR from the farID->PDR mapping
+        // This is an inefficient hotfix FIXME: remove UE addrs from the mapping in sublinear time
+        if (pdr.matchesUnencapped()) {
+            // Should we remove just from the map entry with key == far ID?
+            fabricUpfStore.forgetUeAddr(pdr.ueAddress());
+        }
+    }
+
+    @Override
+    public void removeFar(ForwardingActionRule far) throws UpfProgrammableException {
+        log.info("Removing {}", far.toString());
+
+        PiCriterion match = PiCriterion.builder()
+                .matchExact(HDR_FAR_ID, fabricUpfStore.globalFarIdOf(far.sessionId(), far.farId()))
+                .build();
+
+        removeEntry(match, FABRIC_INGRESS_SPGW_FARS, false);
+    }
+
+    @Override
+    public void removeInterface(UpfInterface upfInterface) throws UpfProgrammableException {
+        if (!setupBehaviour("removeInterface()")) {
+            return;
+        }
+        Ip4Prefix ifacePrefix = upfInterface.getPrefix();
+        // If it isn't a core interface (so it is either access or unknown), try removing core
+        if (!upfInterface.isCore()) {
+            PiCriterion match1 = PiCriterion.builder()
+                    .matchLpm(HDR_IPV4_DST_ADDR, ifacePrefix.address().toInt(),
+                              ifacePrefix.prefixLength())
+                    .matchExact(HDR_GTPU_IS_VALID, 1)
+                    .build();
+            if (removeEntry(match1, FABRIC_INGRESS_SPGW_INTERFACES, true)) {
+                return;
+            }
+        }
+        // If that didn't work or didn't execute, try removing access
+        PiCriterion match2 = PiCriterion.builder()
+                .matchLpm(HDR_IPV4_DST_ADDR, ifacePrefix.address().toInt(),
+                          ifacePrefix.prefixLength())
+                .matchExact(HDR_GTPU_IS_VALID, 0)
+                .build();
+        removeEntry(match2, FABRIC_INGRESS_SPGW_INTERFACES, false);
+    }
+}
diff --git a/pipelines/fabric/impl/src/main/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/FabricUpfStore.java b/pipelines/fabric/impl/src/main/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/FabricUpfStore.java
new file mode 100644
index 0000000..786ef3c
--- /dev/null
+++ b/pipelines/fabric/impl/src/main/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/FabricUpfStore.java
@@ -0,0 +1,141 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.pipelines.fabric.impl.behaviour.upf;
+
+import org.onlab.packet.Ip4Address;
+import org.onlab.util.ImmutableByteSequence;
+import org.onosproject.net.behaviour.upf.PacketDetectionRule;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Stores state required for translation of UPF entities to pipeline-specific ones.
+ */
+public interface FabricUpfStore {
+    /**
+     * Clear all state associated with translation.
+     */
+    void reset();
+
+    /**
+     * Returns the farIdMap.
+     *
+     * @return the farIdMap.
+     */
+    Map<UpfRuleIdentifier, Integer> getFarIdMap();
+
+    /**
+     * Get a globally unique integer identifier for the FAR identified by the given (Session ID, Far
+     * ID) pair.
+     *
+     * @param farIdPair a RuleIdentifier instance uniquely identifying the FAR
+     * @return A globally unique integer identifier
+     */
+    int globalFarIdOf(UpfRuleIdentifier farIdPair);
+
+    /**
+     * Get a globally unique integer identifier for the FAR identified by the given (Session ID, Far
+     * ID) pair.
+     *
+     * @param pfcpSessionId     The ID of the PFCP session that produced the FAR ID.
+     * @param sessionLocalFarId The FAR ID.
+     * @return A globally unique integer identifier
+     */
+    int globalFarIdOf(ImmutableByteSequence pfcpSessionId, int sessionLocalFarId);
+
+    /**
+     * Get the corresponding PFCP session ID and session-local FAR ID from a globally unique FAR ID,
+     * or return null if no such mapping is found.
+     *
+     * @param globalFarId globally unique FAR ID
+     * @return the corresponding PFCP session ID and session-local FAR ID, as a RuleIdentifier
+     */
+    UpfRuleIdentifier localFarIdOf(int globalFarId);
+
+    /**
+     * Get the corresponding queue Id from scheduling priority.
+     *
+     * @param schedulingPriority QCI scheduling priority
+     * @return the corresponding queue ID
+     */
+    String queueIdOf(int schedulingPriority);
+
+    /**
+     * Get the corresponding queue Id from scheduling priority.
+     *
+     * @param queueId Tofino queue Id
+     * @return the corresponding scheduling priroity
+    */
+    String schedulingPriorityOf(int queueId);
+
+    /**
+     * Stores the mapping between FAR ID and UE address as defined by the given PDR.
+     *
+     * @param pdr PDR
+     */
+    void learnFarIdToUeAddrs(PacketDetectionRule pdr);
+
+    /**
+     * Returns true if the given FAR IDs is known to be a buffering one.
+     *
+     * @param farId FAR ID
+     * @return boolean
+     */
+    boolean isFarIdBuffering(UpfRuleIdentifier farId);
+
+    /**
+     * Learns the given FAR ID as being a buffering one.
+     *
+     * @param farId FAR ID
+     */
+    void learBufferingFarId(UpfRuleIdentifier farId);
+
+    /**
+     * Forgets the given FAR ID as being a buffering one.
+     *
+     * @param farId FAR ID
+     */
+    void forgetBufferingFarId(UpfRuleIdentifier farId);
+
+    /**
+     * Returns the set of UE addresses associated with the given FAR ID.
+     *
+     * @param farId FAR ID
+     * @return Set of Ip4Address
+     */
+    Set<Ip4Address> ueAddrsOfFarId(UpfRuleIdentifier farId);
+
+    /**
+     * Removes the given UE address from the FAR ID to UE address map.
+     * @param ueAddr UE address
+     */
+    void forgetUeAddr(Ip4Address ueAddr);
+
+    /**
+     * Returns the set of known buffering FAR IDs.
+     * @return set
+     */
+    Set<UpfRuleIdentifier> getBufferFarIds();
+
+    /**
+     * Returns the FAR ID to UE addresses map.
+     *
+     * @return map
+     */
+    Map<UpfRuleIdentifier, Set<Ip4Address>> getFarIdToUeAddrs();
+}
diff --git a/pipelines/fabric/impl/src/main/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/FabricUpfTranslator.java b/pipelines/fabric/impl/src/main/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/FabricUpfTranslator.java
new file mode 100644
index 0000000..d270371
--- /dev/null
+++ b/pipelines/fabric/impl/src/main/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/FabricUpfTranslator.java
@@ -0,0 +1,441 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.pipelines.fabric.impl.behaviour.upf;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.onlab.packet.Ip4Address;
+import org.onlab.util.ImmutableByteSequence;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.behaviour.upf.ForwardingActionRule;
+import org.onosproject.net.behaviour.upf.GtpTunnel;
+import org.onosproject.net.behaviour.upf.PacketDetectionRule;
+import org.onosproject.net.behaviour.upf.UpfInterface;
+import org.onosproject.net.behaviour.upf.UpfProgrammableException;
+import org.onosproject.net.flow.DefaultFlowRule;
+import org.onosproject.net.flow.DefaultTrafficSelector;
+import org.onosproject.net.flow.DefaultTrafficTreatment;
+import org.onosproject.net.flow.FlowRule;
+import org.onosproject.net.flow.criteria.PiCriterion;
+import org.onosproject.net.pi.model.PiActionId;
+import org.onosproject.net.pi.model.PiTableId;
+import org.onosproject.net.pi.runtime.PiAction;
+import org.onosproject.net.pi.runtime.PiActionParam;
+import org.onosproject.net.pi.runtime.PiTableAction;
+
+import java.util.Arrays;
+
+import static org.onosproject.pipelines.fabric.FabricConstants.CTR_ID;
+import static org.onosproject.pipelines.fabric.FabricConstants.DROP;
+import static org.onosproject.pipelines.fabric.FabricConstants.FABRIC_INGRESS_SPGW_DOWNLINK_PDRS;
+import static org.onosproject.pipelines.fabric.FabricConstants.FABRIC_INGRESS_SPGW_FARS;
+import static org.onosproject.pipelines.fabric.FabricConstants.FABRIC_INGRESS_SPGW_INTERFACES;
+import static org.onosproject.pipelines.fabric.FabricConstants.FABRIC_INGRESS_SPGW_LOAD_DBUF_FAR;
+import static org.onosproject.pipelines.fabric.FabricConstants.FABRIC_INGRESS_SPGW_LOAD_IFACE;
+import static org.onosproject.pipelines.fabric.FabricConstants.FABRIC_INGRESS_SPGW_LOAD_NORMAL_FAR;
+import static org.onosproject.pipelines.fabric.FabricConstants.FABRIC_INGRESS_SPGW_LOAD_PDR;
+import static org.onosproject.pipelines.fabric.FabricConstants.FABRIC_INGRESS_SPGW_LOAD_PDR_QOS;
+import static org.onosproject.pipelines.fabric.FabricConstants.FABRIC_INGRESS_SPGW_LOAD_TUNNEL_FAR;
+import static org.onosproject.pipelines.fabric.FabricConstants.FABRIC_INGRESS_SPGW_UPLINK_PDRS;
+import static org.onosproject.pipelines.fabric.FabricConstants.FAR_ID;
+import static org.onosproject.pipelines.fabric.FabricConstants.HDR_FAR_ID;
+import static org.onosproject.pipelines.fabric.FabricConstants.HDR_GTPU_IS_VALID;
+import static org.onosproject.pipelines.fabric.FabricConstants.HDR_IPV4_DST_ADDR;
+import static org.onosproject.pipelines.fabric.FabricConstants.HDR_TEID;
+import static org.onosproject.pipelines.fabric.FabricConstants.HDR_TUNNEL_IPV4_DST;
+import static org.onosproject.pipelines.fabric.FabricConstants.HDR_UE_ADDR;
+import static org.onosproject.pipelines.fabric.FabricConstants.NEEDS_GTPU_DECAP;
+import static org.onosproject.pipelines.fabric.FabricConstants.NOTIFY_CP;
+import static org.onosproject.pipelines.fabric.FabricConstants.QID;
+import static org.onosproject.pipelines.fabric.FabricConstants.SRC_IFACE;
+import static org.onosproject.pipelines.fabric.FabricConstants.TEID;
+import static org.onosproject.pipelines.fabric.FabricConstants.TUNNEL_DST_ADDR;
+import static org.onosproject.pipelines.fabric.FabricConstants.TUNNEL_SRC_ADDR;
+import static org.onosproject.pipelines.fabric.FabricConstants.TUNNEL_SRC_PORT;
+
+/**
+ * Provides logic to translate UPF entities into pipeline-specific ones and vice-versa.
+ * Implementation should be stateless, with all state delegated to FabricUpfStore.
+ */
+public class FabricUpfTranslator {
+
+    // UPF related constants
+    public static final int INTERFACE_ACCESS = 1;
+    public static final int INTERFACE_CORE = 2;
+    public static final int INTERFACE_DBUF = 3;
+
+    private final FabricUpfStore fabricUpfStore;
+
+    public FabricUpfTranslator(FabricUpfStore fabricUpfStore) {
+        this.fabricUpfStore = fabricUpfStore;
+    }
+
+    /**
+     * Returns true if the given table entry is a Packet Detection Rule from the physical fabric pipeline, and
+     * false otherwise.
+     *
+     * @param entry the entry that may or may not be a fabric.p4 PDR
+     * @return true if the entry is a fabric.p4 PDR
+     */
+    public boolean isFabricPdr(FlowRule entry) {
+        return entry.table().equals(FABRIC_INGRESS_SPGW_UPLINK_PDRS)
+                || entry.table().equals(FABRIC_INGRESS_SPGW_DOWNLINK_PDRS);
+    }
+
+    /**
+     * Returns true if the given table entry is a Forwarding Action Rule from the physical fabric pipeline, and
+     * false otherwise.
+     *
+     * @param entry the entry that may or may not be a fabric.p4 FAR
+     * @return true if the entry is a fabric.p4 FAR
+     */
+    public boolean isFabricFar(FlowRule entry) {
+        return entry.table().equals(FABRIC_INGRESS_SPGW_FARS);
+    }
+
+    /**
+     * Returns true if the given table entry is an interface table entry from the fabric.p4 physical pipeline, and
+     * false otherwise.
+     *
+     * @param entry the entry that may or may not be a fabric.p4 UPF interface
+     * @return true if the entry is a fabric.p4 UPF interface
+     */
+    public boolean isFabricInterface(FlowRule entry) {
+        return entry.table().equals(FABRIC_INGRESS_SPGW_INTERFACES);
+    }
+
+
+    /**
+     * Translate a fabric.p4 PDR table entry to a PacketDetectionRule instance for easier handling.
+     *
+     * @param entry the fabric.p4 entry to translate
+     * @return the corresponding PacketDetectionRule
+     * @throws UpfProgrammableException if the entry cannot be translated
+     */
+    public PacketDetectionRule fabricEntryToPdr(FlowRule entry)
+            throws UpfProgrammableException {
+        var pdrBuilder = PacketDetectionRule.builder();
+        Pair<PiCriterion, PiTableAction> matchActionPair = FabricUpfTranslatorUtil.fabricEntryToPiPair(entry);
+        PiCriterion match = matchActionPair.getLeft();
+        PiAction action = (PiAction) matchActionPair.getRight();
+
+        // Grab keys and parameters that are present for all PDRs
+        int globalFarId = FabricUpfTranslatorUtil.getParamInt(action, FAR_ID);
+        UpfRuleIdentifier farId = fabricUpfStore.localFarIdOf(globalFarId);
+
+        PiActionId actionId = action.id();
+        if (actionId.equals(FABRIC_INGRESS_SPGW_LOAD_PDR)) {
+            int schedulingPriority = 0;
+            pdrBuilder.withSchedulingPriority(schedulingPriority);
+        } else if (actionId.equals(FABRIC_INGRESS_SPGW_LOAD_PDR_QOS)) {
+            int queueId = FabricUpfTranslatorUtil.getParamInt(action, QID);
+            String schedulingPriority = fabricUpfStore.schedulingPriorityOf(queueId);
+            if (schedulingPriority == null) {
+                throw new UpfProgrammableException("Undefined Scheduling Priority");
+            }
+            pdrBuilder.withSchedulingPriority(Integer.parseInt(schedulingPriority));
+        } else {
+            throw new UpfProgrammableException("Unknown action ID");
+        }
+        pdrBuilder.withCounterId(FabricUpfTranslatorUtil.getParamInt(action, CTR_ID))
+                .withLocalFarId(farId.getSessionLocalId())
+                .withSessionId(farId.getPfcpSessionId());
+
+        if (FabricUpfTranslatorUtil.fieldIsPresent(match, HDR_TEID)) {
+            // F-TEID is only present for GTP-matching PDRs
+            ImmutableByteSequence teid = FabricUpfTranslatorUtil.getFieldValue(match, HDR_TEID);
+            Ip4Address tunnelDst = FabricUpfTranslatorUtil.getFieldAddress(match, HDR_TUNNEL_IPV4_DST);
+            pdrBuilder.withTeid(teid)
+                    .withTunnelDst(tunnelDst);
+        } else if (FabricUpfTranslatorUtil.fieldIsPresent(match, HDR_UE_ADDR)) {
+            // And UE address is only present for non-GTP-matching PDRs
+            pdrBuilder.withUeAddr(FabricUpfTranslatorUtil.getFieldAddress(match, HDR_UE_ADDR));
+        } else {
+            throw new UpfProgrammableException("Read malformed PDR from dataplane!:" + entry);
+        }
+        return pdrBuilder.build();
+    }
+
+    /**
+     * Translate a fabric.p4 FAR table entry to a ForwardActionRule instance for easier handling.
+     *
+     * @param entry the fabric.p4 entry to translate
+     * @return the corresponding ForwardingActionRule
+     * @throws UpfProgrammableException if the entry cannot be translated
+     */
+    public ForwardingActionRule fabricEntryToFar(FlowRule entry)
+            throws UpfProgrammableException {
+        var farBuilder = ForwardingActionRule.builder();
+        Pair<PiCriterion, PiTableAction> matchActionPair = FabricUpfTranslatorUtil.fabricEntryToPiPair(entry);
+        PiCriterion match = matchActionPair.getLeft();
+        PiAction action = (PiAction) matchActionPair.getRight();
+
+        int globalFarId = FabricUpfTranslatorUtil.getFieldInt(match, HDR_FAR_ID);
+        UpfRuleIdentifier farId = fabricUpfStore.localFarIdOf(globalFarId);
+
+        boolean dropFlag = FabricUpfTranslatorUtil.getParamInt(action, DROP) > 0;
+        boolean notifyFlag = FabricUpfTranslatorUtil.getParamInt(action, NOTIFY_CP) > 0;
+
+        // Match keys
+        farBuilder.withSessionId(farId.getPfcpSessionId())
+                .setFarId(farId.getSessionLocalId());
+
+        // Parameters common to all types of FARs
+        farBuilder.setDropFlag(dropFlag)
+                .setNotifyFlag(notifyFlag);
+
+        PiActionId actionId = action.id();
+
+        if (actionId.equals(FABRIC_INGRESS_SPGW_LOAD_TUNNEL_FAR)
+                || actionId.equals(FABRIC_INGRESS_SPGW_LOAD_DBUF_FAR)) {
+            // Grab parameters specific to encapsulating FARs if they're present
+            Ip4Address tunnelSrc = FabricUpfTranslatorUtil.getParamAddress(action, TUNNEL_SRC_ADDR);
+            Ip4Address tunnelDst = FabricUpfTranslatorUtil.getParamAddress(action, TUNNEL_DST_ADDR);
+            ImmutableByteSequence teid = FabricUpfTranslatorUtil.getParamValue(action, TEID);
+            short tunnelSrcPort = (short) FabricUpfTranslatorUtil.getParamInt(action, TUNNEL_SRC_PORT);
+
+            farBuilder.setBufferFlag(actionId.equals(FABRIC_INGRESS_SPGW_LOAD_DBUF_FAR));
+
+            farBuilder.setTunnel(
+                    GtpTunnel.builder()
+                            .setSrc(tunnelSrc)
+                            .setDst(tunnelDst)
+                            .setTeid(teid)
+                            .setSrcPort(tunnelSrcPort)
+                            .build());
+        }
+        return farBuilder.build();
+    }
+
+    /**
+     * Translate a fabric.p4 interface table entry to a UpfInterface instance for easier handling.
+     *
+     * @param entry the fabric.p4 entry to translate
+     * @return the corresponding UpfInterface
+     * @throws UpfProgrammableException if the entry cannot be translated
+     */
+    public UpfInterface fabricEntryToInterface(FlowRule entry)
+            throws UpfProgrammableException {
+        Pair<PiCriterion, PiTableAction> matchActionPair = FabricUpfTranslatorUtil.fabricEntryToPiPair(entry);
+        PiCriterion match = matchActionPair.getLeft();
+        PiAction action = (PiAction) matchActionPair.getRight();
+
+        var ifaceBuilder = UpfInterface.builder()
+                .setPrefix(FabricUpfTranslatorUtil.getFieldPrefix(match, HDR_IPV4_DST_ADDR));
+
+        int interfaceType = FabricUpfTranslatorUtil.getParamInt(action, SRC_IFACE);
+        if (interfaceType == INTERFACE_ACCESS) {
+            ifaceBuilder.setAccess();
+        } else if (interfaceType == INTERFACE_CORE) {
+            ifaceBuilder.setCore();
+        } else if (interfaceType == INTERFACE_DBUF) {
+            ifaceBuilder.setDbufReceiver();
+        }
+        return ifaceBuilder.build();
+    }
+
+    /**
+     * Translate a ForwardingActionRule to a FlowRule to be inserted into the fabric.p4 pipeline.
+     * A side effect of calling this method is the FAR object's globalFarId is assigned if it was not already.
+     *
+     * @param far      The FAR to be translated
+     * @param deviceId the ID of the device the FlowRule should be installed on
+     * @param appId    the ID of the application that will insert the FlowRule
+     * @param priority the FlowRule's priority
+     * @return the FAR translated to a FlowRule
+     * @throws UpfProgrammableException if the FAR to be translated is malformed
+     */
+    public FlowRule farToFabricEntry(ForwardingActionRule far, DeviceId deviceId, ApplicationId appId, int priority)
+            throws UpfProgrammableException {
+        PiAction action;
+        if (!far.encaps()) {
+            action = PiAction.builder()
+                    .withId(FABRIC_INGRESS_SPGW_LOAD_NORMAL_FAR)
+                    .withParameters(Arrays.asList(
+                            new PiActionParam(DROP, far.drops() ? 1 : 0),
+                            new PiActionParam(NOTIFY_CP, far.notifies() ? 1 : 0)
+                    ))
+                    .build();
+
+        } else {
+            if (far.tunnelSrc() == null || far.tunnelDst() == null
+                    || far.teid() == null || far.tunnel().srcPort() == null) {
+                throw new UpfProgrammableException(
+                        "Not all action parameters present when translating " +
+                                "intermediate encapsulating/buffering FAR to physical FAR!");
+            }
+            // TODO: copy tunnel destination port from logical switch write requests, instead of hardcoding 2152
+            PiActionId actionId = far.buffers() ? FABRIC_INGRESS_SPGW_LOAD_DBUF_FAR :
+                    FABRIC_INGRESS_SPGW_LOAD_TUNNEL_FAR;
+            action = PiAction.builder()
+                    .withId(actionId)
+                    .withParameters(Arrays.asList(
+                            new PiActionParam(DROP, far.drops() ? 1 : 0),
+                            new PiActionParam(NOTIFY_CP, far.notifies() ? 1 : 0),
+                            new PiActionParam(TEID, far.teid()),
+                            new PiActionParam(TUNNEL_SRC_ADDR, far.tunnelSrc().toInt()),
+                            new PiActionParam(TUNNEL_DST_ADDR, far.tunnelDst().toInt()),
+                            new PiActionParam(TUNNEL_SRC_PORT, far.tunnel().srcPort())
+                    ))
+                    .build();
+        }
+        PiCriterion match = PiCriterion.builder()
+                .matchExact(HDR_FAR_ID, fabricUpfStore.globalFarIdOf(far.sessionId(), far.farId()))
+                .build();
+        return DefaultFlowRule.builder()
+                .forDevice(deviceId).fromApp(appId).makePermanent()
+                .forTable(FABRIC_INGRESS_SPGW_FARS)
+                .withSelector(DefaultTrafficSelector.builder().matchPi(match).build())
+                .withTreatment(DefaultTrafficTreatment.builder().piTableAction(action).build())
+                .withPriority(priority)
+                .build();
+    }
+
+    /**
+     * Translate a PacketDetectionRule to a FlowRule to be inserted into the fabric.p4 pipeline.
+     * A side effect of calling this method is the PDR object's globalFarId is assigned if it was not already.
+     *
+     * @param pdr      The PDR to be translated
+     * @param deviceId the ID of the device the FlowRule should be installed on
+     * @param appId    the ID of the application that will insert the FlowRule
+     * @param priority the FlowRule's priority
+     * @return the FAR translated to a FlowRule
+     * @throws UpfProgrammableException if the PDR to be translated is malformed
+     */
+    public FlowRule pdrToFabricEntry(PacketDetectionRule pdr, DeviceId deviceId, ApplicationId appId, int priority)
+            throws UpfProgrammableException {
+        PiCriterion match;
+        PiTableId tableId;
+        PiAction action;
+
+        if (pdr.matchesEncapped()) {
+            match = PiCriterion.builder()
+                    .matchExact(HDR_TEID, pdr.teid().asArray())
+                    .matchExact(HDR_TUNNEL_IPV4_DST, pdr.tunnelDest().toInt())
+                    .build();
+            tableId = FABRIC_INGRESS_SPGW_UPLINK_PDRS;
+        } else if (pdr.matchesUnencapped()) {
+            match = PiCriterion.builder()
+                    .matchExact(HDR_UE_ADDR, pdr.ueAddress().toInt())
+                    .build();
+            tableId = FABRIC_INGRESS_SPGW_DOWNLINK_PDRS;
+        } else {
+            throw new UpfProgrammableException("Flexible PDRs not yet supported! Cannot translate " + pdr.toString());
+        }
+
+        PiAction.Builder builder = PiAction.builder()
+                .withParameters(Arrays.asList(
+                        new PiActionParam(CTR_ID, pdr.counterId()),
+                        new PiActionParam(FAR_ID, fabricUpfStore.globalFarIdOf(pdr.sessionId(), pdr.farId())),
+                        new PiActionParam(NEEDS_GTPU_DECAP, pdr.matchesEncapped() ? 1 : 0)
+                ));
+        if (pdr.hasSchedulingPriority()) {
+            String queueId = fabricUpfStore.queueIdOf(pdr.schedulingPriority());
+            if (queueId == null) {
+                throw new UpfProgrammableException("Udefined Scheduling Priority");
+            }
+            action = builder
+                    .withId(FABRIC_INGRESS_SPGW_LOAD_PDR_QOS)
+                    .withParameter(new PiActionParam(QID, Integer.parseInt(queueId)))
+                    .build();
+        } else {
+            action = builder
+                    .withId(FABRIC_INGRESS_SPGW_LOAD_PDR)
+                    .build();
+        }
+        return DefaultFlowRule.builder()
+                .forDevice(deviceId).fromApp(appId).makePermanent()
+                .forTable(tableId)
+                .withSelector(DefaultTrafficSelector.builder().matchPi(match).build())
+                .withTreatment(DefaultTrafficTreatment.builder().piTableAction(action).build())
+                .withPriority(priority)
+                .build();
+    }
+
+    /**
+     * Translate a UpfInterface to a FlowRule to be inserted into the fabric.p4 pipeline.
+     *
+     * @param upfInterface The interface to be translated
+     * @param deviceId     the ID of the device the FlowRule should be installed on
+     * @param appId        the ID of the application that will insert the FlowRule
+     * @param priority     the FlowRule's priority
+     * @return the UPF interface translated to a FlowRule
+     * @throws UpfProgrammableException if the interface cannot be translated
+     */
+    public FlowRule interfaceToFabricEntry(UpfInterface upfInterface, DeviceId deviceId,
+                                           ApplicationId appId, int priority)
+            throws UpfProgrammableException {
+        int interfaceTypeInt;
+        int gtpuValidity;
+        if (upfInterface.isDbufReceiver()) {
+            interfaceTypeInt = INTERFACE_DBUF;
+            gtpuValidity = 1;
+        } else if (upfInterface.isAccess()) {
+            interfaceTypeInt = INTERFACE_ACCESS;
+            gtpuValidity = 1;
+        } else {
+            interfaceTypeInt = INTERFACE_CORE;
+            gtpuValidity = 0;
+        }
+
+        PiCriterion match = PiCriterion.builder()
+                .matchLpm(HDR_IPV4_DST_ADDR,
+                          upfInterface.prefix().address().toInt(),
+                          upfInterface.prefix().prefixLength())
+                .matchExact(HDR_GTPU_IS_VALID, gtpuValidity)
+                .build();
+        PiAction action = PiAction.builder()
+                .withId(FABRIC_INGRESS_SPGW_LOAD_IFACE)
+                .withParameter(new PiActionParam(SRC_IFACE, interfaceTypeInt))
+                .build();
+        return DefaultFlowRule.builder()
+                .forDevice(deviceId).fromApp(appId).makePermanent()
+                .forTable(FABRIC_INGRESS_SPGW_INTERFACES)
+                .withSelector(DefaultTrafficSelector.builder().matchPi(match).build())
+                .withTreatment(DefaultTrafficTreatment.builder().piTableAction(action).build())
+                .withPriority(priority)
+                .build();
+    }
+
+//    public FlowRule buildGtpuWithPscEncapRule(DeviceId deviceId, ApplicationId appId, int qfi) {
+//        PiAction action = PiAction.builder()
+//                .withId(FABRIC_EGRESS_SPGW_GTPU_WITH_PSC)
+//                .withParameter(new PiActionParam(QFI, qfi))
+//                .build();
+//        // Default entry, no selector.
+//        return DefaultFlowRule.builder()
+//                .forDevice(deviceId).fromApp(appId).makePermanent()
+//                .forTable(FABRIC_EGRESS_SPGW_GTPU_ENCAP)
+//                .withTreatment(DefaultTrafficTreatment.builder().piTableAction(action).build())
+//                .withPriority(0)
+//                .build();
+//    }
+//
+//    public FlowRule buildGtpuOnlyEncapRule(DeviceId deviceId, ApplicationId appId) {
+//        PiAction action = PiAction.builder()
+//                .withId(FABRIC_EGRESS_SPGW_GTPU_ONLY)
+//                .build();
+//        // Default entry, no selector.
+//        return DefaultFlowRule.builder()
+//                .forDevice(deviceId).fromApp(appId).makePermanent()
+//                .forTable(FABRIC_EGRESS_SPGW_GTPU_ENCAP)
+//                .withTreatment(DefaultTrafficTreatment.builder().piTableAction(action).build())
+//                .withPriority(0)
+//                .build();
+//    }
+
+}
diff --git a/pipelines/fabric/impl/src/main/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/FabricUpfTranslatorUtil.java b/pipelines/fabric/impl/src/main/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/FabricUpfTranslatorUtil.java
new file mode 100644
index 0000000..a931f1f
--- /dev/null
+++ b/pipelines/fabric/impl/src/main/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/FabricUpfTranslatorUtil.java
@@ -0,0 +1,142 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.pipelines.fabric.impl.behaviour.upf;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.onlab.packet.Ip4Address;
+import org.onlab.packet.Ip4Prefix;
+import org.onlab.util.ImmutableByteSequence;
+import org.onosproject.net.behaviour.upf.UpfProgrammableException;
+import org.onosproject.net.flow.FlowRule;
+import org.onosproject.net.flow.criteria.Criterion;
+import org.onosproject.net.flow.criteria.PiCriterion;
+import org.onosproject.net.flow.instructions.Instruction;
+import org.onosproject.net.flow.instructions.PiInstruction;
+import org.onosproject.net.pi.model.PiActionParamId;
+import org.onosproject.net.pi.model.PiMatchFieldId;
+import org.onosproject.net.pi.model.PiMatchType;
+import org.onosproject.net.pi.runtime.PiAction;
+import org.onosproject.net.pi.runtime.PiActionParam;
+import org.onosproject.net.pi.runtime.PiExactFieldMatch;
+import org.onosproject.net.pi.runtime.PiFieldMatch;
+import org.onosproject.net.pi.runtime.PiLpmFieldMatch;
+import org.onosproject.net.pi.runtime.PiRangeFieldMatch;
+import org.onosproject.net.pi.runtime.PiTableAction;
+import org.onosproject.net.pi.runtime.PiTernaryFieldMatch;
+
+import java.util.Optional;
+
+/**
+ * Utility class for manipulation of FlowRules and PiTableEntry objects specific to fabric-tna.
+ */
+final class FabricUpfTranslatorUtil {
+
+    private FabricUpfTranslatorUtil() {
+    }
+
+    static ImmutableByteSequence getFieldValue(PiFieldMatch field, PiMatchFieldId fieldId)
+            throws UpfProgrammableException {
+        if (field == null) {
+            throw new UpfProgrammableException(
+                    String.format("Unable to find field %s where expected!", fieldId.toString()));
+        }
+        if (field.type() == PiMatchType.EXACT) {
+            return ((PiExactFieldMatch) field).value();
+        } else if (field.type() == PiMatchType.LPM) {
+            return ((PiLpmFieldMatch) field).value();
+        } else if (field.type() == PiMatchType.TERNARY) {
+            return ((PiTernaryFieldMatch) field).value();
+        } else if (field.type() == PiMatchType.RANGE) {
+            return ((PiRangeFieldMatch) field).lowValue();
+        } else {
+            throw new UpfProgrammableException(
+                    String.format("Field %s has unknown match type: %s", fieldId.toString(), field.type().toString()));
+        }
+    }
+
+    static ImmutableByteSequence getFieldValue(PiCriterion criterion, PiMatchFieldId fieldId)
+            throws UpfProgrammableException {
+        return getFieldValue(criterion.fieldMatch(fieldId).orElse(null), fieldId);
+    }
+
+    static boolean fieldIsPresent(PiCriterion criterion, PiMatchFieldId fieldId) {
+        return criterion.fieldMatch(fieldId).isPresent();
+    }
+
+    static ImmutableByteSequence getParamValue(PiAction action, PiActionParamId paramId)
+            throws UpfProgrammableException {
+
+        for (PiActionParam param : action.parameters()) {
+            if (param.id().equals(paramId)) {
+                return param.value();
+            }
+        }
+        throw new UpfProgrammableException(
+                String.format("Unable to find parameter %s where expected!", paramId.toString()));
+    }
+
+    static int getFieldInt(PiCriterion criterion, PiMatchFieldId fieldId)
+            throws UpfProgrammableException {
+        return byteSeqToInt(getFieldValue(criterion, fieldId));
+    }
+
+    static int getParamInt(PiAction action, PiActionParamId paramId)
+            throws UpfProgrammableException {
+        return byteSeqToInt(getParamValue(action, paramId));
+    }
+
+    static Ip4Address getParamAddress(PiAction action, PiActionParamId paramId)
+            throws UpfProgrammableException {
+        return Ip4Address.valueOf(getParamValue(action, paramId).asArray());
+    }
+
+    static Ip4Prefix getFieldPrefix(PiCriterion criterion, PiMatchFieldId fieldId) {
+        Optional<PiFieldMatch> optField = criterion.fieldMatch(fieldId);
+        if (optField.isEmpty()) {
+            return null;
+        }
+        PiLpmFieldMatch field = (PiLpmFieldMatch) optField.get();
+        Ip4Address address = Ip4Address.valueOf(field.value().asArray());
+        return Ip4Prefix.valueOf(address, field.prefixLength());
+    }
+
+    static Ip4Address getFieldAddress(PiCriterion criterion, PiMatchFieldId fieldId)
+            throws UpfProgrammableException {
+        return Ip4Address.valueOf(getFieldValue(criterion, fieldId).asArray());
+    }
+
+    static int byteSeqToInt(ImmutableByteSequence sequence) {
+        try {
+            return sequence.fit(32).asReadOnlyBuffer().getInt();
+        } catch (ImmutableByteSequence.ByteSequenceTrimException e) {
+            throw new IllegalArgumentException("Attempted to convert a >4 byte wide sequence to an integer!");
+        }
+    }
+
+    static Pair<PiCriterion, PiTableAction> fabricEntryToPiPair(FlowRule entry) {
+        PiCriterion match = (PiCriterion) entry.selector().getCriterion(Criterion.Type.PROTOCOL_INDEPENDENT);
+        PiTableAction action = null;
+        for (Instruction instruction : entry.treatment().allInstructions()) {
+            if (instruction.type() == Instruction.Type.PROTOCOL_INDEPENDENT) {
+                PiInstruction piInstruction = (PiInstruction) instruction;
+                action = piInstruction.action();
+                break;
+            }
+        }
+        return Pair.of(match, action);
+    }
+}
diff --git a/pipelines/fabric/impl/src/main/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/UpfRuleIdentifier.java b/pipelines/fabric/impl/src/main/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/UpfRuleIdentifier.java
new file mode 100644
index 0000000..5b6c6b6
--- /dev/null
+++ b/pipelines/fabric/impl/src/main/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/UpfRuleIdentifier.java
@@ -0,0 +1,98 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.pipelines.fabric.impl.behaviour.upf;
+
+import org.onlab.util.ImmutableByteSequence;
+
+import java.util.Objects;
+
+/**
+ * Wrapper for identifying information of FARs and PDRs.
+ */
+public final class UpfRuleIdentifier {
+    private final int sessionlocalId;
+    private final ImmutableByteSequence pfcpSessionId;
+
+    /**
+     * A PDR or FAR can be globally uniquely identified by the combination of the ID of the PFCP session that
+     * produced it, and the ID that the rule was assigned in that PFCP session.
+     *
+     * @param pfcpSessionId  The PFCP session that produced the rule ID
+     * @param sessionlocalId The rule ID
+     */
+    public UpfRuleIdentifier(ImmutableByteSequence pfcpSessionId, int sessionlocalId) {
+        this.pfcpSessionId = pfcpSessionId;
+        this.sessionlocalId = sessionlocalId;
+    }
+
+    /**
+     * Create an instance of this class from the given PFCP session ID and the session-local Rule ID.
+     *
+     * @param pfcpSessionId  PFCP session ID of the rule to identify
+     * @param sessionlocalId session-local Rule ID of the rule to identify
+     * @return a new rule identifier
+     */
+    public static UpfRuleIdentifier of(ImmutableByteSequence pfcpSessionId, int sessionlocalId) {
+        return new UpfRuleIdentifier(pfcpSessionId, sessionlocalId);
+    }
+
+    /**
+     * Get the PFCP session-local rule ID.
+     *
+     * @return session-local rule ID
+     */
+    public int getSessionLocalId() {
+        return sessionlocalId;
+    }
+
+    /**
+     * Get the PFCP session ID.
+     *
+     * @return PFCP session ID
+     */
+    public ImmutableByteSequence getPfcpSessionId() {
+        return pfcpSessionId;
+    }
+
+    @Override
+    public String toString() {
+        return "RuleIdentifier{" +
+                "sessionlocalId=" + sessionlocalId +
+                ", pfcpSessionId=" + pfcpSessionId +
+                '}';
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (obj == this) {
+            return true;
+        }
+        if (obj == null) {
+            return false;
+        }
+        if (getClass() != obj.getClass()) {
+            return false;
+        }
+        UpfRuleIdentifier that = (UpfRuleIdentifier) obj;
+        return (this.sessionlocalId == that.sessionlocalId) && (this.pfcpSessionId.equals(that.pfcpSessionId));
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(this.sessionlocalId, this.pfcpSessionId);
+    }
+}
diff --git a/pipelines/fabric/impl/src/main/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/package-info.java b/pipelines/fabric/impl/src/main/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/package-info.java
new file mode 100644
index 0000000..42591786
--- /dev/null
+++ b/pipelines/fabric/impl/src/main/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * UPF programmable behaviour implementation for fabric-v1model.
+ */
+package org.onosproject.pipelines.fabric.impl.behaviour.upf;
diff --git a/pipelines/fabric/impl/src/test/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/FabricUpfProgrammableTest.java b/pipelines/fabric/impl/src/test/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/FabricUpfProgrammableTest.java
new file mode 100644
index 0000000..50fd8bc
--- /dev/null
+++ b/pipelines/fabric/impl/src/test/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/FabricUpfProgrammableTest.java
@@ -0,0 +1,281 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.pipelines.fabric.impl.behaviour.upf;
+
+import com.google.common.collect.ImmutableList;
+import org.junit.Before;
+import org.junit.Test;
+import org.onlab.junit.TestUtils;
+import org.onlab.util.HexString;
+import org.onosproject.TestApplicationId;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.PortNumber;
+import org.onosproject.net.behaviour.upf.ForwardingActionRule;
+import org.onosproject.net.behaviour.upf.PacketDetectionRule;
+import org.onosproject.net.behaviour.upf.PdrStats;
+import org.onosproject.net.behaviour.upf.UpfInterface;
+import org.onosproject.net.config.NetworkConfigService;
+import org.onosproject.net.config.basics.BasicDeviceConfig;
+import org.onosproject.net.device.DeviceService;
+import org.onosproject.net.driver.DriverData;
+import org.onosproject.net.driver.DriverHandler;
+import org.onosproject.net.flow.DefaultTrafficTreatment;
+import org.onosproject.net.flow.FlowRuleService;
+import org.onosproject.net.flow.TrafficTreatment;
+import org.onosproject.net.packet.PacketService;
+import org.onosproject.net.pi.model.PiCounterModel;
+import org.onosproject.net.pi.model.PiTableModel;
+import org.onosproject.net.pi.service.PiPipeconfService;
+import org.onosproject.net.pi.service.PiTranslationService;
+import org.onosproject.p4runtime.api.P4RuntimeController;
+import org.onosproject.pipelines.fabric.impl.FabricPipeconfLoader;
+import org.onosproject.pipelines.fabric.impl.behaviour.FabricCapabilities;
+
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.ConcurrentMap;
+
+import static junit.framework.TestCase.assertNotNull;
+import static junit.framework.TestCase.assertTrue;
+import static org.easymock.EasyMock.anyString;
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.onosproject.pipelines.fabric.FabricConstants.FABRIC_EGRESS_SPGW_PDR_COUNTER;
+import static org.onosproject.pipelines.fabric.FabricConstants.FABRIC_INGRESS_SPGW_DOWNLINK_PDRS;
+import static org.onosproject.pipelines.fabric.FabricConstants.FABRIC_INGRESS_SPGW_FARS;
+import static org.onosproject.pipelines.fabric.FabricConstants.FABRIC_INGRESS_SPGW_PDR_COUNTER;
+import static org.onosproject.pipelines.fabric.FabricConstants.FABRIC_INGRESS_SPGW_UPLINK_PDRS;
+
+public class FabricUpfProgrammableTest {
+
+    private static final ApplicationId APP_ID =
+            TestApplicationId.create(FabricPipeconfLoader.PIPELINE_APP_NAME);
+
+    private final DistributedFabricUpfStore upfStore = TestDistributedFabricUpfStore.build();
+    private MockPacketService packetService;
+    private FabricUpfProgrammable upfProgrammable;
+
+    // Bytes of a random but valid Ethernet frame.
+    private static final byte[] ETH_FRAME_BYTES = HexString.fromHexString(
+            "00060708090a0001020304058100000a08004500006a000100004011f92ec0a80001c0a8000204d2005" +
+                    "00056a8d5000102030405060708090a0b0c0d0e0f101112131415161718191a1b1c1d1e1f20" +
+                    "2122232425262728292a2b2c2d2e2f303132333435363738393a3b3c3d3e3f4041424344454" +
+                    "64748494a4b4c4d", "");
+    private static final TrafficTreatment TABLE_OUTPUT_TREATMENT = DefaultTrafficTreatment.builder()
+            .setOutput(PortNumber.TABLE)
+            .build();
+
+    private static final List<PiTableModel> TABLE_MODELS = ImmutableList.of(
+            new MockTableModel(FABRIC_INGRESS_SPGW_UPLINK_PDRS,
+                               TestUpfConstants.PHYSICAL_MAX_PDRS / 2),
+            new MockTableModel(FABRIC_INGRESS_SPGW_DOWNLINK_PDRS,
+                               TestUpfConstants.PHYSICAL_MAX_PDRS / 2),
+            new MockTableModel(FABRIC_INGRESS_SPGW_FARS,
+                               TestUpfConstants.PHYSICAL_MAX_FARS)
+    );
+    private static final List<PiCounterModel> COUNTER_MODELS = ImmutableList.of(
+            new MockCounterModel(FABRIC_INGRESS_SPGW_PDR_COUNTER,
+                                 TestUpfConstants.PHYSICAL_COUNTER_SIZE),
+            new MockCounterModel(FABRIC_EGRESS_SPGW_PDR_COUNTER,
+                                 TestUpfConstants.PHYSICAL_COUNTER_SIZE)
+    );
+
+    @Before
+    public void setUp() throws Exception {
+        FabricCapabilities capabilities = createMock(FabricCapabilities.class);
+        expect(capabilities.supportUpf()).andReturn(true).anyTimes();
+        replay(capabilities);
+
+        // Services mock
+        packetService = new MockPacketService();
+        CoreService coreService = createMock(CoreService.class);
+        NetworkConfigService netcfgService = createMock(NetworkConfigService.class);
+        DeviceService deviceService = createMock(DeviceService.class);
+        PiTranslationService piTranslationService = createMock(PiTranslationService.class);
+        expect(coreService.getAppId(anyString())).andReturn(APP_ID).anyTimes();
+        expect(netcfgService.getConfig(TestUpfConstants.DEVICE_ID, BasicDeviceConfig.class))
+                .andReturn(TestUpfUtils.getBasicConfig(TestUpfConstants.DEVICE_ID, "/basic.json"))
+                .anyTimes();
+        replay(coreService, netcfgService);
+
+        // Mock driverData to get the right device ID
+        DriverData driverData = createMock(DriverData.class);
+        expect(driverData.deviceId()).andReturn(TestUpfConstants.DEVICE_ID).anyTimes();
+        replay(driverData);
+
+        // Mock DriverHandler to get all the required mocked services
+        DriverHandler driverHandler = createMock(DriverHandler.class);
+        expect(driverHandler.get(FlowRuleService.class)).andReturn(new MockFlowRuleService()).anyTimes();
+        expect(driverHandler.get(PacketService.class)).andReturn(packetService).anyTimes();
+        expect(driverHandler.get(FabricUpfStore.class)).andReturn(upfStore).anyTimes();
+        expect(driverHandler.get(NetworkConfigService.class)).andReturn(netcfgService).anyTimes();
+        expect(driverHandler.get(CoreService.class)).andReturn(coreService).anyTimes();
+        expect(driverHandler.get(DeviceService.class)).andReturn(deviceService).anyTimes();
+        expect(driverHandler.get(PiTranslationService.class)).andReturn(piTranslationService).anyTimes();
+        expect(driverHandler.get(PiPipeconfService.class))
+                .andReturn(new MockPiPipeconfService(TABLE_MODELS, COUNTER_MODELS))
+                .anyTimes();
+        expect(driverHandler.get(P4RuntimeController.class))
+                .andReturn(new MockP4RuntimeController(TestUpfConstants.DEVICE_ID,
+                                                       TestUpfConstants.COUNTER_PKTS,
+                                                       TestUpfConstants.COUNTER_BYTES,
+                                                       TestUpfConstants.PHYSICAL_COUNTER_SIZE))
+                .anyTimes();
+        expect(driverHandler.data()).andReturn(driverData).anyTimes();
+        replay(driverHandler);
+
+        upfProgrammable = new FabricUpfProgrammable();
+        TestUtils.setField(upfProgrammable, "handler", driverHandler);
+        TestUtils.setField(upfProgrammable, "data", driverData);
+        ConcurrentMap<DeviceId, URI> channelUris = TestUtils.getField(upfProgrammable, "CHANNEL_URIS");
+        channelUris.put(TestUpfConstants.DEVICE_ID, new URI("grpc://localhost:1234?device_id=1"));
+    }
+
+    @Test
+    public void testUplinkPdr() throws Exception {
+        assertTrue(upfProgrammable.getPdrs().isEmpty());
+        PacketDetectionRule expectedPdr = TestUpfConstants.UPLINK_PDR;
+        upfProgrammable.addPdr(expectedPdr);
+        Collection<PacketDetectionRule> installedPdrs = upfProgrammable.getPdrs();
+        assertThat(installedPdrs.size(), equalTo(1));
+        for (var readPdr : installedPdrs) {
+            assertThat(readPdr, equalTo(expectedPdr));
+        }
+        upfProgrammable.removePdr(expectedPdr.withoutActionParams());
+        assertTrue(upfProgrammable.getPdrs().isEmpty());
+    }
+
+    @Test
+    public void testDownlinkPdr() throws Exception {
+        assertTrue(upfProgrammable.getPdrs().isEmpty());
+        PacketDetectionRule expectedPdr = TestUpfConstants.DOWNLINK_PDR;
+        upfProgrammable.addPdr(expectedPdr);
+        Collection<PacketDetectionRule> installedPdrs = upfProgrammable.getPdrs();
+        assertThat(installedPdrs.size(), equalTo(1));
+        for (var readPdr : installedPdrs) {
+            assertThat(readPdr, equalTo(expectedPdr));
+        }
+        upfProgrammable.removePdr(expectedPdr.withoutActionParams());
+        assertTrue(upfProgrammable.getPdrs().isEmpty());
+    }
+
+    @Test
+    public void testUplinkFar() throws Exception {
+        assertTrue(upfProgrammable.getFars().isEmpty());
+        ForwardingActionRule expectedFar = TestUpfConstants.UPLINK_FAR;
+        upfProgrammable.addFar(expectedFar);
+        Collection<ForwardingActionRule> installedFars = upfProgrammable.getFars();
+        assertThat(installedFars.size(), equalTo(1));
+        for (var readFar : installedFars) {
+            assertThat(readFar, equalTo(expectedFar));
+        }
+        upfProgrammable.removeFar(expectedFar.withoutActionParams());
+        assertTrue(upfProgrammable.getFars().isEmpty());
+    }
+
+    @Test
+    public void testDownlinkFar() throws Exception {
+        assertTrue(upfProgrammable.getFars().isEmpty());
+        ForwardingActionRule expectedFar = TestUpfConstants.DOWNLINK_FAR;
+        upfProgrammable.addFar(expectedFar);
+        Collection<ForwardingActionRule> installedFars = upfProgrammable.getFars();
+        assertThat(installedFars.size(), equalTo(1));
+        for (var readFar : installedFars) {
+            assertThat(readFar, equalTo(expectedFar));
+        }
+        upfProgrammable.removeFar(expectedFar.withoutActionParams());
+        assertTrue(upfProgrammable.getFars().isEmpty());
+    }
+
+    @Test
+    public void testUplinkInterface() throws Exception {
+        assertTrue(upfProgrammable.getInterfaces().isEmpty());
+        UpfInterface expectedInterface = TestUpfConstants.UPLINK_INTERFACE;
+        upfProgrammable.addInterface(expectedInterface);
+        Collection<UpfInterface> installedInterfaces = upfProgrammable.getInterfaces();
+        assertThat(installedInterfaces.size(), equalTo(1));
+        for (var readInterface : installedInterfaces) {
+            assertThat(readInterface, equalTo(expectedInterface));
+        }
+        upfProgrammable.removeInterface(expectedInterface);
+        assertTrue(upfProgrammable.getInterfaces().isEmpty());
+    }
+
+    @Test
+    public void testDownlinkInterface() throws Exception {
+        assertTrue(upfProgrammable.getInterfaces().isEmpty());
+        UpfInterface expectedInterface = TestUpfConstants.DOWNLINK_INTERFACE;
+        upfProgrammable.addInterface(expectedInterface);
+        Collection<UpfInterface> installedInterfaces = upfProgrammable.getInterfaces();
+        assertThat(installedInterfaces.size(), equalTo(1));
+        for (var readInterface : installedInterfaces) {
+            assertThat(readInterface, equalTo(expectedInterface));
+        }
+        upfProgrammable.removeInterface(expectedInterface);
+        assertTrue(upfProgrammable.getInterfaces().isEmpty());
+    }
+
+    @Test
+    public void testClearInterfaces() throws Exception {
+        assertTrue(upfProgrammable.getInterfaces().isEmpty());
+        upfProgrammable.addInterface(TestUpfConstants.UPLINK_INTERFACE);
+        upfProgrammable.addInterface(TestUpfConstants.DOWNLINK_INTERFACE);
+        assertThat(upfProgrammable.getInterfaces().size(), equalTo(2));
+        upfProgrammable.clearInterfaces();
+        assertTrue(upfProgrammable.getInterfaces().isEmpty());
+    }
+
+    @Test
+    public void testReadAllCounters() {
+        Collection<PdrStats> allStats = upfProgrammable.readAllCounters(-1);
+        assertThat(allStats.size(), equalTo(TestUpfConstants.PHYSICAL_COUNTER_SIZE));
+        for (PdrStats stat : allStats) {
+            assertThat(stat.getIngressBytes(), equalTo(TestUpfConstants.COUNTER_BYTES));
+            assertThat(stat.getEgressBytes(), equalTo(TestUpfConstants.COUNTER_BYTES));
+            assertThat(stat.getIngressPkts(), equalTo(TestUpfConstants.COUNTER_PKTS));
+            assertThat(stat.getEgressPkts(), equalTo(TestUpfConstants.COUNTER_PKTS));
+        }
+    }
+
+    @Test
+    public void testReadAllCountersLimitedCounters() {
+        Collection<PdrStats> allStats = upfProgrammable.readAllCounters(10);
+        assertThat(allStats.size(), equalTo(10));
+    }
+
+    @Test
+    public void testReadAllCountersPhysicalLimit() {
+        Collection<PdrStats> allStats = upfProgrammable.readAllCounters(1024);
+        assertThat(allStats.size(), equalTo(TestUpfConstants.PHYSICAL_COUNTER_SIZE));
+    }
+
+    @Test
+    public void testSendPacketOut() {
+        upfProgrammable.sendPacketOut(ByteBuffer.wrap(ETH_FRAME_BYTES));
+        var emittedPkt = packetService.emittedPackets.poll();
+        assertNotNull(emittedPkt);
+        assertThat(emittedPkt.data().array(), equalTo(ETH_FRAME_BYTES));
+        assertThat(emittedPkt.treatment(), equalTo(TABLE_OUTPUT_TREATMENT));
+    }
+}
diff --git a/pipelines/fabric/impl/src/test/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/FabricUpfTranslatorTest.java b/pipelines/fabric/impl/src/test/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/FabricUpfTranslatorTest.java
new file mode 100644
index 0000000..e490572
--- /dev/null
+++ b/pipelines/fabric/impl/src/test/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/FabricUpfTranslatorTest.java
@@ -0,0 +1,286 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.pipelines.fabric.impl.behaviour.upf;
+
+import org.junit.Test;
+import org.onosproject.net.behaviour.upf.ForwardingActionRule;
+import org.onosproject.net.behaviour.upf.PacketDetectionRule;
+import org.onosproject.net.behaviour.upf.UpfInterface;
+import org.onosproject.net.behaviour.upf.UpfProgrammableException;
+import org.onosproject.net.flow.FlowRule;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+
+public class FabricUpfTranslatorTest {
+
+    private final FabricUpfTranslator upfTranslator = new FabricUpfTranslator(TestDistributedFabricUpfStore.build());
+
+    @Test
+    public void fabricEntryToUplinkPdrTest() {
+        PacketDetectionRule expectedPdr = TestUpfConstants.UPLINK_PDR;
+        PacketDetectionRule translatedPdr;
+        try {
+            translatedPdr = upfTranslator.fabricEntryToPdr(TestUpfConstants.FABRIC_UPLINK_PDR);
+        } catch (UpfProgrammableException e) {
+            assertThat("Fabric uplink PDR should translate to abstract PDR without error.", false);
+            return;
+        }
+        assertThat("Translated PDR should be uplink.", translatedPdr.matchesEncapped());
+        assertThat(translatedPdr, equalTo(expectedPdr));
+    }
+
+    @Test
+    public void fabricEntryToUplinkPriorityPdrTest() {
+        PacketDetectionRule expectedPdr = TestUpfConstants.UPLINK_PRIORITY_PDR;
+        PacketDetectionRule translatedPdr;
+        try {
+            translatedPdr = upfTranslator.fabricEntryToPdr(TestUpfConstants.FABRIC_UPLINK_PRIORITY_PDR);
+        } catch (UpfProgrammableException e) {
+            assertThat("Fabric uplink PDR should translate to abstract PDR without error.", false);
+            return;
+        }
+        assertThat("Translated PDR should be uplink.", translatedPdr.matchesEncapped());
+        assertThat(translatedPdr, equalTo(expectedPdr));
+    }
+
+    @Test
+    public void fabricEntryToDownlinkPdrTest() {
+        PacketDetectionRule expectedPdr = TestUpfConstants.DOWNLINK_PDR;
+        PacketDetectionRule translatedPdr;
+        try {
+            translatedPdr = upfTranslator.fabricEntryToPdr(TestUpfConstants.FABRIC_DOWNLINK_PDR);
+        } catch (UpfProgrammableException e) {
+            assertThat("Fabric downlink PDR should translate to abstract PDR without error.", false);
+            return;
+        }
+
+        assertThat("Translated PDR should be downlink.", translatedPdr.matchesUnencapped());
+        assertThat(translatedPdr, equalTo(expectedPdr));
+    }
+
+    @Test
+    public void fabricEntryToDownlinkPriorityPdrTest() {
+        PacketDetectionRule expectedPdr = TestUpfConstants.DOWNLINK_PRIORITY_PDR;
+        PacketDetectionRule translatedPdr;
+        try {
+            translatedPdr = upfTranslator.fabricEntryToPdr(TestUpfConstants.FABRIC_DOWNLINK_PRIORITY_PDR);
+        } catch (UpfProgrammableException e) {
+            assertThat("Fabric downlink PDR should translate to abstract PDR without error.", false);
+            return;
+        }
+
+        assertThat("Translated PDR should be downlink.", translatedPdr.matchesUnencapped());
+        assertThat(translatedPdr, equalTo(expectedPdr));
+    }
+
+    @Test
+    public void fabricEntryToUplinkFarTest() {
+        ForwardingActionRule translatedFar;
+        ForwardingActionRule expectedFar = TestUpfConstants.UPLINK_FAR;
+        try {
+            translatedFar = upfTranslator.fabricEntryToFar(TestUpfConstants.FABRIC_UPLINK_FAR);
+        } catch (UpfProgrammableException e) {
+            assertThat("Fabric uplink FAR should correctly translate to abstract FAR without error",
+                       false);
+            return;
+        }
+        assertThat("Translated FAR should be uplink.", translatedFar.forwards());
+        assertThat(translatedFar, equalTo(expectedFar));
+    }
+
+    @Test
+    public void fabricEntryToDownlinkFarTest() {
+        ForwardingActionRule translatedFar;
+        ForwardingActionRule expectedFar = TestUpfConstants.DOWNLINK_FAR;
+        try {
+            translatedFar = upfTranslator.fabricEntryToFar(TestUpfConstants.FABRIC_DOWNLINK_FAR);
+        } catch (UpfProgrammableException e) {
+            assertThat("Fabric downlink FAR should correctly translate to abstract FAR without error",
+                       false);
+            return;
+        }
+        assertThat("Translated FAR should be downlink.", translatedFar.encaps());
+        assertThat(translatedFar, equalTo(expectedFar));
+    }
+
+    @Test
+    public void fabricEntryToUplinkInterfaceTest() {
+        UpfInterface translatedInterface;
+        UpfInterface expectedInterface = TestUpfConstants.UPLINK_INTERFACE;
+        try {
+            translatedInterface = upfTranslator.fabricEntryToInterface(TestUpfConstants.FABRIC_UPLINK_INTERFACE);
+        } catch (UpfProgrammableException e) {
+            assertThat("Fabric uplink interface should correctly translate to abstract interface without error",
+                       false);
+            return;
+        }
+        assertThat("Translated interface should be uplink.", translatedInterface.isAccess());
+        assertThat(translatedInterface, equalTo(expectedInterface));
+    }
+
+    @Test
+    public void fabricEntryToDownlinkInterfaceTest() {
+        UpfInterface translatedInterface;
+        UpfInterface expectedInterface = TestUpfConstants.DOWNLINK_INTERFACE;
+        try {
+            translatedInterface = upfTranslator.fabricEntryToInterface(TestUpfConstants.FABRIC_DOWNLINK_INTERFACE);
+        } catch (UpfProgrammableException e) {
+            assertThat("Fabric downlink interface should correctly translate to abstract interface without error",
+                       false);
+            return;
+        }
+        assertThat("Translated interface should be downlink.", translatedInterface.isCore());
+        assertThat(translatedInterface, equalTo(expectedInterface));
+    }
+
+    @Test
+    public void uplinkInterfaceToFabricEntryTest() {
+        FlowRule translatedRule;
+        FlowRule expectedRule = TestUpfConstants.FABRIC_UPLINK_INTERFACE;
+        try {
+            translatedRule = upfTranslator.interfaceToFabricEntry(TestUpfConstants.UPLINK_INTERFACE,
+                                                                  TestUpfConstants.DEVICE_ID,
+                                                                  TestUpfConstants.APP_ID,
+                                                                  TestUpfConstants.DEFAULT_PRIORITY);
+        } catch (UpfProgrammableException e) {
+            assertThat("Abstract uplink interface should correctly translate to Fabric interface without error",
+                       false);
+            return;
+        }
+        assertThat(translatedRule, equalTo(expectedRule));
+    }
+
+    @Test
+    public void downlinkInterfaceToFabricEntryTest() {
+        FlowRule translatedRule;
+        FlowRule expectedRule = TestUpfConstants.FABRIC_DOWNLINK_INTERFACE;
+        try {
+            translatedRule = upfTranslator.interfaceToFabricEntry(TestUpfConstants.DOWNLINK_INTERFACE,
+                                                                  TestUpfConstants.DEVICE_ID,
+                                                                  TestUpfConstants.APP_ID,
+                                                                  TestUpfConstants.DEFAULT_PRIORITY);
+        } catch (UpfProgrammableException e) {
+            assertThat("Abstract downlink interface should correctly translate to Fabric interface without error",
+                       false);
+            return;
+        }
+        assertThat(translatedRule, equalTo(expectedRule));
+    }
+
+    @Test
+    public void downlinkPdrToFabricEntryTest() {
+        FlowRule translatedRule;
+        FlowRule expectedRule = TestUpfConstants.FABRIC_DOWNLINK_PDR;
+        try {
+            translatedRule = upfTranslator.pdrToFabricEntry(TestUpfConstants.DOWNLINK_PDR,
+                                                            TestUpfConstants.DEVICE_ID,
+                                                            TestUpfConstants.APP_ID,
+                                                            TestUpfConstants.DEFAULT_PRIORITY);
+        } catch (UpfProgrammableException e) {
+            assertThat("Abstract downlink PDR should correctly translate to Fabric PDR without error",
+                       false);
+            return;
+        }
+        assertThat(translatedRule, equalTo(expectedRule));
+    }
+
+    @Test
+    public void downlinkPdrToFabricPriorityEntryTest() {
+        FlowRule translatedRule;
+        FlowRule expectedRule = TestUpfConstants.FABRIC_DOWNLINK_PRIORITY_PDR;
+        try {
+            translatedRule = upfTranslator.pdrToFabricEntry(TestUpfConstants.DOWNLINK_PRIORITY_PDR,
+                                                            TestUpfConstants.DEVICE_ID,
+                                                            TestUpfConstants.APP_ID,
+                                                            TestUpfConstants.DEFAULT_PRIORITY);
+        } catch (UpfProgrammableException e) {
+            assertThat("Abstract downlink PDR should correctly translate to Fabric PDR without error",
+                       false);
+            return;
+        }
+        assertThat(translatedRule, equalTo(expectedRule));
+    }
+
+    @Test
+    public void uplinkFarToFabricEntryTest() {
+        FlowRule translatedRule;
+        FlowRule expectedRule = TestUpfConstants.FABRIC_UPLINK_FAR;
+        try {
+            translatedRule = upfTranslator.farToFabricEntry(TestUpfConstants.UPLINK_FAR,
+                                                            TestUpfConstants.DEVICE_ID,
+                                                            TestUpfConstants.APP_ID,
+                                                            TestUpfConstants.DEFAULT_PRIORITY);
+        } catch (UpfProgrammableException e) {
+            assertThat("Abstract uplink FAR should correctly translate to Fabric FAR without error",
+                       false);
+            return;
+        }
+        assertThat(translatedRule, equalTo(expectedRule));
+    }
+
+    @Test
+    public void uplinkPdrToFabricEntryTest() {
+        FlowRule translatedRule;
+        FlowRule expectedRule = TestUpfConstants.FABRIC_UPLINK_PDR;
+        try {
+            translatedRule = upfTranslator.pdrToFabricEntry(TestUpfConstants.UPLINK_PDR,
+                                                            TestUpfConstants.DEVICE_ID,
+                                                            TestUpfConstants.APP_ID,
+                                                            TestUpfConstants.DEFAULT_PRIORITY);
+        } catch (UpfProgrammableException e) {
+            assertThat("Abstract uplink PDR should correctly translate to Fabric PDR without error",
+                       false);
+            return;
+        }
+        assertThat(translatedRule, equalTo(expectedRule));
+    }
+
+    @Test
+    public void uplinkPriorityPdrToFabricEntryTest() {
+        FlowRule translatedRule;
+        FlowRule expectedRule = TestUpfConstants.FABRIC_UPLINK_PRIORITY_PDR;
+        try {
+            translatedRule = upfTranslator.pdrToFabricEntry(TestUpfConstants.UPLINK_PRIORITY_PDR,
+                                                            TestUpfConstants.DEVICE_ID,
+                                                            TestUpfConstants.APP_ID,
+                                                            TestUpfConstants.DEFAULT_PRIORITY);
+        } catch (UpfProgrammableException e) {
+            assertThat("Abstract uplink PDR should correctly translate to Fabric PDR without error",
+                       false);
+            return;
+        }
+        assertThat(translatedRule, equalTo(expectedRule));
+    }
+
+    @Test
+    public void downlinkFarToFabricEntryTest() {
+        FlowRule translatedRule;
+        FlowRule expectedRule = TestUpfConstants.FABRIC_DOWNLINK_FAR;
+        try {
+            translatedRule = upfTranslator.farToFabricEntry(TestUpfConstants.DOWNLINK_FAR,
+                                                            TestUpfConstants.DEVICE_ID,
+                                                            TestUpfConstants.APP_ID,
+                                                            TestUpfConstants.DEFAULT_PRIORITY);
+        } catch (UpfProgrammableException e) {
+            assertThat("Abstract downlink FAR should correctly translate to Fabric FAR without error",
+                       false);
+            return;
+        }
+        assertThat(translatedRule, equalTo(expectedRule));
+    }
+}
diff --git a/pipelines/fabric/impl/src/test/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/MockCounterModel.java b/pipelines/fabric/impl/src/test/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/MockCounterModel.java
new file mode 100644
index 0000000..dcf667b
--- /dev/null
+++ b/pipelines/fabric/impl/src/test/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/MockCounterModel.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.pipelines.fabric.impl.behaviour.upf;
+
+import org.onosproject.net.pi.model.PiCounterId;
+import org.onosproject.net.pi.model.PiCounterModel;
+import org.onosproject.net.pi.model.PiCounterType;
+import org.onosproject.net.pi.model.PiTableId;
+
+public class MockCounterModel implements PiCounterModel {
+    PiCounterId id;
+    int size;
+
+    public MockCounterModel(PiCounterId id, int size) {
+        this.id = id;
+        this.size = size;
+    }
+
+    @Override
+    public PiCounterId id() {
+        return this.id;
+    }
+
+    @Override
+    public PiCounterType counterType() {
+        return null;
+    }
+
+    @Override
+    public Unit unit() {
+        return null;
+    }
+
+    @Override
+    public PiTableId table() {
+        return null;
+    }
+
+    @Override
+    public long size() {
+        return this.size;
+    }
+}
diff --git a/pipelines/fabric/impl/src/test/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/MockFlowRuleService.java b/pipelines/fabric/impl/src/test/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/MockFlowRuleService.java
new file mode 100644
index 0000000..055a4be
--- /dev/null
+++ b/pipelines/fabric/impl/src/test/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/MockFlowRuleService.java
@@ -0,0 +1,117 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.pipelines.fabric.impl.behaviour.upf;
+
+import com.google.common.collect.Sets;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.flow.DefaultFlowEntry;
+import org.onosproject.net.flow.FlowEntry;
+import org.onosproject.net.flow.FlowRule;
+import org.onosproject.net.flow.FlowRuleOperations;
+import org.onosproject.net.flow.FlowRuleServiceAdapter;
+
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+
+public class MockFlowRuleService extends FlowRuleServiceAdapter {
+
+    final Set<FlowRule> flows = Sets.newHashSet();
+    boolean success;
+
+    int errorFlow = -1;
+
+    public void setErrorFlow(int errorFlow) {
+        this.errorFlow = errorFlow;
+    }
+
+    public void setFuture(boolean success) {
+        this.success = success;
+    }
+
+    @Override
+    public void apply(FlowRuleOperations ops) {
+        AtomicBoolean thisSuccess = new AtomicBoolean(success);
+        ops.stages().forEach(stage -> stage.forEach(flow -> {
+            if (errorFlow == flow.rule().id().value()) {
+                thisSuccess.set(false);
+            } else {
+                switch (flow.type()) {
+                    case ADD:
+                    case MODIFY: //TODO is this the right behavior for modify?
+                        flows.add(flow.rule());
+                        break;
+                    case REMOVE:
+                        flows.remove(flow.rule());
+                        break;
+                    default:
+                        break;
+                }
+            }
+        }));
+        if (thisSuccess.get()) {
+            ops.callback().onSuccess(ops);
+        } else {
+            ops.callback().onError(ops);
+        }
+    }
+
+    @Override
+    public int getFlowRuleCount() {
+        return flows.size();
+    }
+
+    @Override
+    public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
+        return flows.stream()
+                .filter(flow -> flow.deviceId().equals(deviceId))
+                .map(DefaultFlowEntry::new)
+                .collect(Collectors.toList());
+    }
+
+    @Override
+    public void applyFlowRules(FlowRule... flowRules) {
+        for (FlowRule flow : flowRules) {
+            flows.add(flow);
+        }
+    }
+
+    @Override
+    public void removeFlowRules(FlowRule... flowRules) {
+        for (FlowRule flow : flowRules) {
+            flows.remove(flow);
+        }
+    }
+
+    @Override
+    public Iterable<FlowRule> getFlowRulesByGroupId(ApplicationId appId, short groupId) {
+        return flows.stream()
+                .filter(flow -> flow.appId() == appId.id() && flow.groupId().id() == groupId)
+                .collect(Collectors.toList());
+    }
+
+    @Override
+    public Iterable<FlowEntry> getFlowEntriesById(ApplicationId id) {
+        return flows.stream()
+                .filter(flow -> flow.appId() == id.id())
+                .map(DefaultFlowEntry::new)
+                .collect(Collectors.toList());
+    }
+}
+
+
diff --git a/pipelines/fabric/impl/src/test/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/MockP4RuntimeController.java b/pipelines/fabric/impl/src/test/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/MockP4RuntimeController.java
new file mode 100644
index 0000000..5fc42d5
--- /dev/null
+++ b/pipelines/fabric/impl/src/test/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/MockP4RuntimeController.java
@@ -0,0 +1,93 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.pipelines.fabric.impl.behaviour.upf;
+
+import io.grpc.ManagedChannel;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.device.DeviceAgentListener;
+import org.onosproject.net.pi.model.PiPipeconf;
+import org.onosproject.net.provider.ProviderId;
+import org.onosproject.p4runtime.api.P4RuntimeClient;
+import org.onosproject.p4runtime.api.P4RuntimeController;
+import org.onosproject.p4runtime.api.P4RuntimeEventListener;
+
+import static org.easymock.EasyMock.anyLong;
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+
+/**
+ * Currently only used to get mock clients that mock counter read requests.
+ */
+public class MockP4RuntimeController implements P4RuntimeController {
+
+    private final P4RuntimeClient mockP4rtClient;
+
+    /**
+     * Used to mock counter read requests.
+     *
+     * @param deviceId The ID of the device
+     * @param packets Packets counter value
+     * @param bytes Bytes counter value
+     * @param counterSize The size of the counter array
+     */
+    public MockP4RuntimeController(DeviceId deviceId, long packets, long bytes, int counterSize) {
+        mockP4rtClient = createMock(P4RuntimeClient.class);
+        expect(mockP4rtClient.read(anyLong(), anyObject(PiPipeconf.class)))
+                .andReturn(new MockReadRequest(deviceId, packets, bytes, counterSize))
+                .anyTimes();
+        replay(mockP4rtClient);
+    }
+
+    @Override
+    public P4RuntimeClient get(DeviceId deviceId) {
+        return mockP4rtClient;
+    }
+
+    @Override
+    public void addListener(P4RuntimeEventListener listener) {
+
+    }
+
+    @Override
+    public void removeListener(P4RuntimeEventListener listener) {
+
+    }
+
+    @Override
+    public boolean create(DeviceId deviceId, ManagedChannel channel) {
+        return false;
+    }
+
+    @Override
+    public void remove(DeviceId deviceId) {
+
+    }
+
+    @Override
+    public void addDeviceAgentListener(DeviceId deviceId, ProviderId providerId,
+                                       DeviceAgentListener listener) {
+
+    }
+
+    @Override
+    public void removeDeviceAgentListener(DeviceId deviceId,
+                                          ProviderId providerId) {
+
+    }
+}
diff --git a/pipelines/fabric/impl/src/test/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/MockPacketService.java b/pipelines/fabric/impl/src/test/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/MockPacketService.java
new file mode 100644
index 0000000..8dc0928
--- /dev/null
+++ b/pipelines/fabric/impl/src/test/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/MockPacketService.java
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.pipelines.fabric.impl.behaviour.upf;
+
+import com.google.common.collect.Queues;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.flow.TrafficSelector;
+import org.onosproject.net.packet.OutboundPacket;
+import org.onosproject.net.packet.PacketPriority;
+import org.onosproject.net.packet.PacketProcessor;
+import org.onosproject.net.packet.PacketProcessorEntry;
+import org.onosproject.net.packet.PacketRequest;
+import org.onosproject.net.packet.PacketService;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.Queue;
+
+public class MockPacketService implements PacketService {
+
+    Queue<OutboundPacket> emittedPackets = Queues.newArrayDeque();
+
+    @Override
+    public void addProcessor(PacketProcessor processor, int priority) {
+
+    }
+
+    @Override
+    public void removeProcessor(PacketProcessor processor) {
+
+    }
+
+    @Override
+    public List<PacketProcessorEntry> getProcessors() {
+        return null;
+    }
+
+    @Override
+    public void requestPackets(TrafficSelector selector, PacketPriority priority, ApplicationId appId) {
+
+    }
+
+    @Override
+    public void requestPackets(TrafficSelector selector, PacketPriority priority,
+                               ApplicationId appId, Optional<DeviceId> deviceId) {
+
+    }
+
+    @Override
+    public void cancelPackets(TrafficSelector selector, PacketPriority priority, ApplicationId appId) {
+
+    }
+
+    @Override
+    public void cancelPackets(TrafficSelector selector, PacketPriority priority,
+                              ApplicationId appId, Optional<DeviceId> deviceId) {
+
+    }
+
+    @Override
+    public List<PacketRequest> getRequests() {
+        return null;
+    }
+
+    @Override
+    public void emit(OutboundPacket packet) {
+        emittedPackets.add(packet);
+    }
+}
+
+
diff --git a/pipelines/fabric/impl/src/test/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/MockPiPipeconfService.java b/pipelines/fabric/impl/src/test/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/MockPiPipeconfService.java
new file mode 100644
index 0000000..20c7839
--- /dev/null
+++ b/pipelines/fabric/impl/src/test/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/MockPiPipeconfService.java
@@ -0,0 +1,96 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.pipelines.fabric.impl.behaviour.upf;
+
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.pi.model.PiCounterModel;
+import org.onosproject.net.pi.model.PiPipeconf;
+import org.onosproject.net.pi.model.PiPipeconfId;
+import org.onosproject.net.pi.model.PiTableModel;
+import org.onosproject.net.pi.service.PiPipeconfListener;
+import org.onosproject.net.pi.service.PiPipeconfService;
+
+import java.util.Collection;
+import java.util.Optional;
+
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+
+public class MockPiPipeconfService implements PiPipeconfService {
+
+    private final PiPipeconf mockPiPipeconf;
+
+    public MockPiPipeconfService(Collection<PiTableModel> tables,
+                                 Collection<PiCounterModel> counters) {
+        mockPiPipeconf = createMock(PiPipeconf.class);
+        expect(mockPiPipeconf.pipelineModel())
+                .andReturn(new MockPiPipelineModel(tables, counters))
+                .anyTimes();
+        replay(mockPiPipeconf);
+    }
+
+    @Override
+    public Optional<PiPipeconf> getPipeconf(PiPipeconfId id) {
+        return Optional.of(mockPiPipeconf);
+    }
+
+    @Override
+    public Optional<PiPipeconf> getPipeconf(DeviceId deviceId) {
+        return Optional.of(mockPiPipeconf);
+    }
+
+    @Override
+    public void register(PiPipeconf pipeconf) throws IllegalStateException {
+
+    }
+
+    @Override
+    public void unregister(PiPipeconfId pipeconfId) throws IllegalStateException {
+
+    }
+
+    @Override
+    public Iterable<PiPipeconf> getPipeconfs() {
+        return null;
+    }
+
+    @Override
+    public void bindToDevice(PiPipeconfId pipeconfId, DeviceId deviceId) {
+
+    }
+
+    @Override
+    public String getMergedDriver(DeviceId deviceId, PiPipeconfId pipeconfId) {
+        return null;
+    }
+
+    @Override
+    public Optional<PiPipeconfId> ofDevice(DeviceId deviceId) {
+        return Optional.empty();
+    }
+
+    @Override
+    public void addListener(PiPipeconfListener listener) {
+
+    }
+
+    @Override
+    public void removeListener(PiPipeconfListener listener) {
+
+    }
+}
diff --git a/pipelines/fabric/impl/src/test/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/MockPiPipelineModel.java b/pipelines/fabric/impl/src/test/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/MockPiPipelineModel.java
new file mode 100644
index 0000000..db438b5
--- /dev/null
+++ b/pipelines/fabric/impl/src/test/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/MockPiPipelineModel.java
@@ -0,0 +1,107 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.pipelines.fabric.impl.behaviour.upf;
+
+import com.google.common.collect.Maps;
+import org.onosproject.net.pi.model.PiActionProfileId;
+import org.onosproject.net.pi.model.PiActionProfileModel;
+import org.onosproject.net.pi.model.PiCounterId;
+import org.onosproject.net.pi.model.PiCounterModel;
+import org.onosproject.net.pi.model.PiMeterId;
+import org.onosproject.net.pi.model.PiMeterModel;
+import org.onosproject.net.pi.model.PiPacketOperationModel;
+import org.onosproject.net.pi.model.PiPacketOperationType;
+import org.onosproject.net.pi.model.PiPipelineModel;
+import org.onosproject.net.pi.model.PiRegisterId;
+import org.onosproject.net.pi.model.PiRegisterModel;
+import org.onosproject.net.pi.model.PiTableId;
+import org.onosproject.net.pi.model.PiTableModel;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+
+public class MockPiPipelineModel implements PiPipelineModel {
+
+    private final Map<PiTableId, PiTableModel> tableMap = Maps.newHashMap();
+
+    private final List<PiCounterModel> counters;
+
+    public MockPiPipelineModel(Collection<PiTableModel> tables, Collection<PiCounterModel> counters) {
+        tables.forEach(tableModel -> tableMap.put(tableModel.id(), tableModel));
+        this.counters = List.copyOf(counters);
+    }
+
+    @Override
+    public Optional<PiTableModel> table(PiTableId tableId) {
+        return Optional.ofNullable(tableMap.getOrDefault(tableId, null));
+    }
+
+    @Override
+    public Collection<PiTableModel> tables() {
+        return tableMap.values();
+    }
+
+    @Override
+    public Optional<PiCounterModel> counter(PiCounterId counterId) {
+        return Optional.empty();
+    }
+
+    @Override
+    public Collection<PiCounterModel> counters() {
+        return counters;
+    }
+
+    @Override
+    public Optional<PiMeterModel> meter(PiMeterId meterId) {
+        return Optional.empty();
+    }
+
+    @Override
+    public Collection<PiMeterModel> meters() {
+        return null;
+    }
+
+    @Override
+    public Optional<PiRegisterModel> register(PiRegisterId registerId) {
+        return Optional.empty();
+    }
+
+    @Override
+    public Collection<PiRegisterModel> registers() {
+        return null;
+    }
+
+    @Override
+    public Optional<PiActionProfileModel> actionProfiles(PiActionProfileId actionProfileId) {
+        return Optional.empty();
+    }
+
+    @Override
+    public Collection<PiActionProfileModel> actionProfiles() {
+        return null;
+    }
+
+    @Override
+    public Optional<PiPacketOperationModel> packetOperationModel(PiPacketOperationType type) {
+        return Optional.empty();
+    }
+
+
+}
diff --git a/pipelines/fabric/impl/src/test/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/MockReadRequest.java b/pipelines/fabric/impl/src/test/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/MockReadRequest.java
new file mode 100644
index 0000000..0f731ea
--- /dev/null
+++ b/pipelines/fabric/impl/src/test/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/MockReadRequest.java
@@ -0,0 +1,170 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.pipelines.fabric.impl.behaviour.upf;
+
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.pi.model.PiActionProfileId;
+import org.onosproject.net.pi.model.PiCounterId;
+import org.onosproject.net.pi.model.PiMeterId;
+import org.onosproject.net.pi.model.PiTableId;
+import org.onosproject.net.pi.runtime.PiCounterCellHandle;
+import org.onosproject.net.pi.runtime.PiCounterCellId;
+import org.onosproject.net.pi.runtime.PiHandle;
+import org.onosproject.p4runtime.api.P4RuntimeReadClient;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.LongStream;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * For faking reads to a p4runtime client. Currently only used for testing
+ * UP4-specific counter reads, because all other P4 entities that UP4 reads can
+ * be read via other ONOS services.
+ */
+public class MockReadRequest implements P4RuntimeReadClient.ReadRequest {
+    List<PiHandle> handles;
+    DeviceId deviceId;
+    long packets;
+    long bytes;
+    int counterSize;
+
+    public MockReadRequest(DeviceId deviceId, long packets, long bytes, int counterSize) {
+        this.handles = new ArrayList<>();
+        this.deviceId = deviceId;
+        this.packets = packets;
+        this.bytes = bytes;
+        this.counterSize = counterSize;
+    }
+
+    @Override
+    public CompletableFuture<P4RuntimeReadClient.ReadResponse> submit() {
+        return CompletableFuture.completedFuture(
+                new MockReadResponse(this.handles, this.packets, this.bytes));
+    }
+
+    @Override
+    public P4RuntimeReadClient.ReadResponse submitSync() {
+        return new MockReadResponse(this.handles, this.packets, this.bytes);
+    }
+
+
+    @Override
+    public P4RuntimeReadClient.ReadRequest handle(PiHandle handle) {
+        this.handles.add(handle);
+        return this;
+    }
+
+    @Override
+    public P4RuntimeReadClient.ReadRequest handles(Iterable<? extends PiHandle> handles) {
+        checkNotNull(handles);
+        handles.forEach(this::handle);
+        return this;
+    }
+
+    @Override
+    public P4RuntimeReadClient.ReadRequest tableEntries(PiTableId tableId) {
+        return this;
+    }
+
+    @Override
+    public P4RuntimeReadClient.ReadRequest tableEntries(Iterable<PiTableId> tableIds) {
+        return this;
+    }
+
+    @Override
+    public P4RuntimeReadClient.ReadRequest defaultTableEntry(PiTableId tableId) {
+        return this;
+    }
+
+    @Override
+    public P4RuntimeReadClient.ReadRequest defaultTableEntry(Iterable<PiTableId> tableIds) {
+        return this;
+    }
+
+    @Override
+    public P4RuntimeReadClient.ReadRequest actionProfileGroups(PiActionProfileId actionProfileId) {
+        return this;
+    }
+
+    @Override
+    public P4RuntimeReadClient.ReadRequest actionProfileGroups(Iterable<PiActionProfileId> actionProfileIds) {
+        return this;
+    }
+
+    @Override
+    public P4RuntimeReadClient.ReadRequest actionProfileMembers(PiActionProfileId actionProfileId) {
+        return this;
+    }
+
+    @Override
+    public P4RuntimeReadClient.ReadRequest actionProfileMembers(Iterable<PiActionProfileId> actionProfileIds) {
+        return this;
+    }
+
+    @Override
+    public P4RuntimeReadClient.ReadRequest counterCells(PiCounterId counterId) {
+        return this;
+    }
+
+    @Override
+    public P4RuntimeReadClient.ReadRequest counterCells(Iterable<PiCounterId> counterIds) {
+        counterIds.forEach(counterId -> {
+            LongStream.range(0, this.counterSize)
+                    .forEach(index -> {
+                        PiCounterCellId cellId =
+                                PiCounterCellId.ofIndirect(counterId, index);
+                        PiCounterCellHandle handle =
+                                PiCounterCellHandle.of(this.deviceId, cellId);
+                        this.handle(handle);
+                    });
+        });
+        return this;
+    }
+
+    @Override
+    public P4RuntimeReadClient.ReadRequest directCounterCells(PiTableId tableId) {
+        return this;
+    }
+
+    @Override
+    public P4RuntimeReadClient.ReadRequest directCounterCells(Iterable<PiTableId> tableIds) {
+        return this;
+    }
+
+    @Override
+    public P4RuntimeReadClient.ReadRequest meterCells(PiMeterId meterId) {
+        return this;
+    }
+
+    @Override
+    public P4RuntimeReadClient.ReadRequest meterCells(Iterable<PiMeterId> meterIds) {
+        return this;
+    }
+
+    @Override
+    public P4RuntimeReadClient.ReadRequest directMeterCells(PiTableId tableId) {
+        return this;
+    }
+
+    @Override
+    public P4RuntimeReadClient.ReadRequest directMeterCells(Iterable<PiTableId> tableIds) {
+        return this;
+    }
+}
diff --git a/pipelines/fabric/impl/src/test/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/MockReadResponse.java b/pipelines/fabric/impl/src/test/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/MockReadResponse.java
new file mode 100644
index 0000000..2dbc11f
--- /dev/null
+++ b/pipelines/fabric/impl/src/test/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/MockReadResponse.java
@@ -0,0 +1,94 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.pipelines.fabric.impl.behaviour.upf;
+
+import org.onosproject.net.pi.runtime.PiCounterCell;
+import org.onosproject.net.pi.runtime.PiCounterCellData;
+import org.onosproject.net.pi.runtime.PiCounterCellHandle;
+import org.onosproject.net.pi.runtime.PiEntity;
+import org.onosproject.net.pi.runtime.PiEntityType;
+import org.onosproject.net.pi.runtime.PiHandle;
+import org.onosproject.p4runtime.api.P4RuntimeReadClient;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * For faking reads to a p4runtime client. Currently only used for testing
+ * UP4-specific counter reads, because all other P4 entities that UP4 reads can
+ * be read via other ONOS services.
+ */
+public class MockReadResponse implements P4RuntimeReadClient.ReadResponse {
+    List<PiEntity> entities;
+    long packets;
+    long bytes;
+
+    public MockReadResponse(Iterable<? extends PiHandle> handles, long packets, long bytes) {
+        this.entities = new ArrayList<>();
+        this.packets = packets;
+        this.bytes = bytes;
+        checkNotNull(handles);
+        handles.forEach(this::handle);
+    }
+
+    @Override
+    public boolean isSuccess() {
+        return true;
+    }
+
+    public MockReadResponse handle(PiHandle handle) {
+        if (handle.entityType().equals(PiEntityType.COUNTER_CELL)) {
+            PiCounterCellHandle counterHandle = (PiCounterCellHandle) handle;
+            PiCounterCellData data =
+                    new PiCounterCellData(this.packets, this.bytes);
+            PiEntity entity = new PiCounterCell(counterHandle.cellId(), data);
+            this.entities.add(entity);
+        }
+        // Only handles counter cell so far
+
+        return this;
+    }
+
+    @Override
+    public Collection<PiEntity> all() {
+        return this.entities;
+    }
+
+    @Override
+    public <E extends PiEntity> Collection<E> all(Class<E> clazz) {
+        List<E> results = new ArrayList<>();
+        this.entities.forEach(ent -> {
+            if (ent.getClass().equals(clazz)) {
+                results.add(clazz.cast(ent));
+            }
+        });
+        return results;
+    }
+
+    @Override
+    public String explanation() {
+        return null;
+    }
+
+    @Override
+    public Throwable throwable() {
+        return null;
+    }
+}
diff --git a/pipelines/fabric/impl/src/test/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/MockTableModel.java b/pipelines/fabric/impl/src/test/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/MockTableModel.java
new file mode 100644
index 0000000..6584c18
--- /dev/null
+++ b/pipelines/fabric/impl/src/test/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/MockTableModel.java
@@ -0,0 +1,106 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.pipelines.fabric.impl.behaviour.upf;
+
+import org.onosproject.net.pi.model.PiActionId;
+import org.onosproject.net.pi.model.PiActionModel;
+import org.onosproject.net.pi.model.PiActionProfileModel;
+import org.onosproject.net.pi.model.PiCounterModel;
+import org.onosproject.net.pi.model.PiMatchFieldId;
+import org.onosproject.net.pi.model.PiMatchFieldModel;
+import org.onosproject.net.pi.model.PiMeterModel;
+import org.onosproject.net.pi.model.PiTableId;
+import org.onosproject.net.pi.model.PiTableModel;
+import org.onosproject.net.pi.model.PiTableType;
+
+import java.util.Collection;
+import java.util.Optional;
+
+public class MockTableModel implements PiTableModel {
+    PiTableId id;
+    int size;
+
+    public MockTableModel(PiTableId id, int size) {
+        this.id = id;
+        this.size = size;
+    }
+
+    @Override
+    public PiTableId id() {
+        return this.id;
+    }
+
+    @Override
+    public PiTableType tableType() {
+        return null;
+    }
+
+    @Override
+    public PiActionProfileModel actionProfile() {
+        return null;
+    }
+
+    @Override
+    public long maxSize() {
+        return size;
+    }
+
+    @Override
+    public Collection<PiCounterModel> counters() {
+        return null;
+    }
+
+    @Override
+    public Collection<PiMeterModel> meters() {
+        return null;
+    }
+
+    @Override
+    public boolean supportsAging() {
+        return false;
+    }
+
+    @Override
+    public Collection<PiMatchFieldModel> matchFields() {
+        return null;
+    }
+
+    @Override
+    public Collection<PiActionModel> actions() {
+        return null;
+    }
+
+    @Override
+    public Optional<PiActionModel> constDefaultAction() {
+        return Optional.empty();
+    }
+
+    @Override
+    public boolean isConstantTable() {
+        return false;
+    }
+
+    @Override
+    public Optional<PiActionModel> action(PiActionId actionId) {
+        return Optional.empty();
+    }
+
+    @Override
+    public Optional<PiMatchFieldModel> matchField(PiMatchFieldId matchFieldId) {
+        return Optional.empty();
+    }
+}
diff --git a/pipelines/fabric/impl/src/test/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/TestDistributedFabricUpfStore.java b/pipelines/fabric/impl/src/test/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/TestDistributedFabricUpfStore.java
new file mode 100644
index 0000000..8500b9a
--- /dev/null
+++ b/pipelines/fabric/impl/src/test/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/TestDistributedFabricUpfStore.java
@@ -0,0 +1,74 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.pipelines.fabric.impl.behaviour.upf;
+
+import org.onlab.packet.Ip4Address;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.TestConsistentMap;
+import org.onosproject.store.service.TestDistributedSet;
+
+import java.util.Set;
+
+import static org.onosproject.pipelines.fabric.impl.behaviour.upf.DistributedFabricUpfStore.BUFFER_FAR_ID_SET_NAME;
+import static org.onosproject.pipelines.fabric.impl.behaviour.upf.DistributedFabricUpfStore.FAR_ID_MAP_NAME;
+import static org.onosproject.pipelines.fabric.impl.behaviour.upf.DistributedFabricUpfStore.FAR_ID_UE_MAP_NAME;
+import static org.onosproject.pipelines.fabric.impl.behaviour.upf.DistributedFabricUpfStore.SERIALIZER;
+
+
+public final class TestDistributedFabricUpfStore {
+
+    private TestDistributedFabricUpfStore() {
+    }
+
+    public static DistributedFabricUpfStore build() {
+        var store = new DistributedFabricUpfStore();
+        TestConsistentMap.Builder<UpfRuleIdentifier, Integer> farIdMapBuilder =
+                TestConsistentMap.builder();
+        farIdMapBuilder.withName(FAR_ID_MAP_NAME)
+                .withRelaxedReadConsistency()
+                .withSerializer(Serializer.using(SERIALIZER.build()));
+        store.farIdMap = farIdMapBuilder.build();
+
+        TestDistributedSet.Builder<UpfRuleIdentifier> bufferFarIdsBuilder =
+                TestDistributedSet.builder();
+        bufferFarIdsBuilder
+                .withName(BUFFER_FAR_ID_SET_NAME)
+                .withRelaxedReadConsistency()
+                .withSerializer(Serializer.using(SERIALIZER.build()));
+        store.bufferFarIds = bufferFarIdsBuilder.build().asDistributedSet();
+
+        TestConsistentMap.Builder<UpfRuleIdentifier, Set<Ip4Address>> farIdToUeAddrsBuilder =
+                TestConsistentMap.builder();
+        farIdToUeAddrsBuilder
+                .withName(FAR_ID_UE_MAP_NAME)
+                .withRelaxedReadConsistency()
+                .withSerializer(Serializer.using(SERIALIZER.build()));
+        store.farIdToUeAddrs = farIdToUeAddrsBuilder.build();
+
+        store.activate();
+
+        // Init with some translation state.
+        store.farIdMap.put(
+                new UpfRuleIdentifier(TestUpfConstants.SESSION_ID, TestUpfConstants.UPLINK_FAR_ID),
+                TestUpfConstants.UPLINK_PHYSICAL_FAR_ID);
+        store.farIdMap.put(
+                new UpfRuleIdentifier(TestUpfConstants.SESSION_ID, TestUpfConstants.DOWNLINK_FAR_ID),
+                TestUpfConstants.DOWNLINK_PHYSICAL_FAR_ID);
+
+        return store;
+    }
+}
diff --git a/pipelines/fabric/impl/src/test/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/TestUpfConstants.java b/pipelines/fabric/impl/src/test/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/TestUpfConstants.java
new file mode 100644
index 0000000..c24f382
--- /dev/null
+++ b/pipelines/fabric/impl/src/test/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/TestUpfConstants.java
@@ -0,0 +1,312 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.pipelines.fabric.impl.behaviour.upf;
+
+import org.onlab.packet.Ip4Address;
+import org.onlab.packet.Ip4Prefix;
+import org.onlab.util.ImmutableByteSequence;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.DefaultApplicationId;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.behaviour.upf.ForwardingActionRule;
+import org.onosproject.net.behaviour.upf.PacketDetectionRule;
+import org.onosproject.net.behaviour.upf.UpfInterface;
+import org.onosproject.net.flow.DefaultFlowRule;
+import org.onosproject.net.flow.DefaultTrafficSelector;
+import org.onosproject.net.flow.DefaultTrafficTreatment;
+import org.onosproject.net.flow.FlowRule;
+import org.onosproject.net.flow.criteria.PiCriterion;
+import org.onosproject.net.pi.runtime.PiAction;
+import org.onosproject.net.pi.runtime.PiActionParam;
+
+import java.util.Arrays;
+
+import static org.onosproject.pipelines.fabric.FabricConstants.CTR_ID;
+import static org.onosproject.pipelines.fabric.FabricConstants.DROP;
+import static org.onosproject.pipelines.fabric.FabricConstants.FABRIC_INGRESS_SPGW_DOWNLINK_PDRS;
+import static org.onosproject.pipelines.fabric.FabricConstants.FABRIC_INGRESS_SPGW_FARS;
+import static org.onosproject.pipelines.fabric.FabricConstants.FABRIC_INGRESS_SPGW_INTERFACES;
+import static org.onosproject.pipelines.fabric.FabricConstants.FABRIC_INGRESS_SPGW_LOAD_IFACE;
+import static org.onosproject.pipelines.fabric.FabricConstants.FABRIC_INGRESS_SPGW_LOAD_NORMAL_FAR;
+import static org.onosproject.pipelines.fabric.FabricConstants.FABRIC_INGRESS_SPGW_LOAD_PDR;
+import static org.onosproject.pipelines.fabric.FabricConstants.FABRIC_INGRESS_SPGW_LOAD_PDR_QOS;
+import static org.onosproject.pipelines.fabric.FabricConstants.FABRIC_INGRESS_SPGW_LOAD_TUNNEL_FAR;
+import static org.onosproject.pipelines.fabric.FabricConstants.FABRIC_INGRESS_SPGW_UPLINK_PDRS;
+import static org.onosproject.pipelines.fabric.FabricConstants.FAR_ID;
+import static org.onosproject.pipelines.fabric.FabricConstants.HDR_FAR_ID;
+import static org.onosproject.pipelines.fabric.FabricConstants.HDR_GTPU_IS_VALID;
+import static org.onosproject.pipelines.fabric.FabricConstants.HDR_IPV4_DST_ADDR;
+import static org.onosproject.pipelines.fabric.FabricConstants.HDR_TEID;
+import static org.onosproject.pipelines.fabric.FabricConstants.HDR_TUNNEL_IPV4_DST;
+import static org.onosproject.pipelines.fabric.FabricConstants.HDR_UE_ADDR;
+import static org.onosproject.pipelines.fabric.FabricConstants.NEEDS_GTPU_DECAP;
+import static org.onosproject.pipelines.fabric.FabricConstants.NOTIFY_CP;
+import static org.onosproject.pipelines.fabric.FabricConstants.QID;
+import static org.onosproject.pipelines.fabric.FabricConstants.SRC_IFACE;
+import static org.onosproject.pipelines.fabric.FabricConstants.TEID;
+import static org.onosproject.pipelines.fabric.FabricConstants.TUNNEL_DST_ADDR;
+import static org.onosproject.pipelines.fabric.FabricConstants.TUNNEL_SRC_ADDR;
+import static org.onosproject.pipelines.fabric.FabricConstants.TUNNEL_SRC_PORT;
+import static org.onosproject.pipelines.fabric.impl.behaviour.upf.FabricUpfTranslator.INTERFACE_ACCESS;
+import static org.onosproject.pipelines.fabric.impl.behaviour.upf.FabricUpfTranslator.INTERFACE_CORE;
+
+
+public final class TestUpfConstants {
+    public static final DeviceId DEVICE_ID = DeviceId.deviceId("CoolSwitch91");
+    public static final ApplicationId APP_ID = new DefaultApplicationId(5000, "up4");
+    public static final int DEFAULT_PRIORITY = 10;
+    // SESSION_ID_BITWIDTH / 8 = 12
+    public static final ImmutableByteSequence SESSION_ID = ImmutableByteSequence.ofOnes(12);
+    public static final int UPLINK_COUNTER_CELL_ID = 1;
+    public static final int DOWNLINK_COUNTER_CELL_ID = 2;
+    public static final int PDR_ID = 0;  // TODO: PDR ID currently not stored on writes, so all reads are 0
+    public static final int UPLINK_FAR_ID = 1;
+    public static final int UPLINK_PHYSICAL_FAR_ID = 4;
+    public static final int DOWNLINK_FAR_ID = 2;
+    public static final int DOWNLINK_PHYSICAL_FAR_ID = 5;
+
+    public static final int UPLINK_PRIORITY = 9;
+    public static final int DOWNLINK_PRIORITY = 1;
+    public static final int UPLINK_QID = 1;
+    public static final int DOWNLINK_QID = 5;
+    public static final int DEFAULT_SCHEDULING_PRIORITY = 0;
+
+    public static final ImmutableByteSequence TEID_VALUE = ImmutableByteSequence.copyFrom(0xff);
+    public static final Ip4Address UE_ADDR = Ip4Address.valueOf("17.0.0.1");
+    public static final Ip4Address S1U_ADDR = Ip4Address.valueOf("192.168.0.1");
+    public static final Ip4Address ENB_ADDR = Ip4Address.valueOf("192.168.0.2");
+    public static final Ip4Prefix UE_POOL = Ip4Prefix.valueOf("17.0.0.0/16");
+    // TODO: tunnel source port currently not stored on writes, so all reads are 0
+    public static final short TUNNEL_SPORT = 2160;
+    public static final int PHYSICAL_COUNTER_SIZE = 512;
+    public static final int PHYSICAL_MAX_PDRS = 512;
+    public static final int PHYSICAL_MAX_FARS = 512;
+
+    public static final long COUNTER_BYTES = 12;
+    public static final long COUNTER_PKTS = 15;
+
+    public static final PacketDetectionRule UPLINK_PDR = PacketDetectionRule.builder()
+            .withTunnelDst(S1U_ADDR)
+            .withTeid(TEID_VALUE)
+            .withLocalFarId(UPLINK_FAR_ID)
+            .withSessionId(SESSION_ID)
+            .withCounterId(UPLINK_COUNTER_CELL_ID)
+            .withSchedulingPriority(DEFAULT_SCHEDULING_PRIORITY)
+            .build();
+
+    public static final PacketDetectionRule DOWNLINK_PDR = PacketDetectionRule.builder()
+            .withUeAddr(UE_ADDR)
+            .withLocalFarId(DOWNLINK_FAR_ID)
+            .withSessionId(SESSION_ID)
+            .withCounterId(DOWNLINK_COUNTER_CELL_ID)
+            .withSchedulingPriority(DEFAULT_SCHEDULING_PRIORITY)
+            .build();
+
+    public static final PacketDetectionRule UPLINK_PRIORITY_PDR = PacketDetectionRule.builder()
+            .withTunnelDst(S1U_ADDR)
+            .withTeid(TEID_VALUE)
+            .withLocalFarId(UPLINK_FAR_ID)
+            .withSessionId(SESSION_ID)
+            .withCounterId(UPLINK_COUNTER_CELL_ID)
+            .withSchedulingPriority(UPLINK_PRIORITY)
+            .build();
+
+    public static final PacketDetectionRule DOWNLINK_PRIORITY_PDR = PacketDetectionRule.builder()
+            .withUeAddr(UE_ADDR)
+            .withLocalFarId(DOWNLINK_FAR_ID)
+            .withSessionId(SESSION_ID)
+            .withCounterId(DOWNLINK_COUNTER_CELL_ID)
+            .withSchedulingPriority(DOWNLINK_PRIORITY)
+            .build();
+
+    public static final ForwardingActionRule UPLINK_FAR = ForwardingActionRule.builder()
+            .setFarId(UPLINK_FAR_ID)
+            .withSessionId(SESSION_ID).build();
+
+    public static final ForwardingActionRule DOWNLINK_FAR = ForwardingActionRule.builder()
+            .setFarId(DOWNLINK_FAR_ID)
+            .withSessionId(SESSION_ID)
+            .setTunnel(S1U_ADDR, ENB_ADDR, TEID_VALUE, TUNNEL_SPORT)
+            .build();
+
+    public static final UpfInterface UPLINK_INTERFACE = UpfInterface.createS1uFrom(S1U_ADDR);
+
+    public static final UpfInterface DOWNLINK_INTERFACE = UpfInterface.createUePoolFrom(UE_POOL);
+
+    public static final FlowRule FABRIC_UPLINK_PRIORITY_PDR = DefaultFlowRule.builder()
+            .forDevice(DEVICE_ID).fromApp(APP_ID).makePermanent()
+            .forTable(FABRIC_INGRESS_SPGW_UPLINK_PDRS)
+            .withSelector(DefaultTrafficSelector.builder()
+                                  .matchPi(PiCriterion.builder()
+                                                   .matchExact(HDR_TEID, TEID_VALUE.asArray())
+                                                   .matchExact(HDR_TUNNEL_IPV4_DST, S1U_ADDR.toInt())
+                                                   .build()).build())
+            .withTreatment(DefaultTrafficTreatment.builder()
+                                   .piTableAction(PiAction.builder()
+                                                          .withId(FABRIC_INGRESS_SPGW_LOAD_PDR_QOS)
+                                                          .withParameters(Arrays.asList(
+                                                                  new PiActionParam(CTR_ID, UPLINK_COUNTER_CELL_ID),
+                                                                  new PiActionParam(FAR_ID, UPLINK_PHYSICAL_FAR_ID),
+                                                                  new PiActionParam(NEEDS_GTPU_DECAP, 1),
+                                                                  new PiActionParam(QID, UPLINK_QID)
+                                                          ))
+                                                          .build()).build())
+            .withPriority(DEFAULT_PRIORITY)
+            .build();
+
+    public static final FlowRule FABRIC_DOWNLINK_PRIORITY_PDR = DefaultFlowRule.builder()
+            .forDevice(DEVICE_ID).fromApp(APP_ID).makePermanent()
+            .forTable(FABRIC_INGRESS_SPGW_DOWNLINK_PDRS)
+            .withSelector(DefaultTrafficSelector.builder()
+                                  .matchPi(PiCriterion.builder()
+                                                   .matchExact(HDR_UE_ADDR, UE_ADDR.toInt())
+                                                   .build()).build())
+            .withTreatment(DefaultTrafficTreatment.builder()
+                                   .piTableAction(PiAction.builder()
+                                                          .withId(FABRIC_INGRESS_SPGW_LOAD_PDR_QOS)
+                                                          .withParameters(Arrays.asList(
+                                                                  new PiActionParam(CTR_ID, DOWNLINK_COUNTER_CELL_ID),
+                                                                  new PiActionParam(FAR_ID, DOWNLINK_PHYSICAL_FAR_ID),
+                                                                  new PiActionParam(NEEDS_GTPU_DECAP, 0),
+                                                                  new PiActionParam(QID, DOWNLINK_QID)
+                                                          ))
+                                                          .build()).build())
+            .withPriority(DEFAULT_PRIORITY)
+            .build();
+
+    public static final FlowRule FABRIC_UPLINK_PDR = DefaultFlowRule.builder()
+            .forDevice(DEVICE_ID).fromApp(APP_ID).makePermanent()
+            .forTable(FABRIC_INGRESS_SPGW_UPLINK_PDRS)
+            .withSelector(DefaultTrafficSelector.builder()
+                                  .matchPi(PiCriterion.builder()
+                                                   .matchExact(HDR_TEID, TEID_VALUE.asArray())
+                                                   .matchExact(HDR_TUNNEL_IPV4_DST, S1U_ADDR.toInt())
+                                                   .build()).build())
+            .withTreatment(DefaultTrafficTreatment.builder()
+                                   .piTableAction(PiAction.builder()
+                                                          .withId(FABRIC_INGRESS_SPGW_LOAD_PDR)
+                                                          .withParameters(Arrays.asList(
+                                                                  new PiActionParam(CTR_ID, UPLINK_COUNTER_CELL_ID),
+                                                                  new PiActionParam(FAR_ID, UPLINK_PHYSICAL_FAR_ID),
+                                                                  new PiActionParam(NEEDS_GTPU_DECAP, 1)
+                                                          ))
+                                                          .build()).build())
+            .withPriority(DEFAULT_PRIORITY)
+            .build();
+
+    public static final FlowRule FABRIC_DOWNLINK_PDR = DefaultFlowRule.builder()
+            .forDevice(DEVICE_ID).fromApp(APP_ID).makePermanent()
+            .forTable(FABRIC_INGRESS_SPGW_DOWNLINK_PDRS)
+            .withSelector(DefaultTrafficSelector.builder()
+                                  .matchPi(PiCriterion.builder()
+                                                   .matchExact(HDR_UE_ADDR, UE_ADDR.toInt())
+                                                   .build()).build())
+            .withTreatment(DefaultTrafficTreatment.builder()
+                                   .piTableAction(PiAction.builder()
+                                                          .withId(FABRIC_INGRESS_SPGW_LOAD_PDR)
+                                                          .withParameters(Arrays.asList(
+                                                                  new PiActionParam(CTR_ID, DOWNLINK_COUNTER_CELL_ID),
+                                                                  new PiActionParam(FAR_ID, DOWNLINK_PHYSICAL_FAR_ID),
+                                                                  new PiActionParam(NEEDS_GTPU_DECAP, 0)
+                                                          ))
+                                                          .build()).build())
+            .withPriority(DEFAULT_PRIORITY)
+            .build();
+
+    public static final FlowRule FABRIC_UPLINK_FAR = DefaultFlowRule.builder()
+            .forDevice(DEVICE_ID).fromApp(APP_ID).makePermanent()
+            .forTable(FABRIC_INGRESS_SPGW_FARS)
+            .withSelector(DefaultTrafficSelector.builder()
+                                  .matchPi(PiCriterion.builder()
+                                                   .matchExact(HDR_FAR_ID, UPLINK_PHYSICAL_FAR_ID)
+                                                   .build()).build())
+            .withTreatment(DefaultTrafficTreatment.builder()
+                                   .piTableAction(PiAction.builder()
+                                                          .withId(FABRIC_INGRESS_SPGW_LOAD_NORMAL_FAR)
+                                                          .withParameters(Arrays.asList(
+                                                                  new PiActionParam(DROP, 0),
+                                                                  new PiActionParam(NOTIFY_CP, 0)
+                                                          ))
+                                                          .build()).build())
+            .withPriority(DEFAULT_PRIORITY)
+            .build();
+
+    public static final FlowRule FABRIC_DOWNLINK_FAR = DefaultFlowRule.builder()
+            .forDevice(DEVICE_ID).fromApp(APP_ID).makePermanent()
+            .forTable(FABRIC_INGRESS_SPGW_FARS)
+            .withSelector(DefaultTrafficSelector.builder()
+                                  .matchPi(PiCriterion.builder()
+                                                   .matchExact(HDR_FAR_ID, DOWNLINK_PHYSICAL_FAR_ID)
+                                                   .build()).build())
+            .withTreatment(DefaultTrafficTreatment.builder()
+                                   .piTableAction(PiAction.builder()
+                                                          .withId(FABRIC_INGRESS_SPGW_LOAD_TUNNEL_FAR)
+                                                          .withParameters(Arrays.asList(
+                                                                  new PiActionParam(DROP, 0),
+                                                                  new PiActionParam(NOTIFY_CP, 0),
+                                                                  new PiActionParam(TEID, TEID_VALUE),
+                                                                  new PiActionParam(TUNNEL_SRC_ADDR, S1U_ADDR.toInt()),
+                                                                  new PiActionParam(TUNNEL_DST_ADDR, ENB_ADDR.toInt()),
+                                                                  new PiActionParam(TUNNEL_SRC_PORT, TUNNEL_SPORT)
+                                                          ))
+                                                          .build()).build())
+            .withPriority(DEFAULT_PRIORITY)
+            .build();
+
+    public static final FlowRule FABRIC_UPLINK_INTERFACE = DefaultFlowRule.builder()
+            .forDevice(DEVICE_ID).fromApp(APP_ID).makePermanent()
+            .forTable(FABRIC_INGRESS_SPGW_INTERFACES)
+            .withSelector(DefaultTrafficSelector.builder()
+                                  .matchPi(PiCriterion.builder()
+                                                   .matchLpm(HDR_IPV4_DST_ADDR,
+                                                             S1U_ADDR.toInt(),
+                                                             32)
+                                                   .matchExact(HDR_GTPU_IS_VALID, 1)
+                                                   .build()).build())
+            .withTreatment(DefaultTrafficTreatment.builder()
+                                   .piTableAction(
+                                           PiAction.builder()
+                                                   .withId(FABRIC_INGRESS_SPGW_LOAD_IFACE)
+                                                   .withParameter(new PiActionParam(SRC_IFACE, INTERFACE_ACCESS))
+                                                   .build()).build())
+            .withPriority(DEFAULT_PRIORITY)
+            .build();
+
+    public static final FlowRule FABRIC_DOWNLINK_INTERFACE = DefaultFlowRule.builder()
+            .forDevice(DEVICE_ID).fromApp(APP_ID).makePermanent()
+            .forTable(FABRIC_INGRESS_SPGW_INTERFACES)
+            .withSelector(DefaultTrafficSelector.builder()
+                                  .matchPi(PiCriterion.builder()
+                                                   .matchLpm(HDR_IPV4_DST_ADDR,
+                                                             UE_POOL.address().toInt(),
+                                                             UE_POOL.prefixLength())
+                                                   .matchExact(HDR_GTPU_IS_VALID, 0)
+                                                   .build()).build())
+            .withTreatment(DefaultTrafficTreatment.builder()
+                                   .piTableAction(PiAction.builder()
+                                                          .withId(FABRIC_INGRESS_SPGW_LOAD_IFACE)
+                                                          .withParameter(new PiActionParam(SRC_IFACE, INTERFACE_CORE))
+                                                          .build()).build())
+            .withPriority(DEFAULT_PRIORITY)
+            .build();
+
+    /**
+     * Hidden constructor for utility class.
+     */
+    private TestUpfConstants() {
+    }
+}
diff --git a/pipelines/fabric/impl/src/test/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/TestUpfUtils.java b/pipelines/fabric/impl/src/test/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/TestUpfUtils.java
new file mode 100644
index 0000000..6280f8e
--- /dev/null
+++ b/pipelines/fabric/impl/src/test/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/TestUpfUtils.java
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.pipelines.fabric.impl.behaviour.upf;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.config.basics.BasicDeviceConfig;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+public final class TestUpfUtils {
+
+    private static final String BASIC_CONFIG_KEY = "basic";
+
+    private TestUpfUtils() {
+        // hide constructor
+    }
+
+    public static BasicDeviceConfig getBasicConfig(DeviceId deviceId, String fileName)
+            throws IOException {
+        BasicDeviceConfig basicCfg = new BasicDeviceConfig();
+        InputStream jsonStream = TestUpfUtils.class.getResourceAsStream(fileName);
+        ObjectMapper mapper = new ObjectMapper();
+        JsonNode jsonNode = mapper.readTree(jsonStream);
+        basicCfg.init(deviceId, BASIC_CONFIG_KEY, jsonNode, mapper, config -> {
+        });
+        return basicCfg;
+    }
+}
diff --git a/pipelines/fabric/impl/src/test/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/package-info.java b/pipelines/fabric/impl/src/test/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/package-info.java
new file mode 100644
index 0000000..fe4c590
--- /dev/null
+++ b/pipelines/fabric/impl/src/test/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Unit tests for UPF programmable behaviour.
+ */
+package org.onosproject.pipelines.fabric.impl.behaviour.upf;
diff --git a/pipelines/fabric/impl/src/test/resources/basic.json b/pipelines/fabric/impl/src/test/resources/basic.json
new file mode 100644
index 0000000..7f4007f
--- /dev/null
+++ b/pipelines/fabric/impl/src/test/resources/basic.json
@@ -0,0 +1,3 @@
+{
+  "managementAddress" : "grpc://localhost:1234?device_id=1"
+}
\ No newline at end of file