Added DistributedPrimitive interface
Added AsyncDistributedSet that provides async set operations

Change-Id: I83494075a7973694ea6b7445ff4799b7a1a50641
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncAtomicCounter.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncAtomicCounter.java
index d851eaa..e6972c2 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncAtomicCounter.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncAtomicCounter.java
@@ -51,6 +51,11 @@
     }
 
     @Override
+    public String name() {
+        return name;
+    }
+
+    @Override
     public CompletableFuture<Long> incrementAndGet() {
         final MeteringAgent.Context timer = monitor.startTimer(INCREMENT_AND_GET);
         return addAndGet(1L)
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncAtomicValue.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncAtomicValue.java
index 454d46c..c223565 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncAtomicValue.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncAtomicValue.java
@@ -55,6 +55,11 @@
     }
 
     @Override
+    public String name() {
+        return name;
+    }
+
+    @Override
     public CompletableFuture<Boolean> compareAndSet(V expect, V update) {
         final MeteringAgent.Context newTimer = monitor.startTimer(COMPARE_AND_SET);
         CompletableFuture<Boolean> response;
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java
index 46a097c..b99bfbf 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java
@@ -171,6 +171,7 @@
      * Returns this map name.
      * @return map name
      */
+    @Override
     public String name() {
         return name;
     }
@@ -187,6 +188,7 @@
      * Returns the applicationId owning this map.
      * @return application Id
      */
+    @Override
     public ApplicationId applicationId() {
         return applicationId;
     }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncDistributedSet.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncDistributedSet.java
new file mode 100644
index 0000000..d67b635
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncDistributedSet.java
@@ -0,0 +1,183 @@
+/*
+ * Copyright 2015-2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.consistent.impl;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import org.onlab.util.Tools;
+import org.onosproject.store.service.AsyncConsistentMap;
+import org.onosproject.store.service.AsyncDistributedSet;
+import org.onosproject.store.service.MapEvent;
+import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.SetEvent;
+import org.onosproject.store.service.SetEventListener;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+/**
+ * Implementation of {@link AsyncDistributedSet}.
+ * <p>
+ * Elements are stored as the keys of a backing {@link AsyncConsistentMap},
+ * each mapped to {@code true}. Set operations are timed through a
+ * {@link MeteringAgent}, which is also handed the failure cause, if any.
+ *
+ * @param <E> set entry type
+ */
+public class DefaultAsyncDistributedSet<E> implements AsyncDistributedSet<E> {
+
+    private static final String CONTAINS = "contains";
+    private static final String PRIMITIVE_NAME = "distributedSet";
+    private static final String SIZE = "size";
+    private static final String IS_EMPTY = "isEmpty";
+    private static final String ADD = "add";
+    private static final String REMOVE = "remove";
+    private static final String CONTAINS_ALL = "containsAll";
+    private static final String ADD_ALL = "addAll";
+    private static final String RETAIN_ALL = "retainAll";
+    private static final String REMOVE_ALL = "removeAll";
+    private static final String CLEAR = "clear";
+    private static final String GET_AS_IMMUTABLE_SET = "getAsImmutableSet";
+
+    private final String name;
+    private final AsyncConsistentMap<E, Boolean> backingMap;
+    // Keyed by listener identity; holds the map-level adapter registered for each set listener.
+    private final Map<SetEventListener<E>, MapEventListener<E, Boolean>> listenerMapping = Maps.newIdentityHashMap();
+    private final MeteringAgent monitor;
+
+    /**
+     * Creates a set backed by the given map.
+     *
+     * @param backingMap map whose keys are the elements of this set
+     * @param name set name; also passed to the {@link MeteringAgent}
+     * @param meteringEnabled metering flag forwarded to the {@link MeteringAgent}
+     */
+    public DefaultAsyncDistributedSet(AsyncConsistentMap<E, Boolean> backingMap, String name, boolean meteringEnabled) {
+        this.backingMap = backingMap;
+        this.name = name;
+        monitor = new MeteringAgent(PRIMITIVE_NAME, name, meteringEnabled);
+    }
+
+    @Override
+    public String name() {
+        return name;
+    }
+
+    @Override
+    public CompletableFuture<Integer> size() {
+        final MeteringAgent.Context timer = monitor.startTimer(SIZE);
+        return backingMap.size().whenComplete((r, e) -> timer.stop(e));
+    }
+
+    @Override
+    public CompletableFuture<Boolean> isEmpty() {
+        final MeteringAgent.Context timer = monitor.startTimer(IS_EMPTY);
+        return backingMap.isEmpty().whenComplete((r, e) -> timer.stop(e));
+    }
+
+    @Override
+    public CompletableFuture<Boolean> contains(E element) {
+        final MeteringAgent.Context timer = monitor.startTimer(CONTAINS);
+        return backingMap.containsKey(element).whenComplete((r, e) -> timer.stop(e));
+    }
+
+    @Override
+    public CompletableFuture<Boolean> add(E entry) {
+        final MeteringAgent.Context timer = monitor.startTimer(ADD);
+        // A null result from putIfAbsent is reported as "the element was newly added".
+        return backingMap.putIfAbsent(entry, true).thenApply(Objects::isNull).whenComplete((r, e) -> timer.stop(e));
+    }
+
+    @Override
+    public CompletableFuture<Boolean> remove(E entry) {
+        final MeteringAgent.Context timer = monitor.startTimer(REMOVE);
+        return backingMap.remove(entry, true).whenComplete((r, e) -> timer.stop(e));
+    }
+
+    @Override
+    public CompletableFuture<Boolean> containsAll(Collection<? extends E> c) {
+        final MeteringAgent.Context timer = monitor.startTimer(CONTAINS_ALL);
+        return Tools.allOf(c.stream().map(this::contains).collect(Collectors.toList())).thenApply(v ->
+            v.stream().reduce(Boolean::logicalAnd).orElse(true)).whenComplete((r, e) -> timer.stop(e));
+    }
+
+    @Override
+    public CompletableFuture<Boolean> addAll(Collection<? extends E> c) {
+        final MeteringAgent.Context timer = monitor.startTimer(ADD_ALL);
+        return Tools.allOf(c.stream().map(this::add).collect(Collectors.toList())).thenApply(v ->
+            v.stream().reduce(Boolean::logicalOr).orElse(false)).whenComplete((r, e) -> timer.stop(e));
+    }
+
+    @Override
+    public CompletableFuture<Boolean> retainAll(Collection<? extends E> c) {
+        final MeteringAgent.Context timer = monitor.startTimer(RETAIN_ALL);
+        // Remove every current key that is not in c; removeAll supplies the "set changed" result.
+        return backingMap.keySet().thenApply(set -> Sets.difference(set, Sets.newHashSet(c)))
+                                  .thenCompose(this::removeAll)
+                                  .whenComplete((r, e) -> timer.stop(e));
+    }
+
+    @Override
+    public CompletableFuture<Boolean> removeAll(Collection<? extends E> c) {
+        final MeteringAgent.Context timer = monitor.startTimer(REMOVE_ALL);
+        return Tools.allOf(c.stream().map(this::remove).collect(Collectors.toList())).thenApply(v ->
+            v.stream().reduce(Boolean::logicalOr).orElse(false)).whenComplete((r, e) -> timer.stop(e));
+    }
+
+    @Override
+    public CompletableFuture<Void> clear() {
+        final MeteringAgent.Context timer = monitor.startTimer(CLEAR);
+        return backingMap.clear().whenComplete((r, e) -> timer.stop(e));
+    }
+
+    @Override
+    public CompletableFuture<? extends Set<E>> getAsImmutableSet() {
+        final MeteringAgent.Context timer = monitor.startTimer(GET_AS_IMMUTABLE_SET);
+        return backingMap.keySet().thenApply(s -> ImmutableSet.copyOf(s)).whenComplete((r, e) -> timer.stop(e));
+    }
+
+    @Override
+    public CompletableFuture<Void> addListener(SetEventListener<E> listener) {
+        // Adapter: map INSERT/REMOVE events become set ADD/REMOVE events; other map event types are dropped.
+        MapEventListener<E, Boolean> mapEventListener = mapEvent -> {
+            if (mapEvent.type() == MapEvent.Type.INSERT) {
+                listener.event(new SetEvent<>(name, SetEvent.Type.ADD, mapEvent.key()));
+            } else if (mapEvent.type() == MapEvent.Type.REMOVE) {
+                listener.event(new SetEvent<>(name, SetEvent.Type.REMOVE, mapEvent.key()));
+            }
+        };
+        // Register with the backing map only if this listener instance has no adapter yet.
+        if (listenerMapping.putIfAbsent(listener, mapEventListener) == null) {
+            return backingMap.addListener(mapEventListener);
+        }
+        return CompletableFuture.completedFuture(null);
+    }
+
+    @Override
+    public CompletableFuture<Void> removeListener(SetEventListener<E> listener) {
+        MapEventListener<E, Boolean> mapEventListener = listenerMapping.remove(listener);
+        if (mapEventListener != null) {
+            return backingMap.removeListener(mapEventListener);
+        }
+        return CompletableFuture.completedFuture(null);
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicCounter.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicCounter.java
index 2d6a956..00a98a9 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicCounter.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicCounter.java
@@ -18,6 +18,7 @@
 import org.onosproject.store.service.AsyncAtomicCounter;
 import org.onosproject.store.service.AtomicCounter;
 import org.onosproject.store.service.StorageException;
+import org.onosproject.store.service.Synchronous;
 
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
@@ -30,16 +31,15 @@
  * <p>
  * The initial value will be zero.
  */
-public class DefaultAtomicCounter implements AtomicCounter {
+public class DefaultAtomicCounter extends Synchronous<AsyncAtomicCounter> implements AtomicCounter  {
 
     private static final int OPERATION_TIMEOUT_MILLIS = 5000;
 
     private final AsyncAtomicCounter asyncCounter;
 
-    public DefaultAtomicCounter(String name,
-                                Database database,
-                                boolean meteringEnabled) {
-        asyncCounter = new DefaultAsyncAtomicCounter(name, database, meteringEnabled);
+    public DefaultAtomicCounter(AsyncAtomicCounter asyncCounter) {
+        super(asyncCounter);
+        this.asyncCounter = asyncCounter;
     }
 
     @Override
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicCounterBuilder.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicCounterBuilder.java
index dba4443..6596f36 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicCounterBuilder.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicCounterBuilder.java
@@ -53,9 +53,7 @@
 
     @Override
     public AtomicCounter build() {
-        validateInputs();
-        Database database = partitionsEnabled ? partitionedDatabase : inMemoryDatabase;
-        return new DefaultAtomicCounter(name, database, metering);
+        return new DefaultAtomicCounter(buildAsyncCounter());
     }
 
     @Override
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicValue.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicValue.java
index 20bfd5f9..f61d330 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicValue.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicValue.java
@@ -17,10 +17,12 @@
 
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+
 import org.onosproject.store.service.AsyncAtomicValue;
 import org.onosproject.store.service.AtomicValue;
 import org.onosproject.store.service.AtomicValueEventListener;
 import org.onosproject.store.service.StorageException;
+import org.onosproject.store.service.Synchronous;
 
 import com.google.common.util.concurrent.Futures;
 
@@ -29,12 +31,13 @@
  *
  * @param <V> value type
  */
-public class DefaultAtomicValue<V> implements AtomicValue<V> {
+public class DefaultAtomicValue<V> extends Synchronous<AsyncAtomicValue<V>> implements AtomicValue<V> {
 
     private static final int OPERATION_TIMEOUT_MILLIS = 5000;
     private final AsyncAtomicValue<V> asyncValue;
 
     public DefaultAtomicValue(AsyncAtomicValue<V> asyncValue) {
+        super(asyncValue);
         this.asyncValue = asyncValue;
     }
 
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultConsistentMap.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultConsistentMap.java
index 7841c16..dd8a5a9 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultConsistentMap.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultConsistentMap.java
@@ -28,9 +28,11 @@
 import java.util.function.Predicate;
 import java.util.Set;
 
+import org.onosproject.store.service.AsyncConsistentMap;
 import org.onosproject.store.service.ConsistentMap;
 import org.onosproject.store.service.ConsistentMapException;
 import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.Synchronous;
 import org.onosproject.store.service.Versioned;
 
 /**
@@ -40,18 +42,15 @@
  * @param <K> type of key.
  * @param <V> type of value.
  */
-public class DefaultConsistentMap<K, V> implements ConsistentMap<K, V> {
+public class DefaultConsistentMap<K, V> extends Synchronous<AsyncConsistentMap<K, V>> implements ConsistentMap<K, V> {
 
     private static final int OPERATION_TIMEOUT_MILLIS = 5000;
 
     private final DefaultAsyncConsistentMap<K, V> asyncMap;
     private Map<K, V> javaMap;
 
-    public String name() {
-        return asyncMap.name();
-    }
-
     public DefaultConsistentMap(DefaultAsyncConsistentMap<K, V> asyncMap) {
+        super(asyncMap);
         this.asyncMap = asyncMap;
     }
 
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedQueue.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedQueue.java
index 5f69fde..edcd26a 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedQueue.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedQueue.java
@@ -19,12 +19,14 @@
 import com.google.common.util.concurrent.Futures;
 
 import org.onlab.util.SharedExecutors;
+import org.onosproject.store.service.DistributedPrimitive;
 import org.onosproject.store.service.DistributedQueue;
 import org.onosproject.store.service.Serializer;
 
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+
 import static com.google.common.base.Preconditions.checkNotNull;
 import static org.onosproject.store.consistent.impl.StateMachineUpdate.Target.QUEUE_PUSH;
 
@@ -108,10 +110,16 @@
                                             .whenComplete((r, e) -> timer.stop(e)));
     }
 
+    @Override
     public String name() {
         return name;
     }
 
+    @Override
+    public DistributedPrimitive.Type type() {
+        return DistributedPrimitive.Type.QUEUE;
+    }
+
     protected void tryPoll() {
         Set<CompletableFuture<E>> completedFutures = Sets.newHashSet();
         for (CompletableFuture<E> future : pendingFutures) {
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedSet.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedSet.java
index 42ed615..5092892 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedSet.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedSet.java
@@ -15,227 +15,138 @@
  */
 package org.onosproject.store.consistent.impl;
 
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.AsyncDistributedSet;
+import org.onosproject.store.service.ConsistentMapException;
 import org.onosproject.store.service.DistributedSet;
-import org.onosproject.store.service.MapEvent;
-import org.onosproject.store.service.MapEventListener;
-import org.onosproject.store.service.SetEvent;
 import org.onosproject.store.service.SetEventListener;
+import org.onosproject.store.service.Synchronous;
 
 import java.lang.reflect.Array;
 import java.util.Collection;
 import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 /**
- * Implementation of distributed set that is backed by a ConsistentMap.
+ * Implementation of {@link DistributedSet} that merely delegates to an {@link AsyncDistributedSet}
+ * and waits for the operation to complete.
 
  * @param <E> set element type
  */
-public class DefaultDistributedSet<E> implements DistributedSet<E> {
+public class DefaultDistributedSet<E> extends Synchronous<AsyncDistributedSet<E>> implements DistributedSet<E> {
 
-    private static final String CONTAINS = "contains";
-    private static final String PRIMITIVE_NAME = "distributedSet";
-    private static final String SIZE = "size";
-    private static final String IS_EMPTY = "isEmpty";
-    private static final String ITERATOR = "iterator";
-    private static final String TO_ARRAY = "toArray";
-    private static final String ADD = "add";
-    private static final String REMOVE = "remove";
-    private static final String CONTAINS_ALL = "containsAll";
-    private static final String ADD_ALL = "addAll";
-    private static final String RETAIN_ALL = "retainAll";
-    private static final String REMOVE_ALL = "removeAll";
-    private static final String CLEAR = "clear";
+    private static final long OPERATION_TIMEOUT_MILLIS = 5000;
 
-    private final String name;
-    private final ConsistentMap<E, Boolean> backingMap;
-    private final Map<SetEventListener<E>, MapEventListener<E, Boolean>> listenerMapping = Maps.newIdentityHashMap();
-    private final MeteringAgent monitor;
+    private final AsyncDistributedSet<E> asyncSet;
 
-    public DefaultDistributedSet(String name, boolean meteringEnabled, ConsistentMap<E, Boolean> backingMap) {
-        this.name = name;
-        this.backingMap = backingMap;
-        monitor = new MeteringAgent(PRIMITIVE_NAME, name, meteringEnabled);
+    public DefaultDistributedSet(AsyncDistributedSet<E> asyncSet) {
+        super(asyncSet);
+        this.asyncSet = asyncSet;
+    }
+
+    private static <T> T complete(CompletableFuture<T> future) {
+        try {
+            return future.get(OPERATION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new ConsistentMapException.Interrupted();
+        } catch (TimeoutException e) {
+            throw new ConsistentMapException.Timeout();
+        } catch (ExecutionException e) {
+            if (e.getCause() instanceof ConsistentMapException) {
+                throw (ConsistentMapException) e.getCause();
+            } else {
+                throw new ConsistentMapException(e.getCause());
+            }
+        }
     }
 
     @Override
     public int size() {
-        final MeteringAgent.Context timer = monitor.startTimer(SIZE);
-        try {
-            return backingMap.size();
-        } finally {
-            timer.stop(null);
-        }
+        return complete(asyncSet.size());
     }
 
     @Override
     public boolean isEmpty() {
-        final MeteringAgent.Context timer = monitor.startTimer(IS_EMPTY);
-        try {
-            return backingMap.isEmpty();
-        } finally {
-            timer.stop(null);
-        }
+        return complete(asyncSet.isEmpty());
     }
 
     @SuppressWarnings("unchecked")
     @Override
     public boolean contains(Object o) {
-        final MeteringAgent.Context timer = monitor.startTimer(CONTAINS);
-        try {
-            return backingMap.containsKey((E) o);
-        } finally {
-            timer.stop(null);
-        }
+        return complete(asyncSet.contains((E) o));
     }
 
     @Override
     public Iterator<E> iterator() {
-        final MeteringAgent.Context timer = monitor.startTimer(ITERATOR);
-        //Do we have to measure this guy?
-        try {
-            return backingMap.keySet().iterator();
-        } finally {
-            timer.stop(null);
-        }
+        return complete(asyncSet.getAsImmutableSet()).iterator();
     }
 
     @Override
     public Object[] toArray() {
-        final MeteringAgent.Context timer = monitor.startTimer(TO_ARRAY);
-        try {
-            return backingMap.keySet().stream().toArray();
-        } finally {
-            timer.stop(null);
-        }
+        return complete(asyncSet.getAsImmutableSet()).stream().toArray();
     }
 
+    @SuppressWarnings("unchecked")
     @Override
     public <T> T[] toArray(T[] a) {
-        final MeteringAgent.Context timer = monitor.startTimer(TO_ARRAY);
-        try {
-            // TODO: Optimize this to only allocate a new array if the set size
-            // is larger than the array.length. If the set size is smaller than
-            // the array.length then copy the data into the array and set the
-            // last element in the array to be null.
-            final T[] resizedArray =
-                    (T[]) Array.newInstance(a.getClass().getComponentType(), backingMap.keySet().size());
-            return (T[]) backingMap.keySet().toArray(resizedArray);
-        } finally {
-            timer.stop(null);
-        }
+        // Fetch the set once: the array size and its contents then come from the same
+        // snapshot, and only one asynchronous fetch (with one timeout wait) is performed.
+        // TODO: Reuse 'a' when it is large enough instead of always allocating a
+        // new array, as the Collection#toArray(T[]) contract permits.
+        final Collection<E> snapshot = complete(asyncSet.getAsImmutableSet());
+        final T[] resizedArray = (T[]) Array.newInstance(a.getClass().getComponentType(), snapshot.size());
+        return snapshot.toArray(resizedArray);
     }
 
     @Override
     public boolean add(E e) {
-        final MeteringAgent.Context timer = monitor.startTimer(ADD);
-        try {
-            return backingMap.putIfAbsent(e, true) == null;
-        } finally {
-            timer.stop(null);
-        }
+        return complete(asyncSet.add(e));
     }
 
     @SuppressWarnings("unchecked")
     @Override
     public boolean remove(Object o) {
-        final MeteringAgent.Context timer = monitor.startTimer(REMOVE);
-        try {
-            return backingMap.remove((E) o) != null;
-        } finally {
-            timer.stop(null);
-        }
+        return complete(asyncSet.remove((E) o));
     }
 
+    @SuppressWarnings("unchecked")
     @Override
     public boolean containsAll(Collection<?> c) {
-        final MeteringAgent.Context timer = monitor.startTimer(CONTAINS_ALL);
-        try {
-           return c.stream()
-                .allMatch(this::contains);
-        } finally {
-            timer.stop(null);
-        }
+        return complete(asyncSet.containsAll((Collection<? extends E>) c));
     }
 
     @Override
     public boolean addAll(Collection<? extends E> c) {
-        final MeteringAgent.Context timer = monitor.startTimer(ADD_ALL);
-        try {
-            return c.stream()
-                .map(this::add)
-                .reduce(Boolean::logicalOr)
-                .orElse(false);
-        } finally {
-            timer.stop(null);
-        }
+        return complete(asyncSet.addAll(c));
     }
 
+    @SuppressWarnings("unchecked")
     @Override
     public boolean retainAll(Collection<?> c) {
-        final MeteringAgent.Context timer = monitor.startTimer(RETAIN_ALL);
-        try {
-            Set<?> retainSet = Sets.newHashSet(c);
-            return backingMap.keySet()
-                .stream()
-                .filter(k -> !retainSet.contains(k))
-                .map(this::remove)
-                .reduce(Boolean::logicalOr)
-                .orElse(false);
-        } finally {
-            timer.stop(null);
-        }
+        return complete(asyncSet.retainAll((Collection<? extends E>) c));
     }
 
+    @SuppressWarnings("unchecked")
     @Override
     public boolean removeAll(Collection<?> c) {
-        final MeteringAgent.Context timer = monitor.startTimer(REMOVE_ALL);
-        try {
-            Set<?> removeSet = Sets.newHashSet(c);
-            return backingMap.keySet()
-                .stream()
-                .filter(removeSet::contains)
-                .map(this::remove)
-                .reduce(Boolean::logicalOr)
-                .orElse(false);
-        } finally {
-            timer.stop(null);
-        }
+        return complete(asyncSet.removeAll((Collection<? extends E>) c));
     }
 
     @Override
     public void clear() {
-        final MeteringAgent.Context timer = monitor.startTimer(CLEAR);
-        try {
-            backingMap.clear();
-        } finally {
-            timer.stop(null);
-        }
+        complete(asyncSet.clear());
     }
 
     @Override
     public void addListener(SetEventListener<E> listener) {
-        MapEventListener<E, Boolean> mapEventListener = mapEvent -> {
-            if (mapEvent.type() == MapEvent.Type.INSERT) {
-                listener.event(new SetEvent<>(name, SetEvent.Type.ADD, mapEvent.key()));
-            } else if (mapEvent.type() == MapEvent.Type.REMOVE) {
-                listener.event(new SetEvent<>(name, SetEvent.Type.REMOVE, mapEvent.key()));
-            }
-        };
-        if (listenerMapping.putIfAbsent(listener, mapEventListener) == null) {
-            backingMap.addListener(mapEventListener);
-        }
+        complete(asyncSet.addListener(listener));
     }
 
     @Override
     public void removeListener(SetEventListener<E> listener) {
-        MapEventListener<E, Boolean> mapEventListener = listenerMapping.remove(listener);
-        if (mapEventListener != null) {
-            backingMap.removeListener(mapEventListener);
-        }
+        complete(asyncSet.removeListener(listener));
     }
 }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedSetBuilder.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedSetBuilder.java
index f7957f3..92e3e9a 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedSetBuilder.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedSetBuilder.java
@@ -16,6 +16,7 @@
 package org.onosproject.store.consistent.impl;
 
 import org.onosproject.core.ApplicationId;
+import org.onosproject.store.service.AsyncDistributedSet;
 import org.onosproject.store.service.ConsistentMapBuilder;
 import org.onosproject.store.service.DistributedSet;
 import org.onosproject.store.service.Serializer;
@@ -88,6 +89,11 @@
 
     @Override
     public DistributedSet<E> build() {
-        return new DefaultDistributedSet<E>(name, metering, mapBuilder.build());
+        return new DefaultDistributedSet<E>(buildAsyncSet());
+    }
+
+    @Override
+    public AsyncDistributedSet<E> buildAsyncSet() {
+        return new DefaultAsyncDistributedSet<E>(mapBuilder.buildAsyncMap(), name, metering);
     }
 }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
index b5ea52e..649e5a8 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
@@ -273,6 +273,11 @@
     }
 
     @Override
+    public String name() {
+        return mapName;
+    }
+
+    @Override
     public int size() {
         checkState(!destroyed, destroyedMessage);
         // TODO: Maintain a separate counter for tracking live elements in map.