[ONOS-6267] Detect and complete blocked futures on I/O threads.
Change-Id: I0488dc5096f9e610b97405ad05c02d0ff3854b5f
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingDistributedPrimitive.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingDistributedPrimitive.java
index 021dbe5..836a682 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingDistributedPrimitive.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingDistributedPrimitive.java
@@ -32,24 +32,38 @@
public abstract class ExecutingDistributedPrimitive
extends DelegatingDistributedPrimitive {
private final DistributedPrimitive primitive;
- private final Executor executor;
+ private final Executor orderedExecutor;
+ private final Executor threadPoolExecutor;
private final Map<Consumer<Status>, Consumer<Status>> listenerMap = Maps.newConcurrentMap();
- protected ExecutingDistributedPrimitive(DistributedPrimitive primitive, Executor executor) {
+ protected ExecutingDistributedPrimitive(
+ DistributedPrimitive primitive, Executor orderedExecutor, Executor threadPoolExecutor) {
super(primitive);
this.primitive = primitive;
- this.executor = checkNotNull(executor);
+ this.orderedExecutor = checkNotNull(orderedExecutor);
+ this.threadPoolExecutor = checkNotNull(threadPoolExecutor);
+ }
+
+ /**
+ * Creates a future to be completed asynchronously on the provided ordered and thread pool executors.
+ *
+ * @param future the future to be completed asynchronously
+ * @param <T> future result type
+ * @return a new {@link CompletableFuture} to be completed asynchronously using the primitive thread model
+ */
+ protected <T> CompletableFuture<T> asyncFuture(CompletableFuture<T> future) {
+ return Tools.orderedFuture(future, orderedExecutor, threadPoolExecutor);
}
@Override
public CompletableFuture<Void> destroy() {
- return Tools.asyncFuture(primitive.destroy(), executor);
+ return asyncFuture(primitive.destroy());
}
@Override
public void addStatusChangeListener(Consumer<DistributedPrimitive.Status> listener) {
Consumer<DistributedPrimitive.Status> wrappedListener =
- status -> executor.execute(() -> listener.accept(status));
+ status -> orderedExecutor.execute(() -> listener.accept(status));
listenerMap.put(listener, wrappedListener);
primitive.addStatusChangeListener(wrappedListener);
}