New API for specifying an executor when registering a map listener
Change-Id: I1fc92e0a3da576d88d5ece4a666af8ad1c1fb9d8
diff --git a/core/api/src/main/java/org/onosproject/store/primitives/DefaultConsistentMap.java b/core/api/src/main/java/org/onosproject/store/primitives/DefaultConsistentMap.java
index 5360e50..ec03cf3 100644
--- a/core/api/src/main/java/org/onosproject/store/primitives/DefaultConsistentMap.java
+++ b/core/api/src/main/java/org/onosproject/store/primitives/DefaultConsistentMap.java
@@ -22,6 +22,7 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiFunction;
@@ -178,8 +179,8 @@
}
@Override
- public void addListener(MapEventListener<K, V> listener) {
- complete(asyncMap.addListener(listener));
+ public void addListener(MapEventListener<K, V> listener, Executor executor) {
+ complete(asyncMap.addListener(listener, executor));
}
@Override
diff --git a/core/api/src/main/java/org/onosproject/store/primitives/DefaultConsistentTreeMap.java b/core/api/src/main/java/org/onosproject/store/primitives/DefaultConsistentTreeMap.java
index 745202c..46c458b 100644
--- a/core/api/src/main/java/org/onosproject/store/primitives/DefaultConsistentTreeMap.java
+++ b/core/api/src/main/java/org/onosproject/store/primitives/DefaultConsistentTreeMap.java
@@ -17,6 +17,7 @@
package org.onosproject.store.primitives;
import com.google.common.base.Throwables;
+
import org.onosproject.store.service.ConsistentMapException;
import org.onosproject.store.service.AsyncConsistentTreeMap;
import org.onosproject.store.service.MapEventListener;
@@ -30,6 +31,7 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiFunction;
@@ -258,8 +260,8 @@
}
@Override
- public void addListener(MapEventListener<K, V> listener) {
- complete(treeMap.addListener(listener));
+ public void addListener(MapEventListener<K, V> listener, Executor executor) {
+ complete(treeMap.addListener(listener, executor));
}
@Override
diff --git a/core/api/src/main/java/org/onosproject/store/service/AsyncConsistentMap.java b/core/api/src/main/java/org/onosproject/store/service/AsyncConsistentMap.java
index 3ce1f07..9a432e0 100644
--- a/core/api/src/main/java/org/onosproject/store/service/AsyncConsistentMap.java
+++ b/core/api/src/main/java/org/onosproject/store/service/AsyncConsistentMap.java
@@ -21,6 +21,7 @@
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;
@@ -28,6 +29,8 @@
import org.onosproject.store.primitives.DefaultConsistentMap;
import org.onosproject.store.primitives.TransactionId;
+import com.google.common.util.concurrent.MoreExecutors;
+
/**
* A distributed, strongly consistent map whose methods are all executed asynchronously.
* <p>
@@ -308,7 +311,18 @@
* @param listener listener to notify about map events
* @return future that will be completed when the operation finishes
*/
- CompletableFuture<Void> addListener(MapEventListener<K, V> listener);
+ default CompletableFuture<Void> addListener(MapEventListener<K, V> listener) {
+ return addListener(listener, MoreExecutors.directExecutor());
+ }
+
+ /**
+ * Registers the specified listener to be notified whenever the map is updated.
+ *
+ * @param listener listener to notify about map events
+ * @param executor executor to use for handling incoming map events
+ * @return future that will be completed when the operation finishes
+ */
+ CompletableFuture<Void> addListener(MapEventListener<K, V> listener, Executor executor);
/**
* Unregisters the specified listener such that it will no longer
diff --git a/core/api/src/main/java/org/onosproject/store/service/ConsistentMap.java b/core/api/src/main/java/org/onosproject/store/service/ConsistentMap.java
index 39119b7..f5e8c13 100644
--- a/core/api/src/main/java/org/onosproject/store/service/ConsistentMap.java
+++ b/core/api/src/main/java/org/onosproject/store/service/ConsistentMap.java
@@ -20,10 +20,13 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import java.util.concurrent.Executor;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;
+import com.google.common.util.concurrent.MoreExecutors;
+
/**
* {@code ConsistentMap} provides the same functionality as {@link AsyncConsistentMap} with
* the only difference that all its methods block until the corresponding operation completes.
@@ -270,7 +273,17 @@
*
* @param listener listener to notify about map events
*/
- void addListener(MapEventListener<K, V> listener);
+ default void addListener(MapEventListener<K, V> listener) {
+ addListener(listener, MoreExecutors.directExecutor());
+ }
+
+ /**
+ * Registers the specified listener to be notified whenever the map is updated.
+ *
+ * @param listener listener to notify about map events
+ * @param executor executor to use for handling incoming map events
+ */
+ void addListener(MapEventListener<K, V> listener, Executor executor);
/**
* Unregisters the specified listener such that it will no longer
diff --git a/core/api/src/test/java/org/onosproject/store/service/ConsistentMapAdapter.java b/core/api/src/test/java/org/onosproject/store/service/ConsistentMapAdapter.java
index d810e15..e6e71c2 100644
--- a/core/api/src/test/java/org/onosproject/store/service/ConsistentMapAdapter.java
+++ b/core/api/src/test/java/org/onosproject/store/service/ConsistentMapAdapter.java
@@ -18,6 +18,7 @@
import java.util.Collection;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.Executor;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;
@@ -149,7 +150,7 @@
}
@Override
- public void addListener(MapEventListener<K, V> listener) {
+ public void addListener(MapEventListener<K, V> listener, Executor executor) {
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMap.java
index f2a86d1..ce725e24 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMap.java
@@ -23,6 +23,7 @@
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Predicate;
@@ -33,6 +34,7 @@
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.MapTransaction;
import org.onosproject.store.service.Versioned;
+
import com.google.common.base.MoreObjects;
/**
@@ -153,8 +155,8 @@
}
@Override
- public CompletableFuture<Void> addListener(MapEventListener<K, V> listener) {
- return delegateMap.addListener(listener);
+ public CompletableFuture<Void> addListener(MapEventListener<K, V> listener, Executor executor) {
+ return delegateMap.addListener(listener, executor);
}
@Override
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMap.java
index 9535134..2a6b2c7 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMap.java
@@ -24,6 +24,7 @@
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
@@ -39,6 +40,7 @@
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.MapTransaction;
import org.onosproject.store.service.Versioned;
+
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
@@ -190,9 +192,9 @@
}
@Override
- public CompletableFuture<Void> addListener(MapEventListener<K, V> listener) {
+ public CompletableFuture<Void> addListener(MapEventListener<K, V> listener, Executor executor) {
return CompletableFuture.allOf(getMaps().stream()
- .map(map -> map.addListener(listener))
+ .map(map -> map.addListener(listener, executor))
.toArray(CompletableFuture[]::new));
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMap.java
index 0178730..f3938ce 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMap.java
@@ -21,6 +21,7 @@
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
@@ -34,6 +35,7 @@
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.MapTransaction;
import org.onosproject.store.service.Versioned;
+
import com.google.common.collect.Maps;
/**
@@ -235,11 +237,11 @@
}
@Override
- public CompletableFuture<Void> addListener(MapEventListener<K1, V1> listener) {
+ public CompletableFuture<Void> addListener(MapEventListener<K1, V1> listener, Executor executor) {
synchronized (listeners) {
InternalBackingMapEventListener backingMapListener =
listeners.computeIfAbsent(listener, k -> new InternalBackingMapEventListener(listener));
- return backingMap.addListener(backingMapListener);
+ return backingMap.addListener(backingMapListener, executor);
}
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
index 2cb4a50..9bad652 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
@@ -22,10 +22,12 @@
import java.util.Collection;
import java.util.ConcurrentModificationException;
import java.util.List;
+import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Consumer;
@@ -54,7 +56,9 @@
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.MapTransaction;
import org.onosproject.store.service.Versioned;
+
import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
/**
@@ -65,7 +69,7 @@
implements AsyncConsistentMap<String, byte[]> {
private final Set<Consumer<Status>> statusChangeListeners = Sets.newCopyOnWriteArraySet();
- private final Set<MapEventListener<String, byte[]>> mapEventListeners = Sets.newCopyOnWriteArraySet();
+ private final Map<MapEventListener<String, byte[]>, Executor> mapEventListeners = Maps.newIdentityHashMap();
public static final String CHANGE_SUBJECT = "changeEvents";
@@ -87,7 +91,8 @@
}
private void handleEvent(List<MapEvent<String, byte[]>> events) {
- events.forEach(event -> mapEventListeners.forEach(listener -> listener.event(event)));
+ events.forEach(event ->
+ mapEventListeners.forEach((listener, executor) -> executor.execute(() -> listener.event(event))));
}
@Override
@@ -250,18 +255,19 @@
}
@Override
- public synchronized CompletableFuture<Void> addListener(MapEventListener<String, byte[]> listener) {
+ public synchronized CompletableFuture<Void> addListener(MapEventListener<String, byte[]> listener,
+ Executor executor) {
if (mapEventListeners.isEmpty()) {
- return submit(new Listen()).thenRun(() -> mapEventListeners.add(listener));
+ return submit(new Listen()).thenRun(() -> mapEventListeners.putIfAbsent(listener, executor));
} else {
- mapEventListeners.add(listener);
+ mapEventListeners.put(listener, executor);
return CompletableFuture.completedFuture(null);
}
}
@Override
public synchronized CompletableFuture<Void> removeListener(MapEventListener<String, byte[]> listener) {
- if (mapEventListeners.remove(listener) && mapEventListeners.isEmpty()) {
+ if (mapEventListeners.remove(listener) != null && mapEventListeners.isEmpty()) {
return submit(new Unlisten()).thenApply(v -> null);
}
return CompletableFuture.completedFuture(null);