[Falcon] Adds a status field to InternalMessage and support for replying with appropriate status when handler errors occur

Change-Id: I995bdd6c67b88b6d7729887d32083315213fb79f
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
index 72ba2ea..2f883e1 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
@@ -16,12 +16,12 @@
 package org.onosproject.store.cluster.messaging.impl;
 
 import com.google.common.base.Strings;
-
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.RemovalListener;
 import com.google.common.cache.RemovalNotification;
 import com.google.common.util.concurrent.MoreExecutors;
+
 import io.netty.bootstrap.Bootstrap;
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.buffer.PooledByteBufAllocator;
@@ -41,6 +41,7 @@
 import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
 import io.netty.channel.socket.nio.NioSocketChannel;
+
 import org.apache.commons.pool.KeyedPoolableObjectFactory;
 import org.apache.commons.pool.impl.GenericKeyedObjectPool;
 import org.apache.felix.scr.annotations.Activate;
@@ -53,7 +54,9 @@
 import org.onosproject.cluster.ClusterMetadataService;
 import org.onosproject.cluster.ControllerNode;
 import org.onosproject.store.cluster.messaging.Endpoint;
+import org.onosproject.store.cluster.messaging.MessagingException;
 import org.onosproject.store.cluster.messaging.MessagingService;
+import org.onosproject.store.cluster.messaging.impl.InternalMessage.Status;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -61,10 +64,12 @@
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLEngine;
 import javax.net.ssl.TrustManagerFactory;
+
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.security.KeyStore;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
@@ -267,18 +272,17 @@
     @Override
     public void registerHandler(String type, BiFunction<Endpoint, byte[], byte[]> handler, Executor executor) {
         handlers.put(type, message -> executor.execute(() -> {
-            byte[] responsePayload = handler.apply(message.sender(), message.payload());
-            if (responsePayload != null) {
-                InternalMessage response = new InternalMessage(message.id(),
-                                                               localEp,
-                                                               REPLY_MESSAGE_TYPE,
-                                                               responsePayload);
-                sendAsync(message.sender(), response).whenComplete((result, error) -> {
-                    if (error != null) {
-                        log.debug("Failed to respond", error);
-                    }
-                });
+            // Always reply, even on failure, so the requester is told the outcome via the status.
+            byte[] responsePayload = null;
+            Status status = Status.OK;
+            try {
+                responsePayload = handler.apply(message.sender(), message.payload());
+            } catch (Exception e) {
+                // The requester only receives a status code; log here so the cause is not lost.
+                log.warn("Handler for message type {} threw an exception", type, e);
+                status = Status.ERROR_HANDLER_EXCEPTION;
             }
+            sendReply(message, status, Optional.ofNullable(responsePayload));
         }));
     }
 
@@ -286,17 +287,14 @@
     public void registerHandler(String type, BiFunction<Endpoint, byte[], CompletableFuture<byte[]>> handler) {
         handlers.put(type, message -> {
             handler.apply(message.sender(), message.payload()).whenComplete((result, error) -> {
-                if (error == null) {
-                    InternalMessage response = new InternalMessage(message.id(),
-                                                                   localEp,
-                                                                   REPLY_MESSAGE_TYPE,
-                                                                   result);
-                    sendAsync(message.sender(), response).whenComplete((r, e) -> {
-                        if (e != null) {
-                            log.debug("Failed to respond", e);
-                        }
-                    });
-                }
+                // Reply in both cases; a failed future is reported to the requester through the status.
+                Status status = Status.OK;
+                if (error != null) {
+                    // Only the status travels back to the requester, so record the cause locally.
+                    log.warn("Handler for message type {} completed exceptionally", type, error);
+                    status = Status.ERROR_HANDLER_EXCEPTION;
+                }
+                sendReply(message, status, Optional.ofNullable(result));
             });
         });
     }
@@ -500,9 +492,15 @@
                 Callback callback =
                         callbacks.getIfPresent(message.id());
                 if (callback != null) {
-                    callback.complete(message.payload());
+                    if (message.status() == Status.OK) {
+                        callback.complete(message.payload());
+                    } else if (message.status() == Status.ERROR_NO_HANDLER) {
+                        callback.completeExceptionally(new MessagingException.NoRemoteHandler());
+                    } else if (message.status() == Status.ERROR_HANDLER_EXCEPTION) {
+                        callback.completeExceptionally(new MessagingException.RemoteHandlerFailure());
+                    }
                 } else {
-                    log.warn("Received a reply for message id:[{}]. "
+                    log.debug("Received a reply for message id:[{}]. "
                                      + " from {}. But was unable to locate the"
                                      + " request handle", message.id(), message.sender());
                 }
@@ -515,10 +513,33 @@
         if (handler != null) {
             handler.accept(message);
         } else {
-            log.debug("No handler registered for {}", type);
+            log.debug("No handler for message type {} from {}", message.type(), message.sender());
+            sendReply(message, Status.ERROR_NO_HANDLER, Optional.empty());
         }
     }
 
+    /**
+     * Sends a reply for the given request back to its sender.
+     * <p>
+     * A failure to send the reply is only logged at debug level.
+     *
+     * @param message request being answered; its id and sender are reused for the reply
+     * @param status outcome to report to the requester
+     * @param responsePayload reply payload; an empty byte array is sent when absent
+     */
+    private void sendReply(InternalMessage message, Status status, Optional<byte[]> responsePayload) {
+        InternalMessage response = new InternalMessage(message.id(),
+                localEp,
+                REPLY_MESSAGE_TYPE,
+                responsePayload.orElse(new byte[0]),
+                status);
+        sendAsync(message.sender(), response).whenComplete((result, error) -> {
+            if (error != null) {
+                log.debug("Failed to respond", error);
+            }
+        });
+    }
+
     private final class Callback {
         private final CompletableFuture<byte[]> future;
         private final Executor executor;