Implement listeners for ConsistentMultimap.

Change-Id: Ica07d444c18af8ba7a9bbb120623512def572a48
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapState.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapState.java
index 5110187..47972d1 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapState.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapState.java
@@ -27,12 +27,14 @@
 import io.atomix.copycat.server.Commit;
 import io.atomix.copycat.server.Snapshottable;
 import io.atomix.copycat.server.StateMachineExecutor;
+import io.atomix.copycat.server.session.ServerSession;
 import io.atomix.copycat.server.session.SessionListener;
 import io.atomix.copycat.server.storage.snapshot.SnapshotReader;
 import io.atomix.copycat.server.storage.snapshot.SnapshotWriter;
 import io.atomix.resource.ResourceStateMachine;
 import org.onlab.util.CountDownCompleter;
 import org.onlab.util.Match;
+import org.onosproject.store.service.MultimapEvent;
 import org.onosproject.store.service.Versioned;
 import org.slf4j.Logger;
 
@@ -40,6 +42,8 @@
 import java.util.Collection;
 import java.util.Comparator;
 import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
@@ -61,12 +65,14 @@
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.IsEmpty;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.KeySet;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.Keys;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.Listen;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.MultiRemove;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.MultimapCommand;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.Put;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.RemoveAll;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.Replace;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.Size;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.Unlisten;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands.Values;
 import static org.slf4j.LoggerFactory.getLogger;
 
@@ -78,7 +84,7 @@
 
     private final Logger log = getLogger(getClass());
     private final AtomicLong globalVersion = new AtomicLong(1);
-    //TODO Add listener map here
+    private final Map<Long, Commit<? extends Listen>> listeners = new HashMap<>(); // keyed by session id
     private final Map<String, MapEntryValue> backingMap = Maps.newHashMap();
 
     public AtomixConsistentSetMultimapState(Properties properties) {
@@ -110,6 +116,8 @@
         executor.register(MultiRemove.class, this::multiRemove);
         executor.register(Put.class, this::put);
         executor.register(Replace.class, this::replace);
+        executor.register(Listen.class, this::listen);
+        executor.register(Unlisten.class, this::unlisten);
     }
 
     /**
@@ -313,12 +321,20 @@
      */
     protected Versioned<Collection<? extends byte[]>> removeAll(
             Commit<? extends RemoveAll> commit) {
-        if (!backingMap.containsKey(commit.operation().key())) {
+        final String key = commit.operation().key();
+        // Unknown key: there is nothing to remove, so the commit is closed unused.
+        if (!backingMap.containsKey(key)) {
             commit.close();
             return new Versioned<>(Sets.newHashSet(), -1);
-        } else {
-            return backingMap.get(commit.operation().key()).addCommit(commit);
         }
+
+        Versioned<Collection<? extends byte[]>> removed = backingMap.get(key).addCommit(commit);
+        // One event per value in the collection returned for this commit.
+        List<MultimapEvent<String, byte[]>> events = removed.value().stream()
+                .map(value -> new MultimapEvent<String, byte[]>("", key, null, value))
+                .collect(Collectors.toList());
+        publish(events);
+        return removed;
     }
 
     /**
@@ -328,14 +344,26 @@
      * @return true if any change results, else false
      */
     protected boolean multiRemove(Commit<? extends MultiRemove> commit) {
-        if (!backingMap.containsKey(commit.operation().key())) {
+        final String key = commit.operation().key();
+        // Unknown key: nothing can change, so the commit is closed unused.
+        if (!backingMap.containsKey(key)) {
             commit.close();
             return false;
-        } else {
-            return (backingMap
-                    .get(commit.operation().key())
-                    .addCommit(commit)) != null;
         }
+
+        Versioned<Collection<? extends byte[]>> removed =
+                backingMap.get(key).addCommit(commit);
+        if (removed == null) {
+            // A null result is reported to the caller as "no change".
+            return false;
+        }
+
+        // One event per value in the collection returned for this commit.
+        List<MultimapEvent<String, byte[]>> events = removed.value().stream()
+                .map(value -> new MultimapEvent<String, byte[]>("", key, null, value))
+                .collect(Collectors.toList());
+        publish(events);
+        return true;
     }
 
     /**
@@ -345,16 +373,27 @@
      * @return true if this commit results in a change, else false
      */
     protected boolean put(Commit<? extends Put> commit) {
+        String key = commit.operation().key();
         if (commit.operation().values().isEmpty()) {
+            // Nothing to store: close the commit like the other no-op paths do.
+            commit.close();
             return false;
         }
-        if (!backingMap.containsKey(commit.operation().key())) {
-            backingMap.put(commit.operation().key(),
-                           new NonTransactionalCommit(1));
+        if (!backingMap.containsKey(key)) {
+            backingMap.put(key, new NonTransactionalCommit(1));
         }
-        return backingMap
-                .get(commit.operation().key())
-                .addCommit(commit) != null;
+
+        Versioned<Collection<? extends byte[]>> addedValues =
+                backingMap.get(key).addCommit(commit);
+        if (addedValues != null) {
+            // One event per value in the collection returned for this commit.
+            publish(addedValues.value().stream()
+                    .map(value -> new MultimapEvent<String, byte[]>(
+                            "", key, value, null))
+                    .collect(Collectors.toList()));
+            return true;
+        }
+        return false;
     }
 
     protected Versioned<Collection<? extends byte[]>> replace(
@@ -366,6 +405,56 @@
         return backingMap.get(commit.operation().key()).addCommit(commit);
     }
 
+    /**
+     * Handles a listen commit by registering the committing session as a
+     * listener. If that session is already registered the new commit is closed;
+     * the registration is dropped once the session is closed or expires.
+     *
+     * @param commit listen commit
+     */
+    protected void listen(Commit<? extends Listen> commit) {
+        Long id = commit.session().id();
+        if (listeners.putIfAbsent(id, commit) != null) {
+            commit.close();
+            return;
+        }
+        commit.session().onStateChange(state -> {
+            if (state != ServerSession.State.CLOSED && state != ServerSession.State.EXPIRED) {
+                return;
+            }
+            Commit<? extends Listen> registered = listeners.remove(id);
+            if (registered != null) {
+                registered.close();
+            }
+        });
+    }
+
+    /**
+     * Handles an unlisten commit by dropping the session's listen commit, if any.
+     *
+     * @param commit unlisten commit
+     */
+    protected void unlisten(Commit<? extends Unlisten> commit) {
+        try {
+            Commit<? extends Listen> listener = listeners.remove(commit.session().id());
+            if (listener != null) {
+                listener.close();
+            }
+        } finally {
+            commit.close(); // the unlisten commit itself is closed on every path
+        }
+    }
+
+    /**
+     * Sends the given events to the session of every registered listener.
+     *
+     * @param events multimap events to publish
+     */
+    private void publish(List<MultimapEvent<String, byte[]>> events) {
+        listeners.values().stream().map(Commit::session)
+                .forEach(session -> session.publish(AtomixConsistentSetMultimap.CHANGE_SUBJECT, events));
+    }
+
     private interface MapEntryValue {
 
         /**