Migrating to latest Atomix

Change-Id: Ie636d1b2623b7f83572dca0d70bd56734379e61a
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java
index 3cde4bd..175e253 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java
@@ -19,9 +19,8 @@
 
 import io.atomix.catalyst.serializer.Serializer;
 import io.atomix.catalyst.serializer.TypeSerializerFactory;
-import io.atomix.copycat.Query;
 import io.atomix.manager.util.ResourceManagerTypeResolver;
-import io.atomix.variables.state.LongCommands;
+import io.atomix.variables.internal.LongCommands;
 
 import org.onlab.util.Match;
 import org.onosproject.cluster.Leader;
@@ -63,8 +62,7 @@
                                 Transaction.State.class,
                                 PrepareResult.class,
                                 CommitResult.class,
-                                RollbackResult.class,
-                                Query.ConsistencyLevel.class));
+                                RollbackResult.class));
         // ONOS classes
         serializer.register(Change.class, factory);
         serializer.register(Leader.class, factory);
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportClient.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportClient.java
index 66ae253..7a1cecf 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportClient.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportClient.java
@@ -29,7 +29,7 @@
 import io.atomix.catalyst.transport.Address;
 import io.atomix.catalyst.transport.Client;
 import io.atomix.catalyst.transport.Connection;
-import io.atomix.catalyst.util.concurrent.ThreadContext;
+import io.atomix.catalyst.concurrent.ThreadContext;
 
 /**
  * {@link Client} implementation for {@link CopycatTransport}.
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportConnection.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportConnection.java
index 333ec3d..8d4b577 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportConnection.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportConnection.java
@@ -15,6 +15,18 @@
  */
 package org.onosproject.store.primitives.impl;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+import io.atomix.catalyst.concurrent.Listener;
+import io.atomix.catalyst.concurrent.Listeners;
+import io.atomix.catalyst.concurrent.ThreadContext;
+import io.atomix.catalyst.serializer.SerializationException;
+import io.atomix.catalyst.transport.Address;
+import io.atomix.catalyst.transport.Connection;
+import io.atomix.catalyst.transport.MessageHandler;
+import io.atomix.catalyst.transport.TransportException;
+import io.atomix.catalyst.util.Assert;
+import io.atomix.catalyst.util.reference.ReferenceCounted;
+
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
@@ -26,8 +38,6 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Consumer;
 
-
-
 import org.apache.commons.io.IOUtils;
 import org.onlab.util.Tools;
 import org.onosproject.cluster.PartitionId;
@@ -38,18 +48,6 @@
 import com.google.common.base.Throwables;
 import com.google.common.collect.Maps;
 
-import static com.google.common.base.Preconditions.checkNotNull;
-import io.atomix.catalyst.serializer.SerializationException;
-import io.atomix.catalyst.transport.Address;
-import io.atomix.catalyst.transport.Connection;
-import io.atomix.catalyst.transport.MessageHandler;
-import io.atomix.catalyst.transport.TransportException;
-import io.atomix.catalyst.util.Assert;
-import io.atomix.catalyst.util.Listener;
-import io.atomix.catalyst.util.Listeners;
-import io.atomix.catalyst.util.ReferenceCounted;
-import io.atomix.catalyst.util.concurrent.ThreadContext;
-
 /**
  * {@link Connection} implementation for CopycatTransport.
  */
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportServer.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportServer.java
index 345a0d7..ff64112 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportServer.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CopycatTransportServer.java
@@ -17,6 +17,12 @@
 
 import static com.google.common.base.Preconditions.checkNotNull;
 import static org.slf4j.LoggerFactory.getLogger;
+import io.atomix.catalyst.concurrent.CatalystThreadFactory;
+import io.atomix.catalyst.concurrent.SingleThreadContext;
+import io.atomix.catalyst.concurrent.ThreadContext;
+import io.atomix.catalyst.transport.Address;
+import io.atomix.catalyst.transport.Connection;
+import io.atomix.catalyst.transport.Server;
 
 import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
@@ -36,13 +42,6 @@
 
 import com.google.common.collect.Maps;
 
