Copycat transport enhancements

Change-Id: I50e9eb0f419b2aa10deff6d54f58649688788faa
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java
index 2cecc0f..53f57f1 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java
@@ -15,29 +15,19 @@
  */
 package org.onosproject.store.primitives.impl;
 
-import io.atomix.catalyst.serializer.CatalystSerializable;
+import java.util.Arrays;
+
 import io.atomix.catalyst.serializer.Serializer;
 import io.atomix.catalyst.serializer.TypeSerializerFactory;
 import io.atomix.copycat.client.Query;
-import io.atomix.manager.state.GetResource;
-import io.atomix.manager.state.GetResourceKeys;
-import io.atomix.resource.ResourceQuery;
-import io.atomix.variables.state.ValueCommands;
-
-import java.io.IOException;
-import java.net.URL;
-import java.util.Arrays;
-import java.util.Enumeration;
-import java.util.Scanner;
 
 import org.onlab.util.Match;
+import org.onosproject.cluster.Leader;
+import org.onosproject.cluster.Leadership;
 import org.onosproject.cluster.NodeId;
 import org.onosproject.event.Change;
 import org.onosproject.store.primitives.MapUpdate;
 import org.onosproject.store.primitives.TransactionId;
-import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands;
-import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapState;
-import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands;
 import org.onosproject.store.primitives.resources.impl.CommitResult;
 import org.onosproject.store.primitives.resources.impl.MapEntryUpdateResult;
 import org.onosproject.store.primitives.resources.impl.PrepareResult;
@@ -47,7 +37,6 @@
 import org.onosproject.store.service.MapTransaction;
 import org.onosproject.store.service.Versioned;
 
