[Falcon] Adds a status field to InternalMessage and support for replying with appropriate status when handler errors occur

Change-Id: I995bdd6c67b88b6d7729887d32083315213fb79f
diff --git a/core/api/src/main/java/org/onosproject/store/cluster/messaging/MessagingException.java b/core/api/src/main/java/org/onosproject/store/cluster/messaging/MessagingException.java
new file mode 100644
index 0000000..e71ac10
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/store/cluster/messaging/MessagingException.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright 2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.cluster.messaging;
+
+import java.io.IOException;
+
+/**
+ * Top level exception for MessagingService failures.
+ */
+@SuppressWarnings("serial")
+public class MessagingException extends IOException {
+
+    public MessagingException() {
+    }
+
+    public MessagingException(String message) {
+        super(message);
+    }
+
+    public MessagingException(String message, Throwable t) {
+        super(message, t);
+    }
+
+    public MessagingException(Throwable t) {
+        super(t);
+    }
+
+    /**
+     * Exception indicating no remote registered remote handler.
+     */
+    public static class NoRemoteHandler extends MessagingException {
+    }
+
+    /**
+     * Exception indicating handler failure.
+     */
+    public static class RemoteHandlerFailure extends MessagingException {
+    }
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/DecoderState.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/DecoderState.java
index e113a3f..608b4e0 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/DecoderState.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/DecoderState.java
@@ -26,6 +26,7 @@
     READ_SENDER_PORT,
     READ_MESSAGE_TYPE_LENGTH,
     READ_MESSAGE_TYPE,
+    READ_MESSAGE_STATUS,
     READ_CONTENT_LENGTH,
     READ_CONTENT
 }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/InternalMessage.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/InternalMessage.java
