Implement listeners for ConsistentMultimap.

Change-Id: Ica07d444c18af8ba7a9bbb120623512def572a48
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java
index d676ab1..2c4ee73 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java
@@ -51,6 +51,7 @@
 import org.onosproject.store.service.DocumentTreeEvent;
 import org.onosproject.store.service.MapEvent;
 import org.onosproject.store.service.TransactionLog;
+import org.onosproject.store.service.MultimapEvent;
 import org.onosproject.store.service.Task;
 import org.onosproject.store.service.Versioned;
 import org.onosproject.store.service.WorkQueueStats;
@@ -100,6 +101,8 @@
         serializer.register(TransactionLog.class, factory);
         serializer.register(Versioned.class, factory);
         serializer.register(MapEvent.class, factory);
+        serializer.register(MultimapEvent.class, factory);
+        serializer.register(MultimapEvent.Type.class, factory);
         serializer.register(Task.class, factory);
         serializer.register(WorkQueueStats.class, factory);
         serializer.register(DocumentPath.class, factory);
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMultimap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMultimap.java
index 5bd37c5..3db4dce 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMultimap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMultimap.java
@@ -16,14 +16,17 @@
 
 package org.onosproject.store.primitives.impl;
 
+import com.google.common.collect.Multiset;
+import org.onosproject.store.service.AsyncConsistentMultimap;
+import org.onosproject.store.service.MultimapEventListener;
+import org.onosproject.store.service.Versioned;
+
 import java.util.Collection;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 
-import com.google.common.collect.Multiset;
-import org.onosproject.store.service.AsyncConsistentMultimap;
-import org.onosproject.store.service.Versioned;
+import java.util.concurrent.Executor;
 
 /**
  * {@code AsyncConsistentMultimap} that merely delegates control to
@@ -133,6 +136,20 @@
     }
 
     @Override
+    public CompletableFuture<Void> addListener(MultimapEventListener<K, V> listener, Executor executor) {
+        // No decoration here: the registration is forwarded untouched to the wrapped multimap.
+        final CompletableFuture<Void> registered = delegateMap.addListener(listener, executor);
+        return registered;
+    }
+
+    @Override
+    public CompletableFuture<Void> removeListener(MultimapEventListener<K, V> listener) {
+        // Counterpart of addListener: the wrapped multimap performs the de-registration.
+        final CompletableFuture<Void> unregistered = delegateMap.removeListener(listener);
+        return unregistered;
+    }
+
+    @Override
     public CompletableFuture<Map<K, Collection<V>>> asMap() {
         return delegateMap.asMap();
     }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncConsistentMultimap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncConsistentMultimap.java
index 6ec0a69..2045586 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncConsistentMultimap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncConsistentMultimap.java
@@ -15,16 +15,17 @@
  */
 package org.onosproject.store.primitives.impl;
 
+import com.google.common.collect.Multiset;
+import org.onosproject.store.service.AsyncConsistentMultimap;
+import org.onosproject.store.service.MultimapEventListener;
+import org.onosproject.store.service.Versioned;
+
 import java.util.Collection;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 
