[ONOS-5992] Ensure Copycat connections are closed when no remote handler is registered
Change-Id: Iec17fd09f0d715dbbe08c604057aeb00d677b939
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransport.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransport.java
index 45cc529..fc94dd6 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransport.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransport.java
@@ -27,7 +27,6 @@
import org.onosproject.store.cluster.messaging.MessagingService;
import java.net.InetAddress;
-import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.Map;
@@ -41,7 +40,6 @@
private final PartitionId partitionId;
private final MessagingService messagingService;
private static final Map<Address, Endpoint> EP_LOOKUP_CACHE = Maps.newConcurrentMap();
- private static final Map<Endpoint, Address> ADDRESS_LOOKUP_CACHE = Maps.newConcurrentMap();
static final byte MESSAGE = 0x01;
static final byte CONNECT = 0x02;
@@ -85,22 +83,4 @@
}
});
}
-
- /**
- * Maps {@link Endpoint endpoint} to {@link Address address}.
- * @param endpoint end point
- * @return address
- */
- static Address toAddress(Endpoint endpoint) {
- return ADDRESS_LOOKUP_CACHE.computeIfAbsent(endpoint, ep -> {
- try {
- InetAddress host = InetAddress.getByAddress(endpoint.host().toOctets());
- int port = endpoint.port();
- return new Address(new InetSocketAddress(host, port));
- } catch (UnknownHostException e) {
- Throwables.propagate(e);
- return null;
- }
- });
- }
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportConnection.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportConnection.java
index eebbf9c..b8596ae 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportConnection.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportConnection.java
@@ -21,6 +21,7 @@
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.net.SocketException;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
@@ -125,16 +126,7 @@
remoteSubject,
baos.toByteArray(),
context.executor())
- .whenComplete((r, e) -> {
- Throwable wrappedError = e;
- if (e != null) {
- Throwable rootCause = Throwables.getRootCause(e);
- if (MessagingException.class.isAssignableFrom(rootCause.getClass())) {
- wrappedError = new TransportException(e);
- }
- }
- handleResponse(r, wrappedError, future);
- });
+ .whenComplete((response, error) -> handleResponse(response, error, future));
} catch (SerializationException | IOException e) {
future.completeExceptionally(e);
}
@@ -149,9 +141,18 @@
Throwable error,
CompletableFuture<T> future) {
if (error != null) {
- future.completeExceptionally(error);
+ Throwable rootCause = Throwables.getRootCause(error);
+ if (rootCause instanceof MessagingException || rootCause instanceof SocketException) {
+ future.completeExceptionally(new TransportException(error));
+ if (rootCause instanceof MessagingException.NoRemoteHandler) {
+ close(rootCause);
+ }
+ } else {
+ future.completeExceptionally(error);
+ }
return;
}
+
checkNotNull(response);
InputStream input = new ByteArrayInputStream(response);
try {
@@ -227,7 +228,7 @@
private CompletableFuture<byte[]> handleClose() {
CompletableFuture<byte[]> future = new CompletableFuture<>();
context.executor().execute(() -> {
- cleanup();
+ close(null);
ByteBuffer responseBuffer = ByteBuffer.allocate(1);
responseBuffer.put(SUCCESS);
future.complete(responseBuffer.array());
@@ -273,7 +274,7 @@
CompletableFuture<Void> future = new CompletableFuture<>();
messagingService.sendAndReceive(endpoint, remoteSubject, requestBuffer.array(), context.executor())
.whenComplete((payload, error) -> {
- cleanup();
+ close(error);
Throwable wrappedError = error;
if (error != null) {
Throwable rootCause = Throwables.getRootCause(error);
@@ -296,9 +297,12 @@
/**
* Cleans up the connection, unregistering handlers registered on the MessagingService.
*/
- private void cleanup() {
+ private void close(Throwable error) {
log.debug("Connection {}-{} closed", partitionId, connectionId);
messagingService.unregisterHandler(localSubject);
+ if (error != null) {
+ exceptionListeners.accept(error);
+ }
closeListeners.accept(this);
}