-import com.google.common.base.Throwables;
 import com.google.common.collect.Maps;
 
 /**
@@ -65,113 +54,31 @@
                         org.onosproject.store.service.Serializer.using(Arrays.asList((KryoNamespaces.API)),
                                 MapEntryUpdateResult.class,
                                 MapEntryUpdateResult.Status.class,
-                                MapUpdate.class,
-                                MapUpdate.Type.class,
-                                MapTransaction.class,
                                 Transaction.State.class,
-                                TransactionId.class,
                                 PrepareResult.class,
                                 CommitResult.class,
                                 RollbackResult.class,
-                                AtomixConsistentMapCommands.Get.class,
-                                AtomixConsistentMapCommands.ContainsKey.class,
-                                AtomixConsistentMapCommands.ContainsValue.class,
-                                AtomixConsistentMapCommands.Size.class,
-                                AtomixConsistentMapCommands.IsEmpty.class,
-                                AtomixConsistentMapCommands.KeySet.class,
-                                AtomixConsistentMapCommands.EntrySet.class,
-                                AtomixConsistentMapCommands.Values.class,
-                                AtomixConsistentMapCommands.UpdateAndGet.class,
-                                AtomixConsistentMapCommands.TransactionPrepare.class,
-                                AtomixConsistentMapCommands.TransactionCommit.class,
-                                AtomixConsistentMapCommands.TransactionRollback.class,
-                                AtomixLeaderElectorCommands.GetLeadership.class,
-                                AtomixLeaderElectorCommands.GetAllLeaderships.class,
-                                AtomixLeaderElectorCommands.GetElectedTopics.class,
-                                AtomixLeaderElectorCommands.Run.class,
-                                AtomixLeaderElectorCommands.Withdraw.class,
-                                AtomixLeaderElectorCommands.Anoint.class,
-                                GetResource.class,
-                                GetResourceKeys.class,
-                                ResourceQuery.class,
-                                ValueCommands.Get.class,
-                                ValueCommands.Set.class,
                                 Query.ConsistencyLevel.class));
         // ONOS classes
         serializer.register(Change.class, factory);
+        serializer.register(Leader.class, factory);
+        serializer.register(Leadership.class, factory);
         serializer.register(NodeId.class, factory);
         serializer.register(Match.class, factory);
         serializer.register(MapEntryUpdateResult.class, factory);
         serializer.register(MapEntryUpdateResult.Status.class, factory);
-        serializer.register(MapTransaction.class, factory);
         serializer.register(Transaction.State.class, factory);
         serializer.register(PrepareResult.class, factory);
         serializer.register(CommitResult.class, factory);
         serializer.register(RollbackResult.class, factory);
         serializer.register(TransactionId.class, factory);
         serializer.register(MapUpdate.class, factory);
+        serializer.register(MapUpdate.Type.class, factory);
+        serializer.register(MapTransaction.class, factory);
         serializer.register(Versioned.class, factory);
         serializer.register(MapEvent.class, factory);
         serializer.register(Maps.immutableEntry("a", "b").getClass(), factory);
-        serializer.register(AtomixConsistentMapState.class, factory);
 
-        serializer.register(ResourceQuery.class, factory);
-        serializer.register(GetResource.class, factory);
-        serializer.register(GetResourceKeys.class, factory);
-        serializer.register(ValueCommands.Get.class, factory);
-        serializer.register(ValueCommands.Set.class, factory);
-
-        // ConsistentMap
-        serializer.register(AtomixConsistentMapCommands.UpdateAndGet.class, factory);
-        serializer.register(AtomixConsistentMapCommands.Clear.class);
-        serializer.register(AtomixConsistentMapCommands.Listen.class);
-        serializer.register(AtomixConsistentMapCommands.Unlisten.class);
-        serializer.register(AtomixConsistentMapCommands.Get.class);
-        serializer.register(AtomixConsistentMapCommands.ContainsKey.class);
-        serializer.register(AtomixConsistentMapCommands.ContainsValue.class);
-        serializer.register(AtomixConsistentMapCommands.EntrySet.class);
-        serializer.register(AtomixConsistentMapCommands.IsEmpty.class);
-        serializer.register(AtomixConsistentMapCommands.KeySet.class);
-        serializer.register(AtomixConsistentMapCommands.Size.class);
-        serializer.register(AtomixConsistentMapCommands.Values.class);
-        serializer.register(AtomixConsistentMapCommands.TransactionPrepare.class);
-        serializer.register(AtomixConsistentMapCommands.TransactionCommit.class);
-        serializer.register(AtomixConsistentMapCommands.TransactionRollback.class);
-        // LeaderElector
-        serializer.register(AtomixLeaderElectorCommands.Run.class, factory);
-        serializer.register(AtomixLeaderElectorCommands.Withdraw.class, factory);
-        serializer.register(AtomixLeaderElectorCommands.Anoint.class, factory);
-        serializer.register(AtomixLeaderElectorCommands.GetElectedTopics.class, factory);
-        serializer.register(AtomixLeaderElectorCommands.GetElectedTopics.class, factory);
-        serializer.register(AtomixLeaderElectorCommands.GetLeadership.class, factory);
-        serializer.register(AtomixLeaderElectorCommands.GetAllLeaderships.class, factory);
-        serializer.register(AtomixLeaderElectorCommands.Listen.class);
-        serializer.register(AtomixLeaderElectorCommands.Unlisten.class);
-        // Atomix types
-        try {
-            ClassLoader cl = CatalystSerializable.class.getClassLoader();
-            Enumeration<URL> urls = cl.getResources(
-                    String.format("META-INF/services/%s", CatalystSerializable.class.getName()));
-            while (urls.hasMoreElements()) {
-                URL url = urls.nextElement();
-                try (Scanner scanner = new Scanner(url.openStream(), "UTF-8")) {
-                    scanner.useDelimiter("\n").forEachRemaining(line -> {
-                        if (!line.trim().startsWith("#")) {
-                            line = line.trim();
-                            if (line.length() > 0) {
-                                try {
-                                    serializer.register(cl.loadClass(line));
-                                } catch (ClassNotFoundException e) {
-                                    Throwables.propagate(e);
-                                }
-                            }
-                        }
-                    });
-                }
-            }
-        } catch (IOException e) {
-            Throwables.propagate(e);
-        }
         return serializer;
     }
 }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransport.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransport.java
index ddec252..b5b1f21 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransport.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransport.java
@@ -17,9 +17,20 @@
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.util.Map;
+
+import org.onlab.packet.IpAddress;
 import org.onosproject.cluster.PartitionId;
+import org.onosproject.store.cluster.messaging.Endpoint;
 import org.onosproject.store.cluster.messaging.MessagingService;
 
+import com.google.common.base.Throwables;
+import com.google.common.collect.Maps;
+
+import io.atomix.catalyst.transport.Address;
 import io.atomix.catalyst.transport.Client;
 import io.atomix.catalyst.transport.Server;
 import io.atomix.catalyst.transport.Transport;
@@ -51,6 +62,8 @@
     private final Mode mode;
     private final PartitionId partitionId;
     private final MessagingService messagingService;
+    private static final Map<Address, Endpoint> EP_LOOKUP_CACHE = Maps.newConcurrentMap();
+    private static final Map<Endpoint, Address> ADDRESS_LOOKUP_CACHE = Maps.newConcurrentMap();
 
     public CopycatTransport(Mode mode, PartitionId partitionId, MessagingService messagingService) {
         this.mode = checkNotNull(mode);
@@ -70,4 +83,42 @@
         return new CopycatTransportServer(partitionId,
                                           messagingService);
     }
+
+    /**
+     * Maps {@link Address address} to {@link Endpoint endpoint}.
+     * @param address
+     * @return end point
+     */
+    public static Endpoint toEndpoint(Address address) {
+        return EP_LOOKUP_CACHE.computeIfAbsent(address, a -> {
+            try {
+                Endpoint endpoint = new Endpoint(IpAddress.valueOf(InetAddress.getByName(a.host())), a.port());
+                ADDRESS_LOOKUP_CACHE.putIfAbsent(endpoint, address);
+                return endpoint;
+            } catch (UnknownHostException e) {
+                Throwables.propagate(e);
+                return null;
+            }
+        });
+    }
+
+    /**
+     * Maps {@link Endpoint endpoint} to {@link Address address}.
+     * @param endpoint end point
+     * @return address
+     */
+    public static Address toAddress(Endpoint endpoint) {
+        return ADDRESS_LOOKUP_CACHE.computeIfAbsent(endpoint, ep -> {
+            try {
+                InetAddress host = InetAddress.getByAddress(endpoint.host().toOctets());
+                int port = endpoint.port();
+                Address address = new Address(new InetSocketAddress(host, port));
+                EP_LOOKUP_CACHE.putIfAbsent(address, endpoint);
+                return address;
+            } catch (UnknownHostException e) {
+                Throwables.propagate(e);
+                return null;
+            }
+        });
+    }
 }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportClient.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportClient.java