-import com.google.common.collect.Multiset;
-import org.onosproject.store.service.AsyncConsistentMultimap;
-import org.onosproject.store.service.Versioned;
-
 /**
  * {@link org.onosproject.store.service.AsyncConsistentMultimap} that executes asynchronous callbacks on a provided
  * {@link Executor}.
@@ -125,6 +126,20 @@
     }
 
     @Override
+    public CompletableFuture<Void> addListener(MultimapEventListener<K, V> listener, Executor executor) {
+        // Register on the delegate, then pass the result through asyncFuture as asMap() below does.
+        final CompletableFuture<Void> pending = delegateMap.addListener(listener, executor);
+        return asyncFuture(pending);
+    }
+
+    @Override
+    public CompletableFuture<Void> removeListener(MultimapEventListener<K, V> listener) {
+        // De-register on the delegate and wrap the resulting future the same way.
+        final CompletableFuture<Void> pending = delegateMap.removeListener(listener);
+        return asyncFuture(pending);
+    }
+
+    @Override
     public CompletableFuture<Map<K, Collection<V>>> asMap() {
         return asyncFuture(delegateMap.asMap());
     }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMap.java
index c2f167a..52cf210 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMap.java
@@ -16,6 +16,17 @@
 
 package org.onosproject.store.primitives.impl;
 
+import com.google.common.collect.Maps;
+import org.onlab.util.Tools;
+import org.onosproject.store.primitives.MapUpdate;
+import org.onosproject.store.primitives.TransactionId;
+import org.onosproject.store.service.AsyncConsistentMap;
+import org.onosproject.store.service.MapEvent;
+import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.TransactionLog;
+import org.onosproject.store.service.Version;
+import org.onosproject.store.service.Versioned;
+
 import java.util.Collection;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -28,18 +39,6 @@
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
-import org.onlab.util.Tools;
-import org.onosproject.store.primitives.MapUpdate;
-import org.onosproject.store.primitives.TransactionId;
-import org.onosproject.store.service.AsyncConsistentMap;
-import org.onosproject.store.service.MapEvent;
-import org.onosproject.store.service.MapEventListener;
-import org.onosproject.store.service.TransactionLog;
-import org.onosproject.store.service.Version;
-import org.onosproject.store.service.Versioned;
-
-import com.google.common.collect.Maps;
-
 /**
  * An {@code AsyncConsistentMap} that maps its operations to operations on a
  * differently typed {@code AsyncConsistentMap} by transcoding operation inputs and outputs.
@@ -259,11 +258,13 @@
 
     @Override
     public CompletableFuture<Void> removeListener(MapEventListener<K1, V1> listener) {
-        InternalBackingMapEventListener backingMapListener = listeners.remove(listener);
-        if (backingMapListener != null) {
-            return backingMap.removeListener(backingMapListener);
-        } else {
-            return CompletableFuture.completedFuture(null);
+        synchronized (listeners) {
+            InternalBackingMapEventListener registered = listeners.remove(listener);
+            if (registered == null) {
+                // No adapter is tracked for this listener, so there is nothing to detach.
+                return CompletableFuture.completedFuture(null);
+            }
+            return backingMap.removeListener(registered);
         }
     }
 
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMultimap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMultimap.java
index 666ac1f..9d699d9 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMultimap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMultimap.java
@@ -21,6 +21,8 @@
 import com.google.common.collect.Multiset;
 import org.onlab.util.Tools;
 import org.onosproject.store.service.AsyncConsistentMultimap;
+import org.onosproject.store.service.MultimapEvent;
+import org.onosproject.store.service.MultimapEventListener;
 import org.onosproject.store.service.Versioned;
 
 import java.util.Collection;
@@ -28,6 +30,7 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 import java.util.function.BiConsumer;
 import java.util.function.BinaryOperator;
 import java.util.function.Consumer;
@@ -60,6 +63,8 @@
             Versioned<Collection<? extends V1>>> versionedValueCollectionDecode;
     private final Function<Collection<? extends V1>, Collection<V2>>
             valueCollectionEncode;
+    private final Map<MultimapEventListener<K1, V1>, InternalBackingMultimapEventListener> listeners =
+            Maps.newIdentityHashMap(); // client listener -> adapter registered on backingMap (identity-keyed)
 
      public TranscodingAsyncConsistentMultimap(
             AsyncConsistentMultimap<K2, V2> backingMap,
@@ -244,6 +249,30 @@
     }
 
     @Override
+    public CompletableFuture<Void> addListener(MultimapEventListener<K1, V1> listener, Executor executor) {
+        // Both the adapter lookup/creation and the call into the backing map run under the
+        // listeners lock; removeListener takes the same lock.
+        synchronized (listeners) {
+            // Reuse the adapter already tracked for this listener instance, if any.
+            InternalBackingMultimapEventListener adapter = listeners.computeIfAbsent(
+                    listener, key -> new InternalBackingMultimapEventListener(key));
+            return backingMap.addListener(adapter, executor);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> removeListener(MultimapEventListener<K1, V1> listener) {
+        synchronized (listeners) {
+            InternalBackingMultimapEventListener adapter = listeners.remove(listener);
+            if (adapter == null) {
+                // No adapter is tracked for this listener, so there is nothing to detach.
+                return CompletableFuture.completedFuture(null);
+            }
+            return backingMap.removeListener(adapter);
+        }
+    }
+
+    @Override
     public void addStatusChangeListener(Consumer<Status> listener) {
         backingMap.addStatusChangeListener(listener);
     }
@@ -290,4 +316,29 @@
             return EnumSet.of(Characteristics.UNORDERED);
         }
     }
+
+    /**
+     * Adapter registered with the backing multimap on behalf of a client listener.
+     * Each backing event is re-created with its key and values passed through
+     * {@code keyDecoder} and {@code valueDecoder} before the client listener is invoked.
+     */
+    private class InternalBackingMultimapEventListener implements MultimapEventListener<K2, V2> {
+
+        private final MultimapEventListener<K1, V1> listener;
+
+        InternalBackingMultimapEventListener(MultimapEventListener<K1, V1> listener) {
+            this.listener = listener;
+        }
+
+        @Override
+        public void event(MultimapEvent<K2, V2> event) {
+            // NOTE(review): both values go through valueDecoder unguarded; confirm the decoder
+            // tolerates null in case an event carries only one of newValue/oldValue.
+            // Diamond instead of the raw type so the decoded event is type-checked.
+            listener.event(new MultimapEvent<>(event.name(),
+                    keyDecoder.apply(event.key()),
+                    valueDecoder.apply(event.newValue()),
+                    valueDecoder.apply(event.oldValue())));
+        }
+    }
 }