[ONOS-6594] Upgrade to Atomix 2.0.0
Change-Id: I6534bca1c8570b4e017f682953b876da29146675
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTreeService.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTreeService.java
new file mode 100644
index 0000000..6b7c550
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTreeService.java
@@ -0,0 +1,306 @@
+/*
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.store.primitives.resources.impl;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Queues;
+import io.atomix.protocols.raft.event.EventType;
+import io.atomix.protocols.raft.service.AbstractRaftService;
+import io.atomix.protocols.raft.service.Commit;
+import io.atomix.protocols.raft.service.RaftServiceExecutor;
+import io.atomix.protocols.raft.session.RaftSession;
+import io.atomix.protocols.raft.storage.snapshot.SnapshotReader;
+import io.atomix.protocols.raft.storage.snapshot.SnapshotWriter;
+import org.onlab.util.KryoNamespace;
+import org.onlab.util.Match;
+import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.Get;
+import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.GetChildren;
+import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.Listen;
+import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.Unlisten;
+import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.Update;
+import org.onosproject.store.primitives.resources.impl.DocumentTreeUpdateResult.Status;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.DocumentPath;
+import org.onosproject.store.service.DocumentTree;
+import org.onosproject.store.service.DocumentTreeEvent;
+import org.onosproject.store.service.DocumentTreeEvent.Type;
+import org.onosproject.store.service.IllegalDocumentModificationException;
+import org.onosproject.store.service.NoSuchDocumentPathException;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.Versioned;
+
+import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeEvents.CHANGE;
+import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.ADD_LISTENER;
+import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.CLEAR;
+import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.GET;
+import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.GET_CHILDREN;
+import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.REMOVE_LISTENER;
+import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.UPDATE;
+
+/**
+ * State Machine for {@link AtomixDocumentTree} resource.
+ */
+public class AtomixDocumentTreeService extends AbstractRaftService {
+ private final Serializer serializer = Serializer.using(KryoNamespace.newBuilder()
+ .register(KryoNamespaces.BASIC)
+ .register(AtomixDocumentTreeOperations.NAMESPACE)
+ .register(AtomixDocumentTreeEvents.NAMESPACE)
+ .register(new com.esotericsoftware.kryo.Serializer<Listener>() {
+ @Override
+ public void write(Kryo kryo, Output output, Listener listener) {
+ output.writeLong(listener.session.sessionId().id());
+ kryo.writeObject(output, listener.path);
+ }
+
+ @Override
+ public Listener read(Kryo kryo, Input input, Class<Listener> type) {
+ return new Listener(getSessions().getSession(input.readLong()),
+ kryo.readObjectOrNull(input, DocumentPath.class));
+ }
+ }, Listener.class)
+ .register(Versioned.class)
+ .register(DocumentPath.class)
+ .register(new HashMap().keySet().getClass())
+ .register(TreeMap.class)
+ .register(SessionListenCommits.class)
+ .register(new com.esotericsoftware.kryo.Serializer<DefaultDocumentTree>() {
+ @Override
+ public void write(Kryo kryo, Output output, DefaultDocumentTree object) {
+ kryo.writeObject(output, object.root);
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public DefaultDocumentTree read(Kryo kryo, Input input, Class<DefaultDocumentTree> type) {
+ return new DefaultDocumentTree(versionCounter::incrementAndGet,
+ kryo.readObject(input, DefaultDocumentTreeNode.class));
+ }
+ }, DefaultDocumentTree.class)
+ .register(DefaultDocumentTreeNode.class)
+ .build());
+
+ private Map<Long, SessionListenCommits> listeners = new HashMap<>();
+ private AtomicLong versionCounter = new AtomicLong(0);
+ private DocumentTree<byte[]> docTree = new DefaultDocumentTree<>(versionCounter::incrementAndGet);
+
+ @Override
+ public void snapshot(SnapshotWriter writer) {
+ writer.writeLong(versionCounter.get());
+ writer.writeObject(listeners, serializer::encode);
+ writer.writeObject(docTree, serializer::encode);
+ }
+
+ @Override
+ public void install(SnapshotReader reader) {
+ versionCounter = new AtomicLong(reader.readLong());
+ listeners = reader.readObject(serializer::decode);
+ docTree = reader.readObject(serializer::decode);
+ }
+
+ @Override
+ protected void configure(RaftServiceExecutor executor) {
+ // Listeners
+ executor.register(ADD_LISTENER, serializer::decode, this::listen);
+ executor.register(REMOVE_LISTENER, serializer::decode, this::unlisten);
+ // queries
+ executor.register(GET, serializer::decode, this::get, serializer::encode);
+ executor.register(GET_CHILDREN, serializer::decode, this::getChildren, serializer::encode);
+ // commands
+ executor.register(UPDATE, serializer::decode, this::update, serializer::encode);
+ executor.register(CLEAR, this::clear);
+ }
+
+ protected void listen(Commit<? extends Listen> commit) {
+ Long sessionId = commit.session().sessionId().id();
+ listeners.computeIfAbsent(sessionId, k -> new SessionListenCommits())
+ .add(new Listener(commit.session(), commit.value().path()));
+ }
+
+ protected void unlisten(Commit<? extends Unlisten> commit) {
+ Long sessionId = commit.session().sessionId().id();
+ SessionListenCommits listenCommits = listeners.get(sessionId);
+ if (listenCommits != null) {
+ listenCommits.remove(commit);
+ }
+ }
+
+ protected Versioned<byte[]> get(Commit<? extends Get> commit) {
+ try {
+ Versioned<byte[]> value = docTree.get(commit.value().path());
+ return value == null ? null : value.map(node -> node == null ? null : node);
+ } catch (IllegalStateException e) {
+ return null;
+ }
+ }
+
+ protected Map<String, Versioned<byte[]>> getChildren(Commit<? extends GetChildren> commit) {
+ return docTree.getChildren(commit.value().path());
+ }
+
+ protected DocumentTreeUpdateResult<byte[]> update(Commit<? extends Update> commit) {
+ DocumentTreeUpdateResult<byte[]> result = null;
+ DocumentPath path = commit.value().path();
+ boolean updated = false;
+ Versioned<byte[]> currentValue = docTree.get(path);
+ try {
+ Match<Long> versionMatch = commit.value().versionMatch();
+ Match<byte[]> valueMatch = commit.value().valueMatch();
+
+ if (versionMatch.matches(currentValue == null ? null : currentValue.version())
+ && valueMatch.matches(currentValue == null ? null : currentValue.value())) {
+ if (commit.value().value() == null) {
+ docTree.removeNode(path);
+ } else {
+ docTree.set(path, commit.value().value().orElse(null));
+ }
+ updated = true;
+ }
+ Versioned<byte[]> newValue = updated ? docTree.get(path) : currentValue;
+ Status updateStatus = updated
+ ? Status.OK : commit.value().value() == null ? Status.INVALID_PATH : Status.NOOP;
+ result = new DocumentTreeUpdateResult<>(path, updateStatus, newValue, currentValue);
+ } catch (IllegalDocumentModificationException e) {
+ result = DocumentTreeUpdateResult.illegalModification(path);
+ } catch (NoSuchDocumentPathException e) {
+ result = DocumentTreeUpdateResult.invalidPath(path);
+ } catch (Exception e) {
+ getLogger().error("Failed to apply {} to state machine", commit.value(), e);
+ throw Throwables.propagate(e);
+ }
+ notifyListeners(path, result);
+ return result;
+ }
+
+ protected void clear(Commit<Void> commit) {
+ Queue<DocumentPath> toClearQueue = Queues.newArrayDeque();
+ Map<String, Versioned<byte[]>> topLevelChildren = docTree.getChildren(DocumentPath.from("root"));
+ toClearQueue.addAll(topLevelChildren.keySet()
+ .stream()
+ .map(name -> new DocumentPath(name, DocumentPath.from("root")))
+ .collect(Collectors.toList()));
+ while (!toClearQueue.isEmpty()) {
+ DocumentPath path = toClearQueue.remove();
+ Map<String, Versioned<byte[]>> children = docTree.getChildren(path);
+ if (children.size() == 0) {
+ docTree.removeNode(path);
+ } else {
+ children.keySet().forEach(name -> toClearQueue.add(new DocumentPath(name, path)));
+ toClearQueue.add(path);
+ }
+ }
+ }
+
+ private void notifyListeners(DocumentPath path, DocumentTreeUpdateResult<byte[]> result) {
+ if (result.status() != Status.OK) {
+ return;
+ }
+ DocumentTreeEvent<byte[]> event =
+ new DocumentTreeEvent<>(path,
+ result.created() ? Type.CREATED : result.newValue() == null ? Type.DELETED : Type.UPDATED,
+ Optional.ofNullable(result.newValue()),
+ Optional.ofNullable(result.oldValue()));
+
+ listeners.values()
+ .stream()
+ .filter(l -> event.path().isDescendentOf(l.leastCommonAncestorPath()))
+ .forEach(listener -> listener.publish(CHANGE, Arrays.asList(event)));
+ }
+
+ @Override
+ public void onExpire(RaftSession session) {
+ closeListener(session.sessionId().id());
+ }
+
+ @Override
+ public void onClose(RaftSession session) {
+ closeListener(session.sessionId().id());
+ }
+
+ private void closeListener(Long sessionId) {
+ listeners.remove(sessionId);
+ }
+
+ private class SessionListenCommits {
+ private final List<Listener> listeners = Lists.newArrayList();
+ private DocumentPath leastCommonAncestorPath;
+
+ public void add(Listener listener) {
+ listeners.add(listener);
+ recomputeLeastCommonAncestor();
+ }
+
+ public void remove(Commit<? extends Unlisten> commit) {
+ // Remove the first listen commit with path matching path in unlisten commit
+ Iterator<Listener> iterator = listeners.iterator();
+ while (iterator.hasNext()) {
+ Listener listener = iterator.next();
+ if (listener.path().equals(commit.value().path())) {
+ iterator.remove();
+ }
+ }
+ recomputeLeastCommonAncestor();
+ }
+
+ public DocumentPath leastCommonAncestorPath() {
+ return leastCommonAncestorPath;
+ }
+
+ public <M> void publish(EventType topic, M message) {
+ listeners.stream().findAny().ifPresent(listener ->
+ listener.session().publish(topic, serializer::encode, message));
+ }
+
+ private void recomputeLeastCommonAncestor() {
+ this.leastCommonAncestorPath = DocumentPath.leastCommonAncestor(listeners.stream()
+ .map(Listener::path)
+ .collect(Collectors.toList()));
+ }
+ }
+
+ private static class Listener {
+ private final RaftSession session;
+ private final DocumentPath path;
+
+ public Listener(RaftSession session, DocumentPath path) {
+ this.session = session;
+ this.path = path;
+ }
+
+ public DocumentPath path() {
+ return path;
+ }
+
+ public RaftSession session() {
+ return session;
+ }
+ }
+}
\ No newline at end of file