blob: f997d53e50154aca2b2bce86e968a9d8bbd88b04 [file] [log] [blame]
Charles Chanc7b3c452018-06-19 20:31:57 -07001/*
2 * Copyright 2018-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.segmentrouting.xconnect.impl;
17
18import com.google.common.collect.ImmutableSet;
19import com.google.common.collect.Sets;
20import org.apache.felix.scr.annotations.Activate;
21import org.apache.felix.scr.annotations.Component;
22import org.apache.felix.scr.annotations.Deactivate;
23import org.apache.felix.scr.annotations.Reference;
24import org.apache.felix.scr.annotations.ReferenceCardinality;
25import org.apache.felix.scr.annotations.Service;
26import org.onlab.packet.MacAddress;
27import org.onlab.packet.VlanId;
28import org.onlab.util.KryoNamespace;
29import org.onosproject.codec.CodecService;
30import org.onosproject.core.ApplicationId;
31import org.onosproject.core.CoreService;
32import org.onosproject.mastership.MastershipService;
33import org.onosproject.net.ConnectPoint;
34import org.onosproject.net.DeviceId;
35import org.onosproject.net.PortNumber;
36import org.onosproject.net.config.NetworkConfigService;
37import org.onosproject.net.device.DeviceEvent;
38import org.onosproject.net.device.DeviceListener;
39import org.onosproject.net.device.DeviceService;
40import org.onosproject.net.flow.DefaultTrafficSelector;
41import org.onosproject.net.flow.DefaultTrafficTreatment;
42import org.onosproject.net.flow.TrafficSelector;
43import org.onosproject.net.flow.TrafficTreatment;
44import org.onosproject.net.flow.criteria.Criteria;
45import org.onosproject.net.flowobjective.DefaultFilteringObjective;
46import org.onosproject.net.flowobjective.DefaultForwardingObjective;
47import org.onosproject.net.flowobjective.DefaultNextObjective;
48import org.onosproject.net.flowobjective.DefaultObjectiveContext;
49import org.onosproject.net.flowobjective.FilteringObjective;
50import org.onosproject.net.flowobjective.FlowObjectiveService;
51import org.onosproject.net.flowobjective.ForwardingObjective;
52import org.onosproject.net.flowobjective.NextObjective;
53import org.onosproject.net.flowobjective.Objective;
54import org.onosproject.net.flowobjective.ObjectiveContext;
55import org.onosproject.net.flowobjective.ObjectiveError;
56import org.onosproject.segmentrouting.SegmentRoutingService;
57import org.onosproject.segmentrouting.xconnect.api.XconnectCodec;
58import org.onosproject.segmentrouting.xconnect.api.XconnectDesc;
59import org.onosproject.segmentrouting.xconnect.api.XconnectKey;
60import org.onosproject.segmentrouting.xconnect.api.XconnectService;
61import org.onosproject.store.serializers.KryoNamespaces;
62import org.onosproject.store.service.ConsistentMap;
63import org.onosproject.store.service.MapEvent;
64import org.onosproject.store.service.MapEventListener;
65import org.onosproject.store.service.Serializer;
66import org.onosproject.store.service.StorageService;
67import org.onosproject.store.service.Versioned;
68import org.slf4j.Logger;
69import org.slf4j.LoggerFactory;
70
71import java.io.Serializable;
72import java.util.Set;
73import java.util.concurrent.CompletableFuture;
74import java.util.function.BiConsumer;
75import java.util.function.Consumer;
76import java.util.stream.Collectors;
77
78@Service
79@Component(immediate = true)
80public class XconnectManager implements XconnectService {
81 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
82 private CoreService coreService;
83
84 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
85 private CodecService codecService;
86
87 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
88 private StorageService storageService;
89
90 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
91 public NetworkConfigService netCfgService;
92
93 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
94 public DeviceService deviceService;
95
96 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
97 public FlowObjectiveService flowObjectiveService;
98
99 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
100 public MastershipService mastershipService;
101
102 @Reference(cardinality = ReferenceCardinality.OPTIONAL_UNARY)
103 public SegmentRoutingService srService;
104
105 private static final String APP_NAME = "org.onosproject.xconnect";
106 private static final String ERROR_NOT_MASTER = "Not master controller";
107
108 private static Logger log = LoggerFactory.getLogger(XconnectManager.class);
109
110 private ApplicationId appId;
111 private ConsistentMap<XconnectKey, Set<PortNumber>> xconnectStore;
112 private ConsistentMap<XconnectKey, NextObjective> xconnectNextObjStore;
113
114 private final MapEventListener<XconnectKey, Set<PortNumber>> xconnectListener = new XconnectMapListener();
115 private final DeviceListener deviceListener = new InternalDeviceListener();
116
117 @Activate
118 void activate() {
119 appId = coreService.registerApplication(APP_NAME);
120 codecService.registerCodec(XconnectDesc.class, new XconnectCodec());
121
122 KryoNamespace.Builder serializer = KryoNamespace.newBuilder()
123 .register(KryoNamespaces.API)
124 .register(XconnectKey.class);
125
126 xconnectStore = storageService.<XconnectKey, Set<PortNumber>>consistentMapBuilder()
127 .withName("onos-sr-xconnect")
128 .withRelaxedReadConsistency()
129 .withSerializer(Serializer.using(serializer.build()))
130 .build();
131 xconnectStore.addListener(xconnectListener);
132
133 xconnectNextObjStore = storageService.<XconnectKey, NextObjective>consistentMapBuilder()
134 .withName("onos-sr-xconnect-next")
135 .withRelaxedReadConsistency()
136 .withSerializer(Serializer.using(serializer.build()))
137 .build();
138
139 deviceService.addListener(deviceListener);
140
141 log.info("Started");
142 }
143
144 @Deactivate
145 void deactivate() {
146 xconnectStore.removeListener(xconnectListener);
147 deviceService.removeListener(deviceListener);
148 codecService.unregisterCodec(XconnectDesc.class);
149
150 log.info("Stopped");
151 }
152
153 @Override
154 public void addOrUpdateXconnect(DeviceId deviceId, VlanId vlanId, Set<PortNumber> ports) {
155 log.info("Adding or updating xconnect. deviceId={}, vlanId={}, ports={}",
156 deviceId, vlanId, ports);
157 final XconnectKey key = new XconnectKey(deviceId, vlanId);
158 xconnectStore.put(key, ports);
159 }
160
161 @Override
162 public void removeXonnect(DeviceId deviceId, VlanId vlanId) {
163 log.info("Removing xconnect. deviceId={}, vlanId={}",
164 deviceId, vlanId);
165 final XconnectKey key = new XconnectKey(deviceId, vlanId);
166 xconnectStore.remove(key);
167 }
168
169 @Override
170 public Set<XconnectDesc> getXconnects() {
171 return xconnectStore.asJavaMap().entrySet().stream()
172 .map(e -> new XconnectDesc(e.getKey(), e.getValue()))
173 .collect(Collectors.toSet());
174 }
175
176 @Override
177 public boolean hasXconnect(ConnectPoint cp) {
178 return getXconnects().stream().anyMatch(desc ->
179 desc.key().deviceId().equals(cp.deviceId()) && desc.ports().contains(cp.port())
180 );
181 }
182
183 private class XconnectMapListener implements MapEventListener<XconnectKey, Set<PortNumber>> {
184 @Override
185 public void event(MapEvent<XconnectKey, Set<PortNumber>> event) {
186 XconnectKey key = event.key();
187 Versioned<Set<PortNumber>> ports = event.newValue();
188 Versioned<Set<PortNumber>> oldPorts = event.oldValue();
189
190 switch (event.type()) {
191 case INSERT:
192 populateXConnect(key, ports.value());
193 break;
194 case UPDATE:
195 updateXConnect(key, oldPorts.value(), ports.value());
196 break;
197 case REMOVE:
198 revokeXConnect(key, oldPorts.value());
199 break;
200 default:
201 break;
202 }
203 }
204 }
205
206 private class InternalDeviceListener implements DeviceListener {
207 @Override
208 public void event(DeviceEvent event) {
209 DeviceId deviceId = event.subject().id();
210 if (!mastershipService.isLocalMaster(deviceId)) {
211 return;
212 }
213
214 switch (event.type()) {
215 case DEVICE_ADDED:
216 case DEVICE_AVAILABILITY_CHANGED:
217 case DEVICE_UPDATED:
218 if (deviceService.isAvailable(deviceId)) {
219 init(deviceId);
220 } else {
221 cleanup(deviceId);
222 }
223 break;
224 default:
225 break;
226 }
227 }
228 }
229
230 void init(DeviceId deviceId) {
231 getXconnects().stream()
232 .filter(desc -> desc.key().deviceId().equals(deviceId))
233 .forEach(desc -> populateXConnect(desc.key(), desc.ports()));
234 }
235
236 void cleanup(DeviceId deviceId) {
237 xconnectNextObjStore.entrySet().stream()
238 .filter(entry -> entry.getKey().deviceId().equals(deviceId))
239 .forEach(entry -> xconnectNextObjStore.remove(entry.getKey()));
240 log.debug("{} is removed from xConnectNextObjStore", deviceId);
241 }
242
243 /**
244 * Populates XConnect groups and flows for given key.
245 *
246 * @param key XConnect key
247 * @param ports a set of ports to be cross-connected
248 */
249 private void populateXConnect(XconnectKey key, Set<PortNumber> ports) {
250 if (!mastershipService.isLocalMaster(key.deviceId())) {
251 log.info("Abort populating XConnect {}: {}", key, ERROR_NOT_MASTER);
252 return;
253 }
254
255 ports = addPairPort(key.deviceId(), ports);
256 populateFilter(key, ports);
257 populateFwd(key, populateNext(key, ports));
258 populateAcl(key);
259 }
260
261 /**
262 * Populates filtering objectives for given XConnect.
263 *
264 * @param key XConnect store key
265 * @param ports XConnect ports
266 */
267 private void populateFilter(XconnectKey key, Set<PortNumber> ports) {
268 ports.forEach(port -> {
269 FilteringObjective.Builder filtObjBuilder = filterObjBuilder(key, port);
270 ObjectiveContext context = new DefaultObjectiveContext(
271 (objective) -> log.debug("XConnect FilterObj for {} on port {} populated",
272 key, port),
273 (objective, error) ->
274 log.warn("Failed to populate XConnect FilterObj for {} on port {}: {}",
275 key, port, error));
276 flowObjectiveService.filter(key.deviceId(), filtObjBuilder.add(context));
277 });
278 }
279
280 /**
281 * Populates next objectives for given XConnect.
282 *
283 * @param key XConnect store key
284 * @param ports XConnect ports
285 */
286 private NextObjective populateNext(XconnectKey key, Set<PortNumber> ports) {
287 NextObjective nextObj;
288 if (xconnectNextObjStore.containsKey(key)) {
289 nextObj = xconnectNextObjStore.get(key).value();
290 log.debug("NextObj for {} found, id={}", key, nextObj.id());
291 } else {
292 NextObjective.Builder nextObjBuilder = nextObjBuilder(key, ports);
293 ObjectiveContext nextContext = new DefaultObjectiveContext(
294 // To serialize this with kryo
295 (Serializable & Consumer<Objective>) (objective) ->
296 log.debug("XConnect NextObj for {} added", key),
297 (Serializable & BiConsumer<Objective, ObjectiveError>) (objective, error) ->
298 log.warn("Failed to add XConnect NextObj for {}: {}", key, error)
299 );
300 nextObj = nextObjBuilder.add(nextContext);
301 flowObjectiveService.next(key.deviceId(), nextObj);
302 xconnectNextObjStore.put(key, nextObj);
303 log.debug("NextObj for {} not found. Creating new NextObj with id={}", key, nextObj.id());
304 }
305 return nextObj;
306 }
307
308 /**
309 * Populates bridging forwarding objectives for given XConnect.
310 *
311 * @param key XConnect store key
312 * @param nextObj next objective
313 */
314 private void populateFwd(XconnectKey key, NextObjective nextObj) {
315 ForwardingObjective.Builder fwdObjBuilder = fwdObjBuilder(key, nextObj.id());
316 ObjectiveContext fwdContext = new DefaultObjectiveContext(
317 (objective) -> log.debug("XConnect FwdObj for {} populated", key),
318 (objective, error) ->
319 log.warn("Failed to populate XConnect FwdObj for {}: {}", key, error));
320 flowObjectiveService.forward(key.deviceId(), fwdObjBuilder.add(fwdContext));
321 }
322
323 /**
324 * Populates ACL forwarding objectives for given XConnect.
325 *
326 * @param key XConnect store key
327 */
328 private void populateAcl(XconnectKey key) {
329 ForwardingObjective.Builder aclObjBuilder = aclObjBuilder(key.vlanId());
330 ObjectiveContext aclContext = new DefaultObjectiveContext(
331 (objective) -> log.debug("XConnect AclObj for {} populated", key),
332 (objective, error) ->
333 log.warn("Failed to populate XConnect AclObj for {}: {}", key, error));
334 flowObjectiveService.forward(key.deviceId(), aclObjBuilder.add(aclContext));
335 }
336
337 /**
338 * Revokes XConnect groups and flows for given key.
339 *
340 * @param key XConnect key
341 * @param ports XConnect ports
342 */
343 private void revokeXConnect(XconnectKey key, Set<PortNumber> ports) {
344 if (!mastershipService.isLocalMaster(key.deviceId())) {
345 log.info("Abort populating XConnect {}: {}", key, ERROR_NOT_MASTER);
346 return;
347 }
348
349 ports = addPairPort(key.deviceId(), ports);
350 revokeFilter(key, ports);
351 if (xconnectNextObjStore.containsKey(key)) {
352 NextObjective nextObj = xconnectNextObjStore.get(key).value();
353 revokeFwd(key, nextObj, null);
354 revokeNext(key, nextObj, null);
355 } else {
356 log.warn("NextObj for {} does not exist in the store.", key);
357 }
358 revokeAcl(key);
359 }
360
361 /**
362 * Revokes filtering objectives for given XConnect.
363 *
364 * @param key XConnect store key
365 * @param ports XConnect ports
366 */
367 private void revokeFilter(XconnectKey key, Set<PortNumber> ports) {
368 ports.forEach(port -> {
369 FilteringObjective.Builder filtObjBuilder = filterObjBuilder(key, port);
370 ObjectiveContext context = new DefaultObjectiveContext(
371 (objective) -> log.debug("XConnect FilterObj for {} on port {} revoked",
372 key, port),
373 (objective, error) ->
374 log.warn("Failed to revoke XConnect FilterObj for {} on port {}: {}",
375 key, port, error));
376 flowObjectiveService.filter(key.deviceId(), filtObjBuilder.remove(context));
377 });
378 }
379
380 /**
381 * Revokes next objectives for given XConnect.
382 *
383 * @param key XConnect store key
384 * @param nextObj next objective
385 * @param nextFuture completable future for this next objective operation
386 */
387 private void revokeNext(XconnectKey key, NextObjective nextObj,
388 CompletableFuture<ObjectiveError> nextFuture) {
389 ObjectiveContext context = new ObjectiveContext() {
390 @Override
391 public void onSuccess(Objective objective) {
392 log.debug("Previous NextObj for {} removed", key);
393 if (nextFuture != null) {
394 nextFuture.complete(null);
395 }
396 }
397
398 @Override
399 public void onError(Objective objective, ObjectiveError error) {
400 log.warn("Failed to remove previous NextObj for {}: {}", key, error);
401 if (nextFuture != null) {
402 nextFuture.complete(error);
403 }
404 }
405 };
406 flowObjectiveService.next(key.deviceId(),
407 (NextObjective) nextObj.copy().remove(context));
408 xconnectNextObjStore.remove(key);
409 }
410
411 /**
412 * Revokes bridging forwarding objectives for given XConnect.
413 *
414 * @param key XConnect store key
415 * @param nextObj next objective
416 * @param fwdFuture completable future for this forwarding objective operation
417 */
418 private void revokeFwd(XconnectKey key, NextObjective nextObj,
419 CompletableFuture<ObjectiveError> fwdFuture) {
420 ForwardingObjective.Builder fwdObjBuilder = fwdObjBuilder(key, nextObj.id());
421 ObjectiveContext context = new ObjectiveContext() {
422 @Override
423 public void onSuccess(Objective objective) {
424 log.debug("Previous FwdObj for {} removed", key);
425 if (fwdFuture != null) {
426 fwdFuture.complete(null);
427 }
428 }
429
430 @Override
431 public void onError(Objective objective, ObjectiveError error) {
432 log.warn("Failed to remove previous FwdObj for {}: {}", key, error);
433 if (fwdFuture != null) {
434 fwdFuture.complete(error);
435 }
436 }
437 };
438 flowObjectiveService.forward(key.deviceId(), fwdObjBuilder.remove(context));
439 }
440
441 /**
442 * Revokes ACL forwarding objectives for given XConnect.
443 *
444 * @param key XConnect store key
445 */
446 private void revokeAcl(XconnectKey key) {
447 ForwardingObjective.Builder aclObjBuilder = aclObjBuilder(key.vlanId());
448 ObjectiveContext aclContext = new DefaultObjectiveContext(
449 (objective) -> log.debug("XConnect AclObj for {} populated", key),
450 (objective, error) ->
451 log.warn("Failed to populate XConnect AclObj for {}: {}", key, error));
452 flowObjectiveService.forward(key.deviceId(), aclObjBuilder.remove(aclContext));
453 }
454
455 /**
456 * Updates XConnect groups and flows for given key.
457 *
458 * @param key XConnect key
459 * @param prevPorts previous XConnect ports
460 * @param ports new XConnect ports
461 */
462 private void updateXConnect(XconnectKey key, Set<PortNumber> prevPorts,
463 Set<PortNumber> ports) {
464 // NOTE: ACL flow doesn't include port information. No need to update it.
465 // Pair port is built-in and thus not going to change. No need to update it.
466
467 // remove old filter
468 prevPorts.stream().filter(port -> !ports.contains(port)).forEach(port ->
469 revokeFilter(key, ImmutableSet.of(port)));
470 // install new filter
471 ports.stream().filter(port -> !prevPorts.contains(port)).forEach(port ->
472 populateFilter(key, ImmutableSet.of(port)));
473
474 CompletableFuture<ObjectiveError> fwdFuture = new CompletableFuture<>();
475 CompletableFuture<ObjectiveError> nextFuture = new CompletableFuture<>();
476
477 if (xconnectNextObjStore.containsKey(key)) {
478 NextObjective nextObj = xconnectNextObjStore.get(key).value();
479 revokeFwd(key, nextObj, fwdFuture);
480
481 fwdFuture.thenAcceptAsync(fwdStatus -> {
482 if (fwdStatus == null) {
483 log.debug("Fwd removed. Now remove group {}", key);
484 revokeNext(key, nextObj, nextFuture);
485 }
486 });
487
488 nextFuture.thenAcceptAsync(nextStatus -> {
489 if (nextStatus == null) {
490 log.debug("Installing new group and flow for {}", key);
491 populateFwd(key, populateNext(key, ports));
492 }
493 });
494 } else {
495 log.warn("NextObj for {} does not exist in the store.", key);
496 }
497 }
498
499 /**
500 * Creates a next objective builder for XConnect.
501 *
502 * @param key XConnect key
503 * @param ports set of XConnect ports
504 * @return next objective builder
505 */
506 private NextObjective.Builder nextObjBuilder(XconnectKey key, Set<PortNumber> ports) {
507 int nextId = flowObjectiveService.allocateNextId();
508 TrafficSelector metadata =
509 DefaultTrafficSelector.builder().matchVlanId(key.vlanId()).build();
510 NextObjective.Builder nextObjBuilder = DefaultNextObjective
511 .builder().withId(nextId)
512 .withType(NextObjective.Type.BROADCAST).fromApp(appId)
513 .withMeta(metadata);
514 ports.forEach(port -> {
515 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder();
516 tBuilder.setOutput(port);
517 nextObjBuilder.addTreatment(tBuilder.build());
518 });
519 return nextObjBuilder;
520 }
521
522 /**
523 * Creates a bridging forwarding objective builder for XConnect.
524 *
525 * @param key XConnect key
526 * @param nextId next ID of the broadcast group for this XConnect key
527 * @return forwarding objective builder
528 */
529 private ForwardingObjective.Builder fwdObjBuilder(XconnectKey key, int nextId) {
530 /*
531 * Driver should treat objectives with MacAddress.NONE and !VlanId.NONE
532 * as the VLAN cross-connect broadcast rules
533 */
534 TrafficSelector.Builder sbuilder = DefaultTrafficSelector.builder();
535 sbuilder.matchVlanId(key.vlanId());
536 sbuilder.matchEthDst(MacAddress.NONE);
537
538 ForwardingObjective.Builder fob = DefaultForwardingObjective.builder();
539 fob.withFlag(ForwardingObjective.Flag.SPECIFIC)
540 .withSelector(sbuilder.build())
541 .nextStep(nextId)
542 .withPriority(XCONNECT_PRIORITY)
543 .fromApp(appId)
544 .makePermanent();
545 return fob;
546 }
547
548 /**
549 * Creates an ACL forwarding objective builder for XConnect.
550 *
551 * @param vlanId cross connect VLAN id
552 * @return forwarding objective builder
553 */
554 private ForwardingObjective.Builder aclObjBuilder(VlanId vlanId) {
555 TrafficSelector.Builder sbuilder = DefaultTrafficSelector.builder();
556 sbuilder.matchVlanId(vlanId);
557
558 TrafficTreatment.Builder tbuilder = DefaultTrafficTreatment.builder();
559
560 ForwardingObjective.Builder fob = DefaultForwardingObjective.builder();
561 fob.withFlag(ForwardingObjective.Flag.VERSATILE)
562 .withSelector(sbuilder.build())
563 .withTreatment(tbuilder.build())
564 .withPriority(XCONNECT_ACL_PRIORITY)
565 .fromApp(appId)
566 .makePermanent();
567 return fob;
568 }
569
570 /**
571 * Creates a filtering objective builder for XConnect.
572 *
573 * @param key XConnect key
574 * @param port XConnect ports
575 * @return next objective builder
576 */
577 private FilteringObjective.Builder filterObjBuilder(XconnectKey key, PortNumber port) {
578 FilteringObjective.Builder fob = DefaultFilteringObjective.builder();
579 fob.withKey(Criteria.matchInPort(port))
580 .addCondition(Criteria.matchVlanId(key.vlanId()))
581 .addCondition(Criteria.matchEthDst(MacAddress.NONE))
582 .withPriority(XCONNECT_PRIORITY);
583 return fob.permit().fromApp(appId);
584 }
585
586 /**
587 * Add pair port to the given set of port.
588 *
589 * @param deviceId device Id
590 * @param ports ports specified in the xconnect config
591 * @return port specified in the xconnect config plus the pair port (if configured)
592 */
593 private Set<PortNumber> addPairPort(DeviceId deviceId, Set<PortNumber> ports) {
594 if (srService == null) {
595 return ports;
596 }
597 Set<PortNumber> newPorts = Sets.newHashSet(ports);
598 srService.getPairLocalPort(deviceId).ifPresent(newPorts::add);
599 return newPorts;
600 }
601
602 // TODO DEVICE listener
603 // up : init
604 // down: removeDevice
605
606
607}