[Falcon] Adds a status field to InternalMessage and support for replying with appropriate status when handler errors occur
Change-Id: I995bdd6c67b88b6d7729887d32083315213fb79f
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
index 72ba2ea..2f883e1 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
@@ -16,12 +16,12 @@
package org.onosproject.store.cluster.messaging.impl;
import com.google.common.base.Strings;
-
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import com.google.common.util.concurrent.MoreExecutors;
+
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
@@ -41,6 +41,7 @@
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
+
import org.apache.commons.pool.KeyedPoolableObjectFactory;
import org.apache.commons.pool.impl.GenericKeyedObjectPool;
import org.apache.felix.scr.annotations.Activate;
@@ -53,7 +54,9 @@
import org.onosproject.cluster.ClusterMetadataService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.store.cluster.messaging.Endpoint;
+import org.onosproject.store.cluster.messaging.MessagingException;
import org.onosproject.store.cluster.messaging.MessagingService;
+import org.onosproject.store.cluster.messaging.impl.InternalMessage.Status;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -61,10 +64,12 @@
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.TrustManagerFactory;
+
import java.io.FileInputStream;
import java.io.IOException;
import java.security.KeyStore;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
@@ -267,18 +272,18 @@
@Override
public void registerHandler(String type, BiFunction<Endpoint, byte[], byte[]> handler, Executor executor) {
handlers.put(type, message -> executor.execute(() -> {
- byte[] responsePayload = handler.apply(message.sender(), message.payload());
- if (responsePayload != null) {
- InternalMessage response = new InternalMessage(message.id(),
- localEp,
- REPLY_MESSAGE_TYPE,
- responsePayload);
- sendAsync(message.sender(), response).whenComplete((result, error) -> {
- if (error != null) {
- log.debug("Failed to respond", error);
- }
- });
+ // Always answer the sender: Status.OK with the handler's result, or
+ // Status.ERROR_HANDLER_EXCEPTION (and no result) when the handler throws.
+ byte[] responsePayload = null;
+ Status status = Status.OK;
+ try {
+ responsePayload = handler.apply(message.sender(), message.payload());
+ } catch (Exception e) {
+ // Log locally as well; the reply carries only the status, not the cause.
+ log.debug("Message handler for type {} threw an exception", type, e);
+ status = Status.ERROR_HANDLER_EXCEPTION;
}
+ sendReply(message, status, Optional.ofNullable(responsePayload));
}));
}
@@ -286,17 +287,14 @@
public void registerHandler(String type, BiFunction<Endpoint, byte[], CompletableFuture<byte[]>> handler) {
handlers.put(type, message -> {
handler.apply(message.sender(), message.payload()).whenComplete((result, error) -> {
- if (error == null) {
- InternalMessage response = new InternalMessage(message.id(),
- localEp,
- REPLY_MESSAGE_TYPE,
- result);
- sendAsync(message.sender(), response).whenComplete((r, e) -> {
- if (e != null) {
- log.debug("Failed to respond", e);
- }
- });
- }
+ // Reply on both outcomes of the handler's future so a failure reaches the sender as a status.
+ Status status = Status.OK;
+ if (error != null) {
+ // Log locally as well; the reply carries only the status, not the cause.
+ log.debug("Message handler for type {} completed exceptionally", type, error);
+ status = Status.ERROR_HANDLER_EXCEPTION;
+ }
+ sendReply(message, status, Optional.ofNullable(result));
});
});
}
@@ -500,9 +492,18 @@
Callback callback =
callbacks.getIfPresent(message.id());
if (callback != null) {
- callback.complete(message.payload());
+ // Turn the reply's status into the outcome of the request it answers.
+ if (message.status() == Status.OK) {
+ callback.complete(message.payload());
+ } else if (message.status() == Status.ERROR_NO_HANDLER) {
+ callback.completeExceptionally(new MessagingException.NoRemoteHandler());
+ } else {
+ // ERROR_HANDLER_EXCEPTION, plus any status not matched above, so that
+ // the callback is completed on every path through this branch.
+ callback.completeExceptionally(new MessagingException.RemoteHandlerFailure());
+ }
} else {
- log.warn("Received a reply for message id:[{}]. "
+ log.debug("Received a reply for message id:[{}]. "
+ " from {}. But was unable to locate the"
+ " request handle", message.id(), message.sender());
}
@@ -515,10 +513,25 @@
if (handler != null) {
handler.accept(message);
} else {
- log.debug("No handler registered for {}", type);
+ // No handler is registered for this type: tell the sender instead of dropping the message.
+ log.debug("No handler for message type {} from {}", message.type(), message.sender());
+ sendReply(message, Status.ERROR_NO_HANDLER, Optional.empty());
}
}
+ /** Sends a REPLY_MESSAGE_TYPE message carrying {@code status} to the sender of {@code message}. */
+ private void sendReply(InternalMessage message, Status status, Optional<byte[]> responsePayload) {
+ // The reply reuses the request's id; an absent payload is sent as an empty array.
+ byte[] body = responsePayload.orElseGet(() -> new byte[0]);
+ InternalMessage reply = new InternalMessage(message.id(), localEp, REPLY_MESSAGE_TYPE, body, status);
+ // A failed send is only logged at debug level; nothing is retried or rethrown here.
+ sendAsync(message.sender(), reply).whenComplete((ignored, failure) -> {
+ if (failure != null) {
+ log.debug("Failed to respond", failure);
+ }
+ });
+ }
+
private final class Callback {
private final CompletableFuture<byte[]> future;
private final Executor executor;