Reply with error status on receiving message with incorrect preamble

Change-Id: I0d17dc74c817546f221fbcade1d5642c8f29b0fe
diff --git a/core/api/src/main/java/org/onosproject/store/cluster/messaging/MessagingException.java b/core/api/src/main/java/org/onosproject/store/cluster/messaging/MessagingException.java
index e71ac10..e954d0f 100644
--- a/core/api/src/main/java/org/onosproject/store/cluster/messaging/MessagingException.java
+++ b/core/api/src/main/java/org/onosproject/store/cluster/messaging/MessagingException.java
@@ -49,4 +49,10 @@
      */
     public static class RemoteHandlerFailure extends MessagingException {
     }
+
+    /**
+     * Exception indicating failure due to invalid message strucuture such as an incorrect preamble.
+     */
+    public static class ProcotolException extends MessagingException {
+    }
 }
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/InternalMessage.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/InternalMessage.java
index e02ecc8..0d96c09 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/InternalMessage.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/InternalMessage.java
@@ -42,24 +42,31 @@
         /**
          * Response status signifying an exception handling the message.
          */
-        ERROR_HANDLER_EXCEPTION
+        ERROR_HANDLER_EXCEPTION,
+
+        /**
+         * Reponse status signifying invalid message structure.
+         */
+        PROTOCOL_EXCEPTION
 
         // NOTE: For backwards compatibility it important that new enum constants
         // be appended.
         // FIXME: We should remove this restriction in the future.
     }
 
+    private final int preamble;
     private final long id;
     private final Endpoint sender;
     private final String type;
     private final byte[] payload;
     private final Status status;
 
-    public InternalMessage(long id, Endpoint sender, String type, byte[] payload) {
-        this(id, sender, type, payload, Status.OK);
+    public InternalMessage(int preamble, long id, Endpoint sender, String type, byte[] payload) {
+        this(preamble, id, sender, type, payload, Status.OK);
     }
 
-    public InternalMessage(long id, Endpoint sender, String type, byte[] payload, Status status) {
+    public InternalMessage(int preamble, long id, Endpoint sender, String type, byte[] payload, Status status) {
+        this.preamble = preamble;
         this.id = id;
         this.sender = sender;
         this.type = type;
@@ -67,6 +74,10 @@
         this.status = status;
     }
 
+    public int preamble() {
+        return preamble;
+    }
+
     public long id() {
         return id;
     }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/MessageDecoder.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/MessageDecoder.java
index aea3e29..4743987 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/MessageDecoder.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/MessageDecoder.java
@@ -39,7 +39,6 @@
 
     private final Logger log = LoggerFactory.getLogger(getClass());
 
-    private final int correctPreamble;
     private long messageId;
     private int preamble;
     private Version ipVersion;
@@ -50,9 +49,8 @@
     private Status status;
     private int contentLength;
 
-    public MessageDecoder(int correctPreamble) {
+    public MessageDecoder() {
         super(DecoderState.READ_MESSAGE_PREAMBLE);
-        this.correctPreamble = correctPreamble;
     }
 
     @Override
@@ -65,9 +63,6 @@
         switch (state()) {
         case READ_MESSAGE_PREAMBLE:
             preamble = buffer.readInt();
-            if (preamble != correctPreamble) {
-                throw new IllegalStateException("This message had an incorrect preamble.");
-            }
             checkpoint(DecoderState.READ_MESSAGE_ID);
         case READ_MESSAGE_ID:
             messageId = buffer.readLong();
@@ -106,7 +101,8 @@
             } else {
                 payload = new byte[0];
             }
-            InternalMessage message = new InternalMessage(messageId,
+            InternalMessage message = new InternalMessage(preamble,
+                                                          messageId,
                                                           new Endpoint(senderIp, senderPort),
                                                           messageType,
                                                           payload,
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
index 53611f3..8a32e6d 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
@@ -217,7 +217,8 @@
     @Override
     public CompletableFuture<Void> sendAsync(Endpoint ep, String type, byte[] payload) {
         checkPermission(CLUSTER_WRITE);
-        InternalMessage message = new InternalMessage(messageIdGenerator.incrementAndGet(),
+        InternalMessage message = new InternalMessage(preamble,
+                                                      messageIdGenerator.incrementAndGet(),
                                                       localEp,
                                                       type,
                                                       payload);
@@ -263,7 +264,7 @@
         Callback callback = new Callback(response, executor);
         Long messageId = messageIdGenerator.incrementAndGet();
         callbacks.put(messageId, callback);
-        InternalMessage message = new InternalMessage(messageId, localEp, type, payload);
+        InternalMessage message = new InternalMessage(preamble, messageId, localEp, type, payload);
         return sendAsync(ep, message).whenComplete((r, e) -> {
             if (e != null) {
                 callbacks.invalidate(messageId);
@@ -425,7 +426,7 @@
 
             channel.pipeline().addLast("ssl", new io.netty.handler.ssl.SslHandler(serverSslEngine))
                     .addLast("encoder", encoder)
-                    .addLast("decoder", new MessageDecoder(preamble))
+                    .addLast("decoder", new MessageDecoder())
                     .addLast("handler", dispatcher);
         }
     }
@@ -459,7 +460,7 @@
 
             channel.pipeline().addLast("ssl", new io.netty.handler.ssl.SslHandler(clientSslEngine))
                     .addLast("encoder", encoder)
-                    .addLast("decoder", new MessageDecoder(preamble))
+                    .addLast("decoder", new MessageDecoder())
                     .addLast("handler", dispatcher);
         }
     }
@@ -473,7 +474,7 @@
         protected void initChannel(SocketChannel channel) throws Exception {
             channel.pipeline()
                     .addLast("encoder", encoder)
-                    .addLast("decoder", new MessageDecoder(preamble))
+                    .addLast("decoder", new MessageDecoder())
                     .addLast("handler", dispatcher);
         }
     }
@@ -497,6 +498,10 @@
         }
     }
     private void dispatchLocally(InternalMessage message) throws IOException {
+        if (message.preamble() != preamble) {
+            log.debug("Received {} with invalid preamble from {}", message.type(), message.sender());
+            sendReply(message, Status.PROTOCOL_EXCEPTION, Optional.empty());
+        }
         String type = message.type();
         if (REPLY_MESSAGE_TYPE.equals(type)) {
             try {
@@ -509,6 +514,8 @@
                         callback.completeExceptionally(new MessagingException.NoRemoteHandler());
                     } else if (message.status() == Status.ERROR_HANDLER_EXCEPTION) {
                         callback.completeExceptionally(new MessagingException.RemoteHandlerFailure());
+                    } else if (message.status() == Status.PROTOCOL_EXCEPTION) {
+                        callback.completeExceptionally(new MessagingException.ProcotolException());
                     }
                 } else {
                     log.debug("Received a reply for message id:[{}]. "
@@ -530,7 +537,8 @@
     }
 
     private void sendReply(InternalMessage message, Status status, Optional<byte[]> responsePayload) {
-        InternalMessage response = new InternalMessage(message.id(),
+        InternalMessage response = new InternalMessage(preamble,
+                message.id(),
                 localEp,
                 REPLY_MESSAGE_TYPE,
                 responsePayload.orElse(new byte[0]),