blob: 1f40f3b59c28340a6dbff8c31be57e9e185c3fe3 [file] [log] [blame]
Daniele Moro8d630f12021-06-15 20:53:22 +02001/*
2 * Copyright 2021-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.pipelines.fabric.impl.behaviour.upf;
18
19import com.google.common.collect.BiMap;
20import com.google.common.collect.ImmutableBiMap;
21import com.google.common.collect.Maps;
22import org.onlab.packet.Ip4Address;
23import org.onlab.util.ImmutableByteSequence;
24import org.onlab.util.KryoNamespace;
25import org.onosproject.net.behaviour.upf.PacketDetectionRule;
26import org.onosproject.store.serializers.KryoNamespaces;
27import org.onosproject.store.service.ConsistentMap;
28import org.onosproject.store.service.DistributedSet;
29import org.onosproject.store.service.MapEvent;
30import org.onosproject.store.service.MapEventListener;
31import org.onosproject.store.service.Serializer;
32import org.onosproject.store.service.StorageService;
33import org.osgi.service.component.annotations.Activate;
34import org.osgi.service.component.annotations.Component;
35import org.osgi.service.component.annotations.Deactivate;
36import org.osgi.service.component.annotations.Reference;
37import org.osgi.service.component.annotations.ReferenceCardinality;
38import org.slf4j.Logger;
39import org.slf4j.LoggerFactory;
40
41import java.util.HashSet;
42import java.util.Map;
43import java.util.Objects;
44import java.util.Set;
45
46import static com.google.common.base.Preconditions.checkNotNull;
47
48/**
49 * Distributed implementation of FabricUpfStore.
50 */
51// FIXME: this store is generic and not tied to a single device, should we have a store based on deviceId?
52@Component(immediate = true, service = FabricUpfStore.class)
53public final class DistributedFabricUpfStore implements FabricUpfStore {
54
55 private final Logger log = LoggerFactory.getLogger(getClass());
56
57 @Reference(cardinality = ReferenceCardinality.MANDATORY)
58 protected StorageService storageService;
59
60 protected static final String FAR_ID_MAP_NAME = "fabric-upf-far-id";
61 protected static final String BUFFER_FAR_ID_SET_NAME = "fabric-upf-buffer-far-id";
62 protected static final String FAR_ID_UE_MAP_NAME = "fabric-upf-far-id-ue";
63 protected static final KryoNamespace.Builder SERIALIZER = KryoNamespace.newBuilder()
64 .register(KryoNamespaces.API)
65 .register(UpfRuleIdentifier.class);
66
67 // TODO: check queue IDs for BMv2, is priority inverted?
68 // Mapping between scheduling priority ranges with BMv2 priority queues
69 private static final BiMap<Integer, Integer> SCHEDULING_PRIORITY_MAP
70 = new ImmutableBiMap.Builder<Integer, Integer>()
71 // Highest scheduling priority for 3GPP is 1 and highest BMv2 queue priority is 7
72 .put(1, 5)
73 .put(6, 4)
74 .put(7, 3)
75 .put(8, 2)
76 .put(9, 1)
77 .build();
78
79 // Distributed local FAR ID to global FAR ID mapping
80 protected ConsistentMap<UpfRuleIdentifier, Integer> farIdMap;
81 private MapEventListener<UpfRuleIdentifier, Integer> farIdMapListener;
82 // Local, reversed copy of farIdMapper for better reverse lookup performance
83 protected Map<Integer, UpfRuleIdentifier> reverseFarIdMap;
84 private int nextGlobalFarId = 1;
85
86 protected DistributedSet<UpfRuleIdentifier> bufferFarIds;
87 protected ConsistentMap<UpfRuleIdentifier, Set<Ip4Address>> farIdToUeAddrs;
88
89 @Activate
90 protected void activate() {
91 // Allow unit test to inject farIdMap here.
92 if (storageService != null) {
93 this.farIdMap = storageService.<UpfRuleIdentifier, Integer>consistentMapBuilder()
94 .withName(FAR_ID_MAP_NAME)
95 .withRelaxedReadConsistency()
96 .withSerializer(Serializer.using(SERIALIZER.build()))
97 .build();
98 this.bufferFarIds = storageService.<UpfRuleIdentifier>setBuilder()
99 .withName(BUFFER_FAR_ID_SET_NAME)
100 .withRelaxedReadConsistency()
101 .withSerializer(Serializer.using(SERIALIZER.build()))
102 .build().asDistributedSet();
103 this.farIdToUeAddrs = storageService.<UpfRuleIdentifier, Set<Ip4Address>>consistentMapBuilder()
104 .withName(FAR_ID_UE_MAP_NAME)
105 .withRelaxedReadConsistency()
106 .withSerializer(Serializer.using(SERIALIZER.build()))
107 .build();
108
109 }
110 farIdMapListener = new FarIdMapListener();
111 farIdMap.addListener(farIdMapListener);
112
113 reverseFarIdMap = Maps.newHashMap();
114 farIdMap.entrySet().forEach(entry -> reverseFarIdMap.put(entry.getValue().value(), entry.getKey()));
115
116 log.info("Started");
117 }
118
119 @Deactivate
120 protected void deactivate() {
121 farIdMap.removeListener(farIdMapListener);
122 farIdMap.destroy();
123 reverseFarIdMap.clear();
124
125 log.info("Stopped");
126 }
127
128 @Override
129 public void reset() {
130 farIdMap.clear();
131 reverseFarIdMap.clear();
132 bufferFarIds.clear();
133 farIdToUeAddrs.clear();
134 nextGlobalFarId = 0;
135 }
136
137 @Override
138 public Map<UpfRuleIdentifier, Integer> getFarIdMap() {
139 return Map.copyOf(farIdMap.asJavaMap());
140 }
141
142 @Override
143 public int globalFarIdOf(UpfRuleIdentifier farIdPair) {
144 int globalFarId = farIdMap.compute(farIdPair,
145 (k, existingId) -> {
146 return Objects.requireNonNullElseGet(existingId, () -> nextGlobalFarId++);
147 }).value();
148 log.info("{} translated to GlobalFarId={}", farIdPair, globalFarId);
149 return globalFarId;
150 }
151
152 @Override
153 public int globalFarIdOf(ImmutableByteSequence pfcpSessionId, int sessionLocalFarId) {
154 UpfRuleIdentifier farId = new UpfRuleIdentifier(pfcpSessionId, sessionLocalFarId);
155 return globalFarIdOf(farId);
156
157 }
158
159 @Override
160 public String queueIdOf(int schedulingPriority) {
161 return (SCHEDULING_PRIORITY_MAP.get(schedulingPriority)).toString();
162 }
163
164 @Override
165 public String schedulingPriorityOf(int queueId) {
166 return (SCHEDULING_PRIORITY_MAP.inverse().get(queueId)).toString();
167 }
168
169 @Override
170 public UpfRuleIdentifier localFarIdOf(int globalFarId) {
171 return reverseFarIdMap.get(globalFarId);
172 }
173
174 public void learnFarIdToUeAddrs(PacketDetectionRule pdr) {
175 UpfRuleIdentifier ruleId = UpfRuleIdentifier.of(pdr.sessionId(), pdr.farId());
176 farIdToUeAddrs.compute(ruleId, (k, set) -> {
177 if (set == null) {
178 set = new HashSet<>();
179 }
180 set.add(pdr.ueAddress());
181 return set;
182 });
183 }
184
185 @Override
186 public boolean isFarIdBuffering(UpfRuleIdentifier farId) {
187 checkNotNull(farId);
188 return bufferFarIds.contains(farId);
189 }
190
191 @Override
192 public void learBufferingFarId(UpfRuleIdentifier farId) {
193 checkNotNull(farId);
194 bufferFarIds.add(farId);
195 }
196
197 @Override
198 public void forgetBufferingFarId(UpfRuleIdentifier farId) {
199 checkNotNull(farId);
200 bufferFarIds.remove(farId);
201 }
202
203 @Override
204 public void forgetUeAddr(Ip4Address ueAddr) {
205 farIdToUeAddrs.keySet().forEach(
206 farId -> farIdToUeAddrs.computeIfPresent(farId, (farIdz, ueAddrs) -> {
207 ueAddrs.remove(ueAddr);
208 return ueAddrs;
209 }));
210 }
211
212 @Override
213 public Set<Ip4Address> ueAddrsOfFarId(UpfRuleIdentifier farId) {
214 return farIdToUeAddrs.getOrDefault(farId, Set.of()).value();
215 }
216
217 @Override
218 public Set<UpfRuleIdentifier> getBufferFarIds() {
219 return Set.copyOf(bufferFarIds);
220 }
221
222 @Override
223 public Map<UpfRuleIdentifier, Set<Ip4Address>> getFarIdToUeAddrs() {
224 return Map.copyOf(farIdToUeAddrs.asJavaMap());
225 }
226
227 // NOTE: FarIdMapListener is run on the same thread intentionally in order to ensure that
228 // reverseFarIdMap update always finishes right after farIdMap is updated
229 private class FarIdMapListener implements MapEventListener<UpfRuleIdentifier, Integer> {
230 @Override
231 public void event(MapEvent<UpfRuleIdentifier, Integer> event) {
232 switch (event.type()) {
233 case INSERT:
234 reverseFarIdMap.put(event.newValue().value(), event.key());
235 break;
236 case UPDATE:
237 reverseFarIdMap.remove(event.oldValue().value());
238 reverseFarIdMap.put(event.newValue().value(), event.key());
239 break;
240 case REMOVE:
241 reverseFarIdMap.remove(event.oldValue().value());
242 break;
243 default:
244 break;
245 }
246 }
247 }
248}