index 96729c7..9a1dce9 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportClient.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportClient.java
@@ -50,18 +50,23 @@
     @Override
     public CompletableFuture<Connection> connect(Address remoteAddress) {
         ThreadContext context = ThreadContext.currentContextOrThrow();
-        CopycatTransportConnection connection = new CopycatTransportConnection(
-                nextConnectionId(),
-                CopycatTransport.Mode.CLIENT,
-                partitionId,
-                remoteAddress,
-                messagingService,
-                context);
-        if (mode == CopycatTransport.Mode.CLIENT) {
-            connection.setBidirectional();
-        }
-        connections.add(connection);
-        return CompletableFuture.supplyAsync(() -> connection, context.executor());
+        return messagingService.sendAndReceive(CopycatTransport.toEndpoint(remoteAddress),
+                                               PartitionManager.HELLO_MESSAGE_SUBJECT,
+                                               "hello".getBytes())
+                .thenApplyAsync(r -> {
+                    CopycatTransportConnection connection = new CopycatTransportConnection(
+                            nextConnectionId(),
+                            CopycatTransport.Mode.CLIENT,
+                            partitionId,
+                            remoteAddress,
+                            messagingService,
+                            context);
+                    if (mode == CopycatTransport.Mode.CLIENT) {
+                        connection.setBidirectional();
+                    }
+                    connections.add(connection);
+                    return connection;
+                }, context.executor());
     }
 
     @Override
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportConnection.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportConnection.java
index 3c5b649..39ce10f 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportConnection.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportConnection.java
@@ -21,8 +21,6 @@
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
@@ -30,10 +28,8 @@
 import java.util.function.Consumer;
 
 import org.apache.commons.io.IOUtils;
