netty4 OpenFlow southbound

- separate I/O thread and message dispatch threads

Change-Id: I11a10a47de451a9e3063b62f9450be19c3a9dae7
diff --git a/protocols/openflow/ctl/BUCK b/protocols/openflow/ctl/BUCK
index 62f40af..fb7667f 100644
--- a/protocols/openflow/ctl/BUCK
+++ b/protocols/openflow/ctl/BUCK
@@ -1,7 +1,13 @@
 COMPILE_DEPS = [
     '//lib:CORE_DEPS',
     '//lib:openflowj',
-    '//protocols/openflow/api:onos-protocols-openflow-api'
+    '//protocols/openflow/api:onos-protocols-openflow-api',
+    '//lib:netty-buffer',
+    '//lib:netty-codec',
+    '//lib:netty-common',
+    '//lib:netty-handler',
+    '//lib:netty-transport',
+    '//lib:netty-transport-native-epoll',
 ]
 
 TEST_DEPS = [
diff --git a/protocols/openflow/ctl/pom.xml b/protocols/openflow/ctl/pom.xml
index 7dcd69d..baccf7c 100644
--- a/protocols/openflow/ctl/pom.xml
+++ b/protocols/openflow/ctl/pom.xml
@@ -20,29 +20,56 @@
 
     <parent>
         <groupId>org.onosproject</groupId>
-        <artifactId>onos-of</artifactId>
+        <artifactId>onos-protocols-openflow</artifactId>
         <version>1.11.0-SNAPSHOT</version>
     </parent>
 
-    <artifactId>onos-of-ctl</artifactId>
+    <artifactId>onos-protocols-openflow-ctl</artifactId>
     <packaging>bundle</packaging>
 
     <description>ONOS OpenFlow controller subsystem API</description>
 
     <dependencies>
+
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-transport</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-common</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-all</artifactId>
+            <version>${netty4.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-transport-native-epoll</artifactId>
+            <version>${netty4.version}</version>
+            <classifier>linux-x86_64</classifier>
+        </dependency>
+<!--
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-transport-native-kqueue</artifactId>
+            <version>${netty4.version}</version>
+        </dependency>
+ -->
         <dependency>
             <groupId>org.onosproject</groupId>
-            <artifactId>onos-of-api</artifactId>
+            <artifactId>onos-protocols-openflow-api</artifactId>
+            <version>${project.version}</version>
         </dependency>
         <dependency>
             <groupId>org.onosproject</groupId>
             <artifactId>openflowj</artifactId>
         </dependency>
         <dependency>
-            <groupId>io.netty</groupId>
-            <artifactId>netty</artifactId>
-        </dependency>
-        <dependency>
             <groupId>org.apache.felix</groupId>
             <artifactId>org.apache.felix.scr.annotations</artifactId>
         </dependency>
@@ -68,6 +95,38 @@
                 <groupId>org.onosproject</groupId>
                 <artifactId>onos-maven-plugin</artifactId>
             </plugin>
+
+            <plugin>
+                <groupId>org.apache.felix</groupId>
+                <artifactId>maven-bundle-plugin</artifactId>
+
+                <!--
+                    Netty native code related utils seems to assume,
+                    Native related loading uses same class loader(~=bundle)
+                    Embed inline required slice of netty-common and native-transport
+                 -->
+                <configuration>
+                    <instructions combine.children="append">
+                        <DynamicImport-Package>*</DynamicImport-Package>
+                        <Embed-Dependency>
+                            netty-transport-native-epoll;inline=true,
+                            *;artifactId=netty-common;inline=io/netty/util/internal/*,
+                        </Embed-Dependency>
+                        <Embed-Transitive>false</Embed-Transitive>
+                        <Bundle-NativeCode>
+                            META-INF/native/libnetty-transport-native-epoll.so;osname=linux;processor=x86_64,
+                            *
+                        </Bundle-NativeCode>
+                        <Import-Package>
+                        sun.misc;resolution:=optional,
+                        org.apache.commons.logging;resolution:=optional,
+                        org.apache.logging.log4j;resolution:=optional,
+                        *
+                        </Import-Package>
+                    </instructions>
+                </configuration>
+            </plugin>
+
         </plugins>
     </build>
 
diff --git a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/Controller.java b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/Controller.java
index b854898..f328ae7 100644
--- a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/Controller.java
+++ b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/Controller.java
@@ -18,11 +18,17 @@
 
 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableList;
-import org.jboss.netty.bootstrap.ServerBootstrap;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.group.ChannelGroup;
-import org.jboss.netty.channel.group.DefaultChannelGroup;
-import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.epoll.EpollServerSocketChannel;
+import io.netty.channel.group.ChannelGroup;
+import io.netty.channel.group.DefaultChannelGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.util.concurrent.GlobalEventExecutor;
 
 import org.onlab.util.ItemNotFoundException;
 import org.onosproject.net.DeviceId;
@@ -46,13 +52,11 @@
 import java.io.FileInputStream;
 import java.lang.management.ManagementFactory;
 import java.lang.management.RuntimeMXBean;
-import java.net.InetSocketAddress;
 import java.security.KeyStore;
 import java.util.Dictionary;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.Executors;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -96,7 +100,8 @@
 
     private OpenFlowAgent agent;
 
-    private NioServerSocketChannelFactory execFactory;
+    private EventLoopGroup bossGroup;
+    private EventLoopGroup workerGroup;
 
     protected String ksLocation;
     protected String tsLocation;
@@ -144,20 +149,19 @@
 
         try {
             final ServerBootstrap bootstrap = createServerBootStrap();
+            bootstrap.option(ChannelOption.SO_REUSEADDR, true);
+            bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
+            bootstrap.childOption(ChannelOption.TCP_NODELAY, true);
+            bootstrap.childOption(ChannelOption.SO_SNDBUF, Controller.SEND_BUFFER_SIZE);
+//            bootstrap.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK,
+//                                  new WriteBufferWaterMark(8 * 1024, 32 * 1024));
 
-            bootstrap.setOption("reuseAddr", true);
-            bootstrap.setOption("child.keepAlive", true);
-            bootstrap.setOption("child.tcpNoDelay", true);
-            bootstrap.setOption("child.sendBufferSize", Controller.SEND_BUFFER_SIZE);
+            bootstrap.childHandler(new OFChannelInitializer(this, null, sslContext));
 
-            ChannelPipelineFactory pfact =
-                    new OpenflowPipelineFactory(this, null, sslContext);
-            bootstrap.setPipelineFactory(pfact);
-            cg = new DefaultChannelGroup();
             openFlowPorts.forEach(port -> {
-                InetSocketAddress sa = new InetSocketAddress(port);
-                cg.add(bootstrap.bind(sa));
-                log.info("Listening for switch connections on {}", sa);
+                // TODO revisit if this is best way to listen to multiple ports
+                cg.add(bootstrap.bind(port).syncUninterruptibly().channel());
+                log.info("Listening for switch connections on {}", port);
             });
 
         } catch (Exception e) {
@@ -168,20 +172,43 @@
 
     private ServerBootstrap createServerBootStrap() {
 
-        if (workerThreads == 0) {
-            execFactory = new NioServerSocketChannelFactory(
-                    Executors.newCachedThreadPool(groupedThreads("onos/of", "boss-%d", log)),
-                    Executors.newCachedThreadPool(groupedThreads("onos/of", "worker-%d", log)));
-            return new ServerBootstrap(execFactory);
-        } else {
-            execFactory = new NioServerSocketChannelFactory(
-                    Executors.newCachedThreadPool(groupedThreads("onos/of", "boss-%d", log)),
-                    Executors.newCachedThreadPool(groupedThreads("onos/of", "worker-%d", log)), workerThreads);
-            return new ServerBootstrap(execFactory);
+        int bossThreads = Math.max(1, openFlowPorts.size());
+        try {
+            bossGroup = new EpollEventLoopGroup(bossThreads, groupedThreads("onos/of", "boss-%d", log));
+            workerGroup = new EpollEventLoopGroup(workerThreads, groupedThreads("onos/of", "worker-%d", log));
+            ServerBootstrap bs = new ServerBootstrap()
+                    .group(bossGroup, workerGroup)
+                    .channel(EpollServerSocketChannel.class);
+            log.info("Using Epoll transport");
+            return bs;
+        } catch (Throwable e) {
+            log.debug("Failed to initialize native (epoll) transport: {}", e.getMessage());
         }
+
+// Requires 4.1.11 or later
+//        try {
+//            bossGroup = new KQueueEventLoopGroup(bossThreads, groupedThreads("onos/of", "boss-%d", log));
+//            workerGroup = new KQueueEventLoopGroup(workerThreads, groupedThreads("onos/of", "worker-%d", log));
+//            ServerBootstrap bs = new ServerBootstrap()
+//                    .group(bossGroup, workerGroup)
+//                    .channel(KQueueServerSocketChannel.class);
+//            log.info("Using Kqueue transport");
+//            return bs;
+//        } catch (Throwable e) {
+//            log.debug("Failed to initialize native (kqueue) transport. ", e.getMessage());
+//        }
+
+        bossGroup = new NioEventLoopGroup(bossThreads, groupedThreads("onos/of", "boss-%d", log));
+        workerGroup = new NioEventLoopGroup(workerThreads, groupedThreads("onos/of", "worker-%d", log));
+        log.info("Using Nio transport");
+        return new ServerBootstrap()
+                    .group(bossGroup, workerGroup)
+                    .channel(NioServerSocketChannel.class);
     }
 
     public void setConfigParams(Dictionary<?, ?> properties) {
+        // TODO should be possible to reconfigure ports without restart,
+        // by updating ChannelGroup
         String ports = get(properties, "openflowPorts");
         if (!Strings.isNullOrEmpty(ports)) {
             this.openFlowPorts = Stream.of(ports.split(","))
@@ -207,6 +234,8 @@
 
         this.systemStartTime = System.currentTimeMillis();
 
+        cg = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
+
         try {
             getTlsParameters();
             if (enableOfTls) {
@@ -303,11 +332,11 @@
         }
 
         if (driver == null) {
-            log.error("No OpenFlow driver for {} : {}", dpid, desc);
+            log.error("No OpenFlow driver for {} : {}", dpidObj, desc);
             return null;
         }
 
-        log.info("Driver {} assigned to device {}", driver.name(), dpidObj);
+        log.info("Driver '{}' assigned to device {}", driver.name(), dpidObj);
 
         if (!driver.hasBehaviour(OpenFlowSwitchDriver.class)) {
             log.error("Driver {} does not support OpenFlowSwitchDriver behaviour", driver.name());
@@ -321,7 +350,6 @@
         ofSwitchDriver.init(dpidObj, desc, ofv);
         ofSwitchDriver.setAgent(agent);
         ofSwitchDriver.setRoleHandler(new RoleManager(ofSwitchDriver));
-        log.info("OpenFlow handshaker found for device {}: {}", dpid, ofSwitchDriver);
         return ofSwitchDriver;
     }
 
@@ -337,7 +365,19 @@
     public void stop() {
         log.info("Stopping OpenFlow IO");
         cg.close();
-        execFactory.shutdown();
+
+        // Shut down all event loops to terminate all threads.
+        bossGroup.shutdownGracefully();
+        workerGroup.shutdownGracefully();
+
+        // Wait until all threads are terminated.
+        try {
+            bossGroup.terminationFuture().sync();
+            workerGroup.terminationFuture().sync();
+        } catch (InterruptedException e) {
+            log.warn("Interrupted while stopping", e);
+            Thread.currentThread().interrupt();
+        }
     }
 
 }
diff --git a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/HandshakeTimeoutHandler.java b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/HandshakeTimeoutHandler.java
index 43adf19..0b7f430 100644
--- a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/HandshakeTimeoutHandler.java
+++ b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/HandshakeTimeoutHandler.java
@@ -16,78 +16,82 @@
 
 package org.onosproject.openflow.controller.impl;
 
+import static org.slf4j.LoggerFactory.getLogger;
+
 import java.util.concurrent.TimeUnit;
 
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelStateEvent;
-import org.jboss.netty.channel.Channels;
-import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
-import org.jboss.netty.util.Timeout;
-import org.jboss.netty.util.Timer;
-import org.jboss.netty.util.TimerTask;
+import org.slf4j.Logger;
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
 
 /**
  * Trigger a timeout if a switch fails to complete handshake soon enough.
  */
 public class HandshakeTimeoutHandler
-    extends SimpleChannelUpstreamHandler {
-    static final HandshakeTimeoutException EXCEPTION =
-            new HandshakeTimeoutException();
+    extends ChannelDuplexHandler {
+
+    private static final Logger log = getLogger(HandshakeTimeoutHandler.class);
 
     final OFChannelHandler channelHandler;
-    final Timer timer;
-    final long timeoutNanos;
-    volatile Timeout timeout;
+    final long timeoutMillis;
+    volatile long deadline;
 
     public HandshakeTimeoutHandler(OFChannelHandler channelHandler,
-                                   Timer timer,
                                    long timeoutSeconds) {
         super();
         this.channelHandler = channelHandler;
-        this.timer = timer;
-        this.timeoutNanos = TimeUnit.SECONDS.toNanos(timeoutSeconds);
-
+        this.timeoutMillis = TimeUnit.SECONDS.toMillis(timeoutSeconds);
+        this.deadline = System.currentTimeMillis() + timeoutMillis;
     }
 
     @Override
-    public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e)
-            throws Exception {
-        if (timeoutNanos > 0) {
-            timeout = timer.newTimeout(new HandshakeTimeoutTask(ctx),
-                                       timeoutNanos, TimeUnit.NANOSECONDS);
+    public void channelActive(ChannelHandlerContext ctx) throws Exception {
+        if (timeoutMillis > 0) {
+            // set Handshake deadline
+            deadline = System.currentTimeMillis() + timeoutMillis;
         }
-        ctx.sendUpstream(e);
+        super.channelActive(ctx);
     }
 
     @Override
-    public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e)
-            throws Exception {
-        if (timeout != null) {
-            timeout.cancel();
-            timeout = null;
-        }
+    public void read(ChannelHandlerContext ctx) throws Exception {
+        checkTimeout(ctx);
+        super.read(ctx);
     }
 
-    private final class HandshakeTimeoutTask implements TimerTask {
+    @Override
+    public void write(ChannelHandlerContext ctx, Object msg,
+                      ChannelPromise promise)
+            throws Exception {
+        checkTimeout(ctx);
+        super.write(ctx, msg, promise);
+    }
 
-        private final ChannelHandlerContext ctx;
+    @Override
+    public void userEventTriggered(ChannelHandlerContext ctx,
+                                   Object evt)
+            throws Exception {
 
-        HandshakeTimeoutTask(ChannelHandlerContext ctx) {
-            this.ctx = ctx;
+        // expecting idle event
+        checkTimeout(ctx);
+        super.userEventTriggered(ctx, evt);
+    }
+
+    void checkTimeout(ChannelHandlerContext ctx) {
+        if (channelHandler.isHandshakeComplete()) {
+            // handshake complete, Handshake monitoring timeout no-longer needed
+            ctx.channel().pipeline().remove(this);
+            return;
         }
 
-        @Override
-        public void run(Timeout t) throws Exception {
-            if (t.isCancelled()) {
-                return;
-            }
+        if (!ctx.channel().isActive()) {
+            return;
+        }
 
-            if (!ctx.getChannel().isOpen()) {
-                return;
-            }
-            if (!channelHandler.isHandshakeComplete()) {
-                Channels.fireExceptionCaught(ctx, EXCEPTION);
-            }
+        if (System.currentTimeMillis() > deadline) {
+            log.info("Handshake time out {}", channelHandler);
+            ctx.fireExceptionCaught(new HandshakeTimeoutException());
         }
     }
 }
diff --git a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OFChannelHandler.java b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OFChannelHandler.java
index 100203f..b2d77cf 100644
--- a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OFChannelHandler.java
+++ b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OFChannelHandler.java
@@ -16,24 +16,29 @@
 
 package org.onosproject.openflow.controller.impl;
 
+import static org.onlab.util.Tools.groupedThreads;
+
 import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
 import java.nio.channels.ClosedChannelException;
+import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Deque;
 import java.util.List;
 import java.util.Optional;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
-
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelStateEvent;
-import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.handler.timeout.IdleStateAwareChannelHandler;
-import org.jboss.netty.handler.timeout.IdleStateEvent;
-import org.jboss.netty.handler.timeout.ReadTimeoutException;
+import org.onlab.packet.IpAddress;
 import org.onosproject.openflow.controller.Dpid;
+import org.onosproject.openflow.controller.OpenFlowSession;
 import org.onosproject.openflow.controller.driver.OpenFlowSwitchDriver;
 import org.onosproject.openflow.controller.driver.SwitchStateException;
 import org.projectfloodlight.openflow.exceptions.OFParseError;
@@ -78,11 +83,20 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.handler.timeout.IdleStateEvent;
+import io.netty.handler.timeout.ReadTimeoutException;
+import io.netty.util.ReferenceCountUtil;
+
 /**
  * Channel handler deals with the switch connection and dispatches
  * switch messages to the appropriate locations.
  */
-class OFChannelHandler extends IdleStateAwareChannelHandler {
+class OFChannelHandler extends ChannelInboundHandlerAdapter
+    implements OpenFlowSession {
+
     private static final Logger log = LoggerFactory.getLogger(OFChannelHandler.class);
 
     private static final String RESET_BY_PEER = "Connection reset by peer";
@@ -91,7 +105,11 @@
     private final Controller controller;
     private OpenFlowSwitchDriver sw;
     private long thisdpid; // channelHandler cached value of connected switch id
+
     private Channel channel;
+    private String channelId;
+
+
     // State needs to be volatile because the HandshakeTimeoutHandler
     // needs to check if the handshake is complete
     private volatile ChannelState state;
@@ -121,6 +139,7 @@
 
     //Indicates the openflow version used by this switch
     protected OFVersion ofVersion;
+    protected OFFactory factory;
 
     // deprecated in 1.10.0
     @Deprecated
@@ -135,11 +154,43 @@
      */
     private int handshakeTransactionIds = -1;
 
+
+
+    private static final int MSG_READ_BUFFER = 5000;
+
+    /**
+     * OFMessage dispatch queue.
+     */
+    private final BlockingQueue<OFMessage> dispatchQueue =
+            new LinkedBlockingQueue<>(MSG_READ_BUFFER);
+
+    /**
+     * Single thread executor for OFMessage dispatching.
+     *
+     * Gets initialized on channelActive, shutdown on channelInactive.
+     */
+    private ExecutorService dispatcher;
+
+    /**
+     * Handle for dispatcher thread.
+     * <p>
+     * Should only be touched from the Channel I/O thread
+     */
+    private Future<?> dispatcherHandle = CompletableFuture.completedFuture(null);
+
+    /**
+     * Dispatch backlog.
+     * <p>
+     * Should only be touched from the Channel I/O thread
+     */
+    private final Deque<OFMessage> dispatchBacklog = new ArrayDeque<>();
+
     /**
      * Create a new unconnected OFChannelHandler.
      * @param controller parent controller
      */
     OFChannelHandler(Controller controller) {
+
         this.controller = controller;
         this.state = ChannelState.INIT;
         this.pendingPortStatusMsg = new CopyOnWriteArrayList<>();
@@ -208,30 +259,33 @@
                 if (m.getVersion().getWireVersion() >= OFVersion.OF_13.getWireVersion()) {
                     log.debug("Received {} Hello from {} - switching to OF "
                             + "version 1.3+", m.getVersion(),
-                            h.channel.getRemoteAddress());
+                            h.channel.remoteAddress());
                     h.ofVersion = m.getVersion();
+                    h.factory = OFFactories.getFactory(h.ofVersion);
                     h.sendHandshakeHelloMessage();
                 } else if (m.getVersion().getWireVersion() >= OFVersion.OF_10.getWireVersion()) {
                     log.debug("Received {} Hello from {} - switching to OF "
                             + "version 1.0", m.getVersion(),
-                            h.channel.getRemoteAddress());
+                            h.channel.remoteAddress());
                     h.ofVersion = m.getVersion();
+                    h.factory = OFFactories.getFactory(h.ofVersion);
                     OFHello hi =
-                            OFFactories.getFactory(h.ofVersion).buildHello()
+                            h.factory.buildHello()
                                     .setXid(h.handshakeTransactionIds--)
                                     .build();
-                    h.channel.write(Collections.singletonList(hi));
+                    h.channel.writeAndFlush(Collections.singletonList(hi));
                 } else {
                     log.error("Received Hello of version {} from switch at {}. "
                             + "This controller works with OF1.0 and OF1.3 "
                             + "switches. Disconnecting switch ...",
-                            m.getVersion(), h.channel.getRemoteAddress());
+                            m.getVersion(), h.channel.remoteAddress());
                     h.channel.disconnect();
                     return;
                 }
                 h.sendHandshakeFeaturesRequestMessage();
                 h.setState(WAIT_FEATURES_REPLY);
             }
+
             @Override
             void processOFFeaturesReply(OFChannelHandler h, OFFeaturesReply  m)
                     throws IOException, SwitchStateException {
@@ -313,7 +367,7 @@
                 if (m.getStatsType() != OFStatsType.PORT_DESC) {
                     log.warn("Expecting port description stats but received stats "
                             + "type {} from {}. Ignoring ...", m.getStatsType(),
-                            h.channel.getRemoteAddress());
+                            h.channel.remoteAddress());
                     return;
                 }
                 if (m.getFlags().contains(OFStatsReplyFlags.REPLY_MORE)) {
@@ -465,22 +519,21 @@
                 if (m.getStatsType() != OFStatsType.DESC) {
                     log.warn("Expecting Description stats but received stats "
                             + "type {} from {}. Ignoring ...", m.getStatsType(),
-                            h.channel.getRemoteAddress());
+                            h.channel.remoteAddress());
                     return;
                 }
                 OFDescStatsReply drep = (OFDescStatsReply) m;
                 log.info("Received switch description reply {} from switch at {}",
-                         drep, h.channel.getRemoteAddress());
+                         drep, h.channel.remoteAddress());
                 // Here is where we differentiate between different kinds of switches
                 h.sw = h.controller.getOFSwitchInstance(h.thisdpid, drep, h.ofVersion);
 
                 h.sw.setOFVersion(h.ofVersion);
                 h.sw.setFeaturesReply(h.featuresReply);
-                //h.sw.setPortDescReply(h.portDescReply);
                 h.sw.setPortDescReplies(h.portDescReplies);
                 h.sw.setMeterFeaturesReply(h.meterFeaturesReply);
                 h.sw.setConnected(true);
-                h.sw.setChannel(h.channel);
+                h.sw.setChannel(h);
 //                boolean success = h.sw.connectSwitch();
 //
 //                if (!success) {
@@ -1072,7 +1125,7 @@
                 throws IOException, SwitchStateException {
             // we only expect hello in the WAIT_HELLO state
             log.warn("Received Hello outside WAIT_HELLO state; switch {} is not complaint.",
-                     h.channel.getRemoteAddress());
+                     h.channel.remoteAddress());
         }
 
         void processOFBarrierReply(OFChannelHandler h, OFBarrierReply m)
@@ -1084,16 +1137,15 @@
                 throws IOException {
             if (h.ofVersion == null) {
                 log.error("No OF version set for {}. Not sending Echo REPLY",
-                        h.channel.getRemoteAddress());
+                        h.channel.remoteAddress());
                 return;
             }
-            OFFactory factory = OFFactories.getFactory(h.ofVersion);
-                    OFEchoReply reply = factory
-                            .buildEchoReply()
-                            .setXid(m.getXid())
-                            .setData(m.getData())
-                            .build();
-                    h.channel.write(Collections.singletonList(reply));
+            OFEchoReply reply = h.factory
+                    .buildEchoReply()
+                    .setXid(m.getXid())
+                    .setData(m.getData())
+                    .build();
+            h.channel.writeAndFlush(Collections.singletonList(reply));
         }
 
         void processOFEchoReply(OFChannelHandler h, OFEchoReply m)
@@ -1175,11 +1227,28 @@
     //*************************
 
     @Override
-    public void channelConnected(ChannelHandlerContext ctx,
-            ChannelStateEvent e) throws Exception {
-        channel = e.getChannel();
+    public void channelActive(ChannelHandlerContext ctx)
+            throws Exception {
+
+        channel = ctx.channel();
         log.info("New switch connection from {}",
-                channel.getRemoteAddress());
+                 channel.remoteAddress());
+
+        SocketAddress address = channel.remoteAddress();
+        if (address instanceof InetSocketAddress) {
+            final InetSocketAddress inetAddress = (InetSocketAddress) address;
+            final IpAddress ipAddress = IpAddress.valueOf(inetAddress.getAddress());
+            if (ipAddress.isIp4()) {
+                channelId = ipAddress.toString() + ':' + inetAddress.getPort();
+            } else {
+                channelId = '[' + ipAddress.toString() + "]:" + inetAddress.getPort();
+            }
+        } else {
+            channelId = channel.toString();
+        }
+
+        dispatcher = Executors.newSingleThreadExecutor(groupedThreads("onos/of/dispatcher", channelId, log));
+
         /*
             hack to wait for the switch to tell us what it's
             max version is. This is not spec compliant and should
@@ -1190,81 +1259,89 @@
     }
 
     @Override
-    public void channelDisconnected(ChannelHandlerContext ctx,
-            ChannelStateEvent e) throws Exception {
+    public void channelInactive(ChannelHandlerContext ctx)
+            throws Exception {
+
         log.info("Switch disconnected callback for sw:{}. Cleaning up ...",
-                getSwitchInfoString());
-        if (thisdpid != 0) {
-            if (!duplicateDpidFound) {
-                // if the disconnected switch (on this ChannelHandler)
-                // was not one with a duplicate-dpid, it is safe to remove all
-                // state for it at the controller. Notice that if the disconnected
-                // switch was a duplicate-dpid, calling the method below would clear
-                // all state for the original switch (with the same dpid),
-                // which we obviously don't want.
-                log.info("{}:removal called", getSwitchInfoString());
-                if (sw != null) {
-                    sw.removeConnectedSwitch();
-                }
-            } else {
-                // A duplicate was disconnected on this ChannelHandler,
-                // this is the same switch reconnecting, but the original state was
-                // not cleaned up - XXX check liveness of original ChannelHandler
-                log.info("{}:duplicate found", getSwitchInfoString());
-                duplicateDpidFound = Boolean.FALSE;
-            }
-        } else {
-            log.warn("no dpid in channelHandler registered for "
-                    + "disconnected switch {}", getSwitchInfoString());
+                 getSwitchInfoString());
+
+        if (dispatcher != null) {
+            dispatcher.shutdown();
         }
+
+         if (thisdpid != 0) {
+             if (!duplicateDpidFound) {
+                 // if the disconnected switch (on this ChannelHandler)
+                 // was not one with a duplicate-dpid, it is safe to remove all
+                 // state for it at the controller. Notice that if the disconnected
+                 // switch was a duplicate-dpid, calling the method below would clear
+                 // all state for the original switch (with the same dpid),
+                 // which we obviously don't want.
+                 log.info("{}:removal called", getSwitchInfoString());
+                 if (sw != null) {
+                     sw.removeConnectedSwitch();
+                 }
+             } else {
+                 // A duplicate was disconnected on this ChannelHandler,
+                 // this is the same switch reconnecting, but the original state was
+                 // not cleaned up - XXX check liveness of original ChannelHandler
+                 log.info("{}:duplicate found", getSwitchInfoString());
+                 duplicateDpidFound = Boolean.FALSE;
+             }
+         } else {
+             log.warn("no dpid in channelHandler registered for "
+                     + "disconnected switch {}", getSwitchInfoString());
+         }
     }
 
     @Override
-    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
+    public void exceptionCaught(ChannelHandlerContext ctx,
+                                Throwable cause)
             throws Exception {
-        if (e.getCause() instanceof ReadTimeoutException) {
+
+        if (cause instanceof ReadTimeoutException) {
             // switch timeout
             log.error("Disconnecting switch {} due to read timeout",
                     getSwitchInfoString());
-            ctx.getChannel().close();
-        } else if (e.getCause() instanceof HandshakeTimeoutException) {
+            ctx.channel().close();
+        } else if (cause instanceof HandshakeTimeoutException) {
             log.error("Disconnecting switch {}: failed to complete handshake",
                     getSwitchInfoString());
-            ctx.getChannel().close();
-        } else if (e.getCause() instanceof ClosedChannelException) {
+            ctx.channel().close();
+        } else if (cause instanceof ClosedChannelException) {
             log.debug("Channel for sw {} already closed", getSwitchInfoString());
-        } else if (e.getCause() instanceof IOException) {
-            if (!e.getCause().getMessage().equals(RESET_BY_PEER) &&
-                    !e.getCause().getMessage().equals(BROKEN_PIPE)) {
+        } else if (cause instanceof IOException) {
+            if (!cause.getMessage().equals(RESET_BY_PEER) &&
+                    !cause.getMessage().equals(BROKEN_PIPE)) {
                 log.error("Disconnecting switch {} due to IO Error: {}",
-                          getSwitchInfoString(), e.getCause().getMessage());
+                          getSwitchInfoString(), cause.getMessage());
                 if (log.isDebugEnabled()) {
                     // still print stack trace if debug is enabled
-                    log.debug("StackTrace for previous Exception: ", e.getCause());
+                    log.debug("StackTrace for previous Exception: ", cause);
                 }
             }
-            ctx.getChannel().close();
-        } else if (e.getCause() instanceof SwitchStateException) {
+            ctx.channel().close();
+        } else if (cause instanceof SwitchStateException) {
             log.error("Disconnecting switch {} due to switch state error: {}",
-                    getSwitchInfoString(), e.getCause().getMessage());
+                    getSwitchInfoString(), cause.getMessage());
             if (log.isDebugEnabled()) {
                 // still print stack trace if debug is enabled
-                log.debug("StackTrace for previous Exception: ", e.getCause());
+                log.debug("StackTrace for previous Exception: ", cause);
             }
-            ctx.getChannel().close();
-        } else if (e.getCause() instanceof OFParseError) {
+            ctx.channel().close();
+        } else if (cause instanceof OFParseError) {
             log.error("Disconnecting switch "
                     + getSwitchInfoString() +
                     " due to message parse failure",
-                    e.getCause());
-            ctx.getChannel().close();
-        } else if (e.getCause() instanceof RejectedExecutionException) {
+                    cause);
+            ctx.channel().close();
+        } else if (cause instanceof RejectedExecutionException) {
             log.warn("Could not process message: queue full");
         } else {
             log.error("Error while processing message from switch "
                     + getSwitchInfoString()
-                    + "state " + this.state, e.getCause());
-            ctx.getChannel().close();
+                    + "state " + this.state, cause);
+            ctx.channel().close();
         }
     }
 
@@ -1273,38 +1350,54 @@
         return getSwitchInfoString();
     }
 
-    @Override
-    public void channelIdle(ChannelHandlerContext ctx, IdleStateEvent e)
+    protected void channelIdle(ChannelHandlerContext ctx,
+                               IdleStateEvent e)
             throws Exception {
-        OFFactory factory = OFFactories.getFactory(ofVersion);
         OFMessage m = factory.buildEchoRequest().build();
         log.debug("Sending Echo Request on idle channel: {}",
-                e.getChannel().getPipeline().getLast());
-        e.getChannel().write(Collections.singletonList(m));
+                  ctx.channel());
+        ctx.write(Collections.singletonList(m), ctx.voidPromise());
         // XXX S some problems here -- echo request has no transaction id, and
         // echo reply is not correlated to the echo request.
         state.processIdle(this);
     }
 
     @Override
-    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
+    public void userEventTriggered(ChannelHandlerContext ctx,
+                                   Object evt)
             throws Exception {
-        if (e.getMessage() instanceof List) {
-            @SuppressWarnings("unchecked")
-            List<OFMessage> msglist = (List<OFMessage>) e.getMessage();
 
+        if (evt instanceof IdleStateEvent) {
+            channelIdle(ctx, (IdleStateEvent) evt);
+        }
 
-            for (OFMessage ofm : msglist) {
-                // Do the actual packet processing
-                state.processOFMessage(this, ofm);
+        super.userEventTriggered(ctx, evt);
+    }
+
+    // SimpleChannelInboundHandler without dependency to TypeParameterMatcher
+    @Override
+    public void channelRead(ChannelHandlerContext ctx,
+                            Object msg) throws Exception {
+
+        boolean release = true;
+        try {
+            if (msg instanceof OFMessage) {
+                // channelRead0 inlined
+                state.processOFMessage(this, (OFMessage) msg);
+            } else {
+                release = false;
+                ctx.fireChannelRead(msg);
             }
-        } else {
-            state.processOFMessage(this, (OFMessage) e.getMessage());
+        } finally {
+            if (release) {
+                ReferenceCountUtil.release(msg);
+            }
         }
     }
 
 
 
+
     //*************************
     //  Channel utility methods
     //*************************
@@ -1318,7 +1411,63 @@
     }
 
     private void dispatchMessage(OFMessage m) {
-        sw.handleMessage(m);
+
+        if (dispatchBacklog.isEmpty()) {
+            if (!dispatchQueue.offer(m)) {
+                // queue full
+                channel.config().setAutoRead(false);
+                // put it on the head of backlog
+                dispatchBacklog.addFirst(m);
+                return;
+            }
+        } else {
+            dispatchBacklog.addLast(m);
+        }
+
+        while (!dispatchBacklog.isEmpty()) {
+            OFMessage msg = dispatchBacklog.pop();
+
+            if (!dispatchQueue.offer(msg)) {
+                // queue full
+                channel.config().setAutoRead(false);
+                // put it back to the head of backlog
+                dispatchBacklog.addFirst(msg);
+                return;
+            }
+        }
+
+
+        if (dispatcherHandle.isDone()) {
+            // dispatcher terminated for some reason, restart
+
+            dispatcherHandle = dispatcher.submit(() -> {
+                try {
+                    List<OFMessage> msgs = new ArrayList<>();
+                    for (;;) {
+                        // wait for new message
+                        OFMessage msg = dispatchQueue.take();
+                        sw.handleMessage(msg);
+
+                        while (dispatchQueue.drainTo(msgs, MSG_READ_BUFFER) > 0) {
+                            if (!channel.config().isAutoRead()) {
+                                channel.config().setAutoRead(true);
+                            }
+                            msgs.forEach(sw::handleMessage);
+                            msgs.clear();
+                        }
+
+                        if (!channel.config().isAutoRead()) {
+                            channel.config().setAutoRead(true);
+                        }
+                    }
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    // interrupted. gracefully shutting down
+                    return;
+                }
+
+            });
+        }
     }
 
     /**
@@ -1331,10 +1480,10 @@
             return sw.toString();
         }
         String channelString;
-        if (channel == null || channel.getRemoteAddress() == null) {
+        if (channel == null || channel.remoteAddress() == null) {
             channelString = "?";
         } else {
-            channelString = channel.getRemoteAddress().toString();
+            channelString = channel.remoteAddress().toString();
         }
         String dpidString;
         if (featuresReply == null) {
@@ -1378,8 +1527,8 @@
                 .buildHello()
                 .setXid(this.handshakeTransactionIds--)
                 .setElements(Collections.singletonList(hem));
-        log.info("Sending {} Hello to {}", version, channel.getRemoteAddress());
-        channel.write(Collections.singletonList(mb.build()));
+        log.info("Sending {} Hello to {}", version, channel.remoteAddress());
+        channel.writeAndFlush(Collections.singletonList(mb.build()));
     }
 
     /**
@@ -1387,12 +1536,11 @@
      * @throws IOException
      */
     private void sendHandshakeFeaturesRequestMessage() throws IOException {
-        OFFactory factory = OFFactories.getFactory(ofVersion);
-        log.debug("Sending FEATURES_REQUEST to {}", channel.getRemoteAddress());
+        log.debug("Sending FEATURES_REQUEST to {}", channel.remoteAddress());
         OFMessage m = factory.buildFeaturesRequest()
                 .setXid(this.handshakeTransactionIds--)
                 .build();
-        channel.write(Collections.singletonList(m));
+        channel.writeAndFlush(Collections.singletonList(m));
     }
 
     /**
@@ -1401,8 +1549,7 @@
      * @throws IOException
      */
     private void sendHandshakeSetConfig() throws IOException {
-        OFFactory factory = OFFactories.getFactory(ofVersion);
-        log.debug("Sending CONFIG_REQUEST to {}", channel.getRemoteAddress());
+        log.debug("Sending CONFIG_REQUEST to {}", channel.remoteAddress());
         List<OFMessage> msglist = new ArrayList<>(3);
 
         // Ensure we receive the full packet via PacketIn
@@ -1431,7 +1578,7 @@
                 .setXid(this.handshakeTransactionIds--)
                 .build();
         msglist.add(gcr);
-        channel.write(msglist);
+        channel.writeAndFlush(msglist);
     }
 
     /**
@@ -1440,13 +1587,12 @@
      */
     private void sendHandshakeDescriptionStatsRequest() throws IOException {
         // Get Description to set switch-specific flags
-        OFFactory factory = OFFactories.getFactory(ofVersion);
-        log.debug("Sending DESC_STATS_REQUEST to {}", channel.getRemoteAddress());
+        log.debug("Sending DESC_STATS_REQUEST to {}", channel.remoteAddress());
         OFDescStatsRequest dreq = factory
                 .buildDescStatsRequest()
                 .setXid(handshakeTransactionIds--)
                 .build();
-        channel.write(Collections.singletonList(dreq));
+        channel.writeAndFlush(Collections.singletonList(dreq));
     }
 
     /**
@@ -1457,26 +1603,59 @@
     private void sendMeterFeaturesRequest() throws IOException {
         // Get meter features including the MaxMeters value available for the device
         OFFactory factory = OFFactories.getFactory(ofVersion);
-        log.debug("Sending METER_FEATURES_REQUEST to {}", channel.getRemoteAddress());
+        log.debug("Sending METER_FEATURES_REQUEST to {}", channel.remoteAddress());
         OFMeterFeaturesStatsRequest mfreq = factory
                 .buildMeterFeaturesStatsRequest()
                 .setXid(handshakeTransactionIds--)
                 .build();
-        channel.write(Collections.singletonList(mfreq));
+        channel.writeAndFlush(Collections.singletonList(mfreq));
     }
 
     private void sendHandshakeOFPortDescRequest() throws IOException {
-        log.debug("Sending OF_PORT_DESC_REQUEST to {}", channel.getRemoteAddress());
+        log.debug("Sending OF_PORT_DESC_REQUEST to {}", channel.remoteAddress());
         // Get port description for 1.3+ switch
-        OFPortDescStatsRequest preq = OFFactories.getFactory(ofVersion)
+        OFPortDescStatsRequest preq = factory
                 .buildPortDescStatsRequest()
                 .setXid(handshakeTransactionIds--)
                 .build();
-        channel.write(Collections.singletonList(preq));
+        channel.writeAndFlush(Collections.singletonList(preq));
     }
 
     ChannelState getStateForTesting() {
         return state;
     }
 
+
+    @Override
+    public boolean isActive() {
+        if (channel != null) {
+            return channel.isActive();
+        }
+        return false;
+    }
+
+    @Override
+    public void closeSession() {
+        if (channel != null) {
+            channel.close();
+        }
+    }
+
+    @Override
+    public boolean sendMsg(Iterable<OFMessage> msgs) {
+        if (channel.isActive()) {
+            channel.writeAndFlush(msgs, channel.voidPromise());
+            return true;
+        } else {
+            log.warn("Dropping messages for switch {} because channel is not connected: {}",
+                     getSwitchInfoString(), msgs);
+            return false;
+        }
+    }
+
+    @Override
+    public CharSequence sessionInfo() {
+        return channelId;
+    }
+
 }
diff --git a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OFChannelInitializer.java b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OFChannelInitializer.java
new file mode 100644
index 0000000..4e6cca5
--- /dev/null
+++ b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OFChannelInitializer.java
@@ -0,0 +1,92 @@
+/*
+ * Copyright 2015-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.openflow.controller.impl;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.ssl.SslHandler;
+import io.netty.handler.timeout.IdleStateHandler;
+import io.netty.handler.timeout.ReadTimeoutHandler;
+import io.netty.util.concurrent.EventExecutorGroup;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+
+/**
+ * Creates a ChannelInitializer for a server-side openflow channel.
+ */
+public class OFChannelInitializer
+    extends ChannelInitializer<SocketChannel> {
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+
+    private final SSLContext sslContext;
+    protected Controller controller;
+    protected EventExecutorGroup pipelineExecutor;
+
+    public OFChannelInitializer(Controller controller,
+                                   EventExecutorGroup pipelineExecutor,
+                                   SSLContext sslContext) {
+        super();
+        this.controller = controller;
+        this.pipelineExecutor = pipelineExecutor;
+        this.sslContext = sslContext;
+    }
+
+    @Override
+    protected void initChannel(SocketChannel ch) throws Exception {
+
+        OFChannelHandler handler = new OFChannelHandler(controller);
+
+        ChannelPipeline pipeline = ch.pipeline();
+        if (sslContext != null) {
+            log.info("OpenFlow SSL enabled.");
+            SSLEngine sslEngine = sslContext.createSSLEngine();
+
+            sslEngine.setNeedClientAuth(true);
+            sslEngine.setUseClientMode(false);
+            sslEngine.setEnabledProtocols(sslEngine.getSupportedProtocols());
+            sslEngine.setEnabledCipherSuites(sslEngine.getSupportedCipherSuites());
+            sslEngine.setEnableSessionCreation(true);
+
+            SslHandler sslHandler = new SslHandler(sslEngine);
+            pipeline.addLast("ssl", sslHandler);
+        } else {
+            log.debug("OpenFlow SSL disabled.");
+        }
+        pipeline.addLast("ofmessageencoder", OFMessageEncoder.getInstance());
+        pipeline.addLast("ofmessagedecoder", OFMessageDecoder.getInstance());
+
+        pipeline.addLast("idle", new IdleStateHandler(20, 25, 0));
+        pipeline.addLast("timeout", new ReadTimeoutHandler(30));
+
+        // XXX S ONOS: was 15 increased it to fix Issue #296
+        pipeline.addLast("handshaketimeout",
+                         new HandshakeTimeoutHandler(handler, 60));
+        // ExecutionHandler equivalent now part of Netty core
+        if (pipelineExecutor != null) {
+            pipeline.addLast(pipelineExecutor, "handler", handler);
+        } else {
+            pipeline.addLast("handler", handler);
+        }
+    }
+}
diff --git a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OFMessageDecoder.java b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OFMessageDecoder.java
index c5cb9f7..14894f2 100644
--- a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OFMessageDecoder.java
+++ b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OFMessageDecoder.java
@@ -18,30 +18,45 @@
 
 
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.handler.codec.frame.FrameDecoder;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
+import java.util.List;
+
 import org.projectfloodlight.openflow.protocol.OFFactories;
 import org.projectfloodlight.openflow.protocol.OFMessage;
 import org.projectfloodlight.openflow.protocol.OFMessageReader;
+import org.slf4j.Logger;
 
 /**
- * Decode an openflow message from a Channel, for use in a netty pipeline.
+ * Decode an openflow message from a netty channel, for use in a netty pipeline.
  */
-public class OFMessageDecoder extends FrameDecoder {
+public final class OFMessageDecoder extends ByteToMessageDecoder {
+
+    private static final Logger log = getLogger(OFMessageDecoder.class);
+
+    public static OFMessageDecoder getInstance() {
+        // not Sharable
+        return new OFMessageDecoder();
+    }
+
+    private OFMessageDecoder() {}
+
 
     @Override
-    protected Object decode(ChannelHandlerContext ctx, Channel channel,
-            ChannelBuffer buffer) throws Exception {
-        if (!channel.isConnected()) {
+    protected void decode(ChannelHandlerContext ctx,
+                          ByteBuf byteBuf,
+                          List<Object> out) throws Exception {
+
+        if (!ctx.channel().isActive()) {
             // In testing, I see decode being called AFTER decode last.
             // This check avoids that from reading corrupted frames
-            return null;
+            return;
         }
 
-        // Note that a single call to decode results in reading a single
+        // Note that a single call to readFrom results in reading a single
         // OFMessage from the channel buffer, which is passed on to, and processed
         // by, the controller (in OFChannelHandler).
         // This is different from earlier behavior (with the original openflowj),
@@ -50,16 +65,11 @@
         // The performance *may or may not* not be as good as before.
         OFMessageReader<OFMessage> reader = OFFactories.getGenericReader();
 
-        //toByteBuffer is optimized to avoid copying.
-        ByteBuf byteBuf = Unpooled.wrappedBuffer(buffer.toByteBuffer());
         OFMessage message = reader.readFrom(byteBuf);
-
-        if (message != null) {
-            //set buffer's read index, as it has not been changed.
-            buffer.readerIndex(buffer.readerIndex() + byteBuf.readerIndex());
+        while (message != null) {
+            out.add(message);
+            message = reader.readFrom(byteBuf);
         }
-
-        return message;
     }
 
 }
diff --git a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OFMessageEncoder.java b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OFMessageEncoder.java
index 727301f..4dd8009 100644
--- a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OFMessageEncoder.java
+++ b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OFMessageEncoder.java
@@ -16,49 +16,81 @@
 
 package org.onosproject.openflow.controller.impl;
 
-import java.util.List;
-
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;
+import io.netty.channel.ChannelHandler.Sharable;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelOutboundHandlerAdapter;
+import io.netty.channel.ChannelPromise;
+import io.netty.handler.codec.EncoderException;
+import static org.slf4j.LoggerFactory.getLogger;
+
 import org.projectfloodlight.openflow.protocol.OFMessage;
+import org.slf4j.Logger;
 
 /**
- * Encode an openflow message for output into a ChannelBuffer, for use in a
+ * Encode an openflow message for output into a netty channel, for use in a
  * netty pipeline.
  */
-public class OFMessageEncoder extends OneToOneEncoder {
+@Sharable
+public final class OFMessageEncoder extends ChannelOutboundHandlerAdapter {
 
+    private static final Logger log = getLogger(OFMessageEncoder.class);
+
+    private static final OFMessageEncoder INSTANCE = new OFMessageEncoder();
+
+    public static OFMessageEncoder getInstance() {
+        return INSTANCE;
+    }
+
+    private OFMessageEncoder() {}
+
+    protected final void encode(ChannelHandlerContext ctx,
+                          Iterable<OFMessage> msgs,
+                          ByteBuf out) throws Exception {
+
+        msgs.forEach(msg -> msg.writeTo(out));
+    }
+
+    // MessageToByteEncoder without dependency to TypeParameterMatcher
     @Override
-    protected Object encode(ChannelHandlerContext ctx, Channel channel,
-                            Object msg) throws Exception {
-        if (!(msg instanceof List)) {
-            return msg;
-        }
+    public void write(ChannelHandlerContext ctx,
+                      Object msg,
+                      ChannelPromise promise) throws Exception {
 
-        @SuppressWarnings("unchecked")
-        List<OFMessage> msglist = (List<OFMessage>) msg;
-        /* XXX S can't get length of OFMessage in loxigen's openflowj??
-        int size = 0;
-        for (OFMessage ofm : msglist) {
-            size += ofm.getLengthU();
-        }*/
+        ByteBuf buf = null;
+        try {
+            if (msg instanceof Iterable) {
+                @SuppressWarnings("unchecked")
+                Iterable<OFMessage> ofmsgs =  (Iterable<OFMessage>) msg;
+                buf = ctx.alloc().ioBuffer();
 
-        ByteBuf bb = Unpooled.buffer();
+                encode(ctx, ofmsgs, buf);
 
-        for (OFMessage ofm : msglist) {
-            if (ofm != null) {
-                ofm.writeTo(bb);
+                if (buf.isReadable()) {
+                    ctx.write(buf, promise);
+                } else {
+                    log.warn("NOTHING WAS WRITTEN for {}", msg);
+                    buf.release();
+                    ctx.write(Unpooled.EMPTY_BUFFER, promise);
+                }
+                buf = null;
+
+            } else {
+                log.warn("Attempted to encode unexpected message: {}", msg);
+                ctx.write(msg, promise);
+            }
+        } catch (EncoderException e) {
+            log.error("EncoderException handling {}", msg, e);
+            throw e;
+        } catch (Throwable e) {
+            log.error("Exception handling {}", msg, e);
+            throw new EncoderException(e);
+        } finally {
+            if (buf != null) {
+                buf.release();
             }
         }
-
-        ChannelBuffer buf = ChannelBuffers.wrappedBuffer(bb.nioBuffer());
-
-        return buf;
     }
 
 }
diff --git a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImpl.java b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImpl.java
index cd4548d..21f2c0f 100644
--- a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImpl.java
+++ b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImpl.java
@@ -289,9 +289,9 @@
         write(dpid, msg);
 
         ConcurrentMap<Long, CompletableFuture<OFMessage>> xids =
-                responses.computeIfAbsent(dpid, k -> new ConcurrentHashMap());
+                responses.computeIfAbsent(dpid, k -> new ConcurrentHashMap<>());
 
-        CompletableFuture<OFMessage> future = new CompletableFuture();
+        CompletableFuture<OFMessage> future = new CompletableFuture<>();
         xids.put(msg.getXid(), future);
 
         return future;
@@ -303,14 +303,16 @@
         Collection<OFTableStatsEntry> tableStats;
         Collection<OFGroupStatsEntry> groupStats;
         Collection<OFGroupDescStatsEntry> groupDescStats;
-        Collection<OFPortStatsEntry> portStats;
 
         OpenFlowSwitch sw = this.getSwitch(dpid);
 
         // Check if someone is waiting for this message
         ConcurrentMap<Long, CompletableFuture<OFMessage>> xids = responses.get(dpid);
-        if (xids != null && xids.containsKey(msg.getXid())) {
-            xids.remove(msg.getXid()).complete(msg);
+        if (xids != null) {
+            CompletableFuture<OFMessage> future = xids.remove(msg.getXid());
+            if (future != null) {
+                future.complete(msg);
+            }
         }
 
         switch (msg.getType()) {
@@ -326,7 +328,7 @@
             break;
         case PACKET_IN:
             if (sw == null) {
-                log.error("Switch {} is not found", dpid);
+                log.error("Ignoring PACKET_IN, switch {} is not found", dpid);
                 break;
             }
             OpenFlowPacketContext pktCtx = DefaultOpenFlowPacketContext
diff --git a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OpenflowPipelineFactory.java b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OpenflowPipelineFactory.java
deleted file mode 100644
index c94d315..0000000
--- a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OpenflowPipelineFactory.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Copyright 2015-present Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onosproject.openflow.controller.impl;
-
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.Channels;
-import org.jboss.netty.handler.execution.ExecutionHandler;
-import org.jboss.netty.handler.ssl.SslHandler;
-import org.jboss.netty.handler.timeout.IdleStateHandler;
-import org.jboss.netty.handler.timeout.ReadTimeoutHandler;
-import org.jboss.netty.util.ExternalResourceReleasable;
-import org.jboss.netty.util.HashedWheelTimer;
-import org.jboss.netty.util.Timer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLEngine;
-
-import static org.onlab.util.Tools.groupedThreads;
-
-import java.util.concurrent.ThreadPoolExecutor;
-
-/**
- * Creates a ChannelPipeline for a server-side openflow channel.
- */
-public class OpenflowPipelineFactory
-    implements ChannelPipelineFactory, ExternalResourceReleasable {
-
-    private final Logger log = LoggerFactory.getLogger(getClass());
-
-
-    private final SSLContext sslContext;
-    protected Controller controller;
-    protected ThreadPoolExecutor pipelineExecutor;
-    protected Timer timer;
-    protected IdleStateHandler idleHandler;
-    protected ReadTimeoutHandler readTimeoutHandler;
-
-    public OpenflowPipelineFactory(Controller controller,
-                                   ThreadPoolExecutor pipelineExecutor,
-                                   SSLContext sslContext) {
-        super();
-        this.controller = controller;
-        this.pipelineExecutor = pipelineExecutor;
-        this.timer = new HashedWheelTimer(groupedThreads("OpenflowPipelineFactory", "timer-%d", log));
-        this.idleHandler = new IdleStateHandler(timer, 20, 25, 0);
-        this.readTimeoutHandler = new ReadTimeoutHandler(timer, 30);
-        this.sslContext = sslContext;
-    }
-
-    @Override
-    public ChannelPipeline getPipeline() throws Exception {
-        OFChannelHandler handler = new OFChannelHandler(controller);
-
-        ChannelPipeline pipeline = Channels.pipeline();
-        if (sslContext != null) {
-            log.debug("OpenFlow SSL enabled.");
-            SSLEngine sslEngine = sslContext.createSSLEngine();
-
-            sslEngine.setNeedClientAuth(true);
-            sslEngine.setUseClientMode(false);
-            sslEngine.setEnabledProtocols(sslEngine.getSupportedProtocols());
-            sslEngine.setEnabledCipherSuites(sslEngine.getSupportedCipherSuites());
-            sslEngine.setEnableSessionCreation(true);
-
-            SslHandler sslHandler = new SslHandler(sslEngine);
-            pipeline.addLast("ssl", sslHandler);
-        } else {
-            log.debug("OpenFlow SSL disabled.");
-        }
-        pipeline.addLast("ofmessagedecoder", new OFMessageDecoder());
-        pipeline.addLast("ofmessageencoder", new OFMessageEncoder());
-        pipeline.addLast("idle", idleHandler);
-        pipeline.addLast("timeout", readTimeoutHandler);
-        // XXX S ONOS: was 15 increased it to fix Issue #296
-        pipeline.addLast("handshaketimeout",
-                         new HandshakeTimeoutHandler(handler, timer, 60));
-        if (pipelineExecutor != null) {
-            pipeline.addLast("pipelineExecutor",
-                             new ExecutionHandler(pipelineExecutor));
-        }
-        pipeline.addLast("handler", handler);
-        return pipeline;
-    }
-
-    @Override
-    public void releaseExternalResources() {
-        timer.stop();
-    }
-}
diff --git a/protocols/openflow/ctl/src/test/java/org/onosproject/openflow/ChannelAdapter.java b/protocols/openflow/ctl/src/test/java/org/onosproject/openflow/ChannelAdapter.java
new file mode 100644
index 0000000..a786211
--- /dev/null
+++ b/protocols/openflow/ctl/src/test/java/org/onosproject/openflow/ChannelAdapter.java
@@ -0,0 +1,282 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.openflow;
+
+import java.net.SocketAddress;
+
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelConfig;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelId;
+import io.netty.channel.ChannelMetadata;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.ChannelProgressivePromise;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.EventLoop;
+import io.netty.util.Attribute;
+import io.netty.util.AttributeKey;
+
+/**
+ * Dummy Channel for testing.
+ */
+public class ChannelAdapter implements Channel {
+
+    @Override
+    public <T> Attribute<T> attr(AttributeKey<T> key) {
+
+        return null;
+    }
+
+    @Override
+    public int compareTo(Channel o) {
+        return 0;
+    }
+
+    @Override
+    public EventLoop eventLoop() {
+        return null;
+    }
+
+    @Override
+    public Channel parent() {
+        return null;
+    }
+
+    @Override
+    public ChannelConfig config() {
+        return null;
+    }
+
+    @Override
+    public boolean isOpen() {
+        return false;
+    }
+
+    @Override
+    public boolean isRegistered() {
+        return false;
+    }
+
+    @Override
+    public boolean isActive() {
+        return false;
+    }
+
+    @Override
+    public ChannelMetadata metadata() {
+        return null;
+    }
+
+    @Override
+    public SocketAddress localAddress() {
+        return null;
+    }
+
+    @Override
+    public SocketAddress remoteAddress() {
+        return null;
+    }
+
+    @Override
+    public ChannelFuture closeFuture() {
+        return null;
+    }
+
+    @Override
+    public boolean isWritable() {
+        return false;
+    }
+
+    @Override
+    public Unsafe unsafe() {
+        return null;
+    }
+
+    @Override
+    public ChannelPipeline pipeline() {
+        return null;
+    }
+
+    @Override
+    public ByteBufAllocator alloc() {
+        return null;
+    }
+
+    @Override
+    public ChannelPromise newPromise() {
+        return null;
+    }
+
+    @Override
+    public ChannelProgressivePromise newProgressivePromise() {
+        return null;
+    }
+
+    @Override
+    public ChannelFuture newSucceededFuture() {
+        return null;
+    }
+
+    @Override
+    public ChannelFuture newFailedFuture(Throwable cause) {
+        return null;
+    }
+
+    @Override
+    public ChannelPromise voidPromise() {
+
+        return null;
+    }
+
+    @Override
+    public ChannelFuture bind(SocketAddress localAddress) {
+
+        return null;
+    }
+
+    @Override
+    public ChannelFuture connect(SocketAddress remoteAddress) {
+
+        return null;
+    }
+
+    @Override
+    public ChannelFuture connect(SocketAddress remoteAddress,
+                                 SocketAddress localAddress) {
+
+        return null;
+    }
+
+    @Override
+    public ChannelFuture disconnect() {
+
+        return null;
+    }
+
+    @Override
+    public ChannelFuture close() {
+
+        return null;
+    }
+
+    @Override
+    public ChannelFuture deregister() {
+
+        return null;
+    }
+
+    @Override
+    public ChannelFuture bind(SocketAddress localAddress,
+                              ChannelPromise promise) {
+
+        return null;
+    }
+
+    @Override
+    public ChannelFuture connect(SocketAddress remoteAddress,
+                                 ChannelPromise promise) {
+
+        return null;
+    }
+
+    @Override
+    public ChannelFuture connect(SocketAddress remoteAddress,
+                                 SocketAddress localAddress,
+                                 ChannelPromise promise) {
+
+        return null;
+    }
+
+    @Override
+    public ChannelFuture disconnect(ChannelPromise promise) {
+
+        return null;
+    }
+
+    @Override
+    public ChannelFuture close(ChannelPromise promise) {
+
+        return null;
+    }
+
+    @Override
+    public ChannelFuture deregister(ChannelPromise promise) {
+
+        return null;
+    }
+
+    @Override
+    public Channel read() {
+
+        return null;
+    }
+
+    @Override
+    public ChannelFuture write(Object msg) {
+
+        return null;
+    }
+
+    @Override
+    public ChannelFuture write(Object msg, ChannelPromise promise) {
+
+        return null;
+    }
+
+    @Override
+    public Channel flush() {
+
+        return null;
+    }
+
+    @Override
+    public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
+
+        return null;
+    }
+
+    @Override
+    public ChannelFuture writeAndFlush(Object msg) {
+
+        return null;
+    }
+
+    @Override
+    public <T> boolean hasAttr(AttributeKey<T> key) {
+
+        return false;
+    }
+
+    @Override
+    public ChannelId id() {
+
+        return null;
+    }
+
+    @Override
+    public long bytesBeforeUnwritable() {
+
+        return 0;
+    }
+
+    @Override
+    public long bytesBeforeWritable() {
+
+        return 0;
+    }
+
+}
diff --git a/protocols/openflow/ctl/src/test/java/org/onosproject/openflow/ChannelHandlerContextAdapter.java b/protocols/openflow/ctl/src/test/java/org/onosproject/openflow/ChannelHandlerContextAdapter.java
index 4b5afb7..2fcc6b9 100644
--- a/protocols/openflow/ctl/src/test/java/org/onosproject/openflow/ChannelHandlerContextAdapter.java
+++ b/protocols/openflow/ctl/src/test/java/org/onosproject/openflow/ChannelHandlerContextAdapter.java
@@ -15,63 +15,272 @@
  */
 package org.onosproject.openflow;
 
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelEvent;
-import org.jboss.netty.channel.ChannelHandler;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelPipeline;
+import java.net.SocketAddress;
+
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.ChannelProgressivePromise;
+import io.netty.channel.ChannelPromise;
+import io.netty.util.Attribute;
+import io.netty.util.AttributeKey;
+import io.netty.util.concurrent.EventExecutor;
 
 /**
  * Adapter for testing against a netty channel handler context.
  */
 public class ChannelHandlerContextAdapter implements ChannelHandlerContext {
+
     @Override
-    public Channel getChannel() {
+    public <T> Attribute<T> attr(AttributeKey<T> key) {
+
         return null;
     }
 
     @Override
-    public ChannelPipeline getPipeline() {
+    public Channel channel() {
+
+        return new ChannelAdapter();
+    }
+
+    @Override
+    public EventExecutor executor() {
+
         return null;
     }
 
     @Override
-    public String getName() {
+    public String name() {
+
         return null;
     }
 
     @Override
-    public ChannelHandler getHandler() {
+    public ChannelHandler handler() {
+
         return null;
     }
 
     @Override
-    public boolean canHandleUpstream() {
+    public boolean isRemoved() {
+
         return false;
     }
 
     @Override
-    public boolean canHandleDownstream() {
-        return false;
-    }
+    public ChannelHandlerContext fireChannelRegistered() {
 
-    @Override
-    public void sendUpstream(ChannelEvent channelEvent) {
-
-    }
-
-    @Override
-    public void sendDownstream(ChannelEvent channelEvent) {
-
-    }
-
-    @Override
-    public Object getAttachment() {
         return null;
     }
 
     @Override
-    public void setAttachment(Object o) {
+    public ChannelHandlerContext fireChannelUnregistered() {
 
+        return null;
+    }
+
+    @Override
+    public ChannelHandlerContext fireChannelActive() {
+
+        return null;
+    }
+
+    @Override
+    public ChannelHandlerContext fireChannelInactive() {
+
+        return null;
+    }
+
+    @Override
+    public ChannelHandlerContext fireExceptionCaught(Throwable cause) {
+
+        return null;
+    }
+
+    @Override
+    public ChannelHandlerContext fireUserEventTriggered(Object event) {
+
+        return null;
+    }
+
+    @Override
+    public ChannelHandlerContext fireChannelRead(Object msg) {
+
+        return null;
+    }
+
+    @Override
+    public ChannelHandlerContext fireChannelReadComplete() {
+
+        return null;
+    }
+
+    @Override
+    public ChannelHandlerContext fireChannelWritabilityChanged() {
+
+        return null;
+    }
+
+    @Override
+    public ChannelFuture bind(SocketAddress localAddress) {
+
+        return null;
+    }
+
+    @Override
+    public ChannelFuture connect(SocketAddress remoteAddress) {
+
+        return null;
+    }
+
+    @Override
+    public ChannelFuture connect(SocketAddress remoteAddress,
+                                 SocketAddress localAddress) {
+
+        return null;
+    }
+
+    @Override
+    public ChannelFuture disconnect() {
+
+        return null;
+    }
+
+    @Override
+    public ChannelFuture close() {
+
+        return null;
+    }
+
+    @Override
+    public ChannelFuture deregister() {
+
+        return null;
+    }
+
+    @Override
+    public ChannelFuture bind(SocketAddress localAddress,
+                              ChannelPromise promise) {
+
+        return null;
+    }
+
+    @Override
+    public ChannelFuture connect(SocketAddress remoteAddress,
+                                 ChannelPromise promise) {
+
+        return null;
+    }
+
+    @Override
+    public ChannelFuture connect(SocketAddress remoteAddress,
+                                 SocketAddress localAddress,
+                                 ChannelPromise promise) {
+
+        return null;
+    }
+
+    @Override
+    public ChannelFuture disconnect(ChannelPromise promise) {
+
+        return null;
+    }
+
+    @Override
+    public ChannelFuture close(ChannelPromise promise) {
+
+        return null;
+    }
+
+    @Override
+    public ChannelFuture deregister(ChannelPromise promise) {
+
+        return null;
+    }
+
+    @Override
+    public ChannelHandlerContext read() {
+
+        return null;
+    }
+
+    @Override
+    public ChannelFuture write(Object msg) {
+
+        return null;
+    }
+
+    @Override
+    public ChannelFuture write(Object msg, ChannelPromise promise) {
+
+        return null;
+    }
+
+    @Override
+    public ChannelHandlerContext flush() {
+
+        return null;
+    }
+
+    @Override
+    public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
+
+        return null;
+    }
+
+    @Override
+    public ChannelFuture writeAndFlush(Object msg) {
+
+        return null;
+    }
+
+    @Override
+    public ChannelPipeline pipeline() {
+
+        return null;
+    }
+
+    @Override
+    public ByteBufAllocator alloc() {
+
+        return null;
+    }
+
+    @Override
+    public ChannelPromise newPromise() {
+
+        return null;
+    }
+
+    @Override
+    public ChannelProgressivePromise newProgressivePromise() {
+
+        return null;
+    }
+
+    @Override
+    public ChannelFuture newSucceededFuture() {
+
+        return null;
+    }
+
+    @Override
+    public ChannelFuture newFailedFuture(Throwable cause) {
+
+        return null;
+    }
+
+    @Override
+    public ChannelPromise voidPromise() {
+
+        return null;
+    }
+
+    @Override
+    public <T> boolean hasAttr(AttributeKey<T> key) {
+        return false;
     }
 }
diff --git a/protocols/openflow/ctl/src/test/java/org/onosproject/openflow/OpenflowSwitchDriverAdapter.java b/protocols/openflow/ctl/src/test/java/org/onosproject/openflow/OpenflowSwitchDriverAdapter.java
index 74751db..0083038 100644
--- a/protocols/openflow/ctl/src/test/java/org/onosproject/openflow/OpenflowSwitchDriverAdapter.java
+++ b/protocols/openflow/ctl/src/test/java/org/onosproject/openflow/OpenflowSwitchDriverAdapter.java
@@ -15,11 +15,11 @@
  */
 package org.onosproject.openflow;
 
-import org.jboss.netty.channel.Channel;
 import org.onosproject.net.Device;
 import org.onosproject.net.driver.DriverData;
 import org.onosproject.net.driver.DriverHandler;
 import org.onosproject.openflow.controller.Dpid;
+import org.onosproject.openflow.controller.OpenFlowSession;
 import org.onosproject.openflow.controller.RoleState;
 import org.onosproject.openflow.controller.driver.OpenFlowAgent;
 import org.onosproject.openflow.controller.driver.OpenFlowSwitchDriver;
@@ -147,7 +147,7 @@
     }
 
     @Override
-    public void setChannel(Channel channel) {
+    public void setChannel(OpenFlowSession channel) {
 
     }
 
diff --git a/protocols/openflow/ctl/src/test/java/org/onosproject/openflow/controller/impl/OFMessageDecoderTest.java b/protocols/openflow/ctl/src/test/java/org/onosproject/openflow/controller/impl/OFMessageDecoderTest.java
index 6c109ae..f2617cd 100644
--- a/protocols/openflow/ctl/src/test/java/org/onosproject/openflow/controller/impl/OFMessageDecoderTest.java
+++ b/protocols/openflow/ctl/src/test/java/org/onosproject/openflow/controller/impl/OFMessageDecoderTest.java
@@ -16,38 +16,47 @@
 package org.onosproject.openflow.controller.impl;
 
 
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBuffers;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
-import org.onosproject.core.netty.ChannelAdapter;
+import org.onosproject.openflow.ChannelAdapter;
 import org.onosproject.openflow.ChannelHandlerContextAdapter;
 import org.projectfloodlight.openflow.protocol.OFHello;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.Channel;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.instanceOf;
-import static org.hamcrest.Matchers.notNullValue;
-import static org.hamcrest.Matchers.nullValue;
+import static org.hamcrest.Matchers.is;
+import java.util.ArrayList;
+import java.util.List;
 
 /**
  * Tests for the OpenFlow message decoder.
  */
 public class OFMessageDecoderTest {
 
-    static class ConnectedChannel extends ChannelAdapter {
-        @Override
-        public boolean isConnected() {
-            return true;
-        }
-    }
+    private ByteBuf buf;
 
-    private ChannelBuffer getHelloMessageBuffer() {
+    private ByteBuf getHelloMessageBuffer() {
         // OFHello, OF version 1, xid of 0, total of 8 bytes
         byte[] messageData = {0x1, 0x0, 0x0, 0x8, 0x0, 0x0, 0x0, 0x0};
-        ChannelBuffer channelBuffer = ChannelBuffers.dynamicBuffer();
-        channelBuffer.writeBytes(messageData);
-        return channelBuffer;
+        buf.writeBytes(messageData);
+        return buf;
     }
 
+    @Before
+    public void setUp() {
+        buf = ByteBufAllocator.DEFAULT.buffer();
+    }
+
+    @After
+    public void tearDown() {
+        buf.release();
+    }
+
+
     /**
      * Tests decoding a message on a closed channel.
      *
@@ -55,13 +64,13 @@
      */
     @Test
     public void testDecodeNoChannel() throws Exception {
-        OFMessageDecoder decoder = new OFMessageDecoder();
-        ChannelBuffer channelBuffer = getHelloMessageBuffer();
-        Object message =
-                decoder.decode(new ChannelHandlerContextAdapter(),
-                               new ChannelAdapter(),
-                               channelBuffer);
-        assertThat(message, nullValue());
+        OFMessageDecoder decoder = OFMessageDecoder.getInstance();
+        ByteBuf channelBuffer = getHelloMessageBuffer();
+        List<Object> out = new ArrayList<>();
+        decoder.decode(new ChannelHandlerContextAdapter(),
+                       channelBuffer,
+                       out);
+        assertThat(out.size(), is(0));
     }
 
     /**
@@ -71,14 +80,29 @@
      */
     @Test
     public void testDecode() throws Exception {
-        OFMessageDecoder decoder = new OFMessageDecoder();
-        ChannelBuffer channelBuffer = getHelloMessageBuffer();
-        Object message =
-                decoder.decode(new ChannelHandlerContextAdapter(),
-                               new ConnectedChannel(),
-                               channelBuffer);
-        assertThat(message, notNullValue());
-        assertThat(message, instanceOf(OFHello.class));
+        OFMessageDecoder decoder = OFMessageDecoder.getInstance();
+        ByteBuf channelBuffer = getHelloMessageBuffer();
+        List<Object> out = new ArrayList<>();
+        decoder.decode(new ActiveChannelHandlerContextAdapter(),
+                       channelBuffer,
+                       out);
+        assertThat(out.size(), is(1));
+        assertThat(out.get(0), instanceOf(OFHello.class));
+    }
+
+    public class ActiveChannelHandlerContextAdapter
+            extends ChannelHandlerContextAdapter {
+
+        @Override
+        public Channel channel() {
+            return new ChannelAdapter() {
+                @Override
+                public boolean isActive() {
+                    return true;
+                }
+            };
+        }
+
     }
 
 }
diff --git a/protocols/openflow/ctl/src/test/java/org/onosproject/openflow/controller/impl/OFMessageEncoderTest.java b/protocols/openflow/ctl/src/test/java/org/onosproject/openflow/controller/impl/OFMessageEncoderTest.java
index e62345c..636c568 100644
--- a/protocols/openflow/ctl/src/test/java/org/onosproject/openflow/controller/impl/OFMessageEncoderTest.java
+++ b/protocols/openflow/ctl/src/test/java/org/onosproject/openflow/controller/impl/OFMessageEncoderTest.java
@@ -16,26 +16,27 @@
 package org.onosproject.openflow.controller.impl;
 
 import java.nio.charset.StandardCharsets;
-import java.util.List;
+import java.util.Collections;
 
 import io.netty.buffer.ByteBuf;
-import org.jboss.netty.buffer.ChannelBuffer;
+import io.netty.buffer.ByteBufAllocator;
+
+import org.hamcrest.Matchers;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 import org.onosproject.openflow.OfMessageAdapter;
-import org.projectfloodlight.openflow.protocol.OFMessage;
 import org.projectfloodlight.openflow.protocol.OFType;
 
-import com.google.common.collect.ImmutableList;
-
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.notNullValue;
 
 /**
  * Tests for the OpenFlow message encoder.
  */
 public class OFMessageEncoderTest {
 
+    private ByteBuf buf;
     static class MockOfMessage extends OfMessageAdapter {
         static int nextId = 1;
         final int id;
@@ -52,40 +53,26 @@
         }
     }
 
-    /**
-     * Tests that encoding a non-list returns the object specified.
-     *
-     * @throws Exception on exception in the encoder
-     */
-    @Test
-    public void testNoList() throws Exception {
-        OFMessageEncoder encoder = new OFMessageEncoder();
-        MockOfMessage message = new MockOfMessage();
-        OFMessage returnedMessage =
-                (OFMessage) encoder.encode(null, null, message);
-        assertThat(message, is(returnedMessage));
+    @Before
+    public void setUp() {
+        buf = ByteBufAllocator.DEFAULT.buffer();
     }
 
-    /**
-     * Tests that encoding a list returns the proper encoded payload.
-     *
-     * @throws Exception on exception in the encoder
-     */
+    @After
+    public void tearDown() {
+        buf.release();
+    }
+
     @Test
-    public void testList() throws Exception {
-        OFMessageEncoder encoder = new OFMessageEncoder();
+    public void testEncode() throws Exception {
+        OFMessageEncoder encoder = OFMessageEncoder.getInstance();
         MockOfMessage message1 = new MockOfMessage();
-        MockOfMessage message2 = new MockOfMessage();
-        MockOfMessage message3 = new MockOfMessage();
-        List<MockOfMessage> messages = ImmutableList.of(message1, message2, message3);
-        ChannelBuffer returnedChannel =
-                (ChannelBuffer) encoder.encode(null, null, messages);
-        assertThat(returnedChannel, notNullValue());
-        byte[] channelBytes = returnedChannel.array();
-        String expectedListMessage = "message1 message2 message3 ";
-        String listMessage =
-                (new String(channelBytes, StandardCharsets.UTF_8))
-                        .substring(0, expectedListMessage.length());
-        assertThat(listMessage, is(expectedListMessage));
+        encoder.encode(null, Collections.singletonList(message1), buf);
+
+        assertThat(buf.isReadable(), Matchers.is(true));
+        byte[] channelBytes = new byte[buf.readableBytes()];
+        buf.readBytes(channelBytes);
+        String expectedListMessage = "message1 ";
+        assertThat(channelBytes, is(expectedListMessage.getBytes()));
     }
 }