netty4 OpenFlow southbound

- separate I/O thread and message dispatch threads

Change-Id: I11a10a47de451a9e3063b62f9450be19c3a9dae7
diff --git a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OFChannelHandler.java b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OFChannelHandler.java
index 100203f..b2d77cf 100644
--- a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OFChannelHandler.java
+++ b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OFChannelHandler.java
@@ -16,24 +16,29 @@
 
 package org.onosproject.openflow.controller.impl;
 
+import static org.onlab.util.Tools.groupedThreads;
+
 import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
 import java.nio.channels.ClosedChannelException;
+import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Deque;
 import java.util.List;
 import java.util.Optional;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
-
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelStateEvent;
-import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.handler.timeout.IdleStateAwareChannelHandler;
-import org.jboss.netty.handler.timeout.IdleStateEvent;
-import org.jboss.netty.handler.timeout.ReadTimeoutException;
+import org.onlab.packet.IpAddress;
 import org.onosproject.openflow.controller.Dpid;
+import org.onosproject.openflow.controller.OpenFlowSession;
 import org.onosproject.openflow.controller.driver.OpenFlowSwitchDriver;
 import org.onosproject.openflow.controller.driver.SwitchStateException;
 import org.projectfloodlight.openflow.exceptions.OFParseError;
@@ -78,11 +83,20 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.handler.timeout.IdleStateEvent;
+import io.netty.handler.timeout.ReadTimeoutException;
+import io.netty.util.ReferenceCountUtil;
+
 /**
  * Channel handler deals with the switch connection and dispatches
  * switch messages to the appropriate locations.
  */
-class OFChannelHandler extends IdleStateAwareChannelHandler {
+class OFChannelHandler extends ChannelInboundHandlerAdapter
+    implements OpenFlowSession {
+
     private static final Logger log = LoggerFactory.getLogger(OFChannelHandler.class);
 
     private static final String RESET_BY_PEER = "Connection reset by peer";
@@ -91,7 +105,11 @@
     private final Controller controller;
     private OpenFlowSwitchDriver sw;
     private long thisdpid; // channelHandler cached value of connected switch id
+
     private Channel channel;
+    private String channelId;
+
+
     // State needs to be volatile because the HandshakeTimeoutHandler
     // needs to check if the handshake is complete
     private volatile ChannelState state;
@@ -121,6 +139,7 @@
 
     //Indicates the openflow version used by this switch
     protected OFVersion ofVersion;
+    protected OFFactory factory;
 
     // deprecated in 1.10.0
     @Deprecated
@@ -135,11 +154,43 @@
      */
     private int handshakeTransactionIds = -1;
 
+
+
+    private static final int MSG_READ_BUFFER = 5000;
+
+    /**
+     * OFMessage dispatch queue.
+     */
+    private final BlockingQueue<OFMessage> dispatchQueue =
+            new LinkedBlockingQueue<>(MSG_READ_BUFFER);
+
+    /**
+     * Single thread executor for OFMessage dispatching.
+     *
+     * Gets initialized on channelActive, shutdown on channelInactive.
+     */
+    private ExecutorService dispatcher;
+
+    /**
+     * Handle for dispatcher thread.
+     * <p>
+     * Should only be touched from the Channel I/O thread
+     */
+    private Future<?> dispatcherHandle = CompletableFuture.completedFuture(null);
+
+    /**
+     * Dispatch backlog.
+     * <p>
+     * Should only be touched from the Channel I/O thread
+     */
+    private final Deque<OFMessage> dispatchBacklog = new ArrayDeque<>();
+
     /**
      * Create a new unconnected OFChannelHandler.
      * @param controller parent controller
      */
     OFChannelHandler(Controller controller) {
+
         this.controller = controller;
         this.state = ChannelState.INIT;
         this.pendingPortStatusMsg = new CopyOnWriteArrayList<>();
@@ -208,30 +259,33 @@
                 if (m.getVersion().getWireVersion() >= OFVersion.OF_13.getWireVersion()) {
                     log.debug("Received {} Hello from {} - switching to OF "
                             + "version 1.3+", m.getVersion(),
-                            h.channel.getRemoteAddress());
+                            h.channel.remoteAddress());
                     h.ofVersion = m.getVersion();
+                    h.factory = OFFactories.getFactory(h.ofVersion);
                     h.sendHandshakeHelloMessage();
                 } else if (m.getVersion().getWireVersion() >= OFVersion.OF_10.getWireVersion()) {
                     log.debug("Received {} Hello from {} - switching to OF "
                             + "version 1.0", m.getVersion(),
-                            h.channel.getRemoteAddress());
+                            h.channel.remoteAddress());
                     h.ofVersion = m.getVersion();
+                    h.factory = OFFactories.getFactory(h.ofVersion);
                     OFHello hi =
-                            OFFactories.getFactory(h.ofVersion).buildHello()
+                            h.factory.buildHello()
                                     .setXid(h.handshakeTransactionIds--)
                                     .build();
-                    h.channel.write(Collections.singletonList(hi));
+                    h.channel.writeAndFlush(Collections.singletonList(hi));
                 } else {
                     log.error("Received Hello of version {} from switch at {}. "
                             + "This controller works with OF1.0 and OF1.3 "
                             + "switches. Disconnecting switch ...",
-                            m.getVersion(), h.channel.getRemoteAddress());
+                            m.getVersion(), h.channel.remoteAddress());
                     h.channel.disconnect();
                     return;
                 }
                 h.sendHandshakeFeaturesRequestMessage();
                 h.setState(WAIT_FEATURES_REPLY);
             }
