Using latest atomix release candidate + Updates to CopycatTransport

Change-Id: I960af428ff733ee7467024811e3b3470e951ecb7
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransport.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransport.java
index b5b1f21..3491987 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransport.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransport.java
@@ -86,7 +86,7 @@
 
     /**
      * Maps {@link Address address} to {@link Endpoint endpoint}.
-     * @param address
+     * @param address address
      * @return end point
      */
     public static Endpoint toEndpoint(Address address) {
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportClient.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportClient.java
index 9a1dce9..5b15d33 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportClient.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportClient.java
@@ -16,6 +16,7 @@
 package org.onosproject.store.primitives.impl;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+import static org.slf4j.LoggerFactory.getLogger;
 
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
@@ -23,8 +24,10 @@
 import org.apache.commons.lang.math.RandomUtils;
 import org.onosproject.cluster.PartitionId;
 import org.onosproject.store.cluster.messaging.MessagingService;
+import org.slf4j.Logger;
 
 import com.google.common.collect.Sets;
+import com.google.common.primitives.Longs;
 
 import io.atomix.catalyst.transport.Address;
 import io.atomix.catalyst.transport.Client;
@@ -36,26 +39,30 @@
  */
 public class CopycatTransportClient implements Client {
 
+    private final Logger log = getLogger(getClass());
     private final PartitionId partitionId;
     private final MessagingService messagingService;
     private final CopycatTransport.Mode mode;
+    private final String newConnectionMessageSubject;
     private final Set<CopycatTransportConnection> connections = Sets.newConcurrentHashSet();
 
     CopycatTransportClient(PartitionId partitionId, MessagingService messagingService, CopycatTransport.Mode mode) {
         this.partitionId = checkNotNull(partitionId);
         this.messagingService = checkNotNull(messagingService);
         this.mode = checkNotNull(mode);
+        this.newConnectionMessageSubject = String.format("onos-copycat-server-connection-%s", partitionId);
     }
 
     @Override
     public CompletableFuture<Connection> connect(Address remoteAddress) {
         ThreadContext context = ThreadContext.currentContextOrThrow();
         return messagingService.sendAndReceive(CopycatTransport.toEndpoint(remoteAddress),
-                                               PartitionManager.HELLO_MESSAGE_SUBJECT,
-                                               "hello".getBytes())
-                .thenApplyAsync(r -> {
+                                               newConnectionMessageSubject,
+                                               Longs.toByteArray(nextConnectionId()))
+                .thenApplyAsync(bytes -> {
+                    long connectionId = Longs.fromByteArray(bytes);
                     CopycatTransportConnection connection = new CopycatTransportConnection(
-                            nextConnectionId(),
+                            connectionId,
                             CopycatTransport.Mode.CLIENT,
                             partitionId,
                             remoteAddress,
@@ -64,6 +71,7 @@
                     if (mode == CopycatTransport.Mode.CLIENT) {
                         connection.setBidirectional();
                     }
+                    log.debug("Created new outgoing connection[id={}] to {}", connectionId, remoteAddress);
                     connections.add(connection);
                     return connection;
                 }, context.executor());
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportConnection.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportConnection.java
index 39ce10f..d1b686f 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportConnection.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportConnection.java
@@ -31,12 +31,14 @@
 import org.onlab.util.Tools;
 import org.onosproject.cluster.PartitionId;
 import org.onosproject.store.cluster.messaging.MessagingService;
+import org.slf4j.Logger;
 
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Throwables;
 import com.google.common.collect.Maps;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+import static org.slf4j.LoggerFactory.getLogger;
 import io.atomix.catalyst.transport.Address;
 import io.atomix.catalyst.transport.Connection;
 import io.atomix.catalyst.transport.MessageHandler;
@@ -52,6 +54,7 @@
  */
 public class CopycatTransportConnection implements Connection {
 
+    private final Logger log = getLogger(getClass());
     private final Listeners<Throwable> exceptionListeners = new Listeners<>();
     private final Listeners<Connection> closeListeners = new Listeners<>();
 
@@ -82,11 +85,11 @@
         this.remoteAddress = checkNotNull(address);
         this.messagingService = checkNotNull(messagingService);
         if (mode == CopycatTransport.Mode.CLIENT) {
-            this.outboundMessageSubject = String.format("onos-copycat-%s", partitionId);
-            this.inboundMessageSubject = String.format("onos-copycat-%s-%d", partitionId, connectionId);
+            this.outboundMessageSubject = String.format("onos-copycat-server-%s", partitionId);
+            this.inboundMessageSubject = String.format("onos-copycat-client-%s-%d", partitionId, connectionId);
         } else {
-            this.outboundMessageSubject = String.format("onos-copycat-%s-%d", partitionId, connectionId);
-            this.inboundMessageSubject = String.format("onos-copycat-%s", partitionId);
+            this.outboundMessageSubject = String.format("onos-copycat-client-%s-%d", partitionId, connectionId);
+            this.inboundMessageSubject = String.format("onos-copycat-server-%s", partitionId);
         }
         this.context = checkNotNull(context);
     }
@@ -203,6 +206,7 @@
 
     @Override
     public CompletableFuture<Void> close() {
+        log.debug("Closing connection[id={}, mode={}] to {}", connectionId, mode, remoteAddress);
         closeListeners.forEach(listener -> listener.accept(this));
         if (mode == CopycatTransport.Mode.CLIENT) {
             messagingService.unregisterHandler(inboundMessageSubject);
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportServer.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportServer.java
index 36a1958..2f24d6b 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportServer.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportServer.java
@@ -35,6 +35,7 @@
 import org.slf4j.Logger;
 
 import com.google.common.collect.Maps;
+import com.google.common.primitives.Longs;
 
 import io.atomix.catalyst.transport.Address;
 import io.atomix.catalyst.transport.Connection;
@@ -54,13 +55,15 @@
     private final ScheduledExecutorService executorService;
     private final PartitionId partitionId;
     private final MessagingService messagingService;
-    private final String messageSubject;
+    private final String protocolMessageSubject;
+    private final String newConnectionMessageSubject;
     private final Map<Long, CopycatTransportConnection> connections = Maps.newConcurrentMap();
 
     CopycatTransportServer(PartitionId partitionId, MessagingService messagingService) {
         this.partitionId = checkNotNull(partitionId);
         this.messagingService = checkNotNull(messagingService);
-        this.messageSubject = String.format("onos-copycat-%s", partitionId);
+        this.protocolMessageSubject = String.format("onos-copycat-server-%s", partitionId);
+        this.newConnectionMessageSubject = String.format("onos-copycat-server-connection-%s", partitionId);
         this.executorService = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(),
                 new CatalystThreadFactory("copycat-server-p" + partitionId + "-%d"));
     }
@@ -68,49 +71,49 @@
     @Override
     public CompletableFuture<Void> listen(Address address, Consumer<Connection> listener) {
         if (listening.compareAndSet(false, true)) {
+            // message handler for all non-connection-establishment messages.
+            messagingService.registerHandler(protocolMessageSubject, (sender, payload) -> {
+                try (DataInputStream input = new DataInputStream(new ByteArrayInputStream(payload))) {
+                    long connectionId = input.readLong();
+                    CopycatTransportConnection connection = connections.get(connectionId);
+                    if (connection == null) {
+                        throw new IOException("Closed connection");
+                    }
+                    byte[] messagePayload = IOUtils.toByteArray(input);
+                    return connection.handle(messagePayload);
+                } catch (IOException e) {
+                    return Tools.exceptionalFuture(e);
+                }
+            });
+
+            // message handler for new connection attempts.
             ThreadContext context = ThreadContext.currentContextOrThrow();
-            listen(address, listener, context);
+            messagingService.registerHandler(newConnectionMessageSubject, (sender, payload) -> {
+                long connectionId = Longs.fromByteArray(payload);
+                CopycatTransportConnection connection = new CopycatTransportConnection(connectionId,
+                        CopycatTransport.Mode.SERVER,
+                        partitionId,
+                        CopycatTransport.toAddress(sender),
+                        messagingService,
+                        getOrCreateContext(context));
+                connections.put(connectionId, connection);
+                connection.closeListener(c -> connections.remove(connectionId, c));
+                log.debug("Created new incoming connection[id={}] from {}", connectionId, sender);
+                return CompletableFuture.supplyAsync(() -> {
+                    listener.accept(connection);
+                    // echo the connectionId back to indicate successful completion.
+                    return payload;
+                }, context.executor());
+            });
+            context.execute(() -> listenFuture.complete(null));
         }
         return listenFuture;
     }
 
-    private void listen(Address address, Consumer<Connection> listener, ThreadContext context) {
-        messagingService.registerHandler(messageSubject, (sender, payload) -> {
-            try (DataInputStream input = new DataInputStream(new ByteArrayInputStream(payload))) {
-                long connectionId = input.readLong();
-                AtomicBoolean newConnectionCreated = new AtomicBoolean(false);
-                CopycatTransportConnection connection = connections.computeIfAbsent(connectionId, k -> {
-                    newConnectionCreated.set(true);
-                    CopycatTransportConnection newConnection = new CopycatTransportConnection(connectionId,
-                            CopycatTransport.Mode.SERVER,
-                            partitionId,
-                            CopycatTransport.toAddress(sender),
-                            messagingService,
-                            getOrCreateContext(context));
-                    log.debug("Created new incoming connection {}", connectionId);
-                    newConnection.closeListener(c -> connections.remove(connectionId, c));
-                    return newConnection;
-                });
-                byte[] request = IOUtils.toByteArray(input);
-                return CompletableFuture.supplyAsync(
-                        () -> {
-                            if (newConnectionCreated.get()) {
-                                listener.accept(connection);
-                            }
-                            return connection;
-                        }, context.executor()).thenCompose(c -> c.handle(request));
-            } catch (IOException e) {
-                return Tools.exceptionalFuture(e);
-            }
-        });
-        context.execute(() -> {
-            listenFuture.complete(null);
-        });
-    }
-
     @Override
     public CompletableFuture<Void> close() {
-        messagingService.unregisterHandler(messageSubject);
+        messagingService.unregisterHandler(newConnectionMessageSubject);
+        messagingService.unregisterHandler(protocolMessageSubject);
         executorService.shutdown();
         return CompletableFuture.completedFuture(null);
     }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultCatalystTypeSerializerFactory.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultCatalystTypeSerializerFactory.java
index 78fcedd..d567adc 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultCatalystTypeSerializerFactory.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultCatalystTypeSerializerFactory.java
@@ -54,7 +54,7 @@
         }
 
         @Override
-        public void write(T object, BufferOutput<?> buffer,
+        public void write(T object, BufferOutput buffer,
                 io.atomix.catalyst.serializer.Serializer serializer) {
             try {
                 byte[] payload = this.serializer.encode(object);
@@ -66,7 +66,7 @@
         }
 
         @Override
-        public T read(Class<T> type, BufferInput<?> buffer,
+        public T read(Class<T> type, BufferInput buffer,
                 io.atomix.catalyst.serializer.Serializer serializer) {
             int size = buffer.readInt();
             try {
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java
index 3826116..54abda2 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java
@@ -27,6 +27,7 @@
 
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
@@ -56,7 +57,6 @@
 public class PartitionManager extends AbstractListenerManager<PartitionEvent, PartitionEventListener>
     implements PartitionService, PartitionAdminService {
 
-    public static final String HELLO_MESSAGE_SUBJECT = "partition-manager-hello";
     private final Logger log = getLogger(getClass());
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
@@ -72,8 +72,6 @@
 
     @Activate
     public void activate() {
-        messagingService.registerHandler(HELLO_MESSAGE_SUBJECT,
-                                         (ep, input) -> CompletableFuture.completedFuture(input));
         eventDispatcher.addSink(PartitionEvent.class, listenerRegistry);
 
         metadataService.getClusterMetadata()
@@ -93,8 +91,8 @@
         log.info("Started");
     }
 
+    @Deactivate
     public void deactivate() {
-        messagingService.unregisterHandler(HELLO_MESSAGE_SUBJECT);
         eventDispatcher.removeSink(PartitionEvent.class);
 
         CompletableFuture<Void> closeFuture = CompletableFuture.allOf(partitions.values()
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
index f1eb4ce..2a3b834 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
@@ -90,7 +90,7 @@
     private TransactionCoordinator transactionCoordinator;
 
     @Activate
-    public void actiavte() {
+    public void activate() {
         basePrimitiveCreator = partitionService.getDistributedPrimitiveCreator(PartitionId.from(0));
         Map<PartitionId, DistributedPrimitiveCreator> partitionMap = Maps.newHashMap();
         partitionService.getAllPartitionIds().stream()
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionDetails.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionDetails.java
index 38b811e..3fb243d 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionDetails.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionDetails.java
@@ -19,14 +19,14 @@
 
 import java.util.Collection;
 import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 import org.onosproject.cluster.PartitionId;
 import org.onosproject.store.service.PartitionInfo;
 
 import com.google.common.base.MoreObjects;
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
 
 /**
  * Operational details for a {@code StoragePartition}.
@@ -98,9 +98,11 @@
      * @return partition info
      */
     public PartitionInfo toPartitionInfo() {
+        Function<Member, String> memberToString =
+                m -> m == null ? "none" : String.format("%s:%d", m.address().host(), m.address().port());
         return new PartitionInfo(partitionId.toString(),
                 leaderTerm,
-                Lists.transform(ImmutableList.copyOf(activeMembers), m -> m.address().toString()),
-                leader == null ? "none" : leader.address().toString());
+                activeMembers.stream().map(memberToString).collect(Collectors.toList()),
+                memberToString.apply(leader));
     }
 }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
index 834d8e9..dcf98f6 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
@@ -22,6 +22,7 @@
 import io.atomix.copycat.server.CopycatServer;
 import io.atomix.copycat.server.storage.Storage;
 import io.atomix.copycat.server.storage.StorageLevel;
+import io.atomix.manager.ResourceManagerTypeResolver;
 import io.atomix.manager.state.ResourceManagerState;
 import io.atomix.resource.ResourceRegistry;
 import io.atomix.resource.ResourceType;
@@ -107,7 +108,7 @@
         ResourceRegistry registry = new ResourceRegistry();
         resourceTypes.forEach(registry::register);
         resourceResolver.resolve(registry);
-        return CopycatServer.builder(localAddress, partition.getMemberAddresses())
+        CopycatServer server = CopycatServer.builder(localAddress, partition.getMemberAddresses())
                 .withName("partition-" + partition.getId())
                 .withSerializer(serializer.clone())
                 .withTransport(transport.get())
@@ -119,6 +120,8 @@
                         .withMaxEntriesPerSegment(MAX_ENTRIES_PER_LOG_SEGMENT)
                         .build())
                 .build();
+        server.serializer().resolve(new ResourceManagerTypeResolver(registry));
+        return server;
     }
 
     public Set<NodeId> configuredMembers() {
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
index 73bc8b7..df44b48 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
@@ -43,8 +43,10 @@
 /**
  * Distributed resource providing the {@link AsyncConsistentMap} primitive.
  */
-@ResourceTypeInfo(id = -151, stateMachine = AtomixConsistentMapState.class)
-public class AtomixConsistentMap extends Resource<AtomixConsistentMap, Resource.Options>
+@ResourceTypeInfo(id = -151,
+                  stateMachine = AtomixConsistentMapState.class,
+                  typeResolver = AtomixConsistentMapCommands.TypeResolver.class)
+public class AtomixConsistentMap extends Resource<AtomixConsistentMap>
     implements AsyncConsistentMap<String, byte[]> {
 
     private final Set<MapEventListener<String, byte[]>> mapEventListeners = Sets.newCopyOnWriteArraySet();
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java
index 72e52c2..9c9b019 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java
@@ -26,6 +26,7 @@
 import io.atomix.copycat.server.storage.snapshot.SnapshotReader;
 import io.atomix.copycat.server.storage.snapshot.SnapshotWriter;
 import io.atomix.resource.ResourceStateMachine;
+import io.atomix.resource.ResourceType;
 
 import java.util.Collection;
 import java.util.HashMap;
@@ -69,12 +70,17 @@
  * State Machine for {@link AtomixConsistentMap} resource.
  */
 public class AtomixConsistentMapState extends ResourceStateMachine implements SessionListener, Snapshottable {
+
     private final Map<Long, Commit<? extends AtomixConsistentMapCommands.Listen>> listeners = new HashMap<>();
     private final Map<String, MapEntryValue> mapEntries = new HashMap<>();
     private final Set<String> preparedKeys = Sets.newHashSet();
     private final Map<TransactionId, Commit<? extends TransactionPrepare>> pendingTransactions = Maps.newHashMap();
     private AtomicLong versionCounter = new AtomicLong(0);
 
+    public AtomixConsistentMapState() {
+        super(new ResourceType(AtomixConsistentMap.class));
+    }
+
     @Override
     public void snapshot(SnapshotWriter writer) {
         writer.writeLong(versionCounter.get());
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java
index 4e15a81..b7e48fa 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java
@@ -25,7 +25,6 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Consumer;
 
-import org.onlab.util.SharedExecutors;
 import org.onosproject.cluster.Leadership;
 import org.onosproject.cluster.NodeId;
 import org.onosproject.event.Change;
@@ -36,8 +35,10 @@
 /**
  * Distributed resource providing the {@link AsyncLeaderElector} primitive.
  */
-@ResourceTypeInfo(id = -152, stateMachine = AtomixLeaderElectorState.class)
-public class AtomixLeaderElector extends Resource<AtomixLeaderElector, Resource.Options>
+@ResourceTypeInfo(id = -152,
+                  stateMachine = AtomixLeaderElectorState.class,
+                  typeResolver = AtomixLeaderElectorCommands.TypeResolver.class)
+public class AtomixLeaderElector extends Resource<AtomixLeaderElector>
     implements AsyncLeaderElector {
     private final Set<Consumer<Change<Leadership>>> leadershipChangeListeners =
             Sets.newConcurrentHashSet();
@@ -62,8 +63,7 @@
     }
 
     private void handleEvent(Change<Leadership> change) {
-        SharedExecutors.getSingleThreadExecutor().execute(() ->
-            leadershipChangeListeners.forEach(l -> l.accept(change)));
+        leadershipChangeListeners.forEach(l -> l.accept(change));
     }
 
     @Override
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorState.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorState.java
index e8abfac..9b58226 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorState.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorState.java
@@ -24,6 +24,7 @@
 import io.atomix.copycat.server.storage.snapshot.SnapshotReader;
 import io.atomix.copycat.server.storage.snapshot.SnapshotWriter;
 import io.atomix.resource.ResourceStateMachine;
+import io.atomix.resource.ResourceType;
 
 import java.util.Arrays;
 import java.util.HashMap;
@@ -72,6 +73,10 @@
                                                            ElectionState.class,
                                                            Registration.class);
 
+    public AtomixLeaderElectorState() {
+        super(new ResourceType(AtomixLeaderElector.class));
+    }
+
     @Override
     protected void configure(StateMachineExecutor executor) {
         // Notification
@@ -261,7 +266,7 @@
     }
 
     private void onSessionEnd(Session session) {
-        Commit<? extends AtomixLeaderElectorCommands.Listen> listener = listeners.remove(session);
+        Commit<? extends AtomixLeaderElectorCommands.Listen> listener = listeners.remove(session.id());
         if (listener != null) {
             listener.close();
         }
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixTestBase.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixTestBase.java
index d655d52..a5c197b 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixTestBase.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixTestBase.java
@@ -110,7 +110,6 @@
                 .withStorage(Storage.builder()
                         .withStorageLevel(StorageLevel.DISK)
                         .withDirectory(TEST_DIR + "/" + address.port())
-                        .withSerializer(serializer.clone())
                         .build())
                 .withStateMachine(() -> new ResourceManagerState(resourceRegistry))
                 .withSerializer(serializer.clone())
diff --git a/pom.xml b/pom.xml
index 0042de6..4280443 100644
--- a/pom.xml
+++ b/pom.xml
@@ -79,7 +79,7 @@
         <onos-build-conf.version>1.2-SNAPSHOT</onos-build-conf.version>
         <netty4.version>4.0.33.Final</netty4.version>
         <!-- TODO: replace with final release version when it is out -->
-        <atomix.version>0.1.0-beta5</atomix.version>
+        <atomix.version>1.0.0-rc1</atomix.version>
         <copycat.version>0.5.1.onos</copycat.version>
         <openflowj.version>0.9.1.onos</openflowj.version>
         <onos-maven-plugin.version>1.8-SNAPSHOT</onos-maven-plugin.version>