[ONOS-6267] Support configurable Executors for primitives
- Support user-provided Executors in primitive builders
- Implement default per-partition per-primitive serial executor using a shared thread pool
- Implement Executor wrappers for all primitive types

Change-Id: I53acfb173a9b49a992a9a388983791d9735ed54a
diff --git a/core/api/src/main/java/org/onosproject/store/primitives/DistributedPrimitiveBuilder.java b/core/api/src/main/java/org/onosproject/store/primitives/DistributedPrimitiveBuilder.java
index a122125..f9939da 100644
--- a/core/api/src/main/java/org/onosproject/store/primitives/DistributedPrimitiveBuilder.java
+++ b/core/api/src/main/java/org/onosproject/store/primitives/DistributedPrimitiveBuilder.java
@@ -15,6 +15,9 @@
  */
 package org.onosproject.store.primitives;
 
+import java.util.concurrent.Executor;
+import java.util.function.Supplier;
+
 import org.onosproject.core.ApplicationId;
 import org.onosproject.store.service.DistributedPrimitive;
 import org.onosproject.store.service.Serializer;
@@ -27,10 +30,11 @@
 public abstract class DistributedPrimitiveBuilder<B extends DistributedPrimitiveBuilder<B, T>,
                                                   T extends DistributedPrimitive> {
 
-    private DistributedPrimitive.Type type;
+    private final DistributedPrimitive.Type type;
     private String name;
     private ApplicationId applicationId;
     private Serializer serializer;
+    private Supplier<Executor> executorSupplier;
     private boolean partitionsDisabled = false;
     private boolean meteringDisabled = false;
     private boolean readOnly = false;
@@ -63,6 +67,31 @@
     }
 
     /**
+     * Sets the executor to use for asynchronous callbacks.
+     * <p>
+     * For partitioned primitives, the provided executor will be shared across all partitions.
+     *
+     * @param executor the executor to use for asynchronous callbacks
+     * @return this builder
+     */
+    public B withExecutor(Executor executor) {
+        return withExecutorSupplier(() -> executor);
+    }
+
+    /**
+     * Sets the supplier to be used to create executors.
+     * <p>
+     * When a factory is set, the supplier will be used to create a separate executor for each partition.
+     *
+     * @param executorSupplier the executor supplier
+     * @return this builder
+     */
+    public B withExecutorSupplier(Supplier<Executor> executorSupplier) {
+        this.executorSupplier = executorSupplier;
+        return (B) this;
+    }
+
+    /**
      * Sets the application id that owns this primitive.
      *
      * @param applicationId application identifier
@@ -148,6 +177,15 @@
     }
 
     /**
+     * Returns the executor supplier.
+     *
+     * @return executor supplier
+     */
+    public final Supplier<Executor> executorSupplier() {
+        return executorSupplier;
+    }
+
+    /**
      * Returns the application identifier.
      *
      * @return application id
diff --git a/core/api/src/main/java/org/onosproject/store/primitives/DistributedPrimitiveCreator.java b/core/api/src/main/java/org/onosproject/store/primitives/DistributedPrimitiveCreator.java
index d034261..6ee43e5 100644
--- a/core/api/src/main/java/org/onosproject/store/primitives/DistributedPrimitiveCreator.java
+++ b/core/api/src/main/java/org/onosproject/store/primitives/DistributedPrimitiveCreator.java
@@ -28,6 +28,8 @@
 import org.onosproject.store.service.WorkQueue;
 
 import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.function.Supplier;
 
 /**
  * Interface for entity that can create instances of different distributed primitives.
@@ -43,7 +45,22 @@
      * @param <V> value type
      * @return map
      */
-    <K, V> AsyncConsistentMap<K, V> newAsyncConsistentMap(String name, Serializer serializer);
+    default <K, V> AsyncConsistentMap<K, V> newAsyncConsistentMap(String name, Serializer serializer) {
+        return newAsyncConsistentMap(name, serializer, null);
+    }
+
+    /**
+     * Creates a new {@code AsyncConsistentMap}.
+     *
+     * @param name map name
+     * @param serializer serializer to use for serializing/deserializing map entries
+     * @param executorSupplier a callback that returns an executor to be used for each partition
+     * @param <K> key type
+     * @param <V> value type
+     * @return map
+     */
+    <K, V> AsyncConsistentMap<K, V> newAsyncConsistentMap(
+            String name, Serializer serializer, Supplier<Executor> executorSupplier);
 
     /**
      * Creates a new {@code AsyncConsistentTreeMap}.
@@ -53,8 +70,22 @@
      * @param <V> value type
      * @return distributedTreeMap
      */
+    default <V> AsyncConsistentTreeMap<V> newAsyncConsistentTreeMap(
+            String name, Serializer serializer) {
+        return newAsyncConsistentTreeMap(name, serializer, null);
+    }
+
+    /**
+     * Creates a new {@code AsyncConsistentTreeMap}.
+     *
+     * @param name tree name
+     * @param serializer serializer to use for serializing/deserializing map entries
+     * @param executorSupplier a callback that returns an executor to be used for each partition
+     * @param <V> value type
+     * @return distributedTreeMap
+     */
     <V> AsyncConsistentTreeMap<V> newAsyncConsistentTreeMap(
-            String name, Serializer serializer);
+            String name, Serializer serializer, Supplier<Executor> executorSupplier);
 
     /**
      * Creates a new set backed {@code AsyncConsistentMultimap}.
@@ -65,8 +96,23 @@
      * @param <V> value type
      * @return set backed distributedMultimap
      */
+    default <K, V> AsyncConsistentMultimap<K, V> newAsyncConsistentSetMultimap(
+            String name, Serializer serializer) {
+        return newAsyncConsistentSetMultimap(name, serializer, null);
+    }
+
+    /**
+     * Creates a new set backed {@code AsyncConsistentMultimap}.
+     *
+     * @param name multimap name
+     * @param serializer serializer to use for serializing/deserializing
+     * @param executorSupplier a callback that returns an executor to be used for each partition
+     * @param <K> key type
+     * @param <V> value type
+     * @return set backed distributedMultimap
+     */
     <K, V> AsyncConsistentMultimap<K, V> newAsyncConsistentSetMultimap(
-            String name, Serializer serializer);
+            String name, Serializer serializer, Supplier<Executor> executorSupplier);
 
     /**
      * Creates a new {@code AsyncAtomicCounterMap}.
@@ -76,8 +122,22 @@
      * @param <K> key type
      * @return atomic counter map
      */
+    default <K> AsyncAtomicCounterMap<K> newAsyncAtomicCounterMap(
+            String name, Serializer serializer) {
+        return newAsyncAtomicCounterMap(name, serializer, null);
+    }
+
+    /**
+     * Creates a new {@code AsyncAtomicCounterMap}.
+     *
+     * @param name counter map name
+     * @param serializer serializer to use for serializing/deserializing keys
+     * @param executorSupplier a callback that returns an executor to be used for each partition
+     * @param <K> key type
+     * @return atomic counter map
+     */
     <K> AsyncAtomicCounterMap<K> newAsyncAtomicCounterMap(
-        String name, Serializer serializer);
+            String name, Serializer serializer, Supplier<Executor> executorSupplier);
 
     /**
      * Creates a new {@code AsyncAtomicCounter}.
@@ -85,7 +145,18 @@
      * @param name counter name
      * @return counter
      */
-    AsyncAtomicCounter newAsyncCounter(String name);
+    default AsyncAtomicCounter newAsyncCounter(String name) {
+        return newAsyncCounter(name, null);
+    }
+
+    /**
+     * Creates a new {@code AsyncAtomicCounter}.
+     *
+     * @param name counter name
+     * @param executorSupplier a callback that returns an executor to be used asynchronous callbacks
+     * @return counter
+     */
+    AsyncAtomicCounter newAsyncCounter(String name, Supplier<Executor> executorSupplier);
 
     /**
      * Creates a new {@code AsyncAtomicValue}.
@@ -95,7 +166,21 @@
      * @param <V> value type
      * @return value
      */
-    <V> AsyncAtomicValue<V> newAsyncAtomicValue(String name, Serializer serializer);
+    default <V> AsyncAtomicValue<V> newAsyncAtomicValue(String name, Serializer serializer) {
+        return newAsyncAtomicValue(name, serializer, null);
+    }
+
+    /**
+     * Creates a new {@code AsyncAtomicValue}.
+     *
+     * @param name value name
+     * @param serializer serializer to use for serializing/deserializing value type
+     * @param executorSupplier a callback that returns an executor to be used asynchronous callbacks
+     * @param <V> value type
+     * @return value
+     */
+    <V> AsyncAtomicValue<V> newAsyncAtomicValue(
+            String name, Serializer serializer, Supplier<Executor> executorSupplier);
 
     /**
      * Creates a new {@code AsyncDistributedSet}.
@@ -105,7 +190,21 @@
      * @param <E> set entry type
      * @return set
      */
-    <E> AsyncDistributedSet<E> newAsyncDistributedSet(String name, Serializer serializer);
+    default <E> AsyncDistributedSet<E> newAsyncDistributedSet(String name, Serializer serializer) {
+        return newAsyncDistributedSet(name, serializer, null);
+    }
+
+    /**
+     * Creates a new {@code AsyncDistributedSet}.
+     *
+     * @param name set name
+     * @param serializer serializer to use for serializing/deserializing set entries
+     * @param executorSupplier a callback that returns an executor to be used asynchronous callbacks
+     * @param <E> set entry type
+     * @return set
+     */
+    <E> AsyncDistributedSet<E> newAsyncDistributedSet(
+            String name, Serializer serializer, Supplier<Executor> executorSupplier);
 
     /**
      * Creates a new {@code AsyncLeaderElector}.
@@ -113,7 +212,18 @@
      * @param name leader elector name
      * @return leader elector
      */
-    AsyncLeaderElector newAsyncLeaderElector(String name);
+    default AsyncLeaderElector newAsyncLeaderElector(String name) {
+        return newAsyncLeaderElector(name, null);
+    }
+
+    /**
+     * Creates a new {@code AsyncLeaderElector}.
+     *
+     * @param name leader elector name
+     * @param executorSupplier a callback that returns an executor to be used asynchronous callbacks
+     * @return leader elector
+     */
+    AsyncLeaderElector newAsyncLeaderElector(String name, Supplier<Executor> executorSupplier);
 
     /**
      * Creates a new {@code WorkQueue}.
@@ -123,7 +233,20 @@
      * @param serializer serializer
      * @return work queue
      */
-    <E> WorkQueue<E> newWorkQueue(String name, Serializer serializer);
+    default <E> WorkQueue<E> newWorkQueue(String name, Serializer serializer) {
+        return newWorkQueue(name, serializer, null);
+    }
+
+    /**
+     * Creates a new {@code WorkQueue}.
+     *
+     * @param <E> work element type
+     * @param name work queue name
+     * @param serializer serializer
+     * @param executorSupplier a callback that returns an executor to be used asynchronous callbacks
+     * @return work queue
+     */
+    <E> WorkQueue<E> newWorkQueue(String name, Serializer serializer, Supplier<Executor> executorSupplier);
 
     /**
      * Creates a new {@code AsyncDocumentTree}.
@@ -133,7 +256,21 @@
      * @param serializer serializer
      * @return document tree
      */
