Implement CLI and REST API for Xconnect
Deprecate the old way of configuring Xconnect via network config
Change-Id: I5b9ac7852517c25805bcbfc0e7b3bec3a52eed9f
diff --git a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/xconnect/impl/XconnectManager.java b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/xconnect/impl/XconnectManager.java
new file mode 100644
index 0000000..f997d53
--- /dev/null
+++ b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/xconnect/impl/XconnectManager.java
@@ -0,0 +1,607 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.segmentrouting.xconnect.impl;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.packet.MacAddress;
+import org.onlab.packet.VlanId;
+import org.onlab.util.KryoNamespace;
+import org.onosproject.codec.CodecService;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.mastership.MastershipService;
+import org.onosproject.net.ConnectPoint;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.PortNumber;
+import org.onosproject.net.config.NetworkConfigService;
+import org.onosproject.net.device.DeviceEvent;
+import org.onosproject.net.device.DeviceListener;
+import org.onosproject.net.device.DeviceService;
+import org.onosproject.net.flow.DefaultTrafficSelector;
+import org.onosproject.net.flow.DefaultTrafficTreatment;
+import org.onosproject.net.flow.TrafficSelector;
+import org.onosproject.net.flow.TrafficTreatment;
+import org.onosproject.net.flow.criteria.Criteria;
+import org.onosproject.net.flowobjective.DefaultFilteringObjective;
+import org.onosproject.net.flowobjective.DefaultForwardingObjective;
+import org.onosproject.net.flowobjective.DefaultNextObjective;
+import org.onosproject.net.flowobjective.DefaultObjectiveContext;
+import org.onosproject.net.flowobjective.FilteringObjective;
+import org.onosproject.net.flowobjective.FlowObjectiveService;
+import org.onosproject.net.flowobjective.ForwardingObjective;
+import org.onosproject.net.flowobjective.NextObjective;
+import org.onosproject.net.flowobjective.Objective;
+import org.onosproject.net.flowobjective.ObjectiveContext;
+import org.onosproject.net.flowobjective.ObjectiveError;
+import org.onosproject.segmentrouting.SegmentRoutingService;
+import org.onosproject.segmentrouting.xconnect.api.XconnectCodec;
+import org.onosproject.segmentrouting.xconnect.api.XconnectDesc;
+import org.onosproject.segmentrouting.xconnect.api.XconnectKey;
+import org.onosproject.segmentrouting.xconnect.api.XconnectService;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.MapEvent;
+import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.Versioned;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+@Service
+@Component(immediate = true)
+public class XconnectManager implements XconnectService {
+ // Service references declared via Felix SCR annotations. Only srService is optional
+ // and is therefore null-checked before use.
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ private CoreService coreService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ private CodecService codecService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ private StorageService storageService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ public NetworkConfigService netCfgService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ public DeviceService deviceService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ public FlowObjectiveService flowObjectiveService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ public MastershipService mastershipService;
+
+ @Reference(cardinality = ReferenceCardinality.OPTIONAL_UNARY)
+ public SegmentRoutingService srService;
+
+ private static final String APP_NAME = "org.onosproject.xconnect";
+ private static final String ERROR_NOT_MASTER = "Not master controller";
+
+ // The logger is never reassigned, so it is declared final.
+ private static final Logger log = LoggerFactory.getLogger(XconnectManager.class);
+
+ private ApplicationId appId;
+ // XConnect key -> set of ports to be cross-connected
+ private ConsistentMap<XconnectKey, Set<PortNumber>> xconnectStore;
+ // XConnect key -> next objective that was submitted for that key
+ private ConsistentMap<XconnectKey, NextObjective> xconnectNextObjStore;
+
+ private final MapEventListener<XconnectKey, Set<PortNumber>> xconnectListener = new XconnectMapListener();
+ private final DeviceListener deviceListener = new InternalDeviceListener();
+
+ /**
+  * Registers the application and the XConnect codec, builds the two consistent maps
+  * and subscribes to XConnect store events and device events.
+  */
+ @Activate
+ void activate() {
+     appId = coreService.registerApplication(APP_NAME);
+     codecService.registerCodec(XconnectDesc.class, new XconnectCodec());
+
+     // Both stores are serialized with the same namespace definition
+     KryoNamespace.Builder kryoBuilder = KryoNamespace.newBuilder()
+             .register(KryoNamespaces.API)
+             .register(XconnectKey.class);
+
+     // Store of configured cross-connects
+     xconnectStore = storageService.<XconnectKey, Set<PortNumber>>consistentMapBuilder()
+             .withName("onos-sr-xconnect")
+             .withRelaxedReadConsistency()
+             .withSerializer(Serializer.using(kryoBuilder.build()))
+             .build();
+     xconnectStore.addListener(xconnectListener);
+
+     // Store of the next objectives created for those cross-connects
+     xconnectNextObjStore = storageService.<XconnectKey, NextObjective>consistentMapBuilder()
+             .withName("onos-sr-xconnect-next")
+             .withRelaxedReadConsistency()
+             .withSerializer(Serializer.using(kryoBuilder.build()))
+             .build();
+
+     deviceService.addListener(deviceListener);
+
+     log.info("Started");
+ }
+
+ /**
+  * Unsubscribes from device and XConnect store events and unregisters the codec.
+  */
+ @Deactivate
+ void deactivate() {
+     // The two listener removals target unrelated services and are independent
+     deviceService.removeListener(deviceListener);
+     xconnectStore.removeListener(xconnectListener);
+     codecService.unregisterCodec(XconnectDesc.class);
+
+     log.info("Stopped");
+ }
+
+ /**
+  * Stores the given port set under the (device, VLAN) key, replacing any previous value.
+  * Flow programming is triggered by the resulting store event, not by this method.
+  */
+ @Override
+ public void addOrUpdateXconnect(DeviceId deviceId, VlanId vlanId, Set<PortNumber> ports) {
+     log.info("Adding or updating xconnect. deviceId={}, vlanId={}, ports={}",
+             deviceId, vlanId, ports);
+     xconnectStore.put(new XconnectKey(deviceId, vlanId), ports);
+ }
+
+ /**
+  * Removes the entry stored under the (device, VLAN) key.
+  * Flow removal is triggered by the resulting store event, not by this method.
+  */
+ @Override
+ public void removeXonnect(DeviceId deviceId, VlanId vlanId) {
+     log.info("Removing xconnect. deviceId={}, vlanId={}",
+             deviceId, vlanId);
+     xconnectStore.remove(new XconnectKey(deviceId, vlanId));
+ }
+
+ /**
+  * Returns one descriptor per entry currently held in the XConnect store.
+  */
+ @Override
+ public Set<XconnectDesc> getXconnects() {
+     Set<XconnectDesc> descriptors = Sets.newHashSet();
+     xconnectStore.asJavaMap().forEach((xconnectKey, xconnectPorts) ->
+             descriptors.add(new XconnectDesc(xconnectKey, xconnectPorts)));
+     return descriptors;
+ }
+
+ @Override
+ public boolean hasXconnect(ConnectPoint cp) {
+ return getXconnects().stream().anyMatch(desc ->
+ desc.key().deviceId().equals(cp.deviceId()) && desc.ports().contains(cp.port())
+ );
+ }
+
+ /**
+  * Translates XConnect store events into populate / update / revoke operations.
+  */
+ private class XconnectMapListener implements MapEventListener<XconnectKey, Set<PortNumber>> {
+     @Override
+     public void event(MapEvent<XconnectKey, Set<PortNumber>> event) {
+         final XconnectKey xconnectKey = event.key();
+
+         // Old/new values are only dereferenced for the event types that use them
+         switch (event.type()) {
+             case INSERT:
+                 populateXConnect(xconnectKey, event.newValue().value());
+                 break;
+             case UPDATE:
+                 updateXConnect(xconnectKey, event.oldValue().value(), event.newValue().value());
+                 break;
+             case REMOVE:
+                 revokeXConnect(xconnectKey, event.oldValue().value());
+                 break;
+             default:
+                 break;
+         }
+     }
+ }
+
+ /**
+  * Reacts to device add / update / availability events on devices this node is master of:
+  * re-populates XConnects when the device is available, cleans up otherwise.
+  */
+ private class InternalDeviceListener implements DeviceListener {
+     @Override
+     public void event(DeviceEvent event) {
+         final DeviceId deviceId = event.subject().id();
+         if (!mastershipService.isLocalMaster(deviceId)) {
+             return;
+         }
+
+         switch (event.type()) {
+             case DEVICE_ADDED:
+             case DEVICE_AVAILABILITY_CHANGED:
+             case DEVICE_UPDATED:
+                 syncWithAvailability(deviceId);
+                 break;
+             default:
+                 break;
+         }
+     }
+
+     // Populates when the device is reported available, cleans up otherwise.
+     private void syncWithAvailability(DeviceId deviceId) {
+         if (!deviceService.isAvailable(deviceId)) {
+             cleanup(deviceId);
+             return;
+         }
+         init(deviceId);
+     }
+ }
+
+ /**
+  * Populates every stored XConnect that belongs to the given device.
+  *
+  * @param deviceId device ID
+  */
+ void init(DeviceId deviceId) {
+     for (XconnectDesc desc : getXconnects()) {
+         if (desc.key().deviceId().equals(deviceId)) {
+             populateXConnect(desc.key(), desc.ports());
+         }
+     }
+ }
+
+ /**
+  * Drops every next-objective store entry that belongs to the given device.
+  *
+  * @param deviceId device ID
+  */
+ void cleanup(DeviceId deviceId) {
+     xconnectNextObjStore.entrySet().stream()
+             .map(entry -> entry.getKey())
+             .filter(storedKey -> storedKey.deviceId().equals(deviceId))
+             .forEach(storedKey -> xconnectNextObjStore.remove(storedKey));
+     log.debug("{} is removed from xConnectNextObjStore", deviceId);
+ }
+
+ /**
+ * Populates XConnect groups and flows for given key.
+ *
+ * @param key XConnect key
+ * @param ports a set of ports to be cross-connected
+ */
+ private void populateXConnect(XconnectKey key, Set<PortNumber> ports) {
+ if (!mastershipService.isLocalMaster(key.deviceId())) {
+ log.info("Abort populating XConnect {}: {}", key, ERROR_NOT_MASTER);
+ return;
+ }
+
+ ports = addPairPort(key.deviceId(), ports);
+ populateFilter(key, ports);
+ populateFwd(key, populateNext(key, ports));
+ populateAcl(key);
+ }
+
+ /**
+  * Submits one "add" filtering objective per port of the given XConnect.
+  *
+  * @param key XConnect store key
+  * @param ports XConnect ports
+  */
+ private void populateFilter(XconnectKey key, Set<PortNumber> ports) {
+     for (PortNumber port : ports) {
+         ObjectiveContext context = new DefaultObjectiveContext(
+                 (objective) -> log.debug("XConnect FilterObj for {} on port {} populated",
+                         key, port),
+                 (objective, error) ->
+                         log.warn("Failed to populate XConnect FilterObj for {} on port {}: {}",
+                                 key, port, error));
+         FilteringObjective filterObj = filterObjBuilder(key, port).add(context);
+         flowObjectiveService.filter(key.deviceId(), filterObj);
+     }
+ }
+
+ /**
+  * Populates next objectives for given XConnect.
+  * <p>
+  * If the store already holds a next objective for the key it is returned as is;
+  * otherwise a new one is built, submitted to the device and recorded in the store.
+  *
+  * @param key XConnect store key
+  * @param ports XConnect ports
+  * @return the stored or newly created next objective for the key
+  */
+ private NextObjective populateNext(XconnectKey key, Set<PortNumber> ports) {
+     // A single get() replaces containsKey() + get(): one store access instead of two, and
+     // no NullPointerException if the entry disappears between the two calls.
+     Versioned<NextObjective> stored = xconnectNextObjStore.get(key);
+     if (stored != null) {
+         NextObjective existing = stored.value();
+         log.debug("NextObj for {} found, id={}", key, existing.id());
+         return existing;
+     }
+
+     NextObjective.Builder nextObjBuilder = nextObjBuilder(key, ports);
+     ObjectiveContext nextContext = new DefaultObjectiveContext(
+             // To serialize this with kryo
+             (Serializable & Consumer<Objective>) (objective) ->
+                     log.debug("XConnect NextObj for {} added", key),
+             (Serializable & BiConsumer<Objective, ObjectiveError>) (objective, error) ->
+                     log.warn("Failed to add XConnect NextObj for {}: {}", key, error)
+     );
+     NextObjective nextObj = nextObjBuilder.add(nextContext);
+     flowObjectiveService.next(key.deviceId(), nextObj);
+     xconnectNextObjStore.put(key, nextObj);
+     log.debug("NextObj for {} not found. Creating new NextObj with id={}", key, nextObj.id());
+     return nextObj;
+ }
+
+ /**
+  * Submits the "add" bridging forwarding objective that points to the given next objective.
+  *
+  * @param key XConnect store key
+  * @param nextObj next objective
+  */
+ private void populateFwd(XconnectKey key, NextObjective nextObj) {
+     ObjectiveContext context = new DefaultObjectiveContext(
+             (objective) -> log.debug("XConnect FwdObj for {} populated", key),
+             (objective, error) ->
+                     log.warn("Failed to populate XConnect FwdObj for {}: {}", key, error));
+     ForwardingObjective fwdObj = fwdObjBuilder(key, nextObj.id()).add(context);
+     flowObjectiveService.forward(key.deviceId(), fwdObj);
+ }
+
+ /**
+  * Submits the "add" ACL forwarding objective for the VLAN of the given XConnect.
+  *
+  * @param key XConnect store key
+  */
+ private void populateAcl(XconnectKey key) {
+     ObjectiveContext context = new DefaultObjectiveContext(
+             (objective) -> log.debug("XConnect AclObj for {} populated", key),
+             (objective, error) ->
+                     log.warn("Failed to populate XConnect AclObj for {}: {}", key, error));
+     ForwardingObjective aclObj = aclObjBuilder(key.vlanId()).add(context);
+     flowObjectiveService.forward(key.deviceId(), aclObj);
+ }
+
+ /**
+  * Revokes XConnect groups and flows for given key.
+  * Does nothing unless this node is the master of the device.
+  *
+  * @param key XConnect key
+  * @param ports XConnect ports
+  */
+ private void revokeXConnect(XconnectKey key, Set<PortNumber> ports) {
+     if (!mastershipService.isLocalMaster(key.deviceId())) {
+         // This is the revoke path; the message previously said "populating"
+         log.info("Abort revoking XConnect {}: {}", key, ERROR_NOT_MASTER);
+         return;
+     }
+
+     ports = addPairPort(key.deviceId(), ports);
+     revokeFilter(key, ports);
+     // A single get() replaces containsKey() + get(), so a concurrent removal of the entry
+     // cannot cause a NullPointerException between the two calls.
+     Versioned<NextObjective> stored = xconnectNextObjStore.get(key);
+     if (stored != null) {
+         NextObjective nextObj = stored.value();
+         // No futures are passed: forwarding and next removals are not chained here
+         revokeFwd(key, nextObj, null);
+         revokeNext(key, nextObj, null);
+     } else {
+         log.warn("NextObj for {} does not exist in the store.", key);
+     }
+     revokeAcl(key);
+ }
+
+ /**
+  * Submits one "remove" filtering objective per port of the given XConnect.
+  *
+  * @param key XConnect store key
+  * @param ports XConnect ports
+  */
+ private void revokeFilter(XconnectKey key, Set<PortNumber> ports) {
+     for (PortNumber port : ports) {
+         ObjectiveContext context = new DefaultObjectiveContext(
+                 (objective) -> log.debug("XConnect FilterObj for {} on port {} revoked",
+                         key, port),
+                 (objective, error) ->
+                         log.warn("Failed to revoke XConnect FilterObj for {} on port {}: {}",
+                                 key, port, error));
+         FilteringObjective filterObj = filterObjBuilder(key, port).remove(context);
+         flowObjectiveService.filter(key.deviceId(), filterObj);
+     }
+ }
+
+ /**
+  * Submits the removal of the given next objective and deletes its store entry.
+  * <p>
+  * When a future is supplied it is completed with {@code null} on success and with
+  * the reported error on failure.
+  *
+  * @param key XConnect store key
+  * @param nextObj next objective
+  * @param nextFuture completable future for this next objective operation, may be null
+  */
+ private void revokeNext(XconnectKey key, NextObjective nextObj,
+                         CompletableFuture<ObjectiveError> nextFuture) {
+     final boolean notifyCaller = nextFuture != null;
+     ObjectiveContext removalContext = new ObjectiveContext() {
+         @Override
+         public void onSuccess(Objective objective) {
+             log.debug("Previous NextObj for {} removed", key);
+             if (notifyCaller) {
+                 nextFuture.complete(null);
+             }
+         }
+
+         @Override
+         public void onError(Objective objective, ObjectiveError error) {
+             log.warn("Failed to remove previous NextObj for {}: {}", key, error);
+             if (notifyCaller) {
+                 nextFuture.complete(error);
+             }
+         }
+     };
+     NextObjective removal = (NextObjective) nextObj.copy().remove(removalContext);
+     flowObjectiveService.next(key.deviceId(), removal);
+     xconnectNextObjStore.remove(key);
+ }
+
+ /**
+  * Submits the removal of the bridging forwarding objective of the given XConnect.
+  * <p>
+  * When a future is supplied it is completed with {@code null} on success and with
+  * the reported error on failure.
+  *
+  * @param key XConnect store key
+  * @param nextObj next objective
+  * @param fwdFuture completable future for this forwarding objective operation, may be null
+  */
+ private void revokeFwd(XconnectKey key, NextObjective nextObj,
+                        CompletableFuture<ObjectiveError> fwdFuture) {
+     final boolean notifyCaller = fwdFuture != null;
+     ForwardingObjective.Builder removalBuilder = fwdObjBuilder(key, nextObj.id());
+     ObjectiveContext removalContext = new ObjectiveContext() {
+         @Override
+         public void onSuccess(Objective objective) {
+             log.debug("Previous FwdObj for {} removed", key);
+             if (notifyCaller) {
+                 fwdFuture.complete(null);
+             }
+         }
+
+         @Override
+         public void onError(Objective objective, ObjectiveError error) {
+             log.warn("Failed to remove previous FwdObj for {}: {}", key, error);
+             if (notifyCaller) {
+                 fwdFuture.complete(error);
+             }
+         }
+     };
+     ForwardingObjective removal = removalBuilder.remove(removalContext);
+     flowObjectiveService.forward(key.deviceId(), removal);
+ }
+
+ /**
+  * Revokes ACL forwarding objectives for given XConnect.
+  *
+  * @param key XConnect store key
+  */
+ private void revokeAcl(XconnectKey key) {
+     ForwardingObjective.Builder aclObjBuilder = aclObjBuilder(key.vlanId());
+     // The messages describe a removal; they previously said "populated" / "populate"
+     ObjectiveContext aclContext = new DefaultObjectiveContext(
+             (objective) -> log.debug("XConnect AclObj for {} revoked", key),
+             (objective, error) ->
+                     log.warn("Failed to revoke XConnect AclObj for {}: {}", key, error));
+     flowObjectiveService.forward(key.deviceId(), aclObjBuilder.remove(aclContext));
+ }
+
+ /**
+  * Updates XConnect groups and flows for given key.
+  * <p>
+  * Filters are adjusted per port. The forwarding objective and the next objective are
+  * removed and re-created in sequence: forwarding removal, then next removal, then
+  * installation of the new next and forwarding objectives. Each step only runs when
+  * the previous one completed without error.
+  * Does nothing unless this node is the master of the device.
+  *
+  * @param key XConnect key
+  * @param prevPorts previous XConnect ports
+  * @param ports new XConnect ports
+  */
+ private void updateXConnect(XconnectKey key, Set<PortNumber> prevPorts,
+                             Set<PortNumber> ports) {
+     // Same guard as populateXConnect / revokeXConnect: the store event is handled by
+     // this listener on every node, but only the master should program the device.
+     if (!mastershipService.isLocalMaster(key.deviceId())) {
+         log.info("Abort updating XConnect {}: {}", key, ERROR_NOT_MASTER);
+         return;
+     }
+
+     // NOTE: ACL flow doesn't include port information. No need to update it.
+     // Pair port is built-in and thus not going to change. No need to update it.
+
+     // remove old filter
+     prevPorts.stream().filter(port -> !ports.contains(port)).forEach(port ->
+             revokeFilter(key, ImmutableSet.of(port)));
+     // install new filter
+     ports.stream().filter(port -> !prevPorts.contains(port)).forEach(port ->
+             populateFilter(key, ImmutableSet.of(port)));
+
+     // A single get() replaces containsKey() + get(), so a concurrent removal of the entry
+     // cannot cause a NullPointerException between the two calls.
+     Versioned<NextObjective> stored = xconnectNextObjStore.get(key);
+     if (stored == null) {
+         log.warn("NextObj for {} does not exist in the store.", key);
+         return;
+     }
+     NextObjective nextObj = stored.value();
+
+     CompletableFuture<ObjectiveError> fwdFuture = new CompletableFuture<>();
+     CompletableFuture<ObjectiveError> nextFuture = new CompletableFuture<>();
+
+     revokeFwd(key, nextObj, fwdFuture);
+
+     // A null status means the step succeeded; on error the chain stops here
+     fwdFuture.thenAcceptAsync(fwdStatus -> {
+         if (fwdStatus == null) {
+             log.debug("Fwd removed. Now remove group {}", key);
+             revokeNext(key, nextObj, nextFuture);
+         }
+     });
+
+     nextFuture.thenAcceptAsync(nextStatus -> {
+         if (nextStatus == null) {
+             log.debug("Installing new group and flow for {}", key);
+             // NOTE(review): populateXConnect builds the next objective from
+             // addPairPort(ports), while this path uses the configured ports only,
+             // so the rebuilt group has no pair port - confirm this is intended.
+             populateFwd(key, populateNext(key, ports));
+         }
+     });
+ }
+
+ /**
+ * Creates a next objective builder for XConnect.
+ *
+ * @param key XConnect key
+ * @param ports set of XConnect ports
+ * @return next objective builder
+ */
+ private NextObjective.Builder nextObjBuilder(XconnectKey key, Set<PortNumber> ports) {
+ int nextId = flowObjectiveService.allocateNextId();
+ TrafficSelector metadata =
+ DefaultTrafficSelector.builder().matchVlanId(key.vlanId()).build();
+ NextObjective.Builder nextObjBuilder = DefaultNextObjective
+ .builder().withId(nextId)
+ .withType(NextObjective.Type.BROADCAST).fromApp(appId)
+ .withMeta(metadata);
+ ports.forEach(port -> {
+ TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder();
+ tBuilder.setOutput(port);
+ nextObjBuilder.addTreatment(tBuilder.build());
+ });
+ return nextObjBuilder;
+ }
+
+ /**
+ * Creates a bridging forwarding objective builder for XConnect.
+ *
+ * @param key XConnect key
+ * @param nextId next ID of the broadcast group for this XConnect key
+ * @return forwarding objective builder
+ */
+ private ForwardingObjective.Builder fwdObjBuilder(XconnectKey key, int nextId) {
+ /*
+ * Driver should treat objectives with MacAddress.NONE and !VlanId.NONE
+ * as the VLAN cross-connect broadcast rules
+ */
+ TrafficSelector.Builder sbuilder = DefaultTrafficSelector.builder();
+ sbuilder.matchVlanId(key.vlanId());
+ sbuilder.matchEthDst(MacAddress.NONE);
+
+ ForwardingObjective.Builder fob = DefaultForwardingObjective.builder();
+ fob.withFlag(ForwardingObjective.Flag.SPECIFIC)
+ .withSelector(sbuilder.build())
+ .nextStep(nextId)
+ .withPriority(XCONNECT_PRIORITY)
+ .fromApp(appId)
+ .makePermanent();
+ return fob;
+ }
+
+ /**
+ * Creates an ACL forwarding objective builder for XConnect.
+ *
+ * @param vlanId cross connect VLAN id
+ * @return forwarding objective builder
+ */
+ private ForwardingObjective.Builder aclObjBuilder(VlanId vlanId) {
+ TrafficSelector.Builder sbuilder = DefaultTrafficSelector.builder();
+ sbuilder.matchVlanId(vlanId);
+
+ TrafficTreatment.Builder tbuilder = DefaultTrafficTreatment.builder();
+
+ ForwardingObjective.Builder fob = DefaultForwardingObjective.builder();
+ fob.withFlag(ForwardingObjective.Flag.VERSATILE)
+ .withSelector(sbuilder.build())
+ .withTreatment(tbuilder.build())
+ .withPriority(XCONNECT_ACL_PRIORITY)
+ .fromApp(appId)
+ .makePermanent();
+ return fob;
+ }
+
+ /**
+  * Builds a permit filtering objective builder keyed on the given input port, with the
+  * XConnect VLAN and MacAddress.NONE as conditions.
+  *
+  * @param key XConnect key
+  * @param port XConnect port
+  * @return filtering objective builder
+  */
+ private FilteringObjective.Builder filterObjBuilder(XconnectKey key, PortNumber port) {
+     FilteringObjective.Builder builder = DefaultFilteringObjective.builder();
+     builder.withKey(Criteria.matchInPort(port))
+             .addCondition(Criteria.matchVlanId(key.vlanId()))
+             .addCondition(Criteria.matchEthDst(MacAddress.NONE))
+             .withPriority(XCONNECT_PRIORITY);
+     FilteringObjective.Builder permitting = builder.permit();
+     return permitting.fromApp(appId);
+ }
+
+ /**
+  * Returns the given ports extended with the pair local port of the device.
+  * <p>
+  * When the segment routing service is not bound the input set is returned as is;
+  * otherwise a new set is returned and the input is left untouched.
+  *
+  * @param deviceId device Id
+  * @param ports ports specified in the xconnect config
+  * @return port specified in the xconnect config plus the pair port (if configured)
+  */
+ private Set<PortNumber> addPairPort(DeviceId deviceId, Set<PortNumber> ports) {
+     if (srService != null) {
+         Set<PortNumber> withPairPort = Sets.newHashSet(ports);
+         srService.getPairLocalPort(deviceId).ifPresent(pairPort -> withPairPort.add(pairPort));
+         return withPairPort;
+     }
+     return ports;
+ }
+
+ // TODO DEVICE listener
+ // up : init
+ // down: removeDevice
+
+
+}