blob: deac5235d07f84ca7e85cdcdb344697f64b3e52e [file] [log] [blame]
/*
* Copyright 2017-present Open Networking Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.ofagent.impl;
import com.google.common.collect.ImmutableSet;
import io.netty.channel.ChannelOutboundInvoker;
import io.netty.channel.nio.NioEventLoopGroup;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.LeadershipService;
import org.onosproject.cluster.NodeId;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.incubator.net.virtual.NetworkId;
import org.onosproject.incubator.net.virtual.VirtualNetworkEvent;
import org.onosproject.incubator.net.virtual.VirtualNetworkListener;
import org.onosproject.incubator.net.virtual.VirtualNetworkService;
import org.onosproject.incubator.net.virtual.VirtualPort;
import org.onosproject.net.Device;
import org.onosproject.net.DeviceId;
import org.onosproject.net.Port;
import org.onosproject.net.device.DeviceEvent;
import org.onosproject.net.device.DeviceListener;
import org.onosproject.net.device.DeviceService;
import org.onosproject.net.flow.FlowRuleEvent;
import org.onosproject.net.flow.FlowRuleListener;
import org.onosproject.net.flow.FlowRuleService;
import org.onosproject.net.packet.PacketContext;
import org.onosproject.net.packet.PacketProcessor;
import org.onosproject.net.packet.PacketService;
import org.onosproject.ofagent.api.OFAgent;
import org.onosproject.ofagent.api.OFAgentEvent;
import org.onosproject.ofagent.api.OFAgentListener;
import org.onosproject.ofagent.api.OFAgentService;
import org.onosproject.ofagent.api.OFController;
import org.onosproject.ofagent.api.OFSwitch;
import org.onosproject.ofagent.api.OFSwitchCapabilities;
import org.onosproject.ofagent.api.OFSwitchService;
import org.projectfloodlight.openflow.types.DatapathId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkArgument;
import static org.onlab.util.BoundedThreadPool.newSingleThreadExecutor;
import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.ofagent.api.OFAgent.State.STARTED;
import static org.onosproject.ofagent.api.OFAgentService.APPLICATION_NAME;
/**
* Manages OF switches.
*/
@Component(immediate = true)
@Service
public class OFSwitchManager implements OFSwitchService {
private final Logger log = LoggerFactory.getLogger(getClass());
private static final OFSwitchCapabilities DEFAULT_CAPABILITIES =
DefaultOFSwitchCapabilities.builder()
.flowStats()
.tableStats()
.portStats()
.groupStats()
.queueStats()
.ipReasm()
.portBlocked()
.build();
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected CoreService coreService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected LeadershipService leadershipService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected VirtualNetworkService virtualNetService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected OFAgentService ofAgentService;
private final ConcurrentHashMap<DeviceId, OFSwitch> ofSwitchMap = new ConcurrentHashMap<>();
private final ExecutorService eventExecutor = newSingleThreadExecutor(
groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
private final OFAgentListener ofAgentListener = new InternalOFAgentListener();
private final VirtualNetworkListener vNetworkListener = new InternalVirtualNetworkListener();
private final DeviceListener deviceListener = new InternalDeviceListener();
private final FlowRuleListener flowRuleListener = new InternalFlowRuleListener();
private final PacketProcessor packetProcessor = new InternalPacketProcessor();
private NioEventLoopGroup ioWorker;
private ApplicationId appId;
private NodeId localId;
@Activate
protected void activate() {
appId = coreService.registerApplication(APPLICATION_NAME);
localId = clusterService.getLocalNode().id();
ioWorker = new NioEventLoopGroup();
ofAgentService.agents().forEach(this::processOFAgentCreated);
ofAgentService.addListener(ofAgentListener);
virtualNetService.addListener(vNetworkListener);
log.info("Started");
}
@Deactivate
protected void deactivate() {
virtualNetService.removeListener(vNetworkListener);
ofAgentService.removeListener(ofAgentListener);
ofAgentService.agents().forEach(this::processOFAgentStopped);
ioWorker.shutdownGracefully();
eventExecutor.shutdown();
log.info("Stopped");
}
@Override
public Set<OFSwitch> ofSwitches() {
return ImmutableSet.copyOf(ofSwitchMap.values());
}
@Override
public Set<OFSwitch> ofSwitches(NetworkId networkId) {
Set<OFSwitch> ofSwitches = devices(networkId).stream()
.map(ofSwitchMap::get)
.filter(Objects::nonNull)
.collect(Collectors.toSet());
return ImmutableSet.copyOf(ofSwitches);
}
@Override
public Set<Port> ports(NetworkId networkId, DeviceId deviceId) {
Set<Port> ports = virtualNetService.getVirtualPorts(networkId, deviceId)
.stream()
.collect(Collectors.toSet());
return ImmutableSet.copyOf(ports);
}
private void addOFSwitch(NetworkId networkId, DeviceId deviceId) {
OFSwitch ofSwitch = DefaultOFSwitch.of(
dpidWithDeviceId(deviceId),
DEFAULT_CAPABILITIES, networkId, deviceId,
virtualNetService.getServiceDirectory());
ofSwitchMap.put(deviceId, ofSwitch);
log.info("Added virtual OF switch for {}", deviceId);
OFAgent ofAgent = ofAgentService.agent(networkId);
if (ofAgent == null) {
log.error("OFAgent for network {} does not exist", networkId);
return;
}
if (ofAgent.state() == STARTED) {
connectController(ofSwitch, ofAgent.controllers());
}
}
private void deleteOFSwitch(DeviceId deviceId) {
OFSwitch ofSwitch = ofSwitchMap.get(deviceId);
ofSwitch.controllerChannels().forEach(ChannelOutboundInvoker::disconnect);
ofSwitchMap.remove(deviceId);
log.info("Removed virtual OFSwitch for {}", deviceId);
}
private void connectController(OFSwitch ofSwitch, Set<OFController> controllers) {
controllers.forEach(controller -> {
OFConnectionHandler connectionHandler = new OFConnectionHandler(
ofSwitch,
controller,
ioWorker);
connectionHandler.connect();
});
}
private void disconnectController(OFSwitch ofSwitch, Set<OFController> controllers) {
Set<SocketAddress> controllerAddrs = controllers.stream()
.map(ctrl -> new InetSocketAddress(
ctrl.ip().toInetAddress(), ctrl.port().toInt()))
.collect(Collectors.toSet());
ofSwitch.controllerChannels().stream()
.filter(channel -> controllerAddrs.contains(channel.remoteAddress()))
.forEach(ChannelOutboundInvoker::disconnect);
}
private Set<DeviceId> devices(NetworkId networkId) {
Set<DeviceId> deviceIds = virtualNetService.getVirtualDevices(networkId)
.stream()
.map(Device::id)
.collect(Collectors.toSet());
return ImmutableSet.copyOf(deviceIds);
}
private DatapathId dpidWithDeviceId(DeviceId deviceId) {
String strDeviceId = deviceId.toString().split(":")[1];
checkArgument(strDeviceId.length() == 16, "Invalid device ID " + strDeviceId);
String resultedHexString = "";
for (int i = 0; i < 8; i++) {
resultedHexString = resultedHexString + strDeviceId.charAt(2 * i)
+ strDeviceId.charAt(2 * i + 1);
if (i != 7) {
resultedHexString += ":";
}
}
return DatapathId.of(resultedHexString);
}
private void processOFAgentCreated(OFAgent ofAgent) {
devices(ofAgent.networkId()).forEach(deviceId -> {
addOFSwitch(ofAgent.networkId(), deviceId);
});
}
private void processOFAgentRemoved(OFAgent ofAgent) {
devices(ofAgent.networkId()).forEach(this::deleteOFSwitch);
}
private void processOFAgentStarted(OFAgent ofAgent) {
devices(ofAgent.networkId()).forEach(deviceId -> {
OFSwitch ofSwitch = ofSwitchMap.get(deviceId);
if (ofSwitch != null) {
connectController(ofSwitch, ofAgent.controllers());
}
});
DeviceService deviceService = virtualNetService.get(
ofAgent.networkId(),
DeviceService.class);
deviceService.addListener(deviceListener);
PacketService packetService = virtualNetService.get(
ofAgent.networkId(),
PacketService.class);
packetService.addProcessor(packetProcessor, PacketProcessor.director(0));
FlowRuleService flowRuleService = virtualNetService.get(
ofAgent.networkId(),
FlowRuleService.class);
flowRuleService.addListener(flowRuleListener);
}
private void processOFAgentStopped(OFAgent ofAgent) {
devices(ofAgent.networkId()).forEach(deviceId -> {
OFSwitch ofSwitch = ofSwitchMap.get(deviceId);
if (ofSwitch != null) {
disconnectController(ofSwitch, ofAgent.controllers());
}
});
DeviceService deviceService = virtualNetService.get(
ofAgent.networkId(),
DeviceService.class);
deviceService.removeListener(deviceListener);
PacketService packetService = virtualNetService.get(
ofAgent.networkId(),
PacketService.class);
packetService.removeProcessor(packetProcessor);
FlowRuleService flowRuleService = virtualNetService.get(
ofAgent.networkId(),
FlowRuleService.class);
flowRuleService.removeListener(flowRuleListener);
}
private class InternalVirtualNetworkListener implements VirtualNetworkListener {
@Override
public void event(VirtualNetworkEvent event) {
log.trace("Vnet event {}", event);
switch (event.type()) {
case VIRTUAL_DEVICE_ADDED:
eventExecutor.execute(() -> {
log.debug("Virtual device {} added to network {}",
event.virtualDevice().id(),
event.subject());
addOFSwitch(event.subject(), event.virtualDevice().id());
});
break;
case VIRTUAL_DEVICE_UPDATED:
// TODO handle device availability updates
break;
case VIRTUAL_DEVICE_REMOVED:
eventExecutor.execute(() -> {
log.debug("Virtual device {} removed from network {}",
event.virtualDevice().id(),
event.subject());
deleteOFSwitch(event.virtualDevice().id());
});
break;
case NETWORK_UPDATED:
case NETWORK_REMOVED:
case NETWORK_ADDED:
break;
case VIRTUAL_PORT_ADDED:
eventExecutor.execute(() -> {
OFSwitch ofSwitch = ofSwitch(event.virtualPort());
if (ofSwitch != null) {
ofSwitch.processPortAdded(event.virtualPort());
log.debug("Virtual port {} added to network {}",
event.virtualPort(),
event.subject());
}
});
break;
case VIRTUAL_PORT_UPDATED:
break;
case VIRTUAL_PORT_REMOVED:
eventExecutor.execute(() -> {
OFSwitch ofSwitch = ofSwitch(event.virtualPort());
if (ofSwitch != null) {
ofSwitch.processPortRemoved(event.virtualPort());
log.debug("Virtual port {} removed from network {}",
event.virtualPort(),
event.subject());
}
});
break;
default:
// do nothing
break;
}
}
private OFSwitch ofSwitch(VirtualPort virtualPort) {
OFSwitch ofSwitch = ofSwitchMap.get(virtualPort.element().id());
if (ofSwitch == null) {
log.warn("Switch does not exist for port {}", virtualPort);
} else {
log.trace("Switch exists for port {}", virtualPort);
}
return ofSwitch;
}
}
private class InternalOFAgentListener implements OFAgentListener {
@Override
public boolean isRelevant(OFAgentEvent event) {
return Objects.equals(localId, leadershipService.getLeader(appId.name()));
}
@Override
public void event(OFAgentEvent event) {
switch (event.type()) {
case OFAGENT_CREATED:
eventExecutor.execute(() -> {
OFAgent ofAgent = event.subject();
log.debug("Processing OFAgent created: {}", ofAgent);
processOFAgentCreated(ofAgent);
});
break;
case OFAGENT_REMOVED:
eventExecutor.execute(() -> {
OFAgent ofAgent = event.subject();
log.debug("Processing OFAgent removed: {}", ofAgent);
processOFAgentRemoved(ofAgent);
});
break;
case OFAGENT_CONTROLLER_ADDED:
// TODO handle additional controller
break;
case OFAGENT_CONTROLLER_REMOVED:
// TODO handle removed controller
break;
case OFAGENT_STARTED:
eventExecutor.execute(() -> {
OFAgent ofAgent = event.subject();
log.debug("Processing OFAgent started: {}", ofAgent);
processOFAgentStarted(ofAgent);
});
break;
case OFAGENT_STOPPED:
eventExecutor.execute(() -> {
OFAgent ofAgent = event.subject();
log.debug("Processing OFAgent stopped: {}", ofAgent);
processOFAgentStopped(ofAgent);
});
break;
default:
// do nothing
break;
}
}
}
private class InternalDeviceListener implements DeviceListener {
@Override
public void event(DeviceEvent event) {
switch (event.type()) {
case DEVICE_AVAILABILITY_CHANGED:
case DEVICE_ADDED:
case DEVICE_UPDATED:
case DEVICE_REMOVED:
case DEVICE_SUSPENDED:
case PORT_ADDED:
// TODO handle event
case PORT_REMOVED:
// TODO handle event
case PORT_STATS_UPDATED:
case PORT_UPDATED:
default:
break;
}
}
}
private class InternalPacketProcessor implements PacketProcessor {
@Override
public void process(PacketContext context) {
// TODO handle packet-in
}
}
private class InternalFlowRuleListener implements FlowRuleListener {
@Override
public void event(FlowRuleEvent event) {
// TODO handle flow rule event
}
}
}