blob: eaf67117dfae15385a938d40968c06b516048220 [file] [log] [blame]
pierventre30368ab2021-02-24 23:23:22 +01001/*
2 * Copyright 2021-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.segmentrouting.policy.impl;
17
18import com.google.common.collect.ImmutableSet;
19import com.google.common.collect.Lists;
20import com.google.common.collect.Maps;
21import com.google.common.hash.HashFunction;
22import com.google.common.hash.Hashing;
23import org.glassfish.jersey.internal.guava.Sets;
pierventre4f68ffa2021-03-09 22:52:14 +010024import org.onlab.packet.MacAddress;
pierventre30368ab2021-02-24 23:23:22 +010025import org.onlab.util.KryoNamespace;
26import org.onlab.util.PredictableExecutor;
27import org.onosproject.cluster.ClusterService;
28import org.onosproject.cluster.NodeId;
Wailok Shumee90c132021-03-11 21:00:11 +080029import org.onosproject.codec.CodecService;
pierventre30368ab2021-02-24 23:23:22 +010030import org.onosproject.core.ApplicationId;
31import org.onosproject.core.CoreService;
pierventre4f68ffa2021-03-09 22:52:14 +010032import org.onosproject.net.ConnectPoint;
pierventre30368ab2021-02-24 23:23:22 +010033import org.onosproject.net.DeviceId;
pierventre4f68ffa2021-03-09 22:52:14 +010034import org.onosproject.net.Link;
35import org.onosproject.net.flow.DefaultTrafficSelector;
pierventre30368ab2021-02-24 23:23:22 +010036import org.onosproject.net.flow.DefaultTrafficTreatment;
pierventre4f68ffa2021-03-09 22:52:14 +010037import org.onosproject.net.flow.TrafficSelector;
pierventre30368ab2021-02-24 23:23:22 +010038import org.onosproject.net.flow.TrafficTreatment;
39import org.onosproject.net.flowobjective.DefaultForwardingObjective;
pierventre4f68ffa2021-03-09 22:52:14 +010040import org.onosproject.net.flowobjective.DefaultNextObjective;
pierventre30368ab2021-02-24 23:23:22 +010041import org.onosproject.net.flowobjective.DefaultObjectiveContext;
42import org.onosproject.net.flowobjective.FlowObjectiveService;
43import org.onosproject.net.flowobjective.ForwardingObjective;
pierventre4f68ffa2021-03-09 22:52:14 +010044import org.onosproject.net.flowobjective.NextObjective;
pierventre30368ab2021-02-24 23:23:22 +010045import org.onosproject.net.flowobjective.Objective;
46import org.onosproject.net.flowobjective.ObjectiveContext;
47import org.onosproject.net.intent.WorkPartitionService;
pierventre4f68ffa2021-03-09 22:52:14 +010048import org.onosproject.net.link.LinkService;
pierventre30368ab2021-02-24 23:23:22 +010049import org.onosproject.segmentrouting.SegmentRoutingService;
50import org.onosproject.segmentrouting.policy.api.DropPolicy;
51import org.onosproject.segmentrouting.policy.api.Policy;
pierventre30368ab2021-02-24 23:23:22 +010052import org.onosproject.segmentrouting.policy.api.PolicyData;
53import org.onosproject.segmentrouting.policy.api.PolicyId;
54import org.onosproject.segmentrouting.policy.api.PolicyService;
55import org.onosproject.segmentrouting.policy.api.PolicyState;
pierventre4f68ffa2021-03-09 22:52:14 +010056import org.onosproject.segmentrouting.policy.api.RedirectPolicy;
pierventre30368ab2021-02-24 23:23:22 +010057import org.onosproject.segmentrouting.policy.api.TrafficMatch;
58import org.onosproject.segmentrouting.policy.api.TrafficMatchData;
59import org.onosproject.segmentrouting.policy.api.TrafficMatchId;
60import org.onosproject.segmentrouting.policy.api.TrafficMatchState;
pierventreb37a11a2021-03-18 16:50:04 +010061import org.onosproject.segmentrouting.config.DeviceConfigNotFoundException;
62import org.onosproject.segmentrouting.policy.api.Policy.PolicyType;
pierventre30368ab2021-02-24 23:23:22 +010063import org.onosproject.store.serializers.KryoNamespaces;
64import org.onosproject.store.service.ConsistentMap;
65import org.onosproject.store.service.MapEvent;
66import org.onosproject.store.service.MapEventListener;
67import org.onosproject.store.service.Serializer;
68import org.onosproject.store.service.StorageException;
69import org.onosproject.store.service.StorageService;
70import org.onosproject.store.service.Versioned;
71import org.osgi.service.component.annotations.Activate;
72import org.osgi.service.component.annotations.Component;
73import org.osgi.service.component.annotations.Deactivate;
74import org.osgi.service.component.annotations.Reference;
75import org.osgi.service.component.annotations.ReferenceCardinality;
76import org.slf4j.Logger;
77
78import java.util.List;
79import java.util.Map;
80import java.util.Optional;
81import java.util.Set;
82import java.util.concurrent.CompletableFuture;
83import java.util.stream.Collectors;
84
85import static org.onlab.util.Tools.groupedThreads;
86import static org.slf4j.LoggerFactory.getLogger;
87
88/**
89 * Implementation of the policy service interface.
90 */
91@Component(immediate = true, service = PolicyService.class)
92public class PolicyManager implements PolicyService {
93
94 // App related things
95 private static final String APP_NAME = "org.onosproject.segmentrouting.policy";
96 private ApplicationId appId;
97 private Logger log = getLogger(getClass());
98 static final String KEY_SEPARATOR = "|";
99
pierventre4f68ffa2021-03-09 22:52:14 +0100100 // Supported policies
pierventreb37a11a2021-03-18 16:50:04 +0100101 private static final Set<PolicyType> SUPPORTED_POLICIES = ImmutableSet.of(
pierventre4f68ffa2021-03-09 22:52:14 +0100102 PolicyType.DROP, PolicyType.REDIRECT);
103
pierventre12aaa052021-03-22 12:56:03 +0100104 // Driver should use this meta to match ig_port_type field in the ACL table
pierventre4f68ffa2021-03-09 22:52:14 +0100105 private static final long EDGE_PORT = 1;
pierventre4f68ffa2021-03-09 22:52:14 +0100106
pierventre30368ab2021-02-24 23:23:22 +0100107 // Policy/TrafficMatch store related objects. We use these consistent maps to keep track of the
108 // lifecycle of a policy/traffic match. These are decomposed in multiple operations which have
109 // to be performed on multiple devices in order to have a policy/traffic match in ADDED state.
pierventre4f68ffa2021-03-09 22:52:14 +0100110 // TODO Consider to add store and delegate
pierventre30368ab2021-02-24 23:23:22 +0100111 private static final String POLICY_STORE = "sr-policy-store";
112 private ConsistentMap<PolicyId, PolicyRequest> policies;
113 private MapEventListener<PolicyId, PolicyRequest> mapPolListener = new InternalPolMapEventListener();
114 private Map<PolicyId, PolicyRequest> policiesMap;
115
116 private static final String OPS_STORE = "sr-ops-store";
117 private ConsistentMap<String, Operation> operations;
118 private MapEventListener<String, Operation> mapOpsListener = new InternalOpsMapEventListener();
119 private Map<String, Operation> opsMap;
120
121 private static final String TRAFFIC_MATCH_STORE = "sr-tmatch-store";
122 private ConsistentMap<TrafficMatchId, TrafficMatchRequest> trafficMatches;
123 private MapEventListener<TrafficMatchId, TrafficMatchRequest> mapTMatchListener =
124 new InternalTMatchMapEventListener();
125 private Map<TrafficMatchId, TrafficMatchRequest> trafficMatchesMap;
126
127 // Leadership related objects - consistent hashing
128 private static final HashFunction HASH_FN = Hashing.md5();
129 // Read only cache of the Policy leader
130 private Map<PolicyId, NodeId> policyLeaderCache;
131
132 // Worker threads for policy and traffic match related ops
133 private static final int DEFAULT_THREADS = 4;
134 protected PredictableExecutor workers;
135
136 // Serializers and ONOS services
137 private static final KryoNamespace.Builder APP_KRYO_BUILDER = KryoNamespace.newBuilder()
138 .register(KryoNamespaces.API)
139 .register(PolicyId.class)
140 .register(PolicyType.class)
141 .register(DropPolicy.class)
pierventre4f68ffa2021-03-09 22:52:14 +0100142 .register(RedirectPolicy.class)
pierventre30368ab2021-02-24 23:23:22 +0100143 .register(PolicyState.class)
144 .register(PolicyRequest.class)
145 .register(TrafficMatchId.class)
146 .register(TrafficMatchState.class)
147 .register(TrafficMatch.class)
148 .register(TrafficMatchRequest.class)
149 .register(Operation.class);
150 private Serializer serializer = Serializer.using(Lists.newArrayList(APP_KRYO_BUILDER.build()));
151
152 @Reference(cardinality = ReferenceCardinality.MANDATORY)
153 private CoreService coreService;
154
155 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Wailok Shumee90c132021-03-11 21:00:11 +0800156 private CodecService codecService;
157
158 @Reference(cardinality = ReferenceCardinality.MANDATORY)
pierventre30368ab2021-02-24 23:23:22 +0100159 private StorageService storageService;
160
161 @Reference(cardinality = ReferenceCardinality.MANDATORY)
162 private ClusterService clusterService;
163
164 @Reference(cardinality = ReferenceCardinality.MANDATORY)
pierventre4f68ffa2021-03-09 22:52:14 +0100165 private WorkPartitionService workPartitionService;
pierventre30368ab2021-02-24 23:23:22 +0100166
Wailok Shumee90c132021-03-11 21:00:11 +0800167 @Reference(cardinality = ReferenceCardinality.MANDATORY)
pierventre4f68ffa2021-03-09 22:52:14 +0100168 private SegmentRoutingService srService;
pierventre30368ab2021-02-24 23:23:22 +0100169
170 @Reference(cardinality = ReferenceCardinality.MANDATORY)
pierventre4f68ffa2021-03-09 22:52:14 +0100171 private FlowObjectiveService flowObjectiveService;
172
173 @Reference(cardinality = ReferenceCardinality.MANDATORY)
174 private LinkService linkService;
pierventre30368ab2021-02-24 23:23:22 +0100175
176 @Activate
177 public void activate() {
178 appId = coreService.registerApplication(APP_NAME);
179
Wailok Shumee90c132021-03-11 21:00:11 +0800180 codecService.registerCodec(DropPolicy.class, new DropPolicyCodec());
181 codecService.registerCodec(RedirectPolicy.class, new RedirectPolicyCodec());
182 codecService.registerCodec(TrafficMatch.class, new TrafficMatchCodec());
183
pierventre30368ab2021-02-24 23:23:22 +0100184 policies = storageService.<PolicyId, PolicyRequest>consistentMapBuilder()
185 .withName(POLICY_STORE)
186 .withSerializer(serializer).build();
187 policies.addListener(mapPolListener);
188 policiesMap = policies.asJavaMap();
189
190 trafficMatches = storageService.<TrafficMatchId, TrafficMatchRequest>consistentMapBuilder()
191 .withName(TRAFFIC_MATCH_STORE)
192 .withSerializer(serializer).build();
193 trafficMatches.addListener(mapTMatchListener);
194 trafficMatchesMap = trafficMatches.asJavaMap();
195
196 operations = storageService.<String, Operation>consistentMapBuilder()
197 .withName(OPS_STORE)
198 .withSerializer(serializer).build();
199 operations.addListener(mapOpsListener);
200 opsMap = operations.asJavaMap();
201
202 policyLeaderCache = Maps.newConcurrentMap();
203
204 workers = new PredictableExecutor(DEFAULT_THREADS,
205 groupedThreads("sr-policy", "worker-%d", log));
206
207 log.info("Started");
208 }
209
210 @Deactivate
211 public void deactivate() {
212 // Teardown everything
Wailok Shumee90c132021-03-11 21:00:11 +0800213 codecService.unregisterCodec(DropPolicy.class);
214 codecService.unregisterCodec(RedirectPolicy.class);
215 codecService.unregisterCodec(TrafficMatch.class);
pierventre30368ab2021-02-24 23:23:22 +0100216 policies.removeListener(mapPolListener);
217 policies.destroy();
218 policiesMap.clear();
219 trafficMatches.removeListener(mapTMatchListener);
220 trafficMatches.destroy();
221 trafficMatchesMap.clear();
222 operations.removeListener(mapOpsListener);
223 operations.destroy();
224 operations.clear();
225 workers.shutdown();
226
227 log.info("Stopped");
228 }
229
230 @Override
231 //FIXME update does not work well
232 public PolicyId addOrUpdatePolicy(Policy policy) {
233 PolicyId policyId = policy.policyId();
234 try {
235 policies.put(policyId, new PolicyRequest(policy));
236 } catch (StorageException e) {
237 log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
238 e.getMessage(), e);
239 policyId = null;
240 }
241 return policyId;
242 }
243
244 @Override
245 public boolean removePolicy(PolicyId policyId) {
246 boolean result;
pierventre4f68ffa2021-03-09 22:52:14 +0100247 if (dependingTrafficMatches(policyId).isPresent()) {
248 if (log.isDebugEnabled()) {
249 log.debug("Found depending traffic matches");
250 }
251 return false;
252 }
pierventre30368ab2021-02-24 23:23:22 +0100253 try {
254 result = Versioned.valueOrNull(policies.computeIfPresent(policyId, (k, v) -> {
255 if (v.policyState() != PolicyState.PENDING_REMOVE) {
256 v.policyState(PolicyState.PENDING_REMOVE);
257 }
258 return v;
259 })) != null;
260 } catch (StorageException e) {
261 log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
262 e.getMessage(), e);
263 result = false;
264 }
265 return result;
266 }
267
268 @Override
269 public Set<PolicyData> policies(Set<PolicyType> filter) {
270 Set<PolicyData> policyData = Sets.newHashSet();
271 List<DeviceId> edgeDeviceIds = srService.getEdgeDeviceIds();
272 Set<PolicyRequest> policyRequests;
273 if (filter.isEmpty()) {
274 policyRequests = ImmutableSet.copyOf(policiesMap.values());
275 } else {
276 policyRequests = policiesMap.values().stream()
277 .filter(policyRequest -> filter.contains(policyRequest.policyType()))
278 .collect(Collectors.toSet());
279 }
280 PolicyKey policyKey;
281 List<String> ops;
282 for (PolicyRequest policyRequest : policyRequests) {
283 ops = Lists.newArrayList();
284 for (DeviceId deviceId : edgeDeviceIds) {
285 policyKey = new PolicyKey(deviceId, policyRequest.policyId());
286 Operation operation = Versioned.valueOrNull(operations.get(policyKey.toString()));
287 if (operation != null) {
288 ops.add(deviceId + " -> " + operation.toStringMinimal());
289 }
290 }
291 policyData.add(new PolicyData(policyRequest.policyState(), policyRequest.policy(), ops));
292 }
293 return policyData;
294 }
295
296 @Override
297 //FIXME update does not work well
298 public TrafficMatchId addOrUpdateTrafficMatch(TrafficMatch trafficMatch) {
299 TrafficMatchId trafficMatchId = trafficMatch.trafficMatchId();
300 try {
301 trafficMatches.put(trafficMatchId, new TrafficMatchRequest(trafficMatch));
302 } catch (StorageException e) {
303 log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
304 e.getMessage(), e);
305 trafficMatchId = null;
306 }
307 return trafficMatchId;
308 }
309
310 @Override
311 public boolean removeTrafficMatch(TrafficMatchId trafficMatchId) {
312 boolean result;
313 try {
314 result = Versioned.valueOrNull(trafficMatches.computeIfPresent(trafficMatchId, (k, v) -> {
315 if (v.trafficMatchState() != TrafficMatchState.PENDING_REMOVE) {
316 v.trafficMatchState(TrafficMatchState.PENDING_REMOVE);
317 }
318 return v;
319 })) != null;
320 } catch (StorageException e) {
321 log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
322 e.getMessage(), e);
323 result = false;
324 }
325 return result;
326 }
327
328 @Override
329 public Set<TrafficMatchData> trafficMatches() {
330 Set<TrafficMatchData> trafficMatchData = Sets.newHashSet();
331 List<DeviceId> edgeDeviceIds = srService.getEdgeDeviceIds();
332 Set<TrafficMatchRequest> trafficMatchRequests = ImmutableSet.copyOf(trafficMatchesMap.values());
333 TrafficMatchKey trafficMatchKey;
334 List<String> ops;
335 for (TrafficMatchRequest trafficMatchRequest : trafficMatchRequests) {
336 ops = Lists.newArrayList();
337 for (DeviceId deviceId : edgeDeviceIds) {
338 trafficMatchKey = new TrafficMatchKey(deviceId, trafficMatchRequest.trafficMatch().trafficMatchId());
339 Operation operation = Versioned.valueOrNull(operations.get(trafficMatchKey.toString()));
340 if (operation != null) {
341 ops.add(deviceId + " -> " + operation.toStringMinimal());
342 }
343 }
344 trafficMatchData.add(new TrafficMatchData(trafficMatchRequest.trafficMatchState(),
345 trafficMatchRequest.trafficMatch(), ops));
346 }
347 return trafficMatchData;
348 }
349
350 // Install/remove the policies on the edge devices
351 private void sendPolicy(Policy policy, boolean install) {
352 if (!isLeader(policy.policyId())) {
353 if (log.isDebugEnabled()) {
354 log.debug("Instance is not leader for policy {}", policy.policyId());
355 }
356 return;
357 }
358 // We know that we are the leader, offloads to the workers the remaining
359 // part: issue fobj installation/removal and update the maps
360 List<DeviceId> edgeDeviceIds = srService.getEdgeDeviceIds();
361 for (DeviceId deviceId : edgeDeviceIds) {
362 workers.execute(() -> {
363 if (install) {
364 installPolicyInDevice(deviceId, policy);
365 } else {
366 removePolicyInDevice(deviceId, policy);
367 }
368 }, deviceId.hashCode());
369 }
370 }
371
372 // Orchestrate policy installation according to the type
373 private void installPolicyInDevice(DeviceId deviceId, Policy policy) {
pierventre4f68ffa2021-03-09 22:52:14 +0100374 if (!SUPPORTED_POLICIES.contains(policy.policyType())) {
pierventre30368ab2021-02-24 23:23:22 +0100375 log.warn("Policy {} type {} not yet supported",
376 policy.policyId(), policy.policyType());
pierventre4f68ffa2021-03-09 22:52:14 +0100377 return;
378 }
379 PolicyKey policyKey;
380 Operation.Builder operation;
381 if (log.isDebugEnabled()) {
382 log.debug("Installing {} policy {} for dev: {}",
383 policy.policyType(), policy.policyId(), deviceId);
384 }
385 policyKey = new PolicyKey(deviceId, policy.policyId());
386 operation = Operation.builder()
387 .isInstall(true)
388 .policy(policy);
389 // TODO To better handle different policy types consider the abstraction of a compiler (subtypes ?)
390 if (policy.policyType() == PolicyType.DROP) {
391 // DROP policies do not need the next objective installation phase
392 // we can update directly the map and signal the ops as done
393 operation.isDone(true);
394 operations.put(policyKey.toString(), operation.build());
395 } else if (policy.policyType() == PolicyType.REDIRECT) {
396 // REDIRECT Uses next objective context to update the ops as done when
397 // it returns successfully. In the other cases leaves the ops as undone
398 // and the relative policy will remain in pending.
399 operations.put(policyKey.toString(), operation.build());
400 NextObjective.Builder builder = redirectPolicyNextObjective(deviceId, (RedirectPolicy) policy);
401 // Handle error here - leave the operation as undone and pending
402 if (builder != null) {
403 CompletableFuture<Objective> future = new CompletableFuture<>();
404 if (log.isDebugEnabled()) {
405 log.debug("Installing REDIRECT next objective for dev: {}", deviceId);
406 }
407 ObjectiveContext context = new DefaultObjectiveContext(
408 (objective) -> {
409 if (log.isDebugEnabled()) {
410 log.debug("REDIRECT next objective for policy {} installed in dev: {}",
411 policy.policyId(), deviceId);
412 }
413 future.complete(objective);
414 },
415 (objective, error) -> {
416 log.warn("Failed to install REDIRECT next objective for policy {}: {} in dev: {}",
417 policy.policyId(), error, deviceId);
418 future.complete(null);
419 });
420 // Context is not serializable
421 NextObjective serializableObjective = builder.add();
422 flowObjectiveService.next(deviceId, builder.add(context));
423 future.whenComplete((objective, ex) -> {
424 if (ex != null) {
425 log.error("Exception installing REDIRECT next objective", ex);
426 } else if (objective != null) {
427 operations.computeIfPresent(policyKey.toString(), (k, v) -> {
428 if (!v.isDone() && v.isInstall()) {
429 v.isDone(true);
430 v.objectiveOperation(serializableObjective);
431 }
432 return v;
433 });
434 }
435 });
436 }
pierventre30368ab2021-02-24 23:23:22 +0100437 }
438 }
439
440 // Remove policy in a device according to the type
441 private void removePolicyInDevice(DeviceId deviceId, Policy policy) {
pierventre30368ab2021-02-24 23:23:22 +0100442 PolicyKey policyKey = new PolicyKey(deviceId, policy.policyId());
443 Operation operation = Versioned.valueOrNull(operations.get(policyKey.toString()));
444 // Policy might be still in pending or not present anymore
445 if (operation == null || operation.objectiveOperation() == null) {
446 log.warn("There are no ops associated with {}", policyKey);
447 operation = Operation.builder()
448 .isDone(true)
449 .isInstall(false)
450 .policy(policy)
451 .build();
452 operations.put(policyKey.toString(), operation);
453 } else {
pierventre4f68ffa2021-03-09 22:52:14 +0100454 if (log.isDebugEnabled()) {
455 log.debug("Removing {} policy {} in device {}", policy.policyType(), policy.policyId(), deviceId);
456 }
457 Operation.Builder operationBuilder = Operation.builder()
458 .isInstall(false)
459 .policy(policy);
pierventre30368ab2021-02-24 23:23:22 +0100460 if (policy.policyType() == PolicyType.DROP) {
pierventre4f68ffa2021-03-09 22:52:14 +0100461 operationBuilder.isDone(true);
462 operations.put(policyKey.toString(), operationBuilder.build());
pierventre30368ab2021-02-24 23:23:22 +0100463 } else if (policy.policyType() == PolicyType.REDIRECT) {
pierventre4f68ffa2021-03-09 22:52:14 +0100464 // REDIRECT has to remove the next objective first
465 NextObjective oldObj = (NextObjective) operation.objectiveOperation();
466 operations.put(policyKey.toString(), operationBuilder.build());
467 NextObjective.Builder builder = oldObj.copy();
468 CompletableFuture<Objective> future = new CompletableFuture<>();
pierventre30368ab2021-02-24 23:23:22 +0100469 if (log.isDebugEnabled()) {
pierventre4f68ffa2021-03-09 22:52:14 +0100470 log.debug("Removing REDIRECT next objective for dev: {}", deviceId);
pierventre30368ab2021-02-24 23:23:22 +0100471 }
pierventre4f68ffa2021-03-09 22:52:14 +0100472 ObjectiveContext context = new DefaultObjectiveContext(
473 (objective) -> {
474 if (log.isDebugEnabled()) {
475 log.debug("REDIRECT next objective for policy {} removed in dev: {}",
476 policy.policyId(), deviceId);
477 }
478 future.complete(objective);
479 },
480 (objective, error) -> {
481 log.warn("Failed to remove REDIRECT next objective for policy {}: {} in dev: {}",
482 policy.policyId(), error, deviceId);
483 future.complete(null);
484 });
485 NextObjective serializableObjective = builder.remove();
486 flowObjectiveService.next(deviceId, builder.remove(context));
487 future.whenComplete((objective, ex) -> {
488 if (ex != null) {
489 log.error("Exception Removing REDIRECT next objective", ex);
490 } else if (objective != null) {
491 operations.computeIfPresent(policyKey.toString(), (k, v) -> {
492 if (!v.isDone() && !v.isInstall()) {
493 v.isDone(true);
494 v.objectiveOperation(serializableObjective);
495 }
496 return v;
497 });
498 }
499 });
pierventre30368ab2021-02-24 23:23:22 +0100500 }
501 }
502 }
503
504 // Updates policy status if all the pending ops are done
505 private void updatePolicy(PolicyId policyId, boolean install) {
506 if (!isLeader(policyId)) {
507 if (log.isDebugEnabled()) {
508 log.debug("Instance is not leader for policy {}", policyId);
509 }
510 return;
511 }
512 workers.execute(() -> updatePolicyInternal(policyId, install), policyId.hashCode());
513 }
514
515 private void updatePolicyInternal(PolicyId policyId, boolean install) {
516 // If there are no more pending ops we are ready to go; potentially we can check
517 // if the id is contained. Updates policies only if they are still present
518 Optional<Map.Entry<String, Versioned<Operation>>> notYetDone = operations.entrySet().stream()
519 .filter(entry -> entry.getValue().value().policy().isPresent())
520 .filter(entry -> PolicyKey.fromString(entry.getKey()).policyId().equals(policyId))
521 .filter(entry -> !entry.getValue().value().isDone() && entry.getValue().value().isInstall() == install)
522 .findFirst();
523 if (notYetDone.isEmpty()) {
524 PolicyRequest policyRequest = Versioned.valueOrNull(policies.computeIfPresent(policyId, (k, v) -> {
525 if (v.policyState() == PolicyState.PENDING_ADD && install) {
526 if (log.isDebugEnabled()) {
527 log.debug("Policy {} is ready", policyId);
528 }
529 v.policyState(PolicyState.ADDED);
530 } else if (v.policyState() == PolicyState.PENDING_REMOVE && !install) {
531 if (log.isDebugEnabled()) {
532 log.debug("Policy {} is removed", policyId);
533 }
534 v = null;
535 }
536 return v;
537 }));
538 // Greedy check for pending traffic matches
539 if (policyRequest != null && policyRequest.policyState() == PolicyState.ADDED) {
540 updatePendingTrafficMatches(policyRequest.policyId());
541 }
542 }
543 }
544
545 // Install/remove the traffic match on the edge devices
546 private void sendTrafficMatch(TrafficMatch trafficMatch, boolean install) {
547 if (!isLeader(trafficMatch.policyId())) {
548 if (log.isDebugEnabled()) {
549 log.debug("Instance is not leader for policy {}", trafficMatch.policyId());
550 }
551 return;
552 }
553 // We know that we are the leader, offloads to the workers the remaining
554 // part: issue fobj installation/removal and update the maps
555 List<DeviceId> edgeDeviceIds = srService.getEdgeDeviceIds();
556 for (DeviceId deviceId : edgeDeviceIds) {
557 workers.execute(() -> {
558 if (install) {
559 installTrafficMatchToDevice(deviceId, trafficMatch);
560 } else {
561 removeTrafficMatchInDevice(deviceId, trafficMatch);
562 }
563 }, deviceId.hashCode());
564 }
565 }
566
567 // Orchestrate traffic match installation according to the type
568 private void installTrafficMatchToDevice(DeviceId deviceId, TrafficMatch trafficMatch) {
569 if (log.isDebugEnabled()) {
570 log.debug("Installing traffic match {} associated to policy {}",
571 trafficMatch.trafficMatchId(), trafficMatch.policyId());
572 }
pierventre30368ab2021-02-24 23:23:22 +0100573 TrafficMatchKey trafficMatchKey = new TrafficMatchKey(deviceId, trafficMatch.trafficMatchId());
pierventre4f68ffa2021-03-09 22:52:14 +0100574 Operation trafficOperation = Versioned.valueOrNull(operations.get(trafficMatchKey.toString()));
575 if (trafficOperation != null && trafficOperation.isInstall()) {
576 if (log.isDebugEnabled()) {
577 log.debug("There is already an install operation for traffic match {} associated to policy {} " +
578 "for device {}", trafficMatch.trafficMatchId(), trafficMatch.policyId(), deviceId);
579 }
580 return;
581 }
pierventre30368ab2021-02-24 23:23:22 +0100582 // For the DROP policy we need to set an ACL drop in the fwd objective. The other
583 // policies require to retrieve the next Id and sets the next step.
584 PolicyKey policyKey = new PolicyKey(deviceId, trafficMatch.policyId());
585 Operation policyOperation = Versioned.valueOrNull(operations.get(policyKey.toString()));
586 if (policyOperation == null || !policyOperation.isDone() ||
pierventre4f68ffa2021-03-09 22:52:14 +0100587 !policyOperation.isInstall() || policyOperation.policy().isEmpty() ||
588 (policyOperation.policy().get().policyType() == PolicyType.REDIRECT &&
589 policyOperation.objectiveOperation() == null)) {
pierventre30368ab2021-02-24 23:23:22 +0100590 log.info("Deferring traffic match {} installation on device {}. Policy {} not yet installed",
591 trafficMatch.trafficMatchId(), deviceId, trafficMatch.policyId());
592 return;
593 }
pierventre4f68ffa2021-03-09 22:52:14 +0100594 // Updates the store and then send the versatile fwd objective to the pipeliner
595 trafficOperation = Operation.builder()
596 .isInstall(true)
597 .trafficMatch(trafficMatch)
598 .build();
599 operations.put(trafficMatchKey.toString(), trafficOperation);
pierventre30368ab2021-02-24 23:23:22 +0100600 Policy policy = policyOperation.policy().get();
pierventre4f68ffa2021-03-09 22:52:14 +0100601 ForwardingObjective.Builder builder = trafficMatchFwdObjective(trafficMatch, policy.policyType());
pierventre30368ab2021-02-24 23:23:22 +0100602 if (policy.policyType() == PolicyType.DROP) {
pierventre4f68ffa2021-03-09 22:52:14 +0100603 // Firstly builds the fwd objective with the wipeDeferred action.
pierventre30368ab2021-02-24 23:23:22 +0100604 TrafficTreatment dropTreatment = DefaultTrafficTreatment.builder()
605 .wipeDeferred()
606 .build();
607 builder.withTreatment(dropTreatment);
pierventre4f68ffa2021-03-09 22:52:14 +0100608 } else if (policy.policyType() == PolicyType.REDIRECT) {
609
610 // Here we need to set only the next step
611 builder.nextStep(policyOperation.objectiveOperation().id());
pierventre30368ab2021-02-24 23:23:22 +0100612 }
pierventre4f68ffa2021-03-09 22:52:14 +0100613 // Once, the fwd objective has completed its execution, we update the policiesOps map
614 CompletableFuture<Objective> future = new CompletableFuture<>();
615 if (log.isDebugEnabled()) {
616 log.debug("Installing forwarding objective for dev: {}", deviceId);
617 }
618 ObjectiveContext context = new DefaultObjectiveContext(
619 (objective) -> {
620 if (log.isDebugEnabled()) {
621 log.debug("Forwarding objective for policy {} installed", trafficMatch.policyId());
622 }
623 future.complete(objective);
624 },
625 (objective, error) -> {
626 log.warn("Failed to install forwarding objective for policy {}: {}",
627 trafficMatch.policyId(), error);
628 future.complete(null);
629 });
630 // Context is not serializable
631 ForwardingObjective serializableObjective = builder.add();
632 flowObjectiveService.forward(deviceId, builder.add(context));
633 future.whenComplete((objective, ex) -> {
634 if (ex != null) {
635 log.error("Exception installing forwarding objective", ex);
636 } else if (objective != null) {
637 operations.computeIfPresent(trafficMatchKey.toString(), (k, v) -> {
638 if (!v.isDone() && v.isInstall()) {
639 v.isDone(true);
640 v.objectiveOperation(serializableObjective);
641 }
642 return v;
643 });
644 }
645 });
pierventre30368ab2021-02-24 23:23:22 +0100646 }
647
648 // Updates traffic match status if all the pending ops are done
649 private void updateTrafficMatch(TrafficMatch trafficMatch, boolean install) {
650 if (!isLeader(trafficMatch.policyId())) {
651 if (log.isDebugEnabled()) {
652 log.debug("Instance is not leader for policy {}", trafficMatch.policyId());
653 }
654 return;
655 }
656 workers.execute(() -> updateTrafficMatchInternal(trafficMatch.trafficMatchId(), install),
657 trafficMatch.policyId().hashCode());
658 }
659
660 private void updateTrafficMatchInternal(TrafficMatchId trafficMatchId, boolean install) {
661 // If there are no more pending ops we are ready to go; potentially we can check
662 // if the id is contained. Updates traffic matches only if they are still present
663 Optional<Map.Entry<String, Versioned<Operation>>> notYetDone = operations.entrySet().stream()
664 .filter(entry -> entry.getValue().value().trafficMatch().isPresent())
665 .filter(entry -> TrafficMatchKey.fromString(entry.getKey()).trafficMatchId().equals(trafficMatchId))
666 .filter(entry -> !entry.getValue().value().isDone() && entry.getValue().value().isInstall() == install)
667 .findFirst();
668 if (notYetDone.isEmpty()) {
669 trafficMatches.computeIfPresent(trafficMatchId, (k, v) -> {
670 if (v.trafficMatchState() == TrafficMatchState.PENDING_ADD && install) {
671 if (log.isDebugEnabled()) {
672 log.debug("Traffic match {} is ready", trafficMatchId);
673 }
674 v.trafficMatchState(TrafficMatchState.ADDED);
675 } else if (v.trafficMatchState() == TrafficMatchState.PENDING_REMOVE && !install) {
676 if (log.isDebugEnabled()) {
677 log.debug("Traffic match {} is removed", trafficMatchId);
678 }
679 v = null;
680 }
681 return v;
682 });
683 }
684 }
685
686 // Look for any pending traffic match waiting for the policy
687 private void updatePendingTrafficMatches(PolicyId policyId) {
688 Set<TrafficMatchRequest> pendingTrafficMatches = trafficMatches.stream()
689 .filter(trafficMatchEntry -> trafficMatchEntry.getValue().value().policyId().equals(policyId) &&
690 trafficMatchEntry.getValue().value().trafficMatchState() == TrafficMatchState.PENDING_ADD)
691 .map(trafficMatchEntry -> trafficMatchEntry.getValue().value())
692 .collect(Collectors.toSet());
693 for (TrafficMatchRequest trafficMatch : pendingTrafficMatches) {
694 sendTrafficMatch(trafficMatch.trafficMatch(), true);
695 }
696 }
697
698 // Traffic match removal in a device
699 private void removeTrafficMatchInDevice(DeviceId deviceId, TrafficMatch trafficMatch) {
700 if (log.isDebugEnabled()) {
701 log.debug("Removing traffic match {} associated to policy {}",
702 trafficMatch.trafficMatchId(), trafficMatch.policyId());
703 }
704 TrafficMatchKey trafficMatchKey = new TrafficMatchKey(deviceId, trafficMatch.trafficMatchId());
705 Operation operation = Versioned.valueOrNull(operations.get(trafficMatchKey.toString()));
706 if (operation == null || operation.objectiveOperation() == null) {
707 log.warn("There are no ops associated with {}", trafficMatchKey);
708 operation = Operation.builder()
709 .isDone(true)
710 .isInstall(false)
711 .trafficMatch(trafficMatch)
712 .build();
713 operations.put(trafficMatchKey.toString(), operation);
pierventre4f68ffa2021-03-09 22:52:14 +0100714 } else if (!operation.isInstall()) {
715 if (log.isDebugEnabled()) {
716 log.debug("There is already an uninstall operation for traffic match {} associated to policy {}" +
717 " for device {}", trafficMatch.trafficMatchId(), trafficMatch.policyId(), deviceId);
718 }
pierventre30368ab2021-02-24 23:23:22 +0100719 } else {
720 ForwardingObjective oldObj = (ForwardingObjective) operation.objectiveOperation();
721 operation = Operation.builder(operation)
pierventre30368ab2021-02-24 23:23:22 +0100722 .isInstall(false)
723 .build();
724 operations.put(trafficMatchKey.toString(), operation);
725 ForwardingObjective.Builder builder = DefaultForwardingObjective.builder(oldObj);
726 CompletableFuture<Objective> future = new CompletableFuture<>();
727 if (log.isDebugEnabled()) {
pierventre4f68ffa2021-03-09 22:52:14 +0100728 log.debug("Removing forwarding objectives for dev: {}", deviceId);
pierventre30368ab2021-02-24 23:23:22 +0100729 }
730 ObjectiveContext context = new DefaultObjectiveContext(
731 (objective) -> {
732 if (log.isDebugEnabled()) {
pierventre4f68ffa2021-03-09 22:52:14 +0100733 log.debug("Forwarding objective for policy {} removed", trafficMatch.policyId());
pierventre30368ab2021-02-24 23:23:22 +0100734 }
735 future.complete(objective);
736 },
737 (objective, error) -> {
pierventre4f68ffa2021-03-09 22:52:14 +0100738 log.warn("Failed to remove forwarding objective for policy {}: {}",
739 trafficMatch.policyId(), error);
pierventre30368ab2021-02-24 23:23:22 +0100740 future.complete(null);
741 });
742 ForwardingObjective serializableObjective = builder.remove();
743 flowObjectiveService.forward(deviceId, builder.remove(context));
744 future.whenComplete((objective, ex) -> {
745 if (ex != null) {
pierventre4f68ffa2021-03-09 22:52:14 +0100746 log.error("Exception removing forwarding objective", ex);
pierventre30368ab2021-02-24 23:23:22 +0100747 } else if (objective != null) {
748 operations.computeIfPresent(trafficMatchKey.toString(), (k, v) -> {
749 if (!v.isDone() && !v.isInstall()) {
750 v.isDone(true);
751 v.objectiveOperation(serializableObjective);
752 }
753 return v;
754 });
755 }
756 });
757 }
758 }
759
pierventre4f68ffa2021-03-09 22:52:14 +0100760 // It is used when a policy has been removed but there are still traffic matches depending on it
761 private Optional<TrafficMatchRequest> dependingTrafficMatches(PolicyId policyId) {
762 return trafficMatches.stream()
pierventre30368ab2021-02-24 23:23:22 +0100763 .filter(trafficMatchEntry -> trafficMatchEntry.getValue().value().policyId().equals(policyId) &&
764 trafficMatchEntry.getValue().value().trafficMatchState() == TrafficMatchState.ADDED)
765 .map(trafficMatchEntry -> trafficMatchEntry.getValue().value())
pierventre4f68ffa2021-03-09 22:52:14 +0100766 .findFirst();
pierventre30368ab2021-02-24 23:23:22 +0100767 }
768
769 // Utility that removes operations related to a policy or to a traffic match.
770 private void removeOperations(PolicyId policyId, Optional<TrafficMatchId> trafficMatchId) {
771 if (!isLeader(policyId)) {
772 if (log.isDebugEnabled()) {
773 log.debug("Instance is not leader for policy {}", policyId);
774 }
775 return;
776 }
777 List<DeviceId> edgeDeviceIds = srService.getEdgeDeviceIds();
778 for (DeviceId deviceId : edgeDeviceIds) {
779 workers.execute(() -> {
780 String key;
781 if (trafficMatchId.isPresent()) {
782 key = new TrafficMatchKey(deviceId, trafficMatchId.get()).toString();
783 } else {
784 key = new PolicyKey(deviceId, policyId).toString();
785 }
786 operations.remove(key);
787 }, deviceId.hashCode());
788 }
789 }
790
pierventre4f68ffa2021-03-09 22:52:14 +0100791 private ForwardingObjective.Builder trafficMatchFwdObjective(TrafficMatch trafficMatch, PolicyType policyType) {
792 TrafficSelector.Builder metaBuilder = DefaultTrafficSelector.builder(trafficMatch.trafficSelector());
793 if (policyType == PolicyType.REDIRECT) {
794 metaBuilder.matchMetadata(EDGE_PORT);
795 }
pierventre30368ab2021-02-24 23:23:22 +0100796 return DefaultForwardingObjective.builder()
797 .withPriority(PolicyService.TRAFFIC_MATCH_PRIORITY)
798 .withSelector(trafficMatch.trafficSelector())
pierventre4f68ffa2021-03-09 22:52:14 +0100799 .withMeta(metaBuilder.build())
pierventre30368ab2021-02-24 23:23:22 +0100800 .fromApp(appId)
801 .withFlag(ForwardingObjective.Flag.VERSATILE)
802 .makePermanent();
803 }
804
pierventre4f68ffa2021-03-09 22:52:14 +0100805 private NextObjective.Builder redirectPolicyNextObjective(DeviceId srcDevice, RedirectPolicy redirectPolicy) {
806 Set<Link> egressLinks = linkService.getDeviceEgressLinks(srcDevice);
807 Map<ConnectPoint, DeviceId> egressPortsToEnforce = Maps.newHashMap();
808 List<DeviceId> edgeDevices = srService.getEdgeDeviceIds();
809 egressLinks.stream()
810 .filter(link -> redirectPolicy.spinesToEnforce().contains(link.dst().deviceId()) &&
811 !edgeDevices.contains(link.dst().deviceId()))
812 .forEach(link -> egressPortsToEnforce.put(link.src(), link.dst().deviceId()));
813 // No ports no friend
814 if (egressPortsToEnforce.isEmpty()) {
815 log.warn("There are no port available for the REDIRECT policy {}", redirectPolicy.policyId());
816 return null;
817 }
818 // We need to add a treatment for each valid egress port. The treatment
819 // requires to set src and dst mac address and set the egress port. We are
820 // deliberately not providing the metadata to prevent the programming of
821 // some tables which are already controlled by SegmentRouting or are unnecessary
822 int nextId = flowObjectiveService.allocateNextId();
823 DefaultNextObjective.Builder builder = DefaultNextObjective.builder()
824 .withId(nextId)
825 .withType(NextObjective.Type.HASHED)
826 .fromApp(appId);
827 MacAddress srcDeviceMac;
828 try {
829 srcDeviceMac = srService.getDeviceMacAddress(srcDevice);
830 } catch (DeviceConfigNotFoundException e) {
831 log.warn(e.getMessage() + " Aborting installation REDIRECT policy {}", redirectPolicy.policyId());
832 return null;
833 }
834 MacAddress neigborDeviceMac;
835 TrafficTreatment.Builder tBuilder;
836 for (Map.Entry<ConnectPoint, DeviceId> entry : egressPortsToEnforce.entrySet()) {
837 try {
838 neigborDeviceMac = srService.getDeviceMacAddress(entry.getValue());
839 } catch (DeviceConfigNotFoundException e) {
840 log.warn(e.getMessage() + " Aborting installation REDIRECT policy {}", redirectPolicy.policyId());
841 return null;
842 }
843 tBuilder = DefaultTrafficTreatment.builder()
844 .setEthSrc(srcDeviceMac)
845 .setEthDst(neigborDeviceMac)
846 .setOutput(entry.getKey().port());
847 builder.addTreatment(tBuilder.build());
848 }
849 return builder;
850 }
851
pierventre30368ab2021-02-24 23:23:22 +0100852 // Each map has an event listener enabling the events distribution across the cluster
853 private class InternalPolMapEventListener implements MapEventListener<PolicyId, PolicyRequest> {
854 @Override
855 public void event(MapEvent<PolicyId, PolicyRequest> event) {
856 Versioned<PolicyRequest> value = event.type() == MapEvent.Type.REMOVE ?
857 event.oldValue() : event.newValue();
858 PolicyRequest policyRequest = value.value();
859 Policy policy = policyRequest.policy();
860 switch (event.type()) {
861 case INSERT:
862 case UPDATE:
863 switch (policyRequest.policyState()) {
864 case PENDING_ADD:
865 sendPolicy(policy, true);
866 break;
867 case PENDING_REMOVE:
868 sendPolicy(policy, false);
869 break;
870 case ADDED:
871 break;
872 default:
873 log.warn("Unknown policy state type {}", policyRequest.policyState());
874 }
875 break;
876 case REMOVE:
877 removeOperations(policy.policyId(), Optional.empty());
pierventre30368ab2021-02-24 23:23:22 +0100878 break;
879 default:
880 log.warn("Unknown event type {}", event.type());
881
882 }
883 }
884 }
885
886 private class InternalTMatchMapEventListener implements MapEventListener<TrafficMatchId, TrafficMatchRequest> {
887 @Override
888 public void event(MapEvent<TrafficMatchId, TrafficMatchRequest> event) {
889 Versioned<TrafficMatchRequest> value = event.type() == MapEvent.Type.REMOVE ?
890 event.oldValue() : event.newValue();
891 TrafficMatchRequest trafficMatchRequest = value.value();
892 TrafficMatch trafficMatch = trafficMatchRequest.trafficMatch();
893 switch (event.type()) {
894 case INSERT:
895 case UPDATE:
896 switch (trafficMatchRequest.trafficMatchState()) {
897 case PENDING_ADD:
898 sendTrafficMatch(trafficMatch, true);
899 break;
900 case PENDING_REMOVE:
901 sendTrafficMatch(trafficMatch, false);
902 break;
903 case ADDED:
904 break;
905 default:
906 log.warn("Unknown traffic match state type {}", trafficMatchRequest.trafficMatchState());
907 }
908 break;
909 case REMOVE:
910 removeOperations(trafficMatch.policyId(), Optional.of(trafficMatch.trafficMatchId()));
911 break;
912 default:
913 log.warn("Unknown event type {}", event.type());
914 }
915 }
916 }
917
918 private class InternalOpsMapEventListener implements MapEventListener<String, Operation> {
919 @Override
920 public void event(MapEvent<String, Operation> event) {
921 String key = event.key();
922 Versioned<Operation> value = event.type() == MapEvent.Type.REMOVE ?
923 event.oldValue() : event.newValue();
924 Operation operation = value.value();
925 switch (event.type()) {
926 case INSERT:
927 case UPDATE:
928 if (operation.isDone()) {
929 if (operation.policy().isPresent()) {
930 PolicyKey policyKey = PolicyKey.fromString(key);
931 updatePolicy(policyKey.policyId(), operation.isInstall());
932 } else if (operation.trafficMatch().isPresent()) {
933 updateTrafficMatch(operation.trafficMatch().get(), operation.isInstall());
934 } else {
935 log.warn("Unknown pending operation");
936 }
937 }
938 break;
939 case REMOVE:
940 break;
941 default:
942 log.warn("Unknown event type {}", event.type());
943 }
944 }
945 }
946
947 // Using the work partition service defines who is in charge of a given policy.
948 private boolean isLeader(PolicyId policyId) {
949 final NodeId currentNodeId = clusterService.getLocalNode().id();
950 final NodeId leader = workPartitionService.getLeader(policyId, this::hasher);
951 if (leader == null) {
952 log.error("Fail to elect a leader for {}.", policyId);
953 return false;
954 }
955 policyLeaderCache.put(policyId, leader);
956 return currentNodeId.equals(leader);
957 }
958
959 private Long hasher(PolicyId policyId) {
960 return HASH_FN.newHasher()
961 .putUnencodedChars(policyId.toString())
962 .hash()
963 .asLong();
964 }
965
pierventre4f68ffa2021-03-09 22:52:14 +0100966 // TODO Periodic checker, consider to add store and delegates.
967
pierventre30368ab2021-02-24 23:23:22 +0100968 // Check periodically for any issue and try to resolve automatically if possible
969 private final class PolicyChecker implements Runnable {
970 @Override
971 public void run() {
972 }
973 }
974}