[ONOS-6594] Upgrade to Atomix 2.0.0
Change-Id: I6534bca1c8570b4e017f682953b876da29146675
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java
index 3e45091..825fa98 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java
@@ -15,73 +15,65 @@
*/
package org.onosproject.store.primitives.resources.impl;
-import io.atomix.copycat.client.CopycatClient;
-import io.atomix.resource.AbstractResource;
-import io.atomix.resource.ResourceTypeInfo;
-
-import java.util.Collection;
import java.util.List;
import java.util.Map;
-import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
-import java.util.function.Function;
-import org.onosproject.cluster.Leadership;
-import org.onosproject.cluster.NodeId;
-import org.onosproject.event.Change;
-import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Anoint;
-import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.GetAllLeaderships;
-import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.GetElectedTopics;
-import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.GetLeadership;
-import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Listen;
-import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Promote;
-import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Run;
-import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Unlisten;
-import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Withdraw;
-import org.onosproject.store.service.AsyncLeaderElector;
-
-import com.google.common.collect.ImmutableSet;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.Sets;
+import io.atomix.protocols.raft.proxy.RaftProxy;
+import org.onlab.util.KryoNamespace;
+import org.onosproject.cluster.Leadership;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.event.Change;
+import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorOperations.Anoint;
+import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorOperations.GetElectedTopics;
+import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorOperations.GetLeadership;
+import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorOperations.Promote;
+import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorOperations.Run;
+import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorOperations.Withdraw;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.AsyncLeaderElector;
+import org.onosproject.store.service.Serializer;
+
+import static org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorEvents.CHANGE;
+import static org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorOperations.ADD_LISTENER;
+import static org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorOperations.ANOINT;
+import static org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorOperations.EVICT;
+import static org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorOperations.GET_ALL_LEADERSHIPS;
+import static org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorOperations.GET_ELECTED_TOPICS;
+import static org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorOperations.GET_LEADERSHIP;
+import static org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorOperations.PROMOTE;
+import static org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorOperations.REMOVE_LISTENER;
+import static org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorOperations.RUN;
+import static org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorOperations.WITHDRAW;
/**
* Distributed resource providing the {@link AsyncLeaderElector} primitive.
*/
-@ResourceTypeInfo(id = -152, factory = AtomixLeaderElectorFactory.class)
-public class AtomixLeaderElector extends AbstractResource<AtomixLeaderElector>
- implements AsyncLeaderElector {
- private final Set<Consumer<Status>> statusChangeListeners =
- Sets.newCopyOnWriteArraySet();
- private final Set<Consumer<Change<Leadership>>> leadershipChangeListeners =
- Sets.newCopyOnWriteArraySet();
+public class AtomixLeaderElector extends AbstractRaftPrimitive implements AsyncLeaderElector {
+ private static final Serializer SERIALIZER = Serializer.using(KryoNamespace.newBuilder()
+ .register(KryoNamespaces.API)
+ .register(AtomixLeaderElectorOperations.NAMESPACE)
+ .register(AtomixLeaderElectorEvents.NAMESPACE)
+ .build());
+
+ private final Set<Consumer<Change<Leadership>>> leadershipChangeListeners = Sets.newCopyOnWriteArraySet();
private final Consumer<Change<Leadership>> cacheUpdater;
private final Consumer<Status> statusListener;
- public static final String CHANGE_SUBJECT = "leadershipChangeEvents";
private final LoadingCache<String, CompletableFuture<Leadership>> cache;
- Function<CopycatClient.State, Status> mapper = state -> {
- switch (state) {
- case CONNECTED:
- return Status.ACTIVE;
- case SUSPENDED:
- return Status.SUSPENDED;
- case CLOSED:
- return Status.INACTIVE;
- default:
- throw new IllegalStateException("Unknown state " + state);
- }
- };
-
- public AtomixLeaderElector(CopycatClient client, Properties properties) {
- super(client, properties);
+ public AtomixLeaderElector(RaftProxy proxy) {
+ super(proxy);
cache = CacheBuilder.newBuilder()
.maximumSize(1000)
- .build(CacheLoader.from(topic -> this.client.submit(new GetLeadership(topic))));
+ .build(CacheLoader.from(topic -> proxy.invoke(
+ GET_LEADERSHIP, SERIALIZER::encode, new GetLeadership(topic), SERIALIZER::decode)));
cacheUpdater = change -> {
Leadership leadership = change.newValue();
@@ -93,7 +85,13 @@
}
};
addStatusChangeListener(statusListener);
- client.onStateChange(this::handleStateChange);
+
+ proxy.addStateChangeListener(state -> {
+ if (state == RaftProxy.State.CONNECTED && isListening()) {
+ proxy.invoke(ADD_LISTENER);
+ }
+ });
+ proxy.addEventListener(CHANGE, SERIALIZER::decode, this::handleEvent);
}
@Override
@@ -102,24 +100,6 @@
return removeChangeListener(cacheUpdater);
}
- @Override
- public String name() {
- return null;
- }
-
- @Override
- public CompletableFuture<AtomixLeaderElector> open() {
- return super.open().thenApply(result -> {
- client.onStateChange(state -> {
- if (state == CopycatClient.State.CONNECTED && isListening()) {
- client.submit(new Listen());
- }
- });
- client.onEvent(CHANGE_SUBJECT, this::handleEvent);
- return result;
- });
- }
-
public CompletableFuture<AtomixLeaderElector> setupCache() {
return addChangeListener(cacheUpdater).thenApply(v -> this);
}
@@ -130,27 +110,32 @@
@Override
public CompletableFuture<Leadership> run(String topic, NodeId nodeId) {
- return client.submit(new Run(topic, nodeId)).whenComplete((r, e) -> cache.invalidate(topic));
+ return proxy.<Run, Leadership>invoke(RUN, SERIALIZER::encode, new Run(topic, nodeId), SERIALIZER::decode)
+ .whenComplete((r, e) -> cache.invalidate(topic));
}
@Override
public CompletableFuture<Void> withdraw(String topic) {
- return client.submit(new Withdraw(topic)).whenComplete((r, e) -> cache.invalidate(topic));
+ return proxy.invoke(WITHDRAW, SERIALIZER::encode, new Withdraw(topic))
+ .whenComplete((r, e) -> cache.invalidate(topic));
}
@Override
public CompletableFuture<Boolean> anoint(String topic, NodeId nodeId) {
- return client.submit(new Anoint(topic, nodeId)).whenComplete((r, e) -> cache.invalidate(topic));
+ return proxy.<Anoint, Boolean>invoke(ANOINT, SERIALIZER::encode, new Anoint(topic, nodeId), SERIALIZER::decode)
+ .whenComplete((r, e) -> cache.invalidate(topic));
}
@Override
public CompletableFuture<Boolean> promote(String topic, NodeId nodeId) {
- return client.submit(new Promote(topic, nodeId)).whenComplete((r, e) -> cache.invalidate(topic));
+ return proxy.<Promote, Boolean>invoke(
+ PROMOTE, SERIALIZER::encode, new Promote(topic, nodeId), SERIALIZER::decode)
+ .whenComplete((r, e) -> cache.invalidate(topic));
}
@Override
public CompletableFuture<Void> evict(NodeId nodeId) {
- return client.submit(new AtomixLeaderElectorCommands.Evict(nodeId));
+ return proxy.invoke(EVICT, SERIALIZER::encode, new AtomixLeaderElectorOperations.Evict(nodeId));
}
@Override
@@ -165,17 +150,17 @@
@Override
public CompletableFuture<Map<String, Leadership>> getLeaderships() {
- return client.submit(new GetAllLeaderships());
+ return proxy.invoke(GET_ALL_LEADERSHIPS, SERIALIZER::decode);
}
public CompletableFuture<Set<String>> getElectedTopics(NodeId nodeId) {
- return client.submit(new GetElectedTopics(nodeId));
+ return proxy.invoke(GET_ELECTED_TOPICS, SERIALIZER::encode, new GetElectedTopics(nodeId), SERIALIZER::decode);
}
@Override
public synchronized CompletableFuture<Void> addChangeListener(Consumer<Change<Leadership>> consumer) {
if (leadershipChangeListeners.isEmpty()) {
- return client.submit(new Listen()).thenRun(() -> leadershipChangeListeners.add(consumer));
+ return proxy.invoke(ADD_LISTENER).thenRun(() -> leadershipChangeListeners.add(consumer));
} else {
leadershipChangeListeners.add(consumer);
return CompletableFuture.completedFuture(null);
@@ -185,31 +170,12 @@
@Override
public synchronized CompletableFuture<Void> removeChangeListener(Consumer<Change<Leadership>> consumer) {
if (leadershipChangeListeners.remove(consumer) && leadershipChangeListeners.isEmpty()) {
- return client.submit(new Unlisten()).thenApply(v -> null);
+ return proxy.invoke(REMOVE_LISTENER).thenApply(v -> null);
}
return CompletableFuture.completedFuture(null);
}
- @Override
- public void addStatusChangeListener(Consumer<Status> listener) {
- statusChangeListeners.add(listener);
- }
-
- @Override
- public void removeStatusChangeListener(Consumer<Status> listener) {
- statusChangeListeners.remove(listener);
- }
-
- @Override
- public Collection<Consumer<Status>> statusChangeListeners() {
- return ImmutableSet.copyOf(statusChangeListeners);
- }
-
private boolean isListening() {
return !leadershipChangeListeners.isEmpty();
}
-
- private void handleStateChange(CopycatClient.State state) {
- statusChangeListeners().forEach(listener -> listener.accept(mapper.apply(state)));
- }
-}
+}
\ No newline at end of file