Implement listeners for ConsistentMultimap.

Change-Id: Ica07d444c18af8ba7a9bbb120623512def572a48
diff --git a/core/api/src/main/java/org/onosproject/store/primitives/DefaultConsistentMultimap.java b/core/api/src/main/java/org/onosproject/store/primitives/DefaultConsistentMultimap.java
index e8142a4..c7c9f5d 100644
--- a/core/api/src/main/java/org/onosproject/store/primitives/DefaultConsistentMultimap.java
+++ b/core/api/src/main/java/org/onosproject/store/primitives/DefaultConsistentMultimap.java
@@ -21,6 +21,7 @@
 import org.onosproject.store.service.AsyncConsistentMultimap;
 import org.onosproject.store.service.ConsistentMapException;
 import org.onosproject.store.service.ConsistentMultimap;
+import org.onosproject.store.service.MultimapEventListener;
 import org.onosproject.store.service.Synchronous;
 import org.onosproject.store.service.Versioned;
 
@@ -29,6 +30,7 @@
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
@@ -144,6 +146,16 @@
         //FIXME implement this when a new version of ConsistentMapBackedJavaMap is made for multimaps
     }
 
+    @Override
+    public void addListener(MultimapEventListener<K, V> listener, Executor executor) {
+        complete(asyncMultimap.addListener(listener, executor));
+    }
+
+    @Override
+    public void removeListener(MultimapEventListener<K, V> listener) {
+        complete(asyncMultimap.removeListener(listener));
+    }
+
     private <T> T complete(CompletableFuture<T> future) {
         try {
             return future.get(operationTimeoutMillis, TimeUnit.MILLISECONDS);
diff --git a/core/api/src/main/java/org/onosproject/store/service/AsyncConsistentMultimap.java b/core/api/src/main/java/org/onosproject/store/service/AsyncConsistentMultimap.java
index 36d6d25..6a44e06 100644
--- a/core/api/src/main/java/org/onosproject/store/service/AsyncConsistentMultimap.java
+++ b/core/api/src/main/java/org/onosproject/store/service/AsyncConsistentMultimap.java
@@ -17,12 +17,14 @@
 package org.onosproject.store.service;
 
 import com.google.common.collect.Multiset;
+import com.google.common.util.concurrent.MoreExecutors;
 import org.onosproject.store.primitives.DefaultConsistentMultimap;
 
 import java.util.Collection;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 
 /**
  * Interface for a distributed multimap.
@@ -222,6 +224,34 @@
     CompletableFuture<Collection<Map.Entry<K, V>>> entries();
 
     /**
+     * Registers the specified listener to be notified whenever the map is updated.
+     *
+     * @param listener listener to notify about map events
+     * @return future that will be completed when the operation finishes
+     */
+    default CompletableFuture<Void> addListener(MultimapEventListener<K, V> listener) {
+        return addListener(listener, MoreExecutors.directExecutor());
+    }
+
+    /**
+     * Registers the specified listener to be notified whenever the map is updated.
+     *
+     * @param listener listener to notify about map events
+     * @param executor executor to use for handling incoming map events
+     * @return future that will be completed when the operation finishes
+     */
+    CompletableFuture<Void> addListener(MultimapEventListener<K, V> listener, Executor executor);
+
+    /**
+     * Unregisters the specified listener such that it will no longer
+     * receive map change notifications.
+     *
+     * @param listener listener to unregister
+     * @return future that will be completed when the operation finishes
+     */
+    CompletableFuture<Void> removeListener(MultimapEventListener<K, V> listener);
+
+    /**
      * Returns a map of keys to collections of values that reflect the set of
      * key-value pairs contained in the multimap, where the key value pairs
      * would be the key paired with each of the values in the collection.
diff --git a/core/api/src/main/java/org/onosproject/store/service/ConsistentMultimap.java b/core/api/src/main/java/org/onosproject/store/service/ConsistentMultimap.java
index 8858369..bba38bb 100644
--- a/core/api/src/main/java/org/onosproject/store/service/ConsistentMultimap.java
+++ b/core/api/src/main/java/org/onosproject/store/service/ConsistentMultimap.java
@@ -17,10 +17,12 @@
 package org.onosproject.store.service;
 
 import com.google.common.collect.Multiset;
+import com.google.common.util.concurrent.MoreExecutors;
 
 import java.util.Collection;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.Executor;
 
 /**
  * This provides a synchronous version of the functionality provided by
@@ -203,4 +205,29 @@
      * empty.
      */
     Map<K, Collection<V>> asMap();
+
+    /**
+     * Registers the specified listener to be notified whenever the map is updated.
+     *
+     * @param listener listener to notify about map events
+     */
+    default void addListener(MultimapEventListener<K, V> listener) {
+        addListener(listener, MoreExecutors.directExecutor());
+    }
+
+    /**
+     * Registers the specified listener to be notified whenever the map is updated.
+     *
+     * @param listener listener to notify about map events
+     * @param executor executor to use for handling incoming map events
+     */
+    void addListener(MultimapEventListener<K, V> listener, Executor executor);
+
+    /**
+     * Unregisters the specified listener such that it will no longer
+     * receive map change notifications.
+     *
+     * @param listener listener to unregister
+     */
+    void removeListener(MultimapEventListener<K, V> listener);
 }
diff --git a/core/api/src/main/java/org/onosproject/store/service/MultimapEvent.java b/core/api/src/main/java/org/onosproject/store/service/MultimapEvent.java
new file mode 100644
index 0000000..ed56974
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/store/service/MultimapEvent.java
@@ -0,0 +1,145 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.service;
+
+import com.google.common.base.MoreObjects;
+
+import java.util.Objects;
+
+/**
+ * Representation of a ConsistentMultimap update notification.
+ *
+ * @param <K> key type
+ * @param <V> value type
+ */
+public class MultimapEvent<K, V> {
+
+    /**
+     * MultimapEvent type.
+     */
+    public enum Type {
+        /**
+         * Entry inserted into the map.
+         */
+        INSERT,
+
+        /**
+         * Entry removed from map.
+         */
+        REMOVE
+    }
+
+    private final String name;
+    private final Type type;
+    private final K key;
+    private final V newValue;
+    private final V oldValue;
+
+    /**
+     * Creates a new event object.
+     *
+     * @param name map name
+     * @param key key the event concerns
+     * @param newValue new value key is mapped to
+     * @param oldValue previous value that was mapped to the key
+     */
+    public MultimapEvent(String name, K key, V newValue, V oldValue) {
+        this.name = name;
+        this.key = key;
+        this.newValue = newValue;
+        this.oldValue = oldValue;
+        this.type = newValue != null ? Type.INSERT : Type.REMOVE;
+    }
+
+    /**
+     * Returns the map name.
+     *
+     * @return name of map
+     */
+    public String name() {
+        return name;
+    }
+
+    /**
+     * Returns the type of the event.
+     *
+     * @return the type of event
+     */
+    public Type type() {
+        return type;
+    }
+
+    /**
+     * Returns the key this event concerns.
+     *
+     * @return the key
+     */
+    public K key() {
+        return key;
+    }
+
+    /**
+     * Returns the new value in the map associated with the key.
+     * If {@link #type()} returns {@code REMOVE},
+     * this method will return {@code null}.
+     *
+     * @return the new value for key
+     */
+    public V newValue() {
+        return newValue;
+    }
+
+    /**
+     * Returns the old value that was associated with the key.
+     * If {@link #type()} returns {@code INSERT}, this method will return
+     * {@code null}.
+     *
+     * @return the old value that was mapped to the key
+     */
+    public V oldValue() {
+        return oldValue;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof MultimapEvent)) {
+            return false;
+        }
+
+        MultimapEvent<K, V> that = (MultimapEvent) o;
+        return Objects.equals(this.name, that.name) &&
+                Objects.equals(this.type, that.type) &&
+                Objects.equals(this.key, that.key) &&
+                Objects.equals(this.newValue, that.newValue) &&
+                Objects.equals(this.oldValue, that.oldValue);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(name, type, key, newValue, oldValue);
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(getClass())
+                .add("name", name)
+                .add("type", type)
+                .add("key", key)
+                .add("newValue", newValue)
+                .add("oldValue", oldValue)
+                .toString();
+    }
+}
diff --git a/core/api/src/main/java/org/onosproject/store/service/MultimapEventListener.java b/core/api/src/main/java/org/onosproject/store/service/MultimapEventListener.java
new file mode 100644
index 0000000..2445af9
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/store/service/MultimapEventListener.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.service;
+
+/**
+ * Listener to be notified about updates to a ConsistentMultimap.
+ */
+public interface MultimapEventListener<K, V> {
+
+    /**
+     * Reacts to the specified event.
+     *
+     * @param event the event
+     */
+    void event(MultimapEvent<K, V> event);
+}
diff --git a/core/api/src/test/java/org/onosproject/store/service/TestConsistentMultimap.java b/core/api/src/test/java/org/onosproject/store/service/TestConsistentMultimap.java
index 6bf6602..2e46d92 100644
--- a/core/api/src/test/java/org/onosproject/store/service/TestConsistentMultimap.java
+++ b/core/api/src/test/java/org/onosproject/store/service/TestConsistentMultimap.java
@@ -21,6 +21,7 @@
 import java.util.Collection;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
