netty4 OpenFlow southbound

- separate I/O thread and message dispatch threads

Change-Id: I11a10a47de451a9e3063b62f9450be19c3a9dae7
diff --git a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OFMessageEncoder.java b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OFMessageEncoder.java
index 727301f..4dd8009 100644
--- a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OFMessageEncoder.java
+++ b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OFMessageEncoder.java
@@ -16,49 +16,81 @@
 
 package org.onosproject.openflow.controller.impl;
 
-import java.util.List;
-
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;
+import io.netty.channel.ChannelHandler.Sharable;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelOutboundHandlerAdapter;
+import io.netty.channel.ChannelPromise;
+import io.netty.handler.codec.EncoderException;
+import static org.slf4j.LoggerFactory.getLogger;
+
 import org.projectfloodlight.openflow.protocol.OFMessage;
+import org.slf4j.Logger;
 
 /**
- * Encode an openflow message for output into a ChannelBuffer, for use in a
+ * Encode an openflow message for output into a netty channel, for use in a
  * netty pipeline.
  */
-public class OFMessageEncoder extends OneToOneEncoder {
+@Sharable
+public final class OFMessageEncoder extends ChannelOutboundHandlerAdapter {
 
+    private static final Logger log = getLogger(OFMessageEncoder.class);
+
+    private static final OFMessageEncoder INSTANCE = new OFMessageEncoder();
+
+    public static OFMessageEncoder getInstance() {
+        return INSTANCE;
+    }
+
+    private OFMessageEncoder() {}
+
+    protected final void encode(ChannelHandlerContext ctx,
+                          Iterable<OFMessage> msgs,
+                          ByteBuf out) throws Exception {
+
+        msgs.forEach(msg -> msg.writeTo(out));
+    }
+
+    // MessageToByteEncoder without dependency to TypeParameterMatcher
     @Override
-    protected Object encode(ChannelHandlerContext ctx, Channel channel,
-                            Object msg) throws Exception {
-        if (!(msg instanceof List)) {
-            return msg;
-        }
+    public void write(ChannelHandlerContext ctx,
+                      Object msg,
+                      ChannelPromise promise) throws Exception {
 
-        @SuppressWarnings("unchecked")
-        List<OFMessage> msglist = (List<OFMessage>) msg;
-        /* XXX S can't get length of OFMessage in loxigen's openflowj??
-        int size = 0;
-        for (OFMessage ofm : msglist) {
-            size += ofm.getLengthU();
-        }*/
+        ByteBuf buf = null;
+        try {
+            if (msg instanceof Iterable) {
+                @SuppressWarnings("unchecked")
+                Iterable<OFMessage> ofmsgs =  (Iterable<OFMessage>) msg;
+                buf = ctx.alloc().ioBuffer();
 
-        ByteBuf bb = Unpooled.buffer();
+                encode(ctx, ofmsgs, buf);
 
-        for (OFMessage ofm : msglist) {
-            if (ofm != null) {
-                ofm.writeTo(bb);
+                if (buf.isReadable()) {
+                    ctx.write(buf, promise);
+                } else {
+                    log.warn("NOTHING WAS WRITTEN for {}", msg);
+                    buf.release();
+                    ctx.write(Unpooled.EMPTY_BUFFER, promise);
+                }
+                buf = null;
+
+            } else {
+                log.warn("Attempted to encode unexpected message: {}", msg);
+                ctx.write(msg, promise);
+            }
+        } catch (EncoderException e) {
+            log.error("EncoderException handling {}", msg, e);
+            throw e;
+        } catch (Throwable e) {
+            log.error("Exception handling {}", msg, e);
+            throw new EncoderException(e);
+        } finally {
+            if (buf != null) {
+                buf.release();
             }
         }
-
-        ChannelBuffer buf = ChannelBuffers.wrappedBuffer(bb.nioBuffer());
-
-        return buf;
     }
 
 }