-    <V> AsyncDocumentTree<V> newAsyncDocumentTree(String name, Serializer serializer);
+    default <V> AsyncDocumentTree<V> newAsyncDocumentTree(String name, Serializer serializer) {
+        return newAsyncDocumentTree(name, serializer, null);
+    }
+
+    /**
+     * Creates a new {@code AsyncDocumentTree}.
+     *
+     * @param <V> document tree node value type
+     * @param name tree name
+     * @param serializer serializer
+     * @param executorSupplier a callback that returns an executor to be used asynchronous callbacks
+     * @return document tree
+     */
+    <V> AsyncDocumentTree<V> newAsyncDocumentTree(
+            String name, Serializer serializer, Supplier<Executor> executorSupplier);
 
     /**
      * Returns the names of all created {@code AsyncConsistentMap} instances.
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAtomicCounterBuilder.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAtomicCounterBuilder.java
index 45cf193..a189eb2 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAtomicCounterBuilder.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAtomicCounterBuilder.java
@@ -32,6 +32,6 @@
 
     @Override
     public AsyncAtomicCounter build() {
-        return primitiveCreator.newAsyncCounter(name());
+        return primitiveCreator.newAsyncCounter(name(), executorSupplier());
     }
 }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAtomicCounterMapBuilder.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAtomicCounterMapBuilder.java
index 309220a..c92ef22 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAtomicCounterMapBuilder.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAtomicCounterMapBuilder.java
@@ -33,7 +33,7 @@
 
     @Override
     public AsyncAtomicCounterMap<K> buildAsyncMap() {
-        return primitiveCreator.newAsyncAtomicCounterMap(name(), serializer());
+        return primitiveCreator.newAsyncAtomicCounterMap(name(), serializer(), executorSupplier());
     }
 
     @Override
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAtomicValueBuilder.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAtomicValueBuilder.java
index b17983c..a63fe4be 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAtomicValueBuilder.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAtomicValueBuilder.java
@@ -15,6 +15,7 @@
  */
 package org.onosproject.store.primitives.impl;
 
+import java.util.concurrent.Executor;
 import java.util.function.Supplier;
 
 import org.onosproject.store.service.AsyncAtomicValue;
@@ -37,6 +38,12 @@
     }
 
     @Override
