AtomixDocumentTree support for filtering notifications by DocumentPath

Change-Id: I3f4f616bc4f2e488e5433e44f72bcd121b564b0d
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTree.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTree.java
index 8f0c73a..77f9b98 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTree.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTree.java
@@ -33,11 +33,11 @@
 
 import org.onlab.util.Match;
 import org.onlab.util.Tools;
-import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Unlisten;
 import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeCommands.Clear;
 import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeCommands.Get;
 import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeCommands.GetChildren;
 import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeCommands.Listen;
+import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeCommands.Unlisten;
 import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeCommands.Update;
 import org.onosproject.store.service.AsyncDocumentTree;
 import org.onosproject.store.service.DocumentPath;
@@ -56,7 +56,7 @@
 public class AtomixDocumentTree extends AbstractResource<AtomixDocumentTree>
     implements AsyncDocumentTree<byte[]> {
 
-    private final Map<DocumentTreeListener<byte[]>, Executor> eventListeners = new HashMap<>();
+    private final Map<DocumentTreeListener<byte[]>, InternalListener> eventListeners = new HashMap<>();
     public static final String CHANGE_SUBJECT = "changeEvents";
 
     protected AtomixDocumentTree(CopycatClient client, Properties options) {
@@ -184,21 +184,24 @@
     public CompletableFuture<Void> addListener(DocumentPath path, DocumentTreeListener<byte[]> listener) {
         checkNotNull(path);
         checkNotNull(listener);
+        InternalListener internalListener = new InternalListener(path, listener, MoreExecutors.directExecutor());
         // TODO: Support API that takes an executor
-        if (isListening()) {
-            eventListeners.putIfAbsent(listener, MoreExecutors.directExecutor());
-            return CompletableFuture.completedFuture(null);
-        } else {
+        if (!eventListeners.containsKey(listener)) {
             return client.submit(new Listen(path))
-                         .thenRun(() -> eventListeners.put(listener, MoreExecutors.directExecutor()));
+                         .thenRun(() -> eventListeners.put(listener, internalListener));
         }
+        return CompletableFuture.completedFuture(null);
     }
 
     @Override
     public CompletableFuture<Void> removeListener(DocumentTreeListener<byte[]> listener) {
         checkNotNull(listener);
-        if (eventListeners.remove(listener) != null && eventListeners.isEmpty()) {
-            return client.submit(new Unlisten()).thenApply(v -> null);
+        InternalListener internalListener = eventListeners.remove(listener);
+        // Every registration submitted a Listen for its own path, so send the matching Unlisten
+        // once no remaining listener shares that path, not only when the map becomes empty.
+        if (internalListener != null && eventListeners.values().stream()
+                .noneMatch(remaining -> remaining.path.equals(internalListener.path))) {
+            return client.submit(new Unlisten(internalListener.path)).thenApply(v -> null);
         }
         return CompletableFuture.completedFuture(null);
     }
@@ -213,7 +216,31 @@
     }
 
     private void processTreeUpdates(List<DocumentTreeEvent<byte[]>> events) {
-        events.forEach(event ->
-            eventListeners.forEach((listener, executor) -> executor.execute(() -> listener.event(event))));
+        events.forEach(event -> eventListeners.values().forEach(listener -> listener.event(event)));
+    }
+
+    /**
+     * Pairs a registered listener with the path it was added for and the executor used
+     * to invoke it, and forwards only events whose path is a descendant of that path.
+     */
+    private static final class InternalListener implements DocumentTreeListener<byte[]> {
+
+        private final DocumentPath path;
+        private final DocumentTreeListener<byte[]> listener;
+        private final Executor executor;
+
+        public InternalListener(DocumentPath path, DocumentTreeListener<byte[]> listener, Executor executor) {
+            this.path = path;
+            this.listener = listener;
+            this.executor = executor;
+        }
+
+        @Override
+        public void event(DocumentTreeEvent<byte[]> event) {
+            // Filter here: events outside this listener's path are silently dropped.
+            if (event.path().isDescendentOf(path)) {
+                executor.execute(() -> listener.event(event));
+            }
+        }
     }
 }