[Falcon] Adds a status field to InternalMessage and support for replying with appropriate status when handler errors occur
Change-Id: I995bdd6c67b88b6d7729887d32083315213fb79f
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/MessageDecoder.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/MessageDecoder.java
index 149b706..aea3e29 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/MessageDecoder.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/MessageDecoder.java
@@ -16,12 +16,15 @@
package org.onosproject.store.cluster.messaging.impl;
import com.google.common.base.Charsets;
+
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ReplayingDecoder;
+
import org.onlab.packet.IpAddress;
import org.onlab.packet.IpAddress.Version;
import org.onosproject.store.cluster.messaging.Endpoint;
+import org.onosproject.store.cluster.messaging.impl.InternalMessage.Status;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,6 +47,7 @@
private int senderPort;
private int messageTypeLength;
private String messageType;
+ private Status status;
private int contentLength;
public MessageDecoder(int correctPreamble) {
@@ -86,18 +90,27 @@
byte[] messageTypeBytes = new byte[messageTypeLength];
buffer.readBytes(messageTypeBytes);
messageType = new String(messageTypeBytes, Charsets.UTF_8);
+ checkpoint(DecoderState.READ_MESSAGE_STATUS);
+ case READ_MESSAGE_STATUS:
+ status = Status.values()[buffer.readInt()];
checkpoint(DecoderState.READ_CONTENT_LENGTH);
case READ_CONTENT_LENGTH:
contentLength = buffer.readInt();
checkpoint(DecoderState.READ_CONTENT);
case READ_CONTENT:
- //TODO Perform a sanity check on the size before allocating
- byte[] payload = new byte[contentLength];
- buffer.readBytes(payload);
+ byte[] payload;
+ if (contentLength > 0) {
+ //TODO Perform a sanity check on the size before allocating
+ payload = new byte[contentLength];
+ buffer.readBytes(payload);
+ } else {
+ payload = new byte[0];
+ }
InternalMessage message = new InternalMessage(messageId,
new Endpoint(senderIp, senderPort),
messageType,
- payload);
+ payload,
+ status);
out.add(message);
checkpoint(DecoderState.READ_MESSAGE_PREAMBLE);
break;