[ONOS-6868] Improve Netty message encoder/decoder performance

Change-Id: I6b4e2490fecb15bb20d9a8bb19fede3b53327bc1
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/DecoderState.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/DecoderState.java
index 2f2bef1..b58871e 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/DecoderState.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/DecoderState.java
@@ -19,16 +19,17 @@
  * State transitions a decoder goes through as it is decoding an incoming message.
  */
 public enum DecoderState {
-    READ_MESSAGE_PREAMBLE,
+    READ_TYPE,
+    READ_PREAMBLE,
     READ_LOGICAL_TIME,
     READ_LOGICAL_COUNTER,
     READ_MESSAGE_ID,
     READ_SENDER_IP_VERSION,
     READ_SENDER_IP,
     READ_SENDER_PORT,
-    READ_MESSAGE_TYPE_LENGTH,
-    READ_MESSAGE_TYPE,
-    READ_MESSAGE_STATUS,
+    READ_SUBJECT_LENGTH,
+    READ_SUBJECT,
+    READ_STATUS,
     READ_CONTENT_LENGTH,
     READ_CONTENT
-}
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/InternalMessage.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/InternalMessage.java
index 0ab7c31..646a20d 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/InternalMessage.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/InternalMessage.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2016-present Open Networking Laboratory
+ * Copyright 2017-present Open Networking Laboratory
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -15,76 +15,47 @@
  */
 package org.onosproject.store.cluster.messaging.impl;
 
-import com.google.common.base.MoreObjects;
-
-import org.onlab.util.ByteArraySizeHashPrinter;
 import org.onosproject.core.HybridLogicalTime;
-import org.onosproject.store.cluster.messaging.Endpoint;
 
 /**
- * Internal message representation with additional attributes
- * for supporting, synchronous request/reply behavior.
+ * Base class for internal messages.
  */
