Simplified how message payloads get serialized/deserialized
diff --git a/utils/netty/src/main/java/org/onlab/netty/AsyncResponse.java b/utils/netty/src/main/java/org/onlab/netty/AsyncResponse.java
index f4024a4..1772a3c 100644
--- a/utils/netty/src/main/java/org/onlab/netty/AsyncResponse.java
+++ b/utils/netty/src/main/java/org/onlab/netty/AsyncResponse.java
@@ -8,16 +8,15 @@
* This class provides a base implementation of Response, with methods to retrieve the
* result and query to see if the result is ready. The result can only be retrieved when
* it is ready and the get methods will block if the result is not ready yet.
- * @param <T> type of response.
*/
-public class AsyncResponse<T> implements Response<T> {
+public class AsyncResponse implements Response {
- private T value;
+ private byte[] value;
private boolean done = false;
private final long start = System.nanoTime();
@Override
- public T get(long timeout, TimeUnit timeUnit) throws TimeoutException {
+ public byte[] get(long timeout, TimeUnit timeUnit) throws TimeoutException {
timeout = timeUnit.toNanos(timeout);
boolean interrupted = false;
try {
@@ -43,7 +42,7 @@
}
@Override
- public T get() throws InterruptedException {
+ public byte[] get() throws InterruptedException {
throw new UnsupportedOperationException();
}
@@ -57,11 +56,10 @@
* available.
* @param data response data.
*/
- @SuppressWarnings("unchecked")
- public synchronized void setResponse(Object data) {
+ public synchronized void setResponse(byte[] data) {
if (!done) {
done = true;
- value = (T) data;
+ value = data;
this.notifyAll();
}
}
diff --git a/utils/netty/src/main/java/org/onlab/netty/InternalMessage.java b/utils/netty/src/main/java/org/onlab/netty/InternalMessage.java
index 367ca91..c535f7b 100644
--- a/utils/netty/src/main/java/org/onlab/netty/InternalMessage.java
+++ b/utils/netty/src/main/java/org/onlab/netty/InternalMessage.java
@@ -13,11 +13,9 @@
private long id;
private Endpoint sender;
private String type;
- private Object payload;
-
+ private byte[] payload;
private transient NettyMessagingService messagingService;
- // TODO: add transient payload serializer or change payload type to
- // byte[], ByteBuffer, etc.
+ public static final String REPLY_MESSAGE_TYPE = "NETTY_MESSAGIG_REQUEST_REPLY";
// Must be created using the Builder.
private InternalMessage() {}
@@ -35,7 +33,7 @@
}
@Override
- public Object payload() {
+ public byte[] payload() {
return payload;
}
@@ -44,7 +42,7 @@
}
@Override
- public void respond(Object data) throws IOException {
+ public void respond(byte[] data) throws IOException {
Builder builder = new Builder(messagingService);
InternalMessage message = builder.withId(this.id)
// FIXME: Sender should be messagingService.localEp.
@@ -81,7 +79,7 @@
message.sender = sender;
return this;
}
- public Builder withPayload(Object payload) {
+ public Builder withPayload(byte[] payload) {
message.payload = payload;
return this;
}
diff --git a/utils/netty/src/main/java/org/onlab/netty/KryoSerializer.java b/utils/netty/src/main/java/org/onlab/netty/KryoSerializer.java
index 4414d05..b8efb51 100644
--- a/utils/netty/src/main/java/org/onlab/netty/KryoSerializer.java
+++ b/utils/netty/src/main/java/org/onlab/netty/KryoSerializer.java
@@ -10,7 +10,7 @@
/**
* Kryo Serializer.
*/
-public class KryoSerializer implements PayloadSerializer {
+public class KryoSerializer {
private KryoPool serializerPool;
@@ -28,29 +28,26 @@
HashMap.class,
ArrayList.class,
InternalMessage.class,
- Endpoint.class
+ Endpoint.class,
+ byte[].class
)
.build()
.populate(1);
}
- @Override
public <T> T decode(byte[] data) {
return serializerPool.deserialize(data);
}
- @Override
public byte[] encode(Object payload) {
return serializerPool.serialize(payload);
}
- @Override
public <T> T decode(ByteBuffer buffer) {
return serializerPool.deserialize(buffer);
}
- @Override
public void encode(Object obj, ByteBuffer buffer) {
serializerPool.serialize(obj, buffer);
}
diff --git a/utils/netty/src/main/java/org/onlab/netty/LoggingHandler.java b/utils/netty/src/main/java/org/onlab/netty/LoggingHandler.java
index 23c4073..366898b 100644
--- a/utils/netty/src/main/java/org/onlab/netty/LoggingHandler.java
+++ b/utils/netty/src/main/java/org/onlab/netty/LoggingHandler.java
@@ -12,6 +12,6 @@
@Override
public void handle(Message message) {
- log.info("Received message. Payload: " + message.payload());
+ log.info("Received message. Payload has {} bytes", message.payload().length);
}
}
diff --git a/utils/netty/src/main/java/org/onlab/netty/Message.java b/utils/netty/src/main/java/org/onlab/netty/Message.java
index 54b9526..87a8bb6 100644
--- a/utils/netty/src/main/java/org/onlab/netty/Message.java
+++ b/utils/netty/src/main/java/org/onlab/netty/Message.java
@@ -12,12 +12,12 @@
* Returns the payload of this message.
* @return message payload.
*/
- public Object payload();
+ public byte[] payload();
/**
- * Sends a reply back to the sender of this messge.
+ * Sends a reply back to the sender of this message.
* @param data payload of the response.
* @throws IOException if there is a communication error.
*/
- public void respond(Object data) throws IOException;
+ public void respond(byte[] data) throws IOException;
}
diff --git a/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java b/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java
index f199019..d4832e5 100644
--- a/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java
+++ b/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java
@@ -14,14 +14,14 @@
public class MessageDecoder extends ReplayingDecoder<DecoderState> {
private final NettyMessagingService messagingService;
- private final PayloadSerializer payloadSerializer;
+
+ private static final KryoSerializer SERIALIZER = new KryoSerializer();
private int contentLength;
- public MessageDecoder(NettyMessagingService messagingService, PayloadSerializer payloadSerializer) {
+ public MessageDecoder(NettyMessagingService messagingService) {
super(DecoderState.READ_HEADER_VERSION);
this.messagingService = messagingService;
- this.payloadSerializer = payloadSerializer;
}
@Override
@@ -48,7 +48,7 @@
checkState(serializerVersion == MessageEncoder.SERIALIZER_VERSION, "Unexpected serializer version");
checkpoint(DecoderState.READ_CONTENT);
case READ_CONTENT:
- InternalMessage message = payloadSerializer.decode(buffer.readBytes(contentLength).nioBuffer());
+ InternalMessage message = SERIALIZER.decode(buffer.readBytes(contentLength).nioBuffer());
message.setMessagingService(messagingService);
out.add(message);
checkpoint(DecoderState.READ_HEADER_VERSION);
diff --git a/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java b/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java
index 0ee29cb..716efb9 100644
--- a/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java
+++ b/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java
@@ -17,11 +17,7 @@
public static final int SERIALIZER_VERSION = 1;
- private final PayloadSerializer payloadSerializer;
-
- public MessageEncoder(PayloadSerializer payloadSerializer) {
- this.payloadSerializer = payloadSerializer;
- }
+ private static final KryoSerializer SERIALIZER = new KryoSerializer();
@Override
protected void encode(
@@ -35,7 +31,8 @@
// write preamble
out.writeBytes(PREAMBLE);
- byte[] payload = payloadSerializer.encode(message);
+ // Serialize exactly once; a failure propagates out of encode() instead of being printed and retried.
+ byte[] payload = SERIALIZER.encode(message);
// write payload length
out.writeInt(payload.length);
diff --git a/utils/netty/src/main/java/org/onlab/netty/MessagingService.java b/utils/netty/src/main/java/org/onlab/netty/MessagingService.java
index fece742..08676ac 100644
--- a/utils/netty/src/main/java/org/onlab/netty/MessagingService.java
+++ b/utils/netty/src/main/java/org/onlab/netty/MessagingService.java
@@ -11,10 +11,10 @@
* The message is specified using the type and payload.
* @param ep end point to send the message to.
* @param type type of message.
- * @param payload message payload.
+ * @param payload message payload bytes.
* @throws IOException
*/
- public void sendAsync(Endpoint ep, String type, Object payload) throws IOException;
+ public void sendAsync(Endpoint ep, String type, byte[] payload) throws IOException;
/**
* Sends a message synchronously and waits for a response.
@@ -24,7 +24,7 @@
* @return a response future
* @throws IOException
*/
- public <T> Response<T> sendAndReceive(Endpoint ep, String type, Object payload) throws IOException;
+ public Response sendAndReceive(Endpoint ep, String type, byte[] payload) throws IOException;
/**
* Registers a new message handler for message type.
@@ -38,12 +38,4 @@
* @param type message type
*/
public void unregisterHandler(String type);
-
- // FIXME: remove me and add PayloadSerializer to all other methods
- /**
- * Specify the serializer to use for encoding/decoding payload.
- *
- * @param payloadSerializer payloadSerializer to use
- */
- public void setPayloadSerializer(PayloadSerializer payloadSerializer);
}
diff --git a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
index 051482e..48aeb30 100644
--- a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
+++ b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
@@ -43,7 +43,7 @@
private final EventLoopGroup bossGroup = new NioEventLoopGroup();
private final EventLoopGroup workerGroup = new NioEventLoopGroup();
private final ConcurrentMap<String, MessageHandler> handlers = new ConcurrentHashMap<>();
- private final Cache<Long, AsyncResponse<?>> responseFutures = CacheBuilder.newBuilder()
+ private final Cache<Long, AsyncResponse> responseFutures = CacheBuilder.newBuilder()
.maximumSize(100000)
.weakValues()
// TODO: Once the entry expires, notify blocking threads (if any).
@@ -52,8 +52,6 @@
private final GenericKeyedObjectPool<Endpoint, Channel> channels
= new GenericKeyedObjectPool<Endpoint, Channel>(new OnosCommunicationChannelFactory());
- protected PayloadSerializer payloadSerializer;
-
public NettyMessagingService() {
// TODO: Default port should be configurable.
this(8080);
@@ -83,7 +81,7 @@
}
@Override
- public void sendAsync(Endpoint ep, String type, Object payload) throws IOException {
+ public void sendAsync(Endpoint ep, String type, byte[] payload) throws IOException {
InternalMessage message = new InternalMessage.Builder(this)
.withId(RandomUtils.nextLong())
.withSender(localEp)
@@ -108,9 +106,9 @@
}
@Override
- public <T> Response<T> sendAndReceive(Endpoint ep, String type, Object payload)
+ public Response sendAndReceive(Endpoint ep, String type, byte[] payload)
throws IOException {
- AsyncResponse<T> futureResponse = new AsyncResponse<T>();
+ AsyncResponse futureResponse = new AsyncResponse();
Long messageId = RandomUtils.nextLong();
responseFutures.put(messageId, futureResponse);
InternalMessage message = new InternalMessage.Builder(this)
@@ -133,11 +131,6 @@
handlers.remove(type);
}
- @Override
- public void setPayloadSerializer(PayloadSerializer payloadSerializer) {
- this.payloadSerializer = payloadSerializer;
- }
-
private MessageHandler getMessageHandler(String type) {
return handlers.get(type);
}
@@ -202,13 +195,13 @@
private class OnosCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
private final ChannelHandler dispatcher = new InboundMessageDispatcher();
- private final ChannelHandler encoder = new MessageEncoder(payloadSerializer);
+ private final ChannelHandler encoder = new MessageEncoder();
@Override
protected void initChannel(SocketChannel channel) throws Exception {
channel.pipeline()
.addLast("encoder", encoder)
- .addLast("decoder", new MessageDecoder(NettyMessagingService.this, payloadSerializer))
+ .addLast("decoder", new MessageDecoder(NettyMessagingService.this))
.addLast("handler", dispatcher);
}
}
@@ -237,7 +230,7 @@
String type = message.type();
if (type.equals(InternalMessage.REPLY_MESSAGE_TYPE)) {
try {
- AsyncResponse<?> futureResponse =
+ AsyncResponse futureResponse =
NettyMessagingService.this.responseFutures.getIfPresent(message.id());
if (futureResponse != null) {
futureResponse.setResponse(message.payload());
diff --git a/utils/netty/src/main/java/org/onlab/netty/PayloadSerializer.java b/utils/netty/src/main/java/org/onlab/netty/PayloadSerializer.java
deleted file mode 100644
index 9874543..0000000
--- a/utils/netty/src/main/java/org/onlab/netty/PayloadSerializer.java
+++ /dev/null
@@ -1,41 +0,0 @@
-package org.onlab.netty;
-
-import java.nio.ByteBuffer;
-
-/**
- * Interface for encoding/decoding message payloads.
- */
-public interface PayloadSerializer {
-
- /**
- * Decodes the specified byte array to a POJO.
- *
- * @param data byte array.
- * @return POJO
- */
- public <T> T decode(byte[] data);
-
- /**
- * Encodes the specified POJO into a byte array.
- *
- * @param data POJO to be encoded
- * @return byte array.
- */
- public byte[] encode(Object data);
-
- /**
- * Encodes the specified POJO into a byte buffer.
- *
- * @param data POJO to be encoded
- * @param buffer to write serialized bytes
- */
- public void encode(final Object data, ByteBuffer buffer);
-
- /**
- * Decodes the specified byte buffer to a POJO.
- *
- * @param buffer bytes to be decoded
- * @return POJO
- */
- public <T> T decode(final ByteBuffer buffer);
-}
diff --git a/utils/netty/src/main/java/org/onlab/netty/Response.java b/utils/netty/src/main/java/org/onlab/netty/Response.java
index 04675ce..150755e 100644
--- a/utils/netty/src/main/java/org/onlab/netty/Response.java
+++ b/utils/netty/src/main/java/org/onlab/netty/Response.java
@@ -7,26 +7,24 @@
* Response object returned when making synchronous requests.
- * Can you used to check is a response is ready and/or wait for a response
+ * Can be used to check if a response is ready and/or wait for a response
* to become available.
- *
- * @param <T> type of response.
*/
-public interface Response<T> {
+public interface Response {
/**
* Gets the response waiting for a designated timeout period.
* @param timeout timeout period (since request was sent out)
* @param tu unit of time.
- * @return response
+ * @return response payload
* @throws TimeoutException if the timeout expires before the response arrives.
*/
- public T get(long timeout, TimeUnit tu) throws TimeoutException;
+ public byte[] get(long timeout, TimeUnit tu) throws TimeoutException;
/**
* Gets the response waiting for indefinite timeout period.
- * @return response
+ * @return response payload
* @throws InterruptedException if the thread is interrupted before the response arrives.
*/
- public T get() throws InterruptedException;
+ public byte[] get() throws InterruptedException;
/**
* Checks if the response is ready without blocking.
diff --git a/utils/netty/src/main/java/org/onlab/netty/SimpleClient.java b/utils/netty/src/main/java/org/onlab/netty/SimpleClient.java
index 494d410..3869948 100644
--- a/utils/netty/src/main/java/org/onlab/netty/SimpleClient.java
+++ b/utils/netty/src/main/java/org/onlab/netty/SimpleClient.java
@@ -24,7 +24,7 @@
final int warmup = 100;
for (int i = 0; i < warmup; i++) {
Timer.Context context = sendAsyncTimer.time();
- messaging.sendAsync(new Endpoint("localhost", 8080), "simple", "Hello World");
+ messaging.sendAsync(new Endpoint("localhost", 8080), "simple", "Hello World".getBytes());
context.stop();
}
metrics.registerMetric(component, feature, "AsyncTimer", sendAsyncTimer);
@@ -33,10 +33,10 @@
final int iterations = 1000000;
for (int i = 0; i < iterations; i++) {
Timer.Context context = sendAndReceiveTimer.time();
- Response<String> response = messaging
+ Response response = messaging
.sendAndReceive(new Endpoint("localhost", 8080), "echo",
- "Hello World");
- System.out.println("Got back:" + response.get(2, TimeUnit.SECONDS));
+ "Hello World".getBytes());
+ System.out.println("Got back:" + new String(response.get(2, TimeUnit.SECONDS)));
context.stop();
}
metrics.registerMetric(component, feature, "AsyncTimer", sendAndReceiveTimer);
@@ -45,8 +45,6 @@
public static class TestNettyMessagingService extends NettyMessagingService {
public TestNettyMessagingService(int port) throws Exception {
super(port);
- PayloadSerializer payloadSerializer = new KryoSerializer();
- this.payloadSerializer = payloadSerializer;
}
}
}
diff --git a/utils/netty/src/main/java/org/onlab/netty/SimpleServer.java b/utils/netty/src/main/java/org/onlab/netty/SimpleServer.java
index 84984c1..b8ae5b0 100644
--- a/utils/netty/src/main/java/org/onlab/netty/SimpleServer.java
+++ b/utils/netty/src/main/java/org/onlab/netty/SimpleServer.java
@@ -7,7 +7,6 @@
public static void main(String... args) throws Exception {
NettyMessagingService server = new NettyMessagingService(8080);
server.activate();
- server.setPayloadSerializer(new KryoSerializer());
server.registerHandler("simple", new LoggingHandler());
server.registerHandler("echo", new EchoHandler());
}
diff --git a/utils/netty/src/test/java/org/onlab/netty/PingPongTest.java b/utils/netty/src/test/java/org/onlab/netty/PingPongTest.java
index 96b877e..36d2a1e 100644
--- a/utils/netty/src/test/java/org/onlab/netty/PingPongTest.java
+++ b/utils/netty/src/test/java/org/onlab/netty/PingPongTest.java
@@ -2,7 +2,8 @@
import java.util.concurrent.TimeUnit;
-import org.junit.Assert;
+import org.apache.commons.lang3.RandomUtils;
+import static org.junit.Assert.*;
import org.junit.Test;
/**
@@ -17,11 +18,10 @@
try {
pinger.activate();
ponger.activate();
- pinger.setPayloadSerializer(new KryoSerializer());
- ponger.setPayloadSerializer(new KryoSerializer());
ponger.registerHandler("echo", new EchoHandler());
- Response<String> response = pinger.sendAndReceive(new Endpoint("localhost", 9086), "echo", "hello");
- Assert.assertEquals("hello", response.get(10000, TimeUnit.MILLISECONDS));
+ byte[] payload = RandomUtils.nextBytes(100);
+ Response response = pinger.sendAndReceive(new Endpoint("localhost", 9086), "echo", payload);
+ assertArrayEquals(payload, response.get(10000, TimeUnit.MILLISECONDS));
} finally {
pinger.deactivate();
ponger.deactivate();