Prevent serialization errors from causing recursion in the Copycat transport

Change-Id: I0a1b0737d6cda3d7ab63bb26a7547d2f9124a434
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
index 4ad5f09..339695f 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
@@ -302,6 +302,7 @@
             try {
                 responsePayload = handler.apply(message.sender(), message.payload());
             } catch (Exception e) {
+                log.debug("An error occurred in a message handler: {}", e);
                 status = Status.ERROR_HANDLER_EXCEPTION;
             }
             sendReply(message, status, Optional.ofNullable(responsePayload));
@@ -313,7 +314,13 @@
         checkPermission(CLUSTER_WRITE);
         handlers.put(type, message -> {
             handler.apply(message.sender(), message.payload()).whenComplete((result, error) -> {
-                Status status = error == null ? Status.OK : Status.ERROR_HANDLER_EXCEPTION;
+                Status status;
+                if (error == null) {
+                    status = Status.OK;
+                } else {
+                    log.debug("An error occurred in a message handler: {}", error);
+                    status = Status.ERROR_HANDLER_EXCEPTION;
+                }
                 sendReply(message, status, Optional.ofNullable(result));
             });
         });
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportConnection.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportConnection.java
index fb56ad4..de7bb7a 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportConnection.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportConnection.java
@@ -59,6 +59,8 @@
  * Base Copycat Transport connection.
  */
 public class CopycatTransportConnection implements Connection {
+    private static final int MAX_MESSAGE_SIZE = 1024 * 1024;
+
     private final Logger log = LoggerFactory.getLogger(getClass());
     private final long connectionId;
     private final String localSubject;
@@ -100,7 +102,11 @@
                 ((ReferenceCounted<?>) message).release();
             }
 
-            messagingService.sendAsync(endpoint, remoteSubject, baos.toByteArray())
+            byte[] bytes = baos.toByteArray();
+            if (bytes.length > MAX_MESSAGE_SIZE) {
+                throw new IllegalArgumentException(message + " exceeds maximum message size " + MAX_MESSAGE_SIZE);
+            }
+            messagingService.sendAsync(endpoint, remoteSubject, bytes)
                     .whenComplete((r, e) -> {
                         if (e != null) {
                             context.executor().execute(() -> future.completeExceptionally(e));
@@ -125,9 +131,14 @@
             if (message instanceof ReferenceCounted) {
                 ((ReferenceCounted<?>) message).release();
             }
+
+            byte[] bytes = baos.toByteArray();
+            if (bytes.length > MAX_MESSAGE_SIZE) {
+                throw new IllegalArgumentException(message + " exceeds maximum message size " + MAX_MESSAGE_SIZE);
+            }
             messagingService.sendAndReceive(endpoint,
                                             remoteSubject,
-                                            baos.toByteArray(),
+                                            bytes,
                                             context.executor())
                     .whenComplete((response, error) -> handleResponse(response, error, future));
         } catch (SerializationException | IOException e) {
@@ -145,11 +156,11 @@
             CompletableFuture<T> future) {
         if (error != null) {
             Throwable rootCause = Throwables.getRootCause(error);
-            if (rootCause instanceof MessagingException || rootCause instanceof SocketException) {
+            if (rootCause instanceof MessagingException.NoRemoteHandler) {
                 future.completeExceptionally(new TransportException(error));
-                if (rootCause instanceof MessagingException.NoRemoteHandler) {
-                    close(rootCause);
-                }
+                close(rootCause);
+            } else if (rootCause instanceof SocketException) {
+                future.completeExceptionally(new TransportException(error));
             } else {
                 future.completeExceptionally(error);
             }
@@ -214,7 +225,11 @@
                 try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
                     baos.write(error != null ? FAILURE : SUCCESS);
                     context.serializer().writeObject(error != null ? error : result, baos);
-                    return baos.toByteArray();
+                    byte[] bytes = baos.toByteArray();
+                    if (bytes.length > MAX_MESSAGE_SIZE) {
+                        throw new IllegalArgumentException("response exceeds maximum message size " + MAX_MESSAGE_SIZE);
+                    }
+                    return bytes;
                 } catch (IOException e) {
                     Throwables.propagate(e);
                     return null;
@@ -281,7 +296,7 @@
                     Throwable wrappedError = error;
                     if (error != null) {
                         Throwable rootCause = Throwables.getRootCause(error);
-                        if (MessagingException.class.isAssignableFrom(rootCause.getClass())) {
+                        if (rootCause instanceof MessagingException.NoRemoteHandler) {
                             wrappedError = new TransportException(error);
                         }
                         future.completeExceptionally(wrappedError);