Limit the amont of work that happens on netty event loop threads.
Currently we are kryo serializing/deserializing the message envelope which can potentially limit throughput.
Change-Id: I0ae9dab53bbb765b7618ceaefda1edf4f77b0b59
diff --git a/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java b/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java
index 51f8dd3..1697539 100644
--- a/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java
+++ b/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java
@@ -20,9 +20,10 @@
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ReplayingDecoder;
-import java.util.Arrays;
import java.util.List;
+import org.onlab.packet.IpAddress;
+import org.onlab.packet.IpAddress.Version;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,12 +36,15 @@
private final NettyMessagingService messagingService;
- private static final KryoSerializer SERIALIZER = new KryoSerializer();
-
+ private long messageId;
+ private Version ipVersion;
+ private IpAddress senderIp;
+ private int senderPort;
private int contentLength;
+ private long messageType;
public MessageDecoder(NettyMessagingService messagingService) {
- super(DecoderState.READ_HEADER_VERSION);
+ super(DecoderState.READ_MESSAGE_ID);
this.messagingService = messagingService;
}
@@ -51,27 +55,37 @@
List<Object> out) throws Exception {
switch (state()) {
- case READ_HEADER_VERSION:
- int headerVersion = buffer.readInt();
- checkState(headerVersion == MessageEncoder.HEADER_VERSION, "Unexpected header version");
- checkpoint(DecoderState.READ_PREAMBLE);
- case READ_PREAMBLE:
- byte[] preamble = new byte[MessageEncoder.PREAMBLE.length];
- buffer.readBytes(preamble);
- checkState(Arrays.equals(MessageEncoder.PREAMBLE, preamble), "Message has wrong preamble");
+ case READ_MESSAGE_ID:
+ messageId = buffer.readLong();
+ checkpoint(DecoderState.READ_SENDER_IP_VERSION);
+ case READ_SENDER_IP_VERSION:
+ ipVersion = buffer.readByte() == 0x0 ? Version.INET : Version.INET6;
+ checkpoint(DecoderState.READ_SENDER_IP);
+ case READ_SENDER_IP:
+ byte[] octects = new byte[IpAddress.byteLength(ipVersion)];
+ buffer.readBytes(octects);
+ senderIp = IpAddress.valueOf(ipVersion, octects);
+ checkpoint(DecoderState.READ_SENDER_PORT);
+ case READ_SENDER_PORT:
+ senderPort = buffer.readInt();
+ checkpoint(DecoderState.READ_MESSAGE_TYPE);
+ case READ_MESSAGE_TYPE:
+ messageType = buffer.readLong();
checkpoint(DecoderState.READ_CONTENT_LENGTH);
case READ_CONTENT_LENGTH:
contentLength = buffer.readInt();
- checkpoint(DecoderState.READ_SERIALIZER_VERSION);
- case READ_SERIALIZER_VERSION:
- int serializerVersion = buffer.readInt();
- checkState(serializerVersion == MessageEncoder.SERIALIZER_VERSION, "Unexpected serializer version");
checkpoint(DecoderState.READ_CONTENT);
case READ_CONTENT:
- InternalMessage message = SERIALIZER.decode(buffer.readBytes(contentLength).nioBuffer());
+ byte[] payload = new byte[contentLength];
+ buffer.readBytes(payload);
+ InternalMessage message = new InternalMessage(
+ messageId,
+ new Endpoint(senderIp, senderPort),
+ messageType,
+ payload);
message.setMessagingService(messagingService);
out.add(message);
- checkpoint(DecoderState.READ_HEADER_VERSION);
+ checkpoint(DecoderState.READ_MESSAGE_ID);
break;
default:
checkState(false, "Must not be here");