AsyncLeaderElector APIs in support of mastership balancing
Change-Id: Ia235c6a18c54490dc49ca13e2caebf70b750dbc7
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncLeaderElector.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncLeaderElector.java
index e364a7e..27c92ec 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncLeaderElector.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncLeaderElector.java
@@ -70,6 +70,18 @@
}
@Override
+ public CompletableFuture<Boolean> promote(String topic, NodeId nodeId) {
+ return getLeaderElector(topic).promote(topic, nodeId);
+ }
+
+ @Override
+ public CompletableFuture<Void> evict(NodeId nodeId) {
+ return CompletableFuture.allOf(getLeaderElectors().stream()
+ .map(le -> le.evict(nodeId))
+ .toArray(CompletableFuture[]::new));
+ }
+
+ @Override
public CompletableFuture<Leadership> getLeadership(String topic) {
return getLeaderElector(topic).getLeadership(topic);
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java
index 9995c4d..b8c5afa 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java
@@ -20,6 +20,7 @@
import io.atomix.resource.Resource;
import io.atomix.resource.ResourceTypeInfo;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@@ -43,6 +44,7 @@
private final Set<Consumer<Change<Leadership>>> leadershipChangeListeners =
Sets.newConcurrentHashSet();
+ public static final String CHANGE_SUBJECT = "changeEvents";
private Listener<Change<Leadership>> listener;
public AtomixLeaderElector(CopycatClient client, Resource.Options options) {
@@ -57,13 +59,13 @@
@Override
public CompletableFuture<AtomixLeaderElector> open() {
return super.open().thenApply(result -> {
- client.onEvent("change", this::handleEvent);
+ client.onEvent(CHANGE_SUBJECT, this::handleEvent);
return result;
});
}
- private void handleEvent(Change<Leadership> change) {
- leadershipChangeListeners.forEach(l -> l.accept(change));
+ private void handleEvent(List<Change<Leadership>> changes) {
+ changes.forEach(change -> leadershipChangeListeners.forEach(l -> l.accept(change)));
}
@Override
@@ -82,6 +84,16 @@
}
@Override
+ public CompletableFuture<Boolean> promote(String topic, NodeId nodeId) {
+ return submit(new AtomixLeaderElectorCommands.Promote(topic, nodeId));
+ }
+
+ @Override
+ public CompletableFuture<Void> evict(NodeId nodeId) {
+ return submit(new AtomixLeaderElectorCommands.Evict(nodeId));
+ }
+
+ @Override
public CompletableFuture<Leadership> getLeadership(String topic) {
return submit(new AtomixLeaderElectorCommands.GetLeadership(topic));
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorCommands.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorCommands.java
index 235bd07..8efa234 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorCommands.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorCommands.java
@@ -346,6 +346,102 @@
}
/**
+ * Command for administratively promoting a node to top candidate.
+ */
+ @SuppressWarnings("serial")
+ public static class Promote extends ElectionCommand<Boolean> {
+ private String topic;
+ private NodeId nodeId;
+
+ public Promote() {
+ }
+
+ public Promote(String topic, NodeId nodeId) {
+ this.topic = topic;
+ this.nodeId = nodeId;
+ }
+
+ /**
+ * Returns the topic in which the node is to be promoted.
+ *
+ * @return The topic
+ */
+ public String topic() {
+ return topic;
+ }
+
+ /**
+ * Returns the identifier of the node to make top candidate.
+ *
+ * @return The nodeId
+ */
+ public NodeId nodeId() {
+ return nodeId;
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(getClass())
+ .add("topic", topic)
+ .add("nodeId", nodeId)
+ .toString();
+ }
+
+ @Override
+ public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
+ buffer.writeString(topic);
+ buffer.writeString(nodeId.toString());
+ }
+
+ @Override
+ public void readObject(BufferInput<?> buffer, Serializer serializer) {
+ topic = buffer.readString();
+ nodeId = new NodeId(buffer.readString());
+ }
+ }
+
+ /**
+ * Command for administratively evicting a node from all leadership topics.
+ */
+ @SuppressWarnings("serial")
+ public static class Evict extends ElectionCommand<Void> {
+ private NodeId nodeId;
+
+ public Evict() {
+ }
+
+ public Evict(NodeId nodeId) {
+ this.nodeId = nodeId;
+ }
+
+ /**
+ * Returns the identifier of the node to evict.
+ *
+ * @return The nodeId
+ */
+ public NodeId nodeId() {
+ return nodeId;
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(getClass())
+ .add("nodeId", nodeId)
+ .toString();
+ }
+
+ @Override
+ public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
+ buffer.writeString(nodeId.toString());
+ }
+
+ @Override
+ public void readObject(BufferInput<?> buffer, Serializer serializer) {
+ nodeId = new NodeId(buffer.readString());
+ }
+ }
+
+ /**
* Map command type resolver.
*/
public static class TypeResolver implements SerializableTypeResolver {
@@ -359,6 +455,8 @@
registry.register(GetLeadership.class, -866);
registry.register(Listen.class, -867);
registry.register(Unlisten.class, -868);
+ registry.register(Promote.class, -869);
+ registry.register(Evict.class, -870);
}
}
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorState.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorState.java
index 23dcf52..ef8c444 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorState.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorState.java
@@ -43,10 +43,12 @@
import org.onosproject.cluster.NodeId;
import org.onosproject.event.Change;
import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Anoint;
+import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Evict;
import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.GetAllLeaderships;
import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.GetElectedTopics;
import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.GetLeadership;
import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Listen;
+import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Promote;
import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Run;
import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Unlisten;
import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Withdraw;
@@ -86,6 +88,8 @@
executor.register(Run.class, this::run);
executor.register(Withdraw.class, this::withdraw);
executor.register(Anoint.class, this::anoint);
+ executor.register(Promote.class, this::promote);
+ executor.register(Evict.class, this::evict);
// Queries
executor.register(GetLeadership.class, this::leadership);
executor.register(GetAllLeaderships.class, this::allLeaderships);
@@ -93,8 +97,16 @@
}
private void notifyLeadershipChange(Leadership previousLeadership, Leadership newLeadership) {
- Change<Leadership> change = new Change<>(previousLeadership, newLeadership);
- listeners.values().forEach(listener -> listener.session().publish("change", change));
+ notifyLeadershipChanges(Arrays.asList(new Change<>(previousLeadership, newLeadership)));
+ }
+
+ private void notifyLeadershipChanges(List<Change<Leadership>> changes) {
+ if (changes.isEmpty()) {
+ return;
+ }
+ listeners.values()
+ .forEach(listener -> listener.session()
+ .publish(AtomixLeaderElector.CHANGE_SUBJECT, changes));
}
@Override
@@ -206,6 +218,53 @@
}
/**
+ * Applies an {@link AtomixLeaderElectorCommands.Promote} commit.
+ * @param commit promote commit
+ * @return {@code true} if the node is a candidate for the topic and was promoted; {@code false} otherwise
+ */
+ public boolean promote(Commit<? extends Promote> commit) {
+     try {
+         String topic = commit.operation().topic();
+         NodeId nodeId = commit.operation().nodeId();
+         Leadership oldLeadership = leadership(topic);
+         if (oldLeadership == null || !oldLeadership.candidates().contains(nodeId)) {
+             return false;
+         }
+         elections.computeIfPresent(topic, (k, v) -> new ElectionState(v).promote(nodeId));
+         Leadership newLeadership = leadership(topic);
+         if (!Objects.equal(oldLeadership, newLeadership)) {
+             notifyLeadershipChange(oldLeadership, newLeadership);
+         }
+         return true;
+     } finally {
+         commit.close();
+     }
+ }
+
+ /**
+ * Applies an {@link AtomixLeaderElectorCommands.Evict} commit, evicting the node from all its topics.
+ * @param commit evict commit; the resulting leadership changes are published to listeners as one batch
+ */
+ public void evict(Commit<? extends Evict> commit) {
+ try {
+ List<Change<Leadership>> changes = Lists.newLinkedList();
+ NodeId nodeId = commit.operation().nodeId();
+ Set<String> topics = Maps.filterValues(elections, e -> e.candidates().contains(nodeId)).keySet();
+ topics.forEach(topic -> {
+ Leadership oldLeadership = leadership(topic);
+ elections.compute(topic, (k, v) -> v.evict(nodeId, termCounter(topic)::incrementAndGet));
+ Leadership newLeadership = leadership(topic);
+ if (!Objects.equal(oldLeadership, newLeadership)) {
+ changes.add(new Change<>(oldLeadership, newLeadership));
+ }
+ });
+ notifyLeadershipChanges(changes);
+ } finally {
+ commit.close();
+ }
+ }
+
+ /**
* Applies an {@link AtomixLeaderElectorCommands.GetLeadership} commit.
* @param commit GetLeadership commit
* @return leader
@@ -362,6 +421,31 @@
}
}
+ /** Returns the state left after removing {@code nodeId}, electing the next candidate if it led. */
+ public ElectionState evict(NodeId nodeId, Supplier<Long> termCounter) {
+     if (registrations.stream().noneMatch(r -> r.nodeId().equals(nodeId))) {
+         // The node is not a candidate here, so there is nothing to evict.
+         return this;
+     }
+     List<Registration> updatedRegistrations =
+             registrations.stream()
+                     .filter(r -> !r.nodeId().equals(nodeId))
+                     .collect(Collectors.toList());
+     // Guard against an unset leader; only evicting the current leader changes leadership.
+     if (leader == null || !leader.nodeId().equals(nodeId)) {
+         return new ElectionState(updatedRegistrations, leader, term, termStartTime);
+     }
+     if (updatedRegistrations.isEmpty()) {
+         // Nobody left to take over: clear the leader but keep the current term.
+         return new ElectionState(updatedRegistrations, null, term, termStartTime);
+     }
+     // The first remaining candidate takes over under a freshly drawn term.
+     return new ElectionState(updatedRegistrations,
+                              updatedRegistrations.get(0),
+                              termCounter.get(),
+                              System.currentTimeMillis());
+ }
+
public boolean isDuplicate(Registration registration) {
return registrations.stream().anyMatch(r -> r.sessionId() == registration.sessionId());
}
@@ -406,6 +490,23 @@
return this;
}
}
+
+ /** Returns a state in which {@code nodeId} is the first candidate; leader and term are kept. */
+ public ElectionState promote(NodeId nodeId) {
+     Registration registration = registrations.stream()
+             .filter(r -> r.nodeId().equals(nodeId))
+             .findFirst().orElse(null);
+     if (registration == null) {
+         // Not a candidate: keep the state unchanged instead of queueing a null entry.
+         return this;
+     }
+     List<Registration> updatedRegistrations = Lists.newLinkedList();
+     updatedRegistrations.add(registration);
+     registrations.stream()
+             .filter(r -> !r.nodeId().equals(nodeId))
+             .forEach(updatedRegistrations::add);
+     return new ElectionState(updatedRegistrations, leader, term, termStartTime);
+ }
}
@Override
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorTest.java
index 1990534..2b1f56b 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorTest.java
@@ -51,10 +51,10 @@
public void testRun() throws Throwable {
leaderElectorRunTests(1);
clearTests();
-// leaderElectorRunTests(2);
-// clearTests();
-// leaderElectorRunTests(3);
-// clearTests();
+ leaderElectorRunTests(2);
+ clearTests();
+ leaderElectorRunTests(3);
+ clearTests();
}
private void leaderElectorRunTests(int numServers) throws Throwable {
@@ -183,6 +183,63 @@
}
@Test
+ public void testPromote() throws Throwable {
+ leaderElectorPromoteTests(1);
+ clearTests();
+ leaderElectorPromoteTests(2);
+ clearTests();
+ leaderElectorPromoteTests(3);
+ clearTests();
+ }
+
+ private void leaderElectorPromoteTests(int numServers) throws Throwable {
+ createCopycatServers(numServers);
+ Atomix client1 = createAtomixClient();
+ AtomixLeaderElector elector1 = client1.get("test-elector", AtomixLeaderElector.class).join();
+ Atomix client2 = createAtomixClient();
+ AtomixLeaderElector elector2 = client2.get("test-elector", AtomixLeaderElector.class).join();
+ Atomix client3 = createAtomixClient();
+ AtomixLeaderElector elector3 = client3.get("test-elector", AtomixLeaderElector.class).join();
+ elector1.run("foo", node1).join();
+ elector2.run("foo", node2).join();
+
+ LeaderEventListener listener1 = new LeaderEventListener();
+ elector1.addChangeListener(listener1).join();
+ LeaderEventListener listener2 = new LeaderEventListener();
+ elector2.addChangeListener(listener2).join();
+ LeaderEventListener listener3 = new LeaderEventListener();
+ elector3.addChangeListener(listener3).join();
+
+ elector3.promote("foo", node3).thenAccept(result -> {
+ assertFalse(result);
+ }).join();
+
+ assertFalse(listener1.hasEvent());
+ assertFalse(listener2.hasEvent());
+ assertFalse(listener3.hasEvent());
+
+ elector3.run("foo", node3).join();
+
+ listener1.clearEvents();
+ listener2.clearEvents();
+ listener3.clearEvents();
+
+ elector3.promote("foo", node3).thenAccept(result -> {
+ assertTrue(result);
+ }).join();
+
+ listener1.nextEvent().thenAccept(result -> {
+ assertEquals(node3, result.newValue().candidates().get(0));
+ }).join();
+ listener2.nextEvent().thenAccept(result -> {
+ assertEquals(node3, result.newValue().candidates().get(0));
+ }).join();
+ listener3.nextEvent().thenAccept(result -> {
+ assertEquals(node3, result.newValue().candidates().get(0));
+ }).join();
+ }
+
+ @Test
public void testLeaderSessionClose() throws Throwable {
leaderElectorLeaderSessionCloseTests(1);
clearTests();
@@ -325,6 +382,10 @@
return !eventQueue.isEmpty();
}
+ public void clearEvents() {
+ eventQueue.clear();
+ }
+
public CompletableFuture<Change<Leadership>> nextEvent() {
synchronized (this) {
if (eventQueue.isEmpty()) {