DistributedLeadershipManager tracks topic election candidates in addition to
leaders. Includes update to leaders CLI command to list candidates.
part of: Device Mastership store on top of LeadershipService
Reference: ONOS-76
Conflicts:
core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
Change-Id: I587bb9e9ad16a9c8392969dde45001181053e5e6
diff --git a/cli/src/main/java/org/onosproject/cli/net/LeaderCommand.java b/cli/src/main/java/org/onosproject/cli/net/LeaderCommand.java
index a60da76..07b5b2b 100644
--- a/cli/src/main/java/org/onosproject/cli/net/LeaderCommand.java
+++ b/cli/src/main/java/org/onosproject/cli/net/LeaderCommand.java
@@ -17,12 +17,15 @@
import java.util.Comparator;
import java.util.Map;
+import java.util.List;
import org.apache.karaf.shell.commands.Command;
+import org.apache.karaf.shell.commands.Option;
import org.onlab.util.Tools;
import org.onosproject.cli.AbstractShellCommand;
import org.onosproject.cluster.Leadership;
import org.onosproject.cluster.LeadershipService;
+import org.onosproject.cluster.NodeId;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -36,6 +39,12 @@
public class LeaderCommand extends AbstractShellCommand {
private static final String FMT = "%-20s | %-15s | %-6s | %-10s |";
+ private static final String FMT_C = "%-20s | %-15s | %-19s |";
+
+ @Option(name = "-c", aliases = "--candidates",
+ description = "List candidate Nodes for each topic's leadership race",
+ required = false, multiValued = false)
+ private boolean showCandidates = false;
/**
* Compares leaders, sorting by toString() output.
@@ -75,6 +84,28 @@
print("--------------------------------------------------------------");
}
+ private void displayCandidates(Map<String, Leadership> leaderBoard,
+ Map<String, List<NodeId>> candidates) {
+ print("--------------------------------------------------------------");
+ print(FMT_C, "Topic", "Leader", "Candidates");
+ print("--------------------------------------------------------------");
+ leaderBoard
+ .values()
+ .stream()
+ .sorted(leadershipComparator)
+ .forEach(l -> {
+ List<NodeId> list = candidates.get(l.topic());
+ print(FMT_C,
+ l.topic(),
+ l.leader(),
+ list.remove(0).toString());
+ // formatting hacks to get it into a table
+ list.forEach(n -> print(FMT_C, " ", " ", n));
+ print(FMT_C, " ", " ", " ");
+ });
+ print("--------------------------------------------------------------");
+ }
+
/**
* Returns JSON node representing the leaders.
*
@@ -91,6 +122,7 @@
mapper.createObjectNode()
.put("topic", l.topic())
.put("leader", l.leader().toString())
+ .put("candidates", l.candidates().toString())
.put("epoch", l.epoch())
.put("electedTime", Tools.timeAgo(l.electedTime()))));
@@ -106,7 +138,12 @@
if (outputJson()) {
print("%s", json(leaderBoard));
} else {
- displayLeaders(leaderBoard);
+ if (showCandidates) {
+ Map<String, List<NodeId>> candidates = leaderService.getCandidates();
+ displayCandidates(leaderBoard, candidates);
+ } else {
+ displayLeaders(leaderBoard);
+ }
}
}
}
diff --git a/core/api/src/main/java/org/onosproject/cluster/LeadershipEvent.java b/core/api/src/main/java/org/onosproject/cluster/LeadershipEvent.java
index ac0036e..c4f59be 100644
--- a/core/api/src/main/java/org/onosproject/cluster/LeadershipEvent.java
+++ b/core/api/src/main/java/org/onosproject/cluster/LeadershipEvent.java
@@ -49,11 +49,10 @@
LEADER_BOOTED,
/**
- * Signifies that the list of candidates for leadership for a resource
- * has changed. If the change in the backups list is accompanied by a
- * change in the leader, the event is subsumed by the leadership change.
+ * Signifies that the list of candidates for leadership for a topic has
+ * changed.
*/
- LEADER_CANDIDATES_CHANGED
+ CANDIDATES_CHANGED
}
/**
diff --git a/core/api/src/main/java/org/onosproject/cluster/LeadershipService.java b/core/api/src/main/java/org/onosproject/cluster/LeadershipService.java
index 65ec687..15a198c 100644
--- a/core/api/src/main/java/org/onosproject/cluster/LeadershipService.java
+++ b/core/api/src/main/java/org/onosproject/cluster/LeadershipService.java
@@ -17,6 +17,7 @@
import java.util.Map;
import java.util.Set;
+import java.util.List;
/**
* Service for leader election.
@@ -67,6 +68,19 @@
Map<String, Leadership> getLeaderBoard();
/**
+ * Returns the candidates for all known topics.
+ * @return A map of topics to lists of NodeIds.
+ */
+ Map<String, List<NodeId>> getCandidates();
+
+ /**
+ * Returns the candidates for a given topic.
+ * @param path topic
+ * @return A lists of NodeIds, which may be empty.
+ */
+ List<NodeId> getCandidates(String path);
+
+ /**
* Registers a event listener to be notified of leadership events.
* @param listener listener that will asynchronously notified of leadership events.
*/
diff --git a/core/api/src/test/java/org/onosproject/cluster/LeadershipServiceAdapter.java b/core/api/src/test/java/org/onosproject/cluster/LeadershipServiceAdapter.java
index 818edb9..02742d4 100644
--- a/core/api/src/test/java/org/onosproject/cluster/LeadershipServiceAdapter.java
+++ b/core/api/src/test/java/org/onosproject/cluster/LeadershipServiceAdapter.java
@@ -15,6 +15,7 @@
*/
package org.onosproject.cluster;
+import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -62,4 +63,14 @@
public void removeListener(LeadershipEventListener listener) {
}
+
+ @Override
+ public Map<String, List<NodeId>> getCandidates() {
+ return null;
+ }
+
+ @Override
+ public List<NodeId> getCandidates(String path) {
+ return null;
+ }
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java
index d4b46ab..2f6a149 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java
@@ -46,6 +46,7 @@
import org.slf4j.LoggerFactory;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
@@ -573,4 +574,14 @@
eventDispatcher.post(leadershipEvent);
}
}
+
+ @Override
+ public Map<String, List<NodeId>> getCandidates() {
+ return null;
+ }
+
+ @Override
+ public List<NodeId> getCandidates(String path) {
+ return null;
+ }
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
index 39b4844..8bebff7 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
@@ -1,8 +1,11 @@
package org.onosproject.store.consistent.impl;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
+
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -12,6 +15,7 @@
import org.onlab.util.KryoNamespace;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
+import org.onosproject.cluster.ControllerNode.State;
import org.onosproject.cluster.Leadership;
import org.onosproject.cluster.LeadershipEvent;
import org.onosproject.cluster.LeadershipEventListener;
@@ -24,8 +28,8 @@
import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.serializers.KryoNamespaces;
-import org.onosproject.store.serializers.KryoSerializer;
import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.ConsistentMapException;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageService;
import org.onosproject.store.service.Versioned;
@@ -35,6 +39,7 @@
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
+import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -77,7 +82,9 @@
private ScheduledExecutorService deadLockDetectionExecutor;
private ScheduledExecutorService leadershipStatusBroadcaster;
- private ConsistentMap<String, NodeId> lockMap;
+ private ConsistentMap<String, NodeId> leaderMap;
+ private ConsistentMap<String, List<NodeId>> candidateMap;
+
private AbstractListenerRegistry<LeadershipEvent, LeadershipEventListener>
listenerRegistry;
private final Map<String, Leadership> leaderBoard = Maps.newConcurrentMap();
@@ -85,26 +92,26 @@
private Set<String> activeTopics = Sets.newConcurrentHashSet();
+ private static final int ELECTION_JOIN_ATTEMPT_INTERVAL_SEC = 2;
private static final int DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC = 2;
private static final int DEADLOCK_DETECTION_INTERVAL_SEC = 2;
private static final int LEADERSHIP_STATUS_UPDATE_INTERVAL_SEC = 2;
- private static final KryoSerializer SERIALIZER = new KryoSerializer() {
- @Override
- protected void setupKryoPool() {
- serializerPool = KryoNamespace.newBuilder()
- .register(KryoNamespaces.API)
- .build()
- .populate(1);
- }
- };
+ private static final int LEADER_CANDIDATE_POS = 0;
+
+ private static final Serializer SERIALIZER = Serializer.using(
+ new KryoNamespace.Builder().register(KryoNamespaces.API).build());
@Activate
public void activate() {
- lockMap = storageService.<String, NodeId>consistentMapBuilder()
- .withName("onos-leader-locks")
- .withSerializer(Serializer.using(new KryoNamespace.Builder().register(KryoNamespaces.API).build()))
- .withPartitionsDisabled().build();
+ leaderMap = storageService.<String, NodeId>consistentMapBuilder()
+ .withName("onos-topic-leaders")
+ .withSerializer(SERIALIZER)
+ .withPartitionsDisabled().build();
+ candidateMap = storageService.<String, List<NodeId>>consistentMapBuilder()
+ .withName("onos-topic-candidates")
+ .withSerializer(SERIALIZER)
+ .withPartitionsDisabled().build();
localNodeId = clusterService.getLocalNode().id();
@@ -157,6 +164,19 @@
}
@Override
+ public Map<String, List<NodeId>> getCandidates() {
+ Map<String, List<NodeId>> candidates = Maps.newHashMap();
+ candidateMap.entrySet().forEach(el -> candidates.put(el.getKey(), el.getValue().value()));
+ return ImmutableMap.copyOf(candidates);
+ }
+
+ @Override
+ public List<NodeId> getCandidates(String path) {
+ Versioned<List<NodeId>> candidates = candidateMap.get(path);
+ return candidates == null ? ImmutableList.of() : ImmutableList.copyOf(candidates.value());
+ }
+
+ @Override
public NodeId getLeader(String path) {
Leadership leadership = leaderBoard.get(path);
return leadership != null ? leadership.leader() : null;
@@ -181,24 +201,62 @@
@Override
public void runForLeadership(String path) {
log.debug("Running for leadership for topic: {}", path);
- activeTopics.add(path);
- tryLeaderLock(path);
+ try {
+ Versioned<List<NodeId>> candidates = candidateMap.get(path);
+ if (candidates != null) {
+ List<NodeId> candidateList = Lists.newArrayList(candidates.value());
+ if (!candidateList.contains(localNodeId)) {
+ candidateList.add(localNodeId);
+ if (!candidateMap.replace(path, candidates.version(), candidateList)) {
+ rerunForLeadership(path);
+ return;
+ }
+ }
+ } else {
+ if (!(candidateMap.putIfAbsent(path, ImmutableList.of(localNodeId)) == null)) {
+ rerunForLeadership(path);
+ return;
+ }
+ }
+ log.debug("In the leadership race for topic {} with candidates {}", path, candidates);
+ activeTopics.add(path);
+ tryLeaderLock(path);
+ } catch (ConsistentMapException e) {
+ log.debug("Failed to enter topic leader race for {}. Retrying.", path, e);
+ rerunForLeadership(path);
+ }
}
@Override
public void withdraw(String path) {
activeTopics.remove(path);
+
try {
- Versioned<NodeId> leader = lockMap.get(path);
- if (Objects.equals(leader.value(), localNodeId)) {
- if (lockMap.remove(path, leader.version())) {
+ Versioned<NodeId> leader = leaderMap.get(path);
+ if (leader != null && Objects.equals(leader.value(), localNodeId)) {
+ if (leaderMap.remove(path, leader.version())) {
log.info("Gave up leadership for {}", path);
notifyRemovedLeader(path, localNodeId, leader.version(), leader.creationTime());
}
}
- // else we are not the current owner.
+ // else we are not the current leader, can still be a candidate.
+ Versioned<List<NodeId>> candidates = candidateMap.get(path);
+ List<NodeId> candidateList = candidates != null
+ ? Lists.newArrayList(candidates.value())
+ : Lists.newArrayList();
+ if (!candidateList.remove(localNodeId)) {
+ return;
+ }
+ boolean success = candidateList.isEmpty()
+ ? candidateMap.remove(path, candidates.version())
+ : candidateMap.replace(path, candidates.version(), candidateList);
+ if (!success) {
+ log.warn("Failed to withdraw from candidates list. Will retry");
+ retryWithdraw(path);
+ }
} catch (Exception e) {
log.debug("Failed to verify (and clear) any lock this node might be holding for {}", path, e);
+ retryWithdraw(path);
}
}
@@ -216,39 +274,62 @@
if (!activeTopics.contains(path)) {
return;
}
+
+ Versioned<List<NodeId>> candidates = candidateMap.get(path);
+ if (candidates != null) {
+ List<NodeId> activeNodes = candidates.value().stream()
+ .filter(n -> clusterService.getState(n) == State.ACTIVE)
+ .collect(Collectors.toList());
+ if (localNodeId.equals(activeNodes.get(LEADER_CANDIDATE_POS))) {
+ leaderLockAttempt(path, candidates.value());
+ } else {
+ retryLock(path);
+ }
+ } else {
+ throw new IllegalStateException("should not be here");
+ }
+ }
+
+ private void leaderLockAttempt(String path, List<NodeId> candidates) {
try {
- Versioned<NodeId> currentLeader = lockMap.get(path);
+ Versioned<NodeId> currentLeader = leaderMap.get(path);
if (currentLeader != null) {
if (localNodeId.equals(currentLeader.value())) {
log.info("Already has leadership for {}", path);
- notifyNewLeader(path, localNodeId, currentLeader.version(), currentLeader.creationTime());
+ // FIXME: candidates can get out of sync.
+ notifyNewLeader(
+ path, localNodeId, candidates, currentLeader.version(), currentLeader.creationTime());
} else {
// someone else has leadership. will retry after sometime.
- retry(path);
+ retryLock(path);
}
} else {
- if (lockMap.putIfAbsent(path, localNodeId) == null) {
+ if (leaderMap.putIfAbsent(path, localNodeId) == null) {
log.info("Assumed leadership for {}", path);
// do a get again to get the version (epoch)
- Versioned<NodeId> newLeader = lockMap.get(path);
- notifyNewLeader(path, localNodeId, newLeader.version(), newLeader.creationTime());
+ Versioned<NodeId> newLeader = leaderMap.get(path);
+ // FIXME: candidates can get out of sync
+ notifyNewLeader(path, localNodeId, candidates, newLeader.version(), newLeader.creationTime());
} else {
// someone beat us to it.
- retry(path);
+ retryLock(path);
}
}
} catch (Exception e) {
log.debug("Attempt to acquire leadership lock for topic {} failed", path, e);
- retry(path);
+ retryLock(path);
}
}
- private void notifyNewLeader(String path, NodeId leader, long epoch, long electedTime) {
- Leadership newLeadership = new Leadership(path, leader, epoch, electedTime);
+ private void notifyNewLeader(String path, NodeId leader,
+ List<NodeId> candidates, long epoch, long electedTime) {
+ Leadership newLeadership = new Leadership(path, leader, candidates, epoch, electedTime);
boolean updatedLeader = false;
+ log.debug("candidates for new Leadership {}", candidates);
synchronized (leaderBoard) {
Leadership currentLeader = leaderBoard.get(path);
if (currentLeader == null || currentLeader.epoch() < epoch) {
+ log.debug("updating leaderboard with new {}", newLeadership);
leaderBoard.put(path, newLeadership);
updatedLeader = true;
}
@@ -256,17 +337,23 @@
if (updatedLeader) {
LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.LEADER_ELECTED, newLeadership);
- eventDispatcher.post(event);
- clusterCommunicator.broadcast(
- new ClusterMessage(
- clusterService.getLocalNode().id(),
- LEADERSHIP_EVENT_MESSAGE_SUBJECT,
- SERIALIZER.encode(event)));
+ notifyPeers(event);
}
}
+ private void notifyPeers(LeadershipEvent event) {
+ eventDispatcher.post(event);
+ clusterCommunicator.broadcast(
+ new ClusterMessage(
+ clusterService.getLocalNode().id(),
+ LEADERSHIP_EVENT_MESSAGE_SUBJECT,
+ SERIALIZER.encode(event)));
+ }
+
private void notifyRemovedLeader(String path, NodeId leader, long epoch, long electedTime) {
- Leadership oldLeadership = new Leadership(path, leader, epoch, electedTime);
+ Versioned<List<NodeId>> candidates = candidateMap.get(path);
+ Leadership oldLeadership = new Leadership(
+ path, leader, candidates.value(), epoch, electedTime);
boolean updatedLeader = false;
synchronized (leaderBoard) {
Leadership currentLeader = leaderBoard.get(path);
@@ -316,6 +403,11 @@
leaderBoard.remove(topic);
updateAccepted = true;
}
+ } else if (eventType.equals(LeadershipEvent.Type.CANDIDATES_CHANGED)) {
+ if (currentLeadership != null && currentLeadership.epoch() == leadershipUpdate.epoch()) {
+ leaderBoard.replace(topic, leadershipUpdate);
+ updateAccepted = true;
+ }
} else {
throw new IllegalStateException("Unknown event type.");
}
@@ -326,44 +418,47 @@
}
}
- private void retry(String path) {
+ private void rerunForLeadership(String path) {
+ retryLeaderLockExecutor.schedule(
+ () -> runForLeadership(path),
+ ELECTION_JOIN_ATTEMPT_INTERVAL_SEC,
+ TimeUnit.SECONDS);
+ }
+
+ private void retryLock(String path) {
retryLeaderLockExecutor.schedule(
() -> tryLeaderLock(path),
DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC,
TimeUnit.SECONDS);
}
+ private void retryWithdraw(String path) {
+ retryLeaderLockExecutor.schedule(
+ () -> withdraw(path),
+ DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC,
+ TimeUnit.SECONDS);
+ }
+
private void purgeStaleLocks() {
try {
- Set<Entry<String, Versioned<NodeId>>> entries = lockMap.entrySet();
- entries.forEach(entry -> {
- String path = entry.getKey();
- NodeId nodeId = entry.getValue().value();
- long epoch = entry.getValue().version();
- long creationTime = entry.getValue().creationTime();
- if (clusterService.getState(nodeId) == ControllerNode.State.INACTIVE) {
- log.info("Lock for {} is held by {} which is currently inactive", path, nodeId);
+ leaderMap.entrySet()
+ .stream()
+ .filter(e -> clusterService.getState(e.getValue().value()) == ControllerNode.State.INACTIVE)
+ .filter(e -> localNodeId.equals(e.getValue().value()) && !activeTopics.contains(e.getKey()))
+ .forEach(entry -> {
+ String path = entry.getKey();
+ NodeId nodeId = entry.getValue().value();
+ long epoch = entry.getValue().version();
+ long creationTime = entry.getValue().creationTime();
try {
- if (lockMap.remove(path, epoch)) {
+ if (leaderMap.remove(path, epoch)) {
log.info("Purged stale lock held by {} for {}", nodeId, path);
notifyRemovedLeader(path, nodeId, epoch, creationTime);
}
} catch (Exception e) {
log.warn("Failed to purge stale lock held by {} for {}", nodeId, path, e);
}
- }
- if (localNodeId.equals(nodeId) && !activeTopics.contains(path)) {
- log.debug("Lock for {} is held by {} when it not running for leadership.", path, nodeId);
- try {
- if (lockMap.remove(path, epoch)) {
- log.info("Purged stale lock held by {} for {}", nodeId, path);
- notifyRemovedLeader(path, nodeId, epoch, creationTime);
- }
- } catch (Exception e) {
- log.warn("Failed to purge stale lock held by {} for {}", nodeId, path, e);
- }
- }
- });
+ });
} catch (Exception e) {
log.debug("Failed cleaning up stale locks", e);
}
diff --git a/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleLeadershipManager.java b/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleLeadershipManager.java
index 35654f2..0cf0625 100644
--- a/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleLeadershipManager.java
+++ b/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleLeadershipManager.java
@@ -17,6 +17,7 @@
import static com.google.common.base.Preconditions.checkArgument;
+import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
@@ -108,4 +109,14 @@
public void removeListener(LeadershipEventListener listener) {
listeners.remove(listener);
}
+
+ @Override
+ public Map<String, List<NodeId>> getCandidates() {
+ return null;
+ }
+
+ @Override
+ public List<NodeId> getCandidates(String path) {
+ return null;
+ }
}