Added DistributedPrimitive interface
Added AsyncDistributedSet that provides async set operations
Change-Id: I83494075a7973694ea6b7445ff4799b7a1a50641
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncAtomicCounter.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncAtomicCounter.java
index d851eaa..e6972c2 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncAtomicCounter.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncAtomicCounter.java
@@ -51,6 +51,11 @@
}
@Override
+ public String name() {
+ return name;
+ }
+
+ @Override
public CompletableFuture<Long> incrementAndGet() {
final MeteringAgent.Context timer = monitor.startTimer(INCREMENT_AND_GET);
return addAndGet(1L)
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncAtomicValue.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncAtomicValue.java
index 454d46c..c223565 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncAtomicValue.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncAtomicValue.java
@@ -55,6 +55,11 @@
}
@Override
+ public String name() {
+ return name;
+ }
+
+ @Override
public CompletableFuture<Boolean> compareAndSet(V expect, V update) {
final MeteringAgent.Context newTimer = monitor.startTimer(COMPARE_AND_SET);
CompletableFuture<Boolean> response;
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java
index 46a097c..b99bfbf 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java
@@ -171,6 +171,7 @@
* Returns this map name.
* @return map name
*/
+ @Override
public String name() {
return name;
}
@@ -187,6 +188,7 @@
* Returns the applicationId owning this map.
* @return application Id
*/
+ @Override
public ApplicationId applicationId() {
return applicationId;
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncDistributedSet.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncDistributedSet.java
new file mode 100644
index 0000000..d67b635
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncDistributedSet.java
@@ -0,0 +1,167 @@
+/*
+ * Copyright 2015-2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.consistent.impl;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import org.onlab.util.Tools;
+import org.onosproject.store.service.AsyncConsistentMap;
+import org.onosproject.store.service.AsyncDistributedSet;
+import org.onosproject.store.service.MapEvent;
+import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.SetEvent;
+import org.onosproject.store.service.SetEventListener;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+/**
+ * Implementation of {@link AsyncDistributedSet}.
+ *
+ * @param <E> set entry type
+ */
+public class DefaultAsyncDistributedSet<E> implements AsyncDistributedSet<E> {
+
+ private static final String CONTAINS = "contains";
+ private static final String PRIMITIVE_NAME = "distributedSet";
+ private static final String SIZE = "size";
+ private static final String IS_EMPTY = "isEmpty";
+ private static final String ADD = "add";
+ private static final String REMOVE = "remove";
+ private static final String CONTAINS_ALL = "containsAll";
+ private static final String ADD_ALL = "addAll";
+ private static final String RETAIN_ALL = "retainAll";
+ private static final String REMOVE_ALL = "removeAll";
+ private static final String CLEAR = "clear";
+ private static final String GET_AS_IMMUTABLE_SET = "getAsImmutableSet";
+
+ private final String name;
+ private final AsyncConsistentMap<E, Boolean> backingMap;
+ private final Map<SetEventListener<E>, MapEventListener<E, Boolean>> listenerMapping = Maps.newIdentityHashMap();
+ private final MeteringAgent monitor;
+
+ public DefaultAsyncDistributedSet(AsyncConsistentMap<E, Boolean> backingMap, String name, boolean meteringEnabled) {
+ this.backingMap = backingMap;
+ this.name = name;
+ monitor = new MeteringAgent(PRIMITIVE_NAME, name, meteringEnabled);
+ }
+
+ @Override
+ public String name() {
+ return name;
+ }
+
+ @Override
+ public CompletableFuture<Integer> size() {
+ final MeteringAgent.Context timer = monitor.startTimer(SIZE);
+ return backingMap.size().whenComplete((r, e) -> timer.stop(null));
+ }
+
+ @Override
+ public CompletableFuture<Boolean> isEmpty() {
+ final MeteringAgent.Context timer = monitor.startTimer(IS_EMPTY);
+ return backingMap.isEmpty().whenComplete((r, e) -> timer.stop(null));
+ }
+
+ @Override
+ public CompletableFuture<Boolean> contains(E element) {
+ final MeteringAgent.Context timer = monitor.startTimer(CONTAINS);
+ return backingMap.containsKey(element).whenComplete((r, e) -> timer.stop(null));
+ }
+
+ @Override
+ public CompletableFuture<Boolean> add(E entry) {
+ final MeteringAgent.Context timer = monitor.startTimer(ADD);
+ return backingMap.putIfAbsent(entry, true).thenApply(Objects::isNull).whenComplete((r, e) -> timer.stop(null));
+ }
+
+ @Override
+ public CompletableFuture<Boolean> remove(E entry) {
+ final MeteringAgent.Context timer = monitor.startTimer(REMOVE);
+ return backingMap.remove(entry, true).whenComplete((r, e) -> timer.stop(null));
+ }
+
+ @Override
+ public CompletableFuture<Boolean> containsAll(Collection<? extends E> c) {
+ final MeteringAgent.Context timer = monitor.startTimer(CONTAINS_ALL);
+ return Tools.allOf(c.stream().map(this::contains).collect(Collectors.toList())).thenApply(v ->
+ v.stream().reduce(Boolean::logicalAnd).orElse(true)).whenComplete((r, e) -> timer.stop(null));
+ }
+
+ @Override
+ public CompletableFuture<Boolean> addAll(Collection<? extends E> c) {
+ final MeteringAgent.Context timer = monitor.startTimer(ADD_ALL);
+ return Tools.allOf(c.stream().map(this::add).collect(Collectors.toList())).thenApply(v ->
+ v.stream().reduce(Boolean::logicalOr).orElse(false)).whenComplete((r, e) -> timer.stop(null));
+ }
+
+ @Override
+ public CompletableFuture<Boolean> retainAll(Collection<? extends E> c) {
+ final MeteringAgent.Context timer = monitor.startTimer(RETAIN_ALL);
+ return backingMap.keySet().thenApply(set -> Sets.difference(set, Sets.newHashSet(c)))
+ .thenCompose(this::removeAll)
+ .whenComplete((r, e) -> timer.stop(null));
+ }
+
+ @Override
+ public CompletableFuture<Boolean> removeAll(Collection<? extends E> c) {
+ final MeteringAgent.Context timer = monitor.startTimer(REMOVE_ALL);
+ return Tools.allOf(c.stream().map(this::remove).collect(Collectors.toList())).thenApply(v ->
+ v.stream().reduce(Boolean::logicalOr).orElse(false)).whenComplete((r, e) -> timer.stop(null));
+ }
+
+ @Override
+ public CompletableFuture<Void> clear() {
+ final MeteringAgent.Context timer = monitor.startTimer(CLEAR);
+ return backingMap.clear().whenComplete((r, e) -> timer.stop(null));
+ }
+
+ @Override
+ public CompletableFuture<? extends Set<E>> getAsImmutableSet() {
+ final MeteringAgent.Context timer = monitor.startTimer(GET_AS_IMMUTABLE_SET);
+ return backingMap.keySet().thenApply(s -> ImmutableSet.copyOf(s)).whenComplete((r, e) -> timer.stop(null));
+ }
+
+ @Override
+ public CompletableFuture<Void> addListener(SetEventListener<E> listener) {
+ MapEventListener<E, Boolean> mapEventListener = mapEvent -> {
+ if (mapEvent.type() == MapEvent.Type.INSERT) {
+ listener.event(new SetEvent<>(name, SetEvent.Type.ADD, mapEvent.key()));
+ } else if (mapEvent.type() == MapEvent.Type.REMOVE) {
+ listener.event(new SetEvent<>(name, SetEvent.Type.REMOVE, mapEvent.key()));
+ }
+ };
+ if (listenerMapping.putIfAbsent(listener, mapEventListener) == null) {
+ return backingMap.addListener(mapEventListener);
+ }
+ return CompletableFuture.completedFuture(null);
+ }
+
+ @Override
+ public CompletableFuture<Void> removeListener(SetEventListener<E> listener) {
+ MapEventListener<E, Boolean> mapEventListener = listenerMapping.remove(listener);
+ if (mapEventListener != null) {
+ return backingMap.removeListener(mapEventListener);
+ }
+ return CompletableFuture.completedFuture(null);
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicCounter.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicCounter.java
index 2d6a956..00a98a9 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicCounter.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicCounter.java
@@ -18,6 +18,7 @@
import org.onosproject.store.service.AsyncAtomicCounter;
import org.onosproject.store.service.AtomicCounter;
import org.onosproject.store.service.StorageException;
+import org.onosproject.store.service.Synchronous;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@@ -30,16 +31,15 @@
* <p>
* The initial value will be zero.
*/
-public class DefaultAtomicCounter implements AtomicCounter {
+public class DefaultAtomicCounter extends Synchronous<AsyncAtomicCounter> implements AtomicCounter {
private static final int OPERATION_TIMEOUT_MILLIS = 5000;
private final AsyncAtomicCounter asyncCounter;
- public DefaultAtomicCounter(String name,
- Database database,
- boolean meteringEnabled) {
- asyncCounter = new DefaultAsyncAtomicCounter(name, database, meteringEnabled);
+ public DefaultAtomicCounter(AsyncAtomicCounter asyncCounter) {
+ super(asyncCounter);
+ this.asyncCounter = asyncCounter;
}
@Override
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicCounterBuilder.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicCounterBuilder.java
index dba4443..6596f36 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicCounterBuilder.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicCounterBuilder.java
@@ -53,9 +53,7 @@
@Override
public AtomicCounter build() {
- validateInputs();
- Database database = partitionsEnabled ? partitionedDatabase : inMemoryDatabase;
- return new DefaultAtomicCounter(name, database, metering);
+ return new DefaultAtomicCounter(buildAsyncCounter());
}
@Override
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicValue.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicValue.java
index 20bfd5f9..f61d330 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicValue.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicValue.java
@@ -17,10 +17,12 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
+
import org.onosproject.store.service.AsyncAtomicValue;
import org.onosproject.store.service.AtomicValue;
import org.onosproject.store.service.AtomicValueEventListener;
import org.onosproject.store.service.StorageException;
+import org.onosproject.store.service.Synchronous;
import com.google.common.util.concurrent.Futures;
@@ -29,12 +31,13 @@
*
* @param <V> value type
*/
-public class DefaultAtomicValue<V> implements AtomicValue<V> {
+public class DefaultAtomicValue<V> extends Synchronous<AsyncAtomicValue<V>> implements AtomicValue<V> {
private static final int OPERATION_TIMEOUT_MILLIS = 5000;
private final AsyncAtomicValue<V> asyncValue;
public DefaultAtomicValue(AsyncAtomicValue<V> asyncValue) {
+ super(asyncValue);
this.asyncValue = asyncValue;
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultConsistentMap.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultConsistentMap.java
index 7841c16..dd8a5a9 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultConsistentMap.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultConsistentMap.java
@@ -28,9 +28,11 @@
import java.util.function.Predicate;
import java.util.Set;
+import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.ConsistentMapException;
import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.Synchronous;
import org.onosproject.store.service.Versioned;
/**
@@ -40,18 +42,15 @@
* @param <K> type of key.
* @param <V> type of value.
*/
-public class DefaultConsistentMap<K, V> implements ConsistentMap<K, V> {
+public class DefaultConsistentMap<K, V> extends Synchronous<AsyncConsistentMap<K, V>> implements ConsistentMap<K, V> {
private static final int OPERATION_TIMEOUT_MILLIS = 5000;
private final DefaultAsyncConsistentMap<K, V> asyncMap;
private Map<K, V> javaMap;
- public String name() {
- return asyncMap.name();
- }
-
public DefaultConsistentMap(DefaultAsyncConsistentMap<K, V> asyncMap) {
+ super(asyncMap);
this.asyncMap = asyncMap;
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedQueue.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedQueue.java
index 5f69fde..edcd26a 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedQueue.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedQueue.java
@@ -19,12 +19,14 @@
import com.google.common.util.concurrent.Futures;
import org.onlab.util.SharedExecutors;
+import org.onosproject.store.service.DistributedPrimitive;
import org.onosproject.store.service.DistributedQueue;
import org.onosproject.store.service.Serializer;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+
import static com.google.common.base.Preconditions.checkNotNull;
import static org.onosproject.store.consistent.impl.StateMachineUpdate.Target.QUEUE_PUSH;
@@ -108,10 +110,16 @@
.whenComplete((r, e) -> timer.stop(e)));
}
+ @Override
public String name() {
return name;
}
+ @Override
+ public DistributedPrimitive.Type type() {
+ return DistributedPrimitive.Type.QUEUE;
+ }
+
protected void tryPoll() {
Set<CompletableFuture<E>> completedFutures = Sets.newHashSet();
for (CompletableFuture<E> future : pendingFutures) {
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedSet.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedSet.java
index 42ed615..5092892 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedSet.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedSet.java
@@ -15,227 +15,138 @@
*/
package org.onosproject.store.consistent.impl;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.AsyncDistributedSet;
+import org.onosproject.store.service.ConsistentMapException;
import org.onosproject.store.service.DistributedSet;
-import org.onosproject.store.service.MapEvent;
-import org.onosproject.store.service.MapEventListener;
-import org.onosproject.store.service.SetEvent;
import org.onosproject.store.service.SetEventListener;
+import org.onosproject.store.service.Synchronous;
import java.lang.reflect.Array;
import java.util.Collection;
import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
/**
- * Implementation of distributed set that is backed by a ConsistentMap.
+ * Implementation of {@link DistributedSet} that merely delegates to a {@link AsyncDistributedSet}
+ * and waits for the operation to complete.
* @param <E> set element type
*/
-public class DefaultDistributedSet<E> implements DistributedSet<E> {
+public class DefaultDistributedSet<E> extends Synchronous<AsyncDistributedSet<E>> implements DistributedSet<E> {
- private static final String CONTAINS = "contains";
- private static final String PRIMITIVE_NAME = "distributedSet";
- private static final String SIZE = "size";
- private static final String IS_EMPTY = "isEmpty";
- private static final String ITERATOR = "iterator";
- private static final String TO_ARRAY = "toArray";
- private static final String ADD = "add";
- private static final String REMOVE = "remove";
- private static final String CONTAINS_ALL = "containsAll";
- private static final String ADD_ALL = "addAll";
- private static final String RETAIN_ALL = "retainAll";
- private static final String REMOVE_ALL = "removeAll";
- private static final String CLEAR = "clear";
+ private static final long OPERATION_TIMEOUT_MILLIS = 5000;
- private final String name;
- private final ConsistentMap<E, Boolean> backingMap;
- private final Map<SetEventListener<E>, MapEventListener<E, Boolean>> listenerMapping = Maps.newIdentityHashMap();
- private final MeteringAgent monitor;
+ private final AsyncDistributedSet<E> asyncSet;
- public DefaultDistributedSet(String name, boolean meteringEnabled, ConsistentMap<E, Boolean> backingMap) {
- this.name = name;
- this.backingMap = backingMap;
- monitor = new MeteringAgent(PRIMITIVE_NAME, name, meteringEnabled);
+ public DefaultDistributedSet(AsyncDistributedSet<E> asyncSet) {
+ super(asyncSet);
+ this.asyncSet = asyncSet;
+ }
+
+ private static <T> T complete(CompletableFuture<T> future) {
+ try {
+ return future.get(OPERATION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new ConsistentMapException.Interrupted();
+ } catch (TimeoutException e) {
+ throw new ConsistentMapException.Timeout();
+ } catch (ExecutionException e) {
+ if (e.getCause() instanceof ConsistentMapException) {
+ throw (ConsistentMapException) e.getCause();
+ } else {
+ throw new ConsistentMapException(e.getCause());
+ }
+ }
}
@Override
public int size() {
- final MeteringAgent.Context timer = monitor.startTimer(SIZE);
- try {
- return backingMap.size();
- } finally {
- timer.stop(null);
- }
+ return complete(asyncSet.size());
}
@Override
public boolean isEmpty() {
- final MeteringAgent.Context timer = monitor.startTimer(IS_EMPTY);
- try {
- return backingMap.isEmpty();
- } finally {
- timer.stop(null);
- }
+ return complete(asyncSet.isEmpty());
}
@SuppressWarnings("unchecked")
@Override
public boolean contains(Object o) {
- final MeteringAgent.Context timer = monitor.startTimer(CONTAINS);
- try {
- return backingMap.containsKey((E) o);
- } finally {
- timer.stop(null);
- }
+ return complete(asyncSet.contains((E) o));
}
@Override
public Iterator<E> iterator() {
- final MeteringAgent.Context timer = monitor.startTimer(ITERATOR);
- //Do we have to measure this guy?
- try {
- return backingMap.keySet().iterator();
- } finally {
- timer.stop(null);
- }
+ return complete(asyncSet.getAsImmutableSet()).iterator();
}
@Override
public Object[] toArray() {
- final MeteringAgent.Context timer = monitor.startTimer(TO_ARRAY);
- try {
- return backingMap.keySet().stream().toArray();
- } finally {
- timer.stop(null);
- }
+ return complete(asyncSet.getAsImmutableSet()).stream().toArray();
}
+ @SuppressWarnings("unchecked")
@Override
public <T> T[] toArray(T[] a) {
- final MeteringAgent.Context timer = monitor.startTimer(TO_ARRAY);
- try {
- // TODO: Optimize this to only allocate a new array if the set size
- // is larger than the array.length. If the set size is smaller than
- // the array.length then copy the data into the array and set the
- // last element in the array to be null.
- final T[] resizedArray =
- (T[]) Array.newInstance(a.getClass().getComponentType(), backingMap.keySet().size());
- return (T[]) backingMap.keySet().toArray(resizedArray);
- } finally {
- timer.stop(null);
- }
+ // TODO: Optimize this to only allocate a new array if the set size
+ // is larger than the array.length. If the set size is smaller than
+ // the array.length then copy the data into the array and set the
+ // last element in the array to be null.
+ final T[] resizedArray =
+ (T[]) Array.newInstance(a.getClass().getComponentType(), complete(asyncSet.getAsImmutableSet()).size());
+ return complete(asyncSet.getAsImmutableSet()).toArray(resizedArray);
}
@Override
public boolean add(E e) {
- final MeteringAgent.Context timer = monitor.startTimer(ADD);
- try {
- return backingMap.putIfAbsent(e, true) == null;
- } finally {
- timer.stop(null);
- }
+ return complete(asyncSet.add(e));
}
@SuppressWarnings("unchecked")
@Override
public boolean remove(Object o) {
- final MeteringAgent.Context timer = monitor.startTimer(REMOVE);
- try {
- return backingMap.remove((E) o) != null;
- } finally {
- timer.stop(null);
- }
+ return complete(asyncSet.remove((E) o));
}
+ @SuppressWarnings("unchecked")
@Override
public boolean containsAll(Collection<?> c) {
- final MeteringAgent.Context timer = monitor.startTimer(CONTAINS_ALL);
- try {
- return c.stream()
- .allMatch(this::contains);
- } finally {
- timer.stop(null);
- }
+ return complete(asyncSet.containsAll((Collection<? extends E>) c));
}
@Override
public boolean addAll(Collection<? extends E> c) {
- final MeteringAgent.Context timer = monitor.startTimer(ADD_ALL);
- try {
- return c.stream()
- .map(this::add)
- .reduce(Boolean::logicalOr)
- .orElse(false);
- } finally {
- timer.stop(null);
- }
+ return complete(asyncSet.addAll(c));
}
+ @SuppressWarnings("unchecked")
@Override
public boolean retainAll(Collection<?> c) {
- final MeteringAgent.Context timer = monitor.startTimer(RETAIN_ALL);
- try {
- Set<?> retainSet = Sets.newHashSet(c);
- return backingMap.keySet()
- .stream()
- .filter(k -> !retainSet.contains(k))
- .map(this::remove)
- .reduce(Boolean::logicalOr)
- .orElse(false);
- } finally {
- timer.stop(null);
- }
+ return complete(asyncSet.retainAll((Collection<? extends E>) c));
}
+ @SuppressWarnings("unchecked")
@Override
public boolean removeAll(Collection<?> c) {
- final MeteringAgent.Context timer = monitor.startTimer(REMOVE_ALL);
- try {
- Set<?> removeSet = Sets.newHashSet(c);
- return backingMap.keySet()
- .stream()
- .filter(removeSet::contains)
- .map(this::remove)
- .reduce(Boolean::logicalOr)
- .orElse(false);
- } finally {
- timer.stop(null);
- }
+ return complete(asyncSet.removeAll((Collection<? extends E>) c));
}
@Override
public void clear() {
- final MeteringAgent.Context timer = monitor.startTimer(CLEAR);
- try {
- backingMap.clear();
- } finally {
- timer.stop(null);
- }
+ complete(asyncSet.clear());
}
@Override
public void addListener(SetEventListener<E> listener) {
- MapEventListener<E, Boolean> mapEventListener = mapEvent -> {
- if (mapEvent.type() == MapEvent.Type.INSERT) {
- listener.event(new SetEvent<>(name, SetEvent.Type.ADD, mapEvent.key()));
- } else if (mapEvent.type() == MapEvent.Type.REMOVE) {
- listener.event(new SetEvent<>(name, SetEvent.Type.REMOVE, mapEvent.key()));
- }
- };
- if (listenerMapping.putIfAbsent(listener, mapEventListener) == null) {
- backingMap.addListener(mapEventListener);
- }
+ complete(asyncSet.addListener(listener));
}
@Override
public void removeListener(SetEventListener<E> listener) {
- MapEventListener<E, Boolean> mapEventListener = listenerMapping.remove(listener);
- if (mapEventListener != null) {
- backingMap.removeListener(mapEventListener);
- }
+ complete(asyncSet.removeListener(listener));
}
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedSetBuilder.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedSetBuilder.java
index f7957f3..92e3e9a 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedSetBuilder.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedSetBuilder.java
@@ -16,6 +16,7 @@
package org.onosproject.store.consistent.impl;
import org.onosproject.core.ApplicationId;
+import org.onosproject.store.service.AsyncDistributedSet;
import org.onosproject.store.service.ConsistentMapBuilder;
import org.onosproject.store.service.DistributedSet;
import org.onosproject.store.service.Serializer;
@@ -88,6 +89,11 @@
@Override
public DistributedSet<E> build() {
- return new DefaultDistributedSet<E>(name, metering, mapBuilder.build());
+ return new DefaultDistributedSet<E>(buildAsyncSet());
+ }
+
+ @Override
+ public AsyncDistributedSet<E> buildAsyncSet() {
+ return new DefaultAsyncDistributedSet<E>(mapBuilder.buildAsyncMap(), name, metering);
}
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
index b5ea52e..649e5a8 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
@@ -273,6 +273,11 @@
}
@Override
+ public String name() {
+ return mapName;
+ }
+
+ @Override
public int size() {
checkState(!destroyed, destroyedMessage);
// TODO: Maintain a separate counter for tracking live elements in map.