[ONOS-6868] Improve Netty message encoder/decoder performance
Change-Id: I6b4e2490fecb15bb20d9a8bb19fede3b53327bc1
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/DecoderState.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/DecoderState.java
index 2f2bef1..b58871e 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/DecoderState.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/DecoderState.java
@@ -19,16 +19,17 @@
* State transitions a decoder goes through as it is decoding an incoming message.
*/
public enum DecoderState {
- READ_MESSAGE_PREAMBLE,
+ READ_TYPE,
+ READ_PREAMBLE,
READ_LOGICAL_TIME,
READ_LOGICAL_COUNTER,
READ_MESSAGE_ID,
READ_SENDER_IP_VERSION,
READ_SENDER_IP,
READ_SENDER_PORT,
- READ_MESSAGE_TYPE_LENGTH,
- READ_MESSAGE_TYPE,
- READ_MESSAGE_STATUS,
+ READ_SUBJECT_LENGTH,
+ READ_SUBJECT,
+ READ_STATUS,
READ_CONTENT_LENGTH,
READ_CONTENT
-}
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/InternalMessage.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/InternalMessage.java
index 0ab7c31..646a20d 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/InternalMessage.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/InternalMessage.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-present Open Networking Laboratory
+ * Copyright 2017-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -15,76 +15,47 @@
*/
package org.onosproject.store.cluster.messaging.impl;
-import com.google.common.base.MoreObjects;
-
-import org.onlab.util.ByteArraySizeHashPrinter;
import org.onosproject.core.HybridLogicalTime;
-import org.onosproject.store.cluster.messaging.Endpoint;
/**
- * Internal message representation with additional attributes
- * for supporting, synchronous request/reply behavior.
+ * Base class for internal messages.
*/
-public final class InternalMessage {
+public abstract class InternalMessage {
/**
- * Message status.
+ * Internal message type.
*/
- public enum Status {
-
- // NOTE: For backwards compatibility enum constant IDs should not be changed.
-
- /**
- * All ok.
- */
- OK(0),
-
- /**
- * Response status signifying no registered handler.
- */
- ERROR_NO_HANDLER(1),
-
- /**
- * Response status signifying an exception handling the message.
- */
- ERROR_HANDLER_EXCEPTION(2),
-
- /**
- * Response status signifying invalid message structure.
- */
- PROTOCOL_EXCEPTION(3);
+ public enum Type {
+ REQUEST(1),
+ REPLY(2);
private final int id;
- Status(int id) {
+ Type(int id) {
this.id = id;
}
/**
- * Returns the unique status ID.
+ * Returns the unique message type ID.
*
- * @return the unique status ID.
+ * @return the unique message type ID.
*/
public int id() {
return id;
}
/**
- * Returns the status enum associated with the given ID.
+ * Returns the message type enum associated with the given ID.
*
- * @param id the status ID.
- * @return the status enum for the given ID.
+ * @param id the type ID.
+ * @return the type enum for the given ID.
*/
- public static Status forId(int id) {
+ public static Type forId(int id) {
switch (id) {
- case 0:
- return OK;
case 1:
- return ERROR_NO_HANDLER;
+ return REQUEST;
case 2:
- return ERROR_HANDLER_EXCEPTION;
- case 3:
- return PROTOCOL_EXCEPTION;
+ return REPLY;
default:
throw new IllegalArgumentException("Unknown status ID " + id);
}
@@ -94,51 +65,26 @@
private final int preamble;
private final HybridLogicalTime time;
private final long id;
- private final Endpoint sender;
- private final String type;
private final byte[] payload;
- private final Status status;
- public InternalMessage(int preamble,
- HybridLogicalTime time,
- long id,
- Endpoint sender,
- String type,
- byte[] payload) {
- this(preamble, time, id, sender, type, payload, null);
- }
-
- public InternalMessage(int preamble,
+ protected InternalMessage(int preamble,
HybridLogicalTime time,
long id,
- Endpoint sender,
- byte[] payload,
- Status status) {
- this(preamble, time, id, sender, "", payload, status);
- }
-
- InternalMessage(int preamble,
- HybridLogicalTime time,
- long id,
- Endpoint sender,
- String type,
- byte[] payload,
- Status status) {
+ byte[] payload) {
this.preamble = preamble;
this.time = time;
this.id = id;
- this.sender = sender;
- this.type = type;
this.payload = payload;
- this.status = status;
}
+ public abstract Type type();
+
public boolean isRequest() {
- return status == null;
+ return type() == Type.REQUEST;
}
public boolean isReply() {
- return status != null;
+ return type() == Type.REPLY;
}
public HybridLogicalTime time() {
@@ -153,31 +99,7 @@
return id;
}
- public String type() {
- return type;
- }
-
- public Endpoint sender() {
- return sender;
- }
-
public byte[] payload() {
return payload;
}
-
- public Status status() {
- return status;
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(this)
- .add("time", time)
- .add("id", id)
- .add("type", type)
- .add("sender", sender)
- .add("status", status)
- .add("payload", ByteArraySizeHashPrinter.of(payload))
- .toString();
- }
-}
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/InternalReply.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/InternalReply.java
new file mode 100644
index 0000000..2030d61
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/InternalReply.java
@@ -0,0 +1,127 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.cluster.messaging.impl;
+
+import com.google.common.base.MoreObjects;
+import org.onlab.util.ByteArraySizeHashPrinter;
+import org.onosproject.core.HybridLogicalTime;
+
+/**
+ * Internal reply message.
+ */
+public final class InternalReply extends InternalMessage {
+
+ /**
+ * Message status.
+ */
+ public enum Status {
+
+ // NOTE: For backwards compatibility enum constant IDs should not be changed.
+
+ /**
+ * All ok.
+ */
+ OK(0),
+
+ /**
+ * Response status signifying no registered handler.
+ */
+ ERROR_NO_HANDLER(1),
+
+ /**
+ * Response status signifying an exception handling the message.
+ */
+ ERROR_HANDLER_EXCEPTION(2),
+
+ /**
+ * Response status signifying invalid message structure.
+ */
+ PROTOCOL_EXCEPTION(3);
+
+ private final int id;
+
+ Status(int id) {
+ this.id = id;
+ }
+
+ /**
+ * Returns the unique status ID.
+ *
+ * @return the unique status ID.
+ */
+ public int id() {
+ return id;
+ }
+
+ /**
+ * Returns the status enum associated with the given ID.
+ *
+ * @param id the status ID.
+ * @return the status enum for the given ID.
+ */
+ public static Status forId(int id) {
+ switch (id) {
+ case 0:
+ return OK;
+ case 1:
+ return ERROR_NO_HANDLER;
+ case 2:
+ return ERROR_HANDLER_EXCEPTION;
+ case 3:
+ return PROTOCOL_EXCEPTION;
+ default:
+ throw new IllegalArgumentException("Unknown status ID " + id);
+ }
+ }
+ }
+
+ private final Status status;
+
+ public InternalReply(int preamble,
+ HybridLogicalTime time,
+ long id,
+ Status status) {
+ this(preamble, time, id, new byte[0], status);
+ }
+
+ public InternalReply(int preamble,
+ HybridLogicalTime time,
+ long id,
+ byte[] payload,
+ Status status) {
+ super(preamble, time, id, payload);
+ this.status = status;
+ }
+
+ @Override
+ public Type type() {
+ return Type.REPLY;
+ }
+
+ public Status status() {
+ return status;
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("time", time())
+ .add("id", id())
+ .add("status", status())
+ .add("payload", ByteArraySizeHashPrinter.of(payload()))
+ .toString();
+ }
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/InternalRequest.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/InternalRequest.java
new file mode 100644
index 0000000..81a74c9
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/InternalRequest.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.cluster.messaging.impl;
+
+import com.google.common.base.MoreObjects;
+
+import org.onlab.util.ByteArraySizeHashPrinter;
+import org.onosproject.core.HybridLogicalTime;
+import org.onosproject.store.cluster.messaging.Endpoint;
+
+/**
+ * Internal request message.
+ */
+public final class InternalRequest extends InternalMessage {
+ private final Endpoint sender;
+ private final String subject;
+
+ public InternalRequest(int preamble,
+ HybridLogicalTime time,
+ long id,
+ Endpoint sender,
+ String subject,
+ byte[] payload) {
+ super(preamble, time, id, payload);
+ this.sender = sender;
+ this.subject = subject;
+ }
+
+ @Override
+ public Type type() {
+ return Type.REQUEST;
+ }
+
+ public String subject() {
+ return subject;
+ }
+
+ public Endpoint sender() {
+ return sender;
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("time", time())
+ .add("id", id())
+ .add("subject", subject)
+ .add("sender", sender)
+ .add("payload", ByteArraySizeHashPrinter.of(payload()))
+ .toString();
+ }
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/MessageDecoder.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/MessageDecoder.java
index 367193a..6b5450d 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/MessageDecoder.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/MessageDecoder.java
@@ -25,7 +25,6 @@
import org.onlab.packet.IpAddress.Version;
import org.onosproject.core.HybridLogicalTime;
import org.onosproject.store.cluster.messaging.Endpoint;
-import org.onosproject.store.cluster.messaging.impl.InternalMessage.Status;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,20 +39,23 @@
private final Logger log = LoggerFactory.getLogger(getClass());
- private long logicalTime;
- private long logicalCounter;
- private long messageId;
- private int preamble;
private Version ipVersion;
private IpAddress senderIp;
private int senderPort;
- private int messageTypeLength;
- private String messageType;
- private Status status;
+
+ private InternalMessage.Type type;
+ private int preamble;
+ private long logicalTime;
+ private long logicalCounter;
+ private long messageId;
private int contentLength;
+ private byte[] content;
+ private int subjectLength;
+ private String subject;
+ private InternalReply.Status status;
public MessageDecoder() {
- super(DecoderState.READ_MESSAGE_PREAMBLE);
+ super(DecoderState.READ_SENDER_IP_VERSION);
}
@Override
@@ -64,69 +66,100 @@
List<Object> out) throws Exception {
switch (state()) {
- case READ_MESSAGE_PREAMBLE:
- preamble = buffer.readInt();
- checkpoint(DecoderState.READ_LOGICAL_TIME);
- case READ_LOGICAL_TIME:
- logicalTime = buffer.readLong();
- checkpoint(DecoderState.READ_LOGICAL_COUNTER);
- case READ_LOGICAL_COUNTER:
- logicalCounter = buffer.readLong();
- checkpoint(DecoderState.READ_MESSAGE_ID);
- case READ_MESSAGE_ID:
- messageId = buffer.readLong();
- checkpoint(DecoderState.READ_SENDER_IP_VERSION);
- case READ_SENDER_IP_VERSION:
- ipVersion = buffer.readByte() == 0x0 ? Version.INET : Version.INET6;
- checkpoint(DecoderState.READ_SENDER_IP);
- case READ_SENDER_IP:
- byte[] octets = new byte[IpAddress.byteLength(ipVersion)];
- buffer.readBytes(octets);
- senderIp = IpAddress.valueOf(ipVersion, octets);
- checkpoint(DecoderState.READ_SENDER_PORT);
- case READ_SENDER_PORT:
- senderPort = buffer.readInt();
- checkpoint(DecoderState.READ_MESSAGE_TYPE_LENGTH);
- case READ_MESSAGE_TYPE_LENGTH:
- messageTypeLength = buffer.readShort();
- checkpoint(DecoderState.READ_MESSAGE_TYPE);
- case READ_MESSAGE_TYPE:
- byte[] messageTypeBytes = new byte[messageTypeLength];
- buffer.readBytes(messageTypeBytes);
- messageType = new String(messageTypeBytes, Charsets.UTF_8);
- checkpoint(DecoderState.READ_MESSAGE_STATUS);
- case READ_MESSAGE_STATUS:
- int statusId = buffer.readByte();
- if (statusId == -1) {
- status = null;
- } else {
- status = Status.forId(statusId);
- }
- checkpoint(DecoderState.READ_CONTENT_LENGTH);
- case READ_CONTENT_LENGTH:
- contentLength = buffer.readInt();
- checkpoint(DecoderState.READ_CONTENT);
- case READ_CONTENT:
- byte[] payload;
- if (contentLength > 0) {
- //TODO Perform a sanity check on the size before allocating
- payload = new byte[contentLength];
- buffer.readBytes(payload);
- } else {
- payload = new byte[0];
- }
- InternalMessage message = new InternalMessage(preamble,
- new HybridLogicalTime(logicalTime, logicalCounter),
- messageId,
- new Endpoint(senderIp, senderPort),
- messageType,
- payload,
- status);
- out.add(message);
- checkpoint(DecoderState.READ_MESSAGE_PREAMBLE);
- break;
- default:
- checkState(false, "Must not be here");
+ case READ_SENDER_IP_VERSION:
+ ipVersion = buffer.readByte() == 0x0 ? Version.INET : Version.INET6;
+ checkpoint(DecoderState.READ_SENDER_IP);
+ case READ_SENDER_IP:
+ byte[] octets = new byte[IpAddress.byteLength(ipVersion)];
+ buffer.readBytes(octets);
+ senderIp = IpAddress.valueOf(ipVersion, octets);
+ checkpoint(DecoderState.READ_SENDER_PORT);
+ case READ_SENDER_PORT:
+ senderPort = buffer.readInt();
+ checkpoint(DecoderState.READ_TYPE);
+ case READ_TYPE:
+ type = InternalMessage.Type.forId(buffer.readByte());
+ checkpoint(DecoderState.READ_PREAMBLE);
+ case READ_PREAMBLE:
+ preamble = buffer.readInt();
+ checkpoint(DecoderState.READ_LOGICAL_TIME);
+ case READ_LOGICAL_TIME:
+ logicalTime = buffer.readLong();
+ checkpoint(DecoderState.READ_LOGICAL_COUNTER);
+ case READ_LOGICAL_COUNTER:
+ logicalCounter = buffer.readLong();
+ checkpoint(DecoderState.READ_MESSAGE_ID);
+ case READ_MESSAGE_ID:
+ messageId = buffer.readLong();
+ checkpoint(DecoderState.READ_CONTENT_LENGTH);
+ case READ_CONTENT_LENGTH:
+ contentLength = buffer.readInt();
+ checkpoint(DecoderState.READ_CONTENT);
+ case READ_CONTENT:
+ if (contentLength > 0) {
+ //TODO Perform a sanity check on the size before allocating
+ content = new byte[contentLength];
+ buffer.readBytes(content);
+ } else {
+ content = new byte[0];
+ }
+
+ switch (type) {
+ case REQUEST:
+ checkpoint(DecoderState.READ_SUBJECT_LENGTH);
+ break;
+ case REPLY:
+ checkpoint(DecoderState.READ_STATUS);
+ break;
+ default:
+ checkState(false, "Must not be here");
+ }
+ break;
+ default:
+ break;
+ }
+
+ switch (type) {
+ case REQUEST:
+ switch (state()) {
+ case READ_SUBJECT_LENGTH:
+ subjectLength = buffer.readShort();
+ checkpoint(DecoderState.READ_SUBJECT);
+ case READ_SUBJECT:
+ byte[] messageTypeBytes = new byte[subjectLength];
+ buffer.readBytes(messageTypeBytes);
+ subject = new String(messageTypeBytes, Charsets.UTF_8);
+ InternalRequest message = new InternalRequest(preamble,
+ new HybridLogicalTime(logicalTime, logicalCounter),
+ messageId,
+ new Endpoint(senderIp, senderPort),
+ subject,
+ content);
+ out.add(message);
+ checkpoint(DecoderState.READ_TYPE);
+ break;
+ default:
+ break;
+ }
+ break;
+ case REPLY:
+ switch (state()) {
+ case READ_STATUS:
+ status = InternalReply.Status.forId(buffer.readByte());
+ InternalReply message = new InternalReply(preamble,
+ new HybridLogicalTime(logicalTime, logicalCounter),
+ messageId,
+ content,
+ status);
+ out.add(message);
+ checkpoint(DecoderState.READ_TYPE);
+ break;
+ default:
+ break;
+ }
+ break;
+ default:
+ checkState(false, "Must not be here");
}
}
@@ -135,4 +168,4 @@
log.error("Exception inside channel handling pipeline.", cause);
context.close();
}
-}
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/MessageEncoder.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/MessageEncoder.java
index 6ec50a3..b7513a0 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/MessageEncoder.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/MessageEncoder.java
@@ -15,9 +15,10 @@
*/
package org.onosproject.store.cluster.messaging.impl;
+import java.io.IOException;
+
import com.google.common.base.Charsets;
import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import org.onlab.packet.IpAddress;
@@ -26,34 +27,55 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-
/**
* Encode InternalMessage out into a byte buffer.
*/
-@Sharable
public class MessageEncoder extends MessageToByteEncoder<Object> {
// Effectively MessageToByteEncoder<InternalMessage>,
// had to specify <Object> to avoid Class Loader not being able to find some classes.
private final Logger log = LoggerFactory.getLogger(getClass());
+ private final Endpoint endpoint;
private final int preamble;
+ private boolean endpointWritten;
- public MessageEncoder(int preamble) {
+ public MessageEncoder(Endpoint endpoint, int preamble) {
super();
+ this.endpoint = endpoint;
this.preamble = preamble;
}
-
@Override
protected void encode(
ChannelHandlerContext context,
Object rawMessage,
ByteBuf out) throws Exception {
+ if (rawMessage instanceof InternalRequest) {
+ encodeRequest((InternalRequest) rawMessage, out);
+ } else if (rawMessage instanceof InternalReply) {
+ encodeReply((InternalReply) rawMessage, out);
+ }
+ }
- InternalMessage message = (InternalMessage) rawMessage;
+ private void encodeMessage(InternalMessage message, ByteBuf out) {
+ // If the endpoint hasn't been written to the channel, write it.
+ if (!endpointWritten) {
+ IpAddress senderIp = endpoint.host();
+ if (senderIp.version() == Version.INET) {
+ out.writeByte(0);
+ } else {
+ out.writeByte(1);
+ }
+ out.writeBytes(senderIp.toOctets());
+ // write sender port
+ out.writeInt(endpoint.port());
+
+ endpointWritten = true;
+ }
+
+ out.writeByte(message.type().id());
out.writeInt(this.preamble);
// write time
@@ -63,35 +85,6 @@
// write message id
out.writeLong(message.id());
- Endpoint sender = message.sender();
-
- IpAddress senderIp = sender.host();
- if (senderIp.version() == Version.INET) {
- out.writeByte(0);
- } else {
- out.writeByte(1);
- }
- out.writeBytes(senderIp.toOctets());
-
- // write sender port
- out.writeInt(sender.port());
-
- byte[] messageTypeBytes = message.type().getBytes(Charsets.UTF_8);
-
- // write length of message type
- out.writeShort(messageTypeBytes.length);
-
- // write message type bytes
- out.writeBytes(messageTypeBytes);
-
- // write message status value
- InternalMessage.Status status = message.status();
- if (status == null) {
- out.writeByte(-1);
- } else {
- out.writeByte(status.id());
- }
-
byte[] payload = message.payload();
// write payload length
@@ -101,6 +94,26 @@
out.writeBytes(payload);
}
+ private void encodeRequest(InternalRequest request, ByteBuf out) {
+ encodeMessage(request, out);
+
+ byte[] messageTypeBytes = request.subject().getBytes(Charsets.UTF_8);
+
+ // write length of message type
+ out.writeShort(messageTypeBytes.length);
+
+ // write message type bytes
+ out.writeBytes(messageTypeBytes);
+
+ }
+
+ private void encodeReply(InternalReply reply, ByteBuf out) {
+ encodeMessage(reply, out);
+
+ // write message status value
+ out.writeByte(reply.status().id());
+ }
+
@Override
public void exceptionCaught(ChannelHandlerContext context, Throwable cause) {
if (cause instanceof IOException) {
@@ -116,4 +129,4 @@
public final boolean acceptOutboundMessage(Object msg) throws Exception {
return msg instanceof InternalMessage;
}
-}
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
index d9ed9c3..f77fa69 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
@@ -90,7 +90,6 @@
import org.onosproject.store.cluster.messaging.Endpoint;
import org.onosproject.store.cluster.messaging.MessagingException;
import org.onosproject.store.cluster.messaging.MessagingService;
-import org.onosproject.store.cluster.messaging.impl.InternalMessage.Status;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -132,7 +131,7 @@
private Endpoint localEndpoint;
private int preamble;
private final AtomicBoolean started = new AtomicBoolean(false);
- private final Map<String, BiConsumer<InternalMessage, ServerConnection>> handlers = new ConcurrentHashMap<>();
+ private final Map<String, BiConsumer<InternalRequest, ServerConnection>> handlers = new ConcurrentHashMap<>();
private final Map<Channel, RemoteClientConnection> clientConnections = Maps.newConcurrentMap();
private final Map<Channel, RemoteServerConnection> serverConnections = Maps.newConcurrentMap();
private final AtomicLong messageIdGenerator = new AtomicLong(0);
@@ -296,7 +295,7 @@
@Override
public CompletableFuture<Void> sendAsync(Endpoint ep, String type, byte[] payload) {
checkPermission(CLUSTER_WRITE);
- InternalMessage message = new InternalMessage(preamble,
+ InternalRequest message = new InternalRequest(preamble,
clockService.timeNow(),
messageIdGenerator.incrementAndGet(),
localEndpoint,
@@ -315,7 +314,7 @@
public CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload, Executor executor) {
checkPermission(CLUSTER_WRITE);
Long messageId = messageIdGenerator.incrementAndGet();
- InternalMessage message = new InternalMessage(preamble,
+ InternalRequest message = new InternalRequest(preamble,
clockService.timeNow(),
messageId,
localEndpoint,
@@ -444,12 +443,12 @@
checkPermission(CLUSTER_WRITE);
handlers.put(type, (message, connection) -> executor.execute(() -> {
byte[] responsePayload = null;
- Status status = Status.OK;
+ InternalReply.Status status = InternalReply.Status.OK;
try {
responsePayload = handler.apply(message.sender(), message.payload());
} catch (Exception e) {
log.debug("An error occurred in a message handler: {}", e);
- status = Status.ERROR_HANDLER_EXCEPTION;
+ status = InternalReply.Status.ERROR_HANDLER_EXCEPTION;
}
connection.reply(message, status, Optional.ofNullable(responsePayload));
}));
@@ -460,12 +459,12 @@
checkPermission(CLUSTER_WRITE);
handlers.put(type, (message, connection) -> {
handler.apply(message.sender(), message.payload()).whenComplete((result, error) -> {
- Status status;
+ InternalReply.Status status;
if (error == null) {
- status = Status.OK;
+ status = InternalReply.Status.OK;
} else {
log.debug("An error occurred in a message handler: {}", error);
- status = Status.ERROR_HANDLER_EXCEPTION;
+ status = InternalReply.Status.ERROR_HANDLER_EXCEPTION;
}
connection.reply(message, status, Optional.ofNullable(result));
});
@@ -549,7 +548,6 @@
*/
private class SslServerCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
private final ChannelHandler dispatcher = new InboundMessageDispatcher();
- private final ChannelHandler encoder = new MessageEncoder(preamble);
@Override
protected void initChannel(SocketChannel channel) throws Exception {
@@ -565,7 +563,7 @@
serverSslEngine.setEnableSessionCreation(true);
channel.pipeline().addLast("ssl", new io.netty.handler.ssl.SslHandler(serverSslEngine))
- .addLast("encoder", encoder)
+ .addLast("encoder", new MessageEncoder(localEndpoint, preamble))
.addLast("decoder", new MessageDecoder())
.addLast("handler", dispatcher);
}
@@ -576,7 +574,6 @@
*/
private class SslClientCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
private final ChannelHandler dispatcher = new InboundMessageDispatcher();
- private final ChannelHandler encoder = new MessageEncoder(preamble);
@Override
protected void initChannel(SocketChannel channel) throws Exception {
@@ -591,7 +588,7 @@
clientSslEngine.setEnableSessionCreation(true);
channel.pipeline().addLast("ssl", new io.netty.handler.ssl.SslHandler(clientSslEngine))
- .addLast("encoder", encoder)
+ .addLast("encoder", new MessageEncoder(localEndpoint, preamble))
.addLast("decoder", new MessageDecoder())
.addLast("handler", dispatcher);
}
@@ -602,12 +599,11 @@
*/
private class BasicChannelInitializer extends ChannelInitializer<SocketChannel> {
private final ChannelHandler dispatcher = new InboundMessageDispatcher();
- private final ChannelHandler encoder = new MessageEncoder(preamble);
@Override
protected void initChannel(SocketChannel channel) throws Exception {
channel.pipeline()
- .addLast("encoder", encoder)
+ .addLast("encoder", new MessageEncoder(localEndpoint, preamble))
.addLast("decoder", new MessageDecoder())
.addLast("handler", dispatcher);
}
@@ -628,11 +624,11 @@
if (message.isRequest()) {
RemoteServerConnection connection =
serverConnections.computeIfAbsent(ctx.channel(), RemoteServerConnection::new);
- connection.dispatch(message);
+ connection.dispatch((InternalRequest) message);
} else {
RemoteClientConnection connection =
clientConnections.computeIfAbsent(ctx.channel(), RemoteClientConnection::new);
- connection.dispatch(message);
+ connection.dispatch((InternalReply) message);
}
} catch (RejectedExecutionException e) {
log.warn("Unable to dispatch message due to {}", e.getMessage());
@@ -702,7 +698,7 @@
* @param message the message to send
* @return a completable future to be completed once the message has been sent
*/
- CompletableFuture<Void> sendAsync(InternalMessage message);
+ CompletableFuture<Void> sendAsync(InternalRequest message);
/**
* Sends a message to the other side of the connection, awaiting a reply.
@@ -710,7 +706,7 @@
* @param message the message to send
* @return a completable future to be completed once a reply is received or the request times out
*/
- CompletableFuture<byte[]> sendAndReceive(InternalMessage message);
+ CompletableFuture<byte[]> sendAndReceive(InternalRequest message);
/**
* Closes the connection.
@@ -731,7 +727,7 @@
* @param status the reply status
* @param payload the response payload
*/
- void reply(InternalMessage message, Status status, Optional<byte[]> payload);
+ void reply(InternalRequest message, InternalReply.Status status, Optional<byte[]> payload);
/**
* Closes the connection.
@@ -745,8 +741,8 @@
*/
private final class LocalClientConnection implements ClientConnection {
@Override
- public CompletableFuture<Void> sendAsync(InternalMessage message) {
- BiConsumer<InternalMessage, ServerConnection> handler = handlers.get(message.type());
+ public CompletableFuture<Void> sendAsync(InternalRequest message) {
+ BiConsumer<InternalRequest, ServerConnection> handler = handlers.get(message.subject());
if (handler != null) {
handler.accept(message, localServerConnection);
} else {
@@ -756,14 +752,15 @@
}
@Override
- public CompletableFuture<byte[]> sendAndReceive(InternalMessage message) {
+ public CompletableFuture<byte[]> sendAndReceive(InternalRequest message) {
CompletableFuture<byte[]> future = new CompletableFuture<>();
- BiConsumer<InternalMessage, ServerConnection> handler = handlers.get(message.type());
+ BiConsumer<InternalRequest, ServerConnection> handler = handlers.get(message.subject());
if (handler != null) {
handler.accept(message, new LocalServerConnection(future));
} else {
log.debug("No handler for message type {} from {}", message.type(), message.sender());
- new LocalServerConnection(future).reply(message, Status.ERROR_NO_HANDLER, Optional.empty());
+ new LocalServerConnection(future)
+ .reply(message, InternalReply.Status.ERROR_NO_HANDLER, Optional.empty());
}
return future;
}
@@ -780,15 +777,15 @@
}
@Override
- public void reply(InternalMessage message, Status status, Optional<byte[]> payload) {
+ public void reply(InternalRequest message, InternalReply.Status status, Optional<byte[]> payload) {
if (future != null) {
- if (status == Status.OK) {
+ if (status == InternalReply.Status.OK) {
future.complete(payload.orElse(EMPTY_PAYLOAD));
- } else if (status == Status.ERROR_NO_HANDLER) {
+ } else if (status == InternalReply.Status.ERROR_NO_HANDLER) {
future.completeExceptionally(new MessagingException.NoRemoteHandler());
- } else if (status == Status.ERROR_HANDLER_EXCEPTION) {
+ } else if (status == InternalReply.Status.ERROR_HANDLER_EXCEPTION) {
future.completeExceptionally(new MessagingException.RemoteHandlerFailure());
- } else if (status == Status.PROTOCOL_EXCEPTION) {
+ } else if (status == InternalReply.Status.PROTOCOL_EXCEPTION) {
future.completeExceptionally(new MessagingException.ProtocolException());
}
}
@@ -844,7 +841,7 @@
}
@Override
- public CompletableFuture<Void> sendAsync(InternalMessage message) {
+ public CompletableFuture<Void> sendAsync(InternalRequest message) {
CompletableFuture<Void> future = new CompletableFuture<>();
channel.writeAndFlush(message).addListener(channelFuture -> {
if (!channelFuture.isSuccess()) {
@@ -857,9 +854,9 @@
}
@Override
- public CompletableFuture<byte[]> sendAndReceive(InternalMessage message) {
+ public CompletableFuture<byte[]> sendAndReceive(InternalRequest message) {
CompletableFuture<byte[]> future = new CompletableFuture<>();
- Callback callback = new Callback(message.type(), future);
+ Callback callback = new Callback(message.subject(), future);
futures.put(message.id(), callback);
channel.writeAndFlush(message).addListener(channelFuture -> {
if (!channelFuture.isSuccess()) {
@@ -875,9 +872,9 @@
*
* @param message the message to dispatch
*/
- private void dispatch(InternalMessage message) {
+ private void dispatch(InternalReply message) {
if (message.preamble() != preamble) {
- log.debug("Received {} with invalid preamble from {}", message.type(), message.sender());
+ log.debug("Received {} with invalid preamble", message.type());
return;
}
@@ -885,13 +882,13 @@
Callback callback = futures.remove(message.id());
if (callback != null) {
- if (message.status() == Status.OK) {
+ if (message.status() == InternalReply.Status.OK) {
callback.complete(message.payload());
- } else if (message.status() == Status.ERROR_NO_HANDLER) {
+ } else if (message.status() == InternalReply.Status.ERROR_NO_HANDLER) {
callback.completeExceptionally(new MessagingException.NoRemoteHandler());
- } else if (message.status() == Status.ERROR_HANDLER_EXCEPTION) {
+ } else if (message.status() == InternalReply.Status.ERROR_HANDLER_EXCEPTION) {
callback.completeExceptionally(new MessagingException.RemoteHandlerFailure());
- } else if (message.status() == Status.PROTOCOL_EXCEPTION) {
+ } else if (message.status() == InternalReply.Status.PROTOCOL_EXCEPTION) {
callback.completeExceptionally(new MessagingException.ProtocolException());
}
@@ -902,9 +899,9 @@
throw new AssertionError();
}
} else {
- log.debug("Received a reply for message id:[{}]. "
- + " from {}. But was unable to locate the"
- + " request handle", message.id(), message.sender());
+ log.debug("Received a reply for message id:[{}] "
+ + "but was unable to locate the"
+ + " request handle", message.id());
}
}
@@ -934,30 +931,29 @@
*
* @param message the message to dispatch
*/
- private void dispatch(InternalMessage message) {
+ private void dispatch(InternalRequest message) {
if (message.preamble() != preamble) {
log.debug("Received {} with invalid preamble from {}", message.type(), message.sender());
- reply(message, Status.PROTOCOL_EXCEPTION, Optional.empty());
+ reply(message, InternalReply.Status.PROTOCOL_EXCEPTION, Optional.empty());
return;
}
clockService.recordEventTime(message.time());
- BiConsumer<InternalMessage, ServerConnection> handler = handlers.get(message.type());
+ BiConsumer<InternalRequest, ServerConnection> handler = handlers.get(message.subject());
if (handler != null) {
handler.accept(message, this);
} else {
log.debug("No handler for message type {} from {}", message.type(), message.sender());
- reply(message, Status.ERROR_NO_HANDLER, Optional.empty());
+ reply(message, InternalReply.Status.ERROR_NO_HANDLER, Optional.empty());
}
}
@Override
- public void reply(InternalMessage message, Status status, Optional<byte[]> payload) {
- InternalMessage response = new InternalMessage(preamble,
+ public void reply(InternalRequest message, InternalReply.Status status, Optional<byte[]> payload) {
+ InternalReply response = new InternalReply(preamble,
clockService.timeNow(),
message.id(),
- localEndpoint,
payload.orElse(EMPTY_PAYLOAD),
status);
channel.writeAndFlush(response);