netty4 OpenFlow southbound
- separate I/O thread and message dispatch threads
Change-Id: I11a10a47de451a9e3063b62f9450be19c3a9dae7
diff --git a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OFMessageEncoder.java b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OFMessageEncoder.java
index 727301f..4dd8009 100644
--- a/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OFMessageEncoder.java
+++ b/protocols/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OFMessageEncoder.java
@@ -16,49 +16,81 @@
package org.onosproject.openflow.controller.impl;
-import java.util.List;
-
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;
+import io.netty.channel.ChannelHandler.Sharable;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelOutboundHandlerAdapter;
+import io.netty.channel.ChannelPromise;
+import io.netty.handler.codec.EncoderException;
+import static org.slf4j.LoggerFactory.getLogger;
+
import org.projectfloodlight.openflow.protocol.OFMessage;
+import org.slf4j.Logger;
/**
- * Encode an openflow message for output into a ChannelBuffer, for use in a
+ * Encode an openflow message for output into a netty channel, for use in a
* netty pipeline.
*/
-public class OFMessageEncoder extends OneToOneEncoder {
+@Sharable
+public final class OFMessageEncoder extends ChannelOutboundHandlerAdapter {
+ private static final Logger log = getLogger(OFMessageEncoder.class);
+
+ private static final OFMessageEncoder INSTANCE = new OFMessageEncoder();
+
+ public static OFMessageEncoder getInstance() {
+ return INSTANCE;
+ }
+
+ private OFMessageEncoder() {}
+
+ protected final void encode(ChannelHandlerContext ctx,
+ Iterable<OFMessage> msgs,
+ ByteBuf out) throws Exception {
+
+ msgs.forEach(msg -> msg.writeTo(out));
+ }
+
+ // MessageToByteEncoder without dependency to TypeParameterMatcher
@Override
- protected Object encode(ChannelHandlerContext ctx, Channel channel,
- Object msg) throws Exception {
- if (!(msg instanceof List)) {
- return msg;
- }
+ public void write(ChannelHandlerContext ctx,
+ Object msg,
+ ChannelPromise promise) throws Exception {
- @SuppressWarnings("unchecked")
- List<OFMessage> msglist = (List<OFMessage>) msg;
- /* XXX S can't get length of OFMessage in loxigen's openflowj??
- int size = 0;
- for (OFMessage ofm : msglist) {
- size += ofm.getLengthU();
- }*/
+ ByteBuf buf = null;
+ try {
+ if (msg instanceof Iterable) {
+ @SuppressWarnings("unchecked")
+ Iterable<OFMessage> ofmsgs = (Iterable<OFMessage>) msg;
+ buf = ctx.alloc().ioBuffer();
- ByteBuf bb = Unpooled.buffer();
+ encode(ctx, ofmsgs, buf);
- for (OFMessage ofm : msglist) {
- if (ofm != null) {
- ofm.writeTo(bb);
+ if (buf.isReadable()) {
+ ctx.write(buf, promise);
+ } else {
+ log.warn("NOTHING WAS WRITTEN for {}", msg);
+ buf.release();
+ ctx.write(Unpooled.EMPTY_BUFFER, promise);
+ }
+ buf = null;
+
+ } else {
+ log.warn("Attempted to encode unexpected message: {}", msg);
+ ctx.write(msg, promise);
+ }
+ } catch (EncoderException e) {
+ log.error("EncoderException handling {}", msg, e);
+ throw e;
+ } catch (Throwable e) {
+ log.error("Exception handling {}", msg, e);
+ throw new EncoderException(e);
+ } finally {
+ if (buf != null) {
+ buf.release();
}
}
-
- ChannelBuffer buf = ChannelBuffers.wrappedBuffer(bb.nioBuffer());
-
- return buf;
}
}