[ONOS-6594] Upgrade to Atomix 2.0.0

Change-Id: I6534bca1c8570b4e017f682953b876da29146675
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java
index 3e45091..825fa98 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java
@@ -15,73 +15,65 @@
  */
 package org.onosproject.store.primitives.resources.impl;
 
-import io.atomix.copycat.client.CopycatClient;
-import io.atomix.resource.AbstractResource;
-import io.atomix.resource.ResourceTypeInfo;
-
-import java.util.Collection;
 import java.util.List;
 import java.util.Map;
-import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Consumer;
-import java.util.function.Function;
 
-import org.onosproject.cluster.Leadership;
-import org.onosproject.cluster.NodeId;
-import org.onosproject.event.Change;
-import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Anoint;
-import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.GetAllLeaderships;
-import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.GetElectedTopics;
-import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.GetLeadership;
-import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Listen;
-import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Promote;
-import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Run;
-import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Unlisten;
-import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Withdraw;
-import org.onosproject.store.service.AsyncLeaderElector;
-
-import com.google.common.collect.ImmutableSet;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
 import com.google.common.collect.Sets;
+import io.atomix.protocols.raft.proxy.RaftProxy;
+import org.onlab.util.KryoNamespace;
+import org.onosproject.cluster.Leadership;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.event.Change;
+import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorOperations.Anoint;
+import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorOperations.GetElectedTopics;
+import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorOperations.GetLeadership;
+import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorOperations.Promote;
+import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorOperations.Run;
+import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorOperations.Withdraw;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.AsyncLeaderElector;
+import org.onosproject.store.service.Serializer;
+
+import static org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorEvents.CHANGE;
+import static org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorOperations.ADD_LISTENER;
+import static org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorOperations.ANOINT;
+import static org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorOperations.EVICT;
+import static org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorOperations.GET_ALL_LEADERSHIPS;
+import static org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorOperations.GET_ELECTED_TOPICS;
+import static org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorOperations.GET_LEADERSHIP;
+import static org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorOperations.PROMOTE;
+import static org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorOperations.REMOVE_LISTENER;
+import static org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorOperations.RUN;
+import static org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorOperations.WITHDRAW;
 
 /**
  * Distributed resource providing the {@link AsyncLeaderElector} primitive.
  */
