[ONOS-6267] Detect and complete blocked futures on I/O threads.
Change-Id: I0488dc5096f9e610b97405ad05c02d0ff3854b5f
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncAtomicValue.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncAtomicValue.java
index 40eacc6..c8bba52 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncAtomicValue.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncAtomicValue.java
@@ -20,7 +20,6 @@
import java.util.concurrent.Executor;
import com.google.common.collect.Maps;
-import org.onlab.util.Tools;
import org.onosproject.store.service.AsyncAtomicValue;
import org.onosproject.store.service.AtomicValueEventListener;
@@ -30,47 +29,48 @@
*/
public class ExecutingAsyncAtomicValue<V> extends ExecutingDistributedPrimitive implements AsyncAtomicValue<V> {
private final AsyncAtomicValue<V> delegateValue;
- private final Executor executor;
+ private final Executor orderedExecutor;
private final Map<AtomicValueEventListener<V>, AtomicValueEventListener<V>> listenerMap = Maps.newConcurrentMap();
- public ExecutingAsyncAtomicValue(AsyncAtomicValue<V> delegateValue, Executor executor) {
- super(delegateValue, executor);
+ public ExecutingAsyncAtomicValue(
+ AsyncAtomicValue<V> delegateValue, Executor orderedExecutor, Executor threadPoolExecutor) {
+ super(delegateValue, orderedExecutor, threadPoolExecutor);
this.delegateValue = delegateValue;
- this.executor = executor;
+ this.orderedExecutor = orderedExecutor;
}
@Override
public CompletableFuture<Boolean> compareAndSet(V expect, V update) {
- return Tools.asyncFuture(delegateValue.compareAndSet(expect, update), executor);
+ return asyncFuture(delegateValue.compareAndSet(expect, update));
}
@Override
public CompletableFuture<V> get() {
- return Tools.asyncFuture(delegateValue.get(), executor);
+ return asyncFuture(delegateValue.get());
}
@Override
public CompletableFuture<V> getAndSet(V value) {
- return Tools.asyncFuture(delegateValue.getAndSet(value), executor);
+ return asyncFuture(delegateValue.getAndSet(value));
}
@Override
public CompletableFuture<Void> set(V value) {
- return Tools.asyncFuture(delegateValue.set(value), executor);
+ return asyncFuture(delegateValue.set(value));
}
@Override
public CompletableFuture<Void> addListener(AtomicValueEventListener<V> listener) {
- AtomicValueEventListener<V> wrappedListener = e -> executor.execute(() -> listener.event(e));
+ AtomicValueEventListener<V> wrappedListener = e -> orderedExecutor.execute(() -> listener.event(e));
listenerMap.put(listener, wrappedListener);
- return Tools.asyncFuture(delegateValue.addListener(wrappedListener), executor);
+ return asyncFuture(delegateValue.addListener(wrappedListener));
}
@Override
public CompletableFuture<Void> removeListener(AtomicValueEventListener<V> listener) {
AtomicValueEventListener<V> wrappedListener = listenerMap.remove(listener);
if (wrappedListener != null) {
- return Tools.asyncFuture(delegateValue.removeListener(wrappedListener), executor);
+ return asyncFuture(delegateValue.removeListener(wrappedListener));
}
return CompletableFuture.completedFuture(null);
}