Added DistributedPrimitive interface
Added AsyncDistributedSet that provides async set operations
Change-Id: I83494075a7973694ea6b7445ff4799b7a1a50641
diff --git a/apps/vtn/vtnrsc/src/test/java/org/onosproject/vtnrsc/util/VtnEventuallyConsistentMapAdapter.java b/apps/vtn/vtnrsc/src/test/java/org/onosproject/vtnrsc/util/VtnEventuallyConsistentMapAdapter.java
index 0631f86..e50ece1 100644
--- a/apps/vtn/vtnrsc/src/test/java/org/onosproject/vtnrsc/util/VtnEventuallyConsistentMapAdapter.java
+++ b/apps/vtn/vtnrsc/src/test/java/org/onosproject/vtnrsc/util/VtnEventuallyConsistentMapAdapter.java
@@ -1,114 +1,120 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.vtnrsc.util;
-
-import java.util.Collection;
-import java.util.Map;
-import java.util.Set;
-import java.util.function.BiFunction;
-
-import org.onosproject.store.service.EventuallyConsistentMap;
-import org.onosproject.store.service.EventuallyConsistentMapListener;
-
-/**
- * Testing adapter for EventuallyConsistentMap.
- */
-public class VtnEventuallyConsistentMapAdapter<K, V> implements EventuallyConsistentMap<K, V> {
- @Override
- public int size() {
- return 0;
- }
-
- @Override
- public boolean isEmpty() {
- return false;
- }
-
- @Override
- public boolean containsKey(K key) {
- return false;
- }
-
- @Override
- public boolean containsValue(V value) {
- return false;
- }
-
- @Override
- public V get(K key) {
- return null;
- }
-
- @Override
- public void put(K key, V value) {
-
- }
-
- @Override
- public V remove(K key) {
- return null;
- }
-
- @Override
- public void remove(K key, V value) {
-
- }
-
- @Override
- public V compute(K key, BiFunction<K, V, V> recomputeFunction) {
- return null;
- }
-
- @Override
- public void putAll(Map<? extends K, ? extends V> m) {
-
- }
-
- @Override
- public void clear() {
-
- }
-
- @Override
- public Set<K> keySet() {
- return null;
- }
-
- @Override
- public Collection<V> values() {
- return null;
- }
-
- @Override
- public Set<Map.Entry<K, V>> entrySet() {
- return null;
- }
-
- @Override
- public void addListener(EventuallyConsistentMapListener<K, V> listener) {
-
- }
-
- @Override
- public void removeListener(EventuallyConsistentMapListener<K, V> listener) {
-
- }
-
- @Override
- public void destroy() {
-
- }
-}
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.vtnrsc.util;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.BiFunction;
+
+import org.onosproject.store.service.EventuallyConsistentMap;
+import org.onosproject.store.service.EventuallyConsistentMapListener;
+
+/**
+ * Testing adapter for EventuallyConsistentMap.
+ */
+public class VtnEventuallyConsistentMapAdapter<K, V> implements EventuallyConsistentMap<K, V> {
+
+ @Override
+ public String name() {
+ return null;
+ }
+
+ @Override
+ public int size() {
+ return 0;
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return false;
+ }
+
+ @Override
+ public boolean containsKey(K key) {
+ return false;
+ }
+
+ @Override
+ public boolean containsValue(V value) {
+ return false;
+ }
+
+ @Override
+ public V get(K key) {
+ return null;
+ }
+
+ @Override
+ public void put(K key, V value) {
+
+ }
+
+ @Override
+ public V remove(K key) {
+ return null;
+ }
+
+ @Override
+ public void remove(K key, V value) {
+
+ }
+
+ @Override
+ public V compute(K key, BiFunction<K, V, V> recomputeFunction) {
+ return null;
+ }
+
+ @Override
+ public void putAll(Map<? extends K, ? extends V> m) {
+
+ }
+
+ @Override
+ public void clear() {
+
+ }
+
+ @Override
+ public Set<K> keySet() {
+ return null;
+ }
+
+ @Override
+ public Collection<V> values() {
+ return null;
+ }
+
+ @Override
+ public Set<Map.Entry<K, V>> entrySet() {
+ return null;
+ }
+
+ @Override
+ public void addListener(EventuallyConsistentMapListener<K, V> listener) {
+
+ }
+
+ @Override
+ public void removeListener(EventuallyConsistentMapListener<K, V> listener) {
+
+ }
+
+ @Override
+ public void destroy() {
+
+ }
+}
diff --git a/core/api/src/main/java/org/onosproject/store/service/AsyncAtomicCounter.java b/core/api/src/main/java/org/onosproject/store/service/AsyncAtomicCounter.java
index cfaf314..aa20f4e 100644
--- a/core/api/src/main/java/org/onosproject/store/service/AsyncAtomicCounter.java
+++ b/core/api/src/main/java/org/onosproject/store/service/AsyncAtomicCounter.java
@@ -20,7 +20,12 @@
/**
* An async atomic counter dispenses monotonically increasing values.
*/
-public interface AsyncAtomicCounter {
+public interface AsyncAtomicCounter extends DistributedPrimitive {
+
+ @Override
+ default DistributedPrimitive.Type type() {
+ return DistributedPrimitive.Type.COUNTER;
+ }
/**
* Atomically increment by one the current value.
diff --git a/core/api/src/main/java/org/onosproject/store/service/AsyncAtomicValue.java b/core/api/src/main/java/org/onosproject/store/service/AsyncAtomicValue.java
index 531721a..60d8337 100644
--- a/core/api/src/main/java/org/onosproject/store/service/AsyncAtomicValue.java
+++ b/core/api/src/main/java/org/onosproject/store/service/AsyncAtomicValue.java
@@ -27,7 +27,12 @@
*
* @param <V> value type
*/
-public interface AsyncAtomicValue<V> {
+public interface AsyncAtomicValue<V> extends DistributedPrimitive {
+
+ @Override
+ default DistributedPrimitive.Type type() {
+ return DistributedPrimitive.Type.VALUE;
+ }
/**
* Atomically sets the value to the given updated value if the current value is equal to the expected value.
diff --git a/core/api/src/main/java/org/onosproject/store/service/AsyncConsistentMap.java b/core/api/src/main/java/org/onosproject/store/service/AsyncConsistentMap.java
index c7b6eac..d83c553 100644
--- a/core/api/src/main/java/org/onosproject/store/service/AsyncConsistentMap.java
+++ b/core/api/src/main/java/org/onosproject/store/service/AsyncConsistentMap.java
@@ -49,7 +49,12 @@
* the returned future will be {@link CompletableFuture#complete completed} when the
* operation finishes.
*/
-public interface AsyncConsistentMap<K, V> {
+public interface AsyncConsistentMap<K, V> extends DistributedPrimitive {
+
+ @Override
+ default DistributedPrimitive.Type type() {
+ return DistributedPrimitive.Type.CONSISTENT_MAP;
+ }
/**
* Returns the number of entries in the map.
diff --git a/core/api/src/main/java/org/onosproject/store/service/AsyncDistributedSet.java b/core/api/src/main/java/org/onosproject/store/service/AsyncDistributedSet.java
new file mode 100644
index 0000000..df63f99
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/store/service/AsyncDistributedSet.java
@@ -0,0 +1,129 @@
+/*
+ * Copyright 2015-2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.service;
+
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A distributed collection designed for holding unique elements.
+ * <p>
+ * All methods of {@code AsyncDistributedSet} immediately return a {@link CompletableFuture future}.
+ * The returned future will be {@link CompletableFuture#complete completed} when the operation
+ * completes.
+ *
+ * @param <E> set entry type
+ */
+public interface AsyncDistributedSet<E> extends DistributedPrimitive {
+
+ @Override
+ default DistributedPrimitive.Type type() {
+ return DistributedPrimitive.Type.SET;
+ }
+
+ /**
+ * Registers the specified listener to be notified whenever
+ * the set is updated.
+ *
+ * @param listener listener to notify about set update events
+ */
+ CompletableFuture<Void> addListener(SetEventListener<E> listener);
+
+ /**
+ * Unregisters the specified listener.
+ *
+ * @param listener listener to unregister.
+ * @return CompletableFuture that is completed when the operation completes
+ */
+ CompletableFuture<Void> removeListener(SetEventListener<E> listener);
+
+ /**
+ * Adds the specified element to this set if it is not already present (optional operation).
+ * @param element element to add
+ * @return {@code true} if this set did not already contain the specified element.
+ */
+ CompletableFuture<Boolean> add(E element);
+
+ /**
+ * Removes the specified element to this set if it is present (optional operation).
+ * @param element element to remove
+ * @return {@code true} if this set contained the specified element
+ */
+ CompletableFuture<Boolean> remove(E element);
+
+ /**
+ * Returns the number of elements in the set.
+ * @return size of the set
+ */
+ CompletableFuture<Integer> size();
+
+ /**
+ * Returns if the set is empty.
+ * @return {@code true} if this set is empty
+ */
+ CompletableFuture<Boolean> isEmpty();
+
+ /**
+ * Removes all elements from the set.
+ */
+ CompletableFuture<Void> clear();
+
+ /**
+ * Returns if this set contains the specified element.
+ * @param element element to check
+ * @return {@code true} if this set contains the specified element
+ */
+ CompletableFuture<Boolean> contains(E element);
+
+ /**
+ * Adds all of the elements in the specified collection to this set if they're not
+ * already present (optional operation).
+ * @param c collection containing elements to be added to this set
+ * @return {@code true} if this set contains all elements in the collection
+ */
+ CompletableFuture<Boolean> addAll(Collection<? extends E> c);
+
+ /**
+ * Returns if this set contains all the elements in specified collection.
+ * @param c collection
+ * @return {@code true} if this set contains all elements in the collection
+ */
+ CompletableFuture<Boolean> containsAll(Collection<? extends E> c);
+
+ /**
+ * Retains only the elements in this set that are contained in the specified collection (optional operation).
+ * @param c collection containing elements to be retained in this set
+ * @return {@code true} if this set changed as a result of the call
+ */
+ CompletableFuture<Boolean> retainAll(Collection<? extends E> c);
+
+ /**
+ * Removes from this set all of its elements that are contained in the specified collection (optional operation).
+ * If the specified collection is also a set, this operation effectively modifies this set so that its
+ * value is the asymmetric set difference of the two sets.
+ * @param c collection containing elements to be removed from this set
+ * @return {@code true} if this set changed as a result of the call
+ */
+ CompletableFuture<Boolean> removeAll(Collection<? extends E> c);
+
+ /**
+ * Returns the entries as a immutable set. The returned set is a snapshot and will not reflect new changes made to
+ * this AsyncDistributedSet
+ * @return immutable set copy
+ */
+ CompletableFuture<? extends Set<E>> getAsImmutableSet();
+}
diff --git a/core/api/src/main/java/org/onosproject/store/service/AtomicCounter.java b/core/api/src/main/java/org/onosproject/store/service/AtomicCounter.java
index 3c9e02c..592f67a0 100644
--- a/core/api/src/main/java/org/onosproject/store/service/AtomicCounter.java
+++ b/core/api/src/main/java/org/onosproject/store/service/AtomicCounter.java
@@ -18,7 +18,12 @@
/**
* Distributed version of java.util.concurrent.atomic.AtomicLong.
*/
-public interface AtomicCounter {
+public interface AtomicCounter extends DistributedPrimitive {
+
+ @Override
+ default DistributedPrimitive.Type type() {
+ return DistributedPrimitive.Type.COUNTER;
+ }
/**
* Atomically increment by one the current value.
diff --git a/core/api/src/main/java/org/onosproject/store/service/AtomicValue.java b/core/api/src/main/java/org/onosproject/store/service/AtomicValue.java
index dfa0fb3..bb9b56b 100644
--- a/core/api/src/main/java/org/onosproject/store/service/AtomicValue.java
+++ b/core/api/src/main/java/org/onosproject/store/service/AtomicValue.java
@@ -20,7 +20,7 @@
*
* @param <V> value type
*/
-public interface AtomicValue<V> {
+public interface AtomicValue<V> extends DistributedPrimitive {
/**
* Atomically sets the value to the given updated value if the current value is equal to the expected value.
diff --git a/core/api/src/main/java/org/onosproject/store/service/ConsistentMap.java b/core/api/src/main/java/org/onosproject/store/service/ConsistentMap.java
index 93abf78..19f8954 100644
--- a/core/api/src/main/java/org/onosproject/store/service/ConsistentMap.java
+++ b/core/api/src/main/java/org/onosproject/store/service/ConsistentMap.java
@@ -44,7 +44,7 @@
* (which extends RuntimeException) to indicate failures.
*
*/
-public interface ConsistentMap<K, V> {
+public interface ConsistentMap<K, V> extends DistributedPrimitive {
/**
* Returns the number of entries in the map.
diff --git a/core/api/src/main/java/org/onosproject/store/service/DistributedPrimitive.java b/core/api/src/main/java/org/onosproject/store/service/DistributedPrimitive.java
new file mode 100644
index 0000000..342a110
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/store/service/DistributedPrimitive.java
@@ -0,0 +1,78 @@
+/*
+ * Copyright 2015-2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.service;
+
+import org.onosproject.core.ApplicationId;
+
+/**
+ * Interface for all distributed primitives.
+ */
+public interface DistributedPrimitive {
+
+ /**
+ * Type of distributed primitive.
+ */
+ public enum Type {
+ /**
+ * Map with strong consistency semantics.
+ */
+ CONSISTENT_MAP,
+
+ /**
+ * Map with eventual consistency semantics.
+ */
+ EVENTUALLY_CONSISTENT_MAP,
+
+ /**
+ * distributed set.
+ */
+ SET,
+
+ /**
+ * atomic counter.
+ */
+ COUNTER,
+
+ /**
+ * Atomic value.
+ */
+ VALUE,
+
+ /**
+ * Distributed queue.
+ */
+ QUEUE
+ }
+
+ /**
+ * Returns the name of this primitive.
+ * @return name
+ */
+ String name();
+
+ /**
+ * Returns the type of primitive.
+ * @return primitive type
+ */
+ Type type();
+
+ /**
+ * Returns the application owning this primitive.
+ */
+ default ApplicationId applicationId() {
+ return null;
+ }
+}
diff --git a/core/api/src/main/java/org/onosproject/store/service/DistributedQueue.java b/core/api/src/main/java/org/onosproject/store/service/DistributedQueue.java
index cc0b00d..777152c 100644
--- a/core/api/src/main/java/org/onosproject/store/service/DistributedQueue.java
+++ b/core/api/src/main/java/org/onosproject/store/service/DistributedQueue.java
@@ -24,7 +24,7 @@
*
* @param <E> queue entry type
*/
-public interface DistributedQueue<E> {
+public interface DistributedQueue<E> extends DistributedPrimitive {
/**
* Returns total number of entries in the queue.
diff --git a/core/api/src/main/java/org/onosproject/store/service/DistributedSet.java b/core/api/src/main/java/org/onosproject/store/service/DistributedSet.java
index 460206e..c59462b 100644
--- a/core/api/src/main/java/org/onosproject/store/service/DistributedSet.java
+++ b/core/api/src/main/java/org/onosproject/store/service/DistributedSet.java
@@ -22,7 +22,7 @@
*
* @param <E> set entry type
*/
-public interface DistributedSet<E> extends Set<E> {
+public interface DistributedSet<E> extends Set<E>, DistributedPrimitive {
/**
* Registers the specified listener to be notified whenever
diff --git a/core/api/src/main/java/org/onosproject/store/service/DistributedSetBuilder.java b/core/api/src/main/java/org/onosproject/store/service/DistributedSetBuilder.java
index f5a44c9..09ba794 100644
--- a/core/api/src/main/java/org/onosproject/store/service/DistributedSetBuilder.java
+++ b/core/api/src/main/java/org/onosproject/store/service/DistributedSetBuilder.java
@@ -129,4 +129,13 @@
* @throws java.lang.RuntimeException if a mandatory parameter is missing
*/
DistributedSet<E> build();
+
+ /**
+ * Builds an {@link AsyncDistributedSet async set} based on the configuration options
+ * supplied to this builder.
+ *
+ * @return new AsyncDistributedSet
+ * @throws java.lang.RuntimeException if a mandatory parameter is missing
+ */
+ AsyncDistributedSet<E> buildAsyncSet();
}
diff --git a/core/api/src/main/java/org/onosproject/store/service/EventuallyConsistentMap.java b/core/api/src/main/java/org/onosproject/store/service/EventuallyConsistentMap.java
index 06395b8..2369619 100644
--- a/core/api/src/main/java/org/onosproject/store/service/EventuallyConsistentMap.java
+++ b/core/api/src/main/java/org/onosproject/store/service/EventuallyConsistentMap.java
@@ -39,7 +39,12 @@
* Null values are not allowed in this map.
* </p>
*/
-public interface EventuallyConsistentMap<K, V> {
+public interface EventuallyConsistentMap<K, V> extends DistributedPrimitive {
+
+ @Override
+ default DistributedPrimitive.Type type() {
+ return DistributedPrimitive.Type.EVENTUALLY_CONSISTENT_MAP;
+ }
/**
* Returns the number of key-value mappings in this map.
diff --git a/core/api/src/main/java/org/onosproject/store/service/Synchronous.java b/core/api/src/main/java/org/onosproject/store/service/Synchronous.java
new file mode 100644
index 0000000..be65382
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/store/service/Synchronous.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2015-2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.service;
+
+/**
+ * DistributedPrimitive that is a synchronous (blocking) version of
+ * another.
+ *
+ * @param <T> type of DistributedPrimitive
+ */
+public abstract class Synchronous<T extends DistributedPrimitive> implements DistributedPrimitive {
+
+ private final T primitive;
+
+ public Synchronous(T primitive) {
+ this.primitive = primitive;
+ }
+
+ @Override
+ public String name() {
+ return primitive.name();
+ }
+
+ @Override
+ public Type type() {
+ return primitive.type();
+ }
+}
diff --git a/core/api/src/test/java/org/onosproject/store/service/ConsistentMapAdapter.java b/core/api/src/test/java/org/onosproject/store/service/ConsistentMapAdapter.java
index a7a6ce8..ba9cfc2 100644
--- a/core/api/src/test/java/org/onosproject/store/service/ConsistentMapAdapter.java
+++ b/core/api/src/test/java/org/onosproject/store/service/ConsistentMapAdapter.java
@@ -26,6 +26,17 @@
* Testing adapter for the consistent map.
*/
public class ConsistentMapAdapter<K, V> implements ConsistentMap<K, V> {
+
+ @Override
+ public String name() {
+ return null;
+ }
+
+ @Override
+ public DistributedPrimitive.Type type() {
+ return DistributedPrimitive.Type.CONSISTENT_MAP;
+ }
+
@Override
public int size() {
return 0;
diff --git a/core/api/src/test/java/org/onosproject/store/service/EventuallyConsistentMapAdapter.java b/core/api/src/test/java/org/onosproject/store/service/EventuallyConsistentMapAdapter.java
index 07f5fb4..1f4af9e 100644
--- a/core/api/src/test/java/org/onosproject/store/service/EventuallyConsistentMapAdapter.java
+++ b/core/api/src/test/java/org/onosproject/store/service/EventuallyConsistentMapAdapter.java
@@ -20,10 +20,23 @@
import java.util.Set;
import java.util.function.BiFunction;
+import org.onosproject.store.service.DistributedPrimitive.Type;
+
/**
* Testing adapter for EventuallyConsistentMap.
*/
public class EventuallyConsistentMapAdapter<K, V> implements EventuallyConsistentMap<K, V> {
+
+ @Override
+ public String name() {
+ return null;
+ }
+
+ @Override
+ public Type type() {
+ return Type.EVENTUALLY_CONSISTENT_MAP;
+ }
+
@Override
public int size() {
return 0;
diff --git a/core/api/src/test/java/org/onosproject/store/service/TestAtomicCounter.java b/core/api/src/test/java/org/onosproject/store/service/TestAtomicCounter.java
index 8c577df..337c090 100644
--- a/core/api/src/test/java/org/onosproject/store/service/TestAtomicCounter.java
+++ b/core/api/src/test/java/org/onosproject/store/service/TestAtomicCounter.java
@@ -23,6 +23,16 @@
public final class TestAtomicCounter implements AtomicCounter {
final AtomicLong value;
+ @Override
+ public String name() {
+ return null;
+ }
+
+ @Override
+ public Type type() {
+ return Type.COUNTER;
+ }
+
private TestAtomicCounter() {
value = new AtomicLong();
}
diff --git a/core/api/src/test/java/org/onosproject/store/service/TestConsistentMap.java b/core/api/src/test/java/org/onosproject/store/service/TestConsistentMap.java
index 57b36ab..4133095 100644
--- a/core/api/src/test/java/org/onosproject/store/service/TestConsistentMap.java
+++ b/core/api/src/test/java/org/onosproject/store/service/TestConsistentMap.java
@@ -28,7 +28,7 @@
import java.util.stream.Collectors;
import org.onosproject.core.ApplicationId;
-import static org.onosproject.store.service.MapEvent.Type;
+
import static org.onosproject.store.service.MapEvent.Type.*;
/**
@@ -37,7 +37,7 @@
public final class TestConsistentMap<K, V> extends ConsistentMapAdapter<K, V> {
private final List<MapEventListener<K, V>> listeners;
- private final HashMap<K, V> map;
+ private final Map<K, V> map;
private final String mapName;
private final AtomicLong counter = new AtomicLong(0);
@@ -54,7 +54,7 @@
/**
* Notify all listeners of an event.
*/
- private void notifyListeners(String mapName, Type type,
+ private void notifyListeners(String mapName, MapEvent.Type type,
K key, Versioned<V> value) {
MapEvent<K, V> event = new MapEvent<>(mapName, type, key, value);
listeners.forEach(
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncAtomicCounter.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncAtomicCounter.java
index d851eaa..e6972c2 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncAtomicCounter.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncAtomicCounter.java
@@ -51,6 +51,11 @@
}
@Override
+ public String name() {
+ return name;
+ }
+
+ @Override
public CompletableFuture<Long> incrementAndGet() {
final MeteringAgent.Context timer = monitor.startTimer(INCREMENT_AND_GET);
return addAndGet(1L)
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncAtomicValue.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncAtomicValue.java
index 454d46c..c223565 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncAtomicValue.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncAtomicValue.java
@@ -55,6 +55,11 @@
}
@Override
+ public String name() {
+ return name;
+ }
+
+ @Override
public CompletableFuture<Boolean> compareAndSet(V expect, V update) {
final MeteringAgent.Context newTimer = monitor.startTimer(COMPARE_AND_SET);
CompletableFuture<Boolean> response;
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java
index 46a097c..b99bfbf 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java
@@ -171,6 +171,7 @@
* Returns this map name.
* @return map name
*/
+ @Override
public String name() {
return name;
}
@@ -187,6 +188,7 @@
* Returns the applicationId owning this map.
* @return application Id
*/
+ @Override
public ApplicationId applicationId() {
return applicationId;
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncDistributedSet.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncDistributedSet.java
new file mode 100644
index 0000000..d67b635
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncDistributedSet.java
@@ -0,0 +1,167 @@
+/*
+ * Copyright 2015-2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.consistent.impl;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import org.onlab.util.Tools;
+import org.onosproject.store.service.AsyncConsistentMap;
+import org.onosproject.store.service.AsyncDistributedSet;
+import org.onosproject.store.service.MapEvent;
+import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.SetEvent;
+import org.onosproject.store.service.SetEventListener;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+/**
+ * Implementation of {@link AsyncDistributedSet}.
+ *
+ * @param <E> set entry type
+ */
+public class DefaultAsyncDistributedSet<E> implements AsyncDistributedSet<E> {
+
+ private static final String CONTAINS = "contains";
+ private static final String PRIMITIVE_NAME = "distributedSet";
+ private static final String SIZE = "size";
+ private static final String IS_EMPTY = "isEmpty";
+ private static final String ADD = "add";
+ private static final String REMOVE = "remove";
+ private static final String CONTAINS_ALL = "containsAll";
+ private static final String ADD_ALL = "addAll";
+ private static final String RETAIN_ALL = "retainAll";
+ private static final String REMOVE_ALL = "removeAll";
+ private static final String CLEAR = "clear";
+ private static final String GET_AS_IMMUTABLE_SET = "getAsImmutableSet";
+
+ private final String name;
+ private final AsyncConsistentMap<E, Boolean> backingMap;
+ private final Map<SetEventListener<E>, MapEventListener<E, Boolean>> listenerMapping = Maps.newIdentityHashMap();
+ private final MeteringAgent monitor;
+
+ public DefaultAsyncDistributedSet(AsyncConsistentMap<E, Boolean> backingMap, String name, boolean meteringEnabled) {
+ this.backingMap = backingMap;
+ this.name = name;
+ monitor = new MeteringAgent(PRIMITIVE_NAME, name, meteringEnabled);
+ }
+
+ @Override
+ public String name() {
+ return name;
+ }
+
+ @Override
+ public CompletableFuture<Integer> size() {
+ final MeteringAgent.Context timer = monitor.startTimer(SIZE);
+ return backingMap.size().whenComplete((r, e) -> timer.stop(null));
+ }
+
+ @Override
+ public CompletableFuture<Boolean> isEmpty() {
+ final MeteringAgent.Context timer = monitor.startTimer(IS_EMPTY);
+ return backingMap.isEmpty().whenComplete((r, e) -> timer.stop(null));
+ }
+
+ @Override
+ public CompletableFuture<Boolean> contains(E element) {
+ final MeteringAgent.Context timer = monitor.startTimer(CONTAINS);
+ return backingMap.containsKey(element).whenComplete((r, e) -> timer.stop(null));
+ }
+
+ @Override
+ public CompletableFuture<Boolean> add(E entry) {
+ final MeteringAgent.Context timer = monitor.startTimer(ADD);
+ return backingMap.putIfAbsent(entry, true).thenApply(Objects::isNull).whenComplete((r, e) -> timer.stop(null));
+ }
+
+ @Override
+ public CompletableFuture<Boolean> remove(E entry) {
+ final MeteringAgent.Context timer = monitor.startTimer(REMOVE);
+ return backingMap.remove(entry, true).whenComplete((r, e) -> timer.stop(null));
+ }
+
+ @Override
+ public CompletableFuture<Boolean> containsAll(Collection<? extends E> c) {
+ final MeteringAgent.Context timer = monitor.startTimer(CONTAINS_ALL);
+ return Tools.allOf(c.stream().map(this::contains).collect(Collectors.toList())).thenApply(v ->
+ v.stream().reduce(Boolean::logicalAnd).orElse(true)).whenComplete((r, e) -> timer.stop(null));
+ }
+
+ @Override
+ public CompletableFuture<Boolean> addAll(Collection<? extends E> c) {
+ final MeteringAgent.Context timer = monitor.startTimer(ADD_ALL);
+ return Tools.allOf(c.stream().map(this::add).collect(Collectors.toList())).thenApply(v ->
+ v.stream().reduce(Boolean::logicalOr).orElse(false)).whenComplete((r, e) -> timer.stop(null));
+ }
+
+ @Override
+ public CompletableFuture<Boolean> retainAll(Collection<? extends E> c) {
+ final MeteringAgent.Context timer = monitor.startTimer(RETAIN_ALL);
+ return backingMap.keySet().thenApply(set -> Sets.difference(set, Sets.newHashSet(c)))
+ .thenCompose(this::removeAll)
+ .whenComplete((r, e) -> timer.stop(null));
+ }
+
+ @Override
+ public CompletableFuture<Boolean> removeAll(Collection<? extends E> c) {
+ final MeteringAgent.Context timer = monitor.startTimer(REMOVE_ALL);
+ return Tools.allOf(c.stream().map(this::remove).collect(Collectors.toList())).thenApply(v ->
+ v.stream().reduce(Boolean::logicalOr).orElse(false)).whenComplete((r, e) -> timer.stop(null));
+ }
+
+ @Override
+ public CompletableFuture<Void> clear() {
+ final MeteringAgent.Context timer = monitor.startTimer(CLEAR);
+ return backingMap.clear().whenComplete((r, e) -> timer.stop(null));
+ }
+
+ @Override
+ public CompletableFuture<? extends Set<E>> getAsImmutableSet() {
+ final MeteringAgent.Context timer = monitor.startTimer(GET_AS_IMMUTABLE_SET);
+ return backingMap.keySet().thenApply(s -> ImmutableSet.copyOf(s)).whenComplete((r, e) -> timer.stop(null));
+ }
+
+ @Override
+ public CompletableFuture<Void> addListener(SetEventListener<E> listener) {
+ MapEventListener<E, Boolean> mapEventListener = mapEvent -> {
+ if (mapEvent.type() == MapEvent.Type.INSERT) {
+ listener.event(new SetEvent<>(name, SetEvent.Type.ADD, mapEvent.key()));
+ } else if (mapEvent.type() == MapEvent.Type.REMOVE) {
+ listener.event(new SetEvent<>(name, SetEvent.Type.REMOVE, mapEvent.key()));
+ }
+ };
+ if (listenerMapping.putIfAbsent(listener, mapEventListener) == null) {
+ return backingMap.addListener(mapEventListener);
+ }
+ return CompletableFuture.completedFuture(null);
+ }
+
+ @Override
+ public CompletableFuture<Void> removeListener(SetEventListener<E> listener) {
+ MapEventListener<E, Boolean> mapEventListener = listenerMapping.remove(listener);
+ if (mapEventListener != null) {
+ return backingMap.removeListener(mapEventListener);
+ }
+ return CompletableFuture.completedFuture(null);
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicCounter.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicCounter.java
index 2d6a956..00a98a9 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicCounter.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicCounter.java
@@ -18,6 +18,7 @@
import org.onosproject.store.service.AsyncAtomicCounter;
import org.onosproject.store.service.AtomicCounter;
import org.onosproject.store.service.StorageException;
+import org.onosproject.store.service.Synchronous;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@@ -30,16 +31,15 @@
* <p>
* The initial value will be zero.
*/
-public class DefaultAtomicCounter implements AtomicCounter {
+public class DefaultAtomicCounter extends Synchronous<AsyncAtomicCounter> implements AtomicCounter {
private static final int OPERATION_TIMEOUT_MILLIS = 5000;
private final AsyncAtomicCounter asyncCounter;
- public DefaultAtomicCounter(String name,
- Database database,
- boolean meteringEnabled) {
- asyncCounter = new DefaultAsyncAtomicCounter(name, database, meteringEnabled);
+ public DefaultAtomicCounter(AsyncAtomicCounter asyncCounter) {
+ super(asyncCounter);
+ this.asyncCounter = asyncCounter;
}
@Override
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicCounterBuilder.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicCounterBuilder.java
index dba4443..6596f36 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicCounterBuilder.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicCounterBuilder.java
@@ -53,9 +53,7 @@
@Override
public AtomicCounter build() {
- validateInputs();
- Database database = partitionsEnabled ? partitionedDatabase : inMemoryDatabase;
- return new DefaultAtomicCounter(name, database, metering);
+ return new DefaultAtomicCounter(buildAsyncCounter());
}
@Override
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicValue.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicValue.java
index 20bfd5f9..f61d330 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicValue.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicValue.java
@@ -17,10 +17,12 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
+
import org.onosproject.store.service.AsyncAtomicValue;
import org.onosproject.store.service.AtomicValue;
import org.onosproject.store.service.AtomicValueEventListener;
import org.onosproject.store.service.StorageException;
+import org.onosproject.store.service.Synchronous;
import com.google.common.util.concurrent.Futures;
@@ -29,12 +31,13 @@
*
* @param <V> value type
*/
-public class DefaultAtomicValue<V> implements AtomicValue<V> {
+public class DefaultAtomicValue<V> extends Synchronous<AsyncAtomicValue<V>> implements AtomicValue<V> {
private static final int OPERATION_TIMEOUT_MILLIS = 5000;
private final AsyncAtomicValue<V> asyncValue;
public DefaultAtomicValue(AsyncAtomicValue<V> asyncValue) {
+ super(asyncValue);
this.asyncValue = asyncValue;
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultConsistentMap.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultConsistentMap.java
index 7841c16..dd8a5a9 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultConsistentMap.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultConsistentMap.java
@@ -28,9 +28,11 @@
import java.util.function.Predicate;
import java.util.Set;
+import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.ConsistentMapException;
import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.Synchronous;
import org.onosproject.store.service.Versioned;
/**
@@ -40,18 +42,15 @@
* @param <K> type of key.
* @param <V> type of value.
*/
-public class DefaultConsistentMap<K, V> implements ConsistentMap<K, V> {
+public class DefaultConsistentMap<K, V> extends Synchronous<AsyncConsistentMap<K, V>> implements ConsistentMap<K, V> {
private static final int OPERATION_TIMEOUT_MILLIS = 5000;
private final DefaultAsyncConsistentMap<K, V> asyncMap;
private Map<K, V> javaMap;
- public String name() {
- return asyncMap.name();
- }
-
public DefaultConsistentMap(DefaultAsyncConsistentMap<K, V> asyncMap) {
+ super(asyncMap);
this.asyncMap = asyncMap;
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedQueue.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedQueue.java
index 5f69fde..edcd26a 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedQueue.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedQueue.java
@@ -19,12 +19,14 @@
import com.google.common.util.concurrent.Futures;
import org.onlab.util.SharedExecutors;
+import org.onosproject.store.service.DistributedPrimitive;
import org.onosproject.store.service.DistributedQueue;
import org.onosproject.store.service.Serializer;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+
import static com.google.common.base.Preconditions.checkNotNull;
import static org.onosproject.store.consistent.impl.StateMachineUpdate.Target.QUEUE_PUSH;
@@ -108,10 +110,16 @@
.whenComplete((r, e) -> timer.stop(e)));
}
+ @Override
public String name() {
return name;
}
+ @Override
+ public DistributedPrimitive.Type type() {
+ return DistributedPrimitive.Type.QUEUE;
+ }
+
protected void tryPoll() {
Set<CompletableFuture<E>> completedFutures = Sets.newHashSet();
for (CompletableFuture<E> future : pendingFutures) {
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedSet.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedSet.java
index 42ed615..5092892 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedSet.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedSet.java
@@ -15,227 +15,138 @@
*/
package org.onosproject.store.consistent.impl;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.AsyncDistributedSet;
+import org.onosproject.store.service.ConsistentMapException;
import org.onosproject.store.service.DistributedSet;
-import org.onosproject.store.service.MapEvent;
-import org.onosproject.store.service.MapEventListener;
-import org.onosproject.store.service.SetEvent;
import org.onosproject.store.service.SetEventListener;
+import org.onosproject.store.service.Synchronous;
import java.lang.reflect.Array;
import java.util.Collection;
import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
/**
- * Implementation of distributed set that is backed by a ConsistentMap.
+ * Implementation of {@link DistributedSet} that merely delegates to a {@link AsyncDistributedSet}
+ * and waits for the operation to complete.
* @param <E> set element type
*/
-public class DefaultDistributedSet<E> implements DistributedSet<E> {
+public class DefaultDistributedSet<E> extends Synchronous<AsyncDistributedSet<E>> implements DistributedSet<E> {
- private static final String CONTAINS = "contains";
- private static final String PRIMITIVE_NAME = "distributedSet";
- private static final String SIZE = "size";
- private static final String IS_EMPTY = "isEmpty";
- private static final String ITERATOR = "iterator";
- private static final String TO_ARRAY = "toArray";
- private static final String ADD = "add";
- private static final String REMOVE = "remove";
- private static final String CONTAINS_ALL = "containsAll";
- private static final String ADD_ALL = "addAll";
- private static final String RETAIN_ALL = "retainAll";
- private static final String REMOVE_ALL = "removeAll";
- private static final String CLEAR = "clear";
+ private static final long OPERATION_TIMEOUT_MILLIS = 5000;
- private final String name;
- private final ConsistentMap<E, Boolean> backingMap;
- private final Map<SetEventListener<E>, MapEventListener<E, Boolean>> listenerMapping = Maps.newIdentityHashMap();
- private final MeteringAgent monitor;
+ private final AsyncDistributedSet<E> asyncSet;
- public DefaultDistributedSet(String name, boolean meteringEnabled, ConsistentMap<E, Boolean> backingMap) {
- this.name = name;
- this.backingMap = backingMap;
- monitor = new MeteringAgent(PRIMITIVE_NAME, name, meteringEnabled);
+ public DefaultDistributedSet(AsyncDistributedSet<E> asyncSet) {
+ super(asyncSet);
+ this.asyncSet = asyncSet;
+ }
+
+ private static <T> T complete(CompletableFuture<T> future) {
+ try {
+ return future.get(OPERATION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new ConsistentMapException.Interrupted();
+ } catch (TimeoutException e) {
+ throw new ConsistentMapException.Timeout();
+ } catch (ExecutionException e) {
+ if (e.getCause() instanceof ConsistentMapException) {
+ throw (ConsistentMapException) e.getCause();
+ } else {
+ throw new ConsistentMapException(e.getCause());
+ }
+ }
}
@Override
public int size() {
- final MeteringAgent.Context timer = monitor.startTimer(SIZE);
- try {
- return backingMap.size();
- } finally {
- timer.stop(null);
- }
+ return complete(asyncSet.size());
}
@Override
public boolean isEmpty() {
- final MeteringAgent.Context timer = monitor.startTimer(IS_EMPTY);
- try {
- return backingMap.isEmpty();
- } finally {
- timer.stop(null);
- }
+ return complete(asyncSet.isEmpty());
}
@SuppressWarnings("unchecked")
@Override
public boolean contains(Object o) {
- final MeteringAgent.Context timer = monitor.startTimer(CONTAINS);
- try {
- return backingMap.containsKey((E) o);
- } finally {
- timer.stop(null);
- }
+ return complete(asyncSet.contains((E) o));
}
@Override
public Iterator<E> iterator() {
- final MeteringAgent.Context timer = monitor.startTimer(ITERATOR);
- //Do we have to measure this guy?
- try {
- return backingMap.keySet().iterator();
- } finally {
- timer.stop(null);
- }
+ return complete(asyncSet.getAsImmutableSet()).iterator();
}
@Override
public Object[] toArray() {
- final MeteringAgent.Context timer = monitor.startTimer(TO_ARRAY);
- try {
- return backingMap.keySet().stream().toArray();
- } finally {
- timer.stop(null);
- }
+ return complete(asyncSet.getAsImmutableSet()).stream().toArray();
}
+ @SuppressWarnings("unchecked")
@Override
public <T> T[] toArray(T[] a) {
- final MeteringAgent.Context timer = monitor.startTimer(TO_ARRAY);
- try {
- // TODO: Optimize this to only allocate a new array if the set size
- // is larger than the array.length. If the set size is smaller than
- // the array.length then copy the data into the array and set the
- // last element in the array to be null.
- final T[] resizedArray =
- (T[]) Array.newInstance(a.getClass().getComponentType(), backingMap.keySet().size());
- return (T[]) backingMap.keySet().toArray(resizedArray);
- } finally {
- timer.stop(null);
- }
+ // TODO: Optimize this to only allocate a new array if the set size
+ // is larger than the array.length. If the set size is smaller than
+ // the array.length then copy the data into the array and set the
+ // last element in the array to be null.
+ final T[] resizedArray =
+ (T[]) Array.newInstance(a.getClass().getComponentType(), complete(asyncSet.getAsImmutableSet()).size());
+ return complete(asyncSet.getAsImmutableSet()).toArray(resizedArray);
}
@Override
public boolean add(E e) {
- final MeteringAgent.Context timer = monitor.startTimer(ADD);
- try {
- return backingMap.putIfAbsent(e, true) == null;
- } finally {
- timer.stop(null);
- }
+ return complete(asyncSet.add(e));
}
@SuppressWarnings("unchecked")
@Override
public boolean remove(Object o) {
- final MeteringAgent.Context timer = monitor.startTimer(REMOVE);
- try {
- return backingMap.remove((E) o) != null;
- } finally {
- timer.stop(null);
- }
+ return complete(asyncSet.remove((E) o));
}
+ @SuppressWarnings("unchecked")
@Override
public boolean containsAll(Collection<?> c) {
- final MeteringAgent.Context timer = monitor.startTimer(CONTAINS_ALL);
- try {
- return c.stream()
- .allMatch(this::contains);
- } finally {
- timer.stop(null);
- }
+ return complete(asyncSet.containsAll((Collection<? extends E>) c));
}
@Override
public boolean addAll(Collection<? extends E> c) {
- final MeteringAgent.Context timer = monitor.startTimer(ADD_ALL);
- try {
- return c.stream()
- .map(this::add)
- .reduce(Boolean::logicalOr)
- .orElse(false);
- } finally {
- timer.stop(null);
- }
+ return complete(asyncSet.addAll(c));
}
+ @SuppressWarnings("unchecked")
@Override
public boolean retainAll(Collection<?> c) {
- final MeteringAgent.Context timer = monitor.startTimer(RETAIN_ALL);
- try {
- Set<?> retainSet = Sets.newHashSet(c);
- return backingMap.keySet()
- .stream()
- .filter(k -> !retainSet.contains(k))
- .map(this::remove)
- .reduce(Boolean::logicalOr)
- .orElse(false);
- } finally {
- timer.stop(null);
- }
+ return complete(asyncSet.retainAll((Collection<? extends E>) c));
}
+ @SuppressWarnings("unchecked")
@Override
public boolean removeAll(Collection<?> c) {
- final MeteringAgent.Context timer = monitor.startTimer(REMOVE_ALL);
- try {
- Set<?> removeSet = Sets.newHashSet(c);
- return backingMap.keySet()
- .stream()
- .filter(removeSet::contains)
- .map(this::remove)
- .reduce(Boolean::logicalOr)
- .orElse(false);
- } finally {
- timer.stop(null);
- }
+ return complete(asyncSet.removeAll((Collection<? extends E>) c));
}
@Override
public void clear() {
- final MeteringAgent.Context timer = monitor.startTimer(CLEAR);
- try {
- backingMap.clear();
- } finally {
- timer.stop(null);
- }
+ complete(asyncSet.clear());
}
@Override
public void addListener(SetEventListener<E> listener) {
- MapEventListener<E, Boolean> mapEventListener = mapEvent -> {
- if (mapEvent.type() == MapEvent.Type.INSERT) {
- listener.event(new SetEvent<>(name, SetEvent.Type.ADD, mapEvent.key()));
- } else if (mapEvent.type() == MapEvent.Type.REMOVE) {
- listener.event(new SetEvent<>(name, SetEvent.Type.REMOVE, mapEvent.key()));
- }
- };
- if (listenerMapping.putIfAbsent(listener, mapEventListener) == null) {
- backingMap.addListener(mapEventListener);
- }
+ complete(asyncSet.addListener(listener));
}
@Override
public void removeListener(SetEventListener<E> listener) {
- MapEventListener<E, Boolean> mapEventListener = listenerMapping.remove(listener);
- if (mapEventListener != null) {
- backingMap.removeListener(mapEventListener);
- }
+ complete(asyncSet.removeListener(listener));
}
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedSetBuilder.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedSetBuilder.java
index f7957f3..92e3e9a 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedSetBuilder.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedSetBuilder.java
@@ -16,6 +16,7 @@
package org.onosproject.store.consistent.impl;
import org.onosproject.core.ApplicationId;
+import org.onosproject.store.service.AsyncDistributedSet;
import org.onosproject.store.service.ConsistentMapBuilder;
import org.onosproject.store.service.DistributedSet;
import org.onosproject.store.service.Serializer;
@@ -88,6 +89,11 @@
@Override
public DistributedSet<E> build() {
- return new DefaultDistributedSet<E>(name, metering, mapBuilder.build());
+ return new DefaultDistributedSet<E>(buildAsyncSet());
+ }
+
+ @Override
+ public AsyncDistributedSet<E> buildAsyncSet() {
+ return new DefaultAsyncDistributedSet<E>(mapBuilder.buildAsyncMap(), name, metering);
}
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
index b5ea52e..649e5a8 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
@@ -273,6 +273,11 @@
}
@Override
+ public String name() {
+ return mapName;
+ }
+
+ @Override
public int size() {
checkState(!destroyed, destroyedMessage);
// TODO: Maintain a separate counter for tracking live elements in map.