+
             @Override
             void processOFFeaturesReply(OFChannelHandler h, OFFeaturesReply  m)
                     throws IOException, SwitchStateException {
@@ -313,7 +367,7 @@
                 if (m.getStatsType() != OFStatsType.PORT_DESC) {
                     log.warn("Expecting port description stats but received stats "
                             + "type {} from {}. Ignoring ...", m.getStatsType(),
-                            h.channel.getRemoteAddress());
+                            h.channel.remoteAddress());
                     return;
                 }
                 if (m.getFlags().contains(OFStatsReplyFlags.REPLY_MORE)) {
@@ -465,22 +519,21 @@
                 if (m.getStatsType() != OFStatsType.DESC) {
                     log.warn("Expecting Description stats but received stats "
                             + "type {} from {}. Ignoring ...", m.getStatsType(),
-                            h.channel.getRemoteAddress());
+                            h.channel.remoteAddress());
                     return;
                 }
                 OFDescStatsReply drep = (OFDescStatsReply) m;
                 log.info("Received switch description reply {} from switch at {}",
-                         drep, h.channel.getRemoteAddress());
+                         drep, h.channel.remoteAddress());
                 // Here is where we differentiate between different kinds of switches
                 h.sw = h.controller.getOFSwitchInstance(h.thisdpid, drep, h.ofVersion);
 
                 h.sw.setOFVersion(h.ofVersion);
                 h.sw.setFeaturesReply(h.featuresReply);
-                //h.sw.setPortDescReply(h.portDescReply);
                 h.sw.setPortDescReplies(h.portDescReplies);
                 h.sw.setMeterFeaturesReply(h.meterFeaturesReply);
                 h.sw.setConnected(true);
-                h.sw.setChannel(h.channel);
+                h.sw.setChannel(h);
 //                boolean success = h.sw.connectSwitch();
 //
 //                if (!success) {
@@ -1072,7 +1125,7 @@
                 throws IOException, SwitchStateException {
             // we only expect hello in the WAIT_HELLO state
             log.warn("Received Hello outside WAIT_HELLO state; switch {} is not complaint.",
-                     h.channel.getRemoteAddress());
+                     h.channel.remoteAddress());
         }
 
         void processOFBarrierReply(OFChannelHandler h, OFBarrierReply m)
@@ -1084,16 +1137,15 @@
                 throws IOException {
             if (h.ofVersion == null) {
                 log.error("No OF version set for {}. Not sending Echo REPLY",
-                        h.channel.getRemoteAddress());
+                        h.channel.remoteAddress());
                 return;
             }
-            OFFactory factory = OFFactories.getFactory(h.ofVersion);
-                    OFEchoReply reply = factory
-                            .buildEchoReply()
-                            .setXid(m.getXid())
-                            .setData(m.getData())
-                            .build();
-                    h.channel.write(Collections.singletonList(reply));
+            OFEchoReply reply = h.factory
+                    .buildEchoReply()
+                    .setXid(m.getXid())
+                    .setData(m.getData())
+                    .build();
+            h.channel.writeAndFlush(Collections.singletonList(reply));
         }
 
         void processOFEchoReply(OFChannelHandler h, OFEchoReply m)