-import io.atomix.catalyst.transport.Address;
-import io.atomix.catalyst.transport.Connection;
-import io.atomix.catalyst.transport.Server;
-import io.atomix.catalyst.util.concurrent.CatalystThreadFactory;
-import io.atomix.catalyst.util.concurrent.SingleThreadContext;
-import io.atomix.catalyst.util.concurrent.ThreadContext;
-
 /**
  * {@link Server} implementation for {@link CopycatTransport}.
  */
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingCopycatClient.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingCopycatClient.java
new file mode 100644
index 0000000..8955a6d
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingCopycatClient.java
@@ -0,0 +1,120 @@
+/*
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import io.atomix.catalyst.concurrent.Listener;
+import io.atomix.catalyst.concurrent.ThreadContext;
+import io.atomix.catalyst.serializer.Serializer;
+import io.atomix.catalyst.transport.Address;
+import io.atomix.catalyst.transport.Transport;
+import io.atomix.copycat.Command;
+import io.atomix.copycat.Query;
+import io.atomix.copycat.client.CopycatClient;
+import io.atomix.copycat.session.Session;
+
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+
+/**
+ * {@code CopycatClient} that forwards every call to another
+ * {@code CopycatClient} instance.
+ * <p>
+ * Subclasses can override only the operations whose behavior they need to
+ * change and inherit plain delegation for everything else.
+ */
+public class DelegatingCopycatClient implements CopycatClient {
+
+    /** Client that all operations are forwarded to; never null. */
+    protected final CopycatClient client;
+
+    /**
+     * Creates a client that delegates to the supplied client.
+     *
+     * @param client client to delegate to
+     * @throws NullPointerException if {@code client} is null
+     */
+    DelegatingCopycatClient(CopycatClient client) {
+        this.client = checkNotNull(client, "client cannot be null");
+    }
+
+    @Override
+    public State state() {
+        return client.state();
+    }
+
+    @Override
+    public Listener<State> onStateChange(Consumer<State> callback) {
+        return client.onStateChange(callback);
+    }
+
+    @Override
+    public ThreadContext context() {
+        return client.context();
+    }
+
+    @Override
+    public Transport transport() {
+        return client.transport();
+    }
+
+    @Override
+    public Serializer serializer() {
+        return client.serializer();
+    }
+
+    @Override
+    public Session session() {
+        return client.session();
+    }
+
+    @Override
+    public <T> CompletableFuture<T> submit(Command<T> command) {
+        return client.submit(command);
+    }
+
+    @Override
+    public <T> CompletableFuture<T> submit(Query<T> query) {
+        return client.submit(query);
+    }
+
+    @Override
+    public Listener<Void> onEvent(String event, Runnable callback) {
+        return client.onEvent(event, callback);
+    }
+
+    @Override
+    public <T> Listener<T> onEvent(String event, Consumer<T> callback) {
+        return client.onEvent(event, callback);
+    }
+
+    @Override
+    public CompletableFuture<CopycatClient> connect(Collection<Address> members) {
+        return client.connect(members);
+    }
+
+    @Override
+    public CompletableFuture<CopycatClient> recover() {
+        return client.recover();
+    }
+
+    @Override
+    public CompletableFuture<Void> close() {
+        return client.close();
+    }
+}
\ No newline at end of file
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/QueryRetryingCopycatClient.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/QueryRetryingCopycatClient.java
new file mode 100644
index 0000000..39ed6c3
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/QueryRetryingCopycatClient.java
@@ -0,0 +1,138 @@
+/*
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
+import java.net.ConnectException;
+import java.nio.channels.ClosedChannelException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Predicate;
+
+import org.slf4j.Logger;
+
+import com.google.common.base.Throwables;
+
+import io.atomix.catalyst.transport.TransportException;
+import io.atomix.copycat.Query;
+import io.atomix.copycat.client.CopycatClient;
+import io.atomix.copycat.error.QueryException;
+import io.atomix.copycat.error.UnknownSessionException;
+import io.atomix.copycat.session.ClosedSessionException;
+
+/**
+ * {@code CopycatClient} that retries queries when certain recoverable errors are encountered.
+ * <p>
+ * Only {@link Query} submissions are retried; every other operation is passed
+ * straight through to the wrapped client.
+ */
+public class QueryRetryingCopycatClient extends DelegatingCopycatClient {
+
+    private final int maxRetries;
+    private final long delayBetweenRetriesMillis;
+    private final ScheduledExecutorService executor;
+    private final Logger log = getLogger(getClass());
+
+    // Failure root causes that are treated as recoverable and therefore retried.
+    private final Predicate<Throwable> retryableCheck = e -> e instanceof ConnectException
+            || e instanceof TimeoutException
+            || e instanceof TransportException
+            || e instanceof ClosedChannelException
+            || e instanceof QueryException
+            || e instanceof UnknownSessionException
+            || e instanceof ClosedSessionException;
+
+    /**
+     * Creates a query retrying client.
+     *
+     * @param client client that queries are submitted to
+     * @param maxRetries maximum number of times a failed query is resubmitted
+     * @param delayBetweenRetriesMillis delay before each resubmission, in milliseconds
+     */
+    QueryRetryingCopycatClient(CopycatClient client, int maxRetries, long delayBetweenRetriesMillis) {
+        super(client);
+        this.maxRetries = maxRetries;
+        this.delayBetweenRetriesMillis = delayBetweenRetriesMillis;
+        this.executor = Executors.newSingleThreadScheduledExecutor();
+    }
+
+    @Override
+    public CompletableFuture<Void> close() {
+        executor.shutdown();
+        return super.close();
+    }
+
+    @Override
+    public <T> CompletableFuture<T> submit(Query<T> query) {
+        CompletableFuture<T> future = new CompletableFuture<>();
+        try {
+            executor.execute(() -> submit(query, 1, future));
+        } catch (RejectedExecutionException e) {
+            // The executor is shut down by close(); report that through the future
+            // rather than throwing from an asynchronous API.
+            future.completeExceptionally(e);
+        }
+        return future;
+    }
+
+    /**
+     * Submits the query to the underlying client, scheduling a resubmission
+     * if the attempt fails with a retryable error and retries remain.
+     *
+     * @param query query to submit
+     * @param attemptIndex 1-based index of this attempt
+     * @param future future to complete with the final outcome
+     * @param <T> query result type
+     */
+    private <T> void submit(Query<T> query, int attemptIndex, CompletableFuture<T> future) {
+        CompletableFuture<T> attempt;
+        try {
+            attempt = client.submit(query);
+        } catch (RuntimeException e) {
+            // A synchronous failure would otherwise be lost inside the executor
+            // and leave the caller's future pending forever.
+            future.completeExceptionally(e);
+            return;
+        }
+        attempt.whenComplete((r, e) -> {
+            if (e == null) {
+                future.complete(r);
+                return;
+            }
+            Throwable rootCause = Throwables.getRootCause(e);
+            if (attemptIndex > maxRetries || !retryableCheck.test(rootCause)) {
+                future.completeExceptionally(e);
+                return;
+            }
+            log.debug("Retry attempt ({} of {}). Failure due to {}",
+                    attemptIndex, maxRetries, rootCause.getClass());
+            try {
+                executor.schedule(() -> submit(query, attemptIndex + 1, future),
+                                  delayBetweenRetriesMillis,
+                                  TimeUnit.MILLISECONDS);
+            } catch (RejectedExecutionException rejected) {
+                // Closed while a retry was pending: fail with the original error
+                // instead of leaving the caller's future incomplete.
+                future.completeExceptionally(e);
+            }
+        });
+    }
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
index e1086cf..63037d0 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
@@ -16,22 +16,18 @@
 package org.onosproject.store.primitives.impl;
 
 import static org.slf4j.LoggerFactory.getLogger;
-import io.atomix.Atomix;
 import io.atomix.AtomixClient;
-import io.atomix.catalyst.transport.Address;
 import io.atomix.catalyst.transport.Transport;
-import io.atomix.catalyst.util.concurrent.CatalystThreadFactory;
 import io.atomix.copycat.client.ConnectionStrategies;
 import io.atomix.copycat.client.CopycatClient;
 import io.atomix.copycat.client.CopycatClient.State;
 import io.atomix.copycat.client.RecoveryStrategies;
-import io.atomix.copycat.client.RetryStrategies;
 import io.atomix.copycat.client.ServerSelectionStrategies;
 import io.atomix.manager.ResourceClient;
-import io.atomix.manager.state.ResourceManagerException;
+import io.atomix.manager.ResourceManagerException;
 import io.atomix.manager.util.ResourceManagerTypeResolver;
+import io.atomix.resource.ResourceRegistry;
 import io.atomix.resource.ResourceType;
-import io.atomix.resource.util.ResourceRegistry;
 import io.atomix.variables.DistributedLong;
 
 import java.util.Collection;
@@ -70,8 +66,8 @@
     private final StoragePartition partition;
     private final Transport transport;
     private final io.atomix.catalyst.serializer.Serializer serializer;
-    private Atomix client;
-    private CopycatClient copycatClient;
+    private AtomixClient client;
+    private ResourceClient resourceClient;
     private static final String ATOMIC_VALUES_CONSISTENT_MAP_NAME = "onos-atomic-values";
     private final Supplier<AsyncConsistentMap<String, byte[]>> onosAtomicValuesMap =
             Suppliers.memoize(() -> newAsyncConsistentMap(ATOMIC_VALUES_CONSISTENT_MAP_NAME,
@@ -99,19 +95,15 @@
 
     @Override
     public CompletableFuture<Void> open() {
-        if (client != null && client.isOpen()) {
-            return CompletableFuture.completedFuture(null);
-        }
         synchronized (StoragePartitionClient.this) {
-            copycatClient = newCopycatClient(partition.getMemberAddresses(),
-                                             transport,
+            resourceClient = newResourceClient(transport,
                                              serializer.clone(),
                                              StoragePartition.RESOURCE_TYPES);
-          copycatClient.onStateChange(state -> log.debug("Partition {} client state"
+            resourceClient.client().onStateChange(state -> log.debug("Partition {} client state"
                     + " changed to {}", partition.getId(), state));
-            client = new AtomixClient(new ResourceClient(copycatClient));
+            client = new AtomixClient(resourceClient);
         }
-        return client.open().whenComplete((r, e) -> {
+        return client.connect(partition.getMemberAddresses()).whenComplete((r, e) -> {
             if (e == null) {
                 log.info("Successfully started client for partition {}", partition.getId());
             } else {
@@ -132,7 +124,7 @@
             atomixConsistentMap.statusChangeListeners()
                                .forEach(listener -> listener.accept(mapper.apply(state)));
         };
-        copycatClient.onStateChange(statusListener);
+        resourceClient.client().onStateChange(statusListener);
         AsyncConsistentMap<String, byte[]> rawMap =
                 new DelegatingAsyncConsistentMap<String, byte[]>(atomixConsistentMap) {
                     @Override
@@ -173,7 +165,15 @@
 
     @Override
     public AsyncLeaderElector newAsyncLeaderElector(String name) {
-        return client.getResource(name, AtomixLeaderElector.class).join();
+        AtomixLeaderElector leaderElector = client.getResource(name, AtomixLeaderElector.class)
+                                                  .thenCompose(AtomixLeaderElector::setupCache)
+                                                  .join();
+        Consumer<State> statusListener = state -> {
+            leaderElector.statusChangeListeners()
+                         .forEach(listener -> listener.accept(mapper.apply(state)));
+        };
+        resourceClient.client().onStateChange(statusListener);
+        return leaderElector;
     }
 
     @Override
@@ -188,7 +188,7 @@
 
     @Override
     public boolean isOpen() {
-        return client.isOpen();
+        return resourceClient.client().state() != State.CLOSED;
     }
 
     /**
@@ -198,33 +198,33 @@
     public PartitionClientInfo clientInfo() {
         return new PartitionClientInfo(partition.getId(),
                 partition.getMembers(),
-                copycatClient.session().id(),
-                mapper.apply(copycatClient.state()));
+                resourceClient.client().session().id(),
+                mapper.apply(resourceClient.client().state()));
     }
 
-    private CopycatClient newCopycatClient(Collection<Address> members,
-                                           Transport transport,
+    private ResourceClient newResourceClient(Transport transport,
                                            io.atomix.catalyst.serializer.Serializer serializer,
                                            Collection<ResourceType> resourceTypes) {
         ResourceRegistry registry = new ResourceRegistry();
         resourceTypes.forEach(registry::register);
-        CopycatClient client = CopycatClient.builder(members)
+        CopycatClient copycatClient = CopycatClient.builder()
                 .withServerSelectionStrategy(ServerSelectionStrategies.ANY)
                 .withConnectionStrategy(ConnectionStrategies.FIBONACCI_BACKOFF)
                 .withRecoveryStrategy(RecoveryStrategies.RECOVER)
-                .withRetryStrategy(RetryStrategies.FIBONACCI_BACKOFF)
                 .withTransport(transport)
                 .withSerializer(serializer)
-                .withThreadFactory(new CatalystThreadFactory(String.format("copycat-client-%s", partition.getId())))
                 .build();
-        client.serializer().resolve(new ResourceManagerTypeResolver());
+        copycatClient.serializer().resolve(new ResourceManagerTypeResolver());
         for (ResourceType type : registry.types()) {
             try {
-                type.factory().newInstance().createSerializableTypeResolver().resolve(client.serializer().registry());
+                type.factory()
+                    .newInstance()
+                    .createSerializableTypeResolver()
+                    .resolve(copycatClient.serializer().registry());
             } catch (InstantiationException | IllegalAccessException e) {
                 throw new ResourceManagerException(e);
             }
         }
-        return client;
+        return new ResourceClient(new QueryRetryingCopycatClient(copycatClient, 2, 100));
     }
 }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
index 94ede8e..f0c0609 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
@@ -22,7 +22,7 @@
 import io.atomix.copycat.server.CopycatServer;
 import io.atomix.copycat.server.storage.Storage;
 import io.atomix.copycat.server.storage.StorageLevel;
-import io.atomix.manager.state.ResourceManagerState;
+import io.atomix.manager.internal.ResourceManagerState;
 import io.atomix.manager.util.ResourceManagerTypeResolver;
 
 import java.io.File;
@@ -68,9 +68,9 @@
                 return CompletableFuture.completedFuture(null);
             }
             synchronized (this) {
-                server = buildServer(partition.getMemberAddresses());
+                server = buildServer();
             }
-            serverOpenFuture = server.start();
+            serverOpenFuture = server.bootstrap(partition.getMemberAddresses());
         } else {
             serverOpenFuture = CompletableFuture.completedFuture(null);
         }
@@ -85,11 +85,7 @@
 
     @Override
     public CompletableFuture<Void> close() {
-        /**
-         * CopycatServer#kill just shuts down the server and does not result
-         * in any cluster membership changes.
-         */
-        return server.kill();
+        return server.shutdown();
     }
 
     /**
@@ -97,11 +93,11 @@
      * @return future that is completed when the operation is complete
      */
     public CompletableFuture<Void> closeAndExit() {
-        return server.stop();
+        return server.leave();
     }
 
-    private CopycatServer buildServer(Collection<Address> clusterMembers) {
-        CopycatServer server = CopycatServer.builder(localAddress, clusterMembers)
+    private CopycatServer buildServer() {
+        CopycatServer server = CopycatServer.builder(localAddress)
                 .withName("partition-" + partition.getId())
                 .withSerializer(serializer.clone())
                 .withTransport(transport.get())
@@ -118,9 +114,8 @@
     }
 
     public CompletableFuture<Void> join(Collection<Address> otherMembers) {
-        server = buildServer(otherMembers);
-
-        return server.start().whenComplete((r, e) -> {
+        server = buildServer();
+        return server.join(otherMembers).whenComplete((r, e) -> {
             if (e == null) {
                 log.info("Successfully joined partition {}", partition.getId());
             } else {
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AsyncConsistentMultimapCommands.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AsyncConsistentMultimapCommands.java
index 6838ab3..251a7ca 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AsyncConsistentMultimapCommands.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AsyncConsistentMultimapCommands.java
@@ -48,10 +48,6 @@
     @SuppressWarnings("serial")
     public abstract static class MultimapCommand<V> implements Command<V>,
             CatalystSerializable {
-        @Override
-        public ConsistencyLevel consistency() {
-            return ConsistencyLevel.SEQUENTIAL;
-        }
 
         @Override
         public String toString() {
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AsyncConsistentSetMultimap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AsyncConsistentSetMultimap.java
index 3d735be..45b4d56 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AsyncConsistentSetMultimap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AsyncConsistentSetMultimap.java
@@ -70,95 +70,90 @@
 
     @Override
     public CompletableFuture<Integer> size() {
-        return submit(new Size());
+        return client.submit(new Size());
     }
 
     @Override
     public CompletableFuture<Boolean> isEmpty() {
-        return submit(new IsEmpty());
+        return client.submit(new IsEmpty());
     }
 
     @Override
     public CompletableFuture<Boolean> containsKey(String key) {
-        return submit(new ContainsKey(key));
+        return client.submit(new ContainsKey(key));
     }
 
     @Override
     public CompletableFuture<Boolean> containsValue(byte[] value) {
-        return submit(new ContainsValue(value));
+        return client.submit(new ContainsValue(value));
     }
 
     @Override
     public CompletableFuture<Boolean> containsEntry(String key, byte[] value) {
-        return submit(new ContainsEntry(key, value));
+        return client.submit(new ContainsEntry(key, value));
     }
 
     @Override
     public CompletableFuture<Boolean> put(String key, byte[] value) {
-        return submit(new Put(key, Lists.newArrayList(value), null));
+        return client.submit(new Put(key, Lists.newArrayList(value), null));
     }
 
     @Override
     public CompletableFuture<Boolean> remove(String key, byte[] value) {
-        return submit(new MultiRemove(key,
-                                      Lists.newArrayList(value),
-                                      null));
+        return client.submit(new MultiRemove(key,
+                                             Lists.newArrayList(value),
+                                             null));
     }
 
     @Override
-    public CompletableFuture<Boolean> removeAll(
-            String key, Collection<? extends byte[]> values) {
-        return submit(new MultiRemove(key, (Collection<byte[]>) values, null));
+    public CompletableFuture<Boolean> removeAll(String key, Collection<? extends byte[]> values) {
+        return client.submit(new MultiRemove(key, (Collection<byte[]>) values, null));
     }
 
     @Override
-    public CompletableFuture<
-            Versioned<Collection<? extends byte[]>>> removeAll(String key) {
-        return submit(new RemoveAll(key, null));
+    public CompletableFuture<Versioned<Collection<? extends byte[]>>> removeAll(String key) {
+        return client.submit(new RemoveAll(key, null));
     }
 
     @Override
     public CompletableFuture<Boolean> putAll(
             String key, Collection<? extends byte[]> values) {
-        return submit(new Put(key, values, null));
+        return client.submit(new Put(key, values, null));
     }
 
     @Override
-    public CompletableFuture<
-            Versioned<Collection<? extends byte[]>>> replaceValues(
+    public CompletableFuture<Versioned<Collection<? extends byte[]>>> replaceValues(
             String key, Collection<byte[]> values) {
-        return submit(new Replace(key, values, null));
+        return client.submit(new Replace(key, values, null));
     }
 
     @Override
     public CompletableFuture<Void> clear() {
-        return submit(new Clear());
+        return client.submit(new Clear());
     }
 
     @Override
-    public CompletableFuture<
-            Versioned<Collection<? extends byte[]>>> get(String key) {
-        return submit(new Get(key));
+    public CompletableFuture<Versioned<Collection<? extends byte[]>>> get(String key) {
+        return client.submit(new Get(key));
     }
 
     @Override
     public CompletableFuture<Set<String>> keySet() {
-        return submit(new KeySet());
+        return client.submit(new KeySet());
     }
 
     @Override
     public CompletableFuture<Multiset<String>> keys() {
-        return submit(new Keys());
+        return client.submit(new Keys());
     }
 
-    @Override
     public CompletableFuture<Multiset<byte[]>> values() {
-        return submit(new Values());
+        return client.submit(new Values());
     }
 
     @Override
     public CompletableFuture<Collection<Map.Entry<String, byte[]>>> entries() {
-        return submit(new Entries());
+        return client.submit(new Entries());
     }
 
     @Override
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
index 9bad652..f1a44de 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
@@ -97,48 +97,48 @@
 
     @Override
     public CompletableFuture<Boolean> isEmpty() {
-        return submit(new IsEmpty());
+        return client.submit(new IsEmpty());
     }
 
     @Override
     public CompletableFuture<Integer> size() {
-        return submit(new Size());
+        return client.submit(new Size());
     }
 
     @Override
     public CompletableFuture<Boolean> containsKey(String key) {
-        return submit(new ContainsKey(key));
+        return client.submit(new ContainsKey(key));
     }
 
     @Override
     public CompletableFuture<Boolean> containsValue(byte[] value) {
-        return submit(new ContainsValue(value));
+        return client.submit(new ContainsValue(value));
     }
 
     @Override
     public CompletableFuture<Versioned<byte[]>> get(String key) {
-        return submit(new Get(key));
+        return client.submit(new Get(key));
     }
 
     @Override
     public CompletableFuture<Set<String>> keySet() {
-        return submit(new KeySet());
+        return client.submit(new KeySet());
     }
 
     @Override
     public CompletableFuture<Collection<Versioned<byte[]>>> values() {
-        return submit(new Values());
+        return client.submit(new Values());
     }
 
     @Override
     public CompletableFuture<Set<Entry<String, Versioned<byte[]>>>> entrySet() {
-        return submit(new EntrySet());
+        return client.submit(new EntrySet());
     }
 
     @Override
     @SuppressWarnings("unchecked")
     public CompletableFuture<Versioned<byte[]>> put(String key, byte[] value) {
-        return submit(new UpdateAndGet(key, value, Match.ANY, Match.ANY))
+        return client.submit(new UpdateAndGet(key, value, Match.ANY, Match.ANY))
                 .whenComplete((r, e) -> throwIfLocked(r.status()))
                 .thenApply(v -> v.oldValue());
     }
@@ -146,7 +146,7 @@
     @Override
     @SuppressWarnings("unchecked")
     public CompletableFuture<Versioned<byte[]>> putAndGet(String key, byte[] value) {
-        return submit(new UpdateAndGet(key, value, Match.ANY, Match.ANY))
+        return client.submit(new UpdateAndGet(key, value, Match.ANY, Match.ANY))
                 .whenComplete((r, e) -> throwIfLocked(r.status()))
                 .thenApply(v -> v.newValue());
     }
@@ -154,14 +154,14 @@
     @Override
     @SuppressWarnings("unchecked")
     public CompletableFuture<Versioned<byte[]>> putIfAbsent(String key, byte[] value) {
-        return submit(new UpdateAndGet(key, value, Match.NULL, Match.ANY))
+        return client.submit(new UpdateAndGet(key, value, Match.NULL, Match.ANY))
                 .whenComplete((r, e) -> throwIfLocked(r.status()))
                 .thenApply(v -> v.oldValue());
     }
     @Override
     @SuppressWarnings("unchecked")
     public CompletableFuture<Versioned<byte[]>> remove(String key) {
-        return submit(new UpdateAndGet(key, null, Match.ANY, Match.ANY))
+        return client.submit(new UpdateAndGet(key, null, Match.ANY, Match.ANY))
                 .whenComplete((r, e) -> throwIfLocked(r.status()))
                 .thenApply(v -> v.oldValue());
     }
@@ -169,7 +169,7 @@
     @Override
     @SuppressWarnings("unchecked")
     public CompletableFuture<Boolean> remove(String key, byte[] value) {
-        return submit(new UpdateAndGet(key, null, Match.ifValue(value), Match.ANY))
+        return client.submit(new UpdateAndGet(key, null, Match.ifValue(value), Match.ANY))
                 .whenComplete((r, e) -> throwIfLocked(r.status()))
                 .thenApply(v -> v.updated());
     }
@@ -177,7 +177,7 @@
     @Override
     @SuppressWarnings("unchecked")
     public CompletableFuture<Boolean> remove(String key, long version) {
-        return submit(new UpdateAndGet(key, null, Match.ANY, Match.ifValue(version)))
+        return client.submit(new UpdateAndGet(key, null, Match.ANY, Match.ifValue(version)))
                 .whenComplete((r, e) -> throwIfLocked(r.status()))
                 .thenApply(v -> v.updated());
     }
@@ -185,7 +185,7 @@
     @Override
     @SuppressWarnings("unchecked")
     public CompletableFuture<Versioned<byte[]>> replace(String key, byte[] value) {
-        return submit(new UpdateAndGet(key, value, Match.NOT_NULL, Match.ANY))
+        return client.submit(new UpdateAndGet(key, value, Match.NOT_NULL, Match.ANY))
                 .whenComplete((r, e) -> throwIfLocked(r.status()))
                 .thenApply(v -> v.oldValue());
     }
@@ -193,10 +193,7 @@
     @Override
     @SuppressWarnings("unchecked")
     public CompletableFuture<Boolean> replace(String key, byte[] oldValue, byte[] newValue) {
-        return submit(new UpdateAndGet(key,
-                newValue,
-                Match.ifValue(oldValue),
-                Match.ANY))
+        return client.submit(new UpdateAndGet(key, newValue, Match.ifValue(oldValue), Match.ANY))
                 .whenComplete((r, e) -> throwIfLocked(r.status()))
                 .thenApply(v -> v.updated());
     }
@@ -204,17 +201,14 @@
     @Override
     @SuppressWarnings("unchecked")
     public CompletableFuture<Boolean> replace(String key, long oldVersion, byte[] newValue) {
-        return submit(new UpdateAndGet(key,
-                newValue,
-                Match.ANY,
-                Match.ifValue(oldVersion)))
+        return client.submit(new UpdateAndGet(key, newValue, Match.ANY, Match.ifValue(oldVersion)))
                 .whenComplete((r, e) -> throwIfLocked(r.status()))
                 .thenApply(v -> v.updated());
     }
 
     @Override
     public CompletableFuture<Void> clear() {
-        return submit(new Clear())
+        return client.submit(new Clear())
                 .whenComplete((r, e) -> throwIfLocked(r))
                 .thenApply(v -> null);
     }
@@ -245,7 +239,7 @@
             }
             Match<byte[]> valueMatch = r1 == null ? Match.NULL : Match.ANY;
             Match<Long> versionMatch = r1 == null ? Match.ANY : Match.ifValue(r1.version());
-            return submit(new UpdateAndGet(key,
+            return client.submit(new UpdateAndGet(key,
                     computedValue.get(),
                     valueMatch,
                     versionMatch))
@@ -258,7 +252,7 @@
     public synchronized CompletableFuture<Void> addListener(MapEventListener<String, byte[]> listener,
                                                             Executor executor) {
         if (mapEventListeners.isEmpty()) {
-            return submit(new Listen()).thenRun(() -> mapEventListeners.putIfAbsent(listener, executor));
+            return client.submit(new Listen()).thenRun(() -> mapEventListeners.putIfAbsent(listener, executor));
         } else {
             mapEventListeners.put(listener, executor);
             return CompletableFuture.completedFuture(null);
@@ -268,7 +262,7 @@
     @Override
     public synchronized CompletableFuture<Void> removeListener(MapEventListener<String, byte[]> listener) {
         if (mapEventListeners.remove(listener) != null && mapEventListeners.isEmpty()) {
-            return submit(new Unlisten()).thenApply(v -> null);
+            return client.submit(new Unlisten()).thenApply(v -> null);
         }
         return CompletableFuture.completedFuture(null);
     }
@@ -281,23 +275,23 @@
 
     @Override
     public CompletableFuture<Boolean> prepare(MapTransaction<String, byte[]> transaction) {
-        return submit(new TransactionPrepare(transaction)).thenApply(v -> v == PrepareResult.OK);
+        return client.submit(new TransactionPrepare(transaction)).thenApply(v -> v == PrepareResult.OK);
     }
 
     @Override
     public CompletableFuture<Void> commit(TransactionId transactionId) {
-        return submit(new TransactionCommit(transactionId)).thenApply(v -> null);
+        return client.submit(new TransactionCommit(transactionId)).thenApply(v -> null);
     }
 
     @Override
     public CompletableFuture<Void> rollback(TransactionId transactionId) {
-        return submit(new TransactionRollback(transactionId))
+        return client.submit(new TransactionRollback(transactionId))
                 .thenApply(v -> null);
     }
 
     @Override
     public CompletableFuture<Boolean> prepareAndCommit(MapTransaction<String, byte[]> transaction) {
-        return submit(new TransactionPrepareAndCommit(transaction)).thenApply(v -> v == PrepareResult.OK);
+        return client.submit(new TransactionPrepareAndCommit(transaction)).thenApply(v -> v == PrepareResult.OK);
     }
 
     @Override
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapCommands.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapCommands.java
index ae75453..7e7b979 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapCommands.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapCommands.java
@@ -51,11 +51,6 @@
     public abstract static class MapCommand<V> implements Command<V>, CatalystSerializable {
 
         @Override
-        public ConsistencyLevel consistency() {
-          return ConsistencyLevel.SEQUENTIAL;
-        }
-
-        @Override
         public String toString() {
             return MoreObjects.toStringHelper(getClass())
                     .toString();
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java
index f4a4252..6baa835 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java
@@ -42,6 +42,9 @@
 import org.onosproject.store.service.AsyncLeaderElector;
 
 import com.google.common.collect.ImmutableSet;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
 import com.google.common.collect.Sets;
 
 /**
@@ -54,11 +57,34 @@
             Sets.newCopyOnWriteArraySet();
     private final Set<Consumer<Change<Leadership>>> leadershipChangeListeners =
             Sets.newCopyOnWriteArraySet();
+    private final Consumer<Change<Leadership>> cacheUpdater;
+    private final Consumer<Status> statusListener;
 
     public static final String CHANGE_SUBJECT = "leadershipChangeEvents";
+    private final LoadingCache<String, CompletableFuture<Leadership>> cache;
 
     public AtomixLeaderElector(CopycatClient client, Properties properties) {
         super(client, properties);
+        cache = CacheBuilder.newBuilder()
+                .maximumSize(1000)
+                .build(CacheLoader.from(topic -> this.client.submit(new GetLeadership(topic))));
+
+        cacheUpdater = change -> {
+            Leadership leadership = change.newValue();
+            cache.put(leadership.topic(), CompletableFuture.completedFuture(leadership));
+        };
+        statusListener = status -> {
+            if (status == Status.SUSPENDED || status == Status.INACTIVE) {
+                cache.invalidateAll();
+            }
+        };
+        addStatusChangeListener(statusListener);
+    }
+
+    @Override
+    public CompletableFuture<Void> destroy() {
+        removeStatusChangeListener(statusListener);
+        return removeChangeListener(cacheUpdater);
     }
 
     @Override
@@ -74,53 +100,57 @@
         });
     }
 
+    public CompletableFuture<AtomixLeaderElector> setupCache() {
+        return addChangeListener(cacheUpdater).thenApply(v -> this);
+    }
+
     private void handleEvent(List<Change<Leadership>> changes) {
         changes.forEach(change -> leadershipChangeListeners.forEach(l -> l.accept(change)));
     }
 
     @Override
     public CompletableFuture<Leadership> run(String topic, NodeId nodeId) {
-        return submit(new Run(topic, nodeId));
+        return client.submit(new Run(topic, nodeId)).whenComplete((r, e) -> cache.invalidate(topic));
     }
 
     @Override
     public CompletableFuture<Void> withdraw(String topic) {
-        return submit(new Withdraw(topic));
+        return client.submit(new Withdraw(topic)).whenComplete((r, e) -> cache.invalidate(topic));
     }
 
     @Override
     public CompletableFuture<Boolean> anoint(String topic, NodeId nodeId) {
-        return submit(new Anoint(topic, nodeId));
+        return client.submit(new Anoint(topic, nodeId)).whenComplete((r, e) -> cache.invalidate(topic));
     }
 
     @Override
     public CompletableFuture<Boolean> promote(String topic, NodeId nodeId) {
-        return submit(new Promote(topic, nodeId));
+        return client.submit(new Promote(topic, nodeId)).whenComplete((r, e) -> cache.invalidate(topic));
     }
 
     @Override
     public CompletableFuture<Void> evict(NodeId nodeId) {
-        return submit(new AtomixLeaderElectorCommands.Evict(nodeId));
+        return client.submit(new AtomixLeaderElectorCommands.Evict(nodeId));
     }
 
     @Override
     public CompletableFuture<Leadership> getLeadership(String topic) {
-        return submit(new GetLeadership(topic));
+        return cache.getUnchecked(topic).whenComplete((r, e) -> { if (e != null) { cache.invalidate(topic); } });
     }
 
     @Override
     public CompletableFuture<Map<String, Leadership>> getLeaderships() {
-        return submit(new GetAllLeaderships());
+        return client.submit(new GetAllLeaderships());
     }
 
     public CompletableFuture<Set<String>> getElectedTopics(NodeId nodeId) {
-        return submit(new GetElectedTopics(nodeId));
+        return client.submit(new GetElectedTopics(nodeId));
     }
 
     @Override
     public synchronized CompletableFuture<Void> addChangeListener(Consumer<Change<Leadership>> consumer) {
         if (leadershipChangeListeners.isEmpty()) {
-            return submit(new Listen()).thenRun(() -> leadershipChangeListeners.add(consumer));
+            return client.submit(new Listen()).thenRun(() -> leadershipChangeListeners.add(consumer));
         } else {
             leadershipChangeListeners.add(consumer);
             return CompletableFuture.completedFuture(null);
@@ -130,7 +160,7 @@
     @Override
     public synchronized CompletableFuture<Void> removeChangeListener(Consumer<Change<Leadership>> consumer) {
         if (leadershipChangeListeners.remove(consumer) && leadershipChangeListeners.isEmpty()) {
-            return submit(new Unlisten()).thenApply(v -> null);
+            return client.submit(new Unlisten()).thenApply(v -> null);
         }
         return CompletableFuture.completedFuture(null);
     }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorCommands.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorCommands.java
index 620d7f2..7ee481c 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorCommands.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorCommands.java
@@ -50,11 +50,6 @@
     public abstract static class ElectionQuery<V> implements Query<V>, CatalystSerializable {
 
         @Override
-        public ConsistencyLevel consistency() {
-            return ConsistencyLevel.BOUNDED_LINEARIZABLE;
-        }
-
-        @Override
         public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
         }
 
@@ -103,11 +98,6 @@
     public abstract static class ElectionCommand<V> implements Command<V>, CatalystSerializable {
 
         @Override
-        public ConsistencyLevel consistency() {
-            return ConsistencyLevel.LINEARIZABLE;
-        }
-
-        @Override
         public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
         }
 
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorState.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorState.java
index df04e6d..894cc79 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorState.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorState.java
@@ -302,8 +302,10 @@
      * @return topic to leader mapping
      */
     public Map<String, Leadership> allLeaderships(Commit<? extends GetAllLeaderships> commit) {
+        Map<String, Leadership> result = new HashMap<>();
         try {
-            return Maps.transformEntries(elections, (k, v) -> leadership(k));
+            result.putAll(Maps.transformEntries(elections, (k, v) -> leadership(k)));
+            return result;
         } finally {
             commit.close();
         }
@@ -539,7 +541,7 @@
         byte[] encodedElections  = serializer.encode(elections);
         writer.writeInt(encodedElections.length);
         writer.write(encodedElections);
-        log.info("Took state machine snapshot");
+        log.debug("Took state machine snapshot");
     }
 
     @Override
@@ -552,7 +554,7 @@
         byte[] encodedElections = new byte[encodedElectionsSize];
         reader.read(encodedElections);
         elections = serializer.decode(encodedElections);
-        log.info("Reinstated state machine from snapshot");
+        log.debug("Reinstated state machine from snapshot");
     }
 
     private AtomicLong termCounter(String topic) {