[ONOS-6267] Support configurable Executors for primitives
- Support user-provided Executors in primitive builders
- Implement default per-partition per-primitive serial executor using a shared thread pool
- Implement Executor wrappers for all primitive types
Change-Id: I53acfb173a9b49a992a9a388983791d9735ed54a
diff --git a/core/api/src/main/java/org/onosproject/store/primitives/DistributedPrimitiveBuilder.java b/core/api/src/main/java/org/onosproject/store/primitives/DistributedPrimitiveBuilder.java
index a122125..f9939da 100644
--- a/core/api/src/main/java/org/onosproject/store/primitives/DistributedPrimitiveBuilder.java
+++ b/core/api/src/main/java/org/onosproject/store/primitives/DistributedPrimitiveBuilder.java
@@ -15,6 +15,9 @@
*/
package org.onosproject.store.primitives;
+import java.util.concurrent.Executor;
+import java.util.function.Supplier;
+
import org.onosproject.core.ApplicationId;
import org.onosproject.store.service.DistributedPrimitive;
import org.onosproject.store.service.Serializer;
@@ -27,10 +30,11 @@
public abstract class DistributedPrimitiveBuilder<B extends DistributedPrimitiveBuilder<B, T>,
T extends DistributedPrimitive> {
- private DistributedPrimitive.Type type;
+ private final DistributedPrimitive.Type type;
private String name;
private ApplicationId applicationId;
private Serializer serializer;
+ private Supplier<Executor> executorSupplier;
private boolean partitionsDisabled = false;
private boolean meteringDisabled = false;
private boolean readOnly = false;
@@ -63,6 +67,31 @@
}
/**
+ * Sets the executor to use for asynchronous callbacks.
+ * <p>
+ * For partitioned primitives, the provided executor will be shared across all partitions.
+ *
+ * @param executor the executor to use for asynchronous callbacks
+ * @return this builder
+ */
+ public B withExecutor(Executor executor) {
+ return withExecutorSupplier(() -> executor);
+ }
+
+ /**
+ * Sets the supplier to be used to create executors.
+ * <p>
+ * When a factory is set, the supplier will be used to create a separate executor for each partition.
+ *
+ * @param executorSupplier the executor supplier
+ * @return this builder
+ */
+ public B withExecutorSupplier(Supplier<Executor> executorSupplier) {
+ this.executorSupplier = executorSupplier;
+ return (B) this;
+ }
+
+ /**
* Sets the application id that owns this primitive.
*
* @param applicationId application identifier
@@ -148,6 +177,15 @@
}
/**
+ * Returns the executor supplier.
+ *
+ * @return executor supplier
+ */
+ public final Supplier<Executor> executorSupplier() {
+ return executorSupplier;
+ }
+
+ /**
* Returns the application identifier.
*
* @return application id
diff --git a/core/api/src/main/java/org/onosproject/store/primitives/DistributedPrimitiveCreator.java b/core/api/src/main/java/org/onosproject/store/primitives/DistributedPrimitiveCreator.java
index d034261..6ee43e5 100644
--- a/core/api/src/main/java/org/onosproject/store/primitives/DistributedPrimitiveCreator.java
+++ b/core/api/src/main/java/org/onosproject/store/primitives/DistributedPrimitiveCreator.java
@@ -28,6 +28,8 @@
import org.onosproject.store.service.WorkQueue;
import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.function.Supplier;
/**
* Interface for entity that can create instances of different distributed primitives.
@@ -43,7 +45,22 @@
* @param <V> value type
* @return map
*/
- <K, V> AsyncConsistentMap<K, V> newAsyncConsistentMap(String name, Serializer serializer);
+ default <K, V> AsyncConsistentMap<K, V> newAsyncConsistentMap(String name, Serializer serializer) {
+ return newAsyncConsistentMap(name, serializer, null);
+ }
+
+ /**
+ * Creates a new {@code AsyncConsistentMap}.
+ *
+ * @param name map name
+ * @param serializer serializer to use for serializing/deserializing map entries
+ * @param executorSupplier a callback that returns an executor to be used for each partition
+ * @param <K> key type
+ * @param <V> value type
+ * @return map
+ */
+ <K, V> AsyncConsistentMap<K, V> newAsyncConsistentMap(
+ String name, Serializer serializer, Supplier<Executor> executorSupplier);
/**
* Creates a new {@code AsyncConsistentTreeMap}.
@@ -53,8 +70,22 @@
* @param <V> value type
* @return distributedTreeMap
*/
+ default <V> AsyncConsistentTreeMap<V> newAsyncConsistentTreeMap(
+ String name, Serializer serializer) {
+ return newAsyncConsistentTreeMap(name, serializer, null);
+ }
+
+ /**
+ * Creates a new {@code AsyncConsistentTreeMap}.
+ *
+ * @param name tree name
+ * @param serializer serializer to use for serializing/deserializing map entries
+ * @param executorSupplier a callback that returns an executor to be used for each partition
+ * @param <V> value type
+ * @return distributedTreeMap
+ */
<V> AsyncConsistentTreeMap<V> newAsyncConsistentTreeMap(
- String name, Serializer serializer);
+ String name, Serializer serializer, Supplier<Executor> executorSupplier);
/**
* Creates a new set backed {@code AsyncConsistentMultimap}.
@@ -65,8 +96,23 @@
* @param <V> value type
* @return set backed distributedMultimap
*/
+ default <K, V> AsyncConsistentMultimap<K, V> newAsyncConsistentSetMultimap(
+ String name, Serializer serializer) {
+ return newAsyncConsistentSetMultimap(name, serializer, null);
+ }
+
+ /**
+ * Creates a new set backed {@code AsyncConsistentMultimap}.
+ *
+ * @param name multimap name
+ * @param serializer serializer to use for serializing/deserializing
+ * @param executorSupplier a callback that returns an executor to be used for each partition
+ * @param <K> key type
+ * @param <V> value type
+ * @return set backed distributedMultimap
+ */
<K, V> AsyncConsistentMultimap<K, V> newAsyncConsistentSetMultimap(
- String name, Serializer serializer);
+ String name, Serializer serializer, Supplier<Executor> executorSupplier);
/**
* Creates a new {@code AsyncAtomicCounterMap}.
@@ -76,8 +122,22 @@
* @param <K> key type
* @return atomic counter map
*/
+ default <K> AsyncAtomicCounterMap<K> newAsyncAtomicCounterMap(
+ String name, Serializer serializer) {
+ return newAsyncAtomicCounterMap(name, serializer, null);
+ }
+
+ /**
+ * Creates a new {@code AsyncAtomicCounterMap}.
+ *
+ * @param name counter map name
+ * @param serializer serializer to use for serializing/deserializing keys
+ * @param executorSupplier a callback that returns an executor to be used for each partition
+ * @param <K> key type
+ * @return atomic counter map
+ */
<K> AsyncAtomicCounterMap<K> newAsyncAtomicCounterMap(
- String name, Serializer serializer);
+ String name, Serializer serializer, Supplier<Executor> executorSupplier);
/**
* Creates a new {@code AsyncAtomicCounter}.
@@ -85,7 +145,18 @@
* @param name counter name
* @return counter
*/
- AsyncAtomicCounter newAsyncCounter(String name);
+ default AsyncAtomicCounter newAsyncCounter(String name) {
+ return newAsyncCounter(name, null);
+ }
+
+ /**
+ * Creates a new {@code AsyncAtomicCounter}.
+ *
+ * @param name counter name
+ * @param executorSupplier a callback that returns an executor to be used asynchronous callbacks
+ * @return counter
+ */
+ AsyncAtomicCounter newAsyncCounter(String name, Supplier<Executor> executorSupplier);
/**
* Creates a new {@code AsyncAtomicValue}.
@@ -95,7 +166,21 @@
* @param <V> value type
* @return value
*/
- <V> AsyncAtomicValue<V> newAsyncAtomicValue(String name, Serializer serializer);
+ default <V> AsyncAtomicValue<V> newAsyncAtomicValue(String name, Serializer serializer) {
+ return newAsyncAtomicValue(name, serializer, null);
+ }
+
+ /**
+ * Creates a new {@code AsyncAtomicValue}.
+ *
+ * @param name value name
+ * @param serializer serializer to use for serializing/deserializing value type
+ * @param executorSupplier a callback that returns an executor to be used asynchronous callbacks
+ * @param <V> value type
+ * @return value
+ */
+ <V> AsyncAtomicValue<V> newAsyncAtomicValue(
+ String name, Serializer serializer, Supplier<Executor> executorSupplier);
/**
* Creates a new {@code AsyncDistributedSet}.
@@ -105,7 +190,21 @@
* @param <E> set entry type
* @return set
*/
- <E> AsyncDistributedSet<E> newAsyncDistributedSet(String name, Serializer serializer);
+ default <E> AsyncDistributedSet<E> newAsyncDistributedSet(String name, Serializer serializer) {
+ return newAsyncDistributedSet(name, serializer, null);
+ }
+
+ /**
+ * Creates a new {@code AsyncDistributedSet}.
+ *
+ * @param name set name
+ * @param serializer serializer to use for serializing/deserializing set entries
+ * @param executorSupplier a callback that returns an executor to be used asynchronous callbacks
+ * @param <E> set entry type
+ * @return set
+ */
+ <E> AsyncDistributedSet<E> newAsyncDistributedSet(
+ String name, Serializer serializer, Supplier<Executor> executorSupplier);
/**
* Creates a new {@code AsyncLeaderElector}.
@@ -113,7 +212,18 @@
* @param name leader elector name
* @return leader elector
*/
- AsyncLeaderElector newAsyncLeaderElector(String name);
+ default AsyncLeaderElector newAsyncLeaderElector(String name) {
+ return newAsyncLeaderElector(name, null);
+ }
+
+ /**
+ * Creates a new {@code AsyncLeaderElector}.
+ *
+ * @param name leader elector name
+ * @param executorSupplier a callback that returns an executor to be used asynchronous callbacks
+ * @return leader elector
+ */
+ AsyncLeaderElector newAsyncLeaderElector(String name, Supplier<Executor> executorSupplier);
/**
* Creates a new {@code WorkQueue}.
@@ -123,7 +233,20 @@
* @param serializer serializer
* @return work queue
*/
- <E> WorkQueue<E> newWorkQueue(String name, Serializer serializer);
+ default <E> WorkQueue<E> newWorkQueue(String name, Serializer serializer) {
+ return newWorkQueue(name, serializer, null);
+ }
+
+ /**
+ * Creates a new {@code WorkQueue}.
+ *
+ * @param <E> work element type
+ * @param name work queue name
+ * @param serializer serializer
+ * @param executorSupplier a callback that returns an executor to be used asynchronous callbacks
+ * @return work queue
+ */
+ <E> WorkQueue<E> newWorkQueue(String name, Serializer serializer, Supplier<Executor> executorSupplier);
/**
* Creates a new {@code AsyncDocumentTree}.
@@ -133,7 +256,21 @@
* @param serializer serializer
* @return document tree
*/
- <V> AsyncDocumentTree<V> newAsyncDocumentTree(String name, Serializer serializer);
+ default <V> AsyncDocumentTree<V> newAsyncDocumentTree(String name, Serializer serializer) {
+ return newAsyncDocumentTree(name, serializer, null);
+ }
+
+ /**
+ * Creates a new {@code AsyncDocumentTree}.
+ *
+ * @param <V> document tree node value type
+ * @param name tree name
+ * @param serializer serializer
+ * @param executorSupplier a callback that returns an executor to be used asynchronous callbacks
+ * @return document tree
+ */
+ <V> AsyncDocumentTree<V> newAsyncDocumentTree(
+ String name, Serializer serializer, Supplier<Executor> executorSupplier);
/**
* Returns the names of all created {@code AsyncConsistentMap} instances.
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAtomicCounterBuilder.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAtomicCounterBuilder.java
index 45cf193..a189eb2 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAtomicCounterBuilder.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAtomicCounterBuilder.java
@@ -32,6 +32,6 @@
@Override
public AsyncAtomicCounter build() {
- return primitiveCreator.newAsyncCounter(name());
+ return primitiveCreator.newAsyncCounter(name(), executorSupplier());
}
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAtomicCounterMapBuilder.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAtomicCounterMapBuilder.java
index 309220a..c92ef22 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAtomicCounterMapBuilder.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAtomicCounterMapBuilder.java
@@ -33,7 +33,7 @@
@Override
public AsyncAtomicCounterMap<K> buildAsyncMap() {
- return primitiveCreator.newAsyncAtomicCounterMap(name(), serializer());
+ return primitiveCreator.newAsyncAtomicCounterMap(name(), serializer(), executorSupplier());
}
@Override
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAtomicValueBuilder.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAtomicValueBuilder.java
index b17983c..a63fe4be 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAtomicValueBuilder.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultAtomicValueBuilder.java
@@ -15,6 +15,7 @@
*/
package org.onosproject.store.primitives.impl;
+import java.util.concurrent.Executor;
import java.util.function.Supplier;
import org.onosproject.store.service.AsyncAtomicValue;
@@ -37,6 +38,12 @@
}
@Override
+ public AtomicValueBuilder<V> withExecutorSupplier(Supplier<Executor> executorSupplier) {
+ mapBuilder.withExecutorSupplier(executorSupplier);
+ return this;
+ }
+
+ @Override
public AsyncAtomicValue<V> build() {
return new DefaultAsyncAtomicValue<>(checkNotNull(name()),
checkNotNull(serializer()),
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultConsistentMapBuilder.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultConsistentMapBuilder.java
index b5284de..820174d 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultConsistentMapBuilder.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultConsistentMapBuilder.java
@@ -41,7 +41,7 @@
@Override
public AsyncConsistentMap<K, V> buildAsyncMap() {
- AsyncConsistentMap<K, V> map = primitiveCreator.newAsyncConsistentMap(name(), serializer());
+ AsyncConsistentMap<K, V> map = primitiveCreator.newAsyncConsistentMap(name(), serializer(), executorSupplier());
map = relaxedReadConsistency() ? DistributedPrimitives.newCachingMap(map) : map;
map = readOnly() ? DistributedPrimitives.newUnmodifiableMap(map) : map;
return meteringEnabled() ? DistributedPrimitives.newMeteredMap(map) : map;
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultConsistentMultimapBuilder.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultConsistentMultimapBuilder.java
index 976862b..ba7d673 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultConsistentMultimapBuilder.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultConsistentMultimapBuilder.java
@@ -36,8 +36,7 @@
@Override
public AsyncConsistentMultimap<K, V> buildMultimap() {
- return primitiveCreator.newAsyncConsistentSetMultimap(
- name(), serializer());
+ return primitiveCreator.newAsyncConsistentSetMultimap(name(), serializer(), executorSupplier());
}
@Override
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultConsistentTreeMapBuilder.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultConsistentTreeMapBuilder.java
index 65a364e..5e2a8b4 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultConsistentTreeMapBuilder.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultConsistentTreeMapBuilder.java
@@ -35,8 +35,7 @@
@Override
public AsyncConsistentTreeMap<V> buildTreeMap() {
- return primitiveCreator.newAsyncConsistentTreeMap(name(),
- serializer());
+ return primitiveCreator.newAsyncConsistentTreeMap(name(), serializer(), executorSupplier());
}
@Override
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedSetBuilder.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedSetBuilder.java
index c17f91d..5e95180 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedSetBuilder.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedSetBuilder.java
@@ -15,6 +15,7 @@
*/
package org.onosproject.store.primitives.impl;
+import java.util.concurrent.Executor;
import java.util.function.Supplier;
import org.onosproject.core.ApplicationId;
@@ -53,6 +54,12 @@
}
@Override
+ public DistributedSetBuilder<E> withExecutorSupplier(Supplier<Executor> executorSupplier) {
+ mapBuilder.withExecutorSupplier(executorSupplier);
+ return this;
+ }
+
+ @Override
public DistributedSetBuilder<E> withPurgeOnUninstall() {
mapBuilder.withPurgeOnUninstall();
return this;
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDocumentTreeBuilder.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDocumentTreeBuilder.java
index 8d21e60..65c0504 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDocumentTreeBuilder.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDocumentTreeBuilder.java
@@ -47,6 +47,6 @@
@Deprecated
@Override
public AsyncDocumentTree<V> build() {
- return primitiveCreator.newAsyncDocumentTree(name(), serializer());
+ return primitiveCreator.newAsyncDocumentTree(name(), serializer(), executorSupplier());
}
}
\ No newline at end of file
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultLeaderElectorBuilder.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultLeaderElectorBuilder.java
index 69788f9..6f8f55d 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultLeaderElectorBuilder.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultLeaderElectorBuilder.java
@@ -32,6 +32,6 @@
@Override
public AsyncLeaderElector build() {
- return primitiveCreator.newAsyncLeaderElector(name());
+ return primitiveCreator.newAsyncLeaderElector(name(), executorSupplier());
}
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMap.java
index c0b323a..15279e0 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMap.java
@@ -16,8 +16,6 @@
package org.onosproject.store.primitives.impl;
-import static com.google.common.base.Preconditions.checkNotNull;
-
import java.util.Collection;
import java.util.Map.Entry;
import java.util.Objects;
@@ -28,7 +26,7 @@
import java.util.function.Consumer;
import java.util.function.Predicate;
-import org.onosproject.core.ApplicationId;
+import com.google.common.base.MoreObjects;
import org.onosproject.store.primitives.MapUpdate;
import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.service.AsyncConsistentMap;
@@ -37,8 +35,6 @@
import org.onosproject.store.service.Version;
import org.onosproject.store.service.Versioned;
-import com.google.common.base.MoreObjects;
-
/**
* {@code AsyncConsistentMap} that merely delegates control to
* another AsyncConsistentMap.
@@ -46,22 +42,14 @@
* @param <K> key type
* @param <V> value type
*/
-public class DelegatingAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V> {
+public class DelegatingAsyncConsistentMap<K, V>
+ extends DelegatingDistributedPrimitive implements AsyncConsistentMap<K, V> {
private final AsyncConsistentMap<K, V> delegateMap;
DelegatingAsyncConsistentMap(AsyncConsistentMap<K, V> delegateMap) {
- this.delegateMap = checkNotNull(delegateMap, "delegate map cannot be null");
- }
-
- @Override
- public String name() {
- return delegateMap.name();
- }
-
- @Override
- public ApplicationId applicationId() {
- return delegateMap.applicationId();
+ super(delegateMap);
+ this.delegateMap = delegateMap;
}
@Override
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMultimap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMultimap.java
index a425c32..5bd37c5 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMultimap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMultimap.java
@@ -16,19 +16,15 @@
package org.onosproject.store.primitives.impl;
-import com.google.common.base.MoreObjects;
-import com.google.common.base.Preconditions;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
import com.google.common.collect.Multiset;
import org.onosproject.store.service.AsyncConsistentMultimap;
import org.onosproject.store.service.Versioned;
-import java.util.Collection;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.function.Consumer;
-
/**
* {@code AsyncConsistentMultimap} that merely delegates control to
* another AsyncConsistentMultimap.
@@ -37,18 +33,14 @@
* @param <V> value type
*/
public class DelegatingAsyncConsistentMultimap<K, V>
- implements AsyncConsistentMultimap<K, V> {
+ extends DelegatingDistributedPrimitive implements AsyncConsistentMultimap<K, V> {
private final AsyncConsistentMultimap<K, V> delegateMap;
public DelegatingAsyncConsistentMultimap(
AsyncConsistentMultimap<K, V> delegateMap) {
- this.delegateMap = Preconditions.checkNotNull(delegateMap);
- }
-
- @Override
- public String name() {
- return delegateMap.name();
+ super(delegateMap);
+ this.delegateMap = delegateMap;
}
@Override
@@ -144,41 +136,4 @@
public CompletableFuture<Map<K, Collection<V>>> asMap() {
return delegateMap.asMap();
}
-
- @Override
- public void addStatusChangeListener(Consumer<Status> listener) {
- delegateMap.addStatusChangeListener(listener);
- }
-
- @Override
- public void removeStatusChangeListener(Consumer<Status> listener) {
- delegateMap.removeStatusChangeListener(listener);
- }
-
- @Override
- public Collection<Consumer<Status>> statusChangeListeners() {
- return delegateMap.statusChangeListeners();
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(getClass())
- .add("delegateMap", delegateMap)
- .toString();
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(delegateMap);
- }
-
- @Override
- public boolean equals(Object other) {
- if (other instanceof DelegatingAsyncConsistentMultimap) {
- DelegatingAsyncConsistentMultimap<K, V> that =
- (DelegatingAsyncConsistentMultimap) other;
- return this.delegateMap.equals(that.delegateMap);
- }
- return false;
- }
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentTreeMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentTreeMap.java
index f474f7b..972fd72 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentTreeMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentTreeMap.java
@@ -16,14 +16,6 @@
package org.onosproject.store.primitives.impl;
-import org.onosproject.store.primitives.MapUpdate;
-import org.onosproject.store.primitives.TransactionId;
-import org.onosproject.store.service.AsyncConsistentTreeMap;
-import org.onosproject.store.service.MapEventListener;
-import org.onosproject.store.service.TransactionLog;
-import org.onosproject.store.service.Version;
-import org.onosproject.store.service.Versioned;
-
import java.util.Collection;
import java.util.Map;
import java.util.NavigableMap;
@@ -35,6 +27,14 @@
import java.util.function.BiFunction;
import java.util.function.Predicate;
+import org.onosproject.store.primitives.MapUpdate;
+import org.onosproject.store.primitives.TransactionId;
+import org.onosproject.store.service.AsyncConsistentTreeMap;
+import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.TransactionLog;
+import org.onosproject.store.service.Version;
+import org.onosproject.store.service.Versioned;
+
import static com.google.common.base.Preconditions.checkNotNull;
/**
@@ -42,11 +42,12 @@
* of {@link AsyncConsistentTreeMap}.
*/
public class DelegatingAsyncConsistentTreeMap<V>
- implements AsyncConsistentTreeMap<V> {
+ extends DelegatingDistributedPrimitive implements AsyncConsistentTreeMap<V> {
private final AsyncConsistentTreeMap<V> delegateMap;
DelegatingAsyncConsistentTreeMap(AsyncConsistentTreeMap<V> delegateMap) {
+ super(delegateMap);
this.delegateMap = checkNotNull(delegateMap,
"delegate map cannot be null");
}
@@ -137,11 +138,6 @@
}
@Override
- public String name() {
- return delegateMap.name();
- }
-
- @Override
public CompletableFuture<Integer> size() {
return delegateMap.size();
}
@@ -293,5 +289,4 @@
public int hashCode() {
return Objects.hash(delegateMap);
}
-
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingDistributedPrimitive.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingDistributedPrimitive.java
new file mode 100644
index 0000000..10659d1
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingDistributedPrimitive.java
@@ -0,0 +1,91 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import java.util.Collection;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+
+import com.google.common.base.MoreObjects;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.store.service.DistributedPrimitive;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Base class for primitive delegates.
+ */
+public abstract class DelegatingDistributedPrimitive implements DistributedPrimitive {
+ private final DistributedPrimitive primitive;
+
+ public DelegatingDistributedPrimitive(DistributedPrimitive primitive) {
+ this.primitive = checkNotNull(primitive);
+ }
+
+ @Override
+ public String name() {
+ return primitive.name();
+ }
+
+ @Override
+ public Type primitiveType() {
+ return primitive.primitiveType();
+ }
+
+ @Override
+ public ApplicationId applicationId() {
+ return primitive.applicationId();
+ }
+
+ @Override
+ public CompletableFuture<Void> destroy() {
+ return primitive.destroy();
+ }
+
+ @Override
+ public void addStatusChangeListener(Consumer<Status> listener) {
+ primitive.addStatusChangeListener(listener);
+ }
+
+ @Override
+ public void removeStatusChangeListener(Consumer<Status> listener) {
+ primitive.removeStatusChangeListener(listener);
+ }
+
+ @Override
+ public Collection<Consumer<Status>> statusChangeListeners() {
+ return primitive.statusChangeListeners();
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(getClass())
+ .add("delegate", primitive)
+ .toString();
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(primitive);
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ return other instanceof DelegatingDistributedPrimitive
+ && primitive.equals(((DelegatingDistributedPrimitive) other).primitive);
+ }
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncAtomicCounter.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncAtomicCounter.java
new file mode 100644
index 0000000..5d72838
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncAtomicCounter.java
@@ -0,0 +1,72 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+import org.onlab.util.Tools;
+import org.onosproject.store.service.AsyncAtomicCounter;
+
+/**
+ * {@link AsyncAtomicCounter} that executes asynchronous callbacks on a user provided
+ * {@link Executor}.
+ */
+public class ExecutingAsyncAtomicCounter extends ExecutingDistributedPrimitive implements AsyncAtomicCounter {
+ private final AsyncAtomicCounter delegateCounter;
+ private final Executor executor;
+
+ public ExecutingAsyncAtomicCounter(AsyncAtomicCounter delegateCounter, Executor executor) {
+ super(delegateCounter, executor);
+ this.delegateCounter = delegateCounter;
+ this.executor = executor;
+ }
+
+ @Override
+ public CompletableFuture<Long> incrementAndGet() {
+ return Tools.asyncFuture(delegateCounter.incrementAndGet(), executor);
+ }
+
+ @Override
+ public CompletableFuture<Long> getAndIncrement() {
+ return Tools.asyncFuture(delegateCounter.getAndIncrement(), executor);
+ }
+
+ @Override
+ public CompletableFuture<Long> getAndAdd(long delta) {
+ return Tools.asyncFuture(delegateCounter.getAndAdd(delta), executor);
+ }
+
+ @Override
+ public CompletableFuture<Long> addAndGet(long delta) {
+ return Tools.asyncFuture(delegateCounter.addAndGet(delta), executor);
+ }
+
+ @Override
+ public CompletableFuture<Long> get() {
+ return Tools.asyncFuture(delegateCounter.get(), executor);
+ }
+
+ @Override
+ public CompletableFuture<Void> set(long value) {
+ return Tools.asyncFuture(delegateCounter.set(value), executor);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> compareAndSet(long expectedValue, long updateValue) {
+ return Tools.asyncFuture(delegateCounter.compareAndSet(expectedValue, updateValue), executor);
+ }
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncAtomicCounterMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncAtomicCounterMap.java
new file mode 100644
index 0000000..9a039f2
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncAtomicCounterMap.java
@@ -0,0 +1,113 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+import org.onlab.util.Tools;
+import org.onosproject.store.service.AsyncAtomicCounterMap;
+
+/**
+ * {@link org.onosproject.store.service.AsyncAtomicCounterMap} that executes asynchronous callbacks on a user provided
+ * {@link Executor}.
+ */
+public class ExecutingAsyncAtomicCounterMap<K>
+ extends ExecutingDistributedPrimitive implements AsyncAtomicCounterMap<K> {
+ private final AsyncAtomicCounterMap<K> delegateMap;
+ private final Executor executor;
+
+ public ExecutingAsyncAtomicCounterMap(AsyncAtomicCounterMap<K> delegateMap, Executor executor) {
+ super(delegateMap, executor);
+ this.delegateMap = delegateMap;
+ this.executor = executor;
+ }
+
+ @Override
+ public CompletableFuture<Long> incrementAndGet(K key) {
+ return Tools.asyncFuture(delegateMap.incrementAndGet(key), executor);
+ }
+
+ @Override
+ public CompletableFuture<Long> decrementAndGet(K key) {
+ return Tools.asyncFuture(delegateMap.decrementAndGet(key), executor);
+ }
+
+ @Override
+ public CompletableFuture<Long> getAndIncrement(K key) {
+ return Tools.asyncFuture(delegateMap.getAndIncrement(key), executor);
+ }
+
+ @Override
+ public CompletableFuture<Long> getAndDecrement(K key) {
+ return Tools.asyncFuture(delegateMap.getAndDecrement(key), executor);
+ }
+
+ @Override
+ public CompletableFuture<Long> addAndGet(K key, long delta) {
+ return Tools.asyncFuture(delegateMap.addAndGet(key, delta), executor);
+ }
+
+ @Override
+ public CompletableFuture<Long> getAndAdd(K key, long delta) {
+ return Tools.asyncFuture(delegateMap.getAndAdd(key, delta), executor);
+ }
+
+ @Override
+ public CompletableFuture<Long> get(K key) {
+ return Tools.asyncFuture(delegateMap.get(key), executor);
+ }
+
+ @Override
+ public CompletableFuture<Long> put(K key, long newValue) {
+ return Tools.asyncFuture(delegateMap.put(key, newValue), executor);
+ }
+
+ @Override
+ public CompletableFuture<Long> putIfAbsent(K key, long newValue) {
+ return Tools.asyncFuture(delegateMap.putIfAbsent(key, newValue), executor);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> replace(K key, long expectedOldValue, long newValue) {
+ return Tools.asyncFuture(delegateMap.replace(key, expectedOldValue, newValue), executor);
+ }
+
+ @Override
+ public CompletableFuture<Long> remove(K key) {
+ return Tools.asyncFuture(delegateMap.remove(key), executor);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> remove(K key, long value) {
+ return Tools.asyncFuture(delegateMap.remove(key, value), executor);
+ }
+
+ @Override
+ public CompletableFuture<Integer> size() {
+ return Tools.asyncFuture(delegateMap.size(), executor);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> isEmpty() {
+ return Tools.asyncFuture(delegateMap.isEmpty(), executor);
+ }
+
+ @Override
+ public CompletableFuture<Void> clear() {
+ return Tools.asyncFuture(delegateMap.clear(), executor);
+ }
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncAtomicValue.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncAtomicValue.java
new file mode 100644
index 0000000..40eacc6
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncAtomicValue.java
@@ -0,0 +1,77 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+import com.google.common.collect.Maps;
+import org.onlab.util.Tools;
+import org.onosproject.store.service.AsyncAtomicValue;
+import org.onosproject.store.service.AtomicValueEventListener;
+
+/**
+ * {@link AsyncAtomicValue} that executes asynchronous callbacks on a user provided
+ * {@link Executor}.
+ */
+public class ExecutingAsyncAtomicValue<V> extends ExecutingDistributedPrimitive implements AsyncAtomicValue<V> {
+ private final AsyncAtomicValue<V> delegateValue;
+ private final Executor executor;
+ private final Map<AtomicValueEventListener<V>, AtomicValueEventListener<V>> listenerMap = Maps.newConcurrentMap();
+
+ public ExecutingAsyncAtomicValue(AsyncAtomicValue<V> delegateValue, Executor executor) {
+ super(delegateValue, executor);
+ this.delegateValue = delegateValue;
+ this.executor = executor;
+ }
+
+ @Override
+ public CompletableFuture<Boolean> compareAndSet(V expect, V update) {
+ return Tools.asyncFuture(delegateValue.compareAndSet(expect, update), executor);
+ }
+
+ @Override
+ public CompletableFuture<V> get() {
+ return Tools.asyncFuture(delegateValue.get(), executor);
+ }
+
+ @Override
+ public CompletableFuture<V> getAndSet(V value) {
+ return Tools.asyncFuture(delegateValue.getAndSet(value), executor);
+ }
+
+ @Override
+ public CompletableFuture<Void> set(V value) {
+ return Tools.asyncFuture(delegateValue.set(value), executor);
+ }
+
+ @Override
+ public CompletableFuture<Void> addListener(AtomicValueEventListener<V> listener) {
+ AtomicValueEventListener<V> wrappedListener = e -> executor.execute(() -> listener.event(e));
+ listenerMap.put(listener, wrappedListener);
+ return Tools.asyncFuture(delegateValue.addListener(wrappedListener), executor);
+ }
+
+ @Override
+ public CompletableFuture<Void> removeListener(AtomicValueEventListener<V> listener) {
+ AtomicValueEventListener<V> wrappedListener = listenerMap.remove(listener);
+ if (wrappedListener != null) {
+ return Tools.asyncFuture(delegateValue.removeListener(wrappedListener), executor);
+ }
+ return CompletableFuture.completedFuture(null);
+ }
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncConsistentMap.java
new file mode 100644
index 0000000..07911af
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncConsistentMap.java
@@ -0,0 +1,185 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.function.BiFunction;
+import java.util.function.Predicate;
+
+import org.onlab.util.Tools;
+import org.onosproject.store.primitives.MapUpdate;
+import org.onosproject.store.primitives.TransactionId;
+import org.onosproject.store.service.AsyncConsistentMap;
+import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.TransactionLog;
+import org.onosproject.store.service.Version;
+import org.onosproject.store.service.Versioned;
+
+/**
+ * An {@link org.onosproject.store.service.AsyncConsistentMap} that completes asynchronous calls on a provided
+ * {@link Executor}.
+ */
+public class ExecutingAsyncConsistentMap<K, V>
+ extends ExecutingDistributedPrimitive implements AsyncConsistentMap<K, V> {
+ private final AsyncConsistentMap<K, V> delegateMap;
+ private final Executor executor;
+
+ public ExecutingAsyncConsistentMap(AsyncConsistentMap<K, V> delegateMap, Executor executor) {
+ super(delegateMap, executor);
+ this.delegateMap = delegateMap;
+ this.executor = executor;
+ }
+
+ @Override
+ public CompletableFuture<Integer> size() {
+ return Tools.asyncFuture(delegateMap.size(), executor);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> containsKey(K key) {
+ return Tools.asyncFuture(delegateMap.containsKey(key), executor);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> containsValue(V value) {
+ return Tools.asyncFuture(delegateMap.containsValue(value), executor);
+ }
+
+ @Override
+ public CompletableFuture<Versioned<V>> get(K key) {
+ return Tools.asyncFuture(delegateMap.get(key), executor);
+ }
+
+ @Override
+ public CompletableFuture<Versioned<V>> getOrDefault(K key, V defaultValue) {
+ return Tools.asyncFuture(delegateMap.getOrDefault(key, defaultValue), executor);
+ }
+
+ @Override
+ public CompletableFuture<Versioned<V>> computeIf(
+ K key, Predicate<? super V> condition, BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
+ return Tools.asyncFuture(delegateMap.computeIf(key, condition, remappingFunction), executor);
+ }
+
+ @Override
+ public CompletableFuture<Versioned<V>> put(K key, V value) {
+ return Tools.asyncFuture(delegateMap.put(key, value), executor);
+ }
+
+ @Override
+ public CompletableFuture<Versioned<V>> putAndGet(K key, V value) {
+ return Tools.asyncFuture(delegateMap.putAndGet(key, value), executor);
+ }
+
+ @Override
+ public CompletableFuture<Versioned<V>> remove(K key) {
+ return Tools.asyncFuture(delegateMap.remove(key), executor);
+ }
+
+ @Override
+ public CompletableFuture<Void> clear() {
+ return Tools.asyncFuture(delegateMap.clear(), executor);
+ }
+
+ @Override
+ public CompletableFuture<Set<K>> keySet() {
+ return Tools.asyncFuture(delegateMap.keySet(), executor);
+ }
+
+ @Override
+ public CompletableFuture<Collection<Versioned<V>>> values() {
+ return Tools.asyncFuture(delegateMap.values(), executor);
+ }
+
+ @Override
+ public CompletableFuture<Set<Map.Entry<K, Versioned<V>>>> entrySet() {
+ return Tools.asyncFuture(delegateMap.entrySet(), executor);
+ }
+
+ @Override
+ public CompletableFuture<Versioned<V>> putIfAbsent(K key, V value) {
+ return Tools.asyncFuture(delegateMap.putIfAbsent(key, value), executor);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> remove(K key, V value) {
+ return Tools.asyncFuture(delegateMap.remove(key, value), executor);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> remove(K key, long version) {
+ return Tools.asyncFuture(delegateMap.remove(key, version), executor);
+ }
+
+ @Override
+ public CompletableFuture<Versioned<V>> replace(K key, V value) {
+ return Tools.asyncFuture(delegateMap.replace(key, value), executor);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> replace(K key, V oldValue, V newValue) {
+ return Tools.asyncFuture(delegateMap.replace(key, oldValue, newValue), executor);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> replace(K key, long oldVersion, V newValue) {
+ return Tools.asyncFuture(delegateMap.replace(key, oldVersion, newValue), executor);
+ }
+
+ @Override
+ public CompletableFuture<Version> begin(TransactionId transactionId) {
+ return Tools.asyncFuture(delegateMap.begin(transactionId), executor);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> prepare(TransactionLog<MapUpdate<K, V>> transactionLog) {
+ return Tools.asyncFuture(delegateMap.prepare(transactionLog), executor);
+ }
+
+ @Override
+ public CompletableFuture<Void> commit(TransactionId transactionId) {
+ return Tools.asyncFuture(delegateMap.commit(transactionId), executor);
+ }
+
+ @Override
+ public CompletableFuture<Void> rollback(TransactionId transactionId) {
+ return Tools.asyncFuture(delegateMap.rollback(transactionId), executor);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> prepareAndCommit(TransactionLog<MapUpdate<K, V>> transactionLog) {
+ return Tools.asyncFuture(delegateMap.prepareAndCommit(transactionLog), executor);
+ }
+
+ @Override
+ public CompletableFuture<Void> addListener(MapEventListener<K, V> listener) {
+ return addListener(listener, executor);
+ }
+
+ @Override
+ public CompletableFuture<Void> addListener(MapEventListener<K, V> listener, Executor executor) {
+ return Tools.asyncFuture(delegateMap.addListener(listener, executor), this.executor);
+ }
+
+ @Override
+ public CompletableFuture<Void> removeListener(MapEventListener<K, V> listener) {
+ return Tools.asyncFuture(delegateMap.removeListener(listener), executor);
+ }
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncConsistentMultimap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncConsistentMultimap.java
new file mode 100644
index 0000000..a2b308a
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncConsistentMultimap.java
@@ -0,0 +1,133 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+import com.google.common.collect.Multiset;
+import org.onlab.util.Tools;
+import org.onosproject.store.service.AsyncConsistentMultimap;
+import org.onosproject.store.service.Versioned;
+
+/**
+ * {@link org.onosproject.store.service.AsyncConsistentMultimap} that executes asynchronous callbacks on a provided
+ * {@link Executor}.
+ */
+public class ExecutingAsyncConsistentMultimap<K, V>
+ extends ExecutingDistributedPrimitive implements AsyncConsistentMultimap<K, V> {
+ private final AsyncConsistentMultimap<K, V> delegateMap;
+ private final Executor executor;
+
+ public ExecutingAsyncConsistentMultimap(AsyncConsistentMultimap<K, V> delegateMap, Executor executor) {
+ super(delegateMap, executor);
+ this.delegateMap = delegateMap;
+ this.executor = executor;
+ }
+
+ @Override
+ public CompletableFuture<Integer> size() {
+ return Tools.asyncFuture(delegateMap.size(), executor);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> isEmpty() {
+ return Tools.asyncFuture(delegateMap.isEmpty(), executor);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> containsKey(K key) {
+ return Tools.asyncFuture(delegateMap.containsKey(key), executor);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> containsValue(V value) {
+ return Tools.asyncFuture(delegateMap.containsValue(value), executor);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> containsEntry(K key, V value) {
+ return Tools.asyncFuture(delegateMap.containsEntry(key, value), executor);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> put(K key, V value) {
+ return Tools.asyncFuture(delegateMap.put(key, value), executor);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> remove(K key, V value) {
+ return Tools.asyncFuture(delegateMap.remove(key, value), executor);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> removeAll(K key, Collection<? extends V> values) {
+ return Tools.asyncFuture(delegateMap.removeAll(key, values), executor);
+ }
+
+ @Override
+ public CompletableFuture<Versioned<Collection<? extends V>>> removeAll(K key) {
+ return Tools.asyncFuture(delegateMap.removeAll(key), executor);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> putAll(K key, Collection<? extends V> values) {
+ return Tools.asyncFuture(delegateMap.putAll(key, values), executor);
+ }
+
+ @Override
+ public CompletableFuture<Versioned<Collection<? extends V>>> replaceValues(K key, Collection<V> values) {
+ return Tools.asyncFuture(delegateMap.replaceValues(key, values), executor);
+ }
+
+ @Override
+ public CompletableFuture<Void> clear() {
+ return Tools.asyncFuture(delegateMap.clear(), executor);
+ }
+
+ @Override
+ public CompletableFuture<Versioned<Collection<? extends V>>> get(K key) {
+ return Tools.asyncFuture(delegateMap.get(key), executor);
+ }
+
+ @Override
+ public CompletableFuture<Set<K>> keySet() {
+ return Tools.asyncFuture(delegateMap.keySet(), executor);
+ }
+
+ @Override
+ public CompletableFuture<Multiset<K>> keys() {
+ return Tools.asyncFuture(delegateMap.keys(), executor);
+ }
+
+ @Override
+ public CompletableFuture<Multiset<V>> values() {
+ return Tools.asyncFuture(delegateMap.values(), executor);
+ }
+
+ @Override
+ public CompletableFuture<Collection<Map.Entry<K, V>>> entries() {
+ return Tools.asyncFuture(delegateMap.entries(), executor);
+ }
+
+ @Override
+ public CompletableFuture<Map<K, Collection<V>>> asMap() {
+ return Tools.asyncFuture(delegateMap.asMap(), executor);
+ }
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncConsistentTreeMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncConsistentTreeMap.java
new file mode 100644
index 0000000..1441941
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncConsistentTreeMap.java
@@ -0,0 +1,269 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.NavigableSet;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.function.BiFunction;
+import java.util.function.Predicate;
+
+import org.onlab.util.Tools;
+import org.onosproject.store.primitives.MapUpdate;
+import org.onosproject.store.primitives.TransactionId;
+import org.onosproject.store.service.AsyncConsistentTreeMap;
+import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.TransactionLog;
+import org.onosproject.store.service.Version;
+import org.onosproject.store.service.Versioned;
+
+/**
+ * {@link org.onosproject.store.service.AsyncConsistentTreeMap} that executes asynchronous callbacks on a provided
+ * {@link Executor}.
+ */
+public class ExecutingAsyncConsistentTreeMap<V>
+ extends ExecutingDistributedPrimitive implements AsyncConsistentTreeMap<V> {
+ private final AsyncConsistentTreeMap<V> delegateMap;
+ private final Executor executor;
+
+ public ExecutingAsyncConsistentTreeMap(AsyncConsistentTreeMap<V> delegateMap, Executor executor) {
+ super(delegateMap, executor);
+ this.delegateMap = delegateMap;
+ this.executor = executor;
+ }
+
+ @Override
+ public CompletableFuture<String> firstKey() {
+ return Tools.asyncFuture(delegateMap.firstKey(), executor);
+ }
+
+ @Override
+ public CompletableFuture<String> lastKey() {
+ return Tools.asyncFuture(delegateMap.lastKey(), executor);
+ }
+
+ @Override
+ public CompletableFuture<Map.Entry<String, Versioned<V>>> ceilingEntry(String key) {
+ return Tools.asyncFuture(delegateMap.ceilingEntry(key), executor);
+ }
+
+ @Override
+ public CompletableFuture<Map.Entry<String, Versioned<V>>> floorEntry(String key) {
+ return Tools.asyncFuture(delegateMap.floorEntry(key), executor);
+ }
+
+ @Override
+ public CompletableFuture<Map.Entry<String, Versioned<V>>> higherEntry(String key) {
+ return Tools.asyncFuture(delegateMap.higherEntry(key), executor);
+ }
+
+ @Override
+ public CompletableFuture<Map.Entry<String, Versioned<V>>> lowerEntry(String key) {
+ return Tools.asyncFuture(delegateMap.lowerEntry(key), executor);
+ }
+
+ @Override
+ public CompletableFuture<Map.Entry<String, Versioned<V>>> firstEntry() {
+ return Tools.asyncFuture(delegateMap.firstEntry(), executor);
+ }
+
+ @Override
+ public CompletableFuture<Integer> size() {
+ return Tools.asyncFuture(delegateMap.size(), executor);
+ }
+
+ @Override
+ public CompletableFuture<Map.Entry<String, Versioned<V>>> lastEntry() {
+ return Tools.asyncFuture(delegateMap.lastEntry(), executor);
+ }
+
+ @Override
+ public CompletableFuture<Map.Entry<String, Versioned<V>>> pollFirstEntry() {
+ return Tools.asyncFuture(delegateMap.pollFirstEntry(), executor);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> containsKey(String key) {
+ return Tools.asyncFuture(delegateMap.containsKey(key), executor);
+ }
+
+ @Override
+ public CompletableFuture<Map.Entry<String, Versioned<V>>> pollLastEntry() {
+ return Tools.asyncFuture(delegateMap.pollLastEntry(), executor);
+ }
+
+ @Override
+ public CompletableFuture<String> lowerKey(String key) {
+ return Tools.asyncFuture(delegateMap.lowerKey(key), executor);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> containsValue(V value) {
+ return Tools.asyncFuture(delegateMap.containsValue(value), executor);
+ }
+
+ @Override
+ public CompletableFuture<String> floorKey(String key) {
+ return Tools.asyncFuture(delegateMap.floorKey(key), executor);
+ }
+
+ @Override
+ public CompletableFuture<String> ceilingKey(String key) {
+ return Tools.asyncFuture(delegateMap.ceilingKey(key), executor);
+ }
+
+ @Override
+ public CompletableFuture<Versioned<V>> get(String key) {
+ return Tools.asyncFuture(delegateMap.get(key), executor);
+ }
+
+ @Override
+ public CompletableFuture<Versioned<V>> getOrDefault(String key, V defaultValue) {
+ return Tools.asyncFuture(delegateMap.getOrDefault(key, defaultValue), executor);
+ }
+
+ @Override
+ public CompletableFuture<String> higherKey(String key) {
+ return Tools.asyncFuture(delegateMap.higherKey(key), executor);
+ }
+
+ @Override
+ public CompletableFuture<NavigableSet<String>> navigableKeySet() {
+ return Tools.asyncFuture(delegateMap.navigableKeySet(), executor);
+ }
+
+ @Override
+ public CompletableFuture<NavigableMap<String, V>> subMap(
+ String upperKey, String lowerKey, boolean inclusiveUpper, boolean inclusiveLower) {
+ return Tools.asyncFuture(delegateMap.subMap(upperKey, lowerKey, inclusiveUpper, inclusiveLower), executor);
+ }
+
+ @Override
+ public CompletableFuture<Versioned<V>> computeIf(
+ String key, Predicate<? super V> condition,
+ BiFunction<? super String, ? super V, ? extends V> remappingFunction) {
+ return Tools.asyncFuture(delegateMap.computeIf(key, condition, remappingFunction), executor);
+ }
+
+ @Override
+ public CompletableFuture<Versioned<V>> put(String key, V value) {
+ return Tools.asyncFuture(delegateMap.put(key, value), executor);
+ }
+
+ @Override
+ public CompletableFuture<Versioned<V>> putAndGet(String key, V value) {
+ return Tools.asyncFuture(delegateMap.putAndGet(key, value), executor);
+ }
+
+ @Override
+ public CompletableFuture<Versioned<V>> remove(String key) {
+ return Tools.asyncFuture(delegateMap.remove(key), executor);
+ }
+
+ @Override
+ public CompletableFuture<Void> clear() {
+ return Tools.asyncFuture(delegateMap.clear(), executor);
+ }
+
+ @Override
+ public CompletableFuture<Set<String>> keySet() {
+ return Tools.asyncFuture(delegateMap.keySet(), executor);
+ }
+
+ @Override
+ public CompletableFuture<Collection<Versioned<V>>> values() {
+ return Tools.asyncFuture(delegateMap.values(), executor);
+ }
+
+ @Override
+ public CompletableFuture<Set<Map.Entry<String, Versioned<V>>>> entrySet() {
+ return Tools.asyncFuture(delegateMap.entrySet(), executor);
+ }
+
+ @Override
+ public CompletableFuture<Versioned<V>> putIfAbsent(String key, V value) {
+ return Tools.asyncFuture(delegateMap.putIfAbsent(key, value), executor);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> remove(String key, V value) {
+ return Tools.asyncFuture(delegateMap.remove(key, value), executor);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> remove(String key, long version) {
+ return Tools.asyncFuture(delegateMap.remove(key, version), executor);
+ }
+
+ @Override
+ public CompletableFuture<Versioned<V>> replace(String key, V value) {
+ return Tools.asyncFuture(delegateMap.replace(key, value), executor);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> replace(String key, V oldValue, V newValue) {
+ return Tools.asyncFuture(delegateMap.replace(key, oldValue, newValue), executor);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> replace(String key, long oldVersion, V newValue) {
+ return Tools.asyncFuture(delegateMap.replace(key, oldVersion, newValue), executor);
+ }
+
+ @Override
+ public CompletableFuture<Version> begin(TransactionId transactionId) {
+ return Tools.asyncFuture(delegateMap.begin(transactionId), executor);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> prepare(TransactionLog<MapUpdate<String, V>> transactionLog) {
+ return Tools.asyncFuture(delegateMap.prepare(transactionLog), executor);
+ }
+
+ @Override
+ public CompletableFuture<Void> commit(TransactionId transactionId) {
+ return Tools.asyncFuture(delegateMap.commit(transactionId), executor);
+ }
+
+ @Override
+ public CompletableFuture<Void> rollback(TransactionId transactionId) {
+ return Tools.asyncFuture(delegateMap.rollback(transactionId), executor);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> prepareAndCommit(TransactionLog<MapUpdate<String, V>> transactionLog) {
+ return Tools.asyncFuture(delegateMap.prepareAndCommit(transactionLog), executor);
+ }
+
+ @Override
+ public CompletableFuture<Void> addListener(MapEventListener<String, V> listener) {
+ return addListener(listener, executor);
+ }
+
+ @Override
+ public CompletableFuture<Void> addListener(MapEventListener<String, V> listener, Executor executor) {
+ return Tools.asyncFuture(delegateMap.addListener(listener, executor), this.executor);
+ }
+
+ @Override
+ public CompletableFuture<Void> removeListener(MapEventListener<String, V> listener) {
+ return Tools.asyncFuture(delegateMap.removeListener(listener), executor);
+ }
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncDocumentTree.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncDocumentTree.java
new file mode 100644
index 0000000..352ee7a
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncDocumentTree.java
@@ -0,0 +1,104 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+import com.google.common.collect.Maps;
+import org.onlab.util.Tools;
+import org.onosproject.store.service.AsyncDocumentTree;
+import org.onosproject.store.service.DocumentPath;
+import org.onosproject.store.service.DocumentTreeListener;
+import org.onosproject.store.service.Versioned;
+
+/**
+ * {@link AsyncDocumentTree} that executes asynchronous callbacks on a user provided
+ * {@link Executor}.
+ */
+public class ExecutingAsyncDocumentTree<V> extends ExecutingDistributedPrimitive implements AsyncDocumentTree<V> {
+ private final AsyncDocumentTree<V> delegateTree;
+ private final Executor executor;
+ private final Map<DocumentTreeListener<V>, DocumentTreeListener<V>> listenerMap = Maps.newConcurrentMap();
+
+ public ExecutingAsyncDocumentTree(AsyncDocumentTree<V> delegateTree, Executor executor) {
+ super(delegateTree, executor);
+ this.delegateTree = delegateTree;
+ this.executor = executor;
+ }
+
+ @Override
+ public DocumentPath root() {
+ return delegateTree.root();
+ }
+
+ @Override
+ public CompletableFuture<Map<String, Versioned<V>>> getChildren(DocumentPath path) {
+ return Tools.asyncFuture(delegateTree.getChildren(path), executor);
+ }
+
+ @Override
+ public CompletableFuture<Versioned<V>> get(DocumentPath path) {
+ return Tools.asyncFuture(delegateTree.get(path), executor);
+ }
+
+ @Override
+ public CompletableFuture<Versioned<V>> set(DocumentPath path, V value) {
+ return Tools.asyncFuture(delegateTree.set(path, value), executor);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> create(DocumentPath path, V value) {
+ return Tools.asyncFuture(delegateTree.create(path, value), executor);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> createRecursive(DocumentPath path, V value) {
+ return Tools.asyncFuture(delegateTree.createRecursive(path, value), executor);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> replace(DocumentPath path, V newValue, long version) {
+ return Tools.asyncFuture(delegateTree.replace(path, newValue, version), executor);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> replace(DocumentPath path, V newValue, V currentValue) {
+ return Tools.asyncFuture(delegateTree.replace(path, newValue, currentValue), executor);
+ }
+
+ @Override
+ public CompletableFuture<Versioned<V>> removeNode(DocumentPath path) {
+ return Tools.asyncFuture(delegateTree.removeNode(path), executor);
+ }
+
+ @Override
+ public CompletableFuture<Void> addListener(DocumentPath path, DocumentTreeListener<V> listener) {
+ DocumentTreeListener<V> wrappedListener = e -> executor.execute(() -> listener.event(e));
+ listenerMap.put(listener, wrappedListener);
+ return Tools.asyncFuture(delegateTree.addListener(path, wrappedListener), executor);
+ }
+
+ @Override
+ public CompletableFuture<Void> removeListener(DocumentTreeListener<V> listener) {
+ DocumentTreeListener<V> wrappedListener = listenerMap.remove(listener);
+ if (wrappedListener != null) {
+ return Tools.asyncFuture(delegateTree.removeListener(wrappedListener), executor);
+ }
+ return CompletableFuture.completedFuture(null);
+ }
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncLeaderElector.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncLeaderElector.java
new file mode 100644
index 0000000..ecc5b8d
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncLeaderElector.java
@@ -0,0 +1,95 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.function.Consumer;
+
+import com.google.common.collect.Maps;
+import org.onlab.util.Tools;
+import org.onosproject.cluster.Leadership;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.event.Change;
+import org.onosproject.store.service.AsyncLeaderElector;
+
+/**
+ * {@link AsyncLeaderElector} that executes asynchronous callbacks on a user provided
+ * {@link Executor}.
+ */
+public class ExecutingAsyncLeaderElector extends ExecutingDistributedPrimitive implements AsyncLeaderElector {
+ private final AsyncLeaderElector delegateElector;
+ private final Executor executor;
+ private final Map<Consumer<Change<Leadership>>, Consumer<Change<Leadership>>> listenerMap = Maps.newConcurrentMap();
+
+ public ExecutingAsyncLeaderElector(AsyncLeaderElector delegateElector, Executor executor) {
+ super(delegateElector, executor);
+ this.delegateElector = delegateElector;
+ this.executor = executor;
+ }
+
+ @Override
+ public CompletableFuture<Leadership> run(String topic, NodeId nodeId) {
+ return Tools.asyncFuture(delegateElector.run(topic, nodeId), executor);
+ }
+
+ @Override
+ public CompletableFuture<Void> withdraw(String topic) {
+ return Tools.asyncFuture(delegateElector.withdraw(topic), executor);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> anoint(String topic, NodeId nodeId) {
+ return Tools.asyncFuture(delegateElector.anoint(topic, nodeId), executor);
+ }
+
+ @Override
+ public CompletableFuture<Void> evict(NodeId nodeId) {
+ return Tools.asyncFuture(delegateElector.evict(nodeId), executor);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> promote(String topic, NodeId nodeId) {
+ return Tools.asyncFuture(delegateElector.promote(topic, nodeId), executor);
+ }
+
+ @Override
+ public CompletableFuture<Leadership> getLeadership(String topic) {
+ return Tools.asyncFuture(delegateElector.getLeadership(topic), executor);
+ }
+
+ @Override
+ public CompletableFuture<Map<String, Leadership>> getLeaderships() {
+ return Tools.asyncFuture(delegateElector.getLeaderships(), executor);
+ }
+
+ @Override
+ public CompletableFuture<Void> addChangeListener(Consumer<Change<Leadership>> listener) {
+ Consumer<Change<Leadership>> wrappedListener = e -> executor.execute(() -> listener.accept(e));
+ listenerMap.put(listener, wrappedListener);
+ return Tools.asyncFuture(delegateElector.addChangeListener(wrappedListener), executor);
+ }
+
+ @Override
+ public CompletableFuture<Void> removeChangeListener(Consumer<Change<Leadership>> listener) {
+ Consumer<Change<Leadership>> wrappedListener = listenerMap.remove(listener);
+ if (wrappedListener != null) {
+ return Tools.asyncFuture(delegateElector.removeChangeListener(wrappedListener), executor);
+ }
+ return CompletableFuture.completedFuture(null);
+ }
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingDistributedPrimitive.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingDistributedPrimitive.java
new file mode 100644
index 0000000..021dbe5
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingDistributedPrimitive.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.function.Consumer;
+
+import com.google.common.collect.Maps;
+import org.onlab.util.Tools;
+import org.onosproject.store.service.DistributedPrimitive;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Base class for primitives that delegate asynchronous callbacks to a user provided {@link Executor}.
+ */
+public abstract class ExecutingDistributedPrimitive
+ extends DelegatingDistributedPrimitive {
+ private final DistributedPrimitive primitive;
+ private final Executor executor;
+ private final Map<Consumer<Status>, Consumer<Status>> listenerMap = Maps.newConcurrentMap();
+
+ protected ExecutingDistributedPrimitive(DistributedPrimitive primitive, Executor executor) {
+ super(primitive);
+ this.primitive = primitive;
+ this.executor = checkNotNull(executor);
+ }
+
+ @Override
+ public CompletableFuture<Void> destroy() {
+ return Tools.asyncFuture(primitive.destroy(), executor);
+ }
+
+ @Override
+ public void addStatusChangeListener(Consumer<DistributedPrimitive.Status> listener) {
+ Consumer<DistributedPrimitive.Status> wrappedListener =
+ status -> executor.execute(() -> listener.accept(status));
+ listenerMap.put(listener, wrappedListener);
+ primitive.addStatusChangeListener(wrappedListener);
+ }
+
+ @Override
+ public void removeStatusChangeListener(Consumer<DistributedPrimitive.Status> listener) {
+ Consumer<DistributedPrimitive.Status> wrappedListener = listenerMap.remove(listener);
+ if (wrappedListener != null) {
+ primitive.removeStatusChangeListener(wrappedListener);
+ }
+ }
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingWorkQueue.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingWorkQueue.java
new file mode 100644
index 0000000..301f73e
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingWorkQueue.java
@@ -0,0 +1,75 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.function.Consumer;
+
+import org.onlab.util.Tools;
+import org.onosproject.store.service.AsyncAtomicValue;
+import org.onosproject.store.service.Task;
+import org.onosproject.store.service.WorkQueue;
+import org.onosproject.store.service.WorkQueueStats;
+
+/**
+ * {@link AsyncAtomicValue} that executes asynchronous callbacks on a user provided
+ * {@link Executor}.
+ */
+public class ExecutingWorkQueue<E> extends ExecutingDistributedPrimitive implements WorkQueue<E> {
+ private final WorkQueue<E> delegateQueue;
+ private final Executor executor;
+
+ public ExecutingWorkQueue(WorkQueue<E> delegateQueue, Executor executor) {
+ super(delegateQueue, executor);
+ this.delegateQueue = delegateQueue;
+ this.executor = executor;
+ }
+
+ @Override
+ public CompletableFuture<Void> addMultiple(Collection<E> items) {
+ return Tools.asyncFuture(delegateQueue.addMultiple(items), executor);
+ }
+
+ @Override
+ public CompletableFuture<Collection<Task<E>>> take(int maxItems) {
+ return Tools.asyncFuture(delegateQueue.take(maxItems), executor);
+ }
+
+ @Override
+ public CompletableFuture<Void> complete(Collection<String> taskIds) {
+ return Tools.asyncFuture(delegateQueue.complete(taskIds), executor);
+ }
+
+ @Override
+ public CompletableFuture<Void> registerTaskProcessor(
+ Consumer<E> taskProcessor, int parallelism, Executor executor) {
+ return Tools.asyncFuture(
+ delegateQueue.registerTaskProcessor(taskProcessor, parallelism, executor),
+ this.executor);
+ }
+
+ @Override
+ public CompletableFuture<Void> stopProcessing() {
+ return Tools.asyncFuture(delegateQueue.stopProcessing(), executor);
+ }
+
+ @Override
+ public CompletableFuture<WorkQueueStats> stats() {
+ return Tools.asyncFuture(delegateQueue.stats(), executor);
+ }
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java
index 6dd2322..0358a7c 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java
@@ -39,6 +39,8 @@
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
+import java.util.concurrent.Executor;
+import java.util.function.Supplier;
import static com.google.common.base.Preconditions.checkNotNull;
@@ -57,12 +59,13 @@
}
@Override
- public <K, V> AsyncConsistentMap<K, V> newAsyncConsistentMap(String name, Serializer serializer) {
+ public <K, V> AsyncConsistentMap<K, V> newAsyncConsistentMap(
+ String name, Serializer serializer, Supplier<Executor> executorSupplier) {
checkNotNull(name);
checkNotNull(serializer);
Map<PartitionId, AsyncConsistentMap<K, V>> maps =
Maps.transformValues(members,
- partition -> partition.newAsyncConsistentMap(name, serializer));
+ partition -> partition.newAsyncConsistentMap(name, serializer, executorSupplier));
Hasher<K> hasher = key -> {
int hashCode = Hashing.sha256().hashBytes(serializer.encode(key)).asInt();
return sortedMemberPartitionIds.get(Math.abs(hashCode) % members.size());
@@ -71,44 +74,46 @@
}
@Override
- public <V> AsyncConsistentTreeMap<V> newAsyncConsistentTreeMap(String name,
- Serializer serializer) {
- return getCreator(name).newAsyncConsistentTreeMap(name, serializer);
+ public <V> AsyncConsistentTreeMap<V> newAsyncConsistentTreeMap(
+ String name, Serializer serializer, Supplier<Executor> executorSupplier) {
+ return getCreator(name).newAsyncConsistentTreeMap(name, serializer, executorSupplier);
}
@Override
public <K, V> AsyncConsistentMultimap<K, V> newAsyncConsistentSetMultimap(
- String name, Serializer serializer) {
- return getCreator(name).newAsyncConsistentSetMultimap(name,
- serializer);
+ String name, Serializer serializer, Supplier<Executor> executorSupplier) {
+ return getCreator(name).newAsyncConsistentSetMultimap(name, serializer, executorSupplier);
}
@Override
- public <E> AsyncDistributedSet<E> newAsyncDistributedSet(String name, Serializer serializer) {
- return DistributedPrimitives.newSetFromMap(newAsyncConsistentMap(name, serializer));
+ public <E> AsyncDistributedSet<E> newAsyncDistributedSet(
+ String name, Serializer serializer, Supplier<Executor> executorSupplier) {
+ return DistributedPrimitives.newSetFromMap(newAsyncConsistentMap(name, serializer, executorSupplier));
}
@Override
- public <K> AsyncAtomicCounterMap<K> newAsyncAtomicCounterMap(String name, Serializer serializer) {
- return getCreator(name).newAsyncAtomicCounterMap(name, serializer);
+ public <K> AsyncAtomicCounterMap<K> newAsyncAtomicCounterMap(
+ String name, Serializer serializer, Supplier<Executor> executorSupplier) {
+ return getCreator(name).newAsyncAtomicCounterMap(name, serializer, executorSupplier);
}
@Override
- public AsyncAtomicCounter newAsyncCounter(String name) {
- return getCreator(name).newAsyncCounter(name);
+ public AsyncAtomicCounter newAsyncCounter(String name, Supplier<Executor> executorSupplier) {
+ return getCreator(name).newAsyncCounter(name, executorSupplier);
}
@Override
- public <V> AsyncAtomicValue<V> newAsyncAtomicValue(String name, Serializer serializer) {
- return getCreator(name).newAsyncAtomicValue(name, serializer);
+ public <V> AsyncAtomicValue<V> newAsyncAtomicValue(
+ String name, Serializer serializer, Supplier<Executor> executorSupplier) {
+ return getCreator(name).newAsyncAtomicValue(name, serializer, executorSupplier);
}
@Override
- public AsyncLeaderElector newAsyncLeaderElector(String name) {
+ public AsyncLeaderElector newAsyncLeaderElector(String name, Supplier<Executor> executorSupplier) {
checkNotNull(name);
Map<PartitionId, AsyncLeaderElector> leaderElectors =
Maps.transformValues(members,
- partition -> partition.newAsyncLeaderElector(name));
+ partition -> partition.newAsyncLeaderElector(name, executorSupplier));
Hasher<String> hasher = topic -> {
int hashCode = Hashing.sha256().hashString(topic, Charsets.UTF_8).asInt();
return sortedMemberPartitionIds.get(Math.abs(hashCode) % members.size());
@@ -117,13 +122,14 @@
}
@Override
- public <E> WorkQueue<E> newWorkQueue(String name, Serializer serializer) {
- return getCreator(name).newWorkQueue(name, serializer);
+ public <E> WorkQueue<E> newWorkQueue(String name, Serializer serializer, Supplier<Executor> executorSupplier) {
+ return getCreator(name).newWorkQueue(name, serializer, executorSupplier);
}
@Override
- public <V> AsyncDocumentTree<V> newAsyncDocumentTree(String name, Serializer serializer) {
- return getCreator(name).newAsyncDocumentTree(name, serializer);
+ public <V> AsyncDocumentTree<V> newAsyncDocumentTree(
+ String name, Serializer serializer, Supplier<Executor> executorSupplier) {
+ return getCreator(name).newAsyncDocumentTree(name, serializer, executorSupplier);
}
@Override
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java
index 3150363..bacda00 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java
@@ -16,6 +16,7 @@
package org.onosproject.store.primitives.impl;
+import static org.onlab.util.Tools.groupedThreads;
import static org.slf4j.LoggerFactory.getLogger;
import java.io.File;
@@ -23,6 +24,8 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
@@ -81,6 +84,9 @@
private final Map<PartitionId, StoragePartition> partitions = Maps.newConcurrentMap();
private final AtomicReference<ClusterMetadata> currentClusterMetadata = new AtomicReference<>();
private final InternalClusterMetadataListener metadataListener = new InternalClusterMetadataListener();
+ private final ExecutorService sharedPrimitiveExecutor = Executors.newFixedThreadPool(
+ Runtime.getRuntime().availableProcessors(),
+ groupedThreads("onos/primitives", "primitive-events", log));
@Activate
public void activate() {
@@ -93,6 +99,7 @@
messagingService,
clusterService,
CatalystSerializers.getSerializer(),
+ sharedPrimitiveExecutor,
new File(System.getProperty("karaf.data") + "/partitions/" + partition.getId()))));
CompletableFuture<Void> openFuture = CompletableFuture.allOf(partitions.values()
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java
index a68b793..e9261c8 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java
@@ -24,6 +24,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
@@ -47,6 +48,7 @@
private final AtomicBoolean isOpened = new AtomicBoolean(false);
private final Serializer serializer;
+ private final Executor sharedExecutor;
private final MessagingService messagingService;
private final ClusterService clusterService;
private final File logFolder;
@@ -63,12 +65,14 @@
MessagingService messagingService,
ClusterService clusterService,
Serializer serializer,
+ Executor sharedExecutor,
File logFolder) {
this.partition = partition;
this.messagingService = messagingService;
this.clusterService = clusterService;
this.localNodeId = clusterService.getLocalNode().id();
this.serializer = serializer;
+ this.sharedExecutor = sharedExecutor;
this.logFolder = logFolder;
}
@@ -156,7 +160,8 @@
private CompletableFuture<StoragePartitionClient> openClient() {
client = new StoragePartitionClient(this,
serializer,
- new CopycatTransport(partition.getId(), messagingService));
+ new CopycatTransport(partition.getId(), messagingService),
+ sharedExecutor);
return client.open().thenApply(v -> client);
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
index 3e80753..83547ae 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
@@ -15,7 +15,6 @@
*/
package org.onosproject.store.primitives.impl;
-import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import io.atomix.AtomixClient;
import io.atomix.catalyst.transport.Transport;
@@ -31,6 +30,7 @@
import io.atomix.resource.ResourceType;
import io.atomix.variables.DistributedLong;
import org.onlab.util.HexString;
+import org.onlab.util.SerialExecutor;
import org.onosproject.store.primitives.DistributedPrimitiveCreator;
import org.onosproject.store.primitives.resources.impl.AtomixAtomicCounterMap;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMap;
@@ -59,8 +59,10 @@
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
import java.util.function.Consumer;
import java.util.function.Function;
+import java.util.function.Supplier;
import static org.slf4j.LoggerFactory.getLogger;
@@ -74,10 +76,11 @@
private final StoragePartition partition;
private final Transport transport;
private final io.atomix.catalyst.serializer.Serializer serializer;
+ private final Executor sharedExecutor;
private AtomixClient client;
private ResourceClient resourceClient;
private static final String ATOMIC_VALUES_CONSISTENT_MAP_NAME = "onos-atomic-values";
- private final Supplier<AsyncConsistentMap<String, byte[]>> onosAtomicValuesMap =
+ private final com.google.common.base.Supplier<AsyncConsistentMap<String, byte[]>> onosAtomicValuesMap =
Suppliers.memoize(() -> newAsyncConsistentMap(ATOMIC_VALUES_CONSISTENT_MAP_NAME,
Serializer.using(KryoNamespaces.BASIC)));
Function<State, Status> mapper = state -> {
@@ -95,10 +98,12 @@
public StoragePartitionClient(StoragePartition partition,
io.atomix.catalyst.serializer.Serializer serializer,
- Transport transport) {
+ Transport transport,
+ Executor sharedExecutor) {
this.partition = partition;
this.serializer = serializer;
this.transport = transport;
+ this.sharedExecutor = sharedExecutor;
}
@Override
@@ -125,14 +130,26 @@
return client != null ? client.close() : CompletableFuture.completedFuture(null);
}
+ /**
+ * Returns the executor provided by the given supplier or a serial executor if the supplier is {@code null}.
+ *
+ * @param executorSupplier the user-provided executor supplier
+ * @return the executor
+ */
+ private Executor defaultExecutor(Supplier<Executor> executorSupplier) {
+ return executorSupplier != null ? executorSupplier.get() : new SerialExecutor(sharedExecutor);
+ }
+
@Override
- public <K, V> AsyncConsistentMap<K, V> newAsyncConsistentMap(String name, Serializer serializer) {
+ public <K, V> AsyncConsistentMap<K, V> newAsyncConsistentMap(
+ String name, Serializer serializer, Supplier<Executor> executorSupplier) {
AtomixConsistentMap atomixConsistentMap = client.getResource(name, AtomixConsistentMap.class).join();
Consumer<State> statusListener = state -> {
atomixConsistentMap.statusChangeListeners()
.forEach(listener -> listener.accept(mapper.apply(state)));
};
resourceClient.client().onStateChange(statusListener);
+
AsyncConsistentMap<String, byte[]> rawMap =
new DelegatingAsyncConsistentMap<String, byte[]>(atomixConsistentMap) {
@Override
@@ -140,17 +157,20 @@
return name;
}
};
- AsyncConsistentMap<K, V> transcodedMap = DistributedPrimitives.<K, V, String, byte[]>newTranscodingMap(rawMap,
+
+ // We have to ensure serialization is done on the Copycat threads since Kryo is not thread safe.
+ AsyncConsistentMap<K, V> transcodedMap = DistributedPrimitives.newTranscodingMap(rawMap,
key -> HexString.toHexString(serializer.encode(key)),
string -> serializer.decode(HexString.fromHexString(string)),
value -> value == null ? null : serializer.encode(value),
bytes -> serializer.decode(bytes));
- return transcodedMap;
+ return new ExecutingAsyncConsistentMap<>(transcodedMap, defaultExecutor(executorSupplier));
}
@Override
- public <V> AsyncConsistentTreeMap<V> newAsyncConsistentTreeMap(String name, Serializer serializer) {
+ public <V> AsyncConsistentTreeMap<V> newAsyncConsistentTreeMap(
+ String name, Serializer serializer, Supplier<Executor> executorSupplier) {
AtomixConsistentTreeMap atomixConsistentTreeMap =
client.getResource(name, AtomixConsistentTreeMap.class).join();
Consumer<State> statusListener = state -> {
@@ -158,6 +178,7 @@
.forEach(listener -> listener.accept(mapper.apply(state)));
};
resourceClient.client().onStateChange(statusListener);
+
AsyncConsistentTreeMap<byte[]> rawMap =
new DelegatingAsyncConsistentTreeMap<byte[]>(atomixConsistentTreeMap) {
@Override
@@ -165,17 +186,19 @@
return name;
}
};
+
AsyncConsistentTreeMap<V> transcodedMap =
DistributedPrimitives.<V, byte[]>newTranscodingTreeMap(
rawMap,
value -> value == null ? null : serializer.encode(value),
bytes -> serializer.decode(bytes));
- return transcodedMap;
+
+ return new ExecutingAsyncConsistentTreeMap<>(transcodedMap, defaultExecutor(executorSupplier));
}
@Override
public <K, V> AsyncConsistentMultimap<K, V> newAsyncConsistentSetMultimap(
- String name, Serializer serializer) {
+ String name, Serializer serializer, Supplier<Executor> executorSupplier) {
AtomixConsistentSetMultimap atomixConsistentSetMultimap =
client.getResource(name, AtomixConsistentSetMultimap.class)
.join();
@@ -184,6 +207,7 @@
.forEach(listener -> listener.accept(mapper.apply(state)));
};
resourceClient.client().onStateChange(statusListener);
+
AsyncConsistentMultimap<String, byte[]> rawMap =
new DelegatingAsyncConsistentMultimap<String, byte[]>(
atomixConsistentSetMultimap) {
@@ -192,6 +216,7 @@
return super.name();
}
};
+
AsyncConsistentMultimap<K, V> transcodedMap =
DistributedPrimitives.newTranscodingMultimap(
rawMap,
@@ -199,53 +224,64 @@
string -> serializer.decode(HexString.fromHexString(string)),
value -> serializer.encode(value),
bytes -> serializer.decode(bytes));
- return transcodedMap;
+ return new ExecutingAsyncConsistentMultimap<>(transcodedMap, defaultExecutor(executorSupplier));
}
@Override
- public <E> AsyncDistributedSet<E> newAsyncDistributedSet(String name, Serializer serializer) {
- return DistributedPrimitives.newSetFromMap(this.<E, Boolean>newAsyncConsistentMap(name, serializer));
+ public <E> AsyncDistributedSet<E> newAsyncDistributedSet(
+ String name, Serializer serializer, Supplier<Executor> executorSupplier) {
+ return DistributedPrimitives.newSetFromMap(newAsyncConsistentMap(name, serializer, executorSupplier));
}
@Override
- public <K> AsyncAtomicCounterMap<K> newAsyncAtomicCounterMap(String name, Serializer serializer) {
+ public <K> AsyncAtomicCounterMap<K> newAsyncAtomicCounterMap(
+ String name, Serializer serializer, Supplier<Executor> executorSupplier) {
AtomixAtomicCounterMap atomixAtomicCounterMap =
client.getResource(name, AtomixAtomicCounterMap.class)
.join();
+
AsyncAtomicCounterMap<K> transcodedMap =
DistributedPrimitives.<K, String>newTranscodingAtomicCounterMap(
atomixAtomicCounterMap,
key -> HexString.toHexString(serializer.encode(key)),
string -> serializer.decode(HexString.fromHexString(string)));
- return transcodedMap;
+
+ return new ExecutingAsyncAtomicCounterMap<>(transcodedMap, defaultExecutor(executorSupplier));
}
@Override
- public AsyncAtomicCounter newAsyncCounter(String name) {
+ public AsyncAtomicCounter newAsyncCounter(String name, Supplier<Executor> executorSupplier) {
DistributedLong distributedLong = client.getLong(name).join();
- return new AtomixCounter(name, distributedLong);
+ AsyncAtomicCounter asyncCounter = new AtomixCounter(name, distributedLong);
+ return new ExecutingAsyncAtomicCounter(asyncCounter, defaultExecutor(executorSupplier));
}
@Override
- public <V> AsyncAtomicValue<V> newAsyncAtomicValue(String name, Serializer serializer) {
- return new DefaultAsyncAtomicValue<>(name, serializer, onosAtomicValuesMap.get());
+ public <V> AsyncAtomicValue<V> newAsyncAtomicValue(
+ String name, Serializer serializer, Supplier<Executor> executorSupplier) {
+ AsyncAtomicValue<V> asyncValue = new DefaultAsyncAtomicValue<>(name, serializer, onosAtomicValuesMap.get());
+ return new ExecutingAsyncAtomicValue<>(asyncValue, defaultExecutor(executorSupplier));
}
@Override
- public <E> WorkQueue<E> newWorkQueue(String name, Serializer serializer) {
- AtomixWorkQueue workQueue = client.getResource(name, AtomixWorkQueue.class).join();
- return new DefaultDistributedWorkQueue<>(workQueue, serializer);
+ public <E> WorkQueue<E> newWorkQueue(String name, Serializer serializer, Supplier<Executor> executorSupplier) {
+ AtomixWorkQueue atomixWorkQueue = client.getResource(name, AtomixWorkQueue.class).join();
+ WorkQueue<E> workQueue = new DefaultDistributedWorkQueue<>(atomixWorkQueue, serializer);
+ return new ExecutingWorkQueue<>(workQueue, defaultExecutor(executorSupplier));
}
@Override
- public <V> AsyncDocumentTree<V> newAsyncDocumentTree(String name, Serializer serializer) {
+ public <V> AsyncDocumentTree<V> newAsyncDocumentTree(
+ String name, Serializer serializer, Supplier<Executor> executorSupplier) {
AtomixDocumentTree atomixDocumentTree = client.getResource(name, AtomixDocumentTree.class).join();
- return new DefaultDistributedDocumentTree<>(name, atomixDocumentTree, serializer);
+ AsyncDocumentTree<V> asyncDocumentTree = new DefaultDistributedDocumentTree<>(
+ name, atomixDocumentTree, serializer);
+ return new ExecutingAsyncDocumentTree<>(asyncDocumentTree, defaultExecutor(executorSupplier));
}
@Override
- public AsyncLeaderElector newAsyncLeaderElector(String name) {
+ public AsyncLeaderElector newAsyncLeaderElector(String name, Supplier<Executor> executorSupplier) {
AtomixLeaderElector leaderElector = client.getResource(name, AtomixLeaderElector.class)
.thenCompose(AtomixLeaderElector::setupCache)
.join();
@@ -254,7 +290,7 @@
.forEach(listener -> listener.accept(mapper.apply(state)));
};
resourceClient.client().onStateChange(statusListener);
- return leaderElector;
+ return new ExecutingAsyncLeaderElector(leaderElector, defaultExecutor(executorSupplier));
}
@Override
diff --git a/utils/misc/src/main/java/org/onlab/util/BestEffortSerialExecutor.java b/utils/misc/src/main/java/org/onlab/util/BestEffortSerialExecutor.java
new file mode 100644
index 0000000..936a33f
--- /dev/null
+++ b/utils/misc/src/main/java/org/onlab/util/BestEffortSerialExecutor.java
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onlab.util;
+
+import java.util.LinkedList;
+import java.util.concurrent.Executor;
+
+/**
+ * Executor that executes tasks in serial on a shared thread pool, falling back to parallel execution when threads
+ * are blocked.
+ * <p>
+ * This executor attempts to execute tasks in serial as if they occur on a single thread. However, in the event tasks
+ * are blocking a thread (a thread is in the {@link Thread.State#WAITING} or {@link Thread.State#TIMED_WAITING} state)
+ * the executor will execute tasks on parallel on the underlying {@link Executor}. This is useful for ensuring blocked
+ * threads cannot block events, but mimics a single-threaded model otherwise.
+ */
+public class BestEffortSerialExecutor implements Executor {
+ private final Executor parent;
+ private final LinkedList<Runnable> tasks = new LinkedList<>();
+ private volatile Thread thread;
+
+ public BestEffortSerialExecutor(Executor parent) {
+ this.parent = parent;
+ }
+
+ private void run() {
+ synchronized (tasks) {
+ thread = Thread.currentThread();
+ }
+ for (;;) {
+ if (!runTask()) {
+ synchronized (tasks) {
+ thread = null;
+ }
+ return;
+ }
+ }
+ }
+
+ private boolean runTask() {
+ final Runnable task;
+ synchronized (tasks) {
+ task = tasks.poll();
+ if (task == null) {
+ return false;
+ }
+ }
+ task.run();
+ return true;
+ }
+
+ @Override
+ public void execute(Runnable command) {
+ synchronized (tasks) {
+ tasks.add(command);
+ if (thread == null) {
+ parent.execute(this::run);
+ } else if (thread.getState() == Thread.State.WAITING || thread.getState() == Thread.State.TIMED_WAITING) {
+ parent.execute(this::runTask);
+ }
+ }
+ }
+}
diff --git a/utils/misc/src/main/java/org/onlab/util/SerialExecutor.java b/utils/misc/src/main/java/org/onlab/util/SerialExecutor.java
new file mode 100644
index 0000000..9e54ac2
--- /dev/null
+++ b/utils/misc/src/main/java/org/onlab/util/SerialExecutor.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onlab.util;
+
+import java.util.LinkedList;
+import java.util.concurrent.Executor;
+
+/**
+ * Executor that executes tasks in serial on a shared thread pool.
+ * <p>
+ * The serial executor behaves semantically like a single-threaded executor, but multiplexes tasks on a shared thread
+ * pool, ensuring blocked threads in the shared thread pool don't block individual serial executors.
+ */
+public class SerialExecutor implements Executor {
+ private final Executor parent;
+ private final LinkedList<Runnable> tasks = new LinkedList<>();
+ private boolean running;
+
+ public SerialExecutor(Executor parent) {
+ this.parent = parent;
+ }
+
+ private void run() {
+ for (;;) {
+ final Runnable task;
+ synchronized (tasks) {
+ task = tasks.poll();
+ if (task == null) {
+ running = false;
+ return;
+ }
+ }
+ task.run();
+ }
+ }
+
+ @Override
+ public void execute(Runnable command) {
+ synchronized (tasks) {
+ tasks.add(command);
+ if (!running) {
+ running = true;
+ parent.execute(this::run);
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/utils/misc/src/main/java/org/onlab/util/Tools.java b/utils/misc/src/main/java/org/onlab/util/Tools.java
index cc769c4..883a7c8 100644
--- a/utils/misc/src/main/java/org/onlab/util/Tools.java
+++ b/utils/misc/src/main/java/org/onlab/util/Tools.java
@@ -39,6 +39,7 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
@@ -642,6 +643,32 @@
}
/**
+ * Returns a future that's completed using the given {@link Executor} once the given {@code future} is completed.
+ * <p>
+ * {@link CompletableFuture}'s async methods cannot be relied upon to complete futures on an executor thread. If a
+ * future is completed synchronously, {@code CompletableFuture} async methods will often complete the future on the
+ * current thread, ignoring the provided {@code Executor}. This method ensures a more reliable and consistent thread
+ * model by ensuring that futures are always completed using the provided {@code Executor}.
+ *
+ * @param future the future to convert into an asynchronous future
+ * @param executor the executor with which to complete the returned future
+ * @param <T> future value type
+ * @return a new completable future to be completed using the provided {@code executor} once the provided
+ * {@code future} is complete
+ */
+ public static <T> CompletableFuture<T> asyncFuture(CompletableFuture<T> future, Executor executor) {
+ CompletableFuture<T> newFuture = new CompletableFuture<T>();
+ future.whenComplete((result, error) -> executor.execute(() -> {
+ if (future.isCompletedExceptionally()) {
+ newFuture.completeExceptionally(error);
+ } else {
+ newFuture.complete(result);
+ }
+ }));
+ return newFuture;
+ }
+
+ /**
* Returns a new CompletableFuture completed with a list of computed values
* when all of the given CompletableFuture complete.
*
diff --git a/utils/misc/src/test/java/org/onlab/util/BestEffortSerialExecutorTest.java b/utils/misc/src/test/java/org/onlab/util/BestEffortSerialExecutorTest.java
new file mode 100644
index 0000000..39a1f87
--- /dev/null
+++ b/utils/misc/src/test/java/org/onlab/util/BestEffortSerialExecutorTest.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onlab.util;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Best effort serial executor test.
+ */
+public class BestEffortSerialExecutorTest {
+
+ @Test
+ public void testSerialExecution() throws Throwable {
+ Executor executor = new BestEffortSerialExecutor(SharedExecutors.getPoolThreadExecutor());
+ CountDownLatch latch = new CountDownLatch(2);
+ executor.execute(latch::countDown);
+ executor.execute(latch::countDown);
+ latch.await();
+ assertEquals(0, latch.getCount());
+ }
+
+ @Test
+ public void testBlockedExecution() throws Throwable {
+ Executor executor = new BestEffortSerialExecutor(SharedExecutors.getPoolThreadExecutor());
+ CountDownLatch latch = new CountDownLatch(3);
+ executor.execute(() -> {
+ try {
+ Thread.sleep(2000);
+ latch.countDown();
+ } catch (InterruptedException e) {
+ }
+ });
+ Thread.sleep(10);
+ executor.execute(() -> {
+ try {
+ new CompletableFuture<>().get(2, TimeUnit.SECONDS);
+ } catch (InterruptedException | ExecutionException | TimeoutException e) {
+ latch.countDown();
+ }
+ });
+ Thread.sleep(10);
+ executor.execute(latch::countDown);
+ latch.await(1, TimeUnit.SECONDS);
+ assertEquals(2, latch.getCount());
+ latch.await(3, TimeUnit.SECONDS);
+ assertEquals(0, latch.getCount());
+ }
+
+}
diff --git a/utils/misc/src/test/java/org/onlab/util/SerialExecutorTest.java b/utils/misc/src/test/java/org/onlab/util/SerialExecutorTest.java
new file mode 100644
index 0000000..140ce1f
--- /dev/null
+++ b/utils/misc/src/test/java/org/onlab/util/SerialExecutorTest.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onlab.util;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Serial executor test.
+ */
+public class SerialExecutorTest {
+
+ @Test
+ public void testSerialExecution() throws Throwable {
+ Executor executor = new SerialExecutor(SharedExecutors.getPoolThreadExecutor());
+ CountDownLatch latch = new CountDownLatch(2);
+ executor.execute(latch::countDown);
+ executor.execute(latch::countDown);
+ latch.await();
+ assertEquals(0, latch.getCount());
+ }
+
+}