Using 1.0.0.rc2 version of Atomix
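CopycatTransport updates: CopycatTransportServer now registers a single
per-partition message handler and creates a server-side connection on the
first message that carries an unknown connection id.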

Change-Id: If384ac2574f098c327f0e5749766268c8d7f1ecd
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportServer.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportServer.java
index d4f851f..4a31570 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportServer.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportServer.java
@@ -35,7 +35,6 @@
 import org.slf4j.Logger;
 
 import com.google.common.collect.Maps;
-import com.google.common.primitives.Longs;
 
 import io.atomix.catalyst.transport.Address;
 import io.atomix.catalyst.transport.Connection;
@@ -55,15 +54,13 @@
     private final ScheduledExecutorService executorService;
     private final PartitionId partitionId;
     private final MessagingService messagingService;
-    private final String protocolMessageSubject;
-    private final String newConnectionMessageSubject;
+    private final String messageSubject;
     private final Map<Long, CopycatTransportConnection> connections = Maps.newConcurrentMap();
 
     CopycatTransportServer(PartitionId partitionId, MessagingService messagingService) {
         this.partitionId = checkNotNull(partitionId);
         this.messagingService = checkNotNull(messagingService);
-        this.protocolMessageSubject = String.format("onos-copycat-server-%s", partitionId);
-        this.newConnectionMessageSubject = String.format("onos-copycat-server-connection-%s", partitionId);
+        this.messageSubject = String.format("onos-copycat-%s", partitionId);
         this.executorService = Executors.newScheduledThreadPool(Math.min(4, Runtime.getRuntime().availableProcessors()),
                 new CatalystThreadFactory("copycat-server-p" + partitionId + "-%d"));
     }
@@ -71,49 +68,49 @@
     @Override
     public CompletableFuture<Void> listen(Address address, Consumer<Connection> listener) {
         if (listening.compareAndSet(false, true)) {
-            // message handler for all non-connection-establishment messages.
-            messagingService.registerHandler(protocolMessageSubject, (sender, payload) -> {
-                try (DataInputStream input = new DataInputStream(new ByteArrayInputStream(payload))) {
-                    long connectionId = input.readLong();
-                    CopycatTransportConnection connection = connections.get(connectionId);
-                    if (connection == null) {
-                        throw new IOException("Closed connection");
-                    }
-                    byte[] messagePayload = IOUtils.toByteArray(input);
-                    return connection.handle(messagePayload);
-                } catch (IOException e) {
-                    return Tools.exceptionalFuture(e);
-                }
-            });
-
-            // message handler for new connection attempts.
             ThreadContext context = ThreadContext.currentContextOrThrow();
-            messagingService.registerHandler(newConnectionMessageSubject, (sender, payload) -> {
-                long connectionId = Longs.fromByteArray(payload);
-                CopycatTransportConnection connection = new CopycatTransportConnection(connectionId,
-                        CopycatTransport.Mode.SERVER,
-                        partitionId,
-                        CopycatTransport.toAddress(sender),
-                        messagingService,
-                        getOrCreateContext(context));
-                connections.put(connectionId, connection);
-                connection.closeListener(c -> connections.remove(connectionId, c));
-                log.debug("Created new incoming connection[id={}] from {}", connectionId, sender);
-                return CompletableFuture.supplyAsync(() -> {
-                    listener.accept(connection);
-                    // echo the connectionId back to indicate successful completion.
-                    return payload;
-                }, context.executor());
-            });
-            context.execute(() -> listenFuture.complete(null));
+            listen(address, listener, context);
         }
         return listenFuture;
     }
 
+    private void listen(Address address, Consumer<Connection> listener, ThreadContext context) {
+        messagingService.registerHandler(messageSubject, (sender, payload) -> {
+            try (DataInputStream input = new DataInputStream(new ByteArrayInputStream(payload))) {
+                long connectionId = input.readLong();
+                AtomicBoolean newConnectionCreated = new AtomicBoolean(false);
+                CopycatTransportConnection connection = connections.computeIfAbsent(connectionId, k -> {
+                    newConnectionCreated.set(true);
+                    CopycatTransportConnection newConnection = new CopycatTransportConnection(connectionId,
+                            CopycatTransport.Mode.SERVER,
+                            partitionId,
+                            CopycatTransport.toAddress(sender),
+                            messagingService,
+                            getOrCreateContext(context));
+                    log.debug("Created new incoming connection {}", connectionId);
+                    newConnection.closeListener(c -> connections.remove(connectionId, c));
+                    return newConnection;
+                });
+                byte[] request = IOUtils.toByteArray(input);
+                return CompletableFuture.supplyAsync(
+                        () -> {
+                            if (newConnectionCreated.get()) {
+                                listener.accept(connection);
+                            }
+                            return connection;
+                        }, context.executor()).thenCompose(c -> c.handle(request));
+            } catch (IOException e) {
+                return Tools.exceptionalFuture(e);
+            }
+        });
+        context.execute(() -> {
+            listenFuture.complete(null);
+        });
+    }
+
     @Override
     public CompletableFuture<Void> close() {
-        messagingService.unregisterHandler(newConnectionMessageSubject);
-        messagingService.unregisterHandler(protocolMessageSubject);
+        messagingService.unregisterHandler(messageSubject);
         executorService.shutdown();
         return CompletableFuture.completedFuture(null);
     }