-@ResourceTypeInfo(id = -152, factory = AtomixLeaderElectorFactory.class)
-public class AtomixLeaderElector extends AbstractResource<AtomixLeaderElector>
-    implements AsyncLeaderElector {
-    private final Set<Consumer<Status>> statusChangeListeners =
-            Sets.newCopyOnWriteArraySet();
-    private final Set<Consumer<Change<Leadership>>> leadershipChangeListeners =
-            Sets.newCopyOnWriteArraySet();
+public class AtomixLeaderElector extends AbstractRaftPrimitive implements AsyncLeaderElector {
+    private static final Serializer SERIALIZER = Serializer.using(KryoNamespace.newBuilder()
+            .register(KryoNamespaces.API)
+            .register(AtomixLeaderElectorOperations.NAMESPACE)
+            .register(AtomixLeaderElectorEvents.NAMESPACE)
+            .build());
+
+    private final Set<Consumer<Change<Leadership>>> leadershipChangeListeners = Sets.newCopyOnWriteArraySet();
     private final Consumer<Change<Leadership>> cacheUpdater;
     private final Consumer<Status> statusListener;
 
-    public static final String CHANGE_SUBJECT = "leadershipChangeEvents";
     private final LoadingCache<String, CompletableFuture<Leadership>> cache;
 
-    Function<CopycatClient.State, Status> mapper = state -> {
-        switch (state) {
-            case CONNECTED:
-                return Status.ACTIVE;
-            case SUSPENDED:
-                return Status.SUSPENDED;
-            case CLOSED:
-                return Status.INACTIVE;
-            default:
-                throw new IllegalStateException("Unknown state " + state);
-        }
-    };
-
-    public AtomixLeaderElector(CopycatClient client, Properties properties) {
-        super(client, properties);
+    public AtomixLeaderElector(RaftProxy proxy) {
+        super(proxy);
         cache = CacheBuilder.newBuilder()
                 .maximumSize(1000)
-                .build(CacheLoader.from(topic -> this.client.submit(new GetLeadership(topic))));
+                .build(CacheLoader.from(topic -> proxy.invoke(
+                        GET_LEADERSHIP, SERIALIZER::encode, new GetLeadership(topic), SERIALIZER::decode)));
 
         cacheUpdater = change -> {
             Leadership leadership = change.newValue();
@@ -93,7 +85,13 @@
             }
         };
         addStatusChangeListener(statusListener);
-        client.onStateChange(this::handleStateChange);
+
+        proxy.addStateChangeListener(state -> {
+            if (state == RaftProxy.State.CONNECTED && isListening()) {
+                proxy.invoke(ADD_LISTENER);
+            }
+        });
+        proxy.addEventListener(CHANGE, SERIALIZER::decode, this::handleEvent);
     }
 
     @Override
@@ -102,24 +100,6 @@
         return removeChangeListener(cacheUpdater);
     }
 
-    @Override
-    public String name() {
-        return null;
-    }
-
-    @Override
-    public CompletableFuture<AtomixLeaderElector> open() {
-        return super.open().thenApply(result -> {
-            client.onStateChange(state -> {
-                if (state == CopycatClient.State.CONNECTED && isListening()) {
-                    client.submit(new Listen());
-                }
-            });
-            client.onEvent(CHANGE_SUBJECT, this::handleEvent);
-            return result;
-        });
-    }
-
     public CompletableFuture<AtomixLeaderElector> setupCache() {
         return addChangeListener(cacheUpdater).thenApply(v -> this);
     }
@@ -130,27 +110,32 @@
 
     @Override
     public CompletableFuture<Leadership> run(String topic, NodeId nodeId) {
-        return client.submit(new Run(topic, nodeId)).whenComplete((r, e) -> cache.invalidate(topic));
+        return proxy.<Run, Leadership>invoke(RUN, SERIALIZER::encode, new Run(topic, nodeId), SERIALIZER::decode)
+                .whenComplete((r, e) -> cache.invalidate(topic));
     }
 
     @Override
     public CompletableFuture<Void> withdraw(String topic) {
-        return client.submit(new Withdraw(topic)).whenComplete((r, e) -> cache.invalidate(topic));
+        return proxy.invoke(WITHDRAW, SERIALIZER::encode, new Withdraw(topic))
+                .whenComplete((r, e) -> cache.invalidate(topic));
     }
 
     @Override
     public CompletableFuture<Boolean> anoint(String topic, NodeId nodeId) {
-        return client.submit(new Anoint(topic, nodeId)).whenComplete((r, e) -> cache.invalidate(topic));
+        return proxy.<Anoint, Boolean>invoke(ANOINT, SERIALIZER::encode, new Anoint(topic, nodeId), SERIALIZER::decode)
+                .whenComplete((r, e) -> cache.invalidate(topic));
     }
 
     @Override
     public CompletableFuture<Boolean> promote(String topic, NodeId nodeId) {
-        return client.submit(new Promote(topic, nodeId)).whenComplete((r, e) -> cache.invalidate(topic));
+        return proxy.<Promote, Boolean>invoke(
+                PROMOTE, SERIALIZER::encode, new Promote(topic, nodeId), SERIALIZER::decode)
+                .whenComplete((r, e) -> cache.invalidate(topic));
     }
 
     @Override
     public CompletableFuture<Void> evict(NodeId nodeId) {
-        return client.submit(new AtomixLeaderElectorCommands.Evict(nodeId));
+        return proxy.invoke(EVICT, SERIALIZER::encode, new AtomixLeaderElectorOperations.Evict(nodeId));
     }
 
     @Override
@@ -165,17 +150,17 @@
 
     @Override
     public CompletableFuture<Map<String, Leadership>> getLeaderships() {
-        return client.submit(new GetAllLeaderships());
+        return proxy.invoke(GET_ALL_LEADERSHIPS, SERIALIZER::decode);
     }
 
     public CompletableFuture<Set<String>> getElectedTopics(NodeId nodeId) {
-        return client.submit(new GetElectedTopics(nodeId));
+        return proxy.invoke(GET_ELECTED_TOPICS, SERIALIZER::encode, new GetElectedTopics(nodeId), SERIALIZER::decode);
     }
 
     @Override
     public synchronized CompletableFuture<Void> addChangeListener(Consumer<Change<Leadership>> consumer) {
         if (leadershipChangeListeners.isEmpty()) {
-            return client.submit(new Listen()).thenRun(() -> leadershipChangeListeners.add(consumer));
+            return proxy.invoke(ADD_LISTENER).thenRun(() -> leadershipChangeListeners.add(consumer));
         } else {
             leadershipChangeListeners.add(consumer);
             return CompletableFuture.completedFuture(null);
@@ -185,31 +170,12 @@
     @Override
     public synchronized CompletableFuture<Void> removeChangeListener(Consumer<Change<Leadership>> consumer) {
         if (leadershipChangeListeners.remove(consumer) && leadershipChangeListeners.isEmpty()) {
-            return client.submit(new Unlisten()).thenApply(v -> null);
+            return proxy.invoke(REMOVE_LISTENER).thenApply(v -> null);
         }
         return CompletableFuture.completedFuture(null);
     }
 
-    @Override
-    public void addStatusChangeListener(Consumer<Status> listener) {
-        statusChangeListeners.add(listener);
-    }
-
-    @Override
-    public void removeStatusChangeListener(Consumer<Status> listener) {
-        statusChangeListeners.remove(listener);
-    }
-
-    @Override
-    public Collection<Consumer<Status>> statusChangeListeners() {
-        return ImmutableSet.copyOf(statusChangeListeners);
-    }
-
     private boolean isListening() {
         return !leadershipChangeListeners.isEmpty();
     }
-
-    private void handleStateChange(CopycatClient.State state) {
-        statusChangeListeners().forEach(listener -> listener.accept(mapper.apply(state)));
-    }
-}
+}