Prevent serialization errors from causing recursion in the Copycat transport
Change-Id: I0a1b0737d6cda3d7ab63bb26a7547d2f9124a434
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
index fae957a..f3b028c 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
@@ -303,6 +303,7 @@
try {
responsePayload = handler.apply(message.sender(), message.payload());
} catch (Exception e) {
+ log.debug("An error occurred in a message handler: {}", e);
status = Status.ERROR_HANDLER_EXCEPTION;
}
sendReply(message, status, Optional.ofNullable(responsePayload));
@@ -314,7 +315,13 @@
checkPermission(CLUSTER_WRITE);
handlers.put(type, message -> {
handler.apply(message.sender(), message.payload()).whenComplete((result, error) -> {
- Status status = error == null ? Status.OK : Status.ERROR_HANDLER_EXCEPTION;
+ Status status;
+ if (error == null) {
+ status = Status.OK;
+ } else {
+ log.debug("An error occurred in a message handler: {}", error);
+ status = Status.ERROR_HANDLER_EXCEPTION;
+ }
sendReply(message, status, Optional.ofNullable(result));
});
});
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportConnection.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportConnection.java
index b8596ae..a3a8539 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportConnection.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportConnection.java
@@ -56,6 +56,8 @@
* Base Copycat Transport connection.
*/
public class CopycatTransportConnection implements Connection {
+ private static final int MAX_MESSAGE_SIZE = 1024 * 1024;
+
private final Logger log = LoggerFactory.getLogger(getClass());
private final long connectionId;
private final String localSubject;
@@ -97,7 +99,11 @@
((ReferenceCounted<?>) message).release();
}
- messagingService.sendAsync(endpoint, remoteSubject, baos.toByteArray())
+ byte[] bytes = baos.toByteArray();
+ if (bytes.length > MAX_MESSAGE_SIZE) {
+ throw new IllegalArgumentException(message + " exceeds maximum message size " + MAX_MESSAGE_SIZE);
+ }
+ messagingService.sendAsync(endpoint, remoteSubject, bytes)
.whenComplete((r, e) -> {
if (e != null) {
context.executor().execute(() -> future.completeExceptionally(e));
@@ -122,9 +128,14 @@
if (message instanceof ReferenceCounted) {
((ReferenceCounted<?>) message).release();
}
+
+ byte[] bytes = baos.toByteArray();
+ if (bytes.length > MAX_MESSAGE_SIZE) {
+ throw new IllegalArgumentException(message + " exceeds maximum message size " + MAX_MESSAGE_SIZE);
+ }
messagingService.sendAndReceive(endpoint,
remoteSubject,
- baos.toByteArray(),
+ bytes,
context.executor())
.whenComplete((response, error) -> handleResponse(response, error, future));
} catch (SerializationException | IOException e) {
@@ -142,11 +153,11 @@
CompletableFuture<T> future) {
if (error != null) {
Throwable rootCause = Throwables.getRootCause(error);
- if (rootCause instanceof MessagingException || rootCause instanceof SocketException) {
+ if (rootCause instanceof MessagingException.NoRemoteHandler) {
future.completeExceptionally(new TransportException(error));
- if (rootCause instanceof MessagingException.NoRemoteHandler) {
- close(rootCause);
- }
+ close(rootCause);
+ } else if (rootCause instanceof SocketException) {
+ future.completeExceptionally(new TransportException(error));
} else {
future.completeExceptionally(error);
}
@@ -211,7 +222,11 @@
try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
baos.write(error != null ? FAILURE : SUCCESS);
context.serializer().writeObject(error != null ? error : result, baos);
- return baos.toByteArray();
+ byte[] bytes = baos.toByteArray();
+ if (bytes.length > MAX_MESSAGE_SIZE) {
+ throw new IllegalArgumentException("response exceeds maximum message size " + MAX_MESSAGE_SIZE);
+ }
+ return bytes;
} catch (IOException e) {
Throwables.propagate(e);
return null;
@@ -278,7 +293,7 @@
Throwable wrappedError = error;
if (error != null) {
Throwable rootCause = Throwables.getRootCause(error);
- if (MessagingException.class.isAssignableFrom(rootCause.getClass())) {
+ if (rootCause instanceof MessagingException.NoRemoteHandler) {
wrappedError = new TransportException(error);
}
future.completeExceptionally(wrappedError);