-public final class InternalMessage {
+public abstract class InternalMessage {
 
     /**
-     * Message status.
+     * Internal message type.
      */
-    public enum Status {
-
-        // NOTE: For backwards compatibility enum constant IDs should not be changed.
-
-        /**
-         * All ok.
-         */
-        OK(0),
-
-        /**
-         * Response status signifying no registered handler.
-         */
-        ERROR_NO_HANDLER(1),
-
-        /**
-         * Response status signifying an exception handling the message.
-         */
-        ERROR_HANDLER_EXCEPTION(2),
-
-        /**
-         * Response status signifying invalid message structure.
-         */
-        PROTOCOL_EXCEPTION(3);
+    public enum Type {
+        REQUEST(1),
+        REPLY(2);
 
         private final int id;
 
-        Status(int id) {
+        Type(int id) {
             this.id = id;
         }
 
         /**
-         * Returns the unique status ID.
+         * Returns the unique message type ID.
          *
-         * @return the unique status ID.
+         * @return the unique message type ID.
          */
         public int id() {
             return id;
         }
 
         /**
-         * Returns the status enum associated with the given ID.
+         * Returns the message type enum associated with the given ID.
          *
-         * @param id the status ID.
-         * @return the status enum for the given ID.
+         * @param id the type ID.
+         * @return the type enum for the given ID.
          */
-        public static Status forId(int id) {
+        public static Type forId(int id) {
             switch (id) {
-                case 0:
-                    return OK;
                 case 1:
-                    return ERROR_NO_HANDLER;
+                    return REQUEST;
                 case 2:
-                    return ERROR_HANDLER_EXCEPTION;
-                case 3:
-                    return PROTOCOL_EXCEPTION;
+                    return REPLY;
                 default:
                     throw new IllegalArgumentException("Unknown status ID " + id);
             }
@@ -94,51 +65,26 @@
     private final int preamble;
     private final HybridLogicalTime time;
     private final long id;
-    private final Endpoint sender;
-    private final String type;
     private final byte[] payload;
-    private final Status status;
 
-    public InternalMessage(int preamble,
-                           HybridLogicalTime time,
-                           long id,
-                           Endpoint sender,
-                           String type,
-                           byte[] payload) {
-        this(preamble, time, id, sender, type, payload, null);
-    }
-
-    public InternalMessage(int preamble,
+    protected InternalMessage(int preamble,
             HybridLogicalTime time,
             long id,
-            Endpoint sender,
-            byte[] payload,
-            Status status) {
-        this(preamble, time, id, sender, "", payload, status);
-    }
-
-    InternalMessage(int preamble,
-                           HybridLogicalTime time,
-                           long id,
-                           Endpoint sender,
-                           String type,
-                           byte[] payload,
-                           Status status) {
+            byte[] payload) {
         this.preamble = preamble;
         this.time = time;
         this.id = id;
-        this.sender = sender;
-        this.type = type;
         this.payload = payload;
-        this.status = status;
     }
 
+    public abstract Type type();
+
     public boolean isRequest() {
-        return status == null;
+        return type() == Type.REQUEST;
     }
 
     public boolean isReply() {
-        return status != null;
+        return type() == Type.REPLY;
     }
 
     public HybridLogicalTime time() {
@@ -153,31 +99,7 @@
         return id;
     }
 
-    public String type() {
-        return type;
-    }
-
-    public Endpoint sender() {
-        return sender;
-    }
-
     public byte[] payload() {
         return payload;
     }
-
-    public Status status() {
-        return status;
-    }
-
-    @Override
-    public String toString() {
-        return MoreObjects.toStringHelper(this)
-                .add("time", time)
-                .add("id", id)
-                .add("type", type)
-                .add("sender", sender)
-                .add("status", status)
-                .add("payload", ByteArraySizeHashPrinter.of(payload))
-                .toString();
-    }
-}
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/InternalReply.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/InternalReply.java
new file mode 100644
index 0000000..2030d61
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/InternalReply.java
@@ -0,0 +1,127 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.cluster.messaging.impl;
+
+import com.google.common.base.MoreObjects;
+import org.onlab.util.ByteArraySizeHashPrinter;
+import org.onosproject.core.HybridLogicalTime;
+
+/**
+ * Internal reply message.
+ */
+public final class InternalReply extends InternalMessage {
+
+    /**
+     * Message status.
+     */
+    public enum Status {
+
+        // NOTE: For backwards compatibility enum constant IDs should not be changed.
+
+        /**
+         * All ok.
+         */
+        OK(0),
+
+        /**
+         * Response status signifying no registered handler.
+         */
+        ERROR_NO_HANDLER(1),
+
+        /**
+         * Response status signifying an exception handling the message.
+         */
+        ERROR_HANDLER_EXCEPTION(2),
+
+        /**
+         * Response status signifying invalid message structure.
+         */
+        PROTOCOL_EXCEPTION(3);
+
+        private final int id;
+
+        Status(int id) {
+            this.id = id;
+        }
+
+        /**
+         * Returns the unique status ID.
+         *
+         * @return the unique status ID.
+         */
+        public int id() {
+            return id;
+        }
+
+        /**
+         * Returns the status enum associated with the given ID.
+         *
+         * @param id the status ID.
+         * @return the status enum for the given ID.
+         */
+        public static Status forId(int id) {
+            switch (id) {
+                case 0:
+                    return OK;
+                case 1:
+                    return ERROR_NO_HANDLER;
+                case 2:
+                    return ERROR_HANDLER_EXCEPTION;
+                case 3:
+                    return PROTOCOL_EXCEPTION;
+                default:
+                    throw new IllegalArgumentException("Unknown status ID " + id);
+            }
+        }
+    }
+
+    private final Status status;
+
+    public InternalReply(int preamble,
+            HybridLogicalTime time,
+            long id,
+            Status status) {
+        this(preamble, time, id, new byte[0], status);
+    }
+
+    public InternalReply(int preamble,
+            HybridLogicalTime time,
+            long id,
+            byte[] payload,
+            Status status) {
+        super(preamble, time, id, payload);
+        this.status = status;
+    }
+
+    @Override
+    public Type type() {
+        return Type.REPLY;
+    }
+
+    public Status status() {
+        return status;
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(this)
+                .add("time", time())
+                .add("id", id())
+                .add("status", status())
+                .add("payload", ByteArraySizeHashPrinter.of(payload()))
+                .toString();
+    }
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/InternalRequest.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/InternalRequest.java
new file mode 100644
index 0000000..81a74c9
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/InternalRequest.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.cluster.messaging.impl;
+
+import com.google.common.base.MoreObjects;
+
+import org.onlab.util.ByteArraySizeHashPrinter;
+import org.onosproject.core.HybridLogicalTime;
+import org.onosproject.store.cluster.messaging.Endpoint;
+
+/**
+ * Internal request message.
+ */
+public final class InternalRequest extends InternalMessage {
+    private final Endpoint sender;
+    private final String subject;
+
+    public InternalRequest(int preamble,
+                           HybridLogicalTime time,
+                           long id,
+                           Endpoint sender,
+                           String subject,
+                           byte[] payload) {
+        super(preamble, time, id, payload);
+        this.sender = sender;
+        this.subject = subject;
+    }
+
+    @Override
+    public Type type() {
+        return Type.REQUEST;
+    }
+
+    public String subject() {
+        return subject;
+    }
+
+    public Endpoint sender() {
+        return sender;
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(this)
+                .add("time", time())
+                .add("id", id())
+                .add("subject", subject)
+                .add("sender", sender)
+                .add("payload", ByteArraySizeHashPrinter.of(payload()))
+                .toString();
+    }
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/MessageDecoder.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/MessageDecoder.java
index 367193a..6b5450d 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/MessageDecoder.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/MessageDecoder.java
@@ -25,7 +25,6 @@
 import org.onlab.packet.IpAddress.Version;
 import org.onosproject.core.HybridLogicalTime;
 import org.onosproject.store.cluster.messaging.Endpoint;
-import org.onosproject.store.cluster.messaging.impl.InternalMessage.Status;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -40,20 +39,23 @@
 
     private final Logger log = LoggerFactory.getLogger(getClass());
 
-    private long logicalTime;
-    private long logicalCounter;
-    private long messageId;
-    private int preamble;
     private Version ipVersion;
     private IpAddress senderIp;
     private int senderPort;
-    private int messageTypeLength;
-    private String messageType;
-    private Status status;
+
+    private InternalMessage.Type type;
+    private int preamble;
+    private long logicalTime;
+    private long logicalCounter;
+    private long messageId;
     private int contentLength;
+    private byte[] content;
+    private int subjectLength;
+    private String subject;
+    private InternalReply.Status status;
 
     public MessageDecoder() {
-        super(DecoderState.READ_MESSAGE_PREAMBLE);
+        super(DecoderState.READ_SENDER_IP_VERSION);
     }
 
     @Override
@@ -64,69 +66,100 @@
             List<Object> out) throws Exception {
 
         switch (state()) {
-        case READ_MESSAGE_PREAMBLE:
-            preamble = buffer.readInt();
-            checkpoint(DecoderState.READ_LOGICAL_TIME);
-        case READ_LOGICAL_TIME:
-            logicalTime = buffer.readLong();
-            checkpoint(DecoderState.READ_LOGICAL_COUNTER);
-        case READ_LOGICAL_COUNTER:
-            logicalCounter = buffer.readLong();
-            checkpoint(DecoderState.READ_MESSAGE_ID);
-        case READ_MESSAGE_ID:
-            messageId = buffer.readLong();
-            checkpoint(DecoderState.READ_SENDER_IP_VERSION);
-        case READ_SENDER_IP_VERSION:
-            ipVersion = buffer.readByte() == 0x0 ? Version.INET : Version.INET6;
-            checkpoint(DecoderState.READ_SENDER_IP);
-        case READ_SENDER_IP:
-            byte[] octets = new byte[IpAddress.byteLength(ipVersion)];
-            buffer.readBytes(octets);
-            senderIp = IpAddress.valueOf(ipVersion, octets);
-            checkpoint(DecoderState.READ_SENDER_PORT);
-        case READ_SENDER_PORT:
-            senderPort = buffer.readInt();
-            checkpoint(DecoderState.READ_MESSAGE_TYPE_LENGTH);
-        case READ_MESSAGE_TYPE_LENGTH:
-            messageTypeLength = buffer.readShort();
-            checkpoint(DecoderState.READ_MESSAGE_TYPE);
-        case READ_MESSAGE_TYPE:
-            byte[] messageTypeBytes = new byte[messageTypeLength];
-            buffer.readBytes(messageTypeBytes);
-            messageType = new String(messageTypeBytes, Charsets.UTF_8);
-            checkpoint(DecoderState.READ_MESSAGE_STATUS);
-        case READ_MESSAGE_STATUS:
-            int statusId = buffer.readByte();
-            if (statusId == -1) {
-                status = null;
-            } else {
-                status = Status.forId(statusId);
-            }
-            checkpoint(DecoderState.READ_CONTENT_LENGTH);
-        case READ_CONTENT_LENGTH:
-            contentLength = buffer.readInt();
-            checkpoint(DecoderState.READ_CONTENT);
-        case READ_CONTENT:
-            byte[] payload;
-            if (contentLength > 0) {
-                //TODO Perform a sanity check on the size before allocating
-                payload = new byte[contentLength];
-                buffer.readBytes(payload);
-            } else {
-                payload = new byte[0];
-            }
-            InternalMessage message = new InternalMessage(preamble,
-                                                          new HybridLogicalTime(logicalTime, logicalCounter),
-                                                          messageId,
-                                                          new Endpoint(senderIp, senderPort),
-                                                          messageType,
-                                                          payload,
-                                                          status);
-            out.add(message);
-            checkpoint(DecoderState.READ_MESSAGE_PREAMBLE);
-            break;
-         default:
-            checkState(false, "Must not be here");
+            case READ_SENDER_IP_VERSION:
+                ipVersion = buffer.readByte() == 0x0 ? Version.INET : Version.INET6;
+                checkpoint(DecoderState.READ_SENDER_IP);
+            case READ_SENDER_IP:
+                byte[] octets = new byte[IpAddress.byteLength(ipVersion)];
+                buffer.readBytes(octets);
+                senderIp = IpAddress.valueOf(ipVersion, octets);
+                checkpoint(DecoderState.READ_SENDER_PORT);
+            case READ_SENDER_PORT:
+                senderPort = buffer.readInt();
+                checkpoint(DecoderState.READ_TYPE);
+            case READ_TYPE:
+                type = InternalMessage.Type.forId(buffer.readByte());
+                checkpoint(DecoderState.READ_PREAMBLE);
+            case READ_PREAMBLE:
+                preamble = buffer.readInt();
+                checkpoint(DecoderState.READ_LOGICAL_TIME);
+            case READ_LOGICAL_TIME:
+                logicalTime = buffer.readLong();
+                checkpoint(DecoderState.READ_LOGICAL_COUNTER);
+            case READ_LOGICAL_COUNTER:
+                logicalCounter = buffer.readLong();
+                checkpoint(DecoderState.READ_MESSAGE_ID);
+            case READ_MESSAGE_ID:
+                messageId = buffer.readLong();
+                checkpoint(DecoderState.READ_CONTENT_LENGTH);
+            case READ_CONTENT_LENGTH:
+                contentLength = buffer.readInt();
+                checkpoint(DecoderState.READ_CONTENT);
+            case READ_CONTENT:
+                if (contentLength > 0) {
+                    //TODO Perform a sanity check on the size before allocating
+                    content = new byte[contentLength];
+                    buffer.readBytes(content);
+                } else {
+                    content = new byte[0];
+                }
+
+                switch (type) {
+                    case REQUEST:
+                        checkpoint(DecoderState.READ_SUBJECT_LENGTH);
+                        break;
+                    case REPLY:
+                        checkpoint(DecoderState.READ_STATUS);
+                        break;
+                    default:
+                        checkState(false, "Must not be here");
+                }
+                break;
+            default:
+                break;
+        }
+
+        switch (type) {
+            case REQUEST:
+                switch (state()) {
+                    case READ_SUBJECT_LENGTH:
+                        subjectLength = buffer.readShort();
+                        checkpoint(DecoderState.READ_SUBJECT);
+                    case READ_SUBJECT:
+                        byte[] messageTypeBytes = new byte[subjectLength];
+                        buffer.readBytes(messageTypeBytes);
+                        subject = new String(messageTypeBytes, Charsets.UTF_8);
+                        InternalRequest message = new InternalRequest(preamble,
+                                new HybridLogicalTime(logicalTime, logicalCounter),
+                                messageId,
+                                new Endpoint(senderIp, senderPort),
+                                subject,
+                                content);
+                        out.add(message);
+                        checkpoint(DecoderState.READ_TYPE);
+                        break;
+                    default:
+                        break;
+                }
+                break;
+            case REPLY:
+                switch (state()) {
+                    case READ_STATUS:
+                        status = InternalReply.Status.forId(buffer.readByte());
+                        InternalReply message = new InternalReply(preamble,
+                                new HybridLogicalTime(logicalTime, logicalCounter),
+                                messageId,
+                                content,
+                                status);
+                        out.add(message);
+                        checkpoint(DecoderState.READ_TYPE);
+                        break;
+                    default:
+                        break;
+                }
+                break;
+            default:
+                checkState(false, "Must not be here");
         }
     }
 
@@ -135,4 +168,4 @@
         log.error("Exception inside channel handling pipeline.", cause);
         context.close();
     }
-}
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/MessageEncoder.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/MessageEncoder.java
index 6ec50a3..b7513a0 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/MessageEncoder.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/MessageEncoder.java
@@ -15,9 +15,10 @@
  */
 package org.onosproject.store.cluster.messaging.impl;
 
+import java.io.IOException;
+
 import com.google.common.base.Charsets;
 import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelHandler.Sharable;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.codec.MessageToByteEncoder;
 import org.onlab.packet.IpAddress;
@@ -26,34 +27,55 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-
 /**
  * Encode InternalMessage out into a byte buffer.
  */
-@Sharable
 public class MessageEncoder extends MessageToByteEncoder<Object> {
 // Effectively MessageToByteEncoder<InternalMessage>,
 // had to specify <Object> to avoid Class Loader not being able to find some classes.
 
     private final Logger log = LoggerFactory.getLogger(getClass());
 
+    private final Endpoint endpoint;
     private final int preamble;
+    private boolean endpointWritten;
 
-    public MessageEncoder(int preamble) {
+    public MessageEncoder(Endpoint endpoint, int preamble) {
         super();
+        this.endpoint = endpoint;
         this.preamble = preamble;
     }
 
-
     @Override
     protected void encode(
             ChannelHandlerContext context,
             Object rawMessage,
             ByteBuf out) throws Exception {
+        if (rawMessage instanceof InternalRequest) {
+            encodeRequest((InternalRequest) rawMessage, out);
+        } else if (rawMessage instanceof InternalReply) {
+            encodeReply((InternalReply) rawMessage, out);
+        }
+    }
 
-        InternalMessage message = (InternalMessage) rawMessage;
+    private void encodeMessage(InternalMessage message, ByteBuf out) {
+        // If the endpoint hasn't been written to the channel, write it.
+        if (!endpointWritten) {
+            IpAddress senderIp = endpoint.host();
+            if (senderIp.version() == Version.INET) {
+                out.writeByte(0);
+            } else {
+                out.writeByte(1);
+            }
+            out.writeBytes(senderIp.toOctets());
 
+            // write sender port
+            out.writeInt(endpoint.port());
+
+            endpointWritten = true;
+        }
+
+        out.writeByte(message.type().id());
         out.writeInt(this.preamble);
 
         // write time
@@ -63,35 +85,6 @@
         // write message id
         out.writeLong(message.id());
 
-        Endpoint sender = message.sender();
-
-        IpAddress senderIp = sender.host();
-        if (senderIp.version() == Version.INET) {
-            out.writeByte(0);
-        } else {
-            out.writeByte(1);
-        }
-        out.writeBytes(senderIp.toOctets());
-
-        // write sender port
-        out.writeInt(sender.port());
-
-        byte[] messageTypeBytes = message.type().getBytes(Charsets.UTF_8);
-
-        // write length of message type
-        out.writeShort(messageTypeBytes.length);
-
-        // write message type bytes
-        out.writeBytes(messageTypeBytes);
-
-        // write message status value
-        InternalMessage.Status status = message.status();
-        if (status == null) {
-            out.writeByte(-1);
-        } else {
-            out.writeByte(status.id());
-        }
-
         byte[] payload = message.payload();
 
         // write payload length
@@ -101,6 +94,26 @@
         out.writeBytes(payload);
     }
 
+    private void encodeRequest(InternalRequest request, ByteBuf out) {
+        encodeMessage(request, out);
+
+        byte[] messageTypeBytes = request.subject().getBytes(Charsets.UTF_8);
+
+        // write length of message type
+        out.writeShort(messageTypeBytes.length);
+
+        // write message type bytes
+        out.writeBytes(messageTypeBytes);
+
+    }
+
+    private void encodeReply(InternalReply reply, ByteBuf out) {
+        encodeMessage(reply, out);
+
+        // write message status value
+        out.writeByte(reply.status().id());
+    }
+
     @Override
     public void exceptionCaught(ChannelHandlerContext context, Throwable cause) {
         if (cause instanceof IOException) {
@@ -116,4 +129,4 @@
     public final boolean acceptOutboundMessage(Object msg) throws Exception {
         return msg instanceof InternalMessage;
     }
-}
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
index d9ed9c3..f77fa69 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
@@ -90,7 +90,6 @@
 import org.onosproject.store.cluster.messaging.Endpoint;
 import org.onosproject.store.cluster.messaging.MessagingException;
 import org.onosproject.store.cluster.messaging.MessagingService;
-import org.onosproject.store.cluster.messaging.impl.InternalMessage.Status;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -132,7 +131,7 @@
     private Endpoint localEndpoint;
     private int preamble;
     private final AtomicBoolean started = new AtomicBoolean(false);
-    private final Map<String, BiConsumer<InternalMessage, ServerConnection>> handlers = new ConcurrentHashMap<>();
+    private final Map<String, BiConsumer<InternalRequest, ServerConnection>> handlers = new ConcurrentHashMap<>();
     private final Map<Channel, RemoteClientConnection> clientConnections = Maps.newConcurrentMap();
     private final Map<Channel, RemoteServerConnection> serverConnections = Maps.newConcurrentMap();
     private final AtomicLong messageIdGenerator = new AtomicLong(0);
@@ -296,7 +295,7 @@
     @Override
     public CompletableFuture<Void> sendAsync(Endpoint ep, String type, byte[] payload) {
         checkPermission(CLUSTER_WRITE);
-        InternalMessage message = new InternalMessage(preamble,
+        InternalRequest message = new InternalRequest(preamble,
                 clockService.timeNow(),
                 messageIdGenerator.incrementAndGet(),
                 localEndpoint,
@@ -315,7 +314,7 @@
     public CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload, Executor executor) {
         checkPermission(CLUSTER_WRITE);
         Long messageId = messageIdGenerator.incrementAndGet();
-        InternalMessage message = new InternalMessage(preamble,
+        InternalRequest message = new InternalRequest(preamble,
                 clockService.timeNow(),
                 messageId,
                 localEndpoint,
@@ -444,12 +443,12 @@
         checkPermission(CLUSTER_WRITE);
         handlers.put(type, (message, connection) -> executor.execute(() -> {
             byte[] responsePayload = null;
-            Status status = Status.OK;
+            InternalReply.Status status = InternalReply.Status.OK;
             try {
                 responsePayload = handler.apply(message.sender(), message.payload());
             } catch (Exception e) {
                 log.debug("An error occurred in a message handler: {}", e);
-                status = Status.ERROR_HANDLER_EXCEPTION;
+                status = InternalReply.Status.ERROR_HANDLER_EXCEPTION;
             }
             connection.reply(message, status, Optional.ofNullable(responsePayload));
         }));
@@ -460,12 +459,12 @@
         checkPermission(CLUSTER_WRITE);
         handlers.put(type, (message, connection) -> {
             handler.apply(message.sender(), message.payload()).whenComplete((result, error) -> {
-                Status status;
+                InternalReply.Status status;
                 if (error == null) {
-                    status = Status.OK;
+                    status = InternalReply.Status.OK;
                 } else {
                     log.debug("An error occurred in a message handler: {}", error);
-                    status = Status.ERROR_HANDLER_EXCEPTION;
+                    status = InternalReply.Status.ERROR_HANDLER_EXCEPTION;
                 }
                 connection.reply(message, status, Optional.ofNullable(result));
             });
@@ -549,7 +548,6 @@
      */
     private class SslServerCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
         private final ChannelHandler dispatcher = new InboundMessageDispatcher();
-        private final ChannelHandler encoder = new MessageEncoder(preamble);
 
         @Override
         protected void initChannel(SocketChannel channel) throws Exception {
@@ -565,7 +563,7 @@
             serverSslEngine.setEnableSessionCreation(true);
 
             channel.pipeline().addLast("ssl", new io.netty.handler.ssl.SslHandler(serverSslEngine))
-                    .addLast("encoder", encoder)
+                    .addLast("encoder", new MessageEncoder(localEndpoint, preamble))
                     .addLast("decoder", new MessageDecoder())
                     .addLast("handler", dispatcher);
         }
@@ -576,7 +574,6 @@
      */
     private class SslClientCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
         private final ChannelHandler dispatcher = new InboundMessageDispatcher();
-        private final ChannelHandler encoder = new MessageEncoder(preamble);
 
         @Override
         protected void initChannel(SocketChannel channel) throws Exception {
@@ -591,7 +588,7 @@
             clientSslEngine.setEnableSessionCreation(true);
 
             channel.pipeline().addLast("ssl", new io.netty.handler.ssl.SslHandler(clientSslEngine))
-                    .addLast("encoder", encoder)
+                    .addLast("encoder", new MessageEncoder(localEndpoint, preamble))
                     .addLast("decoder", new MessageDecoder())
                     .addLast("handler", dispatcher);
         }
@@ -602,12 +599,11 @@
      */
     private class BasicChannelInitializer extends ChannelInitializer<SocketChannel> {
         private final ChannelHandler dispatcher = new InboundMessageDispatcher();
-        private final ChannelHandler encoder = new MessageEncoder(preamble);
 
         @Override
         protected void initChannel(SocketChannel channel) throws Exception {
             channel.pipeline()
-                    .addLast("encoder", encoder)
+                    .addLast("encoder", new MessageEncoder(localEndpoint, preamble))
                     .addLast("decoder", new MessageDecoder())
                     .addLast("handler", dispatcher);
         }
@@ -628,11 +624,11 @@
                 if (message.isRequest()) {
                     RemoteServerConnection connection =
                             serverConnections.computeIfAbsent(ctx.channel(), RemoteServerConnection::new);
-                    connection.dispatch(message);
+                    connection.dispatch((InternalRequest) message);
                 } else {
                     RemoteClientConnection connection =
                             clientConnections.computeIfAbsent(ctx.channel(), RemoteClientConnection::new);
-                    connection.dispatch(message);
+                    connection.dispatch((InternalReply) message);
                 }
             } catch (RejectedExecutionException e) {
                 log.warn("Unable to dispatch message due to {}", e.getMessage());
@@ -702,7 +698,7 @@
          * @param message the message to send
          * @return a completable future to be completed once the message has been sent
          */
-        CompletableFuture<Void> sendAsync(InternalMessage message);
+        CompletableFuture<Void> sendAsync(InternalRequest message);
 
         /**
          * Sends a message to the other side of the connection, awaiting a reply.
@@ -710,7 +706,7 @@
          * @param message the message to send
          * @return a completable future to be completed once a reply is received or the request times out
          */
-        CompletableFuture<byte[]> sendAndReceive(InternalMessage message);
+        CompletableFuture<byte[]> sendAndReceive(InternalRequest message);
 
         /**
          * Closes the connection.
@@ -731,7 +727,7 @@
          * @param status the reply status
          * @param payload the response payload
          */
-        void reply(InternalMessage message, Status status, Optional<byte[]> payload);
+        void reply(InternalRequest message, InternalReply.Status status, Optional<byte[]> payload);
 
         /**
          * Closes the connection.
@@ -745,8 +741,8 @@
      */
     private final class LocalClientConnection implements ClientConnection {
         @Override
-        public CompletableFuture<Void> sendAsync(InternalMessage message) {
-            BiConsumer<InternalMessage, ServerConnection> handler = handlers.get(message.type());
+        public CompletableFuture<Void> sendAsync(InternalRequest message) {
+            BiConsumer<InternalRequest, ServerConnection> handler = handlers.get(message.subject());
             if (handler != null) {
                 handler.accept(message, localServerConnection);
             } else {
@@ -756,14 +752,15 @@
         }
 
         @Override
-        public CompletableFuture<byte[]> sendAndReceive(InternalMessage message) {
+        public CompletableFuture<byte[]> sendAndReceive(InternalRequest message) {
             CompletableFuture<byte[]> future = new CompletableFuture<>();
-            BiConsumer<InternalMessage, ServerConnection> handler = handlers.get(message.type());
+            BiConsumer<InternalRequest, ServerConnection> handler = handlers.get(message.subject());
             if (handler != null) {
                 handler.accept(message, new LocalServerConnection(future));
             } else {
                 log.debug("No handler for message type {} from {}", message.type(), message.sender());
-                new LocalServerConnection(future).reply(message, Status.ERROR_NO_HANDLER, Optional.empty());
+                new LocalServerConnection(future)
+                        .reply(message, InternalReply.Status.ERROR_NO_HANDLER, Optional.empty());
             }
             return future;
         }
@@ -780,15 +777,15 @@
         }
 
         @Override
-        public void reply(InternalMessage message, Status status, Optional<byte[]> payload) {
+        public void reply(InternalRequest message, InternalReply.Status status, Optional<byte[]> payload) {
             if (future != null) {
-                if (status == Status.OK) {
+                if (status == InternalReply.Status.OK) {
                     future.complete(payload.orElse(EMPTY_PAYLOAD));
-                } else if (status == Status.ERROR_NO_HANDLER) {
+                } else if (status == InternalReply.Status.ERROR_NO_HANDLER) {
                     future.completeExceptionally(new MessagingException.NoRemoteHandler());
-                } else if (status == Status.ERROR_HANDLER_EXCEPTION) {
+                } else if (status == InternalReply.Status.ERROR_HANDLER_EXCEPTION) {
                     future.completeExceptionally(new MessagingException.RemoteHandlerFailure());
-                } else if (status == Status.PROTOCOL_EXCEPTION) {
+                } else if (status == InternalReply.Status.PROTOCOL_EXCEPTION) {
                     future.completeExceptionally(new MessagingException.ProtocolException());
                 }
             }
@@ -844,7 +841,7 @@
         }
 
         @Override
-        public CompletableFuture<Void> sendAsync(InternalMessage message) {
+        public CompletableFuture<Void> sendAsync(InternalRequest message) {
             CompletableFuture<Void> future = new CompletableFuture<>();
             channel.writeAndFlush(message).addListener(channelFuture -> {
                 if (!channelFuture.isSuccess()) {
@@ -857,9 +854,9 @@
         }
 
         @Override
-        public CompletableFuture<byte[]> sendAndReceive(InternalMessage message) {
+        public CompletableFuture<byte[]> sendAndReceive(InternalRequest message) {
             CompletableFuture<byte[]> future = new CompletableFuture<>();
-            Callback callback = new Callback(message.type(), future);
+            Callback callback = new Callback(message.subject(), future);
             futures.put(message.id(), callback);
             channel.writeAndFlush(message).addListener(channelFuture -> {
                 if (!channelFuture.isSuccess()) {
@@ -875,9 +872,9 @@
          *
          * @param message the message to dispatch
          */
-        private void dispatch(InternalMessage message) {
+        private void dispatch(InternalReply message) {
             if (message.preamble() != preamble) {
-                log.debug("Received {} with invalid preamble from {}", message.type(), message.sender());
+                log.debug("Received {} with invalid preamble", message.type());
                 return;
             }
 
@@ -885,13 +882,13 @@
 
             Callback callback = futures.remove(message.id());
             if (callback != null) {
-                if (message.status() == Status.OK) {
+                if (message.status() == InternalReply.Status.OK) {
                     callback.complete(message.payload());
-                } else if (message.status() == Status.ERROR_NO_HANDLER) {
+                } else if (message.status() == InternalReply.Status.ERROR_NO_HANDLER) {
                     callback.completeExceptionally(new MessagingException.NoRemoteHandler());
-                } else if (message.status() == Status.ERROR_HANDLER_EXCEPTION) {
+                } else if (message.status() == InternalReply.Status.ERROR_HANDLER_EXCEPTION) {
                     callback.completeExceptionally(new MessagingException.RemoteHandlerFailure());
-                } else if (message.status() == Status.PROTOCOL_EXCEPTION) {
+                } else if (message.status() == InternalReply.Status.PROTOCOL_EXCEPTION) {
                     callback.completeExceptionally(new MessagingException.ProtocolException());
                 }
 
@@ -902,9 +899,9 @@
                     throw new AssertionError();
                 }
             } else {
-                log.debug("Received a reply for message id:[{}]. "
-                        + " from {}. But was unable to locate the"
-                        + " request handle", message.id(), message.sender());
+                log.debug("Received a reply for message id:[{}] "
+                        + "but was unable to locate the"
+                        + " request handle", message.id());
             }
         }
 
@@ -934,30 +931,29 @@
          *
          * @param message the message to dispatch
          */
-        private void dispatch(InternalMessage message) {
+        private void dispatch(InternalRequest message) {
             if (message.preamble() != preamble) {
                 log.debug("Received {} with invalid preamble from {}", message.type(), message.sender());
-                reply(message, Status.PROTOCOL_EXCEPTION, Optional.empty());
+                reply(message, InternalReply.Status.PROTOCOL_EXCEPTION, Optional.empty());
                 return;
             }
 
             clockService.recordEventTime(message.time());
 
-            BiConsumer<InternalMessage, ServerConnection> handler = handlers.get(message.type());
+            BiConsumer<InternalRequest, ServerConnection> handler = handlers.get(message.subject());
             if (handler != null) {
                 handler.accept(message, this);
             } else {
                 log.debug("No handler for message type {} from {}", message.type(), message.sender());
-                reply(message, Status.ERROR_NO_HANDLER, Optional.empty());
+                reply(message, InternalReply.Status.ERROR_NO_HANDLER, Optional.empty());
             }
         }
 
         @Override
-        public void reply(InternalMessage message, Status status, Optional<byte[]> payload) {
-            InternalMessage response = new InternalMessage(preamble,
+        public void reply(InternalRequest message, InternalReply.Status status, Optional<byte[]> payload) {
+            InternalReply response = new InternalReply(preamble,
                     clockService.timeNow(),
                     message.id(),
-                    localEndpoint,
                     payload.orElse(EMPTY_PAYLOAD),
                     status);
             channel.writeAndFlush(response);