Implement listeners for ConsistentMultimap.

Change-Id: Ica07d444c18af8ba7a9bbb120623512def572a48
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java
index d676ab1..2c4ee73 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java
@@ -51,6 +51,7 @@
 import org.onosproject.store.service.DocumentTreeEvent;
 import org.onosproject.store.service.MapEvent;
 import org.onosproject.store.service.TransactionLog;
+import org.onosproject.store.service.MultimapEvent;
 import org.onosproject.store.service.Task;
 import org.onosproject.store.service.Versioned;
 import org.onosproject.store.service.WorkQueueStats;
@@ -100,6 +101,8 @@
         serializer.register(TransactionLog.class, factory);
         serializer.register(Versioned.class, factory);
         serializer.register(MapEvent.class, factory);
+        serializer.register(MultimapEvent.class, factory);
+        serializer.register(MultimapEvent.Type.class, factory);
         serializer.register(Task.class, factory);
         serializer.register(WorkQueueStats.class, factory);
         serializer.register(DocumentPath.class, factory);
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMultimap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMultimap.java
index 5bd37c5..3db4dce 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMultimap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMultimap.java
@@ -16,14 +16,17 @@
 
 package org.onosproject.store.primitives.impl;
 
+import com.google.common.collect.Multiset;
+import org.onosproject.store.service.AsyncConsistentMultimap;
+import org.onosproject.store.service.MultimapEventListener;
+import org.onosproject.store.service.Versioned;
+
 import java.util.Collection;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 
-import com.google.common.collect.Multiset;
-import org.onosproject.store.service.AsyncConsistentMultimap;
-import org.onosproject.store.service.Versioned;
+import java.util.concurrent.Executor;
 
 /**
  * {@code AsyncConsistentMultimap} that merely delegates control to
@@ -133,6 +136,16 @@
     }
 
     @Override
+    public CompletableFuture<Void> addListener(MultimapEventListener<K, V> listener, Executor executor) {
+        return delegateMap.addListener(listener, executor);
+    }
+
+    @Override
+    public CompletableFuture<Void> removeListener(MultimapEventListener<K, V> listener) {
+        return delegateMap.removeListener(listener);
+    }
+
+    @Override
     public CompletableFuture<Map<K, Collection<V>>> asMap() {
         return delegateMap.asMap();
     }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncConsistentMultimap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncConsistentMultimap.java
index 6ec0a69..2045586 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncConsistentMultimap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ExecutingAsyncConsistentMultimap.java
@@ -15,16 +15,17 @@
  */
 package org.onosproject.store.primitives.impl;
 
+import com.google.common.collect.Multiset;
+import org.onosproject.store.service.AsyncConsistentMultimap;
+import org.onosproject.store.service.MultimapEventListener;
+import org.onosproject.store.service.Versioned;
+
 import java.util.Collection;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 
