[Falcon] Adds a status field to InternalMessage and support for replying with appropriate status when handler errors occur
Change-Id: I995bdd6c67b88b6d7729887d32083315213fb79f
diff --git a/core/api/src/main/java/org/onosproject/store/cluster/messaging/MessagingException.java b/core/api/src/main/java/org/onosproject/store/cluster/messaging/MessagingException.java
new file mode 100644
index 0000000..e71ac10
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/store/cluster/messaging/MessagingException.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright 2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.cluster.messaging;
+
+import java.io.IOException;
+
+/**
+ * Top level exception for MessagingService failures.
+ */
+@SuppressWarnings("serial")
+public class MessagingException extends IOException {
+
+ public MessagingException() {
+ }
+
+ public MessagingException(String message) {
+ super(message);
+ }
+
+ public MessagingException(String message, Throwable t) {
+ super(message, t);
+ }
+
+ public MessagingException(Throwable t) {
+ super(t);
+ }
+
+ /**
+ * Exception indicating no remote registered remote handler.
+ */
+ public static class NoRemoteHandler extends MessagingException {
+ }
+
+ /**
+ * Exception indicating handler failure.
+ */
+ public static class RemoteHandlerFailure extends MessagingException {
+ }
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/DecoderState.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/DecoderState.java
index e113a3f..608b4e0 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/DecoderState.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/DecoderState.java
@@ -26,6 +26,7 @@
READ_SENDER_PORT,
READ_MESSAGE_TYPE_LENGTH,
READ_MESSAGE_TYPE,
+ READ_MESSAGE_STATUS,
READ_CONTENT_LENGTH,
READ_CONTENT
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/InternalMessage.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/InternalMessage.java
index 9deec66..e02ecc8 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/InternalMessage.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/InternalMessage.java
@@ -25,16 +25,46 @@
*/
public final class InternalMessage {
+ /**
+ * Message status.
+ */
+ public enum Status {
+ /**
+ * All ok.
+ */
+ OK,
+
+ /**
+ * Response status signifying no registered handler.
+ */
+ ERROR_NO_HANDLER,
+
+ /**
+ * Response status signifying an exception handling the message.
+ */
+ ERROR_HANDLER_EXCEPTION
+
+ // NOTE: For backwards compatibility it important that new enum constants
+ // be appended.
+ // FIXME: We should remove this restriction in the future.
+ }
+
private final long id;
private final Endpoint sender;
private final String type;
private final byte[] payload;
+ private final Status status;
public InternalMessage(long id, Endpoint sender, String type, byte[] payload) {
+ this(id, sender, type, payload, Status.OK);
+ }
+
+ public InternalMessage(long id, Endpoint sender, String type, byte[] payload, Status status) {
this.id = id;
this.sender = sender;
this.type = type;
this.payload = payload;
+ this.status = status;
}
public long id() {
@@ -53,12 +83,17 @@
return payload;
}
+ public Status status() {
+ return status;
+ }
+
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("id", id)
.add("type", type)
.add("sender", sender)
+ .add("status", status)
.add("payload", ByteArraySizeHashPrinter.of(payload))
.toString();
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/MessageDecoder.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/MessageDecoder.java
index 149b706..aea3e29 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/MessageDecoder.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/MessageDecoder.java
@@ -16,12 +16,15 @@
package org.onosproject.store.cluster.messaging.impl;
import com.google.common.base.Charsets;
+
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ReplayingDecoder;
+
import org.onlab.packet.IpAddress;
import org.onlab.packet.IpAddress.Version;
import org.onosproject.store.cluster.messaging.Endpoint;
+import org.onosproject.store.cluster.messaging.impl.InternalMessage.Status;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,6 +47,7 @@
private int senderPort;
private int messageTypeLength;
private String messageType;
+ private Status status;
private int contentLength;
public MessageDecoder(int correctPreamble) {
@@ -86,18 +90,27 @@
byte[] messageTypeBytes = new byte[messageTypeLength];
buffer.readBytes(messageTypeBytes);
messageType = new String(messageTypeBytes, Charsets.UTF_8);
+ checkpoint(DecoderState.READ_MESSAGE_STATUS);
+ case READ_MESSAGE_STATUS:
+ status = Status.values()[buffer.readInt()];
checkpoint(DecoderState.READ_CONTENT_LENGTH);
case READ_CONTENT_LENGTH:
contentLength = buffer.readInt();
checkpoint(DecoderState.READ_CONTENT);
case READ_CONTENT:
- //TODO Perform a sanity check on the size before allocating
- byte[] payload = new byte[contentLength];
- buffer.readBytes(payload);
+ byte[] payload;
+ if (contentLength > 0) {
+ //TODO Perform a sanity check on the size before allocating
+ payload = new byte[contentLength];
+ buffer.readBytes(payload);
+ } else {
+ payload = new byte[0];
+ }
InternalMessage message = new InternalMessage(messageId,
new Endpoint(senderIp, senderPort),
messageType,
- payload);
+ payload,
+ status);
out.add(message);
checkpoint(DecoderState.READ_MESSAGE_PREAMBLE);
break;
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/MessageEncoder.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/MessageEncoder.java
index 48c75dd..4611554 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/MessageEncoder.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/MessageEncoder.java
@@ -75,6 +75,9 @@
// write message type bytes
out.writeBytes(messageTypeBytes);
+ // write message status value
+ out.writeInt(message.status().ordinal());
+
byte[] payload = message.payload();
// write payload length
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
index 72ba2ea..2f883e1 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
@@ -16,12 +16,12 @@
package org.onosproject.store.cluster.messaging.impl;
import com.google.common.base.Strings;
-
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import com.google.common.util.concurrent.MoreExecutors;
+
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
@@ -41,6 +41,7 @@
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
+
import org.apache.commons.pool.KeyedPoolableObjectFactory;
import org.apache.commons.pool.impl.GenericKeyedObjectPool;
import org.apache.felix.scr.annotations.Activate;
@@ -53,7 +54,9 @@
import org.onosproject.cluster.ClusterMetadataService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.store.cluster.messaging.Endpoint;
+import org.onosproject.store.cluster.messaging.MessagingException;
import org.onosproject.store.cluster.messaging.MessagingService;
+import org.onosproject.store.cluster.messaging.impl.InternalMessage.Status;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -61,10 +64,12 @@
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.TrustManagerFactory;
+
import java.io.FileInputStream;
import java.io.IOException;
import java.security.KeyStore;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
@@ -267,18 +272,14 @@
@Override
public void registerHandler(String type, BiFunction<Endpoint, byte[], byte[]> handler, Executor executor) {
handlers.put(type, message -> executor.execute(() -> {
- byte[] responsePayload = handler.apply(message.sender(), message.payload());
- if (responsePayload != null) {
- InternalMessage response = new InternalMessage(message.id(),
- localEp,
- REPLY_MESSAGE_TYPE,
- responsePayload);
- sendAsync(message.sender(), response).whenComplete((result, error) -> {
- if (error != null) {
- log.debug("Failed to respond", error);
- }
- });
+ byte[] responsePayload = null;
+ Status status = Status.OK;
+ try {
+ responsePayload = handler.apply(message.sender(), message.payload());
+ } catch (Exception e) {
+ status = Status.ERROR_HANDLER_EXCEPTION;
}
+ sendReply(message, status, Optional.ofNullable(responsePayload));
}));
}
@@ -286,17 +287,8 @@
public void registerHandler(String type, BiFunction<Endpoint, byte[], CompletableFuture<byte[]>> handler) {
handlers.put(type, message -> {
handler.apply(message.sender(), message.payload()).whenComplete((result, error) -> {
- if (error == null) {
- InternalMessage response = new InternalMessage(message.id(),
- localEp,
- REPLY_MESSAGE_TYPE,
- result);
- sendAsync(message.sender(), response).whenComplete((r, e) -> {
- if (e != null) {
- log.debug("Failed to respond", e);
- }
- });
- }
+ Status status = error == null ? Status.OK : Status.ERROR_HANDLER_EXCEPTION;
+ sendReply(message, status, Optional.ofNullable(result));
});
});
}
@@ -500,9 +492,15 @@
Callback callback =
callbacks.getIfPresent(message.id());
if (callback != null) {
- callback.complete(message.payload());
+ if (message.status() == Status.OK) {
+ callback.complete(message.payload());
+ } else if (message.status() == Status.ERROR_NO_HANDLER) {
+ callback.completeExceptionally(new MessagingException.NoRemoteHandler());
+ } else if (message.status() == Status.ERROR_HANDLER_EXCEPTION) {
+ callback.completeExceptionally(new MessagingException.RemoteHandlerFailure());
+ }
} else {
- log.warn("Received a reply for message id:[{}]. "
+ log.debug("Received a reply for message id:[{}]. "
+ " from {}. But was unable to locate the"
+ " request handle", message.id(), message.sender());
}
@@ -515,10 +513,24 @@
if (handler != null) {
handler.accept(message);
} else {
- log.debug("No handler registered for {}", type);
+ log.debug("No handler for message type {}", message.type(), message.sender());
+ sendReply(message, Status.ERROR_NO_HANDLER, Optional.empty());
}
}
+ private void sendReply(InternalMessage message, Status status, Optional<byte[]> responsePayload) {
+ InternalMessage response = new InternalMessage(message.id(),
+ localEp,
+ REPLY_MESSAGE_TYPE,
+ responsePayload.orElse(new byte[0]),
+ status);
+ sendAsync(message.sender(), response).whenComplete((result, error) -> {
+ if (error != null) {
+ log.debug("Failed to respond", error);
+ }
+ });
+ }
+
private final class Callback {
private final CompletableFuture<byte[]> future;
private final Executor executor;