[ONOS-6594] Upgrade to Atomix 2.0.0

Change-Id: I6534bca1c8570b4e017f682953b876da29146675
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTree.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTree.java
index 3875e55..1730fe8 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTree.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixDocumentTree.java
@@ -16,70 +16,65 @@
 
 package org.onosproject.store.primitives.resources.impl;
 
-import static com.google.common.base.Preconditions.checkNotNull;
-import static org.onosproject.store.primitives.resources.impl.DocumentTreeUpdateResult.Status.ILLEGAL_MODIFICATION;
-import static org.onosproject.store.primitives.resources.impl.DocumentTreeUpdateResult.Status.INVALID_PATH;
-import static org.onosproject.store.primitives.resources.impl.DocumentTreeUpdateResult.Status.OK;
-import io.atomix.copycat.client.CopycatClient;
-import io.atomix.resource.AbstractResource;
-import io.atomix.resource.ResourceTypeInfo;
-
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.Properties;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 
+import com.google.common.util.concurrent.MoreExecutors;
+import io.atomix.protocols.raft.proxy.RaftProxy;
+import org.onlab.util.KryoNamespace;
 import org.onlab.util.Match;
 import org.onlab.util.Tools;
-import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeCommands.Clear;
-import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeCommands.Get;
-import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeCommands.GetChildren;
-import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeCommands.Listen;
-import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeCommands.Unlisten;
-import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeCommands.Update;
+import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.Get;
+import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.GetChildren;
+import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.Listen;
+import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.Unlisten;
+import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.Update;
+import org.onosproject.store.serializers.KryoNamespaces;
 import org.onosproject.store.service.AsyncDocumentTree;
 import org.onosproject.store.service.DocumentPath;
 import org.onosproject.store.service.DocumentTreeEvent;
 import org.onosproject.store.service.DocumentTreeListener;
 import org.onosproject.store.service.IllegalDocumentModificationException;
 import org.onosproject.store.service.NoSuchDocumentPathException;
+import org.onosproject.store.service.Serializer;
 import org.onosproject.store.service.Versioned;
 
-import com.google.common.util.concurrent.MoreExecutors;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeEvents.CHANGE;
+import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.ADD_LISTENER;
+import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.CLEAR;
+import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.GET;
+import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.GET_CHILDREN;
+import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.REMOVE_LISTENER;
+import static org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeOperations.UPDATE;
+import static org.onosproject.store.primitives.resources.impl.DocumentTreeUpdateResult.Status.ILLEGAL_MODIFICATION;
+import static org.onosproject.store.primitives.resources.impl.DocumentTreeUpdateResult.Status.INVALID_PATH;
+import static org.onosproject.store.primitives.resources.impl.DocumentTreeUpdateResult.Status.OK;
 
 /**
  * Distributed resource providing the {@link AsyncDocumentTree} primitive.
  */
