netty4 OpenFlow southbound
- separate I/O thread and message dispatch threads
Change-Id: I11a10a47de451a9e3063b62f9450be19c3a9dae7
diff --git a/protocols/openflow/ctl/BUCK b/protocols/openflow/ctl/BUCK
index 62f40af..fb7667f 100644
--- a/protocols/openflow/ctl/BUCK
+++ b/protocols/openflow/ctl/BUCK
@@ -1,7 +1,13 @@
COMPILE_DEPS = [
'//lib:CORE_DEPS',
'//lib:openflowj',
- '//protocols/openflow/api:onos-protocols-openflow-api'
+ '//protocols/openflow/api:onos-protocols-openflow-api',
+ '//lib:netty-buffer',
+ '//lib:netty-codec',
+ '//lib:netty-common',
+ '//lib:netty-handler',
+ '//lib:netty-transport',
+ '//lib:netty-transport-native-epoll',
]
TEST_DEPS = [
diff --git a/protocols/openflow/ctl/pom.xml b/protocols/openflow/ctl/pom.xml
index 7dcd69d..baccf7c 100644
--- a/protocols/openflow/ctl/pom.xml
+++ b/protocols/openflow/ctl/pom.xml
@@ -20,29 +20,56 @@
<parent>
<groupId>org.onosproject</groupId>
- <artifactId>onos-of</artifactId>
+ <artifactId>onos-protocols-openflow</artifactId>
<version>1.11.0-SNAPSHOT</version>
</parent>
- <artifactId>onos-of-ctl</artifactId>
+ <artifactId>onos-protocols-openflow-ctl</artifactId>
<packaging>bundle</packaging>
<description>ONOS OpenFlow controller subsystem API</description>
<dependencies>
+
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-transport</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-common</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-all</artifactId>
+ <version>${netty4.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-transport-native-epoll</artifactId>
+ <version>${netty4.version}</version>
+ <classifier>linux-x86_64</classifier>
+ </dependency>
+<!--
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-transport-native-kqueue</artifactId>
+ <version>${netty4.version}</version>
+ </dependency>
+ -->
<dependency>
<groupId>org.onosproject</groupId>
- <artifactId>onos-of-api</artifactId>
+ <artifactId>onos-protocols-openflow-api</artifactId>
+ <version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.onosproject</groupId>
<artifactId>openflowj</artifactId>
</dependency>
<dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty</artifactId>
- </dependency>
- <dependency>
<groupId>org.apache.felix</groupId>
<artifactId>org.apache.felix.scr.annotations</artifactId>
</dependency>
@@ -68,6 +95,38 @@
<groupId>org.onosproject</groupId>
<artifactId>onos-maven-plugin</artifactId>
</plugin>
+
+ <plugin>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>maven-bundle-plugin</artifactId>
+
+ <!--
+ Netty native code related utils seems to assume,
+ Native related loading uses same class loader(~=bundle)
+ Embed inline required slice of netty-common and native-transport
+ -->
+ <configuration>
+ <instructions combine.children="append">
+ <DynamicImport-Package>*</DynamicImport-Package>
+ <Embed-Dependency>
+ netty-transport-native-epoll;inline=true,
+ *;artifactId=netty-common;inline=io/netty/util/internal/*,
+ </Embed-Dependency>
+ <Embed-Transitive>false</Embed-Transitive>
+ <Bundle-NativeCode>
+ META-INF/native/libnetty-transport-native-epoll.so;osname=linux;processor=x86_64,
+ *
+ </Bundle-NativeCode>
+ <Import-Package>
+ sun.misc;resolution:=optional,
+ org.apache.commons.logging;resolution:=optional,
+ org.apache.logging.log4j;resolution:=optional,
+ *
+ </Import-Package>
+ </instructions>
+ </configuration>
+ </plugin>
+
</plugins>
</build>
diff --git a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/Controller.java b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/Controller.java
index b854898..f328ae7 100644
--- a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/Controller.java
+++ b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/Controller.java
@@ -18,11 +18,17 @@
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
-import org.jboss.netty.bootstrap.ServerBootstrap;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.group.ChannelGroup;
-import org.jboss.netty.channel.group.DefaultChannelGroup;
-import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.epoll.EpollServerSocketChannel;
+import io.netty.channel.group.ChannelGroup;
+import io.netty.channel.group.DefaultChannelGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.util.concurrent.GlobalEventExecutor;
import org.onlab.util.ItemNotFoundException;
import org.onosproject.net.DeviceId;
@@ -46,13 +52,11 @@
import java.io.FileInputStream;
import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean;
-import java.net.InetSocketAddress;
import java.security.KeyStore;
import java.util.Dictionary;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -96,7 +100,8 @@
private OpenFlowAgent agent;
- private NioServerSocketChannelFactory execFactory;
+ private EventLoopGroup bossGroup;
+ private EventLoopGroup workerGroup;
protected String ksLocation;
protected String tsLocation;
@@ -144,20 +149,19 @@
try {
final ServerBootstrap bootstrap = createServerBootStrap();
+ bootstrap.option(ChannelOption.SO_REUSEADDR, true);
+ bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
+ bootstrap.childOption(ChannelOption.TCP_NODELAY, true);
+ bootstrap.childOption(ChannelOption.SO_SNDBUF, Controller.SEND_BUFFER_SIZE);
+// bootstrap.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK,
+// new WriteBufferWaterMark(8 * 1024, 32 * 1024));
- bootstrap.setOption("reuseAddr", true);
- bootstrap.setOption("child.keepAlive", true);
- bootstrap.setOption("child.tcpNoDelay", true);
- bootstrap.setOption("child.sendBufferSize", Controller.SEND_BUFFER_SIZE);
+ bootstrap.childHandler(new OFChannelInitializer(this, null, sslContext));
- ChannelPipelineFactory pfact =
- new OpenflowPipelineFactory(this, null, sslContext);
- bootstrap.setPipelineFactory(pfact);
- cg = new DefaultChannelGroup();
openFlowPorts.forEach(port -> {
- InetSocketAddress sa = new InetSocketAddress(port);
- cg.add(bootstrap.bind(sa));
- log.info("Listening for switch connections on {}", sa);
+ // TODO revisit if this is best way to listen to multiple ports
+ cg.add(bootstrap.bind(port).syncUninterruptibly().channel());
+ log.info("Listening for switch connections on {}", port);
});
} catch (Exception e) {
@@ -168,20 +172,43 @@
private ServerBootstrap createServerBootStrap() {
- if (workerThreads == 0) {
- execFactory = new NioServerSocketChannelFactory(
- Executors.newCachedThreadPool(groupedThreads("onos/of", "boss-%d", log)),
- Executors.newCachedThreadPool(groupedThreads("onos/of", "worker-%d", log)));
- return new ServerBootstrap(execFactory);
- } else {
- execFactory = new NioServerSocketChannelFactory(
- Executors.newCachedThreadPool(groupedThreads("onos/of", "boss-%d", log)),
- Executors.newCachedThreadPool(groupedThreads("onos/of", "worker-%d", log)), workerThreads);
- return new ServerBootstrap(execFactory);
+ int bossThreads = Math.max(1, openFlowPorts.size());
+ try {
+ bossGroup = new EpollEventLoopGroup(bossThreads, groupedThreads("onos/of", "boss-%d", log));
+ workerGroup = new EpollEventLoopGroup(workerThreads, groupedThreads("onos/of", "worker-%d", log));
+ ServerBootstrap bs = new ServerBootstrap()
+ .group(bossGroup, workerGroup)
+ .channel(EpollServerSocketChannel.class);
+ log.info("Using Epoll transport");
+ return bs;
+ } catch (Throwable e) {
+ log.debug("Failed to initialize native (epoll) transport: {}", e.getMessage());
}
+
+// Requires 4.1.11 or later
+// try {
+// bossGroup = new KQueueEventLoopGroup(bossThreads, groupedThreads("onos/of", "boss-%d", log));
+// workerGroup = new KQueueEventLoopGroup(workerThreads, groupedThreads("onos/of", "worker-%d", log));
+// ServerBootstrap bs = new ServerBootstrap()
+// .group(bossGroup, workerGroup)
+// .channel(KQueueServerSocketChannel.class);
+// log.info("Using Kqueue transport");
+// return bs;
+// } catch (Throwable e) {
+// log.debug("Failed to initialize native (kqueue) transport. ", e.getMessage());
+// }
+
+ bossGroup = new NioEventLoopGroup(bossThreads, groupedThreads("onos/of", "boss-%d", log));
+ workerGroup = new NioEventLoopGroup(workerThreads, groupedThreads("onos/of", "worker-%d", log));
+ log.info("Using Nio transport");
+ return new ServerBootstrap()
+ .group(bossGroup, workerGroup)
+ .channel(NioServerSocketChannel.class);
}
public void setConfigParams(Dictionary<?, ?> properties) {
+ // TODO should be possible to reconfigure ports without restart,
+ // by updating ChannelGroup
String ports = get(properties, "openflowPorts");
if (!Strings.isNullOrEmpty(ports)) {
this.openFlowPorts = Stream.of(ports.split(","))
@@ -207,6 +234,8 @@
this.systemStartTime = System.currentTimeMillis();
+ cg = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
+
try {
getTlsParameters();
if (enableOfTls) {
@@ -303,11 +332,11 @@
}
if (driver == null) {
- log.error("No OpenFlow driver for {} : {}", dpid, desc);
+ log.error("No OpenFlow driver for {} : {}", dpidObj, desc);
return null;
}
- log.info("Driver {} assigned to device {}", driver.name(), dpidObj);
+ log.info("Driver '{}' assigned to device {}", driver.name(), dpidObj);
if (!driver.hasBehaviour(OpenFlowSwitchDriver.class)) {
log.error("Driver {} does not support OpenFlowSwitchDriver behaviour", driver.name());
@@ -321,7 +350,6 @@
ofSwitchDriver.init(dpidObj, desc, ofv);
ofSwitchDriver.setAgent(agent);
ofSwitchDriver.setRoleHandler(new RoleManager(ofSwitchDriver));
- log.info("OpenFlow handshaker found for device {}: {}", dpid, ofSwitchDriver);
return ofSwitchDriver;
}
@@ -337,7 +365,19 @@
public void stop() {
log.info("Stopping OpenFlow IO");
cg.close();
- execFactory.shutdown();
+
+ // Shut down all event loops to terminate all threads.
+ bossGroup.shutdownGracefully();
+ workerGroup.shutdownGracefully();
+
+ // Wait until all threads are terminated.
+ try {
+ bossGroup.terminationFuture().sync();
+ workerGroup.terminationFuture().sync();
+ } catch (InterruptedException e) {
+ log.warn("Interrupted while stopping", e);
+ Thread.currentThread().interrupt();
+ }
}
}
diff --git a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/HandshakeTimeoutHandler.java b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/HandshakeTimeoutHandler.java
index 43adf19..0b7f430 100644
--- a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/HandshakeTimeoutHandler.java
+++ b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/HandshakeTimeoutHandler.java
@@ -16,78 +16,82 @@
package org.onosproject.openflow.controller.impl;
+import static org.slf4j.LoggerFactory.getLogger;
+
import java.util.concurrent.TimeUnit;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelStateEvent;
-import org.jboss.netty.channel.Channels;
-import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
-import org.jboss.netty.util.Timeout;
-import org.jboss.netty.util.Timer;
-import org.jboss.netty.util.TimerTask;
+import org.slf4j.Logger;
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
/**
* Trigger a timeout if a switch fails to complete handshake soon enough.
*/
public class HandshakeTimeoutHandler
- extends SimpleChannelUpstreamHandler {
- static final HandshakeTimeoutException EXCEPTION =
- new HandshakeTimeoutException();
+ extends ChannelDuplexHandler {
+
+ private static final Logger log = getLogger(HandshakeTimeoutHandler.class);
final OFChannelHandler channelHandler;
- final Timer timer;
- final long timeoutNanos;
- volatile Timeout timeout;
+ final long timeoutMillis;
+ volatile long deadline;
public HandshakeTimeoutHandler(OFChannelHandler channelHandler,
- Timer timer,
long timeoutSeconds) {
super();
this.channelHandler = channelHandler;
- this.timer = timer;
- this.timeoutNanos = TimeUnit.SECONDS.toNanos(timeoutSeconds);
-
+ this.timeoutMillis = TimeUnit.SECONDS.toMillis(timeoutSeconds);
+ this.deadline = System.currentTimeMillis() + timeoutMillis;
}
@Override
- public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e)
- throws Exception {
- if (timeoutNanos > 0) {
- timeout = timer.newTimeout(new HandshakeTimeoutTask(ctx),
- timeoutNanos, TimeUnit.NANOSECONDS);
+ public void channelActive(ChannelHandlerContext ctx) throws Exception {
+ if (timeoutMillis > 0) {
+ // set Handshake deadline
+ deadline = System.currentTimeMillis() + timeoutMillis;
}
- ctx.sendUpstream(e);
+ super.channelActive(ctx);
}
@Override
- public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e)
- throws Exception {
- if (timeout != null) {
- timeout.cancel();
- timeout = null;
- }
+ public void read(ChannelHandlerContext ctx) throws Exception {
+ checkTimeout(ctx);
+ super.read(ctx);
}
- private final class HandshakeTimeoutTask implements TimerTask {
+ @Override
+ public void write(ChannelHandlerContext ctx, Object msg,
+ ChannelPromise promise)
+ throws Exception {
+ checkTimeout(ctx);
+ super.write(ctx, msg, promise);
+ }
- private final ChannelHandlerContext ctx;
+ @Override
+ public void userEventTriggered(ChannelHandlerContext ctx,
+ Object evt)
+ throws Exception {
- HandshakeTimeoutTask(ChannelHandlerContext ctx) {
- this.ctx = ctx;
+ // expecting idle event
+ checkTimeout(ctx);
+ super.userEventTriggered(ctx, evt);
+ }
+
+ void checkTimeout(ChannelHandlerContext ctx) {
+ if (channelHandler.isHandshakeComplete()) {
+ // handshake complete, Handshake monitoring timeout no-longer needed
+ ctx.channel().pipeline().remove(this);
+ return;
}
- @Override
- public void run(Timeout t) throws Exception {
- if (t.isCancelled()) {
- return;
- }
+ if (!ctx.channel().isActive()) {
+ return;
+ }
- if (!ctx.getChannel().isOpen()) {
- return;
- }
- if (!channelHandler.isHandshakeComplete()) {
- Channels.fireExceptionCaught(ctx, EXCEPTION);
- }
+ if (System.currentTimeMillis() > deadline) {
+ log.info("Handshake time out {}", channelHandler);
+ ctx.fireExceptionCaught(new HandshakeTimeoutException());
}
}
}
diff --git a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OFChannelHandler.java b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OFChannelHandler.java
index 100203f..b2d77cf 100644
--- a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OFChannelHandler.java
+++ b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OFChannelHandler.java
@@ -16,24 +16,29 @@
package org.onosproject.openflow.controller.impl;
+import static org.onlab.util.Tools.groupedThreads;
+
import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
import java.nio.channels.ClosedChannelException;
+import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Deque;
import java.util.List;
import java.util.Optional;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
-
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelStateEvent;
-import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.handler.timeout.IdleStateAwareChannelHandler;
-import org.jboss.netty.handler.timeout.IdleStateEvent;
-import org.jboss.netty.handler.timeout.ReadTimeoutException;
+import org.onlab.packet.IpAddress;
import org.onosproject.openflow.controller.Dpid;
+import org.onosproject.openflow.controller.OpenFlowSession;
import org.onosproject.openflow.controller.driver.OpenFlowSwitchDriver;
import org.onosproject.openflow.controller.driver.SwitchStateException;
import org.projectfloodlight.openflow.exceptions.OFParseError;
@@ -78,11 +83,20 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.handler.timeout.IdleStateEvent;
+import io.netty.handler.timeout.ReadTimeoutException;
+import io.netty.util.ReferenceCountUtil;
+
/**
* Channel handler deals with the switch connection and dispatches
* switch messages to the appropriate locations.
*/
-class OFChannelHandler extends IdleStateAwareChannelHandler {
+class OFChannelHandler extends ChannelInboundHandlerAdapter
+ implements OpenFlowSession {
+
private static final Logger log = LoggerFactory.getLogger(OFChannelHandler.class);
private static final String RESET_BY_PEER = "Connection reset by peer";
@@ -91,7 +105,11 @@
private final Controller controller;
private OpenFlowSwitchDriver sw;
private long thisdpid; // channelHandler cached value of connected switch id
+
private Channel channel;
+ private String channelId;
+
+
// State needs to be volatile because the HandshakeTimeoutHandler
// needs to check if the handshake is complete
private volatile ChannelState state;
@@ -121,6 +139,7 @@
//Indicates the openflow version used by this switch
protected OFVersion ofVersion;
+ protected OFFactory factory;
// deprecated in 1.10.0
@Deprecated
@@ -135,11 +154,43 @@
*/
private int handshakeTransactionIds = -1;
+
+
+ private static final int MSG_READ_BUFFER = 5000;
+
+ /**
+ * OFMessage dispatch queue.
+ */
+ private final BlockingQueue<OFMessage> dispatchQueue =
+ new LinkedBlockingQueue<>(MSG_READ_BUFFER);
+
+ /**
+ * Single thread executor for OFMessage dispatching.
+ *
+ * Gets initialized on channelActive, shutdown on channelInactive.
+ */
+ private ExecutorService dispatcher;
+
+ /**
+ * Handle for dispatcher thread.
+ * <p>
+ * Should only be touched from the Channel I/O thread
+ */
+ private Future<?> dispatcherHandle = CompletableFuture.completedFuture(null);
+
+ /**
+ * Dispatch backlog.
+ * <p>
+ * Should only be touched from the Channel I/O thread
+ */
+ private final Deque<OFMessage> dispatchBacklog = new ArrayDeque<>();
+
/**
* Create a new unconnected OFChannelHandler.
* @param controller parent controller
*/
OFChannelHandler(Controller controller) {
+
this.controller = controller;
this.state = ChannelState.INIT;
this.pendingPortStatusMsg = new CopyOnWriteArrayList<>();
@@ -208,30 +259,33 @@
if (m.getVersion().getWireVersion() >= OFVersion.OF_13.getWireVersion()) {
log.debug("Received {} Hello from {} - switching to OF "
+ "version 1.3+", m.getVersion(),
- h.channel.getRemoteAddress());
+ h.channel.remoteAddress());
h.ofVersion = m.getVersion();
+ h.factory = OFFactories.getFactory(h.ofVersion);
h.sendHandshakeHelloMessage();
} else if (m.getVersion().getWireVersion() >= OFVersion.OF_10.getWireVersion()) {
log.debug("Received {} Hello from {} - switching to OF "
+ "version 1.0", m.getVersion(),
- h.channel.getRemoteAddress());
+ h.channel.remoteAddress());
h.ofVersion = m.getVersion();
+ h.factory = OFFactories.getFactory(h.ofVersion);
OFHello hi =
- OFFactories.getFactory(h.ofVersion).buildHello()
+ h.factory.buildHello()
.setXid(h.handshakeTransactionIds--)
.build();
- h.channel.write(Collections.singletonList(hi));
+ h.channel.writeAndFlush(Collections.singletonList(hi));
} else {
log.error("Received Hello of version {} from switch at {}. "
+ "This controller works with OF1.0 and OF1.3 "
+ "switches. Disconnecting switch ...",
- m.getVersion(), h.channel.getRemoteAddress());
+ m.getVersion(), h.channel.remoteAddress());
h.channel.disconnect();
return;
}
h.sendHandshakeFeaturesRequestMessage();
h.setState(WAIT_FEATURES_REPLY);
}
+
@Override
void processOFFeaturesReply(OFChannelHandler h, OFFeaturesReply m)
throws IOException, SwitchStateException {
@@ -313,7 +367,7 @@
if (m.getStatsType() != OFStatsType.PORT_DESC) {
log.warn("Expecting port description stats but received stats "
+ "type {} from {}. Ignoring ...", m.getStatsType(),
- h.channel.getRemoteAddress());
+ h.channel.remoteAddress());
return;
}
if (m.getFlags().contains(OFStatsReplyFlags.REPLY_MORE)) {
@@ -465,22 +519,21 @@
if (m.getStatsType() != OFStatsType.DESC) {
log.warn("Expecting Description stats but received stats "
+ "type {} from {}. Ignoring ...", m.getStatsType(),
- h.channel.getRemoteAddress());
+ h.channel.remoteAddress());
return;
}
OFDescStatsReply drep = (OFDescStatsReply) m;
log.info("Received switch description reply {} from switch at {}",
- drep, h.channel.getRemoteAddress());
+ drep, h.channel.remoteAddress());
// Here is where we differentiate between different kinds of switches
h.sw = h.controller.getOFSwitchInstance(h.thisdpid, drep, h.ofVersion);
h.sw.setOFVersion(h.ofVersion);
h.sw.setFeaturesReply(h.featuresReply);
- //h.sw.setPortDescReply(h.portDescReply);
h.sw.setPortDescReplies(h.portDescReplies);
h.sw.setMeterFeaturesReply(h.meterFeaturesReply);
h.sw.setConnected(true);
- h.sw.setChannel(h.channel);
+ h.sw.setChannel(h);
// boolean success = h.sw.connectSwitch();
//
// if (!success) {
@@ -1072,7 +1125,7 @@
throws IOException, SwitchStateException {
// we only expect hello in the WAIT_HELLO state
log.warn("Received Hello outside WAIT_HELLO state; switch {} is not complaint.",
- h.channel.getRemoteAddress());
+ h.channel.remoteAddress());
}
void processOFBarrierReply(OFChannelHandler h, OFBarrierReply m)
@@ -1084,16 +1137,15 @@
throws IOException {
if (h.ofVersion == null) {
log.error("No OF version set for {}. Not sending Echo REPLY",
- h.channel.getRemoteAddress());
+ h.channel.remoteAddress());
return;
}
- OFFactory factory = OFFactories.getFactory(h.ofVersion);
- OFEchoReply reply = factory
- .buildEchoReply()
- .setXid(m.getXid())
- .setData(m.getData())
- .build();
- h.channel.write(Collections.singletonList(reply));
+ OFEchoReply reply = h.factory
+ .buildEchoReply()
+ .setXid(m.getXid())
+ .setData(m.getData())
+ .build();
+ h.channel.writeAndFlush(Collections.singletonList(reply));
}
void processOFEchoReply(OFChannelHandler h, OFEchoReply m)
@@ -1175,11 +1227,28 @@
//*************************
@Override
- public void channelConnected(ChannelHandlerContext ctx,
- ChannelStateEvent e) throws Exception {
- channel = e.getChannel();
+ public void channelActive(ChannelHandlerContext ctx)
+ throws Exception {
+
+ channel = ctx.channel();
log.info("New switch connection from {}",
- channel.getRemoteAddress());
+ channel.remoteAddress());
+
+ SocketAddress address = channel.remoteAddress();
+ if (address instanceof InetSocketAddress) {
+ final InetSocketAddress inetAddress = (InetSocketAddress) address;
+ final IpAddress ipAddress = IpAddress.valueOf(inetAddress.getAddress());
+ if (ipAddress.isIp4()) {
+ channelId = ipAddress.toString() + ':' + inetAddress.getPort();
+ } else {
+ channelId = '[' + ipAddress.toString() + "]:" + inetAddress.getPort();
+ }
+ } else {
+ channelId = channel.toString();
+ }
+
+ dispatcher = Executors.newSingleThreadExecutor(groupedThreads("onos/of/dispatcher", channelId, log));
+
/*
hack to wait for the switch to tell us what it's
max version is. This is not spec compliant and should
@@ -1190,81 +1259,89 @@
}
@Override
- public void channelDisconnected(ChannelHandlerContext ctx,
- ChannelStateEvent e) throws Exception {
+ public void channelInactive(ChannelHandlerContext ctx)
+ throws Exception {
+
log.info("Switch disconnected callback for sw:{}. Cleaning up ...",
- getSwitchInfoString());
- if (thisdpid != 0) {
- if (!duplicateDpidFound) {
- // if the disconnected switch (on this ChannelHandler)
- // was not one with a duplicate-dpid, it is safe to remove all
- // state for it at the controller. Notice that if the disconnected
- // switch was a duplicate-dpid, calling the method below would clear
- // all state for the original switch (with the same dpid),
- // which we obviously don't want.
- log.info("{}:removal called", getSwitchInfoString());
- if (sw != null) {
- sw.removeConnectedSwitch();
- }
- } else {
- // A duplicate was disconnected on this ChannelHandler,
- // this is the same switch reconnecting, but the original state was
- // not cleaned up - XXX check liveness of original ChannelHandler
- log.info("{}:duplicate found", getSwitchInfoString());
- duplicateDpidFound = Boolean.FALSE;
- }
- } else {
- log.warn("no dpid in channelHandler registered for "
- + "disconnected switch {}", getSwitchInfoString());
+ getSwitchInfoString());
+
+ if (dispatcher != null) {
+ dispatcher.shutdown();
}
+
+ if (thisdpid != 0) {
+ if (!duplicateDpidFound) {
+ // if the disconnected switch (on this ChannelHandler)
+ // was not one with a duplicate-dpid, it is safe to remove all
+ // state for it at the controller. Notice that if the disconnected
+ // switch was a duplicate-dpid, calling the method below would clear
+ // all state for the original switch (with the same dpid),
+ // which we obviously don't want.
+ log.info("{}:removal called", getSwitchInfoString());
+ if (sw != null) {
+ sw.removeConnectedSwitch();
+ }
+ } else {
+ // A duplicate was disconnected on this ChannelHandler,
+ // this is the same switch reconnecting, but the original state was
+ // not cleaned up - XXX check liveness of original ChannelHandler
+ log.info("{}:duplicate found", getSwitchInfoString());
+ duplicateDpidFound = Boolean.FALSE;
+ }
+ } else {
+ log.warn("no dpid in channelHandler registered for "
+ + "disconnected switch {}", getSwitchInfoString());
+ }
}
@Override
- public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
+ public void exceptionCaught(ChannelHandlerContext ctx,
+ Throwable cause)
throws Exception {
- if (e.getCause() instanceof ReadTimeoutException) {
+
+ if (cause instanceof ReadTimeoutException) {
// switch timeout
log.error("Disconnecting switch {} due to read timeout",
getSwitchInfoString());
- ctx.getChannel().close();
- } else if (e.getCause() instanceof HandshakeTimeoutException) {
+ ctx.channel().close();
+ } else if (cause instanceof HandshakeTimeoutException) {
log.error("Disconnecting switch {}: failed to complete handshake",
getSwitchInfoString());
- ctx.getChannel().close();
- } else if (e.getCause() instanceof ClosedChannelException) {
+ ctx.channel().close();
+ } else if (cause instanceof ClosedChannelException) {
log.debug("Channel for sw {} already closed", getSwitchInfoString());
- } else if (e.getCause() instanceof IOException) {
- if (!e.getCause().getMessage().equals(RESET_BY_PEER) &&
- !e.getCause().getMessage().equals(BROKEN_PIPE)) {
+ } else if (cause instanceof IOException) {
+ if (!cause.getMessage().equals(RESET_BY_PEER) &&
+ !cause.getMessage().equals(BROKEN_PIPE)) {
log.error("Disconnecting switch {} due to IO Error: {}",
- getSwitchInfoString(), e.getCause().getMessage());
+ getSwitchInfoString(), cause.getMessage());
if (log.isDebugEnabled()) {
// still print stack trace if debug is enabled
- log.debug("StackTrace for previous Exception: ", e.getCause());
+ log.debug("StackTrace for previous Exception: ", cause);
}
}
- ctx.getChannel().close();
- } else if (e.getCause() instanceof SwitchStateException) {
+ ctx.channel().close();
+ } else if (cause instanceof SwitchStateException) {
log.error("Disconnecting switch {} due to switch state error: {}",
- getSwitchInfoString(), e.getCause().getMessage());
+ getSwitchInfoString(), cause.getMessage());
if (log.isDebugEnabled()) {
// still print stack trace if debug is enabled
- log.debug("StackTrace for previous Exception: ", e.getCause());
+ log.debug("StackTrace for previous Exception: ", cause);
}
- ctx.getChannel().close();
- } else if (e.getCause() instanceof OFParseError) {
+ ctx.channel().close();
+ } else if (cause instanceof OFParseError) {
log.error("Disconnecting switch "
+ getSwitchInfoString() +
" due to message parse failure",
- e.getCause());
- ctx.getChannel().close();
- } else if (e.getCause() instanceof RejectedExecutionException) {
+ cause);
+ ctx.channel().close();
+ } else if (cause instanceof RejectedExecutionException) {
log.warn("Could not process message: queue full");
} else {
log.error("Error while processing message from switch "
+ getSwitchInfoString()
- + "state " + this.state, e.getCause());
- ctx.getChannel().close();
+ + "state " + this.state, cause);
+ ctx.channel().close();
}
}
@@ -1273,38 +1350,54 @@
return getSwitchInfoString();
}
- @Override
- public void channelIdle(ChannelHandlerContext ctx, IdleStateEvent e)
+ protected void channelIdle(ChannelHandlerContext ctx,
+ IdleStateEvent e)
throws Exception {
- OFFactory factory = OFFactories.getFactory(ofVersion);
OFMessage m = factory.buildEchoRequest().build();
log.debug("Sending Echo Request on idle channel: {}",
- e.getChannel().getPipeline().getLast());
- e.getChannel().write(Collections.singletonList(m));
+ ctx.channel());
+ ctx.write(Collections.singletonList(m), ctx.voidPromise());
// XXX S some problems here -- echo request has no transaction id, and
// echo reply is not correlated to the echo request.
state.processIdle(this);
}
@Override
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
+ public void userEventTriggered(ChannelHandlerContext ctx,
+ Object evt)
throws Exception {
- if (e.getMessage() instanceof List) {
- @SuppressWarnings("unchecked")
- List<OFMessage> msglist = (List<OFMessage>) e.getMessage();
+ if (evt instanceof IdleStateEvent) {
+ channelIdle(ctx, (IdleStateEvent) evt);
+ }
- for (OFMessage ofm : msglist) {
- // Do the actual packet processing
- state.processOFMessage(this, ofm);
+ super.userEventTriggered(ctx, evt);
+ }
+
+ // SimpleChannelInboundHandler without dependency to TypeParameterMatcher
+ @Override
+ public void channelRead(ChannelHandlerContext ctx,
+ Object msg) throws Exception {
+
+ boolean release = true;
+ try {
+ if (msg instanceof OFMessage) {
+ // channelRead0 inlined
+ state.processOFMessage(this, (OFMessage) msg);
+ } else {
+ release = false;
+ ctx.fireChannelRead(msg);
}
- } else {
- state.processOFMessage(this, (OFMessage) e.getMessage());
+ } finally {
+ if (release) {
+ ReferenceCountUtil.release(msg);
+ }
}
}
+
//*************************
// Channel utility methods
//*************************
@@ -1318,7 +1411,63 @@
}
private void dispatchMessage(OFMessage m) {
- sw.handleMessage(m);
+
+ if (dispatchBacklog.isEmpty()) {
+ if (!dispatchQueue.offer(m)) {
+ // queue full
+ channel.config().setAutoRead(false);
+ // put it on the head of backlog
+ dispatchBacklog.addFirst(m);
+ return;
+ }
+ } else {
+ dispatchBacklog.addLast(m);
+ }
+
+ while (!dispatchBacklog.isEmpty()) {
+ OFMessage msg = dispatchBacklog.pop();
+
+ if (!dispatchQueue.offer(msg)) {
+ // queue full
+ channel.config().setAutoRead(false);
+ // put it back to the head of backlog
+ dispatchBacklog.addFirst(msg);
+ return;
+ }
+ }
+
+
+ if (dispatcherHandle.isDone()) {
+ // dispatcher terminated for some reason, restart
+
+ dispatcherHandle = dispatcher.submit(() -> {
+ try {
+ List<OFMessage> msgs = new ArrayList<>();
+ for (;;) {
+ // wait for new message
+ OFMessage msg = dispatchQueue.take();
+ sw.handleMessage(msg);
+
+ while (dispatchQueue.drainTo(msgs, MSG_READ_BUFFER) > 0) {
+ if (!channel.config().isAutoRead()) {
+ channel.config().setAutoRead(true);
+ }
+ msgs.forEach(sw::handleMessage);
+ msgs.clear();
+ }
+
+ if (!channel.config().isAutoRead()) {
+ channel.config().setAutoRead(true);
+ }
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ // interrupted. gracefully shutting down
+ return;
+ }
+
+ });
+ }
}
/**
@@ -1331,10 +1480,10 @@
return sw.toString();
}
String channelString;
- if (channel == null || channel.getRemoteAddress() == null) {
+ if (channel == null || channel.remoteAddress() == null) {
channelString = "?";
} else {
- channelString = channel.getRemoteAddress().toString();
+ channelString = channel.remoteAddress().toString();
}
String dpidString;
if (featuresReply == null) {
@@ -1378,8 +1527,8 @@
.buildHello()
.setXid(this.handshakeTransactionIds--)
.setElements(Collections.singletonList(hem));
- log.info("Sending {} Hello to {}", version, channel.getRemoteAddress());
- channel.write(Collections.singletonList(mb.build()));
+ log.info("Sending {} Hello to {}", version, channel.remoteAddress());
+ channel.writeAndFlush(Collections.singletonList(mb.build()));
}
/**
@@ -1387,12 +1536,11 @@
* @throws IOException
*/
private void sendHandshakeFeaturesRequestMessage() throws IOException {
- OFFactory factory = OFFactories.getFactory(ofVersion);
- log.debug("Sending FEATURES_REQUEST to {}", channel.getRemoteAddress());
+ log.debug("Sending FEATURES_REQUEST to {}", channel.remoteAddress());
OFMessage m = factory.buildFeaturesRequest()
.setXid(this.handshakeTransactionIds--)
.build();
- channel.write(Collections.singletonList(m));
+ channel.writeAndFlush(Collections.singletonList(m));
}
/**
@@ -1401,8 +1549,7 @@
* @throws IOException
*/
private void sendHandshakeSetConfig() throws IOException {
- OFFactory factory = OFFactories.getFactory(ofVersion);
- log.debug("Sending CONFIG_REQUEST to {}", channel.getRemoteAddress());
+ log.debug("Sending CONFIG_REQUEST to {}", channel.remoteAddress());
List<OFMessage> msglist = new ArrayList<>(3);
// Ensure we receive the full packet via PacketIn
@@ -1431,7 +1578,7 @@
.setXid(this.handshakeTransactionIds--)
.build();
msglist.add(gcr);
- channel.write(msglist);
+ channel.writeAndFlush(msglist);
}
/**
@@ -1440,13 +1587,12 @@
*/
private void sendHandshakeDescriptionStatsRequest() throws IOException {
// Get Description to set switch-specific flags
- OFFactory factory = OFFactories.getFactory(ofVersion);
- log.debug("Sending DESC_STATS_REQUEST to {}", channel.getRemoteAddress());
+ log.debug("Sending DESC_STATS_REQUEST to {}", channel.remoteAddress());
OFDescStatsRequest dreq = factory
.buildDescStatsRequest()
.setXid(handshakeTransactionIds--)
.build();
- channel.write(Collections.singletonList(dreq));
+ channel.writeAndFlush(Collections.singletonList(dreq));
}
/**
@@ -1457,26 +1603,59 @@
private void sendMeterFeaturesRequest() throws IOException {
// Get meter features including the MaxMeters value available for the device
OFFactory factory = OFFactories.getFactory(ofVersion);
- log.debug("Sending METER_FEATURES_REQUEST to {}", channel.getRemoteAddress());
+ log.debug("Sending METER_FEATURES_REQUEST to {}", channel.remoteAddress());
OFMeterFeaturesStatsRequest mfreq = factory
.buildMeterFeaturesStatsRequest()
.setXid(handshakeTransactionIds--)
.build();
- channel.write(Collections.singletonList(mfreq));
+ channel.writeAndFlush(Collections.singletonList(mfreq));
}
private void sendHandshakeOFPortDescRequest() throws IOException {
- log.debug("Sending OF_PORT_DESC_REQUEST to {}", channel.getRemoteAddress());
+ log.debug("Sending OF_PORT_DESC_REQUEST to {}", channel.remoteAddress());
// Get port description for 1.3+ switch
- OFPortDescStatsRequest preq = OFFactories.getFactory(ofVersion)
+ OFPortDescStatsRequest preq = factory
.buildPortDescStatsRequest()
.setXid(handshakeTransactionIds--)
.build();
- channel.write(Collections.singletonList(preq));
+ channel.writeAndFlush(Collections.singletonList(preq));
}
ChannelState getStateForTesting() {
return state;
}
+
+ @Override
+ public boolean isActive() {
+ if (channel != null) {
+ return channel.isActive();
+ }
+ return false;
+ }
+
+ @Override
+ public void closeSession() {
+ if (channel != null) {
+ channel.close();
+ }
+ }
+
+ @Override
+ public boolean sendMsg(Iterable<OFMessage> msgs) {
+ if (channel.isActive()) {
+ channel.writeAndFlush(msgs, channel.voidPromise());
+ return true;
+ } else {
+ log.warn("Dropping messages for switch {} because channel is not connected: {}",
+ getSwitchInfoString(), msgs);
+ return false;
+ }
+ }
+
+ @Override
+ public CharSequence sessionInfo() {
+ return channelId;
+ }
+
}
diff --git a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OFChannelInitializer.java b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OFChannelInitializer.java
new file mode 100644
index 0000000..4e6cca5
--- /dev/null
+++ b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OFChannelInitializer.java
@@ -0,0 +1,92 @@
+/*
+ * Copyright 2015-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.openflow.controller.impl;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.ssl.SslHandler;
+import io.netty.handler.timeout.IdleStateHandler;
+import io.netty.handler.timeout.ReadTimeoutHandler;
+import io.netty.util.concurrent.EventExecutorGroup;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+
+/**
+ * Creates a ChannelInitializer for a server-side openflow channel.
+ */
+public class OFChannelInitializer
+ extends ChannelInitializer<SocketChannel> {
+
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+
+ private final SSLContext sslContext;
+ protected Controller controller;
+ protected EventExecutorGroup pipelineExecutor;
+
+ public OFChannelInitializer(Controller controller,
+ EventExecutorGroup pipelineExecutor,
+ SSLContext sslContext) {
+ super();
+ this.controller = controller;
+ this.pipelineExecutor = pipelineExecutor;
+ this.sslContext = sslContext;
+ }
+
+ @Override
+ protected void initChannel(SocketChannel ch) throws Exception {
+
+ OFChannelHandler handler = new OFChannelHandler(controller);
+
+ ChannelPipeline pipeline = ch.pipeline();
+ if (sslContext != null) {
+ log.info("OpenFlow SSL enabled.");
+ SSLEngine sslEngine = sslContext.createSSLEngine();
+
+ sslEngine.setNeedClientAuth(true);
+ sslEngine.setUseClientMode(false);
+ sslEngine.setEnabledProtocols(sslEngine.getSupportedProtocols());
+ sslEngine.setEnabledCipherSuites(sslEngine.getSupportedCipherSuites());
+ sslEngine.setEnableSessionCreation(true);
+
+ SslHandler sslHandler = new SslHandler(sslEngine);
+ pipeline.addLast("ssl", sslHandler);
+ } else {
+ log.debug("OpenFlow SSL disabled.");
+ }
+ pipeline.addLast("ofmessageencoder", OFMessageEncoder.getInstance());
+ pipeline.addLast("ofmessagedecoder", OFMessageDecoder.getInstance());
+
+ pipeline.addLast("idle", new IdleStateHandler(20, 25, 0));
+ pipeline.addLast("timeout", new ReadTimeoutHandler(30));
+
+ // XXX S ONOS: was 15 increased it to fix Issue #296
+ pipeline.addLast("handshaketimeout",
+ new HandshakeTimeoutHandler(handler, 60));
+ // ExecutionHandler equivalent now part of Netty core
+ if (pipelineExecutor != null) {
+ pipeline.addLast(pipelineExecutor, "handler", handler);
+ } else {
+ pipeline.addLast("handler", handler);
+ }
+ }
+}
diff --git a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OFMessageDecoder.java b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OFMessageDecoder.java
index c5cb9f7..14894f2 100644
--- a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OFMessageDecoder.java
+++ b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OFMessageDecoder.java
@@ -18,30 +18,45 @@
import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.handler.codec.frame.FrameDecoder;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
+import java.util.List;
+
import org.projectfloodlight.openflow.protocol.OFFactories;
import org.projectfloodlight.openflow.protocol.OFMessage;
import org.projectfloodlight.openflow.protocol.OFMessageReader;
+import org.slf4j.Logger;
/**
- * Decode an openflow message from a Channel, for use in a netty pipeline.
+ * Decode an openflow message from a netty channel, for use in a netty pipeline.
*/
-public class OFMessageDecoder extends FrameDecoder {
+public final class OFMessageDecoder extends ByteToMessageDecoder {
+
+ private static final Logger log = getLogger(OFMessageDecoder.class);
+
+ public static OFMessageDecoder getInstance() {
+ // not Sharable
+ return new OFMessageDecoder();
+ }
+
+ private OFMessageDecoder() {}
+
@Override
- protected Object decode(ChannelHandlerContext ctx, Channel channel,
- ChannelBuffer buffer) throws Exception {
- if (!channel.isConnected()) {
+ protected void decode(ChannelHandlerContext ctx,
+ ByteBuf byteBuf,
+ List<Object> out) throws Exception {
+
+ if (!ctx.channel().isActive()) {
// In testing, I see decode being called AFTER decode last.
// This check avoids that from reading corrupted frames
- return null;
+ return;
}
- // Note that a single call to decode results in reading a single
+ // Note that a single call to readFrom results in reading a single
// OFMessage from the channel buffer, which is passed on to, and processed
// by, the controller (in OFChannelHandler).
// This is different from earlier behavior (with the original openflowj),
@@ -50,16 +65,11 @@
// The performance *may or may not* not be as good as before.
OFMessageReader<OFMessage> reader = OFFactories.getGenericReader();
- //toByteBuffer is optimized to avoid copying.
- ByteBuf byteBuf = Unpooled.wrappedBuffer(buffer.toByteBuffer());
OFMessage message = reader.readFrom(byteBuf);
-
- if (message != null) {
- //set buffer's read index, as it has not been changed.
- buffer.readerIndex(buffer.readerIndex() + byteBuf.readerIndex());
+ while (message != null) {
+ out.add(message);
+ message = reader.readFrom(byteBuf);
}
-
- return message;
}
}
diff --git a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OFMessageEncoder.java b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OFMessageEncoder.java
index 727301f..4dd8009 100644
--- a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OFMessageEncoder.java
+++ b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OFMessageEncoder.java
@@ -16,49 +16,81 @@
package org.onosproject.openflow.controller.impl;
-import java.util.List;
-
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;
+import io.netty.channel.ChannelHandler.Sharable;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelOutboundHandlerAdapter;
+import io.netty.channel.ChannelPromise;
+import io.netty.handler.codec.EncoderException;
+import static org.slf4j.LoggerFactory.getLogger;
+
import org.projectfloodlight.openflow.protocol.OFMessage;
+import org.slf4j.Logger;
/**
- * Encode an openflow message for output into a ChannelBuffer, for use in a
+ * Encode an openflow message for output into a netty channel, for use in a
* netty pipeline.
*/
-public class OFMessageEncoder extends OneToOneEncoder {
+@Sharable
+public final class OFMessageEncoder extends ChannelOutboundHandlerAdapter {
+ private static final Logger log = getLogger(OFMessageEncoder.class);
+
+ private static final OFMessageEncoder INSTANCE = new OFMessageEncoder();
+
+ public static OFMessageEncoder getInstance() {
+ return INSTANCE;
+ }
+
+ private OFMessageEncoder() {}
+
+ protected final void encode(ChannelHandlerContext ctx,
+ Iterable<OFMessage> msgs,
+ ByteBuf out) throws Exception {
+
+ msgs.forEach(msg -> msg.writeTo(out));
+ }
+
+ // MessageToByteEncoder without dependency to TypeParameterMatcher
@Override
- protected Object encode(ChannelHandlerContext ctx, Channel channel,
- Object msg) throws Exception {
- if (!(msg instanceof List)) {
- return msg;
- }
+ public void write(ChannelHandlerContext ctx,
+ Object msg,
+ ChannelPromise promise) throws Exception {
- @SuppressWarnings("unchecked")
- List<OFMessage> msglist = (List<OFMessage>) msg;
- /* XXX S can't get length of OFMessage in loxigen's openflowj??
- int size = 0;
- for (OFMessage ofm : msglist) {
- size += ofm.getLengthU();
- }*/
+ ByteBuf buf = null;
+ try {
+ if (msg instanceof Iterable) {
+ @SuppressWarnings("unchecked")
+ Iterable<OFMessage> ofmsgs = (Iterable<OFMessage>) msg;
+ buf = ctx.alloc().ioBuffer();
- ByteBuf bb = Unpooled.buffer();
+ encode(ctx, ofmsgs, buf);
- for (OFMessage ofm : msglist) {
- if (ofm != null) {
- ofm.writeTo(bb);
+ if (buf.isReadable()) {
+ ctx.write(buf, promise);
+ } else {
+ log.warn("NOTHING WAS WRITTEN for {}", msg);
+ buf.release();
+ ctx.write(Unpooled.EMPTY_BUFFER, promise);
+ }
+ buf = null;
+
+ } else {
+ log.warn("Attempted to encode unexpected message: {}", msg);
+ ctx.write(msg, promise);
+ }
+ } catch (EncoderException e) {
+ log.error("EncoderException handling {}", msg, e);
+ throw e;
+ } catch (Throwable e) {
+ log.error("Exception handling {}", msg, e);
+ throw new EncoderException(e);
+ } finally {
+ if (buf != null) {
+ buf.release();
}
}
-
- ChannelBuffer buf = ChannelBuffers.wrappedBuffer(bb.nioBuffer());
-
- return buf;
}
}
diff --git a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImpl.java b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImpl.java
index cd4548d..21f2c0f 100644
--- a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImpl.java
+++ b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImpl.java
@@ -289,9 +289,9 @@
write(dpid, msg);
ConcurrentMap<Long, CompletableFuture<OFMessage>> xids =
- responses.computeIfAbsent(dpid, k -> new ConcurrentHashMap());
+ responses.computeIfAbsent(dpid, k -> new ConcurrentHashMap<>());
- CompletableFuture<OFMessage> future = new CompletableFuture();
+ CompletableFuture<OFMessage> future = new CompletableFuture<>();
xids.put(msg.getXid(), future);
return future;
@@ -303,14 +303,16 @@
Collection<OFTableStatsEntry> tableStats;
Collection<OFGroupStatsEntry> groupStats;
Collection<OFGroupDescStatsEntry> groupDescStats;
- Collection<OFPortStatsEntry> portStats;
OpenFlowSwitch sw = this.getSwitch(dpid);
// Check if someone is waiting for this message
ConcurrentMap<Long, CompletableFuture<OFMessage>> xids = responses.get(dpid);
- if (xids != null && xids.containsKey(msg.getXid())) {
- xids.remove(msg.getXid()).complete(msg);
+ if (xids != null) {
+ CompletableFuture<OFMessage> future = xids.remove(msg.getXid());
+ if (future != null) {
+ future.complete(msg);
+ }
}
switch (msg.getType()) {
@@ -326,7 +328,7 @@
break;
case PACKET_IN:
if (sw == null) {
- log.error("Switch {} is not found", dpid);
+ log.error("Ignoring PACKET_IN, switch {} is not found", dpid);
break;
}
OpenFlowPacketContext pktCtx = DefaultOpenFlowPacketContext
diff --git a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OpenflowPipelineFactory.java b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OpenflowPipelineFactory.java
deleted file mode 100644
index c94d315..0000000
--- a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OpenflowPipelineFactory.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Copyright 2015-present Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onosproject.openflow.controller.impl;
-
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.Channels;
-import org.jboss.netty.handler.execution.ExecutionHandler;
-import org.jboss.netty.handler.ssl.SslHandler;
-import org.jboss.netty.handler.timeout.IdleStateHandler;
-import org.jboss.netty.handler.timeout.ReadTimeoutHandler;
-import org.jboss.netty.util.ExternalResourceReleasable;
-import org.jboss.netty.util.HashedWheelTimer;
-import org.jboss.netty.util.Timer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLEngine;
-
-import static org.onlab.util.Tools.groupedThreads;
-
-import java.util.concurrent.ThreadPoolExecutor;
-
-/**
- * Creates a ChannelPipeline for a server-side openflow channel.
- */
-public class OpenflowPipelineFactory
- implements ChannelPipelineFactory, ExternalResourceReleasable {
-
- private final Logger log = LoggerFactory.getLogger(getClass());
-
-
- private final SSLContext sslContext;
- protected Controller controller;
- protected ThreadPoolExecutor pipelineExecutor;
- protected Timer timer;
- protected IdleStateHandler idleHandler;
- protected ReadTimeoutHandler readTimeoutHandler;
-
- public OpenflowPipelineFactory(Controller controller,
- ThreadPoolExecutor pipelineExecutor,
- SSLContext sslContext) {
- super();
- this.controller = controller;
- this.pipelineExecutor = pipelineExecutor;
- this.timer = new HashedWheelTimer(groupedThreads("OpenflowPipelineFactory", "timer-%d", log));
- this.idleHandler = new IdleStateHandler(timer, 20, 25, 0);
- this.readTimeoutHandler = new ReadTimeoutHandler(timer, 30);
- this.sslContext = sslContext;
- }
-
- @Override
- public ChannelPipeline getPipeline() throws Exception {
- OFChannelHandler handler = new OFChannelHandler(controller);
-
- ChannelPipeline pipeline = Channels.pipeline();
- if (sslContext != null) {
- log.debug("OpenFlow SSL enabled.");
- SSLEngine sslEngine = sslContext.createSSLEngine();
-
- sslEngine.setNeedClientAuth(true);
- sslEngine.setUseClientMode(false);
- sslEngine.setEnabledProtocols(sslEngine.getSupportedProtocols());
- sslEngine.setEnabledCipherSuites(sslEngine.getSupportedCipherSuites());
- sslEngine.setEnableSessionCreation(true);
-
- SslHandler sslHandler = new SslHandler(sslEngine);
- pipeline.addLast("ssl", sslHandler);
- } else {
- log.debug("OpenFlow SSL disabled.");
- }
- pipeline.addLast("ofmessagedecoder", new OFMessageDecoder());
- pipeline.addLast("ofmessageencoder", new OFMessageEncoder());
- pipeline.addLast("idle", idleHandler);
- pipeline.addLast("timeout", readTimeoutHandler);
- // XXX S ONOS: was 15 increased it to fix Issue #296
- pipeline.addLast("handshaketimeout",
- new HandshakeTimeoutHandler(handler, timer, 60));
- if (pipelineExecutor != null) {
- pipeline.addLast("pipelineExecutor",
- new ExecutionHandler(pipelineExecutor));
- }
- pipeline.addLast("handler", handler);
- return pipeline;
- }
-
- @Override
- public void releaseExternalResources() {
- timer.stop();
- }
-}
diff --git a/protocols/openflow/ctl/src/test/java/org/onosproject/openflow/ChannelAdapter.java b/protocols/openflow/ctl/src/test/java/org/onosproject/openflow/ChannelAdapter.java
new file mode 100644
index 0000000..a786211
--- /dev/null
+++ b/protocols/openflow/ctl/src/test/java/org/onosproject/openflow/ChannelAdapter.java
@@ -0,0 +1,282 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.openflow;
+
+import java.net.SocketAddress;
+
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelConfig;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelId;
+import io.netty.channel.ChannelMetadata;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.ChannelProgressivePromise;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.EventLoop;
+import io.netty.util.Attribute;
+import io.netty.util.AttributeKey;
+
+/**
+ * Dummy Channel for testing.
+ */
+public class ChannelAdapter implements Channel {
+
+ @Override
+ public <T> Attribute<T> attr(AttributeKey<T> key) {
+
+ return null;
+ }
+
+ @Override
+ public int compareTo(Channel o) {
+ return 0;
+ }
+
+ @Override
+ public EventLoop eventLoop() {
+ return null;
+ }
+
+ @Override
+ public Channel parent() {
+ return null;
+ }
+
+ @Override
+ public ChannelConfig config() {
+ return null;
+ }
+
+ @Override
+ public boolean isOpen() {
+ return false;
+ }
+
+ @Override
+ public boolean isRegistered() {
+ return false;
+ }
+
+ @Override
+ public boolean isActive() {
+ return false;
+ }
+
+ @Override
+ public ChannelMetadata metadata() {
+ return null;
+ }
+
+ @Override
+ public SocketAddress localAddress() {
+ return null;
+ }
+
+ @Override
+ public SocketAddress remoteAddress() {
+ return null;
+ }
+
+ @Override
+ public ChannelFuture closeFuture() {
+ return null;
+ }
+
+ @Override
+ public boolean isWritable() {
+ return false;
+ }
+
+ @Override
+ public Unsafe unsafe() {
+ return null;
+ }
+
+ @Override
+ public ChannelPipeline pipeline() {
+ return null;
+ }
+
+ @Override
+ public ByteBufAllocator alloc() {
+ return null;
+ }
+
+ @Override
+ public ChannelPromise newPromise() {
+ return null;
+ }
+
+ @Override
+ public ChannelProgressivePromise newProgressivePromise() {
+ return null;
+ }
+
+ @Override
+ public ChannelFuture newSucceededFuture() {
+ return null;
+ }
+
+ @Override
+ public ChannelFuture newFailedFuture(Throwable cause) {
+ return null;
+ }
+
+ @Override
+ public ChannelPromise voidPromise() {
+
+ return null;
+ }
+
+ @Override
+ public ChannelFuture bind(SocketAddress localAddress) {
+
+ return null;
+ }
+
+ @Override
+ public ChannelFuture connect(SocketAddress remoteAddress) {
+
+ return null;
+ }
+
+ @Override
+ public ChannelFuture connect(SocketAddress remoteAddress,
+ SocketAddress localAddress) {
+
+ return null;
+ }
+
+ @Override
+ public ChannelFuture disconnect() {
+
+ return null;
+ }
+
+ @Override
+ public ChannelFuture close() {
+
+ return null;
+ }
+
+ @Override
+ public ChannelFuture deregister() {
+
+ return null;
+ }
+
+ @Override
+ public ChannelFuture bind(SocketAddress localAddress,
+ ChannelPromise promise) {
+
+ return null;
+ }
+
+ @Override
+ public ChannelFuture connect(SocketAddress remoteAddress,
+ ChannelPromise promise) {
+
+ return null;
+ }
+
+ @Override
+ public ChannelFuture connect(SocketAddress remoteAddress,
+ SocketAddress localAddress,
+ ChannelPromise promise) {
+
+ return null;
+ }
+
+ @Override
+ public ChannelFuture disconnect(ChannelPromise promise) {
+
+ return null;
+ }
+
+ @Override
+ public ChannelFuture close(ChannelPromise promise) {
+
+ return null;
+ }
+
+ @Override
+ public ChannelFuture deregister(ChannelPromise promise) {
+
+ return null;
+ }
+
+ @Override
+ public Channel read() {
+
+ return null;
+ }
+
+ @Override
+ public ChannelFuture write(Object msg) {
+
+ return null;
+ }
+
+ @Override
+ public ChannelFuture write(Object msg, ChannelPromise promise) {
+
+ return null;
+ }
+
+ @Override
+ public Channel flush() {
+
+ return null;
+ }
+
+ @Override
+ public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
+
+ return null;
+ }
+
+ @Override
+ public ChannelFuture writeAndFlush(Object msg) {
+
+ return null;
+ }
+
+ @Override
+ public <T> boolean hasAttr(AttributeKey<T> key) {
+
+ return false;
+ }
+
+ @Override
+ public ChannelId id() {
+
+ return null;
+ }
+
+ @Override
+ public long bytesBeforeUnwritable() {
+
+ return 0;
+ }
+
+ @Override
+ public long bytesBeforeWritable() {
+
+ return 0;
+ }
+
+}
diff --git a/protocols/openflow/ctl/src/test/java/org/onosproject/openflow/ChannelHandlerContextAdapter.java b/protocols/openflow/ctl/src/test/java/org/onosproject/openflow/ChannelHandlerContextAdapter.java
index 4b5afb7..2fcc6b9 100644
--- a/protocols/openflow/ctl/src/test/java/org/onosproject/openflow/ChannelHandlerContextAdapter.java
+++ b/protocols/openflow/ctl/src/test/java/org/onosproject/openflow/ChannelHandlerContextAdapter.java
@@ -15,63 +15,272 @@
*/
package org.onosproject.openflow;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelEvent;
-import org.jboss.netty.channel.ChannelHandler;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelPipeline;
+import java.net.SocketAddress;
+
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.ChannelProgressivePromise;
+import io.netty.channel.ChannelPromise;
+import io.netty.util.Attribute;
+import io.netty.util.AttributeKey;
+import io.netty.util.concurrent.EventExecutor;
/**
* Adapter for testing against a netty channel handler context.
*/
public class ChannelHandlerContextAdapter implements ChannelHandlerContext {
+
@Override
- public Channel getChannel() {
+ public <T> Attribute<T> attr(AttributeKey<T> key) {
+
return null;
}
@Override
- public ChannelPipeline getPipeline() {
+ public Channel channel() {
+
+ return new ChannelAdapter();
+ }
+
+ @Override
+ public EventExecutor executor() {
+
return null;
}
@Override
- public String getName() {
+ public String name() {
+
return null;
}
@Override
- public ChannelHandler getHandler() {
+ public ChannelHandler handler() {
+
return null;
}
@Override
- public boolean canHandleUpstream() {
+ public boolean isRemoved() {
+
return false;
}
@Override
- public boolean canHandleDownstream() {
- return false;
- }
+ public ChannelHandlerContext fireChannelRegistered() {
- @Override
- public void sendUpstream(ChannelEvent channelEvent) {
-
- }
-
- @Override
- public void sendDownstream(ChannelEvent channelEvent) {
-
- }
-
- @Override
- public Object getAttachment() {
return null;
}
@Override
- public void setAttachment(Object o) {
+ public ChannelHandlerContext fireChannelUnregistered() {
+ return null;
+ }
+
+ @Override
+ public ChannelHandlerContext fireChannelActive() {
+
+ return null;
+ }
+
+ @Override
+ public ChannelHandlerContext fireChannelInactive() {
+
+ return null;
+ }
+
+ @Override
+ public ChannelHandlerContext fireExceptionCaught(Throwable cause) {
+
+ return null;
+ }
+
+ @Override
+ public ChannelHandlerContext fireUserEventTriggered(Object event) {
+
+ return null;
+ }
+
+ @Override
+ public ChannelHandlerContext fireChannelRead(Object msg) {
+
+ return null;
+ }
+
+ @Override
+ public ChannelHandlerContext fireChannelReadComplete() {
+
+ return null;
+ }
+
+ @Override
+ public ChannelHandlerContext fireChannelWritabilityChanged() {
+
+ return null;
+ }
+
+ @Override
+ public ChannelFuture bind(SocketAddress localAddress) {
+
+ return null;
+ }
+
+ @Override
+ public ChannelFuture connect(SocketAddress remoteAddress) {
+
+ return null;
+ }
+
+ @Override
+ public ChannelFuture connect(SocketAddress remoteAddress,
+ SocketAddress localAddress) {
+
+ return null;
+ }
+
+ @Override
+ public ChannelFuture disconnect() {
+
+ return null;
+ }
+
+ @Override
+ public ChannelFuture close() {
+
+ return null;
+ }
+
+ @Override
+ public ChannelFuture deregister() {
+
+ return null;
+ }
+
+ @Override
+ public ChannelFuture bind(SocketAddress localAddress,
+ ChannelPromise promise) {
+
+ return null;
+ }
+
+ @Override
+ public ChannelFuture connect(SocketAddress remoteAddress,
+ ChannelPromise promise) {
+
+ return null;
+ }
+
+ @Override
+ public ChannelFuture connect(SocketAddress remoteAddress,
+ SocketAddress localAddress,
+ ChannelPromise promise) {
+
+ return null;
+ }
+
+ @Override
+ public ChannelFuture disconnect(ChannelPromise promise) {
+
+ return null;
+ }
+
+ @Override
+ public ChannelFuture close(ChannelPromise promise) {
+
+ return null;
+ }
+
+ @Override
+ public ChannelFuture deregister(ChannelPromise promise) {
+
+ return null;
+ }
+
+ @Override
+ public ChannelHandlerContext read() {
+
+ return null;
+ }
+
+ @Override
+ public ChannelFuture write(Object msg) {
+
+ return null;
+ }
+
+ @Override
+ public ChannelFuture write(Object msg, ChannelPromise promise) {
+
+ return null;
+ }
+
+ @Override
+ public ChannelHandlerContext flush() {
+
+ return null;
+ }
+
+ @Override
+ public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
+
+ return null;
+ }
+
+ @Override
+ public ChannelFuture writeAndFlush(Object msg) {
+
+ return null;
+ }
+
+ @Override
+ public ChannelPipeline pipeline() {
+
+ return null;
+ }
+
+ @Override
+ public ByteBufAllocator alloc() {
+
+ return null;
+ }
+
+ @Override
+ public ChannelPromise newPromise() {
+
+ return null;
+ }
+
+ @Override
+ public ChannelProgressivePromise newProgressivePromise() {
+
+ return null;
+ }
+
+ @Override
+ public ChannelFuture newSucceededFuture() {
+
+ return null;
+ }
+
+ @Override
+ public ChannelFuture newFailedFuture(Throwable cause) {
+
+ return null;
+ }
+
+ @Override
+ public ChannelPromise voidPromise() {
+
+ return null;
+ }
+
+ @Override
+ public <T> boolean hasAttr(AttributeKey<T> key) {
+ return false;
}
}
diff --git a/protocols/openflow/ctl/src/test/java/org/onosproject/openflow/OpenflowSwitchDriverAdapter.java b/protocols/openflow/ctl/src/test/java/org/onosproject/openflow/OpenflowSwitchDriverAdapter.java
index 74751db..0083038 100644
--- a/protocols/openflow/ctl/src/test/java/org/onosproject/openflow/OpenflowSwitchDriverAdapter.java
+++ b/protocols/openflow/ctl/src/test/java/org/onosproject/openflow/OpenflowSwitchDriverAdapter.java
@@ -15,11 +15,11 @@
*/
package org.onosproject.openflow;
-import org.jboss.netty.channel.Channel;
import org.onosproject.net.Device;
import org.onosproject.net.driver.DriverData;
import org.onosproject.net.driver.DriverHandler;
import org.onosproject.openflow.controller.Dpid;
+import org.onosproject.openflow.controller.OpenFlowSession;
import org.onosproject.openflow.controller.RoleState;
import org.onosproject.openflow.controller.driver.OpenFlowAgent;
import org.onosproject.openflow.controller.driver.OpenFlowSwitchDriver;
@@ -147,7 +147,7 @@
}
@Override
- public void setChannel(Channel channel) {
+ public void setChannel(OpenFlowSession channel) {
}
diff --git a/protocols/openflow/ctl/src/test/java/org/onosproject/openflow/controller/impl/OFMessageDecoderTest.java b/protocols/openflow/ctl/src/test/java/org/onosproject/openflow/controller/impl/OFMessageDecoderTest.java
index 6c109ae..f2617cd 100644
--- a/protocols/openflow/ctl/src/test/java/org/onosproject/openflow/controller/impl/OFMessageDecoderTest.java
+++ b/protocols/openflow/ctl/src/test/java/org/onosproject/openflow/controller/impl/OFMessageDecoderTest.java
@@ -16,38 +16,47 @@
package org.onosproject.openflow.controller.impl;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBuffers;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
-import org.onosproject.core.netty.ChannelAdapter;
+import org.onosproject.openflow.ChannelAdapter;
import org.onosproject.openflow.ChannelHandlerContextAdapter;
import org.projectfloodlight.openflow.protocol.OFHello;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.Channel;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.instanceOf;
-import static org.hamcrest.Matchers.notNullValue;
-import static org.hamcrest.Matchers.nullValue;
+import static org.hamcrest.Matchers.is;
+import java.util.ArrayList;
+import java.util.List;
/**
* Tests for the OpenFlow message decoder.
*/
public class OFMessageDecoderTest {
- static class ConnectedChannel extends ChannelAdapter {
- @Override
- public boolean isConnected() {
- return true;
- }
- }
+ private ByteBuf buf;
- private ChannelBuffer getHelloMessageBuffer() {
+ private ByteBuf getHelloMessageBuffer() {
// OFHello, OF version 1, xid of 0, total of 8 bytes
byte[] messageData = {0x1, 0x0, 0x0, 0x8, 0x0, 0x0, 0x0, 0x0};
- ChannelBuffer channelBuffer = ChannelBuffers.dynamicBuffer();
- channelBuffer.writeBytes(messageData);
- return channelBuffer;
+ buf.writeBytes(messageData);
+ return buf;
}
+ @Before
+ public void setUp() {
+ buf = ByteBufAllocator.DEFAULT.buffer();
+ }
+
+ @After
+ public void tearDown() {
+ buf.release();
+ }
+
+
/**
* Tests decoding a message on a closed channel.
*
@@ -55,13 +64,13 @@
*/
@Test
public void testDecodeNoChannel() throws Exception {
- OFMessageDecoder decoder = new OFMessageDecoder();
- ChannelBuffer channelBuffer = getHelloMessageBuffer();
- Object message =
- decoder.decode(new ChannelHandlerContextAdapter(),
- new ChannelAdapter(),
- channelBuffer);
- assertThat(message, nullValue());
+ OFMessageDecoder decoder = OFMessageDecoder.getInstance();
+ ByteBuf channelBuffer = getHelloMessageBuffer();
+ List<Object> out = new ArrayList<>();
+ decoder.decode(new ChannelHandlerContextAdapter(),
+ channelBuffer,
+ out);
+ assertThat(out.size(), is(0));
}
/**
@@ -71,14 +80,29 @@
*/
@Test
public void testDecode() throws Exception {
- OFMessageDecoder decoder = new OFMessageDecoder();
- ChannelBuffer channelBuffer = getHelloMessageBuffer();
- Object message =
- decoder.decode(new ChannelHandlerContextAdapter(),
- new ConnectedChannel(),
- channelBuffer);
- assertThat(message, notNullValue());
- assertThat(message, instanceOf(OFHello.class));
+ OFMessageDecoder decoder = OFMessageDecoder.getInstance();
+ ByteBuf channelBuffer = getHelloMessageBuffer();
+ List<Object> out = new ArrayList<>();
+ decoder.decode(new ActiveChannelHandlerContextAdapter(),
+ channelBuffer,
+ out);
+ assertThat(out.size(), is(1));
+ assertThat(out.get(0), instanceOf(OFHello.class));
+ }
+
+ public class ActiveChannelHandlerContextAdapter
+ extends ChannelHandlerContextAdapter {
+
+ @Override
+ public Channel channel() {
+ return new ChannelAdapter() {
+ @Override
+ public boolean isActive() {
+ return true;
+ }
+ };
+ }
+
}
}
diff --git a/protocols/openflow/ctl/src/test/java/org/onosproject/openflow/controller/impl/OFMessageEncoderTest.java b/protocols/openflow/ctl/src/test/java/org/onosproject/openflow/controller/impl/OFMessageEncoderTest.java
index e62345c..636c568 100644
--- a/protocols/openflow/ctl/src/test/java/org/onosproject/openflow/controller/impl/OFMessageEncoderTest.java
+++ b/protocols/openflow/ctl/src/test/java/org/onosproject/openflow/controller/impl/OFMessageEncoderTest.java
@@ -16,26 +16,27 @@
package org.onosproject.openflow.controller.impl;
import java.nio.charset.StandardCharsets;
-import java.util.List;
+import java.util.Collections;
import io.netty.buffer.ByteBuf;
-import org.jboss.netty.buffer.ChannelBuffer;
+import io.netty.buffer.ByteBufAllocator;
+
+import org.hamcrest.Matchers;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
import org.onosproject.openflow.OfMessageAdapter;
-import org.projectfloodlight.openflow.protocol.OFMessage;
import org.projectfloodlight.openflow.protocol.OFType;
-import com.google.common.collect.ImmutableList;
-
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.notNullValue;
/**
* Tests for the OpenFlow message encoder.
*/
public class OFMessageEncoderTest {
+ private ByteBuf buf;
static class MockOfMessage extends OfMessageAdapter {
static int nextId = 1;
final int id;
@@ -52,40 +53,26 @@
}
}
- /**
- * Tests that encoding a non-list returns the object specified.
- *
- * @throws Exception on exception in the encoder
- */
- @Test
- public void testNoList() throws Exception {
- OFMessageEncoder encoder = new OFMessageEncoder();
- MockOfMessage message = new MockOfMessage();
- OFMessage returnedMessage =
- (OFMessage) encoder.encode(null, null, message);
- assertThat(message, is(returnedMessage));
+ @Before
+ public void setUp() {
+ buf = ByteBufAllocator.DEFAULT.buffer();
}
- /**
- * Tests that encoding a list returns the proper encoded payload.
- *
- * @throws Exception on exception in the encoder
- */
+ @After
+ public void tearDown() {
+ buf.release();
+ }
+
@Test
- public void testList() throws Exception {
- OFMessageEncoder encoder = new OFMessageEncoder();
+ public void testEncode() throws Exception {
+ OFMessageEncoder encoder = OFMessageEncoder.getInstance();
MockOfMessage message1 = new MockOfMessage();
- MockOfMessage message2 = new MockOfMessage();
- MockOfMessage message3 = new MockOfMessage();
- List<MockOfMessage> messages = ImmutableList.of(message1, message2, message3);
- ChannelBuffer returnedChannel =
- (ChannelBuffer) encoder.encode(null, null, messages);
- assertThat(returnedChannel, notNullValue());
- byte[] channelBytes = returnedChannel.array();
- String expectedListMessage = "message1 message2 message3 ";
- String listMessage =
- (new String(channelBytes, StandardCharsets.UTF_8))
- .substring(0, expectedListMessage.length());
- assertThat(listMessage, is(expectedListMessage));
+ encoder.encode(null, Collections.singletonList(message1), buf);
+
+ assertThat(buf.isReadable(), Matchers.is(true));
+ byte[] channelBytes = new byte[buf.readableBytes()];
+ buf.readBytes(channelBytes);
+ String expectedListMessage = "message1 ";
+ assertThat(channelBytes, is(expectedListMessage.getBytes()));
}
}