New API for specifying an executor when registering a map listener
Change-Id: I1fc92e0a3da576d88d5ece4a666af8ad1c1fb9d8
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
index 2cb4a50..9bad652 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
@@ -22,10 +22,12 @@
import java.util.Collection;
import java.util.ConcurrentModificationException;
import java.util.List;
+import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Consumer;
@@ -54,7 +56,9 @@
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.MapTransaction;
import org.onosproject.store.service.Versioned;
+
import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
/**
@@ -65,7 +69,7 @@
implements AsyncConsistentMap<String, byte[]> {
private final Set<Consumer<Status>> statusChangeListeners = Sets.newCopyOnWriteArraySet();
- private final Set<MapEventListener<String, byte[]>> mapEventListeners = Sets.newCopyOnWriteArraySet();
+ private final Map<MapEventListener<String, byte[]>, Executor> mapEventListeners = Maps.newConcurrentMap();
public static final String CHANGE_SUBJECT = "changeEvents";
@@ -87,7 +91,8 @@
}
private void handleEvent(List<MapEvent<String, byte[]>> events) {
- events.forEach(event -> mapEventListeners.forEach(listener -> listener.event(event)));
+ events.forEach(mapEvent -> mapEventListeners.forEach(
+ (subscriber, dispatcher) -> dispatcher.execute(() -> subscriber.event(mapEvent))));
}
@Override
@@ -250,18 +255,19 @@
}
@Override
- public synchronized CompletableFuture<Void> addListener(MapEventListener<String, byte[]> listener) {
+ public synchronized CompletableFuture<Void> addListener(MapEventListener<String, byte[]> listener,
+ Executor executor) {
if (mapEventListeners.isEmpty()) {
- return submit(new Listen()).thenRun(() -> mapEventListeners.add(listener));
+ return submit(new Listen()).thenRun(() -> mapEventListeners.put(listener, executor));
} else {
- mapEventListeners.add(listener);
+ mapEventListeners.put(listener, executor); // map is non-empty, so no Listen is submitted
return CompletableFuture.completedFuture(null);
}
}
@Override
public synchronized CompletableFuture<Void> removeListener(MapEventListener<String, byte[]> listener) {
- if (mapEventListeners.remove(listener) && mapEventListeners.isEmpty()) {
+ if (mapEventListeners.remove(listener) != null && mapEventListeners.isEmpty()) {
return submit(new Unlisten()).thenApply(v -> null);
}
return CompletableFuture.completedFuture(null);