Using 1.0.0.rc2 version of Atomix
CopycatTransport updates

Change-Id: If384ac2574f098c327f0e5749766268c8d7f1ecd
diff --git a/core/api/src/main/java/org/onosproject/store/service/AsyncLeaderElector.java b/core/api/src/main/java/org/onosproject/store/service/AsyncLeaderElector.java
index 609ac70..3aed8d2 100644
--- a/core/api/src/main/java/org/onosproject/store/service/AsyncLeaderElector.java
+++ b/core/api/src/main/java/org/onosproject/store/service/AsyncLeaderElector.java
@@ -120,6 +120,6 @@
      * @return new {@code LeaderElector} instance
      */
     default LeaderElector asLeaderElector() {
-        return asLeaderElector(DEFAULT_OPERTATION_TIMEOUT_MILLIS);
+        return asLeaderElector(Long.MAX_VALUE);
     }
 }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java
index 53f57f1..8ed352b 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java
@@ -19,7 +19,7 @@
 
 import io.atomix.catalyst.serializer.Serializer;
 import io.atomix.catalyst.serializer.TypeSerializerFactory;
-import io.atomix.copycat.client.Query;
+import io.atomix.copycat.Query;
 
 import org.onlab.util.Match;
 import org.onosproject.cluster.Leader;
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportClient.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportClient.java
index 5b15d33..96729c7 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportClient.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportClient.java
@@ -16,7 +16,6 @@
 package org.onosproject.store.primitives.impl;
 
 import static com.google.common.base.Preconditions.checkNotNull;
-import static org.slf4j.LoggerFactory.getLogger;
 
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
@@ -24,10 +23,8 @@
 import org.apache.commons.lang.math.RandomUtils;
 import org.onosproject.cluster.PartitionId;
 import org.onosproject.store.cluster.messaging.MessagingService;
-import org.slf4j.Logger;
 
 import com.google.common.collect.Sets;
-import com.google.common.primitives.Longs;
 
 import io.atomix.catalyst.transport.Address;
 import io.atomix.catalyst.transport.Client;