@@ -1175,11 +1227,28 @@
     //*************************
 
     @Override
-    public void channelConnected(ChannelHandlerContext ctx,
-            ChannelStateEvent e) throws Exception {
-        channel = e.getChannel();
+    public void channelActive(ChannelHandlerContext ctx)
+            throws Exception {
+
+        channel = ctx.channel();
         log.info("New switch connection from {}",
-                channel.getRemoteAddress());
+                 channel.remoteAddress());
+
+        SocketAddress address = channel.remoteAddress();
+        if (address instanceof InetSocketAddress) {
+            final InetSocketAddress inetAddress = (InetSocketAddress) address;
+            final IpAddress ipAddress = IpAddress.valueOf(inetAddress.getAddress());
+            if (ipAddress.isIp4()) {
+                channelId = ipAddress.toString() + ':' + inetAddress.getPort();
+            } else {
+                channelId = '[' + ipAddress.toString() + "]:" + inetAddress.getPort();
+            }
+        } else {
+            channelId = channel.toString();
+        }
+
+        dispatcher = Executors.newSingleThreadExecutor(groupedThreads("onos/of/dispatcher", channelId, log));
+
         /*
             hack to wait for the switch to tell us what it's
             max version is. This is not spec compliant and should
@@ -1190,81 +1259,89 @@
     }
 
     @Override
-    public void channelDisconnected(ChannelHandlerContext ctx,
-            ChannelStateEvent e) throws Exception {
+    public void channelInactive(ChannelHandlerContext ctx)
+            throws Exception {
+
         log.info("Switch disconnected callback for sw:{}. Cleaning up ...",
-                getSwitchInfoString());
-        if (thisdpid != 0) {
-            if (!duplicateDpidFound) {
-                // if the disconnected switch (on this ChannelHandler)
-                // was not one with a duplicate-dpid, it is safe to remove all
-                // state for it at the controller. Notice that if the disconnected
-                // switch was a duplicate-dpid, calling the method below would clear
-                // all state for the original switch (with the same dpid),
-                // which we obviously don't want.
-                log.info("{}:removal called", getSwitchInfoString());
-                if (sw != null) {
-                    sw.removeConnectedSwitch();
-                }
-            } else {
-                // A duplicate was disconnected on this ChannelHandler,
-                // this is the same switch reconnecting, but the original state was
-                // not cleaned up - XXX check liveness of original ChannelHandler
-                log.info("{}:duplicate found", getSwitchInfoString());
-                duplicateDpidFound = Boolean.FALSE;
-            }
-        } else {
-            log.warn("no dpid in channelHandler registered for "
-                    + "disconnected switch {}", getSwitchInfoString());
+                 getSwitchInfoString());
+        // Interrupt the dispatch loop (it blocks in take()); shutdown() alone would leave its thread running.
+        if (dispatcher != null) {
+            dispatcher.shutdownNow();
         }
+
+        if (thisdpid != 0) {
+            if (!duplicateDpidFound) {
+                // if the disconnected switch (on this ChannelHandler)
+                // was not one with a duplicate-dpid, it is safe to remove all
+                // state for it at the controller. Notice that if the disconnected
+                // switch was a duplicate-dpid, calling the method below would clear
+                // all state for the original switch (with the same dpid),
+                // which we obviously don't want.
+                log.info("{}:removal called", getSwitchInfoString());
+                if (sw != null) {
+                    sw.removeConnectedSwitch();
+                }
+            } else {
+                // A duplicate was disconnected on this ChannelHandler,
+                // this is the same switch reconnecting, but the original state was
+                // not cleaned up - XXX check liveness of original ChannelHandler
+                log.info("{}:duplicate found", getSwitchInfoString());
+                duplicateDpidFound = Boolean.FALSE;
+            }
+        } else {
+            log.warn("no dpid in channelHandler registered for "
+                    + "disconnected switch {}", getSwitchInfoString());
+        }
     }
 
     @Override
-    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
+    public void exceptionCaught(ChannelHandlerContext ctx,
+                                Throwable cause)
             throws Exception {
-        if (e.getCause() instanceof ReadTimeoutException) {
+        // Log by failure type; all cases except an already-closed channel or a full queue close the channel.
+        if (cause instanceof ReadTimeoutException) {
             // switch timeout
             log.error("Disconnecting switch {} due to read timeout",
                     getSwitchInfoString());
-            ctx.getChannel().close();
-        } else if (e.getCause() instanceof HandshakeTimeoutException) {
+            ctx.channel().close();
+        } else if (cause instanceof HandshakeTimeoutException) {
             log.error("Disconnecting switch {}: failed to complete handshake",
                     getSwitchInfoString());
-            ctx.getChannel().close();
-        } else if (e.getCause() instanceof ClosedChannelException) {
+            ctx.channel().close();
+        } else if (cause instanceof ClosedChannelException) {
             log.debug("Channel for sw {} already closed", getSwitchInfoString());
-        } else if (e.getCause() instanceof IOException) {
-            if (!e.getCause().getMessage().equals(RESET_BY_PEER) &&
-                    !e.getCause().getMessage().equals(BROKEN_PIPE)) {
+        } else if (cause instanceof IOException) {
+            if (!RESET_BY_PEER.equals(cause.getMessage()) &&
+                    !BROKEN_PIPE.equals(cause.getMessage())) { // getMessage() may be null
                 log.error("Disconnecting switch {} due to IO Error: {}",
-                          getSwitchInfoString(), e.getCause().getMessage());
+                          getSwitchInfoString(), cause.getMessage());
                 if (log.isDebugEnabled()) {
                     // still print stack trace if debug is enabled
-                    log.debug("StackTrace for previous Exception: ", e.getCause());
+                    log.debug("StackTrace for previous Exception: ", cause);
                 }
             }
-            ctx.getChannel().close();
-        } else if (e.getCause() instanceof SwitchStateException) {
+            ctx.channel().close();
+        } else if (cause instanceof SwitchStateException) {
             log.error("Disconnecting switch {} due to switch state error: {}",
-                    getSwitchInfoString(), e.getCause().getMessage());
+                    getSwitchInfoString(), cause.getMessage());
             if (log.isDebugEnabled()) {
                 // still print stack trace if debug is enabled
-                log.debug("StackTrace for previous Exception: ", e.getCause());
+                log.debug("StackTrace for previous Exception: ", cause);
             }
-            ctx.getChannel().close();
-        } else if (e.getCause() instanceof OFParseError) {
+            ctx.channel().close();
+        } else if (cause instanceof OFParseError) {
             log.error("Disconnecting switch "
                     + getSwitchInfoString() +
                     " due to message parse failure",
-                    e.getCause());
-            ctx.getChannel().close();
-        } else if (e.getCause() instanceof RejectedExecutionException) {
+                    cause);
+            ctx.channel().close();
+        } else if (cause instanceof RejectedExecutionException) {
             log.warn("Could not process message: queue full");
         } else {
             log.error("Error while processing message from switch "
                     + getSwitchInfoString()
-                    + "state " + this.state, e.getCause());
-            ctx.getChannel().close();
+                    + " state " + this.state, cause);
+            ctx.channel().close();
         }
     }
 
@@ -1273,38 +1350,54 @@
         return getSwitchInfoString();
     }
 
-    @Override
-    public void channelIdle(ChannelHandlerContext ctx, IdleStateEvent e)
+    protected void channelIdle(ChannelHandlerContext ctx,
+                               IdleStateEvent e)
             throws Exception {
-        OFFactory factory = OFFactories.getFactory(ofVersion);
         OFMessage m = factory.buildEchoRequest().build();
         log.debug("Sending Echo Request on idle channel: {}",
-                e.getChannel().getPipeline().getLast());
-        e.getChannel().write(Collections.singletonList(m));
+                  ctx.channel());
+        ctx.writeAndFlush(Collections.singletonList(m), ctx.voidPromise()); // write() alone would not flush
         // XXX S some problems here -- echo request has no transaction id, and
         // echo reply is not correlated to the echo request.
         state.processIdle(this);
     }
 
     @Override
-    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
+    public void userEventTriggered(ChannelHandlerContext ctx,
+                                   Object evt)
             throws Exception {
-        if (e.getMessage() instanceof List) {
-            @SuppressWarnings("unchecked")
-            List<OFMessage> msglist = (List<OFMessage>) e.getMessage();
 
+        if (evt instanceof IdleStateEvent) {
+            channelIdle(ctx, (IdleStateEvent) evt);
+        }
 
-            for (OFMessage ofm : msglist) {
-                // Do the actual packet processing
-                state.processOFMessage(this, ofm);
+        super.userEventTriggered(ctx, evt);
+    }
+
+    // Hand-rolled equivalent of SimpleChannelInboundHandler, avoiding its TypeParameterMatcher dependency
+    @Override
+    public void channelRead(ChannelHandlerContext ctx,
+                            Object msg) throws Exception {
+        // msg is released in finally only when handled here; messages passed on via fireChannelRead are not.
+        boolean release = true;
+        try {
+            if (msg instanceof OFMessage) {
+                // handled here (what would be channelRead0 in SimpleChannelInboundHandler)
+                state.processOFMessage(this, (OFMessage) msg);
+            } else {
+                release = false;
+                ctx.fireChannelRead(msg);
             }
-        } else {
-            state.processOFMessage(this, (OFMessage) e.getMessage());
+        } finally {
+            if (release) {
+                ReferenceCountUtil.release(msg);
+            }
         }
     }
 
 
 
+
     //*************************
     //  Channel utility methods
     //*************************
@@ -1318,7 +1411,63 @@
     }
 
     private void dispatchMessage(OFMessage m) {
-        sw.handleMessage(m);
+        // Expected to run on the channel I/O thread, which owns dispatchBacklog and dispatcherHandle.
+        if (dispatchBacklog.isEmpty()) {
+            if (!dispatchQueue.offer(m)) {
+                // queue full: stop reading from the channel until the dispatch loop re-enables it
+                channel.config().setAutoRead(false);
+                // keep m at the head of the backlog so it is offered first next time
+                dispatchBacklog.addFirst(m);
+                return;
+            }
+        } else {
+            dispatchBacklog.addLast(m);
+        }
+        // Move as much of the backlog as fits into the dispatch queue, oldest message first.
+        while (!dispatchBacklog.isEmpty()) {
+            OFMessage msg = dispatchBacklog.pop();
+
+            if (!dispatchQueue.offer(msg)) {
+                // queue full: stop reading from the channel until the dispatch loop re-enables it
+                channel.config().setAutoRead(false);
+                // put it back at the head of the backlog to preserve ordering
+                dispatchBacklog.addFirst(msg);
+                return;
+            }
+        }
+
+
+        if (dispatcherHandle.isDone()) {
+            // No dispatch loop is running (initial state, or the previous one ended): start one.
+            // The submitted task runs on the dispatcher executor, not on the calling thread.
+            dispatcherHandle = dispatcher.submit(() -> {
+                try {
+                    List<OFMessage> msgs = new ArrayList<>();
+                    for (;;) {
+                        // block until at least one message is available
+                        OFMessage msg = dispatchQueue.take();
+                        sw.handleMessage(msg);
+                        // Drain whatever else is queued; turn reads back on if they were paused.
+                        while (dispatchQueue.drainTo(msgs, MSG_READ_BUFFER) > 0) {
+                            if (!channel.config().isAutoRead()) {
+                                channel.config().setAutoRead(true);
+                            }
+                            msgs.forEach(sw::handleMessage);
+                            msgs.clear();
+                        }
+
+                        if (!channel.config().isAutoRead()) {
+                            channel.config().setAutoRead(true);
+                        }
+                    }
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    // interrupted: flag restored above, leave the dispatch loop
+                    return;
+                }
+
+            });
+        }
     }
 
     /**
@@ -1331,10 +1480,10 @@
             return sw.toString();
         }
         String channelString;
-        if (channel == null || channel.getRemoteAddress() == null) {
+        if (channel == null || channel.remoteAddress() == null) {
             channelString = "?";
         } else {
-            channelString = channel.getRemoteAddress().toString();
+            channelString = channel.remoteAddress().toString();
         }
         String dpidString;
         if (featuresReply == null) {
@@ -1378,8 +1527,8 @@
                 .buildHello()
                 .setXid(this.handshakeTransactionIds--)
                 .setElements(Collections.singletonList(hem));
-        log.info("Sending {} Hello to {}", version, channel.getRemoteAddress());
-        channel.write(Collections.singletonList(mb.build()));
+        log.info("Sending {} Hello to {}", version, channel.remoteAddress());
+        channel.writeAndFlush(Collections.singletonList(mb.build()));
     }
 
     /**
@@ -1387,12 +1536,11 @@
      * @throws IOException
      */
     private void sendHandshakeFeaturesRequestMessage() throws IOException {
-        OFFactory factory = OFFactories.getFactory(ofVersion);
-        log.debug("Sending FEATURES_REQUEST to {}", channel.getRemoteAddress());
+        log.debug("Sending FEATURES_REQUEST to {}", channel.remoteAddress());
         OFMessage m = factory.buildFeaturesRequest()
                 .setXid(this.handshakeTransactionIds--)
                 .build();
-        channel.write(Collections.singletonList(m));
+        channel.writeAndFlush(Collections.singletonList(m));
     }
 
     /**
@@ -1401,8 +1549,7 @@
      * @throws IOException
      */
     private void sendHandshakeSetConfig() throws IOException {
-        OFFactory factory = OFFactories.getFactory(ofVersion);
-        log.debug("Sending CONFIG_REQUEST to {}", channel.getRemoteAddress());
+        log.debug("Sending CONFIG_REQUEST to {}", channel.remoteAddress());
         List<OFMessage> msglist = new ArrayList<>(3);
 
         // Ensure we receive the full packet via PacketIn
@@ -1431,7 +1578,7 @@
                 .setXid(this.handshakeTransactionIds--)
                 .build();
         msglist.add(gcr);
-        channel.write(msglist);
+        channel.writeAndFlush(msglist);
     }
 
     /**
@@ -1440,13 +1587,12 @@
      */
     private void sendHandshakeDescriptionStatsRequest() throws IOException {
         // Get Description to set switch-specific flags
-        OFFactory factory = OFFactories.getFactory(ofVersion);
-        log.debug("Sending DESC_STATS_REQUEST to {}", channel.getRemoteAddress());
+        log.debug("Sending DESC_STATS_REQUEST to {}", channel.remoteAddress());
         OFDescStatsRequest dreq = factory
                 .buildDescStatsRequest()
                 .setXid(handshakeTransactionIds--)
                 .build();
-        channel.write(Collections.singletonList(dreq));
+        channel.writeAndFlush(Collections.singletonList(dreq));
     }
 
     /**
@@ -1457,26 +1603,59 @@
     private void sendMeterFeaturesRequest() throws IOException {
         // Get meter features including the MaxMeters value available for the device
         OFFactory factory = OFFactories.getFactory(ofVersion);
-        log.debug("Sending METER_FEATURES_REQUEST to {}", channel.getRemoteAddress());
+        log.debug("Sending METER_FEATURES_REQUEST to {}", channel.remoteAddress());
         OFMeterFeaturesStatsRequest mfreq = factory
                 .buildMeterFeaturesStatsRequest()
                 .setXid(handshakeTransactionIds--)
                 .build();
-        channel.write(Collections.singletonList(mfreq));
+        channel.writeAndFlush(Collections.singletonList(mfreq));
     }
 
     private void sendHandshakeOFPortDescRequest() throws IOException {
-        log.debug("Sending OF_PORT_DESC_REQUEST to {}", channel.getRemoteAddress());
+        log.debug("Sending OF_PORT_DESC_REQUEST to {}", channel.remoteAddress());
         // Get port description for 1.3+ switch
-        OFPortDescStatsRequest preq = OFFactories.getFactory(ofVersion)
+        OFPortDescStatsRequest preq = factory
                 .buildPortDescStatsRequest()
                 .setXid(handshakeTransactionIds--)
                 .build();
-        channel.write(Collections.singletonList(preq));
+        channel.writeAndFlush(Collections.singletonList(preq));
     }
 
     ChannelState getStateForTesting() {
         return state;
     }
 
+
+    /**
+     * Tells whether a channel has been attached and is currently active.
+     */
+    @Override
+    public boolean isActive() {
+        return channel != null && channel.isActive();
+    }
+
+    @Override
+    public void closeSession() {
+        if (channel != null) {
+            channel.close();
+        }
+    }
+
+    @Override
+    public boolean sendMsg(Iterable<OFMessage> msgs) {
+        // isActive() is also false while no channel is attached, avoiding an NPE here.
+        if (!isActive()) {
+            log.warn("Dropping messages for switch {} because channel is not connected: {}",
+                     getSwitchInfoString(), msgs);
+            return false;
+        }
+        channel.writeAndFlush(msgs, channel.voidPromise());
+        return true;
+    }
+
+    @Override
+    public CharSequence sessionInfo() {
+        return channelId;
+    }
+
 }