Migrate OpenFlow southbound to Netty 4

- separate I/O thread and message dispatch threads

Change-Id: I11a10a47de451a9e3063b62f9450be19c3a9dae7
diff --git a/protocols/openflow/ctl/src/test/java/org/onosproject/openflow/ChannelAdapter.java b/protocols/openflow/ctl/src/test/java/org/onosproject/openflow/ChannelAdapter.java
new file mode 100644
index 0000000..a786211
--- /dev/null
+++ b/protocols/openflow/ctl/src/test/java/org/onosproject/openflow/ChannelAdapter.java
@@ -0,0 +1,282 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.openflow;
+
+import java.net.SocketAddress;
+
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelConfig;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelId;
+import io.netty.channel.ChannelMetadata;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.ChannelProgressivePromise;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.EventLoop;
+import io.netty.util.Attribute;
+import io.netty.util.AttributeKey;
+
+/**
+ * No-op {@link Channel} stub for tests: every method returns null, false or 0.
+ */
+public class ChannelAdapter implements Channel {
+
+    @Override
+    public <T> Attribute<T> attr(AttributeKey<T> key) {
+
+        return null;
+    }
+
+    @Override
+    public int compareTo(Channel o) {
+        return 0;
+    }
+
+    @Override
+    public EventLoop eventLoop() {
+        return null;
+    }
+
+    @Override
+    public Channel parent() {
+        return null;
+    }
+
+    @Override
+    public ChannelConfig config() {
+        return null;
+    }
+
+    @Override
+    public boolean isOpen() {
+        return false;
+    }
+
+    @Override
+    public boolean isRegistered() {
+        return false;
+    }
+
+    @Override
+    public boolean isActive() {
+        return false;
+    }
+
+    @Override
+    public ChannelMetadata metadata() {
+        return null;
+    }
+
+    @Override
+    public SocketAddress localAddress() {
+        return null;
+    }
+
+    @Override
+    public SocketAddress remoteAddress() {
+        return null;
+    }
+
+    @Override
+    public ChannelFuture closeFuture() {
+        return null;
+    }
+
+    @Override
+    public boolean isWritable() {
+        return false;
+    }
+
+    @Override
+    public Unsafe unsafe() {
+        return null;
+    }
+
+    @Override
+    public ChannelPipeline pipeline() {
+        return null;
+    }
+
+    @Override
+    public ByteBufAllocator alloc() {
+        return null;
+    }
+
+    @Override
+    public ChannelPromise newPromise() {
+        return null;
+    }
+
+    @Override
+    public ChannelProgressivePromise newProgressivePromise() {
+        return null;
+    }
+
+    @Override
+    public ChannelFuture newSucceededFuture() {
+        return null;
+    }
+
+    @Override
+    public ChannelFuture newFailedFuture(Throwable cause) {
+        return null;
+    }
+
+    @Override
+    public ChannelPromise voidPromise() {
+
+        return null;
+    }
+
+    @Override
+    public ChannelFuture bind(SocketAddress localAddress) {
+
+        return null;
+    }
+
+    @Override
+    public ChannelFuture connect(SocketAddress remoteAddress) {
+
+        return null;
+    }
+
+    @Override
+    public ChannelFuture connect(SocketAddress remoteAddress,
+                                 SocketAddress localAddress) {
+
+        return null;
+    }
+
+    @Override
+    public ChannelFuture disconnect() {
+
+        return null;
+    }
+
+    @Override
+    public ChannelFuture close() {
+
+        return null;
+    }
+
+    @Override
+    public ChannelFuture deregister() {
+
+        return null;
+    }
+
+    @Override
+    public ChannelFuture bind(SocketAddress localAddress,
+                              ChannelPromise promise) {
+
+        return null;
+    }
+
+    @Override
+    public ChannelFuture connect(SocketAddress remoteAddress,
+                                 ChannelPromise promise) {
+
+        return null;
+    }
+
+    @Override
+    public ChannelFuture connect(SocketAddress remoteAddress,
+                                 SocketAddress localAddress,
+                                 ChannelPromise promise) {
+
+        return null;
+    }
+
+    @Override
+    public ChannelFuture disconnect(ChannelPromise promise) {
+
+        return null;
+    }
+
+    @Override
+    public ChannelFuture close(ChannelPromise promise) {
+
+        return null;
+    }
+
+    @Override
+    public ChannelFuture deregister(ChannelPromise promise) {
+
+        return null;
+    }
+
+    @Override
+    public Channel read() {
+
+        return null;
+    }
+
+    @Override
+    public ChannelFuture write(Object msg) {
+
+        return null;
+    }
+
+    @Override
+    public ChannelFuture write(Object msg, ChannelPromise promise) {
+
+        return null;
+    }
+
+    @Override
+    public Channel flush() {
+
+        return null;
+    }
+
+    @Override
+    public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
+
+        return null;
+    }
+
+    @Override
+    public ChannelFuture writeAndFlush(Object msg) {
+
+        return null;
+    }
+
+    @Override
+    public <T> boolean hasAttr(AttributeKey<T> key) {
+
+        return false;
+    }
+
+    @Override
+    public ChannelId id() {
+
+        return null;
+    }
+
+    @Override
+    public long bytesBeforeUnwritable() {
+
+        return 0;
+    }
+
+    @Override
+    public long bytesBeforeWritable() {
+
+        return 0;
+    }
+
+}
diff --git a/protocols/openflow/ctl/src/test/java/org/onosproject/openflow/ChannelHandlerContextAdapter.java b/protocols/openflow/ctl/src/test/java/org/onosproject/openflow/ChannelHandlerContextAdapter.java
index 4b5afb7..2fcc6b9 100644
--- a/protocols/openflow/ctl/src/test/java/org/onosproject/openflow/ChannelHandlerContextAdapter.java
+++ b/protocols/openflow/ctl/src/test/java/org/onosproject/openflow/ChannelHandlerContextAdapter.java
@@ -15,63 +15,272 @@
  */
 package org.onosproject.openflow;
 
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelEvent;
-import org.jboss.netty.channel.ChannelHandler;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelPipeline;
+import java.net.SocketAddress;
+
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.ChannelProgressivePromise;
+import io.netty.channel.ChannelPromise;
+import io.netty.util.Attribute;
+import io.netty.util.AttributeKey;
+import io.netty.util.concurrent.EventExecutor;
 
 /**
  * Adapter for testing against a netty channel handler context.
  */
 public class ChannelHandlerContextAdapter implements ChannelHandlerContext {
+
     @Override
-    public Channel getChannel() {
+    public <T> Attribute<T> attr(AttributeKey<T> key) {
+
         return null;
     }
 
     @Override
-    public ChannelPipeline getPipeline() {
+    public Channel channel() {
+
+        return new ChannelAdapter();
+    }
+
+    @Override
+    public EventExecutor executor() {
+
         return null;
     }
 
     @Override
-    public String getName() {
+    public String name() {
+
         return null;
     }
 
     @Override
-    public ChannelHandler getHandler() {
+    public ChannelHandler handler() {
+
         return null;
     }
 
     @Override
-    public boolean canHandleUpstream() {
+    public boolean isRemoved() {
+
         return false;
     }
 
     @Override
-    public boolean canHandleDownstream() {
-        return false;
-    }
+    public ChannelHandlerContext fireChannelRegistered() {
 
-    @Override
-    public void sendUpstream(ChannelEvent channelEvent) {
-
-    }
-
-    @Override
-    public void sendDownstream(ChannelEvent channelEvent) {
-
-    }
-
-    @Override
-    public Object getAttachment() {
         return null;
     }
 
     @Override
-    public void setAttachment(Object o) {
+    public ChannelHandlerContext fireChannelUnregistered() {
 
+        return null;
+    }
+
+    @Override
+    public ChannelHandlerContext fireChannelActive() {
+
+        return null;
+    }
+
+    @Override
+    public ChannelHandlerContext fireChannelInactive() {
+
+        return null;
+    }
+
+    @Override
+    public ChannelHandlerContext fireExceptionCaught(Throwable cause) {
+
+        return null;
+    }
+
+    @Override
+    public ChannelHandlerContext fireUserEventTriggered(Object event) {
+
+        return null;
+    }
+
+    @Override
+    public ChannelHandlerContext fireChannelRead(Object msg) {
+
+        return null;
+    }
+
+    @Override
+    public ChannelHandlerContext fireChannelReadComplete() {
+
+        return null;
+    }
+
+    @Override
+    public ChannelHandlerContext fireChannelWritabilityChanged() {
+
+        return null;
+    }
+
+    @Override
+    public ChannelFuture bind(SocketAddress localAddress) {
+
+        return null;
+    }
+
+    @Override
+    public ChannelFuture connect(SocketAddress remoteAddress) {
+
+        return null;
+    }
+
+    @Override
+    public ChannelFuture connect(SocketAddress remoteAddress,
+                                 SocketAddress localAddress) {
+
+        return null;
+    }
+
+    @Override
+    public ChannelFuture disconnect() {
+
+        return null;
+    }
+
+    @Override
+    public ChannelFuture close() {
+
+        return null;
+    }
+
+    @Override
+    public ChannelFuture deregister() {
+
+        return null;
+    }
+
+    @Override
+    public ChannelFuture bind(SocketAddress localAddress,
+                              ChannelPromise promise) {
+
+        return null;
+    }
+
+    @Override
+    public ChannelFuture connect(SocketAddress remoteAddress,
+                                 ChannelPromise promise) {
+
+        return null;
+    }
+
+    @Override
+    public ChannelFuture connect(SocketAddress remoteAddress,
+                                 SocketAddress localAddress,
+                                 ChannelPromise promise) {
+
+        return null;
+    }
+
+    @Override
+    public ChannelFuture disconnect(ChannelPromise promise) {
+
+        return null;
+    }
+
+    @Override
+    public ChannelFuture close(ChannelPromise promise) {
+
+        return null;
+    }
+
+    @Override
+    public ChannelFuture deregister(ChannelPromise promise) {
+
+        return null;
+    }
+
+    @Override
+    public ChannelHandlerContext read() {
+
+        return null;
+    }
+
+    @Override
+    public ChannelFuture write(Object msg) {
+
+        return null;
+    }
+
+    @Override
+    public ChannelFuture write(Object msg, ChannelPromise promise) {
+
+        return null;
+    }
+
+    @Override
+    public ChannelHandlerContext flush() {
+
+        return null;
+    }
+
+    @Override
+    public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
+
+        return null;
+    }
+
+    @Override
+    public ChannelFuture writeAndFlush(Object msg) {
+
+        return null;
+    }
+
+    @Override
+    public ChannelPipeline pipeline() {
+
+        return null;
+    }
+
+    @Override
+    public ByteBufAllocator alloc() {
+
+        return null;
+    }
+
+    @Override
+    public ChannelPromise newPromise() {
+
+        return null;
+    }
+
+    @Override
+    public ChannelProgressivePromise newProgressivePromise() {
+
+        return null;
+    }
+
+    @Override
+    public ChannelFuture newSucceededFuture() {
+
+        return null;
+    }
+
+    @Override
+    public ChannelFuture newFailedFuture(Throwable cause) {
+
+        return null;
+    }
+
+    @Override
+    public ChannelPromise voidPromise() {
+
+        return null;
+    }
+
+    @Override
+    public <T> boolean hasAttr(AttributeKey<T> key) {
+        return false;
     }
 }
diff --git a/protocols/openflow/ctl/src/test/java/org/onosproject/openflow/OpenflowSwitchDriverAdapter.java b/protocols/openflow/ctl/src/test/java/org/onosproject/openflow/OpenflowSwitchDriverAdapter.java
index 74751db..0083038 100644
--- a/protocols/openflow/ctl/src/test/java/org/onosproject/openflow/OpenflowSwitchDriverAdapter.java
+++ b/protocols/openflow/ctl/src/test/java/org/onosproject/openflow/OpenflowSwitchDriverAdapter.java
@@ -15,11 +15,11 @@
  */
 package org.onosproject.openflow;
 
-import org.jboss.netty.channel.Channel;
 import org.onosproject.net.Device;
 import org.onosproject.net.driver.DriverData;
 import org.onosproject.net.driver.DriverHandler;
 import org.onosproject.openflow.controller.Dpid;
+import org.onosproject.openflow.controller.OpenFlowSession;
 import org.onosproject.openflow.controller.RoleState;
 import org.onosproject.openflow.controller.driver.OpenFlowAgent;
 import org.onosproject.openflow.controller.driver.OpenFlowSwitchDriver;
@@ -147,7 +147,7 @@
     }
 
     @Override
-    public void setChannel(Channel channel) {
+    public void setChannel(OpenFlowSession channel) {
 
     }
 
diff --git a/protocols/openflow/ctl/src/test/java/org/onosproject/openflow/controller/impl/OFMessageDecoderTest.java b/protocols/openflow/ctl/src/test/java/org/onosproject/openflow/controller/impl/OFMessageDecoderTest.java
index 6c109ae..f2617cd 100644
--- a/protocols/openflow/ctl/src/test/java/org/onosproject/openflow/controller/impl/OFMessageDecoderTest.java
+++ b/protocols/openflow/ctl/src/test/java/org/onosproject/openflow/controller/impl/OFMessageDecoderTest.java
@@ -16,38 +16,47 @@
 package org.onosproject.openflow.controller.impl;
 
 
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBuffers;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
-import org.onosproject.core.netty.ChannelAdapter;
+import org.onosproject.openflow.ChannelAdapter;
 import org.onosproject.openflow.ChannelHandlerContextAdapter;
 import org.projectfloodlight.openflow.protocol.OFHello;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.Channel;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.instanceOf;
-import static org.hamcrest.Matchers.notNullValue;
-import static org.hamcrest.Matchers.nullValue;
+import static org.hamcrest.Matchers.is;
+import java.util.ArrayList;
+import java.util.List;
 
 /**
  * Tests for the OpenFlow message decoder.
  */
 public class OFMessageDecoderTest {
 
-    static class ConnectedChannel extends ChannelAdapter {
-        @Override
-        public boolean isConnected() {
-            return true;
-        }
-    }
+    private ByteBuf buf;
 
-    private ChannelBuffer getHelloMessageBuffer() {
+    private ByteBuf getHelloMessageBuffer() {
         // OFHello, OF version 1, xid of 0, total of 8 bytes
         byte[] messageData = {0x1, 0x0, 0x0, 0x8, 0x0, 0x0, 0x0, 0x0};
-        ChannelBuffer channelBuffer = ChannelBuffers.dynamicBuffer();
-        channelBuffer.writeBytes(messageData);
-        return channelBuffer;
+        buf.writeBytes(messageData);
+        return buf;
     }
 
+    @Before
+    public void setUp() {
+        buf = ByteBufAllocator.DEFAULT.buffer();
+    }
+
+    @After
+    public void tearDown() {
+        buf.release();
+    }
+
+
     /**
      * Tests decoding a message on a closed channel.
      *
@@ -55,13 +64,13 @@
      */
     @Test
     public void testDecodeNoChannel() throws Exception {
-        OFMessageDecoder decoder = new OFMessageDecoder();
-        ChannelBuffer channelBuffer = getHelloMessageBuffer();
-        Object message =
-                decoder.decode(new ChannelHandlerContextAdapter(),
-                               new ChannelAdapter(),
-                               channelBuffer);
-        assertThat(message, nullValue());
+        OFMessageDecoder decoder = OFMessageDecoder.getInstance();
+        ByteBuf channelBuffer = getHelloMessageBuffer();
+        List<Object> out = new ArrayList<>();
+        decoder.decode(new ChannelHandlerContextAdapter(),
+                       channelBuffer,
+                       out);
+        assertThat(out.size(), is(0));
     }
 
     /**
@@ -71,14 +80,29 @@
      */
     @Test
     public void testDecode() throws Exception {
-        OFMessageDecoder decoder = new OFMessageDecoder();
-        ChannelBuffer channelBuffer = getHelloMessageBuffer();
-        Object message =
-                decoder.decode(new ChannelHandlerContextAdapter(),
-                               new ConnectedChannel(),
-                               channelBuffer);
-        assertThat(message, notNullValue());
-        assertThat(message, instanceOf(OFHello.class));
+        OFMessageDecoder decoder = OFMessageDecoder.getInstance();
+        ByteBuf channelBuffer = getHelloMessageBuffer();
+        List<Object> out = new ArrayList<>();
+        decoder.decode(new ActiveChannelHandlerContextAdapter(),
+                       channelBuffer,
+                       out);
+        assertThat(out.size(), is(1));
+        assertThat(out.get(0), instanceOf(OFHello.class));
+    }
+
+    /** Handler context whose channel always reports itself as active. */
+    public static class ActiveChannelHandlerContextAdapter
+            extends ChannelHandlerContextAdapter {
+
+        @Override
+        public Channel channel() {
+            return new ChannelAdapter() {
+                @Override
+                public boolean isActive() {
+                    return true;
+                }
+            };
+        }
+    }
 
 }
diff --git a/protocols/openflow/ctl/src/test/java/org/onosproject/openflow/controller/impl/OFMessageEncoderTest.java b/protocols/openflow/ctl/src/test/java/org/onosproject/openflow/controller/impl/OFMessageEncoderTest.java
index e62345c..636c568 100644
--- a/protocols/openflow/ctl/src/test/java/org/onosproject/openflow/controller/impl/OFMessageEncoderTest.java
+++ b/protocols/openflow/ctl/src/test/java/org/onosproject/openflow/controller/impl/OFMessageEncoderTest.java
@@ -16,26 +16,27 @@
 package org.onosproject.openflow.controller.impl;
 
 import java.nio.charset.StandardCharsets;
-import java.util.List;
+import java.util.Collections;
 
 import io.netty.buffer.ByteBuf;
-import org.jboss.netty.buffer.ChannelBuffer;
+import io.netty.buffer.ByteBufAllocator;
+
+import org.hamcrest.Matchers;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 import org.onosproject.openflow.OfMessageAdapter;
-import org.projectfloodlight.openflow.protocol.OFMessage;
 import org.projectfloodlight.openflow.protocol.OFType;
 
-import com.google.common.collect.ImmutableList;
-
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.notNullValue;
 
 /**
  * Tests for the OpenFlow message encoder.
  */
 public class OFMessageEncoderTest {
 
+    private ByteBuf buf;
     static class MockOfMessage extends OfMessageAdapter {
         static int nextId = 1;
         final int id;
@@ -52,40 +53,26 @@
         }
     }
 
-    /**
-     * Tests that encoding a non-list returns the object specified.
-     *
-     * @throws Exception on exception in the encoder
-     */
-    @Test
-    public void testNoList() throws Exception {
-        OFMessageEncoder encoder = new OFMessageEncoder();
-        MockOfMessage message = new MockOfMessage();
-        OFMessage returnedMessage =
-                (OFMessage) encoder.encode(null, null, message);
-        assertThat(message, is(returnedMessage));
+    @Before
+    public void setUp() {
+        buf = ByteBufAllocator.DEFAULT.buffer();
     }
 
-    /**
-     * Tests that encoding a list returns the proper encoded payload.
-     *
-     * @throws Exception on exception in the encoder
-     */
+    @After
+    public void tearDown() {
+        buf.release();
+    }
+
     @Test
-    public void testList() throws Exception {
-        OFMessageEncoder encoder = new OFMessageEncoder();
+    public void testEncode() throws Exception {
+        OFMessageEncoder encoder = OFMessageEncoder.getInstance();
         MockOfMessage message1 = new MockOfMessage();
-        MockOfMessage message2 = new MockOfMessage();
-        MockOfMessage message3 = new MockOfMessage();
-        List<MockOfMessage> messages = ImmutableList.of(message1, message2, message3);
-        ChannelBuffer returnedChannel =
-                (ChannelBuffer) encoder.encode(null, null, messages);
-        assertThat(returnedChannel, notNullValue());
-        byte[] channelBytes = returnedChannel.array();
-        String expectedListMessage = "message1 message2 message3 ";
-        String listMessage =
-                (new String(channelBytes, StandardCharsets.UTF_8))
-                        .substring(0, expectedListMessage.length());
-        assertThat(listMessage, is(expectedListMessage));
+        encoder.encode(null, Collections.singletonList(message1), buf);
+        // Compare raw bytes using an explicit charset so the check is platform-independent.
+        assertThat(buf.isReadable(), Matchers.is(true));
+        byte[] channelBytes = new byte[buf.readableBytes()];
+        buf.readBytes(channelBytes);
+        String expectedMessage = "message1 ";
+        assertThat(channelBytes, is(expectedMessage.getBytes(StandardCharsets.UTF_8)));
     }
 }