+    public AtomicValueBuilder<V> withExecutorSupplier(Supplier<Executor> executorSupplier) {
+        mapBuilder.withExecutorSupplier(executorSupplier);
+        return this;
+    }
+
+    @Override
     public AsyncAtomicValue<V> build() {
         return new DefaultAsyncAtomicValue<>(checkNotNull(name()),
                                              checkNotNull(serializer()),
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultConsistentMapBuilder.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultConsistentMapBuilder.java
index b5284de..820174d 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultConsistentMapBuilder.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultConsistentMapBuilder.java
@@ -41,7 +41,7 @@
 
     @Override
     public AsyncConsistentMap<K, V> buildAsyncMap() {
-        AsyncConsistentMap<K, V> map = primitiveCreator.newAsyncConsistentMap(name(), serializer());
+        AsyncConsistentMap<K, V> map = primitiveCreator.newAsyncConsistentMap(name(), serializer(), executorSupplier());
         map = relaxedReadConsistency() ? DistributedPrimitives.newCachingMap(map) : map;
         map = readOnly() ? DistributedPrimitives.newUnmodifiableMap(map) : map;
         return meteringEnabled() ? DistributedPrimitives.newMeteredMap(map) : map;
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultConsistentMultimapBuilder.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultConsistentMultimapBuilder.java
index 976862b..ba7d673 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultConsistentMultimapBuilder.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultConsistentMultimapBuilder.java
@@ -36,8 +36,7 @@
 
     @Override
     public AsyncConsistentMultimap<K, V> buildMultimap() {
-        return primitiveCreator.newAsyncConsistentSetMultimap(
-                name(), serializer());
+        return primitiveCreator.newAsyncConsistentSetMultimap(name(), serializer(), executorSupplier());
     }
 
     @Override
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultConsistentTreeMapBuilder.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultConsistentTreeMapBuilder.java
index 65a364e..5e2a8b4 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultConsistentTreeMapBuilder.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultConsistentTreeMapBuilder.java
@@ -35,8 +35,7 @@
 
     @Override
     public AsyncConsistentTreeMap<V> buildTreeMap() {
-        return primitiveCreator.newAsyncConsistentTreeMap(name(),
-                                                          serializer());
+        return primitiveCreator.newAsyncConsistentTreeMap(name(), serializer(), executorSupplier());
     }
 
     @Override
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedSetBuilder.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedSetBuilder.java
index c17f91d..5e95180 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedSetBuilder.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedSetBuilder.java
@@ -15,6 +15,7 @@
  */
 package org.onosproject.store.primitives.impl;
 
+import java.util.concurrent.Executor;
 import java.util.function.Supplier;
 
 import org.onosproject.core.ApplicationId;
@@ -53,6 +54,12 @@
     }
 
     @Override
+    public DistributedSetBuilder<E> withExecutorSupplier(Supplier<Executor> executorSupplier) {
+        mapBuilder.withExecutorSupplier(executorSupplier);
+        return this;
+    }
+
+    @Override
     public DistributedSetBuilder<E> withPurgeOnUninstall() {
         mapBuilder.withPurgeOnUninstall();
         return this;
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDocumentTreeBuilder.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDocumentTreeBuilder.java
index 8d21e60..65c0504 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDocumentTreeBuilder.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDocumentTreeBuilder.java
@@ -47,6 +47,6 @@
     @Deprecated
     @Override
     public AsyncDocumentTree<V> build() {
-        return primitiveCreator.newAsyncDocumentTree(name(), serializer());
+        return primitiveCreator.newAsyncDocumentTree(name(), serializer(), executorSupplier());
     }
 }
\ No newline at end of file
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultLeaderElectorBuilder.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultLeaderElectorBuilder.java
index 69788f9..6f8f55d 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultLeaderElectorBuilder.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultLeaderElectorBuilder.java
@@ -32,6 +32,6 @@
 
     @Override
     public AsyncLeaderElector build() {
-        return primitiveCreator.newAsyncLeaderElector(name());
+        return primitiveCreator.newAsyncLeaderElector(name(), executorSupplier());
     }
 }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMap.java
index c0b323a..15279e0 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMap.java
@@ -16,8 +16,6 @@
 
 package org.onosproject.store.primitives.impl;
 
-import static com.google.common.base.Preconditions.checkNotNull;
-
 import java.util.Collection;
 import java.util.Map.Entry;
 import java.util.Objects;
@@ -28,7 +26,7 @@
 import java.util.function.Consumer;
 import java.util.function.Predicate;
 
-import org.onosproject.core.ApplicationId;
+import com.google.common.base.MoreObjects;
 import org.onosproject.store.primitives.MapUpdate;
 import org.onosproject.store.primitives.TransactionId;
 import org.onosproject.store.service.AsyncConsistentMap;
@@ -37,8 +35,6 @@
 import org.onosproject.store.service.Version;
 import org.onosproject.store.service.Versioned;
 
-import com.google.common.base.MoreObjects;
-
 /**
  * {@code AsyncConsistentMap} that merely delegates control to
  * another AsyncConsistentMap.
@@ -46,22 +42,14 @@
  * @param <K> key type
  * @param <V> value type
  */
-public class DelegatingAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V> {
+public class DelegatingAsyncConsistentMap<K, V>
+        extends DelegatingDistributedPrimitive implements AsyncConsistentMap<K, V> {
 
     private final AsyncConsistentMap<K, V> delegateMap;
 
     DelegatingAsyncConsistentMap(AsyncConsistentMap<K, V> delegateMap) {
-        this.delegateMap = checkNotNull(delegateMap, "delegate map cannot be null");
-    }
-
-    @Override
-    public String name() {
-        return delegateMap.name();
-    }
-
-    @Override
-    public ApplicationId applicationId() {
-        return delegateMap.applicationId();
+        super(delegateMap);
+        this.delegateMap = delegateMap;
     }
 
     @Override
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMultimap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMultimap.java
index a425c32..5bd37c5 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMultimap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMultimap.java
@@ -16,19 +16,15 @@
 
 package org.onosproject.store.primitives.impl;
 
-import com.google.common.base.MoreObjects;
-import com.google.common.base.Preconditions;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
 import com.google.common.collect.Multiset;
 import org.onosproject.store.service.AsyncConsistentMultimap;
 import org.onosproject.store.service.Versioned;
 
-import java.util.Collection;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.function.Consumer;
-
 /**
  * {@code AsyncConsistentMultimap} that merely delegates control to
  * another AsyncConsistentMultimap.
@@ -37,18 +33,14 @@
  * @param <V> value type
  */
 public class DelegatingAsyncConsistentMultimap<K, V>
-        implements AsyncConsistentMultimap<K, V> {
+        extends DelegatingDistributedPrimitive implements AsyncConsistentMultimap<K, V> {
 
     private final AsyncConsistentMultimap<K, V> delegateMap;
 
     public DelegatingAsyncConsistentMultimap(
             AsyncConsistentMultimap<K, V> delegateMap) {
-        this.delegateMap = Preconditions.checkNotNull(delegateMap);
-    }
-
-    @Override
-    public String name() {
-        return delegateMap.name();
+        super(delegateMap);
+        this.delegateMap = delegateMap;
     }
 
     @Override
@@ -144,41 +136,4 @@
     public CompletableFuture<Map<K, Collection<V>>> asMap() {
         return delegateMap.asMap();
     }
-
-    @Override
-    public void addStatusChangeListener(Consumer<Status> listener) {
-        delegateMap.addStatusChangeListener(listener);
-    }
-
-    @Override
-    public void removeStatusChangeListener(Consumer<Status> listener) {
-        delegateMap.removeStatusChangeListener(listener);
-    }
-
-    @Override
-    public Collection<Consumer<Status>> statusChangeListeners() {
-        return delegateMap.statusChangeListeners();
-    }
-
-    @Override
-    public String toString() {
-        return MoreObjects.toStringHelper(getClass())
-                .add("delegateMap", delegateMap)
-                .toString();
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(delegateMap);
-    }
-
-    @Override
-    public boolean equals(Object other) {
-        if (other instanceof DelegatingAsyncConsistentMultimap) {
-            DelegatingAsyncConsistentMultimap<K, V> that =
-                    (DelegatingAsyncConsistentMultimap) other;
-            return this.delegateMap.equals(that.delegateMap);
-        }
-        return false;
-    }
 }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentTreeMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentTreeMap.java
index f474f7b..972fd72 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentTreeMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentTreeMap.java
@@ -16,14 +16,6 @@
 
 package org.onosproject.store.primitives.impl;
 
-import org.onosproject.store.primitives.MapUpdate;
-import org.onosproject.store.primitives.TransactionId;
-import org.onosproject.store.service.AsyncConsistentTreeMap;
-import org.onosproject.store.service.MapEventListener;
-import org.onosproject.store.service.TransactionLog;
-import org.onosproject.store.service.Version;
-import org.onosproject.store.service.Versioned;
-
 import java.util.Collection;
 import java.util.Map;
 import java.util.NavigableMap;
@@ -35,6 +27,14 @@
 import java.util.function.BiFunction;
 import java.util.function.Predicate;
 
+import org.onosproject.store.primitives.MapUpdate;
+import org.onosproject.store.primitives.TransactionId;
+import org.onosproject.store.service.AsyncConsistentTreeMap;
+import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.TransactionLog;
+import org.onosproject.store.service.Version;
+import org.onosproject.store.service.Versioned;
+
 import static com.google.common.base.Preconditions.checkNotNull;
 
 /**
@@ -42,11 +42,12 @@
  * of {@link AsyncConsistentTreeMap}.
  */
 public class DelegatingAsyncConsistentTreeMap<V>
-        implements AsyncConsistentTreeMap<V> {
+        extends DelegatingDistributedPrimitive implements AsyncConsistentTreeMap<V> {
 
     private final AsyncConsistentTreeMap<V> delegateMap;
 
     DelegatingAsyncConsistentTreeMap(AsyncConsistentTreeMap<V> delegateMap) {
+        super(delegateMap);
         this.delegateMap = checkNotNull(delegateMap,
                                         "delegate map cannot be null");
     }
@@ -137,11 +138,6 @@
     }
 
     @Override
-    public String name() {
-        return delegateMap.name();
-    }
-
-    @Override
     public CompletableFuture<Integer> size() {
         return delegateMap.size();
     }
@@ -293,5 +289,4 @@
     public int hashCode() {
         return Objects.hash(delegateMap);
     }
-
 }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingDistributedPrimitive.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingDistributedPrimitive.java
new file mode 100644
index 0000000..10659d1
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingDistributedPrimitive.java
@@ -0,0 +1,91 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import java.util.Collection;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+
+import com.google.common.base.MoreObjects;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.store.service.DistributedPrimitive;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Base class for primitive delegates.
+ */
+public abstract class DelegatingDistributedPrimitive implements DistributedPrimitive {
+    private final DistributedPrimitive primitive;
+
+    public DelegatingDistributedPrimitive(DistributedPrimitive primitive) {
+        this.primitive = checkNotNull(primitive);
+    }
+
+    @Override
+    public String name() {
+        return primitive.name();
+    }
+
+    @Override
+    public Type primitiveType() {
+        return primitive.primitiveType();
+    }
+
+    @Override
+    public ApplicationId applicationId() {
+        return primitive.applicationId();
+    }
+
+    @Override
+    public CompletableFuture<Void> destroy() {
+        return primitive.destroy();
+    }
+
+    @Override
+    public void addStatusChangeListener(Consumer<Status> listener) {
+        primitive.addStatusChangeListener(listener);
+    }
+
+    @Override
+    public void removeStatusChangeListener(Consumer<Status> listener) {
+        primitive.removeStatusChangeListener(listener);
+    }
+
+    @Override
+    public Collection<Consumer<Status>> statusChangeListeners() {
+        return primitive.statusChangeListeners();
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(getClass())
+                .add("delegate", primitive)
+                .toString();
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(primitive);
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        return other instanceof DelegatingDistributedPrimitive
+                && primitive.equals(((DelegatingDistributedPrimitive) other).primitive);
+    }
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncAtomicCounter.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncAtomicCounter.java
new file mode 100644
index 0000000..5d72838
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncAtomicCounter.java
@@ -0,0 +1,72 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+import org.onlab.util.Tools;
+import org.onosproject.store.service.AsyncAtomicCounter;
+
+/**
+ * {@link AsyncAtomicCounter} that executes asynchronous callbacks on a user provided
+ * {@link Executor}.
+ */
+public class ExecutingAsyncAtomicCounter extends ExecutingDistributedPrimitive implements AsyncAtomicCounter {
+    private final AsyncAtomicCounter delegateCounter;
+    private final Executor executor;
+
+    public ExecutingAsyncAtomicCounter(AsyncAtomicCounter delegateCounter, Executor executor) {
+        super(delegateCounter, executor);
+        this.delegateCounter = delegateCounter;
+        this.executor = executor;
+    }
+
+    @Override
+    public CompletableFuture<Long> incrementAndGet() {
+        return Tools.asyncFuture(delegateCounter.incrementAndGet(), executor);
+    }
+
+    @Override
+    public CompletableFuture<Long> getAndIncrement() {
+        return Tools.asyncFuture(delegateCounter.getAndIncrement(), executor);
+    }
+
+    @Override
+    public CompletableFuture<Long> getAndAdd(long delta) {
+        return Tools.asyncFuture(delegateCounter.getAndAdd(delta), executor);
+    }
+
+    @Override
+    public CompletableFuture<Long> addAndGet(long delta) {
+        return Tools.asyncFuture(delegateCounter.addAndGet(delta), executor);
+    }
+
+    @Override
+    public CompletableFuture<Long> get() {
+        return Tools.asyncFuture(delegateCounter.get(), executor);
+    }
+
+    @Override
+    public CompletableFuture<Void> set(long value) {
+        return Tools.asyncFuture(delegateCounter.set(value), executor);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> compareAndSet(long expectedValue, long updateValue) {
+        return Tools.asyncFuture(delegateCounter.compareAndSet(expectedValue, updateValue), executor);
+    }
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncAtomicCounterMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncAtomicCounterMap.java
new file mode 100644
index 0000000..9a039f2
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncAtomicCounterMap.java
@@ -0,0 +1,113 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+import org.onlab.util.Tools;
+import org.onosproject.store.service.AsyncAtomicCounterMap;
+
+/**
+ * {@link org.onosproject.store.service.AsyncAtomicCounterMap} that executes asynchronous callbacks on a user provided
+ * {@link Executor}.
+ */
+public class ExecutingAsyncAtomicCounterMap<K>
+        extends ExecutingDistributedPrimitive implements AsyncAtomicCounterMap<K> {
+    private final AsyncAtomicCounterMap<K> delegateMap;
+    private final Executor executor;
+
+    public ExecutingAsyncAtomicCounterMap(AsyncAtomicCounterMap<K> delegateMap, Executor executor) {
+        super(delegateMap, executor);
+        this.delegateMap = delegateMap;
+        this.executor = executor;
+    }
+
+    @Override
+    public CompletableFuture<Long> incrementAndGet(K key) {
+        return Tools.asyncFuture(delegateMap.incrementAndGet(key), executor);
+    }
+
+    @Override
+    public CompletableFuture<Long> decrementAndGet(K key) {
+        return Tools.asyncFuture(delegateMap.decrementAndGet(key), executor);
+    }
+
+    @Override
+    public CompletableFuture<Long> getAndIncrement(K key) {
+        return Tools.asyncFuture(delegateMap.getAndIncrement(key), executor);
+    }
+
+    @Override
+    public CompletableFuture<Long> getAndDecrement(K key) {
+        return Tools.asyncFuture(delegateMap.getAndDecrement(key), executor);
+    }
+
+    @Override
+    public CompletableFuture<Long> addAndGet(K key, long delta) {
+        return Tools.asyncFuture(delegateMap.addAndGet(key, delta), executor);
+    }
+
+    @Override
+    public CompletableFuture<Long> getAndAdd(K key, long delta) {
+        return Tools.asyncFuture(delegateMap.getAndAdd(key, delta), executor);
+    }
+
+    @Override
+    public CompletableFuture<Long> get(K key) {
+        return Tools.asyncFuture(delegateMap.get(key), executor);
+    }
+
+    @Override
+    public CompletableFuture<Long> put(K key, long newValue) {
+        return Tools.asyncFuture(delegateMap.put(key, newValue), executor);
+    }
+
+    @Override
+    public CompletableFuture<Long> putIfAbsent(K key, long newValue) {
+        return Tools.asyncFuture(delegateMap.putIfAbsent(key, newValue), executor);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> replace(K key, long expectedOldValue, long newValue) {
+        return Tools.asyncFuture(delegateMap.replace(key, expectedOldValue, newValue), executor);
+    }
+
+    @Override
+    public CompletableFuture<Long> remove(K key) {
+        return Tools.asyncFuture(delegateMap.remove(key), executor);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> remove(K key, long value) {
+        return Tools.asyncFuture(delegateMap.remove(key, value), executor);
+    }
+
+    @Override
+    public CompletableFuture<Integer> size() {
+        return Tools.asyncFuture(delegateMap.size(), executor);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> isEmpty() {
+        return Tools.asyncFuture(delegateMap.isEmpty(), executor);
+    }
+
+    @Override
+    public CompletableFuture<Void> clear() {
+        return Tools.asyncFuture(delegateMap.clear(), executor);
+    }
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncAtomicValue.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncAtomicValue.java
new file mode 100644
index 0000000..40eacc6
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncAtomicValue.java
@@ -0,0 +1,77 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+import com.google.common.collect.Maps;
+import org.onlab.util.Tools;
+import org.onosproject.store.service.AsyncAtomicValue;
+import org.onosproject.store.service.AtomicValueEventListener;
+
+/**
+ * {@link AsyncAtomicValue} that executes asynchronous callbacks on a user provided
+ * {@link Executor}.
+ */
+public class ExecutingAsyncAtomicValue<V> extends ExecutingDistributedPrimitive implements AsyncAtomicValue<V> {
+    private final AsyncAtomicValue<V> delegateValue;
+    private final Executor executor;
+    private final Map<AtomicValueEventListener<V>, AtomicValueEventListener<V>> listenerMap = Maps.newConcurrentMap();
+
+    public ExecutingAsyncAtomicValue(AsyncAtomicValue<V> delegateValue, Executor executor) {
+        super(delegateValue, executor);
+        this.delegateValue = delegateValue;
+        this.executor = executor;
+    }
+
+    @Override
+    public CompletableFuture<Boolean> compareAndSet(V expect, V update) {
+        return Tools.asyncFuture(delegateValue.compareAndSet(expect, update), executor);
+    }
+
+    @Override
+    public CompletableFuture<V> get() {
+        return Tools.asyncFuture(delegateValue.get(), executor);
+    }
+
+    @Override
+    public CompletableFuture<V> getAndSet(V value) {
+        return Tools.asyncFuture(delegateValue.getAndSet(value), executor);
+    }
+
+    @Override
+    public CompletableFuture<Void> set(V value) {
+        return Tools.asyncFuture(delegateValue.set(value), executor);
+    }
+
+    @Override
+    public CompletableFuture<Void> addListener(AtomicValueEventListener<V> listener) {
+        AtomicValueEventListener<V> wrappedListener = e -> executor.execute(() -> listener.event(e));
+        listenerMap.put(listener, wrappedListener);
+        return Tools.asyncFuture(delegateValue.addListener(wrappedListener), executor);
+    }
+
+    @Override
+    public CompletableFuture<Void> removeListener(AtomicValueEventListener<V> listener) {
+        AtomicValueEventListener<V> wrappedListener = listenerMap.remove(listener);
+        if (wrappedListener != null) {
+            return Tools.asyncFuture(delegateValue.removeListener(wrappedListener), executor);
+        }
+        return CompletableFuture.completedFuture(null);
+    }
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncConsistentMap.java
new file mode 100644
index 0000000..07911af
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncConsistentMap.java
@@ -0,0 +1,185 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.function.BiFunction;
+import java.util.function.Predicate;
+
+import org.onlab.util.Tools;
+import org.onosproject.store.primitives.MapUpdate;
+import org.onosproject.store.primitives.TransactionId;
+import org.onosproject.store.service.AsyncConsistentMap;
+import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.TransactionLog;
+import org.onosproject.store.service.Version;
+import org.onosproject.store.service.Versioned;
+
+/**
+ * An {@link org.onosproject.store.service.AsyncConsistentMap} that completes asynchronous calls on a provided
+ * {@link Executor}.
+ */
+public class ExecutingAsyncConsistentMap<K, V>
+        extends ExecutingDistributedPrimitive implements AsyncConsistentMap<K, V> {
+    private final AsyncConsistentMap<K, V> delegateMap;
+    private final Executor executor;
+
+    public ExecutingAsyncConsistentMap(AsyncConsistentMap<K, V> delegateMap, Executor executor) {
+        super(delegateMap, executor);
+        this.delegateMap = delegateMap;
+        this.executor = executor;
+    }
+
+    @Override
+    public CompletableFuture<Integer> size() {
+        return Tools.asyncFuture(delegateMap.size(), executor);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> containsKey(K key) {
+        return Tools.asyncFuture(delegateMap.containsKey(key), executor);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> containsValue(V value) {
+        return Tools.asyncFuture(delegateMap.containsValue(value), executor);
+    }
+
+    @Override
+    public CompletableFuture<Versioned<V>> get(K key) {
+        return Tools.asyncFuture(delegateMap.get(key), executor);
+    }
+
+    @Override
+    public CompletableFuture<Versioned<V>> getOrDefault(K key, V defaultValue) {
+        return Tools.asyncFuture(delegateMap.getOrDefault(key, defaultValue), executor);
+    }
+
+    @Override
+    public CompletableFuture<Versioned<V>> computeIf(
+            K key, Predicate<? super V> condition, BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
+        return Tools.asyncFuture(delegateMap.computeIf(key, condition, remappingFunction), executor);
+    }
+
+    @Override
+    public CompletableFuture<Versioned<V>> put(K key, V value) {
+        return Tools.asyncFuture(delegateMap.put(key, value), executor);
+    }
+
+    @Override
+    public CompletableFuture<Versioned<V>> putAndGet(K key, V value) {
+        return Tools.asyncFuture(delegateMap.putAndGet(key, value), executor);
+    }
+
+    @Override
+    public CompletableFuture<Versioned<V>> remove(K key) {
+        return Tools.asyncFuture(delegateMap.remove(key), executor);
+    }
+
+    @Override
+    public CompletableFuture<Void> clear() {
+        return Tools.asyncFuture(delegateMap.clear(), executor);
+    }
+
+    @Override
+    public CompletableFuture<Set<K>> keySet() {
+        return Tools.asyncFuture(delegateMap.keySet(), executor);
+    }
+
+    @Override
+    public CompletableFuture<Collection<Versioned<V>>> values() {
+        return Tools.asyncFuture(delegateMap.values(), executor);
+    }
+
+    @Override
+    public CompletableFuture<Set<Map.Entry<K, Versioned<V>>>> entrySet() {
+        return Tools.asyncFuture(delegateMap.entrySet(), executor);
+    }
+
+    @Override
+    public CompletableFuture<Versioned<V>> putIfAbsent(K key, V value) {
+        return Tools.asyncFuture(delegateMap.putIfAbsent(key, value), executor);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> remove(K key, V value) {
+        return Tools.asyncFuture(delegateMap.remove(key, value), executor);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> remove(K key, long version) {
+        return Tools.asyncFuture(delegateMap.remove(key, version), executor);
+    }
+
+    @Override
+    public CompletableFuture<Versioned<V>> replace(K key, V value) {
+        return Tools.asyncFuture(delegateMap.replace(key, value), executor);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> replace(K key, V oldValue, V newValue) {
+        return Tools.asyncFuture(delegateMap.replace(key, oldValue, newValue), executor);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> replace(K key, long oldVersion, V newValue) {
+        return Tools.asyncFuture(delegateMap.replace(key, oldVersion, newValue), executor);
+    }
+
+    @Override
+    public CompletableFuture<Version> begin(TransactionId transactionId) {
+        return Tools.asyncFuture(delegateMap.begin(transactionId), executor);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> prepare(TransactionLog<MapUpdate<K, V>> transactionLog) {
+        return Tools.asyncFuture(delegateMap.prepare(transactionLog), executor);
+    }
+
+    @Override
+    public CompletableFuture<Void> commit(TransactionId transactionId) {
+        return Tools.asyncFuture(delegateMap.commit(transactionId), executor);
+    }
+
+    @Override
+    public CompletableFuture<Void> rollback(TransactionId transactionId) {
+        return Tools.asyncFuture(delegateMap.rollback(transactionId), executor);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> prepareAndCommit(TransactionLog<MapUpdate<K, V>> transactionLog) {
+        return Tools.asyncFuture(delegateMap.prepareAndCommit(transactionLog), executor);
+    }
+
+    @Override
+    public CompletableFuture<Void> addListener(MapEventListener<K, V> listener) {
+        return addListener(listener, executor);
+    }
+
+    @Override
+    public CompletableFuture<Void> addListener(MapEventListener<K, V> listener, Executor executor) {
+        return Tools.asyncFuture(delegateMap.addListener(listener, executor), this.executor);
+    }
+
+    @Override
+    public CompletableFuture<Void> removeListener(MapEventListener<K, V> listener) {
+        return Tools.asyncFuture(delegateMap.removeListener(listener), executor);
+    }
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncConsistentMultimap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncConsistentMultimap.java
new file mode 100644
index 0000000..a2b308a
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncConsistentMultimap.java
@@ -0,0 +1,133 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+import com.google.common.collect.Multiset;
+import org.onlab.util.Tools;
+import org.onosproject.store.service.AsyncConsistentMultimap;
+import org.onosproject.store.service.Versioned;
+
+/**
+ * {@link org.onosproject.store.service.AsyncConsistentMultimap} that executes asynchronous callbacks on a provided
+ * {@link Executor}.
+ */
+public class ExecutingAsyncConsistentMultimap<K, V>
+        extends ExecutingDistributedPrimitive implements AsyncConsistentMultimap<K, V> {
+    private final AsyncConsistentMultimap<K, V> delegateMap;
+    private final Executor executor;
+
+    public ExecutingAsyncConsistentMultimap(AsyncConsistentMultimap<K, V> delegateMap, Executor executor) {
+        super(delegateMap, executor);
+        this.delegateMap = delegateMap;
+        this.executor = executor;
+    }
+
+    @Override
+    public CompletableFuture<Integer> size() {
+        return Tools.asyncFuture(delegateMap.size(), executor);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> isEmpty() {
+        return Tools.asyncFuture(delegateMap.isEmpty(), executor);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> containsKey(K key) {
+        return Tools.asyncFuture(delegateMap.containsKey(key), executor);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> containsValue(V value) {
+        return Tools.asyncFuture(delegateMap.containsValue(value), executor);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> containsEntry(K key, V value) {
+        return Tools.asyncFuture(delegateMap.containsEntry(key, value), executor);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> put(K key, V value) {
+        return Tools.asyncFuture(delegateMap.put(key, value), executor);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> remove(K key, V value) {
+        return Tools.asyncFuture(delegateMap.remove(key, value), executor);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> removeAll(K key, Collection<? extends V> values) {
+        return Tools.asyncFuture(delegateMap.removeAll(key, values), executor);
+    }
+
+    @Override
+    public CompletableFuture<Versioned<Collection<? extends V>>> removeAll(K key) {
+        return Tools.asyncFuture(delegateMap.removeAll(key), executor);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> putAll(K key, Collection<? extends V> values) {
+        return Tools.asyncFuture(delegateMap.putAll(key, values), executor);
+    }
+
+    @Override
+    public CompletableFuture<Versioned<Collection<? extends V>>> replaceValues(K key, Collection<V> values) {
+        return Tools.asyncFuture(delegateMap.replaceValues(key, values), executor);
+    }
+
+    @Override
+    public CompletableFuture<Void> clear() {
+        return Tools.asyncFuture(delegateMap.clear(), executor);
+    }
+
+    @Override
+    public CompletableFuture<Versioned<Collection<? extends V>>> get(K key) {
+        return Tools.asyncFuture(delegateMap.get(key), executor);
+    }
+
+    @Override
+    public CompletableFuture<Set<K>> keySet() {
+        return Tools.asyncFuture(delegateMap.keySet(), executor);
+    }
+
+    @Override
+    public CompletableFuture<Multiset<K>> keys() {
+        return Tools.asyncFuture(delegateMap.keys(), executor);
+    }
+
+    @Override
+    public CompletableFuture<Multiset<V>> values() {
+        return Tools.asyncFuture(delegateMap.values(), executor);
+    }
+
+    @Override
+    public CompletableFuture<Collection<Map.Entry<K, V>>> entries() {
+        return Tools.asyncFuture(delegateMap.entries(), executor);
+    }
+
+    @Override
+    public CompletableFuture<Map<K, Collection<V>>> asMap() {
+        return Tools.asyncFuture(delegateMap.asMap(), executor);
+    }
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncConsistentTreeMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncConsistentTreeMap.java
new file mode 100644
index 0000000..1441941
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncConsistentTreeMap.java
@@ -0,0 +1,269 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.NavigableSet;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.function.BiFunction;
+import java.util.function.Predicate;
+
+import org.onlab.util.Tools;
+import org.onosproject.store.primitives.MapUpdate;
+import org.onosproject.store.primitives.TransactionId;
+import org.onosproject.store.service.AsyncConsistentTreeMap;
+import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.TransactionLog;
+import org.onosproject.store.service.Version;
+import org.onosproject.store.service.Versioned;
+
+/**
+ * {@link org.onosproject.store.service.AsyncConsistentTreeMap} that executes asynchronous callbacks on a provided
+ * {@link Executor}.
+ */
+public class ExecutingAsyncConsistentTreeMap<V>
+        extends ExecutingDistributedPrimitive implements AsyncConsistentTreeMap<V> {
+    private final AsyncConsistentTreeMap<V> delegateMap;
+    private final Executor executor;
+
+    public ExecutingAsyncConsistentTreeMap(AsyncConsistentTreeMap<V> delegateMap, Executor executor) {
+        super(delegateMap, executor);
+        this.delegateMap = delegateMap;
+        this.executor = executor;
+    }
+
+    @Override
+    public CompletableFuture<String> firstKey() {
+        return Tools.asyncFuture(delegateMap.firstKey(), executor);
+    }
+
+    @Override
+    public CompletableFuture<String> lastKey() {
+        return Tools.asyncFuture(delegateMap.lastKey(), executor);
+    }
+
+    @Override
+    public CompletableFuture<Map.Entry<String, Versioned<V>>> ceilingEntry(String key) {
+        return Tools.asyncFuture(delegateMap.ceilingEntry(key), executor);
+    }
+
+    @Override
+    public CompletableFuture<Map.Entry<String, Versioned<V>>> floorEntry(String key) {
+        return Tools.asyncFuture(delegateMap.floorEntry(key), executor);
+    }
+
+    @Override
+    public CompletableFuture<Map.Entry<String, Versioned<V>>> higherEntry(String key) {
+        return Tools.asyncFuture(delegateMap.higherEntry(key), executor);
+    }
+
+    @Override
+    public CompletableFuture<Map.Entry<String, Versioned<V>>> lowerEntry(String key) {
+        return Tools.asyncFuture(delegateMap.lowerEntry(key), executor);
+    }
+
+    @Override
+    public CompletableFuture<Map.Entry<String, Versioned<V>>> firstEntry() {
+        return Tools.asyncFuture(delegateMap.firstEntry(), executor);
+    }
+
+    @Override
+    public CompletableFuture<Integer> size() {
+        return Tools.asyncFuture(delegateMap.size(), executor);
+    }
+
+    @Override
+    public CompletableFuture<Map.Entry<String, Versioned<V>>> lastEntry() {
+        return Tools.asyncFuture(delegateMap.lastEntry(), executor);
+    }
+
+    @Override
+    public CompletableFuture<Map.Entry<String, Versioned<V>>> pollFirstEntry() {
+        return Tools.asyncFuture(delegateMap.pollFirstEntry(), executor);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> containsKey(String key) {
+        return Tools.asyncFuture(delegateMap.containsKey(key), executor);
+    }
+
+    @Override
+    public CompletableFuture<Map.Entry<String, Versioned<V>>> pollLastEntry() {
+        return Tools.asyncFuture(delegateMap.pollLastEntry(), executor);
+    }
+
+    @Override
+    public CompletableFuture<String> lowerKey(String key) {
+        return Tools.asyncFuture(delegateMap.lowerKey(key), executor);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> containsValue(V value) {
+        return Tools.asyncFuture(delegateMap.containsValue(value), executor);
+    }
+
+    @Override
+    public CompletableFuture<String> floorKey(String key) {
+        return Tools.asyncFuture(delegateMap.floorKey(key), executor);
+    }
+
+    @Override
+    public CompletableFuture<String> ceilingKey(String key) {
+        return Tools.asyncFuture(delegateMap.ceilingKey(key), executor);
+    }
+
+    @Override
+    public CompletableFuture<Versioned<V>> get(String key) {
+        return Tools.asyncFuture(delegateMap.get(key), executor);
+    }
+
+    @Override
+    public CompletableFuture<Versioned<V>> getOrDefault(String key, V defaultValue) {
+        return Tools.asyncFuture(delegateMap.getOrDefault(key, defaultValue), executor);
+    }
+
+    @Override
+    public CompletableFuture<String> higherKey(String key) {
+        return Tools.asyncFuture(delegateMap.higherKey(key), executor);
+    }
+
+    @Override
+    public CompletableFuture<NavigableSet<String>> navigableKeySet() {
+        return Tools.asyncFuture(delegateMap.navigableKeySet(), executor);
+    }
+
+    @Override
+    public CompletableFuture<NavigableMap<String, V>> subMap(
+            String upperKey, String lowerKey, boolean inclusiveUpper, boolean inclusiveLower) {
+        return Tools.asyncFuture(delegateMap.subMap(upperKey, lowerKey, inclusiveUpper, inclusiveLower), executor);
+    }
+
+    @Override
+    public CompletableFuture<Versioned<V>> computeIf(
+            String key, Predicate<? super V> condition,
+            BiFunction<? super String, ? super V, ? extends V> remappingFunction) {
+        return Tools.asyncFuture(delegateMap.computeIf(key, condition, remappingFunction), executor);
+    }
+
+    @Override
+    public CompletableFuture<Versioned<V>> put(String key, V value) {
+        return Tools.asyncFuture(delegateMap.put(key, value), executor);
+    }
+
+    @Override
+    public CompletableFuture<Versioned<V>> putAndGet(String key, V value) {
+        return Tools.asyncFuture(delegateMap.putAndGet(key, value), executor);
+    }
+
+    @Override
+    public CompletableFuture<Versioned<V>> remove(String key) {
+        return Tools.asyncFuture(delegateMap.remove(key), executor);
+    }
+
+    @Override
+    public CompletableFuture<Void> clear() {
+        return Tools.asyncFuture(delegateMap.clear(), executor);
+    }
+
+    @Override
+    public CompletableFuture<Set<String>> keySet() {
+        return Tools.asyncFuture(delegateMap.keySet(), executor);
+    }
+
+    @Override
+    public CompletableFuture<Collection<Versioned<V>>> values() {
+        return Tools.asyncFuture(delegateMap.values(), executor);
+    }
+
+    @Override
+    public CompletableFuture<Set<Map.Entry<String, Versioned<V>>>> entrySet() {
+        return Tools.asyncFuture(delegateMap.entrySet(), executor);
+    }
+
+    @Override
+    public CompletableFuture<Versioned<V>> putIfAbsent(String key, V value) {
+        return Tools.asyncFuture(delegateMap.putIfAbsent(key, value), executor);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> remove(String key, V value) {
+        return Tools.asyncFuture(delegateMap.remove(key, value), executor);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> remove(String key, long version) {
+        return Tools.asyncFuture(delegateMap.remove(key, version), executor);
+    }
+
+    @Override
+    public CompletableFuture<Versioned<V>> replace(String key, V value) {
+        return Tools.asyncFuture(delegateMap.replace(key, value), executor);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> replace(String key, V oldValue, V newValue) {
+        return Tools.asyncFuture(delegateMap.replace(key, oldValue, newValue), executor);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> replace(String key, long oldVersion, V newValue) {
+        return Tools.asyncFuture(delegateMap.replace(key, oldVersion, newValue), executor);
+    }
+
+    @Override
+    public CompletableFuture<Version> begin(TransactionId transactionId) {
+        return Tools.asyncFuture(delegateMap.begin(transactionId), executor);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> prepare(TransactionLog<MapUpdate<String, V>> transactionLog) {
+        return Tools.asyncFuture(delegateMap.prepare(transactionLog), executor);
+    }
+
+    @Override
+    public CompletableFuture<Void> commit(TransactionId transactionId) {
+        return Tools.asyncFuture(delegateMap.commit(transactionId), executor);
+    }
+
+    @Override
+    public CompletableFuture<Void> rollback(TransactionId transactionId) {
+        return Tools.asyncFuture(delegateMap.rollback(transactionId), executor);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> prepareAndCommit(TransactionLog<MapUpdate<String, V>> transactionLog) {
+        return Tools.asyncFuture(delegateMap.prepareAndCommit(transactionLog), executor);
+    }
+
+    @Override
+    public CompletableFuture<Void> addListener(MapEventListener<String, V> listener) {
+        return addListener(listener, executor);
+    }
+
+    @Override
+    public CompletableFuture<Void> addListener(MapEventListener<String, V> listener, Executor executor) {
+        return Tools.asyncFuture(delegateMap.addListener(listener, executor), this.executor);
+    }
+
+    @Override
+    public CompletableFuture<Void> removeListener(MapEventListener<String, V> listener) {
+        return Tools.asyncFuture(delegateMap.removeListener(listener), executor);
+    }
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncDocumentTree.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncDocumentTree.java
new file mode 100644
index 0000000..352ee7a
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncDocumentTree.java
@@ -0,0 +1,104 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+import com.google.common.collect.Maps;
+import org.onlab.util.Tools;
+import org.onosproject.store.service.AsyncDocumentTree;
+import org.onosproject.store.service.DocumentPath;
+import org.onosproject.store.service.DocumentTreeListener;
+import org.onosproject.store.service.Versioned;
+
+/**
+ * {@link AsyncDocumentTree} that executes asynchronous callbacks on a user provided
+ * {@link Executor}.
+ */
+public class ExecutingAsyncDocumentTree<V> extends ExecutingDistributedPrimitive implements AsyncDocumentTree<V> {
+    private final AsyncDocumentTree<V> delegateTree;
+    private final Executor executor;
+    private final Map<DocumentTreeListener<V>, DocumentTreeListener<V>> listenerMap = Maps.newConcurrentMap();
+
+    public ExecutingAsyncDocumentTree(AsyncDocumentTree<V> delegateTree, Executor executor) {
+        super(delegateTree, executor);
+        this.delegateTree = delegateTree;
+        this.executor = executor;
+    }
+
+    @Override
+    public DocumentPath root() {
+        return delegateTree.root();
+    }
+
+    @Override
+    public CompletableFuture<Map<String, Versioned<V>>> getChildren(DocumentPath path) {
+        return Tools.asyncFuture(delegateTree.getChildren(path), executor);
+    }
+
+    @Override
+    public CompletableFuture<Versioned<V>> get(DocumentPath path) {
+        return Tools.asyncFuture(delegateTree.get(path), executor);
+    }
+
+    @Override
+    public CompletableFuture<Versioned<V>> set(DocumentPath path, V value) {
+        return Tools.asyncFuture(delegateTree.set(path, value), executor);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> create(DocumentPath path, V value) {
+        return Tools.asyncFuture(delegateTree.create(path, value), executor);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> createRecursive(DocumentPath path, V value) {
+        return Tools.asyncFuture(delegateTree.createRecursive(path, value), executor);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> replace(DocumentPath path, V newValue, long version) {
+        return Tools.asyncFuture(delegateTree.replace(path, newValue, version), executor);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> replace(DocumentPath path, V newValue, V currentValue) {
+        return Tools.asyncFuture(delegateTree.replace(path, newValue, currentValue), executor);
+    }
+
+    @Override
+    public CompletableFuture<Versioned<V>> removeNode(DocumentPath path) {
+        return Tools.asyncFuture(delegateTree.removeNode(path), executor);
+    }
+
+    @Override
+    public CompletableFuture<Void> addListener(DocumentPath path, DocumentTreeListener<V> listener) {
+        DocumentTreeListener<V> wrappedListener = e -> executor.execute(() -> listener.event(e));
+        listenerMap.put(listener, wrappedListener);
+        return Tools.asyncFuture(delegateTree.addListener(path, wrappedListener), executor);
+    }
+
+    @Override
+    public CompletableFuture<Void> removeListener(DocumentTreeListener<V> listener) {
+        DocumentTreeListener<V> wrappedListener = listenerMap.remove(listener);
+        if (wrappedListener != null) {
+            return Tools.asyncFuture(delegateTree.removeListener(wrappedListener), executor);
+        }
+        return CompletableFuture.completedFuture(null);
+    }
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncLeaderElector.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncLeaderElector.java
new file mode 100644
index 0000000..ecc5b8d
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncLeaderElector.java
@@ -0,0 +1,95 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.function.Consumer;
+
+import com.google.common.collect.Maps;
+import org.onlab.util.Tools;
+import org.onosproject.cluster.Leadership;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.event.Change;
+import org.onosproject.store.service.AsyncLeaderElector;
+
+/**
+ * {@link AsyncLeaderElector} that executes asynchronous callbacks on a user provided
+ * {@link Executor}.
+ */
+public class ExecutingAsyncLeaderElector extends ExecutingDistributedPrimitive implements AsyncLeaderElector {
+    private final AsyncLeaderElector delegateElector;
+    private final Executor executor;
+    private final Map<Consumer<Change<Leadership>>, Consumer<Change<Leadership>>> listenerMap = Maps.newConcurrentMap();
+
+    public ExecutingAsyncLeaderElector(AsyncLeaderElector delegateElector, Executor executor) {
+        super(delegateElector, executor);
+        this.delegateElector = delegateElector;
+        this.executor = executor;
+    }
+
+    @Override
+    public CompletableFuture<Leadership> run(String topic, NodeId nodeId) {
+        return Tools.asyncFuture(delegateElector.run(topic, nodeId), executor);
+    }
+
+    @Override
+    public CompletableFuture<Void> withdraw(String topic) {
+        return Tools.asyncFuture(delegateElector.withdraw(topic), executor);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> anoint(String topic, NodeId nodeId) {
+        return Tools.asyncFuture(delegateElector.anoint(topic, nodeId), executor);
+    }
+
+    @Override
+    public CompletableFuture<Void> evict(NodeId nodeId) {
+        return Tools.asyncFuture(delegateElector.evict(nodeId), executor);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> promote(String topic, NodeId nodeId) {
+        return Tools.asyncFuture(delegateElector.promote(topic, nodeId), executor);
+    }
+
+    @Override
+    public CompletableFuture<Leadership> getLeadership(String topic) {
+        return Tools.asyncFuture(delegateElector.getLeadership(topic), executor);
+    }
+
+    @Override
+    public CompletableFuture<Map<String, Leadership>> getLeaderships() {
+        return Tools.asyncFuture(delegateElector.getLeaderships(), executor);
+    }
+
+    @Override
+    public CompletableFuture<Void> addChangeListener(Consumer<Change<Leadership>> listener) {
+        Consumer<Change<Leadership>> wrappedListener = e -> executor.execute(() -> listener.accept(e));
+        listenerMap.put(listener, wrappedListener);
+        return Tools.asyncFuture(delegateElector.addChangeListener(wrappedListener), executor);
+    }
+
+    @Override
+    public CompletableFuture<Void> removeChangeListener(Consumer<Change<Leadership>> listener) {
+        Consumer<Change<Leadership>> wrappedListener = listenerMap.remove(listener);
+        if (wrappedListener != null) {
+            return Tools.asyncFuture(delegateElector.removeChangeListener(wrappedListener), executor);
+        }
+        return CompletableFuture.completedFuture(null);
+    }
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingDistributedPrimitive.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingDistributedPrimitive.java
new file mode 100644
index 0000000..021dbe5
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingDistributedPrimitive.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.function.Consumer;
+
+import com.google.common.collect.Maps;
+import org.onlab.util.Tools;
+import org.onosproject.store.service.DistributedPrimitive;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Base class for primitives that delegate asynchronous callbacks to a user provided {@link Executor}.
+ */
+public abstract class ExecutingDistributedPrimitive
+        extends DelegatingDistributedPrimitive {
+    private final DistributedPrimitive primitive;
+    private final Executor executor;
+    private final Map<Consumer<Status>, Consumer<Status>> listenerMap = Maps.newConcurrentMap();
+
+    protected ExecutingDistributedPrimitive(DistributedPrimitive primitive, Executor executor) {
+        super(primitive);
+        this.primitive = primitive;
+        this.executor = checkNotNull(executor);
+    }
+
+    @Override
+    public CompletableFuture<Void> destroy() {
+        return Tools.asyncFuture(primitive.destroy(), executor);
+    }
+
+    @Override
+    public void addStatusChangeListener(Consumer<DistributedPrimitive.Status> listener) {
+        Consumer<DistributedPrimitive.Status> wrappedListener =
+                status -> executor.execute(() -> listener.accept(status));
+        listenerMap.put(listener, wrappedListener);
+        primitive.addStatusChangeListener(wrappedListener);
+    }
+
+    @Override
+    public void removeStatusChangeListener(Consumer<DistributedPrimitive.Status> listener) {
+        Consumer<DistributedPrimitive.Status> wrappedListener = listenerMap.remove(listener);
+        if (wrappedListener != null) {
+            primitive.removeStatusChangeListener(wrappedListener);
+        }
+    }
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingWorkQueue.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingWorkQueue.java
new file mode 100644
index 0000000..5dda17a
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingWorkQueue.java
@@ -0,0 +1,77 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+// CHECKSTYLE:OFF
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.function.Consumer;
+
+import org.onlab.util.Tools;
+import org.onosproject.store.service.AsyncAtomicValue;
+import org.onosproject.store.service.Task;
+import org.onosproject.store.service.WorkQueue;
+import org.onosproject.store.service.WorkQueueStats;
+// CHECKSTYLE:ON
+
+/**
+ * {@link AsyncAtomicValue} that executes asynchronous callbacks on a user provided
+ * {@link Executor}.
+ */
+public class ExecutingWorkQueue<E> extends ExecutingDistributedPrimitive implements WorkQueue<E> {
+    private final WorkQueue<E> delegateQueue;
+    private final Executor executor;
+
+    public ExecutingWorkQueue(WorkQueue<E> delegateQueue, Executor executor) {
+        super(delegateQueue, executor);
+        this.delegateQueue = delegateQueue;
+        this.executor = executor;
+    }
+
+    @Override
+    public CompletableFuture<Void> addMultiple(Collection<E> items) {
+        return Tools.asyncFuture(delegateQueue.addMultiple(items), executor);
+    }
+
+    @Override
+    public CompletableFuture<Collection<Task<E>>> take(int maxItems) {
+        return Tools.asyncFuture(delegateQueue.take(maxItems), executor);
+    }
+
+    @Override
+    public CompletableFuture<Void> complete(Collection<String> taskIds) {
+        return Tools.asyncFuture(delegateQueue.complete(taskIds), executor);
+    }
+
+    @Override
+    public CompletableFuture<Void> registerTaskProcessor(
+            Consumer<E> taskProcessor, int parallelism, Executor executor) {
+        return Tools.asyncFuture(
+                delegateQueue.registerTaskProcessor(taskProcessor, parallelism, executor),
+                this.executor);
+    }
+
+    @Override
+    public CompletableFuture<Void> stopProcessing() {
+        return Tools.asyncFuture(delegateQueue.stopProcessing(), executor);
+    }
+
+    @Override
+    public CompletableFuture<WorkQueueStats> stats() {
+        return Tools.asyncFuture(delegateQueue.stats(), executor);
+    }
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java
index 6dd2322..0358a7c 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java
@@ -39,6 +39,8 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.concurrent.Executor;
+import java.util.function.Supplier;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
@@ -57,12 +59,13 @@
     }
 
     @Override
-    public <K, V> AsyncConsistentMap<K, V> newAsyncConsistentMap(String name, Serializer serializer) {
+    public <K, V> AsyncConsistentMap<K, V> newAsyncConsistentMap(
+            String name, Serializer serializer, Supplier<Executor> executorSupplier) {
         checkNotNull(name);
         checkNotNull(serializer);
         Map<PartitionId, AsyncConsistentMap<K, V>> maps =
                 Maps.transformValues(members,
-                                     partition -> partition.newAsyncConsistentMap(name, serializer));
+                                     partition -> partition.newAsyncConsistentMap(name, serializer, executorSupplier));
         Hasher<K> hasher = key -> {
             int hashCode = Hashing.sha256().hashBytes(serializer.encode(key)).asInt();
             return sortedMemberPartitionIds.get(Math.abs(hashCode) % members.size());
@@ -71,44 +74,46 @@
     }
 
     @Override
-    public <V> AsyncConsistentTreeMap<V> newAsyncConsistentTreeMap(String name,
-                                                                   Serializer serializer) {
-        return getCreator(name).newAsyncConsistentTreeMap(name, serializer);
+    public <V> AsyncConsistentTreeMap<V> newAsyncConsistentTreeMap(
+            String name, Serializer serializer, Supplier<Executor> executorSupplier) {
+        return getCreator(name).newAsyncConsistentTreeMap(name, serializer, executorSupplier);
     }
 
     @Override
     public <K, V> AsyncConsistentMultimap<K, V> newAsyncConsistentSetMultimap(
-            String name, Serializer serializer) {
-        return getCreator(name).newAsyncConsistentSetMultimap(name,
-                                                              serializer);
+            String name, Serializer serializer, Supplier<Executor> executorSupplier) {
+        return getCreator(name).newAsyncConsistentSetMultimap(name, serializer, executorSupplier);
     }
 
     @Override
-    public <E> AsyncDistributedSet<E> newAsyncDistributedSet(String name, Serializer serializer) {
-        return DistributedPrimitives.newSetFromMap(newAsyncConsistentMap(name, serializer));
+    public <E> AsyncDistributedSet<E> newAsyncDistributedSet(
+            String name, Serializer serializer, Supplier<Executor> executorSupplier) {
+        return DistributedPrimitives.newSetFromMap(newAsyncConsistentMap(name, serializer, executorSupplier));
     }
 
     @Override
-    public <K> AsyncAtomicCounterMap<K> newAsyncAtomicCounterMap(String name, Serializer serializer) {
-        return getCreator(name).newAsyncAtomicCounterMap(name, serializer);
+    public <K> AsyncAtomicCounterMap<K> newAsyncAtomicCounterMap(
+            String name, Serializer serializer, Supplier<Executor> executorSupplier) {
+        return getCreator(name).newAsyncAtomicCounterMap(name, serializer, executorSupplier);
     }
 
     @Override
-    public AsyncAtomicCounter newAsyncCounter(String name) {
-        return getCreator(name).newAsyncCounter(name);
+    public AsyncAtomicCounter newAsyncCounter(String name, Supplier<Executor> executorSupplier) {
+        return getCreator(name).newAsyncCounter(name, executorSupplier);
     }
 
     @Override
-    public <V> AsyncAtomicValue<V> newAsyncAtomicValue(String name, Serializer serializer) {
-        return getCreator(name).newAsyncAtomicValue(name, serializer);
+    public <V> AsyncAtomicValue<V> newAsyncAtomicValue(
+            String name, Serializer serializer, Supplier<Executor> executorSupplier) {
+        return getCreator(name).newAsyncAtomicValue(name, serializer, executorSupplier);
     }
 
     @Override
-    public AsyncLeaderElector newAsyncLeaderElector(String name) {
+    public AsyncLeaderElector newAsyncLeaderElector(String name, Supplier<Executor> executorSupplier) {
         checkNotNull(name);
         Map<PartitionId, AsyncLeaderElector> leaderElectors =
                 Maps.transformValues(members,
-                                     partition -> partition.newAsyncLeaderElector(name));
+                                     partition -> partition.newAsyncLeaderElector(name, executorSupplier));
         Hasher<String> hasher = topic -> {
             int hashCode = Hashing.sha256().hashString(topic, Charsets.UTF_8).asInt();
             return sortedMemberPartitionIds.get(Math.abs(hashCode) % members.size());
@@ -117,13 +122,14 @@
     }
 
     @Override
-    public <E> WorkQueue<E> newWorkQueue(String name, Serializer serializer) {
-        return getCreator(name).newWorkQueue(name, serializer);
+    public <E> WorkQueue<E> newWorkQueue(String name, Serializer serializer, Supplier<Executor> executorSupplier) {
+        return getCreator(name).newWorkQueue(name, serializer, executorSupplier);
     }
 
     @Override
-    public <V> AsyncDocumentTree<V> newAsyncDocumentTree(String name, Serializer serializer) {
-        return getCreator(name).newAsyncDocumentTree(name, serializer);
+    public <V> AsyncDocumentTree<V> newAsyncDocumentTree(
+            String name, Serializer serializer, Supplier<Executor> executorSupplier) {
+        return getCreator(name).newAsyncDocumentTree(name, serializer, executorSupplier);
     }
 
     @Override
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java
index 3150363..bacda00 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java
@@ -16,6 +16,7 @@
 
 package org.onosproject.store.primitives.impl;
 
+import static org.onlab.util.Tools.groupedThreads;
 import static org.slf4j.LoggerFactory.getLogger;
 
 import java.io.File;
@@ -23,6 +24,8 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
@@ -81,6 +84,9 @@
     private final Map<PartitionId, StoragePartition> partitions = Maps.newConcurrentMap();
     private final AtomicReference<ClusterMetadata> currentClusterMetadata = new AtomicReference<>();
     private final InternalClusterMetadataListener metadataListener = new InternalClusterMetadataListener();
+    private final ExecutorService sharedPrimitiveExecutor = Executors.newFixedThreadPool(
+            Runtime.getRuntime().availableProcessors(),
+            groupedThreads("onos/primitives", "primitive-events", log));
 
     @Activate
     public void activate() {
@@ -93,6 +99,7 @@
                                messagingService,
                                clusterService,
                                CatalystSerializers.getSerializer(),
+                               sharedPrimitiveExecutor,
                                new File(System.getProperty("karaf.data") + "/partitions/" + partition.getId()))));
 
         CompletableFuture<Void> openFuture = CompletableFuture.allOf(partitions.values()
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java
index a68b793..e9261c8 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java
@@ -24,6 +24,7 @@
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 
@@ -47,6 +48,7 @@
 
     private final AtomicBoolean isOpened = new AtomicBoolean(false);
     private final Serializer serializer;
+    private final Executor sharedExecutor;
     private final MessagingService messagingService;
     private final ClusterService clusterService;
     private final File logFolder;
@@ -63,12 +65,14 @@
             MessagingService messagingService,
             ClusterService clusterService,
             Serializer serializer,
+            Executor sharedExecutor,
             File logFolder) {
         this.partition = partition;
         this.messagingService = messagingService;
         this.clusterService = clusterService;
         this.localNodeId = clusterService.getLocalNode().id();
         this.serializer = serializer;
+        this.sharedExecutor = sharedExecutor;
         this.logFolder = logFolder;
     }
 
@@ -156,7 +160,8 @@
     private CompletableFuture<StoragePartitionClient> openClient() {
         client = new StoragePartitionClient(this,
                 serializer,
-                new CopycatTransport(partition.getId(), messagingService));
+                new CopycatTransport(partition.getId(), messagingService),
+                sharedExecutor);
         return client.open().thenApply(v -> client);
     }
 
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
index 3e80753..83547ae 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
@@ -15,7 +15,6 @@
  */
 package org.onosproject.store.primitives.impl;
 
-import com.google.common.base.Supplier;
 import com.google.common.base.Suppliers;
 import io.atomix.AtomixClient;
 import io.atomix.catalyst.transport.Transport;
@@ -31,6 +30,7 @@
 import io.atomix.resource.ResourceType;
 import io.atomix.variables.DistributedLong;
 import org.onlab.util.HexString;
+import org.onlab.util.SerialExecutor;
 import org.onosproject.store.primitives.DistributedPrimitiveCreator;
 import org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMap;
 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMap;
@@ -59,8 +59,10 @@
 import java.util.Collection;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 import java.util.function.Consumer;
 import java.util.function.Function;
+import java.util.function.Supplier;
 
 import static org.slf4j.LoggerFactory.getLogger;
 
@@ -74,10 +76,11 @@
     private final StoragePartition partition;
     private final Transport transport;
     private final io.atomix.catalyst.serializer.Serializer serializer;
+    private final Executor sharedExecutor;
     private AtomixClient client;
     private ResourceClient resourceClient;
     private static final String ATOMIC_VALUES_CONSISTENT_MAP_NAME = "onos-atomic-values";
-    private final Supplier<AsyncConsistentMap<String, byte[]>> onosAtomicValuesMap =
+    private final com.google.common.base.Supplier<AsyncConsistentMap<String, byte[]>> onosAtomicValuesMap =
             Suppliers.memoize(() -> newAsyncConsistentMap(ATOMIC_VALUES_CONSISTENT_MAP_NAME,
                                                           Serializer.using(KryoNamespaces.BASIC)));
     Function<State, Status> mapper = state -> {
@@ -95,10 +98,12 @@
 
     public StoragePartitionClient(StoragePartition partition,
             io.atomix.catalyst.serializer.Serializer serializer,
-            Transport transport) {
+            Transport transport,
+            Executor sharedExecutor) {
         this.partition = partition;
         this.serializer = serializer;
         this.transport = transport;
+        this.sharedExecutor = sharedExecutor;
     }
 
     @Override
@@ -125,14 +130,26 @@
         return client != null ? client.close() : CompletableFuture.completedFuture(null);
     }
 
+    /**
+     * Returns the executor provided by the given supplier or a serial executor if the supplier is {@code null}.
+     *
+     * @param executorSupplier the user-provided executor supplier
+     * @return the executor
+     */
+    private Executor defaultExecutor(Supplier<Executor> executorSupplier) {
+        return executorSupplier != null ? executorSupplier.get() : new SerialExecutor(sharedExecutor);
+    }
+
     @Override
-    public <K, V> AsyncConsistentMap<K, V> newAsyncConsistentMap(String name, Serializer serializer) {
+    public <K, V> AsyncConsistentMap<K, V> newAsyncConsistentMap(
+            String name, Serializer serializer, Supplier<Executor> executorSupplier) {
         AtomixConsistentMap atomixConsistentMap = client.getResource(name, AtomixConsistentMap.class).join();
         Consumer<State> statusListener = state -> {
             atomixConsistentMap.statusChangeListeners()
                                .forEach(listener -> listener.accept(mapper.apply(state)));
         };
         resourceClient.client().onStateChange(statusListener);
+
         AsyncConsistentMap<String, byte[]> rawMap =
                 new DelegatingAsyncConsistentMap<String, byte[]>(atomixConsistentMap) {
                     @Override
@@ -140,17 +157,20 @@
                         return name;
                     }
                 };
-        AsyncConsistentMap<K, V> transcodedMap = DistributedPrimitives.<K, V, String, byte[]>newTranscodingMap(rawMap,
+
+        // We have to ensure serialization is done on the Copycat threads since Kryo is not thread safe.
+        AsyncConsistentMap<K, V> transcodedMap = DistributedPrimitives.newTranscodingMap(rawMap,
             key -> HexString.toHexString(serializer.encode(key)),
             string -> serializer.decode(HexString.fromHexString(string)),
             value -> value == null ? null : serializer.encode(value),
             bytes -> serializer.decode(bytes));
 
-        return transcodedMap;
+        return new ExecutingAsyncConsistentMap<>(transcodedMap, defaultExecutor(executorSupplier));
     }
 
     @Override
-    public <V> AsyncConsistentTreeMap<V> newAsyncConsistentTreeMap(String name, Serializer serializer) {
+    public <V> AsyncConsistentTreeMap<V> newAsyncConsistentTreeMap(
+            String name, Serializer serializer, Supplier<Executor> executorSupplier) {
         AtomixConsistentTreeMap atomixConsistentTreeMap =
                 client.getResource(name, AtomixConsistentTreeMap.class).join();
         Consumer<State> statusListener = state -> {
@@ -158,6 +178,7 @@
                     .forEach(listener -> listener.accept(mapper.apply(state)));
         };
         resourceClient.client().onStateChange(statusListener);
+
         AsyncConsistentTreeMap<byte[]> rawMap =
                 new DelegatingAsyncConsistentTreeMap<byte[]>(atomixConsistentTreeMap) {
                     @Override
@@ -165,17 +186,19 @@
                         return name;
                     }
                 };
+
         AsyncConsistentTreeMap<V> transcodedMap =
                 DistributedPrimitives.<V, byte[]>newTranscodingTreeMap(
                     rawMap,
                     value -> value == null ? null : serializer.encode(value),
                     bytes -> serializer.decode(bytes));
-        return transcodedMap;
+
+        return new ExecutingAsyncConsistentTreeMap<>(transcodedMap, defaultExecutor(executorSupplier));
     }
 
     @Override
     public <K, V> AsyncConsistentMultimap<K, V> newAsyncConsistentSetMultimap(
-            String name, Serializer serializer) {
+            String name, Serializer serializer, Supplier<Executor> executorSupplier) {
         AtomixConsistentSetMultimap atomixConsistentSetMultimap =
                 client.getResource(name, AtomixConsistentSetMultimap.class)
                         .join();
@@ -184,6 +207,7 @@
                     .forEach(listener -> listener.accept(mapper.apply(state)));
         };
         resourceClient.client().onStateChange(statusListener);
+
         AsyncConsistentMultimap<String, byte[]> rawMap =
                 new DelegatingAsyncConsistentMultimap<String, byte[]>(
                         atomixConsistentSetMultimap) {
@@ -192,6 +216,7 @@
                         return super.name();
                     }
                 };
+
         AsyncConsistentMultimap<K, V> transcodedMap =
                 DistributedPrimitives.newTranscodingMultimap(
                         rawMap,
@@ -199,53 +224,64 @@
                         string -> serializer.decode(HexString.fromHexString(string)),
                         value -> serializer.encode(value),
                         bytes -> serializer.decode(bytes));
-        return transcodedMap;
 
+        return new ExecutingAsyncConsistentMultimap<>(transcodedMap, defaultExecutor(executorSupplier));
     }
 
     @Override
-    public <E> AsyncDistributedSet<E> newAsyncDistributedSet(String name, Serializer serializer) {
-        return DistributedPrimitives.newSetFromMap(this.<E, Boolean>newAsyncConsistentMap(name, serializer));
+    public <E> AsyncDistributedSet<E> newAsyncDistributedSet(
+            String name, Serializer serializer, Supplier<Executor> executorSupplier) {
+        return DistributedPrimitives.newSetFromMap(newAsyncConsistentMap(name, serializer, executorSupplier));
     }
 
     @Override
-    public <K> AsyncAtomicCounterMap<K> newAsyncAtomicCounterMap(String name, Serializer serializer) {
+    public <K> AsyncAtomicCounterMap<K> newAsyncAtomicCounterMap(
+            String name, Serializer serializer, Supplier<Executor> executorSupplier) {
         AtomixAtomicCounterMap atomixAtomicCounterMap =
                 client.getResource(name, AtomixAtomicCounterMap.class)
                         .join();
+
         AsyncAtomicCounterMap<K> transcodedMap =
                 DistributedPrimitives.<K, String>newTranscodingAtomicCounterMap(
                        atomixAtomicCounterMap,
                         key -> HexString.toHexString(serializer.encode(key)),
                         string -> serializer.decode(HexString.fromHexString(string)));
-        return transcodedMap;
+
+        return new ExecutingAsyncAtomicCounterMap<>(transcodedMap, defaultExecutor(executorSupplier));
     }
 
     @Override
-    public AsyncAtomicCounter newAsyncCounter(String name) {
+    public AsyncAtomicCounter newAsyncCounter(String name, Supplier<Executor> executorSupplier) {
         DistributedLong distributedLong = client.getLong(name).join();
-        return new AtomixCounter(name, distributedLong);
+        AsyncAtomicCounter asyncCounter = new AtomixCounter(name, distributedLong);
+        return new ExecutingAsyncAtomicCounter(asyncCounter, defaultExecutor(executorSupplier));
     }
 
     @Override
-    public <V> AsyncAtomicValue<V> newAsyncAtomicValue(String name, Serializer serializer) {
-       return new DefaultAsyncAtomicValue<>(name, serializer, onosAtomicValuesMap.get());
+    public <V> AsyncAtomicValue<V> newAsyncAtomicValue(
+            String name, Serializer serializer, Supplier<Executor> executorSupplier) {
+       AsyncAtomicValue<V> asyncValue = new DefaultAsyncAtomicValue<>(name, serializer, onosAtomicValuesMap.get());
+       return new ExecutingAsyncAtomicValue<>(asyncValue, defaultExecutor(executorSupplier));
     }
 
     @Override
-    public <E> WorkQueue<E> newWorkQueue(String name, Serializer serializer) {
-        AtomixWorkQueue workQueue = client.getResource(name, AtomixWorkQueue.class).join();
-        return new DefaultDistributedWorkQueue<>(workQueue, serializer);
+    public <E> WorkQueue<E> newWorkQueue(String name, Serializer serializer, Supplier<Executor> executorSupplier) {
+        AtomixWorkQueue atomixWorkQueue = client.getResource(name, AtomixWorkQueue.class).join();
+        WorkQueue<E> workQueue = new DefaultDistributedWorkQueue<>(atomixWorkQueue, serializer);
+        return new ExecutingWorkQueue<>(workQueue, defaultExecutor(executorSupplier));
     }
 
     @Override
-    public <V> AsyncDocumentTree<V> newAsyncDocumentTree(String name, Serializer serializer) {
+    public <V> AsyncDocumentTree<V> newAsyncDocumentTree(
+            String name, Serializer serializer, Supplier<Executor> executorSupplier) {
         AtomixDocumentTree atomixDocumentTree = client.getResource(name, AtomixDocumentTree.class).join();
-        return new DefaultDistributedDocumentTree<>(name, atomixDocumentTree, serializer);
+        AsyncDocumentTree<V> asyncDocumentTree = new DefaultDistributedDocumentTree<>(
+                name, atomixDocumentTree, serializer);
+        return new ExecutingAsyncDocumentTree<>(asyncDocumentTree, defaultExecutor(executorSupplier));
     }
 
     @Override
-    public AsyncLeaderElector newAsyncLeaderElector(String name) {
+    public AsyncLeaderElector newAsyncLeaderElector(String name, Supplier<Executor> executorSupplier) {
         AtomixLeaderElector leaderElector = client.getResource(name, AtomixLeaderElector.class)
                                                   .thenCompose(AtomixLeaderElector::setupCache)
                                                   .join();
@@ -254,7 +290,7 @@
                          .forEach(listener -> listener.accept(mapper.apply(state)));
         };
         resourceClient.client().onStateChange(statusListener);
-        return leaderElector;
+        return new ExecutingAsyncLeaderElector(leaderElector, defaultExecutor(executorSupplier));
     }
 
     @Override
diff --git a/utils/misc/src/main/java/org/onlab/util/BestEffortSerialExecutor.java b/utils/misc/src/main/java/org/onlab/util/BestEffortSerialExecutor.java
new file mode 100644
index 0000000..936a33f
--- /dev/null
+++ b/utils/misc/src/main/java/org/onlab/util/BestEffortSerialExecutor.java
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onlab.util;
+
+import java.util.LinkedList;
+import java.util.concurrent.Executor;
+
+/**
+ * Executor that executes tasks in serial on a shared thread pool, falling back to parallel execution when threads
+ * are blocked.
+ * <p>
+ * This executor attempts to execute tasks in serial as if they occur on a single thread. However, in the event tasks
+ * are blocking a thread (a thread is in the {@link Thread.State#WAITING} or {@link Thread.State#TIMED_WAITING} state)
+ * the executor will execute tasks on parallel on the underlying {@link Executor}. This is useful for ensuring blocked
+ * threads cannot block events, but mimics a single-threaded model otherwise.
+ */
+public class BestEffortSerialExecutor implements Executor {
+    private final Executor parent;
+    private final LinkedList<Runnable> tasks = new LinkedList<>();
+    private volatile Thread thread;
+
+    public BestEffortSerialExecutor(Executor parent) {
+        this.parent = parent;
+    }
+
+    private void run() {
+        synchronized (tasks) {
+            thread = Thread.currentThread();
+        }
+        for (;;) {
+            if (!runTask()) {
+                synchronized (tasks) {
+                    thread = null;
+                }
+                return;
+            }
+        }
+    }
+
+    private boolean runTask() {
+        final Runnable task;
+        synchronized (tasks) {
+            task = tasks.poll();
+            if (task == null) {
+                return false;
+            }
+        }
+        task.run();
+        return true;
+    }
+
+    @Override
+    public void execute(Runnable command) {
+        synchronized (tasks) {
+            tasks.add(command);
+            if (thread == null) {
+                parent.execute(this::run);
+            } else if (thread.getState() == Thread.State.WAITING || thread.getState() == Thread.State.TIMED_WAITING) {
+                parent.execute(this::runTask);
+            }
+        }
+    }
+}
diff --git a/utils/misc/src/main/java/org/onlab/util/SerialExecutor.java b/utils/misc/src/main/java/org/onlab/util/SerialExecutor.java
new file mode 100644
index 0000000..9e54ac2
--- /dev/null
+++ b/utils/misc/src/main/java/org/onlab/util/SerialExecutor.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onlab.util;
+
+import java.util.LinkedList;
+import java.util.concurrent.Executor;
+
+/**
+ * Executor that executes tasks in serial on a shared thread pool.
+ * <p>
+ * The serial executor behaves semantically like a single-threaded executor, but multiplexes tasks on a shared thread
+ * pool, ensuring blocked threads in the shared thread pool don't block individual serial executors.
+ */
+public class SerialExecutor implements Executor {
+    private final Executor parent;
+    private final LinkedList<Runnable> tasks = new LinkedList<>();
+    private boolean running;
+
+    public SerialExecutor(Executor parent) {
+        this.parent = parent;
+    }
+
+    private void run() {
+        for (;;) {
+            final Runnable task;
+            synchronized (tasks) {
+                task = tasks.poll();
+                if (task == null) {
+                    running = false;
+                    return;
+                }
+            }
+            task.run();
+        }
+    }
+
+    @Override
+    public void execute(Runnable command) {
+        synchronized (tasks) {
+            tasks.add(command);
+            if (!running) {
+                running = true;
+                parent.execute(this::run);
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/utils/misc/src/main/java/org/onlab/util/Tools.java b/utils/misc/src/main/java/org/onlab/util/Tools.java
index cc769c4..883a7c8 100644
--- a/utils/misc/src/main/java/org/onlab/util/Tools.java
+++ b/utils/misc/src/main/java/org/onlab/util/Tools.java
@@ -39,6 +39,7 @@
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
 import java.util.concurrent.Future;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
@@ -642,6 +643,32 @@
     }
 
     /**
+     * Returns a future that's completed using the given {@link Executor} once the given {@code future} is completed.
+     * <p>
+     * {@link CompletableFuture}'s async methods cannot be relied upon to complete futures on an executor thread. If a
+     * future is completed synchronously, {@code CompletableFuture} async methods will often complete the future on the
+     * current thread, ignoring the provided {@code Executor}. This method ensures a more reliable and consistent thread
+     * model by ensuring that futures are always completed using the provided {@code Executor}.
+     *
+     * @param future the future to convert into an asynchronous future
+     * @param executor the executor with which to complete the returned future
+     * @param <T> future value type
+     * @return a new completable future to be completed using the provided {@code executor} once the provided
+     * {@code future} is complete
+     */
+    public static <T> CompletableFuture<T> asyncFuture(CompletableFuture<T> future, Executor executor) {
+        CompletableFuture<T> newFuture = new CompletableFuture<T>();
+        future.whenComplete((result, error) -> executor.execute(() -> {
+            if (future.isCompletedExceptionally()) {
+                newFuture.completeExceptionally(error);
+            } else {
+                newFuture.complete(result);
+            }
+        }));
+        return newFuture;
+    }
+
+    /**
      * Returns a new CompletableFuture completed with a list of computed values
      * when all of the given CompletableFuture complete.
      *
diff --git a/utils/misc/src/test/java/org/onlab/util/BestEffortSerialExecutorTest.java b/utils/misc/src/test/java/org/onlab/util/BestEffortSerialExecutorTest.java
new file mode 100644
index 0000000..39a1f87
--- /dev/null
+++ b/utils/misc/src/test/java/org/onlab/util/BestEffortSerialExecutorTest.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onlab.util;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Best effort serial executor test.
+ */
+public class BestEffortSerialExecutorTest {
+
+    @Test
+    public void testSerialExecution() throws Throwable {
+        Executor executor = new BestEffortSerialExecutor(SharedExecutors.getPoolThreadExecutor());
+        CountDownLatch latch = new CountDownLatch(2);
+        executor.execute(latch::countDown);
+        executor.execute(latch::countDown);
+        latch.await();
+        assertEquals(0, latch.getCount());
+    }
+
+    @Test
+    public void testBlockedExecution() throws Throwable {
+        Executor executor = new BestEffortSerialExecutor(SharedExecutors.getPoolThreadExecutor());
+        CountDownLatch latch = new CountDownLatch(3);
+        executor.execute(() -> {
+            try {
+                Thread.sleep(2000);
+                latch.countDown();
+            } catch (InterruptedException e) {
+            }
+        });
+        Thread.sleep(10);
+        executor.execute(() -> {
+            try {
+                new CompletableFuture<>().get(2, TimeUnit.SECONDS);
+            } catch (InterruptedException | ExecutionException | TimeoutException e) {
+                latch.countDown();
+            }
+        });
+        Thread.sleep(10);
+        executor.execute(latch::countDown);
+        latch.await(1, TimeUnit.SECONDS);
+        assertEquals(2, latch.getCount());
+        latch.await(3, TimeUnit.SECONDS);
+        assertEquals(0, latch.getCount());
+    }
+
+}
diff --git a/utils/misc/src/test/java/org/onlab/util/SerialExecutorTest.java b/utils/misc/src/test/java/org/onlab/util/SerialExecutorTest.java
new file mode 100644
index 0000000..140ce1f
--- /dev/null
+++ b/utils/misc/src/test/java/org/onlab/util/SerialExecutorTest.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onlab.util;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Serial executor test.
+ */
+public class SerialExecutorTest {
+
+    @Test
+    public void testSerialExecution() throws Throwable {
+        Executor executor = new SerialExecutor(SharedExecutors.getPoolThreadExecutor());
+        CountDownLatch latch = new CountDownLatch(2);
+        executor.execute(latch::countDown);
+        executor.execute(latch::countDown);
+        latch.await();
+        assertEquals(0, latch.getCount());
+    }
+
+}