[SDFAB-338] Revisit DistributedFabricUpfStore in fabric.p4
Remove the global far id consistent map and uses consistent hashing
for the allocation of the farId. Stores the reverse lookup (far -> ruleId)
into an EC consistent map and purges its entries on the far removal
Change-Id: Ia401d081c9dcb28329fab4424b1fcd3d8e8ad916
diff --git a/pipelines/fabric/impl/src/main/java/org/onosproject/pipelines/fabric/impl/behaviour/cli/ReadInternalUpfStoreCommand.java b/pipelines/fabric/impl/src/main/java/org/onosproject/pipelines/fabric/impl/behaviour/cli/ReadInternalUpfStoreCommand.java
index af3a297..31dd975 100644
--- a/pipelines/fabric/impl/src/main/java/org/onosproject/pipelines/fabric/impl/behaviour/cli/ReadInternalUpfStoreCommand.java
+++ b/pipelines/fabric/impl/src/main/java/org/onosproject/pipelines/fabric/impl/behaviour/cli/ReadInternalUpfStoreCommand.java
@@ -46,10 +46,10 @@
return;
}
- Map<UpfRuleIdentifier, Integer> farIdMap = upfStore.getFarIdMap();
- print("farIdMap size: " + farIdMap.size());
+ Map<Integer, UpfRuleIdentifier> reverseFarIdMap = upfStore.getReverseFarIdMap();
+ print("reverseFarIdMap size: " + reverseFarIdMap.size());
if (verbose) {
- farIdMap.entrySet().forEach(entry -> print(entry.toString()));
+ reverseFarIdMap.entrySet().forEach(entry -> print(entry.toString()));
}
}
}
\ No newline at end of file
diff --git a/pipelines/fabric/impl/src/main/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/DistributedFabricUpfStore.java b/pipelines/fabric/impl/src/main/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/DistributedFabricUpfStore.java
index 02923c2..21c6054 100644
--- a/pipelines/fabric/impl/src/main/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/DistributedFabricUpfStore.java
+++ b/pipelines/fabric/impl/src/main/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/DistributedFabricUpfStore.java
@@ -18,15 +18,15 @@
import com.google.common.collect.BiMap;
import com.google.common.collect.ImmutableBiMap;
-import com.google.common.collect.Maps;
+import com.google.common.hash.HashCode;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hashing;
import org.onlab.util.ImmutableByteSequence;
import org.onlab.util.KryoNamespace;
import org.onosproject.store.serializers.KryoNamespaces;
-import org.onosproject.store.service.ConsistentMap;
-import org.onosproject.store.service.MapEvent;
-import org.onosproject.store.service.MapEventListener;
-import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.EventuallyConsistentMap;
import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.WallClockTimestamp;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
@@ -36,7 +36,7 @@
import org.slf4j.LoggerFactory;
import java.util.Map;
-import java.util.Objects;
+import java.util.stream.Collectors;
/**
* Distributed implementation of FabricUpfStore.
@@ -67,68 +67,67 @@
.put(9, 1)
.build();
- // Distributed local FAR ID to global FAR ID mapping
- protected ConsistentMap<UpfRuleIdentifier, Integer> farIdMap;
- private MapEventListener<UpfRuleIdentifier, Integer> farIdMapListener;
- // Local, reversed copy of farIdMapper for better reverse lookup performance
- protected Map<Integer, UpfRuleIdentifier> reverseFarIdMap;
- private int nextGlobalFarId = 1;
+ // EC map to remember the mapping far_id -> rule_id this is mostly used during reads,
+ // it can be definitely removed by simplifying the logical pipeline
+ protected EventuallyConsistentMap<Integer, UpfRuleIdentifier> reverseFarIdMap;
@Activate
protected void activate() {
- // Allow unit test to inject farIdMap here.
+ // Allow unit test to inject reverseFarIdMap here.
if (storageService != null) {
- this.farIdMap = storageService.<UpfRuleIdentifier, Integer>consistentMapBuilder()
+ this.reverseFarIdMap = storageService.<Integer, UpfRuleIdentifier>eventuallyConsistentMapBuilder()
.withName(FAR_ID_MAP_NAME)
- .withRelaxedReadConsistency()
- .withSerializer(Serializer.using(SERIALIZER.build()))
+ .withSerializer(SERIALIZER)
+ .withTimestampProvider((k, v) -> new WallClockTimestamp())
.build();
}
- farIdMapListener = new FarIdMapListener();
- farIdMap.addListener(farIdMapListener);
-
- reverseFarIdMap = Maps.newHashMap();
- farIdMap.entrySet().forEach(entry -> reverseFarIdMap.put(entry.getValue().value(), entry.getKey()));
log.info("Started");
}
@Deactivate
protected void deactivate() {
- farIdMap.removeListener(farIdMapListener);
- farIdMap.destroy();
- reverseFarIdMap.clear();
+ reverseFarIdMap.destroy();
log.info("Stopped");
}
@Override
public void reset() {
- farIdMap.clear();
reverseFarIdMap.clear();
- nextGlobalFarId = 0;
}
@Override
- public Map<UpfRuleIdentifier, Integer> getFarIdMap() {
- return Map.copyOf(farIdMap.asJavaMap());
+ public Map<Integer, UpfRuleIdentifier> getReverseFarIdMap() {
+ return reverseFarIdMap.entrySet().stream()
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
@Override
public int globalFarIdOf(UpfRuleIdentifier farIdPair) {
- int globalFarId = farIdMap.compute(farIdPair,
- (k, existingId) -> {
- return Objects.requireNonNullElseGet(existingId, () -> nextGlobalFarId++);
- }).value();
+ int globalFarId = getGlobalFarIdOf(farIdPair);
+ reverseFarIdMap.put(globalFarId, farIdPair);
log.info("{} translated to GlobalFarId={}", farIdPair, globalFarId);
return globalFarId;
}
@Override
+ public int removeGlobalFarId(UpfRuleIdentifier farIdPair) {
+ int globalFarId = getGlobalFarIdOf(farIdPair);
+ reverseFarIdMap.remove(globalFarId);
+ return globalFarId;
+ }
+
+ @Override
public int globalFarIdOf(ImmutableByteSequence pfcpSessionId, int sessionLocalFarId) {
UpfRuleIdentifier farId = new UpfRuleIdentifier(pfcpSessionId, sessionLocalFarId);
return globalFarIdOf(farId);
+ }
+ @Override
+ public int removeGlobalFarId(ImmutableByteSequence pfcpSessionId, int sessionLocalFarId) {
+ UpfRuleIdentifier farId = new UpfRuleIdentifier(pfcpSessionId, sessionLocalFarId);
+ return removeGlobalFarId(farId);
}
@Override
@@ -146,25 +145,14 @@
return reverseFarIdMap.get(globalFarId);
}
- // NOTE: FarIdMapListener is run on the same thread intentionally in order to ensure that
- // reverseFarIdMap update always finishes right after farIdMap is updated
- private class FarIdMapListener implements MapEventListener<UpfRuleIdentifier, Integer> {
- @Override
- public void event(MapEvent<UpfRuleIdentifier, Integer> event) {
- switch (event.type()) {
- case INSERT:
- reverseFarIdMap.put(event.newValue().value(), event.key());
- break;
- case UPDATE:
- reverseFarIdMap.remove(event.oldValue().value());
- reverseFarIdMap.put(event.newValue().value(), event.key());
- break;
- case REMOVE:
- reverseFarIdMap.remove(event.oldValue().value());
- break;
- default:
- break;
- }
- }
+ // Compute global far id by hashing the pfcp session id and the session local far
+ private int getGlobalFarIdOf(UpfRuleIdentifier farIdPair) {
+ HashFunction hashFunction = Hashing.murmur3_32();
+ HashCode hashCode = hashFunction.newHasher()
+ .putInt(farIdPair.getSessionLocalId())
+ .putBytes(farIdPair.getPfcpSessionId().asArray())
+ .hash();
+ return hashCode.asInt();
}
+
}
diff --git a/pipelines/fabric/impl/src/main/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/FabricUpfProgrammable.java b/pipelines/fabric/impl/src/main/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/FabricUpfProgrammable.java
index 3735238..401b5db 100644
--- a/pipelines/fabric/impl/src/main/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/FabricUpfProgrammable.java
+++ b/pipelines/fabric/impl/src/main/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/FabricUpfProgrammable.java
@@ -554,7 +554,7 @@
log.info("Removing {}", far.toString());
PiCriterion match = PiCriterion.builder()
- .matchExact(HDR_FAR_ID, fabricUpfStore.globalFarIdOf(far.sessionId(), far.farId()))
+ .matchExact(HDR_FAR_ID, fabricUpfStore.removeGlobalFarId(far.sessionId(), far.farId()))
.build();
removeEntry(match, FABRIC_INGRESS_SPGW_FARS, false);
diff --git a/pipelines/fabric/impl/src/main/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/FabricUpfStore.java b/pipelines/fabric/impl/src/main/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/FabricUpfStore.java
index 9b5d853..ae3b0e9 100644
--- a/pipelines/fabric/impl/src/main/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/FabricUpfStore.java
+++ b/pipelines/fabric/impl/src/main/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/FabricUpfStore.java
@@ -30,11 +30,11 @@
void reset();
/**
- * Returns the farIdMap.
+ * Returns the reverseFarIdMap.
*
* @return the farIdMap.
*/
- Map<UpfRuleIdentifier, Integer> getFarIdMap();
+ Map<Integer, UpfRuleIdentifier> getReverseFarIdMap();
/**
* Get a globally unique integer identifier for the FAR identified by the given (Session ID, Far
@@ -46,6 +46,14 @@
int globalFarIdOf(UpfRuleIdentifier farIdPair);
/**
+ * Remove the global far id from the system.
+ *
+ * @param farIdPair a RuleIdentifier instance uniquely identifying the FAR
+ * @return A globally unique integer identifier
+ */
+ int removeGlobalFarId(UpfRuleIdentifier farIdPair);
+
+ /**
* Get a globally unique integer identifier for the FAR identified by the given (Session ID, Far
* ID) pair.
*
@@ -56,6 +64,15 @@
int globalFarIdOf(ImmutableByteSequence pfcpSessionId, int sessionLocalFarId);
/**
+ * Remove the global far id from the system.
+ *
+ * @param pfcpSessionId The ID of the PFCP session that produced the FAR ID.
+ * @param sessionLocalFarId The FAR ID.
+ * @return A globally unique integer identifier
+ */
+ int removeGlobalFarId(ImmutableByteSequence pfcpSessionId, int sessionLocalFarId);
+
+ /**
* Get the corresponding PFCP session ID and session-local FAR ID from a globally unique FAR ID,
* or return null if no such mapping is found.
*
diff --git a/pipelines/fabric/impl/src/main/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/FabricUpfTranslator.java b/pipelines/fabric/impl/src/main/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/FabricUpfTranslator.java
index c8cce84..2341e04 100644
--- a/pipelines/fabric/impl/src/main/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/FabricUpfTranslator.java
+++ b/pipelines/fabric/impl/src/main/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/FabricUpfTranslator.java
@@ -136,6 +136,9 @@
// Grab keys and parameters that are present for all PDRs
int globalFarId = FabricUpfTranslatorUtil.getParamInt(action, FAR_ID);
UpfRuleIdentifier farId = fabricUpfStore.localFarIdOf(globalFarId);
+ if (farId == null) {
+ throw new UpfProgrammableException(String.format("Unable to find local far id of %s", globalFarId));
+ }
PiActionId actionId = action.id();
if (actionId.equals(FABRIC_INGRESS_SPGW_LOAD_PDR)) {
@@ -186,6 +189,9 @@
int globalFarId = FabricUpfTranslatorUtil.getFieldInt(match, HDR_FAR_ID);
UpfRuleIdentifier farId = fabricUpfStore.localFarIdOf(globalFarId);
+ if (farId == null) {
+ throw new UpfProgrammableException(String.format("Unable to find local far id of %s", globalFarId));
+ }
boolean dropFlag = FabricUpfTranslatorUtil.getParamInt(action, DROP) > 0;
boolean notifyFlag = FabricUpfTranslatorUtil.getParamInt(action, NOTIFY_CP) > 0;
diff --git a/pipelines/fabric/impl/src/test/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/TestDistributedFabricUpfStore.java b/pipelines/fabric/impl/src/test/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/TestDistributedFabricUpfStore.java
index cf09c02..3bf3b26 100644
--- a/pipelines/fabric/impl/src/test/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/TestDistributedFabricUpfStore.java
+++ b/pipelines/fabric/impl/src/test/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/TestDistributedFabricUpfStore.java
@@ -16,8 +16,8 @@
package org.onosproject.pipelines.fabric.impl.behaviour.upf;
-import org.onosproject.store.service.Serializer;
-import org.onosproject.store.service.TestConsistentMap;
+import org.onosproject.store.service.TestEventuallyConsistentMap;
+import org.onosproject.store.service.WallClockTimestamp;
import static org.onosproject.pipelines.fabric.impl.behaviour.upf.DistributedFabricUpfStore.FAR_ID_MAP_NAME;
import static org.onosproject.pipelines.fabric.impl.behaviour.upf.DistributedFabricUpfStore.SERIALIZER;
@@ -30,22 +30,20 @@
public static DistributedFabricUpfStore build() {
var store = new DistributedFabricUpfStore();
- TestConsistentMap.Builder<UpfRuleIdentifier, Integer> farIdMapBuilder =
- TestConsistentMap.builder();
- farIdMapBuilder.withName(FAR_ID_MAP_NAME)
- .withRelaxedReadConsistency()
- .withSerializer(Serializer.using(SERIALIZER.build()));
- store.farIdMap = farIdMapBuilder.build();
+ TestEventuallyConsistentMap.Builder<Integer, UpfRuleIdentifier> reverseFarIdMapBuilder =
+ TestEventuallyConsistentMap.builder();
+ reverseFarIdMapBuilder.withName(FAR_ID_MAP_NAME)
+ .withTimestampProvider((k, v) -> new WallClockTimestamp())
+ .withSerializer(SERIALIZER.build());
+ store.reverseFarIdMap = reverseFarIdMapBuilder.build();
store.activate();
// Init with some translation state.
- store.farIdMap.put(
- new UpfRuleIdentifier(TestUpfConstants.SESSION_ID, TestUpfConstants.UPLINK_FAR_ID),
- TestUpfConstants.UPLINK_PHYSICAL_FAR_ID);
- store.farIdMap.put(
- new UpfRuleIdentifier(TestUpfConstants.SESSION_ID, TestUpfConstants.DOWNLINK_FAR_ID),
- TestUpfConstants.DOWNLINK_PHYSICAL_FAR_ID);
+ store.reverseFarIdMap.put(TestUpfConstants.UPLINK_PHYSICAL_FAR_ID,
+ new UpfRuleIdentifier(TestUpfConstants.SESSION_ID, TestUpfConstants.UPLINK_FAR_ID));
+ store.reverseFarIdMap.put(TestUpfConstants.DOWNLINK_PHYSICAL_FAR_ID,
+ new UpfRuleIdentifier(TestUpfConstants.SESSION_ID, TestUpfConstants.DOWNLINK_FAR_ID));
return store;
}
diff --git a/pipelines/fabric/impl/src/test/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/TestUpfConstants.java b/pipelines/fabric/impl/src/test/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/TestUpfConstants.java
index c24f382..6fee244 100644
--- a/pipelines/fabric/impl/src/test/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/TestUpfConstants.java
+++ b/pipelines/fabric/impl/src/test/java/org/onosproject/pipelines/fabric/impl/behaviour/upf/TestUpfConstants.java
@@ -16,6 +16,7 @@
package org.onosproject.pipelines.fabric.impl.behaviour.upf;
+import com.google.common.hash.Hashing;
import org.onlab.packet.Ip4Address;
import org.onlab.packet.Ip4Prefix;
import org.onlab.util.ImmutableByteSequence;
@@ -75,9 +76,19 @@
public static final int DOWNLINK_COUNTER_CELL_ID = 2;
public static final int PDR_ID = 0; // TODO: PDR ID currently not stored on writes, so all reads are 0
public static final int UPLINK_FAR_ID = 1;
- public static final int UPLINK_PHYSICAL_FAR_ID = 4;
+ public static final int UPLINK_PHYSICAL_FAR_ID = Hashing.murmur3_32()
+ .newHasher()
+ .putInt(UPLINK_FAR_ID)
+ .putBytes(SESSION_ID.asArray())
+ .hash()
+ .asInt();
public static final int DOWNLINK_FAR_ID = 2;
- public static final int DOWNLINK_PHYSICAL_FAR_ID = 5;
+ public static final int DOWNLINK_PHYSICAL_FAR_ID = Hashing.murmur3_32()
+ .newHasher()
+ .putInt(DOWNLINK_FAR_ID)
+ .putBytes(SESSION_ID.asArray())
+ .hash()
+ .asInt();
public static final int UPLINK_PRIORITY = 9;
public static final int DOWNLINK_PRIORITY = 1;