blob: 8f7c69c4edb52ab8e0f38af0a2ad60638496b91f [file] [log] [blame]
pierventre30368ab2021-02-24 23:23:22 +01001/*
2 * Copyright 2021-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.segmentrouting.policy.impl;
17
18import com.google.common.collect.ImmutableSet;
19import com.google.common.collect.Lists;
20import com.google.common.collect.Maps;
21import com.google.common.hash.HashFunction;
22import com.google.common.hash.Hashing;
23import org.glassfish.jersey.internal.guava.Sets;
pierventre4f68ffa2021-03-09 22:52:14 +010024import org.onlab.packet.MacAddress;
pierventre30368ab2021-02-24 23:23:22 +010025import org.onlab.util.KryoNamespace;
26import org.onlab.util.PredictableExecutor;
27import org.onosproject.cluster.ClusterService;
28import org.onosproject.cluster.NodeId;
29import org.onosproject.core.ApplicationId;
30import org.onosproject.core.CoreService;
pierventre4f68ffa2021-03-09 22:52:14 +010031import org.onosproject.net.ConnectPoint;
pierventre30368ab2021-02-24 23:23:22 +010032import org.onosproject.net.DeviceId;
pierventre4f68ffa2021-03-09 22:52:14 +010033import org.onosproject.net.Link;
34import org.onosproject.net.flow.DefaultTrafficSelector;
pierventre30368ab2021-02-24 23:23:22 +010035import org.onosproject.net.flow.DefaultTrafficTreatment;
pierventre4f68ffa2021-03-09 22:52:14 +010036import org.onosproject.net.flow.TrafficSelector;
pierventre30368ab2021-02-24 23:23:22 +010037import org.onosproject.net.flow.TrafficTreatment;
38import org.onosproject.net.flowobjective.DefaultForwardingObjective;
pierventre4f68ffa2021-03-09 22:52:14 +010039import org.onosproject.net.flowobjective.DefaultNextObjective;
pierventre30368ab2021-02-24 23:23:22 +010040import org.onosproject.net.flowobjective.DefaultObjectiveContext;
41import org.onosproject.net.flowobjective.FlowObjectiveService;
42import org.onosproject.net.flowobjective.ForwardingObjective;
pierventre4f68ffa2021-03-09 22:52:14 +010043import org.onosproject.net.flowobjective.NextObjective;
pierventre30368ab2021-02-24 23:23:22 +010044import org.onosproject.net.flowobjective.Objective;
45import org.onosproject.net.flowobjective.ObjectiveContext;
46import org.onosproject.net.intent.WorkPartitionService;
pierventre4f68ffa2021-03-09 22:52:14 +010047import org.onosproject.net.link.LinkService;
pierventre30368ab2021-02-24 23:23:22 +010048import org.onosproject.segmentrouting.SegmentRoutingService;
pierventre4f68ffa2021-03-09 22:52:14 +010049import org.onosproject.segmentrouting.config.DeviceConfigNotFoundException;
pierventre30368ab2021-02-24 23:23:22 +010050import org.onosproject.segmentrouting.policy.api.DropPolicy;
51import org.onosproject.segmentrouting.policy.api.Policy;
52import org.onosproject.segmentrouting.policy.api.Policy.PolicyType;
53import org.onosproject.segmentrouting.policy.api.PolicyData;
54import org.onosproject.segmentrouting.policy.api.PolicyId;
55import org.onosproject.segmentrouting.policy.api.PolicyService;
56import org.onosproject.segmentrouting.policy.api.PolicyState;
pierventre4f68ffa2021-03-09 22:52:14 +010057import org.onosproject.segmentrouting.policy.api.RedirectPolicy;
pierventre30368ab2021-02-24 23:23:22 +010058import org.onosproject.segmentrouting.policy.api.TrafficMatch;
59import org.onosproject.segmentrouting.policy.api.TrafficMatchData;
60import org.onosproject.segmentrouting.policy.api.TrafficMatchId;
61import org.onosproject.segmentrouting.policy.api.TrafficMatchState;
62import org.onosproject.store.serializers.KryoNamespaces;
63import org.onosproject.store.service.ConsistentMap;
64import org.onosproject.store.service.MapEvent;
65import org.onosproject.store.service.MapEventListener;
66import org.onosproject.store.service.Serializer;
67import org.onosproject.store.service.StorageException;
68import org.onosproject.store.service.StorageService;
69import org.onosproject.store.service.Versioned;
70import org.osgi.service.component.annotations.Activate;
71import org.osgi.service.component.annotations.Component;
72import org.osgi.service.component.annotations.Deactivate;
73import org.osgi.service.component.annotations.Reference;
74import org.osgi.service.component.annotations.ReferenceCardinality;
75import org.slf4j.Logger;
76
77import java.util.List;
78import java.util.Map;
79import java.util.Optional;
80import java.util.Set;
81import java.util.concurrent.CompletableFuture;
82import java.util.stream.Collectors;
83
84import static org.onlab.util.Tools.groupedThreads;
85import static org.slf4j.LoggerFactory.getLogger;
86
87/**
88 * Implementation of the policy service interface.
89 */
90@Component(immediate = true, service = PolicyService.class)
91public class PolicyManager implements PolicyService {
92
93 // App related things
94 private static final String APP_NAME = "org.onosproject.segmentrouting.policy";
95 private ApplicationId appId;
96 private Logger log = getLogger(getClass());
97 static final String KEY_SEPARATOR = "|";
98
pierventre4f68ffa2021-03-09 22:52:14 +010099 // Supported policies
100 private static final Set<Policy.PolicyType> SUPPORTED_POLICIES = ImmutableSet.of(
101 PolicyType.DROP, PolicyType.REDIRECT);
102
103 // Driver should use this meta to match port_is_edge field in the ACL table
104 private static final long EDGE_PORT = 1;
105 private static final long INFRA_PORT = 0;
106
pierventre30368ab2021-02-24 23:23:22 +0100107 // Policy/TrafficMatch store related objects. We use these consistent maps to keep track of the
108 // lifecycle of a policy/traffic match. These are decomposed in multiple operations which have
109 // to be performed on multiple devices in order to have a policy/traffic match in ADDED state.
pierventre4f68ffa2021-03-09 22:52:14 +0100110 // TODO Consider to add store and delegate
pierventre30368ab2021-02-24 23:23:22 +0100111 private static final String POLICY_STORE = "sr-policy-store";
112 private ConsistentMap<PolicyId, PolicyRequest> policies;
113 private MapEventListener<PolicyId, PolicyRequest> mapPolListener = new InternalPolMapEventListener();
114 private Map<PolicyId, PolicyRequest> policiesMap;
115
116 private static final String OPS_STORE = "sr-ops-store";
117 private ConsistentMap<String, Operation> operations;
118 private MapEventListener<String, Operation> mapOpsListener = new InternalOpsMapEventListener();
119 private Map<String, Operation> opsMap;
120
121 private static final String TRAFFIC_MATCH_STORE = "sr-tmatch-store";
122 private ConsistentMap<TrafficMatchId, TrafficMatchRequest> trafficMatches;
123 private MapEventListener<TrafficMatchId, TrafficMatchRequest> mapTMatchListener =
124 new InternalTMatchMapEventListener();
125 private Map<TrafficMatchId, TrafficMatchRequest> trafficMatchesMap;
126
127 // Leadership related objects - consistent hashing
128 private static final HashFunction HASH_FN = Hashing.md5();
129 // Read only cache of the Policy leader
130 private Map<PolicyId, NodeId> policyLeaderCache;
131
132 // Worker threads for policy and traffic match related ops
133 private static final int DEFAULT_THREADS = 4;
134 protected PredictableExecutor workers;
135
136 // Serializers and ONOS services
137 private static final KryoNamespace.Builder APP_KRYO_BUILDER = KryoNamespace.newBuilder()
138 .register(KryoNamespaces.API)
139 .register(PolicyId.class)
140 .register(PolicyType.class)
141 .register(DropPolicy.class)
pierventre4f68ffa2021-03-09 22:52:14 +0100142 .register(RedirectPolicy.class)
pierventre30368ab2021-02-24 23:23:22 +0100143 .register(PolicyState.class)
144 .register(PolicyRequest.class)
145 .register(TrafficMatchId.class)
146 .register(TrafficMatchState.class)
147 .register(TrafficMatch.class)
148 .register(TrafficMatchRequest.class)
149 .register(Operation.class);
150 private Serializer serializer = Serializer.using(Lists.newArrayList(APP_KRYO_BUILDER.build()));
151
152 @Reference(cardinality = ReferenceCardinality.MANDATORY)
153 private CoreService coreService;
154
155 @Reference(cardinality = ReferenceCardinality.MANDATORY)
156 private StorageService storageService;
157
158 @Reference(cardinality = ReferenceCardinality.MANDATORY)
159 private ClusterService clusterService;
160
161 @Reference(cardinality = ReferenceCardinality.MANDATORY)
pierventre4f68ffa2021-03-09 22:52:14 +0100162 private WorkPartitionService workPartitionService;
pierventre30368ab2021-02-24 23:23:22 +0100163
164 @Reference(cardinality = ReferenceCardinality.OPTIONAL)
pierventre4f68ffa2021-03-09 22:52:14 +0100165 private SegmentRoutingService srService;
pierventre30368ab2021-02-24 23:23:22 +0100166
167 @Reference(cardinality = ReferenceCardinality.MANDATORY)
pierventre4f68ffa2021-03-09 22:52:14 +0100168 private FlowObjectiveService flowObjectiveService;
169
170 @Reference(cardinality = ReferenceCardinality.MANDATORY)
171 private LinkService linkService;
pierventre30368ab2021-02-24 23:23:22 +0100172
173 @Activate
174 public void activate() {
175 appId = coreService.registerApplication(APP_NAME);
176
177 policies = storageService.<PolicyId, PolicyRequest>consistentMapBuilder()
178 .withName(POLICY_STORE)
179 .withSerializer(serializer).build();
180 policies.addListener(mapPolListener);
181 policiesMap = policies.asJavaMap();
182
183 trafficMatches = storageService.<TrafficMatchId, TrafficMatchRequest>consistentMapBuilder()
184 .withName(TRAFFIC_MATCH_STORE)
185 .withSerializer(serializer).build();
186 trafficMatches.addListener(mapTMatchListener);
187 trafficMatchesMap = trafficMatches.asJavaMap();
188
189 operations = storageService.<String, Operation>consistentMapBuilder()
190 .withName(OPS_STORE)
191 .withSerializer(serializer).build();
192 operations.addListener(mapOpsListener);
193 opsMap = operations.asJavaMap();
194
195 policyLeaderCache = Maps.newConcurrentMap();
196
197 workers = new PredictableExecutor(DEFAULT_THREADS,
198 groupedThreads("sr-policy", "worker-%d", log));
199
200 log.info("Started");
201 }
202
203 @Deactivate
204 public void deactivate() {
205 // Teardown everything
206 policies.removeListener(mapPolListener);
207 policies.destroy();
208 policiesMap.clear();
209 trafficMatches.removeListener(mapTMatchListener);
210 trafficMatches.destroy();
211 trafficMatchesMap.clear();
212 operations.removeListener(mapOpsListener);
213 operations.destroy();
214 operations.clear();
215 workers.shutdown();
216
217 log.info("Stopped");
218 }
219
220 @Override
221 //FIXME update does not work well
222 public PolicyId addOrUpdatePolicy(Policy policy) {
223 PolicyId policyId = policy.policyId();
224 try {
225 policies.put(policyId, new PolicyRequest(policy));
226 } catch (StorageException e) {
227 log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
228 e.getMessage(), e);
229 policyId = null;
230 }
231 return policyId;
232 }
233
234 @Override
235 public boolean removePolicy(PolicyId policyId) {
236 boolean result;
pierventre4f68ffa2021-03-09 22:52:14 +0100237 if (dependingTrafficMatches(policyId).isPresent()) {
238 if (log.isDebugEnabled()) {
239 log.debug("Found depending traffic matches");
240 }
241 return false;
242 }
pierventre30368ab2021-02-24 23:23:22 +0100243 try {
244 result = Versioned.valueOrNull(policies.computeIfPresent(policyId, (k, v) -> {
245 if (v.policyState() != PolicyState.PENDING_REMOVE) {
246 v.policyState(PolicyState.PENDING_REMOVE);
247 }
248 return v;
249 })) != null;
250 } catch (StorageException e) {
251 log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
252 e.getMessage(), e);
253 result = false;
254 }
255 return result;
256 }
257
258 @Override
259 public Set<PolicyData> policies(Set<PolicyType> filter) {
260 Set<PolicyData> policyData = Sets.newHashSet();
261 List<DeviceId> edgeDeviceIds = srService.getEdgeDeviceIds();
262 Set<PolicyRequest> policyRequests;
263 if (filter.isEmpty()) {
264 policyRequests = ImmutableSet.copyOf(policiesMap.values());
265 } else {
266 policyRequests = policiesMap.values().stream()
267 .filter(policyRequest -> filter.contains(policyRequest.policyType()))
268 .collect(Collectors.toSet());
269 }
270 PolicyKey policyKey;
271 List<String> ops;
272 for (PolicyRequest policyRequest : policyRequests) {
273 ops = Lists.newArrayList();
274 for (DeviceId deviceId : edgeDeviceIds) {
275 policyKey = new PolicyKey(deviceId, policyRequest.policyId());
276 Operation operation = Versioned.valueOrNull(operations.get(policyKey.toString()));
277 if (operation != null) {
278 ops.add(deviceId + " -> " + operation.toStringMinimal());
279 }
280 }
281 policyData.add(new PolicyData(policyRequest.policyState(), policyRequest.policy(), ops));
282 }
283 return policyData;
284 }
285
286 @Override
287 //FIXME update does not work well
288 public TrafficMatchId addOrUpdateTrafficMatch(TrafficMatch trafficMatch) {
289 TrafficMatchId trafficMatchId = trafficMatch.trafficMatchId();
290 try {
291 trafficMatches.put(trafficMatchId, new TrafficMatchRequest(trafficMatch));
292 } catch (StorageException e) {
293 log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
294 e.getMessage(), e);
295 trafficMatchId = null;
296 }
297 return trafficMatchId;
298 }
299
300 @Override
301 public boolean removeTrafficMatch(TrafficMatchId trafficMatchId) {
302 boolean result;
303 try {
304 result = Versioned.valueOrNull(trafficMatches.computeIfPresent(trafficMatchId, (k, v) -> {
305 if (v.trafficMatchState() != TrafficMatchState.PENDING_REMOVE) {
306 v.trafficMatchState(TrafficMatchState.PENDING_REMOVE);
307 }
308 return v;
309 })) != null;
310 } catch (StorageException e) {
311 log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
312 e.getMessage(), e);
313 result = false;
314 }
315 return result;
316 }
317
318 @Override
319 public Set<TrafficMatchData> trafficMatches() {
320 Set<TrafficMatchData> trafficMatchData = Sets.newHashSet();
321 List<DeviceId> edgeDeviceIds = srService.getEdgeDeviceIds();
322 Set<TrafficMatchRequest> trafficMatchRequests = ImmutableSet.copyOf(trafficMatchesMap.values());
323 TrafficMatchKey trafficMatchKey;
324 List<String> ops;
325 for (TrafficMatchRequest trafficMatchRequest : trafficMatchRequests) {
326 ops = Lists.newArrayList();
327 for (DeviceId deviceId : edgeDeviceIds) {
328 trafficMatchKey = new TrafficMatchKey(deviceId, trafficMatchRequest.trafficMatch().trafficMatchId());
329 Operation operation = Versioned.valueOrNull(operations.get(trafficMatchKey.toString()));
330 if (operation != null) {
331 ops.add(deviceId + " -> " + operation.toStringMinimal());
332 }
333 }
334 trafficMatchData.add(new TrafficMatchData(trafficMatchRequest.trafficMatchState(),
335 trafficMatchRequest.trafficMatch(), ops));
336 }
337 return trafficMatchData;
338 }
339
340 // Install/remove the policies on the edge devices
341 private void sendPolicy(Policy policy, boolean install) {
342 if (!isLeader(policy.policyId())) {
343 if (log.isDebugEnabled()) {
344 log.debug("Instance is not leader for policy {}", policy.policyId());
345 }
346 return;
347 }
348 // We know that we are the leader, offloads to the workers the remaining
349 // part: issue fobj installation/removal and update the maps
350 List<DeviceId> edgeDeviceIds = srService.getEdgeDeviceIds();
351 for (DeviceId deviceId : edgeDeviceIds) {
352 workers.execute(() -> {
353 if (install) {
354 installPolicyInDevice(deviceId, policy);
355 } else {
356 removePolicyInDevice(deviceId, policy);
357 }
358 }, deviceId.hashCode());
359 }
360 }
361
362 // Orchestrate policy installation according to the type
363 private void installPolicyInDevice(DeviceId deviceId, Policy policy) {
pierventre4f68ffa2021-03-09 22:52:14 +0100364 if (!SUPPORTED_POLICIES.contains(policy.policyType())) {
pierventre30368ab2021-02-24 23:23:22 +0100365 log.warn("Policy {} type {} not yet supported",
366 policy.policyId(), policy.policyType());
pierventre4f68ffa2021-03-09 22:52:14 +0100367 return;
368 }
369 PolicyKey policyKey;
370 Operation.Builder operation;
371 if (log.isDebugEnabled()) {
372 log.debug("Installing {} policy {} for dev: {}",
373 policy.policyType(), policy.policyId(), deviceId);
374 }
375 policyKey = new PolicyKey(deviceId, policy.policyId());
376 operation = Operation.builder()
377 .isInstall(true)
378 .policy(policy);
379 // TODO To better handle different policy types consider the abstraction of a compiler (subtypes ?)
380 if (policy.policyType() == PolicyType.DROP) {
381 // DROP policies do not need the next objective installation phase
382 // we can update directly the map and signal the ops as done
383 operation.isDone(true);
384 operations.put(policyKey.toString(), operation.build());
385 } else if (policy.policyType() == PolicyType.REDIRECT) {
386 // REDIRECT Uses next objective context to update the ops as done when
387 // it returns successfully. In the other cases leaves the ops as undone
388 // and the relative policy will remain in pending.
389 operations.put(policyKey.toString(), operation.build());
390 NextObjective.Builder builder = redirectPolicyNextObjective(deviceId, (RedirectPolicy) policy);
391 // Handle error here - leave the operation as undone and pending
392 if (builder != null) {
393 CompletableFuture<Objective> future = new CompletableFuture<>();
394 if (log.isDebugEnabled()) {
395 log.debug("Installing REDIRECT next objective for dev: {}", deviceId);
396 }
397 ObjectiveContext context = new DefaultObjectiveContext(
398 (objective) -> {
399 if (log.isDebugEnabled()) {
400 log.debug("REDIRECT next objective for policy {} installed in dev: {}",
401 policy.policyId(), deviceId);
402 }
403 future.complete(objective);
404 },
405 (objective, error) -> {
406 log.warn("Failed to install REDIRECT next objective for policy {}: {} in dev: {}",
407 policy.policyId(), error, deviceId);
408 future.complete(null);
409 });
410 // Context is not serializable
411 NextObjective serializableObjective = builder.add();
412 flowObjectiveService.next(deviceId, builder.add(context));
413 future.whenComplete((objective, ex) -> {
414 if (ex != null) {
415 log.error("Exception installing REDIRECT next objective", ex);
416 } else if (objective != null) {
417 operations.computeIfPresent(policyKey.toString(), (k, v) -> {
418 if (!v.isDone() && v.isInstall()) {
419 v.isDone(true);
420 v.objectiveOperation(serializableObjective);
421 }
422 return v;
423 });
424 }
425 });
426 }
pierventre30368ab2021-02-24 23:23:22 +0100427 }
428 }
429
430 // Remove policy in a device according to the type
431 private void removePolicyInDevice(DeviceId deviceId, Policy policy) {
pierventre30368ab2021-02-24 23:23:22 +0100432 PolicyKey policyKey = new PolicyKey(deviceId, policy.policyId());
433 Operation operation = Versioned.valueOrNull(operations.get(policyKey.toString()));
434 // Policy might be still in pending or not present anymore
435 if (operation == null || operation.objectiveOperation() == null) {
436 log.warn("There are no ops associated with {}", policyKey);
437 operation = Operation.builder()
438 .isDone(true)
439 .isInstall(false)
440 .policy(policy)
441 .build();
442 operations.put(policyKey.toString(), operation);
443 } else {
pierventre4f68ffa2021-03-09 22:52:14 +0100444 if (log.isDebugEnabled()) {
445 log.debug("Removing {} policy {} in device {}", policy.policyType(), policy.policyId(), deviceId);
446 }
447 Operation.Builder operationBuilder = Operation.builder()
448 .isInstall(false)
449 .policy(policy);
pierventre30368ab2021-02-24 23:23:22 +0100450 if (policy.policyType() == PolicyType.DROP) {
pierventre4f68ffa2021-03-09 22:52:14 +0100451 operationBuilder.isDone(true);
452 operations.put(policyKey.toString(), operationBuilder.build());
pierventre30368ab2021-02-24 23:23:22 +0100453 } else if (policy.policyType() == PolicyType.REDIRECT) {
pierventre4f68ffa2021-03-09 22:52:14 +0100454 // REDIRECT has to remove the next objective first
455 NextObjective oldObj = (NextObjective) operation.objectiveOperation();
456 operations.put(policyKey.toString(), operationBuilder.build());
457 NextObjective.Builder builder = oldObj.copy();
458 CompletableFuture<Objective> future = new CompletableFuture<>();
pierventre30368ab2021-02-24 23:23:22 +0100459 if (log.isDebugEnabled()) {
pierventre4f68ffa2021-03-09 22:52:14 +0100460 log.debug("Removing REDIRECT next objective for dev: {}", deviceId);
pierventre30368ab2021-02-24 23:23:22 +0100461 }
pierventre4f68ffa2021-03-09 22:52:14 +0100462 ObjectiveContext context = new DefaultObjectiveContext(
463 (objective) -> {
464 if (log.isDebugEnabled()) {
465 log.debug("REDIRECT next objective for policy {} removed in dev: {}",
466 policy.policyId(), deviceId);
467 }
468 future.complete(objective);
469 },
470 (objective, error) -> {
471 log.warn("Failed to remove REDIRECT next objective for policy {}: {} in dev: {}",
472 policy.policyId(), error, deviceId);
473 future.complete(null);
474 });
475 NextObjective serializableObjective = builder.remove();
476 flowObjectiveService.next(deviceId, builder.remove(context));
477 future.whenComplete((objective, ex) -> {
478 if (ex != null) {
479 log.error("Exception Removing REDIRECT next objective", ex);
480 } else if (objective != null) {
481 operations.computeIfPresent(policyKey.toString(), (k, v) -> {
482 if (!v.isDone() && !v.isInstall()) {
483 v.isDone(true);
484 v.objectiveOperation(serializableObjective);
485 }
486 return v;
487 });
488 }
489 });
pierventre30368ab2021-02-24 23:23:22 +0100490 }
491 }
492 }
493
494 // Updates policy status if all the pending ops are done
495 private void updatePolicy(PolicyId policyId, boolean install) {
496 if (!isLeader(policyId)) {
497 if (log.isDebugEnabled()) {
498 log.debug("Instance is not leader for policy {}", policyId);
499 }
500 return;
501 }
502 workers.execute(() -> updatePolicyInternal(policyId, install), policyId.hashCode());
503 }
504
505 private void updatePolicyInternal(PolicyId policyId, boolean install) {
506 // If there are no more pending ops we are ready to go; potentially we can check
507 // if the id is contained. Updates policies only if they are still present
508 Optional<Map.Entry<String, Versioned<Operation>>> notYetDone = operations.entrySet().stream()
509 .filter(entry -> entry.getValue().value().policy().isPresent())
510 .filter(entry -> PolicyKey.fromString(entry.getKey()).policyId().equals(policyId))
511 .filter(entry -> !entry.getValue().value().isDone() && entry.getValue().value().isInstall() == install)
512 .findFirst();
513 if (notYetDone.isEmpty()) {
514 PolicyRequest policyRequest = Versioned.valueOrNull(policies.computeIfPresent(policyId, (k, v) -> {
515 if (v.policyState() == PolicyState.PENDING_ADD && install) {
516 if (log.isDebugEnabled()) {
517 log.debug("Policy {} is ready", policyId);
518 }
519 v.policyState(PolicyState.ADDED);
520 } else if (v.policyState() == PolicyState.PENDING_REMOVE && !install) {
521 if (log.isDebugEnabled()) {
522 log.debug("Policy {} is removed", policyId);
523 }
524 v = null;
525 }
526 return v;
527 }));
528 // Greedy check for pending traffic matches
529 if (policyRequest != null && policyRequest.policyState() == PolicyState.ADDED) {
530 updatePendingTrafficMatches(policyRequest.policyId());
531 }
532 }
533 }
534
535 // Install/remove the traffic match on the edge devices
536 private void sendTrafficMatch(TrafficMatch trafficMatch, boolean install) {
537 if (!isLeader(trafficMatch.policyId())) {
538 if (log.isDebugEnabled()) {
539 log.debug("Instance is not leader for policy {}", trafficMatch.policyId());
540 }
541 return;
542 }
543 // We know that we are the leader, offloads to the workers the remaining
544 // part: issue fobj installation/removal and update the maps
545 List<DeviceId> edgeDeviceIds = srService.getEdgeDeviceIds();
546 for (DeviceId deviceId : edgeDeviceIds) {
547 workers.execute(() -> {
548 if (install) {
549 installTrafficMatchToDevice(deviceId, trafficMatch);
550 } else {
551 removeTrafficMatchInDevice(deviceId, trafficMatch);
552 }
553 }, deviceId.hashCode());
554 }
555 }
556
557 // Orchestrate traffic match installation according to the type
558 private void installTrafficMatchToDevice(DeviceId deviceId, TrafficMatch trafficMatch) {
559 if (log.isDebugEnabled()) {
560 log.debug("Installing traffic match {} associated to policy {}",
561 trafficMatch.trafficMatchId(), trafficMatch.policyId());
562 }
pierventre30368ab2021-02-24 23:23:22 +0100563 TrafficMatchKey trafficMatchKey = new TrafficMatchKey(deviceId, trafficMatch.trafficMatchId());
pierventre4f68ffa2021-03-09 22:52:14 +0100564 Operation trafficOperation = Versioned.valueOrNull(operations.get(trafficMatchKey.toString()));
565 if (trafficOperation != null && trafficOperation.isInstall()) {
566 if (log.isDebugEnabled()) {
567 log.debug("There is already an install operation for traffic match {} associated to policy {} " +
568 "for device {}", trafficMatch.trafficMatchId(), trafficMatch.policyId(), deviceId);
569 }
570 return;
571 }
pierventre30368ab2021-02-24 23:23:22 +0100572 // For the DROP policy we need to set an ACL drop in the fwd objective. The other
573 // policies require to retrieve the next Id and sets the next step.
574 PolicyKey policyKey = new PolicyKey(deviceId, trafficMatch.policyId());
575 Operation policyOperation = Versioned.valueOrNull(operations.get(policyKey.toString()));
576 if (policyOperation == null || !policyOperation.isDone() ||
pierventre4f68ffa2021-03-09 22:52:14 +0100577 !policyOperation.isInstall() || policyOperation.policy().isEmpty() ||
578 (policyOperation.policy().get().policyType() == PolicyType.REDIRECT &&
579 policyOperation.objectiveOperation() == null)) {
pierventre30368ab2021-02-24 23:23:22 +0100580 log.info("Deferring traffic match {} installation on device {}. Policy {} not yet installed",
581 trafficMatch.trafficMatchId(), deviceId, trafficMatch.policyId());
582 return;
583 }
pierventre4f68ffa2021-03-09 22:52:14 +0100584 // Updates the store and then send the versatile fwd objective to the pipeliner
585 trafficOperation = Operation.builder()
586 .isInstall(true)
587 .trafficMatch(trafficMatch)
588 .build();
589 operations.put(trafficMatchKey.toString(), trafficOperation);
pierventre30368ab2021-02-24 23:23:22 +0100590 Policy policy = policyOperation.policy().get();
pierventre4f68ffa2021-03-09 22:52:14 +0100591 ForwardingObjective.Builder builder = trafficMatchFwdObjective(trafficMatch, policy.policyType());
pierventre30368ab2021-02-24 23:23:22 +0100592 if (policy.policyType() == PolicyType.DROP) {
pierventre4f68ffa2021-03-09 22:52:14 +0100593 // Firstly builds the fwd objective with the wipeDeferred action.
pierventre30368ab2021-02-24 23:23:22 +0100594 TrafficTreatment dropTreatment = DefaultTrafficTreatment.builder()
595 .wipeDeferred()
596 .build();
597 builder.withTreatment(dropTreatment);
pierventre4f68ffa2021-03-09 22:52:14 +0100598 } else if (policy.policyType() == PolicyType.REDIRECT) {
599
600 // Here we need to set only the next step
601 builder.nextStep(policyOperation.objectiveOperation().id());
pierventre30368ab2021-02-24 23:23:22 +0100602 }
pierventre4f68ffa2021-03-09 22:52:14 +0100603 // Once, the fwd objective has completed its execution, we update the policiesOps map
604 CompletableFuture<Objective> future = new CompletableFuture<>();
605 if (log.isDebugEnabled()) {
606 log.debug("Installing forwarding objective for dev: {}", deviceId);
607 }
608 ObjectiveContext context = new DefaultObjectiveContext(
609 (objective) -> {
610 if (log.isDebugEnabled()) {
611 log.debug("Forwarding objective for policy {} installed", trafficMatch.policyId());
612 }
613 future.complete(objective);
614 },
615 (objective, error) -> {
616 log.warn("Failed to install forwarding objective for policy {}: {}",
617 trafficMatch.policyId(), error);
618 future.complete(null);
619 });
620 // Context is not serializable
621 ForwardingObjective serializableObjective = builder.add();
622 flowObjectiveService.forward(deviceId, builder.add(context));
623 future.whenComplete((objective, ex) -> {
624 if (ex != null) {
625 log.error("Exception installing forwarding objective", ex);
626 } else if (objective != null) {
627 operations.computeIfPresent(trafficMatchKey.toString(), (k, v) -> {
628 if (!v.isDone() && v.isInstall()) {
629 v.isDone(true);
630 v.objectiveOperation(serializableObjective);
631 }
632 return v;
633 });
634 }
635 });
pierventre30368ab2021-02-24 23:23:22 +0100636 }
637
638 // Updates traffic match status if all the pending ops are done
639 private void updateTrafficMatch(TrafficMatch trafficMatch, boolean install) {
640 if (!isLeader(trafficMatch.policyId())) {
641 if (log.isDebugEnabled()) {
642 log.debug("Instance is not leader for policy {}", trafficMatch.policyId());
643 }
644 return;
645 }
646 workers.execute(() -> updateTrafficMatchInternal(trafficMatch.trafficMatchId(), install),
647 trafficMatch.policyId().hashCode());
648 }
649
650 private void updateTrafficMatchInternal(TrafficMatchId trafficMatchId, boolean install) {
651 // If there are no more pending ops we are ready to go; potentially we can check
652 // if the id is contained. Updates traffic matches only if they are still present
653 Optional<Map.Entry<String, Versioned<Operation>>> notYetDone = operations.entrySet().stream()
654 .filter(entry -> entry.getValue().value().trafficMatch().isPresent())
655 .filter(entry -> TrafficMatchKey.fromString(entry.getKey()).trafficMatchId().equals(trafficMatchId))
656 .filter(entry -> !entry.getValue().value().isDone() && entry.getValue().value().isInstall() == install)
657 .findFirst();
658 if (notYetDone.isEmpty()) {
659 trafficMatches.computeIfPresent(trafficMatchId, (k, v) -> {
660 if (v.trafficMatchState() == TrafficMatchState.PENDING_ADD && install) {
661 if (log.isDebugEnabled()) {
662 log.debug("Traffic match {} is ready", trafficMatchId);
663 }
664 v.trafficMatchState(TrafficMatchState.ADDED);
665 } else if (v.trafficMatchState() == TrafficMatchState.PENDING_REMOVE && !install) {
666 if (log.isDebugEnabled()) {
667 log.debug("Traffic match {} is removed", trafficMatchId);
668 }
669 v = null;
670 }
671 return v;
672 });
673 }
674 }
675
676 // Look for any pending traffic match waiting for the policy
677 private void updatePendingTrafficMatches(PolicyId policyId) {
678 Set<TrafficMatchRequest> pendingTrafficMatches = trafficMatches.stream()
679 .filter(trafficMatchEntry -> trafficMatchEntry.getValue().value().policyId().equals(policyId) &&
680 trafficMatchEntry.getValue().value().trafficMatchState() == TrafficMatchState.PENDING_ADD)
681 .map(trafficMatchEntry -> trafficMatchEntry.getValue().value())
682 .collect(Collectors.toSet());
683 for (TrafficMatchRequest trafficMatch : pendingTrafficMatches) {
684 sendTrafficMatch(trafficMatch.trafficMatch(), true);
685 }
686 }
687
688 // Traffic match removal in a device
689 private void removeTrafficMatchInDevice(DeviceId deviceId, TrafficMatch trafficMatch) {
690 if (log.isDebugEnabled()) {
691 log.debug("Removing traffic match {} associated to policy {}",
692 trafficMatch.trafficMatchId(), trafficMatch.policyId());
693 }
694 TrafficMatchKey trafficMatchKey = new TrafficMatchKey(deviceId, trafficMatch.trafficMatchId());
695 Operation operation = Versioned.valueOrNull(operations.get(trafficMatchKey.toString()));
696 if (operation == null || operation.objectiveOperation() == null) {
697 log.warn("There are no ops associated with {}", trafficMatchKey);
698 operation = Operation.builder()
699 .isDone(true)
700 .isInstall(false)
701 .trafficMatch(trafficMatch)
702 .build();
703 operations.put(trafficMatchKey.toString(), operation);
pierventre4f68ffa2021-03-09 22:52:14 +0100704 } else if (!operation.isInstall()) {
705 if (log.isDebugEnabled()) {
706 log.debug("There is already an uninstall operation for traffic match {} associated to policy {}" +
707 " for device {}", trafficMatch.trafficMatchId(), trafficMatch.policyId(), deviceId);
708 }
pierventre30368ab2021-02-24 23:23:22 +0100709 } else {
710 ForwardingObjective oldObj = (ForwardingObjective) operation.objectiveOperation();
711 operation = Operation.builder(operation)
pierventre30368ab2021-02-24 23:23:22 +0100712 .isInstall(false)
713 .build();
714 operations.put(trafficMatchKey.toString(), operation);
715 ForwardingObjective.Builder builder = DefaultForwardingObjective.builder(oldObj);
716 CompletableFuture<Objective> future = new CompletableFuture<>();
717 if (log.isDebugEnabled()) {
pierventre4f68ffa2021-03-09 22:52:14 +0100718 log.debug("Removing forwarding objectives for dev: {}", deviceId);
pierventre30368ab2021-02-24 23:23:22 +0100719 }
720 ObjectiveContext context = new DefaultObjectiveContext(
721 (objective) -> {
722 if (log.isDebugEnabled()) {
pierventre4f68ffa2021-03-09 22:52:14 +0100723 log.debug("Forwarding objective for policy {} removed", trafficMatch.policyId());
pierventre30368ab2021-02-24 23:23:22 +0100724 }
725 future.complete(objective);
726 },
727 (objective, error) -> {
pierventre4f68ffa2021-03-09 22:52:14 +0100728 log.warn("Failed to remove forwarding objective for policy {}: {}",
729 trafficMatch.policyId(), error);
pierventre30368ab2021-02-24 23:23:22 +0100730 future.complete(null);
731 });
732 ForwardingObjective serializableObjective = builder.remove();
733 flowObjectiveService.forward(deviceId, builder.remove(context));
734 future.whenComplete((objective, ex) -> {
735 if (ex != null) {
pierventre4f68ffa2021-03-09 22:52:14 +0100736 log.error("Exception removing forwarding objective", ex);
pierventre30368ab2021-02-24 23:23:22 +0100737 } else if (objective != null) {
738 operations.computeIfPresent(trafficMatchKey.toString(), (k, v) -> {
739 if (!v.isDone() && !v.isInstall()) {
740 v.isDone(true);
741 v.objectiveOperation(serializableObjective);
742 }
743 return v;
744 });
745 }
746 });
747 }
748 }
749
pierventre4f68ffa2021-03-09 22:52:14 +0100750 // It is used when a policy has been removed but there are still traffic matches depending on it
751 private Optional<TrafficMatchRequest> dependingTrafficMatches(PolicyId policyId) {
752 return trafficMatches.stream()
pierventre30368ab2021-02-24 23:23:22 +0100753 .filter(trafficMatchEntry -> trafficMatchEntry.getValue().value().policyId().equals(policyId) &&
754 trafficMatchEntry.getValue().value().trafficMatchState() == TrafficMatchState.ADDED)
755 .map(trafficMatchEntry -> trafficMatchEntry.getValue().value())
pierventre4f68ffa2021-03-09 22:52:14 +0100756 .findFirst();
pierventre30368ab2021-02-24 23:23:22 +0100757 }
758
759 // Utility that removes operations related to a policy or to a traffic match.
760 private void removeOperations(PolicyId policyId, Optional<TrafficMatchId> trafficMatchId) {
761 if (!isLeader(policyId)) {
762 if (log.isDebugEnabled()) {
763 log.debug("Instance is not leader for policy {}", policyId);
764 }
765 return;
766 }
767 List<DeviceId> edgeDeviceIds = srService.getEdgeDeviceIds();
768 for (DeviceId deviceId : edgeDeviceIds) {
769 workers.execute(() -> {
770 String key;
771 if (trafficMatchId.isPresent()) {
772 key = new TrafficMatchKey(deviceId, trafficMatchId.get()).toString();
773 } else {
774 key = new PolicyKey(deviceId, policyId).toString();
775 }
776 operations.remove(key);
777 }, deviceId.hashCode());
778 }
779 }
780
pierventre4f68ffa2021-03-09 22:52:14 +0100781 private ForwardingObjective.Builder trafficMatchFwdObjective(TrafficMatch trafficMatch, PolicyType policyType) {
782 TrafficSelector.Builder metaBuilder = DefaultTrafficSelector.builder(trafficMatch.trafficSelector());
783 if (policyType == PolicyType.REDIRECT) {
784 metaBuilder.matchMetadata(EDGE_PORT);
785 }
pierventre30368ab2021-02-24 23:23:22 +0100786 return DefaultForwardingObjective.builder()
787 .withPriority(PolicyService.TRAFFIC_MATCH_PRIORITY)
788 .withSelector(trafficMatch.trafficSelector())
pierventre4f68ffa2021-03-09 22:52:14 +0100789 .withMeta(metaBuilder.build())
pierventre30368ab2021-02-24 23:23:22 +0100790 .fromApp(appId)
791 .withFlag(ForwardingObjective.Flag.VERSATILE)
792 .makePermanent();
793 }
794
pierventre4f68ffa2021-03-09 22:52:14 +0100795 private NextObjective.Builder redirectPolicyNextObjective(DeviceId srcDevice, RedirectPolicy redirectPolicy) {
796 Set<Link> egressLinks = linkService.getDeviceEgressLinks(srcDevice);
797 Map<ConnectPoint, DeviceId> egressPortsToEnforce = Maps.newHashMap();
798 List<DeviceId> edgeDevices = srService.getEdgeDeviceIds();
799 egressLinks.stream()
800 .filter(link -> redirectPolicy.spinesToEnforce().contains(link.dst().deviceId()) &&
801 !edgeDevices.contains(link.dst().deviceId()))
802 .forEach(link -> egressPortsToEnforce.put(link.src(), link.dst().deviceId()));
803 // No ports no friend
804 if (egressPortsToEnforce.isEmpty()) {
805 log.warn("There are no port available for the REDIRECT policy {}", redirectPolicy.policyId());
806 return null;
807 }
808 // We need to add a treatment for each valid egress port. The treatment
809 // requires to set src and dst mac address and set the egress port. We are
810 // deliberately not providing the metadata to prevent the programming of
811 // some tables which are already controlled by SegmentRouting or are unnecessary
812 int nextId = flowObjectiveService.allocateNextId();
813 DefaultNextObjective.Builder builder = DefaultNextObjective.builder()
814 .withId(nextId)
815 .withType(NextObjective.Type.HASHED)
816 .fromApp(appId);
817 MacAddress srcDeviceMac;
818 try {
819 srcDeviceMac = srService.getDeviceMacAddress(srcDevice);
820 } catch (DeviceConfigNotFoundException e) {
821 log.warn(e.getMessage() + " Aborting installation REDIRECT policy {}", redirectPolicy.policyId());
822 return null;
823 }
824 MacAddress neigborDeviceMac;
825 TrafficTreatment.Builder tBuilder;
826 for (Map.Entry<ConnectPoint, DeviceId> entry : egressPortsToEnforce.entrySet()) {
827 try {
828 neigborDeviceMac = srService.getDeviceMacAddress(entry.getValue());
829 } catch (DeviceConfigNotFoundException e) {
830 log.warn(e.getMessage() + " Aborting installation REDIRECT policy {}", redirectPolicy.policyId());
831 return null;
832 }
833 tBuilder = DefaultTrafficTreatment.builder()
834 .setEthSrc(srcDeviceMac)
835 .setEthDst(neigborDeviceMac)
836 .setOutput(entry.getKey().port());
837 builder.addTreatment(tBuilder.build());
838 }
839 return builder;
840 }
841
pierventre30368ab2021-02-24 23:23:22 +0100842 // Each map has an event listener enabling the events distribution across the cluster
843 private class InternalPolMapEventListener implements MapEventListener<PolicyId, PolicyRequest> {
844 @Override
845 public void event(MapEvent<PolicyId, PolicyRequest> event) {
846 Versioned<PolicyRequest> value = event.type() == MapEvent.Type.REMOVE ?
847 event.oldValue() : event.newValue();
848 PolicyRequest policyRequest = value.value();
849 Policy policy = policyRequest.policy();
850 switch (event.type()) {
851 case INSERT:
852 case UPDATE:
853 switch (policyRequest.policyState()) {
854 case PENDING_ADD:
855 sendPolicy(policy, true);
856 break;
857 case PENDING_REMOVE:
858 sendPolicy(policy, false);
859 break;
860 case ADDED:
861 break;
862 default:
863 log.warn("Unknown policy state type {}", policyRequest.policyState());
864 }
865 break;
866 case REMOVE:
867 removeOperations(policy.policyId(), Optional.empty());
pierventre30368ab2021-02-24 23:23:22 +0100868 break;
869 default:
870 log.warn("Unknown event type {}", event.type());
871
872 }
873 }
874 }
875
876 private class InternalTMatchMapEventListener implements MapEventListener<TrafficMatchId, TrafficMatchRequest> {
877 @Override
878 public void event(MapEvent<TrafficMatchId, TrafficMatchRequest> event) {
879 Versioned<TrafficMatchRequest> value = event.type() == MapEvent.Type.REMOVE ?
880 event.oldValue() : event.newValue();
881 TrafficMatchRequest trafficMatchRequest = value.value();
882 TrafficMatch trafficMatch = trafficMatchRequest.trafficMatch();
883 switch (event.type()) {
884 case INSERT:
885 case UPDATE:
886 switch (trafficMatchRequest.trafficMatchState()) {
887 case PENDING_ADD:
888 sendTrafficMatch(trafficMatch, true);
889 break;
890 case PENDING_REMOVE:
891 sendTrafficMatch(trafficMatch, false);
892 break;
893 case ADDED:
894 break;
895 default:
896 log.warn("Unknown traffic match state type {}", trafficMatchRequest.trafficMatchState());
897 }
898 break;
899 case REMOVE:
900 removeOperations(trafficMatch.policyId(), Optional.of(trafficMatch.trafficMatchId()));
901 break;
902 default:
903 log.warn("Unknown event type {}", event.type());
904 }
905 }
906 }
907
908 private class InternalOpsMapEventListener implements MapEventListener<String, Operation> {
909 @Override
910 public void event(MapEvent<String, Operation> event) {
911 String key = event.key();
912 Versioned<Operation> value = event.type() == MapEvent.Type.REMOVE ?
913 event.oldValue() : event.newValue();
914 Operation operation = value.value();
915 switch (event.type()) {
916 case INSERT:
917 case UPDATE:
918 if (operation.isDone()) {
919 if (operation.policy().isPresent()) {
920 PolicyKey policyKey = PolicyKey.fromString(key);
921 updatePolicy(policyKey.policyId(), operation.isInstall());
922 } else if (operation.trafficMatch().isPresent()) {
923 updateTrafficMatch(operation.trafficMatch().get(), operation.isInstall());
924 } else {
925 log.warn("Unknown pending operation");
926 }
927 }
928 break;
929 case REMOVE:
930 break;
931 default:
932 log.warn("Unknown event type {}", event.type());
933 }
934 }
935 }
936
937 // Using the work partition service defines who is in charge of a given policy.
938 private boolean isLeader(PolicyId policyId) {
939 final NodeId currentNodeId = clusterService.getLocalNode().id();
940 final NodeId leader = workPartitionService.getLeader(policyId, this::hasher);
941 if (leader == null) {
942 log.error("Fail to elect a leader for {}.", policyId);
943 return false;
944 }
945 policyLeaderCache.put(policyId, leader);
946 return currentNodeId.equals(leader);
947 }
948
949 private Long hasher(PolicyId policyId) {
950 return HASH_FN.newHasher()
951 .putUnencodedChars(policyId.toString())
952 .hash()
953 .asLong();
954 }
955
pierventre4f68ffa2021-03-09 22:52:14 +0100956 // TODO Periodic checker, consider to add store and delegates.
957
pierventre30368ab2021-02-24 23:23:22 +0100958 // Check periodically for any issue and try to resolve automatically if possible
959 private final class PolicyChecker implements Runnable {
960 @Override
961 public void run() {
962 }
963 }
964}