blob: 5e42e570d2133e4e0682cfc69a884eb407d24565 [file] [log] [blame]
/*
* Copyright 2018-present Open Networking Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.segmentrouting.xconnect.impl;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import org.onlab.packet.MacAddress;
import org.onlab.packet.VlanId;
import org.onlab.util.KryoNamespace;
import org.onosproject.codec.CodecService;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.mastership.MastershipService;
import org.onosproject.net.ConnectPoint;
import org.onosproject.net.DeviceId;
import org.onosproject.net.PortNumber;
import org.onosproject.net.config.NetworkConfigService;
import org.onosproject.net.device.DeviceEvent;
import org.onosproject.net.device.DeviceListener;
import org.onosproject.net.device.DeviceService;
import org.onosproject.net.flow.DefaultTrafficSelector;
import org.onosproject.net.flow.DefaultTrafficTreatment;
import org.onosproject.net.flow.TrafficSelector;
import org.onosproject.net.flow.TrafficTreatment;
import org.onosproject.net.flow.criteria.Criteria;
import org.onosproject.net.flowobjective.DefaultFilteringObjective;
import org.onosproject.net.flowobjective.DefaultForwardingObjective;
import org.onosproject.net.flowobjective.DefaultNextObjective;
import org.onosproject.net.flowobjective.DefaultObjectiveContext;
import org.onosproject.net.flowobjective.FilteringObjective;
import org.onosproject.net.flowobjective.FlowObjectiveService;
import org.onosproject.net.flowobjective.ForwardingObjective;
import org.onosproject.net.flowobjective.NextObjective;
import org.onosproject.net.flowobjective.Objective;
import org.onosproject.net.flowobjective.ObjectiveContext;
import org.onosproject.net.flowobjective.ObjectiveError;
import org.onosproject.segmentrouting.SegmentRoutingService;
import org.onosproject.segmentrouting.xconnect.api.XconnectCodec;
import org.onosproject.segmentrouting.xconnect.api.XconnectDesc;
import org.onosproject.segmentrouting.xconnect.api.XconnectKey;
import org.onosproject.segmentrouting.xconnect.api.XconnectService;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageService;
import org.onosproject.store.service.Versioned;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.component.annotations.ReferenceCardinality;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Serializable;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import static org.onlab.util.Tools.groupedThreads;
@Component(immediate = true, service = XconnectService.class)
public class XconnectManager implements XconnectService {
@Reference(cardinality = ReferenceCardinality.MANDATORY)
private CoreService coreService;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
private CodecService codecService;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
private StorageService storageService;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
public NetworkConfigService netCfgService;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
public DeviceService deviceService;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
public FlowObjectiveService flowObjectiveService;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
public MastershipService mastershipService;
@Reference(cardinality = ReferenceCardinality.OPTIONAL)
public SegmentRoutingService srService;
private static final String APP_NAME = "org.onosproject.xconnect";
private static final String ERROR_NOT_MASTER = "Not master controller";
private static Logger log = LoggerFactory.getLogger(XconnectManager.class);
private ApplicationId appId;
private ConsistentMap<XconnectKey, Set<PortNumber>> xconnectStore;
private ConsistentMap<XconnectKey, Integer> xconnectNextObjStore;
private final MapEventListener<XconnectKey, Set<PortNumber>> xconnectListener = new XconnectMapListener();
private final DeviceListener deviceListener = new InternalDeviceListener();
private ExecutorService deviceEventExecutor;
@Activate
void activate() {
appId = coreService.registerApplication(APP_NAME);
codecService.registerCodec(XconnectDesc.class, new XconnectCodec());
KryoNamespace.Builder serializer = KryoNamespace.newBuilder()
.register(KryoNamespaces.API)
.register(XconnectManager.class)
.register(XconnectKey.class);
xconnectStore = storageService.<XconnectKey, Set<PortNumber>>consistentMapBuilder()
.withName("onos-sr-xconnect")
.withRelaxedReadConsistency()
.withSerializer(Serializer.using(serializer.build()))
.build();
xconnectStore.addListener(xconnectListener);
xconnectNextObjStore = storageService.<XconnectKey, Integer>consistentMapBuilder()
.withName("onos-sr-xconnect-next")
.withRelaxedReadConsistency()
.withSerializer(Serializer.using(serializer.build()))
.build();
deviceEventExecutor = Executors.newSingleThreadScheduledExecutor(
groupedThreads("sr-xconnect-device-event", "%d", log));
deviceService.addListener(deviceListener);
log.info("Started");
}
@Deactivate
void deactivate() {
xconnectStore.removeListener(xconnectListener);
deviceService.removeListener(deviceListener);
codecService.unregisterCodec(XconnectDesc.class);
deviceEventExecutor.shutdown();
log.info("Stopped");
}
@Override
public void addOrUpdateXconnect(DeviceId deviceId, VlanId vlanId, Set<PortNumber> ports) {
log.info("Adding or updating xconnect. deviceId={}, vlanId={}, ports={}",
deviceId, vlanId, ports);
final XconnectKey key = new XconnectKey(deviceId, vlanId);
xconnectStore.put(key, ports);
}
@Override
public void removeXonnect(DeviceId deviceId, VlanId vlanId) {
log.info("Removing xconnect. deviceId={}, vlanId={}",
deviceId, vlanId);
final XconnectKey key = new XconnectKey(deviceId, vlanId);
xconnectStore.remove(key);
}
@Override
public Set<XconnectDesc> getXconnects() {
return xconnectStore.asJavaMap().entrySet().stream()
.map(e -> new XconnectDesc(e.getKey(), e.getValue()))
.collect(Collectors.toSet());
}
@Override
public boolean hasXconnect(ConnectPoint cp) {
return getXconnects().stream().anyMatch(desc ->
desc.key().deviceId().equals(cp.deviceId()) && desc.ports().contains(cp.port())
);
}
@Override
public ImmutableMap<XconnectKey, Integer> getNext() {
if (xconnectNextObjStore != null) {
return ImmutableMap.copyOf(xconnectNextObjStore.asJavaMap());
} else {
return ImmutableMap.of();
}
}
@Override
public void removeNextId(int nextId) {
xconnectNextObjStore.entrySet().forEach(e -> {
if (e.getValue().value() == nextId) {
xconnectNextObjStore.remove(e.getKey());
}
});
}
private class XconnectMapListener implements MapEventListener<XconnectKey, Set<PortNumber>> {
@Override
public void event(MapEvent<XconnectKey, Set<PortNumber>> event) {
XconnectKey key = event.key();
Versioned<Set<PortNumber>> ports = event.newValue();
Versioned<Set<PortNumber>> oldPorts = event.oldValue();
switch (event.type()) {
case INSERT:
populateXConnect(key, ports.value());
break;
case UPDATE:
updateXConnect(key, oldPorts.value(), ports.value());
break;
case REMOVE:
revokeXConnect(key, oldPorts.value());
break;
default:
break;
}
}
}
private class InternalDeviceListener implements DeviceListener {
@Override
public void event(DeviceEvent event) {
deviceEventExecutor.execute(() -> {
DeviceId deviceId = event.subject().id();
if (!mastershipService.isLocalMaster(deviceId)) {
return;
}
switch (event.type()) {
case DEVICE_ADDED:
case DEVICE_AVAILABILITY_CHANGED:
case DEVICE_UPDATED:
if (deviceService.isAvailable(deviceId)) {
init(deviceId);
} else {
cleanup(deviceId);
}
break;
default:
break;
}
});
}
}
private void init(DeviceId deviceId) {
getXconnects().stream()
.filter(desc -> desc.key().deviceId().equals(deviceId))
.forEach(desc -> populateXConnect(desc.key(), desc.ports()));
}
private void cleanup(DeviceId deviceId) {
xconnectNextObjStore.entrySet().stream()
.filter(entry -> entry.getKey().deviceId().equals(deviceId))
.forEach(entry -> xconnectNextObjStore.remove(entry.getKey()));
log.debug("{} is removed from xConnectNextObjStore", deviceId);
}
/**
* Populates XConnect groups and flows for given key.
*
* @param key XConnect key
* @param ports a set of ports to be cross-connected
*/
private void populateXConnect(XconnectKey key, Set<PortNumber> ports) {
if (!mastershipService.isLocalMaster(key.deviceId())) {
log.info("Abort populating XConnect {}: {}", key, ERROR_NOT_MASTER);
return;
}
ports = addPairPort(key.deviceId(), ports);
populateFilter(key, ports);
populateFwd(key, populateNext(key, ports));
populateAcl(key);
}
/**
* Populates filtering objectives for given XConnect.
*
* @param key XConnect store key
* @param ports XConnect ports
*/
private void populateFilter(XconnectKey key, Set<PortNumber> ports) {
ports.forEach(port -> {
FilteringObjective.Builder filtObjBuilder = filterObjBuilder(key, port);
ObjectiveContext context = new DefaultObjectiveContext(
(objective) -> log.debug("XConnect FilterObj for {} on port {} populated",
key, port),
(objective, error) ->
log.warn("Failed to populate XConnect FilterObj for {} on port {}: {}",
key, port, error));
flowObjectiveService.filter(key.deviceId(), filtObjBuilder.add(context));
});
}
/**
* Populates next objectives for given XConnect.
*
* @param key XConnect store key
* @param ports XConnect ports
*/
private int populateNext(XconnectKey key, Set<PortNumber> ports) {
if (xconnectNextObjStore.containsKey(key)) {
int nextId = xconnectNextObjStore.get(key).value();
log.debug("NextObj for {} found, id={}", key, nextId);
return nextId;
} else {
NextObjective.Builder nextObjBuilder = nextObjBuilder(key, ports);
ObjectiveContext nextContext = new DefaultObjectiveContext(
// To serialize this with kryo
(Serializable & Consumer<Objective>) (objective) ->
log.debug("XConnect NextObj for {} added", key),
(Serializable & BiConsumer<Objective, ObjectiveError>) (objective, error) -> {
log.warn("Failed to add XConnect NextObj for {}: {}", key, error);
srService.invalidateNextObj(objective.id());
});
NextObjective nextObj = nextObjBuilder.add(nextContext);
flowObjectiveService.next(key.deviceId(), nextObj);
xconnectNextObjStore.put(key, nextObj.id());
log.debug("NextObj for {} not found. Creating new NextObj with id={}", key, nextObj.id());
return nextObj.id();
}
}
/**
* Populates bridging forwarding objectives for given XConnect.
*
* @param key XConnect store key
* @param nextId next objective id
*/
private void populateFwd(XconnectKey key, int nextId) {
ForwardingObjective.Builder fwdObjBuilder = fwdObjBuilder(key, nextId);
ObjectiveContext fwdContext = new DefaultObjectiveContext(
(objective) -> log.debug("XConnect FwdObj for {} populated", key),
(objective, error) ->
log.warn("Failed to populate XConnect FwdObj for {}: {}", key, error));
flowObjectiveService.forward(key.deviceId(), fwdObjBuilder.add(fwdContext));
}
/**
* Populates ACL forwarding objectives for given XConnect.
*
* @param key XConnect store key
*/
private void populateAcl(XconnectKey key) {
ForwardingObjective.Builder aclObjBuilder = aclObjBuilder(key.vlanId());
ObjectiveContext aclContext = new DefaultObjectiveContext(
(objective) -> log.debug("XConnect AclObj for {} populated", key),
(objective, error) ->
log.warn("Failed to populate XConnect AclObj for {}: {}", key, error));
flowObjectiveService.forward(key.deviceId(), aclObjBuilder.add(aclContext));
}
/**
* Revokes XConnect groups and flows for given key.
*
* @param key XConnect key
* @param ports XConnect ports
*/
private void revokeXConnect(XconnectKey key, Set<PortNumber> ports) {
if (!mastershipService.isLocalMaster(key.deviceId())) {
log.info("Abort populating XConnect {}: {}", key, ERROR_NOT_MASTER);
return;
}
ports = addPairPort(key.deviceId(), ports);
revokeFilter(key, ports);
if (xconnectNextObjStore.containsKey(key)) {
int nextId = xconnectNextObjStore.get(key).value();
revokeFwd(key, nextId, null);
revokeNext(key, ports, nextId, null);
} else {
log.warn("NextObj for {} does not exist in the store.", key);
}
revokeAcl(key);
}
/**
* Revokes filtering objectives for given XConnect.
*
* @param key XConnect store key
* @param ports XConnect ports
*/
private void revokeFilter(XconnectKey key, Set<PortNumber> ports) {
ports.forEach(port -> {
FilteringObjective.Builder filtObjBuilder = filterObjBuilder(key, port);
ObjectiveContext context = new DefaultObjectiveContext(
(objective) -> log.debug("XConnect FilterObj for {} on port {} revoked",
key, port),
(objective, error) ->
log.warn("Failed to revoke XConnect FilterObj for {} on port {}: {}",
key, port, error));
flowObjectiveService.filter(key.deviceId(), filtObjBuilder.remove(context));
});
}
/**
* Revokes next objectives for given XConnect.
*
* @param key XConnect store key
* @param ports ports in the XConnect
* @param nextId next objective id
* @param nextFuture completable future for this next objective operation
*/
private void revokeNext(XconnectKey key, Set<PortNumber> ports, int nextId,
CompletableFuture<ObjectiveError> nextFuture) {
ObjectiveContext context = new ObjectiveContext() {
@Override
public void onSuccess(Objective objective) {
log.debug("Previous NextObj for {} removed", key);
if (nextFuture != null) {
nextFuture.complete(null);
}
}
@Override
public void onError(Objective objective, ObjectiveError error) {
log.warn("Failed to remove previous NextObj for {}: {}", key, error);
if (nextFuture != null) {
nextFuture.complete(error);
}
srService.invalidateNextObj(objective.id());
}
};
flowObjectiveService.next(key.deviceId(), nextObjBuilder(key, ports, nextId).remove(context));
xconnectNextObjStore.remove(key);
}
/**
* Revokes bridging forwarding objectives for given XConnect.
*
* @param key XConnect store key
* @param nextId next objective id
* @param fwdFuture completable future for this forwarding objective operation
*/
private void revokeFwd(XconnectKey key, int nextId, CompletableFuture<ObjectiveError> fwdFuture) {
ForwardingObjective.Builder fwdObjBuilder = fwdObjBuilder(key, nextId);
ObjectiveContext context = new ObjectiveContext() {
@Override
public void onSuccess(Objective objective) {
log.debug("Previous FwdObj for {} removed", key);
if (fwdFuture != null) {
fwdFuture.complete(null);
}
}
@Override
public void onError(Objective objective, ObjectiveError error) {
log.warn("Failed to remove previous FwdObj for {}: {}", key, error);
if (fwdFuture != null) {
fwdFuture.complete(error);
}
}
};
flowObjectiveService.forward(key.deviceId(), fwdObjBuilder.remove(context));
}
/**
* Revokes ACL forwarding objectives for given XConnect.
*
* @param key XConnect store key
*/
private void revokeAcl(XconnectKey key) {
ForwardingObjective.Builder aclObjBuilder = aclObjBuilder(key.vlanId());
ObjectiveContext aclContext = new DefaultObjectiveContext(
(objective) -> log.debug("XConnect AclObj for {} populated", key),
(objective, error) ->
log.warn("Failed to populate XConnect AclObj for {}: {}", key, error));
flowObjectiveService.forward(key.deviceId(), aclObjBuilder.remove(aclContext));
}
/**
* Updates XConnect groups and flows for given key.
*
* @param key XConnect key
* @param prevPorts previous XConnect ports
* @param ports new XConnect ports
*/
private void updateXConnect(XconnectKey key, Set<PortNumber> prevPorts,
Set<PortNumber> ports) {
// NOTE: ACL flow doesn't include port information. No need to update it.
// Pair port is built-in and thus not going to change. No need to update it.
// remove old filter
prevPorts.stream().filter(port -> !ports.contains(port)).forEach(port ->
revokeFilter(key, ImmutableSet.of(port)));
// install new filter
ports.stream().filter(port -> !prevPorts.contains(port)).forEach(port ->
populateFilter(key, ImmutableSet.of(port)));
CompletableFuture<ObjectiveError> fwdFuture = new CompletableFuture<>();
CompletableFuture<ObjectiveError> nextFuture = new CompletableFuture<>();
if (xconnectNextObjStore.containsKey(key)) {
int nextId = xconnectNextObjStore.get(key).value();
revokeFwd(key, nextId, fwdFuture);
fwdFuture.thenAcceptAsync(fwdStatus -> {
if (fwdStatus == null) {
log.debug("Fwd removed. Now remove group {}", key);
revokeNext(key, prevPorts, nextId, nextFuture);
}
});
nextFuture.thenAcceptAsync(nextStatus -> {
if (nextStatus == null) {
log.debug("Installing new group and flow for {}", key);
populateFwd(key, populateNext(key, ports));
}
});
} else {
log.warn("NextObj for {} does not exist in the store.", key);
}
}
/**
* Creates a next objective builder for XConnect with given nextId.
*
* @param key XConnect key
* @param ports set of XConnect ports
* @param nextId next objective id
* @return next objective builder
*/
private NextObjective.Builder nextObjBuilder(XconnectKey key, Set<PortNumber> ports, int nextId) {
TrafficSelector metadata =
DefaultTrafficSelector.builder().matchVlanId(key.vlanId()).build();
NextObjective.Builder nextObjBuilder = DefaultNextObjective
.builder().withId(nextId)
.withType(NextObjective.Type.BROADCAST).fromApp(appId)
.withMeta(metadata);
ports.forEach(port -> {
TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder();
tBuilder.setOutput(port);
nextObjBuilder.addTreatment(tBuilder.build());
});
return nextObjBuilder;
}
/**
* Creates a next objective builder for XConnect.
*
* @param key XConnect key
* @param ports set of XConnect ports
* @return next objective builder
*/
private NextObjective.Builder nextObjBuilder(XconnectKey key, Set<PortNumber> ports) {
int nextId = flowObjectiveService.allocateNextId();
return nextObjBuilder(key, ports, nextId);
}
/**
* Creates a bridging forwarding objective builder for XConnect.
*
* @param key XConnect key
* @param nextId next ID of the broadcast group for this XConnect key
* @return forwarding objective builder
*/
private ForwardingObjective.Builder fwdObjBuilder(XconnectKey key, int nextId) {
/*
* Driver should treat objectives with MacAddress.NONE and !VlanId.NONE
* as the VLAN cross-connect broadcast rules
*/
TrafficSelector.Builder sbuilder = DefaultTrafficSelector.builder();
sbuilder.matchVlanId(key.vlanId());
sbuilder.matchEthDst(MacAddress.NONE);
ForwardingObjective.Builder fob = DefaultForwardingObjective.builder();
fob.withFlag(ForwardingObjective.Flag.SPECIFIC)
.withSelector(sbuilder.build())
.nextStep(nextId)
.withPriority(XCONNECT_PRIORITY)
.fromApp(appId)
.makePermanent();
return fob;
}
/**
* Creates an ACL forwarding objective builder for XConnect.
*
* @param vlanId cross connect VLAN id
* @return forwarding objective builder
*/
private ForwardingObjective.Builder aclObjBuilder(VlanId vlanId) {
TrafficSelector.Builder sbuilder = DefaultTrafficSelector.builder();
sbuilder.matchVlanId(vlanId);
TrafficTreatment.Builder tbuilder = DefaultTrafficTreatment.builder();
ForwardingObjective.Builder fob = DefaultForwardingObjective.builder();
fob.withFlag(ForwardingObjective.Flag.VERSATILE)
.withSelector(sbuilder.build())
.withTreatment(tbuilder.build())
.withPriority(XCONNECT_ACL_PRIORITY)
.fromApp(appId)
.makePermanent();
return fob;
}
/**
* Creates a filtering objective builder for XConnect.
*
* @param key XConnect key
* @param port XConnect ports
* @return next objective builder
*/
private FilteringObjective.Builder filterObjBuilder(XconnectKey key, PortNumber port) {
FilteringObjective.Builder fob = DefaultFilteringObjective.builder();
fob.withKey(Criteria.matchInPort(port))
.addCondition(Criteria.matchVlanId(key.vlanId()))
.addCondition(Criteria.matchEthDst(MacAddress.NONE))
.withPriority(XCONNECT_PRIORITY);
return fob.permit().fromApp(appId);
}
/**
* Add pair port to the given set of port.
*
* @param deviceId device Id
* @param ports ports specified in the xconnect config
* @return port specified in the xconnect config plus the pair port (if configured)
*/
private Set<PortNumber> addPairPort(DeviceId deviceId, Set<PortNumber> ports) {
if (srService == null) {
return ports;
}
Set<PortNumber> newPorts = Sets.newHashSet(ports);
srService.getPairLocalPort(deviceId).ifPresent(newPorts::add);
return newPorts;
}
}