[SDFAB-338] Revisit DistributedFabricUpfStore in fabric.p4

Remove the global far id consistent map and uses consistent hashing
for the allocation of the farId. Stores the reverse lookup (far -> ruleId)
into an EC consistent map and purges its entries on the far removal

Change-Id: Ia401d081c9dcb28329fab4424b1fcd3d8e8ad916
diff --git a/pipelines/fabric/impl/src/main/java/org/onosproject/pipelines/fabric/impl/behaviour/cli/ReadInternalUpfStoreCommand.java b/pipelines/fabric/impl/src/main/java/org/onosproject/pipelines/fabric/impl/behaviour/cli/ReadInternalUpfStoreCommand.java
index af3a297..31dd975 100644
--- a/pipelines/fabric/impl/src/main/java/org/onosproject/pipelines/fabric/impl/behaviour/cli/ReadInternalUpfStoreCommand.java
+++ b/pipelines/fabric/impl/src/main/java/org/onosproject/pipelines/fabric/impl/behaviour/cli/ReadInternalUpfStoreCommand.java
@@ -46,10 +46,10 @@
             return;
         }
 
-        Map<UpfRuleIdentifier, Integer> farIdMap = upfStore.getFarIdMap();
-        print("farIdMap size: " + farIdMap.size());
+        Map<Integer, UpfRuleIdentifier> reverseFarIdMap = upfStore.getReverseFarIdMap();
+        print("reverseFarIdMap size: " + reverseFarIdMap.size());
         if (verbose) {
-            farIdMap.entrySet().forEach(entry -> print(entry.toString()));
+            reverseFarIdMap.entrySet().forEach(entry -> print(entry.toString()));
         }
     }
 }
\ No newline at end of file
diff --git a/pipelines/fabric/impl/src/main/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/DistributedFabricUpfStore.java b/pipelines/fabric/impl/src/main/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/DistributedFabricUpfStore.java
index 02923c2..21c6054 100644
--- a/pipelines/fabric/impl/src/main/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/DistributedFabricUpfStore.java
+++ b/pipelines/fabric/impl/src/main/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/DistributedFabricUpfStore.java
@@ -18,15 +18,15 @@
 
 import com.google.common.collect.BiMap;
 import com.google.common.collect.ImmutableBiMap;
-import com.google.common.collect.Maps;
+import com.google.common.hash.HashCode;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hashing;
 import org.onlab.util.ImmutableByteSequence;
 import org.onlab.util.KryoNamespace;
 import org.onosproject.store.serializers.KryoNamespaces;
-import org.onosproject.store.service.ConsistentMap;
-import org.onosproject.store.service.MapEvent;
-import org.onosproject.store.service.MapEventListener;
-import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.EventuallyConsistentMap;
 import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.WallClockTimestamp;
 import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
 import org.osgi.service.component.annotations.Deactivate;
@@ -36,7 +36,7 @@
 import org.slf4j.LoggerFactory;
 
 import java.util.Map;
