blob: 6bc25222a591c7abd646467a242ca58cba0e87a2 [file] [log] [blame]
pierventre30368ab2021-02-24 23:23:22 +01001/*
2 * Copyright 2021-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.segmentrouting.policy.impl;
17
18import com.google.common.collect.ImmutableSet;
19import com.google.common.collect.Lists;
20import com.google.common.collect.Maps;
21import com.google.common.hash.HashFunction;
22import com.google.common.hash.Hashing;
Wailok Shume2ab3d62021-05-27 04:40:38 +080023
pierventre30368ab2021-02-24 23:23:22 +010024import org.glassfish.jersey.internal.guava.Sets;
pierventre4f68ffa2021-03-09 22:52:14 +010025import org.onlab.packet.MacAddress;
pierventre30368ab2021-02-24 23:23:22 +010026import org.onlab.util.KryoNamespace;
27import org.onlab.util.PredictableExecutor;
28import org.onosproject.cluster.ClusterService;
29import org.onosproject.cluster.NodeId;
Wailok Shumee90c132021-03-11 21:00:11 +080030import org.onosproject.codec.CodecService;
pierventre30368ab2021-02-24 23:23:22 +010031import org.onosproject.core.ApplicationId;
32import org.onosproject.core.CoreService;
pierventre4f68ffa2021-03-09 22:52:14 +010033import org.onosproject.net.ConnectPoint;
Wailok Shume2ab3d62021-05-27 04:40:38 +080034import org.onosproject.net.config.NetworkConfigEvent;
35import org.onosproject.net.config.NetworkConfigListener;
36import org.onosproject.net.config.NetworkConfigRegistry;
37import org.onosproject.net.device.DeviceService;
pierventre30368ab2021-02-24 23:23:22 +010038import org.onosproject.net.DeviceId;
pierventre4f68ffa2021-03-09 22:52:14 +010039import org.onosproject.net.Link;
40import org.onosproject.net.flow.DefaultTrafficSelector;
pierventre30368ab2021-02-24 23:23:22 +010041import org.onosproject.net.flow.DefaultTrafficTreatment;
pierventre4f68ffa2021-03-09 22:52:14 +010042import org.onosproject.net.flow.TrafficSelector;
pierventre30368ab2021-02-24 23:23:22 +010043import org.onosproject.net.flow.TrafficTreatment;
44import org.onosproject.net.flowobjective.DefaultForwardingObjective;
pierventre4f68ffa2021-03-09 22:52:14 +010045import org.onosproject.net.flowobjective.DefaultNextObjective;
pierventre30368ab2021-02-24 23:23:22 +010046import org.onosproject.net.flowobjective.DefaultObjectiveContext;
47import org.onosproject.net.flowobjective.FlowObjectiveService;
48import org.onosproject.net.flowobjective.ForwardingObjective;
pierventre4f68ffa2021-03-09 22:52:14 +010049import org.onosproject.net.flowobjective.NextObjective;
pierventre30368ab2021-02-24 23:23:22 +010050import org.onosproject.net.flowobjective.Objective;
51import org.onosproject.net.flowobjective.ObjectiveContext;
52import org.onosproject.net.intent.WorkPartitionService;
pierventre4f68ffa2021-03-09 22:52:14 +010053import org.onosproject.net.link.LinkService;
pierventre30368ab2021-02-24 23:23:22 +010054import org.onosproject.segmentrouting.policy.api.DropPolicy;
55import org.onosproject.segmentrouting.policy.api.Policy;
pierventre30368ab2021-02-24 23:23:22 +010056import org.onosproject.segmentrouting.policy.api.PolicyData;
57import org.onosproject.segmentrouting.policy.api.PolicyId;
58import org.onosproject.segmentrouting.policy.api.PolicyService;
59import org.onosproject.segmentrouting.policy.api.PolicyState;
pierventre4f68ffa2021-03-09 22:52:14 +010060import org.onosproject.segmentrouting.policy.api.RedirectPolicy;
pierventre30368ab2021-02-24 23:23:22 +010061import org.onosproject.segmentrouting.policy.api.TrafficMatch;
62import org.onosproject.segmentrouting.policy.api.TrafficMatchData;
63import org.onosproject.segmentrouting.policy.api.TrafficMatchId;
Wailok Shum37dd29a2021-04-27 18:13:55 +080064import org.onosproject.segmentrouting.policy.api.TrafficMatchPriority;
pierventre30368ab2021-02-24 23:23:22 +010065import org.onosproject.segmentrouting.policy.api.TrafficMatchState;
pierventreb37a11a2021-03-18 16:50:04 +010066import org.onosproject.segmentrouting.config.DeviceConfigNotFoundException;
Wailok Shume2ab3d62021-05-27 04:40:38 +080067import org.onosproject.segmentrouting.config.SegmentRoutingDeviceConfig;
pierventreb37a11a2021-03-18 16:50:04 +010068import org.onosproject.segmentrouting.policy.api.Policy.PolicyType;
pierventre30368ab2021-02-24 23:23:22 +010069import org.onosproject.store.serializers.KryoNamespaces;
70import org.onosproject.store.service.ConsistentMap;
71import org.onosproject.store.service.MapEvent;
72import org.onosproject.store.service.MapEventListener;
73import org.onosproject.store.service.Serializer;
74import org.onosproject.store.service.StorageException;
75import org.onosproject.store.service.StorageService;
76import org.onosproject.store.service.Versioned;
77import org.osgi.service.component.annotations.Activate;
78import org.osgi.service.component.annotations.Component;
79import org.osgi.service.component.annotations.Deactivate;
80import org.osgi.service.component.annotations.Reference;
81import org.osgi.service.component.annotations.ReferenceCardinality;
82import org.slf4j.Logger;
83
Wailok Shume2ab3d62021-05-27 04:40:38 +080084import java.util.ArrayList;
pierventre30368ab2021-02-24 23:23:22 +010085import java.util.List;
86import java.util.Map;
87import java.util.Optional;
88import java.util.Set;
89import java.util.concurrent.CompletableFuture;
Wailok Shume2ab3d62021-05-27 04:40:38 +080090import java.util.concurrent.ExecutorService;
91import java.util.concurrent.Executors;
pierventre30368ab2021-02-24 23:23:22 +010092import java.util.stream.Collectors;
93
94import static org.onlab.util.Tools.groupedThreads;
95import static org.slf4j.LoggerFactory.getLogger;
96
97/**
98 * Implementation of the policy service interface.
99 */
100@Component(immediate = true, service = PolicyService.class)
101public class PolicyManager implements PolicyService {
102
103 // App related things
104 private static final String APP_NAME = "org.onosproject.segmentrouting.policy";
105 private ApplicationId appId;
106 private Logger log = getLogger(getClass());
107 static final String KEY_SEPARATOR = "|";
108
pierventre4f68ffa2021-03-09 22:52:14 +0100109 // Supported policies
pierventreb37a11a2021-03-18 16:50:04 +0100110 private static final Set<PolicyType> SUPPORTED_POLICIES = ImmutableSet.of(
pierventre4f68ffa2021-03-09 22:52:14 +0100111 PolicyType.DROP, PolicyType.REDIRECT);
112
pierventre12aaa052021-03-22 12:56:03 +0100113 // Driver should use this meta to match ig_port_type field in the ACL table
pierventre4f68ffa2021-03-09 22:52:14 +0100114 private static final long EDGE_PORT = 1;
pierventre4f68ffa2021-03-09 22:52:14 +0100115
pierventre30368ab2021-02-24 23:23:22 +0100116 // Policy/TrafficMatch store related objects. We use these consistent maps to keep track of the
117 // lifecycle of a policy/traffic match. These are decomposed in multiple operations which have
118 // to be performed on multiple devices in order to have a policy/traffic match in ADDED state.
pierventre4f68ffa2021-03-09 22:52:14 +0100119 // TODO Consider to add store and delegate
pierventre30368ab2021-02-24 23:23:22 +0100120 private static final String POLICY_STORE = "sr-policy-store";
121 private ConsistentMap<PolicyId, PolicyRequest> policies;
122 private MapEventListener<PolicyId, PolicyRequest> mapPolListener = new InternalPolMapEventListener();
123 private Map<PolicyId, PolicyRequest> policiesMap;
124
125 private static final String OPS_STORE = "sr-ops-store";
126 private ConsistentMap<String, Operation> operations;
127 private MapEventListener<String, Operation> mapOpsListener = new InternalOpsMapEventListener();
128 private Map<String, Operation> opsMap;
129
130 private static final String TRAFFIC_MATCH_STORE = "sr-tmatch-store";
131 private ConsistentMap<TrafficMatchId, TrafficMatchRequest> trafficMatches;
132 private MapEventListener<TrafficMatchId, TrafficMatchRequest> mapTMatchListener =
133 new InternalTMatchMapEventListener();
134 private Map<TrafficMatchId, TrafficMatchRequest> trafficMatchesMap;
135
136 // Leadership related objects - consistent hashing
137 private static final HashFunction HASH_FN = Hashing.md5();
138 // Read only cache of the Policy leader
139 private Map<PolicyId, NodeId> policyLeaderCache;
140
141 // Worker threads for policy and traffic match related ops
142 private static final int DEFAULT_THREADS = 4;
143 protected PredictableExecutor workers;
144
145 // Serializers and ONOS services
146 private static final KryoNamespace.Builder APP_KRYO_BUILDER = KryoNamespace.newBuilder()
147 .register(KryoNamespaces.API)
148 .register(PolicyId.class)
149 .register(PolicyType.class)
150 .register(DropPolicy.class)
pierventre4f68ffa2021-03-09 22:52:14 +0100151 .register(RedirectPolicy.class)
pierventre30368ab2021-02-24 23:23:22 +0100152 .register(PolicyState.class)
153 .register(PolicyRequest.class)
154 .register(TrafficMatchId.class)
155 .register(TrafficMatchState.class)
Wailok Shum37dd29a2021-04-27 18:13:55 +0800156 .register(TrafficMatchPriority.class)
pierventre30368ab2021-02-24 23:23:22 +0100157 .register(TrafficMatch.class)
158 .register(TrafficMatchRequest.class)
159 .register(Operation.class);
160 private Serializer serializer = Serializer.using(Lists.newArrayList(APP_KRYO_BUILDER.build()));
161
162 @Reference(cardinality = ReferenceCardinality.MANDATORY)
163 private CoreService coreService;
164
165 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Wailok Shumee90c132021-03-11 21:00:11 +0800166 private CodecService codecService;
167
168 @Reference(cardinality = ReferenceCardinality.MANDATORY)
pierventre30368ab2021-02-24 23:23:22 +0100169 private StorageService storageService;
170
171 @Reference(cardinality = ReferenceCardinality.MANDATORY)
172 private ClusterService clusterService;
173
174 @Reference(cardinality = ReferenceCardinality.MANDATORY)
pierventre4f68ffa2021-03-09 22:52:14 +0100175 private WorkPartitionService workPartitionService;
pierventre30368ab2021-02-24 23:23:22 +0100176
Wailok Shumee90c132021-03-11 21:00:11 +0800177 @Reference(cardinality = ReferenceCardinality.MANDATORY)
pierventre4f68ffa2021-03-09 22:52:14 +0100178 private FlowObjectiveService flowObjectiveService;
179
180 @Reference(cardinality = ReferenceCardinality.MANDATORY)
181 private LinkService linkService;
pierventre30368ab2021-02-24 23:23:22 +0100182
Wailok Shume2ab3d62021-05-27 04:40:38 +0800183 @Reference(cardinality = ReferenceCardinality.MANDATORY)
184 private NetworkConfigRegistry cfgService;
185
186 @Reference(cardinality = ReferenceCardinality.MANDATORY)
187 private DeviceService deviceService;
188
189 // Netcfg listener
190 private final InternalConfigListener cfgListener = new InternalConfigListener();
191 // EventExecutor for netcfg event
192 private ExecutorService eventExecutor;
193
pierventre30368ab2021-02-24 23:23:22 +0100194 @Activate
195 public void activate() {
196 appId = coreService.registerApplication(APP_NAME);
197
Wailok Shumee90c132021-03-11 21:00:11 +0800198 codecService.registerCodec(DropPolicy.class, new DropPolicyCodec());
199 codecService.registerCodec(RedirectPolicy.class, new RedirectPolicyCodec());
200 codecService.registerCodec(TrafficMatch.class, new TrafficMatchCodec());
201
Wailok Shume2ab3d62021-05-27 04:40:38 +0800202 cfgService.addListener(cfgListener);
203
pierventre30368ab2021-02-24 23:23:22 +0100204 policies = storageService.<PolicyId, PolicyRequest>consistentMapBuilder()
205 .withName(POLICY_STORE)
206 .withSerializer(serializer).build();
207 policies.addListener(mapPolListener);
208 policiesMap = policies.asJavaMap();
209
210 trafficMatches = storageService.<TrafficMatchId, TrafficMatchRequest>consistentMapBuilder()
211 .withName(TRAFFIC_MATCH_STORE)
212 .withSerializer(serializer).build();
213 trafficMatches.addListener(mapTMatchListener);
214 trafficMatchesMap = trafficMatches.asJavaMap();
215
216 operations = storageService.<String, Operation>consistentMapBuilder()
217 .withName(OPS_STORE)
218 .withSerializer(serializer).build();
219 operations.addListener(mapOpsListener);
220 opsMap = operations.asJavaMap();
221
222 policyLeaderCache = Maps.newConcurrentMap();
223
224 workers = new PredictableExecutor(DEFAULT_THREADS,
225 groupedThreads("sr-policy", "worker-%d", log));
226
Wailok Shume2ab3d62021-05-27 04:40:38 +0800227 eventExecutor = Executors.newSingleThreadExecutor();
228
pierventre30368ab2021-02-24 23:23:22 +0100229 log.info("Started");
230 }
231
232 @Deactivate
233 public void deactivate() {
234 // Teardown everything
Wailok Shumee90c132021-03-11 21:00:11 +0800235 codecService.unregisterCodec(DropPolicy.class);
236 codecService.unregisterCodec(RedirectPolicy.class);
237 codecService.unregisterCodec(TrafficMatch.class);
Wailok Shume2ab3d62021-05-27 04:40:38 +0800238 cfgService.removeListener(cfgListener);
pierventre30368ab2021-02-24 23:23:22 +0100239 policies.removeListener(mapPolListener);
240 policies.destroy();
241 policiesMap.clear();
242 trafficMatches.removeListener(mapTMatchListener);
243 trafficMatches.destroy();
244 trafficMatchesMap.clear();
245 operations.removeListener(mapOpsListener);
246 operations.destroy();
247 operations.clear();
248 workers.shutdown();
Wailok Shume2ab3d62021-05-27 04:40:38 +0800249 eventExecutor.shutdown();
pierventre30368ab2021-02-24 23:23:22 +0100250
251 log.info("Stopped");
252 }
253
254 @Override
255 //FIXME update does not work well
256 public PolicyId addOrUpdatePolicy(Policy policy) {
257 PolicyId policyId = policy.policyId();
258 try {
259 policies.put(policyId, new PolicyRequest(policy));
260 } catch (StorageException e) {
261 log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
262 e.getMessage(), e);
263 policyId = null;
264 }
265 return policyId;
266 }
267
268 @Override
269 public boolean removePolicy(PolicyId policyId) {
270 boolean result;
pierventre4f68ffa2021-03-09 22:52:14 +0100271 if (dependingTrafficMatches(policyId).isPresent()) {
272 if (log.isDebugEnabled()) {
273 log.debug("Found depending traffic matches");
274 }
275 return false;
276 }
pierventre30368ab2021-02-24 23:23:22 +0100277 try {
278 result = Versioned.valueOrNull(policies.computeIfPresent(policyId, (k, v) -> {
279 if (v.policyState() != PolicyState.PENDING_REMOVE) {
280 v.policyState(PolicyState.PENDING_REMOVE);
281 }
282 return v;
283 })) != null;
284 } catch (StorageException e) {
285 log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
286 e.getMessage(), e);
287 result = false;
288 }
289 return result;
290 }
291
292 @Override
293 public Set<PolicyData> policies(Set<PolicyType> filter) {
294 Set<PolicyData> policyData = Sets.newHashSet();
Wailok Shume2ab3d62021-05-27 04:40:38 +0800295 List<DeviceId> edgeDeviceIds = getEdgeDeviceIds();
pierventre30368ab2021-02-24 23:23:22 +0100296 Set<PolicyRequest> policyRequests;
297 if (filter.isEmpty()) {
298 policyRequests = ImmutableSet.copyOf(policiesMap.values());
299 } else {
300 policyRequests = policiesMap.values().stream()
301 .filter(policyRequest -> filter.contains(policyRequest.policyType()))
302 .collect(Collectors.toSet());
303 }
304 PolicyKey policyKey;
305 List<String> ops;
306 for (PolicyRequest policyRequest : policyRequests) {
307 ops = Lists.newArrayList();
308 for (DeviceId deviceId : edgeDeviceIds) {
309 policyKey = new PolicyKey(deviceId, policyRequest.policyId());
310 Operation operation = Versioned.valueOrNull(operations.get(policyKey.toString()));
311 if (operation != null) {
312 ops.add(deviceId + " -> " + operation.toStringMinimal());
313 }
314 }
315 policyData.add(new PolicyData(policyRequest.policyState(), policyRequest.policy(), ops));
316 }
317 return policyData;
318 }
319
320 @Override
321 //FIXME update does not work well
322 public TrafficMatchId addOrUpdateTrafficMatch(TrafficMatch trafficMatch) {
323 TrafficMatchId trafficMatchId = trafficMatch.trafficMatchId();
324 try {
325 trafficMatches.put(trafficMatchId, new TrafficMatchRequest(trafficMatch));
326 } catch (StorageException e) {
327 log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
328 e.getMessage(), e);
329 trafficMatchId = null;
330 }
331 return trafficMatchId;
332 }
333
334 @Override
335 public boolean removeTrafficMatch(TrafficMatchId trafficMatchId) {
336 boolean result;
337 try {
338 result = Versioned.valueOrNull(trafficMatches.computeIfPresent(trafficMatchId, (k, v) -> {
339 if (v.trafficMatchState() != TrafficMatchState.PENDING_REMOVE) {
340 v.trafficMatchState(TrafficMatchState.PENDING_REMOVE);
341 }
342 return v;
343 })) != null;
344 } catch (StorageException e) {
345 log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
346 e.getMessage(), e);
347 result = false;
348 }
349 return result;
350 }
351
352 @Override
353 public Set<TrafficMatchData> trafficMatches() {
354 Set<TrafficMatchData> trafficMatchData = Sets.newHashSet();
Wailok Shume2ab3d62021-05-27 04:40:38 +0800355 List<DeviceId> edgeDeviceIds = getEdgeDeviceIds();
pierventre30368ab2021-02-24 23:23:22 +0100356 Set<TrafficMatchRequest> trafficMatchRequests = ImmutableSet.copyOf(trafficMatchesMap.values());
357 TrafficMatchKey trafficMatchKey;
358 List<String> ops;
359 for (TrafficMatchRequest trafficMatchRequest : trafficMatchRequests) {
360 ops = Lists.newArrayList();
361 for (DeviceId deviceId : edgeDeviceIds) {
362 trafficMatchKey = new TrafficMatchKey(deviceId, trafficMatchRequest.trafficMatch().trafficMatchId());
363 Operation operation = Versioned.valueOrNull(operations.get(trafficMatchKey.toString()));
364 if (operation != null) {
365 ops.add(deviceId + " -> " + operation.toStringMinimal());
366 }
367 }
368 trafficMatchData.add(new TrafficMatchData(trafficMatchRequest.trafficMatchState(),
369 trafficMatchRequest.trafficMatch(), ops));
370 }
371 return trafficMatchData;
372 }
373
374 // Install/remove the policies on the edge devices
375 private void sendPolicy(Policy policy, boolean install) {
376 if (!isLeader(policy.policyId())) {
377 if (log.isDebugEnabled()) {
378 log.debug("Instance is not leader for policy {}", policy.policyId());
379 }
380 return;
381 }
382 // We know that we are the leader, offloads to the workers the remaining
383 // part: issue fobj installation/removal and update the maps
Wailok Shume2ab3d62021-05-27 04:40:38 +0800384 List<DeviceId> edgeDeviceIds = getEdgeDeviceIds();
pierventre30368ab2021-02-24 23:23:22 +0100385 for (DeviceId deviceId : edgeDeviceIds) {
386 workers.execute(() -> {
387 if (install) {
388 installPolicyInDevice(deviceId, policy);
389 } else {
390 removePolicyInDevice(deviceId, policy);
391 }
392 }, deviceId.hashCode());
393 }
394 }
395
396 // Orchestrate policy installation according to the type
397 private void installPolicyInDevice(DeviceId deviceId, Policy policy) {
pierventre4f68ffa2021-03-09 22:52:14 +0100398 if (!SUPPORTED_POLICIES.contains(policy.policyType())) {
pierventre30368ab2021-02-24 23:23:22 +0100399 log.warn("Policy {} type {} not yet supported",
400 policy.policyId(), policy.policyType());
pierventre4f68ffa2021-03-09 22:52:14 +0100401 return;
402 }
403 PolicyKey policyKey;
404 Operation.Builder operation;
405 if (log.isDebugEnabled()) {
406 log.debug("Installing {} policy {} for dev: {}",
407 policy.policyType(), policy.policyId(), deviceId);
408 }
409 policyKey = new PolicyKey(deviceId, policy.policyId());
Wailok Shume2ab3d62021-05-27 04:40:38 +0800410 // Prevent duplicate installation of the policies. With the current
411 // implementation is not possible to update a policy since a change
412 // in the params will create a new policy. Thus, there is no need to
413 // check the equality like we do for the TM
414 Operation oldPolicyOp = Versioned.valueOrNull(operations.get(policyKey.toString()));
415 if (oldPolicyOp != null && oldPolicyOp.isInstall()) {
416 if (log.isDebugEnabled()) {
417 log.debug("There is already an install operation for policy {}", policy.policyId());
418 }
Wailok Shum4df59312021-06-08 21:44:00 +0800419 // If we add or submit a policy multiple times
420 // We skip the installation update the policy state directly
421 updatePolicy(policy.policyId(), true);
Wailok Shume2ab3d62021-05-27 04:40:38 +0800422 return;
423 }
pierventre4f68ffa2021-03-09 22:52:14 +0100424 operation = Operation.builder()
425 .isInstall(true)
426 .policy(policy);
427 // TODO To better handle different policy types consider the abstraction of a compiler (subtypes ?)
428 if (policy.policyType() == PolicyType.DROP) {
429 // DROP policies do not need the next objective installation phase
430 // we can update directly the map and signal the ops as done
431 operation.isDone(true);
Wailok Shum4df59312021-06-08 21:44:00 +0800432 operations.put(policyKey.toString(), operation.build());
pierventre4f68ffa2021-03-09 22:52:14 +0100433 } else if (policy.policyType() == PolicyType.REDIRECT) {
434 // REDIRECT Uses next objective context to update the ops as done when
435 // it returns successfully. In the other cases leaves the ops as undone
436 // and the relative policy will remain in pending.
437 operations.put(policyKey.toString(), operation.build());
438 NextObjective.Builder builder = redirectPolicyNextObjective(deviceId, (RedirectPolicy) policy);
439 // Handle error here - leave the operation as undone and pending
440 if (builder != null) {
441 CompletableFuture<Objective> future = new CompletableFuture<>();
442 if (log.isDebugEnabled()) {
443 log.debug("Installing REDIRECT next objective for dev: {}", deviceId);
444 }
445 ObjectiveContext context = new DefaultObjectiveContext(
446 (objective) -> {
447 if (log.isDebugEnabled()) {
448 log.debug("REDIRECT next objective for policy {} installed in dev: {}",
449 policy.policyId(), deviceId);
450 }
451 future.complete(objective);
452 },
453 (objective, error) -> {
454 log.warn("Failed to install REDIRECT next objective for policy {}: {} in dev: {}",
455 policy.policyId(), error, deviceId);
456 future.complete(null);
457 });
458 // Context is not serializable
459 NextObjective serializableObjective = builder.add();
460 flowObjectiveService.next(deviceId, builder.add(context));
461 future.whenComplete((objective, ex) -> {
462 if (ex != null) {
463 log.error("Exception installing REDIRECT next objective", ex);
464 } else if (objective != null) {
465 operations.computeIfPresent(policyKey.toString(), (k, v) -> {
466 if (!v.isDone() && v.isInstall()) {
467 v.isDone(true);
468 v.objectiveOperation(serializableObjective);
469 }
470 return v;
471 });
472 }
473 });
474 }
pierventre30368ab2021-02-24 23:23:22 +0100475 }
476 }
477
478 // Remove policy in a device according to the type
479 private void removePolicyInDevice(DeviceId deviceId, Policy policy) {
pierventre30368ab2021-02-24 23:23:22 +0100480 PolicyKey policyKey = new PolicyKey(deviceId, policy.policyId());
481 Operation operation = Versioned.valueOrNull(operations.get(policyKey.toString()));
482 // Policy might be still in pending or not present anymore
483 if (operation == null || operation.objectiveOperation() == null) {
484 log.warn("There are no ops associated with {}", policyKey);
485 operation = Operation.builder()
486 .isDone(true)
487 .isInstall(false)
488 .policy(policy)
489 .build();
490 operations.put(policyKey.toString(), operation);
491 } else {
pierventre4f68ffa2021-03-09 22:52:14 +0100492 if (log.isDebugEnabled()) {
493 log.debug("Removing {} policy {} in device {}", policy.policyType(), policy.policyId(), deviceId);
494 }
495 Operation.Builder operationBuilder = Operation.builder()
496 .isInstall(false)
497 .policy(policy);
pierventre30368ab2021-02-24 23:23:22 +0100498 if (policy.policyType() == PolicyType.DROP) {
pierventre4f68ffa2021-03-09 22:52:14 +0100499 operationBuilder.isDone(true);
500 operations.put(policyKey.toString(), operationBuilder.build());
pierventre30368ab2021-02-24 23:23:22 +0100501 } else if (policy.policyType() == PolicyType.REDIRECT) {
pierventre4f68ffa2021-03-09 22:52:14 +0100502 // REDIRECT has to remove the next objective first
503 NextObjective oldObj = (NextObjective) operation.objectiveOperation();
504 operations.put(policyKey.toString(), operationBuilder.build());
505 NextObjective.Builder builder = oldObj.copy();
506 CompletableFuture<Objective> future = new CompletableFuture<>();
pierventre30368ab2021-02-24 23:23:22 +0100507 if (log.isDebugEnabled()) {
pierventre4f68ffa2021-03-09 22:52:14 +0100508 log.debug("Removing REDIRECT next objective for dev: {}", deviceId);
pierventre30368ab2021-02-24 23:23:22 +0100509 }
pierventre4f68ffa2021-03-09 22:52:14 +0100510 ObjectiveContext context = new DefaultObjectiveContext(
511 (objective) -> {
512 if (log.isDebugEnabled()) {
513 log.debug("REDIRECT next objective for policy {} removed in dev: {}",
514 policy.policyId(), deviceId);
515 }
516 future.complete(objective);
517 },
518 (objective, error) -> {
519 log.warn("Failed to remove REDIRECT next objective for policy {}: {} in dev: {}",
520 policy.policyId(), error, deviceId);
521 future.complete(null);
522 });
523 NextObjective serializableObjective = builder.remove();
524 flowObjectiveService.next(deviceId, builder.remove(context));
525 future.whenComplete((objective, ex) -> {
526 if (ex != null) {
527 log.error("Exception Removing REDIRECT next objective", ex);
528 } else if (objective != null) {
529 operations.computeIfPresent(policyKey.toString(), (k, v) -> {
530 if (!v.isDone() && !v.isInstall()) {
531 v.isDone(true);
532 v.objectiveOperation(serializableObjective);
533 }
534 return v;
535 });
536 }
537 });
pierventre30368ab2021-02-24 23:23:22 +0100538 }
539 }
540 }
541
542 // Updates policy status if all the pending ops are done
543 private void updatePolicy(PolicyId policyId, boolean install) {
544 if (!isLeader(policyId)) {
545 if (log.isDebugEnabled()) {
546 log.debug("Instance is not leader for policy {}", policyId);
547 }
548 return;
549 }
550 workers.execute(() -> updatePolicyInternal(policyId, install), policyId.hashCode());
551 }
552
553 private void updatePolicyInternal(PolicyId policyId, boolean install) {
554 // If there are no more pending ops we are ready to go; potentially we can check
555 // if the id is contained. Updates policies only if they are still present
556 Optional<Map.Entry<String, Versioned<Operation>>> notYetDone = operations.entrySet().stream()
557 .filter(entry -> entry.getValue().value().policy().isPresent())
558 .filter(entry -> PolicyKey.fromString(entry.getKey()).policyId().equals(policyId))
559 .filter(entry -> !entry.getValue().value().isDone() && entry.getValue().value().isInstall() == install)
560 .findFirst();
561 if (notYetDone.isEmpty()) {
562 PolicyRequest policyRequest = Versioned.valueOrNull(policies.computeIfPresent(policyId, (k, v) -> {
563 if (v.policyState() == PolicyState.PENDING_ADD && install) {
564 if (log.isDebugEnabled()) {
565 log.debug("Policy {} is ready", policyId);
566 }
567 v.policyState(PolicyState.ADDED);
568 } else if (v.policyState() == PolicyState.PENDING_REMOVE && !install) {
569 if (log.isDebugEnabled()) {
570 log.debug("Policy {} is removed", policyId);
571 }
572 v = null;
573 }
574 return v;
575 }));
576 // Greedy check for pending traffic matches
577 if (policyRequest != null && policyRequest.policyState() == PolicyState.ADDED) {
578 updatePendingTrafficMatches(policyRequest.policyId());
579 }
580 }
581 }
582
583 // Install/remove the traffic match on the edge devices
584 private void sendTrafficMatch(TrafficMatch trafficMatch, boolean install) {
585 if (!isLeader(trafficMatch.policyId())) {
586 if (log.isDebugEnabled()) {
587 log.debug("Instance is not leader for policy {}", trafficMatch.policyId());
588 }
589 return;
590 }
591 // We know that we are the leader, offloads to the workers the remaining
592 // part: issue fobj installation/removal and update the maps
Wailok Shume2ab3d62021-05-27 04:40:38 +0800593 List<DeviceId> edgeDeviceIds = getEdgeDeviceIds();
pierventre30368ab2021-02-24 23:23:22 +0100594 for (DeviceId deviceId : edgeDeviceIds) {
595 workers.execute(() -> {
596 if (install) {
597 installTrafficMatchToDevice(deviceId, trafficMatch);
598 } else {
599 removeTrafficMatchInDevice(deviceId, trafficMatch);
600 }
601 }, deviceId.hashCode());
602 }
603 }
604
605 // Orchestrate traffic match installation according to the type
606 private void installTrafficMatchToDevice(DeviceId deviceId, TrafficMatch trafficMatch) {
607 if (log.isDebugEnabled()) {
608 log.debug("Installing traffic match {} associated to policy {}",
609 trafficMatch.trafficMatchId(), trafficMatch.policyId());
610 }
pierventre30368ab2021-02-24 23:23:22 +0100611 TrafficMatchKey trafficMatchKey = new TrafficMatchKey(deviceId, trafficMatch.trafficMatchId());
Wailok Shum6f71cd92021-05-06 20:05:29 +0800612 Operation oldTrafficOperation = Versioned.valueOrNull(operations.get(trafficMatchKey.toString()));
613 if (oldTrafficOperation != null && oldTrafficOperation.isInstall()) {
614 if (trafficMatch.equals(oldTrafficOperation.trafficMatch().orElse(null))) {
615 if (log.isDebugEnabled()) {
616 log.debug("There is already an install operation for traffic match {} associated to policy {} " +
617 "for device {}",
618 trafficMatch.trafficMatchId(), trafficMatch.policyId(), deviceId);
619 }
Wailok Shum4df59312021-06-08 21:44:00 +0800620 // If we add or submit a trafficMatch multiple times
621 // We skip the installation and update the state directly
622 updateTrafficMatch(trafficMatch, true);
Wailok Shum6f71cd92021-05-06 20:05:29 +0800623 return;
624 } else {
625 if (log.isDebugEnabled()) {
626 log.debug("Starts updating traffic match {} associated to policy {} " +
627 "for device {}",
628 trafficMatch.trafficMatchId(), trafficMatch.policyId(), deviceId);
629 }
pierventre4f68ffa2021-03-09 22:52:14 +0100630 }
pierventre4f68ffa2021-03-09 22:52:14 +0100631 }
pierventre30368ab2021-02-24 23:23:22 +0100632 // For the DROP policy we need to set an ACL drop in the fwd objective. The other
633 // policies require to retrieve the next Id and sets the next step.
634 PolicyKey policyKey = new PolicyKey(deviceId, trafficMatch.policyId());
635 Operation policyOperation = Versioned.valueOrNull(operations.get(policyKey.toString()));
636 if (policyOperation == null || !policyOperation.isDone() ||
pierventre4f68ffa2021-03-09 22:52:14 +0100637 !policyOperation.isInstall() || policyOperation.policy().isEmpty() ||
638 (policyOperation.policy().get().policyType() == PolicyType.REDIRECT &&
639 policyOperation.objectiveOperation() == null)) {
pierventre30368ab2021-02-24 23:23:22 +0100640 log.info("Deferring traffic match {} installation on device {}. Policy {} not yet installed",
641 trafficMatch.trafficMatchId(), deviceId, trafficMatch.policyId());
642 return;
643 }
pierventre4f68ffa2021-03-09 22:52:14 +0100644 // Updates the store and then send the versatile fwd objective to the pipeliner
Wailok Shum6f71cd92021-05-06 20:05:29 +0800645 Operation newTrafficOperation = Operation.builder()
pierventre4f68ffa2021-03-09 22:52:14 +0100646 .isInstall(true)
647 .trafficMatch(trafficMatch)
648 .build();
Wailok Shum6f71cd92021-05-06 20:05:29 +0800649 operations.put(trafficMatchKey.toString(), newTrafficOperation);
pierventre30368ab2021-02-24 23:23:22 +0100650 Policy policy = policyOperation.policy().get();
pierventre4f68ffa2021-03-09 22:52:14 +0100651 ForwardingObjective.Builder builder = trafficMatchFwdObjective(trafficMatch, policy.policyType());
pierventre30368ab2021-02-24 23:23:22 +0100652 if (policy.policyType() == PolicyType.DROP) {
pierventre4f68ffa2021-03-09 22:52:14 +0100653 // Firstly builds the fwd objective with the wipeDeferred action.
pierventre30368ab2021-02-24 23:23:22 +0100654 TrafficTreatment dropTreatment = DefaultTrafficTreatment.builder()
655 .wipeDeferred()
656 .build();
657 builder.withTreatment(dropTreatment);
pierventre4f68ffa2021-03-09 22:52:14 +0100658 } else if (policy.policyType() == PolicyType.REDIRECT) {
659
660 // Here we need to set only the next step
661 builder.nextStep(policyOperation.objectiveOperation().id());
pierventre30368ab2021-02-24 23:23:22 +0100662 }
pierventre4f68ffa2021-03-09 22:52:14 +0100663 // Once, the fwd objective has completed its execution, we update the policiesOps map
Wailok Shum6f71cd92021-05-06 20:05:29 +0800664 CompletableFuture<Objective> addNewFuture = new CompletableFuture<>();
665 CompletableFuture<Objective> removeOldFuture = new CompletableFuture<>();
pierventre4f68ffa2021-03-09 22:52:14 +0100666 if (log.isDebugEnabled()) {
667 log.debug("Installing forwarding objective for dev: {}", deviceId);
668 }
Wailok Shum6f71cd92021-05-06 20:05:29 +0800669 ObjectiveContext addNewContext = new DefaultObjectiveContext(
pierventre4f68ffa2021-03-09 22:52:14 +0100670 (objective) -> {
671 if (log.isDebugEnabled()) {
672 log.debug("Forwarding objective for policy {} installed", trafficMatch.policyId());
673 }
Wailok Shum6f71cd92021-05-06 20:05:29 +0800674 addNewFuture.complete(objective);
pierventre4f68ffa2021-03-09 22:52:14 +0100675 },
676 (objective, error) -> {
677 log.warn("Failed to install forwarding objective for policy {}: {}",
678 trafficMatch.policyId(), error);
Wailok Shum6f71cd92021-05-06 20:05:29 +0800679 addNewFuture.complete(null);
pierventre4f68ffa2021-03-09 22:52:14 +0100680 });
Wailok Shum6f71cd92021-05-06 20:05:29 +0800681 ObjectiveContext removeOldContext = new DefaultObjectiveContext(
682 (objective) -> {
683 if (log.isDebugEnabled()) {
684 log.debug("Old forwarding objective for policy {} removed, update finished",
685 trafficMatch.policyId());
686 }
687 removeOldFuture.complete(objective);
688 },
689 (objective, error) -> {
690 log.warn("Failed to remove old forwarding objective for policy {}: {}",
691 trafficMatch.policyId(), error);
692 removeOldFuture.complete(null);
693 });
pierventre4f68ffa2021-03-09 22:52:14 +0100694 // Context is not serializable
695 ForwardingObjective serializableObjective = builder.add();
Wailok Shum6f71cd92021-05-06 20:05:29 +0800696 flowObjectiveService.forward(deviceId, builder.add(addNewContext));
697 addNewFuture.whenComplete((objective, ex) -> {
pierventre4f68ffa2021-03-09 22:52:14 +0100698 if (ex != null) {
699 log.error("Exception installing forwarding objective", ex);
700 } else if (objective != null) {
Wailok Shum4df59312021-06-08 21:44:00 +0800701 // Remove existing flow after new flow is installed
702 // base on priority, selector and metadata change
Wailok Shum6f71cd92021-05-06 20:05:29 +0800703 if (oldTrafficOperation != null && oldTrafficOperation.objectiveOperation() != null
704 && oldTrafficOperation.isInstall()
Wailok Shum4df59312021-06-08 21:44:00 +0800705 && (oldTrafficOperation.objectiveOperation().priority() != serializableObjective.priority()
706 || !((ForwardingObjective) oldTrafficOperation.objectiveOperation()).selector()
707 .equals(serializableObjective.selector())
708 || !((ForwardingObjective) oldTrafficOperation.objectiveOperation()).meta()
709 .equals(serializableObjective.meta()))) {
Wailok Shum6f71cd92021-05-06 20:05:29 +0800710 ForwardingObjective oldFwdObj = (ForwardingObjective) oldTrafficOperation.objectiveOperation();
711 ForwardingObjective.Builder oldBuilder = DefaultForwardingObjective.builder(oldFwdObj);
712 flowObjectiveService.forward(deviceId, oldBuilder.remove(removeOldContext));
713 } else {
714 operations.computeIfPresent(trafficMatchKey.toString(), (k, v) -> {
715 if (!v.isDone() && v.isInstall()) {
716 v.isDone(true);
717 v.objectiveOperation(serializableObjective);
718 }
719 return v;
720 });
721 }
722 }
723 });
724 removeOldFuture.whenComplete((objective, ex) -> {
725 if (ex != null) {
726 log.error("Exception removing old forwarding objective", ex);
727 } else if (objective != null) {
pierventre4f68ffa2021-03-09 22:52:14 +0100728 operations.computeIfPresent(trafficMatchKey.toString(), (k, v) -> {
729 if (!v.isDone() && v.isInstall()) {
730 v.isDone(true);
731 v.objectiveOperation(serializableObjective);
732 }
733 return v;
734 });
735 }
736 });
pierventre30368ab2021-02-24 23:23:22 +0100737 }
738
739 // Updates traffic match status if all the pending ops are done
740 private void updateTrafficMatch(TrafficMatch trafficMatch, boolean install) {
741 if (!isLeader(trafficMatch.policyId())) {
742 if (log.isDebugEnabled()) {
743 log.debug("Instance is not leader for policy {}", trafficMatch.policyId());
744 }
745 return;
746 }
747 workers.execute(() -> updateTrafficMatchInternal(trafficMatch.trafficMatchId(), install),
748 trafficMatch.policyId().hashCode());
749 }
750
751 private void updateTrafficMatchInternal(TrafficMatchId trafficMatchId, boolean install) {
752 // If there are no more pending ops we are ready to go; potentially we can check
753 // if the id is contained. Updates traffic matches only if they are still present
754 Optional<Map.Entry<String, Versioned<Operation>>> notYetDone = operations.entrySet().stream()
755 .filter(entry -> entry.getValue().value().trafficMatch().isPresent())
756 .filter(entry -> TrafficMatchKey.fromString(entry.getKey()).trafficMatchId().equals(trafficMatchId))
757 .filter(entry -> !entry.getValue().value().isDone() && entry.getValue().value().isInstall() == install)
758 .findFirst();
759 if (notYetDone.isEmpty()) {
760 trafficMatches.computeIfPresent(trafficMatchId, (k, v) -> {
761 if (v.trafficMatchState() == TrafficMatchState.PENDING_ADD && install) {
762 if (log.isDebugEnabled()) {
763 log.debug("Traffic match {} is ready", trafficMatchId);
764 }
765 v.trafficMatchState(TrafficMatchState.ADDED);
766 } else if (v.trafficMatchState() == TrafficMatchState.PENDING_REMOVE && !install) {
767 if (log.isDebugEnabled()) {
768 log.debug("Traffic match {} is removed", trafficMatchId);
769 }
770 v = null;
771 }
772 return v;
773 });
774 }
775 }
776
777 // Look for any pending traffic match waiting for the policy
778 private void updatePendingTrafficMatches(PolicyId policyId) {
779 Set<TrafficMatchRequest> pendingTrafficMatches = trafficMatches.stream()
780 .filter(trafficMatchEntry -> trafficMatchEntry.getValue().value().policyId().equals(policyId) &&
781 trafficMatchEntry.getValue().value().trafficMatchState() == TrafficMatchState.PENDING_ADD)
782 .map(trafficMatchEntry -> trafficMatchEntry.getValue().value())
783 .collect(Collectors.toSet());
784 for (TrafficMatchRequest trafficMatch : pendingTrafficMatches) {
785 sendTrafficMatch(trafficMatch.trafficMatch(), true);
786 }
787 }
788
789 // Traffic match removal in a device
790 private void removeTrafficMatchInDevice(DeviceId deviceId, TrafficMatch trafficMatch) {
791 if (log.isDebugEnabled()) {
792 log.debug("Removing traffic match {} associated to policy {}",
793 trafficMatch.trafficMatchId(), trafficMatch.policyId());
794 }
795 TrafficMatchKey trafficMatchKey = new TrafficMatchKey(deviceId, trafficMatch.trafficMatchId());
796 Operation operation = Versioned.valueOrNull(operations.get(trafficMatchKey.toString()));
797 if (operation == null || operation.objectiveOperation() == null) {
798 log.warn("There are no ops associated with {}", trafficMatchKey);
799 operation = Operation.builder()
800 .isDone(true)
801 .isInstall(false)
802 .trafficMatch(trafficMatch)
803 .build();
804 operations.put(trafficMatchKey.toString(), operation);
pierventre4f68ffa2021-03-09 22:52:14 +0100805 } else if (!operation.isInstall()) {
806 if (log.isDebugEnabled()) {
807 log.debug("There is already an uninstall operation for traffic match {} associated to policy {}" +
808 " for device {}", trafficMatch.trafficMatchId(), trafficMatch.policyId(), deviceId);
809 }
pierventre30368ab2021-02-24 23:23:22 +0100810 } else {
811 ForwardingObjective oldObj = (ForwardingObjective) operation.objectiveOperation();
812 operation = Operation.builder(operation)
pierventre30368ab2021-02-24 23:23:22 +0100813 .isInstall(false)
814 .build();
815 operations.put(trafficMatchKey.toString(), operation);
816 ForwardingObjective.Builder builder = DefaultForwardingObjective.builder(oldObj);
817 CompletableFuture<Objective> future = new CompletableFuture<>();
818 if (log.isDebugEnabled()) {
pierventre4f68ffa2021-03-09 22:52:14 +0100819 log.debug("Removing forwarding objectives for dev: {}", deviceId);
pierventre30368ab2021-02-24 23:23:22 +0100820 }
821 ObjectiveContext context = new DefaultObjectiveContext(
822 (objective) -> {
823 if (log.isDebugEnabled()) {
pierventre4f68ffa2021-03-09 22:52:14 +0100824 log.debug("Forwarding objective for policy {} removed", trafficMatch.policyId());
pierventre30368ab2021-02-24 23:23:22 +0100825 }
826 future.complete(objective);
827 },
828 (objective, error) -> {
pierventre4f68ffa2021-03-09 22:52:14 +0100829 log.warn("Failed to remove forwarding objective for policy {}: {}",
830 trafficMatch.policyId(), error);
pierventre30368ab2021-02-24 23:23:22 +0100831 future.complete(null);
832 });
833 ForwardingObjective serializableObjective = builder.remove();
834 flowObjectiveService.forward(deviceId, builder.remove(context));
835 future.whenComplete((objective, ex) -> {
836 if (ex != null) {
pierventre4f68ffa2021-03-09 22:52:14 +0100837 log.error("Exception removing forwarding objective", ex);
pierventre30368ab2021-02-24 23:23:22 +0100838 } else if (objective != null) {
839 operations.computeIfPresent(trafficMatchKey.toString(), (k, v) -> {
840 if (!v.isDone() && !v.isInstall()) {
841 v.isDone(true);
842 v.objectiveOperation(serializableObjective);
843 }
844 return v;
845 });
846 }
847 });
848 }
849 }
850
pierventre4f68ffa2021-03-09 22:52:14 +0100851 // It is used when a policy has been removed but there are still traffic matches depending on it
852 private Optional<TrafficMatchRequest> dependingTrafficMatches(PolicyId policyId) {
853 return trafficMatches.stream()
pierventre30368ab2021-02-24 23:23:22 +0100854 .filter(trafficMatchEntry -> trafficMatchEntry.getValue().value().policyId().equals(policyId) &&
855 trafficMatchEntry.getValue().value().trafficMatchState() == TrafficMatchState.ADDED)
856 .map(trafficMatchEntry -> trafficMatchEntry.getValue().value())
pierventre4f68ffa2021-03-09 22:52:14 +0100857 .findFirst();
pierventre30368ab2021-02-24 23:23:22 +0100858 }
859
860 // Utility that removes operations related to a policy or to a traffic match.
861 private void removeOperations(PolicyId policyId, Optional<TrafficMatchId> trafficMatchId) {
862 if (!isLeader(policyId)) {
863 if (log.isDebugEnabled()) {
864 log.debug("Instance is not leader for policy {}", policyId);
865 }
866 return;
867 }
Wailok Shume2ab3d62021-05-27 04:40:38 +0800868 List<DeviceId> edgeDeviceIds = getEdgeDeviceIds();
pierventre30368ab2021-02-24 23:23:22 +0100869 for (DeviceId deviceId : edgeDeviceIds) {
870 workers.execute(() -> {
871 String key;
872 if (trafficMatchId.isPresent()) {
873 key = new TrafficMatchKey(deviceId, trafficMatchId.get()).toString();
874 } else {
875 key = new PolicyKey(deviceId, policyId).toString();
876 }
877 operations.remove(key);
878 }, deviceId.hashCode());
879 }
880 }
881
pierventre4f68ffa2021-03-09 22:52:14 +0100882 private ForwardingObjective.Builder trafficMatchFwdObjective(TrafficMatch trafficMatch, PolicyType policyType) {
883 TrafficSelector.Builder metaBuilder = DefaultTrafficSelector.builder(trafficMatch.trafficSelector());
884 if (policyType == PolicyType.REDIRECT) {
885 metaBuilder.matchMetadata(EDGE_PORT);
886 }
pierventre30368ab2021-02-24 23:23:22 +0100887 return DefaultForwardingObjective.builder()
Wailok Shum37dd29a2021-04-27 18:13:55 +0800888 .withPriority(trafficMatch.trafficMatchPriority().priority())
pierventre30368ab2021-02-24 23:23:22 +0100889 .withSelector(trafficMatch.trafficSelector())
pierventre4f68ffa2021-03-09 22:52:14 +0100890 .withMeta(metaBuilder.build())
pierventre30368ab2021-02-24 23:23:22 +0100891 .fromApp(appId)
892 .withFlag(ForwardingObjective.Flag.VERSATILE)
893 .makePermanent();
894 }
895
pierventre4f68ffa2021-03-09 22:52:14 +0100896 private NextObjective.Builder redirectPolicyNextObjective(DeviceId srcDevice, RedirectPolicy redirectPolicy) {
897 Set<Link> egressLinks = linkService.getDeviceEgressLinks(srcDevice);
898 Map<ConnectPoint, DeviceId> egressPortsToEnforce = Maps.newHashMap();
Wailok Shume2ab3d62021-05-27 04:40:38 +0800899 List<DeviceId> edgeDevices = getEdgeDeviceIds();
pierventre4f68ffa2021-03-09 22:52:14 +0100900 egressLinks.stream()
901 .filter(link -> redirectPolicy.spinesToEnforce().contains(link.dst().deviceId()) &&
902 !edgeDevices.contains(link.dst().deviceId()))
903 .forEach(link -> egressPortsToEnforce.put(link.src(), link.dst().deviceId()));
904 // No ports no friend
905 if (egressPortsToEnforce.isEmpty()) {
906 log.warn("There are no port available for the REDIRECT policy {}", redirectPolicy.policyId());
907 return null;
908 }
909 // We need to add a treatment for each valid egress port. The treatment
910 // requires to set src and dst mac address and set the egress port. We are
911 // deliberately not providing the metadata to prevent the programming of
912 // some tables which are already controlled by SegmentRouting or are unnecessary
913 int nextId = flowObjectiveService.allocateNextId();
914 DefaultNextObjective.Builder builder = DefaultNextObjective.builder()
915 .withId(nextId)
916 .withType(NextObjective.Type.HASHED)
917 .fromApp(appId);
918 MacAddress srcDeviceMac;
919 try {
Wailok Shume2ab3d62021-05-27 04:40:38 +0800920 srcDeviceMac = getDeviceMacAddress(srcDevice);
pierventre4f68ffa2021-03-09 22:52:14 +0100921 } catch (DeviceConfigNotFoundException e) {
922 log.warn(e.getMessage() + " Aborting installation REDIRECT policy {}", redirectPolicy.policyId());
923 return null;
924 }
925 MacAddress neigborDeviceMac;
926 TrafficTreatment.Builder tBuilder;
927 for (Map.Entry<ConnectPoint, DeviceId> entry : egressPortsToEnforce.entrySet()) {
928 try {
Wailok Shume2ab3d62021-05-27 04:40:38 +0800929 neigborDeviceMac = getDeviceMacAddress(entry.getValue());
pierventre4f68ffa2021-03-09 22:52:14 +0100930 } catch (DeviceConfigNotFoundException e) {
931 log.warn(e.getMessage() + " Aborting installation REDIRECT policy {}", redirectPolicy.policyId());
932 return null;
933 }
934 tBuilder = DefaultTrafficTreatment.builder()
935 .setEthSrc(srcDeviceMac)
936 .setEthDst(neigborDeviceMac)
937 .setOutput(entry.getKey().port());
938 builder.addTreatment(tBuilder.build());
939 }
940 return builder;
941 }
942
pierventre30368ab2021-02-24 23:23:22 +0100943 // Each map has an event listener enabling the events distribution across the cluster
944 private class InternalPolMapEventListener implements MapEventListener<PolicyId, PolicyRequest> {
945 @Override
946 public void event(MapEvent<PolicyId, PolicyRequest> event) {
947 Versioned<PolicyRequest> value = event.type() == MapEvent.Type.REMOVE ?
948 event.oldValue() : event.newValue();
949 PolicyRequest policyRequest = value.value();
950 Policy policy = policyRequest.policy();
951 switch (event.type()) {
952 case INSERT:
953 case UPDATE:
954 switch (policyRequest.policyState()) {
955 case PENDING_ADD:
956 sendPolicy(policy, true);
957 break;
958 case PENDING_REMOVE:
959 sendPolicy(policy, false);
960 break;
961 case ADDED:
962 break;
963 default:
964 log.warn("Unknown policy state type {}", policyRequest.policyState());
965 }
966 break;
967 case REMOVE:
968 removeOperations(policy.policyId(), Optional.empty());
pierventre30368ab2021-02-24 23:23:22 +0100969 break;
970 default:
971 log.warn("Unknown event type {}", event.type());
972
973 }
974 }
975 }
976
977 private class InternalTMatchMapEventListener implements MapEventListener<TrafficMatchId, TrafficMatchRequest> {
978 @Override
979 public void event(MapEvent<TrafficMatchId, TrafficMatchRequest> event) {
980 Versioned<TrafficMatchRequest> value = event.type() == MapEvent.Type.REMOVE ?
981 event.oldValue() : event.newValue();
982 TrafficMatchRequest trafficMatchRequest = value.value();
983 TrafficMatch trafficMatch = trafficMatchRequest.trafficMatch();
984 switch (event.type()) {
985 case INSERT:
986 case UPDATE:
987 switch (trafficMatchRequest.trafficMatchState()) {
988 case PENDING_ADD:
989 sendTrafficMatch(trafficMatch, true);
990 break;
991 case PENDING_REMOVE:
992 sendTrafficMatch(trafficMatch, false);
993 break;
994 case ADDED:
995 break;
996 default:
997 log.warn("Unknown traffic match state type {}", trafficMatchRequest.trafficMatchState());
998 }
999 break;
1000 case REMOVE:
1001 removeOperations(trafficMatch.policyId(), Optional.of(trafficMatch.trafficMatchId()));
1002 break;
1003 default:
1004 log.warn("Unknown event type {}", event.type());
1005 }
1006 }
1007 }
1008
1009 private class InternalOpsMapEventListener implements MapEventListener<String, Operation> {
1010 @Override
1011 public void event(MapEvent<String, Operation> event) {
1012 String key = event.key();
1013 Versioned<Operation> value = event.type() == MapEvent.Type.REMOVE ?
1014 event.oldValue() : event.newValue();
1015 Operation operation = value.value();
1016 switch (event.type()) {
1017 case INSERT:
1018 case UPDATE:
1019 if (operation.isDone()) {
1020 if (operation.policy().isPresent()) {
1021 PolicyKey policyKey = PolicyKey.fromString(key);
1022 updatePolicy(policyKey.policyId(), operation.isInstall());
1023 } else if (operation.trafficMatch().isPresent()) {
1024 updateTrafficMatch(operation.trafficMatch().get(), operation.isInstall());
1025 } else {
1026 log.warn("Unknown pending operation");
1027 }
1028 }
1029 break;
1030 case REMOVE:
1031 break;
1032 default:
1033 log.warn("Unknown event type {}", event.type());
1034 }
1035 }
1036 }
1037
1038 // Using the work partition service defines who is in charge of a given policy.
1039 private boolean isLeader(PolicyId policyId) {
1040 final NodeId currentNodeId = clusterService.getLocalNode().id();
1041 final NodeId leader = workPartitionService.getLeader(policyId, this::hasher);
1042 if (leader == null) {
1043 log.error("Fail to elect a leader for {}.", policyId);
1044 return false;
1045 }
1046 policyLeaderCache.put(policyId, leader);
1047 return currentNodeId.equals(leader);
1048 }
1049
1050 private Long hasher(PolicyId policyId) {
1051 return HASH_FN.newHasher()
1052 .putUnencodedChars(policyId.toString())
1053 .hash()
1054 .asLong();
1055 }
1056
pierventre4f68ffa2021-03-09 22:52:14 +01001057 // TODO Periodic checker, consider to add store and delegates.
1058
pierventre30368ab2021-02-24 23:23:22 +01001059 // Check periodically for any issue and try to resolve automatically if possible
1060 private final class PolicyChecker implements Runnable {
1061 @Override
1062 public void run() {
1063 }
1064 }
Wailok Shume2ab3d62021-05-27 04:40:38 +08001065
1066 // Netcfg event listener
1067 private class InternalConfigListener implements NetworkConfigListener {
1068 @Override
1069 public void event(NetworkConfigEvent event) {
1070 eventExecutor.execute(() -> {
1071 switch (event.type()) {
1072 case CONFIG_ADDED:
1073 case CONFIG_UPDATED:
1074 if (event.configClass() == SegmentRoutingDeviceConfig.class) {
1075 // Reset all policy state to PENDING_ADD
1076 ImmutableSet<PolicyId> policyIds = ImmutableSet.copyOf(policies.keySet());
1077 policyIds.forEach((policyId) -> {
1078 policies.computeIfPresent(policyId, (k, v) -> {
1079 v.policyState(PolicyState.PENDING_ADD);
1080 return v;
1081 });
1082 });
1083 // Reset all TrafficMatch state to PENDING_ADD
1084 ImmutableSet<TrafficMatchId> trafficMatchIds = ImmutableSet.copyOf(trafficMatches.keySet());
1085 trafficMatchIds.forEach((trafficMatchId) -> {
1086 trafficMatches.computeIfPresent(trafficMatchId, (k, v) -> {
1087 v.trafficMatchState(TrafficMatchState.PENDING_ADD);
1088 return v;
1089 });
1090 });
1091 }
1092 break;
1093 default:
1094 break;
1095 }
1096 });
1097 }
1098 }
1099
1100 private List<DeviceId> getEdgeDeviceIds() {
1101 List<DeviceId> edges = new ArrayList<>();
1102 deviceService.getDevices().forEach((device) -> {
1103 DeviceId deviceId = device.id();
1104 SegmentRoutingDeviceConfig srDevCfg = cfgService.getConfig(deviceId, SegmentRoutingDeviceConfig.class);
1105 if (srDevCfg != null && srDevCfg.isEdgeRouter()) {
1106 edges.add(deviceId);
1107 }
1108 });
1109 return edges;
1110 }
1111
1112 private MacAddress getDeviceMacAddress(DeviceId deviceId) throws DeviceConfigNotFoundException {
1113 SegmentRoutingDeviceConfig srDevCfg = cfgService.getConfig(deviceId, SegmentRoutingDeviceConfig.class);
1114 if (srDevCfg != null) {
1115 return srDevCfg.routerMac();
1116 } else {
1117 throw new DeviceConfigNotFoundException("Config for device: " + deviceId.toString() + " not found");
1118 }
1119 }
pierventre30368ab2021-02-24 23:23:22 +01001120}