-import com.google.common.collect.Multiset;
-import org.onosproject.store.service.AsyncConsistentMultimap;
-import org.onosproject.store.service.Versioned;
-
 /**
  * {@link org.onosproject.store.service.AsyncConsistentMultimap} that executes asynchronous callbacks on a provided
  * {@link Executor}.
@@ -125,6 +126,16 @@
     }
 
     @Override
+    public CompletableFuture<Void> addListener(MultimapEventListener<K, V> listener, Executor executor) {
+        return asyncFuture(delegateMap.addListener(listener, executor));
+    }
+
+    @Override
+    public CompletableFuture<Void> removeListener(MultimapEventListener<K, V> listener) {
+        return asyncFuture(delegateMap.removeListener(listener));
+    }
+
+    @Override
     public CompletableFuture<Map<K, Collection<V>>> asMap() {
         return asyncFuture(delegateMap.asMap());
     }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMap.java
index c2f167a..52cf210 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMap.java
@@ -16,6 +16,17 @@
 
 package org.onosproject.store.primitives.impl;
 
+import com.google.common.collect.Maps;
+import org.onlab.util.Tools;
+import org.onosproject.store.primitives.MapUpdate;
+import org.onosproject.store.primitives.TransactionId;
+import org.onosproject.store.service.AsyncConsistentMap;
+import org.onosproject.store.service.MapEvent;
+import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.TransactionLog;
+import org.onosproject.store.service.Version;
+import org.onosproject.store.service.Versioned;
+
 import java.util.Collection;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -28,18 +39,6 @@
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
-import org.onlab.util.Tools;
-import org.onosproject.store.primitives.MapUpdate;
-import org.onosproject.store.primitives.TransactionId;
-import org.onosproject.store.service.AsyncConsistentMap;
-import org.onosproject.store.service.MapEvent;
-import org.onosproject.store.service.MapEventListener;
-import org.onosproject.store.service.TransactionLog;
-import org.onosproject.store.service.Version;
-import org.onosproject.store.service.Versioned;
-
-import com.google.common.collect.Maps;
-
 /**
  * An {@code AsyncConsistentMap} that maps its operations to operations on a
  * differently typed {@code AsyncConsistentMap} by transcoding operation inputs and outputs.
@@ -259,11 +258,13 @@
 
     @Override
     public CompletableFuture<Void> removeListener(MapEventListener<K1, V1> listener) {
-        InternalBackingMapEventListener backingMapListener = listeners.remove(listener);
-        if (backingMapListener != null) {
-            return backingMap.removeListener(backingMapListener);
-        } else {
-            return CompletableFuture.completedFuture(null);
+        synchronized (listeners) {
+            InternalBackingMapEventListener backingMapListener = listeners.remove(listener);
+            if (backingMapListener != null) {
+                return backingMap.removeListener(backingMapListener);
+            } else {
+                return CompletableFuture.completedFuture(null);
+            }
         }
     }
 
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMultimap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMultimap.java
index 666ac1f..9d699d9 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMultimap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMultimap.java
@@ -21,6 +21,8 @@
 import com.google.common.collect.Multiset;
 import org.onlab.util.Tools;
 import org.onosproject.store.service.AsyncConsistentMultimap;
+import org.onosproject.store.service.MultimapEvent;
+import org.onosproject.store.service.MultimapEventListener;
 import org.onosproject.store.service.Versioned;
 
 import java.util.Collection;
@@ -28,6 +30,7 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 import java.util.function.BiConsumer;
 import java.util.function.BinaryOperator;
 import java.util.function.Consumer;
@@ -60,6 +63,8 @@
             Versioned<Collection<? extends V1>>> versionedValueCollectionDecode;
     private final Function<Collection<? extends V1>, Collection<V2>>
             valueCollectionEncode;
+    private final Map<MultimapEventListener<K1, V1>, InternalBackingMultimapEventListener> listeners =
+            Maps.newIdentityHashMap();
 
      public TranscodingAsyncConsistentMultimap(
             AsyncConsistentMultimap<K2, V2> backingMap,
@@ -244,6 +249,27 @@
     }
 
     @Override
+    public CompletableFuture<Void> addListener(MultimapEventListener<K1, V1> listener, Executor executor) {
+        synchronized (listeners) {
+            InternalBackingMultimapEventListener backingMapListener =
+                    listeners.computeIfAbsent(listener, k -> new InternalBackingMultimapEventListener(listener));
+            return backingMap.addListener(backingMapListener, executor);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> removeListener(MultimapEventListener<K1, V1> listener) {
+        synchronized (listeners) {
+            InternalBackingMultimapEventListener backingMapListener = listeners.remove(listener);
+            if (backingMapListener != null) {
+                return backingMap.removeListener(backingMapListener);
+            } else {
+                return CompletableFuture.completedFuture(null);
+            }
+        }
+    }
+
+    @Override
     public void addStatusChangeListener(Consumer<Status> listener) {
         backingMap.addStatusChangeListener(listener);
     }
@@ -290,4 +316,33 @@
             return EnumSet.of(Characteristics.UNORDERED);
         }
     }
+
+    /**
+     * Listener registered with the backing multimap that decodes each
+     * {@code MultimapEvent<K2, V2>} into a {@code MultimapEvent<K1, V1>}
+     * and forwards it to the wrapped listener.
+     */
+    private class InternalBackingMultimapEventListener implements MultimapEventListener<K2, V2> {
+
+        private final MultimapEventListener<K1, V1> listener;
+
+        InternalBackingMultimapEventListener(MultimapEventListener<K1, V1> listener) {
+            this.listener = listener;
+        }
+
+        @Override
+        public void event(MultimapEvent<K2, V2> event) {
+            // Either value of an event may be null (e.g. no old value for an
+            // addition), so only non-null values are handed to the decoder.
+            listener.event(new MultimapEvent<>(event.name(),
+                    keyDecoder.apply(event.key()),
+                    decodeValue(event.newValue()),
+                    decodeValue(event.oldValue())));
+        }
+
+        /** Decodes a backing value, passing {@code null} through untouched. */
+        private V1 decodeValue(V2 value) {
+            return value == null ? null : valueDecoder.apply(value);
+        }
+    }
 }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMultimapCommands.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMultimapCommands.java
