[SDFAB-338] Revisit DistributedFabricUpfStore in fabric.p4
Remove the global FAR ID consistent map and use consistent hashing
for the allocation of the farId. Store the reverse lookup (farId -> ruleId)
in an eventually consistent (EC) map and purge its entries on FAR removal.
Change-Id: Ia401d081c9dcb28329fab4424b1fcd3d8e8ad916
diff --git a/pipelines/fabric/impl/src/test/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/TestDistributedFabricUpfStore.java b/pipelines/fabric/impl/src/test/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/TestDistributedFabricUpfStore.java
index cf09c02..3bf3b26 100644
--- a/pipelines/fabric/impl/src/test/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/TestDistributedFabricUpfStore.java
+++ b/pipelines/fabric/impl/src/test/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/TestDistributedFabricUpfStore.java
@@ -16,8 +16,8 @@
package org.onosproject.pipelines.fabric.impl.behaviour.upf;
-import org.onosproject.store.service.Serializer;
-import org.onosproject.store.service.TestConsistentMap;
+import org.onosproject.store.service.TestEventuallyConsistentMap;
+import org.onosproject.store.service.WallClockTimestamp;
import static org.onosproject.pipelines.fabric.impl.behaviour.upf.DistributedFabricUpfStore.FAR_ID_MAP_NAME;
import static org.onosproject.pipelines.fabric.impl.behaviour.upf.DistributedFabricUpfStore.SERIALIZER;
@@ -30,22 +30,20 @@
public static DistributedFabricUpfStore build() {
var store = new DistributedFabricUpfStore();
- TestConsistentMap.Builder<UpfRuleIdentifier, Integer> farIdMapBuilder =
- TestConsistentMap.builder();
- farIdMapBuilder.withName(FAR_ID_MAP_NAME)
- .withRelaxedReadConsistency()
- .withSerializer(Serializer.using(SERIALIZER.build()));
- store.farIdMap = farIdMapBuilder.build();
+ TestEventuallyConsistentMap.Builder<Integer, UpfRuleIdentifier> reverseFarIdMapBuilder =
+ TestEventuallyConsistentMap.builder();
+ reverseFarIdMapBuilder.withName(FAR_ID_MAP_NAME)
+ .withTimestampProvider((k, v) -> new WallClockTimestamp())
+ .withSerializer(SERIALIZER.build());
+ store.reverseFarIdMap = reverseFarIdMapBuilder.build();
store.activate();
// Init with some translation state.
- store.farIdMap.put(
- new UpfRuleIdentifier(TestUpfConstants.SESSION_ID, TestUpfConstants.UPLINK_FAR_ID),
- TestUpfConstants.UPLINK_PHYSICAL_FAR_ID);
- store.farIdMap.put(
- new UpfRuleIdentifier(TestUpfConstants.SESSION_ID, TestUpfConstants.DOWNLINK_FAR_ID),
- TestUpfConstants.DOWNLINK_PHYSICAL_FAR_ID);
+ store.reverseFarIdMap.put(TestUpfConstants.UPLINK_PHYSICAL_FAR_ID,
+ new UpfRuleIdentifier(TestUpfConstants.SESSION_ID, TestUpfConstants.UPLINK_FAR_ID));
+ store.reverseFarIdMap.put(TestUpfConstants.DOWNLINK_PHYSICAL_FAR_ID,
+ new UpfRuleIdentifier(TestUpfConstants.SESSION_ID, TestUpfConstants.DOWNLINK_FAR_ID));
return store;
}
diff --git a/pipelines/fabric/impl/src/test/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/TestUpfConstants.java b/pipelines/fabric/impl/src/test/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/TestUpfConstants.java
index c24f382..6fee244 100644
--- a/pipelines/fabric/impl/src/test/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/TestUpfConstants.java
+++ b/pipelines/fabric/impl/src/test/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/TestUpfConstants.java
@@ -16,6 +16,7 @@
package org.onosproject.pipelines.fabric.impl.behaviour.upf;
+import com.google.common.hash.Hashing;
import org.onlab.packet.Ip4Address;
import org.onlab.packet.Ip4Prefix;
import org.onlab.util.ImmutableByteSequence;
@@ -75,9 +76,19 @@
public static final int DOWNLINK_COUNTER_CELL_ID = 2;
public static final int PDR_ID = 0; // TODO: PDR ID currently not stored on writes, so all reads are 0
public static final int UPLINK_FAR_ID = 1;
- public static final int UPLINK_PHYSICAL_FAR_ID = 4;
+ public static final int UPLINK_PHYSICAL_FAR_ID = Hashing.murmur3_32()
+ .newHasher()
+ .putInt(UPLINK_FAR_ID)
+ .putBytes(SESSION_ID.asArray())
+ .hash()
+ .asInt();
public static final int DOWNLINK_FAR_ID = 2;
- public static final int DOWNLINK_PHYSICAL_FAR_ID = 5;
+ public static final int DOWNLINK_PHYSICAL_FAR_ID = Hashing.murmur3_32()
+ .newHasher()
+ .putInt(DOWNLINK_FAR_ID)
+ .putBytes(SESSION_ID.asArray())
+ .hash()
+ .asInt();
public static final int UPLINK_PRIORITY = 9;
public static final int DOWNLINK_PRIORITY = 1;