@@ -39,42 +36,32 @@
  */
 public class CopycatTransportClient implements Client {
 
-    private final Logger log = getLogger(getClass());
     private final PartitionId partitionId;
     private final MessagingService messagingService;
     private final CopycatTransport.Mode mode;
-    private final String newConnectionMessageSubject;
     private final Set<CopycatTransportConnection> connections = Sets.newConcurrentHashSet();
 
     CopycatTransportClient(PartitionId partitionId, MessagingService messagingService, CopycatTransport.Mode mode) {
         this.partitionId = checkNotNull(partitionId);
         this.messagingService = checkNotNull(messagingService);
         this.mode = checkNotNull(mode);
-        this.newConnectionMessageSubject = String.format("onos-copycat-server-connection-%s", partitionId);
     }
 
     @Override
     public CompletableFuture<Connection> connect(Address remoteAddress) {
         ThreadContext context = ThreadContext.currentContextOrThrow();
-        return messagingService.sendAndReceive(CopycatTransport.toEndpoint(remoteAddress),
-                                               newConnectionMessageSubject,
-                                               Longs.toByteArray(nextConnectionId()))
-                .thenApplyAsync(bytes -> {
-                    long connectionId = Longs.fromByteArray(bytes);
-                    CopycatTransportConnection connection = new CopycatTransportConnection(
-                            connectionId,
-                            CopycatTransport.Mode.CLIENT,
-                            partitionId,
-                            remoteAddress,
-                            messagingService,
-                            context);
-                    if (mode == CopycatTransport.Mode.CLIENT) {
-                        connection.setBidirectional();
-                    }
-                    log.debug("Created new outgoing connection[id={}] to {}", connectionId, remoteAddress);
-                    connections.add(connection);
-                    return connection;
-                }, context.executor());
+        CopycatTransportConnection connection = new CopycatTransportConnection(
+                nextConnectionId(),
+                CopycatTransport.Mode.CLIENT,
+                partitionId,
+                remoteAddress,
+                messagingService,
+                context);
+        if (mode == CopycatTransport.Mode.CLIENT) {
+            connection.setBidirectional();
+        }
+        connections.add(connection);
+        return CompletableFuture.supplyAsync(() -> connection, context.executor());
     }
 
     @Override
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportConnection.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportConnection.java
index d1b686f..39ce10f 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportConnection.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportConnection.java
@@ -31,14 +31,12 @@
 import org.onlab.util.Tools;
 import org.onosproject.cluster.PartitionId;
 import org.onosproject.store.cluster.messaging.MessagingService;
-import org.slf4j.Logger;
 
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Throwables;
 import com.google.common.collect.Maps;
 
 import static com.google.common.base.Preconditions.checkNotNull;
-import static org.slf4j.LoggerFactory.getLogger;
 import io.atomix.catalyst.transport.Address;
 import io.atomix.catalyst.transport.Connection;
 import io.atomix.catalyst.transport.MessageHandler;
@@ -54,7 +52,6 @@
  */
 public class CopycatTransportConnection implements Connection {
 
-    private final Logger log = getLogger(getClass());
     private final Listeners<Throwable> exceptionListeners = new Listeners<>();
     private final Listeners<Connection> closeListeners = new Listeners<>();
 
@@ -85,11 +82,11 @@
         this.remoteAddress = checkNotNull(address);
         this.messagingService = checkNotNull(messagingService);
         if (mode == CopycatTransport.Mode.CLIENT) {
-            this.outboundMessageSubject = String.format("onos-copycat-server-%s", partitionId);
-            this.inboundMessageSubject = String.format("onos-copycat-client-%s-%d", partitionId, connectionId);
+            this.outboundMessageSubject = String.format("onos-copycat-%s", partitionId);
+            this.inboundMessageSubject = String.format("onos-copycat-%s-%d", partitionId, connectionId);
         } else {
-            this.outboundMessageSubject = String.format("onos-copycat-client-%s-%d", partitionId, connectionId);
-            this.inboundMessageSubject = String.format("onos-copycat-server-%s", partitionId);
+            this.outboundMessageSubject = String.format("onos-copycat-%s-%d", partitionId, connectionId);
+            this.inboundMessageSubject = String.format("onos-copycat-%s", partitionId);
         }
         this.context = checkNotNull(context);
     }
@@ -206,7 +203,6 @@
 
     @Override
     public CompletableFuture<Void> close() {
-        log.debug("Closing connection[id={}, mode={}] to {}", connectionId, mode, remoteAddress);
         closeListeners.forEach(listener -> listener.accept(this));
         if (mode == CopycatTransport.Mode.CLIENT) {
             messagingService.unregisterHandler(inboundMessageSubject);
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportServer.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportServer.java
index d4f851f..4a31570 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportServer.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportServer.java
@@ -35,7 +35,6 @@
 import org.slf4j.Logger;
 
 import com.google.common.collect.Maps;
-import com.google.common.primitives.Longs;
 
 import io.atomix.catalyst.transport.Address;
 import io.atomix.catalyst.transport.Connection;
@@ -55,15 +54,13 @@
     private final ScheduledExecutorService executorService;
     private final PartitionId partitionId;
     private final MessagingService messagingService;
-    private final String protocolMessageSubject;
-    private final String newConnectionMessageSubject;
+    private final String messageSubject;
     private final Map<Long, CopycatTransportConnection> connections = Maps.newConcurrentMap();
 
     CopycatTransportServer(PartitionId partitionId, MessagingService messagingService) {
         this.partitionId = checkNotNull(partitionId);
         this.messagingService = checkNotNull(messagingService);
-        this.protocolMessageSubject = String.format("onos-copycat-server-%s", partitionId);
-        this.newConnectionMessageSubject = String.format("onos-copycat-server-connection-%s", partitionId);
+        this.messageSubject = String.format("onos-copycat-%s", partitionId);
         this.executorService = Executors.newScheduledThreadPool(Math.min(4, Runtime.getRuntime().availableProcessors()),
                 new CatalystThreadFactory("copycat-server-p" + partitionId + "-%d"));
     }
@@ -71,49 +68,49 @@
     @Override
     public CompletableFuture<Void> listen(Address address, Consumer<Connection> listener) {
         if (listening.compareAndSet(false, true)) {
-            // message handler for all non-connection-establishment messages.
-            messagingService.registerHandler(protocolMessageSubject, (sender, payload) -> {
-                try (DataInputStream input = new DataInputStream(new ByteArrayInputStream(payload))) {
-                    long connectionId = input.readLong();
-                    CopycatTransportConnection connection = connections.get(connectionId);
-                    if (connection == null) {
-                        throw new IOException("Closed connection");
-                    }
-                    byte[] messagePayload = IOUtils.toByteArray(input);
-                    return connection.handle(messagePayload);
-                } catch (IOException e) {
-                    return Tools.exceptionalFuture(e);
-                }
-            });
-
-            // message handler for new connection attempts.
             ThreadContext context = ThreadContext.currentContextOrThrow();
-            messagingService.registerHandler(newConnectionMessageSubject, (sender, payload) -> {
-                long connectionId = Longs.fromByteArray(payload);
-                CopycatTransportConnection connection = new CopycatTransportConnection(connectionId,
-                        CopycatTransport.Mode.SERVER,
-                        partitionId,
-                        CopycatTransport.toAddress(sender),
-                        messagingService,
-                        getOrCreateContext(context));
-                connections.put(connectionId, connection);
-                connection.closeListener(c -> connections.remove(connectionId, c));
-                log.debug("Created new incoming connection[id={}] from {}", connectionId, sender);
-                return CompletableFuture.supplyAsync(() -> {
-                    listener.accept(connection);
-                    // echo the connectionId back to indicate successful completion.
-                    return payload;
-                }, context.executor());
-            });
-            context.execute(() -> listenFuture.complete(null));
+            listen(address, listener, context);
         }
         return listenFuture;
     }
 
+    private void listen(Address address, Consumer<Connection> listener, ThreadContext context) {
+        messagingService.registerHandler(messageSubject, (sender, payload) -> {
+            try (DataInputStream input = new DataInputStream(new ByteArrayInputStream(payload))) {
+                long connectionId = input.readLong();
+                AtomicBoolean newConnectionCreated = new AtomicBoolean(false);
+                CopycatTransportConnection connection = connections.computeIfAbsent(connectionId, k -> {
+                    newConnectionCreated.set(true);
+                    CopycatTransportConnection newConnection = new CopycatTransportConnection(connectionId,
+                            CopycatTransport.Mode.SERVER,
+                            partitionId,
+                            CopycatTransport.toAddress(sender),
+                            messagingService,
+                            getOrCreateContext(context));
+                    log.debug("Created new incoming connection {}", connectionId);
+                    newConnection.closeListener(c -> connections.remove(connectionId, c));
+                    return newConnection;
+                });
+                byte[] request = IOUtils.toByteArray(input);
+                return CompletableFuture.supplyAsync(
+                        () -> {
+                            if (newConnectionCreated.get()) {
+                                listener.accept(connection);
+                            }
+                            return connection;
+                        }, context.executor()).thenCompose(c -> c.handle(request));
+            } catch (IOException e) {
+                return Tools.exceptionalFuture(e);
+            }
+        });
+        context.execute(() -> {
+            listenFuture.complete(null);
+        });
+    }
+
     @Override
     public CompletableFuture<Void> close() {
-        messagingService.unregisterHandler(newConnectionMessageSubject);
-        messagingService.unregisterHandler(protocolMessageSubject);
+        messagingService.unregisterHandler(messageSubject);
         executorService.shutdown();
         return CompletableFuture.completedFuture(null);
     }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java
index 54abda2..45bd171 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java
@@ -105,14 +105,16 @@
 
     @Override
     public CompletableFuture<Void> leave(PartitionId partitionId) {
-        // TODO: Implement
-        return Tools.exceptionalFuture(new UnsupportedOperationException());
+        return partitions.get(partitionId)
+                         .server()
+                         .map(server -> server.close())
+                         .orElse(CompletableFuture.completedFuture(null));
     }
 
     @Override
     public CompletableFuture<Void> join(PartitionId partitionId) {
-        // TODO: Implement
-        return Tools.exceptionalFuture(new UnsupportedOperationException());
+        return partitions.get(partitionId)
+                         .open();
     }
 
     @Override
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java
index 41cdf7c..3b142a7 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java
@@ -72,10 +72,22 @@
         this.logFolder = logFolder;
     }
 
+    /**
+     * Returns the partition client instance.
+     * @return client
+     */
     public StoragePartitionClient client() {
         return client;
     }
 
+    /**
+     * Returns the optional server instance.
+     * @return server
+     */
+    public Optional<StoragePartitionServer> server() {
+        return server;
+    }
+
     @Override
     public CompletableFuture<Void> open() {
         return openServer().thenAccept(s -> server = Optional.ofNullable(s))
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
index d674ebe..e6669dc 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
@@ -114,8 +114,7 @@
                 .withTransport(transport.get())
                 .withStateMachine(() -> new ResourceManagerState(registry))
                 .withStorage(Storage.builder()
-                         // FIXME: StorageLevel should be DISK
-                        .withStorageLevel(StorageLevel.MEMORY)
+                        .withStorageLevel(StorageLevel.DISK)
                         .withCompactionThreads(1)
                         .withDirectory(dataFolder)
                         .withMaxEntriesPerSegment(MAX_ENTRIES_PER_LOG_SEGMENT)
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
index df44b48..4c065a5 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
@@ -65,7 +65,7 @@
     @Override
     public CompletableFuture<AtomixConsistentMap> open() {
         return super.open().thenApply(result -> {
-            client.session().onEvent(CHANGE_SUBJECT, this::handleEvent);
+            client.onEvent(CHANGE_SUBJECT, this::handleEvent);
             return result;
         });
     }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapCommands.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapCommands.java
index 913c3bc..4f912da 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapCommands.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapCommands.java
@@ -22,8 +22,8 @@
 import io.atomix.catalyst.serializer.Serializer;
 import io.atomix.catalyst.serializer.SerializerRegistry;
 import io.atomix.catalyst.util.Assert;
-import io.atomix.copycat.client.Command;
-import io.atomix.copycat.client.Query;
+import io.atomix.copycat.Command;
+import io.atomix.copycat.Query;
 
 import java.util.Collection;
 import java.util.Map;
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java
index 9c9b019..a6e6ca0 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java
@@ -18,7 +18,7 @@
 import static org.onosproject.store.service.MapEvent.Type.INSERT;
 import static org.onosproject.store.service.MapEvent.Type.REMOVE;
 import static org.onosproject.store.service.MapEvent.Type.UPDATE;
-import io.atomix.copycat.client.session.Session;
+import io.atomix.copycat.server.session.ServerSession;
 import io.atomix.copycat.server.Commit;
 import io.atomix.copycat.server.Snapshottable;
 import io.atomix.copycat.server.StateMachineExecutor;
@@ -322,8 +322,8 @@
         commit.session()
                 .onStateChange(
                         state -> {
-                            if (state == Session.State.CLOSED
-                                    || state == Session.State.EXPIRED) {
+                            if (state == ServerSession.State.CLOSED
+                                    || state == ServerSession.State.EXPIRED) {
                                 Commit<? extends Listen> listener = listeners.remove(sessionId);
                                 if (listener != null) {
                                     listener.close();
@@ -503,21 +503,21 @@
     }
 
     @Override
-    public void register(Session session) {
+    public void register(ServerSession session) {
     }
 
     @Override
-    public void unregister(Session session) {
+    public void unregister(ServerSession session) {
         closeListener(session.id());
     }
 
     @Override
-    public void expire(Session session) {
+    public void expire(ServerSession session) {
         closeListener(session.id());
     }
 
     @Override
-    public void close(Session session) {
+    public void close(ServerSession session) {
         closeListener(session.id());
     }
 
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java
index b7e48fa..9995c4d 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java
@@ -57,7 +57,7 @@
     @Override
     public CompletableFuture<AtomixLeaderElector> open() {
         return super.open().thenApply(result -> {
-            client.session().onEvent("change", this::handleEvent);
+            client.onEvent("change", this::handleEvent);
             return result;
         });
     }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorCommands.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorCommands.java
index e7de783..235bd07 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorCommands.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorCommands.java
@@ -32,8 +32,8 @@
 import io.atomix.catalyst.serializer.Serializer;
 import io.atomix.catalyst.serializer.SerializerRegistry;
 import io.atomix.catalyst.util.Assert;
-import io.atomix.copycat.client.Command;
-import io.atomix.copycat.client.Query;
+import io.atomix.copycat.Command;
+import io.atomix.copycat.Query;
 
 /**
  * {@link AtomixLeaderElector} resource state machine operations.
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorState.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorState.java
index 9b58226..23dcf52 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorState.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorState.java
@@ -16,7 +16,7 @@
 package org.onosproject.store.primitives.resources.impl;
 
 import static org.slf4j.LoggerFactory.getLogger;
-import io.atomix.copycat.client.session.Session;
+import io.atomix.copycat.server.session.ServerSession;
 import io.atomix.copycat.server.Commit;
 import io.atomix.copycat.server.Snapshottable;
 import io.atomix.copycat.server.StateMachineExecutor;
@@ -265,7 +265,7 @@
         return electionState == null ? new LinkedList<>() : electionState.candidates();
     }
 
-    private void onSessionEnd(Session session) {
+    private void onSessionEnd(ServerSession session) {
         Commit<? extends AtomixLeaderElectorCommands.Listen> listener = listeners.remove(session.id());
         if (listener != null) {
             listener.close();
@@ -337,7 +337,7 @@
             this.termStartTime = termStartTime;
         }
 
-        public ElectionState cleanup(Session session, Supplier<Long> termCounter) {
+        public ElectionState cleanup(ServerSession session, Supplier<Long> termCounter) {
             Optional<Registration> registration =
                     registrations.stream().filter(r -> r.sessionId() == session.id()).findFirst();
             if (registration.isPresent()) {
@@ -409,21 +409,21 @@
     }
 
     @Override
-    public void register(Session session) {
+    public void register(ServerSession session) {
     }
 
     @Override
-    public void unregister(Session session) {
+    public void unregister(ServerSession session) {
         onSessionEnd(session);
     }
 
     @Override
-    public void expire(Session session) {
+    public void expire(ServerSession session) {
         onSessionEnd(session);
     }
 
     @Override
-    public void close(Session session) {
+    public void close(ServerSession session) {
         onSessionEnd(session);
     }
 
diff --git a/pom.xml b/pom.xml
index 3669294..75f5c06 100644
--- a/pom.xml
+++ b/pom.xml
@@ -79,7 +79,7 @@
         <onos-build-conf.version>1.2-SNAPSHOT</onos-build-conf.version>
         <netty4.version>4.0.33.Final</netty4.version>
         <!-- TODO: replace with final release version when it is out -->
-        <atomix.version>1.0.0-rc1</atomix.version>
+        <atomix.version>1.0.0-rc2</atomix.version>
         <copycat.version>0.5.1.onos</copycat.version>
         <openflowj.version>0.9.1.onos</openflowj.version>
         <onos-maven-plugin.version>1.8-SNAPSHOT</onos-maven-plugin.version>