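Copycat transport enhancements

In CopycatTransportServer:
- Build the sender address with CopycatTransport.toAddress instead of
  resolving it inline, which removes the UnknownHostException handling.
- Log each newly created incoming connection at debug level.
- Register a close listener on each new connection that removes it
  from the connections map.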

Change-Id: I50e9eb0f419b2aa10deff6d54f58649688788faa
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportServer.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportServer.java
index 6acb7db..03a5a71 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportServer.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportServer.java
@@ -16,13 +16,11 @@
 package org.onosproject.store.primitives.impl;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+import static org.slf4j.LoggerFactory.getLogger;
 
 import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.UnknownHostException;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -32,8 +30,8 @@
 import org.onlab.util.Tools;
 import org.onosproject.cluster.PartitionId;
 import org.onosproject.store.cluster.messaging.MessagingService;
+import org.slf4j.Logger;
 
-import com.google.common.base.Throwables;
 import com.google.common.collect.Maps;
 
 import io.atomix.catalyst.transport.Address;
@@ -47,6 +45,7 @@
  */
 public class CopycatTransportServer implements Server {
 
+    private final Logger log = getLogger(getClass());
     private final AtomicBoolean listening = new AtomicBoolean(false);
     private CompletableFuture<Void> listenFuture = new CompletableFuture<>();
     private final PartitionId partitionId;
@@ -73,28 +72,23 @@
         messagingService.registerHandler(messageSubject, (sender, payload) -> {
             try (DataInputStream input = new DataInputStream(new ByteArrayInputStream(payload))) {
                 long connectionId = input.readLong();
-                AtomicBoolean newConnection = new AtomicBoolean(false);
+                AtomicBoolean newConnectionCreated = new AtomicBoolean(false);
                 CopycatTransportConnection connection = connections.computeIfAbsent(connectionId, k -> {
-                    newConnection.set(true);
-                    try {
-                        InetAddress senderHost = InetAddress.getByAddress(sender.host().toOctets());
-                        int senderPort = sender.port();
-                        Address senderAddress = new Address(new InetSocketAddress(senderHost, senderPort));
-                        return new CopycatTransportConnection(connectionId,
-                                CopycatTransport.Mode.SERVER,
-                                partitionId,
-                                senderAddress,
-                                messagingService,
-                                getOrCreateContext(context));
-                    } catch (UnknownHostException e) {
-                        Throwables.propagate(e);
-                        return null;
-                    }
+                    newConnectionCreated.set(true);
+                    CopycatTransportConnection newConnection = new CopycatTransportConnection(connectionId,
+                            CopycatTransport.Mode.SERVER,
+                            partitionId,
+                            CopycatTransport.toAddress(sender),
+                            messagingService,
+                            getOrCreateContext(context));
+                    log.debug("Created new incoming connection {}", connectionId);
+                    newConnection.closeListener(c -> connections.remove(connectionId, c));
+                    return newConnection;
                 });
                 byte[] request = IOUtils.toByteArray(input);
                 return CompletableFuture.supplyAsync(
                         () -> {
-                            if (newConnection.get()) {
+                            if (newConnectionCreated.get()) {
                                 listener.accept(connection);
                             }
                             return connection;