[ONOS-5992] Ensure Copycat connections are closed when no remote handler is registered

Change-Id: Iec17fd09f0d715dbbe08c604057aeb00d677b939
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportConnection.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportConnection.java
index eebbf9c..b8596ae 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportConnection.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportConnection.java
@@ -21,6 +21,7 @@
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.net.SocketException;
 import java.nio.ByteBuffer;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
@@ -125,16 +126,7 @@
                                             remoteSubject,
                                             baos.toByteArray(),
                                             context.executor())
-                    .whenComplete((r, e) -> {
-                        Throwable wrappedError = e;
-                        if (e != null) {
-                            Throwable rootCause = Throwables.getRootCause(e);
-                            if (MessagingException.class.isAssignableFrom(rootCause.getClass())) {
-                                wrappedError = new TransportException(e);
-                            }
-                        }
-                        handleResponse(r, wrappedError, future);
-                    });
+                    .whenComplete((response, error) -> handleResponse(response, error, future));
         } catch (SerializationException | IOException e) {
             future.completeExceptionally(e);
         }
@@ -149,9 +141,18 @@
             Throwable error,
             CompletableFuture<T> future) {
         if (error != null) {
-            future.completeExceptionally(error);
+            Throwable rootCause = Throwables.getRootCause(error);
+            if (rootCause instanceof MessagingException || rootCause instanceof SocketException) {
+                future.completeExceptionally(new TransportException(error));
+                if (rootCause instanceof MessagingException.NoRemoteHandler) {
+                    close(rootCause);
+                }
+            } else {
+                future.completeExceptionally(error);
+            }
             return;
         }
+
         checkNotNull(response);
         InputStream input = new ByteArrayInputStream(response);
         try {
@@ -227,7 +228,7 @@
     private CompletableFuture<byte[]> handleClose() {
         CompletableFuture<byte[]> future = new CompletableFuture<>();
         context.executor().execute(() -> {
-            cleanup();
+            close(null);
             ByteBuffer responseBuffer = ByteBuffer.allocate(1);
             responseBuffer.put(SUCCESS);
             future.complete(responseBuffer.array());
@@ -273,7 +274,7 @@
         CompletableFuture<Void> future = new CompletableFuture<>();
         messagingService.sendAndReceive(endpoint, remoteSubject, requestBuffer.array(), context.executor())
                 .whenComplete((payload, error) -> {
-                    cleanup();
+                    close(error);
                     Throwable wrappedError = error;
                     if (error != null) {
                         Throwable rootCause = Throwables.getRootCause(error);
@@ -296,9 +297,12 @@
     /**
      * Cleans up the connection, unregistering handlers registered on the MessagingService.
      */
-    private void cleanup() {
+    private void close(Throwable error) {
         log.debug("Connection {}-{} closed", partitionId, connectionId);
         messagingService.unregisterHandler(localSubject);
+        if (error != null) {
+            exceptionListeners.accept(error);
+        }
         closeListeners.accept(this);
     }