Move client availability check from AtomixConsistentMap to CopycatClient
Change-Id: I411eb74c5d39985d85c5feda976a250e77b88ff5
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
index 23d2f86..f1a44de 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
@@ -15,7 +15,6 @@
*/
package org.onosproject.store.primitives.resources.impl;
-import io.atomix.copycat.Operation;
import io.atomix.copycat.client.CopycatClient;
import io.atomix.resource.AbstractResource;
import io.atomix.resource.ResourceTypeInfo;
@@ -35,7 +34,6 @@
import java.util.function.Predicate;
import org.onlab.util.Match;
-import org.onlab.util.Tools;
import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Clear;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.ContainsKey;
@@ -57,7 +55,6 @@
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.MapTransaction;
-import org.onosproject.store.service.StorageException;
import org.onosproject.store.service.Versioned;
import com.google.common.collect.ImmutableSet;
@@ -100,48 +97,48 @@
@Override
public CompletableFuture<Boolean> isEmpty() {
- return submit(new IsEmpty());
+ return client.submit(new IsEmpty());
}
@Override
public CompletableFuture<Integer> size() {
- return submit(new Size());
+ return client.submit(new Size());
}
@Override
public CompletableFuture<Boolean> containsKey(String key) {
- return submit(new ContainsKey(key));
+ return client.submit(new ContainsKey(key));
}
@Override
public CompletableFuture<Boolean> containsValue(byte[] value) {
- return submit(new ContainsValue(value));
+ return client.submit(new ContainsValue(value));
}
@Override
public CompletableFuture<Versioned<byte[]>> get(String key) {
- return submit(new Get(key));
+ return client.submit(new Get(key));
}
@Override
public CompletableFuture<Set<String>> keySet() {
- return submit(new KeySet());
+ return client.submit(new KeySet());
}
@Override
public CompletableFuture<Collection<Versioned<byte[]>>> values() {
- return submit(new Values());
+ return client.submit(new Values());
}
@Override
public CompletableFuture<Set<Entry<String, Versioned<byte[]>>>> entrySet() {
- return submit(new EntrySet());
+ return client.submit(new EntrySet());
}
@Override
@SuppressWarnings("unchecked")
public CompletableFuture<Versioned<byte[]>> put(String key, byte[] value) {
- return submit(new UpdateAndGet(key, value, Match.ANY, Match.ANY))
+ return client.submit(new UpdateAndGet(key, value, Match.ANY, Match.ANY))
.whenComplete((r, e) -> throwIfLocked(r.status()))
.thenApply(v -> v.oldValue());
}
@@ -149,7 +146,7 @@
@Override
@SuppressWarnings("unchecked")
public CompletableFuture<Versioned<byte[]>> putAndGet(String key, byte[] value) {
- return submit(new UpdateAndGet(key, value, Match.ANY, Match.ANY))
+ return client.submit(new UpdateAndGet(key, value, Match.ANY, Match.ANY))
.whenComplete((r, e) -> throwIfLocked(r.status()))
.thenApply(v -> v.newValue());
}
@@ -157,14 +154,14 @@
@Override
@SuppressWarnings("unchecked")
public CompletableFuture<Versioned<byte[]>> putIfAbsent(String key, byte[] value) {
- return submit(new UpdateAndGet(key, value, Match.NULL, Match.ANY))
+ return client.submit(new UpdateAndGet(key, value, Match.NULL, Match.ANY))
.whenComplete((r, e) -> throwIfLocked(r.status()))
.thenApply(v -> v.oldValue());
}
@Override
@SuppressWarnings("unchecked")
public CompletableFuture<Versioned<byte[]>> remove(String key) {
- return submit(new UpdateAndGet(key, null, Match.ANY, Match.ANY))
+ return client.submit(new UpdateAndGet(key, null, Match.ANY, Match.ANY))
.whenComplete((r, e) -> throwIfLocked(r.status()))
.thenApply(v -> v.oldValue());
}
@@ -172,7 +169,7 @@
@Override
@SuppressWarnings("unchecked")
public CompletableFuture<Boolean> remove(String key, byte[] value) {
- return submit(new UpdateAndGet(key, null, Match.ifValue(value), Match.ANY))
+ return client.submit(new UpdateAndGet(key, null, Match.ifValue(value), Match.ANY))
.whenComplete((r, e) -> throwIfLocked(r.status()))
.thenApply(v -> v.updated());
}
@@ -180,7 +177,7 @@
@Override
@SuppressWarnings("unchecked")
public CompletableFuture<Boolean> remove(String key, long version) {
- return submit(new UpdateAndGet(key, null, Match.ANY, Match.ifValue(version)))
+ return client.submit(new UpdateAndGet(key, null, Match.ANY, Match.ifValue(version)))
.whenComplete((r, e) -> throwIfLocked(r.status()))
.thenApply(v -> v.updated());
}
@@ -188,7 +185,7 @@
@Override
@SuppressWarnings("unchecked")
public CompletableFuture<Versioned<byte[]>> replace(String key, byte[] value) {
- return submit(new UpdateAndGet(key, value, Match.NOT_NULL, Match.ANY))
+ return client.submit(new UpdateAndGet(key, value, Match.NOT_NULL, Match.ANY))
.whenComplete((r, e) -> throwIfLocked(r.status()))
.thenApply(v -> v.oldValue());
}
@@ -196,7 +193,7 @@
@Override
@SuppressWarnings("unchecked")
public CompletableFuture<Boolean> replace(String key, byte[] oldValue, byte[] newValue) {
- return submit(new UpdateAndGet(key, newValue, Match.ifValue(oldValue), Match.ANY))
+ return client.submit(new UpdateAndGet(key, newValue, Match.ifValue(oldValue), Match.ANY))
.whenComplete((r, e) -> throwIfLocked(r.status()))
.thenApply(v -> v.updated());
}
@@ -204,14 +201,14 @@
@Override
@SuppressWarnings("unchecked")
public CompletableFuture<Boolean> replace(String key, long oldVersion, byte[] newValue) {
- return submit(new UpdateAndGet(key, newValue, Match.ANY, Match.ifValue(oldVersion)))
+ return client.submit(new UpdateAndGet(key, newValue, Match.ANY, Match.ifValue(oldVersion)))
.whenComplete((r, e) -> throwIfLocked(r.status()))
.thenApply(v -> v.updated());
}
@Override
public CompletableFuture<Void> clear() {
- return submit(new Clear())
+ return client.submit(new Clear())
.whenComplete((r, e) -> throwIfLocked(r))
.thenApply(v -> null);
}
@@ -242,7 +239,7 @@
}
Match<byte[]> valueMatch = r1 == null ? Match.NULL : Match.ANY;
Match<Long> versionMatch = r1 == null ? Match.ANY : Match.ifValue(r1.version());
- return submit(new UpdateAndGet(key,
+ return client.submit(new UpdateAndGet(key,
computedValue.get(),
valueMatch,
versionMatch))
@@ -255,7 +252,7 @@
public synchronized CompletableFuture<Void> addListener(MapEventListener<String, byte[]> listener,
Executor executor) {
if (mapEventListeners.isEmpty()) {
- return submit(new Listen()).thenRun(() -> mapEventListeners.putIfAbsent(listener, executor));
+ return client.submit(new Listen()).thenRun(() -> mapEventListeners.putIfAbsent(listener, executor));
} else {
mapEventListeners.put(listener, executor);
return CompletableFuture.completedFuture(null);
@@ -265,7 +262,7 @@
@Override
public synchronized CompletableFuture<Void> removeListener(MapEventListener<String, byte[]> listener) {
if (mapEventListeners.remove(listener) != null && mapEventListeners.isEmpty()) {
- return submit(new Unlisten()).thenApply(v -> null);
+ return client.submit(new Unlisten()).thenApply(v -> null);
}
return CompletableFuture.completedFuture(null);
}
@@ -278,23 +275,23 @@
@Override
public CompletableFuture<Boolean> prepare(MapTransaction<String, byte[]> transaction) {
- return submit(new TransactionPrepare(transaction)).thenApply(v -> v == PrepareResult.OK);
+ return client.submit(new TransactionPrepare(transaction)).thenApply(v -> v == PrepareResult.OK);
}
@Override
public CompletableFuture<Void> commit(TransactionId transactionId) {
- return submit(new TransactionCommit(transactionId)).thenApply(v -> null);
+ return client.submit(new TransactionCommit(transactionId)).thenApply(v -> null);
}
@Override
public CompletableFuture<Void> rollback(TransactionId transactionId) {
- return submit(new TransactionRollback(transactionId))
+ return client.submit(new TransactionRollback(transactionId))
.thenApply(v -> null);
}
@Override
public CompletableFuture<Boolean> prepareAndCommit(MapTransaction<String, byte[]> transaction) {
- return submit(new TransactionPrepareAndCommit(transaction)).thenApply(v -> v == PrepareResult.OK);
+ return client.submit(new TransactionPrepareAndCommit(transaction)).thenApply(v -> v == PrepareResult.OK);
}
@Override
@@ -311,11 +308,4 @@
public Collection<Consumer<Status>> statusChangeListeners() {
return ImmutableSet.copyOf(statusChangeListeners);
}
-
- <T> CompletableFuture<T> submit(Operation<T> command) {
- if (client.state() == CopycatClient.State.SUSPENDED || client.state() == CopycatClient.State.CLOSED) {
- return Tools.exceptionalFuture(new StorageException.Unavailable());
- }
- return client.submit(command);
- }
}
\ No newline at end of file