netty4 OpenFlow southbound
- separate I/O thread and message dispatch threads
Change-Id: I11a10a47de451a9e3063b62f9450be19c3a9dae7
diff --git a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OFChannelHandler.java b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OFChannelHandler.java
index 100203f..b2d77cf 100644
--- a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OFChannelHandler.java
+++ b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OFChannelHandler.java
@@ -16,24 +16,29 @@
package org.onosproject.openflow.controller.impl;
+import static org.onlab.util.Tools.groupedThreads;
+
import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
import java.nio.channels.ClosedChannelException;
+import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Deque;
import java.util.List;
import java.util.Optional;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
-
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelStateEvent;
-import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.handler.timeout.IdleStateAwareChannelHandler;
-import org.jboss.netty.handler.timeout.IdleStateEvent;
-import org.jboss.netty.handler.timeout.ReadTimeoutException;
+import org.onlab.packet.IpAddress;
import org.onosproject.openflow.controller.Dpid;
+import org.onosproject.openflow.controller.OpenFlowSession;
import org.onosproject.openflow.controller.driver.OpenFlowSwitchDriver;
import org.onosproject.openflow.controller.driver.SwitchStateException;
import org.projectfloodlight.openflow.exceptions.OFParseError;
@@ -78,11 +83,20 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.handler.timeout.IdleStateEvent;
+import io.netty.handler.timeout.ReadTimeoutException;
+import io.netty.util.ReferenceCountUtil;
+
/**
* Channel handler deals with the switch connection and dispatches
* switch messages to the appropriate locations.
*/
-class OFChannelHandler extends IdleStateAwareChannelHandler {
+class OFChannelHandler extends ChannelInboundHandlerAdapter
+ implements OpenFlowSession {
+
private static final Logger log = LoggerFactory.getLogger(OFChannelHandler.class);
private static final String RESET_BY_PEER = "Connection reset by peer";
@@ -91,7 +105,11 @@
private final Controller controller;
private OpenFlowSwitchDriver sw;
private long thisdpid; // channelHandler cached value of connected switch id
+
private Channel channel;
+ private String channelId;
+
+
// State needs to be volatile because the HandshakeTimeoutHandler
// needs to check if the handshake is complete
private volatile ChannelState state;
@@ -121,6 +139,7 @@
//Indicates the openflow version used by this switch
protected OFVersion ofVersion;
+ protected OFFactory factory;
// deprecated in 1.10.0
@Deprecated
@@ -135,11 +154,43 @@
*/
private int handshakeTransactionIds = -1;
+
+
+ private static final int MSG_READ_BUFFER = 5000;
+
+ /**
+ * OFMessage dispatch queue.
+ */
+ private final BlockingQueue<OFMessage> dispatchQueue =
+ new LinkedBlockingQueue<>(MSG_READ_BUFFER);
+
+ /**
+ * Single thread executor for OFMessage dispatching.
+ *
+ * Gets initialized on channelActive, shutdown on channelInactive.
+ */
+ private ExecutorService dispatcher;
+
+ /**
+ * Handle for dispatcher thread.
+ * <p>
+ * Should only be touched from the Channel I/O thread
+ */
+ private Future<?> dispatcherHandle = CompletableFuture.completedFuture(null);
+
+ /**
+ * Dispatch backlog.
+ * <p>
+ * Should only be touched from the Channel I/O thread
+ */
+ private final Deque<OFMessage> dispatchBacklog = new ArrayDeque<>();
+
/**
* Create a new unconnected OFChannelHandler.
* @param controller parent controller
*/
OFChannelHandler(Controller controller) {
+
this.controller = controller;
this.state = ChannelState.INIT;
this.pendingPortStatusMsg = new CopyOnWriteArrayList<>();
@@ -208,30 +259,33 @@
if (m.getVersion().getWireVersion() >= OFVersion.OF_13.getWireVersion()) {
log.debug("Received {} Hello from {} - switching to OF "
+ "version 1.3+", m.getVersion(),
- h.channel.getRemoteAddress());
+ h.channel.remoteAddress());
h.ofVersion = m.getVersion();
+ h.factory = OFFactories.getFactory(h.ofVersion);
h.sendHandshakeHelloMessage();
} else if (m.getVersion().getWireVersion() >= OFVersion.OF_10.getWireVersion()) {
log.debug("Received {} Hello from {} - switching to OF "
+ "version 1.0", m.getVersion(),
- h.channel.getRemoteAddress());
+ h.channel.remoteAddress());
h.ofVersion = m.getVersion();
+ h.factory = OFFactories.getFactory(h.ofVersion);
OFHello hi =
- OFFactories.getFactory(h.ofVersion).buildHello()
+ h.factory.buildHello()
.setXid(h.handshakeTransactionIds--)
.build();
- h.channel.write(Collections.singletonList(hi));
+ h.channel.writeAndFlush(Collections.singletonList(hi));
} else {
log.error("Received Hello of version {} from switch at {}. "
+ "This controller works with OF1.0 and OF1.3 "
+ "switches. Disconnecting switch ...",
- m.getVersion(), h.channel.getRemoteAddress());
+ m.getVersion(), h.channel.remoteAddress());
h.channel.disconnect();
return;
}
h.sendHandshakeFeaturesRequestMessage();
h.setState(WAIT_FEATURES_REPLY);
}
+
@Override
void processOFFeaturesReply(OFChannelHandler h, OFFeaturesReply m)
throws IOException, SwitchStateException {
@@ -313,7 +367,7 @@
if (m.getStatsType() != OFStatsType.PORT_DESC) {
log.warn("Expecting port description stats but received stats "
+ "type {} from {}. Ignoring ...", m.getStatsType(),
- h.channel.getRemoteAddress());
+ h.channel.remoteAddress());
return;
}
if (m.getFlags().contains(OFStatsReplyFlags.REPLY_MORE)) {
@@ -465,22 +519,21 @@
if (m.getStatsType() != OFStatsType.DESC) {
log.warn("Expecting Description stats but received stats "
+ "type {} from {}. Ignoring ...", m.getStatsType(),
- h.channel.getRemoteAddress());
+ h.channel.remoteAddress());
return;
}
OFDescStatsReply drep = (OFDescStatsReply) m;
log.info("Received switch description reply {} from switch at {}",
- drep, h.channel.getRemoteAddress());
+ drep, h.channel.remoteAddress());
// Here is where we differentiate between different kinds of switches
h.sw = h.controller.getOFSwitchInstance(h.thisdpid, drep, h.ofVersion);
h.sw.setOFVersion(h.ofVersion);
h.sw.setFeaturesReply(h.featuresReply);
- //h.sw.setPortDescReply(h.portDescReply);
h.sw.setPortDescReplies(h.portDescReplies);
h.sw.setMeterFeaturesReply(h.meterFeaturesReply);
h.sw.setConnected(true);
- h.sw.setChannel(h.channel);
+ h.sw.setChannel(h);
// boolean success = h.sw.connectSwitch();
//
// if (!success) {
@@ -1072,7 +1125,7 @@
throws IOException, SwitchStateException {
// we only expect hello in the WAIT_HELLO state
log.warn("Received Hello outside WAIT_HELLO state; switch {} is not complaint.",
- h.channel.getRemoteAddress());
+ h.channel.remoteAddress());
}
void processOFBarrierReply(OFChannelHandler h, OFBarrierReply m)
@@ -1084,16 +1137,15 @@
throws IOException {
if (h.ofVersion == null) {
log.error("No OF version set for {}. Not sending Echo REPLY",
- h.channel.getRemoteAddress());
+ h.channel.remoteAddress());
return;
}
- OFFactory factory = OFFactories.getFactory(h.ofVersion);
- OFEchoReply reply = factory
- .buildEchoReply()
- .setXid(m.getXid())
- .setData(m.getData())
- .build();
- h.channel.write(Collections.singletonList(reply));
+ OFEchoReply reply = h.factory
+ .buildEchoReply()
+ .setXid(m.getXid())
+ .setData(m.getData())
+ .build();
+ h.channel.writeAndFlush(Collections.singletonList(reply));
}
void processOFEchoReply(OFChannelHandler h, OFEchoReply m)
@@ -1175,11 +1227,28 @@
//*************************
@Override
- public void channelConnected(ChannelHandlerContext ctx,
- ChannelStateEvent e) throws Exception {
- channel = e.getChannel();
+ public void channelActive(ChannelHandlerContext ctx)
+ throws Exception {
+
+ channel = ctx.channel();
log.info("New switch connection from {}",
- channel.getRemoteAddress());
+ channel.remoteAddress());
+
+ SocketAddress address = channel.remoteAddress();
+ if (address instanceof InetSocketAddress) {
+ final InetSocketAddress inetAddress = (InetSocketAddress) address;
+ final IpAddress ipAddress = IpAddress.valueOf(inetAddress.getAddress());
+ if (ipAddress.isIp4()) {
+ channelId = ipAddress.toString() + ':' + inetAddress.getPort();
+ } else {
+ channelId = '[' + ipAddress.toString() + "]:" + inetAddress.getPort();
+ }
+ } else {
+ channelId = channel.toString();
+ }
+
+ dispatcher = Executors.newSingleThreadExecutor(groupedThreads("onos/of/dispatcher", channelId, log));
+
/*
hack to wait for the switch to tell us what it's
max version is. This is not spec compliant and should
@@ -1190,81 +1259,89 @@
}
@Override
- public void channelDisconnected(ChannelHandlerContext ctx,
- ChannelStateEvent e) throws Exception {
+ public void channelInactive(ChannelHandlerContext ctx)
+ throws Exception {
+ // Netty callback for a closed switch connection: stop the dispatcher, then drop per-switch state.
log.info("Switch disconnected callback for sw:{}. Cleaning up ...",
- getSwitchInfoString());
- if (thisdpid != 0) {
- if (!duplicateDpidFound) {
- // if the disconnected switch (on this ChannelHandler)
- // was not one with a duplicate-dpid, it is safe to remove all
- // state for it at the controller. Notice that if the disconnected
- // switch was a duplicate-dpid, calling the method below would clear
- // all state for the original switch (with the same dpid),
- // which we obviously don't want.
- log.info("{}:removal called", getSwitchInfoString());
- if (sw != null) {
- sw.removeConnectedSwitch();
- }
- } else {
- // A duplicate was disconnected on this ChannelHandler,
- // this is the same switch reconnecting, but the original state was
- // not cleaned up - XXX check liveness of original ChannelHandler
- log.info("{}:duplicate found", getSwitchInfoString());
- duplicateDpidFound = Boolean.FALSE;
- }
- } else {
- log.warn("no dpid in channelHandler registered for "
- + "disconnected switch {}", getSwitchInfoString());
+ getSwitchInfoString());
+ // shutdownNow() interrupts the dispatch loop parked in take(); plain shutdown() would leave that thread alive forever
+ if (dispatcher != null) {
+ dispatcher.shutdownNow();
}
+
+ if (thisdpid != 0) {
+ if (!duplicateDpidFound) {
+ // if the disconnected switch (on this ChannelHandler)
+ // was not one with a duplicate-dpid, it is safe to remove all
+ // state for it at the controller. Notice that if the disconnected
+ // switch was a duplicate-dpid, calling the method below would clear
+ // all state for the original switch (with the same dpid),
+ // which we obviously don't want.
+ log.info("{}:removal called", getSwitchInfoString());
+ if (sw != null) {
+ sw.removeConnectedSwitch();
+ }
+ } else {
+ // A duplicate was disconnected on this ChannelHandler,
+ // this is the same switch reconnecting, but the original state was
+ // not cleaned up - XXX check liveness of original ChannelHandler
+ log.info("{}:duplicate found", getSwitchInfoString());
+ duplicateDpidFound = Boolean.FALSE;
+ }
+ } else {
+ log.warn("no dpid in channelHandler registered for "
+ + "disconnected switch {}", getSwitchInfoString());
+ }
}
@Override
- public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
+ public void exceptionCaught(ChannelHandlerContext ctx,
+ Throwable cause)
throws Exception {
- if (e.getCause() instanceof ReadTimeoutException) {
+ // Classifies the failure, logs it, and closes the channel unless it is already closed or the error is a rejected task.
+ if (cause instanceof ReadTimeoutException) {
// switch timeout
log.error("Disconnecting switch {} due to read timeout",
getSwitchInfoString());
- ctx.getChannel().close();
- } else if (e.getCause() instanceof HandshakeTimeoutException) {
+ ctx.channel().close();
+ } else if (cause instanceof HandshakeTimeoutException) {
log.error("Disconnecting switch {}: failed to complete handshake",
getSwitchInfoString());
- ctx.getChannel().close();
- } else if (e.getCause() instanceof ClosedChannelException) {
+ ctx.channel().close();
+ } else if (cause instanceof ClosedChannelException) {
log.debug("Channel for sw {} already closed", getSwitchInfoString());
- } else if (e.getCause() instanceof IOException) {
- if (!e.getCause().getMessage().equals(RESET_BY_PEER) &&
- !e.getCause().getMessage().equals(BROKEN_PIPE)) {
+ } else if (cause instanceof IOException) {
+ if (!RESET_BY_PEER.equals(cause.getMessage()) &&
+ !BROKEN_PIPE.equals(cause.getMessage())) {
log.error("Disconnecting switch {} due to IO Error: {}",
- getSwitchInfoString(), e.getCause().getMessage());
+ getSwitchInfoString(), cause.getMessage());
if (log.isDebugEnabled()) {
// still print stack trace if debug is enabled
- log.debug("StackTrace for previous Exception: ", e.getCause());
+ log.debug("StackTrace for previous Exception: ", cause);
}
}
- ctx.getChannel().close();
- } else if (e.getCause() instanceof SwitchStateException) {
+ ctx.channel().close();
+ } else if (cause instanceof SwitchStateException) {
log.error("Disconnecting switch {} due to switch state error: {}",
- getSwitchInfoString(), e.getCause().getMessage());
+ getSwitchInfoString(), cause.getMessage());
if (log.isDebugEnabled()) {
// still print stack trace if debug is enabled
- log.debug("StackTrace for previous Exception: ", e.getCause());
+ log.debug("StackTrace for previous Exception: ", cause);
}
- ctx.getChannel().close();
- } else if (e.getCause() instanceof OFParseError) {
+ ctx.channel().close();
+ } else if (cause instanceof OFParseError) {
log.error("Disconnecting switch "
+ getSwitchInfoString() +
" due to message parse failure",
- e.getCause());
- ctx.getChannel().close();
- } else if (e.getCause() instanceof RejectedExecutionException) {
+ cause);
+ ctx.channel().close();
+ } else if (cause instanceof RejectedExecutionException) {
log.warn("Could not process message: queue full");
} else {
log.error("Error while processing message from switch "
+ getSwitchInfoString()
- + "state " + this.state, e.getCause());
- ctx.getChannel().close();
+ + "state " + this.state, cause);
+ ctx.channel().close();
}
}
@@ -1273,38 +1350,54 @@
return getSwitchInfoString();
}
- @Override
- public void channelIdle(ChannelHandlerContext ctx, IdleStateEvent e)
+ protected void channelIdle(ChannelHandlerContext ctx,
+ IdleStateEvent e)
throws Exception {
- OFFactory factory = OFFactories.getFactory(ofVersion);
OFMessage m = factory.buildEchoRequest().build();
log.debug("Sending Echo Request on idle channel: {}",
- e.getChannel().getPipeline().getLast());
- e.getChannel().write(Collections.singletonList(m));
+ ctx.channel());
+ ctx.writeAndFlush(Collections.singletonList(m), ctx.voidPromise()); // write() alone only queues; flush so the echo is sent
// XXX S some problems here -- echo request has no transaction id, and
// echo reply is not correlated to the echo request.
state.processIdle(this);
}
@Override
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
+ public void userEventTriggered(ChannelHandlerContext ctx,
+ Object evt)
throws Exception {
- if (e.getMessage() instanceof List) {
- @SuppressWarnings("unchecked")
- List<OFMessage> msglist = (List<OFMessage>) e.getMessage();
+ if (evt instanceof IdleStateEvent) {
+ channelIdle(ctx, (IdleStateEvent) evt);
+ }
- for (OFMessage ofm : msglist) {
- // Do the actual packet processing
- state.processOFMessage(this, ofm);
+ super.userEventTriggered(ctx, evt);
+ }
+
+ // Same logic as SimpleChannelInboundHandler, inlined to avoid a dependency on TypeParameterMatcher
+ @Override
+ public void channelRead(ChannelHandlerContext ctx,
+ Object msg) throws Exception {
+
+ boolean release = true;
+ try {
+ if (msg instanceof OFMessage) {
+ // channelRead0 inlined
+ state.processOFMessage(this, (OFMessage) msg);
+ } else {
+ release = false;
+ ctx.fireChannelRead(msg);
}
- } else {
- state.processOFMessage(this, (OFMessage) e.getMessage());
+ } finally {
+ if (release) {
+ ReferenceCountUtil.release(msg);
+ }
}
}
+
//*************************
// Channel utility methods
//*************************
@@ -1318,7 +1411,63 @@
}
private void dispatchMessage(OFMessage m) {
- sw.handleMessage(m);
+ // Hands m to the dispatcher thread: bounded queue first, overflow parks in the I/O-thread-only backlog.
+ if (dispatchBacklog.isEmpty()) {
+ if (!dispatchQueue.offer(m)) {
+ // queue full
+ channel.config().setAutoRead(false);
+ // put it on the head of backlog
+ dispatchBacklog.addFirst(m);
+ return;
+ }
+ } else {
+ dispatchBacklog.addLast(m);
+ }
+ // move the backlog into the queue in arrival order; stop and pause reads as soon as the queue is full
+ while (!dispatchBacklog.isEmpty()) {
+ OFMessage msg = dispatchBacklog.pop();
+
+ if (!dispatchQueue.offer(msg)) {
+ // queue full
+ channel.config().setAutoRead(false);
+ // put it back to the head of backlog
+ dispatchBacklog.addFirst(msg);
+ return;
+ }
+ }
+ // everything is queued; make sure a consumer task is running
+
+ if (dispatcherHandle.isDone()) {
+ // dispatcher terminated for some reason, restart
+ // the task loops until interrupted and re-enables autoRead whenever it has drained the queue
+ dispatcherHandle = dispatcher.submit(() -> {
+ try {
+ List<OFMessage> msgs = new ArrayList<>();
+ for (;;) {
+ // wait for new message
+ OFMessage msg = dispatchQueue.take();
+ sw.handleMessage(msg);
+
+ while (dispatchQueue.drainTo(msgs, MSG_READ_BUFFER) > 0) {
+ if (!channel.config().isAutoRead()) {
+ channel.config().setAutoRead(true);
+ }
+ msgs.forEach(sw::handleMessage);
+ msgs.clear();
+ }
+
+ if (!channel.config().isAutoRead()) {
+ channel.config().setAutoRead(true);
+ }
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ // interrupted. gracefully shutting down
+ return;
+ }
+
+ });
+ }
}
/**
@@ -1331,10 +1480,10 @@
return sw.toString();
}
String channelString;
- if (channel == null || channel.getRemoteAddress() == null) {
+ if (channel == null || channel.remoteAddress() == null) {
channelString = "?";
} else {
- channelString = channel.getRemoteAddress().toString();
+ channelString = channel.remoteAddress().toString();
}
String dpidString;
if (featuresReply == null) {
@@ -1378,8 +1527,8 @@
.buildHello()
.setXid(this.handshakeTransactionIds--)
.setElements(Collections.singletonList(hem));
- log.info("Sending {} Hello to {}", version, channel.getRemoteAddress());
- channel.write(Collections.singletonList(mb.build()));
+ log.info("Sending {} Hello to {}", version, channel.remoteAddress());
+ channel.writeAndFlush(Collections.singletonList(mb.build()));
}
/**
@@ -1387,12 +1536,11 @@
* @throws IOException
*/
private void sendHandshakeFeaturesRequestMessage() throws IOException {
- OFFactory factory = OFFactories.getFactory(ofVersion);
- log.debug("Sending FEATURES_REQUEST to {}", channel.getRemoteAddress());
+ log.debug("Sending FEATURES_REQUEST to {}", channel.remoteAddress());
OFMessage m = factory.buildFeaturesRequest()
.setXid(this.handshakeTransactionIds--)
.build();
- channel.write(Collections.singletonList(m));
+ channel.writeAndFlush(Collections.singletonList(m));
}
/**
@@ -1401,8 +1549,7 @@
* @throws IOException
*/
private void sendHandshakeSetConfig() throws IOException {
- OFFactory factory = OFFactories.getFactory(ofVersion);
- log.debug("Sending CONFIG_REQUEST to {}", channel.getRemoteAddress());
+ log.debug("Sending CONFIG_REQUEST to {}", channel.remoteAddress());
List<OFMessage> msglist = new ArrayList<>(3);
// Ensure we receive the full packet via PacketIn
@@ -1431,7 +1578,7 @@
.setXid(this.handshakeTransactionIds--)
.build();
msglist.add(gcr);
- channel.write(msglist);
+ channel.writeAndFlush(msglist);
}
/**
@@ -1440,13 +1587,12 @@
*/
private void sendHandshakeDescriptionStatsRequest() throws IOException {
// Get Description to set switch-specific flags
- OFFactory factory = OFFactories.getFactory(ofVersion);
- log.debug("Sending DESC_STATS_REQUEST to {}", channel.getRemoteAddress());
+ log.debug("Sending DESC_STATS_REQUEST to {}", channel.remoteAddress());
OFDescStatsRequest dreq = factory
.buildDescStatsRequest()
.setXid(handshakeTransactionIds--)
.build();
- channel.write(Collections.singletonList(dreq));
+ channel.writeAndFlush(Collections.singletonList(dreq));
}
/**
@@ -1457,26 +1603,59 @@
private void sendMeterFeaturesRequest() throws IOException {
// Get meter features including the MaxMeters value available for the device
OFFactory factory = OFFactories.getFactory(ofVersion);
- log.debug("Sending METER_FEATURES_REQUEST to {}", channel.getRemoteAddress());
+ log.debug("Sending METER_FEATURES_REQUEST to {}", channel.remoteAddress());
OFMeterFeaturesStatsRequest mfreq = factory
.buildMeterFeaturesStatsRequest()
.setXid(handshakeTransactionIds--)
.build();
- channel.write(Collections.singletonList(mfreq));
+ channel.writeAndFlush(Collections.singletonList(mfreq));
}
private void sendHandshakeOFPortDescRequest() throws IOException {
- log.debug("Sending OF_PORT_DESC_REQUEST to {}", channel.getRemoteAddress());
+ log.debug("Sending OF_PORT_DESC_REQUEST to {}", channel.remoteAddress());
// Get port description for 1.3+ switch
- OFPortDescStatsRequest preq = OFFactories.getFactory(ofVersion)
+ OFPortDescStatsRequest preq = factory
.buildPortDescStatsRequest()
.setXid(handshakeTransactionIds--)
.build();
- channel.write(Collections.singletonList(preq));
+ channel.writeAndFlush(Collections.singletonList(preq));
}
ChannelState getStateForTesting() {
return state;
}
+
+ @Override
+ public boolean isActive() {
+ if (channel != null) {
+ return channel.isActive();
+ }
+ return false;
+ }
+
+ @Override
+ public void closeSession() {
+ if (channel != null) {
+ channel.close();
+ }
+ }
+
+ @Override
+ public boolean sendMsg(Iterable<OFMessage> msgs) {
+ if (isActive()) { // null-safe: channel is unset until channelActive runs
+ channel.writeAndFlush(msgs, channel.voidPromise());
+ return true;
+ } else {
+ log.warn("Dropping messages for switch {} because channel is not connected: {}",
+ getSwitchInfoString(), msgs);
+ return false;
+ }
+ }
+
+ @Override
+ public CharSequence sessionInfo() {
+ return channelId;
+ }
+
}