Code clean up in ConsistentMap and LeaderElector resources
Change-Id: I1834188393f19e37394c32047538e6027522a13d
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java
index ce8c582..389f026 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java
@@ -29,6 +29,15 @@
import org.onosproject.cluster.Leadership;
import org.onosproject.cluster.NodeId;
import org.onosproject.event.Change;
+import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Anoint;
+import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.GetAllLeaderships;
+import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.GetElectedTopics;
+import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.GetLeadership;
+import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Listen;
+import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Promote;
+import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Run;
+import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Unlisten;
+import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Withdraw;
import org.onosproject.store.service.AsyncLeaderElector;
import com.google.common.collect.Sets;
@@ -70,22 +79,22 @@
@Override
public CompletableFuture<Leadership> run(String topic, NodeId nodeId) {
- return submit(new AtomixLeaderElectorCommands.Run(topic, nodeId));
+ return submit(new Run(topic, nodeId));
}
@Override
public CompletableFuture<Void> withdraw(String topic) {
- return submit(new AtomixLeaderElectorCommands.Withdraw(topic));
+ return submit(new Withdraw(topic));
}
@Override
public CompletableFuture<Boolean> anoint(String topic, NodeId nodeId) {
- return submit(new AtomixLeaderElectorCommands.Anoint(topic, nodeId));
+ return submit(new Anoint(topic, nodeId));
}
@Override
public CompletableFuture<Boolean> promote(String topic, NodeId nodeId) {
- return submit(new AtomixLeaderElectorCommands.Promote(topic, nodeId));
+ return submit(new Promote(topic, nodeId));
}
@Override
@@ -95,69 +104,32 @@
@Override
public CompletableFuture<Leadership> getLeadership(String topic) {
- return submit(new AtomixLeaderElectorCommands.GetLeadership(topic));
+ return submit(new GetLeadership(topic));
}
@Override
public CompletableFuture<Map<String, Leadership>> getLeaderships() {
- return submit(new AtomixLeaderElectorCommands.GetAllLeaderships());
+ return submit(new GetAllLeaderships());
}
public CompletableFuture<Set<String>> getElectedTopics(NodeId nodeId) {
- return submit(new AtomixLeaderElectorCommands.GetElectedTopics(nodeId));
+ return submit(new GetElectedTopics(nodeId));
}
- /**
- * Leadership change listener context.
- */
- private final class LeadershipChangeListener implements Listener<Change<Leadership>> {
- private final Consumer<Change<Leadership>> listener;
-
- private LeadershipChangeListener(Consumer<Change<Leadership>> listener) {
- this.listener = listener;
- }
-
- @Override
- public void accept(Change<Leadership> change) {
- listener.accept(change);
- }
-
- @Override
- public void close() {
- synchronized (AtomixLeaderElector.this) {
- submit(new AtomixLeaderElectorCommands.Unlisten());
- }
+ @Override
+ public synchronized CompletableFuture<Void> addChangeListener(Consumer<Change<Leadership>> consumer) {
+ if (leadershipChangeListeners.isEmpty()) {
+ return submit(new Listen()).thenRun(() -> leadershipChangeListeners.add(consumer));
+ } else {
+ leadershipChangeListeners.add(consumer);
+ return CompletableFuture.completedFuture(null);
}
}
@Override
- public CompletableFuture<Void> addChangeListener(Consumer<Change<Leadership>> consumer) {
- leadershipChangeListeners.add(consumer);
- return setupListener();
- }
-
- @Override
- public CompletableFuture<Void> removeChangeListener(Consumer<Change<Leadership>> consumer) {
- leadershipChangeListeners.remove(consumer);
- return teardownListener();
- }
-
- private CompletableFuture<Void> setupListener() {
- if (listener == null && !leadershipChangeListeners.isEmpty()) {
- Consumer<Change<Leadership>> changeConsumer = change -> {
- leadershipChangeListeners.forEach(consumer -> consumer.accept(change));
- };
- return submit(new AtomixLeaderElectorCommands.Listen())
- .thenAccept(v -> listener = new LeadershipChangeListener(changeConsumer));
- }
- return CompletableFuture.completedFuture(null);
- }
-
- private CompletableFuture<Void> teardownListener() {
- if (listener != null && leadershipChangeListeners.isEmpty()) {
- listener.close();
- listener = null;
- return submit(new AtomixLeaderElectorCommands.Unlisten());
+ public synchronized CompletableFuture<Void> removeChangeListener(Consumer<Change<Leadership>> consumer) {
+ if (leadershipChangeListeners.remove(listener) && leadershipChangeListeners.isEmpty()) {
+ return submit(new Unlisten()).thenApply(v -> null);
}
return CompletableFuture.completedFuture(null);
}