[ONOS-5992] Ensure Copycat connections are closed when no remote handler is registered

Change-Id: Iec17fd09f0d715dbbe08c604057aeb00d677b939
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransport.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransport.java
index 45cc529..fc94dd6 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransport.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransport.java
@@ -27,7 +27,6 @@
 import org.onosproject.store.cluster.messaging.MessagingService;
 
 import java.net.InetAddress;
-import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
 import java.util.Map;
 
@@ -41,7 +40,6 @@
     private final PartitionId partitionId;
     private final MessagingService messagingService;
     private static final Map<Address, Endpoint> EP_LOOKUP_CACHE = Maps.newConcurrentMap();
-    private static final Map<Endpoint, Address> ADDRESS_LOOKUP_CACHE = Maps.newConcurrentMap();
 
     static final byte MESSAGE = 0x01;
     static final byte CONNECT = 0x02;
@@ -85,22 +83,4 @@
             }
         });
     }
-
-    /**
-     * Maps {@link Endpoint endpoint} to {@link Address address}.
-     * @param endpoint end point
-     * @return address
-     */
-    static Address toAddress(Endpoint endpoint) {
-        return ADDRESS_LOOKUP_CACHE.computeIfAbsent(endpoint, ep -> {
-            try {
-                InetAddress host = InetAddress.getByAddress(endpoint.host().toOctets());
-                int port = endpoint.port();
-                return new Address(new InetSocketAddress(host, port));
-            } catch (UnknownHostException e) {
-                Throwables.propagate(e);
-                return null;
-            }
-        });
-    }
 }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportConnection.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportConnection.java
index eebbf9c..b8596ae 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportConnection.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportConnection.java
@@ -21,6 +21,7 @@
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.net.SocketException;
 import java.nio.ByteBuffer;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
@@ -125,16 +126,7 @@
                                             remoteSubject,
                                             baos.toByteArray(),
                                             context.executor())
-                    .whenComplete((r, e) -> {
-                        Throwable wrappedError = e;
-                        if (e != null) {
-                            Throwable rootCause = Throwables.getRootCause(e);
-                            if (MessagingException.class.isAssignableFrom(rootCause.getClass())) {
-                                wrappedError = new TransportException(e);
-                            }
-                        }
-                        handleResponse(r, wrappedError, future);
-                    });
+                    .whenComplete((response, error) -> handleResponse(response, error, future));
         } catch (SerializationException | IOException e) {
             future.completeExceptionally(e);
         }
@@ -149,9 +141,18 @@
             Throwable error,
             CompletableFuture<T> future) {
         if (error != null) {
-            future.completeExceptionally(error);
+            Throwable rootCause = Throwables.getRootCause(error);
+            if (rootCause instanceof MessagingException || rootCause instanceof SocketException) {
+                future.completeExceptionally(new TransportException(error));
+                if (rootCause instanceof MessagingException.NoRemoteHandler) {
+                    close(rootCause);
+                }
+            } else {
+                future.completeExceptionally(error);
+            }
             return;
         }
+
         checkNotNull(response);
         InputStream input = new ByteArrayInputStream(response);
         try {
@@ -227,7 +228,7 @@
     private CompletableFuture<byte[]> handleClose() {
         CompletableFuture<byte[]> future = new CompletableFuture<>();
         context.executor().execute(() -> {
-            cleanup();
+            close(null);
             ByteBuffer responseBuffer = ByteBuffer.allocate(1);
             responseBuffer.put(SUCCESS);
             future.complete(responseBuffer.array());
@@ -273,7 +274,7 @@
         CompletableFuture<Void> future = new CompletableFuture<>();
         messagingService.sendAndReceive(endpoint, remoteSubject, requestBuffer.array(), context.executor())
                 .whenComplete((payload, error) -> {
-                    cleanup();
+                    close(error);
                     Throwable wrappedError = error;
                     if (error != null) {
                         Throwable rootCause = Throwables.getRootCause(error);
@@ -296,9 +297,12 @@
     /**
      * Cleans up the connection, unregistering handlers registered on the MessagingService.
      */
-    private void cleanup() {
+    private void close(Throwable error) {
         log.debug("Connection {}-{} closed", partitionId, connectionId);
         messagingService.unregisterHandler(localSubject);
+        if (error != null) {
+            exceptionListeners.accept(error);
+        }
         closeListeners.accept(this);
     }