Added DistributedPrimitive interface
Added AsyncDistributedSet that provides async set operations

Change-Id: I83494075a7973694ea6b7445ff4799b7a1a50641
diff --git a/apps/vtn/vtnrsc/src/test/java/org/onosproject/vtnrsc/util/VtnEventuallyConsistentMapAdapter.java b/apps/vtn/vtnrsc/src/test/java/org/onosproject/vtnrsc/util/VtnEventuallyConsistentMapAdapter.java
index 0631f86..e50ece1 100644
--- a/apps/vtn/vtnrsc/src/test/java/org/onosproject/vtnrsc/util/VtnEventuallyConsistentMapAdapter.java
+++ b/apps/vtn/vtnrsc/src/test/java/org/onosproject/vtnrsc/util/VtnEventuallyConsistentMapAdapter.java
@@ -1,114 +1,120 @@
-/*

- * Copyright 2015 Open Networking Laboratory

- *

- * Licensed under the Apache License, Version 2.0 (the "License");

- * you may not use this file except in compliance with the License.

- * You may obtain a copy of the License at

- *

- *     http://www.apache.org/licenses/LICENSE-2.0

- *

- * Unless required by applicable law or agreed to in writing, software

- * distributed under the License is distributed on an "AS IS" BASIS,

- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

- * See the License for the specific language governing permissions and

- * limitations under the License.

- */

-package org.onosproject.vtnrsc.util;

-

-import java.util.Collection;

-import java.util.Map;

-import java.util.Set;

-import java.util.function.BiFunction;

-

-import org.onosproject.store.service.EventuallyConsistentMap;

-import org.onosproject.store.service.EventuallyConsistentMapListener;

-

-/**

- * Testing adapter for EventuallyConsistentMap.

- */

-public class VtnEventuallyConsistentMapAdapter<K, V> implements EventuallyConsistentMap<K, V> {

-    @Override

-    public int size() {

-        return 0;

-    }

-

-    @Override

-    public boolean isEmpty() {

-        return false;

-    }

-

-    @Override

-    public boolean containsKey(K key) {

-        return false;

-    }

-

-    @Override

-    public boolean containsValue(V value) {

-        return false;

-    }

-

-    @Override

-    public V get(K key) {

-        return null;

-    }

-

-    @Override

-    public void put(K key, V value) {

-

-    }

-

-    @Override

-    public V remove(K key) {

-        return null;

-    }

-

-    @Override

-    public void remove(K key, V value) {

-

-    }

-

-    @Override

-    public V compute(K key, BiFunction<K, V, V> recomputeFunction) {

-        return null;

-    }

-

-    @Override

-    public void putAll(Map<? extends K, ? extends V> m) {

-

-    }

-

-    @Override

-    public void clear() {

-

-    }

-

-    @Override

-    public Set<K> keySet() {

-        return null;

-    }

-

-    @Override

-    public Collection<V> values() {

-        return null;

-    }

-

-    @Override

-    public Set<Map.Entry<K, V>> entrySet() {

-        return null;

-    }

-

-    @Override

-    public void addListener(EventuallyConsistentMapListener<K, V> listener) {

-

-    }

-

-    @Override

-    public void removeListener(EventuallyConsistentMapListener<K, V> listener) {

-

-    }

-

-    @Override

-    public void destroy() {

-

-    }

-}

+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.vtnrsc.util;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.BiFunction;
+
+import org.onosproject.store.service.EventuallyConsistentMap;
+import org.onosproject.store.service.EventuallyConsistentMapListener;
+
+/**
+ * Testing adapter for EventuallyConsistentMap.
+ */
+public class VtnEventuallyConsistentMapAdapter<K, V> implements EventuallyConsistentMap<K, V> {
+
+    @Override
+    public String name() {
+        return null;
+    }
+
+    @Override
+    public int size() {
+        return 0;
+    }
+
+    @Override
+    public boolean isEmpty() {
+        return false;
+    }
+
+    @Override
+    public boolean containsKey(K key) {
+        return false;
+    }
+
+    @Override
+    public boolean containsValue(V value) {
+        return false;
+    }
+
+    @Override
+    public V get(K key) {
+        return null;
+    }
+
+    @Override
+    public void put(K key, V value) {
+
+    }
+
+    @Override
+    public V remove(K key) {
+        return null;
+    }
+
+    @Override
+    public void remove(K key, V value) {
+
+    }
+
+    @Override
+    public V compute(K key, BiFunction<K, V, V> recomputeFunction) {
+        return null;
+    }
+
+    @Override
+    public void putAll(Map<? extends K, ? extends V> m) {
+
+    }
+
+    @Override
+    public void clear() {
+
+    }
+
+    @Override
+    public Set<K> keySet() {
+        return null;
+    }
+
+    @Override
+    public Collection<V> values() {
+        return null;
+    }
+
+    @Override
+    public Set<Map.Entry<K, V>> entrySet() {
+        return null;
+    }
+
+    @Override
+    public void addListener(EventuallyConsistentMapListener<K, V> listener) {
+
+    }
+
+    @Override
+    public void removeListener(EventuallyConsistentMapListener<K, V> listener) {
+
+    }
+
+    @Override
+    public void destroy() {
+
+    }
+}
diff --git a/core/api/src/main/java/org/onosproject/store/service/AsyncAtomicCounter.java b/core/api/src/main/java/org/onosproject/store/service/AsyncAtomicCounter.java
index cfaf314..aa20f4e 100644
--- a/core/api/src/main/java/org/onosproject/store/service/AsyncAtomicCounter.java
+++ b/core/api/src/main/java/org/onosproject/store/service/AsyncAtomicCounter.java
@@ -20,7 +20,12 @@
 /**
  * An async atomic counter dispenses monotonically increasing values.
  */
