/*
 * Copyright 2018-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.segmentrouting.xconnect.impl;
17
18import com.google.common.collect.ImmutableSet;
19import com.google.common.collect.Sets;
20import org.apache.felix.scr.annotations.Activate;
21import org.apache.felix.scr.annotations.Component;
22import org.apache.felix.scr.annotations.Deactivate;
23import org.apache.felix.scr.annotations.Reference;
24import org.apache.felix.scr.annotations.ReferenceCardinality;
25import org.apache.felix.scr.annotations.Service;
26import org.onlab.packet.MacAddress;
27import org.onlab.packet.VlanId;
28import org.onlab.util.KryoNamespace;
29import org.onosproject.codec.CodecService;
30import org.onosproject.core.ApplicationId;
31import org.onosproject.core.CoreService;
32import org.onosproject.mastership.MastershipService;
33import org.onosproject.net.ConnectPoint;
34import org.onosproject.net.DeviceId;
35import org.onosproject.net.PortNumber;
36import org.onosproject.net.config.NetworkConfigService;
37import org.onosproject.net.device.DeviceEvent;
38import org.onosproject.net.device.DeviceListener;
39import org.onosproject.net.device.DeviceService;
40import org.onosproject.net.flow.DefaultTrafficSelector;
41import org.onosproject.net.flow.DefaultTrafficTreatment;
42import org.onosproject.net.flow.TrafficSelector;
43import org.onosproject.net.flow.TrafficTreatment;
44import org.onosproject.net.flow.criteria.Criteria;
45import org.onosproject.net.flowobjective.DefaultFilteringObjective;
46import org.onosproject.net.flowobjective.DefaultForwardingObjective;
47import org.onosproject.net.flowobjective.DefaultNextObjective;
48import org.onosproject.net.flowobjective.DefaultObjectiveContext;
49import org.onosproject.net.flowobjective.FilteringObjective;
50import org.onosproject.net.flowobjective.FlowObjectiveService;
51import org.onosproject.net.flowobjective.ForwardingObjective;
52import org.onosproject.net.flowobjective.NextObjective;
53import org.onosproject.net.flowobjective.Objective;
54import org.onosproject.net.flowobjective.ObjectiveContext;
55import org.onosproject.net.flowobjective.ObjectiveError;
56import org.onosproject.segmentrouting.SegmentRoutingService;
57import org.onosproject.segmentrouting.xconnect.api.XconnectCodec;
58import org.onosproject.segmentrouting.xconnect.api.XconnectDesc;
59import org.onosproject.segmentrouting.xconnect.api.XconnectKey;
60import org.onosproject.segmentrouting.xconnect.api.XconnectService;
61import org.onosproject.store.serializers.KryoNamespaces;
62import org.onosproject.store.service.ConsistentMap;
63import org.onosproject.store.service.MapEvent;
64import org.onosproject.store.service.MapEventListener;
65import org.onosproject.store.service.Serializer;
66import org.onosproject.store.service.StorageService;
67import org.onosproject.store.service.Versioned;
68import org.slf4j.Logger;
69import org.slf4j.LoggerFactory;
70
71import java.io.Serializable;
72import java.util.Set;
73import java.util.concurrent.CompletableFuture;
74import java.util.function.BiConsumer;
75import java.util.function.Consumer;
76import java.util.stream.Collectors;
77
78@Service
79@Component(immediate = true)
80public class XconnectManager implements XconnectService {
81 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
82 private CoreService coreService;
83
84 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
85 private CodecService codecService;
86
87 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
88 private StorageService storageService;
89
90 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
91 public NetworkConfigService netCfgService;
92
93 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
94 public DeviceService deviceService;
95
96 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
97 public FlowObjectiveService flowObjectiveService;
98
99 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
100 public MastershipService mastershipService;
101
102 @Reference(cardinality = ReferenceCardinality.OPTIONAL_UNARY)
103 public SegmentRoutingService srService;
104
105 private static final String APP_NAME = "org.onosproject.xconnect";
106 private static final String ERROR_NOT_MASTER = "Not master controller";
107
108 private static Logger log = LoggerFactory.getLogger(XconnectManager.class);
109
110 private ApplicationId appId;
111 private ConsistentMap<XconnectKey, Set<PortNumber>> xconnectStore;
112 private ConsistentMap<XconnectKey, NextObjective> xconnectNextObjStore;
113
114 private final MapEventListener<XconnectKey, Set<PortNumber>> xconnectListener = new XconnectMapListener();
115 private final DeviceListener deviceListener = new InternalDeviceListener();
116
117 @Activate
118 void activate() {
119 appId = coreService.registerApplication(APP_NAME);
120 codecService.registerCodec(XconnectDesc.class, new XconnectCodec());
121
122 KryoNamespace.Builder serializer = KryoNamespace.newBuilder()
123 .register(KryoNamespaces.API)
Charles Chan871d9182018-07-23 12:53:16 -0700124 .register(XconnectManager.class)
Charles Chanc7b3c452018-06-19 20:31:57 -0700125 .register(XconnectKey.class);
126
127 xconnectStore = storageService.<XconnectKey, Set<PortNumber>>consistentMapBuilder()
128 .withName("onos-sr-xconnect")
129 .withRelaxedReadConsistency()
130 .withSerializer(Serializer.using(serializer.build()))
131 .build();
132 xconnectStore.addListener(xconnectListener);
133
134 xconnectNextObjStore = storageService.<XconnectKey, NextObjective>consistentMapBuilder()
135 .withName("onos-sr-xconnect-next")
136 .withRelaxedReadConsistency()
137 .withSerializer(Serializer.using(serializer.build()))
138 .build();
139
140 deviceService.addListener(deviceListener);
141
142 log.info("Started");
143 }
144
145 @Deactivate
146 void deactivate() {
147 xconnectStore.removeListener(xconnectListener);
148 deviceService.removeListener(deviceListener);
149 codecService.unregisterCodec(XconnectDesc.class);
150
151 log.info("Stopped");
152 }
153
154 @Override
155 public void addOrUpdateXconnect(DeviceId deviceId, VlanId vlanId, Set<PortNumber> ports) {
156 log.info("Adding or updating xconnect. deviceId={}, vlanId={}, ports={}",
157 deviceId, vlanId, ports);
158 final XconnectKey key = new XconnectKey(deviceId, vlanId);
159 xconnectStore.put(key, ports);
160 }
161
162 @Override
163 public void removeXonnect(DeviceId deviceId, VlanId vlanId) {
164 log.info("Removing xconnect. deviceId={}, vlanId={}",
165 deviceId, vlanId);
166 final XconnectKey key = new XconnectKey(deviceId, vlanId);
167 xconnectStore.remove(key);
168 }
169
170 @Override
171 public Set<XconnectDesc> getXconnects() {
172 return xconnectStore.asJavaMap().entrySet().stream()
173 .map(e -> new XconnectDesc(e.getKey(), e.getValue()))
174 .collect(Collectors.toSet());
175 }
176
177 @Override
178 public boolean hasXconnect(ConnectPoint cp) {
179 return getXconnects().stream().anyMatch(desc ->
180 desc.key().deviceId().equals(cp.deviceId()) && desc.ports().contains(cp.port())
181 );
182 }
183
184 private class XconnectMapListener implements MapEventListener<XconnectKey, Set<PortNumber>> {
185 @Override
186 public void event(MapEvent<XconnectKey, Set<PortNumber>> event) {
187 XconnectKey key = event.key();
188 Versioned<Set<PortNumber>> ports = event.newValue();
189 Versioned<Set<PortNumber>> oldPorts = event.oldValue();
190
191 switch (event.type()) {
192 case INSERT:
193 populateXConnect(key, ports.value());
194 break;
195 case UPDATE:
196 updateXConnect(key, oldPorts.value(), ports.value());
197 break;
198 case REMOVE:
199 revokeXConnect(key, oldPorts.value());
200 break;
201 default:
202 break;
203 }
204 }
205 }
206
207 private class InternalDeviceListener implements DeviceListener {
208 @Override
209 public void event(DeviceEvent event) {
210 DeviceId deviceId = event.subject().id();
211 if (!mastershipService.isLocalMaster(deviceId)) {
212 return;
213 }
214
215 switch (event.type()) {
216 case DEVICE_ADDED:
217 case DEVICE_AVAILABILITY_CHANGED:
218 case DEVICE_UPDATED:
219 if (deviceService.isAvailable(deviceId)) {
220 init(deviceId);
221 } else {
222 cleanup(deviceId);
223 }
224 break;
225 default:
226 break;
227 }
228 }
229 }
230
231 void init(DeviceId deviceId) {
232 getXconnects().stream()
233 .filter(desc -> desc.key().deviceId().equals(deviceId))
234 .forEach(desc -> populateXConnect(desc.key(), desc.ports()));
235 }
236
237 void cleanup(DeviceId deviceId) {
238 xconnectNextObjStore.entrySet().stream()
239 .filter(entry -> entry.getKey().deviceId().equals(deviceId))
240 .forEach(entry -> xconnectNextObjStore.remove(entry.getKey()));
241 log.debug("{} is removed from xConnectNextObjStore", deviceId);
242 }
243
244 /**
245 * Populates XConnect groups and flows for given key.
246 *
247 * @param key XConnect key
248 * @param ports a set of ports to be cross-connected
249 */
250 private void populateXConnect(XconnectKey key, Set<PortNumber> ports) {
251 if (!mastershipService.isLocalMaster(key.deviceId())) {
252 log.info("Abort populating XConnect {}: {}", key, ERROR_NOT_MASTER);
253 return;
254 }
255
256 ports = addPairPort(key.deviceId(), ports);
257 populateFilter(key, ports);
258 populateFwd(key, populateNext(key, ports));
259 populateAcl(key);
260 }
261
262 /**
263 * Populates filtering objectives for given XConnect.
264 *
265 * @param key XConnect store key
266 * @param ports XConnect ports
267 */
268 private void populateFilter(XconnectKey key, Set<PortNumber> ports) {
269 ports.forEach(port -> {
270 FilteringObjective.Builder filtObjBuilder = filterObjBuilder(key, port);
271 ObjectiveContext context = new DefaultObjectiveContext(
272 (objective) -> log.debug("XConnect FilterObj for {} on port {} populated",
273 key, port),
274 (objective, error) ->
275 log.warn("Failed to populate XConnect FilterObj for {} on port {}: {}",
276 key, port, error));
277 flowObjectiveService.filter(key.deviceId(), filtObjBuilder.add(context));
278 });
279 }
280
281 /**
282 * Populates next objectives for given XConnect.
283 *
284 * @param key XConnect store key
285 * @param ports XConnect ports
286 */
287 private NextObjective populateNext(XconnectKey key, Set<PortNumber> ports) {
288 NextObjective nextObj;
289 if (xconnectNextObjStore.containsKey(key)) {
290 nextObj = xconnectNextObjStore.get(key).value();
291 log.debug("NextObj for {} found, id={}", key, nextObj.id());
292 } else {
293 NextObjective.Builder nextObjBuilder = nextObjBuilder(key, ports);
294 ObjectiveContext nextContext = new DefaultObjectiveContext(
295 // To serialize this with kryo
296 (Serializable & Consumer<Objective>) (objective) ->
297 log.debug("XConnect NextObj for {} added", key),
298 (Serializable & BiConsumer<Objective, ObjectiveError>) (objective, error) ->
299 log.warn("Failed to add XConnect NextObj for {}: {}", key, error)
300 );
301 nextObj = nextObjBuilder.add(nextContext);
302 flowObjectiveService.next(key.deviceId(), nextObj);
303 xconnectNextObjStore.put(key, nextObj);
304 log.debug("NextObj for {} not found. Creating new NextObj with id={}", key, nextObj.id());
305 }
306 return nextObj;
307 }
308
309 /**
310 * Populates bridging forwarding objectives for given XConnect.
311 *
312 * @param key XConnect store key
313 * @param nextObj next objective
314 */
315 private void populateFwd(XconnectKey key, NextObjective nextObj) {
316 ForwardingObjective.Builder fwdObjBuilder = fwdObjBuilder(key, nextObj.id());
317 ObjectiveContext fwdContext = new DefaultObjectiveContext(
318 (objective) -> log.debug("XConnect FwdObj for {} populated", key),
319 (objective, error) ->
320 log.warn("Failed to populate XConnect FwdObj for {}: {}", key, error));
321 flowObjectiveService.forward(key.deviceId(), fwdObjBuilder.add(fwdContext));
322 }
323
324 /**
325 * Populates ACL forwarding objectives for given XConnect.
326 *
327 * @param key XConnect store key
328 */
329 private void populateAcl(XconnectKey key) {
330 ForwardingObjective.Builder aclObjBuilder = aclObjBuilder(key.vlanId());
331 ObjectiveContext aclContext = new DefaultObjectiveContext(
332 (objective) -> log.debug("XConnect AclObj for {} populated", key),
333 (objective, error) ->
334 log.warn("Failed to populate XConnect AclObj for {}: {}", key, error));
335 flowObjectiveService.forward(key.deviceId(), aclObjBuilder.add(aclContext));
336 }
337
338 /**
339 * Revokes XConnect groups and flows for given key.
340 *
341 * @param key XConnect key
342 * @param ports XConnect ports
343 */
344 private void revokeXConnect(XconnectKey key, Set<PortNumber> ports) {
345 if (!mastershipService.isLocalMaster(key.deviceId())) {
346 log.info("Abort populating XConnect {}: {}", key, ERROR_NOT_MASTER);
347 return;
348 }
349
350 ports = addPairPort(key.deviceId(), ports);
351 revokeFilter(key, ports);
352 if (xconnectNextObjStore.containsKey(key)) {
353 NextObjective nextObj = xconnectNextObjStore.get(key).value();
354 revokeFwd(key, nextObj, null);
355 revokeNext(key, nextObj, null);
356 } else {
357 log.warn("NextObj for {} does not exist in the store.", key);
358 }
359 revokeAcl(key);
360 }
361
362 /**
363 * Revokes filtering objectives for given XConnect.
364 *
365 * @param key XConnect store key
366 * @param ports XConnect ports
367 */
368 private void revokeFilter(XconnectKey key, Set<PortNumber> ports) {
369 ports.forEach(port -> {
370 FilteringObjective.Builder filtObjBuilder = filterObjBuilder(key, port);
371 ObjectiveContext context = new DefaultObjectiveContext(
372 (objective) -> log.debug("XConnect FilterObj for {} on port {} revoked",
373 key, port),
374 (objective, error) ->
375 log.warn("Failed to revoke XConnect FilterObj for {} on port {}: {}",
376 key, port, error));
377 flowObjectiveService.filter(key.deviceId(), filtObjBuilder.remove(context));
378 });
379 }
380
381 /**
382 * Revokes next objectives for given XConnect.
383 *
384 * @param key XConnect store key
385 * @param nextObj next objective
386 * @param nextFuture completable future for this next objective operation
387 */
388 private void revokeNext(XconnectKey key, NextObjective nextObj,
389 CompletableFuture<ObjectiveError> nextFuture) {
390 ObjectiveContext context = new ObjectiveContext() {
391 @Override
392 public void onSuccess(Objective objective) {
393 log.debug("Previous NextObj for {} removed", key);
394 if (nextFuture != null) {
395 nextFuture.complete(null);
396 }
397 }
398
399 @Override
400 public void onError(Objective objective, ObjectiveError error) {
401 log.warn("Failed to remove previous NextObj for {}: {}", key, error);
402 if (nextFuture != null) {
403 nextFuture.complete(error);
404 }
405 }
406 };
407 flowObjectiveService.next(key.deviceId(),
408 (NextObjective) nextObj.copy().remove(context));
409 xconnectNextObjStore.remove(key);
410 }
411
412 /**
413 * Revokes bridging forwarding objectives for given XConnect.
414 *
415 * @param key XConnect store key
416 * @param nextObj next objective
417 * @param fwdFuture completable future for this forwarding objective operation
418 */
419 private void revokeFwd(XconnectKey key, NextObjective nextObj,
420 CompletableFuture<ObjectiveError> fwdFuture) {
421 ForwardingObjective.Builder fwdObjBuilder = fwdObjBuilder(key, nextObj.id());
422 ObjectiveContext context = new ObjectiveContext() {
423 @Override
424 public void onSuccess(Objective objective) {
425 log.debug("Previous FwdObj for {} removed", key);
426 if (fwdFuture != null) {
427 fwdFuture.complete(null);
428 }
429 }
430
431 @Override
432 public void onError(Objective objective, ObjectiveError error) {
433 log.warn("Failed to remove previous FwdObj for {}: {}", key, error);
434 if (fwdFuture != null) {
435 fwdFuture.complete(error);
436 }
437 }
438 };
439 flowObjectiveService.forward(key.deviceId(), fwdObjBuilder.remove(context));
440 }
441
442 /**
443 * Revokes ACL forwarding objectives for given XConnect.
444 *
445 * @param key XConnect store key
446 */
447 private void revokeAcl(XconnectKey key) {
448 ForwardingObjective.Builder aclObjBuilder = aclObjBuilder(key.vlanId());
449 ObjectiveContext aclContext = new DefaultObjectiveContext(
450 (objective) -> log.debug("XConnect AclObj for {} populated", key),
451 (objective, error) ->
452 log.warn("Failed to populate XConnect AclObj for {}: {}", key, error));
453 flowObjectiveService.forward(key.deviceId(), aclObjBuilder.remove(aclContext));
454 }
455
456 /**
457 * Updates XConnect groups and flows for given key.
458 *
459 * @param key XConnect key
460 * @param prevPorts previous XConnect ports
461 * @param ports new XConnect ports
462 */
463 private void updateXConnect(XconnectKey key, Set<PortNumber> prevPorts,
464 Set<PortNumber> ports) {
465 // NOTE: ACL flow doesn't include port information. No need to update it.
466 // Pair port is built-in and thus not going to change. No need to update it.
467
468 // remove old filter
469 prevPorts.stream().filter(port -> !ports.contains(port)).forEach(port ->
470 revokeFilter(key, ImmutableSet.of(port)));
471 // install new filter
472 ports.stream().filter(port -> !prevPorts.contains(port)).forEach(port ->
473 populateFilter(key, ImmutableSet.of(port)));
474
475 CompletableFuture<ObjectiveError> fwdFuture = new CompletableFuture<>();
476 CompletableFuture<ObjectiveError> nextFuture = new CompletableFuture<>();
477
478 if (xconnectNextObjStore.containsKey(key)) {
479 NextObjective nextObj = xconnectNextObjStore.get(key).value();
480 revokeFwd(key, nextObj, fwdFuture);
481
482 fwdFuture.thenAcceptAsync(fwdStatus -> {
483 if (fwdStatus == null) {
484 log.debug("Fwd removed. Now remove group {}", key);
485 revokeNext(key, nextObj, nextFuture);
486 }
487 });
488
489 nextFuture.thenAcceptAsync(nextStatus -> {
490 if (nextStatus == null) {
491 log.debug("Installing new group and flow for {}", key);
492 populateFwd(key, populateNext(key, ports));
493 }
494 });
495 } else {
496 log.warn("NextObj for {} does not exist in the store.", key);
497 }
498 }
499
500 /**
501 * Creates a next objective builder for XConnect.
502 *
503 * @param key XConnect key
504 * @param ports set of XConnect ports
505 * @return next objective builder
506 */
507 private NextObjective.Builder nextObjBuilder(XconnectKey key, Set<PortNumber> ports) {
508 int nextId = flowObjectiveService.allocateNextId();
509 TrafficSelector metadata =
510 DefaultTrafficSelector.builder().matchVlanId(key.vlanId()).build();
511 NextObjective.Builder nextObjBuilder = DefaultNextObjective
512 .builder().withId(nextId)
513 .withType(NextObjective.Type.BROADCAST).fromApp(appId)
514 .withMeta(metadata);
515 ports.forEach(port -> {
516 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder();
517 tBuilder.setOutput(port);
518 nextObjBuilder.addTreatment(tBuilder.build());
519 });
520 return nextObjBuilder;
521 }
522
523 /**
524 * Creates a bridging forwarding objective builder for XConnect.
525 *
526 * @param key XConnect key
527 * @param nextId next ID of the broadcast group for this XConnect key
528 * @return forwarding objective builder
529 */
530 private ForwardingObjective.Builder fwdObjBuilder(XconnectKey key, int nextId) {
531 /*
532 * Driver should treat objectives with MacAddress.NONE and !VlanId.NONE
533 * as the VLAN cross-connect broadcast rules
534 */
535 TrafficSelector.Builder sbuilder = DefaultTrafficSelector.builder();
536 sbuilder.matchVlanId(key.vlanId());
537 sbuilder.matchEthDst(MacAddress.NONE);
538
539 ForwardingObjective.Builder fob = DefaultForwardingObjective.builder();
540 fob.withFlag(ForwardingObjective.Flag.SPECIFIC)
541 .withSelector(sbuilder.build())
542 .nextStep(nextId)
543 .withPriority(XCONNECT_PRIORITY)
544 .fromApp(appId)
545 .makePermanent();
546 return fob;
547 }
548
549 /**
550 * Creates an ACL forwarding objective builder for XConnect.
551 *
552 * @param vlanId cross connect VLAN id
553 * @return forwarding objective builder
554 */
555 private ForwardingObjective.Builder aclObjBuilder(VlanId vlanId) {
556 TrafficSelector.Builder sbuilder = DefaultTrafficSelector.builder();
557 sbuilder.matchVlanId(vlanId);
558
559 TrafficTreatment.Builder tbuilder = DefaultTrafficTreatment.builder();
560
561 ForwardingObjective.Builder fob = DefaultForwardingObjective.builder();
562 fob.withFlag(ForwardingObjective.Flag.VERSATILE)
563 .withSelector(sbuilder.build())
564 .withTreatment(tbuilder.build())
565 .withPriority(XCONNECT_ACL_PRIORITY)
566 .fromApp(appId)
567 .makePermanent();
568 return fob;
569 }
570
571 /**
572 * Creates a filtering objective builder for XConnect.
573 *
574 * @param key XConnect key
575 * @param port XConnect ports
576 * @return next objective builder
577 */
578 private FilteringObjective.Builder filterObjBuilder(XconnectKey key, PortNumber port) {
579 FilteringObjective.Builder fob = DefaultFilteringObjective.builder();
580 fob.withKey(Criteria.matchInPort(port))
581 .addCondition(Criteria.matchVlanId(key.vlanId()))
582 .addCondition(Criteria.matchEthDst(MacAddress.NONE))
583 .withPriority(XCONNECT_PRIORITY);
584 return fob.permit().fromApp(appId);
585 }
586
587 /**
588 * Add pair port to the given set of port.
589 *
590 * @param deviceId device Id
591 * @param ports ports specified in the xconnect config
592 * @return port specified in the xconnect config plus the pair port (if configured)
593 */
594 private Set<PortNumber> addPairPort(DeviceId deviceId, Set<PortNumber> ports) {
595 if (srService == null) {
596 return ports;
597 }
598 Set<PortNumber> newPorts = Sets.newHashSet(ports);
599 srService.getPairLocalPort(deviceId).ifPresent(newPorts::add);
600 return newPorts;
601 }
Charles Chanc7b3c452018-06-19 20:31:57 -0700602}