[ONOS-6075] Rewrite Copycat Transport
- Ensure connection IDs are globally unique
- Ensure connections are closed on each side when close() is called
- Add Transport unit tests

Change-Id: Ia848b075d4030ce74293ecc57fea983693cee265
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportClient.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportClient.java
index 913dc6c..75a75d7 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportClient.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportClient.java
@@ -15,54 +15,92 @@
  */
 package org.onosproject.store.primitives.impl;
 
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-
-import org.apache.commons.lang.math.RandomUtils;
-import org.onosproject.cluster.PartitionId;
-import org.onosproject.store.cluster.messaging.MessagingService;
-
+import com.google.common.base.Throwables;
 import com.google.common.collect.Sets;
-
+import io.atomix.catalyst.concurrent.ThreadContext;
 import io.atomix.catalyst.transport.Address;
 import io.atomix.catalyst.transport.Client;
 import io.atomix.catalyst.transport.Connection;
-import io.atomix.catalyst.concurrent.ThreadContext;
+import io.atomix.catalyst.transport.TransportException;
+import org.onosproject.cluster.PartitionId;
+import org.onosproject.store.cluster.messaging.Endpoint;
+import org.onosproject.store.cluster.messaging.MessagingException;
+import org.onosproject.store.cluster.messaging.MessagingService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.ConnectException;
+import java.nio.ByteBuffer;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.onosproject.store.primitives.impl.CopycatTransport.CONNECT;
+import static org.onosproject.store.primitives.impl.CopycatTransport.SUCCESS;
 
 /**
- * {@link Client} implementation for {@link CopycatTransport}.
+ * Copycat transport client implementation.
  */
 public class CopycatTransportClient implements Client {
-
+    private final Logger log = LoggerFactory.getLogger(getClass());
     private final PartitionId partitionId;
+    private final String serverSubject;
     private final MessagingService messagingService;
-    private final CopycatTransport.Mode mode;
     private final Set<CopycatTransportConnection> connections = Sets.newConcurrentHashSet();
 
-    CopycatTransportClient(PartitionId partitionId, MessagingService messagingService, CopycatTransport.Mode mode) {
-        this.partitionId = checkNotNull(partitionId);
-        this.messagingService = checkNotNull(messagingService);
-        this.mode = checkNotNull(mode);
+    public CopycatTransportClient(PartitionId partitionId, MessagingService messagingService) {
+        this.partitionId = checkNotNull(partitionId, "partitionId cannot be null");
+        this.serverSubject = String.format("onos-copycat-%s", partitionId);
+        this.messagingService = checkNotNull(messagingService, "messagingService cannot be null");
     }
 
     @Override
-    public CompletableFuture<Connection> connect(Address remoteAddress) {
+    public CompletableFuture<Connection> connect(Address address) {
+        CompletableFuture<Connection> future = new CompletableFuture<>();
         ThreadContext context = ThreadContext.currentContextOrThrow();
-        CopycatTransportConnection connection = new CopycatTransportConnection(
-                nextConnectionId(),
-                CopycatTransport.Mode.CLIENT,
-                partitionId,
-                remoteAddress,
-                messagingService,
-                context);
-        if (mode == CopycatTransport.Mode.CLIENT) {
-            connection.setBidirectional();
-        }
-        connection.closeListener(c -> connections.remove(c));
-        connections.add(connection);
-        return CompletableFuture.supplyAsync(() -> connection, context.executor());
+        Endpoint endpoint = CopycatTransport.toEndpoint(address);
+
+        log.debug("Connecting to {}", address);
+
+        ByteBuffer requestBuffer = ByteBuffer.allocate(1);
+        requestBuffer.put(CONNECT);
+
+        // Send a connect request to the server to get a unique connection ID.
+        messagingService.sendAndReceive(endpoint, serverSubject, requestBuffer.array(), context.executor())
+                .whenComplete((payload, error) -> {
+                    Throwable wrappedError = error;
+                    if (error != null) {
+                        Throwable rootCause = Throwables.getRootCause(error);
+                        if (MessagingException.class.isAssignableFrom(rootCause.getClass())) {
+                            wrappedError = new TransportException(error);
+                        }
+                        log.warn("Connection to {} failed! Reason: {}", address, wrappedError);
+                        future.completeExceptionally(wrappedError);
+                    } else {
+                        // If the connection is successful, the server will send back a
+                        // connection ID indicating where to send messages for the connection.
+                        ByteBuffer responseBuffer = ByteBuffer.wrap(payload);
+                        if (responseBuffer.get() == SUCCESS) {
+                            long connectionId = responseBuffer.getLong();
+                            CopycatTransportConnection connection = new CopycatTransportConnection(
+                                    connectionId,
+                                    CopycatTransportConnection.Mode.CLIENT,
+                                    partitionId,
+                                    endpoint,
+                                    messagingService,
+                                    context);
+                            connection.closeListener(connections::remove);
+                            connections.add(connection);
+                            future.complete(connection);
+                            log.debug("Created connection {}-{} to {}", partitionId, connectionId, address);
+                        } else {
+                            log.warn("Connection to {} failed!", address);
+                            future.completeExceptionally(new ConnectException());
+                        }
+                    }
+                });
+        return future;
     }
 
     @Override
@@ -70,7 +108,11 @@
         return CompletableFuture.allOf(connections.stream().map(Connection::close).toArray(CompletableFuture[]::new));
     }
 
-    private long nextConnectionId() {
-        return RandomUtils.nextLong();
+    @Override
+    public String toString() {
+        return toStringHelper(this)
+                .add("partitionId", partitionId)
+                .toString();
     }
- }
+}
+