Reply with error status on receiving message with incorrect preamble
Change-Id: I0d17dc74c817546f221fbcade1d5642c8f29b0fe
diff --git a/core/api/src/main/java/org/onosproject/store/cluster/messaging/MessagingException.java b/core/api/src/main/java/org/onosproject/store/cluster/messaging/MessagingException.java
index e71ac10..e954d0f 100644
--- a/core/api/src/main/java/org/onosproject/store/cluster/messaging/MessagingException.java
+++ b/core/api/src/main/java/org/onosproject/store/cluster/messaging/MessagingException.java
@@ -49,4 +49,10 @@
*/
public static class RemoteHandlerFailure extends MessagingException {
}
+
+ /**
+ * Exception indicating failure due to invalid message strucuture such as an incorrect preamble.
+ */
+ public static class ProcotolException extends MessagingException {
+ }
}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/InternalMessage.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/InternalMessage.java
index e02ecc8..0d96c09 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/InternalMessage.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/InternalMessage.java
@@ -42,24 +42,31 @@
/**
* Response status signifying an exception handling the message.
*/
- ERROR_HANDLER_EXCEPTION
+ ERROR_HANDLER_EXCEPTION,
+
+ /**
+ * Reponse status signifying invalid message structure.
+ */
+ PROTOCOL_EXCEPTION
// NOTE: For backwards compatibility it important that new enum constants
// be appended.
// FIXME: We should remove this restriction in the future.
}
+ private final int preamble;
private final long id;
private final Endpoint sender;
private final String type;
private final byte[] payload;
private final Status status;
- public InternalMessage(long id, Endpoint sender, String type, byte[] payload) {
- this(id, sender, type, payload, Status.OK);
+ public InternalMessage(int preamble, long id, Endpoint sender, String type, byte[] payload) {
+ this(preamble, id, sender, type, payload, Status.OK);
}
- public InternalMessage(long id, Endpoint sender, String type, byte[] payload, Status status) {
+ public InternalMessage(int preamble, long id, Endpoint sender, String type, byte[] payload, Status status) {
+ this.preamble = preamble;
this.id = id;
this.sender = sender;
this.type = type;
@@ -67,6 +74,10 @@
this.status = status;
}
+ public int preamble() {
+ return preamble;
+ }
+
public long id() {
return id;
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/MessageDecoder.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/MessageDecoder.java
index aea3e29..4743987 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/MessageDecoder.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/MessageDecoder.java
@@ -39,7 +39,6 @@
private final Logger log = LoggerFactory.getLogger(getClass());
- private final int correctPreamble;
private long messageId;
private int preamble;
private Version ipVersion;
@@ -50,9 +49,8 @@
private Status status;
private int contentLength;
- public MessageDecoder(int correctPreamble) {
+ public MessageDecoder() {
super(DecoderState.READ_MESSAGE_PREAMBLE);
- this.correctPreamble = correctPreamble;
}
@Override
@@ -65,9 +63,6 @@
switch (state()) {
case READ_MESSAGE_PREAMBLE:
preamble = buffer.readInt();
- if (preamble != correctPreamble) {
- throw new IllegalStateException("This message had an incorrect preamble.");
- }
checkpoint(DecoderState.READ_MESSAGE_ID);
case READ_MESSAGE_ID:
messageId = buffer.readLong();
@@ -106,7 +101,8 @@
} else {
payload = new byte[0];
}
- InternalMessage message = new InternalMessage(messageId,
+ InternalMessage message = new InternalMessage(preamble,
+ messageId,
new Endpoint(senderIp, senderPort),
messageType,
payload,
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
index 53611f3..8a32e6d 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
@@ -217,7 +217,8 @@
@Override
public CompletableFuture<Void> sendAsync(Endpoint ep, String type, byte[] payload) {
checkPermission(CLUSTER_WRITE);
- InternalMessage message = new InternalMessage(messageIdGenerator.incrementAndGet(),
+ InternalMessage message = new InternalMessage(preamble,
+ messageIdGenerator.incrementAndGet(),
localEp,
type,
payload);
@@ -263,7 +264,7 @@
Callback callback = new Callback(response, executor);
Long messageId = messageIdGenerator.incrementAndGet();
callbacks.put(messageId, callback);
- InternalMessage message = new InternalMessage(messageId, localEp, type, payload);
+ InternalMessage message = new InternalMessage(preamble, messageId, localEp, type, payload);
return sendAsync(ep, message).whenComplete((r, e) -> {
if (e != null) {
callbacks.invalidate(messageId);
@@ -425,7 +426,7 @@
channel.pipeline().addLast("ssl", new io.netty.handler.ssl.SslHandler(serverSslEngine))
.addLast("encoder", encoder)
- .addLast("decoder", new MessageDecoder(preamble))
+ .addLast("decoder", new MessageDecoder())
.addLast("handler", dispatcher);
}
}
@@ -459,7 +460,7 @@
channel.pipeline().addLast("ssl", new io.netty.handler.ssl.SslHandler(clientSslEngine))
.addLast("encoder", encoder)
- .addLast("decoder", new MessageDecoder(preamble))
+ .addLast("decoder", new MessageDecoder())
.addLast("handler", dispatcher);
}
}
@@ -473,7 +474,7 @@
protected void initChannel(SocketChannel channel) throws Exception {
channel.pipeline()
.addLast("encoder", encoder)
- .addLast("decoder", new MessageDecoder(preamble))
+ .addLast("decoder", new MessageDecoder())
.addLast("handler", dispatcher);
}
}
@@ -497,6 +498,10 @@
}
}
private void dispatchLocally(InternalMessage message) throws IOException {
+ if (message.preamble() != preamble) {
+ log.debug("Received {} with invalid preamble from {}", message.type(), message.sender());
+ sendReply(message, Status.PROTOCOL_EXCEPTION, Optional.empty());
+ }
String type = message.type();
if (REPLY_MESSAGE_TYPE.equals(type)) {
try {
@@ -509,6 +514,8 @@
callback.completeExceptionally(new MessagingException.NoRemoteHandler());
} else if (message.status() == Status.ERROR_HANDLER_EXCEPTION) {
callback.completeExceptionally(new MessagingException.RemoteHandlerFailure());
+ } else if (message.status() == Status.PROTOCOL_EXCEPTION) {
+ callback.completeExceptionally(new MessagingException.ProcotolException());
}
} else {
log.debug("Received a reply for message id:[{}]. "
@@ -530,7 +537,8 @@
}
private void sendReply(InternalMessage message, Status status, Optional<byte[]> responsePayload) {
- InternalMessage response = new InternalMessage(message.id(),
+ InternalMessage response = new InternalMessage(preamble,
+ message.id(),
localEp,
REPLY_MESSAGE_TYPE,
responsePayload.orElse(new byte[0]),