@@ -134,6 +135,14 @@
     }
 
     @Override
+    public void addListener(MultimapEventListener<K, V> listener, Executor executor) {
+    }
+
+    @Override
+    public void removeListener(MultimapEventListener<K, V> listener) {
+    }
+
+    @Override
     public String name() {
         return this.name;
     }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java
index d676ab1..2c4ee73 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java
@@ -51,6 +51,7 @@
 import org.onosproject.store.service.DocumentTreeEvent;
 import org.onosproject.store.service.MapEvent;
 import org.onosproject.store.service.TransactionLog;
+import org.onosproject.store.service.MultimapEvent;
 import org.onosproject.store.service.Task;
 import org.onosproject.store.service.Versioned;
 import org.onosproject.store.service.WorkQueueStats;
@@ -100,6 +101,8 @@
         serializer.register(TransactionLog.class, factory);
         serializer.register(Versioned.class, factory);
         serializer.register(MapEvent.class, factory);
+        serializer.register(MultimapEvent.class, factory);
+        serializer.register(MultimapEvent.Type.class, factory);
         serializer.register(Task.class, factory);
         serializer.register(WorkQueueStats.class, factory);
         serializer.register(DocumentPath.class, factory);
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMultimap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMultimap.java
index 5bd37c5..3db4dce 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMultimap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMultimap.java
@@ -16,14 +16,17 @@
 
 package org.onosproject.store.primitives.impl;
 
