StorageService API for creating AsyncDocumentTree primitive

Change-Id: Ib7c3f19beb7b26a5b69161cf972c3c64d0be94b3
diff --git a/apps/vtn/vtnrsc/src/test/java/org/onosproject/vtnrsc/util/VtnStorageServiceAdapter.java b/apps/vtn/vtnrsc/src/test/java/org/onosproject/vtnrsc/util/VtnStorageServiceAdapter.java
index a04ef98..ebad932 100644
--- a/apps/vtn/vtnrsc/src/test/java/org/onosproject/vtnrsc/util/VtnStorageServiceAdapter.java
+++ b/apps/vtn/vtnrsc/src/test/java/org/onosproject/vtnrsc/util/VtnStorageServiceAdapter.java
@@ -15,6 +15,7 @@
  */
 package org.onosproject.vtnrsc.util;
 
+import org.onosproject.store.service.AsyncDocumentTree;
 import org.onosproject.store.service.Topic;
 import org.onosproject.store.service.ConsistentTreeMapBuilder;
 import org.onosproject.store.service.WorkQueue;
@@ -81,4 +82,9 @@
     public <T> Topic<T> getTopic(String name, Serializer serializer) {
         return null;
     }
+
+    @Override
+    public <V> AsyncDocumentTree<V> getDocumentTree(String name, Serializer serializer) {
+        return null;
+    }
 }
diff --git a/core/api/src/main/java/org/onosproject/store/primitives/DistributedPrimitiveCreator.java b/core/api/src/main/java/org/onosproject/store/primitives/DistributedPrimitiveCreator.java
index a8827a7..b0a25f2 100644
--- a/core/api/src/main/java/org/onosproject/store/primitives/DistributedPrimitiveCreator.java
+++ b/core/api/src/main/java/org/onosproject/store/primitives/DistributedPrimitiveCreator.java
@@ -22,9 +22,10 @@
 import org.onosproject.store.service.AsyncConsistentMap;
 import org.onosproject.store.service.AsyncConsistentTreeMap;
 import org.onosproject.store.service.AsyncDistributedSet;
+import org.onosproject.store.service.AsyncDocumentTree;
 import org.onosproject.store.service.AsyncLeaderElector;
-import org.onosproject.store.service.WorkQueue;
 import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.WorkQueue;
 
 /**
  * Interface for entity that can create instances of different distributed primitives.
@@ -99,6 +100,16 @@
     <E> WorkQueue<E> newWorkQueue(String name, Serializer serializer);
 
     /**
+     * Creates a new {@code AsyncDocumentTree}.
+     *
+     * @param <V> document tree node value type
+     * @param name tree name
+     * @param serializer serializer
+     * @return document tree
+     */
+    <V> AsyncDocumentTree<V> newAsyncDocumentTree(String name, Serializer serializer);
+
+    /**
      * Returns the names of all created {@code AsyncConsistentMap} instances.
      * @return set of {@code AsyncConsistentMap} names
      */
diff --git a/core/api/src/main/java/org/onosproject/store/service/StorageService.java b/core/api/src/main/java/org/onosproject/store/service/StorageService.java
index 4edb438..2e2acfb 100644
--- a/core/api/src/main/java/org/onosproject/store/service/StorageService.java
+++ b/core/api/src/main/java/org/onosproject/store/service/StorageService.java
@@ -114,12 +114,21 @@
      * @param <E> work element type
      * @param name work queue name
      * @param serializer serializer
