Reply with error status when a received message has an incorrect preamble

Change-Id: I0d17dc74c817546f221fbcade1d5642c8f29b0fe
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
index 53611f3..8a32e6d 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
@@ -217,7 +217,8 @@
     @Override
     public CompletableFuture<Void> sendAsync(Endpoint ep, String type, byte[] payload) {
         checkPermission(CLUSTER_WRITE);
-        InternalMessage message = new InternalMessage(messageIdGenerator.incrementAndGet(),
+        InternalMessage message = new InternalMessage(preamble,
+                                                      messageIdGenerator.incrementAndGet(),
                                                       localEp,
                                                       type,
                                                       payload);
@@ -263,7 +264,7 @@
         Callback callback = new Callback(response, executor);
         Long messageId = messageIdGenerator.incrementAndGet();
         callbacks.put(messageId, callback);
-        InternalMessage message = new InternalMessage(messageId, localEp, type, payload);
+        InternalMessage message = new InternalMessage(preamble, messageId, localEp, type, payload);
         return sendAsync(ep, message).whenComplete((r, e) -> {
             if (e != null) {
                 callbacks.invalidate(messageId);
@@ -425,7 +426,7 @@
 
             channel.pipeline().addLast("ssl", new io.netty.handler.ssl.SslHandler(serverSslEngine))
                     .addLast("encoder", encoder)
-                    .addLast("decoder", new MessageDecoder(preamble))
+                    .addLast("decoder", new MessageDecoder())
                     .addLast("handler", dispatcher);
         }
     }
@@ -459,7 +460,7 @@
 
             channel.pipeline().addLast("ssl", new io.netty.handler.ssl.SslHandler(clientSslEngine))
                     .addLast("encoder", encoder)
-                    .addLast("decoder", new MessageDecoder(preamble))
+                    .addLast("decoder", new MessageDecoder())
                     .addLast("handler", dispatcher);
         }
     }
@@ -473,7 +474,7 @@
         protected void initChannel(SocketChannel channel) throws Exception {
             channel.pipeline()
                     .addLast("encoder", encoder)
-                    .addLast("decoder", new MessageDecoder(preamble))
+                    .addLast("decoder", new MessageDecoder())
                     .addLast("handler", dispatcher);
         }
     }
@@ -497,6 +498,10 @@
         }
     }
     private void dispatchLocally(InternalMessage message) throws IOException {
+        if (message.preamble() != preamble) {
+            log.debug("Received {} with invalid preamble from {}", message.type(), message.sender());
+            sendReply(message, Status.PROTOCOL_EXCEPTION, Optional.empty());
+        }
         String type = message.type();
         if (REPLY_MESSAGE_TYPE.equals(type)) {
             try {
@@ -509,6 +514,8 @@
                         callback.completeExceptionally(new MessagingException.NoRemoteHandler());
                     } else if (message.status() == Status.ERROR_HANDLER_EXCEPTION) {
                         callback.completeExceptionally(new MessagingException.RemoteHandlerFailure());
+                    } else if (message.status() == Status.PROTOCOL_EXCEPTION) {
+                        callback.completeExceptionally(new MessagingException.ProcotolException());
                     }
                 } else {
                     log.debug("Received a reply for message id:[{}]. "
@@ -530,7 +537,8 @@
     }
 
     private void sendReply(InternalMessage message, Status status, Optional<byte[]> responsePayload) {
-        InternalMessage response = new InternalMessage(message.id(),
+        InternalMessage response = new InternalMessage(preamble,
+                message.id(),
                 localEp,
                 REPLY_MESSAGE_TYPE,
                 responsePayload.orElse(new byte[0]),