blob: f1dc285862ba5056ab34a1557016d84960501c87 [file] [log] [blame]
pierventre30368ab2021-02-24 23:23:22 +01001/*
2 * Copyright 2021-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.segmentrouting.policy.impl;
17
18import com.google.common.collect.ImmutableSet;
19import com.google.common.collect.Lists;
20import com.google.common.collect.Maps;
21import com.google.common.hash.HashFunction;
22import com.google.common.hash.Hashing;
23import org.glassfish.jersey.internal.guava.Sets;
pierventre4f68ffa2021-03-09 22:52:14 +010024import org.onlab.packet.MacAddress;
pierventre30368ab2021-02-24 23:23:22 +010025import org.onlab.util.KryoNamespace;
26import org.onlab.util.PredictableExecutor;
27import org.onosproject.cluster.ClusterService;
28import org.onosproject.cluster.NodeId;
Wailok Shumee90c132021-03-11 21:00:11 +080029import org.onosproject.codec.CodecService;
pierventre30368ab2021-02-24 23:23:22 +010030import org.onosproject.core.ApplicationId;
31import org.onosproject.core.CoreService;
pierventre4f68ffa2021-03-09 22:52:14 +010032import org.onosproject.net.ConnectPoint;
pierventre30368ab2021-02-24 23:23:22 +010033import org.onosproject.net.DeviceId;
pierventre4f68ffa2021-03-09 22:52:14 +010034import org.onosproject.net.Link;
35import org.onosproject.net.flow.DefaultTrafficSelector;
pierventre30368ab2021-02-24 23:23:22 +010036import org.onosproject.net.flow.DefaultTrafficTreatment;
pierventre4f68ffa2021-03-09 22:52:14 +010037import org.onosproject.net.flow.TrafficSelector;
pierventre30368ab2021-02-24 23:23:22 +010038import org.onosproject.net.flow.TrafficTreatment;
39import org.onosproject.net.flowobjective.DefaultForwardingObjective;
pierventre4f68ffa2021-03-09 22:52:14 +010040import org.onosproject.net.flowobjective.DefaultNextObjective;
pierventre30368ab2021-02-24 23:23:22 +010041import org.onosproject.net.flowobjective.DefaultObjectiveContext;
42import org.onosproject.net.flowobjective.FlowObjectiveService;
43import org.onosproject.net.flowobjective.ForwardingObjective;
pierventre4f68ffa2021-03-09 22:52:14 +010044import org.onosproject.net.flowobjective.NextObjective;
pierventre30368ab2021-02-24 23:23:22 +010045import org.onosproject.net.flowobjective.Objective;
46import org.onosproject.net.flowobjective.ObjectiveContext;
47import org.onosproject.net.intent.WorkPartitionService;
pierventre4f68ffa2021-03-09 22:52:14 +010048import org.onosproject.net.link.LinkService;
pierventre30368ab2021-02-24 23:23:22 +010049import org.onosproject.segmentrouting.SegmentRoutingService;
50import org.onosproject.segmentrouting.policy.api.DropPolicy;
51import org.onosproject.segmentrouting.policy.api.Policy;
pierventre30368ab2021-02-24 23:23:22 +010052import org.onosproject.segmentrouting.policy.api.PolicyData;
53import org.onosproject.segmentrouting.policy.api.PolicyId;
54import org.onosproject.segmentrouting.policy.api.PolicyService;
55import org.onosproject.segmentrouting.policy.api.PolicyState;
pierventre4f68ffa2021-03-09 22:52:14 +010056import org.onosproject.segmentrouting.policy.api.RedirectPolicy;
pierventre30368ab2021-02-24 23:23:22 +010057import org.onosproject.segmentrouting.policy.api.TrafficMatch;
58import org.onosproject.segmentrouting.policy.api.TrafficMatchData;
59import org.onosproject.segmentrouting.policy.api.TrafficMatchId;
60import org.onosproject.segmentrouting.policy.api.TrafficMatchState;
pierventreb37a11a2021-03-18 16:50:04 +010061import org.onosproject.segmentrouting.config.DeviceConfigNotFoundException;
62import org.onosproject.segmentrouting.policy.api.Policy.PolicyType;
pierventre30368ab2021-02-24 23:23:22 +010063import org.onosproject.store.serializers.KryoNamespaces;
64import org.onosproject.store.service.ConsistentMap;
65import org.onosproject.store.service.MapEvent;
66import org.onosproject.store.service.MapEventListener;
67import org.onosproject.store.service.Serializer;
68import org.onosproject.store.service.StorageException;
69import org.onosproject.store.service.StorageService;
70import org.onosproject.store.service.Versioned;
71import org.osgi.service.component.annotations.Activate;
72import org.osgi.service.component.annotations.Component;
73import org.osgi.service.component.annotations.Deactivate;
74import org.osgi.service.component.annotations.Reference;
75import org.osgi.service.component.annotations.ReferenceCardinality;
76import org.slf4j.Logger;
77
78import java.util.List;
79import java.util.Map;
80import java.util.Optional;
81import java.util.Set;
82import java.util.concurrent.CompletableFuture;
83import java.util.stream.Collectors;
84
85import static org.onlab.util.Tools.groupedThreads;
86import static org.slf4j.LoggerFactory.getLogger;
87
88/**
89 * Implementation of the policy service interface.
90 */
91@Component(immediate = true, service = PolicyService.class)
92public class PolicyManager implements PolicyService {
93
94 // App related things
95 private static final String APP_NAME = "org.onosproject.segmentrouting.policy";
96 private ApplicationId appId;
97 private Logger log = getLogger(getClass());
98 static final String KEY_SEPARATOR = "|";
99
pierventre4f68ffa2021-03-09 22:52:14 +0100100 // Supported policies
pierventreb37a11a2021-03-18 16:50:04 +0100101 private static final Set<PolicyType> SUPPORTED_POLICIES = ImmutableSet.of(
pierventre4f68ffa2021-03-09 22:52:14 +0100102 PolicyType.DROP, PolicyType.REDIRECT);
103
104 // Driver should use this meta to match port_is_edge field in the ACL table
105 private static final long EDGE_PORT = 1;
106 private static final long INFRA_PORT = 0;
107
pierventre30368ab2021-02-24 23:23:22 +0100108 // Policy/TrafficMatch store related objects. We use these consistent maps to keep track of the
109 // lifecycle of a policy/traffic match. These are decomposed in multiple operations which have
110 // to be performed on multiple devices in order to have a policy/traffic match in ADDED state.
pierventre4f68ffa2021-03-09 22:52:14 +0100111 // TODO Consider to add store and delegate
pierventre30368ab2021-02-24 23:23:22 +0100112 private static final String POLICY_STORE = "sr-policy-store";
113 private ConsistentMap<PolicyId, PolicyRequest> policies;
114 private MapEventListener<PolicyId, PolicyRequest> mapPolListener = new InternalPolMapEventListener();
115 private Map<PolicyId, PolicyRequest> policiesMap;
116
117 private static final String OPS_STORE = "sr-ops-store";
118 private ConsistentMap<String, Operation> operations;
119 private MapEventListener<String, Operation> mapOpsListener = new InternalOpsMapEventListener();
120 private Map<String, Operation> opsMap;
121
122 private static final String TRAFFIC_MATCH_STORE = "sr-tmatch-store";
123 private ConsistentMap<TrafficMatchId, TrafficMatchRequest> trafficMatches;
124 private MapEventListener<TrafficMatchId, TrafficMatchRequest> mapTMatchListener =
125 new InternalTMatchMapEventListener();
126 private Map<TrafficMatchId, TrafficMatchRequest> trafficMatchesMap;
127
128 // Leadership related objects - consistent hashing
129 private static final HashFunction HASH_FN = Hashing.md5();
130 // Read only cache of the Policy leader
131 private Map<PolicyId, NodeId> policyLeaderCache;
132
133 // Worker threads for policy and traffic match related ops
134 private static final int DEFAULT_THREADS = 4;
135 protected PredictableExecutor workers;
136
137 // Serializers and ONOS services
138 private static final KryoNamespace.Builder APP_KRYO_BUILDER = KryoNamespace.newBuilder()
139 .register(KryoNamespaces.API)
140 .register(PolicyId.class)
141 .register(PolicyType.class)
142 .register(DropPolicy.class)
pierventre4f68ffa2021-03-09 22:52:14 +0100143 .register(RedirectPolicy.class)
pierventre30368ab2021-02-24 23:23:22 +0100144 .register(PolicyState.class)
145 .register(PolicyRequest.class)
146 .register(TrafficMatchId.class)
147 .register(TrafficMatchState.class)
148 .register(TrafficMatch.class)
149 .register(TrafficMatchRequest.class)
150 .register(Operation.class);
151 private Serializer serializer = Serializer.using(Lists.newArrayList(APP_KRYO_BUILDER.build()));
152
153 @Reference(cardinality = ReferenceCardinality.MANDATORY)
154 private CoreService coreService;
155
156 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Wailok Shumee90c132021-03-11 21:00:11 +0800157 private CodecService codecService;
158
159 @Reference(cardinality = ReferenceCardinality.MANDATORY)
pierventre30368ab2021-02-24 23:23:22 +0100160 private StorageService storageService;
161
162 @Reference(cardinality = ReferenceCardinality.MANDATORY)
163 private ClusterService clusterService;
164
165 @Reference(cardinality = ReferenceCardinality.MANDATORY)
pierventre4f68ffa2021-03-09 22:52:14 +0100166 private WorkPartitionService workPartitionService;
pierventre30368ab2021-02-24 23:23:22 +0100167
Wailok Shumee90c132021-03-11 21:00:11 +0800168 @Reference(cardinality = ReferenceCardinality.MANDATORY)
pierventre4f68ffa2021-03-09 22:52:14 +0100169 private SegmentRoutingService srService;
pierventre30368ab2021-02-24 23:23:22 +0100170
171 @Reference(cardinality = ReferenceCardinality.MANDATORY)
pierventre4f68ffa2021-03-09 22:52:14 +0100172 private FlowObjectiveService flowObjectiveService;
173
174 @Reference(cardinality = ReferenceCardinality.MANDATORY)
175 private LinkService linkService;
pierventre30368ab2021-02-24 23:23:22 +0100176
177 @Activate
178 public void activate() {
179 appId = coreService.registerApplication(APP_NAME);
180
Wailok Shumee90c132021-03-11 21:00:11 +0800181 codecService.registerCodec(DropPolicy.class, new DropPolicyCodec());
182 codecService.registerCodec(RedirectPolicy.class, new RedirectPolicyCodec());
183 codecService.registerCodec(TrafficMatch.class, new TrafficMatchCodec());
184
pierventre30368ab2021-02-24 23:23:22 +0100185 policies = storageService.<PolicyId, PolicyRequest>consistentMapBuilder()
186 .withName(POLICY_STORE)
187 .withSerializer(serializer).build();
188 policies.addListener(mapPolListener);
189 policiesMap = policies.asJavaMap();
190
191 trafficMatches = storageService.<TrafficMatchId, TrafficMatchRequest>consistentMapBuilder()
192 .withName(TRAFFIC_MATCH_STORE)
193 .withSerializer(serializer).build();
194 trafficMatches.addListener(mapTMatchListener);
195 trafficMatchesMap = trafficMatches.asJavaMap();
196
197 operations = storageService.<String, Operation>consistentMapBuilder()
198 .withName(OPS_STORE)
199 .withSerializer(serializer).build();
200 operations.addListener(mapOpsListener);
201 opsMap = operations.asJavaMap();
202
203 policyLeaderCache = Maps.newConcurrentMap();
204
205 workers = new PredictableExecutor(DEFAULT_THREADS,
206 groupedThreads("sr-policy", "worker-%d", log));
207
208 log.info("Started");
209 }
210
211 @Deactivate
212 public void deactivate() {
213 // Teardown everything
Wailok Shumee90c132021-03-11 21:00:11 +0800214 codecService.unregisterCodec(DropPolicy.class);
215 codecService.unregisterCodec(RedirectPolicy.class);
216 codecService.unregisterCodec(TrafficMatch.class);
pierventre30368ab2021-02-24 23:23:22 +0100217 policies.removeListener(mapPolListener);
218 policies.destroy();
219 policiesMap.clear();
220 trafficMatches.removeListener(mapTMatchListener);
221 trafficMatches.destroy();
222 trafficMatchesMap.clear();
223 operations.removeListener(mapOpsListener);
224 operations.destroy();
225 operations.clear();
226 workers.shutdown();
227
228 log.info("Stopped");
229 }
230
231 @Override
232 //FIXME update does not work well
233 public PolicyId addOrUpdatePolicy(Policy policy) {
234 PolicyId policyId = policy.policyId();
235 try {
236 policies.put(policyId, new PolicyRequest(policy));
237 } catch (StorageException e) {
238 log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
239 e.getMessage(), e);
240 policyId = null;
241 }
242 return policyId;
243 }
244
245 @Override
246 public boolean removePolicy(PolicyId policyId) {
247 boolean result;
pierventre4f68ffa2021-03-09 22:52:14 +0100248 if (dependingTrafficMatches(policyId).isPresent()) {
249 if (log.isDebugEnabled()) {
250 log.debug("Found depending traffic matches");
251 }
252 return false;
253 }
pierventre30368ab2021-02-24 23:23:22 +0100254 try {
255 result = Versioned.valueOrNull(policies.computeIfPresent(policyId, (k, v) -> {
256 if (v.policyState() != PolicyState.PENDING_REMOVE) {
257 v.policyState(PolicyState.PENDING_REMOVE);
258 }
259 return v;
260 })) != null;
261 } catch (StorageException e) {
262 log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
263 e.getMessage(), e);
264 result = false;
265 }
266 return result;
267 }
268
269 @Override
270 public Set<PolicyData> policies(Set<PolicyType> filter) {
271 Set<PolicyData> policyData = Sets.newHashSet();
272 List<DeviceId> edgeDeviceIds = srService.getEdgeDeviceIds();
273 Set<PolicyRequest> policyRequests;
274 if (filter.isEmpty()) {
275 policyRequests = ImmutableSet.copyOf(policiesMap.values());
276 } else {
277 policyRequests = policiesMap.values().stream()
278 .filter(policyRequest -> filter.contains(policyRequest.policyType()))
279 .collect(Collectors.toSet());
280 }
281 PolicyKey policyKey;
282 List<String> ops;
283 for (PolicyRequest policyRequest : policyRequests) {
284 ops = Lists.newArrayList();
285 for (DeviceId deviceId : edgeDeviceIds) {
286 policyKey = new PolicyKey(deviceId, policyRequest.policyId());
287 Operation operation = Versioned.valueOrNull(operations.get(policyKey.toString()));
288 if (operation != null) {
289 ops.add(deviceId + " -> " + operation.toStringMinimal());
290 }
291 }
292 policyData.add(new PolicyData(policyRequest.policyState(), policyRequest.policy(), ops));
293 }
294 return policyData;
295 }
296
297 @Override
298 //FIXME update does not work well
299 public TrafficMatchId addOrUpdateTrafficMatch(TrafficMatch trafficMatch) {
300 TrafficMatchId trafficMatchId = trafficMatch.trafficMatchId();
301 try {
302 trafficMatches.put(trafficMatchId, new TrafficMatchRequest(trafficMatch));
303 } catch (StorageException e) {
304 log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
305 e.getMessage(), e);
306 trafficMatchId = null;
307 }
308 return trafficMatchId;
309 }
310
311 @Override
312 public boolean removeTrafficMatch(TrafficMatchId trafficMatchId) {
313 boolean result;
314 try {
315 result = Versioned.valueOrNull(trafficMatches.computeIfPresent(trafficMatchId, (k, v) -> {
316 if (v.trafficMatchState() != TrafficMatchState.PENDING_REMOVE) {
317 v.trafficMatchState(TrafficMatchState.PENDING_REMOVE);
318 }
319 return v;
320 })) != null;
321 } catch (StorageException e) {
322 log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
323 e.getMessage(), e);
324 result = false;
325 }
326 return result;
327 }
328
329 @Override
330 public Set<TrafficMatchData> trafficMatches() {
331 Set<TrafficMatchData> trafficMatchData = Sets.newHashSet();
332 List<DeviceId> edgeDeviceIds = srService.getEdgeDeviceIds();
333 Set<TrafficMatchRequest> trafficMatchRequests = ImmutableSet.copyOf(trafficMatchesMap.values());
334 TrafficMatchKey trafficMatchKey;
335 List<String> ops;
336 for (TrafficMatchRequest trafficMatchRequest : trafficMatchRequests) {
337 ops = Lists.newArrayList();
338 for (DeviceId deviceId : edgeDeviceIds) {
339 trafficMatchKey = new TrafficMatchKey(deviceId, trafficMatchRequest.trafficMatch().trafficMatchId());
340 Operation operation = Versioned.valueOrNull(operations.get(trafficMatchKey.toString()));
341 if (operation != null) {
342 ops.add(deviceId + " -> " + operation.toStringMinimal());
343 }
344 }
345 trafficMatchData.add(new TrafficMatchData(trafficMatchRequest.trafficMatchState(),
346 trafficMatchRequest.trafficMatch(), ops));
347 }
348 return trafficMatchData;
349 }
350
351 // Install/remove the policies on the edge devices
352 private void sendPolicy(Policy policy, boolean install) {
353 if (!isLeader(policy.policyId())) {
354 if (log.isDebugEnabled()) {
355 log.debug("Instance is not leader for policy {}", policy.policyId());
356 }
357 return;
358 }
359 // We know that we are the leader, offloads to the workers the remaining
360 // part: issue fobj installation/removal and update the maps
361 List<DeviceId> edgeDeviceIds = srService.getEdgeDeviceIds();
362 for (DeviceId deviceId : edgeDeviceIds) {
363 workers.execute(() -> {
364 if (install) {
365 installPolicyInDevice(deviceId, policy);
366 } else {
367 removePolicyInDevice(deviceId, policy);
368 }
369 }, deviceId.hashCode());
370 }
371 }
372
373 // Orchestrate policy installation according to the type
374 private void installPolicyInDevice(DeviceId deviceId, Policy policy) {
pierventre4f68ffa2021-03-09 22:52:14 +0100375 if (!SUPPORTED_POLICIES.contains(policy.policyType())) {
pierventre30368ab2021-02-24 23:23:22 +0100376 log.warn("Policy {} type {} not yet supported",
377 policy.policyId(), policy.policyType());
pierventre4f68ffa2021-03-09 22:52:14 +0100378 return;
379 }
380 PolicyKey policyKey;
381 Operation.Builder operation;
382 if (log.isDebugEnabled()) {
383 log.debug("Installing {} policy {} for dev: {}",
384 policy.policyType(), policy.policyId(), deviceId);
385 }
386 policyKey = new PolicyKey(deviceId, policy.policyId());
387 operation = Operation.builder()
388 .isInstall(true)
389 .policy(policy);
390 // TODO To better handle different policy types consider the abstraction of a compiler (subtypes ?)
391 if (policy.policyType() == PolicyType.DROP) {
392 // DROP policies do not need the next objective installation phase
393 // we can update directly the map and signal the ops as done
394 operation.isDone(true);
395 operations.put(policyKey.toString(), operation.build());
396 } else if (policy.policyType() == PolicyType.REDIRECT) {
397 // REDIRECT Uses next objective context to update the ops as done when
398 // it returns successfully. In the other cases leaves the ops as undone
399 // and the relative policy will remain in pending.
400 operations.put(policyKey.toString(), operation.build());
401 NextObjective.Builder builder = redirectPolicyNextObjective(deviceId, (RedirectPolicy) policy);
402 // Handle error here - leave the operation as undone and pending
403 if (builder != null) {
404 CompletableFuture<Objective> future = new CompletableFuture<>();
405 if (log.isDebugEnabled()) {
406 log.debug("Installing REDIRECT next objective for dev: {}", deviceId);
407 }
408 ObjectiveContext context = new DefaultObjectiveContext(
409 (objective) -> {
410 if (log.isDebugEnabled()) {
411 log.debug("REDIRECT next objective for policy {} installed in dev: {}",
412 policy.policyId(), deviceId);
413 }
414 future.complete(objective);
415 },
416 (objective, error) -> {
417 log.warn("Failed to install REDIRECT next objective for policy {}: {} in dev: {}",
418 policy.policyId(), error, deviceId);
419 future.complete(null);
420 });
421 // Context is not serializable
422 NextObjective serializableObjective = builder.add();
423 flowObjectiveService.next(deviceId, builder.add(context));
424 future.whenComplete((objective, ex) -> {
425 if (ex != null) {
426 log.error("Exception installing REDIRECT next objective", ex);
427 } else if (objective != null) {
428 operations.computeIfPresent(policyKey.toString(), (k, v) -> {
429 if (!v.isDone() && v.isInstall()) {
430 v.isDone(true);
431 v.objectiveOperation(serializableObjective);
432 }
433 return v;
434 });
435 }
436 });
437 }
pierventre30368ab2021-02-24 23:23:22 +0100438 }
439 }
440
441 // Remove policy in a device according to the type
442 private void removePolicyInDevice(DeviceId deviceId, Policy policy) {
pierventre30368ab2021-02-24 23:23:22 +0100443 PolicyKey policyKey = new PolicyKey(deviceId, policy.policyId());
444 Operation operation = Versioned.valueOrNull(operations.get(policyKey.toString()));
445 // Policy might be still in pending or not present anymore
446 if (operation == null || operation.objectiveOperation() == null) {
447 log.warn("There are no ops associated with {}", policyKey);
448 operation = Operation.builder()
449 .isDone(true)
450 .isInstall(false)
451 .policy(policy)
452 .build();
453 operations.put(policyKey.toString(), operation);
454 } else {
pierventre4f68ffa2021-03-09 22:52:14 +0100455 if (log.isDebugEnabled()) {
456 log.debug("Removing {} policy {} in device {}", policy.policyType(), policy.policyId(), deviceId);
457 }
458 Operation.Builder operationBuilder = Operation.builder()
459 .isInstall(false)
460 .policy(policy);
pierventre30368ab2021-02-24 23:23:22 +0100461 if (policy.policyType() == PolicyType.DROP) {
pierventre4f68ffa2021-03-09 22:52:14 +0100462 operationBuilder.isDone(true);
463 operations.put(policyKey.toString(), operationBuilder.build());
pierventre30368ab2021-02-24 23:23:22 +0100464 } else if (policy.policyType() == PolicyType.REDIRECT) {
pierventre4f68ffa2021-03-09 22:52:14 +0100465 // REDIRECT has to remove the next objective first
466 NextObjective oldObj = (NextObjective) operation.objectiveOperation();
467 operations.put(policyKey.toString(), operationBuilder.build());
468 NextObjective.Builder builder = oldObj.copy();
469 CompletableFuture<Objective> future = new CompletableFuture<>();
pierventre30368ab2021-02-24 23:23:22 +0100470 if (log.isDebugEnabled()) {
pierventre4f68ffa2021-03-09 22:52:14 +0100471 log.debug("Removing REDIRECT next objective for dev: {}", deviceId);
pierventre30368ab2021-02-24 23:23:22 +0100472 }
pierventre4f68ffa2021-03-09 22:52:14 +0100473 ObjectiveContext context = new DefaultObjectiveContext(
474 (objective) -> {
475 if (log.isDebugEnabled()) {
476 log.debug("REDIRECT next objective for policy {} removed in dev: {}",
477 policy.policyId(), deviceId);
478 }
479 future.complete(objective);
480 },
481 (objective, error) -> {
482 log.warn("Failed to remove REDIRECT next objective for policy {}: {} in dev: {}",
483 policy.policyId(), error, deviceId);
484 future.complete(null);
485 });
486 NextObjective serializableObjective = builder.remove();
487 flowObjectiveService.next(deviceId, builder.remove(context));
488 future.whenComplete((objective, ex) -> {
489 if (ex != null) {
490 log.error("Exception Removing REDIRECT next objective", ex);
491 } else if (objective != null) {
492 operations.computeIfPresent(policyKey.toString(), (k, v) -> {
493 if (!v.isDone() && !v.isInstall()) {
494 v.isDone(true);
495 v.objectiveOperation(serializableObjective);
496 }
497 return v;
498 });
499 }
500 });
pierventre30368ab2021-02-24 23:23:22 +0100501 }
502 }
503 }
504
505 // Updates policy status if all the pending ops are done
506 private void updatePolicy(PolicyId policyId, boolean install) {
507 if (!isLeader(policyId)) {
508 if (log.isDebugEnabled()) {
509 log.debug("Instance is not leader for policy {}", policyId);
510 }
511 return;
512 }
513 workers.execute(() -> updatePolicyInternal(policyId, install), policyId.hashCode());
514 }
515
516 private void updatePolicyInternal(PolicyId policyId, boolean install) {
517 // If there are no more pending ops we are ready to go; potentially we can check
518 // if the id is contained. Updates policies only if they are still present
519 Optional<Map.Entry<String, Versioned<Operation>>> notYetDone = operations.entrySet().stream()
520 .filter(entry -> entry.getValue().value().policy().isPresent())
521 .filter(entry -> PolicyKey.fromString(entry.getKey()).policyId().equals(policyId))
522 .filter(entry -> !entry.getValue().value().isDone() && entry.getValue().value().isInstall() == install)
523 .findFirst();
524 if (notYetDone.isEmpty()) {
525 PolicyRequest policyRequest = Versioned.valueOrNull(policies.computeIfPresent(policyId, (k, v) -> {
526 if (v.policyState() == PolicyState.PENDING_ADD && install) {
527 if (log.isDebugEnabled()) {
528 log.debug("Policy {} is ready", policyId);
529 }
530 v.policyState(PolicyState.ADDED);
531 } else if (v.policyState() == PolicyState.PENDING_REMOVE && !install) {
532 if (log.isDebugEnabled()) {
533 log.debug("Policy {} is removed", policyId);
534 }
535 v = null;
536 }
537 return v;
538 }));
539 // Greedy check for pending traffic matches
540 if (policyRequest != null && policyRequest.policyState() == PolicyState.ADDED) {
541 updatePendingTrafficMatches(policyRequest.policyId());
542 }
543 }
544 }
545
546 // Install/remove the traffic match on the edge devices
547 private void sendTrafficMatch(TrafficMatch trafficMatch, boolean install) {
548 if (!isLeader(trafficMatch.policyId())) {
549 if (log.isDebugEnabled()) {
550 log.debug("Instance is not leader for policy {}", trafficMatch.policyId());
551 }
552 return;
553 }
554 // We know that we are the leader, offloads to the workers the remaining
555 // part: issue fobj installation/removal and update the maps
556 List<DeviceId> edgeDeviceIds = srService.getEdgeDeviceIds();
557 for (DeviceId deviceId : edgeDeviceIds) {
558 workers.execute(() -> {
559 if (install) {
560 installTrafficMatchToDevice(deviceId, trafficMatch);
561 } else {
562 removeTrafficMatchInDevice(deviceId, trafficMatch);
563 }
564 }, deviceId.hashCode());
565 }
566 }
567
568 // Orchestrate traffic match installation according to the type
569 private void installTrafficMatchToDevice(DeviceId deviceId, TrafficMatch trafficMatch) {
570 if (log.isDebugEnabled()) {
571 log.debug("Installing traffic match {} associated to policy {}",
572 trafficMatch.trafficMatchId(), trafficMatch.policyId());
573 }
pierventre30368ab2021-02-24 23:23:22 +0100574 TrafficMatchKey trafficMatchKey = new TrafficMatchKey(deviceId, trafficMatch.trafficMatchId());
pierventre4f68ffa2021-03-09 22:52:14 +0100575 Operation trafficOperation = Versioned.valueOrNull(operations.get(trafficMatchKey.toString()));
576 if (trafficOperation != null && trafficOperation.isInstall()) {
577 if (log.isDebugEnabled()) {
578 log.debug("There is already an install operation for traffic match {} associated to policy {} " +
579 "for device {}", trafficMatch.trafficMatchId(), trafficMatch.policyId(), deviceId);
580 }
581 return;
582 }
pierventre30368ab2021-02-24 23:23:22 +0100583 // For the DROP policy we need to set an ACL drop in the fwd objective. The other
584 // policies require to retrieve the next Id and sets the next step.
585 PolicyKey policyKey = new PolicyKey(deviceId, trafficMatch.policyId());
586 Operation policyOperation = Versioned.valueOrNull(operations.get(policyKey.toString()));
587 if (policyOperation == null || !policyOperation.isDone() ||
pierventre4f68ffa2021-03-09 22:52:14 +0100588 !policyOperation.isInstall() || policyOperation.policy().isEmpty() ||
589 (policyOperation.policy().get().policyType() == PolicyType.REDIRECT &&
590 policyOperation.objectiveOperation() == null)) {
pierventre30368ab2021-02-24 23:23:22 +0100591 log.info("Deferring traffic match {} installation on device {}. Policy {} not yet installed",
592 trafficMatch.trafficMatchId(), deviceId, trafficMatch.policyId());
593 return;
594 }
pierventre4f68ffa2021-03-09 22:52:14 +0100595 // Updates the store and then send the versatile fwd objective to the pipeliner
596 trafficOperation = Operation.builder()
597 .isInstall(true)
598 .trafficMatch(trafficMatch)
599 .build();
600 operations.put(trafficMatchKey.toString(), trafficOperation);
pierventre30368ab2021-02-24 23:23:22 +0100601 Policy policy = policyOperation.policy().get();
pierventre4f68ffa2021-03-09 22:52:14 +0100602 ForwardingObjective.Builder builder = trafficMatchFwdObjective(trafficMatch, policy.policyType());
pierventre30368ab2021-02-24 23:23:22 +0100603 if (policy.policyType() == PolicyType.DROP) {
pierventre4f68ffa2021-03-09 22:52:14 +0100604 // Firstly builds the fwd objective with the wipeDeferred action.
pierventre30368ab2021-02-24 23:23:22 +0100605 TrafficTreatment dropTreatment = DefaultTrafficTreatment.builder()
606 .wipeDeferred()
607 .build();
608 builder.withTreatment(dropTreatment);
pierventre4f68ffa2021-03-09 22:52:14 +0100609 } else if (policy.policyType() == PolicyType.REDIRECT) {
610
611 // Here we need to set only the next step
612 builder.nextStep(policyOperation.objectiveOperation().id());
pierventre30368ab2021-02-24 23:23:22 +0100613 }
pierventre4f68ffa2021-03-09 22:52:14 +0100614 // Once, the fwd objective has completed its execution, we update the policiesOps map
615 CompletableFuture<Objective> future = new CompletableFuture<>();
616 if (log.isDebugEnabled()) {
617 log.debug("Installing forwarding objective for dev: {}", deviceId);
618 }
619 ObjectiveContext context = new DefaultObjectiveContext(
620 (objective) -> {
621 if (log.isDebugEnabled()) {
622 log.debug("Forwarding objective for policy {} installed", trafficMatch.policyId());
623 }
624 future.complete(objective);
625 },
626 (objective, error) -> {
627 log.warn("Failed to install forwarding objective for policy {}: {}",
628 trafficMatch.policyId(), error);
629 future.complete(null);
630 });
631 // Context is not serializable
632 ForwardingObjective serializableObjective = builder.add();
633 flowObjectiveService.forward(deviceId, builder.add(context));
634 future.whenComplete((objective, ex) -> {
635 if (ex != null) {
636 log.error("Exception installing forwarding objective", ex);
637 } else if (objective != null) {
638 operations.computeIfPresent(trafficMatchKey.toString(), (k, v) -> {
639 if (!v.isDone() && v.isInstall()) {
640 v.isDone(true);
641 v.objectiveOperation(serializableObjective);
642 }
643 return v;
644 });
645 }
646 });
pierventre30368ab2021-02-24 23:23:22 +0100647 }
648
649 // Updates traffic match status if all the pending ops are done
650 private void updateTrafficMatch(TrafficMatch trafficMatch, boolean install) {
651 if (!isLeader(trafficMatch.policyId())) {
652 if (log.isDebugEnabled()) {
653 log.debug("Instance is not leader for policy {}", trafficMatch.policyId());
654 }
655 return;
656 }
657 workers.execute(() -> updateTrafficMatchInternal(trafficMatch.trafficMatchId(), install),
658 trafficMatch.policyId().hashCode());
659 }
660
661 private void updateTrafficMatchInternal(TrafficMatchId trafficMatchId, boolean install) {
662 // If there are no more pending ops we are ready to go; potentially we can check
663 // if the id is contained. Updates traffic matches only if they are still present
664 Optional<Map.Entry<String, Versioned<Operation>>> notYetDone = operations.entrySet().stream()
665 .filter(entry -> entry.getValue().value().trafficMatch().isPresent())
666 .filter(entry -> TrafficMatchKey.fromString(entry.getKey()).trafficMatchId().equals(trafficMatchId))
667 .filter(entry -> !entry.getValue().value().isDone() && entry.getValue().value().isInstall() == install)
668 .findFirst();
669 if (notYetDone.isEmpty()) {
670 trafficMatches.computeIfPresent(trafficMatchId, (k, v) -> {
671 if (v.trafficMatchState() == TrafficMatchState.PENDING_ADD && install) {
672 if (log.isDebugEnabled()) {
673 log.debug("Traffic match {} is ready", trafficMatchId);
674 }
675 v.trafficMatchState(TrafficMatchState.ADDED);
676 } else if (v.trafficMatchState() == TrafficMatchState.PENDING_REMOVE && !install) {
677 if (log.isDebugEnabled()) {
678 log.debug("Traffic match {} is removed", trafficMatchId);
679 }
680 v = null;
681 }
682 return v;
683 });
684 }
685 }
686
687 // Look for any pending traffic match waiting for the policy
688 private void updatePendingTrafficMatches(PolicyId policyId) {
689 Set<TrafficMatchRequest> pendingTrafficMatches = trafficMatches.stream()
690 .filter(trafficMatchEntry -> trafficMatchEntry.getValue().value().policyId().equals(policyId) &&
691 trafficMatchEntry.getValue().value().trafficMatchState() == TrafficMatchState.PENDING_ADD)
692 .map(trafficMatchEntry -> trafficMatchEntry.getValue().value())
693 .collect(Collectors.toSet());
694 for (TrafficMatchRequest trafficMatch : pendingTrafficMatches) {
695 sendTrafficMatch(trafficMatch.trafficMatch(), true);
696 }
697 }
698
699 // Traffic match removal in a device
700 private void removeTrafficMatchInDevice(DeviceId deviceId, TrafficMatch trafficMatch) {
701 if (log.isDebugEnabled()) {
702 log.debug("Removing traffic match {} associated to policy {}",
703 trafficMatch.trafficMatchId(), trafficMatch.policyId());
704 }
705 TrafficMatchKey trafficMatchKey = new TrafficMatchKey(deviceId, trafficMatch.trafficMatchId());
706 Operation operation = Versioned.valueOrNull(operations.get(trafficMatchKey.toString()));
707 if (operation == null || operation.objectiveOperation() == null) {
708 log.warn("There are no ops associated with {}", trafficMatchKey);
709 operation = Operation.builder()
710 .isDone(true)
711 .isInstall(false)
712 .trafficMatch(trafficMatch)
713 .build();
714 operations.put(trafficMatchKey.toString(), operation);
pierventre4f68ffa2021-03-09 22:52:14 +0100715 } else if (!operation.isInstall()) {
716 if (log.isDebugEnabled()) {
717 log.debug("There is already an uninstall operation for traffic match {} associated to policy {}" +
718 " for device {}", trafficMatch.trafficMatchId(), trafficMatch.policyId(), deviceId);
719 }
pierventre30368ab2021-02-24 23:23:22 +0100720 } else {
721 ForwardingObjective oldObj = (ForwardingObjective) operation.objectiveOperation();
722 operation = Operation.builder(operation)
pierventre30368ab2021-02-24 23:23:22 +0100723 .isInstall(false)
724 .build();
725 operations.put(trafficMatchKey.toString(), operation);
726 ForwardingObjective.Builder builder = DefaultForwardingObjective.builder(oldObj);
727 CompletableFuture<Objective> future = new CompletableFuture<>();
728 if (log.isDebugEnabled()) {
pierventre4f68ffa2021-03-09 22:52:14 +0100729 log.debug("Removing forwarding objectives for dev: {}", deviceId);
pierventre30368ab2021-02-24 23:23:22 +0100730 }
731 ObjectiveContext context = new DefaultObjectiveContext(
732 (objective) -> {
733 if (log.isDebugEnabled()) {
pierventre4f68ffa2021-03-09 22:52:14 +0100734 log.debug("Forwarding objective for policy {} removed", trafficMatch.policyId());
pierventre30368ab2021-02-24 23:23:22 +0100735 }
736 future.complete(objective);
737 },
738 (objective, error) -> {
pierventre4f68ffa2021-03-09 22:52:14 +0100739 log.warn("Failed to remove forwarding objective for policy {}: {}",
740 trafficMatch.policyId(), error);
pierventre30368ab2021-02-24 23:23:22 +0100741 future.complete(null);
742 });
743 ForwardingObjective serializableObjective = builder.remove();
744 flowObjectiveService.forward(deviceId, builder.remove(context));
745 future.whenComplete((objective, ex) -> {
746 if (ex != null) {
pierventre4f68ffa2021-03-09 22:52:14 +0100747 log.error("Exception removing forwarding objective", ex);
pierventre30368ab2021-02-24 23:23:22 +0100748 } else if (objective != null) {
749 operations.computeIfPresent(trafficMatchKey.toString(), (k, v) -> {
750 if (!v.isDone() && !v.isInstall()) {
751 v.isDone(true);
752 v.objectiveOperation(serializableObjective);
753 }
754 return v;
755 });
756 }
757 });
758 }
759 }
760
pierventre4f68ffa2021-03-09 22:52:14 +0100761 // It is used when a policy has been removed but there are still traffic matches depending on it
762 private Optional<TrafficMatchRequest> dependingTrafficMatches(PolicyId policyId) {
763 return trafficMatches.stream()
pierventre30368ab2021-02-24 23:23:22 +0100764 .filter(trafficMatchEntry -> trafficMatchEntry.getValue().value().policyId().equals(policyId) &&
765 trafficMatchEntry.getValue().value().trafficMatchState() == TrafficMatchState.ADDED)
766 .map(trafficMatchEntry -> trafficMatchEntry.getValue().value())
pierventre4f68ffa2021-03-09 22:52:14 +0100767 .findFirst();
pierventre30368ab2021-02-24 23:23:22 +0100768 }
769
770 // Utility that removes operations related to a policy or to a traffic match.
771 private void removeOperations(PolicyId policyId, Optional<TrafficMatchId> trafficMatchId) {
772 if (!isLeader(policyId)) {
773 if (log.isDebugEnabled()) {
774 log.debug("Instance is not leader for policy {}", policyId);
775 }
776 return;
777 }
778 List<DeviceId> edgeDeviceIds = srService.getEdgeDeviceIds();
779 for (DeviceId deviceId : edgeDeviceIds) {
780 workers.execute(() -> {
781 String key;
782 if (trafficMatchId.isPresent()) {
783 key = new TrafficMatchKey(deviceId, trafficMatchId.get()).toString();
784 } else {
785 key = new PolicyKey(deviceId, policyId).toString();
786 }
787 operations.remove(key);
788 }, deviceId.hashCode());
789 }
790 }
791
pierventre4f68ffa2021-03-09 22:52:14 +0100792 private ForwardingObjective.Builder trafficMatchFwdObjective(TrafficMatch trafficMatch, PolicyType policyType) {
793 TrafficSelector.Builder metaBuilder = DefaultTrafficSelector.builder(trafficMatch.trafficSelector());
794 if (policyType == PolicyType.REDIRECT) {
795 metaBuilder.matchMetadata(EDGE_PORT);
796 }
pierventre30368ab2021-02-24 23:23:22 +0100797 return DefaultForwardingObjective.builder()
798 .withPriority(PolicyService.TRAFFIC_MATCH_PRIORITY)
799 .withSelector(trafficMatch.trafficSelector())
pierventre4f68ffa2021-03-09 22:52:14 +0100800 .withMeta(metaBuilder.build())
pierventre30368ab2021-02-24 23:23:22 +0100801 .fromApp(appId)
802 .withFlag(ForwardingObjective.Flag.VERSATILE)
803 .makePermanent();
804 }
805
pierventre4f68ffa2021-03-09 22:52:14 +0100806 private NextObjective.Builder redirectPolicyNextObjective(DeviceId srcDevice, RedirectPolicy redirectPolicy) {
807 Set<Link> egressLinks = linkService.getDeviceEgressLinks(srcDevice);
808 Map<ConnectPoint, DeviceId> egressPortsToEnforce = Maps.newHashMap();
809 List<DeviceId> edgeDevices = srService.getEdgeDeviceIds();
810 egressLinks.stream()
811 .filter(link -> redirectPolicy.spinesToEnforce().contains(link.dst().deviceId()) &&
812 !edgeDevices.contains(link.dst().deviceId()))
813 .forEach(link -> egressPortsToEnforce.put(link.src(), link.dst().deviceId()));
814 // No ports no friend
815 if (egressPortsToEnforce.isEmpty()) {
816 log.warn("There are no port available for the REDIRECT policy {}", redirectPolicy.policyId());
817 return null;
818 }
819 // We need to add a treatment for each valid egress port. The treatment
820 // requires to set src and dst mac address and set the egress port. We are
821 // deliberately not providing the metadata to prevent the programming of
822 // some tables which are already controlled by SegmentRouting or are unnecessary
823 int nextId = flowObjectiveService.allocateNextId();
824 DefaultNextObjective.Builder builder = DefaultNextObjective.builder()
825 .withId(nextId)
826 .withType(NextObjective.Type.HASHED)
827 .fromApp(appId);
828 MacAddress srcDeviceMac;
829 try {
830 srcDeviceMac = srService.getDeviceMacAddress(srcDevice);
831 } catch (DeviceConfigNotFoundException e) {
832 log.warn(e.getMessage() + " Aborting installation REDIRECT policy {}", redirectPolicy.policyId());
833 return null;
834 }
835 MacAddress neigborDeviceMac;
836 TrafficTreatment.Builder tBuilder;
837 for (Map.Entry<ConnectPoint, DeviceId> entry : egressPortsToEnforce.entrySet()) {
838 try {
839 neigborDeviceMac = srService.getDeviceMacAddress(entry.getValue());
840 } catch (DeviceConfigNotFoundException e) {
841 log.warn(e.getMessage() + " Aborting installation REDIRECT policy {}", redirectPolicy.policyId());
842 return null;
843 }
844 tBuilder = DefaultTrafficTreatment.builder()
845 .setEthSrc(srcDeviceMac)
846 .setEthDst(neigborDeviceMac)
847 .setOutput(entry.getKey().port());
848 builder.addTreatment(tBuilder.build());
849 }
850 return builder;
851 }
852
pierventre30368ab2021-02-24 23:23:22 +0100853 // Each map has an event listener enabling the events distribution across the cluster
854 private class InternalPolMapEventListener implements MapEventListener<PolicyId, PolicyRequest> {
855 @Override
856 public void event(MapEvent<PolicyId, PolicyRequest> event) {
857 Versioned<PolicyRequest> value = event.type() == MapEvent.Type.REMOVE ?
858 event.oldValue() : event.newValue();
859 PolicyRequest policyRequest = value.value();
860 Policy policy = policyRequest.policy();
861 switch (event.type()) {
862 case INSERT:
863 case UPDATE:
864 switch (policyRequest.policyState()) {
865 case PENDING_ADD:
866 sendPolicy(policy, true);
867 break;
868 case PENDING_REMOVE:
869 sendPolicy(policy, false);
870 break;
871 case ADDED:
872 break;
873 default:
874 log.warn("Unknown policy state type {}", policyRequest.policyState());
875 }
876 break;
877 case REMOVE:
878 removeOperations(policy.policyId(), Optional.empty());
pierventre30368ab2021-02-24 23:23:22 +0100879 break;
880 default:
881 log.warn("Unknown event type {}", event.type());
882
883 }
884 }
885 }
886
887 private class InternalTMatchMapEventListener implements MapEventListener<TrafficMatchId, TrafficMatchRequest> {
888 @Override
889 public void event(MapEvent<TrafficMatchId, TrafficMatchRequest> event) {
890 Versioned<TrafficMatchRequest> value = event.type() == MapEvent.Type.REMOVE ?
891 event.oldValue() : event.newValue();
892 TrafficMatchRequest trafficMatchRequest = value.value();
893 TrafficMatch trafficMatch = trafficMatchRequest.trafficMatch();
894 switch (event.type()) {
895 case INSERT:
896 case UPDATE:
897 switch (trafficMatchRequest.trafficMatchState()) {
898 case PENDING_ADD:
899 sendTrafficMatch(trafficMatch, true);
900 break;
901 case PENDING_REMOVE:
902 sendTrafficMatch(trafficMatch, false);
903 break;
904 case ADDED:
905 break;
906 default:
907 log.warn("Unknown traffic match state type {}", trafficMatchRequest.trafficMatchState());
908 }
909 break;
910 case REMOVE:
911 removeOperations(trafficMatch.policyId(), Optional.of(trafficMatch.trafficMatchId()));
912 break;
913 default:
914 log.warn("Unknown event type {}", event.type());
915 }
916 }
917 }
918
919 private class InternalOpsMapEventListener implements MapEventListener<String, Operation> {
920 @Override
921 public void event(MapEvent<String, Operation> event) {
922 String key = event.key();
923 Versioned<Operation> value = event.type() == MapEvent.Type.REMOVE ?
924 event.oldValue() : event.newValue();
925 Operation operation = value.value();
926 switch (event.type()) {
927 case INSERT:
928 case UPDATE:
929 if (operation.isDone()) {
930 if (operation.policy().isPresent()) {
931 PolicyKey policyKey = PolicyKey.fromString(key);
932 updatePolicy(policyKey.policyId(), operation.isInstall());
933 } else if (operation.trafficMatch().isPresent()) {
934 updateTrafficMatch(operation.trafficMatch().get(), operation.isInstall());
935 } else {
936 log.warn("Unknown pending operation");
937 }
938 }
939 break;
940 case REMOVE:
941 break;
942 default:
943 log.warn("Unknown event type {}", event.type());
944 }
945 }
946 }
947
948 // Using the work partition service defines who is in charge of a given policy.
949 private boolean isLeader(PolicyId policyId) {
950 final NodeId currentNodeId = clusterService.getLocalNode().id();
951 final NodeId leader = workPartitionService.getLeader(policyId, this::hasher);
952 if (leader == null) {
953 log.error("Fail to elect a leader for {}.", policyId);
954 return false;
955 }
956 policyLeaderCache.put(policyId, leader);
957 return currentNodeId.equals(leader);
958 }
959
960 private Long hasher(PolicyId policyId) {
961 return HASH_FN.newHasher()
962 .putUnencodedChars(policyId.toString())
963 .hash()
964 .asLong();
965 }
966
pierventre4f68ffa2021-03-09 22:52:14 +0100967 // TODO Periodic checker, consider to add store and delegates.
968
pierventre30368ab2021-02-24 23:23:22 +0100969 // Check periodically for any issue and try to resolve automatically if possible
970 private final class PolicyChecker implements Runnable {
971 @Override
972 public void run() {
973 }
974 }
975}