blob: 089612b6f77e0b04c474bb21a6ed4e86a14a5a3c [file] [log] [blame]
pierventre30368ab2021-02-24 23:23:22 +01001/*
2 * Copyright 2021-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.segmentrouting.policy.impl;
17
18import com.google.common.collect.ImmutableSet;
19import com.google.common.collect.Lists;
20import com.google.common.collect.Maps;
21import com.google.common.hash.HashFunction;
22import com.google.common.hash.Hashing;
Wailok Shume2ab3d62021-05-27 04:40:38 +080023
pierventre30368ab2021-02-24 23:23:22 +010024import org.glassfish.jersey.internal.guava.Sets;
pierventre4f68ffa2021-03-09 22:52:14 +010025import org.onlab.packet.MacAddress;
pierventre30368ab2021-02-24 23:23:22 +010026import org.onlab.util.KryoNamespace;
27import org.onlab.util.PredictableExecutor;
28import org.onosproject.cluster.ClusterService;
29import org.onosproject.cluster.NodeId;
Wailok Shumee90c132021-03-11 21:00:11 +080030import org.onosproject.codec.CodecService;
pierventre30368ab2021-02-24 23:23:22 +010031import org.onosproject.core.ApplicationId;
32import org.onosproject.core.CoreService;
pierventre4f68ffa2021-03-09 22:52:14 +010033import org.onosproject.net.ConnectPoint;
Wailok Shume2ab3d62021-05-27 04:40:38 +080034import org.onosproject.net.config.NetworkConfigEvent;
35import org.onosproject.net.config.NetworkConfigListener;
36import org.onosproject.net.config.NetworkConfigRegistry;
37import org.onosproject.net.device.DeviceService;
pierventre30368ab2021-02-24 23:23:22 +010038import org.onosproject.net.DeviceId;
pierventre4f68ffa2021-03-09 22:52:14 +010039import org.onosproject.net.Link;
40import org.onosproject.net.flow.DefaultTrafficSelector;
pierventre30368ab2021-02-24 23:23:22 +010041import org.onosproject.net.flow.DefaultTrafficTreatment;
pierventre4f68ffa2021-03-09 22:52:14 +010042import org.onosproject.net.flow.TrafficSelector;
pierventre30368ab2021-02-24 23:23:22 +010043import org.onosproject.net.flow.TrafficTreatment;
44import org.onosproject.net.flowobjective.DefaultForwardingObjective;
pierventre4f68ffa2021-03-09 22:52:14 +010045import org.onosproject.net.flowobjective.DefaultNextObjective;
pierventre30368ab2021-02-24 23:23:22 +010046import org.onosproject.net.flowobjective.DefaultObjectiveContext;
47import org.onosproject.net.flowobjective.FlowObjectiveService;
48import org.onosproject.net.flowobjective.ForwardingObjective;
pierventre4f68ffa2021-03-09 22:52:14 +010049import org.onosproject.net.flowobjective.NextObjective;
pierventre30368ab2021-02-24 23:23:22 +010050import org.onosproject.net.flowobjective.Objective;
51import org.onosproject.net.flowobjective.ObjectiveContext;
52import org.onosproject.net.intent.WorkPartitionService;
pierventre4f68ffa2021-03-09 22:52:14 +010053import org.onosproject.net.link.LinkService;
pierventre30368ab2021-02-24 23:23:22 +010054import org.onosproject.segmentrouting.policy.api.DropPolicy;
55import org.onosproject.segmentrouting.policy.api.Policy;
pierventre30368ab2021-02-24 23:23:22 +010056import org.onosproject.segmentrouting.policy.api.PolicyData;
57import org.onosproject.segmentrouting.policy.api.PolicyId;
58import org.onosproject.segmentrouting.policy.api.PolicyService;
59import org.onosproject.segmentrouting.policy.api.PolicyState;
pierventre4f68ffa2021-03-09 22:52:14 +010060import org.onosproject.segmentrouting.policy.api.RedirectPolicy;
pierventre30368ab2021-02-24 23:23:22 +010061import org.onosproject.segmentrouting.policy.api.TrafficMatch;
62import org.onosproject.segmentrouting.policy.api.TrafficMatchData;
63import org.onosproject.segmentrouting.policy.api.TrafficMatchId;
Wailok Shum37dd29a2021-04-27 18:13:55 +080064import org.onosproject.segmentrouting.policy.api.TrafficMatchPriority;
pierventre30368ab2021-02-24 23:23:22 +010065import org.onosproject.segmentrouting.policy.api.TrafficMatchState;
pierventreb37a11a2021-03-18 16:50:04 +010066import org.onosproject.segmentrouting.config.DeviceConfigNotFoundException;
Wailok Shume2ab3d62021-05-27 04:40:38 +080067import org.onosproject.segmentrouting.config.SegmentRoutingDeviceConfig;
pierventreb37a11a2021-03-18 16:50:04 +010068import org.onosproject.segmentrouting.policy.api.Policy.PolicyType;
pierventre30368ab2021-02-24 23:23:22 +010069import org.onosproject.store.serializers.KryoNamespaces;
70import org.onosproject.store.service.ConsistentMap;
71import org.onosproject.store.service.MapEvent;
72import org.onosproject.store.service.MapEventListener;
73import org.onosproject.store.service.Serializer;
74import org.onosproject.store.service.StorageException;
75import org.onosproject.store.service.StorageService;
76import org.onosproject.store.service.Versioned;
77import org.osgi.service.component.annotations.Activate;
78import org.osgi.service.component.annotations.Component;
79import org.osgi.service.component.annotations.Deactivate;
80import org.osgi.service.component.annotations.Reference;
81import org.osgi.service.component.annotations.ReferenceCardinality;
82import org.slf4j.Logger;
83
Wailok Shume2ab3d62021-05-27 04:40:38 +080084import java.util.ArrayList;
pierventre30368ab2021-02-24 23:23:22 +010085import java.util.List;
86import java.util.Map;
87import java.util.Optional;
88import java.util.Set;
89import java.util.concurrent.CompletableFuture;
Wailok Shume2ab3d62021-05-27 04:40:38 +080090import java.util.concurrent.ExecutorService;
91import java.util.concurrent.Executors;
pierventre30368ab2021-02-24 23:23:22 +010092import java.util.stream.Collectors;
93
94import static org.onlab.util.Tools.groupedThreads;
95import static org.slf4j.LoggerFactory.getLogger;
96
97/**
98 * Implementation of the policy service interface.
99 */
100@Component(immediate = true, service = PolicyService.class)
101public class PolicyManager implements PolicyService {
102
103 // App related things
104 private static final String APP_NAME = "org.onosproject.segmentrouting.policy";
105 private ApplicationId appId;
106 private Logger log = getLogger(getClass());
107 static final String KEY_SEPARATOR = "|";
108
pierventre4f68ffa2021-03-09 22:52:14 +0100109 // Supported policies
pierventreb37a11a2021-03-18 16:50:04 +0100110 private static final Set<PolicyType> SUPPORTED_POLICIES = ImmutableSet.of(
pierventre4f68ffa2021-03-09 22:52:14 +0100111 PolicyType.DROP, PolicyType.REDIRECT);
112
pierventre12aaa052021-03-22 12:56:03 +0100113 // Driver should use this meta to match ig_port_type field in the ACL table
pierventre4f68ffa2021-03-09 22:52:14 +0100114 private static final long EDGE_PORT = 1;
pierventre4f68ffa2021-03-09 22:52:14 +0100115
pierventre30368ab2021-02-24 23:23:22 +0100116 // Policy/TrafficMatch store related objects. We use these consistent maps to keep track of the
117 // lifecycle of a policy/traffic match. These are decomposed in multiple operations which have
118 // to be performed on multiple devices in order to have a policy/traffic match in ADDED state.
pierventre4f68ffa2021-03-09 22:52:14 +0100119 // TODO Consider to add store and delegate
pierventre30368ab2021-02-24 23:23:22 +0100120 private static final String POLICY_STORE = "sr-policy-store";
121 private ConsistentMap<PolicyId, PolicyRequest> policies;
122 private MapEventListener<PolicyId, PolicyRequest> mapPolListener = new InternalPolMapEventListener();
123 private Map<PolicyId, PolicyRequest> policiesMap;
124
125 private static final String OPS_STORE = "sr-ops-store";
126 private ConsistentMap<String, Operation> operations;
127 private MapEventListener<String, Operation> mapOpsListener = new InternalOpsMapEventListener();
128 private Map<String, Operation> opsMap;
129
130 private static final String TRAFFIC_MATCH_STORE = "sr-tmatch-store";
131 private ConsistentMap<TrafficMatchId, TrafficMatchRequest> trafficMatches;
132 private MapEventListener<TrafficMatchId, TrafficMatchRequest> mapTMatchListener =
133 new InternalTMatchMapEventListener();
134 private Map<TrafficMatchId, TrafficMatchRequest> trafficMatchesMap;
135
136 // Leadership related objects - consistent hashing
137 private static final HashFunction HASH_FN = Hashing.md5();
138 // Read only cache of the Policy leader
139 private Map<PolicyId, NodeId> policyLeaderCache;
140
141 // Worker threads for policy and traffic match related ops
142 private static final int DEFAULT_THREADS = 4;
143 protected PredictableExecutor workers;
144
145 // Serializers and ONOS services
146 private static final KryoNamespace.Builder APP_KRYO_BUILDER = KryoNamespace.newBuilder()
147 .register(KryoNamespaces.API)
148 .register(PolicyId.class)
149 .register(PolicyType.class)
150 .register(DropPolicy.class)
pierventre4f68ffa2021-03-09 22:52:14 +0100151 .register(RedirectPolicy.class)
pierventre30368ab2021-02-24 23:23:22 +0100152 .register(PolicyState.class)
153 .register(PolicyRequest.class)
154 .register(TrafficMatchId.class)
155 .register(TrafficMatchState.class)
Wailok Shum37dd29a2021-04-27 18:13:55 +0800156 .register(TrafficMatchPriority.class)
pierventre30368ab2021-02-24 23:23:22 +0100157 .register(TrafficMatch.class)
158 .register(TrafficMatchRequest.class)
159 .register(Operation.class);
160 private Serializer serializer = Serializer.using(Lists.newArrayList(APP_KRYO_BUILDER.build()));
161
162 @Reference(cardinality = ReferenceCardinality.MANDATORY)
163 private CoreService coreService;
164
165 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Wailok Shumee90c132021-03-11 21:00:11 +0800166 private CodecService codecService;
167
168 @Reference(cardinality = ReferenceCardinality.MANDATORY)
pierventre30368ab2021-02-24 23:23:22 +0100169 private StorageService storageService;
170
171 @Reference(cardinality = ReferenceCardinality.MANDATORY)
172 private ClusterService clusterService;
173
174 @Reference(cardinality = ReferenceCardinality.MANDATORY)
pierventre4f68ffa2021-03-09 22:52:14 +0100175 private WorkPartitionService workPartitionService;
pierventre30368ab2021-02-24 23:23:22 +0100176
Wailok Shumee90c132021-03-11 21:00:11 +0800177 @Reference(cardinality = ReferenceCardinality.MANDATORY)
pierventre4f68ffa2021-03-09 22:52:14 +0100178 private FlowObjectiveService flowObjectiveService;
179
180 @Reference(cardinality = ReferenceCardinality.MANDATORY)
181 private LinkService linkService;
pierventre30368ab2021-02-24 23:23:22 +0100182
Wailok Shume2ab3d62021-05-27 04:40:38 +0800183 @Reference(cardinality = ReferenceCardinality.MANDATORY)
184 private NetworkConfigRegistry cfgService;
185
186 @Reference(cardinality = ReferenceCardinality.MANDATORY)
187 private DeviceService deviceService;
188
189 // Netcfg listener
190 private final InternalConfigListener cfgListener = new InternalConfigListener();
191 // EventExecutor for netcfg event
192 private ExecutorService eventExecutor;
193
pierventre30368ab2021-02-24 23:23:22 +0100194 @Activate
195 public void activate() {
196 appId = coreService.registerApplication(APP_NAME);
197
Wailok Shumee90c132021-03-11 21:00:11 +0800198 codecService.registerCodec(DropPolicy.class, new DropPolicyCodec());
199 codecService.registerCodec(RedirectPolicy.class, new RedirectPolicyCodec());
200 codecService.registerCodec(TrafficMatch.class, new TrafficMatchCodec());
201
Wailok Shume2ab3d62021-05-27 04:40:38 +0800202 cfgService.addListener(cfgListener);
203
pierventre30368ab2021-02-24 23:23:22 +0100204 policies = storageService.<PolicyId, PolicyRequest>consistentMapBuilder()
205 .withName(POLICY_STORE)
206 .withSerializer(serializer).build();
207 policies.addListener(mapPolListener);
208 policiesMap = policies.asJavaMap();
209
210 trafficMatches = storageService.<TrafficMatchId, TrafficMatchRequest>consistentMapBuilder()
211 .withName(TRAFFIC_MATCH_STORE)
212 .withSerializer(serializer).build();
213 trafficMatches.addListener(mapTMatchListener);
214 trafficMatchesMap = trafficMatches.asJavaMap();
215
216 operations = storageService.<String, Operation>consistentMapBuilder()
217 .withName(OPS_STORE)
218 .withSerializer(serializer).build();
219 operations.addListener(mapOpsListener);
220 opsMap = operations.asJavaMap();
221
222 policyLeaderCache = Maps.newConcurrentMap();
223
224 workers = new PredictableExecutor(DEFAULT_THREADS,
225 groupedThreads("sr-policy", "worker-%d", log));
226
Wailok Shume2ab3d62021-05-27 04:40:38 +0800227 eventExecutor = Executors.newSingleThreadExecutor();
228
pierventre30368ab2021-02-24 23:23:22 +0100229 log.info("Started");
230 }
231
232 @Deactivate
233 public void deactivate() {
234 // Teardown everything
Wailok Shumee90c132021-03-11 21:00:11 +0800235 codecService.unregisterCodec(DropPolicy.class);
236 codecService.unregisterCodec(RedirectPolicy.class);
237 codecService.unregisterCodec(TrafficMatch.class);
Wailok Shume2ab3d62021-05-27 04:40:38 +0800238 cfgService.removeListener(cfgListener);
pierventre30368ab2021-02-24 23:23:22 +0100239 policies.removeListener(mapPolListener);
240 policies.destroy();
241 policiesMap.clear();
242 trafficMatches.removeListener(mapTMatchListener);
243 trafficMatches.destroy();
244 trafficMatchesMap.clear();
245 operations.removeListener(mapOpsListener);
246 operations.destroy();
247 operations.clear();
248 workers.shutdown();
Wailok Shume2ab3d62021-05-27 04:40:38 +0800249 eventExecutor.shutdown();
pierventre30368ab2021-02-24 23:23:22 +0100250
251 log.info("Stopped");
252 }
253
254 @Override
255 //FIXME update does not work well
256 public PolicyId addOrUpdatePolicy(Policy policy) {
257 PolicyId policyId = policy.policyId();
258 try {
259 policies.put(policyId, new PolicyRequest(policy));
260 } catch (StorageException e) {
261 log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
262 e.getMessage(), e);
263 policyId = null;
264 }
265 return policyId;
266 }
267
268 @Override
269 public boolean removePolicy(PolicyId policyId) {
270 boolean result;
pierventre4f68ffa2021-03-09 22:52:14 +0100271 if (dependingTrafficMatches(policyId).isPresent()) {
272 if (log.isDebugEnabled()) {
273 log.debug("Found depending traffic matches");
274 }
275 return false;
276 }
pierventre30368ab2021-02-24 23:23:22 +0100277 try {
278 result = Versioned.valueOrNull(policies.computeIfPresent(policyId, (k, v) -> {
279 if (v.policyState() != PolicyState.PENDING_REMOVE) {
280 v.policyState(PolicyState.PENDING_REMOVE);
281 }
282 return v;
283 })) != null;
284 } catch (StorageException e) {
285 log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
286 e.getMessage(), e);
287 result = false;
288 }
289 return result;
290 }
291
292 @Override
293 public Set<PolicyData> policies(Set<PolicyType> filter) {
294 Set<PolicyData> policyData = Sets.newHashSet();
Wailok Shume2ab3d62021-05-27 04:40:38 +0800295 List<DeviceId> edgeDeviceIds = getEdgeDeviceIds();
pierventre30368ab2021-02-24 23:23:22 +0100296 Set<PolicyRequest> policyRequests;
297 if (filter.isEmpty()) {
298 policyRequests = ImmutableSet.copyOf(policiesMap.values());
299 } else {
300 policyRequests = policiesMap.values().stream()
301 .filter(policyRequest -> filter.contains(policyRequest.policyType()))
302 .collect(Collectors.toSet());
303 }
304 PolicyKey policyKey;
305 List<String> ops;
306 for (PolicyRequest policyRequest : policyRequests) {
307 ops = Lists.newArrayList();
308 for (DeviceId deviceId : edgeDeviceIds) {
309 policyKey = new PolicyKey(deviceId, policyRequest.policyId());
310 Operation operation = Versioned.valueOrNull(operations.get(policyKey.toString()));
311 if (operation != null) {
312 ops.add(deviceId + " -> " + operation.toStringMinimal());
313 }
314 }
315 policyData.add(new PolicyData(policyRequest.policyState(), policyRequest.policy(), ops));
316 }
317 return policyData;
318 }
319
320 @Override
321 //FIXME update does not work well
322 public TrafficMatchId addOrUpdateTrafficMatch(TrafficMatch trafficMatch) {
323 TrafficMatchId trafficMatchId = trafficMatch.trafficMatchId();
324 try {
325 trafficMatches.put(trafficMatchId, new TrafficMatchRequest(trafficMatch));
326 } catch (StorageException e) {
327 log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
328 e.getMessage(), e);
329 trafficMatchId = null;
330 }
331 return trafficMatchId;
332 }
333
334 @Override
335 public boolean removeTrafficMatch(TrafficMatchId trafficMatchId) {
336 boolean result;
337 try {
338 result = Versioned.valueOrNull(trafficMatches.computeIfPresent(trafficMatchId, (k, v) -> {
339 if (v.trafficMatchState() != TrafficMatchState.PENDING_REMOVE) {
340 v.trafficMatchState(TrafficMatchState.PENDING_REMOVE);
341 }
342 return v;
343 })) != null;
344 } catch (StorageException e) {
345 log.error("{} thrown a storage exception: {}", e.getStackTrace()[0].getMethodName(),
346 e.getMessage(), e);
347 result = false;
348 }
349 return result;
350 }
351
352 @Override
353 public Set<TrafficMatchData> trafficMatches() {
354 Set<TrafficMatchData> trafficMatchData = Sets.newHashSet();
Wailok Shume2ab3d62021-05-27 04:40:38 +0800355 List<DeviceId> edgeDeviceIds = getEdgeDeviceIds();
pierventre30368ab2021-02-24 23:23:22 +0100356 Set<TrafficMatchRequest> trafficMatchRequests = ImmutableSet.copyOf(trafficMatchesMap.values());
357 TrafficMatchKey trafficMatchKey;
358 List<String> ops;
359 for (TrafficMatchRequest trafficMatchRequest : trafficMatchRequests) {
360 ops = Lists.newArrayList();
361 for (DeviceId deviceId : edgeDeviceIds) {
362 trafficMatchKey = new TrafficMatchKey(deviceId, trafficMatchRequest.trafficMatch().trafficMatchId());
363 Operation operation = Versioned.valueOrNull(operations.get(trafficMatchKey.toString()));
364 if (operation != null) {
365 ops.add(deviceId + " -> " + operation.toStringMinimal());
366 }
367 }
368 trafficMatchData.add(new TrafficMatchData(trafficMatchRequest.trafficMatchState(),
369 trafficMatchRequest.trafficMatch(), ops));
370 }
371 return trafficMatchData;
372 }
373
374 // Install/remove the policies on the edge devices
375 private void sendPolicy(Policy policy, boolean install) {
376 if (!isLeader(policy.policyId())) {
377 if (log.isDebugEnabled()) {
378 log.debug("Instance is not leader for policy {}", policy.policyId());
379 }
380 return;
381 }
382 // We know that we are the leader, offloads to the workers the remaining
383 // part: issue fobj installation/removal and update the maps
Wailok Shume2ab3d62021-05-27 04:40:38 +0800384 List<DeviceId> edgeDeviceIds = getEdgeDeviceIds();
pierventre30368ab2021-02-24 23:23:22 +0100385 for (DeviceId deviceId : edgeDeviceIds) {
386 workers.execute(() -> {
387 if (install) {
388 installPolicyInDevice(deviceId, policy);
389 } else {
390 removePolicyInDevice(deviceId, policy);
391 }
392 }, deviceId.hashCode());
393 }
394 }
395
396 // Orchestrate policy installation according to the type
397 private void installPolicyInDevice(DeviceId deviceId, Policy policy) {
pierventre4f68ffa2021-03-09 22:52:14 +0100398 if (!SUPPORTED_POLICIES.contains(policy.policyType())) {
pierventre30368ab2021-02-24 23:23:22 +0100399 log.warn("Policy {} type {} not yet supported",
400 policy.policyId(), policy.policyType());
pierventre4f68ffa2021-03-09 22:52:14 +0100401 return;
402 }
403 PolicyKey policyKey;
404 Operation.Builder operation;
405 if (log.isDebugEnabled()) {
406 log.debug("Installing {} policy {} for dev: {}",
407 policy.policyType(), policy.policyId(), deviceId);
408 }
409 policyKey = new PolicyKey(deviceId, policy.policyId());
Wailok Shume2ab3d62021-05-27 04:40:38 +0800410 // Prevent duplicate installation of the policies. With the current
411 // implementation is not possible to update a policy since a change
412 // in the params will create a new policy. Thus, there is no need to
413 // check the equality like we do for the TM
414 Operation oldPolicyOp = Versioned.valueOrNull(operations.get(policyKey.toString()));
415 if (oldPolicyOp != null && oldPolicyOp.isInstall()) {
416 if (log.isDebugEnabled()) {
417 log.debug("There is already an install operation for policy {}", policy.policyId());
418 }
419 return;
420 }
pierventre4f68ffa2021-03-09 22:52:14 +0100421 operation = Operation.builder()
422 .isInstall(true)
423 .policy(policy);
424 // TODO To better handle different policy types consider the abstraction of a compiler (subtypes ?)
425 if (policy.policyType() == PolicyType.DROP) {
426 // DROP policies do not need the next objective installation phase
427 // we can update directly the map and signal the ops as done
428 operation.isDone(true);
Wailok Shum53031b32021-05-22 00:37:51 +0800429 Operation preOp = Versioned.valueOrNull(operations.put(policyKey.toString(), operation.build()));
430 if (preOp != null && preOp.equals(operation.build())) {
431 updatePolicy(policy.policyId(), true);
432 }
pierventre4f68ffa2021-03-09 22:52:14 +0100433 } else if (policy.policyType() == PolicyType.REDIRECT) {
434 // REDIRECT Uses next objective context to update the ops as done when
435 // it returns successfully. In the other cases leaves the ops as undone
436 // and the relative policy will remain in pending.
437 operations.put(policyKey.toString(), operation.build());
438 NextObjective.Builder builder = redirectPolicyNextObjective(deviceId, (RedirectPolicy) policy);
439 // Handle error here - leave the operation as undone and pending
440 if (builder != null) {
441 CompletableFuture<Objective> future = new CompletableFuture<>();
442 if (log.isDebugEnabled()) {
443 log.debug("Installing REDIRECT next objective for dev: {}", deviceId);
444 }
445 ObjectiveContext context = new DefaultObjectiveContext(
446 (objective) -> {
447 if (log.isDebugEnabled()) {
448 log.debug("REDIRECT next objective for policy {} installed in dev: {}",
449 policy.policyId(), deviceId);
450 }
451 future.complete(objective);
452 },
453 (objective, error) -> {
454 log.warn("Failed to install REDIRECT next objective for policy {}: {} in dev: {}",
455 policy.policyId(), error, deviceId);
456 future.complete(null);
457 });
458 // Context is not serializable
459 NextObjective serializableObjective = builder.add();
460 flowObjectiveService.next(deviceId, builder.add(context));
461 future.whenComplete((objective, ex) -> {
462 if (ex != null) {
463 log.error("Exception installing REDIRECT next objective", ex);
464 } else if (objective != null) {
465 operations.computeIfPresent(policyKey.toString(), (k, v) -> {
466 if (!v.isDone() && v.isInstall()) {
467 v.isDone(true);
468 v.objectiveOperation(serializableObjective);
469 }
470 return v;
471 });
472 }
473 });
474 }
pierventre30368ab2021-02-24 23:23:22 +0100475 }
476 }
477
478 // Remove policy in a device according to the type
479 private void removePolicyInDevice(DeviceId deviceId, Policy policy) {
pierventre30368ab2021-02-24 23:23:22 +0100480 PolicyKey policyKey = new PolicyKey(deviceId, policy.policyId());
481 Operation operation = Versioned.valueOrNull(operations.get(policyKey.toString()));
482 // Policy might be still in pending or not present anymore
483 if (operation == null || operation.objectiveOperation() == null) {
484 log.warn("There are no ops associated with {}", policyKey);
485 operation = Operation.builder()
486 .isDone(true)
487 .isInstall(false)
488 .policy(policy)
489 .build();
490 operations.put(policyKey.toString(), operation);
491 } else {
pierventre4f68ffa2021-03-09 22:52:14 +0100492 if (log.isDebugEnabled()) {
493 log.debug("Removing {} policy {} in device {}", policy.policyType(), policy.policyId(), deviceId);
494 }
495 Operation.Builder operationBuilder = Operation.builder()
496 .isInstall(false)
497 .policy(policy);
pierventre30368ab2021-02-24 23:23:22 +0100498 if (policy.policyType() == PolicyType.DROP) {
pierventre4f68ffa2021-03-09 22:52:14 +0100499 operationBuilder.isDone(true);
500 operations.put(policyKey.toString(), operationBuilder.build());
pierventre30368ab2021-02-24 23:23:22 +0100501 } else if (policy.policyType() == PolicyType.REDIRECT) {
pierventre4f68ffa2021-03-09 22:52:14 +0100502 // REDIRECT has to remove the next objective first
503 NextObjective oldObj = (NextObjective) operation.objectiveOperation();
504 operations.put(policyKey.toString(), operationBuilder.build());
505 NextObjective.Builder builder = oldObj.copy();
506 CompletableFuture<Objective> future = new CompletableFuture<>();
pierventre30368ab2021-02-24 23:23:22 +0100507 if (log.isDebugEnabled()) {
pierventre4f68ffa2021-03-09 22:52:14 +0100508 log.debug("Removing REDIRECT next objective for dev: {}", deviceId);
pierventre30368ab2021-02-24 23:23:22 +0100509 }
pierventre4f68ffa2021-03-09 22:52:14 +0100510 ObjectiveContext context = new DefaultObjectiveContext(
511 (objective) -> {
512 if (log.isDebugEnabled()) {
513 log.debug("REDIRECT next objective for policy {} removed in dev: {}",
514 policy.policyId(), deviceId);
515 }
516 future.complete(objective);
517 },
518 (objective, error) -> {
519 log.warn("Failed to remove REDIRECT next objective for policy {}: {} in dev: {}",
520 policy.policyId(), error, deviceId);
521 future.complete(null);
522 });
523 NextObjective serializableObjective = builder.remove();
524 flowObjectiveService.next(deviceId, builder.remove(context));
525 future.whenComplete((objective, ex) -> {
526 if (ex != null) {
527 log.error("Exception Removing REDIRECT next objective", ex);
528 } else if (objective != null) {
529 operations.computeIfPresent(policyKey.toString(), (k, v) -> {
530 if (!v.isDone() && !v.isInstall()) {
531 v.isDone(true);
532 v.objectiveOperation(serializableObjective);
533 }
534 return v;
535 });
536 }
537 });
pierventre30368ab2021-02-24 23:23:22 +0100538 }
539 }
540 }
541
542 // Updates policy status if all the pending ops are done
543 private void updatePolicy(PolicyId policyId, boolean install) {
544 if (!isLeader(policyId)) {
545 if (log.isDebugEnabled()) {
546 log.debug("Instance is not leader for policy {}", policyId);
547 }
548 return;
549 }
550 workers.execute(() -> updatePolicyInternal(policyId, install), policyId.hashCode());
551 }
552
553 private void updatePolicyInternal(PolicyId policyId, boolean install) {
554 // If there are no more pending ops we are ready to go; potentially we can check
555 // if the id is contained. Updates policies only if they are still present
556 Optional<Map.Entry<String, Versioned<Operation>>> notYetDone = operations.entrySet().stream()
557 .filter(entry -> entry.getValue().value().policy().isPresent())
558 .filter(entry -> PolicyKey.fromString(entry.getKey()).policyId().equals(policyId))
559 .filter(entry -> !entry.getValue().value().isDone() && entry.getValue().value().isInstall() == install)
560 .findFirst();
561 if (notYetDone.isEmpty()) {
562 PolicyRequest policyRequest = Versioned.valueOrNull(policies.computeIfPresent(policyId, (k, v) -> {
563 if (v.policyState() == PolicyState.PENDING_ADD && install) {
564 if (log.isDebugEnabled()) {
565 log.debug("Policy {} is ready", policyId);
566 }
567 v.policyState(PolicyState.ADDED);
568 } else if (v.policyState() == PolicyState.PENDING_REMOVE && !install) {
569 if (log.isDebugEnabled()) {
570 log.debug("Policy {} is removed", policyId);
571 }
572 v = null;
573 }
574 return v;
575 }));
576 // Greedy check for pending traffic matches
577 if (policyRequest != null && policyRequest.policyState() == PolicyState.ADDED) {
578 updatePendingTrafficMatches(policyRequest.policyId());
579 }
580 }
581 }
582
583 // Install/remove the traffic match on the edge devices
584 private void sendTrafficMatch(TrafficMatch trafficMatch, boolean install) {
585 if (!isLeader(trafficMatch.policyId())) {
586 if (log.isDebugEnabled()) {
587 log.debug("Instance is not leader for policy {}", trafficMatch.policyId());
588 }
589 return;
590 }
591 // We know that we are the leader, offloads to the workers the remaining
592 // part: issue fobj installation/removal and update the maps
Wailok Shume2ab3d62021-05-27 04:40:38 +0800593 List<DeviceId> edgeDeviceIds = getEdgeDeviceIds();
pierventre30368ab2021-02-24 23:23:22 +0100594 for (DeviceId deviceId : edgeDeviceIds) {
595 workers.execute(() -> {
596 if (install) {
597 installTrafficMatchToDevice(deviceId, trafficMatch);
598 } else {
599 removeTrafficMatchInDevice(deviceId, trafficMatch);
600 }
601 }, deviceId.hashCode());
602 }
603 }
604
605 // Orchestrate traffic match installation according to the type
606 private void installTrafficMatchToDevice(DeviceId deviceId, TrafficMatch trafficMatch) {
607 if (log.isDebugEnabled()) {
608 log.debug("Installing traffic match {} associated to policy {}",
609 trafficMatch.trafficMatchId(), trafficMatch.policyId());
610 }
pierventre30368ab2021-02-24 23:23:22 +0100611 TrafficMatchKey trafficMatchKey = new TrafficMatchKey(deviceId, trafficMatch.trafficMatchId());
Wailok Shum6f71cd92021-05-06 20:05:29 +0800612 Operation oldTrafficOperation = Versioned.valueOrNull(operations.get(trafficMatchKey.toString()));
613 if (oldTrafficOperation != null && oldTrafficOperation.isInstall()) {
614 if (trafficMatch.equals(oldTrafficOperation.trafficMatch().orElse(null))) {
615 if (log.isDebugEnabled()) {
616 log.debug("There is already an install operation for traffic match {} associated to policy {} " +
617 "for device {}",
618 trafficMatch.trafficMatchId(), trafficMatch.policyId(), deviceId);
619 }
620 return;
621 } else {
622 if (log.isDebugEnabled()) {
623 log.debug("Starts updating traffic match {} associated to policy {} " +
624 "for device {}",
625 trafficMatch.trafficMatchId(), trafficMatch.policyId(), deviceId);
626 }
pierventre4f68ffa2021-03-09 22:52:14 +0100627 }
pierventre4f68ffa2021-03-09 22:52:14 +0100628 }
pierventre30368ab2021-02-24 23:23:22 +0100629 // For the DROP policy we need to set an ACL drop in the fwd objective. The other
630 // policies require to retrieve the next Id and sets the next step.
631 PolicyKey policyKey = new PolicyKey(deviceId, trafficMatch.policyId());
632 Operation policyOperation = Versioned.valueOrNull(operations.get(policyKey.toString()));
633 if (policyOperation == null || !policyOperation.isDone() ||
pierventre4f68ffa2021-03-09 22:52:14 +0100634 !policyOperation.isInstall() || policyOperation.policy().isEmpty() ||
635 (policyOperation.policy().get().policyType() == PolicyType.REDIRECT &&
636 policyOperation.objectiveOperation() == null)) {
pierventre30368ab2021-02-24 23:23:22 +0100637 log.info("Deferring traffic match {} installation on device {}. Policy {} not yet installed",
638 trafficMatch.trafficMatchId(), deviceId, trafficMatch.policyId());
639 return;
640 }
pierventre4f68ffa2021-03-09 22:52:14 +0100641 // Updates the store and then send the versatile fwd objective to the pipeliner
Wailok Shum6f71cd92021-05-06 20:05:29 +0800642 Operation newTrafficOperation = Operation.builder()
pierventre4f68ffa2021-03-09 22:52:14 +0100643 .isInstall(true)
644 .trafficMatch(trafficMatch)
645 .build();
Wailok Shum6f71cd92021-05-06 20:05:29 +0800646 operations.put(trafficMatchKey.toString(), newTrafficOperation);
pierventre30368ab2021-02-24 23:23:22 +0100647 Policy policy = policyOperation.policy().get();
pierventre4f68ffa2021-03-09 22:52:14 +0100648 ForwardingObjective.Builder builder = trafficMatchFwdObjective(trafficMatch, policy.policyType());
pierventre30368ab2021-02-24 23:23:22 +0100649 if (policy.policyType() == PolicyType.DROP) {
pierventre4f68ffa2021-03-09 22:52:14 +0100650 // Firstly builds the fwd objective with the wipeDeferred action.
pierventre30368ab2021-02-24 23:23:22 +0100651 TrafficTreatment dropTreatment = DefaultTrafficTreatment.builder()
652 .wipeDeferred()
653 .build();
654 builder.withTreatment(dropTreatment);
pierventre4f68ffa2021-03-09 22:52:14 +0100655 } else if (policy.policyType() == PolicyType.REDIRECT) {
656
657 // Here we need to set only the next step
658 builder.nextStep(policyOperation.objectiveOperation().id());
pierventre30368ab2021-02-24 23:23:22 +0100659 }
pierventre4f68ffa2021-03-09 22:52:14 +0100660 // Once, the fwd objective has completed its execution, we update the policiesOps map
Wailok Shum6f71cd92021-05-06 20:05:29 +0800661 CompletableFuture<Objective> addNewFuture = new CompletableFuture<>();
662 CompletableFuture<Objective> removeOldFuture = new CompletableFuture<>();
pierventre4f68ffa2021-03-09 22:52:14 +0100663 if (log.isDebugEnabled()) {
664 log.debug("Installing forwarding objective for dev: {}", deviceId);
665 }
Wailok Shum6f71cd92021-05-06 20:05:29 +0800666 ObjectiveContext addNewContext = new DefaultObjectiveContext(
pierventre4f68ffa2021-03-09 22:52:14 +0100667 (objective) -> {
668 if (log.isDebugEnabled()) {
669 log.debug("Forwarding objective for policy {} installed", trafficMatch.policyId());
670 }
Wailok Shum6f71cd92021-05-06 20:05:29 +0800671 addNewFuture.complete(objective);
pierventre4f68ffa2021-03-09 22:52:14 +0100672 },
673 (objective, error) -> {
674 log.warn("Failed to install forwarding objective for policy {}: {}",
675 trafficMatch.policyId(), error);
Wailok Shum6f71cd92021-05-06 20:05:29 +0800676 addNewFuture.complete(null);
pierventre4f68ffa2021-03-09 22:52:14 +0100677 });
Wailok Shum6f71cd92021-05-06 20:05:29 +0800678 ObjectiveContext removeOldContext = new DefaultObjectiveContext(
679 (objective) -> {
680 if (log.isDebugEnabled()) {
681 log.debug("Old forwarding objective for policy {} removed, update finished",
682 trafficMatch.policyId());
683 }
684 removeOldFuture.complete(objective);
685 },
686 (objective, error) -> {
687 log.warn("Failed to remove old forwarding objective for policy {}: {}",
688 trafficMatch.policyId(), error);
689 removeOldFuture.complete(null);
690 });
pierventre4f68ffa2021-03-09 22:52:14 +0100691 // Context is not serializable
692 ForwardingObjective serializableObjective = builder.add();
Wailok Shum6f71cd92021-05-06 20:05:29 +0800693 flowObjectiveService.forward(deviceId, builder.add(addNewContext));
694 addNewFuture.whenComplete((objective, ex) -> {
pierventre4f68ffa2021-03-09 22:52:14 +0100695 if (ex != null) {
696 log.error("Exception installing forwarding objective", ex);
697 } else if (objective != null) {
Wailok Shum6f71cd92021-05-06 20:05:29 +0800698 // Remove existing flow with previous priority after new flow is installed
699 if (oldTrafficOperation != null && oldTrafficOperation.objectiveOperation() != null
700 && oldTrafficOperation.isInstall()
701 && oldTrafficOperation.objectiveOperation().priority() != serializableObjective.priority()) {
702 ForwardingObjective oldFwdObj = (ForwardingObjective) oldTrafficOperation.objectiveOperation();
703 ForwardingObjective.Builder oldBuilder = DefaultForwardingObjective.builder(oldFwdObj);
704 flowObjectiveService.forward(deviceId, oldBuilder.remove(removeOldContext));
705 } else {
706 operations.computeIfPresent(trafficMatchKey.toString(), (k, v) -> {
707 if (!v.isDone() && v.isInstall()) {
708 v.isDone(true);
709 v.objectiveOperation(serializableObjective);
710 }
711 return v;
712 });
713 }
714 }
715 });
716 removeOldFuture.whenComplete((objective, ex) -> {
717 if (ex != null) {
718 log.error("Exception removing old forwarding objective", ex);
719 } else if (objective != null) {
pierventre4f68ffa2021-03-09 22:52:14 +0100720 operations.computeIfPresent(trafficMatchKey.toString(), (k, v) -> {
721 if (!v.isDone() && v.isInstall()) {
722 v.isDone(true);
723 v.objectiveOperation(serializableObjective);
724 }
725 return v;
726 });
727 }
728 });
pierventre30368ab2021-02-24 23:23:22 +0100729 }
730
731 // Updates traffic match status if all the pending ops are done
732 private void updateTrafficMatch(TrafficMatch trafficMatch, boolean install) {
733 if (!isLeader(trafficMatch.policyId())) {
734 if (log.isDebugEnabled()) {
735 log.debug("Instance is not leader for policy {}", trafficMatch.policyId());
736 }
737 return;
738 }
739 workers.execute(() -> updateTrafficMatchInternal(trafficMatch.trafficMatchId(), install),
740 trafficMatch.policyId().hashCode());
741 }
742
743 private void updateTrafficMatchInternal(TrafficMatchId trafficMatchId, boolean install) {
744 // If there are no more pending ops we are ready to go; potentially we can check
745 // if the id is contained. Updates traffic matches only if they are still present
746 Optional<Map.Entry<String, Versioned<Operation>>> notYetDone = operations.entrySet().stream()
747 .filter(entry -> entry.getValue().value().trafficMatch().isPresent())
748 .filter(entry -> TrafficMatchKey.fromString(entry.getKey()).trafficMatchId().equals(trafficMatchId))
749 .filter(entry -> !entry.getValue().value().isDone() && entry.getValue().value().isInstall() == install)
750 .findFirst();
751 if (notYetDone.isEmpty()) {
752 trafficMatches.computeIfPresent(trafficMatchId, (k, v) -> {
753 if (v.trafficMatchState() == TrafficMatchState.PENDING_ADD && install) {
754 if (log.isDebugEnabled()) {
755 log.debug("Traffic match {} is ready", trafficMatchId);
756 }
757 v.trafficMatchState(TrafficMatchState.ADDED);
758 } else if (v.trafficMatchState() == TrafficMatchState.PENDING_REMOVE && !install) {
759 if (log.isDebugEnabled()) {
760 log.debug("Traffic match {} is removed", trafficMatchId);
761 }
762 v = null;
763 }
764 return v;
765 });
766 }
767 }
768
769 // Look for any pending traffic match waiting for the policy
770 private void updatePendingTrafficMatches(PolicyId policyId) {
771 Set<TrafficMatchRequest> pendingTrafficMatches = trafficMatches.stream()
772 .filter(trafficMatchEntry -> trafficMatchEntry.getValue().value().policyId().equals(policyId) &&
773 trafficMatchEntry.getValue().value().trafficMatchState() == TrafficMatchState.PENDING_ADD)
774 .map(trafficMatchEntry -> trafficMatchEntry.getValue().value())
775 .collect(Collectors.toSet());
776 for (TrafficMatchRequest trafficMatch : pendingTrafficMatches) {
777 sendTrafficMatch(trafficMatch.trafficMatch(), true);
778 }
779 }
780
781 // Traffic match removal in a device
782 private void removeTrafficMatchInDevice(DeviceId deviceId, TrafficMatch trafficMatch) {
783 if (log.isDebugEnabled()) {
784 log.debug("Removing traffic match {} associated to policy {}",
785 trafficMatch.trafficMatchId(), trafficMatch.policyId());
786 }
787 TrafficMatchKey trafficMatchKey = new TrafficMatchKey(deviceId, trafficMatch.trafficMatchId());
788 Operation operation = Versioned.valueOrNull(operations.get(trafficMatchKey.toString()));
789 if (operation == null || operation.objectiveOperation() == null) {
790 log.warn("There are no ops associated with {}", trafficMatchKey);
791 operation = Operation.builder()
792 .isDone(true)
793 .isInstall(false)
794 .trafficMatch(trafficMatch)
795 .build();
796 operations.put(trafficMatchKey.toString(), operation);
pierventre4f68ffa2021-03-09 22:52:14 +0100797 } else if (!operation.isInstall()) {
798 if (log.isDebugEnabled()) {
799 log.debug("There is already an uninstall operation for traffic match {} associated to policy {}" +
800 " for device {}", trafficMatch.trafficMatchId(), trafficMatch.policyId(), deviceId);
801 }
pierventre30368ab2021-02-24 23:23:22 +0100802 } else {
803 ForwardingObjective oldObj = (ForwardingObjective) operation.objectiveOperation();
804 operation = Operation.builder(operation)
pierventre30368ab2021-02-24 23:23:22 +0100805 .isInstall(false)
806 .build();
807 operations.put(trafficMatchKey.toString(), operation);
808 ForwardingObjective.Builder builder = DefaultForwardingObjective.builder(oldObj);
809 CompletableFuture<Objective> future = new CompletableFuture<>();
810 if (log.isDebugEnabled()) {
pierventre4f68ffa2021-03-09 22:52:14 +0100811 log.debug("Removing forwarding objectives for dev: {}", deviceId);
pierventre30368ab2021-02-24 23:23:22 +0100812 }
813 ObjectiveContext context = new DefaultObjectiveContext(
814 (objective) -> {
815 if (log.isDebugEnabled()) {
pierventre4f68ffa2021-03-09 22:52:14 +0100816 log.debug("Forwarding objective for policy {} removed", trafficMatch.policyId());
pierventre30368ab2021-02-24 23:23:22 +0100817 }
818 future.complete(objective);
819 },
820 (objective, error) -> {
pierventre4f68ffa2021-03-09 22:52:14 +0100821 log.warn("Failed to remove forwarding objective for policy {}: {}",
822 trafficMatch.policyId(), error);
pierventre30368ab2021-02-24 23:23:22 +0100823 future.complete(null);
824 });
825 ForwardingObjective serializableObjective = builder.remove();
826 flowObjectiveService.forward(deviceId, builder.remove(context));
827 future.whenComplete((objective, ex) -> {
828 if (ex != null) {
pierventre4f68ffa2021-03-09 22:52:14 +0100829 log.error("Exception removing forwarding objective", ex);
pierventre30368ab2021-02-24 23:23:22 +0100830 } else if (objective != null) {
831 operations.computeIfPresent(trafficMatchKey.toString(), (k, v) -> {
832 if (!v.isDone() && !v.isInstall()) {
833 v.isDone(true);
834 v.objectiveOperation(serializableObjective);
835 }
836 return v;
837 });
838 }
839 });
840 }
841 }
842
pierventre4f68ffa2021-03-09 22:52:14 +0100843 // It is used when a policy has been removed but there are still traffic matches depending on it
844 private Optional<TrafficMatchRequest> dependingTrafficMatches(PolicyId policyId) {
845 return trafficMatches.stream()
pierventre30368ab2021-02-24 23:23:22 +0100846 .filter(trafficMatchEntry -> trafficMatchEntry.getValue().value().policyId().equals(policyId) &&
847 trafficMatchEntry.getValue().value().trafficMatchState() == TrafficMatchState.ADDED)
848 .map(trafficMatchEntry -> trafficMatchEntry.getValue().value())
pierventre4f68ffa2021-03-09 22:52:14 +0100849 .findFirst();
pierventre30368ab2021-02-24 23:23:22 +0100850 }
851
852 // Utility that removes operations related to a policy or to a traffic match.
853 private void removeOperations(PolicyId policyId, Optional<TrafficMatchId> trafficMatchId) {
854 if (!isLeader(policyId)) {
855 if (log.isDebugEnabled()) {
856 log.debug("Instance is not leader for policy {}", policyId);
857 }
858 return;
859 }
Wailok Shume2ab3d62021-05-27 04:40:38 +0800860 List<DeviceId> edgeDeviceIds = getEdgeDeviceIds();
pierventre30368ab2021-02-24 23:23:22 +0100861 for (DeviceId deviceId : edgeDeviceIds) {
862 workers.execute(() -> {
863 String key;
864 if (trafficMatchId.isPresent()) {
865 key = new TrafficMatchKey(deviceId, trafficMatchId.get()).toString();
866 } else {
867 key = new PolicyKey(deviceId, policyId).toString();
868 }
869 operations.remove(key);
870 }, deviceId.hashCode());
871 }
872 }
873
pierventre4f68ffa2021-03-09 22:52:14 +0100874 private ForwardingObjective.Builder trafficMatchFwdObjective(TrafficMatch trafficMatch, PolicyType policyType) {
875 TrafficSelector.Builder metaBuilder = DefaultTrafficSelector.builder(trafficMatch.trafficSelector());
876 if (policyType == PolicyType.REDIRECT) {
877 metaBuilder.matchMetadata(EDGE_PORT);
878 }
pierventre30368ab2021-02-24 23:23:22 +0100879 return DefaultForwardingObjective.builder()
Wailok Shum37dd29a2021-04-27 18:13:55 +0800880 .withPriority(trafficMatch.trafficMatchPriority().priority())
pierventre30368ab2021-02-24 23:23:22 +0100881 .withSelector(trafficMatch.trafficSelector())
pierventre4f68ffa2021-03-09 22:52:14 +0100882 .withMeta(metaBuilder.build())
pierventre30368ab2021-02-24 23:23:22 +0100883 .fromApp(appId)
884 .withFlag(ForwardingObjective.Flag.VERSATILE)
885 .makePermanent();
886 }
887
pierventre4f68ffa2021-03-09 22:52:14 +0100888 private NextObjective.Builder redirectPolicyNextObjective(DeviceId srcDevice, RedirectPolicy redirectPolicy) {
889 Set<Link> egressLinks = linkService.getDeviceEgressLinks(srcDevice);
890 Map<ConnectPoint, DeviceId> egressPortsToEnforce = Maps.newHashMap();
Wailok Shume2ab3d62021-05-27 04:40:38 +0800891 List<DeviceId> edgeDevices = getEdgeDeviceIds();
pierventre4f68ffa2021-03-09 22:52:14 +0100892 egressLinks.stream()
893 .filter(link -> redirectPolicy.spinesToEnforce().contains(link.dst().deviceId()) &&
894 !edgeDevices.contains(link.dst().deviceId()))
895 .forEach(link -> egressPortsToEnforce.put(link.src(), link.dst().deviceId()));
896 // No ports no friend
897 if (egressPortsToEnforce.isEmpty()) {
898 log.warn("There are no port available for the REDIRECT policy {}", redirectPolicy.policyId());
899 return null;
900 }
901 // We need to add a treatment for each valid egress port. The treatment
902 // requires to set src and dst mac address and set the egress port. We are
903 // deliberately not providing the metadata to prevent the programming of
904 // some tables which are already controlled by SegmentRouting or are unnecessary
905 int nextId = flowObjectiveService.allocateNextId();
906 DefaultNextObjective.Builder builder = DefaultNextObjective.builder()
907 .withId(nextId)
908 .withType(NextObjective.Type.HASHED)
909 .fromApp(appId);
910 MacAddress srcDeviceMac;
911 try {
Wailok Shume2ab3d62021-05-27 04:40:38 +0800912 srcDeviceMac = getDeviceMacAddress(srcDevice);
pierventre4f68ffa2021-03-09 22:52:14 +0100913 } catch (DeviceConfigNotFoundException e) {
914 log.warn(e.getMessage() + " Aborting installation REDIRECT policy {}", redirectPolicy.policyId());
915 return null;
916 }
917 MacAddress neigborDeviceMac;
918 TrafficTreatment.Builder tBuilder;
919 for (Map.Entry<ConnectPoint, DeviceId> entry : egressPortsToEnforce.entrySet()) {
920 try {
Wailok Shume2ab3d62021-05-27 04:40:38 +0800921 neigborDeviceMac = getDeviceMacAddress(entry.getValue());
pierventre4f68ffa2021-03-09 22:52:14 +0100922 } catch (DeviceConfigNotFoundException e) {
923 log.warn(e.getMessage() + " Aborting installation REDIRECT policy {}", redirectPolicy.policyId());
924 return null;
925 }
926 tBuilder = DefaultTrafficTreatment.builder()
927 .setEthSrc(srcDeviceMac)
928 .setEthDst(neigborDeviceMac)
929 .setOutput(entry.getKey().port());
930 builder.addTreatment(tBuilder.build());
931 }
932 return builder;
933 }
934
pierventre30368ab2021-02-24 23:23:22 +0100935 // Each map has an event listener enabling the events distribution across the cluster
936 private class InternalPolMapEventListener implements MapEventListener<PolicyId, PolicyRequest> {
937 @Override
938 public void event(MapEvent<PolicyId, PolicyRequest> event) {
939 Versioned<PolicyRequest> value = event.type() == MapEvent.Type.REMOVE ?
940 event.oldValue() : event.newValue();
941 PolicyRequest policyRequest = value.value();
942 Policy policy = policyRequest.policy();
943 switch (event.type()) {
944 case INSERT:
945 case UPDATE:
946 switch (policyRequest.policyState()) {
947 case PENDING_ADD:
948 sendPolicy(policy, true);
949 break;
950 case PENDING_REMOVE:
951 sendPolicy(policy, false);
952 break;
953 case ADDED:
954 break;
955 default:
956 log.warn("Unknown policy state type {}", policyRequest.policyState());
957 }
958 break;
959 case REMOVE:
960 removeOperations(policy.policyId(), Optional.empty());
pierventre30368ab2021-02-24 23:23:22 +0100961 break;
962 default:
963 log.warn("Unknown event type {}", event.type());
964
965 }
966 }
967 }
968
969 private class InternalTMatchMapEventListener implements MapEventListener<TrafficMatchId, TrafficMatchRequest> {
970 @Override
971 public void event(MapEvent<TrafficMatchId, TrafficMatchRequest> event) {
972 Versioned<TrafficMatchRequest> value = event.type() == MapEvent.Type.REMOVE ?
973 event.oldValue() : event.newValue();
974 TrafficMatchRequest trafficMatchRequest = value.value();
975 TrafficMatch trafficMatch = trafficMatchRequest.trafficMatch();
976 switch (event.type()) {
977 case INSERT:
978 case UPDATE:
979 switch (trafficMatchRequest.trafficMatchState()) {
980 case PENDING_ADD:
981 sendTrafficMatch(trafficMatch, true);
982 break;
983 case PENDING_REMOVE:
984 sendTrafficMatch(trafficMatch, false);
985 break;
986 case ADDED:
987 break;
988 default:
989 log.warn("Unknown traffic match state type {}", trafficMatchRequest.trafficMatchState());
990 }
991 break;
992 case REMOVE:
993 removeOperations(trafficMatch.policyId(), Optional.of(trafficMatch.trafficMatchId()));
994 break;
995 default:
996 log.warn("Unknown event type {}", event.type());
997 }
998 }
999 }
1000
1001 private class InternalOpsMapEventListener implements MapEventListener<String, Operation> {
1002 @Override
1003 public void event(MapEvent<String, Operation> event) {
1004 String key = event.key();
1005 Versioned<Operation> value = event.type() == MapEvent.Type.REMOVE ?
1006 event.oldValue() : event.newValue();
1007 Operation operation = value.value();
1008 switch (event.type()) {
1009 case INSERT:
1010 case UPDATE:
1011 if (operation.isDone()) {
1012 if (operation.policy().isPresent()) {
1013 PolicyKey policyKey = PolicyKey.fromString(key);
1014 updatePolicy(policyKey.policyId(), operation.isInstall());
1015 } else if (operation.trafficMatch().isPresent()) {
1016 updateTrafficMatch(operation.trafficMatch().get(), operation.isInstall());
1017 } else {
1018 log.warn("Unknown pending operation");
1019 }
1020 }
1021 break;
1022 case REMOVE:
1023 break;
1024 default:
1025 log.warn("Unknown event type {}", event.type());
1026 }
1027 }
1028 }
1029
1030 // Using the work partition service defines who is in charge of a given policy.
1031 private boolean isLeader(PolicyId policyId) {
1032 final NodeId currentNodeId = clusterService.getLocalNode().id();
1033 final NodeId leader = workPartitionService.getLeader(policyId, this::hasher);
1034 if (leader == null) {
1035 log.error("Fail to elect a leader for {}.", policyId);
1036 return false;
1037 }
1038 policyLeaderCache.put(policyId, leader);
1039 return currentNodeId.equals(leader);
1040 }
1041
1042 private Long hasher(PolicyId policyId) {
1043 return HASH_FN.newHasher()
1044 .putUnencodedChars(policyId.toString())
1045 .hash()
1046 .asLong();
1047 }
1048
pierventre4f68ffa2021-03-09 22:52:14 +01001049 // TODO Periodic checker, consider to add store and delegates.
1050
pierventre30368ab2021-02-24 23:23:22 +01001051 // Check periodically for any issue and try to resolve automatically if possible
1052 private final class PolicyChecker implements Runnable {
1053 @Override
1054 public void run() {
1055 }
1056 }
Wailok Shume2ab3d62021-05-27 04:40:38 +08001057
1058 // Netcfg event listener
1059 private class InternalConfigListener implements NetworkConfigListener {
1060 @Override
1061 public void event(NetworkConfigEvent event) {
1062 eventExecutor.execute(() -> {
1063 switch (event.type()) {
1064 case CONFIG_ADDED:
1065 case CONFIG_UPDATED:
1066 if (event.configClass() == SegmentRoutingDeviceConfig.class) {
1067 // Reset all policy state to PENDING_ADD
1068 ImmutableSet<PolicyId> policyIds = ImmutableSet.copyOf(policies.keySet());
1069 policyIds.forEach((policyId) -> {
1070 policies.computeIfPresent(policyId, (k, v) -> {
1071 v.policyState(PolicyState.PENDING_ADD);
1072 return v;
1073 });
1074 });
1075 // Reset all TrafficMatch state to PENDING_ADD
1076 ImmutableSet<TrafficMatchId> trafficMatchIds = ImmutableSet.copyOf(trafficMatches.keySet());
1077 trafficMatchIds.forEach((trafficMatchId) -> {
1078 trafficMatches.computeIfPresent(trafficMatchId, (k, v) -> {
1079 v.trafficMatchState(TrafficMatchState.PENDING_ADD);
1080 return v;
1081 });
1082 });
1083 }
1084 break;
1085 default:
1086 break;
1087 }
1088 });
1089 }
1090 }
1091
1092 private List<DeviceId> getEdgeDeviceIds() {
1093 List<DeviceId> edges = new ArrayList<>();
1094 deviceService.getDevices().forEach((device) -> {
1095 DeviceId deviceId = device.id();
1096 SegmentRoutingDeviceConfig srDevCfg = cfgService.getConfig(deviceId, SegmentRoutingDeviceConfig.class);
1097 if (srDevCfg != null && srDevCfg.isEdgeRouter()) {
1098 edges.add(deviceId);
1099 }
1100 });
1101 return edges;
1102 }
1103
1104 private MacAddress getDeviceMacAddress(DeviceId deviceId) throws DeviceConfigNotFoundException {
1105 SegmentRoutingDeviceConfig srDevCfg = cfgService.getConfig(deviceId, SegmentRoutingDeviceConfig.class);
1106 if (srDevCfg != null) {
1107 return srDevCfg.routerMac();
1108 } else {
1109 throw new DeviceConfigNotFoundException("Config for device: " + deviceId.toString() + " not found");
1110 }
1111 }
pierventre30368ab2021-02-24 23:23:22 +01001112}