-import java.util.Objects;
+import java.util.stream.Collectors;
 
 /**
  * Distributed implementation of FabricUpfStore.
@@ -67,68 +67,67 @@
             .put(9, 1)
             .build();
 
-    // Distributed local FAR ID to global FAR ID mapping
-    protected ConsistentMap<UpfRuleIdentifier, Integer> farIdMap;
-    private MapEventListener<UpfRuleIdentifier, Integer> farIdMapListener;
-    // Local, reversed copy of farIdMapper for better reverse lookup performance
-    protected Map<Integer, UpfRuleIdentifier> reverseFarIdMap;
-    private int nextGlobalFarId = 1;
+    // EC map to remember the mapping far_id -> rule_id this is mostly used during reads,
+    // it can be definitely removed by simplifying the logical pipeline
+    protected EventuallyConsistentMap<Integer, UpfRuleIdentifier> reverseFarIdMap;
 
     @Activate
     protected void activate() {
-        // Allow unit test to inject farIdMap here.
+        // Allow unit test to inject reverseFarIdMap here.
         if (storageService != null) {
-            this.farIdMap = storageService.<UpfRuleIdentifier, Integer>consistentMapBuilder()
+            this.reverseFarIdMap = storageService.<Integer, UpfRuleIdentifier>eventuallyConsistentMapBuilder()
                     .withName(FAR_ID_MAP_NAME)
-                    .withRelaxedReadConsistency()
-                    .withSerializer(Serializer.using(SERIALIZER.build()))
+                    .withSerializer(SERIALIZER)
+                    .withTimestampProvider((k, v) -> new WallClockTimestamp())
                     .build();
         }
-        farIdMapListener = new FarIdMapListener();
-        farIdMap.addListener(farIdMapListener);
-
-        reverseFarIdMap = Maps.newHashMap();
-        farIdMap.entrySet().forEach(entry -> reverseFarIdMap.put(entry.getValue().value(), entry.getKey()));
 
         log.info("Started");
     }
 
     @Deactivate
     protected void deactivate() {
-        farIdMap.removeListener(farIdMapListener);
-        farIdMap.destroy();
-        reverseFarIdMap.clear();
+        reverseFarIdMap.destroy();
 
         log.info("Stopped");
     }
 
     @Override
     public void reset() {
-        farIdMap.clear();
         reverseFarIdMap.clear();
-        nextGlobalFarId = 0;
     }
 
     @Override
-    public Map<UpfRuleIdentifier, Integer> getFarIdMap() {
-        return Map.copyOf(farIdMap.asJavaMap());
+    public Map<Integer, UpfRuleIdentifier> getReverseFarIdMap() {
+        return reverseFarIdMap.entrySet().stream()
+                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
     }
 
     @Override
     public int globalFarIdOf(UpfRuleIdentifier farIdPair) {
-        int globalFarId = farIdMap.compute(farIdPair,
-                (k, existingId) -> {
-                    return Objects.requireNonNullElseGet(existingId, () -> nextGlobalFarId++);
-                }).value();
+        int globalFarId = getGlobalFarIdOf(farIdPair);
+        reverseFarIdMap.put(globalFarId, farIdPair);
         log.info("{} translated to GlobalFarId={}", farIdPair, globalFarId);
         return globalFarId;
     }
 
     @Override
+    public int removeGlobalFarId(UpfRuleIdentifier farIdPair) {
+        int globalFarId = getGlobalFarIdOf(farIdPair);
+        reverseFarIdMap.remove(globalFarId);
+        return globalFarId;
+    }
+
+    @Override
     public int globalFarIdOf(ImmutableByteSequence pfcpSessionId, int sessionLocalFarId) {
         UpfRuleIdentifier farId = new UpfRuleIdentifier(pfcpSessionId, sessionLocalFarId);
         return globalFarIdOf(farId);
+    }
 
+    @Override
+    public int removeGlobalFarId(ImmutableByteSequence pfcpSessionId, int sessionLocalFarId) {
+        UpfRuleIdentifier farId = new UpfRuleIdentifier(pfcpSessionId, sessionLocalFarId);
+        return removeGlobalFarId(farId);
     }
 
     @Override
@@ -146,25 +145,14 @@
         return reverseFarIdMap.get(globalFarId);
     }
 
-    // NOTE: FarIdMapListener is run on the same thread intentionally in order to ensure that
-    //       reverseFarIdMap update always finishes right after farIdMap is updated
-    private class FarIdMapListener implements MapEventListener<UpfRuleIdentifier, Integer> {
-        @Override
-        public void event(MapEvent<UpfRuleIdentifier, Integer> event) {
-            switch (event.type()) {
-                case INSERT:
-                    reverseFarIdMap.put(event.newValue().value(), event.key());
-                    break;
-                case UPDATE:
-                    reverseFarIdMap.remove(event.oldValue().value());
-                    reverseFarIdMap.put(event.newValue().value(), event.key());
-                    break;
-                case REMOVE:
-                    reverseFarIdMap.remove(event.oldValue().value());
-                    break;
-                default:
-                    break;
-            }
-        }
+    // Compute global far id by hashing the pfcp session id and the session local far
+    private int getGlobalFarIdOf(UpfRuleIdentifier farIdPair) {
+        HashFunction hashFunction = Hashing.murmur3_32();
+        HashCode hashCode = hashFunction.newHasher()
+                .putInt(farIdPair.getSessionLocalId())
+                .putBytes(farIdPair.getPfcpSessionId().asArray())
+                .hash();
+        return hashCode.asInt();
     }
+
 }
diff --git a/pipelines/fabric/impl/src/main/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/FabricUpfProgrammable.java b/pipelines/fabric/impl/src/main/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/FabricUpfProgrammable.java
index 3735238..401b5db 100644
--- a/pipelines/fabric/impl/src/main/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/FabricUpfProgrammable.java
+++ b/pipelines/fabric/impl/src/main/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/FabricUpfProgrammable.java
@@ -554,7 +554,7 @@
         log.info("Removing {}", far.toString());
 
         PiCriterion match = PiCriterion.builder()
-                .matchExact(HDR_FAR_ID, fabricUpfStore.globalFarIdOf(far.sessionId(), far.farId()))
+                .matchExact(HDR_FAR_ID, fabricUpfStore.removeGlobalFarId(far.sessionId(), far.farId()))
                 .build();
 
         removeEntry(match, FABRIC_INGRESS_SPGW_FARS, false);
diff --git a/pipelines/fabric/impl/src/main/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/FabricUpfStore.java b/pipelines/fabric/impl/src/main/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/FabricUpfStore.java
index 9b5d853..ae3b0e9 100644
--- a/pipelines/fabric/impl/src/main/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/FabricUpfStore.java
+++ b/pipelines/fabric/impl/src/main/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/FabricUpfStore.java
@@ -30,11 +30,11 @@
     void reset();
 
     /**
-     * Returns the farIdMap.
+     * Returns the reverseFarIdMap.
      *
      * @return the farIdMap.
      */