-public interface AsyncAtomicCounter {
+public interface AsyncAtomicCounter extends DistributedPrimitive {
+
+    @Override
+    default DistributedPrimitive.Type type() {
+        return DistributedPrimitive.Type.COUNTER;
+    }
 
     /**
      * Atomically increment by one the current value.
diff --git a/core/api/src/main/java/org/onosproject/store/service/AsyncAtomicValue.java b/core/api/src/main/java/org/onosproject/store/service/AsyncAtomicValue.java
index 531721a..60d8337 100644
--- a/core/api/src/main/java/org/onosproject/store/service/AsyncAtomicValue.java
+++ b/core/api/src/main/java/org/onosproject/store/service/AsyncAtomicValue.java
@@ -27,7 +27,12 @@
  *
  * @param <V> value type
  */
-public interface AsyncAtomicValue<V> {
+public interface AsyncAtomicValue<V> extends DistributedPrimitive {
+
+    @Override
+    default DistributedPrimitive.Type type() {
+        return DistributedPrimitive.Type.VALUE;
+    }
 
     /**
      * Atomically sets the value to the given updated value if the current value is equal to the expected value.
diff --git a/core/api/src/main/java/org/onosproject/store/service/AsyncConsistentMap.java b/core/api/src/main/java/org/onosproject/store/service/AsyncConsistentMap.java
index c7b6eac..d83c553 100644
--- a/core/api/src/main/java/org/onosproject/store/service/AsyncConsistentMap.java
+++ b/core/api/src/main/java/org/onosproject/store/service/AsyncConsistentMap.java
@@ -49,7 +49,12 @@
  * the returned future will be {@link CompletableFuture#complete completed} when the
  * operation finishes.
  */
-public interface AsyncConsistentMap<K, V> {
+public interface AsyncConsistentMap<K, V> extends DistributedPrimitive {
+
+    @Override
+    default DistributedPrimitive.Type type() {
+        return DistributedPrimitive.Type.CONSISTENT_MAP;
+    }
 
     /**
      * Returns the number of entries in the map.
diff --git a/core/api/src/main/java/org/onosproject/store/service/AsyncDistributedSet.java b/core/api/src/main/java/org/onosproject/store/service/AsyncDistributedSet.java
new file mode 100644
index 0000000..df63f99
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/store/service/AsyncDistributedSet.java
@@ -0,0 +1,129 @@
+/*
+ * Copyright 2015-2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.service;
+
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A distributed collection designed for holding unique elements.
+ * <p>
+ * All methods of {@code AsyncDistributedSet} immediately return a {@link CompletableFuture future}.
+ * The returned future will be {@link CompletableFuture#complete completed} when the operation
+ * completes.
+ *
+ * @param <E> set entry type
+ */
+public interface AsyncDistributedSet<E> extends DistributedPrimitive {
+
+    @Override
+    default DistributedPrimitive.Type type() {
+        return DistributedPrimitive.Type.SET;
+    }
+
+    /**
+     * Registers the specified listener to be notified whenever
+     * the set is updated.
+     *
+     * @param listener listener to notify about set update events
+     */
+    CompletableFuture<Void> addListener(SetEventListener<E> listener);
+
+    /**
+     * Unregisters the specified listener.
+     *
+     * @param listener listener to unregister.
+     * @return CompletableFuture that is completed when the operation completes
+     */
+    CompletableFuture<Void> removeListener(SetEventListener<E> listener);
+
+    /**
+     * Adds the specified element to this set if it is not already present (optional operation).
+     * @param element element to add
+     * @return {@code true} if this set did not already contain the specified element.
+     */
+    CompletableFuture<Boolean> add(E element);
+
+    /**
+     * Removes the specified element to this set if it is present (optional operation).
+     * @param element element to remove
+     * @return {@code true} if this set contained the specified element
+     */
+    CompletableFuture<Boolean> remove(E element);
+
+    /**
+     * Returns the number of elements in the set.
+     * @return size of the set
+     */
+    CompletableFuture<Integer> size();
+
+    /**
+     * Returns if the set is empty.
+     * @return {@code true} if this set is empty
+     */
+    CompletableFuture<Boolean> isEmpty();
+
+    /**
+     * Removes all elements from the set.
+     */
+    CompletableFuture<Void> clear();
+
+    /**
+     * Returns if this set contains the specified element.
+     * @param element element to check
+     * @return {@code true} if this set contains the specified element
+     */
+    CompletableFuture<Boolean> contains(E element);
+
+    /**
+     * Adds all of the elements in the specified collection to this set if they're not
+     * already present (optional operation).
+     * @param c collection containing elements to be added to this set
+     * @return {@code true} if this set contains all elements in the collection
+     */
+    CompletableFuture<Boolean> addAll(Collection<? extends E> c);
+
+    /**
+     * Returns if this set contains all the elements in specified collection.
+     * @param c collection
+     * @return {@code true} if this set contains all elements in the collection
+     */
+    CompletableFuture<Boolean> containsAll(Collection<? extends E> c);
+
+    /**
+     * Retains only the elements in this set that are contained in the specified collection (optional operation).
+     * @param c collection containing elements to be retained in this set
+     * @return {@code true} if this set changed as a result of the call
+     */
+    CompletableFuture<Boolean> retainAll(Collection<? extends E> c);
+
+    /**
+     * Removes from this set all of its elements that are contained in the specified collection (optional operation).
+     * If the specified collection is also a set, this operation effectively modifies this set so that its
+     * value is the asymmetric set difference of the two sets.
+     * @param c collection containing elements to be removed from this set
+     * @return {@code true} if this set changed as a result of the call
+     */
+    CompletableFuture<Boolean> removeAll(Collection<? extends E> c);
+
+    /**
+     * Returns the entries as a immutable set. The returned set is a snapshot and will not reflect new changes made to
+     * this AsyncDistributedSet
+     * @return immutable set copy
+     */
+    CompletableFuture<? extends Set<E>> getAsImmutableSet();
+}
diff --git a/core/api/src/main/java/org/onosproject/store/service/AtomicCounter.java b/core/api/src/main/java/org/onosproject/store/service/AtomicCounter.java
index 3c9e02c..592f67a0 100644
--- a/core/api/src/main/java/org/onosproject/store/service/AtomicCounter.java
+++ b/core/api/src/main/java/org/onosproject/store/service/AtomicCounter.java
@@ -18,7 +18,12 @@
 /**
  * Distributed version of java.util.concurrent.atomic.AtomicLong.
  */
-public interface AtomicCounter {
+public interface AtomicCounter extends DistributedPrimitive {
+
+    @Override
+    default DistributedPrimitive.Type type() {
+        return DistributedPrimitive.Type.COUNTER;
+    }
 
     /**
      * Atomically increment by one the current value.
diff --git a/core/api/src/main/java/org/onosproject/store/service/AtomicValue.java b/core/api/src/main/java/org/onosproject/store/service/AtomicValue.java
index dfa0fb3..bb9b56b 100644
--- a/core/api/src/main/java/org/onosproject/store/service/AtomicValue.java
+++ b/core/api/src/main/java/org/onosproject/store/service/AtomicValue.java
@@ -20,7 +20,7 @@
  *
  * @param <V> value type
  */
-public interface AtomicValue<V> {
+public interface AtomicValue<V> extends DistributedPrimitive  {
 
     /**
      * Atomically sets the value to the given updated value if the current value is equal to the expected value.
diff --git a/core/api/src/main/java/org/onosproject/store/service/ConsistentMap.java b/core/api/src/main/java/org/onosproject/store/service/ConsistentMap.java
index 93abf78..19f8954 100644
--- a/core/api/src/main/java/org/onosproject/store/service/ConsistentMap.java
+++ b/core/api/src/main/java/org/onosproject/store/service/ConsistentMap.java
@@ -44,7 +44,7 @@
  * (which extends RuntimeException) to indicate failures.
  *
  */
-public interface ConsistentMap<K, V> {
+public interface ConsistentMap<K, V> extends DistributedPrimitive {
 
     /**
      * Returns the number of entries in the map.
diff --git a/core/api/src/main/java/org/onosproject/store/service/DistributedPrimitive.java b/core/api/src/main/java/org/onosproject/store/service/DistributedPrimitive.java
new file mode 100644
index 0000000..342a110
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/store/service/DistributedPrimitive.java
@@ -0,0 +1,78 @@
+/*
+ * Copyright 2015-2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.service;
+
+import org.onosproject.core.ApplicationId;
+
+/**
+ * Interface for all distributed primitives.
+ */
+public interface DistributedPrimitive {
+
+    /**
+     * Type of distributed primitive.
+     */
+    public enum Type {
+        /**
+         * Map with strong consistency semantics.
+         */
+        CONSISTENT_MAP,
+
+        /**
+         * Map with eventual consistency semantics.
+         */
+        EVENTUALLY_CONSISTENT_MAP,
+
+        /**
+         * distributed set.
+         */
+        SET,
+
+        /**
+         * atomic counter.
+         */
+        COUNTER,
+
+        /**
+         * Atomic value.
+         */
+        VALUE,
+
+        /**
+         * Distributed queue.
+         */
+        QUEUE
+    }
+
+    /**
+     * Returns the name of this primitive.
+     * @return name
+     */
+    String name();
+
+    /**
+     * Returns the type of primitive.
+     * @return primitive type
+     */
+    Type type();
+
+    /**
+     * Returns the application owning this primitive.
+     */
+    default ApplicationId applicationId() {
+        return null;
+    }
+}
diff --git a/core/api/src/main/java/org/onosproject/store/service/DistributedQueue.java b/core/api/src/main/java/org/onosproject/store/service/DistributedQueue.java
index cc0b00d..777152c 100644
--- a/core/api/src/main/java/org/onosproject/store/service/DistributedQueue.java
+++ b/core/api/src/main/java/org/onosproject/store/service/DistributedQueue.java
@@ -24,7 +24,7 @@
  *
  * @param <E> queue entry type
  */
-public interface DistributedQueue<E> {
+public interface DistributedQueue<E> extends DistributedPrimitive {
 
     /**
      * Returns total number of entries in the queue.
diff --git a/core/api/src/main/java/org/onosproject/store/service/DistributedSet.java b/core/api/src/main/java/org/onosproject/store/service/DistributedSet.java
index 460206e..c59462b 100644
--- a/core/api/src/main/java/org/onosproject/store/service/DistributedSet.java
+++ b/core/api/src/main/java/org/onosproject/store/service/DistributedSet.java
@@ -22,7 +22,7 @@
  *
  * @param <E> set entry type
  */
-public interface DistributedSet<E> extends Set<E> {
+public interface DistributedSet<E> extends Set<E>, DistributedPrimitive {
 
     /**
      * Registers the specified listener to be notified whenever
diff --git a/core/api/src/main/java/org/onosproject/store/service/DistributedSetBuilder.java b/core/api/src/main/java/org/onosproject/store/service/DistributedSetBuilder.java
index f5a44c9..09ba794 100644
--- a/core/api/src/main/java/org/onosproject/store/service/DistributedSetBuilder.java
+++ b/core/api/src/main/java/org/onosproject/store/service/DistributedSetBuilder.java
@@ -129,4 +129,13 @@
      * @throws java.lang.RuntimeException if a mandatory parameter is missing
      */
     DistributedSet<E> build();
+
+    /**
+     * Builds an {@link AsyncDistributedSet async set} based on the configuration options
+     * supplied to this builder.
+     *
+     * @return new AsyncDistributedSet
+     * @throws java.lang.RuntimeException if a mandatory parameter is missing
+     */
+    AsyncDistributedSet<E> buildAsyncSet();
 }
diff --git a/core/api/src/main/java/org/onosproject/store/service/EventuallyConsistentMap.java b/core/api/src/main/java/org/onosproject/store/service/EventuallyConsistentMap.java
index 06395b8..2369619 100644
--- a/core/api/src/main/java/org/onosproject/store/service/EventuallyConsistentMap.java
+++ b/core/api/src/main/java/org/onosproject/store/service/EventuallyConsistentMap.java
@@ -39,7 +39,12 @@
  * Null values are not allowed in this map.
  * </p>
  */
-public interface EventuallyConsistentMap<K, V> {
+public interface EventuallyConsistentMap<K, V> extends DistributedPrimitive {
+
+    @Override
+    default DistributedPrimitive.Type type() {
+        return DistributedPrimitive.Type.EVENTUALLY_CONSISTENT_MAP;
+    }
 
     /**
      * Returns the number of key-value mappings in this map.
diff --git a/core/api/src/main/java/org/onosproject/store/service/Synchronous.java b/core/api/src/main/java/org/onosproject/store/service/Synchronous.java
new file mode 100644
index 0000000..be65382
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/store/service/Synchronous.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2015-2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.service;
+
+/**
+ * DistributedPrimitive that is a synchronous (blocking) version of
+ * another.
+ *
+ * @param <T> type of DistributedPrimitive
+ */
+public abstract class Synchronous<T extends DistributedPrimitive> implements DistributedPrimitive {
+
+    private final T primitive;
+
+    public Synchronous(T primitive) {
+        this.primitive = primitive;
+    }
+
+    @Override
+    public String name() {
+        return primitive.name();
+    }
+
+    @Override
+    public Type type() {
+        return primitive.type();
+    }
+}
diff --git a/core/api/src/test/java/org/onosproject/store/service/ConsistentMapAdapter.java b/core/api/src/test/java/org/onosproject/store/service/ConsistentMapAdapter.java
index a7a6ce8..ba9cfc2 100644
--- a/core/api/src/test/java/org/onosproject/store/service/ConsistentMapAdapter.java
+++ b/core/api/src/test/java/org/onosproject/store/service/ConsistentMapAdapter.java
@@ -26,6 +26,17 @@
  * Testing adapter for the consistent map.
  */
 public class ConsistentMapAdapter<K, V> implements ConsistentMap<K, V> {
+
+    @Override
+    public String name() {
+        return null;
+    }
+
+    @Override
+    public DistributedPrimitive.Type type() {
+        return DistributedPrimitive.Type.CONSISTENT_MAP;
+    }
+
     @Override
     public int size() {
         return 0;
diff --git a/core/api/src/test/java/org/onosproject/store/service/EventuallyConsistentMapAdapter.java b/core/api/src/test/java/org/onosproject/store/service/EventuallyConsistentMapAdapter.java
index 07f5fb4..1f4af9e 100644
--- a/core/api/src/test/java/org/onosproject/store/service/EventuallyConsistentMapAdapter.java
+++ b/core/api/src/test/java/org/onosproject/store/service/EventuallyConsistentMapAdapter.java
@@ -20,10 +20,23 @@
 import java.util.Set;
 import java.util.function.BiFunction;
 
+import org.onosproject.store.service.DistributedPrimitive.Type;
+
 /**
  * Testing adapter for EventuallyConsistentMap.
  */
 public class EventuallyConsistentMapAdapter<K, V> implements EventuallyConsistentMap<K, V> {
+
+    @Override
+    public String name() {
+        return null;
+    }
+
+    @Override
+    public Type type() {
+        return Type.EVENTUALLY_CONSISTENT_MAP;
+    }
+
     @Override
     public int size() {
         return 0;
diff --git a/core/api/src/test/java/org/onosproject/store/service/TestAtomicCounter.java b/core/api/src/test/java/org/onosproject/store/service/TestAtomicCounter.java
index 8c577df..337c090 100644
--- a/core/api/src/test/java/org/onosproject/store/service/TestAtomicCounter.java
+++ b/core/api/src/test/java/org/onosproject/store/service/TestAtomicCounter.java
@@ -23,6 +23,16 @@
 public final class TestAtomicCounter implements AtomicCounter {
     final AtomicLong value;
 
+    @Override
+    public String name() {
+        return null;
+    }
+
+    @Override
+    public Type type() {
+        return Type.COUNTER;
+    }
+
     private TestAtomicCounter() {
         value = new AtomicLong();
     }
diff --git a/core/api/src/test/java/org/onosproject/store/service/TestConsistentMap.java b/core/api/src/test/java/org/onosproject/store/service/TestConsistentMap.java
index 57b36ab..4133095 100644
--- a/core/api/src/test/java/org/onosproject/store/service/TestConsistentMap.java
+++ b/core/api/src/test/java/org/onosproject/store/service/TestConsistentMap.java
@@ -28,7 +28,7 @@
 import java.util.stream.Collectors;
 
 import org.onosproject.core.ApplicationId;
-import static org.onosproject.store.service.MapEvent.Type;
+
 import static org.onosproject.store.service.MapEvent.Type.*;
 
 /**
@@ -37,7 +37,7 @@
 public final class TestConsistentMap<K, V> extends ConsistentMapAdapter<K, V> {
 
     private final List<MapEventListener<K, V>> listeners;
-    private final HashMap<K, V> map;
+    private final Map<K, V> map;
     private final String mapName;
     private final AtomicLong counter = new AtomicLong(0);
 
@@ -54,7 +54,7 @@
     /**
      * Notify all listeners of an event.
      */
-    private void notifyListeners(String mapName, Type type,
+    private void notifyListeners(String mapName, MapEvent.Type type,
                                  K key, Versioned<V> value) {
         MapEvent<K, V> event = new MapEvent<>(mapName, type, key, value);
         listeners.forEach(
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncAtomicCounter.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncAtomicCounter.java
index d851eaa..e6972c2 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncAtomicCounter.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncAtomicCounter.java
@@ -51,6 +51,11 @@
     }
 
     @Override
+    public String name() {
+        return name;
+    }
+
+    @Override
     public CompletableFuture<Long> incrementAndGet() {
         final MeteringAgent.Context timer = monitor.startTimer(INCREMENT_AND_GET);
         return addAndGet(1L)
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncAtomicValue.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncAtomicValue.java
index 454d46c..c223565 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncAtomicValue.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncAtomicValue.java
@@ -55,6 +55,11 @@
     }
 
     @Override
+    public String name() {
+        return name;
+    }
+
+    @Override
     public CompletableFuture<Boolean> compareAndSet(V expect, V update) {
         final MeteringAgent.Context newTimer = monitor.startTimer(COMPARE_AND_SET);
         CompletableFuture<Boolean> response;
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java
index 46a097c..b99bfbf 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java
@@ -171,6 +171,7 @@
      * Returns this map name.
      * @return map name
      */
+    @Override
     public String name() {
         return name;
     }
@@ -187,6 +188,7 @@
      * Returns the applicationId owning this map.
      * @return application Id
      */
+    @Override
     public ApplicationId applicationId() {
         return applicationId;
     }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncDistributedSet.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncDistributedSet.java
new file mode 100644
index 0000000..d67b635
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncDistributedSet.java
@@ -0,0 +1,167 @@
+/*
+ * Copyright 2015-2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.consistent.impl;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import org.onlab.util.Tools;
+import org.onosproject.store.service.AsyncConsistentMap;
+import org.onosproject.store.service.AsyncDistributedSet;
+import org.onosproject.store.service.MapEvent;
+import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.SetEvent;
+import org.onosproject.store.service.SetEventListener;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+/**
+ * Implementation of {@link AsyncDistributedSet}.
+ *
+ * @param <E> set entry type
+ */
+public class DefaultAsyncDistributedSet<E> implements AsyncDistributedSet<E> {
+
+    private static final String CONTAINS = "contains";
+    private static final String PRIMITIVE_NAME = "distributedSet";
+    private static final String SIZE = "size";
+    private static final String IS_EMPTY = "isEmpty";
+    private static final String ADD = "add";
+    private static final String REMOVE = "remove";
+    private static final String CONTAINS_ALL = "containsAll";
+    private static final String ADD_ALL = "addAll";
+    private static final String RETAIN_ALL = "retainAll";
+    private static final String REMOVE_ALL = "removeAll";
+    private static final String CLEAR = "clear";
+    private static final String GET_AS_IMMUTABLE_SET = "getAsImmutableSet";
+
+    private final String name;
+    private final AsyncConsistentMap<E, Boolean> backingMap;
+    private final Map<SetEventListener<E>, MapEventListener<E, Boolean>> listenerMapping = Maps.newIdentityHashMap();
+    private final MeteringAgent monitor;
+
+    public DefaultAsyncDistributedSet(AsyncConsistentMap<E, Boolean> backingMap, String name, boolean meteringEnabled) {
+        this.backingMap = backingMap;
+        this.name = name;
+        monitor = new MeteringAgent(PRIMITIVE_NAME, name, meteringEnabled);
+    }
+
+    @Override
+    public String name() {
+        return name;
+    }
+
+    @Override
+    public CompletableFuture<Integer> size() {
+        final MeteringAgent.Context timer = monitor.startTimer(SIZE);
+        return backingMap.size().whenComplete((r, e) -> timer.stop(null));
+    }
+
+    @Override
+    public CompletableFuture<Boolean> isEmpty() {
+        final MeteringAgent.Context timer = monitor.startTimer(IS_EMPTY);
+        return backingMap.isEmpty().whenComplete((r, e) -> timer.stop(null));
+    }
+
+    @Override
+    public CompletableFuture<Boolean> contains(E element) {
+        final MeteringAgent.Context timer = monitor.startTimer(CONTAINS);
+        return backingMap.containsKey(element).whenComplete((r, e) -> timer.stop(null));
+    }
+
+    @Override
+    public CompletableFuture<Boolean> add(E entry) {
+        final MeteringAgent.Context timer = monitor.startTimer(ADD);
+        return backingMap.putIfAbsent(entry, true).thenApply(Objects::isNull).whenComplete((r, e) -> timer.stop(null));
+    }
+
+    @Override
+    public CompletableFuture<Boolean> remove(E entry) {
+        final MeteringAgent.Context timer = monitor.startTimer(REMOVE);
+        return backingMap.remove(entry, true).whenComplete((r, e) -> timer.stop(null));
+    }
+
+    @Override
+    public CompletableFuture<Boolean> containsAll(Collection<? extends E> c) {
+        final MeteringAgent.Context timer = monitor.startTimer(CONTAINS_ALL);
+        return Tools.allOf(c.stream().map(this::contains).collect(Collectors.toList())).thenApply(v ->
+            v.stream().reduce(Boolean::logicalAnd).orElse(true)).whenComplete((r, e) -> timer.stop(null));
+    }
+
+    @Override
+    public CompletableFuture<Boolean> addAll(Collection<? extends E> c) {
+        final MeteringAgent.Context timer = monitor.startTimer(ADD_ALL);
+        return Tools.allOf(c.stream().map(this::add).collect(Collectors.toList())).thenApply(v ->
+            v.stream().reduce(Boolean::logicalOr).orElse(false)).whenComplete((r, e) -> timer.stop(null));
+    }
+
+    @Override
+    public CompletableFuture<Boolean> retainAll(Collection<? extends E> c) {
+        final MeteringAgent.Context timer = monitor.startTimer(RETAIN_ALL);
+        return backingMap.keySet().thenApply(set -> Sets.difference(set, Sets.newHashSet(c)))
+                                  .thenCompose(this::removeAll)
+                                  .whenComplete((r, e) -> timer.stop(null));
+    }
+
+    @Override
+    public CompletableFuture<Boolean> removeAll(Collection<? extends E> c) {
+        final MeteringAgent.Context timer = monitor.startTimer(REMOVE_ALL);
+        return Tools.allOf(c.stream().map(this::remove).collect(Collectors.toList())).thenApply(v ->
+            v.stream().reduce(Boolean::logicalOr).orElse(false)).whenComplete((r, e) -> timer.stop(null));
+    }
+
+    @Override
+    public CompletableFuture<Void> clear() {
+        final MeteringAgent.Context timer = monitor.startTimer(CLEAR);
+        return backingMap.clear().whenComplete((r, e) -> timer.stop(null));
+    }
+
+    @Override
+    public CompletableFuture<? extends Set<E>> getAsImmutableSet() {
+        final MeteringAgent.Context timer = monitor.startTimer(GET_AS_IMMUTABLE_SET);
+        return backingMap.keySet().thenApply(s -> ImmutableSet.copyOf(s)).whenComplete((r, e) -> timer.stop(null));
+    }
+
+    @Override
+    public CompletableFuture<Void> addListener(SetEventListener<E> listener) {
+        MapEventListener<E, Boolean> mapEventListener = mapEvent -> {
+            if (mapEvent.type() == MapEvent.Type.INSERT) {
+                listener.event(new SetEvent<>(name, SetEvent.Type.ADD, mapEvent.key()));
+            } else if (mapEvent.type() == MapEvent.Type.REMOVE) {
+                listener.event(new SetEvent<>(name, SetEvent.Type.REMOVE, mapEvent.key()));
+            }
+        };
+        if (listenerMapping.putIfAbsent(listener, mapEventListener) == null) {
+            return backingMap.addListener(mapEventListener);
+        }
+        return CompletableFuture.completedFuture(null);
+    }
+
+    @Override
+    public CompletableFuture<Void> removeListener(SetEventListener<E> listener) {
+        MapEventListener<E, Boolean> mapEventListener = listenerMapping.remove(listener);
+        if (mapEventListener != null) {
+            return backingMap.removeListener(mapEventListener);
+        }
+        return CompletableFuture.completedFuture(null);
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicCounter.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicCounter.java
index 2d6a956..00a98a9 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicCounter.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicCounter.java
@@ -18,6 +18,7 @@
 import org.onosproject.store.service.AsyncAtomicCounter;
 import org.onosproject.store.service.AtomicCounter;
 import org.onosproject.store.service.StorageException;
+import org.onosproject.store.service.Synchronous;
 
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
@@ -30,16 +31,15 @@
  * <p>
  * The initial value will be zero.
  */
-public class DefaultAtomicCounter implements AtomicCounter {
+public class DefaultAtomicCounter extends Synchronous<AsyncAtomicCounter> implements AtomicCounter  {
 
     private static final int OPERATION_TIMEOUT_MILLIS = 5000;
 
     private final AsyncAtomicCounter asyncCounter;
 
-    public DefaultAtomicCounter(String name,
-                                Database database,
-                                boolean meteringEnabled) {
-        asyncCounter = new DefaultAsyncAtomicCounter(name, database, meteringEnabled);
+    public DefaultAtomicCounter(AsyncAtomicCounter asyncCounter) {
+        super(asyncCounter);
+        this.asyncCounter = asyncCounter;
     }
 
     @Override
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicCounterBuilder.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicCounterBuilder.java
index dba4443..6596f36 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicCounterBuilder.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicCounterBuilder.java
@@ -53,9 +53,7 @@
 
     @Override
     public AtomicCounter build() {
-        validateInputs();
-        Database database = partitionsEnabled ? partitionedDatabase : inMemoryDatabase;
-        return new DefaultAtomicCounter(name, database, metering);
+        return new DefaultAtomicCounter(buildAsyncCounter());
     }
 
     @Override
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicValue.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicValue.java
index 20bfd5f9..f61d330 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicValue.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicValue.java
@@ -17,10 +17,12 @@
 
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+
 import org.onosproject.store.service.AsyncAtomicValue;
 import org.onosproject.store.service.AtomicValue;
 import org.onosproject.store.service.AtomicValueEventListener;
 import org.onosproject.store.service.StorageException;
+import org.onosproject.store.service.Synchronous;
 
 import com.google.common.util.concurrent.Futures;
 
@@ -29,12 +31,13 @@
  *
  * @param <V> value type
  */
-public class DefaultAtomicValue<V> implements AtomicValue<V> {
+public class DefaultAtomicValue<V> extends Synchronous<AsyncAtomicValue<V>> implements AtomicValue<V> {
 
     private static final int OPERATION_TIMEOUT_MILLIS = 5000;
     private final AsyncAtomicValue<V> asyncValue;
 
     public DefaultAtomicValue(AsyncAtomicValue<V> asyncValue) {
+        super(asyncValue);
         this.asyncValue = asyncValue;
     }
 
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultConsistentMap.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultConsistentMap.java
index 7841c16..dd8a5a9 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultConsistentMap.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultConsistentMap.java
@@ -28,9 +28,11 @@
 import java.util.function.Predicate;
 import java.util.Set;
 
+import org.onosproject.store.service.AsyncConsistentMap;
 import org.onosproject.store.service.ConsistentMap;
 import org.onosproject.store.service.ConsistentMapException;
 import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.Synchronous;
 import org.onosproject.store.service.Versioned;
 
 /**
@@ -40,18 +42,15 @@
  * @param <K> type of key.
  * @param <V> type of value.
  */
-public class DefaultConsistentMap<K, V> implements ConsistentMap<K, V> {
+public class DefaultConsistentMap<K, V> extends Synchronous<AsyncConsistentMap<K, V>> implements ConsistentMap<K, V> {
 
     private static final int OPERATION_TIMEOUT_MILLIS = 5000;
 
     private final DefaultAsyncConsistentMap<K, V> asyncMap;
     private Map<K, V> javaMap;
 
-    public String name() {
-        return asyncMap.name();
-    }
-
     public DefaultConsistentMap(DefaultAsyncConsistentMap<K, V> asyncMap) {
+        super(asyncMap);
         this.asyncMap = asyncMap;
     }
 
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedQueue.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedQueue.java
index 5f69fde..edcd26a 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedQueue.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedQueue.java
@@ -19,12 +19,14 @@
 import com.google.common.util.concurrent.Futures;
 
 import org.onlab.util.SharedExecutors;
+import org.onosproject.store.service.DistributedPrimitive;
 import org.onosproject.store.service.DistributedQueue;
 import org.onosproject.store.service.Serializer;
 
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+
 import static com.google.common.base.Preconditions.checkNotNull;
 import static org.onosproject.store.consistent.impl.StateMachineUpdate.Target.QUEUE_PUSH;
 
@@ -108,10 +110,16 @@
                                             .whenComplete((r, e) -> timer.stop(e)));
     }
 
+    @Override
     public String name() {
         return name;
     }
 
+    @Override
+    public DistributedPrimitive.Type type() {
+        return DistributedPrimitive.Type.QUEUE;
+    }
+
     protected void tryPoll() {
         Set<CompletableFuture<E>> completedFutures = Sets.newHashSet();
         for (CompletableFuture<E> future : pendingFutures) {
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedSet.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedSet.java
index 42ed615..5092892 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedSet.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedSet.java
@@ -15,227 +15,138 @@
  */
 package org.onosproject.store.consistent.impl;
 
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.AsyncDistributedSet;
+import org.onosproject.store.service.ConsistentMapException;
 import org.onosproject.store.service.DistributedSet;
-import org.onosproject.store.service.MapEvent;
-import org.onosproject.store.service.MapEventListener;
-import org.onosproject.store.service.SetEvent;
 import org.onosproject.store.service.SetEventListener;
+import org.onosproject.store.service.Synchronous;
 
 import java.lang.reflect.Array;
 import java.util.Collection;
 import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 /**
- * Implementation of distributed set that is backed by a ConsistentMap.
+ * Implementation of {@link DistributedSet} that merely delegates to a {@link AsyncDistributedSet}
+ * and waits for the operation to complete.
 
  * @param <E> set element type
  */
-public class DefaultDistributedSet<E> implements DistributedSet<E> {
+public class DefaultDistributedSet<E> extends Synchronous<AsyncDistributedSet<E>> implements DistributedSet<E> {
 
-    private static final String CONTAINS = "contains";
-    private static final String PRIMITIVE_NAME = "distributedSet";
-    private static final String SIZE = "size";
-    private static final String IS_EMPTY = "isEmpty";
-    private static final String ITERATOR = "iterator";
-    private static final String TO_ARRAY = "toArray";
-    private static final String ADD = "add";
-    private static final String REMOVE = "remove";
-    private static final String CONTAINS_ALL = "containsAll";
-    private static final String ADD_ALL = "addAll";
-    private static final String RETAIN_ALL = "retainAll";
-    private static final String REMOVE_ALL = "removeAll";
-    private static final String CLEAR = "clear";
+    private static final long OPERATION_TIMEOUT_MILLIS = 5000;
 
-    private final String name;
-    private final ConsistentMap<E, Boolean> backingMap;
-    private final Map<SetEventListener<E>, MapEventListener<E, Boolean>> listenerMapping = Maps.newIdentityHashMap();
-    private final MeteringAgent monitor;
+    private final AsyncDistributedSet<E> asyncSet;
 
-    public DefaultDistributedSet(String name, boolean meteringEnabled, ConsistentMap<E, Boolean> backingMap) {
-        this.name = name;
-        this.backingMap = backingMap;
-        monitor = new MeteringAgent(PRIMITIVE_NAME, name, meteringEnabled);
+    public DefaultDistributedSet(AsyncDistributedSet<E> asyncSet) {
+        super(asyncSet);
+        this.asyncSet = asyncSet;
+    }
+
+    private static <T> T complete(CompletableFuture<T> future) {
+        try {
+            return future.get(OPERATION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new ConsistentMapException.Interrupted();
+        } catch (TimeoutException e) {
+            throw new ConsistentMapException.Timeout();
+        } catch (ExecutionException e) {
+            if (e.getCause() instanceof ConsistentMapException) {
+                throw (ConsistentMapException) e.getCause();
+            } else {
+                throw new ConsistentMapException(e.getCause());
+            }
+        }
     }
 
     @Override
     public int size() {
-        final MeteringAgent.Context timer = monitor.startTimer(SIZE);
-        try {
-            return backingMap.size();
-        } finally {
-            timer.stop(null);
-        }
+        return complete(asyncSet.size());
     }
 
     @Override
     public boolean isEmpty() {
-        final MeteringAgent.Context timer = monitor.startTimer(IS_EMPTY);
-        try {
-            return backingMap.isEmpty();
-        } finally {
-            timer.stop(null);
-        }
+        return complete(asyncSet.isEmpty());
     }
 
     @SuppressWarnings("unchecked")
     @Override
     public boolean contains(Object o) {
-        final MeteringAgent.Context timer = monitor.startTimer(CONTAINS);
-        try {
-            return backingMap.containsKey((E) o);
-        } finally {
-            timer.stop(null);
-        }
+        return complete(asyncSet.contains((E) o));
     }
 
     @Override
     public Iterator<E> iterator() {
-        final MeteringAgent.Context timer = monitor.startTimer(ITERATOR);
-        //Do we have to measure this guy?
-        try {
-            return backingMap.keySet().iterator();
-        } finally {
-            timer.stop(null);
-        }
+        return complete(asyncSet.getAsImmutableSet()).iterator();
     }
 
     @Override
     public Object[] toArray() {
-        final MeteringAgent.Context timer = monitor.startTimer(TO_ARRAY);
-        try {
-            return backingMap.keySet().stream().toArray();
-        } finally {
-            timer.stop(null);
-        }
+        return complete(asyncSet.getAsImmutableSet()).stream().toArray();
     }
 
+    @SuppressWarnings("unchecked")
     @Override
     public <T> T[] toArray(T[] a) {
-        final MeteringAgent.Context timer = monitor.startTimer(TO_ARRAY);
-        try {
-            // TODO: Optimize this to only allocate a new array if the set size
-            // is larger than the array.length. If the set size is smaller than
-            // the array.length then copy the data into the array and set the
-            // last element in the array to be null.
-            final T[] resizedArray =
-                    (T[]) Array.newInstance(a.getClass().getComponentType(), backingMap.keySet().size());
-            return (T[]) backingMap.keySet().toArray(resizedArray);
-        } finally {
-            timer.stop(null);
-        }
+        // TODO: Optimize this to only allocate a new array if the set size
+        // is larger than the array.length. If the set size is smaller than
+        // the array.length then copy the data into the array and set the
+        // last element in the array to be null.
+        final T[] resizedArray =
+                (T[]) Array.newInstance(a.getClass().getComponentType(), complete(asyncSet.getAsImmutableSet()).size());
+        return complete(asyncSet.getAsImmutableSet()).toArray(resizedArray);
     }
 
     @Override
     public boolean add(E e) {
-        final MeteringAgent.Context timer = monitor.startTimer(ADD);
-        try {
-            return backingMap.putIfAbsent(e, true) == null;
-        } finally {
-            timer.stop(null);
-        }
+        return complete(asyncSet.add(e));
     }
 
     @SuppressWarnings("unchecked")
     @Override
     public boolean remove(Object o) {
-        final MeteringAgent.Context timer = monitor.startTimer(REMOVE);
-        try {
-            return backingMap.remove((E) o) != null;
-        } finally {
-            timer.stop(null);
-        }
+        return complete(asyncSet.remove((E) o));
     }
 
+    @SuppressWarnings("unchecked")
     @Override
     public boolean containsAll(Collection<?> c) {
-        final MeteringAgent.Context timer = monitor.startTimer(CONTAINS_ALL);
-        try {
-           return c.stream()
-                .allMatch(this::contains);
-        } finally {
-            timer.stop(null);
-        }
+        return complete(asyncSet.containsAll((Collection<? extends E>) c));
     }
 
     @Override
     public boolean addAll(Collection<? extends E> c) {
-        final MeteringAgent.Context timer = monitor.startTimer(ADD_ALL);
-        try {
-            return c.stream()
-                .map(this::add)
-                .reduce(Boolean::logicalOr)
-                .orElse(false);
-        } finally {
-            timer.stop(null);
-        }
+        return complete(asyncSet.addAll(c));
     }
 
+    @SuppressWarnings("unchecked")
     @Override
     public boolean retainAll(Collection<?> c) {
-        final MeteringAgent.Context timer = monitor.startTimer(RETAIN_ALL);
-        try {
-            Set<?> retainSet = Sets.newHashSet(c);
-            return backingMap.keySet()
-                .stream()
-                .filter(k -> !retainSet.contains(k))
-                .map(this::remove)
-                .reduce(Boolean::logicalOr)
-                .orElse(false);
-        } finally {
-            timer.stop(null);
-        }
+        return complete(asyncSet.retainAll((Collection<? extends E>) c));
     }
 
+    @SuppressWarnings("unchecked")
     @Override
     public boolean removeAll(Collection<?> c) {
-        final MeteringAgent.Context timer = monitor.startTimer(REMOVE_ALL);
-        try {
-            Set<?> removeSet = Sets.newHashSet(c);
-            return backingMap.keySet()
-                .stream()
-                .filter(removeSet::contains)
-                .map(this::remove)
-                .reduce(Boolean::logicalOr)
-                .orElse(false);
-        } finally {
-            timer.stop(null);
-        }
+        return complete(asyncSet.removeAll((Collection<? extends E>) c));
     }
 
     @Override
     public void clear() {
-        final MeteringAgent.Context timer = monitor.startTimer(CLEAR);
-        try {
-            backingMap.clear();
-        } finally {
-            timer.stop(null);
-        }
+        complete(asyncSet.clear());
     }
 
     @Override
     public void addListener(SetEventListener<E> listener) {
-        MapEventListener<E, Boolean> mapEventListener = mapEvent -> {
-            if (mapEvent.type() == MapEvent.Type.INSERT) {
-                listener.event(new SetEvent<>(name, SetEvent.Type.ADD, mapEvent.key()));
-            } else if (mapEvent.type() == MapEvent.Type.REMOVE) {
-                listener.event(new SetEvent<>(name, SetEvent.Type.REMOVE, mapEvent.key()));
-            }
-        };
-        if (listenerMapping.putIfAbsent(listener, mapEventListener) == null) {
-            backingMap.addListener(mapEventListener);
-        }
+        complete(asyncSet.addListener(listener));
     }
 
     @Override
     public void removeListener(SetEventListener<E> listener) {
-        MapEventListener<E, Boolean> mapEventListener = listenerMapping.remove(listener);
-        if (mapEventListener != null) {
-            backingMap.removeListener(mapEventListener);
-        }
+        complete(asyncSet.removeListener(listener));
     }
 }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedSetBuilder.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedSetBuilder.java
index f7957f3..92e3e9a 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedSetBuilder.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedSetBuilder.java
@@ -16,6 +16,7 @@
 package org.onosproject.store.consistent.impl;
 
 import org.onosproject.core.ApplicationId;
+import org.onosproject.store.service.AsyncDistributedSet;
 import org.onosproject.store.service.ConsistentMapBuilder;
 import org.onosproject.store.service.DistributedSet;
 import org.onosproject.store.service.Serializer;
@@ -88,6 +89,11 @@
 
     @Override
     public DistributedSet<E> build() {
-        return new DefaultDistributedSet<E>(name, metering, mapBuilder.build());
+        return new DefaultDistributedSet<E>(buildAsyncSet());
+    }
+
+    @Override
+    public AsyncDistributedSet<E> buildAsyncSet() {
+        return new DefaultAsyncDistributedSet<E>(mapBuilder.buildAsyncMap(), name, metering);
     }
 }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
index b5ea52e..649e5a8 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
@@ -273,6 +273,11 @@
     }
 
     @Override
+    public String name() {
+        return mapName;
+    }
+
+    @Override
     public int size() {
         checkState(!destroyed, destroyedMessage);
         // TODO: Maintain a separate counter for tracking live elements in map.