blob: 21c60540b46fff8b78a69c5f497f31eb8686eeec [file] [log] [blame]
/*
 * Copyright 2021-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package org.onosproject.pipelines.fabric.impl.behaviour.upf;
18
19import com.google.common.collect.BiMap;
20import com.google.common.collect.ImmutableBiMap;
pierventre1eb98712021-07-13 18:03:22 +020021import com.google.common.hash.HashCode;
22import com.google.common.hash.HashFunction;
23import com.google.common.hash.Hashing;
Daniele Moro8d630f12021-06-15 20:53:22 +020024import org.onlab.util.ImmutableByteSequence;
25import org.onlab.util.KryoNamespace;
Daniele Moro8d630f12021-06-15 20:53:22 +020026import org.onosproject.store.serializers.KryoNamespaces;
pierventre1eb98712021-07-13 18:03:22 +020027import org.onosproject.store.service.EventuallyConsistentMap;
Daniele Moro8d630f12021-06-15 20:53:22 +020028import org.onosproject.store.service.StorageService;
pierventre1eb98712021-07-13 18:03:22 +020029import org.onosproject.store.service.WallClockTimestamp;
Daniele Moro8d630f12021-06-15 20:53:22 +020030import org.osgi.service.component.annotations.Activate;
31import org.osgi.service.component.annotations.Component;
32import org.osgi.service.component.annotations.Deactivate;
33import org.osgi.service.component.annotations.Reference;
34import org.osgi.service.component.annotations.ReferenceCardinality;
35import org.slf4j.Logger;
36import org.slf4j.LoggerFactory;
37
Daniele Moro8d630f12021-06-15 20:53:22 +020038import java.util.Map;
pierventre1eb98712021-07-13 18:03:22 +020039import java.util.stream.Collectors;
Daniele Moro8d630f12021-06-15 20:53:22 +020040
41/**
42 * Distributed implementation of FabricUpfStore.
43 */
44// FIXME: this store is generic and not tied to a single device, should we have a store based on deviceId?
45@Component(immediate = true, service = FabricUpfStore.class)
46public final class DistributedFabricUpfStore implements FabricUpfStore {
47
48 private final Logger log = LoggerFactory.getLogger(getClass());
49
50 @Reference(cardinality = ReferenceCardinality.MANDATORY)
51 protected StorageService storageService;
52
53 protected static final String FAR_ID_MAP_NAME = "fabric-upf-far-id";
Daniele Moro8d630f12021-06-15 20:53:22 +020054 protected static final KryoNamespace.Builder SERIALIZER = KryoNamespace.newBuilder()
55 .register(KryoNamespaces.API)
56 .register(UpfRuleIdentifier.class);
57
58 // TODO: check queue IDs for BMv2, is priority inverted?
59 // Mapping between scheduling priority ranges with BMv2 priority queues
60 private static final BiMap<Integer, Integer> SCHEDULING_PRIORITY_MAP
61 = new ImmutableBiMap.Builder<Integer, Integer>()
62 // Highest scheduling priority for 3GPP is 1 and highest BMv2 queue priority is 7
63 .put(1, 5)
64 .put(6, 4)
65 .put(7, 3)
66 .put(8, 2)
67 .put(9, 1)
68 .build();
69
pierventre1eb98712021-07-13 18:03:22 +020070 // EC map to remember the mapping far_id -> rule_id this is mostly used during reads,
71 // it can be definitely removed by simplifying the logical pipeline
72 protected EventuallyConsistentMap<Integer, UpfRuleIdentifier> reverseFarIdMap;
Daniele Moro8d630f12021-06-15 20:53:22 +020073
Daniele Moro8d630f12021-06-15 20:53:22 +020074 @Activate
75 protected void activate() {
pierventre1eb98712021-07-13 18:03:22 +020076 // Allow unit test to inject reverseFarIdMap here.
Daniele Moro8d630f12021-06-15 20:53:22 +020077 if (storageService != null) {
pierventre1eb98712021-07-13 18:03:22 +020078 this.reverseFarIdMap = storageService.<Integer, UpfRuleIdentifier>eventuallyConsistentMapBuilder()
Daniele Moro8d630f12021-06-15 20:53:22 +020079 .withName(FAR_ID_MAP_NAME)
pierventre1eb98712021-07-13 18:03:22 +020080 .withSerializer(SERIALIZER)
81 .withTimestampProvider((k, v) -> new WallClockTimestamp())
Daniele Moro8d630f12021-06-15 20:53:22 +020082 .build();
Daniele Moro8d630f12021-06-15 20:53:22 +020083 }
Daniele Moro8d630f12021-06-15 20:53:22 +020084
85 log.info("Started");
86 }
87
88 @Deactivate
89 protected void deactivate() {
pierventre1eb98712021-07-13 18:03:22 +020090 reverseFarIdMap.destroy();
Daniele Moro8d630f12021-06-15 20:53:22 +020091
92 log.info("Stopped");
93 }
94
95 @Override
96 public void reset() {
Daniele Moro8d630f12021-06-15 20:53:22 +020097 reverseFarIdMap.clear();
Daniele Moro8d630f12021-06-15 20:53:22 +020098 }
99
100 @Override
pierventre1eb98712021-07-13 18:03:22 +0200101 public Map<Integer, UpfRuleIdentifier> getReverseFarIdMap() {
102 return reverseFarIdMap.entrySet().stream()
103 .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
Daniele Moro8d630f12021-06-15 20:53:22 +0200104 }
105
106 @Override
107 public int globalFarIdOf(UpfRuleIdentifier farIdPair) {
pierventre1eb98712021-07-13 18:03:22 +0200108 int globalFarId = getGlobalFarIdOf(farIdPair);
109 reverseFarIdMap.put(globalFarId, farIdPair);
Daniele Moro8d630f12021-06-15 20:53:22 +0200110 log.info("{} translated to GlobalFarId={}", farIdPair, globalFarId);
111 return globalFarId;
112 }
113
114 @Override
pierventre1eb98712021-07-13 18:03:22 +0200115 public int removeGlobalFarId(UpfRuleIdentifier farIdPair) {
116 int globalFarId = getGlobalFarIdOf(farIdPair);
117 reverseFarIdMap.remove(globalFarId);
118 return globalFarId;
119 }
120
121 @Override
Daniele Moro8d630f12021-06-15 20:53:22 +0200122 public int globalFarIdOf(ImmutableByteSequence pfcpSessionId, int sessionLocalFarId) {
123 UpfRuleIdentifier farId = new UpfRuleIdentifier(pfcpSessionId, sessionLocalFarId);
124 return globalFarIdOf(farId);
pierventre1eb98712021-07-13 18:03:22 +0200125 }
Daniele Moro8d630f12021-06-15 20:53:22 +0200126
pierventre1eb98712021-07-13 18:03:22 +0200127 @Override
128 public int removeGlobalFarId(ImmutableByteSequence pfcpSessionId, int sessionLocalFarId) {
129 UpfRuleIdentifier farId = new UpfRuleIdentifier(pfcpSessionId, sessionLocalFarId);
130 return removeGlobalFarId(farId);
Daniele Moro8d630f12021-06-15 20:53:22 +0200131 }
132
133 @Override
134 public String queueIdOf(int schedulingPriority) {
135 return (SCHEDULING_PRIORITY_MAP.get(schedulingPriority)).toString();
136 }
137
138 @Override
139 public String schedulingPriorityOf(int queueId) {
140 return (SCHEDULING_PRIORITY_MAP.inverse().get(queueId)).toString();
141 }
142
143 @Override
144 public UpfRuleIdentifier localFarIdOf(int globalFarId) {
145 return reverseFarIdMap.get(globalFarId);
146 }
147
pierventre1eb98712021-07-13 18:03:22 +0200148 // Compute global far id by hashing the pfcp session id and the session local far
149 private int getGlobalFarIdOf(UpfRuleIdentifier farIdPair) {
150 HashFunction hashFunction = Hashing.murmur3_32();
151 HashCode hashCode = hashFunction.newHasher()
152 .putInt(farIdPair.getSessionLocalId())
153 .putBytes(farIdPair.getPfcpSessionId().asArray())
154 .hash();
155 return hashCode.asInt();
Daniele Moro8d630f12021-06-15 20:53:22 +0200156 }
pierventre1eb98712021-07-13 18:03:22 +0200157
Daniele Moro8d630f12021-06-15 20:53:22 +0200158}