Improved message decoding performance
diff --git a/utils/netty/src/main/java/org/onlab/netty/AsyncResponse.java b/utils/netty/src/main/java/org/onlab/netty/AsyncResponse.java
index b2b490e..f4024a4 100644
--- a/utils/netty/src/main/java/org/onlab/netty/AsyncResponse.java
+++ b/utils/netty/src/main/java/org/onlab/netty/AsyncResponse.java
@@ -17,8 +17,8 @@
private final long start = System.nanoTime();
@Override
- public T get(long timeout, TimeUnit tu) throws TimeoutException {
- timeout = tu.toNanos(timeout);
+ public T get(long timeout, TimeUnit timeUnit) throws TimeoutException {
+ timeout = timeUnit.toNanos(timeout);
boolean interrupted = false;
try {
synchronized (this) {
diff --git a/utils/netty/src/main/java/org/onlab/netty/EchoHandler.java b/utils/netty/src/main/java/org/onlab/netty/EchoHandler.java
index ddfeeb6..6ba4bdf 100644
--- a/utils/netty/src/main/java/org/onlab/netty/EchoHandler.java
+++ b/utils/netty/src/main/java/org/onlab/netty/EchoHandler.java
@@ -2,14 +2,19 @@
import java.io.IOException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
* Message handler that echos the message back to the sender.
*/
public class EchoHandler implements MessageHandler {
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
@Override
public void handle(Message message) throws IOException {
- System.out.println("Received message. Echoing it back to the sender.");
+ log.info("Received message. Echoing it back to the sender.");
message.respond(message.payload());
}
}
diff --git a/utils/netty/src/main/java/org/onlab/netty/Endpoint.java b/utils/netty/src/main/java/org/onlab/netty/Endpoint.java
index 1482db4..8ed86ae 100644
--- a/utils/netty/src/main/java/org/onlab/netty/Endpoint.java
+++ b/utils/netty/src/main/java/org/onlab/netty/Endpoint.java
@@ -11,6 +11,7 @@
/**
* Used for serialization.
*/
+ @SuppressWarnings("unused")
private Endpoint() {
port = 0;
host = null;
@@ -67,4 +68,4 @@
}
return true;
}
-}
+}
\ No newline at end of file
diff --git a/utils/netty/src/main/java/org/onlab/netty/InternalMessage.java b/utils/netty/src/main/java/org/onlab/netty/InternalMessage.java
index 96cbe79..e6c027e 100644
--- a/utils/netty/src/main/java/org/onlab/netty/InternalMessage.java
+++ b/utils/netty/src/main/java/org/onlab/netty/InternalMessage.java
@@ -86,4 +86,4 @@
return message;
}
}
-}
+}
\ No newline at end of file
diff --git a/utils/netty/src/main/java/org/onlab/netty/KryoSerializer.java b/utils/netty/src/main/java/org/onlab/netty/KryoSerializer.java
index c07d289..8a90c07 100644
--- a/utils/netty/src/main/java/org/onlab/netty/KryoSerializer.java
+++ b/utils/netty/src/main/java/org/onlab/netty/KryoSerializer.java
@@ -53,4 +53,4 @@
public void serialize(Object obj, ByteBuffer buffer) {
serializerPool.serialize(obj, buffer);
}
-}
+}
\ No newline at end of file
diff --git a/utils/netty/src/main/java/org/onlab/netty/LoggingHandler.java b/utils/netty/src/main/java/org/onlab/netty/LoggingHandler.java
index ed6cdb4..23c4073 100644
--- a/utils/netty/src/main/java/org/onlab/netty/LoggingHandler.java
+++ b/utils/netty/src/main/java/org/onlab/netty/LoggingHandler.java
@@ -1,12 +1,17 @@
package org.onlab.netty;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
* A MessageHandler that simply logs the information.
*/
public class LoggingHandler implements MessageHandler {
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
@Override
public void handle(Message message) {
- System.out.println("Received: " + message.payload());
+ log.info("Received message. Payload: " + message.payload());
}
}
diff --git a/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java b/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java
index 7997ef0..3ed3216 100644
--- a/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java
+++ b/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java
@@ -8,13 +8,18 @@
import java.util.Arrays;
import java.util.List;
-// TODO: Implement performance enchancements such as those described in the javadoc for ReplayingDecoder.
-public class MessageDecoder extends ReplayingDecoder<InternalMessage> {
+/**
+ * Decoder for inbound messages.
+ */
+public class MessageDecoder extends ReplayingDecoder<DecoderState> {
private final NettyMessagingService messagingService;
private final Serializer serializer;
+ private int contentLength;
+
public MessageDecoder(NettyMessagingService messagingService, Serializer serializer) {
+ super(DecoderState.READ_HEADER_VERSION);
this.messagingService = messagingService;
this.serializer = serializer;
}
@@ -25,16 +30,31 @@
ByteBuf buffer,
List<Object> out) throws Exception {
- byte[] preamble = new byte[MessageEncoder.PREAMBLE.length];
- buffer.readBytes(preamble);
- checkState(Arrays.equals(MessageEncoder.PREAMBLE, preamble), "Message has wrong preamble");
-
- int bodySize = buffer.readInt();
- byte[] body = new byte[bodySize];
- buffer.readBytes(body);
-
- InternalMessage message = serializer.decode(body);
- message.setMessagingService(messagingService);
- out.add(message);
+ switch(state()) {
+ case READ_HEADER_VERSION:
+ int headerVersion = buffer.readInt();
+ checkState(headerVersion == MessageEncoder.HEADER_VERSION, "Unexpected header version");
+ checkpoint(DecoderState.READ_PREAMBLE);
+ case READ_PREAMBLE:
+ byte[] preamble = new byte[MessageEncoder.PREAMBLE.length];
+ buffer.readBytes(preamble);
+ checkState(Arrays.equals(MessageEncoder.PREAMBLE, preamble), "Message has wrong preamble");
+ checkpoint(DecoderState.READ_CONTENT_LENGTH);
+ case READ_CONTENT_LENGTH:
+ contentLength = buffer.readInt();
+ checkpoint(DecoderState.READ_SERIALIZER_VERSION);
+ case READ_SERIALIZER_VERSION:
+ int serializerVersion = buffer.readInt();
+ checkState(serializerVersion == MessageEncoder.SERIALIZER_VERSION, "Unexpected serializer version");
+ checkpoint(DecoderState.READ_CONTENT);
+ case READ_CONTENT:
+ InternalMessage message = serializer.deserialize(buffer.readBytes(contentLength).nioBuffer());
+ message.setMessagingService(messagingService);
+ out.add(message);
+ checkpoint(DecoderState.READ_HEADER_VERSION);
+ break;
+ default:
+ checkState(false, "Must not be here");
+ }
}
}
diff --git a/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java b/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java
index 63c1dbc..ad88b62 100644
--- a/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java
+++ b/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java
@@ -11,6 +11,9 @@
// onosiscool in ascii
public static final byte[] PREAMBLE = "onosiscool".getBytes();
+ public static final int HEADER_VERSION = 1;
+ public static final int SERIALIZER_VERSION = 1;
+
private final Serializer serializer;
@@ -24,6 +27,9 @@
InternalMessage message,
ByteBuf out) throws Exception {
+ // write version
+ out.writeInt(HEADER_VERSION);
+
// write preamble
out.writeBytes(PREAMBLE);
@@ -32,6 +38,9 @@
// write payload length
out.writeInt(payload.length);
+ // write serializer version
+ out.writeInt(SERIALIZER_VERSION);
+
// write payload.
out.writeBytes(payload);
}
diff --git a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
index 4a755cc..8a609bd 100644
--- a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
+++ b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
@@ -134,6 +134,10 @@
handlers.remove(type);
}
+ public void setSerializer(Serializer serializer) {
+ this.serializer = serializer;
+ }
+
private MessageHandler getMessageHandler(String type) {
return handlers.get(type);
}
diff --git a/utils/netty/src/main/java/org/onlab/netty/SimpleServer.java b/utils/netty/src/main/java/org/onlab/netty/SimpleServer.java
index 12fa025..6a93149 100644
--- a/utils/netty/src/main/java/org/onlab/netty/SimpleServer.java
+++ b/utils/netty/src/main/java/org/onlab/netty/SimpleServer.java
@@ -4,16 +4,10 @@
private SimpleServer() {}
public static void main(String... args) throws Exception {
- NettyMessagingService server = new TestNettyMessagingService();
+ NettyMessagingService server = new NettyMessagingService(8080);
server.activate();
+ server.setSerializer(new KryoSerializer());
server.registerHandler("simple", new LoggingHandler());
server.registerHandler("echo", new EchoHandler());
}
-
- public static class TestNettyMessagingService extends NettyMessagingService {
- protected TestNettyMessagingService() {
- Serializer serializer = new KryoSerializer();
- this.serializer = serializer;
- }
- }
}