-     *
      * @return WorkQueue instance
      */
     <E> WorkQueue<E> getWorkQueue(String name, Serializer serializer);
 
     /**
+     * Returns an instance of {@code AsyncDocumentTree} with specified name.
+     *
+     * @param <V> tree node value type
+     * @param name document tree name
+     * @param serializer serializer
+     * @return AsyncDocumentTree instance
+     */
+    <V> AsyncDocumentTree<V> getDocumentTree(String name, Serializer serializer);
+
+    /**
      * Returns an instance of {@code Topic} with specified name.
      *
      * @param <T> topic message type
diff --git a/core/api/src/test/java/org/onosproject/store/service/StorageServiceAdapter.java b/core/api/src/test/java/org/onosproject/store/service/StorageServiceAdapter.java
index e75c59e..01cc065 100644
--- a/core/api/src/test/java/org/onosproject/store/service/StorageServiceAdapter.java
+++ b/core/api/src/test/java/org/onosproject/store/service/StorageServiceAdapter.java
@@ -64,7 +64,13 @@
         return null;
     }
 
+    @Override
     public <V> ConsistentTreeMapBuilder<V> consistentTreeMapBuilder() {
         return null;
     }
+
+    @Override
+    public <V> AsyncDocumentTree<V> getDocumentTree(String name, Serializer serializer) {
+        return null;
+    }
 }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java
index 654ba84..7159f06 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java
@@ -106,7 +106,6 @@
         serializer.register(DocumentTreeUpdateResult.class, factory);
         serializer.register(DocumentTreeUpdateResult.Status.class, factory);
         serializer.register(DocumentTreeEvent.class, factory);
-        serializer.register(DocumentTreeEvent.Type.class, factory);
         serializer.register(Maps.immutableEntry("a", "b").getClass(), factory);
         serializer.register(ImmutableList.of().getClass(), factory);
 
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedDocumentTree.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedDocumentTree.java
new file mode 100644
index 0000000..dd4ac50
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedDocumentTree.java
@@ -0,0 +1,145 @@
+/*
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.store.primitives.impl;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+import org.onosproject.store.service.AsyncDocumentTree;
+import org.onosproject.store.service.DocumentPath;
+import org.onosproject.store.service.DocumentTreeEvent;
+import org.onosproject.store.service.DocumentTreeListener;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.Versioned;
+
+import com.google.common.collect.Maps;
+
+/**
+ * Default implementation of {@link AsyncDocumentTree}.
+ * <p>
+ * This implementation delegates execution to a backing tree implemented on top of Atomix framework.
+ *
+ * @See AtomixDocumentTree
+ *
+ * @param <V> tree node value type.
+ */
+public class DefaultDistributedDocumentTree<V> implements AsyncDocumentTree<V> {
+
+    private final String name;
+    private final AsyncDocumentTree<byte[]> backingTree;
+    private final Serializer serializer;
+    private final Map<DocumentTreeListener<V>, InternalBackingDocumentTreeListener> listeners =
+            Maps.newIdentityHashMap();
+
+    DefaultDistributedDocumentTree(String name, AsyncDocumentTree<byte[]> backingTree, Serializer serializer) {
+        this.name = name;
+        this.backingTree = backingTree;
+        this.serializer = serializer;
+    }
+
+    @Override
+    public String name() {
+        return name;
+    }
+
+    @Override
+    public Type primitiveType() {
+        return backingTree.primitiveType();
+    }
+
+    @Override
+    public DocumentPath root() {
+        return backingTree.root();
+    }
+
+    @Override
+    public CompletableFuture<Map<String, Versioned<V>>> getChildren(DocumentPath path) {
+        return backingTree.getChildren(path)
+                          .thenApply(map -> Maps.transformValues(map, v -> v.map(serializer::decode)));
+    }
+
+    @Override
+    public CompletableFuture<Versioned<V>> get(DocumentPath path) {
+        return backingTree.get(path)
+                          .thenApply(v -> v == null ? null : v.map(serializer::decode));
+    }
+
+    @Override
+    public CompletableFuture<Versioned<V>> set(DocumentPath path, V value) {
+        return backingTree.set(path, serializer.encode(value))
+                          .thenApply(v -> v == null ? null : v.map(serializer::decode));
+    }
+
+    @Override
+    public CompletableFuture<Boolean> create(DocumentPath path, V value) {
+        return backingTree.create(path, serializer.encode(value));
+    }
+
+    @Override
+    public CompletableFuture<Boolean> replace(DocumentPath path, V newValue, long version) {
+        return backingTree.replace(path, serializer.encode(newValue), version);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> replace(DocumentPath path, V newValue, V currentValue) {
+        return backingTree.replace(path, serializer.encode(newValue), serializer.encode(currentValue));
+    }
+
+    @Override
+    public CompletableFuture<Versioned<V>> removeNode(DocumentPath path) {
+        return backingTree.removeNode(path)
+                          .thenApply(v -> v == null ? null : v.map(serializer::decode));
+    }
+
+    @Override
+    public CompletableFuture<Void> addListener(DocumentPath path, DocumentTreeListener<V> listener) {
+        synchronized (listeners) {
+            InternalBackingDocumentTreeListener backingListener =
+                    listeners.computeIfAbsent(listener, k -> new InternalBackingDocumentTreeListener(listener));
+            return backingTree.addListener(path, backingListener);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> removeListener(DocumentTreeListener<V> listener) {
+        synchronized (listeners) {
+            InternalBackingDocumentTreeListener backingListener = listeners.remove(listener);
+            if (backingListener != null) {
+                return backingTree.removeListener(backingListener);
+            } else {
+                return CompletableFuture.completedFuture(null);
+            }
+        }
+    }
+
+    private class InternalBackingDocumentTreeListener implements DocumentTreeListener<byte[]> {
+
+        private final DocumentTreeListener<V> listener;
+
+        InternalBackingDocumentTreeListener(DocumentTreeListener<V> listener) {
+            this.listener = listener;
+        }
+
+        @Override
+        public void event(DocumentTreeEvent<byte[]> event) {
+            listener.event(new DocumentTreeEvent<V>(event.path(),
+                    event.type(),
+                    event.newValue().map(v -> v.map(serializer::decode)),
+                    event.oldValue().map(v -> v.map(serializer::decode))));
+        }
+    }
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java
index efb32d7..b36906c 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java
@@ -29,6 +29,7 @@
 import org.onosproject.store.service.AsyncConsistentMap;
 import org.onosproject.store.service.AsyncConsistentTreeMap;
 import org.onosproject.store.service.AsyncDistributedSet;
+import org.onosproject.store.service.AsyncDocumentTree;
 import org.onosproject.store.service.AsyncLeaderElector;
 import org.onosproject.store.service.Serializer;
 import org.onosproject.store.service.WorkQueue;
@@ -108,6 +109,11 @@
     }
 
     @Override
+    public <V> AsyncDocumentTree<V> newAsyncDocumentTree(String name, Serializer serializer) {
+        return getCreator(name).newAsyncDocumentTree(name, serializer);
+    }
+
+    @Override
     public Set<String> getAsyncConsistentMapNames() {
         return members.values()
                       .stream()
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
index 25eb3a5..8e04d5a 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
@@ -43,6 +43,7 @@
 import org.onosproject.store.serializers.KryoNamespaces;
 import org.onosproject.store.service.AsyncAtomicValue;
 import org.onosproject.store.service.AsyncConsistentMap;
+import org.onosproject.store.service.AsyncDocumentTree;
 import org.onosproject.store.service.AtomicCounterBuilder;
 import org.onosproject.store.service.AtomicValueBuilder;
 import org.onosproject.store.service.ConsistentMap;
@@ -180,6 +181,12 @@
     }
 
     @Override
+    public <V> AsyncDocumentTree<V> getDocumentTree(String name, Serializer serializer) {
+        checkPermission(STORAGE_WRITE);
+        return federatedPrimitiveCreator.newAsyncDocumentTree(name, serializer);
+    }
+
+    @Override
     public List<MapInfo> getMapInfo() {
         return listMapInfo(federatedPrimitiveCreator);
     }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
index 1b59752..b123ee0 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
@@ -41,6 +41,7 @@
 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMap;
 import org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMap;
 import org.onosproject.store.primitives.resources.impl.AtomixCounter;
+import org.onosproject.store.primitives.resources.impl.AtomixDocumentTree;
 import org.onosproject.store.primitives.resources.impl.AtomixLeaderElector;
 import org.onosproject.store.primitives.resources.impl.AtomixWorkQueue;
 import org.onosproject.store.serializers.KryoNamespaces;
@@ -49,6 +50,7 @@
 import org.onosproject.store.service.AsyncConsistentMap;
 import org.onosproject.store.service.AsyncConsistentTreeMap;
 import org.onosproject.store.service.AsyncDistributedSet;
+import org.onosproject.store.service.AsyncDocumentTree;
 import org.onosproject.store.service.AsyncLeaderElector;
 import org.onosproject.store.service.DistributedPrimitive.Status;
 import org.onosproject.store.service.PartitionClientInfo;
@@ -191,6 +193,12 @@
     }
 
     @Override
+    public <V> AsyncDocumentTree<V> newAsyncDocumentTree(String name, Serializer serializer) {
+        AtomixDocumentTree atomixDocumentTree = client.getResource(name, AtomixDocumentTree.class).join();
+        return new DefaultDistributedDocumentTree<>(name, atomixDocumentTree, serializer);
+    }
+
+    @Override
     public AsyncLeaderElector newAsyncLeaderElector(String name) {
         AtomixLeaderElector leaderElector = client.getResource(name, AtomixLeaderElector.class)
                                                   .thenCompose(AtomixLeaderElector::setupCache)
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTreeState.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTreeState.java
index 77f548b..cd725d9 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTreeState.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTreeState.java
@@ -65,7 +65,7 @@
 
     private final Logger log = getLogger(getClass());
     private final Map<Long, Commit<? extends Listen>> listeners = new HashMap<>();
-    private final AtomicLong versionCounter = new AtomicLong(0);
+    private AtomicLong versionCounter = new AtomicLong(0);
     private final DocumentTree<TreeNodeValue> docTree = new DefaultDocumentTree<>(versionCounter::incrementAndGet);
 
     public AtomixDocumentTreeState(Properties properties) {
@@ -79,7 +79,7 @@
 
     @Override
     public void install(SnapshotReader reader) {
-        versionCounter.set(reader.readLong());
+        versionCounter = new AtomicLong(reader.readLong());
     }
 
     @Override
@@ -101,8 +101,7 @@
             commit.close();
             return;
         }
-        commit.session()
-                .onStateChange(
+        commit.session().onStateChange(
                         state -> {
                             if (state == ServerSession.State.CLOSED
                                     || state == ServerSession.State.EXPIRED) {
@@ -262,11 +261,10 @@
                         result.created() ? Type.CREATED : result.newValue() == null ? Type.DELETED : Type.UPDATED,
                         Optional.ofNullable(result.newValue()),
                         Optional.ofNullable(result.oldValue()));
-        Object message = ImmutableList.of(event);
-        listeners.values().forEach(commit -> {
-            commit.session().publish(AtomixDocumentTree.CHANGE_SUBJECT, message);
-            System.out.println("Sent " + message + " to " + commit.session().id());
-        });
+        listeners.values()
+                 .forEach(commit -> commit.session()
+                                          .publish(AtomixDocumentTree.CHANGE_SUBJECT,
+                                                   ImmutableList.of(event)));
     }
 
     @Override
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTreeTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTreeTest.java
index 8a5efe9..653b61c 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTreeTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTreeTest.java
@@ -320,10 +320,6 @@
             }
         }
 
-        public boolean eventReceived() {
-            return !queue.isEmpty();
-        }
-
         public DocumentTreeEvent<byte[]> event() throws InterruptedException {
             return queue.take();
         }
diff --git a/protocols/pcep/ctl/src/test/java/org/onosproject/pcelabelstore/util/StorageServiceAdapter.java b/protocols/pcep/ctl/src/test/java/org/onosproject/pcelabelstore/util/StorageServiceAdapter.java
index 3b864c4..e124325 100644
--- a/protocols/pcep/ctl/src/test/java/org/onosproject/pcelabelstore/util/StorageServiceAdapter.java
+++ b/protocols/pcep/ctl/src/test/java/org/onosproject/pcelabelstore/util/StorageServiceAdapter.java
@@ -15,6 +15,7 @@
  */
 package org.onosproject.pcelabelstore.util;
 
+import org.onosproject.store.service.AsyncDocumentTree;
 import org.onosproject.store.service.AtomicCounterBuilder;
 import org.onosproject.store.service.AtomicValueBuilder;
 import org.onosproject.store.service.ConsistentMapBuilder;
@@ -83,4 +84,9 @@
         // TODO Auto-generated method stub
         return null;
     }
+
+    @Override
+    public <V> AsyncDocumentTree<V> getDocumentTree(String name, Serializer serializer) {
+        return null;
+    }
 }