Fail fast all operations when the Copycat client is in the SUSPENDED or CLOSED state

Change-Id: I821ca0a488e68d004b4e41b6d8ac28368f09ffcb
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
index f1a44de..23d2f86 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
@@ -15,6 +15,7 @@
  */
 package org.onosproject.store.primitives.resources.impl;
 
+import io.atomix.copycat.Operation;
 import io.atomix.copycat.client.CopycatClient;
 import io.atomix.resource.AbstractResource;
 import io.atomix.resource.ResourceTypeInfo;
@@ -34,6 +35,7 @@
 import java.util.function.Predicate;
 
 import org.onlab.util.Match;
+import org.onlab.util.Tools;
 import org.onosproject.store.primitives.TransactionId;
 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Clear;
 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.ContainsKey;
@@ -55,6 +57,7 @@
 import org.onosproject.store.service.MapEvent;
 import org.onosproject.store.service.MapEventListener;
 import org.onosproject.store.service.MapTransaction;
+import org.onosproject.store.service.StorageException;
 import org.onosproject.store.service.Versioned;
 
 import com.google.common.collect.ImmutableSet;
@@ -97,48 +100,48 @@
 
     @Override
     public CompletableFuture<Boolean> isEmpty() {
-        return client.submit(new IsEmpty());
+        return submit(new IsEmpty());
     }
 
     @Override
     public CompletableFuture<Integer> size() {
-        return client.submit(new Size());
+        return submit(new Size());
     }
 
     @Override
     public CompletableFuture<Boolean> containsKey(String key) {
-        return client.submit(new ContainsKey(key));
+        return submit(new ContainsKey(key));
     }
 
     @Override
     public CompletableFuture<Boolean> containsValue(byte[] value) {
-        return client.submit(new ContainsValue(value));
+        return submit(new ContainsValue(value));
     }
 
     @Override
     public CompletableFuture<Versioned<byte[]>> get(String key) {
-        return client.submit(new Get(key));
+        return submit(new Get(key));
     }
 
     @Override
     public CompletableFuture<Set<String>> keySet() {
-        return client.submit(new KeySet());
+        return submit(new KeySet());
     }
 
     @Override
     public CompletableFuture<Collection<Versioned<byte[]>>> values() {
-        return client.submit(new Values());
+        return submit(new Values());
     }
 
     @Override
     public CompletableFuture<Set<Entry<String, Versioned<byte[]>>>> entrySet() {
-        return client.submit(new EntrySet());
+        return submit(new EntrySet());
     }
 
     @Override
     @SuppressWarnings("unchecked")
     public CompletableFuture<Versioned<byte[]>> put(String key, byte[] value) {
-        return client.submit(new UpdateAndGet(key, value, Match.ANY, Match.ANY))
+        return submit(new UpdateAndGet(key, value, Match.ANY, Match.ANY))
                 .whenComplete((r, e) -> throwIfLocked(r.status()))
                 .thenApply(v -> v.oldValue());
     }
@@ -146,7 +149,7 @@
     @Override
     @SuppressWarnings("unchecked")
     public CompletableFuture<Versioned<byte[]>> putAndGet(String key, byte[] value) {
-        return client.submit(new UpdateAndGet(key, value, Match.ANY, Match.ANY))
+        return submit(new UpdateAndGet(key, value, Match.ANY, Match.ANY))
                 .whenComplete((r, e) -> throwIfLocked(r.status()))
                 .thenApply(v -> v.newValue());
     }
@@ -154,14 +157,14 @@
     @Override
     @SuppressWarnings("unchecked")
     public CompletableFuture<Versioned<byte[]>> putIfAbsent(String key, byte[] value) {
-        return client.submit(new UpdateAndGet(key, value, Match.NULL, Match.ANY))
+        return submit(new UpdateAndGet(key, value, Match.NULL, Match.ANY))
                 .whenComplete((r, e) -> throwIfLocked(r.status()))
                 .thenApply(v -> v.oldValue());
     }
     @Override
     @SuppressWarnings("unchecked")
     public CompletableFuture<Versioned<byte[]>> remove(String key) {
-        return client.submit(new UpdateAndGet(key, null, Match.ANY, Match.ANY))
+        return submit(new UpdateAndGet(key, null, Match.ANY, Match.ANY))
                 .whenComplete((r, e) -> throwIfLocked(r.status()))
                 .thenApply(v -> v.oldValue());
     }
@@ -169,7 +172,7 @@
     @Override
     @SuppressWarnings("unchecked")
     public CompletableFuture<Boolean> remove(String key, byte[] value) {
-        return client.submit(new UpdateAndGet(key, null, Match.ifValue(value), Match.ANY))
+        return submit(new UpdateAndGet(key, null, Match.ifValue(value), Match.ANY))
                 .whenComplete((r, e) -> throwIfLocked(r.status()))
                 .thenApply(v -> v.updated());
     }
@@ -177,7 +180,7 @@
     @Override
     @SuppressWarnings("unchecked")
     public CompletableFuture<Boolean> remove(String key, long version) {
-        return client.submit(new UpdateAndGet(key, null, Match.ANY, Match.ifValue(version)))
+        return submit(new UpdateAndGet(key, null, Match.ANY, Match.ifValue(version)))
                 .whenComplete((r, e) -> throwIfLocked(r.status()))
                 .thenApply(v -> v.updated());
     }