+import com.google.common.collect.Multiset;
+import org.onosproject.store.service.AsyncConsistentMultimap;
+import org.onosproject.store.service.MultimapEventListener;
+import org.onosproject.store.service.Versioned;
+
 import java.util.Collection;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 
-import com.google.common.collect.Multiset;
-import org.onosproject.store.service.AsyncConsistentMultimap;
-import org.onosproject.store.service.Versioned;
+import java.util.concurrent.Executor;
 
 /**
  * {@code AsyncConsistentMultimap} that merely delegates control to
@@ -133,6 +136,16 @@
     }
 
     @Override
+    public CompletableFuture<Void> addListener(MultimapEventListener<K, V> listener, Executor executor) {
+        return delegateMap.addListener(listener, executor);
+    }
+
+    @Override
+    public CompletableFuture<Void> removeListener(MultimapEventListener<K, V> listener) {
+        return delegateMap.removeListener(listener);
+    }
+
+    @Override
     public CompletableFuture<Map<K, Collection<V>>> asMap() {
         return delegateMap.asMap();
     }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncConsistentMultimap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncConsistentMultimap.java
index 6ec0a69..2045586 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncConsistentMultimap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncConsistentMultimap.java
@@ -15,16 +15,17 @@
  */
 package org.onosproject.store.primitives.impl;
 
+import com.google.common.collect.Multiset;
+import org.onosproject.store.service.AsyncConsistentMultimap;
+import org.onosproject.store.service.MultimapEventListener;
+import org.onosproject.store.service.Versioned;
+
 import java.util.Collection;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 
