Create local storage for topic candidates mapping. This also includes:
- using Optional in Leadership, and some commenting.
- using MutableBooleans + compute()
part of: Device Mastership store on top of LeadershipService
Reference: ONOS-76
Conflicts:
core/api/src/main/java/org/onosproject/cluster/LeadershipService.java
core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
Change-Id: I7f090abb123cf23bb5126a935a6e72be00f3e3ce
diff --git a/cli/src/main/java/org/onosproject/cli/net/LeaderCommand.java b/cli/src/main/java/org/onosproject/cli/net/LeaderCommand.java
index 07b5b2b..bd5ba3a 100644
--- a/cli/src/main/java/org/onosproject/cli/net/LeaderCommand.java
+++ b/cli/src/main/java/org/onosproject/cli/net/LeaderCommand.java
@@ -85,7 +85,7 @@
}
private void displayCandidates(Map<String, Leadership> leaderBoard,
- Map<String, List<NodeId>> candidates) {
+ Map<String, Leadership> candidates) {
print("--------------------------------------------------------------");
print(FMT_C, "Topic", "Leader", "Candidates");
print("--------------------------------------------------------------");
@@ -94,13 +94,13 @@
.stream()
.sorted(leadershipComparator)
.forEach(l -> {
- List<NodeId> list = candidates.get(l.topic());
+ List<NodeId> list = candidates.get(l.topic()).candidates();
print(FMT_C,
l.topic(),
l.leader(),
- list.remove(0).toString());
+ list.get(0).toString());
// formatting hacks to get it into a table
- list.forEach(n -> print(FMT_C, " ", " ", n));
+ list.subList(1, list.size()).forEach(n -> print(FMT_C, " ", " ", n));
print(FMT_C, " ", " ", " ");
});
print("--------------------------------------------------------------");
@@ -139,7 +139,7 @@
print("%s", json(leaderBoard));
} else {
if (showCandidates) {
- Map<String, List<NodeId>> candidates = leaderService.getCandidates();
+ Map<String, Leadership> candidates = leaderService.getCandidates();
displayCandidates(leaderBoard, candidates);
} else {
displayLeaders(leaderBoard);
diff --git a/core/api/src/main/java/org/onosproject/cluster/Leadership.java b/core/api/src/main/java/org/onosproject/cluster/Leadership.java
index a0c5e70..175de2d 100644
--- a/core/api/src/main/java/org/onosproject/cluster/Leadership.java
+++ b/core/api/src/main/java/org/onosproject/cluster/Leadership.java
@@ -17,6 +17,7 @@
import java.util.Objects;
import java.util.List;
+import java.util.Optional;
import org.joda.time.DateTime;
@@ -33,19 +34,21 @@
* rest in decreasing preference order.</li>
* <li>The epoch is the logical age of a Leadership construct, and should be
* used for comparing two Leaderships, but only of the same topic.</li>
+ * <li>The leader may be null if its accuracy can't be guaranteed. This applies
+ * to CANDIDATES_CHANGED events and candidate board contents.</li>
* </ul>
*/
public class Leadership {
private final String topic;
- private final NodeId leader;
+ private final Optional<NodeId> leader;
private final List<NodeId> candidates;
private final long epoch;
private final long electedTime;
public Leadership(String topic, NodeId leader, long epoch, long electedTime) {
this.topic = topic;
- this.leader = leader;
+ this.leader = Optional.of(leader);
this.candidates = ImmutableList.of(leader);
this.epoch = epoch;
this.electedTime = electedTime;
@@ -54,7 +57,16 @@
public Leadership(String topic, NodeId leader, List<NodeId> candidates,
long epoch, long electedTime) {
this.topic = topic;
- this.leader = leader;
+ this.leader = Optional.of(leader);
+ this.candidates = ImmutableList.copyOf(candidates);
+ this.epoch = epoch;
+ this.electedTime = electedTime;
+ }
+
+ public Leadership(String topic, List<NodeId> candidates,
+ long epoch, long electedTime) {
+ this.topic = topic;
+ this.leader = Optional.empty();
this.candidates = ImmutableList.copyOf(candidates);
this.epoch = epoch;
this.electedTime = electedTime;
@@ -74,8 +86,9 @@
*
* @return leader node.
*/
+ // This will return Optional<NodeId> in the future.
public NodeId leader() {
- return leader;
+ return leader.orElse(null);
}
/**
diff --git a/core/api/src/main/java/org/onosproject/cluster/LeadershipEvent.java b/core/api/src/main/java/org/onosproject/cluster/LeadershipEvent.java
index c4f59be..3456e22 100644
--- a/core/api/src/main/java/org/onosproject/cluster/LeadershipEvent.java
+++ b/core/api/src/main/java/org/onosproject/cluster/LeadershipEvent.java
@@ -43,14 +43,14 @@
LEADER_REELECTED,
/**
- * Signifies that the leader has been booted and lost leadership. The event subject is the
- * former leader.
+ * Signifies that the leader has been booted and lost leadership. The
+ * event subject is the former leader.
*/
LEADER_BOOTED,
/**
* Signifies that the list of candidates for leadership for a topic has
- * changed.
+ * changed. This event does not guarantee accurate leader information.
*/
CANDIDATES_CHANGED
}
diff --git a/core/api/src/main/java/org/onosproject/cluster/LeadershipService.java b/core/api/src/main/java/org/onosproject/cluster/LeadershipService.java
index 1acdc61..bc490d9 100644
--- a/core/api/src/main/java/org/onosproject/cluster/LeadershipService.java
+++ b/core/api/src/main/java/org/onosproject/cluster/LeadershipService.java
@@ -76,9 +76,9 @@
/**
* Returns the candidates for all known topics.
*
- * @return A map of topics to lists of NodeIds.
+ * @return A mapping from topics to up-to-date candidate info.
*/
- Map<String, List<NodeId>> getCandidates();
+ Map<String, Leadership> getCandidates();
/**
* Returns the candidates for a given topic.
diff --git a/core/api/src/test/java/org/onosproject/cluster/LeadershipServiceAdapter.java b/core/api/src/test/java/org/onosproject/cluster/LeadershipServiceAdapter.java
index 02742d4..3c503bb 100644
--- a/core/api/src/test/java/org/onosproject/cluster/LeadershipServiceAdapter.java
+++ b/core/api/src/test/java/org/onosproject/cluster/LeadershipServiceAdapter.java
@@ -65,7 +65,7 @@
}
@Override
- public Map<String, List<NodeId>> getCandidates() {
+ public Map<String, Leadership> getCandidates() {
return null;
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java
index 10bf6a4..1d918c8 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java
@@ -576,7 +576,7 @@
}
@Override
- public Map<String, List<NodeId>> getCandidates() {
+ public Map<String, Leadership> getCandidates() {
return null;
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
index cf3700b..303129d 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
@@ -12,6 +12,7 @@
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
+import org.apache.commons.lang3.mutable.MutableBoolean;
import org.onlab.util.KryoNamespace;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
@@ -88,6 +89,7 @@
private AbstractListenerRegistry<LeadershipEvent, LeadershipEventListener>
listenerRegistry;
private final Map<String, Leadership> leaderBoard = Maps.newConcurrentMap();
+ private final Map<String, Leadership> candidateBoard = Maps.newConcurrentMap();
private NodeId localNodeId;
private Set<String> activeTopics = Sets.newConcurrentHashSet();
@@ -164,16 +166,14 @@
}
@Override
- public Map<String, List<NodeId>> getCandidates() {
- Map<String, List<NodeId>> candidates = Maps.newHashMap();
- candidateMap.entrySet().forEach(el -> candidates.put(el.getKey(), el.getValue().value()));
- return ImmutableMap.copyOf(candidates);
+ public Map<String, Leadership> getCandidates() {
+ return ImmutableMap.copyOf(candidateBoard);
}
@Override
public List<NodeId> getCandidates(String path) {
- Versioned<List<NodeId>> candidates = candidateMap.get(path);
- return candidates == null ? ImmutableList.of() : ImmutableList.copyOf(candidates.value());
+ Leadership current = candidateBoard.get(path);
+ return current == null ? ImmutableList.of() : ImmutableList.copyOf(current.candidates());
}
@Override
@@ -207,13 +207,21 @@
List<NodeId> candidateList = Lists.newArrayList(candidates.value());
if (!candidateList.contains(localNodeId)) {
candidateList.add(localNodeId);
- if (!candidateMap.replace(path, candidates.version(), candidateList)) {
+ if (candidateMap.replace(path, candidates.version(), candidateList)) {
+ Versioned<List<NodeId>> newCandidates = candidateMap.get(path);
+ notifyCandidateAdded(
+ path, candidateList, newCandidates.version(), newCandidates.creationTime());
+ } else {
rerunForLeadership(path);
return;
}
}
} else {
- if (!(candidateMap.putIfAbsent(path, ImmutableList.of(localNodeId)) == null)) {
+ List<NodeId> candidateList = ImmutableList.of(localNodeId);
+ if ((candidateMap.putIfAbsent(path, candidateList) == null)) {
+ Versioned<List<NodeId>> newCandidates = candidateMap.get(path);
+ notifyCandidateAdded(path, candidateList, newCandidates.version(), newCandidates.creationTime());
+ } else {
rerunForLeadership(path);
return;
}
@@ -247,10 +255,19 @@
if (!candidateList.remove(localNodeId)) {
return;
}
- boolean success = candidateList.isEmpty()
- ? candidateMap.remove(path, candidates.version())
- : candidateMap.replace(path, candidates.version(), candidateList);
- if (!success) {
+ boolean success = false;
+ if (candidateList.isEmpty()) {
+ if (candidateMap.remove(path, candidates.version())) {
+ success = true;
+ }
+ } else {
+ if (candidateMap.replace(path, candidates.version(), candidateList)) {
+ success = true;
+ }
+ }
+ if (success) {
+ notifyCandidateRemoved(path, candidateList, candidates.version(), candidates.creationTime());
+ } else {
log.warn("Failed to withdraw from candidates list. Will retry");
retryWithdraw(path);
}
@@ -321,21 +338,63 @@
}
}
+ private void notifyCandidateAdded(
+ String path, List<NodeId> candidates, long epoch, long electedTime) {
+ Leadership newInfo = new Leadership(path, candidates, epoch, electedTime);
+ final MutableBoolean updated = new MutableBoolean(false);
+ candidateBoard.compute(path, (k, current) -> {
+ if (current == null || current.epoch() < newInfo.epoch()) {
+ log.info("updating candidateboard with {}", newInfo);
+ updated.setTrue();
+ return newInfo;
+ }
+ return current;
+ });
+ // maybe rethink types of candidates events
+ if (updated.booleanValue()) {
+ LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.CANDIDATES_CHANGED, newInfo);
+ notifyPeers(event);
+ }
+ }
+
+ private void notifyCandidateRemoved(
+ String path, List<NodeId> candidates, long epoch, long electedTime) {
+ Leadership newInfo = new Leadership(path, candidates, epoch, electedTime);
+ final MutableBoolean updated = new MutableBoolean(false);
+ candidateBoard.compute(path, (k, current) -> {
+ if (current != null && current.epoch() == newInfo.epoch()) {
+ log.info("updating candidateboard with {}", newInfo);
+ updated.setTrue();
+ if (candidates.isEmpty()) {
+ return null;
+ } else {
+ return newInfo;
+ }
+ }
+ return current;
+ });
+ // maybe rethink types of candidates events
+ if (updated.booleanValue()) {
+ LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.CANDIDATES_CHANGED, newInfo);
+ notifyPeers(event);
+ }
+ }
+
private void notifyNewLeader(String path, NodeId leader,
List<NodeId> candidates, long epoch, long electedTime) {
Leadership newLeadership = new Leadership(path, leader, candidates, epoch, electedTime);
- boolean updatedLeader = false;
+ final MutableBoolean updatedLeader = new MutableBoolean(false);
log.debug("candidates for new Leadership {}", candidates);
- synchronized (leaderBoard) {
- Leadership currentLeader = leaderBoard.get(path);
+ leaderBoard.compute(path, (k, currentLeader) -> {
if (currentLeader == null || currentLeader.epoch() < epoch) {
log.debug("updating leaderboard with new {}", newLeadership);
- leaderBoard.put(path, newLeadership);
- updatedLeader = true;
+ updatedLeader.setTrue();
+ return newLeadership;
}
- }
+ return currentLeader;
+ });
- if (updatedLeader) {
+ if (updatedLeader.booleanValue()) {
LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.LEADER_ELECTED, newLeadership);
notifyPeers(event);
}
@@ -352,21 +411,18 @@
Versioned<List<NodeId>> candidates = candidateMap.get(path);
Leadership oldLeadership = new Leadership(
path, leader, candidates.value(), epoch, electedTime);
- boolean updatedLeader = false;
- synchronized (leaderBoard) {
- Leadership currentLeader = leaderBoard.get(path);
+ final MutableBoolean updatedLeader = new MutableBoolean(false);
+ leaderBoard.compute(path, (k, currentLeader) -> {
if (currentLeader != null && currentLeader.epoch() == oldLeadership.epoch()) {
- leaderBoard.remove(path);
- updatedLeader = true;
+ updatedLeader.setTrue();
+ return null;
}
- }
+ return currentLeader;
+ });
- if (updatedLeader) {
+ if (updatedLeader.booleanValue()) {
LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.LEADER_BOOTED, oldLeadership);
- eventDispatcher.post(event);
- clusterCommunicator.broadcast(event,
- LEADERSHIP_EVENT_MESSAGE_SUBJECT,
- SERIALIZER::encode);
+ notifyPeers(event);
}
}
@@ -385,31 +441,37 @@
LeadershipEvent.Type eventType = leadershipEvent.type();
String topic = leadershipUpdate.topic();
- boolean updateAccepted = false;
-
- synchronized (leaderBoard) {
- Leadership currentLeadership = leaderBoard.get(topic);
- if (eventType.equals(LeadershipEvent.Type.LEADER_ELECTED)) {
+ MutableBoolean updateAccepted = new MutableBoolean(false);
+ if (eventType.equals(LeadershipEvent.Type.LEADER_ELECTED)) {
+ leaderBoard.compute(topic, (k, currentLeadership) -> {
if (currentLeadership == null || currentLeadership.epoch() < leadershipUpdate.epoch()) {
- leaderBoard.put(topic, leadershipUpdate);
- updateAccepted = true;
+ updateAccepted.setTrue();
+ return leadershipUpdate;
}
- } else if (eventType.equals(LeadershipEvent.Type.LEADER_BOOTED)) {
- if (currentLeadership != null && currentLeadership.epoch() == leadershipUpdate.epoch()) {
- leaderBoard.remove(topic);
- updateAccepted = true;
+ return currentLeadership;
+ });
+ } else if (eventType.equals(LeadershipEvent.Type.LEADER_BOOTED)) {
+ leaderBoard.compute(topic, (k, currentLeadership) -> {
+ if (currentLeadership == null || currentLeadership.epoch() < leadershipUpdate.epoch()) {
+ updateAccepted.setTrue();
+ return null;
}
- } else if (eventType.equals(LeadershipEvent.Type.CANDIDATES_CHANGED)) {
- if (currentLeadership != null && currentLeadership.epoch() == leadershipUpdate.epoch()) {
- leaderBoard.replace(topic, leadershipUpdate);
- updateAccepted = true;
+ return currentLeadership;
+ });
+ } else if (eventType.equals(LeadershipEvent.Type.CANDIDATES_CHANGED)) {
+ candidateBoard.compute(topic, (k, currentInfo) -> {
+ if (currentInfo == null || currentInfo.epoch() <= leadershipUpdate.epoch()) {
+ updateAccepted.setTrue();
+ return leadershipUpdate;
}
- } else {
- throw new IllegalStateException("Unknown event type.");
- }
- if (updateAccepted) {
- eventDispatcher.post(leadershipEvent);
- }
+ return currentInfo;
+ });
+ } else {
+ throw new IllegalStateException("Unknown event type.");
+ }
+
+ if (updateAccepted.booleanValue()) {
+ eventDispatcher.post(leadershipEvent);
}
}
}
@@ -470,6 +532,12 @@
SERIALIZER::encode);
}
});
+ candidateBoard.forEach((path, leadership) -> {
+ LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.CANDIDATES_CHANGED, leadership);
+ clusterCommunicator.broadcast(event,
+ LEADERSHIP_EVENT_MESSAGE_SUBJECT,
+ SERIALIZER::encode);
+ });
} catch (Exception e) {
log.debug("Failed to send leadership updates", e);
}
diff --git a/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleLeadershipManager.java b/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleLeadershipManager.java
index 0cf0625..97e9f24 100644
--- a/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleLeadershipManager.java
+++ b/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleLeadershipManager.java
@@ -111,7 +111,7 @@
}
@Override
- public Map<String, List<NodeId>> getCandidates() {
+ public Map<String, Leadership> getCandidates() {
return null;
}