-@ResourceTypeInfo(id = -156, factory = AtomixDocumentTreeFactory.class)
-public class AtomixDocumentTree extends AbstractResource<AtomixDocumentTree>
-    implements AsyncDocumentTree<byte[]> {
+public class AtomixDocumentTree extends AbstractRaftPrimitive implements AsyncDocumentTree<byte[]> {
+    private static final Serializer SERIALIZER = Serializer.using(KryoNamespace.newBuilder()
+            .register(KryoNamespaces.BASIC)
+            .register(AtomixDocumentTreeOperations.NAMESPACE)
+            .register(AtomixDocumentTreeEvents.NAMESPACE)
+            .build());
 
     private final Map<DocumentTreeListener<byte[]>, InternalListener> eventListeners = new HashMap<>();
-    public static final String CHANGE_SUBJECT = "changeEvents";
 
-    protected AtomixDocumentTree(CopycatClient client, Properties options) {
-        super(client, options);
-    }
-
-    @Override
-    public CompletableFuture<AtomixDocumentTree> open() {
-        return super.open().thenApply(result -> {
-            client.onStateChange(state -> {
-                if (state == CopycatClient.State.CONNECTED && isListening()) {
-                    client.submit(new Listen());
-                }
-            });
-            client.onEvent(CHANGE_SUBJECT, this::processTreeUpdates);
-            return result;
+    public AtomixDocumentTree(RaftProxy proxy) {
+        super(proxy);
+        proxy.addStateChangeListener(state -> {
+            if (state == RaftProxy.State.CONNECTED && isListening()) {
+                proxy.invoke(ADD_LISTENER, SERIALIZER::encode, new Listen());
+            }
         });
-    }
-
-    @Override
-    public String name() {
-        return null;
+        proxy.addEventListener(CHANGE, SERIALIZER::decode, this::processTreeUpdates);
     }
 
     @Override
@@ -89,7 +84,7 @@
 
     @Override
     public CompletableFuture<Void> destroy() {
-        return client.submit(new Clear());
+        return proxy.invoke(CLEAR);
     }
 
     @Override
@@ -99,17 +94,20 @@
 
     @Override
     public CompletableFuture<Map<String, Versioned<byte[]>>> getChildren(DocumentPath path) {
-        return client.submit(new GetChildren(checkNotNull(path)));
+        return proxy.invoke(GET_CHILDREN, SERIALIZER::encode, new GetChildren(checkNotNull(path)), SERIALIZER::decode);
     }
 
     @Override
     public CompletableFuture<Versioned<byte[]>> get(DocumentPath path) {
-        return client.submit(new Get(checkNotNull(path)));
+        return proxy.invoke(GET, SERIALIZER::encode, new Get(checkNotNull(path)), SERIALIZER::decode);
     }
 
     @Override
     public CompletableFuture<Versioned<byte[]>> set(DocumentPath path, byte[] value) {
-        return client.submit(new Update(checkNotNull(path), Optional.ofNullable(value), Match.any(), Match.any()))
+        return proxy.<Update, DocumentTreeUpdateResult<byte[]>>invoke(UPDATE,
+                SERIALIZER::encode,
+                new Update(checkNotNull(path), Optional.ofNullable(value), Match.any(), Match.any()),
+                SERIALIZER::decode)
                 .thenCompose(result -> {
                     if (result.status() == INVALID_PATH) {
                         return Tools.exceptionalFuture(new NoSuchDocumentPathException());
@@ -138,7 +136,7 @@
                 .thenCompose(status -> {
                     if (status == ILLEGAL_MODIFICATION) {
                         return createRecursive(path.parent(), null)
-                                    .thenCompose(r -> createInternal(path, value).thenApply(v -> true));
+                                .thenCompose(r -> createInternal(path, value).thenApply(v -> true));
                     }
                     return CompletableFuture.completedFuture(status == OK);
                 });
@@ -146,19 +144,24 @@
 
     @Override
     public CompletableFuture<Boolean> replace(DocumentPath path, byte[] newValue, long version) {
-        return client.submit(new Update(checkNotNull(path),
-                                        Optional.ofNullable(newValue),
-                                        Match.any(),
-                                        Match.ifValue(version)))
+        return proxy.<Update, DocumentTreeUpdateResult<byte[]>>invoke(UPDATE,
+                SERIALIZER::encode,
+                new Update(checkNotNull(path),
+                        Optional.ofNullable(newValue),
+                        Match.any(),
+                        Match.ifValue(version)), SERIALIZER::decode)
                 .thenApply(result -> result.updated());
     }
 
     @Override
     public CompletableFuture<Boolean> replace(DocumentPath path, byte[] newValue, byte[] currentValue) {
-        return client.submit(new Update(checkNotNull(path),
-                                        Optional.ofNullable(newValue),
-                                        Match.ifValue(currentValue),
-                                        Match.any()))
+        return proxy.<Update, DocumentTreeUpdateResult<byte[]>>invoke(UPDATE,
+                SERIALIZER::encode,
+                new Update(checkNotNull(path),
+                        Optional.ofNullable(newValue),
+                        Match.ifValue(currentValue),
+                        Match.any()),
+                SERIALIZER::decode)
                 .thenCompose(result -> {
                     if (result.status() == INVALID_PATH) {
                         return Tools.exceptionalFuture(new NoSuchDocumentPathException());
@@ -175,7 +178,10 @@
         if (path.equals(DocumentPath.from("root"))) {
             return Tools.exceptionalFuture(new IllegalDocumentModificationException());
         }
-        return client.submit(new Update(checkNotNull(path), null, Match.any(), Match.ifNotNull()))
+        return proxy.<Update, DocumentTreeUpdateResult<byte[]>>invoke(UPDATE,
+                SERIALIZER::encode,
+                new Update(checkNotNull(path), null, Match.any(), Match.ifNotNull()),
+                SERIALIZER::decode)
                 .thenCompose(result -> {
                     if (result.status() == INVALID_PATH) {
                         return Tools.exceptionalFuture(new NoSuchDocumentPathException());
@@ -194,8 +200,8 @@
         InternalListener internalListener = new InternalListener(path, listener, MoreExecutors.directExecutor());
         // TODO: Support API that takes an executor
         if (!eventListeners.containsKey(listener)) {
-            return client.submit(new Listen(path))
-                         .thenRun(() -> eventListeners.put(listener, internalListener));
+            return proxy.invoke(ADD_LISTENER, SERIALIZER::encode, new Listen(path))
+                    .thenRun(() -> eventListeners.put(listener, internalListener));
         }
         return CompletableFuture.completedFuture(null);
     }
@@ -205,14 +211,18 @@
         checkNotNull(listener);
         InternalListener internalListener = eventListeners.remove(listener);
         if  (internalListener != null && eventListeners.isEmpty()) {
-            return client.submit(new Unlisten(internalListener.path)).thenApply(v -> null);
+            return proxy.invoke(REMOVE_LISTENER, SERIALIZER::encode, new Unlisten(internalListener.path))
+                    .thenApply(v -> null);
         }
         return CompletableFuture.completedFuture(null);
     }
 
     private CompletableFuture<DocumentTreeUpdateResult.Status> createInternal(DocumentPath path, byte[] value) {
-        return client.submit(new Update(checkNotNull(path), Optional.ofNullable(value), Match.any(), Match.ifNull()))
-                     .thenApply(result -> result.status());
+        return proxy.<Update, DocumentTreeUpdateResult<byte[]>>invoke(UPDATE,
+                SERIALIZER::encode,
+                new Update(checkNotNull(path), Optional.ofNullable(value), Match.any(), Match.ifNull()),
+                SERIALIZER::decode)
+                .thenApply(result -> result.status());
     }
 
     private boolean isListening() {
@@ -242,4 +252,4 @@
             }
         }
     }
-}
+}
\ No newline at end of file