StorageService API for creating AsyncDocumentTree primitive
Change-Id: Ib7c3f19beb7b26a5b69161cf972c3c64d0be94b3
diff --git a/apps/vtn/vtnrsc/src/test/java/org/onosproject/vtnrsc/util/VtnStorageServiceAdapter.java b/apps/vtn/vtnrsc/src/test/java/org/onosproject/vtnrsc/util/VtnStorageServiceAdapter.java
index a04ef98..ebad932 100644
--- a/apps/vtn/vtnrsc/src/test/java/org/onosproject/vtnrsc/util/VtnStorageServiceAdapter.java
+++ b/apps/vtn/vtnrsc/src/test/java/org/onosproject/vtnrsc/util/VtnStorageServiceAdapter.java
@@ -15,6 +15,7 @@
*/
package org.onosproject.vtnrsc.util;
+import org.onosproject.store.service.AsyncDocumentTree;
import org.onosproject.store.service.Topic;
import org.onosproject.store.service.ConsistentTreeMapBuilder;
import org.onosproject.store.service.WorkQueue;
@@ -81,4 +82,9 @@
public <T> Topic<T> getTopic(String name, Serializer serializer) {
return null;
}
+
+ @Override
+ public <V> AsyncDocumentTree<V> getDocumentTree(String name, Serializer serializer) {
+ return null;
+ }
}
diff --git a/core/api/src/main/java/org/onosproject/store/primitives/DistributedPrimitiveCreator.java b/core/api/src/main/java/org/onosproject/store/primitives/DistributedPrimitiveCreator.java
index a8827a7..b0a25f2 100644
--- a/core/api/src/main/java/org/onosproject/store/primitives/DistributedPrimitiveCreator.java
+++ b/core/api/src/main/java/org/onosproject/store/primitives/DistributedPrimitiveCreator.java
@@ -22,9 +22,10 @@
import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.AsyncConsistentTreeMap;
import org.onosproject.store.service.AsyncDistributedSet;
+import org.onosproject.store.service.AsyncDocumentTree;
import org.onosproject.store.service.AsyncLeaderElector;
-import org.onosproject.store.service.WorkQueue;
import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.WorkQueue;
/**
* Interface for entity that can create instances of different distributed primitives.
@@ -99,6 +100,16 @@
<E> WorkQueue<E> newWorkQueue(String name, Serializer serializer);
/**
+ * Creates a new {@code AsyncDocumentTree}.
+ *
+ * @param <V> document tree node value type
+ * @param name tree name
+ * @param serializer serializer
+ * @return document tree
+ */
+ <V> AsyncDocumentTree<V> newAsyncDocumentTree(String name, Serializer serializer);
+
+ /**
* Returns the names of all created {@code AsyncConsistentMap} instances.
* @return set of {@code AsyncConsistentMap} names
*/
diff --git a/core/api/src/main/java/org/onosproject/store/service/StorageService.java b/core/api/src/main/java/org/onosproject/store/service/StorageService.java
index 4edb438..2e2acfb 100644
--- a/core/api/src/main/java/org/onosproject/store/service/StorageService.java
+++ b/core/api/src/main/java/org/onosproject/store/service/StorageService.java
@@ -114,12 +114,21 @@
* @param <E> work element type
* @param name work queue name
* @param serializer serializer
- *
* @return WorkQueue instance
*/
<E> WorkQueue<E> getWorkQueue(String name, Serializer serializer);
/**
+ * Returns an instance of {@code AsyncDocumentTree} with specified name.
+ *
+ * @param <V> tree node value type
+ * @param name document tree name
+ * @param serializer serializer
+ * @return AsyncDocumentTree instance
+ */
+ <V> AsyncDocumentTree<V> getDocumentTree(String name, Serializer serializer);
+
+ /**
* Returns an instance of {@code Topic} with specified name.
*
* @param <T> topic message type
diff --git a/core/api/src/test/java/org/onosproject/store/service/StorageServiceAdapter.java b/core/api/src/test/java/org/onosproject/store/service/StorageServiceAdapter.java
index e75c59e..01cc065 100644
--- a/core/api/src/test/java/org/onosproject/store/service/StorageServiceAdapter.java
+++ b/core/api/src/test/java/org/onosproject/store/service/StorageServiceAdapter.java
@@ -64,7 +64,13 @@
return null;
}
+ @Override
public <V> ConsistentTreeMapBuilder<V> consistentTreeMapBuilder() {
return null;
}
+
+ @Override
+ public <V> AsyncDocumentTree<V> getDocumentTree(String name, Serializer serializer) {
+ return null;
+ }
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java
index 654ba84..7159f06 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java
@@ -106,7 +106,6 @@
serializer.register(DocumentTreeUpdateResult.class, factory);
serializer.register(DocumentTreeUpdateResult.Status.class, factory);
serializer.register(DocumentTreeEvent.class, factory);
- serializer.register(DocumentTreeEvent.Type.class, factory);
serializer.register(Maps.immutableEntry("a", "b").getClass(), factory);
serializer.register(ImmutableList.of().getClass(), factory);
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedDocumentTree.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedDocumentTree.java
new file mode 100644
index 0000000..dd4ac50
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultDistributedDocumentTree.java
@@ -0,0 +1,145 @@
+/*
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.store.primitives.impl;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+import org.onosproject.store.service.AsyncDocumentTree;
+import org.onosproject.store.service.DocumentPath;
+import org.onosproject.store.service.DocumentTreeEvent;
+import org.onosproject.store.service.DocumentTreeListener;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.Versioned;
+
+import com.google.common.collect.Maps;
+
+/**
+ * Default implementation of {@link AsyncDocumentTree}.
+ * <p>
+ * This implementation delegates execution to a backing tree implemented on top of Atomix framework.
+ *
+ * @See AtomixDocumentTree
+ *
+ * @param <V> tree node value type.
+ */
+public class DefaultDistributedDocumentTree<V> implements AsyncDocumentTree<V> {
+
+ private final String name;
+ private final AsyncDocumentTree<byte[]> backingTree;
+ private final Serializer serializer;
+ private final Map<DocumentTreeListener<V>, InternalBackingDocumentTreeListener> listeners =
+ Maps.newIdentityHashMap();
+
+ DefaultDistributedDocumentTree(String name, AsyncDocumentTree<byte[]> backingTree, Serializer serializer) {
+ this.name = name;
+ this.backingTree = backingTree;
+ this.serializer = serializer;
+ }
+
+ @Override
+ public String name() {
+ return name;
+ }
+
+ @Override
+ public Type primitiveType() {
+ return backingTree.primitiveType();
+ }
+
+ @Override
+ public DocumentPath root() {
+ return backingTree.root();
+ }
+
+ @Override
+ public CompletableFuture<Map<String, Versioned<V>>> getChildren(DocumentPath path) {
+ return backingTree.getChildren(path)
+ .thenApply(map -> Maps.transformValues(map, v -> v.map(serializer::decode)));
+ }
+
+ @Override
+ public CompletableFuture<Versioned<V>> get(DocumentPath path) {
+ return backingTree.get(path)
+ .thenApply(v -> v == null ? null : v.map(serializer::decode));
+ }
+
+ @Override
+ public CompletableFuture<Versioned<V>> set(DocumentPath path, V value) {
+ return backingTree.set(path, serializer.encode(value))
+ .thenApply(v -> v == null ? null : v.map(serializer::decode));
+ }
+
+ @Override
+ public CompletableFuture<Boolean> create(DocumentPath path, V value) {
+ return backingTree.create(path, serializer.encode(value));
+ }
+
+ @Override
+ public CompletableFuture<Boolean> replace(DocumentPath path, V newValue, long version) {
+ return backingTree.replace(path, serializer.encode(newValue), version);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> replace(DocumentPath path, V newValue, V currentValue) {
+ return backingTree.replace(path, serializer.encode(newValue), serializer.encode(currentValue));
+ }
+
+ @Override
+ public CompletableFuture<Versioned<V>> removeNode(DocumentPath path) {
+ return backingTree.removeNode(path)
+ .thenApply(v -> v == null ? null : v.map(serializer::decode));
+ }
+
+ @Override
+ public CompletableFuture<Void> addListener(DocumentPath path, DocumentTreeListener<V> listener) {
+ synchronized (listeners) {
+ InternalBackingDocumentTreeListener backingListener =
+ listeners.computeIfAbsent(listener, k -> new InternalBackingDocumentTreeListener(listener));
+ return backingTree.addListener(path, backingListener);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Void> removeListener(DocumentTreeListener<V> listener) {
+ synchronized (listeners) {
+ InternalBackingDocumentTreeListener backingListener = listeners.remove(listener);
+ if (backingListener != null) {
+ return backingTree.removeListener(backingListener);
+ } else {
+ return CompletableFuture.completedFuture(null);
+ }
+ }
+ }
+
+ private class InternalBackingDocumentTreeListener implements DocumentTreeListener<byte[]> {
+
+ private final DocumentTreeListener<V> listener;
+
+ InternalBackingDocumentTreeListener(DocumentTreeListener<V> listener) {
+ this.listener = listener;
+ }
+
+ @Override
+ public void event(DocumentTreeEvent<byte[]> event) {
+ listener.event(new DocumentTreeEvent<V>(event.path(),
+ event.type(),
+ event.newValue().map(v -> v.map(serializer::decode)),
+ event.oldValue().map(v -> v.map(serializer::decode))));
+ }
+ }
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java
index efb32d7..b36906c 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java
@@ -29,6 +29,7 @@
import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.AsyncConsistentTreeMap;
import org.onosproject.store.service.AsyncDistributedSet;
+import org.onosproject.store.service.AsyncDocumentTree;
import org.onosproject.store.service.AsyncLeaderElector;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.WorkQueue;
@@ -108,6 +109,11 @@
}
@Override
+ public <V> AsyncDocumentTree<V> newAsyncDocumentTree(String name, Serializer serializer) {
+ return getCreator(name).newAsyncDocumentTree(name, serializer);
+ }
+
+ @Override
public Set<String> getAsyncConsistentMapNames() {
return members.values()
.stream()
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
index 25eb3a5..8e04d5a 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
@@ -43,6 +43,7 @@
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.AsyncAtomicValue;
import org.onosproject.store.service.AsyncConsistentMap;
+import org.onosproject.store.service.AsyncDocumentTree;
import org.onosproject.store.service.AtomicCounterBuilder;
import org.onosproject.store.service.AtomicValueBuilder;
import org.onosproject.store.service.ConsistentMap;
@@ -180,6 +181,12 @@
}
@Override
+ public <V> AsyncDocumentTree<V> getDocumentTree(String name, Serializer serializer) {
+ checkPermission(STORAGE_WRITE);
+ return federatedPrimitiveCreator.newAsyncDocumentTree(name, serializer);
+ }
+
+ @Override
public List<MapInfo> getMapInfo() {
return listMapInfo(federatedPrimitiveCreator);
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
index 1b59752..b123ee0 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
@@ -41,6 +41,7 @@
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMap;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMap;
import org.onosproject.store.primitives.resources.impl.AtomixCounter;
+import org.onosproject.store.primitives.resources.impl.AtomixDocumentTree;
import org.onosproject.store.primitives.resources.impl.AtomixLeaderElector;
import org.onosproject.store.primitives.resources.impl.AtomixWorkQueue;
import org.onosproject.store.serializers.KryoNamespaces;
@@ -49,6 +50,7 @@
import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.AsyncConsistentTreeMap;
import org.onosproject.store.service.AsyncDistributedSet;
+import org.onosproject.store.service.AsyncDocumentTree;
import org.onosproject.store.service.AsyncLeaderElector;
import org.onosproject.store.service.DistributedPrimitive.Status;
import org.onosproject.store.service.PartitionClientInfo;
@@ -191,6 +193,12 @@
}
@Override
+ public <V> AsyncDocumentTree<V> newAsyncDocumentTree(String name, Serializer serializer) {
+ AtomixDocumentTree atomixDocumentTree = client.getResource(name, AtomixDocumentTree.class).join();
+ return new DefaultDistributedDocumentTree<>(name, atomixDocumentTree, serializer);
+ }
+
+ @Override
public AsyncLeaderElector newAsyncLeaderElector(String name) {
AtomixLeaderElector leaderElector = client.getResource(name, AtomixLeaderElector.class)
.thenCompose(AtomixLeaderElector::setupCache)
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTreeState.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTreeState.java
index 77f548b..cd725d9 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTreeState.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTreeState.java
@@ -65,7 +65,7 @@
private final Logger log = getLogger(getClass());
private final Map<Long, Commit<? extends Listen>> listeners = new HashMap<>();
- private final AtomicLong versionCounter = new AtomicLong(0);
+ private AtomicLong versionCounter = new AtomicLong(0);
private final DocumentTree<TreeNodeValue> docTree = new DefaultDocumentTree<>(versionCounter::incrementAndGet);
public AtomixDocumentTreeState(Properties properties) {
@@ -79,7 +79,7 @@
@Override
public void install(SnapshotReader reader) {
- versionCounter.set(reader.readLong());
+ versionCounter = new AtomicLong(reader.readLong());
}
@Override
@@ -101,8 +101,7 @@
commit.close();
return;
}
- commit.session()
- .onStateChange(
+ commit.session().onStateChange(
state -> {
if (state == ServerSession.State.CLOSED
|| state == ServerSession.State.EXPIRED) {
@@ -262,11 +261,10 @@
result.created() ? Type.CREATED : result.newValue() == null ? Type.DELETED : Type.UPDATED,
Optional.ofNullable(result.newValue()),
Optional.ofNullable(result.oldValue()));
- Object message = ImmutableList.of(event);
- listeners.values().forEach(commit -> {
- commit.session().publish(AtomixDocumentTree.CHANGE_SUBJECT, message);
- System.out.println("Sent " + message + " to " + commit.session().id());
- });
+ listeners.values()
+ .forEach(commit -> commit.session()
+ .publish(AtomixDocumentTree.CHANGE_SUBJECT,
+ ImmutableList.of(event)));
}
@Override
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTreeTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTreeTest.java
index 8a5efe9..653b61c 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTreeTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTreeTest.java
@@ -320,10 +320,6 @@
}
}
- public boolean eventReceived() {
- return !queue.isEmpty();
- }
-
public DocumentTreeEvent<byte[]> event() throws InterruptedException {
return queue.take();
}
diff --git a/protocols/pcep/ctl/src/test/java/org/onosproject/pcelabelstore/util/StorageServiceAdapter.java b/protocols/pcep/ctl/src/test/java/org/onosproject/pcelabelstore/util/StorageServiceAdapter.java
index 3b864c4..e124325 100644
--- a/protocols/pcep/ctl/src/test/java/org/onosproject/pcelabelstore/util/StorageServiceAdapter.java
+++ b/protocols/pcep/ctl/src/test/java/org/onosproject/pcelabelstore/util/StorageServiceAdapter.java
@@ -15,6 +15,7 @@
*/
package org.onosproject.pcelabelstore.util;
+import org.onosproject.store.service.AsyncDocumentTree;
import org.onosproject.store.service.AtomicCounterBuilder;
import org.onosproject.store.service.AtomicValueBuilder;
import org.onosproject.store.service.ConsistentMapBuilder;
@@ -83,4 +84,9 @@
// TODO Auto-generated method stub
return null;
}
+
+ @Override
+ public <V> AsyncDocumentTree<V> getDocumentTree(String name, Serializer serializer) {
+ return null;
+ }
}