LeadershipStore updates:
- Now tracking leader and candidates for a topic using a single map.
- Using term numbers that are incremented by one every time a new leader is elected.
- Introduced a separate LeadershipStore to conform to the manager-store pattern
Change-Id: I1d03a6c5e8ff0e68ef0c1e3a6c2d425c4856e470
diff --git a/apps/cip/src/main/java/org/onosproject/cip/ClusterIpManager.java b/apps/cip/src/main/java/org/onosproject/cip/ClusterIpManager.java
index a3cf7b3..a6f3d1c 100644
--- a/apps/cip/src/main/java/org/onosproject/cip/ClusterIpManager.java
+++ b/apps/cip/src/main/java/org/onosproject/cip/ClusterIpManager.java
@@ -16,6 +16,7 @@
package org.onosproject.cip;
import com.google.common.io.ByteStreams;
+
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -93,7 +94,7 @@
cfgService.registerProperties(getClass());
localId = clusterService.getLocalNode().id();
- processLeadershipChange(leadershipService.getLeader(CLUSTER_IP));
+ processLeaderChange(leadershipService.getLeader(CLUSTER_IP));
leadershipService.addListener(listener);
leadershipService.runForLeadership(CLUSTER_IP);
@@ -137,10 +138,7 @@
}
}
- private synchronized void processLeadershipChange(NodeId newLeader) {
- if (newLeader == null) {
- return;
- }
+ private synchronized void processLeaderChange(NodeId newLeader) {
boolean isLeader = Objects.equals(newLeader, localId);
log.info("Processing leadership change; wasLeader={}, isLeader={}", wasLeader, isLeader);
if (!wasLeader && isLeader) {
@@ -189,11 +187,15 @@
// Listens for leadership changes.
private class InternalLeadershipListener implements LeadershipEventListener {
+
+ @Override
+ public boolean isRelevant(LeadershipEvent event) {
+ return CLUSTER_IP.equals(event.subject().topic());
+ }
+
@Override
public void event(LeadershipEvent event) {
- if (event.subject().topic().equals(CLUSTER_IP)) {
- processLeadershipChange(event.subject().leader());
- }
+ processLeaderChange(event.subject().leaderNodeId());
}
}
diff --git a/apps/mlb/src/main/java/org/onosproject/mlb/MastershipLoadBalancer.java b/apps/mlb/src/main/java/org/onosproject/mlb/MastershipLoadBalancer.java
index bcf4e2e..b2a33a9 100644
--- a/apps/mlb/src/main/java/org/onosproject/mlb/MastershipLoadBalancer.java
+++ b/apps/mlb/src/main/java/org/onosproject/mlb/MastershipLoadBalancer.java
@@ -19,6 +19,7 @@
import com.google.common.util.concurrent.ListenableScheduledFuture;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
+
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -105,10 +106,7 @@
log.info("Stopped");
}
- private synchronized void processLeadershipChange(NodeId newLeader) {
- if (newLeader == null) {
- return;
- }
+ private synchronized void processLeaderChange(NodeId newLeader) {
boolean currLeader = newLeader.equals(localId);
if (isLeader.getAndSet(currLeader) != currLeader) {
if (currLeader) {
@@ -159,7 +157,7 @@
@Override
public void event(LeadershipEvent event) {
- processLeadershipChange(event.subject().leader());
+ processLeaderChange(event.subject().leaderNodeId());
}
}
-}
\ No newline at end of file
+}
diff --git a/apps/routing/src/main/java/org/onosproject/routing/impl/IntentSynchronizer.java b/apps/routing/src/main/java/org/onosproject/routing/impl/IntentSynchronizer.java
index 22c7d43..1811049 100644
--- a/apps/routing/src/main/java/org/onosproject/routing/impl/IntentSynchronizer.java
+++ b/apps/routing/src/main/java/org/onosproject/routing/impl/IntentSynchronizer.java
@@ -26,6 +26,7 @@
import org.onosproject.cluster.LeadershipEvent;
import org.onosproject.cluster.LeadershipEventListener;
import org.onosproject.cluster.LeadershipService;
+import org.onosproject.cluster.NodeId;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.net.intent.Intent;
@@ -43,7 +44,6 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
@@ -74,6 +74,7 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected IntentService intentService;
+ private NodeId localNodeId;
private ApplicationId appId;
private final InternalLeadershipListener leadershipEventListener =
@@ -89,7 +90,7 @@
@Activate
public void activate() {
intentsSynchronizerExecutor = createExecutor();
-
+ this.localNodeId = clusterService.getLocalNode().id();
this.appId = coreService.registerApplication(APP_NAME);
leadershipService.addListener(leadershipEventListener);
@@ -268,27 +269,22 @@
private class InternalLeadershipListener implements LeadershipEventListener {
@Override
- public void event(LeadershipEvent event) {
- if (!event.subject().topic().equals(appId.name())) {
- // Not our topic: ignore
- return;
- }
- if (!Objects.equals(event.subject().leader(),
- clusterService.getLocalNode().id())) {
- // The event is not about this instance: ignore
- return;
- }
+ public boolean isRelevant(LeadershipEvent event) {
+ return event.subject().topic().equals(appId.name());
+ }
+ @Override
+ public void event(LeadershipEvent event) {
switch (event.type()) {
- case LEADER_ELECTED:
- log.info("IntentSynchronizer gained leadership");
- leaderChanged(true);
- break;
- case LEADER_BOOTED:
- log.info("IntentSynchronizer lost leadership");
- leaderChanged(false);
- break;
- case LEADER_REELECTED:
+ case LEADER_CHANGED:
+ case LEADER_AND_CANDIDATES_CHANGED:
+ if (localNodeId.equals(event.subject().leaderNodeId())) {
+ log.info("IntentSynchronizer gained leadership");
+ leaderChanged(true);
+ } else {
+ log.info("IntentSynchronizer leader changed. New leader is {}", event.subject().leaderNodeId());
+ leaderChanged(false);
+ }
default:
break;
}
diff --git a/apps/routing/src/test/java/org/onosproject/routing/impl/IntentSynchronizerTest.java b/apps/routing/src/test/java/org/onosproject/routing/impl/IntentSynchronizerTest.java
index d265323..351a099 100644
--- a/apps/routing/src/test/java/org/onosproject/routing/impl/IntentSynchronizerTest.java
+++ b/apps/routing/src/test/java/org/onosproject/routing/impl/IntentSynchronizerTest.java
@@ -17,6 +17,7 @@
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.MoreExecutors;
+
import org.junit.Before;
import org.junit.Test;
import org.onlab.junit.TestUtils;
@@ -29,7 +30,11 @@
import org.onlab.packet.MacAddress;
import org.onlab.packet.VlanId;
import org.onosproject.TestApplicationId;
+import org.onosproject.cluster.ClusterServiceAdapter;
+import org.onosproject.cluster.ControllerNode;
+import org.onosproject.cluster.DefaultControllerNode;
import org.onosproject.cluster.LeadershipServiceAdapter;
+import org.onosproject.cluster.NodeId;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreServiceAdapter;
import org.onosproject.incubator.net.intf.Interface;
@@ -94,6 +99,9 @@
private static final ApplicationId APPID =
TestApplicationId.create("intent-sync-test");
+ private static final ControllerNode LOCAL_NODE =
+ new DefaultControllerNode(new NodeId("foo"), IpAddress.valueOf("127.0.0.1"));
+
@Before
public void setUp() throws Exception {
super.setUp();
@@ -105,6 +113,7 @@
intentSynchronizer = new TestIntentSynchronizer();
intentSynchronizer.coreService = new TestCoreService();
+ intentSynchronizer.clusterService = new TestClusterService();
intentSynchronizer.leadershipService = new TestLeadershipService();
intentSynchronizer.intentService = intentService;
@@ -441,6 +450,13 @@
}
}
+ private class TestClusterService extends ClusterServiceAdapter {
+ @Override
+ public ControllerNode getLocalNode() {
+ return LOCAL_NODE;
+ }
+ }
+
private class TestLeadershipService extends LeadershipServiceAdapter {
}
diff --git a/apps/test/election/src/main/java/org/onosproject/election/ElectionTest.java b/apps/test/election/src/main/java/org/onosproject/election/ElectionTest.java
index b2a7207..e0d3b27 100644
--- a/apps/test/election/src/main/java/org/onosproject/election/ElectionTest.java
+++ b/apps/test/election/src/main/java/org/onosproject/election/ElectionTest.java
@@ -24,12 +24,11 @@
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.onosproject.cluster.ClusterService;
import org.onosproject.core.CoreService;
-import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.LeadershipEvent;
import org.onosproject.cluster.LeadershipEventListener;
import org.onosproject.cluster.LeadershipService;
+import org.onosproject.cluster.NodeId;
import org.onosproject.core.ApplicationId;
-
import org.slf4j.Logger;
@@ -56,7 +55,7 @@
private LeadershipEventListener leadershipEventListener =
new InnerLeadershipEventListener();
- private ControllerNode localControllerNode;
+ private NodeId localNodeId;
@Activate
@@ -65,7 +64,7 @@
appId = coreService.registerApplication(ELECTION_APP);
- localControllerNode = clusterService.getLocalNode();
+ localNodeId = clusterService.getLocalNode().id();
leadershipService.addListener(leadershipEventListener);
}
@@ -100,20 +99,10 @@
log.debug("Leadership Event: time = {} type = {} event = {}",
event.time(), event.type(), event);
- if (!event.subject().leader().equals(
- localControllerNode.id())) {
- return; // The event is not about this instance: ignore
- }
-
switch (event.type()) {
- case LEADER_ELECTED:
- log.info("Election-test app leader elected");
- break;
- case LEADER_BOOTED:
- log.info("Election-test app lost election");
- break;
- case LEADER_REELECTED:
- log.debug("Election-test app was re-elected");
+ case LEADER_CHANGED:
+ case LEADER_AND_CANDIDATES_CHANGED:
+ log.info("Election-test app leader changed. New leadership: {}", event.subject());
break;
default:
break;
diff --git a/cli/src/main/java/org/onosproject/cli/net/LeaderCommand.java b/cli/src/main/java/org/onosproject/cli/net/LeaderCommand.java
index da4ab13..6e2901d 100644
--- a/cli/src/main/java/org/onosproject/cli/net/LeaderCommand.java
+++ b/cli/src/main/java/org/onosproject/cli/net/LeaderCommand.java
@@ -40,7 +40,7 @@
description = "Finds the leader for particular topic.")
public class LeaderCommand extends AbstractShellCommand {
- private static final String FMT = "%-30s | %-15s | %-6s | %-10s |";
+ private static final String FMT = "%-30s | %-15s | %-5s | %-10s |";
private static final String FMT_C = "%-30s | %-15s | %-19s |";
private boolean allTopics;
private Pattern pattern;
@@ -57,19 +57,8 @@
/**
* Compares leaders, sorting by toString() output.
*/
- private Comparator<Leadership> leadershipComparator =
- (e1, e2) -> {
- if (e1.leader() == null && e2.leader() == null) {
- return 0;
- }
- if (e1.leader() == null) {
- return 1;
- }
- if (e2.leader() == null) {
- return -1;
- }
- return e1.leader().toString().compareTo(e2.leader().toString());
- };
+ private Comparator<Leadership> leadershipComparator = (l1, l2) ->
+ String.valueOf(l1.leaderNodeId()).compareTo(String.valueOf(l2.leaderNodeId()));
/**
* Displays text representing the leaders.
@@ -78,18 +67,19 @@
*/
private void displayLeaders(Map<String, Leadership> leaderBoard) {
print("------------------------------------------------------------------------");
- print(FMT, "Topic", "Leader", "Epoch", "Elected");
+ print(FMT, "Topic", "Leader", "Term", "Elected");
print("------------------------------------------------------------------------");
leaderBoard.values()
.stream()
.filter(l -> allTopics || pattern.matcher(l.topic()).matches())
+ .filter(l -> l.leader() != null)
.sorted(leadershipComparator)
.forEach(l -> print(FMT,
l.topic(),
- l.leader(),
- l.epoch(),
- Tools.timeAgo(l.electedTime())));
+ l.leaderNodeId(),
+ l.leader().term(),
+ Tools.timeAgo(l.leader().termStartTime())));
print("------------------------------------------------------------------------");
}
@@ -110,7 +100,7 @@
Leadership l = leaderBoard.get(es.getKey());
print(FMT_C,
es.getKey(),
- l == null ? "null" : l.leader(),
+ String.valueOf(l.leaderNodeId()),
// formatting hacks to get it into a table
list.get(0).toString());
list.subList(1, list.size()).forEach(n -> print(FMT_C, " ", " ", n));
@@ -134,10 +124,10 @@
result.add(
mapper.createObjectNode()
.put("topic", l.topic())
- .put("leader", l.leader().toString())
+ .put("leader", String.valueOf(l.leaderNodeId()))
.put("candidates", l.candidates().toString())
- .put("epoch", l.epoch())
- .put("electedTime", Tools.timeAgo(l.electedTime()))));
+ .put("epoch", l.leader().term())
+ .put("epochStartTime", Tools.timeAgo(l.leader().termStartTime()))));
return result;
}
diff --git a/core/api/src/main/java/org/onosproject/cluster/Leader.java b/core/api/src/main/java/org/onosproject/cluster/Leader.java
new file mode 100644
index 0000000..8e5528d
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/cluster/Leader.java
@@ -0,0 +1,96 @@
+/*
+ * Copyright 2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.cluster;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Objects;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkArgument;
+
+/**
+ * Topic leader.
+ * <p>
+ * Identified by the {@link NodeId node identifier} and a monotonically increasing term number.
+ * The term number is incremented by one every time a new node is elected as leader.
+ * Also available is the system clock time at the instant when this node was elected as leader.
+ * Keep in mind though that as with any system clock based time stamps this particular information
+ * susceptible to clock skew and should only be relied on for simple diagnostic purposes.
+ */
+public class Leader {
+ private final NodeId nodeId;
+ private final long term;
+ private final long termStartTime;
+
+ public Leader(NodeId nodeId, long term, long termStartTime) {
+ this.nodeId = checkNotNull(nodeId);
+ checkArgument(term >= 0, "term must be non-negative");
+ this.term = term;
+ checkArgument(termStartTime >= 0, "termStartTime must be non-negative");
+ this.termStartTime = termStartTime;
+ }
+
+ /**
+ * Returns the identifier for of leader.
+ * @return node identifier
+ */
+ public NodeId nodeId() {
+ return nodeId;
+ }
+
+ /**
+ * Returns the leader's term.
+ * @return leader term
+ */
+ public long term() {
+ return term;
+ }
+
+ /**
+ * Returns the system time when the current leadership term started.
+ * @return current leader term start time
+ */
+ public long termStartTime() {
+ return termStartTime;
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (this == other) {
+ return true;
+ }
+ if (other != null && other instanceof Leader) {
+ Leader that = (Leader) other;
+ return Objects.equal(this.nodeId, that.nodeId) &&
+ this.term == that.term &&
+ this.termStartTime == that.termStartTime;
+ }
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(nodeId, term, termStartTime);
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(getClass())
+ .add("nodeId", nodeId)
+ .add("term", term)
+ .add("termStartTime", termStartTime)
+ .toString();
+ }
+}
diff --git a/core/api/src/main/java/org/onosproject/cluster/Leadership.java b/core/api/src/main/java/org/onosproject/cluster/Leadership.java
index 113e19c..b31c03f 100644
--- a/core/api/src/main/java/org/onosproject/cluster/Leadership.java
+++ b/core/api/src/main/java/org/onosproject/cluster/Leadership.java
@@ -17,63 +17,31 @@
import java.util.Objects;
import java.util.List;
-import java.util.Optional;
-
-import org.joda.time.DateTime;
import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableList;
/**
- * Abstract leadership concept. The information carried by this construct
- * include the topic of contention, the {@link NodeId}s of Nodes that could
- * become leader for the topic, the epoch when the term for a given leader
- * began, and the system time when the term began. Note:
- * <ul>
- * <li>The list of NodeIds may include the current leader at index 0, and the
- * rest in decreasing preference order.</li>
- * <li>The epoch is the logical age of a Leadership construct, and should be
- * used for comparing two Leaderships, but only of the same topic.</li>
- * <li>The leader may be null if its accuracy can't be guaranteed. This applies
- * to CANDIDATES_CHANGED events and candidate board contents.</li>
- * </ul>
+ * State of leadership for topic.
+ * <p>
+ * Provided by this construct is the current {@link Leader leader} and the list of
+ * {@link NodeId nodeId}s currently registered as candidates for election for the topic.
+ * Keep in mind that only registered candidates can become leaders.
*/
public class Leadership {
private final String topic;
- private final Optional<NodeId> leader;
+ private final Leader leader;
private final List<NodeId> candidates;
- private final long epoch;
- private final long electedTime;
- public Leadership(String topic, NodeId leader, long epoch, long electedTime) {
+ public Leadership(String topic, Leader leader, List<NodeId> candidates) {
this.topic = topic;
- this.leader = Optional.of(leader);
- this.candidates = ImmutableList.of(leader);
- this.epoch = epoch;
- this.electedTime = electedTime;
- }
-
- public Leadership(String topic, NodeId leader, List<NodeId> candidates,
- long epoch, long electedTime) {
- this.topic = topic;
- this.leader = Optional.of(leader);
+ this.leader = leader;
this.candidates = ImmutableList.copyOf(candidates);
- this.epoch = epoch;
- this.electedTime = electedTime;
- }
-
- public Leadership(String topic, List<NodeId> candidates,
- long epoch, long electedTime) {
- this.topic = topic;
- this.leader = Optional.empty();
- this.candidates = ImmutableList.copyOf(candidates);
- this.epoch = epoch;
- this.electedTime = electedTime;
}
/**
- * The topic for which this leadership applies.
+ * Returns the leadership topic.
*
* @return leadership topic.
*/
@@ -82,13 +50,21 @@
}
/**
- * The nodeId of leader for this topic.
+ * Returns the {@link NodeId nodeId} of the leader.
*
- * @return leader node.
+ * @return leader node identifier; will be null if there is no leader
*/
- // This will return Optional<NodeId> in the future.
- public NodeId leader() {
- return leader.orElse(null);
+ public NodeId leaderNodeId() {
+ return leader == null ? null : leader.nodeId();
+ }
+
+ /**
+ * Returns the leader for this topic.
+ *
+ * @return leader; will be null if there is no leader for topic
+ */
+ public Leader leader() {
+ return leader;
}
/**
@@ -101,38 +77,9 @@
return candidates;
}
- /**
- * The epoch when the leadership was assumed.
- * <p>
- * Comparing epochs is only appropriate for leadership events for the same
- * topic. The system guarantees that for any given topic the epoch for a new
- * term is higher (not necessarily by 1) than the epoch for any previous
- * term.
- *
- * @return leadership epoch
- */
- public long epoch() {
- return epoch;
- }
-
- /**
- * The system time when the term started.
- * <p>
- * The elected time is initially set on the node coordinating
- * the leader election using its local system time. Due to possible
- * clock skew, relying on this value for determining event ordering
- * is discouraged. Epoch is more appropriate for determining
- * event ordering.
- *
- * @return elected time.
- */
- public long electedTime() {
- return electedTime;
- }
-
@Override
public int hashCode() {
- return Objects.hash(topic, leader, candidates, epoch, electedTime);
+ return Objects.hash(topic, leader, candidates);
}
@Override
@@ -144,9 +91,7 @@
final Leadership other = (Leadership) obj;
return Objects.equals(this.topic, other.topic) &&
Objects.equals(this.leader, other.leader) &&
- Objects.equals(this.candidates, other.candidates) &&
- Objects.equals(this.epoch, other.epoch) &&
- Objects.equals(this.electedTime, other.electedTime);
+ Objects.equals(this.candidates, other.candidates);
}
return false;
}
@@ -157,8 +102,6 @@
.add("topic", topic)
.add("leader", leader)
.add("candidates", candidates)
- .add("epoch", epoch)
- .add("electedTime", new DateTime(electedTime))
.toString();
}
}
diff --git a/core/api/src/main/java/org/onosproject/cluster/LeadershipAdminService.java b/core/api/src/main/java/org/onosproject/cluster/LeadershipAdminService.java
new file mode 100644
index 0000000..c58ca71
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/cluster/LeadershipAdminService.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.cluster;
+
+/**
+ * Interface for administratively manipulating leadership assignments.
+ */
+public interface LeadershipAdminService {
+
+ /**
+ * Attempts to assign leadership for a topic to a specified node.
+ * @param topic leadership topic
+ * @param nodeId identifier of the node to be made leader
+ * @return true is the transfer was successfully executed. This method returns {@code false}
+ * if {@code nodeId} is not one of the candidates for for the topic.
+ */
+ boolean transferLeadership(String topic, NodeId nodeId);
+
+ /**
+ * Make a node to be the next leader by promoting it to top of candidate list.
+ * @param topic leadership topic
+ * @param nodeId identifier of node to be next leader
+ * @return {@code true} if nodeId is now the top candidate. This method returns {@code false}
+ * if {@code nodeId} is not one of the candidates for for the topic.
+ */
+ boolean promoteToTopOfCandidateList(String topic, NodeId nodeId);
+
+ /**
+ * Removes all active leadership registrations for a given node.
+ * <p>
+ * This method will also evict the node from leaderships that it currently owns.
+ * @param nodeId node identifier
+ */
+ void unregister(NodeId nodeId);
+}
diff --git a/core/api/src/main/java/org/onosproject/cluster/LeadershipEvent.java b/core/api/src/main/java/org/onosproject/cluster/LeadershipEvent.java
index d1fe670..6fb310d 100644
--- a/core/api/src/main/java/org/onosproject/cluster/LeadershipEvent.java
+++ b/core/api/src/main/java/org/onosproject/cluster/LeadershipEvent.java
@@ -27,33 +27,25 @@
public class LeadershipEvent extends AbstractEvent<LeadershipEvent.Type, Leadership> {
/**
- * Type of leadership-related events.
+ * Type of leadership events.
*/
public enum Type {
/**
- * Signifies that the leader has been elected.
- * The event subject is the new leader.
- * This event does not guarantee accurate candidate information.
+ * Signifies a change in both the leader as well as change to the list of candidates. Keep in mind though that
+ * the first node entering the race will trigger this event as it will become a candidate and automatically get
+ * promoted to become leader.
*/
- LEADER_ELECTED,
+ LEADER_AND_CANDIDATES_CHANGED,
/**
- * Signifies that the leader has been re-elected.
- * The event subject is the leader.
- * This event does not guarantee accurate candidate information.
+ * Signifies that the leader for a topic has changed.
*/
- LEADER_REELECTED,
+ // TODO: We may not need this. We currently do not support a way for a current leader to step down
+ // while still reamining a candidate
+ LEADER_CHANGED,
/**
- * Signifies that the leader has been booted and lost leadership.
- * The event subject is the former leader.
- * This event does not guarantee accurate candidate information.
- */
- LEADER_BOOTED,
-
- /**
- * Signifies that the list of candidates for leadership for a topic has
- * changed. This event does not guarantee accurate leader information.
+ * Signifies a change in the list of candidates for a topic.
*/
CANDIDATES_CHANGED
}
diff --git a/core/api/src/main/java/org/onosproject/cluster/LeadershipService.java b/core/api/src/main/java/org/onosproject/cluster/LeadershipService.java
index 7d1f607..b0f7a8d 100644
--- a/core/api/src/main/java/org/onosproject/cluster/LeadershipService.java
+++ b/core/api/src/main/java/org/onosproject/cluster/LeadershipService.java
@@ -17,87 +17,73 @@
import org.onosproject.event.ListenerService;
+import com.google.common.base.Objects;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.CompletableFuture;
/**
* Service for leader election.
+ * <p>
* Leadership contests are organized around topics. A instance can join the
* leadership race for a topic or withdraw from a race it has previously joined.
+ * <p>
* Listeners can be added to receive notifications asynchronously for various
* leadership contests.
+ * <p>
+ * When a node gets elected as a leader for a topic, all nodes receive notifications
+ * indicating a change in leadership.
*/
public interface LeadershipService
extends ListenerService<LeadershipEvent, LeadershipEventListener> {
/**
- * Returns the current leader for the topic.
+ * Returns the {@link NodeId node identifier} that is the current leader for a topic.
*
- * @param path topic
- * @return nodeId of the leader, null if so such topic exists.
+ * @param topic leadership topic
+ * @return node identifier of the current leader; {@code null} if there is no leader for the topic
*/
- NodeId getLeader(String path);
+ default NodeId getLeader(String topic) {
+ Leadership leadership = getLeadership(topic);
+ return leadership == null ? null : leadership.leaderNodeId();
+ }
/**
- * Returns the current leadership info for the topic.
+ * Returns the current {@link Leadership leadership} for a topic.
*
- * @param path topic
- * @return leadership info or null if so such topic exists.
+ * @param topic leadership topic
+ * @return leadership or {@code null} if no such topic exists
*/
- Leadership getLeadership(String path);
+ Leadership getLeadership(String topic);
/**
- * Returns the set of topics owned by the specified node.
+ * Returns the set of topics owned by the specified {@link NodeId node}.
*
- * @param nodeId node Id.
+ * @param nodeId node identifier.
* @return set of topics for which this node is the current leader.
*/
- Set<String> ownedTopics(NodeId nodeId);
+ default Set<String> ownedTopics(NodeId nodeId) {
+ return Maps.filterValues(getLeaderBoard(), v -> Objects.equal(nodeId, v.leaderNodeId())).keySet();
+ }
/**
- * Joins the leadership contest.
+ * Enters a leadership contest.
*
- * @param path topic for which this controller node wishes to be a leader
+ * @param topic leadership topic
* @return {@code Leadership} future
*/
- CompletableFuture<Leadership> runForLeadership(String path);
+ Leadership runForLeadership(String topic);
/**
* Withdraws from a leadership contest.
*
- * @param path topic for which this controller node no longer wishes to be a leader
- * @return future that is successfully completed when withdraw is done
+ * @param topic leadership topic
*/
- CompletableFuture<Void> withdraw(String path);
-
- /**
- * If the local nodeId is the leader for specified topic, this method causes it to
- * step down temporarily from leadership.
- * <p>
- * The node will continue to be in contention for leadership and can
- * potentially become the leader again if and when it becomes the highest
- * priority candidate
- * <p>
- * If the local nodeId is not the leader, this method will make no changes and
- * simply return false.
- *
- * @param path topic for which this controller node should give up leadership
- * @return true if this node stepped down from leadership, false otherwise
- */
- boolean stepdown(String path);
-
- /**
- * Moves the specified nodeId to the top of the candidates list for the topic.
- * <p>
- * If the node is not a candidate for this topic, this method will be a noop.
- *
- * @param path leadership topic
- * @param nodeId nodeId to make the top candidate
- * @return true if nodeId is now the top candidate, false otherwise
- */
- boolean makeTopCandidate(String path, NodeId nodeId);
+ void withdraw(String topic);
/**
* Returns the current leader board.
@@ -107,18 +93,22 @@
Map<String, Leadership> getLeaderBoard();
/**
- * Returns the candidates for all known topics.
+ * Returns the candidate nodes for each topic.
*
* @return A mapping from topics to corresponding list of candidates.
*/
- Map<String, List<NodeId>> getCandidates();
+ default Map<String, List<NodeId>> getCandidates() {
+ return ImmutableMap.copyOf(Maps.transformValues(getLeaderBoard(), v -> ImmutableList.copyOf(v.candidates())));
+ }
/**
- * Returns the candidates for a given topic.
+ * Returns the candidate nodes for a given topic.
*
- * @param path topic
- * @return A lists of NodeIds, which may be empty.
+ * @param topic leadership topic
+ * @return A lists of {@link NodeId nodeIds}, which may be empty.
*/
- List<NodeId> getCandidates(String path);
-
-}
+ default List<NodeId> getCandidates(String topic) {
+ Leadership leadership = getLeadership(topic);
+ return leadership == null ? ImmutableList.of() : ImmutableList.copyOf(leadership.candidates());
+ }
+}
\ No newline at end of file
diff --git a/core/api/src/main/java/org/onosproject/cluster/LeadershipStore.java b/core/api/src/main/java/org/onosproject/cluster/LeadershipStore.java
new file mode 100644
index 0000000..e3a0503
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/cluster/LeadershipStore.java
@@ -0,0 +1,82 @@
+/*
+ * Copyright 2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.cluster;
+
+import java.util.Map;
+import org.onosproject.store.Store;
+
+/**
+ * Store interface for managing {@link LeadershipService} state.
+ */
+public interface LeadershipStore extends Store<LeadershipEvent, LeadershipStoreDelegate> {
+
+ /**
+ * Adds registration for the local instance to be leader for topic.
+ *
+ * @param topic leadership topic
+ * @return Updated leadership after operation is completed
+ */
+ Leadership addRegistration(String topic);
+
+ /**
+ * Unregisters the local instance from leadership contest for topic.
+ *
+ * @param topic leadership topic
+ */
+ void removeRegistration(String topic);
+
+ /**
+ * Unregisters an instance from all leadership contests.
+ *
+ * @param nodeId node identifier
+ */
+ void removeRegistration(NodeId nodeId);
+
+ /**
+ * Updates state so that given node is leader for a topic.
+ *
+ * @param topic leadership topic
+ * @param toNodeId identifier of the desired leader
+ * @return {@code true} if the transfer succeeded; {@code false} otherwise.
+ * This method can return {@code false} if the node is not registered for the topic
+ */
+ boolean moveLeadership(String topic, NodeId toNodeId);
+
+ /**
+ * Attempts to make a node the top candidate.
+ *
+ * @param topic leadership topic
+ * @param nodeId node identifier
+ * @return {@code true} if the specified node is now the top candidate.
+ * This method will return {@code false} if the node is not registered for the topic
+ */
+ boolean makeTopCandidate(String topic, NodeId nodeId);
+
+ /**
+ * Returns the current leadership for topic.
+ *
+ * @param topic leadership topic
+ * @return current leadership
+ */
+ Leadership getLeadership(String topic);
+
+ /**
+ * Return current leadership for all topics.
+ *
+ * @return topic to leadership mapping
+ */
+ Map<String, Leadership> getLeaderships();
+}
\ No newline at end of file
diff --git a/core/api/src/main/java/org/onosproject/cluster/LeadershipStoreDelegate.java b/core/api/src/main/java/org/onosproject/cluster/LeadershipStoreDelegate.java
new file mode 100644
index 0000000..894fdc3
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/cluster/LeadershipStoreDelegate.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.cluster;
+
+import org.onosproject.store.StoreDelegate;
+
+/**
+ * {@link LeadershipStore} delegate abstraction.
+ */
+public interface LeadershipStoreDelegate extends StoreDelegate<LeadershipEvent> {
+}
diff --git a/core/api/src/test/java/org/onosproject/cluster/LeadershipEventTest.java b/core/api/src/test/java/org/onosproject/cluster/LeadershipEventTest.java
index be0321b..792d295 100644
--- a/core/api/src/test/java/org/onosproject/cluster/LeadershipEventTest.java
+++ b/core/api/src/test/java/org/onosproject/cluster/LeadershipEventTest.java
@@ -15,6 +15,8 @@
*/
package org.onosproject.cluster;
+import java.util.Arrays;
+
import org.junit.Test;
import com.google.common.testing.EqualsTester;
@@ -28,10 +30,11 @@
public class LeadershipEventTest {
private final NodeId node1 = new NodeId("1");
private final NodeId node2 = new NodeId("2");
- private final Leadership lead1 = new Leadership("topic1", node1, 1L, 2L);
- private final Leadership lead2 = new Leadership("topic1", node2, 1L, 2L);
+ private final Leadership lead1 = new Leadership("topic1", new Leader(node1, 1L, 2L), Arrays.asList(node1));
+ private final Leadership lead2 = new Leadership("topic1", new Leader(node1, 1L, 2L), Arrays.asList(node1, node2));
+ private final Leadership lead3 = new Leadership("topic1", new Leader(node2, 1L, 2L), Arrays.asList(node2));
private final LeadershipEvent event1 =
- new LeadershipEvent(LeadershipEvent.Type.LEADER_ELECTED, lead1);
+ new LeadershipEvent(LeadershipEvent.Type.LEADER_CHANGED, lead1);
private final long time = System.currentTimeMillis();
private final LeadershipEvent event2 =
new LeadershipEvent(LeadershipEvent.Type.CANDIDATES_CHANGED,
@@ -40,11 +43,9 @@
new LeadershipEvent(LeadershipEvent.Type.CANDIDATES_CHANGED,
lead2, time);
private final LeadershipEvent event3 =
- new LeadershipEvent(LeadershipEvent.Type.LEADER_BOOTED, lead1);
+ new LeadershipEvent(LeadershipEvent.Type.LEADER_CHANGED, lead2);
private final LeadershipEvent event4 =
- new LeadershipEvent(LeadershipEvent.Type.LEADER_REELECTED, lead1);
- private final LeadershipEvent event5 =
- new LeadershipEvent(LeadershipEvent.Type.LEADER_REELECTED, lead2);
+ new LeadershipEvent(LeadershipEvent.Type.LEADER_AND_CANDIDATES_CHANGED, lead3);
/**
* Tests for proper operation of equals(), hashCode() and toString() methods.
@@ -56,7 +57,6 @@
.addEqualityGroup(event2, sameAsEvent2)
.addEqualityGroup(event3)
.addEqualityGroup(event4)
- .addEqualityGroup(event5)
.testEquals();
}
@@ -65,7 +65,7 @@
*/
@Test
public void checkConstruction() {
- assertThat(event1.type(), is(LeadershipEvent.Type.LEADER_ELECTED));
+ assertThat(event1.type(), is(LeadershipEvent.Type.LEADER_CHANGED));
assertThat(event1.subject(), is(lead1));
assertThat(event2.time(), is(time));
diff --git a/core/api/src/test/java/org/onosproject/cluster/LeadershipServiceAdapter.java b/core/api/src/test/java/org/onosproject/cluster/LeadershipServiceAdapter.java
index e1d421d..72595e4 100644
--- a/core/api/src/test/java/org/onosproject/cluster/LeadershipServiceAdapter.java
+++ b/core/api/src/test/java/org/onosproject/cluster/LeadershipServiceAdapter.java
@@ -18,7 +18,6 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.CompletableFuture;
/**
* Test adapter for leadership service.
@@ -41,13 +40,12 @@
}
@Override
- public CompletableFuture<Leadership> runForLeadership(String path) {
+ public Leadership runForLeadership(String path) {
return null;
}
@Override
- public CompletableFuture<Void> withdraw(String path) {
- return null;
+ public void withdraw(String path) {
}
@Override
@@ -74,14 +72,4 @@
public List<NodeId> getCandidates(String path) {
return null;
}
-
- @Override
- public boolean stepdown(String path) {
- return false;
- }
-
- @Override
- public boolean makeTopCandidate(String path, NodeId nodeId) {
- return false;
- }
}
\ No newline at end of file
diff --git a/core/api/src/test/java/org/onosproject/cluster/LeadershipTest.java b/core/api/src/test/java/org/onosproject/cluster/LeadershipTest.java
index e2a8658..defc671 100644
--- a/core/api/src/test/java/org/onosproject/cluster/LeadershipTest.java
+++ b/core/api/src/test/java/org/onosproject/cluster/LeadershipTest.java
@@ -15,12 +15,13 @@
*/
package org.onosproject.cluster;
+import java.util.Arrays;
+
import org.junit.Test;
import com.google.common.collect.ImmutableList;
import com.google.common.testing.EqualsTester;
-import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
@@ -31,16 +32,14 @@
public class LeadershipTest {
private final NodeId node1 = new NodeId("1");
private final NodeId node2 = new NodeId("2");
- private final Leadership lead1 = new Leadership("topic1", node1, 1L, 2L);
- private final Leadership sameAsLead1 = new Leadership("topic1", node1, 1L, 2L);
- private final Leadership lead2 = new Leadership("topic2", node1, 1L, 2L);
- private final Leadership lead3 = new Leadership("topic1", node1, 2L, 2L);
- private final Leadership lead4 = new Leadership("topic1", node1, 3L, 2L);
- private final Leadership lead5 = new Leadership("topic1", node1, 3L, 3L);
- private final Leadership lead6 = new Leadership("topic1", node1,
- ImmutableList.of(node2), 1L, 2L);
- private final Leadership lead7 = new Leadership("topic1",
- ImmutableList.of(node2), 1L, 2L);
+ private final Leadership lead1 = new Leadership("topic1", new Leader(node1, 1L, 2L), Arrays.asList(node1));
+ private final Leadership sameAsLead1 = new Leadership("topic1", new Leader(node1, 1L, 2L), Arrays.asList(node1));
+ private final Leadership lead2 = new Leadership("topic2", new Leader(node1, 1L, 2L), Arrays.asList(node1));
+ private final Leadership lead3 = new Leadership("topic1", new Leader(node1, 2L, 2L), Arrays.asList(node1));
+ private final Leadership lead4 = new Leadership("topic1", new Leader(node1, 3L, 2L), Arrays.asList(node1));
+ private final Leadership lead5 = new Leadership("topic1", new Leader(node1, 3L, 3L), Arrays.asList(node1));
+ private final Leadership lead6 = new Leadership("topic1", new Leader(node2, 1L, 2L), Arrays.asList(node2, node1));
+ private final Leadership lead7 = new Leadership("topic1", null, ImmutableList.of());
/**
* Tests for proper operation of equals(), hashCode() and toString() methods.
@@ -64,12 +63,10 @@
*/
@Test
public void checkConstruction() {
- assertThat(lead6.electedTime(), is(2L));
- assertThat(lead6.epoch(), is(1L));
- assertThat(lead6.leader(), is(node1));
+ assertThat(lead6.leader(), is(new Leader(node2, 1L, 2L)));
assertThat(lead6.topic(), is("topic1"));
- assertThat(lead6.candidates(), hasSize(1));
- assertThat(lead6.candidates(), contains(node2));
+ assertThat(lead6.candidates(), hasSize(2));
+ assertThat(lead6.candidates().get(1), is(node1));
+ assertThat(lead6.candidates().get(0), is(node2));
}
-
}
diff --git a/core/common/src/test/java/org/onosproject/store/trivial/SimpleLeadershipManager.java b/core/common/src/test/java/org/onosproject/store/trivial/SimpleLeadershipManager.java
index 194ffec..b540a1b 100644
--- a/core/common/src/test/java/org/onosproject/store/trivial/SimpleLeadershipManager.java
+++ b/core/common/src/test/java/org/onosproject/store/trivial/SimpleLeadershipManager.java
@@ -17,20 +17,22 @@
import static com.google.common.base.Preconditions.checkArgument;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.stream.Collectors;
+import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.Leader;
import org.onosproject.cluster.Leadership;
import org.onosproject.cluster.LeadershipEvent;
import org.onosproject.cluster.LeadershipEvent.Type;
@@ -53,8 +55,15 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private ClusterService clusterService;
+ private NodeId localNodeId;
+
private Map<String, Boolean> elections = new ConcurrentHashMap<>();
+ @Activate
+ public void activate() {
+ localNodeId = clusterService.getLocalNode().id();
+ }
+
@Override
public NodeId getLeader(String path) {
return elections.get(path) ? clusterService.getLocalNode().id() : null;
@@ -63,7 +72,8 @@
@Override
public Leadership getLeadership(String path) {
checkArgument(path != null);
- return elections.get(path) ? new Leadership(path, clusterService.getLocalNode().id(), 0, 0) : null;
+ return elections.get(path) ?
+ new Leadership(path, new Leader(localNodeId, 0, 0), Arrays.asList(localNodeId)) : null;
}
@Override
@@ -77,23 +87,22 @@
}
@Override
- public CompletableFuture<Leadership> runForLeadership(String path) {
+ public Leadership runForLeadership(String path) {
elections.put(path, true);
+ Leadership leadership = new Leadership(path, new Leader(localNodeId, 0, 0), Arrays.asList(localNodeId));
for (LeadershipEventListener listener : listeners) {
- listener.event(new LeadershipEvent(Type.LEADER_ELECTED,
- new Leadership(path, clusterService.getLocalNode().id(), 0, 0)));
+ listener.event(new LeadershipEvent(Type.LEADER_AND_CANDIDATES_CHANGED, leadership));
}
- return CompletableFuture.completedFuture(new Leadership(path, clusterService.getLocalNode().id(), 0, 0));
+ return leadership;
}
@Override
- public CompletableFuture<Void> withdraw(String path) {
+ public void withdraw(String path) {
elections.remove(path);
for (LeadershipEventListener listener : listeners) {
- listener.event(new LeadershipEvent(Type.LEADER_BOOTED,
- new Leadership(path, clusterService.getLocalNode().id(), 0, 0)));
+ listener.event(new LeadershipEvent(Type.LEADER_AND_CANDIDATES_CHANGED,
+ new Leadership(path, null, Arrays.asList())));
}
- return CompletableFuture.completedFuture(null);
}
@Override
@@ -122,14 +131,4 @@
public List<NodeId> getCandidates(String path) {
return null;
}
-
- @Override
- public boolean stepdown(String path) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public boolean makeTopCandidate(String path, NodeId nodeId) {
- throw new UnsupportedOperationException();
- }
}
diff --git a/core/net/src/main/java/org/onosproject/cluster/impl/LeadershipManager.java b/core/net/src/main/java/org/onosproject/cluster/impl/LeadershipManager.java
new file mode 100644
index 0000000..e9a78d6
--- /dev/null
+++ b/core/net/src/main/java/org/onosproject/cluster/impl/LeadershipManager.java
@@ -0,0 +1,129 @@
+/*
+ * Copyright 2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.cluster.impl;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.util.Tools;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.ControllerNode;
+import org.onosproject.cluster.Leadership;
+import org.onosproject.cluster.LeadershipAdminService;
+import org.onosproject.cluster.LeadershipEvent;
+import org.onosproject.cluster.LeadershipEventListener;
+import org.onosproject.cluster.LeadershipService;
+import org.onosproject.cluster.LeadershipStore;
+import org.onosproject.cluster.LeadershipStoreDelegate;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.event.AbstractListenerManager;
+import org.slf4j.Logger;
+
+import com.google.common.collect.Maps;
+
+/**
+ * Implementation of {@link LeadershipService} and {@link LeadershipAdminService}.
+ */
+@Component(immediate = true)
+@Service
+public class LeadershipManager
+ extends AbstractListenerManager<LeadershipEvent, LeadershipEventListener>
+ implements LeadershipService, LeadershipAdminService {
+
+ private final Logger log = getLogger(getClass());
+
+ private LeadershipStoreDelegate delegate = this::post;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ClusterService clusterService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected LeadershipStore store;
+
+ private NodeId localNodeId;
+
+ private final ScheduledExecutorService deadlockDetector =
+ Executors.newSingleThreadScheduledExecutor(Tools.groupedThreads("onos/leadership", ""));
+
+ @Activate
+ public void activate() {
+ localNodeId = clusterService.getLocalNode().id();
+ store.setDelegate(delegate);
+ eventDispatcher.addSink(LeadershipEvent.class, listenerRegistry);
+ deadlockDetector.scheduleWithFixedDelay(() -> clusterService.getNodes()
+ .stream()
+ .map(ControllerNode::id)
+ .filter(id -> clusterService.getState(id) != ControllerNode.State.ACTIVE)
+ .forEach(this::unregister), 0, 2, TimeUnit.SECONDS);
+ log.info("Started");
+ }
+
+ @Deactivate
+ public void deactivate() {
+ deadlockDetector.shutdown();
+ Maps.filterValues(store.getLeaderships(), v -> v.candidates().contains(localNodeId))
+ .keySet()
+ .forEach(this::withdraw);
+ store.unsetDelegate(delegate);
+ eventDispatcher.removeSink(LeadershipEvent.class);
+ log.info("Stopped");
+ }
+
+ @Override
+ public Leadership getLeadership(String topic) {
+ return store.getLeadership(topic);
+ }
+
+ @Override
+ public Leadership runForLeadership(String topic) {
+ return store.addRegistration(topic);
+ }
+
+ @Override
+ public void withdraw(String topic) {
+ store.removeRegistration(topic);
+ }
+
+ @Override
+ public Map<String, Leadership> getLeaderBoard() {
+ return store.getLeaderships();
+ }
+
+ @Override
+ public boolean transferLeadership(String topic, NodeId to) {
+ return store.moveLeadership(topic, to);
+ }
+
+ @Override
+ public void unregister(NodeId nodeId) {
+ store.removeRegistration(nodeId);
+ }
+
+ @Override
+ public boolean promoteToTopOfCandidateList(String topic, NodeId nodeId) {
+ return store.makeTopCandidate(topic, nodeId);
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedLeadershipStore.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedLeadershipStore.java
new file mode 100644
index 0000000..c6647b7
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedLeadershipStore.java
@@ -0,0 +1,279 @@
+/*
+ * Copyright 2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.cluster.impl;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.Leader;
+import org.onosproject.cluster.Leadership;
+import org.onosproject.cluster.LeadershipEvent;
+import org.onosproject.cluster.LeadershipStore;
+import org.onosproject.cluster.LeadershipStoreDelegate;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.store.AbstractStore;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.Versioned;
+import org.slf4j.Logger;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Objects;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+/**
+ * Implementation of {@code LeadershipStore} backed by {@link ConsistentMap}.
+ */
+@Service
+@Component(immediate = true, enabled = true)
+public class DistributedLeadershipStore
+ extends AbstractStore<LeadershipEvent, LeadershipStoreDelegate>
+ implements LeadershipStore {
+
+ private static final Logger log = getLogger(DistributedLeadershipStore.class);
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ClusterService clusterService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected StorageService storageService;
+
+ protected NodeId localNodeId;
+ protected ConsistentMap<String, InternalLeadership> leadershipMap;
+ private final MapEventListener<String, InternalLeadership> leadershipChangeListener =
+ event -> {
+ Leadership oldValue = InternalLeadership.toLeadership(Versioned.valueOrNull(event.oldValue()));
+ Leadership newValue = InternalLeadership.toLeadership(Versioned.valueOrNull(event.newValue()));
+ boolean leaderChanged =
+ !Objects.equal(oldValue == null ? null : oldValue.leader(), newValue.leader());
+ boolean candidatesChanged =
+ !Sets.symmetricDifference(Sets.newHashSet(oldValue == null ?
+ ImmutableSet.<NodeId>of() : oldValue.candidates()),
+ Sets.newHashSet(newValue.candidates())).isEmpty();
+ LeadershipEvent.Type eventType = null;
+ if (leaderChanged && candidatesChanged) {
+ eventType = LeadershipEvent.Type.LEADER_AND_CANDIDATES_CHANGED;
+ }
+ if (leaderChanged && !candidatesChanged) {
+ eventType = LeadershipEvent.Type.LEADER_CHANGED;
+ }
+ if (!leaderChanged && candidatesChanged) {
+ eventType = LeadershipEvent.Type.CANDIDATES_CHANGED;
+ }
+ notifyDelegate(new LeadershipEvent(eventType, newValue));
+ };
+
+ @Activate
+ public void activate() {
+ localNodeId = clusterService.getLocalNode().id();
+ leadershipMap = storageService.<String, InternalLeadership>consistentMapBuilder()
+ .withName("onos-leadership")
+ .withPartitionsDisabled()
+ .withRelaxedReadConsistency()
+ .withSerializer(Serializer.using(KryoNamespaces.API, InternalLeadership.class))
+ .build();
+ leadershipMap.addListener(leadershipChangeListener);
+ log.info("Started");
+ }
+
+ @Deactivate
+ public void deactivate() {
+ leadershipMap.removeListener(leadershipChangeListener);
+ log.info("Stopped");
+ }
+
+ @Override
+ public Leadership addRegistration(String topic) {
+ Versioned<InternalLeadership> internalLeadership = leadershipMap.computeIf(topic,
+ v -> v == null || !v.candidates().contains(localNodeId),
+ (k, v) -> {
+ if (v == null || v.candidates().isEmpty()) {
+ return new InternalLeadership(topic,
+ localNodeId,
+ v == null ? 1 : v.term() + 1,
+ System.currentTimeMillis(),
+ ImmutableList.of(localNodeId));
+ }
+ List<NodeId> newCandidates = new ArrayList<>(v.candidates());
+ newCandidates.add(localNodeId);
+ return new InternalLeadership(topic, v.leader(), v.term(), v.termStartTime(), newCandidates);
+ });
+ return InternalLeadership.toLeadership(Versioned.valueOrNull(internalLeadership));
+ }
+
+ @Override
+ public void removeRegistration(String topic) {
+ removeRegistration(topic, localNodeId);
+ }
+
+ private void removeRegistration(String topic, NodeId nodeId) {
+ leadershipMap.computeIf(topic,
+ v -> v != null && v.candidates().contains(nodeId),
+ (k, v) -> {
+ List<NodeId> newCandidates = v.candidates()
+ .stream()
+ .filter(id -> !nodeId.equals(id))
+ .collect(Collectors.toList());
+ NodeId newLeader = nodeId.equals(v.leader()) ?
+ newCandidates.size() > 0 ? newCandidates.get(0) : null : v.leader();
+ long newTerm = newLeader == null || Objects.equal(newLeader, v.leader()) ?
+ v.term() : v.term() + 1;
+ long newTermStartTime = newLeader == null || Objects.equal(newLeader, v.leader()) ?
+ v.termStartTime() : System.currentTimeMillis();
+ return new InternalLeadership(topic, newLeader, newTerm, newTermStartTime, newCandidates);
+ });
+ }
+
+ @Override
+ public void removeRegistration(NodeId nodeId) {
+ leadershipMap.entrySet()
+ .stream()
+ .filter(e -> e.getValue().value().candidates().contains(nodeId))
+ .map(e -> e.getKey())
+ .forEach(topic -> this.removeRegistration(topic, nodeId));
+ }
+
+ @Override
+ public boolean moveLeadership(String topic, NodeId toNodeId) {
+ Versioned<InternalLeadership> internalLeadership = leadershipMap.computeIf(topic,
+ v -> v != null &&
+ v.candidates().contains(toNodeId) &&
+ !Objects.equal(v.leader(), toNodeId),
+ (k, v) -> {
+ List<NodeId> newCandidates = new ArrayList<>();
+ newCandidates.add(toNodeId);
+ newCandidates.addAll(v.candidates()
+ .stream()
+ .filter(id -> !toNodeId.equals(id))
+ .collect(Collectors.toList()));
+ return new InternalLeadership(topic,
+ toNodeId,
+ v.term() + 1,
+ System.currentTimeMillis(),
+ newCandidates);
+ });
+ return Objects.equal(toNodeId, Versioned.valueOrNull(internalLeadership).leader());
+ }
+
+ @Override
+ public boolean makeTopCandidate(String topic, NodeId nodeId) {
+ Versioned<InternalLeadership> internalLeadership = leadershipMap.computeIf(topic,
+ v -> v != null &&
+ v.candidates().contains(nodeId) &&
+ !v.candidates().get(0).equals(nodeId),
+ (k, v) -> {
+ List<NodeId> newCandidates = new ArrayList<>();
+ newCandidates.add(nodeId);
+ newCandidates.addAll(v.candidates()
+ .stream()
+ .filter(id -> !nodeId.equals(id))
+ .collect(Collectors.toList()));
+ return new InternalLeadership(topic,
+ v.leader(),
+ v.term(),
+ System.currentTimeMillis(),
+ newCandidates);
+ });
+ return internalLeadership != null && nodeId.equals(internalLeadership.value().candidates().get(0));
+ }
+
+ @Override
+ public Leadership getLeadership(String topic) {
+ return InternalLeadership.toLeadership(Versioned.valueOrNull(leadershipMap.get(topic)));
+ }
+
+ @Override
+ public Map<String, Leadership> getLeaderships() {
+ Map<String, Leadership> leaderships = Maps.newHashMap();
+ leadershipMap.entrySet().forEach(e -> {
+ leaderships.put(e.getKey(), e.getValue().value().asLeadership());
+ });
+ return ImmutableMap.copyOf(leaderships);
+ }
+
+ private static class InternalLeadership {
+ private final String topic;
+ private final NodeId leader;
+ private final long term;
+ private final long termStartTime;
+ private final List<NodeId> candidates;
+
+ public InternalLeadership(String topic,
+ NodeId leader,
+ long term,
+ long termStartTime,
+ List<NodeId> candidates) {
+ this.topic = topic;
+ this.leader = leader;
+ this.term = term;
+ this.termStartTime = termStartTime;
+ this.candidates = ImmutableList.copyOf(candidates);
+ }
+
+ public NodeId leader() {
+ return this.leader;
+ }
+
+ public long term() {
+ return term;
+ }
+
+ public long termStartTime() {
+ return termStartTime;
+ }
+
+ public List<NodeId> candidates() {
+ return candidates;
+ }
+
+ public Leadership asLeadership() {
+ return new Leadership(topic, leader == null ?
+ null : new Leader(leader, term, termStartTime), candidates);
+ }
+
+ public static Leadership toLeadership(InternalLeadership internalLeadership) {
+ return internalLeadership == null ? null : internalLeadership.asLeadership();
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(getClass())
+ .add("leader", leader)
+ .add("term", term)
+ .add("termStartTime", termStartTime)
+ .add("candidates", candidates)
+ .toString();
+ }
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/IntentPartitionManager.java b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/IntentPartitionManager.java
index bfb4754..d2c63f3 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/intent/impl/IntentPartitionManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/intent/impl/IntentPartitionManager.java
@@ -21,8 +21,6 @@
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
-import org.onosproject.cluster.ClusterEvent;
-import org.onosproject.cluster.ClusterEventListener;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.Leadership;
@@ -76,7 +74,6 @@
private ListenerRegistry<IntentPartitionEvent, IntentPartitionEventListener> listenerRegistry;
private LeadershipEventListener leaderListener = new InternalLeadershipListener();
- private ClusterEventListener clusterListener = new InternalClusterEventListener();
private ScheduledExecutorService executor = Executors
.newScheduledThreadPool(1);
@@ -84,7 +81,6 @@
@Activate
public void activate() {
leadershipService.addListener(leaderListener);
- clusterService.addListener(clusterListener);
listenerRegistry = new ListenerRegistry<>();
eventDispatcher.addSink(IntentPartitionEvent.class, listenerRegistry);
@@ -103,7 +99,6 @@
eventDispatcher.removeSink(IntentPartitionEvent.class);
leadershipService.removeListener(leaderListener);
- clusterService.removeListener(clusterListener);
}
/**
@@ -180,7 +175,7 @@
List<Leadership> myPartitions = leadershipService.getLeaderBoard().values()
.stream()
- .filter(l -> clusterService.getLocalNode().id().equals(l.leader()))
+ .filter(l -> clusterService.getLocalNode().id().equals(l.leaderNodeId()))
.filter(l -> l.topic().startsWith(ELECTION_PREFIX))
.collect(Collectors.toList());
@@ -220,24 +215,16 @@
public void event(LeadershipEvent event) {
Leadership leadership = event.subject();
- if (Objects.equals(leadership.leader(), clusterService.getLocalNode().id()) &&
+ if (Objects.equals(leadership.leaderNodeId(), clusterService.getLocalNode().id()) &&
leadership.topic().startsWith(ELECTION_PREFIX)) {
- // See if we need to let some partitions go
- scheduleRebalance(0);
-
eventDispatcher.post(new IntentPartitionEvent(IntentPartitionEvent.Type.LEADER_CHANGED,
leadership.topic()));
}
- }
- }
- private final class InternalClusterEventListener implements
- ClusterEventListener {
-
- @Override
- public void event(ClusterEvent event) {
- scheduleRebalance(0);
+ if (event.type() == LeadershipEvent.Type.CANDIDATES_CHANGED) {
+ scheduleRebalance(0);
+ }
}
}
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
index 44fbea0..a2a0081 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
@@ -16,7 +16,6 @@
package org.onosproject.store.mastership.impl;
import static org.onlab.util.Tools.groupedThreads;
-import static org.onlab.util.Tools.futureGetOrElse;
import static org.onosproject.mastership.MastershipEvent.Type.BACKUPS_CHANGED;
import static org.onosproject.mastership.MastershipEvent.Type.MASTER_CHANGED;
import static org.slf4j.LoggerFactory.getLogger;
@@ -43,6 +42,7 @@
import org.onlab.util.KryoNamespace;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.Leadership;
+import org.onosproject.cluster.LeadershipAdminService;
import org.onosproject.cluster.LeadershipEvent;
import org.onosproject.cluster.LeadershipEventListener;
import org.onosproject.cluster.LeadershipService;
@@ -63,9 +63,9 @@
import org.slf4j.Logger;
import com.google.common.base.Objects;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
/**
* Implementation of the MastershipStore on top of Leadership Service.
@@ -82,18 +82,18 @@
protected LeadershipService leadershipService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected LeadershipAdminService leadershipAdminService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterCommunicationService clusterCommunicator;
private NodeId localNodeId;
- private final Set<DeviceId> connectedDevices = Sets.newHashSet();
private static final MessageSubject ROLE_RELINQUISH_SUBJECT =
new MessageSubject("mastership-store-device-role-relinquish");
- private static final MessageSubject TRANSITION_FROM_MASTER_TO_STANDBY_SUBJECT =
- new MessageSubject("mastership-store-device-mastership-relinquish");
private static final Pattern DEVICE_MASTERSHIP_TOPIC_PATTERN =
Pattern.compile("device:(.*)");
@@ -132,11 +132,6 @@
this::relinquishLocalRole,
SERIALIZER::encode,
messageHandlingExecutor);
- clusterCommunicator.addSubscriber(TRANSITION_FROM_MASTER_TO_STANDBY_SUBJECT,
- SERIALIZER::decode,
- this::transitionFromMasterToStandby,
- SERIALIZER::encode,
- messageHandlingExecutor);
localNodeId = clusterService.getLocalNode().id();
leadershipService.addListener(leadershipEventListener);
@@ -146,7 +141,6 @@
@Deactivate
public void deactivate() {
clusterCommunicator.removeSubscriber(ROLE_RELINQUISH_SUBJECT);
- clusterCommunicator.removeSubscriber(TRANSITION_FROM_MASTER_TO_STANDBY_SUBJECT);
messageHandlingExecutor.shutdown();
transferExecutor.shutdown();
leadershipService.removeListener(leadershipEventListener);
@@ -159,12 +153,9 @@
checkArgument(deviceId != null, DEVICE_ID_NULL);
String leadershipTopic = createDeviceMastershipTopic(deviceId);
- connectedDevices.add(deviceId);
- return leadershipService.runForLeadership(leadershipTopic)
- .thenApply(leadership -> {
- return Objects.equal(localNodeId, leadership.leader())
- ? MastershipRole.MASTER : MastershipRole.STANDBY;
- });
+ Leadership leadership = leadershipService.runForLeadership(leadershipTopic);
+ return CompletableFuture.completedFuture(localNodeId.equals(leadership.leaderNodeId())
+ ? MastershipRole.MASTER : MastershipRole.STANDBY);
}
@Override
@@ -173,20 +164,19 @@
checkArgument(deviceId != null, DEVICE_ID_NULL);
String leadershipTopic = createDeviceMastershipTopic(deviceId);
- NodeId leader = leadershipService.getLeader(leadershipTopic);
- if (Objects.equal(nodeId, leader)) {
- return MastershipRole.MASTER;
- }
- return leadershipService.getCandidates(leadershipTopic).contains(nodeId) ?
- MastershipRole.STANDBY : MastershipRole.NONE;
+ Leadership leadership = leadershipService.getLeadership(leadershipTopic);
+ NodeId leader = leadership == null ? null : leadership.leaderNodeId();
+ List<NodeId> candidates = leadership == null ?
+ ImmutableList.of() : ImmutableList.copyOf(leadership.candidates());
+ return Objects.equal(nodeId, leader) ?
+ MastershipRole.MASTER : candidates.contains(nodeId) ? MastershipRole.STANDBY : MastershipRole.NONE;
}
@Override
public NodeId getMaster(DeviceId deviceId) {
checkArgument(deviceId != null, DEVICE_ID_NULL);
- String leadershipTopic = createDeviceMastershipTopic(deviceId);
- return leadershipService.getLeader(leadershipTopic);
+ return leadershipService.getLeader(createDeviceMastershipTopic(deviceId));
}
@Override
@@ -194,9 +184,8 @@
checkArgument(deviceId != null, DEVICE_ID_NULL);
Map<NodeId, MastershipRole> roles = Maps.newHashMap();
- clusterService
- .getNodes()
- .forEach((node) -> roles.put(node.id(), getRole(node.id(), deviceId)));
+ clusterService.getNodes()
+ .forEach((node) -> roles.put(node.id(), getRole(node.id(), deviceId)));
NodeId master = null;
final List<NodeId> standbys = Lists.newLinkedList();
@@ -233,30 +222,10 @@
checkArgument(nodeId != null, NODE_ID_NULL);
checkArgument(deviceId != null, DEVICE_ID_NULL);
- NodeId currentMaster = getMaster(deviceId);
- if (nodeId.equals(currentMaster)) {
- return CompletableFuture.completedFuture(null);
- } else {
- String leadershipTopic = createDeviceMastershipTopic(deviceId);
- List<NodeId> candidates = leadershipService.getCandidates(leadershipTopic);
- if (candidates.isEmpty()) {
- return CompletableFuture.completedFuture(null);
- }
- if (leadershipService.makeTopCandidate(leadershipTopic, nodeId)) {
- CompletableFuture<MastershipEvent> result = new CompletableFuture<>();
- // There is brief wait before we step down from mastership.
- // This is to ensure any work that happens when standby preference
- // order changes can complete. For example: flow entries need to be backed
- // to the new top standby (ONOS-1883)
- // FIXME: This potentially introduces a race-condition.
- // Right now role changes are only forced via CLI.
- transferExecutor.schedule(() -> {
- result.complete(transitionFromMasterToStandby(deviceId));
- }, WAIT_BEFORE_MASTERSHIP_HANDOFF_MILLIS, TimeUnit.MILLISECONDS);
- return result;
- } else {
- log.warn("Failed to promote {} to mastership for {}", nodeId, deviceId);
- }
+ String leadershipTopic = createDeviceMastershipTopic(deviceId);
+ if (leadershipAdminService.promoteToTopOfCandidateList(leadershipTopic, nodeId)) {
+ transferExecutor.schedule(() -> leadershipAdminService.transferLeadership(leadershipTopic, nodeId),
+ WAIT_BEFORE_MASTERSHIP_HANDOFF_MILLIS, TimeUnit.MILLISECONDS);
}
return CompletableFuture.completedFuture(null);
}
@@ -267,7 +236,7 @@
String leadershipTopic = createDeviceMastershipTopic(deviceId);
Leadership leadership = leadershipService.getLeadership(leadershipTopic);
- return leadership != null ? MastershipTerm.of(leadership.leader(), leadership.epoch()) : null;
+ return leadership != null ? MastershipTerm.of(leadership.leaderNodeId(), leadership.leader().term()) : null;
}
@Override
@@ -318,71 +287,44 @@
private CompletableFuture<MastershipEvent> relinquishLocalRole(DeviceId deviceId) {
checkArgument(deviceId != null, DEVICE_ID_NULL);
- // Check if this node is can be managed by this node.
- if (!connectedDevices.contains(deviceId)) {
+ String leadershipTopic = createDeviceMastershipTopic(deviceId);
+ if (!leadershipService.getCandidates(leadershipTopic).contains(localNodeId)) {
return CompletableFuture.completedFuture(null);
}
-
- String leadershipTopic = createDeviceMastershipTopic(deviceId);
- NodeId currentLeader = leadershipService.getLeader(leadershipTopic);
-
- MastershipEvent.Type eventType = Objects.equal(currentLeader, localNodeId)
- ? MastershipEvent.Type.MASTER_CHANGED
- : MastershipEvent.Type.BACKUPS_CHANGED;
-
- connectedDevices.remove(deviceId);
- return leadershipService.withdraw(leadershipTopic)
- .thenApply(v -> new MastershipEvent(eventType, deviceId, getNodes(deviceId)));
- }
-
- private MastershipEvent transitionFromMasterToStandby(DeviceId deviceId) {
- checkArgument(deviceId != null, DEVICE_ID_NULL);
-
- NodeId currentMaster = getMaster(deviceId);
- if (currentMaster == null) {
- return null;
- }
-
- if (!currentMaster.equals(localNodeId)) {
- log.info("Forwarding request to relinquish "
- + "mastership for device {} to {}", deviceId, currentMaster);
- return futureGetOrElse(clusterCommunicator.sendAndReceive(
- deviceId,
- TRANSITION_FROM_MASTER_TO_STANDBY_SUBJECT,
- SERIALIZER::encode,
- SERIALIZER::decode,
- currentMaster), null);
- }
-
- return leadershipService.stepdown(createDeviceMastershipTopic(deviceId))
- ? new MastershipEvent(MastershipEvent.Type.MASTER_CHANGED, deviceId, getNodes(deviceId)) : null;
+ MastershipEvent.Type eventType = localNodeId.equals(leadershipService.getLeader(leadershipTopic)) ?
+ MastershipEvent.Type.MASTER_CHANGED : MastershipEvent.Type.BACKUPS_CHANGED;
+ leadershipService.withdraw(leadershipTopic);
+ return CompletableFuture.completedFuture(new MastershipEvent(eventType, deviceId, getNodes(deviceId)));
}
@Override
public void relinquishAllRole(NodeId nodeId) {
- // Noop. LeadershipService already takes care of detecting and purging deadlocks.
+ // Noop. LeadershipService already takes care of detecting and purging stale locks.
}
private class InternalDeviceMastershipEventListener implements LeadershipEventListener {
+
+ @Override
+ public boolean isRelevant(LeadershipEvent event) {
+ Leadership leadership = event.subject();
+ return isDeviceMastershipTopic(leadership.topic());
+ }
+
@Override
public void event(LeadershipEvent event) {
Leadership leadership = event.subject();
- if (!isDeviceMastershipTopic(leadership.topic())) {
- return;
- }
DeviceId deviceId = extractDeviceIdFromTopic(leadership.topic());
+ RoleInfo roleInfo = getNodes(deviceId);
switch (event.type()) {
- case LEADER_ELECTED:
- notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, getNodes(deviceId)));
+ case LEADER_AND_CANDIDATES_CHANGED:
+ notifyDelegate(new MastershipEvent(BACKUPS_CHANGED, deviceId, roleInfo));
+ notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, roleInfo));
break;
- case LEADER_REELECTED:
- // There is no concept of leader re-election in the new distributed leadership manager.
- throw new IllegalStateException("Unexpected event type");
- case LEADER_BOOTED:
- notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, getNodes(deviceId)));
+ case LEADER_CHANGED:
+ notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, roleInfo));
break;
case CANDIDATES_CHANGED:
- notifyDelegate(new MastershipEvent(BACKUPS_CHANGED, deviceId, getNodes(deviceId)));
+ notifyDelegate(new MastershipEvent(BACKUPS_CHANGED, deviceId, roleInfo));
break;
default:
return;
@@ -407,5 +349,4 @@
Matcher m = DEVICE_MASTERSHIP_TOPIC_PATTERN.matcher(topic);
return m.matches();
}
-
-}
+}
\ No newline at end of file
diff --git a/core/store/dist/src/test/java/org/onosproject/store/intent/impl/IntentPartitionManagerTest.java b/core/store/dist/src/test/java/org/onosproject/store/intent/impl/IntentPartitionManagerTest.java
index 5ba0c7c..0c92280 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/intent/impl/IntentPartitionManagerTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/intent/impl/IntentPartitionManagerTest.java
@@ -22,6 +22,7 @@
import org.onosproject.cluster.ClusterServiceAdapter;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.DefaultControllerNode;
+import org.onosproject.cluster.Leader;
import org.onosproject.cluster.Leadership;
import org.onosproject.cluster.LeadershipEvent;
import org.onosproject.cluster.LeadershipEventListener;
@@ -31,13 +32,12 @@
import org.onosproject.common.event.impl.TestEventDispatcher;
import org.onosproject.net.intent.Key;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-
import static junit.framework.TestCase.assertFalse;
import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.anyString;
@@ -55,9 +55,10 @@
public class IntentPartitionManagerTest {
private final LeadershipEvent event
- = new LeadershipEvent(LeadershipEvent.Type.LEADER_ELECTED,
+ = new LeadershipEvent(LeadershipEvent.Type.CANDIDATES_CHANGED,
new Leadership(ELECTION_PREFIX + "0",
- MY_NODE_ID, 0, 0));
+ new Leader(MY_NODE_ID, 0, 0),
+ Arrays.asList(MY_NODE_ID, OTHER_NODE_ID)));
private static final NodeId MY_NODE_ID = new NodeId("local");
private static final NodeId OTHER_NODE_ID = new NodeId("other");
@@ -78,7 +79,7 @@
expectLastCall().andDelegateTo(new TestLeadershipService());
for (int i = 0; i < IntentPartitionManager.NUM_PARTITIONS; i++) {
expect(leadershipService.runForLeadership(ELECTION_PREFIX + i))
- .andReturn(CompletableFuture.completedFuture(null))
+ .andReturn(null)
.times(1);
}
@@ -105,7 +106,9 @@
expect(leadershipService.getLeader(ELECTION_PREFIX + i))
.andReturn(MY_NODE_ID).anyTimes();
leaderBoard.put(ELECTION_PREFIX + i,
- new Leadership(ELECTION_PREFIX + i, MY_NODE_ID, 0, 0));
+ new Leadership(ELECTION_PREFIX + i,
+ new Leader(MY_NODE_ID, 0, 0),
+ Arrays.asList(MY_NODE_ID)));
}
for (int i = numMine; i < IntentPartitionManager.NUM_PARTITIONS; i++) {
@@ -113,7 +116,9 @@
.andReturn(OTHER_NODE_ID).anyTimes();
leaderBoard.put(ELECTION_PREFIX + i,
- new Leadership(ELECTION_PREFIX + i, OTHER_NODE_ID, 0, 0));
+ new Leadership(ELECTION_PREFIX + i,
+ new Leader(OTHER_NODE_ID, 0, 0),
+ Arrays.asList(OTHER_NODE_ID)));
}
expect(leadershipService.getLeaderBoard()).andReturn(leaderBoard).anyTimes();
@@ -131,7 +136,7 @@
for (int i = 0; i < IntentPartitionManager.NUM_PARTITIONS; i++) {
expect(leadershipService.runForLeadership(ELECTION_PREFIX + i))
- .andReturn(CompletableFuture.completedFuture(null))
+ .andReturn(null)
.times(1);
}
@@ -200,9 +205,8 @@
// We have all the partitions so we'll need to relinquish some
setUpLeadershipService(IntentPartitionManager.NUM_PARTITIONS);
- expect(leadershipService.withdraw(anyString()))
- .andReturn(CompletableFuture.completedFuture(null))
- .times(7);
+ leadershipService.withdraw(anyString());
+ expectLastCall().times(7);
replay(leadershipService);
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DistributedLeadershipManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DistributedLeadershipManager.java
deleted file mode 100644
index db55a85..0000000
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DistributedLeadershipManager.java
+++ /dev/null
@@ -1,614 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.primitives.impl;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.MapDifference;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
-import org.apache.commons.lang.math.RandomUtils;
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.apache.felix.scr.annotations.Service;
-import org.onosproject.cluster.ClusterEvent;
-import org.onosproject.cluster.ClusterEvent.Type;
-import org.onosproject.cluster.ClusterEventListener;
-import org.onosproject.cluster.ClusterService;
-import org.onosproject.cluster.Leadership;
-import org.onosproject.cluster.LeadershipEvent;
-import org.onosproject.cluster.LeadershipEventListener;
-import org.onosproject.cluster.LeadershipService;
-import org.onosproject.cluster.NodeId;
-import org.onosproject.event.ListenerRegistry;
-import org.onosproject.event.EventDeliveryService;
-import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
-import org.onosproject.store.serializers.KryoNamespaces;
-import org.onosproject.store.service.ConsistentMap;
-import org.onosproject.store.service.ConsistentMapException;
-import org.onosproject.store.service.MapEvent;
-import org.onosproject.store.service.Serializer;
-import org.onosproject.store.service.StorageService;
-import org.onosproject.store.service.Versioned;
-import org.slf4j.Logger;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Objects;
-import java.util.Set;
-import java.util.List;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.stream.Collectors;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static org.onlab.util.Tools.groupedThreads;
-import static org.slf4j.LoggerFactory.getLogger;
-import static org.onosproject.cluster.ControllerNode.State.ACTIVE;
-import static org.onosproject.cluster.ControllerNode.State.INACTIVE;
-
-/**
- * Distributed Lock Manager implemented on top of ConsistentMap.
- * <p>
- * This implementation makes use of ClusterService's failure
- * detection capabilities to detect and purge stale locks.
- * TODO: Ensure lock safety and liveness.
- */
-@Component(immediate = true, enabled = true)
-@Service
-public class DistributedLeadershipManager implements LeadershipService {
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected StorageService storageService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected ClusterService clusterService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected ClusterCommunicationService clusterCommunicator;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected EventDeliveryService eventDispatcher;
-
- private final Logger log = getLogger(getClass());
-
- private ScheduledExecutorService electionRunner;
- private ScheduledExecutorService lockExecutor;
- private ScheduledExecutorService staleLeadershipPurgeExecutor;
- private ScheduledExecutorService leadershipRefresher;
-
- // leader for each topic
- private ConsistentMap<String, NodeId> leaderMap;
- // list of candidates (includes chosen leader) for each topic
- private ConsistentMap<String, List<NodeId>> candidateMap;
-
- private ListenerRegistry<LeadershipEvent, LeadershipEventListener> listenerRegistry;
-
- // cached copy of leaderMap
- // Note: Map value, Leadership, does not contain proper candidates info
- private final Map<String, Leadership> leaderBoard = Maps.newConcurrentMap();
- // cached copy of candidateMap
- // Note: Map value, Leadership, does not contain proper leader info
- private final Map<String, Leadership> candidateBoard = Maps.newConcurrentMap();
-
- private final ClusterEventListener clusterEventListener = new InternalClusterEventListener();
-
- private NodeId localNodeId;
- private Set<String> activeTopics = Sets.newConcurrentHashSet();
- private Map<String, CompletableFuture<Leadership>> pendingFutures = Maps.newConcurrentMap();
-
- // The actual delay is randomly chosen from the interval [0, WAIT_BEFORE_RETRY_MILLIS)
- private static final int WAIT_BEFORE_RETRY_MILLIS = 150;
- private static final int DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC = 2;
- private static final int LEADERSHIP_REFRESH_INTERVAL_SEC = 2;
- private static final int DELAY_BETWEEN_STALE_LEADERSHIP_PURGE_ATTEMPTS_SEC = 2;
-
- private final AtomicBoolean staleLeadershipPurgeScheduled = new AtomicBoolean(false);
-
- private static final Serializer SERIALIZER = Serializer.using(KryoNamespaces.API);
-
- @Activate
- public void activate() {
- leaderMap = storageService.<String, NodeId>consistentMapBuilder()
- .withName("onos-topic-leaders")
- .withSerializer(SERIALIZER)
- .withPartitionsDisabled().build();
- candidateMap = storageService.<String, List<NodeId>>consistentMapBuilder()
- .withName("onos-topic-candidates")
- .withSerializer(SERIALIZER)
- .withPartitionsDisabled().build();
-
- leaderMap.addListener(event -> {
- log.debug("Received {}", event);
- LeadershipEvent.Type leadershipEventType = null;
- if (event.type() == MapEvent.Type.INSERT || event.type() == MapEvent.Type.UPDATE) {
- leadershipEventType = LeadershipEvent.Type.LEADER_ELECTED;
- } else if (event.type() == MapEvent.Type.REMOVE) {
- leadershipEventType = LeadershipEvent.Type.LEADER_BOOTED;
- }
- onLeadershipEvent(new LeadershipEvent(
- leadershipEventType,
- new Leadership(event.key(),
- event.value().value(),
- event.value().version(),
- event.value().creationTime())));
- });
-
- candidateMap.addListener(event -> {
- log.debug("Received {}", event);
- if (event.type() != MapEvent.Type.INSERT && event.type() != MapEvent.Type.UPDATE) {
- log.error("Entries must not be removed from candidate map");
- return;
- }
- onLeadershipEvent(new LeadershipEvent(
- LeadershipEvent.Type.CANDIDATES_CHANGED,
- new Leadership(event.key(),
- event.value().value(),
- event.value().version(),
- event.value().creationTime())));
- });
-
- localNodeId = clusterService.getLocalNode().id();
-
- electionRunner = Executors.newSingleThreadScheduledExecutor(
- groupedThreads("onos/store/leadership", "election-runner"));
- lockExecutor = Executors.newScheduledThreadPool(
- 4, groupedThreads("onos/store/leadership", "election-thread-%d"));
- staleLeadershipPurgeExecutor = Executors.newSingleThreadScheduledExecutor(
- groupedThreads("onos/store/leadership", "stale-leadership-evictor"));
- leadershipRefresher = Executors.newSingleThreadScheduledExecutor(
- groupedThreads("onos/store/leadership", "refresh-thread"));
-
- clusterService.addListener(clusterEventListener);
-
- electionRunner.scheduleWithFixedDelay(
- this::electLeaders, 0, DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC, TimeUnit.SECONDS);
-
- leadershipRefresher.scheduleWithFixedDelay(
- this::refreshLeaderBoard, 0, LEADERSHIP_REFRESH_INTERVAL_SEC, TimeUnit.SECONDS);
-
- listenerRegistry = new ListenerRegistry<>();
- eventDispatcher.addSink(LeadershipEvent.class, listenerRegistry);
-
- log.info("Started");
- }
-
- @Deactivate
- public void deactivate() {
- if (clusterService.getNodes().size() > 1) {
- // FIXME: Determine why this takes ~50 seconds to shutdown on a single node!
- leaderBoard.forEach((topic, leadership) -> {
- if (localNodeId.equals(leadership.leader())) {
- withdraw(topic);
- }
- });
- }
-
- clusterService.removeListener(clusterEventListener);
- eventDispatcher.removeSink(LeadershipEvent.class);
-
- electionRunner.shutdown();
- lockExecutor.shutdown();
- staleLeadershipPurgeExecutor.shutdown();
- leadershipRefresher.shutdown();
-
- log.info("Stopped");
- }
-
- @Override
- public Map<String, Leadership> getLeaderBoard() {
- return ImmutableMap.copyOf(leaderBoard);
- }
-
- @Override
- public Map<String, List<NodeId>> getCandidates() {
- return Maps.toMap(candidateBoard.keySet(), this::getCandidates);
- }
-
- @Override
- public List<NodeId> getCandidates(String path) {
- Leadership current = candidateBoard.get(path);
- return current == null ? ImmutableList.of() : ImmutableList.copyOf(current.candidates());
- }
-
- @Override
- public NodeId getLeader(String path) {
- Leadership leadership = leaderBoard.get(path);
- return leadership != null ? leadership.leader() : null;
- }
-
- @Override
- public Leadership getLeadership(String path) {
- checkArgument(path != null);
- return leaderBoard.get(path);
- }
-
- @Override
- public Set<String> ownedTopics(NodeId nodeId) {
- checkArgument(nodeId != null);
- return leaderBoard.entrySet()
- .stream()
- .filter(entry -> nodeId.equals(entry.getValue().leader()))
- .map(Entry::getKey)
- .collect(Collectors.toSet());
- }
-
- @Override
- public CompletableFuture<Leadership> runForLeadership(String path) {
- log.debug("Running for leadership for topic: {}", path);
- CompletableFuture<Leadership> resultFuture = new CompletableFuture<>();
- doRunForLeadership(path, resultFuture);
- return resultFuture;
- }
-
- private void doRunForLeadership(String path, CompletableFuture<Leadership> future) {
- try {
- Versioned<List<NodeId>> candidates = candidateMap.computeIf(path,
- currentList -> currentList == null || !currentList.contains(localNodeId),
- (topic, currentList) -> {
- if (currentList == null) {
- return ImmutableList.of(localNodeId);
- } else {
- List<NodeId> newList = Lists.newLinkedList();
- newList.addAll(currentList);
- newList.add(localNodeId);
- return newList;
- }
- });
- log.debug("In the leadership race for topic {} with candidates {}", path, candidates);
- activeTopics.add(path);
- Leadership leadership = electLeader(path, candidates.value());
- if (leadership == null) {
- pendingFutures.put(path, future);
- } else {
- future.complete(leadership);
- }
- } catch (ConsistentMapException e) {
- log.debug("Failed to enter topic leader race for {}. Retrying.", path, e);
- rerunForLeadership(path, future);
- }
- }
-
- @Override
- public CompletableFuture<Void> withdraw(String path) {
- activeTopics.remove(path);
- CompletableFuture<Void> resultFuture = new CompletableFuture<>();
- doWithdraw(path, resultFuture);
- return resultFuture;
- }
-
-
- private void doWithdraw(String path, CompletableFuture<Void> future) {
- if (activeTopics.contains(path)) {
- future.completeExceptionally(new CancellationException(String.format("%s is now a active topic", path)));
- }
- try {
- leaderMap.computeIf(path,
- localNodeId::equals,
- (topic, leader) -> null);
- candidateMap.computeIf(path,
- candidates -> candidates != null && candidates.contains(localNodeId),
- (topic, candidates) -> candidates.stream()
- .filter(nodeId -> !localNodeId.equals(nodeId))
- .collect(Collectors.toList()));
- future.complete(null);
- } catch (Exception e) {
- log.debug("Failed to verify (and clear) any lock this node might be holding for {}", path, e);
- retryWithdraw(path, future);
- }
- }
-
- @Override
- public boolean stepdown(String path) {
- if (!activeTopics.contains(path) || !Objects.equals(localNodeId, getLeader(path))) {
- return false;
- }
-
- try {
- return leaderMap.computeIf(path,
- localNodeId::equals,
- (topic, leader) -> null) == null;
- } catch (Exception e) {
- log.warn("Error executing stepdown for {}", path, e);
- }
- return false;
- }
-
- @Override
- public void addListener(LeadershipEventListener listener) {
- listenerRegistry.addListener(listener);
- }
-
- @Override
- public void removeListener(LeadershipEventListener listener) {
- listenerRegistry.removeListener(listener);
- }
-
- @Override
- public boolean makeTopCandidate(String path, NodeId nodeId) {
- Versioned<List<NodeId>> candidateList = candidateMap.computeIf(path,
- candidates -> candidates != null &&
- candidates.contains(nodeId) &&
- !nodeId.equals(Iterables.getFirst(candidates, null)),
- (topic, candidates) -> {
- List<NodeId> updatedCandidates = new ArrayList<>(candidates.size());
- updatedCandidates.add(nodeId);
- candidates.stream().filter(id -> !nodeId.equals(id)).forEach(updatedCandidates::add);
- return updatedCandidates;
- });
- List<NodeId> candidates = candidateList != null ? candidateList.value() : Collections.emptyList();
- return candidates.size() > 0 && nodeId.equals(candidates.get(0));
- }
-
- private Leadership electLeader(String path, List<NodeId> candidates) {
- Leadership currentLeadership = getLeadership(path);
- if (currentLeadership != null) {
- return currentLeadership;
- } else {
- NodeId topCandidate = candidates
- .stream()
- .filter(n -> clusterService.getState(n) == ACTIVE)
- .findFirst()
- .orElse(null);
- try {
- Versioned<NodeId> leader = localNodeId.equals(topCandidate)
- ? leaderMap.computeIfAbsent(path, p -> localNodeId) : leaderMap.get(path);
- if (leader != null) {
- Leadership newLeadership = new Leadership(path,
- leader.value(),
- leader.version(),
- leader.creationTime());
- // Since reads only go through the local copy of leader board, we ought to update it
- // first before returning from this method.
- // This is to ensure a subsequent read will not read a stale value.
- onLeadershipEvent(new LeadershipEvent(LeadershipEvent.Type.LEADER_ELECTED, newLeadership));
- return newLeadership;
- }
- } catch (Exception e) {
- log.debug("Failed to elect leader for {}", path, e);
- }
- }
- return null;
- }
-
- private void electLeaders() {
- try {
- candidateMap.entrySet().forEach(entry -> {
- String path = entry.getKey();
- Versioned<List<NodeId>> candidates = entry.getValue();
- // for active topics, check if this node can become a leader (if it isn't already)
- if (activeTopics.contains(path)) {
- lockExecutor.submit(() -> {
- Leadership leadership = electLeader(path, candidates.value());
- if (leadership != null) {
- CompletableFuture<Leadership> future = pendingFutures.remove(path);
- if (future != null) {
- future.complete(leadership);
- }
- }
- });
- }
- // Raise a CANDIDATES_CHANGED event to force refresh local candidate board
- // and also to update local listeners.
- // Don't worry about duplicate events as they will be suppressed.
- onLeadershipEvent(new LeadershipEvent(LeadershipEvent.Type.CANDIDATES_CHANGED,
- new Leadership(path,
- candidates.value(),
- candidates.version(),
- candidates.creationTime())));
- });
- } catch (Exception e) {
- log.debug("Failure electing leaders", e);
- }
- }
-
- private void onLeadershipEvent(LeadershipEvent leadershipEvent) {
- log.trace("Leadership Event: time = {} type = {} event = {}",
- leadershipEvent.time(), leadershipEvent.type(),
- leadershipEvent);
-
- Leadership leadershipUpdate = leadershipEvent.subject();
- LeadershipEvent.Type eventType = leadershipEvent.type();
- String topic = leadershipUpdate.topic();
-
- AtomicBoolean updateAccepted = new AtomicBoolean(false);
- if (eventType.equals(LeadershipEvent.Type.LEADER_ELECTED)) {
- leaderBoard.compute(topic, (k, currentLeadership) -> {
- if (currentLeadership == null || currentLeadership.epoch() < leadershipUpdate.epoch()) {
- updateAccepted.set(true);
- return leadershipUpdate;
- }
- return currentLeadership;
- });
- } else if (eventType.equals(LeadershipEvent.Type.LEADER_BOOTED)) {
- leaderBoard.compute(topic, (k, currentLeadership) -> {
- if (currentLeadership == null || currentLeadership.epoch() <= leadershipUpdate.epoch()) {
- updateAccepted.set(true);
- // FIXME: Removing entries from leaderboard is not safe and should be visited.
- return null;
- }
- return currentLeadership;
- });
- } else if (eventType.equals(LeadershipEvent.Type.CANDIDATES_CHANGED)) {
- candidateBoard.compute(topic, (k, currentInfo) -> {
- if (currentInfo == null || currentInfo.epoch() < leadershipUpdate.epoch()) {
- updateAccepted.set(true);
- return leadershipUpdate;
- }
- return currentInfo;
- });
- } else {
- throw new IllegalStateException("Unknown event type.");
- }
-
- if (updateAccepted.get()) {
- eventDispatcher.post(leadershipEvent);
- }
- }
-
- private void rerunForLeadership(String path, CompletableFuture<Leadership> future) {
- lockExecutor.schedule(
- () -> doRunForLeadership(path, future),
- RandomUtils.nextInt(WAIT_BEFORE_RETRY_MILLIS),
- TimeUnit.MILLISECONDS);
- }
-
- private void retryWithdraw(String path, CompletableFuture<Void> future) {
- lockExecutor.schedule(
- () -> doWithdraw(path, future),
- RandomUtils.nextInt(WAIT_BEFORE_RETRY_MILLIS),
- TimeUnit.MILLISECONDS);
- }
-
- private void scheduleStaleLeadershipPurge(int afterDelaySec) {
- if (staleLeadershipPurgeScheduled.compareAndSet(false, true)) {
- staleLeadershipPurgeExecutor.schedule(
- this::purgeStaleLeadership,
- afterDelaySec,
- TimeUnit.SECONDS);
- }
- }
-
- /**
- * Purges locks held by inactive nodes and evicts inactive nodes from candidacy.
- */
- private void purgeStaleLeadership() {
- AtomicBoolean rerunPurge = new AtomicBoolean(false);
- try {
- staleLeadershipPurgeScheduled.set(false);
- leaderMap.entrySet()
- .stream()
- .filter(e -> clusterService.getState(e.getValue().value()) == INACTIVE)
- .forEach(entry -> {
- String path = entry.getKey();
- NodeId nodeId = entry.getValue().value();
- try {
- leaderMap.computeIf(path, nodeId::equals, (topic, leader) -> null);
- } catch (Exception e) {
- log.debug("Failed to purge stale lock held by {} for {}", nodeId, path, e);
- rerunPurge.set(true);
- }
- });
-
- candidateMap.entrySet()
- .forEach(entry -> {
- String path = entry.getKey();
- Versioned<List<NodeId>> candidates = entry.getValue();
- List<NodeId> candidatesList = candidates != null
- ? candidates.value() : Collections.emptyList();
- List<NodeId> activeCandidatesList =
- candidatesList.stream()
- .filter(n -> clusterService.getState(n) == ACTIVE)
- .filter(n -> !localNodeId.equals(n) || activeTopics.contains(path))
- .collect(Collectors.toList());
- if (activeCandidatesList.size() < candidatesList.size()) {
- Set<NodeId> removedCandidates =
- Sets.difference(Sets.newHashSet(candidatesList),
- Sets.newHashSet(activeCandidatesList));
- try {
- candidateMap.computeIf(path,
- c -> c.stream()
- .filter(n -> clusterService.getState(n) == INACTIVE)
- .count() > 0,
- (topic, c) -> c.stream()
- .filter(n -> clusterService.getState(n) == ACTIVE)
- .filter(n -> !localNodeId.equals(n) ||
- activeTopics.contains(path))
- .collect(Collectors.toList()));
- } catch (Exception e) {
- log.debug("Failed to evict inactive candidates {} from "
- + "candidate list for {}", removedCandidates, path, e);
- rerunPurge.set(true);
- }
- }
- });
- } catch (Exception e) {
- log.debug("Failure purging state leadership.", e);
- rerunPurge.set(true);
- }
-
- if (rerunPurge.get()) {
- log.debug("Rescheduling stale leadership purge due to errors encountered in previous run");
- scheduleStaleLeadershipPurge(DELAY_BETWEEN_STALE_LEADERSHIP_PURGE_ATTEMPTS_SEC);
- }
- }
-
- private void refreshLeaderBoard() {
- try {
- Map<String, Leadership> newLeaderBoard = Maps.newHashMap();
- leaderMap.entrySet().forEach(entry -> {
- String path = entry.getKey();
- Versioned<NodeId> leader = entry.getValue();
- Leadership leadership = new Leadership(path,
- leader.value(),
- leader.version(),
- leader.creationTime());
- newLeaderBoard.put(path, leadership);
- });
-
- // first take snapshot of current leader board.
- Map<String, Leadership> currentLeaderBoard = ImmutableMap.copyOf(leaderBoard);
-
- MapDifference<String, Leadership> diff = Maps.difference(currentLeaderBoard, newLeaderBoard);
-
- // evict stale leaders
- diff.entriesOnlyOnLeft().forEach((path, leadership) -> {
- log.debug("Evicting {} from leaderboard. It is no longer active leader.", leadership);
- onLeadershipEvent(new LeadershipEvent(LeadershipEvent.Type.LEADER_BOOTED, leadership));
- });
-
- // add missing leaders
- diff.entriesOnlyOnRight().forEach((path, leadership) -> {
- log.debug("Adding {} to leaderboard. It is now the active leader.", leadership);
- onLeadershipEvent(new LeadershipEvent(LeadershipEvent.Type.LEADER_ELECTED, leadership));
- });
-
- // add updated leaders
- diff.entriesDiffering().forEach((path, difference) -> {
- Leadership current = difference.leftValue();
- Leadership updated = difference.rightValue();
- if (current.epoch() < updated.epoch()) {
- log.debug("Updated {} in leaderboard.", updated);
- onLeadershipEvent(new LeadershipEvent(LeadershipEvent.Type.LEADER_ELECTED, updated));
- }
- });
- } catch (Exception e) {
- log.debug("Failed to refresh leader board", e);
- }
- }
-
- private class InternalClusterEventListener implements ClusterEventListener {
-
- @Override
- public void event(ClusterEvent event) {
- if (event.type() == Type.INSTANCE_DEACTIVATED || event.type() == Type.INSTANCE_REMOVED) {
- scheduleStaleLeadershipPurge(0);
- }
- }
- }
-}