-    Map<UpfRuleIdentifier, Integer> getFarIdMap();
+    Map<Integer, UpfRuleIdentifier> getReverseFarIdMap();
 
     /**
      * Get a globally unique integer identifier for the FAR identified by the given (Session ID, Far
@@ -46,6 +46,14 @@
     int globalFarIdOf(UpfRuleIdentifier farIdPair);
 
     /**
+     * Remove the global far id from the system.
+     *
+     * @param farIdPair a RuleIdentifier instance uniquely identifying the FAR
+     * @return A globally unique integer identifier
+     */
+    int removeGlobalFarId(UpfRuleIdentifier farIdPair);
+
+    /**
      * Get a globally unique integer identifier for the FAR identified by the given (Session ID, Far
      * ID) pair.
      *
@@ -56,6 +64,15 @@
     int globalFarIdOf(ImmutableByteSequence pfcpSessionId, int sessionLocalFarId);
 
     /**
+     * Remove the global far id from the system.
+     *
+     * @param pfcpSessionId     The ID of the PFCP session that produced the FAR ID.
+     * @param sessionLocalFarId The FAR ID.
+     * @return A globally unique integer identifier
+     */
+    int removeGlobalFarId(ImmutableByteSequence pfcpSessionId, int sessionLocalFarId);
+
+    /**
      * Get the corresponding PFCP session ID and session-local FAR ID from a globally unique FAR ID,
      * or return null if no such mapping is found.
      *
diff --git a/pipelines/fabric/impl/src/main/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/FabricUpfTranslator.java b/pipelines/fabric/impl/src/main/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/FabricUpfTranslator.java
index c8cce84..2341e04 100644
--- a/pipelines/fabric/impl/src/main/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/FabricUpfTranslator.java
+++ b/pipelines/fabric/impl/src/main/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/FabricUpfTranslator.java
@@ -136,6 +136,9 @@
         // Grab keys and parameters that are present for all PDRs
         int globalFarId = FabricUpfTranslatorUtil.getParamInt(action, FAR_ID);
         UpfRuleIdentifier farId = fabricUpfStore.localFarIdOf(globalFarId);
+        if (farId == null) {
+            throw new UpfProgrammableException(String.format("Unable to find local far id of %s", globalFarId));
+        }
 
         PiActionId actionId = action.id();
         if (actionId.equals(FABRIC_INGRESS_SPGW_LOAD_PDR)) {
@@ -186,6 +189,9 @@
 
         int globalFarId = FabricUpfTranslatorUtil.getFieldInt(match, HDR_FAR_ID);
         UpfRuleIdentifier farId = fabricUpfStore.localFarIdOf(globalFarId);
+        if (farId == null) {
+            throw new UpfProgrammableException(String.format("Unable to find local far id of %s", globalFarId));
+        }
 
         boolean dropFlag = FabricUpfTranslatorUtil.getParamInt(action, DROP) > 0;
         boolean notifyFlag = FabricUpfTranslatorUtil.getParamInt(action, NOTIFY_CP) > 0;
diff --git a/pipelines/fabric/impl/src/test/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/TestDistributedFabricUpfStore.java b/pipelines/fabric/impl/src/test/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/TestDistributedFabricUpfStore.java
index cf09c02..3bf3b26 100644
--- a/pipelines/fabric/impl/src/test/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/TestDistributedFabricUpfStore.java
+++ b/pipelines/fabric/impl/src/test/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/TestDistributedFabricUpfStore.java
@@ -16,8 +16,8 @@
 
 package org.onosproject.pipelines.fabric.impl.behaviour.upf;
 
-import org.onosproject.store.service.Serializer;
-import org.onosproject.store.service.TestConsistentMap;
+import org.onosproject.store.service.TestEventuallyConsistentMap;
+import org.onosproject.store.service.WallClockTimestamp;
 
 import static org.onosproject.pipelines.fabric.impl.behaviour.upf.DistributedFabricUpfStore.FAR_ID_MAP_NAME;
 import static org.onosproject.pipelines.fabric.impl.behaviour.upf.DistributedFabricUpfStore.SERIALIZER;
@@ -30,22 +30,20 @@
 
     public static DistributedFabricUpfStore build() {
         var store = new DistributedFabricUpfStore();
-        TestConsistentMap.Builder<UpfRuleIdentifier, Integer> farIdMapBuilder =
-                TestConsistentMap.builder();
-        farIdMapBuilder.withName(FAR_ID_MAP_NAME)
-                .withRelaxedReadConsistency()
-                .withSerializer(Serializer.using(SERIALIZER.build()));
-        store.farIdMap = farIdMapBuilder.build();
+        TestEventuallyConsistentMap.Builder<Integer, UpfRuleIdentifier> reverseFarIdMapBuilder =
+                TestEventuallyConsistentMap.builder();
+        reverseFarIdMapBuilder.withName(FAR_ID_MAP_NAME)
+                .withTimestampProvider((k, v) -> new WallClockTimestamp())
+                .withSerializer(SERIALIZER.build());
+        store.reverseFarIdMap = reverseFarIdMapBuilder.build();
 
         store.activate();
 
         // Init with some translation state.
-        store.farIdMap.put(
-                new UpfRuleIdentifier(TestUpfConstants.SESSION_ID, TestUpfConstants.UPLINK_FAR_ID),
-                TestUpfConstants.UPLINK_PHYSICAL_FAR_ID);
-        store.farIdMap.put(
-                new UpfRuleIdentifier(TestUpfConstants.SESSION_ID, TestUpfConstants.DOWNLINK_FAR_ID),
-                TestUpfConstants.DOWNLINK_PHYSICAL_FAR_ID);
+        store.reverseFarIdMap.put(TestUpfConstants.UPLINK_PHYSICAL_FAR_ID,
+                new UpfRuleIdentifier(TestUpfConstants.SESSION_ID, TestUpfConstants.UPLINK_FAR_ID));
+        store.reverseFarIdMap.put(TestUpfConstants.DOWNLINK_PHYSICAL_FAR_ID,
+                new UpfRuleIdentifier(TestUpfConstants.SESSION_ID, TestUpfConstants.DOWNLINK_FAR_ID));
 
         return store;
     }
diff --git a/pipelines/fabric/impl/src/test/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/TestUpfConstants.java b/pipelines/fabric/impl/src/test/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/TestUpfConstants.java
index c24f382..6fee244 100644
--- a/pipelines/fabric/impl/src/test/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/TestUpfConstants.java
+++ b/pipelines/fabric/impl/src/test/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/TestUpfConstants.java
@@ -16,6 +16,7 @@
 
 package org.onosproject.pipelines.fabric.impl.behaviour.upf;
 
+import com.google.common.hash.Hashing;
 import org.onlab.packet.Ip4Address;
 import org.onlab.packet.Ip4Prefix;
 import org.onlab.util.ImmutableByteSequence;
@@ -75,9 +76,19 @@
     public static final int DOWNLINK_COUNTER_CELL_ID = 2;
     public static final int PDR_ID = 0;  // TODO: PDR ID currently not stored on writes, so all reads are 0
     public static final int UPLINK_FAR_ID = 1;
-    public static final int UPLINK_PHYSICAL_FAR_ID = 4;
+    public static final int UPLINK_PHYSICAL_FAR_ID = Hashing.murmur3_32()
+            .newHasher()
+            .putInt(UPLINK_FAR_ID)
+            .putBytes(SESSION_ID.asArray())
+            .hash()
+            .asInt();
     public static final int DOWNLINK_FAR_ID = 2;
-    public static final int DOWNLINK_PHYSICAL_FAR_ID = 5;
+    public static final int DOWNLINK_PHYSICAL_FAR_ID = Hashing.murmur3_32()
+            .newHasher()
+            .putInt(DOWNLINK_FAR_ID)
+            .putBytes(SESSION_ID.asArray())
+            .hash()
+            .asInt();
 
     public static final int UPLINK_PRIORITY = 9;
     public static final int DOWNLINK_PRIORITY = 1;