blob: 87fa61a80f80d143c8fb2fe10dee84d70c4d183e [file] [log] [blame]
pierventre30368ab2021-02-24 23:23:22 +01001/*
2 * Copyright 2021-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.segmentrouting.policy.impl;
17
18import com.google.common.collect.ImmutableSet;
19import com.google.common.collect.Lists;
20import com.google.common.collect.Maps;
21import com.google.common.hash.HashFunction;
22import com.google.common.hash.Hashing;
23import org.glassfish.jersey.internal.guava.Sets;
pierventre4f68ffa2021-03-09 22:52:14 +010024import org.onlab.packet.MacAddress;
pierventre30368ab2021-02-24 23:23:22 +010025import org.onlab.util.KryoNamespace;
26import org.onlab.util.PredictableExecutor;
27import org.onosproject.cluster.ClusterService;
28import org.onosproject.cluster.NodeId;
Wailok Shumee90c132021-03-11 21:00:11 +080029import org.onosproject.codec.CodecService;
pierventre30368ab2021-02-24 23:23:22 +010030import org.onosproject.core.ApplicationId;
31import org.onosproject.core.CoreService;
pierventre4f68ffa2021-03-09 22:52:14 +010032import org.onosproject.net.ConnectPoint;
pierventre30368ab2021-02-24 23:23:22 +010033import org.onosproject.net.DeviceId;
pierventre4f68ffa2021-03-09 22:52:14 +010034import org.onosproject.net.Link;
35import org.onosproject.net.flow.DefaultTrafficSelector;
pierventre30368ab2021-02-24 23:23:22 +010036import org.onosproject.net.flow.DefaultTrafficTreatment;
pierventre4f68ffa2021-03-09 22:52:14 +010037import org.onosproject.net.flow.TrafficSelector;
pierventre30368ab2021-02-24 23:23:22 +010038import org.onosproject.net.flow.TrafficTreatment;
39import org.onosproject.net.flowobjective.DefaultForwardingObjective;
pierventre4f68ffa2021-03-09 22:52:14 +010040import org.onosproject.net.flowobjective.DefaultNextObjective;
pierventre30368ab2021-02-24 23:23:22 +010041import org.onosproject.net.flowobjective.DefaultObjectiveContext;
42import org.onosproject.net.flowobjective.FlowObjectiveService;
43import org.onosproject.net.flowobjective.ForwardingObjective;
pierventre4f68ffa2021-03-09 22:52:14 +010044import org.onosproject.net.flowobjective.NextObjective;
pierventre30368ab2021-02-24 23:23:22 +010045import org.onosproject.net.flowobjective.Objective;
46import org.onosproject.net.flowobjective.ObjectiveContext;
47import org.onosproject.net.intent.WorkPartitionService;
pierventre4f68ffa2021-03-09 22:52:14 +010048import org.onosproject.net.link.LinkService;
pierventre30368ab2021-02-24 23:23:22 +010049import org.onosproject.segmentrouting.SegmentRoutingService;
pierventre4f68ffa2021-03-09 22:52:14 +010050import org.onosproject.segmentrouting.config.DeviceConfigNotFoundException;
pierventre30368ab2021-02-24 23:23:22 +010051import org.onosproject.segmentrouting.policy.api.DropPolicy;
Wailok Shumee90c132021-03-11 21:00:11 +080052import org.onosproject.segmentrouting.policy.api.DropPolicyCodec;
pierventre30368ab2021-02-24 23:23:22 +010053import org.onosproject.segmentrouting.policy.api.Policy;
54import org.onosproject.segmentrouting.policy.api.Policy.PolicyType;
55import org.onosproject.segmentrouting.policy.api.PolicyData;
56import org.onosproject.segmentrouting.policy.api.PolicyId;
57import org.onosproject.segmentrouting.policy.api.PolicyService;
58import org.onosproject.segmentrouting.policy.api.PolicyState;
pierventre4f68ffa2021-03-09 22:52:14 +010059import org.onosproject.segmentrouting.policy.api.RedirectPolicy;
Wailok Shumee90c132021-03-11 21:00:11 +080060import org.onosproject.segmentrouting.policy.api.RedirectPolicyCodec;
pierventre30368ab2021-02-24 23:23:22 +010061import org.onosproject.segmentrouting.policy.api.TrafficMatch;
Wailok Shumee90c132021-03-11 21:00:11 +080062import org.onosproject.segmentrouting.policy.api.TrafficMatchCodec;
pierventre30368ab2021-02-24 23:23:22 +010063import org.onosproject.segmentrouting.policy.api.TrafficMatchData;
64import org.onosproject.segmentrouting.policy.api.TrafficMatchId;
65import org.onosproject.segmentrouting.policy.api.TrafficMatchState;
66import org.onosproject.store.serializers.KryoNamespaces;
67import org.onosproject.store.service.ConsistentMap;
68import org.onosproject.store.service.MapEvent;
69import org.onosproject.store.service.MapEventListener;
70import org.onosproject.store.service.Serializer;
71import org.onosproject.store.service.StorageException;
72import org.onosproject.store.service.StorageService;
73import org.onosproject.store.service.Versioned;
74import org.osgi.service.component.annotations.Activate;
75import org.osgi.service.component.annotations.Component;
76import org.osgi.service.component.annotations.Deactivate;
77import org.osgi.service.component.annotations.Reference;
78import org.osgi.service.component.annotations.ReferenceCardinality;
79import org.slf4j.Logger;
80
81import java.util.List;
82import java.util.Map;
83import java.util.Optional;
84import java.util.Set;
85import java.util.concurrent.CompletableFuture;
86import java.util.stream.Collectors;
87
88import static org.onlab.util.Tools.groupedThreads;
89import static org.slf4j.LoggerFactory.getLogger;
90
91/**
92 * Implementation of the policy service interface.
93 */
94@Component(immediate = true, service = PolicyService.class)
95public class PolicyManager implements PolicyService {
96
97 // App related things
98 private static final String APP_NAME = "org.onosproject.segmentrouting.policy";
99 private ApplicationId appId;
100 private Logger log = getLogger(getClass());
101 static final String KEY_SEPARATOR = "|";
102
pierventre4f68ffa2021-03-09 22:52:14 +0100103 // Supported policies
104 private static final Set<Policy.PolicyType> SUPPORTED_POLICIES = ImmutableSet.of(
105 PolicyType.DROP, PolicyType.REDIRECT);
106
107 // Driver should use this meta to match port_is_edge field in the ACL table
108 private static final long EDGE_PORT = 1;
109 private static final long INFRA_PORT = 0;
110
pierventre30368ab2021-02-24 23:23:22 +0100111 // Policy/TrafficMatch store related objects. We use these consistent maps to keep track of the
112 // lifecycle of a policy/traffic match. These are decomposed in multiple operations which have
113 // to be performed on multiple devices in order to have a policy/traffic match in ADDED state.
pierventre4f68ffa2021-03-09 22:52:14 +0100114 // TODO Consider to add store and delegate
pierventre30368ab2021-02-24 23:23:22 +0100115 private static final String POLICY_STORE = "sr-policy-store";
116 private ConsistentMap<PolicyId, PolicyRequest> policies;
117 private MapEventListener<PolicyId, PolicyRequest> mapPolListener = new InternalPolMapEventListener();
118 private Map<PolicyId, PolicyRequest> policiesMap;
119
120 private static final String OPS_STORE = "sr-ops-store";
121 private ConsistentMap<String, Operation> operations;
122 private MapEventListener<String, Operation> mapOpsListener = new InternalOpsMapEventListener();
123 private Map<String, Operation> opsMap;
124
125 private static final String TRAFFIC_MATCH_STORE = "sr-tmatch-store";
126 private ConsistentMap<TrafficMatchId, TrafficMatchRequest> trafficMatches;
127 private MapEventListener<TrafficMatchId, TrafficMatchRequest> mapTMatchListener =
128 new InternalTMatchMapEventListener();
129 private Map<TrafficMatchId, TrafficMatchRequest> trafficMatchesMap;
130
131 // Leadership related objects - consistent hashing
132 private static final HashFunction HASH_FN = Hashing.md5();
133 // Read only cache of the Policy leader
134 private Map<PolicyId, NodeId> policyLeaderCache;
135
136 // Worker threads for policy and traffic match related ops
137 private static final int DEFAULT_THREADS = 4;
138 protected PredictableExecutor workers;
139
140 // Serializers and ONOS services
141 private static final KryoNamespace.Builder APP_KRYO_BUILDER = KryoNamespace.newBuilder()
142 .register(KryoNamespaces.API)
143 .register(PolicyId.class)
144 .register(PolicyType.class)
145 .register(DropPolicy.class)
pierventre4f68ffa2021-03-09 22:52:14 +0100146 .register(RedirectPolicy.class)
pierventre30368ab2021-02-24 23:23:22 +0100147 .register(PolicyState.class)
148 .register(PolicyRequest.class)
149 .register(TrafficMatchId.class)
150 .register(TrafficMatchState.class)
151 .register(TrafficMatch.class)
152 .register(TrafficMatchRequest.class)
153 .register(Operation.class);
154 private Serializer serializer = Serializer.using(Lists.newArrayList(APP_KRYO_BUILDER.build()));
155
156 @Reference(cardinality = ReferenceCardinality.MANDATORY)
157 private CoreService coreService;
158
159 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Wailok Shumee90c132021-03-11 21:00:11 +0800160 private CodecService codecService;
161
162 @Reference(cardinality = ReferenceCardinality.MANDATORY)
pierventre30368ab2021-02-24 23:23:22 +0100163 private StorageService storageService;
164
165 @Reference(cardinality = ReferenceCardinality.MANDATORY)
166 private ClusterService clusterService;
167
168 @Reference(cardinality = ReferenceCardinality.MANDATORY)
pierventre4f68ffa2021-03-09 22:52:14 +0100169 private WorkPartitionService workPartitionService;
pierventre30368ab2021-02-24 23:23:22 +0100170
Wailok Shumee90c132021-03-11 21:00:11 +0800171 @Reference(cardinality = ReferenceCardinality.MANDATORY)
pierventre4f68ffa2021-03-09 22:52:14 +0100172 private SegmentRoutingService srService;
pierventre30368ab2021-02-24 23:23:22 +0100173
174 @Reference(cardinality = ReferenceCardinality.MANDATORY)
pierventre4f68ffa2021-03-09 22:52:14 +0100175 private FlowObjectiveService flowObjectiveService;
176
177 @Reference(cardinality = ReferenceCardinality.MANDATORY)
178 private LinkService linkService;
pierventre30368ab2021-02-24 23:23:22 +0100179
180 @Activate
181 public void activate() {
182 appId = coreService.registerApplication(APP_NAME);
183
Wailok Shumee90c132021-03-11 21:00:11 +0800184 codecService.registerCodec(DropPolicy.class, new DropPolicyCodec());
185 codecService.registerCodec(RedirectPolicy.class, new RedirectPolicyCodec());
186 codecService.registerCodec(TrafficMatch.class, new TrafficMatchCodec());
187
pierventre30368ab2021-02-24 23:23:22 +0100188 policies = storageService.<PolicyId, PolicyRequest>consistentMapBuilder()
189 .withName(POLICY_STORE)
190 .withSerializer(serializer).build();
191 policies.addListener(mapPolListener);
192 policiesMap = policies.asJavaMap();
193
194 trafficMatches = storageService.<TrafficMatchId, TrafficMatchRequest>consistentMapBuilder()
195 .withName(TRAFFIC_MATCH_STORE)
196 .withSerializer(serializer).build();
197 trafficMatches.addListener(mapTMatchListener);
198 trafficMatchesMap = trafficMatches.asJavaMap();
199
200 operations = storageService.<String, Operation>consistentMapBuilder()
201 .withName(OPS_STORE)
202 .withSerializer(serializer).build();
203 operations.addListener(mapOpsListener);
204 opsMap = operations.asJavaMap();
205
206 policyLeaderCache = Maps.newConcurrentMap();
207
208 workers = new PredictableExecutor(DEFAULT_THREADS,
209 groupedThreads("sr-policy", "worker-%d", log));
210
211 log.info("Started");
212 }
213
214 @Deactivate
215 public void deactivate() {
216 // Teardown everything
Wailok Shumee90c132021-03-11 21:00:11 +0800217 codecService.unregisterCodec(DropPolicy.class);
218 codecService.unregisterCodec(RedirectPolicy.class);
219 codecService.unregisterCodec(TrafficMatch.class);
pierventre30368ab2021-02-24 23:23:22 +0100220 policies.removeListener(mapPolListener);
221 policies.destroy();
222 policiesMap.clear();
223 trafficMatches.removeListener(mapTMatchListener);
224 trafficMatches.destroy();
225 trafficMatchesMap.clear();
226 operations.removeListener(mapOpsListener);
227 operations.destroy();
228 operations.clear();
229 workers.shutdown();
230
231 log.info("Stopped");
232 }
233
234 @Override
235 //FIXME update does not work well
236 public PolicyId addOrUpdatePolicy(Policy policy) {
237 PolicyId policyId = policy.policyId();
238 try {
239 policies.put(policyId, new PolicyRequest(policy));
240 } catch (StorageException e) {
241 log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
242 e.getMessage(), e);
243 policyId = null;
244 }
245 return policyId;
246 }
247
248 @Override
249 public boolean removePolicy(PolicyId policyId) {
250 boolean result;
pierventre4f68ffa2021-03-09 22:52:14 +0100251 if (dependingTrafficMatches(policyId).isPresent()) {
252 if (log.isDebugEnabled()) {
253 log.debug("Found depending traffic matches");
254 }
255 return false;
256 }
pierventre30368ab2021-02-24 23:23:22 +0100257 try {
258 result = Versioned.valueOrNull(policies.computeIfPresent(policyId, (k, v) -> {
259 if (v.policyState() != PolicyState.PENDING_REMOVE) {
260 v.policyState(PolicyState.PENDING_REMOVE);
261 }
262 return v;
263 })) != null;
264 } catch (StorageException e) {
265 log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
266 e.getMessage(), e);
267 result = false;
268 }
269 return result;
270 }
271
272 @Override
273 public Set<PolicyData> policies(Set<PolicyType> filter) {
274 Set<PolicyData> policyData = Sets.newHashSet();
275 List<DeviceId> edgeDeviceIds = srService.getEdgeDeviceIds();
276 Set<PolicyRequest> policyRequests;
277 if (filter.isEmpty()) {
278 policyRequests = ImmutableSet.copyOf(policiesMap.values());
279 } else {
280 policyRequests = policiesMap.values().stream()
281 .filter(policyRequest -> filter.contains(policyRequest.policyType()))
282 .collect(Collectors.toSet());
283 }
284 PolicyKey policyKey;
285 List<String> ops;
286 for (PolicyRequest policyRequest : policyRequests) {
287 ops = Lists.newArrayList();
288 for (DeviceId deviceId : edgeDeviceIds) {
289 policyKey = new PolicyKey(deviceId, policyRequest.policyId());
290 Operation operation = Versioned.valueOrNull(operations.get(policyKey.toString()));
291 if (operation != null) {
292 ops.add(deviceId + " -> " + operation.toStringMinimal());
293 }
294 }
295 policyData.add(new PolicyData(policyRequest.policyState(), policyRequest.policy(), ops));
296 }
297 return policyData;
298 }
299
300 @Override
301 //FIXME update does not work well
302 public TrafficMatchId addOrUpdateTrafficMatch(TrafficMatch trafficMatch) {
303 TrafficMatchId trafficMatchId = trafficMatch.trafficMatchId();
304 try {
305 trafficMatches.put(trafficMatchId, new TrafficMatchRequest(trafficMatch));
306 } catch (StorageException e) {
307 log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
308 e.getMessage(), e);
309 trafficMatchId = null;
310 }
311 return trafficMatchId;
312 }
313
314 @Override
315 public boolean removeTrafficMatch(TrafficMatchId trafficMatchId) {
316 boolean result;
317 try {
318 result = Versioned.valueOrNull(trafficMatches.computeIfPresent(trafficMatchId, (k, v) -> {
319 if (v.trafficMatchState() != TrafficMatchState.PENDING_REMOVE) {
320 v.trafficMatchState(TrafficMatchState.PENDING_REMOVE);
321 }
322 return v;
323 })) != null;
324 } catch (StorageException e) {
325 log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
326 e.getMessage(), e);
327 result = false;
328 }
329 return result;
330 }
331
332 @Override
333 public Set<TrafficMatchData> trafficMatches() {
334 Set<TrafficMatchData> trafficMatchData = Sets.newHashSet();
335 List<DeviceId> edgeDeviceIds = srService.getEdgeDeviceIds();
336 Set<TrafficMatchRequest> trafficMatchRequests = ImmutableSet.copyOf(trafficMatchesMap.values());
337 TrafficMatchKey trafficMatchKey;
338 List<String> ops;
339 for (TrafficMatchRequest trafficMatchRequest : trafficMatchRequests) {
340 ops = Lists.newArrayList();
341 for (DeviceId deviceId : edgeDeviceIds) {
342 trafficMatchKey = new TrafficMatchKey(deviceId, trafficMatchRequest.trafficMatch().trafficMatchId());
343 Operation operation = Versioned.valueOrNull(operations.get(trafficMatchKey.toString()));
344 if (operation != null) {
345 ops.add(deviceId + " -> " + operation.toStringMinimal());
346 }
347 }
348 trafficMatchData.add(new TrafficMatchData(trafficMatchRequest.trafficMatchState(),
349 trafficMatchRequest.trafficMatch(), ops));
350 }
351 return trafficMatchData;
352 }
353
354 // Install/remove the policies on the edge devices
355 private void sendPolicy(Policy policy, boolean install) {
356 if (!isLeader(policy.policyId())) {
357 if (log.isDebugEnabled()) {
358 log.debug("Instance is not leader for policy {}", policy.policyId());
359 }
360 return;
361 }
362 // We know that we are the leader, offloads to the workers the remaining
363 // part: issue fobj installation/removal and update the maps
364 List<DeviceId> edgeDeviceIds = srService.getEdgeDeviceIds();
365 for (DeviceId deviceId : edgeDeviceIds) {
366 workers.execute(() -> {
367 if (install) {
368 installPolicyInDevice(deviceId, policy);
369 } else {
370 removePolicyInDevice(deviceId, policy);
371 }
372 }, deviceId.hashCode());
373 }
374 }
375
376 // Orchestrate policy installation according to the type
377 private void installPolicyInDevice(DeviceId deviceId, Policy policy) {
pierventre4f68ffa2021-03-09 22:52:14 +0100378 if (!SUPPORTED_POLICIES.contains(policy.policyType())) {
pierventre30368ab2021-02-24 23:23:22 +0100379 log.warn("Policy {} type {} not yet supported",
380 policy.policyId(), policy.policyType());
pierventre4f68ffa2021-03-09 22:52:14 +0100381 return;
382 }
383 PolicyKey policyKey;
384 Operation.Builder operation;
385 if (log.isDebugEnabled()) {
386 log.debug("Installing {} policy {} for dev: {}",
387 policy.policyType(), policy.policyId(), deviceId);
388 }
389 policyKey = new PolicyKey(deviceId, policy.policyId());
390 operation = Operation.builder()
391 .isInstall(true)
392 .policy(policy);
393 // TODO To better handle different policy types consider the abstraction of a compiler (subtypes ?)
394 if (policy.policyType() == PolicyType.DROP) {
395 // DROP policies do not need the next objective installation phase
396 // we can update directly the map and signal the ops as done
397 operation.isDone(true);
398 operations.put(policyKey.toString(), operation.build());
399 } else if (policy.policyType() == PolicyType.REDIRECT) {
400 // REDIRECT Uses next objective context to update the ops as done when
401 // it returns successfully. In the other cases leaves the ops as undone
402 // and the relative policy will remain in pending.
403 operations.put(policyKey.toString(), operation.build());
404 NextObjective.Builder builder = redirectPolicyNextObjective(deviceId, (RedirectPolicy) policy);
405 // Handle error here - leave the operation as undone and pending
406 if (builder != null) {
407 CompletableFuture<Objective> future = new CompletableFuture<>();
408 if (log.isDebugEnabled()) {
409 log.debug("Installing REDIRECT next objective for dev: {}", deviceId);
410 }
411 ObjectiveContext context = new DefaultObjectiveContext(
412 (objective) -> {
413 if (log.isDebugEnabled()) {
414 log.debug("REDIRECT next objective for policy {} installed in dev: {}",
415 policy.policyId(), deviceId);
416 }
417 future.complete(objective);
418 },
419 (objective, error) -> {
420 log.warn("Failed to install REDIRECT next objective for policy {}: {} in dev: {}",
421 policy.policyId(), error, deviceId);
422 future.complete(null);
423 });
424 // Context is not serializable
425 NextObjective serializableObjective = builder.add();
426 flowObjectiveService.next(deviceId, builder.add(context));
427 future.whenComplete((objective, ex) -> {
428 if (ex != null) {
429 log.error("Exception installing REDIRECT next objective", ex);
430 } else if (objective != null) {
431 operations.computeIfPresent(policyKey.toString(), (k, v) -> {
432 if (!v.isDone() && v.isInstall()) {
433 v.isDone(true);
434 v.objectiveOperation(serializableObjective);
435 }
436 return v;
437 });
438 }
439 });
440 }
pierventre30368ab2021-02-24 23:23:22 +0100441 }
442 }
443
444 // Remove policy in a device according to the type
445 private void removePolicyInDevice(DeviceId deviceId, Policy policy) {
pierventre30368ab2021-02-24 23:23:22 +0100446 PolicyKey policyKey = new PolicyKey(deviceId, policy.policyId());
447 Operation operation = Versioned.valueOrNull(operations.get(policyKey.toString()));
448 // Policy might be still in pending or not present anymore
449 if (operation == null || operation.objectiveOperation() == null) {
450 log.warn("There are no ops associated with {}", policyKey);
451 operation = Operation.builder()
452 .isDone(true)
453 .isInstall(false)
454 .policy(policy)
455 .build();
456 operations.put(policyKey.toString(), operation);
457 } else {
pierventre4f68ffa2021-03-09 22:52:14 +0100458 if (log.isDebugEnabled()) {
459 log.debug("Removing {} policy {} in device {}", policy.policyType(), policy.policyId(), deviceId);
460 }
461 Operation.Builder operationBuilder = Operation.builder()
462 .isInstall(false)
463 .policy(policy);
pierventre30368ab2021-02-24 23:23:22 +0100464 if (policy.policyType() == PolicyType.DROP) {
pierventre4f68ffa2021-03-09 22:52:14 +0100465 operationBuilder.isDone(true);
466 operations.put(policyKey.toString(), operationBuilder.build());
pierventre30368ab2021-02-24 23:23:22 +0100467 } else if (policy.policyType() == PolicyType.REDIRECT) {
pierventre4f68ffa2021-03-09 22:52:14 +0100468 // REDIRECT has to remove the next objective first
469 NextObjective oldObj = (NextObjective) operation.objectiveOperation();
470 operations.put(policyKey.toString(), operationBuilder.build());
471 NextObjective.Builder builder = oldObj.copy();
472 CompletableFuture<Objective> future = new CompletableFuture<>();
pierventre30368ab2021-02-24 23:23:22 +0100473 if (log.isDebugEnabled()) {
pierventre4f68ffa2021-03-09 22:52:14 +0100474 log.debug("Removing REDIRECT next objective for dev: {}", deviceId);
pierventre30368ab2021-02-24 23:23:22 +0100475 }
pierventre4f68ffa2021-03-09 22:52:14 +0100476 ObjectiveContext context = new DefaultObjectiveContext(
477 (objective) -> {
478 if (log.isDebugEnabled()) {
479 log.debug("REDIRECT next objective for policy {} removed in dev: {}",
480 policy.policyId(), deviceId);
481 }
482 future.complete(objective);
483 },
484 (objective, error) -> {
485 log.warn("Failed to remove REDIRECT next objective for policy {}: {} in dev: {}",
486 policy.policyId(), error, deviceId);
487 future.complete(null);
488 });
489 NextObjective serializableObjective = builder.remove();
490 flowObjectiveService.next(deviceId, builder.remove(context));
491 future.whenComplete((objective, ex) -> {
492 if (ex != null) {
493 log.error("Exception Removing REDIRECT next objective", ex);
494 } else if (objective != null) {
495 operations.computeIfPresent(policyKey.toString(), (k, v) -> {
496 if (!v.isDone() && !v.isInstall()) {
497 v.isDone(true);
498 v.objectiveOperation(serializableObjective);
499 }
500 return v;
501 });
502 }
503 });
pierventre30368ab2021-02-24 23:23:22 +0100504 }
505 }
506 }
507
508 // Updates policy status if all the pending ops are done
509 private void updatePolicy(PolicyId policyId, boolean install) {
510 if (!isLeader(policyId)) {
511 if (log.isDebugEnabled()) {
512 log.debug("Instance is not leader for policy {}", policyId);
513 }
514 return;
515 }
516 workers.execute(() -> updatePolicyInternal(policyId, install), policyId.hashCode());
517 }
518
519 private void updatePolicyInternal(PolicyId policyId, boolean install) {
520 // If there are no more pending ops we are ready to go; potentially we can check
521 // if the id is contained. Updates policies only if they are still present
522 Optional<Map.Entry<String, Versioned<Operation>>> notYetDone = operations.entrySet().stream()
523 .filter(entry -> entry.getValue().value().policy().isPresent())
524 .filter(entry -> PolicyKey.fromString(entry.getKey()).policyId().equals(policyId))
525 .filter(entry -> !entry.getValue().value().isDone() && entry.getValue().value().isInstall() == install)
526 .findFirst();
527 if (notYetDone.isEmpty()) {
528 PolicyRequest policyRequest = Versioned.valueOrNull(policies.computeIfPresent(policyId, (k, v) -> {
529 if (v.policyState() == PolicyState.PENDING_ADD && install) {
530 if (log.isDebugEnabled()) {
531 log.debug("Policy {} is ready", policyId);
532 }
533 v.policyState(PolicyState.ADDED);
534 } else if (v.policyState() == PolicyState.PENDING_REMOVE && !install) {
535 if (log.isDebugEnabled()) {
536 log.debug("Policy {} is removed", policyId);
537 }
538 v = null;
539 }
540 return v;
541 }));
542 // Greedy check for pending traffic matches
543 if (policyRequest != null && policyRequest.policyState() == PolicyState.ADDED) {
544 updatePendingTrafficMatches(policyRequest.policyId());
545 }
546 }
547 }
548
549 // Install/remove the traffic match on the edge devices
550 private void sendTrafficMatch(TrafficMatch trafficMatch, boolean install) {
551 if (!isLeader(trafficMatch.policyId())) {
552 if (log.isDebugEnabled()) {
553 log.debug("Instance is not leader for policy {}", trafficMatch.policyId());
554 }
555 return;
556 }
557 // We know that we are the leader, offloads to the workers the remaining
558 // part: issue fobj installation/removal and update the maps
559 List<DeviceId> edgeDeviceIds = srService.getEdgeDeviceIds();
560 for (DeviceId deviceId : edgeDeviceIds) {
561 workers.execute(() -> {
562 if (install) {
563 installTrafficMatchToDevice(deviceId, trafficMatch);
564 } else {
565 removeTrafficMatchInDevice(deviceId, trafficMatch);
566 }
567 }, deviceId.hashCode());
568 }
569 }
570
571 // Orchestrate traffic match installation according to the type
572 private void installTrafficMatchToDevice(DeviceId deviceId, TrafficMatch trafficMatch) {
573 if (log.isDebugEnabled()) {
574 log.debug("Installing traffic match {} associated to policy {}",
575 trafficMatch.trafficMatchId(), trafficMatch.policyId());
576 }
pierventre30368ab2021-02-24 23:23:22 +0100577 TrafficMatchKey trafficMatchKey = new TrafficMatchKey(deviceId, trafficMatch.trafficMatchId());
pierventre4f68ffa2021-03-09 22:52:14 +0100578 Operation trafficOperation = Versioned.valueOrNull(operations.get(trafficMatchKey.toString()));
579 if (trafficOperation != null && trafficOperation.isInstall()) {
580 if (log.isDebugEnabled()) {
581 log.debug("There is already an install operation for traffic match {} associated to policy {} " +
582 "for device {}", trafficMatch.trafficMatchId(), trafficMatch.policyId(), deviceId);
583 }
584 return;
585 }
pierventre30368ab2021-02-24 23:23:22 +0100586 // For the DROP policy we need to set an ACL drop in the fwd objective. The other
587 // policies require to retrieve the next Id and sets the next step.
588 PolicyKey policyKey = new PolicyKey(deviceId, trafficMatch.policyId());
589 Operation policyOperation = Versioned.valueOrNull(operations.get(policyKey.toString()));
590 if (policyOperation == null || !policyOperation.isDone() ||
pierventre4f68ffa2021-03-09 22:52:14 +0100591 !policyOperation.isInstall() || policyOperation.policy().isEmpty() ||
592 (policyOperation.policy().get().policyType() == PolicyType.REDIRECT &&
593 policyOperation.objectiveOperation() == null)) {
pierventre30368ab2021-02-24 23:23:22 +0100594 log.info("Deferring traffic match {} installation on device {}. Policy {} not yet installed",
595 trafficMatch.trafficMatchId(), deviceId, trafficMatch.policyId());
596 return;
597 }
pierventre4f68ffa2021-03-09 22:52:14 +0100598 // Updates the store and then send the versatile fwd objective to the pipeliner
599 trafficOperation = Operation.builder()
600 .isInstall(true)
601 .trafficMatch(trafficMatch)
602 .build();
603 operations.put(trafficMatchKey.toString(), trafficOperation);
pierventre30368ab2021-02-24 23:23:22 +0100604 Policy policy = policyOperation.policy().get();
pierventre4f68ffa2021-03-09 22:52:14 +0100605 ForwardingObjective.Builder builder = trafficMatchFwdObjective(trafficMatch, policy.policyType());
pierventre30368ab2021-02-24 23:23:22 +0100606 if (policy.policyType() == PolicyType.DROP) {
pierventre4f68ffa2021-03-09 22:52:14 +0100607 // Firstly builds the fwd objective with the wipeDeferred action.
pierventre30368ab2021-02-24 23:23:22 +0100608 TrafficTreatment dropTreatment = DefaultTrafficTreatment.builder()
609 .wipeDeferred()
610 .build();
611 builder.withTreatment(dropTreatment);
pierventre4f68ffa2021-03-09 22:52:14 +0100612 } else if (policy.policyType() == PolicyType.REDIRECT) {
613
614 // Here we need to set only the next step
615 builder.nextStep(policyOperation.objectiveOperation().id());
pierventre30368ab2021-02-24 23:23:22 +0100616 }
pierventre4f68ffa2021-03-09 22:52:14 +0100617 // Once, the fwd objective has completed its execution, we update the policiesOps map
618 CompletableFuture<Objective> future = new CompletableFuture<>();
619 if (log.isDebugEnabled()) {
620 log.debug("Installing forwarding objective for dev: {}", deviceId);
621 }
622 ObjectiveContext context = new DefaultObjectiveContext(
623 (objective) -> {
624 if (log.isDebugEnabled()) {
625 log.debug("Forwarding objective for policy {} installed", trafficMatch.policyId());
626 }
627 future.complete(objective);
628 },
629 (objective, error) -> {
630 log.warn("Failed to install forwarding objective for policy {}: {}",
631 trafficMatch.policyId(), error);
632 future.complete(null);
633 });
634 // Context is not serializable
635 ForwardingObjective serializableObjective = builder.add();
636 flowObjectiveService.forward(deviceId, builder.add(context));
637 future.whenComplete((objective, ex) -> {
638 if (ex != null) {
639 log.error("Exception installing forwarding objective", ex);
640 } else if (objective != null) {
641 operations.computeIfPresent(trafficMatchKey.toString(), (k, v) -> {
642 if (!v.isDone() && v.isInstall()) {
643 v.isDone(true);
644 v.objectiveOperation(serializableObjective);
645 }
646 return v;
647 });
648 }
649 });
pierventre30368ab2021-02-24 23:23:22 +0100650 }
651
652 // Updates traffic match status if all the pending ops are done
653 private void updateTrafficMatch(TrafficMatch trafficMatch, boolean install) {
654 if (!isLeader(trafficMatch.policyId())) {
655 if (log.isDebugEnabled()) {
656 log.debug("Instance is not leader for policy {}", trafficMatch.policyId());
657 }
658 return;
659 }
660 workers.execute(() -> updateTrafficMatchInternal(trafficMatch.trafficMatchId(), install),
661 trafficMatch.policyId().hashCode());
662 }
663
664 private void updateTrafficMatchInternal(TrafficMatchId trafficMatchId, boolean install) {
665 // If there are no more pending ops we are ready to go; potentially we can check
666 // if the id is contained. Updates traffic matches only if they are still present
667 Optional<Map.Entry<String, Versioned<Operation>>> notYetDone = operations.entrySet().stream()
668 .filter(entry -> entry.getValue().value().trafficMatch().isPresent())
669 .filter(entry -> TrafficMatchKey.fromString(entry.getKey()).trafficMatchId().equals(trafficMatchId))
670 .filter(entry -> !entry.getValue().value().isDone() && entry.getValue().value().isInstall() == install)
671 .findFirst();
672 if (notYetDone.isEmpty()) {
673 trafficMatches.computeIfPresent(trafficMatchId, (k, v) -> {
674 if (v.trafficMatchState() == TrafficMatchState.PENDING_ADD && install) {
675 if (log.isDebugEnabled()) {
676 log.debug("Traffic match {} is ready", trafficMatchId);
677 }
678 v.trafficMatchState(TrafficMatchState.ADDED);
679 } else if (v.trafficMatchState() == TrafficMatchState.PENDING_REMOVE && !install) {
680 if (log.isDebugEnabled()) {
681 log.debug("Traffic match {} is removed", trafficMatchId);
682 }
683 v = null;
684 }
685 return v;
686 });
687 }
688 }
689
690 // Look for any pending traffic match waiting for the policy
691 private void updatePendingTrafficMatches(PolicyId policyId) {
692 Set<TrafficMatchRequest> pendingTrafficMatches = trafficMatches.stream()
693 .filter(trafficMatchEntry -> trafficMatchEntry.getValue().value().policyId().equals(policyId) &&
694 trafficMatchEntry.getValue().value().trafficMatchState() == TrafficMatchState.PENDING_ADD)
695 .map(trafficMatchEntry -> trafficMatchEntry.getValue().value())
696 .collect(Collectors.toSet());
697 for (TrafficMatchRequest trafficMatch : pendingTrafficMatches) {
698 sendTrafficMatch(trafficMatch.trafficMatch(), true);
699 }
700 }
701
702 // Traffic match removal in a device
703 private void removeTrafficMatchInDevice(DeviceId deviceId, TrafficMatch trafficMatch) {
704 if (log.isDebugEnabled()) {
705 log.debug("Removing traffic match {} associated to policy {}",
706 trafficMatch.trafficMatchId(), trafficMatch.policyId());
707 }
708 TrafficMatchKey trafficMatchKey = new TrafficMatchKey(deviceId, trafficMatch.trafficMatchId());
709 Operation operation = Versioned.valueOrNull(operations.get(trafficMatchKey.toString()));
710 if (operation == null || operation.objectiveOperation() == null) {
711 log.warn("There are no ops associated with {}", trafficMatchKey);
712 operation = Operation.builder()
713 .isDone(true)
714 .isInstall(false)
715 .trafficMatch(trafficMatch)
716 .build();
717 operations.put(trafficMatchKey.toString(), operation);
pierventre4f68ffa2021-03-09 22:52:14 +0100718 } else if (!operation.isInstall()) {
719 if (log.isDebugEnabled()) {
720 log.debug("There is already an uninstall operation for traffic match {} associated to policy {}" +
721 " for device {}", trafficMatch.trafficMatchId(), trafficMatch.policyId(), deviceId);
722 }
pierventre30368ab2021-02-24 23:23:22 +0100723 } else {
724 ForwardingObjective oldObj = (ForwardingObjective) operation.objectiveOperation();
725 operation = Operation.builder(operation)
pierventre30368ab2021-02-24 23:23:22 +0100726 .isInstall(false)
727 .build();
728 operations.put(trafficMatchKey.toString(), operation);
729 ForwardingObjective.Builder builder = DefaultForwardingObjective.builder(oldObj);
730 CompletableFuture<Objective> future = new CompletableFuture<>();
731 if (log.isDebugEnabled()) {
pierventre4f68ffa2021-03-09 22:52:14 +0100732 log.debug("Removing forwarding objectives for dev: {}", deviceId);
pierventre30368ab2021-02-24 23:23:22 +0100733 }
734 ObjectiveContext context = new DefaultObjectiveContext(
735 (objective) -> {
736 if (log.isDebugEnabled()) {
pierventre4f68ffa2021-03-09 22:52:14 +0100737 log.debug("Forwarding objective for policy {} removed", trafficMatch.policyId());
pierventre30368ab2021-02-24 23:23:22 +0100738 }
739 future.complete(objective);
740 },
741 (objective, error) -> {
pierventre4f68ffa2021-03-09 22:52:14 +0100742 log.warn("Failed to remove forwarding objective for policy {}: {}",
743 trafficMatch.policyId(), error);
pierventre30368ab2021-02-24 23:23:22 +0100744 future.complete(null);
745 });
746 ForwardingObjective serializableObjective = builder.remove();
747 flowObjectiveService.forward(deviceId, builder.remove(context));
748 future.whenComplete((objective, ex) -> {
749 if (ex != null) {
pierventre4f68ffa2021-03-09 22:52:14 +0100750 log.error("Exception removing forwarding objective", ex);
pierventre30368ab2021-02-24 23:23:22 +0100751 } else if (objective != null) {
752 operations.computeIfPresent(trafficMatchKey.toString(), (k, v) -> {
753 if (!v.isDone() && !v.isInstall()) {
754 v.isDone(true);
755 v.objectiveOperation(serializableObjective);
756 }
757 return v;
758 });
759 }
760 });
761 }
762 }
763
pierventre4f68ffa2021-03-09 22:52:14 +0100764 // It is used when a policy has been removed but there are still traffic matches depending on it
765 private Optional<TrafficMatchRequest> dependingTrafficMatches(PolicyId policyId) {
766 return trafficMatches.stream()
pierventre30368ab2021-02-24 23:23:22 +0100767 .filter(trafficMatchEntry -> trafficMatchEntry.getValue().value().policyId().equals(policyId) &&
768 trafficMatchEntry.getValue().value().trafficMatchState() == TrafficMatchState.ADDED)
769 .map(trafficMatchEntry -> trafficMatchEntry.getValue().value())
pierventre4f68ffa2021-03-09 22:52:14 +0100770 .findFirst();
pierventre30368ab2021-02-24 23:23:22 +0100771 }
772
773 // Utility that removes operations related to a policy or to a traffic match.
774 private void removeOperations(PolicyId policyId, Optional<TrafficMatchId> trafficMatchId) {
775 if (!isLeader(policyId)) {
776 if (log.isDebugEnabled()) {
777 log.debug("Instance is not leader for policy {}", policyId);
778 }
779 return;
780 }
781 List<DeviceId> edgeDeviceIds = srService.getEdgeDeviceIds();
782 for (DeviceId deviceId : edgeDeviceIds) {
783 workers.execute(() -> {
784 String key;
785 if (trafficMatchId.isPresent()) {
786 key = new TrafficMatchKey(deviceId, trafficMatchId.get()).toString();
787 } else {
788 key = new PolicyKey(deviceId, policyId).toString();
789 }
790 operations.remove(key);
791 }, deviceId.hashCode());
792 }
793 }
794
pierventre4f68ffa2021-03-09 22:52:14 +0100795 private ForwardingObjective.Builder trafficMatchFwdObjective(TrafficMatch trafficMatch, PolicyType policyType) {
796 TrafficSelector.Builder metaBuilder = DefaultTrafficSelector.builder(trafficMatch.trafficSelector());
797 if (policyType == PolicyType.REDIRECT) {
798 metaBuilder.matchMetadata(EDGE_PORT);
799 }
pierventre30368ab2021-02-24 23:23:22 +0100800 return DefaultForwardingObjective.builder()
801 .withPriority(PolicyService.TRAFFIC_MATCH_PRIORITY)
802 .withSelector(trafficMatch.trafficSelector())
pierventre4f68ffa2021-03-09 22:52:14 +0100803 .withMeta(metaBuilder.build())
pierventre30368ab2021-02-24 23:23:22 +0100804 .fromApp(appId)
805 .withFlag(ForwardingObjective.Flag.VERSATILE)
806 .makePermanent();
807 }
808
pierventre4f68ffa2021-03-09 22:52:14 +0100809 private NextObjective.Builder redirectPolicyNextObjective(DeviceId srcDevice, RedirectPolicy redirectPolicy) {
810 Set<Link> egressLinks = linkService.getDeviceEgressLinks(srcDevice);
811 Map<ConnectPoint, DeviceId> egressPortsToEnforce = Maps.newHashMap();
812 List<DeviceId> edgeDevices = srService.getEdgeDeviceIds();
813 egressLinks.stream()
814 .filter(link -> redirectPolicy.spinesToEnforce().contains(link.dst().deviceId()) &&
815 !edgeDevices.contains(link.dst().deviceId()))
816 .forEach(link -> egressPortsToEnforce.put(link.src(), link.dst().deviceId()));
817 // No ports no friend
818 if (egressPortsToEnforce.isEmpty()) {
819 log.warn("There are no port available for the REDIRECT policy {}", redirectPolicy.policyId());
820 return null;
821 }
822 // We need to add a treatment for each valid egress port. The treatment
823 // requires to set src and dst mac address and set the egress port. We are
824 // deliberately not providing the metadata to prevent the programming of
825 // some tables which are already controlled by SegmentRouting or are unnecessary
826 int nextId = flowObjectiveService.allocateNextId();
827 DefaultNextObjective.Builder builder = DefaultNextObjective.builder()
828 .withId(nextId)
829 .withType(NextObjective.Type.HASHED)
830 .fromApp(appId);
831 MacAddress srcDeviceMac;
832 try {
833 srcDeviceMac = srService.getDeviceMacAddress(srcDevice);
834 } catch (DeviceConfigNotFoundException e) {
835 log.warn(e.getMessage() + " Aborting installation REDIRECT policy {}", redirectPolicy.policyId());
836 return null;
837 }
838 MacAddress neigborDeviceMac;
839 TrafficTreatment.Builder tBuilder;
840 for (Map.Entry<ConnectPoint, DeviceId> entry : egressPortsToEnforce.entrySet()) {
841 try {
842 neigborDeviceMac = srService.getDeviceMacAddress(entry.getValue());
843 } catch (DeviceConfigNotFoundException e) {
844 log.warn(e.getMessage() + " Aborting installation REDIRECT policy {}", redirectPolicy.policyId());
845 return null;
846 }
847 tBuilder = DefaultTrafficTreatment.builder()
848 .setEthSrc(srcDeviceMac)
849 .setEthDst(neigborDeviceMac)
850 .setOutput(entry.getKey().port());
851 builder.addTreatment(tBuilder.build());
852 }
853 return builder;
854 }
855
pierventre30368ab2021-02-24 23:23:22 +0100856 // Each map has an event listener enabling the events distribution across the cluster
857 private class InternalPolMapEventListener implements MapEventListener<PolicyId, PolicyRequest> {
858 @Override
859 public void event(MapEvent<PolicyId, PolicyRequest> event) {
860 Versioned<PolicyRequest> value = event.type() == MapEvent.Type.REMOVE ?
861 event.oldValue() : event.newValue();
862 PolicyRequest policyRequest = value.value();
863 Policy policy = policyRequest.policy();
864 switch (event.type()) {
865 case INSERT:
866 case UPDATE:
867 switch (policyRequest.policyState()) {
868 case PENDING_ADD:
869 sendPolicy(policy, true);
870 break;
871 case PENDING_REMOVE:
872 sendPolicy(policy, false);
873 break;
874 case ADDED:
875 break;
876 default:
877 log.warn("Unknown policy state type {}", policyRequest.policyState());
878 }
879 break;
880 case REMOVE:
881 removeOperations(policy.policyId(), Optional.empty());
pierventre30368ab2021-02-24 23:23:22 +0100882 break;
883 default:
884 log.warn("Unknown event type {}", event.type());
885
886 }
887 }
888 }
889
890 private class InternalTMatchMapEventListener implements MapEventListener<TrafficMatchId, TrafficMatchRequest> {
891 @Override
892 public void event(MapEvent<TrafficMatchId, TrafficMatchRequest> event) {
893 Versioned<TrafficMatchRequest> value = event.type() == MapEvent.Type.REMOVE ?
894 event.oldValue() : event.newValue();
895 TrafficMatchRequest trafficMatchRequest = value.value();
896 TrafficMatch trafficMatch = trafficMatchRequest.trafficMatch();
897 switch (event.type()) {
898 case INSERT:
899 case UPDATE:
900 switch (trafficMatchRequest.trafficMatchState()) {
901 case PENDING_ADD:
902 sendTrafficMatch(trafficMatch, true);
903 break;
904 case PENDING_REMOVE:
905 sendTrafficMatch(trafficMatch, false);
906 break;
907 case ADDED:
908 break;
909 default:
910 log.warn("Unknown traffic match state type {}", trafficMatchRequest.trafficMatchState());
911 }
912 break;
913 case REMOVE:
914 removeOperations(trafficMatch.policyId(), Optional.of(trafficMatch.trafficMatchId()));
915 break;
916 default:
917 log.warn("Unknown event type {}", event.type());
918 }
919 }
920 }
921
922 private class InternalOpsMapEventListener implements MapEventListener<String, Operation> {
923 @Override
924 public void event(MapEvent<String, Operation> event) {
925 String key = event.key();
926 Versioned<Operation> value = event.type() == MapEvent.Type.REMOVE ?
927 event.oldValue() : event.newValue();
928 Operation operation = value.value();
929 switch (event.type()) {
930 case INSERT:
931 case UPDATE:
932 if (operation.isDone()) {
933 if (operation.policy().isPresent()) {
934 PolicyKey policyKey = PolicyKey.fromString(key);
935 updatePolicy(policyKey.policyId(), operation.isInstall());
936 } else if (operation.trafficMatch().isPresent()) {
937 updateTrafficMatch(operation.trafficMatch().get(), operation.isInstall());
938 } else {
939 log.warn("Unknown pending operation");
940 }
941 }
942 break;
943 case REMOVE:
944 break;
945 default:
946 log.warn("Unknown event type {}", event.type());
947 }
948 }
949 }
950
951 // Using the work partition service defines who is in charge of a given policy.
952 private boolean isLeader(PolicyId policyId) {
953 final NodeId currentNodeId = clusterService.getLocalNode().id();
954 final NodeId leader = workPartitionService.getLeader(policyId, this::hasher);
955 if (leader == null) {
956 log.error("Fail to elect a leader for {}.", policyId);
957 return false;
958 }
959 policyLeaderCache.put(policyId, leader);
960 return currentNodeId.equals(leader);
961 }
962
963 private Long hasher(PolicyId policyId) {
964 return HASH_FN.newHasher()
965 .putUnencodedChars(policyId.toString())
966 .hash()
967 .asLong();
968 }
969
pierventre4f68ffa2021-03-09 22:52:14 +0100970 // TODO Periodic checker, consider to add store and delegates.
971
pierventre30368ab2021-02-24 23:23:22 +0100972 // Check periodically for any issue and try to resolve automatically if possible
973 private final class PolicyChecker implements Runnable {
974 @Override
975 public void run() {
976 }
977 }
978}