@@ -185,7 +188,7 @@
     @Override
     @SuppressWarnings("unchecked")
     public CompletableFuture<Versioned<byte[]>> replace(String key, byte[] value) {
-        return client.submit(new UpdateAndGet(key, value, Match.NOT_NULL, Match.ANY))
+        return submit(new UpdateAndGet(key, value, Match.NOT_NULL, Match.ANY))
                 .whenComplete((r, e) -> throwIfLocked(r.status()))
                 .thenApply(v -> v.oldValue());
     }
@@ -193,7 +196,7 @@
     @Override
     @SuppressWarnings("unchecked")
     public CompletableFuture<Boolean> replace(String key, byte[] oldValue, byte[] newValue) {
-        return client.submit(new UpdateAndGet(key, newValue, Match.ifValue(oldValue), Match.ANY))
+        return submit(new UpdateAndGet(key, newValue, Match.ifValue(oldValue), Match.ANY))
                 .whenComplete((r, e) -> throwIfLocked(r.status()))
                 .thenApply(v -> v.updated());
     }
@@ -201,14 +204,14 @@
     @Override
     @SuppressWarnings("unchecked")
     public CompletableFuture<Boolean> replace(String key, long oldVersion, byte[] newValue) {
-        return client.submit(new UpdateAndGet(key, newValue, Match.ANY, Match.ifValue(oldVersion)))
+        return submit(new UpdateAndGet(key, newValue, Match.ANY, Match.ifValue(oldVersion)))
                 .whenComplete((r, e) -> throwIfLocked(r.status()))
                 .thenApply(v -> v.updated());
     }
 
     @Override
     public CompletableFuture<Void> clear() {
-        return client.submit(new Clear())
+        return submit(new Clear())
                 .whenComplete((r, e) -> throwIfLocked(r))
                 .thenApply(v -> null);
     }
@@ -239,7 +242,7 @@
             }
             Match<byte[]> valueMatch = r1 == null ? Match.NULL : Match.ANY;
             Match<Long> versionMatch = r1 == null ? Match.ANY : Match.ifValue(r1.version());
-            return client.submit(new UpdateAndGet(key,
+            return submit(new UpdateAndGet(key,
                     computedValue.get(),
                     valueMatch,
                     versionMatch))
@@ -252,7 +255,7 @@
     public synchronized CompletableFuture<Void> addListener(MapEventListener<String, byte[]> listener,
                                                             Executor executor) {
         if (mapEventListeners.isEmpty()) {
-            return client.submit(new Listen()).thenRun(() -> mapEventListeners.putIfAbsent(listener, executor));
+            return submit(new Listen()).thenRun(() -> mapEventListeners.putIfAbsent(listener, executor));
         } else {
             mapEventListeners.put(listener, executor);
             return CompletableFuture.completedFuture(null);
@@ -262,7 +265,7 @@
     @Override
     public synchronized CompletableFuture<Void> removeListener(MapEventListener<String, byte[]> listener) {
         if (mapEventListeners.remove(listener) != null && mapEventListeners.isEmpty()) {
-            return client.submit(new Unlisten()).thenApply(v -> null);
+            return submit(new Unlisten()).thenApply(v -> null);
         }
         return CompletableFuture.completedFuture(null);
     }
@@ -275,23 +278,23 @@
 
     @Override
     public CompletableFuture<Boolean> prepare(MapTransaction<String, byte[]> transaction) {
-        return client.submit(new TransactionPrepare(transaction)).thenApply(v -> v == PrepareResult.OK);
+        return submit(new TransactionPrepare(transaction)).thenApply(v -> v == PrepareResult.OK);
     }
 
     @Override
     public CompletableFuture<Void> commit(TransactionId transactionId) {
-        return client.submit(new TransactionCommit(transactionId)).thenApply(v -> null);
+        return submit(new TransactionCommit(transactionId)).thenApply(v -> null);
     }
 
     @Override
     public CompletableFuture<Void> rollback(TransactionId transactionId) {
-        return client.submit(new TransactionRollback(transactionId))
+        return submit(new TransactionRollback(transactionId))
                 .thenApply(v -> null);
     }
 
     @Override
     public CompletableFuture<Boolean> prepareAndCommit(MapTransaction<String, byte[]> transaction) {
-        return client.submit(new TransactionPrepareAndCommit(transaction)).thenApply(v -> v == PrepareResult.OK);
+        return submit(new TransactionPrepareAndCommit(transaction)).thenApply(v -> v == PrepareResult.OK);
     }
 
     @Override
@@ -308,4 +311,21 @@
     public Collection<Consumer<Status>> statusChangeListeners() {
         return ImmutableSet.copyOf(statusChangeListeners);
     }
+
+    /**
+     * Submits an operation through the Copycat client, failing fast when the client is unavailable.
+     *
+     * @param command operation to submit
+     * @param <T> operation result type
+     * @return the future returned by the client, or a future produced by {@code Tools.exceptionalFuture}
+     *         carrying {@link StorageException.Unavailable} when the client is SUSPENDED or CLOSED
+     */
+    <T> CompletableFuture<T> submit(Operation<T> command) {
+        // Read the state once so both comparisons are made against the same snapshot.
+        CopycatClient.State state = client.state();
+        if (state == CopycatClient.State.SUSPENDED || state == CopycatClient.State.CLOSED) {
+            return Tools.exceptionalFuture(new StorageException.Unavailable());
+        }
+        return client.submit(command);
+    }
 }
\ No newline at end of file