-import com.google.common.collect.Multiset;
-import org.onosproject.store.service.AsyncConsistentMultimap;
-import org.onosproject.store.service.Versioned;
-
 /**
  * {@link org.onosproject.store.service.AsyncConsistentMultimap} that executes asynchronous callbacks on a provided
  * {@link Executor}.
@@ -125,6 +126,16 @@
     }
 
     @Override
+    public CompletableFuture<Void> addListener(MultimapEventListener<K, V> listener, Executor executor) {
+        return asyncFuture(delegateMap.addListener(listener, executor));
+    }
+
+    @Override
+    public CompletableFuture<Void> removeListener(MultimapEventListener<K, V> listener) {
+        return asyncFuture(delegateMap.removeListener(listener));
+    }
+
+    @Override
     public CompletableFuture<Map<K, Collection<V>>> asMap() {
         return asyncFuture(delegateMap.asMap());
     }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMap.java
index c2f167a..52cf210 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMap.java
@@ -16,6 +16,17 @@
 
 package org.onosproject.store.primitives.impl;
 
+import com.google.common.collect.Maps;
+import org.onlab.util.Tools;
+import org.onosproject.store.primitives.MapUpdate;
+import org.onosproject.store.primitives.TransactionId;
+import org.onosproject.store.service.AsyncConsistentMap;
+import org.onosproject.store.service.MapEvent;
+import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.TransactionLog;
+import org.onosproject.store.service.Version;
+import org.onosproject.store.service.Versioned;
+
 import java.util.Collection;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -28,18 +39,6 @@
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
-import org.onlab.util.Tools;
-import org.onosproject.store.primitives.MapUpdate;
-import org.onosproject.store.primitives.TransactionId;
-import org.onosproject.store.service.AsyncConsistentMap;
-import org.onosproject.store.service.MapEvent;
-import org.onosproject.store.service.MapEventListener;
-import org.onosproject.store.service.TransactionLog;
-import org.onosproject.store.service.Version;
-import org.onosproject.store.service.Versioned;
-
-import com.google.common.collect.Maps;
-
 /**
  * An {@code AsyncConsistentMap} that maps its operations to operations on a
  * differently typed {@code AsyncConsistentMap} by transcoding operation inputs and outputs.
@@ -259,11 +258,13 @@
 
     @Override
     public CompletableFuture<Void> removeListener(MapEventListener<K1, V1> listener) {
-        InternalBackingMapEventListener backingMapListener = listeners.remove(listener);
-        if (backingMapListener != null) {
-            return backingMap.removeListener(backingMapListener);
-        } else {
-            return CompletableFuture.completedFuture(null);
+        synchronized (listeners) {
+            InternalBackingMapEventListener backingMapListener = listeners.remove(listener);
+            if (backingMapListener != null) {
+                return backingMap.removeListener(backingMapListener);
+            } else {
+                return CompletableFuture.completedFuture(null);
+            }
         }
     }
 
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMultimap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMultimap.java
index 666ac1f..9d699d9 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMultimap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMultimap.java
@@ -21,6 +21,8 @@
 import com.google.common.collect.Multiset;
 import org.onlab.util.Tools;
 import org.onosproject.store.service.AsyncConsistentMultimap;
+import org.onosproject.store.service.MultimapEvent;
+import org.onosproject.store.service.MultimapEventListener;
 import org.onosproject.store.service.Versioned;
 
 import java.util.Collection;
@@ -28,6 +30,7 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 import java.util.function.BiConsumer;
 import java.util.function.BinaryOperator;
 import java.util.function.Consumer;
@@ -60,6 +63,8 @@
             Versioned<Collection<? extends V1>>> versionedValueCollectionDecode;
     private final Function<Collection<? extends V1>, Collection<V2>>
             valueCollectionEncode;
+    private final Map<MultimapEventListener<K1, V1>, InternalBackingMultimapEventListener> listeners =
+            Maps.newIdentityHashMap();
 
      public TranscodingAsyncConsistentMultimap(
             AsyncConsistentMultimap<K2, V2> backingMap,
@@ -244,6 +249,27 @@
     }
 
     @Override
+    public CompletableFuture<Void> addListener(MultimapEventListener<K1, V1> listener, Executor executor) {
+        synchronized (listeners) {
+            InternalBackingMultimapEventListener backingMapListener =
+                    listeners.computeIfAbsent(listener, k -> new InternalBackingMultimapEventListener(listener));
+            return backingMap.addListener(backingMapListener, executor);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> removeListener(MultimapEventListener<K1, V1> listener) {
+        synchronized (listeners) {
+            InternalBackingMultimapEventListener backingMapListener = listeners.remove(listener);
+            if (backingMapListener != null) {
+                return backingMap.removeListener(backingMapListener);
+            } else {
+                return CompletableFuture.completedFuture(null);
+            }
+        }
+    }
+
+    @Override
     public void addStatusChangeListener(Consumer<Status> listener) {
         backingMap.addStatusChangeListener(listener);
     }
@@ -290,4 +316,21 @@
             return EnumSet.of(Characteristics.UNORDERED);
         }
     }
+
+    private class InternalBackingMultimapEventListener implements MultimapEventListener<K2, V2> {
+
+        private final MultimapEventListener<K1, V1> listener;
+
+        InternalBackingMultimapEventListener(MultimapEventListener<K1, V1> listener) {
+            this.listener = listener;
+        }
+
+        @Override
+        public void event(MultimapEvent<K2, V2> event) {
+            listener.event(new MultimapEvent(event.name(),
+                    keyDecoder.apply(event.key()),
+                    valueDecoder.apply(event.newValue()),
+                    valueDecoder.apply(event.oldValue())));
+        }
+    }
 }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMultimapCommands.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMultimapCommands.java
index d765466..b8e5d4c 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMultimapCommands.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMultimapCommands.java
@@ -563,6 +563,44 @@
     }
 
     /**
+     * Change listen.
+     */
+    @SuppressWarnings("serial")
+    public static class Listen implements Command<Void>, CatalystSerializable {
+        @Override
+        public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
+        }
+
+        @Override
+        public void readObject(BufferInput<?> buffer, Serializer serializer) {
+        }
+
+        @Override
+        public CompactionMode compaction() {
+            return CompactionMode.QUORUM;
+        }
+    }
+
+    /**
+     * Change unlisten.
+     */
+    @SuppressWarnings("serial")
+    public static class Unlisten implements Command<Void>, CatalystSerializable {
+        @Override
+        public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
+        }
+
+        @Override
+        public void readObject(BufferInput<?> buffer, Serializer serializer) {
+        }
+
+        @Override
+        public CompactionMode compaction() {
+            return CompactionMode.TOMBSTONE;
+        }
+    }
+
+    /**
      * Multimap command type resolver.
      */
     @SuppressWarnings("serial")
