[ONOS-6267] Detect and complete blocked futures on I/O threads.

Change-Id: I0488dc5096f9e610b97405ad05c02d0ff3854b5f
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncAtomicValue.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncAtomicValue.java
index 40eacc6..c8bba52 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncAtomicValue.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncAtomicValue.java
@@ -20,7 +20,6 @@
 import java.util.concurrent.Executor;
 
 import com.google.common.collect.Maps;
-import org.onlab.util.Tools;
 import org.onosproject.store.service.AsyncAtomicValue;
 import org.onosproject.store.service.AtomicValueEventListener;
 
@@ -30,47 +29,48 @@
  */
 public class ExecutingAsyncAtomicValue<V> extends ExecutingDistributedPrimitive implements AsyncAtomicValue<V> {
     private final AsyncAtomicValue<V> delegateValue;
-    private final Executor executor;
+    private final Executor orderedExecutor;
     private final Map<AtomicValueEventListener<V>, AtomicValueEventListener<V>> listenerMap = Maps.newConcurrentMap();
 
-    public ExecutingAsyncAtomicValue(AsyncAtomicValue<V> delegateValue, Executor executor) {
-        super(delegateValue, executor);
+    public ExecutingAsyncAtomicValue(
+            AsyncAtomicValue<V> delegateValue, Executor orderedExecutor, Executor threadPoolExecutor) {
+        super(delegateValue, orderedExecutor, threadPoolExecutor);
         this.delegateValue = delegateValue;
-        this.executor = executor;
+        this.orderedExecutor = orderedExecutor;
     }
 
     @Override
     public CompletableFuture<Boolean> compareAndSet(V expect, V update) {
-        return Tools.asyncFuture(delegateValue.compareAndSet(expect, update), executor);
+        return asyncFuture(delegateValue.compareAndSet(expect, update));
     }
 
     @Override
     public CompletableFuture<V> get() {
-        return Tools.asyncFuture(delegateValue.get(), executor);
+        return asyncFuture(delegateValue.get());
     }
 
     @Override
     public CompletableFuture<V> getAndSet(V value) {
-        return Tools.asyncFuture(delegateValue.getAndSet(value), executor);
+        return asyncFuture(delegateValue.getAndSet(value));
     }
 
     @Override
     public CompletableFuture<Void> set(V value) {
-        return Tools.asyncFuture(delegateValue.set(value), executor);
+        return asyncFuture(delegateValue.set(value));
     }
 
     @Override
     public CompletableFuture<Void> addListener(AtomicValueEventListener<V> listener) {
-        AtomicValueEventListener<V> wrappedListener = e -> executor.execute(() -> listener.event(e));
+        AtomicValueEventListener<V> wrappedListener = e -> orderedExecutor.execute(() -> listener.event(e));
         listenerMap.put(listener, wrappedListener);
-        return Tools.asyncFuture(delegateValue.addListener(wrappedListener), executor);
+        return asyncFuture(delegateValue.addListener(wrappedListener));
     }
 
     @Override
     public CompletableFuture<Void> removeListener(AtomicValueEventListener<V> listener) {
         AtomicValueEventListener<V> wrappedListener = listenerMap.remove(listener);
         if (wrappedListener != null) {
-            return Tools.asyncFuture(delegateValue.removeListener(wrappedListener), executor);
+            return asyncFuture(delegateValue.removeListener(wrappedListener));
         }
         return CompletableFuture.completedFuture(null);
     }