Convert all Atomix exceptions to ONOS storage exceptions
Change-Id: If4ba6b4fa41643fc3d9a9f2d84dc68fd6399a352
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixAtomicCounter.java b/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixAtomicCounter.java
index 4711936..dde25cb 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixAtomicCounter.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixAtomicCounter.java
@@ -19,6 +19,8 @@
import org.onosproject.store.service.AsyncAtomicCounter;
+import static org.onosproject.store.atomix.primitives.impl.AtomixFutures.adaptFuture;
+
/**
* Atomix atomic counter.
*/
@@ -36,36 +38,36 @@
@Override
public CompletableFuture<Long> incrementAndGet() {
- return atomixCounter.incrementAndGet();
+ return adaptFuture(atomixCounter.incrementAndGet());
}
@Override
public CompletableFuture<Long> getAndIncrement() {
- return atomixCounter.getAndIncrement();
+ return adaptFuture(atomixCounter.getAndIncrement());
}
@Override
public CompletableFuture<Long> getAndAdd(long delta) {
- return atomixCounter.getAndAdd(delta);
+ return adaptFuture(atomixCounter.getAndAdd(delta));
}
@Override
public CompletableFuture<Long> addAndGet(long delta) {
- return atomixCounter.addAndGet(delta);
+ return adaptFuture(atomixCounter.addAndGet(delta));
}
@Override
public CompletableFuture<Long> get() {
- return atomixCounter.get();
+ return adaptFuture(atomixCounter.get());
}
@Override
public CompletableFuture<Void> set(long value) {
- return atomixCounter.set(value);
+ return adaptFuture(atomixCounter.set(value));
}
@Override
public CompletableFuture<Boolean> compareAndSet(long expectedValue, long updateValue) {
- return atomixCounter.compareAndSet(expectedValue, updateValue);
+ return adaptFuture(atomixCounter.compareAndSet(expectedValue, updateValue));
}
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixAtomicCounterMap.java b/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixAtomicCounterMap.java
index bd15261..e60f893 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixAtomicCounterMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixAtomicCounterMap.java
@@ -19,6 +19,8 @@
import org.onosproject.store.service.AsyncAtomicCounterMap;
+import static org.onosproject.store.atomix.primitives.impl.AtomixFutures.adaptMapFuture;
+
/**
* Atomix atomic counter map.
*/
@@ -36,76 +38,76 @@
@Override
public CompletableFuture<Long> incrementAndGet(K key) {
- return atomixMap.incrementAndGet(key);
+ return adaptMapFuture(atomixMap.incrementAndGet(key));
}
@Override
public CompletableFuture<Long> decrementAndGet(K key) {
- return atomixMap.decrementAndGet(key);
+ return adaptMapFuture(atomixMap.decrementAndGet(key));
}
@Override
public CompletableFuture<Long> getAndIncrement(K key) {
- return atomixMap.getAndIncrement(key);
+ return adaptMapFuture(atomixMap.getAndIncrement(key));
}
@Override
public CompletableFuture<Long> getAndDecrement(K key) {
- return atomixMap.getAndDecrement(key);
+ return adaptMapFuture(atomixMap.getAndDecrement(key));
}
@Override
public CompletableFuture<Long> addAndGet(K key, long delta) {
- return atomixMap.addAndGet(key, delta);
+ return adaptMapFuture(atomixMap.addAndGet(key, delta));
}
@Override
public CompletableFuture<Long> getAndAdd(K key, long delta) {
- return atomixMap.getAndAdd(key, delta);
+ return adaptMapFuture(atomixMap.getAndAdd(key, delta));
}
@Override
public CompletableFuture<Long> get(K key) {
- return atomixMap.get(key);
+ return adaptMapFuture(atomixMap.get(key));
}
@Override
public CompletableFuture<Long> put(K key, long newValue) {
- return atomixMap.put(key, newValue);
+ return adaptMapFuture(atomixMap.put(key, newValue));
}
@Override
public CompletableFuture<Long> putIfAbsent(K key, long newValue) {
- return atomixMap.putIfAbsent(key, newValue);
+ return adaptMapFuture(atomixMap.putIfAbsent(key, newValue));
}
@Override
public CompletableFuture<Boolean> replace(K key, long expectedOldValue, long newValue) {
- return atomixMap.replace(key, expectedOldValue, newValue);
+ return adaptMapFuture(atomixMap.replace(key, expectedOldValue, newValue));
}
@Override
public CompletableFuture<Long> remove(K key) {
- return atomixMap.remove(key);
+ return adaptMapFuture(atomixMap.remove(key));
}
@Override
public CompletableFuture<Boolean> remove(K key, long value) {
- return atomixMap.remove(key, value);
+ return adaptMapFuture(atomixMap.remove(key, value));
}
@Override
public CompletableFuture<Integer> size() {
- return atomixMap.size();
+ return adaptMapFuture(atomixMap.size());
}
@Override
public CompletableFuture<Boolean> isEmpty() {
- return atomixMap.isEmpty();
+ return adaptMapFuture(atomixMap.isEmpty());
}
@Override
public CompletableFuture<Void> clear() {
- return atomixMap.clear();
+ return adaptMapFuture(atomixMap.clear());
}
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixAtomicIdGenerator.java b/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixAtomicIdGenerator.java
index 97d0268..e762511 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixAtomicIdGenerator.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixAtomicIdGenerator.java
@@ -19,6 +19,8 @@
import org.onosproject.store.service.AsyncAtomicIdGenerator;
+import static org.onosproject.store.atomix.primitives.impl.AtomixFutures.adaptFuture;
+
/**
* Atomix atomic ID generator.
*/
@@ -36,6 +38,6 @@
@Override
public CompletableFuture<Long> nextId() {
- return atomixIdGenerator.nextId();
+ return adaptFuture(atomixIdGenerator.nextId());
}
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixAtomicValue.java b/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixAtomicValue.java
index e4ad32f..0843f9c 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixAtomicValue.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixAtomicValue.java
@@ -23,6 +23,8 @@
import org.onosproject.store.service.AtomicValueEvent;
import org.onosproject.store.service.AtomicValueEventListener;
+import static org.onosproject.store.atomix.primitives.impl.AtomixFutures.adaptFuture;
+
/**
* Atomix atomic value.
*/
@@ -42,22 +44,22 @@
@Override
public CompletableFuture<Boolean> compareAndSet(V expect, V update) {
- return atomixValue.compareAndSet(expect, update);
+ return adaptFuture(atomixValue.compareAndSet(expect, update));
}
@Override
public CompletableFuture<V> get() {
- return atomixValue.get();
+ return adaptFuture(atomixValue.get());
}
@Override
public CompletableFuture<V> getAndSet(V value) {
- return atomixValue.getAndSet(value);
+ return adaptFuture(atomixValue.getAndSet(value));
}
@Override
public CompletableFuture<Void> set(V value) {
- return atomixValue.set(value);
+ return adaptFuture(atomixValue.set(value));
}
@Override
@@ -68,14 +70,14 @@
event.newValue(),
event.oldValue()));
listenerMap.put(listener, atomixListener);
- return atomixValue.addListener(atomixListener);
+ return adaptFuture(atomixValue.addListener(atomixListener));
}
@Override
public synchronized CompletableFuture<Void> removeListener(AtomicValueEventListener<V> listener) {
io.atomix.core.value.AtomicValueEventListener<V> atomixListener = listenerMap.remove(listener);
if (atomixListener != null) {
- return atomixValue.removeListener(atomixListener);
+ return adaptFuture(atomixValue.removeListener(atomixListener));
}
return CompletableFuture.completedFuture(null);
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixConsistentMap.java
index 645544d..6241855 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixConsistentMap.java
@@ -24,7 +24,6 @@
import java.util.function.BiFunction;
import java.util.function.Predicate;
-import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
import io.atomix.core.collection.impl.TranscodingAsyncDistributedCollection;
import io.atomix.core.set.impl.TranscodingAsyncDistributedSet;
@@ -32,13 +31,14 @@
import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.AsyncIterator;
-import org.onosproject.store.service.ConsistentMapException;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.TransactionLog;
import org.onosproject.store.service.Version;
import org.onosproject.store.service.Versioned;
+import static org.onosproject.store.atomix.primitives.impl.AtomixFutures.adaptMapFuture;
+
/**
* Atomix consistent map.
*/
@@ -84,22 +84,22 @@
@Override
public CompletableFuture<Versioned<V>> computeIf(
K key, Predicate<? super V> condition, BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
- return adapt(atomixMap.computeIf(key, condition, remappingFunction).thenApply(this::toVersioned));
+ return adaptMapFuture(atomixMap.computeIf(key, condition, remappingFunction).thenApply(this::toVersioned));
}
@Override
public CompletableFuture<Versioned<V>> put(K key, V value) {
- return adapt(atomixMap.put(key, value).thenApply(this::toVersioned));
+ return adaptMapFuture(atomixMap.put(key, value).thenApply(this::toVersioned));
}
@Override
public CompletableFuture<Versioned<V>> putAndGet(K key, V value) {
- return adapt(atomixMap.putAndGet(key, value).thenApply(this::toVersioned));
+ return adaptMapFuture(atomixMap.putAndGet(key, value).thenApply(this::toVersioned));
}
@Override
public CompletableFuture<Versioned<V>> remove(K key) {
- return adapt(atomixMap.remove(key).thenApply(this::toVersioned));
+ return adaptMapFuture(atomixMap.remove(key).thenApply(this::toVersioned));
}
@Override
@@ -137,32 +137,32 @@
@Override
public CompletableFuture<Versioned<V>> putIfAbsent(K key, V value) {
- return adapt(atomixMap.putIfAbsent(key, value).thenApply(this::toVersioned));
+ return adaptMapFuture(atomixMap.putIfAbsent(key, value).thenApply(this::toVersioned));
}
@Override
public CompletableFuture<Boolean> remove(K key, V value) {
- return adapt(atomixMap.remove(key, value));
+ return adaptMapFuture(atomixMap.remove(key, value));
}
@Override
public CompletableFuture<Boolean> remove(K key, long version) {
- return adapt(atomixMap.remove(key, version));
+ return adaptMapFuture(atomixMap.remove(key, version));
}
@Override
public CompletableFuture<Versioned<V>> replace(K key, V value) {
- return adapt(atomixMap.replace(key, value).thenApply(this::toVersioned));
+ return adaptMapFuture(atomixMap.replace(key, value).thenApply(this::toVersioned));
}
@Override
public CompletableFuture<Boolean> replace(K key, V oldValue, V newValue) {
- return adapt(atomixMap.replace(key, oldValue, newValue));
+ return adaptMapFuture(atomixMap.replace(key, oldValue, newValue));
}
@Override
public CompletableFuture<Boolean> replace(K key, long oldVersion, V newValue) {
- return adapt(atomixMap.replace(key, oldVersion, newValue));
+ return adaptMapFuture(atomixMap.replace(key, oldVersion, newValue));
}
@Override
@@ -230,30 +230,6 @@
throw new UnsupportedOperationException();
}
- private <T> CompletableFuture<T> adapt(CompletableFuture<T> future) {
- CompletableFuture<T> newFuture = new CompletableFuture<>();
- future.whenComplete((result, error) -> {
- if (error == null) {
- newFuture.complete(result);
- } else {
- Throwable cause = Throwables.getRootCause(error);
- if (cause instanceof io.atomix.primitive.PrimitiveException.ConcurrentModification) {
- newFuture.completeExceptionally(
- new ConsistentMapException.ConcurrentModification(cause.getMessage()));
- } else if (cause instanceof io.atomix.primitive.PrimitiveException.Timeout) {
- newFuture.completeExceptionally(new ConsistentMapException.Timeout(cause.getMessage()));
- } else if (cause instanceof io.atomix.primitive.PrimitiveException.Interrupted) {
- newFuture.completeExceptionally(new ConsistentMapException.Interrupted());
- } else if (cause instanceof io.atomix.primitive.PrimitiveException.Unavailable) {
- newFuture.completeExceptionally(new ConsistentMapException.Unavailable());
- } else if (cause instanceof io.atomix.primitive.PrimitiveException) {
- newFuture.completeExceptionally(new ConsistentMapException(cause.getMessage()));
- }
- }
- });
- return newFuture;
- }
-
private Versioned<V> toVersioned(io.atomix.utils.time.Versioned<V> versioned) {
return versioned != null
? new Versioned<>(versioned.value(), versioned.version(), versioned.creationTime())
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixConsistentMultimap.java b/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixConsistentMultimap.java
index b7638c1..96d7e8c 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixConsistentMultimap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixConsistentMultimap.java
@@ -32,6 +32,8 @@
import org.onosproject.store.service.MultimapEventListener;
import org.onosproject.store.service.Versioned;
+import static org.onosproject.store.atomix.primitives.impl.AtomixFutures.adaptMapFuture;
+
/**
* Atomix consistent map.
*/
@@ -51,79 +53,81 @@
@Override
public CompletableFuture<Integer> size() {
- return atomixMultimap.size();
+ return adaptMapFuture(atomixMultimap.size());
}
@Override
public CompletableFuture<Boolean> containsKey(K key) {
- return atomixMultimap.containsKey(key);
+ return adaptMapFuture(atomixMultimap.containsKey(key));
}
@Override
public CompletableFuture<Boolean> containsValue(V value) {
- return atomixMultimap.containsValue(value);
+ return adaptMapFuture(atomixMultimap.containsValue(value));
}
@Override
public CompletableFuture<Boolean> isEmpty() {
- return atomixMultimap.isEmpty();
+ return adaptMapFuture(atomixMultimap.isEmpty());
}
@Override
public CompletableFuture<Boolean> containsEntry(K key, V value) {
- return atomixMultimap.containsEntry(key, value);
+ return adaptMapFuture(atomixMultimap.containsEntry(key, value));
}
@Override
public CompletableFuture<Boolean> put(K key, V value) {
- return atomixMultimap.put(key, value);
+ return adaptMapFuture(atomixMultimap.put(key, value));
}
@Override
public CompletableFuture<Versioned<Collection<? extends V>>> putAndGet(K key, V value) {
- return atomixMultimap.put(key, value).thenCompose(v -> atomixMultimap.get(key)).thenApply(this::toVersioned);
+ return adaptMapFuture(atomixMultimap.put(key, value).thenCompose(v -> atomixMultimap.get(key))
+ .thenApply(this::toVersioned));
}
@Override
public CompletableFuture<Boolean> remove(K key, V value) {
- return atomixMultimap.remove(key, value);
+ return adaptMapFuture(atomixMultimap.remove(key, value));
}
@Override
public CompletableFuture<Versioned<Collection<? extends V>>> removeAndGet(K key, V value) {
- return atomixMultimap.remove(key, value).thenCompose(v -> atomixMultimap.get(key)).thenApply(this::toVersioned);
+ return adaptMapFuture(atomixMultimap.remove(key, value).thenCompose(v -> atomixMultimap.get(key))
+ .thenApply(this::toVersioned));
}
@Override
public CompletableFuture<Boolean> removeAll(K key, Collection<? extends V> values) {
- return atomixMultimap.removeAll(key, values);
+ return adaptMapFuture(atomixMultimap.removeAll(key, values));
}
@Override
public CompletableFuture<Versioned<Collection<? extends V>>> removeAll(K key) {
- return atomixMultimap.removeAll(key).thenApply(this::toVersioned);
+ return adaptMapFuture(atomixMultimap.removeAll(key).thenApply(this::toVersioned));
}
@Override
public CompletableFuture<Boolean> putAll(K key, Collection<? extends V> values) {
- return atomixMultimap.putAll(key, values);
+ return adaptMapFuture(atomixMultimap.putAll(key, values));
}
@Override
@SuppressWarnings("unchecked")
public CompletableFuture<Versioned<Collection<? extends V>>> replaceValues(K key, Collection<V> values) {
- return atomixMultimap.replaceValues(key, values).thenApply(this::toVersioned);
+ return adaptMapFuture(atomixMultimap.replaceValues(key, values).thenApply(this::toVersioned));
}
@Override
public CompletableFuture<Void> clear() {
- return atomixMultimap.clear();
+ return adaptMapFuture(atomixMultimap.clear());
}
@Override
@SuppressWarnings("unchecked")
public CompletableFuture<Versioned<Collection<? extends V>>> get(K key) {
- return atomixMultimap.get(key).thenApply(this::toVersioned);
+ return adaptMapFuture(atomixMultimap.get(key).thenApply(this::toVersioned));
}
@Override
@@ -176,14 +180,14 @@
event.newValue(),
event.oldValue()));
listenerMap.put(listener, atomixListener);
- return atomixMultimap.addListener(atomixListener, executor);
+ return adaptMapFuture(atomixMultimap.addListener(atomixListener, executor));
}
@Override
public CompletableFuture<Void> removeListener(MultimapEventListener<K, V> listener) {
io.atomix.core.multimap.AtomicMultimapEventListener<K, V> atomixListener = listenerMap.remove(listener);
if (atomixListener != null) {
- return atomixMultimap.removeListener(atomixListener);
+ return adaptMapFuture(atomixMultimap.removeListener(atomixListener));
}
return CompletableFuture.completedFuture(null);
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixConsistentTreeMap.java b/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixConsistentTreeMap.java
index 6b64139..cfee0e7 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixConsistentTreeMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixConsistentTreeMap.java
@@ -27,7 +27,6 @@
import java.util.function.Predicate;
import java.util.stream.Collectors;
-import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
import io.atomix.core.collection.impl.TranscodingAsyncDistributedCollection;
import io.atomix.core.map.impl.DelegatingAsyncDistributedNavigableMap;
@@ -35,13 +34,14 @@
import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.service.AsyncConsistentTreeMap;
import org.onosproject.store.service.AsyncIterator;
-import org.onosproject.store.service.ConsistentMapException;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.TransactionLog;
import org.onosproject.store.service.Version;
import org.onosproject.store.service.Versioned;
+import static org.onosproject.store.atomix.primitives.impl.AtomixFutures.adaptMapFuture;
+
/**
* Atomix consistent tree map.
*/
@@ -168,22 +168,22 @@
public CompletableFuture<Versioned<V>> computeIf(
String key,
Predicate<? super V> condition, BiFunction<? super String, ? super V, ? extends V> remappingFunction) {
- return adapt(atomixTreeMap.computeIf(key, condition, remappingFunction)).thenApply(this::toVersioned);
+ return adaptMapFuture(atomixTreeMap.computeIf(key, condition, remappingFunction)).thenApply(this::toVersioned);
}
@Override
public CompletableFuture<Versioned<V>> put(String key, V value) {
- return adapt(atomixTreeMap.put(key, value)).thenApply(this::toVersioned);
+ return adaptMapFuture(atomixTreeMap.put(key, value)).thenApply(this::toVersioned);
}
@Override
public CompletableFuture<Versioned<V>> putAndGet(String key, V value) {
- return adapt(atomixTreeMap.putAndGet(key, value)).thenApply(this::toVersioned);
+ return adaptMapFuture(atomixTreeMap.putAndGet(key, value)).thenApply(this::toVersioned);
}
@Override
public CompletableFuture<Versioned<V>> remove(String key) {
- return adapt(atomixTreeMap.remove(key)).thenApply(this::toVersioned);
+ return adaptMapFuture(atomixTreeMap.remove(key)).thenApply(this::toVersioned);
}
@Override
@@ -201,32 +201,32 @@
@Override
public CompletableFuture<Versioned<V>> putIfAbsent(String key, V value) {
- return adapt(atomixTreeMap.putIfAbsent(key, value)).thenApply(this::toVersioned);
+ return adaptMapFuture(atomixTreeMap.putIfAbsent(key, value)).thenApply(this::toVersioned);
}
@Override
public CompletableFuture<Boolean> remove(String key, V value) {
- return adapt(atomixTreeMap.remove(key, value));
+ return adaptMapFuture(atomixTreeMap.remove(key, value));
}
@Override
public CompletableFuture<Boolean> remove(String key, long version) {
- return adapt(atomixTreeMap.remove(key, version));
+ return adaptMapFuture(atomixTreeMap.remove(key, version));
}
@Override
public CompletableFuture<Versioned<V>> replace(String key, V value) {
- return adapt(atomixTreeMap.replace(key, value)).thenApply(this::toVersioned);
+ return adaptMapFuture(atomixTreeMap.replace(key, value)).thenApply(this::toVersioned);
}
@Override
public CompletableFuture<Boolean> replace(String key, V oldValue, V newValue) {
- return adapt(atomixTreeMap.replace(key, oldValue, newValue));
+ return adaptMapFuture(atomixTreeMap.replace(key, oldValue, newValue));
}
@Override
public CompletableFuture<Boolean> replace(String key, long oldVersion, V newValue) {
- return adapt(atomixTreeMap.replace(key, oldVersion, newValue));
+ return adaptMapFuture(atomixTreeMap.replace(key, oldVersion, newValue));
}
@Override
@@ -314,30 +314,6 @@
throw new UnsupportedOperationException();
}
- private <T> CompletableFuture<T> adapt(CompletableFuture<T> future) {
- CompletableFuture<T> newFuture = new CompletableFuture<>();
- future.whenComplete((result, error) -> {
- if (error == null) {
- newFuture.complete(result);
- } else {
- Throwable cause = Throwables.getRootCause(error);
- if (cause instanceof io.atomix.primitive.PrimitiveException.ConcurrentModification) {
- newFuture.completeExceptionally(
- new ConsistentMapException.ConcurrentModification(error.getMessage()));
- } else if (cause instanceof io.atomix.primitive.PrimitiveException.Timeout) {
- newFuture.completeExceptionally(new ConsistentMapException.Timeout(error.getMessage()));
- } else if (cause instanceof io.atomix.primitive.PrimitiveException.Interrupted) {
- newFuture.completeExceptionally(new ConsistentMapException.Interrupted());
- } else if (cause instanceof io.atomix.primitive.PrimitiveException.Unavailable) {
- newFuture.completeExceptionally(new ConsistentMapException.Unavailable());
- } else if (cause instanceof io.atomix.primitive.PrimitiveException) {
- newFuture.completeExceptionally(new ConsistentMapException(cause.getMessage()));
- }
- }
- });
- return newFuture;
- }
-
private Versioned<V> toVersioned(io.atomix.utils.time.Versioned<V> versioned) {
return versioned != null
? new Versioned<>(versioned.value(), versioned.version(), versioned.creationTime())
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixDistributedLock.java b/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixDistributedLock.java
index a4e0f27..038d995 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixDistributedLock.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixDistributedLock.java
@@ -22,6 +22,8 @@
import org.onosproject.store.service.AsyncDistributedLock;
import org.onosproject.store.service.Version;
+import static org.onosproject.store.atomix.primitives.impl.AtomixFutures.adaptFuture;
+
/**
* Atomix distributed lock.
*/
@@ -39,22 +41,22 @@
@Override
public CompletableFuture<Version> lock() {
- return atomixLock.lock().thenApply(this::toVersion);
+ return adaptFuture(atomixLock.lock()).thenApply(this::toVersion);
}
@Override
public CompletableFuture<Optional<Version>> tryLock() {
- return atomixLock.tryLock().thenApply(optional -> optional.map(this::toVersion));
+ return adaptFuture(atomixLock.tryLock()).thenApply(optional -> optional.map(this::toVersion));
}
@Override
public CompletableFuture<Optional<Version>> tryLock(Duration timeout) {
- return atomixLock.tryLock(timeout).thenApply(optional -> optional.map(this::toVersion));
+ return adaptFuture(atomixLock.tryLock(timeout)).thenApply(optional -> optional.map(this::toVersion));
}
@Override
public CompletableFuture<Void> unlock() {
- return atomixLock.unlock();
+ return adaptFuture(atomixLock.unlock());
}
private Version toVersion(io.atomix.utils.time.Version version) {
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixDistributedSet.java b/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixDistributedSet.java
index 5add8b4..5483c74 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixDistributedSet.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixDistributedSet.java
@@ -26,6 +26,8 @@
import org.onosproject.store.service.SetEvent;
import org.onosproject.store.service.SetEventListener;
+import static org.onosproject.store.atomix.primitives.impl.AtomixFutures.adaptFuture;
+
/**
* Atomix distributed set.
*/
@@ -45,52 +47,52 @@
@Override
public CompletableFuture<Integer> size() {
- return atomixSet.size();
+ return adaptFuture(atomixSet.size());
}
@Override
public CompletableFuture<Boolean> add(E element) {
- return atomixSet.add(element);
+ return adaptFuture(atomixSet.add(element));
}
@Override
public CompletableFuture<Boolean> remove(E element) {
- return atomixSet.remove(element);
+ return adaptFuture(atomixSet.remove(element));
}
@Override
public CompletableFuture<Boolean> isEmpty() {
- return atomixSet.isEmpty();
+ return adaptFuture(atomixSet.isEmpty());
}
@Override
public CompletableFuture<Void> clear() {
- return atomixSet.clear();
+ return adaptFuture(atomixSet.clear());
}
@Override
public CompletableFuture<Boolean> contains(E element) {
- return atomixSet.contains(element);
+ return adaptFuture(atomixSet.contains(element));
}
@Override
public CompletableFuture<Boolean> addAll(Collection<? extends E> c) {
- return atomixSet.addAll(c);
+ return adaptFuture(atomixSet.addAll(c));
}
@Override
public CompletableFuture<Boolean> containsAll(Collection<? extends E> c) {
- return atomixSet.containsAll(c);
+ return adaptFuture(atomixSet.containsAll(c));
}
@Override
public CompletableFuture<Boolean> retainAll(Collection<? extends E> c) {
- return atomixSet.retainAll(c);
+ return adaptFuture(atomixSet.retainAll(c));
}
@Override
public CompletableFuture<Boolean> removeAll(Collection<? extends E> c) {
- return atomixSet.removeAll(c);
+ return adaptFuture(atomixSet.removeAll(c));
}
@Override
@@ -106,14 +108,14 @@
SetEvent.Type.valueOf(event.type().name()),
event.element()));
listenerMap.put(listener, atomixListener);
- return atomixSet.addListener(atomixListener);
+ return adaptFuture(atomixSet.addListener(atomixListener));
}
@Override
public CompletableFuture<Void> removeListener(SetEventListener<E> listener) {
io.atomix.core.collection.CollectionEventListener<E> atomixListener = listenerMap.remove(listener);
if (atomixListener != null) {
- return atomixSet.removeListener(atomixListener);
+ return adaptFuture(atomixSet.removeListener(atomixListener));
}
return CompletableFuture.completedFuture(null);
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixDistributedTopic.java b/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixDistributedTopic.java
index 29203a2..a6d6536 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixDistributedTopic.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixDistributedTopic.java
@@ -26,6 +26,8 @@
import org.onosproject.store.service.DistributedPrimitive;
import org.onosproject.store.service.Topic;
+import static org.onosproject.store.atomix.primitives.impl.AtomixFutures.adaptFuture;
+
/**
* Default implementation of {@link Topic}.
*
@@ -52,7 +54,7 @@
@Override
public CompletableFuture<Void> publish(T message) {
- return atomixValue.set(message);
+ return adaptFuture(atomixValue.set(message));
}
@Override
@@ -60,7 +62,7 @@
AtomicValueEventListener<T> valueListener =
event -> executor.execute(() -> callback.accept(event.newValue()));
if (callbacks.putIfAbsent(callback, valueListener) == null) {
- return atomixValue.addListener(valueListener);
+ return adaptFuture(atomixValue.addListener(valueListener));
}
return CompletableFuture.completedFuture(null);
}
@@ -69,7 +71,7 @@
public CompletableFuture<Void> unsubscribe(Consumer<T> callback) {
AtomicValueEventListener<T> valueListener = callbacks.remove(callback);
if (valueListener != null) {
- return atomixValue.removeListener(valueListener);
+ return adaptFuture(atomixValue.removeListener(valueListener));
}
return CompletableFuture.completedFuture(null);
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixDocumentTree.java b/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixDocumentTree.java
index 1989607..206da04 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixDocumentTree.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixDocumentTree.java
@@ -20,7 +20,6 @@
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
-import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.onosproject.store.primitives.NodeUpdate;
@@ -29,12 +28,12 @@
import org.onosproject.store.service.DocumentPath;
import org.onosproject.store.service.DocumentTreeEvent;
import org.onosproject.store.service.DocumentTreeListener;
-import org.onosproject.store.service.IllegalDocumentModificationException;
-import org.onosproject.store.service.NoSuchDocumentPathException;
import org.onosproject.store.service.TransactionLog;
import org.onosproject.store.service.Version;
import org.onosproject.store.service.Versioned;
+import static org.onosproject.store.atomix.primitives.impl.AtomixFutures.adaptTreeFuture;
+
/**
* Atomix document tree.
*/
@@ -59,7 +58,7 @@
@Override
public CompletableFuture<Map<String, Versioned<V>>> getChildren(DocumentPath path) {
- return atomixTree.getChildren(toAtomixPath(path))
+ return adaptTreeFuture(atomixTree.getChildren(toAtomixPath(path)))
.thenApply(map -> map.entrySet().stream()
.collect(Collectors.toMap(e -> e.getKey(),
e -> toVersioned(e.getValue()))));
@@ -67,37 +66,37 @@
@Override
public CompletableFuture<Versioned<V>> get(DocumentPath path) {
- return atomixTree.get(toAtomixPath(path)).thenApply(this::toVersioned);
+ return adaptTreeFuture(atomixTree.get(toAtomixPath(path))).thenApply(this::toVersioned);
}
@Override
public CompletableFuture<Versioned<V>> set(DocumentPath path, V value) {
- return atomixTree.set(toAtomixPath(path), value).thenApply(this::toVersioned);
+ return adaptTreeFuture(atomixTree.set(toAtomixPath(path), value)).thenApply(this::toVersioned);
}
@Override
public CompletableFuture<Boolean> create(DocumentPath path, V value) {
- return convertException(atomixTree.create(toAtomixPath(path), value));
+ return adaptTreeFuture(atomixTree.create(toAtomixPath(path), value));
}
@Override
public CompletableFuture<Boolean> createRecursive(DocumentPath path, V value) {
- return atomixTree.createRecursive(toAtomixPath(path), value);
+ return adaptTreeFuture(atomixTree.createRecursive(toAtomixPath(path), value));
}
@Override
public CompletableFuture<Boolean> replace(DocumentPath path, V newValue, long version) {
- return atomixTree.replace(toAtomixPath(path), newValue, version);
+ return adaptTreeFuture(atomixTree.replace(toAtomixPath(path), newValue, version));
}
@Override
public CompletableFuture<Boolean> replace(DocumentPath path, V newValue, V currentValue) {
- return atomixTree.replace(toAtomixPath(path), newValue, currentValue);
+ return adaptTreeFuture(atomixTree.replace(toAtomixPath(path), newValue, currentValue));
}
@Override
public CompletableFuture<Versioned<V>> removeNode(DocumentPath path) {
- return atomixTree.remove(toAtomixPath(path)).thenApply(this::toVersioned);
+ return adaptTreeFuture(atomixTree.remove(toAtomixPath(path))).thenApply(this::toVersioned);
}
@Override
@@ -109,14 +108,14 @@
event.newValue().map(this::toVersioned),
event.oldValue().map(this::toVersioned)));
listenerMap.put(listener, atomixListener);
- return atomixTree.addListener(toAtomixPath(path), atomixListener);
+ return adaptTreeFuture(atomixTree.addListener(toAtomixPath(path), atomixListener));
}
@Override
public CompletableFuture<Void> removeListener(DocumentTreeListener<V> listener) {
io.atomix.core.tree.DocumentTreeEventListener<V> atomixListener = listenerMap.remove(listener);
if (atomixListener != null) {
- return atomixTree.removeListener(atomixListener);
+ return adaptTreeFuture(atomixTree.removeListener(atomixListener));
}
return CompletableFuture.completedFuture(null);
}
@@ -146,25 +145,6 @@
throw new UnsupportedOperationException();
}
- private <T> CompletableFuture<T> convertException(CompletableFuture<T> future) {
- CompletableFuture<T> newFuture = new CompletableFuture<>();
- future.whenComplete((result, error) -> {
- if (error == null) {
- newFuture.complete(result);
- } else {
- Throwable cause = Throwables.getRootCause(error);
- if (cause instanceof io.atomix.core.tree.NoSuchDocumentPathException) {
- newFuture.completeExceptionally(new NoSuchDocumentPathException());
- } else if (cause instanceof io.atomix.core.tree.IllegalDocumentModificationException) {
- newFuture.completeExceptionally(new IllegalDocumentModificationException());
- } else {
- newFuture.completeExceptionally(cause);
- }
- }
- });
- return newFuture;
- }
-
private DocumentPath toOnosPath(io.atomix.core.tree.DocumentPath path) {
List<String> pathElements = Lists.newArrayList(path.pathElements());
pathElements.set(0, DocumentPath.ROOT.pathElements().get(0));
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixFutures.java b/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixFutures.java
new file mode 100644
index 0000000..d41da6a
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixFutures.java
@@ -0,0 +1,134 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.atomix.primitives.impl;
+
+import java.util.concurrent.CompletableFuture;
+
+import com.google.common.base.Throwables;
+import org.onosproject.store.service.ConsistentMapException;
+import org.onosproject.store.service.IllegalDocumentModificationException;
+import org.onosproject.store.service.NoSuchDocumentPathException;
+import org.onosproject.store.service.StorageException;
+
+/**
+ * Utility class for adapting Atomix futures.
+ */
+final class AtomixFutures {
+
+ /**
+ * Adapts the given Atomix future to ONOS.
+ *
+ * @param future the future to adapt
+ * @param <T> the future result type
+ * @return the adapted future
+ */
+ static <T> CompletableFuture<T> adaptFuture(CompletableFuture<T> future) {
+ CompletableFuture<T> newFuture = new CompletableFuture<>();
+ future.whenComplete((result, error) -> {
+ if (error == null) {
+ newFuture.complete(result);
+ } else {
+ Throwable cause = Throwables.getRootCause(error);
+ if (cause instanceof io.atomix.primitive.PrimitiveException.ConcurrentModification) {
+ newFuture.completeExceptionally(new StorageException.ConcurrentModification());
+ } else if (cause instanceof io.atomix.primitive.PrimitiveException.Timeout) {
+ newFuture.completeExceptionally(new StorageException.Timeout());
+ } else if (cause instanceof io.atomix.primitive.PrimitiveException.Interrupted) {
+ newFuture.completeExceptionally(new StorageException.Interrupted());
+ } else if (cause instanceof io.atomix.primitive.PrimitiveException.Unavailable) {
+ newFuture.completeExceptionally(new StorageException.Unavailable());
+ } else if (cause instanceof io.atomix.primitive.PrimitiveException) {
+ newFuture.completeExceptionally(new StorageException(cause.getMessage()));
+ } else {
+ newFuture.completeExceptionally(cause);
+ }
+ }
+ });
+ return newFuture;
+ }
+
+ /**
+ * Adapts the given Atomix future to map exceptions.
+ *
+ * @param future the future to adapt
+ * @param <T> the future result type
+ * @return the adapted future
+ */
+ static <T> CompletableFuture<T> adaptMapFuture(CompletableFuture<T> future) {
+ CompletableFuture<T> newFuture = new CompletableFuture<>();
+ future.whenComplete((result, error) -> {
+ if (error == null) {
+ newFuture.complete(result);
+ } else {
+ Throwable cause = Throwables.getRootCause(error);
+ if (cause instanceof io.atomix.primitive.PrimitiveException.ConcurrentModification) {
+ newFuture.completeExceptionally(
+ new ConsistentMapException.ConcurrentModification(cause.getMessage()));
+ } else if (cause instanceof io.atomix.primitive.PrimitiveException.Timeout) {
+ newFuture.completeExceptionally(new ConsistentMapException.Timeout(cause.getMessage()));
+ } else if (cause instanceof io.atomix.primitive.PrimitiveException.Interrupted) {
+ newFuture.completeExceptionally(new ConsistentMapException.Interrupted());
+ } else if (cause instanceof io.atomix.primitive.PrimitiveException.Unavailable) {
+ newFuture.completeExceptionally(new ConsistentMapException.Unavailable());
+ } else if (cause instanceof io.atomix.primitive.PrimitiveException) {
+ newFuture.completeExceptionally(new ConsistentMapException(cause.getMessage()));
+ } else {
+ newFuture.completeExceptionally(cause);
+ }
+ }
+ });
+ return newFuture;
+ }
+
+ /**
+ * Adapts the given Atomix future to document tree exceptions.
+ *
+ * @param future the future to adapt
+ * @param <T> the future result type
+ * @return the adapted future
+ */
+ static <T> CompletableFuture<T> adaptTreeFuture(CompletableFuture<T> future) {
+ CompletableFuture<T> newFuture = new CompletableFuture<>();
+ future.whenComplete((result, error) -> {
+ if (error == null) {
+ newFuture.complete(result);
+ } else {
+ Throwable cause = Throwables.getRootCause(error);
+ if (cause instanceof io.atomix.core.tree.NoSuchDocumentPathException) {
+ newFuture.completeExceptionally(new NoSuchDocumentPathException());
+ } else if (cause instanceof io.atomix.core.tree.IllegalDocumentModificationException) {
+ newFuture.completeExceptionally(new IllegalDocumentModificationException());
+ } else if (cause instanceof io.atomix.primitive.PrimitiveException.ConcurrentModification) {
+ newFuture.completeExceptionally(new StorageException.ConcurrentModification());
+ } else if (cause instanceof io.atomix.primitive.PrimitiveException.Timeout) {
+ newFuture.completeExceptionally(new StorageException.Timeout());
+ } else if (cause instanceof io.atomix.primitive.PrimitiveException.Interrupted) {
+ newFuture.completeExceptionally(new StorageException.Interrupted());
+ } else if (cause instanceof io.atomix.primitive.PrimitiveException.Unavailable) {
+ newFuture.completeExceptionally(new StorageException.Unavailable());
+ } else if (cause instanceof io.atomix.primitive.PrimitiveException) {
+ newFuture.completeExceptionally(new StorageException(cause.getMessage()));
+ } else {
+ newFuture.completeExceptionally(cause);
+ }
+ }
+ });
+ return newFuture;
+ }
+
+ private AtomixFutures() {
+ }
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixLeaderElector.java b/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixLeaderElector.java
index a421465..e92eafb 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixLeaderElector.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixLeaderElector.java
@@ -28,6 +28,8 @@
import org.onosproject.event.Change;
import org.onosproject.store.service.AsyncLeaderElector;
+import static org.onosproject.store.atomix.primitives.impl.AtomixFutures.adaptFuture;
+
/**
* Atomix leader elector.
*/
@@ -49,37 +51,37 @@
@Override
public CompletableFuture<Leadership> run(String topic, NodeId nodeId) {
- return atomixElector.run(topic, nodeId).thenApply(leadership -> toLeadership(topic, leadership));
+ return adaptFuture(atomixElector.run(topic, nodeId)).thenApply(leadership -> toLeadership(topic, leadership));
}
@Override
public CompletableFuture<Void> withdraw(String topic) {
- return atomixElector.withdraw(topic, localNodeId);
+ return adaptFuture(atomixElector.withdraw(topic, localNodeId));
}
@Override
public CompletableFuture<Boolean> anoint(String topic, NodeId nodeId) {
- return atomixElector.anoint(topic, nodeId);
+ return adaptFuture(atomixElector.anoint(topic, nodeId));
}
@Override
public CompletableFuture<Void> evict(NodeId nodeId) {
- return atomixElector.evict(nodeId);
+ return adaptFuture(atomixElector.evict(nodeId));
}
@Override
public CompletableFuture<Boolean> promote(String topic, NodeId nodeId) {
- return atomixElector.promote(topic, nodeId);
+ return adaptFuture(atomixElector.promote(topic, nodeId));
}
@Override
public CompletableFuture<Leadership> getLeadership(String topic) {
- return atomixElector.getLeadership(topic).thenApply(leadership -> toLeadership(topic, leadership));
+ return adaptFuture(atomixElector.getLeadership(topic)).thenApply(leadership -> toLeadership(topic, leadership));
}
@Override
public CompletableFuture<Map<String, Leadership>> getLeaderships() {
- return atomixElector.getLeaderships()
+ return adaptFuture(atomixElector.getLeaderships())
.thenApply(leaderships -> leaderships.entrySet().stream()
.collect(Collectors.toMap(e -> e.getKey(), e -> toLeadership(e.getKey(), e.getValue()))));
}
@@ -91,14 +93,14 @@
toLeadership(event.topic(), event.oldLeadership()),
toLeadership(event.topic(), event.newLeadership())));
listenerMap.put(consumer, atomixListener);
- return atomixElector.addListener(atomixListener);
+ return adaptFuture(atomixElector.addListener(atomixListener));
}
@Override
public CompletableFuture<Void> removeChangeListener(Consumer<Change<Leadership>> consumer) {
LeadershipEventListener<NodeId> atomixListener = listenerMap.remove(consumer);
if (atomixListener != null) {
- return atomixElector.removeListener(atomixListener);
+ return adaptFuture(atomixElector.removeListener(atomixListener));
}
return CompletableFuture.completedFuture(null);
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixWorkQueue.java b/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixWorkQueue.java
index 8382851..9125b4d 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixWorkQueue.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/atomix/primitives/impl/AtomixWorkQueue.java
@@ -25,6 +25,8 @@
import org.onosproject.store.service.WorkQueue;
import org.onosproject.store.service.WorkQueueStats;
+import static org.onosproject.store.atomix.primitives.impl.AtomixFutures.adaptFuture;
+
/**
* Atomix work queue.
*/
@@ -42,12 +44,12 @@
@Override
public CompletableFuture<Void> addMultiple(Collection<E> items) {
- return atomixWorkQueue.addMultiple(items);
+ return adaptFuture(atomixWorkQueue.addMultiple(items));
}
@Override
public CompletableFuture<Collection<Task<E>>> take(int maxItems) {
- return atomixWorkQueue.take(maxItems)
+ return adaptFuture(atomixWorkQueue.take(maxItems))
.thenApply(tasks -> tasks.stream()
.map(task -> new Task<>(task.taskId(), task.payload()))
.collect(Collectors.toList()));
@@ -55,23 +57,23 @@
@Override
public CompletableFuture<Void> complete(Collection<String> taskIds) {
- return atomixWorkQueue.complete(taskIds);
+ return adaptFuture(atomixWorkQueue.complete(taskIds));
}
@Override
public CompletableFuture<Void> registerTaskProcessor(
Consumer<E> taskProcessor, int parallelism, Executor executor) {
- return atomixWorkQueue.registerTaskProcessor(taskProcessor, parallelism, executor);
+ return adaptFuture(atomixWorkQueue.registerTaskProcessor(taskProcessor, parallelism, executor));
}
@Override
public CompletableFuture<Void> stopProcessing() {
- return atomixWorkQueue.stopProcessing();
+ return adaptFuture(atomixWorkQueue.stopProcessing());
}
@Override
public CompletableFuture<WorkQueueStats> stats() {
- return atomixWorkQueue.stats()
+ return adaptFuture(atomixWorkQueue.stats())
.thenApply(stats -> WorkQueueStats.builder()
.withTotalCompleted(stats.totalCompleted())
.withTotalInProgress(stats.totalInProgress())