Reply with error status on receiving message with incorrect preamble
Change-Id: I0d17dc74c817546f221fbcade1d5642c8f29b0fe
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
index 53611f3..8a32e6d 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
@@ -217,7 +217,8 @@
@Override
public CompletableFuture<Void> sendAsync(Endpoint ep, String type, byte[] payload) {
checkPermission(CLUSTER_WRITE);
- InternalMessage message = new InternalMessage(messageIdGenerator.incrementAndGet(),
+ InternalMessage message = new InternalMessage(preamble,
+ messageIdGenerator.incrementAndGet(),
localEp,
type,
payload);
@@ -263,7 +264,7 @@
Callback callback = new Callback(response, executor);
Long messageId = messageIdGenerator.incrementAndGet();
callbacks.put(messageId, callback);
- InternalMessage message = new InternalMessage(messageId, localEp, type, payload);
+ InternalMessage message = new InternalMessage(preamble, messageId, localEp, type, payload);
return sendAsync(ep, message).whenComplete((r, e) -> {
if (e != null) {
callbacks.invalidate(messageId);
@@ -425,7 +426,7 @@
channel.pipeline().addLast("ssl", new io.netty.handler.ssl.SslHandler(serverSslEngine))
.addLast("encoder", encoder)
- .addLast("decoder", new MessageDecoder(preamble))
+ .addLast("decoder", new MessageDecoder())
.addLast("handler", dispatcher);
}
}
@@ -459,7 +460,7 @@
channel.pipeline().addLast("ssl", new io.netty.handler.ssl.SslHandler(clientSslEngine))
.addLast("encoder", encoder)
- .addLast("decoder", new MessageDecoder(preamble))
+ .addLast("decoder", new MessageDecoder())
.addLast("handler", dispatcher);
}
}
@@ -473,7 +474,7 @@
protected void initChannel(SocketChannel channel) throws Exception {
channel.pipeline()
.addLast("encoder", encoder)
- .addLast("decoder", new MessageDecoder(preamble))
+ .addLast("decoder", new MessageDecoder())
.addLast("handler", dispatcher);
}
}
@@ -497,6 +498,12 @@
}
}
private void dispatchLocally(InternalMessage message) throws IOException {
+ if (message.preamble() != preamble) {
+ log.debug("Received {} with invalid preamble from {}", message.type(), message.sender());
+ sendReply(message, Status.PROTOCOL_EXCEPTION, Optional.empty());
+ // Stop here: a message that failed the preamble check must not be dispatched any further.
+ return;
+ }
String type = message.type();
if (REPLY_MESSAGE_TYPE.equals(type)) {
try {
@@ -509,6 +514,8 @@
callback.completeExceptionally(new MessagingException.NoRemoteHandler());
} else if (message.status() == Status.ERROR_HANDLER_EXCEPTION) {
callback.completeExceptionally(new MessagingException.RemoteHandlerFailure());
+ } else if (message.status() == Status.PROTOCOL_EXCEPTION) {
+ callback.completeExceptionally(new MessagingException.ProcotolException());
}
} else {
log.debug("Received a reply for message id:[{}]. "
@@ -530,7 +537,8 @@
}
private void sendReply(InternalMessage message, Status status, Optional<byte[]> responsePayload) {
- InternalMessage response = new InternalMessage(message.id(),
+ InternalMessage response = new InternalMessage(preamble,
+ message.id(),
localEp,
REPLY_MESSAGE_TYPE,
responsePayload.orElse(new byte[0]),