Moved client availability check to copycat client

Change-Id: I411eb74c5d39985d85c5feda976a250e77b88ff5
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/QueryRetryingCopycatClient.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/OnosCopycatClient.java
similarity index 86%
rename from core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/QueryRetryingCopycatClient.java
rename to core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/OnosCopycatClient.java
index 39ed6c3..a170afb 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/QueryRetryingCopycatClient.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/OnosCopycatClient.java
@@ -26,6 +26,8 @@
 import java.util.concurrent.TimeoutException;
 import java.util.function.Predicate;
 
+import org.onlab.util.Tools;
+import org.onosproject.store.service.StorageException;
 import org.slf4j.Logger;
 
 import com.google.common.base.Throwables;
@@ -38,9 +40,9 @@
 import io.atomix.copycat.session.ClosedSessionException;
 
 /**
- * {@code CopycatClient} that can retry when certain recoverable errors are encoutered.
+ * Custom {@code CopycatClient} for injecting additional logic that runs before/after operation submission.
  */
-public class QueryRetryingCopycatClient extends DelegatingCopycatClient {
+public class OnosCopycatClient extends DelegatingCopycatClient {
 
     private final int maxRetries;
     private final long delayBetweenRetriesMillis;
@@ -55,7 +57,7 @@
             || e instanceof UnknownSessionException
             || e instanceof ClosedSessionException;
 
-    QueryRetryingCopycatClient(CopycatClient client, int maxRetries, long delayBetweenRetriesMillis) {
+    OnosCopycatClient(CopycatClient client, int maxRetries, long delayBetweenRetriesMillis) {
         super(client);
         this.maxRetries = maxRetries;
         this.delayBetweenRetriesMillis = delayBetweenRetriesMillis;
@@ -70,6 +72,9 @@
 
     @Override
     public <T> CompletableFuture<T> submit(Query<T> query) {
+        if (state() == State.SUSPENDED || state() == State.CLOSED) {
+            return Tools.exceptionalFuture(new StorageException.Unavailable());
+        }
         CompletableFuture<T> future = new CompletableFuture<>();
         executor.submit(() -> submit(query, 1, future));
         return future;
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
index 1495524..92170d4 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
@@ -51,7 +51,6 @@
 import org.onosproject.store.service.DistributedQueue;
 import org.onosproject.store.service.PartitionClientInfo;
 import org.onosproject.store.service.Serializer;
-import org.onosproject.store.service.StorageException;
 import org.slf4j.Logger;
 
 import com.google.common.base.Supplier;
@@ -120,7 +119,6 @@
 
     @Override
     public <K, V> AsyncConsistentMap<K, V> newAsyncConsistentMap(String name, Serializer serializer) {
-        checkAvailability();
         AtomixConsistentMap atomixConsistentMap = client.getResource(name, AtomixConsistentMap.class).join();
         Consumer<State> statusListener = state -> {
             atomixConsistentMap.statusChangeListeners()
@@ -145,13 +143,11 @@
 
     @Override
     public <E> AsyncDistributedSet<E> newAsyncDistributedSet(String name, Serializer serializer) {
-        checkAvailability();
         return DistributedPrimitives.newSetFromMap(this.<E, Boolean>newAsyncConsistentMap(name, serializer));
     }
 
     @Override
     public AsyncAtomicCounter newAsyncCounter(String name) {
-        checkAvailability();
         DistributedLong distributedLong = client.getLong(name).join();
         return new AtomixCounter(name, distributedLong);
     }
@@ -169,7 +165,6 @@
 
     @Override
     public AsyncLeaderElector newAsyncLeaderElector(String name) {
-        checkAvailability();
         AtomixLeaderElector leaderElector = client.getResource(name, AtomixLeaderElector.class)
                                                   .thenCompose(AtomixLeaderElector::setupCache)
                                                   .join();
@@ -183,13 +178,11 @@
 
     @Override
     public Set<String> getAsyncConsistentMapNames() {
-        checkAvailability();
         return client.keys(AtomixConsistentMap.class).join();
     }
 
     @Override
     public Set<String> getAsyncAtomicCounterNames() {
-        checkAvailability();
         return client.keys(DistributedLong.class).join();
     }
 
@@ -232,12 +225,6 @@
                 throw new ResourceManagerException(e);
             }
         }
-        return new ResourceClient(new QueryRetryingCopycatClient(copycatClient, 2, 100));
-    }
-
-    private void checkAvailability() {
-        if (resourceClient.client().state() == State.SUSPENDED || resourceClient.client().state() == State.CLOSED) {
-            throw new StorageException.Unavailable();
-        }
+        return new ResourceClient(new OnosCopycatClient(copycatClient, 2, 100));
     }
 }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
index 23d2f86..f1a44de 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
@@ -15,7 +15,6 @@
  */
 package org.onosproject.store.primitives.resources.impl;
 
-import io.atomix.copycat.Operation;
 import io.atomix.copycat.client.CopycatClient;
 import io.atomix.resource.AbstractResource;
 import io.atomix.resource.ResourceTypeInfo;
@@ -35,7 +34,6 @@
 import java.util.function.Predicate;
 
 import org.onlab.util.Match;
-import org.onlab.util.Tools;
 import org.onosproject.store.primitives.TransactionId;
 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Clear;
 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.ContainsKey;
@@ -57,7 +55,6 @@
 import org.onosproject.store.service.MapEvent;
 import org.onosproject.store.service.MapEventListener;
 import org.onosproject.store.service.MapTransaction;
-import org.onosproject.store.service.StorageException;
 import org.onosproject.store.service.Versioned;
 
 import com.google.common.collect.ImmutableSet;
@@ -100,48 +97,48 @@
 
     @Override
     public CompletableFuture<Boolean> isEmpty() {
-        return submit(new IsEmpty());
+        return client.submit(new IsEmpty());
     }
 
     @Override
     public CompletableFuture<Integer> size() {
-        return submit(new Size());
+        return client.submit(new Size());
     }
 
     @Override
     public CompletableFuture<Boolean> containsKey(String key) {
-        return submit(new ContainsKey(key));
+        return client.submit(new ContainsKey(key));
     }
 
     @Override
     public CompletableFuture<Boolean> containsValue(byte[] value) {
-        return submit(new ContainsValue(value));
+        return client.submit(new ContainsValue(value));
     }
 
     @Override
     public CompletableFuture<Versioned<byte[]>> get(String key) {
-        return submit(new Get(key));
+        return client.submit(new Get(key));
     }
 
     @Override
     public CompletableFuture<Set<String>> keySet() {
-        return submit(new KeySet());
+        return client.submit(new KeySet());
     }
 
     @Override
     public CompletableFuture<Collection<Versioned<byte[]>>> values() {
-        return submit(new Values());
+        return client.submit(new Values());
     }
 
     @Override
     public CompletableFuture<Set<Entry<String, Versioned<byte[]>>>> entrySet() {
-        return submit(new EntrySet());
+        return client.submit(new EntrySet());
     }
 
     @Override
     @SuppressWarnings("unchecked")
     public CompletableFuture<Versioned<byte[]>> put(String key, byte[] value) {
-        return submit(new UpdateAndGet(key, value, Match.ANY, Match.ANY))
+        return client.submit(new UpdateAndGet(key, value, Match.ANY, Match.ANY))
                 .whenComplete((r, e) -> throwIfLocked(r.status()))
                 .thenApply(v -> v.oldValue());
     }
@@ -149,7 +146,7 @@
     @Override
     @SuppressWarnings("unchecked")
     public CompletableFuture<Versioned<byte[]>> putAndGet(String key, byte[] value) {
-        return submit(new UpdateAndGet(key, value, Match.ANY, Match.ANY))
+        return client.submit(new UpdateAndGet(key, value, Match.ANY, Match.ANY))
                 .whenComplete((r, e) -> throwIfLocked(r.status()))
                 .thenApply(v -> v.newValue());
     }
@@ -157,14 +154,14 @@
     @Override
     @SuppressWarnings("unchecked")
     public CompletableFuture<Versioned<byte[]>> putIfAbsent(String key, byte[] value) {
-        return submit(new UpdateAndGet(key, value, Match.NULL, Match.ANY))
+        return client.submit(new UpdateAndGet(key, value, Match.NULL, Match.ANY))
                 .whenComplete((r, e) -> throwIfLocked(r.status()))
                 .thenApply(v -> v.oldValue());
     }
     @Override
     @SuppressWarnings("unchecked")
     public CompletableFuture<Versioned<byte[]>> remove(String key) {
-        return submit(new UpdateAndGet(key, null, Match.ANY, Match.ANY))
+        return client.submit(new UpdateAndGet(key, null, Match.ANY, Match.ANY))
                 .whenComplete((r, e) -> throwIfLocked(r.status()))
                 .thenApply(v -> v.oldValue());
     }
@@ -172,7 +169,7 @@
     @Override
     @SuppressWarnings("unchecked")
     public CompletableFuture<Boolean> remove(String key, byte[] value) {
-        return submit(new UpdateAndGet(key, null, Match.ifValue(value), Match.ANY))
+        return client.submit(new UpdateAndGet(key, null, Match.ifValue(value), Match.ANY))
                 .whenComplete((r, e) -> throwIfLocked(r.status()))
                 .thenApply(v -> v.updated());
     }
@@ -180,7 +177,7 @@
     @Override
     @SuppressWarnings("unchecked")
     public CompletableFuture<Boolean> remove(String key, long version) {
-        return submit(new UpdateAndGet(key, null, Match.ANY, Match.ifValue(version)))
+        return client.submit(new UpdateAndGet(key, null, Match.ANY, Match.ifValue(version)))
                 .whenComplete((r, e) -> throwIfLocked(r.status()))
                 .thenApply(v -> v.updated());
     }
@@ -188,7 +185,7 @@
     @Override
     @SuppressWarnings("unchecked")
     public CompletableFuture<Versioned<byte[]>> replace(String key, byte[] value) {
-        return submit(new UpdateAndGet(key, value, Match.NOT_NULL, Match.ANY))
+        return client.submit(new UpdateAndGet(key, value, Match.NOT_NULL, Match.ANY))
                 .whenComplete((r, e) -> throwIfLocked(r.status()))
                 .thenApply(v -> v.oldValue());
     }
@@ -196,7 +193,7 @@
     @Override
     @SuppressWarnings("unchecked")
     public CompletableFuture<Boolean> replace(String key, byte[] oldValue, byte[] newValue) {
-        return submit(new UpdateAndGet(key, newValue, Match.ifValue(oldValue), Match.ANY))
+        return client.submit(new UpdateAndGet(key, newValue, Match.ifValue(oldValue), Match.ANY))
                 .whenComplete((r, e) -> throwIfLocked(r.status()))
                 .thenApply(v -> v.updated());
     }
@@ -204,14 +201,14 @@
     @Override
     @SuppressWarnings("unchecked")
     public CompletableFuture<Boolean> replace(String key, long oldVersion, byte[] newValue) {
-        return submit(new UpdateAndGet(key, newValue, Match.ANY, Match.ifValue(oldVersion)))
+        return client.submit(new UpdateAndGet(key, newValue, Match.ANY, Match.ifValue(oldVersion)))
                 .whenComplete((r, e) -> throwIfLocked(r.status()))
                 .thenApply(v -> v.updated());
     }
 
     @Override
     public CompletableFuture<Void> clear() {
-        return submit(new Clear())
+        return client.submit(new Clear())
                 .whenComplete((r, e) -> throwIfLocked(r))
                 .thenApply(v -> null);
     }
@@ -242,7 +239,7 @@
             }
             Match<byte[]> valueMatch = r1 == null ? Match.NULL : Match.ANY;
             Match<Long> versionMatch = r1 == null ? Match.ANY : Match.ifValue(r1.version());
-            return submit(new UpdateAndGet(key,
+            return client.submit(new UpdateAndGet(key,
                     computedValue.get(),
                     valueMatch,
                     versionMatch))
@@ -255,7 +252,7 @@
     public synchronized CompletableFuture<Void> addListener(MapEventListener<String, byte[]> listener,
                                                             Executor executor) {
         if (mapEventListeners.isEmpty()) {
-            return submit(new Listen()).thenRun(() -> mapEventListeners.putIfAbsent(listener, executor));
+            return client.submit(new Listen()).thenRun(() -> mapEventListeners.putIfAbsent(listener, executor));
         } else {
             mapEventListeners.put(listener, executor);
             return CompletableFuture.completedFuture(null);
@@ -265,7 +262,7 @@
     @Override
     public synchronized CompletableFuture<Void> removeListener(MapEventListener<String, byte[]> listener) {
         if (mapEventListeners.remove(listener) != null && mapEventListeners.isEmpty()) {
-            return submit(new Unlisten()).thenApply(v -> null);
+            return client.submit(new Unlisten()).thenApply(v -> null);
         }
         return CompletableFuture.completedFuture(null);
     }
@@ -278,23 +275,23 @@
 
     @Override
     public CompletableFuture<Boolean> prepare(MapTransaction<String, byte[]> transaction) {
-        return submit(new TransactionPrepare(transaction)).thenApply(v -> v == PrepareResult.OK);
+        return client.submit(new TransactionPrepare(transaction)).thenApply(v -> v == PrepareResult.OK);
     }
 
     @Override
     public CompletableFuture<Void> commit(TransactionId transactionId) {
-        return submit(new TransactionCommit(transactionId)).thenApply(v -> null);
+        return client.submit(new TransactionCommit(transactionId)).thenApply(v -> null);
     }
 
     @Override
     public CompletableFuture<Void> rollback(TransactionId transactionId) {
-        return submit(new TransactionRollback(transactionId))
+        return client.submit(new TransactionRollback(transactionId))
                 .thenApply(v -> null);
     }
 
     @Override
     public CompletableFuture<Boolean> prepareAndCommit(MapTransaction<String, byte[]> transaction) {
-        return submit(new TransactionPrepareAndCommit(transaction)).thenApply(v -> v == PrepareResult.OK);
+        return client.submit(new TransactionPrepareAndCommit(transaction)).thenApply(v -> v == PrepareResult.OK);
     }
 
     @Override
@@ -311,11 +308,4 @@
     public Collection<Consumer<Status>> statusChangeListeners() {
         return ImmutableSet.copyOf(statusChangeListeners);
     }
-
-    <T> CompletableFuture<T> submit(Operation<T> command) {
-        if (client.state() == CopycatClient.State.SUSPENDED || client.state() == CopycatClient.State.CLOSED) {
-            return Tools.exceptionalFuture(new StorageException.Unavailable());
-        }
-        return client.submit(command);
-    }
 }
\ No newline at end of file
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java
index 2c2f47d..95f9d17 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java
@@ -15,7 +15,6 @@
  */
 package org.onosproject.store.primitives.resources.impl;
 
-import io.atomix.copycat.Operation;
 import io.atomix.copycat.client.CopycatClient;
 import io.atomix.resource.AbstractResource;
 import io.atomix.resource.ResourceTypeInfo;
@@ -28,7 +27,6 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Consumer;
 
-import org.onlab.util.Tools;
 import org.onosproject.cluster.Leadership;
 import org.onosproject.cluster.NodeId;
 import org.onosproject.event.Change;
@@ -42,8 +40,6 @@
 import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Unlisten;
 import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Withdraw;
 import org.onosproject.store.service.AsyncLeaderElector;
-import org.onosproject.store.service.StorageException;
-
 import com.google.common.collect.ImmutableSet;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
@@ -70,7 +66,7 @@
         super(client, properties);
         cache = CacheBuilder.newBuilder()
                 .maximumSize(1000)
-                .build(CacheLoader.from(topic -> submit(new GetLeadership(topic))));
+                .build(CacheLoader.from(topic -> this.client.submit(new GetLeadership(topic))));
 
         cacheUpdater = change -> {
             Leadership leadership = change.newValue();
@@ -113,27 +109,27 @@
 
     @Override
     public CompletableFuture<Leadership> run(String topic, NodeId nodeId) {
-        return submit(new Run(topic, nodeId)).whenComplete((r, e) -> cache.invalidate(topic));
+        return client.submit(new Run(topic, nodeId)).whenComplete((r, e) -> cache.invalidate(topic));
     }
 
     @Override
     public CompletableFuture<Void> withdraw(String topic) {
-        return submit(new Withdraw(topic)).whenComplete((r, e) -> cache.invalidate(topic));
+        return client.submit(new Withdraw(topic)).whenComplete((r, e) -> cache.invalidate(topic));
     }
 
     @Override
     public CompletableFuture<Boolean> anoint(String topic, NodeId nodeId) {
-        return submit(new Anoint(topic, nodeId)).whenComplete((r, e) -> cache.invalidate(topic));
+        return client.submit(new Anoint(topic, nodeId)).whenComplete((r, e) -> cache.invalidate(topic));
     }
 
     @Override
     public CompletableFuture<Boolean> promote(String topic, NodeId nodeId) {
-        return submit(new Promote(topic, nodeId)).whenComplete((r, e) -> cache.invalidate(topic));
+        return client.submit(new Promote(topic, nodeId)).whenComplete((r, e) -> cache.invalidate(topic));
     }
 
     @Override
     public CompletableFuture<Void> evict(NodeId nodeId) {
-        return submit(new AtomixLeaderElectorCommands.Evict(nodeId));
+        return client.submit(new AtomixLeaderElectorCommands.Evict(nodeId));
     }
 
     @Override
@@ -148,17 +144,17 @@
 
     @Override
     public CompletableFuture<Map<String, Leadership>> getLeaderships() {
-        return submit(new GetAllLeaderships());
+        return client.submit(new GetAllLeaderships());
     }
 
     public CompletableFuture<Set<String>> getElectedTopics(NodeId nodeId) {
-        return submit(new GetElectedTopics(nodeId));
+        return client.submit(new GetElectedTopics(nodeId));
     }
 
     @Override
     public synchronized CompletableFuture<Void> addChangeListener(Consumer<Change<Leadership>> consumer) {
         if (leadershipChangeListeners.isEmpty()) {
-            return submit(new Listen()).thenRun(() -> leadershipChangeListeners.add(consumer));
+            return client.submit(new Listen()).thenRun(() -> leadershipChangeListeners.add(consumer));
         } else {
             leadershipChangeListeners.add(consumer);
             return CompletableFuture.completedFuture(null);
@@ -168,7 +164,7 @@
     @Override
     public synchronized CompletableFuture<Void> removeChangeListener(Consumer<Change<Leadership>> consumer) {
         if (leadershipChangeListeners.remove(consumer) && leadershipChangeListeners.isEmpty()) {
-            return submit(new Unlisten()).thenApply(v -> null);
+            return client.submit(new Unlisten()).thenApply(v -> null);
         }
         return CompletableFuture.completedFuture(null);
     }
@@ -187,11 +183,4 @@
     public Collection<Consumer<Status>> statusChangeListeners() {
         return ImmutableSet.copyOf(statusChangeListeners);
     }
-
-    <T> CompletableFuture<T> submit(Operation<T> command) {
-        if (client.state() == CopycatClient.State.SUSPENDED || client.state() == CopycatClient.State.CLOSED) {
-            return Tools.exceptionalFuture(new StorageException.Unavailable());
-        }
-        return client.submit(command);
-    }
 }