-import org.onlab.packet.IpAddress;
 import org.onlab.util.Tools;
 import org.onosproject.cluster.PartitionId;
-import org.onosproject.store.cluster.messaging.Endpoint;
 import org.onosproject.store.cluster.messaging.MessagingService;
 
 import com.google.common.base.MoreObjects;
@@ -74,7 +70,6 @@
     private final AtomicInteger sendFailures = new AtomicInteger(0);
     private final AtomicInteger messagesReceived = new AtomicInteger(0);
     private final AtomicInteger receiveFailures = new AtomicInteger(0);
-    private final Map<Address, Endpoint> endpointLookupCache = Maps.newConcurrentMap();
 
     CopycatTransportConnection(long connectionId,
             CopycatTransport.Mode mode,
@@ -120,7 +115,7 @@
             if (message instanceof ReferenceCounted) {
                 ((ReferenceCounted<?>) message).release();
             }
-            messagingService.sendAndReceive(toEndpoint(remoteAddress),
+            messagingService.sendAndReceive(CopycatTransport.toEndpoint(remoteAddress),
                                             outboundMessageSubject,
                                             baos.toByteArray(),
                                             context.executor())
@@ -240,17 +235,6 @@
                 .toString();
     }
 
-    private Endpoint toEndpoint(Address address) {
-        return endpointLookupCache.computeIfAbsent(address, a -> {
-            try {
-                return new Endpoint(IpAddress.valueOf(InetAddress.getByName(a.host())), a.port());
-            } catch (UnknownHostException e) {
-                Throwables.propagate(e);
-                return null;
-            }
-        });
-    }
-
     @SuppressWarnings("rawtypes")
     private final class InternalHandler {
 
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportServer.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportServer.java
index 6acb7db..03a5a71 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportServer.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportServer.java
@@ -16,13 +16,11 @@
 package org.onosproject.store.primitives.impl;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+import static org.slf4j.LoggerFactory.getLogger;
 
 import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.UnknownHostException;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -32,8 +30,8 @@
 import org.onlab.util.Tools;
 import org.onosproject.cluster.PartitionId;
 import org.onosproject.store.cluster.messaging.MessagingService;
+import org.slf4j.Logger;
 
-import com.google.common.base.Throwables;
 import com.google.common.collect.Maps;
 
 import io.atomix.catalyst.transport.Address;
@@ -47,6 +45,7 @@
  */
 public class CopycatTransportServer implements Server {
 
+    private final Logger log = getLogger(getClass());
     private final AtomicBoolean listening = new AtomicBoolean(false);
     private CompletableFuture<Void> listenFuture = new CompletableFuture<>();
     private final PartitionId partitionId;
@@ -73,28 +72,23 @@
         messagingService.registerHandler(messageSubject, (sender, payload) -> {
             try (DataInputStream input = new DataInputStream(new ByteArrayInputStream(payload))) {
                 long connectionId = input.readLong();
-                AtomicBoolean newConnection = new AtomicBoolean(false);
+                AtomicBoolean newConnectionCreated = new AtomicBoolean(false);
                 CopycatTransportConnection connection = connections.computeIfAbsent(connectionId, k -> {
-                    newConnection.set(true);
-                    try {
-                        InetAddress senderHost = InetAddress.getByAddress(sender.host().toOctets());
-                        int senderPort = sender.port();
-                        Address senderAddress = new Address(new InetSocketAddress(senderHost, senderPort));
-                        return new CopycatTransportConnection(connectionId,
-                                CopycatTransport.Mode.SERVER,
-                                partitionId,
-                                senderAddress,
-                                messagingService,
-                                getOrCreateContext(context));
-                    } catch (UnknownHostException e) {
-                        Throwables.propagate(e);
-                        return null;
-                    }
+                    newConnectionCreated.set(true);
+                    CopycatTransportConnection newConnection = new CopycatTransportConnection(connectionId,
+                            CopycatTransport.Mode.SERVER,
+                            partitionId,
+                            CopycatTransport.toAddress(sender),
+                            messagingService,
+                            getOrCreateContext(context));
+                    log.debug("Created new incoming connection {}", connectionId);
+                    newConnection.closeListener(c -> connections.remove(connectionId, c));
+                    return newConnection;
                 });
                 byte[] request = IOUtils.toByteArray(input);
                 return CompletableFuture.supplyAsync(
                         () -> {
-                            if (newConnection.get()) {
+                            if (newConnectionCreated.get()) {
                                 listener.accept(connection);
                             }
                             return connection;
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java
index 14e1d96..76709d6 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java
@@ -57,6 +57,7 @@
 public class PartitionManager extends AbstractListenerManager<PartitionEvent, PartitionEventListener>
     implements PartitionService, PartitionAdminService {
 
+    public static final String HELLO_MESSAGE_SUBJECT = "partition-manager-hello";
     private final Logger log = getLogger(getClass());
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
@@ -72,6 +73,8 @@
 
     @Activate
     public void activate() {
+        messagingService.registerHandler(HELLO_MESSAGE_SUBJECT,
+                                         (ep, input) -> CompletableFuture.completedFuture(input));
         eventDispatcher.addSink(PartitionEvent.class, listenerRegistry);
 
         metadataService.getClusterMetadata()
@@ -92,6 +95,7 @@
     }
 
     public void deactivate() {
+        messagingService.unregisterHandler(HELLO_MESSAGE_SUBJECT);
         eventDispatcher.removeSink(PartitionEvent.class);
 
         CompletableFuture<Void> closeFuture = CompletableFuture.allOf(partitions.values()
@@ -151,4 +155,4 @@
                          .map(Optional::get)
                          .collect(Collectors.toList());
     }
-}
\ No newline at end of file
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
index 293deef..854aa3c 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
@@ -15,6 +15,7 @@
  */
 package org.onosproject.store.primitives.impl;
 
+import static org.slf4j.LoggerFactory.getLogger;
 import io.atomix.Atomix;
 import io.atomix.AtomixClient;
 import io.atomix.catalyst.transport.Transport;
@@ -38,6 +39,7 @@
 import org.onosproject.store.service.AsyncLeaderElector;
 import org.onosproject.store.service.DistributedQueue;
 import org.onosproject.store.service.Serializer;
+import org.slf4j.Logger;
 
 import com.google.common.base.Supplier;
 import com.google.common.base.Suppliers;
@@ -48,6 +50,8 @@
  */
 public class StoragePartitionClient implements DistributedPrimitiveCreator, Managed<StoragePartitionClient> {
 
+    private final Logger log = getLogger(getClass());
+
     private final StoragePartition partition;
     private final Transport transport;
     private final io.atomix.catalyst.serializer.Serializer serializer;
@@ -82,7 +86,13 @@
                                 .withTransport(transport)
                                 .build();
         }
-        return client.open().thenApply(v -> null);
+        return client.open().whenComplete((r, e) -> {
+            if (e == null) {
+                log.info("Successfully started client for partition {}", partition.getId());
+            } else {
+                log.info("Failed to start client for partition {}", partition.getId(), e);
+            }
+        }).thenApply(v -> null);
     }
 
     @Override
@@ -156,4 +166,4 @@
     public boolean isClosed() {
         return client.isClosed();
     }
-}
\ No newline at end of file
+}