AsyncLeaderElector APIs in support of mastership balancing
Change-Id: Ia235c6a18c54490dc49ca13e2caebf70b750dbc7
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorState.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorState.java
index 23dcf52..ef8c444 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorState.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElectorState.java
@@ -43,10 +43,12 @@
import org.onosproject.cluster.NodeId;
import org.onosproject.event.Change;
import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Anoint;
+import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Evict;
import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.GetAllLeaderships;
import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.GetElectedTopics;
import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.GetLeadership;
import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Listen;
+import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Promote;
import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Run;
import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Unlisten;
import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Withdraw;
@@ -86,6 +88,8 @@
executor.register(Run.class, this::run);
executor.register(Withdraw.class, this::withdraw);
executor.register(Anoint.class, this::anoint);
+ executor.register(Promote.class, this::promote);
+ executor.register(Evict.class, this::evict);
// Queries
executor.register(GetLeadership.class, this::leadership);
executor.register(GetAllLeaderships.class, this::allLeaderships);
@@ -93,8 +97,16 @@
}
private void notifyLeadershipChange(Leadership previousLeadership, Leadership newLeadership) {
- Change<Leadership> change = new Change<>(previousLeadership, newLeadership);
- listeners.values().forEach(listener -> listener.session().publish("change", change));
+ // Wrap the single change in a one-element list and hand it to the batch notifier.
+ notifyLeadershipChanges(Arrays.asList(new Change<>(previousLeadership, newLeadership)));
+ }
+
+ /**
+  * Publishes a batch of leadership changes to the session of every entry in {@code listeners}.
+  * Nothing is published when the batch is empty.
+  *
+  * @param changes leadership changes to publish
+  */
+ private void notifyLeadershipChanges(List<Change<Leadership>> changes) {
+     if (!changes.isEmpty()) {
+         listeners.values().forEach(
+                 l -> l.session().publish(AtomixLeaderElector.CHANGE_SUBJECT, changes));
+     }
}
@Override
@@ -206,6 +218,53 @@
}
/**
+  * Applies an {@link AtomixLeaderElectorCommands.Promote} commit by moving the node's
+  * registration to the head of the topic's registration list and notifying listeners
+  * if the leadership for the topic changed as a result.
+  *
+  * @param commit promote commit
+  * @return {@code false} if the topic has no leadership or the node is not one of its
+  *         candidates; {@code true} otherwise
+  */
+ public boolean promote(Commit<? extends Promote> commit) {
+     try {
+         Promote operation = commit.operation();
+         String topic = operation.topic();
+         NodeId candidate = operation.nodeId();
+         Leadership before = leadership(topic);
+         boolean isCandidate = before != null && before.candidates().contains(candidate);
+         if (!isCandidate) {
+             return false;
+         }
+         elections.computeIfPresent(topic, (key, state) -> new ElectionState(state).promote(candidate));
+         Leadership after = leadership(topic);
+         if (!Objects.equal(before, after)) {
+             notifyLeadershipChange(before, after);
+         }
+         return true;
+     } finally {
+         // Always release the commit, whichever path returned.
+         commit.close();
+     }
+ }
+
+ /**
+  * Applies an {@link AtomixLeaderElectorCommands.Evict} commit by evicting the node from every
+  * election whose candidates include it, then publishing all resulting leadership changes
+  * as a single batch.
+  *
+  * @param commit evict commit
+  */
+ public void evict(Commit<? extends Evict> commit) {
+     try {
+         List<Change<Leadership>> changes = Lists.newLinkedList();
+         NodeId nodeId = commit.operation().nodeId();
+         // Maps.filterValues returns a live view of elections; snapshot the matching topics
+         // before the loop below starts replacing entries in that same map.
+         List<String> topics = Lists.newArrayList(
+                 Maps.filterValues(elections, e -> e.candidates().contains(nodeId)).keySet());
+         for (String topic : topics) {
+             Leadership oldLeadership = leadership(topic);
+             elections.compute(topic, (k, v) -> v.evict(nodeId, termCounter(topic)::incrementAndGet));
+             Leadership newLeadership = leadership(topic);
+             if (!Objects.equal(oldLeadership, newLeadership)) {
+                 changes.add(new Change<>(oldLeadership, newLeadership));
+             }
+         }
+         notifyLeadershipChanges(changes);
+     } finally {
+         // Always release the commit, even if an election update throws.
+         commit.close();
+     }
+ }
+
+ /**
* Applies an {@link AtomixLeaderElectorCommands.GetLeadership} commit.
* @param commit GetLeadership commit
* @return leader
@@ -362,6 +421,31 @@
}
}
+ /**
+  * Returns the election state that results from removing every registration of the given node.
+  * If the node was the leader, the first remaining registration becomes leader under a term
+  * taken from {@code termCounter}; if no registration remains, the leader is cleared and the
+  * current term and term start time are kept.
+  *
+  * @param nodeId node to evict
+  * @param termCounter supplier of the next term number; invoked only when a new leader is chosen
+  * @return updated state, or this instance if the node has no registration
+  */
+ public ElectionState evict(NodeId nodeId, Supplier<Long> termCounter) {
+     if (registrations.stream().noneMatch(r -> r.nodeId().equals(nodeId))) {
+         return this;
+     }
+     List<Registration> updatedRegistrations = registrations.stream()
+             .filter(r -> !r.nodeId().equals(nodeId))
+             .collect(Collectors.toList());
+     // leader can be null (this method itself clears it when nobody is left), so guard the dereference.
+     if (leader == null || !leader.nodeId().equals(nodeId)) {
+         return new ElectionState(updatedRegistrations, leader, term, termStartTime);
+     }
+     if (updatedRegistrations.isEmpty()) {
+         // The evicted node was the leader and the last candidate: no successor to elect.
+         return new ElectionState(updatedRegistrations, null, term, termStartTime);
+     }
+     return new ElectionState(updatedRegistrations,
+                              updatedRegistrations.get(0),
+                              termCounter.get(),
+                              System.currentTimeMillis());
+ }
+
+ /**
+  * Checks whether any existing registration has the same session ID as the given one.
+  * NOTE(review): the IDs are compared with {@code ==}; this is only correct if
+  * {@code sessionId()} returns a primitive - confirm.
+  *
+  * @param registration registration to compare against the existing ones
+  * @return {@code true} if an existing registration has the same session ID
+  */
public boolean isDuplicate(Registration registration) {
return registrations.stream().anyMatch(r -> r.sessionId() == registration.sessionId());
}
@@ -406,6 +490,23 @@
return this;
}
}
+
+ /**
+  * Returns a state in which the given node's first registration sits at the head of the
+  * registration list, followed by the registrations of all other nodes in their existing
+  * order. Any further registrations of the same node are dropped. Leader, term and term
+  * start time are carried over unchanged.
+  *
+  * @param nodeId node to promote
+  * @return updated state, or this instance if the node has no registration
+  */
+ public ElectionState promote(NodeId nodeId) {
+     Optional<Registration> promoted = registrations.stream()
+             .filter(r -> r.nodeId().equals(nodeId))
+             .findFirst();
+     if (!promoted.isPresent()) {
+         // Nothing to move; avoid inserting a null entry at the head of the list.
+         return this;
+     }
+     List<Registration> updatedRegistrations = Lists.newLinkedList();
+     updatedRegistrations.add(promoted.get());
+     registrations.stream()
+             .filter(r -> !r.nodeId().equals(nodeId))
+             .forEach(updatedRegistrations::add);
+     return new ElectionState(updatedRegistrations, leader, term, termStartTime);
+ }
}
@Override