[ONOS-5992] Ensure Copycat connections are closed when no remote handler is registered
Change-Id: Iec17fd09f0d715dbbe08c604057aeb00d677b939
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportConnection.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportConnection.java
index eebbf9c..b8596ae 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportConnection.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportConnection.java
@@ -21,6 +21,7 @@
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.net.SocketException;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
@@ -125,16 +126,7 @@
remoteSubject,
baos.toByteArray(),
context.executor())
- .whenComplete((r, e) -> {
- Throwable wrappedError = e;
- if (e != null) {
- Throwable rootCause = Throwables.getRootCause(e);
- if (MessagingException.class.isAssignableFrom(rootCause.getClass())) {
- wrappedError = new TransportException(e);
- }
- }
- handleResponse(r, wrappedError, future);
- });
+ .whenComplete((response, error) -> handleResponse(response, error, future));
} catch (SerializationException | IOException e) {
future.completeExceptionally(e);
}
@@ -149,9 +141,18 @@
Throwable error,
CompletableFuture<T> future) {
if (error != null) {
- future.completeExceptionally(error);
+ Throwable rootCause = Throwables.getRootCause(error);
+ if (rootCause instanceof MessagingException || rootCause instanceof SocketException) {
+ future.completeExceptionally(new TransportException(error));
+ if (rootCause instanceof MessagingException.NoRemoteHandler) {
+ close(rootCause);
+ }
+ } else {
+ future.completeExceptionally(error);
+ }
return;
}
+
checkNotNull(response);
InputStream input = new ByteArrayInputStream(response);
try {
@@ -227,7 +228,7 @@
private CompletableFuture<byte[]> handleClose() {
CompletableFuture<byte[]> future = new CompletableFuture<>();
context.executor().execute(() -> {
- cleanup();
+ close(null);
ByteBuffer responseBuffer = ByteBuffer.allocate(1);
responseBuffer.put(SUCCESS);
future.complete(responseBuffer.array());
@@ -273,7 +274,7 @@
CompletableFuture<Void> future = new CompletableFuture<>();
messagingService.sendAndReceive(endpoint, remoteSubject, requestBuffer.array(), context.executor())
.whenComplete((payload, error) -> {
- cleanup();
+ close(error);
Throwable wrappedError = error;
if (error != null) {
Throwable rootCause = Throwables.getRootCause(error);
@@ -296,9 +297,12 @@
/**
* Cleans up the connection, unregistering handlers registered on the MessagingService.
*/
- private void cleanup() {
+ private void close(Throwable error) {
log.debug("Connection {}-{} closed", partitionId, connectionId);
messagingService.unregisterHandler(localSubject);
+ if (error != null) {
+ exceptionListeners.accept(error);
+ }
closeListeners.accept(this);
}