[ONOS-6868] Improve Netty message encoder/decoder performance

Change-Id: I6b4e2490fecb15bb20d9a8bb19fede3b53327bc1
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
index d9ed9c3..f77fa69 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
@@ -90,7 +90,6 @@
 import org.onosproject.store.cluster.messaging.Endpoint;
 import org.onosproject.store.cluster.messaging.MessagingException;
 import org.onosproject.store.cluster.messaging.MessagingService;
-import org.onosproject.store.cluster.messaging.impl.InternalMessage.Status;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -132,7 +131,7 @@
     private Endpoint localEndpoint;
     private int preamble;
     private final AtomicBoolean started = new AtomicBoolean(false);
-    private final Map<String, BiConsumer<InternalMessage, ServerConnection>> handlers = new ConcurrentHashMap<>();
+    private final Map<String, BiConsumer<InternalRequest, ServerConnection>> handlers = new ConcurrentHashMap<>();
     private final Map<Channel, RemoteClientConnection> clientConnections = Maps.newConcurrentMap();
     private final Map<Channel, RemoteServerConnection> serverConnections = Maps.newConcurrentMap();
     private final AtomicLong messageIdGenerator = new AtomicLong(0);
@@ -296,7 +295,7 @@
     @Override
     public CompletableFuture<Void> sendAsync(Endpoint ep, String type, byte[] payload) {
         checkPermission(CLUSTER_WRITE);
-        InternalMessage message = new InternalMessage(preamble,
+        InternalRequest message = new InternalRequest(preamble,
                 clockService.timeNow(),
                 messageIdGenerator.incrementAndGet(),
                 localEndpoint,
@@ -315,7 +314,7 @@
     public CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload, Executor executor) {
         checkPermission(CLUSTER_WRITE);
         Long messageId = messageIdGenerator.incrementAndGet();
-        InternalMessage message = new InternalMessage(preamble,
+        InternalRequest message = new InternalRequest(preamble,
                 clockService.timeNow(),
                 messageId,
                 localEndpoint,
@@ -444,12 +443,12 @@
         checkPermission(CLUSTER_WRITE);
         handlers.put(type, (message, connection) -> executor.execute(() -> {
             byte[] responsePayload = null;
-            Status status = Status.OK;
+            InternalReply.Status status = InternalReply.Status.OK;
             try {
                 responsePayload = handler.apply(message.sender(), message.payload());
             } catch (Exception e) {
                 log.debug("An error occurred in a message handler: {}", e);
-                status = Status.ERROR_HANDLER_EXCEPTION;
+                status = InternalReply.Status.ERROR_HANDLER_EXCEPTION;
             }
             connection.reply(message, status, Optional.ofNullable(responsePayload));
         }));
@@ -460,12 +459,12 @@
         checkPermission(CLUSTER_WRITE);
         handlers.put(type, (message, connection) -> {
             handler.apply(message.sender(), message.payload()).whenComplete((result, error) -> {
-                Status status;
+                InternalReply.Status status;
                 if (error == null) {
-                    status = Status.OK;
+                    status = InternalReply.Status.OK;
                 } else {
                     log.debug("An error occurred in a message handler: {}", error);
-                    status = Status.ERROR_HANDLER_EXCEPTION;
+                    status = InternalReply.Status.ERROR_HANDLER_EXCEPTION;
                 }
                 connection.reply(message, status, Optional.ofNullable(result));
             });
@@ -549,7 +548,6 @@
      */
     private class SslServerCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
         private final ChannelHandler dispatcher = new InboundMessageDispatcher();
-        private final ChannelHandler encoder = new MessageEncoder(preamble);
 
         @Override
         protected void initChannel(SocketChannel channel) throws Exception {
@@ -565,7 +563,7 @@
             serverSslEngine.setEnableSessionCreation(true);
 
             channel.pipeline().addLast("ssl", new io.netty.handler.ssl.SslHandler(serverSslEngine))
-                    .addLast("encoder", encoder)
+                    .addLast("encoder", new MessageEncoder(localEndpoint, preamble))
                     .addLast("decoder", new MessageDecoder())
                     .addLast("handler", dispatcher);
         }
@@ -576,7 +574,6 @@
      */
     private class SslClientCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
         private final ChannelHandler dispatcher = new InboundMessageDispatcher();
-        private final ChannelHandler encoder = new MessageEncoder(preamble);
 
         @Override
         protected void initChannel(SocketChannel channel) throws Exception {
@@ -591,7 +588,7 @@
             clientSslEngine.setEnableSessionCreation(true);
 
             channel.pipeline().addLast("ssl", new io.netty.handler.ssl.SslHandler(clientSslEngine))
-                    .addLast("encoder", encoder)
+                    .addLast("encoder", new MessageEncoder(localEndpoint, preamble))
                     .addLast("decoder", new MessageDecoder())
                     .addLast("handler", dispatcher);
         }
@@ -602,12 +599,11 @@
      */
     private class BasicChannelInitializer extends ChannelInitializer<SocketChannel> {
         private final ChannelHandler dispatcher = new InboundMessageDispatcher();
-        private final ChannelHandler encoder = new MessageEncoder(preamble);
 
         @Override
         protected void initChannel(SocketChannel channel) throws Exception {
             channel.pipeline()
-                    .addLast("encoder", encoder)
+                    .addLast("encoder", new MessageEncoder(localEndpoint, preamble))
                     .addLast("decoder", new MessageDecoder())
                     .addLast("handler", dispatcher);
         }
@@ -628,11 +624,11 @@
                 if (message.isRequest()) {
                     RemoteServerConnection connection =
                             serverConnections.computeIfAbsent(ctx.channel(), RemoteServerConnection::new);
-                    connection.dispatch(message);
+                    connection.dispatch((InternalRequest) message);
                 } else {
                     RemoteClientConnection connection =
                             clientConnections.computeIfAbsent(ctx.channel(), RemoteClientConnection::new);
-                    connection.dispatch(message);
+                    connection.dispatch((InternalReply) message);
                 }
             } catch (RejectedExecutionException e) {
                 log.warn("Unable to dispatch message due to {}", e.getMessage());
@@ -702,7 +698,7 @@
          * @param message the message to send
          * @return a completable future to be completed once the message has been sent
          */
-        CompletableFuture<Void> sendAsync(InternalMessage message);
+        CompletableFuture<Void> sendAsync(InternalRequest message);
 
         /**
          * Sends a message to the other side of the connection, awaiting a reply.
@@ -710,7 +706,7 @@
          * @param message the message to send
          * @return a completable future to be completed once a reply is received or the request times out
          */
-        CompletableFuture<byte[]> sendAndReceive(InternalMessage message);
+        CompletableFuture<byte[]> sendAndReceive(InternalRequest message);
 
         /**
          * Closes the connection.
@@ -731,7 +727,7 @@
          * @param status the reply status
          * @param payload the response payload
          */
-        void reply(InternalMessage message, Status status, Optional<byte[]> payload);
+        void reply(InternalRequest message, InternalReply.Status status, Optional<byte[]> payload);
 
         /**
          * Closes the connection.
@@ -745,8 +741,8 @@
      */
     private final class LocalClientConnection implements ClientConnection {
         @Override
-        public CompletableFuture<Void> sendAsync(InternalMessage message) {
-            BiConsumer<InternalMessage, ServerConnection> handler = handlers.get(message.type());
+        public CompletableFuture<Void> sendAsync(InternalRequest message) {
+            BiConsumer<InternalRequest, ServerConnection> handler = handlers.get(message.subject());
             if (handler != null) {
                 handler.accept(message, localServerConnection);
             } else {
@@ -756,14 +752,15 @@
         }
 
         @Override
-        public CompletableFuture<byte[]> sendAndReceive(InternalMessage message) {
+        public CompletableFuture<byte[]> sendAndReceive(InternalRequest message) {
             CompletableFuture<byte[]> future = new CompletableFuture<>();
-            BiConsumer<InternalMessage, ServerConnection> handler = handlers.get(message.type());
+            BiConsumer<InternalRequest, ServerConnection> handler = handlers.get(message.subject());
             if (handler != null) {
                 handler.accept(message, new LocalServerConnection(future));
             } else {
-                log.debug("No handler for message type {} from {}", message.type(), message.sender());
-                new LocalServerConnection(future).reply(message, Status.ERROR_NO_HANDLER, Optional.empty());
+                log.debug("No handler for message type {} from {}", message.subject(), message.sender());
+                new LocalServerConnection(future)
+                        .reply(message, InternalReply.Status.ERROR_NO_HANDLER, Optional.empty());
             }
             return future;
         }
@@ -780,15 +777,15 @@
         }
 
         @Override
-        public void reply(InternalMessage message, Status status, Optional<byte[]> payload) {
+        public void reply(InternalRequest message, InternalReply.Status status, Optional<byte[]> payload) {
             if (future != null) {
-                if (status == Status.OK) {
+                if (status == InternalReply.Status.OK) {
                     future.complete(payload.orElse(EMPTY_PAYLOAD));
-                } else if (status == Status.ERROR_NO_HANDLER) {
+                } else if (status == InternalReply.Status.ERROR_NO_HANDLER) {
                     future.completeExceptionally(new MessagingException.NoRemoteHandler());
-                } else if (status == Status.ERROR_HANDLER_EXCEPTION) {
+                } else if (status == InternalReply.Status.ERROR_HANDLER_EXCEPTION) {
                     future.completeExceptionally(new MessagingException.RemoteHandlerFailure());
-                } else if (status == Status.PROTOCOL_EXCEPTION) {
+                } else if (status == InternalReply.Status.PROTOCOL_EXCEPTION) {
                     future.completeExceptionally(new MessagingException.ProtocolException());
                 }
             }
@@ -844,7 +841,7 @@
         }
 
         @Override
-        public CompletableFuture<Void> sendAsync(InternalMessage message) {
+        public CompletableFuture<Void> sendAsync(InternalRequest message) {
             CompletableFuture<Void> future = new CompletableFuture<>();
             channel.writeAndFlush(message).addListener(channelFuture -> {
                 if (!channelFuture.isSuccess()) {
@@ -857,9 +854,9 @@
         }
 
         @Override
-        public CompletableFuture<byte[]> sendAndReceive(InternalMessage message) {
+        public CompletableFuture<byte[]> sendAndReceive(InternalRequest message) {
             CompletableFuture<byte[]> future = new CompletableFuture<>();
-            Callback callback = new Callback(message.type(), future);
+            Callback callback = new Callback(message.subject(), future);
             futures.put(message.id(), callback);
             channel.writeAndFlush(message).addListener(channelFuture -> {
                 if (!channelFuture.isSuccess()) {
@@ -875,9 +872,9 @@
          *
          * @param message the message to dispatch
          */
-        private void dispatch(InternalMessage message) {
+        private void dispatch(InternalReply message) {
             if (message.preamble() != preamble) {
-                log.debug("Received {} with invalid preamble from {}", message.type(), message.sender());
+                log.debug("Received {} with invalid preamble", message.type());
                 return;
             }
 
@@ -885,13 +882,13 @@
 
             Callback callback = futures.remove(message.id());
             if (callback != null) {
-                if (message.status() == Status.OK) {
+                if (message.status() == InternalReply.Status.OK) {
                     callback.complete(message.payload());
-                } else if (message.status() == Status.ERROR_NO_HANDLER) {
+                } else if (message.status() == InternalReply.Status.ERROR_NO_HANDLER) {
                     callback.completeExceptionally(new MessagingException.NoRemoteHandler());
-                } else if (message.status() == Status.ERROR_HANDLER_EXCEPTION) {
+                } else if (message.status() == InternalReply.Status.ERROR_HANDLER_EXCEPTION) {
                     callback.completeExceptionally(new MessagingException.RemoteHandlerFailure());
-                } else if (message.status() == Status.PROTOCOL_EXCEPTION) {
+                } else if (message.status() == InternalReply.Status.PROTOCOL_EXCEPTION) {
                     callback.completeExceptionally(new MessagingException.ProtocolException());
                 }
 
@@ -902,9 +899,9 @@
                     throw new AssertionError();
                 }
             } else {
-                log.debug("Received a reply for message id:[{}]. "
-                        + " from {}. But was unable to locate the"
-                        + " request handle", message.id(), message.sender());
+                log.debug("Received a reply for message id:[{}] "
+                        + "but was unable to locate the"
+                        + " request handle", message.id());
             }
         }
 
@@ -934,30 +931,29 @@
          *
          * @param message the message to dispatch
          */
-        private void dispatch(InternalMessage message) {
+        private void dispatch(InternalRequest message) {
             if (message.preamble() != preamble) {
-                log.debug("Received {} with invalid preamble from {}", message.type(), message.sender());
-                reply(message, Status.PROTOCOL_EXCEPTION, Optional.empty());
+                log.debug("Received {} with invalid preamble from {}", message.subject(), message.sender());
+                reply(message, InternalReply.Status.PROTOCOL_EXCEPTION, Optional.empty());
                 return;
             }
 
             clockService.recordEventTime(message.time());
 
-            BiConsumer<InternalMessage, ServerConnection> handler = handlers.get(message.type());
+            BiConsumer<InternalRequest, ServerConnection> handler = handlers.get(message.subject());
             if (handler != null) {
                 handler.accept(message, this);
             } else {
-                log.debug("No handler for message type {} from {}", message.type(), message.sender());
-                reply(message, Status.ERROR_NO_HANDLER, Optional.empty());
+                log.debug("No handler for message type {} from {}", message.subject(), message.sender());
+                reply(message, InternalReply.Status.ERROR_NO_HANDLER, Optional.empty());
             }
         }
 
         @Override
-        public void reply(InternalMessage message, Status status, Optional<byte[]> payload) {
-            InternalMessage response = new InternalMessage(preamble,
+        public void reply(InternalRequest message, InternalReply.Status status, Optional<byte[]> payload) {
+            InternalReply response = new InternalReply(preamble,
                     clockService.timeNow(),
                     message.id(),
-                    localEndpoint,
                     payload.orElse(EMPTY_PAYLOAD),
                     status);
             channel.writeAndFlush(response);