@@ -584,6 +622,8 @@
             registry.register(Put.class, -1012);
             registry.register(RemoveAll.class, -1013);
             registry.register(MultiRemove.class, -1014);
+            registry.register(Listen.class, -1015);
+            registry.register(Unlisten.class, -1016);
         }
     }
 }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimap.java
index e446a67..22a0345 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimap.java
@@ -22,14 +22,19 @@
 import io.atomix.resource.AbstractResource;
 import io.atomix.resource.ResourceTypeInfo;
 import org.onosproject.store.service.AsyncConsistentMultimap;
+import org.onosproject.store.service.MultimapEvent;
+import org.onosproject.store.service.MultimapEventListener;
 import org.onosproject.store.service.Versioned;
 
 import java.util.Collection;
 import java.util.ConcurrentModificationException;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
 
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.Clear;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.ContainsEntry;
@@ -40,13 +45,16 @@
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.IsEmpty;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.KeySet;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.Keys;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.Listen;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.MultiRemove;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.Put;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.RemoveAll;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.Replace;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.Size;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.Unlisten;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.Values;
 
+
 /**
  * Set based implementation of the {@link AsyncConsistentMultimap}.
  * <p>
@@ -57,6 +65,10 @@
         extends AbstractResource<AtomixConsistentSetMultimap>
         implements AsyncConsistentMultimap<String, byte[]> {
 
+    private final Map<MultimapEventListener<String, byte[]>, Executor> mapEventListeners = new ConcurrentHashMap<>();
+
+    public static final String CHANGE_SUBJECT = "multimapChangeEvents";
+
     public AtomixConsistentSetMultimap(CopycatClient client,
                                        Properties properties) {
         super(client, properties);
@@ -64,8 +76,20 @@
 
     @Override
     public CompletableFuture<AtomixConsistentSetMultimap> open() {
-        return super.open();
-        //TODO
+        return super.open().thenApply(result -> {
+            client.onStateChange(state -> {
+                if (state == CopycatClient.State.CONNECTED && isListening()) {
+                    client.submit(new Listen());
+                }
+            });
+            client.onEvent(CHANGE_SUBJECT, this::handleEvent);
+            return result;
+        });
+    }
+
+    private void handleEvent(List<MultimapEvent<String, byte[]>> events) {
+        events.forEach(event ->
+                mapEventListeners.forEach((listener, executor) -> executor.execute(() -> listener.event(event))));
     }
 
     @Override
@@ -158,6 +182,24 @@
     }
 
     @Override
+    public CompletableFuture<Void> addListener(MultimapEventListener<String, byte[]> listener, Executor executor) {
+        if (mapEventListeners.isEmpty()) {
+            return client.submit(new Listen()).thenRun(() -> mapEventListeners.put(listener, executor));
+        } else {
+            mapEventListeners.put(listener, executor);
+            return CompletableFuture.completedFuture(null);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> removeListener(MultimapEventListener<String, byte[]> listener) {
+        if (mapEventListeners.remove(listener) != null && mapEventListeners.isEmpty()) {
+            return client.submit(new Unlisten()).thenApply(v -> null);
+        }
+        return CompletableFuture.completedFuture(null);
+    }
+
+    @Override
     public CompletableFuture<Map<String, Collection<byte[]>>> asMap() {
         throw new UnsupportedOperationException("Expensive operation.");
     }
@@ -178,4 +220,8 @@
                                                       "in progress");
         }
     }
+
+    private boolean isListening() {
+        return !mapEventListeners.isEmpty();
+    }
 }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapState.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapState.java
index 5110187..47972d1 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapState.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapState.java
@@ -27,12 +27,14 @@
 import io.atomix.copycat.server.Commit;
 import io.atomix.copycat.server.Snapshottable;
 import io.atomix.copycat.server.StateMachineExecutor;
+import io.atomix.copycat.server.session.ServerSession;
 import io.atomix.copycat.server.session.SessionListener;
 import io.atomix.copycat.server.storage.snapshot.SnapshotReader;
 import io.atomix.copycat.server.storage.snapshot.SnapshotWriter;
 import io.atomix.resource.ResourceStateMachine;
 import org.onlab.util.CountDownCompleter;
 import org.onlab.util.Match;
+import org.onosproject.store.service.MultimapEvent;
 import org.onosproject.store.service.Versioned;
 import org.slf4j.Logger;
 
@@ -40,6 +42,8 @@
 import java.util.Collection;
 import java.util.Comparator;
 import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
@@ -61,12 +65,14 @@
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.IsEmpty;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.KeySet;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.Keys;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.Listen;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.MultiRemove;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.MultimapCommand;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.Put;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.RemoveAll;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.Replace;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.Size;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.Unlisten;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.Values;
 import static org.slf4j.LoggerFactory.getLogger;
 
@@ -78,7 +84,7 @@
 
     private final Logger log = getLogger(getClass());
     private final AtomicLong globalVersion = new AtomicLong(1);
-    //TODO Add listener map here
+    private final Map<Long, Commit<? extends Listen>> listeners = new HashMap<>();
     private final Map<String, MapEntryValue> backingMap = Maps.newHashMap();
 
     public AtomixConsistentSetMultimapState(Properties properties) {
@@ -110,6 +116,8 @@
         executor.register(MultiRemove.class, this::multiRemove);
         executor.register(Put.class, this::put);
         executor.register(Replace.class, this::replace);
+        executor.register(Listen.class, this::listen);
+        executor.register(Unlisten.class, this::unlisten);
     }
 
     /**
@@ -313,12 +321,20 @@
      */
     protected Versioned<Collection<? extends byte[]>> removeAll(
             Commit<? extends RemoveAll> commit) {
-        if (!backingMap.containsKey(commit.operation().key())) {
+        String key = commit.operation().key();
+
+        if (!backingMap.containsKey(key)) {
             commit.close();
             return new Versioned<>(Sets.newHashSet(), -1);
-        } else {
-            return backingMap.get(commit.operation().key()).addCommit(commit);
         }
+
+        Versioned<Collection<? extends byte[]>> removedValues =
+                backingMap.get(key).addCommit(commit);
+        publish(removedValues.value().stream()
+                .map(value -> new MultimapEvent<String, byte[]>(
+                        "", key, null, value))
+                .collect(Collectors.toList()));
+        return removedValues;
     }
 
     /**
@@ -328,14 +344,26 @@
      * @return true if any change results, else false
      */
     protected boolean multiRemove(Commit<? extends MultiRemove> commit) {
-        if (!backingMap.containsKey(commit.operation().key())) {
+        String key = commit.operation().key();
+
+        if (!backingMap.containsKey(key)) {
             commit.close();
             return false;
-        } else {
-            return (backingMap
-                    .get(commit.operation().key())
-                    .addCommit(commit)) != null;
         }
+
+        Versioned<Collection<? extends byte[]>> removedValues = backingMap
+                .get(key)
+                .addCommit(commit);
+
+        if (removedValues != null) {
+            publish(removedValues.value().stream()
+                    .map(value -> new MultimapEvent<String, byte[]>(
+                            "", key, null, value))
+                    .collect(Collectors.toList()));
+            return true;
+        }
+
+        return false;
     }
 
     /**
@@ -345,16 +373,27 @@
      * @return true if this commit results in a change, else false
      */
     protected boolean put(Commit<? extends Put> commit) {
+        String key = commit.operation().key();
         if (commit.operation().values().isEmpty()) {
             return false;
         }
-        if (!backingMap.containsKey(commit.operation().key())) {
-            backingMap.put(commit.operation().key(),
-                           new NonTransactionalCommit(1));
+        if (!backingMap.containsKey(key)) {
+            backingMap.put(key, new NonTransactionalCommit(1));
         }
-        return backingMap
-                .get(commit.operation().key())
-                .addCommit(commit) != null;
+
+        Versioned<Collection<? extends byte[]>> addedValues = backingMap
+                .get(key)
+                .addCommit(commit);
+
+        if (addedValues != null) {
+            publish(addedValues.value().stream()
+                    .map(value -> new MultimapEvent<String, byte[]>(
+                            "", key, value, null))
+                    .collect(Collectors.toList()));
+            return true;
+        }
+
+        return false;
     }
 
     protected Versioned<Collection<? extends byte[]>> replace(
@@ -366,6 +405,56 @@
         return backingMap.get(commit.operation().key()).addCommit(commit);
     }
 
+    /**
+     * Handles a listen commit.
+     *
+     * @param commit listen commit
+     */
+    protected void listen(Commit<? extends Listen> commit) {
+        Long sessionId = commit.session().id();
+        if (listeners.putIfAbsent(sessionId, commit) != null) {
+            commit.close();
+            return;
+        }
+        commit.session()
+                .onStateChange(
+                        state -> {
+                            if (state == ServerSession.State.CLOSED
+                                    || state == ServerSession.State.EXPIRED) {
+                                Commit<? extends Listen> listener = listeners.remove(sessionId);
+                                if (listener != null) {
+                                    listener.close();
+                                }
+                            }
+                        });
+    }
+
+    /**
+     * Handles an unlisten commit.
+     *
+     * @param commit unlisten commit
+     */
+    protected void unlisten(Commit<? extends Unlisten> commit) {
+        try {
+            Commit<? extends Listen> listener = listeners.remove(commit.session().id());
+            if (listener != null) {
+                listener.close();
+            }
+        } finally {
+            commit.close();
+        }
+    }
+
+    /**
+     * Publishes events to listeners.
+     *
+     * @param events list of map event to publish
+     */
+    private void publish(List<MultimapEvent<String, byte[]>> events) {
+        listeners.values().forEach(commit ->
+                commit.session().publish(AtomixConsistentSetMultimap.CHANGE_SUBJECT, events));
+    }
+
     private interface MapEntryValue {
 
         /**
diff --git a/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java b/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java
index 7d1a9d1..2cfd0c4 100644
--- a/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java
+++ b/core/store/serializers/src/main/java/org/onosproject/store/serializers/KryoNamespaces.java
@@ -20,7 +20,6 @@
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Maps;
-
 import org.onlab.packet.ChassisId;
 import org.onlab.packet.EthType;
 import org.onlab.packet.Ip4Address;
@@ -214,6 +213,7 @@
 import org.onosproject.store.primitives.TransactionId;
 import org.onosproject.store.service.MapEvent;
 import org.onosproject.store.service.TransactionLog;
+import org.onosproject.store.service.MultimapEvent;
 import org.onosproject.store.service.SetEvent;
 import org.onosproject.store.service.Task;
 import org.onosproject.store.service.Versioned;
@@ -528,6 +528,8 @@
             .register(Versioned.class)
             .register(MapEvent.class)
             .register(MapEvent.Type.class)
+            .register(MultimapEvent.class)
+            .register(MultimapEvent.Type.class)
             .register(SetEvent.class)
             .register(SetEvent.Type.class)
             .register(GroupId.class)