index d765466..b8e5d4c 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMultimapCommands.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMultimapCommands.java
@@ -563,6 +563,44 @@
     }
 
     /**
+     * Command that registers the submitting client session for multimap change events.
+     */
+    @SuppressWarnings("serial")
+    public static class Listen implements Command<Void>, CatalystSerializable {
+        @Override
+        public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
+        }
+
+        @Override
+        public void readObject(BufferInput<?> buffer, Serializer serializer) {
+        }
+
+        @Override
+        public CompactionMode compaction() {
+            return CompactionMode.QUORUM;
+        }
+    }
+
+    /**
+     * Command that unregisters the submitting client session from multimap change events.
+     */
+    @SuppressWarnings("serial")
+    public static class Unlisten implements Command<Void>, CatalystSerializable {
+        @Override
+        public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
+        }
+
+        @Override
+        public void readObject(BufferInput<?> buffer, Serializer serializer) {
+        }
+
+        @Override
+        public CompactionMode compaction() {
+            return CompactionMode.TOMBSTONE;
+        }
+    }
+
+    /**
      * Multimap command type resolver.
      */
     @SuppressWarnings("serial")
@@ -584,6 +622,8 @@
             registry.register(Put.class, -1012);
             registry.register(RemoveAll.class, -1013);
             registry.register(MultiRemove.class, -1014);
+            registry.register(Listen.class, -1015);
+            registry.register(Unlisten.class, -1016);
         }
     }
 }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimap.java
index e446a67..22a0345 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimap.java
@@ -22,14 +22,19 @@
 import io.atomix.resource.AbstractResource;
 import io.atomix.resource.ResourceTypeInfo;
 import org.onosproject.store.service.AsyncConsistentMultimap;
+import org.onosproject.store.service.MultimapEvent;
+import org.onosproject.store.service.MultimapEventListener;
 import org.onosproject.store.service.Versioned;
 
 import java.util.Collection;
 import java.util.ConcurrentModificationException;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
 
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.Clear;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.ContainsEntry;
@@ -40,13 +45,16 @@
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.IsEmpty;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.KeySet;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.Keys;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.Listen;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.MultiRemove;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.Put;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.RemoveAll;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.Replace;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.Size;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.Unlisten;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.Values;
 
