[ONOS-6594] Upgrade to Atomix 2.0.0
Change-Id: I6534bca1c8570b4e017f682953b876da29146675
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTree.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTree.java
index 3875e55..1730fe8 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTree.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTree.java
@@ -16,70 +16,65 @@
package org.onosproject.store.primitives.resources.impl;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static org.onosproject.store.primitives.resources.impl.DocumentTreeUpdateResult.Status.ILLEGAL_MODIFICATION;
-import static org.onosproject.store.primitives.resources.impl.DocumentTreeUpdateResult.Status.INVALID_PATH;
-import static org.onosproject.store.primitives.resources.impl.DocumentTreeUpdateResult.Status.OK;
-import io.atomix.copycat.client.CopycatClient;
-import io.atomix.resource.AbstractResource;
-import io.atomix.resource.ResourceTypeInfo;
-
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
+import com.google.common.util.concurrent.MoreExecutors;
+import io.atomix.protocols.raft.proxy.RaftProxy;
+import org.onlab.util.KryoNamespace;
import org.onlab.util.Match;
import org.onlab.util.Tools;
-import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeCommands.Clear;
-import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeCommands.Get;
-import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeCommands.GetChildren;
-import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeCommands.Listen;
-import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeCommands.Unlisten;
-import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeCommands.Update;
+import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.Get;
+import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.GetChildren;
+import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.Listen;
+import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.Unlisten;
+import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.Update;
+import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.AsyncDocumentTree;
import org.onosproject.store.service.DocumentPath;
import org.onosproject.store.service.DocumentTreeEvent;
import org.onosproject.store.service.DocumentTreeListener;
import org.onosproject.store.service.IllegalDocumentModificationException;
import org.onosproject.store.service.NoSuchDocumentPathException;
+import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.Versioned;
-import com.google.common.util.concurrent.MoreExecutors;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeEvents.CHANGE;
+import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.ADD_LISTENER;
+import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.CLEAR;
+import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.GET;
+import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.GET_CHILDREN;
+import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.REMOVE_LISTENER;
+import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.UPDATE;
+import static org.onosproject.store.primitives.resources.impl.DocumentTreeUpdateResult.Status.ILLEGAL_MODIFICATION;
+import static org.onosproject.store.primitives.resources.impl.DocumentTreeUpdateResult.Status.INVALID_PATH;
+import static org.onosproject.store.primitives.resources.impl.DocumentTreeUpdateResult.Status.OK;
/**
* Distributed resource providing the {@link AsyncDocumentTree} primitive.
*/
-@ResourceTypeInfo(id = -156, factory = AtomixDocumentTreeFactory.class)
-public class AtomixDocumentTree extends AbstractResource<AtomixDocumentTree>
- implements AsyncDocumentTree<byte[]> {
+public class AtomixDocumentTree extends AbstractRaftPrimitive implements AsyncDocumentTree<byte[]> {
+ private static final Serializer SERIALIZER = Serializer.using(KryoNamespace.newBuilder()
+ .register(KryoNamespaces.BASIC)
+ .register(AtomixDocumentTreeOperations.NAMESPACE)
+ .register(AtomixDocumentTreeEvents.NAMESPACE)
+ .build());
private final Map<DocumentTreeListener<byte[]>, InternalListener> eventListeners = new HashMap<>();
- public static final String CHANGE_SUBJECT = "changeEvents";
- protected AtomixDocumentTree(CopycatClient client, Properties options) {
- super(client, options);
- }
-
- @Override
- public CompletableFuture<AtomixDocumentTree> open() {
- return super.open().thenApply(result -> {
- client.onStateChange(state -> {
- if (state == CopycatClient.State.CONNECTED && isListening()) {
- client.submit(new Listen());
- }
- });
- client.onEvent(CHANGE_SUBJECT, this::processTreeUpdates);
- return result;
+ public AtomixDocumentTree(RaftProxy proxy) {
+ super(proxy);
+ proxy.addStateChangeListener(state -> {
+ if (state == RaftProxy.State.CONNECTED && isListening()) {
+ proxy.invoke(ADD_LISTENER, SERIALIZER::encode, new Listen());
+ }
});
- }
-
- @Override
- public String name() {
- return null;
+ proxy.addEventListener(CHANGE, SERIALIZER::decode, this::processTreeUpdates);
}
@Override
@@ -89,7 +84,7 @@
@Override
public CompletableFuture<Void> destroy() {
- return client.submit(new Clear());
+ return proxy.invoke(CLEAR);
}
@Override
@@ -99,17 +94,20 @@
@Override
public CompletableFuture<Map<String, Versioned<byte[]>>> getChildren(DocumentPath path) {
- return client.submit(new GetChildren(checkNotNull(path)));
+ return proxy.invoke(GET_CHILDREN, SERIALIZER::encode, new GetChildren(checkNotNull(path)), SERIALIZER::decode);
}
@Override
public CompletableFuture<Versioned<byte[]>> get(DocumentPath path) {
- return client.submit(new Get(checkNotNull(path)));
+ return proxy.invoke(GET, SERIALIZER::encode, new Get(checkNotNull(path)), SERIALIZER::decode);
}
@Override
public CompletableFuture<Versioned<byte[]>> set(DocumentPath path, byte[] value) {
- return client.submit(new Update(checkNotNull(path), Optional.ofNullable(value), Match.any(), Match.any()))
+ return proxy.<Update, DocumentTreeUpdateResult<byte[]>>invoke(UPDATE,
+ SERIALIZER::encode,
+ new Update(checkNotNull(path), Optional.ofNullable(value), Match.any(), Match.any()),
+ SERIALIZER::decode)
.thenCompose(result -> {
if (result.status() == INVALID_PATH) {
return Tools.exceptionalFuture(new NoSuchDocumentPathException());
@@ -138,7 +136,7 @@
.thenCompose(status -> {
if (status == ILLEGAL_MODIFICATION) {
return createRecursive(path.parent(), null)
- .thenCompose(r -> createInternal(path, value).thenApply(v -> true));
+ .thenCompose(r -> createInternal(path, value).thenApply(v -> true));
}
return CompletableFuture.completedFuture(status == OK);
});
@@ -146,19 +144,24 @@
@Override
public CompletableFuture<Boolean> replace(DocumentPath path, byte[] newValue, long version) {
- return client.submit(new Update(checkNotNull(path),
- Optional.ofNullable(newValue),
- Match.any(),
- Match.ifValue(version)))
+ return proxy.<Update, DocumentTreeUpdateResult<byte[]>>invoke(UPDATE,
+ SERIALIZER::encode,
+ new Update(checkNotNull(path),
+ Optional.ofNullable(newValue),
+ Match.any(),
+ Match.ifValue(version)), SERIALIZER::decode)
.thenApply(result -> result.updated());
}
@Override
public CompletableFuture<Boolean> replace(DocumentPath path, byte[] newValue, byte[] currentValue) {
- return client.submit(new Update(checkNotNull(path),
- Optional.ofNullable(newValue),
- Match.ifValue(currentValue),
- Match.any()))
+ return proxy.<Update, DocumentTreeUpdateResult<byte[]>>invoke(UPDATE,
+ SERIALIZER::encode,
+ new Update(checkNotNull(path),
+ Optional.ofNullable(newValue),
+ Match.ifValue(currentValue),
+ Match.any()),
+ SERIALIZER::decode)
.thenCompose(result -> {
if (result.status() == INVALID_PATH) {
return Tools.exceptionalFuture(new NoSuchDocumentPathException());
@@ -175,7 +178,10 @@
if (path.equals(DocumentPath.from("root"))) {
return Tools.exceptionalFuture(new IllegalDocumentModificationException());
}
- return client.submit(new Update(checkNotNull(path), null, Match.any(), Match.ifNotNull()))
+ return proxy.<Update, DocumentTreeUpdateResult<byte[]>>invoke(UPDATE,
+ SERIALIZER::encode,
+ new Update(checkNotNull(path), null, Match.any(), Match.ifNotNull()),
+ SERIALIZER::decode)
.thenCompose(result -> {
if (result.status() == INVALID_PATH) {
return Tools.exceptionalFuture(new NoSuchDocumentPathException());
@@ -194,8 +200,8 @@
InternalListener internalListener = new InternalListener(path, listener, MoreExecutors.directExecutor());
// TODO: Support API that takes an executor
if (!eventListeners.containsKey(listener)) {
- return client.submit(new Listen(path))
- .thenRun(() -> eventListeners.put(listener, internalListener));
+ return proxy.invoke(ADD_LISTENER, SERIALIZER::encode, new Listen(path))
+ .thenRun(() -> eventListeners.put(listener, internalListener));
}
return CompletableFuture.completedFuture(null);
}
@@ -205,14 +211,18 @@
checkNotNull(listener);
InternalListener internalListener = eventListeners.remove(listener);
if (internalListener != null && eventListeners.isEmpty()) {
- return client.submit(new Unlisten(internalListener.path)).thenApply(v -> null);
+ return proxy.invoke(REMOVE_LISTENER, SERIALIZER::encode, new Unlisten(internalListener.path))
+ .thenApply(v -> null);
}
return CompletableFuture.completedFuture(null);
}
private CompletableFuture<DocumentTreeUpdateResult.Status> createInternal(DocumentPath path, byte[] value) {
- return client.submit(new Update(checkNotNull(path), Optional.ofNullable(value), Match.any(), Match.ifNull()))
- .thenApply(result -> result.status());
+ return proxy.<Update, DocumentTreeUpdateResult<byte[]>>invoke(UPDATE,
+ SERIALIZER::encode,
+ new Update(checkNotNull(path), Optional.ofNullable(value), Match.any(), Match.ifNull()),
+ SERIALIZER::decode)
+ .thenApply(result -> result.status());
}
private boolean isListening() {
@@ -242,4 +252,4 @@
}
}
}
-}
+}
\ No newline at end of file