index 9deec66..e02ecc8 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/InternalMessage.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/InternalMessage.java
@@ -25,16 +25,46 @@
  */
 public final class InternalMessage {
 
+    /**
+     * Message status.
+     */
+    public enum Status {
+        /**
+         * All ok.
+         */
+        OK,
+
+        /**
+         * Response status signifying no registered handler.
+         */
+        ERROR_NO_HANDLER,
+
+        /**
+         * Response status signifying an exception handling the message.
+         */
+        ERROR_HANDLER_EXCEPTION
+
+        // NOTE: For backwards compatibility it important that new enum constants
+        // be appended.
+        // FIXME: We should remove this restriction in the future.
+    }
+
     private final long id;
     private final Endpoint sender;
     private final String type;
     private final byte[] payload;
+    private final Status status;
 
     public InternalMessage(long id, Endpoint sender, String type, byte[] payload) {
+        this(id, sender, type, payload, Status.OK);
+    }
+
+    public InternalMessage(long id, Endpoint sender, String type, byte[] payload, Status status) {
         this.id = id;
         this.sender = sender;
         this.type = type;
         this.payload = payload;
+        this.status = status;
     }
 
     public long id() {
@@ -53,12 +83,17 @@
         return payload;
     }
 
+    public Status status() {
+        return status;
+    }
+
     @Override
     public String toString() {
         return MoreObjects.toStringHelper(this)
                 .add("id", id)
                 .add("type", type)
                 .add("sender", sender)
+                .add("status", status)
                 .add("payload", ByteArraySizeHashPrinter.of(payload))
                 .toString();
     }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/MessageDecoder.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/MessageDecoder.java
index 149b706..aea3e29 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/MessageDecoder.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/MessageDecoder.java
@@ -16,12 +16,15 @@
 package org.onosproject.store.cluster.messaging.impl;
 
 import com.google.common.base.Charsets;
+
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.codec.ReplayingDecoder;
+
 import org.onlab.packet.IpAddress;
 import org.onlab.packet.IpAddress.Version;
 import org.onosproject.store.cluster.messaging.Endpoint;
+import org.onosproject.store.cluster.messaging.impl.InternalMessage.Status;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,6 +47,7 @@
     private int senderPort;
     private int messageTypeLength;
     private String messageType;
+    private Status status;
     private int contentLength;
 
     public MessageDecoder(int correctPreamble) {
@@ -86,18 +90,27 @@
             byte[] messageTypeBytes = new byte[messageTypeLength];
             buffer.readBytes(messageTypeBytes);
             messageType = new String(messageTypeBytes, Charsets.UTF_8);
+            checkpoint(DecoderState.READ_MESSAGE_STATUS);
+        case READ_MESSAGE_STATUS:
+            status = Status.values()[buffer.readInt()];
             checkpoint(DecoderState.READ_CONTENT_LENGTH);
         case READ_CONTENT_LENGTH:
             contentLength = buffer.readInt();
             checkpoint(DecoderState.READ_CONTENT);
         case READ_CONTENT:
-            //TODO Perform a sanity check on the size before allocating
-            byte[] payload = new byte[contentLength];
-            buffer.readBytes(payload);
+            byte[] payload;
+            if (contentLength > 0) {
+                //TODO Perform a sanity check on the size before allocating
+                payload = new byte[contentLength];
+                buffer.readBytes(payload);
+            } else {
+                payload = new byte[0];
+            }
             InternalMessage message = new InternalMessage(messageId,
                                                           new Endpoint(senderIp, senderPort),
                                                           messageType,
-                                                          payload);
+                                                          payload,
+                                                          status);
             out.add(message);
             checkpoint(DecoderState.READ_MESSAGE_PREAMBLE);
             break;
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/MessageEncoder.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/MessageEncoder.java
index 48c75dd..4611554 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/MessageEncoder.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/MessageEncoder.java
@@ -75,6 +75,9 @@
         // write message type bytes
         out.writeBytes(messageTypeBytes);
 
+        // write message status value
+        out.writeInt(message.status().ordinal());
+
         byte[] payload = message.payload();
 
         // write payload length
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
index 72ba2ea..2f883e1 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
@@ -16,12 +16,12 @@
 package org.onosproject.store.cluster.messaging.impl;
 
 import com.google.common.base.Strings;
-
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.RemovalListener;
 import com.google.common.cache.RemovalNotification;
 import com.google.common.util.concurrent.MoreExecutors;
+
 import io.netty.bootstrap.Bootstrap;
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.buffer.PooledByteBufAllocator;
@@ -41,6 +41,7 @@
 import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
 import io.netty.channel.socket.nio.NioSocketChannel;
+
 import org.apache.commons.pool.KeyedPoolableObjectFactory;
 import org.apache.commons.pool.impl.GenericKeyedObjectPool;
 import org.apache.felix.scr.annotations.Activate;
@@ -53,7 +54,9 @@
 import org.onosproject.cluster.ClusterMetadataService;
 import org.onosproject.cluster.ControllerNode;
 import org.onosproject.store.cluster.messaging.Endpoint;
+import org.onosproject.store.cluster.messaging.MessagingException;
 import org.onosproject.store.cluster.messaging.MessagingService;
+import org.onosproject.store.cluster.messaging.impl.InternalMessage.Status;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -61,10 +64,12 @@
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLEngine;
 import javax.net.ssl.TrustManagerFactory;
+
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.security.KeyStore;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
@@ -267,18 +272,14 @@
     @Override
     public void registerHandler(String type, BiFunction<Endpoint, byte[], byte[]> handler, Executor executor) {
         handlers.put(type, message -> executor.execute(() -> {
-            byte[] responsePayload = handler.apply(message.sender(), message.payload());
-            if (responsePayload != null) {
-                InternalMessage response = new InternalMessage(message.id(),
-                                                               localEp,
-                                                               REPLY_MESSAGE_TYPE,
-                                                               responsePayload);
-                sendAsync(message.sender(), response).whenComplete((result, error) -> {
-                    if (error != null) {
-                        log.debug("Failed to respond", error);
-                    }
-                });
+            byte[] responsePayload = null;
+            Status status = Status.OK;
+            try {
+                responsePayload = handler.apply(message.sender(), message.payload());
+            } catch (Exception e) {
+                status = Status.ERROR_HANDLER_EXCEPTION;
             }
+            sendReply(message, status, Optional.ofNullable(responsePayload));
         }));
     }
 
@@ -286,17 +287,8 @@
     public void registerHandler(String type, BiFunction<Endpoint, byte[], CompletableFuture<byte[]>> handler) {
         handlers.put(type, message -> {
             handler.apply(message.sender(), message.payload()).whenComplete((result, error) -> {
-                if (error == null) {
-                    InternalMessage response = new InternalMessage(message.id(),
-                                                                   localEp,
-                                                                   REPLY_MESSAGE_TYPE,
-                                                                   result);
-                    sendAsync(message.sender(), response).whenComplete((r, e) -> {
-                        if (e != null) {
-                            log.debug("Failed to respond", e);
-                        }
-                    });
-                }
+                Status status = error == null ? Status.OK : Status.ERROR_HANDLER_EXCEPTION;
+                sendReply(message, status, Optional.ofNullable(result));
             });
         });
     }
@@ -500,9 +492,15 @@
                 Callback callback =
                         callbacks.getIfPresent(message.id());
                 if (callback != null) {
-                    callback.complete(message.payload());
+                    if (message.status() == Status.OK) {
+                        callback.complete(message.payload());
+                    } else if (message.status() == Status.ERROR_NO_HANDLER) {
+                        callback.completeExceptionally(new MessagingException.NoRemoteHandler());
+                    } else if (message.status() == Status.ERROR_HANDLER_EXCEPTION) {
+                        callback.completeExceptionally(new MessagingException.RemoteHandlerFailure());
+                    }
                 } else {
-                    log.warn("Received a reply for message id:[{}]. "
+                    log.debug("Received a reply for message id:[{}]. "
                                      + " from {}. But was unable to locate the"
                                      + " request handle", message.id(), message.sender());
                 }
@@ -515,10 +513,24 @@
         if (handler != null) {
             handler.accept(message);
         } else {
-            log.debug("No handler registered for {}", type);
+            log.debug("No handler for message type {}", message.type(), message.sender());
+            sendReply(message, Status.ERROR_NO_HANDLER, Optional.empty());
         }
     }
 
+    private void sendReply(InternalMessage message, Status status, Optional<byte[]> responsePayload) {
+        InternalMessage response = new InternalMessage(message.id(),
+                localEp,
+                REPLY_MESSAGE_TYPE,
+                responsePayload.orElse(new byte[0]),
+                status);
+        sendAsync(message.sender(), response).whenComplete((result, error) -> {
+            if (error != null) {
+                log.debug("Failed to respond", error);
+            }
+        });
+    }
+
     private final class Callback {
         private final CompletableFuture<byte[]> future;
         private final Executor executor;