/*
 * Copyright 2021-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.segmentrouting.policy.impl;
17
18import com.google.common.collect.ImmutableSet;
19import com.google.common.collect.Lists;
20import com.google.common.collect.Maps;
21import com.google.common.hash.HashFunction;
22import com.google.common.hash.Hashing;
23import org.glassfish.jersey.internal.guava.Sets;
24import org.onlab.util.KryoNamespace;
25import org.onlab.util.PredictableExecutor;
26import org.onosproject.cluster.ClusterService;
27import org.onosproject.cluster.NodeId;
28import org.onosproject.core.ApplicationId;
29import org.onosproject.core.CoreService;
30import org.onosproject.net.DeviceId;
31import org.onosproject.net.flow.DefaultTrafficTreatment;
32import org.onosproject.net.flow.TrafficTreatment;
33import org.onosproject.net.flowobjective.DefaultForwardingObjective;
34import org.onosproject.net.flowobjective.DefaultObjectiveContext;
35import org.onosproject.net.flowobjective.FlowObjectiveService;
36import org.onosproject.net.flowobjective.ForwardingObjective;
37import org.onosproject.net.flowobjective.Objective;
38import org.onosproject.net.flowobjective.ObjectiveContext;
39import org.onosproject.net.intent.WorkPartitionService;
40import org.onosproject.segmentrouting.SegmentRoutingService;
41import org.onosproject.segmentrouting.policy.api.DropPolicy;
42import org.onosproject.segmentrouting.policy.api.Policy;
43import org.onosproject.segmentrouting.policy.api.Policy.PolicyType;
44import org.onosproject.segmentrouting.policy.api.PolicyData;
45import org.onosproject.segmentrouting.policy.api.PolicyId;
46import org.onosproject.segmentrouting.policy.api.PolicyService;
47import org.onosproject.segmentrouting.policy.api.PolicyState;
48import org.onosproject.segmentrouting.policy.api.TrafficMatch;
49import org.onosproject.segmentrouting.policy.api.TrafficMatchData;
50import org.onosproject.segmentrouting.policy.api.TrafficMatchId;
51import org.onosproject.segmentrouting.policy.api.TrafficMatchState;
52import org.onosproject.store.serializers.KryoNamespaces;
53import org.onosproject.store.service.ConsistentMap;
54import org.onosproject.store.service.MapEvent;
55import org.onosproject.store.service.MapEventListener;
56import org.onosproject.store.service.Serializer;
57import org.onosproject.store.service.StorageException;
58import org.onosproject.store.service.StorageService;
59import org.onosproject.store.service.Versioned;
60import org.osgi.service.component.annotations.Activate;
61import org.osgi.service.component.annotations.Component;
62import org.osgi.service.component.annotations.Deactivate;
63import org.osgi.service.component.annotations.Reference;
64import org.osgi.service.component.annotations.ReferenceCardinality;
65import org.slf4j.Logger;
66
67import java.util.List;
68import java.util.Map;
69import java.util.Optional;
70import java.util.Set;
71import java.util.concurrent.CompletableFuture;
72import java.util.stream.Collectors;
73
74import static org.onlab.util.Tools.groupedThreads;
75import static org.slf4j.LoggerFactory.getLogger;
76
77/**
78 * Implementation of the policy service interface.
79 */
80@Component(immediate = true, service = PolicyService.class)
81public class PolicyManager implements PolicyService {
82
83 // App related things
84 private static final String APP_NAME = "org.onosproject.segmentrouting.policy";
85 private ApplicationId appId;
86 private Logger log = getLogger(getClass());
87 static final String KEY_SEPARATOR = "|";
88
89 // Policy/TrafficMatch store related objects. We use these consistent maps to keep track of the
90 // lifecycle of a policy/traffic match. These are decomposed in multiple operations which have
91 // to be performed on multiple devices in order to have a policy/traffic match in ADDED state.
92 private static final String POLICY_STORE = "sr-policy-store";
93 private ConsistentMap<PolicyId, PolicyRequest> policies;
94 private MapEventListener<PolicyId, PolicyRequest> mapPolListener = new InternalPolMapEventListener();
95 private Map<PolicyId, PolicyRequest> policiesMap;
96
97 private static final String OPS_STORE = "sr-ops-store";
98 private ConsistentMap<String, Operation> operations;
99 private MapEventListener<String, Operation> mapOpsListener = new InternalOpsMapEventListener();
100 private Map<String, Operation> opsMap;
101
102 private static final String TRAFFIC_MATCH_STORE = "sr-tmatch-store";
103 private ConsistentMap<TrafficMatchId, TrafficMatchRequest> trafficMatches;
104 private MapEventListener<TrafficMatchId, TrafficMatchRequest> mapTMatchListener =
105 new InternalTMatchMapEventListener();
106 private Map<TrafficMatchId, TrafficMatchRequest> trafficMatchesMap;
107
108 // Leadership related objects - consistent hashing
109 private static final HashFunction HASH_FN = Hashing.md5();
110 // Read only cache of the Policy leader
111 private Map<PolicyId, NodeId> policyLeaderCache;
112
113 // Worker threads for policy and traffic match related ops
114 private static final int DEFAULT_THREADS = 4;
115 protected PredictableExecutor workers;
116
117 // Serializers and ONOS services
118 private static final KryoNamespace.Builder APP_KRYO_BUILDER = KryoNamespace.newBuilder()
119 .register(KryoNamespaces.API)
120 .register(PolicyId.class)
121 .register(PolicyType.class)
122 .register(DropPolicy.class)
123 .register(PolicyState.class)
124 .register(PolicyRequest.class)
125 .register(TrafficMatchId.class)
126 .register(TrafficMatchState.class)
127 .register(TrafficMatch.class)
128 .register(TrafficMatchRequest.class)
129 .register(Operation.class);
130 private Serializer serializer = Serializer.using(Lists.newArrayList(APP_KRYO_BUILDER.build()));
131
132 @Reference(cardinality = ReferenceCardinality.MANDATORY)
133 private CoreService coreService;
134
135 @Reference(cardinality = ReferenceCardinality.MANDATORY)
136 private StorageService storageService;
137
138 @Reference(cardinality = ReferenceCardinality.MANDATORY)
139 private ClusterService clusterService;
140
141 @Reference(cardinality = ReferenceCardinality.MANDATORY)
142 public WorkPartitionService workPartitionService;
143
144 @Reference(cardinality = ReferenceCardinality.OPTIONAL)
145 public SegmentRoutingService srService;
146
147 @Reference(cardinality = ReferenceCardinality.MANDATORY)
148 public FlowObjectiveService flowObjectiveService;
149
150 @Activate
151 public void activate() {
152 appId = coreService.registerApplication(APP_NAME);
153
154 policies = storageService.<PolicyId, PolicyRequest>consistentMapBuilder()
155 .withName(POLICY_STORE)
156 .withSerializer(serializer).build();
157 policies.addListener(mapPolListener);
158 policiesMap = policies.asJavaMap();
159
160 trafficMatches = storageService.<TrafficMatchId, TrafficMatchRequest>consistentMapBuilder()
161 .withName(TRAFFIC_MATCH_STORE)
162 .withSerializer(serializer).build();
163 trafficMatches.addListener(mapTMatchListener);
164 trafficMatchesMap = trafficMatches.asJavaMap();
165
166 operations = storageService.<String, Operation>consistentMapBuilder()
167 .withName(OPS_STORE)
168 .withSerializer(serializer).build();
169 operations.addListener(mapOpsListener);
170 opsMap = operations.asJavaMap();
171
172 policyLeaderCache = Maps.newConcurrentMap();
173
174 workers = new PredictableExecutor(DEFAULT_THREADS,
175 groupedThreads("sr-policy", "worker-%d", log));
176
177 log.info("Started");
178 }
179
180 @Deactivate
181 public void deactivate() {
182 // Teardown everything
183 policies.removeListener(mapPolListener);
184 policies.destroy();
185 policiesMap.clear();
186 trafficMatches.removeListener(mapTMatchListener);
187 trafficMatches.destroy();
188 trafficMatchesMap.clear();
189 operations.removeListener(mapOpsListener);
190 operations.destroy();
191 operations.clear();
192 workers.shutdown();
193
194 log.info("Stopped");
195 }
196
197 @Override
198 //FIXME update does not work well
199 public PolicyId addOrUpdatePolicy(Policy policy) {
200 PolicyId policyId = policy.policyId();
201 try {
202 policies.put(policyId, new PolicyRequest(policy));
203 } catch (StorageException e) {
204 log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
205 e.getMessage(), e);
206 policyId = null;
207 }
208 return policyId;
209 }
210
211 @Override
212 public boolean removePolicy(PolicyId policyId) {
213 boolean result;
214 try {
215 result = Versioned.valueOrNull(policies.computeIfPresent(policyId, (k, v) -> {
216 if (v.policyState() != PolicyState.PENDING_REMOVE) {
217 v.policyState(PolicyState.PENDING_REMOVE);
218 }
219 return v;
220 })) != null;
221 } catch (StorageException e) {
222 log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
223 e.getMessage(), e);
224 result = false;
225 }
226 return result;
227 }
228
229 @Override
230 public Set<PolicyData> policies(Set<PolicyType> filter) {
231 Set<PolicyData> policyData = Sets.newHashSet();
232 List<DeviceId> edgeDeviceIds = srService.getEdgeDeviceIds();
233 Set<PolicyRequest> policyRequests;
234 if (filter.isEmpty()) {
235 policyRequests = ImmutableSet.copyOf(policiesMap.values());
236 } else {
237 policyRequests = policiesMap.values().stream()
238 .filter(policyRequest -> filter.contains(policyRequest.policyType()))
239 .collect(Collectors.toSet());
240 }
241 PolicyKey policyKey;
242 List<String> ops;
243 for (PolicyRequest policyRequest : policyRequests) {
244 ops = Lists.newArrayList();
245 for (DeviceId deviceId : edgeDeviceIds) {
246 policyKey = new PolicyKey(deviceId, policyRequest.policyId());
247 Operation operation = Versioned.valueOrNull(operations.get(policyKey.toString()));
248 if (operation != null) {
249 ops.add(deviceId + " -> " + operation.toStringMinimal());
250 }
251 }
252 policyData.add(new PolicyData(policyRequest.policyState(), policyRequest.policy(), ops));
253 }
254 return policyData;
255 }
256
257 @Override
258 //FIXME update does not work well
259 public TrafficMatchId addOrUpdateTrafficMatch(TrafficMatch trafficMatch) {
260 TrafficMatchId trafficMatchId = trafficMatch.trafficMatchId();
261 try {
262 trafficMatches.put(trafficMatchId, new TrafficMatchRequest(trafficMatch));
263 } catch (StorageException e) {
264 log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
265 e.getMessage(), e);
266 trafficMatchId = null;
267 }
268 return trafficMatchId;
269 }
270
271 @Override
272 public boolean removeTrafficMatch(TrafficMatchId trafficMatchId) {
273 boolean result;
274 try {
275 result = Versioned.valueOrNull(trafficMatches.computeIfPresent(trafficMatchId, (k, v) -> {
276 if (v.trafficMatchState() != TrafficMatchState.PENDING_REMOVE) {
277 v.trafficMatchState(TrafficMatchState.PENDING_REMOVE);
278 }
279 return v;
280 })) != null;
281 } catch (StorageException e) {
282 log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
283 e.getMessage(), e);
284 result = false;
285 }
286 return result;
287 }
288
289 @Override
290 public Set<TrafficMatchData> trafficMatches() {
291 Set<TrafficMatchData> trafficMatchData = Sets.newHashSet();
292 List<DeviceId> edgeDeviceIds = srService.getEdgeDeviceIds();
293 Set<TrafficMatchRequest> trafficMatchRequests = ImmutableSet.copyOf(trafficMatchesMap.values());
294 TrafficMatchKey trafficMatchKey;
295 List<String> ops;
296 for (TrafficMatchRequest trafficMatchRequest : trafficMatchRequests) {
297 ops = Lists.newArrayList();
298 for (DeviceId deviceId : edgeDeviceIds) {
299 trafficMatchKey = new TrafficMatchKey(deviceId, trafficMatchRequest.trafficMatch().trafficMatchId());
300 Operation operation = Versioned.valueOrNull(operations.get(trafficMatchKey.toString()));
301 if (operation != null) {
302 ops.add(deviceId + " -> " + operation.toStringMinimal());
303 }
304 }
305 trafficMatchData.add(new TrafficMatchData(trafficMatchRequest.trafficMatchState(),
306 trafficMatchRequest.trafficMatch(), ops));
307 }
308 return trafficMatchData;
309 }
310
311 // Install/remove the policies on the edge devices
312 private void sendPolicy(Policy policy, boolean install) {
313 if (!isLeader(policy.policyId())) {
314 if (log.isDebugEnabled()) {
315 log.debug("Instance is not leader for policy {}", policy.policyId());
316 }
317 return;
318 }
319 // We know that we are the leader, offloads to the workers the remaining
320 // part: issue fobj installation/removal and update the maps
321 List<DeviceId> edgeDeviceIds = srService.getEdgeDeviceIds();
322 for (DeviceId deviceId : edgeDeviceIds) {
323 workers.execute(() -> {
324 if (install) {
325 installPolicyInDevice(deviceId, policy);
326 } else {
327 removePolicyInDevice(deviceId, policy);
328 }
329 }, deviceId.hashCode());
330 }
331 }
332
333 // Orchestrate policy installation according to the type
334 private void installPolicyInDevice(DeviceId deviceId, Policy policy) {
335 PolicyKey policyKey;
336 Operation operation;
337 if (policy.policyType() == PolicyType.DROP) {
338 if (log.isDebugEnabled()) {
339 log.debug("Installing DROP policy {}", policy.policyId());
340 }
341 // DROP policies do not need the next objective installation phase
342 // we can update directly the map and signal the ops as done
343 policyKey = new PolicyKey(deviceId, policy.policyId());
344 operation = Operation.builder()
345 .isDone(true)
346 .isInstall(true)
347 .policy(policy)
348 .build();
349 operations.put(policyKey.toString(), operation);
350 } else if (policy.policyType() == PolicyType.REDIRECT) {
351 if (log.isDebugEnabled()) {
352 log.debug("Installing REDIRECT policy {}", policy.policyId());
353 }
354 // REDIRECT Uses objective context to update the ops as done when it returns
355 // successfully. In the other cases leaves the ops as undone and the
356 // relative policy will remain in pending.
357 } else {
358 log.warn("Policy {} type {} not yet supported",
359 policy.policyId(), policy.policyType());
360 }
361 }
362
363 // Remove policy in a device according to the type
364 private void removePolicyInDevice(DeviceId deviceId, Policy policy) {
365 if (log.isDebugEnabled()) {
366 log.debug("Removing policy {}", policy.policyId());
367 }
368 PolicyKey policyKey = new PolicyKey(deviceId, policy.policyId());
369 Operation operation = Versioned.valueOrNull(operations.get(policyKey.toString()));
370 // Policy might be still in pending or not present anymore
371 if (operation == null || operation.objectiveOperation() == null) {
372 log.warn("There are no ops associated with {}", policyKey);
373 operation = Operation.builder()
374 .isDone(true)
375 .isInstall(false)
376 .policy(policy)
377 .build();
378 operations.put(policyKey.toString(), operation);
379 } else {
380 if (policy.policyType() == PolicyType.DROP) {
381 if (log.isDebugEnabled()) {
382 log.debug("Removing DROP policy {}", policy.policyId());
383 }
384 operation = Operation.builder()
385 .isDone(true)
386 .isInstall(false)
387 .policy(policy)
388 .build();
389 operations.put(policyKey.toString(), operation);
390 } else if (policy.policyType() == PolicyType.REDIRECT) {
391 if (log.isDebugEnabled()) {
392 log.debug("Removing REDIRECT policy {}", policy.policyId());
393 }
394 // REDIRECT has to remove first a next objective
395 } else {
396 log.warn("Policy {} type {} not yet supported",
397 policy.policyId(), policy.policyType());
398 }
399 }
400 }
401
402 // Updates policy status if all the pending ops are done
403 private void updatePolicy(PolicyId policyId, boolean install) {
404 if (!isLeader(policyId)) {
405 if (log.isDebugEnabled()) {
406 log.debug("Instance is not leader for policy {}", policyId);
407 }
408 return;
409 }
410 workers.execute(() -> updatePolicyInternal(policyId, install), policyId.hashCode());
411 }
412
413 private void updatePolicyInternal(PolicyId policyId, boolean install) {
414 // If there are no more pending ops we are ready to go; potentially we can check
415 // if the id is contained. Updates policies only if they are still present
416 Optional<Map.Entry<String, Versioned<Operation>>> notYetDone = operations.entrySet().stream()
417 .filter(entry -> entry.getValue().value().policy().isPresent())
418 .filter(entry -> PolicyKey.fromString(entry.getKey()).policyId().equals(policyId))
419 .filter(entry -> !entry.getValue().value().isDone() && entry.getValue().value().isInstall() == install)
420 .findFirst();
421 if (notYetDone.isEmpty()) {
422 PolicyRequest policyRequest = Versioned.valueOrNull(policies.computeIfPresent(policyId, (k, v) -> {
423 if (v.policyState() == PolicyState.PENDING_ADD && install) {
424 if (log.isDebugEnabled()) {
425 log.debug("Policy {} is ready", policyId);
426 }
427 v.policyState(PolicyState.ADDED);
428 } else if (v.policyState() == PolicyState.PENDING_REMOVE && !install) {
429 if (log.isDebugEnabled()) {
430 log.debug("Policy {} is removed", policyId);
431 }
432 v = null;
433 }
434 return v;
435 }));
436 // Greedy check for pending traffic matches
437 if (policyRequest != null && policyRequest.policyState() == PolicyState.ADDED) {
438 updatePendingTrafficMatches(policyRequest.policyId());
439 }
440 }
441 }
442
443 // Install/remove the traffic match on the edge devices
444 private void sendTrafficMatch(TrafficMatch trafficMatch, boolean install) {
445 if (!isLeader(trafficMatch.policyId())) {
446 if (log.isDebugEnabled()) {
447 log.debug("Instance is not leader for policy {}", trafficMatch.policyId());
448 }
449 return;
450 }
451 // We know that we are the leader, offloads to the workers the remaining
452 // part: issue fobj installation/removal and update the maps
453 List<DeviceId> edgeDeviceIds = srService.getEdgeDeviceIds();
454 for (DeviceId deviceId : edgeDeviceIds) {
455 workers.execute(() -> {
456 if (install) {
457 installTrafficMatchToDevice(deviceId, trafficMatch);
458 } else {
459 removeTrafficMatchInDevice(deviceId, trafficMatch);
460 }
461 }, deviceId.hashCode());
462 }
463 }
464
465 // Orchestrate traffic match installation according to the type
466 private void installTrafficMatchToDevice(DeviceId deviceId, TrafficMatch trafficMatch) {
467 if (log.isDebugEnabled()) {
468 log.debug("Installing traffic match {} associated to policy {}",
469 trafficMatch.trafficMatchId(), trafficMatch.policyId());
470 }
471 // Updates the store and then send the versatile fwd objective to the pipeliner
472 TrafficMatchKey trafficMatchKey = new TrafficMatchKey(deviceId, trafficMatch.trafficMatchId());
473 Operation trafficOperation = Operation.builder()
474 .isInstall(true)
475 .trafficMatch(trafficMatch)
476 .build();
477 operations.put(trafficMatchKey.toString(), trafficOperation);
478 // For the DROP policy we need to set an ACL drop in the fwd objective. The other
479 // policies require to retrieve the next Id and sets the next step.
480 PolicyKey policyKey = new PolicyKey(deviceId, trafficMatch.policyId());
481 Operation policyOperation = Versioned.valueOrNull(operations.get(policyKey.toString()));
482 if (policyOperation == null || !policyOperation.isDone() ||
483 !policyOperation.isInstall() || policyOperation.policy().isEmpty()) {
484 log.info("Deferring traffic match {} installation on device {}. Policy {} not yet installed",
485 trafficMatch.trafficMatchId(), deviceId, trafficMatch.policyId());
486 return;
487 }
488 Policy policy = policyOperation.policy().get();
489 ForwardingObjective.Builder builder = trafficMatchFwdObjective(trafficMatch);
490 // TODO we can try to reuse some code: context and completable future logic
491 if (policy.policyType() == PolicyType.DROP) {
492 // Firstly builds the fwd objective with the wipeDeferred action. Once, the fwd
493 // objective has completed its execution, we update the policiesOps map
494 TrafficTreatment dropTreatment = DefaultTrafficTreatment.builder()
495 .wipeDeferred()
496 .build();
497 builder.withTreatment(dropTreatment);
498 CompletableFuture<Objective> future = new CompletableFuture<>();
499 if (log.isDebugEnabled()) {
500 log.debug("Installing ACL drop forwarding objectives for dev: {}", deviceId);
501 }
502 ObjectiveContext context = new DefaultObjectiveContext(
503 (objective) -> {
504 if (log.isDebugEnabled()) {
505 log.debug("ACL drop rule for policy {} installed", trafficMatch.policyId());
506 }
507 future.complete(objective);
508 },
509 (objective, error) -> {
510 log.warn("Failed to install ACL drop rule for policy {}: {}", trafficMatch.policyId(), error);
511 future.complete(null);
512 });
513 // Context is not serializable
514 ForwardingObjective serializableObjective = builder.add();
515 flowObjectiveService.forward(deviceId, builder.add(context));
516 future.whenComplete((objective, ex) -> {
517 if (ex != null) {
518 log.error("Exception installing ACL drop rule", ex);
519 } else if (objective != null) {
520 operations.computeIfPresent(trafficMatchKey.toString(), (k, v) -> {
521 if (!v.isDone() && v.isInstall()) {
522 v.isDone(true);
523 v.objectiveOperation(serializableObjective);
524 }
525 return v;
526 });
527 }
528 });
529 } else {
530 log.warn("Policy {} type {} not yet supported", policy.policyId(), policy.policyType());
531 }
532 }
533
534 // Updates traffic match status if all the pending ops are done
535 private void updateTrafficMatch(TrafficMatch trafficMatch, boolean install) {
536 if (!isLeader(trafficMatch.policyId())) {
537 if (log.isDebugEnabled()) {
538 log.debug("Instance is not leader for policy {}", trafficMatch.policyId());
539 }
540 return;
541 }
542 workers.execute(() -> updateTrafficMatchInternal(trafficMatch.trafficMatchId(), install),
543 trafficMatch.policyId().hashCode());
544 }
545
546 private void updateTrafficMatchInternal(TrafficMatchId trafficMatchId, boolean install) {
547 // If there are no more pending ops we are ready to go; potentially we can check
548 // if the id is contained. Updates traffic matches only if they are still present
549 Optional<Map.Entry<String, Versioned<Operation>>> notYetDone = operations.entrySet().stream()
550 .filter(entry -> entry.getValue().value().trafficMatch().isPresent())
551 .filter(entry -> TrafficMatchKey.fromString(entry.getKey()).trafficMatchId().equals(trafficMatchId))
552 .filter(entry -> !entry.getValue().value().isDone() && entry.getValue().value().isInstall() == install)
553 .findFirst();
554 if (notYetDone.isEmpty()) {
555 trafficMatches.computeIfPresent(trafficMatchId, (k, v) -> {
556 if (v.trafficMatchState() == TrafficMatchState.PENDING_ADD && install) {
557 if (log.isDebugEnabled()) {
558 log.debug("Traffic match {} is ready", trafficMatchId);
559 }
560 v.trafficMatchState(TrafficMatchState.ADDED);
561 } else if (v.trafficMatchState() == TrafficMatchState.PENDING_REMOVE && !install) {
562 if (log.isDebugEnabled()) {
563 log.debug("Traffic match {} is removed", trafficMatchId);
564 }
565 v = null;
566 }
567 return v;
568 });
569 }
570 }
571
572 // Look for any pending traffic match waiting for the policy
573 private void updatePendingTrafficMatches(PolicyId policyId) {
574 Set<TrafficMatchRequest> pendingTrafficMatches = trafficMatches.stream()
575 .filter(trafficMatchEntry -> trafficMatchEntry.getValue().value().policyId().equals(policyId) &&
576 trafficMatchEntry.getValue().value().trafficMatchState() == TrafficMatchState.PENDING_ADD)
577 .map(trafficMatchEntry -> trafficMatchEntry.getValue().value())
578 .collect(Collectors.toSet());
579 for (TrafficMatchRequest trafficMatch : pendingTrafficMatches) {
580 sendTrafficMatch(trafficMatch.trafficMatch(), true);
581 }
582 }
583
584 // Traffic match removal in a device
585 private void removeTrafficMatchInDevice(DeviceId deviceId, TrafficMatch trafficMatch) {
586 if (log.isDebugEnabled()) {
587 log.debug("Removing traffic match {} associated to policy {}",
588 trafficMatch.trafficMatchId(), trafficMatch.policyId());
589 }
590 TrafficMatchKey trafficMatchKey = new TrafficMatchKey(deviceId, trafficMatch.trafficMatchId());
591 Operation operation = Versioned.valueOrNull(operations.get(trafficMatchKey.toString()));
592 if (operation == null || operation.objectiveOperation() == null) {
593 log.warn("There are no ops associated with {}", trafficMatchKey);
594 operation = Operation.builder()
595 .isDone(true)
596 .isInstall(false)
597 .trafficMatch(trafficMatch)
598 .build();
599 operations.put(trafficMatchKey.toString(), operation);
600 } else {
601 ForwardingObjective oldObj = (ForwardingObjective) operation.objectiveOperation();
602 operation = Operation.builder(operation)
603 .isDone(false)
604 .isInstall(false)
605 .build();
606 operations.put(trafficMatchKey.toString(), operation);
607 ForwardingObjective.Builder builder = DefaultForwardingObjective.builder(oldObj);
608 CompletableFuture<Objective> future = new CompletableFuture<>();
609 if (log.isDebugEnabled()) {
610 log.debug("Removing ACL drop forwarding objectives for dev: {}", deviceId);
611 }
612 ObjectiveContext context = new DefaultObjectiveContext(
613 (objective) -> {
614 if (log.isDebugEnabled()) {
615 log.debug("ACL drop rule for policy {} removed", trafficMatch.policyId());
616 }
617 future.complete(objective);
618 },
619 (objective, error) -> {
620 log.warn("Failed to remove ACL drop rule for policy {}: {}", trafficMatch.policyId(), error);
621 future.complete(null);
622 });
623 ForwardingObjective serializableObjective = builder.remove();
624 flowObjectiveService.forward(deviceId, builder.remove(context));
625 future.whenComplete((objective, ex) -> {
626 if (ex != null) {
627 log.error("Exception removing ACL drop rule", ex);
628 } else if (objective != null) {
629 operations.computeIfPresent(trafficMatchKey.toString(), (k, v) -> {
630 if (!v.isDone() && !v.isInstall()) {
631 v.isDone(true);
632 v.objectiveOperation(serializableObjective);
633 }
634 return v;
635 });
636 }
637 });
638 }
639 }
640
641 // Update any depending traffic match on the policy. It is used when a policy
642 // has been removed but there are still traffic matches depending on it
643 private void updateDependingTrafficMatches(PolicyId policyId) {
644 if (!isLeader(policyId)) {
645 if (log.isDebugEnabled()) {
646 log.debug("Instance is not leader for policy {}", policyId);
647 }
648 return;
649 }
650 workers.execute(() -> updateDependingTrafficMatchesInternal(policyId), policyId.hashCode());
651 }
652
653 private void updateDependingTrafficMatchesInternal(PolicyId policyId) {
654 Set<TrafficMatchRequest> pendingTrafficMatches = trafficMatches.stream()
655 .filter(trafficMatchEntry -> trafficMatchEntry.getValue().value().policyId().equals(policyId) &&
656 trafficMatchEntry.getValue().value().trafficMatchState() == TrafficMatchState.ADDED)
657 .map(trafficMatchEntry -> trafficMatchEntry.getValue().value())
658 .collect(Collectors.toSet());
659 for (TrafficMatchRequest trafficMatchRequest : pendingTrafficMatches) {
660 trafficMatches.computeIfPresent(trafficMatchRequest.trafficMatchId(), (k, v) -> {
661 if (v.trafficMatchState() == TrafficMatchState.ADDED) {
662 v.trafficMatchState(TrafficMatchState.PENDING_REMOVE);
663 }
664 return v;
665 });
666 }
667 }
668
669 // Utility that removes operations related to a policy or to a traffic match.
670 private void removeOperations(PolicyId policyId, Optional<TrafficMatchId> trafficMatchId) {
671 if (!isLeader(policyId)) {
672 if (log.isDebugEnabled()) {
673 log.debug("Instance is not leader for policy {}", policyId);
674 }
675 return;
676 }
677 List<DeviceId> edgeDeviceIds = srService.getEdgeDeviceIds();
678 for (DeviceId deviceId : edgeDeviceIds) {
679 workers.execute(() -> {
680 String key;
681 if (trafficMatchId.isPresent()) {
682 key = new TrafficMatchKey(deviceId, trafficMatchId.get()).toString();
683 } else {
684 key = new PolicyKey(deviceId, policyId).toString();
685 }
686 operations.remove(key);
687 }, deviceId.hashCode());
688 }
689 }
690
691 private ForwardingObjective.Builder trafficMatchFwdObjective(TrafficMatch trafficMatch) {
692 return DefaultForwardingObjective.builder()
693 .withPriority(PolicyService.TRAFFIC_MATCH_PRIORITY)
694 .withSelector(trafficMatch.trafficSelector())
695 .fromApp(appId)
696 .withFlag(ForwardingObjective.Flag.VERSATILE)
697 .makePermanent();
698 }
699
700 // Each map has an event listener enabling the events distribution across the cluster
701 private class InternalPolMapEventListener implements MapEventListener<PolicyId, PolicyRequest> {
702 @Override
703 public void event(MapEvent<PolicyId, PolicyRequest> event) {
704 Versioned<PolicyRequest> value = event.type() == MapEvent.Type.REMOVE ?
705 event.oldValue() : event.newValue();
706 PolicyRequest policyRequest = value.value();
707 Policy policy = policyRequest.policy();
708 switch (event.type()) {
709 case INSERT:
710 case UPDATE:
711 switch (policyRequest.policyState()) {
712 case PENDING_ADD:
713 sendPolicy(policy, true);
714 break;
715 case PENDING_REMOVE:
716 sendPolicy(policy, false);
717 break;
718 case ADDED:
719 break;
720 default:
721 log.warn("Unknown policy state type {}", policyRequest.policyState());
722 }
723 break;
724 case REMOVE:
725 removeOperations(policy.policyId(), Optional.empty());
726 updateDependingTrafficMatches(policy.policyId());
727 break;
728 default:
729 log.warn("Unknown event type {}", event.type());
730
731 }
732 }
733 }
734
735 private class InternalTMatchMapEventListener implements MapEventListener<TrafficMatchId, TrafficMatchRequest> {
736 @Override
737 public void event(MapEvent<TrafficMatchId, TrafficMatchRequest> event) {
738 Versioned<TrafficMatchRequest> value = event.type() == MapEvent.Type.REMOVE ?
739 event.oldValue() : event.newValue();
740 TrafficMatchRequest trafficMatchRequest = value.value();
741 TrafficMatch trafficMatch = trafficMatchRequest.trafficMatch();
742 switch (event.type()) {
743 case INSERT:
744 case UPDATE:
745 switch (trafficMatchRequest.trafficMatchState()) {
746 case PENDING_ADD:
747 sendTrafficMatch(trafficMatch, true);
748 break;
749 case PENDING_REMOVE:
750 sendTrafficMatch(trafficMatch, false);
751 break;
752 case ADDED:
753 break;
754 default:
755 log.warn("Unknown traffic match state type {}", trafficMatchRequest.trafficMatchState());
756 }
757 break;
758 case REMOVE:
759 removeOperations(trafficMatch.policyId(), Optional.of(trafficMatch.trafficMatchId()));
760 break;
761 default:
762 log.warn("Unknown event type {}", event.type());
763 }
764 }
765 }
766
767 private class InternalOpsMapEventListener implements MapEventListener<String, Operation> {
768 @Override
769 public void event(MapEvent<String, Operation> event) {
770 String key = event.key();
771 Versioned<Operation> value = event.type() == MapEvent.Type.REMOVE ?
772 event.oldValue() : event.newValue();
773 Operation operation = value.value();
774 switch (event.type()) {
775 case INSERT:
776 case UPDATE:
777 if (operation.isDone()) {
778 if (operation.policy().isPresent()) {
779 PolicyKey policyKey = PolicyKey.fromString(key);
780 updatePolicy(policyKey.policyId(), operation.isInstall());
781 } else if (operation.trafficMatch().isPresent()) {
782 updateTrafficMatch(operation.trafficMatch().get(), operation.isInstall());
783 } else {
784 log.warn("Unknown pending operation");
785 }
786 }
787 break;
788 case REMOVE:
789 break;
790 default:
791 log.warn("Unknown event type {}", event.type());
792 }
793 }
794 }
795
796 // Using the work partition service defines who is in charge of a given policy.
797 private boolean isLeader(PolicyId policyId) {
798 final NodeId currentNodeId = clusterService.getLocalNode().id();
799 final NodeId leader = workPartitionService.getLeader(policyId, this::hasher);
800 if (leader == null) {
801 log.error("Fail to elect a leader for {}.", policyId);
802 return false;
803 }
804 policyLeaderCache.put(policyId, leader);
805 return currentNodeId.equals(leader);
806 }
807
808 private Long hasher(PolicyId policyId) {
809 return HASH_FN.newHasher()
810 .putUnencodedChars(policyId.toString())
811 .hash()
812 .asLong();
813 }
814
815 // Check periodically for any issue and try to resolve automatically if possible
816 private final class PolicyChecker implements Runnable {
817 @Override
818 public void run() {
819 }
820 }
821}