Revert "Automatically balance leaders on failover in LeaderElector state machine."
This reverts commit 2135e037437d9f68eda7bbd569b91e90ac74136d.
Change-Id: I20ff709c13dd5f3998ab1c77d53188c452baaa3c
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorService.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorService.java
index 865ec43..b259cd1 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorService.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorService.java
@@ -30,7 +30,6 @@
import com.google.common.base.MoreObjects;
import com.google.common.base.Objects;
import com.google.common.base.Throwables;
-import com.google.common.collect.ComparisonChain;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -100,7 +99,6 @@
}
termCounters = reader.readObject(SERIALIZER::decode);
elections = reader.readObject(SERIALIZER::decode);
- elections.values().forEach(e -> e.elections = elections);
logger().debug("Reinstated state machine from snapshot");
}
@@ -152,7 +150,6 @@
/**
* Applies an {@link AtomixLeaderElectorOperations.Run} commit.
- *
* @param commit commit entry
* @return topic leader. If no previous leader existed this is the node that just entered the race.
*/
@@ -163,11 +160,10 @@
Registration registration = new Registration(commit.value().nodeId(), commit.session().sessionId().id());
elections.compute(topic, (k, v) -> {
if (v == null) {
- return new ElectionState(registration, termCounter(topic)::incrementAndGet, elections);
+ return new ElectionState(registration, termCounter(topic)::incrementAndGet);
} else {
if (!v.isDuplicate(registration)) {
- return new ElectionState(v).addRegistration(
- topic, registration, termCounter(topic)::incrementAndGet);
+ return new ElectionState(v).addRegistration(registration, termCounter(topic)::incrementAndGet);
} else {
return v;
}
@@ -187,15 +183,14 @@
/**
* Applies an {@link AtomixLeaderElectorOperations.Withdraw} commit.
- *
* @param commit withdraw commit
*/
public void withdraw(Commit<? extends Withdraw> commit) {
try {
String topic = commit.value().topic();
Leadership oldLeadership = leadership(topic);
- elections.computeIfPresent(topic, (k, v) -> v.cleanup(
- topic, commit.session(), termCounter(topic)::incrementAndGet));
+ elections.computeIfPresent(topic, (k, v) -> v.cleanup(commit.session(),
+ termCounter(topic)::incrementAndGet));
Leadership newLeadership = leadership(topic);
if (!Objects.equal(oldLeadership, newLeadership)) {
notifyLeadershipChange(oldLeadership, newLeadership);
@@ -208,7 +203,6 @@
/**
* Applies an {@link AtomixLeaderElectorOperations.Anoint} commit.
- *
* @param commit anoint commit
* @return {@code true} if changes were made and the transfer occurred; {@code false} if it did not.
*/
@@ -234,7 +228,6 @@
/**
* Applies an {@link AtomixLeaderElectorOperations.Promote} commit.
- *
* @param commit promote commit
* @return {@code true} if changes desired end state is achieved.
*/
@@ -260,7 +253,6 @@
/**
* Applies an {@link AtomixLeaderElectorOperations.Evict} commit.
- *
* @param commit evict commit
*/
public void evict(Commit<? extends Evict> commit) {
@@ -285,7 +277,6 @@
/**
* Applies an {@link AtomixLeaderElectorOperations.GetLeadership} commit.
- *
* @param commit GetLeadership commit
* @return leader
*/
@@ -301,7 +292,6 @@
/**
* Applies an {@link AtomixLeaderElectorOperations.GetElectedTopics} commit.
- *
* @param commit commit entry
* @return set of topics for which the node is the leader
*/
@@ -320,7 +310,6 @@
/**
* Applies an {@link AtomixLeaderElectorOperations#GET_ALL_LEADERSHIPS} commit.
- *
* @param commit GetAllLeaderships commit
* @return topic to leader mapping
*/
@@ -357,7 +346,7 @@
List<Change<Leadership>> changes = Lists.newArrayList();
topics.forEach(topic -> {
Leadership oldLeadership = leadership(topic);
- elections.compute(topic, (k, v) -> v.cleanup(topic, session, termCounter(topic)::incrementAndGet));
+ elections.compute(topic, (k, v) -> v.cleanup(session, termCounter(topic)::incrementAndGet));
Leadership newLeadership = leadership(topic);
if (!Objects.equal(oldLeadership, newLeadership)) {
changes.add(new Change<>(oldLeadership, newLeadership));
@@ -392,20 +381,17 @@
}
}
- private class ElectionState {
+ private static class ElectionState {
final Registration leader;
final long term;
final long termStartTime;
final List<Registration> registrations;
- transient Map<String, ElectionState> elections;
- public ElectionState(Registration registration, Supplier<Long> termCounter,
- Map<String, ElectionState> elections) {
+ public ElectionState(Registration registration, Supplier<Long> termCounter) {
registrations = Arrays.asList(registration);
term = termCounter.get();
termStartTime = System.currentTimeMillis();
leader = registration;
- this.elections = elections;
}
public ElectionState(ElectionState other) {
@@ -413,38 +399,19 @@
leader = other.leader;
term = other.term;
termStartTime = other.termStartTime;
- elections = other.elections;
}
public ElectionState(List<Registration> registrations,
Registration leader,
long term,
- long termStartTime,
- Map<String, ElectionState> elections) {
+ long termStartTime) {
this.registrations = Lists.newArrayList(registrations);
this.leader = leader;
this.term = term;
this.termStartTime = termStartTime;
- this.elections = elections;
}
- private void sortRegistrations(String topic, List<Registration> registrations) {
- registrations.sort((a, b) -> ComparisonChain.start()
- .compare(countLeaders(topic, a), countLeaders(topic, b))
- .compare(a.sessionId, b.sessionId)
- .result());
- }
-
- private long countLeaders(String topic, Registration registration) {
- return elections.entrySet().stream()
- .filter(entry -> !entry.getKey().equals(topic))
- .filter(entry -> entry.getValue().leader != null)
- .filter(entry -> entry.getValue().leader.nodeId.id()
- .equals(sessions().getSession(registration.sessionId).memberId().id()))
- .count();
- }
-
- public ElectionState cleanup(String topic, RaftSession session, Supplier<Long> termCounter) {
+ public ElectionState cleanup(RaftSession session, Supplier<Long> termCounter) {
Optional<Registration> registration =
registrations.stream().filter(r -> r.sessionId() == session.sessionId().id()).findFirst();
if (registration.isPresent()) {
@@ -454,17 +421,15 @@
.collect(Collectors.toList());
if (leader.sessionId() == session.sessionId().id()) {
if (!updatedRegistrations.isEmpty()) {
- sortRegistrations(topic, updatedRegistrations);
return new ElectionState(updatedRegistrations,
updatedRegistrations.get(0),
termCounter.get(),
- System.currentTimeMillis(),
- elections);
+ System.currentTimeMillis());
} else {
- return new ElectionState(updatedRegistrations, null, term, termStartTime, elections);
+ return new ElectionState(updatedRegistrations, null, term, termStartTime);
}
} else {
- return new ElectionState(updatedRegistrations, leader, term, termStartTime, elections);
+ return new ElectionState(updatedRegistrations, leader, term, termStartTime);
}
} else {
return this;
@@ -484,13 +449,12 @@
return new ElectionState(updatedRegistrations,
updatedRegistrations.get(0),
termCounter.get(),
- System.currentTimeMillis(),
- elections);
+ System.currentTimeMillis());
} else {
- return new ElectionState(updatedRegistrations, null, term, termStartTime, elections);
+ return new ElectionState(updatedRegistrations, null, term, termStartTime);
}
} else {
- return new ElectionState(updatedRegistrations, leader, term, termStartTime, elections);
+ return new ElectionState(updatedRegistrations, leader, term, termStartTime);
}
} else {
return this;
@@ -514,25 +478,15 @@
return registrations.stream().map(registration -> registration.nodeId()).collect(Collectors.toList());
}
- public ElectionState addRegistration(String topic, Registration registration, Supplier<Long> termCounter) {
+ public ElectionState addRegistration(Registration registration, Supplier<Long> termCounter) {
if (!registrations.stream().anyMatch(r -> r.sessionId() == registration.sessionId())) {
List<Registration> updatedRegistrations = new LinkedList<>(registrations);
updatedRegistrations.add(registration);
- sortRegistrations(topic, updatedRegistrations);
- Registration firstRegistration = updatedRegistrations.get(0);
- Registration leader = this.leader;
- long term = this.term;
- long termStartTime = this.termStartTime;
- if (leader == null || !leader.equals(firstRegistration)) {
- leader = firstRegistration;
- term = termCounter.get();
- termStartTime = System.currentTimeMillis();
- }
+ boolean newLeader = leader == null;
return new ElectionState(updatedRegistrations,
- leader,
- term,
- termStartTime,
- elections);
+ newLeader ? registration : leader,
+ newLeader ? termCounter.get() : term,
+ newLeader ? System.currentTimeMillis() : termStartTime);
}
return this;
}
@@ -546,8 +500,7 @@
return new ElectionState(registrations,
newLeader,
termCounter.incrementAndGet(),
- System.currentTimeMillis(),
- elections);
+ System.currentTimeMillis());
} else {
return this;
}
@@ -566,8 +519,7 @@
return new ElectionState(updatedRegistrations,
leader,
term,
- termStartTime,
- elections);
+ termStartTime);
}
}
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorTest.java
index 3eda617..b0ec22d 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorTest.java
@@ -36,9 +36,9 @@
*/
public class AtomixLeaderElectorTest extends AtomixTestBase<AtomixLeaderElector> {
- NodeId node1 = new NodeId("4");
- NodeId node2 = new NodeId("5");
- NodeId node3 = new NodeId("6");
+ NodeId node1 = new NodeId("node1");
+ NodeId node2 = new NodeId("node2");
+ NodeId node3 = new NodeId("node3");
@Override
protected RaftService createService() {
@@ -64,20 +64,13 @@
assertEquals(node1, result.candidates().get(0));
}).join();
- elector1.run("bar", node1).thenAccept(result -> {
+ AtomixLeaderElector elector2 = newPrimitive("test-elector-run");
+ elector2.run("foo", node2).thenAccept(result -> {
assertEquals(node1, result.leaderNodeId());
assertEquals(1, result.leader().term());
- assertEquals(1, result.candidates().size());
- assertEquals(node1, result.candidates().get(0));
- }).join();
-
- AtomixLeaderElector elector2 = newPrimitive("test-elector-run");
- elector2.run("bar", node2).thenAccept(result -> {
- assertEquals(node2, result.leaderNodeId());
- assertEquals(2, result.leader().term());
assertEquals(2, result.candidates().size());
- assertEquals(node2, result.candidates().get(0));
- assertEquals(node1, result.candidates().get(1));
+ assertEquals(node1, result.candidates().get(0));
+ assertEquals(node2, result.candidates().get(1));
}).join();
}
@@ -268,50 +261,6 @@
}
@Test
- public void testLeaderBalance() throws Throwable {
- AtomixLeaderElector elector1 = newPrimitive("test-elector-leader-session-close");
- elector1.run("foo", node1).join();
- elector1.run("bar", node1).join();
- elector1.run("baz", node1).join();
-
- AtomixLeaderElector elector2 = newPrimitive("test-elector-leader-session-close");
- elector2.run("foo", node2).join();
- elector2.run("bar", node2).join();
- elector2.run("baz", node2).join();
-
- AtomixLeaderElector elector3 = newPrimitive("test-elector-leader-session-close");
- elector3.run("foo", node3).join();
- elector3.run("bar", node3).join();
- elector3.run("baz", node3).join();
-
- LeaderEventListener listener = new LeaderEventListener();
- elector2.addChangeListener(listener).join();
-
- elector1.proxy.close();
-
- listener.nextEvent().thenAccept(result -> {
- assertEquals(node3, result.newValue().leaderNodeId());
- assertEquals(2, result.newValue().candidates().size());
- assertEquals(node3, result.newValue().candidates().get(0));
- assertEquals(node2, result.newValue().candidates().get(1));
- }).join();
-
- listener.nextEvent().thenAccept(result -> {
- assertEquals(node2, result.newValue().leaderNodeId());
- assertEquals(2, result.newValue().candidates().size());
- assertEquals(node2, result.newValue().candidates().get(0));
- assertEquals(node3, result.newValue().candidates().get(1));
- });
-
- listener.nextEvent().thenAccept(result -> {
- assertEquals(node2, result.newValue().leaderNodeId());
- assertEquals(2, result.newValue().candidates().size());
- assertEquals(node2, result.newValue().candidates().get(0));
- assertEquals(node3, result.newValue().candidates().get(1));
- }).join();
- }
-
- @Test
public void testQueries() throws Throwable {
leaderElectorQueryTests();
}