+
 /**
  * Set based implementation of the {@link AsyncConsistentMultimap}.
  * <p>
@@ -57,6 +65,10 @@
         extends AbstractResource<AtomixConsistentSetMultimap>
         implements AsyncConsistentMultimap<String, byte[]> {
 
+    private final Map<MultimapEventListener<String, byte[]>, Executor> mapEventListeners = new ConcurrentHashMap<>();
+
+    public static final String CHANGE_SUBJECT = "multimapChangeEvents";
+
     public AtomixConsistentSetMultimap(CopycatClient client,
                                        Properties properties) {
         super(client, properties);
@@ -64,8 +76,20 @@
 
     @Override
     public CompletableFuture<AtomixConsistentSetMultimap> open() {
-        return super.open();
-        //TODO
+        return super.open().thenApply(result -> {
+            client.onStateChange(state -> {
+                if (state == CopycatClient.State.CONNECTED && isListening()) {
+                    client.submit(new Listen());
+                }
+            });
+            client.onEvent(CHANGE_SUBJECT, this::handleEvent);
+            return result;
+        });
+    }
+
+    private void handleEvent(List<MultimapEvent<String, byte[]>> events) {
+        events.forEach(event ->
+                mapEventListeners.forEach((listener, executor) -> executor.execute(() -> listener.event(event))));
     }
 
     @Override
@@ -158,6 +182,24 @@
     }
 
     @Override
+    public CompletableFuture<Void> addListener(MultimapEventListener<String, byte[]> listener, Executor executor) {
+        if (mapEventListeners.isEmpty()) {
+            return client.submit(new Listen()).thenRun(() -> mapEventListeners.put(listener, executor));
+        } else {
+            mapEventListeners.put(listener, executor);
+            return CompletableFuture.completedFuture(null);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> removeListener(MultimapEventListener<String, byte[]> listener) {
+        if (mapEventListeners.remove(listener) != null && mapEventListeners.isEmpty()) {
+            return client.submit(new Unlisten()).thenApply(v -> null);
+        }
+        return CompletableFuture.completedFuture(null);
+    }
+
+    @Override
     public CompletableFuture<Map<String, Collection<byte[]>>> asMap() {
         throw new UnsupportedOperationException("Expensive operation.");
     }
@@ -178,4 +220,8 @@
                                                       "in progress");
         }
     }
+
+    private boolean isListening() {
+        return !mapEventListeners.isEmpty();
+    }
 }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapState.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapState.java
index 5110187..47972d1 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapState.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapState.java
@@ -27,12 +27,14 @@
 import io.atomix.copycat.server.Commit;
 import io.atomix.copycat.server.Snapshottable;
 import io.atomix.copycat.server.StateMachineExecutor;
+import io.atomix.copycat.server.session.ServerSession;
 import io.atomix.copycat.server.session.SessionListener;
 import io.atomix.copycat.server.storage.snapshot.SnapshotReader;
 import io.atomix.copycat.server.storage.snapshot.SnapshotWriter;
 import io.atomix.resource.ResourceStateMachine;
 import org.onlab.util.CountDownCompleter;
 import org.onlab.util.Match;
+import org.onosproject.store.service.MultimapEvent;
 import org.onosproject.store.service.Versioned;
 import org.slf4j.Logger;
 
@@ -40,6 +42,8 @@
 import java.util.Collection;
 import java.util.Comparator;
 import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
@@ -61,12 +65,14 @@
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.IsEmpty;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.KeySet;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.Keys;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.Listen;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.MultiRemove;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.MultimapCommand;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.Put;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.RemoveAll;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.Replace;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.Size;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.Unlisten;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.Values;
 import static org.slf4j.LoggerFactory.getLogger;
 
@@ -78,7 +84,7 @@
 
     private final Logger log = getLogger(getClass());
     private final AtomicLong globalVersion = new AtomicLong(1);
-    //TODO Add listener map here
+    private final Map<Long, Commit<? extends Listen>> listeners = new HashMap<>();
     private final Map<String, MapEntryValue> backingMap = Maps.newHashMap();
 
     public AtomixConsistentSetMultimapState(Properties properties) {
@@ -110,6 +116,8 @@
         executor.register(MultiRemove.class, this::multiRemove);
         executor.register(Put.class, this::put);
         executor.register(Replace.class, this::replace);
+        executor.register(Listen.class, this::listen);
+        executor.register(Unlisten.class, this::unlisten);
     }
 
     /**
@@ -313,12 +321,20 @@
      */
     protected Versioned<Collection<? extends byte[]>> removeAll(
             Commit<? extends RemoveAll> commit) {
-        if (!backingMap.containsKey(commit.operation().key())) {
+        String key = commit.operation().key();
+
+        if (!backingMap.containsKey(key)) {
             commit.close();
             return new Versioned<>(Sets.newHashSet(), -1);
-        } else {
-            return backingMap.get(commit.operation().key()).addCommit(commit);
         }
+
+        Versioned<Collection<? extends byte[]>> removedValues =
+                backingMap.get(key).addCommit(commit);
+        publish(removedValues.value().stream()
+                .map(value -> new MultimapEvent<String, byte[]>(
+                        "", key, null, value))
+                .collect(Collectors.toList()));
+        return removedValues;
     }
 
     /**
@@ -328,14 +344,26 @@
      * @return true if any change results, else false
      */
     protected boolean multiRemove(Commit<? extends MultiRemove> commit) {
-        if (!backingMap.containsKey(commit.operation().key())) {
+        String key = commit.operation().key();
+
+        if (!backingMap.containsKey(key)) {
             commit.close();
             return false;
-        } else {
-            return (backingMap
-                    .get(commit.operation().key())
-                    .addCommit(commit)) != null;
         }
+
+        Versioned<Collection<? extends byte[]>> removedValues = backingMap
+                .get(key)
+                .addCommit(commit);
+
+        if (removedValues != null) {
+            publish(removedValues.value().stream()
+                    .map(value -> new MultimapEvent<String, byte[]>(
+                            "", key, null, value))
+                    .collect(Collectors.toList()));
+            return true;
+        }
+
+        return false;
     }
 
     /**
@@ -345,16 +373,27 @@
      * @return true if this commit results in a change, else false
      */
     protected boolean put(Commit<? extends Put> commit) {
+        String key = commit.operation().key();
         if (commit.operation().values().isEmpty()) {
             return false;
         }
-        if (!backingMap.containsKey(commit.operation().key())) {
-            backingMap.put(commit.operation().key(),
-                           new NonTransactionalCommit(1));
+        if (!backingMap.containsKey(key)) {
+            backingMap.put(key, new NonTransactionalCommit(1));
         }
-        return backingMap
-                .get(commit.operation().key())
-                .addCommit(commit) != null;
+
+        Versioned<Collection<? extends byte[]>> addedValues = backingMap
+                .get(key)
+                .addCommit(commit);
+
+        if (addedValues != null) {
+            publish(addedValues.value().stream()
+                    .map(value -> new MultimapEvent<String, byte[]>(
+                            "", key, value, null))
+                    .collect(Collectors.toList()));
+            return true;
+        }
+
+        return false;
     }
 
     protected Versioned<Collection<? extends byte[]>> replace(
@@ -366,6 +405,56 @@
         return backingMap.get(commit.operation().key()).addCommit(commit);
     }
 
+    /**
+     * Handles a listen commit.
+     *
+     * @param commit listen commit
+     */
+    protected void listen(Commit<? extends Listen> commit) {
+        Long sessionId = commit.session().id();
+        if (listeners.putIfAbsent(sessionId, commit) != null) {
+            commit.close();
+            return;
+        }
+        commit.session()
+                .onStateChange(
+                        state -> {
+                            if (state == ServerSession.State.CLOSED
+                                    || state == ServerSession.State.EXPIRED) {
+                                Commit<? extends Listen> listener = listeners.remove(sessionId);
+                                if (listener != null) {
+                                    listener.close();
+                                }
+                            }
+                        });
+    }
+
+    /**
+     * Handles an unlisten commit.
+     *
+     * @param commit unlisten commit
+     */
+    protected void unlisten(Commit<? extends Unlisten> commit) {
+        try {
+            Commit<? extends Listen> listener = listeners.remove(commit.session().id());
+            if (listener != null) {
+                listener.close();
+            }
+        } finally {
+            commit.close();
+        }
+    }
+
+    /**
+     * Publishes events to listeners.
+     *
+     * @param events list of multimap events to publish
+     */
+    private void publish(List<MultimapEvent<String, byte[]>> events) {
+        listeners.values().forEach(commit ->
+                commit.session().publish(AtomixConsistentSetMultimap.CHANGE_SUBJECT, events));
+    }
+
     private interface MapEntryValue {
 
         /**