blob: 78b86001aad6d1714e5e196967e0ba852a13bc5d [file] [log] [blame]
pierventre30368ab2021-02-24 23:23:22 +01001/*
2 * Copyright 2021-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.segmentrouting.policy.impl;
17
18import com.google.common.collect.ImmutableSet;
19import com.google.common.collect.Lists;
20import com.google.common.collect.Maps;
21import com.google.common.hash.HashFunction;
22import com.google.common.hash.Hashing;
23import org.glassfish.jersey.internal.guava.Sets;
pierventre4f68ffa2021-03-09 22:52:14 +010024import org.onlab.packet.MacAddress;
pierventre30368ab2021-02-24 23:23:22 +010025import org.onlab.util.KryoNamespace;
26import org.onlab.util.PredictableExecutor;
27import org.onosproject.cluster.ClusterService;
28import org.onosproject.cluster.NodeId;
Wailok Shumee90c132021-03-11 21:00:11 +080029import org.onosproject.codec.CodecService;
pierventre30368ab2021-02-24 23:23:22 +010030import org.onosproject.core.ApplicationId;
31import org.onosproject.core.CoreService;
pierventre4f68ffa2021-03-09 22:52:14 +010032import org.onosproject.net.ConnectPoint;
pierventre30368ab2021-02-24 23:23:22 +010033import org.onosproject.net.DeviceId;
pierventre4f68ffa2021-03-09 22:52:14 +010034import org.onosproject.net.Link;
35import org.onosproject.net.flow.DefaultTrafficSelector;
pierventre30368ab2021-02-24 23:23:22 +010036import org.onosproject.net.flow.DefaultTrafficTreatment;
pierventre4f68ffa2021-03-09 22:52:14 +010037import org.onosproject.net.flow.TrafficSelector;
pierventre30368ab2021-02-24 23:23:22 +010038import org.onosproject.net.flow.TrafficTreatment;
39import org.onosproject.net.flowobjective.DefaultForwardingObjective;
pierventre4f68ffa2021-03-09 22:52:14 +010040import org.onosproject.net.flowobjective.DefaultNextObjective;
pierventre30368ab2021-02-24 23:23:22 +010041import org.onosproject.net.flowobjective.DefaultObjectiveContext;
42import org.onosproject.net.flowobjective.FlowObjectiveService;
43import org.onosproject.net.flowobjective.ForwardingObjective;
pierventre4f68ffa2021-03-09 22:52:14 +010044import org.onosproject.net.flowobjective.NextObjective;
pierventre30368ab2021-02-24 23:23:22 +010045import org.onosproject.net.flowobjective.Objective;
46import org.onosproject.net.flowobjective.ObjectiveContext;
47import org.onosproject.net.intent.WorkPartitionService;
pierventre4f68ffa2021-03-09 22:52:14 +010048import org.onosproject.net.link.LinkService;
pierventre30368ab2021-02-24 23:23:22 +010049import org.onosproject.segmentrouting.SegmentRoutingService;
50import org.onosproject.segmentrouting.policy.api.DropPolicy;
51import org.onosproject.segmentrouting.policy.api.Policy;
pierventre30368ab2021-02-24 23:23:22 +010052import org.onosproject.segmentrouting.policy.api.PolicyData;
53import org.onosproject.segmentrouting.policy.api.PolicyId;
54import org.onosproject.segmentrouting.policy.api.PolicyService;
55import org.onosproject.segmentrouting.policy.api.PolicyState;
pierventre4f68ffa2021-03-09 22:52:14 +010056import org.onosproject.segmentrouting.policy.api.RedirectPolicy;
pierventre30368ab2021-02-24 23:23:22 +010057import org.onosproject.segmentrouting.policy.api.TrafficMatch;
58import org.onosproject.segmentrouting.policy.api.TrafficMatchData;
59import org.onosproject.segmentrouting.policy.api.TrafficMatchId;
Wailok Shum37dd29a2021-04-27 18:13:55 +080060import org.onosproject.segmentrouting.policy.api.TrafficMatchPriority;
pierventre30368ab2021-02-24 23:23:22 +010061import org.onosproject.segmentrouting.policy.api.TrafficMatchState;
pierventreb37a11a2021-03-18 16:50:04 +010062import org.onosproject.segmentrouting.config.DeviceConfigNotFoundException;
63import org.onosproject.segmentrouting.policy.api.Policy.PolicyType;
pierventre30368ab2021-02-24 23:23:22 +010064import org.onosproject.store.serializers.KryoNamespaces;
65import org.onosproject.store.service.ConsistentMap;
66import org.onosproject.store.service.MapEvent;
67import org.onosproject.store.service.MapEventListener;
68import org.onosproject.store.service.Serializer;
69import org.onosproject.store.service.StorageException;
70import org.onosproject.store.service.StorageService;
71import org.onosproject.store.service.Versioned;
72import org.osgi.service.component.annotations.Activate;
73import org.osgi.service.component.annotations.Component;
74import org.osgi.service.component.annotations.Deactivate;
75import org.osgi.service.component.annotations.Reference;
76import org.osgi.service.component.annotations.ReferenceCardinality;
77import org.slf4j.Logger;
78
79import java.util.List;
80import java.util.Map;
81import java.util.Optional;
82import java.util.Set;
83import java.util.concurrent.CompletableFuture;
84import java.util.stream.Collectors;
85
86import static org.onlab.util.Tools.groupedThreads;
87import static org.slf4j.LoggerFactory.getLogger;
88
89/**
90 * Implementation of the policy service interface.
91 */
92@Component(immediate = true, service = PolicyService.class)
93public class PolicyManager implements PolicyService {
94
95 // App related things
96 private static final String APP_NAME = "org.onosproject.segmentrouting.policy";
97 private ApplicationId appId;
98 private Logger log = getLogger(getClass());
99 static final String KEY_SEPARATOR = "|";
100
pierventre4f68ffa2021-03-09 22:52:14 +0100101 // Supported policies
pierventreb37a11a2021-03-18 16:50:04 +0100102 private static final Set<PolicyType> SUPPORTED_POLICIES = ImmutableSet.of(
pierventre4f68ffa2021-03-09 22:52:14 +0100103 PolicyType.DROP, PolicyType.REDIRECT);
104
pierventre12aaa052021-03-22 12:56:03 +0100105 // Driver should use this meta to match ig_port_type field in the ACL table
pierventre4f68ffa2021-03-09 22:52:14 +0100106 private static final long EDGE_PORT = 1;
pierventre4f68ffa2021-03-09 22:52:14 +0100107
pierventre30368ab2021-02-24 23:23:22 +0100108 // Policy/TrafficMatch store related objects. We use these consistent maps to keep track of the
109 // lifecycle of a policy/traffic match. These are decomposed in multiple operations which have
110 // to be performed on multiple devices in order to have a policy/traffic match in ADDED state.
pierventre4f68ffa2021-03-09 22:52:14 +0100111 // TODO Consider to add store and delegate
pierventre30368ab2021-02-24 23:23:22 +0100112 private static final String POLICY_STORE = "sr-policy-store";
113 private ConsistentMap<PolicyId, PolicyRequest> policies;
114 private MapEventListener<PolicyId, PolicyRequest> mapPolListener = new InternalPolMapEventListener();
115 private Map<PolicyId, PolicyRequest> policiesMap;
116
117 private static final String OPS_STORE = "sr-ops-store";
118 private ConsistentMap<String, Operation> operations;
119 private MapEventListener<String, Operation> mapOpsListener = new InternalOpsMapEventListener();
120 private Map<String, Operation> opsMap;
121
122 private static final String TRAFFIC_MATCH_STORE = "sr-tmatch-store";
123 private ConsistentMap<TrafficMatchId, TrafficMatchRequest> trafficMatches;
124 private MapEventListener<TrafficMatchId, TrafficMatchRequest> mapTMatchListener =
125 new InternalTMatchMapEventListener();
126 private Map<TrafficMatchId, TrafficMatchRequest> trafficMatchesMap;
127
128 // Leadership related objects - consistent hashing
129 private static final HashFunction HASH_FN = Hashing.md5();
130 // Read only cache of the Policy leader
131 private Map<PolicyId, NodeId> policyLeaderCache;
132
133 // Worker threads for policy and traffic match related ops
134 private static final int DEFAULT_THREADS = 4;
135 protected PredictableExecutor workers;
136
137 // Serializers and ONOS services
138 private static final KryoNamespace.Builder APP_KRYO_BUILDER = KryoNamespace.newBuilder()
139 .register(KryoNamespaces.API)
140 .register(PolicyId.class)
141 .register(PolicyType.class)
142 .register(DropPolicy.class)
pierventre4f68ffa2021-03-09 22:52:14 +0100143 .register(RedirectPolicy.class)
pierventre30368ab2021-02-24 23:23:22 +0100144 .register(PolicyState.class)
145 .register(PolicyRequest.class)
146 .register(TrafficMatchId.class)
147 .register(TrafficMatchState.class)
Wailok Shum37dd29a2021-04-27 18:13:55 +0800148 .register(TrafficMatchPriority.class)
pierventre30368ab2021-02-24 23:23:22 +0100149 .register(TrafficMatch.class)
150 .register(TrafficMatchRequest.class)
151 .register(Operation.class);
152 private Serializer serializer = Serializer.using(Lists.newArrayList(APP_KRYO_BUILDER.build()));
153
154 @Reference(cardinality = ReferenceCardinality.MANDATORY)
155 private CoreService coreService;
156
157 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Wailok Shumee90c132021-03-11 21:00:11 +0800158 private CodecService codecService;
159
160 @Reference(cardinality = ReferenceCardinality.MANDATORY)
pierventre30368ab2021-02-24 23:23:22 +0100161 private StorageService storageService;
162
163 @Reference(cardinality = ReferenceCardinality.MANDATORY)
164 private ClusterService clusterService;
165
166 @Reference(cardinality = ReferenceCardinality.MANDATORY)
pierventre4f68ffa2021-03-09 22:52:14 +0100167 private WorkPartitionService workPartitionService;
pierventre30368ab2021-02-24 23:23:22 +0100168
Wailok Shumee90c132021-03-11 21:00:11 +0800169 @Reference(cardinality = ReferenceCardinality.MANDATORY)
pierventre4f68ffa2021-03-09 22:52:14 +0100170 private SegmentRoutingService srService;
pierventre30368ab2021-02-24 23:23:22 +0100171
172 @Reference(cardinality = ReferenceCardinality.MANDATORY)
pierventre4f68ffa2021-03-09 22:52:14 +0100173 private FlowObjectiveService flowObjectiveService;
174
175 @Reference(cardinality = ReferenceCardinality.MANDATORY)
176 private LinkService linkService;
pierventre30368ab2021-02-24 23:23:22 +0100177
178 @Activate
179 public void activate() {
180 appId = coreService.registerApplication(APP_NAME);
181
Wailok Shumee90c132021-03-11 21:00:11 +0800182 codecService.registerCodec(DropPolicy.class, new DropPolicyCodec());
183 codecService.registerCodec(RedirectPolicy.class, new RedirectPolicyCodec());
184 codecService.registerCodec(TrafficMatch.class, new TrafficMatchCodec());
185
pierventre30368ab2021-02-24 23:23:22 +0100186 policies = storageService.<PolicyId, PolicyRequest>consistentMapBuilder()
187 .withName(POLICY_STORE)
188 .withSerializer(serializer).build();
189 policies.addListener(mapPolListener);
190 policiesMap = policies.asJavaMap();
191
192 trafficMatches = storageService.<TrafficMatchId, TrafficMatchRequest>consistentMapBuilder()
193 .withName(TRAFFIC_MATCH_STORE)
194 .withSerializer(serializer).build();
195 trafficMatches.addListener(mapTMatchListener);
196 trafficMatchesMap = trafficMatches.asJavaMap();
197
198 operations = storageService.<String, Operation>consistentMapBuilder()
199 .withName(OPS_STORE)
200 .withSerializer(serializer).build();
201 operations.addListener(mapOpsListener);
202 opsMap = operations.asJavaMap();
203
204 policyLeaderCache = Maps.newConcurrentMap();
205
206 workers = new PredictableExecutor(DEFAULT_THREADS,
207 groupedThreads("sr-policy", "worker-%d", log));
208
209 log.info("Started");
210 }
211
212 @Deactivate
213 public void deactivate() {
214 // Teardown everything
Wailok Shumee90c132021-03-11 21:00:11 +0800215 codecService.unregisterCodec(DropPolicy.class);
216 codecService.unregisterCodec(RedirectPolicy.class);
217 codecService.unregisterCodec(TrafficMatch.class);
pierventre30368ab2021-02-24 23:23:22 +0100218 policies.removeListener(mapPolListener);
219 policies.destroy();
220 policiesMap.clear();
221 trafficMatches.removeListener(mapTMatchListener);
222 trafficMatches.destroy();
223 trafficMatchesMap.clear();
224 operations.removeListener(mapOpsListener);
225 operations.destroy();
226 operations.clear();
227 workers.shutdown();
228
229 log.info("Stopped");
230 }
231
232 @Override
233 //FIXME update does not work well
234 public PolicyId addOrUpdatePolicy(Policy policy) {
235 PolicyId policyId = policy.policyId();
236 try {
237 policies.put(policyId, new PolicyRequest(policy));
238 } catch (StorageException e) {
239 log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
240 e.getMessage(), e);
241 policyId = null;
242 }
243 return policyId;
244 }
245
246 @Override
247 public boolean removePolicy(PolicyId policyId) {
248 boolean result;
pierventre4f68ffa2021-03-09 22:52:14 +0100249 if (dependingTrafficMatches(policyId).isPresent()) {
250 if (log.isDebugEnabled()) {
251 log.debug("Found depending traffic matches");
252 }
253 return false;
254 }
pierventre30368ab2021-02-24 23:23:22 +0100255 try {
256 result = Versioned.valueOrNull(policies.computeIfPresent(policyId, (k, v) -> {
257 if (v.policyState() != PolicyState.PENDING_REMOVE) {
258 v.policyState(PolicyState.PENDING_REMOVE);
259 }
260 return v;
261 })) != null;
262 } catch (StorageException e) {
263 log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
264 e.getMessage(), e);
265 result = false;
266 }
267 return result;
268 }
269
270 @Override
271 public Set<PolicyData> policies(Set<PolicyType> filter) {
272 Set<PolicyData> policyData = Sets.newHashSet();
273 List<DeviceId> edgeDeviceIds = srService.getEdgeDeviceIds();
274 Set<PolicyRequest> policyRequests;
275 if (filter.isEmpty()) {
276 policyRequests = ImmutableSet.copyOf(policiesMap.values());
277 } else {
278 policyRequests = policiesMap.values().stream()
279 .filter(policyRequest -> filter.contains(policyRequest.policyType()))
280 .collect(Collectors.toSet());
281 }
282 PolicyKey policyKey;
283 List<String> ops;
284 for (PolicyRequest policyRequest : policyRequests) {
285 ops = Lists.newArrayList();
286 for (DeviceId deviceId : edgeDeviceIds) {
287 policyKey = new PolicyKey(deviceId, policyRequest.policyId());
288 Operation operation = Versioned.valueOrNull(operations.get(policyKey.toString()));
289 if (operation != null) {
290 ops.add(deviceId + " -> " + operation.toStringMinimal());
291 }
292 }
293 policyData.add(new PolicyData(policyRequest.policyState(), policyRequest.policy(), ops));
294 }
295 return policyData;
296 }
297
298 @Override
299 //FIXME update does not work well
300 public TrafficMatchId addOrUpdateTrafficMatch(TrafficMatch trafficMatch) {
301 TrafficMatchId trafficMatchId = trafficMatch.trafficMatchId();
302 try {
303 trafficMatches.put(trafficMatchId, new TrafficMatchRequest(trafficMatch));
304 } catch (StorageException e) {
305 log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
306 e.getMessage(), e);
307 trafficMatchId = null;
308 }
309 return trafficMatchId;
310 }
311
312 @Override
313 public boolean removeTrafficMatch(TrafficMatchId trafficMatchId) {
314 boolean result;
315 try {
316 result = Versioned.valueOrNull(trafficMatches.computeIfPresent(trafficMatchId, (k, v) -> {
317 if (v.trafficMatchState() != TrafficMatchState.PENDING_REMOVE) {
318 v.trafficMatchState(TrafficMatchState.PENDING_REMOVE);
319 }
320 return v;
321 })) != null;
322 } catch (StorageException e) {
323 log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
324 e.getMessage(), e);
325 result = false;
326 }
327 return result;
328 }
329
330 @Override
331 public Set<TrafficMatchData> trafficMatches() {
332 Set<TrafficMatchData> trafficMatchData = Sets.newHashSet();
333 List<DeviceId> edgeDeviceIds = srService.getEdgeDeviceIds();
334 Set<TrafficMatchRequest> trafficMatchRequests = ImmutableSet.copyOf(trafficMatchesMap.values());
335 TrafficMatchKey trafficMatchKey;
336 List<String> ops;
337 for (TrafficMatchRequest trafficMatchRequest : trafficMatchRequests) {
338 ops = Lists.newArrayList();
339 for (DeviceId deviceId : edgeDeviceIds) {
340 trafficMatchKey = new TrafficMatchKey(deviceId, trafficMatchRequest.trafficMatch().trafficMatchId());
341 Operation operation = Versioned.valueOrNull(operations.get(trafficMatchKey.toString()));
342 if (operation != null) {
343 ops.add(deviceId + " -> " + operation.toStringMinimal());
344 }
345 }
346 trafficMatchData.add(new TrafficMatchData(trafficMatchRequest.trafficMatchState(),
347 trafficMatchRequest.trafficMatch(), ops));
348 }
349 return trafficMatchData;
350 }
351
352 // Install/remove the policies on the edge devices
353 private void sendPolicy(Policy policy, boolean install) {
354 if (!isLeader(policy.policyId())) {
355 if (log.isDebugEnabled()) {
356 log.debug("Instance is not leader for policy {}", policy.policyId());
357 }
358 return;
359 }
360 // We know that we are the leader, offloads to the workers the remaining
361 // part: issue fobj installation/removal and update the maps
362 List<DeviceId> edgeDeviceIds = srService.getEdgeDeviceIds();
363 for (DeviceId deviceId : edgeDeviceIds) {
364 workers.execute(() -> {
365 if (install) {
366 installPolicyInDevice(deviceId, policy);
367 } else {
368 removePolicyInDevice(deviceId, policy);
369 }
370 }, deviceId.hashCode());
371 }
372 }
373
374 // Orchestrate policy installation according to the type
375 private void installPolicyInDevice(DeviceId deviceId, Policy policy) {
pierventre4f68ffa2021-03-09 22:52:14 +0100376 if (!SUPPORTED_POLICIES.contains(policy.policyType())) {
pierventre30368ab2021-02-24 23:23:22 +0100377 log.warn("Policy {} type {} not yet supported",
378 policy.policyId(), policy.policyType());
pierventre4f68ffa2021-03-09 22:52:14 +0100379 return;
380 }
381 PolicyKey policyKey;
382 Operation.Builder operation;
383 if (log.isDebugEnabled()) {
384 log.debug("Installing {} policy {} for dev: {}",
385 policy.policyType(), policy.policyId(), deviceId);
386 }
387 policyKey = new PolicyKey(deviceId, policy.policyId());
388 operation = Operation.builder()
389 .isInstall(true)
390 .policy(policy);
391 // TODO To better handle different policy types consider the abstraction of a compiler (subtypes ?)
392 if (policy.policyType() == PolicyType.DROP) {
393 // DROP policies do not need the next objective installation phase
394 // we can update directly the map and signal the ops as done
395 operation.isDone(true);
396 operations.put(policyKey.toString(), operation.build());
397 } else if (policy.policyType() == PolicyType.REDIRECT) {
398 // REDIRECT Uses next objective context to update the ops as done when
399 // it returns successfully. In the other cases leaves the ops as undone
400 // and the relative policy will remain in pending.
401 operations.put(policyKey.toString(), operation.build());
402 NextObjective.Builder builder = redirectPolicyNextObjective(deviceId, (RedirectPolicy) policy);
403 // Handle error here - leave the operation as undone and pending
404 if (builder != null) {
405 CompletableFuture<Objective> future = new CompletableFuture<>();
406 if (log.isDebugEnabled()) {
407 log.debug("Installing REDIRECT next objective for dev: {}", deviceId);
408 }
409 ObjectiveContext context = new DefaultObjectiveContext(
410 (objective) -> {
411 if (log.isDebugEnabled()) {
412 log.debug("REDIRECT next objective for policy {} installed in dev: {}",
413 policy.policyId(), deviceId);
414 }
415 future.complete(objective);
416 },
417 (objective, error) -> {
418 log.warn("Failed to install REDIRECT next objective for policy {}: {} in dev: {}",
419 policy.policyId(), error, deviceId);
420 future.complete(null);
421 });
422 // Context is not serializable
423 NextObjective serializableObjective = builder.add();
424 flowObjectiveService.next(deviceId, builder.add(context));
425 future.whenComplete((objective, ex) -> {
426 if (ex != null) {
427 log.error("Exception installing REDIRECT next objective", ex);
428 } else if (objective != null) {
429 operations.computeIfPresent(policyKey.toString(), (k, v) -> {
430 if (!v.isDone() && v.isInstall()) {
431 v.isDone(true);
432 v.objectiveOperation(serializableObjective);
433 }
434 return v;
435 });
436 }
437 });
438 }
pierventre30368ab2021-02-24 23:23:22 +0100439 }
440 }
441
442 // Remove policy in a device according to the type
443 private void removePolicyInDevice(DeviceId deviceId, Policy policy) {
pierventre30368ab2021-02-24 23:23:22 +0100444 PolicyKey policyKey = new PolicyKey(deviceId, policy.policyId());
445 Operation operation = Versioned.valueOrNull(operations.get(policyKey.toString()));
446 // Policy might be still in pending or not present anymore
447 if (operation == null || operation.objectiveOperation() == null) {
448 log.warn("There are no ops associated with {}", policyKey);
449 operation = Operation.builder()
450 .isDone(true)
451 .isInstall(false)
452 .policy(policy)
453 .build();
454 operations.put(policyKey.toString(), operation);
455 } else {
pierventre4f68ffa2021-03-09 22:52:14 +0100456 if (log.isDebugEnabled()) {
457 log.debug("Removing {} policy {} in device {}", policy.policyType(), policy.policyId(), deviceId);
458 }
459 Operation.Builder operationBuilder = Operation.builder()
460 .isInstall(false)
461 .policy(policy);
pierventre30368ab2021-02-24 23:23:22 +0100462 if (policy.policyType() == PolicyType.DROP) {
pierventre4f68ffa2021-03-09 22:52:14 +0100463 operationBuilder.isDone(true);
464 operations.put(policyKey.toString(), operationBuilder.build());
pierventre30368ab2021-02-24 23:23:22 +0100465 } else if (policy.policyType() == PolicyType.REDIRECT) {
pierventre4f68ffa2021-03-09 22:52:14 +0100466 // REDIRECT has to remove the next objective first
467 NextObjective oldObj = (NextObjective) operation.objectiveOperation();
468 operations.put(policyKey.toString(), operationBuilder.build());
469 NextObjective.Builder builder = oldObj.copy();
470 CompletableFuture<Objective> future = new CompletableFuture<>();
pierventre30368ab2021-02-24 23:23:22 +0100471 if (log.isDebugEnabled()) {
pierventre4f68ffa2021-03-09 22:52:14 +0100472 log.debug("Removing REDIRECT next objective for dev: {}", deviceId);
pierventre30368ab2021-02-24 23:23:22 +0100473 }
pierventre4f68ffa2021-03-09 22:52:14 +0100474 ObjectiveContext context = new DefaultObjectiveContext(
475 (objective) -> {
476 if (log.isDebugEnabled()) {
477 log.debug("REDIRECT next objective for policy {} removed in dev: {}",
478 policy.policyId(), deviceId);
479 }
480 future.complete(objective);
481 },
482 (objective, error) -> {
483 log.warn("Failed to remove REDIRECT next objective for policy {}: {} in dev: {}",
484 policy.policyId(), error, deviceId);
485 future.complete(null);
486 });
487 NextObjective serializableObjective = builder.remove();
488 flowObjectiveService.next(deviceId, builder.remove(context));
489 future.whenComplete((objective, ex) -> {
490 if (ex != null) {
491 log.error("Exception Removing REDIRECT next objective", ex);
492 } else if (objective != null) {
493 operations.computeIfPresent(policyKey.toString(), (k, v) -> {
494 if (!v.isDone() && !v.isInstall()) {
495 v.isDone(true);
496 v.objectiveOperation(serializableObjective);
497 }
498 return v;
499 });
500 }
501 });
pierventre30368ab2021-02-24 23:23:22 +0100502 }
503 }
504 }
505
506 // Updates policy status if all the pending ops are done
507 private void updatePolicy(PolicyId policyId, boolean install) {
508 if (!isLeader(policyId)) {
509 if (log.isDebugEnabled()) {
510 log.debug("Instance is not leader for policy {}", policyId);
511 }
512 return;
513 }
514 workers.execute(() -> updatePolicyInternal(policyId, install), policyId.hashCode());
515 }
516
517 private void updatePolicyInternal(PolicyId policyId, boolean install) {
518 // If there are no more pending ops we are ready to go; potentially we can check
519 // if the id is contained. Updates policies only if they are still present
520 Optional<Map.Entry<String, Versioned<Operation>>> notYetDone = operations.entrySet().stream()
521 .filter(entry -> entry.getValue().value().policy().isPresent())
522 .filter(entry -> PolicyKey.fromString(entry.getKey()).policyId().equals(policyId))
523 .filter(entry -> !entry.getValue().value().isDone() && entry.getValue().value().isInstall() == install)
524 .findFirst();
525 if (notYetDone.isEmpty()) {
526 PolicyRequest policyRequest = Versioned.valueOrNull(policies.computeIfPresent(policyId, (k, v) -> {
527 if (v.policyState() == PolicyState.PENDING_ADD && install) {
528 if (log.isDebugEnabled()) {
529 log.debug("Policy {} is ready", policyId);
530 }
531 v.policyState(PolicyState.ADDED);
532 } else if (v.policyState() == PolicyState.PENDING_REMOVE && !install) {
533 if (log.isDebugEnabled()) {
534 log.debug("Policy {} is removed", policyId);
535 }
536 v = null;
537 }
538 return v;
539 }));
540 // Greedy check for pending traffic matches
541 if (policyRequest != null && policyRequest.policyState() == PolicyState.ADDED) {
542 updatePendingTrafficMatches(policyRequest.policyId());
543 }
544 }
545 }
546
547 // Install/remove the traffic match on the edge devices
548 private void sendTrafficMatch(TrafficMatch trafficMatch, boolean install) {
549 if (!isLeader(trafficMatch.policyId())) {
550 if (log.isDebugEnabled()) {
551 log.debug("Instance is not leader for policy {}", trafficMatch.policyId());
552 }
553 return;
554 }
555 // We know that we are the leader, offloads to the workers the remaining
556 // part: issue fobj installation/removal and update the maps
557 List<DeviceId> edgeDeviceIds = srService.getEdgeDeviceIds();
558 for (DeviceId deviceId : edgeDeviceIds) {
559 workers.execute(() -> {
560 if (install) {
561 installTrafficMatchToDevice(deviceId, trafficMatch);
562 } else {
563 removeTrafficMatchInDevice(deviceId, trafficMatch);
564 }
565 }, deviceId.hashCode());
566 }
567 }
568
569 // Orchestrate traffic match installation according to the type
570 private void installTrafficMatchToDevice(DeviceId deviceId, TrafficMatch trafficMatch) {
571 if (log.isDebugEnabled()) {
572 log.debug("Installing traffic match {} associated to policy {}",
573 trafficMatch.trafficMatchId(), trafficMatch.policyId());
574 }
pierventre30368ab2021-02-24 23:23:22 +0100575 TrafficMatchKey trafficMatchKey = new TrafficMatchKey(deviceId, trafficMatch.trafficMatchId());
pierventre4f68ffa2021-03-09 22:52:14 +0100576 Operation trafficOperation = Versioned.valueOrNull(operations.get(trafficMatchKey.toString()));
577 if (trafficOperation != null && trafficOperation.isInstall()) {
578 if (log.isDebugEnabled()) {
579 log.debug("There is already an install operation for traffic match {} associated to policy {} " +
580 "for device {}", trafficMatch.trafficMatchId(), trafficMatch.policyId(), deviceId);
581 }
582 return;
583 }
pierventre30368ab2021-02-24 23:23:22 +0100584 // For the DROP policy we need to set an ACL drop in the fwd objective. The other
585 // policies require to retrieve the next Id and sets the next step.
586 PolicyKey policyKey = new PolicyKey(deviceId, trafficMatch.policyId());
587 Operation policyOperation = Versioned.valueOrNull(operations.get(policyKey.toString()));
588 if (policyOperation == null || !policyOperation.isDone() ||
pierventre4f68ffa2021-03-09 22:52:14 +0100589 !policyOperation.isInstall() || policyOperation.policy().isEmpty() ||
590 (policyOperation.policy().get().policyType() == PolicyType.REDIRECT &&
591 policyOperation.objectiveOperation() == null)) {
pierventre30368ab2021-02-24 23:23:22 +0100592 log.info("Deferring traffic match {} installation on device {}. Policy {} not yet installed",
593 trafficMatch.trafficMatchId(), deviceId, trafficMatch.policyId());
594 return;
595 }
pierventre4f68ffa2021-03-09 22:52:14 +0100596 // Updates the store and then send the versatile fwd objective to the pipeliner
597 trafficOperation = Operation.builder()
598 .isInstall(true)
599 .trafficMatch(trafficMatch)
600 .build();
601 operations.put(trafficMatchKey.toString(), trafficOperation);
pierventre30368ab2021-02-24 23:23:22 +0100602 Policy policy = policyOperation.policy().get();
pierventre4f68ffa2021-03-09 22:52:14 +0100603 ForwardingObjective.Builder builder = trafficMatchFwdObjective(trafficMatch, policy.policyType());
pierventre30368ab2021-02-24 23:23:22 +0100604 if (policy.policyType() == PolicyType.DROP) {
pierventre4f68ffa2021-03-09 22:52:14 +0100605 // Firstly builds the fwd objective with the wipeDeferred action.
pierventre30368ab2021-02-24 23:23:22 +0100606 TrafficTreatment dropTreatment = DefaultTrafficTreatment.builder()
607 .wipeDeferred()
608 .build();
609 builder.withTreatment(dropTreatment);
pierventre4f68ffa2021-03-09 22:52:14 +0100610 } else if (policy.policyType() == PolicyType.REDIRECT) {
611
612 // Here we need to set only the next step
613 builder.nextStep(policyOperation.objectiveOperation().id());
pierventre30368ab2021-02-24 23:23:22 +0100614 }
pierventre4f68ffa2021-03-09 22:52:14 +0100615 // Once, the fwd objective has completed its execution, we update the policiesOps map
616 CompletableFuture<Objective> future = new CompletableFuture<>();
617 if (log.isDebugEnabled()) {
618 log.debug("Installing forwarding objective for dev: {}", deviceId);
619 }
620 ObjectiveContext context = new DefaultObjectiveContext(
621 (objective) -> {
622 if (log.isDebugEnabled()) {
623 log.debug("Forwarding objective for policy {} installed", trafficMatch.policyId());
624 }
625 future.complete(objective);
626 },
627 (objective, error) -> {
628 log.warn("Failed to install forwarding objective for policy {}: {}",
629 trafficMatch.policyId(), error);
630 future.complete(null);
631 });
632 // Context is not serializable
633 ForwardingObjective serializableObjective = builder.add();
634 flowObjectiveService.forward(deviceId, builder.add(context));
635 future.whenComplete((objective, ex) -> {
636 if (ex != null) {
637 log.error("Exception installing forwarding objective", ex);
638 } else if (objective != null) {
639 operations.computeIfPresent(trafficMatchKey.toString(), (k, v) -> {
640 if (!v.isDone() && v.isInstall()) {
641 v.isDone(true);
642 v.objectiveOperation(serializableObjective);
643 }
644 return v;
645 });
646 }
647 });
pierventre30368ab2021-02-24 23:23:22 +0100648 }
649
650 // Updates traffic match status if all the pending ops are done
651 private void updateTrafficMatch(TrafficMatch trafficMatch, boolean install) {
652 if (!isLeader(trafficMatch.policyId())) {
653 if (log.isDebugEnabled()) {
654 log.debug("Instance is not leader for policy {}", trafficMatch.policyId());
655 }
656 return;
657 }
658 workers.execute(() -> updateTrafficMatchInternal(trafficMatch.trafficMatchId(), install),
659 trafficMatch.policyId().hashCode());
660 }
661
662 private void updateTrafficMatchInternal(TrafficMatchId trafficMatchId, boolean install) {
663 // If there are no more pending ops we are ready to go; potentially we can check
664 // if the id is contained. Updates traffic matches only if they are still present
665 Optional<Map.Entry<String, Versioned<Operation>>> notYetDone = operations.entrySet().stream()
666 .filter(entry -> entry.getValue().value().trafficMatch().isPresent())
667 .filter(entry -> TrafficMatchKey.fromString(entry.getKey()).trafficMatchId().equals(trafficMatchId))
668 .filter(entry -> !entry.getValue().value().isDone() && entry.getValue().value().isInstall() == install)
669 .findFirst();
670 if (notYetDone.isEmpty()) {
671 trafficMatches.computeIfPresent(trafficMatchId, (k, v) -> {
672 if (v.trafficMatchState() == TrafficMatchState.PENDING_ADD && install) {
673 if (log.isDebugEnabled()) {
674 log.debug("Traffic match {} is ready", trafficMatchId);
675 }
676 v.trafficMatchState(TrafficMatchState.ADDED);
677 } else if (v.trafficMatchState() == TrafficMatchState.PENDING_REMOVE && !install) {
678 if (log.isDebugEnabled()) {
679 log.debug("Traffic match {} is removed", trafficMatchId);
680 }
681 v = null;
682 }
683 return v;
684 });
685 }
686 }
687
688 // Look for any pending traffic match waiting for the policy
689 private void updatePendingTrafficMatches(PolicyId policyId) {
690 Set<TrafficMatchRequest> pendingTrafficMatches = trafficMatches.stream()
691 .filter(trafficMatchEntry -> trafficMatchEntry.getValue().value().policyId().equals(policyId) &&
692 trafficMatchEntry.getValue().value().trafficMatchState() == TrafficMatchState.PENDING_ADD)
693 .map(trafficMatchEntry -> trafficMatchEntry.getValue().value())
694 .collect(Collectors.toSet());
695 for (TrafficMatchRequest trafficMatch : pendingTrafficMatches) {
696 sendTrafficMatch(trafficMatch.trafficMatch(), true);
697 }
698 }
699
700 // Traffic match removal in a device
701 private void removeTrafficMatchInDevice(DeviceId deviceId, TrafficMatch trafficMatch) {
702 if (log.isDebugEnabled()) {
703 log.debug("Removing traffic match {} associated to policy {}",
704 trafficMatch.trafficMatchId(), trafficMatch.policyId());
705 }
706 TrafficMatchKey trafficMatchKey = new TrafficMatchKey(deviceId, trafficMatch.trafficMatchId());
707 Operation operation = Versioned.valueOrNull(operations.get(trafficMatchKey.toString()));
708 if (operation == null || operation.objectiveOperation() == null) {
709 log.warn("There are no ops associated with {}", trafficMatchKey);
710 operation = Operation.builder()
711 .isDone(true)
712 .isInstall(false)
713 .trafficMatch(trafficMatch)
714 .build();
715 operations.put(trafficMatchKey.toString(), operation);
pierventre4f68ffa2021-03-09 22:52:14 +0100716 } else if (!operation.isInstall()) {
717 if (log.isDebugEnabled()) {
718 log.debug("There is already an uninstall operation for traffic match {} associated to policy {}" +
719 " for device {}", trafficMatch.trafficMatchId(), trafficMatch.policyId(), deviceId);
720 }
pierventre30368ab2021-02-24 23:23:22 +0100721 } else {
722 ForwardingObjective oldObj = (ForwardingObjective) operation.objectiveOperation();
723 operation = Operation.builder(operation)
pierventre30368ab2021-02-24 23:23:22 +0100724 .isInstall(false)
725 .build();
726 operations.put(trafficMatchKey.toString(), operation);
727 ForwardingObjective.Builder builder = DefaultForwardingObjective.builder(oldObj);
728 CompletableFuture<Objective> future = new CompletableFuture<>();
729 if (log.isDebugEnabled()) {
pierventre4f68ffa2021-03-09 22:52:14 +0100730 log.debug("Removing forwarding objectives for dev: {}", deviceId);
pierventre30368ab2021-02-24 23:23:22 +0100731 }
732 ObjectiveContext context = new DefaultObjectiveContext(
733 (objective) -> {
734 if (log.isDebugEnabled()) {
pierventre4f68ffa2021-03-09 22:52:14 +0100735 log.debug("Forwarding objective for policy {} removed", trafficMatch.policyId());
pierventre30368ab2021-02-24 23:23:22 +0100736 }
737 future.complete(objective);
738 },
739 (objective, error) -> {
pierventre4f68ffa2021-03-09 22:52:14 +0100740 log.warn("Failed to remove forwarding objective for policy {}: {}",
741 trafficMatch.policyId(), error);
pierventre30368ab2021-02-24 23:23:22 +0100742 future.complete(null);
743 });
744 ForwardingObjective serializableObjective = builder.remove();
745 flowObjectiveService.forward(deviceId, builder.remove(context));
746 future.whenComplete((objective, ex) -> {
747 if (ex != null) {
pierventre4f68ffa2021-03-09 22:52:14 +0100748 log.error("Exception removing forwarding objective", ex);
pierventre30368ab2021-02-24 23:23:22 +0100749 } else if (objective != null) {
750 operations.computeIfPresent(trafficMatchKey.toString(), (k, v) -> {
751 if (!v.isDone() && !v.isInstall()) {
752 v.isDone(true);
753 v.objectiveOperation(serializableObjective);
754 }
755 return v;
756 });
757 }
758 });
759 }
760 }
761
pierventre4f68ffa2021-03-09 22:52:14 +0100762 // It is used when a policy has been removed but there are still traffic matches depending on it
763 private Optional<TrafficMatchRequest> dependingTrafficMatches(PolicyId policyId) {
764 return trafficMatches.stream()
pierventre30368ab2021-02-24 23:23:22 +0100765 .filter(trafficMatchEntry -> trafficMatchEntry.getValue().value().policyId().equals(policyId) &&
766 trafficMatchEntry.getValue().value().trafficMatchState() == TrafficMatchState.ADDED)
767 .map(trafficMatchEntry -> trafficMatchEntry.getValue().value())
pierventre4f68ffa2021-03-09 22:52:14 +0100768 .findFirst();
pierventre30368ab2021-02-24 23:23:22 +0100769 }
770
771 // Utility that removes operations related to a policy or to a traffic match.
772 private void removeOperations(PolicyId policyId, Optional<TrafficMatchId> trafficMatchId) {
773 if (!isLeader(policyId)) {
774 if (log.isDebugEnabled()) {
775 log.debug("Instance is not leader for policy {}", policyId);
776 }
777 return;
778 }
779 List<DeviceId> edgeDeviceIds = srService.getEdgeDeviceIds();
780 for (DeviceId deviceId : edgeDeviceIds) {
781 workers.execute(() -> {
782 String key;
783 if (trafficMatchId.isPresent()) {
784 key = new TrafficMatchKey(deviceId, trafficMatchId.get()).toString();
785 } else {
786 key = new PolicyKey(deviceId, policyId).toString();
787 }
788 operations.remove(key);
789 }, deviceId.hashCode());
790 }
791 }
792
pierventre4f68ffa2021-03-09 22:52:14 +0100793 private ForwardingObjective.Builder trafficMatchFwdObjective(TrafficMatch trafficMatch, PolicyType policyType) {
794 TrafficSelector.Builder metaBuilder = DefaultTrafficSelector.builder(trafficMatch.trafficSelector());
795 if (policyType == PolicyType.REDIRECT) {
796 metaBuilder.matchMetadata(EDGE_PORT);
797 }
pierventre30368ab2021-02-24 23:23:22 +0100798 return DefaultForwardingObjective.builder()
Wailok Shum37dd29a2021-04-27 18:13:55 +0800799 .withPriority(trafficMatch.trafficMatchPriority().priority())
pierventre30368ab2021-02-24 23:23:22 +0100800 .withSelector(trafficMatch.trafficSelector())
pierventre4f68ffa2021-03-09 22:52:14 +0100801 .withMeta(metaBuilder.build())
pierventre30368ab2021-02-24 23:23:22 +0100802 .fromApp(appId)
803 .withFlag(ForwardingObjective.Flag.VERSATILE)
804 .makePermanent();
805 }
806
pierventre4f68ffa2021-03-09 22:52:14 +0100807 private NextObjective.Builder redirectPolicyNextObjective(DeviceId srcDevice, RedirectPolicy redirectPolicy) {
808 Set<Link> egressLinks = linkService.getDeviceEgressLinks(srcDevice);
809 Map<ConnectPoint, DeviceId> egressPortsToEnforce = Maps.newHashMap();
810 List<DeviceId> edgeDevices = srService.getEdgeDeviceIds();
811 egressLinks.stream()
812 .filter(link -> redirectPolicy.spinesToEnforce().contains(link.dst().deviceId()) &&
813 !edgeDevices.contains(link.dst().deviceId()))
814 .forEach(link -> egressPortsToEnforce.put(link.src(), link.dst().deviceId()));
815 // No ports no friend
816 if (egressPortsToEnforce.isEmpty()) {
817 log.warn("There are no port available for the REDIRECT policy {}", redirectPolicy.policyId());
818 return null;
819 }
820 // We need to add a treatment for each valid egress port. The treatment
821 // requires to set src and dst mac address and set the egress port. We are
822 // deliberately not providing the metadata to prevent the programming of
823 // some tables which are already controlled by SegmentRouting or are unnecessary
824 int nextId = flowObjectiveService.allocateNextId();
825 DefaultNextObjective.Builder builder = DefaultNextObjective.builder()
826 .withId(nextId)
827 .withType(NextObjective.Type.HASHED)
828 .fromApp(appId);
829 MacAddress srcDeviceMac;
830 try {
831 srcDeviceMac = srService.getDeviceMacAddress(srcDevice);
832 } catch (DeviceConfigNotFoundException e) {
833 log.warn(e.getMessage() + " Aborting installation REDIRECT policy {}", redirectPolicy.policyId());
834 return null;
835 }
836 MacAddress neigborDeviceMac;
837 TrafficTreatment.Builder tBuilder;
838 for (Map.Entry<ConnectPoint, DeviceId> entry : egressPortsToEnforce.entrySet()) {
839 try {
840 neigborDeviceMac = srService.getDeviceMacAddress(entry.getValue());
841 } catch (DeviceConfigNotFoundException e) {
842 log.warn(e.getMessage() + " Aborting installation REDIRECT policy {}", redirectPolicy.policyId());
843 return null;
844 }
845 tBuilder = DefaultTrafficTreatment.builder()
846 .setEthSrc(srcDeviceMac)
847 .setEthDst(neigborDeviceMac)
848 .setOutput(entry.getKey().port());
849 builder.addTreatment(tBuilder.build());
850 }
851 return builder;
852 }
853
pierventre30368ab2021-02-24 23:23:22 +0100854 // Each map has an event listener enabling the events distribution across the cluster
855 private class InternalPolMapEventListener implements MapEventListener<PolicyId, PolicyRequest> {
856 @Override
857 public void event(MapEvent<PolicyId, PolicyRequest> event) {
858 Versioned<PolicyRequest> value = event.type() == MapEvent.Type.REMOVE ?
859 event.oldValue() : event.newValue();
860 PolicyRequest policyRequest = value.value();
861 Policy policy = policyRequest.policy();
862 switch (event.type()) {
863 case INSERT:
864 case UPDATE:
865 switch (policyRequest.policyState()) {
866 case PENDING_ADD:
867 sendPolicy(policy, true);
868 break;
869 case PENDING_REMOVE:
870 sendPolicy(policy, false);
871 break;
872 case ADDED:
873 break;
874 default:
875 log.warn("Unknown policy state type {}", policyRequest.policyState());
876 }
877 break;
878 case REMOVE:
879 removeOperations(policy.policyId(), Optional.empty());
pierventre30368ab2021-02-24 23:23:22 +0100880 break;
881 default:
882 log.warn("Unknown event type {}", event.type());
883
884 }
885 }
886 }
887
888 private class InternalTMatchMapEventListener implements MapEventListener<TrafficMatchId, TrafficMatchRequest> {
889 @Override
890 public void event(MapEvent<TrafficMatchId, TrafficMatchRequest> event) {
891 Versioned<TrafficMatchRequest> value = event.type() == MapEvent.Type.REMOVE ?
892 event.oldValue() : event.newValue();
893 TrafficMatchRequest trafficMatchRequest = value.value();
894 TrafficMatch trafficMatch = trafficMatchRequest.trafficMatch();
895 switch (event.type()) {
896 case INSERT:
897 case UPDATE:
898 switch (trafficMatchRequest.trafficMatchState()) {
899 case PENDING_ADD:
900 sendTrafficMatch(trafficMatch, true);
901 break;
902 case PENDING_REMOVE:
903 sendTrafficMatch(trafficMatch, false);
904 break;
905 case ADDED:
906 break;
907 default:
908 log.warn("Unknown traffic match state type {}", trafficMatchRequest.trafficMatchState());
909 }
910 break;
911 case REMOVE:
912 removeOperations(trafficMatch.policyId(), Optional.of(trafficMatch.trafficMatchId()));
913 break;
914 default:
915 log.warn("Unknown event type {}", event.type());
916 }
917 }
918 }
919
920 private class InternalOpsMapEventListener implements MapEventListener<String, Operation> {
921 @Override
922 public void event(MapEvent<String, Operation> event) {
923 String key = event.key();
924 Versioned<Operation> value = event.type() == MapEvent.Type.REMOVE ?
925 event.oldValue() : event.newValue();
926 Operation operation = value.value();
927 switch (event.type()) {
928 case INSERT:
929 case UPDATE:
930 if (operation.isDone()) {
931 if (operation.policy().isPresent()) {
932 PolicyKey policyKey = PolicyKey.fromString(key);
933 updatePolicy(policyKey.policyId(), operation.isInstall());
934 } else if (operation.trafficMatch().isPresent()) {
935 updateTrafficMatch(operation.trafficMatch().get(), operation.isInstall());
936 } else {
937 log.warn("Unknown pending operation");
938 }
939 }
940 break;
941 case REMOVE:
942 break;
943 default:
944 log.warn("Unknown event type {}", event.type());
945 }
946 }
947 }
948
949 // Using the work partition service defines who is in charge of a given policy.
950 private boolean isLeader(PolicyId policyId) {
951 final NodeId currentNodeId = clusterService.getLocalNode().id();
952 final NodeId leader = workPartitionService.getLeader(policyId, this::hasher);
953 if (leader == null) {
954 log.error("Fail to elect a leader for {}.", policyId);
955 return false;
956 }
957 policyLeaderCache.put(policyId, leader);
958 return currentNodeId.equals(leader);
959 }
960
961 private Long hasher(PolicyId policyId) {
962 return HASH_FN.newHasher()
963 .putUnencodedChars(policyId.toString())
964 .hash()
965 .asLong();
966 }
967
pierventre4f68ffa2021-03-09 22:52:14 +0100968 // TODO Periodic checker, consider to add store and delegates.
969
pierventre30368ab2021-02-24 23:23:22 +0100970 // Check periodically for any issue and try to resolve automatically if possible
971 private final class PolicyChecker implements Runnable {
972 @Override
973 public void run() {
974 }
975 }
976}