[ONOS-6868] Improve Netty message encoder/decoder performance
Change-Id: I6b4e2490fecb15bb20d9a8bb19fede3b53327bc1
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
index d9ed9c3..f77fa69 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
@@ -90,7 +90,6 @@
import org.onosproject.store.cluster.messaging.Endpoint;
import org.onosproject.store.cluster.messaging.MessagingException;
import org.onosproject.store.cluster.messaging.MessagingService;
-import org.onosproject.store.cluster.messaging.impl.InternalMessage.Status;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -132,7 +131,7 @@
private Endpoint localEndpoint;
private int preamble;
private final AtomicBoolean started = new AtomicBoolean(false);
- private final Map<String, BiConsumer<InternalMessage, ServerConnection>> handlers = new ConcurrentHashMap<>();
+ private final Map<String, BiConsumer<InternalRequest, ServerConnection>> handlers = new ConcurrentHashMap<>();
private final Map<Channel, RemoteClientConnection> clientConnections = Maps.newConcurrentMap();
private final Map<Channel, RemoteServerConnection> serverConnections = Maps.newConcurrentMap();
private final AtomicLong messageIdGenerator = new AtomicLong(0);
@@ -296,7 +295,7 @@
@Override
public CompletableFuture<Void> sendAsync(Endpoint ep, String type, byte[] payload) {
checkPermission(CLUSTER_WRITE);
- InternalMessage message = new InternalMessage(preamble,
+ InternalRequest message = new InternalRequest(preamble,
clockService.timeNow(),
messageIdGenerator.incrementAndGet(),
localEndpoint,
@@ -315,7 +314,7 @@
public CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload, Executor executor) {
checkPermission(CLUSTER_WRITE);
Long messageId = messageIdGenerator.incrementAndGet();
- InternalMessage message = new InternalMessage(preamble,
+ InternalRequest message = new InternalRequest(preamble,
clockService.timeNow(),
messageId,
localEndpoint,
@@ -444,12 +443,12 @@
checkPermission(CLUSTER_WRITE);
handlers.put(type, (message, connection) -> executor.execute(() -> {
byte[] responsePayload = null;
- Status status = Status.OK;
+ InternalReply.Status status = InternalReply.Status.OK;
try {
responsePayload = handler.apply(message.sender(), message.payload());
} catch (Exception e) {
log.debug("An error occurred in a message handler: {}", e);
- status = Status.ERROR_HANDLER_EXCEPTION;
+ status = InternalReply.Status.ERROR_HANDLER_EXCEPTION;
}
connection.reply(message, status, Optional.ofNullable(responsePayload));
}));
@@ -460,12 +459,12 @@
checkPermission(CLUSTER_WRITE);
handlers.put(type, (message, connection) -> {
handler.apply(message.sender(), message.payload()).whenComplete((result, error) -> {
- Status status;
+ InternalReply.Status status;
if (error == null) {
- status = Status.OK;
+ status = InternalReply.Status.OK;
} else {
log.debug("An error occurred in a message handler: {}", error);
- status = Status.ERROR_HANDLER_EXCEPTION;
+ status = InternalReply.Status.ERROR_HANDLER_EXCEPTION;
}
connection.reply(message, status, Optional.ofNullable(result));
});
@@ -549,7 +548,6 @@
*/
private class SslServerCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
private final ChannelHandler dispatcher = new InboundMessageDispatcher();
- private final ChannelHandler encoder = new MessageEncoder(preamble);
@Override
protected void initChannel(SocketChannel channel) throws Exception {
@@ -565,7 +563,7 @@
serverSslEngine.setEnableSessionCreation(true);
channel.pipeline().addLast("ssl", new io.netty.handler.ssl.SslHandler(serverSslEngine))
- .addLast("encoder", encoder)
+ .addLast("encoder", new MessageEncoder(localEndpoint, preamble))
.addLast("decoder", new MessageDecoder())
.addLast("handler", dispatcher);
}
@@ -576,7 +574,6 @@
*/
private class SslClientCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
private final ChannelHandler dispatcher = new InboundMessageDispatcher();
- private final ChannelHandler encoder = new MessageEncoder(preamble);
@Override
protected void initChannel(SocketChannel channel) throws Exception {
@@ -591,7 +588,7 @@
clientSslEngine.setEnableSessionCreation(true);
channel.pipeline().addLast("ssl", new io.netty.handler.ssl.SslHandler(clientSslEngine))
- .addLast("encoder", encoder)
+ .addLast("encoder", new MessageEncoder(localEndpoint, preamble))
.addLast("decoder", new MessageDecoder())
.addLast("handler", dispatcher);
}
@@ -602,12 +599,11 @@
*/
private class BasicChannelInitializer extends ChannelInitializer<SocketChannel> {
private final ChannelHandler dispatcher = new InboundMessageDispatcher();
- private final ChannelHandler encoder = new MessageEncoder(preamble);
@Override
protected void initChannel(SocketChannel channel) throws Exception {
channel.pipeline()
- .addLast("encoder", encoder)
+ .addLast("encoder", new MessageEncoder(localEndpoint, preamble))
.addLast("decoder", new MessageDecoder())
.addLast("handler", dispatcher);
}
@@ -628,11 +624,11 @@
if (message.isRequest()) {
RemoteServerConnection connection =
serverConnections.computeIfAbsent(ctx.channel(), RemoteServerConnection::new);
- connection.dispatch(message);
+ connection.dispatch((InternalRequest) message);
} else {
RemoteClientConnection connection =
clientConnections.computeIfAbsent(ctx.channel(), RemoteClientConnection::new);
- connection.dispatch(message);
+ connection.dispatch((InternalReply) message);
}
} catch (RejectedExecutionException e) {
log.warn("Unable to dispatch message due to {}", e.getMessage());
@@ -702,7 +698,7 @@
* @param message the message to send
* @return a completable future to be completed once the message has been sent
*/
- CompletableFuture<Void> sendAsync(InternalMessage message);
+ CompletableFuture<Void> sendAsync(InternalRequest message);
/**
* Sends a message to the other side of the connection, awaiting a reply.
@@ -710,7 +706,7 @@
* @param message the message to send
* @return a completable future to be completed once a reply is received or the request times out
*/
- CompletableFuture<byte[]> sendAndReceive(InternalMessage message);
+ CompletableFuture<byte[]> sendAndReceive(InternalRequest message);
/**
* Closes the connection.
@@ -731,7 +727,7 @@
* @param status the reply status
* @param payload the response payload
*/
- void reply(InternalMessage message, Status status, Optional<byte[]> payload);
+ void reply(InternalRequest message, InternalReply.Status status, Optional<byte[]> payload);
/**
* Closes the connection.
@@ -745,8 +741,8 @@
*/
private final class LocalClientConnection implements ClientConnection {
@Override
- public CompletableFuture<Void> sendAsync(InternalMessage message) {
- BiConsumer<InternalMessage, ServerConnection> handler = handlers.get(message.type());
+ public CompletableFuture<Void> sendAsync(InternalRequest message) {
+ BiConsumer<InternalRequest, ServerConnection> handler = handlers.get(message.subject());
if (handler != null) {
handler.accept(message, localServerConnection);
} else {
@@ -756,14 +752,15 @@
}
@Override
- public CompletableFuture<byte[]> sendAndReceive(InternalMessage message) {
+ public CompletableFuture<byte[]> sendAndReceive(InternalRequest message) {
CompletableFuture<byte[]> future = new CompletableFuture<>();
- BiConsumer<InternalMessage, ServerConnection> handler = handlers.get(message.type());
+ BiConsumer<InternalRequest, ServerConnection> handler = handlers.get(message.subject());
if (handler != null) {
handler.accept(message, new LocalServerConnection(future));
} else {
- log.debug("No handler for message type {} from {}", message.type(), message.sender());
+ log.debug("No handler for message type {} from {}", message.subject(), message.sender());
- new LocalServerConnection(future).reply(message, Status.ERROR_NO_HANDLER, Optional.empty());
+ new LocalServerConnection(future)
+ .reply(message, InternalReply.Status.ERROR_NO_HANDLER, Optional.empty());
}
return future;
}
@@ -780,15 +777,15 @@
}
@Override
- public void reply(InternalMessage message, Status status, Optional<byte[]> payload) {
+ public void reply(InternalRequest message, InternalReply.Status status, Optional<byte[]> payload) {
if (future != null) {
- if (status == Status.OK) {
+ if (status == InternalReply.Status.OK) {
future.complete(payload.orElse(EMPTY_PAYLOAD));
- } else if (status == Status.ERROR_NO_HANDLER) {
+ } else if (status == InternalReply.Status.ERROR_NO_HANDLER) {
future.completeExceptionally(new MessagingException.NoRemoteHandler());
- } else if (status == Status.ERROR_HANDLER_EXCEPTION) {
+ } else if (status == InternalReply.Status.ERROR_HANDLER_EXCEPTION) {
future.completeExceptionally(new MessagingException.RemoteHandlerFailure());
- } else if (status == Status.PROTOCOL_EXCEPTION) {
+ } else if (status == InternalReply.Status.PROTOCOL_EXCEPTION) {
future.completeExceptionally(new MessagingException.ProtocolException());
}
}
@@ -844,7 +841,7 @@
}
@Override
- public CompletableFuture<Void> sendAsync(InternalMessage message) {
+ public CompletableFuture<Void> sendAsync(InternalRequest message) {
CompletableFuture<Void> future = new CompletableFuture<>();
channel.writeAndFlush(message).addListener(channelFuture -> {
if (!channelFuture.isSuccess()) {
@@ -857,9 +854,9 @@
}
@Override
- public CompletableFuture<byte[]> sendAndReceive(InternalMessage message) {
+ public CompletableFuture<byte[]> sendAndReceive(InternalRequest message) {
CompletableFuture<byte[]> future = new CompletableFuture<>();
- Callback callback = new Callback(message.type(), future);
+ Callback callback = new Callback(message.subject(), future);
futures.put(message.id(), callback);
channel.writeAndFlush(message).addListener(channelFuture -> {
if (!channelFuture.isSuccess()) {
@@ -875,9 +872,9 @@
*
* @param message the message to dispatch
*/
- private void dispatch(InternalMessage message) {
+ private void dispatch(InternalReply message) {
if (message.preamble() != preamble) {
- log.debug("Received {} with invalid preamble from {}", message.type(), message.sender());
+ log.debug("Received {} with invalid preamble", message.type());
return;
}
@@ -885,13 +882,13 @@
Callback callback = futures.remove(message.id());
if (callback != null) {
- if (message.status() == Status.OK) {
+ if (message.status() == InternalReply.Status.OK) {
callback.complete(message.payload());
- } else if (message.status() == Status.ERROR_NO_HANDLER) {
+ } else if (message.status() == InternalReply.Status.ERROR_NO_HANDLER) {
callback.completeExceptionally(new MessagingException.NoRemoteHandler());
- } else if (message.status() == Status.ERROR_HANDLER_EXCEPTION) {
+ } else if (message.status() == InternalReply.Status.ERROR_HANDLER_EXCEPTION) {
callback.completeExceptionally(new MessagingException.RemoteHandlerFailure());
- } else if (message.status() == Status.PROTOCOL_EXCEPTION) {
+ } else if (message.status() == InternalReply.Status.PROTOCOL_EXCEPTION) {
callback.completeExceptionally(new MessagingException.ProtocolException());
}
@@ -902,9 +899,9 @@
throw new AssertionError();
}
} else {
- log.debug("Received a reply for message id:[{}]. "
- + " from {}. But was unable to locate the"
- + " request handle", message.id(), message.sender());
+ log.debug("Received a reply for message id:[{}] "
+ + "but was unable to locate the"
+ + " request handle", message.id());
}
}
@@ -934,30 +931,29 @@
*
* @param message the message to dispatch
*/
- private void dispatch(InternalMessage message) {
+ private void dispatch(InternalRequest message) {
if (message.preamble() != preamble) {
log.debug("Received {} with invalid preamble from {}", message.type(), message.sender());
- reply(message, Status.PROTOCOL_EXCEPTION, Optional.empty());
+ reply(message, InternalReply.Status.PROTOCOL_EXCEPTION, Optional.empty());
return;
}
clockService.recordEventTime(message.time());
- BiConsumer<InternalMessage, ServerConnection> handler = handlers.get(message.type());
+ BiConsumer<InternalRequest, ServerConnection> handler = handlers.get(message.subject());
if (handler != null) {
handler.accept(message, this);
} else {
- log.debug("No handler for message type {} from {}", message.type(), message.sender());
+ log.debug("No handler for message type {} from {}", message.subject(), message.sender());
- reply(message, Status.ERROR_NO_HANDLER, Optional.empty());
+ reply(message, InternalReply.Status.ERROR_NO_HANDLER, Optional.empty());
}
}
@Override
- public void reply(InternalMessage message, Status status, Optional<byte[]> payload) {
- InternalMessage response = new InternalMessage(preamble,
+ public void reply(InternalRequest message, InternalReply.Status status, Optional<byte[]> payload) {
+ InternalReply response = new InternalReply(preamble,
clockService.timeNow(),
message.id(),
- localEndpoint,
payload.orElse(EMPTY_PAYLOAD),
status);
channel.writeAndFlush(response);