New API for specifying an executor when registering a map listener

Change-Id: I1fc92e0a3da576d88d5ece4a666af8ad1c1fb9d8
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMap.java
index f2a86d1..ce725e24 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMap.java
@@ -23,6 +23,7 @@
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 import java.util.function.BiFunction;
 import java.util.function.Consumer;
 import java.util.function.Predicate;
@@ -33,6 +34,7 @@
 import org.onosproject.store.service.MapEventListener;
 import org.onosproject.store.service.MapTransaction;
 import org.onosproject.store.service.Versioned;
+
 import com.google.common.base.MoreObjects;
 
 /**
@@ -153,8 +155,8 @@
     }
 
     @Override
-    public CompletableFuture<Void> addListener(MapEventListener<K, V> listener) {
-        return delegateMap.addListener(listener);
+    public CompletableFuture<Void> addListener(MapEventListener<K, V> listener, Executor executor) {
+        return delegateMap.addListener(listener, executor);
     }
 
     @Override
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMap.java
index 9535134..2a6b2c7 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMap.java
@@ -24,6 +24,7 @@
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiFunction;
@@ -39,6 +40,7 @@
 import org.onosproject.store.service.MapEventListener;
 import org.onosproject.store.service.MapTransaction;
 import org.onosproject.store.service.Versioned;
+
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
@@ -190,9 +192,9 @@
     }
 
     @Override
-    public CompletableFuture<Void> addListener(MapEventListener<K, V> listener) {
+    public CompletableFuture<Void> addListener(MapEventListener<K, V> listener, Executor executor) {
         return CompletableFuture.allOf(getMaps().stream()
-                                                .map(map -> map.addListener(listener))
+                                                .map(map -> map.addListener(listener, executor))
                                                 .toArray(CompletableFuture[]::new));
     }
 
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMap.java
index 0178730..f3938ce 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMap.java
@@ -21,6 +21,7 @@
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 import java.util.function.BiFunction;
 import java.util.function.Consumer;
 import java.util.function.Function;
@@ -34,6 +35,7 @@
 import org.onosproject.store.service.MapEventListener;
 import org.onosproject.store.service.MapTransaction;
 import org.onosproject.store.service.Versioned;
+
 import com.google.common.collect.Maps;
 
 /**
@@ -235,11 +237,11 @@
     }
 
     @Override
-    public CompletableFuture<Void> addListener(MapEventListener<K1, V1> listener) {
+    public CompletableFuture<Void> addListener(MapEventListener<K1, V1> listener, Executor executor) {
         synchronized (listeners) {
             InternalBackingMapEventListener backingMapListener =
                     listeners.computeIfAbsent(listener, k -> new InternalBackingMapEventListener(listener));
-            return backingMap.addListener(backingMapListener);
+            return backingMap.addListener(backingMapListener, executor);
         }
     }
 
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
index 2cb4a50..9bad652 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
@@ -22,10 +22,12 @@
 import java.util.Collection;
 import java.util.ConcurrentModificationException;
 import java.util.List;
+import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiFunction;
 import java.util.function.Consumer;
@@ -54,7 +56,9 @@
 import org.onosproject.store.service.MapEventListener;
 import org.onosproject.store.service.MapTransaction;
 import org.onosproject.store.service.Versioned;
+
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
 /**
@@ -65,7 +69,7 @@
     implements AsyncConsistentMap<String, byte[]> {
 
     private final Set<Consumer<Status>> statusChangeListeners = Sets.newCopyOnWriteArraySet();
-    private final Set<MapEventListener<String, byte[]>> mapEventListeners = Sets.newCopyOnWriteArraySet();
+    private final Map<MapEventListener<String, byte[]>, Executor> mapEventListeners = Maps.newConcurrentMap();
 
     public static final String CHANGE_SUBJECT = "changeEvents";
 
@@ -87,7 +91,8 @@
     }
 
     private void handleEvent(List<MapEvent<String, byte[]>> events) {
-        events.forEach(event -> mapEventListeners.forEach(listener -> listener.event(event)));
+        events.forEach(event ->
+            mapEventListeners.forEach((listener, executor) -> executor.execute(() -> listener.event(event))));
     }
 
     @Override
@@ -250,18 +255,19 @@
     }
 
     @Override
-    public synchronized CompletableFuture<Void> addListener(MapEventListener<String, byte[]> listener) {
+    public synchronized CompletableFuture<Void> addListener(MapEventListener<String, byte[]> listener,
+                                                            Executor executor) {
         if (mapEventListeners.isEmpty()) {
-            return submit(new Listen()).thenRun(() -> mapEventListeners.add(listener));
+            return submit(new Listen()).thenRun(() -> mapEventListeners.put(listener, executor));
         } else {
-            mapEventListeners.add(listener);
+            mapEventListeners.put(listener, executor);
             return CompletableFuture.completedFuture(null);
         }
     }
 
     @Override
     public synchronized CompletableFuture<Void> removeListener(MapEventListener<String, byte[]> listener) {
-        if (mapEventListeners.remove(listener) && mapEventListeners.isEmpty()) {
+        if (mapEventListeners.remove(listener) != null && mapEventListeners.isEmpty()) {
             return submit(new Unlisten()).thenApply(v -> null);
         }
         return CompletableFuture.completedFuture(null);