Fixed issue with recieving side not checking readability of channel before reading
diff --git a/utils/netty/src/main/java/org/onlab/netty/EchoHandler.java b/utils/netty/src/main/java/org/onlab/netty/EchoHandler.java
index 313a448..ddfeeb6 100644
--- a/utils/netty/src/main/java/org/onlab/netty/EchoHandler.java
+++ b/utils/netty/src/main/java/org/onlab/netty/EchoHandler.java
@@ -9,7 +9,7 @@
@Override
public void handle(Message message) throws IOException {
- System.out.println("Received: " + message.payload() + ". Echoing it back to the sender.");
+ System.out.println("Received message. Echoing it back to the sender.");
message.respond(message.payload());
}
}
diff --git a/utils/netty/src/main/java/org/onlab/netty/Endpoint.java b/utils/netty/src/main/java/org/onlab/netty/Endpoint.java
index 8681093..1482db4 100644
--- a/utils/netty/src/main/java/org/onlab/netty/Endpoint.java
+++ b/utils/netty/src/main/java/org/onlab/netty/Endpoint.java
@@ -8,6 +8,14 @@
private final int port;
private final String host;
+ /**
+ * Used for serialization.
+ */
+ private Endpoint() {
+ port = 0;
+ host = null;
+ }
+
public Endpoint(String host, int port) {
this.host = host;
this.port = port;
diff --git a/utils/netty/src/main/java/org/onlab/netty/InternalMessage.java b/utils/netty/src/main/java/org/onlab/netty/InternalMessage.java
index bcf6f52..96cbe79 100644
--- a/utils/netty/src/main/java/org/onlab/netty/InternalMessage.java
+++ b/utils/netty/src/main/java/org/onlab/netty/InternalMessage.java
@@ -35,6 +35,10 @@
return payload;
}
+ protected void setMessagingService(NettyMessagingService messagingService) {
+ this.messagingService = messagingService;
+ }
+
@Override
public void respond(Object data) throws IOException {
Builder builder = new Builder(messagingService);
diff --git a/utils/netty/src/main/java/org/onlab/netty/KryoSerializer.java b/utils/netty/src/main/java/org/onlab/netty/KryoSerializer.java
index 73c01a0..c07d289 100644
--- a/utils/netty/src/main/java/org/onlab/netty/KryoSerializer.java
+++ b/utils/netty/src/main/java/org/onlab/netty/KryoSerializer.java
@@ -1,9 +1,8 @@
package org.onlab.netty;
import org.onlab.util.KryoPool;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
@@ -12,8 +11,6 @@
*/
public class KryoSerializer implements Serializer {
- private final Logger log = LoggerFactory.getLogger(getClass());
-
private KryoPool serializerPool;
public KryoSerializer() {
@@ -28,7 +25,9 @@
serializerPool = KryoPool.newBuilder()
.register(ArrayList.class,
HashMap.class,
- ArrayList.class
+ ArrayList.class,
+ InternalMessage.class,
+ Endpoint.class
)
.build()
.populate(1);
@@ -36,7 +35,7 @@
@Override
- public Object decode(byte[] data) {
+ public <T> T decode(byte[] data) {
return serializerPool.deserialize(data);
}
@@ -44,4 +43,14 @@
public byte[] encode(Object payload) {
return serializerPool.serialize(payload);
}
+
+ @Override
+ public <T> T deserialize(ByteBuffer buffer) {
+ return serializerPool.deserialize(buffer);
+ }
+
+ @Override
+ public void serialize(Object obj, ByteBuffer buffer) {
+ serializerPool.serialize(obj, buffer);
+ }
}
diff --git a/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java b/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java
index ecf2d62..7997ef0 100644
--- a/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java
+++ b/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java
@@ -1,18 +1,15 @@
package org.onlab.netty;
+import static com.google.common.base.Preconditions.checkState;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ReplayingDecoder;
+
import java.util.Arrays;
import java.util.List;
-import static com.google.common.base.Preconditions.checkState;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.handler.codec.ByteToMessageDecoder;
-
-/**
- * Decode bytes into a InternalMessage.
- */
-public class MessageDecoder extends ByteToMessageDecoder {
+// TODO: Implement performance enchancements such as those described in the javadoc for ReplayingDecoder.
+public class MessageDecoder extends ReplayingDecoder<InternalMessage> {
private final NettyMessagingService messagingService;
private final Serializer serializer;
@@ -23,36 +20,21 @@
}
@Override
- protected void decode(ChannelHandlerContext context, ByteBuf in,
- List<Object> messages) throws Exception {
+ protected void decode(
+ ChannelHandlerContext context,
+ ByteBuf buffer,
+ List<Object> out) throws Exception {
- byte[] preamble = in.readBytes(MessageEncoder.PREAMBLE.length).array();
+ byte[] preamble = new byte[MessageEncoder.PREAMBLE.length];
+ buffer.readBytes(preamble);
checkState(Arrays.equals(MessageEncoder.PREAMBLE, preamble), "Message has wrong preamble");
- // read message Id.
- long id = in.readLong();
+ int bodySize = buffer.readInt();
+ byte[] body = new byte[bodySize];
+ buffer.readBytes(body);
- // read message type; first read size and then bytes.
- String type = new String(in.readBytes(in.readInt()).array());
-
- // read sender host name; first read size and then bytes.
- String host = new String(in.readBytes(in.readInt()).array());
-
- // read sender port.
- int port = in.readInt();
-
- Endpoint sender = new Endpoint(host, port);
-
- // read message payload; first read size and then bytes.
- Object payload = serializer.decode(in.readBytes(in.readInt()).array());
-
- InternalMessage message = new InternalMessage.Builder(messagingService)
- .withId(id)
- .withSender(sender)
- .withType(type)
- .withPayload(payload)
- .build();
-
- messages.add(message);
+ InternalMessage message = serializer.decode(body);
+ message.setMessagingService(messagingService);
+ out.add(message);
}
}
diff --git a/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java b/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java
index 1b52a0f..63c1dbc 100644
--- a/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java
+++ b/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java
@@ -19,42 +19,20 @@
}
@Override
- protected void encode(ChannelHandlerContext context, InternalMessage message,
+ protected void encode(
+ ChannelHandlerContext context,
+ InternalMessage message,
ByteBuf out) throws Exception {
// write preamble
out.writeBytes(PREAMBLE);
- // write id
- out.writeLong(message.id());
+ byte[] payload = serializer.encode(message);
- // write type length
- out.writeInt(message.type().length());
-
- // write type
- out.writeBytes(message.type().getBytes());
-
- // write sender host name size
- out.writeInt(message.sender().host().length());
-
- // write sender host name.
- out.writeBytes(message.sender().host().getBytes());
-
- // write port
- out.writeInt(message.sender().port());
-
- try {
- serializer.encode(message.payload());
- } catch (Exception e) {
- e.printStackTrace();
- }
-
- byte[] payload = serializer.encode(message.payload());
-
- // write payload length.
+ // write payload length
out.writeInt(payload.length);
- // write payload bytes
+ // write payload.
out.writeBytes(payload);
}
}
diff --git a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
index 54da8cc..4a755cc 100644
--- a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
+++ b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
@@ -22,7 +22,6 @@
import io.netty.channel.socket.nio.NioSocketChannel;
import org.apache.commons.lang.math.RandomUtils;
-import org.apache.commons.pool.KeyedObjectPool;
import org.apache.commons.pool.KeyedPoolableObjectFactory;
import org.apache.commons.pool.impl.GenericKeyedObjectPool;
import org.slf4j.Logger;
@@ -38,8 +37,8 @@
private final Logger log = LoggerFactory.getLogger(getClass());
- private KeyedObjectPool<Endpoint, Channel> channels =
- new GenericKeyedObjectPool<Endpoint, Channel>(new OnosCommunicationChannelFactory());
+ private GenericKeyedObjectPool<Endpoint, Channel> channels;
+
private final int port;
private final EventLoopGroup bossGroup = new NioEventLoopGroup();
private final EventLoopGroup workerGroup = new NioEventLoopGroup();
@@ -66,6 +65,9 @@
}
public void activate() throws Exception {
+ channels = new GenericKeyedObjectPool<Endpoint, Channel>(new OnosCommunicationChannelFactory());
+ channels.setTestOnBorrow(true);
+ channels.setTestOnReturn(true);
responseFutures = CacheBuilder.newBuilder()
.maximumSize(100000)
.weakValues()
@@ -95,17 +97,14 @@
protected void sendAsync(Endpoint ep, InternalMessage message) throws IOException {
Channel channel = null;
try {
- channel = channels.borrowObject(ep);
- channel.eventLoop().execute(new WriteTask(channel, message));
+ try {
+ channel = channels.borrowObject(ep);
+ channel.eventLoop().execute(new WriteTask(channel, message));
+ } finally {
+ channels.returnObject(ep, channel);
+ }
} catch (Exception e) {
throw new IOException(e);
- } finally {
- try {
- channels.returnObject(ep, channel);
- } catch (Exception e) {
- log.warn("Error returning object back to the pool", e);
- // ignored.
- }
}
}
@@ -141,6 +140,8 @@
private void startAcceptingConnections() throws InterruptedException {
ServerBootstrap b = new ServerBootstrap();
+ b.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024);
+ b.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 32 * 1024);
b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
@@ -169,6 +170,8 @@
public Channel makeObject(Endpoint ep) throws Exception {
Bootstrap b = new Bootstrap();
b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
+ b.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024);
+ b.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 32 * 1024);
b.group(workerGroup);
// TODO: Make this faster:
// http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html#37.0
@@ -197,20 +200,20 @@
@Override
protected void initChannel(SocketChannel channel) throws Exception {
channel.pipeline()
- .addLast(new MessageEncoder(serializer))
- .addLast(new MessageDecoder(NettyMessagingService.this, serializer))
- .addLast(new NettyMessagingService.InboundMessageDispatcher());
+ .addLast("encoder", new MessageEncoder(serializer))
+ .addLast("decoder", new MessageDecoder(NettyMessagingService.this, serializer))
+ .addLast("handler", new InboundMessageDispatcher());
}
}
private class WriteTask implements Runnable {
- private final Object message;
+ private final InternalMessage message;
private final Channel channel;
- public WriteTask(Channel channel, Object message) {
- this.message = message;
+ public WriteTask(Channel channel, InternalMessage message) {
this.channel = channel;
+ this.message = message;
}
@Override
@@ -240,5 +243,11 @@
MessageHandler handler = NettyMessagingService.this.getMessageHandler(type);
handler.handle(message);
}
+
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext context, Throwable cause) {
+ context.close();
+ }
}
}
diff --git a/utils/netty/src/main/java/org/onlab/netty/Serializer.java b/utils/netty/src/main/java/org/onlab/netty/Serializer.java
index ac55f5a..56494b2 100644
--- a/utils/netty/src/main/java/org/onlab/netty/Serializer.java
+++ b/utils/netty/src/main/java/org/onlab/netty/Serializer.java
@@ -1,5 +1,7 @@
package org.onlab.netty;
+import java.nio.ByteBuffer;
+
/**
* Interface for encoding/decoding message payloads.
*/
@@ -11,7 +13,7 @@
* @param data byte array.
* @return POJO
*/
- Object decode(byte[] data);
+ public <T> T decode(byte[] data);
/**
* Encodes the specified POJO into a byte array.
@@ -19,6 +21,23 @@
* @param data POJO to be encoded
* @return byte array.
*/
- byte[] encode(Object message);
+ public byte[] encode(Object data);
+ /**
+ * Serializes the specified object into bytes using one of the
+ * pre-registered serializers.
+ *
+ * @param obj object to be serialized
+ * @param buffer to write serialized bytes
+ */
+ public void serialize(final Object obj, ByteBuffer buffer);
+
+ /**
+ * Deserializes the specified bytes into an object using one of the
+ * pre-registered serializers.
+ *
+ * @param buffer bytes to be deserialized
+ * @return deserialized object
+ */
+ public <T> T deserialize(final ByteBuffer buffer);
}