[ONOS-6267] Detect and complete blocked futures on I/O threads.

Change-Id: I0488dc5096f9e610b97405ad05c02d0ff3854b5f
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingDistributedPrimitive.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingDistributedPrimitive.java
index 021dbe5..836a682 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingDistributedPrimitive.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingDistributedPrimitive.java
@@ -32,24 +32,38 @@
 public abstract class ExecutingDistributedPrimitive
         extends DelegatingDistributedPrimitive {
     private final DistributedPrimitive primitive;
-    private final Executor executor;
+    private final Executor orderedExecutor;
+    private final Executor threadPoolExecutor;
     private final Map<Consumer<Status>, Consumer<Status>> listenerMap = Maps.newConcurrentMap();
 
-    protected ExecutingDistributedPrimitive(DistributedPrimitive primitive, Executor executor) {
+    protected ExecutingDistributedPrimitive(
+            DistributedPrimitive primitive, Executor orderedExecutor, Executor threadPoolExecutor) {
         super(primitive);
         this.primitive = primitive;
-        this.executor = checkNotNull(executor);
+        this.orderedExecutor = checkNotNull(orderedExecutor);
+        this.threadPoolExecutor = checkNotNull(threadPoolExecutor);
+    }
+
+    /**
+     * Creates a future to be completed asynchronously on the provided ordered and thread pool executors.
+     *
+     * @param future the future to be completed asynchronously
+     * @param <T> future result type
+     * @return a new {@link CompletableFuture} to be completed asynchronously using the primitive thread model
+     */
+    protected <T> CompletableFuture<T> asyncFuture(CompletableFuture<T> future) {
+        return Tools.orderedFuture(future, orderedExecutor, threadPoolExecutor);
     }
 
     @Override
     public CompletableFuture<Void> destroy() {
-        return Tools.asyncFuture(primitive.destroy(), executor);
+        return asyncFuture(primitive.destroy());
     }
 
     @Override
     public void addStatusChangeListener(Consumer<DistributedPrimitive.Status> listener) {
         Consumer<DistributedPrimitive.Status> wrappedListener =
-                status -> executor.execute(() -> listener.accept(status));
+                status -> orderedExecutor.execute(() -> listener.accept(status));
         listenerMap.put(listener, wrappedListener);
         primitive.addStatusChangeListener(wrappedListener);
     }