netty4 OpenFlow southbound
- separate I/O thread and message dispatch threads
Change-Id: I11a10a47de451a9e3063b62f9450be19c3a9dae7
diff --git a/protocols/openflow/ctl/src/test/java/org/onosproject/openflow/ChannelAdapter.java b/protocols/openflow/ctl/src/test/java/org/onosproject/openflow/ChannelAdapter.java
new file mode 100644
index 0000000..a786211
--- /dev/null
+++ b/protocols/openflow/ctl/src/test/java/org/onosproject/openflow/ChannelAdapter.java
@@ -0,0 +1,282 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.openflow;
+
+import java.net.SocketAddress;
+
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelConfig;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelId;
+import io.netty.channel.ChannelMetadata;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.ChannelProgressivePromise;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.EventLoop;
+import io.netty.util.Attribute;
+import io.netty.util.AttributeKey;
+
+/**
+ * No-op {@link Channel} stub for tests; every method returns {@code null}, {@code false} or {@code 0}.
+ */
+public class ChannelAdapter implements Channel {
+
+ @Override
+ public <T> Attribute<T> attr(AttributeKey<T> key) {
+
+ return null;
+ }
+
+ @Override
+ public int compareTo(Channel o) {
+ return 0; // every stub compares as equal to any other channel
+ }
+
+ @Override
+ public EventLoop eventLoop() {
+ return null;
+ }
+
+ @Override
+ public Channel parent() {
+ return null;
+ }
+
+ @Override
+ public ChannelConfig config() {
+ return null;
+ }
+
+ @Override
+ public boolean isOpen() {
+ return false;
+ }
+
+ @Override
+ public boolean isRegistered() {
+ return false;
+ }
+
+ @Override
+ public boolean isActive() {
+ return false; // inactive by default; tests override this to simulate a live channel
+ }
+
+ @Override
+ public ChannelMetadata metadata() {
+ return null;
+ }
+
+ @Override
+ public SocketAddress localAddress() {
+ return null;
+ }
+
+ @Override
+ public SocketAddress remoteAddress() {
+ return null;
+ }
+
+ @Override
+ public ChannelFuture closeFuture() {
+ return null;
+ }
+
+ @Override
+ public boolean isWritable() {
+ return false;
+ }
+
+ @Override
+ public Unsafe unsafe() {
+ return null;
+ }
+
+ @Override
+ public ChannelPipeline pipeline() {
+ return null;
+ }
+
+ @Override
+ public ByteBufAllocator alloc() {
+ return null;
+ }
+
+ @Override
+ public ChannelPromise newPromise() {
+ return null;
+ }
+
+ @Override
+ public ChannelProgressivePromise newProgressivePromise() {
+ return null;
+ }
+
+ @Override
+ public ChannelFuture newSucceededFuture() {
+ return null;
+ }
+
+ @Override
+ public ChannelFuture newFailedFuture(Throwable cause) {
+ return null;
+ }
+
+ @Override
+ public ChannelPromise voidPromise() {
+
+ return null;
+ }
+
+ @Override
+ public ChannelFuture bind(SocketAddress localAddress) {
+
+ return null;
+ }
+
+ @Override
+ public ChannelFuture connect(SocketAddress remoteAddress) {
+
+ return null;
+ }
+
+ @Override
+ public ChannelFuture connect(SocketAddress remoteAddress,
+ SocketAddress localAddress) {
+
+ return null;
+ }
+
+ @Override
+ public ChannelFuture disconnect() {
+
+ return null;
+ }
+
+ @Override
+ public ChannelFuture close() {
+
+ return null;
+ }
+
+ @Override
+ public ChannelFuture deregister() {
+
+ return null;
+ }
+
+ @Override
+ public ChannelFuture bind(SocketAddress localAddress,
+ ChannelPromise promise) {
+
+ return null;
+ }
+
+ @Override
+ public ChannelFuture connect(SocketAddress remoteAddress,
+ ChannelPromise promise) {
+
+ return null;
+ }
+
+ @Override
+ public ChannelFuture connect(SocketAddress remoteAddress,
+ SocketAddress localAddress,
+ ChannelPromise promise) {
+
+ return null;
+ }
+
+ @Override
+ public ChannelFuture disconnect(ChannelPromise promise) {
+
+ return null;
+ }
+
+ @Override
+ public ChannelFuture close(ChannelPromise promise) {
+
+ return null;
+ }
+
+ @Override
+ public ChannelFuture deregister(ChannelPromise promise) {
+
+ return null;
+ }
+
+ @Override
+ public Channel read() {
+
+ return null;
+ }
+
+ @Override
+ public ChannelFuture write(Object msg) {
+
+ return null;
+ }
+
+ @Override
+ public ChannelFuture write(Object msg, ChannelPromise promise) {
+
+ return null;
+ }
+
+ @Override
+ public Channel flush() {
+
+ return null;
+ }
+
+ @Override
+ public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
+
+ return null;
+ }
+
+ @Override
+ public ChannelFuture writeAndFlush(Object msg) {
+
+ return null;
+ }
+
+ @Override
+ public <T> boolean hasAttr(AttributeKey<T> key) {
+
+ return false;
+ }
+
+ @Override
+ public ChannelId id() {
+
+ return null;
+ }
+
+ @Override
+ public long bytesBeforeUnwritable() {
+
+ return 0;
+ }
+
+ @Override
+ public long bytesBeforeWritable() {
+
+ return 0;
+ }
+
+}
diff --git a/protocols/openflow/ctl/src/test/java/org/onosproject/openflow/ChannelHandlerContextAdapter.java b/protocols/openflow/ctl/src/test/java/org/onosproject/openflow/ChannelHandlerContextAdapter.java
index 4b5afb7..2fcc6b9 100644
--- a/protocols/openflow/ctl/src/test/java/org/onosproject/openflow/ChannelHandlerContextAdapter.java
+++ b/protocols/openflow/ctl/src/test/java/org/onosproject/openflow/ChannelHandlerContextAdapter.java
@@ -15,63 +15,272 @@
*/
package org.onosproject.openflow;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelEvent;
-import org.jboss.netty.channel.ChannelHandler;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelPipeline;
+import java.net.SocketAddress;
+
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.ChannelProgressivePromise;
+import io.netty.channel.ChannelPromise;
+import io.netty.util.Attribute;
+import io.netty.util.AttributeKey;
+import io.netty.util.concurrent.EventExecutor;
/**
* Adapter for testing against a netty channel handler context.
*/
public class ChannelHandlerContextAdapter implements ChannelHandlerContext {
+ // Every method is a stub returning null or false, except channel(), which returns a new ChannelAdapter.
@Override
- public Channel getChannel() {
+ public <T> Attribute<T> attr(AttributeKey<T> key) {
+
return null;
}
@Override
- public ChannelPipeline getPipeline() {
+ public Channel channel() {
+
+ return new ChannelAdapter(); // fresh stub channel on each call; its isActive() is false
+ }
+
+ @Override
+ public EventExecutor executor() {
+
return null;
}
@Override
- public String getName() {
+ public String name() {
+
return null;
}
@Override
- public ChannelHandler getHandler() {
+ public ChannelHandler handler() {
+
return null;
}
@Override
- public boolean canHandleUpstream() {
+ public boolean isRemoved() {
+
return false;
}
@Override
- public boolean canHandleDownstream() {
- return false;
- }
+ public ChannelHandlerContext fireChannelRegistered() {
- @Override
- public void sendUpstream(ChannelEvent channelEvent) {
-
- }
-
- @Override
- public void sendDownstream(ChannelEvent channelEvent) {
-
- }
-
- @Override
- public Object getAttachment() {
return null;
}
@Override
- public void setAttachment(Object o) {
+ public ChannelHandlerContext fireChannelUnregistered() {
+ return null;
+ }
+
+ @Override
+ public ChannelHandlerContext fireChannelActive() {
+
+ return null;
+ }
+
+ @Override
+ public ChannelHandlerContext fireChannelInactive() {
+
+ return null;
+ }
+
+ @Override
+ public ChannelHandlerContext fireExceptionCaught(Throwable cause) {
+
+ return null;
+ }
+
+ @Override
+ public ChannelHandlerContext fireUserEventTriggered(Object event) {
+
+ return null;
+ }
+
+ @Override
+ public ChannelHandlerContext fireChannelRead(Object msg) {
+
+ return null;
+ }
+
+ @Override
+ public ChannelHandlerContext fireChannelReadComplete() {
+
+ return null;
+ }
+
+ @Override
+ public ChannelHandlerContext fireChannelWritabilityChanged() {
+
+ return null;
+ }
+
+ @Override
+ public ChannelFuture bind(SocketAddress localAddress) {
+
+ return null;
+ }
+
+ @Override
+ public ChannelFuture connect(SocketAddress remoteAddress) {
+
+ return null;
+ }
+
+ @Override
+ public ChannelFuture connect(SocketAddress remoteAddress,
+ SocketAddress localAddress) {
+
+ return null;
+ }
+
+ @Override
+ public ChannelFuture disconnect() {
+
+ return null;
+ }
+
+ @Override
+ public ChannelFuture close() {
+
+ return null;
+ }
+
+ @Override
+ public ChannelFuture deregister() {
+
+ return null;
+ }
+
+ @Override
+ public ChannelFuture bind(SocketAddress localAddress,
+ ChannelPromise promise) {
+
+ return null;
+ }
+
+ @Override
+ public ChannelFuture connect(SocketAddress remoteAddress,
+ ChannelPromise promise) {
+
+ return null;
+ }
+
+ @Override
+ public ChannelFuture connect(SocketAddress remoteAddress,
+ SocketAddress localAddress,
+ ChannelPromise promise) {
+
+ return null;
+ }
+
+ @Override
+ public ChannelFuture disconnect(ChannelPromise promise) {
+
+ return null;
+ }
+
+ @Override
+ public ChannelFuture close(ChannelPromise promise) {
+
+ return null;
+ }
+
+ @Override
+ public ChannelFuture deregister(ChannelPromise promise) {
+
+ return null;
+ }
+
+ @Override
+ public ChannelHandlerContext read() {
+
+ return null;
+ }
+
+ @Override
+ public ChannelFuture write(Object msg) {
+
+ return null;
+ }
+
+ @Override
+ public ChannelFuture write(Object msg, ChannelPromise promise) {
+
+ return null;
+ }
+
+ @Override
+ public ChannelHandlerContext flush() {
+
+ return null;
+ }
+
+ @Override
+ public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
+
+ return null;
+ }
+
+ @Override
+ public ChannelFuture writeAndFlush(Object msg) {
+
+ return null;
+ }
+
+ @Override
+ public ChannelPipeline pipeline() {
+
+ return null;
+ }
+
+ @Override
+ public ByteBufAllocator alloc() {
+
+ return null;
+ }
+
+ @Override
+ public ChannelPromise newPromise() {
+
+ return null;
+ }
+
+ @Override
+ public ChannelProgressivePromise newProgressivePromise() {
+
+ return null;
+ }
+
+ @Override
+ public ChannelFuture newSucceededFuture() {
+
+ return null;
+ }
+
+ @Override
+ public ChannelFuture newFailedFuture(Throwable cause) {
+
+ return null;
+ }
+
+ @Override
+ public ChannelPromise voidPromise() {
+
+ return null;
+ }
+
+ @Override
+ public <T> boolean hasAttr(AttributeKey<T> key) {
+ return false;
}
}
diff --git a/protocols/openflow/ctl/src/test/java/org/onosproject/openflow/OpenflowSwitchDriverAdapter.java b/protocols/openflow/ctl/src/test/java/org/onosproject/openflow/OpenflowSwitchDriverAdapter.java
index 74751db..0083038 100644
--- a/protocols/openflow/ctl/src/test/java/org/onosproject/openflow/OpenflowSwitchDriverAdapter.java
+++ b/protocols/openflow/ctl/src/test/java/org/onosproject/openflow/OpenflowSwitchDriverAdapter.java
@@ -15,11 +15,11 @@
*/
package org.onosproject.openflow;
-import org.jboss.netty.channel.Channel;
import org.onosproject.net.Device;
import org.onosproject.net.driver.DriverData;
import org.onosproject.net.driver.DriverHandler;
import org.onosproject.openflow.controller.Dpid;
+import org.onosproject.openflow.controller.OpenFlowSession;
import org.onosproject.openflow.controller.RoleState;
import org.onosproject.openflow.controller.driver.OpenFlowAgent;
import org.onosproject.openflow.controller.driver.OpenFlowSwitchDriver;
@@ -147,7 +147,7 @@
}
@Override
- public void setChannel(Channel channel) {
+ public void setChannel(OpenFlowSession channel) { // intentionally empty: this adapter ignores the session
}
diff --git a/protocols/openflow/ctl/src/test/java/org/onosproject/openflow/controller/impl/OFMessageDecoderTest.java b/protocols/openflow/ctl/src/test/java/org/onosproject/openflow/controller/impl/OFMessageDecoderTest.java
index 6c109ae..f2617cd 100644
--- a/protocols/openflow/ctl/src/test/java/org/onosproject/openflow/controller/impl/OFMessageDecoderTest.java
+++ b/protocols/openflow/ctl/src/test/java/org/onosproject/openflow/controller/impl/OFMessageDecoderTest.java
@@ -16,38 +16,47 @@
package org.onosproject.openflow.controller.impl;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBuffers;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
-import org.onosproject.core.netty.ChannelAdapter;
+import org.onosproject.openflow.ChannelAdapter;
import org.onosproject.openflow.ChannelHandlerContextAdapter;
import org.projectfloodlight.openflow.protocol.OFHello;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.Channel;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.instanceOf;
-import static org.hamcrest.Matchers.notNullValue;
-import static org.hamcrest.Matchers.nullValue;
+import static org.hamcrest.Matchers.is;
+import java.util.ArrayList;
+import java.util.List;
/**
* Tests for the OpenFlow message decoder.
*/
public class OFMessageDecoderTest {
- static class ConnectedChannel extends ChannelAdapter {
- @Override
- public boolean isConnected() {
- return true;
- }
- }
+ private ByteBuf buf;
- private ChannelBuffer getHelloMessageBuffer() {
+ private ByteBuf getHelloMessageBuffer() {
// OFHello, OF version 1, xid of 0, total of 8 bytes
byte[] messageData = {0x1, 0x0, 0x0, 0x8, 0x0, 0x0, 0x0, 0x0};
- ChannelBuffer channelBuffer = ChannelBuffers.dynamicBuffer();
- channelBuffer.writeBytes(messageData);
- return channelBuffer;
+ buf.writeBytes(messageData); // buf is the per-test buffer allocated in setUp()
+ return buf; // same instance as the field; tearDown() releases it
}
+ @Before
+ public void setUp() {
+ buf = ByteBufAllocator.DEFAULT.buffer(); // fresh buffer before each test; released in tearDown()
+ }
+
+ @After
+ public void tearDown() {
+ buf.release(); // releases the buffer allocated in setUp()
+ }
+
+
/**
* Tests decoding a message on a closed channel.
*
@@ -55,13 +64,13 @@
*/
@Test
public void testDecodeNoChannel() throws Exception {
- OFMessageDecoder decoder = new OFMessageDecoder();
- ChannelBuffer channelBuffer = getHelloMessageBuffer();
- Object message =
- decoder.decode(new ChannelHandlerContextAdapter(),
- new ChannelAdapter(),
- channelBuffer);
- assertThat(message, nullValue());
+ OFMessageDecoder decoder = OFMessageDecoder.getInstance();
+ ByteBuf channelBuffer = getHelloMessageBuffer();
+ List<Object> out = new ArrayList<>(); // output list handed to decode(); inspected below
+ decoder.decode(new ChannelHandlerContextAdapter(), // its channel() reports isActive() == false
+ channelBuffer,
+ out);
+ assertThat(out.size(), is(0)); // nothing is expected to be decoded for the inactive channel
}
/**
@@ -71,14 +80,29 @@
*/
@Test
public void testDecode() throws Exception {
- OFMessageDecoder decoder = new OFMessageDecoder();
- ChannelBuffer channelBuffer = getHelloMessageBuffer();
- Object message =
- decoder.decode(new ChannelHandlerContextAdapter(),
- new ConnectedChannel(),
- channelBuffer);
- assertThat(message, notNullValue());
- assertThat(message, instanceOf(OFHello.class));
+ OFMessageDecoder decoder = OFMessageDecoder.getInstance();
+ ByteBuf channelBuffer = getHelloMessageBuffer();
+ List<Object> out = new ArrayList<>(); // output list handed to decode(); inspected below
+ decoder.decode(new ActiveChannelHandlerContextAdapter(), // its channel() reports isActive() == true
+ channelBuffer,
+ out);
+ assertThat(out.size(), is(1)); // exactly one message is expected from the 8-byte hello
+ assertThat(out.get(0), instanceOf(OFHello.class));
+ }
+
+ public static class ActiveChannelHandlerContextAdapter
+ extends ChannelHandlerContextAdapter {
+ // Handler context whose channel() returns a stub channel reporting isActive() == true.
+ @Override
+ public Channel channel() {
+ return new ChannelAdapter() {
+ @Override
+ public boolean isActive() {
+ return true;
+ }
+ };
+ }
+
}
}
diff --git a/protocols/openflow/ctl/src/test/java/org/onosproject/openflow/controller/impl/OFMessageEncoderTest.java b/protocols/openflow/ctl/src/test/java/org/onosproject/openflow/controller/impl/OFMessageEncoderTest.java
index e62345c..636c568 100644
--- a/protocols/openflow/ctl/src/test/java/org/onosproject/openflow/controller/impl/OFMessageEncoderTest.java
+++ b/protocols/openflow/ctl/src/test/java/org/onosproject/openflow/controller/impl/OFMessageEncoderTest.java
@@ -16,26 +16,27 @@
package org.onosproject.openflow.controller.impl;
import java.nio.charset.StandardCharsets;
-import java.util.List;
+import java.util.Collections;
import io.netty.buffer.ByteBuf;
-import org.jboss.netty.buffer.ChannelBuffer;
+import io.netty.buffer.ByteBufAllocator;
+
+import org.hamcrest.Matchers;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
import org.onosproject.openflow.OfMessageAdapter;
-import org.projectfloodlight.openflow.protocol.OFMessage;
import org.projectfloodlight.openflow.protocol.OFType;
-import com.google.common.collect.ImmutableList;
-
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.notNullValue;
/**
* Tests for the OpenFlow message encoder.
*/
public class OFMessageEncoderTest {
+ private ByteBuf buf;
static class MockOfMessage extends OfMessageAdapter {
static int nextId = 1;
final int id;
@@ -52,40 +53,26 @@
}
}
- /**
- * Tests that encoding a non-list returns the object specified.
- *
- * @throws Exception on exception in the encoder
- */
- @Test
- public void testNoList() throws Exception {
- OFMessageEncoder encoder = new OFMessageEncoder();
- MockOfMessage message = new MockOfMessage();
- OFMessage returnedMessage =
- (OFMessage) encoder.encode(null, null, message);
- assertThat(message, is(returnedMessage));
+ @Before
+ public void setUp() {
+ buf = ByteBufAllocator.DEFAULT.buffer(); // fresh buffer before each test; released in tearDown()
}
- /**
- * Tests that encoding a list returns the proper encoded payload.
- *
- * @throws Exception on exception in the encoder
- */
+ @After
+ public void tearDown() {
+ buf.release(); // releases the buffer allocated in setUp()
+ }
+
@Test
- public void testList() throws Exception {
- OFMessageEncoder encoder = new OFMessageEncoder();
+ public void testEncode() throws Exception {
+ OFMessageEncoder encoder = OFMessageEncoder.getInstance();
MockOfMessage message1 = new MockOfMessage();
- MockOfMessage message2 = new MockOfMessage();
- MockOfMessage message3 = new MockOfMessage();
- List<MockOfMessage> messages = ImmutableList.of(message1, message2, message3);
- ChannelBuffer returnedChannel =
- (ChannelBuffer) encoder.encode(null, null, messages);
- assertThat(returnedChannel, notNullValue());
- byte[] channelBytes = returnedChannel.array();
- String expectedListMessage = "message1 message2 message3 ";
- String listMessage =
- (new String(channelBytes, StandardCharsets.UTF_8))
- .substring(0, expectedListMessage.length());
- assertThat(listMessage, is(expectedListMessage));
+ encoder.encode(null, Collections.singletonList(message1), buf);
+ // buf must now hold exactly the UTF-8 bytes of "message1 ".
+ assertThat(buf.isReadable(), Matchers.is(true));
+ byte[] channelBytes = new byte[buf.readableBytes()];
+ buf.readBytes(channelBytes);
+ String expectedListMessage = "message1 ";
+ assertThat(channelBytes, is(expectedListMessage.getBytes(StandardCharsets.UTF_8)));
}
}