Improved message decoding performance
diff --git a/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java b/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java
index 7997ef0..3ed3216 100644
--- a/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java
+++ b/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java
@@ -8,13 +8,18 @@
import java.util.Arrays;
import java.util.List;
-// TODO: Implement performance enchancements such as those described in the javadoc for ReplayingDecoder.
-public class MessageDecoder extends ReplayingDecoder<InternalMessage> {
+/**
+ * Decoder for inbound messages.
+ */
+public class MessageDecoder extends ReplayingDecoder<DecoderState> {
private final NettyMessagingService messagingService;
private final Serializer serializer;
+ private int contentLength;
+
public MessageDecoder(NettyMessagingService messagingService, Serializer serializer) {
+ super(DecoderState.READ_HEADER_VERSION);
this.messagingService = messagingService;
this.serializer = serializer;
}
@@ -25,16 +30,31 @@
ByteBuf buffer,
List<Object> out) throws Exception {
- byte[] preamble = new byte[MessageEncoder.PREAMBLE.length];
- buffer.readBytes(preamble);
- checkState(Arrays.equals(MessageEncoder.PREAMBLE, preamble), "Message has wrong preamble");
-
- int bodySize = buffer.readInt();
- byte[] body = new byte[bodySize];
- buffer.readBytes(body);
-
- InternalMessage message = serializer.decode(body);
- message.setMessagingService(messagingService);
- out.add(message);
+ switch(state()) {
+ case READ_HEADER_VERSION:
+ int headerVersion = buffer.readInt();
+ checkState(headerVersion == MessageEncoder.HEADER_VERSION, "Unexpected header version");
+ checkpoint(DecoderState.READ_PREAMBLE);
+ case READ_PREAMBLE:
+ byte[] preamble = new byte[MessageEncoder.PREAMBLE.length];
+ buffer.readBytes(preamble);
+ checkState(Arrays.equals(MessageEncoder.PREAMBLE, preamble), "Message has wrong preamble");
+ checkpoint(DecoderState.READ_CONTENT_LENGTH);
+ case READ_CONTENT_LENGTH:
+ contentLength = buffer.readInt();
+ checkpoint(DecoderState.READ_SERIALIZER_VERSION);
+ case READ_SERIALIZER_VERSION:
+ int serializerVersion = buffer.readInt();
+ checkState(serializerVersion == MessageEncoder.SERIALIZER_VERSION, "Unexpected serializer version");
+ checkpoint(DecoderState.READ_CONTENT);
+ case READ_CONTENT:
+ InternalMessage message = serializer.deserialize(buffer.readBytes(contentLength).nioBuffer());
+ message.setMessagingService(messagingService);
+ out.add(message);
+ checkpoint(DecoderState.READ_HEADER_VERSION);
+ break;
+ default:
+ checkState(false, "Must not be here");
+ }
}
}