Supporting Atomix classes for DocumentTree distributed primitive
Change-Id: I754222337401f90f976d4152b6abbdf2e1a4df8e
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java
index 4ed3a72..50db768 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java
@@ -15,14 +15,15 @@
*/
package org.onosproject.store.primitives.resources.impl;
+import static com.google.common.base.Preconditions.checkState;
import static org.onosproject.store.service.MapEvent.Type.INSERT;
import static org.onosproject.store.service.MapEvent.Type.REMOVE;
import static org.onosproject.store.service.MapEvent.Type.UPDATE;
import static org.slf4j.LoggerFactory.getLogger;
-import io.atomix.copycat.server.session.ServerSession;
import io.atomix.copycat.server.Commit;
import io.atomix.copycat.server.Snapshottable;
import io.atomix.copycat.server.StateMachineExecutor;
+import io.atomix.copycat.server.session.ServerSession;
import io.atomix.copycat.server.session.SessionListener;
import io.atomix.copycat.server.storage.snapshot.SnapshotReader;
import io.atomix.copycat.server.storage.snapshot.SnapshotWriter;
@@ -68,15 +69,13 @@
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
-import static com.google.common.base.Preconditions.checkState;
-
/**
* State Machine for {@link AtomixConsistentMap} resource.
*/
public class AtomixConsistentMapState extends ResourceStateMachine implements SessionListener, Snapshottable {
private final Logger log = getLogger(getClass());
- private final Map<Long, Commit<? extends AtomixConsistentMapCommands.Listen>> listeners = new HashMap<>();
+ private final Map<Long, Commit<? extends Listen>> listeners = new HashMap<>();
private final Map<String, MapEntryValue> mapEntries = new HashMap<>();
private final Set<String> preparedKeys = Sets.newHashSet();
private final Map<TransactionId, Commit<? extends TransactionPrepare>> pendingTransactions = Maps.newHashMap();
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTree.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTree.java
new file mode 100644
index 0000000..939d3f0
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTree.java
@@ -0,0 +1,202 @@
+/*
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.store.primitives.resources.impl;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import io.atomix.copycat.client.CopycatClient;
+import io.atomix.resource.AbstractResource;
+import io.atomix.resource.ResourceTypeInfo;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+import org.onlab.util.Match;
+import org.onlab.util.Tools;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Unlisten;
+import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeCommands.Clear;
+import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeCommands.Get;
+import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeCommands.GetChildren;
+import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeCommands.Listen;
+import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeCommands.Update;
+import org.onosproject.store.service.AsyncDocumentTree;
+import org.onosproject.store.service.DocumentPath;
+import org.onosproject.store.service.DocumentTreeEvent;
+import org.onosproject.store.service.DocumentTreeListener;
+import org.onosproject.store.service.IllegalDocumentModificationException;
+import org.onosproject.store.service.NoSuchDocumentPathException;
+import org.onosproject.store.service.Versioned;
+
+import com.google.common.util.concurrent.MoreExecutors;
+
+/**
+ * Distributed resource providing the {@link AsyncDocumentTree} primitive.
+ */
+@ResourceTypeInfo(id = -156, factory = AtomixDocumentTreeFactory.class)
+public class AtomixDocumentTree extends AbstractResource<AtomixDocumentTree>
+ implements AsyncDocumentTree<byte[]> {
+
+ private final Map<DocumentTreeListener<byte[]>, Executor> eventListeners = new HashMap<>();
+ public static final String CHANGE_SUBJECT = "changeEvents";
+
+ protected AtomixDocumentTree(CopycatClient client, Properties options) {
+ super(client, options);
+ }
+
+ @Override
+ public CompletableFuture<AtomixDocumentTree> open() {
+ return super.open().thenApply(result -> {
+ client.onStateChange(state -> {
+ if (state == CopycatClient.State.CONNECTED && isListening()) {
+ client.submit(new Listen());
+ }
+ });
+ client.onEvent(CHANGE_SUBJECT, this::processTreeUpdates);
+ return result;
+ });
+ }
+
+ @Override
+ public String name() {
+ return null;
+ }
+
+ @Override
+ public Type primitiveType() {
+ return Type.DOCUMENT_TREE;
+ }
+
+ @Override
+ public CompletableFuture<Void> destroy() {
+ return client.submit(new Clear());
+ }
+
+ @Override
+ public DocumentPath root() {
+ return DocumentPath.from("root");
+ }
+
+ @Override
+ public CompletableFuture<Map<String, Versioned<byte[]>>> getChildren(DocumentPath path) {
+ return client.submit(new GetChildren(checkNotNull(path)));
+ }
+
+ @Override
+ public CompletableFuture<Versioned<byte[]>> get(DocumentPath path) {
+ return client.submit(new Get(checkNotNull(path)));
+ }
+
+ @Override
+ public CompletableFuture<Versioned<byte[]>> set(DocumentPath path, byte[] value) {
+ return client.submit(new Update(checkNotNull(path), checkNotNull(value), Match.any(), Match.any()))
+ .thenCompose(result -> {
+ if (result.status() == DocumentTreeUpdateResult.Status.INVALID_PATH) {
+ return Tools.exceptionalFuture(new NoSuchDocumentPathException());
+ } else if (result.status() == DocumentTreeUpdateResult.Status.ILLEGAL_MODIFICATION) {
+ return Tools.exceptionalFuture(new IllegalDocumentModificationException());
+ } else {
+ return CompletableFuture.completedFuture(result);
+ }
+ }).thenApply(result -> result.oldValue());
+ }
+
+ @Override
+ public CompletableFuture<Boolean> create(DocumentPath path, byte[] value) {
+ return client.submit(new Update(checkNotNull(path), checkNotNull(value), Match.ifNull(), Match.any()))
+ .thenCompose(result -> {
+ if (result.status() == DocumentTreeUpdateResult.Status.INVALID_PATH) {
+ return Tools.exceptionalFuture(new NoSuchDocumentPathException());
+ } else if (result.status() == DocumentTreeUpdateResult.Status.ILLEGAL_MODIFICATION) {
+ return Tools.exceptionalFuture(new IllegalDocumentModificationException());
+ } else {
+ return CompletableFuture.completedFuture(result);
+ }
+ }).thenApply(result -> result.created());
+ }
+
+ @Override
+ public CompletableFuture<Boolean> replace(DocumentPath path, byte[] newValue, long version) {
+ return client.submit(new Update(checkNotNull(path), newValue, Match.any(), Match.ifValue(version)))
+ .thenApply(result -> result.updated());
+ }
+
+ @Override
+ public CompletableFuture<Boolean> replace(DocumentPath path, byte[] newValue, byte[] currentValue) {
+ return client.submit(new Update(checkNotNull(path), newValue, Match.ifValue(currentValue), Match.any()))
+ .thenCompose(result -> {
+ if (result.status() == DocumentTreeUpdateResult.Status.INVALID_PATH) {
+ return Tools.exceptionalFuture(new NoSuchDocumentPathException());
+ } else if (result.status() == DocumentTreeUpdateResult.Status.ILLEGAL_MODIFICATION) {
+ return Tools.exceptionalFuture(new IllegalDocumentModificationException());
+ } else {
+ return CompletableFuture.completedFuture(result);
+ }
+ }).thenApply(result -> result.updated());
+ }
+
+ @Override
+ public CompletableFuture<Versioned<byte[]>> removeNode(DocumentPath path) {
+ if (path.equals(DocumentPath.from("root"))) {
+ return Tools.exceptionalFuture(new IllegalDocumentModificationException());
+ }
+ return client.submit(new Update(checkNotNull(path), null, Match.ifNotNull(), Match.any()))
+ .thenCompose(result -> {
+ if (result.status() == DocumentTreeUpdateResult.Status.INVALID_PATH) {
+ return Tools.exceptionalFuture(new NoSuchDocumentPathException());
+ } else if (result.status() == DocumentTreeUpdateResult.Status.ILLEGAL_MODIFICATION) {
+ return Tools.exceptionalFuture(new IllegalDocumentModificationException());
+ } else {
+ return CompletableFuture.completedFuture(result);
+ }
+ }).thenApply(result -> result.oldValue());
+ }
+
+ @Override
+ public CompletableFuture<Void> addListener(DocumentPath path, DocumentTreeListener<byte[]> listener) {
+ checkNotNull(path);
+ checkNotNull(listener);
+ // TODO: Support API that takes an executor
+ if (isListening()) {
+ eventListeners.putIfAbsent(listener, MoreExecutors.directExecutor());
+ return CompletableFuture.completedFuture(null);
+ } else {
+ return client.submit(new Listen(path))
+ .thenRun(() -> eventListeners.put(listener, MoreExecutors.directExecutor()));
+ }
+ }
+
+ @Override
+ public CompletableFuture<Void> removeListener(DocumentTreeListener<byte[]> listener) {
+ checkNotNull(listener);
+ if (eventListeners.remove(listener) != null && eventListeners.isEmpty()) {
+ return client.submit(new Unlisten()).thenApply(v -> null);
+ }
+ return CompletableFuture.completedFuture(null);
+ }
+
+ private boolean isListening() {
+ return !eventListeners.isEmpty();
+ }
+
+ private void processTreeUpdates(List<DocumentTreeEvent<byte[]>> events) {
+ events.forEach(event ->
+ eventListeners.forEach((listener, executor) -> executor.execute(() -> listener.event(event))));
+ }
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTreeCommands.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTreeCommands.java
new file mode 100644
index 0000000..a6b406c
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTreeCommands.java
@@ -0,0 +1,273 @@
+/*
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.store.primitives.resources.impl;
+
+import io.atomix.catalyst.buffer.BufferInput;
+import io.atomix.catalyst.buffer.BufferOutput;
+import io.atomix.catalyst.serializer.CatalystSerializable;
+import io.atomix.catalyst.serializer.SerializableTypeResolver;
+import io.atomix.catalyst.serializer.Serializer;
+import io.atomix.catalyst.serializer.SerializerRegistry;
+import io.atomix.copycat.Command;
+import io.atomix.copycat.Query;
+
+import java.util.Map;
+
+import org.onlab.util.Match;
+import org.onosproject.store.service.DocumentPath;
+import org.onosproject.store.service.Versioned;
+
+import com.google.common.base.MoreObjects;
+
+/**
+ * {@link AtomixDocumentTree} resource state machine operations.
+ */
+public class AtomixDocumentTreeCommands {
+
+ /**
+ * Abstract DocumentTree operation.
+ */
+ public abstract static class DocumentTreeOperation<V> implements CatalystSerializable {
+
+ private DocumentPath path;
+
+ DocumentTreeOperation(DocumentPath path) {
+ this.path = path;
+ }
+
+ public DocumentPath path() {
+ return path;
+ }
+
+ @Override
+ public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
+ serializer.writeObject(path, buffer);
+ }
+
+ @Override
+ public void readObject(BufferInput<?> buffer, Serializer serializer) {
+ path = serializer.readObject(buffer);
+ }
+ }
+
+ /**
+ * Abstract DocumentTree query.
+ */
+ @SuppressWarnings("serial")
+ public abstract static class DocumentTreeQuery<V> extends DocumentTreeOperation<V> implements Query<V> {
+
+ DocumentTreeQuery(DocumentPath path) {
+ super(path);
+ }
+
+ @Override
+ public ConsistencyLevel consistency() {
+ return ConsistencyLevel.SEQUENTIAL;
+ }
+ }
+
+ /**
+ * Abstract DocumentTree command.
+ */
+ @SuppressWarnings("serial")
+ public abstract static class DocumentTreeCommand<V> extends DocumentTreeOperation<V> implements Command<V> {
+
+ DocumentTreeCommand(DocumentPath path) {
+ super(path);
+ }
+ }
+
+ /**
+ * DocumentTree#get query.
+ */
+ @SuppressWarnings("serial")
+ public static class Get extends DocumentTreeQuery<Versioned<byte[]>> {
+ public Get() {
+ super(null);
+ }
+
+ public Get(DocumentPath path) {
+ super(path);
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(getClass())
+ .add("path", path())
+ .toString();
+ }
+ }
+
+ /**
+ * DocumentTree#getChildren query.
+ */
+ @SuppressWarnings("serial")
+ public static class GetChildren extends DocumentTreeQuery<Map<String, Versioned<byte[]>>> {
+ public GetChildren() {
+ super(null);
+ }
+
+ public GetChildren(DocumentPath path) {
+ super(path);
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(getClass())
+ .add("path", path())
+ .toString();
+ }
+ }
+
+ /**
+ * DocumentTree update command.
+ */
+ @SuppressWarnings("serial")
+ public static class Update extends DocumentTreeCommand<DocumentTreeUpdateResult<byte[]>> {
+
+ private byte[] value;
+ private Match<byte[]> valueMatch;
+ private Match<Long> versionMatch;
+
+ public Update() {
+ super(null);
+ this.value = null;
+ this.valueMatch = null;
+ this.versionMatch = null;
+ }
+
+ public Update(DocumentPath path, byte[] value, Match<byte[]> valueMatch, Match<Long> versionMatch) {
+ super(path);
+ this.value = value;
+ this.valueMatch = valueMatch;
+ this.versionMatch = versionMatch;
+ }
+
+ public byte[] value() {
+ return value;
+ }
+
+ public Match<byte[]> valueMatch() {
+ return valueMatch;
+ }
+
+ public Match<Long> versionMatch() {
+ return versionMatch;
+ }
+
+ @Override
+ public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
+ super.writeObject(buffer, serializer);
+ serializer.writeObject(value, buffer);
+ serializer.writeObject(valueMatch, buffer);
+ serializer.writeObject(versionMatch, buffer);
+ }
+
+ @Override
+ public void readObject(BufferInput<?> buffer, Serializer serializer) {
+ super.readObject(buffer, serializer);
+ value = serializer.readObject(buffer);
+ valueMatch = serializer.readObject(buffer);
+ versionMatch = serializer.readObject(buffer);
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(getClass())
+ .add("path", path())
+ .add("value", value)
+ .add("valueMatch", valueMatch)
+ .add("versionMatch", versionMatch)
+ .toString();
+ }
+ }
+
+ /**
+ * Clear command.
+ */
+ @SuppressWarnings("serial")
+ public static class Clear implements Command<Void>, CatalystSerializable {
+ @Override
+ public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
+ }
+
+ @Override
+ public void readObject(BufferInput<?> buffer, Serializer serializer) {
+ }
+ }
+
+ /**
+ * Change listen.
+ */
+ @SuppressWarnings("serial")
+ public static class Listen extends DocumentTreeCommand<Void> {
+
+ public Listen() {
+ this(DocumentPath.from("root"));
+ }
+
+ public Listen(DocumentPath path) {
+ super(path);
+ }
+
+ @Override
+ public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
+ }
+
+ @Override
+ public void readObject(BufferInput<?> buffer, Serializer serializer) {
+ }
+ }
+
+ /**
+ * Change unlisten.
+ */
+ @SuppressWarnings("serial")
+ public static class Unlisten extends DocumentTreeCommand<Void> {
+
+ public Unlisten() {
+ this(DocumentPath.from("root"));
+ }
+
+ public Unlisten(DocumentPath path) {
+ super(path);
+ }
+
+ @Override
+ public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
+ }
+
+ @Override
+ public void readObject(BufferInput<?> buffer, Serializer serializer) {
+ }
+ }
+
+ /**
+ * DocumentTree command type resolver.
+ */
+ public static class TypeResolver implements SerializableTypeResolver {
+ @Override
+ public void resolve(SerializerRegistry registry) {
+ registry.register(Get.class, -911);
+ registry.register(GetChildren.class, -912);
+ registry.register(Update.class, -913);
+ registry.register(Listen.class, -914);
+ registry.register(Unlisten.class, -915);
+ registry.register(Clear.class, -916);
+ }
+ }
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTreeFactory.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTreeFactory.java
new file mode 100644
index 0000000..4282566
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTreeFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.resources.impl;
+
+import io.atomix.catalyst.serializer.SerializableTypeResolver;
+import io.atomix.copycat.client.CopycatClient;
+import io.atomix.resource.ResourceFactory;
+import io.atomix.resource.ResourceStateMachine;
+
+import java.util.Properties;
+
+/**
+ * {@link AtomixDocumentTree} resource factory.
+ *
+ */
+public class AtomixDocumentTreeFactory implements ResourceFactory<AtomixDocumentTree> {
+
+ @Override
+ public SerializableTypeResolver createSerializableTypeResolver() {
+ return new AtomixDocumentTreeCommands.TypeResolver();
+ }
+
+ @Override
+ public ResourceStateMachine createStateMachine(Properties config) {
+ return new AtomixDocumentTreeState(config);
+ }
+
+ @Override
+ public AtomixDocumentTree createInstance(CopycatClient client, Properties options) {
+ return new AtomixDocumentTree(client, options);
+ }
+ }
\ No newline at end of file
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTreeState.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTreeState.java
new file mode 100644
index 0000000..77f548b
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTreeState.java
@@ -0,0 +1,297 @@
+/*
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.store.primitives.resources.impl;
+
+import static org.slf4j.LoggerFactory.getLogger;
+import io.atomix.copycat.server.Commit;
+import io.atomix.copycat.server.Snapshottable;
+import io.atomix.copycat.server.StateMachineExecutor;
+import io.atomix.copycat.server.session.ServerSession;
+import io.atomix.copycat.server.session.SessionListener;
+import io.atomix.copycat.server.storage.snapshot.SnapshotReader;
+import io.atomix.copycat.server.storage.snapshot.SnapshotWriter;
+import io.atomix.resource.ResourceStateMachine;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Queue;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+
+import org.onlab.util.Match;
+import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeCommands.Clear;
+import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeCommands.Get;
+import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeCommands.GetChildren;
+import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeCommands.Listen;
+import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeCommands.Unlisten;
+import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeCommands.Update;
+import org.onosproject.store.primitives.resources.impl.DocumentTreeUpdateResult.Status;
+import org.onosproject.store.service.DocumentPath;
+import org.onosproject.store.service.DocumentTree;
+import org.onosproject.store.service.DocumentTreeEvent;
+import org.onosproject.store.service.DocumentTreeEvent.Type;
+import org.onosproject.store.service.IllegalDocumentModificationException;
+import org.onosproject.store.service.NoSuchDocumentPathException;
+import org.onosproject.store.service.Versioned;
+import org.slf4j.Logger;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Queues;
+
+/**
+ * State Machine for {@link AtomixDocumentTree} resource.
+ */
+public class AtomixDocumentTreeState
+ extends ResourceStateMachine
+ implements SessionListener, Snapshottable {
+
+ private final Logger log = getLogger(getClass());
+ private final Map<Long, Commit<? extends Listen>> listeners = new HashMap<>();
+ private final AtomicLong versionCounter = new AtomicLong(0);
+ private final DocumentTree<TreeNodeValue> docTree = new DefaultDocumentTree<>(versionCounter::incrementAndGet);
+
+ public AtomixDocumentTreeState(Properties properties) {
+ super(properties);
+ }
+
+ @Override
+ public void snapshot(SnapshotWriter writer) {
+ writer.writeLong(versionCounter.get());
+ }
+
+ @Override
+ public void install(SnapshotReader reader) {
+ versionCounter.set(reader.readLong());
+ }
+
+ @Override
+ protected void configure(StateMachineExecutor executor) {
+ // Listeners
+ executor.register(Listen.class, this::listen);
+ executor.register(Unlisten.class, this::unlisten);
+ // queries
+ executor.register(Get.class, this::get);
+ executor.register(GetChildren.class, this::getChildren);
+ // commands
+ executor.register(Update.class, this::update);
+ executor.register(Clear.class, this::clear);
+ }
+
+ protected void listen(Commit<? extends Listen> commit) {
+ Long sessionId = commit.session().id();
+ if (listeners.putIfAbsent(sessionId, commit) != null) {
+ commit.close();
+ return;
+ }
+ commit.session()
+ .onStateChange(
+ state -> {
+ if (state == ServerSession.State.CLOSED
+ || state == ServerSession.State.EXPIRED) {
+ Commit<? extends Listen> listener = listeners.remove(sessionId);
+ if (listener != null) {
+ listener.close();
+ }
+ }
+ });
+ }
+
+ protected void unlisten(Commit<? extends Unlisten> commit) {
+ try {
+ closeListener(commit.session().id());
+ } finally {
+ commit.close();
+ }
+ }
+
+ protected Versioned<byte[]> get(Commit<? extends Get> commit) {
+ try {
+ Versioned<TreeNodeValue> value = docTree.get(commit.operation().path());
+ return value == null ? null : value.map(node -> node == null ? null : node.value());
+ } finally {
+ commit.close();
+ }
+ }
+
+ protected Map<String, Versioned<byte[]>> getChildren(Commit<? extends GetChildren> commit) {
+ try {
+ Map<String, Versioned<TreeNodeValue>> children = docTree.getChildren(commit.operation().path());
+ return children == null
+ ? null : Maps.newHashMap(Maps.transformValues(children,
+ value -> value.map(TreeNodeValue::value)));
+ } finally {
+ commit.close();
+ }
+ }
+
+ protected DocumentTreeUpdateResult<byte[]> update(Commit<? extends Update> commit) {
+ DocumentTreeUpdateResult<byte[]> result = null;
+ DocumentPath path = commit.operation().path();
+ boolean updated = false;
+ Versioned<TreeNodeValue> currentValue = docTree.get(path);
+ try {
+ Match<Long> versionMatch = commit.operation().versionMatch();
+ Match<byte[]> valueMatch = commit.operation().valueMatch();
+
+ if (versionMatch.matches(currentValue == null ? null : currentValue.version())
+ && valueMatch.matches(currentValue == null ? null : currentValue.value().value())) {
+ if (commit.operation().value() == null) {
+ docTree.removeNode(path);
+ } else {
+ docTree.set(path, new NonTransactionalCommit(commit));
+ }
+ updated = true;
+ }
+ Versioned<TreeNodeValue> newValue = updated ? docTree.get(path) : currentValue;
+ Status updateStatus = updated
+ ? Status.OK : commit.operation().value() == null ? Status.INVALID_PATH : Status.NOOP;
+ result = new DocumentTreeUpdateResult<>(path,
+ updateStatus,
+ newValue == null
+ ? null : newValue.map(TreeNodeValue::value),
+ currentValue == null
+ ? null : currentValue.map(TreeNodeValue::value));
+ } catch (IllegalDocumentModificationException e) {
+ result = DocumentTreeUpdateResult.illegalModification(path);
+ } catch (NoSuchDocumentPathException e) {
+ result = DocumentTreeUpdateResult.invalidPath(path);
+ } catch (Exception e) {
+ log.error("Failed to apply {} to state machine", commit.operation(), e);
+ throw Throwables.propagate(e);
+ } finally {
+ if (updated) {
+ if (currentValue != null) {
+ currentValue.value().discard();
+ }
+ } else {
+ commit.close();
+ }
+ }
+ notifyListeners(path, result);
+ return result;
+ }
+
+ protected void clear(Commit<? extends Clear> commit) {
+ try {
+ Queue<DocumentPath> toClearQueue = Queues.newArrayDeque();
+ Map<String, Versioned<TreeNodeValue>> topLevelChildren = docTree.getChildren(DocumentPath.from("root"));
+ toClearQueue.addAll(topLevelChildren.keySet()
+ .stream()
+ .map(name -> new DocumentPath(name, DocumentPath.from("root")))
+ .collect(Collectors.toList()));
+ while (!toClearQueue.isEmpty()) {
+ DocumentPath path = toClearQueue.remove();
+ Map<String, Versioned<TreeNodeValue>> children = docTree.getChildren(path);
+ if (children.size() == 0) {
+ docTree.removeNode(path).value().discard();
+ } else {
+ children.keySet()
+ .stream()
+ .forEach(name -> toClearQueue.add(new DocumentPath(name, path)));
+ toClearQueue.add(path);
+ }
+ }
+ } finally {
+ commit.close();
+ }
+ }
+
+ /**
+ * Interface implemented by tree node values.
+ */
+ private interface TreeNodeValue {
+ /**
+ * Returns the raw {@code byte[]}.
+ *
+ * @return raw value
+ */
+ byte[] value();
+
+ /**
+ * Discards the value by invoke appropriate clean up actions.
+ */
+ void discard();
+ }
+
+ /**
+ * A {@code TreeNodeValue} that is derived from a non-transactional update
+ * i.e. via any standard tree update operation.
+ */
+ private class NonTransactionalCommit implements TreeNodeValue {
+ private final Commit<? extends Update> commit;
+
+ public NonTransactionalCommit(Commit<? extends Update> commit) {
+ this.commit = commit;
+ }
+
+ @Override
+ public byte[] value() {
+ return commit.operation().value();
+ }
+
+ @Override
+ public void discard() {
+ commit.close();
+ }
+ }
+
+ private void notifyListeners(DocumentPath path, DocumentTreeUpdateResult<byte[]> result) {
+ if (result.status() != Status.OK) {
+ return;
+ }
+ DocumentTreeEvent<byte[]> event =
+ new DocumentTreeEvent<>(path,
+ result.created() ? Type.CREATED : result.newValue() == null ? Type.DELETED : Type.UPDATED,
+ Optional.ofNullable(result.newValue()),
+ Optional.ofNullable(result.oldValue()));
+ Object message = ImmutableList.of(event);
+ listeners.values().forEach(commit -> {
+ commit.session().publish(AtomixDocumentTree.CHANGE_SUBJECT, message);
+ System.out.println("Sent " + message + " to " + commit.session().id());
+ });
+ }
+
+ @Override
+ public void register(ServerSession session) {
+ }
+
+ @Override
+ public void unregister(ServerSession session) {
+ closeListener(session.id());
+ }
+
+ @Override
+ public void expire(ServerSession session) {
+ closeListener(session.id());
+ }
+
+ @Override
+ public void close(ServerSession session) {
+ closeListener(session.id());
+ }
+
+ private void closeListener(Long sessionId) {
+ Commit<? extends Listen> commit = listeners.remove(sessionId);
+ if (commit != null) {
+ commit.close();
+ }
+ }
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/DefaultDocumentTree.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/DefaultDocumentTree.java
index fdfe9dd..6589a98 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/DefaultDocumentTree.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/DefaultDocumentTree.java
@@ -19,7 +19,7 @@
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import org.onosproject.store.service.DocumentPath;
import org.onosproject.store.service.DocumentTree;
@@ -30,6 +30,7 @@
import org.onosproject.store.service.Versioned;
import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
import com.google.common.collect.Maps;
/**
@@ -41,10 +42,17 @@
private static final DocumentPath ROOT_PATH = DocumentPath.from("root");
private final DefaultDocumentTreeNode<V> root;
- private final AtomicInteger versionCounter = new AtomicInteger(0);
+ private final Supplier<Long> versionSupplier;
public DefaultDocumentTree() {
- root = new DefaultDocumentTreeNode<V>(ROOT_PATH, null, nextVersion(), null);
+ AtomicLong versionCounter = new AtomicLong(0);
+ versionSupplier = versionCounter::incrementAndGet;
+ root = new DefaultDocumentTreeNode<V>(ROOT_PATH, null, versionSupplier.get(), null);
+ }
+
+ public DefaultDocumentTree(Supplier<Long> versionSupplier) {
+ root = new DefaultDocumentTreeNode<V>(ROOT_PATH, null, versionSupplier.get(), null);
+ this.versionSupplier = versionSupplier;
}
@Override
@@ -74,7 +82,7 @@
checkRootModification(path);
DefaultDocumentTreeNode<V> node = getNode(path);
if (node != null) {
- return node.update(value, nextVersion());
+ return node.update(value, versionSupplier.get());
} else {
create(path, value);
return null;
@@ -93,7 +101,7 @@
if (parentNode == null) {
throw new IllegalDocumentModificationException();
}
- parentNode.addChild(simpleName(path), value, nextVersion());
+ parentNode.addChild(simpleName(path), value, versionSupplier.get());
return true;
}
@@ -159,10 +167,6 @@
return currentNode;
}
- private long nextVersion() {
- return versionCounter.incrementAndGet();
- }
-
private String simpleName(DocumentPath path) {
return path.pathElements().get(path.pathElements().size() - 1);
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/DocumentTreeUpdateResult.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/DocumentTreeUpdateResult.java
new file mode 100644
index 0000000..52303b2
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/DocumentTreeUpdateResult.java
@@ -0,0 +1,116 @@
+/*
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.store.primitives.resources.impl;
+
+import org.onosproject.store.service.DocumentPath;
+import org.onosproject.store.service.Versioned;
+
+import com.google.common.base.MoreObjects;
+
+/**
+ * Result of a document tree node update operation.
+ * <p>
+ * Both old and new values are accessible along with a status of update.
+ *
+ * @param <V> value type
+ */
+public class DocumentTreeUpdateResult<V> {
+
+ public enum Status {
+ /**
+ * Indicates a successful update.
+ */
+ OK,
+
+ /**
+ * Indicates a noop i.e. existing and new value are both same.
+ */
+ NOOP,
+
+ /**
+ * Indicates a failed update due to a write lock.
+ */
+ WRITE_LOCK,
+
+ /**
+ * Indicates a failed update due to a invalid path.
+ */
+ INVALID_PATH,
+
+ /**
+ * Indicates a failed update due to a illegal modification attempt.
+ */
+ ILLEGAL_MODIFICATION,
+ }
+
+ private final DocumentPath path;
+ private final Status status;
+ private final Versioned<V> oldValue;
+ private final Versioned<V> newValue;
+
+ public DocumentTreeUpdateResult(DocumentPath path,
+ Status status,
+ Versioned<V> newValue,
+ Versioned<V> oldValue) {
+ this.status = status;
+ this.path = path;
+ this.newValue = newValue;
+ this.oldValue = oldValue;
+ }
+
+ public static <V> DocumentTreeUpdateResult<V> invalidPath(DocumentPath path) {
+ return new DocumentTreeUpdateResult<>(path, Status.INVALID_PATH, null, null);
+ }
+
+ public static <V> DocumentTreeUpdateResult<V> illegalModification(DocumentPath path) {
+ return new DocumentTreeUpdateResult<>(path, Status.ILLEGAL_MODIFICATION, null, null);
+ }
+
+ public Status status() {
+ return status;
+ }
+
+ public DocumentPath path() {
+ return path;
+ }
+
+ public Versioned<V> oldValue() {
+ return oldValue;
+ }
+
+ public Versioned<V> newValue() {
+ return this.newValue;
+ }
+
+ public boolean updated() {
+ return status == Status.OK;
+ }
+
+ public boolean created() {
+ return updated() && oldValue == null;
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(getClass())
+ .add("path", path)
+ .add("status", status)
+ .add("newValue", newValue)
+ .add("oldValue", oldValue)
+ .toString();
+ }
+}