AsyncLeaderElector APIs in support of mastership balancing
Change-Id: Ia235c6a18c54490dc49ca13e2caebf70b750dbc7
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java
index 9995c4d..b8c5afa 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java
@@ -20,6 +20,7 @@
import io.atomix.resource.Resource;
import io.atomix.resource.ResourceTypeInfo;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@@ -43,6 +44,7 @@
private final Set<Consumer<Change<Leadership>>> leadershipChangeListeners =
Sets.newConcurrentHashSet();
+ public static final String CHANGE_SUBJECT = "changeEvents"; // event name this client registers handleEvent under in open()
private Listener<Change<Leadership>> listener;
public AtomixLeaderElector(CopycatClient client, Resource.Options options) {
@@ -57,13 +59,13 @@
@Override
public CompletableFuture<AtomixLeaderElector> open() {
return super.open().thenApply(result -> {
- client.onEvent("change", this::handleEvent);
+ client.onEvent(CHANGE_SUBJECT, this::handleEvent); // registered only after super.open() completes; events named CHANGE_SUBJECT go to handleEvent
return result;
});
}
- private void handleEvent(Change<Leadership> change) {
- leadershipChangeListeners.forEach(l -> l.accept(change));
+ private void handleEvent(List<Change<Leadership>> changes) { // one event carries a list of leadership changes
+ changes.forEach(change -> leadershipChangeListeners.forEach(l -> l.accept(change))); // every registered listener is invoked once per change in the list
}
@Override
@@ -82,6 +84,16 @@
}
@Override
+ public CompletableFuture<Boolean> promote(String topic, NodeId nodeId) { // submits a Promote command built from the given topic and node id
+ return submit(new AtomixLeaderElectorCommands.Promote(topic, nodeId)); // NOTE(review): meaning of the Boolean result is defined by the Promote command; presumably a success flag - confirm
+ }
+
+ @Override
+ public CompletableFuture<Void> evict(NodeId nodeId) { // submits an Evict command built from the node id alone; no topic is passed
+ return submit(new AtomixLeaderElectorCommands.Evict(nodeId)); // NOTE(review): presumably applies across all topics since none is given - confirm against the Evict command
+ }
+
+ @Override // submits a GetLeadership operation for the given topic and returns the resulting future
public CompletableFuture<Leadership> getLeadership(String topic) {
return submit(new AtomixLeaderElectorCommands.GetLeadership(topic));
}